Skip to content

Commit 48b54ff

Browse files
ParidelPooyaPooya Paridel
andauthored
Fix/avoid redundant checkpoints (#8)
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. ### Issue Link, if available SIM ticket: DAR-SDK-242 ### Description - Skip START checkpoint when operation status is already STARTED to prevent redundant checkpointing on retries - Add READY status support for state restoration in wait-for-condition handler to maintain state continuity across timer completions - Replace hardcoded Action strings with OperationAction constants (START, SUCCEED, RETRY) for type safety - Add test for PENDING status scenarios in both step-handler and wait-for-condition-handler ### Checklist - [Y] I have filled out every section of the PR template - [Y] I have thoroughly tested this change ### Testing #### Unit Tests Yes Co-authored-by: Pooya Paridel <parpooya@amazon.com>
1 parent 9801dc7 commit 48b54ff

File tree

8 files changed

+220
-66
lines changed

8 files changed

+220
-66
lines changed

packages/lambda-durable-functions-sdk-js/bundle-size-history.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,5 +188,10 @@
188188
"timestamp": "2025-09-03T22:20:22.674Z",
189189
"size": 213216,
190190
"gitCommit": "c0034633240ceb29e00fd34c67f0d33cfe0a6714"
191+
},
192+
{
193+
"timestamp": "2025-09-04T17:42:40.403Z",
194+
"size": 214426,
195+
"gitCommit": "9801dc79f2a4a980f971e2de1b85aa23b4c7a579"
191196
}
192-
]
197+
]

packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -815,13 +815,13 @@ describe("Mock Integration", () => {
815815
test("should use custom summaryGenerator for large payloads", async () => {
816816
const largePayload = { data: "x".repeat(300000) };
817817
const childFn = jest.fn().mockResolvedValue(largePayload);
818-
const summaryGenerator = jest.fn().mockReturnValue("Custom summary of large data");
818+
const summaryGenerator = jest
819+
.fn()
820+
.mockReturnValue("Custom summary of large data");
819821

820-
await runInChildContextHandler(
821-
TEST_CONSTANTS.CHILD_CONTEXT_NAME,
822-
childFn,
823-
{ summaryGenerator },
824-
);
822+
await runInChildContextHandler(TEST_CONSTANTS.CHILD_CONTEXT_NAME, childFn, {
823+
summaryGenerator,
824+
});
825825

826826
expect(summaryGenerator).toHaveBeenCalledWith(largePayload);
827827
expect(mockCheckpoint).toHaveBeenNthCalledWith(

packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,20 +181,20 @@ export const executeChildContext = async <T>(
181181
// Check if payload is too large for adaptive mode
182182
let payloadToCheckpoint = serializedResult;
183183
let replayChildren = false;
184-
184+
185185
if (
186186
serializedResult &&
187187
Buffer.byteLength(serializedResult, "utf8") > CHECKPOINT_SIZE_LIMIT
188188
) {
189189
replayChildren = true;
190-
190+
191191
// Use summary generator if provided, otherwise use empty string
192192
if (options?.summaryGenerator) {
193193
payloadToCheckpoint = options.summaryGenerator(result);
194194
} else {
195195
payloadToCheckpoint = "";
196196
}
197-
197+
198198
log(
199199
context.isVerbose,
200200
"📦",

packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ describe("Step Handler", () => {
206206

207207
// Verify terminate was called
208208
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
209-
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
209+
reason: TerminationReason.RETRY_SCHEDULED,
210210
message: expect.stringContaining("test-step"),
211211
});
212212
}, 10000);
@@ -553,7 +553,7 @@ describe("Step Handler", () => {
553553

554554
// Verify terminate was called
555555
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
556-
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
556+
reason: TerminationReason.RETRY_SCHEDULED,
557557
message: expect.stringContaining("test-step"),
558558
});
559559
}, 10000);
@@ -652,11 +652,41 @@ describe("Step Handler", () => {
652652

653653
// Verify terminate was called with stepId in the message
654654
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
655-
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
656-
message: "Retry scheduled for interrupted step test-step-id",
655+
reason: TerminationReason.RETRY_SCHEDULED,
656+
message: "Retry scheduled for test-step-id",
657657
});
658658
});
659659

660+
test("should wait for timer when status is PENDING", async () => {
661+
const stepId = "test-step-id";
662+
const hashedStepId = hashId(stepId);
663+
mockExecutionContext._stepData = {
664+
[hashedStepId]: {
665+
Id: hashedStepId,
666+
Status: OperationStatus.PENDING,
667+
},
668+
};
669+
670+
const stepFunction = jest.fn().mockResolvedValue("result");
671+
672+
const promise = stepHandler(stepId, stepFunction);
673+
674+
// Should terminate with retry scheduled message
675+
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
676+
reason: TerminationReason.RETRY_SCHEDULED,
677+
message: "Retry scheduled for test-step-id",
678+
});
679+
680+
// Should return never-resolving promise
681+
let resolved = false;
682+
promise.then(() => {
683+
resolved = true;
684+
});
685+
686+
await new Promise((resolve) => setTimeout(resolve, 50));
687+
expect(resolved).toBe(false);
688+
});
689+
660690
test("should handle missing attemptCount for interrupted step", async () => {
661691
// Set up a step that was started but not completed and has no attempt
662692
const stepId = "test-step-id";

packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,20 @@ import { OperationInterceptor } from "../../mocks/operation-interceptor";
2828
import { createErrorObjectFromError } from "../../utils/error-object/error-object";
2929
import { createStructuredLogger } from "../../utils/logger/structured-logger";
3030

31+
const waitForTimer = <T>(
32+
context: ExecutionContext,
33+
stepId: string,
34+
name: string | undefined,
35+
): Promise<T> => {
36+
// TODO: Current implementation assumes sequential operations only
37+
// Will be enhanced to handle concurrent operations in future milestone
38+
context.terminationManager.terminate({
39+
reason: TerminationReason.RETRY_SCHEDULED,
40+
message: `Retry scheduled for ${name || stepId}`,
41+
});
42+
return new Promise<T>(() => { });
43+
};
44+
3145
export const createStepHandler = (
3246
context: ExecutionContext,
3347
checkpoint: ReturnType<typeof createCheckpoint>,
@@ -72,6 +86,11 @@ export const createStepHandler = (
7286
throw new Error(errorMessage || "Unknown error");
7387
}
7488

89+
// If PENDING, wait for timer to complete
90+
if (stepData?.Status === OperationStatus.PENDING) {
91+
return waitForTimer(context, stepId, name);
92+
}
93+
7594
// Check for interrupted step with AT_MOST_ONCE_PER_RETRY semantics
7695
if (stepData?.Status === OperationStatus.STARTED) {
7796
const semantics = options?.semantics || StepSemantics.AtLeastOncePerRetry;
@@ -130,17 +149,15 @@ export const createStepHandler = (
130149
},
131150
});
132151

133-
context.terminationManager.terminate({
134-
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
135-
message: `Retry scheduled for interrupted step ${name || stepId}`,
136-
});
137-
138-
// Return a never-resolving promise to ensure the execution doesn't continue
139-
return new Promise<T>(() => {});
152+
return waitForTimer(context, stepId, name);
140153
}
141154
}
142155
}
143156

