Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@
memoryLimitInMB: "128",
logGroupName: "/aws/lambda/mock-function",
logStreamName: "2023/01/01/[$LATEST]abcdef123456",
getRemainingTimeInMillis: () => 30000,

Check warning on line 63 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
done: () => {},

Check warning on line 64 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
fail: () => {},

Check warning on line 65 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
succeed: () => {},

Check warning on line 66 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Missing return type on function
};

// Setup mocks
Expand Down Expand Up @@ -138,10 +138,21 @@
mockParentContext,
expect.any(Function), // createStepId
expect.any(Function), // createContextLogger
expect.any(Function), // addRunningOperation
expect.any(Function), // removeRunningOperation
);
expect(mockStepHandler).toHaveBeenCalledWith("test-step", stepFn, options);
});

test("should have hasRunningOperations method that returns false initially", () => {
const durableContext = createDurableContext(
mockExecutionContext,
mockParentContext,
);

expect(durableContext.hasRunningOperations()).toBe(false);
});

test("should call block handler when runInChildContext method is invoked", () => {
const durableContext = createDurableContext(
mockExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,27 @@
);

let stepCounter = 0;
const runningOperations = new Set<string>();
const checkpoint = createCheckpoint(executionContext, checkpointToken || "");

const createStepId = (): string => {
stepCounter++;
return stepPrefix ? `${stepPrefix}-${stepCounter}` : `${stepCounter}`;
};

// Internal helpers for managing running operations
const addRunningOperation = (stepId: string): void => {
runningOperations.add(stepId);
};

const removeRunningOperation = (stepId: string): void => {
runningOperations.delete(stepId);
};

const hasRunningOperations = (): boolean => {
return runningOperations.size > 0;
};

const step: DurableContext["step"] = <T>(
nameOrFn: string | undefined | StepFunc<T>,
fnOrOptions?: StepFunc<T> | StepConfig<T>,
Expand All @@ -71,6 +85,8 @@
parentContext,
createStepId,
createContextLogger,
addRunningOperation,
removeRunningOperation,
);
return stepHandler(nameOrFn, fnOrOptions, maybeOptions);
};
Expand Down Expand Up @@ -118,8 +134,8 @@
};

const map: DurableContext["map"] = <T>(
nameOrItems: string | undefined | any[],

Check warning on line 137 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unexpected any. Specify a different type
itemsOrMapFunc?: any[] | MapFunc<T>,

Check warning on line 138 in packages/lambda-durable-functions-sdk-js/src/context/durable-context/durable-context.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unexpected any. Specify a different type
mapFuncOrConfig?: MapFunc<T> | MapConfig,
maybeConfig?: MapConfig,
) => {
Expand Down Expand Up @@ -177,6 +193,7 @@
...parentContext,
_stepPrefix: stepPrefix,
_stepCounter: stepCounter,
hasRunningOperations,
step,
runInChildContext,
wait: createWaitHandler(executionContext, checkpoint, createStepId),
Expand All @@ -185,6 +202,8 @@
checkpoint,
createStepId,
createContextLogger,
addRunningOperation,
removeRunningOperation,
),
createCallback,
waitForCallback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ describe("Step Handler", () => {
mockParentContext,
createStepId,
createMockEnrichedLogger,
jest.fn(), // addRunningOperation
jest.fn(), // removeRunningOperation
);

// Reset the mock for retryPresets.default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export const createStepHandler = (
parentContext: Context,
createStepId: () => string,
createContextLogger: (stepId: string, attempt?: number) => Logger,
addRunningOperation: (stepId: string) => void,
removeRunningOperation: (stepId: string) => void,
) => {
return async <T>(
nameOrFn: string | undefined | StepFunc<T>,
Expand Down Expand Up @@ -166,6 +168,8 @@ export const createStepHandler = (
name,
fn,
createContextLogger,
addRunningOperation,
removeRunningOperation,
options,
);
};
Expand Down Expand Up @@ -205,6 +209,8 @@ export const executeStep = async <T>(
name: string | undefined,
fn: StepFunc<T>,
createContextLogger: (stepId: string, attempt?: number) => Logger,
addRunningOperation: (stepId: string) => void,
removeRunningOperation: (stepId: string) => void,
options?: StepConfig<T>,
): Promise<T> => {
// Determine step semantics (default to AT_LEAST_ONCE_PER_RETRY if not specified)
Expand Down Expand Up @@ -248,9 +254,15 @@ export const executeStep = async <T>(
};

// Execute the step function with stepContext
const result = await OperationInterceptor.forExecution(
context.durableExecutionArn,
).execute(name, () => fn(stepContext));
addRunningOperation(stepId);
let result: T;
try {
result = await OperationInterceptor.forExecution(
context.durableExecutionArn,
).execute(name, () => fn(stepContext));
} finally {
removeRunningOperation(stepId);
}

// Serialize the result for consistency
const serializedResult = await safeSerialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ describe("WaitForCondition Handler", () => {
mockCheckpoint,
createStepId,
createMockEnrichedLogger,
jest.fn(), // addRunningOperation
jest.fn(), // removeRunningOperation
);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export const createWaitForConditionHandler = (
checkpoint: ReturnType<typeof createCheckpoint>,
createStepId: () => string,
createContextLogger: (stepId: string, attempt?: number) => Logger,
addRunningOperation: (stepId: string) => void,
removeRunningOperation: (stepId: string) => void,
) => {
return async <T>(
nameOrCheck: string | undefined | WaitForConditionCheckFunc<T>,
Expand Down Expand Up @@ -111,6 +113,8 @@ export const createWaitForConditionHandler = (
check,
config,
createContextLogger,
addRunningOperation,
removeRunningOperation,
);
};
};
Expand Down Expand Up @@ -150,6 +154,8 @@ export const executeWaitForCondition = async <T>(
check: WaitForConditionCheckFunc<T>,
config: WaitForConditionConfig<T>,
createContextLogger: (stepId: string, attempt?: number) => Logger,
addRunningOperation: (stepId: string) => void,
removeRunningOperation: (stepId: string) => void,
): Promise<T> => {
const serdes = config.serdes || defaultSerdes;

Expand Down Expand Up @@ -215,9 +221,15 @@ export const executeWaitForCondition = async <T>(
};

// Execute the check function
const newState = await OperationInterceptor.forExecution(
context.durableExecutionArn,
).execute(name, () => check(currentState, waitForConditionContext));
addRunningOperation(stepId);
let newState: T;
try {
newState = await OperationInterceptor.forExecution(
context.durableExecutionArn,
).execute(name, () => check(currentState, waitForConditionContext));
} finally {
removeRunningOperation(stepId);
}

// Serialize the new state for consistency
const serializedState = await safeSerialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export type DurableExecutionInvocationOutput =
export interface DurableContext extends Context {
_stepPrefix?: string;
_stepCounter: number;
hasRunningOperations(): boolean;
step: <T>(
nameOrFn: string | undefined | StepFunc<T>,
fnOrOptions?: StepFunc<T> | StepConfig<T>,
Expand Down
Loading