diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 9d31bf5c3..8c57cddff 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -7,7 +7,7 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { generateChatTitle } from '@/lib/copilot/chat-title' import { getCopilotModel } from '@/lib/copilot/config' -import { SIM_AGENT_API_URL_DEFAULT, SIM_AGENT_VERSION } from '@/lib/copilot/constants' +import { SIM_AGENT_VERSION } from '@/lib/copilot/constants' import { COPILOT_MODEL_IDS, COPILOT_REQUEST_MODES } from '@/lib/copilot/models' import { authenticateCopilotRequestSessionOnly, @@ -23,10 +23,10 @@ import { CopilotFiles } from '@/lib/uploads' import { createFileContent } from '@/lib/uploads/utils/file-utils' import { tools } from '@/tools/registry' import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils' +import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' const logger = createLogger('CopilotChatAPI') -const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT const FileAttachmentSchema = z.object({ id: z.string(), @@ -465,77 +465,19 @@ export async function POST(req: NextRequest) { }) } catch {} - const simAgentResponse = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - ...(env.COPILOT_API_KEY ? 
{ 'x-api-key': env.COPILOT_API_KEY } : {}), - }, - body: JSON.stringify(requestPayload), - }) - - if (!simAgentResponse.ok) { - if (simAgentResponse.status === 401 || simAgentResponse.status === 402) { - // Rethrow status only; client will render appropriate assistant message - return new NextResponse(null, { status: simAgentResponse.status }) - } - - const errorText = await simAgentResponse.text().catch(() => '') - logger.error(`[${tracker.requestId}] Sim agent API error:`, { - status: simAgentResponse.status, - error: errorText, - }) - - return NextResponse.json( - { error: `Sim agent API error: ${simAgentResponse.statusText}` }, - { status: simAgentResponse.status } - ) - } - - // If streaming is requested, forward the stream and update chat later - if (stream && simAgentResponse.body) { - // Create user message to save - const userMessage = { - id: userMessageIdToUse, // Consistent ID used for request and persistence - role: 'user', - content: message, - timestamp: new Date().toISOString(), - ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), - ...(Array.isArray(contexts) && contexts.length > 0 && { contexts }), - ...(Array.isArray(contexts) && - contexts.length > 0 && { - contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }], - }), - } - - // Create a pass-through stream that captures the response + if (stream) { const transformedStream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder() - let assistantContent = '' - const toolCalls: any[] = [] - let buffer = '' - const isFirstDone = true - let responseIdFromStart: string | undefined - let responseIdFromDone: string | undefined - // Track tool call progress to identify a safe done event - const announcedToolCallIds = new Set() - const startedToolExecutionIds = new Set() - const completedToolExecutionIds = new Set() - let lastDoneResponseId: string | undefined - let lastSafeDoneResponseId: string | undefined - // Send 
chatId as first event if (actualChatId) { - const chatIdEvent = `data: ${JSON.stringify({ - type: 'chat_id', - chatId: actualChatId, - })}\n\n` - controller.enqueue(encoder.encode(chatIdEvent)) - logger.debug(`[${tracker.requestId}] Sent initial chatId event to client`) + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ type: 'chat_id', chatId: actualChatId })}\n\n` + ) + ) } - // Start title generation in parallel if needed if (actualChatId && !currentChat?.title && conversationHistory.length === 0) { generateChatTitle(message) .then(async (title) => { @@ -547,311 +489,61 @@ export async function POST(req: NextRequest) { updatedAt: new Date(), }) .where(eq(copilotChats.id, actualChatId!)) - - const titleEvent = `data: ${JSON.stringify({ - type: 'title_updated', - title: title, - })}\n\n` - controller.enqueue(encoder.encode(titleEvent)) - logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`) + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ type: 'title_updated', title })}\n\n`) + ) } }) .catch((error) => { logger.error(`[${tracker.requestId}] Title generation failed:`, error) }) - } else { - logger.debug(`[${tracker.requestId}] Skipping title generation`) } - // Forward the sim agent stream and capture assistant response - const reader = simAgentResponse.body!.getReader() - const decoder = new TextDecoder() - try { - while (true) { - const { done, value } = await reader.read() - if (done) { - break - } - - // Decode and parse SSE events for logging and capturing content - const decodedChunk = decoder.decode(value, { stream: true }) - buffer += decodedChunk - - const lines = buffer.split('\n') - buffer = lines.pop() || '' // Keep incomplete line in buffer - - for (const line of lines) { - if (line.trim() === '') continue // Skip empty lines - - if (line.startsWith('data: ') && line.length > 6) { - try { - const jsonStr = line.slice(6) - - // Check if the JSON string is unusually large (potential streaming issue) - 
if (jsonStr.length > 50000) { - // 50KB limit - logger.warn(`[${tracker.requestId}] Large SSE event detected`, { - size: jsonStr.length, - preview: `${jsonStr.substring(0, 100)}...`, - }) - } - - const event = JSON.parse(jsonStr) - - // Log different event types comprehensively - switch (event.type) { - case 'content': - if (event.data) { - assistantContent += event.data - } - break - - case 'reasoning': - logger.debug( - `[${tracker.requestId}] Reasoning chunk received (${(event.data || event.content || '').length} chars)` - ) - break - - case 'tool_call': - if (!event.data?.partial) { - toolCalls.push(event.data) - if (event.data?.id) { - announcedToolCallIds.add(event.data.id) - } - } - break - - case 'tool_generating': - if (event.toolCallId) { - startedToolExecutionIds.add(event.toolCallId) - } - break - - case 'tool_result': - if (event.toolCallId) { - completedToolExecutionIds.add(event.toolCallId) - } - break - - case 'tool_error': - logger.error(`[${tracker.requestId}] Tool error:`, { - toolCallId: event.toolCallId, - toolName: event.toolName, - error: event.error, - success: event.success, - }) - if (event.toolCallId) { - completedToolExecutionIds.add(event.toolCallId) - } - break - - case 'start': - if (event.data?.responseId) { - responseIdFromStart = event.data.responseId - } - break - - case 'done': - if (event.data?.responseId) { - responseIdFromDone = event.data.responseId - lastDoneResponseId = responseIdFromDone - - // Mark this done as safe only if no tool call is currently in progress or pending - const announced = announcedToolCallIds.size - const completed = completedToolExecutionIds.size - const started = startedToolExecutionIds.size - const hasToolInProgress = announced > completed || started > completed - if (!hasToolInProgress) { - lastSafeDoneResponseId = responseIdFromDone - } - } - break - - case 'error': - break - - default: - } - - // Emit to client: rewrite 'error' events into user-friendly assistant message - if (event?.type === 
'error') { - try { - const displayMessage: string = - (event?.data && (event.data.displayMessage as string)) || - 'Sorry, I encountered an error. Please try again.' - const formatted = `_${displayMessage}_` - // Accumulate so it persists to DB as assistant content - assistantContent += formatted - // Send as content chunk - try { - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n` - ) - ) - } catch (enqueueErr) { - reader.cancel() - break - } - // Then close this response cleanly for the client - try { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`) - ) - } catch (enqueueErr) { - reader.cancel() - break - } - } catch {} - // Do not forward the original error event - } else { - // Forward original event to client - try { - controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`)) - } catch (enqueueErr) { - reader.cancel() - break - } - } - } catch (e) { - // Enhanced error handling for large payloads and parsing issues - const lineLength = line.length - const isLargePayload = lineLength > 10000 - - if (isLargePayload) { - logger.error( - `[${tracker.requestId}] Failed to parse large SSE event (${lineLength} chars)`, - { - error: e, - preview: `${line.substring(0, 200)}...`, - size: lineLength, - } - ) - } else { - logger.warn( - `[${tracker.requestId}] Failed to parse SSE event: "${line.substring(0, 200)}..."`, - e - ) - } - } - } else if (line.trim() && line !== 'data: [DONE]') { - logger.debug(`[${tracker.requestId}] Non-SSE line from sim agent: "${line}"`) - } - } - } - - // Process any remaining buffer - if (buffer.trim()) { - logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`) - if (buffer.startsWith('data: ')) { + const result = await orchestrateCopilotStream(requestPayload, { + userId: authenticatedUserId, + workflowId, + chatId: actualChatId, + autoExecuteTools: true, + interactive: true, + onEvent: async (event) => { try { - const 
jsonStr = buffer.slice(6) - const event = JSON.parse(jsonStr) - if (event.type === 'content' && event.data) { - assistantContent += event.data - } - // Forward remaining event, applying same error rewrite behavior - if (event?.type === 'error') { - const displayMessage: string = - (event?.data && (event.data.displayMessage as string)) || - 'Sorry, I encountered an error. Please try again.' - const formatted = `_${displayMessage}_` - assistantContent += formatted - try { - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n` - ) - ) - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`) - ) - } catch (enqueueErr) { - reader.cancel() - } - } else { - try { - controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`)) - } catch (enqueueErr) { - reader.cancel() - } - } - } catch (e) { - logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`) + controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`)) + } catch { + controller.error('Failed to forward SSE event') } - } - } - - // Log final streaming summary - logger.info(`[${tracker.requestId}] Streaming complete summary:`, { - totalContentLength: assistantContent.length, - toolCallsCount: toolCalls.length, - hasContent: assistantContent.length > 0, - toolNames: toolCalls.map((tc) => tc?.name).filter(Boolean), + }, }) - // NOTE: Messages are saved by the client via update-messages endpoint with full contentBlocks. - // Server only updates conversationId here to avoid overwriting client's richer save. 
- if (currentChat) { - // Persist only a safe conversationId to avoid continuing from a state that expects tool outputs - const previousConversationId = currentChat?.conversationId as string | undefined - const responseId = lastSafeDoneResponseId || previousConversationId || undefined - - if (responseId) { - await db - .update(copilotChats) - .set({ - updatedAt: new Date(), - conversationId: responseId, - }) - .where(eq(copilotChats.id, actualChatId!)) - - logger.info( - `[${tracker.requestId}] Updated conversationId for chat ${actualChatId}`, - { - updatedConversationId: responseId, - } - ) - } + if (currentChat && result.conversationId) { + await db + .update(copilotChats) + .set({ + updatedAt: new Date(), + conversationId: result.conversationId, + }) + .where(eq(copilotChats.id, actualChatId!)) } } catch (error) { - logger.error(`[${tracker.requestId}] Error processing stream:`, error) - - // Send an error event to the client before closing so it knows what happened - try { - const errorMessage = - error instanceof Error && error.message === 'terminated' - ? 'Connection to AI service was interrupted. Please try again.' - : 'An unexpected error occurred while processing the response.' 
- const encoder = new TextEncoder() - - // Send error as content so it shows in the chat - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n` - ) + logger.error(`[${tracker.requestId}] Orchestration error:`, error) + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ + type: 'error', + data: { + displayMessage: + 'An unexpected error occurred while processing the response.', + }, + })}\n\n` ) - // Send done event to properly close the stream on client - controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)) - } catch (enqueueError) { - // Stream might already be closed, that's ok - logger.warn( - `[${tracker.requestId}] Could not send error event to client:`, - enqueueError - ) - } + ) } finally { - try { - controller.close() - } catch { - // Controller might already be closed - } + controller.close() } }, }) - const response = new Response(transformedStream, { + return new Response(transformedStream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', @@ -859,43 +551,31 @@ export async function POST(req: NextRequest) { 'X-Accel-Buffering': 'no', }, }) - - logger.info(`[${tracker.requestId}] Returning streaming response to client`, { - duration: tracker.getDuration(), - chatId: actualChatId, - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - }, - }) - - return response } - // For non-streaming responses - const responseData = await simAgentResponse.json() - logger.info(`[${tracker.requestId}] Non-streaming response from sim agent:`, { + const nonStreamingResult = await orchestrateCopilotStream(requestPayload, { + userId: authenticatedUserId, + workflowId, + chatId: actualChatId, + autoExecuteTools: true, + interactive: true, + }) + + const responseData = { + content: nonStreamingResult.content, + toolCalls: nonStreamingResult.toolCalls, + model: selectedModel, + 
provider: providerConfig?.provider || env.COPILOT_PROVIDER || 'openai', + } + + logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, { hasContent: !!responseData.content, contentLength: responseData.content?.length || 0, model: responseData.model, provider: responseData.provider, toolCallsCount: responseData.toolCalls?.length || 0, - hasTokens: !!responseData.tokens, }) - // Log tool calls if present - if (responseData.toolCalls?.length > 0) { - responseData.toolCalls.forEach((toolCall: any) => { - logger.info(`[${tracker.requestId}] Tool call in response:`, { - id: toolCall.id, - name: toolCall.name, - success: toolCall.success, - result: `${JSON.stringify(toolCall.result).substring(0, 200)}...`, - }) - }) - } - // Save messages if we have a chat if (currentChat && responseData.content) { const userMessage = { @@ -947,6 +627,9 @@ export async function POST(req: NextRequest) { .set({ messages: updatedMessages, updatedAt: new Date(), + ...(nonStreamingResult.conversationId + ? 
const RequestSchema = z.object({
  message: z.string().min(1, 'message is required'),
  workflowId: z.string().min(1, 'workflowId is required'),
  chatId: z.string().optional(),
  mode: z.enum(['agent', 'ask', 'plan']).optional().default('agent'),
  model: z.string().optional(),
  autoExecuteTools: z.boolean().optional().default(true),
  // The value is handed to the orchestrator as its timer delay; a zero, negative
  // or fractional delay would make that timer fire (almost) immediately.
  timeout: z.number().int().positive().optional().default(300000),
})

