Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dbos/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ func newAwaitedWorkflowCancelledError(workflowID string) *DBOSError {
}
}

func newAwaitedWorkflowMaxStepRetriesExceeded(workflowID string) *DBOSError {
return &DBOSError{
Message: fmt.Sprintf("Awaited workflow %s has exceeded the maximum number of step retries", workflowID),
Code: MaxStepRetriesExceeded,
WorkflowID: workflowID,
}
}

func newWorkflowCancelledError(workflowID string) *DBOSError {
return &DBOSError{
Message: fmt.Sprintf("Workflow %s was cancelled", workflowID),
Expand Down
14 changes: 11 additions & 3 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,17 @@ func TestWorkflowQueues(t *testing.T) {
// Check the workflow completes
dlqCompleteEvent.Set()
for _, handle := range handles {
result, err := handle.GetResult()
require.NoError(t, err, "failed to get result from recovered workflow handle")
assert.Equal(t, "test-input", result, "expected result to be 'test-input'")
result, _ := handle.GetResult()
handleStatus, _ := handle.GetStatus()

if result == nil {
assert.Equal(t, WorkflowStatusMaxRecoveryAttemptsExceeded, handleStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
// resErr is not nil
} else {
// resErr is nil
assert.Equal(t, WorkflowStatusSuccess, handleStatus.Status, "expected workflow status to be SUCCESS when result is not nil")
}

}

require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after successive enqueues test")
Expand Down
2 changes: 2 additions & 0 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,8 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (*st
return outputString, errors.New(*errorStr)
case WorkflowStatusCancelled:
return outputString, newAwaitedWorkflowCancelledError(workflowID)
case WorkflowStatusMaxRecoveryAttemptsExceeded:
return outputString, newAwaitedWorkflowMaxStepRetriesExceeded(workflowID)
default:
time.Sleep(_DB_RETRY_INTERVAL)
}
Expand Down
Loading