Compare commits

...

10 Commits

Author SHA1 Message Date
Siddharth Ganesan  1ddcebfe57  BROKEN        2026-02-03 15:32:40 -08:00
Siddharth Ganesan  370adf362c  Fix           2026-02-03 12:56:57 -08:00
Siddharth Ganesan  3b2b411a6a  Improvement   2026-01-31 16:00:53 -08:00
Siddharth Ganesan  034819a7c1  mcp v1        2026-01-31 12:26:06 -08:00
Siddharth Ganesan  3d57125993  Add mcp       2026-01-31 11:38:26 -08:00
Siddharth Ganesan  c9e182216e  Stuff         2026-01-30 17:01:15 -08:00
Siddharth Ganesan  f00c710d58  Ss tests      2026-01-30 16:52:23 -08:00
Siddharth Ganesan  e37c33eb9f  Basic ss tes  2026-01-30 16:24:27 -08:00
Siddharth Ganesan  eedcde0ce1  v1            2026-01-30 12:15:41 -08:00
Siddharth Ganesan  deccca0276  v0            2026-01-30 11:33:08 -08:00
21 changed files with 5217 additions and 1004 deletions

View File

@@ -7,7 +7,7 @@ import { z } from 'zod'
 import { getSession } from '@/lib/auth'
 import { generateChatTitle } from '@/lib/copilot/chat-title'
 import { getCopilotModel } from '@/lib/copilot/config'
-import { SIM_AGENT_API_URL_DEFAULT, SIM_AGENT_VERSION } from '@/lib/copilot/constants'
+import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
 import { COPILOT_MODEL_IDS, COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
 import {
 authenticateCopilotRequestSessionOnly,
@@ -21,12 +21,13 @@ import type { CopilotProviderConfig } from '@/lib/copilot/types'
 import { env } from '@/lib/core/config/env'
 import { CopilotFiles } from '@/lib/uploads'
 import { createFileContent } from '@/lib/uploads/utils/file-utils'
+import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
 import { tools } from '@/tools/registry'
 import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
+import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
 const logger = createLogger('CopilotChatAPI')
-const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
 const FileAttachmentSchema = z.object({
 id: z.string(),
@@ -40,7 +41,8 @@ const ChatMessageSchema = z.object({
 message: z.string().min(1, 'Message is required'),
 userMessageId: z.string().optional(), // ID from frontend for the user message
 chatId: z.string().optional(),
-workflowId: z.string().min(1, 'Workflow ID is required'),
+workflowId: z.string().optional(),
+workflowName: z.string().optional(),
 model: z.enum(COPILOT_MODEL_IDS).optional().default('claude-4.5-opus'),
 mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'),
 prefetch: z.boolean().optional(),
@@ -100,7 +102,8 @@ export async function POST(req: NextRequest) {
 message,
 userMessageId,
 chatId,
-workflowId,
+workflowId: providedWorkflowId,
+workflowName,
 model,
 mode,
 prefetch,
@@ -113,6 +116,20 @@ export async function POST(req: NextRequest) {
 contexts,
 commands,
 } = ChatMessageSchema.parse(body)
+// Resolve workflowId - if not provided, use first workflow or find by name
+const resolved = await resolveWorkflowIdForUser(
+authenticatedUserId,
+providedWorkflowId,
+workflowName
+)
+if (!resolved) {
+return createBadRequestResponse(
+'No workflows found. Create a workflow first or provide a valid workflowId.'
+)
+}
+const workflowId = resolved.workflowId
 // Ensure we have a consistent user message ID for this request
 const userMessageIdToUse = userMessageId || crypto.randomUUID()
 try {
@@ -465,77 +482,19 @@ export async function POST(req: NextRequest) {
 })
 } catch {}
-const simAgentResponse = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
-method: 'POST',
-headers: {
-'Content-Type': 'application/json',
-...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
-},
-body: JSON.stringify(requestPayload),
-})
-if (!simAgentResponse.ok) {
-if (simAgentResponse.status === 401 || simAgentResponse.status === 402) {
-// Rethrow status only; client will render appropriate assistant message
-return new NextResponse(null, { status: simAgentResponse.status })
-}
-const errorText = await simAgentResponse.text().catch(() => '')
-logger.error(`[${tracker.requestId}] Sim agent API error:`, {
-status: simAgentResponse.status,
-error: errorText,
-})
-return NextResponse.json(
-{ error: `Sim agent API error: ${simAgentResponse.statusText}` },
-{ status: simAgentResponse.status }
-)
-}
-// If streaming is requested, forward the stream and update chat later
-if (stream && simAgentResponse.body) {
-// Create user message to save
-const userMessage = {
-id: userMessageIdToUse, // Consistent ID used for request and persistence
-role: 'user',
-content: message,
-timestamp: new Date().toISOString(),
-...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
-...(Array.isArray(contexts) && contexts.length > 0 && { contexts }),
-...(Array.isArray(contexts) &&
-contexts.length > 0 && {
-contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }],
-}),
-}
-// Create a pass-through stream that captures the response
+if (stream) {
 const transformedStream = new ReadableStream({
 async start(controller) {
 const encoder = new TextEncoder()
-let assistantContent = ''
-const toolCalls: any[] = []
-let buffer = ''
-const isFirstDone = true
-let responseIdFromStart: string | undefined
-let responseIdFromDone: string | undefined
-// Track tool call progress to identify a safe done event
-const announcedToolCallIds = new Set<string>()
-const startedToolExecutionIds = new Set<string>()
-const completedToolExecutionIds = new Set<string>()
-let lastDoneResponseId: string | undefined
-let lastSafeDoneResponseId: string | undefined
-// Send chatId as first event
 if (actualChatId) {
-const chatIdEvent = `data: ${JSON.stringify({
-type: 'chat_id',
-chatId: actualChatId,
-})}\n\n`
-controller.enqueue(encoder.encode(chatIdEvent))
-logger.debug(`[${tracker.requestId}] Sent initial chatId event to client`)
+controller.enqueue(
+encoder.encode(
+`data: ${JSON.stringify({ type: 'chat_id', chatId: actualChatId })}\n\n`
+)
+)
 }
-// Start title generation in parallel if needed
 if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
 generateChatTitle(message)
 .then(async (title) => {
@@ -547,311 +506,61 @@ export async function POST(req: NextRequest) {
updatedAt: new Date(), updatedAt: new Date(),
}) })
.where(eq(copilotChats.id, actualChatId!)) .where(eq(copilotChats.id, actualChatId!))
controller.enqueue(
const titleEvent = `data: ${JSON.stringify({ encoder.encode(`data: ${JSON.stringify({ type: 'title_updated', title })}\n\n`)
type: 'title_updated', )
title: title,
})}\n\n`
controller.enqueue(encoder.encode(titleEvent))
logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
} }
}) })
.catch((error) => { .catch((error) => {
logger.error(`[${tracker.requestId}] Title generation failed:`, error) logger.error(`[${tracker.requestId}] Title generation failed:`, error)
}) })
} else {
logger.debug(`[${tracker.requestId}] Skipping title generation`)
} }
// Forward the sim agent stream and capture assistant response
const reader = simAgentResponse.body!.getReader()
const decoder = new TextDecoder()
try { try {
while (true) { const result = await orchestrateCopilotStream(requestPayload, {
const { done, value } = await reader.read() userId: authenticatedUserId,
if (done) { workflowId,
break chatId: actualChatId,
} autoExecuteTools: true,
interactive: true,
// Decode and parse SSE events for logging and capturing content onEvent: async (event) => {
const decodedChunk = decoder.decode(value, { stream: true })
buffer += decodedChunk
const lines = buffer.split('\n')
buffer = lines.pop() || '' // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim() === '') continue // Skip empty lines
if (line.startsWith('data: ') && line.length > 6) {
try {
const jsonStr = line.slice(6)
// Check if the JSON string is unusually large (potential streaming issue)
if (jsonStr.length > 50000) {
// 50KB limit
logger.warn(`[${tracker.requestId}] Large SSE event detected`, {
size: jsonStr.length,
preview: `${jsonStr.substring(0, 100)}...`,
})
}
const event = JSON.parse(jsonStr)
// Log different event types comprehensively
switch (event.type) {
case 'content':
if (event.data) {
assistantContent += event.data
}
break
case 'reasoning':
logger.debug(
`[${tracker.requestId}] Reasoning chunk received (${(event.data || event.content || '').length} chars)`
)
break
case 'tool_call':
if (!event.data?.partial) {
toolCalls.push(event.data)
if (event.data?.id) {
announcedToolCallIds.add(event.data.id)
}
}
break
case 'tool_generating':
if (event.toolCallId) {
startedToolExecutionIds.add(event.toolCallId)
}
break
case 'tool_result':
if (event.toolCallId) {
completedToolExecutionIds.add(event.toolCallId)
}
break
case 'tool_error':
logger.error(`[${tracker.requestId}] Tool error:`, {
toolCallId: event.toolCallId,
toolName: event.toolName,
error: event.error,
success: event.success,
})
if (event.toolCallId) {
completedToolExecutionIds.add(event.toolCallId)
}
break
case 'start':
if (event.data?.responseId) {
responseIdFromStart = event.data.responseId
}
break
case 'done':
if (event.data?.responseId) {
responseIdFromDone = event.data.responseId
lastDoneResponseId = responseIdFromDone
// Mark this done as safe only if no tool call is currently in progress or pending
const announced = announcedToolCallIds.size
const completed = completedToolExecutionIds.size
const started = startedToolExecutionIds.size
const hasToolInProgress = announced > completed || started > completed
if (!hasToolInProgress) {
lastSafeDoneResponseId = responseIdFromDone
}
}
break
case 'error':
break
default:
}
// Emit to client: rewrite 'error' events into user-friendly assistant message
if (event?.type === 'error') {
try {
const displayMessage: string =
(event?.data && (event.data.displayMessage as string)) ||
'Sorry, I encountered an error. Please try again.'
const formatted = `_${displayMessage}_`
// Accumulate so it persists to DB as assistant content
assistantContent += formatted
// Send as content chunk
try {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
)
)
} catch (enqueueErr) {
reader.cancel()
break
}
// Then close this response cleanly for the client
try {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
)
} catch (enqueueErr) {
reader.cancel()
break
}
} catch {}
// Do not forward the original error event
} else {
// Forward original event to client
try {
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
} catch (enqueueErr) {
reader.cancel()
break
}
}
} catch (e) {
// Enhanced error handling for large payloads and parsing issues
const lineLength = line.length
const isLargePayload = lineLength > 10000
if (isLargePayload) {
logger.error(
`[${tracker.requestId}] Failed to parse large SSE event (${lineLength} chars)`,
{
error: e,
preview: `${line.substring(0, 200)}...`,
size: lineLength,
}
)
} else {
logger.warn(
`[${tracker.requestId}] Failed to parse SSE event: "${line.substring(0, 200)}..."`,
e
)
}
}
} else if (line.trim() && line !== 'data: [DONE]') {
logger.debug(`[${tracker.requestId}] Non-SSE line from sim agent: "${line}"`)
}
}
}
// Process any remaining buffer
if (buffer.trim()) {
logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`)
if (buffer.startsWith('data: ')) {
try { try {
const jsonStr = buffer.slice(6) controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`))
const event = JSON.parse(jsonStr) } catch {
if (event.type === 'content' && event.data) { controller.error('Failed to forward SSE event')
assistantContent += event.data
}
// Forward remaining event, applying same error rewrite behavior
if (event?.type === 'error') {
const displayMessage: string =
(event?.data && (event.data.displayMessage as string)) ||
'Sorry, I encountered an error. Please try again.'
const formatted = `_${displayMessage}_`
assistantContent += formatted
try {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
)
)
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
)
} catch (enqueueErr) {
reader.cancel()
}
} else {
try {
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
} catch (enqueueErr) {
reader.cancel()
}
}
} catch (e) {
logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`)
} }
} },
}
// Log final streaming summary
logger.info(`[${tracker.requestId}] Streaming complete summary:`, {
totalContentLength: assistantContent.length,
toolCallsCount: toolCalls.length,
hasContent: assistantContent.length > 0,
toolNames: toolCalls.map((tc) => tc?.name).filter(Boolean),
}) })
// NOTE: Messages are saved by the client via update-messages endpoint with full contentBlocks. if (currentChat && result.conversationId) {
// Server only updates conversationId here to avoid overwriting client's richer save. await db
if (currentChat) { .update(copilotChats)
// Persist only a safe conversationId to avoid continuing from a state that expects tool outputs .set({
const previousConversationId = currentChat?.conversationId as string | undefined updatedAt: new Date(),
const responseId = lastSafeDoneResponseId || previousConversationId || undefined conversationId: result.conversationId,
})
if (responseId) { .where(eq(copilotChats.id, actualChatId!))
await db
.update(copilotChats)
.set({
updatedAt: new Date(),
conversationId: responseId,
})
.where(eq(copilotChats.id, actualChatId!))
logger.info(
`[${tracker.requestId}] Updated conversationId for chat ${actualChatId}`,
{
updatedConversationId: responseId,
}
)
}
} }
} catch (error) { } catch (error) {
logger.error(`[${tracker.requestId}] Error processing stream:`, error) logger.error(`[${tracker.requestId}] Orchestration error:`, error)
controller.enqueue(
// Send an error event to the client before closing so it knows what happened encoder.encode(
try { `data: ${JSON.stringify({
const errorMessage = type: 'error',
error instanceof Error && error.message === 'terminated' data: {
? 'Connection to AI service was interrupted. Please try again.' displayMessage:
: 'An unexpected error occurred while processing the response.' 'An unexpected error occurred while processing the response.',
const encoder = new TextEncoder() },
})}\n\n`
// Send error as content so it shows in the chat
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n`
)
) )
// Send done event to properly close the stream on client )
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`))
} catch (enqueueError) {
// Stream might already be closed, that's ok
logger.warn(
`[${tracker.requestId}] Could not send error event to client:`,
enqueueError
)
}
} finally { } finally {
try { controller.close()
controller.close()
} catch {
// Controller might already be closed
}
} }
}, },
}) })
const response = new Response(transformedStream, { return new Response(transformedStream, {
headers: { headers: {
'Content-Type': 'text/event-stream', 'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache', 'Cache-Control': 'no-cache',
@@ -859,43 +568,31 @@ export async function POST(req: NextRequest) {
 'X-Accel-Buffering': 'no',
 },
 })
-logger.info(`[${tracker.requestId}] Returning streaming response to client`, {
-duration: tracker.getDuration(),
-chatId: actualChatId,
-headers: {
-'Content-Type': 'text/event-stream',
-'Cache-Control': 'no-cache',
-Connection: 'keep-alive',
-},
-})
-return response
 }
-// For non-streaming responses
-const responseData = await simAgentResponse.json()
-logger.info(`[${tracker.requestId}] Non-streaming response from sim agent:`, {
+const nonStreamingResult = await orchestrateCopilotStream(requestPayload, {
+userId: authenticatedUserId,
+workflowId,
+chatId: actualChatId,
+autoExecuteTools: true,
+interactive: true,
+})
+const responseData = {
+content: nonStreamingResult.content,
+toolCalls: nonStreamingResult.toolCalls,
+model: selectedModel,
+provider: providerConfig?.provider || env.COPILOT_PROVIDER || 'openai',
+}
+logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, {
 hasContent: !!responseData.content,
 contentLength: responseData.content?.length || 0,
 model: responseData.model,
 provider: responseData.provider,
 toolCallsCount: responseData.toolCalls?.length || 0,
-hasTokens: !!responseData.tokens,
 })
-// Log tool calls if present
-if (responseData.toolCalls?.length > 0) {
-responseData.toolCalls.forEach((toolCall: any) => {
-logger.info(`[${tracker.requestId}] Tool call in response:`, {
-id: toolCall.id,
-name: toolCall.name,
-success: toolCall.success,
-result: `${JSON.stringify(toolCall.result).substring(0, 200)}...`,
-})
-})
-}
 // Save messages if we have a chat
 if (currentChat && responseData.content) {
 const userMessage = {
@@ -947,6 +644,9 @@ export async function POST(req: NextRequest) {
 .set({
 messages: updatedMessages,
 updatedAt: new Date(),
+...(nonStreamingResult.conversationId
+? { conversationId: nonStreamingResult.conversationId }
+: {}),
 })
 .where(eq(copilotChats.id, actualChatId!))
 }

View File

@@ -0,0 +1,411 @@
import {
type CallToolResult,
ErrorCode,
type InitializeResult,
isJSONRPCNotification,
isJSONRPCRequest,
type JSONRPCError,
type JSONRPCMessage,
type JSONRPCResponse,
type ListToolsResult,
type RequestId,
} from '@modelcontextprotocol/sdk/types.js'
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getCopilotModel } from '@/lib/copilot/config'
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
import { DIRECT_TOOL_DEFS, SUBAGENT_TOOL_DEFS } from '@/lib/copilot/tools/mcp/definitions'
import { executeToolServerSide, prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
const logger = createLogger('CopilotMcpAPI')
export const dynamic = 'force-dynamic'
/**
* MCP Server instructions that guide LLMs on how to use the Sim copilot tools.
* This is included in the initialize response to help external LLMs understand
* the workflow lifecycle and best practices.
*/
const MCP_SERVER_INSTRUCTIONS = `
## Sim Workflow Copilot - Usage Guide
You are interacting with Sim's workflow automation platform. These tools orchestrate specialized AI agents that build workflows. Follow these guidelines carefully.
---
## Platform Knowledge
Sim is a workflow automation platform. Workflows are visual pipelines of blocks.
### Block Types
**Core Logic:**
- **Agent** - The heart of Sim (LLM block with tools, memory, structured output, knowledge bases)
- **Function** - JavaScript code execution
- **Condition** - If/else branching
- **Router** - AI-powered content-based routing
- **Loop** - While/do-while iteration
- **Parallel** - Simultaneous execution
- **API** - HTTP requests
**Integrations (3rd Party):**
- OAuth: Slack, Gmail, Google Calendar, Sheets, Outlook, Linear, GitHub, Notion
- API: Stripe, Twilio, SendGrid, any REST API
### The Agent Block
The Agent block is the core of intelligent workflows:
- **Tools** - Add integrations, custom tools, web search to give it capabilities
- **Memory** - Multi-turn conversations with persistent context
- **Structured Output** - JSON schema for reliable parsing
- **Knowledge Bases** - RAG-powered document retrieval
**Design principle:** Put tools INSIDE agents rather than using standalone tool blocks.
### Triggers
| Type | Description |
|------|-------------|
| Manual/Chat | User sends message in UI (start block: input, files, conversationId) |
| API | REST endpoint with custom input schema |
| Webhook | External services POST to trigger URL |
| Schedule | Cron-based (hourly, daily, weekly) |
### Deployments
| Type | Trigger | Use Case |
|------|---------|----------|
| API | Start block | REST endpoint for programmatic access |
| Chat | Start block | Managed chat UI with auth options |
| MCP | Start block | Expose as MCP tool for AI agents |
| General | Schedule/Webhook | Activate triggers to run automatically |
**Undeployed workflows only run in the builder UI.**
### Variable Syntax
Reference outputs from previous blocks: \`<blockname.field>\`
Reference environment variables: \`{{ENV_VAR_NAME}}\`
Rules:
- Block names must be lowercase, no spaces, no special characters
- Use dot notation for nested fields: \`<blockname.field.subfield>\`
---
## Workflow Lifecycle
1. **Create**: For NEW workflows, FIRST call create_workflow to get a workflowId
2. **Plan**: Use copilot_plan with the workflowId to plan the workflow
3. **Edit**: Use copilot_edit with the workflowId AND the plan to build the workflow
4. **Deploy**: ALWAYS deploy after building using copilot_deploy before testing/running
5. **Test**: Use copilot_test to verify the workflow works correctly
6. **Share**: Provide the user with the workflow URL after completion
---
## CRITICAL: Always Pass workflowId
- For NEW workflows: Call create_workflow FIRST, then use the returned workflowId
- For EXISTING workflows: Pass the workflowId to all copilot tools
- copilot_plan, copilot_edit, copilot_deploy, copilot_test, copilot_debug all REQUIRE workflowId
---
## CRITICAL: How to Handle Plans
The copilot_plan tool returns a structured plan object. You MUST:
1. **Do NOT modify the plan**: Pass the plan object EXACTLY as returned to copilot_edit
2. **Do NOT interpret or summarize the plan**: The edit agent needs the raw plan data
3. **Pass the plan in the context.plan field**: \`{ "context": { "plan": <plan_object> } }\`
4. **Include ALL plan data**: Block configurations, connections, credentials, everything
Example flow:
\`\`\`
1. copilot_plan({ request: "build a workflow...", workflowId: "abc123" })
-> Returns: { "plan": { "blocks": [...], "connections": [...], ... } }
2. copilot_edit({
workflowId: "abc123",
message: "Execute the plan",
context: { "plan": <EXACT plan object from step 1> }
})
\`\`\`
**Why this matters**: The plan contains technical details (block IDs, field mappings, API schemas) that the edit agent needs verbatim. Summarizing or rephrasing loses critical information.
---
## CRITICAL: Error Handling
**If the user says "doesn't work", "broke", "failed", "error" → ALWAYS use copilot_debug FIRST.**
Don't guess. Don't plan. Debug first to find the actual problem.
---
## Important Rules
- ALWAYS deploy a workflow before attempting to run or test it
- Workflows must be deployed to have an "active deployment" for execution
- After building, call copilot_deploy with the appropriate deployment type (api, chat, or mcp)
- Return the workflow URL to the user so they can access it in Sim
---
## Quick Operations (use direct tools)
- list_workflows, list_workspaces, list_folders, get_workflow: Fast database queries
- create_workflow: Create new workflow and get workflowId (CALL THIS FIRST for new workflows)
- create_folder: Create new resources
## Workflow Building (use copilot tools)
- copilot_plan: Plan workflow changes (REQUIRES workflowId) - returns a plan object
- copilot_edit: Execute the plan (REQUIRES workflowId AND plan from copilot_plan)
- copilot_deploy: Deploy workflows (REQUIRES workflowId)
- copilot_test: Test workflow execution (REQUIRES workflowId)
- copilot_debug: Diagnose errors (REQUIRES workflowId) - USE THIS FIRST for issues
`
function createResponse(id: RequestId, result: unknown): JSONRPCResponse {
return {
jsonrpc: '2.0',
id,
result: result as JSONRPCResponse['result'],
}
}
function createError(id: RequestId, code: ErrorCode | number, message: string): JSONRPCError {
return {
jsonrpc: '2.0',
id,
error: { code, message },
}
}
export async function GET() {
return NextResponse.json({
name: 'copilot-subagents',
version: '1.0.0',
protocolVersion: '2024-11-05',
capabilities: { tools: {} },
})
}
export async function POST(request: NextRequest) {
try {
const auth = await checkHybridAuth(request, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const body = (await request.json()) as JSONRPCMessage
if (isJSONRPCNotification(body)) {
return new NextResponse(null, { status: 202 })
}
if (!isJSONRPCRequest(body)) {
return NextResponse.json(
createError(0, ErrorCode.InvalidRequest, 'Invalid JSON-RPC message'),
{ status: 400 }
)
}
const { id, method, params } = body
switch (method) {
case 'initialize': {
const result: InitializeResult = {
protocolVersion: '2024-11-05',
capabilities: { tools: {} },
serverInfo: { name: 'sim-copilot', version: '1.0.0' },
instructions: MCP_SERVER_INSTRUCTIONS,
}
return NextResponse.json(createResponse(id, result))
}
case 'ping':
return NextResponse.json(createResponse(id, {}))
case 'tools/list':
return handleToolsList(id)
case 'tools/call':
return handleToolsCall(
id,
params as { name: string; arguments?: Record<string, unknown> },
auth.userId
)
default:
return NextResponse.json(
createError(id, ErrorCode.MethodNotFound, `Method not found: ${method}`),
{ status: 404 }
)
}
} catch (error) {
logger.error('Error handling MCP request', { error })
return NextResponse.json(createError(0, ErrorCode.InternalError, 'Internal error'), {
status: 500,
})
}
}
async function handleToolsList(id: RequestId): Promise<NextResponse> {
const directTools = DIRECT_TOOL_DEFS.map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
}))
const subagentTools = SUBAGENT_TOOL_DEFS.map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
}))
const result: ListToolsResult = {
tools: [...directTools, ...subagentTools],
}
return NextResponse.json(createResponse(id, result))
}
async function handleToolsCall(
id: RequestId,
params: { name: string; arguments?: Record<string, unknown> },
userId: string
): Promise<NextResponse> {
const args = params.arguments || {}
// Check if this is a direct tool (fast, no LLM)
const directTool = DIRECT_TOOL_DEFS.find((tool) => tool.name === params.name)
if (directTool) {
return handleDirectToolCall(id, directTool, args, userId)
}
// Check if this is a subagent tool (uses LLM orchestration)
const subagentTool = SUBAGENT_TOOL_DEFS.find((tool) => tool.name === params.name)
if (subagentTool) {
return handleSubagentToolCall(id, subagentTool, args, userId)
}
return NextResponse.json(
createError(id, ErrorCode.MethodNotFound, `Tool not found: ${params.name}`),
{ status: 404 }
)
}
async function handleDirectToolCall(
id: RequestId,
toolDef: (typeof DIRECT_TOOL_DEFS)[number],
args: Record<string, unknown>,
userId: string
): Promise<NextResponse> {
try {
const execContext = await prepareExecutionContext(userId, (args.workflowId as string) || '')
const toolCall = {
id: crypto.randomUUID(),
name: toolDef.toolId,
status: 'pending' as const,
params: args as Record<string, any>,
startTime: Date.now(),
}
const result = await executeToolServerSide(toolCall, execContext)
const response: CallToolResult = {
content: [
{
type: 'text',
text: JSON.stringify(result.output ?? result, null, 2),
},
],
isError: !result.success,
}
return NextResponse.json(createResponse(id, response))
} catch (error) {
logger.error('Direct tool execution failed', { tool: toolDef.name, error })
return NextResponse.json(
createError(id, ErrorCode.InternalError, `Tool execution failed: ${error}`),
{ status: 500 }
)
}
}
async function handleSubagentToolCall(
id: RequestId,
toolDef: (typeof SUBAGENT_TOOL_DEFS)[number],
args: Record<string, unknown>,
userId: string
): Promise<NextResponse> {
const requestText =
(args.request as string) ||
(args.message as string) ||
(args.error as string) ||
JSON.stringify(args)
const context = (args.context as Record<string, unknown>) || {}
if (args.plan && !context.plan) {
context.plan = args.plan
}
const { model } = getCopilotModel('chat')
const result = await orchestrateSubagentStream(
toolDef.agentId,
{
message: requestText,
workflowId: args.workflowId,
workspaceId: args.workspaceId,
context,
model,
// Signal to the copilot backend that this is a headless request
// so it can enforce workflowId requirements on tools
headless: true,
},
{
userId,
workflowId: args.workflowId as string | undefined,
workspaceId: args.workspaceId as string | undefined,
}
)
// When a respond tool (plan_respond, edit_respond, etc.) was used,
// return only the structured result - not the full result with all internal tool calls.
// This provides clean output for MCP consumers.
let responseData: unknown
if (result.structuredResult) {
responseData = {
success: result.structuredResult.success ?? result.success,
type: result.structuredResult.type,
summary: result.structuredResult.summary,
data: result.structuredResult.data,
}
} else if (result.error) {
responseData = {
success: false,
error: result.error,
errors: result.errors,
}
} else {
// Fallback: return content if no structured result
responseData = {
success: result.success,
content: result.content,
}
}
const response: CallToolResult = {
content: [
{
type: 'text',
text: JSON.stringify(responseData, null, 2),
},
],
isError: !result.success,
}
return NextResponse.json(createResponse(id, response))
}
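
For orientation, here is a minimal sketch of what a client round-trip against this MCP route might look like. The URL path and the API-key header are assumptions (file paths are not shown in this compare view, and checkHybridAuth accepts more than one auth method); only the JSON-RPC shapes come from the handler above.

```ts
// Hypothetical client call against the MCP route above.
// Assumed: the route is mounted at /api/copilot/mcp and an x-api-key header is accepted.
const res = await fetch('https://sim.example.com/api/copilot/mcp', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', 'x-api-key': process.env.SIM_API_KEY! },
  body: JSON.stringify({
    jsonrpc: '2.0',
    id: 1,
    method: 'tools/call',
    params: {
      // copilot_plan is one of the subagent tools described in MCP_SERVER_INSTRUCTIONS
      name: 'copilot_plan',
      arguments: { request: 'build a workflow that posts to Slack', workflowId: 'abc123' },
    },
  }),
})
// JSONRPCResponse: { jsonrpc: '2.0', id: 1, result: CallToolResult }
const rpc = await res.json()
```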

View File

@@ -0,0 +1,107 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { authenticateV1Request } from '@/app/api/v1/auth'
import { getCopilotModel } from '@/lib/copilot/config'
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
const logger = createLogger('CopilotHeadlessAPI')
const RequestSchema = z.object({
message: z.string().min(1, 'message is required'),
workflowId: z.string().optional(),
workflowName: z.string().optional(),
chatId: z.string().optional(),
mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'),
model: z.string().optional(),
autoExecuteTools: z.boolean().optional().default(true),
timeout: z.number().optional().default(300000),
})
/**
* POST /api/v1/copilot/chat
* Headless copilot endpoint for server-side orchestration.
*
* workflowId is optional - if not provided:
* - If workflowName is provided, finds that workflow
* - Otherwise uses the user's first workflow as context
* - The copilot can still operate on any workflow using list_user_workflows
*/
export async function POST(req: NextRequest) {
const auth = await authenticateV1Request(req)
if (!auth.authenticated || !auth.userId) {
return NextResponse.json({ success: false, error: auth.error || 'Unauthorized' }, { status: 401 })
}
try {
const body = await req.json()
const parsed = RequestSchema.parse(body)
const defaults = getCopilotModel('chat')
const selectedModel = parsed.model || defaults.model
// Resolve workflow ID
const resolved = await resolveWorkflowIdForUser(auth.userId, parsed.workflowId, parsed.workflowName)
if (!resolved) {
return NextResponse.json(
{ success: false, error: 'No workflows found. Create a workflow first or provide a valid workflowId.' },
{ status: 400 }
)
}
// Transform mode to transport mode (same as client API)
// build and agent both map to 'agent' on the backend
const effectiveMode = parsed.mode === 'agent' ? 'build' : parsed.mode
const transportMode = effectiveMode === 'build' ? 'agent' : effectiveMode
// Always generate a chatId - required for artifacts system to work with subagents
const chatId = parsed.chatId || crypto.randomUUID()
const requestPayload = {
message: parsed.message,
workflowId: resolved.workflowId,
userId: auth.userId,
stream: true,
streamToolCalls: true,
model: selectedModel,
mode: transportMode,
messageId: crypto.randomUUID(),
version: SIM_AGENT_VERSION,
headless: true, // Enable cross-workflow operations via workflowId params
chatId,
}
const result = await orchestrateCopilotStream(requestPayload, {
userId: auth.userId,
workflowId: resolved.workflowId,
chatId,
autoExecuteTools: parsed.autoExecuteTools,
timeout: parsed.timeout,
interactive: false,
})
return NextResponse.json({
success: result.success,
content: result.content,
toolCalls: result.toolCalls,
chatId: result.chatId || chatId, // Return the chatId for conversation continuity
conversationId: result.conversationId,
error: result.error,
})
} catch (error) {
if (error instanceof z.ZodError) {
return NextResponse.json(
{ success: false, error: 'Invalid request', details: error.errors },
{ status: 400 }
)
}
logger.error('Headless copilot request failed', {
error: error instanceof Error ? error.message : String(error),
})
return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 })
}
}
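
A minimal sketch of calling this endpoint follows. The request body fields mirror RequestSchema above and the response fields mirror the NextResponse.json payload; the x-api-key header is an assumption, since authenticateV1Request is not shown in this compare view.

```ts
// Hypothetical caller for POST /api/v1/copilot/chat (headless orchestration).
const res = await fetch('https://sim.example.com/api/v1/copilot/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', 'x-api-key': process.env.SIM_API_KEY! },
  body: JSON.stringify({
    message: 'Add a Slack notification after the agent block',
    workflowName: 'My Workflow', // optional; workflowId also works, else the first workflow is used
    autoExecuteTools: true,
  }),
})
const { success, content, toolCalls, chatId, conversationId, error } = await res.json()
// Send chatId back on the next request to continue the same conversation.
```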

View File

@@ -1,5 +1,6 @@
 'use client'
+import { createLogger } from '@sim/logger'
 import { memo, useEffect, useMemo, useRef, useState } from 'react'
 import clsx from 'clsx'
 import { ChevronUp, LayoutList } from 'lucide-react'
@@ -1259,99 +1260,39 @@ function shouldShowRunSkipButtons(toolCall: CopilotToolCall): boolean {
 return false
 }
+const toolCallLogger = createLogger('CopilotToolCall')
+async function sendToolDecision(toolCallId: string, status: 'accepted' | 'rejected' | 'background') {
+try {
+await fetch('/api/copilot/confirm', {
+method: 'POST',
+headers: { 'Content-Type': 'application/json' },
+body: JSON.stringify({ toolCallId, status }),
+})
+} catch (error) {
+toolCallLogger.warn('Failed to send tool decision', {
+toolCallId,
+status,
+error: error instanceof Error ? error.message : String(error),
+})
+}
+}
 async function handleRun(
 toolCall: CopilotToolCall,
 setToolCallState: any,
 onStateChange?: any,
 editedParams?: any
 ) {
-const instance = getClientTool(toolCall.id)
-if (!instance && isIntegrationTool(toolCall.name)) {
-onStateChange?.('executing')
-try {
-await useCopilotStore.getState().executeIntegrationTool(toolCall.id)
-} catch (e) {
-setToolCallState(toolCall, 'error', { error: e instanceof Error ? e.message : String(e) })
-onStateChange?.('error')
-try {
-await fetch('/api/copilot/tools/mark-complete', {
-method: 'POST',
-headers: { 'Content-Type': 'application/json' },
-body: JSON.stringify({
-id: toolCall.id,
-name: toolCall.name,
-status: 500,
-message: e instanceof Error ? e.message : 'Tool execution failed',
-data: { error: e instanceof Error ? e.message : String(e) },
-}),
-})
-} catch {
-console.error('[handleRun] Failed to notify backend of tool error:', toolCall.id)
-}
-}
-return
-}
-if (!instance) return
-try {
-const mergedParams =
-editedParams ||
-(toolCall as any).params ||
-(toolCall as any).parameters ||
-(toolCall as any).input ||
-{}
-await instance.handleAccept?.(mergedParams)
-onStateChange?.('executing')
-} catch (e) {
-setToolCallState(toolCall, 'error', { error: e instanceof Error ? e.message : String(e) })
-}
+setToolCallState(toolCall, 'executing', editedParams ? { params: editedParams } : undefined)
+onStateChange?.('executing')
+await sendToolDecision(toolCall.id, 'accepted')
 }
 async function handleSkip(toolCall: CopilotToolCall, setToolCallState: any, onStateChange?: any) {
-const instance = getClientTool(toolCall.id)
-if (!instance && isIntegrationTool(toolCall.name)) {
-setToolCallState(toolCall, 'rejected')
-onStateChange?.('rejected')
-let notified = false
-for (let attempt = 0; attempt < 3 && !notified; attempt++) {
-try {
-const res = await fetch('/api/copilot/tools/mark-complete', {
-method: 'POST',
-headers: { 'Content-Type': 'application/json' },
-body: JSON.stringify({
-id: toolCall.id,
-name: toolCall.name,
-status: 400,
-message: 'Tool execution skipped by user',
-data: { skipped: true, reason: 'user_skipped' },
-}),
-})
-if (res.ok) {
-notified = true
-}
-} catch (e) {
-if (attempt < 2) {
-await new Promise((resolve) => setTimeout(resolve, 500))
-}
-}
-}
-if (!notified) {
-console.error('[handleSkip] Failed to notify backend after 3 attempts:', toolCall.id)
-}
-return
-}
-if (instance) {
-try {
-await instance.handleReject?.()
-} catch {}
-}
 setToolCallState(toolCall, 'rejected')
 onStateChange?.('rejected')
+await sendToolDecision(toolCall.id, 'rejected')
 }
 function getDisplayName(toolCall: CopilotToolCall): string {
@@ -1511,7 +1452,7 @@ export function ToolCall({
 // Check if this integration tool is auto-allowed
 // Subscribe to autoAllowedTools so we re-render when it changes
 const autoAllowedTools = useCopilotStore((s) => s.autoAllowedTools)
-const { removeAutoAllowedTool } = useCopilotStore()
+const { removeAutoAllowedTool, setToolCallState } = useCopilotStore()
 const isAutoAllowed = isIntegrationTool(toolCall.name) && autoAllowedTools.includes(toolCall.name)
 // Update edited params when toolCall params change (deep comparison to avoid resetting user edits on ref change)
@@ -2213,16 +2154,9 @@ export function ToolCall({
 <div className='mt-[10px]'>
 <Button
 onClick={async () => {
-try {
-const instance = getClientTool(toolCall.id)
-instance?.setState?.((ClientToolCallState as any).background)
-await instance?.markToolComplete?.(
-200,
-'The user has chosen to move the workflow execution to the background. Check back with them later to know when the workflow execution is complete'
-)
-forceUpdate({})
-onStateChange?.('background')
-} catch {}
+setToolCallState(toolCall, ClientToolCallState.background)
+onStateChange?.('background')
+await sendToolDecision(toolCall.id, 'background')
 }}
 variant='tertiary'
 title='Move to Background'
@@ -2234,21 +2168,9 @@ export function ToolCall({
 <div className='mt-[10px]'>
 <Button
 onClick={async () => {
-try {
-const instance = getClientTool(toolCall.id)
-const elapsedSeconds = instance?.getElapsedSeconds?.() || 0
-instance?.setState?.((ClientToolCallState as any).background, {
-result: { _elapsedSeconds: elapsedSeconds },
-})
-const { updateToolCallParams } = useCopilotStore.getState()
-updateToolCallParams?.(toolCall.id, { _elapsedSeconds: Math.round(elapsedSeconds) })
-await instance?.markToolComplete?.(
-200,
-`User woke you up after ${Math.round(elapsedSeconds)} seconds`
-)
-forceUpdate({})
-onStateChange?.('background')
-} catch {}
+setToolCallState(toolCall, ClientToolCallState.background)
+onStateChange?.('background')
+await sendToolDecision(toolCall.id, 'background')
 }}
 variant='tertiary'
 title='Wake'

View File

@@ -0,0 +1,37 @@
export const INTERRUPT_TOOL_NAMES = [
'set_global_workflow_variables',
'run_workflow',
'manage_mcp_tool',
'manage_custom_tool',
'deploy_mcp',
'deploy_chat',
'deploy_api',
'create_workspace_mcp_server',
'set_environment_variables',
'make_api_request',
'oauth_request_access',
'navigate_ui',
'knowledge_base',
] as const
export const INTERRUPT_TOOL_SET = new Set<string>(INTERRUPT_TOOL_NAMES)
export const SUBAGENT_TOOL_NAMES = [
'debug',
'edit',
'plan',
'test',
'deploy',
'auth',
'research',
'knowledge',
'custom_tool',
'tour',
'info',
'workflow',
'evaluate',
'superagent',
] as const
export const SUBAGENT_TOOL_SET = new Set<string>(SUBAGENT_TOOL_NAMES)

View File

@@ -0,0 +1,181 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { env } from '@/lib/core/config/env'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import {
handleSubagentRouting,
sseHandlers,
subAgentHandlers,
} from '@/lib/copilot/orchestrator/sse-handlers'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
ExecutionContext,
OrchestratorOptions,
OrchestratorResult,
SSEEvent,
StreamingContext,
ToolCallSummary,
} from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotOrchestrator')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
export interface OrchestrateStreamOptions extends OrchestratorOptions {
userId: string
workflowId: string
chatId?: string
}
/**
* Orchestrate a copilot SSE stream and execute tool calls server-side.
*/
export async function orchestrateCopilotStream(
requestPayload: Record<string, any>,
options: OrchestrateStreamOptions
): Promise<OrchestratorResult> {
const { userId, workflowId, chatId, timeout = 300000, abortSignal } = options
const execContext = await prepareExecutionContext(userId, workflowId)
const context: StreamingContext = {
chatId,
conversationId: undefined,
messageId: requestPayload?.messageId || crypto.randomUUID(),
accumulatedContent: '',
contentBlocks: [],
toolCalls: new Map(),
currentThinkingBlock: null,
isInThinkingBlock: false,
subAgentParentToolCallId: undefined,
subAgentContent: {},
subAgentToolCalls: {},
pendingContent: '',
streamComplete: false,
wasAborted: false,
errors: [],
}
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify(requestPayload),
signal: abortSignal,
})
if (!response.ok) {
const errorText = await response.text().catch(() => '')
throw new Error(`Copilot backend error (${response.status}): ${errorText || response.statusText}`)
}
if (!response.body) {
throw new Error('Copilot backend response missing body')
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
const timeoutId = setTimeout(() => {
context.errors.push('Request timed out')
context.streamComplete = true
reader.cancel().catch(() => {})
}, timeout)
try {
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
if (abortSignal?.aborted) {
context.wasAborted = true
break
}
await forwardEvent(event, options)
if (event.type === 'subagent_start') {
const toolCallId = event.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
context.subAgentToolCalls[toolCallId] = []
}
continue
}
if (event.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
if (handleSubagentRouting(event, context)) {
const handler = subAgentHandlers[event.type]
if (handler) {
await handler(event, context, execContext, options)
}
if (context.streamComplete) break
continue
}
const handler = sseHandlers[event.type]
if (handler) {
await handler(event, context, execContext, options)
}
if (context.streamComplete) break
}
} finally {
clearTimeout(timeoutId)
}
const result = buildResult(context)
await options.onComplete?.(result)
return result
} catch (error) {
const err = error instanceof Error ? error : new Error('Copilot orchestration failed')
logger.error('Copilot orchestration failed', { error: err.message })
await options.onError?.(err)
return {
success: false,
content: '',
contentBlocks: [],
toolCalls: [],
chatId: context.chatId,
conversationId: context.conversationId,
error: err.message,
}
}
}
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
try {
await options.onEvent?.(event)
} catch (error) {
logger.warn('Failed to forward SSE event', {
type: event.type,
error: error instanceof Error ? error.message : String(error),
})
}
}
function buildResult(context: StreamingContext): OrchestratorResult {
const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
id: toolCall.id,
name: toolCall.name,
status: toolCall.status,
params: toolCall.params,
result: toolCall.result?.output,
error: toolCall.error,
durationMs:
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
}))
return {
success: context.errors.length === 0,
content: context.accumulatedContent,
contentBlocks: context.contentBlocks,
toolCalls,
chatId: context.chatId,
conversationId: context.conversationId,
errors: context.errors.length ? context.errors : undefined,
}
}
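
orchestrateCopilotStream is consumed in two ways in this change set: the interactive chat route forwards each SSE event to the browser via onEvent, while the headless v1 route just collects the aggregated result. A condensed sketch of the headless pattern, with option and payload fields taken from the callers shown earlier in this compare:

```ts
// Sketch: server-side call that lets the orchestrator execute tools itself
// and returns the aggregated OrchestratorResult instead of proxying the stream.
const result = await orchestrateCopilotStream(
  {
    message: 'Deploy the workflow as an API endpoint',
    workflowId,
    userId,
    stream: true,
    streamToolCalls: true,
    mode: 'agent',
    messageId: crypto.randomUUID(),
  },
  {
    userId,
    workflowId,
    autoExecuteTools: true, // tool_call events are run via executeToolServerSide
    interactive: false, // do not wait for user confirmation on interrupt tools
    onEvent: async (event) => {
      // optional: forward or log each SSE event as it arrives
    },
  }
)
if (!result.success) throw new Error(result.error ?? result.errors?.join('; '))
```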

View File

@@ -0,0 +1,138 @@
import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('CopilotOrchestratorPersistence')
/**
* Create a new copilot chat record.
*/
export async function createChat(params: {
userId: string
workflowId: string
model: string
}): Promise<{ id: string }> {
const [chat] = await db
.insert(copilotChats)
.values({
userId: params.userId,
workflowId: params.workflowId,
model: params.model,
messages: [],
})
.returning({ id: copilotChats.id })
return { id: chat.id }
}
/**
* Load an existing chat for a user.
*/
export async function loadChat(chatId: string, userId: string) {
const [chat] = await db
.select()
.from(copilotChats)
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)))
.limit(1)
return chat || null
}
/**
* Save chat messages and metadata.
*/
export async function saveMessages(
chatId: string,
messages: any[],
options?: {
title?: string
conversationId?: string
planArtifact?: string | null
config?: { mode?: string; model?: string }
}
): Promise<void> {
await db
.update(copilotChats)
.set({
messages,
updatedAt: new Date(),
...(options?.title ? { title: options.title } : {}),
...(options?.conversationId ? { conversationId: options.conversationId } : {}),
...(options?.planArtifact !== undefined ? { planArtifact: options.planArtifact } : {}),
...(options?.config ? { config: options.config } : {}),
})
.where(eq(copilotChats.id, chatId))
}
/**
* Update the conversationId for a chat without overwriting messages.
*/
export async function updateChatConversationId(chatId: string, conversationId: string): Promise<void> {
await db
.update(copilotChats)
.set({
conversationId,
updatedAt: new Date(),
})
.where(eq(copilotChats.id, chatId))
}
/**
* Set a tool call confirmation status in Redis.
*/
export async function setToolConfirmation(
toolCallId: string,
status: 'accepted' | 'rejected' | 'background' | 'pending',
message?: string
): Promise<boolean> {
const redis = getRedisClient()
if (!redis) {
logger.warn('Redis client not available for tool confirmation')
return false
}
const key = `tool_call:${toolCallId}`
const payload = {
status,
message: message || null,
timestamp: new Date().toISOString(),
}
try {
await redis.set(key, JSON.stringify(payload), 'EX', 86400)
return true
} catch (error) {
logger.error('Failed to set tool confirmation', {
toolCallId,
error: error instanceof Error ? error.message : String(error),
})
return false
}
}
/**
* Get a tool call confirmation status from Redis.
*/
export async function getToolConfirmation(toolCallId: string): Promise<{
status: string
message?: string
timestamp?: string
} | null> {
const redis = getRedisClient()
if (!redis) return null
try {
const data = await redis.get(`tool_call:${toolCallId}`)
if (!data) return null
return JSON.parse(data) as { status: string; message?: string; timestamp?: string }
} catch (error) {
logger.error('Failed to read tool confirmation', {
toolCallId,
error: error instanceof Error ? error.message : String(error),
})
return null
}
}
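
The UI-side sendToolDecision helper (in the tool-call component above) posts to /api/copilot/confirm, and the orchestrator's waitForToolDecision loop polls getToolConfirmation until a status shows up in Redis. The confirm route itself is not part of this compare, but a minimal sketch of how it would glue the two together might look like this (hypothetical handler; only setToolConfirmation comes from this file):

```ts
// Hypothetical app/api/copilot/confirm/route.ts
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { setToolConfirmation } from '@/lib/copilot/orchestrator/persistence'

export async function POST(req: NextRequest) {
  const session = await getSession()
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { toolCallId, status, message } = await req.json()
  // Persist the decision in Redis; the orchestrator's waitForToolDecision
  // loop picks it up within ~100ms and either executes or rejects the tool call.
  const ok = await setToolConfirmation(toolCallId, status, message)
  return NextResponse.json({ success: ok }, { status: ok ? 200 : 500 })
}
```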

View File

@@ -0,0 +1,439 @@
import { createLogger } from '@sim/logger'
import type {
ContentBlock,
ExecutionContext,
OrchestratorOptions,
SSEEvent,
StreamingContext,
ToolCallState,
} from '@/lib/copilot/orchestrator/types'
import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
import { INTERRUPT_TOOL_SET, SUBAGENT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
const logger = createLogger('CopilotSseHandlers')
/**
* Respond tools are internal to the copilot's subagent system.
* They're used by subagents to signal completion and should NOT be executed by the sim side.
* The copilot backend handles these internally.
*/
const RESPOND_TOOL_SET = new Set([
'plan_respond',
'edit_respond',
'debug_respond',
'info_respond',
'research_respond',
'deploy_respond',
'superagent_respond',
'discovery_respond',
])
export type SSEHandler = (
event: SSEEvent,
context: StreamingContext,
execContext: ExecutionContext,
options: OrchestratorOptions
) => void | Promise<void>
function addContentBlock(
context: StreamingContext,
block: Omit<ContentBlock, 'timestamp'>
): void {
context.contentBlocks.push({
...block,
timestamp: Date.now(),
})
}
async function executeToolAndReport(
toolCallId: string,
context: StreamingContext,
execContext: ExecutionContext,
options?: OrchestratorOptions
): Promise<void> {
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
if (toolCall.status === 'executing') return
toolCall.status = 'executing'
try {
const result = await executeToolServerSide(toolCall, execContext)
toolCall.status = result.success ? 'success' : 'error'
toolCall.result = result
toolCall.error = result.error
toolCall.endTime = Date.now()
// If create_workflow was successful, update the execution context with the new workflowId
// This ensures subsequent tools in the same stream have access to the workflowId
if (
toolCall.name === 'create_workflow' &&
result.success &&
result.output?.workflowId &&
!execContext.workflowId
) {
execContext.workflowId = result.output.workflowId
if (result.output.workspaceId) {
execContext.workspaceId = result.output.workspaceId
}
}
await markToolComplete(
toolCall.id,
toolCall.name,
result.success ? 200 : 500,
result.error || (result.success ? 'Tool completed' : 'Tool failed'),
result.output
)
await options?.onEvent?.({
type: 'tool_result',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
success: result.success,
result: result.output,
},
})
} catch (error) {
toolCall.status = 'error'
toolCall.error = error instanceof Error ? error.message : String(error)
toolCall.endTime = Date.now()
await markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error)
await options?.onEvent?.({
type: 'tool_error',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
error: toolCall.error,
},
})
}
}
async function waitForToolDecision(
toolCallId: string,
timeoutMs: number
): Promise<{ status: string; message?: string } | null> {
const start = Date.now()
while (Date.now() - start < timeoutMs) {
const decision = await getToolConfirmation(toolCallId)
if (decision?.status) {
return decision
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
return null
}
export const sseHandlers: Record<string, SSEHandler> = {
chat_id: (event, context) => {
context.chatId = event.data?.chatId
},
title_updated: () => {},
tool_result: (event, context) => {
const toolCallId = event.toolCallId || event.data?.id
if (!toolCallId) return
const current = context.toolCalls.get(toolCallId)
if (!current) return
// Determine success: explicit success field, or if there's result data without explicit failure
const hasExplicitSuccess = event.data?.success !== undefined || event.data?.result?.success !== undefined
const explicitSuccess = event.data?.success ?? event.data?.result?.success
const hasResultData = event.data?.result !== undefined || event.data?.data !== undefined
const hasError = !!event.data?.error || !!event.data?.result?.error
// If explicitly set, use that; otherwise infer from data presence
const success = hasExplicitSuccess ? !!explicitSuccess : (hasResultData && !hasError)
current.status = success ? 'success' : 'error'
current.endTime = Date.now()
if (hasResultData) {
current.result = {
success,
output: event.data?.result || event.data?.data,
}
}
if (hasError) {
current.error = event.data?.error || event.data?.result?.error
}
},
tool_error: (event, context) => {
const toolCallId = event.toolCallId || event.data?.id
if (!toolCallId) return
const current = context.toolCalls.get(toolCallId)
if (!current) return
current.status = 'error'
current.error = event.data?.error || 'Tool execution failed'
current.endTime = Date.now()
},
tool_generating: (event, context) => {
const toolCallId = event.toolCallId || event.data?.toolCallId || event.data?.id
const toolName = event.toolName || event.data?.toolName || event.data?.name
if (!toolCallId || !toolName) return
if (!context.toolCalls.has(toolCallId)) {
context.toolCalls.set(toolCallId, {
id: toolCallId,
name: toolName,
status: 'pending',
startTime: Date.now(),
})
}
},
tool_call: async (event, context, execContext, options) => {
const toolData = event.data || {}
const toolCallId = toolData.id || event.toolCallId
const toolName = toolData.name || event.toolName
if (!toolCallId || !toolName) return
const args = toolData.arguments || toolData.input || event.data?.input
const isPartial = toolData.partial === true
const existing = context.toolCalls.get(toolCallId)
const toolCall: ToolCallState = existing
? { ...existing, status: 'pending', params: args || existing.params }
: {
id: toolCallId,
name: toolName,
status: 'pending',
params: args,
startTime: Date.now(),
}
context.toolCalls.set(toolCallId, toolCall)
addContentBlock(context, { type: 'tool_call', toolCall })
if (isPartial) return
// Subagent tools are executed by the copilot backend, not sim side
if (SUBAGENT_TOOL_SET.has(toolName)) {
return
}
// Respond tools are internal to copilot's subagent system - skip execution
// The copilot backend handles these internally to signal subagent completion
if (RESPOND_TOOL_SET.has(toolName)) {
toolCall.status = 'success'
toolCall.endTime = Date.now()
toolCall.result = { success: true, output: 'Internal respond tool - handled by copilot backend' }
return
}
const isInterruptTool = INTERRUPT_TOOL_SET.has(toolName)
const isInteractive = options.interactive === true
if (isInterruptTool && isInteractive) {
const decision = await waitForToolDecision(toolCallId, options.timeout || 600000)
if (decision?.status === 'accepted' || decision?.status === 'success') {
await executeToolAndReport(toolCallId, context, execContext, options)
return
}
if (decision?.status === 'rejected' || decision?.status === 'error') {
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
await markToolComplete(
toolCall.id,
toolCall.name,
400,
decision.message || 'Tool execution rejected',
{ skipped: true, reason: 'user_rejected' }
)
await options.onEvent?.({
type: 'tool_result',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
success: false,
result: { skipped: true, reason: 'user_rejected' },
},
})
return
}
if (decision?.status === 'background') {
toolCall.status = 'skipped'
toolCall.endTime = Date.now()
await markToolComplete(
toolCall.id,
toolCall.name,
202,
decision.message || 'Tool execution moved to background',
{ background: true }
)
await options.onEvent?.({
type: 'tool_result',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
success: true,
result: { background: true },
},
})
return
}
}
if (options.autoExecuteTools !== false) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
},
reasoning: (event, context) => {
const phase = event.data?.phase || event.data?.data?.phase
if (phase === 'start') {
context.isInThinkingBlock = true
context.currentThinkingBlock = {
type: 'thinking',
content: '',
timestamp: Date.now(),
}
return
}
if (phase === 'end') {
if (context.currentThinkingBlock) {
context.contentBlocks.push(context.currentThinkingBlock)
}
context.isInThinkingBlock = false
context.currentThinkingBlock = null
return
}
const chunk = typeof event.data === 'string' ? event.data : event.data?.data || event.data?.content
if (!chunk || !context.currentThinkingBlock) return
context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}`
},
content: (event, context) => {
const chunk = typeof event.data === 'string' ? event.data : event.data?.content || event.data?.data
if (!chunk) return
context.accumulatedContent += chunk
addContentBlock(context, { type: 'text', content: chunk })
},
done: (event, context) => {
if (event.data?.responseId) {
context.conversationId = event.data.responseId
}
context.streamComplete = true
},
start: (event, context) => {
if (event.data?.responseId) {
context.conversationId = event.data.responseId
}
},
error: (event, context) => {
const message =
event.data?.message || event.data?.error || (typeof event.data === 'string' ? event.data : null)
if (message) {
context.errors.push(message)
}
context.streamComplete = true
},
}
export const subAgentHandlers: Record<string, SSEHandler> = {
content: (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId || !event.data) return
const chunk = typeof event.data === 'string' ? event.data : event.data?.content || ''
if (!chunk) return
context.subAgentContent[parentToolCallId] = (context.subAgentContent[parentToolCallId] || '') + chunk
addContentBlock(context, { type: 'subagent_text', content: chunk })
},
tool_call: async (event, context, execContext, options) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolData = event.data || {}
const toolCallId = toolData.id || event.toolCallId
const toolName = toolData.name || event.toolName
if (!toolCallId || !toolName) return
const isPartial = toolData.partial === true
const args = toolData.arguments || toolData.input || event.data?.input
const toolCall: ToolCallState = {
id: toolCallId,
name: toolName,
status: 'pending',
params: args,
startTime: Date.now(),
}
// Store in both places - subAgentToolCalls for tracking and toolCalls for executeToolAndReport
if (!context.subAgentToolCalls[parentToolCallId]) {
context.subAgentToolCalls[parentToolCallId] = []
}
context.subAgentToolCalls[parentToolCallId].push(toolCall)
context.toolCalls.set(toolCallId, toolCall)
if (isPartial) return
// Respond tools are internal to copilot's subagent system - skip execution
if (RESPOND_TOOL_SET.has(toolName)) {
toolCall.status = 'success'
toolCall.endTime = Date.now()
toolCall.result = { success: true, output: 'Internal respond tool - handled by copilot backend' }
return
}
if (options.autoExecuteTools !== false) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
},
tool_result: (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolCallId = event.toolCallId || event.data?.id
if (!toolCallId) return
// Update in subAgentToolCalls
const toolCalls = context.subAgentToolCalls[parentToolCallId] || []
const subAgentToolCall = toolCalls.find((tc) => tc.id === toolCallId)
// Also update in main toolCalls (where we added it for execution)
const mainToolCall = context.toolCalls.get(toolCallId)
// Use same success inference logic as main handler
const hasExplicitSuccess =
event.data?.success !== undefined || event.data?.result?.success !== undefined
const explicitSuccess = event.data?.success ?? event.data?.result?.success
const hasResultData = event.data?.result !== undefined || event.data?.data !== undefined
const hasError = !!event.data?.error || !!event.data?.result?.error
const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError
const status = success ? 'success' : 'error'
const endTime = Date.now()
const result = hasResultData
? { success, output: event.data?.result || event.data?.data }
: undefined
if (subAgentToolCall) {
subAgentToolCall.status = status
subAgentToolCall.endTime = endTime
if (result) subAgentToolCall.result = result
if (hasError) subAgentToolCall.error = event.data?.error || event.data?.result?.error
}
if (mainToolCall) {
mainToolCall.status = status
mainToolCall.endTime = endTime
if (result) mainToolCall.result = result
if (hasError) mainToolCall.error = event.data?.error || event.data?.result?.error
}
},
}
export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean {
if (!event.subagent) return false
if (!context.subAgentParentToolCallId) {
logger.warn('Subagent event missing parent tool call', {
type: event.type,
subagent: event.subagent,
})
return false
}
return true
}
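
For orientation, a minimal dispatch sketch (illustrative, not part of this change) showing how one event flows through handleSubagentRouting and the two handler maps; the context objects are assumed to come from the orchestrator that owns the stream:

import {
  handleSubagentRouting,
  sseHandlers,
  subAgentHandlers,
} from '@/lib/copilot/orchestrator/sse-handlers'
import type {
  ExecutionContext,
  OrchestratorOptions,
  SSEEvent,
  StreamingContext,
} from '@/lib/copilot/orchestrator/types'

// Hypothetical dispatcher: nested subagent events go to subAgentHandlers,
// everything else to the top-level sseHandlers map.
async function dispatchEvent(
  event: SSEEvent,
  context: StreamingContext,
  execContext: ExecutionContext,
  options: OrchestratorOptions
): Promise<void> {
  const handlerMap = handleSubagentRouting(event, context) ? subAgentHandlers : sseHandlers
  const handler = handlerMap[event.type]
  if (handler) {
    await handler(event, context, execContext, options)
  }
}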


@@ -0,0 +1,72 @@
import { createLogger } from '@sim/logger'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotSseParser')
/**
* Parses SSE streams from the copilot backend into typed events.
*/
export async function* parseSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
decoder: TextDecoder,
abortSignal?: AbortSignal
): AsyncGenerator<SSEEvent> {
let buffer = ''
try {
while (true) {
if (abortSignal?.aborted) {
logger.info('SSE stream aborted by signal')
break
}
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim()) continue
if (!line.startsWith('data: ')) continue
const jsonStr = line.slice(6)
if (jsonStr === '[DONE]') continue
try {
const event = JSON.parse(jsonStr) as SSEEvent
if (event?.type) {
yield event
}
} catch (error) {
logger.warn('Failed to parse SSE event', {
preview: jsonStr.slice(0, 200),
error: error instanceof Error ? error.message : String(error),
})
}
}
}
if (buffer.trim() && buffer.startsWith('data: ')) {
try {
const event = JSON.parse(buffer.slice(6)) as SSEEvent
if (event?.type) {
yield event
}
} catch (error) {
logger.warn('Failed to parse final SSE buffer', {
preview: buffer.slice(0, 200),
error: error instanceof Error ? error.message : String(error),
})
}
}
} finally {
try {
reader.releaseLock()
} catch {
logger.warn('Failed to release SSE reader lock')
}
}
}
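
A minimal consumption sketch for the parser above, assuming a streaming fetch Response from the copilot backend (the request itself is out of scope here):

import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'

// Hypothetical consumer: iterate the typed events of a streaming response and log them.
async function logCopilotEvents(response: Response, abortSignal?: AbortSignal): Promise<void> {
  if (!response.body) throw new Error('Response has no body to stream')
  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
    // Each yielded event is already JSON-parsed and carries a `type` discriminator.
    console.log(event.type, event.toolCallId ?? '')
  }
}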


@@ -0,0 +1,239 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import {
sseHandlers,
subAgentHandlers,
handleSubagentRouting,
} from '@/lib/copilot/orchestrator/sse-handlers'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
ExecutionContext,
OrchestratorOptions,
SSEEvent,
StreamingContext,
ToolCallSummary,
} from '@/lib/copilot/orchestrator/types'
import { env } from '@/lib/core/config/env'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
const logger = createLogger('CopilotSubagentOrchestrator')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
export interface SubagentOrchestratorOptions extends OrchestratorOptions {
userId: string
workflowId?: string
workspaceId?: string
}
export interface SubagentOrchestratorResult {
success: boolean
content: string
toolCalls: ToolCallSummary[]
structuredResult?: {
type?: string
summary?: string
data?: any
success?: boolean
}
error?: string
errors?: string[]
}
export async function orchestrateSubagentStream(
agentId: string,
requestPayload: Record<string, any>,
options: SubagentOrchestratorOptions
): Promise<SubagentOrchestratorResult> {
const { userId, workflowId, workspaceId, timeout = 300000, abortSignal } = options
const execContext = await buildExecutionContext(userId, workflowId, workspaceId)
const context: StreamingContext = {
chatId: undefined,
conversationId: undefined,
messageId: requestPayload?.messageId || crypto.randomUUID(),
accumulatedContent: '',
contentBlocks: [],
toolCalls: new Map(),
currentThinkingBlock: null,
isInThinkingBlock: false,
subAgentParentToolCallId: undefined,
subAgentContent: {},
subAgentToolCalls: {},
pendingContent: '',
streamComplete: false,
wasAborted: false,
errors: [],
}
let structuredResult: SubagentOrchestratorResult['structuredResult']
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/subagent/${agentId}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify({ ...requestPayload, stream: true, userId }),
signal: abortSignal,
})
if (!response.ok) {
const errorText = await response.text().catch(() => '')
throw new Error(
`Copilot backend error (${response.status}): ${errorText || response.statusText}`
)
}
if (!response.body) {
throw new Error('Copilot backend response missing body')
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
const timeoutId = setTimeout(() => {
context.errors.push('Request timed out')
context.streamComplete = true
reader.cancel().catch(() => {})
}, timeout)
try {
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
if (abortSignal?.aborted) {
context.wasAborted = true
break
}
await forwardEvent(event, options)
if (event.type === 'structured_result' || event.type === 'subagent_result') {
structuredResult = normalizeStructuredResult(event.data)
context.streamComplete = true
continue
}
// Handle subagent_start/subagent_end events to track nested subagent calls
if (event.type === 'subagent_start') {
const toolCallId = event.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
context.subAgentToolCalls[toolCallId] = []
}
continue
}
if (event.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
// For direct subagent calls, events may have the subagent field set (e.g., subagent: "discovery")
// but no subagent_start event because this IS the top-level agent. Skip subagent routing
// for events where the subagent field matches the current agentId - these are top-level events.
const isTopLevelSubagentEvent = event.subagent === agentId && !context.subAgentParentToolCallId
// Only route to subagent handlers for nested subagent events (not matching current agentId)
if (!isTopLevelSubagentEvent && handleSubagentRouting(event, context)) {
const handler = subAgentHandlers[event.type]
if (handler) {
await handler(event, context, execContext, options)
}
if (context.streamComplete) break
continue
}
// Process as a regular SSE event (including top-level subagent events)
const handler = sseHandlers[event.type]
if (handler) {
await handler(event, context, execContext, options)
}
if (context.streamComplete) break
}
} finally {
clearTimeout(timeoutId)
}
const result = buildResult(context, structuredResult)
await options.onComplete?.(result)
return result
} catch (error) {
const err = error instanceof Error ? error : new Error('Subagent orchestration failed')
logger.error('Subagent orchestration failed', { error: err.message, agentId })
await options.onError?.(err)
return {
success: false,
content: context.accumulatedContent,
toolCalls: [],
error: err.message,
}
}
}
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
try {
await options.onEvent?.(event)
} catch (error) {
logger.warn('Failed to forward SSE event', {
type: event.type,
error: error instanceof Error ? error.message : String(error),
})
}
}
function normalizeStructuredResult(data: any): SubagentOrchestratorResult['structuredResult'] {
if (!data || typeof data !== 'object') {
return undefined
}
return {
type: data.result_type || data.type,
summary: data.summary,
data: data.data ?? data,
success: data.success,
}
}
async function buildExecutionContext(
userId: string,
workflowId?: string,
workspaceId?: string
): Promise<ExecutionContext> {
if (workflowId) {
return prepareExecutionContext(userId, workflowId)
}
const decryptedEnvVars = await getEffectiveDecryptedEnv(userId, workspaceId)
return {
userId,
workflowId: workflowId || '',
workspaceId,
decryptedEnvVars,
}
}
function buildResult(
context: StreamingContext,
structuredResult?: SubagentOrchestratorResult['structuredResult']
): SubagentOrchestratorResult {
const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
id: toolCall.id,
name: toolCall.name,
status: toolCall.status,
params: toolCall.params,
result: toolCall.result?.output,
error: toolCall.error,
durationMs:
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
}))
return {
success: context.errors.length === 0 && !context.wasAborted,
content: context.accumulatedContent,
toolCalls,
structuredResult,
errors: context.errors.length ? context.errors : undefined,
}
}
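
A hedged call sketch for orchestrateSubagentStream; the agent id, payload, and module path are illustrative (the file's path is not shown in this diff):

import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent-orchestrator'

// Hypothetical call site: stream the "discovery" subagent and surface its structured result.
async function findWorkflowByDescription(userId: string, description: string) {
  const result = await orchestrateSubagentStream(
    'discovery',
    { request: description },
    {
      userId,
      timeout: 120_000,
      onEvent: (event) => console.log('subagent event', event.type),
    }
  )
  if (!result.success) throw new Error(result.error ?? 'Discovery subagent failed')
  return result.structuredResult ?? result.content
}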

File diff suppressed because it is too large


@@ -0,0 +1,129 @@
import type { CopilotProviderConfig } from '@/lib/copilot/types'
export type SSEEventType =
| 'chat_id'
| 'title_updated'
| 'content'
| 'reasoning'
| 'tool_call'
| 'tool_generating'
| 'tool_result'
| 'tool_error'
| 'subagent_start'
| 'subagent_end'
| 'structured_result'
| 'subagent_result'
| 'done'
| 'error'
| 'start'
export interface SSEEvent {
type: SSEEventType
data?: any
subagent?: string
toolCallId?: string
toolName?: string
}
export type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped' | 'rejected'
export interface ToolCallState {
id: string
name: string
status: ToolCallStatus
params?: Record<string, any>
result?: ToolCallResult
error?: string
startTime?: number
endTime?: number
}
export interface ToolCallResult {
success: boolean
output?: any
error?: string
}
export type ContentBlockType = 'text' | 'thinking' | 'tool_call' | 'subagent_text'
export interface ContentBlock {
type: ContentBlockType
content?: string
toolCall?: ToolCallState
timestamp: number
}
export interface StreamingContext {
chatId?: string
conversationId?: string
messageId: string
accumulatedContent: string
contentBlocks: ContentBlock[]
toolCalls: Map<string, ToolCallState>
currentThinkingBlock: ContentBlock | null
isInThinkingBlock: boolean
subAgentParentToolCallId?: string
subAgentContent: Record<string, string>
subAgentToolCalls: Record<string, ToolCallState[]>
pendingContent: string
streamComplete: boolean
wasAborted: boolean
errors: string[]
}
export interface OrchestratorRequest {
message: string
workflowId: string
userId: string
chatId?: string
mode?: 'agent' | 'ask' | 'plan'
model?: string
conversationId?: string
contexts?: Array<{ type: string; content: string }>
fileAttachments?: any[]
commands?: string[]
provider?: CopilotProviderConfig
streamToolCalls?: boolean
version?: string
prefetch?: boolean
userName?: string
}
export interface OrchestratorOptions {
autoExecuteTools?: boolean
timeout?: number
onEvent?: (event: SSEEvent) => void | Promise<void>
onComplete?: (result: OrchestratorResult) => void | Promise<void>
onError?: (error: Error) => void | Promise<void>
abortSignal?: AbortSignal
interactive?: boolean
}
export interface OrchestratorResult {
success: boolean
content: string
contentBlocks: ContentBlock[]
toolCalls: ToolCallSummary[]
chatId?: string
conversationId?: string
error?: string
errors?: string[]
}
export interface ToolCallSummary {
id: string
name: string
status: ToolCallStatus
params?: Record<string, any>
result?: any
error?: string
durationMs?: number
}
export interface ExecutionContext {
userId: string
workflowId: string
workspaceId?: string
decryptedEnvVars?: Record<string, string>
}


@@ -5,7 +5,10 @@ import {
type BaseClientToolMetadata,
ClientToolCallState,
} from '@/lib/copilot/tools/client/base-tool'
-import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
+import {
+  formatWorkflowStateForCopilot,
+  normalizeWorkflowName,
+} from '@/lib/copilot/tools/shared/workflow-utils'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
const logger = createLogger('GetWorkflowFromNameClientTool')
@@ -67,11 +70,9 @@ export class GetWorkflowFromNameClientTool extends BaseClientTool {
// Try to find by name from registry first to get ID
const registry = useWorkflowRegistry.getState()
+const targetName = normalizeWorkflowName(workflowName)
const match = Object.values((registry as any).workflows || {}).find(
-  (w: any) =>
-    String(w?.name || '')
-      .trim()
-      .toLowerCase() === workflowName.toLowerCase()
+  (w: any) => normalizeWorkflowName(w?.name) === targetName
) as any
if (!match?.id) {
@@ -98,15 +99,12 @@ export class GetWorkflowFromNameClientTool extends BaseClientTool {
}
// Convert state to the same string format as get_user_workflow
-const workflowState = {
+const userWorkflow = formatWorkflowStateForCopilot({
  blocks: wf.state.blocks || {},
  edges: wf.state.edges || [],
  loops: wf.state.loops || {},
  parallels: wf.state.parallels || {},
-}
-// Sanitize workflow state for copilot (remove UI-specific data)
-const sanitizedState = sanitizeForCopilot(workflowState)
-const userWorkflow = JSON.stringify(sanitizedState, null, 2)
+})
await this.markToolComplete(200, `Retrieved workflow ${workflowName}`, { userWorkflow })
this.setState(ClientToolCallState.success)


@@ -5,6 +5,7 @@ import {
type BaseClientToolMetadata,
ClientToolCallState,
} from '@/lib/copilot/tools/client/base-tool'
+import { extractWorkflowNames } from '@/lib/copilot/tools/shared/workflow-utils'
const logger = createLogger('ListUserWorkflowsClientTool')
@@ -41,9 +42,7 @@ export class ListUserWorkflowsClientTool extends BaseClientTool {
const json = await res.json()
const workflows = Array.isArray(json?.data) ? json.data : []
-const names = workflows
-  .map((w: any) => (typeof w?.name === 'string' ? w.name : null))
-  .filter((n: string | null) => !!n)
+const names = extractWorkflowNames(workflows)
logger.info('Found workflows', { count: names.length })


@@ -0,0 +1,466 @@
export type DirectToolDef = {
name: string
description: string
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
toolId: string
}
export type SubagentToolDef = {
name: string
description: string
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
agentId: string
}
/**
* Direct tools that execute immediately without LLM orchestration.
* These are fast database queries that don't need AI reasoning.
*/
export const DIRECT_TOOL_DEFS: DirectToolDef[] = [
{
name: 'list_workflows',
toolId: 'list_user_workflows',
description: 'List all workflows the user has access to. Returns workflow IDs, names, and workspace info.',
inputSchema: {
type: 'object',
properties: {
workspaceId: {
type: 'string',
description: 'Optional workspace ID to filter workflows.',
},
folderId: {
type: 'string',
description: 'Optional folder ID to filter workflows.',
},
},
},
},
{
name: 'list_workspaces',
toolId: 'list_user_workspaces',
description: 'List all workspaces the user has access to. Returns workspace IDs, names, and roles.',
inputSchema: {
type: 'object',
properties: {},
},
},
{
name: 'list_folders',
toolId: 'list_folders',
description: 'List all folders in a workspace.',
inputSchema: {
type: 'object',
properties: {
workspaceId: {
type: 'string',
description: 'Workspace ID to list folders from.',
},
},
required: ['workspaceId'],
},
},
{
name: 'get_workflow',
toolId: 'get_workflow_from_name',
description: 'Get a workflow by name or ID. Returns the full workflow definition.',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Workflow name to search for.',
},
workflowId: {
type: 'string',
description: 'Workflow ID to retrieve directly.',
},
},
},
},
{
name: 'create_workflow',
toolId: 'create_workflow',
description: 'Create a new workflow. Returns the new workflow ID.',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name for the new workflow.',
},
workspaceId: {
type: 'string',
description: 'Optional workspace ID. Uses default workspace if not provided.',
},
folderId: {
type: 'string',
description: 'Optional folder ID to place the workflow in.',
},
description: {
type: 'string',
description: 'Optional description for the workflow.',
},
},
required: ['name'],
},
},
{
name: 'create_folder',
toolId: 'create_folder',
description: 'Create a new folder in a workspace.',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name for the new folder.',
},
workspaceId: {
type: 'string',
description: 'Optional workspace ID. Uses default workspace if not provided.',
},
parentId: {
type: 'string',
description: 'Optional parent folder ID for nested folders.',
},
},
required: ['name'],
},
},
]
export const SUBAGENT_TOOL_DEFS: SubagentToolDef[] = [
{
name: 'copilot_build',
agentId: 'build',
description: `Build a workflow end-to-end in a single step. This is the fast mode equivalent for headless/MCP usage.
USE THIS WHEN:
- Building a new workflow from scratch
- Modifying an existing workflow
- You want to gather information and build in one pass without separate plan→edit steps
WORKFLOW ID (REQUIRED):
- For NEW workflows: First call create_workflow to get a workflowId, then pass it here
- For EXISTING workflows: Always pass the workflowId parameter
CAN DO:
- Gather information about blocks, credentials, patterns
- Search documentation and patterns for best practices
- Add, modify, or remove blocks
- Configure block settings and connections
- Set environment variables and workflow variables
CANNOT DO:
- Run or test workflows (use copilot_test separately after deploying)
- Deploy workflows (use copilot_deploy separately)
WORKFLOW:
1. Call create_workflow to get a workflowId (for new workflows)
2. Call copilot_build with the request and workflowId
3. Build agent gathers info and builds in one pass
4. Call copilot_deploy to deploy the workflow
5. Optionally call copilot_test to verify it works`,
inputSchema: {
type: 'object',
properties: {
request: {
type: 'string',
description: 'What you want to build or modify in the workflow.',
},
workflowId: {
type: 'string',
description:
'REQUIRED. The workflow ID. For new workflows, call create_workflow first to get this.',
},
context: { type: 'object' },
},
required: ['request', 'workflowId'],
},
},
{
name: 'copilot_discovery',
agentId: 'discovery',
description: `Find workflows by their contents or functionality when the user doesn't know the exact name or ID.
USE THIS WHEN:
- User describes a workflow by what it does: "the one that sends emails", "my Slack notification workflow"
- User refers to workflow contents: "the workflow with the OpenAI block"
- User needs to search/match workflows by functionality or description
DO NOT USE (use direct tools instead):
- User knows the workflow name → use get_workflow
- User wants to list all workflows → use list_workflows
- User wants to list workspaces → use list_workspaces
- User wants to list folders → use list_folders`,
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
workspaceId: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_plan',
agentId: 'plan',
description: `Plan workflow changes by gathering required information.
USE THIS WHEN:
- Building a new workflow
- Modifying an existing workflow
- You need to understand what blocks and integrations are available
- The workflow requires multiple blocks or connections
WORKFLOW ID (REQUIRED):
- For NEW workflows: First call create_workflow to get a workflowId, then pass it here
- For EXISTING workflows: Always pass the workflowId parameter
This tool gathers information about available blocks, credentials, and the current workflow state.
RETURNS: A plan object containing block configurations, connections, and technical details.
IMPORTANT: Pass the returned plan EXACTLY to copilot_edit - do not modify or summarize it.`,
inputSchema: {
type: 'object',
properties: {
request: { type: 'string', description: 'What you want to build or modify in the workflow.' },
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID. For new workflows, call create_workflow first to get this.',
},
context: { type: 'object' },
},
required: ['request', 'workflowId'],
},
},
{
name: 'copilot_edit',
agentId: 'edit',
description: `Execute a workflow plan and apply edits.
USE THIS WHEN:
- You have a plan from copilot_plan that needs to be executed
- Building or modifying a workflow based on the plan
- Making changes to blocks, connections, or configurations
WORKFLOW ID (REQUIRED):
- You MUST provide the workflowId parameter
- For new workflows, get the workflowId from create_workflow first
PLAN (REQUIRED):
- Pass the EXACT plan object from copilot_plan in the context.plan field
- Do NOT modify, summarize, or interpret the plan - pass it verbatim
- The plan contains technical details the edit agent needs exactly as-is
IMPORTANT: After copilot_edit completes, you MUST call copilot_deploy before the workflow can be run or tested.`,
inputSchema: {
type: 'object',
properties: {
message: { type: 'string', description: 'Optional additional instructions for the edit.' },
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to edit. Get this from create_workflow for new workflows.',
},
plan: {
type: 'object',
description: 'The plan object from copilot_plan. Pass it EXACTLY as returned, do not modify.',
},
context: {
type: 'object',
description: 'Additional context. Put the plan in context.plan if not using the plan field directly.',
},
},
required: ['workflowId'],
},
},
{
name: 'copilot_debug',
agentId: 'debug',
description: `Diagnose errors or unexpected workflow behavior.
WORKFLOW ID (REQUIRED): Always provide the workflowId of the workflow to debug.`,
inputSchema: {
type: 'object',
properties: {
error: { type: 'string', description: 'The error message or description of the issue.' },
workflowId: { type: 'string', description: 'REQUIRED. The workflow ID to debug.' },
context: { type: 'object' },
},
required: ['error', 'workflowId'],
},
},
{
name: 'copilot_deploy',
agentId: 'deploy',
description: `Deploy or manage workflow deployments.
CRITICAL: You MUST deploy a workflow after building before it can be run or tested.
Workflows without an active deployment will fail with "no active deployment" error.
WORKFLOW ID (REQUIRED):
- Always provide the workflowId parameter
- This must match the workflow you built with copilot_edit
USE THIS:
- After copilot_edit completes to activate the workflow
- To update deployment settings
- To redeploy after making changes
DEPLOYMENT TYPES:
- "deploy as api" - REST API endpoint
- "deploy as chat" - Chat interface
- "deploy as mcp" - MCP server`,
inputSchema: {
type: 'object',
properties: {
request: {
type: 'string',
description: 'The deployment request, e.g. "deploy as api" or "deploy as chat"',
},
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to deploy.',
},
context: { type: 'object' },
},
required: ['request', 'workflowId'],
},
},
{
name: 'copilot_auth',
agentId: 'auth',
description: 'Handle OAuth connection flows.',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_knowledge',
agentId: 'knowledge',
description: 'Create and manage knowledge bases.',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_custom_tool',
agentId: 'custom_tool',
description: 'Create or manage custom tools.',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_info',
agentId: 'info',
description: 'Inspect blocks, outputs, and workflow metadata.',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
workflowId: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_workflow',
agentId: 'workflow',
description: 'Manage workflow environment and configuration.',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
workflowId: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_research',
agentId: 'research',
description: 'Research external APIs and documentation.',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_tour',
agentId: 'tour',
description: 'Explain platform features and usage.',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
{
name: 'copilot_test',
agentId: 'test',
description: `Run workflows and verify outputs.
PREREQUISITE: The workflow MUST be deployed first using copilot_deploy.
Undeployed workflows will fail with "no active deployment" error.
WORKFLOW ID (REQUIRED):
- Always provide the workflowId parameter
USE THIS:
- After deploying to verify the workflow works correctly
- To test with sample inputs
- To validate workflow behavior before sharing with user`,
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to test.',
},
context: { type: 'object' },
},
required: ['request', 'workflowId'],
},
},
{
name: 'copilot_superagent',
agentId: 'superagent',
description: 'Execute direct external actions (email, Slack, etc.).',
inputSchema: {
type: 'object',
properties: {
request: { type: 'string' },
context: { type: 'object' },
},
required: ['request'],
},
},
]
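
To make the intended sequencing concrete, a hypothetical MCP-client sketch following the order the descriptions above prescribe (create_workflow → copilot_build → copilot_deploy → copilot_test); callTool and the create_workflow response shape are assumptions, not part of this change:

// callTool stands in for whatever MCP client invocation is available.
type CallTool = (name: string, args: Record<string, unknown>) => Promise<any>

async function buildAndVerifyWorkflow(callTool: CallTool, request: string) {
  // 1. Direct tool: create the workflow shell to obtain a workflowId.
  const created = await callTool('create_workflow', { name: 'My workflow' })
  const workflowId = created.workflowId // assumed response field

  // 2. Subagent tool: gather info and build in one pass.
  await callTool('copilot_build', { request, workflowId })

  // 3. Deploy before testing (undeployed workflows fail with "no active deployment").
  await callTool('copilot_deploy', { request: 'deploy as api', workflowId })

  // 4. Optionally verify the deployed workflow.
  return callTool('copilot_test', { request: 'run with sample input', workflowId })
}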


@@ -8,7 +8,11 @@ import { validateSelectorIds } from '@/lib/copilot/validation/selector-validator
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
-import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
+import { applyAutoLayout } from '@/lib/workflows/autolayout'
+import {
+  loadWorkflowFromNormalizedTables,
+  saveWorkflowToNormalizedTables,
+} from '@/lib/workflows/persistence/utils'
import { isValidKey } from '@/lib/workflows/sanitization/key-validation'
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
import { buildCanonicalIndex, isCanonicalPair } from '@/lib/workflows/subblocks/visibility'
@@ -1397,6 +1401,101 @@ function filterDisallowedTools(
return allowedTools
}
/**
* Normalizes block IDs in operations to ensure they are valid UUIDs.
* The LLM may generate human-readable IDs like "web_search" or "research_agent"
* which need to be converted to proper UUIDs for database compatibility.
*
* Returns the normalized operations and a mapping from old IDs to new UUIDs.
*/
function normalizeBlockIdsInOperations(operations: EditWorkflowOperation[]): {
normalizedOperations: EditWorkflowOperation[]
idMapping: Map<string, string>
} {
const logger = createLogger('EditWorkflowServerTool')
const idMapping = new Map<string, string>()
// First pass: collect all non-UUID block_ids from add/insert operations
for (const op of operations) {
if (op.operation_type === 'add' || op.operation_type === 'insert_into_subflow') {
if (op.block_id && !UUID_REGEX.test(op.block_id)) {
const newId = crypto.randomUUID()
idMapping.set(op.block_id, newId)
logger.debug('Normalizing block ID', { oldId: op.block_id, newId })
}
}
}
if (idMapping.size === 0) {
return { normalizedOperations: operations, idMapping }
}
logger.info('Normalizing block IDs in operations', {
normalizedCount: idMapping.size,
mappings: Object.fromEntries(idMapping),
})
// Helper to replace an ID if it's in the mapping
const replaceId = (id: string | undefined): string | undefined => {
if (!id) return id
return idMapping.get(id) ?? id
}
// Second pass: update all references to use new UUIDs
const normalizedOperations = operations.map((op) => {
const normalized: EditWorkflowOperation = {
...op,
block_id: replaceId(op.block_id) ?? op.block_id,
}
if (op.params) {
normalized.params = { ...op.params }
// Update subflowId references (for insert_into_subflow)
if (normalized.params.subflowId) {
normalized.params.subflowId = replaceId(normalized.params.subflowId)
}
// Update connection references
if (normalized.params.connections) {
const normalizedConnections: Record<string, any> = {}
for (const [handle, targets] of Object.entries(normalized.params.connections)) {
if (typeof targets === 'string') {
normalizedConnections[handle] = replaceId(targets)
} else if (Array.isArray(targets)) {
normalizedConnections[handle] = targets.map((t) => {
if (typeof t === 'string') return replaceId(t)
if (t && typeof t === 'object' && t.block) {
return { ...t, block: replaceId(t.block) }
}
return t
})
} else if (targets && typeof targets === 'object' && (targets as any).block) {
normalizedConnections[handle] = { ...targets, block: replaceId((targets as any).block) }
} else {
normalizedConnections[handle] = targets
}
}
normalized.params.connections = normalizedConnections
}
// Update nestedNodes block IDs
if (normalized.params.nestedNodes) {
const normalizedNestedNodes: Record<string, any> = {}
for (const [childId, childBlock] of Object.entries(normalized.params.nestedNodes)) {
const newChildId = replaceId(childId) ?? childId
normalizedNestedNodes[newChildId] = childBlock
}
normalized.params.nestedNodes = normalizedNestedNodes
}
}
return normalized
})
return { normalizedOperations, idMapping }
}
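For illustration, the normalization above applied to a small operation list (a sketch; only fields referenced in this change are shown, and the ops are cast loosely for brevity):

// Hypothetical input: LLM-generated add ops with readable block_ids and a connection between them.
const ops = [
  { operation_type: 'add', block_id: 'web_search', params: {} },
  {
    operation_type: 'add',
    block_id: 'research_agent',
    params: { connections: { source: 'web_search' } },
  },
] as unknown as EditWorkflowOperation[]

const { normalizedOperations, idMapping } = normalizeBlockIdsInOperations(ops)
// idMapping: 'web_search' -> <uuid-1>, 'research_agent' -> <uuid-2>
// normalizedOperations[1].params.connections.source now references <uuid-1>.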
/**
 * Apply operations directly to the workflow JSON state
 */
@@ -1416,6 +1515,11 @@ function applyOperationsToWorkflowState(
// Log initial state
const logger = createLogger('EditWorkflowServerTool')
+// Normalize block IDs to UUIDs before processing
+const { normalizedOperations } = normalizeBlockIdsInOperations(operations)
+operations = normalizedOperations
logger.info('Applying operations to workflow:', {
totalOperations: operations.length,
operationTypes: operations.reduce((acc: any, op) => {
@@ -3067,10 +3171,60 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
const skippedMessages =
  skippedItems.length > 0 ? skippedItems.map((item) => item.reason) : undefined
-// Return the modified workflow state for the client to convert to YAML if needed
+// Persist the workflow state to the database
const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState
// Apply autolayout to position blocks properly
const layoutResult = applyAutoLayout(finalWorkflowState.blocks, finalWorkflowState.edges, {
horizontalSpacing: 250,
verticalSpacing: 100,
padding: { x: 100, y: 100 },
})
const layoutedBlocks = layoutResult.success && layoutResult.blocks
? layoutResult.blocks
: finalWorkflowState.blocks
if (!layoutResult.success) {
logger.warn('Autolayout failed, using default positions', {
workflowId,
error: layoutResult.error,
})
}
const workflowStateForDb = {
blocks: layoutedBlocks,
edges: finalWorkflowState.edges,
loops: generateLoopBlocks(layoutedBlocks as any),
parallels: generateParallelBlocks(layoutedBlocks as any),
lastSaved: Date.now(),
isDeployed: false,
}
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowStateForDb as any)
if (!saveResult.success) {
logger.error('Failed to persist workflow state to database', {
workflowId,
error: saveResult.error,
})
throw new Error(`Failed to save workflow: ${saveResult.error}`)
}
// Update workflow's lastSynced timestamp
await db
.update(workflowTable)
.set({
lastSynced: new Date(),
updatedAt: new Date(),
})
.where(eq(workflowTable.id, workflowId))
logger.info('Workflow state persisted to database', { workflowId })
// Return the modified workflow state with autolayout applied
return {
  success: true,
-  workflowState: validation.sanitizedState || modifiedWorkflowState,
+  workflowState: { ...finalWorkflowState, blocks: layoutedBlocks },
  // Include input validation errors so the LLM can see what was rejected
  ...(inputErrors && {
    inputValidationErrors: inputErrors,


@@ -0,0 +1,37 @@
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
type CopilotWorkflowState = {
blocks?: Record<string, any>
edges?: any[]
loops?: Record<string, any>
parallels?: Record<string, any>
}
export function formatWorkflowStateForCopilot(state: CopilotWorkflowState): string {
const workflowState = {
blocks: state.blocks || {},
edges: state.edges || [],
loops: state.loops || {},
parallels: state.parallels || {},
}
const sanitized = sanitizeForCopilot(workflowState)
return JSON.stringify(sanitized, null, 2)
}
export function formatNormalizedWorkflowForCopilot(
normalized: CopilotWorkflowState | null | undefined
): string | null {
if (!normalized) return null
return formatWorkflowStateForCopilot(normalized)
}
export function normalizeWorkflowName(name?: string | null): string {
return String(name || '').trim().toLowerCase()
}
export function extractWorkflowNames(workflows: Array<{ name?: string | null }>): string[] {
return workflows
.map((workflow) => (typeof workflow?.name === 'string' ? workflow.name : null))
.filter((name): name is string => Boolean(name))
}
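
A quick usage sketch of the shared helpers above (values are illustrative):

import {
  extractWorkflowNames,
  formatWorkflowStateForCopilot,
  normalizeWorkflowName,
} from '@/lib/copilot/tools/shared/workflow-utils'

// extractWorkflowNames drops entries without a string name.
const names = extractWorkflowNames([{ name: 'Email Digest' }, { name: null }]) // ['Email Digest']
// normalizeWorkflowName trims and lowercases, so lookups ignore case and padding.
const sameName = normalizeWorkflowName(' Email Digest ') === normalizeWorkflowName('email digest') // true
// formatWorkflowStateForCopilot sanitizes the state and returns pretty-printed JSON.
const serialized = formatWorkflowStateForCopilot({ blocks: {}, edges: [] })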


@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { permissions, userStats, workflow as workflowTable } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
-import { and, eq } from 'drizzle-orm'
+import { and, asc, eq, inArray, or } from 'drizzle-orm'
import { NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { getWorkspaceWithOwner, type PermissionType } from '@/lib/workspaces/permissions/utils'
@@ -15,6 +15,50 @@ export async function getWorkflowById(id: string) {
return rows[0]
}
export async function resolveWorkflowIdForUser(
userId: string,
workflowId?: string,
workflowName?: string
): Promise<{ workflowId: string; workflowName?: string } | null> {
if (workflowId) {
return { workflowId }
}
const workspaceIds = await db
.select({ entityId: permissions.entityId })
.from(permissions)
.where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace')))
const workspaceIdList = workspaceIds.map((row) => row.entityId)
const workflowConditions = [eq(workflowTable.userId, userId)]
if (workspaceIdList.length > 0) {
workflowConditions.push(inArray(workflowTable.workspaceId, workspaceIdList))
}
const workflows = await db
.select()
.from(workflowTable)
.where(or(...workflowConditions))
.orderBy(asc(workflowTable.sortOrder), asc(workflowTable.createdAt), asc(workflowTable.id))
if (workflows.length === 0) {
return null
}
if (workflowName) {
const match = workflows.find(
(w) => String(w.name || '').trim().toLowerCase() === workflowName.toLowerCase()
)
if (match) {
return { workflowId: match.id, workflowName: match.name || undefined }
}
return null
}
return { workflowId: workflows[0].id, workflowName: workflows[0].name || undefined }
}
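A hedged call sketch for resolveWorkflowIdForUser (the import path is assumed): an explicit workflowId wins, a workflowName is matched case-insensitively, and with neither the user's first workflow by sort order is returned.

import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'

// Hypothetical wrapper that fails loudly when nothing resolves.
async function resolveOrFail(userId: string, workflowId?: string, workflowName?: string) {
  const resolved = await resolveWorkflowIdForUser(userId, workflowId, workflowName)
  if (!resolved) {
    throw new Error('No matching workflow found for this user')
  }
  return resolved.workflowId
}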
type WorkflowRecord = ReturnType<typeof getWorkflowById> extends Promise<infer R>
  ? NonNullable<R>
  : never


@@ -53,7 +53,7 @@ import { SleepClientTool } from '@/lib/copilot/tools/client/other/sleep'
import { TestClientTool } from '@/lib/copilot/tools/client/other/test'
import { TourClientTool } from '@/lib/copilot/tools/client/other/tour'
import { WorkflowClientTool } from '@/lib/copilot/tools/client/other/workflow'
-import { createExecutionContext, getTool } from '@/lib/copilot/tools/client/registry'
+import { getTool } from '@/lib/copilot/tools/client/registry'
import { GetCredentialsClientTool } from '@/lib/copilot/tools/client/user/get-credentials'
import { SetEnvironmentVariablesClientTool } from '@/lib/copilot/tools/client/user/set-environment-variables'
import { CheckDeploymentStatusClientTool } from '@/lib/copilot/tools/client/workflow/check-deployment-status'
@@ -1198,6 +1198,18 @@ const sseHandlers: Record<string, SSEHandler> = {
}
} catch {}
}
if (current.name === 'edit_workflow') {
try {
const resultPayload =
data?.result || data?.data?.result || data?.data?.data || data?.data || {}
const workflowState = resultPayload?.workflowState
if (workflowState) {
const diffStore = useWorkflowDiffStore.getState()
void diffStore.setProposedChanges(workflowState)
}
} catch {}
}
}
// Update inline content block state
@@ -1362,225 +1374,7 @@ const sseHandlers: Record<string, SSEHandler> = {
return
}
-// Prefer interface-based registry to determine interrupt and execute
+return
try {
const def = name ? getTool(name) : undefined
if (def) {
const hasInterrupt =
typeof def.hasInterrupt === 'function'
? !!def.hasInterrupt(args || {})
: !!def.hasInterrupt
// Check if tool is auto-allowed - if so, execute even if it has an interrupt
const { autoAllowedTools } = get()
const isAutoAllowed = name ? autoAllowedTools.includes(name) : false
if ((!hasInterrupt || isAutoAllowed) && typeof def.execute === 'function') {
if (isAutoAllowed && hasInterrupt) {
logger.info('[toolCallsById] Auto-executing tool with interrupt (auto-allowed)', {
id,
name,
})
}
const ctx = createExecutionContext({ toolCallId: id, toolName: name || 'unknown_tool' })
// Defer executing transition by a tick to let pending render
setTimeout(() => {
// Guard against duplicate execution - check if already executing or terminal
const currentState = get().toolCallsById[id]?.state
if (currentState === ClientToolCallState.executing || isTerminalState(currentState)) {
return
}
const executingMap = { ...get().toolCallsById }
executingMap[id] = {
...executingMap[id],
state: ClientToolCallState.executing,
display: resolveToolDisplay(name, ClientToolCallState.executing, id, args),
}
set({ toolCallsById: executingMap })
logger.info('[toolCallsById] pending → executing (registry)', { id, name })
// Update inline content block to executing
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
if (b.type === 'tool_call' && b.toolCall?.id === id) {
context.contentBlocks[i] = {
...b,
toolCall: { ...b.toolCall, state: ClientToolCallState.executing },
}
break
}
}
updateStreamingMessage(set, context)
Promise.resolve()
.then(async () => {
const result = await def.execute(ctx, args || {})
const success =
result && typeof result.status === 'number'
? result.status >= 200 && result.status < 300
: true
const completeMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
if (
isRejectedState(completeMap[id]?.state) ||
isReviewState(completeMap[id]?.state) ||
isBackgroundState(completeMap[id]?.state)
) {
return
}
completeMap[id] = {
...completeMap[id],
state: success ? ClientToolCallState.success : ClientToolCallState.error,
display: resolveToolDisplay(
name,
success ? ClientToolCallState.success : ClientToolCallState.error,
id,
args
),
}
set({ toolCallsById: completeMap })
logger.info(
`[toolCallsById] executing → ${success ? 'success' : 'error'} (registry)`,
{ id, name }
)
// Notify backend tool mark-complete endpoint
try {
await fetch('/api/copilot/tools/mark-complete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
id,
name: name || 'unknown_tool',
status:
typeof result?.status === 'number' ? result.status : success ? 200 : 500,
message: result?.message,
data: result?.data,
}),
})
} catch {}
})
.catch((e) => {
const errorMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
if (
isRejectedState(errorMap[id]?.state) ||
isReviewState(errorMap[id]?.state) ||
isBackgroundState(errorMap[id]?.state)
) {
return
}
errorMap[id] = {
...errorMap[id],
state: ClientToolCallState.error,
display: resolveToolDisplay(name, ClientToolCallState.error, id, args),
}
set({ toolCallsById: errorMap })
logger.error('Registry auto-execute tool failed', { id, name, error: e })
})
}, 0)
return
}
}
} catch (e) {
logger.warn('tool_call registry auto-exec check failed', { id, name, error: e })
}
// Class-based auto-exec for non-interrupt tools or auto-allowed tools
try {
const inst = getClientTool(id) as any
const hasInterrupt = !!inst?.getInterruptDisplays?.()
// Check if tool is auto-allowed - if so, execute even if it has an interrupt
const { autoAllowedTools: classAutoAllowed } = get()
const isClassAutoAllowed = name ? classAutoAllowed.includes(name) : false
if (
(!hasInterrupt || isClassAutoAllowed) &&
(typeof inst?.execute === 'function' || typeof inst?.handleAccept === 'function')
) {
if (isClassAutoAllowed && hasInterrupt) {
logger.info('[toolCallsById] Auto-executing class tool with interrupt (auto-allowed)', {
id,
name,
})
}
setTimeout(() => {
// Guard against duplicate execution - check if already executing or terminal
const currentState = get().toolCallsById[id]?.state
if (currentState === ClientToolCallState.executing || isTerminalState(currentState)) {
return
}
const executingMap = { ...get().toolCallsById }
executingMap[id] = {
...executingMap[id],
state: ClientToolCallState.executing,
display: resolveToolDisplay(name, ClientToolCallState.executing, id, args),
}
set({ toolCallsById: executingMap })
logger.info('[toolCallsById] pending → executing (class)', { id, name })
Promise.resolve()
.then(async () => {
// Use handleAccept for tools with interrupts, execute for others
if (hasInterrupt && typeof inst?.handleAccept === 'function') {
await inst.handleAccept(args || {})
} else {
await inst.execute(args || {})
}
// Success/error will be synced via registerToolStateSync
})
.catch(() => {
const errorMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
if (
isRejectedState(errorMap[id]?.state) ||
isReviewState(errorMap[id]?.state) ||
isBackgroundState(errorMap[id]?.state)
) {
return
}
errorMap[id] = {
...errorMap[id],
state: ClientToolCallState.error,
display: resolveToolDisplay(name, ClientToolCallState.error, id, args),
}
set({ toolCallsById: errorMap })
})
}, 0)
return
}
} catch {}
// Integration tools: Check auto-allowed or stay in pending state until user confirms
// This handles tools like google_calendar_*, exa_*, gmail_read, etc. that aren't in the client registry
// Only relevant if mode is 'build' (agent)
const { mode, workflowId, autoAllowedTools, executeIntegrationTool } = get()
if (mode === 'build' && workflowId) {
// Check if tool was NOT found in client registry
const def = name ? getTool(name) : undefined
const inst = getClientTool(id) as any
if (!def && !inst && name) {
// Check if this integration tool is auto-allowed - if so, execute it immediately
if (autoAllowedTools.includes(name)) {
logger.info('[build mode] Auto-executing integration tool (auto-allowed)', { id, name })
// Defer to allow pending state to render briefly
setTimeout(() => {
executeIntegrationTool(id).catch((err) => {
logger.error('[build mode] Auto-execute integration tool failed', {
id,
name,
error: err,
})
})
}, 0)
} else {
// Integration tools stay in pending state until user confirms
logger.info('[build mode] Integration tool awaiting user confirmation', {
id,
name,
})
}
}
}
},
reasoning: (data, context, _get, set) => {
const phase = (data && (data.phase || data?.data?.phase)) as string | undefined
@@ -2065,91 +1859,6 @@ const subAgentSSEHandlers: Record<string, SSEHandler> = {
if (isPartial) {
return
}
// Execute client tools in parallel (non-blocking) - same pattern as main tool_call handler
// Check if tool is auto-allowed
const { autoAllowedTools: subAgentAutoAllowed } = get()
const isSubAgentAutoAllowed = name ? subAgentAutoAllowed.includes(name) : false
try {
const def = getTool(name)
if (def) {
const hasInterrupt =
typeof def.hasInterrupt === 'function'
? !!def.hasInterrupt(args || {})
: !!def.hasInterrupt
// Auto-execute if no interrupt OR if auto-allowed
if (!hasInterrupt || isSubAgentAutoAllowed) {
if (isSubAgentAutoAllowed && hasInterrupt) {
logger.info('[SubAgent] Auto-executing tool with interrupt (auto-allowed)', {
id,
name,
})
}
// Auto-execute tools - non-blocking
const ctx = createExecutionContext({ toolCallId: id, toolName: name })
Promise.resolve()
.then(() => def.execute(ctx, args || {}))
.catch((execErr: any) => {
logger.error('[SubAgent] Tool execution failed', {
id,
name,
error: execErr?.message,
})
})
}
} else {
// Fallback to class-based tools - non-blocking
const instance = getClientTool(id)
if (instance) {
const hasInterruptDisplays = !!instance.getInterruptDisplays?.()
// Auto-execute if no interrupt OR if auto-allowed
if (!hasInterruptDisplays || isSubAgentAutoAllowed) {
if (isSubAgentAutoAllowed && hasInterruptDisplays) {
logger.info('[SubAgent] Auto-executing class tool with interrupt (auto-allowed)', {
id,
name,
})
}
Promise.resolve()
.then(() => {
// Use handleAccept for tools with interrupts, execute for others
if (hasInterruptDisplays && typeof instance.handleAccept === 'function') {
return instance.handleAccept(args || {})
}
return instance.execute(args || {})
})
.catch((execErr: any) => {
logger.error('[SubAgent] Class tool execution failed', {
id,
name,
error: execErr?.message,
})
})
}
} else {
// Check if this is an integration tool (server-side) that should be auto-executed
const isIntegrationTool = !CLASS_TOOL_METADATA[name]
if (isIntegrationTool && isSubAgentAutoAllowed) {
logger.info('[SubAgent] Auto-executing integration tool (auto-allowed)', {
id,
name,
})
// Execute integration tool via the store method
const { executeIntegrationTool } = get()
executeIntegrationTool(id).catch((err) => {
logger.error('[SubAgent] Integration tool auto-execution failed', {
id,
name,
error: err?.message || err,
})
})
}
}
}
} catch (e: any) {
logger.error('[SubAgent] Tool registry/execution error', { id, name, error: e?.message })
}
},
// Handle subagent tool results
@@ -3191,21 +2900,13 @@ export const useCopilotStore = create<CopilotStore>()(
return { messages }
})
-// Notify backend mark-complete to finalize tool server-side
try {
-  fetch('/api/copilot/tools/mark-complete', {
+  fetch('/api/copilot/confirm', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
-      id,
-      name: current.name,
-      status:
-        targetState === ClientToolCallState.success
-          ? 200
-          : targetState === ClientToolCallState.rejected
-            ? 409
-            : 500,
-      message: toolCallState,
+      toolCallId: id,
+      status: toolCallState,
    }),
  }).catch(() => {})
} catch {}
@@ -3819,147 +3520,6 @@ export const useCopilotStore = create<CopilotStore>()(
setAgentPrefetch: (prefetch) => set({ agentPrefetch: prefetch }),
setEnabledModels: (models) => set({ enabledModels: models }),
executeIntegrationTool: async (toolCallId: string) => {
const { toolCallsById, workflowId } = get()
const toolCall = toolCallsById[toolCallId]
if (!toolCall || !workflowId) return
const { id, name, params } = toolCall
// Guard against double execution - skip if already executing or in terminal state
if (toolCall.state === ClientToolCallState.executing || isTerminalState(toolCall.state)) {
logger.info('[executeIntegrationTool] Skipping - already executing or terminal', {
id,
name,
state: toolCall.state,
})
return
}
// Set to executing state
const executingMap = { ...get().toolCallsById }
executingMap[id] = {
...executingMap[id],
state: ClientToolCallState.executing,
display: resolveToolDisplay(name, ClientToolCallState.executing, id, params),
}
set({ toolCallsById: executingMap })
logger.info('[toolCallsById] pending → executing (integration tool)', { id, name })
try {
const res = await fetch('/api/copilot/execute-tool', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
toolCallId: id,
toolName: name,
arguments: params || {},
workflowId,
}),
})
const result = await res.json()
const success = result.success && result.result?.success
const completeMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
if (
isRejectedState(completeMap[id]?.state) ||
isReviewState(completeMap[id]?.state) ||
isBackgroundState(completeMap[id]?.state)
) {
return
}
completeMap[id] = {
...completeMap[id],
state: success ? ClientToolCallState.success : ClientToolCallState.error,
display: resolveToolDisplay(
name,
success ? ClientToolCallState.success : ClientToolCallState.error,
id,
params
),
}
set({ toolCallsById: completeMap })
logger.info(`[toolCallsById] executing → ${success ? 'success' : 'error'} (integration)`, {
id,
name,
result,
})
// Notify backend tool mark-complete endpoint
try {
await fetch('/api/copilot/tools/mark-complete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
id,
name: name || 'unknown_tool',
status: success ? 200 : 500,
message: success
? result.result?.output?.content
: result.result?.error || result.error || 'Tool execution failed',
data: success
? result.result?.output
: {
error: result.result?.error || result.error,
output: result.result?.output,
},
}),
})
} catch {}
} catch (e) {
const errorMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
if (
isRejectedState(errorMap[id]?.state) ||
isReviewState(errorMap[id]?.state) ||
isBackgroundState(errorMap[id]?.state)
) {
return
}
errorMap[id] = {
...errorMap[id],
state: ClientToolCallState.error,
display: resolveToolDisplay(name, ClientToolCallState.error, id, params),
}
set({ toolCallsById: errorMap })
logger.error('Integration tool execution failed', { id, name, error: e })
}
},
skipIntegrationTool: (toolCallId: string) => {
const { toolCallsById } = get()
const toolCall = toolCallsById[toolCallId]
if (!toolCall) return
const { id, name, params } = toolCall
// Set to rejected state
const rejectedMap = { ...get().toolCallsById }
rejectedMap[id] = {
...rejectedMap[id],
state: ClientToolCallState.rejected,
display: resolveToolDisplay(name, ClientToolCallState.rejected, id, params),
}
set({ toolCallsById: rejectedMap })
logger.info('[toolCallsById] pending → rejected (integration tool skipped)', { id, name })
// Notify backend tool mark-complete endpoint with skip status
fetch('/api/copilot/tools/mark-complete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
id,
name: name || 'unknown_tool',
status: 200,
message: 'Tool execution skipped by user',
data: { skipped: true },
}),
}).catch(() => {})
},
loadAutoAllowedTools: async () => { loadAutoAllowedTools: async () => {
try { try {
logger.info('[AutoAllowedTools] Loading from API...') logger.info('[AutoAllowedTools] Loading from API...')
@@ -3993,45 +3553,6 @@ export const useCopilotStore = create<CopilotStore>()(
set({ autoAllowedTools: data.autoAllowedTools || [] }) set({ autoAllowedTools: data.autoAllowedTools || [] })
logger.info('[AutoAllowedTools] Added tool to store', { toolId }) logger.info('[AutoAllowedTools] Added tool to store', { toolId })
// Auto-execute all pending tools of the same type
const { toolCallsById, executeIntegrationTool } = get()
const pendingToolCalls = Object.values(toolCallsById).filter(
(tc) => tc.name === toolId && tc.state === ClientToolCallState.pending
)
if (pendingToolCalls.length > 0) {
const isIntegrationTool = !CLASS_TOOL_METADATA[toolId]
logger.info('[AutoAllowedTools] Auto-executing pending tools', {
toolId,
count: pendingToolCalls.length,
isIntegrationTool,
})
for (const tc of pendingToolCalls) {
if (isIntegrationTool) {
// Integration tools use executeIntegrationTool
executeIntegrationTool(tc.id).catch((err) => {
logger.error('[AutoAllowedTools] Auto-execute pending integration tool failed', {
toolCallId: tc.id,
toolId,
error: err,
})
})
} else {
// Client tools with interrupts use handleAccept
const inst = getClientTool(tc.id) as any
if (inst && typeof inst.handleAccept === 'function') {
Promise.resolve()
.then(() => inst.handleAccept(tc.params || {}))
.catch((err: any) => {
logger.error('[AutoAllowedTools] Auto-execute pending client tool failed', {
toolCallId: tc.id,
toolId,
error: err,
})
})
}
}
}
}
} }
} catch (err) { } catch (err) {
logger.error('[AutoAllowedTools] Failed to add tool', { toolId, error: err }) logger.error('[AutoAllowedTools] Failed to add tool', { toolId, error: err })

View File

@@ -231,8 +231,6 @@ export interface CopilotActions {
triggerUserMessageId?: string triggerUserMessageId?: string
) => Promise<void> ) => Promise<void>
handleNewChatCreation: (newChatId: string) => Promise<void> handleNewChatCreation: (newChatId: string) => Promise<void>
executeIntegrationTool: (toolCallId: string) => Promise<void>
skipIntegrationTool: (toolCallId: string) => void
loadAutoAllowedTools: () => Promise<void> loadAutoAllowedTools: () => Promise<void>
addAutoAllowedTool: (toolId: string) => Promise<void> addAutoAllowedTool: (toolId: string) => Promise<void>
removeAutoAllowedTool: (toolId: string) => Promise<void> removeAutoAllowedTool: (toolId: string) => Promise<void>

View File

@@ -0,0 +1,927 @@
# Copilot Server-Side Refactor Plan
> **Goal**: Move copilot orchestration logic from the browser (React/Zustand) to the Next.js server, enabling both headless API access and a simplified interactive client.
## Table of Contents
1. [Executive Summary](#executive-summary)
2. [Current Architecture](#current-architecture)
3. [Target Architecture](#target-architecture)
4. [Scope & Boundaries](#scope--boundaries)
5. [Module Design](#module-design)
6. [Implementation Plan](#implementation-plan)
7. [API Contracts](#api-contracts)
8. [Migration Strategy](#migration-strategy)
9. [Testing Strategy](#testing-strategy)
10. [Risks & Mitigations](#risks--mitigations)
11. [File Inventory](#file-inventory)
---
## Executive Summary
### Problem
The current copilot implementation in Sim has all orchestration logic in the browser:
- SSE stream parsing happens in the React client
- Tool execution is triggered from the browser
- OAuth tokens are sent to the client
- No headless/API access is possible
- The Zustand store is ~4,200 lines of complex async logic
### Solution
Move orchestration to the Next.js server:
- Server parses SSE from copilot backend
- Server executes tools directly (no HTTP round-trips)
- Server forwards events to client (if attached)
- Headless API returns JSON response
- Client store becomes a thin UI layer (~600 lines)
### Benefits
| Aspect | Before | After |
|--------|--------|-------|
| Security | OAuth tokens in browser | Tokens stay server-side |
| Headless access | Not possible | Full API support |
| Store complexity | ~4,200 lines | ~600 lines |
| Tool execution | Browser-initiated | Server-side |
| Testing | Complex async | Simple state |
| Bundle size | Large (tool classes) | Minimal |
---
## Current Architecture
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ BROWSER (React) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Copilot Store (4,200 lines) ││
│ │ ││
│ │ • SSE stream parsing (parseSSEStream) ││
│ │ • Event handlers (sseHandlers, subAgentSSEHandlers) ││
│ │ • Tool execution logic ││
│ │ • Client tool instantiation ││
│ │ • Content block processing ││
│ │ • State management ││
│ │ • UI state ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ │ HTTP calls for tool execution │
│ ▼ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ NEXT.JS SERVER │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ /api/copilot/chat - Proxy to copilot backend (pass-through) │
│ /api/copilot/execute-tool - Execute integration tools │
│ /api/copilot/confirm - Update Redis with tool status │
│ /api/copilot/tools/mark-complete - Notify copilot backend │
│ /api/copilot/execute-copilot-server-tool - Execute server tools │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ COPILOT BACKEND (Go) │
│ copilot.sim.ai │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ • LLM orchestration │
│ • Subagent system (plan, edit, debug, etc.) │
│ • Tool definitions │
│ • Conversation management │
│ • SSE streaming │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
### Current Flow (Interactive)
1. User sends message in UI
2. Store calls `/api/copilot/chat`
3. Chat route proxies to copilot backend, streams SSE back
4. **Store parses SSE in browser**
5. On `tool_call` event:
- Store decides if tool needs confirmation
- Store calls `/api/copilot/execute-tool` or `/api/copilot/execute-copilot-server-tool`
- Store calls `/api/copilot/tools/mark-complete`
6. Store updates UI state
### Problems with Current Flow
1. **No headless access**: Must have browser client
2. **Security**: OAuth tokens sent to browser for tool execution
3. **Complexity**: All orchestration logic in Zustand store
4. **Performance**: Multiple HTTP round-trips from browser
5. **Reliability**: Browser can disconnect mid-operation
6. **Testing**: Hard to test async browser logic
---
## Target Architecture
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ BROWSER (React) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Copilot Store (~600 lines) ││
│ │ ││
│ │ • UI state (messages, toolCalls display) ││
│ │ • Event listener (receive server events) ││
│ │ • User actions (send message, confirm/reject) ││
│ │ • Simple API calls ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ │ SSE events from server │
│ │ │
└─────────────────────────────────────────────────────────────────────────────┘
│ (Optional - headless mode has no client)
┌─────────────────────────────────────────────────────────────────────────────┐
│ NEXT.JS SERVER │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Orchestrator Module (NEW) ││
│ │ lib/copilot/orchestrator/ ││
│ │ ││
│ │ • SSE stream parsing ││
│ │ • Event handlers ││
│ │ • Tool execution (direct function calls) ││
│ │ • Response building ││
│ │ • Event forwarding (to client if attached) ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌──────┴──────┐ │
│ │ │ │
│ ▼ ▼ │
│ /api/copilot/chat /api/v1/copilot/chat │
│ (Interactive) (Headless) │
│ - Session auth - API key auth │
│ - SSE to client - JSON response │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
│ (Single external HTTP call)
┌─────────────────────────────────────────────────────────────────────────────┐
│ COPILOT BACKEND (Go) │
│ (UNCHANGED - no modifications) │
└─────────────────────────────────────────────────────────────────────────────┘
```
### Target Flow (Headless)
1. External client calls `POST /api/v1/copilot/chat` with API key
2. Orchestrator calls copilot backend
3. **Server parses SSE stream**
4. **Server executes tools directly** (no HTTP)
5. Server notifies copilot backend (mark-complete)
6. Server returns JSON response
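A minimal route sketch of this flow is below. `authenticateApiKeyRequest` is a placeholder for whatever API-key auth helper the app already exposes, and `orchestrateCopilotRequest` is the orchestrator entry point designed later in this document; both names are assumptions until Phases 1–2 land.
```typescript
// app/api/v1/copilot/chat/route.ts (sketch) — headless mode, JSON in / JSON out
import { NextResponse } from 'next/server'
import { orchestrateCopilotRequest } from '@/lib/copilot/orchestrator'
import { authenticateApiKeyRequest } from '@/lib/auth/api-keys' // hypothetical helper

export async function POST(req: Request) {
  const userId = await authenticateApiKeyRequest(req.headers.get('x-api-key'))
  if (!userId) {
    return NextResponse.json({ success: false, error: 'Unauthorized' }, { status: 401 })
  }

  const { message, workflowId, chatId, mode, model } = await req.json()

  // No onEvent callback: the orchestrator runs to completion, then we return JSON
  const result = await orchestrateCopilotRequest(
    { message, workflowId, userId, chatId, mode, model },
    { autoExecuteTools: true }
  )

  return NextResponse.json(result, { status: result.success ? 200 : 500 })
}
```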
### Target Flow (Interactive)
1. User sends message in UI
2. Store calls `/api/copilot/chat`
3. **Server orchestrates everything**
4. Server forwards events to client via SSE
5. Client just updates UI from events
6. Server returns when complete
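For the interactive flow, the same orchestrator runs with an `onEvent` callback and the route re-emits each event to the browser as SSE. A minimal sketch, assuming the orchestrator API described below (error handling omitted):
```typescript
// app/api/copilot/chat/route.ts (sketch) — forward orchestrator events to the client as SSE
import { getSession } from '@/lib/auth'
import { orchestrateCopilotRequest } from '@/lib/copilot/orchestrator'

export async function POST(req: Request) {
  const session = await getSession()
  if (!session?.user?.id) return new Response('Unauthorized', { status: 401 })

  const body = await req.json()
  const encoder = new TextEncoder()

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      try {
        await orchestrateCopilotRequest(
          { ...body, userId: session.user.id },
          {
            // Re-emit every backend event in the SSE format the client already understands
            onEvent: (event) => {
              controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`))
            },
          }
        )
      } finally {
        controller.close()
      }
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  })
}
```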
---
## Scope & Boundaries
### In Scope
| Item | Description |
|------|-------------|
| Orchestrator module | New module in `lib/copilot/orchestrator/` |
| Headless API route | New route `POST /api/v1/copilot/chat` |
| SSE parsing | Move from store to server |
| Tool execution | Direct function calls on server |
| Event forwarding | SSE to client (interactive mode) |
| Store simplification | Reduce to UI-only logic |
### Out of Scope
| Item | Reason |
|------|--------|
| Copilot backend (Go) | Separate repo, working correctly |
| Tool definitions | Already work, just called differently |
| LLM providers | Handled by copilot backend |
| Subagent system | Handled by copilot backend |
### Boundaries
```
┌─────────────────────────────────────┐
│ MODIFICATION ZONE │
│ │
┌────────────────┼─────────────────────────────────────┼────────────────┐
│ │ │ │
│ UNCHANGED │ apps/sim/ │ UNCHANGED │
│ │ ├── lib/copilot/orchestrator/ │ │
│ copilot/ │ │ └── (NEW) │ apps/sim/ │
│ (Go backend) │ ├── app/api/v1/copilot/ │ tools/ │
│ │ │ └── (NEW) │ (definitions)│
│ │ ├── app/api/copilot/chat/ │ │
│ │ │ └── (MODIFIED) │ │
│ │ └── stores/panel/copilot/ │ │
│ │ └── (SIMPLIFIED) │ │
│ │ │ │
└────────────────┼─────────────────────────────────────┼────────────────┘
│ │
└─────────────────────────────────────┘
```
---
## Module Design
### Directory Structure
```
apps/sim/lib/copilot/orchestrator/
├── index.ts # Main orchestrator function
├── types.ts # Type definitions
├── sse-parser.ts # Parse SSE stream from copilot backend
├── sse-handlers.ts # Handle each SSE event type
├── tool-executor.ts # Execute tools directly (no HTTP)
├── persistence.ts # Database and Redis operations
└── response-builder.ts # Build final response
```
### Module Responsibilities
#### `types.ts`
Defines all types used by the orchestrator:
```typescript
// SSE Events
interface SSEEvent { type, data, subagent?, toolCallId?, toolName? }
type SSEEventType = 'content' | 'tool_call' | 'tool_result' | 'done' | ...
// Tool State
interface ToolCallState { id, name, status, params?, result?, error? }
type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped'
// Streaming Context (internal state during orchestration)
interface StreamingContext {
chatId?, conversationId?, messageId
accumulatedContent, contentBlocks
toolCalls: Map<string, ToolCallState>
streamComplete, errors[]
}
// Orchestrator API
interface OrchestratorRequest { message, workflowId, userId, chatId?, mode?, ... }
interface OrchestratorOptions { autoExecuteTools?, onEvent?, timeout?, ... }
interface OrchestratorResult { success, content, toolCalls[], chatId?, error? }
// Execution Context (passed to tool executors)
interface ExecutionContext { userId, workflowId, workspaceId?, decryptedEnvVars? }
```
#### `sse-parser.ts`
Parses SSE stream into typed events:
```typescript
async function* parseSSEStream(
reader: ReadableStreamDefaultReader,
decoder: TextDecoder,
abortSignal?: AbortSignal
): AsyncGenerator<SSEEvent>
```
- Handles buffering for partial lines
- Parses JSON from `data:` lines
- Yields typed `SSEEvent` objects
- Supports abort signal
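A minimal parsing sketch (line-buffered, `data:`-prefixed JSON; the event shape is the `SSEEvent` type above):
```typescript
// sse-parser.ts (sketch)
import type { SSEEvent } from './types'

export async function* parseSSEStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  decoder: TextDecoder = new TextDecoder(),
  abortSignal?: AbortSignal
): AsyncGenerator<SSEEvent> {
  let buffer = ''
  while (!abortSignal?.aborted) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })

    // Keep the last (possibly partial) line in the buffer for the next chunk
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? ''

    for (const line of lines) {
      const trimmed = line.trim()
      if (!trimmed.startsWith('data:')) continue
      const payload = trimmed.slice('data:'.length).trim()
      if (!payload || payload === '[DONE]') continue
      try {
        yield JSON.parse(payload) as SSEEvent
      } catch {
        // A single malformed line should not terminate the stream
      }
    }
  }
}
```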
#### `sse-handlers.ts`
Handles each SSE event type:
```typescript
const sseHandlers: Record<SSEEventType, SSEHandler> = {
content: (event, context) => { /* append to accumulated content */ },
tool_call: async (event, context, execContext, options) => {
/* track tool, execute if autoExecuteTools */
},
tool_result: (event, context) => { /* update tool status */ },
tool_generating: (event, context) => { /* create pending tool */ },
reasoning: (event, context) => { /* handle thinking blocks */ },
done: (event, context) => { /* mark stream complete */ },
error: (event, context) => { /* record error */ },
// ... etc
}
const subAgentHandlers: Record<SSEEventType, SSEHandler> = {
// Handlers for events within subagent context
}
```
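As an illustration, two concrete handlers written against the `StreamingContext` sketched in `types.ts` (field names indicative only):
```typescript
// sse-handlers.ts (sketch) — concrete handler examples
import type { SSEEvent, StreamingContext } from './types'

export const contentHandler = (event: SSEEvent, context: StreamingContext): void => {
  // Append streamed text to the running assistant message
  context.accumulatedContent += event.data ?? ''
}

export const toolResultHandler = (event: SSEEvent, context: StreamingContext): void => {
  const toolCall = event.toolCallId ? context.toolCalls.get(event.toolCallId) : undefined
  if (!toolCall) return
  toolCall.status = event.data?.success === false ? 'error' : 'success'
  toolCall.result = event.data
}
```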
#### `tool-executor.ts`
Executes tools directly without HTTP:
```typescript
// Main entry point
async function executeToolServerSide(
toolCall: ToolCallState,
context: ExecutionContext
): Promise<ToolCallResult>
// Server tools (edit_workflow, search_documentation, etc.)
async function executeServerToolDirect(
toolName: string,
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult>
// Integration tools (slack_send, gmail_read, etc.)
async function executeIntegrationToolDirect(
toolCallId: string,
toolName: string,
toolConfig: ToolConfig,
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult>
// Notify copilot backend (external HTTP - required)
async function markToolComplete(
toolCallId: string,
toolName: string,
status: number,
message?: any,
data?: any
): Promise<boolean>
// Prepare cached context for tool execution
async function prepareExecutionContext(
userId: string,
workflowId: string
): Promise<ExecutionContext>
```
**Key principle**: Internal tool execution uses direct function calls. Only `markToolComplete` makes an HTTP call, because the copilot backend is an external service.
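A sketch of that one call, assuming the copilot backend exposes a mark-complete endpoint under `SIM_AGENT_API_URL`; the exact path and headers below are assumptions and should be copied from the existing `/api/copilot/tools/mark-complete` proxy route.
```typescript
// tool-executor.ts (sketch) — the single outbound HTTP call to the copilot backend
import { env } from '@/lib/core/config/env'

export async function markToolComplete(
  toolCallId: string,
  toolName: string,
  status: number,
  message?: unknown,
  data?: unknown
): Promise<boolean> {
  try {
    const res = await fetch(`${env.SIM_AGENT_API_URL}/api/tools/mark-complete`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ id: toolCallId, name: toolName, status, message, data }),
    })
    return res.ok
  } catch {
    return false
  }
}
```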
#### `persistence.ts`
Database and Redis operations:
```typescript
// Chat persistence
async function createChat(params): Promise<{ id: string }>
async function loadChat(chatId, userId): Promise<Chat | null>
async function saveMessages(chatId, messages, options?): Promise<void>
async function updateChatConversationId(chatId, conversationId): Promise<void>
// Tool confirmation (Redis)
async function setToolConfirmation(toolCallId, status, message?): Promise<boolean>
async function getToolConfirmation(toolCallId): Promise<Confirmation | null>
```
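A sketch of the Redis half, assuming an ioredis-style client export (`getRedisClient` is a placeholder; the key prefix and TTL are illustrative):
```typescript
// persistence.ts (sketch) — tool confirmations in Redis
import { getRedisClient } from '@/lib/redis' // hypothetical client export

const CONFIRMATION_TTL_SECONDS = 10 * 60

interface ToolConfirmation {
  toolCallId: string
  status: 'success' | 'rejected' | 'error'
  message?: string
  updatedAt: string
}

export async function setToolConfirmation(
  toolCallId: string,
  status: ToolConfirmation['status'],
  message?: string
): Promise<boolean> {
  try {
    const value: ToolConfirmation = {
      toolCallId,
      status,
      message,
      updatedAt: new Date().toISOString(),
    }
    await getRedisClient().set(
      `copilot:tool-confirmation:${toolCallId}`,
      JSON.stringify(value),
      'EX',
      CONFIRMATION_TTL_SECONDS
    )
    return true
  } catch {
    return false
  }
}

export async function getToolConfirmation(toolCallId: string): Promise<ToolConfirmation | null> {
  const raw = await getRedisClient().get(`copilot:tool-confirmation:${toolCallId}`)
  return raw ? (JSON.parse(raw) as ToolConfirmation) : null
}
```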
#### `index.ts`
Main orchestrator function:
```typescript
async function orchestrateCopilotRequest(
request: OrchestratorRequest,
options: OrchestratorOptions = {}
): Promise<OrchestratorResult> {
// 1. Prepare execution context (cache env vars, etc.)
const execContext = await prepareExecutionContext(userId, workflowId)
// 2. Handle chat creation/loading
let chatId = await resolveChat(request)
// 3. Build request payload for copilot backend
const payload = buildCopilotPayload(request)
// 4. Call copilot backend
  const response = await fetch(COPILOT_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  })
// 5. Create streaming context
const context = createStreamingContext(chatId)
// 6. Parse and handle SSE stream
for await (const event of parseSSEStream(response.body)) {
// Forward to client if attached
options.onEvent?.(event)
// Handle event
const handler = getHandler(event)
await handler(event, context, execContext, options)
if (context.streamComplete) break
}
// 7. Persist to database
await persistChat(chatId, context)
// 8. Build and return result
return buildResult(context)
}
```
---
## Implementation Plan
### Phase 1: Create Orchestrator Module (3-4 days)
**Goal**: Build the orchestrator module that can run independently.
#### Tasks
1. **Create `types.ts`** (~200 lines)
- [ ] Define SSE event types
- [ ] Define tool call state types
- [ ] Define streaming context type
- [ ] Define orchestrator request/response types
- [ ] Define execution context type
2. **Create `sse-parser.ts`** (~80 lines)
- [ ] Extract parsing logic from store.ts
- [ ] Add abort signal support
- [ ] Add error handling
3. **Create `persistence.ts`** (~120 lines)
- [ ] Extract DB operations from chat route
- [ ] Extract Redis operations from confirm route
- [ ] Add chat creation/loading
- [ ] Add message saving
4. **Create `tool-executor.ts`** (~300 lines)
- [ ] Create `executeToolServerSide()` main entry
- [ ] Create `executeServerToolDirect()` for server tools
- [ ] Create `executeIntegrationToolDirect()` for integration tools
- [ ] Create `markToolComplete()` for copilot backend notification
- [ ] Create `prepareExecutionContext()` for caching
- [ ] Handle OAuth token resolution
- [ ] Handle env var resolution
5. **Create `sse-handlers.ts`** (~350 lines)
- [ ] Extract handlers from store.ts
- [ ] Adapt for server-side context
- [ ] Add tool execution integration
- [ ] Add subagent handlers
6. **Create `index.ts`** (~250 lines)
- [ ] Create `orchestrateCopilotRequest()` main function
- [ ] Wire together all modules
- [ ] Add timeout handling
- [ ] Add abort signal support
- [ ] Add event forwarding
#### Deliverables
- Complete `lib/copilot/orchestrator/` module
- Unit tests for each component
- Integration test for full orchestration
### Phase 2: Create Headless API Route (1 day)
**Goal**: Create API endpoint for headless copilot access.
#### Tasks
1. **Create route** `app/api/v1/copilot/chat/route.ts` (~100 lines)
- [ ] Add API key authentication
- [ ] Parse and validate request
- [ ] Call orchestrator
- [ ] Return JSON response
2. **Add to API documentation**
- [ ] Document request format
- [ ] Document response format
- [ ] Document error codes
#### Deliverables
- Working `POST /api/v1/copilot/chat` endpoint
- API documentation
- E2E test
### Phase 3: Wire Interactive Route (2 days)
**Goal**: Use orchestrator for existing interactive flow.
#### Tasks
1. **Modify `/api/copilot/chat/route.ts`**
- [ ] Add feature flag for new vs old flow
- [ ] Call orchestrator with `onEvent` callback
- [ ] Forward events to client via SSE
- [ ] Maintain backward compatibility
2. **Test both flows**
- [ ] Verify interactive works with new orchestrator
- [ ] Verify old flow still works (feature flag off)
#### Deliverables
- Interactive route using orchestrator
- Feature flag for gradual rollout
- No breaking changes
### Phase 4: Simplify Client Store (2-3 days)
**Goal**: Remove orchestration logic from client, keep UI-only.
#### Tasks
1. **Create simplified store** (new file or gradual refactor)
- [ ] Keep: UI state, messages, tool display
- [ ] Keep: Simple API calls
- [ ] Keep: Event listener
- [ ] Remove: SSE parsing
- [ ] Remove: Tool execution logic
- [ ] Remove: Client tool instantiators
2. **Update components**
- [ ] Update components to use simplified store
- [ ] Remove tool execution from UI components
- [ ] Simplify tool display components
3. **Remove dead code**
- [ ] Remove unused imports
- [ ] Remove unused helper functions
- [ ] Remove client tool classes (if no longer needed)
#### Deliverables
- Simplified store (~600 lines)
- Updated components
- Reduced bundle size
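A minimal sketch of the simplified store delivered by this phase (shapes illustrative; the real store keeps the existing message and tool display types):
```typescript
// stores/panel/copilot/store.ts (sketch) — UI-only store after Phase 4
import { create } from 'zustand'

interface CopilotUIState {
  messages: Array<{ id: string; role: 'user' | 'assistant'; content: string }>
  isStreaming: boolean
  sendMessage: (message: string, workflowId: string) => Promise<void>
  applyServerEvent: (event: { type: string; data?: any }) => void
}

export const useCopilotStore = create<CopilotUIState>()((set, get) => ({
  messages: [],
  isStreaming: false,

  // One API call; all orchestration happens server-side
  sendMessage: async (message, workflowId) => {
    set({ isStreaming: true })
    const res = await fetch('/api/copilot/chat', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ message, workflowId }),
    })
    // Read res.body as SSE and hand each parsed event to applyServerEvent (reader loop elided)
    set({ isStreaming: false })
  },

  // Pure UI update from a server-forwarded event
  applyServerEvent: (event) => {
    if (event.type !== 'content') return
    const messages = [...get().messages]
    const last = messages[messages.length - 1]
    if (last?.role === 'assistant') {
      messages[messages.length - 1] = { ...last, content: last.content + (event.data ?? '') }
    } else {
      messages.push({ id: crypto.randomUUID(), role: 'assistant', content: event.data ?? '' })
    }
    set({ messages })
  },
}))
```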
### Phase 5: Testing & Polish (2-3 days)
#### Tasks
1. **E2E testing**
- [ ] Test headless API with various prompts
- [ ] Test interactive with various prompts
- [ ] Test tool execution scenarios
- [ ] Test error handling
- [ ] Test abort/timeout scenarios
2. **Performance testing**
- [ ] Compare latency (old vs new)
- [ ] Check memory usage
- [ ] Check for connection issues
3. **Documentation**
- [ ] Update developer docs
- [ ] Add architecture diagram
- [ ] Document new API
#### Deliverables
- Comprehensive test suite
- Performance benchmarks
- Complete documentation
---
## API Contracts
### Headless API
#### Request
```http
POST /api/v1/copilot/chat
Content-Type: application/json
X-API-Key: sim_xxx
{
"message": "Create a Slack notification workflow",
"workflowId": "wf_abc123",
"chatId": "chat_xyz", // Optional: continue existing chat
"mode": "agent", // Optional: "agent" | "ask" | "plan"
"model": "claude-4-sonnet", // Optional
"autoExecuteTools": true, // Optional: default true
"timeout": 300000 // Optional: default 5 minutes
}
```
#### Response (Success)
```json
{
"success": true,
"content": "I've created a Slack notification workflow that...",
"toolCalls": [
{
"id": "tc_001",
"name": "search_patterns",
"status": "success",
"params": { "query": "slack notification" },
"result": { "patterns": [...] },
"durationMs": 234
},
{
"id": "tc_002",
"name": "edit_workflow",
"status": "success",
"params": { "operations": [...] },
"result": { "blocksAdded": 3 },
"durationMs": 1523
}
],
"chatId": "chat_xyz",
"conversationId": "conv_123"
}
```
#### Response (Error)
```json
{
"success": false,
"error": "Workflow not found",
"content": "",
"toolCalls": []
}
```
#### Error Codes
| Status | Error | Description |
|--------|-------|-------------|
| 400 | Invalid request | Missing required fields |
| 401 | Unauthorized | Invalid or missing API key |
| 404 | Workflow not found | Workflow ID doesn't exist |
| 500 | Internal error | Server-side failure |
| 504 | Timeout | Request exceeded timeout |
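A usage sketch from any TypeScript/Node client (the base URL and API key are placeholders):
```typescript
// Call the headless copilot API and print the result
async function runCopilot() {
  const res = await fetch('https://your-sim-host/api/v1/copilot/chat', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-API-Key': process.env.SIM_API_KEY ?? '',
    },
    body: JSON.stringify({
      message: 'Create a Slack notification workflow',
      workflowId: 'wf_abc123',
      autoExecuteTools: true,
    }),
  })

  if (!res.ok) throw new Error(`Copilot request failed: ${res.status}`)
  const result = await res.json()
  console.log(result.content)
  console.log(result.toolCalls)
}

runCopilot().catch(console.error)
```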
### Interactive API (Existing - Modified)
The existing `/api/copilot/chat` endpoint continues to work but now uses the orchestrator internally. SSE events forwarded to the client keep their existing format, so client-side parsing does not change.
---
## Migration Strategy
### Rollout Plan
```
Week 1: Phase 1 (Orchestrator)
├── Day 1-2: Types + SSE Parser
├── Day 3: Tool Executor
└── Day 4-5: Handlers + Main Orchestrator
Week 2: Phase 2-3 (Routes)
├── Day 1: Headless API route
├── Day 2-3: Wire interactive route
└── Day 4-5: Testing both modes
Week 3: Phase 4-5 (Cleanup)
├── Day 1-3: Simplify store
├── Day 4: Testing
└── Day 5: Documentation
```
### Feature Flags
```typescript
// lib/copilot/config.ts
export const COPILOT_FLAGS = {
// Use new orchestrator for interactive mode
USE_SERVER_ORCHESTRATOR: process.env.COPILOT_USE_SERVER_ORCHESTRATOR === 'true',
// Enable headless API
ENABLE_HEADLESS_API: process.env.COPILOT_ENABLE_HEADLESS_API === 'true',
}
```
### Rollback Plan
If issues arise:
1. Set `COPILOT_USE_SERVER_ORCHESTRATOR=false`
2. Interactive mode falls back to old client-side flow
3. Headless API returns 503 Service Unavailable
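A gating sketch for step 3, using the flags above (the helper name is illustrative):
```typescript
// Returns a 503 response when the headless API is disabled, otherwise null to continue
import { NextResponse } from 'next/server'
import { COPILOT_FLAGS } from '@/lib/copilot/config'

export function guardHeadlessApi(): NextResponse | null {
  if (!COPILOT_FLAGS.ENABLE_HEADLESS_API) {
    return NextResponse.json(
      { success: false, error: 'Copilot headless API is disabled' },
      { status: 503 }
    )
  }
  return null
}
```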
---
## Testing Strategy
### Unit Tests
```
lib/copilot/orchestrator/
├── __tests__/
│ ├── sse-parser.test.ts
│ ├── sse-handlers.test.ts
│ ├── tool-executor.test.ts
│ ├── persistence.test.ts
│ └── index.test.ts
```
#### SSE Parser Tests
```typescript
describe('parseSSEStream', () => {
it('parses content events')
it('parses tool_call events')
it('handles partial lines')
it('handles malformed JSON')
it('respects abort signal')
})
```
#### Tool Executor Tests
```typescript
describe('executeToolServerSide', () => {
it('executes server tools directly')
it('executes integration tools with OAuth')
it('resolves env var references')
it('handles tool not found')
it('handles execution errors')
})
```
### Integration Tests
```typescript
describe('orchestrateCopilotRequest', () => {
it('handles simple message without tools')
it('handles message with single tool call')
it('handles message with multiple tool calls')
it('handles subagent tool calls')
it('handles stream errors')
it('respects timeout')
it('forwards events to callback')
})
```
### E2E Tests
```typescript
describe('POST /api/v1/copilot/chat', () => {
it('returns 401 without API key')
it('returns 400 with invalid request')
it('executes simple ask query')
it('executes workflow modification')
it('handles tool execution')
})
```
---
## Risks & Mitigations
### Risk 1: Breaking Interactive Mode
**Risk**: Refactoring could break existing interactive copilot.
**Mitigation**:
- Feature flag for gradual rollout
- Keep old code path available
- Extensive E2E testing
- Staged deployment (internal → beta → production)
### Risk 2: Tool Execution Differences
**Risk**: Tool behavior differs between client and server execution.
**Mitigation**:
- Reuse existing tool execution logic (same functions)
- Compare outputs in parallel testing
- Log discrepancies for investigation
### Risk 3: Performance Regression
**Risk**: Server-side orchestration could be slower.
**Mitigation**:
- Server-side orchestration should in fact be faster, since it removes browser round-trips
- Benchmark before/after
- Profile critical paths
### Risk 4: Memory Usage
**Risk**: Server accumulates state during long-running requests.
**Mitigation**:
- Set reasonable timeouts
- Clean up context after request
- Monitor memory in production
### Risk 5: Connection Issues
**Risk**: Long-running SSE connections could drop.
**Mitigation**:
- Implement reconnection logic
- Save checkpoints to resume
- Handle partial completions gracefully
---
## File Inventory
### New Files
| File | Lines | Description |
|------|-------|-------------|
| `lib/copilot/orchestrator/types.ts` | ~200 | Type definitions |
| `lib/copilot/orchestrator/sse-parser.ts` | ~80 | SSE stream parsing |
| `lib/copilot/orchestrator/sse-handlers.ts` | ~350 | Event handlers |
| `lib/copilot/orchestrator/tool-executor.ts` | ~300 | Tool execution |
| `lib/copilot/orchestrator/persistence.ts` | ~120 | DB/Redis operations |
| `lib/copilot/orchestrator/index.ts` | ~250 | Main orchestrator |
| `app/api/v1/copilot/chat/route.ts` | ~100 | Headless API |
| **Total New** | **~1,400** | |
### Modified Files
| File | Change |
|------|--------|
| `app/api/copilot/chat/route.ts` | Use orchestrator (optional) |
| `stores/panel/copilot/store.ts` | Simplify to ~600 lines |
### Deleted Code (from store.ts)
| Section | Lines Removed |
|---------|---------------|
| SSE parsing logic | ~150 |
| `sseHandlers` object | ~750 |
| `subAgentSSEHandlers` | ~280 |
| Tool execution logic | ~400 |
| Client tool instantiators | ~120 |
| Content block helpers | ~200 |
| Streaming context | ~100 |
| **Total Removed** | **~2,000** |
### Net Change
```
New code: +1,400 lines (orchestrator module)
Removed code: -2,000 lines (from store)
Modified code: ~200 lines (route changes)
───────────────────────────────────────
Net change: -400 lines (cleaner, more maintainable)
```
---
## Appendix: Code Extraction Map
### From `stores/panel/copilot/store.ts`
| Source Lines | Destination | Notes |
|--------------|-------------|-------|
| 900-1050 (parseSSEStream) | `sse-parser.ts` | Adapt for server |
| 1120-1867 (sseHandlers) | `sse-handlers.ts` | Remove Zustand deps |
| 1940-2217 (subAgentSSEHandlers) | `sse-handlers.ts` | Merge with above |
| 1365-1583 (tool execution) | `tool-executor.ts` | Direct calls |
| 330-380 (StreamingContext) | `types.ts` | Clean up |
| 3328-3648 (handleStreamingResponse) | `index.ts` | Main loop |
### From `app/api/copilot/execute-tool/route.ts`
| Source Lines | Destination | Notes |
|--------------|-------------|-------|
| 30-247 (POST handler) | `tool-executor.ts` | Extract core logic |
### From `app/api/copilot/confirm/route.ts`
| Source Lines | Destination | Notes |
|--------------|-------------|-------|
| 28-89 (updateToolCallStatus) | `persistence.ts` | Redis operations |
---
## Approval & Sign-off
- [ ] Technical review complete
- [ ] Security review complete
- [ ] Performance impact assessed
- [ ] Rollback plan approved
- [ ] Testing plan approved
---
*Document created: January 2026*
*Last updated: January 2026*