Skip to content

Commit 2737bd0

Browse files
authored
Add missing request IDs (#1458)
These ensure idempotency across retries, which is particularly important for avoiding duplicate signals.
1 parent 73d0e88 commit 2737bd0

File tree

3 files changed

+63
-31
lines changed

3 files changed

+63
-31
lines changed

internal/internal_task_handlers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"time"
3535

3636
"github.com/opentracing/opentracing-go"
37+
"github.com/pborman/uuid"
3738
"github.com/uber-go/tally"
3839
"go.uber.org/zap"
3940

@@ -1627,6 +1628,7 @@ func signalWorkflow(
16271628
SignalName: common.StringPtr(signalName),
16281629
Input: signalInput,
16291630
Identity: common.StringPtr(identity),
1631+
RequestId: common.StringPtr(uuid.New()),
16301632
}
16311633

16321634
return backoff.Retry(ctx,

internal/internal_workflow_client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,8 @@ func (wc *workflowClient) CancelWorkflow(ctx context.Context, workflowID string,
401401
WorkflowId: common.StringPtr(workflowID),
402402
RunId: getRunID(runID),
403403
},
404-
Identity: common.StringPtr(wc.identity),
404+
Identity: common.StringPtr(wc.identity),
405+
RequestId: common.StringPtr(uuid.New()),
405406
}
406407

407408
for _, opt := range opts {

internal/internal_workflow_client_test.go

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,31 @@ func (s *workflowClientTestSuite) TearDownSubTest() {
10471047
s.TearDownTest()
10481048
}
10491049

1050+
func (s *workflowClientTestSuite) TestSignalWorkflow() {
1051+
signalName := "my signal"
1052+
signalInput := []byte("my signal input")
1053+
1054+
s.service.EXPECT().SignalWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *shared.SignalWorkflowExecutionRequest, _ ...yarpc.CallOption) error {
1055+
s.NotNil(request.RequestId)
1056+
request.RequestId = nil
1057+
s.Equal(&shared.SignalWorkflowExecutionRequest{
1058+
Domain: common.StringPtr(domain),
1059+
WorkflowExecution: &shared.WorkflowExecution{
1060+
WorkflowId: common.StringPtr(workflowID),
1061+
RunId: common.StringPtr(runID),
1062+
},
1063+
SignalName: common.StringPtr(signalName),
1064+
Input: signalInput,
1065+
Identity: common.StringPtr(identity),
1066+
RequestId: nil,
1067+
}, request)
1068+
return nil
1069+
}).Times(1)
1070+
1071+
err := s.client.SignalWorkflow(context.Background(), workflowID, runID, signalName, signalInput)
1072+
s.NoError(err)
1073+
}
1074+
10501075
func (s *workflowClientTestSuite) TestSignalWithStartWorkflow() {
10511076
signalName := "my signal"
10521077
signalInput := []byte("my signal input")
@@ -1707,46 +1732,50 @@ func serializeEvents(events []*shared.HistoryEvent) *shared.DataBlob {
17071732
}
17081733

17091734
func (s *workflowClientTestSuite) TestCancelWorkflow() {
1710-
s.service.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), newPartialCancelRequestMatcher(common.StringPtr("testWf"), common.StringPtr("test reason")), gomock.All(gomock.Any())).Return(nil)
1735+
s.service.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any(), gomock.All(gomock.Any())).DoAndReturn(func(_ context.Context, request *shared.RequestCancelWorkflowExecutionRequest, _ ...yarpc.CallOption) error {
1736+
s.NotNil(request.RequestId)
1737+
request.RequestId = nil
1738+
s.Equal(&shared.RequestCancelWorkflowExecutionRequest{
1739+
Domain: common.StringPtr(domain),
1740+
WorkflowExecution: &shared.WorkflowExecution{
1741+
WorkflowId: common.StringPtr(workflowID),
1742+
RunId: common.StringPtr(runID),
1743+
},
1744+
Identity: common.StringPtr(identity),
1745+
RequestId: nil,
1746+
Cause: common.StringPtr("test reason"),
1747+
FirstExecutionRunID: nil,
1748+
}, request)
1749+
return nil
1750+
}).Times(1)
17111751

1712-
err := s.client.CancelWorkflow(context.Background(), "testWf", "testRun", WithCancelReason("test reason"))
1752+
err := s.client.CancelWorkflow(context.Background(), workflowID, runID, WithCancelReason("test reason"))
17131753

17141754
s.NoError(err)
17151755
}
17161756

17171757
func (s *workflowClientTestSuite) TestCancelWorkflowBackwardsCompatible() {
1718-
s.service.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), newPartialCancelRequestMatcher(common.StringPtr("testWf"), nil), gomock.All(gomock.Any())).Return(nil)
1719-
1720-
err := s.client.CancelWorkflow(context.Background(), "testWf", "testRun")
1758+
s.service.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any(), gomock.All(gomock.Any())).DoAndReturn(func(_ context.Context, request *shared.RequestCancelWorkflowExecutionRequest, _ ...yarpc.CallOption) error {
1759+
s.NotNil(request.RequestId)
1760+
request.RequestId = nil
1761+
s.Equal(&shared.RequestCancelWorkflowExecutionRequest{
1762+
Domain: common.StringPtr(domain),
1763+
WorkflowExecution: &shared.WorkflowExecution{
1764+
WorkflowId: common.StringPtr(workflowID),
1765+
RunId: common.StringPtr(runID),
1766+
},
1767+
Identity: common.StringPtr(identity),
1768+
RequestId: nil,
1769+
Cause: nil,
1770+
FirstExecutionRunID: nil,
1771+
}, request)
1772+
return nil
1773+
}).Times(1)
1774+
err := s.client.CancelWorkflow(context.Background(), workflowID, runID)
17211775

17221776
s.NoError(err)
17231777
}
17241778

1725-
type PartialCancelRequestMatcher struct {
1726-
wfID *string
1727-
cause *string
1728-
}
1729-
1730-
func newPartialCancelRequestMatcher(wfID *string, cause *string) gomock.Matcher {
1731-
return &PartialCancelRequestMatcher{
1732-
wfID: wfID,
1733-
cause: cause,
1734-
}
1735-
}
1736-
1737-
func (m *PartialCancelRequestMatcher) Matches(a interface{}) bool {
1738-
aEx, ok := a.(*shared.RequestCancelWorkflowExecutionRequest)
1739-
if !ok {
1740-
return false
1741-
}
1742-
1743-
return (aEx.Cause == m.cause || *aEx.Cause == *m.cause) && *aEx.WorkflowExecution.WorkflowId == *m.wfID
1744-
}
1745-
1746-
func (m *PartialCancelRequestMatcher) String() string {
1747-
return "partial cancellation request matcher matches cause and wfId fields"
1748-
}
1749-
17501779
func (s *workflowClientTestSuite) TestTerminateWorkflow() {
17511780
expectedRequest := &shared.TerminateWorkflowExecutionRequest{
17521781
Domain: common.StringPtr(domain),

0 commit comments

Comments
 (0)