fix(mcp): prevent redundant MCP server discovery calls at runtime, use cached tool schema instead (#2273)

* fix(mcp): prevent redundant MCP server discovery calls at runtime, use cached tool schema instead

* added backfill, added loading state for tools in settings > mcp

* fix tool inp
This commit is contained in:
Waleed
2025-12-09 12:44:53 -08:00
committed by GitHub
parent aa1d896b38
commit 22abf98835
7 changed files with 819 additions and 191 deletions

View File

@@ -15,7 +15,6 @@ const logger = createLogger('McpToolExecutionAPI')
export const dynamic = 'force-dynamic'
// Type definitions for improved type safety
interface SchemaProperty {
type: 'string' | 'number' | 'boolean' | 'object' | 'array'
description?: string
@@ -31,9 +30,6 @@ interface ToolExecutionResult {
error?: string
}
/**
* Type guard to safely check if a schema property has a type field
*/
function hasType(prop: unknown): prop is SchemaProperty {
return typeof prop === 'object' && prop !== null && 'type' in prop
}
@@ -57,7 +53,8 @@ export const POST = withMcpAuth('read')(
userId: userId,
})
const { serverId, toolName, arguments: args } = body
const { serverId, toolName, arguments: rawArgs } = body
const args = rawArgs || {}
const serverIdValidation = validateStringParam(serverId, 'serverId')
if (!serverIdValidation.isValid) {
@@ -75,22 +72,31 @@ export const POST = withMcpAuth('read')(
`[${requestId}] Executing tool ${toolName} on server ${serverId} for user ${userId} in workspace ${workspaceId}`
)
let tool = null
let tool: McpTool | null = null
try {
const tools = await mcpService.discoverServerTools(userId, serverId, workspaceId)
tool = tools.find((t) => t.name === toolName)
if (body.toolSchema) {
tool = {
name: toolName,
inputSchema: body.toolSchema,
serverId: serverId,
serverName: 'provided-schema',
} as McpTool
logger.debug(`[${requestId}] Using provided schema for ${toolName}, skipping discovery`)
} else {
const tools = await mcpService.discoverServerTools(userId, serverId, workspaceId)
tool = tools.find((t) => t.name === toolName) ?? null
if (!tool) {
return createMcpErrorResponse(
new Error(
`Tool ${toolName} not found on server ${serverId}. Available tools: ${tools.map((t) => t.name).join(', ')}`
),
'Tool not found',
404
)
if (!tool) {
return createMcpErrorResponse(
new Error(
`Tool ${toolName} not found on server ${serverId}. Available tools: ${tools.map((t) => t.name).join(', ')}`
),
'Tool not found',
404
)
}
}
// Cast arguments to their expected types based on tool schema
if (tool.inputSchema?.properties) {
for (const [paramName, paramSchema] of Object.entries(tool.inputSchema.properties)) {
const schema = paramSchema as any
@@ -100,7 +106,6 @@ export const POST = withMcpAuth('read')(
continue
}
// Cast numbers
if (
(schema.type === 'number' || schema.type === 'integer') &&
typeof value === 'string'
@@ -110,42 +115,33 @@ export const POST = withMcpAuth('read')(
if (!Number.isNaN(numValue)) {
args[paramName] = numValue
}
}
// Cast booleans
else if (schema.type === 'boolean' && typeof value === 'string') {
} else if (schema.type === 'boolean' && typeof value === 'string') {
if (value.toLowerCase() === 'true') {
args[paramName] = true
} else if (value.toLowerCase() === 'false') {
args[paramName] = false
}
}
// Cast arrays
else if (schema.type === 'array' && typeof value === 'string') {
} else if (schema.type === 'array' && typeof value === 'string') {
const stringValue = value.trim()
if (stringValue) {
try {
// Try to parse as JSON first (handles ["item1", "item2"])
const parsed = JSON.parse(stringValue)
if (Array.isArray(parsed)) {
args[paramName] = parsed
} else {
// JSON parsed but not an array, wrap in array
args[paramName] = [parsed]
}
} catch (error) {
// JSON parsing failed - treat as comma-separated if contains commas, otherwise single item
} catch {
if (stringValue.includes(',')) {
args[paramName] = stringValue
.split(',')
.map((item) => item.trim())
.filter((item) => item)
} else {
// Single item - wrap in array since schema expects array
args[paramName] = [stringValue]
}
}
} else {
// Empty string becomes empty array
args[paramName] = []
}
}
@@ -172,7 +168,7 @@ export const POST = withMcpAuth('read')(
const toolCall: McpToolCall = {
name: toolName,
arguments: args || {},
arguments: args,
}
const result = await Promise.race([
@@ -197,7 +193,6 @@ export const POST = withMcpAuth('read')(
}
logger.info(`[${requestId}] Successfully executed tool ${toolName} on server ${serverId}`)
// Track MCP tool execution
try {
const { trackPlatformEvent } = await import('@/lib/core/telemetry')
trackPlatformEvent('platform.mcp.tool_executed', {
@@ -206,8 +201,8 @@ export const POST = withMcpAuth('read')(
'mcp.execution_status': 'success',
'workspace.id': workspaceId,
})
} catch (_e) {
// Silently fail
} catch {
// Telemetry failure is non-critical
}
return createMcpSuccessResponse(transformedResult)
@@ -220,12 +215,9 @@ export const POST = withMcpAuth('read')(
}
)
/**
* Validate tool arguments against schema
*/
function validateToolArguments(tool: McpTool, args: Record<string, unknown>): string | null {
if (!tool.inputSchema) {
return null // No schema to validate against
return null
}
const schema = tool.inputSchema
@@ -270,9 +262,6 @@ function validateToolArguments(tool: McpTool, args: Record<string, unknown>): st
return null
}
/**
* Transform MCP tool result to platform format
*/
function transformToolResult(result: McpToolResult): ToolExecutionResult {
if (result.isError) {
return {

View File

@@ -1,5 +1,5 @@
import type React from 'react'
import { useCallback, useEffect, useMemo, useState } from 'react'
import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { useQuery } from '@tanstack/react-query'
import { Loader2, PlusIcon, WrenchIcon, XIcon } from 'lucide-react'
import { useParams } from 'next/navigation'
@@ -845,6 +845,52 @@ export function ToolInput({
? (value as unknown as StoredTool[])
: []
const hasBackfilledRef = useRef(false)
useEffect(() => {
if (
isPreview ||
mcpLoading ||
mcpTools.length === 0 ||
selectedTools.length === 0 ||
hasBackfilledRef.current
) {
return
}
const mcpToolsNeedingSchema = selectedTools.filter(
(tool) => tool.type === 'mcp' && !tool.schema && tool.params?.toolName
)
if (mcpToolsNeedingSchema.length === 0) {
return
}
const updatedTools = selectedTools.map((tool) => {
if (tool.type !== 'mcp' || tool.schema || !tool.params?.toolName) {
return tool
}
const mcpTool = mcpTools.find(
(mt) => mt.name === tool.params?.toolName && mt.serverId === tool.params?.serverId
)
if (mcpTool?.inputSchema) {
logger.info(`Backfilling schema for MCP tool: ${tool.params.toolName}`)
return { ...tool, schema: mcpTool.inputSchema }
}
return tool
})
const hasChanges = updatedTools.some((tool, i) => tool.schema && !selectedTools[i].schema)
if (hasChanges) {
hasBackfilledRef.current = true
logger.info(`Backfilled schemas for ${mcpToolsNeedingSchema.length} MCP tool(s)`)
setStoreValue(updatedTools)
}
}, [mcpTools, mcpLoading, selectedTools, isPreview, setStoreValue])
/**
* Checks if a tool is already selected in the current workflow
* @param toolId - The tool identifier to check
@@ -2314,7 +2360,7 @@ export function ToolInput({
mcpTools={mcpTools}
searchQuery={searchQuery || ''}
customFilter={customFilter}
onToolSelect={(tool) => handleMcpToolSelect(tool, false)}
onToolSelect={handleMcpToolSelect}
disabled={false}
/>

View File

@@ -28,6 +28,7 @@ interface ServerListItemProps {
server: any
tools: any[]
isDeleting: boolean
isLoadingTools?: boolean
onRemove: () => void
onViewDetails: () => void
}
@@ -39,6 +40,7 @@ export function ServerListItem({
server,
tools,
isDeleting,
isLoadingTools = false,
onRemove,
onViewDetails,
}: ServerListItemProps) {
@@ -54,7 +56,9 @@ export function ServerListItem({
</span>
<span className='text-[13px] text-[var(--text-secondary)]'>({transportLabel})</span>
</div>
<p className='truncate text-[13px] text-[var(--text-muted)]'>{toolsLabel}</p>
<p className='truncate text-[13px] text-[var(--text-muted)]'>
{isLoadingTools && tools.length === 0 ? 'Loading...' : toolsLabel}
</p>
</div>
<div className='flex flex-shrink-0 items-center gap-[4px]'>
<Button

View File

@@ -80,7 +80,12 @@ export function MCP() {
isLoading: serversLoading,
error: serversError,
} = useMcpServers(workspaceId)
const { data: mcpToolsData = [], error: toolsError } = useMcpToolsQuery(workspaceId)
const {
data: mcpToolsData = [],
error: toolsError,
isLoading: toolsLoading,
isFetching: toolsFetching,
} = useMcpToolsQuery(workspaceId)
const createServerMutation = useCreateMcpServer()
const deleteServerMutation = useDeleteMcpServer()
const { testResult, isTestingConnection, testConnection, clearTestResult } = useMcpServerTest()
@@ -632,6 +637,7 @@ export function MCP() {
{filteredServers.map((server) => {
if (!server?.id) return null
const tools = toolsByServer[server.id] || []
const isLoadingTools = toolsLoading || toolsFetching
return (
<ServerListItem
@@ -639,6 +645,7 @@ export function MCP() {
server={server}
tools={tools}
isDeleting={deletingServers.has(server.id)}
isLoadingTools={isLoadingTools}
onRemove={() => handleRemoveServer(server.id, server.name || 'this server')}
onViewDetails={() => handleViewDetails(server.id)}
/>

View File

@@ -1719,5 +1719,339 @@ describe('AgentBlockHandler', () => {
expect(contextWithWorkspace.workspaceId).toBe('test-workspace-456')
})
it('should use cached schema for MCP tools (no discovery needed)', async () => {
const fetchCalls: any[] = []
mockFetch.mockImplementation((url: string, options: any) => {
fetchCalls.push({ url, options })
if (url.includes('/api/providers')) {
return Promise.resolve({
ok: true,
headers: {
get: (name: string) => (name === 'Content-Type' ? 'application/json' : null),
},
json: () =>
Promise.resolve({
content: 'Used MCP tool successfully',
model: 'gpt-4o',
tokens: { prompt: 10, completion: 10, total: 20 },
toolCalls: [],
timing: { total: 50 },
}),
})
}
if (url.includes('/api/mcp/tools/execute')) {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
success: true,
data: { output: { content: [{ type: 'text', text: 'Tool executed' }] } },
}),
})
}
return Promise.resolve({ ok: true, json: () => Promise.resolve({}) })
})
const inputs = {
model: 'gpt-4o',
userPrompt: 'Use the MCP tool',
apiKey: 'test-api-key',
tools: [
{
type: 'mcp',
title: 'list_files',
schema: {
type: 'object',
properties: {
path: { type: 'string', description: 'Directory path' },
},
required: ['path'],
},
params: {
serverId: 'mcp-server-123',
toolName: 'list_files',
serverName: 'filesystem',
},
usageControl: 'auto' as const,
},
],
}
const contextWithWorkspace = {
...mockContext,
workspaceId: 'test-workspace-123',
workflowId: 'test-workflow-456',
}
mockGetProviderFromModel.mockReturnValue('openai')
await handler.execute(contextWithWorkspace, mockBlock, inputs)
const discoveryCalls = fetchCalls.filter((c) => c.url.includes('/api/mcp/tools/discover'))
expect(discoveryCalls.length).toBe(0)
const providerCalls = fetchCalls.filter((c) => c.url.includes('/api/providers'))
expect(providerCalls.length).toBe(1)
})
it('should pass toolSchema to execution endpoint when using cached schema', async () => {
let executionCall: any = null
mockFetch.mockImplementation((url: string, options: any) => {
if (url.includes('/api/providers')) {
return Promise.resolve({
ok: true,
headers: {
get: (name: string) => (name === 'Content-Type' ? 'application/json' : null),
},
json: () =>
Promise.resolve({
content: 'Tool executed',
model: 'gpt-4o',
tokens: { prompt: 10, completion: 10, total: 20 },
toolCalls: [
{
name: 'search_files',
arguments: { query: 'test' },
result: { success: true, output: {} },
},
],
timing: { total: 50 },
}),
})
}
if (url.includes('/api/mcp/tools/execute')) {
executionCall = { url, body: JSON.parse(options.body) }
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
success: true,
data: { output: { content: [{ type: 'text', text: 'Search results' }] } },
}),
})
}
return Promise.resolve({ ok: true, json: () => Promise.resolve({}) })
})
const cachedSchema = {
type: 'object',
properties: {
query: { type: 'string', description: 'Search query' },
},
required: ['query'],
}
const inputs = {
model: 'gpt-4o',
userPrompt: 'Search for files',
apiKey: 'test-api-key',
tools: [
{
type: 'mcp',
title: 'search_files',
schema: cachedSchema,
params: {
serverId: 'mcp-search-server',
toolName: 'search_files',
serverName: 'search',
},
usageControl: 'auto' as const,
},
],
}
const contextWithWorkspace = {
...mockContext,
workspaceId: 'test-workspace-123',
workflowId: 'test-workflow-456',
}
mockGetProviderFromModel.mockReturnValue('openai')
await handler.execute(contextWithWorkspace, mockBlock, inputs)
const providerCalls = mockFetch.mock.calls.filter((c: any) => c[0].includes('/api/providers'))
expect(providerCalls.length).toBe(1)
const providerRequestBody = JSON.parse(providerCalls[0][1].body)
expect(providerRequestBody.tools).toBeDefined()
expect(providerRequestBody.tools.length).toBe(1)
expect(providerRequestBody.tools[0].name).toBe('search_files')
})
it('should handle multiple MCP tools from the same server efficiently', async () => {
const fetchCalls: any[] = []
mockFetch.mockImplementation((url: string, options: any) => {
fetchCalls.push({ url, options })
if (url.includes('/api/providers')) {
return Promise.resolve({
ok: true,
headers: {
get: (name: string) => (name === 'Content-Type' ? 'application/json' : null),
},
json: () =>
Promise.resolve({
content: 'Used tools',
model: 'gpt-4o',
tokens: { prompt: 10, completion: 10, total: 20 },
toolCalls: [],
timing: { total: 50 },
}),
})
}
return Promise.resolve({ ok: true, json: () => Promise.resolve({}) })
})
const inputs = {
model: 'gpt-4o',
userPrompt: 'Use all the tools',
apiKey: 'test-api-key',
tools: [
{
type: 'mcp',
title: 'tool_1',
schema: { type: 'object', properties: {} },
params: {
serverId: 'same-server',
toolName: 'tool_1',
serverName: 'server',
},
usageControl: 'auto' as const,
},
{
type: 'mcp',
title: 'tool_2',
schema: { type: 'object', properties: {} },
params: {
serverId: 'same-server',
toolName: 'tool_2',
serverName: 'server',
},
usageControl: 'auto' as const,
},
{
type: 'mcp',
title: 'tool_3',
schema: { type: 'object', properties: {} },
params: {
serverId: 'same-server',
toolName: 'tool_3',
serverName: 'server',
},
usageControl: 'auto' as const,
},
],
}
const contextWithWorkspace = {
...mockContext,
workspaceId: 'test-workspace-123',
workflowId: 'test-workflow-456',
}
mockGetProviderFromModel.mockReturnValue('openai')
await handler.execute(contextWithWorkspace, mockBlock, inputs)
const discoveryCalls = fetchCalls.filter((c) => c.url.includes('/api/mcp/tools/discover'))
expect(discoveryCalls.length).toBe(0)
const providerCalls = fetchCalls.filter((c) => c.url.includes('/api/providers'))
expect(providerCalls.length).toBe(1)
const providerRequestBody = JSON.parse(providerCalls[0].options.body)
expect(providerRequestBody.tools.length).toBe(3)
})
it('should should fallback to discovery for MCP tools without cached schema', async () => {
const fetchCalls: any[] = []
mockFetch.mockImplementation((url: string, options: any) => {
fetchCalls.push({ url, options })
if (url.includes('/api/mcp/tools/discover')) {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
success: true,
data: {
tools: [
{
name: 'legacy_tool',
description: 'A legacy tool without cached schema',
inputSchema: { type: 'object', properties: {} },
serverName: 'legacy-server',
},
],
},
}),
})
}
if (url.includes('/api/providers')) {
return Promise.resolve({
ok: true,
headers: {
get: (name: string) => (name === 'Content-Type' ? 'application/json' : null),
},
json: () =>
Promise.resolve({
content: 'Used legacy tool',
model: 'gpt-4o',
tokens: { prompt: 10, completion: 10, total: 20 },
toolCalls: [],
timing: { total: 50 },
}),
})
}
return Promise.resolve({ ok: true, json: () => Promise.resolve({}) })
})
const inputs = {
model: 'gpt-4o',
userPrompt: 'Use the legacy tool',
apiKey: 'test-api-key',
tools: [
{
type: 'mcp',
title: 'legacy_tool',
params: {
serverId: 'mcp-legacy-server',
toolName: 'legacy_tool',
serverName: 'legacy-server',
},
usageControl: 'auto' as const,
},
],
}
const contextWithWorkspace = {
...mockContext,
workspaceId: 'test-workspace-123',
workflowId: 'test-workflow-456',
}
mockGetProviderFromModel.mockReturnValue('openai')
await handler.execute(contextWithWorkspace, mockBlock, inputs)
const discoveryCalls = fetchCalls.filter((c) => c.url.includes('/api/mcp/tools/discover'))
expect(discoveryCalls.length).toBe(1)
expect(discoveryCalls[0].url).toContain('serverId=mcp-legacy-server')
})
})
})

View File

@@ -61,7 +61,6 @@ export class AgentBlockHandler implements BlockHandler {
inputs
)
// Auto-persist response to memory if configured
await this.persistResponseToMemory(ctx, inputs, result, block.id)
return result
@@ -119,45 +118,51 @@ export class AgentBlockHandler implements BlockHandler {
private async formatTools(ctx: ExecutionContext, inputTools: ToolInput[]): Promise<any[]> {
if (!Array.isArray(inputTools)) return []
const tools = await Promise.all(
inputTools
.filter((tool) => {
const usageControl = tool.usageControl || 'auto'
return usageControl !== 'none'
})
.map(async (tool) => {
try {
// Handle custom tools - either inline (schema) or reference (customToolId)
if (tool.type === 'custom-tool' && (tool.schema || tool.customToolId)) {
return await this.createCustomTool(ctx, tool)
}
if (tool.type === 'mcp') {
return await this.createMcpTool(ctx, tool)
}
return this.transformBlockTool(ctx, tool)
} catch (error) {
logger.error(`[AgentHandler] Error creating tool:`, { tool, error })
return null
const filtered = inputTools.filter((tool) => {
const usageControl = tool.usageControl || 'auto'
return usageControl !== 'none'
})
const mcpTools: ToolInput[] = []
const otherTools: ToolInput[] = []
for (const tool of filtered) {
if (tool.type === 'mcp') {
mcpTools.push(tool)
} else {
otherTools.push(tool)
}
}
const otherResults = await Promise.all(
otherTools.map(async (tool) => {
try {
if (tool.type === 'custom-tool' && (tool.schema || tool.customToolId)) {
return await this.createCustomTool(ctx, tool)
}
})
return this.transformBlockTool(ctx, tool)
} catch (error) {
logger.error(`[AgentHandler] Error creating tool:`, { tool, error })
return null
}
})
)
const filteredTools = tools.filter(
const mcpResults = await this.processMcpToolsBatched(ctx, mcpTools)
const allTools = [...otherResults, ...mcpResults]
return allTools.filter(
(tool): tool is NonNullable<typeof tool> => tool !== null && tool !== undefined
)
return filteredTools
}
private async createCustomTool(ctx: ExecutionContext, tool: ToolInput): Promise<any> {
const userProvidedParams = tool.params || {}
// Resolve tool definition - either inline or from database reference
let schema = tool.schema
let code = tool.code
let title = tool.title
// If this is a reference-only tool (has customToolId but no schema), fetch from API
if (tool.customToolId && !schema) {
const resolved = await this.fetchCustomToolById(ctx, tool.customToolId)
if (!resolved) {
@@ -169,7 +174,6 @@ export class AgentBlockHandler implements BlockHandler {
title = resolved.title
}
// Validate we have the required data
if (!schema?.function) {
logger.error('Custom tool missing schema:', { customToolId: tool.customToolId, title })
return null
@@ -231,13 +235,11 @@ export class AgentBlockHandler implements BlockHandler {
/**
* Fetches a custom tool definition from the database by ID
* Uses Zustand store in browser, API call on server
*/
private async fetchCustomToolById(
ctx: ExecutionContext,
customToolId: string
): Promise<{ schema: any; code: string; title: string } | null> {
// In browser, use the Zustand store which has cached data from React Query
if (typeof window !== 'undefined') {
try {
const { useCustomToolsStore } = await import('@/stores/custom-tools/store')
@@ -255,7 +257,6 @@ export class AgentBlockHandler implements BlockHandler {
}
}
// Server-side: fetch from API
try {
const headers = await buildAuthHeaders()
const params: Record<string, string> = {}
@@ -301,104 +302,301 @@ export class AgentBlockHandler implements BlockHandler {
}
}
private async createMcpTool(ctx: ExecutionContext, tool: ToolInput): Promise<any> {
const { serverId, toolName, ...userProvidedParams } = tool.params || {}
/**
* Process MCP tools using cached schemas from build time.
*/
private async processMcpToolsBatched(
ctx: ExecutionContext,
mcpTools: ToolInput[]
): Promise<any[]> {
if (mcpTools.length === 0) return []
if (!serverId || !toolName) {
logger.error('MCP tool missing required parameters:', { serverId, toolName })
return null
const results: any[] = []
const toolsWithSchema: ToolInput[] = []
const toolsNeedingDiscovery: ToolInput[] = []
for (const tool of mcpTools) {
const serverId = tool.params?.serverId
const toolName = tool.params?.toolName
if (!serverId || !toolName) {
logger.error('MCP tool missing serverId or toolName:', tool)
continue
}
if (tool.schema) {
toolsWithSchema.push(tool)
} else {
logger.warn(`MCP tool ${toolName} missing cached schema, will need discovery`)
toolsNeedingDiscovery.push(tool)
}
}
try {
if (!ctx.workspaceId) {
throw new Error('workspaceId is required for MCP tool discovery')
}
if (!ctx.workflowId) {
throw new Error('workflowId is required for internal JWT authentication')
for (const tool of toolsWithSchema) {
try {
const created = await this.createMcpToolFromCachedSchema(ctx, tool)
if (created) results.push(created)
} catch (error) {
logger.error(`Error creating MCP tool from cached schema:`, { tool, error })
}
}
const headers = await buildAuthHeaders()
const url = buildAPIUrl('/api/mcp/tools/discover', {
serverId,
workspaceId: ctx.workspaceId,
workflowId: ctx.workflowId,
if (toolsNeedingDiscovery.length > 0) {
const discoveredResults = await this.processMcpToolsWithDiscovery(ctx, toolsNeedingDiscovery)
results.push(...discoveredResults)
}
return results
}
/**
* Create MCP tool from cached schema. No MCP server connection required.
*/
private async createMcpToolFromCachedSchema(
ctx: ExecutionContext,
tool: ToolInput
): Promise<any> {
const { serverId, toolName, serverName, ...userProvidedParams } = tool.params || {}
const { filterSchemaForLLM } = await import('@/tools/params')
const filteredSchema = filterSchemaForLLM(
tool.schema || { type: 'object', properties: {} },
userProvidedParams
)
const toolId = createMcpToolId(serverId, toolName)
return {
id: toolId,
name: toolName,
description:
tool.schema?.description || `MCP tool ${toolName} from ${serverName || serverId}`,
parameters: filteredSchema,
params: userProvidedParams,
usageControl: tool.usageControl || 'auto',
executeFunction: async (callParams: Record<string, any>) => {
const headers = await buildAuthHeaders()
const execUrl = buildAPIUrl('/api/mcp/tools/execute')
const execResponse = await fetch(execUrl.toString(), {
method: 'POST',
headers,
body: stringifyJSON({
serverId,
toolName,
arguments: callParams,
workspaceId: ctx.workspaceId,
workflowId: ctx.workflowId,
toolSchema: tool.schema,
}),
})
if (!execResponse.ok) {
throw new Error(
`MCP tool execution failed: ${execResponse.status} ${execResponse.statusText}`
)
}
const result = await execResponse.json()
if (!result.success) {
throw new Error(result.error || 'MCP tool execution failed')
}
return {
success: true,
output: result.data.output || {},
metadata: {
source: 'mcp',
serverId,
serverName: serverName || serverId,
toolName,
},
}
},
}
}
/**
* Fallback for legacy tools without cached schemas. Groups by server to minimize connections.
*/
private async processMcpToolsWithDiscovery(
ctx: ExecutionContext,
mcpTools: ToolInput[]
): Promise<any[]> {
const toolsByServer = new Map<string, ToolInput[]>()
for (const tool of mcpTools) {
const serverId = tool.params?.serverId
if (!toolsByServer.has(serverId)) {
toolsByServer.set(serverId, [])
}
toolsByServer.get(serverId)!.push(tool)
}
const serverDiscoveryResults = await Promise.all(
Array.from(toolsByServer.entries()).map(async ([serverId, tools]) => {
try {
const discoveredTools = await this.discoverMcpToolsForServer(ctx, serverId)
return { serverId, tools, discoveredTools, error: null as Error | null }
} catch (error) {
logger.error(`Failed to discover tools from server ${serverId}:`, error)
return { serverId, tools, discoveredTools: [] as any[], error: error as Error }
}
})
)
const response = await fetch(url.toString(), {
method: 'GET',
headers,
})
if (!response.ok) {
throw new Error(`Failed to discover tools from server ${serverId}`)
const results: any[] = []
for (const { serverId, tools, discoveredTools, error } of serverDiscoveryResults) {
if (error) continue
for (const tool of tools) {
try {
const toolName = tool.params?.toolName
const mcpTool = discoveredTools.find((t: any) => t.name === toolName)
if (!mcpTool) {
logger.error(`MCP tool ${toolName} not found on server ${serverId}`)
continue
}
const created = await this.createMcpToolFromDiscoveredData(ctx, tool, mcpTool, serverId)
if (created) results.push(created)
} catch (error) {
logger.error(`Error creating MCP tool:`, { tool, error })
}
}
}
const data = await response.json()
if (!data.success) {
throw new Error(data.error || 'Failed to discover MCP tools')
}
return results
}
const mcpTool = data.data.tools.find((t: any) => t.name === toolName)
if (!mcpTool) {
throw new Error(`MCP tool ${toolName} not found on server ${serverId}`)
}
/**
* Discover tools from a single MCP server with retry logic.
*/
private async discoverMcpToolsForServer(ctx: ExecutionContext, serverId: string): Promise<any[]> {
if (!ctx.workspaceId) {
throw new Error('workspaceId is required for MCP tool discovery')
}
if (!ctx.workflowId) {
throw new Error('workflowId is required for internal JWT authentication')
}
const toolId = createMcpToolId(serverId, toolName)
const headers = await buildAuthHeaders()
const url = buildAPIUrl('/api/mcp/tools/discover', {
serverId,
workspaceId: ctx.workspaceId,
workflowId: ctx.workflowId,
})
const { filterSchemaForLLM } = await import('@/tools/params')
const filteredSchema = filterSchemaForLLM(
mcpTool.inputSchema || { type: 'object', properties: {} },
userProvidedParams
)
const maxAttempts = 2
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try {
const response = await fetch(url.toString(), { method: 'GET', headers })
return {
id: toolId,
name: toolName,
description: mcpTool.description || `MCP tool ${toolName} from ${mcpTool.serverName}`,
parameters: filteredSchema,
params: userProvidedParams,
usageControl: tool.usageControl || 'auto',
executeFunction: async (callParams: Record<string, any>) => {
const headers = await buildAuthHeaders()
const execUrl = buildAPIUrl('/api/mcp/tools/execute')
const execResponse = await fetch(execUrl.toString(), {
method: 'POST',
headers,
body: stringifyJSON({
serverId,
toolName,
arguments: callParams,
workspaceId: ctx.workspaceId,
workflowId: ctx.workflowId,
}),
})
if (!execResponse.ok) {
throw new Error(
`MCP tool execution failed: ${execResponse.status} ${execResponse.statusText}`
if (!response.ok) {
const errorText = await response.text()
if (this.isRetryableError(errorText) && attempt < maxAttempts - 1) {
logger.warn(
`[AgentHandler] Session error discovering tools from ${serverId}, retrying (attempt ${attempt + 1})`
)
await new Promise((r) => setTimeout(r, 100))
continue
}
throw new Error(`Failed to discover tools: ${response.status} ${errorText}`)
}
const result = await execResponse.json()
if (!result.success) {
throw new Error(result.error || 'MCP tool execution failed')
}
const data = await response.json()
if (!data.success) {
throw new Error(data.error || 'Failed to discover MCP tools')
}
return {
success: true,
output: result.data.output || {},
metadata: {
source: 'mcp',
serverId,
serverName: mcpTool.serverName,
toolName,
},
}
},
return data.data.tools
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error)
if (this.isRetryableError(errorMsg) && attempt < maxAttempts - 1) {
logger.warn(
`[AgentHandler] Retryable error discovering tools from ${serverId} (attempt ${attempt + 1}):`,
error
)
await new Promise((r) => setTimeout(r, 100))
continue
}
throw error
}
} catch (error) {
logger.error(`Failed to create MCP tool ${toolName} from server ${serverId}:`, error)
return null
}
throw new Error(
`Failed to discover tools from server ${serverId} after ${maxAttempts} attempts`
)
}
private isRetryableError(errorMsg: string): boolean {
const lowerMsg = errorMsg.toLowerCase()
return lowerMsg.includes('session') || lowerMsg.includes('400') || lowerMsg.includes('404')
}
private async createMcpToolFromDiscoveredData(
ctx: ExecutionContext,
tool: ToolInput,
mcpTool: any,
serverId: string
): Promise<any> {
const { toolName, ...userProvidedParams } = tool.params || {}
const { filterSchemaForLLM } = await import('@/tools/params')
const filteredSchema = filterSchemaForLLM(
mcpTool.inputSchema || { type: 'object', properties: {} },
userProvidedParams
)
const toolId = createMcpToolId(serverId, toolName)
return {
id: toolId,
name: toolName,
description: mcpTool.description || `MCP tool ${toolName} from ${mcpTool.serverName}`,
parameters: filteredSchema,
params: userProvidedParams,
usageControl: tool.usageControl || 'auto',
executeFunction: async (callParams: Record<string, any>) => {
const headers = await buildAuthHeaders()
const execUrl = buildAPIUrl('/api/mcp/tools/execute')
const execResponse = await fetch(execUrl.toString(), {
method: 'POST',
headers,
body: stringifyJSON({
serverId,
toolName,
arguments: callParams,
workspaceId: ctx.workspaceId,
workflowId: ctx.workflowId,
toolSchema: mcpTool.inputSchema,
}),
})
if (!execResponse.ok) {
throw new Error(
`MCP tool execution failed: ${execResponse.status} ${execResponse.statusText}`
)
}
const result = await execResponse.json()
if (!result.success) {
throw new Error(result.error || 'MCP tool execution failed')
}
return {
success: true,
output: result.data.output || {},
metadata: {
source: 'mcp',
serverId,
serverName: mcpTool.serverName,
toolName,
},
}
},
}
}

View File

@@ -322,7 +322,8 @@ class McpService {
}
/**
* Execute a tool on a specific server
* Execute a tool on a specific server with retry logic for session errors.
* Retries once on session-related errors (400, 404, session ID issues).
*/
async executeTool(
userId: string,
@@ -331,27 +332,57 @@ class McpService {
workspaceId: string
): Promise<McpToolResult> {
const requestId = generateRequestId()
const maxRetries = 2
logger.info(
`[${requestId}] Executing MCP tool ${toolCall.name} on server ${serverId} for user ${userId}`
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
logger.info(
`[${requestId}] Executing MCP tool ${toolCall.name} on server ${serverId} for user ${userId}${attempt > 0 ? ` (attempt ${attempt + 1})` : ''}`
)
const config = await this.getServerConfig(serverId, workspaceId)
if (!config) {
throw new Error(`Server ${serverId} not found or not accessible`)
}
const resolvedConfig = await this.resolveConfigEnvVars(config, userId, workspaceId)
const client = await this.createClient(resolvedConfig)
try {
const result = await client.callTool(toolCall)
logger.info(`[${requestId}] Successfully executed tool ${toolCall.name}`)
return result
} finally {
await client.disconnect()
}
} catch (error) {
if (this.isSessionError(error) && attempt < maxRetries - 1) {
logger.warn(
`[${requestId}] Session error executing tool ${toolCall.name}, retrying (attempt ${attempt + 1}):`,
error
)
await new Promise((resolve) => setTimeout(resolve, 100))
continue
}
throw error
}
}
throw new Error(`Failed to execute tool ${toolCall.name} after ${maxRetries} attempts`)
}
/**
* Check if an error indicates a session-related issue that might be resolved by retry
*/
private isSessionError(error: unknown): boolean {
const message = error instanceof Error ? error.message : String(error)
const lowerMessage = message.toLowerCase()
return (
lowerMessage.includes('session') ||
lowerMessage.includes('400') ||
lowerMessage.includes('404') ||
lowerMessage.includes('no valid session')
)
const config = await this.getServerConfig(serverId, workspaceId)
if (!config) {
throw new Error(`Server ${serverId} not found or not accessible`)
}
const resolvedConfig = await this.resolveConfigEnvVars(config, userId, workspaceId)
const client = await this.createClient(resolvedConfig)
try {
const result = await client.callTool(toolCall)
logger.info(`[${requestId}] Successfully executed tool ${toolCall.name}`)
return result
} finally {
await client.disconnect()
}
}
/**
@@ -433,7 +464,8 @@ class McpService {
}
/**
* Discover tools from a specific server
* Discover tools from a specific server with retry logic for session errors.
* Retries once on session-related errors (400, 404, session ID issues).
*/
async discoverServerTools(
userId: string,
@@ -441,25 +473,43 @@ class McpService {
workspaceId: string
): Promise<McpTool[]> {
const requestId = generateRequestId()
const maxRetries = 2
logger.info(`[${requestId}] Discovering tools from server ${serverId} for user ${userId}`)
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
logger.info(
`[${requestId}] Discovering tools from server ${serverId} for user ${userId}${attempt > 0 ? ` (attempt ${attempt + 1})` : ''}`
)
const config = await this.getServerConfig(serverId, workspaceId)
if (!config) {
throw new Error(`Server ${serverId} not found or not accessible`)
const config = await this.getServerConfig(serverId, workspaceId)
if (!config) {
throw new Error(`Server ${serverId} not found or not accessible`)
}
const resolvedConfig = await this.resolveConfigEnvVars(config, userId, workspaceId)
const client = await this.createClient(resolvedConfig)
try {
const tools = await client.listTools()
logger.info(`[${requestId}] Discovered ${tools.length} tools from server ${config.name}`)
return tools
} finally {
await client.disconnect()
}
} catch (error) {
if (this.isSessionError(error) && attempt < maxRetries - 1) {
logger.warn(
`[${requestId}] Session error discovering tools from server ${serverId}, retrying (attempt ${attempt + 1}):`,
error
)
await new Promise((resolve) => setTimeout(resolve, 100))
continue
}
throw error
}
}
const resolvedConfig = await this.resolveConfigEnvVars(config, userId, workspaceId)
const client = await this.createClient(resolvedConfig)
try {
const tools = await client.listTools()
logger.info(`[${requestId}] Discovered ${tools.length} tools from server ${config.name}`)
return tools
} finally {
await client.disconnect()
}
throw new Error(`Failed to discover tools from server ${serverId} after ${maxRetries} attempts`)
}
/**