Skip to content

Commit 75cdba9

Browse files
authored
Fix steps registration and admin server steps output (#162)
- Fix steps output/error formatting in get workflow steps admin server endpoint - Fix error formatting in admin server list workflows endpoint - Fix step output registration: we now automatically register steps output type for gob encoding
1 parent 1f6b69f commit 75cdba9

File tree

4 files changed

+323
-8
lines changed

4 files changed

+323
-8
lines changed

dbos/admin_server.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
117117
"AssumedRole": ws.AssumedRole,
118118
"AuthenticatedRoles": ws.AuthenticatedRoles,
119119
"Output": ws.Output,
120-
"Error": ws.Error,
121120
"ExecutorID": ws.ExecutorID,
122121
"ApplicationVersion": ws.ApplicationVersion,
123122
"ApplicationID": ws.ApplicationID,
@@ -169,6 +168,18 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
169168
result["Output"] = string(bytes)
170169
}
171170

171+
if ws.Error != nil {
172+
// Convert error to string first, then marshal as JSON
173+
errStr := ws.Error.Error()
174+
bytes, err := json.Marshal(errStr)
175+
if err != nil {
176+
return nil, fmt.Errorf("failed to marshal error: %w", err)
177+
}
178+
result["Error"] = string(bytes)
179+
} else {
180+
result["Error"] = ""
181+
}
182+
172183
return result, nil
173184
}
174185

