Skip to content

Commit d9d9e5c

Browse files
ParidelPooya and Pooya Paridel authored
feat: add running operations tracking to prevent unsafe termination (#10)
## Running Operations Tracking

### **What We Built:**

1. hasRunningOperations() method - Clean API to check if critical operations are running
2. Internal tracking system - addRunningOperation() / removeRunningOperation() with Set-based storage
3. Automatic cleanup - try/finally blocks ensure operations are removed even on errors

### **Where We Added Tracking (Only 2 Operations):**

✅ ctx.step:
• **Executes user code**
• **Critical for workflow state** - if interrupted, workflow state becomes inconsistent

✅ ctx.waitForCondition:
• **Executes user condition checks**
• **Critical for workflow logic** - if interrupted, condition evaluation becomes unreliable

### **Why NOT Other Operations:**

❌ ctx.runInChildContext - Safe to interrupt, just context management
❌ ctx.wait - Simple timer, no user code execution
❌ ctx.createCallback - Just creates callback config, no execution
❌ ctx.waitForCallback - Uses ctx.step internally, so already tracked automatically
❌ ctx.map/parallel/executeConcurrently - Use ctx.runInChildContext internally, which is safe to interrupt

### **Key Insight:**

We only needed to add tracking to the 2 fundamental operations that execute user code:
• ctx.step - Direct user code execution
• ctx.waitForCondition - User condition evaluation

All other operations either:
• Are safe to interrupt (context/config operations)
• Delegate to tracked operations (so get protection automatically)

### **The Result:**

```typescript
// Safe termination check
if (durableContext.hasRunningOperations()) {
  return; // Critical user code is running - wait
}
terminate(); // Safe to terminate
```

Surgical precision - Only the 2 operations that actually execute user code are tracked! 🎯

Co-authored-by: Pooya Paridel <parpooya@amazon.com>
1 parent 62c11d6 commit d9d9e5c

File tree

7 files changed

+65
-6
lines changed

7 files changed

+65
-6
lines changed

packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,21 @@ describe("Durable Context", () => {
138138
mockParentContext,
139139
expect.any(Function), // createStepId
140140
expect.any(Function), // createContextLogger
141+
expect.any(Function), // addRunningOperation
142+
expect.any(Function), // removeRunningOperation
141143
);
142144
expect(mockStepHandler).toHaveBeenCalledWith("test-step", stepFn, options);
143145
});
144146

