fix(mcp): harden notification system against race conditions (#3168)

* fix(mcp): harden notification system against race conditions

- Guard concurrent connect() calls in connection manager with connectingServers Set
- Suppress post-disconnect notification handler firing in MCP client
- Clean up Redis event listeners in pub/sub dispose()
- Add tests for all three hardening fixes (11 new tests)

* updated tests

* plugged in new mcp event based system and create sse route to publish notifs

* ack commetns

* fix reconnect timer

* cleanup when running onClose

* fixed spacing on mcp settings tab

* keep error listeners before quiet in redis
This commit is contained in:
Waleed
2026-02-09 19:36:01 -08:00
committed by GitHub
parent 190f12fd77
commit 8b4b3af120
17 changed files with 1421 additions and 23 deletions

View File

@@ -0,0 +1,98 @@
/**
* Tests for MCP SSE events endpoint
*
* @vitest-environment node
*/
import { createMockRequest, mockAuth, mockConsoleLogger } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
mockConsoleLogger()
const auth = mockAuth()
const mockGetUserEntityPermissions = vi.fn()
vi.doMock('@/lib/workspaces/permissions/utils', () => ({
getUserEntityPermissions: mockGetUserEntityPermissions,
}))
vi.doMock('@/lib/mcp/connection-manager', () => ({
mcpConnectionManager: null,
}))
vi.doMock('@/lib/mcp/pubsub', () => ({
mcpPubSub: null,
}))
const { GET } = await import('./route')
describe('MCP Events SSE Endpoint', () => {
beforeEach(() => {
vi.clearAllMocks()
})
it('returns 401 when session is missing', async () => {
auth.setUnauthenticated()
const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)
const response = await GET(request as any)
expect(response.status).toBe(401)
const text = await response.text()
expect(text).toBe('Unauthorized')
})
it('returns 400 when workspaceId is missing', async () => {
auth.setAuthenticated()
const request = createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/mcp/events')
const response = await GET(request as any)
expect(response.status).toBe(400)
const text = await response.text()
expect(text).toBe('Missing workspaceId query parameter')
})
it('returns 403 when user lacks workspace access', async () => {
auth.setAuthenticated()
mockGetUserEntityPermissions.mockResolvedValue(null)
const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)
const response = await GET(request as any)
expect(response.status).toBe(403)
const text = await response.text()
expect(text).toBe('Access denied to workspace')
expect(mockGetUserEntityPermissions).toHaveBeenCalledWith('user-123', 'workspace', 'ws-123')
})
it('returns SSE stream when authorized', async () => {
auth.setAuthenticated()
mockGetUserEntityPermissions.mockResolvedValue({ read: true })
const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)
const response = await GET(request as any)
expect(response.status).toBe(200)
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
expect(response.headers.get('Cache-Control')).toBe('no-cache')
expect(response.headers.get('Connection')).toBe('keep-alive')
})
})

View File

@@ -0,0 +1,111 @@
/**
* SSE endpoint for MCP tool-change events.
*
* Pushes `tools_changed` events to the browser when:
* - An external MCP server sends `notifications/tools/list_changed` (via connection manager)
* - A workflow CRUD route modifies workflow MCP server tools (via pub/sub)
*
* Auth is handled via session cookies (EventSource sends cookies automatically).
*/
import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server'
import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('McpEventsSSE')
export const dynamic = 'force-dynamic'
const HEARTBEAT_INTERVAL_MS = 30_000
export async function GET(request: NextRequest) {
const session = await getSession()
if (!session?.user?.id) {
return new Response('Unauthorized', { status: 401 })
}
const { searchParams } = new URL(request.url)
const workspaceId = searchParams.get('workspaceId')
if (!workspaceId) {
return new Response('Missing workspaceId query parameter', { status: 400 })
}
const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
if (!permissions) {
return new Response('Access denied to workspace', { status: 403 })
}
const encoder = new TextEncoder()
const unsubscribers: Array<() => void> = []
const stream = new ReadableStream({
start(controller) {
const send = (eventName: string, data: Record<string, unknown>) => {
try {
controller.enqueue(
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
)
} catch {
// Stream already closed
}
}
// Subscribe to external MCP server tool changes
if (mcpConnectionManager) {
const unsub = mcpConnectionManager.subscribe((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'external',
serverId: event.serverId,
timestamp: event.timestamp,
})
})
unsubscribers.push(unsub)
}
// Subscribe to workflow CRUD tool changes
if (mcpPubSub) {
const unsub = mcpPubSub.onWorkflowToolsChanged((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'workflow',
serverId: event.serverId,
timestamp: Date.now(),
})
})
unsubscribers.push(unsub)
}
// Heartbeat to keep the connection alive
const heartbeat = setInterval(() => {
try {
controller.enqueue(encoder.encode(': heartbeat\n\n'))
} catch {
clearInterval(heartbeat)
}
}, HEARTBEAT_INTERVAL_MS)
unsubscribers.push(() => clearInterval(heartbeat))
// Cleanup when client disconnects
request.signal.addEventListener('abort', () => {
for (const unsub of unsubscribers) {
unsub()
}
try {
controller.close()
} catch {
// Already closed
}
logger.info(`SSE connection closed for workspace ${workspaceId}`)
})
logger.info(`SSE connection opened for workspace ${workspaceId}`)
},
})
return new Response(stream, { headers: SSE_HEADERS })
}

View File

@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
const logger = createLogger('WorkflowMcpServerAPI')
@@ -146,6 +147,8 @@ export const DELETE = withMcpAuth<RouteParams>('admin')(
logger.info(`[${requestId}] Successfully deleted workflow MCP server: ${serverId}`)
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
return createMcpSuccessResponse({ message: `Server ${serverId} deleted successfully` })
} catch (error) {
logger.error(`[${requestId}] Error deleting workflow MCP server:`, error)

View File

@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
@@ -115,6 +116,8 @@ export const PATCH = withMcpAuth<RouteParams>('write')(
logger.info(`[${requestId}] Successfully updated tool ${toolId}`)
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
return createMcpSuccessResponse({ tool: updatedTool })
} catch (error) {
logger.error(`[${requestId}] Error updating tool:`, error)
@@ -160,6 +163,8 @@ export const DELETE = withMcpAuth<RouteParams>('write')(
logger.info(`[${requestId}] Successfully deleted tool ${toolId}`)
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
return createMcpSuccessResponse({ message: `Tool ${toolId} deleted successfully` })
} catch (error) {
logger.error(`[${requestId}] Error deleting tool:`, error)

View File

@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
@@ -188,6 +189,8 @@ export const POST = withMcpAuth<RouteParams>('write')(
`[${requestId}] Successfully added tool ${toolName} (workflow: ${body.workflowId}) to server ${serverId}`
)
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
return createMcpSuccessResponse({ tool }, 201)
} catch (error) {
logger.error(`[${requestId}] Error adding tool:`, error)

View File

@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { eq, inArray, sql } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
@@ -174,6 +175,10 @@ export const POST = withMcpAuth('write')(
`[${requestId}] Added ${addedTools.length} tools to server ${serverId}:`,
addedTools.map((t) => t.toolName)
)
if (addedTools.length > 0) {
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
}
}
logger.info(