diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 9d31bf5c3..ad68d85a7 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -16,6 +16,10 @@ import { createRequestTracker, createUnauthorizedResponse, } from '@/lib/copilot/request-helpers' +import { + handleToolCallEvent, + registerServerHandledTool, +} from '@/lib/copilot/server-executor/stream-handler' import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials' import type { CopilotProviderConfig } from '@/lib/copilot/types' import { env } from '@/lib/core/config/env' @@ -618,6 +622,33 @@ export async function POST(req: NextRequest) { toolCalls.push(event.data) if (event.data?.id) { announcedToolCallIds.add(event.data.id) + + // Execute server-side tools automatically + // This runs async and calls mark-complete when done + handleToolCallEvent( + { + id: event.data.id, + name: event.data.name, + arguments: event.data.arguments || {}, + partial: false, + }, + { + userId: authenticatedUserId, + workflowId, + chatId: actualChatId, + } + ).then((handledServerSide) => { + if (handledServerSide) { + registerServerHandledTool(event.data.id, event.data.name) + logger.info( + `[${tracker.requestId}] Tool will be executed server-side`, + { + toolCallId: event.data.id, + toolName: event.data.name, + } + ) + } + }) } } break diff --git a/apps/sim/app/api/copilot/tools/execution-state/[toolCallId]/route.ts b/apps/sim/app/api/copilot/tools/execution-state/[toolCallId]/route.ts new file mode 100644 index 000000000..7d2ec8819 --- /dev/null +++ b/apps/sim/app/api/copilot/tools/execution-state/[toolCallId]/route.ts @@ -0,0 +1,64 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { + authenticateCopilotRequestSessionOnly, + createUnauthorizedResponse, +} from '@/lib/copilot/request-helpers' +import { getToolExecutionState } from '@/lib/copilot/server-executor/stream-handler' + +const logger = createLogger('ToolExecutionStateAPI') + +/** + * GET /api/copilot/tools/execution-state/[toolCallId] + * + * Returns the execution state of a tool call. + * Useful for client reconnection scenarios. + */ +export async function GET( + req: NextRequest, + { params }: { params: Promise<{ toolCallId: string }> } +) { + try { + const { userId, isAuthenticated } = await authenticateCopilotRequestSessionOnly() + if (!isAuthenticated || !userId) { + return createUnauthorizedResponse() + } + + const { toolCallId } = await params + + if (!toolCallId) { + return NextResponse.json({ error: 'Tool call ID is required' }, { status: 400 }) + } + + const state = await getToolExecutionState(toolCallId) + + if (!state) { + return NextResponse.json({ error: 'Tool call not found' }, { status: 404 }) + } + + // Verify the user owns this tool execution + if (state.userId !== userId) { + logger.warn("User attempted to access another user's tool execution", { + requestingUserId: userId, + ownerUserId: state.userId, + toolCallId, + }) + return NextResponse.json({ error: 'Tool call not found' }, { status: 404 }) + } + + return NextResponse.json({ + toolCallId: state.toolCallId, + toolName: state.toolName, + status: state.status, + startedAt: state.startedAt, + completedAt: state.completedAt, + result: state.result, + error: state.error, + }) + } catch (error) { + logger.error('Error fetching tool execution state', { + error: error instanceof Error ? error.message : String(error), + }) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/copilot/tools/execution-states/route.ts b/apps/sim/app/api/copilot/tools/execution-states/route.ts new file mode 100644 index 000000000..8924980c3 --- /dev/null +++ b/apps/sim/app/api/copilot/tools/execution-states/route.ts @@ -0,0 +1,85 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { + authenticateCopilotRequestSessionOnly, + createBadRequestResponse, + createUnauthorizedResponse, +} from '@/lib/copilot/request-helpers' +import { getToolExecutionState } from '@/lib/copilot/server-executor/stream-handler' + +const logger = createLogger('ToolExecutionStatesAPI') + +const RequestSchema = z.object({ + toolCallIds: z.array(z.string()).min(1).max(50), +}) + +/** + * POST /api/copilot/tools/execution-states + * + * Returns the execution states of multiple tool calls at once. + * Useful for efficient reconnection scenarios. + */ +export async function POST(req: NextRequest) { + try { + const { userId, isAuthenticated } = await authenticateCopilotRequestSessionOnly() + if (!isAuthenticated || !userId) { + return createUnauthorizedResponse() + } + + const body = await req.json() + const { toolCallIds } = RequestSchema.parse(body) + + const states: Record< + string, + { + toolCallId: string + toolName: string + status: string + startedAt: number + completedAt?: number + result?: unknown + error?: string + } | null + > = {} + + // Fetch all states in parallel + const results = await Promise.all( + toolCallIds.map(async (toolCallId) => { + const state = await getToolExecutionState(toolCallId) + // Filter out states that don't belong to this user + if (state && state.userId !== userId) { + return { toolCallId, state: null } + } + return { toolCallId, state } + }) + ) + + for (const { toolCallId, state } of results) { + if (state) { + states[toolCallId] = { + toolCallId: state.toolCallId, + toolName: state.toolName, + status: state.status, + startedAt: state.startedAt, + completedAt: state.completedAt, + result: state.result, + error: state.error, + } + } else { + states[toolCallId] = null + } + } + + return NextResponse.json({ states }) + } catch (error) { + if (error instanceof z.ZodError) { + return createBadRequestResponse('Invalid request body') + } + + logger.error('Error fetching tool execution states', { + error: error instanceof Error ? error.message : String(error), + }) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/copilot/tools/mark-complete/route.ts b/apps/sim/app/api/copilot/tools/mark-complete/route.ts index 1ada484e5..87ceeb2d4 100644 --- a/apps/sim/app/api/copilot/tools/mark-complete/route.ts +++ b/apps/sim/app/api/copilot/tools/mark-complete/route.ts @@ -9,20 +9,13 @@ import { createRequestTracker, createUnauthorizedResponse, } from '@/lib/copilot/request-helpers' +import { MarkCompletePayloadSchema } from '@/lib/copilot/tools/shared/schemas' import { env } from '@/lib/core/config/env' const logger = createLogger('CopilotMarkToolCompleteAPI') const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT -const MarkCompleteSchema = z.object({ - id: z.string(), - name: z.string(), - status: z.number().int(), - message: z.any().optional(), - data: z.any().optional(), -}) - /** * POST /api/copilot/tools/mark-complete * Proxy to Sim Agent: POST /api/tools/mark-complete @@ -46,7 +39,7 @@ export async function POST(req: NextRequest) { }) } catch {} - const parsed = MarkCompleteSchema.parse(body) + const parsed = MarkCompletePayloadSchema.parse(body) const messagePreview = (() => { try { diff --git a/apps/sim/app/api/copilot/tools/server-executed/route.ts b/apps/sim/app/api/copilot/tools/server-executed/route.ts new file mode 100644 index 000000000..3228a48bb --- /dev/null +++ b/apps/sim/app/api/copilot/tools/server-executed/route.ts @@ -0,0 +1,14 @@ +import { NextResponse } from 'next/server' +import { SERVER_EXECUTED_TOOLS } from '@/lib/copilot/server-executor/registry' + +/** + * GET /api/copilot/tools/server-executed + * + * Returns the list of tools that are executed server-side. + * Clients can use this to avoid double-executing these tools. + */ +export async function GET() { + return NextResponse.json({ + tools: SERVER_EXECUTED_TOOLS, + }) +} diff --git a/apps/sim/lib/copilot/server-executed-tools.ts b/apps/sim/lib/copilot/server-executed-tools.ts new file mode 100644 index 000000000..86be81e69 --- /dev/null +++ b/apps/sim/lib/copilot/server-executed-tools.ts @@ -0,0 +1,162 @@ +/** + * Client-side utilities for server-executed tools. + * + * This module helps the client know which tools are executed server-side + * to avoid double-execution. + */ + +import { createLogger } from '@sim/logger' + +const logger = createLogger('ServerExecutedTools') + +/** + * List of tools that are executed server-side. + * This is cached after the first fetch. + */ +let cachedServerExecutedTools: Set | null = null + +/** + * Tools currently being executed server-side. + * Maps toolCallId to tool info. + */ +const serverHandledToolCalls = new Map< + string, + { + toolName: string + startedAt: number + } +>() + +/** + * Fetch the list of server-executed tools from the API. + * Results are cached for the session. + */ +export async function fetchServerExecutedTools(): Promise> { + if (cachedServerExecutedTools) { + return cachedServerExecutedTools + } + + try { + const response = await fetch('/api/copilot/tools/server-executed') + if (!response.ok) { + throw new Error(`HTTP ${response.status}`) + } + + const data = (await response.json()) as { tools: string[] } + cachedServerExecutedTools = new Set(data.tools) + + logger.info('Fetched server-executed tools', { + count: cachedServerExecutedTools.size, + tools: Array.from(cachedServerExecutedTools), + }) + + return cachedServerExecutedTools + } catch (error) { + logger.warn('Failed to fetch server-executed tools, using empty set', { + error: error instanceof Error ? error.message : String(error), + }) + // Return empty set on error - tools will execute client-side as fallback + return new Set() + } +} + +/** + * Check if a tool is executed server-side. + * Uses cached list or fetches if not available. + */ +export async function isServerExecutedTool(toolName: string): Promise { + const serverTools = await fetchServerExecutedTools() + return serverTools.has(toolName) +} + +/** + * Synchronous check if a tool is server-executed. + * Returns false if cache is not yet populated. + */ +export function isServerExecutedToolSync(toolName: string): boolean { + if (!cachedServerExecutedTools) { + return false + } + return cachedServerExecutedTools.has(toolName) +} + +/** + * Get the cached list of server-executed tools. + * Returns null if not yet fetched. + */ +export function getServerExecutedToolsSync(): Set | null { + return cachedServerExecutedTools +} + +/** + * Mark a tool call as being handled by the server. + * Used to prevent client from executing it. + */ +export function markToolCallServerHandled(toolCallId: string, toolName: string): void { + serverHandledToolCalls.set(toolCallId, { + toolName, + startedAt: Date.now(), + }) + + logger.debug('Marked tool call as server-handled', { toolCallId, toolName }) + + // Cleanup old entries (older than 1 hour) + const oneHourAgo = Date.now() - 60 * 60 * 1000 + for (const [id, info] of serverHandledToolCalls.entries()) { + if (info.startedAt < oneHourAgo) { + serverHandledToolCalls.delete(id) + } + } +} + +/** + * Check if a specific tool call is being handled by the server. + */ +export function isToolCallServerHandled(toolCallId: string): boolean { + return serverHandledToolCalls.has(toolCallId) +} + +/** + * Remove a tool call from server-handled tracking. + * Called when tool_result is received. + */ +export function clearToolCallServerHandled(toolCallId: string): void { + serverHandledToolCalls.delete(toolCallId) +} + +/** + * Get tool execution state from the server (for reconnection scenarios). + */ +export async function getToolExecutionState(toolCallId: string): Promise<{ + status: 'pending' | 'executing' | 'completed' | 'failed' | 'unknown' + result?: unknown + error?: string +} | null> { + try { + const response = await fetch(`/api/copilot/tools/execution-state/${toolCallId}`) + if (!response.ok) { + if (response.status === 404) { + return null + } + throw new Error(`HTTP ${response.status}`) + } + + return await response.json() + } catch (error) { + logger.warn('Failed to get tool execution state', { + toolCallId, + error: error instanceof Error ? error.message : String(error), + }) + return null + } +} + +/** + * Pre-fetch server-executed tools list. + * Call this early in the app lifecycle. + */ +export function prefetchServerExecutedTools(): void { + fetchServerExecutedTools().catch(() => { + // Errors already logged in fetchServerExecutedTools + }) +} diff --git a/apps/sim/lib/copilot/server-executor/index.ts b/apps/sim/lib/copilot/server-executor/index.ts new file mode 100644 index 000000000..2d4305db1 --- /dev/null +++ b/apps/sim/lib/copilot/server-executor/index.ts @@ -0,0 +1,58 @@ +/** + * Server-side tool executor. + * + * This module provides the ability to execute tools server-side (in Next.js API routes) + * rather than requiring the browser to execute them. + * + * Key function: executeToolOnServer() + * - Returns ToolResult if the tool was executed server-side + * - Returns null if the tool is not registered (client should handle) + */ + +import { createLogger } from '@sim/logger' +import { executeRegisteredTool, isServerExecutedTool } from './registry' +import type { ExecutionContext, ToolResult } from './types' + +const logger = createLogger('ServerExecutor') + +/** + * Execute a tool on the server if it's registered. + * + * @param toolName - The name of the tool to execute + * @param args - The arguments to pass to the tool + * @param context - Execution context (userId, workflowId, etc.) + * @returns ToolResult if executed, null if tool not registered server-side + */ +export async function executeToolOnServer( + toolName: string, + args: unknown, + context: ExecutionContext +): Promise { + // Check if this tool should be executed server-side + if (!isServerExecutedTool(toolName)) { + logger.debug('Tool not registered for server execution, client will handle', { toolName }) + return null + } + + logger.info('Executing tool server-side', { + toolName, + userId: context.userId, + workflowId: context.workflowId, + }) + + const startTime = Date.now() + const result = await executeRegisteredTool(toolName, args, context) + + logger.info('Tool execution completed', { + toolName, + success: result.success, + durationMs: Date.now() - startTime, + }) + + return result +} + +export { isServerExecutedTool, SERVER_EXECUTED_TOOLS } from './registry' +// Re-export types and utilities +export type { ExecutionContext, ToolResult } from './types' +export { errorResult, successResult } from './types' diff --git a/apps/sim/lib/copilot/server-executor/registry.ts b/apps/sim/lib/copilot/server-executor/registry.ts new file mode 100644 index 000000000..2543e6506 --- /dev/null +++ b/apps/sim/lib/copilot/server-executor/registry.ts @@ -0,0 +1,381 @@ +/** + * Server Tool Registry + * + * Central registry for all server-executed tools. This replaces the scattered + * executor files with a single, declarative registry. + * + * Benefits: + * - Single source of truth for tool registration + * - Type-safe with Zod schemas + * - No duplicate wrapper code + * - Easy to add new tools + */ + +import { createLogger } from '@sim/logger' +import type { z } from 'zod' +import { getBlockConfigServerTool } from '../tools/server/blocks/get-block-config' +import { getBlockOptionsServerTool } from '../tools/server/blocks/get-block-options' +// Import server tool implementations +import { getBlocksAndToolsServerTool } from '../tools/server/blocks/get-blocks-and-tools' +import { getBlocksMetadataServerTool } from '../tools/server/blocks/get-blocks-metadata-tool' +import { getTriggerBlocksServerTool } from '../tools/server/blocks/get-trigger-blocks' +import { searchDocumentationServerTool } from '../tools/server/docs/search-documentation' +import { knowledgeBaseServerTool } from '../tools/server/knowledge/knowledge-base' +import { CheckoffTodoInput, checkoffTodoServerTool } from '../tools/server/other/checkoff-todo' +import { makeApiRequestServerTool } from '../tools/server/other/make-api-request' +import { + MarkTodoInProgressInput, + markTodoInProgressServerTool, +} from '../tools/server/other/mark-todo-in-progress' +import { searchOnlineServerTool } from '../tools/server/other/search-online' +import { SleepInput, sleepServerTool } from '../tools/server/other/sleep' +import { getCredentialsServerTool } from '../tools/server/user/get-credentials' +import { setEnvironmentVariablesServerTool } from '../tools/server/user/set-environment-variables' +import { + CheckDeploymentStatusInput, + checkDeploymentStatusServerTool, +} from '../tools/server/workflow/check-deployment-status' +import { + CreateWorkspaceMcpServerInput, + createWorkspaceMcpServerServerTool, +} from '../tools/server/workflow/create-workspace-mcp-server' +import { DeployApiInput, deployApiServerTool } from '../tools/server/workflow/deploy-api' +import { DeployChatInput, deployChatServerTool } from '../tools/server/workflow/deploy-chat' +import { DeployMcpInput, deployMcpServerTool } from '../tools/server/workflow/deploy-mcp' +import { editWorkflowServerTool } from '../tools/server/workflow/edit-workflow' +import { + GetBlockOutputsInput, + getBlockOutputsServerTool, +} from '../tools/server/workflow/get-block-outputs' +import { + GetUserWorkflowInput, + getUserWorkflowServerTool, +} from '../tools/server/workflow/get-user-workflow' +import { getWorkflowConsoleServerTool } from '../tools/server/workflow/get-workflow-console' +import { + GetWorkflowFromNameInput, + getWorkflowFromNameServerTool, +} from '../tools/server/workflow/get-workflow-from-name' +import { listUserWorkflowsServerTool } from '../tools/server/workflow/list-user-workflows' +import { + ListWorkspaceMcpServersInput, + listWorkspaceMcpServersServerTool, +} from '../tools/server/workflow/list-workspace-mcp-servers' +import { RedeployInput, redeployServerTool } from '../tools/server/workflow/redeploy' +import { RunWorkflowInput, runWorkflowServerTool } from '../tools/server/workflow/run-workflow' +import { + SetGlobalWorkflowVariablesInput, + setGlobalWorkflowVariablesServerTool, +} from '../tools/server/workflow/set-global-workflow-variables' +// Import schemas +import { + EditWorkflowInput, + GetBlockConfigInput, + GetBlockOptionsInput, + GetBlocksAndToolsInput, + GetBlocksMetadataInput, + GetCredentialsInput, + GetTriggerBlocksInput, + GetWorkflowConsoleInput, + KnowledgeBaseArgsSchema, + ListUserWorkflowsInput, + MakeApiRequestInput, + SearchDocumentationInput, + SearchOnlineInput, + SetEnvironmentVariablesInput, +} from '../tools/shared/schemas' +import type { ExecutionContext, ToolResult } from './types' +import { errorResult, successResult } from './types' + +const logger = createLogger('ToolRegistry') + +/** + * Context type for server tools. + */ +type ServerToolContext = { userId: string } | undefined + +/** + * Helper to create a typed executor wrapper. + * This provides a clean boundary between our registry (unknown args) + * and the underlying typed server tools. + * + * The generic TArgs is inferred from the Zod schema, ensuring type safety + * at compile time while allowing runtime validation. + */ +function createExecutor( + serverTool: { execute: (args: TArgs, ctx?: ServerToolContext) => Promise }, + options: { passContext: boolean } = { passContext: true } +): (args: unknown, ctx: ServerToolContext) => Promise { + return (args, ctx) => { + // After Zod validation, we know args matches TArgs + // This cast is safe because validation happens before execution + const typedArgs = args as TArgs + return options.passContext ? serverTool.execute(typedArgs, ctx) : serverTool.execute(typedArgs) + } +} + +/** + * Tool registration entry. + */ +interface ToolRegistration { + /** Zod schema for input validation (optional) */ + inputSchema?: z.ZodType + /** Whether this tool requires authentication */ + requiresAuth: boolean + /** The underlying execute function */ + execute: (args: unknown, context: ServerToolContext) => Promise +} + +/** + * The tool registry - maps tool names to their configurations. + * + * Each tool is registered with: + * - inputSchema: Zod schema for validation (optional) + * - requiresAuth: Whether userId is required + * - execute: The underlying server tool's execute function + */ +const TOOL_REGISTRY: Record = { + // ───────────────────────────────────────────────────────────────────────── + // Block Tools + // ───────────────────────────────────────────────────────────────────────── + get_blocks_and_tools: { + inputSchema: GetBlocksAndToolsInput, + requiresAuth: true, + execute: createExecutor(getBlocksAndToolsServerTool), + }, + get_block_config: { + inputSchema: GetBlockConfigInput, + requiresAuth: true, + execute: createExecutor(getBlockConfigServerTool), + }, + get_block_options: { + inputSchema: GetBlockOptionsInput, + requiresAuth: true, + execute: createExecutor(getBlockOptionsServerTool), + }, + get_blocks_metadata: { + inputSchema: GetBlocksMetadataInput, + requiresAuth: true, + execute: createExecutor(getBlocksMetadataServerTool), + }, + get_trigger_blocks: { + inputSchema: GetTriggerBlocksInput, + requiresAuth: true, + execute: createExecutor(getTriggerBlocksServerTool), + }, + + // ───────────────────────────────────────────────────────────────────────── + // Workflow Tools + // ───────────────────────────────────────────────────────────────────────── + edit_workflow: { + inputSchema: EditWorkflowInput, + requiresAuth: true, + execute: createExecutor(editWorkflowServerTool), + }, + get_workflow_console: { + inputSchema: GetWorkflowConsoleInput, + requiresAuth: false, // Tool validates workflowId itself + execute: createExecutor(getWorkflowConsoleServerTool, { passContext: false }), + }, + list_user_workflows: { + inputSchema: ListUserWorkflowsInput, + requiresAuth: true, + execute: createExecutor(listUserWorkflowsServerTool), + }, + get_workflow_from_name: { + inputSchema: GetWorkflowFromNameInput, + requiresAuth: true, + execute: createExecutor(getWorkflowFromNameServerTool), + }, + check_deployment_status: { + inputSchema: CheckDeploymentStatusInput, + requiresAuth: true, + execute: createExecutor(checkDeploymentStatusServerTool), + }, + list_workspace_mcp_servers: { + inputSchema: ListWorkspaceMcpServersInput, + requiresAuth: true, + execute: createExecutor(listWorkspaceMcpServersServerTool), + }, + set_global_workflow_variables: { + inputSchema: SetGlobalWorkflowVariablesInput, + requiresAuth: true, + execute: createExecutor(setGlobalWorkflowVariablesServerTool), + }, + redeploy: { + inputSchema: RedeployInput, + requiresAuth: true, + execute: createExecutor(redeployServerTool), + }, + create_workspace_mcp_server: { + inputSchema: CreateWorkspaceMcpServerInput, + requiresAuth: true, + execute: createExecutor(createWorkspaceMcpServerServerTool), + }, + deploy_api: { + inputSchema: DeployApiInput, + requiresAuth: true, + execute: createExecutor(deployApiServerTool), + }, + deploy_chat: { + inputSchema: DeployChatInput, + requiresAuth: true, + execute: createExecutor(deployChatServerTool), + }, + deploy_mcp: { + inputSchema: DeployMcpInput, + requiresAuth: true, + execute: createExecutor(deployMcpServerTool), + }, + run_workflow: { + inputSchema: RunWorkflowInput, + requiresAuth: true, + execute: createExecutor(runWorkflowServerTool), + }, + get_user_workflow: { + inputSchema: GetUserWorkflowInput, + requiresAuth: true, + execute: createExecutor(getUserWorkflowServerTool), + }, + get_block_outputs: { + inputSchema: GetBlockOutputsInput, + requiresAuth: true, + execute: createExecutor(getBlockOutputsServerTool), + }, + + // ───────────────────────────────────────────────────────────────────────── + // Search Tools + // ───────────────────────────────────────────────────────────────────────── + search_documentation: { + inputSchema: SearchDocumentationInput, + requiresAuth: false, + execute: createExecutor(searchDocumentationServerTool, { passContext: false }), + }, + search_online: { + inputSchema: SearchOnlineInput, + requiresAuth: false, + execute: createExecutor(searchOnlineServerTool, { passContext: false }), + }, + make_api_request: { + inputSchema: MakeApiRequestInput, + requiresAuth: false, + execute: createExecutor(makeApiRequestServerTool, { passContext: false }), + }, + + // ───────────────────────────────────────────────────────────────────────── + // Knowledge Tools + // ───────────────────────────────────────────────────────────────────────── + knowledge_base: { + inputSchema: KnowledgeBaseArgsSchema, + requiresAuth: true, + execute: createExecutor(knowledgeBaseServerTool), + }, + + // ───────────────────────────────────────────────────────────────────────── + // User Tools + // ───────────────────────────────────────────────────────────────────────── + get_credentials: { + inputSchema: GetCredentialsInput, + requiresAuth: true, + execute: createExecutor(getCredentialsServerTool), + }, + set_environment_variables: { + inputSchema: SetEnvironmentVariablesInput, + requiresAuth: true, + execute: createExecutor(setEnvironmentVariablesServerTool), + }, + + // ───────────────────────────────────────────────────────────────────────── + // Todo Tools + // ───────────────────────────────────────────────────────────────────────── + checkoff_todo: { + inputSchema: CheckoffTodoInput, + requiresAuth: false, // Just returns success, no auth needed + execute: createExecutor(checkoffTodoServerTool, { passContext: false }), + }, + mark_todo_in_progress: { + inputSchema: MarkTodoInProgressInput, + requiresAuth: false, + execute: createExecutor(markTodoInProgressServerTool, { passContext: false }), + }, + + // ───────────────────────────────────────────────────────────────────────── + // Utility Tools + // ───────────────────────────────────────────────────────────────────────── + sleep: { + inputSchema: SleepInput, + requiresAuth: false, + execute: createExecutor(sleepServerTool, { passContext: false }), + }, +} + +/** + * List of all server-executed tool names. + * Export this so clients know which tools NOT to execute locally. + */ +export const SERVER_EXECUTED_TOOLS = Object.keys(TOOL_REGISTRY) + +/** + * Check if a tool is registered for server execution. + */ +export function isServerExecutedTool(toolName: string): boolean { + return toolName in TOOL_REGISTRY +} + +/** + * Execute a tool with proper validation and error handling. + * + * This is the main entry point for tool execution. It: + * 1. Looks up the tool in the registry + * 2. Validates input against the schema (if provided) + * 3. Checks authentication requirements + * 4. Executes the tool + * 5. Returns a standardized ToolResult + */ +export async function executeRegisteredTool( + toolName: string, + args: unknown, + context: ExecutionContext +): Promise { + const registration = TOOL_REGISTRY[toolName] + + if (!registration) { + logger.warn('Unknown tool requested', { toolName }) + return errorResult('UNKNOWN_TOOL', `Tool '${toolName}' is not registered for server execution`) + } + + // Check authentication requirement + if (registration.requiresAuth && !context.userId) { + logger.error('Authentication required but not provided', { toolName }) + return errorResult('AUTH_REQUIRED', `Tool '${toolName}' requires authentication`) + } + + // Validate input if schema is provided + let validatedArgs: unknown = args ?? {} + if (registration.inputSchema) { + const parseResult = registration.inputSchema.safeParse(args ?? {}) + if (!parseResult.success) { + logger.warn('Input validation failed', { + toolName, + errors: parseResult.error.flatten(), + }) + return errorResult('VALIDATION_ERROR', 'Invalid input arguments', { + errors: parseResult.error.flatten().fieldErrors, + }) + } + validatedArgs = parseResult.data + } + + // Execute the tool + try { + const result = await registration.execute( + validatedArgs, + context.userId ? { userId: context.userId } : undefined + ) + return successResult(result) + } catch (error) { + const message = error instanceof Error ? error.message : String(error) + logger.error('Tool execution failed', { toolName, error: message }) + return errorResult('EXECUTION_ERROR', message) + } +} diff --git a/apps/sim/lib/copilot/server-executor/stream-handler.ts b/apps/sim/lib/copilot/server-executor/stream-handler.ts new file mode 100644 index 000000000..9f3fd2311 --- /dev/null +++ b/apps/sim/lib/copilot/server-executor/stream-handler.ts @@ -0,0 +1,425 @@ +/** + * SSE Stream Tool Execution Handler + * + * This module intercepts tool_call events from the Go copilot SSE stream + * and executes server-side tools, calling mark-complete to return results. + * + * Key features: + * - Non-blocking: Tool execution happens in parallel with stream forwarding + * - Resilient: Uses Redis for state persistence across disconnects + * - Transparent: Still forwards all events to browser for UI updates + */ + +import { createLogger } from '@sim/logger' +import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants' +import { env } from '@/lib/core/config/env' +import { getRedisClient } from '@/lib/core/config/redis' +import { executeToolOnServer, isServerExecutedTool } from './index' +import type { ExecutionContext } from './types' + +const logger = createLogger('StreamToolHandler') + +const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT + +/** Redis key prefix for tool execution state */ +const REDIS_KEY_PREFIX = 'copilot:tool_exec:' + +/** TTL for Redis entries (1 hour) */ +const REDIS_TTL_SECONDS = 60 * 60 + +/** + * Tool execution state stored in Redis + */ +interface ToolExecutionState { + toolCallId: string + toolName: string + status: 'pending' | 'executing' | 'completed' | 'failed' + userId: string + workflowId?: string + chatId?: string + startedAt: number + completedAt?: number + result?: unknown + error?: string +} + +/** + * Tool call data from SSE event + */ +interface ToolCallEvent { + id: string + name: string + arguments: Record + partial?: boolean +} + +/** + * Save tool execution state to Redis. + */ +async function saveToolState(state: ToolExecutionState): Promise { + const redis = getRedisClient() + if (!redis) { + logger.debug('Redis not available, skipping state save', { + toolCallId: state.toolCallId, + }) + return + } + + try { + const key = `${REDIS_KEY_PREFIX}${state.toolCallId}` + await redis.setex(key, REDIS_TTL_SECONDS, JSON.stringify(state)) + logger.debug('Saved tool execution state to Redis', { + toolCallId: state.toolCallId, + status: state.status, + }) + } catch (error) { + logger.warn('Failed to save tool state to Redis', { + toolCallId: state.toolCallId, + error: error instanceof Error ? error.message : String(error), + }) + } +} + +/** + * Get tool execution state from Redis. + */ +async function getToolState(toolCallId: string): Promise { + const redis = getRedisClient() + if (!redis) return null + + try { + const key = `${REDIS_KEY_PREFIX}${toolCallId}` + const data = await redis.get(key) + if (!data) return null + return JSON.parse(data) as ToolExecutionState + } catch (error) { + logger.warn('Failed to get tool state from Redis', { + toolCallId, + error: error instanceof Error ? error.message : String(error), + }) + return null + } +} + +/** + * Mark a tool as complete by calling the Go copilot endpoint. + */ +async function markToolComplete( + toolCallId: string, + toolName: string, + status: number, + message?: unknown, + data?: unknown +): Promise { + try { + const payload = { + id: toolCallId, + name: toolName, + status, + message, + data, + } + + logger.info('Marking tool complete from server', { + toolCallId, + toolName, + status, + hasMessage: message !== undefined, + hasData: data !== undefined, + }) + + const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}), + }, + body: JSON.stringify(payload), + }) + + if (!response.ok) { + const errorText = await response.text().catch(() => '') + logger.error('Failed to mark tool complete', { + toolCallId, + toolName, + status: response.status, + error: errorText, + }) + return false + } + + logger.info('Tool marked complete successfully', { toolCallId, toolName }) + return true + } catch (error) { + logger.error('Error marking tool complete', { + toolCallId, + toolName, + error: error instanceof Error ? error.message : String(error), + }) + return false + } +} + +/** + * Handle a tool call event from the SSE stream. + * + * If the tool is server-executed: + * 1. Execute it using the server executor + * 2. Call mark-complete to return result to Go + * + * This runs asynchronously and doesn't block the stream. + * + * @returns true if this tool will be handled server-side, false if client should handle + */ +export async function handleToolCallEvent( + event: ToolCallEvent, + context: ExecutionContext +): Promise { + // Skip partial tool calls (streaming arguments) + if (event.partial) { + return false + } + + // Check if this tool should be executed server-side + if (!isServerExecutedTool(event.name)) { + logger.debug('Tool not server-executed, client will handle', { + toolCallId: event.id, + toolName: event.name, + }) + return false + } + + // Check if this tool is already being executed (recovery scenario) + const existingState = await getToolState(event.id) + if (existingState) { + if (existingState.status === 'executing') { + logger.info('Tool already being executed (recovery scenario)', { + toolCallId: event.id, + toolName: event.name, + startedAt: existingState.startedAt, + }) + return true + } + if (existingState.status === 'completed') { + logger.info('Tool already completed (recovery scenario)', { + toolCallId: event.id, + toolName: event.name, + completedAt: existingState.completedAt, + }) + return true + } + } + + logger.info('Handling tool call server-side', { + toolCallId: event.id, + toolName: event.name, + userId: context.userId, + }) + + // Save initial state to Redis + await saveToolState({ + toolCallId: event.id, + toolName: event.name, + status: 'pending', + userId: context.userId, + workflowId: context.workflowId, + chatId: context.chatId, + startedAt: Date.now(), + }) + + // Execute asynchronously - don't await here to avoid blocking stream + executeToolServerSide(event, context).catch((error) => { + logger.error('Async tool execution failed', { + toolCallId: event.id, + toolName: event.name, + error: error instanceof Error ? error.message : String(error), + }) + }) + + return true +} + +/** + * Execute a tool server-side and mark it complete. + * This is called asynchronously from handleToolCallEvent. + */ +async function executeToolServerSide( + event: ToolCallEvent, + context: ExecutionContext +): Promise { + const startTime = Date.now() + + // Update state to executing + await saveToolState({ + toolCallId: event.id, + toolName: event.name, + status: 'executing', + userId: context.userId, + workflowId: context.workflowId, + chatId: context.chatId, + startedAt: startTime, + }) + + try { + const result = await executeToolOnServer(event.name, event.arguments, context) + + if (!result) { + // This shouldn't happen since we checked isServerExecutedTool + logger.error('executeToolOnServer returned null for registered tool', { + toolCallId: event.id, + toolName: event.name, + }) + + await saveToolState({ + toolCallId: event.id, + toolName: event.name, + status: 'failed', + userId: context.userId, + workflowId: context.workflowId, + chatId: context.chatId, + startedAt: startTime, + completedAt: Date.now(), + error: 'Internal error: tool not found', + }) + + await markToolComplete(event.id, event.name, 500, 'Internal error: tool not found') + return + } + + const durationMs = Date.now() - startTime + + if (result.success) { + logger.info('Tool executed successfully', { + toolCallId: event.id, + toolName: event.name, + durationMs, + }) + + await saveToolState({ + toolCallId: event.id, + toolName: event.name, + status: 'completed', + userId: context.userId, + workflowId: context.workflowId, + chatId: context.chatId, + startedAt: startTime, + completedAt: Date.now(), + result: result.data, + }) + + // Mark complete with success + await markToolComplete( + event.id, + event.name, + 200, + undefined, // message + result.data // data + ) + } else { + logger.warn('Tool execution failed', { + toolCallId: event.id, + toolName: event.name, + durationMs, + error: result.error, + }) + + await saveToolState({ + toolCallId: event.id, + toolName: event.name, + status: 'failed', + userId: context.userId, + workflowId: context.workflowId, + chatId: context.chatId, + startedAt: startTime, + completedAt: Date.now(), + error: result.error?.message, + }) + + // Mark complete with error + await markToolComplete( + event.id, + event.name, + 400, + result.error?.message ?? 'Tool execution failed', + result.error?.details + ) + } + } catch (error) { + const durationMs = Date.now() - startTime + const message = error instanceof Error ? error.message : String(error) + + logger.error('Tool execution threw exception', { + toolCallId: event.id, + toolName: event.name, + durationMs, + error: message, + }) + + await saveToolState({ + toolCallId: event.id, + toolName: event.name, + status: 'failed', + userId: context.userId, + workflowId: context.workflowId, + chatId: context.chatId, + startedAt: startTime, + completedAt: Date.now(), + error: message, + }) + + // Mark complete with error + await markToolComplete(event.id, event.name, 500, message) + } +} + +/** + * In-memory fallback for tracking server-handled tools when Redis is unavailable. + */ +const serverHandledTools = new Map() + +/** + * Register a tool as being handled server-side. + */ +export function registerServerHandledTool(toolCallId: string, toolName: string): void { + serverHandledTools.set(toolCallId, { + toolName, + handledAt: Date.now(), + }) + + // Clean up old entries (older than 1 hour) + const oneHourAgo = Date.now() - 60 * 60 * 1000 + for (const [id, info] of serverHandledTools.entries()) { + if (info.handledAt < oneHourAgo) { + serverHandledTools.delete(id) + } + } +} + +/** + * Check if a tool was handled server-side. + */ +export async function wasToolHandledServerSide(toolCallId: string): Promise { + // Check in-memory first + if (serverHandledTools.has(toolCallId)) { + return true + } + + // Check Redis + const state = await getToolState(toolCallId) + return state !== null +} + +/** + * Get the execution state of a tool. + * Useful for client reconnection scenarios. + */ +export async function getToolExecutionState( + toolCallId: string +): Promise { + return getToolState(toolCallId) +} + +/** + * Get list of server-executed tool names for client reference. + */ +export { SERVER_EXECUTED_TOOLS } from './registry' diff --git a/apps/sim/lib/copilot/server-executor/types.ts b/apps/sim/lib/copilot/server-executor/types.ts new file mode 100644 index 000000000..785b5f319 --- /dev/null +++ b/apps/sim/lib/copilot/server-executor/types.ts @@ -0,0 +1,87 @@ +/** + * Type definitions for the server executor. + * + * This provides a clean, type-safe interface for tool execution + * without any 'any' types. + */ + +import type { z } from 'zod' + +/** + * Standard result type for all tool executions. + * This is the contract between server executors and the chat route. + */ +export interface ToolResult { + success: boolean + data?: T + error?: { + code: string + message: string + details?: Record + } +} + +/** + * Context passed to tool executors. + */ +export interface ExecutionContext { + userId: string + workflowId?: string + chatId?: string +} + +/** + * Configuration for a registered tool. + * This defines how a tool should be validated and executed. + */ +export interface ToolConfig< + TInputSchema extends z.ZodType = z.ZodType, + TOutputSchema extends z.ZodType = z.ZodType, +> { + /** The canonical name of the tool */ + name: string + + /** Zod schema for validating input args (optional - if not provided, args pass through) */ + inputSchema?: TInputSchema + + /** Zod schema for validating output (optional - if not provided, output passes through) */ + outputSchema?: TOutputSchema + + /** Whether context (userId) is required for this tool */ + requiresAuth?: boolean + + /** + * The execute function. + * Takes validated args and context, returns result data. + */ + execute: ( + args: TInputSchema extends z.ZodType ? z.infer : unknown, + context: ExecutionContext + ) => Promise : unknown> +} + +/** + * Type for a tool executor function (after wrapping). + */ +export type ToolExecutor = (args: unknown, context: ExecutionContext) => Promise + +/** + * Helper to create a success result. + */ +export function successResult(data: T): ToolResult { + return { success: true, data } +} + +/** + * Helper to create an error result. + */ +export function errorResult( + code: string, + message: string, + details?: Record +): ToolResult { + return { + success: false, + error: { code, message, details }, + } +} diff --git a/apps/sim/lib/copilot/tools/server/base-tool.ts b/apps/sim/lib/copilot/tools/server/base-tool.ts index 40ec3584c..753dbee93 100644 --- a/apps/sim/lib/copilot/tools/server/base-tool.ts +++ b/apps/sim/lib/copilot/tools/server/base-tool.ts @@ -1,4 +1,12 @@ -export interface BaseServerTool { +/** + * Base interface for server-executed tools. + * + * @template TArgs - The type of arguments the tool accepts + * @template TResult - The type of result the tool returns + */ +export interface BaseServerTool { + /** The canonical name of the tool (must match the registry key) */ name: string + /** Execute the tool with the given arguments and context */ execute(args: TArgs, context?: { userId: string }): Promise } diff --git a/apps/sim/lib/copilot/tools/server/other/checkoff-todo.ts b/apps/sim/lib/copilot/tools/server/other/checkoff-todo.ts new file mode 100644 index 000000000..30f63af16 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/other/checkoff-todo.ts @@ -0,0 +1,44 @@ +import { createLogger } from '@sim/logger' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('CheckoffTodoServerTool') + +export const CheckoffTodoInput = z.object({ + id: z.string().optional(), + todoId: z.string().optional(), +}) + +export const CheckoffTodoResult = z.object({ + todoId: z.string(), + success: z.boolean(), +}) + +export type CheckoffTodoInputType = z.infer +export type CheckoffTodoResultType = z.infer + +/** + * Server-side tool to mark a todo as complete. + * The actual UI update happens client-side when the store receives the tool_result event. + */ +export const checkoffTodoServerTool: BaseServerTool = + { + name: 'checkoff_todo', + async execute(args: unknown, _context?: { userId: string }) { + const parsed = CheckoffTodoInput.parse(args) + const todoId = parsed.id || parsed.todoId + + if (!todoId) { + throw new Error('Missing todo id') + } + + logger.info('Marking todo as complete', { todoId }) + + // The actual state update happens client-side via tool_result handler + // We just return success to signal the action was processed + return CheckoffTodoResult.parse({ + todoId, + success: true, + }) + }, + } diff --git a/apps/sim/lib/copilot/tools/server/other/mark-todo-in-progress.ts b/apps/sim/lib/copilot/tools/server/other/mark-todo-in-progress.ts new file mode 100644 index 000000000..cbad786dc --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/other/mark-todo-in-progress.ts @@ -0,0 +1,45 @@ +import { createLogger } from '@sim/logger' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('MarkTodoInProgressServerTool') + +export const MarkTodoInProgressInput = z.object({ + id: z.string().optional(), + todoId: z.string().optional(), +}) + +export const MarkTodoInProgressResult = z.object({ + todoId: z.string(), + success: z.boolean(), +}) + +export type MarkTodoInProgressInputType = z.infer +export type MarkTodoInProgressResultType = z.infer + +/** + * Server-side tool to mark a todo as in progress. + * The actual UI update happens client-side when the store receives the tool_result event. + */ +export const markTodoInProgressServerTool: BaseServerTool< + MarkTodoInProgressInputType, + MarkTodoInProgressResultType +> = { + name: 'mark_todo_in_progress', + async execute(args: unknown, _context?: { userId: string }) { + const parsed = MarkTodoInProgressInput.parse(args) + const todoId = parsed.id || parsed.todoId + + if (!todoId) { + throw new Error('Missing todo id') + } + + logger.info('Marking todo as in progress', { todoId }) + + // The actual state update happens client-side via tool_result handler + return MarkTodoInProgressResult.parse({ + todoId, + success: true, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/other/sleep.ts b/apps/sim/lib/copilot/tools/server/other/sleep.ts new file mode 100644 index 000000000..dc3fbcdb3 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/other/sleep.ts @@ -0,0 +1,45 @@ +import { createLogger } from '@sim/logger' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('SleepServerTool') + +/** Maximum sleep duration in seconds (3 minutes) */ +const MAX_SLEEP_SECONDS = 180 + +export const SleepInput = z.object({ + seconds: z.number().min(0).max(MAX_SLEEP_SECONDS).optional().default(0), +}) + +export const SleepResult = z.object({ + sleptFor: z.number(), + success: z.boolean(), +}) + +export type SleepInputType = z.infer +export type SleepResultType = z.infer + +export const sleepServerTool: BaseServerTool = { + name: 'sleep', + async execute(args: unknown, _context?: { userId: string }) { + const parsed = SleepInput.parse(args) + let seconds = parsed.seconds + + // Clamp to max + if (seconds > MAX_SLEEP_SECONDS) { + seconds = MAX_SLEEP_SECONDS + } + + logger.info('Starting sleep', { seconds }) + + // Actually sleep + await new Promise((resolve) => setTimeout(resolve, seconds * 1000)) + + logger.info('Sleep completed', { seconds }) + + return SleepResult.parse({ + sleptFor: seconds, + success: true, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/router.ts b/apps/sim/lib/copilot/tools/server/router.ts index 2c79cff74..ae1c39c06 100644 --- a/apps/sim/lib/copilot/tools/server/router.ts +++ b/apps/sim/lib/copilot/tools/server/router.ts @@ -1,66 +1,37 @@ -import { createLogger } from '@sim/logger' -import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' -import { getBlockConfigServerTool } from '@/lib/copilot/tools/server/blocks/get-block-config' -import { getBlockOptionsServerTool } from '@/lib/copilot/tools/server/blocks/get-block-options' -import { getBlocksAndToolsServerTool } from '@/lib/copilot/tools/server/blocks/get-blocks-and-tools' -import { getBlocksMetadataServerTool } from '@/lib/copilot/tools/server/blocks/get-blocks-metadata-tool' -import { getTriggerBlocksServerTool } from '@/lib/copilot/tools/server/blocks/get-trigger-blocks' -import { searchDocumentationServerTool } from '@/lib/copilot/tools/server/docs/search-documentation' -import { knowledgeBaseServerTool } from '@/lib/copilot/tools/server/knowledge/knowledge-base' -import { makeApiRequestServerTool } from '@/lib/copilot/tools/server/other/make-api-request' -import { searchOnlineServerTool } from '@/lib/copilot/tools/server/other/search-online' -import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials' -import { setEnvironmentVariablesServerTool } from '@/lib/copilot/tools/server/user/set-environment-variables' -import { editWorkflowServerTool } from '@/lib/copilot/tools/server/workflow/edit-workflow' -import { getWorkflowConsoleServerTool } from '@/lib/copilot/tools/server/workflow/get-workflow-console' -import { - ExecuteResponseSuccessSchema, - GetBlockConfigInput, - GetBlockConfigResult, - GetBlockOptionsInput, - GetBlockOptionsResult, - GetBlocksAndToolsInput, - GetBlocksAndToolsResult, - GetBlocksMetadataInput, - GetBlocksMetadataResult, - GetTriggerBlocksInput, - GetTriggerBlocksResult, - KnowledgeBaseArgsSchema, -} from '@/lib/copilot/tools/shared/schemas' +/** + * Server Tool Router + * + * This module provides backwards compatibility for the execute-copilot-server-tool API route. + * It delegates to the new unified registry in server-executor/registry.ts + * + * @deprecated Use executeRegisteredTool from server-executor/registry.ts directly + */ -// Generic execute response schemas (success path only for this route; errors handled via HTTP status) +import { createLogger } from '@sim/logger' +import { executeRegisteredTool, isServerExecutedTool } from '@/lib/copilot/server-executor/registry' +import { ExecuteResponseSuccessSchema } from '@/lib/copilot/tools/shared/schemas' + +const logger = createLogger('ServerToolRouter') + +// Re-export for backwards compatibility export { ExecuteResponseSuccessSchema } export type ExecuteResponseSuccess = (typeof ExecuteResponseSuccessSchema)['_type'] -// Define server tool registry for the new copilot runtime -const serverToolRegistry: Record> = {} -const logger = createLogger('ServerToolRouter') - -// Register tools -serverToolRegistry[getBlocksAndToolsServerTool.name] = getBlocksAndToolsServerTool -serverToolRegistry[getBlocksMetadataServerTool.name] = getBlocksMetadataServerTool -serverToolRegistry[getBlockOptionsServerTool.name] = getBlockOptionsServerTool -serverToolRegistry[getBlockConfigServerTool.name] = getBlockConfigServerTool -serverToolRegistry[getTriggerBlocksServerTool.name] = getTriggerBlocksServerTool -serverToolRegistry[editWorkflowServerTool.name] = editWorkflowServerTool -serverToolRegistry[getWorkflowConsoleServerTool.name] = getWorkflowConsoleServerTool -serverToolRegistry[searchDocumentationServerTool.name] = searchDocumentationServerTool -serverToolRegistry[searchOnlineServerTool.name] = searchOnlineServerTool -serverToolRegistry[setEnvironmentVariablesServerTool.name] = setEnvironmentVariablesServerTool -serverToolRegistry[getCredentialsServerTool.name] = getCredentialsServerTool -serverToolRegistry[makeApiRequestServerTool.name] = makeApiRequestServerTool -serverToolRegistry[knowledgeBaseServerTool.name] = knowledgeBaseServerTool - +/** + * Route execution to the appropriate server tool. + * + * @deprecated Use executeRegisteredTool from server-executor/registry.ts directly + */ export async function routeExecution( toolName: string, payload: unknown, context?: { userId: string } -): Promise { - const tool = serverToolRegistry[toolName] - if (!tool) { +): Promise { + if (!isServerExecutedTool(toolName)) { throw new Error(`Unknown server tool: ${toolName}`) } - logger.debug('Routing to tool', { + + logger.debug('Routing to tool via unified registry', { toolName, payloadPreview: (() => { try { @@ -71,43 +42,15 @@ export async function routeExecution( })(), }) - let args: any = payload || {} - if (toolName === 'get_blocks_and_tools') { - args = GetBlocksAndToolsInput.parse(args) - } - if (toolName === 'get_blocks_metadata') { - args = GetBlocksMetadataInput.parse(args) - } - if (toolName === 'get_block_options') { - args = GetBlockOptionsInput.parse(args) - } - if (toolName === 'get_block_config') { - args = GetBlockConfigInput.parse(args) - } - if (toolName === 'get_trigger_blocks') { - args = GetTriggerBlocksInput.parse(args) - } - if (toolName === 'knowledge_base') { - args = KnowledgeBaseArgsSchema.parse(args) + const result = await executeRegisteredTool(toolName, payload, { + userId: context?.userId ?? '', + }) + + // The old API expected the raw result, not wrapped in ToolResult + // For backwards compatibility, unwrap and throw on error + if (!result.success) { + throw new Error(result.error?.message ?? 'Tool execution failed') } - const result = await tool.execute(args, context) - - if (toolName === 'get_blocks_and_tools') { - return GetBlocksAndToolsResult.parse(result) - } - if (toolName === 'get_blocks_metadata') { - return GetBlocksMetadataResult.parse(result) - } - if (toolName === 'get_block_options') { - return GetBlockOptionsResult.parse(result) - } - if (toolName === 'get_block_config') { - return GetBlockConfigResult.parse(result) - } - if (toolName === 'get_trigger_blocks') { - return GetTriggerBlocksResult.parse(result) - } - - return result + return result.data } diff --git a/apps/sim/lib/copilot/tools/server/workflow/check-deployment-status.ts b/apps/sim/lib/copilot/tools/server/workflow/check-deployment-status.ts new file mode 100644 index 000000000..18af0d6d2 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/check-deployment-status.ts @@ -0,0 +1,172 @@ +import { db } from '@sim/db' +import { + chat, + workflow, + workflowDeploymentVersion, + workflowMcpServer, + workflowMcpTool, +} from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' +import { env } from '@/lib/core/config/env' + +const logger = createLogger('CheckDeploymentStatusServerTool') + +export const CheckDeploymentStatusInput = z.object({ + workflowId: z.string(), +}) + +export const CheckDeploymentStatusResult = z.object({ + isDeployed: z.boolean(), + deploymentTypes: z.array(z.string()), + api: z.object({ + isDeployed: z.boolean(), + deployedAt: z.string().nullable(), + endpoint: z.string().nullable(), + }), + chat: z.object({ + isDeployed: z.boolean(), + chatId: z.string().nullable(), + identifier: z.string().nullable(), + chatUrl: z.string().nullable(), + title: z.string().nullable(), + }), + mcp: z.object({ + isDeployed: z.boolean(), + servers: z.array( + z.object({ + serverId: z.string(), + serverName: z.string(), + toolName: z.string(), + }) + ), + }), + message: z.string(), +}) + +export type CheckDeploymentStatusInputType = z.infer +export type CheckDeploymentStatusResultType = z.infer + +export const checkDeploymentStatusServerTool: BaseServerTool< + CheckDeploymentStatusInputType, + CheckDeploymentStatusResultType +> = { + name: 'check_deployment_status', + async execute(args: unknown, _context?: { userId: string }) { + const parsed = CheckDeploymentStatusInput.parse(args) + const { workflowId } = parsed + + logger.debug('Checking deployment status', { workflowId }) + + // Get workflow to find workspaceId + const [wf] = await db + .select({ workspaceId: workflow.workspaceId }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + + const workspaceId = wf?.workspaceId + + // Check API deployment (active deployment version) + const [apiDeploy] = await db + .select({ + id: workflowDeploymentVersion.id, + createdAt: workflowDeploymentVersion.createdAt, + }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, workflowId), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .limit(1) + + const isApiDeployed = !!apiDeploy + const appUrl = env.NEXT_PUBLIC_APP_URL || 'https://simstudio.ai' + + // Check chat deployment + const [chatDeploy] = await db + .select({ + id: chat.id, + identifier: chat.identifier, + title: chat.title, + }) + .from(chat) + .where(eq(chat.workflowId, workflowId)) + .limit(1) + + const isChatDeployed = !!chatDeploy + + // Check MCP deployment + let mcpToolDeployments: { serverId: string; serverName: string; toolName: string }[] = [] + if (workspaceId) { + const mcpTools = await db + .select({ + toolName: workflowMcpTool.toolName, + serverId: workflowMcpTool.serverId, + serverName: workflowMcpServer.name, + }) + .from(workflowMcpTool) + .innerJoin(workflowMcpServer, eq(workflowMcpTool.serverId, workflowMcpServer.id)) + .where(eq(workflowMcpTool.workflowId, workflowId)) + + mcpToolDeployments = mcpTools.map((t) => ({ + serverId: t.serverId, + serverName: t.serverName, + toolName: t.toolName, + })) + } + + const isMcpDeployed = mcpToolDeployments.length > 0 + + // Build result + const deploymentTypes: string[] = [] + if (isApiDeployed) deploymentTypes.push('api') + if (isChatDeployed) deploymentTypes.push('chat') + if (isMcpDeployed) deploymentTypes.push('mcp') + + const isDeployed = isApiDeployed || isChatDeployed || isMcpDeployed + + // Build summary message + let message = '' + if (!isDeployed) { + message = 'Workflow is not deployed' + } else { + const parts: string[] = [] + if (isApiDeployed) parts.push('API') + if (isChatDeployed) parts.push(`Chat (${chatDeploy?.identifier})`) + if (isMcpDeployed) { + const serverNames = [...new Set(mcpToolDeployments.map((d) => d.serverName))].join(', ') + parts.push(`MCP (${serverNames})`) + } + message = `Workflow is deployed as: ${parts.join(', ')}` + } + + logger.info('Checked deployment status', { workflowId, isDeployed, deploymentTypes }) + + return CheckDeploymentStatusResult.parse({ + isDeployed, + deploymentTypes, + api: { + isDeployed: isApiDeployed, + deployedAt: apiDeploy?.createdAt?.toISOString() || null, + endpoint: isApiDeployed ? `${appUrl}/api/workflows/${workflowId}/execute` : null, + }, + chat: { + isDeployed: isChatDeployed, + chatId: chatDeploy?.id || null, + identifier: chatDeploy?.identifier || null, + chatUrl: isChatDeployed ? `${appUrl}/chat/${chatDeploy?.identifier}` : null, + title: chatDeploy?.title || null, + }, + mcp: { + isDeployed: isMcpDeployed, + servers: mcpToolDeployments, + }, + message, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/create-workspace-mcp-server.ts b/apps/sim/lib/copilot/tools/server/workflow/create-workspace-mcp-server.ts new file mode 100644 index 000000000..505f041a4 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/create-workspace-mcp-server.ts @@ -0,0 +1,73 @@ +import { db } from '@sim/db' +import { workflowMcpServer } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('CreateWorkspaceMcpServerServerTool') + +export const CreateWorkspaceMcpServerInput = z.object({ + name: z.string().min(1), + description: z.string().optional(), + workspaceId: z.string().min(1), +}) + +export const CreateWorkspaceMcpServerResult = z.object({ + success: z.boolean(), + serverId: z.string().nullable(), + serverName: z.string().nullable(), + description: z.string().nullable(), + message: z.string(), +}) + +export type CreateWorkspaceMcpServerInputType = z.infer +export type CreateWorkspaceMcpServerResultType = z.infer + +export const createWorkspaceMcpServerServerTool: BaseServerTool< + CreateWorkspaceMcpServerInputType, + CreateWorkspaceMcpServerResultType +> = { + name: 'create_workspace_mcp_server', + async execute(args: unknown, context?: { userId: string }) { + const parsed = CreateWorkspaceMcpServerInput.parse(args) + const { name, description, workspaceId } = parsed + + if (!context?.userId) { + throw new Error('User authentication required') + } + + logger.debug('Creating workspace MCP server', { name, workspaceId }) + + // Check if server with same name already exists + const existing = await db + .select({ id: workflowMcpServer.id }) + .from(workflowMcpServer) + .where(eq(workflowMcpServer.workspaceId, workspaceId)) + .limit(100) + + // Generate unique ID + const serverId = crypto.randomUUID() + const now = new Date() + + await db.insert(workflowMcpServer).values({ + id: serverId, + workspaceId, + createdBy: context.userId, + name: name.trim(), + description: description?.trim() || null, + createdAt: now, + updatedAt: now, + }) + + logger.info('Created MCP server', { serverId, name }) + + return CreateWorkspaceMcpServerResult.parse({ + success: true, + serverId, + serverName: name.trim(), + description: description?.trim() || null, + message: `MCP server "${name}" created successfully. You can now deploy workflows to it using deploy_mcp.`, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/deploy-api.ts b/apps/sim/lib/copilot/tools/server/workflow/deploy-api.ts new file mode 100644 index 000000000..5c1dc2003 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/deploy-api.ts @@ -0,0 +1,152 @@ +import { db } from '@sim/db' +import { apiKey, workflow, workflowDeploymentVersion } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { desc, eq, or } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' + +const logger = createLogger('DeployApiServerTool') + +export const DeployApiInput = z.object({ + action: z.enum(['deploy', 'undeploy']).default('deploy'), + workflowId: z.string().min(1), +}) + +export const DeployApiResult = z.object({ + success: z.boolean(), + action: z.string(), + isDeployed: z.boolean(), + deployedAt: z.string().nullable(), + endpoint: z.string().nullable(), + curlCommand: z.string().nullable(), + message: z.string(), + needsApiKey: z.boolean().optional(), +}) + +export type DeployApiInputType = z.infer +export type DeployApiResultType = z.infer + +export const deployApiServerTool: BaseServerTool = { + name: 'deploy_api', + async execute(args: unknown, context?: { userId: string }) { + const parsed = DeployApiInput.parse(args) + const { action, workflowId } = parsed + + if (!context?.userId) { + throw new Error('User authentication required') + } + + logger.debug('Deploy API', { action, workflowId }) + + // Get workflow info + const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1) + + if (!wf) { + throw new Error(`Workflow not found: ${workflowId}`) + } + + const workspaceId = wf.workspaceId + + if (action === 'undeploy') { + // Deactivate all deployment versions + await db + .update(workflowDeploymentVersion) + .set({ isActive: false }) + .where(eq(workflowDeploymentVersion.workflowId, workflowId)) + + logger.info('Workflow undeployed', { workflowId }) + + return DeployApiResult.parse({ + success: true, + action: 'undeploy', + isDeployed: false, + deployedAt: null, + endpoint: null, + curlCommand: null, + message: 'Workflow undeployed successfully.', + }) + } + + // Deploy action - check if user has API keys + const keys = await db + .select({ id: apiKey.id }) + .from(apiKey) + .where( + or( + eq(apiKey.userId, context.userId), + workspaceId ? eq(apiKey.workspaceId, workspaceId) : undefined + ) + ) + .limit(1) + + if (keys.length === 0) { + return DeployApiResult.parse({ + success: false, + action: 'deploy', + isDeployed: false, + deployedAt: null, + endpoint: null, + curlCommand: null, + message: + 'Cannot deploy without an API key. Please create an API key in settings first, then try deploying again.', + needsApiKey: true, + }) + } + + // Get current max version + const [maxVersion] = await db + .select({ version: workflowDeploymentVersion.version }) + .from(workflowDeploymentVersion) + .where(eq(workflowDeploymentVersion.workflowId, workflowId)) + .orderBy(desc(workflowDeploymentVersion.version)) + .limit(1) + + const newVersion = (maxVersion?.version || 0) + 1 + + // Deactivate all existing versions + await db + .update(workflowDeploymentVersion) + .set({ isActive: false }) + .where(eq(workflowDeploymentVersion.workflowId, workflowId)) + + // Create new deployment version + const deploymentId = crypto.randomUUID() + const now = new Date() + + // Load workflow state from normalized tables + const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) + const workflowState = { + blocks: normalizedData?.blocks || {}, + edges: normalizedData?.edges || [], + loops: normalizedData?.loops || {}, + parallels: normalizedData?.parallels || {}, + } + + await db.insert(workflowDeploymentVersion).values({ + id: deploymentId, + workflowId, + version: newVersion, + state: workflowState, + isActive: true, + createdAt: now, + }) + + // Build API info + const appUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000' + const apiEndpoint = `${appUrl}/api/workflows/${workflowId}/execute` + const curlCommand = `curl -X POST -H "X-API-Key: $SIM_API_KEY" -H "Content-Type: application/json" ${apiEndpoint}` + + logger.info('Workflow deployed as API', { workflowId, version: newVersion }) + + return DeployApiResult.parse({ + success: true, + action: 'deploy', + isDeployed: true, + deployedAt: now.toISOString(), + endpoint: apiEndpoint, + curlCommand, + message: 'Workflow deployed successfully as API. You can now call it via REST.', + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/deploy-chat.ts b/apps/sim/lib/copilot/tools/server/workflow/deploy-chat.ts new file mode 100644 index 000000000..9e8fc7dc9 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/deploy-chat.ts @@ -0,0 +1,274 @@ +import { db } from '@sim/db' +import { chat, workflow, workflowDeploymentVersion } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, desc, eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' + +const logger = createLogger('DeployChatServerTool') + +const OutputConfigSchema = z.object({ + blockId: z.string(), + path: z.string(), +}) + +export const DeployChatInput = z.object({ + action: z.enum(['deploy', 'undeploy']).default('deploy'), + workflowId: z.string().min(1), + identifier: z.string().optional(), + title: z.string().optional(), + description: z.string().optional(), + authType: z.enum(['public', 'password', 'email', 'sso']).optional().default('public'), + password: z.string().optional(), + allowedEmails: z.array(z.string()).optional(), + welcomeMessage: z.string().optional(), + outputConfigs: z.array(OutputConfigSchema).optional(), +}) + +export const DeployChatResult = z.object({ + success: z.boolean(), + action: z.string(), + isDeployed: z.boolean(), + chatId: z.string().nullable(), + chatUrl: z.string().nullable(), + identifier: z.string().nullable(), + title: z.string().nullable(), + authType: z.string().nullable(), + message: z.string(), + error: z.string().optional(), + errorCode: z.string().optional(), +}) + +export type DeployChatInputType = z.infer +export type DeployChatResultType = z.infer + +function generateIdentifier(workflowName: string): string { + return workflowName + .toLowerCase() + .replace(/[^a-z0-9]+/g, '-') + .replace(/^-|-$/g, '') + .substring(0, 50) +} + +export const deployChatServerTool: BaseServerTool = { + name: 'deploy_chat', + async execute(args: unknown, context?: { userId: string }) { + const parsed = DeployChatInput.parse(args) + const { action, workflowId, authType = 'public' } = parsed + + if (!context?.userId) { + throw new Error('User authentication required') + } + + logger.debug('Deploy Chat', { action, workflowId }) + + // Get workflow info + const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1) + + if (!wf) { + throw new Error(`Workflow not found: ${workflowId}`) + } + + const appUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000' + + // Check for existing deployment + const [existingChat] = await db + .select() + .from(chat) + .where(and(eq(chat.workflowId, workflowId), eq(chat.isActive, true))) + .limit(1) + + if (action === 'undeploy') { + if (!existingChat) { + return DeployChatResult.parse({ + success: false, + action: 'undeploy', + isDeployed: false, + chatId: null, + chatUrl: null, + identifier: null, + title: null, + authType: null, + message: 'No active chat deployment found for this workflow', + error: 'No active chat deployment found', + errorCode: 'VALIDATION_ERROR', + }) + } + + // Deactivate the chat deployment + await db.update(chat).set({ isActive: false }).where(eq(chat.id, existingChat.id)) + + logger.info('Chat undeployed', { workflowId, chatId: existingChat.id }) + + return DeployChatResult.parse({ + success: true, + action: 'undeploy', + isDeployed: false, + chatId: null, + chatUrl: null, + identifier: null, + title: null, + authType: null, + message: 'Chat deployment removed successfully.', + }) + } + + // Deploy action + const identifier = + parsed.identifier || existingChat?.identifier || generateIdentifier(wf.name || 'chat') + const title = parsed.title || existingChat?.title || wf.name || 'Chat' + const description = parsed.description ?? existingChat?.description ?? '' + const welcomeMessage = + parsed.welcomeMessage || + (existingChat?.customizations as any)?.welcomeMessage || + 'Hi there! How can I help you today?' + const primaryColor = + (existingChat?.customizations as any)?.primaryColor || 'var(--brand-primary-hover-hex)' + const existingAllowedEmails = Array.isArray(existingChat?.allowedEmails) + ? existingChat.allowedEmails + : [] + const allowedEmails = parsed.allowedEmails || existingAllowedEmails + const outputConfigs = parsed.outputConfigs || existingChat?.outputConfigs || [] + + // Validate requirements + if (authType === 'password' && !parsed.password && !existingChat?.password) { + throw new Error('Password is required when using password protection') + } + + if ((authType === 'email' || authType === 'sso') && allowedEmails.length === 0) { + throw new Error(`At least one email or domain is required when using ${authType} access`) + } + + // Check if identifier is already in use by another workflow + if (!existingChat) { + const [existingIdentifier] = await db + .select({ id: chat.id }) + .from(chat) + .where(and(eq(chat.identifier, identifier), eq(chat.isActive, true))) + .limit(1) + + if (existingIdentifier) { + return DeployChatResult.parse({ + success: false, + action: 'deploy', + isDeployed: false, + chatId: null, + chatUrl: null, + identifier, + title: null, + authType: null, + message: `The identifier "${identifier}" is already in use. Please choose a different one.`, + error: `Identifier "${identifier}" is already taken`, + errorCode: 'IDENTIFIER_TAKEN', + }) + } + } + + // Ensure workflow is deployed as API first + const [deployment] = await db + .select({ id: workflowDeploymentVersion.id }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, workflowId), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .limit(1) + + if (!deployment) { + // Auto-deploy the API + const [maxVersion] = await db + .select({ version: workflowDeploymentVersion.version }) + .from(workflowDeploymentVersion) + .where(eq(workflowDeploymentVersion.workflowId, workflowId)) + .orderBy(desc(workflowDeploymentVersion.version)) + .limit(1) + + const newVersion = (maxVersion?.version || 0) + 1 + const deploymentId = crypto.randomUUID() + const now = new Date() + + // Load workflow state from normalized tables + const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) + const workflowState = { + blocks: normalizedData?.blocks || {}, + edges: normalizedData?.edges || [], + loops: normalizedData?.loops || {}, + parallels: normalizedData?.parallels || {}, + } + + await db.insert(workflowDeploymentVersion).values({ + id: deploymentId, + workflowId, + version: newVersion, + state: workflowState, + isActive: true, + createdAt: now, + }) + + logger.info('Auto-deployed API for chat', { workflowId, version: newVersion }) + } + + const now = new Date() + let chatId: string + + if (existingChat) { + // Update existing deployment + await db + .update(chat) + .set({ + identifier: identifier.trim(), + title: title.trim(), + description: description.trim(), + authType, + password: authType === 'password' ? parsed.password || existingChat.password : null, + allowedEmails: authType === 'email' || authType === 'sso' ? allowedEmails : [], + customizations: { primaryColor, welcomeMessage: welcomeMessage.trim() }, + outputConfigs, + updatedAt: now, + }) + .where(eq(chat.id, existingChat.id)) + + chatId = existingChat.id + logger.info('Updated chat deployment', { chatId }) + } else { + // Create new deployment + chatId = crypto.randomUUID() + + await db.insert(chat).values({ + id: chatId, + workflowId, + userId: context.userId, + identifier: identifier.trim(), + title: title.trim(), + description: description.trim(), + authType, + password: authType === 'password' ? parsed.password : null, + allowedEmails: authType === 'email' || authType === 'sso' ? allowedEmails : [], + customizations: { primaryColor, welcomeMessage: welcomeMessage.trim() }, + outputConfigs, + isActive: true, + createdAt: now, + updatedAt: now, + }) + + logger.info('Created chat deployment', { chatId }) + } + + const chatUrl = `${appUrl}/chat/${identifier}` + + return DeployChatResult.parse({ + success: true, + action: 'deploy', + isDeployed: true, + chatId, + chatUrl, + identifier, + title, + authType, + message: `Chat deployed successfully! Available at: ${chatUrl}`, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/deploy-mcp.ts b/apps/sim/lib/copilot/tools/server/workflow/deploy-mcp.ts new file mode 100644 index 000000000..e5757bfb6 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/deploy-mcp.ts @@ -0,0 +1,166 @@ +import { db } from '@sim/db' +import { + workflow, + workflowDeploymentVersion, + workflowMcpServer, + workflowMcpTool, +} from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('DeployMcpServerTool') + +const ParameterDescriptionSchema = z.object({ + name: z.string(), + description: z.string(), +}) + +export const DeployMcpInput = z.object({ + serverId: z.string().min(1), + workflowId: z.string().min(1), + toolName: z.string().optional(), + toolDescription: z.string().optional(), + parameterDescriptions: z.array(ParameterDescriptionSchema).optional(), +}) + +export const DeployMcpResult = z.object({ + success: z.boolean(), + toolId: z.string().nullable(), + toolName: z.string().nullable(), + toolDescription: z.string().nullable(), + serverId: z.string().nullable(), + updated: z.boolean().optional(), + message: z.string(), + error: z.string().optional(), +}) + +export type DeployMcpInputType = z.infer +export type DeployMcpResultType = z.infer + +export const deployMcpServerTool: BaseServerTool = { + name: 'deploy_mcp', + async execute(args: unknown, context?: { userId: string }) { + const parsed = DeployMcpInput.parse(args) + const { serverId, workflowId, toolName, toolDescription, parameterDescriptions } = parsed + + if (!context?.userId) { + throw new Error('User authentication required') + } + + logger.debug('Deploy MCP', { serverId, workflowId }) + + // Get workflow info + const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1) + + if (!wf) { + throw new Error(`Workflow not found: ${workflowId}`) + } + + // Check if server exists + const [server] = await db + .select() + .from(workflowMcpServer) + .where(eq(workflowMcpServer.id, serverId)) + .limit(1) + + if (!server) { + throw new Error( + 'MCP server not found. Use list_workspace_mcp_servers to see available servers.' + ) + } + + // Check if workflow is deployed as API + const [deployment] = await db + .select({ id: workflowDeploymentVersion.id }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, workflowId), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .limit(1) + + if (!deployment) { + throw new Error( + 'Workflow must be deployed before adding as an MCP tool. Use deploy_api first.' + ) + } + + // Build parameter schema if provided + let parameterSchema: Record | null = null + if (parameterDescriptions && parameterDescriptions.length > 0) { + const properties: Record = {} + for (const param of parameterDescriptions) { + properties[param.name] = { description: param.description } + } + parameterSchema = { properties } + } + + const finalToolName = toolName?.trim() || wf.name || 'workflow' + const finalToolDescription = toolDescription?.trim() || null + + // Check if tool already exists for this workflow on this server + const [existingTool] = await db + .select() + .from(workflowMcpTool) + .where( + and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId)) + ) + .limit(1) + + const now = new Date() + + if (existingTool) { + // Update existing tool + await db + .update(workflowMcpTool) + .set({ + toolName: finalToolName, + toolDescription: finalToolDescription, + parameterSchema, + updatedAt: now, + }) + .where(eq(workflowMcpTool.id, existingTool.id)) + + logger.info('Updated MCP tool', { toolId: existingTool.id, toolName: finalToolName }) + + return DeployMcpResult.parse({ + success: true, + toolId: existingTool.id, + toolName: finalToolName, + toolDescription: finalToolDescription, + serverId, + updated: true, + message: `Workflow MCP tool updated to "${finalToolName}".`, + }) + } + + // Create new tool + const toolId = crypto.randomUUID() + + await db.insert(workflowMcpTool).values({ + id: toolId, + serverId, + workflowId, + toolName: finalToolName, + toolDescription: finalToolDescription, + parameterSchema, + createdAt: now, + updatedAt: now, + }) + + logger.info('Created MCP tool', { toolId, toolName: finalToolName }) + + return DeployMcpResult.parse({ + success: true, + toolId, + toolName: finalToolName, + toolDescription: finalToolDescription, + serverId, + message: `Workflow deployed as MCP tool "${finalToolName}" to server.`, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/get-block-outputs.ts b/apps/sim/lib/copilot/tools/server/workflow/get-block-outputs.ts new file mode 100644 index 000000000..619a1e5a1 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/get-block-outputs.ts @@ -0,0 +1,181 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { z } from 'zod' +import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { normalizeName } from '@/executor/constants' +import type { Loop, Parallel } from '@/stores/workflows/workflow/types' +import type { BaseServerTool } from '../base-tool' + +const logger = createLogger('GetBlockOutputsServerTool') + +export const GetBlockOutputsInput = z.object({ + workflowId: z.string().min(1), + blockIds: z.array(z.string()).optional(), +}) + +const BlockOutputSchema = z.object({ + blockId: z.string(), + blockName: z.string(), + blockType: z.string(), + triggerMode: z.boolean().optional(), + outputs: z.array(z.string()), + insideSubflowOutputs: z.array(z.string()).optional(), + outsideSubflowOutputs: z.array(z.string()).optional(), +}) + +const VariableOutputSchema = z.object({ + id: z.string(), + name: z.string(), + type: z.string(), + tag: z.string(), +}) + +export const GetBlockOutputsResult = z.object({ + blocks: z.array(BlockOutputSchema), + variables: z.array(VariableOutputSchema).optional(), +}) + +export type GetBlockOutputsInputType = z.infer +export type GetBlockOutputsResultType = z.infer + +interface Variable { + id: string + name: string + type: string +} + +function formatOutputsWithPrefix(paths: string[], blockName: string): string[] { + const normalizedName = normalizeName(blockName) + return paths.map((path) => `${normalizedName}.${path}`) +} + +function getSubflowInsidePaths( + blockType: 'loop' | 'parallel', + blockId: string, + loops: Record, + parallels: Record +): string[] { + const paths = ['index'] + if (blockType === 'loop') { + const loopType = loops[blockId]?.loopType || 'for' + if (loopType === 'forEach') { + paths.push('currentItem', 'items') + } + } else { + const parallelType = parallels[blockId]?.parallelType || 'count' + if (parallelType === 'collection') { + paths.push('currentItem', 'items') + } + } + return paths +} + +export const getBlockOutputsServerTool: BaseServerTool< + GetBlockOutputsInputType, + GetBlockOutputsResultType +> = { + name: 'get_block_outputs', + async execute(args: unknown, context?: { userId: string }) { + const parsed = GetBlockOutputsInput.parse(args) + const { workflowId, blockIds } = parsed + + if (!context?.userId) { + throw new Error('User authentication required') + } + + logger.debug('Getting block outputs', { workflowId, blockIds }) + + // Load workflow from normalized tables + const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) + + if (!normalizedData?.blocks) { + throw new Error('Workflow state is empty or invalid') + } + + const blocks = normalizedData.blocks + const loops = normalizedData.loops || {} + const parallels = normalizedData.parallels || {} + + const targetBlockIds = blockIds && blockIds.length > 0 ? blockIds : Object.keys(blocks) + + const blockOutputs: GetBlockOutputsResultType['blocks'] = [] + + for (const blockId of targetBlockIds) { + const block = blocks[blockId] + if (!block?.type) continue + + const blockName = block.name || block.type + + const blockOutput: GetBlockOutputsResultType['blocks'][0] = { + blockId, + blockName, + blockType: block.type, + outputs: [], + } + + // Include triggerMode if the block is in trigger mode + if (block.triggerMode) { + blockOutput.triggerMode = true + } + + if (block.type === 'loop' || block.type === 'parallel') { + const insidePaths = getSubflowInsidePaths(block.type, blockId, loops, parallels) + blockOutput.insideSubflowOutputs = formatOutputsWithPrefix(insidePaths, blockName) + blockOutput.outsideSubflowOutputs = formatOutputsWithPrefix(['results'], blockName) + } else { + // Compute output paths using the block's subBlocks + const outputPaths = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode) + blockOutput.outputs = formatOutputsWithPrefix(outputPaths, blockName) + } + + blockOutputs.push(blockOutput) + } + + // Get workflow variables if no specific blockIds requested + let variables: GetBlockOutputsResultType['variables'] | undefined + const includeVariables = !blockIds || blockIds.length === 0 + + if (includeVariables) { + // Get variables from workflow record + const [wf] = await db + .select({ variables: workflow.variables }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + + const workflowVariables = wf?.variables as Record | null + + if (workflowVariables && typeof workflowVariables === 'object') { + variables = Object.values(workflowVariables) + .filter( + (v): v is Variable => + typeof v === 'object' && + v !== null && + 'name' in v && + typeof v.name === 'string' && + v.name.trim() !== '' + ) + .map((variable) => ({ + id: variable.id, + name: variable.name, + type: variable.type || 'string', + tag: `variable.${normalizeName(variable.name)}`, + })) + } + } + + logger.info('Retrieved block outputs', { + workflowId, + blockCount: blockOutputs.length, + variableCount: variables?.length ?? 0, + }) + + return GetBlockOutputsResult.parse({ + blocks: blockOutputs, + ...(variables && { variables }), + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/get-user-workflow.ts b/apps/sim/lib/copilot/tools/server/workflow/get-user-workflow.ts new file mode 100644 index 000000000..6c488d258 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/get-user-workflow.ts @@ -0,0 +1,82 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer' + +const logger = createLogger('GetUserWorkflowServerTool') + +export const GetUserWorkflowInput = z.object({ + workflowId: z.string().min(1), +}) + +export const GetUserWorkflowResult = z.object({ + userWorkflow: z.string(), + workflowId: z.string(), + workflowName: z.string(), +}) + +export type GetUserWorkflowInputType = z.infer +export type GetUserWorkflowResultType = z.infer + +export const getUserWorkflowServerTool: BaseServerTool< + GetUserWorkflowInputType, + GetUserWorkflowResultType +> = { + name: 'get_user_workflow', + async execute(args: unknown, context?: { userId: string }) { + const parsed = GetUserWorkflowInput.parse(args) + const { workflowId } = parsed + + if (!context?.userId) { + throw new Error('User authentication required') + } + + logger.debug('Getting user workflow', { workflowId }) + + // Get workflow metadata + const [wf] = await db + .select({ id: workflow.id, name: workflow.name, userId: workflow.userId }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + + if (!wf) { + throw new Error(`Workflow not found: ${workflowId}`) + } + + // Load workflow from normalized tables + const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) + + if (!normalizedData?.blocks || Object.keys(normalizedData.blocks).length === 0) { + throw new Error('Workflow state is empty or invalid') + } + + // Build workflow state + const workflowState = { + blocks: normalizedData.blocks, + edges: normalizedData.edges || [], + loops: normalizedData.loops || {}, + parallels: normalizedData.parallels || {}, + } + + // Sanitize for copilot (remove UI-specific data) + const sanitizedState = sanitizeForCopilot(workflowState as any) + const userWorkflow = JSON.stringify(sanitizedState, null, 2) + + logger.info('Retrieved user workflow', { + workflowId, + workflowName: wf.name, + blockCount: Object.keys(normalizedData.blocks).length, + }) + + return GetUserWorkflowResult.parse({ + userWorkflow, + workflowId, + workflowName: wf.name || 'Untitled', + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/get-workflow-from-name.ts b/apps/sim/lib/copilot/tools/server/workflow/get-workflow-from-name.ts new file mode 100644 index 000000000..b78c1b008 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/get-workflow-from-name.ts @@ -0,0 +1,86 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, ilike } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer' + +const logger = createLogger('GetWorkflowFromNameServerTool') + +export const GetWorkflowFromNameInput = z.object({ + workflow_name: z.string().min(1), +}) + +export const GetWorkflowFromNameResult = z.object({ + userWorkflow: z.string(), + workflowId: z.string(), + workflowName: z.string(), +}) + +export type GetWorkflowFromNameInputType = z.infer +export type GetWorkflowFromNameResultType = z.infer + +export const getWorkflowFromNameServerTool: BaseServerTool< + GetWorkflowFromNameInputType, + GetWorkflowFromNameResultType +> = { + name: 'get_workflow_from_name', + async execute(args: unknown, context?: { userId: string }) { + const parsed = GetWorkflowFromNameInput.parse(args) + const workflowName = parsed.workflow_name.trim() + + logger.debug('Executing get_workflow_from_name', { + workflowName, + userId: context?.userId, + }) + + if (!context?.userId) { + throw new Error('User ID is required') + } + + // Find workflow by name (case-insensitive) + const workflows = await db + .select({ id: workflow.id, name: workflow.name }) + .from(workflow) + .where(and(eq(workflow.userId, context.userId), ilike(workflow.name, workflowName))) + .limit(1) + + if (workflows.length === 0) { + throw new Error(`Workflow not found: ${workflowName}`) + } + + const wf = workflows[0] + + // Load workflow from normalized tables + const normalizedData = await loadWorkflowFromNormalizedTables(wf.id) + + if (!normalizedData?.blocks || Object.keys(normalizedData.blocks).length === 0) { + throw new Error('Workflow state is empty or invalid') + } + + // Build workflow state from normalized data + const workflowState = { + blocks: normalizedData.blocks, + edges: normalizedData.edges || [], + loops: normalizedData.loops || {}, + parallels: normalizedData.parallels || {}, + } + + // Sanitize for copilot + const sanitizedState = sanitizeForCopilot(workflowState as any) + const userWorkflow = JSON.stringify(sanitizedState, null, 2) + + logger.info('Retrieved workflow by name', { + workflowId: wf.id, + workflowName: wf.name, + }) + + return GetWorkflowFromNameResult.parse({ + userWorkflow, + workflowId: wf.id, + workflowName: wf.name || workflowName, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/list-user-workflows.ts b/apps/sim/lib/copilot/tools/server/workflow/list-user-workflows.ts new file mode 100644 index 000000000..fe4a24b12 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/list-user-workflows.ts @@ -0,0 +1,42 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' +import { + type ListUserWorkflowsInputType, + ListUserWorkflowsResult, + type ListUserWorkflowsResultType, +} from '@/lib/copilot/tools/shared/schemas' + +const logger = createLogger('ListUserWorkflowsServerTool') + +export const listUserWorkflowsServerTool: BaseServerTool< + ListUserWorkflowsInputType, + ListUserWorkflowsResultType +> = { + name: 'list_user_workflows', + async execute(_args: unknown, context?: { userId: string }) { + logger.debug('Executing list_user_workflows', { userId: context?.userId }) + + if (!context?.userId) { + throw new Error('User ID is required to list workflows') + } + + const workflows = await db + .select({ id: workflow.id, name: workflow.name }) + .from(workflow) + .where(eq(workflow.userId, context.userId)) + + const names = workflows + .map((w) => w.name) + .filter((n): n is string => typeof n === 'string' && n.length > 0) + + logger.info('Found workflows', { count: names.length, userId: context.userId }) + + return ListUserWorkflowsResult.parse({ + workflow_names: names, + count: names.length, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/list-workspace-mcp-servers.ts b/apps/sim/lib/copilot/tools/server/workflow/list-workspace-mcp-servers.ts new file mode 100644 index 000000000..e8b9ec77f --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/list-workspace-mcp-servers.ts @@ -0,0 +1,83 @@ +import { db } from '@sim/db' +import { workflowMcpServer, workflowMcpTool } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('ListWorkspaceMcpServersServerTool') + +export const ListWorkspaceMcpServersInput = z.object({ + workspaceId: z.string(), +}) + +export const ListWorkspaceMcpServersResult = z.object({ + servers: z.array( + z.object({ + id: z.string(), + name: z.string(), + description: z.string().nullable(), + toolCount: z.number(), + toolNames: z.array(z.string()), + }) + ), + count: z.number(), + message: z.string(), +}) + +export type ListWorkspaceMcpServersInputType = z.infer +export type ListWorkspaceMcpServersResultType = z.infer + +export const listWorkspaceMcpServersServerTool: BaseServerTool< + ListWorkspaceMcpServersInputType, + ListWorkspaceMcpServersResultType +> = { + name: 'list_workspace_mcp_servers', + async execute(args: unknown, _context?: { userId: string }) { + const parsed = ListWorkspaceMcpServersInput.parse(args) + const { workspaceId } = parsed + + logger.debug('Listing workspace MCP servers', { workspaceId }) + + // Get all MCP servers in the workspace with their tool counts + const servers = await db + .select({ + id: workflowMcpServer.id, + name: workflowMcpServer.name, + description: workflowMcpServer.description, + }) + .from(workflowMcpServer) + .where(eq(workflowMcpServer.workspaceId, workspaceId)) + + // Get tool names for each server + const serversWithTools = await Promise.all( + servers.map(async (server) => { + const tools = await db + .select({ toolName: workflowMcpTool.toolName }) + .from(workflowMcpTool) + .where(eq(workflowMcpTool.serverId, server.id)) + + return { + id: server.id, + name: server.name, + description: server.description, + toolCount: tools.length, + toolNames: tools.map((t) => t.toolName), + } + }) + ) + + const message = + serversWithTools.length === 0 + ? 'No MCP servers found in this workspace. Use create_workspace_mcp_server to create one.' + : `Found ${serversWithTools.length} MCP server(s) in the workspace.` + + logger.info('Listed MCP servers', { workspaceId, count: serversWithTools.length }) + + return ListWorkspaceMcpServersResult.parse({ + servers: serversWithTools, + count: serversWithTools.length, + message, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/redeploy.ts b/apps/sim/lib/copilot/tools/server/workflow/redeploy.ts new file mode 100644 index 000000000..8fe1dea57 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/redeploy.ts @@ -0,0 +1,87 @@ +import { db } from '@sim/db' +import { workflow, workflowDeploymentVersion } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { desc, eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' + +const logger = createLogger('RedeployServerTool') + +export const RedeployInput = z.object({ + workflowId: z.string(), +}) + +export const RedeployResult = z.object({ + success: z.boolean(), + workflowId: z.string(), + deployedAt: z.string().nullable(), + message: z.string(), +}) + +export type RedeployInputType = z.infer +export type RedeployResultType = z.infer + +export const redeployServerTool: BaseServerTool = { + name: 'redeploy', + async execute(args: unknown, _context?: { userId: string }) { + const parsed = RedeployInput.parse(args) + const { workflowId } = parsed + + logger.debug('Redeploying workflow', { workflowId }) + + // Get workflow state + const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1) + + if (!wf) { + throw new Error(`Workflow not found: ${workflowId}`) + } + + // Get current max version + const [maxVersion] = await db + .select({ version: workflowDeploymentVersion.version }) + .from(workflowDeploymentVersion) + .where(eq(workflowDeploymentVersion.workflowId, workflowId)) + .orderBy(desc(workflowDeploymentVersion.version)) + .limit(1) + + const newVersion = (maxVersion?.version || 0) + 1 + + // Deactivate all existing versions + await db + .update(workflowDeploymentVersion) + .set({ isActive: false }) + .where(eq(workflowDeploymentVersion.workflowId, workflowId)) + + // Create new deployment version + const deploymentId = crypto.randomUUID() + const now = new Date() + + // Load workflow state from normalized tables + const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) + const workflowState = { + blocks: normalizedData?.blocks || {}, + edges: normalizedData?.edges || [], + loops: normalizedData?.loops || {}, + parallels: normalizedData?.parallels || {}, + } + + await db.insert(workflowDeploymentVersion).values({ + id: deploymentId, + workflowId, + version: newVersion, + state: workflowState, + isActive: true, + createdAt: now, + }) + + logger.info('Workflow redeployed', { workflowId, version: newVersion }) + + return RedeployResult.parse({ + success: true, + workflowId, + deployedAt: now.toISOString(), + message: `Workflow redeployed (version ${newVersion})`, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/run-workflow.ts b/apps/sim/lib/copilot/tools/server/workflow/run-workflow.ts new file mode 100644 index 000000000..e1f56c3f8 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/run-workflow.ts @@ -0,0 +1,130 @@ +import { db } from '@sim/db' +import { workflow, workflowDeploymentVersion } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('RunWorkflowServerTool') + +export const RunWorkflowInput = z.object({ + workflowId: z.string().min(1), + workflow_input: z.record(z.any()).optional(), +}) + +export const RunWorkflowResult = z.object({ + success: z.boolean(), + executionId: z.string().nullable(), + executionStartTime: z.string().nullable(), + output: z.any().nullable(), + message: z.string(), + error: z.string().optional(), +}) + +export type RunWorkflowInputType = z.infer +export type RunWorkflowResultType = z.infer + +export const runWorkflowServerTool: BaseServerTool = { + name: 'run_workflow', + async execute(args: unknown, context?: { userId: string }) { + const parsed = RunWorkflowInput.parse(args) + const { workflowId, workflow_input } = parsed + + if (!context?.userId) { + throw new Error('User authentication required') + } + + logger.debug('Running workflow', { workflowId, hasInput: !!workflow_input }) + + // Get workflow info + const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1) + + if (!wf) { + throw new Error(`Workflow not found: ${workflowId}`) + } + + // Check if workflow is deployed + const [deployment] = await db + .select({ id: workflowDeploymentVersion.id }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, workflowId), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .limit(1) + + const executionId = crypto.randomUUID() + const executionStartTime = new Date().toISOString() + + // If workflow is deployed, we can use the execute API + // Otherwise we need to execute directly + const appUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000' + + try { + // Call the internal execution endpoint + // Note: For server-side execution without a browser, we call the API directly + const executeUrl = `${appUrl}/api/workflows/${workflowId}/execute` + + const response = await fetch(executeUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + // Use internal auth header + 'X-Internal-Auth': context.userId, + }, + body: JSON.stringify({ + input: workflow_input || {}, + executionId, + }), + }) + + if (!response.ok) { + const errorText = await response.text().catch(() => '') + throw new Error(errorText || `Execution failed with status ${response.status}`) + } + + const result = await response.json() + + // Determine success from result + const succeeded = result.success !== false + const output = result.output || result.result || null + + if (succeeded) { + logger.info('Workflow execution completed', { workflowId, executionId }) + + return RunWorkflowResult.parse({ + success: true, + executionId, + executionStartTime, + output, + message: `Workflow execution completed. Started at: ${executionStartTime}`, + }) + } + const errorMessage = result.error || 'Workflow execution failed' + logger.error('Workflow execution failed', { workflowId, error: errorMessage }) + + return RunWorkflowResult.parse({ + success: false, + executionId, + executionStartTime, + output: null, + message: errorMessage, + error: errorMessage, + }) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + logger.error('Workflow execution error', { workflowId, error: errorMessage }) + + return RunWorkflowResult.parse({ + success: false, + executionId, + executionStartTime, + output: null, + message: errorMessage, + error: errorMessage, + }) + } + }, +} diff --git a/apps/sim/lib/copilot/tools/server/workflow/set-global-workflow-variables.ts b/apps/sim/lib/copilot/tools/server/workflow/set-global-workflow-variables.ts new file mode 100644 index 000000000..d2d28635b --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/workflow/set-global-workflow-variables.ts @@ -0,0 +1,161 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { z } from 'zod' +import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' + +const logger = createLogger('SetGlobalWorkflowVariablesServerTool') + +const OperationItemSchema = z.object({ + operation: z.enum(['add', 'edit', 'delete']), + name: z.string(), + type: z.enum(['plain', 'number', 'boolean', 'array', 'object']).optional(), + value: z.string().optional(), +}) + +export const SetGlobalWorkflowVariablesInput = z.object({ + workflowId: z.string(), + operations: z.array(OperationItemSchema), +}) + +export const SetGlobalWorkflowVariablesResult = z.object({ + success: z.boolean(), + message: z.string(), + variables: z.record(z.unknown()), +}) + +export type SetGlobalWorkflowVariablesInputType = z.infer +export type SetGlobalWorkflowVariablesResultType = z.infer + +function coerceValue( + value: string | undefined, + type?: 'plain' | 'number' | 'boolean' | 'array' | 'object' +): unknown { + if (value === undefined) return value + const t = type || 'plain' + try { + if (t === 'number') { + const n = Number(value) + if (Number.isNaN(n)) return value + return n + } + if (t === 'boolean') { + const v = String(value).trim().toLowerCase() + if (v === 'true') return true + if (v === 'false') return false + return value + } + if (t === 'array' || t === 'object') { + const parsed = JSON.parse(value) + if (t === 'array' && Array.isArray(parsed)) return parsed + if (t === 'object' && parsed && typeof parsed === 'object' && !Array.isArray(parsed)) + return parsed + return value + } + } catch { + // Fall through to return value as-is + } + return value +} + +export const setGlobalWorkflowVariablesServerTool: BaseServerTool< + SetGlobalWorkflowVariablesInputType, + SetGlobalWorkflowVariablesResultType +> = { + name: 'set_global_workflow_variables', + async execute(args: unknown, _context?: { userId: string }) { + const parsed = SetGlobalWorkflowVariablesInput.parse(args) + const { workflowId, operations } = parsed + + logger.debug('Setting workflow variables', { workflowId, operationCount: operations.length }) + + // Get current workflow variables + const [wf] = await db + .select({ variables: workflow.variables }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + + if (!wf) { + throw new Error(`Workflow not found: ${workflowId}`) + } + + const currentVarsRecord = (wf.variables as Record) || {} + + // Build mutable map by variable name + const byName: Record> = {} + Object.values(currentVarsRecord).forEach((v: unknown) => { + if (v && typeof v === 'object' && 'id' in v && 'name' in v) { + const variable = v as Record + byName[String(variable.name)] = variable + } + }) + + // Apply operations in order + for (const op of operations) { + const key = String(op.name) + const nextType = op.type || (byName[key]?.type as string) || 'plain' + + if (op.operation === 'delete') { + delete byName[key] + continue + } + + const typedValue = coerceValue( + op.value, + nextType as 'plain' | 'number' | 'boolean' | 'array' | 'object' + ) + + if (op.operation === 'add') { + byName[key] = { + id: crypto.randomUUID(), + workflowId, + name: key, + type: nextType, + value: typedValue, + } + continue + } + + if (op.operation === 'edit') { + if (!byName[key]) { + // If editing a non-existent variable, create it + byName[key] = { + id: crypto.randomUUID(), + workflowId, + name: key, + type: nextType, + value: typedValue, + } + } else { + byName[key] = { + ...byName[key], + type: nextType, + ...(op.value !== undefined ? { value: typedValue } : {}), + } + } + } + } + + // Convert byName (keyed by name) to record keyed by ID for storage + const variablesRecord: Record = {} + for (const v of Object.values(byName)) { + variablesRecord[v.id as string] = v + } + + // Update workflow variables + await db.update(workflow).set({ variables: variablesRecord }).where(eq(workflow.id, workflowId)) + + logger.info('Updated workflow variables', { + workflowId, + variableCount: Object.keys(byName).length, + }) + + return SetGlobalWorkflowVariablesResult.parse({ + success: true, + message: 'Workflow variables updated', + variables: byName, + }) + }, +} diff --git a/apps/sim/lib/copilot/tools/shared/schemas.ts b/apps/sim/lib/copilot/tools/shared/schemas.ts index 2377aecf7..a010a1ddd 100644 --- a/apps/sim/lib/copilot/tools/shared/schemas.ts +++ b/apps/sim/lib/copilot/tools/shared/schemas.ts @@ -1,5 +1,9 @@ import { z } from 'zod' +// ============================================================================ +// Common Schemas (shared across multiple tools and API routes) +// ============================================================================ + // Generic envelope used by client to validate API responses export const ExecuteResponseSuccessSchema = z.object({ success: z.literal(true), @@ -7,6 +11,85 @@ export const ExecuteResponseSuccessSchema = z.object({ }) export type ExecuteResponseSuccess = z.infer +/** + * Standard tool error structure. + * Used in ToolResult and tool_result events. + */ +export const ToolErrorSchema = z.object({ + code: z.string(), + message: z.string(), + details: z.record(z.unknown()).optional(), +}) +export type ToolError = z.infer + +/** + * Standard tool result structure. + * This is the canonical format for tool execution results across the system. + */ +export const ToolResultSchema = z.object({ + success: z.boolean(), + data: z.unknown().optional(), + error: ToolErrorSchema.optional(), +}) +export type ToolResultType = z.infer + +/** + * Mark-complete payload schema. + * Used when calling the Go copilot's mark-complete endpoint. + */ +export const MarkCompletePayloadSchema = z.object({ + /** Tool call ID */ + id: z.string(), + /** Tool name */ + name: z.string(), + /** HTTP-like status code (200 = success, 4xx = client error, 5xx = server error) */ + status: z.number().int().min(100).max(599), + /** Optional message (typically error message or success description) */ + message: z.unknown().optional(), + /** Optional data payload (tool result data) */ + data: z.unknown().optional(), +}) +export type MarkCompletePayload = z.infer + +/** + * Tool result event from Go (received via SSE stream). + * This represents what we receive back after mark-complete. + */ +export const ToolResultEventSchema = z.object({ + toolCallId: z.string().optional(), + data: z + .object({ + id: z.string().optional(), + }) + .optional(), + success: z.boolean().optional(), + failedDependency: z.boolean().optional(), + result: z + .object({ + skipped: z.boolean().optional(), + }) + .passthrough() + .optional(), +}) +export type ToolResultEvent = z.infer + +/** + * Helper to extract toolCallId from tool_result event data. + * Handles the various formats Go might send. + */ +export function extractToolCallId(data: unknown): string | undefined { + if (!data || typeof data !== 'object') return undefined + const d = data as Record + // Try direct toolCallId first + if (typeof d.toolCallId === 'string') return d.toolCallId + // Then try nested data.id + if (d.data && typeof d.data === 'object') { + const nested = d.data as Record + if (typeof nested.id === 'string') return nested.id + } + return undefined +} + // get_blocks_and_tools export const GetBlocksAndToolsInput = z.object({}) export const GetBlocksAndToolsResult = z.object({ @@ -176,3 +259,187 @@ export const GetBlockUpstreamReferencesResult = z.object({ }) export type GetBlockUpstreamReferencesInputType = z.infer export type GetBlockUpstreamReferencesResultType = z.infer + +// ============================================================================ +// Search Tools +// ============================================================================ + +// search_documentation +export const SearchDocumentationInput = z.object({ + query: z.string().min(1), + topK: z.number().min(1).max(50).optional().default(10), + threshold: z.number().min(0).max(1).optional(), +}) +export const SearchDocumentationResult = z.object({ + results: z.array( + z.object({ + id: z.number(), + title: z.string(), + url: z.string(), + content: z.string(), + similarity: z.number(), + }) + ), + query: z.string(), + totalResults: z.number(), +}) +export type SearchDocumentationInputType = z.infer +export type SearchDocumentationResultType = z.infer + +// search_online +export const SearchOnlineInput = z.object({ + query: z.string().min(1), + num: z.number().min(1).max(100).optional().default(10), + type: z.string().optional().default('search'), + gl: z.string().optional(), + hl: z.string().optional(), +}) +export const SearchOnlineResult = z.object({ + results: z.array(z.record(z.unknown())), + query: z.string(), + type: z.string(), + totalResults: z.number(), + source: z.enum(['exa', 'serper']), +}) +export type SearchOnlineInputType = z.infer +export type SearchOnlineResultType = z.infer + +// make_api_request +export const MakeApiRequestInput = z.object({ + url: z.string().url(), + method: z.enum(['GET', 'POST', 'PUT']), + queryParams: z.record(z.union([z.string(), z.number(), z.boolean()])).optional(), + headers: z.record(z.string()).optional(), + body: z.unknown().optional(), +}) +export const MakeApiRequestResult = z.object({ + data: z.unknown(), + status: z.number(), + headers: z.record(z.unknown()).optional(), + truncated: z.boolean().optional(), + totalChars: z.number().optional(), + previewChars: z.number().optional(), + note: z.string().optional(), +}) +export type MakeApiRequestInputType = z.infer +export type MakeApiRequestResultType = z.infer + +// ============================================================================ +// Workflow Tools +// ============================================================================ + +// edit_workflow - input is complex, using passthrough for flexibility +export const EditWorkflowInput = z.object({ + workflowId: z.string(), + operations: z.array(z.record(z.unknown())), + currentUserWorkflow: z.unknown().optional(), +}) +export const EditWorkflowResult = z.object({ + success: z.boolean(), + workflowState: z.unknown(), + inputValidationErrors: z.array(z.string()).optional(), + inputValidationMessage: z.string().optional(), + skippedItems: z.array(z.string()).optional(), + skippedItemsMessage: z.string().optional(), +}) +export type EditWorkflowInputType = z.infer +export type EditWorkflowResultType = z.infer + +// get_workflow_console +export const GetWorkflowConsoleInput = z.object({ + workflowId: z.string(), + limit: z.number().min(1).max(50).optional().default(2), + includeDetails: z.boolean().optional().default(false), +}) +export const GetWorkflowConsoleResult = z.array( + z.object({ + executionId: z.string(), + startedAt: z.string(), + blocks: z.array( + z.object({ + id: z.string(), + name: z.string(), + startedAt: z.string(), + endedAt: z.string(), + durationMs: z.number(), + output: z.unknown(), + error: z.string().optional(), + }) + ), + }) +) +export type GetWorkflowConsoleInputType = z.infer +export type GetWorkflowConsoleResultType = z.infer + +// list_user_workflows +export const ListUserWorkflowsInput = z.object({}) +export const ListUserWorkflowsResult = z.object({ + workflow_names: z.array(z.string()), + count: z.number(), +}) +export type ListUserWorkflowsInputType = z.infer +export type ListUserWorkflowsResultType = z.infer + +// ============================================================================ +// User Tools +// ============================================================================ + +// get_credentials +export const GetCredentialsInput = z.object({ + workflowId: z.string().optional(), +}) +export const GetCredentialsResult = z.object({ + oauth: z.object({ + connected: z.object({ + credentials: z.array( + z.object({ + id: z.string(), + name: z.string(), + provider: z.string(), + serviceName: z.string(), + lastUsed: z.string(), + isDefault: z.boolean(), + accessToken: z.string().nullable(), + }) + ), + total: z.number(), + }), + notConnected: z.object({ + services: z.array( + z.object({ + providerId: z.string(), + name: z.string(), + description: z.string().optional(), + baseProvider: z.string().optional(), + }) + ), + total: z.number(), + }), + }), + environment: z.object({ + variableNames: z.array(z.string()), + count: z.number(), + personalVariables: z.array(z.string()), + workspaceVariables: z.array(z.string()), + conflicts: z.array(z.string()).optional(), + }), +}) +export type GetCredentialsInputType = z.infer +export type GetCredentialsResultType = z.infer + +// set_environment_variables +export const SetEnvironmentVariablesInput = z.object({ + variables: z.array( + z.object({ + key: z.string(), + value: z.string(), + }) + ), + workspaceId: z.string().optional(), +}) +export const SetEnvironmentVariablesResult = z.object({ + success: z.boolean(), + message: z.string(), + savedCount: z.number().optional(), + variables: z.array(z.string()).optional(), +}) diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index e368d412e..378a72f0c 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -5,6 +5,11 @@ import { create } from 'zustand' import { devtools } from 'zustand/middleware' import { type CopilotChat, sendStreamingMessage } from '@/lib/copilot/api' import type { CopilotTransportMode } from '@/lib/copilot/models' +import { + isServerExecutedToolSync, + markToolCallServerHandled, + prefetchServerExecutedTools, +} from '@/lib/copilot/server-executed-tools' import type { BaseClientToolMetadata, ClientToolDisplay, @@ -75,6 +80,7 @@ import { ManageMcpToolClientTool } from '@/lib/copilot/tools/client/workflow/man import { RedeployClientTool } from '@/lib/copilot/tools/client/workflow/redeploy' import { RunWorkflowClientTool } from '@/lib/copilot/tools/client/workflow/run-workflow' import { SetGlobalWorkflowVariablesClientTool } from '@/lib/copilot/tools/client/workflow/set-global-workflow-variables' +import { extractToolCallId } from '@/lib/copilot/tools/shared/schemas' import { getQueryClient } from '@/app/_shell/providers/query-provider' import { subscriptionKeys } from '@/hooks/queries/subscription' import type { @@ -347,6 +353,17 @@ function isBackgroundState(state: any): boolean { } } +/** + * Checks if a tool call state is aborted + */ +function isAbortedState(state: any): boolean { + try { + return state === 'aborted' || state === (ClientToolCallState as any).aborted + } catch { + return state === 'aborted' + } +} + /** * Checks if a tool call state is terminal (success, error, rejected, aborted, review, or background) */ @@ -1138,8 +1155,8 @@ const sseHandlers: Record = { }, tool_result: (data, context, get, set) => { try { - const toolCallId: string | undefined = data?.toolCallId || data?.data?.id - const success: boolean | undefined = data?.success + const toolCallId = extractToolCallId(data) + const success: boolean | undefined = (data as Record)?.success as boolean const failedDependency: boolean = data?.failedDependency === true const skipped: boolean = data?.result?.skipped === true if (!toolCallId) return @@ -1149,9 +1166,10 @@ const sseHandlers: Record = { if ( isRejectedState(current.state) || isReviewState(current.state) || - isBackgroundState(current.state) + isBackgroundState(current.state) || + isAbortedState(current.state) ) { - // Preserve terminal review/rejected state; do not override + // Preserve terminal review/rejected/aborted state; do not override return } const targetState = success @@ -1207,7 +1225,8 @@ const sseHandlers: Record = { if ( isRejectedState(b.toolCall?.state) || isReviewState(b.toolCall?.state) || - isBackgroundState(b.toolCall?.state) + isBackgroundState(b.toolCall?.state) || + isAbortedState(b.toolCall?.state) ) break const targetState = success @@ -1236,8 +1255,8 @@ const sseHandlers: Record = { }, tool_error: (data, context, get, set) => { try { - const toolCallId: string | undefined = data?.toolCallId || data?.data?.id - const failedDependency: boolean = data?.failedDependency === true + const toolCallId = extractToolCallId(data) + const failedDependency: boolean = (data as Record)?.failedDependency === true if (!toolCallId) return const { toolCallsById } = get() const current = toolCallsById[toolCallId] @@ -1245,7 +1264,8 @@ const sseHandlers: Record = { if ( isRejectedState(current.state) || isReviewState(current.state) || - isBackgroundState(current.state) + isBackgroundState(current.state) || + isAbortedState(current.state) ) { return } @@ -1271,7 +1291,8 @@ const sseHandlers: Record = { if ( isRejectedState(b.toolCall?.state) || isReviewState(b.toolCall?.state) || - isBackgroundState(b.toolCall?.state) + isBackgroundState(b.toolCall?.state) || + isAbortedState(b.toolCall?.state) ) break const targetState = failedDependency @@ -1362,6 +1383,28 @@ const sseHandlers: Record = { return } + // Check if this tool is executed server-side + // If so, skip client execution - the server will handle it and send tool_result + if (name && isServerExecutedToolSync(name)) { + markToolCallServerHandled(id, name) + logger.info('[toolCallsById] Tool is server-executed, skipping client execution', { + id, + name, + }) + // Update state to executing to show progress in UI + const executingMap = { ...get().toolCallsById } + executingMap[id] = { + ...executingMap[id], + state: ClientToolCallState.executing, + display: resolveToolDisplay(name, ClientToolCallState.executing, id, args), + } + set({ toolCallsById: executingMap }) + // Update inline content block + upsertToolCallBlock(context, executingMap[id]) + updateStreamingMessage(set, context) + return + } + // Prefer interface-based registry to determine interrupt and execute try { const def = name ? getTool(name) : undefined @@ -1419,11 +1462,12 @@ const sseHandlers: Record = { ? result.status >= 200 && result.status < 300 : true const completeMap = { ...get().toolCallsById } - // Do not override terminal review/rejected + // Do not override terminal review/rejected/aborted if ( isRejectedState(completeMap[id]?.state) || isReviewState(completeMap[id]?.state) || - isBackgroundState(completeMap[id]?.state) + isBackgroundState(completeMap[id]?.state) || + isAbortedState(completeMap[id]?.state) ) { return } @@ -1461,11 +1505,12 @@ const sseHandlers: Record = { }) .catch((e) => { const errorMap = { ...get().toolCallsById } - // Do not override terminal review/rejected + // Do not override terminal review/rejected/aborted if ( isRejectedState(errorMap[id]?.state) || isReviewState(errorMap[id]?.state) || - isBackgroundState(errorMap[id]?.state) + isBackgroundState(errorMap[id]?.state) || + isAbortedState(errorMap[id]?.state) ) { return } @@ -1530,11 +1575,12 @@ const sseHandlers: Record = { }) .catch(() => { const errorMap = { ...get().toolCallsById } - // Do not override terminal review/rejected + // Do not override terminal review/rejected/aborted if ( isRejectedState(errorMap[id]?.state) || isReviewState(errorMap[id]?.state) || - isBackgroundState(errorMap[id]?.state) + isBackgroundState(errorMap[id]?.state) || + isAbortedState(errorMap[id]?.state) ) { return } @@ -2157,8 +2203,8 @@ const subAgentSSEHandlers: Record = { const parentToolCallId = context.subAgentParentToolCallId if (!parentToolCallId) return - const toolCallId: string | undefined = data?.toolCallId || data?.data?.id - const success: boolean | undefined = data?.success !== false // Default to true if not specified + const toolCallId = extractToolCallId(data) + const success: boolean | undefined = (data as Record)?.success !== false // Default to true if not specified if (!toolCallId) return // Initialize if needed @@ -2173,6 +2219,12 @@ const subAgentSSEHandlers: Record = { if (existingIndex >= 0) { const existing = context.subAgentToolCalls[parentToolCallId][existingIndex] + + // Preserve aborted state - don't override if user aborted + if (isAbortedState(existing.state)) { + return + } + const updatedSubAgentToolCall = { ...existing, state: targetState, @@ -2191,6 +2243,10 @@ const subAgentSSEHandlers: Record = { // Update the individual tool call in toolCallsById so ToolCall component gets latest state const { toolCallsById } = get() if (toolCallsById[toolCallId]) { + // Also check toolCallsById state in case it was aborted there + if (isAbortedState(toolCallsById[toolCallId].state)) { + return + } const updatedMap = { ...toolCallsById, [toolCallId]: updatedSubAgentToolCall, @@ -2385,6 +2441,9 @@ export const useCopilotStore = create()( const { isSendingMessage } = get() if (isSendingMessage) get().abortMessage() + // Prefetch server-executed tools list (for skipping client execution) + prefetchServerExecutedTools() + // Abort all in-progress tools and clear any diff preview abortAllInProgressTools(set, get) try { @@ -3063,9 +3122,9 @@ export const useCopilotStore = create()( const map = { ...get().toolCallsById } const current = map[id] if (!current) return - // Preserve rejected state from being overridden + // Preserve rejected/aborted state from being overridden with success if ( - isRejectedState(current.state) && + (isRejectedState(current.state) || isAbortedState(current.state)) && (newState === 'success' || newState === (ClientToolCallState as any).success) ) { return @@ -3143,8 +3202,11 @@ export const useCopilotStore = create()( if (!id) return const current = toolCallsById[id] if (!current) return - // Do not override a rejected tool with success - if (isRejectedState(current.state) && targetState === (ClientToolCallState as any).success) { + // Do not override a rejected or aborted tool with success + if ( + (isRejectedState(current.state) || isAbortedState(current.state)) && + targetState === (ClientToolCallState as any).success + ) { return } @@ -3862,11 +3924,12 @@ export const useCopilotStore = create()( const success = result.success && result.result?.success const completeMap = { ...get().toolCallsById } - // Do not override terminal review/rejected + // Do not override terminal review/rejected/aborted if ( isRejectedState(completeMap[id]?.state) || isReviewState(completeMap[id]?.state) || - isBackgroundState(completeMap[id]?.state) + isBackgroundState(completeMap[id]?.state) || + isAbortedState(completeMap[id]?.state) ) { return } @@ -3911,11 +3974,12 @@ export const useCopilotStore = create()( } catch {} } catch (e) { const errorMap = { ...get().toolCallsById } - // Do not override terminal review/rejected + // Do not override terminal review/rejected/aborted if ( isRejectedState(errorMap[id]?.state) || isReviewState(errorMap[id]?.state) || - isBackgroundState(errorMap[id]?.state) + isBackgroundState(errorMap[id]?.state) || + isAbortedState(errorMap[id]?.state) ) { return }