Skip to content

Commit 0c8c2be

Browse files
authored
fix(sdk): fix waitForCallback determinism and serdes errors (#328)
*Issue #, if available:* #324 *Description of changes:* Fixing waitForCallback serdes not working properly. The serdes was only passed to the `createCallback` result, and `runInChildContext` had the default serdes (JSON stringify/parse). Before: - Any deserialization that happened in createCallback would be lost on replay, since runInChildContext had the default serdes - If the serdes passed into createCallback created a string that could not be serialized or deserialized into JSON, it would fail the execution - For example, if you deserialized into a structure that had a circular object, it would fail the execution (even though it's valid to do this) - waitForCallback and createCallback both required the serdes to include a `serialization` function, even though it was not used After: - Deserialization works both before and after replay - Now, createCallback is not passed a serdes, and and runInChildContext uses passThroughSerdes. Deserialization happens on the runInChildContext result - We can't pass serdes to runInChildContext since deserialization does not happen on the initial execution, only on replay. We want this deserialization to happen on the createCallback string result every time - Deserialization into circular structures will not fail the execution - waitForCallback and createCallback only use the `deserialization` function of serdes. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent bc67710 commit 0c8c2be

File tree

10 files changed

+317
-123
lines changed

10 files changed

+317
-123
lines changed

packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/serdes/wait-for-callback-serdes.test.ts

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,36 +3,9 @@ import {
33
OperationStatus,
44
WaitingOperationStatus,
55
} from "@aws/durable-execution-sdk-js-testing";
6-
import { handler } from "./wait-for-callback-serdes";
6+
import { CustomData, handler } from "./wait-for-callback-serdes";
77
import { createTests } from "../../../utils/test-helper";
88

9-
// Define CustomData type to match handler
10-
interface CustomData {
11-
id: number;
12-
message: string;
13-
timestamp: Date;
14-
metadata: {
15-
version: string;
16-
processed: boolean;
17-
};
18-
}
19-
20-
// Custom serdes from handler (needed for test)
21-
const customSerdes = {
22-
serialize: async (
23-
data: CustomData | undefined,
24-
): Promise<string | undefined> => {
25-
if (data === undefined) return Promise.resolve(undefined);
26-
return Promise.resolve(
27-
JSON.stringify({
28-
...data,
29-
timestamp: data.timestamp.toISOString(),
30-
_serializedBy: "custom-serdes-v1",
31-
}),
32-
);
33-
},
34-
};
35-
369
createTests({
3710
name: "wait-for-callback-serdes test",
3811
functionName: "wait-for-callback-serdes",
@@ -46,29 +19,40 @@ createTests({
4619

4720
await callbackOperation.waitForData(WaitingOperationStatus.STARTED);
4821

49-
// Send data that requires custom serialization
50-
const testData: CustomData = {
51-
id: 42,
52-
message: "Hello Custom Serdes",
53-
timestamp: new Date("2025-06-15T12:30:45Z"),
54-
metadata: {
55-
version: "2.0.0",
56-
processed: true,
57-
},
58-
};
59-
6022
// Serialize the data using custom serdes for sending
61-
const serializedData = await customSerdes.serialize(testData);
62-
await callbackOperation.sendCallbackSuccess(serializedData!);
23+
await callbackOperation.sendCallbackSuccess(
24+
JSON.stringify({
25+
id: 42,
26+
message: "Hello Custom Serdes",
27+
timestamp: "2025-06-15T12:30:45Z",
28+
metadata: {
29+
version: "2.0.0",
30+
processed: false,
31+
},
32+
}),
33+
);
6334

6435
const result = await executionPromise;
6536

6637
expect(result.getResult()).toEqual(
6738
JSON.parse(
6839
// the result will always get stringified since it's the lambda response
6940
JSON.stringify({
70-
receivedData: testData,
71-
isDateObject: true,
41+
receivedData: {
42+
id: 42,
43+
message: "Hello Custom Serdes",
44+
timestamp: new Date("2025-06-15T12:30:45Z"),
45+
metadata: {
46+
version: "2.0.0",
47+
processed: true,
48+
},
49+
circular: undefined,
50+
} satisfies CustomData,
51+
isDateBeforeReplay: true,
52+
isDateAfterReplay: true,
53+
isSerdesProcessedBefore: true,
54+
isSerdesProcessedAfter: true,
55+
hasCircularReference: true,
7256
}),
7357
),
7458
);

packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/serdes/wait-for-callback-serdes.ts

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,18 @@ export const config: ExampleConfig = {
1010
"Demonstrates waitForCallback with custom serialization/deserialization",
1111
};
1212

13-
interface CustomData {
13+
export interface CustomData {
1414
id: number;
1515
message: string;
1616
timestamp: Date;
1717
metadata: {
1818
version: string;
1919
processed: boolean;
2020
};
21+
circular: CustomData | undefined;
2122
}
2223

2324
const customSerdes = {
24-
serialize: async (
25-
data: CustomData | undefined,
26-
): Promise<string | undefined> => {
27-
if (data === undefined) return Promise.resolve(undefined);
28-
return Promise.resolve(
29-
JSON.stringify({
30-
...data,
31-
timestamp: data.timestamp.toISOString(),
32-
_serializedBy: "custom-serdes-v1",
33-
}),
34-
);
35-
},
3625
deserialize: async (
3726
str: string | undefined,
3827
): Promise<CustomData | undefined> => {
@@ -45,20 +34,29 @@ const customSerdes = {
4534
version: string;
4635
processed: boolean;
4736
};
48-
_serializedBy: string;
4937
};
50-
return Promise.resolve({
38+
const result: CustomData = {
5139
id: parsed.id,
5240
message: parsed.message,
5341
timestamp: new Date(parsed.timestamp),
54-
metadata: parsed.metadata,
55-
});
42+
metadata: {
43+
...parsed.metadata,
44+
// Set to true by serdes, but always false in the callback result
45+
processed: true,
46+
},
47+
circular: undefined,
48+
};
49+
50+
// Deserializing into a circular reference should cause no issues
51+
result.circular = result;
52+
53+
return result;
5654
},
5755
};
5856

5957
export const handler = withDurableExecution(
6058
async (event: unknown, context: DurableContext) => {
61-
const result = await context.waitForCallback<CustomData>(
59+
const result = await context.waitForCallback(
6260
"custom-serdes-callback",
6361
async () => {
6462
// Submitter succeeds
@@ -70,9 +68,28 @@ export const handler = withDurableExecution(
7068
},
7169
);
7270

71+
const isSerdesProcessedBefore = await context.step(() =>
72+
Promise.resolve(result.metadata.processed),
73+
);
74+
75+
const isDateBeforeReplay = await context.step(() =>
76+
Promise.resolve(result.timestamp instanceof Date),
77+
);
78+
79+
await context.wait({ seconds: 1 });
80+
81+
const hasCircularReference = result.circular === result;
82+
83+
// Don't return the circular result to avoid result serialization issues
84+
delete result.circular;
85+
7386
return {
7487
receivedData: result,
75-
isDateObject: result.timestamp instanceof Date,
88+
hasCircularReference,
89+
isDateAfterReplay: result.timestamp instanceof Date,
90+
isDateBeforeReplay: isDateBeforeReplay,
91+
isSerdesProcessedBefore: isSerdesProcessedBefore,
92+
isSerdesProcessedAfter: result.metadata.processed,
7693
};
7794
},
7895
);

packages/aws-durable-execution-sdk-js/src/context/durable-context/durable-context.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ export class DurableContextImpl<Logger extends DurableLogger>
438438
return this.withDurableModeManagement(() => {
439439
const waitForCallbackHandler = createWaitForCallbackHandler(
440440
this.executionContext,
441+
this.getNextStepId.bind(this),
441442
this.runInChildContext.bind(this),
442443
);
443444
return waitForCallbackHandler(

packages/aws-durable-execution-sdk-js/src/handlers/callback-handler/callback-promise.test.ts

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ jest.mock("../../utils/wait-before-continue/wait-before-continue");
1111
jest.mock("../../errors/serdes-errors/serdes-errors");
1212
jest.mock("../../utils/logger/logger");
1313

14-
const mockWaitBeforeContinue = waitBeforeContinue as jest.MockedFunction<typeof waitBeforeContinue>;
15-
const mockSafeDeserialize = safeDeserialize as jest.MockedFunction<typeof safeDeserialize>;
14+
const mockWaitBeforeContinue = waitBeforeContinue as jest.MockedFunction<
15+
typeof waitBeforeContinue
16+
>;
17+
const mockSafeDeserialize = safeDeserialize as jest.MockedFunction<
18+
typeof safeDeserialize
19+
>;
1620

1721
describe("createCallbackPromise", () => {
1822
let mockContext: ExecutionContext;
@@ -21,7 +25,7 @@ describe("createCallbackPromise", () => {
2125

2226
beforeEach(() => {
2327
jest.clearAllMocks();
24-
28+
2529
mockContext = {
2630
getStepData: jest.fn(),
2731
terminationManager: {
@@ -36,12 +40,13 @@ describe("createCallbackPromise", () => {
3640

3741
describe("uncovered scenarios", () => {
3842
it("should handle no step data with running operations", async () => {
39-
const hasRunningOperations = jest.fn()
40-
.mockReturnValueOnce(true) // First check: has running operations
43+
const hasRunningOperations = jest
44+
.fn()
45+
.mockReturnValueOnce(true) // First check: has running operations
4146
.mockReturnValueOnce(false); // After wait: no running operations
4247

4348
(mockContext.getStepData as jest.Mock)
44-
.mockReturnValueOnce(null) // First call: no step data
49+
.mockReturnValueOnce(null) // First call: no step data
4550
.mockReturnValueOnce(null); // Second call: still no step data
4651

4752
// Mock termination manager to actually terminate
@@ -56,7 +61,7 @@ describe("createCallbackPromise", () => {
5661
mockContext,
5762
"test-step-id",
5863
"test-step",
59-
{ serialize: jest.fn(), deserialize: jest.fn() },
64+
{ deserialize: jest.fn() },
6065
hasRunningOperations,
6166
mockOperationsEmitter,
6267
"test termination message",
@@ -67,7 +72,7 @@ describe("createCallbackPromise", () => {
6772
expect(mockWaitBeforeContinue).toHaveBeenCalled();
6873
expect(mockTerminate).toHaveBeenCalledWith({
6974
message: "test termination message",
70-
reason: "CALLBACK_PENDING"
75+
reason: "CALLBACK_PENDING",
7176
});
7277
});
7378

@@ -88,15 +93,17 @@ describe("createCallbackPromise", () => {
8893
mockContext,
8994
"test-step-id",
9095
"test-step",
91-
{ serialize: jest.fn(), deserialize: jest.fn() },
96+
{ deserialize: jest.fn() },
9297
hasRunningOperations,
9398
mockOperationsEmitter,
9499
"test termination message",
95100
mockCheckAndUpdateReplayMode,
96101
);
97102

98103
await expect(promise).rejects.toThrow(CallbackError);
99-
await expect(promise).rejects.toThrow("No callback ID found for completed callback: test-step-id");
104+
await expect(promise).rejects.toThrow(
105+
"No callback ID found for completed callback: test-step-id",
106+
);
100107
});
101108

102109
it("should handle succeeded callback with deserialization", async () => {
@@ -161,15 +168,15 @@ describe("createCallbackPromise", () => {
161168
mockContext,
162169
"test-step-id",
163170
"test-step",
164-
{ serialize: jest.fn(), deserialize: jest.fn() },
171+
{ deserialize: jest.fn() },
165172
hasRunningOperations,
166173
mockOperationsEmitter,
167174
"test termination message",
168175
mockCheckAndUpdateReplayMode,
169176
);
170177

171178
await expect(promise).rejects.toThrow(CallbackError);
172-
179+
173180
try {
174181
await promise;
175182
} catch (error) {
@@ -196,7 +203,7 @@ describe("createCallbackPromise", () => {
196203
mockContext,
197204
"test-step-id",
198205
"test-step",
199-
{ serialize: jest.fn(), deserialize: jest.fn() },
206+
{ deserialize: jest.fn() },
200207
hasRunningOperations,
201208
mockOperationsEmitter,
202209
"test termination message",
@@ -221,7 +228,7 @@ describe("createCallbackPromise", () => {
221228
mockContext,
222229
"test-step-id",
223230
"test-step",
224-
{ serialize: jest.fn(), deserialize: jest.fn() },
231+
{ deserialize: jest.fn() },
225232
hasRunningOperations,
226233
mockOperationsEmitter,
227234
"test termination message",
@@ -246,15 +253,17 @@ describe("createCallbackPromise", () => {
246253
mockContext,
247254
"test-step-id",
248255
"test-step",
249-
{ serialize: jest.fn(), deserialize: jest.fn() },
256+
{ deserialize: jest.fn() },
250257
hasRunningOperations,
251258
mockOperationsEmitter,
252259
"test termination message",
253260
mockCheckAndUpdateReplayMode,
254261
);
255262

256263
await expect(promise).rejects.toThrow(CallbackError);
257-
await expect(promise).rejects.toThrow("Unexpected callback status: UNKNOWN_STATUS");
264+
await expect(promise).rejects.toThrow(
265+
"Unexpected callback status: UNKNOWN_STATUS",
266+
);
258267
});
259268
});
260269
});

packages/aws-durable-execution-sdk-js/src/handlers/callback-handler/callback-promise.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export const createCallbackPromise = <T>(
1313
context: ExecutionContext,
1414
stepId: string,
1515
stepName: string | undefined,
16-
serdes: Serdes<T>,
16+
serdes: Omit<Serdes<T>, "serialize">,
1717
hasRunningOperations: () => boolean,
1818
operationsEmitter: EventEmitter,
1919
terminationMessage: string,

packages/aws-durable-execution-sdk-js/src/handlers/callback-handler/callback.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { validateReplayConsistency } from "../../utils/replay-validation/replay-
1616
import { durationToSeconds } from "../../utils/duration/duration";
1717
import { createCallbackPromise } from "./callback-promise";
1818

19-
const createPassThroughSerdes = <T>(): Serdes<T> => ({
19+
export const createPassThroughSerdes = <T>(): Serdes<T> => ({
2020
serialize: async (value: T | undefined) => value as string | undefined,
2121
deserialize: async (data: string | undefined) => data as T | undefined,
2222
});

packages/aws-durable-execution-sdk-js/src/handlers/invoke-handler/invoke-handler.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ describe("InvokeHandler", () => {
343343
"test-arn",
344344
);
345345

346-
expect(mockCheckpointFn).toHaveBeenCalledWith("test-step-1", {
346+
expect(mockCheckpointFn.checkpoint).toHaveBeenCalledWith("test-step-1", {
347347
Id: "test-step-1",
348348
ParentId: "parent-123",
349349
Action: OperationAction.START,
@@ -402,7 +402,7 @@ describe("InvokeHandler", () => {
402402
"test-arn",
403403
);
404404

405-
expect(mockCheckpointFn).toHaveBeenCalledWith("test-step-1", {
405+
expect(mockCheckpointFn.checkpoint).toHaveBeenCalledWith("test-step-1", {
406406
Id: "test-step-1",
407407
ParentId: "parent-123",
408408
Action: OperationAction.START,

0 commit comments

Comments
 (0)