Skip to content

Commit e109c9c

Browse files
authored
feat(sdk): migrate remaining handlers to DurablePromise two-phase pattern (#307)
- Convert concurrent-execution-handler to return DurablePromise with immediate execution - Update map-handler and parallel-handler to use DurablePromise delegation pattern - Move validation logic inside DurablePromise execution for consistent async error handling - Update DurableContext type definitions to return DurablePromise<BatchResult<T>> - Fix validation tests to expect async rejections instead of synchronous throws - Simplify handlers by removing intermediate result variables - Directly await phase1Promise in DurablePromise executor - Update DurableContext interface to reflect DurablePromise return types - Add catch handler to step-handler phase1Promise to prevent unhandled rejections - Improve promise combinator error handling with proper promise conversion - Complete two-phase execution pattern for all concurrent handlers
1 parent 8d74dd2 commit e109c9c

File tree

14 files changed

+647
-326
lines changed

14 files changed

+647
-326
lines changed

packages/aws-durable-execution-sdk-js/src/context/durable-context/durable-context.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -427,13 +427,13 @@ export class DurableContextImpl implements DurableContext {
427427
itemsOrMapFunc: TInput[] | MapFunc<TInput, TOutput>,
428428
mapFuncOrConfig?: MapFunc<TInput, TOutput> | MapConfig<TInput, TOutput>,
429429
maybeConfig?: MapConfig<TInput, TOutput>,
430-
): Promise<BatchResult<TOutput>> {
430+
): DurablePromise<BatchResult<TOutput>> {
431431
validateContextUsage(
432432
this._stepPrefix,
433433
"map",
434434
this.executionContext.terminationManager,
435435
);
436-
return this.withModeManagement(() => {
436+
return this.withDurableModeManagement(() => {
437437
const mapHandler = createMapHandler(
438438
this.executionContext,
439439
this._executeConcurrently.bind(this),
@@ -456,13 +456,13 @@ export class DurableContextImpl implements DurableContext {
456456
| (ParallelFunc<T> | NamedParallelBranch<T>)[]
457457
| ParallelConfig<T>,
458458
maybeConfig?: ParallelConfig<T>,
459-
): Promise<BatchResult<T>> {
459+
): DurablePromise<BatchResult<T>> {
460460
validateContextUsage(
461461
this._stepPrefix,
462462
"parallel",
463463
this.executionContext.terminationManager,
464464
);
465-
return this.withModeManagement(() => {
465+
return this.withDurableModeManagement(() => {
466466
const parallelHandler = createParallelHandler(
467467
this.executionContext,
468468
this._executeConcurrently.bind(this),
@@ -480,13 +480,13 @@ export class DurableContextImpl implements DurableContext {
480480
| ConcurrentExecutor<TItem, TResult>
481481
| ConcurrencyConfig<TResult>,
482482
maybeConfig?: ConcurrencyConfig<TResult>,
483-
): Promise<BatchResult<TResult>> {
483+
): DurablePromise<BatchResult<TResult>> {
484484
validateContextUsage(
485485
this._stepPrefix,
486486
"_executeConcurrently",
487487
this.executionContext.terminationManager,
488488
);
489-
return this.withModeManagement(() => {
489+
return this.withDurableModeManagement(() => {
490490
const concurrentExecutionHandler = createConcurrentExecutionHandler(
491491
this.executionContext,
492492
this.runInChildContext.bind(this),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import { createConcurrentExecutionHandler } from "./concurrent-execution-handler";
2+
import { ExecutionContext, ConcurrentExecutionItem, ConcurrentExecutor } from "../../types";
3+
import { DurablePromise } from "../../types/durable-promise";
4+
5+
describe("Concurrent Execution Handler Two-Phase Execution", () => {
6+
let mockContext: ExecutionContext;
7+
let mockRunInChildContext: jest.Mock;
8+
let mockStep: jest.Mock;
9+
let executionStarted = false;
10+
11+
beforeEach(() => {
12+
executionStarted = false;
13+
mockContext = {
14+
getStepData: jest.fn().mockReturnValue(null),
15+
durableExecutionArn: "test-arn",
16+
terminationManager: {
17+
shouldTerminate: jest.fn().mockReturnValue(false),
18+
terminate: jest.fn(),
19+
},
20+
} as any;
21+
22+
// Mock runInChildContext to track when execution starts
23+
mockRunInChildContext = jest.fn().mockImplementation(async (name, fn, config) => {
24+
executionStarted = true;
25+
// Create a mock child context with runInChildContext method
26+
const mockChildContext = {
27+
runInChildContext: jest.fn().mockImplementation(async (childName, childFn) => {
28+
return await childFn({} as any);
29+
}),
30+
};
31+
return await fn(mockChildContext);
32+
});
33+
34+
mockStep = jest.fn().mockImplementation(async (name, fn) => {
35+
return await fn();
36+
});
37+
});
38+
39+
it("should start execution in phase 1 immediately (before await)", async () => {
40+
const concurrentHandler = createConcurrentExecutionHandler(
41+
mockContext,
42+
mockRunInChildContext,
43+
mockStep,
44+
);
45+
46+
const items: ConcurrentExecutionItem<string>[] = [
47+
{ id: "item1", data: "test1", index: 0 },
48+
];
49+
50+
const executor: ConcurrentExecutor<string, string> = jest
51+
.fn()
52+
.mockResolvedValue("processed");
53+
54+
// Phase 1: Create the promise - this should start execution immediately
55+
const concurrentPromise = concurrentHandler(
56+
"test-concurrent",
57+
items,
58+
executor,
59+
);
60+
61+
// Should return a DurablePromise
62+
expect(concurrentPromise).toBeInstanceOf(DurablePromise);
63+
64+
// Wait briefly for phase 1 to start executing
65+
await new Promise((resolve) => setTimeout(resolve, 10));
66+
67+
// Phase 1 should have started execution (before we await the promise)
68+
expect(executionStarted).toBe(true);
69+
70+
// Now await the promise to verify it completes
71+
const result = await concurrentPromise;
72+
expect(result).toBeDefined();
73+
});
74+
75+
it("should mark promise as executed only when awaited", async () => {
76+
const concurrentHandler = createConcurrentExecutionHandler(
77+
mockContext,
78+
mockRunInChildContext,
79+
mockStep,
80+
);
81+
82+
const items: ConcurrentExecutionItem<string>[] = [
83+
{ id: "item1", data: "test1", index: 0 },
84+
];
85+
86+
const executor: ConcurrentExecutor<string, string> = jest
87+
.fn()
88+
.mockResolvedValue("result");
89+
90+
// Phase 1: Create the promise
91+
const concurrentPromise = concurrentHandler(
92+
"test-concurrent",
93+
items,
94+
executor,
95+
);
96+
97+
// Wait for phase 1 to complete
98+
await new Promise((resolve) => setTimeout(resolve, 50));
99+
100+
// Promise should not be marked as executed yet (not awaited)
101+
expect((concurrentPromise as DurablePromise<any>).isExecuted).toBe(false);
102+
103+
// Phase 2: Await the promise
104+
await concurrentPromise;
105+
106+
// Now it should be marked as executed
107+
expect((concurrentPromise as DurablePromise<any>).isExecuted).toBe(true);
108+
});
109+
});

packages/aws-durable-execution-sdk-js/src/handlers/concurrent-execution-handler/concurrent-execution-handler.ts

Lines changed: 91 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
ConcurrentExecutor,
99
BatchResult,
1010
BatchItem,
11+
DurablePromise,
1112
} from "../../types";
1213
import { OperationStatus } from "@aws-sdk/client-lambda";
1314
import { log } from "../../utils/logger/logger";
@@ -424,7 +425,7 @@ export const createConcurrentExecutionHandler = (
424425
runInChildContext: DurableContext["runInChildContext"],
425426
skipNextOperation: () => void,
426427
) => {
427-
return async <TItem, TResult>(
428+
return <TItem, TResult>(
428429
nameOrItems: string | undefined | ConcurrentExecutionItem<TItem>[],
429430
itemsOrExecutor?:
430431
| ConcurrentExecutionItem<TItem>[]
@@ -433,92 +434,95 @@ export const createConcurrentExecutionHandler = (
433434
| ConcurrentExecutor<TItem, TResult>
434435
| ConcurrencyConfig<TResult>,
435436
maybeConfig?: ConcurrencyConfig<TResult>,
436-
): Promise<BatchResult<TResult>> => {
437-
let name: string | undefined;
438-
let items: ConcurrentExecutionItem<TItem>[];
439-
let executor: ConcurrentExecutor<TItem, TResult>;
440-
let config: ConcurrencyConfig<TResult> | undefined;
441-
442-
if (typeof nameOrItems === "string" || nameOrItems === undefined) {
443-
name = nameOrItems;
444-
items = itemsOrExecutor as ConcurrentExecutionItem<TItem>[];
445-
executor = executorOrConfig as ConcurrentExecutor<TItem, TResult>;
446-
config = maybeConfig;
447-
} else {
448-
items = nameOrItems;
449-
executor = itemsOrExecutor as ConcurrentExecutor<TItem, TResult>;
450-
config = executorOrConfig as ConcurrencyConfig<TResult>;
451-
}
437+
): DurablePromise<BatchResult<TResult>> => {
438+
// Phase 1: Start execution immediately
439+
const phase1Promise = (async (): Promise<BatchResult<TResult>> => {
440+
let name: string | undefined;
441+
let items: ConcurrentExecutionItem<TItem>[];
442+
let executor: ConcurrentExecutor<TItem, TResult>;
443+
let config: ConcurrencyConfig<TResult> | undefined;
444+
445+
if (typeof nameOrItems === "string" || nameOrItems === undefined) {
446+
name = nameOrItems;
447+
items = itemsOrExecutor as ConcurrentExecutionItem<TItem>[];
448+
executor = executorOrConfig as ConcurrentExecutor<TItem, TResult>;
449+
config = maybeConfig;
450+
} else {
451+
items = nameOrItems;
452+
executor = itemsOrExecutor as ConcurrentExecutor<TItem, TResult>;
453+
config = executorOrConfig as ConcurrencyConfig<TResult>;
454+
}
452455

453-
log("🔄", "Starting concurrent execution:", {
454-
name,
455-
itemCount: items.length,
456-
maxConcurrency: config?.maxConcurrency,
457-
});
456+
log("🔄", "Starting concurrent execution:", {
457+
name,
458+
itemCount: items.length,
459+
maxConcurrency: config?.maxConcurrency,
460+
});
458461

459-
if (!Array.isArray(items)) {
460-
throw new Error("Concurrent execution requires an array of items");
461-
}
462+
if (!Array.isArray(items)) {
463+
throw new Error("Concurrent execution requires an array of items");
464+
}
462465

463-
if (typeof executor !== "function") {
464-
throw new Error("Concurrent execution requires an executor function");
465-
}
466+
if (typeof executor !== "function") {
467+
throw new Error("Concurrent execution requires an executor function");
468+
}
466469

467-
if (
468-
config?.maxConcurrency !== undefined &&
469-
config.maxConcurrency !== null &&
470-
config.maxConcurrency <= 0
471-
) {
472-
throw new Error(
473-
`Invalid maxConcurrency: ${config.maxConcurrency}. Must be a positive number or undefined for unlimited concurrency.`,
474-
);
475-
}
470+
if (
471+
config?.maxConcurrency !== undefined &&
472+
config.maxConcurrency !== null &&
473+
config.maxConcurrency <= 0
474+
) {
475+
throw new Error(
476+
`Invalid maxConcurrency: ${config.maxConcurrency}. Must be a positive number or undefined for unlimited concurrency.`,
477+
);
478+
}
476479

477-
const executeOperation = async (
478-
executionContext: DurableContext,
479-
): Promise<BatchResult<TResult>> => {
480-
const concurrencyController = new ConcurrencyController(
481-
"concurrent-execution",
482-
skipNextOperation,
483-
);
484-
485-
// Access durableExecutionMode from the context - it's set by runInChildContext
486-
// based on determineChildReplayMode logic
487-
const durableExecutionMode = (
488-
executionContext as unknown as {
489-
durableExecutionMode: DurableExecutionMode;
490-
}
491-
).durableExecutionMode;
480+
const executeOperation = async (
481+
executionContext: DurableContext,
482+
): Promise<BatchResult<TResult>> => {
483+
const concurrencyController = new ConcurrencyController(
484+
"concurrent-execution",
485+
skipNextOperation,
486+
);
492487

493-
// Get the entity ID (step prefix) from the child context
494-
const entityId = (
495-
executionContext as unknown as {
496-
_stepPrefix?: string;
497-
}
498-
)._stepPrefix;
488+
// Access durableExecutionMode from the context - it's set by runInChildContext
489+
// based on determineChildReplayMode logic
490+
const durableExecutionMode = (
491+
executionContext as unknown as {
492+
durableExecutionMode: DurableExecutionMode;
493+
}
494+
).durableExecutionMode;
499495

500-
log("🔄", "Concurrent execution mode:", {
501-
mode: durableExecutionMode,
502-
itemCount: items.length,
503-
entityId,
504-
});
496+
// Get the entity ID (step prefix) from the child context
497+
const entityId = (
498+
executionContext as unknown as {
499+
_stepPrefix?: string;
500+
}
501+
)._stepPrefix;
505502

506-
return await concurrencyController.executeItems(
507-
items,
508-
executor,
509-
executionContext,
510-
config || {},
511-
durableExecutionMode,
512-
entityId,
513-
context,
514-
);
515-
};
503+
log("🔄", "Concurrent execution mode:", {
504+
mode: durableExecutionMode,
505+
itemCount: items.length,
506+
entityId,
507+
});
508+
509+
return await concurrencyController.executeItems(
510+
items,
511+
executor,
512+
executionContext,
513+
config || {},
514+
durableExecutionMode,
515+
entityId,
516+
context,
517+
);
518+
};
519+
520+
const result = await runInChildContext(name, executeOperation, {
521+
subType: config?.topLevelSubType,
522+
summaryGenerator: config?.summaryGenerator,
523+
serdes: config?.serdes,
524+
});
516525

517-
return await runInChildContext(name, executeOperation, {
518-
subType: config?.topLevelSubType,
519-
summaryGenerator: config?.summaryGenerator,
520-
serdes: config?.serdes,
521-
}).then((result) => {
522526
// Restore BatchResult methods if the result came from deserialized data
523527
if (
524528
result &&
@@ -529,6 +533,15 @@ export const createConcurrentExecutionHandler = (
529533
return restoreBatchResult<TResult>(result);
530534
}
531535
return result as BatchResult<TResult>;
536+
})();
537+
538+
// Attach catch handler to prevent unhandled promise rejections
539+
// The error will still be thrown when the DurablePromise is awaited
540+
phase1Promise.catch(() => {});
541+
542+
// Phase 2: Return DurablePromise that returns Phase 1 result when awaited
543+
return new DurablePromise(async () => {
544+
return await phase1Promise;
532545
});
533546
};
534547
};

packages/aws-durable-execution-sdk-js/src/handlers/invoke-handler/invoke-handler.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,10 @@ export const createInvokeHandler = (
222222
throw error; // Re-throw to fail phase 1
223223
});
224224

225+
// Attach catch handler to prevent unhandled promise rejections
226+
// The error will still be thrown when the DurablePromise is awaited
227+
startInvokePromise.catch(() => {});
228+
225229
// Return DurablePromise that will execute phase 2 when awaited
226230
return new DurablePromise(async () => {
227231
// Wait for phase 1 to complete first

0 commit comments

Comments
 (0)