From 12177e864c99e3cbf67d1f11e9e2637768b98aa7 Mon Sep 17 00:00:00 2001 From: shirzady1934 Date: Fri, 21 Nov 2025 20:37:20 +0330 Subject: [PATCH 1/3] make awaitWorkflowResult polling interval configurable --- dbos/system_database.go | 11 +- dbos/workflow.go | 25 +++- dbos/workflow_getresult_options_test.go | 153 ++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 9 deletions(-) create mode 100644 dbos/workflow_getresult_options_test.go diff --git a/dbos/system_database.go b/dbos/system_database.go index e3365bc..6fbc703 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -38,7 +38,7 @@ type systemDatabase interface { insertWorkflowStatus(ctx context.Context, input insertWorkflowStatusDBInput) (*insertWorkflowResult, error) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ([]WorkflowStatus, error) updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error - awaitWorkflowResult(ctx context.Context, workflowID string) (*string, error) + awaitWorkflowResult(ctx context.Context, workflowID string, pollInterval time.Duration) (*string, error) cancelWorkflow(ctx context.Context, workflowID string) error cancelAllBefore(ctx context.Context, cutoffTime time.Time) error resumeWorkflow(ctx context.Context, workflowID string) error @@ -1188,9 +1188,12 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st return forkedWorkflowID, nil } -func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (*string, error) { +func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string, pollInterval time.Duration) (*string, error) { query := fmt.Sprintf(`SELECT status, output, error FROM %s.workflow_status WHERE workflow_uuid = $1`, pgx.Identifier{s.schema}.Sanitize()) var status WorkflowStatusType + if pollInterval <= 0 { + pollInterval = _DB_RETRY_INTERVAL + } for { select { case <-ctx.Done(): @@ -1204,7 +1207,7 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (*st err := row.Scan(&status, &outputString, &errorStr) if err != nil { if err == pgx.ErrNoRows { - time.Sleep(_DB_RETRY_INTERVAL) + time.Sleep(pollInterval) continue } return nil, fmt.Errorf("failed to query workflow status: %w", err) @@ -1219,7 +1222,7 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (*st case WorkflowStatusCancelled: return outputString, newAwaitedWorkflowCancelledError(workflowID) default: - time.Sleep(_DB_RETRY_INTERVAL) + time.Sleep(pollInterval) } } } diff --git a/dbos/workflow.go b/dbos/workflow.go index fff9de4..cc2a7fa 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -104,7 +104,12 @@ type GetResultOption func(*getResultOptions) // getResultOptions holds the configuration for GetResult execution. type getResultOptions struct { - timeout time.Duration + timeout time.Duration + pollInterval time.Duration +} + +func defaultGetResultOptions() *getResultOptions { + return &getResultOptions{pollInterval: _DB_RETRY_INTERVAL} } // WithHandleTimeout sets a timeout for the GetResult operation. @@ -115,6 +120,16 @@ func WithHandleTimeout(timeout time.Duration) GetResultOption { } } +// WithPollingInterval sets the polling interval for awaiting workflow completion in GetResult. +// If a non-positive interval is provided, the default interval is used. +func WithPollingInterval(interval time.Duration) GetResultOption { + return func(opts *getResultOptions) { + if interval > 0 { + opts.pollInterval = interval + } + } +} + // GetStatus returns the current status of the workflow from the database // If the DBOSContext is running in client mode, do not load input and outputs func (h *baseWorkflowHandle) GetStatus() (WorkflowStatus, error) { @@ -186,7 +201,7 @@ type workflowHandle[R any] struct { } func (h *workflowHandle[R]) GetResult(opts ...GetResultOption) (R, error) { - options := &getResultOptions{} + options := defaultGetResultOptions() for _, opt := range opts { opt(options) } @@ -253,7 +268,7 @@ type workflowPollingHandle[R any] struct { } func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) { - options := &getResultOptions{} + options := defaultGetResultOptions() for _, opt := range opts { opt(options) } @@ -269,7 +284,7 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) } encodedResult, err := retryWithResult(ctx, func() (any, error) { - return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(ctx, h.workflowID) + return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(ctx, h.workflowID, options.pollInterval) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) completedTime := time.Now() @@ -1051,7 +1066,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID) var encodedResult any encodedResult, err = retryWithResult(c, func() (any, error) { - return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID) + return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID, _DB_RETRY_INTERVAL) }, withRetrierLogger(c.logger)) // Keep the encoded result - decoding will happen in RunWorkflow[P,R] when we know the target type outcomeChan <- workflowOutcome[any]{result: encodedResult, err: err, needsDecoding: true} diff --git a/dbos/workflow_getresult_options_test.go b/dbos/workflow_getresult_options_test.go new file mode 100644 index 0000000..2d0973f --- /dev/null +++ b/dbos/workflow_getresult_options_test.go @@ -0,0 +1,153 @@ +package dbos + +import ( + "context" + "errors" + "io" + "log/slog" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type recordingSystemDB struct { + lastInterval time.Duration + result *string + err error +} + +func (r *recordingSystemDB) launch(ctx context.Context) {} +func (r *recordingSystemDB) shutdown(ctx context.Context, timeout time.Duration) {} +func (r *recordingSystemDB) resetSystemDB(ctx context.Context) error { return nil } +func (r *recordingSystemDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowStatusDBInput) (*insertWorkflowResult, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ([]WorkflowStatus, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) awaitWorkflowResult(ctx context.Context, workflowID string, pollInterval time.Duration) (*string, error) { + r.lastInterval = pollInterval + return r.result, r.err +} +func (r *recordingSystemDB) cancelWorkflow(ctx context.Context, workflowID string) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) cancelAllBefore(ctx context.Context, cutoffTime time.Time) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) resumeWorkflow(ctx context.Context, workflowID string) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (string, error) { + return "", errors.New("not implemented") +} +func (r *recordingSystemDB) recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) checkChildWorkflow(ctx context.Context, workflowUUID string, functionID int) (*string, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) recordChildGetResult(ctx context.Context, input recordChildGetResultDBInput) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]stepInfo, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) send(ctx context.Context, input WorkflowSendInput) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) recv(ctx context.Context, input recvInput) (*string, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error { + return errors.New("not implemented") +} +func (r *recordingSystemDB) getEvent(ctx context.Context, input getEventInput) (*string, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) sleep(ctx context.Context, input sleepInput) (time.Duration, error) { + return 0, errors.New("not implemented") +} +func (r *recordingSystemDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) clearQueueAssignment(ctx context.Context, workflowID string) (bool, error) { + return false, errors.New("not implemented") +} +func (r *recordingSystemDB) getQueuePartitions(ctx context.Context, queueName string) ([]string, error) { + return nil, errors.New("not implemented") +} +func (r *recordingSystemDB) garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error { + return errors.New("not implemented") +} + +func newTestDBOSContext(systemDB systemDatabase) *dbosContext { + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{})) + return &dbosContext{ + ctx: context.Background(), + logger: logger, + systemDB: systemDB, + workflowsWg: &sync.WaitGroup{}, + workflowRegistry: &sync.Map{}, + workflowCustomNametoFQN: &sync.Map{}, + queueRunner: newQueueRunner(logger), + } +} + +func TestGetResultUsesDefaultPollingInterval(t *testing.T) { + serializer := newJSONSerializer[string]() + encoded, err := serializer.Encode("ok") + require.NoError(t, err) + + sysDB := &recordingSystemDB{result: encoded} + ctx := newTestDBOSContext(sysDB) + handle := newWorkflowPollingHandle[string](ctx, "workflow-id") + + result, err := handle.GetResult() + require.NoError(t, err) + require.Equal(t, "ok", result) + assert.Equal(t, _DB_RETRY_INTERVAL, sysDB.lastInterval) +} + +func TestGetResultUsesCustomPollingInterval(t *testing.T) { + serializer := newJSONSerializer[int]() + encoded, err := serializer.Encode(42) + require.NoError(t, err) + + sysDB := &recordingSystemDB{result: encoded} + ctx := newTestDBOSContext(sysDB) + handle := newWorkflowPollingHandle[int](ctx, "workflow-id") + + customInterval := 25 * time.Millisecond + result, err := handle.GetResult(WithPollingInterval(customInterval)) + require.NoError(t, err) + require.Equal(t, 42, result) + assert.Equal(t, customInterval, sysDB.lastInterval) +} + +func TestGetResultIgnoresNonPositivePollingInterval(t *testing.T) { + serializer := newJSONSerializer[string]() + encoded, err := serializer.Encode("value") + require.NoError(t, err) + + sysDB := &recordingSystemDB{result: encoded} + ctx := newTestDBOSContext(sysDB) + handle := newWorkflowPollingHandle[string](ctx, "workflow-id") + + result, err := handle.GetResult(WithPollingInterval(0)) + require.NoError(t, err) + require.Equal(t, "value", result) + assert.Equal(t, _DB_RETRY_INTERVAL, sysDB.lastInterval) +} From e476a080d167a3706f19b843abca8ccfe51b2b48 Mon Sep 17 00:00:00 2001 From: shirzady1934 Date: Sun, 23 Nov 2025 02:17:34 +0330 Subject: [PATCH 2/3] deleted workflow_getresult_options_test.go and rewrite in workflows_test.go --- dbos/workflow_getresult_options_test.go | 153 ------------------------ dbos/workflows_test.go | 42 +++---- 2 files changed, 21 insertions(+), 174 deletions(-) delete mode 100644 dbos/workflow_getresult_options_test.go diff --git a/dbos/workflow_getresult_options_test.go b/dbos/workflow_getresult_options_test.go deleted file mode 100644 index 2d0973f..0000000 --- a/dbos/workflow_getresult_options_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package dbos - -import ( - "context" - "errors" - "io" - "log/slog" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type recordingSystemDB struct { - lastInterval time.Duration - result *string - err error -} - -func (r *recordingSystemDB) launch(ctx context.Context) {} -func (r *recordingSystemDB) shutdown(ctx context.Context, timeout time.Duration) {} -func (r *recordingSystemDB) resetSystemDB(ctx context.Context) error { return nil } -func (r *recordingSystemDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowStatusDBInput) (*insertWorkflowResult, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ([]WorkflowStatus, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) awaitWorkflowResult(ctx context.Context, workflowID string, pollInterval time.Duration) (*string, error) { - r.lastInterval = pollInterval - return r.result, r.err -} -func (r *recordingSystemDB) cancelWorkflow(ctx context.Context, workflowID string) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) cancelAllBefore(ctx context.Context, cutoffTime time.Time) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) resumeWorkflow(ctx context.Context, workflowID string) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (string, error) { - return "", errors.New("not implemented") -} -func (r *recordingSystemDB) recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) checkChildWorkflow(ctx context.Context, workflowUUID string, functionID int) (*string, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) recordChildGetResult(ctx context.Context, input recordChildGetResultDBInput) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]stepInfo, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) send(ctx context.Context, input WorkflowSendInput) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) recv(ctx context.Context, input recvInput) (*string, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error { - return errors.New("not implemented") -} -func (r *recordingSystemDB) getEvent(ctx context.Context, input getEventInput) (*string, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) sleep(ctx context.Context, input sleepInput) (time.Duration, error) { - return 0, errors.New("not implemented") -} -func (r *recordingSystemDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) clearQueueAssignment(ctx context.Context, workflowID string) (bool, error) { - return false, errors.New("not implemented") -} -func (r *recordingSystemDB) getQueuePartitions(ctx context.Context, queueName string) ([]string, error) { - return nil, errors.New("not implemented") -} -func (r *recordingSystemDB) garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error { - return errors.New("not implemented") -} - -func newTestDBOSContext(systemDB systemDatabase) *dbosContext { - logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{})) - return &dbosContext{ - ctx: context.Background(), - logger: logger, - systemDB: systemDB, - workflowsWg: &sync.WaitGroup{}, - workflowRegistry: &sync.Map{}, - workflowCustomNametoFQN: &sync.Map{}, - queueRunner: newQueueRunner(logger), - } -} - -func TestGetResultUsesDefaultPollingInterval(t *testing.T) { - serializer := newJSONSerializer[string]() - encoded, err := serializer.Encode("ok") - require.NoError(t, err) - - sysDB := &recordingSystemDB{result: encoded} - ctx := newTestDBOSContext(sysDB) - handle := newWorkflowPollingHandle[string](ctx, "workflow-id") - - result, err := handle.GetResult() - require.NoError(t, err) - require.Equal(t, "ok", result) - assert.Equal(t, _DB_RETRY_INTERVAL, sysDB.lastInterval) -} - -func TestGetResultUsesCustomPollingInterval(t *testing.T) { - serializer := newJSONSerializer[int]() - encoded, err := serializer.Encode(42) - require.NoError(t, err) - - sysDB := &recordingSystemDB{result: encoded} - ctx := newTestDBOSContext(sysDB) - handle := newWorkflowPollingHandle[int](ctx, "workflow-id") - - customInterval := 25 * time.Millisecond - result, err := handle.GetResult(WithPollingInterval(customInterval)) - require.NoError(t, err) - require.Equal(t, 42, result) - assert.Equal(t, customInterval, sysDB.lastInterval) -} - -func TestGetResultIgnoresNonPositivePollingInterval(t *testing.T) { - serializer := newJSONSerializer[string]() - encoded, err := serializer.Encode("value") - require.NoError(t, err) - - sysDB := &recordingSystemDB{result: encoded} - ctx := newTestDBOSContext(sysDB) - handle := newWorkflowPollingHandle[string](ctx, "workflow-id") - - result, err := handle.GetResult(WithPollingInterval(0)) - require.NoError(t, err) - require.Equal(t, "value", result) - assert.Equal(t, _DB_RETRY_INTERVAL, sysDB.lastInterval) -} diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 6037d67..d6db770 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -4543,21 +4543,21 @@ func TestWorkflowIdentity(t *testing.T) { }) } -func TestWorkflowHandleTimeout(t *testing.T) { - dbosCtx := setupDBOS(t, true, true) - RegisterWorkflow(dbosCtx, slowWorkflow) +func TestWorkflowHandles(t *testing.T) { +dbosCtx := setupDBOS(t, true, true) +RegisterWorkflow(dbosCtx, slowWorkflow) - t.Run("WorkflowHandleTimeout", func(t *testing.T) { - handle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) - require.NoError(t, err, "failed to start workflow") +t.Run("WorkflowHandleTimeout", func(t *testing.T) { +handle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) +require.NoError(t, err, "failed to start workflow") - start := time.Now() - _, err = handle.GetResult(WithHandleTimeout(10 * time.Millisecond)) - duration := time.Since(start) +start := time.Now() +_, err = handle.GetResult(WithHandleTimeout(10*time.Millisecond), WithPollingInterval(1*time.Millisecond)) +duration := time.Since(start) - require.Error(t, err, "expected timeout error") - assert.Contains(t, err.Error(), "workflow result timeout") - assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") +require.Error(t, err, "expected timeout error") +assert.Contains(t, err.Error(), "workflow result timeout") +assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") assert.True(t, errors.Is(err, context.DeadlineExceeded), "expected error to be detectable as context.DeadlineExceeded, got: %v", err) }) @@ -4567,18 +4567,18 @@ func TestWorkflowHandleTimeout(t *testing.T) { originalHandle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) require.NoError(t, err, "failed to start workflow") - pollingHandle, err := RetrieveWorkflow[string](dbosCtx, originalHandle.GetWorkflowID()) - require.NoError(t, err, "failed to retrieve workflow") +pollingHandle, err := RetrieveWorkflow[string](dbosCtx, originalHandle.GetWorkflowID()) +require.NoError(t, err, "failed to retrieve workflow") - _, ok := pollingHandle.(*workflowPollingHandle[string]) - require.True(t, ok, "expected polling handle, got %T", pollingHandle) +_, ok := pollingHandle.(*workflowPollingHandle[string]) +require.True(t, ok, "expected polling handle, got %T", pollingHandle) - _, err = pollingHandle.GetResult(WithHandleTimeout(10 * time.Millisecond)) +_, err = pollingHandle.GetResult(WithHandleTimeout(10*time.Millisecond), WithPollingInterval(1*time.Millisecond)) - require.Error(t, err, "expected timeout error") - assert.True(t, errors.Is(err, context.DeadlineExceeded), - "expected error to be detectable as context.DeadlineExceeded, got: %v", err) - }) +require.Error(t, err, "expected timeout error") +assert.True(t, errors.Is(err, context.DeadlineExceeded), +"expected error to be detectable as context.DeadlineExceeded, got: %v", err) +}) } func TestWorkflowHandleContextCancel(t *testing.T) { From c049658f6b3ac99d857eb6b90960b58ae37239ba Mon Sep 17 00:00:00 2001 From: shirzady1934 Date: Wed, 3 Dec 2025 16:03:01 +0330 Subject: [PATCH 3/3] change function name and run fmt --- dbos/workflow.go | 4 ++-- dbos/workflows_test.go | 40 ++++++++++++++++++++-------------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index cc2a7fa..1ed7755 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -120,9 +120,9 @@ func WithHandleTimeout(timeout time.Duration) GetResultOption { } } -// WithPollingInterval sets the polling interval for awaiting workflow completion in GetResult. +// WithHandlePollingInterval sets the polling interval for awaiting workflow completion in GetResult. // If a non-positive interval is provided, the default interval is used. -func WithPollingInterval(interval time.Duration) GetResultOption { +func WithHandlePollingInterval(interval time.Duration) GetResultOption { return func(opts *getResultOptions) { if interval > 0 { opts.pollInterval = interval diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index d6db770..182b2da 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -4544,20 +4544,20 @@ func TestWorkflowIdentity(t *testing.T) { } func TestWorkflowHandles(t *testing.T) { -dbosCtx := setupDBOS(t, true, true) -RegisterWorkflow(dbosCtx, slowWorkflow) + dbosCtx := setupDBOS(t, true, true) + RegisterWorkflow(dbosCtx, slowWorkflow) -t.Run("WorkflowHandleTimeout", func(t *testing.T) { -handle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) -require.NoError(t, err, "failed to start workflow") + t.Run("WorkflowHandleTimeout", func(t *testing.T) { + handle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) + require.NoError(t, err, "failed to start workflow") -start := time.Now() -_, err = handle.GetResult(WithHandleTimeout(10*time.Millisecond), WithPollingInterval(1*time.Millisecond)) -duration := time.Since(start) + start := time.Now() + _, err = handle.GetResult(WithHandleTimeout(10*time.Millisecond), WithHandlePollingInterval(1*time.Millisecond)) + duration := time.Since(start) -require.Error(t, err, "expected timeout error") -assert.Contains(t, err.Error(), "workflow result timeout") -assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") + require.Error(t, err, "expected timeout error") + assert.Contains(t, err.Error(), "workflow result timeout") + assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") assert.True(t, errors.Is(err, context.DeadlineExceeded), "expected error to be detectable as context.DeadlineExceeded, got: %v", err) }) @@ -4567,18 +4567,18 @@ assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") originalHandle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) require.NoError(t, err, "failed to start workflow") -pollingHandle, err := RetrieveWorkflow[string](dbosCtx, originalHandle.GetWorkflowID()) -require.NoError(t, err, "failed to retrieve workflow") + pollingHandle, err := RetrieveWorkflow[string](dbosCtx, originalHandle.GetWorkflowID()) + require.NoError(t, err, "failed to retrieve workflow") -_, ok := pollingHandle.(*workflowPollingHandle[string]) -require.True(t, ok, "expected polling handle, got %T", pollingHandle) + _, ok := pollingHandle.(*workflowPollingHandle[string]) + require.True(t, ok, "expected polling handle, got %T", pollingHandle) -_, err = pollingHandle.GetResult(WithHandleTimeout(10*time.Millisecond), WithPollingInterval(1*time.Millisecond)) + _, err = pollingHandle.GetResult(WithHandleTimeout(10*time.Millisecond), WithHandlePollingInterval(1*time.Millisecond)) -require.Error(t, err, "expected timeout error") -assert.True(t, errors.Is(err, context.DeadlineExceeded), -"expected error to be detectable as context.DeadlineExceeded, got: %v", err) -}) + require.Error(t, err, "expected timeout error") + assert.True(t, errors.Is(err, context.DeadlineExceeded), + "expected error to be detectable as context.DeadlineExceeded, got: %v", err) + }) } func TestWorkflowHandleContextCancel(t *testing.T) {