mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-09 14:14:57 -05:00
plugged in new mcp event based system and create sse route to publish notifs
This commit is contained in:
98
apps/sim/app/api/mcp/events/route.test.ts
Normal file
98
apps/sim/app/api/mcp/events/route.test.ts
Normal file
@@ -0,0 +1,98 @@
|
||||
/**
|
||||
* Tests for MCP SSE events endpoint
|
||||
*
|
||||
* @vitest-environment node
|
||||
*/
|
||||
import { createMockRequest, mockAuth, mockConsoleLogger } from '@sim/testing'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
mockConsoleLogger()
|
||||
const auth = mockAuth()
|
||||
|
||||
const mockGetUserEntityPermissions = vi.fn()
|
||||
vi.doMock('@/lib/workspaces/permissions/utils', () => ({
|
||||
getUserEntityPermissions: mockGetUserEntityPermissions,
|
||||
}))
|
||||
|
||||
vi.doMock('@/lib/mcp/connection-manager', () => ({
|
||||
mcpConnectionManager: null,
|
||||
}))
|
||||
|
||||
vi.doMock('@/lib/mcp/pubsub', () => ({
|
||||
mcpPubSub: null,
|
||||
}))
|
||||
|
||||
const { GET } = await import('./route')
|
||||
|
||||
describe('MCP Events SSE Endpoint', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
it('returns 401 when session is missing', async () => {
|
||||
auth.setUnauthenticated()
|
||||
|
||||
const request = createMockRequest(
|
||||
'GET',
|
||||
undefined,
|
||||
{},
|
||||
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
|
||||
)
|
||||
|
||||
const response = await GET(request as any)
|
||||
|
||||
expect(response.status).toBe(401)
|
||||
const text = await response.text()
|
||||
expect(text).toBe('Unauthorized')
|
||||
})
|
||||
|
||||
it('returns 400 when workspaceId is missing', async () => {
|
||||
auth.setAuthenticated()
|
||||
|
||||
const request = createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/mcp/events')
|
||||
|
||||
const response = await GET(request as any)
|
||||
|
||||
expect(response.status).toBe(400)
|
||||
const text = await response.text()
|
||||
expect(text).toBe('Missing workspaceId query parameter')
|
||||
})
|
||||
|
||||
it('returns 403 when user lacks workspace access', async () => {
|
||||
auth.setAuthenticated()
|
||||
mockGetUserEntityPermissions.mockResolvedValue(null)
|
||||
|
||||
const request = createMockRequest(
|
||||
'GET',
|
||||
undefined,
|
||||
{},
|
||||
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
|
||||
)
|
||||
|
||||
const response = await GET(request as any)
|
||||
|
||||
expect(response.status).toBe(403)
|
||||
const text = await response.text()
|
||||
expect(text).toBe('Access denied to workspace')
|
||||
expect(mockGetUserEntityPermissions).toHaveBeenCalledWith('user-123', 'workspace', 'ws-123')
|
||||
})
|
||||
|
||||
it('returns SSE stream when authorized', async () => {
|
||||
auth.setAuthenticated()
|
||||
mockGetUserEntityPermissions.mockResolvedValue({ read: true })
|
||||
|
||||
const request = createMockRequest(
|
||||
'GET',
|
||||
undefined,
|
||||
{},
|
||||
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
|
||||
)
|
||||
|
||||
const response = await GET(request as any)
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
|
||||
expect(response.headers.get('Cache-Control')).toBe('no-cache')
|
||||
expect(response.headers.get('Connection')).toBe('keep-alive')
|
||||
})
|
||||
})
|
||||
111
apps/sim/app/api/mcp/events/route.ts
Normal file
111
apps/sim/app/api/mcp/events/route.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
/**
|
||||
* SSE endpoint for MCP tool-change events.
|
||||
*
|
||||
* Pushes `tools_changed` events to the browser when:
|
||||
* - An external MCP server sends `notifications/tools/list_changed` (via connection manager)
|
||||
* - A workflow CRUD route modifies workflow MCP server tools (via pub/sub)
|
||||
*
|
||||
* Auth is handled via session cookies (EventSource sends cookies automatically).
|
||||
*/
|
||||
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('McpEventsSSE')
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
|
||||
const HEARTBEAT_INTERVAL_MS = 30_000
|
||||
|
||||
export async function GET(request: NextRequest) {
|
||||
const session = await getSession()
|
||||
if (!session?.user?.id) {
|
||||
return new Response('Unauthorized', { status: 401 })
|
||||
}
|
||||
|
||||
const { searchParams } = new URL(request.url)
|
||||
const workspaceId = searchParams.get('workspaceId')
|
||||
if (!workspaceId) {
|
||||
return new Response('Missing workspaceId query parameter', { status: 400 })
|
||||
}
|
||||
|
||||
const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
|
||||
if (!permissions) {
|
||||
return new Response('Access denied to workspace', { status: 403 })
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
const unsubscribers: Array<() => void> = []
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
const send = (eventName: string, data: Record<string, unknown>) => {
|
||||
try {
|
||||
controller.enqueue(
|
||||
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
|
||||
)
|
||||
} catch {
|
||||
// Stream already closed
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to external MCP server tool changes
|
||||
if (mcpConnectionManager) {
|
||||
const unsub = mcpConnectionManager.subscribe((event) => {
|
||||
if (event.workspaceId !== workspaceId) return
|
||||
send('tools_changed', {
|
||||
source: 'external',
|
||||
serverId: event.serverId,
|
||||
timestamp: event.timestamp,
|
||||
})
|
||||
})
|
||||
unsubscribers.push(unsub)
|
||||
}
|
||||
|
||||
// Subscribe to workflow CRUD tool changes
|
||||
if (mcpPubSub) {
|
||||
const unsub = mcpPubSub.onWorkflowToolsChanged((event) => {
|
||||
if (event.workspaceId !== workspaceId) return
|
||||
send('tools_changed', {
|
||||
source: 'workflow',
|
||||
serverId: event.serverId,
|
||||
timestamp: Date.now(),
|
||||
})
|
||||
})
|
||||
unsubscribers.push(unsub)
|
||||
}
|
||||
|
||||
// Heartbeat to keep the connection alive
|
||||
const heartbeat = setInterval(() => {
|
||||
try {
|
||||
controller.enqueue(encoder.encode(': heartbeat\n\n'))
|
||||
} catch {
|
||||
clearInterval(heartbeat)
|
||||
}
|
||||
}, HEARTBEAT_INTERVAL_MS)
|
||||
unsubscribers.push(() => clearInterval(heartbeat))
|
||||
|
||||
// Cleanup when client disconnects
|
||||
request.signal.addEventListener('abort', () => {
|
||||
for (const unsub of unsubscribers) {
|
||||
unsub()
|
||||
}
|
||||
try {
|
||||
controller.close()
|
||||
} catch {
|
||||
// Already closed
|
||||
}
|
||||
logger.info(`SSE connection closed for workspace ${workspaceId}`)
|
||||
})
|
||||
|
||||
logger.info(`SSE connection opened for workspace ${workspaceId}`)
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, { headers: SSE_HEADERS })
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
|
||||
|
||||
const logger = createLogger('WorkflowMcpServerAPI')
|
||||
@@ -146,6 +147,8 @@ export const DELETE = withMcpAuth<RouteParams>('admin')(
|
||||
|
||||
logger.info(`[${requestId}] Successfully deleted workflow MCP server: ${serverId}`)
|
||||
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
|
||||
return createMcpSuccessResponse({ message: `Server ${serverId} deleted successfully` })
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error deleting workflow MCP server:`, error)
|
||||
|
||||
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
|
||||
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
|
||||
|
||||
@@ -115,6 +116,8 @@ export const PATCH = withMcpAuth<RouteParams>('write')(
|
||||
|
||||
logger.info(`[${requestId}] Successfully updated tool ${toolId}`)
|
||||
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
|
||||
return createMcpSuccessResponse({ tool: updatedTool })
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error updating tool:`, error)
|
||||
@@ -160,6 +163,8 @@ export const DELETE = withMcpAuth<RouteParams>('write')(
|
||||
|
||||
logger.info(`[${requestId}] Successfully deleted tool ${toolId}`)
|
||||
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
|
||||
return createMcpSuccessResponse({ message: `Tool ${toolId} deleted successfully` })
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error deleting tool:`, error)
|
||||
|
||||
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
|
||||
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
|
||||
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
|
||||
@@ -188,6 +189,8 @@ export const POST = withMcpAuth<RouteParams>('write')(
|
||||
`[${requestId}] Successfully added tool ${toolName} (workflow: ${body.workflowId}) to server ${serverId}`
|
||||
)
|
||||
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
|
||||
return createMcpSuccessResponse({ tool }, 201)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error adding tool:`, error)
|
||||
|
||||
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { eq, inArray, sql } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
|
||||
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
|
||||
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
|
||||
@@ -174,6 +175,10 @@ export const POST = withMcpAuth('write')(
|
||||
`[${requestId}] Added ${addedTools.length} tools to server ${serverId}:`,
|
||||
addedTools.map((t) => t.toolName)
|
||||
)
|
||||
|
||||
if (addedTools.length > 0) {
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -62,7 +62,12 @@ import {
|
||||
type CustomTool as CustomToolDefinition,
|
||||
useCustomTools,
|
||||
} from '@/hooks/queries/custom-tools'
|
||||
import { useForceRefreshMcpTools, useMcpServers, useStoredMcpTools } from '@/hooks/queries/mcp'
|
||||
import {
|
||||
useForceRefreshMcpTools,
|
||||
useMcpServers,
|
||||
useMcpToolsEvents,
|
||||
useStoredMcpTools,
|
||||
} from '@/hooks/queries/mcp'
|
||||
import {
|
||||
useChildDeploymentStatus,
|
||||
useDeployChildWorkflow,
|
||||
@@ -1035,6 +1040,7 @@ export const ToolInput = memo(function ToolInput({
|
||||
const { data: mcpServers = [], isLoading: mcpServersLoading } = useMcpServers(workspaceId)
|
||||
const { data: storedMcpTools = [] } = useStoredMcpTools(workspaceId)
|
||||
const forceRefreshMcpTools = useForceRefreshMcpTools()
|
||||
useMcpToolsEvents(workspaceId)
|
||||
const openSettingsModal = useSettingsModalStore((state) => state.openModal)
|
||||
const mcpDataLoading = mcpLoading || mcpServersLoading
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { useEffect } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
|
||||
import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/shared'
|
||||
@@ -359,3 +360,65 @@ export function useStoredMcpTools(workspaceId: string) {
|
||||
staleTime: 60 * 1000,
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Shared EventSource connections keyed by workspaceId.
|
||||
* Reference-counted so the connection is closed when the last consumer unmounts.
|
||||
* Attached to `globalThis` so connections survive HMR in development.
|
||||
*/
|
||||
const SSE_KEY = '__mcp_sse_connections' as const
|
||||
|
||||
type SseEntry = { source: EventSource; refs: number }
|
||||
|
||||
const sseConnections: Map<string, SseEntry> =
|
||||
((globalThis as Record<string, unknown>)[SSE_KEY] as Map<string, SseEntry>) ??
|
||||
((globalThis as Record<string, unknown>)[SSE_KEY] = new Map<string, SseEntry>())
|
||||
|
||||
/**
|
||||
* Subscribe to MCP tool-change SSE events for a workspace.
|
||||
* On each `tools_changed` event, invalidates the relevant React Query caches
|
||||
* so the UI refreshes automatically.
|
||||
*/
|
||||
export function useMcpToolsEvents(workspaceId: string) {
|
||||
const queryClient = useQueryClient()
|
||||
|
||||
useEffect(() => {
|
||||
if (!workspaceId) return
|
||||
|
||||
const invalidate = () => {
|
||||
queryClient.invalidateQueries({ queryKey: mcpKeys.tools(workspaceId) })
|
||||
queryClient.invalidateQueries({ queryKey: mcpKeys.servers(workspaceId) })
|
||||
queryClient.invalidateQueries({ queryKey: mcpKeys.storedTools(workspaceId) })
|
||||
}
|
||||
|
||||
let entry = sseConnections.get(workspaceId)
|
||||
|
||||
if (!entry) {
|
||||
const source = new EventSource(`/api/mcp/events?workspaceId=${workspaceId}`)
|
||||
|
||||
source.addEventListener('tools_changed', () => {
|
||||
invalidate()
|
||||
})
|
||||
|
||||
source.onerror = () => {
|
||||
logger.warn(`SSE connection error for workspace ${workspaceId}`)
|
||||
}
|
||||
|
||||
entry = { source, refs: 0 }
|
||||
sseConnections.set(workspaceId, entry)
|
||||
}
|
||||
|
||||
entry.refs++
|
||||
|
||||
return () => {
|
||||
const current = sseConnections.get(workspaceId)
|
||||
if (!current) return
|
||||
|
||||
current.refs--
|
||||
if (current.refs <= 0) {
|
||||
current.source.close()
|
||||
sseConnections.delete(workspaceId)
|
||||
}
|
||||
}
|
||||
}, [workspaceId, queryClient])
|
||||
}
|
||||
|
||||
@@ -15,18 +15,13 @@ import { EventEmitter } from 'events'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import Redis from 'ioredis'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import type { ToolsChangedEvent } from '@/lib/mcp/types'
|
||||
import type { ToolsChangedEvent, WorkflowToolsChangedEvent } from '@/lib/mcp/types'
|
||||
|
||||
const logger = createLogger('McpPubSub')
|
||||
|
||||
const CHANNEL_TOOLS_CHANGED = 'mcp:tools_changed'
|
||||
const CHANNEL_WORKFLOW_TOOLS_CHANGED = 'mcp:workflow_tools_changed'
|
||||
|
||||
export interface WorkflowToolsChangedEvent {
|
||||
serverId: string
|
||||
workspaceId: string
|
||||
}
|
||||
|
||||
type ToolsChangedHandler = (event: ToolsChangedEvent) => void
|
||||
type WorkflowToolsChangedHandler = (event: WorkflowToolsChangedEvent) => void
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import { and, eq, isNull } from 'drizzle-orm'
|
||||
import { isTest } from '@/lib/core/config/feature-flags'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { McpClient } from '@/lib/mcp/client'
|
||||
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
|
||||
import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config'
|
||||
import {
|
||||
createMcpCacheAdapter,
|
||||
@@ -31,16 +32,24 @@ const logger = createLogger('McpService')
|
||||
class McpService {
|
||||
private cacheAdapter: McpCacheStorageAdapter
|
||||
private readonly cacheTimeout = MCP_CONSTANTS.CACHE_TIMEOUT
|
||||
private unsubscribeConnectionManager?: () => void
|
||||
|
||||
constructor() {
|
||||
this.cacheAdapter = createMcpCacheAdapter()
|
||||
logger.info(`MCP Service initialized with ${getMcpCacheType()} cache`)
|
||||
|
||||
if (mcpConnectionManager) {
|
||||
this.unsubscribeConnectionManager = mcpConnectionManager.subscribe((event) => {
|
||||
this.clearCache(event.workspaceId)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose of the service and cleanup resources
|
||||
*/
|
||||
dispose(): void {
|
||||
this.unsubscribeConnectionManager?.()
|
||||
this.cacheAdapter.dispose()
|
||||
logger.info('MCP Service disposed')
|
||||
}
|
||||
@@ -328,7 +337,7 @@ class McpService {
|
||||
logger.debug(
|
||||
`[${requestId}] Discovered ${tools.length} tools from server ${config.name}`
|
||||
)
|
||||
return { serverId: config.id, tools }
|
||||
return { serverId: config.id, tools, resolvedConfig }
|
||||
} finally {
|
||||
await client.disconnect()
|
||||
}
|
||||
@@ -364,6 +373,21 @@ class McpService {
|
||||
logger.error(`[${requestId}] Error updating server statuses:`, err)
|
||||
})
|
||||
|
||||
// Fire-and-forget persistent connections for servers that support listChanged
|
||||
if (mcpConnectionManager) {
|
||||
for (const [index, result] of results.entries()) {
|
||||
if (result.status === 'fulfilled') {
|
||||
const { resolvedConfig } = result.value
|
||||
mcpConnectionManager.connect(resolvedConfig, userId, workspaceId).catch((err) => {
|
||||
logger.warn(
|
||||
`[${requestId}] Persistent connection failed for ${servers[index].name}:`,
|
||||
err
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (failedCount === 0) {
|
||||
try {
|
||||
await this.cacheAdapter.set(cacheKey, allTools, this.cacheTimeout)
|
||||
|
||||
@@ -185,6 +185,14 @@ export interface ManagedConnectionState {
|
||||
lastActivity: number
|
||||
}
|
||||
|
||||
/**
|
||||
* Event emitted when workflow CRUD modifies a workflow MCP server's tools.
|
||||
*/
|
||||
export interface WorkflowToolsChangedEvent {
|
||||
serverId: string
|
||||
workspaceId: string
|
||||
}
|
||||
|
||||
export interface McpApiResponse<T = unknown> {
|
||||
success: boolean
|
||||
data?: T
|
||||
|
||||
Reference in New Issue
Block a user