From 3d571259934c114d83f9b08101f1ad5e61941ce2 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Sat, 31 Jan 2026 11:38:26 -0800 Subject: [PATCH] Add mcp --- apps/sim/app/api/mcp/copilot/route.ts | 517 ++++++++++++++++++ .../lib/copilot/orchestrator/sse-handlers.ts | 1 + apps/sim/lib/copilot/orchestrator/subagent.ts | 239 ++++++++ .../lib/copilot/orchestrator/tool-executor.ts | 273 ++++++++- apps/sim/lib/copilot/orchestrator/types.ts | 2 + 5 files changed, 1027 insertions(+), 5 deletions(-) create mode 100644 apps/sim/app/api/mcp/copilot/route.ts create mode 100644 apps/sim/lib/copilot/orchestrator/subagent.ts diff --git a/apps/sim/app/api/mcp/copilot/route.ts b/apps/sim/app/api/mcp/copilot/route.ts new file mode 100644 index 000000000..6e4d6a9a7 --- /dev/null +++ b/apps/sim/app/api/mcp/copilot/route.ts @@ -0,0 +1,517 @@ +import { + type CallToolResult, + ErrorCode, + type InitializeResult, + isJSONRPCNotification, + isJSONRPCRequest, + type JSONRPCError, + type JSONRPCMessage, + type JSONRPCResponse, + type ListToolsResult, + type RequestId, +} from '@modelcontextprotocol/sdk/types.js' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getCopilotModel } from '@/lib/copilot/config' +import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent' +import { executeToolServerSide, prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor' + +const logger = createLogger('CopilotMcpAPI') + +export const dynamic = 'force-dynamic' + +/** + * Direct tools that execute immediately without LLM orchestration. + * These are fast database queries that don't need AI reasoning. + */ +const DIRECT_TOOL_DEFS: Array<{ + name: string + description: string + inputSchema: { type: 'object'; properties?: Record; required?: string[] } + toolId: string +}> = [ + { + name: 'list_workflows', + toolId: 'list_user_workflows', + description: 'List all workflows the user has access to. Returns workflow IDs, names, and workspace info.', + inputSchema: { + type: 'object', + properties: { + workspaceId: { + type: 'string', + description: 'Optional workspace ID to filter workflows.', + }, + folderId: { + type: 'string', + description: 'Optional folder ID to filter workflows.', + }, + }, + }, + }, + { + name: 'list_workspaces', + toolId: 'list_user_workspaces', + description: 'List all workspaces the user has access to. Returns workspace IDs, names, and roles.', + inputSchema: { + type: 'object', + properties: {}, + }, + }, + { + name: 'list_folders', + toolId: 'list_folders', + description: 'List all folders in a workspace.', + inputSchema: { + type: 'object', + properties: { + workspaceId: { + type: 'string', + description: 'Workspace ID to list folders from.', + }, + }, + required: ['workspaceId'], + }, + }, + { + name: 'get_workflow', + toolId: 'get_workflow_from_name', + description: 'Get a workflow by name or ID. Returns the full workflow definition.', + inputSchema: { + type: 'object', + properties: { + name: { + type: 'string', + description: 'Workflow name to search for.', + }, + workflowId: { + type: 'string', + description: 'Workflow ID to retrieve directly.', + }, + }, + }, + }, +] + +const SUBAGENT_TOOL_DEFS: Array<{ + name: string + description: string + inputSchema: { type: 'object'; properties?: Record; required?: string[] } + agentId: string +}> = [ + { + name: 'copilot_discovery', + agentId: 'discovery', + description: `Find workflows by their contents or functionality when the user doesn't know the exact name or ID. + +USE THIS WHEN: +- User describes a workflow by what it does: "the one that sends emails", "my Slack notification workflow" +- User refers to workflow contents: "the workflow with the OpenAI block" +- User needs to search/match workflows by functionality or description + +DO NOT USE (use direct tools instead): +- User knows the workflow name → use get_workflow +- User wants to list all workflows → use list_workflows +- User wants to list workspaces → use list_workspaces +- User wants to list folders → use list_folders`, + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + workspaceId: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_plan', + agentId: 'plan', + description: 'Plan workflow changes by gathering required information.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + workflowId: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_edit', + agentId: 'edit', + description: 'Execute a workflow plan and apply edits.', + inputSchema: { + type: 'object', + properties: { + message: { type: 'string' }, + workflowId: { type: 'string' }, + plan: { type: 'object' }, + context: { type: 'object' }, + }, + required: ['workflowId'], + }, + }, + { + name: 'copilot_debug', + agentId: 'debug', + description: 'Diagnose errors or unexpected workflow behavior.', + inputSchema: { + type: 'object', + properties: { + error: { type: 'string' }, + workflowId: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['error'], + }, + }, + { + name: 'copilot_deploy', + agentId: 'deploy', + description: 'Deploy or manage workflow deployments.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + workflowId: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_auth', + agentId: 'auth', + description: 'Handle OAuth connection flows.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_knowledge', + agentId: 'knowledge', + description: 'Create and manage knowledge bases.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_custom_tool', + agentId: 'custom_tool', + description: 'Create or manage custom tools.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_info', + agentId: 'info', + description: 'Inspect blocks, outputs, and workflow metadata.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + workflowId: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_workflow', + agentId: 'workflow', + description: 'Manage workflow environment and configuration.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + workflowId: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_research', + agentId: 'research', + description: 'Research external APIs and documentation.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_tour', + agentId: 'tour', + description: 'Explain platform features and usage.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_test', + agentId: 'test', + description: 'Run workflows and verify outputs.', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + workflowId: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, + { + name: 'copilot_superagent', + agentId: 'superagent', + description: 'Execute direct external actions (email, Slack, etc.).', + inputSchema: { + type: 'object', + properties: { + request: { type: 'string' }, + context: { type: 'object' }, + }, + required: ['request'], + }, + }, +] + +function createResponse(id: RequestId, result: unknown): JSONRPCResponse { + return { + jsonrpc: '2.0', + id, + result: result as JSONRPCResponse['result'], + } +} + +function createError(id: RequestId, code: ErrorCode | number, message: string): JSONRPCError { + return { + jsonrpc: '2.0', + id, + error: { code, message }, + } +} + +export async function GET() { + return NextResponse.json({ + name: 'copilot-subagents', + version: '1.0.0', + protocolVersion: '2024-11-05', + capabilities: { tools: {} }, + }) +} + +export async function POST(request: NextRequest) { + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const body = (await request.json()) as JSONRPCMessage + + if (isJSONRPCNotification(body)) { + return new NextResponse(null, { status: 202 }) + } + + if (!isJSONRPCRequest(body)) { + return NextResponse.json( + createError(0, ErrorCode.InvalidRequest, 'Invalid JSON-RPC message'), + { status: 400 } + ) + } + + const { id, method, params } = body + + switch (method) { + case 'initialize': { + const result: InitializeResult = { + protocolVersion: '2024-11-05', + capabilities: { tools: {} }, + serverInfo: { name: 'copilot-subagents', version: '1.0.0' }, + } + return NextResponse.json(createResponse(id, result)) + } + case 'ping': + return NextResponse.json(createResponse(id, {})) + case 'tools/list': + return handleToolsList(id) + case 'tools/call': + return handleToolsCall( + id, + params as { name: string; arguments?: Record }, + auth.userId + ) + default: + return NextResponse.json( + createError(id, ErrorCode.MethodNotFound, `Method not found: ${method}`), + { status: 404 } + ) + } + } catch (error) { + logger.error('Error handling MCP request', { error }) + return NextResponse.json(createError(0, ErrorCode.InternalError, 'Internal error'), { + status: 500, + }) + } +} + +async function handleToolsList(id: RequestId): Promise { + const directTools = DIRECT_TOOL_DEFS.map((tool) => ({ + name: tool.name, + description: tool.description, + inputSchema: tool.inputSchema, + })) + + const subagentTools = SUBAGENT_TOOL_DEFS.map((tool) => ({ + name: tool.name, + description: tool.description, + inputSchema: tool.inputSchema, + })) + + const result: ListToolsResult = { + tools: [...directTools, ...subagentTools], + } + + return NextResponse.json(createResponse(id, result)) +} + +async function handleToolsCall( + id: RequestId, + params: { name: string; arguments?: Record }, + userId: string +): Promise { + const args = params.arguments || {} + + // Check if this is a direct tool (fast, no LLM) + const directTool = DIRECT_TOOL_DEFS.find((tool) => tool.name === params.name) + if (directTool) { + return handleDirectToolCall(id, directTool, args, userId) + } + + // Check if this is a subagent tool (slower, uses LLM) + const subagentTool = SUBAGENT_TOOL_DEFS.find((tool) => tool.name === params.name) + if (subagentTool) { + return handleSubagentToolCall(id, subagentTool, args, userId) + } + + return NextResponse.json( + createError(id, ErrorCode.MethodNotFound, `Tool not found: ${params.name}`), + { status: 404 } + ) + } + +async function handleDirectToolCall( + id: RequestId, + toolDef: (typeof DIRECT_TOOL_DEFS)[number], + args: Record, + userId: string +): Promise { + try { + const execContext = await prepareExecutionContext(userId, (args.workflowId as string) || '') + + const toolCall = { + id: crypto.randomUUID(), + name: toolDef.toolId, + status: 'pending' as const, + params: args as Record, + startTime: Date.now(), + } + + const result = await executeToolServerSide(toolCall, execContext) + + const response: CallToolResult = { + content: [ + { + type: 'text', + text: JSON.stringify(result.output ?? result, null, 2), + }, + ], + isError: !result.success, + } + + return NextResponse.json(createResponse(id, response)) + } catch (error) { + logger.error('Direct tool execution failed', { tool: toolDef.name, error }) + return NextResponse.json( + createError(id, ErrorCode.InternalError, `Tool execution failed: ${error}`), + { status: 500 } + ) + } +} + +async function handleSubagentToolCall( + id: RequestId, + toolDef: (typeof SUBAGENT_TOOL_DEFS)[number], + args: Record, + userId: string +): Promise { + const requestText = + (args.request as string) || + (args.message as string) || + (args.error as string) || + JSON.stringify(args) + + const context = (args.context as Record) || {} + if (args.plan && !context.plan) { + context.plan = args.plan + } + + const { model } = getCopilotModel('chat') + + const result = await orchestrateSubagentStream( + toolDef.agentId, + { + message: requestText, + workflowId: args.workflowId, + workspaceId: args.workspaceId, + context, + model, + }, + { + userId, + workflowId: args.workflowId as string | undefined, + workspaceId: args.workspaceId as string | undefined, + } + ) + + const response: CallToolResult = { + content: [ + { + type: 'text', + text: JSON.stringify(result, null, 2), + }, + ], + isError: !result.success, + } + + return NextResponse.json(createResponse(id, response)) +} + diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers.ts index dbe512b72..d65da8e89 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers.ts @@ -26,6 +26,7 @@ const RESPOND_TOOL_SET = new Set([ 'research_respond', 'deploy_respond', 'superagent_respond', + 'discovery_respond', ]) export type SSEHandler = ( diff --git a/apps/sim/lib/copilot/orchestrator/subagent.ts b/apps/sim/lib/copilot/orchestrator/subagent.ts new file mode 100644 index 000000000..bdc69fd68 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/subagent.ts @@ -0,0 +1,239 @@ +import { createLogger } from '@sim/logger' +import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants' +import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser' +import { + sseHandlers, + subAgentHandlers, + handleSubagentRouting, +} from '@/lib/copilot/orchestrator/sse-handlers' +import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor' +import type { + ExecutionContext, + OrchestratorOptions, + SSEEvent, + StreamingContext, + ToolCallSummary, +} from '@/lib/copilot/orchestrator/types' +import { env } from '@/lib/core/config/env' +import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' + +const logger = createLogger('CopilotSubagentOrchestrator') +const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT + +export interface SubagentOrchestratorOptions extends OrchestratorOptions { + userId: string + workflowId?: string + workspaceId?: string +} + +export interface SubagentOrchestratorResult { + success: boolean + content: string + toolCalls: ToolCallSummary[] + structuredResult?: { + type?: string + summary?: string + data?: any + success?: boolean + } + error?: string + errors?: string[] +} + +export async function orchestrateSubagentStream( + agentId: string, + requestPayload: Record, + options: SubagentOrchestratorOptions +): Promise { + const { userId, workflowId, workspaceId, timeout = 300000, abortSignal } = options + const execContext = await buildExecutionContext(userId, workflowId, workspaceId) + + const context: StreamingContext = { + chatId: undefined, + conversationId: undefined, + messageId: requestPayload?.messageId || crypto.randomUUID(), + accumulatedContent: '', + contentBlocks: [], + toolCalls: new Map(), + currentThinkingBlock: null, + isInThinkingBlock: false, + subAgentParentToolCallId: undefined, + subAgentContent: {}, + subAgentToolCalls: {}, + pendingContent: '', + streamComplete: false, + wasAborted: false, + errors: [], + } + + let structuredResult: SubagentOrchestratorResult['structuredResult'] + + try { + const response = await fetch(`${SIM_AGENT_API_URL}/api/subagent/${agentId}`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}), + }, + body: JSON.stringify({ ...requestPayload, stream: true, userId }), + signal: abortSignal, + }) + + if (!response.ok) { + const errorText = await response.text().catch(() => '') + throw new Error( + `Copilot backend error (${response.status}): ${errorText || response.statusText}` + ) + } + + if (!response.body) { + throw new Error('Copilot backend response missing body') + } + + const reader = response.body.getReader() + const decoder = new TextDecoder() + + const timeoutId = setTimeout(() => { + context.errors.push('Request timed out') + context.streamComplete = true + reader.cancel().catch(() => {}) + }, timeout) + + try { + for await (const event of parseSSEStream(reader, decoder, abortSignal)) { + if (abortSignal?.aborted) { + context.wasAborted = true + break + } + + await forwardEvent(event, options) + + if (event.type === 'structured_result' || event.type === 'subagent_result') { + structuredResult = normalizeStructuredResult(event.data) + context.streamComplete = true + continue + } + + // Handle subagent_start/subagent_end events to track nested subagent calls + if (event.type === 'subagent_start') { + const toolCallId = event.data?.tool_call_id + if (toolCallId) { + context.subAgentParentToolCallId = toolCallId + context.subAgentContent[toolCallId] = '' + context.subAgentToolCalls[toolCallId] = [] + } + continue + } + + if (event.type === 'subagent_end') { + context.subAgentParentToolCallId = undefined + continue + } + + // For direct subagent calls, events may have the subagent field set (e.g., subagent: "discovery") + // but no subagent_start event because this IS the top-level agent. Skip subagent routing + // for events where the subagent field matches the current agentId - these are top-level events. + const isTopLevelSubagentEvent = event.subagent === agentId && !context.subAgentParentToolCallId + + // Only route to subagent handlers for nested subagent events (not matching current agentId) + if (!isTopLevelSubagentEvent && handleSubagentRouting(event, context)) { + const handler = subAgentHandlers[event.type] + if (handler) { + await handler(event, context, execContext, options) + } + if (context.streamComplete) break + continue + } + + // Process as a regular SSE event (including top-level subagent events) + const handler = sseHandlers[event.type] + if (handler) { + await handler(event, context, execContext, options) + } + if (context.streamComplete) break + } + } finally { + clearTimeout(timeoutId) + } + + const result = buildResult(context, structuredResult) + await options.onComplete?.(result) + return result + } catch (error) { + const err = error instanceof Error ? error : new Error('Subagent orchestration failed') + logger.error('Subagent orchestration failed', { error: err.message, agentId }) + await options.onError?.(err) + return { + success: false, + content: context.accumulatedContent, + toolCalls: [], + error: err.message, + } + } +} + +async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise { + try { + await options.onEvent?.(event) + } catch (error) { + logger.warn('Failed to forward SSE event', { + type: event.type, + error: error instanceof Error ? error.message : String(error), + }) + } +} + +function normalizeStructuredResult(data: any): SubagentOrchestratorResult['structuredResult'] { + if (!data || typeof data !== 'object') { + return undefined + } + return { + type: data.result_type || data.type, + summary: data.summary, + data: data.data ?? data, + success: data.success, + } +} + +async function buildExecutionContext( + userId: string, + workflowId?: string, + workspaceId?: string +): Promise { + if (workflowId) { + return prepareExecutionContext(userId, workflowId) + } + + const decryptedEnvVars = await getEffectiveDecryptedEnv(userId, workspaceId) + return { + userId, + workflowId: workflowId || '', + workspaceId, + decryptedEnvVars, + } +} + +function buildResult( + context: StreamingContext, + structuredResult?: SubagentOrchestratorResult['structuredResult'] +): SubagentOrchestratorResult { + const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({ + id: toolCall.id, + name: toolCall.name, + status: toolCall.status, + params: toolCall.params, + result: toolCall.result?.output, + error: toolCall.error, + durationMs: + toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined, + })) + + return { + success: context.errors.length === 0 && !context.wasAborted, + content: context.accumulatedContent, + toolCalls, + structuredResult, + errors: context.errors.length ? context.errors : undefined, + } +} + diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor.ts b/apps/sim/lib/copilot/orchestrator/tool-executor.ts index 15638b0a7..e2611c90d 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor.ts @@ -5,11 +5,13 @@ import { customTools, permissions, workflow, + workflowFolder, workflowMcpServer, workflowMcpTool, + workspace, } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { and, asc, desc, eq, inArray, isNull, or } from 'drizzle-orm' +import { and, asc, desc, eq, inArray, isNull, max, or } from 'drizzle-orm' import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils' import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils' import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' @@ -21,9 +23,14 @@ import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace' import { mcpService } from '@/lib/mcp/service' import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer' -import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults' +import { + deployWorkflow, + loadWorkflowFromNormalizedTables, + saveWorkflowToNormalizedTables, + undeployWorkflow, +} from '@/lib/workflows/persistence/utils' import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow' -import { deployWorkflow, undeployWorkflow } from '@/lib/workflows/persistence/utils' import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator' import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs' import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' @@ -57,6 +64,10 @@ const SIM_WORKFLOW_TOOLS = new Set([ 'get_user_workflow', 'get_workflow_from_name', 'list_user_workflows', + 'list_user_workspaces', + 'list_folders', + 'create_workflow', + 'create_folder', 'get_workflow_data', 'get_block_outputs', 'get_block_upstream_references', @@ -225,7 +236,15 @@ async function executeSimWorkflowTool( case 'get_workflow_from_name': return executeGetWorkflowFromName(params, context) case 'list_user_workflows': - return executeListUserWorkflows(context) + return executeListUserWorkflows(params, context) + case 'list_user_workspaces': + return executeListUserWorkspaces(context) + case 'list_folders': + return executeListFolders(params, context) + case 'create_workflow': + return executeCreateWorkflow(params, context) + case 'create_folder': + return executeCreateFolder(params, context) case 'get_workflow_data': return executeGetWorkflowData(params, context) case 'get_block_outputs': @@ -292,6 +311,61 @@ async function ensureWorkflowAccess(workflowId: string, userId: string): Promise throw new Error('Unauthorized workflow access') } +async function getDefaultWorkspaceId(userId: string): Promise { + const workspaces = await db + .select({ workspaceId: workspace.id }) + .from(permissions) + .innerJoin(workspace, eq(permissions.entityId, workspace.id)) + .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace'))) + .orderBy(desc(workspace.createdAt)) + .limit(1) + + const workspaceId = workspaces[0]?.workspaceId + if (!workspaceId) { + throw new Error('No workspace found for user') + } + + return workspaceId +} + +async function ensureWorkspaceAccess( + workspaceId: string, + userId: string, + requireWrite: boolean +): Promise { + const [row] = await db + .select({ + permissionType: permissions.permissionType, + ownerId: workspace.ownerId, + }) + .from(permissions) + .innerJoin(workspace, eq(permissions.entityId, workspace.id)) + .where( + and( + eq(permissions.entityType, 'workspace'), + eq(permissions.entityId, workspaceId), + eq(permissions.userId, userId) + ) + ) + .limit(1) + + if (!row) { + throw new Error(`Workspace ${workspaceId} not found`) + } + + const isOwner = row.ownerId === userId + const permissionType = row.permissionType + const canWrite = isOwner || permissionType === 'admin' || permissionType === 'write' + + if (requireWrite && !canWrite) { + throw new Error('Write or admin access required for this workspace') + } + + if (!requireWrite && !canWrite && permissionType !== 'read') { + throw new Error('Access denied to workspace') + } +} + async function executeGetUserWorkflow( params: Record, context: ExecutionContext @@ -398,8 +472,14 @@ async function executeGetWorkflowFromName( } } -async function executeListUserWorkflows(context: ExecutionContext): Promise { +async function executeListUserWorkflows( + params: Record, + context: ExecutionContext +): Promise { try { + const workspaceId = params?.workspaceId as string | undefined + const folderId = params?.folderId as string | undefined + const workspaceIds = await db .select({ entityId: permissions.entityId }) .from(permissions) @@ -411,6 +491,12 @@ async function executeListUserWorkflows(context: ExecutionContext): Promise 0) { workflowConditions.push(inArray(workflow.workspaceId, workspaceIdList)) } + if (workspaceId) { + workflowConditions.push(eq(workflow.workspaceId, workspaceId)) + } + if (folderId) { + workflowConditions.push(eq(workflow.folderId, folderId)) + } const workflows = await db .select() .from(workflow) @@ -426,6 +512,7 @@ async function executeListUserWorkflows(context: ExecutionContext): Promise { + try { + const workspaces = await db + .select({ + workspaceId: workspace.id, + workspaceName: workspace.name, + ownerId: workspace.ownerId, + permissionType: permissions.permissionType, + }) + .from(permissions) + .innerJoin(workspace, eq(permissions.entityId, workspace.id)) + .where(and(eq(permissions.userId, context.userId), eq(permissions.entityType, 'workspace'))) + .orderBy(desc(workspace.createdAt)) + + const output = workspaces.map((row) => ({ + workspaceId: row.workspaceId, + workspaceName: row.workspaceName, + role: row.ownerId === context.userId ? 'owner' : row.permissionType, + })) + + return { success: true, output: { workspaces: output } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +async function executeListFolders( + params: Record, + context: ExecutionContext +): Promise { + try { + const workspaceId = (params?.workspaceId as string | undefined) || + (await getDefaultWorkspaceId(context.userId)) + + await ensureWorkspaceAccess(workspaceId, context.userId, false) + + const folders = await db + .select({ + folderId: workflowFolder.id, + folderName: workflowFolder.name, + parentId: workflowFolder.parentId, + sortOrder: workflowFolder.sortOrder, + }) + .from(workflowFolder) + .where(eq(workflowFolder.workspaceId, workspaceId)) + .orderBy(asc(workflowFolder.sortOrder), asc(workflowFolder.createdAt)) + + return { + success: true, + output: { + workspaceId, + folders, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +async function executeCreateWorkflow( + params: Record, + context: ExecutionContext +): Promise { + try { + const name = typeof params?.name === 'string' ? params.name.trim() : '' + if (!name) { + return { success: false, error: 'name is required' } + } + + const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId)) + const folderId = params?.folderId || null + const description = typeof params?.description === 'string' ? params.description : null + + await ensureWorkspaceAccess(workspaceId, context.userId, true) + + const workflowId = crypto.randomUUID() + const now = new Date() + + const folderCondition = folderId ? eq(workflow.folderId, folderId) : isNull(workflow.folderId) + const [maxResult] = await db + .select({ maxOrder: max(workflow.sortOrder) }) + .from(workflow) + .where(and(eq(workflow.workspaceId, workspaceId), folderCondition)) + const sortOrder = (maxResult?.maxOrder ?? 0) + 1 + + await db.insert(workflow).values({ + id: workflowId, + userId: context.userId, + workspaceId, + folderId, + sortOrder, + name, + description, + color: '#3972F6', + lastSynced: now, + createdAt: now, + updatedAt: now, + isDeployed: false, + runCount: 0, + variables: {}, + }) + + const { workflowState } = buildDefaultWorkflowArtifacts() + const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState) + if (!saveResult.success) { + throw new Error(saveResult.error || 'Failed to save workflow state') + } + + return { + success: true, + output: { + workflowId, + workflowName: name, + workspaceId, + folderId, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +async function executeCreateFolder( + params: Record, + context: ExecutionContext +): Promise { + try { + const name = typeof params?.name === 'string' ? params.name.trim() : '' + if (!name) { + return { success: false, error: 'name is required' } + } + + const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId)) + const parentId = params?.parentId || null + + await ensureWorkspaceAccess(workspaceId, context.userId, true) + + const [maxOrder] = await db + .select({ maxOrder: max(workflowFolder.sortOrder) }) + .from(workflowFolder) + .where( + and( + eq(workflowFolder.workspaceId, workspaceId), + parentId ? eq(workflowFolder.parentId, parentId) : isNull(workflowFolder.parentId) + ) + ) + .limit(1) + + const sortOrder = (maxOrder?.maxOrder ?? 0) + 1 + const folderId = crypto.randomUUID() + + await db.insert(workflowFolder).values({ + id: folderId, + name, + userId: context.userId, + workspaceId, + parentId, + color: '#6B7280', + sortOrder, + }) + + return { + success: true, + output: { + folderId, + folderName: name, + workspaceId, + parentId, + sortOrder, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + async function executeGetWorkflowData( params: Record, context: ExecutionContext diff --git a/apps/sim/lib/copilot/orchestrator/types.ts b/apps/sim/lib/copilot/orchestrator/types.ts index f4adbdeea..12cdee9da 100644 --- a/apps/sim/lib/copilot/orchestrator/types.ts +++ b/apps/sim/lib/copilot/orchestrator/types.ts @@ -11,6 +11,8 @@ export type SSEEventType = | 'tool_error' | 'subagent_start' | 'subagent_end' + | 'structured_result' + | 'subagent_result' | 'done' | 'error' | 'start'