This commit is contained in:
Siddharth Ganesan
2026-01-30 11:33:08 -08:00
parent 5add92a613
commit deccca0276
12 changed files with 2244 additions and 379 deletions

View File

@@ -7,7 +7,7 @@ import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { generateChatTitle } from '@/lib/copilot/chat-title'
import { getCopilotModel } from '@/lib/copilot/config'
import { SIM_AGENT_API_URL_DEFAULT, SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { COPILOT_MODEL_IDS, COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import {
authenticateCopilotRequestSessionOnly,
@@ -23,10 +23,10 @@ import { CopilotFiles } from '@/lib/uploads'
import { createFileContent } from '@/lib/uploads/utils/file-utils'
import { tools } from '@/tools/registry'
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
const logger = createLogger('CopilotChatAPI')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
const FileAttachmentSchema = z.object({
id: z.string(),
@@ -465,77 +465,19 @@ export async function POST(req: NextRequest) {
})
} catch {}
const simAgentResponse = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify(requestPayload),
})
if (!simAgentResponse.ok) {
if (simAgentResponse.status === 401 || simAgentResponse.status === 402) {
// Rethrow status only; client will render appropriate assistant message
return new NextResponse(null, { status: simAgentResponse.status })
}
const errorText = await simAgentResponse.text().catch(() => '')
logger.error(`[${tracker.requestId}] Sim agent API error:`, {
status: simAgentResponse.status,
error: errorText,
})
return NextResponse.json(
{ error: `Sim agent API error: ${simAgentResponse.statusText}` },
{ status: simAgentResponse.status }
)
}
// If streaming is requested, forward the stream and update chat later
if (stream && simAgentResponse.body) {
// Create user message to save
const userMessage = {
id: userMessageIdToUse, // Consistent ID used for request and persistence
role: 'user',
content: message,
timestamp: new Date().toISOString(),
...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
...(Array.isArray(contexts) && contexts.length > 0 && { contexts }),
...(Array.isArray(contexts) &&
contexts.length > 0 && {
contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }],
}),
}
// Create a pass-through stream that captures the response
if (stream) {
const transformedStream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
let assistantContent = ''
const toolCalls: any[] = []
let buffer = ''
const isFirstDone = true
let responseIdFromStart: string | undefined
let responseIdFromDone: string | undefined
// Track tool call progress to identify a safe done event
const announcedToolCallIds = new Set<string>()
const startedToolExecutionIds = new Set<string>()
const completedToolExecutionIds = new Set<string>()
let lastDoneResponseId: string | undefined
let lastSafeDoneResponseId: string | undefined
// Send chatId as first event
if (actualChatId) {
const chatIdEvent = `data: ${JSON.stringify({
type: 'chat_id',
chatId: actualChatId,
})}\n\n`
controller.enqueue(encoder.encode(chatIdEvent))
logger.debug(`[${tracker.requestId}] Sent initial chatId event to client`)
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'chat_id', chatId: actualChatId })}\n\n`
)
)
}
// Start title generation in parallel if needed
if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
generateChatTitle(message)
.then(async (title) => {
@@ -547,311 +489,61 @@ export async function POST(req: NextRequest) {
updatedAt: new Date(),
})
.where(eq(copilotChats.id, actualChatId!))
const titleEvent = `data: ${JSON.stringify({
type: 'title_updated',
title: title,
})}\n\n`
controller.enqueue(encoder.encode(titleEvent))
logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'title_updated', title })}\n\n`)
)
}
})
.catch((error) => {
logger.error(`[${tracker.requestId}] Title generation failed:`, error)
})
} else {
logger.debug(`[${tracker.requestId}] Skipping title generation`)
}
// Forward the sim agent stream and capture assistant response
const reader = simAgentResponse.body!.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
// Decode and parse SSE events for logging and capturing content
const decodedChunk = decoder.decode(value, { stream: true })
buffer += decodedChunk
const lines = buffer.split('\n')
buffer = lines.pop() || '' // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim() === '') continue // Skip empty lines
if (line.startsWith('data: ') && line.length > 6) {
try {
const jsonStr = line.slice(6)
// Check if the JSON string is unusually large (potential streaming issue)
if (jsonStr.length > 50000) {
// 50KB limit
logger.warn(`[${tracker.requestId}] Large SSE event detected`, {
size: jsonStr.length,
preview: `${jsonStr.substring(0, 100)}...`,
})
}
const event = JSON.parse(jsonStr)
// Log different event types comprehensively
switch (event.type) {
case 'content':
if (event.data) {
assistantContent += event.data
}
break
case 'reasoning':
logger.debug(
`[${tracker.requestId}] Reasoning chunk received (${(event.data || event.content || '').length} chars)`
)
break
case 'tool_call':
if (!event.data?.partial) {
toolCalls.push(event.data)
if (event.data?.id) {
announcedToolCallIds.add(event.data.id)
}
}
break
case 'tool_generating':
if (event.toolCallId) {
startedToolExecutionIds.add(event.toolCallId)
}
break
case 'tool_result':
if (event.toolCallId) {
completedToolExecutionIds.add(event.toolCallId)
}
break
case 'tool_error':
logger.error(`[${tracker.requestId}] Tool error:`, {
toolCallId: event.toolCallId,
toolName: event.toolName,
error: event.error,
success: event.success,
})
if (event.toolCallId) {
completedToolExecutionIds.add(event.toolCallId)
}
break
case 'start':
if (event.data?.responseId) {
responseIdFromStart = event.data.responseId
}
break
case 'done':
if (event.data?.responseId) {
responseIdFromDone = event.data.responseId
lastDoneResponseId = responseIdFromDone
// Mark this done as safe only if no tool call is currently in progress or pending
const announced = announcedToolCallIds.size
const completed = completedToolExecutionIds.size
const started = startedToolExecutionIds.size
const hasToolInProgress = announced > completed || started > completed
if (!hasToolInProgress) {
lastSafeDoneResponseId = responseIdFromDone
}
}
break
case 'error':
break
default:
}
// Emit to client: rewrite 'error' events into user-friendly assistant message
if (event?.type === 'error') {
try {
const displayMessage: string =
(event?.data && (event.data.displayMessage as string)) ||
'Sorry, I encountered an error. Please try again.'
const formatted = `_${displayMessage}_`
// Accumulate so it persists to DB as assistant content
assistantContent += formatted
// Send as content chunk
try {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
)
)
} catch (enqueueErr) {
reader.cancel()
break
}
// Then close this response cleanly for the client
try {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
)
} catch (enqueueErr) {
reader.cancel()
break
}
} catch {}
// Do not forward the original error event
} else {
// Forward original event to client
try {
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
} catch (enqueueErr) {
reader.cancel()
break
}
}
} catch (e) {
// Enhanced error handling for large payloads and parsing issues
const lineLength = line.length
const isLargePayload = lineLength > 10000
if (isLargePayload) {
logger.error(
`[${tracker.requestId}] Failed to parse large SSE event (${lineLength} chars)`,
{
error: e,
preview: `${line.substring(0, 200)}...`,
size: lineLength,
}
)
} else {
logger.warn(
`[${tracker.requestId}] Failed to parse SSE event: "${line.substring(0, 200)}..."`,
e
)
}
}
} else if (line.trim() && line !== 'data: [DONE]') {
logger.debug(`[${tracker.requestId}] Non-SSE line from sim agent: "${line}"`)
}
}
}
// Process any remaining buffer
if (buffer.trim()) {
logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`)
if (buffer.startsWith('data: ')) {
const result = await orchestrateCopilotStream(requestPayload, {
userId: authenticatedUserId,
workflowId,
chatId: actualChatId,
autoExecuteTools: true,
interactive: true,
onEvent: async (event) => {
try {
const jsonStr = buffer.slice(6)
const event = JSON.parse(jsonStr)
if (event.type === 'content' && event.data) {
assistantContent += event.data
}
// Forward remaining event, applying same error rewrite behavior
if (event?.type === 'error') {
const displayMessage: string =
(event?.data && (event.data.displayMessage as string)) ||
'Sorry, I encountered an error. Please try again.'
const formatted = `_${displayMessage}_`
assistantContent += formatted
try {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
)
)
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
)
} catch (enqueueErr) {
reader.cancel()
}
} else {
try {
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
} catch (enqueueErr) {
reader.cancel()
}
}
} catch (e) {
logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`)
controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`))
} catch {
controller.error('Failed to forward SSE event')
}
}
}
// Log final streaming summary
logger.info(`[${tracker.requestId}] Streaming complete summary:`, {
totalContentLength: assistantContent.length,
toolCallsCount: toolCalls.length,
hasContent: assistantContent.length > 0,
toolNames: toolCalls.map((tc) => tc?.name).filter(Boolean),
},
})
// NOTE: Messages are saved by the client via update-messages endpoint with full contentBlocks.
// Server only updates conversationId here to avoid overwriting client's richer save.
if (currentChat) {
// Persist only a safe conversationId to avoid continuing from a state that expects tool outputs
const previousConversationId = currentChat?.conversationId as string | undefined
const responseId = lastSafeDoneResponseId || previousConversationId || undefined
if (responseId) {
await db
.update(copilotChats)
.set({
updatedAt: new Date(),
conversationId: responseId,
})
.where(eq(copilotChats.id, actualChatId!))
logger.info(
`[${tracker.requestId}] Updated conversationId for chat ${actualChatId}`,
{
updatedConversationId: responseId,
}
)
}
if (currentChat && result.conversationId) {
await db
.update(copilotChats)
.set({
updatedAt: new Date(),
conversationId: result.conversationId,
})
.where(eq(copilotChats.id, actualChatId!))
}
} catch (error) {
logger.error(`[${tracker.requestId}] Error processing stream:`, error)
// Send an error event to the client before closing so it knows what happened
try {
const errorMessage =
error instanceof Error && error.message === 'terminated'
? 'Connection to AI service was interrupted. Please try again.'
: 'An unexpected error occurred while processing the response.'
const encoder = new TextEncoder()
// Send error as content so it shows in the chat
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n`
)
logger.error(`[${tracker.requestId}] Orchestration error:`, error)
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
type: 'error',
data: {
displayMessage:
'An unexpected error occurred while processing the response.',
},
})}\n\n`
)
// Send done event to properly close the stream on client
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`))
} catch (enqueueError) {
// Stream might already be closed, that's ok
logger.warn(
`[${tracker.requestId}] Could not send error event to client:`,
enqueueError
)
}
)
} finally {
try {
controller.close()
} catch {
// Controller might already be closed
}
controller.close()
}
},
})
const response = new Response(transformedStream, {
return new Response(transformedStream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
@@ -859,43 +551,31 @@ export async function POST(req: NextRequest) {
'X-Accel-Buffering': 'no',
},
})
logger.info(`[${tracker.requestId}] Returning streaming response to client`, {
duration: tracker.getDuration(),
chatId: actualChatId,
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
})
return response
}
// For non-streaming responses
const responseData = await simAgentResponse.json()
logger.info(`[${tracker.requestId}] Non-streaming response from sim agent:`, {
const nonStreamingResult = await orchestrateCopilotStream(requestPayload, {
userId: authenticatedUserId,
workflowId,
chatId: actualChatId,
autoExecuteTools: true,
interactive: true,
})
const responseData = {
content: nonStreamingResult.content,
toolCalls: nonStreamingResult.toolCalls,
model: selectedModel,
provider: providerConfig?.provider || env.COPILOT_PROVIDER || 'openai',
}
logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, {
hasContent: !!responseData.content,
contentLength: responseData.content?.length || 0,
model: responseData.model,
provider: responseData.provider,
toolCallsCount: responseData.toolCalls?.length || 0,
hasTokens: !!responseData.tokens,
})
// Log tool calls if present
if (responseData.toolCalls?.length > 0) {
responseData.toolCalls.forEach((toolCall: any) => {
logger.info(`[${tracker.requestId}] Tool call in response:`, {
id: toolCall.id,
name: toolCall.name,
success: toolCall.success,
result: `${JSON.stringify(toolCall.result).substring(0, 200)}...`,
})
})
}
// Save messages if we have a chat
if (currentChat && responseData.content) {
const userMessage = {
@@ -947,6 +627,9 @@ export async function POST(req: NextRequest) {
.set({
messages: updatedMessages,
updatedAt: new Date(),
...(nonStreamingResult.conversationId
? { conversationId: nonStreamingResult.conversationId }
: {}),
})
.where(eq(copilotChats.id, actualChatId!))
}

View File

@@ -0,0 +1,81 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { authenticateV1Request } from '@/app/api/v1/auth'
import { getCopilotModel } from '@/lib/copilot/config'
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
const logger = createLogger('CopilotHeadlessAPI')
const RequestSchema = z.object({
message: z.string().min(1, 'message is required'),
workflowId: z.string().min(1, 'workflowId is required'),
chatId: z.string().optional(),
mode: z.enum(['agent', 'ask', 'plan']).optional().default('agent'),
model: z.string().optional(),
autoExecuteTools: z.boolean().optional().default(true),
timeout: z.number().optional().default(300000),
})
/**
* POST /api/v1/copilot/chat
* Headless copilot endpoint for server-side orchestration.
*/
export async function POST(req: NextRequest) {
const auth = await authenticateV1Request(req)
if (!auth.authenticated || !auth.userId) {
return NextResponse.json({ success: false, error: auth.error || 'Unauthorized' }, { status: 401 })
}
try {
const body = await req.json()
const parsed = RequestSchema.parse(body)
const defaults = getCopilotModel('chat')
const selectedModel = parsed.model || defaults.model
const requestPayload = {
message: parsed.message,
workflowId: parsed.workflowId,
userId: auth.userId,
stream: true,
streamToolCalls: true,
model: selectedModel,
mode: parsed.mode,
messageId: crypto.randomUUID(),
version: SIM_AGENT_VERSION,
...(parsed.chatId ? { chatId: parsed.chatId } : {}),
}
const result = await orchestrateCopilotStream(requestPayload, {
userId: auth.userId,
workflowId: parsed.workflowId,
chatId: parsed.chatId,
autoExecuteTools: parsed.autoExecuteTools,
timeout: parsed.timeout,
interactive: false,
})
return NextResponse.json({
success: result.success,
content: result.content,
toolCalls: result.toolCalls,
chatId: result.chatId,
conversationId: result.conversationId,
error: result.error,
})
} catch (error) {
if (error instanceof z.ZodError) {
return NextResponse.json(
{ success: false, error: 'Invalid request', details: error.errors },
{ status: 400 }
)
}
logger.error('Headless copilot request failed', {
error: error instanceof Error ? error.message : String(error),
})
return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -1,5 +1,6 @@
'use client'
import { createLogger } from '@sim/logger'
import { memo, useEffect, useMemo, useRef, useState } from 'react'
import clsx from 'clsx'
import { ChevronUp, LayoutList } from 'lucide-react'
@@ -25,6 +26,7 @@ import { getBlock } from '@/blocks/registry'
import type { CopilotToolCall } from '@/stores/panel'
import { useCopilotStore } from '@/stores/panel'
import { CLASS_TOOL_METADATA } from '@/stores/panel/copilot/store'
import { COPILOT_SERVER_ORCHESTRATED } from '@/lib/copilot/orchestrator/config'
import type { SubAgentContentBlock } from '@/stores/panel/copilot/types'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -1259,12 +1261,36 @@ function shouldShowRunSkipButtons(toolCall: CopilotToolCall): boolean {
return false
}
const toolCallLogger = createLogger('CopilotToolCall')
async function sendToolDecision(toolCallId: string, status: 'accepted' | 'rejected') {
try {
await fetch('/api/copilot/confirm', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ toolCallId, status }),
})
} catch (error) {
toolCallLogger.warn('Failed to send tool decision', {
toolCallId,
status,
error: error instanceof Error ? error.message : String(error),
})
}
}
async function handleRun(
toolCall: CopilotToolCall,
setToolCallState: any,
onStateChange?: any,
editedParams?: any
) {
if (COPILOT_SERVER_ORCHESTRATED) {
setToolCallState(toolCall, 'executing')
onStateChange?.('executing')
await sendToolDecision(toolCall.id, 'accepted')
return
}
const instance = getClientTool(toolCall.id)
if (!instance && isIntegrationTool(toolCall.name)) {
@@ -1309,6 +1335,12 @@ async function handleRun(
}
async function handleSkip(toolCall: CopilotToolCall, setToolCallState: any, onStateChange?: any) {
if (COPILOT_SERVER_ORCHESTRATED) {
setToolCallState(toolCall, 'rejected')
onStateChange?.('rejected')
await sendToolDecision(toolCall.id, 'rejected')
return
}
const instance = getClientTool(toolCall.id)
if (!instance && isIntegrationTool(toolCall.name)) {

View File

@@ -0,0 +1,23 @@
/**
* Feature flag for server-side copilot orchestration.
*/
export const COPILOT_SERVER_ORCHESTRATED = true
export const INTERRUPT_TOOL_NAMES = [
'set_global_workflow_variables',
'run_workflow',
'manage_mcp_tool',
'manage_custom_tool',
'deploy_mcp',
'deploy_chat',
'deploy_api',
'create_workspace_mcp_server',
'set_environment_variables',
'make_api_request',
'oauth_request_access',
'navigate_ui',
'knowledge_base',
] as const
export const INTERRUPT_TOOL_SET = new Set<string>(INTERRUPT_TOOL_NAMES)

View File

@@ -0,0 +1,181 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { env } from '@/lib/core/config/env'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import {
handleSubagentRouting,
sseHandlers,
subAgentHandlers,
} from '@/lib/copilot/orchestrator/sse-handlers'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
ExecutionContext,
OrchestratorOptions,
OrchestratorResult,
SSEEvent,
StreamingContext,
ToolCallSummary,
} from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotOrchestrator')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
export interface OrchestrateStreamOptions extends OrchestratorOptions {
userId: string
workflowId: string
chatId?: string
}
/**
* Orchestrate a copilot SSE stream and execute tool calls server-side.
*/
export async function orchestrateCopilotStream(
requestPayload: Record<string, any>,
options: OrchestrateStreamOptions
): Promise<OrchestratorResult> {
const { userId, workflowId, chatId, timeout = 300000, abortSignal } = options
const execContext = await prepareExecutionContext(userId, workflowId)
const context: StreamingContext = {
chatId,
conversationId: undefined,
messageId: requestPayload?.messageId || crypto.randomUUID(),
accumulatedContent: '',
contentBlocks: [],
toolCalls: new Map(),
currentThinkingBlock: null,
isInThinkingBlock: false,
subAgentParentToolCallId: undefined,
subAgentContent: {},
subAgentToolCalls: {},
pendingContent: '',
streamComplete: false,
wasAborted: false,
errors: [],
}
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify(requestPayload),
signal: abortSignal,
})
if (!response.ok) {
const errorText = await response.text().catch(() => '')
throw new Error(`Copilot backend error (${response.status}): ${errorText || response.statusText}`)
}
if (!response.body) {
throw new Error('Copilot backend response missing body')
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
const timeoutId = setTimeout(() => {
context.errors.push('Request timed out')
context.streamComplete = true
reader.cancel().catch(() => {})
}, timeout)
try {
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
if (abortSignal?.aborted) {
context.wasAborted = true
break
}
await forwardEvent(event, options)
if (event.type === 'subagent_start') {
const toolCallId = event.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
context.subAgentToolCalls[toolCallId] = []
}
continue
}
if (event.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
if (handleSubagentRouting(event, context)) {
const handler = subAgentHandlers[event.type]
if (handler) {
await handler(event, context, execContext, options)
}
if (context.streamComplete) break
continue
}
const handler = sseHandlers[event.type]
if (handler) {
await handler(event, context, execContext, options)
}
if (context.streamComplete) break
}
} finally {
clearTimeout(timeoutId)
}
const result = buildResult(context)
await options.onComplete?.(result)
return result
} catch (error) {
const err = error instanceof Error ? error : new Error('Copilot orchestration failed')
logger.error('Copilot orchestration failed', { error: err.message })
await options.onError?.(err)
return {
success: false,
content: '',
contentBlocks: [],
toolCalls: [],
chatId: context.chatId,
conversationId: context.conversationId,
error: err.message,
}
}
}
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
try {
await options.onEvent?.(event)
} catch (error) {
logger.warn('Failed to forward SSE event', {
type: event.type,
error: error instanceof Error ? error.message : String(error),
})
}
}
function buildResult(context: StreamingContext): OrchestratorResult {
const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
id: toolCall.id,
name: toolCall.name,
status: toolCall.status,
params: toolCall.params,
result: toolCall.result?.output,
error: toolCall.error,
durationMs:
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
}))
return {
success: context.errors.length === 0,
content: context.accumulatedContent,
contentBlocks: context.contentBlocks,
toolCalls,
chatId: context.chatId,
conversationId: context.conversationId,
errors: context.errors.length ? context.errors : undefined,
}
}