/**
 * POST /api/v1/copilot/chat
 * Headless copilot endpoint for server-side orchestration.
 *
 * Responses:
 * - 401 when `authenticateV1Request` does not yield an authenticated user id.
 * - 400 when the body is not valid JSON or fails `RequestSchema` validation.
 * - 500 when building the payload or running the orchestrator throws.
 * - otherwise the orchestrator result serialized as JSON (`success` mirrors
 *   `result.success`; the HTTP status is left at its default).
 */
export async function POST(req: NextRequest) {
  const auth = await authenticateV1Request(req)
  if (!auth.authenticated || !auth.userId) {
    return NextResponse.json({ success: false, error: auth.error || 'Unauthorized' }, { status: 401 })
  }

  // req.json() rejects on a malformed or empty body. That is a client error and
  // must not fall through to the generic 500 handler below.
  let body: unknown
  try {
    body = await req.json()
  } catch {
    return NextResponse.json({ success: false, error: 'Invalid JSON body' }, { status: 400 })
  }

  const validation = RequestSchema.safeParse(body)
  if (!validation.success) {
    return NextResponse.json(
      { success: false, error: 'Invalid request', details: validation.error.errors },
      { status: 400 }
    )
  }
  const parsed = validation.data

  try {
    const defaults = getCopilotModel('chat')
    const selectedModel = parsed.model || defaults.model

    const requestPayload = {
      message: parsed.message,
      workflowId: parsed.workflowId,
      userId: auth.userId,
      stream: true,
      streamToolCalls: true,
      model: selectedModel,
      mode: parsed.mode,
      messageId: crypto.randomUUID(),
      version: SIM_AGENT_VERSION,
      ...(parsed.chatId ? { chatId: parsed.chatId } : {}),
    }

    const result = await orchestrateCopilotStream(requestPayload, {
      userId: auth.userId,
      workflowId: parsed.workflowId,
      chatId: parsed.chatId,
      autoExecuteTools: parsed.autoExecuteTools,
      timeout: parsed.timeout,
      interactive: false,
    })

    return NextResponse.json({
      success: result.success,
      content: result.content,
      toolCalls: result.toolCalls,
      chatId: result.chatId,
      conversationId: result.conversationId,
      error: result.error,
    })
  } catch (error) {
    logger.error('Headless copilot request failed', {
      error: error instanceof Error ? error.message : String(error),
    })
    return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 })
  }
}
const toolCallLogger = createLogger('CopilotToolCall')

/**
 * Report the user's run/skip decision for a tool call to the server.
 *
 * Best-effort: a network failure or a non-2xx response is logged as a warning
 * and never thrown, so the calling UI handlers are not interrupted.
 *
 * @param toolCallId - Id of the tool call the decision applies to.
 * @param status - `'accepted'` to run the tool, `'rejected'` to skip it.
 */
async function sendToolDecision(
  toolCallId: string,
  status: 'accepted' | 'rejected'
): Promise<void> {
  try {
    const response = await fetch('/api/copilot/confirm', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ toolCallId, status }),
    })

    // fetch() only rejects on network errors; an HTTP error status resolves
    // normally and would otherwise go unnoticed.
    if (!response.ok) {
      toolCallLogger.warn('Failed to send tool decision', {
        toolCallId,
        status,
        error: `HTTP ${response.status}`,
      })
    }
  } catch (error) {
    toolCallLogger.warn('Failed to send tool decision', {
      toolCallId,
      status,
      error: error instanceof Error ? error.message : String(error),
    })
  }
}
/**
 * Feature flag for server-side copilot orchestration.
 *
 * Hard-coded to `true`; the client tool-call component branches on it to send
 * run/skip decisions to the server instead of executing tools locally.
 */
export const COPILOT_SERVER_ORCHESTRATED = true

/**
 * Tool names collected into `INTERRUPT_TOOL_SET`.
 * NOTE(review): presumably the tools that pause for a user decision before
 * running — confirm against the handlers that consume the set.
 */
export const INTERRUPT_TOOL_NAMES = [
  'set_global_workflow_variables',
  'run_workflow',
  'manage_mcp_tool',
  'manage_custom_tool',
  'deploy_mcp',
  'deploy_chat',
  'deploy_api',
  'create_workspace_mcp_server',
  'set_environment_variables',
  'make_api_request',
  'oauth_request_access',
  'navigate_ui',
  'knowledge_base',
] as const

/**
 * Lookup set over `INTERRUPT_TOOL_NAMES`.
 *
 * The element type is widened to `string` explicitly: without it the set is
 * inferred as a set of the literal union, and `INTERRUPT_TOOL_SET.has(name)`
 * does not type-check for an arbitrary `string` tool name.
 */
