Skip to content

Commit ac6b3db

Browse files
authored
feat(sdk): add checkpoint queue completion mechanism (#335)
Implement waitForQueueCompletion to ensure all pending checkpoint operations complete before function termination, preventing data loss. Add clearQueue method for graceful termination handling. - Add waitForQueueCompletion() method to CheckpointManager - Add clearQueue() public method to CheckpointManager - Integrate queue completion in withDurableExecution with 10s timeout - Clear queue on termination without waiting - Update context validation tests to reflect new behavior - Add unit tests for queue completion functionality
1 parent befa81d commit ac6b3db

File tree

10 files changed

+402
-21
lines changed

10 files changed

+402
-21
lines changed

packages/aws-durable-execution-sdk-js-examples/src/examples/context-validation/parent-context-in-child/parent-context-in-child.test.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ createTests({
2222

2323
const operations = execution.getOperations();
2424

25-
// Based on actual behavior: no operations are created when validation fails immediately
26-
expect(operations).toHaveLength(0);
25+
// The child-context operation is now properly persisted before error occurs,
26+
// thanks to waitForQueueCompletion. This ensures the checkpoint is saved
27+
// regardless of the events that happens after that
28+
expect(operations).toHaveLength(1);
29+
expect(operations[0].getName()).toBe("child-context");
30+
expect(operations[0].getStatus()).toBe("STARTED");
2731
});
2832
},
2933
});

packages/aws-durable-execution-sdk-js-examples/src/examples/context-validation/parent-context-in-wait-condition/parent-context-in-wait-condition.test.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,22 @@ createTests({
2626

2727
const operations = execution.getOperations();
2828

29-
// Note: runInChildContext doesn't await the checkpoint for STARTING the context.
30-
// If termination happens very fast (like in this case where validation fails immediately),
31-
// the child-context operation may not be persisted to the checkpoint at all.
32-
// This is expected behavior - the validation catches the error before any operations complete.
29+
// With waitForQueueCompletion, operations that start are now properly persisted
30+
// even when validation fails later, which is the correct behavior
3331

34-
// The child-context operation may or may not exist depending on checkpoint timing
32+
// The child-context operation should exist and be in STARTED state
3533
const childContextOp = operations.find(
3634
(op: any) => op.getName() === "child-context",
3735
);
38-
// If it exists, it should be in STARTED state
39-
if (childContextOp) {
40-
expect(childContextOp.getStatus()).toBe(OperationStatus.STARTED);
41-
}
36+
expect(childContextOp).toBeDefined();
37+
expect(childContextOp!.getStatus()).toBe(OperationStatus.STARTED);
4238

43-
// The wrong wait condition should NOT exist - validation prevented it
39+
// The wrong wait condition should exist since it started before validation failed
4440
const wrongWaitOp = operations.find(
4541
(op: any) => op.getName() === "wrong-wait-condition",
4642
);
47-
expect(wrongWaitOp).toBeUndefined();
43+
expect(wrongWaitOp).toBeDefined();
44+
expect(wrongWaitOp!.getStatus()).toBe(OperationStatus.STARTED);
4845

4946
// The nested wrong step should NOT exist anywhere
5047
// 1. Should not exist at root level
@@ -66,8 +63,8 @@ createTests({
6663
);
6764
expect(shouldNotStartWait).toBeUndefined();
6865

69-
// Should have at most the child-context operation (0 or 1 operations)
70-
expect(operations.length).toBeLessThanOrEqual(1);
66+
// Should have exactly 2 operations: child-context and wrong-wait-condition
67+
expect(operations.length).toBe(2);
7168
});
7269
},
7370
});