View File

@@ -0,0 +1,138 @@
import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('CopilotOrchestratorPersistence')
/**
* Create a new copilot chat record.
*/
export async function createChat(params: {
userId: string
workflowId: string
model: string
}): Promise<{ id: string }> {
const [chat] = await db
.insert(copilotChats)
.values({
userId: params.userId,
workflowId: params.workflowId,
model: params.model,
messages: [],
})
.returning({ id: copilotChats.id })
return { id: chat.id }
}
/**
* Load an existing chat for a user.
*/
export async function loadChat(chatId: string, userId: string) {
const [chat] = await db
.select()
.from(copilotChats)
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)))
.limit(1)
return chat || null
}
/**
* Save chat messages and metadata.
*/
export async function saveMessages(
chatId: string,
messages: any[],
options?: {
title?: string
conversationId?: string
planArtifact?: string | null
config?: { mode?: string; model?: string }
}
): Promise<void> {
await db
.update(copilotChats)
.set({
messages,
updatedAt: new Date(),
...(options?.title ? { title: options.title } : {}),
...(options?.conversationId ? { conversationId: options.conversationId } : {}),
...(options?.planArtifact !== undefined ? { planArtifact: options.planArtifact } : {}),
...(options?.config ? { config: options.config } : {}),
})
.where(eq(copilotChats.id, chatId))
}
/**
* Update the conversationId for a chat without overwriting messages.
*/
export async function updateChatConversationId(chatId: string, conversationId: string): Promise<void> {
await db
.update(copilotChats)
.set({
conversationId,
updatedAt: new Date(),
})
.where(eq(copilotChats.id, chatId))
}
/**
* Set a tool call confirmation status in Redis.
*/
export async function setToolConfirmation(
toolCallId: string,
status: 'accepted' | 'rejected' | 'background' | 'pending',
message?: string
): Promise<boolean> {
const redis = getRedisClient()
if (!redis) {
logger.warn('Redis client not available for tool confirmation')
return false
}
const key = `tool_call:${toolCallId}`
const payload = {
status,
message: message || null,
timestamp: new Date().toISOString(),
}
try {
await redis.set(key, JSON.stringify(payload), 'EX', 86400)
return true
} catch (error) {
logger.error('Failed to set tool confirmation', {
toolCallId,
error: error instanceof Error ? error.message : String(error),
})
return false
}
}
/**
* Get a tool call confirmation status from Redis.
*/
export async function getToolConfirmation(toolCallId: string): Promise<{
status: string
message?: string
timestamp?: string
} | null> {
const redis = getRedisClient()
if (!redis) return null
try {
const data = await redis.get(`tool_call:${toolCallId}`)
if (!data) return null
return JSON.parse(data) as { status: string; message?: string; timestamp?: string }
} catch (error) {
logger.error('Failed to read tool confirmation', {
toolCallId,
error: error instanceof Error ? error.message : String(error),
})
return null
}
}

