Skip to content

Commit 2ca2427

Browse files
committed
handle errors from get partitions
1 parent 51e59a9 commit 2ca2427

File tree

1 file changed

+34
-30
lines changed

1 file changed

+34
-30
lines changed

dbos/queue.go

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ func (qr *queueRunner) runQueue(ctx *dbosContext, queue *WorkflowQueue) {
233233

234234
for {
235235
hasBackoffError := false
236+
skipDequeue := false
236237

237238
// Build list of partition keys to dequeue from
238239
// Default to empty string for non-partitioned queues
@@ -242,6 +243,7 @@ func (qr *queueRunner) runQueue(ctx *dbosContext, queue *WorkflowQueue) {
242243
return ctx.systemDB.getQueuePartitions(ctx, queue.Name)
243244
}, withRetrierLogger(queueLogger))
244245
if err != nil {
246+
skipDequeue = true
245247
if pgErr, ok := err.(*pgconn.PgError); ok {
246248
switch pgErr.Code {
247249
case pgerrcode.SerializationFailure, pgerrcode.LockNotAvailable:
@@ -256,41 +258,43 @@ func (qr *queueRunner) runQueue(ctx *dbosContext, queue *WorkflowQueue) {
256258
}
257259

258260
// Dequeue from each partition (or once for non-partitioned queues)
259-
var dequeuedWorkflows []dequeuedWorkflow
260-
for _, partitionKey := range partitionKeys {
261-
workflows, shouldContinue := qr.dequeueWorkflows(ctx, *queue, partitionKey, &hasBackoffError)
262-
if shouldContinue {
263-
continue
261+
if !skipDequeue {
262+
var dequeuedWorkflows []dequeuedWorkflow
263+
for _, partitionKey := range partitionKeys {
264+
workflows, shouldContinue := qr.dequeueWorkflows(ctx, *queue, partitionKey, &hasBackoffError)
265+
if shouldContinue {
266+
continue
267+
}
268+
dequeuedWorkflows = append(dequeuedWorkflows, workflows...)
264269
}
265-
dequeuedWorkflows = append(dequeuedWorkflows, workflows...)
266-
}
267270

268-
if len(dequeuedWorkflows) > 0 {
269-
queueLogger.Debug("Dequeued workflows from queue", "workflows", len(dequeuedWorkflows))
270-
}
271-
for _, workflow := range dequeuedWorkflows {
272-
// Find the workflow in the registry
273-
wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
274-
if !ok {
275-
queueLogger.Error("Workflow not found in registry", "workflow_name", workflow.name)
276-
continue
271+
if len(dequeuedWorkflows) > 0 {
272+
queueLogger.Debug("Dequeued workflows from queue", "workflows", len(dequeuedWorkflows))
277273
}
274+
for _, workflow := range dequeuedWorkflows {
275+
// Find the workflow in the registry
276+
wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
277+
if !ok {
278+
queueLogger.Error("Workflow not found in registry", "workflow_name", workflow.name)
279+
continue
280+
}
278281

279-
registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
280-
if !exists {
281-
queueLogger.Error("workflow function not found in registry", "workflow_name", workflow.name)
282-
continue
283-
}
284-
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
285-
if !ok {
286-
queueLogger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
287-
continue
288-
}
282+
registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
283+
if !exists {
284+
queueLogger.Error("workflow function not found in registry", "workflow_name", workflow.name)
285+
continue
286+
}
287+
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
288+
if !ok {
289+
queueLogger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
290+
continue
291+
}
289292

290-
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
291-
_, err := registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
292-
if err != nil {
293-
queueLogger.Error("Error running queued workflow", "error", err)
293+
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
294+
_, err := registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
295+
if err != nil {
296+
queueLogger.Error("Error running queued workflow", "error", err)
297+
}
294298
}
295299
}
296300

0 commit comments

Comments
 (0)