Skip to content

Commit 3c5ade4

Browse files
committed
Refactor: Modify /mcp endpoint for concurrent session handling
The /mcp endpoint in src/sse.ts has been updated to dynamically create and manage StreamableHTTPServerTransport instances for each client session. This allows the /mcp endpoint to handle concurrent connections with independent session IDs, similar to the /sse endpoint. Each new transport instance now uses its own sessionId generated via crypto.randomUUID() and updates an internal map accordingly. Error and close handlers are also set up for each dynamic transport.
1 parent 64e2c74 commit 3c5ade4

File tree

4 files changed

+165
-87
lines changed

4 files changed

+165
-87
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ browser-*
1212
chatmcp
1313
typescript-sdk
1414
inspector
15+
MCP-protocol.txt
1516
tools/**
1617
config/mcp_server.json
1718
config/tool_config.json

config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: "MCP Proxy Server"
2-
version: "0.3.1"
2+
version: "0.3.2"
33
slug: "mcp_proxy_server"
44
description: "A central hub for Model Context Protocol (MCP) servers. Manages multiple backend MCP servers (Stdio/SSE), exposing their combined tools and resources via a unified SSE interface or as a Stdio server. Features Web UI for server/tool management, real-time installation monitoring, and optional web terminal."
55
arch:

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "mcp-proxy-server",
3-
"version": "0.3.1",
3+
"version": "0.3.2",
44
"author": "ptbsare",
55
"license": "MIT",
66
"description": "An MCP proxy server that aggregates and serves multiple MCP resource servers through a single interface with stdio/sse support",

src/sse.ts

Lines changed: 162 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -39,44 +39,15 @@ const SECRET_FILE_PATH = path.resolve(__dirname, '..', 'config', '.session_secre
3939
const publicPath = path.join(__dirname, '..', 'public');
4040

4141
const sseTransports = new Map<string, SSEServerTransport>();
42-
const streamableHttpTransports = new Map<string, StreamableHTTPServerTransport>(); // Define this map earlier
42+
const streamableHttpTransports = new Map<string, StreamableHTTPServerTransport>();
4343

4444
// createServer no longer returns connectedClients
4545
const { server, cleanup } = await createServer();
4646

47-
// Create and connect the main StreamableHTTPServerTransport for the /mcp endpoint at startup
48-
const mcpEndpointTransportKey = "main_mcp_transport";
49-
const mcpTransportOptions: StreamableHTTPServerTransportOptions = {
50-
sessionIdGenerator: undefined, // Stateless, as per current setup
51-
onsessioninitialized: (sessionId: string) => { // Added type for sessionId
52-
// This should not be called if sessionIdGenerator is undefined
53-
console.log(`[MCP Endpoint] Main transport session initialized: ${sessionId}`);
54-
},
55-
enableJsonResponse: false, // Revert to default SSE streaming for POST responses
56-
};
57-
const mainHttpTransport = new StreamableHTTPServerTransport(mcpTransportOptions);
58-
59-
try {
60-
await server.connect(mainHttpTransport);
61-
streamableHttpTransports.set(mcpEndpointTransportKey, mainHttpTransport);
62-
console.log("Main StreamableHTTPServerTransport for /mcp endpoint connected and ready.");
63-
64-
// Standard onclose and onerror handlers
65-
mainHttpTransport.onclose = () => {
66-
console.log("Main StreamableHTTPServerTransport for /mcp endpoint closed."); // Restored simpler log
67-
streamableHttpTransports.delete(mcpEndpointTransportKey);
68-
};
69-
mainHttpTransport.onerror = (error: Error) => {
70-
console.error("Main StreamableHTTPServerTransport for /mcp endpoint error:", error); // Restored simpler log
71-
streamableHttpTransports.delete(mcpEndpointTransportKey);
72-
};
73-
74-
} catch (e) {
75-
console.error("FATAL: Could not connect main StreamableHTTPServerTransport for /mcp endpoint at startup:", e);
76-
process.exit(1); // Exit if the main transport cannot be set up
77-
}
47+
// No longer creating a single mainHttpTransport at startup for /mcp.
48+
// Transports for /mcp will be created dynamically per session.
7849

79-
const allowedKeysRaw = process.env.ALLOWED_KEYS || ""; // Renamed
50+
const allowedKeysRaw = process.env.ALLOWED_KEYS || "";
8051
const allowedKeys = new Set(allowedKeysRaw.split(',').map(k => k.trim()).filter(k => k.length > 0));
8152

8253
const allowedTokensRaw = process.env.ALLOWED_TOKENS || ""; // Renamed
@@ -655,93 +626,199 @@ app.get("/sse", async (req, res) => {
655626
// Removed GET /message?action=new_session endpoint as it's deemed unnecessary.
656627
// The client should rely on the sessionId provided by the 'endpoint' event from the /sse connection.
657628

658-
app.post("/mcp", async (req, res) => {
629+
app.all("/mcp", async (req, res) => { // Changed to app.all to handle GET for SSE and POST for messages
659630
const clientId = req.ip || `client-http-${Date.now()}`;
660-
console.log(`[${clientId}] Received POST request on /mcp`);
631+
console.log(`[${clientId}] Received ${req.method} request on /mcp`);
661632

662633
// Authentication check (similar to /sse)
663-
if (authEnabled) { // authEnabled is defined globally
634+
if (authEnabled) {
664635
let authenticated = false;
665-
666-
// 1. Check for Bearer Token in Authorization header
667636
const authHeader = req.headers['authorization'] as string | undefined;
668637
if (authHeader && authHeader.startsWith('Bearer ')) {
669638
const token = authHeader.substring('Bearer '.length).trim();
670-
if (allowedTokens.has(token)) { // allowedTokens is defined globally
639+
if (allowedTokens.has(token)) {
671640
console.log(`[${clientId}] Authorized /mcp connection using Bearer Token.`);
672641
authenticated = true;
673642
} else {
674-
console.warn(`[${clientId}] Unauthorized /mcp connection attempt. Invalid Bearer Token.`);
643+
console.warn(`[${clientId}] Unauthorized /mcp (Bearer) for ${req.method}. Invalid Token.`);
675644
}
676645
}
677-
678-
// 2. If not authenticated by Bearer Token, check for API Key
679-
if (!authenticated && allowedKeys.size > 0) { // allowedKeys is defined globally
646+
if (!authenticated && allowedKeys.size > 0) {
680647
const headerKey = req.headers['x-api-key'] as string | undefined;
681648
const queryKey = req.query.key as string | undefined;
682649
const providedKey = headerKey || queryKey;
683-
684650
if (providedKey && allowedKeys.has(providedKey)) {
685651
console.log(`[${clientId}] Authorized /mcp connection using ${headerKey ? 'header' : 'query'} API Key.`);
686652
authenticated = true;
687653
} else if (providedKey) {
688-
console.warn(`[${clientId}] Unauthorized /mcp connection attempt. Invalid API Key.`);
654+
console.warn(`[${clientId}] Unauthorized /mcp (API Key) for ${req.method}. Invalid Key.`);
689655
}
690656
}
691-
692-
// If authentication is enabled but no valid credentials were provided
693657
if (!authenticated) {
694-
console.warn(`[${clientId}] Unauthorized /mcp connection attempt. No valid credentials provided.`);
695-
res.status(401).send('Unauthorized'); // Send 401 and return
658+
console.warn(`[${clientId}] Unauthorized /mcp for ${req.method}. No valid credentials.`);
659+
res.status(401).send('Unauthorized');
696660
return;
697661
}
698662
}
699663

700-
// Use the pre-initialized mainHttpTransport
701-
const httpTransport = streamableHttpTransports.get(mcpEndpointTransportKey);
664+
let httpTransport: StreamableHTTPServerTransport | undefined;
665+
const clientProvidedSessionId = req.headers['mcp-session-id'] as string | undefined;
666+
let transportSessionIdToUse: string | undefined = clientProvidedSessionId;
667+
668+
if (clientProvidedSessionId) {
669+
httpTransport = streamableHttpTransports.get(clientProvidedSessionId);
670+
if (!httpTransport) {
671+
console.warn(`[${clientId}] /mcp: Client provided Mcp-Session-Id '${clientProvidedSessionId}', but no active transport found. Responding 404.`);
672+
if (!res.headersSent) {
673+
res.status(404).json({
674+
jsonrpc: "2.0",
675+
error: { code: -32000, message: `Session not found for Mcp-Session-Id: ${clientProvidedSessionId}` },
676+
id: (req.body as any)?.id ?? null
677+
});
678+
}
679+
return;
680+
}
681+
console.log(`[${clientId}] /mcp: Using existing transport for Mcp-Session-Id: ${clientProvidedSessionId}`);
682+
} else {
683+
// No Mcp-Session-Id from client, or it's an InitializeRequest that might not have one yet.
684+
// Create a new transport. The transport itself will generate a session ID.
685+
console.log(`[${clientId}] /mcp: No Mcp-Session-Id from client, or new session. Creating new StreamableHTTPServerTransport.`);
686+
const tempGeneratedIdForEarlyMap = `pending-${crypto.randomBytes(8).toString('hex')}`;
687+
let capturedHttpTransportInstance: StreamableHTTPServerTransport | null = null; // To ensure closure captures the correct instance
688+
689+
const newTransportOptions: StreamableHTTPServerTransportOptions = {
690+
sessionIdGenerator: () => crypto.randomUUID(), // Use crypto.randomUUID for session ID generation
691+
enableJsonResponse: false,
692+
onsessioninitialized: (sdkGeneratedSessionId: string) => {
693+
console.log(`[${clientId}] /mcp: SDK 'onsessioninitialized' called. SDK Session ID: ${sdkGeneratedSessionId}`);
694+
if (capturedHttpTransportInstance) {
695+
// The SDK has now initialized the session and `capturedHttpTransportInstance.sessionId` should be set.
696+
// Verify it matches sdkGeneratedSessionId for sanity.
697+
if (capturedHttpTransportInstance.sessionId !== sdkGeneratedSessionId) {
698+
console.warn(`[${clientId}] /mcp: Discrepancy! sdkGeneratedSessionId (${sdkGeneratedSessionId}) vs transport.sessionId (${capturedHttpTransportInstance.sessionId}). Using sdkGeneratedSessionId.`);
699+
}
700+
const finalSessionId = sdkGeneratedSessionId; // Use the ID from the callback
701+
702+
if (streamableHttpTransports.has(tempGeneratedIdForEarlyMap)) {
703+
const transportInstanceFromMap = streamableHttpTransports.get(tempGeneratedIdForEarlyMap);
704+
if (transportInstanceFromMap === capturedHttpTransportInstance) {
705+
streamableHttpTransports.delete(tempGeneratedIdForEarlyMap);
706+
streamableHttpTransports.set(finalSessionId, capturedHttpTransportInstance);
707+
if (transportSessionIdToUse === tempGeneratedIdForEarlyMap) {
708+
transportSessionIdToUse = finalSessionId;
709+
}
710+
console.log(`[${clientId}] /mcp: Transport map updated. Temp ID '${tempGeneratedIdForEarlyMap}' replaced with final '${finalSessionId}'. Active: ${streamableHttpTransports.size}`);
711+
} else {
712+
console.error(`[${clientId}] /mcp: Mismatch during onsessioninitialized! Temp ID ${tempGeneratedIdForEarlyMap} found but instance differs.`);
713+
if (!streamableHttpTransports.has(finalSessionId) || streamableHttpTransports.get(finalSessionId) !== capturedHttpTransportInstance) {
714+
streamableHttpTransports.set(finalSessionId, capturedHttpTransportInstance);
715+
console.warn(`[${clientId}] /mcp: Force-mapped transport with final ID '${finalSessionId}' due to instance mismatch.`);
716+
}
717+
}
718+
} else {
719+
if (!streamableHttpTransports.has(finalSessionId) || streamableHttpTransports.get(finalSessionId) !== capturedHttpTransportInstance) {
720+
streamableHttpTransports.set(finalSessionId, capturedHttpTransportInstance);
721+
if (transportSessionIdToUse === tempGeneratedIdForEarlyMap) {
722+
transportSessionIdToUse = finalSessionId;
723+
}
724+
console.log(`[${clientId}] /mcp: Transport (re)added to map with final ID '${finalSessionId}' (temp not found or instance check). Active: ${streamableHttpTransports.size}`);
725+
}
726+
}
727+
} else {
728+
console.error(`[${clientId}] /mcp: onsessioninitialized called but capturedHttpTransportInstance is null. SDK SessionId: ${sdkGeneratedSessionId}`);
729+
}
730+
},
731+
};
732+
733+
httpTransport = new StreamableHTTPServerTransport(newTransportOptions);
734+
capturedHttpTransportInstance = httpTransport; // Capture for the onsessioninitialized closure
735+
736+
// Store with a temporary ID. This will be updated by onsessioninitialized when the SDK provides the actual session ID.
737+
transportSessionIdToUse = tempGeneratedIdForEarlyMap;
738+
streamableHttpTransports.set(tempGeneratedIdForEarlyMap, httpTransport);
739+
console.log(`[${clientId}] /mcp: New transport created. Stored with temp ID: ${tempGeneratedIdForEarlyMap}. Active transports: ${streamableHttpTransports.size}`);
740+
741+
const currentTransportForHandlers = httpTransport; // Use this specific instance in handlers
742+
743+
currentTransportForHandlers.onerror = (error: Error) => {
744+
// Use currentTransportForHandlers.sessionId if available, otherwise fallback to transportSessionIdToUse (which might be temp or final)
745+
const idToClean = currentTransportForHandlers.sessionId || transportSessionIdToUse;
746+
console.error(`[${clientId}] /mcp: StreamableHTTPServerTransport error for session related to ${idToClean}:`, error);
747+
748+
if (streamableHttpTransports.get(tempGeneratedIdForEarlyMap) === currentTransportForHandlers) {
749+
streamableHttpTransports.delete(tempGeneratedIdForEarlyMap);
750+
}
751+
if (currentTransportForHandlers.sessionId && streamableHttpTransports.get(currentTransportForHandlers.sessionId) === currentTransportForHandlers) {
752+
streamableHttpTransports.delete(currentTransportForHandlers.sessionId);
753+
}
754+
console.log(`[${clientId}] /mcp: Transport for session related to ${idToClean} removed due to error. Active: ${streamableHttpTransports.size}`);
755+
};
756+
757+
currentTransportForHandlers.onclose = () => {
758+
const idToClean = currentTransportForHandlers.sessionId || transportSessionIdToUse;
759+
console.log(`[${clientId}] /mcp: StreamableHTTPServerTransport closed for session related to ${idToClean}.`);
760+
if (streamableHttpTransports.get(tempGeneratedIdForEarlyMap) === currentTransportForHandlers) {
761+
streamableHttpTransports.delete(tempGeneratedIdForEarlyMap);
762+
}
763+
if (currentTransportForHandlers.sessionId && streamableHttpTransports.get(currentTransportForHandlers.sessionId) === currentTransportForHandlers) {
764+
streamableHttpTransports.delete(currentTransportForHandlers.sessionId);
765+
}
766+
console.log(`[${clientId}] /mcp: Transport for session related to ${idToClean} removed on close. Active: ${streamableHttpTransports.size}`);
767+
};
768+
769+
try {
770+
await server.connect(currentTransportForHandlers);
771+
console.log(`[${clientId}] /mcp: New transport (temp ID: ${transportSessionIdToUse}, awaiting final SDK sessionId) connected to server.`);
772+
} catch (connectError: any) {
773+
console.error(`[${clientId}] /mcp: Failed to connect new transport to server:`, connectError);
774+
streamableHttpTransports.delete(tempGeneratedIdForEarlyMap); // Clean up temp entry
775+
if (!res.headersSent) {
776+
res.status(500).json({
777+
jsonrpc: "2.0",
778+
error: { code: -32001, message: `Failed to connect new MCP transport: ${connectError.message}` },
779+
id: (req.body as any)?.id ?? null
780+
});
781+
}
782+
return;
783+
}
784+
}
702785

703786
if (!httpTransport) {
704-
console.error(`[${clientId}] FATAL: Main StreamableHTTPServerTransport for /mcp not found during request! This should have been initialized at startup.`);
787+
// This case should ideally be caught earlier if clientProvidedSessionId was present but not found.
788+
// If it's a new session and httpTransport somehow didn't get created.
789+
console.error(`[${clientId}] /mcp: Transport is unexpectedly undefined before handling request.`);
705790
if (!res.headersSent) {
706-
res.status(500).send("MCP transport not available");
791+
res.status(500).json({
792+
jsonrpc: "2.0",
793+
error: { code: -32002, message: "MCP transport not available for session." },
794+
id: (req.body as any)?.id ?? null
795+
});
707796
}
708797
return;
709798
}
710-
711-
// The mainHttpTransport's onmessage, onerror, onclose are already set up during startup
712-
// and connected to the main 'server' instance.
713-
// We don't need to (and shouldn't) override them here.
714799

715-
console.log(`[${clientId}] About to call mainHttpTransport.handleRequest for ${req.method} ${req.originalUrl}`);
800+
console.log(`[${clientId}] /mcp: About to call transport.handleRequest for session ${transportSessionIdToUse || httpTransport.sessionId} - Method: ${req.method}`);
716801
try {
717-
// Handle the incoming HTTP request using the pre-configured mainHttpTransport.
718-
// This will parse the body and, if successful, trigger the mainHttpTransport.onmessage
719-
// (which is our wrapper that calls the Server instance's handler).
720-
await httpTransport.handleRequest(req, res, req.body);
721-
722-
// The response stream (res) is now managed by httpTransport.
723-
// It will send SSE events (or a direct JSON if it were configured for it)
724-
// and will end the response when appropriate.
725-
console.log(`[${clientId}] mainHttpTransport.handleRequest completed for ${req.method} ${req.originalUrl}. Response stream is now managed by the transport.`);
726-
802+
// The SDK's StreamableHTTPServerTransport.handleRequest should:
803+
// - For new sessions (e.g., on InitializeRequest), establish the session,
804+
// generate/obtain a session ID, and ensure Mcp-Session-Id header is in the response.
805+
// - For existing sessions, use the provided Mcp-Session-Id.
806+
// - Handle both POST (for client messages) and GET (for server-initiated SSE streams).
807+
await httpTransport.handleRequest(req, res, req.body);
808+
console.log(`[${clientId}] /mcp: transport.handleRequest completed for session ${transportSessionIdToUse || httpTransport.sessionId}. Response stream managed by transport.`);
727809
} catch (error: any) {
728-
console.error(`[${clientId}] Error during StreamableHTTP Transport handling:`, error);
729-
// If an error occurs *before* the transport takes over the response,
730-
// we need to send an error response here.
731-
if (!res.headersSent) {
732-
res.writeHead(500, { 'Content-Type': 'application/json' });
733-
res.end(JSON.stringify({
734-
jsonrpc: "2.0",
735-
error: {
736-
code: -32603, // Internal error
737-
message: `Internal server error: ${error.message || error}`
738-
},
739-
id: (req.body as any)?.id ?? null // Include original request id if available
740-
}) + '\n');
741-
} else if (!res.writableEnded) {
742-
// If headers were sent but an error occurred, just end the stream
743-
res.end();
744-
}
810+
const idToLog = transportSessionIdToUse || httpTransport.sessionId;
811+
console.error(`[${clientId}] /mcp: Error during transport.handleRequest for session ${idToLog}:`, error);
812+
if (!res.headersSent) {
813+
res.writeHead(500, { 'Content-Type': 'application/json' });
814+
res.end(JSON.stringify({
815+
jsonrpc: "2.0",
816+
error: { code: -32603, message: `Internal server error during MCP request handling: ${error.message || error}` },
817+
id: (req.body as any)?.id ?? null
818+
}) + '\n');
819+
} else if (!res.writableEnded) {
820+
res.end();
821+
}
745822
}
746823
});
747824
app.post("/message", async (req, res) => {

0 commit comments

Comments
 (0)