diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 1441fb9..ee0f979 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -376,6 +376,24 @@ func testListWorkflows(t *testing.T, cliPath string, baseArgs []string, dbRole s workflowID, exists := response["workflow_id"] assert.True(t, exists, "Response should contain workflow_id") assert.NotEmpty(t, workflowID, "Workflow ID should not be empty") + + require.Eventually(t, func() bool { + args := append([]string{"workflow", "list", "--queue", "example-queue"}, baseArgs...) + cmd := exec.Command(cliPath, args...) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) + + output, err := cmd.CombinedOutput() + if err != nil { + return false + } + var workflows []dbos.WorkflowStatus + err = json.Unmarshal(output, &workflows) + if err != nil { + return false + } + return len(workflows) >= 10 + }, 5*time.Second, 500*time.Millisecond, "Should find at least 10 workflows in the queue") + // Get the current time for time-based filtering currentTime := time.Now() diff --git a/dbos/system_database.go b/dbos/system_database.go index 6fbc703..6df5547 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -47,7 +47,6 @@ type systemDatabase interface { // Child workflows recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error checkChildWorkflow(ctx context.Context, workflowUUID string, functionID int) (*string, error) - recordChildGetResult(ctx context.Context, input recordChildGetResultDBInput) error // Steps recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error @@ -1228,51 +1227,53 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string, poll } type recordOperationResultDBInput struct { - workflowID string - stepID int - stepName string - output *string - err error - tx pgx.Tx - startedAt time.Time - completedAt time.Time + workflowID string + childWorkflowID string + stepID int + stepName string + output *string + err error + tx pgx.Tx + startedAt time.Time + completedAt time.Time + isGetResult bool } func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error { startedAtMs := input.startedAt.UnixMilli() completedAtMs := input.completedAt.UnixMilli() - query := fmt.Sprintf(`INSERT INTO %s.operation_outputs - (workflow_uuid, function_id, output, error, function_name, started_at_epoch_ms, completed_at_epoch_ms) - VALUES ($1, $2, $3, $4, $5, $6, $7)`, pgx.Identifier{s.schema}.Sanitize()) - var errorString *string if input.err != nil { e := input.err.Error() errorString = &e } + columns := []string{"workflow_uuid", "function_id", "output", "error", "function_name", "started_at_epoch_ms", "completed_at_epoch_ms"} + placeholders := []string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"} + args := []any{input.workflowID, input.stepID, input.output, errorString, input.stepName, startedAtMs, completedAtMs} + argCounter := 7 + + if input.childWorkflowID != "" { + columns = append(columns, "child_workflow_id") + argCounter++ + placeholders = append(placeholders, fmt.Sprintf("$%d", argCounter)) + args = append(args, input.childWorkflowID) + } + + conflictClause := "" + if input.isGetResult { + conflictClause = "ON CONFLICT DO NOTHING" + } + + query := fmt.Sprintf(`INSERT INTO %s.operation_outputs (%s) VALUES (%s) %s`, + pgx.Identifier{s.schema}.Sanitize(), strings.Join(columns, ", "), strings.Join(placeholders, ", "), conflictClause) + var err error if input.tx != nil { - _, err = input.tx.Exec(ctx, query, - input.workflowID, - input.stepID, - input.output, - errorString, - input.stepName, - startedAtMs, - completedAtMs, - ) + _, err = input.tx.Exec(ctx, query, args...) } else { - _, err = s.pool.Exec(ctx, query, - input.workflowID, - input.stepID, - input.output, - errorString, - input.stepName, - startedAtMs, - completedAtMs, - ) + _, err = s.pool.Exec(ctx, query, args...) } if err != nil { @@ -1355,47 +1356,6 @@ func (s *sysDB) checkChildWorkflow(ctx context.Context, workflowID string, funct return childWorkflowID, nil } -type recordChildGetResultDBInput struct { - parentWorkflowID string - childWorkflowID string - stepID int - output *string - err error - startedAt time.Time - completedAt time.Time -} - -func (s *sysDB) recordChildGetResult(ctx context.Context, input recordChildGetResultDBInput) error { - startedAtMs := input.startedAt.UnixMilli() - completedAtMs := input.completedAt.UnixMilli() - - query := fmt.Sprintf(`INSERT INTO %s.operation_outputs - (workflow_uuid, function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT DO NOTHING`, pgx.Identifier{s.schema}.Sanitize()) - - var errorString *string - if input.err != nil { - e := input.err.Error() - errorString = &e - } - - _, err := s.pool.Exec(ctx, query, - input.parentWorkflowID, - input.stepID, - "DBOS.getResult", - input.output, - errorString, - input.childWorkflowID, - startedAtMs, - completedAtMs, - ) - if err != nil { - return fmt.Errorf("failed to record get result: %w", err) - } - return nil -} - /*******************************/ /******* STEPS ********/ /*******************************/ diff --git a/dbos/workflow.go b/dbos/workflow.go index 1ed7755..cd2904f 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -243,17 +243,19 @@ func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R], startTime if encErr != nil { return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("serializing child workflow result: %w", encErr)) } - recordGetResultInput := recordChildGetResultDBInput{ - parentWorkflowID: workflowState.workflowID, - childWorkflowID: h.workflowID, - stepID: workflowState.nextStepID(), - output: encodedOutput, - err: outcome.err, - startedAt: startTime, - completedAt: completedTime, + recordGetResultInput := recordOperationResultDBInput{ + workflowID: workflowState.workflowID, + childWorkflowID: h.workflowID, + stepID: workflowState.nextStepID(), + output: encodedOutput, + err: outcome.err, + startedAt: startTime, + completedAt: completedTime, + isGetResult: true, + stepName: "DBOS.getResult", } recordResultErr := retry(h.dbosContext, func() error { - return h.dbosContext.(*dbosContext).systemDB.recordChildGetResult(h.dbosContext, recordGetResultInput) + return h.dbosContext.(*dbosContext).systemDB.recordOperationResult(h.dbosContext, recordGetResultInput) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) if recordResultErr != nil { h.dbosContext.(*dbosContext).logger.Error("failed to record get result", "error", recordResultErr) @@ -307,17 +309,19 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) workflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState) isWithinWorkflow := ok && workflowState != nil if isWithinWorkflow { - recordGetResultInput := recordChildGetResultDBInput{ - parentWorkflowID: workflowState.workflowID, - childWorkflowID: h.workflowID, - stepID: workflowState.nextStepID(), - output: encodedStr, - err: err, - startedAt: startTime, - completedAt: completedTime, + recordGetResultInput := recordOperationResultDBInput{ + workflowID: workflowState.workflowID, + childWorkflowID: h.workflowID, + stepID: workflowState.nextStepID(), + output: encodedStr, + err: err, + startedAt: startTime, + completedAt: completedTime, + isGetResult: true, + stepName: "DBOS.getResult", } recordResultErr := retry(h.dbosContext, func() error { - return h.dbosContext.(*dbosContext).systemDB.recordChildGetResult(h.dbosContext, recordGetResultInput) + return h.dbosContext.(*dbosContext).systemDB.recordOperationResult(h.dbosContext, recordGetResultInput) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) if recordResultErr != nil { h.dbosContext.(*dbosContext).logger.Error("failed to record get result", "error", recordResultErr)