Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@
memoryLimitInMB: "128",
logGroupName: "/aws/lambda/mock-function",
logStreamName: "2023/01/01/[$LATEST]abcdef123456",
getRemainingTimeInMillis: () => 30000,

Check warning on line 63 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
done: () => {},

Check warning on line 64 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
fail: () => {},

Check warning on line 65 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
succeed: () => {},

Check warning on line 66 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
};

// Setup mocks
Expand Down Expand Up @@ -144,15 +144,6 @@
expect(mockStepHandler).toHaveBeenCalledWith("test-step", stepFn, options);
});

test("should have hasRunningOperations method that returns false initially", () => {
const durableContext = createDurableContext(
mockExecutionContext,
mockParentContext,
);

expect(durableContext.hasRunningOperations()).toBe(false);
});

test("should call block handler when runInChildContext method is invoked", () => {
const durableContext = createDurableContext(
mockExecutionContext,
Expand Down Expand Up @@ -187,10 +178,29 @@
mockExecutionContext,
mockCheckpointHandler,
expect.any(Function),
expect.any(Function), // hasRunningOperations
);
expect(mockWaitHandler).toHaveBeenCalledWith("test-wait", 1000);
});

test("should provide hasRunningOperations function that returns false when no operations", () => {
const durableContext = createDurableContext(
mockExecutionContext,
mockParentContext,
);

// Call wait to trigger the creation of wait handler with hasRunningOperations
durableContext.wait(1000);

// Extract hasRunningOperations function from the createWaitHandler call
const createWaitHandlerCall = jest.mocked(createWaitHandler).mock.calls[0];
const hasRunningOperations = createWaitHandlerCall[3]; // 4th parameter

// Call hasRunningOperations when no operations are running
const result = hasRunningOperations();
expect(result).toBe(false);
});

test("should call callback handler when createCallback method is invoked", () => {
const durableContext = createDurableContext(
mockExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@
};

const map: DurableContext["map"] = <T>(
nameOrItems: string | undefined | any[],

Check warning on line 137 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unexpected any. Specify a different type
itemsOrMapFunc?: any[] | MapFunc<T>,

Check warning on line 138 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unexpected any. Specify a different type
mapFuncOrConfig?: MapFunc<T> | MapConfig,
maybeConfig?: MapConfig,
) => {
Expand Down Expand Up @@ -193,10 +193,9 @@
...parentContext,
_stepPrefix: stepPrefix,
_stepCounter: stepCounter,
hasRunningOperations,
step,
runInChildContext,
wait: createWaitHandler(executionContext, checkpoint, createStepId),
wait: createWaitHandler(executionContext, checkpoint, createStepId, hasRunningOperations),
waitForCondition: createWaitForConditionHandler(
executionContext,
checkpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
OperationStatus,
OperationType,
Operation,
OperationAction,
} from "@amzn/dex-internal-sdk";
import { ExecutionContext, OperationSubType } from "../../types";
import { TerminationManager } from "../../termination-manager/termination-manager";
Expand Down Expand Up @@ -53,6 +54,7 @@ describe("Wait Handler", () => {
mockExecutionContext,
mockCheckpoint,
createStepId,
jest.fn(() => false), // hasRunningOperations
);
});

Expand Down Expand Up @@ -292,5 +294,164 @@ describe("Wait Handler", () => {
},
});
});