147+
test("should have hasRunningOperations method that returns false initially", () => {
148+
const durableContext = createDurableContext(
149+
mockExecutionContext,
150+
mockParentContext,
151+
);
152+
153+
expect(durableContext.hasRunningOperations()).toBe(false);
154+
});
155+
145156
test("should call block handler when runInChildContext method is invoked", () => {
146157
const durableContext = createDurableContext(
147158
mockExecutionContext,

packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,27 @@ export const createDurableContext = (
5353
);
5454

5555
let stepCounter = 0;
56+
const runningOperations = new Set<string>();
5657
const checkpoint = createCheckpoint(executionContext, checkpointToken || "");
5758

5859
const createStepId = (): string => {
5960
stepCounter++;
6061
return stepPrefix ? `${stepPrefix}-${stepCounter}` : `${stepCounter}`;
6162
};
6263

64+
// Internal helpers for managing running operations
65+
const addRunningOperation = (stepId: string): void => {
66+
runningOperations.add(stepId);
67+
};
68+
69+
const removeRunningOperation = (stepId: string): void => {
70+
runningOperations.delete(stepId);
71+
};
72+
73+
const hasRunningOperations = (): boolean => {
74+
return runningOperations.size > 0;
75+
};
76+
6377
const step: DurableContext["step"] = <T>(
6478
nameOrFn: string | undefined | StepFunc<T>,
6579
fnOrOptions?: StepFunc<T> | StepConfig<T>,
@@ -71,6 +85,8 @@ export const createDurableContext = (
7185
parentContext,
7286
createStepId,
7387
createContextLogger,
88+
addRunningOperation,
89+
removeRunningOperation,
7490
);
7591
return stepHandler(nameOrFn, fnOrOptions, maybeOptions);
7692
};
@@ -177,6 +193,7 @@ export const createDurableContext = (
177193
...parentContext,
178194
_stepPrefix: stepPrefix,
179195
_stepCounter: stepCounter,
196+
hasRunningOperations,
180197
step,
181198
runInChildContext,
182199
wait: createWaitHandler(executionContext, checkpoint, createStepId),
@@ -185,6 +202,8 @@ export const createDurableContext = (
185202
checkpoint,
186203
createStepId,
187204
createContextLogger,
205+
addRunningOperation,
206+
removeRunningOperation,
188207
),
189208
createCallback,
190209
waitForCallback,

packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ describe("Step Handler", () => {
8383
mockParentContext,
8484
createStepId,
8585
createMockEnrichedLogger,
86+
jest.fn(), // addRunningOperation
87+
jest.fn(), // removeRunningOperation
8688
);
8789

8890
// Reset the mock for retryPresets.default

packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ export const createStepHandler = (
4848
parentContext: Context,
4949
createStepId: () => string,
5050
createContextLogger: (stepId: string, attempt?: number) => Logger,
51+
addRunningOperation: (stepId: string) => void,
52+
removeRunningOperation: (stepId: string) => void,
5153
) => {
5254
return async <T>(
5355
nameOrFn: string | undefined | StepFunc<T>,
@@ -166,6 +168,8 @@ export const createStepHandler = (
166168
name,
167169
fn,
168170
createContextLogger,
171+
addRunningOperation,
172+
removeRunningOperation,
169173
options,
170174
);
171175
};
@@ -205,6 +209,8 @@ export const executeStep = async <T>(
205209
name: string | undefined,
206210
fn: StepFunc<T>,
207211
createContextLogger: (stepId: string, attempt?: number) => Logger,
212+
addRunningOperation: (stepId: string) => void,
213+
removeRunningOperation: (stepId: string) => void,
208214
options?: StepConfig<T>,
209215
): Promise<T> => {
210216
// Determine step semantics (default to AT_LEAST_ONCE_PER_RETRY if not specified)
@@ -248,9 +254,15 @@ export const executeStep = async <T>(
248254
};
249255

250256
// Execute the step function with stepContext
251-
const result = await OperationInterceptor.forExecution(
252-
context.durableExecutionArn,
253-
).execute(name, () => fn(stepContext));
257+
addRunningOperation(stepId);
258+
let result: T;
259+
try {
260+
result = await OperationInterceptor.forExecution(
261+
context.durableExecutionArn,
262+
).execute(name, () => fn(stepContext));
263+
} finally {
264+
removeRunningOperation(stepId);
265+
}
254266

255267
// Serialize the result for consistency
256268
const serializedResult = await safeSerialize(

packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ describe("WaitForCondition Handler", () => {
8181
mockCheckpoint,
8282
createStepId,
8383
createMockEnrichedLogger,
84+
jest.fn(), // addRunningOperation
85+
jest.fn(), // removeRunningOperation
8486
);
8587
});
8688

packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ export const createWaitForConditionHandler = (
4343
checkpoint: ReturnType<typeof createCheckpoint>,
4444
createStepId: () => string,
4545
createContextLogger: (stepId: string, attempt?: number) => Logger,
46+
addRunningOperation: (stepId: string) => void,
47+
removeRunningOperation: (stepId: string) => void,
4648
) => {
4749
return async <T>(
4850
nameOrCheck: string | undefined | WaitForConditionCheckFunc<T>,
@@ -111,6 +113,8 @@ export const createWaitForConditionHandler = (
111113
check,
112114
config,
113115
createContextLogger,
116+
addRunningOperation,
117+
removeRunningOperation,
114118
);
115119
};
116120
};
@@ -150,6 +154,8 @@ export const executeWaitForCondition = async <T>(
150154
check: WaitForConditionCheckFunc<T>,
151155
config: WaitForConditionConfig<T>,
152156
createContextLogger: (stepId: string, attempt?: number) => Logger,
157+
addRunningOperation: (stepId: string) => void,
158+
removeRunningOperation: (stepId: string) => void,
153159
): Promise<T> => {
154160
const serdes = config.serdes || defaultSerdes;
155161

@@ -215,9 +221,15 @@ export const executeWaitForCondition = async <T>(
215221
};
216222

217223
// Execute the check function
218-
const newState = await OperationInterceptor.forExecution(
219-
context.durableExecutionArn,
220-
).execute(name, () => check(currentState, waitForConditionContext));
224+
addRunningOperation(stepId);
225+
let newState: T;
226+
try {
227+
newState = await OperationInterceptor.forExecution(
228+
context.durableExecutionArn,
229+
).execute(name, () => check(currentState, waitForConditionContext));
230+
} finally {
231+
removeRunningOperation(stepId);
232+
}
221233

222234
// Serialize the new state for consistency
223235
const serializedState = await safeSerialize(

packages/lambda-durable-functions-sdk-js/src/types/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ export type DurableExecutionInvocationOutput =
8787
export interface DurableContext extends Context {
8888
_stepPrefix?: string;
8989
_stepCounter: number;
90+
hasRunningOperations(): boolean;
9091
step: <T>(
9192
nameOrFn: string | undefined | StepFunc<T>,
9293
fnOrOptions?: StepFunc<T> | StepConfig<T>,

0 commit comments

Comments
 (0)