export const INTERRUPT_TOOL_SET = new Set<string>(INTERRUPT_TOOL_NAMES)
/**
 * Orchestrate a copilot SSE stream and execute tool calls server-side.
 *
 * Posts `requestPayload` to the copilot backend, parses the SSE response and
 * dispatches each event to the matching handler while forwarding it to
 * `options.onEvent`.
 *
 * Failures are reported through the returned result (`success: false`) and
 * `options.onError`; this function does not reject for backend, network or
 * execution-context errors.
 *
 * @param requestPayload - JSON body sent to the copilot backend.
 * @param options - Caller identity, callbacks, optional `timeout` (ms, default
 *   300000) and optional `abortSignal`.
 * @returns The aggregated result built from the streaming context.
 */
export async function orchestrateCopilotStream(
  requestPayload: Record<string, any>,
  options: OrchestrateStreamOptions
): Promise<OrchestratorResult> {
  const { userId, workflowId, chatId, timeout = 300000, abortSignal } = options

  const context: StreamingContext = {
    chatId,
    conversationId: undefined,
    messageId: requestPayload?.messageId || crypto.randomUUID(),
    accumulatedContent: '',
    contentBlocks: [],
    toolCalls: new Map(),
    currentThinkingBlock: null,
    isInThinkingBlock: false,
    subAgentParentToolCallId: undefined,
    subAgentContent: {},
    subAgentToolCalls: {},
    pendingContent: '',
    streamComplete: false,
    wasAborted: false,
    errors: [],
  }

  // The fetch gets its own controller so the timeout can abort a request that
  // never produces response headers; the caller's signal is relayed into it.
  const fetchAbort = new AbortController()
  const relayAbort = () => fetchAbort.abort()
  if (abortSignal?.aborted) {
    fetchAbort.abort()
  } else {
    abortSignal?.addEventListener('abort', relayAbort, { once: true })
  }

  let reader: ReadableStreamDefaultReader<Uint8Array> | undefined
  let timedOut = false

  // Armed before any awaited work so the deadline covers the connection phase
  // as well as the streaming phase.
  const timeoutId = setTimeout(() => {
    timedOut = true
    context.errors.push('Request timed out')
    context.streamComplete = true
    if (reader) {
      // Streaming already started: end the read loop gracefully.
      reader.cancel().catch(() => {})
    } else {
      fetchAbort.abort()
    }
  }, timeout)

  try {
    // Inside the try so a failure here is reported like any other failure.
    const execContext: ExecutionContext = await prepareExecutionContext(userId, workflowId)

    const response = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
      },
      body: JSON.stringify(requestPayload),
      signal: fetchAbort.signal,
    })

    if (!response.ok) {
      const errorText = await response.text().catch(() => '')
      throw new Error(`Copilot backend error (${response.status}): ${errorText || response.statusText}`)
    }

    if (!response.body) {
      throw new Error('Copilot backend response missing body')
    }

    reader = response.body.getReader()
    const decoder = new TextDecoder()

    try {
      for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
        if (abortSignal?.aborted) {
          context.wasAborted = true
          break
        }

        await forwardEvent(event, options)

        if (event.type === 'subagent_start') {
          const toolCallId = event.data?.tool_call_id
          if (toolCallId) {
            context.subAgentParentToolCallId = toolCallId
            context.subAgentContent[toolCallId] = ''
            context.subAgentToolCalls[toolCallId] = []
          }
          continue
        }

        if (event.type === 'subagent_end') {
          context.subAgentParentToolCallId = undefined
          continue
        }

        if (handleSubagentRouting(event, context)) {
          const handler = subAgentHandlers[event.type]
          if (handler) {
            await handler(event, context, execContext, options)
          }
          if (context.streamComplete) break
          continue
        }

        const handler = sseHandlers[event.type]
        if (handler) {
          await handler(event, context, execContext, options)
        }
        if (context.streamComplete) break
      }
    } finally {
      // Stop the deadline as soon as streaming ends, before onComplete runs.
      clearTimeout(timeoutId)
    }

    const result = buildResult(context)
    await options.onComplete?.(result)
    return result
  } catch (error) {
    // A timeout before response headers surfaces as an abort error from fetch;
    // report it as the timeout it is unless the caller aborted as well.
    const err =
      timedOut && !abortSignal?.aborted
        ? new Error('Request timed out')
        : error instanceof Error
          ? error
          : new Error('Copilot orchestration failed')
    logger.error('Copilot orchestration failed', { error: err.message })
    await options.onError?.(err)
    return {
      success: false,
      content: '',
      contentBlocks: [],
      toolCalls: [],
      chatId: context.chatId,
      conversationId: context.conversationId,
      error: err.message,
    }
  } finally {
    clearTimeout(timeoutId)
    abortSignal?.removeEventListener('abort', relayAbort)
  }
}

/**
 * Forward one SSE event to `options.onEvent`, logging (not propagating) any
 * error the callback throws so a consumer failure cannot break the stream loop.
 */
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
  try {
    await options.onEvent?.(event)
  } catch (error) {
    logger.warn('Failed to forward SSE event', {
      type: event.type,
      error: error instanceof Error ? error.message : String(error),
    })
  }
}

/**
 * Build the orchestrator result from the streaming context.
 *
 * `success` is true only when no errors were recorded; `durationMs` is set
 * only for tool calls that have both a start and an end time.
 */
function buildResult(context: StreamingContext): OrchestratorResult {
  const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
    id: toolCall.id,
    name: toolCall.name,
    status: toolCall.status,
    params: toolCall.params,
    result: toolCall.result?.output,
    error: toolCall.error,
    durationMs:
      toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
  }))

  return {
    success: context.errors.length === 0,
    content: context.accumulatedContent,
    contentBlocks: context.contentBlocks,
    toolCalls,
    chatId: context.chatId,
    conversationId: context.conversationId,
    errors: context.errors.length ? context.errors : undefined,
  }
}
/**
 * Update the conversationId for a chat without overwriting messages.
 * Also bumps `updatedAt` to the current time.
 */
export async function updateChatConversationId(chatId: string, conversationId: string): Promise<void> {
  const patch = { conversationId, updatedAt: new Date() }
  await db.update(copilotChats).set(patch).where(eq(copilotChats.id, chatId))
}

/**
 * Set a tool call confirmation status in Redis.
 *
 * The record is stored under `tool_call:<toolCallId>` with an 86400-second
 * expiry.
 *
 * @returns `true` when the record was written; `false` when no Redis client
 *   is available or the write failed (both cases are logged).
 */
export async function setToolConfirmation(
  toolCallId: string,
  status: 'accepted' | 'rejected' | 'background' | 'pending',
  message?: string
): Promise<boolean> {
  const redis = getRedisClient()
  if (!redis) {
    logger.warn('Redis client not available for tool confirmation')
    return false
  }

  const serialized = JSON.stringify({
    status,
    message: message || null,
    timestamp: new Date().toISOString(),
  })

  try {
    await redis.set(`tool_call:${toolCallId}`, serialized, 'EX', 86400)
  } catch (error) {
    logger.error('Failed to set tool confirmation', {
      toolCallId,
      error: error instanceof Error ? error.message : String(error),
    })
    return false
  }

  return true
}
const logger = createLogger('CopilotSseHandlers')

/** Signature shared by every SSE event handler in this module. */
export type SSEHandler = (
  event: SSEEvent,
  context: StreamingContext,
  execContext: ExecutionContext,
  options: OrchestratorOptions
) => void | Promise<void>

/** Append a content block to the context, stamping it with the current time. */
function addContentBlock(
  context: StreamingContext,
  block: Omit<ContentBlock, 'timestamp'>
): void {
  context.contentBlocks.push({
    ...block,
    timestamp: Date.now(),
  })
}

/**
 * Forward a synthesized tool event to `options.onEvent`.
 *
 * A throwing consumer callback is logged and swallowed, so a forwarding
 * failure can never be mistaken for a tool failure.
 */
async function emitToolEvent(
  options: OrchestratorOptions | undefined,
  event: SSEEvent
): Promise<void> {
  try {
    await options?.onEvent?.(event)
  } catch (error) {
    logger.warn('Failed to forward SSE event', {
      type: event.type,
      error: error instanceof Error ? error.message : String(error),
    })
  }
}

/**
 * Execute a tracked tool call server-side and report the outcome.
 *
 * Does nothing when the tool call is unknown or already executing. On
 * completion the tool call state is updated, `markToolComplete` is called with
 * 200 or 500, and a `tool_result` event is emitted. If execution or reporting
 * throws, the call is marked as `error`, reported with 500, and a `tool_error`
 * event is emitted.
 */
