Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,14 @@ describe("Run In Child Context Handler", () => {
TEST_CONSTANTS.CHILD_CONTEXT_ID,
{
Id: TEST_CONSTANTS.CHILD_CONTEXT_ID,
ParentId: mockExecutionContext.parentId,
Action: OperationAction.SUCCEED,
SubType: OperationSubType.RUN_IN_CHILD_CONTEXT,
Type: OperationType.CONTEXT,
Payload: "__LARGE_PAYLOAD__",
Payload: "",
ContextOptions: {
ReplayChildren: true,
},
Name: TEST_CONSTANTS.CHILD_CONTEXT_NAME,
},
);
Expand Down Expand Up @@ -807,4 +811,56 @@ describe("Mock Integration", () => {
},
);
});

test("should use custom summaryGenerator for large payloads", async () => {
const largePayload = { data: "x".repeat(300000) };
const childFn = jest.fn().mockResolvedValue(largePayload);
const summaryGenerator = jest.fn().mockReturnValue("Custom summary of large data");

await runInChildContextHandler(
TEST_CONSTANTS.CHILD_CONTEXT_NAME,
childFn,
{ summaryGenerator },
);

expect(summaryGenerator).toHaveBeenCalledWith(largePayload);
expect(mockCheckpoint).toHaveBeenNthCalledWith(
2,
TEST_CONSTANTS.CHILD_CONTEXT_ID,
{
Id: TEST_CONSTANTS.CHILD_CONTEXT_ID,
ParentId: mockExecutionContext.parentId,
Action: OperationAction.SUCCEED,
SubType: OperationSubType.RUN_IN_CHILD_CONTEXT,
Type: OperationType.CONTEXT,
Payload: "Custom summary of large data",
ContextOptions: {
ReplayChildren: true,
},
Name: TEST_CONSTANTS.CHILD_CONTEXT_NAME,
},
);
});

test("should re-execute child context when ReplayChildren is true", async () => {
const stepData = mockExecutionContext._stepData;
stepData[hashId(TEST_CONSTANTS.CHILD_CONTEXT_ID)] = {
Id: TEST_CONSTANTS.CHILD_CONTEXT_ID,
Status: OperationStatus.SUCCEEDED,
ContextDetails: {
Result: "Summary of large payload",
ReplayChildren: true,
},
} as any;

const childFn = jest.fn().mockResolvedValue("re-executed-result");

const result = await runInChildContextHandler(
TEST_CONSTANTS.CHILD_CONTEXT_NAME,
childFn,
);

expect(result).toBe("re-executed-result");
expect(childFn).toHaveBeenCalledTimes(1);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import { createErrorObjectFromError } from "../../utils/error-object/error-objec
// Checkpoint size limit in bytes (256KB)
const CHECKPOINT_SIZE_LIMIT = 256 * 1024;

// Marker for adaptive mode when payload is too large
const ADAPTIVE_EMPTY_RESULT = "__LARGE_PAYLOAD__";

export const createRunInChildContextHandler = (
context: ExecutionContext,
checkpoint: ReturnType<typeof createCheckpoint>,
Expand Down Expand Up @@ -91,14 +88,15 @@ export const handleCompletedChildContext = async <T>(
options?: ChildConfig<T>,
): Promise<T> => {
const serdes = options?.serdes || defaultSerdes;
const result = context.getStepData(entityId)?.ContextDetails?.Result;
const stepData = context.getStepData(entityId);
const result = stepData?.ContextDetails?.Result;

// For adaptive mode, if result is empty string, we need to re-execute
if (result === ADAPTIVE_EMPTY_RESULT) {
// Check if we need to replay children due to large payload
if (stepData?.ContextDetails?.ReplayChildren) {
log(
context.isVerbose,
"🔄",
"Adaptive mode: Re-executing child context due to large payload:",
"ReplayChildren mode: Re-executing child context due to large payload:",
{ entityId, stepName },
);

Expand Down Expand Up @@ -182,22 +180,32 @@ export const executeChildContext = async <T>(

// Check if payload is too large for adaptive mode
let payloadToCheckpoint = serializedResult;
let replayChildren = false;

if (
serializedResult &&
Buffer.byteLength(serializedResult, "utf8") > CHECKPOINT_SIZE_LIMIT
) {
replayChildren = true;

// Use summary generator if provided, otherwise use empty string
if (options?.summaryGenerator) {
payloadToCheckpoint = options.summaryGenerator(result);
} else {
payloadToCheckpoint = "";
}

log(
context.isVerbose,
"📦",
"Adaptive mode: Payload exceeds limit, checkpointing empty string:",
"Large payload detected, using ReplayChildren mode:",
{
entityId,
name,
payloadSize: Buffer.byteLength(serializedResult, "utf8"),
limit: CHECKPOINT_SIZE_LIMIT,
},
);
payloadToCheckpoint = ADAPTIVE_EMPTY_RESULT;
}

const subType = options?.subType || OperationSubType.RUN_IN_CHILD_CONTEXT;
Expand All @@ -208,6 +216,7 @@ export const executeChildContext = async <T>(
SubType: subType,
Type: OperationType.CONTEXT,
Payload: payloadToCheckpoint,
ContextOptions: replayChildren ? { ReplayChildren: true } : undefined,
Name: name,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,22 +298,24 @@ describe("Run In Child Context Integration Tests", () => {
expect(checkpointCalls[1].data.Updates[0].Name).toBe(
"test-large-child-context",
);
expect(checkpointCalls[1].data.Updates[0].Payload).toBe(
"__LARGE_PAYLOAD__",
);
expect(checkpointCalls[1].data.Updates[0].Payload).toBe("");
expect(checkpointCalls[1].data.Updates[0].ContextOptions).toEqual({
ReplayChildren: true,
});
});

test("should re-execute on replay when adaptive mode has empty result", async () => {
test("should re-execute on replay when ReplayChildren is true", async () => {
const largePayload = "x".repeat(300 * 1024); // 300KB string
let executionCount = 0;

// Set up completed step data with empty result (simulating large payload checkpoint)
// Set up completed step data with ReplayChildren flag
mockExecutionContext._stepData = {
[hashId("1")]: {
Id: "1",
Status: OperationStatus.SUCCEEDED,
ContextDetails: {
Result: "__LARGE_PAYLOAD__", // Empty string indicates large payload in adaptive mode
Result: "[Large payload summary]",
ReplayChildren: true, // This triggers re-execution
},
},
};
Expand Down
3 changes: 3 additions & 0 deletions lambda-durable-functions-sdk-js/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ export interface StepConfig<T> {
export interface ChildConfig<T = any> {
serdes?: Serdes<T>;
subType?: string;
// summaryGenerator Will be used internall to create a summary for
// ctx.map and ctx.parallel when result is big
summaryGenerator?: (result: T) => string;
}

export interface CreateCallbackConfig {
Expand Down
Loading