Skip to content

Commit 9de36a9

Browse files
authored
Handle activity cancel due to worker shutdown properly (#1910)
* Differentiate context cancel due to worker shutdown * change cancelAllowed to cancelReason to handle more nuanced scenario of cancel source * go back to using isActivityCanceled * Fix sessions for TestSessionStateFailedWorkerFailed * PR feedback * Add local activity test * Add local activity test for worker shutdown, as well as for no heartbeating due to shutdown * Only avoid heartbeating early due to non-server cancel * Remove redundant tests, remove old LA fix now that fix is in main, * Add new test, activity actually handles ctx cancel * increase time to reduce race condition change
1 parent b955aa8 commit 9de36a9

File tree

5 files changed

+113
-3
lines changed

5 files changed

+113
-3
lines changed

internal/internal_task_handlers.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2283,8 +2283,10 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
22832283

22842284
output, err := activityImplementation.Execute(ctx, t.Input)
22852285
// Check if context canceled at a higher level before we cancel it ourselves
2286-
// TODO : check if the cause of the context cancellation is from the server
2287-
isActivityCanceled := ctx.Err() == context.Canceled
2286+
2287+
// Cancels that don't originate from the server will have separate cancel reasons, like
2288+
// ErrWorkerShutdown or ErrActivityPaused
2289+
isActivityCanceled := ctx.Err() == context.Canceled && errors.Is(context.Cause(ctx), &CanceledError{})
22882290

22892291
dlCancelFunc()
22902292
if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded {

internal/session.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func createSession(ctx Context, creationTaskqueue string, options *SessionOption
286286

287287
taskqueueChan := GetSignalChannel(ctx, sessionID) // use sessionID as channel name
288288
// Retry is only needed when creating new session and the error returned is
289-
// NewApplicationError(errTooManySessionsMsg). Therefore we make sure to
289+
// NewApplicationError(errTooManySessionsMsg). Therefore, we make sure to
290290
// disable retrying for start-to-close and heartbeat timeouts which can occur
291291
// when attempting to retry a create-session on a different worker.
292292
retryPolicy := &RetryPolicy{
@@ -417,6 +417,14 @@ func sessionCreationActivity(ctx context.Context, sessionID string) error {
417417
select {
418418
case <-ctx.Done():
419419
sessionEnv.CompleteSession(sessionID)
420+
// Because of how session creation configures retryPolicy, we need to wrap context cancels that don't
421+
// originate from the server as non-retryable errors. See retrypolicy in createSession() above.
422+
if !(ctx.Err() == context.Canceled && errors.Is(context.Cause(ctx), &CanceledError{})) {
423+
return NewApplicationErrorWithOptions(
424+
"session failed due to worker shutdown", "SessionWorkerShutdown",
425+
ApplicationErrorOptions{NonRetryable: true, Cause: ctx.Err()})
426+
427+
}
420428
return ctx.Err()
421429
case <-ticker.C:
422430
heartbeatOp := func() error {

test/activity_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,3 +461,13 @@ func (a *Activities) RawValueActivity(ctx context.Context, value converter.RawVa
461461
activity.GetLogger(ctx).Info("RawValue value", value.Payload())
462462
return value, nil
463463
}
464+
465+
func (a *Activities) CancelActivity(ctx context.Context) error {
466+
t := time.NewTicker(200 * time.Millisecond)
467+
defer t.Stop()
468+
select {
469+
case <-ctx.Done():
470+
case <-t.C:
471+
}
472+
return ctx.Err()
473+
}

test/integration_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7709,6 +7709,66 @@ func (ts *IntegrationTestSuite) TestLocalActivityFailureMetric_BenignHandling()
77097709
ts.assertMetricCount(metrics.LocalActivityExecutionFailedCounter, currCount)
77107710
}
77117711

7712+
func (ts *IntegrationTestSuite) TestActivityCancelFromWorkerShutdown() {
7713+
ctx, cancel := context.WithCancel(context.Background())
7714+
defer cancel()
7715+
7716+
run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-activity-cancel"), ts.workflows.WorkflowReactToCancel, false)
7717+
ts.NoError(err)
7718+
7719+
// Give the workflow time to run and run activity
7720+
time.Sleep(100 * time.Millisecond)
7721+
ts.worker.Stop()
7722+
ts.workerStopped = true
7723+
// Now create a new worker on that same task queue to resume the work of the
7724+
// activity retry
7725+
nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{})
7726+
ts.registerWorkflowsAndActivities(nextWorker)
7727+
ts.NoError(nextWorker.Start())
7728+
defer nextWorker.Stop()
7729+
7730+
err = run.Get(ctx, nil)
7731+
ts.NoError(err)
7732+
}
7733+
7734+
func (ts *IntegrationTestSuite) TestLocalActivityCancelFromWorkerShutdown() {
7735+
ctx, cancel := context.WithCancel(context.Background())
7736+
defer cancel()
7737+
7738+
run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-local-activity-cancel"), ts.workflows.WorkflowReactToCancel, true)
7739+
ts.NoError(err)
7740+
7741+
// Give the workflow time to run and run activity
7742+
time.Sleep(100 * time.Millisecond)
7743+
ts.worker.Stop()
7744+
ts.workerStopped = true
7745+
// Now create a new worker on that same task queue to resume the work of the
7746+
// activity retry
7747+
nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{})
7748+
ts.registerWorkflowsAndActivities(nextWorker)
7749+
ts.NoError(nextWorker.Start())
7750+
defer nextWorker.Stop()
7751+
7752+
err = run.Get(ctx, nil)
7753+
ts.NoError(err)
7754+
7755+
timeout_count := 0
7756+
iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), true, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
7757+
for iter.HasNext() {
7758+
event, err := iter.Next()
7759+
if err != nil {
7760+
break
7761+
}
7762+
7763+
// WFT timeout should come from first worker stopping and LA being canceled
7764+
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT {
7765+
timeout_count++
7766+
}
7767+
}
7768+
7769+
ts.Equal(1, timeout_count)
7770+
}
7771+
77127772
func (ts *IntegrationTestSuite) TestLocalActivityWorkerShutdownNoHeartbeat() {
77137773
// FYI, setup of this test allows the worker to wait to stop for 10 seconds
77147774
ctx, cancel := context.WithCancel(context.Background())

test/workflow_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3500,6 +3500,35 @@ func (w *Workflows) WorkflowRawValue(ctx workflow.Context, value converter.RawVa
35003500
return returnVal, err
35013501
}
35023502

3503+
func (w *Workflows) WorkflowReactToCancel(ctx workflow.Context, localActivity bool) error {
3504+
var activities *Activities
3505+
var err error
3506+
// Allow for 2 attempts so when a worker shuts down and a 2nd one is created,
3507+
// it can use the 2nd attempt to complete the activity.
3508+
retryPolicy := temporal.RetryPolicy{
3509+
MaximumAttempts: 2,
3510+
}
3511+
3512+
if localActivity {
3513+
ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
3514+
ScheduleToCloseTimeout: 2 * time.Second,
3515+
RetryPolicy: &retryPolicy,
3516+
})
3517+
err = workflow.ExecuteLocalActivity(ctx, activities.CancelActivity).Get(ctx, nil)
3518+
} else {
3519+
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
3520+
ScheduleToCloseTimeout: 2 * time.Second,
3521+
RetryPolicy: &retryPolicy,
3522+
})
3523+
err = workflow.ExecuteActivity(ctx, activities.CancelActivity).Get(ctx, nil)
3524+
}
3525+
3526+
if err != nil {
3527+
return err
3528+
}
3529+
return nil
3530+
}
3531+
35033532
func (w *Workflows) register(worker worker.Worker) {
35043533
worker.RegisterWorkflow(w.ActivityCancelRepro)
35053534
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
@@ -3648,6 +3677,7 @@ func (w *Workflows) register(worker worker.Worker) {
36483677
worker.RegisterWorkflow(w.WorkflowClientFromActivity)
36493678
worker.RegisterWorkflow(w.WorkflowTemporalPrefixSignal)
36503679
worker.RegisterWorkflow(w.WorkflowRawValue)
3680+
worker.RegisterWorkflow(w.WorkflowReactToCancel)
36513681
}
36523682

36533683
func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {

0 commit comments

Comments
 (0)