
Commit 829ec28

fix: resolve race conditions in stream error handling and test flakiness
- Make debugTriggerStreamError async and await processingPromise to ensure error handling completes before returning (eliminates duplicate logic)
- Wait for in-flight partial writes before writing error state to prevent inconsistent partial.json
- Pass toolPolicy on resume to match original message (tools disabled)
- Add delay after resume to let history update complete before reading
1 parent 8f162fb commit 829ec28

4 files changed: +36 -46 lines changed


src/node/services/aiService.ts

Lines changed: 1 addition & 1 deletion
@@ -1270,7 +1270,7 @@ export class AIService extends EventEmitter {
   debugTriggerStreamError(
     workspaceId: string,
     errorMessage = "Test-triggered stream error"
-  ): boolean {
+  ): Promise<boolean> {
     return this.streamManager.debugTriggerStreamError(workspaceId, errorMessage);
   }
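
Because StreamManager.debugTriggerStreamError is now async, this wrapper simply forwards the Promise (no async keyword needed) and its return type becomes Promise<boolean>. Callers that previously treated the result as a plain boolean must now await it. A minimal sketch of a caller, assuming an aiService instance is in scope (the variable name is illustrative):

// Awaiting ensures the stream's error handling has fully completed
// before the caller proceeds, e.g. before resuming the stream in a test.
const triggered: boolean = await aiService.debugTriggerStreamError(
  workspaceId,
  "Simulated network error"
);
if (!triggered) {
  throw new Error("No active stream found for workspace");
}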

src/node/services/streamManager.ts

Lines changed: 20 additions & 39 deletions
@@ -1177,8 +1177,14 @@ export class StreamManager extends EventEmitter {
         },
         parts: streamInfo.parts,
       };
-      // Write error state to disk (fire-and-forget to not block error emission)
-      void this.partialService.writePartial(workspaceId as string, errorPartialMessage);
+      // Wait for any in-flight partial write to complete before writing error state
+      // This prevents race conditions where the error write and a throttled flush
+      // write at the same time, causing inconsistent partial.json state
+      if (streamInfo.partialWritePromise) {
+        await streamInfo.partialWritePromise;
+      }
+      // Write error state to disk - await to ensure consistent state before any resume
+      await this.partialService.writePartial(workspaceId as string, errorPartialMessage);

       // Emit error event
       this.emit("error", {
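
The hunk above assumes the throttled flush path records its in-flight write on streamInfo.partialWritePromise so the error path can drain it first. A minimal, self-contained sketch of that bookkeeping with hypothetical names (only partialWritePromise appears in this commit; the tracker and helpers below are illustrative):

// Track the latest in-flight partial write so a competing writer can await it.
interface PartialWriteTracker {
  partialWritePromise?: Promise<void>;
}

function trackPartialWrite(info: PartialWriteTracker, write: Promise<void>): Promise<void> {
  const tracked = write.finally(() => {
    // Clear only if no newer write has replaced this one in the meantime.
    if (info.partialWritePromise === tracked) {
      info.partialWritePromise = undefined;
    }
  });
  info.partialWritePromise = tracked;
  return tracked;
}

// Competing writer (the error path): drain the in-flight write, then write error state.
async function writeErrorStateAfterFlush(
  info: PartialWriteTracker,
  writeErrorState: () => Promise<void>
): Promise<void> {
  if (info.partialWritePromise) {
    await info.partialWritePromise;
  }
  await writeErrorState();
}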
@@ -1683,11 +1689,12 @@
   /**
    * DEBUG ONLY: Trigger an artificial stream error for testing
    * This method allows integration tests to simulate stream errors without
-   * mocking the AI SDK or network layer. It triggers the same error handling
-   * path as genuine stream errors by aborting the stream and manually triggering
-   * the error event (since abort alone doesn't throw, it just sets a flag).
+   * mocking the AI SDK or network layer. It aborts the stream with an error,
+   * which causes the for-await loop to throw. The catch block in
+   * processStreamWithCleanup handles writing partial.json and emitting the
+   * error event - we just need to abort and wait.
    */
-  debugTriggerStreamError(workspaceId: string, errorMessage: string): boolean {
+  async debugTriggerStreamError(workspaceId: string, errorMessage: string): Promise<boolean> {
     const typedWorkspaceId = workspaceId as WorkspaceId;
     const streamInfo = this.workspaceStreams.get(typedWorkspaceId);

@@ -1699,41 +1706,15 @@
       return false;
     }

-    // Abort the stream first
+    // Abort the stream with an error - this causes the for-await loop to throw
+    // when it's blocked waiting for the next iteration. The catch block in
+    // processStreamWithCleanup will handle:
+    // 1. Writing error state to partial.json (with all accumulated parts)
+    // 2. Emitting the error event
     streamInfo.abortController.abort(new Error(errorMessage));

-    // Update streamInfo metadata with error (so subsequent flushes preserve it)
-    streamInfo.initialMetadata = {
-      ...streamInfo.initialMetadata,
-      error: errorMessage,
-      errorType: "network",
-    };
-
-    // Write error state to partial.json (same as real error handling)
-    const errorPartialMessage: MuxMessage = {
-      id: streamInfo.messageId,
-      role: "assistant",
-      metadata: {
-        historySequence: streamInfo.historySequence,
-        timestamp: streamInfo.startTime,
-        model: streamInfo.model,
-        partial: true,
-        error: errorMessage,
-        errorType: "network", // Test errors are network-like
-        ...streamInfo.initialMetadata,
-      },
-      parts: streamInfo.parts,
-    };
-    void this.partialService.writePartial(workspaceId, errorPartialMessage);
-
-    // Emit error event (same as real error handling)
-    this.emit("error", {
-      type: "error",
-      workspaceId,
-      messageId: streamInfo.messageId,
-      error: errorMessage,
-      errorType: "network",
-    } as ErrorEvent);
+    // Wait for the stream processing to complete (includes error handling)
+    await streamInfo.processingPromise;

     return true;
   }
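
The rewritten debug trigger relies on two behaviors: aborting with a reason makes the awaited stream iteration reject with that reason, and awaiting processingPromise guarantees the catch-block cleanup has finished before the method returns. A self-contained sketch of that pattern (not the real StreamManager code; the fake stream and function names below are illustrative):

// Fake stream: waits for the next chunk, rejecting with the abort reason if aborted.
async function* fakeStream(signal: AbortSignal): AsyncGenerator<string> {
  for (let i = 0; ; i++) {
    await new Promise<void>((resolve, reject) => {
      const timer = setTimeout(resolve, 100);
      signal.addEventListener(
        "abort",
        () => {
          clearTimeout(timer);
          reject(signal.reason);
        },
        { once: true }
      );
    });
    yield `chunk-${i}`;
  }
}

// Consumer: the for-await loop throws when the stream is aborted; the catch block
// is where the real code writes partial.json and emits the error event.
async function processStream(signal: AbortSignal): Promise<void> {
  try {
    for await (const chunk of fakeStream(signal)) {
      console.log("received", chunk);
    }
  } catch (err) {
    console.log("stream failed:", (err as Error).message);
  }
}

// Debug trigger: abort with an Error, then wait for processing (and its cleanup) to finish.
async function debugTrigger(controller: AbortController, processing: Promise<void>): Promise<void> {
  controller.abort(new Error("Test-triggered stream error"));
  await processing;
}

// Usage: start processing, then trigger the error a moment later.
const controller = new AbortController();
const processing = processStream(controller.signal);
setTimeout(() => void debugTrigger(controller, processing), 250);

Because the trigger awaits the same promise whose catch block persists the error state, callers observe a consistent partial.json once it resolves.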

src/node/services/workspaceService.ts

Lines changed: 1 addition & 1 deletion
@@ -108,7 +108,7 @@ export class WorkspaceService extends EventEmitter {
    * This is used by integration tests to simulate network errors mid-stream.
    * @returns true if an active stream was found and error was triggered
    */
-  debugTriggerStreamError(workspaceId: string, errorMessage?: string): boolean {
+  debugTriggerStreamError(workspaceId: string, errorMessage?: string): Promise<boolean> {
     return this.aiService.debugTriggerStreamError(workspaceId, errorMessage);
   }
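
With the Promise<boolean> return type, a test can also assert the documented contract (true only when an active stream was found) after awaiting. An illustrative sketch, assuming an in-process workspaceService handle and a Jest/Vitest-style expect (neither is shown in this commit):

// A false result means the stream had already finished (or never started),
// which would make the error-recovery scenario meaningless.
const triggered = await workspaceService.debugTriggerStreamError(workspaceId);
expect(triggered).toBe(true);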

tests/integration/streamErrorRecovery.test.ts

Lines changed: 14 additions & 5 deletions
@@ -87,27 +87,28 @@ function truncateToLastCompleteMarker(text: string, nonce: string): string {
   return text.substring(0, endIndex);
 }

-import type { OrpcSource } from "./helpers";
 import type { OrpcTestClient } from "./orpcTestClient";

 /**
  * Helper: Resume stream and wait for successful completion
  * Uses StreamCollector for ORPC-native event handling
  */
 async function resumeAndWaitForSuccess(
-  source: OrpcSource,
   workspaceId: string,
   client: OrpcTestClient,
   model: string,
-  timeoutMs = 15000
+  timeoutMs = 15000,
+  options?: {
+    toolPolicy?: Array<{ regex_match: string; action: "enable" | "disable" | "require" }>;
+  }
 ): Promise<void> {
   const collector = createStreamCollector(client, workspaceId);
   collector.start();

   try {
     const resumeResult = await client.workspace.resumeStream({
       workspaceId,
-      options: { model },
+      options: { model, toolPolicy: options?.toolPolicy },
     });

     if (!resumeResult.success) {

@@ -261,11 +262,19 @@ IMPORTANT: Do not add any other text. Start immediately with ${nonce}-1: one. If
     await new Promise((resolve) => setTimeout(resolve, 500));

     // Resume and wait for completion
-    await resumeAndWaitForSuccess(env, workspaceId, client, `${PROVIDER}:${MODEL}`);
+    // Disable all tools (same as original message) so model outputs text, not tool calls
+    await resumeAndWaitForSuccess(workspaceId, client, `${PROVIDER}:${MODEL}`, 15000, {
+      toolPolicy: [{ regex_match: ".*", action: "disable" }],
+    });
+
+    // Small delay to let history update complete after stream-end
+    // stream-end is emitted before updateHistory completes
+    await new Promise((resolve) => setTimeout(resolve, 500));

     // Read final assistant message from history
     const history = await readChatHistory(env.tempDir, workspaceId);
     const assistantMessages = history.filter((m) => m.role === "assistant");
+
     const finalText = assistantMessages
       .flatMap((m) => m.parts)
       .filter((p) => p.type === "text")
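
The fixed 500 ms sleep works around stream-end being emitted before updateHistory completes, but a polling helper can make the test less timing-sensitive. A sketch reusing the readChatHistory helper already used above (the waitForAssistantText function below is hypothetical and not part of this commit):

// Poll history until an assistant message with text parts appears, or time out.
async function waitForAssistantText(
  tempDir: string,
  workspaceId: string,
  timeoutMs = 5000,
  pollMs = 100
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const history = await readChatHistory(tempDir, workspaceId);
    const done = history.some(
      (m) => m.role === "assistant" && m.parts.some((p) => p.type === "text")
    );
    if (done) return;
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
  throw new Error("Timed out waiting for assistant message in history");
}

It could stand in for the second fixed delay above, e.g. await waitForAssistantText(env.tempDir, workspaceId).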
