diff --git a/package-lock.json b/package-lock.json index c419eda6..11bffb72 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1862,7 +1862,6 @@ "resolved": "https://registry.npmjs.org/express/-/express-5.2.1.tgz", "integrity": "sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==", "license": "MIT", - "peer": true, "dependencies": { "accepts": "^2.0.0", "body-parser": "^2.2.1", @@ -3390,7 +3389,6 @@ "integrity": "sha512-j3lYzGC3P+B5Yfy/pfKNgVEg4+UtcIJcVRt2cDjIOmhLourAqPqf8P7acgxeiSgUB7E3p2P8/3gNIgDLpwzs4g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.21.3", "postcss": "^8.4.43", @@ -3474,7 +3472,6 @@ "integrity": "sha512-MSmPM9REYqDGBI8439mA4mWhV5sKmDlBKWIYbA3lRb2PTHACE0mgKwA8yQ2xq9vxDTuk4iPrECBAEW2aoFXY0Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/expect": "2.1.9", "@vitest/mocker": "2.1.9", @@ -3644,7 +3641,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/src/everything/index.ts b/src/everything/index.ts index 801fe721..d755c408 100644 --- a/src/everything/index.ts +++ b/src/everything/index.ts @@ -10,15 +10,15 @@ async function run() { switch (scriptName) { case 'stdio': // Import and run the default server - await import('./stdio.js'); + await import('./transports/stdio.js'); break; case 'sse': // Import and run the SSE server - await import('./sse.js'); + await import('./transports/sse.js'); break; case 'streamableHttp': // Import and run the streamable HTTP server - await import('./streamableHttp.js'); + await import('./transports/streamableHttp.js'); break; default: console.error(`Unknown script: ${scriptName}`); diff --git a/src/everything/everything.ts b/src/everything/server/everything.ts similarity index 100% rename from src/everything/everything.ts rename to src/everything/server/everything.ts diff --git a/src/everything/server/index.ts b/src/everything/server/index.ts new file mode 100644 index 00000000..6781c165 --- /dev/null +++ b/src/everything/server/index.ts @@ -0,0 +1,50 @@ +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { registerTools } from "../tools/index.js"; +import { dirname, join } from "path"; +import { readFileSync } from "fs"; +import { fileURLToPath } from "url"; +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); +const instructions = readInstructions(); + +// Create the MCP resource server +export const createServer = () => { + const server = new McpServer( + { + name: "mcp-servers/everything", + title: "Everything Reference Server", + version: "2.0.0", + }, + { + capabilities: { + tools: {}, + logging: {}, + prompts: {}, + resources: { + subscribe: true, + } + }, + instructions, + }, + ); + + // Register the tools + registerTools(server); + + return { + server, + cleanup: () => {}, + startNotificationIntervals: (sessionId?: string) => {} + }; +}; + +function readInstructions(): string { + let instructions; + + try { + instructions = readFileSync(join(__dirname, "../instructions.md"), "utf-8"); + } catch (e) { + instructions = "Server instructions not loaded: " + e; + } + return instructions; +} diff --git a/src/everything/stdio.ts b/src/everything/stdio.ts deleted file mode 100644 index 102af4f1..00000000 --- a/src/everything/stdio.ts +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env node - -import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; -import { createServer } from "./everything.js"; - -console.error('Starting default (STDIO) server...'); - -async function main() { - const transport = new StdioServerTransport(); - const {server, cleanup, startNotificationIntervals} = createServer(); - - // Cleanup when client disconnects - server.onclose = async () => { - await cleanup(); - process.exit(0); - }; - - await server.connect(transport); - startNotificationIntervals(); - - // Cleanup on exit - process.on("SIGINT", async () => { - await server.close(); - }); -} - -main().catch((error) => { - console.error("Server error:", error); - process.exit(1); -}); - diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts deleted file mode 100644 index c5d0eeea..00000000 --- a/src/everything/streamableHttp.ts +++ /dev/null @@ -1,193 +0,0 @@ -import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; -import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js'; -import express, { Request, Response } from "express"; -import { createServer } from "./everything.js"; -import { randomUUID } from 'node:crypto'; -import cors from 'cors'; - -console.error('Starting Streamable HTTP server...'); - -const app = express(); -app.use(cors({ - "origin": "*", // use "*" with caution in production - "methods": "GET,POST,DELETE", - "preflightContinue": false, - "optionsSuccessStatus": 204, - "exposedHeaders": [ - 'mcp-session-id', - 'last-event-id', - 'mcp-protocol-version' - ] -})); // Enable CORS for all routes so Inspector can connect - -const transports: Map = new Map(); - -app.post('/mcp', async (req: Request, res: Response) => { - console.error('Received MCP POST request'); - try { - // Check for existing session ID - const sessionId = req.headers['mcp-session-id'] as string | undefined; - - let transport: StreamableHTTPServerTransport; - - if (sessionId && transports.has(sessionId)) { - // Reuse existing transport - transport = transports.get(sessionId)!; - } else if (!sessionId) { - - const { server, cleanup, startNotificationIntervals } = createServer(); - - // New initialization request - const eventStore = new InMemoryEventStore(); - transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - eventStore, // Enable resumability - onsessioninitialized: (sessionId: string) => { - // Store the transport by session ID when session is initialized - // This avoids race conditions where requests might come in before the session is stored - console.error(`Session initialized with ID: ${sessionId}`); - transports.set(sessionId, transport); - } - }); - - - // Set up onclose handler to clean up transport when closed - server.onclose = async () => { - const sid = transport.sessionId; - if (sid && transports.has(sid)) { - console.error(`Transport closed for session ${sid}, removing from transports map`); - transports.delete(sid); - await cleanup(); - } - }; - - // Connect the transport to the MCP server BEFORE handling the request - // so responses can flow back through the same transport - await server.connect(transport); - - await transport.handleRequest(req, res); - - // Wait until initialize is complete and transport will have a sessionId - startNotificationIntervals(transport.sessionId); - - return; // Already handled - } else { - // Invalid request - no session ID or not initialization request - res.status(400).json({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Bad Request: No valid session ID provided', - }, - id: req?.body?.id, - }); - return; - } - - // Handle the request with existing transport - no need to reconnect - // The existing transport is already connected to the server - await transport.handleRequest(req, res); - } catch (error) { - console.error('Error handling MCP request:', error); - if (!res.headersSent) { - res.status(500).json({ - jsonrpc: '2.0', - error: { - code: -32603, - message: 'Internal server error', - }, - id: req?.body?.id, - }); - return; - } - } -}); - -// Handle GET requests for SSE streams (using built-in support from StreamableHTTP) -app.get('/mcp', async (req: Request, res: Response) => { - console.error('Received MCP GET request'); - const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports.has(sessionId)) { - res.status(400).json({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Bad Request: No valid session ID provided', - }, - id: req?.body?.id, - }); - return; - } - - // Check for Last-Event-ID header for resumability - const lastEventId = req.headers['last-event-id'] as string | undefined; - if (lastEventId) { - console.error(`Client reconnecting with Last-Event-ID: ${lastEventId}`); - } else { - console.error(`Establishing new SSE stream for session ${sessionId}`); - } - - const transport = transports.get(sessionId); - await transport!.handleRequest(req, res); -}); - -// Handle DELETE requests for session termination (according to MCP spec) -app.delete('/mcp', async (req: Request, res: Response) => { - const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports.has(sessionId)) { - res.status(400).json({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Bad Request: No valid session ID provided', - }, - id: req?.body?.id, - }); - return; - } - - console.error(`Received session termination request for session ${sessionId}`); - - try { - const transport = transports.get(sessionId); - await transport!.handleRequest(req, res); - } catch (error) { - console.error('Error handling session termination:', error); - if (!res.headersSent) { - res.status(500).json({ - jsonrpc: '2.0', - error: { - code: -32603, - message: 'Error handling session termination', - }, - id: req?.body?.id, - }); - return; - } - } -}); - -// Start the server -const PORT = process.env.PORT || 3001; -app.listen(PORT, () => { - console.error(`MCP Streamable HTTP Server listening on port ${PORT}`); -}); - -// Handle server shutdown -process.on('SIGINT', async () => { - console.error('Shutting down server...'); - - // Close all active transports to properly clean up resources - for (const sessionId in transports) { - try { - console.error(`Closing transport for session ${sessionId}`); - await transports.get(sessionId)!.close(); - transports.delete(sessionId); - } catch (error) { - console.error(`Error closing transport for session ${sessionId}:`, error); - } - } - - console.error('Server shutdown complete'); - process.exit(0); -}); diff --git a/src/everything/tools/echo.ts b/src/everything/tools/echo.ts new file mode 100644 index 00000000..166d43ff --- /dev/null +++ b/src/everything/tools/echo.ts @@ -0,0 +1,23 @@ +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; +import { z } from "zod"; + +export const EchoSchema = z.object({ + message: z.string().describe("Message to echo"), +}); + +const name = "echo"; +const config = { + title: "Echo Tool", + description: "Echoes back the input string", + inputSchema: EchoSchema, +}; + +export const addToolEcho = (server: McpServer) => { + server.registerTool(name, config, async (args): Promise => { + const validatedArgs = EchoSchema.parse(args); + return { + content: [{ type: "text", text: `Echo: ${validatedArgs.message}` }], + }; + }); +}; diff --git a/src/everything/tools/index.ts b/src/everything/tools/index.ts new file mode 100644 index 00000000..3e58c34f --- /dev/null +++ b/src/everything/tools/index.ts @@ -0,0 +1,11 @@ +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { addToolEcho } from "./echo.js"; + + +/** + * Register the tools with the MCP server. + * @param server + */ +export const registerTools = (server: McpServer) => { + addToolEcho(server); +}; diff --git a/src/everything/sse.ts b/src/everything/transports/sse.ts similarity index 95% rename from src/everything/sse.ts rename to src/everything/transports/sse.ts index f5b984e9..26cb8efb 100644 --- a/src/everything/sse.ts +++ b/src/everything/transports/sse.ts @@ -1,6 +1,6 @@ import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import express from "express"; -import { createServer } from "./everything.js"; +import { createServer } from "../server/index.js"; import cors from 'cors'; console.error('Starting SSE server...'); @@ -35,7 +35,7 @@ app.get("/sse", async (req, res) => { startNotificationIntervals(transport.sessionId); // Handle close of connection - server.onclose = async () => { + server.server.onclose = async () => { console.error("Client Disconnected: ", transport.sessionId); transports.delete(transport.sessionId); await cleanup(); diff --git a/src/everything/transports/stdio.ts b/src/everything/transports/stdio.ts new file mode 100644 index 00000000..c5060f4e --- /dev/null +++ b/src/everything/transports/stdio.ts @@ -0,0 +1,24 @@ +#!/usr/bin/env node + +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import { createServer } from "../server/index.js"; + +console.error("Starting default (STDIO) server..."); + +async function main() { + const transport = new StdioServerTransport(); + const { server } = createServer(); + + await server.connect(transport); + + // Cleanup on exit + process.on("SIGINT", async () => { + await server.close(); + process.exit(0); + }); +} + +main().catch((error) => { + console.error("Server error:", error); + process.exit(1); +}); diff --git a/src/everything/transports/streamableHttp.ts b/src/everything/transports/streamableHttp.ts new file mode 100644 index 00000000..526db127 --- /dev/null +++ b/src/everything/transports/streamableHttp.ts @@ -0,0 +1,206 @@ +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { InMemoryEventStore } from "@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js"; +import express, { Request, Response } from "express"; +import { createServer } from "../server/index.js"; +import { randomUUID } from "node:crypto"; +import cors from "cors"; + +console.log("Starting Streamable HTTP server..."); + +const app = express(); +app.use( + cors({ + origin: "*", // use "*" with caution in production + methods: "GET,POST,DELETE", + preflightContinue: false, + optionsSuccessStatus: 204, + exposedHeaders: ["mcp-session-id", "last-event-id", "mcp-protocol-version"], + }), +); // Enable CORS for all routes so Inspector can connect + +const transports: Map = new Map< + string, + StreamableHTTPServerTransport +>(); + +app.post("/mcp", async (req: Request, res: Response) => { + console.log("Received MCP POST request"); + try { + // Check for existing session ID + const sessionId = req.headers["mcp-session-id"] as string | undefined; + + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports.has(sessionId)) { + // Reuse existing transport + transport = transports.get(sessionId)!; + } else if (!sessionId) { + const { server } = createServer(); + + // New initialization request + const eventStore = new InMemoryEventStore(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore, // Enable resumability + onsessioninitialized: (sessionId: string) => { + // Store the transport by session ID when session is initialized + // This avoids race conditions where requests might come in before the session is stored + console.log(`Session initialized with ID: ${sessionId}`); + transports.set(sessionId, transport); + }, + }); + + // Set up onclose handler to clean up transport when closed + server.server.onclose = async () => { + const sid = transport.sessionId; + if (sid && transports.has(sid)) { + console.log( + `Transport closed for session ${sid}, removing from transports map`, + ); + transports.delete(sid); + } + }; + + // Connect the transport to the MCP server BEFORE handling the request + // so responses can flow back through the same transport + await server.connect(transport); + + await transport.handleRequest(req, res); + + return; + } else { + // Invalid request - no session ID or not initialization request + res.status(400).json({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: No valid session ID provided", + }, + id: req?.body?.id, + }); + return; + } + + // Handle the request with existing transport - no need to reconnect + // The existing transport is already connected to the server + await transport.handleRequest(req, res); + } catch (error) { + console.log("Error handling MCP request:", error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: "2.0", + error: { + code: -32603, + message: "Internal server error", + }, + id: req?.body?.id, + }); + return; + } + } +}); + +// Handle GET requests for SSE streams (using built-in support from StreamableHTTP) +app.get("/mcp", async (req: Request, res: Response) => { + console.log("Received MCP GET request"); + const sessionId = req.headers["mcp-session-id"] as string | undefined; + if (!sessionId || !transports.has(sessionId)) { + res.status(400).json({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: No valid session ID provided", + }, + id: req?.body?.id, + }); + return; + } + + // Check for Last-Event-ID header for resumability + const lastEventId = req.headers["last-event-id"] as string | undefined; + if (lastEventId) { + console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`); + } else { + console.log(`Establishing new SSE stream for session ${sessionId}`); + } + + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); +}); + +// Handle DELETE requests for session termination (according to MCP spec) +app.delete("/mcp", async (req: Request, res: Response) => { + const sessionId = req.headers["mcp-session-id"] as string | undefined; + if (!sessionId || !transports.has(sessionId)) { + res.status(400).json({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: No valid session ID provided", + }, + id: req?.body?.id, + }); + return; + } + + console.log(`Received session termination request for session ${sessionId}`); + + try { + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); + } catch (error) { + console.log("Error handling session termination:", error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: "2.0", + error: { + code: -32603, + message: "Error handling session termination", + }, + id: req?.body?.id, + }); + return; + } + } +}); + +// Start the server +const PORT = process.env.PORT || 3001; +const server = app.listen(PORT, () => { + console.error(`MCP Streamable HTTP Server listening on port ${PORT}`); +}); + +server.on("error", (err: unknown) => { + const code = + typeof err === "object" && err !== null && "code" in err + ? (err as { code?: unknown }).code + : undefined; + if (code === "EADDRINUSE") { + console.error( + `Failed to start: Port ${PORT} is already in use. Set PORT to a free port or stop the conflicting process.`, + ); + } else { + console.error("HTTP server encountered an error while starting:", err); + } + // Ensure a non-zero exit so npm reports the failure instead of silently exiting + process.exit(1); +}); + +// Handle server shutdown +process.on("SIGINT", async () => { + console.log("Shutting down server..."); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports.get(sessionId)!.close(); + transports.delete(sessionId); + } catch (error) { + console.log(`Error closing transport for session ${sessionId}:`, error); + } + } + + console.log("Server shutdown complete"); + process.exit(0); +});