From 86489da5fb97d19f0cf919ffa61a74b63c9ced5e Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 3 Dec 2025 14:00:35 +0100 Subject: [PATCH] feat: unify Electron and CLI with shared oRPC API server Electron now runs an HTTP/WS API server on 127.0.0.1 alongside the existing MessagePort transport. Both transports share the same oRPC router instance with auth middleware. - Add ServerLockfile to manage ~/.mux/server.lock for discovery - Extend ServerService with startServer/stopServer lifecycle - Move orpcServer.ts to src/node/orpc/server.ts (shared module) - Electron generates auth token, injects into both transports - CLI server checks lockfile to prevent conflicts - CLI api auto-discovers running server via lockfile Env vars: - MUX_SERVER_AUTH_TOKEN: Override auth token - MUX_SERVER_PORT: Fixed port (default: random) - MUX_NO_API_SERVER=1: Disable API server in Electron --- src/cli/api.ts | 58 +++- src/cli/cli.test.ts | 262 ++++++++++++++++++ src/cli/index.ts | 5 +- src/cli/proxifyOrpc.ts | 32 ++- src/cli/server.test.ts | 2 +- src/cli/server.ts | 29 +- src/desktop/main.ts | 69 ++++- .../orpcServer.ts => node/orpc/server.ts} | 21 +- src/node/services/serverLockfile.test.ts | 164 +++++++++++ src/node/services/serverLockfile.ts | 109 ++++++++ src/node/services/serverService.test.ts | 152 +++++++++- src/node/services/serverService.ts | 111 ++++++++ 12 files changed, 981 insertions(+), 33 deletions(-) create mode 100644 src/cli/cli.test.ts rename src/{cli/orpcServer.ts => node/orpc/server.ts} (84%) create mode 100644 src/node/services/serverLockfile.test.ts create mode 100644 src/node/services/serverLockfile.ts diff --git a/src/cli/api.ts b/src/cli/api.ts index eee11821d..3618d83c8 100644 --- a/src/cli/api.ts +++ b/src/cli/api.ts @@ -3,19 +3,63 @@ * * This module is loaded lazily to avoid pulling in ESM-only dependencies * (trpc-cli) when running other commands like the desktop app. + * + * Server discovery priority: + * 1. MUX_SERVER_URL env var (explicit override) + * 2. Lockfile at ~/.mux/server.lock (running Electron or mux server) + * 3. Fallback to http://localhost:3000 */ import { createCli } from "trpc-cli"; import { router } from "@/node/orpc/router"; import { proxifyOrpc } from "./proxifyOrpc"; +import { ServerLockfile } from "@/node/services/serverLockfile"; +import { getMuxHome } from "@/common/constants/paths"; import type { Command } from "commander"; -const baseUrl = process.env.MUX_SERVER_URL ?? "http://localhost:3000"; -const authToken = process.env.MUX_SERVER_AUTH_TOKEN; +interface ServerDiscovery { + baseUrl: string; + authToken: string | undefined; +} + +async function discoverServer(): Promise { + // Priority 1: Explicit env vars override everything + if (process.env.MUX_SERVER_URL) { + return { + baseUrl: process.env.MUX_SERVER_URL, + authToken: process.env.MUX_SERVER_AUTH_TOKEN, + }; + } + + // Priority 2: Try lockfile discovery (running Electron or mux server) + try { + const lockfile = new ServerLockfile(getMuxHome()); + const data = await lockfile.read(); + if (data) { + return { + baseUrl: data.baseUrl, + authToken: data.token, + }; + } + } catch { + // Ignore lockfile errors + } + + // Priority 3: Default fallback (standalone server on default port) + return { + baseUrl: "http://localhost:3000", + authToken: process.env.MUX_SERVER_AUTH_TOKEN, + }; +} + +// Run async discovery then start CLI +(async () => { + const { baseUrl, authToken } = await discoverServer(); -const proxiedRouter = proxifyOrpc(router(), { baseUrl, authToken }); -const cli = createCli({ router: proxiedRouter }).buildProgram() as Command; + const proxiedRouter = proxifyOrpc(router(), { baseUrl, authToken }); + const cli = createCli({ router: proxiedRouter }).buildProgram() as Command; -cli.name("mux api"); -cli.description("Interact with the mux API via a running server"); -cli.parse(); + cli.name("mux api"); + cli.description("Interact with the mux API via a running server"); + cli.parse(); +})(); diff --git a/src/cli/cli.test.ts b/src/cli/cli.test.ts new file mode 100644 index 000000000..58ca6f197 --- /dev/null +++ b/src/cli/cli.test.ts @@ -0,0 +1,262 @@ +/** + * E2E tests for the CLI layer (mux api commands). + * + * These tests verify that: + * 1. CLI commands work correctly via HTTP to a real server + * 2. Input schema transformations (proxifyOrpc) are correct + * 3. Authentication flows work as expected + * + * Uses bun:test and the same server setup pattern as server.test.ts. + * Tests the full flow: CLI args → trpc-cli → proxifyOrpc → HTTP → oRPC server + */ +import { describe, test, expect, beforeAll, afterAll } from "bun:test"; +import * as os from "os"; +import * as path from "path"; +import * as fs from "fs/promises"; +import type { BrowserWindow, WebContents } from "electron"; + +import { createCli, FailedToExitError } from "trpc-cli"; +import { router } from "@/node/orpc/router"; +import { proxifyOrpc } from "./proxifyOrpc"; +import type { ORPCContext } from "@/node/orpc/context"; +import { Config } from "@/node/config"; +import { ServiceContainer } from "@/node/services/serviceContainer"; +import { createOrpcServer, type OrpcServer } from "@/node/orpc/server"; + +// --- Test Server Factory --- + +interface TestServerHandle { + server: OrpcServer; + tempDir: string; + close: () => Promise; +} + +/** + * Create a test server using the actual createOrpcServer function. + * Sets up services and config in a temp directory. + */ +async function createTestServer(authToken?: string): Promise { + // Create temp dir for config + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "mux-cli-test-")); + const config = new Config(tempDir); + + // Mock BrowserWindow + const mockWindow: BrowserWindow = { + isDestroyed: () => false, + setTitle: () => undefined, + webContents: { + send: () => undefined, + openDevTools: () => undefined, + } as unknown as WebContents, + } as unknown as BrowserWindow; + + // Initialize services + const services = new ServiceContainer(config); + await services.initialize(); + services.windowService.setMainWindow(mockWindow); + + // Build context + const context: ORPCContext = { + projectService: services.projectService, + workspaceService: services.workspaceService, + providerService: services.providerService, + terminalService: services.terminalService, + windowService: services.windowService, + updateService: services.updateService, + tokenizerService: services.tokenizerService, + serverService: services.serverService, + menuEventService: services.menuEventService, + voiceService: services.voiceService, + }; + + // Use the actual createOrpcServer function + const server = await createOrpcServer({ + context, + authToken, + // port 0 = random available port + onOrpcError: () => undefined, // Silence errors in tests + }); + + return { + server, + tempDir, + close: async () => { + await server.close(); + // Cleanup temp directory + await fs.rm(tempDir, { recursive: true, force: true }).catch(() => undefined); + }, + }; +} + +// --- CLI Runner Factory --- + +/** + * Create a CLI runner that executes commands against a running server. + * Uses trpc-cli's programmatic API to avoid subprocess overhead. + */ +function createCliRunner(baseUrl: string, authToken?: string) { + const proxiedRouter = proxifyOrpc(router(), { baseUrl, authToken }); + const cli = createCli({ router: proxiedRouter }); + + return async (args: string[]): Promise => { + return cli + .run({ + argv: args, + process: { exit: () => void 0 as never }, + // eslint-disable-next-line @typescript-eslint/no-empty-function + logger: { info: () => {}, error: () => {} }, + }) + .catch((err) => { + // Extract the result or re-throw the actual error + while (err instanceof FailedToExitError) { + if (err.exitCode === 0) { + return err.cause; // This is the return value of the procedure + } + err = err.cause; // Use the underlying error + } + throw err; + }); + }; +} + +// --- Tests --- + +describe("CLI via HTTP", () => { + let serverHandle: TestServerHandle; + let runCli: (args: string[]) => Promise; + + beforeAll(async () => { + serverHandle = await createTestServer(); + runCli = createCliRunner(serverHandle.server.baseUrl); + }); + + afterAll(async () => { + await serverHandle.close(); + }); + + describe("void input schemas (regression for proxifyOrpc fix)", () => { + // These tests verify the fix in proxifyOrpc.ts that transforms {} to undefined + // for z.void() inputs. Without the fix, these would fail with BAD_REQUEST. + + test("workspace list works with void input", async () => { + const result = await runCli(["workspace", "list"]); + expect(Array.isArray(result)).toBe(true); + }); + + test("providers list works with void input", async () => { + const result = (await runCli(["providers", "list"])) as string[]; + expect(Array.isArray(result)).toBe(true); + expect(result).toContain("anthropic"); + }); + + test("projects list works with void input", async () => { + const result = await runCli(["projects", "list"]); + expect(Array.isArray(result)).toBe(true); + }); + + test("providers get-config works with void input", async () => { + const result = await runCli(["providers", "get-config"]); + expect(typeof result).toBe("object"); + expect(result).not.toBeNull(); + }); + + test("workspace activity list works with void input", async () => { + const result = await runCli(["workspace", "activity", "list"]); + expect(typeof result).toBe("object"); + expect(result).not.toBeNull(); + }); + }); + + describe("string input schemas", () => { + test("general ping with string argument", async () => { + const result = await runCli(["general", "ping", "hello"]); + expect(result).toBe("Pong: hello"); + }); + + test("general ping with empty string", async () => { + const result = await runCli(["general", "ping", ""]); + expect(result).toBe("Pong: "); + }); + + test("general ping with special characters", async () => { + const result = await runCli(["general", "ping", "hello world!"]); + expect(result).toBe("Pong: hello world!"); + }); + }); + + describe("object input schemas", () => { + test("workspace get-info with workspace-id option", async () => { + const result = await runCli(["workspace", "get-info", "--workspace-id", "nonexistent"]); + expect(result).toBeNull(); // Non-existent workspace returns null + }); + + test("general tick with object options", async () => { + const result = await runCli(["general", "tick", "--count", "2", "--interval-ms", "10"]); + // tick returns an async generator, so result should be the generator + expect(result).toBeDefined(); + }); + }); +}); + +describe("CLI Authentication", () => { + test("valid auth token allows requests", async () => { + const authToken = "test-secret-token"; + const serverHandle = await createTestServer(authToken); + const runCli = createCliRunner(serverHandle.server.baseUrl, authToken); + + try { + const result = await runCli(["workspace", "list"]); + expect(Array.isArray(result)).toBe(true); + } finally { + await serverHandle.close(); + } + }); + + test("invalid auth token rejects requests", async () => { + const authToken = "correct-token"; + const serverHandle = await createTestServer(authToken); + const runCli = createCliRunner(serverHandle.server.baseUrl, "wrong-token"); + + try { + let threw = false; + try { + await runCli(["workspace", "list"]); + } catch { + threw = true; + } + expect(threw).toBe(true); + } finally { + await serverHandle.close(); + } + }); + + test("missing auth token when required rejects requests", async () => { + const authToken = "required-token"; + const serverHandle = await createTestServer(authToken); + const runCli = createCliRunner(serverHandle.server.baseUrl); // No token + + try { + let threw = false; + try { + await runCli(["workspace", "list"]); + } catch { + threw = true; + } + expect(threw).toBe(true); + } finally { + await serverHandle.close(); + } + }); + + test("no auth token required when server has none", async () => { + const serverHandle = await createTestServer(); // No auth token on server + const runCli = createCliRunner(serverHandle.server.baseUrl); // No token + + try { + const result = await runCli(["workspace", "list"]); + expect(Array.isArray(result)).toBe(true); + } finally { + await serverHandle.close(); + } + }); +}); diff --git a/src/cli/index.ts b/src/cli/index.ts index c1df6cbce..e518c9e50 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -40,8 +40,9 @@ if (subcommand === "run") { require("./server"); } else if (subcommand === "api") { process.argv.splice(2, 1); - // eslint-disable-next-line @typescript-eslint/no-require-imports - require("./api"); + // Dynamic import required: trpc-cli is ESM-only and can't be require()'d + // eslint-disable-next-line no-restricted-syntax + void import("./api"); } else if ( subcommand === "desktop" || (isElectron && (subcommand === undefined || isElectronLaunchArg)) diff --git a/src/cli/proxifyOrpc.ts b/src/cli/proxifyOrpc.ts index f17d005d5..784999190 100644 --- a/src/cli/proxifyOrpc.ts +++ b/src/cli/proxifyOrpc.ts @@ -74,6 +74,28 @@ function isZod4Like(value: unknown): value is Zod4Like { ); } +/** + * Check if a schema is z.void() or z.undefined(). + * These schemas accept `undefined` but not `{}`. + */ +function isVoidOrUndefinedSchema(schema: unknown): boolean { + if (!isZod4Like(schema)) return false; + const def = getDef(schema); + return def?.type === "void" || def?.type === "undefined"; +} + +/** + * Check if a value is an empty object `{}`. + */ +function isEmptyObject(value: unknown): boolean { + return ( + typeof value === "object" && + value !== null && + !Array.isArray(value) && + Object.keys(value).length === 0 + ); +} + /** * Get the def from a Zod 4 schema (handles both .def and ._def). */ @@ -572,9 +594,13 @@ function createProcedureProxy( path: string[] ): OrpcProcedureLike { const originalDef = procedure["~orpc"]; + const originalInputSchema = originalDef.inputSchema; + + // Check if the original schema was void/undefined - trpc-cli sends {} but server expects undefined + const isVoidInput = isVoidOrUndefinedSchema(originalInputSchema); // Enhance input schema to show rich field descriptions in CLI help - const enhancedInputSchema = enhanceInputSchema(originalDef.inputSchema); + const enhancedInputSchema = enhanceInputSchema(originalInputSchema); // Navigate to the client method using the path (lazily creates client on call) const getClientMethod = (): ((input: unknown) => Promise) => { @@ -604,7 +630,9 @@ function createProcedureProxy( // eslint-disable-next-line @typescript-eslint/no-explicit-any handler: async (opts: { input: unknown }): Promise => { const clientMethod = getClientMethod(); - return clientMethod(opts.input); + // trpc-cli sends {} for void inputs, but the server expects undefined + const input = isVoidInput && isEmptyObject(opts.input) ? undefined : opts.input; + return clientMethod(input); }, }, }; diff --git a/src/cli/server.test.ts b/src/cli/server.test.ts index 904048e38..336ae9fac 100644 --- a/src/cli/server.test.ts +++ b/src/cli/server.test.ts @@ -24,7 +24,7 @@ import type { ORPCContext } from "@/node/orpc/context"; import { Config } from "@/node/config"; import { ServiceContainer } from "@/node/services/serviceContainer"; import type { RouterClient } from "@orpc/server"; -import { createOrpcServer, type OrpcServer } from "./orpcServer"; +import { createOrpcServer, type OrpcServer } from "@/node/orpc/server"; // --- Test Server Factory --- diff --git a/src/cli/server.ts b/src/cli/server.ts index 89f98d150..cdd1624be 100644 --- a/src/cli/server.ts +++ b/src/cli/server.ts @@ -4,11 +4,12 @@ */ import { Config } from "@/node/config"; import { ServiceContainer } from "@/node/services/serviceContainer"; -import { migrateLegacyMuxHome } from "@/common/constants/paths"; +import { ServerLockfile } from "@/node/services/serverLockfile"; +import { getMuxHome, migrateLegacyMuxHome } from "@/common/constants/paths"; import type { BrowserWindow } from "electron"; import { Command } from "commander"; import { validateProjectPath } from "@/node/utils/pathUtils"; -import { createOrpcServer } from "./orpcServer"; +import { createOrpcServer } from "@/node/orpc/server"; import type { ORPCContext } from "@/node/orpc/context"; const program = new Command(); @@ -44,6 +45,15 @@ const mockWindow: BrowserWindow = { (async () => { migrateLegacyMuxHome(); + // Check for existing server (Electron or another mux server instance) + const lockfile = new ServerLockfile(getMuxHome()); + const existing = await lockfile.read(); + if (existing) { + console.error(`Error: mux API server is already running at ${existing.baseUrl}`); + console.error(`Use 'mux api' commands to interact with the running instance.`); + process.exit(1); + } + const config = new Config(); const serviceContainer = new ServiceContainer(config); await serviceContainer.initialize(); @@ -78,7 +88,22 @@ const mockWindow: BrowserWindow = { serveStatic: true, }); + // Acquire lockfile so other instances know we're running + await lockfile.acquire(server.baseUrl, AUTH_TOKEN ?? ""); + console.log(`Server is running on ${server.baseUrl}`); + + // Cleanup on shutdown + const cleanup = () => { + console.log("Shutting down server..."); + void lockfile + .release() + .then(() => server.close()) + .then(() => process.exit(0)); + }; + + process.on("SIGINT", cleanup); + process.on("SIGTERM", cleanup); })().catch((error) => { console.error("Failed to initialize server:", error); process.exit(1); diff --git a/src/desktop/main.ts b/src/desktop/main.ts index 0add1b776..b51782abc 100644 --- a/src/desktop/main.ts +++ b/src/desktop/main.ts @@ -1,8 +1,10 @@ // Enable source map support for better error stack traces in production import "source-map-support/register"; +import { randomBytes } from "crypto"; import { RPCHandler } from "@orpc/server/message-port"; import { onError } from "@orpc/server"; import { router } from "@/node/orpc/router"; +import { ServerLockfile } from "@/node/services/serverLockfile"; import "disposablestack/auto"; import type { MenuItemConstructorOptions } from "electron"; @@ -304,7 +306,13 @@ async function loadServices(): Promise { services = new ServiceContainerClass(config); await services.initialize(); - const orpcHandler = new RPCHandler(router(), { + // Generate auth token (use env var or random per-session) + const authToken = process.env.MUX_SERVER_AUTH_TOKEN ?? randomBytes(32).toString("hex"); + + // Single router instance with auth middleware - used for both MessagePort and HTTP/WS + const orpcRouter = router(authToken); + + const orpcHandler = new RPCHandler(orpcRouter, { interceptors: [ onError((error) => { console.error("ORPC Error:", error); @@ -312,25 +320,57 @@ async function loadServices(): Promise { ], }); + // Build the oRPC context with all services + const orpcContext = { + projectService: services.projectService, + workspaceService: services.workspaceService, + providerService: services.providerService, + terminalService: services.terminalService, + windowService: services.windowService, + updateService: services.updateService, + tokenizerService: services.tokenizerService, + serverService: services.serverService, + menuEventService: services.menuEventService, + voiceService: services.voiceService, + }; + electronIpcMain.on("start-orpc-server", (event) => { const [serverPort] = event.ports; orpcHandler.upgrade(serverPort, { context: { - projectService: services!.projectService, - workspaceService: services!.workspaceService, - providerService: services!.providerService, - terminalService: services!.terminalService, - windowService: services!.windowService, - updateService: services!.updateService, - tokenizerService: services!.tokenizerService, - serverService: services!.serverService, - menuEventService: services!.menuEventService, - voiceService: services!.voiceService, + ...orpcContext, + // Inject synthetic auth header so auth middleware passes + headers: { authorization: `Bearer ${authToken}` }, }, }); serverPort.start(); }); + // Start HTTP/WS API server for CLI access (unless explicitly disabled) + if (process.env.MUX_NO_API_SERVER !== "1") { + const lockfile = new ServerLockfile(config.rootDir); + const existing = await lockfile.read(); + + if (existing) { + console.log(`[${timestamp()}] API server already running at ${existing.baseUrl}, skipping`); + } else { + try { + const port = process.env.MUX_SERVER_PORT ? parseInt(process.env.MUX_SERVER_PORT, 10) : 0; + const serverInfo = await services.serverService.startServer({ + muxHome: config.rootDir, + context: orpcContext, + router: orpcRouter, + authToken, + port, + }); + console.log(`[${timestamp()}] API server started at ${serverInfo.baseUrl}`); + } catch (error) { + console.error(`[${timestamp()}] Failed to start API server:`, error); + // Non-fatal - continue without API server + } + } + } + // Set TerminalWindowManager for desktop mode (pop-out terminal windows) const terminalWindowManager = new TerminalWindowManagerClass(config); services.setProjectDirectoryPicker(async () => { @@ -527,6 +567,13 @@ if (gotTheLock) { } }); + app.on("before-quit", () => { + console.log(`[${timestamp()}] App before-quit - cleaning up API server...`); + if (services) { + void services.serverService.stopServer(); + } + }); + app.on("activate", () => { // Skip splash on reactivation - services already loaded, window creation is fast if (app.isReady() && mainWindow === null) { diff --git a/src/cli/orpcServer.ts b/src/node/orpc/server.ts similarity index 84% rename from src/cli/orpcServer.ts rename to src/node/orpc/server.ts index 10a3258ae..ea2d1cacd 100644 --- a/src/cli/orpcServer.ts +++ b/src/node/orpc/server.ts @@ -13,7 +13,7 @@ import { WebSocketServer } from "ws"; import { RPCHandler } from "@orpc/server/node"; import { RPCHandler as ORPCWebSocketServerHandler } from "@orpc/server/ws"; import { onError } from "@orpc/server"; -import { router } from "@/node/orpc/router"; +import { router, type AppRouter } from "@/node/orpc/router"; import type { ORPCContext } from "@/node/orpc/context"; import { extractWsHeaders } from "@/node/orpc/authMiddleware"; import { VERSION } from "@/version"; @@ -30,12 +30,14 @@ export interface OrpcServerOptions { context: ORPCContext; /** Whether to serve static files and SPA fallback (default: false) */ serveStatic?: boolean; - /** Directory to serve static files from (default: __dirname/..) */ + /** Directory to serve static files from (default: dist/ relative to dist/node/orpc/) */ staticDir?: string; /** Custom error handler for oRPC errors */ onOrpcError?: (error: unknown) => void; - /** Optional bearer token for HTTP auth */ + /** Optional bearer token for HTTP auth (used if router not provided) */ authToken?: string; + /** Optional pre-created router (if not provided, creates router(authToken)) */ + router?: AppRouter; } export interface OrpcServer { @@ -71,8 +73,10 @@ export async function createOrpcServer({ authToken, context, serveStatic = false, - staticDir = path.join(__dirname, ".."), + // From dist/node/orpc/, go up 2 levels to reach dist/ where index.html lives + staticDir = path.join(__dirname, "../.."), onOrpcError = (error) => log.error("ORPC Error:", error), + router: existingRouter, }: OrpcServerOptions): Promise { // Express app setup const app = express(); @@ -94,7 +98,7 @@ export async function createOrpcServer({ res.json({ ...VERSION, mode: "server" }); }); - const orpcRouter = router(authToken); + const orpcRouter = existingRouter ?? router(authToken); // oRPC HTTP handler const orpcHandler = new RPCHandler(orpcRouter, { @@ -147,13 +151,16 @@ export async function createOrpcServer({ } const actualPort = address.port; + // Wildcard addresses (0.0.0.0, ::) are not routable - convert to loopback for lockfile + const connectableHost = host === "0.0.0.0" || host === "::" ? "127.0.0.1" : host; + return { httpServer, wsServer, app, port: actualPort, - baseUrl: `http://${host}:${actualPort}`, - wsUrl: `ws://${host}:${actualPort}/orpc/ws`, + baseUrl: `http://${connectableHost}:${actualPort}`, + wsUrl: `ws://${connectableHost}:${actualPort}/orpc/ws`, close: async () => { // Close WebSocket server first wsServer.close(); diff --git a/src/node/services/serverLockfile.test.ts b/src/node/services/serverLockfile.test.ts new file mode 100644 index 000000000..2e66d9eca --- /dev/null +++ b/src/node/services/serverLockfile.test.ts @@ -0,0 +1,164 @@ +import { describe, test, expect, beforeEach, afterEach } from "bun:test"; +import * as fs from "fs/promises"; +import * as path from "path"; +import * as os from "os"; +import { ServerLockfile } from "./serverLockfile"; + +describe("ServerLockfile", () => { + let tempDir: string; + let lockfile: ServerLockfile; + + beforeEach(async () => { + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "mux-lock-test-")); + lockfile = new ServerLockfile(tempDir); + }); + + afterEach(async () => { + await fs.rm(tempDir, { recursive: true, force: true }); + }); + + test("acquire creates lockfile with correct data", async () => { + await lockfile.acquire("http://localhost:12345", "test-token"); + + const data = await lockfile.read(); + expect(data).not.toBeNull(); + expect(data!.baseUrl).toBe("http://localhost:12345"); + expect(data!.token).toBe("test-token"); + expect(data!.pid).toBe(process.pid); + expect(data!.startedAt).toBeDefined(); + }); + + test("read returns null for non-existent lockfile", async () => { + const data = await lockfile.read(); + expect(data).toBeNull(); + }); + + test("read returns null for stale lockfile (dead PID)", async () => { + const lockPath = lockfile.getLockPath(); + + // Write lockfile with fake dead PID + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: 999999999, // Very unlikely to be a real PID + baseUrl: "http://localhost:12345", + token: "test-token", + startedAt: new Date().toISOString(), + }) + ); + + const data = await lockfile.read(); + expect(data).toBeNull(); + + // Stale lockfile should be cleaned up + let exists = true; + try { + await fs.access(lockPath); + } catch { + exists = false; + } + expect(exists).toBe(false); + }); + + test("read returns data for lockfile with current PID", async () => { + await lockfile.acquire("http://127.0.0.1:54321", "valid-token"); + + const data = await lockfile.read(); + expect(data).not.toBeNull(); + expect(data!.baseUrl).toBe("http://127.0.0.1:54321"); + expect(data!.token).toBe("valid-token"); + }); + + test("release removes lockfile", async () => { + await lockfile.acquire("http://localhost:12345", "test-token"); + const lockPath = lockfile.getLockPath(); + + let exists = true; + try { + await fs.access(lockPath); + } catch { + exists = false; + } + expect(exists).toBe(true); + + await lockfile.release(); + + try { + await fs.access(lockPath); + exists = true; + } catch { + exists = false; + } + expect(exists).toBe(false); + }); + + test("release is idempotent (no error if file doesn't exist)", async () => { + // Should not throw + await lockfile.release(); + await lockfile.release(); + }); + + test("lockfile has restrictive permissions on unix", async () => { + // Skip on Windows where file permissions work differently + if (process.platform === "win32") { + return; + } + + await lockfile.acquire("http://localhost:12345", "test-token"); + const lockPath = lockfile.getLockPath(); + const stats = await fs.stat(lockPath); + + // 0o600 = owner read/write only + expect(stats.mode & 0o777).toBe(0o600); + }); + + test("acquire overwrites existing lockfile", async () => { + await lockfile.acquire("http://localhost:11111", "first-token"); + await lockfile.acquire("https://my.machine.local/mux", "second-token"); + + const data = await lockfile.read(); + expect(data).not.toBeNull(); + expect(data!.baseUrl).toBe("https://my.machine.local/mux"); + expect(data!.token).toBe("second-token"); + }); + + test("read handles corrupted lockfile gracefully", async () => { + const lockPath = lockfile.getLockPath(); + await fs.writeFile(lockPath, "not valid json"); + + const data = await lockfile.read(); + expect(data).toBeNull(); + }); + + test("acquire creates parent directory if it doesn't exist", async () => { + const nestedDir = path.join(tempDir, "nested", "dir"); + const nestedLockfile = new ServerLockfile(nestedDir); + + await nestedLockfile.acquire("http://localhost:12345", "test-token"); + + const data = await nestedLockfile.read(); + expect(data).not.toBeNull(); + expect(data!.baseUrl).toBe("http://localhost:12345"); + }); + + test("getLockPath returns correct path", () => { + const expectedPath = path.join(tempDir, "server.lock"); + expect(lockfile.getLockPath()).toBe(expectedPath); + }); + + test("supports HTTPS URLs", async () => { + await lockfile.acquire("https://secure.example.com:8443/api", "secure-token"); + + const data = await lockfile.read(); + expect(data).not.toBeNull(); + expect(data!.baseUrl).toBe("https://secure.example.com:8443/api"); + }); + + test("supports URLs with path prefixes", async () => { + await lockfile.acquire("https://my.machine.local/mux/", "path-token"); + + const data = await lockfile.read(); + expect(data).not.toBeNull(); + expect(data!.baseUrl).toBe("https://my.machine.local/mux/"); + }); +}); diff --git a/src/node/services/serverLockfile.ts b/src/node/services/serverLockfile.ts new file mode 100644 index 000000000..60525bdd7 --- /dev/null +++ b/src/node/services/serverLockfile.ts @@ -0,0 +1,109 @@ +import * as fs from "fs/promises"; +import * as path from "path"; +import { z } from "zod"; + +export const ServerLockDataSchema = z.object({ + pid: z.number(), + /** Base URL for HTTP API (e.g., "http://localhost:3000" or "https://my.box.com/mux") */ + baseUrl: z.url(), + token: z.string(), + startedAt: z.string(), +}); + +export type ServerLockData = z.infer; + +/** + * Manages the server lockfile at ~/.mux/server.lock + * + * The lockfile enables CLI tools to discover a running mux server + * (either Electron app or standalone mux server) and connect to it. + */ +export class ServerLockfile { + private readonly lockPath: string; + + constructor(muxHome: string) { + this.lockPath = path.join(muxHome, "server.lock"); + } + + /** + * Acquire the lockfile with the given baseUrl and token. + * Writes atomically with 0600 permissions (owner read/write only). + */ + async acquire(baseUrl: string, token: string): Promise { + const data: ServerLockData = { + pid: process.pid, + baseUrl, + token, + startedAt: new Date().toISOString(), + }; + + // Ensure directory exists + const dir = path.dirname(this.lockPath); + try { + await fs.access(dir); + } catch { + await fs.mkdir(dir, { recursive: true }); + } + + // Write atomically by writing to temp file then renaming + const tempPath = `${this.lockPath}.${process.pid}.tmp`; + await fs.writeFile(tempPath, JSON.stringify(data, null, 2), { + mode: 0o600, // Owner read/write only + }); + await fs.rename(tempPath, this.lockPath); + } + + /** + * Read the lockfile and validate it. + * Returns null if the lockfile doesn't exist or is stale (dead PID). + */ + async read(): Promise { + try { + await fs.access(this.lockPath); + const content = await fs.readFile(this.lockPath, "utf-8"); + const data = ServerLockDataSchema.parse(JSON.parse(content)); + + // Validate PID is still alive + if (!this.isProcessAlive(data.pid)) { + // Clean up stale lockfile + await this.release(); + return null; + } + + return data; + } catch { + return null; + } + } + + /** + * Release the lockfile by deleting it. + */ + async release(): Promise { + try { + await fs.unlink(this.lockPath); + } catch { + // Ignore cleanup errors (file may not exist) + } + } + + /** + * Check if a process with the given PID is still running. + * Uses signal 0 which tests existence without actually sending a signal. + */ + private isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } + } + + /** + * Get the path to the lockfile (for testing/debugging). + */ + getLockPath(): string { + return this.lockPath; + } +} diff --git a/src/node/services/serverService.test.ts b/src/node/services/serverService.test.ts index 3e64f0c6a..25676aaac 100644 --- a/src/node/services/serverService.test.ts +++ b/src/node/services/serverService.test.ts @@ -1,5 +1,11 @@ -import { describe, expect, test } from "bun:test"; +import { describe, expect, test, beforeEach, afterEach } from "bun:test"; +import * as fs from "fs/promises"; +import * as path from "path"; +import * as os from "os"; +import * as net from "net"; import { ServerService } from "./serverService"; +import type { ORPCContext } from "@/node/orpc/context"; +import { ServerLockDataSchema } from "./serverLockfile"; describe("ServerService", () => { test("initializes with null path", async () => { @@ -29,3 +35,147 @@ describe("ServerService", () => { expect(await service.getLaunchProject()).toBeNull(); }); }); + +describe("ServerService.startServer", () => { + let tempDir: string; + + // Minimal context stub - server creation only needs the shape, not real services + const stubContext: Partial = {}; + + beforeEach(async () => { + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "mux-server-test-")); + }); + + afterEach(async () => { + // Restore permissions before cleanup + try { + await fs.chmod(tempDir, 0o755); + } catch { + // ignore + } + await fs.rm(tempDir, { recursive: true, force: true }); + }); + + /** Check if a port is in use by attempting to connect to it */ + async function isPortListening(port: number): Promise { + return new Promise((resolve) => { + const socket = new net.Socket(); + socket.setTimeout(100); + socket.on("connect", () => { + socket.destroy(); + resolve(true); + }); + socket.on("timeout", () => { + socket.destroy(); + resolve(false); + }); + socket.on("error", () => { + resolve(false); + }); + socket.connect(port, "127.0.0.1"); + }); + } + + test("cleans up server when lockfile acquisition fails", async () => { + // Skip on Windows where chmod doesn't work the same way + if (process.platform === "win32") { + return; + } + + const service = new ServerService(); + + // Make muxHome read-only so lockfile.acquire() will fail + await fs.chmod(tempDir, 0o444); + + let thrownError: Error | null = null; + + try { + // Start server - this should fail when trying to write lockfile + await service.startServer({ + muxHome: tempDir, + context: stubContext as ORPCContext, + authToken: "test-token", + port: 0, // random port + }); + } catch (err) { + thrownError = err as Error; + } + + // Verify that an error was thrown + expect(thrownError).not.toBeNull(); + expect(thrownError!.message).toMatch(/EACCES|permission denied/i); + + // Verify the server is NOT left running + expect(service.isServerRunning()).toBe(false); + expect(service.getServerInfo()).toBeNull(); + }); + + test("does not delete another process's lockfile when start fails", async () => { + const service = new ServerService(); + + // Create a lockfile simulating another running server (use our own PID so it appears "alive") + const lockPath = path.join(tempDir, "server.lock"); + const existingLockData = { + pid: process.pid, + baseUrl: "http://127.0.0.1:9999", + token: "other-server-token", + startedAt: new Date().toISOString(), + }; + await fs.writeFile(lockPath, JSON.stringify(existingLockData, null, 2)); + + // Try to start - should fail due to existing server + let thrownError: Error | null = null; + try { + await service.startServer({ + muxHome: tempDir, + context: stubContext as ORPCContext, + authToken: "test-token", + port: 0, + }); + } catch (err) { + thrownError = err as Error; + } + + expect(thrownError).not.toBeNull(); + expect(thrownError!.message).toMatch(/already running/i); + + // Critical: call stopServer (simulating cleanup in finally block) + await service.stopServer(); + + // Verify the OTHER process's lockfile was NOT deleted + const lockContent = await fs.readFile(lockPath, "utf-8"); + const lockData = ServerLockDataSchema.parse(JSON.parse(lockContent)); + expect(lockData.baseUrl).toBe("http://127.0.0.1:9999"); + expect(lockData.token).toBe("other-server-token"); + }); + + test("successful start creates lockfile and server", async () => { + const service = new ServerService(); + + const info = await service.startServer({ + muxHome: tempDir, + context: stubContext as ORPCContext, + authToken: "test-token", + port: 0, + }); + + try { + expect(info.baseUrl).toMatch(/^http:\/\/127\.0\.0\.1:\d+$/); + expect(info.token).toBe("test-token"); + expect(service.isServerRunning()).toBe(true); + + // Verify lockfile was created + const lockPath = path.join(tempDir, "server.lock"); + const lockContent = await fs.readFile(lockPath, "utf-8"); + const lockData = ServerLockDataSchema.parse(JSON.parse(lockContent)); + expect(lockData.baseUrl).toBe(info.baseUrl); + expect(lockData.token).toBe("test-token"); + + // Verify server is actually listening + const port = parseInt(info.baseUrl.split(":")[2], 10); + expect(await isPortListening(port)).toBe(true); + } finally { + await service.stopServer(); + } + }); +}); diff --git a/src/node/services/serverService.ts b/src/node/services/serverService.ts index f7106315f..d44dbf3f6 100644 --- a/src/node/services/serverService.ts +++ b/src/node/services/serverService.ts @@ -1,5 +1,33 @@ +import { createOrpcServer, type OrpcServer, type OrpcServerOptions } from "@/node/orpc/server"; +import { ServerLockfile } from "./serverLockfile"; +import type { ORPCContext } from "@/node/orpc/context"; +import type { AppRouter } from "@/node/orpc/router"; + +export interface ServerInfo { + baseUrl: string; + token: string; +} + +export interface StartServerOptions { + /** Path to mux home directory (for lockfile) */ + muxHome: string; + /** oRPC context with services */ + context: ORPCContext; + /** Auth token for the server */ + authToken: string; + /** Port to bind to (0 = random) */ + port?: number; + /** Optional pre-created router (if not provided, creates router(authToken)) */ + router?: AppRouter; + /** Whether to serve static files */ + serveStatic?: boolean; +} + export class ServerService { private launchProjectPath: string | null = null; + private server: OrpcServer | null = null; + private lockfile: ServerLockfile | null = null; + private serverInfo: ServerInfo | null = null; /** * Set the launch project path @@ -14,4 +42,87 @@ export class ServerService { getLaunchProject(): Promise { return Promise.resolve(this.launchProjectPath); } + + /** + * Start the HTTP/WS API server. + * + * @throws Error if a server is already running (check lockfile first) + */ + async startServer(options: StartServerOptions): Promise { + if (this.server) { + throw new Error("Server already running in this process"); + } + + // Create lockfile instance for checking - don't store yet + const lockfile = new ServerLockfile(options.muxHome); + + // Check for existing server (another process) + const existing = await lockfile.read(); + if (existing) { + throw new Error( + `Another mux server is already running at ${existing.baseUrl} (PID: ${existing.pid})` + ); + } + + // Create the server (Electron always binds to 127.0.0.1) + const serverOptions: OrpcServerOptions = { + host: "127.0.0.1", + port: options.port ?? 0, + context: options.context, + authToken: options.authToken, + router: options.router, + serveStatic: options.serveStatic ?? false, + }; + + const server = await createOrpcServer(serverOptions); + + // Acquire the lockfile - clean up server if this fails + try { + await lockfile.acquire(server.baseUrl, options.authToken); + } catch (err) { + await server.close(); + throw err; + } + + // Only store references after successful acquisition - ensures stopServer + // won't delete another process's lockfile if we failed before acquiring + this.lockfile = lockfile; + this.server = server; + this.serverInfo = { + baseUrl: this.server.baseUrl, + token: options.authToken, + }; + + return this.serverInfo; + } + + /** + * Stop the HTTP/WS API server and release the lockfile. + */ + async stopServer(): Promise { + if (this.lockfile) { + await this.lockfile.release(); + this.lockfile = null; + } + if (this.server) { + await this.server.close(); + this.server = null; + } + this.serverInfo = null; + } + + /** + * Get information about the running server. + * Returns null if no server is running in this process. + */ + getServerInfo(): ServerInfo | null { + return this.serverInfo; + } + + /** + * Check if a server is running in this process. + */ + isServerRunning(): boolean { + return this.server !== null; + } }