Mirror of https://github.com/simstudioai/sim.git (synced 2026-01-25 14:58:14 -05:00)

Compare commits: fix/docs...feat/copil

3 Commits

| Author | SHA1 | Date |
|---|---|---|
| | f28bbacb32 | |
| | 41d7c2847d | |
| | 5da1dfb5e4 | |
@@ -0,0 +1,86 @@
/**
 * GET /api/copilot/chat/[chatId]/active-stream
 *
 * Check if a chat has an active stream that can be resumed.
 * Used by the client on page load to detect if there's an in-progress stream.
 */

import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import {
  getActiveStreamForChat,
  getChunkCount,
  getStreamMeta,
} from '@/lib/copilot/stream-persistence'

const logger = createLogger('CopilotActiveStreamAPI')

export async function GET(
  req: NextRequest,
  { params }: { params: Promise<{ chatId: string }> }
) {
  const session = await getSession()
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { chatId } = await params

  logger.info('Active stream check', { chatId, userId: session.user.id })

  // Look up active stream ID from Redis
  const streamId = await getActiveStreamForChat(chatId)

  if (!streamId) {
    logger.debug('No active stream found', { chatId })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Get stream metadata
  const meta = await getStreamMeta(streamId)

  if (!meta) {
    logger.debug('Stream metadata not found', { streamId, chatId })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Verify the stream is still active
  if (meta.status !== 'streaming') {
    logger.debug('Stream not active', { streamId, chatId, status: meta.status })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Verify ownership
  if (meta.userId !== session.user.id) {
    logger.warn('Stream belongs to different user', {
      streamId,
      chatId,
      requesterId: session.user.id,
      ownerId: meta.userId,
    })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Get current chunk count for client to track progress
  const chunkCount = await getChunkCount(streamId)

  logger.info('Active stream found', {
    streamId,
    chatId,
    chunkCount,
    toolCallsCount: meta.toolCalls.length,
  })

  return NextResponse.json({
    hasActiveStream: true,
    streamId,
    chunkCount,
    toolCalls: meta.toolCalls.filter(
      (tc) => tc.state === 'pending' || tc.state === 'executing'
    ),
    createdAt: meta.createdAt,
    updatedAt: meta.updatedAt,
  })
}
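A minimal sketch of how a client could consume this endpoint on page load. The response fields (hasActiveStream, streamId, chunkCount) match the route above; the helper name and surrounding wiring are hypothetical and not part of this diff.

// Hypothetical client helper: check whether the current chat has a resumable stream.
export async function checkForActiveStream(chatId: string) {
  const res = await fetch(`/api/copilot/chat/${chatId}/active-stream`, {
    credentials: 'include', // session-cookie auth, as in the other copilot endpoints
  })
  if (!res.ok) return null

  const data = await res.json()
  if (!data.hasActiveStream) return null

  // streamId can be passed to /api/copilot/stream/[streamId]?from=<chunkCount> to resume.
  return { streamId: data.streamId as string, chunkCount: data.chunkCount as number }
}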
@@ -2,6 +2,7 @@ import { db } from '@sim/db'
 import { copilotChats } from '@sim/db/schema'
 import { createLogger } from '@sim/logger'
 import { and, desc, eq } from 'drizzle-orm'
+import { after } from 'next/server'
 import { type NextRequest, NextResponse } from 'next/server'
 import { z } from 'zod'
 import { getSession } from '@/lib/auth'

@@ -16,6 +17,21 @@ import {
   createRequestTracker,
   createUnauthorizedResponse,
 } from '@/lib/copilot/request-helpers'
+import {
+  type RenderEvent,
+  serializeRenderEvent,
+} from '@/lib/copilot/render-events'
+import {
+  appendChunk,
+  appendContent,
+  checkAbortSignal,
+  completeStream,
+  createStream,
+  errorStream,
+  refreshStreamTTL,
+  updateToolCall,
+} from '@/lib/copilot/stream-persistence'
+import { transformStream } from '@/lib/copilot/stream-transformer'
 import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
 import type { CopilotProviderConfig } from '@/lib/copilot/types'
 import { env } from '@/lib/core/config/env'
@@ -492,385 +508,205 @@ export async function POST(req: NextRequest) {
      )
    }

    // If streaming is requested, start background processing and return streamId immediately
    if (stream && simAgentResponse.body) {
      // Create stream ID for persistence and resumption
      const streamId = crypto.randomUUID()

      // Initialize stream state in Redis
      await createStream({
        streamId,
        chatId: actualChatId!,
        userId: authenticatedUserId,
        workflowId,
        userMessageId: userMessageIdToUse,
        isClientSession: true,
      })

      // Save user message to database immediately so it's available on refresh
      // This is critical for stream resumption - user message must be persisted before stream starts
      if (currentChat) {
        const existingMessages = Array.isArray(currentChat.messages) ? currentChat.messages : []
        const userMessage = {
          id: userMessageIdToUse,
          role: 'user',
          content: message,
          timestamp: new Date().toISOString(),
          ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
          ...(Array.isArray(contexts) && contexts.length > 0 && { contexts }),
          ...(Array.isArray(contexts) &&
            contexts.length > 0 && {
              contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }],
            }),
        }

        await db
          .update(copilotChats)
          .set({
            messages: [...existingMessages, userMessage],
            updatedAt: new Date(),
          })
          .where(eq(copilotChats.id, actualChatId!))

        logger.info(`[${tracker.requestId}] Saved user message before streaming`, {
          chatId: actualChatId,
          messageId: userMessageIdToUse,
        })
      }

      // Track last TTL refresh time
      const TTL_REFRESH_INTERVAL = 60000 // Refresh TTL every minute

      // Capture needed values for background task
      const capturedChatId = actualChatId!
      const capturedCurrentChat = currentChat

      // Generate assistant message ID upfront
      const assistantMessageId = crypto.randomUUID()

      // Start background processing task using the stream transformer
      // This processes the Sim Agent stream, executes tools, and emits render events
      // Client will connect to /api/copilot/stream/{streamId} for live updates
      const backgroundTask = (async () => {
        // Start title generation if needed (runs in parallel)
        if (capturedChatId && !capturedCurrentChat?.title && conversationHistory.length === 0) {
          generateChatTitle(message)
            .then(async (title) => {
              if (title) {
                await db
                  .update(copilotChats)
                  .set({ title, updatedAt: new Date() })
                  .where(eq(copilotChats.id, capturedChatId))
                logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
              }
            })
            .catch((error) => {
              logger.error(`[${tracker.requestId}] Title generation failed:`, error)
            })
        }

        // Track accumulated content for final persistence
        let accumulatedContent = ''
        const accumulatedToolCalls: Array<{
          id: string
          name: string
          args: Record<string, unknown>
          state: string
          result?: unknown
        }> = []

        try {
          // Use the stream transformer to process the Sim Agent stream
          await transformStream(simAgentResponse.body!, {
            streamId,
            chatId: capturedChatId,
            userId: authenticatedUserId,
            workflowId,
            userMessageId: userMessageIdToUse,
            assistantMessageId,

            // Emit render events to Redis for client consumption
            onRenderEvent: async (event: RenderEvent) => {
              // Serialize and append to Redis
              const serialized = serializeRenderEvent(event)
              await appendChunk(streamId, serialized).catch(() => {})

              // Also update stream metadata for specific events
              switch (event.type) {
                case 'text_delta':
                  accumulatedContent += (event as any).content || ''
                  appendContent(streamId, (event as any).content || '').catch(() => {})
                  break
                case 'tool_pending':
                  updateToolCall(streamId, (event as any).toolCallId, {
                    id: (event as any).toolCallId,
                    name: (event as any).toolName,
                    args: (event as any).args || {},
                    state: 'pending',
                  }).catch(() => {})
                  break
                case 'tool_executing':
                  updateToolCall(streamId, (event as any).toolCallId, {
                    state: 'executing',
                  }).catch(() => {})
                  break
                case 'tool_success':
                  updateToolCall(streamId, (event as any).toolCallId, {
                    state: 'success',
                    result: (event as any).result,
                  }).catch(() => {})
                  accumulatedToolCalls.push({
                    id: (event as any).toolCallId,
                    name: (event as any).display?.label || '',
                    args: {},
                    state: 'success',
                    result: (event as any).result,
                  })
                  break
                case 'tool_error':
                  updateToolCall(streamId, (event as any).toolCallId, {
                    state: 'error',
                    error: (event as any).error,
                  }).catch(() => {})
                  accumulatedToolCalls.push({
                    id: (event as any).toolCallId,
                    name: (event as any).display?.label || '',
                    args: {},
                    state: 'error',
                  })
                  break
              }
            },

            // Persist data at key moments
            onPersist: async (data) => {
              if (data.type === 'message_complete') {
                // Stream complete - save final message to DB
                await completeStream(streamId, undefined)
              }
            },

            // Check for user-initiated abort
            isAborted: () => {
              // We'll check Redis for abort signal synchronously cached
              // For now, return false - proper abort checking can be async in transformer
              return false
            },
          })

          // Update chat with conversationId if available
          if (capturedCurrentChat) {
            await db
              .update(copilotChats)
              .set({ updatedAt: new Date() })
              .where(eq(copilotChats.id, capturedChatId))
          }

          logger.info(`[${tracker.requestId}] Background stream processing complete`, {
            streamId,
            contentLength: accumulatedContent.length,
            toolCallsCount: accumulatedToolCalls.length,
          })
        } catch (error) {
          logger.error(`[${tracker.requestId}] Background stream error`, { streamId, error })
          await errorStream(streamId, error instanceof Error ? error.message : 'Unknown error')
        }
      })()

      // Use after() to ensure background task completes even after response is sent
      after(() => backgroundTask)

      // Return streamId immediately - client will connect to stream endpoint
      logger.info(`[${tracker.requestId}] Returning streamId for client to connect`, {
        streamId,
        chatId: capturedChatId,
      })

      return NextResponse.json({
        success: true,
        streamId,
        chatId: capturedChatId,
      })
    }

    // For non-streaming responses
@@ -899,7 +735,7 @@ export async function POST(req: NextRequest) {
     // Save messages if we have a chat
     if (currentChat && responseData.content) {
       const userMessage = {
-        id: userMessageIdToUse, // Consistent ID used for request and persistence
+        id: userMessageIdToUse,
         role: 'user',
         content: message,
         timestamp: new Date().toISOString(),
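For orientation, a sketch of the request flow the new streaming branch above establishes. The fetch calls are illustrative assumptions about client usage (the real client helper appears later in this diff as sendStreamingMessage); only the endpoint paths and response fields come from the routes themselves.

// Step 1: POST /api/copilot/chat with stream: true now returns JSON, not SSE.
const init = await fetch('/api/copilot/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  credentials: 'include',
  body: JSON.stringify({ message: 'Hello', stream: true }), // other request fields omitted
})
const { streamId, chatId } = await init.json()

// Step 2: the SSE stream is served separately, so server-side processing
// survives a client disconnect and can be resumed later.
const sse = await fetch(`/api/copilot/stream/${streamId}`, {
  headers: { Accept: 'text/event-stream' },
  credentials: 'include',
})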
apps/sim/app/api/copilot/stream/[streamId]/abort/route.ts (new file, +64)
@@ -0,0 +1,64 @@
/**
 * POST /api/copilot/stream/[streamId]/abort
 *
 * Signal the server to abort an active stream.
 * The original request handler will check for this signal and cancel the stream.
 */

import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { getStreamMeta, setAbortSignal } from '@/lib/copilot/stream-persistence'

const logger = createLogger('CopilotStreamAbortAPI')

export async function POST(
  req: NextRequest,
  { params }: { params: Promise<{ streamId: string }> }
) {
  const session = await getSession()
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { streamId } = await params

  logger.info('Stream abort request', { streamId, userId: session.user.id })

  const meta = await getStreamMeta(streamId)

  if (!meta) {
    logger.info('Stream not found for abort', { streamId })
    return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
  }

  // Verify ownership
  if (meta.userId !== session.user.id) {
    logger.warn('Unauthorized abort attempt', {
      streamId,
      requesterId: session.user.id,
      ownerId: meta.userId,
    })
    return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
  }

  // Stream already finished
  if (meta.status !== 'streaming') {
    logger.info('Stream already finished, nothing to abort', {
      streamId,
      status: meta.status,
    })
    return NextResponse.json({
      success: true,
      message: 'Stream already finished',
    })
  }

  // Set abort signal in Redis
  await setAbortSignal(streamId)

  logger.info('Abort signal set for stream', { streamId })

  return NextResponse.json({ success: true })
}
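A minimal caller for the abort endpoint, assuming cookie-based session auth as in the route above; the helper name is hypothetical and not part of this diff.

// Hypothetical client helper: ask the server to stop an in-flight stream.
// Aborting is a POST with no body; the background task observes the Redis abort flag.
async function abortCopilotStream(streamId: string): Promise<boolean> {
  const res = await fetch(`/api/copilot/stream/${streamId}/abort`, {
    method: 'POST',
    credentials: 'include',
  })
  // 404 means the stream already expired; 403 means it belongs to another user.
  return res.ok
}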
apps/sim/app/api/copilot/stream/[streamId]/pending-diff/route.ts (new file, +146)
@@ -0,0 +1,146 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import {
  clearPendingDiff,
  getPendingDiff,
  getStreamMeta,
  setPendingDiff,
} from '@/lib/copilot/stream-persistence'

const logger = createLogger('CopilotPendingDiffAPI')

/**
 * GET /api/copilot/stream/[streamId]/pending-diff
 * Retrieve pending diff state for a stream (used for resumption after page refresh)
 */
export async function GET(
  request: Request,
  { params }: { params: Promise<{ streamId: string }> }
) {
  try {
    const session = await getSession()
    if (!session?.user?.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const { streamId } = await params
    if (!streamId) {
      return NextResponse.json({ error: 'Stream ID required' }, { status: 400 })
    }

    // Verify user owns this stream
    const meta = await getStreamMeta(streamId)
    if (!meta) {
      return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
    }

    if (meta.userId !== session.user.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
    }

    // Get pending diff
    const pendingDiff = await getPendingDiff(streamId)

    if (!pendingDiff) {
      return NextResponse.json({ pendingDiff: null })
    }

    logger.info('Retrieved pending diff', {
      streamId,
      toolCallId: pendingDiff.toolCallId,
    })

    return NextResponse.json({ pendingDiff })
  } catch (error) {
    logger.error('Failed to get pending diff', { error })
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}

/**
 * POST /api/copilot/stream/[streamId]/pending-diff
 * Store pending diff state for a stream
 */
export async function POST(
  request: Request,
  { params }: { params: Promise<{ streamId: string }> }
) {
  try {
    const session = await getSession()
    if (!session?.user?.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const { streamId } = await params
    if (!streamId) {
      return NextResponse.json({ error: 'Stream ID required' }, { status: 400 })
    }

    // Verify user owns this stream
    const meta = await getStreamMeta(streamId)
    if (!meta) {
      return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
    }

    if (meta.userId !== session.user.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
    }

    const body = await request.json()
    const { pendingDiff } = body

    if (!pendingDiff || !pendingDiff.toolCallId) {
      return NextResponse.json({ error: 'Invalid pending diff data' }, { status: 400 })
    }

    await setPendingDiff(streamId, pendingDiff)

    logger.info('Stored pending diff', {
      streamId,
      toolCallId: pendingDiff.toolCallId,
    })

    return NextResponse.json({ success: true })
  } catch (error) {
    logger.error('Failed to store pending diff', { error })
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}

/**
 * DELETE /api/copilot/stream/[streamId]/pending-diff
 * Clear pending diff state for a stream
 */
export async function DELETE(
  request: Request,
  { params }: { params: Promise<{ streamId: string }> }
) {
  try {
    const session = await getSession()
    if (!session?.user?.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const { streamId } = await params
    if (!streamId) {
      return NextResponse.json({ error: 'Stream ID required' }, { status: 400 })
    }

    // Verify user owns this stream (if it exists - might already be cleaned up)
    const meta = await getStreamMeta(streamId)
    if (meta && meta.userId !== session.user.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
    }

    await clearPendingDiff(streamId)

    logger.info('Cleared pending diff', { streamId })

    return NextResponse.json({ success: true })
  } catch (error) {
    logger.error('Failed to clear pending diff', { error })
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}
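Hypothetical helpers (not part of the diff) showing the three verbs of the pending-diff API from the client side. The only contract the route enforces on the payload is a { pendingDiff } object carrying a toolCallId.

async function savePendingDiff(streamId: string, pendingDiff: { toolCallId: string }) {
  // POST stores the diff state so it can be restored after a page refresh.
  await fetch(`/api/copilot/stream/${streamId}/pending-diff`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    credentials: 'include',
    body: JSON.stringify({ pendingDiff }),
  })
}

async function loadPendingDiff(streamId: string) {
  // GET returns { pendingDiff: null } when nothing is stored.
  const res = await fetch(`/api/copilot/stream/${streamId}/pending-diff`, { credentials: 'include' })
  if (!res.ok) return null
  const { pendingDiff } = await res.json()
  return pendingDiff
}

async function clearStoredPendingDiff(streamId: string) {
  // DELETE is safe to call even if the stream metadata has already expired.
  await fetch(`/api/copilot/stream/${streamId}/pending-diff`, { method: 'DELETE', credentials: 'include' })
}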
apps/sim/app/api/copilot/stream/[streamId]/route.ts (new file, +160)
@@ -0,0 +1,160 @@
/**
 * GET /api/copilot/stream/[streamId]
 *
 * Resume an active copilot stream.
 * - If stream is still active: returns SSE with replay of missed chunks + live updates via Redis Pub/Sub
 * - If stream is completed: returns JSON indicating to load from database
 * - If stream not found: returns 404
 */

import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import {
  getChunks,
  getStreamMeta,
  subscribeToStream,
} from '@/lib/copilot/stream-persistence'

const logger = createLogger('CopilotStreamResumeAPI')

const SSE_HEADERS = {
  'Content-Type': 'text/event-stream',
  'Cache-Control': 'no-cache',
  Connection: 'keep-alive',
  'X-Accel-Buffering': 'no',
}

export async function GET(
  req: NextRequest,
  { params }: { params: Promise<{ streamId: string }> }
) {
  const session = await getSession()
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { streamId } = await params
  const fromChunk = parseInt(req.nextUrl.searchParams.get('from') || '0')

  logger.info('Stream resume request', { streamId, fromChunk, userId: session.user.id })

  const meta = await getStreamMeta(streamId)

  if (!meta) {
    logger.info('Stream not found or expired', { streamId })
    return NextResponse.json(
      {
        status: 'not_found',
        message: 'Stream not found or expired. Reload chat from database.',
      },
      { status: 404 }
    )
  }

  // Verify ownership
  if (meta.userId !== session.user.id) {
    logger.warn('Unauthorized stream access attempt', {
      streamId,
      requesterId: session.user.id,
      ownerId: meta.userId,
    })
    return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
  }

  // Stream completed - tell client to load from database
  if (meta.status === 'completed') {
    logger.info('Stream already completed', { streamId, chatId: meta.chatId })
    return NextResponse.json({
      status: 'completed',
      chatId: meta.chatId,
      message: 'Stream completed. Messages saved to database.',
    })
  }

  // Stream errored
  if (meta.status === 'error') {
    logger.info('Stream encountered error', { streamId, chatId: meta.chatId })
    return NextResponse.json({
      status: 'error',
      chatId: meta.chatId,
      message: 'Stream encountered an error.',
    })
  }

  // Stream still active - return SSE with replay + live updates
  logger.info('Resuming active stream', { streamId, chatId: meta.chatId })

  const encoder = new TextEncoder()
  const abortController = new AbortController()

  // Handle client disconnect
  req.signal.addEventListener('abort', () => {
    logger.info('Client disconnected from resumed stream', { streamId })
    abortController.abort()
  })

  const responseStream = new ReadableStream({
    async start(controller) {
      try {
        // 1. Replay missed chunks (single read from Redis LIST)
        const missedChunks = await getChunks(streamId, fromChunk)
        logger.info('Replaying missed chunks', {
          streamId,
          fromChunk,
          missedChunkCount: missedChunks.length,
        })

        for (const chunk of missedChunks) {
          // Chunks are already in SSE format, just re-encode
          controller.enqueue(encoder.encode(chunk))
        }

        // 2. Subscribe to live chunks via Redis Pub/Sub (blocking, no polling)
        await subscribeToStream(
          streamId,
          (chunk) => {
            try {
              controller.enqueue(encoder.encode(chunk))
            } catch {
              // Client disconnected
              abortController.abort()
            }
          },
          () => {
            // Stream complete - close connection
            logger.info('Stream completed during resume', { streamId })
            try {
              controller.close()
            } catch {
              // Already closed
            }
          },
          abortController.signal
        )
      } catch (error) {
        logger.error('Error in stream resume', {
          streamId,
          error: error instanceof Error ? error.message : String(error),
        })
        try {
          controller.close()
        } catch {
          // Already closed
        }
      }
    },
    cancel() {
      abortController.abort()
    },
  })

  return new Response(responseStream, {
    headers: {
      ...SSE_HEADERS,
      'X-Stream-Id': streamId,
      'X-Chat-Id': meta.chatId,
    },
  })
}
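A hypothetical consumer for the resume endpoint above, for example after a page refresh. The ?from= parameter and the raw-SSE chunk format come from the route; chunkCount would come from the active-stream check, and the helper itself is not part of this diff.

async function resumeStream(streamId: string, chunkCount: number, onChunk: (sse: string) => void) {
  const res = await fetch(`/api/copilot/stream/${streamId}?from=${chunkCount}`, {
    headers: { Accept: 'text/event-stream' },
    credentials: 'include',
  })
  // Completed or errored streams return JSON instead of SSE, so bail out here.
  if (!res.ok || !res.body) return

  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // Raw SSE text, already framed by the server (replayed chunks first, then live ones).
    onChunk(decoder.decode(value, { stream: true }))
  }
}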
@@ -191,26 +191,10 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
   }, [isInitialized, messages.length, scrollToBottom])
 
   /**
-   * Cleanup on component unmount (page refresh, navigation, etc.)
-   * Uses a ref to track sending state to avoid stale closure issues
-   * Note: Parent workflow.tsx also has useStreamCleanup for page-level cleanup
+   * Note: We intentionally do NOT abort on component unmount.
+   * Streams continue server-side and can be resumed when user returns.
+   * The server persists chunks to Redis for resumption.
    */
-  const isSendingRef = useRef(isSendingMessage)
-  isSendingRef.current = isSendingMessage
-  const abortMessageRef = useRef(abortMessage)
-  abortMessageRef.current = abortMessage
-
-  useEffect(() => {
-    return () => {
-      // Use refs to check current values, not stale closure values
-      if (isSendingRef.current) {
-        abortMessageRef.current()
-        logger.info('Aborted active message streaming due to component unmount')
-      }
-    }
-    // Empty deps - only run cleanup on actual unmount, not on re-renders
-    // eslint-disable-next-line react-hooks/exhaustive-deps
-  }, [])
 
   /**
    * Container-level click capture to cancel edit mode when clicking outside the current edit area
@@ -434,13 +434,20 @@ const WorkflowContent = React.memo(() => {
     window.removeEventListener('open-oauth-connect', handleOpenOAuthConnect as EventListener)
   }, [])
 
-  const { diffAnalysis, isShowingDiff, isDiffReady, reapplyDiffMarkers, hasActiveDiff } =
-    useWorkflowDiffStore(
+  const {
+    diffAnalysis,
+    isShowingDiff,
+    isDiffReady,
+    reapplyDiffMarkers,
+    hasActiveDiff,
+    restoreDiffFromMarkers,
+  } = useWorkflowDiffStore(
     useShallow((state) => ({
       diffAnalysis: state.diffAnalysis,
       isShowingDiff: state.isShowingDiff,
       isDiffReady: state.isDiffReady,
       reapplyDiffMarkers: state.reapplyDiffMarkers,
+      restoreDiffFromMarkers: state.restoreDiffFromMarkers,
       hasActiveDiff: state.hasActiveDiff,
     }))
   )

@@ -466,6 +473,16 @@ const WorkflowContent = React.memo(() => {
     }
   }, [blocks, hasActiveDiff, isDiffReady, reapplyDiffMarkers, isWorkflowReady])
 
+  /** Restore diff state from markers on page load if blocks have is_diff markers. */
+  const hasRestoredDiff = useRef(false)
+  useEffect(() => {
+    if (!isWorkflowReady || hasRestoredDiff.current || hasActiveDiff) return
+    // Check once when workflow becomes ready
+    hasRestoredDiff.current = true
+    // Delay slightly to ensure blocks are fully loaded
+    setTimeout(() => restoreDiffFromMarkers(), 100)
+  }, [isWorkflowReady, hasActiveDiff, restoreDiffFromMarkers])
+
   /** Reconstructs deleted edges for diff view and filters invalid edges. */
   const edgesForDisplay = useMemo(() => {
     let edgesToFilter = edges
@@ -3,6 +3,15 @@ import type { CopilotMode, CopilotModelId, CopilotTransportMode } from '@/lib/co
 
 const logger = createLogger('CopilotAPI')
 
+/**
+ * Response from chat initiation endpoint
+ */
+export interface ChatInitResponse {
+  success: boolean
+  streamId: string
+  chatId: string
+}
+
 /**
  * Citation interface for documentation references
  */
@@ -115,10 +124,16 @@ async function handleApiError(response: Response, defaultMessage: string): Promi
 /**
  * Send a streaming message to the copilot chat API
  * This is the main API endpoint that handles all chat operations
+ *
+ * Server-first architecture:
+ * 1. POST to /api/copilot/chat - starts background processing, returns { streamId, chatId }
+ * 2. Connect to /api/copilot/stream/{streamId} for SSE stream
+ *
+ * This ensures stream continues server-side even if client disconnects
  */
 export async function sendStreamingMessage(
   request: SendMessageRequest
-): Promise<StreamingResponse> {
+): Promise<StreamingResponse & { streamId?: string; chatId?: string }> {
   try {
     const { abortSignal, ...requestBody } = request
     try {

@@ -138,34 +153,83 @@ export async function sendStreamingMessage(
         contextsPreview: preview,
       })
     } catch {}
-    const response = await fetch('/api/copilot/chat', {
+
+    // Step 1: Initiate chat - server starts background processing
+    const initResponse = await fetch('/api/copilot/chat', {
       method: 'POST',
       headers: { 'Content-Type': 'application/json' },
       body: JSON.stringify({ ...requestBody, stream: true }),
-      signal: abortSignal,
-      credentials: 'include', // Include cookies for session authentication
+      credentials: 'include',
     })
 
-    if (!response.ok) {
-      const errorMessage = await handleApiError(response, 'Failed to send streaming message')
+    if (!initResponse.ok) {
+      const errorMessage = await handleApiError(initResponse, 'Failed to initiate chat')
       return {
         success: false,
         error: errorMessage,
-        status: response.status,
+        status: initResponse.status,
       }
     }
 
-    if (!response.body) {
+    const initData: ChatInitResponse = await initResponse.json()
+    if (!initData.success || !initData.streamId) {
       return {
         success: false,
-        error: 'No response body received',
+        error: 'Failed to get stream ID from server',
         status: 500,
       }
     }
 
+    logger.info('Chat initiated, connecting to stream', {
+      streamId: initData.streamId,
+      chatId: initData.chatId,
+    })
+
+    // Step 2: Connect to stream endpoint for SSE
+    const streamResponse = await fetch(`/api/copilot/stream/${initData.streamId}`, {
+      method: 'GET',
+      headers: { Accept: 'text/event-stream' },
+      signal: abortSignal,
+      credentials: 'include',
+    })
+
+    if (!streamResponse.ok) {
+      // Handle completed/not found cases
+      if (streamResponse.status === 404) {
+        return {
+          success: false,
+          error: 'Stream not found or expired',
+          status: 404,
+          streamId: initData.streamId,
+          chatId: initData.chatId,
+        }
+      }
+
+      const errorMessage = await handleApiError(streamResponse, 'Failed to connect to stream')
+      return {
+        success: false,
+        error: errorMessage,
+        status: streamResponse.status,
+        streamId: initData.streamId,
+        chatId: initData.chatId,
+      }
+    }
+
+    if (!streamResponse.body) {
+      return {
+        success: false,
+        error: 'No stream body received',
+        status: 500,
+        streamId: initData.streamId,
+        chatId: initData.chatId,
+      }
+    }
+
     return {
       success: true,
-      stream: response.body,
+      stream: streamResponse.body,
+      streamId: initData.streamId,
+      chatId: initData.chatId,
     }
   } catch (error) {
     // Handle AbortError gracefully - this is expected when user aborts
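A minimal sketch, not part of the diff, of feeding the stream returned by sendStreamingMessage into the ClientRenderer defined in the next file. It assumes each SSE frame is a single `data: <json>` line carrying one serialized RenderEvent, which is how the server-side appendChunk/serializeRenderEvent path appears to frame chunks; the helper name and framing details are assumptions.

async function pumpStreamIntoRenderer(stream: ReadableStream<Uint8Array>, renderer: ClientRenderer) {
  const reader = stream.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() || '' // keep any partial line for the next read
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue
      try {
        // Each frame is parsed and handed to the renderer's event queue.
        await renderer.processEvent(JSON.parse(line.slice(6)))
      } catch {
        // Ignore malformed frames rather than breaking the whole stream.
      }
    }
  }
}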
apps/sim/lib/copilot/client-renderer.ts (new file, +743)
@@ -0,0 +1,743 @@
|
|||||||
|
/**
|
||||||
|
* Client Renderer - Handles render events from the server
|
||||||
|
*
|
||||||
|
* This is the client-side counterpart to the stream transformer.
|
||||||
|
* It receives render events from the server and updates the UI accordingly.
|
||||||
|
* All business logic (tool execution, persistence) is handled server-side.
|
||||||
|
* The client just renders.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { createLogger } from '@sim/logger'
|
||||||
|
import type { RenderEvent, RenderEventType } from './render-events'
|
||||||
|
|
||||||
|
const logger = createLogger('ClientRenderer')
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Types
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
export interface RendererState {
|
||||||
|
// Stream state
|
||||||
|
streamId: string | null
|
||||||
|
chatId: string | null
|
||||||
|
isStreaming: boolean
|
||||||
|
isComplete: boolean
|
||||||
|
hasError: boolean
|
||||||
|
errorMessage: string | null
|
||||||
|
|
||||||
|
// Message state
|
||||||
|
currentMessageId: string | null
|
||||||
|
content: string
|
||||||
|
|
||||||
|
// Thinking state
|
||||||
|
isThinking: boolean
|
||||||
|
thinkingContent: string
|
||||||
|
|
||||||
|
// Tool calls
|
||||||
|
toolCalls: Map<string, ToolCallState>
|
||||||
|
|
||||||
|
// Plan state
|
||||||
|
isCapturingPlan: boolean
|
||||||
|
planContent: string
|
||||||
|
planTodos: PlanTodo[]
|
||||||
|
|
||||||
|
// Options state
|
||||||
|
isCapturingOptions: boolean
|
||||||
|
optionsContent: string
|
||||||
|
options: string[]
|
||||||
|
|
||||||
|
// Subagent state
|
||||||
|
activeSubagents: Map<string, SubagentState>
|
||||||
|
|
||||||
|
// Interrupts
|
||||||
|
pendingInterrupts: Map<string, InterruptState>
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ToolCallState {
|
||||||
|
id: string
|
||||||
|
name: string
|
||||||
|
args: Record<string, unknown>
|
||||||
|
status: 'pending' | 'generating' | 'executing' | 'success' | 'error' | 'aborted'
|
||||||
|
result?: unknown
|
||||||
|
error?: string
|
||||||
|
display: {
|
||||||
|
label: string
|
||||||
|
description?: string
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface SubagentState {
|
||||||
|
parentToolCallId: string
|
||||||
|
subagentId: string
|
||||||
|
label?: string
|
||||||
|
toolCalls: Map<string, ToolCallState>
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface PlanTodo {
|
||||||
|
id: string
|
||||||
|
content: string
|
||||||
|
status: 'pending' | 'in_progress' | 'completed'
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface InterruptState {
|
||||||
|
toolCallId: string
|
||||||
|
toolName: string
|
||||||
|
options: Array<{
|
||||||
|
id: string
|
||||||
|
label: string
|
||||||
|
description?: string
|
||||||
|
variant?: 'default' | 'destructive' | 'outline'
|
||||||
|
}>
|
||||||
|
message?: string
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface RendererCallbacks {
|
||||||
|
/** Called when state changes - trigger UI re-render */
|
||||||
|
onStateChange: (state: RendererState) => void
|
||||||
|
|
||||||
|
/** Called when a diff is ready - read workflow from DB */
|
||||||
|
onDiffReady?: (workflowId: string, toolCallId: string) => void
|
||||||
|
|
||||||
|
/** Called when user needs to resolve an interrupt */
|
||||||
|
onInterruptRequired?: (interrupt: InterruptState) => void
|
||||||
|
|
||||||
|
/** Called when stream completes */
|
||||||
|
onStreamComplete?: () => void
|
||||||
|
|
||||||
|
/** Called when stream errors */
|
||||||
|
onStreamError?: (error: string) => void
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
// Renderer Class
// ============================================================================

export class ClientRenderer {
  private state: RendererState
  private callbacks: RendererCallbacks
  private eventQueue: RenderEvent[] = []
  private isProcessing = false

  constructor(callbacks: RendererCallbacks) {
    this.callbacks = callbacks
    this.state = this.createInitialState()
  }

  private createInitialState(): RendererState {
    return {
      streamId: null,
      chatId: null,
      isStreaming: false,
      isComplete: false,
      hasError: false,
      errorMessage: null,
      currentMessageId: null,
      content: '',
      isThinking: false,
      thinkingContent: '',
      toolCalls: new Map(),
      isCapturingPlan: false,
      planContent: '',
      planTodos: [],
      isCapturingOptions: false,
      optionsContent: '',
      options: [],
      activeSubagents: new Map(),
      pendingInterrupts: new Map(),
    }
  }

  /** Reset renderer state for a new stream */
  reset(): void {
    this.state = this.createInitialState()
    this.eventQueue = []
    this.isProcessing = false
    this.notifyStateChange()
  }

  /** Get current state (immutable copy) */
  getState(): Readonly<RendererState> {
    return { ...this.state }
  }

  /** Process a render event from the server */
  async processEvent(event: RenderEvent): Promise<void> {
    this.eventQueue.push(event)
    await this.processQueue()
  }

  /** Process multiple events (for replay) */
  async processEvents(events: RenderEvent[]): Promise<void> {
    this.eventQueue.push(...events)
    await this.processQueue()
  }

  private async processQueue(): Promise<void> {
    if (this.isProcessing) return
    this.isProcessing = true

    try {
      while (this.eventQueue.length > 0) {
        const event = this.eventQueue.shift()!
        await this.handleEvent(event)
      }
    } finally {
      this.isProcessing = false
    }
  }

  private async handleEvent(event: RenderEvent): Promise<void> {
    const type = event.type as RenderEventType

    switch (type) {
      // ========== Stream Lifecycle ==========
      case 'stream_start':
        this.handleStreamStart(event as any)
        break

      case 'stream_end':
        this.handleStreamEnd()
        break

      case 'stream_error':
        this.handleStreamError(event as any)
        break

      // ========== Message Lifecycle ==========
      case 'message_start':
        this.handleMessageStart(event as any)
        break

      case 'message_saved':
        this.handleMessageSaved(event as any)
        break

      case 'message_end':
        this.handleMessageEnd(event as any)
        break

      // ========== Text Content ==========
      case 'text_delta':
        this.handleTextDelta(event as any)
        break

      // ========== Thinking ==========
      case 'thinking_start':
        this.handleThinkingStart()
        break

      case 'thinking_delta':
        this.handleThinkingDelta(event as any)
        break

      case 'thinking_end':
        this.handleThinkingEnd()
        break

      // ========== Tool Calls ==========
      case 'tool_pending':
        this.handleToolPending(event as any)
        break

      case 'tool_generating':
        this.handleToolGenerating(event as any)
        break

      case 'tool_executing':
        this.handleToolExecuting(event as any)
        break

      case 'tool_success':
        this.handleToolSuccess(event as any)
        break

      case 'tool_error':
        this.handleToolError(event as any)
        break

      case 'tool_aborted':
        this.handleToolAborted(event as any)
        break

      // ========== Interrupts ==========
      case 'interrupt_show':
        this.handleInterruptShow(event as any)
        break

      case 'interrupt_resolved':
        this.handleInterruptResolved(event as any)
        break

      // ========== Diffs ==========
      case 'diff_ready':
        this.handleDiffReady(event as any)
        break

      // ========== Plans ==========
      case 'plan_start':
        this.handlePlanStart()
        break

      case 'plan_delta':
        this.handlePlanDelta(event as any)
        break

      case 'plan_end':
        this.handlePlanEnd(event as any)
        break

      // ========== Options ==========
      case 'options_start':
        this.handleOptionsStart()
        break

      case 'options_delta':
        this.handleOptionsDelta(event as any)
        break

      case 'options_end':
        this.handleOptionsEnd(event as any)
        break

      // ========== Subagents ==========
      case 'subagent_start':
        this.handleSubagentStart(event as any)
        break

      case 'subagent_tool_pending':
        this.handleSubagentToolPending(event as any)
        break

      case 'subagent_tool_executing':
        this.handleSubagentToolExecuting(event as any)
        break

      case 'subagent_tool_success':
        this.handleSubagentToolSuccess(event as any)
        break

      case 'subagent_tool_error':
        this.handleSubagentToolError(event as any)
        break

      case 'subagent_end':
        this.handleSubagentEnd(event as any)
        break

      // ========== Chat Metadata ==========
      case 'chat_id':
        this.state.chatId = (event as any).chatId
        this.notifyStateChange()
        break

      case 'title_updated':
        // Title updates are handled externally
        logger.debug('Title updated', { title: (event as any).title })
        break

      default:
        logger.warn('Unknown render event type', { type })
    }
  }

  // ============================================================================
  // Event Handlers
  // ============================================================================

  private handleStreamStart(event: {
    streamId: string
    chatId: string
    userMessageId: string
    assistantMessageId: string
  }): void {
    this.state.streamId = event.streamId
    this.state.chatId = event.chatId
    this.state.currentMessageId = event.assistantMessageId
    this.state.isStreaming = true
    this.state.isComplete = false
    this.state.hasError = false
    this.notifyStateChange()
  }

  private handleStreamEnd(): void {
    this.state.isStreaming = false
    this.state.isComplete = true
    this.notifyStateChange()
    this.callbacks.onStreamComplete?.()
  }

  private handleStreamError(event: { error: string }): void {
    this.state.isStreaming = false
    this.state.hasError = true
    this.state.errorMessage = event.error
    this.notifyStateChange()
    this.callbacks.onStreamError?.(event.error)
  }

  private handleMessageStart(event: { messageId: string; role: string }): void {
    if (event.role === 'assistant') {
      this.state.currentMessageId = event.messageId
      this.state.content = ''
    }
    this.notifyStateChange()
  }

  private handleMessageSaved(event: { messageId: string; refreshFromDb?: boolean }): void {
    logger.debug('Message saved', { messageId: event.messageId, refresh: event.refreshFromDb })
    // If refreshFromDb is true, the message was saved with special state (like diff markers)
    // The client should refresh from DB to get the latest state
  }

  private handleMessageEnd(event: { messageId: string }): void {
    logger.debug('Message end', { messageId: event.messageId })
  }

  private handleTextDelta(event: { content: string }): void {
    this.state.content += event.content
    this.notifyStateChange()
  }

  private handleThinkingStart(): void {
    this.state.isThinking = true
    this.state.thinkingContent = ''
    this.notifyStateChange()
  }

  private handleThinkingDelta(event: { content: string }): void {
    this.state.thinkingContent += event.content
    this.notifyStateChange()
  }

  private handleThinkingEnd(): void {
    this.state.isThinking = false
    this.notifyStateChange()
  }

  private handleToolPending(event: {
    toolCallId: string
    toolName: string
    args: Record<string, unknown>
    display: { label: string; description?: string }
  }): void {
    this.state.toolCalls.set(event.toolCallId, {
      id: event.toolCallId,
      name: event.toolName,
      args: event.args,
      status: 'pending',
      display: event.display,
    })
    this.notifyStateChange()
  }

  private handleToolGenerating(event: {
    toolCallId: string
    argsPartial?: Record<string, unknown>
  }): void {
    const tool = this.state.toolCalls.get(event.toolCallId)
    if (tool) {
      tool.status = 'generating'
      if (event.argsPartial) {
        tool.args = event.argsPartial
      }
    }
    this.notifyStateChange()
  }

  private handleToolExecuting(event: { toolCallId: string }): void {
    const tool = this.state.toolCalls.get(event.toolCallId)
    if (tool) {
      tool.status = 'executing'
    }
    this.notifyStateChange()
  }

  private handleToolSuccess(event: {
    toolCallId: string
    result?: unknown
    display?: { label: string; description?: string }
    workflowId?: string
    hasDiff?: boolean
  }): void {
    const tool = this.state.toolCalls.get(event.toolCallId)
    if (tool) {
      tool.status = 'success'
      tool.result = event.result
      if (event.display) {
        tool.display = event.display
      }
    }
    this.notifyStateChange()
  }

  private handleToolError(event: {
    toolCallId: string
    error: string
    display?: { label: string; description?: string }
  }): void {
    const tool = this.state.toolCalls.get(event.toolCallId)
    if (tool) {
      tool.status = 'error'
      tool.error = event.error
      if (event.display) {
        tool.display = event.display
      }
    }
    this.notifyStateChange()
  }

  private handleToolAborted(event: { toolCallId: string; reason?: string }): void {
    const tool = this.state.toolCalls.get(event.toolCallId)
    if (tool) {
      tool.status = 'aborted'
      tool.error = event.reason
    }
    this.notifyStateChange()
  }

  private handleInterruptShow(event: {
    toolCallId: string
    toolName: string
    options: Array<{
      id: string
      label: string
      description?: string
      variant?: 'default' | 'destructive' | 'outline'
    }>
    message?: string
  }): void {
    this.state.pendingInterrupts.set(event.toolCallId, {
      toolCallId: event.toolCallId,
      toolName: event.toolName,
      options: event.options,
      message: event.message,
    })
    this.notifyStateChange()
    this.callbacks.onInterruptRequired?.({
      toolCallId: event.toolCallId,
      toolName: event.toolName,
      options: event.options,
      message: event.message,
    })
  }

  private handleInterruptResolved(event: {
    toolCallId: string
    choice: string
    approved: boolean
  }): void {
    this.state.pendingInterrupts.delete(event.toolCallId)
    this.notifyStateChange()
  }

  private handleDiffReady(event: { workflowId: string; toolCallId: string }): void {
    this.callbacks.onDiffReady?.(event.workflowId, event.toolCallId)
  }

  private handlePlanStart(): void {
    this.state.isCapturingPlan = true
    this.state.planContent = ''
    this.notifyStateChange()
  }

  private handlePlanDelta(event: { content: string }): void {
    this.state.planContent += event.content
    this.notifyStateChange()
  }

  private handlePlanEnd(event: { todos: PlanTodo[] }): void {
    this.state.isCapturingPlan = false
    this.state.planTodos = event.todos
    this.notifyStateChange()
  }

  private handleOptionsStart(): void {
    this.state.isCapturingOptions = true
    this.state.optionsContent = ''
    this.notifyStateChange()
  }

  private handleOptionsDelta(event: { content: string }): void {
    this.state.optionsContent += event.content
    this.notifyStateChange()
  }

  private handleOptionsEnd(event: { options: string[] }): void {
    this.state.isCapturingOptions = false
    this.state.options = event.options
    this.notifyStateChange()
  }

  private handleSubagentStart(event: {
    parentToolCallId: string
    subagentId: string
    label?: string
  }): void {
    this.state.activeSubagents.set(event.parentToolCallId, {
      parentToolCallId: event.parentToolCallId,
      subagentId: event.subagentId,
      label: event.label,
      toolCalls: new Map(),
    })
    this.notifyStateChange()
  }

  private handleSubagentToolPending(event: {
    parentToolCallId: string
    toolCallId: string
    toolName: string
    args: Record<string, unknown>
    display: { label: string; description?: string }
  }): void {
    const subagent = this.state.activeSubagents.get(event.parentToolCallId)
    if (subagent) {
      subagent.toolCalls.set(event.toolCallId, {
        id: event.toolCallId,
        name: event.toolName,
        args: event.args,
        status: 'pending',
        display: event.display,
      })
    }
    this.notifyStateChange()
  }

  private handleSubagentToolExecuting(event: {
    parentToolCallId: string
    toolCallId: string
  }): void {
    const subagent = this.state.activeSubagents.get(event.parentToolCallId)
    if (subagent) {
      const tool = subagent.toolCalls.get(event.toolCallId)
      if (tool) {
        tool.status = 'executing'
      }
    }
    this.notifyStateChange()
  }

  private handleSubagentToolSuccess(event: {
    parentToolCallId: string
    toolCallId: string
    result?: unknown
    display?: { label: string; description?: string }
  }): void {
    const subagent = this.state.activeSubagents.get(event.parentToolCallId)
    if (subagent) {
      const tool = subagent.toolCalls.get(event.toolCallId)
      if (tool) {
        tool.status = 'success'
        tool.result = event.result
        if (event.display) {
          tool.display = event.display
        }
      }
    }
    this.notifyStateChange()
  }

  private handleSubagentToolError(event: {
    parentToolCallId: string
    toolCallId: string
    error: string
  }): void {
    const subagent = this.state.activeSubagents.get(event.parentToolCallId)
    if (subagent) {
      const tool = subagent.toolCalls.get(event.toolCallId)
      if (tool) {
        tool.status = 'error'
        tool.error = event.error
      }
    }
    this.notifyStateChange()
  }

  private handleSubagentEnd(event: { parentToolCallId: string }): void {
    // Keep subagent data for display, just mark as complete
    logger.debug('Subagent ended', { parentToolCallId: event.parentToolCallId })
    this.notifyStateChange()
  }

  private notifyStateChange(): void {
    this.callbacks.onStateChange(this.getState())
  }
}

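// --- Illustrative resume sketch (not part of the original diff) --------------
// Replaying persisted render events through a fresh renderer, e.g. after a page
// reload. `storedEvents` is a hypothetical array fetched from a resume endpoint,
// and only the required onStateChange callback is provided.
async function replayExample(storedEvents: RenderEvent[]): Promise<void> {
  const renderer = new ClientRenderer({ onStateChange: (state) => console.log(state.content) })
  renderer.reset()
  await renderer.processEvents(storedEvents) // events are applied in order via the internal queue
}
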
// ============================================================================
// Helper Functions
// ============================================================================

/**
 * Parse a render event from an SSE data line
 */
export function parseRenderEvent(line: string): RenderEvent | null {
  if (!line.startsWith('data: ')) return null
  try {
    return JSON.parse(line.slice(6)) as RenderEvent
  } catch {
    return null
  }
}

/**
 * Stream events from an SSE endpoint and process them
 */
export async function streamRenderEvents(
  url: string,
  renderer: ClientRenderer,
  options?: {
    signal?: AbortSignal
    onConnect?: () => void
    onError?: (error: Error) => void
  }
): Promise<void> {
  const response = await fetch(url, {
    headers: { Accept: 'text/event-stream' },
    signal: options?.signal,
  })

  if (!response.ok) {
    const error = new Error(`Stream failed: ${response.status}`)
    options?.onError?.(error)
    throw error
  }

  options?.onConnect?.()

  const reader = response.body?.getReader()
  if (!reader) {
    throw new Error('No response body')
  }

  const decoder = new TextDecoder()
  let buffer = ''

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      buffer += decoder.decode(value, { stream: true })

      const lines = buffer.split('\n')
      buffer = lines.pop() || ''

      for (const line of lines) {
        const event = parseRenderEvent(line)
        if (event) {
          await renderer.processEvent(event)
        }
      }
    }

    // Process remaining buffer
    if (buffer) {
      const event = parseRenderEvent(buffer)
      if (event) {
        await renderer.processEvent(event)
      }
    }
  } finally {
    reader.releaseLock()
  }
}

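// --- Illustrative usage sketch (not part of the original diff) ---------------
// Wiring the renderer to a live SSE endpoint. The URL is a hypothetical
// placeholder, and AbortController lets the UI cancel the stream mid-flight.
async function streamExample(): Promise<void> {
  const renderer = new ClientRenderer({
    onStateChange: (state) => console.log('content so far:', state.content),
    onStreamError: (error) => console.error('stream error:', error),
  })

  const controller = new AbortController()
  await streamRenderEvents('/api/copilot/chat/stream', renderer, {
    signal: controller.signal,
    onConnect: () => console.log('connected'),
    onError: (error) => console.error(error.message),
  })
}
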
470  apps/sim/lib/copilot/render-events.ts  Normal file
@@ -0,0 +1,470 @@
/**
 * Render Events - Server → Client SSE Protocol
 *
 * This defines the SSE event protocol between the copilot server and client.
 * The server processes the raw Sim Agent stream, executes tools, persists to DB,
 * and emits these render events. The client just renders based on these events.
 *
 * Benefits:
 * - Client is purely a renderer (no parsing, no execution)
 * - Persistence happens before render (safe to refresh anytime)
 * - Works identically with or without a client (API-only mode)
 * - Resume is just replaying render events
 */

// ============================================================================
// Base Types
// ============================================================================

export interface BaseRenderEvent {
  type: RenderEventType
  /** Monotonically increasing sequence number for ordering */
  seq: number
  /** Timestamp when event was created */
  ts: number
}

export type RenderEventType =
  // Stream lifecycle
  | 'stream_start'
  | 'stream_end'
  | 'stream_error'

  // Message lifecycle
  | 'message_start'
  | 'message_saved'
  | 'message_end'

  // Text content
  | 'text_delta'

  // Thinking blocks
  | 'thinking_start'
  | 'thinking_delta'
  | 'thinking_end'

  // Tool calls
  | 'tool_pending'
  | 'tool_generating'
  | 'tool_executing'
  | 'tool_success'
  | 'tool_error'
  | 'tool_aborted'

  // Interrupts (user approval needed)
  | 'interrupt_show'
  | 'interrupt_resolved'

  // Workflow diffs
  | 'diff_ready'
  | 'diff_accepted'
  | 'diff_rejected'

  // Plans
  | 'plan_start'
  | 'plan_delta'
  | 'plan_end'

  // Options (continue/follow-up suggestions)
  | 'options_start'
  | 'options_delta'
  | 'options_end'

  // Subagents
  | 'subagent_start'
  | 'subagent_tool_pending'
  | 'subagent_tool_generating'
  | 'subagent_tool_executing'
  | 'subagent_tool_success'
  | 'subagent_tool_error'
  | 'subagent_end'

  // Chat metadata
  | 'chat_id'
  | 'title_updated'

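// --- Illustrative sketch (not part of the original diff) ---------------------
// `seq` gives a total order, so a resuming client can sort replayed events
// before applying them even if chunks come back out of order from the buffer.
function sortForReplay(events: BaseRenderEvent[]): BaseRenderEvent[] {
  return [...events].sort((a, b) => a.seq - b.seq)
}
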
// ============================================================================
// Stream Lifecycle Events
// ============================================================================

export interface StreamStartEvent extends BaseRenderEvent {
  type: 'stream_start'
  streamId: string
  chatId: string
  userMessageId: string
  assistantMessageId: string
}

export interface StreamEndEvent extends BaseRenderEvent {
  type: 'stream_end'
}

export interface StreamErrorEvent extends BaseRenderEvent {
  type: 'stream_error'
  error: string
  code?: string
}

// ============================================================================
// Message Lifecycle Events
// ============================================================================

export interface MessageStartEvent extends BaseRenderEvent {
  type: 'message_start'
  messageId: string
  role: 'user' | 'assistant'
}

export interface MessageSavedEvent extends BaseRenderEvent {
  type: 'message_saved'
  messageId: string
  /** If true, client should refresh message from DB (contains diff markers, etc.) */
  refreshFromDb?: boolean
}

export interface MessageEndEvent extends BaseRenderEvent {
  type: 'message_end'
  messageId: string
}

// ============================================================================
// Text Content Events
// ============================================================================

export interface TextDeltaEvent extends BaseRenderEvent {
  type: 'text_delta'
  content: string
}

// ============================================================================
// Thinking Block Events
// ============================================================================

export interface ThinkingStartEvent extends BaseRenderEvent {
  type: 'thinking_start'
}

export interface ThinkingDeltaEvent extends BaseRenderEvent {
  type: 'thinking_delta'
  content: string
}

export interface ThinkingEndEvent extends BaseRenderEvent {
  type: 'thinking_end'
}

// ============================================================================
// Tool Call Events
// ============================================================================

export interface ToolDisplay {
  label: string
  description?: string
  icon?: string
}

export interface ToolPendingEvent extends BaseRenderEvent {
  type: 'tool_pending'
  toolCallId: string
  toolName: string
  args: Record<string, unknown>
  display: ToolDisplay
}

export interface ToolGeneratingEvent extends BaseRenderEvent {
  type: 'tool_generating'
  toolCallId: string
  /** Partial args as they stream in */
  argsDelta?: string
  /** Full args so far */
  argsPartial?: Record<string, unknown>
}

export interface ToolExecutingEvent extends BaseRenderEvent {
  type: 'tool_executing'
  toolCallId: string
  display?: ToolDisplay
}

export interface ToolSuccessEvent extends BaseRenderEvent {
  type: 'tool_success'
  toolCallId: string
  result?: unknown
  display?: ToolDisplay
  /** For edit_workflow: tells client to read diff from DB */
  workflowId?: string
  hasDiff?: boolean
}

export interface ToolErrorEvent extends BaseRenderEvent {
  type: 'tool_error'
  toolCallId: string
  error: string
  display?: ToolDisplay
}

export interface ToolAbortedEvent extends BaseRenderEvent {
  type: 'tool_aborted'
  toolCallId: string
  reason?: string
  display?: ToolDisplay
}

// ============================================================================
// Interrupt Events (User Approval)
// ============================================================================

export interface InterruptOption {
  id: string
  label: string
  description?: string
  variant?: 'default' | 'destructive' | 'outline'
}

export interface InterruptShowEvent extends BaseRenderEvent {
  type: 'interrupt_show'
  toolCallId: string
  toolName: string
  options: InterruptOption[]
  /** Optional message to display */
  message?: string
}

export interface InterruptResolvedEvent extends BaseRenderEvent {
  type: 'interrupt_resolved'
  toolCallId: string
  choice: string
  /** Whether to continue execution */
  approved: boolean
}

// ============================================================================
// Workflow Diff Events
// ============================================================================

export interface DiffReadyEvent extends BaseRenderEvent {
  type: 'diff_ready'
  workflowId: string
  toolCallId: string
  /** Client should read workflow state from DB which contains diff markers */
}

export interface DiffAcceptedEvent extends BaseRenderEvent {
  type: 'diff_accepted'
  workflowId: string
}

export interface DiffRejectedEvent extends BaseRenderEvent {
  type: 'diff_rejected'
  workflowId: string
}

// ============================================================================
// Plan Events
// ============================================================================

export interface PlanTodo {
  id: string
  content: string
  status: 'pending' | 'in_progress' | 'completed'
}

export interface PlanStartEvent extends BaseRenderEvent {
  type: 'plan_start'
}

export interface PlanDeltaEvent extends BaseRenderEvent {
  type: 'plan_delta'
  content: string
}

export interface PlanEndEvent extends BaseRenderEvent {
  type: 'plan_end'
  todos: PlanTodo[]
}

// ============================================================================
// Options Events (Follow-up Suggestions)
// ============================================================================

export interface OptionsStartEvent extends BaseRenderEvent {
  type: 'options_start'
}

export interface OptionsDeltaEvent extends BaseRenderEvent {
  type: 'options_delta'
  content: string
}

export interface OptionsEndEvent extends BaseRenderEvent {
  type: 'options_end'
  options: string[]
}

// ============================================================================
// Subagent Events
// ============================================================================

export interface SubagentStartEvent extends BaseRenderEvent {
  type: 'subagent_start'
  parentToolCallId: string
  subagentId: string
  label?: string
}

export interface SubagentToolPendingEvent extends BaseRenderEvent {
  type: 'subagent_tool_pending'
  parentToolCallId: string
  toolCallId: string
  toolName: string
  args: Record<string, unknown>
  display: ToolDisplay
}

export interface SubagentToolGeneratingEvent extends BaseRenderEvent {
  type: 'subagent_tool_generating'
  parentToolCallId: string
  toolCallId: string
  argsDelta?: string
}

export interface SubagentToolExecutingEvent extends BaseRenderEvent {
  type: 'subagent_tool_executing'
  parentToolCallId: string
  toolCallId: string
}

export interface SubagentToolSuccessEvent extends BaseRenderEvent {
  type: 'subagent_tool_success'
  parentToolCallId: string
  toolCallId: string
  result?: unknown
  display?: ToolDisplay
}

export interface SubagentToolErrorEvent extends BaseRenderEvent {
  type: 'subagent_tool_error'
  parentToolCallId: string
  toolCallId: string
  error: string
}

export interface SubagentEndEvent extends BaseRenderEvent {
  type: 'subagent_end'
  parentToolCallId: string
}

// ============================================================================
// Chat Metadata Events
// ============================================================================

export interface ChatIdEvent extends BaseRenderEvent {
  type: 'chat_id'
  chatId: string
}

export interface TitleUpdatedEvent extends BaseRenderEvent {
  type: 'title_updated'
  title: string
}

// ============================================================================
// Union Type
// ============================================================================

export type RenderEvent =
  // Stream lifecycle
  | StreamStartEvent
  | StreamEndEvent
  | StreamErrorEvent
  // Message lifecycle
  | MessageStartEvent
  | MessageSavedEvent
  | MessageEndEvent
  // Text content
  | TextDeltaEvent
  // Thinking
  | ThinkingStartEvent
  | ThinkingDeltaEvent
  | ThinkingEndEvent
  // Tool calls
  | ToolPendingEvent
  | ToolGeneratingEvent
  | ToolExecutingEvent
  | ToolSuccessEvent
  | ToolErrorEvent
  | ToolAbortedEvent
  // Interrupts
  | InterruptShowEvent
  | InterruptResolvedEvent
  // Diffs
  | DiffReadyEvent
  | DiffAcceptedEvent
  | DiffRejectedEvent
  // Plans
  | PlanStartEvent
  | PlanDeltaEvent
  | PlanEndEvent
  // Options
  | OptionsStartEvent
  | OptionsDeltaEvent
  | OptionsEndEvent
  // Subagents
  | SubagentStartEvent
  | SubagentToolPendingEvent
  | SubagentToolGeneratingEvent
  | SubagentToolExecutingEvent
  | SubagentToolSuccessEvent
  | SubagentToolErrorEvent
  | SubagentEndEvent
  // Chat metadata
  | ChatIdEvent
  | TitleUpdatedEvent

// ============================================================================
// Helper Functions
// ============================================================================

let seqCounter = 0

/**
 * Create a render event with auto-incrementing sequence number
 */
export function createRenderEvent<T extends RenderEventType>(
  type: T,
  data: Omit<Extract<RenderEvent, { type: T }>, 'type' | 'seq' | 'ts'>
): Extract<RenderEvent, { type: T }> {
  return {
    type,
    seq: ++seqCounter,
    ts: Date.now(),
    ...data,
  } as Extract<RenderEvent, { type: T }>
}

/**
 * Reset sequence counter (for testing or new streams)
 */
export function resetSeqCounter(): void {
  seqCounter = 0
}

/**
 * Serialize a render event to SSE format
 */
export function serializeRenderEvent(event: RenderEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`
}

/**
 * Parse a render event from SSE data line
 */
export function parseRenderEvent(line: string): RenderEvent | null {
  if (!line.startsWith('data: ')) return null
  try {
    return JSON.parse(line.slice(6)) as RenderEvent
  } catch {
    return null
  }
}

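// --- Illustrative round-trip sketch (not part of the original diff) ----------
// The server builds an event, serializes it to an SSE frame, and the client
// parses the "data: ..." line back into a typed RenderEvent.
function exampleRoundTrip(): void {
  const event = createRenderEvent('text_delta', { content: 'Hello' })
  const frame = serializeRenderEvent(event) // "data: {...}\n\n"
  const parsed = parseRenderEvent(frame.trimEnd())
  console.log(parsed?.type === 'text_delta' ? 'round trip ok' : 'parse failed')
}
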
438  apps/sim/lib/copilot/server-tool-executor.ts  Normal file
@@ -0,0 +1,438 @@
/**
 * Server-Side Tool Executor for Copilot
 *
 * Executes copilot tools server-side when no client session is present.
 * Handles routing to appropriate server implementations and marking tools complete.
 */

import { db } from '@sim/db'
import { account, workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { isClientOnlyTool } from '@/lib/copilot/tools/client/ui-config'
import { routeExecution } from '@/lib/copilot/tools/server/router'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { env } from '@/lib/core/config/env'
import { generateRequestId } from '@/lib/core/utils/request'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { executeTool } from '@/tools'
import { getTool, resolveToolId } from '@/tools/utils'

const logger = createLogger('ServerToolExecutor')

const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT

/**
 * Context for tool execution
 */
export interface ToolExecutionContext {
  userId: string
  workflowId: string
  chatId: string
  streamId: string
  workspaceId?: string
}

/**
 * Result of tool execution
 */
export interface ToolExecutionResult {
  success: boolean
  status: number
  message?: string
  data?: unknown
}

/**
 * Tools that have dedicated server implementations in the router
 */
const SERVER_ROUTED_TOOLS = [
  'edit_workflow',
  'get_workflow_data',
  'get_workflow_console',
  'get_blocks_and_tools',
  'get_blocks_metadata',
  'get_block_options',
  'get_block_config',
  'get_trigger_blocks',
  'knowledge_base',
  'set_environment_variables',
  'get_credentials',
  'search_documentation',
  'make_api_request',
  'search_online',
]

/**
 * Tools that execute workflows
 */
const WORKFLOW_EXECUTION_TOOLS = ['run_workflow']

/**
 * Tools that handle deployments
 */
const DEPLOYMENT_TOOLS = ['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy']

/**
 * Execute a tool server-side.
 * Returns result to be sent to Sim Agent via mark-complete.
 */
export async function executeToolServerSide(
  toolName: string,
  toolCallId: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  logger.info('Executing tool server-side', {
    toolName,
    toolCallId,
    userId: context.userId,
    workflowId: context.workflowId,
  })

  // 1. Check if tool is client-only
  if (isClientOnlyTool(toolName)) {
    logger.info('Skipping client-only tool', { toolName, toolCallId })
    return {
      success: true,
      status: 200,
      message: `Tool "${toolName}" requires a browser session and was skipped in API mode.`,
      data: { skipped: true, reason: 'client_only' },
    }
  }

  try {
    // 2. Route to appropriate executor
    if (SERVER_ROUTED_TOOLS.includes(toolName)) {
      return executeServerRoutedTool(toolName, args, context)
    }

    if (WORKFLOW_EXECUTION_TOOLS.includes(toolName)) {
      return executeRunWorkflow(args, context)
    }

    if (DEPLOYMENT_TOOLS.includes(toolName)) {
      return executeDeploymentTool(toolName, args, context)
    }

    // 3. Try integration tool execution (Slack, Gmail, etc.)
    return executeIntegrationTool(toolName, toolCallId, args, context)
  } catch (error) {
    logger.error('Tool execution failed', {
      toolName,
      toolCallId,
      error: error instanceof Error ? error.message : String(error),
    })
    return {
      success: false,
      status: 500,
      message: error instanceof Error ? error.message : 'Tool execution failed',
    }
  }
}

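// --- Illustrative sketch (not part of the original diff) ---------------------
// Typical API-mode loop: execute the tool server-side, then report the result
// back to the Sim Agent. All IDs and context values are hypothetical placeholders.
async function exampleServerToolLoop(): Promise<void> {
  const context: ToolExecutionContext = {
    userId: 'user_123',
    workflowId: 'wf_456',
    chatId: 'chat_789',
    streamId: 'stream_abc',
  }
  const result = await executeToolServerSide(
    'get_workflow_data',
    'call_1',
    { workflowId: 'wf_456' },
    context
  )
  await markToolComplete('call_1', 'get_workflow_data', result)
}
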
/**
 * Execute a tool that has a dedicated server implementation
 */
async function executeServerRoutedTool(
  toolName: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  try {
    const result = await routeExecution(toolName, args, { userId: context.userId })
    return {
      success: true,
      status: 200,
      data: result,
    }
  } catch (error) {
    return {
      success: false,
      status: 500,
      message: error instanceof Error ? error.message : 'Server tool execution failed',
    }
  }
}

/**
 * Execute the run_workflow tool
 */
async function executeRunWorkflow(
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  const workflowId = (args.workflowId as string) || context.workflowId
  const input = (args.input as Record<string, unknown>) || {}

  logger.info('Executing run_workflow', { workflowId, inputKeys: Object.keys(input) })

  try {
    const response = await fetch(`${getBaseUrl()}/api/workflows/${workflowId}/execute`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${await generateInternalToken()}`,
      },
      body: JSON.stringify({
        input,
        triggerType: 'copilot',
        workflowId, // For internal auth
      }),
    })

    if (!response.ok) {
      const errorText = await response.text()
      return {
        success: false,
        status: response.status,
        message: `Workflow execution failed: ${errorText}`,
      }
    }

    const result = await response.json()
    return {
      success: true,
      status: 200,
      data: result,
    }
  } catch (error) {
    return {
      success: false,
      status: 500,
      message: error instanceof Error ? error.message : 'Workflow execution failed',
    }
  }
}

/**
 * Execute a deployment tool
 */
async function executeDeploymentTool(
  toolName: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  // Deployment tools modify workflow state and create deployments
  // These can be executed server-side via the server router
  try {
    const result = await routeExecution(toolName, args, { userId: context.userId })
    return {
      success: true,
      status: 200,
      data: result,
    }
  } catch (error) {
    // If the tool isn't in the router, it might need to be added
    // For now, return a skip result
    logger.warn('Deployment tool not available server-side', { toolName })
    return {
      success: true,
      status: 200,
      message: `Deployment tool "${toolName}" executed with limited functionality in API mode.`,
      data: { skipped: true, reason: 'limited_api_support' },
    }
  }
}

/**
 * Execute an integration tool (Slack, Gmail, etc.)
 * Uses the same logic as /api/copilot/execute-tool
 */
async function executeIntegrationTool(
  toolName: string,
  toolCallId: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  const resolvedToolName = resolveToolId(toolName)
  const toolConfig = getTool(resolvedToolName)

  if (!toolConfig) {
    // Tool not found - try server router as fallback
    try {
      const result = await routeExecution(toolName, args, { userId: context.userId })
      return {
        success: true,
        status: 200,
        data: result,
      }
    } catch {
      logger.warn('Tool not found', { toolName, resolvedToolName })
      return {
        success: true,
        status: 200,
        message: `Tool "${toolName}" not found. Skipped.`,
        data: { skipped: true, reason: 'not_found' },
      }
    }
  }

  // Get workspaceId for env vars
  let workspaceId = context.workspaceId
  if (!workspaceId && context.workflowId) {
    const workflowResult = await db
      .select({ workspaceId: workflow.workspaceId })
      .from(workflow)
      .where(eq(workflow.id, context.workflowId))
      .limit(1)
    workspaceId = workflowResult[0]?.workspaceId ?? undefined
  }

  // Get decrypted environment variables
  const decryptedEnvVars = await getEffectiveDecryptedEnv(context.userId, workspaceId)

  // Resolve env var references in arguments
  const executionParams: Record<string, unknown> = resolveEnvVarReferences(
    args,
    decryptedEnvVars,
    {
      resolveExactMatch: true,
      allowEmbedded: true,
      trimKeys: true,
      onMissing: 'keep',
      deep: true,
    }
  ) as Record<string, unknown>

  // Resolve OAuth access token if required
  if (toolConfig.oauth?.required && toolConfig.oauth.provider) {
    const provider = toolConfig.oauth.provider

    try {
      const accounts = await db
        .select()
        .from(account)
        .where(and(eq(account.providerId, provider), eq(account.userId, context.userId)))
        .limit(1)

      if (accounts.length > 0) {
        const acc = accounts[0]
        const requestId = generateRequestId()
        const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id)

        if (accessToken) {
          executionParams.accessToken = accessToken
        } else {
          return {
            success: false,
            status: 400,
            message: `OAuth token not available for ${provider}. Please reconnect your account.`,
          }
        }
      } else {
        return {
          success: false,
          status: 400,
          message: `No ${provider} account connected. Please connect your account first.`,
        }
      }
    } catch (error) {
      return {
        success: false,
        status: 500,
        message: `Failed to get OAuth token for ${toolConfig.oauth.provider}`,
      }
    }
  }

  // Check if tool requires an API key
  const needsApiKey = toolConfig.params?.apiKey?.required
  if (needsApiKey && !executionParams.apiKey) {
    return {
      success: false,
      status: 400,
      message: `API key not provided for ${toolName}.`,
    }
  }

  // Add execution context
  executionParams._context = {
    workflowId: context.workflowId,
    userId: context.userId,
  }

  // Special handling for function_execute
  if (toolName === 'function_execute') {
    executionParams.envVars = decryptedEnvVars
    executionParams.workflowVariables = {}
    executionParams.blockData = {}
    executionParams.blockNameMapping = {}
    executionParams.language = executionParams.language || 'javascript'
    executionParams.timeout = executionParams.timeout || 30000
  }

  // Execute the tool
  const result = await executeTool(resolvedToolName, executionParams, true)

  logger.info('Integration tool execution complete', {
    toolName,
    success: result.success,
  })

  return {
    success: result.success,
    status: result.success ? 200 : 500,
    message: result.error,
    data: result.output,
  }
}

/**
 * Mark a tool as complete with Sim Agent
 */
export async function markToolComplete(
  toolCallId: string,
  toolName: string,
  result: ToolExecutionResult
): Promise<boolean> {
  logger.info('Marking tool complete', {
    toolCallId,
    toolName,
    success: result.success,
    status: result.status,
  })

  try {
    const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
      },
      body: JSON.stringify({
        id: toolCallId,
        name: toolName,
        status: result.status,
        message: result.message,
        data: result.data,
      }),
    })

    if (!response.ok) {
      logger.error('Mark complete failed', { toolCallId, status: response.status })
      return false
    }

    return true
  } catch (error) {
    logger.error('Mark complete error', {
      toolCallId,
      error: error instanceof Error ? error.message : String(error),
    })
    return false
  }
}

/**
 * Generate an internal authentication token for server-to-server calls
 */
async function generateInternalToken(): Promise<string> {
  // Use the same pattern as A2A for internal auth
  const { generateInternalToken: genToken } = await import('@/app/api/a2a/serve/[agentId]/utils')
  return genToken()
}

556  apps/sim/lib/copilot/stream-persistence.ts  Normal file
@@ -0,0 +1,556 @@
/**
 * Stream Persistence Service for Copilot
 *
 * Handles persisting copilot stream state to Redis (ephemeral) and database (permanent).
 * Uses Redis LIST for chunk history and Pub/Sub for live updates (no polling).
 *
 * Redis Key Structure:
 * - copilot:stream:{streamId}:meta → StreamMeta JSON (TTL: 10 min)
 * - copilot:stream:{streamId}:chunks → LIST of chunks (for replay)
 * - copilot:stream:{streamId} → Pub/Sub CHANNEL (for live updates)
 * - copilot:active:{chatId} → streamId lookup
 * - copilot:abort:{streamId} → abort signal flag
 */

import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type Redis from 'ioredis'
import { getRedisClient } from '@/lib/core/config/redis'

const logger = createLogger('CopilotStreamPersistence')

const STREAM_TTL = 60 * 10 // 10 minutes

/**
 * Tool call record stored in stream state
 */
export interface ToolCallRecord {
  id: string
  name: string
  args: Record<string, unknown>
  state: 'pending' | 'executing' | 'success' | 'error' | 'skipped'
  result?: unknown
  error?: string
}

/**
 * Pending diff state for edit_workflow tool calls
 */
export interface PendingDiffState {
  toolCallId: string
  baselineWorkflow: unknown
  proposedWorkflow: unknown
  diffAnalysis: unknown
}

/**
 * Stream metadata stored in Redis
 */
export interface StreamMeta {
  id: string
  status: 'streaming' | 'completed' | 'error'
  chatId: string
  userId: string
  workflowId: string
  userMessageId: string
  isClientSession: boolean
  toolCalls: ToolCallRecord[]
  assistantContent: string
  conversationId?: string
  createdAt: number
  updatedAt: number
  /** Pending diff state if edit_workflow tool has changes waiting for review */
  pendingDiff?: PendingDiffState
}

/**
 * Parameters for creating a new stream
 */
export interface CreateStreamParams {
  streamId: string
  chatId: string
  userId: string
  workflowId: string
  userMessageId: string
  isClientSession: boolean
}

// ============ WRITE OPERATIONS (used by original request handler) ============

/**
 * Create a new stream state in Redis
 */
export async function createStream(params: CreateStreamParams): Promise<void> {
  const redis = getRedisClient()
  if (!redis) {
    logger.warn('Redis not available, stream persistence disabled')
    return
  }

  const meta: StreamMeta = {
    id: params.streamId,
    status: 'streaming',
    chatId: params.chatId,
    userId: params.userId,
    workflowId: params.workflowId,
    userMessageId: params.userMessageId,
    isClientSession: params.isClientSession,
    toolCalls: [],
    assistantContent: '',
    createdAt: Date.now(),
    updatedAt: Date.now(),
  }

  const metaKey = `copilot:stream:${params.streamId}:meta`
  const activeKey = `copilot:active:${params.chatId}`

  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
  await redis.setex(activeKey, STREAM_TTL, params.streamId)

  logger.info('Created stream state', { streamId: params.streamId, chatId: params.chatId })
}

/**
 * Append a chunk to the stream buffer and publish for live subscribers
 */
export async function appendChunk(streamId: string, chunk: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const listKey = `copilot:stream:${streamId}:chunks`
  const channel = `copilot:stream:${streamId}`

  // Push to list for replay, publish for live subscribers
  await redis.rpush(listKey, chunk)
  await redis.expire(listKey, STREAM_TTL)
  await redis.publish(channel, chunk)
}

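// --- Illustrative sketch (not part of the original diff) ---------------------
// Write path during streaming: create the stream record once, then append each
// SSE frame so late subscribers can replay it. All IDs are hypothetical placeholders.
async function examplePersistChunk(frame: string): Promise<void> {
  await createStream({
    streamId: 'stream_abc',
    chatId: 'chat_789',
    userId: 'user_123',
    workflowId: 'wf_456',
    userMessageId: 'msg_001',
    isClientSession: false,
  })
  await appendChunk('stream_abc', frame) // RPUSH for replay + PUBLISH for live subscribers
}
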
/**
 * Append content to the accumulated assistant content
 */
export async function appendContent(streamId: string, content: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = JSON.parse(raw)
  meta.assistantContent += content
  meta.updatedAt = Date.now()

  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
}

/**
 * Update stream metadata
 */
export async function updateMeta(streamId: string, update: Partial<StreamMeta>): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = { ...JSON.parse(raw), ...update, updatedAt: Date.now() }
  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
}

/**
 * Update a specific tool call in the stream state
 */
export async function updateToolCall(
  streamId: string,
  toolCallId: string,
  update: Partial<ToolCallRecord>
): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = JSON.parse(raw)
  const toolCallIndex = meta.toolCalls.findIndex((tc) => tc.id === toolCallId)

  if (toolCallIndex >= 0) {
    meta.toolCalls[toolCallIndex] = { ...meta.toolCalls[toolCallIndex], ...update }
  } else {
    // Add new tool call
    meta.toolCalls.push({
      id: toolCallId,
      name: update.name || 'unknown',
      args: update.args || {},
      state: update.state || 'pending',
      result: update.result,
      error: update.error,
    })
  }

  meta.updatedAt = Date.now()
  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
}

/**
 * Store pending diff state for a stream (called when edit_workflow creates a diff)
 */
export async function setPendingDiff(
  streamId: string,
  pendingDiff: PendingDiffState
): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = JSON.parse(raw)
  meta.pendingDiff = pendingDiff
  meta.updatedAt = Date.now()
  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
  logger.info('Stored pending diff for stream', { streamId, toolCallId: pendingDiff.toolCallId })
}

/**
 * Clear pending diff state (called when user accepts/rejects the diff)
 */
export async function clearPendingDiff(streamId: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = JSON.parse(raw)
  delete meta.pendingDiff
  meta.updatedAt = Date.now()
  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
  logger.info('Cleared pending diff for stream', { streamId })
}

/**
 * Get pending diff state for a stream
 */
export async function getPendingDiff(streamId: string): Promise<PendingDiffState | null> {
  const redis = getRedisClient()
  if (!redis) return null

  const meta = await getStreamMeta(streamId)
  return meta?.pendingDiff || null
}

/**
 * Complete the stream - save to database and cleanup Redis
 */
export async function completeStream(streamId: string, conversationId?: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const meta = await getStreamMeta(streamId)
  if (!meta) return

  // Publish completion event for subscribers
  await redis.publish(`copilot:stream:${streamId}`, JSON.stringify({ type: 'stream_complete' }))

  // Save to database
  await saveToDatabase(meta, conversationId)

  // Cleanup Redis
  await redis.del(`copilot:stream:${streamId}:meta`)
  await redis.del(`copilot:stream:${streamId}:chunks`)
  await redis.del(`copilot:active:${meta.chatId}`)
  await redis.del(`copilot:abort:${streamId}`)

  logger.info('Completed stream', { streamId, chatId: meta.chatId })
}

/**
 * Mark stream as errored and save partial content
 */
export async function errorStream(streamId: string, error: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return
|
||||||
|
|
||||||
|
const meta = await getStreamMeta(streamId)
|
||||||
|
if (!meta) return
|
||||||
|
|
||||||
|
// Update status
|
||||||
|
meta.status = 'error'
|
||||||
|
|
||||||
|
// Publish error event for subscribers
|
||||||
|
await redis.publish(
|
||||||
|
`copilot:stream:${streamId}`,
|
||||||
|
JSON.stringify({ type: 'stream_error', error })
|
||||||
|
)
|
||||||
|
|
||||||
|
// Still save what we have to database
|
||||||
|
await saveToDatabase(meta)
|
||||||
|
|
||||||
|
// Cleanup Redis
|
||||||
|
await redis.del(`copilot:stream:${streamId}:meta`)
|
||||||
|
await redis.del(`copilot:stream:${streamId}:chunks`)
|
||||||
|
await redis.del(`copilot:active:${meta.chatId}`)
|
||||||
|
await redis.del(`copilot:abort:${streamId}`)
|
||||||
|
|
||||||
|
logger.info('Errored stream', { streamId, error })
|
||||||
|
}
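
A sketch of how a streaming handler is expected to finalize a run with these two helpers. The surrounding handler shape is an assumption; `completeStream` and `errorStream` are the module's real exports.

```typescript
// Illustrative finalization in an assumed streaming handler (not part of this module).
async function finishAgentStream(streamId: string, conversationId?: string) {
  try {
    // ... chunks were persisted via appendChunk/appendContent while streaming ...
    await completeStream(streamId, conversationId) // save to DB, then delete Redis keys
  } catch (err) {
    const message = err instanceof Error ? err.message : 'Unknown error'
    await errorStream(streamId, message) // save the partial content before cleanup
  }
}
```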

/**
 * Save stream content to database as assistant message
 */
async function saveToDatabase(meta: StreamMeta, conversationId?: string): Promise<void> {
  try {
    const [chat] = await db
      .select()
      .from(copilotChats)
      .where(eq(copilotChats.id, meta.chatId))
      .limit(1)

    if (!chat) {
      logger.warn('Chat not found for stream save', { chatId: meta.chatId })
      return
    }

    const existingMessages = Array.isArray(chat.messages) ? chat.messages : []

    // Check if there's already an assistant message after the user message
    // This can happen if the client already saved it before disconnecting
    const userMessageIndex = existingMessages.findIndex(
      (m: any) => m.id === meta.userMessageId && m.role === 'user'
    )

    // If there's already an assistant message right after the user message,
    // the client may have already saved it - check if it's incomplete
    if (userMessageIndex >= 0 && userMessageIndex < existingMessages.length - 1) {
      const nextMessage = existingMessages[userMessageIndex + 1] as any
      if (nextMessage?.role === 'assistant' && !nextMessage?.serverCompleted) {
        // Client saved a partial message, update it with the complete content
        const updatedMessages = existingMessages.map((m: any, idx: number) => {
          if (idx === userMessageIndex + 1) {
            return {
              ...m,
              content: meta.assistantContent,
              toolCalls: meta.toolCalls,
              serverCompleted: true,
            }
          }
          return m
        })

        await db
          .update(copilotChats)
          .set({
            messages: updatedMessages,
            conversationId: conversationId || (chat.conversationId as string | undefined),
            updatedAt: new Date(),
          })
          .where(eq(copilotChats.id, meta.chatId))

        logger.info('Updated existing assistant message in database', {
          streamId: meta.id,
          chatId: meta.chatId,
        })
        return
      }
    }

    // Build the assistant message
    const assistantMessage = {
      id: crypto.randomUUID(),
      role: 'assistant',
      content: meta.assistantContent,
      toolCalls: meta.toolCalls,
      timestamp: new Date().toISOString(),
      serverCompleted: true, // Mark that this was completed server-side
    }

    const updatedMessages = [...existingMessages, assistantMessage]

    await db
      .update(copilotChats)
      .set({
        messages: updatedMessages,
        conversationId: conversationId || (chat.conversationId as string | undefined),
        updatedAt: new Date(),
      })
      .where(eq(copilotChats.id, meta.chatId))

    logger.info('Saved stream to database', {
      streamId: meta.id,
      chatId: meta.chatId,
      contentLength: meta.assistantContent.length,
      toolCallsCount: meta.toolCalls.length,
    })
  } catch (error) {
    logger.error('Failed to save stream to database', { streamId: meta.id, error })
  }
}

// ============ READ OPERATIONS (used by resume handler) ============

/**
 * Get stream metadata
 */
export async function getStreamMeta(streamId: string): Promise<StreamMeta | null> {
  const redis = getRedisClient()
  if (!redis) return null

  const raw = await redis.get(`copilot:stream:${streamId}:meta`)
  return raw ? JSON.parse(raw) : null
}

/**
 * Get chunks from stream history (for replay)
 */
export async function getChunks(streamId: string, fromIndex: number = 0): Promise<string[]> {
  const redis = getRedisClient()
  if (!redis) return []

  const listKey = `copilot:stream:${streamId}:chunks`
  return redis.lrange(listKey, fromIndex, -1)
}

/**
 * Get the number of chunks in the stream
 */
export async function getChunkCount(streamId: string): Promise<number> {
  const redis = getRedisClient()
  if (!redis) return 0

  const listKey = `copilot:stream:${streamId}:chunks`
  return redis.llen(listKey)
}

/**
 * Get active stream ID for a chat (if any)
 */
export async function getActiveStreamForChat(chatId: string): Promise<string | null> {
  const redis = getRedisClient()
  if (!redis) return null

  return redis.get(`copilot:active:${chatId}`)
}

// ============ SUBSCRIPTION (for resume handler) ============

/**
 * Subscribe to live stream updates.
 * Uses Redis Pub/Sub - no polling, fully event-driven.
 *
 * @param streamId - Stream to subscribe to
 * @param onChunk - Callback for each new chunk
 * @param onComplete - Callback when stream completes
 * @param signal - Optional AbortSignal to cancel subscription
 */
export async function subscribeToStream(
  streamId: string,
  onChunk: (chunk: string) => void,
  onComplete: () => void,
  signal?: AbortSignal
): Promise<void> {
  const redis = getRedisClient()
  if (!redis) {
    onComplete()
    return
  }

  // Create a separate Redis connection for subscription
  const subscriber = redis.duplicate()
  const channel = `copilot:stream:${streamId}`

  let isComplete = false

  const cleanup = () => {
    if (!isComplete) {
      isComplete = true
      subscriber.unsubscribe(channel).catch(() => {})
      subscriber.quit().catch(() => {})
    }
  }

  signal?.addEventListener('abort', cleanup)

  await subscriber.subscribe(channel)

  subscriber.on('message', (ch, message) => {
    if (ch !== channel) return

    try {
      const parsed = JSON.parse(message)
      if (parsed.type === 'stream_complete' || parsed.type === 'stream_error') {
        cleanup()
        onComplete()
        return
      }
    } catch {
      // Not a control message, just a chunk
    }

    onChunk(message)
  })

  subscriber.on('error', (err) => {
    logger.error('Subscriber error', { streamId, error: err })
    cleanup()
    onComplete()
  })
}
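
A sketch of the resume flow these read helpers enable: replay the chunks the client missed, then hand off to live pub/sub. The handler signature is an assumption; `getChunks`, `getChunkCount`, and `subscribeToStream` are the real exports used.

```typescript
// Illustrative resume handler (assumed shape, not part of this module).
async function resumeStream(
  streamId: string,
  lastSeenChunk: number,
  send: (chunk: string) => void,
  done: () => void
) {
  // Replay chunks the client missed while disconnected
  const missed = await getChunks(streamId, lastSeenChunk)
  for (const chunk of missed) send(chunk)

  // Then switch to live pub/sub for the rest of the stream. A chunk pushed between
  // lrange and subscribe could slip through; callers that care can compare
  // getChunkCount(streamId) after subscribing and replay the tail again.
  await subscribeToStream(streamId, send, done)
}
```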

// ============ ABORT HANDLING ============

/**
 * Set abort signal for a stream.
 * The original request handler should check this and cancel if set.
 */
export async function setAbortSignal(streamId: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  await redis.setex(`copilot:abort:${streamId}`, 60, '1')
  // Also publish to channel so handler sees it immediately
  await redis.publish(`copilot:stream:${streamId}`, JSON.stringify({ type: 'abort' }))

  logger.info('Set abort signal', { streamId })
}

/**
 * Check if abort signal is set for a stream
 */
export async function checkAbortSignal(streamId: string): Promise<boolean> {
  const redis = getRedisClient()
  if (!redis) return false

  const val = await redis.get(`copilot:abort:${streamId}`)
  return val === '1'
}

/**
 * Clear abort signal for a stream
 */
export async function clearAbortSignal(streamId: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  await redis.del(`copilot:abort:${streamId}`)
}

/**
 * Refresh TTL on all stream keys (call periodically during long streams)
 */
export async function refreshStreamTTL(streamId: string, chatId: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  await redis.expire(`copilot:stream:${streamId}:meta`, STREAM_TTL)
  await redis.expire(`copilot:stream:${streamId}:chunks`, STREAM_TTL)
  await redis.expire(`copilot:active:${chatId}`, STREAM_TTL)
}
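
A sketch of how the abort keys are meant to be used across instances: any instance handling the stop request sets the flag, and the instance that owns the original stream checks it between chunks. The endpoint wrappers are assumptions; the two exported functions are real.

```typescript
// Illustrative abort flow (assumed wrappers, not part of this module).
async function handleStopRequest(streamId: string) {
  await setAbortSignal(streamId) // setex copilot:abort:<id> + publish { type: 'abort' }
}

async function producerShouldStop(streamId: string): Promise<boolean> {
  // Typically polled between chunks, e.g. from a stream transformer's isAborted() callback
  return checkAbortSignal(streamId)
}
```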

953	apps/sim/lib/copilot/stream-transformer.ts	Normal file
@@ -0,0 +1,953 @@
/**
 * Stream Transformer - Converts Sim Agent SSE to Render Events
 *
 * This module processes the raw SSE stream from Sim Agent, executes tools,
 * persists to the database, and emits render events for the client.
 *
 * The client receives only render events and just needs to render them.
 */

import { createLogger } from '@sim/logger'
import { routeExecution } from '@/lib/copilot/tools/server/router'
import { isClientOnlyTool } from '@/lib/copilot/tools/client/ui-config'
import { env } from '@/lib/core/config/env'
import {
  type RenderEvent,
  type ToolDisplay,
  createRenderEvent,
  resetSeqCounter,
  serializeRenderEvent,
} from './render-events'
import { SIM_AGENT_API_URL_DEFAULT } from './constants'

const logger = createLogger('StreamTransformer')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT

// ============================================================================
|
||||||
|
// Types
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
export interface StreamTransformContext {
|
||||||
|
streamId: string
|
||||||
|
chatId: string
|
||||||
|
userId: string
|
||||||
|
workflowId?: string
|
||||||
|
userMessageId: string
|
||||||
|
assistantMessageId: string
|
||||||
|
|
||||||
|
/** Callback to emit render events (sent to client via SSE) */
|
||||||
|
onRenderEvent: (event: RenderEvent) => Promise<void>
|
||||||
|
|
||||||
|
/** Callback to persist state (called at key moments) */
|
||||||
|
onPersist?: (data: PersistData) => Promise<void>
|
||||||
|
|
||||||
|
/** Callback to check if stream is aborted */
|
||||||
|
isAborted?: () => boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface PersistData {
|
||||||
|
type: 'content' | 'tool_call' | 'message_complete'
|
||||||
|
content?: string
|
||||||
|
toolCall?: {
|
||||||
|
id: string
|
||||||
|
name: string
|
||||||
|
args: Record<string, unknown>
|
||||||
|
state: 'pending' | 'executing' | 'success' | 'error'
|
||||||
|
result?: unknown
|
||||||
|
}
|
||||||
|
messageComplete?: boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track state during stream processing
|
||||||
|
interface TransformState {
|
||||||
|
// Content accumulation
|
||||||
|
assistantContent: string
|
||||||
|
|
||||||
|
// Thinking block state
|
||||||
|
inThinkingBlock: boolean
|
||||||
|
thinkingContent: string
|
||||||
|
|
||||||
|
// Plan capture
|
||||||
|
inPlanCapture: boolean
|
||||||
|
planContent: string
|
||||||
|
|
||||||
|
// Options capture
|
||||||
|
inOptionsCapture: boolean
|
||||||
|
optionsContent: string
|
||||||
|
|
||||||
|
// Tool call tracking
|
||||||
|
toolCalls: Map<
|
||||||
|
string,
|
||||||
|
{
|
||||||
|
id: string
|
||||||
|
name: string
|
||||||
|
args: Record<string, unknown>
|
||||||
|
state: 'pending' | 'generating' | 'executing' | 'success' | 'error'
|
||||||
|
result?: unknown
|
||||||
|
}
|
||||||
|
>
|
||||||
|
|
||||||
|
// Subagent tracking
|
||||||
|
activeSubagent: string | null // parentToolCallId
|
||||||
|
subagentToolCalls: Map<string, string> // toolCallId -> parentToolCallId
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Main Transformer
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a Sim Agent SSE stream and emit render events
|
||||||
|
*/
|
||||||
|
export async function transformStream(
|
||||||
|
agentStream: ReadableStream<Uint8Array>,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const { streamId, chatId, userMessageId, assistantMessageId, onRenderEvent, isAborted } = context
|
||||||
|
|
||||||
|
// Reset sequence counter for new stream
|
||||||
|
resetSeqCounter()
|
||||||
|
|
||||||
|
const state: TransformState = {
|
||||||
|
assistantContent: '',
|
||||||
|
inThinkingBlock: false,
|
||||||
|
thinkingContent: '',
|
||||||
|
inPlanCapture: false,
|
||||||
|
planContent: '',
|
||||||
|
inOptionsCapture: false,
|
||||||
|
optionsContent: '',
|
||||||
|
toolCalls: new Map(),
|
||||||
|
activeSubagent: null,
|
||||||
|
subagentToolCalls: new Map(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit stream start
|
||||||
|
await emitEvent(onRenderEvent, 'stream_start', {
|
||||||
|
streamId,
|
||||||
|
chatId,
|
||||||
|
userMessageId,
|
||||||
|
assistantMessageId,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Emit message start for assistant
|
||||||
|
await emitEvent(onRenderEvent, 'message_start', {
|
||||||
|
messageId: assistantMessageId,
|
||||||
|
role: 'assistant',
|
||||||
|
})
|
||||||
|
|
||||||
|
const reader = agentStream.getReader()
|
||||||
|
const decoder = new TextDecoder()
|
||||||
|
let buffer = ''
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
// Check for abort
|
||||||
|
if (isAborted?.()) {
|
||||||
|
logger.info('Stream aborted by user', { streamId })
|
||||||
|
// Abort any in-progress tools
|
||||||
|
for (const [toolCallId, tool] of state.toolCalls) {
|
||||||
|
if (tool.state === 'pending' || tool.state === 'executing') {
|
||||||
|
await emitEvent(onRenderEvent, 'tool_aborted', {
|
||||||
|
toolCallId,
|
||||||
|
reason: 'User aborted',
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
const { done, value } = await reader.read()
|
||||||
|
if (done) break
|
||||||
|
|
||||||
|
buffer += decoder.decode(value, { stream: true })
|
||||||
|
|
||||||
|
// Process complete SSE lines
|
||||||
|
const lines = buffer.split('\n')
|
||||||
|
buffer = lines.pop() || '' // Keep incomplete line in buffer
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
if (!line.startsWith('data: ') || line.length <= 6) continue
|
||||||
|
|
||||||
|
try {
|
||||||
|
const event = JSON.parse(line.slice(6))
|
||||||
|
await processSimAgentEvent(event, state, context)
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn('Failed to parse SSE event', { line: line.slice(0, 100) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process any remaining buffer
|
||||||
|
if (buffer.startsWith('data: ')) {
|
||||||
|
try {
|
||||||
|
const event = JSON.parse(buffer.slice(6))
|
||||||
|
await processSimAgentEvent(event, state, context)
|
||||||
|
} catch {}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finalize thinking block if still open
|
||||||
|
if (state.inThinkingBlock) {
|
||||||
|
await emitEvent(onRenderEvent, 'thinking_end', {})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finalize plan if still open
|
||||||
|
if (state.inPlanCapture) {
|
||||||
|
await finalizePlan(state, context)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finalize options if still open
|
||||||
|
if (state.inOptionsCapture) {
|
||||||
|
await finalizeOptions(state, context)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit message end
|
||||||
|
await emitEvent(onRenderEvent, 'message_end', { messageId: assistantMessageId })
|
||||||
|
|
||||||
|
// Emit stream end
|
||||||
|
await emitEvent(onRenderEvent, 'stream_end', {})
|
||||||
|
|
||||||
|
// Persist final message
|
||||||
|
await context.onPersist?.({
|
||||||
|
type: 'message_complete',
|
||||||
|
content: state.assistantContent,
|
||||||
|
messageComplete: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Emit message saved
|
||||||
|
await emitEvent(onRenderEvent, 'message_saved', {
|
||||||
|
messageId: assistantMessageId,
|
||||||
|
refreshFromDb: false,
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Stream transform error', { error, streamId })
|
||||||
|
await emitEvent(onRenderEvent, 'stream_error', {
|
||||||
|
error: error instanceof Error ? error.message : 'Unknown error',
|
||||||
|
})
|
||||||
|
} finally {
|
||||||
|
reader.releaseLock()
|
||||||
|
}
|
||||||
|
}
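
A sketch of how `transformStream` can be wired into an SSE response. The route-side wrapper is an assumption, and it assumes `serializeRenderEvent` (imported above) returns an SSE-ready string; `transformStream` and `StreamTransformContext` are defined in this file.

```typescript
// Illustrative wiring (assumed caller, not part of this module): pipe the agent's
// SSE body through transformStream and forward serialized render events to the client.
function toClientStream(
  agentBody: ReadableStream<Uint8Array>,
  ctx: Omit<StreamTransformContext, 'onRenderEvent'>
) {
  const encoder = new TextEncoder()
  return new ReadableStream<Uint8Array>({
    start(controller) {
      transformStream(agentBody, {
        ...ctx,
        onRenderEvent: async (event) => {
          controller.enqueue(encoder.encode(serializeRenderEvent(event)))
        },
      })
        .catch(() => {}) // transform errors already surface as stream_error events
        .finally(() => controller.close())
    },
  })
}
```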
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Event Processing
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
async function processSimAgentEvent(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const { onRenderEvent } = context
|
||||||
|
|
||||||
|
switch (event.type) {
|
||||||
|
// ========== Content Events ==========
|
||||||
|
case 'content':
|
||||||
|
await handleContent(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
// ========== Thinking Events ==========
|
||||||
|
case 'thinking':
|
||||||
|
await handleThinking(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
// ========== Tool Call Events ==========
|
||||||
|
case 'tool_call':
|
||||||
|
await handleToolCall(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'tool_generating':
|
||||||
|
await handleToolGenerating(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'tool_result':
|
||||||
|
await handleToolResult(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'tool_error':
|
||||||
|
await handleToolError(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
// ========== Plan Events ==========
|
||||||
|
case 'plan_capture_start':
|
||||||
|
state.inPlanCapture = true
|
||||||
|
state.planContent = ''
|
||||||
|
await emitEvent(onRenderEvent, 'plan_start', {})
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'plan_capture':
|
||||||
|
if (state.inPlanCapture && event.data) {
|
||||||
|
state.planContent += event.data
|
||||||
|
await emitEvent(onRenderEvent, 'plan_delta', { content: event.data })
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'plan_capture_end':
|
||||||
|
await finalizePlan(state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
// ========== Options Events ==========
|
||||||
|
case 'options_stream_start':
|
||||||
|
state.inOptionsCapture = true
|
||||||
|
state.optionsContent = ''
|
||||||
|
await emitEvent(onRenderEvent, 'options_start', {})
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'options_stream':
|
||||||
|
if (state.inOptionsCapture && event.data) {
|
||||||
|
state.optionsContent += event.data
|
||||||
|
await emitEvent(onRenderEvent, 'options_delta', { content: event.data })
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'options_stream_end':
|
||||||
|
await finalizeOptions(state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
// ========== Subagent Events ==========
|
||||||
|
case 'subagent_start':
|
||||||
|
await handleSubagentStart(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'subagent_end':
|
||||||
|
await handleSubagentEnd(event, state, context)
|
||||||
|
break
|
||||||
|
|
||||||
|
// ========== Response Events ==========
|
||||||
|
case 'response_done':
|
||||||
|
// Final response from Sim Agent
|
||||||
|
logger.debug('Response done received', { streamId: context.streamId })
|
||||||
|
break
|
||||||
|
|
||||||
|
default:
|
||||||
|
logger.debug('Unknown Sim Agent event type', { type: event.type })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Content Handling
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
async function handleContent(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const content = event.data
|
||||||
|
if (!content) return
|
||||||
|
|
||||||
|
state.assistantContent += content
|
||||||
|
|
||||||
|
// Check for thinking block markers
|
||||||
|
if (content.includes('<think>') || content.includes('<thinking>')) {
|
||||||
|
state.inThinkingBlock = true
|
||||||
|
await context.onRenderEvent(createRenderEvent('thinking_start', {}))
|
||||||
|
// Don't emit the marker as text
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (content.includes('</think>') || content.includes('</thinking>')) {
|
||||||
|
state.inThinkingBlock = false
|
||||||
|
await context.onRenderEvent(createRenderEvent('thinking_end', {}))
|
||||||
|
// Don't emit the marker as text
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Route to appropriate handler
|
||||||
|
if (state.inThinkingBlock) {
|
||||||
|
state.thinkingContent += content
|
||||||
|
await context.onRenderEvent(createRenderEvent('thinking_delta', { content }))
|
||||||
|
} else {
|
||||||
|
await context.onRenderEvent(createRenderEvent('text_delta', { content }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Thinking Handling
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
async function handleThinking(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const content = event.data || event.thinking
|
||||||
|
if (!content) return
|
||||||
|
|
||||||
|
// Start thinking block if not already
|
||||||
|
if (!state.inThinkingBlock) {
|
||||||
|
state.inThinkingBlock = true
|
||||||
|
await context.onRenderEvent(createRenderEvent('thinking_start', {}))
|
||||||
|
}
|
||||||
|
|
||||||
|
state.thinkingContent += content
|
||||||
|
await context.onRenderEvent(createRenderEvent('thinking_delta', { content }))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Tool Call Handling
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
async function handleToolCall(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const { onRenderEvent, userId, workflowId } = context
|
||||||
|
const data = event.data || event
|
||||||
|
const { id: toolCallId, name: toolName, arguments: args, partial } = data
|
||||||
|
|
||||||
|
if (!toolCallId || !toolName) return
|
||||||
|
|
||||||
|
// Check if this is a subagent tool call
|
||||||
|
const isSubagentTool = state.activeSubagent !== null
|
||||||
|
|
||||||
|
// Track the tool call
|
||||||
|
const existingTool = state.toolCalls.get(toolCallId)
|
||||||
|
|
||||||
|
if (partial) {
|
||||||
|
// Streaming args
|
||||||
|
if (!existingTool) {
|
||||||
|
state.toolCalls.set(toolCallId, {
|
||||||
|
id: toolCallId,
|
||||||
|
name: toolName,
|
||||||
|
args: args || {},
|
||||||
|
state: 'generating',
|
||||||
|
})
|
||||||
|
if (isSubagentTool) {
|
||||||
|
state.subagentToolCalls.set(toolCallId, state.activeSubagent!)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
existingTool.args = { ...existingTool.args, ...args }
|
||||||
|
}
|
||||||
|
|
||||||
|
const display = getToolDisplay(toolName, 'generating')
|
||||||
|
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(onRenderEvent, 'subagent_tool_generating', {
|
||||||
|
parentToolCallId: state.activeSubagent!,
|
||||||
|
toolCallId,
|
||||||
|
argsDelta: JSON.stringify(args),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
await emitEvent(onRenderEvent, 'tool_generating', {
|
||||||
|
toolCallId,
|
||||||
|
argsPartial: existingTool?.args || args,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Complete tool call - ready to execute
|
||||||
|
const finalArgs = args || existingTool?.args || {}
|
||||||
|
|
||||||
|
state.toolCalls.set(toolCallId, {
|
||||||
|
id: toolCallId,
|
||||||
|
name: toolName,
|
||||||
|
args: finalArgs,
|
||||||
|
state: 'pending',
|
||||||
|
})
|
||||||
|
|
||||||
|
if (isSubagentTool) {
|
||||||
|
state.subagentToolCalls.set(toolCallId, state.activeSubagent!)
|
||||||
|
}
|
||||||
|
|
||||||
|
const display = getToolDisplay(toolName, 'pending')
|
||||||
|
|
||||||
|
// Emit pending event
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(onRenderEvent, 'subagent_tool_pending', {
|
||||||
|
parentToolCallId: state.activeSubagent!,
|
||||||
|
toolCallId,
|
||||||
|
toolName,
|
||||||
|
args: finalArgs,
|
||||||
|
display,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
await emitEvent(onRenderEvent, 'tool_pending', {
|
||||||
|
toolCallId,
|
||||||
|
toolName,
|
||||||
|
args: finalArgs,
|
||||||
|
display,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this tool needs user approval (interrupt)
|
||||||
|
const needsInterrupt = checkToolNeedsInterrupt(toolName, finalArgs)
|
||||||
|
if (needsInterrupt) {
|
||||||
|
const options = getInterruptOptions(toolName, finalArgs)
|
||||||
|
await emitEvent(onRenderEvent, 'interrupt_show', {
|
||||||
|
toolCallId,
|
||||||
|
toolName,
|
||||||
|
options,
|
||||||
|
})
|
||||||
|
// Don't execute yet - wait for interrupt resolution
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this is a client-only tool
|
||||||
|
if (isClientOnlyTool(toolName)) {
|
||||||
|
logger.info('Skipping client-only tool on server', { toolName, toolCallId })
|
||||||
|
// Client will handle this tool
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute tool server-side - NON-BLOCKING for parallel execution
|
||||||
|
// Fire off the execution and let tool_result event handle the completion
|
||||||
|
executeToolServerSide(toolCallId, toolName, finalArgs, state, context).catch((err) => {
|
||||||
|
logger.error('Tool execution failed (async)', { toolCallId, toolName, error: err })
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleToolGenerating(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const toolCallId = event.toolCallId || event.data?.id
|
||||||
|
if (!toolCallId) return
|
||||||
|
|
||||||
|
const isSubagentTool = state.subagentToolCalls.has(toolCallId)
|
||||||
|
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(context.onRenderEvent, 'subagent_tool_generating', {
|
||||||
|
parentToolCallId: state.subagentToolCalls.get(toolCallId)!,
|
||||||
|
toolCallId,
|
||||||
|
argsDelta: event.data,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
await emitEvent(context.onRenderEvent, 'tool_generating', {
|
||||||
|
toolCallId,
|
||||||
|
argsDelta: event.data,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleToolResult(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const toolCallId = event.toolCallId || event.data?.id
|
||||||
|
const success = event.success !== false
|
||||||
|
const result = event.result || event.data?.result
|
||||||
|
|
||||||
|
if (!toolCallId) return
|
||||||
|
|
||||||
|
const tool = state.toolCalls.get(toolCallId)
|
||||||
|
|
||||||
|
// Skip if tool already in terminal state (server-side execution already emitted events)
|
||||||
|
if (tool && (tool.state === 'success' || tool.state === 'error')) {
|
||||||
|
logger.debug('Skipping duplicate tool_result event', { toolCallId, currentState: tool.state })
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tool) {
|
||||||
|
tool.state = success ? 'success' : 'error'
|
||||||
|
tool.result = result
|
||||||
|
}
|
||||||
|
|
||||||
|
const isSubagentTool = state.subagentToolCalls.has(toolCallId)
|
||||||
|
const display = getToolDisplay(tool?.name || '', success ? 'success' : 'error')
|
||||||
|
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(context.onRenderEvent, success ? 'subagent_tool_success' : 'subagent_tool_error', {
|
||||||
|
parentToolCallId: state.subagentToolCalls.get(toolCallId)!,
|
||||||
|
toolCallId,
|
||||||
|
...(success ? { result, display } : { error: event.error || 'Tool failed' }),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
if (success) {
|
||||||
|
const successEvent: any = {
|
||||||
|
toolCallId,
|
||||||
|
result,
|
||||||
|
display,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this was an edit_workflow that created a diff
|
||||||
|
if (tool?.name === 'edit_workflow' && result?.workflowState) {
|
||||||
|
successEvent.workflowId = context.workflowId
|
||||||
|
successEvent.hasDiff = true
|
||||||
|
}
|
||||||
|
|
||||||
|
await emitEvent(context.onRenderEvent, 'tool_success', successEvent)
|
||||||
|
} else {
|
||||||
|
await emitEvent(context.onRenderEvent, 'tool_error', {
|
||||||
|
toolCallId,
|
||||||
|
error: event.error || 'Tool failed',
|
||||||
|
display,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persist tool call result
|
||||||
|
await context.onPersist?.({
|
||||||
|
type: 'tool_call',
|
||||||
|
toolCall: {
|
||||||
|
id: toolCallId,
|
||||||
|
name: tool?.name || '',
|
||||||
|
args: tool?.args || {},
|
||||||
|
state: success ? 'success' : 'error',
|
||||||
|
result,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleToolError(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const toolCallId = event.toolCallId || event.data?.id
|
||||||
|
const error = event.error || event.data?.error || 'Tool execution failed'
|
||||||
|
|
||||||
|
if (!toolCallId) return
|
||||||
|
|
||||||
|
const tool = state.toolCalls.get(toolCallId)
|
||||||
|
if (tool) {
|
||||||
|
tool.state = 'error'
|
||||||
|
}
|
||||||
|
|
||||||
|
const isSubagentTool = state.subagentToolCalls.has(toolCallId)
|
||||||
|
const display = getToolDisplay(tool?.name || '', 'error')
|
||||||
|
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(context.onRenderEvent, 'subagent_tool_error', {
|
||||||
|
parentToolCallId: state.subagentToolCalls.get(toolCallId)!,
|
||||||
|
toolCallId,
|
||||||
|
error,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
await emitEvent(context.onRenderEvent, 'tool_error', {
|
||||||
|
toolCallId,
|
||||||
|
error,
|
||||||
|
display,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Tool Execution
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
async function executeToolServerSide(
|
||||||
|
toolCallId: string,
|
||||||
|
toolName: string,
|
||||||
|
args: Record<string, unknown>,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const { onRenderEvent, userId, workflowId } = context
|
||||||
|
const isSubagentTool = state.subagentToolCalls.has(toolCallId)
|
||||||
|
|
||||||
|
// Update state to executing
|
||||||
|
const tool = state.toolCalls.get(toolCallId)
|
||||||
|
if (tool) {
|
||||||
|
tool.state = 'executing'
|
||||||
|
}
|
||||||
|
|
||||||
|
const display = getToolDisplay(toolName, 'executing')
|
||||||
|
|
||||||
|
// Emit executing event
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(onRenderEvent, 'subagent_tool_executing', {
|
||||||
|
parentToolCallId: state.subagentToolCalls.get(toolCallId)!,
|
||||||
|
toolCallId,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
await emitEvent(onRenderEvent, 'tool_executing', {
|
||||||
|
toolCallId,
|
||||||
|
display,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Add workflowId to args if available
|
||||||
|
const execArgs = { ...args }
|
||||||
|
if (workflowId && !execArgs.workflowId) {
|
||||||
|
execArgs.workflowId = workflowId
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the tool via the router
|
||||||
|
const result = await routeExecution(toolName, execArgs, { userId })
|
||||||
|
|
||||||
|
// Update state
|
||||||
|
if (tool) {
|
||||||
|
tool.state = 'success'
|
||||||
|
tool.result = result
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit success event
|
||||||
|
const successDisplay = getToolDisplay(toolName, 'success')
|
||||||
|
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(onRenderEvent, 'subagent_tool_success', {
|
||||||
|
parentToolCallId: state.subagentToolCalls.get(toolCallId)!,
|
||||||
|
toolCallId,
|
||||||
|
result,
|
||||||
|
display: successDisplay,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
const successEvent: any = {
|
||||||
|
toolCallId,
|
||||||
|
result,
|
||||||
|
display: successDisplay,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this was an edit_workflow that created a diff
|
||||||
|
if (toolName === 'edit_workflow' && result?.workflowState) {
|
||||||
|
successEvent.workflowId = workflowId
|
||||||
|
successEvent.hasDiff = true
|
||||||
|
|
||||||
|
// Emit diff_ready so client knows to read from DB
|
||||||
|
await emitEvent(onRenderEvent, 'diff_ready', {
|
||||||
|
workflowId: workflowId || '',
|
||||||
|
toolCallId,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
await emitEvent(onRenderEvent, 'tool_success', successEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify Sim Agent that tool is complete
|
||||||
|
await markToolComplete(toolCallId, toolName, true, result)
|
||||||
|
|
||||||
|
// Persist tool result
|
||||||
|
await context.onPersist?.({
|
||||||
|
type: 'tool_call',
|
||||||
|
toolCall: {
|
||||||
|
id: toolCallId,
|
||||||
|
name: toolName,
|
||||||
|
args,
|
||||||
|
state: 'success',
|
||||||
|
result,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : 'Tool execution failed'
|
||||||
|
logger.error('Tool execution failed', { toolCallId, toolName, error: errorMessage })
|
||||||
|
|
||||||
|
// Update state
|
||||||
|
if (tool) {
|
||||||
|
tool.state = 'error'
|
||||||
|
}
|
||||||
|
|
||||||
|
const errorDisplay = getToolDisplay(toolName, 'error')
|
||||||
|
|
||||||
|
// Emit error event
|
||||||
|
if (isSubagentTool) {
|
||||||
|
await emitEvent(onRenderEvent, 'subagent_tool_error', {
|
||||||
|
parentToolCallId: state.subagentToolCalls.get(toolCallId)!,
|
||||||
|
toolCallId,
|
||||||
|
error: errorMessage,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
await emitEvent(onRenderEvent, 'tool_error', {
|
||||||
|
toolCallId,
|
||||||
|
error: errorMessage,
|
||||||
|
display: errorDisplay,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify Sim Agent that tool failed
|
||||||
|
await markToolComplete(toolCallId, toolName, false, undefined, errorMessage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function markToolComplete(
|
||||||
|
toolCallId: string,
|
||||||
|
toolName: string,
|
||||||
|
success: boolean,
|
||||||
|
result?: unknown,
|
||||||
|
error?: string
|
||||||
|
): Promise<void> {
|
||||||
|
try {
|
||||||
|
const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
|
||||||
|
},
|
||||||
|
body: JSON.stringify({
|
||||||
|
id: toolCallId,
|
||||||
|
name: toolName,
|
||||||
|
status: success ? 200 : 500,
|
||||||
|
message: success
|
||||||
|
? (result as Record<string, unknown> | undefined)?.message || 'Success'
|
||||||
|
: error,
|
||||||
|
data: success ? result : undefined,
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
logger.warn('Failed to mark tool complete', { toolCallId, status: response.status })
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.error('Error marking tool complete', { toolCallId, error: e })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Subagent Handling
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
async function handleSubagentStart(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const parentToolCallId = event.parentToolCallId || event.data?.parentToolCallId
|
||||||
|
const subagentId = event.subagentId || event.data?.subagentId || parentToolCallId
|
||||||
|
const label = event.label || event.data?.label
|
||||||
|
|
||||||
|
if (!parentToolCallId) return
|
||||||
|
|
||||||
|
state.activeSubagent = parentToolCallId
|
||||||
|
|
||||||
|
await emitEvent(context.onRenderEvent, 'subagent_start', {
|
||||||
|
parentToolCallId,
|
||||||
|
subagentId,
|
||||||
|
label,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleSubagentEnd(
|
||||||
|
event: any,
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
const parentToolCallId = event.parentToolCallId || event.data?.parentToolCallId || state.activeSubagent
|
||||||
|
|
||||||
|
if (!parentToolCallId) return
|
||||||
|
|
||||||
|
state.activeSubagent = null
|
||||||
|
|
||||||
|
await emitEvent(context.onRenderEvent, 'subagent_end', {
|
||||||
|
parentToolCallId,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Plan & Options Handling
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
async function finalizePlan(state: TransformState, context: StreamTransformContext): Promise<void> {
|
||||||
|
if (!state.inPlanCapture) return
|
||||||
|
|
||||||
|
state.inPlanCapture = false
|
||||||
|
|
||||||
|
// Parse todos from plan content
|
||||||
|
const todos = parseTodosFromPlan(state.planContent)
|
||||||
|
|
||||||
|
await emitEvent(context.onRenderEvent, 'plan_end', { todos })
|
||||||
|
}
|
||||||
|
|
||||||
|
async function finalizeOptions(
|
||||||
|
state: TransformState,
|
||||||
|
context: StreamTransformContext
|
||||||
|
): Promise<void> {
|
||||||
|
if (!state.inOptionsCapture) return
|
||||||
|
|
||||||
|
state.inOptionsCapture = false
|
||||||
|
|
||||||
|
// Parse options from content
|
||||||
|
const options = parseOptionsFromContent(state.optionsContent)
|
||||||
|
|
||||||
|
await emitEvent(context.onRenderEvent, 'options_end', { options })
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseTodosFromPlan(content: string): Array<{ id: string; content: string; status: 'pending' }> {
|
||||||
|
const todos: Array<{ id: string; content: string; status: 'pending' }> = []
|
||||||
|
const lines = content.split('\n')
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
const match = line.match(/^[-*]\s+(.+)$/)
|
||||||
|
if (match) {
|
||||||
|
todos.push({
|
||||||
|
id: `todo_${Date.now()}_${todos.length}`,
|
||||||
|
content: match[1].trim(),
|
||||||
|
status: 'pending',
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return todos
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseOptionsFromContent(content: string): string[] {
|
||||||
|
try {
|
||||||
|
// Try to parse as JSON array
|
||||||
|
const parsed = JSON.parse(content)
|
||||||
|
if (Array.isArray(parsed)) {
|
||||||
|
return parsed.filter((o) => typeof o === 'string')
|
||||||
|
}
|
||||||
|
} catch {}
|
||||||
|
|
||||||
|
// Fall back to splitting by newlines
|
||||||
|
return content
|
||||||
|
.split('\n')
|
||||||
|
.map((l) => l.trim())
|
||||||
|
.filter((l) => l.length > 0)
|
||||||
|
}
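
Illustrative inputs and outputs for the two parsing helpers above (values are made up for the example):

```typescript
const fromJson = parseOptionsFromContent('["Add a Slack block","Skip for now"]')
// -> ['Add a Slack block', 'Skip for now']
const fromLines = parseOptionsFromContent('Add a Slack block\nSkip for now')
// -> ['Add a Slack block', 'Skip for now'] (newline fallback)
const todos = parseTodosFromPlan('- Fetch the data\n* Transform it')
// -> [{ id: 'todo_...', content: 'Fetch the data', status: 'pending' },
//     { id: 'todo_...', content: 'Transform it', status: 'pending' }]
```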
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Helpers
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
function getToolDisplay(
|
||||||
|
toolName: string,
|
||||||
|
state: 'pending' | 'generating' | 'executing' | 'success' | 'error'
|
||||||
|
): ToolDisplay {
|
||||||
|
// Default displays based on state
|
||||||
|
const stateLabels: Record<string, string> = {
|
||||||
|
pending: 'Pending...',
|
||||||
|
generating: 'Preparing...',
|
||||||
|
executing: 'Running...',
|
||||||
|
success: 'Completed',
|
||||||
|
error: 'Failed',
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tool-specific labels
|
||||||
|
const toolLabels: Record<string, string> = {
|
||||||
|
edit_workflow: 'Editing workflow',
|
||||||
|
get_user_workflow: 'Reading workflow',
|
||||||
|
get_block_config: 'Getting block config',
|
||||||
|
get_blocks_and_tools: 'Loading blocks',
|
||||||
|
get_credentials: 'Checking credentials',
|
||||||
|
run_workflow: 'Running workflow',
|
||||||
|
knowledge_base: 'Searching knowledge base',
|
||||||
|
navigate_ui: 'Navigating',
|
||||||
|
tour: 'Starting tour',
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
label: toolLabels[toolName] || toolName.replace(/_/g, ' '),
|
||||||
|
description: stateLabels[state],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkToolNeedsInterrupt(toolName: string, args: Record<string, unknown>): boolean {
|
||||||
|
// Tools that always need user approval
|
||||||
|
const interruptTools = ['deploy_api', 'deploy_chat', 'deploy_mcp', 'delete_workflow']
|
||||||
|
return interruptTools.includes(toolName)
|
||||||
|
}
|
||||||
|
|
||||||
|
function getInterruptOptions(
|
||||||
|
toolName: string,
|
||||||
|
args: Record<string, unknown>
|
||||||
|
): Array<{ id: string; label: string; description?: string; variant?: 'default' | 'destructive' | 'outline' }> {
|
||||||
|
// Default interrupt options
|
||||||
|
return [
|
||||||
|
{ id: 'approve', label: 'Approve', variant: 'default' },
|
||||||
|
{ id: 'reject', label: 'Cancel', variant: 'outline' },
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
async function emitEvent<T extends RenderEvent['type']>(
|
||||||
|
onRenderEvent: (event: RenderEvent) => Promise<void>,
|
||||||
|
type: T,
|
||||||
|
data: Omit<Extract<RenderEvent, { type: T }>, 'type' | 'seq' | 'ts'>
|
||||||
|
): Promise<void> {
|
||||||
|
const event = createRenderEvent(type, data)
|
||||||
|
await onRenderEvent(event)
|
||||||
|
}
|
||||||
|
|
||||||
@@ -5,6 +5,9 @@
|
|||||||
* Import this module early in the app to ensure all tool configs are available.
|
* Import this module early in the app to ensure all tool configs are available.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Navigation tools
|
||||||
|
import './navigation/navigate-ui'
|
||||||
|
|
||||||
// Other tools (subagents)
|
// Other tools (subagents)
|
||||||
import './other/auth'
|
import './other/auth'
|
||||||
import './other/custom-tool'
|
import './other/custom-tool'
|
||||||
@@ -41,6 +44,7 @@ export {
|
|||||||
getToolUIConfig,
|
getToolUIConfig,
|
||||||
hasInterrupt,
|
hasInterrupt,
|
||||||
type InterruptConfig,
|
type InterruptConfig,
|
||||||
|
isClientOnlyTool,
|
||||||
isSpecialTool,
|
isSpecialTool,
|
||||||
isSubagentTool,
|
isSubagentTool,
|
||||||
type ParamsTableConfig,
|
type ParamsTableConfig,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import {
|
|||||||
type BaseClientToolMetadata,
|
type BaseClientToolMetadata,
|
||||||
ClientToolCallState,
|
ClientToolCallState,
|
||||||
} from '@/lib/copilot/tools/client/base-tool'
|
} from '@/lib/copilot/tools/client/base-tool'
|
||||||
|
import { registerToolUIConfig } from '@/lib/copilot/tools/client/ui-config'
|
||||||
import { useCopilotStore } from '@/stores/panel/copilot/store'
|
import { useCopilotStore } from '@/stores/panel/copilot/store'
|
||||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||||
|
|
||||||
@@ -239,3 +240,12 @@ export class NavigateUIClientTool extends BaseClientTool {
|
|||||||
await this.handleAccept(args)
|
await this.handleAccept(args)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register UI config at module load - clientOnly because this requires browser navigation
|
||||||
|
registerToolUIConfig(NavigateUIClientTool.id, {
|
||||||
|
clientOnly: true,
|
||||||
|
interrupt: {
|
||||||
|
accept: { text: 'Open', icon: Navigation },
|
||||||
|
reject: { text: 'Skip', icon: XCircle },
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ export class TourClientTool extends BaseClientTool {
|
|||||||
[ClientToolCallState.aborted]: { text: 'Aborted tour', icon: XCircle },
|
[ClientToolCallState.aborted]: { text: 'Aborted tour', icon: XCircle },
|
||||||
},
|
},
|
||||||
uiConfig: {
|
uiConfig: {
|
||||||
|
clientOnly: true, // Tour requires browser UI to guide the user
|
||||||
subagent: {
|
subagent: {
|
||||||
streamingLabel: 'Touring',
|
streamingLabel: 'Touring',
|
||||||
completedLabel: 'Tour complete',
|
completedLabel: 'Tour complete',
|
||||||
|
|||||||
@@ -172,6 +172,13 @@ export interface ToolUIConfig {
|
|||||||
* The tool-call component will use this to render specialized content.
|
* The tool-call component will use this to render specialized content.
|
||||||
*/
|
*/
|
||||||
customRenderer?: 'code' | 'edit_summary' | 'none'
|
customRenderer?: 'code' | 'edit_summary' | 'none'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether this tool requires a client/browser session to execute.
|
||||||
|
* Client-only tools (like navigate_ui, tour) cannot run in headless/API mode.
|
||||||
|
* In API-only mode, these tools will be skipped with a message.
|
||||||
|
*/
|
||||||
|
clientOnly?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -215,6 +222,14 @@ export function hasInterrupt(toolName: string): boolean {
|
|||||||
return !!toolUIConfigs[toolName]?.interrupt
|
return !!toolUIConfigs[toolName]?.interrupt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a tool is client-only (requires browser session).
|
||||||
|
* Client-only tools cannot execute in headless/API mode.
|
||||||
|
*/
|
||||||
|
export function isClientOnlyTool(toolName: string): boolean {
|
||||||
|
return !!toolUIConfigs[toolName]?.clientOnly
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get subagent labels for a tool
|
* Get subagent labels for a tool
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
import { createLogger } from '@sim/logger'
|
import { createLogger } from '@sim/logger'
|
||||||
|
import crypto from 'crypto'
|
||||||
|
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
|
||||||
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
|
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
|
||||||
import { getBlockConfigServerTool } from '@/lib/copilot/tools/server/blocks/get-block-config'
|
import { getBlockConfigServerTool } from '@/lib/copilot/tools/server/blocks/get-block-config'
|
||||||
import { getBlockOptionsServerTool } from '@/lib/copilot/tools/server/blocks/get-block-options'
|
import { getBlockOptionsServerTool } from '@/lib/copilot/tools/server/blocks/get-block-options'
|
||||||
@@ -30,6 +32,15 @@ import {
|
|||||||
GetTriggerBlocksResult,
|
GetTriggerBlocksResult,
|
||||||
} from '@/lib/copilot/tools/shared/schemas'
|
} from '@/lib/copilot/tools/shared/schemas'
|
||||||
|
|
||||||
|
/** Lock expiry in seconds for edit_workflow operations */
|
||||||
|
const EDIT_WORKFLOW_LOCK_EXPIRY = 30
|
||||||
|
|
||||||
|
/** Maximum wait time in ms before giving up on acquiring the lock */
|
||||||
|
const EDIT_WORKFLOW_LOCK_TIMEOUT = 15000
|
||||||
|
|
||||||
|
/** Delay between lock acquisition retries in ms */
|
||||||
|
const EDIT_WORKFLOW_LOCK_RETRY_DELAY = 100
|
||||||
|
|
||||||
// Generic execute response schemas (success path only for this route; errors handled via HTTP status)
|
// Generic execute response schemas (success path only for this route; errors handled via HTTP status)
|
||||||
export { ExecuteResponseSuccessSchema }
|
export { ExecuteResponseSuccessSchema }
|
||||||
export type ExecuteResponseSuccess = (typeof ExecuteResponseSuccessSchema)['_type']
|
export type ExecuteResponseSuccess = (typeof ExecuteResponseSuccessSchema)['_type']
|
||||||
@@ -53,6 +64,30 @@ serverToolRegistry[getCredentialsServerTool.name] = getCredentialsServerTool
|
|||||||
serverToolRegistry[makeApiRequestServerTool.name] = makeApiRequestServerTool
|
serverToolRegistry[makeApiRequestServerTool.name] = makeApiRequestServerTool
|
||||||
serverToolRegistry[knowledgeBaseServerTool.name] = knowledgeBaseServerTool
|
serverToolRegistry[knowledgeBaseServerTool.name] = knowledgeBaseServerTool
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquire a lock with retries for workflow-mutating operations
|
||||||
|
*/
|
||||||
|
async function acquireLockWithRetry(
|
||||||
|
lockKey: string,
|
||||||
|
lockValue: string,
|
||||||
|
expirySeconds: number,
|
||||||
|
timeoutMs: number,
|
||||||
|
retryDelayMs: number
|
||||||
|
): Promise<boolean> {
|
||||||
|
const startTime = Date.now()
|
||||||
|
|
||||||
|
while (Date.now() - startTime < timeoutMs) {
|
||||||
|
const acquired = await acquireLock(lockKey, lockValue, expirySeconds)
|
||||||
|
if (acquired) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// Wait before retrying
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, retryDelayMs))
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
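
The `acquireLock`/`releaseLock` helpers imported from '@/lib/core/config/redis' are not shown in this diff; a minimal sketch of the Redis pattern they are assumed to follow (SET NX EX plus compare-and-delete), written directly against ioredis for illustration:

```typescript
import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL!)

// Acquire: succeeds only if the key does not already exist, with an expiry as a safety net.
async function acquireLockSketch(key: string, value: string, expirySeconds: number) {
  return (await redis.set(key, value, 'EX', expirySeconds, 'NX')) === 'OK'
}

// Release: delete only if we still own the lock, atomically via Lua.
async function releaseLockSketch(key: string, value: string) {
  const script =
    'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'
  return (await redis.eval(script, 1, key, value)) === 1
}
```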
|
||||||
|
|
||||||
export async function routeExecution(
|
export async function routeExecution(
|
||||||
toolName: string,
|
toolName: string,
|
||||||
payload: unknown,
|
payload: unknown,
|
||||||
@@ -93,23 +128,74 @@ export async function routeExecution(
     args = KnowledgeBaseInput.parse(args)
   }
 
-  const result = await tool.execute(args, context)
-
-  if (toolName === 'get_blocks_and_tools') {
-    return GetBlocksAndToolsResult.parse(result)
-  }
-  if (toolName === 'get_blocks_metadata') {
-    return GetBlocksMetadataResult.parse(result)
-  }
-  if (toolName === 'get_block_options') {
-    return GetBlockOptionsResult.parse(result)
-  }
-  if (toolName === 'get_block_config') {
-    return GetBlockConfigResult.parse(result)
-  }
-  if (toolName === 'get_trigger_blocks') {
-    return GetTriggerBlocksResult.parse(result)
-  }
-
-  return result
+  // For edit_workflow, acquire a per-workflow lock to prevent race conditions
+  // when multiple edit_workflow calls happen in parallel for the same workflow
+  let lockKey: string | null = null
+  let lockValue: string | null = null
+
+  if (toolName === 'edit_workflow' && args.workflowId) {
+    lockKey = `copilot:edit_workflow:lock:${args.workflowId}`
+    lockValue = crypto.randomUUID()
+
+    const acquired = await acquireLockWithRetry(
+      lockKey,
+      lockValue,
+      EDIT_WORKFLOW_LOCK_EXPIRY,
+      EDIT_WORKFLOW_LOCK_TIMEOUT,
+      EDIT_WORKFLOW_LOCK_RETRY_DELAY
+    )
+
+    if (!acquired) {
+      logger.warn('Failed to acquire edit_workflow lock after timeout', {
+        workflowId: args.workflowId,
+        timeoutMs: EDIT_WORKFLOW_LOCK_TIMEOUT,
+      })
+      throw new Error(
+        'Workflow is currently being edited by another operation. Please try again shortly.'
+      )
+    }
+
+    logger.debug('Acquired edit_workflow lock', {
+      workflowId: args.workflowId,
+      lockKey,
+    })
+  }
+
+  try {
+    const result = await tool.execute(args, context)
+
+    if (toolName === 'get_blocks_and_tools') {
+      return GetBlocksAndToolsResult.parse(result)
+    }
+    if (toolName === 'get_blocks_metadata') {
+      return GetBlocksMetadataResult.parse(result)
+    }
+    if (toolName === 'get_block_options') {
+      return GetBlockOptionsResult.parse(result)
+    }
+    if (toolName === 'get_block_config') {
+      return GetBlockConfigResult.parse(result)
+    }
+    if (toolName === 'get_trigger_blocks') {
+      return GetTriggerBlocksResult.parse(result)
+    }
+
+    return result
+  } finally {
+    // Always release the lock if we acquired one
+    if (lockKey && lockValue) {
+      const released = await releaseLock(lockKey, lockValue)
+      if (released) {
+        logger.debug('Released edit_workflow lock', {
+          workflowId: args.workflowId,
+          lockKey,
+        })
+      } else {
+        logger.warn('Failed to release edit_workflow lock (may have expired)', {
+          workflowId: args.workflowId,
+          lockKey,
+        })
+      }
+    }
+  }
 }
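The acquire/try/finally shape above could equally be factored into a small helper. A sketch of that pattern, with the lock functions injected so the snippet stands alone — `withWorkflowLock` itself is hypothetical and not part of the PR; the expiry/timeout/retry values are illustrative, not the PR's constants:

// Hypothetical helper, not in the PR: same lock lifecycle as routeExecution's
// edit_workflow branch, with acquire/release passed in.
type AcquireLockWithRetry = (
  lockKey: string,
  lockValue: string,
  expirySeconds: number,
  timeoutMs: number,
  retryDelayMs: number
) => Promise<boolean>
type ReleaseLock = (lockKey: string, lockValue: string) => Promise<boolean>

async function withWorkflowLock<T>(
  workflowId: string,
  acquire: AcquireLockWithRetry,
  release: ReleaseLock,
  fn: () => Promise<T>
): Promise<T> {
  const lockKey = `copilot:edit_workflow:lock:${workflowId}`
  const lockValue = crypto.randomUUID()
  // Illustrative budget: 60s lock TTL, wait up to 30s, retry every 500ms
  if (!(await acquire(lockKey, lockValue, 60, 30_000, 500))) {
    throw new Error('Workflow is currently being edited by another operation. Please try again shortly.')
  }
  try {
    return await fn()
  } finally {
    // Release even when fn throws, so a failed edit cannot hold the lock until expiry
    await release(lockKey, lockValue)
  }
}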
@@ -2550,7 +2550,7 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
   name: 'edit_workflow',
   async execute(params: EditWorkflowParams, context?: { userId: string }): Promise<any> {
     const logger = createLogger('EditWorkflowServerTool')
-    const { operations, workflowId, currentUserWorkflow } = params
+    const { operations, workflowId } = params
     if (!Array.isArray(operations) || operations.length === 0) {
       throw new Error('operations are required and must be an array')
     }
@@ -2559,22 +2559,14 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
     logger.info('Executing edit_workflow', {
       operationCount: operations.length,
       workflowId,
-      hasCurrentUserWorkflow: !!currentUserWorkflow,
     })
 
-    // Get current workflow state
-    let workflowState: any
-    if (currentUserWorkflow) {
-      try {
-        workflowState = JSON.parse(currentUserWorkflow)
-      } catch (error) {
-        logger.error('Failed to parse currentUserWorkflow', error)
-        throw new Error('Invalid currentUserWorkflow format')
-      }
-    } else {
-      const fromDb = await getCurrentWorkflowStateFromDb(workflowId)
-      workflowState = fromDb.workflowState
-    }
+    // Always fetch from DB to ensure we have the latest state.
+    // This is critical because multiple edit_workflow calls may execute
+    // sequentially (via locking), and each must see the previous call's changes.
+    // The AI-provided currentUserWorkflow may be stale.
+    const fromDb = await getCurrentWorkflowStateFromDb(workflowId)
+    const workflowState = fromDb.workflowState
 
     // Get permission config for the user
     const permissionConfig = context?.userId ? await getUserPermissionConfig(context.userId) : null
@@ -2659,16 +2651,42 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
       logger.warn('No userId in context - skipping custom tools persistence', { workflowId })
     }
 
+    const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState
+
     logger.info('edit_workflow successfully applied operations', {
       operationCount: operations.length,
-      blocksCount: Object.keys(modifiedWorkflowState.blocks).length,
-      edgesCount: modifiedWorkflowState.edges.length,
+      blocksCount: Object.keys(finalWorkflowState.blocks).length,
+      edgesCount: finalWorkflowState.edges.length,
      inputValidationErrors: validationErrors.length,
      skippedItemsCount: skippedItems.length,
      schemaValidationErrors: validation.errors.length,
      validationWarnings: validation.warnings.length,
    })
 
+    // IMPORTANT: Persist the workflow state to DB BEFORE returning.
+    // This ensures that subsequent edit_workflow calls (which fetch from DB)
+    // will see the latest state. Without this, there's a race condition where
+    // the client persists AFTER the lock is released, and another edit_workflow
+    // call can see stale state.
+    try {
+      const { saveWorkflowToNormalizedTables } = await import(
+        '@/lib/workflows/persistence/utils'
+      )
+      await saveWorkflowToNormalizedTables(workflowId, finalWorkflowState)
+      logger.info('Persisted workflow state to DB before returning', {
+        workflowId,
+        blocksCount: Object.keys(finalWorkflowState.blocks).length,
+        edgesCount: finalWorkflowState.edges.length,
+      })
+    } catch (persistError) {
+      logger.error('Failed to persist workflow state to DB', {
+        workflowId,
+        error: persistError instanceof Error ? persistError.message : String(persistError),
+      })
+      // Don't throw - we still want to return the modified state
+      // The client will also try to persist, which may succeed
+    }
+
     // Format validation errors for LLM feedback
     const inputErrors =
       validationErrors.length > 0
(File diff suppressed because it is too large)
@@ -156,6 +156,13 @@ export interface CopilotState {
 
   // Message queue for messages sent while another is in progress
   messageQueue: QueuedMessage[]
+
+  // Stream resumption state
+  activeStreamId: string | null
+  isResuming: boolean
+
+  // Track if abort was user-initiated (vs browser refresh)
+  userInitiatedAbort: boolean
 }
 
 export interface CopilotActions {
@@ -249,6 +256,12 @@ export interface CopilotActions {
   moveUpInQueue: (id: string) => void
   sendNow: (id: string) => Promise<void>
   clearQueue: () => void
+
+  // Stream resumption actions
+  checkForActiveStream: (chatId: string) => Promise<boolean>
+  resumeActiveStream: (streamId: string) => Promise<void>
+  setActiveStreamId: (streamId: string | null) => void
+  restorePendingDiff: (streamId: string) => Promise<void>
 }
 
 export type CopilotStore = CopilotState & CopilotActions
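On the client, these new actions would presumably be invoked when a chat is opened. A hedged sketch of such a hook: the store path and action names come from the diff, but the hook itself is illustrative, and it assumes `checkForActiveStream` records the returned stream ID in the store before resolving.

// Illustrative only: resume an in-progress copilot stream on mount.
import { useEffect } from 'react'
import { useCopilotStore } from '@/stores/panel/copilot/store'

export function useResumeActiveStream(chatId: string | null) {
  const checkForActiveStream = useCopilotStore((s) => s.checkForActiveStream)
  const resumeActiveStream = useCopilotStore((s) => s.resumeActiveStream)

  useEffect(() => {
    if (!chatId) return
    // Ask the server whether the previous page load left a stream running
    checkForActiveStream(chatId).then((hasActive) => {
      const { activeStreamId } = useCopilotStore.getState()
      if (hasActive && activeStreamId) {
        // Replay buffered chunks and continue streaming from where it left off
        void resumeActiveStream(activeStreamId)
      }
    })
  }, [chatId, checkForActiveStream, resumeActiveStream])
}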
@@ -20,6 +20,50 @@ import {
   persistWorkflowStateToServer,
 } from './utils'
 
+/** Get the active stream ID from copilot store (lazy import to avoid circular deps) */
+async function getActiveStreamId(): Promise<string | null> {
+  try {
+    const { useCopilotStore } = await import('@/stores/panel/copilot/store')
+    return useCopilotStore.getState().activeStreamId
+  } catch {
+    return null
+  }
+}
+
+/** Save pending diff to server (Redis) via API */
+async function savePendingDiffToServer(
+  streamId: string,
+  pendingDiff: {
+    toolCallId: string
+    baselineWorkflow: unknown
+    proposedWorkflow: unknown
+    diffAnalysis: unknown
+  }
+): Promise<void> {
+  try {
+    await fetch(`/api/copilot/stream/${streamId}/pending-diff`, {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json' },
+      credentials: 'include',
+      body: JSON.stringify({ pendingDiff }),
+    })
+  } catch (err) {
+    logger.warn('Failed to save pending diff to server', { error: err })
+  }
+}
+
+/** Clear pending diff from server (Redis) via API */
+async function clearPendingDiffFromServer(streamId: string): Promise<void> {
+  try {
+    await fetch(`/api/copilot/stream/${streamId}/pending-diff`, {
+      method: 'DELETE',
+      credentials: 'include',
+    })
+  } catch {
+    // Ignore errors - not critical
+  }
+}
+
 const logger = createLogger('WorkflowDiffStore')
 const diffEngine = new WorkflowDiffEngine()
 
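These helpers call a `/api/copilot/stream/[streamId]/pending-diff` route that the PR adds but that is not shown in this excerpt. A minimal handler matching the client contract above (POST with a `{ pendingDiff }` body, DELETE to clear) might look like the following sketch; the Redis key shape, the TTL, and the omission of the auth/ownership checks used by the other copilot routes are assumptions, not the PR's code.

// Hedged sketch of a pending-diff route handler, not the PR's implementation.
import Redis from 'ioredis'
import { type NextRequest, NextResponse } from 'next/server'

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379')
const PENDING_DIFF_TTL_SECONDS = 60 * 60 // assumed: keep the diff around for an hour

export async function POST(
  req: NextRequest,
  { params }: { params: Promise<{ streamId: string }> }
) {
  const { streamId } = await params
  const { pendingDiff } = await req.json()
  await redis.set(
    `copilot:stream:${streamId}:pending-diff`, // assumed key shape
    JSON.stringify(pendingDiff),
    'EX',
    PENDING_DIFF_TTL_SECONDS
  )
  return NextResponse.json({ success: true })
}

export async function DELETE(
  _req: NextRequest,
  { params }: { params: Promise<{ streamId: string }> }
) {
  const { streamId } = await params
  await redis.del(`copilot:stream:${streamId}:pending-diff`)
  return NextResponse.json({ success: true })
}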
@@ -169,32 +213,35 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
         edges: candidateState.edges?.length || 0,
       })
 
-      // BACKGROUND: Broadcast and persist without blocking
-      // These operations happen after the UI has already updated
-      const cleanState = stripWorkflowDiffMarkers(cloneWorkflowState(candidateState))
-
-      // Fire and forget: broadcast to other users (don't await)
-      enqueueReplaceWorkflowState({
-        workflowId: activeWorkflowId,
-        state: cleanState,
-      }).catch((error) => {
-        logger.warn('Failed to broadcast workflow state (non-blocking)', { error })
+      // Persist pending diff to Redis for resumption on page refresh
+      getActiveStreamId().then((streamId) => {
+        if (streamId) {
+          findLatestEditWorkflowToolCallId().then((toolCallId) => {
+            if (toolCallId) {
+              savePendingDiffToServer(streamId, {
+                toolCallId,
+                baselineWorkflow: baselineWorkflow,
+                proposedWorkflow: candidateState,
+                diffAnalysis: diffAnalysisResult,
+              })
+            }
+          })
+        }
       })
 
-      // Fire and forget: persist to database (don't await)
-      persistWorkflowStateToServer(activeWorkflowId, candidateState)
+      // NOTE: We do NOT broadcast to other users here (to prevent socket errors on refresh).
+      // But we DO persist to database WITH diff markers so the proposed state survives page refresh
+      // and the diff UI can be restored. Final broadcast (without markers) happens when user accepts.
+      persistWorkflowStateToServer(activeWorkflowId, candidateState, { preserveDiffMarkers: true })
         .then((persisted) => {
           if (!persisted) {
-            logger.warn('Failed to persist copilot edits (state already applied locally)')
-            // Don't revert - user can retry or state will sync on next save
+            logger.warn('Failed to persist diff preview state')
           } else {
-            logger.info('Workflow diff persisted to database', {
-              workflowId: activeWorkflowId,
-            })
+            logger.info('Diff preview state persisted with markers', { workflowId: activeWorkflowId })
          }
        })
        .catch((error) => {
-          logger.warn('Failed to persist workflow state (non-blocking)', { error })
+          logger.warn('Failed to persist diff preview state', { error })
        })
 
       // Emit event for undo/redo recording
@@ -212,6 +259,37 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
       }
     },
 
+    restoreDiffWithBaseline: (baselineWorkflow, proposedWorkflow, diffAnalysis) => {
+      const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
+      if (!activeWorkflowId) {
+        logger.error('Cannot restore diff without an active workflow')
+        return
+      }
+
+      logger.info('Restoring diff with baseline', {
+        workflowId: activeWorkflowId,
+        hasBaseline: !!baselineWorkflow,
+        newBlocks: diffAnalysis.new_blocks?.length || 0,
+        editedBlocks: diffAnalysis.edited_blocks?.length || 0,
+      })
+
+      // Set the diff state with the provided baseline
+      batchedUpdate({
+        hasActiveDiff: true,
+        isShowingDiff: true,
+        isDiffReady: true,
+        baselineWorkflow: baselineWorkflow,
+        baselineWorkflowId: activeWorkflowId,
+        diffAnalysis: diffAnalysis,
+        diffMetadata: null,
+        diffError: null,
+      })
+
+      // The proposed workflow should already be applied (blocks have is_diff markers)
+      // Just re-apply the markers to ensure they're visible
+      setTimeout(() => get().reapplyDiffMarkers(), 0)
+    },
+
     clearDiff: ({ restoreBaseline = true } = {}) => {
       const { baselineWorkflow, baselineWorkflowId } = get()
       const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
@@ -292,6 +370,13 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
       const baselineForUndo = get().baselineWorkflow
       const triggerMessageId = get()._triggerMessageId
 
+      // Clear pending diff from Redis (fire and forget)
+      getActiveStreamId().then((streamId) => {
+        if (streamId) {
+          clearPendingDiffFromServer(streamId)
+        }
+      })
+
       // Clear diff state FIRST to prevent flash of colors
       // This must happen synchronously before applying the cleaned state
       set({
@@ -312,6 +397,32 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
       // Now apply the cleaned state
       applyWorkflowStateToStores(activeWorkflowId, stateToApply)
 
+      // Broadcast and persist the accepted changes
+      const cleanStateForBroadcast = stripWorkflowDiffMarkers(cloneWorkflowState(stateToApply))
+
+      // Fire and forget: broadcast to other users
+      enqueueReplaceWorkflowState({
+        workflowId: activeWorkflowId,
+        state: cleanStateForBroadcast,
+      }).catch((error) => {
+        logger.warn('Failed to broadcast accepted workflow state', { error })
+      })
+
+      // Fire and forget: persist to database
+      persistWorkflowStateToServer(activeWorkflowId, stateToApply)
+        .then((persisted) => {
+          if (!persisted) {
+            logger.warn('Failed to persist accepted workflow changes')
+          } else {
+            logger.info('Accepted workflow changes persisted to database', {
+              workflowId: activeWorkflowId,
+            })
+          }
+        })
+        .catch((error) => {
+          logger.warn('Failed to persist accepted workflow state', { error })
+        })
+
       // Emit event for undo/redo recording (unless we're in an undo/redo operation)
       if (!(window as any).__skipDiffRecording) {
         window.dispatchEvent(
@@ -356,8 +467,19 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
       const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
 
       if (!baselineWorkflow || !baselineWorkflowId) {
-        logger.warn('Reject called without baseline workflow')
+        logger.warn('Reject called without baseline workflow - cannot revert changes')
+        // This can happen if the diff was restored from markers after page refresh
+        // without a saved baseline. Just clear the diff markers without reverting.
         get().clearDiff({ restoreBaseline: false })
+        // Show a notification to the user
+        try {
+          const { useNotificationStore } = await import('@/stores/notifications')
+          useNotificationStore.getState().addNotification({
+            level: 'info',
+            message:
+              'Cannot revert: The original workflow state was lost after page refresh. Diff markers have been cleared.',
+          })
+        } catch {}
         return
       }
 
@@ -383,6 +505,13 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
       })
       const afterReject = cloneWorkflowState(baselineWorkflow)
 
+      // Clear pending diff from Redis (fire and forget)
+      getActiveStreamId().then((streamId) => {
+        if (streamId) {
+          clearPendingDiffFromServer(streamId)
+        }
+      })
+
       // Clear diff state FIRST for instant UI feedback
       set({
         hasActiveDiff: false,
@@ -526,6 +655,94 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
       logger.info('Re-applied diff markers to workflow blocks')
       }
     },
 
+    restoreDiffFromMarkers: () => {
+      // Check if the workflow has any blocks with is_diff markers
+      // If so, restore the diff store state to show the diff view
+      const { hasActiveDiff } = get()
+      if (hasActiveDiff) {
+        // Already have an active diff
+        return
+      }
+
+      const workflowStore = useWorkflowStore.getState()
+      const blocks = workflowStore.blocks
+
+      const newBlocks: string[] = []
+      const editedBlocks: string[] = []
+      const fieldDiffs: Record<string, { changed_fields: string[] }> = {}
+
+      Object.entries(blocks).forEach(([blockId, block]) => {
+        const isDiff = (block as any).is_diff
+        if (isDiff === 'new') {
+          newBlocks.push(blockId)
+        } else if (isDiff === 'edited') {
+          editedBlocks.push(blockId)
+          // Check for field diffs
+          const blockFieldDiffs = (block as any).field_diffs
+          if (blockFieldDiffs) {
+            fieldDiffs[blockId] = blockFieldDiffs
+          } else {
+            // Check subBlocks for is_diff markers
+            const changedFields: string[] = []
+            Object.entries((block as any).subBlocks || {}).forEach(
+              ([fieldId, subBlock]: [string, any]) => {
+                if (subBlock?.is_diff === 'changed') {
+                  changedFields.push(fieldId)
+                }
+              }
+            )
+            if (changedFields.length > 0) {
+              fieldDiffs[blockId] = { changed_fields: changedFields }
+            }
+          }
+        }
+      })
+
+      if (newBlocks.length === 0 && editedBlocks.length === 0) {
+        // No diff markers found
+        return
+      }
+
+      logger.info('Restoring diff state from markers', {
+        newBlocks: newBlocks.length,
+        editedBlocks: editedBlocks.length,
+      })
+
+      // Restore the diff state
+      // Note: We don't have the baseline, so reject will just clear the diff
+      // Add unchanged_fields to satisfy the type (we don't track unchanged fields on restore)
+      const fieldDiffsWithUnchanged: Record<
+        string,
+        { changed_fields: string[]; unchanged_fields: string[] }
+      > = {}
+      Object.entries(fieldDiffs).forEach(([blockId, diff]) => {
+        fieldDiffsWithUnchanged[blockId] = {
+          changed_fields: diff.changed_fields,
+          unchanged_fields: [],
+        }
+      })
+
+      const diffAnalysis = {
+        new_blocks: newBlocks,
+        edited_blocks: editedBlocks,
+        deleted_blocks: [],
+        field_diffs: fieldDiffsWithUnchanged,
+      }
+
+      const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
+
+      batchedUpdate({
+        hasActiveDiff: true,
+        isShowingDiff: true,
+        isDiffReady: true,
+        baselineWorkflow: null, // We don't have baseline on restore from markers alone
+        baselineWorkflowId: activeWorkflowId, // Set the workflow ID for later baseline restoration
+        diffAnalysis,
+        diffMetadata: null,
+        diffError: null,
+      })
+    },
   }
   },
   { name: 'workflow-diff-store' }
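A worked example of the marker-to-analysis mapping that `restoreDiffFromMarkers` performs; the block shapes are simplified and the field names are placeholders, but the derivation matches the logic above.

// Input: blocks as loaded from the DB with diff markers preserved
const blocks = {
  'block-a': { is_diff: 'new', subBlocks: {} },
  'block-b': {
    is_diff: 'edited',
    subBlocks: { prompt: { is_diff: 'changed' }, model: {} },
  },
  'block-c': { subBlocks: {} }, // untouched block, no marker
}

// Output reconstructed by the logic above
const expectedDiffAnalysis = {
  new_blocks: ['block-a'],
  edited_blocks: ['block-b'],
  deleted_blocks: [], // deletions cannot be recovered from markers alone
  field_diffs: {
    'block-b': { changed_fields: ['prompt'], unchanged_fields: [] },
  },
}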
@@ -15,10 +15,16 @@ export interface WorkflowDiffState {
 
 export interface WorkflowDiffActions {
   setProposedChanges: (workflowState: WorkflowState, diffAnalysis?: DiffAnalysis) => Promise<void>
+  restoreDiffWithBaseline: (
+    baselineWorkflow: WorkflowState,
+    proposedWorkflow: WorkflowState,
+    diffAnalysis: DiffAnalysis
+  ) => void
   clearDiff: (options?: { restoreBaseline?: boolean }) => void
   toggleDiffView: () => void
   acceptChanges: () => Promise<void>
   rejectChanges: () => Promise<void>
   reapplyDiffMarkers: () => void
+  restoreDiffFromMarkers: () => void
   _batchedStateUpdate: (updates: Partial<WorkflowDiffState>) => void
 }
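A usage sketch for the two restore paths added to this interface: prefer the server-saved baseline when one survived the refresh, fall back to markers otherwise. The diff-store import path and the shape of `savedPendingDiff` are assumptions; they mirror what `restorePendingDiff` in the copilot store would hand over.

// Illustrative only; not part of the PR.
import { useWorkflowDiffStore } from '@/stores/workflow-diff' // path assumed

declare const savedPendingDiff: {
  baselineWorkflow: any
  proposedWorkflow: any
  diffAnalysis: any
} | null

const diffStore = useWorkflowDiffStore.getState()
if (savedPendingDiff) {
  // Baseline is known, so Reject can still revert to it
  diffStore.restoreDiffWithBaseline(
    savedPendingDiff.baselineWorkflow,
    savedPendingDiff.proposedWorkflow,
    savedPendingDiff.diffAnalysis
  )
} else {
  // No baseline survived the refresh: rebuild the diff view from is_diff markers only
  diffStore.restoreDiffFromMarkers()
}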
@@ -56,17 +56,24 @@ export function captureBaselineSnapshot(workflowId: string): WorkflowState {
 
 export async function persistWorkflowStateToServer(
   workflowId: string,
-  workflowState: WorkflowState
+  workflowState: WorkflowState,
+  options?: { preserveDiffMarkers?: boolean }
 ): Promise<boolean> {
   try {
-    const cleanState = stripWorkflowDiffMarkers(cloneWorkflowState(workflowState))
+    // When preserveDiffMarkers is true, we keep the is_diff markers on blocks
+    // so they survive page refresh and can be restored. This is used when
+    // persisting a diff that hasn't been accepted/rejected yet.
+    const stateToSave = options?.preserveDiffMarkers
+      ? cloneWorkflowState(workflowState)
+      : stripWorkflowDiffMarkers(cloneWorkflowState(workflowState))
+
     const response = await fetch(`/api/workflows/${workflowId}/state`, {
       method: 'PUT',
       headers: {
         'Content-Type': 'application/json',
       },
       body: JSON.stringify({
-        ...cleanState,
+        ...stateToSave,
         lastSaved: Date.now(),
       }),
     })
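A small usage sketch for the new option (the workflow ID and states are placeholders; the relative import path is the one used inside the store directory in the diff): keep markers while a diff is still only proposed, strip them once it is accepted.

// Illustrative only; not part of the PR.
import { persistWorkflowStateToServer } from './utils'

declare const proposedState: any // workflow state carrying is_diff markers
declare const acceptedState: any // workflow state after the user accepts the diff

// While the copilot diff is pending: keep is_diff markers so a refresh can restore the preview
await persistWorkflowStateToServer('workflow-123', proposedState, { preserveDiffMarkers: true })

// After the user accepts: persist the clean state (markers stripped by default)
await persistWorkflowStateToServer('workflow-123', acceptedState)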