Commit fafb936

feat(sdk): implement centralized termination (#345)
# Centralized Termination

Refactor Language SDK to use a centralized `OperationCoordinator` that manages operation lifecycle and termination decisions, replacing the siloed termination logic currently spread across handlers.

## Changes list

- Created OperationLifecycleState enum with 5 states: EXECUTING, RETRY_WAITING, IDLE_NOT_AWAITED, IDLE_AWAITED, COMPLETED
- Created OperationMetadata interface to track operation metadata (stepId, name, type, subType, parentId)
- Created OperationInfo interface with fields: stepId, state, metadata, endTimestamp, timer, resolver, pollCount, pollStartTime
- Added 6 new methods to Checkpoint interface: markOperationState(), waitForRetryTimer(), waitForStatusChange(), markOperationAwaited(), getOperationState(), getAllOperations()
- Added operations Map to CheckpointManager to track all operation lifecycle states
- Implemented markOperationState() method that updates operation state and triggers automatic cleanup when state becomes COMPLETED
- Implemented waitForRetryTimer() method that waits for retry timer expiration then polls backend every 5 seconds
- Implemented waitForStatusChange() method that polls backend for status changes with incremental backoff (1s → 10s max)
- Implemented markOperationAwaited() method to transition operations from IDLE_NOT_AWAITED to IDLE_AWAITED
- Implemented checkAndTerminate() method with 4 termination rules: queue empty, not processing, no force checkpoint promises, no EXECUTING operations
- Implemented determineTerminationReason() with priority: RETRY_SCHEDULED > WAIT_SCHEDULED > CALLBACK_PENDING
- Added 200ms termination cooldown with scheduleTermination() and executeTermination() methods
- Added 15-minute max polling duration check in forceRefreshAndCheckStatus() to prevent infinite polling
- Implemented startTimerWithPolling() to initialize polling with appropriate delay based on endTimestamp
- Implemented forceRefreshAndCheckStatus() to poll backend, compare old/new status, and resolve promises on status change
- Implemented cleanupOperation() to clear timers and resolvers for single operation
- Implemented cleanupAllOperations() to clear all timers and resolvers during termination
- Added ancestor completion check in checkAndTerminate() to clean up operations whose ancestors are complete
- Rewrote wait-handler.ts to use centralized approach: reduced from 150+ lines to 90 lines (40% reduction)
- Rewrote invoke-handler.ts to use centralized approach with cleaner architecture
- Rewrote callback.ts and callback-promise.ts to use centralized approach: reduced callback-promise from 130 to 72 lines (45% reduction)
- Rewrote step-handler.ts to use centralized approach: reduced from 548 to 260 lines (52% reduction)
- Rewrote wait-for-condition-handler.ts to use centralized approach: reduced from 454 to 220 lines (52% reduction)
- Removed hasRunningOperations(), addRunningOperation(), removeRunningOperation() methods from all handlers
- Removed runningOperations Set from DurableContext class
- Removed operationsEmitter EventEmitter from DurableContext class
- Removed OPERATIONS_COMPLETE_EVENT constant from constants.ts
- Deleted wait-before-continue.ts utility
- Deleted wait-before-continue.test.ts
- Updated all handler signatures to remove addRunningOperation, removeRunningOperation, hasRunningOperations, getOperationsEmitter parameters
- All handlers now call checkpoint.markOperationState(stepId, OperationLifecycleState.EXECUTING) before executing user code
- All handlers now call checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED) after completion
- Handlers with retry logic call checkpoint.markOperationState(stepId, OperationLifecycleState.RETRY_WAITING, {endTimestamp}) then await checkpoint.waitForRetryTimer(stepId)
- Handlers without user code execution call checkpoint.markOperationState(stepId, OperationLifecycleState.IDLE_NOT_AWAITED) in phase 1
- Handlers call checkpoint.markOperationAwaited(stepId) when operation is awaited in phase 2
- Handlers call await checkpoint.waitForStatusChange(stepId) to wait for external events (callbacks, invokes, waits)
- Removed all manual termination logic from handlers (no more terminate() helper calls)
- Removed all manual polling logic from handlers (no more waitBeforeContinue() calls)
- Rewrote callback-promise.test.ts to test checkpoint-based waiting
- Rewrote callback.test.ts to test two-phase callback creation
- Rewrote invoke-handler-two-phase.test.ts to test two-phase invoke execution
- Rewrote invoke-handler.test.ts to test invoke handler with centralized termination
- Rewrote step-handler-two-phase.test.ts to test two-phase step execution
- Rewrote step-handler.test.ts to test step handler with centralized termination
- Rewrote step-handler.timing.test.ts to test retry timing with waitForRetryTimer()
- Rewrote wait-for-condition-handler-two-phase.test.ts to test two-phase execution
- Rewrote wait-for-condition-handler.test.ts to test wait-for-condition with centralized termination
- Rewrote wait-for-condition-handler.timing.test.ts to test retry timing
- Added wait-handler-comparison.test.ts to compare v1 and v2 behavior
- Rewrote wait-handler-two-phase.test.ts to test two-phase wait execution
- Rewrote wait-handler.test.ts to test wait handler with centralized termination
- Removed parts of durable-context.unit.test.ts related to operation tracking
- Added type conversion check for endTimestamp in startTimerWithPolling() to handle non-Date objects
- Added incremental backoff for polling: starts at 1s, increases by 1s per poll, caps at 10s
- Added pollCount tracking to OperationInfo for backoff calculation
- Added pollStartTime tracking to OperationInfo for max duration check

## Termination Rules

The invocation can be safely terminated when:

- Checkpoint queue is empty
- No pending checkpoint operations
- No active checkpoint API call in flight
- All force checkpoint requests completed
- No user code currently running
- Ancestors are not completed

All other operation states are safe to terminate because the backend will reinvoke the Lambda when needed:

- `RETRY_WAITING` - Backend reinvokes when retry timer expires
- `IDLE_AWAITED` - Backend reinvokes when external event occurs
- `COMPLETED` - Operation finished, no reinvocation needed

**Key Insight:** We only block termination when user code is executing (`EXECUTING` state) or when checkpoint operations are in progress. The backend handles all other cases by reinvoking the Lambda at the appropriate time.

## Current Architecture Problems

- Siloed Termination Logic: Each handler independently decides when to terminate
- Duplicated Code: `hasRunningOperations()` and `waitBeforeContinue()` logic repeated across handlers
- Complex State Tracking: Operation state scattered across handlers, checkpoint, and context
- Difficult to Debug: No central view of why termination did/didn't happen
- Polluted operation logic: operations such as step and wait carry a lot of logic that relates to safe termination rather than to the operation itself
- No guarantee of a correct state at termination: we could end up terminating too early, or terminating before processing all received states
- Synchronization: a checkpoint can return a result while the termination process is already under way
- Early-completing operations: the current logic does not handle early-completing operations such as promise.race or parallel/map with minSuccessful well, and we keep finding cases that are handled incorrectly

## Proposed Architecture

### Core Components

```
┌─────────────────┐
│    Handlers     │  (step, wait, invoke, callback, etc.)
│ (DurablePromise)│
└────────┬────────┘
         │ notify lifecycle events + persist state
         ▼
┌─────────────────┐
│   Checkpoint    │  Persists operation state + tracks lifecycle
│   (Enhanced)    │  + manages timers + decides termination
└─────────────────┘
```

**Key Change:** Instead of creating a separate `OperationCoordinator`, we enhance the existing `Checkpoint` interface to include operation lifecycle management and termination logic. Checkpoint will later be renamed to `OperationCoordinator`.

### Operation States

```typescript
enum OperationLifecycleState {
  EXECUTING, // Running user code (step function, waitForCondition check)
  RETRY_WAITING, // Waiting for retry timer, will re-execute user code (phase 1)
  IDLE_NOT_AWAITED, // Waiting for external event, not awaited yet (phase 1)
  IDLE_AWAITED, // Waiting for external event, awaited (phase 2)
  COMPLETED, // Operation finished (success or permanent failure)
}
```

### Operation Types by Execution Pattern

| Operation            | Executes User Code? | Retry Logic?    | Phase 1 Behavior        | Phase 2 Behavior     |
| -------------------- | ------------------- | --------------- | ----------------------- | -------------------- |
| **step**             | ✅ Yes              | ✅ Yes          | Execute + retry loop    | Return cached result |
| **waitForCondition** | ✅ Yes              | ✅ Yes          | Check + retry loop      | Return cached result |
| **wait**             | ❌ No               | ❌ No           | Mark idle, return       | Wait for timer       |
| **invoke**           | ❌ No               | ❌ No           | Start invoke, return    | Wait for completion  |
| **callback**         | ❌ No               | ❌ No           | Create callback, return | Wait for completion  |
| **map/parallel**     | ✅ Via children     | ✅ Via children | Execute children        | Return cached result |

## Two-Phase Execution Pattern

### Operations That Execute User Code (step, waitForCondition)

**Phase 1: Execute with Retry Loop**

```typescript
const phase1Promise = (async () => {
  // Register operation on first call
  checkpoint.markOperationState(stepId, OperationLifecycleState.EXECUTING, {
    metadata: { stepId, name, type, subType, parentId },
  });

  while (true) {
    const status = context.getStepData(stepId)?.Status;

    // Check cached status first
    if (status === SUCCEEDED) {
      checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED);
      return cachedResult;
    }

    if (status === FAILED) {
      checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED);
      throw cachedError;
    }

    // Status is PENDING (retry scheduled)
    if (status === PENDING) {
      checkpoint.markOperationState(
        stepId,
        OperationLifecycleState.RETRY_WAITING,
        {
          endTimestamp: stepData.NextAttemptTimestamp,
        },
      );

      await checkpoint.waitForRetryTimer(stepId);
      // Timer expired, continue to execute
    }

    // Execute user code
    checkpoint.markOperationState(stepId, OperationLifecycleState.EXECUTING);

    try {
      const result = await executeUserCode();

      await checkpoint.checkpoint(stepId, {
        Action: SUCCEED,
        Payload: result,
      });

      checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED);
      return result;
    } catch (error) {
      const retryDecision = retryStrategy(error, attempt);

      if (!retryDecision.shouldRetry) {
        // Permanent failure
        await checkpoint.checkpoint(stepId, {
          Action: FAIL,
          Error: error,
        });

        checkpoint.markOperationState(
          stepId,
          OperationLifecycleState.COMPLETED,
        );
        throw error;
      }

      // Schedule retry
      await checkpoint.checkpoint(stepId, {
        Action: RETRY,
        StepOptions: { NextAttemptDelaySeconds: delay },
      });

      // Loop continues to PENDING check above
      continue;
    }
  }
})();

phase1Promise.catch(() => {}); // Prevent unhandled rejection
```

**Phase 2: Return Phase 1 Result**

```typescript
return new DurablePromise(async () => {
  return await phase1Promise; // Just return phase 1 result
});
```

### Operations That Don't Execute User Code (wait, invoke, callback)

**Phase 1: Start Operation, Mark Idle**

```typescript
const phase1Promise = (async () => {
  checkpoint.markOperationState(stepId, OperationLifecycleState.EXECUTING, {
    metadata: { stepId, name, type, subType, parentId },
  });

  const status = context.getStepData(stepId)?.Status;

  // Check cached status
  if (status === SUCCEEDED) {
    checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED);
    return cachedResult;
  }

  if (status === FAILED) {
    checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED);
    throw cachedError;
  }

  // Operation not started yet
  if (!status) {
    await checkpoint.checkpoint(stepId, {
      Action: START,
      // ... operation-specific options
    });
  }

  // Mark as idle (not awaited yet)
  checkpoint.markOperationState(
    stepId,
    OperationLifecycleState.IDLE_NOT_AWAITED,
    {
      endTimestamp: stepData.ScheduledEndTimestamp, // for wait
      // no endTimestamp for callback/invoke
    },
  );

  return; // Phase 1 completes without waiting
})();

phase1Promise.catch(() => {});
```

**Phase 2: Wait for Completion**

```typescript
return new DurablePromise(async () => {
  await phase1Promise; // Wait for phase 1

  while (true) {
    const status = context.getStepData(stepId)?.Status;

    if (status === SUCCEEDED) {
      checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED);
      return cachedResult;
    }

    if (status === FAILED || status === TIMED_OUT) {
      checkpoint.markOperationState(stepId, OperationLifecycleState.COMPLETED);
      throw cachedError;
    }

    // Transition to IDLE_AWAITED and wait for status change
    checkpoint.markOperationState(
      stepId,
      OperationLifecycleState.IDLE_AWAITED,
      {
        endTimestamp: stepData.ScheduledEndTimestamp,
      },
    );

    await checkpoint.waitForStatusChange(stepId);
    // Status changed, loop to check new status
  }
});
```

## Enhanced Checkpoint Interface

```typescript
interface OperationMetadata {
  stepId: string;
  name?: string;
  type: OperationType;
  subType: OperationSubType;
  parentId?: string;
}

interface Checkpoint {
  // ===== Existing Methods (Persistence) =====
  checkpoint(stepId: string, data: Partial<OperationUpdate>): Promise<void>;
  forceCheckpoint?(): Promise<void>;
  force?(): Promise<void>;
  setTerminating?(): void;
  hasPendingAncestorCompletion?(stepId: string): boolean;
  waitForQueueCompletion(): Promise<void>;

  // ===== New Methods (Lifecycle & Termination) =====

  // Single method to update operation state
  markOperationState(
    stepId: string,
    state: OperationLifecycleState,
    options?: {
      metadata?: OperationMetadata; // Required on first call (EXECUTING state)
      endTimestamp?: Date; // For RETRY_WAITING, IDLE_NOT_AWAITED, IDLE_AWAITED
    },
  ): void;

  // Waiting operations
  waitForRetryTimer(stepId: string): Promise<void>;
  waitForStatusChange(stepId: string): Promise<void>;

  // Mark operation as awaited (IDLE_NOT_AWAITED → IDLE_AWAITED)
  markOperationAwaited(stepId: string): void;

  // Query
  getOperationState(stepId: string): OperationLifecycleState | undefined;
  getAllOperations(): Map<string, OperationInfo>;

  // Cleanup (internal, called automatically)
  // - cleanupOperation(stepId): Clean up single operation
  // - cleanupAllOperations(): Clean up all operations (during termination)
}

interface OperationInfo {
  stepId: string;
  state: OperationLifecycleState;
  metadata: OperationMetadata;
  endTimestamp?: Date;
  timer?: NodeJS.Timeout;
  resolver?: () => void;
}
```

**Note:** The `checkAndTerminate()` method is internal to the Checkpoint implementation and called automatically when operation states change.

## Implementation Details

The existing `CheckpointManager` class will be enhanced to include operation lifecycle tracking and termination logic.

### State Transitions

```
STARTED
   ↓
EXECUTING ←──────────┐
   ↓                 │
   ├─→ RETRY_WAITING ─┘  (retry loop in phase 1)
   ├─→ IDLE_NOT_AWAITED  (phase 1 complete, not awaited)
   │        ↓
   │   IDLE_AWAITED      (phase 2, awaited)
   ↓
COMPLETED (cleanup triggered)
```

### Timer Management

**Key Principle: Backend Controls Status Changes**

All status changes come from the backend. The checkpoint manager's role is to:

1. **Wait for the appropriate time** (timer expiry or polling interval)
2. **Call `forceCheckpoint()`** to refresh state from backend
3. **Check if status changed** for the operation
4. **Resolve the promise** if status changed, otherwise **poll again in 5 seconds**

**Unified Logic for All Operations:**

- Operations with timestamp (retry, wait, waitForCondition): Wait until timestamp, then poll every 5s
- Operations without timestamp (callback, invoke): Start polling immediately (now + 1s), then every 5s

**Flow Diagram:**

```
Handler calls waitForRetryTimer(stepId) or waitForStatusChange(stepId)
   ↓
Checkpoint starts timer:
  - If endTimestamp exists: wait until endTimestamp
  - If no endTimestamp: wait 1 second (immediate polling)
   ↓
Timer expires
   ↓
Checkpoint calls forceCheckpoint()  ← Calls backend API
   ↓
Backend returns updated execution state
   ↓
Checkpoint updates stepData from response
   ↓
Checkpoint checks: did status change for stepId?
   ↓
   ├─ YES → Resolve promise, handler continues
   └─ NO  → Schedule another force refresh in 5 seconds, repeat
```

## Systems Being Removed

The centralized design eliminates redundant tracking systems:

### `runningOperations` (DurableContext)

**Current Purpose:** Tracks operations executing user code (per-context Set)

**Replacement:** Operation state tracking with `EXECUTING` state

### `activeOperationsTracker` (ExecutionContext)

**Current Purpose:** Tracks in-flight checkpoint operations (global counter)

**Replacement:** Checkpoint queue status checks

### `waitBeforeContinue()` (Utility Function)

**Current Purpose:** Waits for multiple conditions (operations complete, status change, timer expiry, awaited change)

**Replacement:** Checkpoint methods (`waitForStatusChange`, `waitForRetryTimer`)

### `terminate()` (Helper Function)

**Current Purpose:** Defers termination until checkpoint operations complete, then calls `terminationManager.terminate()`

**Replacement:** Checkpoint automatic termination decision
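The four termination rules and the reason priority listed in the changes above (`RETRY_SCHEDULED > WAIT_SCHEDULED > CALLBACK_PENDING`) could be sketched roughly as follows. This is a minimal illustration, not the SDK's code: `TrackedOperation`, `CheckpointStatus`, `canTerminate`, and the `kind` field are hypothetical names, and the mapping from operation states to reasons is an assumption made for the example.

```typescript
enum OperationLifecycleState {
  EXECUTING,
  RETRY_WAITING,
  IDLE_NOT_AWAITED,
  IDLE_AWAITED,
  COMPLETED,
}

type TerminationReason =
  | "RETRY_SCHEDULED"
  | "WAIT_SCHEDULED"
  | "CALLBACK_PENDING";

// Hypothetical, simplified view of a tracked operation.
interface TrackedOperation {
  stepId: string;
  state: OperationLifecycleState;
  kind: "STEP" | "WAIT" | "CALLBACK" | "INVOKE"; // assumed discriminator
}

// Hypothetical snapshot of the checkpoint queue.
interface CheckpointStatus {
  queueLength: number; // pending checkpoint operations
  isProcessing: boolean; // checkpoint API call in flight
  pendingForceCheckpoints: number; // unresolved force checkpoint promises
}

// Rules: queue empty, not processing, no force checkpoint promises,
// and no operation in the EXECUTING state.
function canTerminate(
  status: CheckpointStatus,
  operations: Map<string, TrackedOperation>,
): boolean {
  if (status.queueLength > 0) return false;
  if (status.isProcessing) return false;
  if (status.pendingForceCheckpoints > 0) return false;
  return !Array.from(operations.values()).some(
    (op) => op.state === OperationLifecycleState.EXECUTING,
  );
}

// Priority order: RETRY_SCHEDULED > WAIT_SCHEDULED > CALLBACK_PENDING.
// Which states map to which reason is assumed here.
function determineTerminationReason(
  operations: Map<string, TrackedOperation>,
): TerminationReason {
  const ops = Array.from(operations.values());
  if (ops.some((op) => op.state === OperationLifecycleState.RETRY_WAITING)) {
    return "RETRY_SCHEDULED";
  }
  const isIdle = (op: TrackedOperation): boolean =>
    op.state === OperationLifecycleState.IDLE_NOT_AWAITED ||
    op.state === OperationLifecycleState.IDLE_AWAITED;
  if (ops.some((op) => op.kind === "WAIT" && isIdle(op))) {
    return "WAIT_SCHEDULED";
  }
  return "CALLBACK_PENDING";
}
```

Keeping the decision in one pure function over a snapshot is one way to get the "central view of why termination did/didn't happen" that the commit message asks for, since each rule can be logged or unit-tested in isolation.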
1 parent 4216357 commit fafb936

52 files changed, +4687 -6029 lines changed

package-lock.json

Lines changed: 0 additions & 9 deletions

packages/aws-durable-execution-sdk-js-examples/scripts/generate-sam-template.js

Lines changed: 5 additions & 0 deletions
@@ -17,6 +17,11 @@ const EXAMPLE_CONFIGS = {
       },
     ],
   },
+  "wait-for-callback-submitter-retry-success": {
+    memorySize: 128,
+    timeout: 120,
+    policies: [],
+  },
 };
 
 // Default configuration for Lambda functions
@@ -0,0 +1,63 @@
+import { handler } from "./force-checkpointing-callback";
+import { createTests } from "../../../utils/test-helper";
+import {
+  ExecutionStatus,
+  InvocationType,
+  WaitingOperationStatus,
+} from "@aws/durable-execution-sdk-js-testing";
+
+createTests({
+  name: "force-checkpointing-callback",
+  functionName: "force-checkpointing-callback",
+  handler,
+  invocationType: InvocationType.Event,
+  tests: (runner) => {
+    it("should complete with force checkpointing when one branch blocks termination with multiple callbacks", async () => {
+      const startTime = Date.now();
+
+      const callback1 = runner.getOperation("callback-1");
+      const callback2 = runner.getOperation("callback-2");
+      const callback3 = runner.getOperation("callback-3");
+
+      const executionPromise = runner.run();
+
+      // Wait for first callback to start and send success
+      await callback1.waitForData(WaitingOperationStatus.STARTED);
+      await callback1.sendCallbackSuccess("callback-1-done");
+
+      // Wait for second callback to start and send success
+      await callback2.waitForData(WaitingOperationStatus.STARTED);
+      await callback2.sendCallbackSuccess("callback-2-done");
+
+      // Wait for third callback to start and send success
+      await callback3.waitForData(WaitingOperationStatus.STARTED);
+      await callback3.sendCallbackSuccess("callback-3-done");
+
+      const execution = await executionPromise;
+
+      const duration = Date.now() - startTime;
+
+      // Verify the result
+      expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
+      const result = JSON.parse(execution.getResult() as string);
+      expect(result.completionReason).toBe("ALL_COMPLETED");
+      expect(result.all).toHaveLength(2);
+      expect(result.all[0].result).toBe("long-complete");
+      expect(result.all[1].result).toBe("callbacks-complete");
+
+      // Should complete in less than 15 seconds
+      // (10s for long-running step + time for callbacks)
+      expect(duration).toBeLessThan(15000);
+
+      // Should complete in a single invocation
+      // The long-running step prevents termination, so the callback operations
+      // use force checkpoint to get status updates without terminating
+      const invocations = execution.getInvocations();
+      expect(invocations).toHaveLength(1);
+
+      // Verify operations were tracked
+      const operations = execution.getOperations();
+      expect(operations.length).toBeGreaterThan(0);
+    }, 20000); // 20 second timeout
+  },
+});
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+import {
+  withDurableExecution,
+  DurableContext,
+} from "@aws/durable-execution-sdk-js";
+import { ExampleConfig } from "../../../types";
+
+export const config: ExampleConfig = {
+  name: "Force Checkpointing - Callback",
+  description:
+    "Demonstrates force checkpoint polling when a long-running operation blocks termination while another branch performs multiple sequential callbacks",
+};
+
+export const handler = withDurableExecution(
+  async (_event, ctx: DurableContext): Promise<string> => {
+    const results = await ctx.parallel([
+      // Branch 1: Long-running operation that blocks termination
+      async (branchCtx: DurableContext) => {
+        return await branchCtx.step("long-running-step", async () => {
+          await new Promise((resolve) => setTimeout(resolve, 10000));
+          return "long-complete";
+        });
+      },
+      // Branch 2: Multiple sequential callbacks that need force checkpoint
+      async (branchCtx: DurableContext) => {
+        const [callback1Promise] = await branchCtx.createCallback("callback-1");
+        await callback1Promise;
+
+        const [callback2Promise] = await branchCtx.createCallback("callback-2");
+        await callback2Promise;
+
+        const [callback3Promise] = await branchCtx.createCallback("callback-3");
+        await callback3Promise;
+
+        return "callbacks-complete";
+      },
+    ]);
+
+    return JSON.stringify(results);
+  },
+);
@@ -0,0 +1,65 @@
+import { handler } from "./force-checkpointing-invoke";
+import { createTests } from "../../../utils/test-helper";
+import {
+  ExecutionStatus,
+  LocalDurableTestRunner,
+} from "@aws/durable-execution-sdk-js-testing";
+import { handler as waitHandler } from "../../wait/basic/wait";
+import { handler as stepHandler } from "../../step/basic/step-basic";
+
+createTests({
+  name: "force-checkpointing-invoke",
+  functionName: "force-checkpointing-invoke",
+  handler,
+  tests: (runner, _, functionNameMap) => {
+    it("should complete with force checkpointing when one branch blocks termination with multiple invokes", async () => {
+      // Register the invoked functions for local testing
+      if (runner instanceof LocalDurableTestRunner) {
+        runner.registerDurableFunction(
+          functionNameMap.getFunctionName("wait"),
+          waitHandler,
+        );
+        runner.registerDurableFunction(
+          functionNameMap.getFunctionName("step-basic"),
+          stepHandler,
+        );
+      }
+
+      const startTime = Date.now();
+
+      const execution = await runner.run({
+        payload: {
+          functionNames: [
+            functionNameMap.getFunctionName("wait"),
+            functionNameMap.getFunctionName("step-basic"),
+            functionNameMap.getFunctionName("wait"),
+          ],
+        },
+      });
+
+      const duration = Date.now() - startTime;
+
+      // Verify the result
+      expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
+      const result = JSON.parse(execution.getResult() as string);
+      expect(result.completionReason).toBe("ALL_COMPLETED");
+      expect(result.all).toHaveLength(2);
+      expect(result.all[0].result).toBe("long-complete");
+      expect(result.all[1].result).toBe("invokes-complete");
+
+      // Should complete in less than 25 seconds
+      // (20s for long-running step + time for invokes)
+      expect(duration).toBeLessThan(25000);
+
+      // Should complete in a single invocation
+      // The long-running step prevents termination, so the invoke operations
+      // use force checkpoint to get status updates without terminating
+      const invocations = execution.getInvocations();
+      expect(invocations).toHaveLength(1);
+
+      // Verify operations were tracked
+      const operations = execution.getOperations();
+      expect(operations.length).toBeGreaterThan(0);
+    }, 30000); // 30 second timeout
+  },
+});
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+import {
+  withDurableExecution,
+  DurableContext,
+} from "@aws/durable-execution-sdk-js";
+import { ExampleConfig } from "../../../types";
+
+export const config: ExampleConfig = {
+  name: "Force Checkpointing - Invoke",
+  description:
+    "Demonstrates force checkpoint polling when a long-running operation blocks termination while another branch performs multiple sequential invokes",
+};
+
+export const handler = withDurableExecution(
+  async (
+    event: { functionNames: string[] },
+    ctx: DurableContext,
+  ): Promise<string> => {
+    const results = await ctx.parallel([
+      // Branch 1: Long-running operation that blocks termination
+      async (branchCtx: DurableContext) => {
+        return await branchCtx.step("long-running-step", async () => {
+          await new Promise((resolve) => setTimeout(resolve, 20000));
+          return "long-complete";
+        });
+      },
+      // Branch 2: Multiple sequential invokes that need force checkpoint
+      async (branchCtx: DurableContext) => {
+        await branchCtx.invoke(event.functionNames[0], { input: "data-1" });
+        await branchCtx.invoke(event.functionNames[1], { input: "data-2" });
+        await branchCtx.invoke(event.functionNames[2], { input: "data-3" });
+        return "invokes-complete";
+      },
+    ]);
+
+    return JSON.stringify(results);
+  },
+);
@@ -0,0 +1,40 @@
+import { handler } from "./force-checkpointing-multiple-wait";
+import { createTests } from "../../../utils/test-helper";
+import { ExecutionStatus } from "@aws/durable-execution-sdk-js-testing";
+
+createTests({
+  name: "force-checkpointing-multiple-wait",
+  functionName: "force-checkpointing-multiple-wait",
+  handler,
+  tests: (runner) => {
+    it("should complete with force checkpointing when one branch blocks termination with multiple waits", async () => {
+      const startTime = Date.now();
+
+      const execution = await runner.run();
+
+      const duration = Date.now() - startTime;
+
+      // Verify the result
+      expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
+      const result = JSON.parse(execution.getResult() as string);
+      expect(result.completionReason).toBe("ALL_COMPLETED");
+      expect(result.all).toHaveLength(2);
+      expect(result.all[0].result).toBe("long-complete");
+      expect(result.all[1].result).toBe("waits-complete");
+
+      // Should complete in less than 15 seconds
+      // (10s for long-running step + ~5s for waits)
+      expect(duration).toBeLessThan(15000);
+
+      // Should complete in a single invocation
+      // The long-running step prevents termination, so the wait operations
+      // use force checkpoint to get status updates without terminating
+      const invocations = execution.getInvocations();
+      expect(invocations).toHaveLength(1);
+
+      // Verify operations were tracked
+      const operations = execution.getOperations();
+      expect(operations.length).toBeGreaterThan(0);
+    }, 20000); // 20 second timeout
+  },
+});
@@ -0,0 +1,36 @@
+import {
+  withDurableExecution,
+  DurableContext,
+} from "@aws/durable-execution-sdk-js";
+import { ExampleConfig } from "../../../types";
+
+export const config: ExampleConfig = {
+  name: "Force Checkpointing - Multiple Wait",
+  description:
+    "Demonstrates force checkpoint polling when a long-running operation blocks termination while another branch performs multiple sequential waits",
+};
+
+export const handler = withDurableExecution(
+  async (_event, ctx: DurableContext): Promise<string> => {
+    const results = await ctx.parallel([
+      // Branch 1: Long-running operation that blocks termination
+      async (branchCtx: DurableContext) => {
+        return await branchCtx.step("long-running-step", async () => {
+          await new Promise((resolve) => setTimeout(resolve, 10000));
+          return "long-complete";
+        });
+      },
+      // Branch 2: Multiple sequential waits that need force checkpoint
+      async (branchCtx: DurableContext) => {
+        await branchCtx.wait("wait-1", { seconds: 1 });
+        await branchCtx.wait("wait-2", { seconds: 1 });
+        await branchCtx.wait("wait-3", { seconds: 1 });
+        await branchCtx.wait("wait-4", { seconds: 1 });
+        await branchCtx.wait("wait-5", { seconds: 1 });
+        return "waits-complete";
+      },
+    ]);
+
+    return JSON.stringify(results);
+  },
+);
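
The force-checkpointing examples in this commit depend on the polling schedule the commit message describes: wait until `endTimestamp` if there is one (otherwise about 1s), then poll with a delay that starts at 1s, grows by 1s per poll, caps at 10s, and gives up after 15 minutes. A minimal sketch of that arithmetic, assuming `pollCount` counts polls already performed; the function and constant names are hypothetical and not taken from the SDK:

```typescript
// Values taken from the commit message; names are illustrative only.
const BACKOFF_STEP_MS = 1000; // first delay and per-poll increment
const MAX_POLL_DELAY_MS = 10000; // backoff cap
const MAX_POLL_DURATION_MS = 15 * 60 * 1000; // 15-minute polling limit

// Delay before the next poll, given how many polls have already run.
// Assumed indexing: pollCount 0 -> 1s, 1 -> 2s, ..., 9 and above -> 10s.
function nextPollDelayMs(pollCount: number): number {
  return Math.min((pollCount + 1) * BACKOFF_STEP_MS, MAX_POLL_DELAY_MS);
}

// Delay before the first poll: wait for endTimestamp when one exists,
// otherwise start polling after one backoff step.
function initialDelayMs(endTimestamp: Date | undefined, now: Date): number {
  if (endTimestamp === undefined) {
    return BACKOFF_STEP_MS;
  }
  // A timestamp in the past means the timer has already expired.
  return Math.max(endTimestamp.getTime() - now.getTime(), 0);
}

// True once polling has run longer than the maximum allowed duration.
function hasExceededMaxPolling(pollStartTime: number, now: number): boolean {
  return now - pollStartTime > MAX_POLL_DURATION_MS;
}
```

Under these assumptions, the five sequential 1-second waits in the multiple-wait example would each be picked up on an early, short poll, which is consistent with the test's 15-second budget around a 10-second blocking step.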