Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/blink/src/agent/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ export type CapabilitiesResponse = Awaited<ReturnType<Client["capabilities"]>>;
export class Client {
public readonly baseUrl: string;
private readonly client: ReturnType<typeof hc<typeof api>>;
public readonly agentLock: RWLock;

public constructor(options: ClientOptions) {
this.client = hc<typeof api>(options.baseUrl);
this.baseUrl = options.baseUrl;
this.agentLock = new RWLock();
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/blink/src/cli/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { resolveConfig } from "../build";
import { findNearestEntry } from "../build/util";
import { existsSync } from "node:fs";
import type { ID } from "../agent/types";
import { RWLock } from "../local/rw-lock";

export default async function run(
message: string[],
Expand Down Expand Up @@ -71,7 +72,7 @@ export default async function run(
console.error("Error:", error);
},
});
manager.setAgent(agent.client);
manager.setAgent({ client: agent.client, lock: new RWLock() });

try {
// Wait for completion by subscribing to state changes
Expand Down
174 changes: 90 additions & 84 deletions packages/blink/src/local/chat-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,99 +10,45 @@ import type { StoredChat, StoredMessage } from "./types";
import type { Client } from "../agent/client";

// Helper to create a mock agent
function createMockAgent(
responseText: string = "Assistant response"
): Client & { chatCalls: any[] } {
function createMockAgent(responseText: string = "Assistant response"): {
lock: RWLock;
client: Client;
chatCalls: any[];
} {
const chatCalls: any[] = [];
return {
agentLock: new RWLock(),
lock: new RWLock(),
chatCalls,
chat: async ({ messages, signal }: any) => {
chatCalls.push({ messages, signal });
client: {
chat: async ({ messages, signal }: any) => {
chatCalls.push({ messages, signal });

// Return a ReadableStream of UIMessageChunk objects
const stream = new ReadableStream<UIMessageChunk>({
async start(controller) {
if (signal?.aborted) {
controller.close();
return;
}

// Start the message
controller.enqueue({
type: "start",
messageId: "msg-1",
} as UIMessageChunk);

// Add text content
controller.enqueue({
type: "text-start",
id: "text-1",
} as UIMessageChunk);

// Send text
controller.enqueue({
type: "text-delta",
id: "text-1",
delta: responseText,
} as UIMessageChunk);

if (!signal?.aborted) {
controller.enqueue({
type: "text-end",
id: "text-1",
} as UIMessageChunk);

controller.enqueue({
type: "finish",
finishReason: "stop",
usage: { promptTokens: 10, completionTokens: 5 },
} as UIMessageChunk);
}
controller.close();
},
});

return stream;
},
} as any;
}

// Helper to create a slow-streaming agent (yields control between chunks)
function createSlowAgent(chunks: number = 5): Client {
return {
agentLock: new RWLock(),
chat: async ({ signal }: any) => {
const stream = new ReadableStream<UIMessageChunk>({
async start(controller) {
try {
// Return a ReadableStream of UIMessageChunk objects
const stream = new ReadableStream<UIMessageChunk>({
async start(controller) {
if (signal?.aborted) {
controller.close();
return;
}

// Start the message
controller.enqueue({
type: "start",
messageId: "msg-1",
} as UIMessageChunk);

// Add text content
controller.enqueue({
type: "text-start",
id: "text-1",
} as UIMessageChunk);

for (let i = 0; i < chunks; i++) {
if (signal?.aborted) {
throw new Error("AbortError");
}
controller.enqueue({
type: "text-delta",
id: "text-1",
delta: `chunk${i}`,
} as UIMessageChunk);
// Yield control to allow other operations
await new Promise((resolve) => setImmediate(resolve));
}
// Send text
controller.enqueue({
type: "text-delta",
id: "text-1",
delta: responseText,
} as UIMessageChunk);

if (!signal?.aborted) {
controller.enqueue({
Expand All @@ -117,18 +63,78 @@ function createSlowAgent(chunks: number = 5): Client {
} as UIMessageChunk);
}
controller.close();
} catch (err: any) {
if (err.message === "AbortError" || signal?.aborted) {
},
});

return stream;
},
} as any,
};
}

// Helper to create a slow-streaming agent (yields control between chunks)
function createSlowAgent(chunks: number = 5): { client: Client; lock: RWLock } {
return {
lock: new RWLock(),
client: {
chat: async ({ signal }: any) => {
const stream = new ReadableStream<UIMessageChunk>({
async start(controller) {
try {
if (signal?.aborted) {
controller.close();
return;
}

controller.enqueue({
type: "start",
messageId: "msg-1",
} as UIMessageChunk);

controller.enqueue({
type: "text-start",
id: "text-1",
} as UIMessageChunk);

for (let i = 0; i < chunks; i++) {
if (signal?.aborted) {
throw new Error("AbortError");
}
controller.enqueue({
type: "text-delta",
id: "text-1",
delta: `chunk${i}`,
} as UIMessageChunk);
// Yield control to allow other operations
await new Promise((resolve) => setImmediate(resolve));
}

if (!signal?.aborted) {
controller.enqueue({
type: "text-end",
id: "text-1",
} as UIMessageChunk);

controller.enqueue({
type: "finish",
finishReason: "stop",
usage: { promptTokens: 10, completionTokens: 5 },
} as UIMessageChunk);
}
controller.close();
} else {
controller.error(err);
} catch (err: any) {
if (err.message === "AbortError" || signal?.aborted) {
controller.close();
} else {
controller.error(err);
}
}
}
},
});
return stream;
},
} as any;
},
});
return stream;
},
} as any,
};
}

// Helper to create a stored message
Expand Down
9 changes: 5 additions & 4 deletions packages/blink/src/local/chat-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
import type { ID } from "../agent/types";
import { stripVTControlCharacters } from "node:util";
import { RWLock } from "./rw-lock";
import type { Agent } from "../react/use-agent";

export type ChatStatus = "idle" | "streaming" | "error";

Expand Down Expand Up @@ -60,7 +61,7 @@ type StateListener = (state: ChatState) => void;
*/
export class ChatManager {
private chatId: ID;
private agent: Client | undefined;
private agent: Agent | undefined;
private chatStore: Store<StoredChat>;
private serializeMessage?: (message: UIMessage) => StoredMessage | undefined;
private filterMessages?: (message: StoredMessage) => boolean;
Expand Down Expand Up @@ -171,7 +172,7 @@ export class ChatManager {
/**
* Update the agent instance to be used for chats
*/
setAgent(agent: Client | undefined): void {
setAgent(agent: Agent | undefined): void {
this.agent = agent;
}

Expand Down Expand Up @@ -428,11 +429,11 @@ export class ChatManager {
});

// Acquire read lock on agent to prevent it from being disposed while streaming.
using _agentLock = await this.agent.agentLock.read();
using _agentLock = await this.agent.lock.read();
// Stream agent response
const streamStartTime = performance.now();
const stream = await runAgent({
agent: this.agent,
agent: this.agent.client,
id: this.chatId as ID,
signal: controller.signal,
messages,
Expand Down
3 changes: 2 additions & 1 deletion packages/blink/src/local/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import { ChatManager } from "./chat-manager";
import { createDiskStore } from "./disk-store";
import { convertMessage, type StoredChat } from "./types";
import { v5 as uuidv5 } from "uuid";
import type { Agent } from "../react/use-agent";

export interface CreateLocalServerOptions {
readonly dataDirectory: string;
readonly port?: number;
readonly getAgent: () => Client | undefined;
readonly getAgent: () => Agent | undefined;
}

export type LocalServer = ReturnType<typeof createLocalServer>;
Expand Down
12 changes: 9 additions & 3 deletions packages/blink/src/react/use-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ export interface UseAgentOptions {
readonly apiServerUrl?: string;
}

export interface Agent {
readonly client: Client;
readonly lock: RWLock;
}

// useAgent is a hook that provides a client for an agent at the given entrypoint.
export default function useAgent(options: UseAgentOptions) {
const [agent, setAgent] = useState<Client | undefined>(undefined);
const [agent, setAgent] = useState<Agent | undefined>(undefined);
const [logs, setLogs] = useState<AgentLog[]>([]);
const [error, setError] = useState<Error | undefined>(undefined);
const [buildResult, setBuildResult] = useState(options.buildResult);
Expand Down Expand Up @@ -133,7 +138,8 @@ export default function useAgent(options: UseAgentOptions) {
const client = new Client({
baseUrl: `http://127.0.0.1:${port}`,
});
lock = client.agentLock;
const agentLock = new RWLock();
lock = agentLock;
// Wait for the health endpoint to be alive.
while (!controller.signal.aborted) {
try {
Expand All @@ -150,7 +156,7 @@ export default function useAgent(options: UseAgentOptions) {
ready = true;
const capabilities = await client.capabilities();
setCapabilities(capabilities);
setAgent(client);
setAgent({ client, lock: agentLock });
})().catch((err) => {
// Don't set error if this was just a cleanup abort
if (!isCleanup) {
Expand Down
3 changes: 2 additions & 1 deletion packages/blink/src/react/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import type { Client } from "../agent/client";
import { ChatManager, type ChatState } from "../local/chat-manager";
import type { StoredMessage } from "../local/types";
import type { ID } from "../agent/types";
import type { Agent } from "./use-agent";

export type { ChatStatus } from "../local/chat-manager";

export interface UseChatOptions {
readonly chatId: ID;
readonly agent: Client | undefined;
readonly agent: Agent | undefined;
readonly chatsDirectory: string;
/**
* Optional function to filter messages before persisting them.
Expand Down
12 changes: 6 additions & 6 deletions packages/blink/src/react/use-dev-mode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { isLogMessage, isStoredMessageMetadata } from "../local/types";
import type { BuildLog } from "../build";
import type { ID, UIOptions, UIOptionsSchema } from "../agent/index.browser";
import useOptions from "./use-options";
import useAgent, { type AgentLog } from "./use-agent";
import useAgent, { type AgentLog, type Agent } from "./use-agent";
import useBundler, { type BundlerStatus } from "./use-bundler";
import useChat, { type UseChat } from "./use-chat";
import useDevhook from "./use-devhook";
Expand Down Expand Up @@ -196,7 +196,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
}, [env, options.onEnvLoaded]);

// Server - always use run agent for webhook/API handling
const runAgentRef = useRef<Client | undefined>(undefined);
const runAgentRef = useRef<Agent | undefined>(undefined);
const server = useMemo(() => {
return createLocalServer({
port: 0,
Expand All @@ -219,7 +219,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {

// Edit agent
const {
client: editAgent,
agent: editAgent,
error: editAgentError,
missingApiKey: editModeMissingApiKey,
setUserAgentUrl,
Expand Down Expand Up @@ -247,7 +247,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
// Update edit agent with user agent URL and handle cleanup
useEffect(() => {
if (agent) {
setUserAgentUrl(agent.baseUrl);
setUserAgentUrl(agent.client.baseUrl);
}

// Stop streaming when agents become unavailable
Expand Down Expand Up @@ -382,7 +382,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {

// Always send the request to the user's agent (not the edit agent)
const requestURL = new URL(request.url);
const agentURL = new URL(agent.baseUrl);
const agentURL = new URL(agent.client.baseUrl);
agentURL.pathname = requestURL.pathname;
agentURL.search = requestURL.search;

Expand Down Expand Up @@ -431,7 +431,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
error: optionsError,
setOption,
} = useOptions({
agent: mode === "run" ? agent : editAgent,
agent: mode === "run" ? agent?.client : editAgent?.client,
capabilities,
messages: chat.messages,
});
Expand Down
Loading