diff --git a/bun.lock b/bun.lock index a06261494..2d4ab805b 100644 --- a/bun.lock +++ b/bun.lock @@ -964,7 +964,7 @@ "@types/ms": ["@types/ms@2.1.0", "", {}, "sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA=="], - "@types/node": ["@types/node@24.9.2", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-uWN8YqxXxqFMX2RqGOrumsKeti4LlmIMIyV0lgut4jx7KQBcBiW6vkDtIBvHnHIquwNfJhk8v2OtmO8zXWHfPA=="], + "@types/node": ["@types/node@24.10.1", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ=="], "@types/plist": ["@types/plist@3.0.5", "", { "dependencies": { "@types/node": "*", "xmlbuilder": ">=11.0.1" } }, "sha512-E6OCaRmAe4WDmWNsL/9RMqdkkzDCY1etutkflWk4c+AcjDU07Pcz1fQwTX0TQz+Pxqn9i4L1TU3UFpjnrcDgxA=="], @@ -3286,7 +3286,7 @@ "dom-serializer/entities": ["entities@2.2.0", "", {}, "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A=="], - "electron/@types/node": ["@types/node@22.18.13", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-Bo45YKIjnmFtv6I1TuC8AaHBbqXtIo+Om5fE4QiU1Tj8QR/qt+8O3BAtOimG5IFmwaWiPmB3Mv3jtYzBA4Us2A=="], + "electron/@types/node": ["@types/node@22.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-LCCV0HdSZZZb34qifBsyWlUmok6W7ouER+oQIGBScS8EsZsQbrtFTUrDX4hOl+CS6p7cnNC4td+qrSVGSCTUfQ=="], "electron-builder/chalk": ["chalk@4.1.2", "", { "dependencies": { "ansi-styles": "^4.1.0", "supports-color": "^7.1.0" } }, "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA=="], diff --git a/src/browser/hooks/useAIViewKeybinds.ts b/src/browser/hooks/useAIViewKeybinds.ts index 4032379c3..47e86381e 100644 --- a/src/browser/hooks/useAIViewKeybinds.ts +++ b/src/browser/hooks/useAIViewKeybinds.ts @@ -8,7 +8,7 @@ import { DEFAULT_THINKING_LEVEL } from "@/common/types/thinking"; import { getThinkingPolicyForModel } from "@/browser/utils/thinking/policy"; import { getDefaultModelFromLRU } from "@/browser/hooks/useModelLRU"; import type { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator"; -import { isCompactingStream, cancelCompaction } from "@/common/utils/compaction/handler"; +import { isCompactingStream, cancelCompaction } from "@/browser/utils/compaction/handler"; interface UseAIViewKeybindsParams { workspaceId: string; diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index ab7b79951..b2ecbd59f 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -1,6 +1,5 @@ import assert from "@/common/utils/assert"; import type { MuxMessage, DisplayedMessage, QueuedMessage } from "@/common/types/message"; -import { createMuxMessage } from "@/common/types/message"; import type { FrontendWorkspaceMetadata } from "@/common/types/workspace"; import type { WorkspaceChatMessage } from "@/common/types/ipc"; import type { TodoItem } from "@/common/types/tools"; @@ -18,17 +17,11 @@ import { isRestoreToInput, } from "@/common/types/ipc"; import { MapStore } from "./MapStore"; -import { createDisplayUsage } from "@/common/utils/tokens/displayUsage"; +import { collectUsageHistory } from "@/common/utils/tokens/displayUsage"; import { WorkspaceConsumerManager } from "./WorkspaceConsumerManager"; import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator"; -import { sumUsageHistory } from 
"@/common/utils/tokens/usageAggregator"; import type { TokenConsumer } from "@/common/types/chatStats"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; -import { getCancelledCompactionKey } from "@/common/constants/storage"; -import { - isCompactingStream, - findCompactionRequestMessage, -} from "@/common/utils/compaction/handler"; import { createFreshRetryState } from "@/browser/utils/messages/retryState"; export interface WorkspaceState { @@ -149,10 +142,6 @@ export class WorkspaceStore { aggregator.handleStreamEnd(data as never); aggregator.clearTokenState((data as { messageId: string }).messageId); - if (this.handleCompactionCompletion(workspaceId, aggregator, data)) { - return; - } - // Reset retry state on successful stream completion updatePersistedState(getRetryStateKey(workspaceId), createFreshRetryState()); @@ -164,10 +153,6 @@ export class WorkspaceStore { aggregator.clearTokenState((data as { messageId: string }).messageId); aggregator.handleStreamAbort(data as never); - if (this.handleCompactionAbort(workspaceId, aggregator, data)) { - return; - } - this.states.bump(workspaceId); this.dispatchResumeCheck(workspaceId); this.finalizeUsageStats(workspaceId, (data as { metadata?: never }).metadata); @@ -446,42 +431,8 @@ export class WorkspaceStore { const aggregator = this.assertGet(workspaceId); const messages = aggregator.getAllMessages(); - - // Extract usage from assistant messages - const usageHistory: ChatUsageDisplay[] = []; - let cumulativeHistorical: ChatUsageDisplay | undefined; - - for (const msg of messages) { - if (msg.role === "assistant") { - // Check for historical usage from compaction summaries - // This preserves costs from messages deleted during compaction - if (msg.metadata?.historicalUsage) { - cumulativeHistorical = msg.metadata.historicalUsage; - } - - // Extract current message's usage - if (msg.metadata?.usage) { - // Use the model from this specific message (not global) - const model = msg.metadata.model ?? aggregator.getCurrentModel() ?? "unknown"; - - const usage = createDisplayUsage( - msg.metadata.usage, - model, - msg.metadata.providerMetadata - ); - - if (usage) { - usageHistory.push(usage); - } - } - } - } - - // If we have historical usage from a compaction, prepend it to history - // This ensures costs from pre-compaction messages are included in totals - if (cumulativeHistorical) { - usageHistory.unshift(cumulativeHistorical); - } + const model = aggregator.getCurrentModel(); + const usageHistory = collectUsageHistory(messages, model); // Calculate total from usage history (now includes historical) const totalTokens = usageHistory.reduce( @@ -544,169 +495,6 @@ export class WorkspaceStore { return this.consumersStore.subscribeKey(workspaceId, listener); } - /** - * Handle compact_summary tool completion. - * Returns true if compaction was handled (caller should early return). 
- */ - // Track processed compaction-request IDs to dedupe performCompaction across duplicated events - private processedCompactionRequestIds = new Set(); - - private handleCompactionCompletion( - workspaceId: string, - aggregator: StreamingMessageAggregator, - data: WorkspaceChatMessage - ): boolean { - // Type guard: only StreamEndEvent has messageId - if (!("messageId" in data)) return false; - - // Check if this was a compaction stream - if (!isCompactingStream(aggregator)) { - return false; - } - - // Extract the compaction-request message to identify this compaction run - const compactionRequestMsg = findCompactionRequestMessage(aggregator); - if (!compactionRequestMsg) { - return false; - } - - // Dedupe: If we've already processed this compaction-request, skip re-running - if (this.processedCompactionRequestIds.has(compactionRequestMsg.id)) { - return true; // Already handled compaction for this request - } - - // Extract the summary text from the assistant's response - const summary = aggregator.getCompactionSummary(data.messageId); - if (!summary) { - console.warn("[WorkspaceStore] Compaction completed but no summary text found"); - return false; - } - - // Mark this compaction-request as processed before performing compaction - this.processedCompactionRequestIds.add(compactionRequestMsg.id); - - this.performCompaction(workspaceId, aggregator, data, summary); - return true; - } - - /** - * Handle interruption of a compaction stream (StreamAbortEvent). - * - * Two distinct flows trigger this: - * - **Ctrl+A (accept early)**: Perform compaction with [truncated] sentinel - * - **Ctrl+C (cancel)**: Skip compaction, let cancelCompaction handle cleanup - * - * Uses localStorage to distinguish flows: - * - Checks for cancellation marker in localStorage - * - Verifies messageId matches for freshness - * - Reload-safe: localStorage persists across page reloads - */ - private handleCompactionAbort( - workspaceId: string, - aggregator: StreamingMessageAggregator, - data: WorkspaceChatMessage - ): boolean { - // Type guard: only StreamAbortEvent has messageId - if (!("messageId" in data)) return false; - - // Check if this was a compaction stream - if (!isCompactingStream(aggregator)) { - return false; - } - - // Get the compaction request message for ID verification - const compactionRequestMsg = findCompactionRequestMessage(aggregator); - if (!compactionRequestMsg) { - return false; - } - - // Ctrl+C flow: Check localStorage for cancellation marker - // Verify compaction-request user message ID matches (stable across retries) - const storageKey = getCancelledCompactionKey(workspaceId); - const cancelData = localStorage.getItem(storageKey); - if (cancelData) { - try { - const parsed = JSON.parse(cancelData) as { compactionRequestId: string; timestamp: number }; - if (parsed.compactionRequestId === compactionRequestMsg.id) { - // This is a cancelled compaction - clean up marker and skip compaction - localStorage.removeItem(storageKey); - return false; // Skip compaction, cancelCompaction() handles cleanup - } - } catch (error) { - console.error("[WorkspaceStore] Failed to parse cancellation data:", error); - } - // If compactionRequestId doesn't match or parse failed, clean up stale data - localStorage.removeItem(storageKey); - } - - // Ctrl+A flow: Accept early with [truncated] sentinel - const partialSummary = aggregator.getCompactionSummary(data.messageId); - if (!partialSummary) { - console.warn("[WorkspaceStore] Compaction aborted but no partial summary found"); - return false; - } - - 
// Append [truncated] sentinel on new line to indicate incomplete summary - const truncatedSummary = partialSummary.trim() + "\n\n[truncated]"; - - this.performCompaction(workspaceId, aggregator, data, truncatedSummary); - return true; - } - - /** - * Perform history compaction by replacing chat history with summary message. - * Type-safe: only called when we've verified data is a StreamEndEvent. - */ - private performCompaction( - workspaceId: string, - aggregator: StreamingMessageAggregator, - data: WorkspaceChatMessage, - summary: string - ): void { - // Extract metadata safely with type guard - const metadata = "metadata" in data ? data.metadata : undefined; - - // Calculate cumulative historical usage before replacing history - // This preserves costs from all messages that are about to be deleted - const currentUsage = this.getWorkspaceUsage(workspaceId); - const historicalUsage = - currentUsage.usageHistory.length > 0 ? sumUsageHistory(currentUsage.usageHistory) : undefined; - - const summaryMessage = createMuxMessage( - `summary-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`, - "assistant", - summary, - { - timestamp: Date.now(), - compacted: true, - model: aggregator.getCurrentModel(), - usage: metadata?.usage, - historicalUsage, // Store cumulative costs from all pre-compaction messages - providerMetadata: - metadata && "providerMetadata" in metadata - ? (metadata.providerMetadata as Record | undefined) - : undefined, - duration: metadata?.duration, - systemMessageTokens: - metadata && "systemMessageTokens" in metadata - ? (metadata.systemMessageTokens as number | undefined) - : undefined, - muxMetadata: { type: "normal" }, - } - ); - - void (async () => { - try { - await window.api.workspace.replaceChatHistory(workspaceId, summaryMessage); - } catch (error) { - console.error("[WorkspaceStore] Failed to replace history:", error); - } finally { - this.states.bump(workspaceId); - this.checkAndBumpRecencyIfChanged(); - } - })(); - } - /** * Update usage and schedule consumer calculation after stream completion. * diff --git a/src/common/utils/compaction/handler.ts b/src/browser/utils/compaction/handler.ts similarity index 70% rename from src/common/utils/compaction/handler.ts rename to src/browser/utils/compaction/handler.ts index adcc68bed..eaf93a0a9 100644 --- a/src/common/utils/compaction/handler.ts +++ b/src/browser/utils/compaction/handler.ts @@ -12,7 +12,6 @@ */ import type { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator"; -import { getCancelledCompactionKey } from "@/common/constants/storage"; /** * Check if the workspace is currently in a compaction stream @@ -55,19 +54,14 @@ export function getCompactionCommand(aggregator: StreamingMessageAggregator): st * Cancel compaction (Ctrl+C flow) * * Aborts the compaction stream and puts user in edit mode for compaction-request: - * - Interrupts stream with abandonPartial flag (deletes partial, doesn't commit) - * - Skips compaction (via localStorage marker checked by handleCompactionAbort) + * - Interrupts stream with abandonPartial=true flag (backend skips compaction) * - Enters edit mode on compaction-request message * - Restores original /compact command to input for re-editing * - Leaves compaction-request message in history (can edit or delete it) * * Flow: - * 1. Store cancellation marker in localStorage with compactionRequestId for verification - * 2. Interrupt stream with {abandonPartial: true} - backend deletes partial - * 3. 
handleCompactionAbort checks localStorage, verifies compactionRequestId, skips compaction - * 4. Enter edit mode on compaction-request message with original command - * - * Reload-safe: localStorage persists across reloads, compactionRequestId ensures freshness + * 1. Interrupt stream with {abandonPartial: true} - backend detects and skips compaction + * 2. Enter edit mode on compaction-request message with original command */ export async function cancelCompaction( workspaceId: string, @@ -86,21 +80,8 @@ export async function cancelCompaction( return false; } - // CRITICAL: Store cancellation marker in localStorage BEFORE interrupt - // Use the compaction-request user message ID (stable across retries) - // This persists across reloads and verifies we're cancelling the right compaction - const storageKey = getCancelledCompactionKey(workspaceId); - localStorage.setItem( - storageKey, - JSON.stringify({ - compactionRequestId: compactionRequestMsg.id, - timestamp: Date.now(), - }) - ); - // Interrupt stream with abandonPartial flag - // This tells backend to DELETE the partial instead of committing it - // Result: history ends with the compaction-request user message (which is fine - just a user message) + // Backend detects this and skips compaction (Ctrl+C flow) await window.api.workspace.interruptStream(workspaceId, { abandonPartial: true }); // Enter edit mode on the compaction-request message with original command diff --git a/src/common/types/stream.ts b/src/common/types/stream.ts index dcbf3547a..4639329a7 100644 --- a/src/common/types/stream.ts +++ b/src/common/types/stream.ts @@ -56,6 +56,7 @@ export interface StreamAbortEvent { usage?: LanguageModelV2Usage; duration?: number; }; + abandonPartial?: boolean; } export interface ErrorEvent { diff --git a/src/common/utils/tokens/displayUsage.test.ts b/src/common/utils/tokens/displayUsage.test.ts new file mode 100644 index 000000000..b5e98e7d7 --- /dev/null +++ b/src/common/utils/tokens/displayUsage.test.ts @@ -0,0 +1,148 @@ +import { describe, test, expect } from "bun:test"; +import { collectUsageHistory } from "./displayUsage"; +import { createMuxMessage, type MuxMessage } from "@/common/types/message"; +import type { LanguageModelV2Usage } from "@ai-sdk/provider"; +import type { ChatUsageDisplay } from "./usageAggregator"; + +// Helper to create assistant message with usage +const createAssistant = ( + id: string, + usage?: LanguageModelV2Usage, + model?: string, + historicalUsage?: ChatUsageDisplay +): MuxMessage => { + const msg = createMuxMessage(id, "assistant", "Response", { + historySequence: 0, + usage, + model, + historicalUsage, + }); + return msg; +}; + +describe("collectUsageHistory", () => { + const basicUsage: LanguageModelV2Usage = { + inputTokens: 100, + outputTokens: 50, + totalTokens: 150, + }; + + test("returns empty array for empty messages", () => { + expect(collectUsageHistory([])).toEqual([]); + }); + + test("returns empty array when no assistant messages", () => { + const userMsg = createMuxMessage("u1", "user", "Hello", { historySequence: 0 }); + expect(collectUsageHistory([userMsg])).toEqual([]); + }); + + test("extracts usage from single assistant message", () => { + const msg = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); + const result = collectUsageHistory([msg]); + + expect(result).toHaveLength(1); + expect(result[0].model).toBe("claude-sonnet-4-5"); + expect(result[0].input.tokens).toBe(100); + expect(result[0].output.tokens).toBe(50); + }); + + test("extracts usage from multiple assistant 
messages", () => { + const msg1 = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); + const msg2 = createAssistant("a2", { ...basicUsage, inputTokens: 200 }, "claude-sonnet-4-5"); + const result = collectUsageHistory([msg1, msg2]); + + expect(result).toHaveLength(2); + expect(result[0].input.tokens).toBe(100); + expect(result[1].input.tokens).toBe(200); + }); + + test("skips assistant messages without usage", () => { + const msg1 = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); + const msg2 = createAssistant("a2", undefined, "claude-sonnet-4-5"); // No usage + const msg3 = createAssistant("a3", basicUsage, "claude-sonnet-4-5"); + const result = collectUsageHistory([msg1, msg2, msg3]); + + expect(result).toHaveLength(2); // msg2 excluded + }); + + test("filters out user messages", () => { + const userMsg = createMuxMessage("u1", "user", "Hello", { historySequence: 0 }); + const assistantMsg = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); + const result = collectUsageHistory([userMsg, assistantMsg]); + + expect(result).toHaveLength(1); + }); + + test("uses fallbackModel when message has no model", () => { + const msg = createAssistant("a1", basicUsage, undefined); + const result = collectUsageHistory([msg], "fallback-model"); + + expect(result[0].model).toBe("fallback-model"); + }); + + test("defaults to 'unknown' when no model provided", () => { + const msg = createAssistant("a1", basicUsage, undefined); + const result = collectUsageHistory([msg]); + + expect(result[0].model).toBe("unknown"); + }); + + test("prepends historical usage from compaction summary", () => { + const historicalUsage: ChatUsageDisplay = { + input: { tokens: 500, cost_usd: 0.01 }, + output: { tokens: 250, cost_usd: 0.02 }, + cached: { tokens: 0 }, + cacheCreate: { tokens: 0 }, + reasoning: { tokens: 0 }, + model: "historical-model", + }; + + const msg = createAssistant("a1", basicUsage, "claude-sonnet-4-5", historicalUsage); + const result = collectUsageHistory([msg]); + + expect(result).toHaveLength(2); + expect(result[0]).toBe(historicalUsage); // Historical comes first + expect(result[1].model).toBe("claude-sonnet-4-5"); // Current message second + }); + + test("uses latest historical usage when multiple messages have it", () => { + const historical1: ChatUsageDisplay = { + input: { tokens: 100 }, + output: { tokens: 50 }, + cached: { tokens: 0 }, + cacheCreate: { tokens: 0 }, + reasoning: { tokens: 0 }, + model: "first", + }; + + const historical2: ChatUsageDisplay = { + input: { tokens: 200 }, + output: { tokens: 100 }, + cached: { tokens: 0 }, + cacheCreate: { tokens: 0 }, + reasoning: { tokens: 0 }, + model: "second", + }; + + const msg1 = createAssistant("a1", basicUsage, "model-1", historical1); + const msg2 = createAssistant("a2", basicUsage, "model-2", historical2); + const result = collectUsageHistory([msg1, msg2]); + + expect(result).toHaveLength(3); // historical2 + msg1 + msg2 + expect(result[0]).toBe(historical2); // Latest historical usage wins + expect(result[0].model).toBe("second"); + }); + + test("handles mixed message order correctly", () => { + const userMsg = createMuxMessage("u1", "user", "Hello", { historySequence: 0 }); + const assistantMsg1 = createAssistant("a1", basicUsage, "model-1"); + const userMsg2 = createMuxMessage("u2", "user", "More", { historySequence: 2 }); + const assistantMsg2 = createAssistant("a2", basicUsage, "model-2"); + + const result = collectUsageHistory([userMsg, assistantMsg1, userMsg2, assistantMsg2]); + + expect(result).toHaveLength(2); + 
expect(result[0].model).toBe("model-1");
+    expect(result[1].model).toBe("model-2");
+  });
+});
diff --git a/src/common/utils/tokens/displayUsage.ts b/src/common/utils/tokens/displayUsage.ts
index b98c5e771..886548fd1 100644
--- a/src/common/utils/tokens/displayUsage.ts
+++ b/src/common/utils/tokens/displayUsage.ts
@@ -8,6 +8,7 @@
 import type { LanguageModelV2Usage } from "@ai-sdk/provider";
 import { getModelStats } from "./modelStats";
 import type { ChatUsageDisplay } from "./usageAggregator";
+import type { MuxMessage } from "@/common/types/message";
 
 /**
  * Create a display-friendly usage object from AI SDK usage
@@ -90,3 +91,41 @@
     model, // Include model for display purposes
   };
 }
+
+export function collectUsageHistory(
+  messages: MuxMessage[],
+  fallbackModel?: string
+): ChatUsageDisplay[] {
+  // Extract usage from assistant messages
+  const usageHistory: ChatUsageDisplay[] = [];
+  let cumulativeHistorical: ChatUsageDisplay | undefined;
+
+  for (const msg of messages) {
+    if (msg.role === "assistant") {
+      // Check for historical usage from compaction summaries
+      // This preserves costs from messages deleted during compaction
+      if (msg.metadata?.historicalUsage) {
+        cumulativeHistorical = msg.metadata.historicalUsage;
+      }
+
+      // Extract current message's usage
+      if (msg.metadata?.usage) {
+        // Use the model from this specific message (not global)
+        const model = msg.metadata.model ?? fallbackModel ?? "unknown";
+        const usage = createDisplayUsage(msg.metadata.usage, model, msg.metadata.providerMetadata);
+
+        if (usage) {
+          usageHistory.push(usage);
+        }
+      }
+    }
+  }
+
+  // If we have historical usage from a compaction, prepend it to history
+  // This ensures costs from pre-compaction messages are included in totals
+  if (cumulativeHistorical) {
+    usageHistory.unshift(cumulativeHistorical);
+  }
+
+  return usageHistory;
+}
diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts
index a3ed32dff..adbe96ee3 100644
--- a/src/node/services/agentSession.ts
+++ b/src/node/services/agentSession.ts
@@ -23,6 +23,8 @@ import { Ok, Err } from "@/common/types/result";
 import { enforceThinkingPolicy } from "@/browser/utils/thinking/policy";
 import { createRuntime } from "@/node/runtime/runtimeFactory";
 import { MessageQueue } from "./messageQueue";
+import type { StreamEndEvent, StreamAbortEvent } from "@/common/types/stream";
+import { CompactionHandler } from "./compactionHandler";
 
 export interface AgentSessionChatEvent {
   workspaceId: string;
@@ -57,6 +59,7 @@ export class AgentSession {
     [];
   private disposed = false;
   private readonly messageQueue = new MessageQueue();
+  private readonly compactionHandler: CompactionHandler;
 
   constructor(options: AgentSessionOptions) {
     assert(options, "AgentSession requires options");
@@ -74,6 +77,12 @@
     this.aiService = aiService;
     this.initStateManager = initStateManager;
 
+    this.compactionHandler = new CompactionHandler({
+      workspaceId: this.workspaceId,
+      historyService: this.historyService,
+      emitter: this.emitter,
+    });
+
     this.attachAiListeners();
     this.attachInitListeners();
   }
@@ -346,14 +355,14 @@
     return this.streamWithHistory(model, options);
   }
 
-  async interruptStream(): Promise<Result<void>> {
+  async interruptStream(abandonPartial?: boolean): Promise<Result<void>> {
     this.assertNotDisposed("interruptStream");
 
     if (!this.aiService.isStreaming(this.workspaceId)) {
       return Ok(undefined);
     }
 
-    const stopResult = await this.aiService.stopStream(this.workspaceId);
+    const stopResult = 
await this.aiService.stopStream(this.workspaceId, abandonPartial);
     if (!stopResult.success) {
       return Err(stopResult.error);
     }
@@ -396,7 +405,10 @@
   }
 
   private attachAiListeners(): void {
-    const forward = (event: string, handler: (payload: WorkspaceChatMessage) => void) => {
+    const forward = (
+      event: string,
+      handler: (payload: WorkspaceChatMessage) => Promise<void> | void
+    ) => {
       const wrapped = (...args: unknown[]) => {
         const [payload] = args;
         if (
@@ -407,7 +419,7 @@
         ) {
           return;
         }
-        handler(payload as WorkspaceChatMessage);
+        void handler(payload as WorkspaceChatMessage);
       };
       this.aiListeners.push({ event, handler: wrapped });
       this.aiService.on(event, wrapped as never);
@@ -425,14 +437,21 @@
     forward("reasoning-delta", (payload) => this.emitChatEvent(payload));
     forward("reasoning-end", (payload) => this.emitChatEvent(payload));
 
-    forward("stream-end", (payload) => {
-      this.emitChatEvent(payload);
+    forward("stream-end", async (payload) => {
+      const handled = await this.compactionHandler.handleCompletion(payload as StreamEndEvent);
+      if (!handled) {
+        this.emitChatEvent(payload);
+      }
 
       // Stream end: auto-send queued messages
       this.sendQueuedMessages();
     });
 
-    forward("stream-abort", (payload) => {
-      this.emitChatEvent(payload);
+    forward("stream-abort", async (payload) => {
+      const handled = await this.compactionHandler.handleAbort(payload as StreamAbortEvent);
+      if (!handled) {
+        this.emitChatEvent(payload);
+      }
+
       // Stream aborted: restore queued messages to input
       if (!this.messageQueue.isEmpty()) {
         const displayText = this.messageQueue.getDisplayText();
diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts
index d16d6b01d..33107cedc 100644
--- a/src/node/services/aiService.ts
+++ b/src/node/services/aiService.ts
@@ -955,12 +955,12 @@
     }
   }
 
-  async stopStream(workspaceId: string): Promise<Result<void>> {
+  async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
     if (this.mockModeEnabled && this.mockScenarioPlayer) {
       this.mockScenarioPlayer.stop(workspaceId);
       return Ok(undefined);
     }
-    return this.streamManager.stopStream(workspaceId);
+    return this.streamManager.stopStream(workspaceId, abandonPartial);
   }
 
   /**
diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts
new file mode 100644
index 000000000..33bb1e750
--- /dev/null
+++ b/src/node/services/compactionHandler.test.ts
@@ -0,0 +1,753 @@
+import { describe, it, expect, beforeEach, mock } from "bun:test";
+import { CompactionHandler } from "./compactionHandler";
+import type { HistoryService } from "./historyService";
+import type { EventEmitter } from "events";
+import { createMuxMessage, type MuxMessage } from "@/common/types/message";
+import type { StreamEndEvent, StreamAbortEvent } from "@/common/types/stream";
+import { Ok, Err, type Result } from "@/common/types/result";
+import type { LanguageModelV2Usage } from "@ai-sdk/provider";
+
+interface EmittedEvent {
+  event: string;
+  data: ChatEventData;
+}
+
+// Type guards for emitted events
+interface ChatEventData {
+  workspaceId: string;
+  message: unknown;
+}
+
+const createMockHistoryService = () => {
+  let getHistoryResult: Result<MuxMessage[]> = Ok([]);
+  let clearHistoryResult: Result<number[]> = Ok([]);
+  let appendToHistoryResult: Result<void> = Ok(undefined);
+
+  const getHistory = mock((_) => Promise.resolve(getHistoryResult));
+  const clearHistory = mock((_) => Promise.resolve(clearHistoryResult));
+  
const appendToHistory = mock((_, __) => Promise.resolve(appendToHistoryResult)); + const updateHistory = mock(() => Promise.resolve(Ok(undefined))); + const truncateAfterMessage = mock(() => Promise.resolve(Ok(undefined))); + + return { + getHistory, + clearHistory, + appendToHistory, + updateHistory, + truncateAfterMessage, + // Allow setting mock return values + mockGetHistory: (result: Result) => { + getHistoryResult = result; + }, + mockClearHistory: (result: Result) => { + clearHistoryResult = result; + }, + mockAppendToHistory: (result: Result) => { + appendToHistoryResult = result; + }, + }; +}; + +const createMockEmitter = (): { emitter: EventEmitter; events: EmittedEvent[] } => { + const events: EmittedEvent[] = []; + const emitter = { + emit: (_event: string, data: ChatEventData) => { + events.push({ event: _event, data }); + return true; + }, + }; + return { emitter: emitter as EventEmitter, events }; +}; + +const createCompactionRequest = (id = "req-1"): MuxMessage => + createMuxMessage(id, "user", "Please summarize the conversation", { + historySequence: 0, + muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} }, + }); + +const createAssistantMessage = ( + content: string, + options: { + id?: string; + historySequence?: number; + model?: string; + usage?: { inputTokens: number; outputTokens: number; totalTokens?: number }; + duration?: number; + } = {} +): MuxMessage => + createMuxMessage(options.id ?? "asst-1", "assistant", content, { + historySequence: options.historySequence ?? 1, + model: options.model ?? "claude-3-5-sonnet-20241022", + usage: options.usage as LanguageModelV2Usage | undefined, + duration: options.duration, + }); + +const createStreamEndEvent = ( + summary: string, + metadata?: Record +): StreamEndEvent => ({ + type: "stream-end", + workspaceId: "test-workspace", + messageId: "msg-id", + parts: [{ type: "text", text: summary }], + metadata: { + model: "claude-3-5-sonnet-20241022", + usage: { inputTokens: 100, outputTokens: 50, totalTokens: undefined }, + duration: 1500, + ...metadata, + }, +}); + +const createStreamAbortEvent = ( + abandonPartial = false, + metadata?: Record +): StreamAbortEvent => ({ + type: "stream-abort", + workspaceId: "test-workspace", + messageId: "msg-id", + abandonPartial, + metadata: { + usage: { inputTokens: 100, outputTokens: 25, totalTokens: undefined }, + duration: 800, + ...metadata, + }, +}); + +// DRY helper to set up successful compaction scenario +const setupSuccessfulCompaction = ( + mockHistoryService: ReturnType, + messages: MuxMessage[] = [createCompactionRequest()], + clearedSequences?: number[] +) => { + mockHistoryService.mockGetHistory(Ok(messages)); + mockHistoryService.mockClearHistory(Ok(clearedSequences ?? 
messages.map((_, i) => i))); + mockHistoryService.mockAppendToHistory(Ok(undefined)); +}; + +describe("CompactionHandler", () => { + let handler: CompactionHandler; + let mockHistoryService: ReturnType; + let mockEmitter: EventEmitter; + let emittedEvents: EmittedEvent[]; + const workspaceId = "test-workspace"; + + beforeEach(() => { + const { emitter, events } = createMockEmitter(); + mockEmitter = emitter; + emittedEvents = events; + + mockHistoryService = createMockHistoryService(); + + handler = new CompactionHandler({ + workspaceId, + historyService: mockHistoryService as unknown as HistoryService, + emitter: mockEmitter, + }); + }); + + describe("handleAbort() - Ctrl+C (cancel) Flow", () => { + it("should return false when no compaction request found in history", async () => { + const normalUserMsg = createMuxMessage("msg1", "user", "Hello", { + historySequence: 0, + muxMetadata: { type: "normal" }, + }); + mockHistoryService.mockGetHistory(Ok([normalUserMsg])); + + const event = createStreamAbortEvent(false); + const result = await handler.handleAbort(event); + + expect(result).toBe(false); + expect(emittedEvents).toHaveLength(0); + }); + + it("should return false when abandonPartial=true (Ctrl+C cancel)", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Partial summary..."); + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + + const event = createStreamAbortEvent(true); + const result = await handler.handleAbort(event); + + expect(result).toBe(false); + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(0); + expect(emittedEvents).toHaveLength(0); + }); + + it("should not perform compaction when cancelled", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Partial summary"); + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + + const event = createStreamAbortEvent(true); + await handler.handleAbort(event); + + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(0); + expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(0); + }); + + it("should not emit events when cancelled", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Partial"); + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + + const event = createStreamAbortEvent(true); + await handler.handleAbort(event); + + expect(emittedEvents).toHaveLength(0); + }); + }); + + describe("handleAbort() - Ctrl+A (accept early) Flow", () => { + it("should return false when last message is not assistant role", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + + const event = createStreamAbortEvent(false); + const result = await handler.handleAbort(event); + + expect(result).toBe(false); + }); + + it("should return true when successful", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Partial summary"); + setupSuccessfulCompaction(mockHistoryService, [compactionReq, assistantMsg]); + + const event = createStreamAbortEvent(false); + const result = await handler.handleAbort(event); + + expect(result).toBe(true); + }); + + it("should read partial summary from last assistant message in history", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Here is a 
partial summary"); + setupSuccessfulCompaction(mockHistoryService, [compactionReq, assistantMsg]); + + const event = createStreamAbortEvent(false); + await handler.handleAbort(event); + + expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toContain( + "Here is a partial summary" + ); + }); + + it("should append [truncated] sentinel to partial summary", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Partial text"); + setupSuccessfulCompaction(mockHistoryService, [compactionReq, assistantMsg]); + + const event = createStreamAbortEvent(false); + await handler.handleAbort(event); + + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toContain( + "[truncated]" + ); + }); + + it("should call clearHistory() and appendToHistory() with summary message", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Summary"); + setupSuccessfulCompaction(mockHistoryService, [compactionReq, assistantMsg]); + + const event = createStreamAbortEvent(false); + await handler.handleAbort(event); + + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.clearHistory.mock.calls[0][0]).toBe(workspaceId); + expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.appendToHistory.mock.calls[0][0]).toBe(workspaceId); + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect(appendedMsg.role).toBe("assistant"); + expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toContain( + "[truncated]" + ); + }); + + it("should emit delete event with cleared sequence numbers", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Summary"); + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + mockHistoryService.mockClearHistory(Ok([0, 1, 2])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamAbortEvent(false); + await handler.handleAbort(event); + + const deleteEvent = emittedEvents.find( + (_e) => (_e.data.message as { type?: string })?.type === "delete" + ); + expect(deleteEvent).toBeDefined(); + expect(deleteEvent?.data).toEqual({ + workspaceId, + message: { + type: "delete", + historySequences: [0, 1, 2], + }, + }); + }); + + it("should emit summary message as assistant message", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Summary text"); + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + mockHistoryService.mockClearHistory(Ok([0, 1])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamAbortEvent(false); + await handler.handleAbort(event); + + const summaryEvent = emittedEvents.find((_e) => { + const msg = _e.data.message as MuxMessage | undefined; + return msg?.role === "assistant" && msg?.parts !== undefined; + }); + expect(summaryEvent).toBeDefined(); + expect(summaryEvent?.data.workspaceId).toBe(workspaceId); + const summaryMsg = summaryEvent?.data.message as MuxMessage; + expect((summaryMsg.parts[0] as { type: "text"; text: string 
}).text).toContain("[truncated]"); + }); + + it("should emit original stream-abort event to frontend", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage("Summary"); + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + mockHistoryService.mockClearHistory(Ok([0, 1])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamAbortEvent(false, { duration: 999 }); + await handler.handleAbort(event); + + const abortEvent = emittedEvents.find((_e) => _e.data.message === event); + expect(abortEvent).toBeDefined(); + expect(abortEvent?.event).toBe("chat-event"); + expect(abortEvent?.data.workspaceId).toBe(workspaceId); + const abortMsg = abortEvent?.data.message as StreamAbortEvent; + expect(abortMsg.metadata?.duration).toBe(999); + }); + + it("should preserve metadata (model, usage, duration, systemMessageTokens)", async () => { + const compactionReq = createCompactionRequest(); + const usage = { inputTokens: 100, outputTokens: 25, totalTokens: 125 }; + const assistantMsg = createAssistantMessage("Summary", { + usage, + duration: 800, + model: "claude-3-opus-20240229", + }); + assistantMsg.metadata!.systemMessageTokens = 50; + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + mockHistoryService.mockClearHistory(Ok([0, 1])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamAbortEvent(false, { usage, duration: 800 }); + await handler.handleAbort(event); + + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect(appendedMsg.metadata?.model).toBe("claude-3-opus-20240229"); + expect(appendedMsg.metadata?.usage).toEqual(usage); + expect(appendedMsg.metadata?.duration).toBe(800); + expect(appendedMsg.metadata?.systemMessageTokens).toBe(50); + }); + + it("should handle empty partial text gracefully (just [truncated])", async () => { + const compactionReq = createCompactionRequest(); + const assistantMsg = createAssistantMessage(""); + mockHistoryService.mockGetHistory(Ok([compactionReq, assistantMsg])); + mockHistoryService.mockClearHistory(Ok([0, 1])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamAbortEvent(false); + await handler.handleAbort(event); + + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe("\n\n[truncated]"); + }); + }); + + describe("handleCompletion() - Normal Compaction Flow", () => { + it("should return false when no compaction request found", async () => { + const normalMsg = createMuxMessage("msg1", "user", "Hello", { + historySequence: 0, + muxMetadata: { type: "normal" }, + }); + mockHistoryService.mockGetHistory(Ok([normalMsg])); + + const event = createStreamEndEvent("Summary"); + const result = await handler.handleCompletion(event); + + expect(result).toBe(false); + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(0); + }); + + it("should return false when historyService fails", async () => { + mockHistoryService.mockGetHistory(Err("Database error")); + + const event = createStreamEndEvent("Summary"); + const result = await handler.handleCompletion(event); + + expect(result).toBe(false); + }); + + it("should return true when successful", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + 
mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Complete summary"); + const result = await handler.handleCompletion(event); + + expect(result).toBe(true); + }); + + it("should join multiple text parts from event.parts", async () => { + const compactionReq = createCompactionRequest(); + setupSuccessfulCompaction(mockHistoryService, [compactionReq]); + + // Create event with multiple text parts + const event: StreamEndEvent = { + type: "stream-end", + workspaceId: "test-workspace", + messageId: "msg-id", + parts: [ + { type: "text", text: "Part 1 " }, + { type: "text", text: "Part 2 " }, + { type: "text", text: "Part 3" }, + ], + metadata: { + model: "claude-3-5-sonnet-20241022", + usage: { inputTokens: 100, outputTokens: 50, totalTokens: undefined }, + duration: 1500, + }, + }; + await handler.handleCompletion(event); + + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( + "Part 1 Part 2 Part 3" + ); + }); + + it("should extract summary text from event.parts", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("This is the summary"); + await handler.handleCompletion(event); + + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( + "This is the summary" + ); + }); + + it("should call clearHistory() and appendToHistory()", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.clearHistory.mock.calls[0][0]).toBe(workspaceId); + expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.appendToHistory.mock.calls[0][0]).toBe(workspaceId); + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect(appendedMsg.role).toBe("assistant"); + expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe("Summary"); + }); + + it("should emit delete event for old messages", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0, 1, 2, 3])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + const deleteEvent = emittedEvents.find( + (_e) => (_e.data.message as { type?: string })?.type === "delete" + ); + expect(deleteEvent).toBeDefined(); + const delMsg = deleteEvent?.data.message as { type: "delete"; historySequences: number[] }; + expect(delMsg.historySequences).toEqual([0, 1, 2, 3]); + }); + + it("should emit summary message with complete metadata", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + 
mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const usage = { inputTokens: 200, outputTokens: 100, totalTokens: 300 }; + const event = createStreamEndEvent("Summary", { + model: "claude-3-5-sonnet-20241022", + usage, + duration: 2000, + providerMetadata: { foo: "bar" }, + systemMessageTokens: 100, + }); + await handler.handleCompletion(event); + + const summaryEvent = emittedEvents.find((_e) => { + const m = _e.data.message as MuxMessage | undefined; + return m?.role === "assistant" && m?.parts !== undefined; + }); + expect(summaryEvent).toBeDefined(); + const sevt = summaryEvent?.data.message as MuxMessage; + expect(sevt.metadata).toMatchObject({ + model: "claude-3-5-sonnet-20241022", + usage, + duration: 2000, + providerMetadata: { foo: "bar" }, + systemMessageTokens: 100, + compacted: true, + }); + }); + + it("should emit stream-end event to frontend", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary", { duration: 1234 }); + await handler.handleCompletion(event); + + const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event); + expect(streamEndEvent).toBeDefined(); + expect(streamEndEvent?.data.workspaceId).toBe(workspaceId); + const streamMsg = streamEndEvent?.data.message as StreamEndEvent; + expect(streamMsg.metadata.duration).toBe(1234); + }); + + it("should calculate historicalUsage from all messages using collectUsageHistory and sumUsageHistory", async () => { + const compactionReq = createCompactionRequest(); + const msg1 = createAssistantMessage("Response 1", { + historySequence: 1, + usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, + }); + const msg2 = createAssistantMessage("Response 2", { + historySequence: 2, + usage: { inputTokens: 100, outputTokens: 50, totalTokens: 150 }, + }); + mockHistoryService.mockGetHistory(Ok([compactionReq, msg1, msg2])); + mockHistoryService.mockClearHistory(Ok([0, 1, 2])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect(appendedMsg.metadata?.historicalUsage).toBeDefined(); + // historicalUsage is ChatUsageDisplay with input/output/cached/reasoning components + expect(appendedMsg.metadata?.historicalUsage).toHaveProperty("input"); + expect(appendedMsg.metadata?.historicalUsage).toHaveProperty("output"); + }); + + it("should set compacted: true in summary metadata", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + expect(appendedMsg.metadata?.compacted).toBe(true); + }); + }); + + describe("handleCompletion() - Deduplication", () => { + it("should track processed compaction-request IDs", async () => { + const compactionReq = createCompactionRequest("req-unique"); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + 
mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + }); + + it("should return true without re-processing when same request ID seen twice", async () => { + const compactionReq = createCompactionRequest("req-dupe"); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + const result1 = await handler.handleCompletion(event); + const result2 = await handler.handleCompletion(event); + + expect(result1).toBe(true); + expect(result2).toBe(true); + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + }); + + it("should not emit duplicate events", async () => { + const compactionReq = createCompactionRequest("req-dupe-2"); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + const eventCountAfterFirst = emittedEvents.length; + + await handler.handleCompletion(event); + const eventCountAfterSecond = emittedEvents.length; + + expect(eventCountAfterSecond).toBe(eventCountAfterFirst); + }); + + it("should not clear history twice", async () => { + const compactionReq = createCompactionRequest("req-dupe-3"); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + await handler.handleCompletion(event); + + expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); + }); + }); + + describe("Error Handling", () => { + it("should return false when clearHistory() fails", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Err("Clear failed")); + + const event = createStreamEndEvent("Summary"); + const result = await handler.handleCompletion(event); + + expect(result).toBe(false); + expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(0); + }); + + it("should return false when appendToHistory() fails", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Err("Append failed")); + + const event = createStreamEndEvent("Summary"); + const result = await handler.handleCompletion(event); + + expect(result).toBe(false); + }); + + it("should log errors but not throw", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Err("Database corruption")); + + const event = createStreamEndEvent("Summary"); + + // Should not throw + const result = await handler.handleCompletion(event); + expect(result).toBe(false); + }); + + it("should not emit events when compaction fails mid-process", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + 
mockHistoryService.mockClearHistory(Err("Clear failed")); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + expect(emittedEvents).toHaveLength(0); + }); + }); + + describe("Event Emission", () => { + it("should include workspaceId in all chat-event emissions", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + const chatEvents = emittedEvents.filter((e) => e.event === "chat-event"); + expect(chatEvents.length).toBeGreaterThan(0); + chatEvents.forEach((e) => { + expect(e.data.workspaceId).toBe(workspaceId); + }); + }); + + it("should emit DeleteMessage with correct type and historySequences array", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([5, 10, 15])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + const deleteEvent = emittedEvents.find( + (_e) => (_e.data.message as { type?: string })?.type === "delete" + ); + expect(deleteEvent?.data.message).toEqual({ + type: "delete", + historySequences: [5, 10, 15], + }); + }); + + it("should emit summary message with proper MuxMessage structure", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary text"); + await handler.handleCompletion(event); + + const summaryEvent = emittedEvents.find((_e) => { + const m = _e.data.message as MuxMessage | undefined; + return m?.role === "assistant" && m?.parts !== undefined; + }); + expect(summaryEvent).toBeDefined(); + const summaryMsg = summaryEvent?.data.message as MuxMessage; + expect(summaryMsg).toMatchObject({ + id: expect.stringContaining("summary-") as string, + role: "assistant", + parts: [{ type: "text", text: "Summary text" }], + metadata: expect.objectContaining({ + compacted: true, + muxMetadata: { type: "normal" }, + }) as MuxMessage["metadata"], + }); + }); + + it("should forward stream events (stream-end, stream-abort) correctly", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary", { customField: "test" }); + await handler.handleCompletion(event); + + const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event); + expect(streamEndEvent).toBeDefined(); + const streamMsg = streamEndEvent?.data.message as StreamEndEvent; + expect((streamMsg.metadata as Record).customField).toBe("test"); + }); + }); +}); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts new file mode 100644 index 000000000..43e13961f --- /dev/null +++ b/src/node/services/compactionHandler.ts @@ -0,0 +1,227 @@ +import type { EventEmitter } from "events"; +import type { HistoryService } from "./historyService"; +import type { StreamEndEvent, StreamAbortEvent } from "@/common/types/stream"; +import 
type { WorkspaceChatMessage, DeleteMessage } from "@/common/types/ipc";
+import type { Result } from "@/common/types/result";
+import { Ok, Err } from "@/common/types/result";
+import type { LanguageModelV2Usage } from "@ai-sdk/provider";
+import { collectUsageHistory } from "@/common/utils/tokens/displayUsage";
+import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator";
+import { createMuxMessage, type MuxMessage } from "@/common/types/message";
+
+interface CompactionHandlerOptions {
+  workspaceId: string;
+  historyService: HistoryService;
+  emitter: EventEmitter;
+}
+
+/**
+ * Handles history compaction for agent sessions
+ *
+ * Responsible for:
+ * - Detecting compaction requests in stream events
+ * - Handling Ctrl+C (cancel) and Ctrl+A (accept early) flows
+ * - Replacing chat history with compacted summaries
+ * - Preserving cumulative usage across compactions
+ */
+export class CompactionHandler {
+  private readonly workspaceId: string;
+  private readonly historyService: HistoryService;
+  private readonly emitter: EventEmitter;
+  private readonly processedCompactionRequestIds: Set<string> = new Set();
+
+  constructor(options: CompactionHandlerOptions) {
+    this.workspaceId = options.workspaceId;
+    this.historyService = options.historyService;
+    this.emitter = options.emitter;
+  }
+
+  /**
+   * Handle compaction stream abort (Ctrl+C cancel or Ctrl+A accept early)
+   *
+   * Two flows:
+   * - Ctrl+C: abandonPartial=true → skip compaction
+   * - Ctrl+A: abandonPartial=false/undefined → perform compaction with [truncated]
+   */
+  async handleAbort(event: StreamAbortEvent): Promise<boolean> {
+    // Check if the last user message is a compaction-request
+    const historyResult = await this.historyService.getHistory(this.workspaceId);
+    if (!historyResult.success) {
+      return false;
+    }
+
+    const messages = historyResult.data;
+    const lastUserMsg = [...messages].reverse().find((m) => m.role === "user");
+    const isCompaction = lastUserMsg?.metadata?.muxMetadata?.type === "compaction-request";
+
+    if (!isCompaction || !lastUserMsg) {
+      return false;
+    }
+
+    // Ctrl+C flow: abandonPartial=true means user cancelled, skip compaction
+    if (event.abandonPartial === true) {
+      return false;
+    }
+
+    // Ctrl+A flow: Accept early with [truncated] sentinel
+    // Get the truncated message from historyResult.data
+    const lastMessage = messages[messages.length - 1];
+    if (!lastMessage || lastMessage.role !== "assistant") {
+      console.warn("[CompactionHandler] Compaction aborted but last message is not assistant");
+      return false;
+    }
+
+    const partialSummary = lastMessage.parts
+      .filter((part): part is { type: "text"; text: string } => part.type === "text")
+      .map((part) => part.text)
+      .join("");
+
+    // Append [truncated] sentinel
+    const truncatedSummary = partialSummary.trim() + "\n\n[truncated]";
+
+    // Perform compaction with truncated summary
+    const result = await this.performCompaction(truncatedSummary, messages, {
+      model: lastMessage.metadata?.model ?? "unknown",
+      usage: event.metadata?.usage,
+      duration: event.metadata?.duration,
+      providerMetadata: lastMessage.metadata?.providerMetadata,
+      systemMessageTokens: lastMessage.metadata?.systemMessageTokens,
+    });
+    if (!result.success) {
+      console.error("[CompactionHandler] Early compaction failed:", result.error);
+      return false;
+    }
+
+    this.emitChatEvent(event);
+    return true;
+  }
+
+  /**
+   * Handle compaction stream completion
+   *
+   * Detects when a compaction stream finishes, extracts the summary,
+   * and performs history replacement atomically.
+   */
+  async handleCompletion(event: StreamEndEvent): Promise<boolean> {
+    // Check if the last user message is a compaction-request
+    const historyResult = await this.historyService.getHistory(this.workspaceId);
+    if (!historyResult.success) {
+      return false;
+    }
+
+    const messages = historyResult.data;
+    const lastUserMsg = [...messages].reverse().find((m) => m.role === "user");
+    const isCompaction = lastUserMsg?.metadata?.muxMetadata?.type === "compaction-request";
+
+    if (!isCompaction || !lastUserMsg) {
+      return false;
+    }
+
+    // Dedupe: If we've already processed this compaction-request, skip
+    if (this.processedCompactionRequestIds.has(lastUserMsg.id)) {
+      return true;
+    }
+
+    const summary = event.parts
+      .filter((part): part is { type: "text"; text: string } => part.type === "text")
+      .map((part) => part.text)
+      .join("");
+
+    // Mark as processed before performing compaction
+    this.processedCompactionRequestIds.add(lastUserMsg.id);
+
+    const result = await this.performCompaction(summary, messages, event.metadata);
+    if (!result.success) {
+      console.error("[CompactionHandler] Compaction failed:", result.error);
+      return false;
+    }
+
+    // Emit stream-end to frontend so UI knows compaction is complete
+    this.emitChatEvent(event);
+    return true;
+  }
+
+  /**
+   * Perform history compaction by replacing all messages with a summary
+   *
+   * Steps:
+   * 1. Calculate cumulative usage from all messages (for historicalUsage field)
+   * 2. Clear entire history and get deleted sequence numbers
+   * 3. Append summary message with metadata
+   * 4. Emit delete event for old messages
+   * 5. Emit summary message to frontend
+   */
+  private async performCompaction(
+    summary: string,
+    messages: MuxMessage[],
+    metadata: {
+      model: string;
+      usage?: LanguageModelV2Usage;
+      duration?: number;
+      providerMetadata?: Record<string, unknown>;
+      systemMessageTokens?: number;
+    }
+  ): Promise<Result<void>> {
+    const usageHistory = collectUsageHistory(messages, undefined);
+
+    const historicalUsage = usageHistory.length > 0 ? sumUsageHistory(usageHistory) : undefined;
+
+    // Clear entire history and get deleted sequences
+    const clearResult = await this.historyService.clearHistory(this.workspaceId);
+    if (!clearResult.success) {
+      return Err(`Failed to clear history: ${clearResult.error}`);
+    }
+    const deletedSequences = clearResult.data;
+
+    // Create summary message with metadata
+    const summaryMessage = createMuxMessage(
+      `summary-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`,
+      "assistant",
+      summary,
+      {
+        timestamp: Date.now(),
+        compacted: true,
+        model: metadata.model,
+        usage: metadata.usage,
+        historicalUsage,
+        providerMetadata: metadata.providerMetadata,
+        duration: metadata.duration,
+        systemMessageTokens: metadata.systemMessageTokens,
+        muxMetadata: { type: "normal" },
+      }
+    );
+
+    // Append summary to history
+    const appendResult = await this.historyService.appendToHistory(
+      this.workspaceId,
+      summaryMessage
+    );
+    if (!appendResult.success) {
+      return Err(`Failed to append summary: ${appendResult.error}`);
+    }
+
+    // Emit delete event for old messages
+    if (deletedSequences.length > 0) {
+      const deleteMessage: DeleteMessage = {
+        type: "delete",
+        historySequences: deletedSequences,
+      };
+      this.emitChatEvent(deleteMessage);
+    }
+
+    // Emit summary message to frontend
+    this.emitChatEvent(summaryMessage);
+
+    return Ok(undefined);
+  }
+
+  /**
+   * Emit chat event through the session's emitter
+   */
+  private emitChatEvent(message: WorkspaceChatMessage): void {
+    this.emitter.emit("chat-event", {
+      workspaceId: this.workspaceId,
+      message,
+    });
+  }
+}
diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts
index 312c89b2c..13be7fd38 100644
--- a/src/node/services/historyService.ts
+++ b/src/node/services/historyService.ts
@@ -418,12 +418,12 @@ export class HistoryService {
     });
   }
 
