Commit f4c4d3c
🤖 fix: resolve race conditions in stream error handling and test flakiness (#880)
## Summary

- Make `debugTriggerStreamError` async and await `processingPromise` to ensure error handling completes before returning (eliminates duplicate error-handling logic)
- Wait for in-flight partial writes before writing the error state to prevent an inconsistent `partial.json`
- Pass `toolPolicy` on resume to match the original message (tools disabled → text output)
- Add a delay after resume to let the history update complete before reading

_Generated with `mux`_
1 parent 1974be6 commit f4c4d3c
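
Taken together, the changes below enforce one ordering on the simulated-error path: abort the stream, drain any in-flight partial flush, write and await the error partial, then wait for the processing loop to finish. A minimal sketch of that sequencing, not the actual `StreamManager` code — the `StreamInfoLike` shape and the `writeErrorPartial` callback are simplified assumptions, only the field names `partialWritePromise` and `processingPromise` come from the diffs, and the error-event emission is omitted:

```typescript
// Simplified sketch of the sequencing this commit enforces; the types here are
// assumptions, only the ordering mirrors the diff below.
interface StreamInfoLike {
  abortController: AbortController;
  partialWritePromise?: Promise<void>; // in-flight throttled flush, if any
  processingPromise: Promise<void>;    // the stream's for-await processing loop
}

async function triggerErrorAndSettle(
  streamInfo: StreamInfoLike,
  writeErrorPartial: () => Promise<void>,
  errorMessage: string
): Promise<void> {
  // 1. Abort: the for-await loop breaks cleanly instead of throwing.
  streamInfo.abortController.abort(new Error(errorMessage));

  // 2. Drain any in-flight throttled flush so it cannot race the error write.
  if (streamInfo.partialWritePromise) {
    await streamInfo.partialWritePromise;
  }

  // 3. Write the error state and await it, so partial.json is consistent
  //    before any resume reads it.
  await writeErrorPartial();

  // 4. Wait for the processing loop's cleanup before returning to the caller.
  await streamInfo.processingPromise;
}
```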

File tree (4 files changed, +39 / -13 lines):

- src/node/services/aiService.ts
- src/node/services/streamManager.ts
- src/node/services/workspaceService.ts
- tests/integration/streamErrorRecovery.test.ts

src/node/services/aiService.ts

Lines changed: 1 addition & 1 deletion

@@ -1270,7 +1270,7 @@ export class AIService extends EventEmitter {
   debugTriggerStreamError(
     workspaceId: string,
     errorMessage = "Test-triggered stream error"
-  ): boolean {
+  ): Promise<boolean> {
     return this.streamManager.debugTriggerStreamError(workspaceId, errorMessage);
   }

src/node/services/streamManager.ts

Lines changed: 23 additions & 6 deletions

@@ -1177,8 +1177,14 @@ export class StreamManager extends EventEmitter {
       },
       parts: streamInfo.parts,
     };
-    // Write error state to disk (fire-and-forget to not block error emission)
-    void this.partialService.writePartial(workspaceId as string, errorPartialMessage);
+    // Wait for any in-flight partial write to complete before writing error state
+    // This prevents race conditions where the error write and a throttled flush
+    // write at the same time, causing inconsistent partial.json state
+    if (streamInfo.partialWritePromise) {
+      await streamInfo.partialWritePromise;
+    }
+    // Write error state to disk - await to ensure consistent state before any resume
+    await this.partialService.writePartial(workspaceId as string, errorPartialMessage);

     // Emit error event
     this.emit("error", {
@@ -1685,9 +1691,10 @@
   * This method allows integration tests to simulate stream errors without
   * mocking the AI SDK or network layer. It triggers the same error handling
   * path as genuine stream errors by aborting the stream and manually triggering
-  * the error event (since abort alone doesn't throw, it just sets a flag).
+  * the error event (since abort alone doesn't throw, it just sets a flag that
+  * causes the for-await loop to break cleanly).
   */
-  debugTriggerStreamError(workspaceId: string, errorMessage: string): boolean {
+  async debugTriggerStreamError(workspaceId: string, errorMessage: string): Promise<boolean> {
     const typedWorkspaceId = workspaceId as WorkspaceId;
     const streamInfo = this.workspaceStreams.get(typedWorkspaceId);
@@ -1699,9 +1706,12 @@
       return false;
     }

-    // Abort the stream first
+    // Abort the stream first (causes for-await loop to break cleanly)
     streamInfo.abortController.abort(new Error(errorMessage));

+    // Mark as error state (same as catch block does)
+    streamInfo.state = StreamState.ERROR;
+
     // Update streamInfo metadata with error (so subsequent flushes preserve it)
     streamInfo.initialMetadata = {
       ...streamInfo.initialMetadata,
@@ -1710,6 +1720,10 @@
     };

     // Write error state to partial.json (same as real error handling)
+    // Wait for any in-flight partial write to complete first
+    if (streamInfo.partialWritePromise) {
+      await streamInfo.partialWritePromise;
+    }
     const errorPartialMessage: MuxMessage = {
       id: streamInfo.messageId,
       role: "assistant",
@@ -1724,7 +1738,7 @@
       },
       parts: streamInfo.parts,
     };
-    void this.partialService.writePartial(workspaceId, errorPartialMessage);
+    await this.partialService.writePartial(workspaceId, errorPartialMessage);

     // Emit error event (same as real error handling)
     this.emit("error", {
@@ -1735,6 +1749,9 @@
       errorType: "network",
     } as ErrorEvent);

+    // Wait for the stream processing to complete (cleanup)
+    await streamInfo.processingPromise;
+
     return true;
   }
 }

src/node/services/workspaceService.ts

Lines changed: 1 addition & 1 deletion

@@ -108,7 +108,7 @@ export class WorkspaceService extends EventEmitter {
   * This is used by integration tests to simulate network errors mid-stream.
   * @returns true if an active stream was found and error was triggered
   */
-  debugTriggerStreamError(workspaceId: string, errorMessage?: string): boolean {
+  debugTriggerStreamError(workspaceId: string, errorMessage?: string): Promise<boolean> {
     return this.aiService.debugTriggerStreamError(workspaceId, errorMessage);
   }
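
Since `debugTriggerStreamError` now returns `Promise<boolean>` all the way up through `WorkspaceService`, test callers have to await it before resuming the stream. A hypothetical, self-contained usage sketch — the structural `workspaceService` parameter stands in for the real service and is an assumption, not part of this commit:

```typescript
// Hypothetical test-side helper: awaiting the trigger guarantees partial.json
// already holds the error state and the processing loop has finished before
// the caller goes on to resume the stream.
async function simulateStreamError(
  workspaceService: {
    debugTriggerStreamError(id: string, msg?: string): Promise<boolean>;
  },
  workspaceId: string
): Promise<void> {
  const triggered = await workspaceService.debugTriggerStreamError(
    workspaceId,
    "Test-triggered stream error"
  );
  if (!triggered) {
    // false means no active stream was found for this workspace
    throw new Error(`No active stream found for workspace ${workspaceId}`);
  }
}
```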

tests/integration/streamErrorRecovery.test.ts

Lines changed: 14 additions & 5 deletions

@@ -87,27 +87,28 @@ function truncateToLastCompleteMarker(text: string, nonce: string): string {
   return text.substring(0, endIndex);
 }

-import type { OrpcSource } from "./helpers";
 import type { OrpcTestClient } from "./orpcTestClient";

 /**
  * Helper: Resume stream and wait for successful completion
  * Uses StreamCollector for ORPC-native event handling
  */
 async function resumeAndWaitForSuccess(
-  source: OrpcSource,
   workspaceId: string,
   client: OrpcTestClient,
   model: string,
-  timeoutMs = 15000
+  timeoutMs = 15000,
+  options?: {
+    toolPolicy?: Array<{ regex_match: string; action: "enable" | "disable" | "require" }>;
+  }
 ): Promise<void> {
   const collector = createStreamCollector(client, workspaceId);
   collector.start();

   try {
     const resumeResult = await client.workspace.resumeStream({
       workspaceId,
-      options: { model },
+      options: { model, toolPolicy: options?.toolPolicy },
     });

     if (!resumeResult.success) {
@@ -261,11 +262,19 @@ IMPORTANT: Do not add any other text. Start immediately with ${nonce}-1: one. If
     await new Promise((resolve) => setTimeout(resolve, 500));

     // Resume and wait for completion
-    await resumeAndWaitForSuccess(env, workspaceId, client, `${PROVIDER}:${MODEL}`);
+    // Disable all tools (same as original message) so model outputs text, not tool calls
+    await resumeAndWaitForSuccess(workspaceId, client, `${PROVIDER}:${MODEL}`, 15000, {
+      toolPolicy: [{ regex_match: ".*", action: "disable" }],
+    });
+
+    // Small delay to let history update complete after stream-end
+    // stream-end is emitted before updateHistory completes
+    await new Promise((resolve) => setTimeout(resolve, 500));

     // Read final assistant message from history
     const history = await readChatHistory(env.tempDir, workspaceId);
     const assistantMessages = history.filter((m) => m.role === "assistant");
+
     const finalText = assistantMessages
       .flatMap((m) => m.parts)
       .filter((p) => p.type === "text")
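
Because the helper's timeout and new `options` parameter both have defaults, existing call sites can stay unchanged while the recovery test opts into a restrictive policy. A hedged illustration of the two call shapes, assuming the test file's existing `workspaceId`, `client`, `PROVIDER`, and `MODEL` bindings:

```typescript
// Default timeout, no tool policy override — older call sites keep working.
await resumeAndWaitForSuccess(workspaceId, client, `${PROVIDER}:${MODEL}`);

// Explicit timeout plus a policy disabling every tool, as the recovery test now does,
// so the resumed stream produces text output rather than tool calls.
await resumeAndWaitForSuccess(workspaceId, client, `${PROVIDER}:${MODEL}`, 15000, {
  toolPolicy: [{ regex_match: ".*", action: "disable" }],
});
```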
