Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/node/services/aiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ export class AIService extends EventEmitter {
debugTriggerStreamError(
workspaceId: string,
errorMessage = "Test-triggered stream error"
): boolean {
): Promise<boolean> {
return this.streamManager.debugTriggerStreamError(workspaceId, errorMessage);
}

Expand Down
29 changes: 23 additions & 6 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1177,8 +1177,14 @@ export class StreamManager extends EventEmitter {
},
parts: streamInfo.parts,
};
// Write error state to disk (fire-and-forget to not block error emission)
void this.partialService.writePartial(workspaceId as string, errorPartialMessage);
// Wait for any in-flight partial write to complete before writing error state
// This prevents race conditions where the error write and a throttled flush
// write at the same time, causing inconsistent partial.json state
if (streamInfo.partialWritePromise) {
await streamInfo.partialWritePromise;
}
// Write error state to disk - await to ensure consistent state before any resume
await this.partialService.writePartial(workspaceId as string, errorPartialMessage);

// Emit error event
this.emit("error", {
Expand Down Expand Up @@ -1685,9 +1691,10 @@ export class StreamManager extends EventEmitter {
* This method allows integration tests to simulate stream errors without
* mocking the AI SDK or network layer. It triggers the same error handling
* path as genuine stream errors by aborting the stream and manually triggering
* the error event (since abort alone doesn't throw, it just sets a flag).
* the error event (since abort alone doesn't throw, it just sets a flag that
* causes the for-await loop to break cleanly).
*/
debugTriggerStreamError(workspaceId: string, errorMessage: string): boolean {
async debugTriggerStreamError(workspaceId: string, errorMessage: string): Promise<boolean> {
const typedWorkspaceId = workspaceId as WorkspaceId;
const streamInfo = this.workspaceStreams.get(typedWorkspaceId);

Expand All @@ -1699,9 +1706,12 @@ export class StreamManager extends EventEmitter {
return false;
}

// Abort the stream first
// Abort the stream first (causes for-await loop to break cleanly)
streamInfo.abortController.abort(new Error(errorMessage));

// Mark as error state (same as catch block does)
streamInfo.state = StreamState.ERROR;

// Update streamInfo metadata with error (so subsequent flushes preserve it)
streamInfo.initialMetadata = {
...streamInfo.initialMetadata,
Expand All @@ -1710,6 +1720,10 @@ export class StreamManager extends EventEmitter {
};

// Write error state to partial.json (same as real error handling)
// Wait for any in-flight partial write to complete first
if (streamInfo.partialWritePromise) {
await streamInfo.partialWritePromise;
}
const errorPartialMessage: MuxMessage = {
id: streamInfo.messageId,
role: "assistant",
Expand All @@ -1724,7 +1738,7 @@ export class StreamManager extends EventEmitter {
},
parts: streamInfo.parts,
};
void this.partialService.writePartial(workspaceId, errorPartialMessage);
await this.partialService.writePartial(workspaceId, errorPartialMessage);

// Emit error event (same as real error handling)
this.emit("error", {
Expand All @@ -1735,6 +1749,9 @@ export class StreamManager extends EventEmitter {
errorType: "network",
} as ErrorEvent);

// Wait for the stream processing to complete (cleanup)
await streamInfo.processingPromise;

return true;
}
}
2 changes: 1 addition & 1 deletion src/node/services/workspaceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class WorkspaceService extends EventEmitter {
* This is used by integration tests to simulate network errors mid-stream.
* @returns true if an active stream was found and error was triggered
*/
debugTriggerStreamError(workspaceId: string, errorMessage?: string): boolean {
debugTriggerStreamError(workspaceId: string, errorMessage?: string): Promise<boolean> {
return this.aiService.debugTriggerStreamError(workspaceId, errorMessage);
}

Expand Down
19 changes: 14 additions & 5 deletions tests/integration/streamErrorRecovery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,28 @@ function truncateToLastCompleteMarker(text: string, nonce: string): string {
return text.substring(0, endIndex);
}

import type { OrpcSource } from "./helpers";
import type { OrpcTestClient } from "./orpcTestClient";

/**
* Helper: Resume stream and wait for successful completion
* Uses StreamCollector for ORPC-native event handling
*/
async function resumeAndWaitForSuccess(
source: OrpcSource,
workspaceId: string,
client: OrpcTestClient,
model: string,
timeoutMs = 15000
timeoutMs = 15000,
options?: {
toolPolicy?: Array<{ regex_match: string; action: "enable" | "disable" | "require" }>;
}
): Promise<void> {
const collector = createStreamCollector(client, workspaceId);
collector.start();

try {
const resumeResult = await client.workspace.resumeStream({
workspaceId,
options: { model },
options: { model, toolPolicy: options?.toolPolicy },
});

if (!resumeResult.success) {
Expand Down Expand Up @@ -261,11 +262,19 @@ IMPORTANT: Do not add any other text. Start immediately with ${nonce}-1: one. If
await new Promise((resolve) => setTimeout(resolve, 500));

// Resume and wait for completion
await resumeAndWaitForSuccess(env, workspaceId, client, `${PROVIDER}:${MODEL}`);
// Disable all tools (same as original message) so model outputs text, not tool calls
await resumeAndWaitForSuccess(workspaceId, client, `${PROVIDER}:${MODEL}`, 15000, {
toolPolicy: [{ regex_match: ".*", action: "disable" }],
});

// Small delay to let history update complete after stream-end
// stream-end is emitted before updateHistory completes
await new Promise((resolve) => setTimeout(resolve, 500));

// Read final assistant message from history
const history = await readChatHistory(env.tempDir, workspaceId);
const assistantMessages = history.filter((m) => m.role === "assistant");

const finalText = assistantMessages
.flatMap((m) => m.parts)
.filter((p) => p.type === "text")
Expand Down