packages/aws-durable-execution-sdk-js/src/testing/create-test-durable-context.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ export function createTestDurableContext(options?: {
108108
force: jest.fn().mockResolvedValue(undefined),
109109
setTerminating: jest.fn(),
110110
hasPendingAncestorCompletion: jest.fn().mockReturnValue(false),
111+
waitForQueueCompletion: jest.fn().mockResolvedValue(undefined),
111112
};
112113

113114
const mockDurableExecution = {

packages/aws-durable-execution-sdk-js/src/testing/mock-checkpoint.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export interface CheckpointFunction extends Checkpoint {
77
force(): Promise<void>;
88
setTerminating(): void;
99
hasPendingAncestorCompletion(stepId: string): boolean;
10+
waitForQueueCompletion(): Promise<void>;
1011
}
1112

1213
export const createMockCheckpoint = (
@@ -18,12 +19,13 @@ export const createMockCheckpoint = (
1819
const mockFn = jest.fn(
1920
mockImplementation || jest.fn().mockResolvedValue(undefined),
2021
);
21-
22+
2223
const mockCheckpoint = Object.assign(mockFn, {
2324
checkpoint: mockFn, // Same function so calls are tracked together
2425
force: jest.fn().mockResolvedValue(undefined),
2526
setTerminating: jest.fn(),
2627
hasPendingAncestorCompletion: jest.fn().mockReturnValue(false),
28+
waitForQueueCompletion: jest.fn().mockResolvedValue(undefined),
2729
}) as jest.MockedFunction<CheckpointFunction>;
2830

2931
return mockCheckpoint;

packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-helper.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ export interface Checkpoint {
66
force?(): Promise<void>;
77
setTerminating?(): void;
88
hasPendingAncestorCompletion?(stepId: string): boolean;
9+
waitForQueueCompletion(): Promise<void>;
910
}

packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ export class CheckpointManager implements Checkpoint {
4040
resolve: () => void;
4141
reject: (error: Error) => void;
4242
}> = [];
43+
private queueCompletionResolver: (() => void) | null = null;
44+
private queueCompletionTimeout: NodeJS.Timeout | null = null;
4345
private readonly MAX_PAYLOAD_SIZE = 750 * 1024; // 750KB in bytes
4446
private isTerminating = false;
4547
private static textEncoder = new TextEncoder();
@@ -98,6 +100,33 @@ export class CheckpointManager implements Checkpoint {
98100
});
99101
}
100102

103+
async waitForQueueCompletion(): Promise<void> {
104+
if (this.queue.length === 0 && !this.isProcessing) {
105+
return;
106+
}
107+
108+
return new Promise<void>((resolve, reject) => {
109+
this.queueCompletionResolver = resolve;
110+
111+
// Set a timeout to prevent infinite waiting
112+
this.queueCompletionTimeout = setTimeout(() => {
113+
this.queueCompletionResolver = null;
114+
this.queueCompletionTimeout = null;
115+
// Clear the queue since it's taking too long
116+
this.clearQueue();
117+
reject(new Error("Timeout waiting for checkpoint queue completion"));
118+
}, 3000); // 3 second timeout
119+
});
120+
}
121+
122+
public clearQueue(): void {
123+
// Silently clear queue - we're terminating so no need to reject promises
124+
this.queue = [];
125+
this.forceCheckpointPromises = [];
126+
// Resolve any waiting queue completion promises since we're clearing
127+
this.notifyQueueCompletion();
128+
}
129+
101130
// Alias for backward compatibility with Checkpoint interface
102131
async force(): Promise<void> {
103132
return this.forceCheckpoint();
@@ -321,6 +350,9 @@ export class CheckpointManager implements Checkpoint {
321350

322351
const checkpointError = this.classifyCheckpointError(error);
323352

353+
// Clear remaining queue silently - we're terminating
354+
this.clearQueue();
355+
324356
this.terminationManager.terminate({
325357
reason: TerminationReason.CHECKPOINT_FAILED,
326358
message: checkpointError.message,
@@ -333,10 +365,24 @@ export class CheckpointManager implements Checkpoint {
333365
setImmediate(() => {
334366
this.processQueue();
335367
});
368+
} else {
369+
// Queue is empty and processing is done - notify all waiting promises
370+
this.notifyQueueCompletion();
336371
}
337372
}
338373
}
339374

375+
private notifyQueueCompletion(): void {
376+
if (this.queueCompletionResolver) {
377+
if (this.queueCompletionTimeout) {
378+
clearTimeout(this.queueCompletionTimeout);
379+
this.queueCompletionTimeout = null;
380+
}
381+
this.queueCompletionResolver();
382+
this.queueCompletionResolver = null;
383+
}
384+
}
385+
340386
private async processBatch(batch: QueuedCheckpoint[]): Promise<void> {
341387
const updates: OperationUpdate[] = batch.map((item) => {
342388
const hashedStepId = hashId(item.stepId);
@@ -368,10 +414,7 @@ export class CheckpointManager implements Checkpoint {
368414
})),
369415
});
370416

371-
const response = await this.storage.checkpoint(
372-
checkpointData,
373-
this.logger,
374-
);
417+
const response = await this.storage.checkpoint(checkpointData, this.logger);
375418

376419
if (response.CheckpointToken) {
377420
this.currentTaskToken = response.CheckpointToken;
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
import { CheckpointManager } from "./checkpoint-manager";
2+
import { createMockExecutionContext } from "../../testing/mock-context";
3+
import { TerminationManager } from "../../termination-manager/termination-manager";
4+
import { EventEmitter } from "events";
5+
import { createDefaultLogger } from "../logger/default-logger";
6+
7+
jest.mock("../../utils/logger/logger", () => ({
8+
log: jest.fn(),
9+
}));
10+
11+
describe("CheckpointManager Queue Completion", () => {
12+
let checkpointManager: CheckpointManager;
13+
let mockContext: any;
14+
let mockTerminationManager: TerminationManager;
15+
let mockEmitter: EventEmitter;
16+
let mockLogger: any;
17+
18+
beforeEach(() => {
19+
mockContext = createMockExecutionContext();
20+
mockTerminationManager = new TerminationManager();
21+
mockEmitter = new EventEmitter();
22+
mockLogger = createDefaultLogger();
23+
24+
checkpointManager = new CheckpointManager(
25+
"test-arn",
26+
mockContext._stepData,
27+
mockContext.state,
28+
mockTerminationManager,
29+
undefined,
30+
"test-token",
31+
mockEmitter,
32+
mockLogger,
33+
new Set<string>(),
34+
);
35+
});
36+
37+
describe("waitForQueueCompletion", () => {
38+
it("should resolve immediately when queue is empty", async () => {
39+
await expect(
40+
checkpointManager.waitForQueueCompletion(),
41+
).resolves.toBeUndefined();
42+
});
43+
44+
it("should wait until queue is empty", async () => {
45+
const mockCheckpoint = jest
46+
.fn()
47+
.mockImplementation(
48+
() => new Promise((resolve) => setTimeout(() => resolve({}), 200)),
49+
);
50+
(checkpointManager as any).storage = { checkpoint: mockCheckpoint };
51+
52+
// Add item to queue
53+
checkpointManager.checkpoint("test-step", {});
54+
55+
// Should wait for queue to empty
56+
await expect(
57+
checkpointManager.waitForQueueCompletion(),
58+
).resolves.toBeUndefined();
59+
expect(mockCheckpoint).toHaveBeenCalled();
60+
});
61+
62+
it("should handle multiple concurrent waits", async () => {
63+
const waits = Promise.all([
64+
checkpointManager.waitForQueueCompletion(),
65+
checkpointManager.waitForQueueCompletion(),
66+
checkpointManager.waitForQueueCompletion(),
67+
]);
68+
69+
await expect(waits).resolves.toEqual([undefined, undefined, undefined]);
70+
});
71+
72+
it("should timeout after 3 seconds if queue doesn't complete", async () => {
73+
jest.useFakeTimers();
74+
75+
const mockCheckpoint = jest.fn().mockImplementation(
76+
() => new Promise(() => {}), // Never resolves
77+
);
78+
(checkpointManager as any).storage = { checkpoint: mockCheckpoint };
79+
80+
// Add item to queue
81+
checkpointManager.checkpoint("test-step", {});
82+
83+
const waitPromise = checkpointManager.waitForQueueCompletion();
84+
85+
// Fast-forward time by 3 seconds
86+
jest.advanceTimersByTime(3000);
87+
88+
await expect(waitPromise).rejects.toThrow(
89+
"Timeout waiting for checkpoint queue completion",
90+
);
91+
92+
jest.useRealTimers();
93+
});
94+
95+
it("should clear queue on timeout", async () => {
96+
jest.useFakeTimers();
97+
98+
const mockCheckpoint = jest.fn().mockImplementation(
99+
() => new Promise(() => {}), // Never resolves
100+
);
101+
(checkpointManager as any).storage = { checkpoint: mockCheckpoint };
102+
103+
// Add items to queue
104+
checkpointManager.checkpoint("test-step-1", {});
105+
checkpointManager.checkpoint("test-step-2", {});
106+
107+
expect((checkpointManager as any).queue.length).toBe(2);
108+
109+
const waitPromise = checkpointManager.waitForQueueCompletion();
110+
111+
// Fast-forward time by 3 seconds
112+
jest.advanceTimersByTime(3000);
113+
114+
await expect(waitPromise).rejects.toThrow(
115+
"Timeout waiting for checkpoint queue completion",
116+
);
117+
118+
// Queue should be cleared
119+
expect((checkpointManager as any).queue.length).toBe(0);
120+
121+
jest.useRealTimers();
122+
});
123+
});
124+
125+
describe("clearQueue", () => {
126+
it("should clear the queue", () => {
127+
const mockCheckpoint = jest.fn().mockResolvedValue({});
128+
(checkpointManager as any).storage = { checkpoint: mockCheckpoint };
129+
130+
// Add items to queue
131+
checkpointManager.checkpoint("test-step-1", {});
132+
checkpointManager.checkpoint("test-step-2", {});
133+
134+
expect((checkpointManager as any).queue.length).toBe(2);
135+
136+
checkpointManager.clearQueue();
137+
138+
expect((checkpointManager as any).queue.length).toBe(0);
139+
});
140+
141+
it("should allow waitForQueueCompletion to resolve after clearQueue", async () => {
142+
const mockCheckpoint = jest.fn().mockResolvedValue({});
143+
(checkpointManager as any).storage = { checkpoint: mockCheckpoint };
144+
145+
// Add item to queue
146+
checkpointManager.checkpoint("test-step", {});
147+
148+
// Clear queue
149+
checkpointManager.clearQueue();
150+
151+
// Wait should resolve immediately
152+
await expect(
153+
checkpointManager.waitForQueueCompletion(),
154+
).resolves.toBeUndefined();
155+
});
156+
});
157+
});

0 commit comments

Comments
 (0)