diff --git a/src/everything/server/index.ts b/src/everything/server/index.ts index 312305e9..7ee3eed9 100644 --- a/src/everything/server/index.ts +++ b/src/everything/server/index.ts @@ -2,7 +2,7 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { InMemoryTaskStore, InMemoryTaskMessageQueue, -} from "@modelcontextprotocol/sdk/experimental"; +} from "@modelcontextprotocol/sdk/experimental/tasks"; import { setSubscriptionHandlers, stopSimulatedResourceUpdates, diff --git a/src/everything/tools/simulate-research-query.ts b/src/everything/tools/simulate-research-query.ts index 7853b7c4..85fd20f9 100644 --- a/src/everything/tools/simulate-research-query.ts +++ b/src/everything/tools/simulate-research-query.ts @@ -4,11 +4,10 @@ import { CallToolResult, GetTaskResult, Task, + ElicitResult, ElicitResultSchema, - ServerRequest, } from "@modelcontextprotocol/sdk/types.js"; -import { CreateTaskResult } from "@modelcontextprotocol/sdk/experimental"; -import type { AnySchema, SchemaOutput } from "@modelcontextprotocol/sdk/server/zod-compat.js"; +import { CreateTaskResult } from "@modelcontextprotocol/sdk/experimental/tasks"; // Tool input schema const SimulateResearchQuerySchema = z.object({ @@ -48,7 +47,11 @@ const researchStates = new Map(); /** * Runs the background research process. * Updates task status as it progresses through stages. - * If clarification is needed, sends elicitation request directly. + * If clarification is needed, attempts elicitation via sendRequest. + * + * Note: Elicitation only works on STDIO transport. On HTTP transport, + * sendRequest will fail and the task will use a default interpretation. + * Full HTTP support requires SDK PR #1210's elicitInputStream API. */ async function runResearchProcess( taskId: string, @@ -65,11 +68,8 @@ async function runResearchProcess( result: CallToolResult ) => Promise; }, - sendRequest: ( - request: ServerRequest, - resultSchema: U, - options?: { timeout?: number } - ) => Promise> + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sendRequest: any ): Promise { const state = researchStates.get(taskId); if (!state) return; @@ -86,56 +86,63 @@ async function runResearchProcess( // At synthesis stage (index 2), check if clarification is needed if (i === 2 && state.ambiguous && !state.clarification) { - // Update status to show we're requesting input + // Update status to show we're requesting input (spec SHOULD) await taskStore.updateTaskStatus( taskId, "input_required", `Found multiple interpretations for "${state.topic}". Requesting clarification...` ); - // Send elicitation directly and await response - const elicitationResult = await sendRequest( - { - method: "elicitation/create", - params: { - message: `The research query "${state.topic}" could have multiple interpretations. Please clarify what you're looking for:`, - requestedSchema: { - type: "object", - properties: { - interpretation: { - type: "string", - title: "Clarification", - description: "Which interpretation of the topic do you mean?", - oneOf: getInterpretationsForTopic(state.topic), + try { + // Try elicitation via sendRequest (works on STDIO, fails on HTTP) + const elicitResult: ElicitResult = await sendRequest( + { + method: "elicitation/create", + params: { + message: `The research query "${state.topic}" could have multiple interpretations. Please clarify what you're looking for:`, + requestedSchema: { + type: "object", + properties: { + interpretation: { + type: "string", + title: "Clarification", + description: "Which interpretation of the topic do you mean?", + oneOf: getInterpretationsForTopic(state.topic), + }, }, + required: ["interpretation"], }, - required: ["interpretation"], }, }, - }, - ElicitResultSchema, - { timeout: 5 * 60 * 1000 /* 5 minutes */ } - ); + ElicitResultSchema + ); - // Process elicitation response - if ( - elicitationResult.action === "accept" && - elicitationResult.content - ) { + // Process elicitation response + if (elicitResult.action === "accept" && elicitResult.content) { + state.clarification = + (elicitResult.content as { interpretation?: string }) + .interpretation || "User accepted without selection"; + } else if (elicitResult.action === "decline") { + state.clarification = "User declined - using default interpretation"; + } else { + state.clarification = "User cancelled - using default interpretation"; + } + } catch (error) { + // Elicitation failed (likely HTTP transport without streaming support) + // Use default interpretation and continue - task should still complete + console.warn( + `Elicitation failed for task ${taskId} (HTTP transport?):`, + error instanceof Error ? error.message : String(error) + ); state.clarification = - (elicitationResult.content as { interpretation?: string }) - .interpretation || "User accepted without selection"; - } else if (elicitationResult.action === "decline") { - state.clarification = "User declined - using default interpretation"; - } else { - state.clarification = "User cancelled - using default interpretation"; + "technical (default - elicitation unavailable on HTTP)"; } - // Resume with working status + // Resume with working status (spec SHOULD) await taskStore.updateTaskStatus( taskId, "working", - `Received clarification: "${state.clarification}". Continuing...` + `Continuing with interpretation: "${state.clarification}"...` ); // Continue processing (no return - just keep going through the loop) @@ -185,9 +192,12 @@ This tool demonstrates MCP's task-based execution pattern for long-running opera ${state.clarification ? `**Elicitation Flow:** When the query was ambiguous, the server sent an \`elicitation/create\` request -directly to the client. The task status changed to \`input_required\` while -awaiting user input. After receiving clarification ("${state.clarification}"), -the task resumed processing and completed. +to the client. The task status changed to \`input_required\` while awaiting user input. +${state.clarification.includes("unavailable on HTTP") ? ` +**Note:** Elicitation was skipped because this server is running over HTTP transport. +The current SDK's \`sendRequest\` only works over STDIO. Full HTTP elicitation support +requires SDK PR #1210's streaming \`elicitInputStream\` API. +` : `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`} ` : ""} **Key Concepts:** - Tasks enable "call now, fetch later" patterns @@ -258,7 +268,7 @@ export const registerSimulateResearchQueryTool = (server: McpServer) => { researchStates.set(task.taskId, state); // Start background research (don't await - runs asynchronously) - // Pass sendRequest so elicitation can be sent directly from the background process + // Pass sendRequest for elicitation (works on STDIO, gracefully degrades on HTTP) runResearchProcess( task.taskId, validatedArgs, diff --git a/src/everything/tools/trigger-elicitation-request.ts b/src/everything/tools/trigger-elicitation-request.ts index 6281c87d..16eaac89 100644 --- a/src/everything/tools/trigger-elicitation-request.ts +++ b/src/everything/tools/trigger-elicitation-request.ts @@ -1,6 +1,5 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; -import { ElicitResultSchema } from "@modelcontextprotocol/sdk/types.js"; -import { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; +import { ElicitResultSchema, CallToolResult } from "@modelcontextprotocol/sdk/types.js"; // Tool configuration const name = "trigger-elicitation-request"; diff --git a/src/everything/transports/streamableHttp.ts b/src/everything/transports/streamableHttp.ts index 13ed2507..1c168f30 100644 --- a/src/everything/transports/streamableHttp.ts +++ b/src/everything/transports/streamableHttp.ts @@ -1,10 +1,37 @@ -import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; -import { InMemoryEventStore } from "@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js"; +import { StreamableHTTPServerTransport, EventStore } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import express, { Request, Response } from "express"; import { createServer } from "../server/index.js"; import { randomUUID } from "node:crypto"; import cors from "cors"; +// Simple in-memory event store for SSE resumability +class InMemoryEventStore implements EventStore { + private events: Map = new Map(); + + async storeEvent(streamId: string, message: unknown): Promise { + const eventId = randomUUID(); + this.events.set(eventId, { streamId, message }); + return eventId; + } + + async replayEventsAfter( + lastEventId: string, + { send }: { send: (eventId: string, message: unknown) => Promise } + ): Promise { + const entries = Array.from(this.events.entries()); + const startIndex = entries.findIndex(([id]) => id === lastEventId); + if (startIndex === -1) return lastEventId; + + let lastId: string = lastEventId; + for (let i = startIndex + 1; i < entries.length; i++) { + const [eventId, { message }] = entries[i]; + await send(eventId, message); + lastId = eventId; + } + return lastId; + } +} + console.log("Starting Streamable HTTP server..."); // Express app with permissive CORS for testing with Inspector direct connect mode