Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scripts/bump_tag.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ if [[ -z "$CURRENT_VERSION" || "$CURRENT_VERSION" == "null" ]]; then
fi

# Parse semver components
IFS='.' read -r MAJOR MINOR_V PATCH <<< "$CURRENT_VERSION"
IFS='.' read -r MAJOR MINOR_V PATCH <<<"$CURRENT_VERSION"

# Calculate new version
if [[ "$MINOR" == "true" ]]; then
Expand All @@ -30,7 +30,7 @@ fi
echo "Bumping version: $CURRENT_VERSION -> $NEW_VERSION"

# Update package.json
jq --arg v "$NEW_VERSION" '.version = $v' package.json > package.json.tmp
jq --arg v "$NEW_VERSION" '.version = $v' package.json >package.json.tmp
mv package.json.tmp package.json

# Commit and tag
Expand Down
7 changes: 6 additions & 1 deletion src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,13 @@ export class WorkspaceStore {
const rawContextUsage = activeStreamId
? aggregator.getActiveStreamUsage(activeStreamId)
: undefined;
const rawStepProviderMetadata = activeStreamId
? aggregator.getActiveStreamStepProviderMetadata(activeStreamId)
: undefined;
const liveUsage =
rawContextUsage && model ? createDisplayUsage(rawContextUsage, model) : undefined;
rawContextUsage && model
? createDisplayUsage(rawContextUsage, model, rawStepProviderMetadata)
: undefined;

// Live cost usage (cumulative across all steps, with accumulated cache creation tokens)
const rawCumulativeUsage = activeStreamId
Expand Down
37 changes: 37 additions & 0 deletions src/browser/utils/messages/StreamingMessageAggregator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,40 @@ describe("StreamingMessageAggregator", () => {
expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeUndefined();
});

test("stores and retrieves step providerMetadata for cache creation display", () => {
  // Step-level payload carried by the usage-delta event for this message.
  const stepUsage = { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 };
  const stepMetadata = { anthropic: { cacheCreationInputTokens: 800 } };
  const agg = new StreamingMessageAggregator(TEST_CREATED_AT);

  agg.handleUsageDelta({
    type: "usage-delta",
    workspaceId: "ws-1",
    messageId: "msg-1",
    usage: stepUsage,
    cumulativeUsage: { ...stepUsage },
    providerMetadata: stepMetadata,
  });

  // The getter must hand back exactly what the event supplied for the step.
  const stored = agg.getActiveStreamStepProviderMetadata("msg-1");
  expect(stored).toEqual({ anthropic: { cacheCreationInputTokens: 800 } });
});

test("step providerMetadata is undefined when not provided", () => {
  const stepUsage = { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 };
  const agg = new StreamingMessageAggregator(TEST_CREATED_AT);

  // The event deliberately carries no providerMetadata field.
  agg.handleUsageDelta({
    type: "usage-delta",
    workspaceId: "ws-1",
    messageId: "msg-1",
    usage: stepUsage,
    cumulativeUsage: { ...stepUsage },
  });

  const stored = agg.getActiveStreamStepProviderMetadata("msg-1");
  expect(stored).toBeUndefined();
});

test("clearTokenState clears all usage tracking (step, cumulative, metadata)", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

Expand All @@ -526,18 +560,21 @@ describe("StreamingMessageAggregator", () => {
messageId: "msg-1",
usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
providerMetadata: { anthropic: { cacheCreationInputTokens: 300 } },
cumulativeProviderMetadata: { anthropic: { cacheCreationInputTokens: 500 } },
});

// All should be defined
expect(aggregator.getActiveStreamUsage("msg-1")).toBeDefined();
expect(aggregator.getActiveStreamStepProviderMetadata("msg-1")).toBeDefined();
expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toBeDefined();
expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeDefined();

aggregator.clearTokenState("msg-1");

// All should be cleared
expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined();
expect(aggregator.getActiveStreamStepProviderMetadata("msg-1")).toBeUndefined();
expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toBeUndefined();
expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeUndefined();
});
Expand Down
53 changes: 29 additions & 24 deletions src/browser/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ export class StreamingMessageAggregator {
// Delta history for token counting and TPS calculation
private deltaHistory = new Map<string, DeltaRecordStorage>();

// Active stream step usage (updated on each stream-step event)
// Tracks last step's usage for context window display
private activeStreamStepUsage = new Map<string, LanguageModelV2Usage>();
// Tracks cumulative usage across all steps for live cost display
private activeStreamCumulativeUsage = new Map<string, LanguageModelV2Usage>();
// Tracks cumulative provider metadata for live cost display (with cache creation tokens)
private activeStreamCumulativeProviderMetadata = new Map<string, Record<string, unknown>>();
// Active stream usage tracking, keyed by messageId.
// Each usage-delta event overwrites the whole entry for its messageId, and
// clearTokenState removes it. One entry holds both views of the same stream:
// step-level (context window) and cumulative (cost).
private activeStreamUsage = new Map<
string,
{
// Step-level: this step only (for context window display); providerMetadata is optional
step: { usage: LanguageModelV2Usage; providerMetadata?: Record<string, unknown> };
// Cumulative: sum across all steps (for live cost display); providerMetadata is optional
cumulative: { usage: LanguageModelV2Usage; providerMetadata?: Record<string, unknown> };
}
>();

// Current TODO list (updated when todo_write succeeds, cleared on stream end)
// Stream-scoped: automatically reset when stream completes
Expand Down Expand Up @@ -1060,40 +1064,41 @@ export class StreamingMessageAggregator {
*/
clearTokenState(messageId: string): void {
this.deltaHistory.delete(messageId);
this.activeStreamStepUsage.delete(messageId);
this.activeStreamCumulativeUsage.delete(messageId);
this.activeStreamCumulativeProviderMetadata.delete(messageId);
this.activeStreamUsage.delete(messageId);
}

/**
* Handle usage-delta event: update usage tracking for active stream
*/
handleUsageDelta(data: UsageDeltaEvent): void {
// Store last step's usage for context window display
this.activeStreamStepUsage.set(data.messageId, data.usage);
// Store cumulative usage for cost display
this.activeStreamCumulativeUsage.set(data.messageId, data.cumulativeUsage);
// Store cumulative provider metadata for live cost display (with cache creation tokens)
if (data.cumulativeProviderMetadata) {
this.activeStreamCumulativeProviderMetadata.set(
data.messageId,
data.cumulativeProviderMetadata
);
}
this.activeStreamUsage.set(data.messageId, {
step: { usage: data.usage, providerMetadata: data.providerMetadata },
cumulative: {
usage: data.cumulativeUsage,
providerMetadata: data.cumulativeProviderMetadata,
},
});
}

/**
* Get active stream usage for context window display (last step's inputTokens = context size)
*/
getActiveStreamUsage(messageId: string): LanguageModelV2Usage | undefined {
return this.activeStreamStepUsage.get(messageId);
return this.activeStreamUsage.get(messageId)?.step.usage;
}

/**
 * Look up the step-level provider metadata tracked for a stream
 * (used for the context window cache display).
 *
 * @param messageId - Message identifier the usage entry is stored under.
 * @returns The step's provider metadata, or `undefined` when no usage entry
 *   exists for `messageId` or the entry's step has no metadata.
 */
getActiveStreamStepProviderMetadata(messageId: string): Record<string, unknown> | undefined {
  const tracked = this.activeStreamUsage.get(messageId);
  return tracked?.step.providerMetadata;
}

/**
* Get active stream cumulative usage for cost display (sum of all steps)
*/
getActiveStreamCumulativeUsage(messageId: string): LanguageModelV2Usage | undefined {
return this.activeStreamCumulativeUsage.get(messageId);
return this.activeStreamUsage.get(messageId)?.cumulative.usage;
}

/**
Expand All @@ -1102,6 +1107,6 @@ export class StreamingMessageAggregator {
getActiveStreamCumulativeProviderMetadata(
messageId: string
): Record<string, unknown> | undefined {
return this.activeStreamCumulativeProviderMetadata.get(messageId);
return this.activeStreamUsage.get(messageId)?.cumulative.providerMetadata;
}
}
8 changes: 5 additions & 3 deletions src/common/types/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,13 @@ export interface UsageDeltaEvent {
type: "usage-delta";
workspaceId: string;
messageId: string;
// This step's usage (inputTokens = current context size, for context window display)

// Step-level: this step only (for context window display)
usage: LanguageModelV2Usage;
// Cumulative usage across all steps so far (for live cost display)
providerMetadata?: Record<string, unknown>;

// Cumulative: sum across all steps (for live cost display)
cumulativeUsage: LanguageModelV2Usage;
// Cumulative provider metadata across all steps (for live cost display with cache tokens)
cumulativeProviderMetadata?: Record<string, unknown>;
}

Expand Down
64 changes: 64 additions & 0 deletions src/common/utils/tokens/displayUsage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,68 @@ describe("createDisplayUsage", () => {
expect(result!.input.tokens).toBe(1000);
expect(result!.cached.tokens).toBe(0);
});

describe("Anthropic cache creation tokens from providerMetadata", () => {
  // LanguageModelV2Usage carries no cache-creation count. For Anthropic it is
  // only available in providerMetadata.anthropic.cacheCreationInputTokens,
  // which is critical for the liveUsage display during streaming.

  const DIRECT_ANTHROPIC_MODEL = "anthropic:claude-sonnet-4-20250514";

  // Build a usage record whose total is the sum of input and output tokens.
  const usageOf = (inputTokens: number, outputTokens: number): LanguageModelV2Usage => ({
    inputTokens,
    outputTokens,
    totalTokens: inputTokens + outputTokens,
  });

  test("extracts cacheCreationInputTokens from providerMetadata", () => {
    const display = createDisplayUsage(usageOf(1000, 50), DIRECT_ANTHROPIC_MODEL, {
      anthropic: { cacheCreationInputTokens: 800 },
    });

    expect(display).toBeDefined();
    expect(display!.cacheCreate.tokens).toBe(800);
  });

  test("cacheCreate is 0 when providerMetadata is undefined", () => {
    // Third argument omitted entirely.
    const display = createDisplayUsage(usageOf(1000, 50), DIRECT_ANTHROPIC_MODEL);

    expect(display).toBeDefined();
    expect(display!.cacheCreate.tokens).toBe(0);
  });

  test("cacheCreate is 0 when anthropic metadata lacks cacheCreationInputTokens", () => {
    // Anthropic metadata is present but has no cache-creation field.
    const display = createDisplayUsage(usageOf(1000, 50), DIRECT_ANTHROPIC_MODEL, {
      anthropic: { someOtherField: 123 },
    });

    expect(display).toBeDefined();
    expect(display!.cacheCreate.tokens).toBe(0);
  });

  test("handles gateway Anthropic model with cache creation", () => {
    // Same metadata shape, but the model is addressed through the gateway prefix.
    const display = createDisplayUsage(usageOf(2000, 100), "mux-gateway:anthropic/claude-sonnet-4-5", {
      anthropic: { cacheCreationInputTokens: 1500 },
    });

    expect(display).toBeDefined();
    expect(display!.cacheCreate.tokens).toBe(1500);
  });
});
});
9 changes: 6 additions & 3 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,12 @@ export class StreamManager extends EventEmitter {
type: "usage-delta",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
usage: finishStepPart.usage, // For context window display
cumulativeUsage: streamInfo.cumulativeUsage, // For live cost display
cumulativeProviderMetadata: streamInfo.cumulativeProviderMetadata, // For live cache costs
// Step-level (for context window display)
usage: finishStepPart.usage,
providerMetadata: finishStepPart.providerMetadata,
// Cumulative (for live cost display)
cumulativeUsage: streamInfo.cumulativeUsage,
cumulativeProviderMetadata: streamInfo.cumulativeProviderMetadata,
};
this.emit("usage-delta", usageEvent);
break;
Expand Down