Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bun.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"lockfileVersion": 1,
"configVersion": 0,
"workspaces": {
"": {
"name": "mux",
Expand Down
7 changes: 6 additions & 1 deletion src/common/orpc/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,12 @@ export const workspace = {
interruptStream: {
input: z.object({
workspaceId: z.string(),
options: z.object({ abandonPartial: z.boolean().optional() }).optional(),
options: z
.object({
soft: z.boolean().optional(),
abandonPartial: z.boolean().optional(),
})
.optional(),
}),
output: ResultSchema(z.void(), z.string()),
},
Expand Down
11 changes: 7 additions & 4 deletions src/node/services/agentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export class AgentSession {
if (this.aiService.isStreaming(this.workspaceId)) {
// MUST use abandonPartial=true to prevent handleAbort from performing partial compaction
// with mismatched history (since we're about to truncate it)
const stopResult = await this.interruptStream(/* abandonPartial */ true);
const stopResult = await this.interruptStream({ abandonPartial: true });
if (!stopResult.success) {
return Err(createUnknownSendMessageError(stopResult.error));
}
Expand Down Expand Up @@ -405,7 +405,10 @@ export class AgentSession {
return this.streamWithHistory(model, options);
}

async interruptStream(abandonPartial?: boolean): Promise<Result<void>> {
async interruptStream(options?: {
soft?: boolean;
abandonPartial?: boolean;
}): Promise<Result<void>> {
this.assertNotDisposed("interruptStream");

if (!this.aiService.isStreaming(this.workspaceId)) {
Expand All @@ -415,14 +418,14 @@ export class AgentSession {
// Delete partial BEFORE stopping to prevent abort handler from committing it
// The abort handler in aiService.ts runs immediately when stopStream is called,
// so we must delete first to ensure it finds no partial to commit
if (abandonPartial) {
if (options?.abandonPartial) {
const deleteResult = await this.partialService.deletePartial(this.workspaceId);
if (!deleteResult.success) {
return Err(deleteResult.error);
}
}

const stopResult = await this.aiService.stopStream(this.workspaceId, abandonPartial);
const stopResult = await this.aiService.stopStream(this.workspaceId, options);
if (!stopResult.success) {
return Err(stopResult.error);
}
Expand Down
7 changes: 5 additions & 2 deletions src/node/services/aiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1084,12 +1084,15 @@ export class AIService extends EventEmitter {
}
}

async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
async stopStream(
workspaceId: string,
options?: { soft?: boolean; abandonPartial?: boolean }
): Promise<Result<void>> {
if (this.mockModeEnabled && this.mockScenarioPlayer) {
this.mockScenarioPlayer.stop(workspaceId);
return Ok(undefined);
}
return this.streamManager.stopStream(workspaceId, abandonPartial);
return this.streamManager.stopStream(workspaceId, options);
}

/**
Expand Down
107 changes: 84 additions & 23 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ interface WorkspaceStreamInfo {
partialWritePromise?: Promise<void>;
// Track background processing promise for guaranteed cleanup
processingPromise: Promise<void>;
// Soft-interrupt state: when pending, stream will end at next block boundary
softInterrupt: { pending: false } | { pending: true; abandonPartial: boolean };
// Temporary directory for tool outputs (auto-cleaned when stream ends)
runtimeTempDir: string;
// Runtime for temp directory cleanup
Expand Down Expand Up @@ -418,38 +420,73 @@ export class StreamManager extends EventEmitter {
): Promise<void> {
try {
streamInfo.state = StreamState.STOPPING;

// Flush any pending partial write immediately (preserves work on interruption)
await this.flushPartialWrite(workspaceId, streamInfo);

streamInfo.abortController.abort();

// CRITICAL: Wait for processing to fully complete before cleanup
// This prevents race conditions where the old stream is still running
// while a new stream starts (e.g., old stream writing to partial.json)
await streamInfo.processingPromise;
await this.cleanupStream(workspaceId, streamInfo, abandonPartial);
} catch (error) {
console.error("Error during stream cancellation:", error);
// Force cleanup even if cancellation fails
this.workspaceStreams.delete(workspaceId);
}
}

// Get usage and duration metadata (usage may be undefined if aborted early)
const { usage, duration } = await this.getStreamMetadata(streamInfo);
// Checks if a soft interrupt is necessary, and performs one if so
// Similar to cancelStreamSafely but performs cleanup without blocking
private async checkSoftCancelStream(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo
): Promise<void> {
if (!streamInfo.softInterrupt.pending) return;
try {
streamInfo.state = StreamState.STOPPING;

// Emit abort event with usage if available
this.emit("stream-abort", {
type: "stream-abort",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
metadata: { usage, duration },
abandonPartial,
});
// Flush any pending partial write immediately (preserves work on interruption)
await this.flushPartialWrite(workspaceId, streamInfo);

// Clean up immediately
this.workspaceStreams.delete(workspaceId);
streamInfo.abortController.abort();

// Return back to the stream loop so we can wait for it to finish before
// sending the stream abort event.
const abandonPartial = streamInfo.softInterrupt.pending
? streamInfo.softInterrupt.abandonPartial
: false;
void this.cleanupStream(workspaceId, streamInfo, abandonPartial);
} catch (error) {
console.error("Error during stream cancellation:", error);
// Force cleanup even if cancellation fails
this.workspaceStreams.delete(workspaceId);
}
}

private async cleanupStream(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo,
abandonPartial?: boolean
): Promise<void> {
// CRITICAL: Wait for processing to fully complete before cleanup
// This prevents race conditions where the old stream is still running
// while a new stream starts (e.g., old stream writing to partial.json)
await streamInfo.processingPromise;

// Get usage and duration metadata (usage may be undefined if aborted early)
const { usage, duration } = await this.getStreamMetadata(streamInfo);

// Emit abort event with usage if available
this.emit("stream-abort", {
type: "stream-abort",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
metadata: { usage, duration },
abandonPartial,
});

// Clean up immediately
this.workspaceStreams.delete(workspaceId);
}

/**
* Atomically creates a new stream with all necessary setup
*/
Expand Down Expand Up @@ -555,6 +592,7 @@ export class StreamManager extends EventEmitter {
lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write
partialWritePromise: undefined, // No write in flight initially
processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream
softInterrupt: { pending: false },
runtimeTempDir, // Stream-scoped temp directory for tool outputs
runtime, // Runtime for temp directory cleanup
};
Expand Down Expand Up @@ -718,6 +756,7 @@ export class StreamManager extends EventEmitter {
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
});
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

Expand Down Expand Up @@ -772,6 +811,7 @@ export class StreamManager extends EventEmitter {
strippedOutput
);
}
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

Expand Down Expand Up @@ -808,6 +848,7 @@ export class StreamManager extends EventEmitter {
toolErrorPart.toolName,
errorOutput
);
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

Expand Down Expand Up @@ -852,6 +893,7 @@ export class StreamManager extends EventEmitter {
case "start":
case "start-step":
case "text-start":
case "finish":
// These events can be logged or handled if needed
break;

Expand All @@ -869,13 +911,14 @@ export class StreamManager extends EventEmitter {
usage: finishStepPart.usage,
};
this.emit("usage-delta", usageEvent);
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

case "finish":
// No usage-delta here - totalUsage sums all steps, not current context.
// Last finish-step already has correct context window usage.
case "text-end": {
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}
}
}

Expand Down Expand Up @@ -1363,14 +1406,32 @@ export class StreamManager extends EventEmitter {

/**
* Stops an active stream for a workspace
* First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..."
* Second call: Hard aborts the stream immediately
*/
async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
async stopStream(
workspaceId: string,
options?: { soft?: boolean; abandonPartial?: boolean }
): Promise<Result<void>> {
const typedWorkspaceId = workspaceId as WorkspaceId;

try {
const streamInfo = this.workspaceStreams.get(typedWorkspaceId);
if (streamInfo) {
await this.cancelStreamSafely(typedWorkspaceId, streamInfo, abandonPartial);
if (!streamInfo) {
return Ok(undefined); // No active stream
}

const soft = options?.soft ?? false;

if (soft) {
// Soft interrupt: set flag, will cancel at next block boundary
streamInfo.softInterrupt = {
pending: true,
abandonPartial: options?.abandonPartial ?? false,
};
} else {
// Hard interrupt: cancel immediately
await this.cancelStreamSafely(typedWorkspaceId, streamInfo, options?.abandonPartial);
}
return Ok(undefined);
} catch (error) {
Expand Down
4 changes: 2 additions & 2 deletions src/node/services/workspaceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -875,11 +875,11 @@ export class WorkspaceService extends EventEmitter {

async interruptStream(
workspaceId: string,
options?: { abandonPartial?: boolean }
options?: { soft?: boolean; abandonPartial?: boolean }
): Promise<Result<void>> {
try {
const session = this.getOrCreateSession(workspaceId);
const stopResult = await session.interruptStream(options?.abandonPartial);
const stopResult = await session.interruptStream(options);
if (!stopResult.success) {
log.error("Failed to stop stream:", stopResult.error);
return Err(stopResult.error);
Expand Down