async function executeToolAndReport(
  toolCallId: string,
  context: StreamingContext,
  execContext: ExecutionContext,
  options?: OrchestratorOptions
): Promise<void> {
  const toolCall = context.toolCalls.get(toolCallId)
  if (!toolCall) return

  if (toolCall.status === 'executing') return

  toolCall.status = 'executing'
  try {
    const result = await executeToolServerSide(toolCall, execContext)
    toolCall.status = result.success ? 'success' : 'error'
    toolCall.result = result
    toolCall.error = result.error
    toolCall.endTime = Date.now()

    await markToolComplete(
      toolCall.id,
      toolCall.name,
      result.success ? 200 : 500,
      result.error || (result.success ? 'Tool completed' : 'Tool failed'),
      result.output
    )

    // Guarded: an exception from the consumer callback must not reach the
    // catch below, which would flip a finished tool to 'error' and report it
    // to the backend a second time with status 500.
    await emitToolEvent(options, {
      type: 'tool_result',
      toolCallId: toolCall.id,
      data: {
        id: toolCall.id,
        name: toolCall.name,
        success: result.success,
        result: result.output,
      },
    })
  } catch (error) {
    toolCall.status = 'error'
    toolCall.error = error instanceof Error ? error.message : String(error)
    toolCall.endTime = Date.now()

    await markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error)

    await emitToolEvent(options, {
      type: 'tool_error',
      toolCallId: toolCall.id,
      data: {
        id: toolCall.id,
        name: toolCall.name,
        error: toolCall.error,
      },
    })
  }
}

/**
 * Poll for a stored confirmation of a tool call.
 *
 * Checks `getToolConfirmation` roughly every 100 ms and returns the first
 * record that has a truthy `status`, or `null` once `timeoutMs` has elapsed.
 * NOTE(review): any status ends the wait, including 'pending' if such a
 * record is ever stored — confirm callers expect that.
 */
