Skip to content

Commit c30803a

Browse files
authored
Per queue polling intervals (#198)
- Launch one goroutine per queue - Allow per queue polling intervals - Handle errors from getting the partitions - Simplify the logic (use `base` as a `min`)
1 parent e92c6ef commit c30803a

File tree

3 files changed

+135
-103
lines changed

3 files changed

+135
-103
lines changed

dbos/dbos.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,17 @@ const (
2929
// Config holds configuration parameters for initializing a DBOS context.
3030
// DatabaseURL and AppName are required.
3131
type Config struct {
32-
AppName string // Application name for identification (required)
33-
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
34-
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
35-
DatabaseSchema string // Database schema name (defaults to "dbos")
36-
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
37-
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
38-
AdminServerPort int // Port for the admin HTTP server (default: 3001)
39-
ConductorURL string // DBOS conductor service URL (optional)
40-
ConductorAPIKey string // DBOS conductor API key (optional)
41-
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
42-
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
43-
QueueRunnerConfig QueueRunnerConfig // Queue runner configuration (optional)
32+
AppName string // Application name for identification (required)
33+
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
34+
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
35+
DatabaseSchema string // Database schema name (defaults to "dbos")
36+
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
37+
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
38+
AdminServerPort int // Port for the admin HTTP server (default: 3001)
39+
ConductorURL string // DBOS conductor service URL (optional)
40+
ConductorAPIKey string // DBOS conductor API key (optional)
41+
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
42+
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
4443
}
4544

4645
func processConfig(inputConfig *Config) (*Config, error) {
@@ -55,10 +54,6 @@ func processConfig(inputConfig *Config) (*Config, error) {
5554
inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT
5655
}
5756

58-
if inputConfig.QueueRunnerConfig.MinInterval > inputConfig.QueueRunnerConfig.MaxInterval {
59-
return nil, fmt.Errorf("minInterval must be less than maxInterval")
60-
}
61-
6257
dbosConfig := &Config{
6358
DatabaseURL: inputConfig.DatabaseURL,
6459
AppName: inputConfig.AppName,
@@ -71,7 +66,6 @@ func processConfig(inputConfig *Config) (*Config, error) {
7166
ApplicationVersion: inputConfig.ApplicationVersion,
7267
ExecutorID: inputConfig.ExecutorID,
7368
SystemDBPool: inputConfig.SystemDBPool,
74-
QueueRunnerConfig: inputConfig.QueueRunnerConfig,
7569
}
7670

7771
// Load defaults
@@ -394,7 +388,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
394388
initExecutor.logger.Debug("System database initialized")
395389

396390
// Initialize the queue runner and register DBOS internal queue
397-
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunnerConfig)
391+
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
398392

399393
// Initialize conductor if API key is provided
400394
if config.ConductorAPIKey != "" {

dbos/queue.go

Lines changed: 96 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package dbos
33
import (
44
"context"
55
"log/slog"
6-
"math"
76
"math/rand"
7+
"sync"
88
"time"
99

1010
"github.com/jackc/pgerrcode"
@@ -14,9 +14,8 @@ import (
1414
const (
1515
_DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue"
1616
_DEFAULT_MAX_TASKS_PER_ITERATION = 100
17-
_DEFAULT_BASE_INTERVAL = 1.0
18-
_DEFAULT_MAX_INTERVAL = 120.0
19-
_DEFAULT_MIN_INTERVAL = 1.0
17+
_DEFAULT_BASE_POLLING_INTERVAL = 1 * time.Second
18+
_DEFAULT_MAX_POLLING_INTERVAL = 120 * time.Second
2019
)
2120

2221
// RateLimiter configures rate limiting for workflow queue execution.
@@ -29,13 +28,15 @@ type RateLimiter struct {
2928
// WorkflowQueue defines a named queue for workflow execution.
3029
// Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.
3130
type WorkflowQueue struct {
32-
Name string `json:"name"` // Unique queue name
33-
WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
34-
GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
35-
PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling
36-
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration
37-
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
38-
PartitionQueue bool `json:"partitionQueue,omitempty"` // Enable partitioned queue mode
31+
Name string `json:"name"` // Unique queue name
32+
WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
33+
GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
34+
PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling
35+
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration
36+
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
37+
PartitionQueue bool `json:"partitionQueue,omitempty"` // Enable partitioned queue mode
38+
basePollingInterval time.Duration // Base polling interval (minimum, never poll faster)
39+
maxPollingInterval time.Duration // Maximum polling interval (never poll slower)
3940
}
4041

4142
// QueueOption is a functional option for configuring a workflow queue
@@ -91,9 +92,27 @@ func WithPartitionQueue() QueueOption {
9192
}
9293
}
9394

95+
// WithQueueBasePollingInterval sets the initial polling interval for the queue.
96+
// This is the starting interval and the minimum - the queue will never poll faster than this.
97+
// If not set (0), the queue will use the default base polling interval during creation.
98+
func WithQueueBasePollingInterval(interval time.Duration) QueueOption {
99+
return func(q *WorkflowQueue) {
100+
q.basePollingInterval = interval
101+
}
102+
}
103+
104+
// WithQueueMaxPollingInterval sets the maximum polling interval for the queue.
105+
// The queue will never poll slower than this value, even when backing off due to errors.
106+
// If not set (0), the queue will use the default max polling interval during creation.
107+
func WithQueueMaxPollingInterval(interval time.Duration) QueueOption {
108+
return func(q *WorkflowQueue) {
109+
q.maxPollingInterval = interval
110+
}
111+
}
112+
94113
// NewWorkflowQueue creates a new workflow queue with the specified name and configuration options.
95114
// The queue must be created before workflows can be enqueued to it using the WithQueue option in RunWorkflow.
96-
// Queues provide controlled execution with support for concurrency limits, priority scheduling, and rate limiting.
115+
// Queues provide controlled execution with support for concurrency limits, priority scheduling, rate limiting, and polling intervals.
97116
//
98117
// Example:
99118
//
@@ -104,6 +123,8 @@ func WithPartitionQueue() QueueOption {
104123
// Period: 60 * time.Second, // 100 workflows per minute
105124
// }),
106125
// dbos.WithPriorityEnabled(),
126+
// dbos.WithQueueBasePollingInterval(1 * time.Second),
127+
// dbos.WithQueueMaxPollingInterval(60 * time.Second),
107128
// )
108129
//
109130
// // Enqueue workflows to this queue:
@@ -130,33 +151,24 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
130151
PriorityEnabled: false,
131152
RateLimit: nil,
132153
MaxTasksPerIteration: _DEFAULT_MAX_TASKS_PER_ITERATION,
154+
basePollingInterval: _DEFAULT_BASE_POLLING_INTERVAL,
155+
maxPollingInterval: _DEFAULT_MAX_POLLING_INTERVAL,
133156
}
134157

135158
// Apply functional options
136159
for _, option := range options {
137160
option(&q)
138161
}
139-
140162
// Register the queue in the global registry
141163
ctx.queueRunner.workflowQueueRegistry[name] = q
142164

143165
return q
144166
}
145167

146-
// QueueRunnerConfig configures the queue runner polling behavior.
147-
type QueueRunnerConfig struct {
148-
BaseInterval float64 // seconds
149-
MinInterval float64 // seconds
150-
MaxInterval float64 // seconds
151-
}
152-
153168
type queueRunner struct {
154169
logger *slog.Logger
155170

156171
// Queue runner iteration parameters
157-
baseInterval float64
158-
minInterval float64
159-
maxInterval float64
160172
backoffFactor float64
161173
scalebackFactor float64
162174
jitterMin float64
@@ -165,25 +177,15 @@ type queueRunner struct {
165177
// Queue registry
166178
workflowQueueRegistry map[string]WorkflowQueue
167179

180+
// WaitGroup to track all queue goroutines
181+
queueGoroutinesWg sync.WaitGroup
182+
168183
// Channel to signal completion back to the DBOS context
169184
completionChan chan struct{}
170185
}
171186

172-
func newQueueRunner(logger *slog.Logger, config QueueRunnerConfig) *queueRunner {
173-
if config.BaseInterval == 0 {
174-
config.BaseInterval = _DEFAULT_BASE_INTERVAL
175-
}
176-
if config.MinInterval == 0 {
177-
config.MinInterval = _DEFAULT_MIN_INTERVAL
178-
}
179-
if config.MaxInterval == 0 {
180-
config.MaxInterval = _DEFAULT_MAX_INTERVAL
181-
}
182-
187+
func newQueueRunner(logger *slog.Logger) *queueRunner {
183188
return &queueRunner{
184-
baseInterval: config.BaseInterval,
185-
minInterval: config.MinInterval,
186-
maxInterval: config.MaxInterval,
187189
backoffFactor: 2.0,
188190
scalebackFactor: 0.9,
189191
jitterMin: 0.95,
@@ -203,37 +205,62 @@ func (qr *queueRunner) listQueues() []WorkflowQueue {
203205
}
204206

205207
// getQueue returns the queue with the given name from the registry.
206-
// Returns a pointer to the queue if found, or nil if the queue does not exist.
208+
// Returns a pointer to the queue if found, or nil if it does not exist.
207209
func (qr *queueRunner) getQueue(queueName string) *WorkflowQueue {
208210
if queue, exists := qr.workflowQueueRegistry[queueName]; exists {
209211
return &queue
210212
}
211213
return nil
212214
}
213215

216+
// run starts a goroutine for each registered queue to handle polling independently.
214217
func (qr *queueRunner) run(ctx *dbosContext) {
215-
pollingInterval := qr.baseInterval
218+
for _, queue := range qr.workflowQueueRegistry {
219+
qr.queueGoroutinesWg.Add(1)
220+
go qr.runQueue(ctx, queue)
221+
}
222+
223+
// Wait for all queue goroutines to complete
224+
qr.queueGoroutinesWg.Wait()
225+
qr.logger.Debug("All queue goroutines completed")
226+
qr.completionChan <- struct{}{}
227+
}
228+
229+
func (qr *queueRunner) runQueue(ctx *dbosContext, queue WorkflowQueue) {
230+
defer qr.queueGoroutinesWg.Done()
231+
232+
queueLogger := qr.logger.With("queue_name", queue.Name)
233+
// Current polling interval starts at the base interval and adjusts based on errors
234+
currentPollingInterval := queue.basePollingInterval
216235

217236
for {
218237
hasBackoffError := false
219-
220-
// Iterate through all queues in the registry
221-
for _, queue := range qr.workflowQueueRegistry {
222-
// Build list of partition keys to dequeue from
223-
// Default to empty string for non-partitioned queues
224-
partitionKeys := []string{""}
225-
if queue.PartitionQueue {
226-
partitions, err := retryWithResult(ctx, func() ([]string, error) {
227-
return ctx.systemDB.getQueuePartitions(ctx, queue.Name)
228-
}, withRetrierLogger(qr.logger))
229-
if err != nil {
230-
qr.logger.Error("Error getting queue partitions", "queue_name", queue.Name, "error", err)
231-
continue
238+
skipDequeue := false
239+
240+
// Build list of partition keys to dequeue from
241+
// Default to empty string for non-partitioned queues
242+
partitionKeys := []string{""}
243+
if queue.PartitionQueue {
244+
partitions, err := retryWithResult(ctx, func() ([]string, error) {
245+
return ctx.systemDB.getQueuePartitions(ctx, queue.Name)
246+
}, withRetrierLogger(queueLogger))
247+
if err != nil {
248+
skipDequeue = true
249+
if pgErr, ok := err.(*pgconn.PgError); ok {
250+
switch pgErr.Code {
251+
case pgerrcode.SerializationFailure, pgerrcode.LockNotAvailable:
252+
hasBackoffError = true
253+
}
254+
} else {
255+
queueLogger.Error("Error getting queue partitions", "error", err)
232256
}
257+
} else {
233258
partitionKeys = partitions
234259
}
260+
}
235261

236-
// Dequeue from each partition (or once for non-partitioned queues)
262+
// Dequeue from each partition (or once for non-partitioned queues)
263+
if !skipDequeue {
237264
var dequeuedWorkflows []dequeuedWorkflow
238265
for _, partitionKey := range partitionKeys {
239266
workflows, shouldContinue := qr.dequeueWorkflows(ctx, queue, partitionKey, &hasBackoffError)
@@ -244,54 +271,54 @@ func (qr *queueRunner) run(ctx *dbosContext) {
244271
}
245272

246273
if len(dequeuedWorkflows) > 0 {
247-
qr.logger.Debug("Dequeued workflows from queue", "queue_name", queue.Name, "workflows", len(dequeuedWorkflows))
274+
queueLogger.Debug("Dequeued workflows from queue", "workflows", len(dequeuedWorkflows))
248275
}
249276
for _, workflow := range dequeuedWorkflows {
250277
// Find the workflow in the registry
251-
252278
wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
253279
if !ok {
254-
qr.logger.Error("Workflow not found in registry", "workflow_name", workflow.name)
280+
queueLogger.Error("Workflow not found in registry", "workflow_name", workflow.name)
255281
continue
256282
}
257283

258284
registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
259285
if !exists {
260-
qr.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
286+
queueLogger.Error("workflow function not found in registry", "workflow_name", workflow.name)
261287
continue
262288
}
263289
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
264290
if !ok {
265-
qr.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
291+
queueLogger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
266292
continue
267293
}
268294

269295
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
270296
_, err := registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
271297
if err != nil {
272-
qr.logger.Error("Error running queued workflow", "error", err)
298+
queueLogger.Error("Error running queued workflow", "error", err)
273299
}
274300
}
275301
}
276302

277-
// Adjust polling interval based on errors
303+
// Adjust polling interval for this queue based on errors
278304
if hasBackoffError {
279-
// Increase polling interval using exponential backoff
280-
pollingInterval = math.Min(pollingInterval*qr.backoffFactor, qr.maxInterval)
305+
// Increase polling interval using exponential backoff, but never exceed maxPollingInterval
306+
newInterval := time.Duration(float64(currentPollingInterval) * qr.backoffFactor)
307+
currentPollingInterval = min(newInterval, queue.maxPollingInterval)
281308
} else {
282-
// Scale back polling interval on successful iteration
283-
pollingInterval = math.Max(qr.minInterval, pollingInterval*qr.scalebackFactor)
309+
// Scale back polling interval on successful iteration, but never go below base interval
310+
newInterval := time.Duration(float64(currentPollingInterval) * qr.scalebackFactor)
311+
currentPollingInterval = max(newInterval, queue.basePollingInterval)
284312
}
285313

286-
// Apply jitter to the polling interval
314+
// Apply jitter to this queue's polling interval
287315
jitter := qr.jitterMin + rand.Float64()*(qr.jitterMax-qr.jitterMin) // #nosec G404 -- non-crypto jitter; acceptable
288-
sleepDuration := time.Duration(pollingInterval * jitter * float64(time.Second))
316+
sleepDuration := time.Duration(float64(currentPollingInterval) * jitter)
289317

290318
// Sleep with jittered interval, but allow early exit on context cancellation
291319
select {
292320
case <-ctx.Done():
293-
qr.logger.Debug("Queue runner stopping due to context cancellation", "cause", context.Cause(ctx))
294-
qr.completionChan <- struct{}{}
321+
queueLogger.Debug("Queue goroutine stopping due to context cancellation", "cause", context.Cause(ctx))
295322
return
296323
case <-time.After(sleepDuration):
297324
// Continue to next iteration
@@ -314,9 +341,7 @@ func (qr *queueRunner) dequeueWorkflows(ctx *dbosContext, queue WorkflowQueue, p
314341
if err != nil {
315342
if pgErr, ok := err.(*pgconn.PgError); ok {
316343
switch pgErr.Code {
317-
case pgerrcode.SerializationFailure:
318-
*hasBackoffError = true
319-
case pgerrcode.LockNotAvailable:
344+
case pgerrcode.SerializationFailure, pgerrcode.LockNotAvailable:
320345
*hasBackoffError = true
321346
}
322347
} else {

0 commit comments

Comments
 (0)