diff --git a/apps/sim/app/api/copilot/headless/route.ts b/apps/sim/app/api/copilot/headless/route.ts new file mode 100644 index 000000000..9735b4859 --- /dev/null +++ b/apps/sim/app/api/copilot/headless/route.ts @@ -0,0 +1,364 @@ +import { db } from '@sim/db' +import { copilotChats, workflow as workflowTable } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service' +import { getSession } from '@/lib/auth' +import { getCopilotModel } from '@/lib/copilot/config' +import { SIM_AGENT_API_URL_DEFAULT, SIM_AGENT_VERSION } from '@/lib/copilot/constants' +import { COPILOT_MODEL_IDS } from '@/lib/copilot/models' +import { + createRequestTracker, + createUnauthorizedResponse, +} from '@/lib/copilot/request-helpers' +import { + createStream, + completeStream, + errorStream, + updateStreamStatus, +} from '@/lib/copilot/stream-persistence' +import { executeToolServerSide, isServerExecutableTool } from '@/lib/copilot/tools/server/executor' +import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer' +import { env } from '@/lib/core/config/env' +import { tools } from '@/tools/registry' +import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils' + +const logger = createLogger('HeadlessCopilotAPI') + +const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT + +const HeadlessRequestSchema = z.object({ + message: z.string().min(1, 'Message is required'), + workflowId: z.string().min(1, 'Workflow ID is required'), + chatId: z.string().optional(), + model: z.enum(COPILOT_MODEL_IDS).optional(), + mode: z.enum(['agent', 'build', 'chat']).optional().default('agent'), + timeout: z.number().optional().default(300000), // 5 minute default + persistChanges: z.boolean().optional().default(true), + createNewChat: z.boolean().optional().default(false), +}) + +export const dynamic = 'force-dynamic' +export const fetchCache = 'force-no-store' +export const runtime = 'nodejs' + +/** + * POST /api/copilot/headless + * + * Execute copilot completely server-side without any client connection. + * All tool calls are executed server-side and results are persisted directly. + * + * Returns the final result after all processing is complete. + */ +export async function POST(req: NextRequest) { + const tracker = createRequestTracker() + const startTime = Date.now() + + try { + // Authenticate via session or API key + let userId: string | null = null + + const session = await getSession() + if (session?.user?.id) { + userId = session.user.id + } else { + // Try API key authentication from header + const apiKey = req.headers.get('x-api-key') + if (apiKey) { + const authResult = await authenticateApiKeyFromHeader(apiKey) + if (authResult.success && authResult.userId) { + userId = authResult.userId + // Update last used timestamp in background + if (authResult.keyId) { + updateApiKeyLastUsed(authResult.keyId).catch(() => {}) + } + } + } + } + + if (!userId) { + return createUnauthorizedResponse() + } + + const body = await req.json() + const { message, workflowId, chatId, model, mode, timeout, persistChanges, createNewChat } = + HeadlessRequestSchema.parse(body) + + logger.info(`[${tracker.requestId}] Headless copilot request`, { + userId, + workflowId, + messageLength: message.length, + mode, + }) + + // Verify user has access to workflow + const [wf] = await db + .select({ userId: workflowTable.userId, workspaceId: workflowTable.workspaceId }) + .from(workflowTable) + .where(eq(workflowTable.id, workflowId)) + .limit(1) + + if (!wf) { + return NextResponse.json({ error: 'Workflow not found' }, { status: 404 }) + } + + // TODO: Add proper workspace access check + if (wf.userId !== userId) { + return NextResponse.json({ error: 'Access denied' }, { status: 403 }) + } + + // Load current workflow state from database + const workflowData = await loadWorkflowFromNormalizedTables(workflowId) + if (!workflowData) { + return NextResponse.json({ error: 'Workflow data not found' }, { status: 404 }) + } + + const sanitizedWorkflow = sanitizeForCopilot({ + blocks: workflowData.blocks, + edges: workflowData.edges, + loops: workflowData.loops, + parallels: workflowData.parallels, + }) + + // Create a stream for tracking (even in headless mode) + const streamId = crypto.randomUUID() + const userMessageId = crypto.randomUUID() + const assistantMessageId = crypto.randomUUID() + + await createStream({ + streamId, + chatId: chatId || '', + userId, + workflowId, + userMessageId, + isClientSession: false, // Key: this is headless + }) + + await updateStreamStatus(streamId, 'streaming') + + // Handle chat persistence + let actualChatId = chatId + if (createNewChat && !chatId) { + const { provider, model: defaultModel } = getCopilotModel('chat') + const [newChat] = await db + .insert(copilotChats) + .values({ + userId, + workflowId, + title: null, + model: model || defaultModel, + messages: [], + }) + .returning() + + if (newChat) { + actualChatId = newChat.id + } + } + + // Get credentials for tools + let credentials: { + oauth: Record + apiKeys: string[] + } | null = null + + try { + const rawCredentials = await getCredentialsServerTool.execute({ workflowId }, { userId }) + const oauthMap: Record = {} + + for (const cred of rawCredentials?.oauth?.connected?.credentials || []) { + if (cred.accessToken) { + oauthMap[cred.provider] = { + accessToken: cred.accessToken, + accountId: cred.id, + name: cred.name, + } + } + } + + credentials = { + oauth: oauthMap, + apiKeys: rawCredentials?.environment?.variableNames || [], + } + } catch (error) { + logger.warn(`[${tracker.requestId}] Failed to fetch credentials`, { error }) + } + + // Build tool definitions + const { createUserToolSchema } = await import('@/tools/params') + const latestTools = getLatestVersionTools(tools) + const integrationTools = Object.entries(latestTools).map(([toolId, toolConfig]) => { + const userSchema = createUserToolSchema(toolConfig) + const strippedName = stripVersionSuffix(toolId) + return { + name: strippedName, + description: toolConfig.description || toolConfig.name || strippedName, + input_schema: userSchema, + defer_loading: true, + } + }) + + // Build request payload + const defaults = getCopilotModel('chat') + const selectedModel = model || defaults.model + const effectiveMode = mode === 'agent' ? 'build' : mode + + const requestPayload = { + message, + workflowId, + userId, + stream: false, // Non-streaming for headless + model: selectedModel, + mode: effectiveMode, + version: SIM_AGENT_VERSION, + messageId: userMessageId, + ...(actualChatId && { chatId: actualChatId }), + ...(integrationTools.length > 0 && { tools: integrationTools }), + ...(credentials && { credentials }), + } + + // Call sim agent (non-streaming) + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), timeout) + + try { + const response = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}), + }, + body: JSON.stringify(requestPayload), + signal: controller.signal, + }) + + clearTimeout(timeoutId) + + if (!response.ok) { + const errorText = await response.text() + logger.error(`[${tracker.requestId}] Sim agent error`, { + status: response.status, + error: errorText, + }) + await errorStream(streamId, `Agent error: ${response.statusText}`) + return NextResponse.json( + { error: `Agent error: ${response.statusText}` }, + { status: response.status } + ) + } + + const result = await response.json() + + // Execute tool calls server-side + const toolResults: Record = {} + + if (result.toolCalls && Array.isArray(result.toolCalls)) { + for (const toolCall of result.toolCalls) { + const toolName = toolCall.name + const toolArgs = toolCall.arguments || toolCall.input || {} + + logger.info(`[${tracker.requestId}] Executing tool server-side`, { + toolName, + toolCallId: toolCall.id, + }) + + if (!isServerExecutableTool(toolName)) { + logger.warn(`[${tracker.requestId}] Tool not executable server-side`, { toolName }) + toolResults[toolCall.id] = { + success: false, + error: `Tool ${toolName} requires client-side execution`, + } + continue + } + + const toolResult = await executeToolServerSide( + { name: toolName, args: toolArgs }, + { workflowId, userId, persistChanges } + ) + + toolResults[toolCall.id] = toolResult + } + } + + // Mark stream complete + await completeStream(streamId, { content: result.content, toolResults }) + + // Save to chat history + if (actualChatId && persistChanges) { + const [chat] = await db + .select() + .from(copilotChats) + .where(eq(copilotChats.id, actualChatId)) + .limit(1) + + const existingMessages = chat ? (Array.isArray(chat.messages) ? chat.messages : []) : [] + + const newMessages = [ + ...existingMessages, + { + id: userMessageId, + role: 'user', + content: message, + timestamp: new Date().toISOString(), + }, + { + id: assistantMessageId, + role: 'assistant', + content: result.content, + timestamp: new Date().toISOString(), + toolCalls: Object.entries(toolResults).map(([id, r]) => ({ + id, + success: r.success, + })), + }, + ] + + await db + .update(copilotChats) + .set({ messages: newMessages, updatedAt: new Date() }) + .where(eq(copilotChats.id, actualChatId)) + } + + const duration = Date.now() - startTime + logger.info(`[${tracker.requestId}] Headless copilot complete`, { + duration, + contentLength: result.content?.length || 0, + toolCallsExecuted: Object.keys(toolResults).length, + }) + + return NextResponse.json({ + success: true, + streamId, + chatId: actualChatId, + content: result.content, + toolResults, + duration, + }) + } catch (error) { + clearTimeout(timeoutId) + + if (error instanceof Error && error.name === 'AbortError') { + await errorStream(streamId, 'Request timed out') + return NextResponse.json({ error: 'Request timed out' }, { status: 504 }) + } + + throw error + } + } catch (error) { + logger.error(`[${tracker.requestId}] Headless copilot error`, { error }) + + if (error instanceof z.ZodError) { + return NextResponse.json({ error: 'Invalid request', details: error.errors }, { status: 400 }) + } + + return NextResponse.json( + { error: error instanceof Error ? error.message : 'Internal error' }, + { status: 500 } + ) + } +} + diff --git a/apps/sim/app/api/copilot/stream/[streamId]/route.ts b/apps/sim/app/api/copilot/stream/[streamId]/route.ts new file mode 100644 index 000000000..62e0a15f1 --- /dev/null +++ b/apps/sim/app/api/copilot/stream/[streamId]/route.ts @@ -0,0 +1,237 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { getSession } from '@/lib/auth' +import { + getStreamMetadata, + getStreamEvents, + getStreamEventCount, + getToolCallStates, + refreshStreamTTL, + checkAbortSignal, + abortStream, +} from '@/lib/copilot/stream-persistence' + +const logger = createLogger('StreamResumeAPI') + +interface RouteParams { + streamId: string +} + +/** + * GET /api/copilot/stream/{streamId} + * Subscribe to or resume a stream + * + * Query params: + * - offset: Start from this event index (for resumption) + * - mode: 'sse' (default) or 'poll' + */ +export async function GET(req: NextRequest, { params }: { params: Promise }) { + const { streamId } = await params + const session = await getSession() + + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const metadata = await getStreamMetadata(streamId) + if (!metadata) { + return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) + } + + // Verify user owns this stream + if (metadata.userId !== session.user.id) { + return NextResponse.json({ error: 'Forbidden' }, { status: 403 }) + } + + const offset = parseInt(req.nextUrl.searchParams.get('offset') || '0', 10) + const mode = req.nextUrl.searchParams.get('mode') || 'sse' + + // Refresh TTL since someone is actively consuming + await refreshStreamTTL(streamId) + + // Poll mode: return current state as JSON + if (mode === 'poll') { + const events = await getStreamEvents(streamId, offset) + const toolCalls = await getToolCallStates(streamId) + const eventCount = await getStreamEventCount(streamId) + + return NextResponse.json({ + metadata, + events, + toolCalls, + totalEvents: eventCount, + nextOffset: offset + events.length, + }) + } + + // SSE mode: stream events + const encoder = new TextEncoder() + + const readable = new ReadableStream({ + async start(controller) { + let closed = false + + const safeEnqueue = (data: string) => { + if (closed) return + try { + controller.enqueue(encoder.encode(data)) + } catch { + closed = true + } + } + + const safeClose = () => { + if (closed) return + closed = true + try { + controller.close() + } catch { + // Already closed + } + } + + // Send initial connection event + safeEnqueue(`: connected\n\n`) + + // Send metadata + safeEnqueue(`event: metadata\ndata: ${JSON.stringify(metadata)}\n\n`) + + // Send tool call states + const toolCalls = await getToolCallStates(streamId) + if (Object.keys(toolCalls).length > 0) { + safeEnqueue(`event: tool_states\ndata: ${JSON.stringify(toolCalls)}\n\n`) + } + + // Replay missed events + const missedEvents = await getStreamEvents(streamId, offset) + for (const event of missedEvents) { + safeEnqueue(event) + } + + // If stream is complete, send done and close + if (metadata.status === 'complete' || metadata.status === 'error' || metadata.status === 'aborted') { + safeEnqueue( + `event: stream_status\ndata: ${JSON.stringify({ + status: metadata.status, + error: metadata.error, + })}\n\n` + ) + safeClose() + return + } + + // Stream is still active - poll for new events + let lastOffset = offset + missedEvents.length + const pollInterval = 100 // 100ms + const maxPollTime = 5 * 60 * 1000 // 5 minutes max + const startTime = Date.now() + + const poll = async () => { + if (closed) return + + try { + // Check for timeout + if (Date.now() - startTime > maxPollTime) { + logger.info('Stream poll timeout', { streamId }) + safeEnqueue( + `event: stream_status\ndata: ${JSON.stringify({ status: 'timeout' })}\n\n` + ) + safeClose() + return + } + + // Check if client disconnected + if (await checkAbortSignal(streamId)) { + safeEnqueue( + `event: stream_status\ndata: ${JSON.stringify({ status: 'aborted' })}\n\n` + ) + safeClose() + return + } + + // Get current metadata to check status + const currentMeta = await getStreamMetadata(streamId) + if (!currentMeta) { + safeClose() + return + } + + // Get new events + const newEvents = await getStreamEvents(streamId, lastOffset) + for (const event of newEvents) { + safeEnqueue(event) + } + lastOffset += newEvents.length + + // Refresh TTL + await refreshStreamTTL(streamId) + + // If complete, send status and close + if ( + currentMeta.status === 'complete' || + currentMeta.status === 'error' || + currentMeta.status === 'aborted' + ) { + safeEnqueue( + `event: stream_status\ndata: ${JSON.stringify({ + status: currentMeta.status, + error: currentMeta.error, + })}\n\n` + ) + safeClose() + return + } + + // Continue polling + setTimeout(poll, pollInterval) + } catch (error) { + logger.error('Stream poll error', { streamId, error }) + safeClose() + } + } + + // Start polling + setTimeout(poll, pollInterval) + }, + }) + + return new Response(readable, { + headers: { + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + 'X-Stream-Id': streamId, + }, + }) +} + +/** + * DELETE /api/copilot/stream/{streamId} + * Abort a stream + */ +export async function DELETE(req: NextRequest, { params }: { params: Promise }) { + const { streamId } = await params + const session = await getSession() + + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const metadata = await getStreamMetadata(streamId) + if (!metadata) { + return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) + } + + // Verify user owns this stream + if (metadata.userId !== session.user.id) { + return NextResponse.json({ error: 'Forbidden' }, { status: 403 }) + } + + await abortStream(streamId) + + logger.info('Stream aborted by user', { streamId, userId: session.user.id }) + + return NextResponse.json({ success: true, streamId }) +} + diff --git a/apps/sim/lib/copilot/api.ts b/apps/sim/lib/copilot/api.ts index c680f9751..12021699c 100644 --- a/apps/sim/lib/copilot/api.ts +++ b/apps/sim/lib/copilot/api.ts @@ -98,6 +98,8 @@ export interface ApiResponse { */ export interface StreamingResponse extends ApiResponse { stream?: ReadableStream + streamId?: string + chatId?: string } /** @@ -163,9 +165,15 @@ export async function sendStreamingMessage( } } + // Extract stream and chat IDs from headers for resumption support + const streamId = response.headers.get('X-Stream-Id') || undefined + const chatId = response.headers.get('X-Chat-Id') || undefined + return { success: true, stream: response.body, + streamId, + chatId, } } catch (error) { // Handle AbortError gracefully - this is expected when user aborts diff --git a/apps/sim/lib/copilot/render-events.ts b/apps/sim/lib/copilot/render-events.ts new file mode 100644 index 000000000..8afd3a519 --- /dev/null +++ b/apps/sim/lib/copilot/render-events.ts @@ -0,0 +1,324 @@ +/** + * Render events are the normalized event types sent to clients. + * These are independent of the sim agent's internal event format. + */ + +export type RenderEventType = + | 'text_delta' + | 'text_complete' + | 'tool_pending' + | 'tool_executing' + | 'tool_success' + | 'tool_error' + | 'tool_result' + | 'subagent_start' + | 'subagent_text' + | 'subagent_tool_call' + | 'subagent_end' + | 'thinking_start' + | 'thinking_delta' + | 'thinking_end' + | 'message_start' + | 'message_complete' + | 'chat_id' + | 'conversation_id' + | 'error' + | 'stream_status' + +export interface BaseRenderEvent { + type: RenderEventType + timestamp?: number +} + +export interface TextDeltaEvent extends BaseRenderEvent { + type: 'text_delta' + content: string +} + +export interface TextCompleteEvent extends BaseRenderEvent { + type: 'text_complete' + content: string +} + +export interface ToolPendingEvent extends BaseRenderEvent { + type: 'tool_pending' + toolCallId: string + toolName: string + args?: Record + display?: { + label: string + icon?: string + } +} + +export interface ToolExecutingEvent extends BaseRenderEvent { + type: 'tool_executing' + toolCallId: string + toolName: string +} + +export interface ToolSuccessEvent extends BaseRenderEvent { + type: 'tool_success' + toolCallId: string + toolName: string + result?: unknown + display?: { + label: string + icon?: string + } +} + +export interface ToolErrorEvent extends BaseRenderEvent { + type: 'tool_error' + toolCallId: string + toolName: string + error: string + display?: { + label: string + icon?: string + } +} + +export interface ToolResultEvent extends BaseRenderEvent { + type: 'tool_result' + toolCallId: string + success: boolean + result?: unknown + error?: string + failedDependency?: boolean + skipped?: boolean +} + +export interface SubagentStartEvent extends BaseRenderEvent { + type: 'subagent_start' + parentToolCallId: string + subagentName: string +} + +export interface SubagentTextEvent extends BaseRenderEvent { + type: 'subagent_text' + parentToolCallId: string + content: string +} + +export interface SubagentToolCallEvent extends BaseRenderEvent { + type: 'subagent_tool_call' + parentToolCallId: string + toolCallId: string + toolName: string + args?: Record + state: 'pending' | 'executing' | 'success' | 'error' + result?: unknown + error?: string +} + +export interface SubagentEndEvent extends BaseRenderEvent { + type: 'subagent_end' + parentToolCallId: string +} + +export interface ThinkingStartEvent extends BaseRenderEvent { + type: 'thinking_start' +} + +export interface ThinkingDeltaEvent extends BaseRenderEvent { + type: 'thinking_delta' + content: string +} + +export interface ThinkingEndEvent extends BaseRenderEvent { + type: 'thinking_end' +} + +export interface MessageStartEvent extends BaseRenderEvent { + type: 'message_start' + messageId: string +} + +export interface MessageCompleteEvent extends BaseRenderEvent { + type: 'message_complete' + messageId: string + content?: string +} + +export interface ChatIdEvent extends BaseRenderEvent { + type: 'chat_id' + chatId: string +} + +export interface ConversationIdEvent extends BaseRenderEvent { + type: 'conversation_id' + conversationId: string +} + +export interface ErrorEvent extends BaseRenderEvent { + type: 'error' + error: string + code?: string +} + +export interface StreamStatusEvent extends BaseRenderEvent { + type: 'stream_status' + status: 'streaming' | 'complete' | 'error' | 'aborted' + error?: string +} + +export type RenderEvent = + | TextDeltaEvent + | TextCompleteEvent + | ToolPendingEvent + | ToolExecutingEvent + | ToolSuccessEvent + | ToolErrorEvent + | ToolResultEvent + | SubagentStartEvent + | SubagentTextEvent + | SubagentToolCallEvent + | SubagentEndEvent + | ThinkingStartEvent + | ThinkingDeltaEvent + | ThinkingEndEvent + | MessageStartEvent + | MessageCompleteEvent + | ChatIdEvent + | ConversationIdEvent + | ErrorEvent + | StreamStatusEvent + +/** + * Serialize a render event to SSE format + */ +export function serializeRenderEvent(event: RenderEvent): string { + const eventWithTimestamp = { + ...event, + timestamp: event.timestamp || Date.now(), + } + return `event: ${event.type}\ndata: ${JSON.stringify(eventWithTimestamp)}\n\n` +} + +/** + * Parse an SSE chunk into a render event + */ +export function parseRenderEvent(chunk: string): RenderEvent | null { + // SSE format: "event: \ndata: \n\n" + const lines = chunk.trim().split('\n') + let eventType: string | null = null + let data: string | null = null + + for (const line of lines) { + if (line.startsWith('event: ')) { + eventType = line.slice(7) + } else if (line.startsWith('data: ')) { + data = line.slice(6) + } + } + + if (!data) return null + + try { + const parsed = JSON.parse(data) + // If we extracted an event type from SSE, use it; otherwise use from data + if (eventType && !parsed.type) { + parsed.type = eventType + } + return parsed as RenderEvent + } catch { + return null + } +} + +/** + * Create a text delta event + */ +export function createTextDelta(content: string): TextDeltaEvent { + return { type: 'text_delta', content, timestamp: Date.now() } +} + +/** + * Create a tool pending event + */ +export function createToolPending( + toolCallId: string, + toolName: string, + args?: Record, + display?: { label: string; icon?: string } +): ToolPendingEvent { + return { + type: 'tool_pending', + toolCallId, + toolName, + args, + display, + timestamp: Date.now(), + } +} + +/** + * Create a tool executing event + */ +export function createToolExecuting(toolCallId: string, toolName: string): ToolExecutingEvent { + return { type: 'tool_executing', toolCallId, toolName, timestamp: Date.now() } +} + +/** + * Create a tool success event + */ +export function createToolSuccess( + toolCallId: string, + toolName: string, + result?: unknown, + display?: { label: string; icon?: string } +): ToolSuccessEvent { + return { + type: 'tool_success', + toolCallId, + toolName, + result, + display, + timestamp: Date.now(), + } +} + +/** + * Create a tool error event + */ +export function createToolError( + toolCallId: string, + toolName: string, + error: string, + display?: { label: string; icon?: string } +): ToolErrorEvent { + return { + type: 'tool_error', + toolCallId, + toolName, + error, + display, + timestamp: Date.now(), + } +} + +/** + * Create a message complete event + */ +export function createMessageComplete(messageId: string, content?: string): MessageCompleteEvent { + return { type: 'message_complete', messageId, content, timestamp: Date.now() } +} + +/** + * Create a stream status event + */ +export function createStreamStatus( + status: 'streaming' | 'complete' | 'error' | 'aborted', + error?: string +): StreamStatusEvent { + return { type: 'stream_status', status, error, timestamp: Date.now() } +} + +/** + * Create an error event + */ +export function createError(error: string, code?: string): ErrorEvent { + return { type: 'error', error, code, timestamp: Date.now() } +} + diff --git a/apps/sim/lib/copilot/stream-client.ts b/apps/sim/lib/copilot/stream-client.ts new file mode 100644 index 000000000..614fb1dc0 --- /dev/null +++ b/apps/sim/lib/copilot/stream-client.ts @@ -0,0 +1,299 @@ +'use client' + +import { createLogger } from '@sim/logger' + +const logger = createLogger('StreamClient') + +export interface StreamMetadata { + streamId: string + chatId: string + userId: string + workflowId: string + userMessageId: string + assistantMessageId?: string + status: 'pending' | 'streaming' | 'complete' | 'error' | 'aborted' + isClientSession: boolean + createdAt: number + updatedAt: number + completedAt?: number + error?: string +} + +export interface StreamResumeResponse { + metadata: StreamMetadata + events: string[] + toolCalls: Record + totalEvents: number + nextOffset: number +} + +const STREAM_ID_STORAGE_KEY = 'copilot:activeStream' +const RECONNECT_DELAY_MS = 1000 +const MAX_RECONNECT_ATTEMPTS = 5 + +/** + * Store active stream info for potential resumption + */ +export function storeActiveStream( + chatId: string, + streamId: string, + messageId: string +): void { + try { + const data = { chatId, streamId, messageId, storedAt: Date.now() } + sessionStorage.setItem(STREAM_ID_STORAGE_KEY, JSON.stringify(data)) + logger.info('Stored active stream for potential resumption', { streamId, chatId }) + } catch { + // Session storage not available + } +} + +/** + * Get stored active stream if one exists + */ +export function getStoredActiveStream(): { + chatId: string + streamId: string + messageId: string + storedAt: number +} | null { + try { + const data = sessionStorage.getItem(STREAM_ID_STORAGE_KEY) + if (!data) return null + return JSON.parse(data) + } catch { + return null + } +} + +/** + * Clear stored active stream + */ +export function clearStoredActiveStream(): void { + try { + sessionStorage.removeItem(STREAM_ID_STORAGE_KEY) + } catch { + // Session storage not available + } +} + +/** + * Check if a stream is still active + */ +export async function checkStreamStatus(streamId: string): Promise { + try { + const response = await fetch(`/api/copilot/stream/${streamId}?mode=poll&offset=0`) + if (!response.ok) { + if (response.status === 404) { + // Stream not found or expired + return null + } + throw new Error(`Failed to check stream status: ${response.statusText}`) + } + const data: StreamResumeResponse = await response.json() + return data.metadata + } catch (error) { + logger.error('Failed to check stream status', { streamId, error }) + return null + } +} + +/** + * Resume a stream from a given offset using SSE + */ +export async function resumeStream( + streamId: string, + offset: number = 0 +): Promise | null> { + try { + const response = await fetch(`/api/copilot/stream/${streamId}?mode=sse&offset=${offset}`) + if (!response.ok || !response.body) { + if (response.status === 404) { + logger.info('Stream not found for resumption', { streamId }) + clearStoredActiveStream() + return null + } + throw new Error(`Failed to resume stream: ${response.statusText}`) + } + + logger.info('Stream resumption started', { streamId, offset }) + return response.body + } catch (error) { + logger.error('Failed to resume stream', { streamId, error }) + return null + } +} + +/** + * Abort a stream + */ +export async function abortStream(streamId: string): Promise { + try { + const response = await fetch(`/api/copilot/stream/${streamId}`, { + method: 'DELETE', + }) + if (!response.ok && response.status !== 404) { + throw new Error(`Failed to abort stream: ${response.statusText}`) + } + clearStoredActiveStream() + return true + } catch (error) { + logger.error('Failed to abort stream', { streamId, error }) + return false + } +} + +export interface StreamSubscription { + unsubscribe: () => void + getStreamId: () => string +} + +export interface StreamEventHandler { + onEvent: (event: { type: string; data: Record }) => void + onError?: (error: Error) => void + onComplete?: () => void +} + +/** + * Subscribe to a stream (new or resumed) and process events + * This provides a unified interface for both initial streams and resumed streams + */ +export function subscribeToStream( + streamBody: ReadableStream, + handlers: StreamEventHandler +): StreamSubscription { + const reader = streamBody.getReader() + const decoder = new TextDecoder() + let cancelled = false + let buffer = '' + let streamId = '' + + const processEvents = async () => { + try { + while (!cancelled) { + const { done, value } = await reader.read() + if (done || cancelled) break + + buffer += decoder.decode(value, { stream: true }) + + // Process complete SSE messages + const messages = buffer.split('\n\n') + buffer = messages.pop() || '' + + for (const message of messages) { + if (!message.trim()) continue + if (message.startsWith(':')) continue // SSE comment (ping) + + // Parse SSE format + const lines = message.split('\n') + let eventType = 'message' + let data: Record = {} + + for (const line of lines) { + if (line.startsWith('event: ')) { + eventType = line.slice(7) + } else if (line.startsWith('data: ')) { + try { + data = JSON.parse(line.slice(6)) + } catch { + data = { raw: line.slice(6) } + } + } + } + + // Track stream ID if provided in metadata + if (eventType === 'metadata' && data.streamId) { + streamId = data.streamId as string + } + + handlers.onEvent({ type: eventType, data }) + + // Check for terminal events + if (eventType === 'stream_status') { + const status = data.status as string + if (status === 'complete' || status === 'error' || status === 'aborted') { + if (status === 'error' && handlers.onError) { + handlers.onError(new Error(data.error as string || 'Stream error')) + } + if (handlers.onComplete) { + handlers.onComplete() + } + clearStoredActiveStream() + return + } + } + } + } + + // Stream ended without explicit status + if (handlers.onComplete) { + handlers.onComplete() + } + } catch (error) { + if (!cancelled && handlers.onError) { + handlers.onError(error instanceof Error ? error : new Error(String(error))) + } + } finally { + reader.releaseLock() + } + } + + // Start processing + processEvents() + + return { + unsubscribe: () => { + cancelled = true + reader.cancel().catch(() => {}) + clearStoredActiveStream() + }, + getStreamId: () => streamId, + } +} + +/** + * Attempt to resume any active stream from session storage + * Returns handlers if resumption is possible, null otherwise + */ +export async function attemptStreamResumption(): Promise<{ + stream: ReadableStream + metadata: StreamMetadata + offset: number +} | null> { + const stored = getStoredActiveStream() + if (!stored) return null + + // Check if stream is still valid (not too old) + const maxAge = 5 * 60 * 1000 // 5 minutes + if (Date.now() - stored.storedAt > maxAge) { + clearStoredActiveStream() + return null + } + + // Check stream status + const metadata = await checkStreamStatus(stored.streamId) + if (!metadata) { + clearStoredActiveStream() + return null + } + + // Only resume if stream is still active + if (metadata.status !== 'streaming' && metadata.status !== 'pending') { + clearStoredActiveStream() + return null + } + + // Get the stream + const stream = await resumeStream(stored.streamId, 0) + if (!stream) { + return null + } + + logger.info('Stream resumption possible', { + streamId: stored.streamId, + status: metadata.status, + }) + + return { stream, metadata, offset: 0 } +} + diff --git a/apps/sim/lib/copilot/stream-persistence.ts b/apps/sim/lib/copilot/stream-persistence.ts new file mode 100644 index 000000000..f3ef70aa2 --- /dev/null +++ b/apps/sim/lib/copilot/stream-persistence.ts @@ -0,0 +1,327 @@ +import { createLogger } from '@sim/logger' +import { getRedisClient } from '@/lib/core/config/redis' + +const logger = createLogger('StreamPersistence') + +const STREAM_PREFIX = 'copilot:stream:' +const STREAM_TTL = 60 * 60 * 24 // 24 hours + +export type StreamStatus = 'pending' | 'streaming' | 'complete' | 'error' | 'aborted' + +export interface StreamMetadata { + streamId: string + chatId: string + userId: string + workflowId: string + userMessageId: string + assistantMessageId?: string + status: StreamStatus + isClientSession: boolean + createdAt: number + updatedAt: number + completedAt?: number + error?: string +} + +export interface ToolCallState { + id: string + name: string + args: Record + state: 'pending' | 'executing' | 'success' | 'error' + result?: unknown + error?: string +} + +/** + * Initialize a new stream in Redis + */ +export async function createStream(params: { + streamId: string + chatId: string + userId: string + workflowId: string + userMessageId: string + isClientSession: boolean +}): Promise { + const redis = getRedisClient() + if (!redis) { + logger.warn('Redis not available, stream will not be resumable') + return + } + + const metadata: StreamMetadata = { + ...params, + status: 'pending', + createdAt: Date.now(), + updatedAt: Date.now(), + } + + const key = `${STREAM_PREFIX}${params.streamId}:meta` + await redis.set(key, JSON.stringify(metadata), 'EX', STREAM_TTL) + + logger.info('Stream created', { streamId: params.streamId }) +} + +/** + * Update stream status + */ +export async function updateStreamStatus( + streamId: string, + status: StreamStatus, + error?: string +): Promise { + const redis = getRedisClient() + if (!redis) return + + const key = `${STREAM_PREFIX}${streamId}:meta` + const data = await redis.get(key) + if (!data) return + + const metadata: StreamMetadata = JSON.parse(data) + metadata.status = status + metadata.updatedAt = Date.now() + if (status === 'complete' || status === 'error') { + metadata.completedAt = Date.now() + } + if (error) { + metadata.error = error + } + + await redis.set(key, JSON.stringify(metadata), 'EX', STREAM_TTL) +} + +/** + * Update stream metadata with additional fields + */ +export async function updateStreamMetadata( + streamId: string, + updates: Partial +): Promise { + const redis = getRedisClient() + if (!redis) return + + const key = `${STREAM_PREFIX}${streamId}:meta` + const data = await redis.get(key) + if (!data) return + + const metadata: StreamMetadata = JSON.parse(data) + Object.assign(metadata, updates, { updatedAt: Date.now() }) + + await redis.set(key, JSON.stringify(metadata), 'EX', STREAM_TTL) +} + +/** + * Append a serialized SSE event chunk to the stream + */ +export async function appendChunk(streamId: string, chunk: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const key = `${STREAM_PREFIX}${streamId}:events` + await redis.rpush(key, chunk) + await redis.expire(key, STREAM_TTL) +} + +/** + * Append text content (for quick content retrieval without parsing events) + */ +export async function appendContent(streamId: string, content: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const key = `${STREAM_PREFIX}${streamId}:content` + await redis.append(key, content) + await redis.expire(key, STREAM_TTL) +} + +/** + * Update tool call state + */ +export async function updateToolCall( + streamId: string, + toolCallId: string, + update: Partial +): Promise { + const redis = getRedisClient() + if (!redis) return + + const key = `${STREAM_PREFIX}${streamId}:tools` + const existing = await redis.hget(key, toolCallId) + const current: ToolCallState = existing + ? JSON.parse(existing) + : { id: toolCallId, name: '', args: {}, state: 'pending' } + + const updated = { ...current, ...update } + await redis.hset(key, toolCallId, JSON.stringify(updated)) + await redis.expire(key, STREAM_TTL) +} + +/** + * Mark stream as complete + */ +export async function completeStream(streamId: string, result?: unknown): Promise { + const redis = getRedisClient() + if (!redis) return + + await updateStreamStatus(streamId, 'complete') + + if (result !== undefined) { + const key = `${STREAM_PREFIX}${streamId}:result` + await redis.set(key, JSON.stringify(result), 'EX', STREAM_TTL) + } + + logger.info('Stream completed', { streamId }) +} + +/** + * Mark stream as errored + */ +export async function errorStream(streamId: string, error: string): Promise { + await updateStreamStatus(streamId, 'error', error) + logger.error('Stream errored', { streamId, error }) +} + +/** + * Check if stream was aborted (client requested abort) + */ +export async function checkAbortSignal(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return false + + const key = `${STREAM_PREFIX}${streamId}:abort` + const aborted = await redis.exists(key) + return aborted === 1 +} + +/** + * Signal stream abort + */ +export async function abortStream(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + await redis.set(`${STREAM_PREFIX}${streamId}:abort`, '1', 'EX', STREAM_TTL) + await updateStreamStatus(streamId, 'aborted') + logger.info('Stream aborted', { streamId }) +} + +/** + * Refresh TTL on all stream keys + */ +export async function refreshStreamTTL(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const keys = [ + `${STREAM_PREFIX}${streamId}:meta`, + `${STREAM_PREFIX}${streamId}:events`, + `${STREAM_PREFIX}${streamId}:content`, + `${STREAM_PREFIX}${streamId}:tools`, + `${STREAM_PREFIX}${streamId}:result`, + ] + + for (const key of keys) { + await redis.expire(key, STREAM_TTL) + } +} + +/** + * Get stream metadata + */ +export async function getStreamMetadata(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return null + + const data = await redis.get(`${STREAM_PREFIX}${streamId}:meta`) + return data ? JSON.parse(data) : null +} + +/** + * Get stream events from offset (for resumption) + */ +export async function getStreamEvents(streamId: string, fromOffset: number = 0): Promise { + const redis = getRedisClient() + if (!redis) return [] + + const key = `${STREAM_PREFIX}${streamId}:events` + return redis.lrange(key, fromOffset, -1) +} + +/** + * Get current event count (for client to know where it is) + */ +export async function getStreamEventCount(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return 0 + + const key = `${STREAM_PREFIX}${streamId}:events` + return redis.llen(key) +} + +/** + * Get all tool call states + */ +export async function getToolCallStates(streamId: string): Promise> { + const redis = getRedisClient() + if (!redis) return {} + + const key = `${STREAM_PREFIX}${streamId}:tools` + const data = await redis.hgetall(key) + + const result: Record = {} + for (const [id, json] of Object.entries(data)) { + result[id] = JSON.parse(json) + } + return result +} + +/** + * Get accumulated content + */ +export async function getStreamContent(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return '' + + const key = `${STREAM_PREFIX}${streamId}:content` + return (await redis.get(key)) || '' +} + +/** + * Get final result (if complete) + */ +export async function getStreamResult(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return null + + const key = `${STREAM_PREFIX}${streamId}:result` + const data = await redis.get(key) + return data ? JSON.parse(data) : null +} + +/** + * Check if Redis is available for stream persistence + */ +export function isStreamPersistenceEnabled(): boolean { + return getRedisClient() !== null +} + +/** + * Delete all stream data (cleanup) + */ +export async function deleteStream(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const keys = [ + `${STREAM_PREFIX}${streamId}:meta`, + `${STREAM_PREFIX}${streamId}:events`, + `${STREAM_PREFIX}${streamId}:content`, + `${STREAM_PREFIX}${streamId}:tools`, + `${STREAM_PREFIX}${streamId}:result`, + `${STREAM_PREFIX}${streamId}:abort`, + ] + + await redis.del(...keys) + logger.info('Stream deleted', { streamId }) +} + diff --git a/apps/sim/lib/copilot/stream-transformer.ts b/apps/sim/lib/copilot/stream-transformer.ts new file mode 100644 index 000000000..8b6e20358 --- /dev/null +++ b/apps/sim/lib/copilot/stream-transformer.ts @@ -0,0 +1,419 @@ +import { createLogger } from '@sim/logger' +import type { RenderEvent } from './render-events' + +const logger = createLogger('StreamTransformer') + +export interface TransformStreamContext { + streamId: string + chatId: string + userId: string + workflowId: string + userMessageId: string + assistantMessageId: string + + /** Callback for each render event - handles both client delivery and persistence */ + onRenderEvent: (event: RenderEvent) => Promise + + /** Callback for persistence operations */ + onPersist?: (data: { type: string; [key: string]: unknown }) => Promise + + /** Check if stream should be aborted */ + isAborted: () => boolean | Promise +} + +interface SimAgentEvent { + type?: string + event?: string + data?: unknown + [key: string]: unknown +} + +/** + * Transform a sim agent SSE stream into normalized render events. + * This function consumes the entire stream and emits events via callbacks. + */ +export async function transformStream( + body: ReadableStream, + context: TransformStreamContext +): Promise { + const { onRenderEvent, onPersist, isAborted } = context + + const reader = body.getReader() + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + // Check abort signal + const shouldAbort = await Promise.resolve(isAborted()) + if (shouldAbort) { + logger.info('Stream aborted by signal', { streamId: context.streamId }) + break + } + + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + + // Process complete SSE messages (separated by double newlines) + const messages = buffer.split('\n\n') + buffer = messages.pop() || '' // Keep incomplete message in buffer + + for (const message of messages) { + if (!message.trim()) continue + + const events = parseSimAgentMessage(message) + for (const simEvent of events) { + const renderEvents = transformSimAgentEvent(simEvent, context) + for (const renderEvent of renderEvents) { + await onRenderEvent(renderEvent) + } + } + } + } + + // Process any remaining buffer content + if (buffer.trim()) { + const events = parseSimAgentMessage(buffer) + for (const simEvent of events) { + const renderEvents = transformSimAgentEvent(simEvent, context) + for (const renderEvent of renderEvents) { + await onRenderEvent(renderEvent) + } + } + } + + // Emit message complete + await onRenderEvent({ + type: 'message_complete', + messageId: context.assistantMessageId, + timestamp: Date.now(), + }) + + // Notify persistence layer + if (onPersist) { + await onPersist({ type: 'message_complete', messageId: context.assistantMessageId }) + } + } catch (error) { + logger.error('Stream transform error', { streamId: context.streamId, error }) + + await onRenderEvent({ + type: 'error', + error: error instanceof Error ? error.message : 'Stream processing error', + timestamp: Date.now(), + }) + + throw error + } finally { + reader.releaseLock() + } +} + +/** + * Parse a raw SSE message into sim agent events + */ +function parseSimAgentMessage(message: string): SimAgentEvent[] { + const events: SimAgentEvent[] = [] + const lines = message.split('\n') + + let currentEvent: string | null = null + let currentData: string[] = [] + + for (const line of lines) { + if (line.startsWith('event: ')) { + // If we have accumulated data, emit previous event + if (currentData.length > 0) { + const dataStr = currentData.join('\n') + const parsed = tryParseJson(dataStr) + if (parsed) { + events.push({ ...parsed, event: currentEvent || undefined }) + } + currentData = [] + } + currentEvent = line.slice(7) + } else if (line.startsWith('data: ')) { + currentData.push(line.slice(6)) + } else if (line === '' && currentData.length > 0) { + // Empty line signals end of event + const dataStr = currentData.join('\n') + const parsed = tryParseJson(dataStr) + if (parsed) { + events.push({ ...parsed, event: currentEvent || undefined }) + } + currentEvent = null + currentData = [] + } + } + + // Handle remaining data + if (currentData.length > 0) { + const dataStr = currentData.join('\n') + const parsed = tryParseJson(dataStr) + if (parsed) { + events.push({ ...parsed, event: currentEvent || undefined }) + } + } + + return events +} + +function tryParseJson(str: string): Record | null { + if (str === '[DONE]') return null + try { + return JSON.parse(str) + } catch { + return null + } +} + +/** + * Transform a sim agent event into one or more render events + */ +function transformSimAgentEvent( + simEvent: SimAgentEvent, + context: TransformStreamContext +): RenderEvent[] { + const eventType = simEvent.type || simEvent.event + const events: RenderEvent[] = [] + const timestamp = Date.now() + + switch (eventType) { + // Text content events + case 'content_block_delta': + case 'text_delta': + case 'delta': { + const delta = (simEvent.delta as Record) || simEvent + const text = (delta.text as string) || (delta.content as string) || (simEvent.text as string) + if (text) { + events.push({ type: 'text_delta', content: text, timestamp }) + } + break + } + + case 'content_block_stop': + case 'text_complete': { + events.push({ + type: 'text_complete', + content: (simEvent.content as string) || '', + timestamp, + }) + break + } + + // Tool call events + case 'tool_call': + case 'tool_use': { + const data = (simEvent.data as Record) || simEvent + const toolCallId = (data.id as string) || (simEvent.id as string) + const toolName = (data.name as string) || (simEvent.name as string) + const args = (data.arguments as Record) || (data.input as Record) + + if (toolCallId && toolName) { + events.push({ + type: 'tool_pending', + toolCallId, + toolName, + args, + timestamp, + }) + } + break + } + + case 'tool_executing': { + const toolCallId = (simEvent.toolCallId as string) || (simEvent.id as string) + const toolName = (simEvent.toolName as string) || (simEvent.name as string) || '' + + if (toolCallId) { + events.push({ + type: 'tool_executing', + toolCallId, + toolName, + timestamp, + }) + } + break + } + + case 'tool_result': { + const toolCallId = (simEvent.toolCallId as string) || (simEvent.id as string) + const success = simEvent.success as boolean + const result = simEvent.result + const error = simEvent.error as string | undefined + + if (toolCallId) { + events.push({ + type: 'tool_result', + toolCallId, + success: success !== false, + result, + error, + failedDependency: simEvent.failedDependency as boolean | undefined, + skipped: (simEvent.result as Record)?.skipped as boolean | undefined, + timestamp, + }) + + // Also emit success/error event for UI + if (success !== false) { + events.push({ + type: 'tool_success', + toolCallId, + toolName: (simEvent.toolName as string) || '', + result, + timestamp, + }) + } else { + events.push({ + type: 'tool_error', + toolCallId, + toolName: (simEvent.toolName as string) || '', + error: error || 'Tool execution failed', + timestamp, + }) + } + } + break + } + + // Subagent events + case 'subagent_start': { + events.push({ + type: 'subagent_start', + parentToolCallId: simEvent.parentToolCallId as string, + subagentName: simEvent.subagentName as string, + timestamp, + }) + break + } + + case 'subagent_text': + case 'subagent_delta': { + events.push({ + type: 'subagent_text', + parentToolCallId: simEvent.parentToolCallId as string, + content: (simEvent.content as string) || (simEvent.text as string) || '', + timestamp, + }) + break + } + + case 'subagent_tool_call': { + events.push({ + type: 'subagent_tool_call', + parentToolCallId: simEvent.parentToolCallId as string, + toolCallId: simEvent.toolCallId as string, + toolName: simEvent.toolName as string, + args: simEvent.args as Record | undefined, + state: (simEvent.state as 'pending' | 'executing' | 'success' | 'error') || 'pending', + result: simEvent.result, + error: simEvent.error as string | undefined, + timestamp, + }) + break + } + + case 'subagent_end': { + events.push({ + type: 'subagent_end', + parentToolCallId: simEvent.parentToolCallId as string, + timestamp, + }) + break + } + + // Thinking events (for extended thinking models) + case 'thinking_start': + case 'thinking': { + if (simEvent.type === 'thinking_start' || !simEvent.content) { + events.push({ type: 'thinking_start', timestamp }) + } + if (simEvent.content) { + events.push({ + type: 'thinking_delta', + content: simEvent.content as string, + timestamp, + }) + } + break + } + + case 'thinking_delta': { + events.push({ + type: 'thinking_delta', + content: (simEvent.content as string) || '', + timestamp, + }) + break + } + + case 'thinking_end': + case 'thinking_complete': { + events.push({ type: 'thinking_end', timestamp }) + break + } + + // Message lifecycle events + case 'message_start': { + events.push({ + type: 'message_start', + messageId: (simEvent.messageId as string) || context.assistantMessageId, + timestamp, + }) + break + } + + case 'message_stop': + case 'message_complete': + case 'message_delta': { + if (eventType === 'message_complete' || eventType === 'message_stop') { + events.push({ + type: 'message_complete', + messageId: (simEvent.messageId as string) || context.assistantMessageId, + content: simEvent.content as string | undefined, + timestamp, + }) + } + break + } + + // Metadata events + case 'chat_id': { + events.push({ + type: 'chat_id', + chatId: simEvent.chatId as string, + timestamp, + }) + break + } + + case 'conversation_id': { + events.push({ + type: 'conversation_id', + conversationId: simEvent.conversationId as string, + timestamp, + }) + break + } + + // Error events + case 'error': { + events.push({ + type: 'error', + error: (simEvent.error as string) || (simEvent.message as string) || 'Unknown error', + code: simEvent.code as string | undefined, + timestamp, + }) + break + } + + default: { + // Log unhandled event types for debugging + if (eventType && eventType !== 'ping') { + logger.debug('Unhandled sim agent event type', { eventType, streamId: context.streamId }) + } + } + } + + return events +} diff --git a/apps/sim/lib/copilot/tools/server/executor.ts b/apps/sim/lib/copilot/tools/server/executor.ts new file mode 100644 index 000000000..77277adfa --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/executor.ts @@ -0,0 +1,255 @@ +import { createLogger } from '@sim/logger' +import { routeExecution } from './router' +import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils' + +const logger = createLogger('ServerToolExecutor') + +export interface ServerToolContext { + workflowId: string + userId: string + persistChanges?: boolean +} + +export interface ServerToolResult { + success: boolean + result?: unknown + error?: string +} + +/** + * Execute any copilot tool completely server-side. + * This is the central dispatcher for headless/API operation. + */ +export async function executeToolServerSide( + toolCall: { name: string; args: Record }, + context: ServerToolContext +): Promise { + const { name, args } = toolCall + const { workflowId, userId, persistChanges = true } = context + + logger.info('Executing tool server-side', { name, workflowId, userId }) + + try { + const result = await executeToolInternal(name, args, context) + return { success: true, result } + } catch (error) { + logger.error('Server-side tool execution failed', { + name, + workflowId, + error: error instanceof Error ? error.message : String(error), + }) + return { + success: false, + error: error instanceof Error ? error.message : 'Tool execution failed', + } + } +} + +async function executeToolInternal( + name: string, + args: Record, + context: ServerToolContext +): Promise { + const { workflowId, userId, persistChanges = true } = context + + switch (name) { + case 'edit_workflow': { + // Execute edit_workflow with direct persistence + const result = await routeExecution( + 'edit_workflow', + { + ...args, + workflowId, + // Don't require currentUserWorkflow - server tool will load from DB + }, + { userId } + ) + + // Persist directly to database if enabled + if (persistChanges && result.workflowState) { + try { + await saveWorkflowToNormalizedTables(workflowId, result.workflowState) + logger.info('Workflow changes persisted directly', { workflowId }) + } catch (error) { + logger.error('Failed to persist workflow changes', { error, workflowId }) + // Don't throw - return the result anyway + } + } + + return result + } + + case 'run_workflow': { + // Import dynamically to avoid circular dependencies + const { executeWorkflow } = await import('@/lib/workflows/executor/execute-workflow') + + const result = await executeWorkflow({ + workflowId, + input: (args.workflow_input as Record) || {}, + isClientSession: false, + }) + + return result + } + + case 'deploy_api': + case 'deploy_chat': + case 'deploy_mcp': { + // Import dynamically + const { deployWorkflow } = await import('@/lib/workflows/persistence/utils') + + const deployType = name.replace('deploy_', '') + const result = await deployWorkflow({ + workflowId, + deployedBy: userId, + }) + + return { ...result, deployType } + } + + case 'redeploy': { + const { deployWorkflow } = await import('@/lib/workflows/persistence/utils') + + const result = await deployWorkflow({ + workflowId, + deployedBy: userId, + }) + + return result + } + + // Server tools that already exist in the router + case 'get_blocks_and_tools': + case 'get_blocks_metadata': + case 'get_block_options': + case 'get_block_config': + case 'get_trigger_blocks': + case 'get_workflow_console': + case 'search_documentation': + case 'search_online': + case 'set_environment_variables': + case 'get_credentials': + case 'make_api_request': + case 'knowledge_base': { + return routeExecution(name, args, { userId }) + } + + // Tools that just need workflowId context + case 'get_user_workflow': + case 'get_workflow_data': { + const { loadWorkflowFromNormalizedTables } = await import( + '@/lib/workflows/persistence/utils' + ) + const { sanitizeForCopilot } = await import('@/lib/workflows/sanitization/json-sanitizer') + + const workflowData = await loadWorkflowFromNormalizedTables(workflowId) + if (!workflowData) { + throw new Error('Workflow not found') + } + + const sanitized = sanitizeForCopilot({ + blocks: workflowData.blocks, + edges: workflowData.edges, + loops: workflowData.loops, + parallels: workflowData.parallels, + }) + + return { workflow: JSON.stringify(sanitized, null, 2) } + } + + case 'list_user_workflows': { + const { db } = await import('@sim/db') + const { workflow: workflowTable } = await import('@sim/db/schema') + const { eq } = await import('drizzle-orm') + + const workflows = await db + .select({ + id: workflowTable.id, + name: workflowTable.name, + description: workflowTable.description, + isDeployed: workflowTable.isDeployed, + createdAt: workflowTable.createdAt, + updatedAt: workflowTable.updatedAt, + }) + .from(workflowTable) + .where(eq(workflowTable.userId, userId)) + + return { workflows } + } + + case 'check_deployment_status': { + const { db } = await import('@sim/db') + const { workflow: workflowTable } = await import('@sim/db/schema') + const { eq } = await import('drizzle-orm') + + const [wf] = await db + .select({ + isDeployed: workflowTable.isDeployed, + deployedAt: workflowTable.deployedAt, + }) + .from(workflowTable) + .where(eq(workflowTable.id, workflowId)) + .limit(1) + + return { + isDeployed: wf?.isDeployed || false, + deployedAt: wf?.deployedAt || null, + } + } + + default: { + logger.warn('Unknown tool for server-side execution', { name }) + throw new Error(`Tool ${name} is not available for server-side execution`) + } + } +} + +/** + * Check if a tool can be executed server-side + */ +export function isServerExecutableTool(toolName: string): boolean { + const serverExecutableTools = new Set([ + // Core editing tools + 'edit_workflow', + 'run_workflow', + + // Deployment tools + 'deploy_api', + 'deploy_chat', + 'deploy_mcp', + 'redeploy', + 'check_deployment_status', + + // Existing server tools + 'get_blocks_and_tools', + 'get_blocks_metadata', + 'get_block_options', + 'get_block_config', + 'get_trigger_blocks', + 'get_workflow_console', + 'search_documentation', + 'search_online', + 'set_environment_variables', + 'get_credentials', + 'make_api_request', + 'knowledge_base', + + // Workflow info tools + 'get_user_workflow', + 'get_workflow_data', + 'list_user_workflows', + ]) + + return serverExecutableTools.has(toolName) +} + +/** + * Get list of tools that require client-side execution + */ +export function getClientOnlyTools(): string[] { + return [ + 'navigate_ui', // Requires DOM + 'oauth_request_access', // Requires browser auth flow + ] +} + diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index 697265df8..aa4f10cbf 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -2708,6 +2708,16 @@ export const useCopilotStore = create()( }) if (result.success && result.stream) { + // Store stream ID for potential resumption on disconnect + if (result.streamId) { + const { storeActiveStream } = await import('@/lib/copilot/stream-client') + storeActiveStream( + result.chatId || currentChat?.id || '', + result.streamId, + streamingMessage.id + ) + } + await get().handleStreamingResponse( result.stream, streamingMessage.id, @@ -2715,6 +2725,12 @@ export const useCopilotStore = create()( userMessage.id ) set({ chatsLastLoadedAt: null, chatsLoadedForWorkflow: null }) + + // Clear stream storage on successful completion + if (result.streamId) { + const { clearStoredActiveStream } = await import('@/lib/copilot/stream-client') + clearStoredActiveStream() + } } else { if (result.error === 'Request was aborted') { return @@ -3853,6 +3869,68 @@ export const useCopilotStore = create()( return autoAllowedTools.includes(toolId) }, + // Stream resumption + attemptStreamResumption: async () => { + const { isSendingMessage } = get() + if (isSendingMessage) { + logger.info('[Stream] Cannot attempt resumption while already sending') + return false + } + + try { + const { attemptStreamResumption, clearStoredActiveStream } = await import( + '@/lib/copilot/stream-client' + ) + + const resumption = await attemptStreamResumption() + if (!resumption) { + return false + } + + const { stream, metadata } = resumption + + logger.info('[Stream] Resuming stream', { + streamId: metadata.streamId, + chatId: metadata.chatId, + }) + + // Find or create the assistant message for this stream + const { messages } = get() + let assistantMessageId = metadata.assistantMessageId + + // If we don't have the assistant message, create a placeholder + if (!assistantMessageId || !messages.find((m) => m.id === assistantMessageId)) { + assistantMessageId = crypto.randomUUID() + const streamingMessage: CopilotMessage = { + id: assistantMessageId, + role: 'assistant', + content: '', + timestamp: new Date().toISOString(), + isStreaming: true, + contentBlocks: [], + } + set((state) => ({ + messages: [...state.messages, streamingMessage], + isSendingMessage: true, + })) + } + + // Process the resumed stream + await get().handleStreamingResponse( + stream, + assistantMessageId, + true, // This is a continuation + metadata.userMessageId + ) + + clearStoredActiveStream() + return true + } catch (error) { + logger.error('[Stream] Resumption failed', { error }) + return false + } + }, + // Message queue actions addToQueue: (message, options) => { const queuedMessage: import('./types').QueuedMessage = { diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index 477275c3a..29c244cc2 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -63,6 +63,8 @@ export interface CopilotMessage { fileAttachments?: MessageFileAttachment[] contexts?: ChatContext[] errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required' + /** Whether this message is currently being streamed */ + isStreaming?: boolean } /** @@ -235,6 +237,9 @@ export interface CopilotActions { removeAutoAllowedTool: (toolId: string) => Promise isToolAutoAllowed: (toolId: string) => boolean + // Stream resumption + attemptStreamResumption: () => Promise + // Message queue actions addToQueue: ( message: string,