describe("running operations awareness", () => {
test("should terminate immediately when no operations are running", async () => {
const mockHasRunningOperations = jest.fn(() => false);
waitHandler = createWaitHandler(
mockExecutionContext,
mockCheckpoint,
createStepId,
mockHasRunningOperations,
);

waitHandler("test-wait", 1000);
await new Promise(resolve => setTimeout(resolve, 50));

expect(mockExecutionContext.terminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.WAIT_SCHEDULED,
message: "Operation test-wait scheduled to wait",
});
expect(mockHasRunningOperations).toHaveBeenCalled();
});

test("should not checkpoint START if step data already exists", async () => {
mockExecutionContext.getStepData.mockReturnValue({
Status: OperationStatus.STARTED,
WaitDetails: { ScheduledTimestamp: new Date(Date.now() + 5000) }
} as Operation);

const mockHasRunningOperations = jest.fn(() => false);
waitHandler = createWaitHandler(
mockExecutionContext,
mockCheckpoint,
createStepId,
mockHasRunningOperations,
);

waitHandler("test-wait", 1000);
await new Promise(resolve => setTimeout(resolve, 50));

expect(mockCheckpoint).not.toHaveBeenCalled();
});

test("should checkpoint START only on first execution", async () => {
mockExecutionContext.getStepData.mockReturnValue(undefined);

const mockHasRunningOperations = jest.fn(() => false);
waitHandler = createWaitHandler(
mockExecutionContext,
mockCheckpoint,
createStepId,
mockHasRunningOperations,
);

waitHandler("test-wait", 1000);
await new Promise(resolve => setTimeout(resolve, 50));

expect(mockCheckpoint).toHaveBeenCalledWith("test-step-id", {
Id: "test-step-id",
ParentId: undefined,
Action: OperationAction.START,
SubType: OperationSubType.WAIT,
Type: OperationType.WAIT,
Name: "test-wait",
WaitOptions: {
WaitSeconds: 1,
},
});
});

test("should wait for operations to complete before terminating", async () => {
let operationsRunning = true;
const mockHasRunningOperations = jest.fn(() => operationsRunning);

// Mock step data with existing wait
mockExecutionContext.getStepData.mockReturnValue({
Status: OperationStatus.STARTED,
WaitDetails: { ScheduledTimestamp: new Date(Date.now() + 5000) }
} as Operation);

waitHandler = createWaitHandler(
mockExecutionContext,
mockCheckpoint,
createStepId,
mockHasRunningOperations,
);

// Start the wait handler (don't await - it will wait for operations)
const waitPromise = waitHandler("test-wait", 1000);

// Give it time to enter the waiting logic
await new Promise(resolve => setTimeout(resolve, 50));

// Should not terminate immediately since operations are running
expect(mockExecutionContext.terminationManager.terminate).not.toHaveBeenCalled();
expect(mockHasRunningOperations).toHaveBeenCalled();

// Simulate operations completing after 150ms
setTimeout(() => {
operationsRunning = false;
}, 150);

// Wait for the polling to detect the change and terminate
await new Promise(resolve => setTimeout(resolve, 300));

// Should eventually terminate when operations complete
expect(mockExecutionContext.terminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.WAIT_SCHEDULED,
message: "Operation test-wait scheduled to wait",
});
});

test("should handle wait during parallel execution with running step", async () => {
// This integration test simulates:
// ctx.parallel([
// branch1: ctx.wait(2 sec),
// branch2: ctx.step (that has internal wait for 3 second)
// ])

let operationsRunning = true;
const mockHasRunningOperations = jest.fn(() => operationsRunning);

// Mock step data for wait operation (2 second wait)
const waitTime = Date.now() + 2000;
mockExecutionContext.getStepData.mockReturnValue({
Status: OperationStatus.STARTED,
WaitDetails: { ScheduledTimestamp: new Date(waitTime) }
} as Operation);

waitHandler = createWaitHandler(
mockExecutionContext,
mockCheckpoint,
createStepId,
mockHasRunningOperations,
);

// Start wait handler - should detect running operations and wait
const waitPromise = waitHandler("parallel-wait", 2000);

// Give time for wait handler to enter complex waiting logic
await new Promise(resolve => setTimeout(resolve, 50));

// Should not terminate immediately due to running operations
expect(mockExecutionContext.terminationManager.terminate).not.toHaveBeenCalled();
expect(mockHasRunningOperations).toHaveBeenCalled();

// Simulate step operation completing (after 1 second)
setTimeout(() => {
operationsRunning = false;
}, 100);

// Wait for operations to complete and handler to terminate
await new Promise(resolve => setTimeout(resolve, 200));

// Should eventually terminate when operations complete
expect(mockExecutionContext.terminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.WAIT_SCHEDULED,
message: "Operation parallel-wait scheduled to wait",
});
});
});
});
});
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { ExecutionContext, OperationSubType } from "../../types";
import { OperationStatus, OperationType } from "@amzn/dex-internal-sdk";
import { OperationStatus, OperationType, OperationAction } from "@amzn/dex-internal-sdk";
import { log } from "../../utils/logger/logger";
import { createCheckpoint } from "../../utils/checkpoint/checkpoint";
import { TerminationReason } from "../../termination-manager/types";
import { OperationInterceptor } from "../../mocks/operation-interceptor";
import { CheckpointFailedError } from "../../errors/checkpoint-errors/checkpoint-errors";
import { waitBeforeContinue } from "../../utils/wait-before-continue/wait-before-continue";

