From fc47242a806cfaecdd5aa24d6b939c6a914d9259 Mon Sep 17 00:00:00 2001 From: Pooya Paridel Date: Fri, 5 Sep 2025 21:56:26 -0700 Subject: [PATCH] feat: add running operations tracking to prevent unsafe termination - Add hasRunningOperations() method to DurableContext interface - Track critical operations (ctx.step and ctx.waitForCondition) with try/finally cleanup - Prevent termination during user code execution to avoid workflow state corruption - Add unit test coverage for hasRunningOperations method This enables safe termination checks: if (durableContext.hasRunningOperations()) return; // Wait for critical ops terminate(); // Safe to terminate --- .../durable-context/durable-context.test.ts | 11 +++++++++++ .../durable-context/durable-context.ts | 19 +++++++++++++++++++ .../step-handler/step-handler.test.ts | 2 ++ .../src/handlers/step-handler/step-handler.ts | 18 +++++++++++++++--- .../wait-for-condition-handler.test.ts | 2 ++ .../wait-for-condition-handler.ts | 18 +++++++++++++++--- .../src/types/index.ts | 1 + 7 files changed, 65 insertions(+), 6 deletions(-) diff --git a/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts b/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts index 23d581ad..1f475298 100644 --- a/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts +++ b/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts @@ -138,10 +138,21 @@ describe("Durable Context", () => { mockParentContext, expect.any(Function), // createStepId expect.any(Function), // createContextLogger + expect.any(Function), // addRunningOperation + expect.any(Function), // removeRunningOperation ); expect(mockStepHandler).toHaveBeenCalledWith("test-step", stepFn, options); }); + test("should have hasRunningOperations method that returns false initially", () => { + const durableContext = createDurableContext( + mockExecutionContext, + mockParentContext, + ); + + expect(durableContext.hasRunningOperations()).toBe(false); + }); + test("should call block handler when runInChildContext method is invoked", () => { const durableContext = createDurableContext( mockExecutionContext, diff --git a/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts b/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts index d6ba43ea..d76107c1 100644 --- a/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts +++ b/packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts @@ -53,6 +53,7 @@ export const createDurableContext = ( ); let stepCounter = 0; + const runningOperations = new Set(); const checkpoint = createCheckpoint(executionContext, checkpointToken || ""); const createStepId = (): string => { @@ -60,6 +61,19 @@ export const createDurableContext = ( return stepPrefix ? `${stepPrefix}-${stepCounter}` : `${stepCounter}`; }; + // Internal helpers for managing running operations + const addRunningOperation = (stepId: string): void => { + runningOperations.add(stepId); + }; + + const removeRunningOperation = (stepId: string): void => { + runningOperations.delete(stepId); + }; + + const hasRunningOperations = (): boolean => { + return runningOperations.size > 0; + }; + const step: DurableContext["step"] = ( nameOrFn: string | undefined | StepFunc, fnOrOptions?: StepFunc | StepConfig, @@ -71,6 +85,8 @@ export const createDurableContext = ( parentContext, createStepId, createContextLogger, + addRunningOperation, + removeRunningOperation, ); return stepHandler(nameOrFn, fnOrOptions, maybeOptions); }; @@ -177,6 +193,7 @@ export const createDurableContext = ( ...parentContext, _stepPrefix: stepPrefix, _stepCounter: stepCounter, + hasRunningOperations, step, runInChildContext, wait: createWaitHandler(executionContext, checkpoint, createStepId), @@ -185,6 +202,8 @@ export const createDurableContext = ( checkpoint, createStepId, createContextLogger, + addRunningOperation, + removeRunningOperation, ), createCallback, waitForCallback, diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts index de82494c..294ae571 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts @@ -83,6 +83,8 @@ describe("Step Handler", () => { mockParentContext, createStepId, createMockEnrichedLogger, + jest.fn(), // addRunningOperation + jest.fn(), // removeRunningOperation ); // Reset the mock for retryPresets.default diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts index 67f60c36..7e04a014 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts @@ -48,6 +48,8 @@ export const createStepHandler = ( parentContext: Context, createStepId: () => string, createContextLogger: (stepId: string, attempt?: number) => Logger, + addRunningOperation: (stepId: string) => void, + removeRunningOperation: (stepId: string) => void, ) => { return async ( nameOrFn: string | undefined | StepFunc, @@ -166,6 +168,8 @@ export const createStepHandler = ( name, fn, createContextLogger, + addRunningOperation, + removeRunningOperation, options, ); }; @@ -205,6 +209,8 @@ export const executeStep = async ( name: string | undefined, fn: StepFunc, createContextLogger: (stepId: string, attempt?: number) => Logger, + addRunningOperation: (stepId: string) => void, + removeRunningOperation: (stepId: string) => void, options?: StepConfig, ): Promise => { // Determine step semantics (default to AT_LEAST_ONCE_PER_RETRY if not specified) @@ -248,9 +254,15 @@ export const executeStep = async ( }; // Execute the step function with stepContext - const result = await OperationInterceptor.forExecution( - context.durableExecutionArn, - ).execute(name, () => fn(stepContext)); + addRunningOperation(stepId); + let result: T; + try { + result = await OperationInterceptor.forExecution( + context.durableExecutionArn, + ).execute(name, () => fn(stepContext)); + } finally { + removeRunningOperation(stepId); + } // Serialize the result for consistency const serializedResult = await safeSerialize( diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts index ea361fbb..03199c52 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts @@ -81,6 +81,8 @@ describe("WaitForCondition Handler", () => { mockCheckpoint, createStepId, createMockEnrichedLogger, + jest.fn(), // addRunningOperation + jest.fn(), // removeRunningOperation ); }); diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts index 2a461b14..c1a0ccf4 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts @@ -43,6 +43,8 @@ export const createWaitForConditionHandler = ( checkpoint: ReturnType, createStepId: () => string, createContextLogger: (stepId: string, attempt?: number) => Logger, + addRunningOperation: (stepId: string) => void, + removeRunningOperation: (stepId: string) => void, ) => { return async ( nameOrCheck: string | undefined | WaitForConditionCheckFunc, @@ -111,6 +113,8 @@ export const createWaitForConditionHandler = ( check, config, createContextLogger, + addRunningOperation, + removeRunningOperation, ); }; }; @@ -150,6 +154,8 @@ export const executeWaitForCondition = async ( check: WaitForConditionCheckFunc, config: WaitForConditionConfig, createContextLogger: (stepId: string, attempt?: number) => Logger, + addRunningOperation: (stepId: string) => void, + removeRunningOperation: (stepId: string) => void, ): Promise => { const serdes = config.serdes || defaultSerdes; @@ -215,9 +221,15 @@ export const executeWaitForCondition = async ( }; // Execute the check function - const newState = await OperationInterceptor.forExecution( - context.durableExecutionArn, - ).execute(name, () => check(currentState, waitForConditionContext)); + addRunningOperation(stepId); + let newState: T; + try { + newState = await OperationInterceptor.forExecution( + context.durableExecutionArn, + ).execute(name, () => check(currentState, waitForConditionContext)); + } finally { + removeRunningOperation(stepId); + } // Serialize the new state for consistency const serializedState = await safeSerialize( diff --git a/packages/lambda-durable-functions-sdk-js/src/types/index.ts b/packages/lambda-durable-functions-sdk-js/src/types/index.ts index d6cf74be..ae519c9a 100644 --- a/packages/lambda-durable-functions-sdk-js/src/types/index.ts +++ b/packages/lambda-durable-functions-sdk-js/src/types/index.ts @@ -87,6 +87,7 @@ export type DurableExecutionInvocationOutput = export interface DurableContext extends Context { _stepPrefix?: string; _stepCounter: number; + hasRunningOperations(): boolean; step: ( nameOrFn: string | undefined | StepFunc, fnOrOptions?: StepFunc | StepConfig,