mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-13 07:55:09 -05:00
improvement(mcp): improved mcp sse events notifs, update jira to handle files, fix UI issues in settings modal, fix org and workspace invitations when bundled (#3182)
* improvement(mcp): improved mcp sse events notifs, update jira to handle files, fix UI issues in settings modal, fix org and workspace invitations when bundled * added back useMcpToolsEvents for event-driven discovery * ack PR comments * updated placeholder * updated colors, error throwing in mcp modal * ack comments * updated error msg
This commit is contained in:
@@ -4,6 +4,8 @@ import { chat, workflowMcpTool } from '@sim/db/schema'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { generateParameterSchemaForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
|
||||
import { deployWorkflow, undeployWorkflow } from '@/lib/workflows/persistence/utils'
|
||||
import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils'
|
||||
@@ -245,7 +247,10 @@ export async function executeDeployMcp(
|
||||
params.toolDescription ||
|
||||
workflowRecord.description ||
|
||||
`Execute ${workflowRecord.name} workflow`
|
||||
const parameterSchema = params.parameterSchema || {}
|
||||
const parameterSchema =
|
||||
params.parameterSchema && Object.keys(params.parameterSchema).length > 0
|
||||
? params.parameterSchema
|
||||
: await generateParameterSchemaForWorkflow(workflowId)
|
||||
|
||||
const baseUrl = getBaseUrl()
|
||||
const mcpServerUrl = `${baseUrl}/api/mcp/serve/${serverId}`
|
||||
@@ -261,6 +266,9 @@ export async function executeDeployMcp(
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(workflowMcpTool.id, toolId))
|
||||
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: { toolId, toolName, toolDescription, updated: true, mcpServerUrl, baseUrl },
|
||||
@@ -279,6 +287,8 @@ export async function executeDeployMcp(
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: { toolId, toolName, toolDescription, updated: false, mcpServerUrl, baseUrl },
|
||||
|
||||
@@ -3,6 +3,8 @@ import { db } from '@sim/db'
|
||||
import { chat, workflow, workflowMcpServer, workflowMcpTool } from '@sim/db/schema'
|
||||
import { eq, inArray } from 'drizzle-orm'
|
||||
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { generateParameterSchemaForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
|
||||
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
|
||||
import { ensureWorkflowAccess } from '../access'
|
||||
@@ -205,13 +207,14 @@ export async function executeCreateWorkspaceMcpServer(
|
||||
continue
|
||||
}
|
||||
const toolName = sanitizeToolName(wf.name || `workflow_${wf.id}`)
|
||||
const parameterSchema = await generateParameterSchemaForWorkflow(wf.id)
|
||||
await db.insert(workflowMcpTool).values({
|
||||
id: crypto.randomUUID(),
|
||||
serverId,
|
||||
workflowId: wf.id,
|
||||
toolName,
|
||||
toolDescription: wf.description || `Execute ${wf.name} workflow`,
|
||||
parameterSchema: {},
|
||||
parameterSchema,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
@@ -219,6 +222,10 @@ export async function executeCreateWorkspaceMcpServer(
|
||||
}
|
||||
}
|
||||
|
||||
if (addedTools.length > 0) {
|
||||
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
|
||||
}
|
||||
|
||||
return { success: true, output: { server, addedTools } }
|
||||
} catch (error) {
|
||||
return { success: false, error: error instanceof Error ? error.message : String(error) }
|
||||
|
||||
@@ -1,24 +1,44 @@
|
||||
import { db, workflowMcpTool } from '@sim/db'
|
||||
import { db, workflowMcpServer, workflowMcpTool } from '@sim/db'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import { eq, inArray } from 'drizzle-orm'
|
||||
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
|
||||
import { hasValidStartBlockInState } from '@/lib/workflows/triggers/trigger-utils'
|
||||
import type { WorkflowState } from '@/stores/workflows/workflow/types'
|
||||
import { mcpPubSub } from './pubsub'
|
||||
import { extractInputFormatFromBlocks, generateToolInputSchema } from './workflow-tool-schema'
|
||||
|
||||
const logger = createLogger('WorkflowMcpSync')
|
||||
|
||||
const EMPTY_SCHEMA: Record<string, unknown> = Object.freeze({ type: 'object', properties: {} })
|
||||
|
||||
/**
|
||||
* Generate MCP tool parameter schema from workflow blocks
|
||||
* Generate MCP tool parameter schema from workflow blocks.
|
||||
*/
|
||||
function generateSchemaFromBlocks(blocks: Record<string, unknown>): Record<string, unknown> {
|
||||
export function generateSchemaFromBlocks(blocks: Record<string, unknown>): Record<string, unknown> {
|
||||
const inputFormat = extractInputFormatFromBlocks(blocks)
|
||||
if (!inputFormat || inputFormat.length === 0) {
|
||||
return { type: 'object', properties: {} }
|
||||
return EMPTY_SCHEMA
|
||||
}
|
||||
return generateToolInputSchema(inputFormat) as unknown as Record<string, unknown>
|
||||
}
|
||||
|
||||
/**
|
||||
* Load a workflow's active deployed state and generate its MCP parameter schema.
|
||||
* Returns a proper JSON Schema derived from the start block's input format,
|
||||
* or a fallback empty schema if the workflow has no inputs or no active deployment.
|
||||
*/
|
||||
export async function generateParameterSchemaForWorkflow(
|
||||
workflowId: string
|
||||
): Promise<Record<string, unknown>> {
|
||||
try {
|
||||
const deployed = await loadDeployedWorkflowState(workflowId)
|
||||
if (!deployed?.blocks) return EMPTY_SCHEMA
|
||||
return generateSchemaFromBlocks(deployed.blocks as Record<string, unknown>)
|
||||
} catch {
|
||||
return EMPTY_SCHEMA
|
||||
}
|
||||
}
|
||||
|
||||
interface SyncOptions {
|
||||
workflowId: string
|
||||
requestId: string
|
||||
@@ -42,9 +62,8 @@ export async function syncMcpToolsForWorkflow(options: SyncOptions): Promise<voi
|
||||
const { workflowId, requestId, state, context = 'sync' } = options
|
||||
|
||||
try {
|
||||
// Get all MCP tools that use this workflow
|
||||
const tools = await db
|
||||
.select({ id: workflowMcpTool.id })
|
||||
.select({ id: workflowMcpTool.id, serverId: workflowMcpTool.serverId })
|
||||
.from(workflowMcpTool)
|
||||
.where(eq(workflowMcpTool.workflowId, workflowId))
|
||||
|
||||
@@ -53,25 +72,23 @@ export async function syncMcpToolsForWorkflow(options: SyncOptions): Promise<voi
|
||||
return
|
||||
}
|
||||
|
||||
// Get workflow state (from param or load from DB)
|
||||
let workflowState: { blocks?: Record<string, unknown> } | null = state ?? null
|
||||
if (!workflowState) {
|
||||
workflowState = await loadWorkflowFromNormalizedTables(workflowId)
|
||||
workflowState = await loadDeployedWorkflowState(workflowId)
|
||||
}
|
||||
|
||||
// Check if workflow has a valid start block
|
||||
if (!hasValidStartBlockInState(workflowState as WorkflowState | null)) {
|
||||
await db.delete(workflowMcpTool).where(eq(workflowMcpTool.workflowId, workflowId))
|
||||
logger.info(
|
||||
`[${requestId}] Removed ${tools.length} MCP tool(s) - workflow has no start block (${context}): ${workflowId}`
|
||||
)
|
||||
notifyAffectedServers(tools)
|
||||
return
|
||||
}
|
||||
|
||||
// Generate and update parameter schema
|
||||
const parameterSchema = workflowState?.blocks
|
||||
? generateSchemaFromBlocks(workflowState.blocks)
|
||||
: { type: 'object', properties: {} }
|
||||
: EMPTY_SCHEMA
|
||||
|
||||
await db
|
||||
.update(workflowMcpTool)
|
||||
@@ -84,24 +101,62 @@ export async function syncMcpToolsForWorkflow(options: SyncOptions): Promise<voi
|
||||
logger.info(
|
||||
`[${requestId}] Synced ${tools.length} MCP tool(s) for workflow (${context}): ${workflowId}`
|
||||
)
|
||||
|
||||
notifyAffectedServers(tools)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error syncing MCP tools (${context}):`, error)
|
||||
// Don't throw - this is a non-critical operation
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all MCP tools for a workflow (used when undeploying)
|
||||
* Remove all MCP tools for a workflow (used when undeploying).
|
||||
* Queries affected tools before deleting so we can notify their servers.
|
||||
*/
|
||||
export async function removeMcpToolsForWorkflow(
|
||||
workflowId: string,
|
||||
requestId: string
|
||||
): Promise<void> {
|
||||
try {
|
||||
const tools = await db
|
||||
.select({ id: workflowMcpTool.id, serverId: workflowMcpTool.serverId })
|
||||
.from(workflowMcpTool)
|
||||
.where(eq(workflowMcpTool.workflowId, workflowId))
|
||||
|
||||
if (tools.length === 0) return
|
||||
|
||||
await db.delete(workflowMcpTool).where(eq(workflowMcpTool.workflowId, workflowId))
|
||||
logger.info(`[${requestId}] Removed MCP tools for workflow: ${workflowId}`)
|
||||
|
||||
notifyAffectedServers(tools)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error removing MCP tools:`, error)
|
||||
// Don't throw - this is a non-critical operation
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish pubsub events for each unique server affected by a tool change.
|
||||
* Resolves workspace IDs from the server table so callers don't need to pass them.
|
||||
*/
|
||||
function notifyAffectedServers(tools: Array<{ serverId: string }>): void {
|
||||
if (!mcpPubSub) return
|
||||
|
||||
const uniqueServerIds = [...new Set(tools.map((t) => t.serverId))]
|
||||
|
||||
void (async () => {
|
||||
try {
|
||||
const servers = await db
|
||||
.select({ id: workflowMcpServer.id, workspaceId: workflowMcpServer.workspaceId })
|
||||
.from(workflowMcpServer)
|
||||
.where(inArray(workflowMcpServer.id, uniqueServerIds))
|
||||
|
||||
for (const server of servers) {
|
||||
mcpPubSub.publishWorkflowToolsChanged({
|
||||
serverId: server.id,
|
||||
workspaceId: server.workspaceId,
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error notifying affected servers:', error)
|
||||
}
|
||||
})()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user