|
1 | 1 | import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; |
| 2 | +import { StreamableHTTPServerTransport, StreamableHTTPServerTransportOptions } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; |
2 | 3 | import express, { Request, Response, NextFunction } from "express"; |
3 | 4 | import session from 'express-session'; |
4 | 5 | import { ServerResponse } from "node:http"; // Import ServerResponse |
@@ -483,6 +484,7 @@ if (enableAdminUI) { |
483 | 484 |
|
484 | 485 |
|
485 | 486 | const sseTransports = new Map<string, SSEServerTransport>(); |
| 487 | +const streamableHttpTransports = new Map<string, StreamableHTTPServerTransport>(); |
486 | 488 |
|
487 | 489 | app.get("/sse", async (req, res) => { |
488 | 490 | const clientId = req.ip || `client-${Date.now()}`; |
@@ -603,6 +605,122 @@ app.get("/sse", async (req, res) => { |
603 | 605 | } |
604 | 606 | }); |
605 | 607 |
|
| 608 | +app.all("/mcp", async (req, res) => { |
| 609 | + const clientId = req.ip || `mcp-client-${Date.now()}`; |
| 610 | + console.log(`[${clientId}] MCP connection received for ${req.method} ${req.originalUrl}`); |
| 611 | + |
| 612 | + // Authentication (similar to /sse) |
| 613 | + if (authEnabled) { |
| 614 | + let authenticated = false; |
| 615 | + const authHeader = req.headers['authorization'] as string | undefined; |
| 616 | + if (authHeader && authHeader.startsWith('Bearer ')) { |
| 617 | + const token = authHeader.substring('Bearer '.length).trim(); |
| 618 | + if (allowedTokens.has(token)) { |
| 619 | + console.log(`[${clientId}] Authorized MCP connection using Bearer Token.`); |
| 620 | + authenticated = true; |
| 621 | + } else { |
| 622 | + console.warn(`[${clientId}] Unauthorized MCP connection attempt. Invalid Bearer Token.`); |
| 623 | + } |
| 624 | + } |
| 625 | + |
| 626 | + if (!authenticated && allowedKeys.size > 0) { |
| 627 | + const headerKey = req.headers['x-api-key'] as string | undefined; |
| 628 | + // MCP spec does not mention query param for API key, but we can keep it for consistency if desired. |
| 629 | + const queryKey = req.query.key as string | undefined; |
| 630 | + const providedKey = headerKey || queryKey; |
| 631 | + // const providedKey = headerKey; |
| 632 | + |
| 633 | + if (providedKey && allowedKeys.has(providedKey)) { |
| 634 | + console.log(`[${clientId}] Authorized MCP connection using X-API-Key header.`); |
| 635 | + authenticated = true; |
| 636 | + } else if (providedKey) { |
| 637 | + console.warn(`[${clientId}] Unauthorized MCP connection attempt. Invalid API Key in header.`); |
| 638 | + } |
| 639 | + } |
| 640 | + |
| 641 | + if (!authenticated) { |
| 642 | + console.warn(`[${clientId}] Unauthorized MCP connection attempt. No valid credentials provided.`); |
| 643 | + // For POST requests with invalid/missing auth, if they are not initialization, |
| 644 | + // the SDK's transport might handle specific MCP errors. |
| 645 | + // For GET, a 401 is appropriate. |
| 646 | + if (req.method === "GET") { |
| 647 | + res.status(401).send('Unauthorized'); |
| 648 | + return; |
| 649 | + } |
| 650 | + // For POST, let the transport handle it if it's an MCP message, |
| 651 | + // otherwise, if it's not an MCP message (e.g. random POST), a 401 is also fine. |
| 652 | + // The SDK's transport will produce more specific errors if it's an MCP request |
| 653 | + // without a session ID when one is required. |
| 654 | + } |
| 655 | + } |
| 656 | + |
| 657 | + // Session ID handling for Streamable HTTP |
| 658 | + // The transport itself manages session IDs based on its configuration. |
| 659 | + // We need to find or create a transport instance. |
| 660 | + // For Streamable HTTP, a single transport instance can handle multiple "sessions" |
| 661 | + // if configured to do so (e.g. by using a session ID generator). |
| 662 | + // Or, it can be stateless. |
| 663 | + |
| 664 | + // For simplicity in this proxy, we'll create one main StreamableHTTPServerTransport |
| 665 | + // and let it handle session logic internally based on its options. |
| 666 | + // We'll use a fixed key "main_mcp_transport" for our map. |
| 667 | + const transportKey = "main_mcp_transport"; |
| 668 | + let httpTransport = streamableHttpTransports.get(transportKey); |
| 669 | + |
| 670 | + if (!httpTransport) { |
| 671 | + console.log(`[${clientId}] Creating new StreamableHTTPServerTransport instance.`); |
| 672 | + const transportOptions: StreamableHTTPServerTransportOptions = { |
| 673 | + // sessionIdGenerator: () => crypto.randomUUID(), // Enable stateful sessions |
| 674 | + sessionIdGenerator: undefined, // Start with stateless for simplicity, as per example |
| 675 | + onsessioninitialized: (sessionId) => { |
| 676 | + console.log(`[${clientId}] MCP Session initialized: ${sessionId}`); |
| 677 | + }, |
| 678 | + // enableJsonResponse: false, // Default is false (SSE preferred for streaming) |
| 679 | + }; |
| 680 | + httpTransport = new StreamableHTTPServerTransport(transportOptions); |
| 681 | + |
| 682 | + // Connect this transport to the main MCP server instance |
| 683 | + try { |
| 684 | + await server.connect(httpTransport); |
| 685 | + streamableHttpTransports.set(transportKey, httpTransport); |
| 686 | + console.log(`[${clientId}] New StreamableHTTPServerTransport connected to server and stored.`); |
| 687 | + |
| 688 | + httpTransport.onclose = () => { |
| 689 | + console.log(`[${clientId}] StreamableHTTPServerTransport (main) closed.`); |
| 690 | + streamableHttpTransports.delete(transportKey); |
| 691 | + // Re-create and re-connect if needed, or handle as a permanent closure. |
| 692 | + }; |
| 693 | + httpTransport.onerror = (error: Error) => { |
| 694 | + console.error(`[${clientId}] StreamableHTTPServerTransport (main) error:`, error); |
| 695 | + streamableHttpTransports.delete(transportKey); |
| 696 | + }; |
| 697 | + |
| 698 | + } catch (connectError) { |
| 699 | + console.error(`[${clientId}] Failed to connect new StreamableHTTPServerTransport to server:`, connectError); |
| 700 | + if (!res.headersSent) { |
| 701 | + res.status(500).send("Failed to initialize MCP transport"); |
| 702 | + } |
| 703 | + return; |
| 704 | + } |
| 705 | + } else { |
| 706 | + console.log(`[${clientId}] Using existing StreamableHTTPServerTransport instance.`); |
| 707 | + } |
| 708 | + |
| 709 | + try { |
| 710 | + // Pass the request and response to the transport's handler |
| 711 | + // The SDK transport will handle GET, POST, DELETE appropriately. |
| 712 | + await httpTransport.handleRequest(req, res, req.body); // req.body is already parsed by express.json() |
| 713 | + console.log(`[${clientId}] StreamableHTTPServerTransport successfully handled ${req.method} request.`); |
| 714 | + } catch (error: any) { |
| 715 | + console.error(`[${clientId}] Error in StreamableHTTPServerTransport.handleRequest:`, error); |
| 716 | + if (!res.headersSent) { |
| 717 | + // The transport might have already sent an error response. |
| 718 | + // If not, send a generic one. |
| 719 | + res.status(500).send({ error: "Failed to process MCP request via transport" }); |
| 720 | + } |
| 721 | + } |
| 722 | +}); |
| 723 | + |
606 | 724 | // Removed GET /message?action=new_session endpoint as it's deemed unnecessary. |
607 | 725 | // The client should rely on the sessionId provided by the 'endpoint' event from the /sse connection. |
608 | 726 |
|
@@ -648,9 +766,14 @@ const shutdown = async (signal: string) => { |
648 | 766 | console.log(`\nReceived ${signal}. Shutting down gracefully...`); |
649 | 767 | try { |
650 | 768 | console.log("Closing MCP Server (disconnecting transports)..."); |
651 | | - await server.close(); |
| 769 | + await server.close(); // This will call close on all connected transports (SSE and HTTP) |
652 | 770 | console.log("MCP Server closed."); |
653 | 771 |
|
| 772 | + // streamableHttpTransports are closed by server.close() if they were connected. |
| 773 | + // Explicitly clear the map if needed, though server.close() should handle their disconnection. |
| 774 | + streamableHttpTransports.clear(); |
| 775 | + console.log("Streamable HTTP transports cleared/closed."); |
| 776 | + |
654 | 777 | console.log("Cleaning up backend clients..."); |
655 | 778 | await cleanup(); |
656 | 779 | console.log("Backend clients cleaned up."); |
|
0 commit comments