export const createWaitHandler = (
context: ExecutionContext,
checkpoint: ReturnType<typeof createCheckpoint>,
createStepId: () => string,
hasRunningOperations: () => boolean,
) => {
function waitHandler(name: string, millis: number): Promise<void>;
function waitHandler(millis: number): Promise<void>;
Expand All @@ -28,36 +30,61 @@ export const createWaitHandler = (
millis: actualMillis,
});

if (context.getStepData(stepId)?.Status === OperationStatus.SUCCEEDED) {
log(context.isVerbose, "⏭️", "Wait already completed:", { stepId });
return;
}
// Main wait logic - can be re-executed if step data changes
while (true) {
const stepData = context.getStepData(stepId);
if (stepData?.Status === OperationStatus.SUCCEEDED) {
log(context.isVerbose, "⏭️", "Wait already completed:", { stepId });
return;
}

const wouldBeMocked = OperationInterceptor.forExecution(
context.durableExecutionArn,
).recordOnly(actualName);
if (wouldBeMocked) {
throw new CheckpointFailedError("Wait step cannot be mocked");
}
const wouldBeMocked = OperationInterceptor.forExecution(
context.durableExecutionArn,
).recordOnly(actualName);
if (wouldBeMocked) {
throw new CheckpointFailedError("Wait step cannot be mocked");
}

await checkpoint(stepId, {
Id: stepId,
ParentId: context.parentId,
Action: "START",
SubType: OperationSubType.WAIT,
Type: OperationType.WAIT,
Name: actualName,
WaitOptions: {
WaitSeconds: actualMillis / 1000,
},
});
// Only checkpoint START if we haven't started this wait before
if (!stepData) {
await checkpoint(stepId, {
Id: stepId,
ParentId: context.parentId,
Action: OperationAction.START,
SubType: OperationSubType.WAIT,
Type: OperationType.WAIT,
Name: actualName,
WaitOptions: {
WaitSeconds: actualMillis / 1000,
},
});
}

context.terminationManager.terminate({
reason: TerminationReason.WAIT_SCHEDULED,
message: `Operation ${actualName || stepId} scheduled to wait`,
});
// Check if there are any ongoing operations
if (!hasRunningOperations()) {
// A.1: No ongoing operations - safe to terminate
context.terminationManager.terminate({
reason: TerminationReason.WAIT_SCHEDULED,
message: `Operation ${actualName || stepId} scheduled to wait`,
});
return new Promise(() => { });
}

return new Promise(() => {});

// There are ongoing operations - wait before continuing
await waitBeforeContinue({
checkHasRunningOperations: true,
checkStepStatus: true,
checkTimer: true,
scheduledTimestamp: stepData?.WaitDetails?.ScheduledTimestamp,
stepId,
context,
hasRunningOperations,
checkpoint,
});

// Continue the loop to re-evaluate all conditions from the beginning
}
}

return waitHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ export type DurableExecutionInvocationOutput =
export interface DurableContext extends Context {
_stepPrefix?: string;
_stepCounter: number;
hasRunningOperations(): boolean;
step: <T>(
nameOrFn: string | undefined | StepFunc<T>,
fnOrOptions?: StepFunc<T> | StepConfig<T>,
Expand Down
Loading
Loading