Commit: Add mcp (mirror of https://github.com/simstudioai/sim.git)
apps/sim/app/api/mcp/copilot/route.ts (new file, 517 lines)
@@ -0,0 +1,517 @@
import {
  type CallToolResult,
  ErrorCode,
  type InitializeResult,
  isJSONRPCNotification,
  isJSONRPCRequest,
  type JSONRPCError,
  type JSONRPCMessage,
  type JSONRPCResponse,
  type ListToolsResult,
  type RequestId,
} from '@modelcontextprotocol/sdk/types.js'
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getCopilotModel } from '@/lib/copilot/config'
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
import { executeToolServerSide, prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'

const logger = createLogger('CopilotMcpAPI')

export const dynamic = 'force-dynamic'

/**
 * Direct tools that execute immediately without LLM orchestration.
 * These are fast database queries that don't need AI reasoning.
 */
const DIRECT_TOOL_DEFS: Array<{
  name: string
  description: string
  inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
  toolId: string
}> = [
  {
    name: 'list_workflows',
    toolId: 'list_user_workflows',
    description: 'List all workflows the user has access to. Returns workflow IDs, names, and workspace info.',
    inputSchema: {
      type: 'object',
      properties: {
        workspaceId: {
          type: 'string',
          description: 'Optional workspace ID to filter workflows.',
        },
        folderId: {
          type: 'string',
          description: 'Optional folder ID to filter workflows.',
        },
      },
    },
  },
  {
    name: 'list_workspaces',
    toolId: 'list_user_workspaces',
    description: 'List all workspaces the user has access to. Returns workspace IDs, names, and roles.',
    inputSchema: {
      type: 'object',
      properties: {},
    },
  },
  {
    name: 'list_folders',
    toolId: 'list_folders',
    description: 'List all folders in a workspace.',
    inputSchema: {
      type: 'object',
      properties: {
        workspaceId: {
          type: 'string',
          description: 'Workspace ID to list folders from.',
        },
      },
      required: ['workspaceId'],
    },
  },
  {
    name: 'get_workflow',
    toolId: 'get_workflow_from_name',
    description: 'Get a workflow by name or ID. Returns the full workflow definition.',
    inputSchema: {
      type: 'object',
      properties: {
        name: {
          type: 'string',
          description: 'Workflow name to search for.',
        },
        workflowId: {
          type: 'string',
          description: 'Workflow ID to retrieve directly.',
        },
      },
    },
  },
]

const SUBAGENT_TOOL_DEFS: Array<{
  name: string
  description: string
  inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
  agentId: string
}> = [
  {
    name: 'copilot_discovery',
    agentId: 'discovery',
    description: `Find workflows by their contents or functionality when the user doesn't know the exact name or ID.

USE THIS WHEN:
- User describes a workflow by what it does: "the one that sends emails", "my Slack notification workflow"
- User refers to workflow contents: "the workflow with the OpenAI block"
- User needs to search/match workflows by functionality or description

DO NOT USE (use direct tools instead):
- User knows the workflow name → use get_workflow
- User wants to list all workflows → use list_workflows
- User wants to list workspaces → use list_workspaces
- User wants to list folders → use list_folders`,
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        workspaceId: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_plan',
    agentId: 'plan',
    description: 'Plan workflow changes by gathering required information.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        workflowId: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_edit',
    agentId: 'edit',
    description: 'Execute a workflow plan and apply edits.',
    inputSchema: {
      type: 'object',
      properties: {
        message: { type: 'string' },
        workflowId: { type: 'string' },
        plan: { type: 'object' },
        context: { type: 'object' },
      },
      required: ['workflowId'],
    },
  },
  {
    name: 'copilot_debug',
    agentId: 'debug',
    description: 'Diagnose errors or unexpected workflow behavior.',
    inputSchema: {
      type: 'object',
      properties: {
        error: { type: 'string' },
        workflowId: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['error'],
    },
  },
  {
    name: 'copilot_deploy',
    agentId: 'deploy',
    description: 'Deploy or manage workflow deployments.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        workflowId: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_auth',
    agentId: 'auth',
    description: 'Handle OAuth connection flows.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_knowledge',
    agentId: 'knowledge',
    description: 'Create and manage knowledge bases.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_custom_tool',
    agentId: 'custom_tool',
    description: 'Create or manage custom tools.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_info',
    agentId: 'info',
    description: 'Inspect blocks, outputs, and workflow metadata.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        workflowId: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_workflow',
    agentId: 'workflow',
    description: 'Manage workflow environment and configuration.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        workflowId: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_research',
    agentId: 'research',
    description: 'Research external APIs and documentation.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_tour',
    agentId: 'tour',
    description: 'Explain platform features and usage.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_test',
    agentId: 'test',
    description: 'Run workflows and verify outputs.',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        workflowId: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
  {
    name: 'copilot_superagent',
    agentId: 'superagent',
    description: 'Execute direct external actions (email, Slack, etc.).',
    inputSchema: {
      type: 'object',
      properties: {
        request: { type: 'string' },
        context: { type: 'object' },
      },
      required: ['request'],
    },
  },
]

function createResponse(id: RequestId, result: unknown): JSONRPCResponse {
  return {
    jsonrpc: '2.0',
    id,
    result: result as JSONRPCResponse['result'],
  }
}

function createError(id: RequestId, code: ErrorCode | number, message: string): JSONRPCError {
  return {
    jsonrpc: '2.0',
    id,
    error: { code, message },
  }
}

export async function GET() {
  return NextResponse.json({
    name: 'copilot-subagents',
    version: '1.0.0',
    protocolVersion: '2024-11-05',
    capabilities: { tools: {} },
  })
}

export async function POST(request: NextRequest) {
  try {
    const auth = await checkHybridAuth(request, { requireWorkflowId: false })
    if (!auth.success || !auth.userId) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const body = (await request.json()) as JSONRPCMessage

    if (isJSONRPCNotification(body)) {
      return new NextResponse(null, { status: 202 })
    }

    if (!isJSONRPCRequest(body)) {
      return NextResponse.json(
        createError(0, ErrorCode.InvalidRequest, 'Invalid JSON-RPC message'),
        { status: 400 }
      )
    }

    const { id, method, params } = body

    switch (method) {
      case 'initialize': {
        const result: InitializeResult = {
          protocolVersion: '2024-11-05',
          capabilities: { tools: {} },
          serverInfo: { name: 'copilot-subagents', version: '1.0.0' },
        }
        return NextResponse.json(createResponse(id, result))
      }
      case 'ping':
        return NextResponse.json(createResponse(id, {}))
      case 'tools/list':
        return handleToolsList(id)
      case 'tools/call':
        return handleToolsCall(
          id,
          params as { name: string; arguments?: Record<string, unknown> },
          auth.userId
        )
      default:
        return NextResponse.json(
          createError(id, ErrorCode.MethodNotFound, `Method not found: ${method}`),
          { status: 404 }
        )
    }
  } catch (error) {
    logger.error('Error handling MCP request', { error })
    return NextResponse.json(createError(0, ErrorCode.InternalError, 'Internal error'), {
      status: 500,
    })
  }
}

async function handleToolsList(id: RequestId): Promise<NextResponse> {
  const directTools = DIRECT_TOOL_DEFS.map((tool) => ({
    name: tool.name,
    description: tool.description,
    inputSchema: tool.inputSchema,
  }))

  const subagentTools = SUBAGENT_TOOL_DEFS.map((tool) => ({
    name: tool.name,
    description: tool.description,
    inputSchema: tool.inputSchema,
  }))

  const result: ListToolsResult = {
    tools: [...directTools, ...subagentTools],
  }

  return NextResponse.json(createResponse(id, result))
}

async function handleToolsCall(
  id: RequestId,
  params: { name: string; arguments?: Record<string, unknown> },
  userId: string
): Promise<NextResponse> {
  const args = params.arguments || {}

  // Check if this is a direct tool (fast, no LLM)
  const directTool = DIRECT_TOOL_DEFS.find((tool) => tool.name === params.name)
  if (directTool) {
    return handleDirectToolCall(id, directTool, args, userId)
  }

  // Check if this is a subagent tool (slower, uses LLM)
  const subagentTool = SUBAGENT_TOOL_DEFS.find((tool) => tool.name === params.name)
  if (subagentTool) {
    return handleSubagentToolCall(id, subagentTool, args, userId)
  }

  return NextResponse.json(
    createError(id, ErrorCode.MethodNotFound, `Tool not found: ${params.name}`),
    { status: 404 }
  )
}

async function handleDirectToolCall(
  id: RequestId,
  toolDef: (typeof DIRECT_TOOL_DEFS)[number],
  args: Record<string, unknown>,
  userId: string
): Promise<NextResponse> {
  try {
    const execContext = await prepareExecutionContext(userId, (args.workflowId as string) || '')

    const toolCall = {
      id: crypto.randomUUID(),
      name: toolDef.toolId,
      status: 'pending' as const,
      params: args as Record<string, any>,
      startTime: Date.now(),
    }

    const result = await executeToolServerSide(toolCall, execContext)

    const response: CallToolResult = {
      content: [
        {
          type: 'text',
          text: JSON.stringify(result.output ?? result, null, 2),
        },
      ],
      isError: !result.success,
    }

    return NextResponse.json(createResponse(id, response))
  } catch (error) {
    logger.error('Direct tool execution failed', { tool: toolDef.name, error })
    return NextResponse.json(
      createError(id, ErrorCode.InternalError, `Tool execution failed: ${error}`),
      { status: 500 }
    )
  }
}

async function handleSubagentToolCall(
  id: RequestId,
  toolDef: (typeof SUBAGENT_TOOL_DEFS)[number],
  args: Record<string, unknown>,
  userId: string
): Promise<NextResponse> {
  const requestText =
    (args.request as string) ||
    (args.message as string) ||
    (args.error as string) ||
    JSON.stringify(args)

  const context = (args.context as Record<string, unknown>) || {}
  if (args.plan && !context.plan) {
    context.plan = args.plan
  }

  const { model } = getCopilotModel('chat')

  const result = await orchestrateSubagentStream(
    toolDef.agentId,
    {
      message: requestText,
      workflowId: args.workflowId,
      workspaceId: args.workspaceId,
      context,
      model,
    },
    {
      userId,
      workflowId: args.workflowId as string | undefined,
      workspaceId: args.workspaceId as string | undefined,
    }
  )

  const response: CallToolResult = {
    content: [
      {
        type: 'text',
        text: JSON.stringify(result, null, 2),
      },
    ],
    isError: !result.success,
  }

  return NextResponse.json(createResponse(id, response))
}

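For context, a minimal client sketch (illustrative, not part of this commit) of how an MCP client could exercise the route above. It assumes the Next.js App Router serves this file at /api/mcp/copilot and that checkHybridAuth accepts an API key via an 'x-api-key' header; both the path and the header name are assumptions, and the request text is a placeholder.

// Minimal client sketch (illustrative, not part of this commit).
// Assumes the route is served at /api/mcp/copilot and that checkHybridAuth
// accepts an API key via the 'x-api-key' header; both are assumptions here.
async function callCopilotMcp(baseUrl: string, apiKey: string) {
  const rpc = async (payload: unknown) => {
    const res = await fetch(`${baseUrl}/api/mcp/copilot`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'x-api-key': apiKey },
      body: JSON.stringify(payload),
    })
    return res.json()
  }

  // Handshake, then list the combined direct + subagent tools.
  await rpc({ jsonrpc: '2.0', id: 1, method: 'initialize', params: {} })
  const tools = await rpc({ jsonrpc: '2.0', id: 2, method: 'tools/list' })

  // Direct tool: resolved server-side via executeToolServerSide, no LLM involved.
  const workflows = await rpc({
    jsonrpc: '2.0',
    id: 3,
    method: 'tools/call',
    params: { name: 'list_workflows', arguments: {} },
  })

  // Subagent tool: routed through orchestrateSubagentStream (LLM-backed).
  const discovery = await rpc({
    jsonrpc: '2.0',
    id: 4,
    method: 'tools/call',
    params: {
      name: 'copilot_discovery',
      arguments: { request: 'find the workflow that posts a daily summary to Slack' },
    },
  })

  return { tools, workflows, discovery }
}
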
@@ -26,6 +26,7 @@ const RESPOND_TOOL_SET = new Set([
  'research_respond',
  'deploy_respond',
  'superagent_respond',
  'discovery_respond',
])

export type SSEHandler = (

apps/sim/lib/copilot/orchestrator/subagent.ts (new file, 239 lines)
@@ -0,0 +1,239 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import {
  sseHandlers,
  subAgentHandlers,
  handleSubagentRouting,
} from '@/lib/copilot/orchestrator/sse-handlers'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
  ExecutionContext,
  OrchestratorOptions,
  SSEEvent,
  StreamingContext,
  ToolCallSummary,
} from '@/lib/copilot/orchestrator/types'
import { env } from '@/lib/core/config/env'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'

const logger = createLogger('CopilotSubagentOrchestrator')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT

export interface SubagentOrchestratorOptions extends OrchestratorOptions {
  userId: string
  workflowId?: string
  workspaceId?: string
}

export interface SubagentOrchestratorResult {
  success: boolean
  content: string
  toolCalls: ToolCallSummary[]
  structuredResult?: {
    type?: string
    summary?: string
    data?: any
    success?: boolean
  }
  error?: string
  errors?: string[]
}

export async function orchestrateSubagentStream(
  agentId: string,
  requestPayload: Record<string, any>,
  options: SubagentOrchestratorOptions
): Promise<SubagentOrchestratorResult> {
  const { userId, workflowId, workspaceId, timeout = 300000, abortSignal } = options
  const execContext = await buildExecutionContext(userId, workflowId, workspaceId)

  const context: StreamingContext = {
    chatId: undefined,
    conversationId: undefined,
    messageId: requestPayload?.messageId || crypto.randomUUID(),
    accumulatedContent: '',
    contentBlocks: [],
    toolCalls: new Map(),
    currentThinkingBlock: null,
    isInThinkingBlock: false,
    subAgentParentToolCallId: undefined,
    subAgentContent: {},
    subAgentToolCalls: {},
    pendingContent: '',
    streamComplete: false,
    wasAborted: false,
    errors: [],
  }

  let structuredResult: SubagentOrchestratorResult['structuredResult']

  try {
    const response = await fetch(`${SIM_AGENT_API_URL}/api/subagent/${agentId}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
      },
      body: JSON.stringify({ ...requestPayload, stream: true, userId }),
      signal: abortSignal,
    })

    if (!response.ok) {
      const errorText = await response.text().catch(() => '')
      throw new Error(
        `Copilot backend error (${response.status}): ${errorText || response.statusText}`
      )
    }

    if (!response.body) {
      throw new Error('Copilot backend response missing body')
    }

    const reader = response.body.getReader()
    const decoder = new TextDecoder()

    const timeoutId = setTimeout(() => {
      context.errors.push('Request timed out')
      context.streamComplete = true
      reader.cancel().catch(() => {})
    }, timeout)

    try {
      for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
        if (abortSignal?.aborted) {
          context.wasAborted = true
          break
        }

        await forwardEvent(event, options)

        if (event.type === 'structured_result' || event.type === 'subagent_result') {
          structuredResult = normalizeStructuredResult(event.data)
          context.streamComplete = true
          continue
        }

        // Handle subagent_start/subagent_end events to track nested subagent calls
        if (event.type === 'subagent_start') {
          const toolCallId = event.data?.tool_call_id
          if (toolCallId) {
            context.subAgentParentToolCallId = toolCallId
            context.subAgentContent[toolCallId] = ''
            context.subAgentToolCalls[toolCallId] = []
          }
          continue
        }

        if (event.type === 'subagent_end') {
          context.subAgentParentToolCallId = undefined
          continue
        }

        // For direct subagent calls, events may have the subagent field set (e.g., subagent: "discovery")
        // but no subagent_start event because this IS the top-level agent. Skip subagent routing
        // for events where the subagent field matches the current agentId - these are top-level events.
        const isTopLevelSubagentEvent = event.subagent === agentId && !context.subAgentParentToolCallId

        // Only route to subagent handlers for nested subagent events (not matching current agentId)
        if (!isTopLevelSubagentEvent && handleSubagentRouting(event, context)) {
          const handler = subAgentHandlers[event.type]
          if (handler) {
            await handler(event, context, execContext, options)
          }
          if (context.streamComplete) break
          continue
        }

        // Process as a regular SSE event (including top-level subagent events)
        const handler = sseHandlers[event.type]
        if (handler) {
          await handler(event, context, execContext, options)
        }
        if (context.streamComplete) break
      }
    } finally {
      clearTimeout(timeoutId)
    }

    const result = buildResult(context, structuredResult)
    await options.onComplete?.(result)
    return result
  } catch (error) {
    const err = error instanceof Error ? error : new Error('Subagent orchestration failed')
    logger.error('Subagent orchestration failed', { error: err.message, agentId })
    await options.onError?.(err)
    return {
      success: false,
      content: context.accumulatedContent,
      toolCalls: [],
      error: err.message,
    }
  }
}

async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
  try {
    await options.onEvent?.(event)
  } catch (error) {
    logger.warn('Failed to forward SSE event', {
      type: event.type,
      error: error instanceof Error ? error.message : String(error),
    })
  }
}

function normalizeStructuredResult(data: any): SubagentOrchestratorResult['structuredResult'] {
  if (!data || typeof data !== 'object') {
    return undefined
  }
  return {
    type: data.result_type || data.type,
    summary: data.summary,
    data: data.data ?? data,
    success: data.success,
  }
}

async function buildExecutionContext(
  userId: string,
  workflowId?: string,
  workspaceId?: string
): Promise<ExecutionContext> {
  if (workflowId) {
    return prepareExecutionContext(userId, workflowId)
  }

  const decryptedEnvVars = await getEffectiveDecryptedEnv(userId, workspaceId)
  return {
    userId,
    workflowId: workflowId || '',
    workspaceId,
    decryptedEnvVars,
  }
}

function buildResult(
  context: StreamingContext,
  structuredResult?: SubagentOrchestratorResult['structuredResult']
): SubagentOrchestratorResult {
  const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
    id: toolCall.id,
    name: toolCall.name,
    status: toolCall.status,
    params: toolCall.params,
    result: toolCall.result?.output,
    error: toolCall.error,
    durationMs:
      toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
  }))

  return {
    success: context.errors.length === 0 && !context.wasAborted,
    content: context.accumulatedContent,
    toolCalls,
    structuredResult,
    errors: context.errors.length ? context.errors : undefined,
  }
}

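As a usage illustration (not part of this commit), here is a sketch of driving a subagent server-side through orchestrateSubagentStream and tapping the stream via the optional callbacks the orchestrator invokes (onEvent, onComplete, onError). The exact callback signatures live in '@/lib/copilot/orchestrator/types' and are assumed compatible here; the message text is a placeholder.

// Usage sketch, assuming the OrchestratorOptions callback signatures match
// what is shown here (onEvent receives an SSEEvent, onError an Error).
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'

async function runDiscovery(userId: string, workspaceId: string) {
  const controller = new AbortController()

  const result = await orchestrateSubagentStream(
    'discovery',
    { message: 'find workflows that send Slack messages', context: {} },
    {
      userId,
      workspaceId,
      timeout: 60_000, // override the 300000 ms default
      abortSignal: controller.signal,
      onEvent: async (event) => {
        // Raw SSE events (content chunks, tool calls, subagent markers) pass through here.
        console.log('[sse]', event.type)
      },
      onComplete: async () => {
        console.log('subagent stream complete')
      },
      onError: async (err) => {
        console.error('subagent failed:', err.message)
      },
    }
  )

  return result.structuredResult ?? result.content
}
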
@@ -5,11 +5,13 @@ import {
  customTools,
  permissions,
  workflow,
  workflowFolder,
  workflowMcpServer,
  workflowMcpTool,
  workspace,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, asc, desc, eq, inArray, isNull, or } from 'drizzle-orm'
import { and, asc, desc, eq, inArray, isNull, max, or } from 'drizzle-orm'
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'

@@ -21,9 +23,14 @@ import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace'
import { mcpService } from '@/lib/mcp/service'
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults'
import {
  deployWorkflow,
  loadWorkflowFromNormalizedTables,
  saveWorkflowToNormalizedTables,
  undeployWorkflow,
} from '@/lib/workflows/persistence/utils'
import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow'
import { deployWorkflow, undeployWorkflow } from '@/lib/workflows/persistence/utils'
import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator'
import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'

@@ -57,6 +64,10 @@ const SIM_WORKFLOW_TOOLS = new Set<string>([
  'get_user_workflow',
  'get_workflow_from_name',
  'list_user_workflows',
  'list_user_workspaces',
  'list_folders',
  'create_workflow',
  'create_folder',
  'get_workflow_data',
  'get_block_outputs',
  'get_block_upstream_references',

@@ -225,7 +236,15 @@ async function executeSimWorkflowTool(
    case 'get_workflow_from_name':
      return executeGetWorkflowFromName(params, context)
    case 'list_user_workflows':
      return executeListUserWorkflows(context)
      return executeListUserWorkflows(params, context)
    case 'list_user_workspaces':
      return executeListUserWorkspaces(context)
    case 'list_folders':
      return executeListFolders(params, context)
    case 'create_workflow':
      return executeCreateWorkflow(params, context)
    case 'create_folder':
      return executeCreateFolder(params, context)
    case 'get_workflow_data':
      return executeGetWorkflowData(params, context)
    case 'get_block_outputs':

@@ -292,6 +311,61 @@ async function ensureWorkflowAccess(workflowId: string, userId: string): Promise
  throw new Error('Unauthorized workflow access')
}

async function getDefaultWorkspaceId(userId: string): Promise<string> {
  const workspaces = await db
    .select({ workspaceId: workspace.id })
    .from(permissions)
    .innerJoin(workspace, eq(permissions.entityId, workspace.id))
    .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace')))
    .orderBy(desc(workspace.createdAt))
    .limit(1)

  const workspaceId = workspaces[0]?.workspaceId
  if (!workspaceId) {
    throw new Error('No workspace found for user')
  }

  return workspaceId
}

async function ensureWorkspaceAccess(
  workspaceId: string,
  userId: string,
  requireWrite: boolean
): Promise<void> {
  const [row] = await db
    .select({
      permissionType: permissions.permissionType,
      ownerId: workspace.ownerId,
    })
    .from(permissions)
    .innerJoin(workspace, eq(permissions.entityId, workspace.id))
    .where(
      and(
        eq(permissions.entityType, 'workspace'),
        eq(permissions.entityId, workspaceId),
        eq(permissions.userId, userId)
      )
    )
    .limit(1)

  if (!row) {
    throw new Error(`Workspace ${workspaceId} not found`)
  }

  const isOwner = row.ownerId === userId
  const permissionType = row.permissionType
  const canWrite = isOwner || permissionType === 'admin' || permissionType === 'write'

  if (requireWrite && !canWrite) {
    throw new Error('Write or admin access required for this workspace')
  }

  if (!requireWrite && !canWrite && permissionType !== 'read') {
    throw new Error('Access denied to workspace')
  }
}

async function executeGetUserWorkflow(
  params: Record<string, any>,
  context: ExecutionContext

@@ -398,8 +472,14 @@ async function executeGetWorkflowFromName(
  }
}

async function executeListUserWorkflows(context: ExecutionContext): Promise<ToolCallResult> {
async function executeListUserWorkflows(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workspaceId = params?.workspaceId as string | undefined
    const folderId = params?.folderId as string | undefined

    const workspaceIds = await db
      .select({ entityId: permissions.entityId })
      .from(permissions)

@@ -411,6 +491,12 @@ async function executeListUserWorkflows(context: ExecutionContext): Promise<Tool
    if (workspaceIdList.length > 0) {
      workflowConditions.push(inArray(workflow.workspaceId, workspaceIdList))
    }
    if (workspaceId) {
      workflowConditions.push(eq(workflow.workspaceId, workspaceId))
    }
    if (folderId) {
      workflowConditions.push(eq(workflow.folderId, folderId))
    }
    const workflows = await db
      .select()
      .from(workflow)

@@ -426,6 +512,7 @@ async function executeListUserWorkflows(context: ExecutionContext): Promise<Tool
      workflowId: w.id,
      workflowName: w.name || '',
      workspaceId: w.workspaceId,
      folderId: w.folderId,
    }))

    return { success: true, output: { workflow_names: names, workflows: workflowList } }

@@ -434,6 +521,182 @@ async function executeListUserWorkflows(context: ExecutionContext): Promise<Tool
  }
}

async function executeListUserWorkspaces(context: ExecutionContext): Promise<ToolCallResult> {
  try {
    const workspaces = await db
      .select({
        workspaceId: workspace.id,
        workspaceName: workspace.name,
        ownerId: workspace.ownerId,
        permissionType: permissions.permissionType,
      })
      .from(permissions)
      .innerJoin(workspace, eq(permissions.entityId, workspace.id))
      .where(and(eq(permissions.userId, context.userId), eq(permissions.entityType, 'workspace')))
      .orderBy(desc(workspace.createdAt))

    const output = workspaces.map((row) => ({
      workspaceId: row.workspaceId,
      workspaceName: row.workspaceName,
      role: row.ownerId === context.userId ? 'owner' : row.permissionType,
    }))

    return { success: true, output: { workspaces: output } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

async function executeListFolders(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workspaceId = (params?.workspaceId as string | undefined) ||
      (await getDefaultWorkspaceId(context.userId))

    await ensureWorkspaceAccess(workspaceId, context.userId, false)

    const folders = await db
      .select({
        folderId: workflowFolder.id,
        folderName: workflowFolder.name,
        parentId: workflowFolder.parentId,
        sortOrder: workflowFolder.sortOrder,
      })
      .from(workflowFolder)
      .where(eq(workflowFolder.workspaceId, workspaceId))
      .orderBy(asc(workflowFolder.sortOrder), asc(workflowFolder.createdAt))

    return {
      success: true,
      output: {
        workspaceId,
        folders,
      },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

async function executeCreateWorkflow(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const name = typeof params?.name === 'string' ? params.name.trim() : ''
    if (!name) {
      return { success: false, error: 'name is required' }
    }

    const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
    const folderId = params?.folderId || null
    const description = typeof params?.description === 'string' ? params.description : null

    await ensureWorkspaceAccess(workspaceId, context.userId, true)

    const workflowId = crypto.randomUUID()
    const now = new Date()

    const folderCondition = folderId ? eq(workflow.folderId, folderId) : isNull(workflow.folderId)
    const [maxResult] = await db
      .select({ maxOrder: max(workflow.sortOrder) })
      .from(workflow)
      .where(and(eq(workflow.workspaceId, workspaceId), folderCondition))
    const sortOrder = (maxResult?.maxOrder ?? 0) + 1

    await db.insert(workflow).values({
      id: workflowId,
      userId: context.userId,
      workspaceId,
      folderId,
      sortOrder,
      name,
      description,
      color: '#3972F6',
      lastSynced: now,
      createdAt: now,
      updatedAt: now,
      isDeployed: false,
      runCount: 0,
      variables: {},
    })

    const { workflowState } = buildDefaultWorkflowArtifacts()
    const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState)
    if (!saveResult.success) {
      throw new Error(saveResult.error || 'Failed to save workflow state')
    }

    return {
      success: true,
      output: {
        workflowId,
        workflowName: name,
        workspaceId,
        folderId,
      },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

async function executeCreateFolder(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const name = typeof params?.name === 'string' ? params.name.trim() : ''
    if (!name) {
      return { success: false, error: 'name is required' }
    }

    const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
    const parentId = params?.parentId || null

    await ensureWorkspaceAccess(workspaceId, context.userId, true)

    const [maxOrder] = await db
      .select({ maxOrder: max(workflowFolder.sortOrder) })
      .from(workflowFolder)
      .where(
        and(
          eq(workflowFolder.workspaceId, workspaceId),
          parentId ? eq(workflowFolder.parentId, parentId) : isNull(workflowFolder.parentId)
        )
      )
      .limit(1)

    const sortOrder = (maxOrder?.maxOrder ?? 0) + 1
    const folderId = crypto.randomUUID()

    await db.insert(workflowFolder).values({
      id: folderId,
      name,
      userId: context.userId,
      workspaceId,
      parentId,
      color: '#6B7280',
      sortOrder,
    })

    return {
      success: true,
      output: {
        folderId,
        folderName: name,
        workspaceId,
        parentId,
        sortOrder,
      },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

async function executeGetWorkflowData(
  params: Record<string, any>,
  context: ExecutionContext

@@ -11,6 +11,8 @@ export type SSEEventType =
  | 'tool_error'
  | 'subagent_start'
  | 'subagent_end'
  | 'structured_result'
  | 'subagent_result'
  | 'done'
  | 'error'
  | 'start'
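For reference, an illustrative 'structured_result' event of the shape that normalizeStructuredResult() in subagent.ts consumes; the field names follow that function (result_type, summary, data, success), and the concrete values are placeholders.

// Illustrative payload only: matches the fields normalizeStructuredResult() reads;
// the result_type string and nested data are made up for the example.
const exampleStructuredResultEvent = {
  type: 'structured_result' as const,
  subagent: 'discovery',
  data: {
    result_type: 'discovery_result',
    summary: 'Found 1 matching workflow',
    success: true,
    data: { workflowIds: ['<workflow-id>'] },
  },
}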