Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ const (
// Config holds configuration parameters for initializing a DBOS context.
// DatabaseURL and AppName are required.
type Config struct {
AppName string // Application name for identification (required)
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
DatabaseSchema string // Database schema name (defaults to "dbos")
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
AdminServerPort int // Port for the admin HTTP server (default: 3001)
ConductorURL string // DBOS conductor service URL (optional)
ConductorAPIKey string // DBOS conductor API key (optional)
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
QueueRunnerConfig QueueRunnerConfig // Queue runner configuration (optional)
AppName string // Application name for identification (required)
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
DatabaseSchema string // Database schema name (defaults to "dbos")
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
AdminServerPort int // Port for the admin HTTP server (default: 3001)
ConductorURL string // DBOS conductor service URL (optional)
ConductorAPIKey string // DBOS conductor API key (optional)
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
}

func processConfig(inputConfig *Config) (*Config, error) {
Expand All @@ -55,10 +54,6 @@ func processConfig(inputConfig *Config) (*Config, error) {
inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT
}

if inputConfig.QueueRunnerConfig.MinInterval > inputConfig.QueueRunnerConfig.MaxInterval {
return nil, fmt.Errorf("minInterval must be less than maxInterval")
}

dbosConfig := &Config{
DatabaseURL: inputConfig.DatabaseURL,
AppName: inputConfig.AppName,
Expand All @@ -71,7 +66,6 @@ func processConfig(inputConfig *Config) (*Config, error) {
ApplicationVersion: inputConfig.ApplicationVersion,
ExecutorID: inputConfig.ExecutorID,
SystemDBPool: inputConfig.SystemDBPool,
QueueRunnerConfig: inputConfig.QueueRunnerConfig,
}

// Load defaults
Expand Down Expand Up @@ -394,7 +388,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
initExecutor.logger.Debug("System database initialized")

// Initialize the queue runner and register DBOS internal queue
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunnerConfig)
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)

// Initialize conductor if API key is provided
if config.ConductorAPIKey != "" {
Expand Down
167 changes: 96 additions & 71 deletions dbos/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package dbos
import (
"context"
"log/slog"
"math"
"math/rand"
"sync"
"time"

"github.com/jackc/pgerrcode"
Expand All @@ -14,9 +14,8 @@ import (
const (
_DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue"
_DEFAULT_MAX_TASKS_PER_ITERATION = 100
_DEFAULT_BASE_INTERVAL = 1.0
_DEFAULT_MAX_INTERVAL = 120.0
_DEFAULT_MIN_INTERVAL = 1.0
_DEFAULT_BASE_POLLING_INTERVAL = 1 * time.Second
_DEFAULT_MAX_POLLING_INTERVAL = 120 * time.Second
)

// RateLimiter configures rate limiting for workflow queue execution.
Expand All @@ -29,13 +28,15 @@ type RateLimiter struct {
// WorkflowQueue defines a named queue for workflow execution.
// Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.
type WorkflowQueue struct {
Name string `json:"name"` // Unique queue name
WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
PartitionQueue bool `json:"partitionQueue,omitempty"` // Enable partitioned queue mode
Name string `json:"name"` // Unique queue name
WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
PartitionQueue bool `json:"partitionQueue,omitempty"` // Enable partitioned queue mode
basePollingInterval time.Duration // Base polling interval (minimum, never poll faster)
maxPollingInterval time.Duration // Maximum polling interval (never poll slower)
}

// QueueOption is a functional option for configuring a workflow queue
Expand Down Expand Up @@ -91,9 +92,27 @@ func WithPartitionQueue() QueueOption {
}
}

// WithQueueBasePollingInterval sets the initial polling interval for the queue.
// This is the starting interval and the minimum - the queue will never poll faster than this.
// If not set (0), the queue will use the default base polling interval during creation.
func WithQueueBasePollingInterval(interval time.Duration) QueueOption {
return func(q *WorkflowQueue) {
q.basePollingInterval = interval
}
}

// WithQueueMaxPollingInterval sets the maximum polling interval for the queue.
// The queue will never poll slower than this value, even when backing off due to errors.
// If not set (0), the queue will use the default max polling interval during creation.
func WithQueueMaxPollingInterval(interval time.Duration) QueueOption {
return func(q *WorkflowQueue) {
q.maxPollingInterval = interval
}
}

// NewWorkflowQueue creates a new workflow queue with the specified name and configuration options.
// The queue must be created before workflows can be enqueued to it using the WithQueue option in RunWorkflow.
// Queues provide controlled execution with support for concurrency limits, priority scheduling, and rate limiting.
// Queues provide controlled execution with support for concurrency limits, priority scheduling, rate limiting, and polling intervals.
//
// Example:
//
Expand All @@ -104,6 +123,8 @@ func WithPartitionQueue() QueueOption {
// Period: 60 * time.Second, // 100 workflows per minute
// }),
// dbos.WithPriorityEnabled(),
// dbos.WithQueueBasePollingInterval(1 * time.Second),
// dbos.WithQueueMaxPollingInterval(60 * time.Second),
// )
//
// // Enqueue workflows to this queue:
Expand All @@ -130,33 +151,24 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
PriorityEnabled: false,
RateLimit: nil,
MaxTasksPerIteration: _DEFAULT_MAX_TASKS_PER_ITERATION,
basePollingInterval: _DEFAULT_BASE_POLLING_INTERVAL,
maxPollingInterval: _DEFAULT_MAX_POLLING_INTERVAL,
}

// Apply functional options
for _, option := range options {
option(&q)
}

// Register the queue in the global registry
ctx.queueRunner.workflowQueueRegistry[name] = q

return q
}

// QueueRunnerConfig configures the queue runner polling behavior.
type QueueRunnerConfig struct {
BaseInterval float64 // seconds
MinInterval float64 // seconds
MaxInterval float64 // seconds
}

type queueRunner struct {
logger *slog.Logger

// Queue runner iteration parameters
baseInterval float64
minInterval float64
maxInterval float64
backoffFactor float64
scalebackFactor float64
jitterMin float64
Expand All @@ -165,25 +177,15 @@ type queueRunner struct {
// Queue registry
workflowQueueRegistry map[string]WorkflowQueue

// WaitGroup to track all queue goroutines
queueGoroutinesWg sync.WaitGroup

// Channel to signal completion back to the DBOS context
completionChan chan struct{}
}

func newQueueRunner(logger *slog.Logger, config QueueRunnerConfig) *queueRunner {
if config.BaseInterval == 0 {
config.BaseInterval = _DEFAULT_BASE_INTERVAL
}
if config.MinInterval == 0 {
config.MinInterval = _DEFAULT_MIN_INTERVAL
}
if config.MaxInterval == 0 {
config.MaxInterval = _DEFAULT_MAX_INTERVAL
}

func newQueueRunner(logger *slog.Logger) *queueRunner {
return &queueRunner{
baseInterval: config.BaseInterval,
minInterval: config.MinInterval,
maxInterval: config.MaxInterval,
backoffFactor: 2.0,
scalebackFactor: 0.9,
jitterMin: 0.95,
Expand All @@ -203,37 +205,62 @@ func (qr *queueRunner) listQueues() []WorkflowQueue {
}

// getQueue returns the queue with the given name from the registry.
// Returns a pointer to the queue if found, or nil if the queue does not exist.
// Returns a pointer to the queue if found, or nil if it does not exist.
func (qr *queueRunner) getQueue(queueName string) *WorkflowQueue {
if queue, exists := qr.workflowQueueRegistry[queueName]; exists {
return &queue
}
return nil
}

// run starts a goroutine for each registered queue to handle polling independently.
func (qr *queueRunner) run(ctx *dbosContext) {
pollingInterval := qr.baseInterval
for _, queue := range qr.workflowQueueRegistry {
qr.queueGoroutinesWg.Add(1)
go qr.runQueue(ctx, queue)
}

// Wait for all queue goroutines to complete
qr.queueGoroutinesWg.Wait()
qr.logger.Debug("All queue goroutines completed")
qr.completionChan <- struct{}{}
}

func (qr *queueRunner) runQueue(ctx *dbosContext, queue WorkflowQueue) {
defer qr.queueGoroutinesWg.Done()

queueLogger := qr.logger.With("queue_name", queue.Name)
// Current polling interval starts at the base interval and adjusts based on errors
currentPollingInterval := queue.basePollingInterval

for {
hasBackoffError := false

// Iterate through all queues in the registry
for _, queue := range qr.workflowQueueRegistry {
// Build list of partition keys to dequeue from
// Default to empty string for non-partitioned queues
partitionKeys := []string{""}
if queue.PartitionQueue {
partitions, err := retryWithResult(ctx, func() ([]string, error) {
return ctx.systemDB.getQueuePartitions(ctx, queue.Name)
}, withRetrierLogger(qr.logger))
if err != nil {
qr.logger.Error("Error getting queue partitions", "queue_name", queue.Name, "error", err)
continue
skipDequeue := false

// Build list of partition keys to dequeue from
// Default to empty string for non-partitioned queues
partitionKeys := []string{""}
if queue.PartitionQueue {
partitions, err := retryWithResult(ctx, func() ([]string, error) {
return ctx.systemDB.getQueuePartitions(ctx, queue.Name)
}, withRetrierLogger(queueLogger))
if err != nil {
skipDequeue = true
if pgErr, ok := err.(*pgconn.PgError); ok {
switch pgErr.Code {
case pgerrcode.SerializationFailure, pgerrcode.LockNotAvailable:
hasBackoffError = true
}
} else {
queueLogger.Error("Error getting queue partitions", "error", err)
}
} else {
partitionKeys = partitions
}
}

// Dequeue from each partition (or once for non-partitioned queues)
// Dequeue from each partition (or once for non-partitioned queues)
if !skipDequeue {
var dequeuedWorkflows []dequeuedWorkflow
for _, partitionKey := range partitionKeys {
workflows, shouldContinue := qr.dequeueWorkflows(ctx, queue, partitionKey, &hasBackoffError)
Expand All @@ -244,54 +271,54 @@ func (qr *queueRunner) run(ctx *dbosContext) {
}

if len(dequeuedWorkflows) > 0 {
qr.logger.Debug("Dequeued workflows from queue", "queue_name", queue.Name, "workflows", len(dequeuedWorkflows))
queueLogger.Debug("Dequeued workflows from queue", "workflows", len(dequeuedWorkflows))
}
for _, workflow := range dequeuedWorkflows {
// Find the workflow in the registry

wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
if !ok {
qr.logger.Error("Workflow not found in registry", "workflow_name", workflow.name)
queueLogger.Error("Workflow not found in registry", "workflow_name", workflow.name)
continue
}

registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
if !exists {
qr.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
queueLogger.Error("workflow function not found in registry", "workflow_name", workflow.name)
continue
}
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
if !ok {
qr.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
queueLogger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
continue
}

// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
_, err := registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
if err != nil {
qr.logger.Error("Error running queued workflow", "error", err)
queueLogger.Error("Error running queued workflow", "error", err)
}
}
}

// Adjust polling interval based on errors
// Adjust polling interval for this queue based on errors
if hasBackoffError {
// Increase polling interval using exponential backoff
pollingInterval = math.Min(pollingInterval*qr.backoffFactor, qr.maxInterval)
// Increase polling interval using exponential backoff, but never exceed maxPollingInterval
newInterval := time.Duration(float64(currentPollingInterval) * qr.backoffFactor)
currentPollingInterval = min(newInterval, queue.maxPollingInterval)
} else {
// Scale back polling interval on successful iteration
pollingInterval = math.Max(qr.minInterval, pollingInterval*qr.scalebackFactor)
// Scale back polling interval on successful iteration, but never go below base interval
newInterval := time.Duration(float64(currentPollingInterval) * qr.scalebackFactor)
currentPollingInterval = max(newInterval, queue.basePollingInterval)
}

// Apply jitter to the polling interval
// Apply jitter to this queue's polling interval
jitter := qr.jitterMin + rand.Float64()*(qr.jitterMax-qr.jitterMin) // #nosec G404 -- non-crypto jitter; acceptable
sleepDuration := time.Duration(pollingInterval * jitter * float64(time.Second))
sleepDuration := time.Duration(float64(currentPollingInterval) * jitter)

// Sleep with jittered interval, but allow early exit on context cancellation
select {
case <-ctx.Done():
qr.logger.Debug("Queue runner stopping due to context cancellation", "cause", context.Cause(ctx))
qr.completionChan <- struct{}{}
queueLogger.Debug("Queue goroutine stopping due to context cancellation", "cause", context.Cause(ctx))
return
case <-time.After(sleepDuration):
// Continue to next iteration
Expand All @@ -314,9 +341,7 @@ func (qr *queueRunner) dequeueWorkflows(ctx *dbosContext, queue WorkflowQueue, p
if err != nil {
if pgErr, ok := err.(*pgconn.PgError); ok {
switch pgErr.Code {
case pgerrcode.SerializationFailure:
*hasBackoffError = true
case pgerrcode.LockNotAvailable:
case pgerrcode.SerializationFailure, pgerrcode.LockNotAvailable:
*hasBackoffError = true
}
} else {
Expand Down
Loading