diff --git a/apps/sim/app/api/mcp/events/route.test.ts b/apps/sim/app/api/mcp/events/route.test.ts new file mode 100644 index 000000000..f3db4d575 --- /dev/null +++ b/apps/sim/app/api/mcp/events/route.test.ts @@ -0,0 +1,98 @@ +/** + * Tests for MCP SSE events endpoint + * + * @vitest-environment node + */ +import { createMockRequest, mockAuth, mockConsoleLogger } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +mockConsoleLogger() +const auth = mockAuth() + +const mockGetUserEntityPermissions = vi.fn() +vi.doMock('@/lib/workspaces/permissions/utils', () => ({ + getUserEntityPermissions: mockGetUserEntityPermissions, +})) + +vi.doMock('@/lib/mcp/connection-manager', () => ({ + mcpConnectionManager: null, +})) + +vi.doMock('@/lib/mcp/pubsub', () => ({ + mcpPubSub: null, +})) + +const { GET } = await import('./route') + +describe('MCP Events SSE Endpoint', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('returns 401 when session is missing', async () => { + auth.setUnauthenticated() + + const request = createMockRequest( + 'GET', + undefined, + {}, + 'http://localhost:3000/api/mcp/events?workspaceId=ws-123' + ) + + const response = await GET(request as any) + + expect(response.status).toBe(401) + const text = await response.text() + expect(text).toBe('Unauthorized') + }) + + it('returns 400 when workspaceId is missing', async () => { + auth.setAuthenticated() + + const request = createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/mcp/events') + + const response = await GET(request as any) + + expect(response.status).toBe(400) + const text = await response.text() + expect(text).toBe('Missing workspaceId query parameter') + }) + + it('returns 403 when user lacks workspace access', async () => { + auth.setAuthenticated() + mockGetUserEntityPermissions.mockResolvedValue(null) + + const request = createMockRequest( + 'GET', + undefined, + {}, + 'http://localhost:3000/api/mcp/events?workspaceId=ws-123' + ) + + const response = await GET(request as any) + + expect(response.status).toBe(403) + const text = await response.text() + expect(text).toBe('Access denied to workspace') + expect(mockGetUserEntityPermissions).toHaveBeenCalledWith('user-123', 'workspace', 'ws-123') + }) + + it('returns SSE stream when authorized', async () => { + auth.setAuthenticated() + mockGetUserEntityPermissions.mockResolvedValue({ read: true }) + + const request = createMockRequest( + 'GET', + undefined, + {}, + 'http://localhost:3000/api/mcp/events?workspaceId=ws-123' + ) + + const response = await GET(request as any) + + expect(response.status).toBe(200) + expect(response.headers.get('Content-Type')).toBe('text/event-stream') + expect(response.headers.get('Cache-Control')).toBe('no-cache') + expect(response.headers.get('Connection')).toBe('keep-alive') + }) +}) diff --git a/apps/sim/app/api/mcp/events/route.ts b/apps/sim/app/api/mcp/events/route.ts new file mode 100644 index 000000000..6df91db5c --- /dev/null +++ b/apps/sim/app/api/mcp/events/route.ts @@ -0,0 +1,111 @@ +/** + * SSE endpoint for MCP tool-change events. + * + * Pushes `tools_changed` events to the browser when: + * - An external MCP server sends `notifications/tools/list_changed` (via connection manager) + * - A workflow CRUD route modifies workflow MCP server tools (via pub/sub) + * + * Auth is handled via session cookies (EventSource sends cookies automatically). + */ + +import { createLogger } from '@sim/logger' +import type { NextRequest } from 'next/server' +import { getSession } from '@/lib/auth' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { mcpConnectionManager } from '@/lib/mcp/connection-manager' +import { mcpPubSub } from '@/lib/mcp/pubsub' +import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' + +const logger = createLogger('McpEventsSSE') + +export const dynamic = 'force-dynamic' + +const HEARTBEAT_INTERVAL_MS = 30_000 + +export async function GET(request: NextRequest) { + const session = await getSession() + if (!session?.user?.id) { + return new Response('Unauthorized', { status: 401 }) + } + + const { searchParams } = new URL(request.url) + const workspaceId = searchParams.get('workspaceId') + if (!workspaceId) { + return new Response('Missing workspaceId query parameter', { status: 400 }) + } + + const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId) + if (!permissions) { + return new Response('Access denied to workspace', { status: 403 }) + } + + const encoder = new TextEncoder() + const unsubscribers: Array<() => void> = [] + + const stream = new ReadableStream({ + start(controller) { + const send = (eventName: string, data: Record) => { + try { + controller.enqueue( + encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`) + ) + } catch { + // Stream already closed + } + } + + // Subscribe to external MCP server tool changes + if (mcpConnectionManager) { + const unsub = mcpConnectionManager.subscribe((event) => { + if (event.workspaceId !== workspaceId) return + send('tools_changed', { + source: 'external', + serverId: event.serverId, + timestamp: event.timestamp, + }) + }) + unsubscribers.push(unsub) + } + + // Subscribe to workflow CRUD tool changes + if (mcpPubSub) { + const unsub = mcpPubSub.onWorkflowToolsChanged((event) => { + if (event.workspaceId !== workspaceId) return + send('tools_changed', { + source: 'workflow', + serverId: event.serverId, + timestamp: Date.now(), + }) + }) + unsubscribers.push(unsub) + } + + // Heartbeat to keep the connection alive + const heartbeat = setInterval(() => { + try { + controller.enqueue(encoder.encode(': heartbeat\n\n')) + } catch { + clearInterval(heartbeat) + } + }, HEARTBEAT_INTERVAL_MS) + unsubscribers.push(() => clearInterval(heartbeat)) + + // Cleanup when client disconnects + request.signal.addEventListener('abort', () => { + for (const unsub of unsubscribers) { + unsub() + } + try { + controller.close() + } catch { + // Already closed + } + logger.info(`SSE connection closed for workspace ${workspaceId}`) + }) + + logger.info(`SSE connection opened for workspace ${workspaceId}`) + }, + }) + + return new Response(stream, { headers: SSE_HEADERS }) +} diff --git a/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts b/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts index 3ce0e0045..e0a1f085e 100644 --- a/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' const logger = createLogger('WorkflowMcpServerAPI') @@ -146,6 +147,8 @@ export const DELETE = withMcpAuth('admin')( logger.info(`[${requestId}] Successfully deleted workflow MCP server: ${serverId}`) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ message: `Server ${serverId} deleted successfully` }) } catch (error) { logger.error(`[${requestId}] Error deleting workflow MCP server:`, error) diff --git a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts index d7fd53259..87113b868 100644 --- a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' @@ -115,6 +116,8 @@ export const PATCH = withMcpAuth('write')( logger.info(`[${requestId}] Successfully updated tool ${toolId}`) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ tool: updatedTool }) } catch (error) { logger.error(`[${requestId}] Error updating tool:`, error) @@ -160,6 +163,8 @@ export const DELETE = withMcpAuth('write')( logger.info(`[${requestId}] Successfully deleted tool ${toolId}`) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ message: `Tool ${toolId} deleted successfully` }) } catch (error) { logger.error(`[${requestId}] Error deleting tool:`, error) diff --git a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts index b2cef8ee5..6705d5298 100644 --- a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server' @@ -188,6 +189,8 @@ export const POST = withMcpAuth('write')( `[${requestId}] Successfully added tool ${toolName} (workflow: ${body.workflowId}) to server ${serverId}` ) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ tool }, 201) } catch (error) { logger.error(`[${requestId}] Error adding tool:`, error) diff --git a/apps/sim/app/api/mcp/workflow-servers/route.ts b/apps/sim/app/api/mcp/workflow-servers/route.ts index e2900f5a8..1779e51a9 100644 --- a/apps/sim/app/api/mcp/workflow-servers/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { eq, inArray, sql } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server' @@ -174,6 +175,10 @@ export const POST = withMcpAuth('write')( `[${requestId}] Added ${addedTools.length} tools to server ${serverId}:`, addedTools.map((t) => t.toolName) ) + + if (addedTools.length > 0) { + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + } } logger.info( diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx index 8f03f4b2e..9990c3eeb 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx @@ -62,7 +62,12 @@ import { type CustomTool as CustomToolDefinition, useCustomTools, } from '@/hooks/queries/custom-tools' -import { useForceRefreshMcpTools, useMcpServers, useStoredMcpTools } from '@/hooks/queries/mcp' +import { + useForceRefreshMcpTools, + useMcpServers, + useMcpToolsEvents, + useStoredMcpTools, +} from '@/hooks/queries/mcp' import { useChildDeploymentStatus, useDeployChildWorkflow, @@ -1035,6 +1040,7 @@ export const ToolInput = memo(function ToolInput({ const { data: mcpServers = [], isLoading: mcpServersLoading } = useMcpServers(workspaceId) const { data: storedMcpTools = [] } = useStoredMcpTools(workspaceId) const forceRefreshMcpTools = useForceRefreshMcpTools() + useMcpToolsEvents(workspaceId) const openSettingsModal = useSettingsModalStore((state) => state.openModal) const mcpDataLoading = mcpLoading || mcpServersLoading diff --git a/apps/sim/hooks/queries/mcp.ts b/apps/sim/hooks/queries/mcp.ts index 5ef4170d3..607cb5e1e 100644 --- a/apps/sim/hooks/queries/mcp.ts +++ b/apps/sim/hooks/queries/mcp.ts @@ -1,3 +1,4 @@ +import { useEffect } from 'react' import { createLogger } from '@sim/logger' import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query' import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/shared' @@ -359,3 +360,65 @@ export function useStoredMcpTools(workspaceId: string) { staleTime: 60 * 1000, }) } + +/** + * Shared EventSource connections keyed by workspaceId. + * Reference-counted so the connection is closed when the last consumer unmounts. + * Attached to `globalThis` so connections survive HMR in development. + */ +const SSE_KEY = '__mcp_sse_connections' as const + +type SseEntry = { source: EventSource; refs: number } + +const sseConnections: Map = + ((globalThis as Record)[SSE_KEY] as Map) ?? + ((globalThis as Record)[SSE_KEY] = new Map()) + +/** + * Subscribe to MCP tool-change SSE events for a workspace. + * On each `tools_changed` event, invalidates the relevant React Query caches + * so the UI refreshes automatically. + */ +export function useMcpToolsEvents(workspaceId: string) { + const queryClient = useQueryClient() + + useEffect(() => { + if (!workspaceId) return + + const invalidate = () => { + queryClient.invalidateQueries({ queryKey: mcpKeys.tools(workspaceId) }) + queryClient.invalidateQueries({ queryKey: mcpKeys.servers(workspaceId) }) + queryClient.invalidateQueries({ queryKey: mcpKeys.storedTools(workspaceId) }) + } + + let entry = sseConnections.get(workspaceId) + + if (!entry) { + const source = new EventSource(`/api/mcp/events?workspaceId=${workspaceId}`) + + source.addEventListener('tools_changed', () => { + invalidate() + }) + + source.onerror = () => { + logger.warn(`SSE connection error for workspace ${workspaceId}`) + } + + entry = { source, refs: 0 } + sseConnections.set(workspaceId, entry) + } + + entry.refs++ + + return () => { + const current = sseConnections.get(workspaceId) + if (!current) return + + current.refs-- + if (current.refs <= 0) { + current.source.close() + sseConnections.delete(workspaceId) + } + } + }, [workspaceId, queryClient]) +} diff --git a/apps/sim/lib/mcp/pubsub.ts b/apps/sim/lib/mcp/pubsub.ts index b7e6097c6..2db2bb337 100644 --- a/apps/sim/lib/mcp/pubsub.ts +++ b/apps/sim/lib/mcp/pubsub.ts @@ -15,18 +15,13 @@ import { EventEmitter } from 'events' import { createLogger } from '@sim/logger' import Redis from 'ioredis' import { env } from '@/lib/core/config/env' -import type { ToolsChangedEvent } from '@/lib/mcp/types' +import type { ToolsChangedEvent, WorkflowToolsChangedEvent } from '@/lib/mcp/types' const logger = createLogger('McpPubSub') const CHANNEL_TOOLS_CHANGED = 'mcp:tools_changed' const CHANNEL_WORKFLOW_TOOLS_CHANGED = 'mcp:workflow_tools_changed' -export interface WorkflowToolsChangedEvent { - serverId: string - workspaceId: string -} - type ToolsChangedHandler = (event: ToolsChangedEvent) => void type WorkflowToolsChangedHandler = (event: WorkflowToolsChangedEvent) => void diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index 64001b50c..e38cfb3f0 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -9,6 +9,7 @@ import { and, eq, isNull } from 'drizzle-orm' import { isTest } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' import { McpClient } from '@/lib/mcp/client' +import { mcpConnectionManager } from '@/lib/mcp/connection-manager' import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config' import { createMcpCacheAdapter, @@ -31,16 +32,24 @@ const logger = createLogger('McpService') class McpService { private cacheAdapter: McpCacheStorageAdapter private readonly cacheTimeout = MCP_CONSTANTS.CACHE_TIMEOUT + private unsubscribeConnectionManager?: () => void constructor() { this.cacheAdapter = createMcpCacheAdapter() logger.info(`MCP Service initialized with ${getMcpCacheType()} cache`) + + if (mcpConnectionManager) { + this.unsubscribeConnectionManager = mcpConnectionManager.subscribe((event) => { + this.clearCache(event.workspaceId) + }) + } } /** * Dispose of the service and cleanup resources */ dispose(): void { + this.unsubscribeConnectionManager?.() this.cacheAdapter.dispose() logger.info('MCP Service disposed') } @@ -328,7 +337,7 @@ class McpService { logger.debug( `[${requestId}] Discovered ${tools.length} tools from server ${config.name}` ) - return { serverId: config.id, tools } + return { serverId: config.id, tools, resolvedConfig } } finally { await client.disconnect() } @@ -364,6 +373,21 @@ class McpService { logger.error(`[${requestId}] Error updating server statuses:`, err) }) + // Fire-and-forget persistent connections for servers that support listChanged + if (mcpConnectionManager) { + for (const [index, result] of results.entries()) { + if (result.status === 'fulfilled') { + const { resolvedConfig } = result.value + mcpConnectionManager.connect(resolvedConfig, userId, workspaceId).catch((err) => { + logger.warn( + `[${requestId}] Persistent connection failed for ${servers[index].name}:`, + err + ) + }) + } + } + } + if (failedCount === 0) { try { await this.cacheAdapter.set(cacheKey, allTools, this.cacheTimeout) diff --git a/apps/sim/lib/mcp/types.ts b/apps/sim/lib/mcp/types.ts index a0a2848df..b7e0d838e 100644 --- a/apps/sim/lib/mcp/types.ts +++ b/apps/sim/lib/mcp/types.ts @@ -185,6 +185,14 @@ export interface ManagedConnectionState { lastActivity: number } +/** + * Event emitted when workflow CRUD modifies a workflow MCP server's tools. + */ +export interface WorkflowToolsChangedEvent { + serverId: string + workspaceId: string +} + export interface McpApiResponse { success: boolean data?: T