
Commit 51e59a9

per queue goroutine
1 parent a45e2ed commit 51e59a9


2 files changed: +156 −123 lines changed


dbos/queue.go

Lines changed: 128 additions & 109 deletions
@@ -3,8 +3,8 @@ package dbos
 import (
 	"context"
 	"log/slog"
-	"math"
 	"math/rand"
+	"sync"
 	"time"
 
 	"github.com/jackc/pgerrcode"
@@ -14,9 +14,8 @@ import (
 const (
 	_DBOS_INTERNAL_QUEUE_NAME        = "_dbos_internal_queue"
 	_DEFAULT_MAX_TASKS_PER_ITERATION = 100
-	_DEFAULT_BASE_INTERVAL           = 1.0
-	_DEFAULT_MAX_INTERVAL            = 120.0
-	_DEFAULT_MIN_INTERVAL            = 1.0
+	_DEFAULT_BASE_POLLING_INTERVAL   = 1 * time.Second
+	_DEFAULT_MAX_POLLING_INTERVAL    = 120 * time.Second
 )
 
 // RateLimiter configures rate limiting for workflow queue execution.
@@ -29,13 +28,16 @@ type RateLimiter struct {
 // WorkflowQueue defines a named queue for workflow execution.
 // Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.
 type WorkflowQueue struct {
-	Name                 string       `json:"name"`                        // Unique queue name
-	WorkerConcurrency    *int         `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
-	GlobalConcurrency    *int         `json:"concurrency,omitempty"`       // Max concurrent workflows across all executors
-	PriorityEnabled      bool         `json:"priorityEnabled,omitempty"`   // Enable priority-based scheduling
-	RateLimit            *RateLimiter `json:"rateLimit,omitempty"`         // Rate limiting configuration
-	MaxTasksPerIteration int          `json:"maxTasksPerIteration"`        // Max workflows to dequeue per iteration
-	PartitionQueue       bool         `json:"partitionQueue,omitempty"`    // Enable partitioned queue mode
+	Name                   string        `json:"name"`                          // Unique queue name
+	WorkerConcurrency      *int          `json:"workerConcurrency,omitempty"`   // Max concurrent workflows per executor
+	GlobalConcurrency      *int          `json:"concurrency,omitempty"`         // Max concurrent workflows across all executors
+	PriorityEnabled        bool          `json:"priorityEnabled,omitempty"`     // Enable priority-based scheduling
+	RateLimit              *RateLimiter  `json:"rateLimit,omitempty"`           // Rate limiting configuration
+	MaxTasksPerIteration   int           `json:"maxTasksPerIteration"`          // Max workflows to dequeue per iteration
+	PartitionQueue         bool          `json:"partitionQueue,omitempty"`      // Enable partitioned queue mode
+	BasePollingInterval    time.Duration `json:"basePollingInterval,omitempty"` // Initial polling interval (minimum, never poll faster)
+	MaxPollingInterval     time.Duration `json:"maxPollingInterval,omitempty"`  // Maximum polling interval (never poll slower)
+	currentPollingInterval time.Duration
 }
 
 // QueueOption is a functional option for configuring a workflow queue
@@ -91,9 +93,28 @@ func WithPartitionQueue() QueueOption {
 	}
 }
 
+// WithQueueBasePollingInterval sets the initial polling interval for the queue.
+// This is the starting interval and the minimum - the queue will never poll faster than this.
+// If not set (0), the queue will use the default base polling interval during creation.
+func WithQueueBasePollingInterval(interval time.Duration) QueueOption {
+	return func(q *WorkflowQueue) {
+		q.BasePollingInterval = interval
+		q.currentPollingInterval = interval
+	}
+}
+
+// WithQueueMaxPollingInterval sets the maximum polling interval for the queue.
+// The queue will never poll slower than this value, even when backing off due to errors.
+// If not set (0), the queue will use the default max polling interval during creation.
+func WithQueueMaxPollingInterval(interval time.Duration) QueueOption {
+	return func(q *WorkflowQueue) {
+		q.MaxPollingInterval = interval
+	}
+}
+
 // NewWorkflowQueue creates a new workflow queue with the specified name and configuration options.
 // The queue must be created before workflows can be enqueued to it using the WithQueue option in RunWorkflow.
-// Queues provide controlled execution with support for concurrency limits, priority scheduling, and rate limiting.
+// Queues provide controlled execution with support for concurrency limits, priority scheduling, rate limiting, and polling intervals.
 //
 // Example:
 //
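
Taken together with the time.Duration defaults above, these two options replace the old float64-seconds configuration. A minimal usage sketch, assuming an already-initialized DBOSContext named ctx (the queue name and interval values below are illustrative, not part of this commit):

	queue := dbos.NewWorkflowQueue(ctx, "example-queue",
		dbos.WithQueueBasePollingInterval(2*time.Second), // never poll faster than every 2s
		dbos.WithQueueMaxPollingInterval(30*time.Second), // back off to at most 30s between polls
	)
	// Leaving either option unset keeps the defaults assigned by NewWorkflowQueue
	// (_DEFAULT_BASE_POLLING_INTERVAL = 1s, _DEFAULT_MAX_POLLING_INTERVAL = 120s).
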
@@ -104,6 +125,8 @@ func WithPartitionQueue() QueueOption {
 //	        Period: 60 * time.Second, // 100 workflows per minute
 //	    }),
 //	    dbos.WithPriorityEnabled(),
+//	    dbos.WithQueueBasePollingInterval(1 * time.Second),
+//	    dbos.WithQueueMaxPollingInterval(60 * time.Second),
 //	)
 //
 //	// Enqueue workflows to this queue:
@@ -124,71 +147,53 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
 
 	// Create queue with default settings
 	q := WorkflowQueue{
-		Name:                 name,
-		WorkerConcurrency:    nil,
-		GlobalConcurrency:    nil,
-		PriorityEnabled:      false,
-		RateLimit:            nil,
-		MaxTasksPerIteration: _DEFAULT_MAX_TASKS_PER_ITERATION,
+		Name:                   name,
+		WorkerConcurrency:      nil,
+		GlobalConcurrency:      nil,
+		PriorityEnabled:        false,
+		RateLimit:              nil,
+		MaxTasksPerIteration:   _DEFAULT_MAX_TASKS_PER_ITERATION,
+		BasePollingInterval:    _DEFAULT_BASE_POLLING_INTERVAL,
+		MaxPollingInterval:     _DEFAULT_MAX_POLLING_INTERVAL,
+		currentPollingInterval: _DEFAULT_BASE_POLLING_INTERVAL,
 	}
 
 	// Apply functional options
 	for _, option := range options {
 		option(&q)
 	}
-
 	// Register the queue in the global registry
-	ctx.queueRunner.workflowQueueRegistry[name] = q
+	ctx.queueRunner.workflowQueueRegistry[name] = &q
 
 	return q
 }
 
-// QueueRunnerConfig configures the queue runner polling behavior.
-type QueueRunnerConfig struct {
-	BaseInterval float64 // seconds
-	MinInterval  float64 // seconds
-	MaxInterval  float64 // seconds
-}
-
 type queueRunner struct {
 	logger *slog.Logger
 
 	// Queue runner iteration parameters
-	baseInterval    float64
-	minInterval     float64
-	maxInterval     float64
 	backoffFactor   float64
 	scalebackFactor float64
 	jitterMin       float64
 	jitterMax       float64
 
 	// Queue registry
-	workflowQueueRegistry map[string]WorkflowQueue
+	workflowQueueRegistry map[string]*WorkflowQueue
+
+	// WaitGroup to track all queue goroutines
+	queueGoroutinesWg sync.WaitGroup
 
 	// Channel to signal completion back to the DBOS context
 	completionChan chan struct{}
 }
 
-func newQueueRunner(logger *slog.Logger, config QueueRunnerConfig) *queueRunner {
-	if config.BaseInterval == 0 {
-		config.BaseInterval = _DEFAULT_BASE_INTERVAL
-	}
-	if config.MinInterval == 0 {
-		config.MinInterval = _DEFAULT_MIN_INTERVAL
-	}
-	if config.MaxInterval == 0 {
-		config.MaxInterval = _DEFAULT_MAX_INTERVAL
-	}
-
+func newQueueRunner(logger *slog.Logger) *queueRunner {
 	return &queueRunner{
-		baseInterval:          config.BaseInterval,
-		minInterval:           config.MinInterval,
-		maxInterval:           config.MaxInterval,
 		backoffFactor:         2.0,
 		scalebackFactor:       0.9,
 		jitterMin:             0.95,
 		jitterMax:             1.05,
-		workflowQueueRegistry: make(map[string]WorkflowQueue),
+		workflowQueueRegistry: make(map[string]*WorkflowQueue),
 		completionChan:        make(chan struct{}, 1),
 		logger:                logger.With("service", "queue_runner"),
 	}
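
Because the registry now holds *WorkflowQueue, the currentPollingInterval that the runner mutates is the same value NewWorkflowQueue registered, and the new queueGoroutinesWg counts one goroutine per queue. The fan-out-and-wait shape that run() adopts below can be sketched standalone like this (a simplified illustration with plain strings standing in for queues, not the DBOS code itself):

	package main

	import (
		"context"
		"fmt"
		"sync"
		"time"
	)

	// runAll launches one polling goroutine per name and signals done once all of them return.
	func runAll(ctx context.Context, names []string, done chan<- struct{}) {
		var wg sync.WaitGroup
		for _, name := range names {
			wg.Add(1) // count the goroutine before starting it
			go func(n string) {
				defer wg.Done()
				for {
					select {
					case <-ctx.Done():
						fmt.Println(n, "stopping")
						return
					case <-time.After(100 * time.Millisecond):
						// poll this queue's work here
					}
				}
			}(name)
		}
		wg.Wait()          // block until every per-queue goroutine has exited
		done <- struct{}{} // single completion signal, mirroring completionChan
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		done := make(chan struct{}, 1)
		go runAll(ctx, []string{"queueA", "queueB"}, done)
		time.Sleep(300 * time.Millisecond)
		cancel()
		<-done
	}
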
@@ -197,101 +202,117 @@ func newQueueRunner(logger *slog.Logger, config QueueRunnerConfig) *queueRunner
 func (qr *queueRunner) listQueues() []WorkflowQueue {
 	queues := make([]WorkflowQueue, 0, len(qr.workflowQueueRegistry))
 	for _, queue := range qr.workflowQueueRegistry {
-		queues = append(queues, queue)
+		queues = append(queues, *queue)
 	}
 	return queues
 }
 
 // getQueue returns the queue with the given name from the registry.
 // Returns a pointer to the queue if found, or nil if the queue does not exist.
 func (qr *queueRunner) getQueue(queueName string) *WorkflowQueue {
-	if queue, exists := qr.workflowQueueRegistry[queueName]; exists {
-		return &queue
-	}
-	return nil
+	return qr.workflowQueueRegistry[queueName]
 }
 
+// run starts a goroutine for each registered queue to handle polling independently.
 func (qr *queueRunner) run(ctx *dbosContext) {
-	pollingInterval := qr.baseInterval
+	for _, queue := range qr.workflowQueueRegistry {
+		qr.queueGoroutinesWg.Add(1)
+		go qr.runQueue(ctx, queue)
+	}
+
+	// Wait for all queue goroutines to complete
+	qr.queueGoroutinesWg.Wait()
+	qr.logger.Debug("All queue goroutines completed")
+	qr.completionChan <- struct{}{}
+}
+
+func (qr *queueRunner) runQueue(ctx *dbosContext, queue *WorkflowQueue) {
+	defer qr.queueGoroutinesWg.Done()
+
+	queueLogger := qr.logger.With("queue_name", queue.Name)
 
 	for {
 		hasBackoffError := false
 
-		// Iterate through all queues in the registry
-		for _, queue := range qr.workflowQueueRegistry {
-			// Build list of partition keys to dequeue from
-			// Default to empty string for non-partitioned queues
-			partitionKeys := []string{""}
-			if queue.PartitionQueue {
-				partitions, err := retryWithResult(ctx, func() ([]string, error) {
-					return ctx.systemDB.getQueuePartitions(ctx, queue.Name)
-				}, withRetrierLogger(qr.logger))
-				if err != nil {
-					qr.logger.Error("Error getting queue partitions", "queue_name", queue.Name, "error", err)
-					continue
+		// Build list of partition keys to dequeue from
+		// Default to empty string for non-partitioned queues
+		partitionKeys := []string{""}
+		if queue.PartitionQueue {
+			partitions, err := retryWithResult(ctx, func() ([]string, error) {
+				return ctx.systemDB.getQueuePartitions(ctx, queue.Name)
+			}, withRetrierLogger(queueLogger))
+			if err != nil {
+				if pgErr, ok := err.(*pgconn.PgError); ok {
+					switch pgErr.Code {
+					case pgerrcode.SerializationFailure, pgerrcode.LockNotAvailable:
+						hasBackoffError = true
+					}
+				} else {
+					queueLogger.Error("Error getting queue partitions", "error", err)
 				}
+			} else {
 				partitionKeys = partitions
 			}
+		}
 
-			// Dequeue from each partition (or once for non-partitioned queues)
-			var dequeuedWorkflows []dequeuedWorkflow
-			for _, partitionKey := range partitionKeys {
-				workflows, shouldContinue := qr.dequeueWorkflows(ctx, queue, partitionKey, &hasBackoffError)
-				if shouldContinue {
-					continue
-				}
-				dequeuedWorkflows = append(dequeuedWorkflows, workflows...)
+		// Dequeue from each partition (or once for non-partitioned queues)
+		var dequeuedWorkflows []dequeuedWorkflow
+		for _, partitionKey := range partitionKeys {
+			workflows, shouldContinue := qr.dequeueWorkflows(ctx, *queue, partitionKey, &hasBackoffError)
+			if shouldContinue {
+				continue
 			}
+			dequeuedWorkflows = append(dequeuedWorkflows, workflows...)
+		}
 
-			if len(dequeuedWorkflows) > 0 {
-				qr.logger.Debug("Dequeued workflows from queue", "queue_name", queue.Name, "workflows", len(dequeuedWorkflows))
+		if len(dequeuedWorkflows) > 0 {
+			queueLogger.Debug("Dequeued workflows from queue", "workflows", len(dequeuedWorkflows))
+		}
+		for _, workflow := range dequeuedWorkflows {
+			// Find the workflow in the registry
+			wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
+			if !ok {
+				queueLogger.Error("Workflow not found in registry", "workflow_name", workflow.name)
+				continue
 			}
-			for _, workflow := range dequeuedWorkflows {
-				// Find the workflow in the registry
 
-				wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
-				if !ok {
-					qr.logger.Error("Workflow not found in registry", "workflow_name", workflow.name)
-					continue
-				}
-
-				registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
-				if !exists {
-					qr.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
-					continue
-				}
-				registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
-				if !ok {
-					qr.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
-					continue
-				}
+			registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
+			if !exists {
+				queueLogger.Error("workflow function not found in registry", "workflow_name", workflow.name)
+				continue
+			}
+			registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
+			if !ok {
+				queueLogger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
+				continue
+			}
 
-				// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
-				_, err := registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
-				if err != nil {
-					qr.logger.Error("Error running queued workflow", "error", err)
-				}
+			// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
+			_, err := registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
+			if err != nil {
+				queueLogger.Error("Error running queued workflow", "error", err)
 			}
 		}
 
-		// Adjust polling interval based on errors
+		// Adjust polling interval for this queue based on errors
 		if hasBackoffError {
-			// Increase polling interval using exponential backoff
-			pollingInterval = math.Min(pollingInterval*qr.backoffFactor, qr.maxInterval)
+			// Increase polling interval using exponential backoff, but never exceed maxPollingInterval
+			newInterval := time.Duration(float64(queue.currentPollingInterval) * qr.backoffFactor)
+			queue.currentPollingInterval = min(newInterval, queue.MaxPollingInterval)
 		} else {
-			// Scale back polling interval on successful iteration
-			pollingInterval = math.Max(qr.minInterval, pollingInterval*qr.scalebackFactor)
+			// Scale back polling interval on successful iteration, but never go below basePollingInterval
+			newInterval := time.Duration(float64(queue.currentPollingInterval) * qr.scalebackFactor)
+			queue.currentPollingInterval = max(newInterval, queue.BasePollingInterval)
 		}
 
-		// Apply jitter to the polling interval
+		// Apply jitter to this queue's polling interval
 		jitter := qr.jitterMin + rand.Float64()*(qr.jitterMax-qr.jitterMin) // #nosec G404 -- non-crypto jitter; acceptable
-		sleepDuration := time.Duration(pollingInterval * jitter * float64(time.Second))
+		sleepDuration := time.Duration(float64(queue.currentPollingInterval) * jitter)
 
 		// Sleep with jittered interval, but allow early exit on context cancellation
		select {
 		case <-ctx.Done():
-			qr.logger.Debug("Queue runner stopping due to context cancellation", "cause", context.Cause(ctx))
-			qr.completionChan <- struct{}{}
+			queueLogger.Debug("Queue goroutine stopping due to context cancellation", "cause", context.Cause(ctx))
 			return
 		case <-time.After(sleepDuration):
 			// Continue to next iteration
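
The interval adjustment above keeps each queue's currentPollingInterval inside [BasePollingInterval, MaxPollingInterval], using the built-in min and max (Go 1.21+), which work on time.Duration since it is an ordered type. A standalone sketch of the same backoff/scaleback-and-jitter arithmetic, reusing the factors set in newQueueRunner (2.0 backoff, 0.9 scaleback, ±5% jitter); the loop and interval bounds here are illustrative:

	package main

	import (
		"fmt"
		"math/rand"
		"time"
	)

	const (
		base            = 1 * time.Second   // lower bound: never poll faster than this
		maxInterval     = 120 * time.Second // upper bound: never poll slower than this
		backoffFactor   = 2.0
		scalebackFactor = 0.9
		jitterMin       = 0.95
		jitterMax       = 1.05
	)

	// adjust returns the next polling interval: it grows on backoff errors and
	// shrinks on success, always clamped to [base, maxInterval].
	func adjust(current time.Duration, hadError bool) time.Duration {
		if hadError {
			return min(time.Duration(float64(current)*backoffFactor), maxInterval)
		}
		return max(time.Duration(float64(current)*scalebackFactor), base)
	}

	func main() {
		interval := base
		for i := 0; i < 8; i++ {
			interval = adjust(interval, true) // simulate consecutive backoff errors; the last iterations hit the maxInterval clamp
			jitter := jitterMin + rand.Float64()*(jitterMax-jitterMin)
			sleep := time.Duration(float64(interval) * jitter)
			fmt.Printf("iteration %d: interval=%v, jittered sleep=%v\n", i, interval, sleep)
		}
	}
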
@@ -314,9 +335,7 @@ func (qr *queueRunner) dequeueWorkflows(ctx *dbosContext, queue WorkflowQueue, p
 	if err != nil {
 		if pgErr, ok := err.(*pgconn.PgError); ok {
 			switch pgErr.Code {
-			case pgerrcode.SerializationFailure:
-				*hasBackoffError = true
-			case pgerrcode.LockNotAvailable:
+			case pgerrcode.SerializationFailure, pgerrcode.LockNotAvailable:
 				*hasBackoffError = true
 			}
 		} else {
