Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/blink/src/cli/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ export default async function run(
const manager = new ChatManager({
chatId: opts?.chat,
chatsDirectory: chatsDir,
onError: (error) => {
console.error("Error:", error);
},
});
manager.setAgent(agent.client);

Expand Down Expand Up @@ -95,9 +98,6 @@ export default async function run(

// Print final state
const finalState = manager.getState();
if (finalState.error) {
console.error("Error:", finalState.error);
}
console.log("Final state:", finalState.messages.pop());
} finally {
manager.dispose();
Expand Down
145 changes: 30 additions & 115 deletions packages/blink/src/local/chat-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ test("initializes with empty state for non-existent chat", async () => {
expect(state.messages).toEqual([]);
expect(state.status).toBe("idle");
expect(state.streamingMessage).toBeUndefined();
expect(state.error).toBeUndefined();
expect(state.queuedMessages).toEqual([]);

manager.dispose();
Expand Down Expand Up @@ -873,126 +872,42 @@ test("watcher onChange does not cause status to flicker during lock release", as
manager.dispose();
});

test("error clearing: errors clear when sending new message", async () => {
const chatsDir = await mkdtemp(join(tmpdir(), "chat-test-"));

try {
// Create a manager with an agent that will fail
const failingAgent: any = {
chat: async () => {
throw new Error("Test error");
},
};

const manager = new ChatManager({
chatId: crypto.randomUUID(),
chatsDirectory: chatsDir,
});

// Track state changes before sending message
let errorSeen = false;
let errorCleared = false;
const unsubscribe = manager.subscribe((state) => {
if (state.error) {
errorSeen = true;
}
if (errorSeen && !state.error) {
errorCleared = true;
}
});

manager.setAgent(failingAgent);

// Send a message that will fail
const message: StoredMessage = {
id: crypto.randomUUID(),
created_at: new Date().toISOString(),
role: "user",
parts: [{ type: "text", text: "Hello" }],
mode: "run",
metadata: undefined,
};

await manager.sendMessages([message]);

// Wait for error state
await new Promise((resolve) => setTimeout(resolve, 100));

// Should have seen an error
expect(errorSeen).toBe(true);
let state = manager.getState();
expect(state.status).toBe("error");

// Now set a working agent
const workingAgent = createMockAgent("Success!");
manager.setAgent(workingAgent);

// Send another message
const message2: StoredMessage = {
id: crypto.randomUUID(),
created_at: new Date().toISOString(),
role: "user",
parts: [{ type: "text", text: "Try again" }],
mode: "run",
metadata: undefined,
};

await manager.sendMessages([message2]);

// Wait for completion
await new Promise((resolve) => setTimeout(resolve, 300));
test("onError callback is called when no agent is available", async () => {
const chatId = crypto.randomUUID();

// Error should have been cleared at some point during the lifecycle
// This is the key behavior - errors should clear when sending new messages
expect(errorCleared).toBe(true);
// Track errors via onError callback
const errors: string[] = [];
const onError = mock((error: string) => {
errors.push(error);
});

// Final state should have no error (watcher may still show error status briefly)
state = manager.getState();
expect(state.error).toBeUndefined();
const manager = new ChatManager({
chatId,
chatsDirectory: tempDir,
onError,
});

unsubscribe();
manager.dispose();
} finally {
await rm(chatsDir, { recursive: true, force: true });
}
});
// Don't set an agent, so it should fail when we try to send a message

test("error clearing: persisted errors don't load from disk", async () => {
const chatsDir = await mkdtemp(join(tmpdir(), "chat-test-"));

try {
const chatId = crypto.randomUUID();

// Manually create a chat with an error in the store
const store = createDiskStore<StoredChat>(chatsDir, "id");
const locked = await store.lock(chatId);
try {
await locked.set({
id: chatId,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
messages: [],
error: "Old persisted error",
});
} finally {
await locked.release();
}
// Send a message without an agent
const message: StoredMessage = {
id: crypto.randomUUID(),
created_at: new Date().toISOString(),
role: "user",
parts: [{ type: "text", text: "Hello" }],
mode: "run",
metadata: undefined,
};

// Create a new manager - it should clear the persisted error
const manager = new ChatManager({
chatId,
chatsDirectory: chatsDir,
});
await manager.sendMessages([message]);

// Wait for initial load
await new Promise((resolve) => setTimeout(resolve, 100));
// Wait a bit for the error to be processed
await new Promise((resolve) => setTimeout(resolve, 100));

const state = manager.getState();
expect(state.error).toBeUndefined(); // Error should be cleared
expect(state.loading).toBe(false);
// Verify onError was called with the "no agent" error message
expect(onError).toHaveBeenCalled();
expect(errors.length).toBeGreaterThan(0);
expect(errors[0]).toContain("agent is not available");

manager.dispose();
} finally {
await rm(chatsDir, { recursive: true, force: true });
}
manager.dispose();
});
64 changes: 17 additions & 47 deletions packages/blink/src/local/chat-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ export interface ChatState {
readonly messages: StoredMessage[];
readonly status: ChatStatus;
readonly streamingMessage?: StoredMessage;
readonly error?: string;
readonly loading: boolean;
readonly queuedMessages: StoredMessage[];
}
Expand All @@ -43,6 +42,10 @@ export interface ChatManagerOptions {
* Return true to include the message, false to exclude it.
*/
readonly filterMessages?: (message: StoredMessage) => boolean;
/**
* Optional callback invoked when an error occurs during chat operations.
*/
readonly onError?: (error: string) => void;
}

type StateListener = (state: ChatState) => void;
Expand All @@ -57,6 +60,7 @@ export class ChatManager {
private chatStore: Store<StoredChat>;
private serializeMessage?: (message: UIMessage) => StoredMessage | undefined;
private filterMessages?: (message: StoredMessage) => boolean;
private onError?: (error: string) => void;

private chat: StoredChat;
private loading = false;
Expand All @@ -82,6 +86,7 @@ export class ChatManager {
this.chatStore = createDiskStore<StoredChat>(options.chatsDirectory, "id");
this.serializeMessage = options.serializeMessage;
this.filterMessages = options.filterMessages;
this.onError = options.onError;

// Start disk watcher
this.watcher = createDiskStoreWatcher<StoredChat>(options.chatsDirectory, {
Expand Down Expand Up @@ -122,21 +127,14 @@ export class ChatManager {

const diskValue = event.value;

let newStatus = event.value?.error ? "error" : "idle";
if (event.locked) {
newStatus = "streaming";
}
let newStatus: ChatStatus = event.locked ? "streaming" : "idle";
const shouldEmit =
this.chat.updated_at !== diskValue?.updated_at ||
this.status !== newStatus;

// Clear persisted errors - they're stale from disk
this.chat = {
...diskValue,
error: undefined,
};
this.chat = diskValue;
this.streamingMessage = undefined;
this.status = newStatus as ChatStatus;
this.status = newStatus;

if (shouldEmit) {
this.notifyListeners();
Expand All @@ -154,14 +152,11 @@ export class ChatManager {
if (!chat) {
return;
}
// Clear any persisted errors on load - they're stale
this.chat = {
...chat,
error: undefined,
};
this.chat = chat;
})
.catch((err) => {
this.chat.error = err instanceof Error ? err.message : String(err);
const errorMessage = err instanceof Error ? err.message : String(err);
this.onError?.(errorMessage);
})
.finally(() => {
this.loading = false;
Expand Down Expand Up @@ -190,7 +185,6 @@ export class ChatManager {
updated_at: this.chat?.updated_at,
status: this.status,
streamingMessage: this.streamingMessage,
error: this.chat?.error,
loading: this.loading,
queuedMessages: this.queue,
};
Expand Down Expand Up @@ -298,22 +292,6 @@ export class ChatManager {
* Send a message to the agent
*/
async sendMessages(messages: StoredMessage[]): Promise<void> {
// Clear any previous errors when sending a new message (persist to disk)
if (this.chat.error) {
const locked = await this.chatStore.lock(this.chatId);
try {
const current = await locked.get();
this.chat = {
...current,
error: undefined,
updated_at: new Date().toISOString(),
};
await locked.set(this.chat);
} finally {
await locked.release();
}
}

this.status = "idle";
this.notifyListeners();

Expand All @@ -331,8 +309,6 @@ export class ChatManager {
}

async start(): Promise<void> {
// Clear error when explicitly starting
this.chat.error = undefined;
this.status = "idle";
this.notifyListeners();
// Do not await this - it will block the server.
Expand All @@ -347,19 +323,16 @@ export class ChatManager {

private async processQueueOrRun(): Promise<void> {
if (!this.agent) {
// Set error state instead of throwing
this.chat.error =
const errorMessage =
"The agent is not available. Please wait for the build to succeed.";
this.status = "error";
this.onError?.(errorMessage);
this.queue = []; // Clear the queue
this.notifyListeners();
return;
}
if (this.isProcessingQueue) {
return;
}
this.isProcessingQueue = true;
this.chat.error = undefined;

let locked: LockedStoreEntry<StoredChat> | undefined;
try {
Expand Down Expand Up @@ -501,15 +474,12 @@ export class ChatManager {
}
}
} catch (err: any) {
this.chat.error = err instanceof Error ? err.message : String(err);
const errorMessage = err instanceof Error ? err.message : String(err);
this.onError?.(errorMessage);
} finally {
this.isProcessingQueue = false;
this.streamingMessage = undefined;
if (this.chat.error) {
this.status = "error";
} else {
this.status = "idle";
}
this.status = "idle";

if (locked) {
this.chat.updated_at = new Date().toISOString();
Expand Down
1 change: 0 additions & 1 deletion packages/blink/src/local/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ export interface StoredChat {
created_at: string;
updated_at: string;
messages: StoredMessage[];
error?: string;
}

export type StoredMessageMetadata = {
Expand Down
2 changes: 0 additions & 2 deletions packages/blink/src/react/use-chat.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const Harness: React.FC<HarnessProps> = ({ options, onUpdate }) => {
result.status,
result.messages.length,
result.streamingMessage,
result.error,
result.queuedMessages.length,
]);
return null;
Expand Down Expand Up @@ -106,7 +105,6 @@ test("initializes with empty state for non-existent chat", async () => {
expect(r.messages).toEqual([]);
expect(r.status).toBe("idle");
expect(r.streamingMessage).toBeUndefined();
expect(r.error).toBeUndefined();

app.unmount();
});
Expand Down
Loading