mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-05 04:05:14 -05:00
Compare commits
47 Commits
feat/copil
...
v0.5.80
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
46822e91f3 | ||
|
|
2bb68335ee | ||
|
|
8528fbe2d2 | ||
|
|
31fdd2be13 | ||
|
|
028bc652c2 | ||
|
|
c6bf5cd58c | ||
|
|
11dc18a80d | ||
|
|
ab4e9dc72f | ||
|
|
1c58c35bd8 | ||
|
|
d63a5cb504 | ||
|
|
8bd5d41723 | ||
|
|
c12931bc50 | ||
|
|
e9c4251c1c | ||
|
|
cc2be33d6b | ||
|
|
45371e521e | ||
|
|
0ce0f98aa5 | ||
|
|
dff1c9d083 | ||
|
|
b09f683072 | ||
|
|
a8bb0db660 | ||
|
|
af82820a28 | ||
|
|
4372841797 | ||
|
|
5e8c843241 | ||
|
|
7bf3d73ee6 | ||
|
|
7ffc11a738 | ||
|
|
be578e2ed7 | ||
|
|
f415e5edc4 | ||
|
|
13a6e6c3fa | ||
|
|
f5ab7f21ae | ||
|
|
bfb6fffe38 | ||
|
|
4fbec0a43f | ||
|
|
585f5e365b | ||
|
|
3792bdd252 | ||
|
|
eb5d1f3e5b | ||
|
|
54ab82c8dd | ||
|
|
f895bf469b | ||
|
|
dd3209af06 | ||
|
|
b6ba3b50a7 | ||
|
|
b304233062 | ||
|
|
57e4b49bd6 | ||
|
|
e12dd204ed | ||
|
|
3d9d9cbc54 | ||
|
|
0f4ec962ad | ||
|
|
4827866f9a | ||
|
|
3e697d9ed9 | ||
|
|
4431a1a484 | ||
|
|
4d1a9a3f22 | ||
|
|
eb07a080fb |
@@ -7,14 +7,8 @@ import { z } from 'zod'
|
|||||||
import { getSession } from '@/lib/auth'
|
import { getSession } from '@/lib/auth'
|
||||||
import { generateChatTitle } from '@/lib/copilot/chat-title'
|
import { generateChatTitle } from '@/lib/copilot/chat-title'
|
||||||
import { getCopilotModel } from '@/lib/copilot/config'
|
import { getCopilotModel } from '@/lib/copilot/config'
|
||||||
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
|
import { SIM_AGENT_API_URL_DEFAULT, SIM_AGENT_VERSION } from '@/lib/copilot/constants'
|
||||||
import { COPILOT_MODEL_IDS, COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
import { COPILOT_MODEL_IDS, COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
||||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
|
||||||
import {
|
|
||||||
createStreamEventWriter,
|
|
||||||
resetStreamBuffer,
|
|
||||||
setStreamMeta,
|
|
||||||
} from '@/lib/copilot/orchestrator/stream-buffer'
|
|
||||||
import {
|
import {
|
||||||
authenticateCopilotRequestSessionOnly,
|
authenticateCopilotRequestSessionOnly,
|
||||||
createBadRequestResponse,
|
createBadRequestResponse,
|
||||||
@@ -27,12 +21,13 @@ import type { CopilotProviderConfig } from '@/lib/copilot/types'
|
|||||||
import { env } from '@/lib/core/config/env'
|
import { env } from '@/lib/core/config/env'
|
||||||
import { CopilotFiles } from '@/lib/uploads'
|
import { CopilotFiles } from '@/lib/uploads'
|
||||||
import { createFileContent } from '@/lib/uploads/utils/file-utils'
|
import { createFileContent } from '@/lib/uploads/utils/file-utils'
|
||||||
import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
|
|
||||||
import { tools } from '@/tools/registry'
|
import { tools } from '@/tools/registry'
|
||||||
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
|
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
|
||||||
|
|
||||||
const logger = createLogger('CopilotChatAPI')
|
const logger = createLogger('CopilotChatAPI')
|
||||||
|
|
||||||
|
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
|
||||||
|
|
||||||
const FileAttachmentSchema = z.object({
|
const FileAttachmentSchema = z.object({
|
||||||
id: z.string(),
|
id: z.string(),
|
||||||
key: z.string(),
|
key: z.string(),
|
||||||
@@ -45,8 +40,7 @@ const ChatMessageSchema = z.object({
|
|||||||
message: z.string().min(1, 'Message is required'),
|
message: z.string().min(1, 'Message is required'),
|
||||||
userMessageId: z.string().optional(), // ID from frontend for the user message
|
userMessageId: z.string().optional(), // ID from frontend for the user message
|
||||||
chatId: z.string().optional(),
|
chatId: z.string().optional(),
|
||||||
workflowId: z.string().optional(),
|
workflowId: z.string().min(1, 'Workflow ID is required'),
|
||||||
workflowName: z.string().optional(),
|
|
||||||
model: z.enum(COPILOT_MODEL_IDS).optional().default('claude-4.5-opus'),
|
model: z.enum(COPILOT_MODEL_IDS).optional().default('claude-4.5-opus'),
|
||||||
mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'),
|
mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'),
|
||||||
prefetch: z.boolean().optional(),
|
prefetch: z.boolean().optional(),
|
||||||
@@ -106,8 +100,7 @@ export async function POST(req: NextRequest) {
|
|||||||
message,
|
message,
|
||||||
userMessageId,
|
userMessageId,
|
||||||
chatId,
|
chatId,
|
||||||
workflowId: providedWorkflowId,
|
workflowId,
|
||||||
workflowName,
|
|
||||||
model,
|
model,
|
||||||
mode,
|
mode,
|
||||||
prefetch,
|
prefetch,
|
||||||
@@ -120,20 +113,6 @@ export async function POST(req: NextRequest) {
|
|||||||
contexts,
|
contexts,
|
||||||
commands,
|
commands,
|
||||||
} = ChatMessageSchema.parse(body)
|
} = ChatMessageSchema.parse(body)
|
||||||
|
|
||||||
// Resolve workflowId - if not provided, use first workflow or find by name
|
|
||||||
const resolved = await resolveWorkflowIdForUser(
|
|
||||||
authenticatedUserId,
|
|
||||||
providedWorkflowId,
|
|
||||||
workflowName
|
|
||||||
)
|
|
||||||
if (!resolved) {
|
|
||||||
return createBadRequestResponse(
|
|
||||||
'No workflows found. Create a workflow first or provide a valid workflowId.'
|
|
||||||
)
|
|
||||||
}
|
|
||||||
const workflowId = resolved.workflowId
|
|
||||||
|
|
||||||
// Ensure we have a consistent user message ID for this request
|
// Ensure we have a consistent user message ID for this request
|
||||||
const userMessageIdToUse = userMessageId || crypto.randomUUID()
|
const userMessageIdToUse = userMessageId || crypto.randomUUID()
|
||||||
try {
|
try {
|
||||||
@@ -486,53 +465,77 @@ export async function POST(req: NextRequest) {
|
|||||||
})
|
})
|
||||||
} catch {}
|
} catch {}
|
||||||
|
|
||||||
if (stream) {
|
const simAgentResponse = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
|
||||||
const streamId = userMessageIdToUse
|
method: 'POST',
|
||||||
let eventWriter: ReturnType<typeof createStreamEventWriter> | null = null
|
headers: {
|
||||||
let clientDisconnected = false
|
'Content-Type': 'application/json',
|
||||||
|
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
|
||||||
|
},
|
||||||
|
body: JSON.stringify(requestPayload),
|
||||||
|
})
|
||||||
|
|
||||||
|
if (!simAgentResponse.ok) {
|
||||||
|
if (simAgentResponse.status === 401 || simAgentResponse.status === 402) {
|
||||||
|
// Rethrow status only; client will render appropriate assistant message
|
||||||
|
return new NextResponse(null, { status: simAgentResponse.status })
|
||||||
|
}
|
||||||
|
|
||||||
|
const errorText = await simAgentResponse.text().catch(() => '')
|
||||||
|
logger.error(`[${tracker.requestId}] Sim agent API error:`, {
|
||||||
|
status: simAgentResponse.status,
|
||||||
|
error: errorText,
|
||||||
|
})
|
||||||
|
|
||||||
|
return NextResponse.json(
|
||||||
|
{ error: `Sim agent API error: ${simAgentResponse.statusText}` },
|
||||||
|
{ status: simAgentResponse.status }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If streaming is requested, forward the stream and update chat later
|
||||||
|
if (stream && simAgentResponse.body) {
|
||||||
|
// Create user message to save
|
||||||
|
const userMessage = {
|
||||||
|
id: userMessageIdToUse, // Consistent ID used for request and persistence
|
||||||
|
role: 'user',
|
||||||
|
content: message,
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
|
||||||
|
...(Array.isArray(contexts) && contexts.length > 0 && { contexts }),
|
||||||
|
...(Array.isArray(contexts) &&
|
||||||
|
contexts.length > 0 && {
|
||||||
|
contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }],
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a pass-through stream that captures the response
|
||||||
const transformedStream = new ReadableStream({
|
const transformedStream = new ReadableStream({
|
||||||
async start(controller) {
|
async start(controller) {
|
||||||
const encoder = new TextEncoder()
|
const encoder = new TextEncoder()
|
||||||
|
let assistantContent = ''
|
||||||
|
const toolCalls: any[] = []
|
||||||
|
let buffer = ''
|
||||||
|
const isFirstDone = true
|
||||||
|
let responseIdFromStart: string | undefined
|
||||||
|
let responseIdFromDone: string | undefined
|
||||||
|
// Track tool call progress to identify a safe done event
|
||||||
|
const announcedToolCallIds = new Set<string>()
|
||||||
|
const startedToolExecutionIds = new Set<string>()
|
||||||
|
const completedToolExecutionIds = new Set<string>()
|
||||||
|
let lastDoneResponseId: string | undefined
|
||||||
|
let lastSafeDoneResponseId: string | undefined
|
||||||
|
|
||||||
await resetStreamBuffer(streamId)
|
// Send chatId as first event
|
||||||
await setStreamMeta(streamId, { status: 'active', userId: authenticatedUserId })
|
|
||||||
eventWriter = createStreamEventWriter(streamId)
|
|
||||||
|
|
||||||
const shouldFlushEvent = (event: Record<string, any>) =>
|
|
||||||
event.type === 'tool_call' ||
|
|
||||||
event.type === 'tool_result' ||
|
|
||||||
event.type === 'tool_error' ||
|
|
||||||
event.type === 'subagent_end' ||
|
|
||||||
event.type === 'structured_result' ||
|
|
||||||
event.type === 'subagent_result' ||
|
|
||||||
event.type === 'done' ||
|
|
||||||
event.type === 'error'
|
|
||||||
|
|
||||||
const pushEvent = async (event: Record<string, any>) => {
|
|
||||||
if (!eventWriter) return
|
|
||||||
const entry = await eventWriter.write(event)
|
|
||||||
if (shouldFlushEvent(event)) {
|
|
||||||
await eventWriter.flush()
|
|
||||||
}
|
|
||||||
const payload = {
|
|
||||||
...event,
|
|
||||||
eventId: entry.eventId,
|
|
||||||
streamId,
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
if (!clientDisconnected) {
|
|
||||||
controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
clientDisconnected = true
|
|
||||||
await eventWriter.flush()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (actualChatId) {
|
if (actualChatId) {
|
||||||
await pushEvent({ type: 'chat_id', chatId: actualChatId })
|
const chatIdEvent = `data: ${JSON.stringify({
|
||||||
|
type: 'chat_id',
|
||||||
|
chatId: actualChatId,
|
||||||
|
})}\n\n`
|
||||||
|
controller.enqueue(encoder.encode(chatIdEvent))
|
||||||
|
logger.debug(`[${tracker.requestId}] Sent initial chatId event to client`)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start title generation in parallel if needed
|
||||||
if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
|
if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
|
||||||
generateChatTitle(message)
|
generateChatTitle(message)
|
||||||
.then(async (title) => {
|
.then(async (title) => {
|
||||||
@@ -544,64 +547,311 @@ export async function POST(req: NextRequest) {
|
|||||||
updatedAt: new Date(),
|
updatedAt: new Date(),
|
||||||
})
|
})
|
||||||
.where(eq(copilotChats.id, actualChatId!))
|
.where(eq(copilotChats.id, actualChatId!))
|
||||||
await pushEvent({ type: 'title_updated', title })
|
|
||||||
|
const titleEvent = `data: ${JSON.stringify({
|
||||||
|
type: 'title_updated',
|
||||||
|
title: title,
|
||||||
|
})}\n\n`
|
||||||
|
controller.enqueue(encoder.encode(titleEvent))
|
||||||
|
logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.catch((error) => {
|
.catch((error) => {
|
||||||
logger.error(`[${tracker.requestId}] Title generation failed:`, error)
|
logger.error(`[${tracker.requestId}] Title generation failed:`, error)
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
logger.debug(`[${tracker.requestId}] Skipping title generation`)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Forward the sim agent stream and capture assistant response
|
||||||
|
const reader = simAgentResponse.body!.getReader()
|
||||||
|
const decoder = new TextDecoder()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await orchestrateCopilotStream(requestPayload, {
|
while (true) {
|
||||||
userId: authenticatedUserId,
|
const { done, value } = await reader.read()
|
||||||
workflowId,
|
if (done) {
|
||||||
chatId: actualChatId,
|
break
|
||||||
autoExecuteTools: true,
|
}
|
||||||
interactive: true,
|
|
||||||
onEvent: async (event) => {
|
// Decode and parse SSE events for logging and capturing content
|
||||||
await pushEvent(event)
|
const decodedChunk = decoder.decode(value, { stream: true })
|
||||||
},
|
buffer += decodedChunk
|
||||||
|
|
||||||
|
const lines = buffer.split('\n')
|
||||||
|
buffer = lines.pop() || '' // Keep incomplete line in buffer
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
if (line.trim() === '') continue // Skip empty lines
|
||||||
|
|
||||||
|
if (line.startsWith('data: ') && line.length > 6) {
|
||||||
|
try {
|
||||||
|
const jsonStr = line.slice(6)
|
||||||
|
|
||||||
|
// Check if the JSON string is unusually large (potential streaming issue)
|
||||||
|
if (jsonStr.length > 50000) {
|
||||||
|
// 50KB limit
|
||||||
|
logger.warn(`[${tracker.requestId}] Large SSE event detected`, {
|
||||||
|
size: jsonStr.length,
|
||||||
|
preview: `${jsonStr.substring(0, 100)}...`,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
const event = JSON.parse(jsonStr)
|
||||||
|
|
||||||
|
// Log different event types comprehensively
|
||||||
|
switch (event.type) {
|
||||||
|
case 'content':
|
||||||
|
if (event.data) {
|
||||||
|
assistantContent += event.data
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'reasoning':
|
||||||
|
logger.debug(
|
||||||
|
`[${tracker.requestId}] Reasoning chunk received (${(event.data || event.content || '').length} chars)`
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'tool_call':
|
||||||
|
if (!event.data?.partial) {
|
||||||
|
toolCalls.push(event.data)
|
||||||
|
if (event.data?.id) {
|
||||||
|
announcedToolCallIds.add(event.data.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'tool_generating':
|
||||||
|
if (event.toolCallId) {
|
||||||
|
startedToolExecutionIds.add(event.toolCallId)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'tool_result':
|
||||||
|
if (event.toolCallId) {
|
||||||
|
completedToolExecutionIds.add(event.toolCallId)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'tool_error':
|
||||||
|
logger.error(`[${tracker.requestId}] Tool error:`, {
|
||||||
|
toolCallId: event.toolCallId,
|
||||||
|
toolName: event.toolName,
|
||||||
|
error: event.error,
|
||||||
|
success: event.success,
|
||||||
|
})
|
||||||
|
if (event.toolCallId) {
|
||||||
|
completedToolExecutionIds.add(event.toolCallId)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'start':
|
||||||
|
if (event.data?.responseId) {
|
||||||
|
responseIdFromStart = event.data.responseId
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'done':
|
||||||
|
if (event.data?.responseId) {
|
||||||
|
responseIdFromDone = event.data.responseId
|
||||||
|
lastDoneResponseId = responseIdFromDone
|
||||||
|
|
||||||
|
// Mark this done as safe only if no tool call is currently in progress or pending
|
||||||
|
const announced = announcedToolCallIds.size
|
||||||
|
const completed = completedToolExecutionIds.size
|
||||||
|
const started = startedToolExecutionIds.size
|
||||||
|
const hasToolInProgress = announced > completed || started > completed
|
||||||
|
if (!hasToolInProgress) {
|
||||||
|
lastSafeDoneResponseId = responseIdFromDone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'error':
|
||||||
|
break
|
||||||
|
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit to client: rewrite 'error' events into user-friendly assistant message
|
||||||
|
if (event?.type === 'error') {
|
||||||
|
try {
|
||||||
|
const displayMessage: string =
|
||||||
|
(event?.data && (event.data.displayMessage as string)) ||
|
||||||
|
'Sorry, I encountered an error. Please try again.'
|
||||||
|
const formatted = `_${displayMessage}_`
|
||||||
|
// Accumulate so it persists to DB as assistant content
|
||||||
|
assistantContent += formatted
|
||||||
|
// Send as content chunk
|
||||||
|
try {
|
||||||
|
controller.enqueue(
|
||||||
|
encoder.encode(
|
||||||
|
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
|
||||||
|
)
|
||||||
|
)
|
||||||
|
} catch (enqueueErr) {
|
||||||
|
reader.cancel()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Then close this response cleanly for the client
|
||||||
|
try {
|
||||||
|
controller.enqueue(
|
||||||
|
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
|
||||||
|
)
|
||||||
|
} catch (enqueueErr) {
|
||||||
|
reader.cancel()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
} catch {}
|
||||||
|
// Do not forward the original error event
|
||||||
|
} else {
|
||||||
|
// Forward original event to client
|
||||||
|
try {
|
||||||
|
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
|
||||||
|
} catch (enqueueErr) {
|
||||||
|
reader.cancel()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
// Enhanced error handling for large payloads and parsing issues
|
||||||
|
const lineLength = line.length
|
||||||
|
const isLargePayload = lineLength > 10000
|
||||||
|
|
||||||
|
if (isLargePayload) {
|
||||||
|
logger.error(
|
||||||
|
`[${tracker.requestId}] Failed to parse large SSE event (${lineLength} chars)`,
|
||||||
|
{
|
||||||
|
error: e,
|
||||||
|
preview: `${line.substring(0, 200)}...`,
|
||||||
|
size: lineLength,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
logger.warn(
|
||||||
|
`[${tracker.requestId}] Failed to parse SSE event: "${line.substring(0, 200)}..."`,
|
||||||
|
e
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (line.trim() && line !== 'data: [DONE]') {
|
||||||
|
logger.debug(`[${tracker.requestId}] Non-SSE line from sim agent: "${line}"`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process any remaining buffer
|
||||||
|
if (buffer.trim()) {
|
||||||
|
logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`)
|
||||||
|
if (buffer.startsWith('data: ')) {
|
||||||
|
try {
|
||||||
|
const jsonStr = buffer.slice(6)
|
||||||
|
const event = JSON.parse(jsonStr)
|
||||||
|
if (event.type === 'content' && event.data) {
|
||||||
|
assistantContent += event.data
|
||||||
|
}
|
||||||
|
// Forward remaining event, applying same error rewrite behavior
|
||||||
|
if (event?.type === 'error') {
|
||||||
|
const displayMessage: string =
|
||||||
|
(event?.data && (event.data.displayMessage as string)) ||
|
||||||
|
'Sorry, I encountered an error. Please try again.'
|
||||||
|
const formatted = `_${displayMessage}_`
|
||||||
|
assistantContent += formatted
|
||||||
|
try {
|
||||||
|
controller.enqueue(
|
||||||
|
encoder.encode(
|
||||||
|
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
|
||||||
|
)
|
||||||
|
)
|
||||||
|
controller.enqueue(
|
||||||
|
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
|
||||||
|
)
|
||||||
|
} catch (enqueueErr) {
|
||||||
|
reader.cancel()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
|
||||||
|
} catch (enqueueErr) {
|
||||||
|
reader.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log final streaming summary
|
||||||
|
logger.info(`[${tracker.requestId}] Streaming complete summary:`, {
|
||||||
|
totalContentLength: assistantContent.length,
|
||||||
|
toolCallsCount: toolCalls.length,
|
||||||
|
hasContent: assistantContent.length > 0,
|
||||||
|
toolNames: toolCalls.map((tc) => tc?.name).filter(Boolean),
|
||||||
})
|
})
|
||||||
|
|
||||||
if (currentChat && result.conversationId) {
|
// NOTE: Messages are saved by the client via update-messages endpoint with full contentBlocks.
|
||||||
await db
|
// Server only updates conversationId here to avoid overwriting client's richer save.
|
||||||
.update(copilotChats)
|
if (currentChat) {
|
||||||
.set({
|
// Persist only a safe conversationId to avoid continuing from a state that expects tool outputs
|
||||||
updatedAt: new Date(),
|
const previousConversationId = currentChat?.conversationId as string | undefined
|
||||||
conversationId: result.conversationId,
|
const responseId = lastSafeDoneResponseId || previousConversationId || undefined
|
||||||
})
|
|
||||||
.where(eq(copilotChats.id, actualChatId!))
|
if (responseId) {
|
||||||
|
await db
|
||||||
|
.update(copilotChats)
|
||||||
|
.set({
|
||||||
|
updatedAt: new Date(),
|
||||||
|
conversationId: responseId,
|
||||||
|
})
|
||||||
|
.where(eq(copilotChats.id, actualChatId!))
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
`[${tracker.requestId}] Updated conversationId for chat ${actualChatId}`,
|
||||||
|
{
|
||||||
|
updatedConversationId: responseId,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
await eventWriter.close()
|
|
||||||
await setStreamMeta(streamId, { status: 'complete', userId: authenticatedUserId })
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(`[${tracker.requestId}] Orchestration error:`, error)
|
logger.error(`[${tracker.requestId}] Error processing stream:`, error)
|
||||||
await eventWriter.close()
|
|
||||||
await setStreamMeta(streamId, {
|
// Send an error event to the client before closing so it knows what happened
|
||||||
status: 'error',
|
try {
|
||||||
userId: authenticatedUserId,
|
const errorMessage =
|
||||||
error: error instanceof Error ? error.message : 'Stream error',
|
error instanceof Error && error.message === 'terminated'
|
||||||
})
|
? 'Connection to AI service was interrupted. Please try again.'
|
||||||
await pushEvent({
|
: 'An unexpected error occurred while processing the response.'
|
||||||
type: 'error',
|
const encoder = new TextEncoder()
|
||||||
data: {
|
|
||||||
displayMessage: 'An unexpected error occurred while processing the response.',
|
// Send error as content so it shows in the chat
|
||||||
},
|
controller.enqueue(
|
||||||
})
|
encoder.encode(
|
||||||
|
`data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n`
|
||||||
|
)
|
||||||
|
)
|
||||||
|
// Send done event to properly close the stream on client
|
||||||
|
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`))
|
||||||
|
} catch (enqueueError) {
|
||||||
|
// Stream might already be closed, that's ok
|
||||||
|
logger.warn(
|
||||||
|
`[${tracker.requestId}] Could not send error event to client:`,
|
||||||
|
enqueueError
|
||||||
|
)
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
controller.close()
|
try {
|
||||||
}
|
controller.close()
|
||||||
},
|
} catch {
|
||||||
async cancel() {
|
// Controller might already be closed
|
||||||
clientDisconnected = true
|
}
|
||||||
if (eventWriter) {
|
|
||||||
await eventWriter.flush()
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
return new Response(transformedStream, {
|
const response = new Response(transformedStream, {
|
||||||
headers: {
|
headers: {
|
||||||
'Content-Type': 'text/event-stream',
|
'Content-Type': 'text/event-stream',
|
||||||
'Cache-Control': 'no-cache',
|
'Cache-Control': 'no-cache',
|
||||||
@@ -609,31 +859,43 @@ export async function POST(req: NextRequest) {
|
|||||||
'X-Accel-Buffering': 'no',
|
'X-Accel-Buffering': 'no',
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
logger.info(`[${tracker.requestId}] Returning streaming response to client`, {
|
||||||
|
duration: tracker.getDuration(),
|
||||||
|
chatId: actualChatId,
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'text/event-stream',
|
||||||
|
'Cache-Control': 'no-cache',
|
||||||
|
Connection: 'keep-alive',
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
return response
|
||||||
}
|
}
|
||||||
|
|
||||||
const nonStreamingResult = await orchestrateCopilotStream(requestPayload, {
|
// For non-streaming responses
|
||||||
userId: authenticatedUserId,
|
const responseData = await simAgentResponse.json()
|
||||||
workflowId,
|
logger.info(`[${tracker.requestId}] Non-streaming response from sim agent:`, {
|
||||||
chatId: actualChatId,
|
|
||||||
autoExecuteTools: true,
|
|
||||||
interactive: true,
|
|
||||||
})
|
|
||||||
|
|
||||||
const responseData = {
|
|
||||||
content: nonStreamingResult.content,
|
|
||||||
toolCalls: nonStreamingResult.toolCalls,
|
|
||||||
model: selectedModel,
|
|
||||||
provider: providerConfig?.provider || env.COPILOT_PROVIDER || 'openai',
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, {
|
|
||||||
hasContent: !!responseData.content,
|
hasContent: !!responseData.content,
|
||||||
contentLength: responseData.content?.length || 0,
|
contentLength: responseData.content?.length || 0,
|
||||||
model: responseData.model,
|
model: responseData.model,
|
||||||
provider: responseData.provider,
|
provider: responseData.provider,
|
||||||
toolCallsCount: responseData.toolCalls?.length || 0,
|
toolCallsCount: responseData.toolCalls?.length || 0,
|
||||||
|
hasTokens: !!responseData.tokens,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Log tool calls if present
|
||||||
|
if (responseData.toolCalls?.length > 0) {
|
||||||
|
responseData.toolCalls.forEach((toolCall: any) => {
|
||||||
|
logger.info(`[${tracker.requestId}] Tool call in response:`, {
|
||||||
|
id: toolCall.id,
|
||||||
|
name: toolCall.name,
|
||||||
|
success: toolCall.success,
|
||||||
|
result: `${JSON.stringify(toolCall.result).substring(0, 200)}...`,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Save messages if we have a chat
|
// Save messages if we have a chat
|
||||||
if (currentChat && responseData.content) {
|
if (currentChat && responseData.content) {
|
||||||
const userMessage = {
|
const userMessage = {
|
||||||
@@ -685,9 +947,6 @@ export async function POST(req: NextRequest) {
|
|||||||
.set({
|
.set({
|
||||||
messages: updatedMessages,
|
messages: updatedMessages,
|
||||||
updatedAt: new Date(),
|
updatedAt: new Date(),
|
||||||
...(nonStreamingResult.conversationId
|
|
||||||
? { conversationId: nonStreamingResult.conversationId }
|
|
||||||
: {}),
|
|
||||||
})
|
})
|
||||||
.where(eq(copilotChats.id, actualChatId!))
|
.where(eq(copilotChats.id, actualChatId!))
|
||||||
}
|
}
|
||||||
@@ -739,7 +998,10 @@ export async function GET(req: NextRequest) {
|
|||||||
try {
|
try {
|
||||||
const { searchParams } = new URL(req.url)
|
const { searchParams } = new URL(req.url)
|
||||||
const workflowId = searchParams.get('workflowId')
|
const workflowId = searchParams.get('workflowId')
|
||||||
const chatId = searchParams.get('chatId')
|
|
||||||
|
if (!workflowId) {
|
||||||
|
return createBadRequestResponse('workflowId is required')
|
||||||
|
}
|
||||||
|
|
||||||
// Get authenticated user using consolidated helper
|
// Get authenticated user using consolidated helper
|
||||||
const { userId: authenticatedUserId, isAuthenticated } =
|
const { userId: authenticatedUserId, isAuthenticated } =
|
||||||
@@ -748,47 +1010,6 @@ export async function GET(req: NextRequest) {
|
|||||||
return createUnauthorizedResponse()
|
return createUnauthorizedResponse()
|
||||||
}
|
}
|
||||||
|
|
||||||
// If chatId is provided, fetch a single chat
|
|
||||||
if (chatId) {
|
|
||||||
const [chat] = await db
|
|
||||||
.select({
|
|
||||||
id: copilotChats.id,
|
|
||||||
title: copilotChats.title,
|
|
||||||
model: copilotChats.model,
|
|
||||||
messages: copilotChats.messages,
|
|
||||||
planArtifact: copilotChats.planArtifact,
|
|
||||||
config: copilotChats.config,
|
|
||||||
createdAt: copilotChats.createdAt,
|
|
||||||
updatedAt: copilotChats.updatedAt,
|
|
||||||
})
|
|
||||||
.from(copilotChats)
|
|
||||||
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, authenticatedUserId)))
|
|
||||||
.limit(1)
|
|
||||||
|
|
||||||
if (!chat) {
|
|
||||||
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
|
|
||||||
}
|
|
||||||
|
|
||||||
const transformedChat = {
|
|
||||||
id: chat.id,
|
|
||||||
title: chat.title,
|
|
||||||
model: chat.model,
|
|
||||||
messages: Array.isArray(chat.messages) ? chat.messages : [],
|
|
||||||
messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0,
|
|
||||||
planArtifact: chat.planArtifact || null,
|
|
||||||
config: chat.config || null,
|
|
||||||
createdAt: chat.createdAt,
|
|
||||||
updatedAt: chat.updatedAt,
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(`Retrieved chat ${chatId}`)
|
|
||||||
return NextResponse.json({ success: true, chat: transformedChat })
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!workflowId) {
|
|
||||||
return createBadRequestResponse('workflowId or chatId is required')
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch chats for this user and workflow
|
// Fetch chats for this user and workflow
|
||||||
const chats = await db
|
const chats = await db
|
||||||
.select({
|
.select({
|
||||||
|
|||||||
@@ -1,130 +0,0 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { type NextRequest, NextResponse } from 'next/server'
|
|
||||||
import {
|
|
||||||
getStreamMeta,
|
|
||||||
readStreamEvents,
|
|
||||||
type StreamMeta,
|
|
||||||
} from '@/lib/copilot/orchestrator/stream-buffer'
|
|
||||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
|
|
||||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotChatStreamAPI')
|
|
||||||
const POLL_INTERVAL_MS = 250
|
|
||||||
const MAX_STREAM_MS = 10 * 60 * 1000
|
|
||||||
|
|
||||||
function encodeEvent(event: Record<string, any>): Uint8Array {
|
|
||||||
return new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`)
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function GET(request: NextRequest) {
|
|
||||||
const { userId: authenticatedUserId, isAuthenticated } =
|
|
||||||
await authenticateCopilotRequestSessionOnly()
|
|
||||||
|
|
||||||
if (!isAuthenticated || !authenticatedUserId) {
|
|
||||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
|
||||||
}
|
|
||||||
|
|
||||||
const url = new URL(request.url)
|
|
||||||
const streamId = url.searchParams.get('streamId') || ''
|
|
||||||
const fromParam = url.searchParams.get('from') || '0'
|
|
||||||
const fromEventId = Number(fromParam || 0)
|
|
||||||
// If batch=true, return buffered events as JSON instead of SSE
|
|
||||||
const batchMode = url.searchParams.get('batch') === 'true'
|
|
||||||
const toParam = url.searchParams.get('to')
|
|
||||||
const toEventId = toParam ? Number(toParam) : undefined
|
|
||||||
|
|
||||||
if (!streamId) {
|
|
||||||
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
|
|
||||||
}
|
|
||||||
|
|
||||||
const meta = (await getStreamMeta(streamId)) as StreamMeta | null
|
|
||||||
logger.info('[Resume] Stream lookup', {
|
|
||||||
streamId,
|
|
||||||
fromEventId,
|
|
||||||
toEventId,
|
|
||||||
batchMode,
|
|
||||||
hasMeta: !!meta,
|
|
||||||
metaStatus: meta?.status,
|
|
||||||
})
|
|
||||||
if (!meta) {
|
|
||||||
return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
|
|
||||||
}
|
|
||||||
if (meta.userId && meta.userId !== authenticatedUserId) {
|
|
||||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
|
|
||||||
}
|
|
||||||
|
|
||||||
// Batch mode: return all buffered events as JSON
|
|
||||||
if (batchMode) {
|
|
||||||
const events = await readStreamEvents(streamId, fromEventId)
|
|
||||||
const filteredEvents = toEventId ? events.filter((e) => e.eventId <= toEventId) : events
|
|
||||||
logger.info('[Resume] Batch response', {
|
|
||||||
streamId,
|
|
||||||
fromEventId,
|
|
||||||
toEventId,
|
|
||||||
eventCount: filteredEvents.length,
|
|
||||||
})
|
|
||||||
return NextResponse.json({
|
|
||||||
success: true,
|
|
||||||
events: filteredEvents,
|
|
||||||
status: meta.status,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
const startTime = Date.now()
|
|
||||||
|
|
||||||
const stream = new ReadableStream({
|
|
||||||
async start(controller) {
|
|
||||||
let lastEventId = Number.isFinite(fromEventId) ? fromEventId : 0
|
|
||||||
|
|
||||||
const flushEvents = async () => {
|
|
||||||
const events = await readStreamEvents(streamId, lastEventId)
|
|
||||||
if (events.length > 0) {
|
|
||||||
logger.info('[Resume] Flushing events', {
|
|
||||||
streamId,
|
|
||||||
fromEventId: lastEventId,
|
|
||||||
eventCount: events.length,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
for (const entry of events) {
|
|
||||||
lastEventId = entry.eventId
|
|
||||||
const payload = {
|
|
||||||
...entry.event,
|
|
||||||
eventId: entry.eventId,
|
|
||||||
streamId: entry.streamId,
|
|
||||||
}
|
|
||||||
controller.enqueue(encodeEvent(payload))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await flushEvents()
|
|
||||||
|
|
||||||
while (Date.now() - startTime < MAX_STREAM_MS) {
|
|
||||||
const currentMeta = await getStreamMeta(streamId)
|
|
||||||
if (!currentMeta) break
|
|
||||||
|
|
||||||
await flushEvents()
|
|
||||||
|
|
||||||
if (currentMeta.status === 'complete' || currentMeta.status === 'error') {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if (request.signal.aborted) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Stream replay failed', {
|
|
||||||
streamId,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
} finally {
|
|
||||||
controller.close()
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return new Response(stream, { headers: SSE_HEADERS })
|
|
||||||
}
|
|
||||||
@@ -1,413 +0,0 @@
|
|||||||
import {
|
|
||||||
type CallToolResult,
|
|
||||||
ErrorCode,
|
|
||||||
type InitializeResult,
|
|
||||||
isJSONRPCNotification,
|
|
||||||
isJSONRPCRequest,
|
|
||||||
type JSONRPCError,
|
|
||||||
type JSONRPCMessage,
|
|
||||||
type JSONRPCResponse,
|
|
||||||
type ListToolsResult,
|
|
||||||
type RequestId,
|
|
||||||
} from '@modelcontextprotocol/sdk/types.js'
|
|
||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { type NextRequest, NextResponse } from 'next/server'
|
|
||||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
|
||||||
import { getCopilotModel } from '@/lib/copilot/config'
|
|
||||||
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
|
|
||||||
import {
|
|
||||||
executeToolServerSide,
|
|
||||||
prepareExecutionContext,
|
|
||||||
} from '@/lib/copilot/orchestrator/tool-executor'
|
|
||||||
import { DIRECT_TOOL_DEFS, SUBAGENT_TOOL_DEFS } from '@/lib/copilot/tools/mcp/definitions'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotMcpAPI')
|
|
||||||
|
|
||||||
export const dynamic = 'force-dynamic'
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MCP Server instructions that guide LLMs on how to use the Sim copilot tools.
|
|
||||||
* This is included in the initialize response to help external LLMs understand
|
|
||||||
* the workflow lifecycle and best practices.
|
|
||||||
*/
|
|
||||||
const MCP_SERVER_INSTRUCTIONS = `
|
|
||||||
## Sim Workflow Copilot - Usage Guide
|
|
||||||
|
|
||||||
You are interacting with Sim's workflow automation platform. These tools orchestrate specialized AI agents that build workflows. Follow these guidelines carefully.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Platform Knowledge
|
|
||||||
|
|
||||||
Sim is a workflow automation platform. Workflows are visual pipelines of blocks.
|
|
||||||
|
|
||||||
### Block Types
|
|
||||||
|
|
||||||
**Core Logic:**
|
|
||||||
- **Agent** - The heart of Sim (LLM block with tools, memory, structured output, knowledge bases)
|
|
||||||
- **Function** - JavaScript code execution
|
|
||||||
- **Condition** - If/else branching
|
|
||||||
- **Router** - AI-powered content-based routing
|
|
||||||
- **Loop** - While/do-while iteration
|
|
||||||
- **Parallel** - Simultaneous execution
|
|
||||||
- **API** - HTTP requests
|
|
||||||
|
|
||||||
**Integrations (3rd Party):**
|
|
||||||
- OAuth: Slack, Gmail, Google Calendar, Sheets, Outlook, Linear, GitHub, Notion
|
|
||||||
- API: Stripe, Twilio, SendGrid, any REST API
|
|
||||||
|
|
||||||
### The Agent Block
|
|
||||||
|
|
||||||
The Agent block is the core of intelligent workflows:
|
|
||||||
- **Tools** - Add integrations, custom tools, web search to give it capabilities
|
|
||||||
- **Memory** - Multi-turn conversations with persistent context
|
|
||||||
- **Structured Output** - JSON schema for reliable parsing
|
|
||||||
- **Knowledge Bases** - RAG-powered document retrieval
|
|
||||||
|
|
||||||
**Design principle:** Put tools INSIDE agents rather than using standalone tool blocks.
|
|
||||||
|
|
||||||
### Triggers
|
|
||||||
|
|
||||||
| Type | Description |
|
|
||||||
|------|-------------|
|
|
||||||
| Manual/Chat | User sends message in UI (start block: input, files, conversationId) |
|
|
||||||
| API | REST endpoint with custom input schema |
|
|
||||||
| Webhook | External services POST to trigger URL |
|
|
||||||
| Schedule | Cron-based (hourly, daily, weekly) |
|
|
||||||
|
|
||||||
### Deployments
|
|
||||||
|
|
||||||
| Type | Trigger | Use Case |
|
|
||||||
|------|---------|----------|
|
|
||||||
| API | Start block | REST endpoint for programmatic access |
|
|
||||||
| Chat | Start block | Managed chat UI with auth options |
|
|
||||||
| MCP | Start block | Expose as MCP tool for AI agents |
|
|
||||||
| General | Schedule/Webhook | Activate triggers to run automatically |
|
|
||||||
|
|
||||||
**Undeployed workflows only run in the builder UI.**
|
|
||||||
|
|
||||||
### Variable Syntax
|
|
||||||
|
|
||||||
Reference outputs from previous blocks: \`<blockname.field>\`
|
|
||||||
Reference environment variables: \`{{ENV_VAR_NAME}}\`
|
|
||||||
|
|
||||||
Rules:
|
|
||||||
- Block names must be lowercase, no spaces, no special characters
|
|
||||||
- Use dot notation for nested fields: \`<blockname.field.subfield>\`
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Workflow Lifecycle
|
|
||||||
|
|
||||||
1. **Create**: For NEW workflows, FIRST call create_workflow to get a workflowId
|
|
||||||
2. **Plan**: Use copilot_plan with the workflowId to plan the workflow
|
|
||||||
3. **Edit**: Use copilot_edit with the workflowId AND the plan to build the workflow
|
|
||||||
4. **Deploy**: ALWAYS deploy after building using copilot_deploy before testing/running
|
|
||||||
5. **Test**: Use copilot_test to verify the workflow works correctly
|
|
||||||
6. **Share**: Provide the user with the workflow URL after completion
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## CRITICAL: Always Pass workflowId
|
|
||||||
|
|
||||||
- For NEW workflows: Call create_workflow FIRST, then use the returned workflowId
|
|
||||||
- For EXISTING workflows: Pass the workflowId to all copilot tools
|
|
||||||
- copilot_plan, copilot_edit, copilot_deploy, copilot_test, copilot_debug all REQUIRE workflowId
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## CRITICAL: How to Handle Plans
|
|
||||||
|
|
||||||
The copilot_plan tool returns a structured plan object. You MUST:
|
|
||||||
|
|
||||||
1. **Do NOT modify the plan**: Pass the plan object EXACTLY as returned to copilot_edit
|
|
||||||
2. **Do NOT interpret or summarize the plan**: The edit agent needs the raw plan data
|
|
||||||
3. **Pass the plan in the context.plan field**: \`{ "context": { "plan": <plan_object> } }\`
|
|
||||||
4. **Include ALL plan data**: Block configurations, connections, credentials, everything
|
|
||||||
|
|
||||||
Example flow:
|
|
||||||
\`\`\`
|
|
||||||
1. copilot_plan({ request: "build a workflow...", workflowId: "abc123" })
|
|
||||||
-> Returns: { "plan": { "blocks": [...], "connections": [...], ... } }
|
|
||||||
|
|
||||||
2. copilot_edit({
|
|
||||||
workflowId: "abc123",
|
|
||||||
message: "Execute the plan",
|
|
||||||
context: { "plan": <EXACT plan object from step 1> }
|
|
||||||
})
|
|
||||||
\`\`\`
|
|
||||||
|
|
||||||
**Why this matters**: The plan contains technical details (block IDs, field mappings, API schemas) that the edit agent needs verbatim. Summarizing or rephrasing loses critical information.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## CRITICAL: Error Handling
|
|
||||||
|
|
||||||
**If the user says "doesn't work", "broke", "failed", "error" → ALWAYS use copilot_debug FIRST.**
|
|
||||||
|
|
||||||
Don't guess. Don't plan. Debug first to find the actual problem.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Important Rules
|
|
||||||
|
|
||||||
- ALWAYS deploy a workflow before attempting to run or test it
|
|
||||||
- Workflows must be deployed to have an "active deployment" for execution
|
|
||||||
- After building, call copilot_deploy with the appropriate deployment type (api, chat, or mcp)
|
|
||||||
- Return the workflow URL to the user so they can access it in Sim
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Quick Operations (use direct tools)
|
|
||||||
- list_workflows, list_workspaces, list_folders, get_workflow: Fast database queries
|
|
||||||
- create_workflow: Create new workflow and get workflowId (CALL THIS FIRST for new workflows)
|
|
||||||
- create_folder: Create new resources
|
|
||||||
|
|
||||||
## Workflow Building (use copilot tools)
|
|
||||||
- copilot_plan: Plan workflow changes (REQUIRES workflowId) - returns a plan object
|
|
||||||
- copilot_edit: Execute the plan (REQUIRES workflowId AND plan from copilot_plan)
|
|
||||||
- copilot_deploy: Deploy workflows (REQUIRES workflowId)
|
|
||||||
- copilot_test: Test workflow execution (REQUIRES workflowId)
|
|
||||||
- copilot_debug: Diagnose errors (REQUIRES workflowId) - USE THIS FIRST for issues
|
|
||||||
`
|
|
||||||
|
|
||||||
function createResponse(id: RequestId, result: unknown): JSONRPCResponse {
|
|
||||||
return {
|
|
||||||
jsonrpc: '2.0',
|
|
||||||
id,
|
|
||||||
result: result as JSONRPCResponse['result'],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function createError(id: RequestId, code: ErrorCode | number, message: string): JSONRPCError {
|
|
||||||
return {
|
|
||||||
jsonrpc: '2.0',
|
|
||||||
id,
|
|
||||||
error: { code, message },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function GET() {
|
|
||||||
return NextResponse.json({
|
|
||||||
name: 'copilot-subagents',
|
|
||||||
version: '1.0.0',
|
|
||||||
protocolVersion: '2024-11-05',
|
|
||||||
capabilities: { tools: {} },
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function POST(request: NextRequest) {
|
|
||||||
try {
|
|
||||||
const auth = await checkHybridAuth(request, { requireWorkflowId: false })
|
|
||||||
if (!auth.success || !auth.userId) {
|
|
||||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
|
||||||
}
|
|
||||||
|
|
||||||
const body = (await request.json()) as JSONRPCMessage
|
|
||||||
|
|
||||||
if (isJSONRPCNotification(body)) {
|
|
||||||
return new NextResponse(null, { status: 202 })
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isJSONRPCRequest(body)) {
|
|
||||||
return NextResponse.json(
|
|
||||||
createError(0, ErrorCode.InvalidRequest, 'Invalid JSON-RPC message'),
|
|
||||||
{ status: 400 }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const { id, method, params } = body
|
|
||||||
|
|
||||||
switch (method) {
|
|
||||||
case 'initialize': {
|
|
||||||
const result: InitializeResult = {
|
|
||||||
protocolVersion: '2024-11-05',
|
|
||||||
capabilities: { tools: {} },
|
|
||||||
serverInfo: { name: 'sim-copilot', version: '1.0.0' },
|
|
||||||
instructions: MCP_SERVER_INSTRUCTIONS,
|
|
||||||
}
|
|
||||||
return NextResponse.json(createResponse(id, result))
|
|
||||||
}
|
|
||||||
case 'ping':
|
|
||||||
return NextResponse.json(createResponse(id, {}))
|
|
||||||
case 'tools/list':
|
|
||||||
return handleToolsList(id)
|
|
||||||
case 'tools/call':
|
|
||||||
return handleToolsCall(
|
|
||||||
id,
|
|
||||||
params as { name: string; arguments?: Record<string, unknown> },
|
|
||||||
auth.userId
|
|
||||||
)
|
|
||||||
default:
|
|
||||||
return NextResponse.json(
|
|
||||||
createError(id, ErrorCode.MethodNotFound, `Method not found: ${method}`),
|
|
||||||
{ status: 404 }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Error handling MCP request', { error })
|
|
||||||
return NextResponse.json(createError(0, ErrorCode.InternalError, 'Internal error'), {
|
|
||||||
status: 500,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function handleToolsList(id: RequestId): Promise<NextResponse> {
|
|
||||||
const directTools = DIRECT_TOOL_DEFS.map((tool) => ({
|
|
||||||
name: tool.name,
|
|
||||||
description: tool.description,
|
|
||||||
inputSchema: tool.inputSchema,
|
|
||||||
}))
|
|
||||||
|
|
||||||
const subagentTools = SUBAGENT_TOOL_DEFS.map((tool) => ({
|
|
||||||
name: tool.name,
|
|
||||||
description: tool.description,
|
|
||||||
inputSchema: tool.inputSchema,
|
|
||||||
}))
|
|
||||||
|
|
||||||
const result: ListToolsResult = {
|
|
||||||
tools: [...directTools, ...subagentTools],
|
|
||||||
}
|
|
||||||
|
|
||||||
return NextResponse.json(createResponse(id, result))
|
|
||||||
}
|
|
||||||
|
|
||||||
async function handleToolsCall(
|
|
||||||
id: RequestId,
|
|
||||||
params: { name: string; arguments?: Record<string, unknown> },
|
|
||||||
userId: string
|
|
||||||
): Promise<NextResponse> {
|
|
||||||
const args = params.arguments || {}
|
|
||||||
|
|
||||||
// Check if this is a direct tool (fast, no LLM)
|
|
||||||
const directTool = DIRECT_TOOL_DEFS.find((tool) => tool.name === params.name)
|
|
||||||
if (directTool) {
|
|
||||||
return handleDirectToolCall(id, directTool, args, userId)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if this is a subagent tool (uses LLM orchestration)
|
|
||||||
const subagentTool = SUBAGENT_TOOL_DEFS.find((tool) => tool.name === params.name)
|
|
||||||
if (subagentTool) {
|
|
||||||
return handleSubagentToolCall(id, subagentTool, args, userId)
|
|
||||||
}
|
|
||||||
|
|
||||||
return NextResponse.json(
|
|
||||||
createError(id, ErrorCode.MethodNotFound, `Tool not found: ${params.name}`),
|
|
||||||
{ status: 404 }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
async function handleDirectToolCall(
|
|
||||||
id: RequestId,
|
|
||||||
toolDef: (typeof DIRECT_TOOL_DEFS)[number],
|
|
||||||
args: Record<string, unknown>,
|
|
||||||
userId: string
|
|
||||||
): Promise<NextResponse> {
|
|
||||||
try {
|
|
||||||
const execContext = await prepareExecutionContext(userId, (args.workflowId as string) || '')
|
|
||||||
|
|
||||||
const toolCall = {
|
|
||||||
id: crypto.randomUUID(),
|
|
||||||
name: toolDef.toolId,
|
|
||||||
status: 'pending' as const,
|
|
||||||
params: args as Record<string, any>,
|
|
||||||
startTime: Date.now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = await executeToolServerSide(toolCall, execContext)
|
|
||||||
|
|
||||||
const response: CallToolResult = {
|
|
||||||
content: [
|
|
||||||
{
|
|
||||||
type: 'text',
|
|
||||||
text: JSON.stringify(result.output ?? result, null, 2),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
isError: !result.success,
|
|
||||||
}
|
|
||||||
|
|
||||||
return NextResponse.json(createResponse(id, response))
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Direct tool execution failed', { tool: toolDef.name, error })
|
|
||||||
return NextResponse.json(
|
|
||||||
createError(id, ErrorCode.InternalError, `Tool execution failed: ${error}`),
|
|
||||||
{ status: 500 }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function handleSubagentToolCall(
|
|
||||||
id: RequestId,
|
|
||||||
toolDef: (typeof SUBAGENT_TOOL_DEFS)[number],
|
|
||||||
args: Record<string, unknown>,
|
|
||||||
userId: string
|
|
||||||
): Promise<NextResponse> {
|
|
||||||
const requestText =
|
|
||||||
(args.request as string) ||
|
|
||||||
(args.message as string) ||
|
|
||||||
(args.error as string) ||
|
|
||||||
JSON.stringify(args)
|
|
||||||
|
|
||||||
const context = (args.context as Record<string, unknown>) || {}
|
|
||||||
if (args.plan && !context.plan) {
|
|
||||||
context.plan = args.plan
|
|
||||||
}
|
|
||||||
|
|
||||||
const { model } = getCopilotModel('chat')
|
|
||||||
|
|
||||||
const result = await orchestrateSubagentStream(
|
|
||||||
toolDef.agentId,
|
|
||||||
{
|
|
||||||
message: requestText,
|
|
||||||
workflowId: args.workflowId,
|
|
||||||
workspaceId: args.workspaceId,
|
|
||||||
context,
|
|
||||||
model,
|
|
||||||
// Signal to the copilot backend that this is a headless request
|
|
||||||
// so it can enforce workflowId requirements on tools
|
|
||||||
headless: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
userId,
|
|
||||||
workflowId: args.workflowId as string | undefined,
|
|
||||||
workspaceId: args.workspaceId as string | undefined,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
// When a respond tool (plan_respond, edit_respond, etc.) was used,
|
|
||||||
// return only the structured result - not the full result with all internal tool calls.
|
|
||||||
// This provides clean output for MCP consumers.
|
|
||||||
let responseData: unknown
|
|
||||||
if (result.structuredResult) {
|
|
||||||
responseData = {
|
|
||||||
success: result.structuredResult.success ?? result.success,
|
|
||||||
type: result.structuredResult.type,
|
|
||||||
summary: result.structuredResult.summary,
|
|
||||||
data: result.structuredResult.data,
|
|
||||||
}
|
|
||||||
} else if (result.error) {
|
|
||||||
responseData = {
|
|
||||||
success: false,
|
|
||||||
error: result.error,
|
|
||||||
errors: result.errors,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Fallback: return content if no structured result
|
|
||||||
responseData = {
|
|
||||||
success: result.success,
|
|
||||||
content: result.content,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const response: CallToolResult = {
|
|
||||||
content: [
|
|
||||||
{
|
|
||||||
type: 'text',
|
|
||||||
text: JSON.stringify(responseData, null, 2),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
isError: !result.success,
|
|
||||||
}
|
|
||||||
|
|
||||||
return NextResponse.json(createResponse(id, response))
|
|
||||||
}
|
|
||||||
@@ -1,116 +0,0 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { type NextRequest, NextResponse } from 'next/server'
|
|
||||||
import { z } from 'zod'
|
|
||||||
import { getCopilotModel } from '@/lib/copilot/config'
|
|
||||||
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
|
|
||||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
|
||||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
|
||||||
import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
|
|
||||||
import { authenticateV1Request } from '@/app/api/v1/auth'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotHeadlessAPI')
|
|
||||||
|
|
||||||
const RequestSchema = z.object({
|
|
||||||
message: z.string().min(1, 'message is required'),
|
|
||||||
workflowId: z.string().optional(),
|
|
||||||
workflowName: z.string().optional(),
|
|
||||||
chatId: z.string().optional(),
|
|
||||||
mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'),
|
|
||||||
model: z.string().optional(),
|
|
||||||
autoExecuteTools: z.boolean().optional().default(true),
|
|
||||||
timeout: z.number().optional().default(300000),
|
|
||||||
})
|
|
||||||
|
|
||||||
/**
|
|
||||||
* POST /api/v1/copilot/chat
|
|
||||||
* Headless copilot endpoint for server-side orchestration.
|
|
||||||
*
|
|
||||||
* workflowId is optional - if not provided:
|
|
||||||
* - If workflowName is provided, finds that workflow
|
|
||||||
* - Otherwise uses the user's first workflow as context
|
|
||||||
* - The copilot can still operate on any workflow using list_user_workflows
|
|
||||||
*/
|
|
||||||
export async function POST(req: NextRequest) {
|
|
||||||
const auth = await authenticateV1Request(req)
|
|
||||||
if (!auth.authenticated || !auth.userId) {
|
|
||||||
return NextResponse.json(
|
|
||||||
{ success: false, error: auth.error || 'Unauthorized' },
|
|
||||||
{ status: 401 }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
const body = await req.json()
|
|
||||||
const parsed = RequestSchema.parse(body)
|
|
||||||
const defaults = getCopilotModel('chat')
|
|
||||||
const selectedModel = parsed.model || defaults.model
|
|
||||||
|
|
||||||
// Resolve workflow ID
|
|
||||||
const resolved = await resolveWorkflowIdForUser(
|
|
||||||
auth.userId,
|
|
||||||
parsed.workflowId,
|
|
||||||
parsed.workflowName
|
|
||||||
)
|
|
||||||
if (!resolved) {
|
|
||||||
return NextResponse.json(
|
|
||||||
{
|
|
||||||
success: false,
|
|
||||||
error: 'No workflows found. Create a workflow first or provide a valid workflowId.',
|
|
||||||
},
|
|
||||||
{ status: 400 }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Transform mode to transport mode (same as client API)
|
|
||||||
// build and agent both map to 'agent' on the backend
|
|
||||||
const effectiveMode = parsed.mode === 'agent' ? 'build' : parsed.mode
|
|
||||||
const transportMode = effectiveMode === 'build' ? 'agent' : effectiveMode
|
|
||||||
|
|
||||||
// Always generate a chatId - required for artifacts system to work with subagents
|
|
||||||
const chatId = parsed.chatId || crypto.randomUUID()
|
|
||||||
|
|
||||||
const requestPayload = {
|
|
||||||
message: parsed.message,
|
|
||||||
workflowId: resolved.workflowId,
|
|
||||||
userId: auth.userId,
|
|
||||||
stream: true,
|
|
||||||
streamToolCalls: true,
|
|
||||||
model: selectedModel,
|
|
||||||
mode: transportMode,
|
|
||||||
messageId: crypto.randomUUID(),
|
|
||||||
version: SIM_AGENT_VERSION,
|
|
||||||
headless: true, // Enable cross-workflow operations via workflowId params
|
|
||||||
chatId,
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = await orchestrateCopilotStream(requestPayload, {
|
|
||||||
userId: auth.userId,
|
|
||||||
workflowId: resolved.workflowId,
|
|
||||||
chatId,
|
|
||||||
autoExecuteTools: parsed.autoExecuteTools,
|
|
||||||
timeout: parsed.timeout,
|
|
||||||
interactive: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
return NextResponse.json({
|
|
||||||
success: result.success,
|
|
||||||
content: result.content,
|
|
||||||
toolCalls: result.toolCalls,
|
|
||||||
chatId: result.chatId || chatId, // Return the chatId for conversation continuity
|
|
||||||
conversationId: result.conversationId,
|
|
||||||
error: result.error,
|
|
||||||
})
|
|
||||||
} catch (error) {
|
|
||||||
if (error instanceof z.ZodError) {
|
|
||||||
return NextResponse.json(
|
|
||||||
{ success: false, error: 'Invalid request', details: error.errors },
|
|
||||||
{ status: 400 }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error('Headless copilot request failed', {
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,7 +1,6 @@
|
|||||||
'use client'
|
'use client'
|
||||||
|
|
||||||
import { memo, useEffect, useMemo, useRef, useState } from 'react'
|
import { memo, useEffect, useMemo, useRef, useState } from 'react'
|
||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import clsx from 'clsx'
|
import clsx from 'clsx'
|
||||||
import { ChevronUp, LayoutList } from 'lucide-react'
|
import { ChevronUp, LayoutList } from 'lucide-react'
|
||||||
import Editor from 'react-simple-code-editor'
|
import Editor from 'react-simple-code-editor'
|
||||||
@@ -1258,42 +1257,99 @@ function shouldShowRunSkipButtons(toolCall: CopilotToolCall): boolean {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
const toolCallLogger = createLogger('CopilotToolCall')
|
|
||||||
|
|
||||||
async function sendToolDecision(
|
|
||||||
toolCallId: string,
|
|
||||||
status: 'accepted' | 'rejected' | 'background'
|
|
||||||
) {
|
|
||||||
try {
|
|
||||||
await fetch('/api/copilot/confirm', {
|
|
||||||
method: 'POST',
|
|
||||||
headers: { 'Content-Type': 'application/json' },
|
|
||||||
body: JSON.stringify({ toolCallId, status }),
|
|
||||||
})
|
|
||||||
} catch (error) {
|
|
||||||
toolCallLogger.warn('Failed to send tool decision', {
|
|
||||||
toolCallId,
|
|
||||||
status,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function handleRun(
|
async function handleRun(
|
||||||
toolCall: CopilotToolCall,
|
toolCall: CopilotToolCall,
|
||||||
setToolCallState: any,
|
setToolCallState: any,
|
||||||
onStateChange?: any,
|
onStateChange?: any,
|
||||||
editedParams?: any
|
editedParams?: any
|
||||||
) {
|
) {
|
||||||
setToolCallState(toolCall, 'executing', editedParams ? { params: editedParams } : undefined)
|
const instance = getClientTool(toolCall.id)
|
||||||
onStateChange?.('executing')
|
|
||||||
await sendToolDecision(toolCall.id, 'accepted')
|
if (!instance && isIntegrationTool(toolCall.name)) {
|
||||||
|
onStateChange?.('executing')
|
||||||
|
try {
|
||||||
|
await useCopilotStore.getState().executeIntegrationTool(toolCall.id)
|
||||||
|
} catch (e) {
|
||||||
|
setToolCallState(toolCall, 'error', { error: e instanceof Error ? e.message : String(e) })
|
||||||
|
onStateChange?.('error')
|
||||||
|
try {
|
||||||
|
await fetch('/api/copilot/tools/mark-complete', {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
id: toolCall.id,
|
||||||
|
name: toolCall.name,
|
||||||
|
status: 500,
|
||||||
|
message: e instanceof Error ? e.message : 'Tool execution failed',
|
||||||
|
data: { error: e instanceof Error ? e.message : String(e) },
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
} catch {
|
||||||
|
console.error('[handleRun] Failed to notify backend of tool error:', toolCall.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!instance) return
|
||||||
|
try {
|
||||||
|
const mergedParams =
|
||||||
|
editedParams ||
|
||||||
|
(toolCall as any).params ||
|
||||||
|
(toolCall as any).parameters ||
|
||||||
|
(toolCall as any).input ||
|
||||||
|
{}
|
||||||
|
await instance.handleAccept?.(mergedParams)
|
||||||
|
onStateChange?.('executing')
|
||||||
|
} catch (e) {
|
||||||
|
setToolCallState(toolCall, 'error', { error: e instanceof Error ? e.message : String(e) })
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function handleSkip(toolCall: CopilotToolCall, setToolCallState: any, onStateChange?: any) {
|
async function handleSkip(toolCall: CopilotToolCall, setToolCallState: any, onStateChange?: any) {
|
||||||
|
const instance = getClientTool(toolCall.id)
|
||||||
|
|
||||||
|
if (!instance && isIntegrationTool(toolCall.name)) {
|
||||||
|
setToolCallState(toolCall, 'rejected')
|
||||||
|
onStateChange?.('rejected')
|
||||||
|
|
||||||
|
let notified = false
|
||||||
|
for (let attempt = 0; attempt < 3 && !notified; attempt++) {
|
||||||
|
try {
|
||||||
|
const res = await fetch('/api/copilot/tools/mark-complete', {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
id: toolCall.id,
|
||||||
|
name: toolCall.name,
|
||||||
|
status: 400,
|
||||||
|
message: 'Tool execution skipped by user',
|
||||||
|
data: { skipped: true, reason: 'user_skipped' },
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
if (res.ok) {
|
||||||
|
notified = true
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
if (attempt < 2) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 500))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!notified) {
|
||||||
|
console.error('[handleSkip] Failed to notify backend after 3 attempts:', toolCall.id)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (instance) {
|
||||||
|
try {
|
||||||
|
await instance.handleReject?.()
|
||||||
|
} catch {}
|
||||||
|
}
|
||||||
setToolCallState(toolCall, 'rejected')
|
setToolCallState(toolCall, 'rejected')
|
||||||
onStateChange?.('rejected')
|
onStateChange?.('rejected')
|
||||||
await sendToolDecision(toolCall.id, 'rejected')
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function getDisplayName(toolCall: CopilotToolCall): string {
|
function getDisplayName(toolCall: CopilotToolCall): string {
|
||||||
@@ -1453,7 +1509,7 @@ export function ToolCall({
|
|||||||
// Check if this integration tool is auto-allowed
|
// Check if this integration tool is auto-allowed
|
||||||
// Subscribe to autoAllowedTools so we re-render when it changes
|
// Subscribe to autoAllowedTools so we re-render when it changes
|
||||||
const autoAllowedTools = useCopilotStore((s) => s.autoAllowedTools)
|
const autoAllowedTools = useCopilotStore((s) => s.autoAllowedTools)
|
||||||
const { removeAutoAllowedTool, setToolCallState } = useCopilotStore()
|
const { removeAutoAllowedTool } = useCopilotStore()
|
||||||
const isAutoAllowed = isIntegrationTool(toolCall.name) && autoAllowedTools.includes(toolCall.name)
|
const isAutoAllowed = isIntegrationTool(toolCall.name) && autoAllowedTools.includes(toolCall.name)
|
||||||
|
|
||||||
// Update edited params when toolCall params change (deep comparison to avoid resetting user edits on ref change)
|
// Update edited params when toolCall params change (deep comparison to avoid resetting user edits on ref change)
|
||||||
@@ -2155,9 +2211,16 @@ export function ToolCall({
|
|||||||
<div className='mt-[10px]'>
|
<div className='mt-[10px]'>
|
||||||
<Button
|
<Button
|
||||||
onClick={async () => {
|
onClick={async () => {
|
||||||
setToolCallState(toolCall, ClientToolCallState.background)
|
try {
|
||||||
onStateChange?.('background')
|
const instance = getClientTool(toolCall.id)
|
||||||
await sendToolDecision(toolCall.id, 'background')
|
instance?.setState?.((ClientToolCallState as any).background)
|
||||||
|
await instance?.markToolComplete?.(
|
||||||
|
200,
|
||||||
|
'The user has chosen to move the workflow execution to the background. Check back with them later to know when the workflow execution is complete'
|
||||||
|
)
|
||||||
|
forceUpdate({})
|
||||||
|
onStateChange?.('background')
|
||||||
|
} catch {}
|
||||||
}}
|
}}
|
||||||
variant='tertiary'
|
variant='tertiary'
|
||||||
title='Move to Background'
|
title='Move to Background'
|
||||||
@@ -2169,9 +2232,21 @@ export function ToolCall({
|
|||||||
<div className='mt-[10px]'>
|
<div className='mt-[10px]'>
|
||||||
<Button
|
<Button
|
||||||
onClick={async () => {
|
onClick={async () => {
|
||||||
setToolCallState(toolCall, ClientToolCallState.background)
|
try {
|
||||||
onStateChange?.('background')
|
const instance = getClientTool(toolCall.id)
|
||||||
await sendToolDecision(toolCall.id, 'background')
|
const elapsedSeconds = instance?.getElapsedSeconds?.() || 0
|
||||||
|
instance?.setState?.((ClientToolCallState as any).background, {
|
||||||
|
result: { _elapsedSeconds: elapsedSeconds },
|
||||||
|
})
|
||||||
|
const { updateToolCallParams } = useCopilotStore.getState()
|
||||||
|
updateToolCallParams?.(toolCall.id, { _elapsedSeconds: Math.round(elapsedSeconds) })
|
||||||
|
await instance?.markToolComplete?.(
|
||||||
|
200,
|
||||||
|
`User woke you up after ${Math.round(elapsedSeconds)} seconds`
|
||||||
|
)
|
||||||
|
forceUpdate({})
|
||||||
|
onStateChange?.('background')
|
||||||
|
} catch {}
|
||||||
}}
|
}}
|
||||||
variant='tertiary'
|
variant='tertiary'
|
||||||
title='Wake'
|
title='Wake'
|
||||||
|
|||||||
@@ -114,7 +114,6 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
|
|||||||
clearPlanArtifact,
|
clearPlanArtifact,
|
||||||
savePlanArtifact,
|
savePlanArtifact,
|
||||||
loadAutoAllowedTools,
|
loadAutoAllowedTools,
|
||||||
resumeActiveStream,
|
|
||||||
} = useCopilotStore()
|
} = useCopilotStore()
|
||||||
|
|
||||||
// Initialize copilot
|
// Initialize copilot
|
||||||
@@ -127,7 +126,6 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
|
|||||||
loadAutoAllowedTools,
|
loadAutoAllowedTools,
|
||||||
currentChat,
|
currentChat,
|
||||||
isSendingMessage,
|
isSendingMessage,
|
||||||
resumeActiveStream,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// Handle scroll management (80px stickiness for copilot)
|
// Handle scroll management (80px stickiness for copilot)
|
||||||
@@ -423,8 +421,8 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
|
|||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{/* Show loading state until fully initialized, but skip if actively streaming (resume case) */}
|
{/* Show loading state until fully initialized */}
|
||||||
{!isInitialized && !isSendingMessage ? (
|
{!isInitialized ? (
|
||||||
<div className='flex h-full w-full items-center justify-center'>
|
<div className='flex h-full w-full items-center justify-center'>
|
||||||
<div className='flex flex-col items-center gap-3'>
|
<div className='flex flex-col items-center gap-3'>
|
||||||
<p className='text-muted-foreground text-sm'>Loading copilot</p>
|
<p className='text-muted-foreground text-sm'>Loading copilot</p>
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ interface UseCopilotInitializationProps {
|
|||||||
loadAutoAllowedTools: () => Promise<void>
|
loadAutoAllowedTools: () => Promise<void>
|
||||||
currentChat: any
|
currentChat: any
|
||||||
isSendingMessage: boolean
|
isSendingMessage: boolean
|
||||||
resumeActiveStream: () => Promise<boolean>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -33,13 +32,11 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) {
|
|||||||
loadAutoAllowedTools,
|
loadAutoAllowedTools,
|
||||||
currentChat,
|
currentChat,
|
||||||
isSendingMessage,
|
isSendingMessage,
|
||||||
resumeActiveStream,
|
|
||||||
} = props
|
} = props
|
||||||
|
|
||||||
const [isInitialized, setIsInitialized] = useState(false)
|
const [isInitialized, setIsInitialized] = useState(false)
|
||||||
const lastWorkflowIdRef = useRef<string | null>(null)
|
const lastWorkflowIdRef = useRef<string | null>(null)
|
||||||
const hasMountedRef = useRef(false)
|
const hasMountedRef = useRef(false)
|
||||||
const hasResumedRef = useRef(false)
|
|
||||||
|
|
||||||
/** Initialize on mount - loads chats if needed. Never loads during streaming */
|
/** Initialize on mount - loads chats if needed. Never loads during streaming */
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
@@ -108,16 +105,6 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) {
|
|||||||
isSendingMessage,
|
isSendingMessage,
|
||||||
])
|
])
|
||||||
|
|
||||||
/** Try to resume active stream on mount - runs early, before waiting for chats */
|
|
||||||
useEffect(() => {
|
|
||||||
if (hasResumedRef.current || isSendingMessage) return
|
|
||||||
hasResumedRef.current = true
|
|
||||||
// Resume immediately on mount - don't wait for isInitialized
|
|
||||||
resumeActiveStream().catch((err) => {
|
|
||||||
logger.warn('[Copilot] Failed to resume active stream', err)
|
|
||||||
})
|
|
||||||
}, [isSendingMessage, resumeActiveStream])
|
|
||||||
|
|
||||||
/** Load auto-allowed tools once on mount - runs immediately, independent of workflow */
|
/** Load auto-allowed tools once on mount - runs immediately, independent of workflow */
|
||||||
const hasLoadedAutoAllowedToolsRef = useRef(false)
|
const hasLoadedAutoAllowedToolsRef = useRef(false)
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
|
|||||||
@@ -82,7 +82,6 @@ export interface SendMessageRequest {
|
|||||||
executionId?: string
|
executionId?: string
|
||||||
}>
|
}>
|
||||||
commands?: string[]
|
commands?: string[]
|
||||||
resumeFromEventId?: number
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -121,7 +120,7 @@ export async function sendStreamingMessage(
|
|||||||
request: SendMessageRequest
|
request: SendMessageRequest
|
||||||
): Promise<StreamingResponse> {
|
): Promise<StreamingResponse> {
|
||||||
try {
|
try {
|
||||||
const { abortSignal, resumeFromEventId, ...requestBody } = request
|
const { abortSignal, ...requestBody } = request
|
||||||
try {
|
try {
|
||||||
const preview = Array.isArray((requestBody as any).contexts)
|
const preview = Array.isArray((requestBody as any).contexts)
|
||||||
? (requestBody as any).contexts.map((c: any) => ({
|
? (requestBody as any).contexts.map((c: any) => ({
|
||||||
@@ -137,51 +136,8 @@ export async function sendStreamingMessage(
|
|||||||
? (requestBody as any).contexts.length
|
? (requestBody as any).contexts.length
|
||||||
: 0,
|
: 0,
|
||||||
contextsPreview: preview,
|
contextsPreview: preview,
|
||||||
resumeFromEventId,
|
|
||||||
})
|
})
|
||||||
} catch {}
|
} catch {}
|
||||||
|
|
||||||
const streamId = request.userMessageId
|
|
||||||
if (typeof resumeFromEventId === 'number') {
|
|
||||||
if (!streamId) {
|
|
||||||
return {
|
|
||||||
success: false,
|
|
||||||
error: 'streamId is required to resume a stream',
|
|
||||||
status: 400,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const url = `/api/copilot/chat/stream?streamId=${encodeURIComponent(
|
|
||||||
streamId
|
|
||||||
)}&from=${encodeURIComponent(String(resumeFromEventId))}`
|
|
||||||
const response = await fetch(url, {
|
|
||||||
method: 'GET',
|
|
||||||
signal: abortSignal,
|
|
||||||
credentials: 'include',
|
|
||||||
})
|
|
||||||
|
|
||||||
if (!response.ok) {
|
|
||||||
const errorMessage = await handleApiError(response, 'Failed to resume streaming message')
|
|
||||||
return {
|
|
||||||
success: false,
|
|
||||||
error: errorMessage,
|
|
||||||
status: response.status,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!response.body) {
|
|
||||||
return {
|
|
||||||
success: false,
|
|
||||||
error: 'No response body received',
|
|
||||||
status: 500,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
success: true,
|
|
||||||
stream: response.body,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const response = await fetch('/api/copilot/chat', {
|
const response = await fetch('/api/copilot/chat', {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'Content-Type': 'application/json' },
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
|||||||
@@ -1,36 +0,0 @@
|
|||||||
export const INTERRUPT_TOOL_NAMES = [
|
|
||||||
'set_global_workflow_variables',
|
|
||||||
'run_workflow',
|
|
||||||
'manage_mcp_tool',
|
|
||||||
'manage_custom_tool',
|
|
||||||
'deploy_mcp',
|
|
||||||
'deploy_chat',
|
|
||||||
'deploy_api',
|
|
||||||
'create_workspace_mcp_server',
|
|
||||||
'set_environment_variables',
|
|
||||||
'make_api_request',
|
|
||||||
'oauth_request_access',
|
|
||||||
'navigate_ui',
|
|
||||||
'knowledge_base',
|
|
||||||
] as const
|
|
||||||
|
|
||||||
export const INTERRUPT_TOOL_SET = new Set<string>(INTERRUPT_TOOL_NAMES)
|
|
||||||
|
|
||||||
export const SUBAGENT_TOOL_NAMES = [
|
|
||||||
'debug',
|
|
||||||
'edit',
|
|
||||||
'plan',
|
|
||||||
'test',
|
|
||||||
'deploy',
|
|
||||||
'auth',
|
|
||||||
'research',
|
|
||||||
'knowledge',
|
|
||||||
'custom_tool',
|
|
||||||
'tour',
|
|
||||||
'info',
|
|
||||||
'workflow',
|
|
||||||
'evaluate',
|
|
||||||
'superagent',
|
|
||||||
] as const
|
|
||||||
|
|
||||||
export const SUBAGENT_TOOL_SET = new Set<string>(SUBAGENT_TOOL_NAMES)
|
|
||||||
@@ -1,224 +0,0 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
|
|
||||||
import {
|
|
||||||
getToolCallIdFromEvent,
|
|
||||||
handleSubagentRouting,
|
|
||||||
markToolCallSeen,
|
|
||||||
markToolResultSeen,
|
|
||||||
normalizeSseEvent,
|
|
||||||
sseHandlers,
|
|
||||||
subAgentHandlers,
|
|
||||||
wasToolCallSeen,
|
|
||||||
wasToolResultSeen,
|
|
||||||
} from '@/lib/copilot/orchestrator/sse-handlers'
|
|
||||||
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
|
|
||||||
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
|
|
||||||
import type {
|
|
||||||
OrchestratorOptions,
|
|
||||||
OrchestratorResult,
|
|
||||||
SSEEvent,
|
|
||||||
StreamingContext,
|
|
||||||
ToolCallSummary,
|
|
||||||
} from '@/lib/copilot/orchestrator/types'
|
|
||||||
import { env } from '@/lib/core/config/env'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotOrchestrator')
|
|
||||||
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
|
|
||||||
|
|
||||||
export interface OrchestrateStreamOptions extends OrchestratorOptions {
|
|
||||||
userId: string
|
|
||||||
workflowId: string
|
|
||||||
chatId?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Orchestrate a copilot SSE stream and execute tool calls server-side.
|
|
||||||
*/
|
|
||||||
export async function orchestrateCopilotStream(
|
|
||||||
requestPayload: Record<string, any>,
|
|
||||||
options: OrchestrateStreamOptions
|
|
||||||
): Promise<OrchestratorResult> {
|
|
||||||
const { userId, workflowId, chatId, timeout = 300000, abortSignal } = options
|
|
||||||
const execContext = await prepareExecutionContext(userId, workflowId)
|
|
||||||
|
|
||||||
const context: StreamingContext = {
|
|
||||||
chatId,
|
|
||||||
conversationId: undefined,
|
|
||||||
messageId: requestPayload?.messageId || crypto.randomUUID(),
|
|
||||||
accumulatedContent: '',
|
|
||||||
contentBlocks: [],
|
|
||||||
toolCalls: new Map(),
|
|
||||||
currentThinkingBlock: null,
|
|
||||||
isInThinkingBlock: false,
|
|
||||||
subAgentParentToolCallId: undefined,
|
|
||||||
subAgentContent: {},
|
|
||||||
subAgentToolCalls: {},
|
|
||||||
pendingContent: '',
|
|
||||||
streamComplete: false,
|
|
||||||
wasAborted: false,
|
|
||||||
errors: [],
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
const response = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
|
|
||||||
method: 'POST',
|
|
||||||
headers: {
|
|
||||||
'Content-Type': 'application/json',
|
|
||||||
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
|
|
||||||
},
|
|
||||||
body: JSON.stringify(requestPayload),
|
|
||||||
signal: abortSignal,
|
|
||||||
})
|
|
||||||
|
|
||||||
if (!response.ok) {
|
|
||||||
const errorText = await response.text().catch(() => '')
|
|
||||||
throw new Error(
|
|
||||||
`Copilot backend error (${response.status}): ${errorText || response.statusText}`
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!response.body) {
|
|
||||||
throw new Error('Copilot backend response missing body')
|
|
||||||
}
|
|
||||||
|
|
||||||
const reader = response.body.getReader()
|
|
||||||
const decoder = new TextDecoder()
|
|
||||||
|
|
||||||
const timeoutId = setTimeout(() => {
|
|
||||||
context.errors.push('Request timed out')
|
|
||||||
context.streamComplete = true
|
|
||||||
reader.cancel().catch(() => {})
|
|
||||||
}, timeout)
|
|
||||||
|
|
||||||
try {
|
|
||||||
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
|
|
||||||
if (abortSignal?.aborted) {
|
|
||||||
context.wasAborted = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
const normalizedEvent = normalizeSseEvent(event)
|
|
||||||
|
|
||||||
// Skip tool_result events for tools the sim-side already executed.
|
|
||||||
// The sim-side emits its own tool_result with complete data.
|
|
||||||
// For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
|
|
||||||
const toolCallId = getToolCallIdFromEvent(normalizedEvent)
|
|
||||||
const eventData = normalizedEvent.data
|
|
||||||
|
|
||||||
const isPartialToolCall =
|
|
||||||
normalizedEvent.type === 'tool_call' && eventData?.partial === true
|
|
||||||
|
|
||||||
const shouldSkipToolCall =
|
|
||||||
normalizedEvent.type === 'tool_call' &&
|
|
||||||
!!toolCallId &&
|
|
||||||
!isPartialToolCall &&
|
|
||||||
(wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))
|
|
||||||
|
|
||||||
if (
|
|
||||||
normalizedEvent.type === 'tool_call' &&
|
|
||||||
toolCallId &&
|
|
||||||
!isPartialToolCall &&
|
|
||||||
!shouldSkipToolCall
|
|
||||||
) {
|
|
||||||
markToolCallSeen(toolCallId)
|
|
||||||
}
|
|
||||||
|
|
||||||
const shouldSkipToolResult =
|
|
||||||
normalizedEvent.type === 'tool_result' &&
|
|
||||||
(() => {
|
|
||||||
if (!toolCallId) return false
|
|
||||||
if (wasToolResultSeen(toolCallId)) return true
|
|
||||||
markToolResultSeen(toolCallId)
|
|
||||||
return false
|
|
||||||
})()
|
|
||||||
|
|
||||||
if (!shouldSkipToolCall && !shouldSkipToolResult) {
|
|
||||||
await forwardEvent(normalizedEvent, options)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (normalizedEvent.type === 'subagent_start') {
|
|
||||||
const toolCallId = normalizedEvent.data?.tool_call_id
|
|
||||||
if (toolCallId) {
|
|
||||||
context.subAgentParentToolCallId = toolCallId
|
|
||||||
context.subAgentContent[toolCallId] = ''
|
|
||||||
context.subAgentToolCalls[toolCallId] = []
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if (normalizedEvent.type === 'subagent_end') {
|
|
||||||
context.subAgentParentToolCallId = undefined
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if (handleSubagentRouting(normalizedEvent, context)) {
|
|
||||||
const handler = subAgentHandlers[normalizedEvent.type]
|
|
||||||
if (handler) {
|
|
||||||
await handler(normalizedEvent, context, execContext, options)
|
|
||||||
}
|
|
||||||
if (context.streamComplete) break
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
const handler = sseHandlers[normalizedEvent.type]
|
|
||||||
if (handler) {
|
|
||||||
await handler(normalizedEvent, context, execContext, options)
|
|
||||||
}
|
|
||||||
if (context.streamComplete) break
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
clearTimeout(timeoutId)
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = buildResult(context)
|
|
||||||
await options.onComplete?.(result)
|
|
||||||
return result
|
|
||||||
} catch (error) {
|
|
||||||
const err = error instanceof Error ? error : new Error('Copilot orchestration failed')
|
|
||||||
logger.error('Copilot orchestration failed', { error: err.message })
|
|
||||||
await options.onError?.(err)
|
|
||||||
return {
|
|
||||||
success: false,
|
|
||||||
content: '',
|
|
||||||
contentBlocks: [],
|
|
||||||
toolCalls: [],
|
|
||||||
chatId: context.chatId,
|
|
||||||
conversationId: context.conversationId,
|
|
||||||
error: err.message,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
|
|
||||||
try {
|
|
||||||
await options.onEvent?.(event)
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to forward SSE event', {
|
|
||||||
type: event.type,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function buildResult(context: StreamingContext): OrchestratorResult {
|
|
||||||
const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
|
|
||||||
id: toolCall.id,
|
|
||||||
name: toolCall.name,
|
|
||||||
status: toolCall.status,
|
|
||||||
params: toolCall.params,
|
|
||||||
result: toolCall.result?.output,
|
|
||||||
error: toolCall.error,
|
|
||||||
durationMs:
|
|
||||||
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
|
|
||||||
}))
|
|
||||||
|
|
||||||
return {
|
|
||||||
success: context.errors.length === 0,
|
|
||||||
content: context.accumulatedContent,
|
|
||||||
contentBlocks: context.contentBlocks,
|
|
||||||
toolCalls,
|
|
||||||
chatId: context.chatId,
|
|
||||||
conversationId: context.conversationId,
|
|
||||||
errors: context.errors.length ? context.errors : undefined,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,140 +0,0 @@
|
|||||||
import { db } from '@sim/db'
|
|
||||||
import { copilotChats } from '@sim/db/schema'
|
|
||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { and, eq } from 'drizzle-orm'
|
|
||||||
import { getRedisClient } from '@/lib/core/config/redis'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotOrchestratorPersistence')
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new copilot chat record.
|
|
||||||
*/
|
|
||||||
export async function createChat(params: {
|
|
||||||
userId: string
|
|
||||||
workflowId: string
|
|
||||||
model: string
|
|
||||||
}): Promise<{ id: string }> {
|
|
||||||
const [chat] = await db
|
|
||||||
.insert(copilotChats)
|
|
||||||
.values({
|
|
||||||
userId: params.userId,
|
|
||||||
workflowId: params.workflowId,
|
|
||||||
model: params.model,
|
|
||||||
messages: [],
|
|
||||||
})
|
|
||||||
.returning({ id: copilotChats.id })
|
|
||||||
|
|
||||||
return { id: chat.id }
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Load an existing chat for a user.
|
|
||||||
*/
|
|
||||||
export async function loadChat(chatId: string, userId: string) {
|
|
||||||
const [chat] = await db
|
|
||||||
.select()
|
|
||||||
.from(copilotChats)
|
|
||||||
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)))
|
|
||||||
.limit(1)
|
|
||||||
|
|
||||||
return chat || null
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Save chat messages and metadata.
|
|
||||||
*/
|
|
||||||
export async function saveMessages(
|
|
||||||
chatId: string,
|
|
||||||
messages: any[],
|
|
||||||
options?: {
|
|
||||||
title?: string
|
|
||||||
conversationId?: string
|
|
||||||
planArtifact?: string | null
|
|
||||||
config?: { mode?: string; model?: string }
|
|
||||||
}
|
|
||||||
): Promise<void> {
|
|
||||||
await db
|
|
||||||
.update(copilotChats)
|
|
||||||
.set({
|
|
||||||
messages,
|
|
||||||
updatedAt: new Date(),
|
|
||||||
...(options?.title ? { title: options.title } : {}),
|
|
||||||
...(options?.conversationId ? { conversationId: options.conversationId } : {}),
|
|
||||||
...(options?.planArtifact !== undefined ? { planArtifact: options.planArtifact } : {}),
|
|
||||||
...(options?.config ? { config: options.config } : {}),
|
|
||||||
})
|
|
||||||
.where(eq(copilotChats.id, chatId))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update the conversationId for a chat without overwriting messages.
|
|
||||||
*/
|
|
||||||
export async function updateChatConversationId(
|
|
||||||
chatId: string,
|
|
||||||
conversationId: string
|
|
||||||
): Promise<void> {
|
|
||||||
await db
|
|
||||||
.update(copilotChats)
|
|
||||||
.set({
|
|
||||||
conversationId,
|
|
||||||
updatedAt: new Date(),
|
|
||||||
})
|
|
||||||
.where(eq(copilotChats.id, chatId))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set a tool call confirmation status in Redis.
|
|
||||||
*/
|
|
||||||
export async function setToolConfirmation(
|
|
||||||
toolCallId: string,
|
|
||||||
status: 'accepted' | 'rejected' | 'background' | 'pending',
|
|
||||||
message?: string
|
|
||||||
): Promise<boolean> {
|
|
||||||
const redis = getRedisClient()
|
|
||||||
if (!redis) {
|
|
||||||
logger.warn('Redis client not available for tool confirmation')
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
const key = `tool_call:${toolCallId}`
|
|
||||||
const payload = {
|
|
||||||
status,
|
|
||||||
message: message || null,
|
|
||||||
timestamp: new Date().toISOString(),
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await redis.set(key, JSON.stringify(payload), 'EX', 86400)
|
|
||||||
return true
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Failed to set tool confirmation', {
|
|
||||||
toolCallId,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a tool call confirmation status from Redis.
|
|
||||||
*/
|
|
||||||
export async function getToolConfirmation(toolCallId: string): Promise<{
|
|
||||||
status: string
|
|
||||||
message?: string
|
|
||||||
timestamp?: string
|
|
||||||
} | null> {
|
|
||||||
const redis = getRedisClient()
|
|
||||||
if (!redis) return null
|
|
||||||
|
|
||||||
try {
|
|
||||||
const data = await redis.get(`tool_call:${toolCallId}`)
|
|
||||||
if (!data) return null
|
|
||||||
return JSON.parse(data) as { status: string; message?: string; timestamp?: string }
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Failed to read tool confirmation', {
|
|
||||||
toolCallId,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,589 +0,0 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { INTERRUPT_TOOL_SET, SUBAGENT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
|
|
||||||
import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
|
|
||||||
import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
|
|
||||||
import type {
|
|
||||||
ContentBlock,
|
|
||||||
ExecutionContext,
|
|
||||||
OrchestratorOptions,
|
|
||||||
SSEEvent,
|
|
||||||
StreamingContext,
|
|
||||||
ToolCallState,
|
|
||||||
} from '@/lib/copilot/orchestrator/types'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotSseHandlers')
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tracks tool call IDs for which a tool_call has already been forwarded/emitted (non-partial).
|
|
||||||
*/
|
|
||||||
const seenToolCalls = new Set<string>()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tracks tool call IDs for which a tool_result has already been emitted or forwarded.
|
|
||||||
*/
|
|
||||||
const seenToolResults = new Set<string>()
|
|
||||||
|
|
||||||
export function markToolCallSeen(toolCallId: string): void {
|
|
||||||
seenToolCalls.add(toolCallId)
|
|
||||||
setTimeout(
|
|
||||||
() => {
|
|
||||||
seenToolCalls.delete(toolCallId)
|
|
||||||
},
|
|
||||||
5 * 60 * 1000
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
export function wasToolCallSeen(toolCallId: string): boolean {
|
|
||||||
return seenToolCalls.has(toolCallId)
|
|
||||||
}
|
|
||||||
|
|
||||||
type EventDataObject = Record<string, any> | undefined
|
|
||||||
|
|
||||||
const parseEventData = (data: unknown): EventDataObject => {
|
|
||||||
if (!data) return undefined
|
|
||||||
if (typeof data !== 'string') {
|
|
||||||
return data as EventDataObject
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return JSON.parse(data) as EventDataObject
|
|
||||||
} catch {
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const hasToolFields = (data: EventDataObject): boolean => {
|
|
||||||
if (!data) return false
|
|
||||||
return (
|
|
||||||
data.id !== undefined ||
|
|
||||||
data.toolCallId !== undefined ||
|
|
||||||
data.name !== undefined ||
|
|
||||||
data.success !== undefined ||
|
|
||||||
data.result !== undefined ||
|
|
||||||
data.arguments !== undefined
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const getEventData = (event: SSEEvent): EventDataObject => {
|
|
||||||
const topLevel = parseEventData(event.data)
|
|
||||||
if (!topLevel) return undefined
|
|
||||||
if (hasToolFields(topLevel)) return topLevel
|
|
||||||
const nested = parseEventData(topLevel.data)
|
|
||||||
return nested || topLevel
|
|
||||||
}
|
|
||||||
|
|
||||||
export function getToolCallIdFromEvent(event: SSEEvent): string | undefined {
|
|
||||||
const data = getEventData(event)
|
|
||||||
return event.toolCallId || data?.id || data?.toolCallId
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Normalizes SSE events so tool metadata is available at the top level. */
|
|
||||||
export function normalizeSseEvent(event: SSEEvent): SSEEvent {
|
|
||||||
if (!event) return event
|
|
||||||
const data = getEventData(event)
|
|
||||||
if (!data) return event
|
|
||||||
const toolCallId = event.toolCallId || data.id || data.toolCallId
|
|
||||||
const toolName = event.toolName || data.name || data.toolName
|
|
||||||
const success = event.success ?? data.success
|
|
||||||
const result = event.result ?? data.result
|
|
||||||
const normalizedData = typeof event.data === 'string' ? data : event.data
|
|
||||||
return {
|
|
||||||
...event,
|
|
||||||
data: normalizedData,
|
|
||||||
toolCallId,
|
|
||||||
toolName,
|
|
||||||
success,
|
|
||||||
result,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mark a tool call as executed by the sim-side.
|
|
||||||
* This prevents the Go backend's duplicate tool_result from being forwarded.
|
|
||||||
*/
|
|
||||||
export function markToolResultSeen(toolCallId: string): void {
|
|
||||||
seenToolResults.add(toolCallId)
|
|
||||||
setTimeout(
|
|
||||||
() => {
|
|
||||||
seenToolResults.delete(toolCallId)
|
|
||||||
},
|
|
||||||
5 * 60 * 1000
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if a tool call was executed by the sim-side.
|
|
||||||
*/
|
|
||||||
export function wasToolResultSeen(toolCallId: string): boolean {
|
|
||||||
return seenToolResults.has(toolCallId)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Respond tools are internal to the copilot's subagent system.
|
|
||||||
* They're used by subagents to signal completion and should NOT be executed by the sim side.
|
|
||||||
* The copilot backend handles these internally.
|
|
||||||
*/
|
|
||||||
const RESPOND_TOOL_SET = new Set([
|
|
||||||
'plan_respond',
|
|
||||||
'edit_respond',
|
|
||||||
'debug_respond',
|
|
||||||
'info_respond',
|
|
||||||
'research_respond',
|
|
||||||
'deploy_respond',
|
|
||||||
'superagent_respond',
|
|
||||||
'discovery_respond',
|
|
||||||
])
|
|
||||||
|
|
||||||
export type SSEHandler = (
|
|
||||||
event: SSEEvent,
|
|
||||||
context: StreamingContext,
|
|
||||||
execContext: ExecutionContext,
|
|
||||||
options: OrchestratorOptions
|
|
||||||
) => void | Promise<void>
|
|
||||||
|
|
||||||
function addContentBlock(context: StreamingContext, block: Omit<ContentBlock, 'timestamp'>): void {
|
|
||||||
context.contentBlocks.push({
|
|
||||||
...block,
|
|
||||||
timestamp: Date.now(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async function executeToolAndReport(
|
|
||||||
toolCallId: string,
|
|
||||||
context: StreamingContext,
|
|
||||||
execContext: ExecutionContext,
|
|
||||||
options?: OrchestratorOptions
|
|
||||||
): Promise<void> {
|
|
||||||
const toolCall = context.toolCalls.get(toolCallId)
|
|
||||||
if (!toolCall) return
|
|
||||||
|
|
||||||
if (toolCall.status === 'executing') return
|
|
||||||
if (wasToolResultSeen(toolCall.id)) return
|
|
||||||
|
|
||||||
toolCall.status = 'executing'
|
|
||||||
try {
|
|
||||||
const result = await executeToolServerSide(toolCall, execContext)
|
|
||||||
toolCall.status = result.success ? 'success' : 'error'
|
|
||||||
toolCall.result = result
|
|
||||||
toolCall.error = result.error
|
|
||||||
toolCall.endTime = Date.now()
|
|
||||||
|
|
||||||
// If create_workflow was successful, update the execution context with the new workflowId
|
|
||||||
// This ensures subsequent tools in the same stream have access to the workflowId
|
|
||||||
if (
|
|
||||||
toolCall.name === 'create_workflow' &&
|
|
||||||
result.success &&
|
|
||||||
result.output?.workflowId &&
|
|
||||||
!execContext.workflowId
|
|
||||||
) {
|
|
||||||
execContext.workflowId = result.output.workflowId
|
|
||||||
if (result.output.workspaceId) {
|
|
||||||
execContext.workspaceId = result.output.workspaceId
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
markToolResultSeen(toolCall.id)
|
|
||||||
|
|
||||||
await markToolComplete(
|
|
||||||
toolCall.id,
|
|
||||||
toolCall.name,
|
|
||||||
result.success ? 200 : 500,
|
|
||||||
result.error || (result.success ? 'Tool completed' : 'Tool failed'),
|
|
||||||
result.output
|
|
||||||
)
|
|
||||||
|
|
||||||
await options?.onEvent?.({
|
|
||||||
type: 'tool_result',
|
|
||||||
toolCallId: toolCall.id,
|
|
||||||
toolName: toolCall.name,
|
|
||||||
success: result.success,
|
|
||||||
result: result.output,
|
|
||||||
data: {
|
|
||||||
id: toolCall.id,
|
|
||||||
name: toolCall.name,
|
|
||||||
success: result.success,
|
|
||||||
result: result.output,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
} catch (error) {
|
|
||||||
toolCall.status = 'error'
|
|
||||||
toolCall.error = error instanceof Error ? error.message : String(error)
|
|
||||||
toolCall.endTime = Date.now()
|
|
||||||
|
|
||||||
markToolResultSeen(toolCall.id)
|
|
||||||
|
|
||||||
await markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error)
|
|
||||||
|
|
||||||
await options?.onEvent?.({
|
|
||||||
type: 'tool_error',
|
|
||||||
toolCallId: toolCall.id,
|
|
||||||
data: {
|
|
||||||
id: toolCall.id,
|
|
||||||
name: toolCall.name,
|
|
||||||
error: toolCall.error,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function waitForToolDecision(
|
|
||||||
toolCallId: string,
|
|
||||||
timeoutMs: number
|
|
||||||
): Promise<{ status: string; message?: string } | null> {
|
|
||||||
const start = Date.now()
|
|
||||||
while (Date.now() - start < timeoutMs) {
|
|
||||||
const decision = await getToolConfirmation(toolCallId)
|
|
||||||
if (decision?.status) {
|
|
||||||
return decision
|
|
||||||
}
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
export const sseHandlers: Record<string, SSEHandler> = {
|
|
||||||
chat_id: (event, context) => {
|
|
||||||
context.chatId = event.data?.chatId
|
|
||||||
},
|
|
||||||
  // No-op: the stream does not react to title updates; titles are persisted elsewhere.
  title_updated: () => {},
  // Finalizes a tracked tool call when the backend reports its result.
  tool_result: (event, context) => {
    const data = getEventData(event)
    // The tool call id may arrive at the event top level or inside the payload.
    const toolCallId = event.toolCallId || data?.id
    if (!toolCallId) return
    const current = context.toolCalls.get(toolCallId)
    if (!current) return

    // Determine success: explicit success field, or if there's result data without explicit failure
    const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
    const explicitSuccess = data?.success ?? data?.result?.success
    const hasResultData = data?.result !== undefined || data?.data !== undefined
    const hasError = !!data?.error || !!data?.result?.error

    // If explicitly set, use that; otherwise infer from data presence
    const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError

    current.status = success ? 'success' : 'error'
    current.endTime = Date.now()
    if (hasResultData) {
      current.result = {
        success,
        output: data?.result || data?.data,
      }
    }
    if (hasError) {
      current.error = data?.error || data?.result?.error
    }
  },
  // Marks a tracked tool call as failed when the backend emits an explicit error event.
  tool_error: (event, context) => {
    const data = getEventData(event)
    const toolCallId = event.toolCallId || data?.id
    if (!toolCallId) return
    const current = context.toolCalls.get(toolCallId)
    if (!current) return
    current.status = 'error'
    current.error = data?.error || 'Tool execution failed'
    current.endTime = Date.now()
  },
  // Registers a pending tool call as soon as the model starts generating its arguments.
  tool_generating: (event, context) => {
    const data = getEventData(event)
    const toolCallId = event.toolCallId || data?.toolCallId || data?.id
    const toolName = event.toolName || data?.toolName || data?.name
    if (!toolCallId || !toolName) return
    // Create the entry only once; later tool_call/tool_result events refine it.
    if (!context.toolCalls.has(toolCallId)) {
      context.toolCalls.set(toolCallId, {
        id: toolCallId,
        name: toolName,
        status: 'pending',
        startTime: Date.now(),
      })
    }
  },
|
|
||||||
  // Handles a tool call from the main agent: records it, then either waits for
  // an interactive user decision (interrupt tools) or executes it directly.
  tool_call: async (event, context, execContext, options) => {
    const toolData = getEventData(event) || {}
    const toolCallId = toolData.id || event.toolCallId
    const toolName = toolData.name || event.toolName
    if (!toolCallId || !toolName) return

    // Arguments may arrive under several field names depending on the event source.
    const args = toolData.arguments || toolData.input || event.data?.input
    const isPartial = toolData.partial === true
    const existing = context.toolCalls.get(toolCallId)

    // If we've already completed this tool call, ignore late/duplicate tool_call events
    // to avoid resetting UI/state back to pending and re-executing.
    if (
      existing?.endTime ||
      (existing && existing.status !== 'pending' && existing.status !== 'executing')
    ) {
      // Still capture arguments if the earlier event lacked them.
      if (!existing.params && args) {
        existing.params = args
      }
      return
    }

    if (existing) {
      if (args && !existing.params) existing.params = args
    } else {
      context.toolCalls.set(toolCallId, {
        id: toolCallId,
        name: toolName,
        status: 'pending',
        params: args,
        startTime: Date.now(),
      })
      const created = context.toolCalls.get(toolCallId)!
      addContentBlock(context, { type: 'tool_call', toolCall: created })
    }

    // Partial events only stream argument fragments; wait for the final event.
    if (isPartial) return
    if (wasToolResultSeen(toolCallId)) return

    const toolCall = context.toolCalls.get(toolCallId)
    if (!toolCall) return

    // Subagent tools are executed by the copilot backend, not sim side
    if (SUBAGENT_TOOL_SET.has(toolName)) {
      return
    }

    // Respond tools are internal to copilot's subagent system - skip execution
    // The copilot backend handles these internally to signal subagent completion
    if (RESPOND_TOOL_SET.has(toolName)) {
      toolCall.status = 'success'
      toolCall.endTime = Date.now()
      toolCall.result = {
        success: true,
        output: 'Internal respond tool - handled by copilot backend',
      }
      return
    }

    const isInterruptTool = INTERRUPT_TOOL_SET.has(toolName)
    const isInteractive = options.interactive === true

    // Interrupt tools in interactive sessions pause for an explicit user
    // decision (default wait: 600000 ms) before executing.
    if (isInterruptTool && isInteractive) {
      const decision = await waitForToolDecision(toolCallId, options.timeout || 600000)
      if (decision?.status === 'accepted' || decision?.status === 'success') {
        await executeToolAndReport(toolCallId, context, execContext, options)
        return
      }

      if (decision?.status === 'rejected' || decision?.status === 'error') {
        toolCall.status = 'rejected'
        toolCall.endTime = Date.now()
        // Report the skip to the backend so the turn can continue, then emit a
        // synthetic tool_result so the caller's stream sees the rejection.
        await markToolComplete(
          toolCall.id,
          toolCall.name,
          400,
          decision.message || 'Tool execution rejected',
          { skipped: true, reason: 'user_rejected' }
        )
        markToolResultSeen(toolCall.id)
        await options.onEvent?.({
          type: 'tool_result',
          toolCallId: toolCall.id,
          data: {
            id: toolCall.id,
            name: toolCall.name,
            success: false,
            result: { skipped: true, reason: 'user_rejected' },
          },
        })
        return
      }

      if (decision?.status === 'background') {
        toolCall.status = 'skipped'
        toolCall.endTime = Date.now()
        // 202: accepted for background processing; mirrored as a successful
        // synthetic tool_result for the caller.
        await markToolComplete(
          toolCall.id,
          toolCall.name,
          202,
          decision.message || 'Tool execution moved to background',
          { background: true }
        )
        markToolResultSeen(toolCall.id)
        await options.onEvent?.({
          type: 'tool_result',
          toolCallId: toolCall.id,
          data: {
            id: toolCall.id,
            name: toolCall.name,
            success: true,
            result: { background: true },
          },
        })
        return
      }
    }

    // Default path: execute immediately unless auto-execution is disabled.
    if (options.autoExecuteTools !== false) {
      await executeToolAndReport(toolCallId, context, execContext, options)
    }
  },
|
|
||||||
  // Accumulates model reasoning ("thinking") into a dedicated content block,
  // delimited by start/end phase markers within the reasoning events.
  reasoning: (event, context) => {
    const phase = event.data?.phase || event.data?.data?.phase
    if (phase === 'start') {
      context.isInThinkingBlock = true
      context.currentThinkingBlock = {
        type: 'thinking',
        content: '',
        timestamp: Date.now(),
      }
      return
    }
    if (phase === 'end') {
      if (context.currentThinkingBlock) {
        context.contentBlocks.push(context.currentThinkingBlock)
      }
      context.isInThinkingBlock = false
      context.currentThinkingBlock = null
      return
    }
    // Non-phase events carry a text chunk (string payload or nested field).
    // Chunks arriving outside a start/end window are dropped.
    const chunk =
      typeof event.data === 'string' ? event.data : event.data?.data || event.data?.content
    if (!chunk || !context.currentThinkingBlock) return
    context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}`
  },
  // Appends assistant text to the accumulated response and the content blocks.
  content: (event, context) => {
    const chunk =
      typeof event.data === 'string' ? event.data : event.data?.content || event.data?.data
    if (!chunk) return
    context.accumulatedContent += chunk
    addContentBlock(context, { type: 'text', content: chunk })
  },
  // Marks the stream finished; captures the provider response id if present.
  done: (event, context) => {
    if (event.data?.responseId) {
      context.conversationId = event.data.responseId
    }
    context.streamComplete = true
  },
  // Captures the provider response id announced at stream start.
  start: (event, context) => {
    if (event.data?.responseId) {
      context.conversationId = event.data.responseId
    }
  },
  // Records a stream-level error message (when one can be extracted) and
  // terminates stream processing.
  error: (event, context) => {
    const message =
      event.data?.message ||
      event.data?.error ||
      (typeof event.data === 'string' ? event.data : null)
    if (message) {
      context.errors.push(message)
    }
    context.streamComplete = true
  },
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * SSE handlers for events emitted while a nested subagent is active
 * (between subagent_start and subagent_end). Content and tool calls are
 * attributed to the parent tool call that spawned the subagent.
 */
export const subAgentHandlers: Record<string, SSEHandler> = {
  // Accumulates subagent text under the parent tool call and mirrors it
  // into the content blocks as a subagent_text block.
  content: (event, context) => {
    const parentToolCallId = context.subAgentParentToolCallId
    if (!parentToolCallId || !event.data) return
    const chunk = typeof event.data === 'string' ? event.data : event.data?.content || ''
    if (!chunk) return
    context.subAgentContent[parentToolCallId] =
      (context.subAgentContent[parentToolCallId] || '') + chunk
    addContentBlock(context, { type: 'subagent_text', content: chunk })
  },
  // Registers and (unless disabled) executes a tool call made by the subagent.
  tool_call: async (event, context, execContext, options) => {
    const parentToolCallId = context.subAgentParentToolCallId
    if (!parentToolCallId) return
    const toolData = getEventData(event) || {}
    const toolCallId = toolData.id || event.toolCallId
    const toolName = toolData.name || event.toolName
    if (!toolCallId || !toolName) return
    const isPartial = toolData.partial === true
    const args = toolData.arguments || toolData.input || event.data?.input

    const existing = context.toolCalls.get(toolCallId)
    // Ignore late/duplicate tool_call events once we already have a result
    if (wasToolResultSeen(toolCallId) || existing?.endTime) {
      return
    }

    const toolCall: ToolCallState = {
      id: toolCallId,
      name: toolName,
      status: 'pending',
      params: args,
      startTime: Date.now(),
    }

    // Store in both places - but do NOT overwrite existing tool call state for the same id
    if (!context.subAgentToolCalls[parentToolCallId]) {
      context.subAgentToolCalls[parentToolCallId] = []
    }
    if (!context.subAgentToolCalls[parentToolCallId].some((tc) => tc.id === toolCallId)) {
      context.subAgentToolCalls[parentToolCallId].push(toolCall)
    }
    if (!context.toolCalls.has(toolCallId)) {
      context.toolCalls.set(toolCallId, toolCall)
    }

    // Partial events only stream argument fragments; wait for the final event.
    if (isPartial) return

    // Respond tools are internal to copilot's subagent system - skip execution
    if (RESPOND_TOOL_SET.has(toolName)) {
      toolCall.status = 'success'
      toolCall.endTime = Date.now()
      toolCall.result = {
        success: true,
        output: 'Internal respond tool - handled by copilot backend',
      }
      return
    }

    if (options.autoExecuteTools !== false) {
      await executeToolAndReport(toolCallId, context, execContext, options)
    }
  },
  // Applies a subagent tool result to both the per-parent list and the
  // shared tool-call map (they may reference distinct objects).
  tool_result: (event, context) => {
    const parentToolCallId = context.subAgentParentToolCallId
    if (!parentToolCallId) return
    const data = getEventData(event)
    const toolCallId = event.toolCallId || data?.id
    if (!toolCallId) return

    // Update in subAgentToolCalls
    const toolCalls = context.subAgentToolCalls[parentToolCallId] || []
    const subAgentToolCall = toolCalls.find((tc) => tc.id === toolCallId)

    // Also update in main toolCalls (where we added it for execution)
    const mainToolCall = context.toolCalls.get(toolCallId)

    // Use same success inference logic as main handler
    const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
    const explicitSuccess = data?.success ?? data?.result?.success
    const hasResultData = data?.result !== undefined || data?.data !== undefined
    const hasError = !!data?.error || !!data?.result?.error
    const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError

    const status = success ? 'success' : 'error'
    const endTime = Date.now()
    const result = hasResultData ? { success, output: data?.result || data?.data } : undefined

    if (subAgentToolCall) {
      subAgentToolCall.status = status
      subAgentToolCall.endTime = endTime
      if (result) subAgentToolCall.result = result
      if (hasError) subAgentToolCall.error = data?.error || data?.result?.error
    }

    if (mainToolCall) {
      mainToolCall.status = status
      mainToolCall.endTime = endTime
      if (result) mainToolCall.result = result
      if (hasError) mainToolCall.error = data?.error || data?.result?.error
    }
  },
}
|
|
||||||
|
|
||||||
export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean {
|
|
||||||
if (!event.subagent) return false
|
|
||||||
if (!context.subAgentParentToolCallId) {
|
|
||||||
logger.warn('Subagent event missing parent tool call', {
|
|
||||||
type: event.type,
|
|
||||||
subagent: event.subagent,
|
|
||||||
})
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
@@ -1,71 +0,0 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotSseParser')
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Parses SSE streams from the copilot backend into typed events.
|
|
||||||
*/
|
|
||||||
export async function* parseSSEStream(
|
|
||||||
reader: ReadableStreamDefaultReader<Uint8Array>,
|
|
||||||
decoder: TextDecoder,
|
|
||||||
abortSignal?: AbortSignal
|
|
||||||
): AsyncGenerator<SSEEvent> {
|
|
||||||
let buffer = ''
|
|
||||||
|
|
||||||
try {
|
|
||||||
while (true) {
|
|
||||||
if (abortSignal?.aborted) {
|
|
||||||
logger.info('SSE stream aborted by signal')
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
const { done, value } = await reader.read()
|
|
||||||
if (done) break
|
|
||||||
|
|
||||||
buffer += decoder.decode(value, { stream: true })
|
|
||||||
const lines = buffer.split('\n')
|
|
||||||
buffer = lines.pop() || ''
|
|
||||||
|
|
||||||
for (const line of lines) {
|
|
||||||
if (!line.trim()) continue
|
|
||||||
if (!line.startsWith('data: ')) continue
|
|
||||||
|
|
||||||
const jsonStr = line.slice(6)
|
|
||||||
if (jsonStr === '[DONE]') continue
|
|
||||||
|
|
||||||
try {
|
|
||||||
const event = JSON.parse(jsonStr) as SSEEvent
|
|
||||||
if (event?.type) {
|
|
||||||
yield event
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to parse SSE event', {
|
|
||||||
preview: jsonStr.slice(0, 200),
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (buffer.trim() && buffer.startsWith('data: ')) {
|
|
||||||
try {
|
|
||||||
const event = JSON.parse(buffer.slice(6)) as SSEEvent
|
|
||||||
if (event?.type) {
|
|
||||||
yield event
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to parse final SSE buffer', {
|
|
||||||
preview: buffer.slice(0, 200),
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
reader.releaseLock()
|
|
||||||
} catch {
|
|
||||||
logger.warn('Failed to release SSE reader lock')
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,262 +0,0 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { getRedisClient } from '@/lib/core/config/redis'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotStreamBuffer')

// Buffered stream state (events, sequence, meta) expires after one hour.
const STREAM_TTL_SECONDS = 60 * 60
// Hard cap on retained events per stream; older entries are trimmed away.
const STREAM_EVENT_LIMIT = 5000
// How many sequence ids the batching writer reserves per INCRBY round trip.
const STREAM_RESERVE_BATCH = 200
// Debounce window before a pending batch is flushed to Redis.
const STREAM_FLUSH_INTERVAL_MS = 15
// Flush immediately once this many events are pending.
const STREAM_FLUSH_MAX_BATCH = 200

// Atomic append: allocate the next sequence id (INCR), ZADD the wrapped event
// at that score, refresh both key TTLs, and trim the sorted set to the
// retention limit — all in a single server-side script.
const APPEND_STREAM_EVENT_LUA = `
local seqKey = KEYS[1]
local eventsKey = KEYS[2]
local ttl = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local streamId = ARGV[3]
local eventJson = ARGV[4]

local id = redis.call('INCR', seqKey)
local entry = '{"eventId":' .. id .. ',"streamId":' .. cjson.encode(streamId) .. ',"event":' .. eventJson .. '}'
redis.call('ZADD', eventsKey, id, entry)
redis.call('EXPIRE', eventsKey, ttl)
redis.call('EXPIRE', seqKey, ttl)
if limit > 0 then
  redis.call('ZREMRANGEBYRANK', eventsKey, 0, -limit-1)
end
return id
`
|
|
||||||
|
|
||||||
function getStreamKeyPrefix(streamId: string) {
|
|
||||||
return `copilot_stream:${streamId}`
|
|
||||||
}
|
|
||||||
|
|
||||||
function getEventsKey(streamId: string) {
|
|
||||||
return `${getStreamKeyPrefix(streamId)}:events`
|
|
||||||
}
|
|
||||||
|
|
||||||
function getSeqKey(streamId: string) {
|
|
||||||
return `${getStreamKeyPrefix(streamId)}:seq`
|
|
||||||
}
|
|
||||||
|
|
||||||
function getMetaKey(streamId: string) {
|
|
||||||
return `${getStreamKeyPrefix(streamId)}:meta`
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Lifecycle state of a buffered copilot stream. */
export type StreamStatus = 'active' | 'complete' | 'error'

/** Metadata hash persisted next to a stream's event buffer. */
export type StreamMeta = {
  status: StreamStatus
  // Id of the user that owns the stream (set when known).
  userId?: string
  // ISO timestamp of the last metadata write (see setStreamMeta).
  updatedAt?: string
  // Failure message; presumably only set when status is 'error' — confirm with writers.
  error?: string
}

/** One buffered event plus its monotonically increasing sequence id. */
export type StreamEventEntry = {
  eventId: number
  streamId: string
  event: Record<string, any>
}

/** Batching writer API returned by createStreamEventWriter. */
export type StreamEventWriter = {
  // Queues one event; resolves with the entry (sequence id assigned) once buffered.
  write: (event: Record<string, any>) => Promise<StreamEventEntry>
  // Forces any pending batch to Redis.
  flush: () => Promise<void>
  // Cancels pending timers and flushes remaining events.
  close: () => Promise<void>
}
|
|
||||||
|
|
||||||
export async function resetStreamBuffer(streamId: string): Promise<void> {
|
|
||||||
const redis = getRedisClient()
|
|
||||||
if (!redis) return
|
|
||||||
try {
|
|
||||||
await redis.del(getEventsKey(streamId), getSeqKey(streamId), getMetaKey(streamId))
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to reset stream buffer', {
|
|
||||||
streamId,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function setStreamMeta(streamId: string, meta: StreamMeta): Promise<void> {
|
|
||||||
const redis = getRedisClient()
|
|
||||||
if (!redis) return
|
|
||||||
try {
|
|
||||||
const payload: Record<string, string> = {
|
|
||||||
status: meta.status,
|
|
||||||
updatedAt: meta.updatedAt || new Date().toISOString(),
|
|
||||||
}
|
|
||||||
if (meta.userId) payload.userId = meta.userId
|
|
||||||
if (meta.error) payload.error = meta.error
|
|
||||||
await redis.hset(getMetaKey(streamId), payload)
|
|
||||||
await redis.expire(getMetaKey(streamId), STREAM_TTL_SECONDS)
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to update stream meta', {
|
|
||||||
streamId,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function getStreamMeta(streamId: string): Promise<StreamMeta | null> {
|
|
||||||
const redis = getRedisClient()
|
|
||||||
if (!redis) return null
|
|
||||||
try {
|
|
||||||
const meta = await redis.hgetall(getMetaKey(streamId))
|
|
||||||
if (!meta || Object.keys(meta).length === 0) return null
|
|
||||||
return meta as StreamMeta
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to read stream meta', {
|
|
||||||
streamId,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * Appends a single event to a stream's Redis buffer via the atomic Lua script
 * (sequence-id allocation + ZADD + TTL refresh + trim in one round trip).
 * Without Redis, or on failure, returns a stub entry with eventId 0.
 */
export async function appendStreamEvent(
  streamId: string,
  event: Record<string, any>
): Promise<StreamEventEntry> {
  const redis = getRedisClient()
  if (!redis) {
    return { eventId: 0, streamId, event }
  }

  try {
    const eventJson = JSON.stringify(event)
    const nextId = await redis.eval(
      APPEND_STREAM_EVENT_LUA,
      2, // number of KEYS the script receives
      getSeqKey(streamId),
      getEventsKey(streamId),
      STREAM_TTL_SECONDS,
      STREAM_EVENT_LIMIT,
      streamId,
      eventJson
    )
    // Redis clients may return the script's INCR result as number or string.
    const eventId = typeof nextId === 'number' ? nextId : Number(nextId)
    return { eventId, streamId, event }
  } catch (error) {
    logger.warn('Failed to append stream event', {
      streamId,
      error: error instanceof Error ? error.message : String(error),
    })
    return { eventId: 0, streamId, event }
  }
}
|
|
||||||
|
|
||||||
/**
 * Creates a batching writer for a stream's Redis event buffer.
 *
 * Sequence ids are pre-reserved in blocks (one INCRBY per STREAM_RESERVE_BATCH
 * ids) and events are accumulated in memory, then flushed with a single
 * pipeline either on a short debounce timer or once the batch is full.
 * Without Redis the writer degrades to a no-op that returns stub entries.
 */
export function createStreamEventWriter(streamId: string): StreamEventWriter {
  const redis = getRedisClient()
  if (!redis) {
    return {
      write: async (event) => ({ eventId: 0, streamId, event }),
      flush: async () => {},
      close: async () => {},
    }
  }

  let pending: StreamEventEntry[] = []
  // Next id to hand out from the reserved range; 0 means nothing reserved yet.
  let nextEventId = 0
  // Highest id covered by the current reservation.
  let maxReservedId = 0
  let flushTimer: ReturnType<typeof setTimeout> | null = null
  let isFlushing = false

  // Debounced flush: at most one timer pending at a time.
  const scheduleFlush = () => {
    if (flushTimer) return
    flushTimer = setTimeout(() => {
      flushTimer = null
      void flush()
    }, STREAM_FLUSH_INTERVAL_MS)
  }

  // Reserves a contiguous block of sequence ids with a single INCRBY.
  const reserveIds = async (minCount: number) => {
    const reserveCount = Math.max(STREAM_RESERVE_BATCH, minCount)
    const newMax = await redis.incrby(getSeqKey(streamId), reserveCount)
    const startId = newMax - reserveCount + 1
    // Only adopt the new range if the previous one is exhausted (or unset).
    if (nextEventId === 0 || nextEventId > maxReservedId) {
      nextEventId = startId
      maxReservedId = newMax
    }
  }

  // Writes the pending batch in one pipeline; on failure the batch is
  // re-queued at the front (preserving order) and a retry is scheduled.
  const flush = async () => {
    if (isFlushing || pending.length === 0) return
    isFlushing = true
    const batch = pending
    pending = []
    try {
      const key = getEventsKey(streamId)
      const zaddArgs: (string | number)[] = []
      for (const entry of batch) {
        // ZADD score = eventId, member = serialized entry.
        zaddArgs.push(entry.eventId, JSON.stringify(entry))
      }
      const pipeline = redis.pipeline()
      pipeline.zadd(key, ...(zaddArgs as any))
      pipeline.expire(key, STREAM_TTL_SECONDS)
      pipeline.expire(getSeqKey(streamId), STREAM_TTL_SECONDS)
      // Trim the oldest entries beyond the retention limit.
      pipeline.zremrangebyrank(key, 0, -STREAM_EVENT_LIMIT - 1)
      await pipeline.exec()
    } catch (error) {
      logger.warn('Failed to flush stream events', {
        streamId,
        error: error instanceof Error ? error.message : String(error),
      })
      pending = batch.concat(pending)
    } finally {
      isFlushing = false
      if (pending.length > 0) scheduleFlush()
    }
  }

  // Assigns the next id (reserving more when exhausted), queues the entry,
  // and flushes immediately when the batch is full, otherwise on the timer.
  const write = async (event: Record<string, any>) => {
    if (nextEventId === 0 || nextEventId > maxReservedId) {
      await reserveIds(1)
    }
    const eventId = nextEventId++
    const entry: StreamEventEntry = { eventId, streamId, event }
    pending.push(entry)
    if (pending.length >= STREAM_FLUSH_MAX_BATCH) {
      await flush()
    } else {
      scheduleFlush()
    }
    return entry
  }

  // Cancels any pending timer and flushes whatever remains.
  const close = async () => {
    if (flushTimer) {
      clearTimeout(flushTimer)
      flushTimer = null
    }
    await flush()
  }

  return { write, flush, close }
}
|
|
||||||
|
|
||||||
export async function readStreamEvents(
|
|
||||||
streamId: string,
|
|
||||||
afterEventId: number
|
|
||||||
): Promise<StreamEventEntry[]> {
|
|
||||||
const redis = getRedisClient()
|
|
||||||
if (!redis) return []
|
|
||||||
try {
|
|
||||||
const raw = await redis.zrangebyscore(getEventsKey(streamId), afterEventId + 1, '+inf')
|
|
||||||
return raw
|
|
||||||
.map((entry) => {
|
|
||||||
try {
|
|
||||||
return JSON.parse(entry) as StreamEventEntry
|
|
||||||
} catch {
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.filter((entry): entry is StreamEventEntry => Boolean(entry))
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to read stream events', {
|
|
||||||
streamId,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
return []
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,286 +0,0 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
|
||||||
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
|
|
||||||
import {
|
|
||||||
getToolCallIdFromEvent,
|
|
||||||
handleSubagentRouting,
|
|
||||||
markToolCallSeen,
|
|
||||||
markToolResultSeen,
|
|
||||||
normalizeSseEvent,
|
|
||||||
sseHandlers,
|
|
||||||
subAgentHandlers,
|
|
||||||
wasToolCallSeen,
|
|
||||||
wasToolResultSeen,
|
|
||||||
} from '@/lib/copilot/orchestrator/sse-handlers'
|
|
||||||
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
|
|
||||||
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
|
|
||||||
import type {
|
|
||||||
ExecutionContext,
|
|
||||||
OrchestratorOptions,
|
|
||||||
SSEEvent,
|
|
||||||
StreamingContext,
|
|
||||||
ToolCallSummary,
|
|
||||||
} from '@/lib/copilot/orchestrator/types'
|
|
||||||
import { env } from '@/lib/core/config/env'
|
|
||||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
|
||||||
|
|
||||||
const logger = createLogger('CopilotSubagentOrchestrator')
// Copilot (sim-agent) backend base URL; the env override wins over the default.
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
|
|
||||||
|
|
||||||
/** Options for a direct subagent orchestration run. */
export interface SubagentOrchestratorOptions extends Omit<OrchestratorOptions, 'onComplete'> {
  userId: string
  workflowId?: string
  workspaceId?: string
  // Replaces the base onComplete so the callback receives the subagent result.
  onComplete?: (result: SubagentOrchestratorResult) => void | Promise<void>
}

/** Final outcome of a subagent orchestration run. */
export interface SubagentOrchestratorResult {
  success: boolean
  // Full text accumulated from the subagent's content events.
  content: string
  toolCalls: ToolCallSummary[]
  // Normalized structured_result / subagent_result payload, when one was emitted.
  structuredResult?: {
    type?: string
    summary?: string
    data?: any
    success?: boolean
  }
  // Single fatal error (orchestration failed outright).
  error?: string
  // Non-fatal errors collected while the stream was running.
  errors?: string[]
}
|
|
||||||
|
|
||||||
/**
 * Runs a single subagent request against the copilot backend and consumes its
 * SSE stream to completion: deduplicates and forwards events to the caller,
 * dispatches them to the shared/subagent SSE handlers (which may execute tools
 * sim-side), and captures the final structured result.
 *
 * Resolves with a SubagentOrchestratorResult in both the success and failure
 * paths; on failure options.onError is invoked and success is false.
 */
export async function orchestrateSubagentStream(
  agentId: string,
  requestPayload: Record<string, any>,
  options: SubagentOrchestratorOptions
): Promise<SubagentOrchestratorResult> {
  const { userId, workflowId, workspaceId, timeout = 300000, abortSignal } = options
  // NOTE(review): this await sits outside the try below, so a failure here
  // propagates without invoking options.onError — confirm that is intended.
  const execContext = await buildExecutionContext(userId, workflowId, workspaceId)

  // Mutable per-stream state shared by every SSE handler.
  const context: StreamingContext = {
    chatId: undefined,
    conversationId: undefined,
    messageId: requestPayload?.messageId || crypto.randomUUID(),
    accumulatedContent: '',
    contentBlocks: [],
    toolCalls: new Map(),
    currentThinkingBlock: null,
    isInThinkingBlock: false,
    subAgentParentToolCallId: undefined,
    subAgentContent: {},
    subAgentToolCalls: {},
    pendingContent: '',
    streamComplete: false,
    wasAborted: false,
    errors: [],
  }

  let structuredResult: SubagentOrchestratorResult['structuredResult']

  try {
    const response = await fetch(`${SIM_AGENT_API_URL}/api/subagent/${agentId}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
      },
      body: JSON.stringify({ ...requestPayload, stream: true, userId }),
      signal: abortSignal,
    })

    if (!response.ok) {
      const errorText = await response.text().catch(() => '')
      throw new Error(
        `Copilot backend error (${response.status}): ${errorText || response.statusText}`
      )
    }

    if (!response.body) {
      throw new Error('Copilot backend response missing body')
    }

    const reader = response.body.getReader()
    const decoder = new TextDecoder()

    // Hard deadline for the whole stream: record the timeout and cancel the
    // reader so the parse loop ends.
    const timeoutId = setTimeout(() => {
      context.errors.push('Request timed out')
      context.streamComplete = true
      reader.cancel().catch(() => {})
    }, timeout)

    try {
      for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
        if (abortSignal?.aborted) {
          context.wasAborted = true
          break
        }

        const normalizedEvent = normalizeSseEvent(event)

        // Skip tool_result events for tools the sim-side already executed.
        // The sim-side emits its own tool_result with complete data.
        // For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
        const toolCallId = getToolCallIdFromEvent(normalizedEvent)
        const eventData = normalizedEvent.data

        const isPartialToolCall =
          normalizedEvent.type === 'tool_call' && eventData?.partial === true

        // Drop duplicate complete tool_call events (already seen or resolved).
        const shouldSkipToolCall =
          normalizedEvent.type === 'tool_call' &&
          !!toolCallId &&
          !isPartialToolCall &&
          (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))

        // Record the first sighting of each complete tool_call for dedupe.
        if (
          normalizedEvent.type === 'tool_call' &&
          toolCallId &&
          !isPartialToolCall &&
          !shouldSkipToolCall
        ) {
          markToolCallSeen(toolCallId)
        }

        // Forward only the first tool_result per tool call (marks it as seen
        // as a side effect of the check).
        const shouldSkipToolResult =
          normalizedEvent.type === 'tool_result' &&
          (() => {
            if (!toolCallId) return false
            if (wasToolResultSeen(toolCallId)) return true
            markToolResultSeen(toolCallId)
            return false
          })()

        if (!shouldSkipToolCall && !shouldSkipToolResult) {
          await forwardEvent(normalizedEvent, options)
        }

        // A structured/subagent result terminates the run.
        if (
          normalizedEvent.type === 'structured_result' ||
          normalizedEvent.type === 'subagent_result'
        ) {
          structuredResult = normalizeStructuredResult(normalizedEvent.data)
          context.streamComplete = true
          continue
        }

        // Handle subagent_start/subagent_end events to track nested subagent calls
        if (normalizedEvent.type === 'subagent_start') {
          const toolCallId = normalizedEvent.data?.tool_call_id
          if (toolCallId) {
            context.subAgentParentToolCallId = toolCallId
            context.subAgentContent[toolCallId] = ''
            context.subAgentToolCalls[toolCallId] = []
          }
          continue
        }

        if (normalizedEvent.type === 'subagent_end') {
          context.subAgentParentToolCallId = undefined
          continue
        }

        // For direct subagent calls, events may have the subagent field set (e.g., subagent: "discovery")
        // but no subagent_start event because this IS the top-level agent. Skip subagent routing
        // for events where the subagent field matches the current agentId - these are top-level events.
        const isTopLevelSubagentEvent =
          normalizedEvent.subagent === agentId && !context.subAgentParentToolCallId

        // Only route to subagent handlers for nested subagent events (not matching current agentId)
        if (!isTopLevelSubagentEvent && handleSubagentRouting(normalizedEvent, context)) {
          const handler = subAgentHandlers[normalizedEvent.type]
          if (handler) {
            await handler(normalizedEvent, context, execContext, options)
          }
          if (context.streamComplete) break
          continue
        }

        // Process as a regular SSE event (including top-level subagent events)
        const handler = sseHandlers[normalizedEvent.type]
        if (handler) {
          await handler(normalizedEvent, context, execContext, options)
        }
        if (context.streamComplete) break
      }
    } finally {
      clearTimeout(timeoutId)
    }

    const result = buildResult(context, structuredResult)
    await options.onComplete?.(result)
    return result
  } catch (error) {
    const err = error instanceof Error ? error : new Error('Subagent orchestration failed')
    logger.error('Subagent orchestration failed', { error: err.message, agentId })
    await options.onError?.(err)
    return {
      success: false,
      content: context.accumulatedContent,
      toolCalls: [],
      error: err.message,
    }
  }
}
|
|
||||||
|
|
||||||
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
|
|
||||||
try {
|
|
||||||
await options.onEvent?.(event)
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('Failed to forward SSE event', {
|
|
||||||
type: event.type,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function normalizeStructuredResult(data: any): SubagentOrchestratorResult['structuredResult'] {
|
|
||||||
if (!data || typeof data !== 'object') {
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
type: data.result_type || data.type,
|
|
||||||
summary: data.summary,
|
|
||||||
data: data.data ?? data,
|
|
||||||
success: data.success,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function buildExecutionContext(
|
|
||||||
userId: string,
|
|
||||||
workflowId?: string,
|
|
||||||
workspaceId?: string
|
|
||||||
): Promise<ExecutionContext> {
|
|
||||||
if (workflowId) {
|
|
||||||
return prepareExecutionContext(userId, workflowId)
|
|
||||||
}
|
|
||||||
|
|
||||||
const decryptedEnvVars = await getEffectiveDecryptedEnv(userId, workspaceId)
|
|
||||||
return {
|
|
||||||
userId,
|
|
||||||
workflowId: workflowId || '',
|
|
||||||
workspaceId,
|
|
||||||
decryptedEnvVars,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function buildResult(
|
|
||||||
context: StreamingContext,
|
|
||||||
structuredResult?: SubagentOrchestratorResult['structuredResult']
|
|
||||||
): SubagentOrchestratorResult {
|
|
||||||
const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
|
|
||||||
id: toolCall.id,
|
|
||||||
name: toolCall.name,
|
|
||||||
status: toolCall.status,
|
|
||||||
params: toolCall.params,
|
|
||||||
result: toolCall.result?.output,
|
|
||||||
error: toolCall.error,
|
|
||||||
durationMs:
|
|
||||||
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
|
|
||||||
}))
|
|
||||||
|
|
||||||
return {
|
|
||||||
success: context.errors.length === 0 && !context.wasAborted,
|
|
||||||
content: context.accumulatedContent,
|
|
||||||
toolCalls,
|
|
||||||
structuredResult,
|
|
||||||
errors: context.errors.length ? context.errors : undefined,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -1,130 +0,0 @@
|
|||||||
import type { CopilotProviderConfig } from '@/lib/copilot/types'
|
|
||||||
|
|
||||||
export type SSEEventType =
|
|
||||||
| 'chat_id'
|
|
||||||
| 'title_updated'
|
|
||||||
| 'content'
|
|
||||||
| 'reasoning'
|
|
||||||
| 'tool_call'
|
|
||||||
| 'tool_generating'
|
|
||||||
| 'tool_result'
|
|
||||||
| 'tool_error'
|
|
||||||
| 'subagent_start'
|
|
||||||
| 'subagent_end'
|
|
||||||
| 'structured_result'
|
|
||||||
| 'subagent_result'
|
|
||||||
| 'done'
|
|
||||||
| 'error'
|
|
||||||
| 'start'
|
|
||||||
|
|
||||||
export interface SSEEvent {
|
|
||||||
type: SSEEventType
|
|
||||||
data?: any
|
|
||||||
subagent?: string
|
|
||||||
toolCallId?: string
|
|
||||||
toolName?: string
|
|
||||||
success?: boolean
|
|
||||||
result?: any
|
|
||||||
}
|
|
||||||
|
|
||||||
export type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped' | 'rejected'
|
|
||||||
|
|
||||||
export interface ToolCallState {
|
|
||||||
id: string
|
|
||||||
name: string
|
|
||||||
status: ToolCallStatus
|
|
||||||
params?: Record<string, any>
|
|
||||||
result?: ToolCallResult
|
|
||||||
error?: string
|
|
||||||
startTime?: number
|
|
||||||
endTime?: number
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ToolCallResult {
|
|
||||||
success: boolean
|
|
||||||
output?: any
|
|
||||||
error?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
export type ContentBlockType = 'text' | 'thinking' | 'tool_call' | 'subagent_text'
|
|
||||||
|
|
||||||
export interface ContentBlock {
|
|
||||||
type: ContentBlockType
|
|
||||||
content?: string
|
|
||||||
toolCall?: ToolCallState
|
|
||||||
timestamp: number
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface StreamingContext {
|
|
||||||
chatId?: string
|
|
||||||
conversationId?: string
|
|
||||||
messageId: string
|
|
||||||
accumulatedContent: string
|
|
||||||
contentBlocks: ContentBlock[]
|
|
||||||
toolCalls: Map<string, ToolCallState>
|
|
||||||
currentThinkingBlock: ContentBlock | null
|
|
||||||
isInThinkingBlock: boolean
|
|
||||||
subAgentParentToolCallId?: string
|
|
||||||
subAgentContent: Record<string, string>
|
|
||||||
subAgentToolCalls: Record<string, ToolCallState[]>
|
|
||||||
pendingContent: string
|
|
||||||
streamComplete: boolean
|
|
||||||
wasAborted: boolean
|
|
||||||
errors: string[]
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface OrchestratorRequest {
|
|
||||||
message: string
|
|
||||||
workflowId: string
|
|
||||||
userId: string
|
|
||||||
chatId?: string
|
|
||||||
mode?: 'agent' | 'ask' | 'plan'
|
|
||||||
model?: string
|
|
||||||
conversationId?: string
|
|
||||||
contexts?: Array<{ type: string; content: string }>
|
|
||||||
fileAttachments?: any[]
|
|
||||||
commands?: string[]
|
|
||||||
provider?: CopilotProviderConfig
|
|
||||||
streamToolCalls?: boolean
|
|
||||||
version?: string
|
|
||||||
prefetch?: boolean
|
|
||||||
userName?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface OrchestratorOptions {
|
|
||||||
autoExecuteTools?: boolean
|
|
||||||
timeout?: number
|
|
||||||
onEvent?: (event: SSEEvent) => void | Promise<void>
|
|
||||||
onComplete?: (result: OrchestratorResult) => void | Promise<void>
|
|
||||||
onError?: (error: Error) => void | Promise<void>
|
|
||||||
abortSignal?: AbortSignal
|
|
||||||
interactive?: boolean
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface OrchestratorResult {
|
|
||||||
success: boolean
|
|
||||||
content: string
|
|
||||||
contentBlocks: ContentBlock[]
|
|
||||||
toolCalls: ToolCallSummary[]
|
|
||||||
chatId?: string
|
|
||||||
conversationId?: string
|
|
||||||
error?: string
|
|
||||||
errors?: string[]
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ToolCallSummary {
|
|
||||||
id: string
|
|
||||||
name: string
|
|
||||||
status: ToolCallStatus
|
|
||||||
params?: Record<string, any>
|
|
||||||
result?: any
|
|
||||||
error?: string
|
|
||||||
durationMs?: number
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ExecutionContext {
|
|
||||||
userId: string
|
|
||||||
workflowId: string
|
|
||||||
workspaceId?: string
|
|
||||||
decryptedEnvVars?: Record<string, string>
|
|
||||||
}
|
|
||||||
@@ -5,10 +5,7 @@ import {
|
|||||||
type BaseClientToolMetadata,
|
type BaseClientToolMetadata,
|
||||||
ClientToolCallState,
|
ClientToolCallState,
|
||||||
} from '@/lib/copilot/tools/client/base-tool'
|
} from '@/lib/copilot/tools/client/base-tool'
|
||||||
import {
|
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
|
||||||
formatWorkflowStateForCopilot,
|
|
||||||
normalizeWorkflowName,
|
|
||||||
} from '@/lib/copilot/tools/shared/workflow-utils'
|
|
||||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||||
|
|
||||||
const logger = createLogger('GetWorkflowFromNameClientTool')
|
const logger = createLogger('GetWorkflowFromNameClientTool')
|
||||||
@@ -70,9 +67,11 @@ export class GetWorkflowFromNameClientTool extends BaseClientTool {
|
|||||||
|
|
||||||
// Try to find by name from registry first to get ID
|
// Try to find by name from registry first to get ID
|
||||||
const registry = useWorkflowRegistry.getState()
|
const registry = useWorkflowRegistry.getState()
|
||||||
const targetName = normalizeWorkflowName(workflowName)
|
|
||||||
const match = Object.values((registry as any).workflows || {}).find(
|
const match = Object.values((registry as any).workflows || {}).find(
|
||||||
(w: any) => normalizeWorkflowName(w?.name) === targetName
|
(w: any) =>
|
||||||
|
String(w?.name || '')
|
||||||
|
.trim()
|
||||||
|
.toLowerCase() === workflowName.toLowerCase()
|
||||||
) as any
|
) as any
|
||||||
|
|
||||||
if (!match?.id) {
|
if (!match?.id) {
|
||||||
@@ -99,12 +98,15 @@ export class GetWorkflowFromNameClientTool extends BaseClientTool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Convert state to the same string format as get_user_workflow
|
// Convert state to the same string format as get_user_workflow
|
||||||
const userWorkflow = formatWorkflowStateForCopilot({
|
const workflowState = {
|
||||||
blocks: wf.state.blocks || {},
|
blocks: wf.state.blocks || {},
|
||||||
edges: wf.state.edges || [],
|
edges: wf.state.edges || [],
|
||||||
loops: wf.state.loops || {},
|
loops: wf.state.loops || {},
|
||||||
parallels: wf.state.parallels || {},
|
parallels: wf.state.parallels || {},
|
||||||
})
|
}
|
||||||
|
// Sanitize workflow state for copilot (remove UI-specific data)
|
||||||
|
const sanitizedState = sanitizeForCopilot(workflowState)
|
||||||
|
const userWorkflow = JSON.stringify(sanitizedState, null, 2)
|
||||||
|
|
||||||
await this.markToolComplete(200, `Retrieved workflow ${workflowName}`, { userWorkflow })
|
await this.markToolComplete(200, `Retrieved workflow ${workflowName}`, { userWorkflow })
|
||||||
this.setState(ClientToolCallState.success)
|
this.setState(ClientToolCallState.success)
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import {
|
|||||||
type BaseClientToolMetadata,
|
type BaseClientToolMetadata,
|
||||||
ClientToolCallState,
|
ClientToolCallState,
|
||||||
} from '@/lib/copilot/tools/client/base-tool'
|
} from '@/lib/copilot/tools/client/base-tool'
|
||||||
import { extractWorkflowNames } from '@/lib/copilot/tools/shared/workflow-utils'
|
|
||||||
|
|
||||||
const logger = createLogger('ListUserWorkflowsClientTool')
|
const logger = createLogger('ListUserWorkflowsClientTool')
|
||||||
|
|
||||||
@@ -42,7 +41,9 @@ export class ListUserWorkflowsClientTool extends BaseClientTool {
|
|||||||
|
|
||||||
const json = await res.json()
|
const json = await res.json()
|
||||||
const workflows = Array.isArray(json?.data) ? json.data : []
|
const workflows = Array.isArray(json?.data) ? json.data : []
|
||||||
const names = extractWorkflowNames(workflows)
|
const names = workflows
|
||||||
|
.map((w: any) => (typeof w?.name === 'string' ? w.name : null))
|
||||||
|
.filter((n: string | null) => !!n)
|
||||||
|
|
||||||
logger.info('Found workflows', { count: names.length })
|
logger.info('Found workflows', { count: names.length })
|
||||||
|
|
||||||
|
|||||||
@@ -1,474 +0,0 @@
|
|||||||
export type DirectToolDef = {
|
|
||||||
name: string
|
|
||||||
description: string
|
|
||||||
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
|
|
||||||
toolId: string
|
|
||||||
}
|
|
||||||
|
|
||||||
export type SubagentToolDef = {
|
|
||||||
name: string
|
|
||||||
description: string
|
|
||||||
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
|
|
||||||
agentId: string
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Direct tools that execute immediately without LLM orchestration.
|
|
||||||
* These are fast database queries that don't need AI reasoning.
|
|
||||||
*/
|
|
||||||
export const DIRECT_TOOL_DEFS: DirectToolDef[] = [
|
|
||||||
{
|
|
||||||
name: 'list_workflows',
|
|
||||||
toolId: 'list_user_workflows',
|
|
||||||
description:
|
|
||||||
'List all workflows the user has access to. Returns workflow IDs, names, and workspace info.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
workspaceId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Optional workspace ID to filter workflows.',
|
|
||||||
},
|
|
||||||
folderId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Optional folder ID to filter workflows.',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'list_workspaces',
|
|
||||||
toolId: 'list_user_workspaces',
|
|
||||||
description:
|
|
||||||
'List all workspaces the user has access to. Returns workspace IDs, names, and roles.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'list_folders',
|
|
||||||
toolId: 'list_folders',
|
|
||||||
description: 'List all folders in a workspace.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
workspaceId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Workspace ID to list folders from.',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
required: ['workspaceId'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'get_workflow',
|
|
||||||
toolId: 'get_workflow_from_name',
|
|
||||||
description: 'Get a workflow by name or ID. Returns the full workflow definition.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
name: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Workflow name to search for.',
|
|
||||||
},
|
|
||||||
workflowId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Workflow ID to retrieve directly.',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'create_workflow',
|
|
||||||
toolId: 'create_workflow',
|
|
||||||
description: 'Create a new workflow. Returns the new workflow ID.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
name: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Name for the new workflow.',
|
|
||||||
},
|
|
||||||
workspaceId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Optional workspace ID. Uses default workspace if not provided.',
|
|
||||||
},
|
|
||||||
folderId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Optional folder ID to place the workflow in.',
|
|
||||||
},
|
|
||||||
description: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Optional description for the workflow.',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
required: ['name'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'create_folder',
|
|
||||||
toolId: 'create_folder',
|
|
||||||
description: 'Create a new folder in a workspace.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
name: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Name for the new folder.',
|
|
||||||
},
|
|
||||||
workspaceId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Optional workspace ID. Uses default workspace if not provided.',
|
|
||||||
},
|
|
||||||
parentId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'Optional parent folder ID for nested folders.',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
required: ['name'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
export const SUBAGENT_TOOL_DEFS: SubagentToolDef[] = [
|
|
||||||
{
|
|
||||||
name: 'copilot_build',
|
|
||||||
agentId: 'build',
|
|
||||||
description: `Build a workflow end-to-end in a single step. This is the fast mode equivalent for headless/MCP usage.
|
|
||||||
|
|
||||||
USE THIS WHEN:
|
|
||||||
- Building a new workflow from scratch
|
|
||||||
- Modifying an existing workflow
|
|
||||||
- You want to gather information and build in one pass without separate plan→edit steps
|
|
||||||
|
|
||||||
WORKFLOW ID (REQUIRED):
|
|
||||||
- For NEW workflows: First call create_workflow to get a workflowId, then pass it here
|
|
||||||
- For EXISTING workflows: Always pass the workflowId parameter
|
|
||||||
|
|
||||||
CAN DO:
|
|
||||||
- Gather information about blocks, credentials, patterns
|
|
||||||
- Search documentation and patterns for best practices
|
|
||||||
- Add, modify, or remove blocks
|
|
||||||
- Configure block settings and connections
|
|
||||||
- Set environment variables and workflow variables
|
|
||||||
|
|
||||||
CANNOT DO:
|
|
||||||
- Run or test workflows (use copilot_test separately after deploying)
|
|
||||||
- Deploy workflows (use copilot_deploy separately)
|
|
||||||
|
|
||||||
WORKFLOW:
|
|
||||||
1. Call create_workflow to get a workflowId (for new workflows)
|
|
||||||
2. Call copilot_build with the request and workflowId
|
|
||||||
3. Build agent gathers info and builds in one pass
|
|
||||||
4. Call copilot_deploy to deploy the workflow
|
|
||||||
5. Optionally call copilot_test to verify it works`,
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'What you want to build or modify in the workflow.',
|
|
||||||
},
|
|
||||||
workflowId: {
|
|
||||||
type: 'string',
|
|
||||||
description:
|
|
||||||
'REQUIRED. The workflow ID. For new workflows, call create_workflow first to get this.',
|
|
||||||
},
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request', 'workflowId'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_discovery',
|
|
||||||
agentId: 'discovery',
|
|
||||||
description: `Find workflows by their contents or functionality when the user doesn't know the exact name or ID.
|
|
||||||
|
|
||||||
USE THIS WHEN:
|
|
||||||
- User describes a workflow by what it does: "the one that sends emails", "my Slack notification workflow"
|
|
||||||
- User refers to workflow contents: "the workflow with the OpenAI block"
|
|
||||||
- User needs to search/match workflows by functionality or description
|
|
||||||
|
|
||||||
DO NOT USE (use direct tools instead):
|
|
||||||
- User knows the workflow name → use get_workflow
|
|
||||||
- User wants to list all workflows → use list_workflows
|
|
||||||
- User wants to list workspaces → use list_workspaces
|
|
||||||
- User wants to list folders → use list_folders`,
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
workspaceId: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_plan',
|
|
||||||
agentId: 'plan',
|
|
||||||
description: `Plan workflow changes by gathering required information.
|
|
||||||
|
|
||||||
USE THIS WHEN:
|
|
||||||
- Building a new workflow
|
|
||||||
- Modifying an existing workflow
|
|
||||||
- You need to understand what blocks and integrations are available
|
|
||||||
- The workflow requires multiple blocks or connections
|
|
||||||
|
|
||||||
WORKFLOW ID (REQUIRED):
|
|
||||||
- For NEW workflows: First call create_workflow to get a workflowId, then pass it here
|
|
||||||
- For EXISTING workflows: Always pass the workflowId parameter
|
|
||||||
|
|
||||||
This tool gathers information about available blocks, credentials, and the current workflow state.
|
|
||||||
|
|
||||||
RETURNS: A plan object containing block configurations, connections, and technical details.
|
|
||||||
IMPORTANT: Pass the returned plan EXACTLY to copilot_edit - do not modify or summarize it.`,
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'What you want to build or modify in the workflow.',
|
|
||||||
},
|
|
||||||
workflowId: {
|
|
||||||
type: 'string',
|
|
||||||
description:
|
|
||||||
'REQUIRED. The workflow ID. For new workflows, call create_workflow first to get this.',
|
|
||||||
},
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request', 'workflowId'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_edit',
|
|
||||||
agentId: 'edit',
|
|
||||||
description: `Execute a workflow plan and apply edits.
|
|
||||||
|
|
||||||
USE THIS WHEN:
|
|
||||||
- You have a plan from copilot_plan that needs to be executed
|
|
||||||
- Building or modifying a workflow based on the plan
|
|
||||||
- Making changes to blocks, connections, or configurations
|
|
||||||
|
|
||||||
WORKFLOW ID (REQUIRED):
|
|
||||||
- You MUST provide the workflowId parameter
|
|
||||||
- For new workflows, get the workflowId from create_workflow first
|
|
||||||
|
|
||||||
PLAN (REQUIRED):
|
|
||||||
- Pass the EXACT plan object from copilot_plan in the context.plan field
|
|
||||||
- Do NOT modify, summarize, or interpret the plan - pass it verbatim
|
|
||||||
- The plan contains technical details the edit agent needs exactly as-is
|
|
||||||
|
|
||||||
IMPORTANT: After copilot_edit completes, you MUST call copilot_deploy before the workflow can be run or tested.`,
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
message: { type: 'string', description: 'Optional additional instructions for the edit.' },
|
|
||||||
workflowId: {
|
|
||||||
type: 'string',
|
|
||||||
description:
|
|
||||||
'REQUIRED. The workflow ID to edit. Get this from create_workflow for new workflows.',
|
|
||||||
},
|
|
||||||
plan: {
|
|
||||||
type: 'object',
|
|
||||||
description:
|
|
||||||
'The plan object from copilot_plan. Pass it EXACTLY as returned, do not modify.',
|
|
||||||
},
|
|
||||||
context: {
|
|
||||||
type: 'object',
|
|
||||||
description:
|
|
||||||
'Additional context. Put the plan in context.plan if not using the plan field directly.',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
required: ['workflowId'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_debug',
|
|
||||||
agentId: 'debug',
|
|
||||||
description: `Diagnose errors or unexpected workflow behavior.
|
|
||||||
|
|
||||||
WORKFLOW ID (REQUIRED): Always provide the workflowId of the workflow to debug.`,
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
error: { type: 'string', description: 'The error message or description of the issue.' },
|
|
||||||
workflowId: { type: 'string', description: 'REQUIRED. The workflow ID to debug.' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['error', 'workflowId'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_deploy',
|
|
||||||
agentId: 'deploy',
|
|
||||||
description: `Deploy or manage workflow deployments.
|
|
||||||
|
|
||||||
CRITICAL: You MUST deploy a workflow after building before it can be run or tested.
|
|
||||||
Workflows without an active deployment will fail with "no active deployment" error.
|
|
||||||
|
|
||||||
WORKFLOW ID (REQUIRED):
|
|
||||||
- Always provide the workflowId parameter
|
|
||||||
- This must match the workflow you built with copilot_edit
|
|
||||||
|
|
||||||
USE THIS:
|
|
||||||
- After copilot_edit completes to activate the workflow
|
|
||||||
- To update deployment settings
|
|
||||||
- To redeploy after making changes
|
|
||||||
|
|
||||||
DEPLOYMENT TYPES:
|
|
||||||
- "deploy as api" - REST API endpoint
|
|
||||||
- "deploy as chat" - Chat interface
|
|
||||||
- "deploy as mcp" - MCP server`,
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'The deployment request, e.g. "deploy as api" or "deploy as chat"',
|
|
||||||
},
|
|
||||||
workflowId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'REQUIRED. The workflow ID to deploy.',
|
|
||||||
},
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request', 'workflowId'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_auth',
|
|
||||||
agentId: 'auth',
|
|
||||||
description: 'Handle OAuth connection flows.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_knowledge',
|
|
||||||
agentId: 'knowledge',
|
|
||||||
description: 'Create and manage knowledge bases.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_custom_tool',
|
|
||||||
agentId: 'custom_tool',
|
|
||||||
description: 'Create or manage custom tools.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_info',
|
|
||||||
agentId: 'info',
|
|
||||||
description: 'Inspect blocks, outputs, and workflow metadata.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
workflowId: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_workflow',
|
|
||||||
agentId: 'workflow',
|
|
||||||
description: 'Manage workflow environment and configuration.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
workflowId: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_research',
|
|
||||||
agentId: 'research',
|
|
||||||
description: 'Research external APIs and documentation.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_tour',
|
|
||||||
agentId: 'tour',
|
|
||||||
description: 'Explain platform features and usage.',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_test',
|
|
||||||
agentId: 'test',
|
|
||||||
description: `Run workflows and verify outputs.
|
|
||||||
|
|
||||||
PREREQUISITE: The workflow MUST be deployed first using copilot_deploy.
|
|
||||||
Undeployed workflows will fail with "no active deployment" error.
|
|
||||||
|
|
||||||
WORKFLOW ID (REQUIRED):
|
|
||||||
- Always provide the workflowId parameter
|
|
||||||
|
|
||||||
USE THIS:
|
|
||||||
- After deploying to verify the workflow works correctly
|
|
||||||
- To test with sample inputs
|
|
||||||
- To validate workflow behavior before sharing with user`,
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
workflowId: {
|
|
||||||
type: 'string',
|
|
||||||
description: 'REQUIRED. The workflow ID to test.',
|
|
||||||
},
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request', 'workflowId'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'copilot_superagent',
|
|
||||||
agentId: 'superagent',
|
|
||||||
description: 'Execute direct external actions (email, Slack, etc.).',
|
|
||||||
inputSchema: {
|
|
||||||
type: 'object',
|
|
||||||
properties: {
|
|
||||||
request: { type: 'string' },
|
|
||||||
context: { type: 'object' },
|
|
||||||
},
|
|
||||||
required: ['request'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
]
|
|
||||||
@@ -6,13 +6,9 @@ import { eq } from 'drizzle-orm'
|
|||||||
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
|
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
|
||||||
import { validateSelectorIds } from '@/lib/copilot/validation/selector-validator'
|
import { validateSelectorIds } from '@/lib/copilot/validation/selector-validator'
|
||||||
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
|
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
|
||||||
import { applyAutoLayout } from '@/lib/workflows/autolayout'
|
|
||||||
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
|
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
|
||||||
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
|
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
|
||||||
import {
|
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||||
loadWorkflowFromNormalizedTables,
|
|
||||||
saveWorkflowToNormalizedTables,
|
|
||||||
} from '@/lib/workflows/persistence/utils'
|
|
||||||
import { isValidKey } from '@/lib/workflows/sanitization/key-validation'
|
import { isValidKey } from '@/lib/workflows/sanitization/key-validation'
|
||||||
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
|
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
|
||||||
import { buildCanonicalIndex, isCanonicalPair } from '@/lib/workflows/subblocks/visibility'
|
import { buildCanonicalIndex, isCanonicalPair } from '@/lib/workflows/subblocks/visibility'
|
||||||
@@ -1403,101 +1399,6 @@ function filterDisallowedTools(
|
|||||||
return allowedTools
|
return allowedTools
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Normalizes block IDs in operations to ensure they are valid UUIDs.
|
|
||||||
* The LLM may generate human-readable IDs like "web_search" or "research_agent"
|
|
||||||
* which need to be converted to proper UUIDs for database compatibility.
|
|
||||||
*
|
|
||||||
* Returns the normalized operations and a mapping from old IDs to new UUIDs.
|
|
||||||
*/
|
|
||||||
function normalizeBlockIdsInOperations(operations: EditWorkflowOperation[]): {
|
|
||||||
normalizedOperations: EditWorkflowOperation[]
|
|
||||||
idMapping: Map<string, string>
|
|
||||||
} {
|
|
||||||
const logger = createLogger('EditWorkflowServerTool')
|
|
||||||
const idMapping = new Map<string, string>()
|
|
||||||
|
|
||||||
// First pass: collect all non-UUID block_ids from add/insert operations
|
|
||||||
for (const op of operations) {
|
|
||||||
if (op.operation_type === 'add' || op.operation_type === 'insert_into_subflow') {
|
|
||||||
if (op.block_id && !UUID_REGEX.test(op.block_id)) {
|
|
||||||
const newId = crypto.randomUUID()
|
|
||||||
idMapping.set(op.block_id, newId)
|
|
||||||
logger.debug('Normalizing block ID', { oldId: op.block_id, newId })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (idMapping.size === 0) {
|
|
||||||
return { normalizedOperations: operations, idMapping }
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info('Normalizing block IDs in operations', {
|
|
||||||
normalizedCount: idMapping.size,
|
|
||||||
mappings: Object.fromEntries(idMapping),
|
|
||||||
})
|
|
||||||
|
|
||||||
// Helper to replace an ID if it's in the mapping
|
|
||||||
const replaceId = (id: string | undefined): string | undefined => {
|
|
||||||
if (!id) return id
|
|
||||||
return idMapping.get(id) ?? id
|
|
||||||
}
|
|
||||||
|
|
||||||
// Second pass: update all references to use new UUIDs
|
|
||||||
const normalizedOperations = operations.map((op) => {
|
|
||||||
const normalized: EditWorkflowOperation = {
|
|
||||||
...op,
|
|
||||||
block_id: replaceId(op.block_id) ?? op.block_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
if (op.params) {
|
|
||||||
normalized.params = { ...op.params }
|
|
||||||
|
|
||||||
// Update subflowId references (for insert_into_subflow)
|
|
||||||
if (normalized.params.subflowId) {
|
|
||||||
normalized.params.subflowId = replaceId(normalized.params.subflowId)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update connection references
|
|
||||||
if (normalized.params.connections) {
|
|
||||||
const normalizedConnections: Record<string, any> = {}
|
|
||||||
for (const [handle, targets] of Object.entries(normalized.params.connections)) {
|
|
||||||
if (typeof targets === 'string') {
|
|
||||||
normalizedConnections[handle] = replaceId(targets)
|
|
||||||
} else if (Array.isArray(targets)) {
|
|
||||||
normalizedConnections[handle] = targets.map((t) => {
|
|
||||||
if (typeof t === 'string') return replaceId(t)
|
|
||||||
if (t && typeof t === 'object' && t.block) {
|
|
||||||
return { ...t, block: replaceId(t.block) }
|
|
||||||
}
|
|
||||||
return t
|
|
||||||
})
|
|
||||||
} else if (targets && typeof targets === 'object' && (targets as any).block) {
|
|
||||||
normalizedConnections[handle] = { ...targets, block: replaceId((targets as any).block) }
|
|
||||||
} else {
|
|
||||||
normalizedConnections[handle] = targets
|
|
||||||
}
|
|
||||||
}
|
|
||||||
normalized.params.connections = normalizedConnections
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update nestedNodes block IDs
|
|
||||||
if (normalized.params.nestedNodes) {
|
|
||||||
const normalizedNestedNodes: Record<string, any> = {}
|
|
||||||
for (const [childId, childBlock] of Object.entries(normalized.params.nestedNodes)) {
|
|
||||||
const newChildId = replaceId(childId) ?? childId
|
|
||||||
normalizedNestedNodes[newChildId] = childBlock
|
|
||||||
}
|
|
||||||
normalized.params.nestedNodes = normalizedNestedNodes
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return normalized
|
|
||||||
})
|
|
||||||
|
|
||||||
return { normalizedOperations, idMapping }
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Apply operations directly to the workflow JSON state
|
* Apply operations directly to the workflow JSON state
|
||||||
*/
|
*/
|
||||||
@@ -1517,11 +1418,6 @@ function applyOperationsToWorkflowState(
|
|||||||
|
|
||||||
// Log initial state
|
// Log initial state
|
||||||
const logger = createLogger('EditWorkflowServerTool')
|
const logger = createLogger('EditWorkflowServerTool')
|
||||||
|
|
||||||
// Normalize block IDs to UUIDs before processing
|
|
||||||
const { normalizedOperations } = normalizeBlockIdsInOperations(operations)
|
|
||||||
operations = normalizedOperations
|
|
||||||
|
|
||||||
logger.info('Applying operations to workflow:', {
|
logger.info('Applying operations to workflow:', {
|
||||||
totalOperations: operations.length,
|
totalOperations: operations.length,
|
||||||
operationTypes: operations.reduce((acc: any, op) => {
|
operationTypes: operations.reduce((acc: any, op) => {
|
||||||
@@ -3266,59 +3162,10 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
|
|||||||
const skippedMessages =
|
const skippedMessages =
|
||||||
skippedItems.length > 0 ? skippedItems.map((item) => item.reason) : undefined
|
skippedItems.length > 0 ? skippedItems.map((item) => item.reason) : undefined
|
||||||
|
|
||||||
// Persist the workflow state to the database
|
// Return the modified workflow state for the client to convert to YAML if needed
|
||||||
const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState
|
|
||||||
|
|
||||||
// Apply autolayout to position blocks properly
|
|
||||||
const layoutResult = applyAutoLayout(finalWorkflowState.blocks, finalWorkflowState.edges, {
|
|
||||||
horizontalSpacing: 250,
|
|
||||||
verticalSpacing: 100,
|
|
||||||
padding: { x: 100, y: 100 },
|
|
||||||
})
|
|
||||||
|
|
||||||
const layoutedBlocks =
|
|
||||||
layoutResult.success && layoutResult.blocks ? layoutResult.blocks : finalWorkflowState.blocks
|
|
||||||
|
|
||||||
if (!layoutResult.success) {
|
|
||||||
logger.warn('Autolayout failed, using default positions', {
|
|
||||||
workflowId,
|
|
||||||
error: layoutResult.error,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
const workflowStateForDb = {
|
|
||||||
blocks: layoutedBlocks,
|
|
||||||
edges: finalWorkflowState.edges,
|
|
||||||
loops: generateLoopBlocks(layoutedBlocks as any),
|
|
||||||
parallels: generateParallelBlocks(layoutedBlocks as any),
|
|
||||||
lastSaved: Date.now(),
|
|
||||||
isDeployed: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowStateForDb as any)
|
|
||||||
if (!saveResult.success) {
|
|
||||||
logger.error('Failed to persist workflow state to database', {
|
|
||||||
workflowId,
|
|
||||||
error: saveResult.error,
|
|
||||||
})
|
|
||||||
throw new Error(`Failed to save workflow: ${saveResult.error}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update workflow's lastSynced timestamp
|
|
||||||
await db
|
|
||||||
.update(workflowTable)
|
|
||||||
.set({
|
|
||||||
lastSynced: new Date(),
|
|
||||||
updatedAt: new Date(),
|
|
||||||
})
|
|
||||||
.where(eq(workflowTable.id, workflowId))
|
|
||||||
|
|
||||||
logger.info('Workflow state persisted to database', { workflowId })
|
|
||||||
|
|
||||||
// Return the modified workflow state with autolayout applied
|
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
workflowState: { ...finalWorkflowState, blocks: layoutedBlocks },
|
workflowState: validation.sanitizedState || modifiedWorkflowState,
|
||||||
// Include input validation errors so the LLM can see what was rejected
|
// Include input validation errors so the LLM can see what was rejected
|
||||||
...(inputErrors && {
|
...(inputErrors && {
|
||||||
inputValidationErrors: inputErrors,
|
inputValidationErrors: inputErrors,
|
||||||
|
|||||||
@@ -1,38 +0,0 @@
|
|||||||
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
|
|
||||||
|
|
||||||
type CopilotWorkflowState = {
|
|
||||||
blocks?: Record<string, any>
|
|
||||||
edges?: any[]
|
|
||||||
loops?: Record<string, any>
|
|
||||||
parallels?: Record<string, any>
|
|
||||||
}
|
|
||||||
|
|
||||||
export function formatWorkflowStateForCopilot(state: CopilotWorkflowState): string {
|
|
||||||
const workflowState = {
|
|
||||||
blocks: state.blocks || {},
|
|
||||||
edges: state.edges || [],
|
|
||||||
loops: state.loops || {},
|
|
||||||
parallels: state.parallels || {},
|
|
||||||
}
|
|
||||||
const sanitized = sanitizeForCopilot(workflowState)
|
|
||||||
return JSON.stringify(sanitized, null, 2)
|
|
||||||
}
|
|
||||||
|
|
||||||
export function formatNormalizedWorkflowForCopilot(
|
|
||||||
normalized: CopilotWorkflowState | null | undefined
|
|
||||||
): string | null {
|
|
||||||
if (!normalized) return null
|
|
||||||
return formatWorkflowStateForCopilot(normalized)
|
|
||||||
}
|
|
||||||
|
|
||||||
export function normalizeWorkflowName(name?: string | null): string {
|
|
||||||
return String(name || '')
|
|
||||||
.trim()
|
|
||||||
.toLowerCase()
|
|
||||||
}
|
|
||||||
|
|
||||||
export function extractWorkflowNames(workflows: Array<{ name?: string | null }>): string[] {
|
|
||||||
return workflows
|
|
||||||
.map((workflow) => (typeof workflow?.name === 'string' ? workflow.name : null))
|
|
||||||
.filter((name): name is string => Boolean(name))
|
|
||||||
}
|
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
import { db } from '@sim/db'
|
import { db } from '@sim/db'
|
||||||
import { permissions, userStats, workflow as workflowTable } from '@sim/db/schema'
|
import { permissions, userStats, workflow as workflowTable } from '@sim/db/schema'
|
||||||
import { createLogger } from '@sim/logger'
|
import { createLogger } from '@sim/logger'
|
||||||
import { and, asc, eq, inArray, or } from 'drizzle-orm'
|
import { and, eq } from 'drizzle-orm'
|
||||||
import { NextResponse } from 'next/server'
|
import { NextResponse } from 'next/server'
|
||||||
import { getSession } from '@/lib/auth'
|
import { getSession } from '@/lib/auth'
|
||||||
import { getWorkspaceWithOwner, type PermissionType } from '@/lib/workspaces/permissions/utils'
|
import { getWorkspaceWithOwner, type PermissionType } from '@/lib/workspaces/permissions/utils'
|
||||||
@@ -15,53 +15,6 @@ export async function getWorkflowById(id: string) {
|
|||||||
return rows[0]
|
return rows[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function resolveWorkflowIdForUser(
|
|
||||||
userId: string,
|
|
||||||
workflowId?: string,
|
|
||||||
workflowName?: string
|
|
||||||
): Promise<{ workflowId: string; workflowName?: string } | null> {
|
|
||||||
if (workflowId) {
|
|
||||||
return { workflowId }
|
|
||||||
}
|
|
||||||
|
|
||||||
const workspaceIds = await db
|
|
||||||
.select({ entityId: permissions.entityId })
|
|
||||||
.from(permissions)
|
|
||||||
.where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace')))
|
|
||||||
|
|
||||||
const workspaceIdList = workspaceIds.map((row) => row.entityId)
|
|
||||||
|
|
||||||
const workflowConditions = [eq(workflowTable.userId, userId)]
|
|
||||||
if (workspaceIdList.length > 0) {
|
|
||||||
workflowConditions.push(inArray(workflowTable.workspaceId, workspaceIdList))
|
|
||||||
}
|
|
||||||
|
|
||||||
const workflows = await db
|
|
||||||
.select()
|
|
||||||
.from(workflowTable)
|
|
||||||
.where(or(...workflowConditions))
|
|
||||||
.orderBy(asc(workflowTable.sortOrder), asc(workflowTable.createdAt), asc(workflowTable.id))
|
|
||||||
|
|
||||||
if (workflows.length === 0) {
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
if (workflowName) {
|
|
||||||
const match = workflows.find(
|
|
||||||
(w) =>
|
|
||||||
String(w.name || '')
|
|
||||||
.trim()
|
|
||||||
.toLowerCase() === workflowName.toLowerCase()
|
|
||||||
)
|
|
||||||
if (match) {
|
|
||||||
return { workflowId: match.id, workflowName: match.name || undefined }
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
return { workflowId: workflows[0].id, workflowName: workflows[0].name || undefined }
|
|
||||||
}
|
|
||||||
|
|
||||||
type WorkflowRecord = ReturnType<typeof getWorkflowById> extends Promise<infer R>
|
type WorkflowRecord = ReturnType<typeof getWorkflowById> extends Promise<infer R>
|
||||||
? NonNullable<R>
|
? NonNullable<R>
|
||||||
: never
|
: never
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -33,20 +33,6 @@ export interface CopilotToolCall {
|
|||||||
subAgentStreaming?: boolean
|
subAgentStreaming?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface CopilotStreamInfo {
|
|
||||||
streamId: string
|
|
||||||
workflowId: string
|
|
||||||
chatId?: string
|
|
||||||
userMessageId: string
|
|
||||||
assistantMessageId: string
|
|
||||||
lastEventId: number
|
|
||||||
resumeAttempts: number
|
|
||||||
userMessageContent: string
|
|
||||||
fileAttachments?: MessageFileAttachment[]
|
|
||||||
contexts?: ChatContext[]
|
|
||||||
startedAt: number
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface MessageFileAttachment {
|
export interface MessageFileAttachment {
|
||||||
id: string
|
id: string
|
||||||
key: string
|
key: string
|
||||||
@@ -168,9 +154,6 @@ export interface CopilotState {
|
|||||||
// Auto-allowed integration tools (tools that can run without confirmation)
|
// Auto-allowed integration tools (tools that can run without confirmation)
|
||||||
autoAllowedTools: string[]
|
autoAllowedTools: string[]
|
||||||
|
|
||||||
// Active stream metadata for reconnect/replay
|
|
||||||
activeStream: CopilotStreamInfo | null
|
|
||||||
|
|
||||||
// Message queue for messages sent while another is in progress
|
// Message queue for messages sent while another is in progress
|
||||||
messageQueue: QueuedMessage[]
|
messageQueue: QueuedMessage[]
|
||||||
|
|
||||||
@@ -211,7 +194,6 @@ export interface CopilotActions {
|
|||||||
toolCallState: 'accepted' | 'rejected' | 'error',
|
toolCallState: 'accepted' | 'rejected' | 'error',
|
||||||
toolCallId?: string
|
toolCallId?: string
|
||||||
) => void
|
) => void
|
||||||
resumeActiveStream: () => Promise<boolean>
|
|
||||||
setToolCallState: (toolCall: any, newState: ClientToolCallState, options?: any) => void
|
setToolCallState: (toolCall: any, newState: ClientToolCallState, options?: any) => void
|
||||||
updateToolCallParams: (toolCallId: string, params: Record<string, any>) => void
|
updateToolCallParams: (toolCallId: string, params: Record<string, any>) => void
|
||||||
sendDocsMessage: (query: string, options?: { stream?: boolean; topK?: number }) => Promise<void>
|
sendDocsMessage: (query: string, options?: { stream?: boolean; topK?: number }) => Promise<void>
|
||||||
@@ -246,10 +228,11 @@ export interface CopilotActions {
|
|||||||
stream: ReadableStream,
|
stream: ReadableStream,
|
||||||
messageId: string,
|
messageId: string,
|
||||||
isContinuation?: boolean,
|
isContinuation?: boolean,
|
||||||
triggerUserMessageId?: string,
|
triggerUserMessageId?: string
|
||||||
abortSignal?: AbortSignal
|
|
||||||
) => Promise<void>
|
) => Promise<void>
|
||||||
handleNewChatCreation: (newChatId: string) => Promise<void>
|
handleNewChatCreation: (newChatId: string) => Promise<void>
|
||||||
|
executeIntegrationTool: (toolCallId: string) => Promise<void>
|
||||||
|
skipIntegrationTool: (toolCallId: string) => void
|
||||||
loadAutoAllowedTools: () => Promise<void>
|
loadAutoAllowedTools: () => Promise<void>
|
||||||
addAutoAllowedTool: (toolId: string) => Promise<void>
|
addAutoAllowedTool: (toolId: string) => Promise<void>
|
||||||
removeAutoAllowedTool: (toolId: string) => Promise<void>
|
removeAutoAllowedTool: (toolId: string) => Promise<void>
|
||||||
|
|||||||
@@ -121,13 +121,6 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
|
|||||||
|
|
||||||
const candidateState = diffResult.diff.proposedState
|
const candidateState = diffResult.diff.proposedState
|
||||||
|
|
||||||
logger.info('[WorkflowDiff] Applying proposed state', {
|
|
||||||
blockCount: Object.keys(candidateState.blocks || {}).length,
|
|
||||||
edgeCount: candidateState.edges?.length ?? 0,
|
|
||||||
hasLoops: !!candidateState.loops,
|
|
||||||
hasParallels: !!candidateState.parallels,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Validate proposed workflow using serializer round-trip
|
// Validate proposed workflow using serializer round-trip
|
||||||
const serializer = new Serializer()
|
const serializer = new Serializer()
|
||||||
const serialized = serializer.serializeWorkflow(
|
const serialized = serializer.serializeWorkflow(
|
||||||
@@ -141,7 +134,6 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
|
|||||||
|
|
||||||
// OPTIMISTIC: Apply state immediately to stores (this is what makes UI update)
|
// OPTIMISTIC: Apply state immediately to stores (this is what makes UI update)
|
||||||
applyWorkflowStateToStores(activeWorkflowId, candidateState)
|
applyWorkflowStateToStores(activeWorkflowId, candidateState)
|
||||||
logger.info('[WorkflowDiff] Applied state to stores')
|
|
||||||
|
|
||||||
// OPTIMISTIC: Update diff state immediately so UI shows the diff
|
// OPTIMISTIC: Update diff state immediately so UI shows the diff
|
||||||
const triggerMessageId =
|
const triggerMessageId =
|
||||||
|
|||||||
@@ -37,26 +37,10 @@ export function applyWorkflowStateToStores(
|
|||||||
workflowState: WorkflowState,
|
workflowState: WorkflowState,
|
||||||
options?: { updateLastSaved?: boolean }
|
options?: { updateLastSaved?: boolean }
|
||||||
) {
|
) {
|
||||||
logger.info('[applyWorkflowStateToStores] Applying state', {
|
|
||||||
workflowId,
|
|
||||||
blockCount: Object.keys(workflowState.blocks || {}).length,
|
|
||||||
edgeCount: workflowState.edges?.length ?? 0,
|
|
||||||
edgePreview: workflowState.edges?.slice(0, 3).map((e) => `${e.source} -> ${e.target}`),
|
|
||||||
})
|
|
||||||
const workflowStore = useWorkflowStore.getState()
|
const workflowStore = useWorkflowStore.getState()
|
||||||
const cloned = cloneWorkflowState(workflowState)
|
workflowStore.replaceWorkflowState(cloneWorkflowState(workflowState), options)
|
||||||
logger.info('[applyWorkflowStateToStores] Cloned state edges', {
|
|
||||||
clonedEdgeCount: cloned.edges?.length ?? 0,
|
|
||||||
})
|
|
||||||
workflowStore.replaceWorkflowState(cloned, options)
|
|
||||||
const subBlockValues = extractSubBlockValues(workflowState)
|
const subBlockValues = extractSubBlockValues(workflowState)
|
||||||
useSubBlockStore.getState().setWorkflowValues(workflowId, subBlockValues)
|
useSubBlockStore.getState().setWorkflowValues(workflowId, subBlockValues)
|
||||||
|
|
||||||
// Verify what's in the store after apply
|
|
||||||
const afterState = workflowStore.getWorkflowState()
|
|
||||||
logger.info('[applyWorkflowStateToStores] After apply', {
|
|
||||||
afterEdgeCount: afterState.edges?.length ?? 0,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function captureBaselineSnapshot(workflowId: string): WorkflowState {
|
export function captureBaselineSnapshot(workflowId: string): WorkflowState {
|
||||||
|
|||||||
@@ -1,927 +0,0 @@
|
|||||||
# Copilot Server-Side Refactor Plan
|
|
||||||
|
|
||||||
> **Goal**: Move copilot orchestration logic from the browser (React/Zustand) to the Next.js server, enabling both headless API access and a simplified interactive client.
|
|
||||||
|
|
||||||
## Table of Contents
|
|
||||||
|
|
||||||
1. [Executive Summary](#executive-summary)
|
|
||||||
2. [Current Architecture](#current-architecture)
|
|
||||||
3. [Target Architecture](#target-architecture)
|
|
||||||
4. [Scope & Boundaries](#scope--boundaries)
|
|
||||||
5. [Module Design](#module-design)
|
|
||||||
6. [Implementation Plan](#implementation-plan)
|
|
||||||
7. [API Contracts](#api-contracts)
|
|
||||||
8. [Migration Strategy](#migration-strategy)
|
|
||||||
9. [Testing Strategy](#testing-strategy)
|
|
||||||
10. [Risks & Mitigations](#risks--mitigations)
|
|
||||||
11. [File Inventory](#file-inventory)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Executive Summary
|
|
||||||
|
|
||||||
### Problem
|
|
||||||
|
|
||||||
The current copilot implementation in Sim has all orchestration logic in the browser:
|
|
||||||
- SSE stream parsing happens in the React client
|
|
||||||
- Tool execution is triggered from the browser
|
|
||||||
- OAuth tokens are sent to the client
|
|
||||||
- No headless/API access is possible
|
|
||||||
- The Zustand store is ~4,200 lines of complex async logic
|
|
||||||
|
|
||||||
### Solution
|
|
||||||
|
|
||||||
Move orchestration to the Next.js server:
|
|
||||||
- Server parses SSE from copilot backend
|
|
||||||
- Server executes tools directly (no HTTP round-trips)
|
|
||||||
- Server forwards events to client (if attached)
|
|
||||||
- Headless API returns JSON response
|
|
||||||
- Client store becomes a thin UI layer (~600 lines)
|
|
||||||
|
|
||||||
### Benefits
|
|
||||||
|
|
||||||
| Aspect | Before | After |
|
|
||||||
|--------|--------|-------|
|
|
||||||
| Security | OAuth tokens in browser | Tokens stay server-side |
|
|
||||||
| Headless access | Not possible | Full API support |
|
|
||||||
| Store complexity | ~4,200 lines | ~600 lines |
|
|
||||||
| Tool execution | Browser-initiated | Server-side |
|
|
||||||
| Testing | Complex async | Simple state |
|
|
||||||
| Bundle size | Large (tool classes) | Minimal |
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Current Architecture
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
|
||||||
│ BROWSER (React) │
|
|
||||||
├─────────────────────────────────────────────────────────────────────────────┤
|
|
||||||
│ │
|
|
||||||
│ ┌─────────────────────────────────────────────────────────────────────────┐│
|
|
||||||
│ │ Copilot Store (4,200 lines) ││
|
|
||||||
│ │ ││
|
|
||||||
│ │ • SSE stream parsing (parseSSEStream) ││
|
|
||||||
│ │ • Event handlers (sseHandlers, subAgentSSEHandlers) ││
|
|
||||||
│ │ • Tool execution logic ││
|
|
||||||
│ │ • Client tool instantiation ││
|
|
||||||
│ │ • Content block processing ││
|
|
||||||
│ │ • State management ││
|
|
||||||
│ │ • UI state ││
|
|
||||||
│ └─────────────────────────────────────────────────────────────────────────┘│
|
|
||||||
│ │ │
|
|
||||||
│ │ HTTP calls for tool execution │
|
|
||||||
│ ▼ │
|
|
||||||
└─────────────────────────────────────────────────────────────────────────────┘
|
|
||||||
│
|
|
||||||
▼
|
|
||||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
|
||||||
│ NEXT.JS SERVER │
|
|
||||||
├─────────────────────────────────────────────────────────────────────────────┤
|
|
||||||
│ │
|
|
||||||
│ /api/copilot/chat - Proxy to copilot backend (pass-through) │
|
|
||||||
│ /api/copilot/execute-tool - Execute integration tools │
|
|
||||||
│ /api/copilot/confirm - Update Redis with tool status │
|
|
||||||
│ /api/copilot/tools/mark-complete - Notify copilot backend │
|
|
||||||
│ /api/copilot/execute-copilot-server-tool - Execute server tools │
|
|
||||||
│ │
|
|
||||||
└─────────────────────────────────────────────────────────────────────────────┘
|
|
||||||
│
|
|
||||||
▼
|
|
||||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
|
||||||
│ COPILOT BACKEND (Go) │
|
|
||||||
│ copilot.sim.ai │
|
|
||||||
├─────────────────────────────────────────────────────────────────────────────┤
|
|
||||||
│ │
|
|
||||||
│ • LLM orchestration │
|
|
||||||
│ • Subagent system (plan, edit, debug, etc.) │
|
|
||||||
│ • Tool definitions │
|
|
||||||
│ • Conversation management │
|
|
||||||
│ • SSE streaming │
|
|
||||||
│ │
|
|
||||||
└─────────────────────────────────────────────────────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
### Current Flow (Interactive)
|
|
||||||
|
|
||||||
1. User sends message in UI
|
|
||||||
2. Store calls `/api/copilot/chat`
|
|
||||||
3. Chat route proxies to copilot backend, streams SSE back
|
|
||||||
4. **Store parses SSE in browser**
|
|
||||||
5. On `tool_call` event:
|
|
||||||
- Store decides if tool needs confirmation
|
|
||||||
- Store calls `/api/copilot/execute-tool` or `/api/copilot/execute-copilot-server-tool`
|
|
||||||
- Store calls `/api/copilot/tools/mark-complete`
|
|
||||||
6. Store updates UI state
|
|
||||||
|
|
||||||
### Problems with Current Flow
|
|
||||||
|
|
||||||
1. **No headless access**: Must have browser client
|
|
||||||
2. **Security**: OAuth tokens sent to browser for tool execution
|
|
||||||
3. **Complexity**: All orchestration logic in Zustand store
|
|
||||||
4. **Performance**: Multiple HTTP round-trips from browser
|
|
||||||
5. **Reliability**: Browser can disconnect mid-operation
|
|
||||||
6. **Testing**: Hard to test async browser logic
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Target Architecture
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
|
||||||
│ BROWSER (React) │
|
|
||||||
├─────────────────────────────────────────────────────────────────────────────┤
|
|
||||||
│ │
|
|
||||||
│ ┌─────────────────────────────────────────────────────────────────────────┐│
|
|
||||||
│ │ Copilot Store (~600 lines) ││
|
|
||||||
│ │ ││
|
|
||||||
│ │ • UI state (messages, toolCalls display) ││
|
|
||||||
│ │ • Event listener (receive server events) ││
|
|
||||||
│ │ • User actions (send message, confirm/reject) ││
|
|
||||||
│ │ • Simple API calls ││
|
|
||||||
│ └─────────────────────────────────────────────────────────────────────────┘│
|
|
||||||
│ │ │
|
|
||||||
│ │ SSE events from server │
|
|
||||||
│ │ │
|
|
||||||
└─────────────────────────────────────────────────────────────────────────────┘
|
|
||||||
▲
|
|
||||||
│ (Optional - headless mode has no client)
|
|
||||||
│
|
|
||||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
|
||||||
│ NEXT.JS SERVER │
|
|
||||||
├─────────────────────────────────────────────────────────────────────────────┤
|
|
||||||
│ │
|
|
||||||
│ ┌─────────────────────────────────────────────────────────────────────────┐│
|
|
||||||
│ │ Orchestrator Module (NEW) ││
|
|
||||||
│ │ lib/copilot/orchestrator/ ││
|
|
||||||
│ │ ││
|
|
||||||
│ │ • SSE stream parsing ││
|
|
||||||
│ │ • Event handlers ││
|
|
||||||
│ │ • Tool execution (direct function calls) ││
|
|
||||||
│ │ • Response building ││
|
|
||||||
│ │ • Event forwarding (to client if attached) ││
|
|
||||||
│ └─────────────────────────────────────────────────────────────────────────┘│
|
|
||||||
│ │ │
|
|
||||||
│ ┌──────┴──────┐ │
|
|
||||||
│ │ │ │
|
|
||||||
│ ▼ ▼ │
|
|
||||||
│ /api/copilot/chat /api/v1/copilot/chat │
|
|
||||||
│ (Interactive) (Headless) │
|
|
||||||
│ - Session auth - API key auth │
|
|
||||||
│ - SSE to client - JSON response │
|
|
||||||
│ │
|
|
||||||
└─────────────────────────────────────────────────────────────────────────────┘
|
|
||||||
│
|
|
||||||
│ (Single external HTTP call)
|
|
||||||
▼
|
|
||||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
|
||||||
│ COPILOT BACKEND (Go) │
|
|
||||||
│ (UNCHANGED - no modifications) │
|
|
||||||
└─────────────────────────────────────────────────────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
### Target Flow (Headless)
|
|
||||||
|
|
||||||
1. External client calls `POST /api/v1/copilot/chat` with API key
|
|
||||||
2. Orchestrator calls copilot backend
|
|
||||||
3. **Server parses SSE stream**
|
|
||||||
4. **Server executes tools directly** (no HTTP)
|
|
||||||
5. Server notifies copilot backend (mark-complete)
|
|
||||||
6. Server returns JSON response
|
|
||||||
|
|
||||||
### Target Flow (Interactive)
|
|
||||||
|
|
||||||
1. User sends message in UI
|
|
||||||
2. Store calls `/api/copilot/chat`
|
|
||||||
3. **Server orchestrates everything**
|
|
||||||
4. Server forwards events to client via SSE
|
|
||||||
5. Client just updates UI from events
|
|
||||||
6. Server returns when complete
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Scope & Boundaries
|
|
||||||
|
|
||||||
### In Scope
|
|
||||||
|
|
||||||
| Item | Description |
|
|
||||||
|------|-------------|
|
|
||||||
| Orchestrator module | New module in `lib/copilot/orchestrator/` |
|
|
||||||
| Headless API route | New route `POST /api/v1/copilot/chat` |
|
|
||||||
| SSE parsing | Move from store to server |
|
|
||||||
| Tool execution | Direct function calls on server |
|
|
||||||
| Event forwarding | SSE to client (interactive mode) |
|
|
||||||
| Store simplification | Reduce to UI-only logic |
|
|
||||||
|
|
||||||
### Out of Scope
|
|
||||||
|
|
||||||
| Item | Reason |
|
|
||||||
|------|--------|
|
|
||||||
| Copilot backend (Go) | Separate repo, working correctly |
|
|
||||||
| Tool definitions | Already work, just called differently |
|
|
||||||
| LLM providers | Handled by copilot backend |
|
|
||||||
| Subagent system | Handled by copilot backend |
|
|
||||||
|
|
||||||
### Boundaries
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────────────────────────────┐
|
|
||||||
│ MODIFICATION ZONE │
|
|
||||||
│ │
|
|
||||||
┌────────────────┼─────────────────────────────────────┼────────────────┐
|
|
||||||
│ │ │ │
|
|
||||||
│ UNCHANGED │ apps/sim/ │ UNCHANGED │
|
|
||||||
│ │ ├── lib/copilot/orchestrator/ │ │
|
|
||||||
│ copilot/ │ │ └── (NEW) │ apps/sim/ │
|
|
||||||
│ (Go backend) │ ├── app/api/v1/copilot/ │ tools/ │
|
|
||||||
│ │ │ └── (NEW) │ (definitions)│
|
|
||||||
│ │ ├── app/api/copilot/chat/ │ │
|
|
||||||
│ │ │ └── (MODIFIED) │ │
|
|
||||||
│ │ └── stores/panel/copilot/ │ │
|
|
||||||
│ │ └── (SIMPLIFIED) │ │
|
|
||||||
│ │ │ │
|
|
||||||
└────────────────┼─────────────────────────────────────┼────────────────┘
|
|
||||||
│ │
|
|
||||||
└─────────────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Module Design
|
|
||||||
|
|
||||||
### Directory Structure
|
|
||||||
|
|
||||||
```
|
|
||||||
apps/sim/lib/copilot/orchestrator/
|
|
||||||
├── index.ts # Main orchestrator function
|
|
||||||
├── types.ts # Type definitions
|
|
||||||
├── sse-parser.ts # Parse SSE stream from copilot backend
|
|
||||||
├── sse-handlers.ts # Handle each SSE event type
|
|
||||||
├── tool-executor.ts # Execute tools directly (no HTTP)
|
|
||||||
├── persistence.ts # Database and Redis operations
|
|
||||||
└── response-builder.ts # Build final response
|
|
||||||
```
|
|
||||||
|
|
||||||
### Module Responsibilities
|
|
||||||
|
|
||||||
#### `types.ts`
|
|
||||||
|
|
||||||
Defines all types used by the orchestrator:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
// SSE Events
|
|
||||||
interface SSEEvent { type, data, subagent?, toolCallId?, toolName? }
|
|
||||||
type SSEEventType = 'content' | 'tool_call' | 'tool_result' | 'done' | ...
|
|
||||||
|
|
||||||
// Tool State
|
|
||||||
interface ToolCallState { id, name, status, params?, result?, error? }
|
|
||||||
type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped'
|
|
||||||
|
|
||||||
// Streaming Context (internal state during orchestration)
|
|
||||||
interface StreamingContext {
|
|
||||||
chatId?, conversationId?, messageId
|
|
||||||
accumulatedContent, contentBlocks
|
|
||||||
toolCalls: Map<string, ToolCallState>
|
|
||||||
streamComplete, errors[]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Orchestrator API
|
|
||||||
interface OrchestratorRequest { message, workflowId, userId, chatId?, mode?, ... }
|
|
||||||
interface OrchestratorOptions { autoExecuteTools?, onEvent?, timeout?, ... }
|
|
||||||
interface OrchestratorResult { success, content, toolCalls[], chatId?, error? }
|
|
||||||
|
|
||||||
// Execution Context (passed to tool executors)
|
|
||||||
interface ExecutionContext { userId, workflowId, workspaceId?, decryptedEnvVars? }
|
|
||||||
```
|
|
||||||
|
|
||||||
#### `sse-parser.ts`
|
|
||||||
|
|
||||||
Parses SSE stream into typed events:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
async function* parseSSEStream(
|
|
||||||
reader: ReadableStreamDefaultReader,
|
|
||||||
decoder: TextDecoder,
|
|
||||||
abortSignal?: AbortSignal
|
|
||||||
): AsyncGenerator<SSEEvent>
|
|
||||||
```
|
|
||||||
|
|
||||||
- Handles buffering for partial lines
|
|
||||||
- Parses JSON from `data:` lines
|
|
||||||
- Yields typed `SSEEvent` objects
|
|
||||||
- Supports abort signal
|
|
||||||
|
|
||||||
#### `sse-handlers.ts`
|
|
||||||
|
|
||||||
Handles each SSE event type:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
const sseHandlers: Record<SSEEventType, SSEHandler> = {
|
|
||||||
content: (event, context) => { /* append to accumulated content */ },
|
|
||||||
tool_call: async (event, context, execContext, options) => {
|
|
||||||
/* track tool, execute if autoExecuteTools */
|
|
||||||
},
|
|
||||||
tool_result: (event, context) => { /* update tool status */ },
|
|
||||||
tool_generating: (event, context) => { /* create pending tool */ },
|
|
||||||
reasoning: (event, context) => { /* handle thinking blocks */ },
|
|
||||||
done: (event, context) => { /* mark stream complete */ },
|
|
||||||
error: (event, context) => { /* record error */ },
|
|
||||||
// ... etc
|
|
||||||
}
|
|
||||||
|
|
||||||
const subAgentHandlers: Record<SSEEventType, SSEHandler> = {
|
|
||||||
// Handlers for events within subagent context
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### `tool-executor.ts`
|
|
||||||
|
|
||||||
Executes tools directly without HTTP:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
// Main entry point
|
|
||||||
async function executeToolServerSide(
|
|
||||||
toolCall: ToolCallState,
|
|
||||||
context: ExecutionContext
|
|
||||||
): Promise<ToolCallResult>
|
|
||||||
|
|
||||||
// Server tools (edit_workflow, search_documentation, etc.)
|
|
||||||
async function executeServerToolDirect(
|
|
||||||
toolName: string,
|
|
||||||
params: Record<string, any>,
|
|
||||||
context: ExecutionContext
|
|
||||||
): Promise<ToolCallResult>
|
|
||||||
|
|
||||||
// Integration tools (slack_send, gmail_read, etc.)
|
|
||||||
async function executeIntegrationToolDirect(
|
|
||||||
toolCallId: string,
|
|
||||||
toolName: string,
|
|
||||||
toolConfig: ToolConfig,
|
|
||||||
params: Record<string, any>,
|
|
||||||
context: ExecutionContext
|
|
||||||
): Promise<ToolCallResult>
|
|
||||||
|
|
||||||
// Notify copilot backend (external HTTP - required)
|
|
||||||
async function markToolComplete(
|
|
||||||
toolCallId: string,
|
|
||||||
toolName: string,
|
|
||||||
status: number,
|
|
||||||
message?: any,
|
|
||||||
data?: any
|
|
||||||
): Promise<boolean>
|
|
||||||
|
|
||||||
// Prepare cached context for tool execution
|
|
||||||
async function prepareExecutionContext(
|
|
||||||
userId: string,
|
|
||||||
workflowId: string
|
|
||||||
): Promise<ExecutionContext>
|
|
||||||
```
|
|
||||||
|
|
||||||
**Key principle**: Internal tool execution uses direct function calls. Only `markToolComplete` makes an HTTP call (to the copilot backend, which is external).
|
|
||||||
#### `persistence.ts`
|
|
||||||
|
|
||||||
Database and Redis operations:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
// Chat persistence
|
|
||||||
async function createChat(params): Promise<{ id: string }>
|
|
||||||
async function loadChat(chatId, userId): Promise<Chat | null>
|
|
||||||
async function saveMessages(chatId, messages, options?): Promise<void>
|
|
||||||
async function updateChatConversationId(chatId, conversationId): Promise<void>
|
|
||||||
|
|
||||||
// Tool confirmation (Redis)
|
|
||||||
async function setToolConfirmation(toolCallId, status, message?): Promise<boolean>
|
|
||||||
async function getToolConfirmation(toolCallId): Promise<Confirmation | null>
|
|
||||||
```
|
|
||||||
|
|
||||||
#### `index.ts`
|
|
||||||
|
|
||||||
Main orchestrator function:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
async function orchestrateCopilotRequest(
|
|
||||||
request: OrchestratorRequest,
|
|
||||||
options: OrchestratorOptions = {}
|
|
||||||
): Promise<OrchestratorResult> {
|
|
||||||
|
|
||||||
// 1. Prepare execution context (cache env vars, etc.)
|
|
||||||
const execContext = await prepareExecutionContext(userId, workflowId)
|
|
||||||
|
|
||||||
// 2. Handle chat creation/loading
|
|
||||||
let chatId = await resolveChat(request)
|
|
||||||
|
|
||||||
// 3. Build request payload for copilot backend
|
|
||||||
const payload = buildCopilotPayload(request)
|
|
||||||
|
|
||||||
// 4. Call copilot backend
|
|
||||||
const response = await fetch(COPILOT_URL, { body: JSON.stringify(payload) })
|
|
||||||
|
|
||||||
// 5. Create streaming context
|
|
||||||
const context = createStreamingContext(chatId)
|
|
||||||
|
|
||||||
// 6. Parse and handle SSE stream
|
|
||||||
for await (const event of parseSSEStream(response.body)) {
|
|
||||||
// Forward to client if attached
|
|
||||||
options.onEvent?.(event)
|
|
||||||
|
|
||||||
// Handle event
|
|
||||||
const handler = getHandler(event)
|
|
||||||
await handler(event, context, execContext, options)
|
|
||||||
|
|
||||||
if (context.streamComplete) break
|
|
||||||
}
|
|
||||||
|
|
||||||
// 7. Persist to database
|
|
||||||
await persistChat(chatId, context)
|
|
||||||
|
|
||||||
// 8. Build and return result
|
|
||||||
return buildResult(context)
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Implementation Plan
|
|
||||||
|
|
||||||
### Phase 1: Create Orchestrator Module (3-4 days)
|
|
||||||
|
|
||||||
**Goal**: Build the orchestrator module that can run independently.
|
|
||||||
|
|
||||||
#### Tasks
|
|
||||||
|
|
||||||
1. **Create `types.ts`** (~200 lines)
|
|
||||||
- [ ] Define SSE event types
|
|
||||||
- [ ] Define tool call state types
|
|
||||||
- [ ] Define streaming context type
|
|
||||||
- [ ] Define orchestrator request/response types
|
|
||||||
- [ ] Define execution context type
|
|
||||||
|
|
||||||
2. **Create `sse-parser.ts`** (~80 lines)
|
|
||||||
- [ ] Extract parsing logic from store.ts
|
|
||||||
- [ ] Add abort signal support
|
|
||||||
- [ ] Add error handling
|
|
||||||
|
|
||||||
3. **Create `persistence.ts`** (~120 lines)
|
|
||||||
- [ ] Extract DB operations from chat route
|
|
||||||
- [ ] Extract Redis operations from confirm route
|
|
||||||
- [ ] Add chat creation/loading
|
|
||||||
- [ ] Add message saving
|
|
||||||
|
|
||||||
4. **Create `tool-executor.ts`** (~300 lines)
|
|
||||||
- [ ] Create `executeToolServerSide()` main entry
|
|
||||||
- [ ] Create `executeServerToolDirect()` for server tools
|
|
||||||
- [ ] Create `executeIntegrationToolDirect()` for integration tools
|
|
||||||
- [ ] Create `markToolComplete()` for copilot backend notification
|
|
||||||
- [ ] Create `prepareExecutionContext()` for caching
|
|
||||||
- [ ] Handle OAuth token resolution
|
|
||||||
- [ ] Handle env var resolution
|
|
||||||
|
|
||||||
5. **Create `sse-handlers.ts`** (~350 lines)
|
|
||||||
- [ ] Extract handlers from store.ts
|
|
||||||
- [ ] Adapt for server-side context
|
|
||||||
- [ ] Add tool execution integration
|
|
||||||
- [ ] Add subagent handlers
|
|
||||||
|
|
||||||
6. **Create `index.ts`** (~250 lines)
|
|
||||||
- [ ] Create `orchestrateCopilotRequest()` main function
|
|
||||||
- [ ] Wire together all modules
|
|
||||||
- [ ] Add timeout handling
|
|
||||||
- [ ] Add abort signal support
|
|
||||||
- [ ] Add event forwarding
|
|
||||||
|
|
||||||
#### Deliverables
|
|
||||||
|
|
||||||
- Complete `lib/copilot/orchestrator/` module
|
|
||||||
- Unit tests for each component
|
|
||||||
- Integration test for full orchestration
|
|
||||||
|
|
||||||
### Phase 2: Create Headless API Route (1 day)
|
|
||||||
|
|
||||||
**Goal**: Create API endpoint for headless copilot access.
|
|
||||||
|
|
||||||
#### Tasks
|
|
||||||
|
|
||||||
1. **Create route** `app/api/v1/copilot/chat/route.ts` (~100 lines)
|
|
||||||
- [ ] Add API key authentication
|
|
||||||
- [ ] Parse and validate request
|
|
||||||
- [ ] Call orchestrator
|
|
||||||
- [ ] Return JSON response
|
|
||||||
|
|
||||||
2. **Add to API documentation**
|
|
||||||
- [ ] Document request format
|
|
||||||
- [ ] Document response format
|
|
||||||
- [ ] Document error codes
|
|
||||||
|
|
||||||
#### Deliverables
|
|
||||||
|
|
||||||
- Working `POST /api/v1/copilot/chat` endpoint
|
|
||||||
- API documentation
|
|
||||||
- E2E test
|
|
||||||
|
|
||||||
### Phase 3: Wire Interactive Route (2 days)
|
|
||||||
|
|
||||||
**Goal**: Use orchestrator for existing interactive flow.
|
|
||||||
|
|
||||||
#### Tasks
|
|
||||||
|
|
||||||
1. **Modify `/api/copilot/chat/route.ts`**
|
|
||||||
- [ ] Add feature flag for new vs old flow
|
|
||||||
- [ ] Call orchestrator with `onEvent` callback
|
|
||||||
- [ ] Forward events to client via SSE
|
|
||||||
- [ ] Maintain backward compatibility
|
|
||||||
|
|
||||||
2. **Test both flows**
|
|
||||||
- [ ] Verify interactive works with new orchestrator
|
|
||||||
- [ ] Verify old flow still works (feature flag off)
|
|
||||||
|
|
||||||
#### Deliverables
|
|
||||||
|
|
||||||
- Interactive route using orchestrator
|
|
||||||
- Feature flag for gradual rollout
|
|
||||||
- No breaking changes
|
|
||||||
|
|
||||||
### Phase 4: Simplify Client Store (2-3 days)
|
|
||||||
|
|
||||||
**Goal**: Remove orchestration logic from the client; keep the store UI-only.
|
|
||||||
#### Tasks
|
|
||||||
|
|
||||||
1. **Create simplified store** (new file or gradual refactor)
|
|
||||||
- [ ] Keep: UI state, messages, tool display
|
|
||||||
- [ ] Keep: Simple API calls
|
|
||||||
- [ ] Keep: Event listener
|
|
||||||
- [ ] Remove: SSE parsing
|
|
||||||
- [ ] Remove: Tool execution logic
|
|
||||||
- [ ] Remove: Client tool instantiators
|
|
||||||
|
|
||||||
2. **Update components**
|
|
||||||
- [ ] Update components to use simplified store
|
|
||||||
- [ ] Remove tool execution from UI components
|
|
||||||
- [ ] Simplify tool display components
|
|
||||||
|
|
||||||
3. **Remove dead code**
|
|
||||||
- [ ] Remove unused imports
|
|
||||||
- [ ] Remove unused helper functions
|
|
||||||
- [ ] Remove client tool classes (if no longer needed)
|
|
||||||
|
|
||||||
#### Deliverables
|
|
||||||
|
|
||||||
- Simplified store (~600 lines)
|
|
||||||
- Updated components
|
|
||||||
- Reduced bundle size
|
|
||||||
|
|
||||||
### Phase 5: Testing & Polish (2-3 days)
|
|
||||||
|
|
||||||
#### Tasks
|
|
||||||
|
|
||||||
1. **E2E testing**
|
|
||||||
- [ ] Test headless API with various prompts
|
|
||||||
- [ ] Test interactive with various prompts
|
|
||||||
- [ ] Test tool execution scenarios
|
|
||||||
- [ ] Test error handling
|
|
||||||
- [ ] Test abort/timeout scenarios
|
|
||||||
|
|
||||||
2. **Performance testing**
|
|
||||||
- [ ] Compare latency (old vs new)
|
|
||||||
- [ ] Check memory usage
|
|
||||||
- [ ] Check for connection issues
|
|
||||||
|
|
||||||
3. **Documentation**
|
|
||||||
- [ ] Update developer docs
|
|
||||||
- [ ] Add architecture diagram
|
|
||||||
- [ ] Document new API
|
|
||||||
|
|
||||||
#### Deliverables
|
|
||||||
|
|
||||||
- Comprehensive test suite
|
|
||||||
- Performance benchmarks
|
|
||||||
- Complete documentation
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## API Contracts
|
|
||||||
|
|
||||||
### Headless API
|
|
||||||
|
|
||||||
#### Request
|
|
||||||
|
|
||||||
```http
|
|
||||||
POST /api/v1/copilot/chat
|
|
||||||
Content-Type: application/json
|
|
||||||
X-API-Key: sim_xxx
|
|
||||||
|
|
||||||
{
|
|
||||||
"message": "Create a Slack notification workflow",
|
|
||||||
"workflowId": "wf_abc123",
|
|
||||||
"chatId": "chat_xyz", // Optional: continue existing chat
|
|
||||||
"mode": "agent", // Optional: "agent" | "ask" | "plan"
|
|
||||||
"model": "claude-4-sonnet", // Optional
|
|
||||||
"autoExecuteTools": true, // Optional: default true
|
|
||||||
"timeout": 300000 // Optional: default 5 minutes
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Response (Success)
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"success": true,
|
|
||||||
"content": "I've created a Slack notification workflow that...",
|
|
||||||
"toolCalls": [
|
|
||||||
{
|
|
||||||
"id": "tc_001",
|
|
||||||
"name": "search_patterns",
|
|
||||||
"status": "success",
|
|
||||||
"params": { "query": "slack notification" },
|
|
||||||
"result": { "patterns": [...] },
|
|
||||||
"durationMs": 234
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "tc_002",
|
|
||||||
"name": "edit_workflow",
|
|
||||||
"status": "success",
|
|
||||||
"params": { "operations": [...] },
|
|
||||||
"result": { "blocksAdded": 3 },
|
|
||||||
"durationMs": 1523
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"chatId": "chat_xyz",
|
|
||||||
"conversationId": "conv_123"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Response (Error)
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"success": false,
|
|
||||||
"error": "Workflow not found",
|
|
||||||
"content": "",
|
|
||||||
"toolCalls": []
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Error Codes
|
|
||||||
|
|
||||||
| Status | Error | Description |
|
|
||||||
|--------|-------|-------------|
|
|
||||||
| 400 | Invalid request | Missing required fields |
|
|
||||||
| 401 | Unauthorized | Invalid or missing API key |
|
|
||||||
| 404 | Workflow not found | Workflow ID doesn't exist |
|
|
||||||
| 500 | Internal error | Server-side failure |
|
|
||||||
| 504 | Timeout | Request exceeded timeout |
|
|
||||||
|
|
||||||
### Interactive API (Existing - Modified)
|
|
||||||
|
|
||||||
The existing `/api/copilot/chat` endpoint continues to work but now uses the orchestrator internally. SSE events forwarded to the client remain in the same format.
|
|
||||||
---
|
|
||||||
|
|
||||||
## Migration Strategy
|
|
||||||
|
|
||||||
### Rollout Plan
|
|
||||||
|
|
||||||
```
|
|
||||||
Week 1: Phase 1 (Orchestrator)
|
|
||||||
├── Day 1-2: Types + SSE Parser
|
|
||||||
├── Day 3: Tool Executor
|
|
||||||
└── Day 4-5: Handlers + Main Orchestrator
|
|
||||||
|
|
||||||
Week 2: Phase 2-3 (Routes)
|
|
||||||
├── Day 1: Headless API route
|
|
||||||
├── Day 2-3: Wire interactive route
|
|
||||||
└── Day 4-5: Testing both modes
|
|
||||||
|
|
||||||
Week 3: Phase 4-5 (Cleanup)
|
|
||||||
├── Day 1-3: Simplify store
|
|
||||||
├── Day 4: Testing
|
|
||||||
└── Day 5: Documentation
|
|
||||||
```
|
|
||||||
|
|
||||||
### Feature Flags
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
// lib/copilot/config.ts
|
|
||||||
|
|
||||||
export const COPILOT_FLAGS = {
|
|
||||||
// Use new orchestrator for interactive mode
|
|
||||||
USE_SERVER_ORCHESTRATOR: process.env.COPILOT_USE_SERVER_ORCHESTRATOR === 'true',
|
|
||||||
|
|
||||||
// Enable headless API
|
|
||||||
ENABLE_HEADLESS_API: process.env.COPILOT_ENABLE_HEADLESS_API === 'true',
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Rollback Plan
|
|
||||||
|
|
||||||
If issues arise:
|
|
||||||
1. Set `COPILOT_USE_SERVER_ORCHESTRATOR=false`
|
|
||||||
2. Interactive mode falls back to old client-side flow
|
|
||||||
3. Headless API returns 503 Service Unavailable
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Testing Strategy
|
|
||||||
|
|
||||||
### Unit Tests
|
|
||||||
|
|
||||||
```
|
|
||||||
lib/copilot/orchestrator/
|
|
||||||
├── __tests__/
|
|
||||||
│ ├── sse-parser.test.ts
|
|
||||||
│ ├── sse-handlers.test.ts
|
|
||||||
│ ├── tool-executor.test.ts
|
|
||||||
│ ├── persistence.test.ts
|
|
||||||
│ └── index.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
#### SSE Parser Tests
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
describe('parseSSEStream', () => {
|
|
||||||
it('parses content events')
|
|
||||||
it('parses tool_call events')
|
|
||||||
it('handles partial lines')
|
|
||||||
it('handles malformed JSON')
|
|
||||||
it('respects abort signal')
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Tool Executor Tests
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
describe('executeToolServerSide', () => {
|
|
||||||
it('executes server tools directly')
|
|
||||||
it('executes integration tools with OAuth')
|
|
||||||
it('resolves env var references')
|
|
||||||
it('handles tool not found')
|
|
||||||
it('handles execution errors')
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
### Integration Tests
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
describe('orchestrateCopilotRequest', () => {
|
|
||||||
it('handles simple message without tools')
|
|
||||||
it('handles message with single tool call')
|
|
||||||
it('handles message with multiple tool calls')
|
|
||||||
it('handles subagent tool calls')
|
|
||||||
it('handles stream errors')
|
|
||||||
it('respects timeout')
|
|
||||||
it('forwards events to callback')
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
### E2E Tests
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
describe('POST /api/v1/copilot/chat', () => {
|
|
||||||
it('returns 401 without API key')
|
|
||||||
it('returns 400 with invalid request')
|
|
||||||
it('executes simple ask query')
|
|
||||||
it('executes workflow modification')
|
|
||||||
it('handles tool execution')
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Risks & Mitigations
|
|
||||||
|
|
||||||
### Risk 1: Breaking Interactive Mode
|
|
||||||
|
|
||||||
**Risk**: Refactoring could break existing interactive copilot.
|
|
||||||
|
|
||||||
**Mitigation**:
|
|
||||||
- Feature flag for gradual rollout
|
|
||||||
- Keep old code path available
|
|
||||||
- Extensive E2E testing
|
|
||||||
- Staged deployment (internal → beta → production)
|
|
||||||
|
|
||||||
### Risk 2: Tool Execution Differences
|
|
||||||
|
|
||||||
**Risk**: Tool behavior differs between client and server execution.
|
|
||||||
|
|
||||||
**Mitigation**:
|
|
||||||
- Reuse existing tool execution logic (same functions)
|
|
||||||
- Compare outputs in parallel testing
|
|
||||||
- Log discrepancies for investigation
|
|
||||||
|
|
||||||
### Risk 3: Performance Regression
|
|
||||||
|
|
||||||
**Risk**: Server-side orchestration could be slower.
|
|
||||||
|
|
||||||
**Mitigation**:
|
|
||||||
- Server-side orchestration should actually be faster (no browser round-trips)
- Benchmark before/after
|
|
||||||
- Profile critical paths
|
|
||||||
|
|
||||||
### Risk 4: Memory Usage
|
|
||||||
|
|
||||||
**Risk**: Server accumulates state during long-running requests.
|
|
||||||
|
|
||||||
**Mitigation**:
|
|
||||||
- Set reasonable timeouts
|
|
||||||
- Clean up context after request
|
|
||||||
- Monitor memory in production
|
|
||||||
|
|
||||||
### Risk 5: Connection Issues
|
|
||||||
|
|
||||||
**Risk**: Long-running SSE connections could drop.
|
|
||||||
|
|
||||||
**Mitigation**:
|
|
||||||
- Implement reconnection logic
|
|
||||||
- Save checkpoints to resume
|
|
||||||
- Handle partial completions gracefully
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## File Inventory
|
|
||||||
|
|
||||||
### New Files
|
|
||||||
|
|
||||||
| File | Lines | Description |
|
|
||||||
|------|-------|-------------|
|
|
||||||
| `lib/copilot/orchestrator/types.ts` | ~200 | Type definitions |
|
|
||||||
| `lib/copilot/orchestrator/sse-parser.ts` | ~80 | SSE stream parsing |
|
|
||||||
| `lib/copilot/orchestrator/sse-handlers.ts` | ~350 | Event handlers |
|
|
||||||
| `lib/copilot/orchestrator/tool-executor.ts` | ~300 | Tool execution |
|
|
||||||
| `lib/copilot/orchestrator/persistence.ts` | ~120 | DB/Redis operations |
|
|
||||||
| `lib/copilot/orchestrator/index.ts` | ~250 | Main orchestrator |
|
|
||||||
| `app/api/v1/copilot/chat/route.ts` | ~100 | Headless API |
|
|
||||||
| **Total New** | **~1,400** | |
|
|
||||||
|
|
||||||
### Modified Files
|
|
||||||
|
|
||||||
| File | Change |
|
|
||||||
|------|--------|
|
|
||||||
| `app/api/copilot/chat/route.ts` | Use orchestrator (optional) |
|
|
||||||
| `stores/panel/copilot/store.ts` | Simplify to ~600 lines |
|
|
||||||
|
|
||||||
### Deleted Code (from store.ts)
|
|
||||||
|
|
||||||
| Section | Lines Removed |
|
|
||||||
|---------|---------------|
|
|
||||||
| SSE parsing logic | ~150 |
|
|
||||||
| `sseHandlers` object | ~750 |
|
|
||||||
| `subAgentSSEHandlers` | ~280 |
|
|
||||||
| Tool execution logic | ~400 |
|
|
||||||
| Client tool instantiators | ~120 |
|
|
||||||
| Content block helpers | ~200 |
|
|
||||||
| Streaming context | ~100 |
|
|
||||||
| **Total Removed** | **~2,000** |
|
|
||||||
|
|
||||||
### Net Change
|
|
||||||
|
|
||||||
```
|
|
||||||
New code: +1,400 lines (orchestrator module)
|
|
||||||
Removed code: -2,000 lines (from store)
|
|
||||||
Modified code: ~200 lines (route changes)
|
|
||||||
───────────────────────────────────────
|
|
||||||
Net change: -400 lines (cleaner, more maintainable)
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Appendix: Code Extraction Map
|
|
||||||
|
|
||||||
### From `stores/panel/copilot/store.ts`
|
|
||||||
|
|
||||||
| Source Lines | Destination | Notes |
|
|
||||||
|--------------|-------------|-------|
|
|
||||||
| 900-1050 (parseSSEStream) | `sse-parser.ts` | Adapt for server |
|
|
||||||
| 1120-1867 (sseHandlers) | `sse-handlers.ts` | Remove Zustand deps |
|
|
||||||
| 1940-2217 (subAgentSSEHandlers) | `sse-handlers.ts` | Merge with above |
|
|
||||||
| 1365-1583 (tool execution) | `tool-executor.ts` | Direct calls |
|
|
||||||
| 330-380 (StreamingContext) | `types.ts` | Clean up |
|
|
||||||
| 3328-3648 (handleStreamingResponse) | `index.ts` | Main loop |
|
|
||||||
|
|
||||||
### From `app/api/copilot/execute-tool/route.ts`
|
|
||||||
|
|
||||||
| Source Lines | Destination | Notes |
|
|
||||||
|--------------|-------------|-------|
|
|
||||||
| 30-247 (POST handler) | `tool-executor.ts` | Extract core logic |
|
|
||||||
|
|
||||||
### From `app/api/copilot/confirm/route.ts`
|
|
||||||
|
|
||||||
| Source Lines | Destination | Notes |
|
|
||||||
|--------------|-------------|-------|
|
|
||||||
| 28-89 (updateToolCallStatus) | `persistence.ts` | Redis operations |
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Approval & Sign-off
|
|
||||||
|
|
||||||
- [ ] Technical review complete
|
|
||||||
- [ ] Security review complete
|
|
||||||
- [ ] Performance impact assessed
|
|
||||||
- [ ] Rollback plan approved
|
|
||||||
- [ ] Testing plan approved
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
*Document created: January 2026*
|
|
||||||
*Last updated: January 2026*
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user