-  async clearHistory(workspaceId: string): Promise<Result<void>> {
+  async clearHistory(workspaceId: string): Promise<Result<number[]>> {
     const result = await this.truncateHistory(workspaceId, 1.0);
     if (!result.success) {
       return Err(result.error);
     }
-    return Ok(undefined);
+    return Ok(result.data);
   }
 
   /**
diff --git a/src/node/services/ipcMain.ts b/src/node/services/ipcMain.ts
index 66cd80d71..519a55ee5 100644
--- a/src/node/services/ipcMain.ts
+++ b/src/node/services/ipcMain.ts
@@ -1098,7 +1098,7 @@ export class IpcMain {
       log.debug("interruptStream handler: Received", { workspaceId, options });
       try {
         const session = this.getOrCreateSession(workspaceId);
-        const stopResult = await session.interruptStream();
+        const stopResult = await session.interruptStream(options?.abandonPartial);
         if (!stopResult.success) {
           log.error("Failed to stop stream:", stopResult.error);
           return { success: false, error: stopResult.error };
@@ -1182,19 +1182,12 @@ export class IpcMain {
       }
 
       try {
-        // Get all existing messages to collect their historySequence numbers
-        const historyResult = await this.historyService.getHistory(workspaceId);
-        const deletedSequences = historyResult.success
-          ? historyResult.data
-              .map((msg) => msg.metadata?.historySequence ?? -1)
-              .filter((s) => s >= 0)
-          : [];
-
         // Clear entire history
         const clearResult = await this.historyService.clearHistory(workspaceId);
         if (!clearResult.success) {
           return Err(`Failed to clear history: ${clearResult.error}`);
         }
+        const deletedSequences = clearResult.data;
 
         // Append the summary message to history (gets historySequence assigned by backend)
         // Frontend provides the message with all metadata (compacted, timestamp, etc.)
diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts
index 0faebea56..0c3475340 100644
--- a/src/node/services/streamManager.ts
+++ b/src/node/services/streamManager.ts
@@ -225,7 +225,7 @@ export class StreamManager extends EventEmitter {
     const existing = this.workspaceStreams.get(workspaceId);
 
     if (existing && existing.state !== StreamState.IDLE) {
-      await this.cancelStreamSafely(workspaceId, existing);
+      await this.cancelStreamSafely(workspaceId, existing, undefined);
     }
 
     // Generate unique token for this stream (8 hex chars for context efficiency)
@@ -408,7 +408,8 @@ export class StreamManager extends EventEmitter {
 
   private async cancelStreamSafely(
     workspaceId: WorkspaceId,
-    streamInfo: WorkspaceStreamInfo
+    streamInfo: WorkspaceStreamInfo,
+    abandonPartial?: boolean
   ): Promise<void> {
     try {
       streamInfo.state = StreamState.STOPPING;
@@ -432,6 +433,7 @@ export class StreamManager extends EventEmitter {
         workspaceId: workspaceId as string,
         messageId: streamInfo.messageId,
         metadata: { usage, duration },
+        abandonPartial,
       });
 
       // Clean up immediately
@@ -1318,13 +1320,13 @@ export class StreamManager extends EventEmitter {
   /**
   * Stops an active stream for a workspace
   */
-  async stopStream(workspaceId: string): Promise<Result<void>> {
+  async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
     const typedWorkspaceId = workspaceId as WorkspaceId;
 
     try {
       const streamInfo = this.workspaceStreams.get(typedWorkspaceId);
       if (streamInfo) {
-        await this.cancelStreamSafely(typedWorkspaceId, streamInfo);
+        await this.cancelStreamSafely(typedWorkspaceId, streamInfo, abandonPartial);
       }
       return Ok(undefined);
     } catch (error) {
diff --git a/tests/ipcMain/sendMessage.test.ts b/tests/ipcMain/sendMessage.test.ts
index 61d4113d6..040505ca8 100644
--- a/tests/ipcMain/sendMessage.test.ts
+++ b/tests/ipcMain/sendMessage.test.ts
@@ -1380,7 +1380,7 @@ These are general instructions that apply to all modes.
 
     // Wait for first stream to complete
     const collector1 = createEventCollector(env.sentEvents, workspaceId);
-    await collector1.waitForEvent("stream-end", 30000);
+    await collector1.waitForEvent("stream-end", 60000);
     assertStreamSuccess(collector1);
 
     // 2) Validate UI/history has a dynamic-tool part with a real diff string