Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dbos/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ func newAwaitedWorkflowCancelledError(workflowID string) *DBOSError {
}
}

func newAwaitedWorkflowMaxStepRetriesExceeded(workflowID string) *DBOSError {
return &DBOSError{
Message: fmt.Sprintf("Awaited workflow %s has exceeded the maximum number of step retries", workflowID),
Code: MaxStepRetriesExceeded,
WorkflowID: workflowID,
}
}

func newWorkflowCancelledError(workflowID string) *DBOSError {
return &DBOSError{
Message: fmt.Sprintf("Workflow %s was cancelled", workflowID),
Expand Down
12 changes: 8 additions & 4 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,16 @@ func TestWorkflowQueues(t *testing.T) {
}

// Check the workflow completes
dlqCompleteEvent.Set()
for _, handle := range handles {
result, err := handle.GetResult()
require.NoError(t, err, "failed to get result from recovered workflow handle")
assert.Equal(t, "test-input", result, "expected result to be 'test-input'")

_, resultErr := handle.GetResult()

var dbosErr *DBOSError
require.ErrorAs(t, resultErr, &dbosErr, "expected error to be of type *DBOSError, got %T", resultErr)

assert.Equal(t, DBOSErrorCode(13), dbosErr.Code, "expected workflow to be in DLQ after max retries exceeded")
}
dlqCompleteEvent.Set()

require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after successive enqueues test")
})
Expand Down
2 changes: 2 additions & 0 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,8 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string, poll
return outputString, errors.New(*errorStr)
case WorkflowStatusCancelled:
return outputString, newAwaitedWorkflowCancelledError(workflowID)
case WorkflowStatusMaxRecoveryAttemptsExceeded:
return outputString, newAwaitedWorkflowMaxStepRetriesExceeded(workflowID)
default:
time.Sleep(pollInterval)
}
Expand Down