View File

@@ -0,0 +1,342 @@
import { createLogger } from '@sim/logger'
import type {
ContentBlock,
ExecutionContext,
OrchestratorOptions,
SSEEvent,
StreamingContext,
ToolCallState,
} from '@/lib/copilot/orchestrator/types'
import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
import { INTERRUPT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
const logger = createLogger('CopilotSseHandlers')
export type SSEHandler = (
event: SSEEvent,
context: StreamingContext,
execContext: ExecutionContext,
options: OrchestratorOptions
) => void | Promise<void>
function addContentBlock(
context: StreamingContext,
block: Omit<ContentBlock, 'timestamp'>
): void {
context.contentBlocks.push({
...block,
timestamp: Date.now(),
})
}
async function executeToolAndReport(
toolCallId: string,
context: StreamingContext,
execContext: ExecutionContext,
options?: OrchestratorOptions
): Promise<void> {
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
if (toolCall.status === 'executing') return
toolCall.status = 'executing'
try {
const result = await executeToolServerSide(toolCall, execContext)
toolCall.status = result.success ? 'success' : 'error'
toolCall.result = result
toolCall.error = result.error
toolCall.endTime = Date.now()
await markToolComplete(
toolCall.id,
toolCall.name,
result.success ? 200 : 500,
result.error || (result.success ? 'Tool completed' : 'Tool failed'),
result.output
)
await options?.onEvent?.({
type: 'tool_result',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
success: result.success,
result: result.output,
},
})
} catch (error) {
toolCall.status = 'error'
toolCall.error = error instanceof Error ? error.message : String(error)
toolCall.endTime = Date.now()
await markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error)
await options?.onEvent?.({
type: 'tool_error',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
error: toolCall.error,
},
})
}
}
async function waitForToolDecision(
toolCallId: string,
timeoutMs: number
): Promise<{ status: string; message?: string } | null> {
const start = Date.now()
while (Date.now() - start < timeoutMs) {
const decision = await getToolConfirmation(toolCallId)
if (decision?.status) {
return decision
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
return null
}
export const sseHandlers: Record<string, SSEHandler> = {
chat_id: (event, context) => {
context.chatId = event.data?.chatId
},
title_updated: () => {},
tool_result: (event, context) => {
const toolCallId = event.toolCallId || event.data?.id
if (!toolCallId) return
const current = context.toolCalls.get(toolCallId)
if (!current) return
const success = event.data?.success ?? event.data?.result?.success
current.status = success ? 'success' : 'error'
current.endTime = Date.now()
if (event.data?.result || event.data?.data) {
current.result = {
success: !!success,
output: event.data?.result || event.data?.data,
}
}
},
tool_error: (event, context) => {
const toolCallId = event.toolCallId || event.data?.id
if (!toolCallId) return
const current = context.toolCalls.get(toolCallId)
if (!current) return
current.status = 'error'
current.error = event.data?.error || 'Tool execution failed'
current.endTime = Date.now()
},
tool_generating: (event, context) => {
const toolCallId = event.toolCallId || event.data?.toolCallId || event.data?.id
const toolName = event.toolName || event.data?.toolName || event.data?.name
if (!toolCallId || !toolName) return
if (!context.toolCalls.has(toolCallId)) {
context.toolCalls.set(toolCallId, {
id: toolCallId,
name: toolName,
status: 'pending',
startTime: Date.now(),
})
}
},
tool_call: async (event, context, execContext, options) => {
const toolData = event.data || {}
const toolCallId = toolData.id || event.toolCallId
const toolName = toolData.name || event.toolName
if (!toolCallId || !toolName) return
const args = toolData.arguments || toolData.input || event.data?.input
const isPartial = toolData.partial === true
const existing = context.toolCalls.get(toolCallId)
const toolCall: ToolCallState = existing
? { ...existing, status: 'pending', params: args || existing.params }
: {
id: toolCallId,
name: toolName,
status: 'pending',
params: args,
startTime: Date.now(),
}
context.toolCalls.set(toolCallId, toolCall)
addContentBlock(context, { type: 'tool_call', toolCall })
if (isPartial) return
const isInterruptTool = INTERRUPT_TOOL_SET.has(toolName)
const isInteractive = options.interactive === true
if (isInterruptTool && isInteractive) {
const decision = await waitForToolDecision(toolCallId, options.timeout || 600000)
if (decision?.status === 'accepted' || decision?.status === 'success') {
await executeToolAndReport(toolCallId, context, execContext, options)
return
}
if (decision?.status === 'rejected' || decision?.status === 'error') {
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
await markToolComplete(
toolCall.id,
toolCall.name,
400,
decision.message || 'Tool execution rejected',
{ skipped: true, reason: 'user_rejected' }
)
await options.onEvent?.({
type: 'tool_result',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
success: false,
result: { skipped: true, reason: 'user_rejected' },
},
})
return
}
if (decision?.status === 'background') {
toolCall.status = 'skipped'
toolCall.endTime = Date.now()
await markToolComplete(
toolCall.id,
toolCall.name,
202,
decision.message || 'Tool execution moved to background',
{ background: true }
)
await options.onEvent?.({
type: 'tool_result',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
success: true,
result: { background: true },
},
})
return
}
}
if (options.autoExecuteTools !== false) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
},
reasoning: (event, context) => {
const phase = event.data?.phase || event.data?.data?.phase
if (phase === 'start') {
context.isInThinkingBlock = true
context.currentThinkingBlock = {
type: 'thinking',
content: '',
timestamp: Date.now(),
}
return
}
if (phase === 'end') {
if (context.currentThinkingBlock) {
context.contentBlocks.push(context.currentThinkingBlock)
}
context.isInThinkingBlock = false
context.currentThinkingBlock = null
return
}
const chunk = typeof event.data === 'string' ? event.data : event.data?.data || event.data?.content
if (!chunk || !context.currentThinkingBlock) return
context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}`
},
content: (event, context) => {
const chunk = typeof event.data === 'string' ? event.data : event.data?.content || event.data?.data
if (!chunk) return
context.accumulatedContent += chunk
addContentBlock(context, { type: 'text', content: chunk })
},
done: (event, context) => {
if (event.data?.responseId) {
context.conversationId = event.data.responseId
}
context.streamComplete = true
},
start: (event, context) => {
if (event.data?.responseId) {
context.conversationId = event.data.responseId
}
},
error: (event, context) => {
const message =
event.data?.message || event.data?.error || (typeof event.data === 'string' ? event.data : null)
if (message) {
context.errors.push(message)
}
context.streamComplete = true
},
}
export const subAgentHandlers: Record<string, SSEHandler> = {
content: (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId || !event.data) return
const chunk = typeof event.data === 'string' ? event.data : event.data?.content || ''
if (!chunk) return
context.subAgentContent[parentToolCallId] = (context.subAgentContent[parentToolCallId] || '') + chunk
addContentBlock(context, { type: 'subagent_text', content: chunk })
},
tool_call: async (event, context, execContext, options) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolData = event.data || {}
const toolCallId = toolData.id || event.toolCallId
const toolName = toolData.name || event.toolName
if (!toolCallId || !toolName) return
const isPartial = toolData.partial === true
const args = toolData.arguments || toolData.input || event.data?.input
const toolCall: ToolCallState = {
id: toolCallId,
name: toolName,
status: 'pending',
params: args,
startTime: Date.now(),
}
if (!context.subAgentToolCalls[parentToolCallId]) {
context.subAgentToolCalls[parentToolCallId] = []
}
context.subAgentToolCalls[parentToolCallId].push(toolCall)
if (isPartial) return
if (options.autoExecuteTools !== false) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
},
tool_result: (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolCallId = event.toolCallId || event.data?.id
if (!toolCallId) return
const toolCalls = context.subAgentToolCalls[parentToolCallId] || []
const toolCall = toolCalls.find((tc) => tc.id === toolCallId)
if (!toolCall) return
toolCall.status = event.data?.success ? 'success' : 'error'
toolCall.endTime = Date.now()
},
}
export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean {
if (!event.subagent) return false
if (!context.subAgentParentToolCallId) {
logger.warn('Subagent event missing parent tool call', {
type: event.type,
subagent: event.subagent,
})
return false
}
return true
}

View File

@@ -0,0 +1,72 @@
import { createLogger } from '@sim/logger'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotSseParser')
/**
* Parses SSE streams from the copilot backend into typed events.
*/
export async function* parseSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
decoder: TextDecoder,
abortSignal?: AbortSignal
): AsyncGenerator<SSEEvent> {
let buffer = ''
try {
while (true) {
if (abortSignal?.aborted) {
logger.info('SSE stream aborted by signal')
break
}
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim()) continue
if (!line.startsWith('data: ')) continue
const jsonStr = line.slice(6)
if (jsonStr === '[DONE]') continue
try {
const event = JSON.parse(jsonStr) as SSEEvent
if (event?.type) {
yield event
}
} catch (error) {
logger.warn('Failed to parse SSE event', {
preview: jsonStr.slice(0, 200),
error: error instanceof Error ? error.message : String(error),
})
}
}
}
if (buffer.trim() && buffer.startsWith('data: ')) {
try {
const event = JSON.parse(buffer.slice(6)) as SSEEvent
if (event?.type) {
yield event
}
} catch (error) {
logger.warn('Failed to parse final SSE buffer', {
preview: buffer.slice(0, 200),
error: error instanceof Error ? error.message : String(error),
})
}
}
} finally {
try {
reader.releaseLock()
} catch {
logger.warn('Failed to release SSE reader lock')
}
}
}

View File

@@ -0,0 +1,239 @@
import { db } from '@sim/db'
import { account, workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { generateRequestId } from '@/lib/core/utils/request'
import { env } from '@/lib/core/config/env'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { executeTool } from '@/tools'
import { getTool, resolveToolId } from '@/tools/utils'
import { routeExecution } from '@/lib/copilot/tools/server/router'
import type { ExecutionContext, ToolCallResult, ToolCallState } from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotToolExecutor')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
const SERVER_TOOLS = new Set<string>([
'get_blocks_and_tools',
'get_blocks_metadata',
'get_block_options',
'get_block_config',
'get_trigger_blocks',
'edit_workflow',
'get_workflow_console',
'search_documentation',
'search_online',
'set_environment_variables',
'get_credentials',
'make_api_request',
'knowledge_base',
])
/**
* Execute a tool server-side without calling internal routes.
*/
export async function executeToolServerSide(
toolCall: ToolCallState,
context: ExecutionContext
): Promise<ToolCallResult> {
const toolName = toolCall.name
const resolvedToolName = resolveToolId(toolName)
if (SERVER_TOOLS.has(toolName)) {
return executeServerToolDirect(toolName, toolCall.params || {}, context)
}
const toolConfig = getTool(resolvedToolName)
if (!toolConfig) {
logger.warn('Tool not found in registry', { toolName, resolvedToolName })
return {
success: false,
error: `Tool not found: ${toolName}`,
}
}
return executeIntegrationToolDirect(toolCall, toolConfig, context)
}
/**
* Execute a server tool directly via the server tool router.
*/
async function executeServerToolDirect(
toolName: string,
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const result = await routeExecution(toolName, params, { userId: context.userId })
return { success: true, output: result }
} catch (error) {
logger.error('Server tool execution failed', {
toolName,
error: error instanceof Error ? error.message : String(error),
})
return {
success: false,
error: error instanceof Error ? error.message : 'Server tool execution failed',
}
}
}
/**
* Execute an integration tool directly via the tools registry.
*/
async function executeIntegrationToolDirect(
toolCall: ToolCallState,
toolConfig: any,
context: ExecutionContext
): Promise<ToolCallResult> {
const { userId, workflowId } = context
const toolName = resolveToolId(toolCall.name)
const toolArgs = toolCall.params || {}
let workspaceId = context.workspaceId
if (!workspaceId && workflowId) {
const workflowResult = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
workspaceId = workflowResult[0]?.workspaceId ?? undefined
}
const decryptedEnvVars =
context.decryptedEnvVars || (await getEffectiveDecryptedEnv(userId, workspaceId))
const executionParams: Record<string, any> = resolveEnvVarReferences(
toolArgs,
decryptedEnvVars,
{ deep: true }
) as Record<string, any>
if (toolConfig.oauth?.required && toolConfig.oauth.provider) {
const provider = toolConfig.oauth.provider
const accounts = await db
.select()
.from(account)
.where(and(eq(account.providerId, provider), eq(account.userId, userId)))
.limit(1)
if (!accounts.length) {
return {
success: false,
error: `No ${provider} account connected. Please connect your account first.`,
}
}
const acc = accounts[0]
const requestId = generateRequestId()
const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id)
if (!accessToken) {
return {
success: false,
error: `OAuth token not available for ${provider}. Please reconnect your account.`,
}
}
executionParams.accessToken = accessToken
}
if (toolConfig.params?.apiKey?.required && !executionParams.apiKey) {
return {
success: false,
error: `API key not provided for ${toolName}. Use {{YOUR_API_KEY_ENV_VAR}} to reference your environment variable.`,
}
}
executionParams._context = {
workflowId,
userId,
}
if (toolName === 'function_execute') {
executionParams.envVars = decryptedEnvVars
executionParams.workflowVariables = {}
executionParams.blockData = {}
executionParams.blockNameMapping = {}
executionParams.language = executionParams.language || 'javascript'
executionParams.timeout = executionParams.timeout || 30000
}
const result = await executeTool(toolName, executionParams)
return {
success: result.success,
output: result.output,
error: result.error,
}
}
/**
* Notify the copilot backend that a tool has completed.
*/
export async function markToolComplete(
toolCallId: string,
toolName: string,
status: number,
message?: any,
data?: any
): Promise<boolean> {
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify({
id: toolCallId,
name: toolName,
status,
message,
data,
}),
})
if (!response.ok) {
logger.warn('Mark-complete call failed', { toolCallId, status: response.status })
return false
}
return true
} catch (error) {
logger.error('Mark-complete call failed', {
toolCallId,
error: error instanceof Error ? error.message : String(error),
})
return false
}
}
/**
* Prepare execution context with cached environment values.
*/
export async function prepareExecutionContext(
userId: string,
workflowId: string
): Promise<ExecutionContext> {
let workspaceId: string | undefined
const workflowResult = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
workspaceId = workflowResult[0]?.workspaceId ?? undefined
const decryptedEnvVars = await getEffectiveDecryptedEnv(userId, workspaceId)
return {
userId,
workflowId,
workspaceId,
decryptedEnvVars,
}
}

View File

@@ -0,0 +1,127 @@
import type { CopilotProviderConfig } from '@/lib/copilot/types'
export type SSEEventType =
| 'chat_id'
| 'title_updated'
| 'content'
| 'reasoning'
| 'tool_call'
| 'tool_generating'
| 'tool_result'
| 'tool_error'
| 'subagent_start'
| 'subagent_end'
| 'done'
| 'error'
| 'start'
export interface SSEEvent {
type: SSEEventType
data?: any
subagent?: string
toolCallId?: string
toolName?: string
}
export type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped' | 'rejected'
export interface ToolCallState {
id: string
name: string
status: ToolCallStatus
params?: Record<string, any>
result?: ToolCallResult
error?: string
startTime?: number
endTime?: number
}
export interface ToolCallResult {
success: boolean
output?: any
error?: string
}
export type ContentBlockType = 'text' | 'thinking' | 'tool_call' | 'subagent_text'
export interface ContentBlock {
type: ContentBlockType
content?: string
toolCall?: ToolCallState
timestamp: number
}
export interface StreamingContext {
chatId?: string
conversationId?: string
messageId: string
accumulatedContent: string
contentBlocks: ContentBlock[]
toolCalls: Map<string, ToolCallState>
currentThinkingBlock: ContentBlock | null
isInThinkingBlock: boolean
subAgentParentToolCallId?: string
subAgentContent: Record<string, string>
subAgentToolCalls: Record<string, ToolCallState[]>
pendingContent: string
streamComplete: boolean
wasAborted: boolean
errors: string[]
}
export interface OrchestratorRequest {
message: string
workflowId: string
userId: string
chatId?: string
mode?: 'agent' | 'ask' | 'plan'
model?: string
conversationId?: string
contexts?: Array<{ type: string; content: string }>
fileAttachments?: any[]
commands?: string[]
provider?: CopilotProviderConfig
streamToolCalls?: boolean
version?: string
prefetch?: boolean
userName?: string
}
export interface OrchestratorOptions {
autoExecuteTools?: boolean
timeout?: number
onEvent?: (event: SSEEvent) => void | Promise<void>
onComplete?: (result: OrchestratorResult) => void | Promise<void>
onError?: (error: Error) => void | Promise<void>
abortSignal?: AbortSignal
interactive?: boolean
}
export interface OrchestratorResult {
success: boolean
content: string
contentBlocks: ContentBlock[]
toolCalls: ToolCallSummary[]
chatId?: string
conversationId?: string
error?: string
errors?: string[]
}
export interface ToolCallSummary {
id: string
name: string
status: ToolCallStatus
params?: Record<string, any>
result?: any
error?: string
durationMs?: number
}
export interface ExecutionContext {
userId: string
workflowId: string
workspaceId?: string
decryptedEnvVars?: Record<string, string>
}

View File

@@ -54,6 +54,7 @@ import { TestClientTool } from '@/lib/copilot/tools/client/other/test'
import { TourClientTool } from '@/lib/copilot/tools/client/other/tour'
import { WorkflowClientTool } from '@/lib/copilot/tools/client/other/workflow'
import { createExecutionContext, getTool } from '@/lib/copilot/tools/client/registry'
import { COPILOT_SERVER_ORCHESTRATED } from '@/lib/copilot/orchestrator/config'
import { GetCredentialsClientTool } from '@/lib/copilot/tools/client/user/get-credentials'
import { SetEnvironmentVariablesClientTool } from '@/lib/copilot/tools/client/user/set-environment-variables'
import { CheckDeploymentStatusClientTool } from '@/lib/copilot/tools/client/workflow/check-deployment-status'
@@ -1198,6 +1199,18 @@ const sseHandlers: Record<string, SSEHandler> = {
}
} catch {}
}
if (COPILOT_SERVER_ORCHESTRATED && current.name === 'edit_workflow') {
try {
const resultPayload =
data?.result || data?.data?.result || data?.data?.data || data?.data || {}
const workflowState = resultPayload?.workflowState
if (workflowState) {
const diffStore = useWorkflowDiffStore.getState()
void diffStore.setProposedChanges(workflowState)
}
} catch {}
}
}
// Update inline content block state
@@ -1362,6 +1375,10 @@ const sseHandlers: Record<string, SSEHandler> = {
return
}
if (COPILOT_SERVER_ORCHESTRATED) {
return
}
// Prefer interface-based registry to determine interrupt and execute
try {
const def = name ? getTool(name) : undefined
@@ -3820,6 +3837,9 @@ export const useCopilotStore = create<CopilotStore>()(
setEnabledModels: (models) => set({ enabledModels: models }),
executeIntegrationTool: async (toolCallId: string) => {
if (COPILOT_SERVER_ORCHESTRATED) {
return
}
const { toolCallsById, workflowId } = get()
const toolCall = toolCallsById[toolCallId]
if (!toolCall || !workflowId) return

View File

@@ -0,0 +1,927 @@
# Copilot Server-Side Refactor Plan
> **Goal**: Move copilot orchestration logic from the browser (React/Zustand) to the Next.js server, enabling both headless API access and a simplified interactive client.
## Table of Contents
1. [Executive Summary](#executive-summary)
2. [Current Architecture](#current-architecture)
3. [Target Architecture](#target-architecture)
4. [Scope & Boundaries](#scope--boundaries)
5. [Module Design](#module-design)
6. [Implementation Plan](#implementation-plan)
7. [API Contracts](#api-contracts)
8. [Migration Strategy](#migration-strategy)
9. [Testing Strategy](#testing-strategy)
10. [Risks & Mitigations](#risks--mitigations)
11. [File Inventory](#file-inventory)
---
## Executive Summary
### Problem
The current copilot implementation in Sim has all orchestration logic in the browser:
- SSE stream parsing happens in the React client
- Tool execution is triggered from the browser
- OAuth tokens are sent to the client
- No headless/API access is possible
- The Zustand store is ~4,200 lines of complex async logic
### Solution
Move orchestration to the Next.js server:
- Server parses SSE from copilot backend
- Server executes tools directly (no HTTP round-trips)
- Server forwards events to client (if attached)
- Headless API returns JSON response
- Client store becomes a thin UI layer (~600 lines)
### Benefits
| Aspect | Before | After |
|--------|--------|-------|
| Security | OAuth tokens in browser | Tokens stay server-side |
| Headless access | Not possible | Full API support |
| Store complexity | ~4,200 lines | ~600 lines |
| Tool execution | Browser-initiated | Server-side |
| Testing | Complex async | Simple state |
| Bundle size | Large (tool classes) | Minimal |
---
## Current Architecture
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ BROWSER (React) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Copilot Store (4,200 lines) ││
│ │ ││
│ │ • SSE stream parsing (parseSSEStream) ││
│ │ • Event handlers (sseHandlers, subAgentSSEHandlers) ││
│ │ • Tool execution logic ││
│ │ • Client tool instantiation ││
│ │ • Content block processing ││
│ │ • State management ││
│ │ • UI state ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ │ HTTP calls for tool execution │
│ ▼ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ NEXT.JS SERVER │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ /api/copilot/chat - Proxy to copilot backend (pass-through) │
│ /api/copilot/execute-tool - Execute integration tools │
│ /api/copilot/confirm - Update Redis with tool status │
│ /api/copilot/tools/mark-complete - Notify copilot backend │
│ /api/copilot/execute-copilot-server-tool - Execute server tools │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ COPILOT BACKEND (Go) │
│ copilot.sim.ai │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ • LLM orchestration │
│ • Subagent system (plan, edit, debug, etc.) │
│ • Tool definitions │
│ • Conversation management │
│ • SSE streaming │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
### Current Flow (Interactive)
1. User sends message in UI
2. Store calls `/api/copilot/chat`
3. Chat route proxies to copilot backend, streams SSE back
4. **Store parses SSE in browser**
5. On `tool_call` event:
- Store decides if tool needs confirmation
- Store calls `/api/copilot/execute-tool` or `/api/copilot/execute-copilot-server-tool`
- Store calls `/api/copilot/tools/mark-complete`
6. Store updates UI state
### Problems with Current Flow
1. **No headless access**: Must have browser client
2. **Security**: OAuth tokens sent to browser for tool execution
3. **Complexity**: All orchestration logic in Zustand store
4. **Performance**: Multiple HTTP round-trips from browser
5. **Reliability**: Browser can disconnect mid-operation
6. **Testing**: Hard to test async browser logic
---
## Target Architecture
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ BROWSER (React) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Copilot Store (~600 lines) ││
│ │ ││
│ │ • UI state (messages, toolCalls display) ││
│ │ • Event listener (receive server events) ││
│ │ • User actions (send message, confirm/reject) ││
│ │ • Simple API calls ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ │ SSE events from server │
│ │ │
└─────────────────────────────────────────────────────────────────────────────┘
│ (Optional - headless mode has no client)
┌─────────────────────────────────────────────────────────────────────────────┐
│ NEXT.JS SERVER │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Orchestrator Module (NEW) ││
│ │ lib/copilot/orchestrator/ ││
│ │ ││
│ │ • SSE stream parsing ││
│ │ • Event handlers ││
│ │ • Tool execution (direct function calls) ││
│ │ • Response building ││
│ │ • Event forwarding (to client if attached) ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌──────┴──────┐ │
│ │ │ │
│ ▼ ▼ │
│ /api/copilot/chat /api/v1/copilot/chat │
│ (Interactive) (Headless) │
│ - Session auth - API key auth │
│ - SSE to client - JSON response │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
│ (Single external HTTP call)
┌─────────────────────────────────────────────────────────────────────────────┐
│ COPILOT BACKEND (Go) │
│ (UNCHANGED - no modifications) │
└─────────────────────────────────────────────────────────────────────────────┘
```
### Target Flow (Headless)
1. External client calls `POST /api/v1/copilot/chat` with API key
2. Orchestrator calls copilot backend
3. **Server parses SSE stream**
4. **Server executes tools directly** (no HTTP)
5. Server notifies copilot backend (mark-complete)
6. Server returns JSON response
### Target Flow (Interactive)
1. User sends message in UI
2. Store calls `/api/copilot/chat`
3. **Server orchestrates everything**
4. Server forwards events to client via SSE
5. Client just updates UI from events
6. Server returns when complete
---
## Scope & Boundaries
### In Scope
| Item | Description |
|------|-------------|
| Orchestrator module | New module in `lib/copilot/orchestrator/` |
| Headless API route | New route `POST /api/v1/copilot/chat` |
| SSE parsing | Move from store to server |
| Tool execution | Direct function calls on server |
| Event forwarding | SSE to client (interactive mode) |
| Store simplification | Reduce to UI-only logic |
### Out of Scope
| Item | Reason |
|------|--------|
| Copilot backend (Go) | Separate repo, working correctly |
| Tool definitions | Already work, just called differently |
| LLM providers | Handled by copilot backend |
| Subagent system | Handled by copilot backend |
### Boundaries
```
┌─────────────────────────────────────┐
│ MODIFICATION ZONE │
│ │
┌────────────────┼─────────────────────────────────────┼────────────────┐
│ │ │ │
│ UNCHANGED │ apps/sim/ │ UNCHANGED │
│ │ ├── lib/copilot/orchestrator/ │ │
│ copilot/ │ │ └── (NEW) │ apps/sim/ │
│ (Go backend) │ ├── app/api/v1/copilot/ │ tools/ │
│ │ │ └── (NEW) │ (definitions)│
│ │ ├── app/api/copilot/chat/ │ │
│ │ │ └── (MODIFIED) │ │
│ │ └── stores/panel/copilot/ │ │
│ │ └── (SIMPLIFIED) │ │
│ │ │ │
└────────────────┼─────────────────────────────────────┼────────────────┘
│ │
└─────────────────────────────────────┘
```
---
## Module Design
### Directory Structure
```
apps/sim/lib/copilot/orchestrator/
├── index.ts # Main orchestrator function
├── types.ts # Type definitions
├── sse-parser.ts # Parse SSE stream from copilot backend
├── sse-handlers.ts # Handle each SSE event type
├── tool-executor.ts # Execute tools directly (no HTTP)
├── persistence.ts # Database and Redis operations
└── response-builder.ts # Build final response
```
### Module Responsibilities
#### `types.ts`
Defines all types used by the orchestrator:
```typescript
// SSE Events
interface SSEEvent { type, data, subagent?, toolCallId?, toolName? }
type SSEEventType = 'content' | 'tool_call' | 'tool_result' | 'done' | ...
// Tool State
interface ToolCallState { id, name, status, params?, result?, error? }
type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped'
// Streaming Context (internal state during orchestration)
interface StreamingContext {
chatId?, conversationId?, messageId
accumulatedContent, contentBlocks
toolCalls: Map<string, ToolCallState>
streamComplete, errors[]
}
// Orchestrator API
interface OrchestratorRequest { message, workflowId, userId, chatId?, mode?, ... }
interface OrchestratorOptions { autoExecuteTools?, onEvent?, timeout?, ... }
interface OrchestratorResult { success, content, toolCalls[], chatId?, error? }
// Execution Context (passed to tool executors)
interface ExecutionContext { userId, workflowId, workspaceId?, decryptedEnvVars? }
```
#### `sse-parser.ts`
Parses SSE stream into typed events:
```typescript
async function* parseSSEStream(
reader: ReadableStreamDefaultReader,
decoder: TextDecoder,
abortSignal?: AbortSignal
): AsyncGenerator<SSEEvent>
```
- Handles buffering for partial lines
- Parses JSON from `data:` lines
- Yields typed `SSEEvent` objects
- Supports abort signal
#### `sse-handlers.ts`
Handles each SSE event type:
```typescript
const sseHandlers: Record<SSEEventType, SSEHandler> = {
content: (event, context) => { /* append to accumulated content */ },
tool_call: async (event, context, execContext, options) => {
/* track tool, execute if autoExecuteTools */
},
tool_result: (event, context) => { /* update tool status */ },
tool_generating: (event, context) => { /* create pending tool */ },
reasoning: (event, context) => { /* handle thinking blocks */ },
done: (event, context) => { /* mark stream complete */ },
error: (event, context) => { /* record error */ },
// ... etc
}
const subAgentHandlers: Record<SSEEventType, SSEHandler> = {
// Handlers for events within subagent context
}
```
#### `tool-executor.ts`
Executes tools directly without HTTP:
```typescript
// Main entry point
async function executeToolServerSide(
toolCall: ToolCallState,
context: ExecutionContext
): Promise<ToolCallResult>
// Server tools (edit_workflow, search_documentation, etc.)
async function executeServerToolDirect(
toolName: string,
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult>
// Integration tools (slack_send, gmail_read, etc.)
async function executeIntegrationToolDirect(
toolCallId: string,
toolName: string,
toolConfig: ToolConfig,
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult>
// Notify copilot backend (external HTTP - required)
async function markToolComplete(
toolCallId: string,
toolName: string,
status: number,
message?: any,
data?: any
): Promise<boolean>
// Prepare cached context for tool execution
async function prepareExecutionContext(
userId: string,
workflowId: string
): Promise<ExecutionContext>
```
**Key principle**: Internal tool execution uses direct function calls. Only `markToolComplete` makes HTTP call (to copilot backend - external).
#### `persistence.ts`
Database and Redis operations:
```typescript
// Chat persistence
async function createChat(params): Promise<{ id: string }>
async function loadChat(chatId, userId): Promise<Chat | null>
async function saveMessages(chatId, messages, options?): Promise<void>
async function updateChatConversationId(chatId, conversationId): Promise<void>
// Tool confirmation (Redis)
async function setToolConfirmation(toolCallId, status, message?): Promise<boolean>
async function getToolConfirmation(toolCallId): Promise<Confirmation | null>
```
#### `index.ts`
Main orchestrator function:
```typescript
async function orchestrateCopilotRequest(
request: OrchestratorRequest,
options: OrchestratorOptions = {}
): Promise<OrchestratorResult> {
// 1. Prepare execution context (cache env vars, etc.)
const execContext = await prepareExecutionContext(userId, workflowId)
// 2. Handle chat creation/loading
let chatId = await resolveChat(request)
// 3. Build request payload for copilot backend
const payload = buildCopilotPayload(request)
// 4. Call copilot backend
const response = await fetch(COPILOT_URL, { body: JSON.stringify(payload) })
// 5. Create streaming context
const context = createStreamingContext(chatId)
// 6. Parse and handle SSE stream
for await (const event of parseSSEStream(response.body)) {
// Forward to client if attached
options.onEvent?.(event)
// Handle event
const handler = getHandler(event)
await handler(event, context, execContext, options)
if (context.streamComplete) break
}
// 7. Persist to database
await persistChat(chatId, context)
// 8. Build and return result
return buildResult(context)
}
```
---
## Implementation Plan
### Phase 1: Create Orchestrator Module (3-4 days)
**Goal**: Build the orchestrator module that can run independently.
#### Tasks
1. **Create `types.ts`** (~200 lines)
- [ ] Define SSE event types
- [ ] Define tool call state types
- [ ] Define streaming context type
- [ ] Define orchestrator request/response types
- [ ] Define execution context type
2. **Create `sse-parser.ts`** (~80 lines)
- [ ] Extract parsing logic from store.ts
- [ ] Add abort signal support
- [ ] Add error handling
3. **Create `persistence.ts`** (~120 lines)
- [ ] Extract DB operations from chat route
- [ ] Extract Redis operations from confirm route
- [ ] Add chat creation/loading
- [ ] Add message saving
4. **Create `tool-executor.ts`** (~300 lines)
- [ ] Create `executeToolServerSide()` main entry
- [ ] Create `executeServerToolDirect()` for server tools
- [ ] Create `executeIntegrationToolDirect()` for integration tools
- [ ] Create `markToolComplete()` for copilot backend notification
- [ ] Create `prepareExecutionContext()` for caching
- [ ] Handle OAuth token resolution
- [ ] Handle env var resolution
5. **Create `sse-handlers.ts`** (~350 lines)
- [ ] Extract handlers from store.ts
- [ ] Adapt for server-side context
- [ ] Add tool execution integration
- [ ] Add subagent handlers
6. **Create `index.ts`** (~250 lines)
- [ ] Create `orchestrateCopilotRequest()` main function
- [ ] Wire together all modules
- [ ] Add timeout handling
- [ ] Add abort signal support
- [ ] Add event forwarding
#### Deliverables
- Complete `lib/copilot/orchestrator/` module
- Unit tests for each component
- Integration test for full orchestration
### Phase 2: Create Headless API Route (1 day)
**Goal**: Create API endpoint for headless copilot access.
#### Tasks
1. **Create route** `app/api/v1/copilot/chat/route.ts` (~100 lines)
- [ ] Add API key authentication
- [ ] Parse and validate request
- [ ] Call orchestrator
- [ ] Return JSON response
2. **Add to API documentation**
- [ ] Document request format
- [ ] Document response format
- [ ] Document error codes
#### Deliverables
- Working `POST /api/v1/copilot/chat` endpoint
- API documentation
- E2E test
### Phase 3: Wire Interactive Route (2 days)
**Goal**: Use orchestrator for existing interactive flow.
#### Tasks
1. **Modify `/api/copilot/chat/route.ts`**
- [ ] Add feature flag for new vs old flow
- [ ] Call orchestrator with `onEvent` callback
- [ ] Forward events to client via SSE
- [ ] Maintain backward compatibility
2. **Test both flows**
- [ ] Verify interactive works with new orchestrator
- [ ] Verify old flow still works (feature flag off)
#### Deliverables
- Interactive route using orchestrator
- Feature flag for gradual rollout
- No breaking changes
### Phase 4: Simplify Client Store (2-3 days)
**Goal**: Remove orchestration logic from client, keep UI-only.
#### Tasks
1. **Create simplified store** (new file or gradual refactor)
- [ ] Keep: UI state, messages, tool display
- [ ] Keep: Simple API calls
- [ ] Keep: Event listener
- [ ] Remove: SSE parsing
- [ ] Remove: Tool execution logic
- [ ] Remove: Client tool instantiators
2. **Update components**
- [ ] Update components to use simplified store
- [ ] Remove tool execution from UI components
- [ ] Simplify tool display components
3. **Remove dead code**
- [ ] Remove unused imports
- [ ] Remove unused helper functions
- [ ] Remove client tool classes (if no longer needed)
#### Deliverables
- Simplified store (~600 lines)
- Updated components
- Reduced bundle size
### Phase 5: Testing & Polish (2-3 days)
#### Tasks
1. **E2E testing**
- [ ] Test headless API with various prompts
- [ ] Test interactive with various prompts
- [ ] Test tool execution scenarios
- [ ] Test error handling
- [ ] Test abort/timeout scenarios
2. **Performance testing**
- [ ] Compare latency (old vs new)
- [ ] Check memory usage
- [ ] Check for connection issues
3. **Documentation**
- [ ] Update developer docs
- [ ] Add architecture diagram
- [ ] Document new API
#### Deliverables
- Comprehensive test suite
- Performance benchmarks
- Complete documentation
---
## API Contracts
### Headless API
#### Request
```http
POST /api/v1/copilot/chat
Content-Type: application/json
X-API-Key: sim_xxx
{
"message": "Create a Slack notification workflow",
"workflowId": "wf_abc123",
"chatId": "chat_xyz", // Optional: continue existing chat
"mode": "agent", // Optional: "agent" | "ask" | "plan"
"model": "claude-4-sonnet", // Optional
"autoExecuteTools": true, // Optional: default true
"timeout": 300000 // Optional: default 5 minutes
}
```
#### Response (Success)
```json
{
"success": true,
"content": "I've created a Slack notification workflow that...",
"toolCalls": [
{
"id": "tc_001",
"name": "search_patterns",
"status": "success",
"params": { "query": "slack notification" },
"result": { "patterns": [...] },
"durationMs": 234
},
{
"id": "tc_002",
"name": "edit_workflow",
"status": "success",
"params": { "operations": [...] },
"result": { "blocksAdded": 3 },
"durationMs": 1523
}
],
"chatId": "chat_xyz",
"conversationId": "conv_123"
}
```
#### Response (Error)
```json
{
"success": false,
"error": "Workflow not found",
"content": "",
"toolCalls": []
}
```
#### Error Codes
| Status | Error | Description |
|--------|-------|-------------|
| 400 | Invalid request | Missing required fields |
| 401 | Unauthorized | Invalid or missing API key |
| 404 | Workflow not found | Workflow ID doesn't exist |
| 500 | Internal error | Server-side failure |
| 504 | Timeout | Request exceeded timeout |
### Interactive API (Existing - Modified)
The existing `/api/copilot/chat` endpoint continues to work but now uses the orchestrator internally. SSE events forwarded to client remain the same format.
---
## Migration Strategy
### Rollout Plan
```
Week 1: Phase 1 (Orchestrator)
├── Day 1-2: Types + SSE Parser
├── Day 3: Tool Executor
└── Day 4-5: Handlers + Main Orchestrator
Week 2: Phase 2-3 (Routes)
├── Day 1: Headless API route
├── Day 2-3: Wire interactive route
└── Day 4-5: Testing both modes
Week 3: Phase 4-5 (Cleanup)
├── Day 1-3: Simplify store
├── Day 4: Testing
└── Day 5: Documentation
```
### Feature Flags
```typescript
// lib/copilot/config.ts
export const COPILOT_FLAGS = {
// Use new orchestrator for interactive mode
USE_SERVER_ORCHESTRATOR: process.env.COPILOT_USE_SERVER_ORCHESTRATOR === 'true',
// Enable headless API
ENABLE_HEADLESS_API: process.env.COPILOT_ENABLE_HEADLESS_API === 'true',
}
```
### Rollback Plan
If issues arise:
1. Set `COPILOT_USE_SERVER_ORCHESTRATOR=false`
2. Interactive mode falls back to old client-side flow
3. Headless API returns 503 Service Unavailable
---
## Testing Strategy
### Unit Tests
```
lib/copilot/orchestrator/
├── __tests__/
│ ├── sse-parser.test.ts
│ ├── sse-handlers.test.ts
│ ├── tool-executor.test.ts
│ ├── persistence.test.ts
│ └── index.test.ts
```
#### SSE Parser Tests
```typescript
describe('parseSSEStream', () => {
it('parses content events')
it('parses tool_call events')
it('handles partial lines')
it('handles malformed JSON')
it('respects abort signal')
})
```
#### Tool Executor Tests
```typescript
describe('executeToolServerSide', () => {
it('executes server tools directly')
it('executes integration tools with OAuth')
it('resolves env var references')
it('handles tool not found')
it('handles execution errors')
})
```
### Integration Tests
```typescript
describe('orchestrateCopilotRequest', () => {
it('handles simple message without tools')
it('handles message with single tool call')
it('handles message with multiple tool calls')
it('handles subagent tool calls')
it('handles stream errors')
it('respects timeout')
it('forwards events to callback')
})
```
### E2E Tests
```typescript
describe('POST /api/v1/copilot/chat', () => {
it('returns 401 without API key')
it('returns 400 with invalid request')
it('executes simple ask query')
it('executes workflow modification')
it('handles tool execution')
})
```
---
## Risks & Mitigations
### Risk 1: Breaking Interactive Mode
**Risk**: Refactoring could break existing interactive copilot.
**Mitigation**:
- Feature flag for gradual rollout
- Keep old code path available
- Extensive E2E testing
- Staged deployment (internal → beta → production)
### Risk 2: Tool Execution Differences
**Risk**: Tool behavior differs between client and server execution.
**Mitigation**:
- Reuse existing tool execution logic (same functions)
- Compare outputs in parallel testing
- Log discrepancies for investigation
### Risk 3: Performance Regression
**Risk**: Server-side orchestration could be slower.
**Mitigation**:
- Actually should be faster (no browser round-trips)
- Benchmark before/after
- Profile critical paths
### Risk 4: Memory Usage
**Risk**: Server accumulates state during long-running requests.
**Mitigation**:
- Set reasonable timeouts
- Clean up context after request
- Monitor memory in production
### Risk 5: Connection Issues
**Risk**: Long-running SSE connections could drop.
**Mitigation**:
- Implement reconnection logic
- Save checkpoints to resume
- Handle partial completions gracefully
---
## File Inventory
### New Files
| File | Lines | Description |
|------|-------|-------------|
| `lib/copilot/orchestrator/types.ts` | ~200 | Type definitions |
| `lib/copilot/orchestrator/sse-parser.ts` | ~80 | SSE stream parsing |
| `lib/copilot/orchestrator/sse-handlers.ts` | ~350 | Event handlers |
| `lib/copilot/orchestrator/tool-executor.ts` | ~300 | Tool execution |
| `lib/copilot/orchestrator/persistence.ts` | ~120 | DB/Redis operations |
| `lib/copilot/orchestrator/index.ts` | ~250 | Main orchestrator |
| `app/api/v1/copilot/chat/route.ts` | ~100 | Headless API |
| **Total New** | **~1,400** | |
### Modified Files
| File | Change |
|------|--------|
| `app/api/copilot/chat/route.ts` | Use orchestrator (optional) |
| `stores/panel/copilot/store.ts` | Simplify to ~600 lines |
### Deleted Code (from store.ts)
| Section | Lines Removed |
|---------|---------------|
| SSE parsing logic | ~150 |
| `sseHandlers` object | ~750 |
| `subAgentSSEHandlers` | ~280 |
| Tool execution logic | ~400 |
| Client tool instantiators | ~120 |
| Content block helpers | ~200 |
| Streaming context | ~100 |
| **Total Removed** | **~2,000** |
### Net Change
```
New code: +1,400 lines (orchestrator module)
Removed code: -2,000 lines (from store)
Modified code: ~200 lines (route changes)
───────────────────────────────────────
Net change: -400 lines (cleaner, more maintainable)
```
---
## Appendix: Code Extraction Map
### From `stores/panel/copilot/store.ts`
| Source Lines | Destination | Notes |
|--------------|-------------|-------|
| 900-1050 (parseSSEStream) | `sse-parser.ts` | Adapt for server |
| 1120-1867 (sseHandlers) | `sse-handlers.ts` | Remove Zustand deps |
| 1940-2217 (subAgentSSEHandlers) | `sse-handlers.ts` | Merge with above |
| 1365-1583 (tool execution) | `tool-executor.ts` | Direct calls |
| 330-380 (StreamingContext) | `types.ts` | Clean up |
| 3328-3648 (handleStreamingResponse) | `index.ts` | Main loop |
### From `app/api/copilot/execute-tool/route.ts`
| Source Lines | Destination | Notes |
|--------------|-------------|-------|
| 30-247 (POST handler) | `tool-executor.ts` | Extract core logic |
### From `app/api/copilot/confirm/route.ts`
| Source Lines | Destination | Notes |
|--------------|-------------|-------|
| 28-89 (updateToolCallStatus) | `persistence.ts` | Redis operations |
---
## Approval & Sign-off
- [ ] Technical review complete
- [ ] Security review complete
- [ ] Performance impact assessed
- [ ] Rollback plan approved
- [ ] Testing plan approved
---
*Document created: January 2026*
*Last updated: January 2026*