Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bun.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"lockfileVersion": 1,
"configVersion": 0,
"workspaces": {
"": {
"name": "mux",
Expand Down
7 changes: 6 additions & 1 deletion src/common/orpc/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,12 @@ export const workspace = {
interruptStream: {
input: z.object({
workspaceId: z.string(),
options: z.object({ abandonPartial: z.boolean().optional() }).optional(),
options: z
.object({
soft: z.boolean().optional(),
abandonPartial: z.boolean().optional(),
})
.optional(),
}),
output: ResultSchema(z.void(), z.string()),
},
Expand Down
17 changes: 10 additions & 7 deletions src/node/services/agentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export class AgentSession {
if (this.aiService.isStreaming(this.workspaceId)) {
// MUST use abandonPartial=true to prevent handleAbort from performing partial compaction
// with mismatched history (since we're about to truncate it)
const stopResult = await this.interruptStream(/* abandonPartial */ true);
const stopResult = await this.interruptStream({ abandonPartial: true });
if (!stopResult.success) {
return Err(createUnknownSendMessageError(stopResult.error));
}
Expand Down Expand Up @@ -405,24 +405,27 @@ export class AgentSession {
return this.streamWithHistory(model, options);
}

async interruptStream(abandonPartial?: boolean): Promise<Result<void>> {
async interruptStream(options?: {
soft?: boolean;
abandonPartial?: boolean;
}): Promise<Result<void>> {
this.assertNotDisposed("interruptStream");

if (!this.aiService.isStreaming(this.workspaceId)) {
return Ok(undefined);
}

// Delete partial BEFORE stopping to prevent abort handler from committing it
// The abort handler in aiService.ts runs immediately when stopStream is called,
// so we must delete first to ensure it finds no partial to commit
if (abandonPartial) {
// For hard interrupts, delete partial BEFORE stopping to prevent abort handler
// from committing it. For soft interrupts, defer to stream-abort handler since
// the stream continues running and would recreate the partial.
if (options?.abandonPartial && !options?.soft) {
const deleteResult = await this.partialService.deletePartial(this.workspaceId);
if (!deleteResult.success) {
return Err(deleteResult.error);
}
}

const stopResult = await this.aiService.stopStream(this.workspaceId, abandonPartial);
const stopResult = await this.aiService.stopStream(this.workspaceId, options);
if (!stopResult.success) {
return Err(stopResult.error);
}
Expand Down
24 changes: 15 additions & 9 deletions src/node/services/aiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,20 @@ export class AIService extends EventEmitter {
this.streamManager.on("stream-delta", (data) => this.emit("stream-delta", data));
this.streamManager.on("stream-end", (data) => this.emit("stream-end", data));

// Handle stream-abort: commit partial to history before forwarding
// Note: If abandonPartial option was used, partial is already deleted by IPC handler
// Handle stream-abort: dispose of partial based on abandonPartial flag
this.streamManager.on("stream-abort", (data: StreamAbortEvent) => {
void (async () => {
// Check if partial still exists (not abandoned)
const partial = await this.partialService.readPartial(data.workspaceId);
if (partial) {
if (data.abandonPartial) {
// Caller requested discarding partial - delete without committing
await this.partialService.deletePartial(data.workspaceId);
} else {
// Commit interrupted message to history with partial:true metadata
// This ensures /clear and /truncate can clean up interrupted messages
await this.partialService.commitToHistory(data.workspaceId);
await this.partialService.deletePartial(data.workspaceId);
const partial = await this.partialService.readPartial(data.workspaceId);
if (partial) {
await this.partialService.commitToHistory(data.workspaceId);
await this.partialService.deletePartial(data.workspaceId);
}
}

// Forward abort event to consumers
Expand Down Expand Up @@ -1084,12 +1087,15 @@ export class AIService extends EventEmitter {
}
}

async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
async stopStream(
workspaceId: string,
options?: { soft?: boolean; abandonPartial?: boolean }
): Promise<Result<void>> {
if (this.mockModeEnabled && this.mockScenarioPlayer) {
this.mockScenarioPlayer.stop(workspaceId);
return Ok(undefined);
}
return this.streamManager.stopStream(workspaceId, abandonPartial);
return this.streamManager.stopStream(workspaceId, options);
}

/**
Expand Down
107 changes: 84 additions & 23 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ interface WorkspaceStreamInfo {
partialWritePromise?: Promise<void>;
// Track background processing promise for guaranteed cleanup
processingPromise: Promise<void>;
// Soft-interrupt state: when pending, stream will end at next block boundary
softInterrupt: { pending: false } | { pending: true; abandonPartial: boolean };
// Temporary directory for tool outputs (auto-cleaned when stream ends)
runtimeTempDir: string;
// Runtime for temp directory cleanup
Expand Down Expand Up @@ -418,38 +420,73 @@ export class StreamManager extends EventEmitter {
): Promise<void> {
try {
streamInfo.state = StreamState.STOPPING;

// Flush any pending partial write immediately (preserves work on interruption)
await this.flushPartialWrite(workspaceId, streamInfo);

streamInfo.abortController.abort();

// CRITICAL: Wait for processing to fully complete before cleanup
// This prevents race conditions where the old stream is still running
// while a new stream starts (e.g., old stream writing to partial.json)
await streamInfo.processingPromise;
await this.cleanupStream(workspaceId, streamInfo, abandonPartial);
} catch (error) {
console.error("Error during stream cancellation:", error);
// Force cleanup even if cancellation fails
this.workspaceStreams.delete(workspaceId);
}
}

// Get usage and duration metadata (usage may be undefined if aborted early)
const { usage, duration } = await this.getStreamMetadata(streamInfo);
// Checks if a soft interrupt is necessary, and performs one if so
// Similar to cancelStreamSafely but performs cleanup without blocking
private async checkSoftCancelStream(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo
): Promise<void> {
if (!streamInfo.softInterrupt.pending) return;
try {
streamInfo.state = StreamState.STOPPING;

// Emit abort event with usage if available
this.emit("stream-abort", {
type: "stream-abort",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
metadata: { usage, duration },
abandonPartial,
});
// Flush any pending partial write immediately (preserves work on interruption)
await this.flushPartialWrite(workspaceId, streamInfo);

// Clean up immediately
this.workspaceStreams.delete(workspaceId);
streamInfo.abortController.abort();

// Return back to the stream loop so we can wait for it to finish before
// sending the stream abort event.
const abandonPartial = streamInfo.softInterrupt.pending
? streamInfo.softInterrupt.abandonPartial
: false;
void this.cleanupStream(workspaceId, streamInfo, abandonPartial);
} catch (error) {
console.error("Error during stream cancellation:", error);
// Force cleanup even if cancellation fails
this.workspaceStreams.delete(workspaceId);
}
}

private async cleanupStream(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo,
abandonPartial?: boolean
): Promise<void> {
// CRITICAL: Wait for processing to fully complete before cleanup
// This prevents race conditions where the old stream is still running
// while a new stream starts (e.g., old stream writing to partial.json)
await streamInfo.processingPromise;

// Get usage and duration metadata (usage may be undefined if aborted early)
const { usage, duration } = await this.getStreamMetadata(streamInfo);

// Emit abort event with usage if available
this.emit("stream-abort", {
type: "stream-abort",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
metadata: { usage, duration },
abandonPartial,
});

// Clean up immediately
this.workspaceStreams.delete(workspaceId);
}

/**
* Atomically creates a new stream with all necessary setup
*/
Expand Down Expand Up @@ -555,6 +592,7 @@ export class StreamManager extends EventEmitter {
lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write
partialWritePromise: undefined, // No write in flight initially
processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream
softInterrupt: { pending: false },
runtimeTempDir, // Stream-scoped temp directory for tool outputs
runtime, // Runtime for temp directory cleanup
};
Expand Down Expand Up @@ -718,6 +756,7 @@ export class StreamManager extends EventEmitter {
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
});
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

Expand Down Expand Up @@ -772,6 +811,7 @@ export class StreamManager extends EventEmitter {
strippedOutput
);
}
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

Expand Down Expand Up @@ -808,6 +848,7 @@ export class StreamManager extends EventEmitter {
toolErrorPart.toolName,
errorOutput
);
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

Expand Down Expand Up @@ -852,6 +893,7 @@ export class StreamManager extends EventEmitter {
case "start":
case "start-step":
case "text-start":
case "finish":
// These events can be logged or handled if needed
break;

Expand All @@ -869,13 +911,14 @@ export class StreamManager extends EventEmitter {
usage: finishStepPart.usage,
};
this.emit("usage-delta", usageEvent);
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}

case "finish":
// No usage-delta here - totalUsage sums all steps, not current context.
// Last finish-step already has correct context window usage.
case "text-end": {
await this.checkSoftCancelStream(workspaceId, streamInfo);
break;
}
}
}

Expand Down Expand Up @@ -1363,14 +1406,32 @@ export class StreamManager extends EventEmitter {

/**
* Stops an active stream for a workspace
* First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..."
* Second call: Hard aborts the stream immediately
*/
async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
async stopStream(
workspaceId: string,
options?: { soft?: boolean; abandonPartial?: boolean }
): Promise<Result<void>> {
const typedWorkspaceId = workspaceId as WorkspaceId;

try {
const streamInfo = this.workspaceStreams.get(typedWorkspaceId);
if (streamInfo) {
await this.cancelStreamSafely(typedWorkspaceId, streamInfo, abandonPartial);
if (!streamInfo) {
return Ok(undefined); // No active stream
}

const soft = options?.soft ?? false;

if (soft) {
// Soft interrupt: set flag, will cancel at next block boundary
streamInfo.softInterrupt = {
pending: true,
abandonPartial: options?.abandonPartial ?? false,
};
} else {
// Hard interrupt: cancel immediately
await this.cancelStreamSafely(typedWorkspaceId, streamInfo, options?.abandonPartial);
}
return Ok(undefined);
} catch (error) {
Expand Down
8 changes: 5 additions & 3 deletions src/node/services/workspaceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -875,17 +875,19 @@ export class WorkspaceService extends EventEmitter {

async interruptStream(
workspaceId: string,
options?: { abandonPartial?: boolean }
options?: { soft?: boolean; abandonPartial?: boolean }
): Promise<Result<void>> {
try {
const session = this.getOrCreateSession(workspaceId);
const stopResult = await session.interruptStream(options?.abandonPartial);
const stopResult = await session.interruptStream(options);
if (!stopResult.success) {
log.error("Failed to stop stream:", stopResult.error);
return Err(stopResult.error);
}

if (options?.abandonPartial) {
// For hard interrupts, delete partial immediately. For soft interrupts,
// defer to stream-abort handler (stream is still running and may recreate partial).
if (options?.abandonPartial && !options?.soft) {
log.debug("Abandoning partial for workspace:", workspaceId);
await this.partialService.deletePartial(workspaceId);
}
Expand Down