From 6855f3436839ad38080b6de845336d24108dd1b5 Mon Sep 17 00:00:00 2001 From: shirzady1934 Date: Thu, 4 Dec 2025 17:14:24 +0330 Subject: [PATCH 1/4] Merge recordChildGetResult into recordOperationResult --- dbos/system_database.go | 174 +++++++++------------------------------- dbos/workflow.go | 40 ++++----- 2 files changed, 59 insertions(+), 155 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 6fbc703..1c68af3 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -47,7 +47,6 @@ type systemDatabase interface { // Child workflows recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error checkChildWorkflow(ctx context.Context, workflowUUID string, functionID int) (*string, error) - recordChildGetResult(ctx context.Context, input recordChildGetResultDBInput) error // Steps recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error @@ -1228,171 +1227,72 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string, poll } type recordOperationResultDBInput struct { - workflowID string - stepID int - stepName string - output *string - err error - tx pgx.Tx - startedAt time.Time - completedAt time.Time + workflowID string + childWorkflowID string + stepID int + stepName string + output *string + err error + tx pgx.Tx + startedAt time.Time + completedAt time.Time + isGetResult bool } func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error { startedAtMs := input.startedAt.UnixMilli() completedAtMs := input.completedAt.UnixMilli() - query := fmt.Sprintf(`INSERT INTO %s.operation_outputs - (workflow_uuid, function_id, output, error, function_name, started_at_epoch_ms, completed_at_epoch_ms) - VALUES ($1, $2, $3, $4, $5, $6, $7)`, pgx.Identifier{s.schema}.Sanitize()) + var query string + args := []any{ + input.workflowID, + input.stepID, + input.output, + } var errorString *string if input.err != nil { e := input.err.Error() errorString = &e } + args = append(args, errorString) + args = append(args, input.stepName) - var err error - if input.tx != nil { - _, err = input.tx.Exec(ctx, query, - input.workflowID, - input.stepID, - input.output, - errorString, - input.stepName, - startedAtMs, - completedAtMs, - ) - } else { - _, err = s.pool.Exec(ctx, query, - input.workflowID, - input.stepID, - input.output, - errorString, - input.stepName, - startedAtMs, - completedAtMs, - ) - } + columns := "workflow_uuid, function_id, output, error, function_name, started_at_epoch_ms, completed_at_epoch_ms" + values := "$1, $2, $3, $4, $5, $6, $7" + argCounter := 7 - if err != nil { - if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_UNIQUE_VIOLATION { - return newWorkflowConflictIDError(input.workflowID) - } - return err + if input.childWorkflowID != "" { + columns += ", child_workflow_id" + argCounter++ + values += fmt.Sprintf(", $%d", argCounter) + args = append(args, input.childWorkflowID) } - return nil -} - -/*******************************/ -/******* CHILD WORKFLOWS ********/ -/*******************************/ + args = append(args, startedAtMs, completedAtMs) -type recordChildWorkflowDBInput struct { - parentWorkflowID string - childWorkflowID string - stepID int - stepName string - tx pgx.Tx -} + conflictClause := "" + if input.isGetResult { + conflictClause = "ON CONFLICT DO NOTHING" + } -func (s *sysDB) recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error { - query := fmt.Sprintf(`INSERT INTO %s.operation_outputs - (workflow_uuid, function_id, function_name, child_workflow_id) - VALUES ($1, $2, $3, $4)`, pgx.Identifier{s.schema}.Sanitize()) + query = fmt.Sprintf(`INSERT INTO %s.operation_outputs (%s) VALUES (%s) %s`, + pgx.Identifier{s.schema}.Sanitize(), columns, values, conflictClause) - var commandTag pgconn.CommandTag var err error - if input.tx != nil { - commandTag, err = input.tx.Exec(ctx, query, - input.parentWorkflowID, - input.stepID, - input.stepName, - input.childWorkflowID, - ) + _, err = input.tx.Exec(ctx, query, args...) } else { - commandTag, err = s.pool.Exec(ctx, query, - input.parentWorkflowID, - input.stepID, - input.stepName, - input.childWorkflowID, - ) + _, err = s.pool.Exec(ctx, query, args...) } if err != nil { - // Check for unique constraint violation (conflict ID error) if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_UNIQUE_VIOLATION { - return fmt.Errorf( - "child workflow %s already registered for parent workflow %s (operation ID: %d)", - input.childWorkflowID, input.parentWorkflowID, input.stepID) - } - return fmt.Errorf("failed to record child workflow: %w", err) - } - - if commandTag.RowsAffected() == 0 { - s.logger.Warn("RecordChildWorkflow No rows were affected by the insert") - } - - return nil -} - -func (s *sysDB) checkChildWorkflow(ctx context.Context, workflowID string, functionID int) (*string, error) { - query := fmt.Sprintf(`SELECT child_workflow_id - FROM %s.operation_outputs - WHERE workflow_uuid = $1 AND function_id = $2`, pgx.Identifier{s.schema}.Sanitize()) - - var childWorkflowID *string - err := s.pool.QueryRow(ctx, query, workflowID, functionID).Scan(&childWorkflowID) - if err != nil { - if err == pgx.ErrNoRows { - return nil, nil + return newWorkflowConflictIDError(input.workflowID) } - return nil, fmt.Errorf("failed to check child workflow: %w", err) - } - - return childWorkflowID, nil -} - -type recordChildGetResultDBInput struct { - parentWorkflowID string - childWorkflowID string - stepID int - output *string - err error - startedAt time.Time - completedAt time.Time -} - -func (s *sysDB) recordChildGetResult(ctx context.Context, input recordChildGetResultDBInput) error { - startedAtMs := input.startedAt.UnixMilli() - completedAtMs := input.completedAt.UnixMilli() - - query := fmt.Sprintf(`INSERT INTO %s.operation_outputs - (workflow_uuid, function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT DO NOTHING`, pgx.Identifier{s.schema}.Sanitize()) - - var errorString *string - if input.err != nil { - e := input.err.Error() - errorString = &e + return err } - _, err := s.pool.Exec(ctx, query, - input.parentWorkflowID, - input.stepID, - "DBOS.getResult", - input.output, - errorString, - input.childWorkflowID, - startedAtMs, - completedAtMs, - ) - if err != nil { - return fmt.Errorf("failed to record get result: %w", err) - } return nil } diff --git a/dbos/workflow.go b/dbos/workflow.go index 1ed7755..cd2904f 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -243,17 +243,19 @@ func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R], startTime if encErr != nil { return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("serializing child workflow result: %w", encErr)) } - recordGetResultInput := recordChildGetResultDBInput{ - parentWorkflowID: workflowState.workflowID, - childWorkflowID: h.workflowID, - stepID: workflowState.nextStepID(), - output: encodedOutput, - err: outcome.err, - startedAt: startTime, - completedAt: completedTime, + recordGetResultInput := recordOperationResultDBInput{ + workflowID: workflowState.workflowID, + childWorkflowID: h.workflowID, + stepID: workflowState.nextStepID(), + output: encodedOutput, + err: outcome.err, + startedAt: startTime, + completedAt: completedTime, + isGetResult: true, + stepName: "DBOS.getResult", } recordResultErr := retry(h.dbosContext, func() error { - return h.dbosContext.(*dbosContext).systemDB.recordChildGetResult(h.dbosContext, recordGetResultInput) + return h.dbosContext.(*dbosContext).systemDB.recordOperationResult(h.dbosContext, recordGetResultInput) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) if recordResultErr != nil { h.dbosContext.(*dbosContext).logger.Error("failed to record get result", "error", recordResultErr) @@ -307,17 +309,19 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) workflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState) isWithinWorkflow := ok && workflowState != nil if isWithinWorkflow { - recordGetResultInput := recordChildGetResultDBInput{ - parentWorkflowID: workflowState.workflowID, - childWorkflowID: h.workflowID, - stepID: workflowState.nextStepID(), - output: encodedStr, - err: err, - startedAt: startTime, - completedAt: completedTime, + recordGetResultInput := recordOperationResultDBInput{ + workflowID: workflowState.workflowID, + childWorkflowID: h.workflowID, + stepID: workflowState.nextStepID(), + output: encodedStr, + err: err, + startedAt: startTime, + completedAt: completedTime, + isGetResult: true, + stepName: "DBOS.getResult", } recordResultErr := retry(h.dbosContext, func() error { - return h.dbosContext.(*dbosContext).systemDB.recordChildGetResult(h.dbosContext, recordGetResultInput) + return h.dbosContext.(*dbosContext).systemDB.recordOperationResult(h.dbosContext, recordGetResultInput) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) if recordResultErr != nil { h.dbosContext.(*dbosContext).logger.Error("failed to record get result", "error", recordResultErr) From 67c3cf8dd00c3dbee57e95868cf301c17fd562bc Mon Sep 17 00:00:00 2001 From: shirzady1934 Date: Thu, 4 Dec 2025 17:24:19 +0330 Subject: [PATCH 2/4] define recordChildWorkflowDBInput --- dbos/system_database.go | 69 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/dbos/system_database.go b/dbos/system_database.go index 1c68af3..e94055f 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1296,6 +1296,75 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation return nil } +/*******************************/ +/******* CHILD WORKFLOWS ********/ +/*******************************/ + +type recordChildWorkflowDBInput struct { + parentWorkflowID string + childWorkflowID string + stepID int + stepName string + tx pgx.Tx +} + +func (s *sysDB) recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error { + query := fmt.Sprintf(`INSERT INTO %s.operation_outputs + (workflow_uuid, function_id, function_name, child_workflow_id) + VALUES ($1, $2, $3, $4)`, pgx.Identifier{s.schema}.Sanitize()) + + var commandTag pgconn.CommandTag + var err error + + if input.tx != nil { + commandTag, err = input.tx.Exec(ctx, query, + input.parentWorkflowID, + input.stepID, + input.stepName, + input.childWorkflowID, + ) + } else { + commandTag, err = s.pool.Exec(ctx, query, + input.parentWorkflowID, + input.stepID, + input.stepName, + input.childWorkflowID, + ) + } + + if err != nil { + // Check for unique constraint violation (conflict ID error) + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_UNIQUE_VIOLATION { + return fmt.Errorf( + "child workflow %s already registered for parent workflow %s (operation ID: %d)", + input.childWorkflowID, input.parentWorkflowID, input.stepID) + } + return fmt.Errorf("failed to record child workflow: %w", err) + } + + if commandTag.RowsAffected() == 0 { + s.logger.Warn("RecordChildWorkflow No rows were affected by the insert") + } + + return nil +} + +func (s *sysDB) checkChildWorkflow(ctx context.Context, workflowID string, functionID int) (*string, error) { + query := fmt.Sprintf(`SELECT child_workflow_id + FROM %s.operation_outputs + WHERE workflow_uuid = $1 AND function_id = $2`, pgx.Identifier{s.schema}.Sanitize()) + + var childWorkflowID *string + err := s.pool.QueryRow(ctx, query, workflowID, functionID).Scan(&childWorkflowID) + if err != nil { + if err == pgx.ErrNoRows { + return nil, nil + } + return nil, fmt.Errorf("failed to check child workflow: %w", err) + } + + return childWorkflowID, nil +} /*******************************/ /******* STEPS ********/ /*******************************/ From d68464a0410432cc98f892fb177f9e08b14e3fa8 Mon Sep 17 00:00:00 2001 From: shirzady1934 Date: Thu, 4 Dec 2025 17:44:31 +0330 Subject: [PATCH 3/4] fix bug in recordOperationResult. --- dbos/system_database.go | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index e94055f..f121918 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1243,41 +1243,31 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation startedAtMs := input.startedAt.UnixMilli() completedAtMs := input.completedAt.UnixMilli() - var query string - args := []any{ - input.workflowID, - input.stepID, - input.output, - } - var errorString *string if input.err != nil { e := input.err.Error() errorString = &e } - args = append(args, errorString) - args = append(args, input.stepName) - columns := "workflow_uuid, function_id, output, error, function_name, started_at_epoch_ms, completed_at_epoch_ms" - values := "$1, $2, $3, $4, $5, $6, $7" + columns := []string{"workflow_uuid", "function_id", "output", "error", "function_name", "started_at_epoch_ms", "completed_at_epoch_ms"} + placeholders := []string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"} + args := []any{input.workflowID, input.stepID, input.output, errorString, input.stepName, startedAtMs, completedAtMs} argCounter := 7 if input.childWorkflowID != "" { - columns += ", child_workflow_id" + columns = append(columns, "child_workflow_id") argCounter++ - values += fmt.Sprintf(", $%d", argCounter) + placeholders = append(placeholders, fmt.Sprintf("$%d", argCounter)) args = append(args, input.childWorkflowID) } - args = append(args, startedAtMs, completedAtMs) - conflictClause := "" if input.isGetResult { conflictClause = "ON CONFLICT DO NOTHING" } - query = fmt.Sprintf(`INSERT INTO %s.operation_outputs (%s) VALUES (%s) %s`, - pgx.Identifier{s.schema}.Sanitize(), columns, values, conflictClause) + query := fmt.Sprintf(`INSERT INTO %s.operation_outputs (%s) VALUES (%s) %s`, + pgx.Identifier{s.schema}.Sanitize(), strings.Join(columns, ", "), strings.Join(placeholders, ", "), conflictClause) var err error if input.tx != nil { From 94a9e331cc8e4f00b60dcc509a97545754d66c86 Mon Sep 17 00:00:00 2001 From: shirzady1934 Date: Thu, 4 Dec 2025 18:21:16 +0330 Subject: [PATCH 4/4] Fix race in TestCLIWorkflow by waiting for async QueueWorkflow via Eventually --- cmd/dbos/cli_integration_test.go | 18 ++++++++++++++++++ dbos/system_database.go | 1 + 2 files changed, 19 insertions(+) diff --git a/cmd/dbos/cli_integration_test.go b/cmd/dbos/cli_integration_test.go index 1441fb9..ee0f979 100644 --- a/cmd/dbos/cli_integration_test.go +++ b/cmd/dbos/cli_integration_test.go @@ -376,6 +376,24 @@ func testListWorkflows(t *testing.T, cliPath string, baseArgs []string, dbRole s workflowID, exists := response["workflow_id"] assert.True(t, exists, "Response should contain workflow_id") assert.NotEmpty(t, workflowID, "Workflow ID should not be empty") + + require.Eventually(t, func() bool { + args := append([]string{"workflow", "list", "--queue", "example-queue"}, baseArgs...) + cmd := exec.Command(cliPath, args...) + cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL(dbRole)) + + output, err := cmd.CombinedOutput() + if err != nil { + return false + } + var workflows []dbos.WorkflowStatus + err = json.Unmarshal(output, &workflows) + if err != nil { + return false + } + return len(workflows) >= 10 + }, 5*time.Second, 500*time.Millisecond, "Should find at least 10 workflows in the queue") + // Get the current time for time-based filtering currentTime := time.Now() diff --git a/dbos/system_database.go b/dbos/system_database.go index f121918..6df5547 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1355,6 +1355,7 @@ func (s *sysDB) checkChildWorkflow(ctx context.Context, workflowID string, funct return childWorkflowID, nil } + /*******************************/ /******* STEPS ********/ /*******************************/