From 50675c3f23a399a3be5ca3493e606206d2e0c1e0 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 3 Dec 2025 14:40:45 +0100 Subject: [PATCH] fix: resolve race conditions in stream error handling and test flakiness - Make debugTriggerStreamError async: - Await in-flight partial writes before writing error state - Await processingPromise to ensure stream cleanup completes - Keep manual error handling (abort alone causes clean break, not throw) - Fix race condition in error catch block: await partial writes before writing error state to prevent inconsistent partial.json - Test fixes: - Pass toolPolicy on resume to match original message (tools disabled) - Add delay after resume to let history update complete --- src/node/services/aiService.ts | 2 +- src/node/services/streamManager.ts | 29 +++++++++++++++---- src/node/services/workspaceService.ts | 2 +- tests/integration/streamErrorRecovery.test.ts | 19 ++++++++---- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts index d7c00b6da..2b3b3dceb 100644 --- a/src/node/services/aiService.ts +++ b/src/node/services/aiService.ts @@ -1270,7 +1270,7 @@ export class AIService extends EventEmitter { debugTriggerStreamError( workspaceId: string, errorMessage = "Test-triggered stream error" - ): boolean { + ): Promise<boolean> { return this.streamManager.debugTriggerStreamError(workspaceId, errorMessage); } diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index d0f7f6f09..61f80eb41 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -1177,8 +1177,14 @@ export class StreamManager extends EventEmitter { }, parts: streamInfo.parts, }; - // Write error state to disk (fire-and-forget to not block error emission) - void this.partialService.writePartial(workspaceId as string, errorPartialMessage); + // Wait for any in-flight partial write to complete before writing error state + // This 
prevents race conditions where the error write and a throttled flush + // write at the same time, causing inconsistent partial.json state + if (streamInfo.partialWritePromise) { + await streamInfo.partialWritePromise; + } + // Write error state to disk - await to ensure consistent state before any resume + await this.partialService.writePartial(workspaceId as string, errorPartialMessage); // Emit error event this.emit("error", { @@ -1685,9 +1691,10 @@ export class StreamManager extends EventEmitter { * This method allows integration tests to simulate stream errors without * mocking the AI SDK or network layer. It triggers the same error handling * path as genuine stream errors by aborting the stream and manually triggering - * the error event (since abort alone doesn't throw, it just sets a flag). + * the error event (since abort alone doesn't throw, it just sets a flag that + * causes the for-await loop to break cleanly). */ - debugTriggerStreamError(workspaceId: string, errorMessage: string): boolean { + async debugTriggerStreamError(workspaceId: string, errorMessage: string): Promise<boolean> { const typedWorkspaceId = workspaceId as WorkspaceId; const streamInfo = this.workspaceStreams.get(typedWorkspaceId); @@ -1699,9 +1706,12 @@ return false; } - // Abort the stream first + // Abort the stream first (causes for-await loop to break cleanly) streamInfo.abortController.abort(new Error(errorMessage)); + // Mark as error state (same as catch block does) + streamInfo.state = StreamState.ERROR; + // Update streamInfo metadata with error (so subsequent flushes preserve it) streamInfo.initialMetadata = { ...streamInfo.initialMetadata, }; // Write error state to partial.json (same as real error handling) + // Wait for any in-flight partial write to complete first + if (streamInfo.partialWritePromise) { + await streamInfo.partialWritePromise; + } const 
errorPartialMessage: MuxMessage = { id: streamInfo.messageId, role: "assistant", }, parts: streamInfo.parts, }; - void this.partialService.writePartial(workspaceId, errorPartialMessage); + await this.partialService.writePartial(workspaceId, errorPartialMessage); // Emit error event (same as real error handling) this.emit("error", { errorType: "network", } as ErrorEvent); + // Wait for the stream processing to complete (cleanup) + await streamInfo.processingPromise; + return true; } } diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index e47b3e59a..48616be6c 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -108,7 +108,7 @@ export class WorkspaceService extends EventEmitter { * This is used by integration tests to simulate network errors mid-stream. * @returns true if an active stream was found and error was triggered */ - debugTriggerStreamError(workspaceId: string, errorMessage?: string): boolean { + debugTriggerStreamError(workspaceId: string, errorMessage?: string): Promise<boolean> { return this.aiService.debugTriggerStreamError(workspaceId, errorMessage); } diff --git a/tests/integration/streamErrorRecovery.test.ts b/tests/integration/streamErrorRecovery.test.ts index f09d6231b..7e80864d0 100644 --- a/tests/integration/streamErrorRecovery.test.ts +++ b/tests/integration/streamErrorRecovery.test.ts @@ -87,7 +87,6 @@ function truncateToLastCompleteMarker(text: string, nonce: string): string { return text.substring(0, endIndex); } -import type { OrpcSource } from "./helpers"; import type { OrpcTestClient } from "./orpcTestClient"; /** @@ -95,11 +94,13 @@ import type { OrpcTestClient } from "./orpcTestClient"; * Uses StreamCollector for ORPC-native event handling */ async function resumeAndWaitForSuccess( - source: OrpcSource, workspaceId: 
string, client: OrpcTestClient, model: string, - timeoutMs = 15000 + timeoutMs = 15000, + options?: { + toolPolicy?: Array<{ regex_match: string; action: "enable" | "disable" | "require" }>; + } ): Promise<void> { const collector = createStreamCollector(client, workspaceId); collector.start(); @@ -107,7 +108,7 @@ try { const resumeResult = await client.workspace.resumeStream({ workspaceId, - options: { model }, + options: { model, toolPolicy: options?.toolPolicy }, }); if (!resumeResult.success) { @@ -261,11 +262,19 @@ IMPORTANT: Do not add any other text. Start immediately with ${nonce}-1: one. If await new Promise((resolve) => setTimeout(resolve, 500)); // Resume and wait for completion - await resumeAndWaitForSuccess(env, workspaceId, client, `${PROVIDER}:${MODEL}`); + // Disable all tools (same as original message) so model outputs text, not tool calls + await resumeAndWaitForSuccess(workspaceId, client, `${PROVIDER}:${MODEL}`, 15000, { + toolPolicy: [{ regex_match: ".*", action: "disable" }], + }); + + // Small delay to let history update complete after stream-end + // stream-end is emitted before updateHistory completes + await new Promise((resolve) => setTimeout(resolve, 500)); // Read final assistant message from history const history = await readChatHistory(env.tempDir, workspaceId); const assistantMessages = history.filter((m) => m.role === "assistant"); + const finalText = assistantMessages .flatMap((m) => m.parts) .filter((p) => p.type === "text")