diff --git a/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts b/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts index 2cb41aac..2b4feb7f 100644 --- a/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts +++ b/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts @@ -202,10 +202,14 @@ describe("Run In Child Context Handler", () => { TEST_CONSTANTS.CHILD_CONTEXT_ID, { Id: TEST_CONSTANTS.CHILD_CONTEXT_ID, + ParentId: mockExecutionContext.parentId, Action: OperationAction.SUCCEED, SubType: OperationSubType.RUN_IN_CHILD_CONTEXT, Type: OperationType.CONTEXT, - Payload: "__LARGE_PAYLOAD__", + Payload: "", + ContextOptions: { + ReplayChildren: true, + }, Name: TEST_CONSTANTS.CHILD_CONTEXT_NAME, }, ); @@ -807,4 +811,56 @@ describe("Mock Integration", () => { }, ); }); + + test("should use custom summaryGenerator for large payloads", async () => { + const largePayload = { data: "x".repeat(300000) }; + const childFn = jest.fn().mockResolvedValue(largePayload); + const summaryGenerator = jest.fn().mockReturnValue("Custom summary of large data"); + + await runInChildContextHandler( + TEST_CONSTANTS.CHILD_CONTEXT_NAME, + childFn, + { summaryGenerator }, + ); + + expect(summaryGenerator).toHaveBeenCalledWith(largePayload); + expect(mockCheckpoint).toHaveBeenNthCalledWith( + 2, + TEST_CONSTANTS.CHILD_CONTEXT_ID, + { + Id: TEST_CONSTANTS.CHILD_CONTEXT_ID, + ParentId: mockExecutionContext.parentId, + Action: OperationAction.SUCCEED, + SubType: OperationSubType.RUN_IN_CHILD_CONTEXT, + Type: OperationType.CONTEXT, + Payload: "Custom summary of large data", + ContextOptions: { + ReplayChildren: true, + }, + Name: TEST_CONSTANTS.CHILD_CONTEXT_NAME, + }, + ); + }); + + test("should re-execute child context when ReplayChildren is true", async () => { + const stepData = mockExecutionContext._stepData; + stepData[hashId(TEST_CONSTANTS.CHILD_CONTEXT_ID)] = { + Id: TEST_CONSTANTS.CHILD_CONTEXT_ID, + Status: OperationStatus.SUCCEEDED, + ContextDetails: { + Result: "Summary of large payload", + ReplayChildren: true, + }, + } as any; + + const childFn = jest.fn().mockResolvedValue("re-executed-result"); + + const result = await runInChildContextHandler( + TEST_CONSTANTS.CHILD_CONTEXT_NAME, + childFn, + ); + + expect(result).toBe("re-executed-result"); + expect(childFn).toHaveBeenCalledTimes(1); + }); }); diff --git a/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts b/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts index 7de1e720..6fe56879 100644 --- a/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts +++ b/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts @@ -24,9 +24,6 @@ import { createErrorObjectFromError } from "../../utils/error-object/error-objec // Checkpoint size limit in bytes (256KB) const CHECKPOINT_SIZE_LIMIT = 256 * 1024; -// Marker for adaptive mode when payload is too large -const ADAPTIVE_EMPTY_RESULT = "__LARGE_PAYLOAD__"; - export const createRunInChildContextHandler = ( context: ExecutionContext, checkpoint: ReturnType, @@ -91,14 +88,15 @@ export const handleCompletedChildContext = async ( options?: ChildConfig, ): Promise => { const serdes = options?.serdes || defaultSerdes; - const result = context.getStepData(entityId)?.ContextDetails?.Result; + const stepData = context.getStepData(entityId); + const result = stepData?.ContextDetails?.Result; - // For adaptive mode, if result is empty string, we need to re-execute - if (result === ADAPTIVE_EMPTY_RESULT) { + // Check if we need to replay children due to large payload + if (stepData?.ContextDetails?.ReplayChildren) { log( context.isVerbose, "🔄", - "Adaptive mode: Re-executing child context due to large payload:", + "ReplayChildren mode: Re-executing child context due to large payload:", { entityId, stepName }, ); @@ -182,14 +180,25 @@ export const executeChildContext = async ( // Check if payload is too large for adaptive mode let payloadToCheckpoint = serializedResult; + let replayChildren = false; + if ( serializedResult && Buffer.byteLength(serializedResult, "utf8") > CHECKPOINT_SIZE_LIMIT ) { + replayChildren = true; + + // Use summary generator if provided, otherwise use empty string + if (options?.summaryGenerator) { + payloadToCheckpoint = options.summaryGenerator(result); + } else { + payloadToCheckpoint = ""; + } + log( context.isVerbose, "📦", - "Adaptive mode: Payload exceeds limit, checkpointing empty string:", + "Large payload detected, using ReplayChildren mode:", { entityId, name, @@ -197,7 +206,6 @@ export const executeChildContext = async ( limit: CHECKPOINT_SIZE_LIMIT, }, ); - payloadToCheckpoint = ADAPTIVE_EMPTY_RESULT; } const subType = options?.subType || OperationSubType.RUN_IN_CHILD_CONTEXT; @@ -208,6 +216,7 @@ export const executeChildContext = async ( SubType: subType, Type: OperationType.CONTEXT, Payload: payloadToCheckpoint, + ContextOptions: replayChildren ? { ReplayChildren: true } : undefined, Name: name, }); diff --git a/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-integration.test.ts b/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-integration.test.ts index 18ed8728..5e9d11d8 100644 --- a/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-integration.test.ts +++ b/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-integration.test.ts @@ -298,22 +298,24 @@ describe("Run In Child Context Integration Tests", () => { expect(checkpointCalls[1].data.Updates[0].Name).toBe( "test-large-child-context", ); - expect(checkpointCalls[1].data.Updates[0].Payload).toBe( - "__LARGE_PAYLOAD__", - ); + expect(checkpointCalls[1].data.Updates[0].Payload).toBe(""); + expect(checkpointCalls[1].data.Updates[0].ContextOptions).toEqual({ + ReplayChildren: true, + }); }); - test("should re-execute on replay when adaptive mode has empty result", async () => { + test("should re-execute on replay when ReplayChildren is true", async () => { const largePayload = "x".repeat(300 * 1024); // 300KB string let executionCount = 0; - // Set up completed step data with empty result (simulating large payload checkpoint) + // Set up completed step data with ReplayChildren flag mockExecutionContext._stepData = { [hashId("1")]: { Id: "1", Status: OperationStatus.SUCCEEDED, ContextDetails: { - Result: "__LARGE_PAYLOAD__", // Empty string indicates large payload in adaptive mode + Result: "[Large payload summary]", + ReplayChildren: true, // This triggers re-execution }, }, }; diff --git a/lambda-durable-functions-sdk-js/src/types/index.ts b/lambda-durable-functions-sdk-js/src/types/index.ts index 5cdc356b..3769b5ec 100644 --- a/lambda-durable-functions-sdk-js/src/types/index.ts +++ b/lambda-durable-functions-sdk-js/src/types/index.ts @@ -187,6 +187,9 @@ export interface StepConfig { export interface ChildConfig { serdes?: Serdes; subType?: string; + // summaryGenerator Will be used internall to create a summary for + // ctx.map and ctx.parallel when result is big + summaryGenerator?: (result: T) => string; } export interface CreateCallbackConfig {