Skip to content

Commit fed0b16

Browse files
committed
simplify + solve data race
1 parent 2ca2427 commit fed0b16

File tree

2 files changed

+39
-41
lines changed

2 files changed

+39
-41
lines changed

dbos/queue.go

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,15 @@ type RateLimiter struct {
2828
// WorkflowQueue defines a named queue for workflow execution.
2929
// Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.
3030
type WorkflowQueue struct {
31-
Name string `json:"name"` // Unique queue name
32-
WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
33-
GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
34-
PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling
35-
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration
36-
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
37-
PartitionQueue bool `json:"partitionQueue,omitempty"` // Enable partitioned queue mode
38-
BasePollingInterval time.Duration `json:"basePollingInterval,omitempty"` // Initial polling interval (minimum, never poll faster)
39-
MaxPollingInterval time.Duration `json:"maxPollingInterval,omitempty"` // Maximum polling interval (never poll slower)
40-
currentPollingInterval time.Duration
31+
Name string `json:"name"` // Unique queue name
32+
WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
33+
GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
34+
PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling
35+
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration
36+
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
37+
PartitionQueue bool `json:"partitionQueue,omitempty"` // Enable partitioned queue mode
38+
basePollingInterval time.Duration // Base polling interval (minimum, never poll faster)
39+
maxPollingInterval time.Duration // Maximum polling interval (never poll slower)
4140
}
4241

4342
// QueueOption is a functional option for configuring a workflow queue
@@ -98,8 +97,7 @@ func WithPartitionQueue() QueueOption {
9897
// If not set (0), the queue will use the default base polling interval during creation.
9998
func WithQueueBasePollingInterval(interval time.Duration) QueueOption {
10099
return func(q *WorkflowQueue) {
101-
q.BasePollingInterval = interval
102-
q.currentPollingInterval = interval
100+
q.basePollingInterval = interval
103101
}
104102
}
105103

@@ -108,7 +106,7 @@ func WithQueueBasePollingInterval(interval time.Duration) QueueOption {
108106
// If not set (0), the queue will use the default max polling interval during creation.
109107
func WithQueueMaxPollingInterval(interval time.Duration) QueueOption {
110108
return func(q *WorkflowQueue) {
111-
q.MaxPollingInterval = interval
109+
q.maxPollingInterval = interval
112110
}
113111
}
114112

@@ -147,23 +145,22 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
147145

148146
// Create queue with default settings
149147
q := WorkflowQueue{
150-
Name: name,
151-
WorkerConcurrency: nil,
152-
GlobalConcurrency: nil,
153-
PriorityEnabled: false,
154-
RateLimit: nil,
155-
MaxTasksPerIteration: _DEFAULT_MAX_TASKS_PER_ITERATION,
156-
BasePollingInterval: _DEFAULT_BASE_POLLING_INTERVAL,
157-
MaxPollingInterval: _DEFAULT_MAX_POLLING_INTERVAL,
158-
currentPollingInterval: _DEFAULT_BASE_POLLING_INTERVAL,
148+
Name: name,
149+
WorkerConcurrency: nil,
150+
GlobalConcurrency: nil,
151+
PriorityEnabled: false,
152+
RateLimit: nil,
153+
MaxTasksPerIteration: _DEFAULT_MAX_TASKS_PER_ITERATION,
154+
basePollingInterval: _DEFAULT_BASE_POLLING_INTERVAL,
155+
maxPollingInterval: _DEFAULT_MAX_POLLING_INTERVAL,
159156
}
160157

161158
// Apply functional options
162159
for _, option := range options {
163160
option(&q)
164161
}
165162
// Register the queue in the global registry
166-
ctx.queueRunner.workflowQueueRegistry[name] = &q
163+
ctx.queueRunner.workflowQueueRegistry[name] = q
167164

168165
return q
169166
}
@@ -178,7 +175,7 @@ type queueRunner struct {
178175
jitterMax float64
179176

180177
// Queue registry
181-
workflowQueueRegistry map[string]*WorkflowQueue
178+
workflowQueueRegistry map[string]WorkflowQueue
182179

183180
// WaitGroup to track all queue goroutines
184181
queueGoroutinesWg sync.WaitGroup
@@ -193,7 +190,7 @@ func newQueueRunner(logger *slog.Logger) *queueRunner {
193190
scalebackFactor: 0.9,
194191
jitterMin: 0.95,
195192
jitterMax: 1.05,
196-
workflowQueueRegistry: make(map[string]*WorkflowQueue),
193+
workflowQueueRegistry: make(map[string]WorkflowQueue),
197194
completionChan: make(chan struct{}, 1),
198195
logger: logger.With("service", "queue_runner"),
199196
}
@@ -202,14 +199,14 @@ func newQueueRunner(logger *slog.Logger) *queueRunner {
202199
func (qr *queueRunner) listQueues() []WorkflowQueue {
203200
queues := make([]WorkflowQueue, 0, len(qr.workflowQueueRegistry))
204201
for _, queue := range qr.workflowQueueRegistry {
205-
queues = append(queues, *queue)
202+
queues = append(queues, queue)
206203
}
207204
return queues
208205
}
209206

210207
// getQueue returns the queue with the given name from the registry.
211-
// Returns a pointer to the queue if found, or nil if the queue does not exist.
212-
func (qr *queueRunner) getQueue(queueName string) *WorkflowQueue {
208+
// Returns the queue if found, or an empty queue if it does not exist.
209+
func (qr *queueRunner) getQueue(queueName string) WorkflowQueue {
213210
return qr.workflowQueueRegistry[queueName]
214211
}
215212

@@ -226,10 +223,12 @@ func (qr *queueRunner) run(ctx *dbosContext) {
226223
qr.completionChan <- struct{}{}
227224
}
228225

229-
func (qr *queueRunner) runQueue(ctx *dbosContext, queue *WorkflowQueue) {
226+
func (qr *queueRunner) runQueue(ctx *dbosContext, queue WorkflowQueue) {
230227
defer qr.queueGoroutinesWg.Done()
231228

232229
queueLogger := qr.logger.With("queue_name", queue.Name)
230+
// Current polling interval starts at the base interval and adjusts based on errors
231+
currentPollingInterval := queue.basePollingInterval
233232

234233
for {
235234
hasBackoffError := false
@@ -261,7 +260,7 @@ func (qr *queueRunner) runQueue(ctx *dbosContext, queue *WorkflowQueue) {
261260
if !skipDequeue {
262261
var dequeuedWorkflows []dequeuedWorkflow
263262
for _, partitionKey := range partitionKeys {
264-
workflows, shouldContinue := qr.dequeueWorkflows(ctx, *queue, partitionKey, &hasBackoffError)
263+
workflows, shouldContinue := qr.dequeueWorkflows(ctx, queue, partitionKey, &hasBackoffError)
265264
if shouldContinue {
266265
continue
267266
}
@@ -301,17 +300,17 @@ func (qr *queueRunner) runQueue(ctx *dbosContext, queue *WorkflowQueue) {
301300
// Adjust polling interval for this queue based on errors
302301
if hasBackoffError {
303302
// Increase polling interval using exponential backoff, but never exceed maxPollingInterval
304-
newInterval := time.Duration(float64(queue.currentPollingInterval) * qr.backoffFactor)
305-
queue.currentPollingInterval = min(newInterval, queue.MaxPollingInterval)
303+
newInterval := time.Duration(float64(currentPollingInterval) * qr.backoffFactor)
304+
currentPollingInterval = min(newInterval, queue.maxPollingInterval)
306305
} else {
307-
// Scale back polling interval on successful iteration, but never go below basePollingInterval
308-
newInterval := time.Duration(float64(queue.currentPollingInterval) * qr.scalebackFactor)
309-
queue.currentPollingInterval = max(newInterval, queue.BasePollingInterval)
306+
// Scale back polling interval on successful iteration, but never go below base interval
307+
newInterval := time.Duration(float64(currentPollingInterval) * qr.scalebackFactor)
308+
currentPollingInterval = max(newInterval, queue.basePollingInterval)
310309
}
311310

312311
// Apply jitter to this queue's polling interval
313312
jitter := qr.jitterMin + rand.Float64()*(qr.jitterMax-qr.jitterMin) // #nosec G404 -- non-crypto jitter; acceptable
314-
sleepDuration := time.Duration(float64(queue.currentPollingInterval) * jitter)
313+
sleepDuration := time.Duration(float64(currentPollingInterval) * jitter)
315314

316315
// Sleep with jittered interval, but allow early exit on context cancellation
317316
select {

dbos/queues_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1640,9 +1640,8 @@ func TestQueuePollingIntervals(t *testing.T) {
16401640

16411641
queue := NewWorkflowQueue(ctx, "test-queue")
16421642
// Intervals are resolved during creation, so defaults should be applied
1643-
require.Equal(t, _DEFAULT_BASE_POLLING_INTERVAL, queue.BasePollingInterval)
1644-
require.Equal(t, _DEFAULT_MAX_POLLING_INTERVAL, queue.MaxPollingInterval)
1645-
require.Equal(t, _DEFAULT_BASE_POLLING_INTERVAL, queue.currentPollingInterval)
1643+
require.Equal(t, _DEFAULT_BASE_POLLING_INTERVAL, queue.basePollingInterval)
1644+
require.Equal(t, _DEFAULT_MAX_POLLING_INTERVAL, queue.maxPollingInterval)
16461645
})
16471646

16481647
t.Run("queue uses custom intervals when specified", func(t *testing.T) {
@@ -1656,7 +1655,7 @@ func TestQueuePollingIntervals(t *testing.T) {
16561655
WithQueueMaxPollingInterval(maxPollingInterval),
16571656
)
16581657

1659-
require.Equal(t, basePollingInterval, queue.BasePollingInterval)
1660-
require.Equal(t, maxPollingInterval, queue.MaxPollingInterval)
1658+
require.Equal(t, basePollingInterval, queue.basePollingInterval)
1659+
require.Equal(t, maxPollingInterval, queue.maxPollingInterval)
16611660
})
16621661
}

0 commit comments

Comments
 (0)