mirror of
https://github.com/modelcontextprotocol/servers.git
synced 2026-02-19 11:54:58 -05:00
Merge pull request #3193 from olaservo/add-tasks-support
feat(everything): add SEP-1686 Tasks support
This commit is contained in:
@@ -22,7 +22,10 @@
|
||||
- `trigger-long-running-operation` (tools/trigger-trigger-long-running-operation.ts): Simulates a multi-step operation over a given `duration` and number of `steps`; reports progress via `notifications/progress` when a `progressToken` is provided by the client.
|
||||
- `toggle-simulated-logging` (tools/toggle-simulated-logging.ts): Starts or stops simulated, random‑leveled logging for the invoking session. Respects the client’s selected minimum logging level.
|
||||
- `toggle-subscriber-updates` (tools/toggle-subscriber-updates.ts): Starts or stops simulated resource update notifications for URIs the invoking session has subscribed to.
|
||||
- `trigger-sampling-request` (tools/trigger-sampling-request.ts): Issues a `sampling/createMessage` request to the client/LLM using provided `prompt` and optional generation controls; returns the LLM’s response payload.
|
||||
- `trigger-sampling-request` (tools/trigger-sampling-request.ts): Issues a `sampling/createMessage` request to the client/LLM using provided `prompt` and optional generation controls; returns the LLM's response payload.
|
||||
- `simulate-research-query` (tools/simulate-research-query.ts): Demonstrates MCP Tasks (SEP-1686) with a simulated multi-stage research operation. Accepts `topic` and `ambiguous` parameters. Returns a task that progresses through stages with status updates. If `ambiguous` is true and client supports elicitation, sends an elicitation request directly to gather clarification before completing.
|
||||
- `trigger-sampling-request-async` (tools/trigger-sampling-request-async.ts): Demonstrates bidirectional tasks where the server sends a sampling request that the client executes as a background task. Server polls for status and retrieves the LLM result when complete. Requires client to support `tasks.requests.sampling.createMessage`.
|
||||
- `trigger-elicitation-request-async` (tools/trigger-elicitation-request-async.ts): Demonstrates bidirectional tasks where the server sends an elicitation request that the client executes as a background task. Server polls while waiting for user input. Requires client to support `tasks.requests.elicitation.create`.
|
||||
|
||||
## Prompts
|
||||
|
||||
@@ -50,3 +53,50 @@
|
||||
- Simulated logging is available but off by default.
|
||||
- Use the `toggle-simulated-logging` tool to start/stop periodic log messages of varying levels (debug, info, notice, warning, error, critical, alert, emergency) per session.
|
||||
- Clients can control the minimum level they receive via the standard MCP `logging/setLevel` request.
|
||||
|
||||
## Tasks (SEP-1686)
|
||||
|
||||
The server advertises support for MCP Tasks, enabling long-running operations with status tracking:
|
||||
|
||||
- **Capabilities advertised**: `tasks.list`, `tasks.cancel`, `tasks.requests.tools.call`
|
||||
- **Task Store**: Uses `InMemoryTaskStore` from SDK experimental for task lifecycle management
|
||||
- **Message Queue**: Uses `InMemoryTaskMessageQueue` for task-related messaging
|
||||
|
||||
### Task Lifecycle
|
||||
|
||||
1. Client calls `tools/call` with `task: true` parameter
|
||||
2. Server returns `CreateTaskResult` with `taskId` instead of immediate result
|
||||
3. Client polls `tasks/get` to check status and receive `statusMessage` updates
|
||||
4. When status is `completed`, client calls `tasks/result` to retrieve the final result
|
||||
|
||||
### Task Statuses
|
||||
|
||||
- `working`: Task is actively processing
|
||||
- `input_required`: Task needs additional input (server sends elicitation request directly)
|
||||
- `completed`: Task finished successfully
|
||||
- `failed`: Task encountered an error
|
||||
- `cancelled`: Task was cancelled by client
|
||||
|
||||
### Demo Tools
|
||||
|
||||
**Server-side tasks (client calls server):**
|
||||
Use the `simulate-research-query` tool to exercise the full task lifecycle. Set `ambiguous: true` to trigger elicitation - the server will send an `elicitation/create` request directly and await the response before completing.
|
||||
|
||||
**Client-side tasks (server calls client):**
|
||||
Use `trigger-sampling-request-async` or `trigger-elicitation-request-async` to demonstrate bidirectional tasks where the server sends requests that the client executes as background tasks. These require the client to advertise `tasks.requests.sampling.createMessage` or `tasks.requests.elicitation.create` capabilities respectively.
|
||||
|
||||
### Bidirectional Task Flow
|
||||
|
||||
MCP Tasks are bidirectional - both server and client can be task executors:
|
||||
|
||||
| Direction | Request Type | Task Executor | Demo Tool |
|
||||
|-----------|--------------|---------------|-----------|
|
||||
| Client -> Server | `tools/call` | Server | `simulate-research-query` |
|
||||
| Server -> Client | `sampling/createMessage` | Client | `trigger-sampling-request-async` |
|
||||
| Server -> Client | `elicitation/create` | Client | `trigger-elicitation-request-async` |
|
||||
|
||||
For client-side tasks:
|
||||
1. Server sends request with task metadata (e.g., `params.task.ttl`)
|
||||
2. Client creates task and returns `CreateTaskResult` with `taskId`
|
||||
3. Server polls `tasks/get` for status updates
|
||||
4. When complete, server calls `tasks/result` to retrieve the result
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
||||
import {
|
||||
InMemoryTaskStore,
|
||||
InMemoryTaskMessageQueue,
|
||||
} from "@modelcontextprotocol/sdk/experimental/tasks";
|
||||
import {
|
||||
setSubscriptionHandlers,
|
||||
stopSimulatedResourceUpdates,
|
||||
@@ -32,6 +36,10 @@ export const createServer: () => ServerFactoryResponse = () => {
|
||||
// Read the server instructions
|
||||
const instructions = readInstructions();
|
||||
|
||||
// Create task store and message queue for task support
|
||||
const taskStore = new InMemoryTaskStore();
|
||||
const taskMessageQueue = new InMemoryTaskMessageQueue();
|
||||
|
||||
// Create the server
|
||||
const server = new McpServer(
|
||||
{
|
||||
@@ -52,8 +60,19 @@ export const createServer: () => ServerFactoryResponse = () => {
|
||||
listChanged: true,
|
||||
},
|
||||
logging: {},
|
||||
tasks: {
|
||||
list: {},
|
||||
cancel: {},
|
||||
requests: {
|
||||
tools: {
|
||||
call: {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
instructions,
|
||||
taskStore,
|
||||
taskMessageQueue,
|
||||
}
|
||||
);
|
||||
|
||||
@@ -89,6 +108,8 @@ export const createServer: () => ServerFactoryResponse = () => {
|
||||
// Stop any simulated logging or resource updates that may have been initiated.
|
||||
stopSimulatedLogging(sessionId);
|
||||
stopSimulatedResourceUpdates(sessionId);
|
||||
// Clean up task store timers
|
||||
taskStore.cleanup();
|
||||
},
|
||||
} satisfies ServerFactoryResponse;
|
||||
};
|
||||
|
||||
@@ -14,6 +14,9 @@ import { registerToggleSubscriberUpdatesTool } from "./toggle-subscriber-updates
|
||||
import { registerTriggerElicitationRequestTool } from "./trigger-elicitation-request.js";
|
||||
import { registerTriggerLongRunningOperationTool } from "./trigger-long-running-operation.js";
|
||||
import { registerTriggerSamplingRequestTool } from "./trigger-sampling-request.js";
|
||||
import { registerTriggerSamplingRequestAsyncTool } from "./trigger-sampling-request-async.js";
|
||||
import { registerTriggerElicitationRequestAsyncTool } from "./trigger-elicitation-request-async.js";
|
||||
import { registerSimulateResearchQueryTool } from "./simulate-research-query.js";
|
||||
|
||||
/**
|
||||
* Register the tools with the MCP server.
|
||||
@@ -42,4 +45,9 @@ export const registerConditionalTools = (server: McpServer) => {
|
||||
registerGetRootsListTool(server);
|
||||
registerTriggerElicitationRequestTool(server);
|
||||
registerTriggerSamplingRequestTool(server);
|
||||
// Task-based research tool (uses experimental tasks API)
|
||||
registerSimulateResearchQueryTool(server);
|
||||
// Bidirectional task tools - server sends requests that client executes as tasks
|
||||
registerTriggerSamplingRequestAsyncTool(server);
|
||||
registerTriggerElicitationRequestAsyncTool(server);
|
||||
};
|
||||
|
||||
336
src/everything/tools/simulate-research-query.ts
Normal file
336
src/everything/tools/simulate-research-query.ts
Normal file
@@ -0,0 +1,336 @@
|
||||
import { z } from "zod";
|
||||
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
||||
import {
|
||||
CallToolResult,
|
||||
GetTaskResult,
|
||||
Task,
|
||||
ElicitResult,
|
||||
ElicitResultSchema,
|
||||
} from "@modelcontextprotocol/sdk/types.js";
|
||||
import { CreateTaskResult } from "@modelcontextprotocol/sdk/experimental/tasks";
|
||||
|
||||
// Tool input schema
|
||||
const SimulateResearchQuerySchema = z.object({
|
||||
topic: z.string().describe("The research topic to investigate"),
|
||||
ambiguous: z
|
||||
.boolean()
|
||||
.default(false)
|
||||
.describe(
|
||||
"Simulate an ambiguous query that requires clarification (triggers input_required status)"
|
||||
),
|
||||
});
|
||||
|
||||
// Research stages
|
||||
const STAGES = [
|
||||
"Gathering sources",
|
||||
"Analyzing content",
|
||||
"Synthesizing findings",
|
||||
"Generating report",
|
||||
];
|
||||
|
||||
// Duration per stage in milliseconds
|
||||
const STAGE_DURATION = 1000;
|
||||
|
||||
// Internal state for tracking research tasks
|
||||
interface ResearchState {
|
||||
topic: string;
|
||||
ambiguous: boolean;
|
||||
currentStage: number;
|
||||
clarification?: string;
|
||||
completed: boolean;
|
||||
result?: CallToolResult;
|
||||
}
|
||||
|
||||
// Map to store research state per task
|
||||
const researchStates = new Map<string, ResearchState>();
|
||||
|
||||
/**
|
||||
* Runs the background research process.
|
||||
* Updates task status as it progresses through stages.
|
||||
* If clarification is needed, attempts elicitation via sendRequest.
|
||||
*
|
||||
* Note: Elicitation only works on STDIO transport. On HTTP transport,
|
||||
* sendRequest will fail and the task will use a default interpretation.
|
||||
* Full HTTP support requires SDK PR #1210's elicitInputStream API.
|
||||
*/
|
||||
async function runResearchProcess(
|
||||
taskId: string,
|
||||
args: z.infer<typeof SimulateResearchQuerySchema>,
|
||||
taskStore: {
|
||||
updateTaskStatus: (
|
||||
taskId: string,
|
||||
status: Task["status"],
|
||||
message?: string
|
||||
) => Promise<void>;
|
||||
storeTaskResult: (
|
||||
taskId: string,
|
||||
status: "completed" | "failed",
|
||||
result: CallToolResult
|
||||
) => Promise<void>;
|
||||
},
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
sendRequest: any
|
||||
): Promise<void> {
|
||||
const state = researchStates.get(taskId);
|
||||
if (!state) return;
|
||||
|
||||
// Process each stage
|
||||
for (let i = state.currentStage; i < STAGES.length; i++) {
|
||||
state.currentStage = i;
|
||||
|
||||
// Check if task was cancelled externally
|
||||
if (state.completed) return;
|
||||
|
||||
// Update status message for current stage
|
||||
await taskStore.updateTaskStatus(taskId, "working", `${STAGES[i]}...`);
|
||||
|
||||
// At synthesis stage (index 2), check if clarification is needed
|
||||
if (i === 2 && state.ambiguous && !state.clarification) {
|
||||
// Update status to show we're requesting input (spec SHOULD)
|
||||
await taskStore.updateTaskStatus(
|
||||
taskId,
|
||||
"input_required",
|
||||
`Found multiple interpretations for "${state.topic}". Requesting clarification...`
|
||||
);
|
||||
|
||||
try {
|
||||
// Try elicitation via sendRequest (works on STDIO, fails on HTTP)
|
||||
const elicitResult: ElicitResult = await sendRequest(
|
||||
{
|
||||
method: "elicitation/create",
|
||||
params: {
|
||||
message: `The research query "${state.topic}" could have multiple interpretations. Please clarify what you're looking for:`,
|
||||
requestedSchema: {
|
||||
type: "object",
|
||||
properties: {
|
||||
interpretation: {
|
||||
type: "string",
|
||||
title: "Clarification",
|
||||
description: "Which interpretation of the topic do you mean?",
|
||||
oneOf: getInterpretationsForTopic(state.topic),
|
||||
},
|
||||
},
|
||||
required: ["interpretation"],
|
||||
},
|
||||
},
|
||||
},
|
||||
ElicitResultSchema
|
||||
);
|
||||
|
||||
// Process elicitation response
|
||||
if (elicitResult.action === "accept" && elicitResult.content) {
|
||||
state.clarification =
|
||||
(elicitResult.content as { interpretation?: string })
|
||||
.interpretation || "User accepted without selection";
|
||||
} else if (elicitResult.action === "decline") {
|
||||
state.clarification = "User declined - using default interpretation";
|
||||
} else {
|
||||
state.clarification = "User cancelled - using default interpretation";
|
||||
}
|
||||
} catch (error) {
|
||||
// Elicitation failed (likely HTTP transport without streaming support)
|
||||
// Use default interpretation and continue - task should still complete
|
||||
console.warn(
|
||||
`Elicitation failed for task ${taskId} (HTTP transport?):`,
|
||||
error instanceof Error ? error.message : String(error)
|
||||
);
|
||||
state.clarification =
|
||||
"technical (default - elicitation unavailable on HTTP)";
|
||||
}
|
||||
|
||||
// Resume with working status (spec SHOULD)
|
||||
await taskStore.updateTaskStatus(
|
||||
taskId,
|
||||
"working",
|
||||
`Continuing with interpretation: "${state.clarification}"...`
|
||||
);
|
||||
|
||||
// Continue processing (no return - just keep going through the loop)
|
||||
}
|
||||
|
||||
// Simulate work for this stage
|
||||
await new Promise((resolve) => setTimeout(resolve, STAGE_DURATION));
|
||||
}
|
||||
|
||||
// All stages complete - generate result
|
||||
state.completed = true;
|
||||
const result = generateResearchReport(state);
|
||||
state.result = result;
|
||||
|
||||
await taskStore.storeTaskResult(taskId, "completed", result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates the final research report with educational content about tasks.
|
||||
*/
|
||||
function generateResearchReport(state: ResearchState): CallToolResult {
|
||||
const topic = state.clarification
|
||||
? `${state.topic} (${state.clarification})`
|
||||
: state.topic;
|
||||
|
||||
const report = `# Research Report: ${topic}
|
||||
|
||||
## Research Parameters
|
||||
- **Topic**: ${state.topic}
|
||||
${state.clarification ? `- **Clarification**: ${state.clarification}` : ""}
|
||||
|
||||
## Synthesis
|
||||
This research query was processed through ${STAGES.length} stages:
|
||||
${STAGES.map((s, i) => `- Stage ${i + 1}: ${s} ✓`).join("\n")}
|
||||
|
||||
---
|
||||
|
||||
## About This Demo (SEP-1686: Tasks)
|
||||
|
||||
This tool demonstrates MCP's task-based execution pattern for long-running operations:
|
||||
|
||||
**Task Lifecycle Demonstrated:**
|
||||
1. \`tools/call\` with \`task\` parameter → Server returns \`CreateTaskResult\` (not the final result)
|
||||
2. Client polls \`tasks/get\` → Server returns current status and \`statusMessage\`
|
||||
3. Status progressed: \`working\` → ${state.clarification ? `\`input_required\` → \`working\` → ` : ""}\`completed\`
|
||||
4. Client calls \`tasks/result\` → Server returns this final result
|
||||
|
||||
${state.clarification ? `**Elicitation Flow:**
|
||||
When the query was ambiguous, the server sent an \`elicitation/create\` request
|
||||
to the client. The task status changed to \`input_required\` while awaiting user input.
|
||||
${state.clarification.includes("unavailable on HTTP") ? `
|
||||
**Note:** Elicitation was skipped because this server is running over HTTP transport.
|
||||
The current SDK's \`sendRequest\` only works over STDIO. Full HTTP elicitation support
|
||||
requires SDK PR #1210's streaming \`elicitInputStream\` API.
|
||||
` : `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`}
|
||||
` : ""}
|
||||
**Key Concepts:**
|
||||
- Tasks enable "call now, fetch later" patterns
|
||||
- \`statusMessage\` provides human-readable progress updates
|
||||
- Tasks have TTL (time-to-live) for automatic cleanup
|
||||
- \`pollInterval\` suggests how often to check status
|
||||
- Elicitation requests can be sent directly during task execution
|
||||
|
||||
*This is a simulated research report from the Everything MCP Server.*
|
||||
`;
|
||||
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: report,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the 'simulate-research-query' tool as a task-based tool.
|
||||
*
|
||||
* This tool demonstrates the MCP Tasks feature (SEP-1686) with a real-world scenario:
|
||||
* a research tool that gathers and synthesizes information from multiple sources.
|
||||
* If the query is ambiguous, it pauses to ask for clarification before completing.
|
||||
*
|
||||
* @param {McpServer} server - The McpServer instance where the tool will be registered.
|
||||
*/
|
||||
export const registerSimulateResearchQueryTool = (server: McpServer) => {
|
||||
// Check if client supports elicitation (needed for input_required flow)
|
||||
const clientCapabilities = server.server.getClientCapabilities() || {};
|
||||
const clientSupportsElicitation: boolean =
|
||||
clientCapabilities.elicitation !== undefined;
|
||||
|
||||
server.experimental.tasks.registerToolTask(
|
||||
"simulate-research-query",
|
||||
{
|
||||
title: "Simulate Research Query",
|
||||
description:
|
||||
"Simulates a deep research operation that gathers, analyzes, and synthesizes information. " +
|
||||
"Demonstrates MCP task-based operations with progress through multiple stages. " +
|
||||
"If 'ambiguous' is true and client supports elicitation, sends an elicitation request for clarification.",
|
||||
inputSchema: SimulateResearchQuerySchema,
|
||||
execution: { taskSupport: "required" },
|
||||
},
|
||||
{
|
||||
/**
|
||||
* Creates a new research task and starts background processing.
|
||||
*/
|
||||
createTask: async (args, extra): Promise<CreateTaskResult> => {
|
||||
const validatedArgs = SimulateResearchQuerySchema.parse(args);
|
||||
|
||||
// Create the task in the store
|
||||
const task = await extra.taskStore.createTask({
|
||||
ttl: 300000, // 5 minutes
|
||||
pollInterval: 1000,
|
||||
});
|
||||
|
||||
// Initialize research state
|
||||
const state: ResearchState = {
|
||||
topic: validatedArgs.topic,
|
||||
ambiguous: validatedArgs.ambiguous && clientSupportsElicitation,
|
||||
currentStage: 0,
|
||||
completed: false,
|
||||
};
|
||||
researchStates.set(task.taskId, state);
|
||||
|
||||
// Start background research (don't await - runs asynchronously)
|
||||
// Pass sendRequest for elicitation (works on STDIO, gracefully degrades on HTTP)
|
||||
runResearchProcess(
|
||||
task.taskId,
|
||||
validatedArgs,
|
||||
extra.taskStore,
|
||||
extra.sendRequest
|
||||
).catch((error) => {
|
||||
console.error(`Research task ${task.taskId} failed:`, error);
|
||||
extra.taskStore
|
||||
.updateTaskStatus(task.taskId, "failed", String(error))
|
||||
.catch(console.error);
|
||||
});
|
||||
|
||||
return { task };
|
||||
},
|
||||
|
||||
/**
|
||||
* Returns the current status of the research task.
|
||||
*/
|
||||
getTask: async (args, extra): Promise<GetTaskResult> => {
|
||||
const task = await extra.taskStore.getTask(extra.taskId);
|
||||
// The SDK's RequestTaskStore.getTask throws if not found, so task is always defined
|
||||
return task;
|
||||
},
|
||||
|
||||
/**
|
||||
* Returns the task result.
|
||||
* Elicitation is now handled directly in the background process.
|
||||
*/
|
||||
getTaskResult: async (args, extra): Promise<CallToolResult> => {
|
||||
// Return the stored result
|
||||
const result = await extra.taskStore.getTaskResult(extra.taskId);
|
||||
|
||||
// Clean up state
|
||||
researchStates.delete(extra.taskId);
|
||||
|
||||
return result as CallToolResult;
|
||||
},
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns contextual interpretation options based on the topic.
|
||||
*/
|
||||
function getInterpretationsForTopic(
|
||||
topic: string
|
||||
): Array<{ const: string; title: string }> {
|
||||
const lowerTopic = topic.toLowerCase();
|
||||
|
||||
// Example: contextual interpretations for "python"
|
||||
if (lowerTopic.includes("python")) {
|
||||
return [
|
||||
{ const: "programming", title: "Python programming language" },
|
||||
{ const: "snake", title: "Python snake species" },
|
||||
{ const: "comedy", title: "Monty Python comedy group" },
|
||||
];
|
||||
}
|
||||
|
||||
// Default generic interpretations
|
||||
return [
|
||||
{ const: "technical", title: "Technical/scientific perspective" },
|
||||
{ const: "historical", title: "Historical perspective" },
|
||||
{ const: "current", title: "Current events/news perspective" },
|
||||
];
|
||||
}
|
||||
239
src/everything/tools/trigger-elicitation-request-async.ts
Normal file
239
src/everything/tools/trigger-elicitation-request-async.ts
Normal file
@@ -0,0 +1,239 @@
|
||||
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
||||
import { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
|
||||
import { z } from "zod";
|
||||
|
||||
// Tool configuration
|
||||
const name = "trigger-elicitation-request-async";
|
||||
const config = {
|
||||
title: "Trigger Async Elicitation Request Tool",
|
||||
description:
|
||||
"Trigger an async elicitation request that the CLIENT executes as a background task. " +
|
||||
"Demonstrates bidirectional MCP tasks where the server sends an elicitation request and " +
|
||||
"the client handles user input asynchronously, allowing the server to poll for completion.",
|
||||
inputSchema: {},
|
||||
};
|
||||
|
||||
// Poll interval in milliseconds
|
||||
const POLL_INTERVAL = 1000;
|
||||
|
||||
// Maximum poll attempts before timeout (10 minutes for user input)
|
||||
const MAX_POLL_ATTEMPTS = 600;
|
||||
|
||||
/**
|
||||
* Registers the 'trigger-elicitation-request-async' tool.
|
||||
*
|
||||
* This tool demonstrates bidirectional MCP tasks for elicitation:
|
||||
* - Server sends elicitation request to client with task metadata
|
||||
* - Client creates a task and returns CreateTaskResult
|
||||
* - Client prompts user for input (task status: input_required)
|
||||
* - Server polls client's tasks/get endpoint for status
|
||||
* - Server fetches final result from client's tasks/result endpoint
|
||||
*
|
||||
* @param {McpServer} server - The McpServer instance where the tool will be registered.
|
||||
*/
|
||||
export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) => {
|
||||
// Check client capabilities
|
||||
const clientCapabilities = server.server.getClientCapabilities() || {};
|
||||
|
||||
// Client must support elicitation AND tasks.requests.elicitation
|
||||
const clientSupportsElicitation = clientCapabilities.elicitation !== undefined;
|
||||
const clientTasksCapability = clientCapabilities.tasks as {
|
||||
requests?: { elicitation?: { create?: object } };
|
||||
} | undefined;
|
||||
const clientSupportsAsyncElicitation =
|
||||
clientTasksCapability?.requests?.elicitation?.create !== undefined;
|
||||
|
||||
if (clientSupportsElicitation && clientSupportsAsyncElicitation) {
|
||||
server.registerTool(
|
||||
name,
|
||||
config,
|
||||
async (args, extra): Promise<CallToolResult> => {
|
||||
// Create the elicitation request WITH task metadata
|
||||
// Using z.any() schema to avoid complex type matching with _meta
|
||||
const request = {
|
||||
method: "elicitation/create" as const,
|
||||
params: {
|
||||
task: {
|
||||
ttl: 600000, // 10 minutes (user input may take a while)
|
||||
},
|
||||
message: "Please provide inputs for the following fields (async task demo):",
|
||||
requestedSchema: {
|
||||
type: "object" as const,
|
||||
properties: {
|
||||
name: {
|
||||
title: "Your Name",
|
||||
type: "string" as const,
|
||||
description: "Your full name",
|
||||
},
|
||||
favoriteColor: {
|
||||
title: "Favorite Color",
|
||||
type: "string" as const,
|
||||
description: "What is your favorite color?",
|
||||
enum: ["Red", "Blue", "Green", "Yellow", "Purple"],
|
||||
},
|
||||
agreeToTerms: {
|
||||
title: "Terms Agreement",
|
||||
type: "boolean" as const,
|
||||
description: "Do you agree to the terms and conditions?",
|
||||
},
|
||||
},
|
||||
required: ["name"],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Send the elicitation request
|
||||
// Client may return either:
|
||||
// - ElicitResult (synchronous execution)
|
||||
// - CreateTaskResult (task-based execution with { task } object)
|
||||
const elicitResponse = await extra.sendRequest(
|
||||
request as Parameters<typeof extra.sendRequest>[0],
|
||||
z.union([
|
||||
// CreateTaskResult - client created a task
|
||||
z.object({
|
||||
task: z.object({
|
||||
taskId: z.string(),
|
||||
status: z.string(),
|
||||
pollInterval: z.number().optional(),
|
||||
statusMessage: z.string().optional(),
|
||||
}),
|
||||
}),
|
||||
// ElicitResult - synchronous execution
|
||||
z.object({
|
||||
action: z.string(),
|
||||
content: z.any().optional(),
|
||||
}),
|
||||
])
|
||||
);
|
||||
|
||||
// Check if client returned CreateTaskResult (has task object)
|
||||
const isTaskResult = 'task' in elicitResponse && elicitResponse.task;
|
||||
if (!isTaskResult) {
|
||||
// Client executed synchronously - return the direct response
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(elicitResponse, null, 2)}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
const taskId = elicitResponse.task.taskId;
|
||||
const statusMessages: string[] = [];
|
||||
statusMessages.push(`Task created: ${taskId}`);
|
||||
|
||||
// Poll for task completion
|
||||
let attempts = 0;
|
||||
let taskStatus = elicitResponse.task.status;
|
||||
let taskStatusMessage: string | undefined;
|
||||
|
||||
while (
|
||||
taskStatus !== "completed" &&
|
||||
taskStatus !== "failed" &&
|
||||
taskStatus !== "cancelled" &&
|
||||
attempts < MAX_POLL_ATTEMPTS
|
||||
) {
|
||||
// Wait before polling
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL));
|
||||
attempts++;
|
||||
|
||||
// Get task status from client
|
||||
const pollResult = await extra.sendRequest(
|
||||
{
|
||||
method: "tasks/get",
|
||||
params: { taskId },
|
||||
},
|
||||
z.object({
|
||||
status: z.string(),
|
||||
statusMessage: z.string().optional(),
|
||||
}).passthrough()
|
||||
);
|
||||
|
||||
taskStatus = pollResult.status;
|
||||
taskStatusMessage = pollResult.statusMessage;
|
||||
|
||||
// Only log status changes or every 10 polls to avoid spam
|
||||
if (attempts === 1 || attempts % 10 === 0 || taskStatus !== "input_required") {
|
||||
statusMessages.push(
|
||||
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Check for timeout
|
||||
if (attempts >= MAX_POLL_ATTEMPTS) {
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
// Check for failure/cancellation
|
||||
if (taskStatus === "failed" || taskStatus === "cancelled") {
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
// Fetch the final result
|
||||
const result = await extra.sendRequest(
|
||||
{
|
||||
method: "tasks/result",
|
||||
params: { taskId },
|
||||
},
|
||||
z.any()
|
||||
);
|
||||
|
||||
// Format the elicitation result
|
||||
const content: CallToolResult["content"] = [];
|
||||
|
||||
if (result.action === "accept" && result.content) {
|
||||
content.push({
|
||||
type: "text",
|
||||
text: `[COMPLETED] User provided the requested information!`,
|
||||
});
|
||||
|
||||
const userData = result.content as Record<string, unknown>;
|
||||
const lines = [];
|
||||
if (userData.name) lines.push(`- Name: ${userData.name}`);
|
||||
if (userData.favoriteColor) lines.push(`- Favorite Color: ${userData.favoriteColor}`);
|
||||
if (userData.agreeToTerms !== undefined) lines.push(`- Agreed to terms: ${userData.agreeToTerms}`);
|
||||
|
||||
content.push({
|
||||
type: "text",
|
||||
text: `User inputs:\n${lines.join("\n")}`,
|
||||
});
|
||||
} else if (result.action === "decline") {
|
||||
content.push({
|
||||
type: "text",
|
||||
text: `[DECLINED] User declined to provide the requested information.`,
|
||||
});
|
||||
} else if (result.action === "cancel") {
|
||||
content.push({
|
||||
type: "text",
|
||||
text: `[CANCELLED] User cancelled the elicitation dialog.`,
|
||||
});
|
||||
}
|
||||
|
||||
// Include progress and raw result for debugging
|
||||
content.push({
|
||||
type: "text",
|
||||
text: `\nProgress:\n${statusMessages.join("\n")}\n\nRaw result: ${JSON.stringify(result, null, 2)}`,
|
||||
});
|
||||
|
||||
return { content };
|
||||
}
|
||||
);
|
||||
}
|
||||
};
|
||||
@@ -1,6 +1,5 @@
|
||||
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
||||
import { ElicitResultSchema } from "@modelcontextprotocol/sdk/types.js";
|
||||
import { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
|
||||
import { ElicitResultSchema, CallToolResult } from "@modelcontextprotocol/sdk/types.js";
|
||||
|
||||
// Tool configuration
|
||||
const name = "trigger-elicitation-request";
|
||||
|
||||
211
src/everything/tools/trigger-sampling-request-async.ts
Normal file
211
src/everything/tools/trigger-sampling-request-async.ts
Normal file
@@ -0,0 +1,211 @@
|
||||
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
||||
import {
|
||||
CallToolResult,
|
||||
CreateMessageRequest,
|
||||
} from "@modelcontextprotocol/sdk/types.js";
|
||||
import { z } from "zod";
|
||||
|
||||
// Tool input schema
|
||||
const TriggerSamplingRequestAsyncSchema = z.object({
|
||||
prompt: z.string().describe("The prompt to send to the LLM"),
|
||||
maxTokens: z
|
||||
.number()
|
||||
.default(100)
|
||||
.describe("Maximum number of tokens to generate"),
|
||||
});
|
||||
|
||||
// Tool configuration
|
||||
const name = "trigger-sampling-request-async";
|
||||
const config = {
|
||||
title: "Trigger Async Sampling Request Tool",
|
||||
description:
|
||||
"Trigger an async sampling request that the CLIENT executes as a background task. " +
|
||||
"Demonstrates bidirectional MCP tasks where the server sends a request and the client " +
|
||||
"executes it asynchronously, allowing the server to poll for progress and results.",
|
||||
inputSchema: TriggerSamplingRequestAsyncSchema,
|
||||
};
|
||||
|
||||
// Poll interval in milliseconds
|
||||
const POLL_INTERVAL = 1000;
|
||||
|
||||
// Maximum poll attempts before timeout
|
||||
const MAX_POLL_ATTEMPTS = 60;
|
||||
|
||||
/**
|
||||
* Registers the 'trigger-sampling-request-async' tool.
|
||||
*
|
||||
* This tool demonstrates bidirectional MCP tasks:
|
||||
* - Server sends sampling request to client with task metadata
|
||||
* - Client creates a task and returns CreateTaskResult
|
||||
* - Server polls client's tasks/get endpoint for status
|
||||
* - Server fetches final result from client's tasks/result endpoint
|
||||
*
|
||||
* @param {McpServer} server - The McpServer instance where the tool will be registered.
|
||||
*/
|
||||
export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
|
||||
// Check client capabilities
|
||||
const clientCapabilities = server.server.getClientCapabilities() || {};
|
||||
|
||||
// Client must support sampling AND tasks.requests.sampling
|
||||
const clientSupportsSampling = clientCapabilities.sampling !== undefined;
|
||||
const clientTasksCapability = clientCapabilities.tasks as {
|
||||
requests?: { sampling?: { createMessage?: object } };
|
||||
} | undefined;
|
||||
const clientSupportsAsyncSampling =
|
||||
clientTasksCapability?.requests?.sampling?.createMessage !== undefined;
|
||||
|
||||
if (clientSupportsSampling && clientSupportsAsyncSampling) {
|
||||
server.registerTool(
|
||||
name,
|
||||
config,
|
||||
async (args, extra): Promise<CallToolResult> => {
|
||||
const validatedArgs = TriggerSamplingRequestAsyncSchema.parse(args);
|
||||
const { prompt, maxTokens } = validatedArgs;
|
||||
|
||||
// Create the sampling request WITH task metadata
|
||||
// The params.task field signals to the client that this should be executed as a task
|
||||
const request: CreateMessageRequest & { params: { task?: { ttl: number } } } = {
|
||||
method: "sampling/createMessage",
|
||||
params: {
|
||||
task: {
|
||||
ttl: 300000, // 5 minutes
|
||||
},
|
||||
messages: [
|
||||
{
|
||||
role: "user",
|
||||
content: {
|
||||
type: "text",
|
||||
text: `Resource ${name} context: ${prompt}`,
|
||||
},
|
||||
},
|
||||
],
|
||||
systemPrompt: "You are a helpful test server.",
|
||||
maxTokens,
|
||||
temperature: 0.7,
|
||||
},
|
||||
};
|
||||
|
||||
// Send the sampling request
|
||||
// Client may return either:
|
||||
// - CreateMessageResult (synchronous execution)
|
||||
// - CreateTaskResult (task-based execution with { task } object)
|
||||
const samplingResponse = await extra.sendRequest(
|
||||
request,
|
||||
z.union([
|
||||
// CreateTaskResult - client created a task
|
||||
z.object({
|
||||
task: z.object({
|
||||
taskId: z.string(),
|
||||
status: z.string(),
|
||||
pollInterval: z.number().optional(),
|
||||
statusMessage: z.string().optional(),
|
||||
}),
|
||||
}),
|
||||
// CreateMessageResult - synchronous execution
|
||||
z.object({
|
||||
role: z.string(),
|
||||
content: z.any(),
|
||||
model: z.string(),
|
||||
stopReason: z.string().optional(),
|
||||
}),
|
||||
])
|
||||
);
|
||||
|
||||
// Check if client returned CreateTaskResult (has task object)
|
||||
const isTaskResult = 'task' in samplingResponse && samplingResponse.task;
|
||||
if (!isTaskResult) {
|
||||
// Client executed synchronously - return the direct response
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(samplingResponse, null, 2)}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
const taskId = samplingResponse.task.taskId;
|
||||
const statusMessages: string[] = [];
|
||||
statusMessages.push(`Task created: ${taskId}`);
|
||||
|
||||
// Poll for task completion
|
||||
let attempts = 0;
|
||||
let taskStatus = samplingResponse.task.status;
|
||||
let taskStatusMessage: string | undefined;
|
||||
|
||||
while (
|
||||
taskStatus !== "completed" &&
|
||||
taskStatus !== "failed" &&
|
||||
taskStatus !== "cancelled" &&
|
||||
attempts < MAX_POLL_ATTEMPTS
|
||||
) {
|
||||
// Wait before polling
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL));
|
||||
attempts++;
|
||||
|
||||
// Get task status from client
|
||||
const pollResult = await extra.sendRequest(
|
||||
{
|
||||
method: "tasks/get",
|
||||
params: { taskId },
|
||||
},
|
||||
z.object({
|
||||
status: z.string(),
|
||||
statusMessage: z.string().optional(),
|
||||
}).passthrough()
|
||||
);
|
||||
|
||||
taskStatus = pollResult.status;
|
||||
taskStatusMessage = pollResult.statusMessage;
|
||||
statusMessages.push(
|
||||
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
|
||||
);
|
||||
}
|
||||
|
||||
// Check for timeout
|
||||
if (attempts >= MAX_POLL_ATTEMPTS) {
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
// Check for failure/cancellation
|
||||
if (taskStatus === "failed" || taskStatus === "cancelled") {
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
// Fetch the final result
|
||||
const result = await extra.sendRequest(
|
||||
{
|
||||
method: "tasks/result",
|
||||
params: { taskId },
|
||||
},
|
||||
z.any()
|
||||
);
|
||||
|
||||
// Return the result with status history
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `[COMPLETED] Async sampling completed!\n\n**Progress:**\n${statusMessages.join("\n")}\n\n**Result:**\n${JSON.stringify(result, null, 2)}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
);
|
||||
}
|
||||
};
|
||||
@@ -1,10 +1,37 @@
|
||||
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
|
||||
import { InMemoryEventStore } from "@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js";
|
||||
import { StreamableHTTPServerTransport, EventStore } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
|
||||
import express, { Request, Response } from "express";
|
||||
import { createServer } from "../server/index.js";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import cors from "cors";
|
||||
|
||||
// Simple in-memory event store for SSE resumability
|
||||
class InMemoryEventStore implements EventStore {
|
||||
private events: Map<string, { streamId: string; message: unknown }> = new Map();
|
||||
|
||||
async storeEvent(streamId: string, message: unknown): Promise<string> {
|
||||
const eventId = randomUUID();
|
||||
this.events.set(eventId, { streamId, message });
|
||||
return eventId;
|
||||
}
|
||||
|
||||
async replayEventsAfter(
|
||||
lastEventId: string,
|
||||
{ send }: { send: (eventId: string, message: unknown) => Promise<void> }
|
||||
): Promise<string> {
|
||||
const entries = Array.from(this.events.entries());
|
||||
const startIndex = entries.findIndex(([id]) => id === lastEventId);
|
||||
if (startIndex === -1) return lastEventId;
|
||||
|
||||
let lastId: string = lastEventId;
|
||||
for (let i = startIndex + 1; i < entries.length; i++) {
|
||||
const [eventId, { message }] = entries[i];
|
||||
await send(eventId, message);
|
||||
lastId = eventId;
|
||||
}
|
||||
return lastId;
|
||||
}
|
||||
}
|
||||
|
||||
console.log("Starting Streamable HTTP server...");
|
||||
|
||||
// Express app with permissive CORS for testing with Inspector direct connect mode
|
||||
|
||||
Reference in New Issue
Block a user