async function waitForToolDecision(
  toolCallId: string,
  timeoutMs: number
): Promise<{ status: string; message?: string } | null> {
  const start = Date.now()
  while (Date.now() - start < timeoutMs) {
    const decision = await getToolConfirmation(toolCallId)
    if (decision?.status) {
      return decision
    }
    await new Promise((resolve) => setTimeout(resolve, 100))
  }
  return null
}
'success' : 'error' + current.endTime = Date.now() + if (event.data?.result || event.data?.data) { + current.result = { + success: !!success, + output: event.data?.result || event.data?.data, + } + } + }, + tool_error: (event, context) => { + const toolCallId = event.toolCallId || event.data?.id + if (!toolCallId) return + const current = context.toolCalls.get(toolCallId) + if (!current) return + current.status = 'error' + current.error = event.data?.error || 'Tool execution failed' + current.endTime = Date.now() + }, + tool_generating: (event, context) => { + const toolCallId = event.toolCallId || event.data?.toolCallId || event.data?.id + const toolName = event.toolName || event.data?.toolName || event.data?.name + if (!toolCallId || !toolName) return + if (!context.toolCalls.has(toolCallId)) { + context.toolCalls.set(toolCallId, { + id: toolCallId, + name: toolName, + status: 'pending', + startTime: Date.now(), + }) + } + }, + tool_call: async (event, context, execContext, options) => { + const toolData = event.data || {} + const toolCallId = toolData.id || event.toolCallId + const toolName = toolData.name || event.toolName + if (!toolCallId || !toolName) return + + const args = toolData.arguments || toolData.input || event.data?.input + const isPartial = toolData.partial === true + const existing = context.toolCalls.get(toolCallId) + const toolCall: ToolCallState = existing + ? 
{ ...existing, status: 'pending', params: args || existing.params } + : { + id: toolCallId, + name: toolName, + status: 'pending', + params: args, + startTime: Date.now(), + } + + context.toolCalls.set(toolCallId, toolCall) + addContentBlock(context, { type: 'tool_call', toolCall }) + + if (isPartial) return + + const isInterruptTool = INTERRUPT_TOOL_SET.has(toolName) + const isInteractive = options.interactive === true + + if (isInterruptTool && isInteractive) { + const decision = await waitForToolDecision(toolCallId, options.timeout || 600000) + if (decision?.status === 'accepted' || decision?.status === 'success') { + await executeToolAndReport(toolCallId, context, execContext, options) + return + } + + if (decision?.status === 'rejected' || decision?.status === 'error') { + toolCall.status = 'rejected' + toolCall.endTime = Date.now() + await markToolComplete( + toolCall.id, + toolCall.name, + 400, + decision.message || 'Tool execution rejected', + { skipped: true, reason: 'user_rejected' } + ) + await options.onEvent?.({ + type: 'tool_result', + toolCallId: toolCall.id, + data: { + id: toolCall.id, + name: toolCall.name, + success: false, + result: { skipped: true, reason: 'user_rejected' }, + }, + }) + return + } + + if (decision?.status === 'background') { + toolCall.status = 'skipped' + toolCall.endTime = Date.now() + await markToolComplete( + toolCall.id, + toolCall.name, + 202, + decision.message || 'Tool execution moved to background', + { background: true } + ) + await options.onEvent?.({ + type: 'tool_result', + toolCallId: toolCall.id, + data: { + id: toolCall.id, + name: toolCall.name, + success: true, + result: { background: true }, + }, + }) + return + } + } + + if (options.autoExecuteTools !== false) { + await executeToolAndReport(toolCallId, context, execContext, options) + } + }, + reasoning: (event, context) => { + const phase = event.data?.phase || event.data?.data?.phase + if (phase === 'start') { + context.isInThinkingBlock = true + 
context.currentThinkingBlock = { + type: 'thinking', + content: '', + timestamp: Date.now(), + } + return + } + if (phase === 'end') { + if (context.currentThinkingBlock) { + context.contentBlocks.push(context.currentThinkingBlock) + } + context.isInThinkingBlock = false + context.currentThinkingBlock = null + return + } + const chunk = typeof event.data === 'string' ? event.data : event.data?.data || event.data?.content + if (!chunk || !context.currentThinkingBlock) return + context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}` + }, + content: (event, context) => { + const chunk = typeof event.data === 'string' ? event.data : event.data?.content || event.data?.data + if (!chunk) return + context.accumulatedContent += chunk + addContentBlock(context, { type: 'text', content: chunk }) + }, + done: (event, context) => { + if (event.data?.responseId) { + context.conversationId = event.data.responseId + } + context.streamComplete = true + }, + start: (event, context) => { + if (event.data?.responseId) { + context.conversationId = event.data.responseId + } + }, + error: (event, context) => { + const message = + event.data?.message || event.data?.error || (typeof event.data === 'string' ? event.data : null) + if (message) { + context.errors.push(message) + } + context.streamComplete = true + }, +} + +export const subAgentHandlers: Record = { + content: (event, context) => { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId || !event.data) return + const chunk = typeof event.data === 'string' ? 
event.data : event.data?.content || '' + if (!chunk) return + context.subAgentContent[parentToolCallId] = (context.subAgentContent[parentToolCallId] || '') + chunk + addContentBlock(context, { type: 'subagent_text', content: chunk }) + }, + tool_call: async (event, context, execContext, options) => { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId) return + const toolData = event.data || {} + const toolCallId = toolData.id || event.toolCallId + const toolName = toolData.name || event.toolName + if (!toolCallId || !toolName) return + const isPartial = toolData.partial === true + const args = toolData.arguments || toolData.input || event.data?.input + + const toolCall: ToolCallState = { + id: toolCallId, + name: toolName, + status: 'pending', + params: args, + startTime: Date.now(), + } + if (!context.subAgentToolCalls[parentToolCallId]) { + context.subAgentToolCalls[parentToolCallId] = [] + } + context.subAgentToolCalls[parentToolCallId].push(toolCall) + + if (isPartial) return + if (options.autoExecuteTools !== false) { + await executeToolAndReport(toolCallId, context, execContext, options) + } + }, + tool_result: (event, context) => { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId) return + const toolCallId = event.toolCallId || event.data?.id + if (!toolCallId) return + const toolCalls = context.subAgentToolCalls[parentToolCallId] || [] + const toolCall = toolCalls.find((tc) => tc.id === toolCallId) + if (!toolCall) return + toolCall.status = event.data?.success ? 
'success' : 'error' + toolCall.endTime = Date.now() + }, +} + +export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean { + if (!event.subagent) return false + if (!context.subAgentParentToolCallId) { + logger.warn('Subagent event missing parent tool call', { + type: event.type, + subagent: event.subagent, + }) + return false + } + return true +} + diff --git a/apps/sim/lib/copilot/orchestrator/sse-parser.ts b/apps/sim/lib/copilot/orchestrator/sse-parser.ts new file mode 100644 index 000000000..06873289e --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/sse-parser.ts @@ -0,0 +1,72 @@ +import { createLogger } from '@sim/logger' +import type { SSEEvent } from '@/lib/copilot/orchestrator/types' + +const logger = createLogger('CopilotSseParser') + +/** + * Parses SSE streams from the copilot backend into typed events. + */ +export async function* parseSSEStream( + reader: ReadableStreamDefaultReader, + decoder: TextDecoder, + abortSignal?: AbortSignal +): AsyncGenerator { + let buffer = '' + + try { + while (true) { + if (abortSignal?.aborted) { + logger.info('SSE stream aborted by signal') + break + } + + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + for (const line of lines) { + if (!line.trim()) continue + if (!line.startsWith('data: ')) continue + + const jsonStr = line.slice(6) + if (jsonStr === '[DONE]') continue + + try { + const event = JSON.parse(jsonStr) as SSEEvent + if (event?.type) { + yield event + } + } catch (error) { + logger.warn('Failed to parse SSE event', { + preview: jsonStr.slice(0, 200), + error: error instanceof Error ? 
error.message : String(error), + }) + } + } + } + + if (buffer.trim() && buffer.startsWith('data: ')) { + try { + const event = JSON.parse(buffer.slice(6)) as SSEEvent + if (event?.type) { + yield event + } + } catch (error) { + logger.warn('Failed to parse final SSE buffer', { + preview: buffer.slice(0, 200), + error: error instanceof Error ? error.message : String(error), + }) + } + } + } finally { + try { + reader.releaseLock() + } catch { + logger.warn('Failed to release SSE reader lock') + } + } +} + diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor.ts b/apps/sim/lib/copilot/orchestrator/tool-executor.ts new file mode 100644 index 000000000..fe288f244 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/tool-executor.ts @@ -0,0 +1,239 @@ +import { db } from '@sim/db' +import { account, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils' +import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' +import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants' +import { generateRequestId } from '@/lib/core/utils/request' +import { env } from '@/lib/core/config/env' +import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' +import { executeTool } from '@/tools' +import { getTool, resolveToolId } from '@/tools/utils' +import { routeExecution } from '@/lib/copilot/tools/server/router' +import type { ExecutionContext, ToolCallResult, ToolCallState } from '@/lib/copilot/orchestrator/types' + +const logger = createLogger('CopilotToolExecutor') +const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT + +const SERVER_TOOLS = new Set([ + 'get_blocks_and_tools', + 'get_blocks_metadata', + 'get_block_options', + 'get_block_config', + 'get_trigger_blocks', + 'edit_workflow', + 'get_workflow_console', + 'search_documentation', + 'search_online', + 'set_environment_variables', + 
'get_credentials', + 'make_api_request', + 'knowledge_base', +]) + +/** + * Execute a tool server-side without calling internal routes. + */ +export async function executeToolServerSide( + toolCall: ToolCallState, + context: ExecutionContext +): Promise { + const toolName = toolCall.name + const resolvedToolName = resolveToolId(toolName) + + if (SERVER_TOOLS.has(toolName)) { + return executeServerToolDirect(toolName, toolCall.params || {}, context) + } + + const toolConfig = getTool(resolvedToolName) + if (!toolConfig) { + logger.warn('Tool not found in registry', { toolName, resolvedToolName }) + return { + success: false, + error: `Tool not found: ${toolName}`, + } + } + + return executeIntegrationToolDirect(toolCall, toolConfig, context) +} + +/** + * Execute a server tool directly via the server tool router. + */ +async function executeServerToolDirect( + toolName: string, + params: Record, + context: ExecutionContext +): Promise { + try { + const result = await routeExecution(toolName, params, { userId: context.userId }) + return { success: true, output: result } + } catch (error) { + logger.error('Server tool execution failed', { + toolName, + error: error instanceof Error ? error.message : String(error), + }) + return { + success: false, + error: error instanceof Error ? error.message : 'Server tool execution failed', + } + } +} + +/** + * Execute an integration tool directly via the tools registry. + */ +async function executeIntegrationToolDirect( + toolCall: ToolCallState, + toolConfig: any, + context: ExecutionContext +): Promise { + const { userId, workflowId } = context + const toolName = resolveToolId(toolCall.name) + const toolArgs = toolCall.params || {} + + let workspaceId = context.workspaceId + if (!workspaceId && workflowId) { + const workflowResult = await db + .select({ workspaceId: workflow.workspaceId }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + workspaceId = workflowResult[0]?.workspaceId ?? 
undefined + } + + const decryptedEnvVars = + context.decryptedEnvVars || (await getEffectiveDecryptedEnv(userId, workspaceId)) + + const executionParams: Record = resolveEnvVarReferences( + toolArgs, + decryptedEnvVars, + { deep: true } + ) as Record + + if (toolConfig.oauth?.required && toolConfig.oauth.provider) { + const provider = toolConfig.oauth.provider + const accounts = await db + .select() + .from(account) + .where(and(eq(account.providerId, provider), eq(account.userId, userId))) + .limit(1) + + if (!accounts.length) { + return { + success: false, + error: `No ${provider} account connected. Please connect your account first.`, + } + } + + const acc = accounts[0] + const requestId = generateRequestId() + const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id) + + if (!accessToken) { + return { + success: false, + error: `OAuth token not available for ${provider}. Please reconnect your account.`, + } + } + + executionParams.accessToken = accessToken + } + + if (toolConfig.params?.apiKey?.required && !executionParams.apiKey) { + return { + success: false, + error: `API key not provided for ${toolName}. Use {{YOUR_API_KEY_ENV_VAR}} to reference your environment variable.`, + } + } + + executionParams._context = { + workflowId, + userId, + } + + if (toolName === 'function_execute') { + executionParams.envVars = decryptedEnvVars + executionParams.workflowVariables = {} + executionParams.blockData = {} + executionParams.blockNameMapping = {} + executionParams.language = executionParams.language || 'javascript' + executionParams.timeout = executionParams.timeout || 30000 + } + + const result = await executeTool(toolName, executionParams) + + return { + success: result.success, + output: result.output, + error: result.error, + } +} + +/** + * Notify the copilot backend that a tool has completed. 
+ */ +export async function markToolComplete( + toolCallId: string, + toolName: string, + status: number, + message?: any, + data?: any +): Promise { + try { + const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}), + }, + body: JSON.stringify({ + id: toolCallId, + name: toolName, + status, + message, + data, + }), + }) + + if (!response.ok) { + logger.warn('Mark-complete call failed', { toolCallId, status: response.status }) + return false + } + + return true + } catch (error) { + logger.error('Mark-complete call failed', { + toolCallId, + error: error instanceof Error ? error.message : String(error), + }) + return false + } +} + +/** + * Prepare execution context with cached environment values. + */ +export async function prepareExecutionContext( + userId: string, + workflowId: string +): Promise { + let workspaceId: string | undefined + const workflowResult = await db + .select({ workspaceId: workflow.workspaceId }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + workspaceId = workflowResult[0]?.workspaceId ?? 
undefined + + const decryptedEnvVars = await getEffectiveDecryptedEnv(userId, workspaceId) + + return { + userId, + workflowId, + workspaceId, + decryptedEnvVars, + } +} + diff --git a/apps/sim/lib/copilot/orchestrator/types.ts b/apps/sim/lib/copilot/orchestrator/types.ts new file mode 100644 index 000000000..f4adbdeea --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/types.ts @@ -0,0 +1,127 @@ +import type { CopilotProviderConfig } from '@/lib/copilot/types' + +export type SSEEventType = + | 'chat_id' + | 'title_updated' + | 'content' + | 'reasoning' + | 'tool_call' + | 'tool_generating' + | 'tool_result' + | 'tool_error' + | 'subagent_start' + | 'subagent_end' + | 'done' + | 'error' + | 'start' + +export interface SSEEvent { + type: SSEEventType + data?: any + subagent?: string + toolCallId?: string + toolName?: string +} + +export type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped' | 'rejected' + +export interface ToolCallState { + id: string + name: string + status: ToolCallStatus + params?: Record + result?: ToolCallResult + error?: string + startTime?: number + endTime?: number +} + +export interface ToolCallResult { + success: boolean + output?: any + error?: string +} + +export type ContentBlockType = 'text' | 'thinking' | 'tool_call' | 'subagent_text' + +export interface ContentBlock { + type: ContentBlockType + content?: string + toolCall?: ToolCallState + timestamp: number +} + +export interface StreamingContext { + chatId?: string + conversationId?: string + messageId: string + accumulatedContent: string + contentBlocks: ContentBlock[] + toolCalls: Map + currentThinkingBlock: ContentBlock | null + isInThinkingBlock: boolean + subAgentParentToolCallId?: string + subAgentContent: Record + subAgentToolCalls: Record + pendingContent: string + streamComplete: boolean + wasAborted: boolean + errors: string[] +} + +export interface OrchestratorRequest { + message: string + workflowId: string + userId: string + chatId?: string + 
mode?: 'agent' | 'ask' | 'plan' + model?: string + conversationId?: string + contexts?: Array<{ type: string; content: string }> + fileAttachments?: any[] + commands?: string[] + provider?: CopilotProviderConfig + streamToolCalls?: boolean + version?: string + prefetch?: boolean + userName?: string +} + +export interface OrchestratorOptions { + autoExecuteTools?: boolean + timeout?: number + onEvent?: (event: SSEEvent) => void | Promise + onComplete?: (result: OrchestratorResult) => void | Promise + onError?: (error: Error) => void | Promise + abortSignal?: AbortSignal + interactive?: boolean +} + +export interface OrchestratorResult { + success: boolean + content: string + contentBlocks: ContentBlock[] + toolCalls: ToolCallSummary[] + chatId?: string + conversationId?: string + error?: string + errors?: string[] +} + +export interface ToolCallSummary { + id: string + name: string + status: ToolCallStatus + params?: Record + result?: any + error?: string + durationMs?: number +} + +export interface ExecutionContext { + userId: string + workflowId: string + workspaceId?: string + decryptedEnvVars?: Record +} + diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index e368d412e..3cdff056b 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -54,6 +54,7 @@ import { TestClientTool } from '@/lib/copilot/tools/client/other/test' import { TourClientTool } from '@/lib/copilot/tools/client/other/tour' import { WorkflowClientTool } from '@/lib/copilot/tools/client/other/workflow' import { createExecutionContext, getTool } from '@/lib/copilot/tools/client/registry' +import { COPILOT_SERVER_ORCHESTRATED } from '@/lib/copilot/orchestrator/config' import { GetCredentialsClientTool } from '@/lib/copilot/tools/client/user/get-credentials' import { SetEnvironmentVariablesClientTool } from '@/lib/copilot/tools/client/user/set-environment-variables' import { CheckDeploymentStatusClientTool } from 
'@/lib/copilot/tools/client/workflow/check-deployment-status' @@ -1198,6 +1199,18 @@ const sseHandlers: Record = { } } catch {} } + + if (COPILOT_SERVER_ORCHESTRATED && current.name === 'edit_workflow') { + try { + const resultPayload = + data?.result || data?.data?.result || data?.data?.data || data?.data || {} + const workflowState = resultPayload?.workflowState + if (workflowState) { + const diffStore = useWorkflowDiffStore.getState() + void diffStore.setProposedChanges(workflowState) + } + } catch {} + } } // Update inline content block state @@ -1362,6 +1375,10 @@ const sseHandlers: Record = { return } + if (COPILOT_SERVER_ORCHESTRATED) { + return + } + // Prefer interface-based registry to determine interrupt and execute try { const def = name ? getTool(name) : undefined @@ -3820,6 +3837,9 @@ export const useCopilotStore = create()( setEnabledModels: (models) => set({ enabledModels: models }), executeIntegrationTool: async (toolCallId: string) => { + if (COPILOT_SERVER_ORCHESTRATED) { + return + } const { toolCallsById, workflowId } = get() const toolCall = toolCallsById[toolCallId] if (!toolCall || !workflowId) return diff --git a/docs/COPILOT_SERVER_REFACTOR.md b/docs/COPILOT_SERVER_REFACTOR.md new file mode 100644 index 000000000..a58e5aa6a --- /dev/null +++ b/docs/COPILOT_SERVER_REFACTOR.md @@ -0,0 +1,927 @@ +# Copilot Server-Side Refactor Plan + +> **Goal**: Move copilot orchestration logic from the browser (React/Zustand) to the Next.js server, enabling both headless API access and a simplified interactive client. + +## Table of Contents + +1. [Executive Summary](#executive-summary) +2. [Current Architecture](#current-architecture) +3. [Target Architecture](#target-architecture) +4. [Scope & Boundaries](#scope--boundaries) +5. [Module Design](#module-design) +6. [Implementation Plan](#implementation-plan) +7. [API Contracts](#api-contracts) +8. [Migration Strategy](#migration-strategy) +9. [Testing Strategy](#testing-strategy) +10. 
[Risks & Mitigations](#risks--mitigations) +11. [File Inventory](#file-inventory) + +--- + +## Executive Summary + +### Problem + +The current copilot implementation in Sim has all orchestration logic in the browser: +- SSE stream parsing happens in the React client +- Tool execution is triggered from the browser +- OAuth tokens are sent to the client +- No headless/API access is possible +- The Zustand store is ~4,200 lines of complex async logic + +### Solution + +Move orchestration to the Next.js server: +- Server parses SSE from copilot backend +- Server executes tools directly (no HTTP round-trips) +- Server forwards events to client (if attached) +- Headless API returns JSON response +- Client store becomes a thin UI layer (~600 lines) + +### Benefits + +| Aspect | Before | After | +|--------|--------|-------| +| Security | OAuth tokens in browser | Tokens stay server-side | +| Headless access | Not possible | Full API support | +| Store complexity | ~4,200 lines | ~600 lines | +| Tool execution | Browser-initiated | Server-side | +| Testing | Complex async | Simple state | +| Bundle size | Large (tool classes) | Minimal | + +--- + +## Current Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ BROWSER (React) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────────────────┐│ +│ │ Copilot Store (4,200 lines) ││ +│ │ ││ +│ │ • SSE stream parsing (parseSSEStream) ││ +│ │ • Event handlers (sseHandlers, subAgentSSEHandlers) ││ +│ │ • Tool execution logic ││ +│ │ • Client tool instantiation ││ +│ │ • Content block processing ││ +│ │ • State management ││ +│ │ • UI state ││ +│ └─────────────────────────────────────────────────────────────────────────┘│ +│ │ │ +│ │ HTTP calls for tool execution │ +│ ▼ │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ 
+┌─────────────────────────────────────────────────────────────────────────────┐ +│ NEXT.JS SERVER │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ /api/copilot/chat - Proxy to copilot backend (pass-through) │ +│ /api/copilot/execute-tool - Execute integration tools │ +│ /api/copilot/confirm - Update Redis with tool status │ +│ /api/copilot/tools/mark-complete - Notify copilot backend │ +│ /api/copilot/execute-copilot-server-tool - Execute server tools │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ COPILOT BACKEND (Go) │ +│ copilot.sim.ai │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ • LLM orchestration │ +│ • Subagent system (plan, edit, debug, etc.) │ +│ • Tool definitions │ +│ • Conversation management │ +│ • SSE streaming │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +### Current Flow (Interactive) + +1. User sends message in UI +2. Store calls `/api/copilot/chat` +3. Chat route proxies to copilot backend, streams SSE back +4. **Store parses SSE in browser** +5. On `tool_call` event: + - Store decides if tool needs confirmation + - Store calls `/api/copilot/execute-tool` or `/api/copilot/execute-copilot-server-tool` + - Store calls `/api/copilot/tools/mark-complete` +6. Store updates UI state + +### Problems with Current Flow + +1. **No headless access**: Must have browser client +2. **Security**: OAuth tokens sent to browser for tool execution +3. **Complexity**: All orchestration logic in Zustand store +4. **Performance**: Multiple HTTP round-trips from browser +5. **Reliability**: Browser can disconnect mid-operation +6. 
**Testing**: Hard to test async browser logic + +--- + +## Target Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ BROWSER (React) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────────────────┐│ +│ │ Copilot Store (~600 lines) ││ +│ │ ││ +│ │ • UI state (messages, toolCalls display) ││ +│ │ • Event listener (receive server events) ││ +│ │ • User actions (send message, confirm/reject) ││ +│ │ • Simple API calls ││ +│ └─────────────────────────────────────────────────────────────────────────┘│ +│ │ │ +│ │ SSE events from server │ +│ │ │ +└─────────────────────────────────────────────────────────────────────────────┘ + ▲ + │ (Optional - headless mode has no client) + │ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ NEXT.JS SERVER │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────────────────┐│ +│ │ Orchestrator Module (NEW) ││ +│ │ lib/copilot/orchestrator/ ││ +│ │ ││ +│ │ • SSE stream parsing ││ +│ │ • Event handlers ││ +│ │ • Tool execution (direct function calls) ││ +│ │ • Response building ││ +│ │ • Event forwarding (to client if attached) ││ +│ └─────────────────────────────────────────────────────────────────────────┘│ +│ │ │ +│ ┌──────┴──────┐ │ +│ │ │ │ +│ ▼ ▼ │ +│ /api/copilot/chat /api/v1/copilot/chat │ +│ (Interactive) (Headless) │ +│ - Session auth - API key auth │ +│ - SSE to client - JSON response │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + │ (Single external HTTP call) + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ COPILOT BACKEND (Go) │ +│ (UNCHANGED - no modifications) │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +### Target 
Flow (Headless) + +1. External client calls `POST /api/v1/copilot/chat` with API key +2. Orchestrator calls copilot backend +3. **Server parses SSE stream** +4. **Server executes tools directly** (no HTTP) +5. Server notifies copilot backend (mark-complete) +6. Server returns JSON response + +### Target Flow (Interactive) + +1. User sends message in UI +2. Store calls `/api/copilot/chat` +3. **Server orchestrates everything** +4. Server forwards events to client via SSE +5. Client just updates UI from events +6. Server returns when complete + +--- + +## Scope & Boundaries + +### In Scope + +| Item | Description | +|------|-------------| +| Orchestrator module | New module in `lib/copilot/orchestrator/` | +| Headless API route | New route `POST /api/v1/copilot/chat` | +| SSE parsing | Move from store to server | +| Tool execution | Direct function calls on server | +| Event forwarding | SSE to client (interactive mode) | +| Store simplification | Reduce to UI-only logic | + +### Out of Scope + +| Item | Reason | +|------|--------| +| Copilot backend (Go) | Separate repo, working correctly | +| Tool definitions | Already work, just called differently | +| LLM providers | Handled by copilot backend | +| Subagent system | Handled by copilot backend | + +### Boundaries + +``` + ┌─────────────────────────────────────┐ + │ MODIFICATION ZONE │ + │ │ + ┌────────────────┼─────────────────────────────────────┼────────────────┐ + │ │ │ │ + │ UNCHANGED │ apps/sim/ │ UNCHANGED │ + │ │ ├── lib/copilot/orchestrator/ │ │ + │ copilot/ │ │ └── (NEW) │ apps/sim/ │ + │ (Go backend) │ ├── app/api/v1/copilot/ │ tools/ │ + │ │ │ └── (NEW) │ (definitions)│ + │ │ ├── app/api/copilot/chat/ │ │ + │ │ │ └── (MODIFIED) │ │ + │ │ └── stores/panel/copilot/ │ │ + │ │ └── (SIMPLIFIED) │ │ + │ │ │ │ + └────────────────┼─────────────────────────────────────┼────────────────┘ + │ │ + └─────────────────────────────────────┘ +``` + +--- + +## Module Design + +### Directory Structure + +``` 
+apps/sim/lib/copilot/orchestrator/ +├── index.ts # Main orchestrator function +├── types.ts # Type definitions +├── sse-parser.ts # Parse SSE stream from copilot backend +├── sse-handlers.ts # Handle each SSE event type +├── tool-executor.ts # Execute tools directly (no HTTP) +├── persistence.ts # Database and Redis operations +└── response-builder.ts # Build final response +``` + +### Module Responsibilities + +#### `types.ts` + +Defines all types used by the orchestrator: + +```typescript +// SSE Events +interface SSEEvent { type, data, subagent?, toolCallId?, toolName? } +type SSEEventType = 'content' | 'tool_call' | 'tool_result' | 'done' | ... + +// Tool State +interface ToolCallState { id, name, status, params?, result?, error? } +type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped' + +// Streaming Context (internal state during orchestration) +interface StreamingContext { + chatId?, conversationId?, messageId + accumulatedContent, contentBlocks + toolCalls: Map + streamComplete, errors[] +} + +// Orchestrator API +interface OrchestratorRequest { message, workflowId, userId, chatId?, mode?, ... } +interface OrchestratorOptions { autoExecuteTools?, onEvent?, timeout?, ... } +interface OrchestratorResult { success, content, toolCalls[], chatId?, error? } + +// Execution Context (passed to tool executors) +interface ExecutionContext { userId, workflowId, workspaceId?, decryptedEnvVars? 
} +``` + +#### `sse-parser.ts` + +Parses SSE stream into typed events: + +```typescript +async function* parseSSEStream( + reader: ReadableStreamDefaultReader, + decoder: TextDecoder, + abortSignal?: AbortSignal +): AsyncGenerator +``` + +- Handles buffering for partial lines +- Parses JSON from `data:` lines +- Yields typed `SSEEvent` objects +- Supports abort signal + +#### `sse-handlers.ts` + +Handles each SSE event type: + +```typescript +const sseHandlers: Record = { + content: (event, context) => { /* append to accumulated content */ }, + tool_call: async (event, context, execContext, options) => { + /* track tool, execute if autoExecuteTools */ + }, + tool_result: (event, context) => { /* update tool status */ }, + tool_generating: (event, context) => { /* create pending tool */ }, + reasoning: (event, context) => { /* handle thinking blocks */ }, + done: (event, context) => { /* mark stream complete */ }, + error: (event, context) => { /* record error */ }, + // ... etc +} + +const subAgentHandlers: Record = { + // Handlers for events within subagent context +} +``` + +#### `tool-executor.ts` + +Executes tools directly without HTTP: + +```typescript +// Main entry point +async function executeToolServerSide( + toolCall: ToolCallState, + context: ExecutionContext +): Promise + +// Server tools (edit_workflow, search_documentation, etc.) +async function executeServerToolDirect( + toolName: string, + params: Record, + context: ExecutionContext +): Promise + +// Integration tools (slack_send, gmail_read, etc.) 
+async function executeIntegrationToolDirect( + toolCallId: string, + toolName: string, + toolConfig: ToolConfig, + params: Record, + context: ExecutionContext +): Promise + +// Notify copilot backend (external HTTP - required) +async function markToolComplete( + toolCallId: string, + toolName: string, + status: number, + message?: any, + data?: any +): Promise + +// Prepare cached context for tool execution +async function prepareExecutionContext( + userId: string, + workflowId: string +): Promise +``` + +**Key principle**: Internal tool execution uses direct function calls. Only `markToolComplete` makes an HTTP call, because the copilot backend is an external service. + +#### `persistence.ts` + +Database and Redis operations: + +```typescript +// Chat persistence +async function createChat(params): Promise<{ id: string }> +async function loadChat(chatId, userId): Promise +async function saveMessages(chatId, messages, options?): Promise +async function updateChatConversationId(chatId, conversationId): Promise + +// Tool confirmation (Redis) +async function setToolConfirmation(toolCallId, status, message?): Promise +async function getToolConfirmation(toolCallId): Promise +``` + +#### `index.ts` + +Main orchestrator function: + +```typescript +async function orchestrateCopilotRequest( + request: OrchestratorRequest, + options: OrchestratorOptions = {} +): Promise { + + // 1. Prepare execution context (cache env vars, etc.) + const execContext = await prepareExecutionContext(userId, workflowId) + + // 2. Handle chat creation/loading + let chatId = await resolveChat(request) + + // 3. Build request payload for copilot backend + const payload = buildCopilotPayload(request) + + // 4. Call copilot backend + const response = await fetch(COPILOT_URL, { body: JSON.stringify(payload) }) + + // 5. Create streaming context + const context = createStreamingContext(chatId) + + // 6. 
Parse and handle SSE stream + for await (const event of parseSSEStream(response.body)) { + // Forward to client if attached + options.onEvent?.(event) + + // Handle event + const handler = getHandler(event) + await handler(event, context, execContext, options) + + if (context.streamComplete) break + } + + // 7. Persist to database + await persistChat(chatId, context) + + // 8. Build and return result + return buildResult(context) +} +``` + +--- + +## Implementation Plan + +### Phase 1: Create Orchestrator Module (3-4 days) + +**Goal**: Build the orchestrator module that can run independently. + +#### Tasks + +1. **Create `types.ts`** (~200 lines) + - [ ] Define SSE event types + - [ ] Define tool call state types + - [ ] Define streaming context type + - [ ] Define orchestrator request/response types + - [ ] Define execution context type + +2. **Create `sse-parser.ts`** (~80 lines) + - [ ] Extract parsing logic from store.ts + - [ ] Add abort signal support + - [ ] Add error handling + +3. **Create `persistence.ts`** (~120 lines) + - [ ] Extract DB operations from chat route + - [ ] Extract Redis operations from confirm route + - [ ] Add chat creation/loading + - [ ] Add message saving + +4. **Create `tool-executor.ts`** (~300 lines) + - [ ] Create `executeToolServerSide()` main entry + - [ ] Create `executeServerToolDirect()` for server tools + - [ ] Create `executeIntegrationToolDirect()` for integration tools + - [ ] Create `markToolComplete()` for copilot backend notification + - [ ] Create `prepareExecutionContext()` for caching + - [ ] Handle OAuth token resolution + - [ ] Handle env var resolution + +5. **Create `sse-handlers.ts`** (~350 lines) + - [ ] Extract handlers from store.ts + - [ ] Adapt for server-side context + - [ ] Add tool execution integration + - [ ] Add subagent handlers + +6. 
**Create `index.ts`** (~250 lines) + - [ ] Create `orchestrateCopilotRequest()` main function + - [ ] Wire together all modules + - [ ] Add timeout handling + - [ ] Add abort signal support + - [ ] Add event forwarding + +#### Deliverables + +- Complete `lib/copilot/orchestrator/` module +- Unit tests for each component +- Integration test for full orchestration + +### Phase 2: Create Headless API Route (1 day) + +**Goal**: Create API endpoint for headless copilot access. + +#### Tasks + +1. **Create route** `app/api/v1/copilot/chat/route.ts` (~100 lines) + - [ ] Add API key authentication + - [ ] Parse and validate request + - [ ] Call orchestrator + - [ ] Return JSON response + +2. **Add to API documentation** + - [ ] Document request format + - [ ] Document response format + - [ ] Document error codes + +#### Deliverables + +- Working `POST /api/v1/copilot/chat` endpoint +- API documentation +- E2E test + +### Phase 3: Wire Interactive Route (2 days) + +**Goal**: Use orchestrator for existing interactive flow. + +#### Tasks + +1. **Modify `/api/copilot/chat/route.ts`** + - [ ] Add feature flag for new vs old flow + - [ ] Call orchestrator with `onEvent` callback + - [ ] Forward events to client via SSE + - [ ] Maintain backward compatibility + +2. **Test both flows** + - [ ] Verify interactive works with new orchestrator + - [ ] Verify old flow still works (feature flag off) + +#### Deliverables + +- Interactive route using orchestrator +- Feature flag for gradual rollout +- No breaking changes + +### Phase 4: Simplify Client Store (2-3 days) + +**Goal**: Remove orchestration logic from client, keep UI-only. + +#### Tasks + +1. **Create simplified store** (new file or gradual refactor) + - [ ] Keep: UI state, messages, tool display + - [ ] Keep: Simple API calls + - [ ] Keep: Event listener + - [ ] Remove: SSE parsing + - [ ] Remove: Tool execution logic + - [ ] Remove: Client tool instantiators + +2. 
**Update components** + - [ ] Update components to use simplified store + - [ ] Remove tool execution from UI components + - [ ] Simplify tool display components + +3. **Remove dead code** + - [ ] Remove unused imports + - [ ] Remove unused helper functions + - [ ] Remove client tool classes (if no longer needed) + +#### Deliverables + +- Simplified store (~600 lines) +- Updated components +- Reduced bundle size + +### Phase 5: Testing & Polish (2-3 days) + +#### Tasks + +1. **E2E testing** + - [ ] Test headless API with various prompts + - [ ] Test interactive with various prompts + - [ ] Test tool execution scenarios + - [ ] Test error handling + - [ ] Test abort/timeout scenarios + +2. **Performance testing** + - [ ] Compare latency (old vs new) + - [ ] Check memory usage + - [ ] Check for connection issues + +3. **Documentation** + - [ ] Update developer docs + - [ ] Add architecture diagram + - [ ] Document new API + +#### Deliverables + +- Comprehensive test suite +- Performance benchmarks +- Complete documentation + +--- + +## API Contracts + +### Headless API + +#### Request + +```http +POST /api/v1/copilot/chat +Content-Type: application/json +X-API-Key: sim_xxx + +{ + "message": "Create a Slack notification workflow", + "workflowId": "wf_abc123", + "chatId": "chat_xyz", // Optional: continue existing chat + "mode": "agent", // Optional: "agent" | "ask" | "plan" + "model": "claude-4-sonnet", // Optional + "autoExecuteTools": true, // Optional: default true + "timeout": 300000 // Optional: default 5 minutes +} +``` + +#### Response (Success) + +```json +{ + "success": true, + "content": "I've created a Slack notification workflow that...", + "toolCalls": [ + { + "id": "tc_001", + "name": "search_patterns", + "status": "success", + "params": { "query": "slack notification" }, + "result": { "patterns": [...] }, + "durationMs": 234 + }, + { + "id": "tc_002", + "name": "edit_workflow", + "status": "success", + "params": { "operations": [...] 
}, + "result": { "blocksAdded": 3 }, + "durationMs": 1523 + } + ], + "chatId": "chat_xyz", + "conversationId": "conv_123" +} +``` + +#### Response (Error) + +```json +{ + "success": false, + "error": "Workflow not found", + "content": "", + "toolCalls": [] +} +``` + +#### Error Codes + +| Status | Error | Description | +|--------|-------|-------------| +| 400 | Invalid request | Missing required fields | +| 401 | Unauthorized | Invalid or missing API key | +| 404 | Workflow not found | Workflow ID doesn't exist | +| 500 | Internal error | Server-side failure | +| 504 | Timeout | Request exceeded timeout | + +### Interactive API (Existing - Modified) + +The existing `/api/copilot/chat` endpoint continues to work but now uses the orchestrator internally. SSE events forwarded to client remain the same format. + +--- + +## Migration Strategy + +### Rollout Plan + +``` +Week 1: Phase 1 (Orchestrator) +├── Day 1-2: Types + SSE Parser +├── Day 3: Tool Executor +└── Day 4-5: Handlers + Main Orchestrator + +Week 2: Phase 2-3 (Routes) +├── Day 1: Headless API route +├── Day 2-3: Wire interactive route +└── Day 4-5: Testing both modes + +Week 3: Phase 4-5 (Cleanup) +├── Day 1-3: Simplify store +├── Day 4: Testing +└── Day 5: Documentation +``` + +### Feature Flags + +```typescript +// lib/copilot/config.ts + +export const COPILOT_FLAGS = { + // Use new orchestrator for interactive mode + USE_SERVER_ORCHESTRATOR: process.env.COPILOT_USE_SERVER_ORCHESTRATOR === 'true', + + // Enable headless API + ENABLE_HEADLESS_API: process.env.COPILOT_ENABLE_HEADLESS_API === 'true', +} +``` + +### Rollback Plan + +If issues arise: +1. Set `COPILOT_USE_SERVER_ORCHESTRATOR=false` +2. Interactive mode falls back to old client-side flow +3. 
Headless API returns 503 Service Unavailable + +--- + +## Testing Strategy + +### Unit Tests + +``` +lib/copilot/orchestrator/ +├── __tests__/ +│ ├── sse-parser.test.ts +│ ├── sse-handlers.test.ts +│ ├── tool-executor.test.ts +│ ├── persistence.test.ts +│ └── index.test.ts +``` + +#### SSE Parser Tests + +```typescript +describe('parseSSEStream', () => { + it('parses content events') + it('parses tool_call events') + it('handles partial lines') + it('handles malformed JSON') + it('respects abort signal') +}) +``` + +#### Tool Executor Tests + +```typescript +describe('executeToolServerSide', () => { + it('executes server tools directly') + it('executes integration tools with OAuth') + it('resolves env var references') + it('handles tool not found') + it('handles execution errors') +}) +``` + +### Integration Tests + +```typescript +describe('orchestrateCopilotRequest', () => { + it('handles simple message without tools') + it('handles message with single tool call') + it('handles message with multiple tool calls') + it('handles subagent tool calls') + it('handles stream errors') + it('respects timeout') + it('forwards events to callback') +}) +``` + +### E2E Tests + +```typescript +describe('POST /api/v1/copilot/chat', () => { + it('returns 401 without API key') + it('returns 400 with invalid request') + it('executes simple ask query') + it('executes workflow modification') + it('handles tool execution') +}) +``` + +--- + +## Risks & Mitigations + +### Risk 1: Breaking Interactive Mode + +**Risk**: Refactoring could break existing interactive copilot. + +**Mitigation**: +- Feature flag for gradual rollout +- Keep old code path available +- Extensive E2E testing +- Staged deployment (internal → beta → production) + +### Risk 2: Tool Execution Differences + +**Risk**: Tool behavior differs between client and server execution. 
+ +**Mitigation**: +- Reuse existing tool execution logic (same functions) +- Compare outputs in parallel testing +- Log discrepancies for investigation + +### Risk 3: Performance Regression + +**Risk**: Server-side orchestration could be slower. + +**Mitigation**: +- Expected to be faster, not slower, because server-side execution removes browser round-trips +- Benchmark before/after +- Profile critical paths + +### Risk 4: Memory Usage + +**Risk**: Server accumulates state during long-running requests. + +**Mitigation**: +- Set reasonable timeouts +- Clean up context after request +- Monitor memory in production + +### Risk 5: Connection Issues + +**Risk**: Long-running SSE connections could drop. + +**Mitigation**: +- Implement reconnection logic +- Save checkpoints to resume +- Handle partial completions gracefully + +--- + +## File Inventory + +### New Files + +| File | Lines | Description | +|------|-------|-------------| +| `lib/copilot/orchestrator/types.ts` | ~200 | Type definitions | +| `lib/copilot/orchestrator/sse-parser.ts` | ~80 | SSE stream parsing | +| `lib/copilot/orchestrator/sse-handlers.ts` | ~350 | Event handlers | +| `lib/copilot/orchestrator/tool-executor.ts` | ~300 | Tool execution | +| `lib/copilot/orchestrator/persistence.ts` | ~120 | DB/Redis operations | +| `lib/copilot/orchestrator/index.ts` | ~250 | Main orchestrator | +| `app/api/v1/copilot/chat/route.ts` | ~100 | Headless API | +| **Total New** | **~1,400** | | + +### Modified Files + +| File | Change | +|------|--------| +| `app/api/copilot/chat/route.ts` | Use orchestrator (optional) | +| `stores/panel/copilot/store.ts` | Simplify to ~600 lines | + +### Deleted Code (from store.ts) + +| Section | Lines Removed | +|---------|---------------| +| SSE parsing logic | ~150 | +| `sseHandlers` object | ~750 | +| `subAgentSSEHandlers` | ~280 | +| Tool execution logic | ~400 | +| Client tool instantiators | ~120 | +| Content block helpers | ~200 | +| Streaming context | ~100 | +| **Total Removed** | **~2,000** | + +### Net Change + 
+``` +New code: +1,400 lines (orchestrator module) +Removed code: -2,000 lines (from store) +Modified code: ~200 lines (route changes) +─────────────────────────────────────── +Net change: -400 lines (cleaner, more maintainable) +``` + +--- + +## Appendix: Code Extraction Map + +### From `stores/panel/copilot/store.ts` + +| Source Lines | Destination | Notes | +|--------------|-------------|-------| +| 900-1050 (parseSSEStream) | `sse-parser.ts` | Adapt for server | +| 1120-1867 (sseHandlers) | `sse-handlers.ts` | Remove Zustand deps | +| 1940-2217 (subAgentSSEHandlers) | `sse-handlers.ts` | Merge with above | +| 1365-1583 (tool execution) | `tool-executor.ts` | Direct calls | +| 330-380 (StreamingContext) | `types.ts` | Clean up | +| 3328-3648 (handleStreamingResponse) | `index.ts` | Main loop | + +### From `app/api/copilot/execute-tool/route.ts` + +| Source Lines | Destination | Notes | +|--------------|-------------|-------| +| 30-247 (POST handler) | `tool-executor.ts` | Extract core logic | + +### From `app/api/copilot/confirm/route.ts` + +| Source Lines | Destination | Notes | +|--------------|-------------|-------| +| 28-89 (updateToolCallStatus) | `persistence.ts` | Redis operations | + +--- + +## Approval & Sign-off + +- [ ] Technical review complete +- [ ] Security review complete +- [ ] Performance impact assessed +- [ ] Rollback plan approved +- [ ] Testing plan approved + +--- + +*Document created: January 2026* +*Last updated: January 2026* +