Skip to content

Commit 46420c2

Browse files
authored
Handle links from ExecuteWorkflow in Nexus WorkflowRunOperation (#1934)
* Handle links from ExecuteWorkflow in Nexus WorkflowRunOperation * add test
1 parent 52c7523 commit 46420c2

File tree

6 files changed

+167
-58
lines changed

6 files changed

+167
-58
lines changed

internal/client.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,10 @@ type (
762762
// WARNING: Task queue priority is currently experimental.
763763
Priority Priority
764764

765+
// responseInfo - Optional pointer to store information of StartWorkflowExecution response.
766+
// Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
767+
responseInfo *startWorkflowResponseInfo
768+
765769
// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
766770
requestID string
767771
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
@@ -778,6 +782,13 @@ type (
778782
onConflictOptions *OnConflictOptions
779783
}
780784

785+
// startWorkflowResponseInfo can be passed to StartWorkflowOptions to receive additional information
786+
// of StartWorkflowExecution response.
787+
startWorkflowResponseInfo struct {
788+
// Link to the workflow event.
789+
Link *commonpb.Link
790+
}
791+
781792
// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
782793
// See [NewWithStartWorkflowOperation] and [UpdateWithStartWorkflow].
783794
//
@@ -1276,3 +1287,13 @@ func SetOnConflictOptionsOnStartWorkflowOptions(opts *StartWorkflowOptions) {
12761287
AttachLinks: true,
12771288
}
12781289
}
1290+
1291+
// SetResponseInfoOnStartWorkflowOptions is an internal only method for setting start workflow
1292+
// response info object pointer on StartWorkflowOptions and return the object pointer.
1293+
// StartWorkflowResponseInfo is purposefully not exposed to users for the time being.
1294+
func SetResponseInfoOnStartWorkflowOptions(opts *StartWorkflowOptions) *startWorkflowResponseInfo {
1295+
if opts.responseInfo == nil {
1296+
opts.responseInfo = &startWorkflowResponseInfo{}
1297+
}
1298+
return opts.responseInfo
1299+
}

internal/cmd/build/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (b *builder) integrationTest() error {
113113
if *devServerFlag {
114114
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
115115
CachedDownload: testsuite.CachedDownload{
116-
Version: "v1.3.1-nexus-cancellation.0",
116+
Version: "v1.3.1-nexus-links.0",
117117
},
118118
ClientOptions: &client.Options{
119119
HostPort: "127.0.0.1:7233",
@@ -147,6 +147,7 @@ func (b *builder) integrationTest() error {
147147
"--dynamic-config-value", `component.callbacks.allowedAddresses=[{"Pattern":"*","AllowInsecure":true}]`, // SDK tests use arbitrary callback URLs, permit that on the server
148148
"--dynamic-config-value", `system.refreshNexusEndpointsMinWait="0s"`, // Make Nexus tests faster
149149
"--dynamic-config-value", `component.nexusoperations.recordCancelRequestCompletionEvents=true`, // Defaults to false until after OSS 1.28 is released
150+
"--dynamic-config-value", `history.enableRequestIdRefLinks=true`,
150151
},
151152
})
152153
if err != nil {

internal/internal_workflow_client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,6 +1798,10 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
17981798
runID = response.RunId
17991799
}
18001800

1801+
if responseInfo := in.Options.responseInfo; responseInfo != nil {
1802+
responseInfo.Link = response.GetLink()
1803+
}
1804+
18011805
iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator {
18021806
metricsHandler := w.client.metricsHandler.WithTags(metrics.RPCTags(in.WorkflowType,
18031807
metrics.NoneTagValue, in.Options.TaskQueue))

internal/internal_workflow_client_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,47 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_RawHistory_Success() {
452452
s.Equal(workflowResult, decodedResult)
453453
}
454454

455+
func (s *workflowRunSuite) TestExecuteWorkflow_StartWorkflowResponseInfo() {
456+
link := &commonpb.Link{
457+
Variant: &commonpb.Link_WorkflowEvent_{
458+
WorkflowEvent: &commonpb.Link_WorkflowEvent{
459+
Namespace: DefaultNamespace,
460+
WorkflowId: workflowID,
461+
RunId: runID,
462+
Reference: &commonpb.Link_WorkflowEvent_EventRef{
463+
EventRef: &commonpb.Link_WorkflowEvent_EventReference{
464+
EventId: 1,
465+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
466+
},
467+
},
468+
},
469+
},
470+
}
471+
createResponse := &workflowservice.StartWorkflowExecutionResponse{
472+
RunId: runID,
473+
Started: true,
474+
Link: link,
475+
}
476+
s.workflowServiceClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).
477+
Return(createResponse, nil).Times(1)
478+
479+
responseInfo := &startWorkflowResponseInfo{}
480+
_, err := s.workflowClient.ExecuteWorkflow(
481+
context.Background(),
482+
StartWorkflowOptions{
483+
ID: workflowID,
484+
TaskQueue: taskqueue,
485+
WorkflowExecutionTimeout: timeoutInSeconds * time.Second,
486+
WorkflowTaskTimeout: timeoutInSeconds * time.Second,
487+
WorkflowIDReusePolicy: workflowIDReusePolicy,
488+
WorkflowExecutionErrorWhenAlreadyStarted: true,
489+
responseInfo: responseInfo,
490+
}, workflowType,
491+
)
492+
s.NoError(err)
493+
s.Equal(link, responseInfo.Link)
494+
}
495+
455496
func (s *workflowRunSuite) TestExecuteWorkflowWorkflowExecutionAlreadyStartedError() {
456497
mockerr := serviceerror.NewWorkflowExecutionAlreadyStarted("Already Started", "", runID)
457498
s.workflowServiceClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).

temporalnexus/operation.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ type workflowHandle[T any] struct {
214214
namespace string
215215
id string
216216
runID string
217+
wfEventLink *common.Link
217218
cachedToken string
218219
}
219220

@@ -226,16 +227,19 @@ func (h workflowHandle[T]) RunID() string {
226227
}
227228

228229
func (h workflowHandle[T]) link() nexus.Link {
229-
// Create the link information about the new workflow and return to the caller.
230-
link := &common.Link_WorkflowEvent{
231-
Namespace: h.namespace,
232-
WorkflowId: h.ID(),
233-
RunId: h.RunID(),
234-
Reference: &common.Link_WorkflowEvent_EventRef{
235-
EventRef: &common.Link_WorkflowEvent_EventReference{
236-
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
230+
// Create the link information about the workflow and return to the caller.
231+
link := h.wfEventLink.GetWorkflowEvent()
232+
if link == nil {
233+
link = &common.Link_WorkflowEvent{
234+
Namespace: h.namespace,
235+
WorkflowId: h.ID(),
236+
RunId: h.RunID(),
237+
Reference: &common.Link_WorkflowEvent_EventRef{
238+
EventRef: &common.Link_WorkflowEvent_EventReference{
239+
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
240+
},
237241
},
238-
},
242+
}
239243
}
240244
return ConvertLinkWorkflowEventToNexusLink(link)
241245
}
@@ -327,6 +331,7 @@ func ExecuteUntypedWorkflow[R any](
327331
// don't support links in callbacks.
328332
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)
329333
internal.SetOnConflictOptionsOnStartWorkflowOptions(&startWorkflowOptions)
334+
responseInfo := internal.SetResponseInfoOnStartWorkflowOptions(&startWorkflowOptions)
330335

331336
// This makes sure that ExecuteWorkflow will respect the WorkflowIDConflictPolicy, ie., if the
332337
// conflict policy is to fail (default value), then ExecuteWorkflow will return an error if the
@@ -342,6 +347,7 @@ func ExecuteUntypedWorkflow[R any](
342347
namespace: nctx.Namespace,
343348
id: run.GetID(),
344349
runID: run.GetRunID(),
350+
wfEventLink: responseInfo.Link,
345351
cachedToken: encodedToken,
346352
}, nil
347353
}

test/nexus_test.go

Lines changed: 84 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,10 @@ func TestNexusWorkflowRunOperation(t *testing.T) {
459459
event, err := iter.Next()
460460
require.NoError(t, err)
461461
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
462-
require.Len(t, event.GetLinks(), 1)
463-
require.True(t, proto.Equal(link, event.GetLinks()[0].GetWorkflowEvent()))
462+
completionCallbacks := event.GetWorkflowExecutionStartedEventAttributes().GetCompletionCallbacks()
463+
require.Len(t, completionCallbacks, 1)
464+
require.Len(t, completionCallbacks[0].GetLinks(), 1)
465+
require.True(t, proto.Equal(link, completionCallbacks[0].GetLinks()[0].GetWorkflowEvent()))
464466
break
465467
}
466468
}
@@ -923,12 +925,7 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
923925
require.Equal(t, tc.testConfig.Namespace, link.GetWorkflowEvent().GetNamespace())
924926
require.Equal(t, handlerWfID, link.GetWorkflowEvent().GetWorkflowId())
925927
require.NotEmpty(t, link.GetWorkflowEvent().GetRunId())
926-
require.True(t, proto.Equal(
927-
&common.Link_WorkflowEvent_EventReference{
928-
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
929-
},
930-
link.GetWorkflowEvent().GetEventRef(),
931-
))
928+
require.Equal(t, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, link.GetWorkflowEvent().GetEventRef().GetEventType())
932929
handlerRunID := link.GetWorkflowEvent().GetRunId()
933930

934931
// Check the link is added in the handler workflow.
@@ -952,21 +949,21 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
952949
// Verify that calling by name works.
953950
require.Equal(t, "foo", targetEvent.GetWorkflowExecutionStartedEventAttributes().WorkflowType.Name)
954951
// Verify that links are properly attached.
955-
require.Len(t, targetEvent.GetLinks(), 1)
956-
require.True(t, proto.Equal(
957-
&common.Link_WorkflowEvent{
958-
Namespace: tc.testConfig.Namespace,
959-
WorkflowId: run.GetID(),
960-
RunId: run.GetRunID(),
961-
Reference: &common.Link_WorkflowEvent_EventRef{
962-
EventRef: &common.Link_WorkflowEvent_EventReference{
963-
EventId: nexusOperationScheduleEventID,
964-
EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
965-
},
952+
completionCallbacks := targetEvent.GetWorkflowExecutionStartedEventAttributes().GetCompletionCallbacks()
953+
require.Len(t, completionCallbacks, 1)
954+
expectedLink := &common.Link_WorkflowEvent{
955+
Namespace: tc.testConfig.Namespace,
956+
WorkflowId: run.GetID(),
957+
RunId: run.GetRunID(),
958+
Reference: &common.Link_WorkflowEvent_EventRef{
959+
EventRef: &common.Link_WorkflowEvent_EventReference{
960+
EventId: nexusOperationScheduleEventID,
961+
EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
966962
},
967963
},
968-
targetEvent.GetLinks()[0].GetWorkflowEvent(),
969-
))
964+
}
965+
require.Len(t, completionCallbacks[0].GetLinks(), 1)
966+
require.True(t, proto.Equal(expectedLink, completionCallbacks[0].GetLinks()[0].GetWorkflowEvent()))
970967
})
971968

972969
t.Run("OpFailed", func(t *testing.T) {
@@ -1276,7 +1273,10 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
12761273
defer cancel()
12771274
tctx := newTestContext(t, ctx)
12781275

1279-
handlerWorkflowID := uuid.NewString()
1276+
handlerWfIDSuffix := uuid.NewString()
1277+
getHandlerWfID := func(tcName string) string {
1278+
return tcName + "-" + handlerWfIDSuffix
1279+
}
12801280
handlerWf := func(ctx workflow.Context, input string) (string, error) {
12811281
workflow.GetSignalChannel(ctx, "terminate").Receive(ctx, nil)
12821282
return "hello " + input, nil
@@ -1285,13 +1285,13 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
12851285
op := temporalnexus.NewWorkflowRunOperation(
12861286
"op",
12871287
handlerWf,
1288-
func(ctx context.Context, input string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
1288+
func(ctx context.Context, tcName string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
12891289
var conflictPolicy enumspb.WorkflowIdConflictPolicy
1290-
if input == "conflict-policy-use-existing" {
1290+
if tcName == "conflict-policy-use-existing" {
12911291
conflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
12921292
}
12931293
return client.StartWorkflowOptions{
1294-
ID: handlerWorkflowID,
1294+
ID: getHandlerWfID(tcName),
12951295
WorkflowIDConflictPolicy: conflictPolicy,
12961296
}, nil
12971297
},
@@ -1302,7 +1302,7 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
13021302
CntErr int
13031303
}
13041304

1305-
callerWf := func(ctx workflow.Context, input string, numCalls int) (CallerWfOutput, error) {
1305+
callerWf := func(ctx workflow.Context, tcName string, numCalls int) (CallerWfOutput, error) {
13061306
output := CallerWfOutput{}
13071307
var retError error
13081308

@@ -1314,7 +1314,7 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
13141314
wg.Add(1)
13151315
workflow.Go(ctx, func(ctx workflow.Context) {
13161316
defer wg.Done()
1317-
fut := client.ExecuteOperation(ctx, op, input, workflow.NexusOperationOptions{})
1317+
fut := client.ExecuteOperation(ctx, op, tcName, workflow.NexusOperationOptions{})
13181318
var exec workflow.NexusOperationExecution
13191319
err := fut.GetNexusOperationExecution().Get(ctx, &exec)
13201320
if err != nil {
@@ -1339,7 +1339,7 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
13391339
err = fut.Get(ctx, &res)
13401340
if err != nil {
13411341
retError = err
1342-
} else if res != "hello "+input {
1342+
} else if res != "hello "+tcName {
13431343
retError = fmt.Errorf("unexpected result from handler workflow: %q", res)
13441344
}
13451345
})
@@ -1351,7 +1351,7 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
13511351

13521352
if output.CntOk > 0 {
13531353
// signal handler workflow so it will complete
1354-
workflow.SignalExternalWorkflow(ctx, handlerWorkflowID, "", "terminate", nil).Get(ctx, nil)
1354+
workflow.SignalExternalWorkflow(ctx, getHandlerWfID(tcName), "", "terminate", nil).Get(ctx, nil)
13551355
}
13561356
wg.Wait(ctx)
13571357
return output, retError
@@ -1366,48 +1366,84 @@ func TestAsyncOperationFromWorkflow_MultipleCallers(t *testing.T) {
13661366
require.NoError(t, w.Start())
13671367
t.Cleanup(w.Stop)
13681368

1369+
// number of concurrent Nexus operation calls
1370+
numCalls := 5
1371+
13691372
testCases := []struct {
1370-
input string
1371-
checkOutput func(t *testing.T, numCalls int, res CallerWfOutput, err error)
1373+
name string
1374+
expectedOk int
1375+
expectedErr int
13721376
}{
13731377
{
1374-
input: "conflict-policy-fail",
1375-
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) {
1376-
require.NoError(t, err)
1377-
require.EqualValues(t, 1, res.CntOk)
1378-
require.EqualValues(t, numCalls-1, res.CntErr)
1379-
},
1378+
name: "conflict-policy-fail",
1379+
expectedOk: 1,
1380+
expectedErr: numCalls - 1,
13801381
},
13811382
{
1382-
input: "conflict-policy-use-existing",
1383-
checkOutput: func(t *testing.T, numCalls int, res CallerWfOutput, err error) {
1384-
require.NoError(t, err)
1385-
require.EqualValues(t, numCalls, res.CntOk)
1386-
require.EqualValues(t, 0, res.CntErr)
1387-
},
1383+
name: "conflict-policy-use-existing",
1384+
expectedOk: numCalls,
1385+
expectedErr: 0,
13881386
},
13891387
}
13901388

1391-
// number of concurrent Nexus operation calls
1392-
numCalls := 5
13931389
for _, tc := range testCases {
1394-
t.Run(tc.input, func(t *testing.T) {
1390+
t.Run(tc.name, func(t *testing.T) {
1391+
callerWfID := uuid.NewString()
13951392
run, err := tctx.client.ExecuteWorkflow(
13961393
ctx,
13971394
client.StartWorkflowOptions{
1395+
ID: callerWfID,
13981396
TaskQueue: tctx.taskQueue,
13991397
// The endpoint registry may take a bit to propagate to the history service, use a shorter
14001398
// workflow task timeout to speed up the attempts.
14011399
WorkflowTaskTimeout: time.Second,
14021400
},
14031401
callerWf,
1404-
tc.input,
1402+
tc.name,
14051403
numCalls,
14061404
)
14071405
require.NoError(t, err)
14081406
var res CallerWfOutput
14091407
err = run.Get(ctx, &res)
1410-
tc.checkOutput(t, numCalls, res, err)
1408+
require.NoError(t, err)
1409+
require.Equal(t, tc.expectedOk, res.CntOk)
1410+
require.Equal(t, tc.expectedErr, res.CntErr)
1411+
1412+
cntEventRefLinks := 0
1413+
cntRequestIDRefLinks := 0
1414+
iter := tctx.client.GetWorkflowHistory(
1415+
context.Background(),
1416+
callerWfID,
1417+
"",
1418+
false,
1419+
enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
1420+
)
1421+
for iter.HasNext() {
1422+
event, err := iter.Next()
1423+
require.NoError(t, err)
1424+
if event.GetEventType() != enumspb.EVENT_TYPE_NEXUS_OPERATION_STARTED {
1425+
continue
1426+
}
1427+
require.Len(t, event.GetLinks(), 1)
1428+
link := event.GetLinks()[0].GetWorkflowEvent()
1429+
require.NotNil(t, link)
1430+
require.Equal(t, tctx.testConfig.Namespace, link.GetNamespace())
1431+
require.Equal(t, getHandlerWfID(tc.name), link.GetWorkflowId())
1432+
switch link.Reference.(type) {
1433+
case *common.Link_WorkflowEvent_EventRef:
1434+
cntEventRefLinks++
1435+
require.Equal(t, int64(1), link.GetEventRef().GetEventId())
1436+
require.Equal(t, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, link.GetEventRef().GetEventType())
1437+
case *common.Link_WorkflowEvent_RequestIdRef:
1438+
cntRequestIDRefLinks++
1439+
require.NotEmpty(t, link.GetRequestIdRef().GetRequestId())
1440+
require.Equal(t, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED, link.GetRequestIdRef().GetEventType())
1441+
default:
1442+
require.Fail(t, fmt.Sprintf("Unexpected link reference type: %T", link.Reference))
1443+
}
1444+
}
1445+
require.Equal(t, 1, cntEventRefLinks)
1446+
require.Equal(t, tc.expectedOk-1, cntRequestIDRefLinks)
14111447
})
14121448
}
14131449
}

0 commit comments

Comments
 (0)