@@ -422,13 +433,37 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
422433
// Transform to snake_case format with function_id and function_name
423434
formattedSteps := make([]map[string]any, len(steps))
424435
for i, step := range steps {
425-
formattedSteps[i] = map[string]any{
436+
formattedStep := map[string]any{
426437
"function_id": step.StepID,
427438
"function_name": step.StepName,
428-
"output": step.Output,
429-
"error": step.Error,
430439
"child_workflow_id": step.ChildWorkflowID,
431440
}
441+
442+
// Marshal Output as JSON string if present
443+
if step.Output != nil && step.Output != "" {
444+
bytes, err := json.Marshal(step.Output)
445+
if err != nil {
446+
ctx.logger.Error("Failed to marshal step output", "error", err)
447+
http.Error(w, fmt.Sprintf("Failed to format step output: %v", err), http.StatusInternalServerError)
448+
return
449+
}
450+
formattedStep["output"] = string(bytes)
451+
}
452+
453+
// Marshal Error as JSON string if present
454+
if step.Error != nil {
455+
// Convert error to string first, then marshal as JSON
456+
errStr := step.Error.Error()
457+
bytes, err := json.Marshal(errStr)
458+
if err != nil {
459+
ctx.logger.Error("Failed to marshal step error", "error", err)
460+
http.Error(w, fmt.Sprintf("Failed to format step error: %v", err), http.StatusInternalServerError)
461+
return
462+
}
463+
formattedStep["error"] = string(bytes)
464+
}
465+
466+
formattedSteps[i] = formattedStep
432467
}
433468

434469
w.Header().Set("Content-Type", "application/json")

dbos/admin_server_test.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ import (
1717
"go.uber.org/goleak"
1818
)
1919

20+
// TestStepResult is a custom struct for testing step outputs
21+
type TestStepResult struct {
22+
Message string `json:"message"`
23+
Count int `json:"count"`
24+
Success bool `json:"success"`
25+
}
26+
2027
func TestAdminServer(t *testing.T) {
2128
defer goleak.VerifyNone(t,
2229
goleak.IgnoreAnyFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
@@ -726,6 +733,168 @@ func TestAdminServer(t *testing.T) {
726733
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
727734
})
728735

736+
t.Run("WorkflowSteps", func(t *testing.T) {
737+
resetTestDatabase(t, databaseURL)
738+
ctx, err := NewDBOSContext(context.Background(), Config{
739+
DatabaseURL: databaseURL,
740+
AppName: "test-app",
741+
AdminServer: true,
742+
})
743+
require.NoError(t, err)
744+
745+
// Test workflow with multiple steps - simpler version that won't fail on serialization
746+
testWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
747+
// Step 1: Return a string
748+
stepResult1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
749+
return "step1-output", nil
750+
}, WithStepName("stringStep"))
751+
if err != nil {
752+
return "", err
753+
}
754+
755+
// Step 2: Return a user-defined struct
756+
stepResult2, err := RunAsStep(dbosCtx, func(ctx context.Context) (TestStepResult, error) {
757+
return TestStepResult{
758+
Message: "structured data",
759+
Count: 100,
760+
Success: true,
761+
}, nil
762+
}, WithStepName("structStep"))
763+
if err != nil {
764+
return "", err
765+
}
766+
767+
// Step 3: Return an error - but we don't abort on error to test error marshaling
768+
_, _ = RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
769+
return "", fmt.Errorf("deliberate error for testing")
770+
}, WithStepName("errorStep"))
771+
772+
// Step 4: Return empty string (to test empty value handling)
773+
stepResult4, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
774+
return "", nil
775+
}, WithStepName("emptyStep"))
776+
if err != nil {
777+
return "", err
778+
}
779+
780+
// Combine results
781+
return fmt.Sprintf("workflow complete: %s, struct(%s,%d,%v), %s", stepResult1, stepResult2.Message, stepResult2.Count, stepResult2.Success, stepResult4), nil
782+
}
783+
784+
RegisterWorkflow(ctx, testWorkflow)
785+
786+
err = Launch(ctx)
787+
require.NoError(t, err)
788+
789+
// Ensure cleanup
790+
defer func() {
791+
if ctx != nil {
792+
Shutdown(ctx, 1*time.Minute)
793+
}
794+
}()
795+
796+
// Give the server a moment to start
797+
time.Sleep(100 * time.Millisecond)
798+
799+
client := &http.Client{Timeout: 5 * time.Second}
800+
801+
// Create and run the workflow
802+
handle, err := RunWorkflow(ctx, testWorkflow, "test-input")
803+
require.NoError(t, err, "Failed to create workflow")
804+
805+
// Wait for workflow to complete
806+
result, err := handle.GetResult()
807+
require.NoError(t, err, "Workflow should complete successfully")
808+
t.Logf("Workflow result: %s", result)
809+
810+
// Call the workflow steps endpoint
811+
workflowID := handle.GetWorkflowID()
812+
endpoint := fmt.Sprintf("http://localhost:%d/workflows/%s/steps", _DEFAULT_ADMIN_SERVER_PORT, workflowID)
813+
req, err := http.NewRequest("GET", endpoint, nil)
814+
require.NoError(t, err, "Failed to create request")
815+
816+
resp, err := client.Do(req)
817+
require.NoError(t, err, "Failed to make request")
818+
defer resp.Body.Close()
819+
820+
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from steps endpoint")
821+
822+
// Decode the response
823+
var steps []map[string]any
824+
err = json.NewDecoder(resp.Body).Decode(&steps)
825+
require.NoError(t, err, "Failed to decode steps response")
826+
827+
// Should have 4 steps
828+
assert.Equal(t, 4, len(steps), "Expected exactly 4 steps")
829+
830+
// Verify each step's output/error is properly marshaled
831+
for i, step := range steps {
832+
functionName, ok := step["function_name"].(string)
833+
require.True(t, ok, "function_name should be a string for step %d", i)
834+
835+
t.Logf("Step %d (%s): output=%v, error=%v", i, functionName, step["output"], step["error"])
836+
837+
switch functionName {
838+
case "stringStep":
839+
// String output should be marshaled as JSON string
840+
outputStr, ok := step["output"].(string)
841+
require.True(t, ok, "String step output should be a JSON string")
842+
843+
var unmarshaledOutput string
844+
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
845+
require.NoError(t, err, "Failed to unmarshal string step output")
846+
assert.Equal(t, "step1-output", unmarshaledOutput, "String step output should match")
847+
848+
assert.Nil(t, step["error"], "String step should have no error")
849+
850+
case "structStep":
851+
// Struct output should be marshaled as JSON string
852+
outputStr, ok := step["output"].(string)
853+
require.True(t, ok, "Struct step output should be a JSON string")
854+
855+
var unmarshaledOutput TestStepResult
856+
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
857+
require.NoError(t, err, "Failed to unmarshal struct step output")
858+
assert.Equal(t, TestStepResult{
859+
Message: "structured data",
860+
Count: 100,
861+
Success: true,
862+
}, unmarshaledOutput, "Struct step output should match")
863+
864+
assert.Nil(t, step["error"], "Struct step should have no error")
865+
866+
case "errorStep":
867+
// Error step should have error marshaled as JSON string
868+
errorStr, ok := step["error"].(string)
869+
require.True(t, ok, "Error step error should be a JSON string")
870+
871+
var unmarshaledError string
872+
err = json.Unmarshal([]byte(errorStr), &unmarshaledError)
873+
require.NoError(t, err, "Failed to unmarshal error step error")
874+
assert.Contains(t, unmarshaledError, "deliberate error for testing", "Error message should be preserved")
875+
876+
case "emptyStep":
877+
// Empty string might be returned as nil or as an empty JSON string
878+
output := step["output"]
879+
if output == nil {
880+
// Empty string was not included in response (which is fine)
881+
t.Logf("Empty step output was nil (not included)")
882+
} else {
883+
// If it was included, it should be marshaled as JSON string `""`
884+
outputStr, ok := output.(string)
885+
require.True(t, ok, "If present, empty step output should be a JSON string")
886+
887+
var unmarshaledOutput string
888+
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
889+
require.NoError(t, err, "Failed to unmarshal empty step output")
890+
assert.Equal(t, "", unmarshaledOutput, "Empty step output should be empty string")
891+
}
892+
893+
assert.Nil(t, step["error"], "Empty step should have no error")
894+
}
895+
}
896+
})
897+
729898
t.Run("TestDeactivate", func(t *testing.T) {
730899
resetTestDatabase(t, databaseURL)
731900
ctx, err := NewDBOSContext(context.Background(), Config{

dbos/workflow.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,10 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error
10411041
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
10421042
}
10431043

1044+
// Register the output type for gob encoding
1045+
var r R
1046+
gob.Register(r)
1047+
10441048
// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
10451049
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
10461050
opts = append(opts, WithStepName(stepName))
@@ -1490,9 +1494,9 @@ func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error {
14901494
workflowState, ok := c.Value(workflowStateKey).(*workflowState)
14911495
isWithinWorkflow := ok && workflowState != nil
14921496
if isWithinWorkflow {
1493-
_, err := RunAsStep(c, func(ctx context.Context) (any, error) {
1497+
_, err := RunAsStep(c, func(ctx context.Context) (string, error) {
14941498
err := c.systemDB.cancelWorkflow(ctx, workflowID)
1495-
return nil, err
1499+
return "", err
14961500
}, WithStepName("DBOS.cancelWorkflow"))
14971501
return err
14981502
} else {
@@ -1527,9 +1531,9 @@ func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (Workflow
15271531
isWithinWorkflow := ok && workflowState != nil
15281532
var err error
15291533
if isWithinWorkflow {
1530-
_, err = RunAsStep(c, func(ctx context.Context) (any, error) {
1534+
_, err = RunAsStep(c, func(ctx context.Context) (string, error) {
15311535
err := c.systemDB.resumeWorkflow(ctx, workflowID)
1532-
return nil, err
1536+
return "", err
15331537
}, WithStepName("DBOS.resumeWorkflow"))
15341538
} else {
15351539
err = c.systemDB.resumeWorkflow(c, workflowID)

dbos/workflows_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,113 @@ func TestSteps(t *testing.T) {
567567
assert.Equal(t, "MyCustomStep2", steps[1].StepName, "expected second step to have custom name")
568568
assert.Equal(t, 1, steps[1].StepID)
569569
})
570+
571+
t.Run("stepsOutputEncoding", func(t *testing.T) {
572+
// Define user-defined types for testing serialization
573+
type StepInput struct {
574+
Name string `json:"name"`
575+
Count int `json:"count"`
576+
Active bool `json:"active"`
577+
Metadata map[string]string `json:"metadata"`
578+
CreatedAt time.Time `json:"created_at"`
579+
}
580+
581+
type StepOutput struct {
582+
ProcessedName string `json:"processed_name"`
583+
TotalCount int `json:"total_count"`
584+
Success bool `json:"success"`
585+
ProcessedAt time.Time `json:"processed_at"`
586+
Details []string `json:"details"`
587+
}
588+
589+
// Create a step function that accepts StepInput and returns StepOutput
590+
processUserObjectStep := func(_ context.Context, input StepInput) (StepOutput, error) {
591+
// Process the input and create output
592+
output := StepOutput{
593+
ProcessedName: fmt.Sprintf("Processed_%s", input.Name),
594+
TotalCount: input.Count * 2,
595+
Success: input.Active,
596+
ProcessedAt: time.Now(),
597+
Details: []string{"step1", "step2", "step3"},
598+
}
599+
600+
// Verify input was correctly deserialized
601+
if input.Metadata == nil {
602+
return StepOutput{}, fmt.Errorf("metadata map was not properly deserialized")
603+
}
604+
605+
return output, nil
606+
}
607+
608+
// Create a workflow that uses the step with user-defined objects
609+
userObjectWorkflow := func(dbosCtx DBOSContext, workflowInput string) (string, error) {
610+
// Create input for the step
611+
stepInput := StepInput{
612+
Name: workflowInput,
613+
Count: 42,
614+
Active: true,
615+
Metadata: map[string]string{
616+
"key1": "value1",
617+
"key2": "value2",
618+
},
619+
CreatedAt: time.Now(),
620+
}
621+
622+
// Run the step with user-defined input and output
623+
output, err := RunAsStep(dbosCtx, func(ctx context.Context) (StepOutput, error) {
624+
return processUserObjectStep(ctx, stepInput)
625+
})
626+
if err != nil {
627+
return "", fmt.Errorf("step failed: %w", err)
628+
}
629+
630+
// Verify the output was correctly returned
631+
if output.ProcessedName == "" {
632+
return "", fmt.Errorf("output ProcessedName is empty")
633+
}
634+
if output.TotalCount != 84 {
635+
return "", fmt.Errorf("expected TotalCount to be 84, got %d", output.TotalCount)
636+
}
637+
if len(output.Details) != 3 {
638+
return "", fmt.Errorf("expected 3 details, got %d", len(output.Details))
639+
}
640+
641+
return "", nil
642+
}
643+
644+
// Register the workflow
645+
RegisterWorkflow(dbosCtx, userObjectWorkflow)
646+
647+
// Execute the workflow
648+
handle, err := RunWorkflow(dbosCtx, userObjectWorkflow, "TestObject")
649+
require.NoError(t, err, "failed to run workflow with user-defined objects")
650+
651+
// Get the result
652+
_, err = handle.GetResult()
653+
require.NoError(t, err, "failed to get result from workflow")
654+
655+
// Verify the step was recorded
656+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
657+
require.NoError(t, err, "failed to get workflow steps")
658+
require.Len(t, steps, 1, "expected 1 step")
659+
660+
// Verify step output was properly serialized and stored
661+
step := steps[0]
662+
require.NotNil(t, step.Output, "step output should not be nil")
663+
assert.Nil(t, step.Error)
664+
665+
// Deserialize the output from the database to verify proper encoding
666+
storedOutput, ok := step.Output.(StepOutput)
667+
require.True(t, ok, "failed to cast step output to StepOutput")
668+
669+
// Verify all fields were correctly serialized and deserialized
670+
assert.Equal(t, "Processed_TestObject", storedOutput.ProcessedName, "ProcessedName not correctly serialized")
671+
assert.Equal(t, 84, storedOutput.TotalCount, "TotalCount not correctly serialized")
672+
assert.True(t, storedOutput.Success, "Success flag not correctly serialized")
673+
assert.Len(t, storedOutput.Details, 3, "Details array length incorrect")
674+
assert.Equal(t, []string{"step1", "step2", "step3"}, storedOutput.Details, "Details array not correctly serialized")
675+
assert.False(t, storedOutput.ProcessedAt.IsZero(), "ProcessedAt timestamp should not be zero")
676+
})
570677
}
571678

572679
func TestChildWorkflow(t *testing.T) {

0 commit comments

Comments
 (0)