157+
// Execute step function for READY, STARTED (AtLeastOncePerRetry), or first time (undefined)
158+
// READY: Timer completed, execute step function
159+
// STARTED: Retry after error (AtLeastOncePerRetry semantics), execute step function
160+
// undefined: First execution, execute step function
144161
return executeStep(context, checkpoint, stepId, name, fn, options);
145162
};
146163
};
@@ -184,27 +201,30 @@ export const executeStep = async <T>(
184201
const semantics = options?.semantics || StepSemantics.AtLeastOncePerRetry;
185202
const serdes = options?.serdes || defaultSerdes;
186203

187-
// Checkpoint at start for both semantics
188-
if (semantics === StepSemantics.AtMostOncePerRetry) {
189-
// Wait for checkpoint to complete
190-
await checkpoint(stepId, {
191-
Id: stepId,
192-
ParentId: context.parentId,
193-
Action: "START",
194-
SubType: OperationSubType.STEP,
195-
Type: OperationType.STEP,
196-
Name: name,
197-
});
198-
} else {
199-
// Fire and forget for AtLeastOncePerRetry
200-
checkpoint(stepId, {
201-
Id: stepId,
202-
ParentId: context.parentId,
203-
Action: "START",
204-
SubType: OperationSubType.STEP,
205-
Type: OperationType.STEP,
206-
Name: name,
207-
});
204+
// Checkpoint at start for both semantics (only if not already started)
205+
const stepData = context.getStepData(stepId);
206+
if (stepData?.Status !== OperationStatus.STARTED) {
207+
if (semantics === StepSemantics.AtMostOncePerRetry) {
208+
// Wait for checkpoint to complete
209+
await checkpoint(stepId, {
210+
Id: stepId,
211+
ParentId: context.parentId,
212+
Action: OperationAction.START,
213+
SubType: OperationSubType.STEP,
214+
Type: OperationType.STEP,
215+
Name: name,
216+
});
217+
} else {
218+
// Fire and forget for AtLeastOncePerRetry
219+
checkpoint(stepId, {
220+
Id: stepId,
221+
ParentId: context.parentId,
222+
Action: OperationAction.START,
223+
SubType: OperationSubType.STEP,
224+
Type: OperationType.STEP,
225+
Name: name,
226+
});
227+
}
208228
}
209229

210230
try {
@@ -241,7 +261,7 @@ export const executeStep = async <T>(
241261
await checkpoint(stepId, {
242262
Id: stepId,
243263
ParentId: context.parentId,
244-
Action: "SUCCEED",
264+
Action: OperationAction.SUCCEED,
245265
SubType: OperationSubType.STEP,
246266
Type: OperationType.STEP,
247267
Payload: serializedResult,
@@ -289,7 +309,7 @@ export const executeStep = async <T>(
289309
});
290310

291311
// Return a never-resolving promise to ensure the execution doesn't continue
292-
return new Promise<T>(() => {});
312+
return new Promise<T>(() => { });
293313
}
294314

295315
const stepData = context.getStepData(stepId);
@@ -349,14 +369,7 @@ export const executeStep = async <T>(
349369
},
350370
});
351371

352-
context.terminationManager.terminate({
353-
reason: TerminationReason.RETRY_SCHEDULED,
354-
message: `Retry scheduled for ${name || stepId}`,
355-
});
356-
357-
// Return a never-resolving promise to ensure the execution doesn't continue
358-
// This will be handled by Promise.race in withDurableFunctions.ts
359-
return new Promise<T>(() => {});
372+
return waitForTimer(context, stepId, name);
360373
}
361374
}
362375
};

packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ describe("WaitForCondition Handler", () => {
298298

299299
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
300300
reason: TerminationReason.RETRY_SCHEDULED,
301-
message: "waitForCondition step-1 will retry in 30 seconds",
301+
message: "Retry scheduled for step-1",
302302
});
303303
});
304304
});
@@ -343,6 +343,45 @@ describe("WaitForCondition Handler", () => {
343343
expect(mockExecutionRunner.execute).toHaveBeenCalled();
344344
});
345345

346+
it("should restore state from valid checkpoint data when status is READY", async () => {
347+
const stepId = "step-1";
348+
const hashedStepId = hashId(stepId);
349+
mockExecutionContext._stepData[hashedStepId] = {
350+
Id: hashedStepId,
351+
Status: OperationStatus.READY,
352+
StepDetails: {
353+
Result: '"previous-state"', // Just the serialized state, not wrapped
354+
Attempt: 2, // System-provided attempt number
355+
},
356+
} as any;
357+
358+
const checkFunc: WaitForConditionCheckFunc<string> = jest
359+
.fn()
360+
.mockResolvedValue("ready");
361+
const config: WaitForConditionConfig<string> = {
362+
waitStrategy: (state, attempt) => {
363+
expect(state).toBe("ready");
364+
expect(attempt).toBe(2); // Should use attempt from system
365+
return { shouldContinue: false };
366+
},
367+
initialState: "initial",
368+
};
369+
370+
// Mock the execution to call the check function with the restored state
371+
mockExecutionRunner.execute.mockImplementation(
372+
async (name: any, fn: any) => {
373+
const result = await fn();
374+
return result;
375+
},
376+
);
377+
378+
const result = await waitForConditionHandler(checkFunc, config);
379+
380+
expect(result).toBe("ready");
381+
// Verify the execution runner was called
382+
expect(mockExecutionRunner.execute).toHaveBeenCalled();
383+
});
384+
346385
it("should use initial state when checkpoint data is invalid JSON", async () => {
347386
const stepId = "step-1";
348387
const hashedStepId = hashId(stepId);
@@ -482,12 +521,46 @@ describe("WaitForCondition Handler", () => {
482521

483522
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
484523
reason: TerminationReason.RETRY_SCHEDULED,
485-
message: "waitForCondition step-1 will retry in 30 seconds",
524+
message: "Retry scheduled for step-1",
486525
});
487526

488527
// Verify that the promise is indeed never-resolving by checking its constructor
489528
expect(promise).toBeInstanceOf(Promise);
490529
});
530+
531+
it("should wait for timer when status is PENDING", async () => {
532+
const stepId = "step-1";
533+
const hashedStepId = hashId(stepId);
534+
mockExecutionContext._stepData[hashedStepId] = {
535+
Id: hashedStepId,
536+
Status: OperationStatus.PENDING,
537+
} as any;
538+
539+
const checkFunc: WaitForConditionCheckFunc<string> = jest
540+
.fn()
541+
.mockResolvedValue("ready");
542+
const config: WaitForConditionConfig<string> = {
543+
waitStrategy: () => ({ shouldContinue: false }),
544+
initialState: "initial",
545+
};
546+
547+
const promise = waitForConditionHandler(checkFunc, config);
548+
549+
// Should terminate with retry scheduled message
550+
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
551+
reason: TerminationReason.RETRY_SCHEDULED,
552+
message: "Retry scheduled for step-1",
553+
});
554+
555+
// Should return never-resolving promise
556+
let resolved = false;
557+
promise.then(() => {
558+
resolved = true;
559+
});
560+
561+
await new Promise((resolve) => setTimeout(resolve, 50));
562+
expect(resolved).toBe(false);
563+
});
491564
});
492565

493566
describe("Error handling", () => {

0 commit comments

Comments
 (0)