From 6d511d7be38513685270b8099f7fa340804f8a5d Mon Sep 17 00:00:00 2001 From: Michael James Schock Date: Wed, 26 Nov 2025 16:27:40 -0800 Subject: [PATCH 1/4] fix: prevent duplicate function_call items in session history after resuming from interruptions --- packages/agents-core/src/run.ts | 48 ++++++++++++++----- packages/agents-core/src/runImplementation.ts | 39 +++++++++++++++ 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 474f94c6..38ce127a 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -732,9 +732,8 @@ export class Runner extends RunHooks> { state._originalInput = turnResult.originalInput; state._generatedItems = turnResult.generatedItems; - if (turnResult.nextStep.type === 'next_step_run_again') { - state._currentTurnPersistedItemCount = 0; - } + // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic + // Counter will be reset when _currentTurn is incremented (starting a new turn) state._currentStep = turnResult.nextStep; if (turnResult.nextStep.type === 'next_step_interruption') { @@ -742,14 +741,27 @@ export class Runner extends RunHooks> { return new RunResult(state); } + // If continuing from interruption with next_step_run_again, continue the loop + // but DON'T increment turn or reset counter - we're continuing the same turn + if (turnResult.nextStep.type === 'next_step_run_again') { + continue; + } + continue; } if (state._currentStep.type === 'next_step_run_again') { const artifacts = await prepareAgentArtifacts(state); + // Only increment turn and reset counter when starting a NEW turn + // If counter is non-zero, it means we're continuing from an interruption (counter was rewound) + // In that case, don't reset the counter - it's already been adjusted by the rewind logic state._currentTurn++; - state._currentTurnPersistedItemCount = 0; + if (state._currentTurnPersistedItemCount === 0) { + // Only reset if counter is already 0 (starting a new turn) + // If counter is non-zero, we're continuing from interruption and it was already adjusted + state._currentTurnPersistedItemCount = 0; + } if (state._currentTurn > state._maxTurns) { state._currentAgentSpan?.setError({ @@ -867,9 +879,8 @@ export class Runner extends RunHooks> { state._originalInput = turnResult.originalInput; state._generatedItems = turnResult.generatedItems; - if (turnResult.nextStep.type === 'next_step_run_again') { - state._currentTurnPersistedItemCount = 0; - } + // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic + // Counter will be reset when _currentTurn is incremented (starting a new turn) state._currentStep = turnResult.nextStep; if (parallelGuardrailPromise) { @@ -1021,9 +1032,8 @@ export class Runner extends RunHooks> { result.state._originalInput = turnResult.originalInput; result.state._generatedItems = turnResult.generatedItems; - if (turnResult.nextStep.type === 'next_step_run_again') { - result.state._currentTurnPersistedItemCount = 0; - } + // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic + // Counter will be reset when _currentTurn is incremented (starting a new turn) result.state._currentStep = turnResult.nextStep; if (turnResult.nextStep.type === 'next_step_interruption') { // we are still in an interruption, so we need to avoid an infinite loop @@ -1035,8 +1045,15 @@ export class Runner extends RunHooks> { if (result.state._currentStep.type === 'next_step_run_again') { const artifacts = await prepareAgentArtifacts(result.state); + // Only increment turn and reset counter when starting a NEW turn + // If counter is non-zero, it means we're continuing from an interruption (counter was rewound) + // In that case, don't reset the counter - it's already been adjusted by the rewind logic result.state._currentTurn++; - result.state._currentTurnPersistedItemCount = 0; + if (result.state._currentTurnPersistedItemCount === 0) { + // Only reset if counter is already 0 (starting a new turn) + // If counter is non-zero, we're continuing from interruption and it was already adjusted + result.state._currentTurnPersistedItemCount = 0; + } if (result.state._currentTurn > result.state._maxTurns) { result.state._currentAgentSpan?.setError({ @@ -1208,10 +1225,15 @@ export class Runner extends RunHooks> { result.state._originalInput = turnResult.originalInput; result.state._generatedItems = turnResult.generatedItems; + // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic + // Counter will be reset when _currentTurn is incremented (starting a new turn) + result.state._currentStep = turnResult.nextStep; + + // If continuing from interruption with next_step_run_again, don't increment turn or reset counter + // We're continuing the same turn, not starting a new one if (turnResult.nextStep.type === 'next_step_run_again') { - result.state._currentTurnPersistedItemCount = 0; + continue; } - result.state._currentStep = turnResult.nextStep; } if (result.state._currentStep.type === 'next_step_final_output') { diff --git a/packages/agents-core/src/runImplementation.ts b/packages/agents-core/src/runImplementation.ts index 53c832ee..a7f5ffe3 100644 --- a/packages/agents-core/src/runImplementation.ts +++ b/packages/agents-core/src/runImplementation.ts @@ -687,6 +687,37 @@ export async function resolveInterruptedTurn( return false; }); + // Filter out handoffs that were already executed before the interruption. + // Handoffs that were already executed will have their call items in originalPreStepItems. + // We check by callId to avoid re-executing the same handoff call. + const executedHandoffCallIds = new Set(); + for (const item of originalPreStepItems) { + if (item instanceof RunHandoffCallItem && item.rawItem.callId) { + executedHandoffCallIds.add(item.rawItem.callId); + } + } + const pendingHandoffs = processedResponse.handoffs.filter((handoff) => { + const callId = handoff.toolCall?.callId; + // Only filter by callId - if callId matches, this handoff was already executed + return !callId || !executedHandoffCallIds.has(callId); + }); + + // If there are pending handoffs that haven't been executed yet, execute them now. + // Otherwise, if there were handoffs that were already executed, we need to make sure + // they don't get re-executed when we continue. + if (pendingHandoffs.length > 0) { + return await executeHandoffCalls( + agent, + originalInput, + preStepItems, + newItems, + newResponse, + pendingHandoffs, + runner, + state._context, + ); + } + const completedStep = await maybeCompleteTurnFromToolResults({ agent, runner, @@ -1803,6 +1834,14 @@ export async function executeHandoffCalls< runner: Runner, runContext: RunContext, ): Promise { + logger.debug( + `[executeHandoffCalls] Executing ${runHandoffs.length} handoff(s):`, + ); + for (const handoff of runHandoffs) { + logger.debug( + ` - ${handoff.toolCall.name} (callId: ${handoff.toolCall.callId})`, + ); + } newStepItems = [...newStepItems]; if (runHandoffs.length === 0) { From e058c31dc91f3aad768c8f38dc78fc61e1f82f9a Mon Sep 17 00:00:00 2001 From: Michael James Schock Date: Wed, 26 Nov 2025 16:32:36 -0800 Subject: [PATCH 2/4] refactor: remove debug logging from executeHandoffCalls function --- packages/agents-core/src/runImplementation.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/packages/agents-core/src/runImplementation.ts b/packages/agents-core/src/runImplementation.ts index a7f5ffe3..eb76a480 100644 --- a/packages/agents-core/src/runImplementation.ts +++ b/packages/agents-core/src/runImplementation.ts @@ -1834,14 +1834,6 @@ export async function executeHandoffCalls< runner: Runner, runContext: RunContext, ): Promise { - logger.debug( - `[executeHandoffCalls] Executing ${runHandoffs.length} handoff(s):`, - ); - for (const handoff of runHandoffs) { - logger.debug( - ` - ${handoff.toolCall.name} (callId: ${handoff.toolCall.callId})`, - ); - } newStepItems = [...newStepItems]; if (runHandoffs.length === 0) { From 152c8d169a81eda07ecd15edc3900d2dda8810c7 Mon Sep 17 00:00:00 2001 From: Michael James Schock Date: Wed, 3 Dec 2025 17:24:01 -0800 Subject: [PATCH 3/4] fix: reset to main branch behavior to verify bug test failure --- packages/agents-core/src/run.ts | 48 +++------- packages/agents-core/src/runImplementation.ts | 31 ------ packages/agents-core/test/run.test.ts | 95 +++++++++++++++++++ 3 files changed, 108 insertions(+), 66 deletions(-) diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 38ce127a..474f94c6 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -732,8 +732,9 @@ export class Runner extends RunHooks> { state._originalInput = turnResult.originalInput; state._generatedItems = turnResult.generatedItems; - // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic - // Counter will be reset when _currentTurn is incremented (starting a new turn) + if (turnResult.nextStep.type === 'next_step_run_again') { + state._currentTurnPersistedItemCount = 0; + } state._currentStep = turnResult.nextStep; if (turnResult.nextStep.type === 'next_step_interruption') { @@ -741,27 +742,14 @@ export class Runner extends RunHooks> { return new RunResult(state); } - // If continuing from interruption with next_step_run_again, continue the loop - // but DON'T increment turn or reset counter - we're continuing the same turn - if (turnResult.nextStep.type === 'next_step_run_again') { - continue; - } - continue; } if (state._currentStep.type === 'next_step_run_again') { const artifacts = await prepareAgentArtifacts(state); - // Only increment turn and reset counter when starting a NEW turn - // If counter is non-zero, it means we're continuing from an interruption (counter was rewound) - // In that case, don't reset the counter - it's already been adjusted by the rewind logic state._currentTurn++; - if (state._currentTurnPersistedItemCount === 0) { - // Only reset if counter is already 0 (starting a new turn) - // If counter is non-zero, we're continuing from interruption and it was already adjusted - state._currentTurnPersistedItemCount = 0; - } + state._currentTurnPersistedItemCount = 0; if (state._currentTurn > state._maxTurns) { state._currentAgentSpan?.setError({ @@ -879,8 +867,9 @@ export class Runner extends RunHooks> { state._originalInput = turnResult.originalInput; state._generatedItems = turnResult.generatedItems; - // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic - // Counter will be reset when _currentTurn is incremented (starting a new turn) + if (turnResult.nextStep.type === 'next_step_run_again') { + state._currentTurnPersistedItemCount = 0; + } state._currentStep = turnResult.nextStep; if (parallelGuardrailPromise) { @@ -1032,8 +1021,9 @@ export class Runner extends RunHooks> { result.state._originalInput = turnResult.originalInput; result.state._generatedItems = turnResult.generatedItems; - // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic - // Counter will be reset when _currentTurn is incremented (starting a new turn) + if (turnResult.nextStep.type === 'next_step_run_again') { + result.state._currentTurnPersistedItemCount = 0; + } result.state._currentStep = turnResult.nextStep; if (turnResult.nextStep.type === 'next_step_interruption') { // we are still in an interruption, so we need to avoid an infinite loop @@ -1045,15 +1035,8 @@ export class Runner extends RunHooks> { if (result.state._currentStep.type === 'next_step_run_again') { const artifacts = await prepareAgentArtifacts(result.state); - // Only increment turn and reset counter when starting a NEW turn - // If counter is non-zero, it means we're continuing from an interruption (counter was rewound) - // In that case, don't reset the counter - it's already been adjusted by the rewind logic result.state._currentTurn++; - if (result.state._currentTurnPersistedItemCount === 0) { - // Only reset if counter is already 0 (starting a new turn) - // If counter is non-zero, we're continuing from interruption and it was already adjusted - result.state._currentTurnPersistedItemCount = 0; - } + result.state._currentTurnPersistedItemCount = 0; if (result.state._currentTurn > result.state._maxTurns) { result.state._currentAgentSpan?.setError({ @@ -1225,15 +1208,10 @@ export class Runner extends RunHooks> { result.state._originalInput = turnResult.originalInput; result.state._generatedItems = turnResult.generatedItems; - // Don't reset counter here - it's already been adjusted by resolveInterruptedTurn's rewind logic - // Counter will be reset when _currentTurn is incremented (starting a new turn) - result.state._currentStep = turnResult.nextStep; - - // If continuing from interruption with next_step_run_again, don't increment turn or reset counter - // We're continuing the same turn, not starting a new one if (turnResult.nextStep.type === 'next_step_run_again') { - continue; + result.state._currentTurnPersistedItemCount = 0; } + result.state._currentStep = turnResult.nextStep; } if (result.state._currentStep.type === 'next_step_final_output') { diff --git a/packages/agents-core/src/runImplementation.ts b/packages/agents-core/src/runImplementation.ts index eb76a480..53c832ee 100644 --- a/packages/agents-core/src/runImplementation.ts +++ b/packages/agents-core/src/runImplementation.ts @@ -687,37 +687,6 @@ export async function resolveInterruptedTurn( return false; }); - // Filter out handoffs that were already executed before the interruption. - // Handoffs that were already executed will have their call items in originalPreStepItems. - // We check by callId to avoid re-executing the same handoff call. - const executedHandoffCallIds = new Set(); - for (const item of originalPreStepItems) { - if (item instanceof RunHandoffCallItem && item.rawItem.callId) { - executedHandoffCallIds.add(item.rawItem.callId); - } - } - const pendingHandoffs = processedResponse.handoffs.filter((handoff) => { - const callId = handoff.toolCall?.callId; - // Only filter by callId - if callId matches, this handoff was already executed - return !callId || !executedHandoffCallIds.has(callId); - }); - - // If there are pending handoffs that haven't been executed yet, execute them now. - // Otherwise, if there were handoffs that were already executed, we need to make sure - // they don't get re-executed when we continue. - if (pendingHandoffs.length > 0) { - return await executeHandoffCalls( - agent, - originalInput, - preStepItems, - newItems, - newResponse, - pendingHandoffs, - runner, - state._context, - ); - } - const completedStep = await maybeCompleteTurnFromToolResults({ agent, runner, diff --git a/packages/agents-core/test/run.test.ts b/packages/agents-core/test/run.test.ts index e1befe9b..b151a51b 100644 --- a/packages/agents-core/test/run.test.ts +++ b/packages/agents-core/test/run.test.ts @@ -1235,6 +1235,101 @@ describe('Runner.run', () => { expect(savedHostedCall.providerData).toEqual(hostedCall.providerData); expect(savedHostedCall.id).toBe('approval-1'); }); + + it('prevents duplicate function_call items when resuming from interruption after tool approval', async () => { + // Regression test for issue #701 + // + // Bug: When resuming a turn after approving a tool call, duplicate function_call items + // were saved to the session. The bug occurred because _currentTurnPersistedItemCount + // was incorrectly reset to 0 after resolveInterruptedTurn returned next_step_run_again, + // causing saveToSession to save all items from the beginning of the turn, including + // the already-persisted function_call item. + // + // Expected behavior: Only 1 function_call item should be saved to the session. + // Buggy behavior: 2 function_call items (duplicate) were saved. + // + // Test scenario: + // 1. Initial run with tool requiring approval creates an interruption + // 2. Tool call is approved + // 3. Run is resumed with the approved state + // 4. Session should contain exactly 1 function_call item (not 2) + + const getWeatherTool = tool({ + name: 'get_weather', + description: 'Get weather for a city', + parameters: z.object({ city: z.string() }), + needsApproval: async () => true, // Require approval for all calls + execute: async ({ city }) => `Sunny, 72°F in ${city}`, + }); + + const model = new FakeModel([ + // First response: tool call that requires approval + { + output: [ + { + type: 'function_call', + id: 'fc_1', + callId: 'call_weather_1', + name: 'get_weather', + status: 'completed', + arguments: JSON.stringify({ city: 'Oakland' }), + providerData: {}, + } as protocol.FunctionCallItem, + ], + usage: new Usage(), + }, + // Second response: after approval, final answer + { + output: [fakeModelMessage('The weather is sunny in Oakland.')], + usage: new Usage(), + }, + ]); + + const agent = new Agent({ + name: 'Assistant', + instructions: 'Use get_weather tool to answer weather questions.', + model, + tools: [getWeatherTool], + toolUseBehavior: 'run_llm_again', // Must use 'run_llm_again' so resolveInterruptedTurn returns next_step_run_again + }); + + const session = new MemorySession(); + + // Use sessionInputCallback to match the scenario from issue #701 + const sessionInputCallback = async ( + historyItems: AgentInputItem[], + newItems: AgentInputItem[], + ) => { + return [...historyItems, ...newItems]; + }; + + // Step 1: Initial run creates an interruption for tool approval + let result = await run( + agent, + [{ role: 'user', content: "What's the weather in Oakland?" }], + { session, sessionInputCallback }, + ); + + // Step 2: Approve the tool call + for (const interruption of result.interruptions || []) { + result.state.approve(interruption); + } + + // Step 3: Resume the run with the approved state + // Note: No sessionInputCallback on resume - this is part of the bug scenario + result = await run(agent, result.state, { session }); + + // Step 4: Verify only one function_call item exists in the session + const allItems = await session.getItems(); + const functionCalls = allItems.filter( + (item): item is protocol.FunctionCallItem => + item.type === 'function_call' && item.callId === 'call_weather_1', + ); + + // The bug would cause 2 function_call items to be saved (duplicate) + // The fix ensures only 1 function_call item is saved + expect(functionCalls).toHaveLength(1); + }); }); }); From 9a619ad7d6758e87b999c633ac7238bf75024741 Mon Sep 17 00:00:00 2001 From: Michael James Schock Date: Wed, 3 Dec 2025 17:35:13 -0800 Subject: [PATCH 4/4] fix: prevent duplicate function_call items in session history after resuming from interruptions --- packages/agents-core/src/run.ts | 61 ++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 474f94c6..078614b4 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -732,24 +732,35 @@ export class Runner extends RunHooks> { state._originalInput = turnResult.originalInput; state._generatedItems = turnResult.generatedItems; - if (turnResult.nextStep.type === 'next_step_run_again') { - state._currentTurnPersistedItemCount = 0; - } - state._currentStep = turnResult.nextStep; + // Don't reset counter here - resolveInterruptedTurn already adjusted it via rewind logic + // The counter will be reset when _currentTurn is incremented (starting a new turn) if (turnResult.nextStep.type === 'next_step_interruption') { // we are still in an interruption, so we need to avoid an infinite loop + state._currentStep = turnResult.nextStep; return new RunResult(state); } - continue; + // If continuing from interruption with next_step_run_again, set step to undefined + // so the loop treats it as a new step without incrementing the turn. + // The counter has already been adjusted by resolveInterruptedTurn's rewind logic. + if (turnResult.nextStep.type === 'next_step_run_again') { + state._currentStep = undefined; + continue; + } + + state._currentStep = turnResult.nextStep; } if (state._currentStep.type === 'next_step_run_again') { const artifacts = await prepareAgentArtifacts(state); - state._currentTurn++; - state._currentTurnPersistedItemCount = 0; + // Only increment turn and reset counter if we're starting a new turn, + // not if we're continuing from an interruption (which would have _lastTurnResponse set) + if (!state._lastTurnResponse) { + state._currentTurn++; + state._currentTurnPersistedItemCount = 0; + } if (state._currentTurn > state._maxTurns) { state._currentAgentSpan?.setError({ @@ -770,7 +781,8 @@ export class Runner extends RunHooks> { let parallelGuardrailPromise: | Promise | undefined; - if (state._currentTurn === 1) { + // Only run input guardrails on the first turn of a new run, not when continuing from an interruption + if (state._currentTurn === 1 && !state._lastTurnResponse) { const guardrails = this.#splitInputGuardrails(state); if (guardrails.blocking.length > 0) { await this.#runInputGuardrails(state, guardrails.blocking); @@ -1021,22 +1033,35 @@ export class Runner extends RunHooks> { result.state._originalInput = turnResult.originalInput; result.state._generatedItems = turnResult.generatedItems; - if (turnResult.nextStep.type === 'next_step_run_again') { - result.state._currentTurnPersistedItemCount = 0; - } - result.state._currentStep = turnResult.nextStep; + // Don't reset counter here - resolveInterruptedTurn already adjusted it via rewind logic + // The counter will be reset when _currentTurn is incremented (starting a new turn) + if (turnResult.nextStep.type === 'next_step_interruption') { // we are still in an interruption, so we need to avoid an infinite loop + result.state._currentStep = turnResult.nextStep; return; } - continue; + + // If continuing from interruption with next_step_run_again, set step to undefined + // so the loop treats it as a new step without incrementing the turn. + // The counter has already been adjusted by resolveInterruptedTurn's rewind logic. + if (turnResult.nextStep.type === 'next_step_run_again') { + result.state._currentStep = undefined; + continue; + } + + result.state._currentStep = turnResult.nextStep; } if (result.state._currentStep.type === 'next_step_run_again') { const artifacts = await prepareAgentArtifacts(result.state); - result.state._currentTurn++; - result.state._currentTurnPersistedItemCount = 0; + // Only increment turn and reset counter if we're starting a new turn, + // not if we're continuing from an interruption (which would have _lastTurnResponse set) + if (!result.state._lastTurnResponse) { + result.state._currentTurn++; + result.state._currentTurnPersistedItemCount = 0; + } if (result.state._currentTurn > result.state._maxTurns) { result.state._currentAgentSpan?.setError({ @@ -1057,7 +1082,11 @@ export class Runner extends RunHooks> { let parallelGuardrailPromise: | Promise | undefined; - if (result.state._currentTurn === 1) { + // Only run input guardrails on the first turn of a new run, not when continuing from an interruption + if ( + result.state._currentTurn === 1 && + !result.state._lastTurnResponse + ) { const guardrails = this.#splitInputGuardrails(result.state); if (guardrails.blocking.length > 0) { await this.#runInputGuardrails(result.state, guardrails.blocking);