Skip to content

Commit ad6f952

Browse files
ParidelPooyaPooya Paridel
andcommitted
feat: replace ADAPTIVE_EMPTY_RESULT with ReplayChildren flag (#5)
# ReplayChildren Implementation for Run-in-Child-Context Handler This change replaces the previous `ADAPTIVE_EMPTY_RESULT` marker approach with `ReplayChildren` flag mechanism for handling large payloads in child context operations. ## Key Changes ### 1. **ChildConfig Interface Enhancement** - Added `summaryGenerator?: (result: T) => string` option - summaryGenerator Will be used internall to create a summary for ctx.map and ctx.parallel when result is big - **Note**: `summaryGenerator` receives the **raw result** (before serialization) ### 2. **Checkpoint Behavior Update** - **Before**: Used magic string `"__LARGE_PAYLOAD__"` as payload marker - **After**: Uses `ContextOptions: { ReplayChildren: true }` flag with empty string payload (or custom summary) ### 3. **Replay Detection Logic** - **Before**: `if (result === ADAPTIVE_EMPTY_RESULT)` - **After**: `if (stepData?.ContextDetails?.ReplayChildren)` ### 4. **Data Flow Details** - **Size Check**: Uses **serialized result** (after `serdes.serialize()`) to determine if payload > 256KB - **summaryGenerator**: Receives **raw result** (before serialization) for flexible summary creation ## Workflow Diagram ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Child Context Execution Flow │ └─────────────────────────────────────────────────────────────────────────────┘ ┌─────────────────┐ │ Execute Child │ │ Context │ └─────────┬───────┘ │ ┌─────────▼───────┐ │ Raw Result (T) │ │ │ └─────────┬───────┘ │ ┌─────────▼───────┐ │ Apply Serdes │ │ Serialization │ │ result → string │ └─────────┬───────┘ │ ┌─────────▼───────┐ │ Check Serialized│ │ Payload Size │ │ > 256KB? │ └─────┬───────┬───┘ │ │ NO │ │ YES │ │ ┌───────────────▼─┐ ┌─▼──────────────────┐ │ Normal │ │ Large Payload │ │ Checkpoint │ │ Handling │ │ │ │ │ │ Payload: │ │ ReplayChildren: │ │ serialized data │ │ true │ │ ReplayChildren: │ │ │ │ undefined │ │ │ └───────────────┬─┘ └─┬──────────────────┘ │ │ │ ┌───▼──────────────────┐ │ │ summaryGenerator │ │ │ provided? │ │ │ (gets RAW result) │ │ └───┬──────────┬───────┘ │ │ │ │ YES │ │ NO │ │ │ │ ┌───▼────┐ ┌───▼────┐ │ │Custom │ │Empty │ │ │Summary │ │String │ │ │from │ │"" │ │ │raw data│ │ │ │ └───┬────┘ └───┬────┘ │ │ │ └───────┼──────────┘ │ ┌─────────────▼─────────────┐ │ Checkpoint Saved │ └─────────────┬─────────────┘ │ ┌─────────────▼─────────────┐ │ On Replay... │ └─────────────┬─────────────┘ │ ┌─────────────▼─────────────┐ │ Check ReplayChildren │ │ flag in ContextDetails │ └─────┬───────────┬─────────┘ │ │ FALSE│ │TRUE │ │ ┌───────────────▼─┐ ┌─────▼──────────────┐ │ Return Cached │ │ Re-execute Child │ │ Result from │ │ Context Function │ │ Checkpoint │ │ (Reconstruct │ │ (deserialize) │ │ Full Result) │ └─────────────────┘ └────────────────────┘ ``` ## Data Flow Summary 1. **Execute** child context function → Raw result `T` 2. **Serialize** result using serdes → `string` 3. **Check size** of serialized string 4. **If large**: Call `summaryGenerator(rawResult)` → custom summary 5. **Checkpoint** with ReplayChildren flag and summary/empty payload 6. **On replay**: Check ReplayChildren flag to decide re-execution ### Checklist - [Y] I have filled out every section of the PR template - [Y] I have thoroughly tested this change ### Testing #### Unit Tests Yes Co-authored-by: Pooya Paridel <parpooya@amazon.com>
1 parent 30816d9 commit ad6f952

File tree

4 files changed

+86
-16
lines changed

4 files changed

+86
-16
lines changed

lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,14 @@ describe("Run In Child Context Handler", () => {
202202
TEST_CONSTANTS.CHILD_CONTEXT_ID,
203203
{
204204
Id: TEST_CONSTANTS.CHILD_CONTEXT_ID,
205+
ParentId: mockExecutionContext.parentId,
205206
Action: OperationAction.SUCCEED,
206207
SubType: OperationSubType.RUN_IN_CHILD_CONTEXT,
207208
Type: OperationType.CONTEXT,
208-
Payload: "__LARGE_PAYLOAD__",
209+
Payload: "",
210+
ContextOptions: {
211+
ReplayChildren: true,
212+
},
209213
Name: TEST_CONSTANTS.CHILD_CONTEXT_NAME,
210214
},
211215
);
@@ -807,4 +811,56 @@ describe("Mock Integration", () => {
807811
},
808812
);
809813
});
814+
815+
test("should use custom summaryGenerator for large payloads", async () => {
816+
const largePayload = { data: "x".repeat(300000) };
817+
const childFn = jest.fn().mockResolvedValue(largePayload);
818+
const summaryGenerator = jest.fn().mockReturnValue("Custom summary of large data");
819+
820+
await runInChildContextHandler(
821+
TEST_CONSTANTS.CHILD_CONTEXT_NAME,
822+
childFn,
823+
{ summaryGenerator },
824+
);
825+
826+
expect(summaryGenerator).toHaveBeenCalledWith(largePayload);
827+
expect(mockCheckpoint).toHaveBeenNthCalledWith(
828+
2,
829+
TEST_CONSTANTS.CHILD_CONTEXT_ID,
830+
{
831+
Id: TEST_CONSTANTS.CHILD_CONTEXT_ID,
832+
ParentId: mockExecutionContext.parentId,
833+
Action: OperationAction.SUCCEED,
834+
SubType: OperationSubType.RUN_IN_CHILD_CONTEXT,
835+
Type: OperationType.CONTEXT,
836+
Payload: "Custom summary of large data",
837+
ContextOptions: {
838+
ReplayChildren: true,
839+
},
840+
Name: TEST_CONSTANTS.CHILD_CONTEXT_NAME,
841+
},
842+
);
843+
});
844+
845+
test("should re-execute child context when ReplayChildren is true", async () => {
846+
const stepData = mockExecutionContext._stepData;
847+
stepData[hashId(TEST_CONSTANTS.CHILD_CONTEXT_ID)] = {
848+
Id: TEST_CONSTANTS.CHILD_CONTEXT_ID,
849+
Status: OperationStatus.SUCCEEDED,
850+
ContextDetails: {
851+
Result: "Summary of large payload",
852+
ReplayChildren: true,
853+
},
854+
} as any;
855+
856+
const childFn = jest.fn().mockResolvedValue("re-executed-result");
857+
858+
const result = await runInChildContextHandler(
859+
TEST_CONSTANTS.CHILD_CONTEXT_NAME,
860+
childFn,
861+
);
862+
863+
expect(result).toBe("re-executed-result");
864+
expect(childFn).toHaveBeenCalledTimes(1);
865+
});
810866
});

lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ import { createErrorObjectFromError } from "../../utils/error-object/error-objec
2424
// Checkpoint size limit in bytes (256KB)
2525
const CHECKPOINT_SIZE_LIMIT = 256 * 1024;
2626

27-
// Marker for adaptive mode when payload is too large
28-
const ADAPTIVE_EMPTY_RESULT = "__LARGE_PAYLOAD__";
29-
3027
export const createRunInChildContextHandler = (
3128
context: ExecutionContext,
3229
checkpoint: ReturnType<typeof createCheckpoint>,
@@ -91,14 +88,15 @@ export const handleCompletedChildContext = async <T>(
9188
options?: ChildConfig<T>,
9289
): Promise<T> => {
9390
const serdes = options?.serdes || defaultSerdes;
94-
const result = context.getStepData(entityId)?.ContextDetails?.Result;
91+
const stepData = context.getStepData(entityId);
92+
const result = stepData?.ContextDetails?.Result;
9593

96-
// For adaptive mode, if result is empty string, we need to re-execute
97-
if (result === ADAPTIVE_EMPTY_RESULT) {
94+
// Check if we need to replay children due to large payload
95+
if (stepData?.ContextDetails?.ReplayChildren) {
9896
log(
9997
context.isVerbose,
10098
"🔄",
101-
"Adaptive mode: Re-executing child context due to large payload:",
99+
"ReplayChildren mode: Re-executing child context due to large payload:",
102100
{ entityId, stepName },
103101
);
104102

@@ -182,22 +180,32 @@ export const executeChildContext = async <T>(
182180

183181
// Check if payload is too large for adaptive mode
184182
let payloadToCheckpoint = serializedResult;
183+
let replayChildren = false;
184+
185185
if (
186186
serializedResult &&
187187
Buffer.byteLength(serializedResult, "utf8") > CHECKPOINT_SIZE_LIMIT
188188
) {
189+
replayChildren = true;
190+
191+
// Use summary generator if provided, otherwise use empty string
192+
if (options?.summaryGenerator) {
193+
payloadToCheckpoint = options.summaryGenerator(result);
194+
} else {
195+
payloadToCheckpoint = "";
196+
}
197+
189198
log(
190199
context.isVerbose,
191200
"📦",
192-
"Adaptive mode: Payload exceeds limit, checkpointing empty string:",
201+
"Large payload detected, using ReplayChildren mode:",
193202
{
194203
entityId,
195204
name,
196205
payloadSize: Buffer.byteLength(serializedResult, "utf8"),
197206
limit: CHECKPOINT_SIZE_LIMIT,
198207
},
199208
);
200-
payloadToCheckpoint = ADAPTIVE_EMPTY_RESULT;
201209
}
202210

203211
const subType = options?.subType || OperationSubType.RUN_IN_CHILD_CONTEXT;
@@ -208,6 +216,7 @@ export const executeChildContext = async <T>(
208216
SubType: subType,
209217
Type: OperationType.CONTEXT,
210218
Payload: payloadToCheckpoint,
219+
ContextOptions: replayChildren ? { ReplayChildren: true } : undefined,
211220
Name: name,
212221
});
213222

lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-integration.test.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -298,22 +298,24 @@ describe("Run In Child Context Integration Tests", () => {
298298
expect(checkpointCalls[1].data.Updates[0].Name).toBe(
299299
"test-large-child-context",
300300
);
301-
expect(checkpointCalls[1].data.Updates[0].Payload).toBe(
302-
"__LARGE_PAYLOAD__",
303-
);
301+
expect(checkpointCalls[1].data.Updates[0].Payload).toBe("");
302+
expect(checkpointCalls[1].data.Updates[0].ContextOptions).toEqual({
303+
ReplayChildren: true,
304+
});
304305
});
305306

306-
test("should re-execute on replay when adaptive mode has empty result", async () => {
307+
test("should re-execute on replay when ReplayChildren is true", async () => {
307308
const largePayload = "x".repeat(300 * 1024); // 300KB string
308309
let executionCount = 0;
309310

310-
// Set up completed step data with empty result (simulating large payload checkpoint)
311+
// Set up completed step data with ReplayChildren flag
311312
mockExecutionContext._stepData = {
312313
[hashId("1")]: {
313314
Id: "1",
314315
Status: OperationStatus.SUCCEEDED,
315316
ContextDetails: {
316-
Result: "__LARGE_PAYLOAD__", // Empty string indicates large payload in adaptive mode
317+
Result: "[Large payload summary]",
318+
ReplayChildren: true, // This triggers re-execution
317319
},
318320
},
319321
};

lambda-durable-functions-sdk-js/src/types/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ export interface StepConfig<T> {
187187
export interface ChildConfig<T = any> {
188188
serdes?: Serdes<T>;
189189
subType?: string;
190+
// summaryGenerator Will be used internall to create a summary for
191+
// ctx.map and ctx.parallel when result is big
192+
summaryGenerator?: (result: T) => string;
190193
}
191194

192195
export interface CreateCallbackConfig {

0 commit comments

Comments
 (0)