From 5da1dfb5e49f2b609731c3cf073edcb74c70bf65 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 19 Jan 2026 19:46:56 -0800 Subject: [PATCH] v0 --- .../chat/[chatId]/active-stream/route.ts | 86 +++ apps/sim/app/api/copilot/chat/route.ts | 534 ++++++------------ .../copilot/stream/[streamId]/abort/route.ts | 64 +++ .../api/copilot/stream/[streamId]/route.ts | 160 ++++++ .../panel/components/copilot/copilot.tsx | 22 +- apps/sim/lib/copilot/api.ts | 84 ++- apps/sim/lib/copilot/server-tool-executor.ts | 438 ++++++++++++++ apps/sim/lib/copilot/stream-persistence.ts | 453 +++++++++++++++ .../copilot/tools/client/init-tool-configs.ts | 4 + .../tools/client/navigation/navigate-ui.ts | 10 + .../lib/copilot/tools/client/other/tour.ts | 1 + .../sim/lib/copilot/tools/client/ui-config.ts | 15 + apps/sim/stores/panel/copilot/store.ts | 235 +++++++- apps/sim/stores/panel/copilot/types.ts | 12 + bun.lock | 1 - 15 files changed, 1704 insertions(+), 415 deletions(-) create mode 100644 apps/sim/app/api/copilot/chat/[chatId]/active-stream/route.ts create mode 100644 apps/sim/app/api/copilot/stream/[streamId]/abort/route.ts create mode 100644 apps/sim/app/api/copilot/stream/[streamId]/route.ts create mode 100644 apps/sim/lib/copilot/server-tool-executor.ts create mode 100644 apps/sim/lib/copilot/stream-persistence.ts diff --git a/apps/sim/app/api/copilot/chat/[chatId]/active-stream/route.ts b/apps/sim/app/api/copilot/chat/[chatId]/active-stream/route.ts new file mode 100644 index 000000000..c91bdc55d --- /dev/null +++ b/apps/sim/app/api/copilot/chat/[chatId]/active-stream/route.ts @@ -0,0 +1,86 @@ +/** + * GET /api/copilot/chat/[chatId]/active-stream + * + * Check if a chat has an active stream that can be resumed. + * Used by the client on page load to detect if there's an in-progress stream. + */ + +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { getSession } from '@/lib/auth' +import { + getActiveStreamForChat, + getChunkCount, + getStreamMeta, +} from '@/lib/copilot/stream-persistence' + +const logger = createLogger('CopilotActiveStreamAPI') + +export async function GET( + req: NextRequest, + { params }: { params: Promise<{ chatId: string }> } +) { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { chatId } = await params + + logger.info('Active stream check', { chatId, userId: session.user.id }) + + // Look up active stream ID from Redis + const streamId = await getActiveStreamForChat(chatId) + + if (!streamId) { + logger.debug('No active stream found', { chatId }) + return NextResponse.json({ hasActiveStream: false }) + } + + // Get stream metadata + const meta = await getStreamMeta(streamId) + + if (!meta) { + logger.debug('Stream metadata not found', { streamId, chatId }) + return NextResponse.json({ hasActiveStream: false }) + } + + // Verify the stream is still active + if (meta.status !== 'streaming') { + logger.debug('Stream not active', { streamId, chatId, status: meta.status }) + return NextResponse.json({ hasActiveStream: false }) + } + + // Verify ownership + if (meta.userId !== session.user.id) { + logger.warn('Stream belongs to different user', { + streamId, + chatId, + requesterId: session.user.id, + ownerId: meta.userId, + }) + return NextResponse.json({ hasActiveStream: false }) + } + + // Get current chunk count for client to track progress + const chunkCount = await getChunkCount(streamId) + + logger.info('Active stream found', { + streamId, + chatId, + chunkCount, + toolCallsCount: meta.toolCalls.length, + }) + + return NextResponse.json({ + hasActiveStream: true, + streamId, + chunkCount, + toolCalls: meta.toolCalls.filter( + (tc) => tc.state === 'pending' || tc.state === 'executing' + ), + createdAt: meta.createdAt, + updatedAt: meta.updatedAt, + }) +} + diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 9d31bf5c3..10e02249a 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -2,6 +2,7 @@ import { db } from '@sim/db' import { copilotChats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, desc, eq } from 'drizzle-orm' +import { after } from 'next/server' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' @@ -16,6 +17,16 @@ import { createRequestTracker, createUnauthorizedResponse, } from '@/lib/copilot/request-helpers' +import { + appendChunk, + appendContent, + checkAbortSignal, + completeStream, + createStream, + errorStream, + refreshStreamTTL, + updateToolCall, +} from '@/lib/copilot/stream-persistence' import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials' import type { CopilotProviderConfig } from '@/lib/copilot/types' import { env } from '@/lib/core/config/env' @@ -492,385 +503,186 @@ export async function POST(req: NextRequest) { ) } - // If streaming is requested, forward the stream and update chat later + // If streaming is requested, start background processing and return streamId immediately if (stream && simAgentResponse.body) { - // Create user message to save - const userMessage = { - id: userMessageIdToUse, // Consistent ID used for request and persistence - role: 'user', - content: message, - timestamp: new Date().toISOString(), - ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), - ...(Array.isArray(contexts) && contexts.length > 0 && { contexts }), - ...(Array.isArray(contexts) && - contexts.length > 0 && { - contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }], - }), - } + // Create stream ID for persistence and resumption + const streamId = crypto.randomUUID() - // Create a pass-through stream that captures the response - const transformedStream = new ReadableStream({ - async start(controller) { - const encoder = new TextEncoder() - let assistantContent = '' - const toolCalls: any[] = [] - let buffer = '' - const isFirstDone = true - let responseIdFromStart: string | undefined - let responseIdFromDone: string | undefined - // Track tool call progress to identify a safe done event - const announcedToolCallIds = new Set() - const startedToolExecutionIds = new Set() - const completedToolExecutionIds = new Set() - let lastDoneResponseId: string | undefined - let lastSafeDoneResponseId: string | undefined + // Initialize stream state in Redis + await createStream({ + streamId, + chatId: actualChatId!, + userId: authenticatedUserId, + workflowId, + userMessageId: userMessageIdToUse, + isClientSession: true, + }) - // Send chatId as first event - if (actualChatId) { - const chatIdEvent = `data: ${JSON.stringify({ - type: 'chat_id', - chatId: actualChatId, - })}\n\n` - controller.enqueue(encoder.encode(chatIdEvent)) - logger.debug(`[${tracker.requestId}] Sent initial chatId event to client`) - } + // Track last TTL refresh time + const TTL_REFRESH_INTERVAL = 60000 // Refresh TTL every minute - // Start title generation in parallel if needed - if (actualChatId && !currentChat?.title && conversationHistory.length === 0) { - generateChatTitle(message) - .then(async (title) => { - if (title) { - await db - .update(copilotChats) - .set({ - title, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, actualChatId!)) + // Capture needed values for background task + const capturedChatId = actualChatId! + const capturedCurrentChat = currentChat - const titleEvent = `data: ${JSON.stringify({ - type: 'title_updated', - title: title, - })}\n\n` - controller.enqueue(encoder.encode(titleEvent)) - logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`) - } - }) - .catch((error) => { - logger.error(`[${tracker.requestId}] Title generation failed:`, error) - }) - } else { - logger.debug(`[${tracker.requestId}] Skipping title generation`) - } + // Start background processing task - runs independently of client + // Client will connect to /api/copilot/stream/{streamId} for live updates + const backgroundTask = (async () => { + const bgReader = simAgentResponse.body!.getReader() + const decoder = new TextDecoder() + let buffer = '' + let assistantContent = '' + const toolCalls: any[] = [] + let lastSafeDoneResponseId: string | undefined + let bgLastTTLRefresh = Date.now() - // Forward the sim agent stream and capture assistant response - const reader = simAgentResponse.body!.getReader() - const decoder = new TextDecoder() + // Send initial events via Redis for client to receive + const chatIdEvent = `data: ${JSON.stringify({ type: 'chat_id', chatId: capturedChatId })}\n\n` + await appendChunk(streamId, chatIdEvent).catch(() => {}) - try { - while (true) { - const { done, value } = await reader.read() - if (done) { - break - } + const streamIdEvent = `data: ${JSON.stringify({ type: 'stream_id', streamId })}\n\n` + await appendChunk(streamId, streamIdEvent).catch(() => {}) - // Decode and parse SSE events for logging and capturing content - const decodedChunk = decoder.decode(value, { stream: true }) - buffer += decodedChunk - - const lines = buffer.split('\n') - buffer = lines.pop() || '' // Keep incomplete line in buffer - - for (const line of lines) { - if (line.trim() === '') continue // Skip empty lines - - if (line.startsWith('data: ') && line.length > 6) { - try { - const jsonStr = line.slice(6) - - // Check if the JSON string is unusually large (potential streaming issue) - if (jsonStr.length > 50000) { - // 50KB limit - logger.warn(`[${tracker.requestId}] Large SSE event detected`, { - size: jsonStr.length, - preview: `${jsonStr.substring(0, 100)}...`, - }) - } - - const event = JSON.parse(jsonStr) - - // Log different event types comprehensively - switch (event.type) { - case 'content': - if (event.data) { - assistantContent += event.data - } - break - - case 'reasoning': - logger.debug( - `[${tracker.requestId}] Reasoning chunk received (${(event.data || event.content || '').length} chars)` - ) - break - - case 'tool_call': - if (!event.data?.partial) { - toolCalls.push(event.data) - if (event.data?.id) { - announcedToolCallIds.add(event.data.id) - } - } - break - - case 'tool_generating': - if (event.toolCallId) { - startedToolExecutionIds.add(event.toolCallId) - } - break - - case 'tool_result': - if (event.toolCallId) { - completedToolExecutionIds.add(event.toolCallId) - } - break - - case 'tool_error': - logger.error(`[${tracker.requestId}] Tool error:`, { - toolCallId: event.toolCallId, - toolName: event.toolName, - error: event.error, - success: event.success, - }) - if (event.toolCallId) { - completedToolExecutionIds.add(event.toolCallId) - } - break - - case 'start': - if (event.data?.responseId) { - responseIdFromStart = event.data.responseId - } - break - - case 'done': - if (event.data?.responseId) { - responseIdFromDone = event.data.responseId - lastDoneResponseId = responseIdFromDone - - // Mark this done as safe only if no tool call is currently in progress or pending - const announced = announcedToolCallIds.size - const completed = completedToolExecutionIds.size - const started = startedToolExecutionIds.size - const hasToolInProgress = announced > completed || started > completed - if (!hasToolInProgress) { - lastSafeDoneResponseId = responseIdFromDone - } - } - break - - case 'error': - break - - default: - } - - // Emit to client: rewrite 'error' events into user-friendly assistant message - if (event?.type === 'error') { - try { - const displayMessage: string = - (event?.data && (event.data.displayMessage as string)) || - 'Sorry, I encountered an error. Please try again.' - const formatted = `_${displayMessage}_` - // Accumulate so it persists to DB as assistant content - assistantContent += formatted - // Send as content chunk - try { - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n` - ) - ) - } catch (enqueueErr) { - reader.cancel() - break - } - // Then close this response cleanly for the client - try { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`) - ) - } catch (enqueueErr) { - reader.cancel() - break - } - } catch {} - // Do not forward the original error event - } else { - // Forward original event to client - try { - controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`)) - } catch (enqueueErr) { - reader.cancel() - break - } - } - } catch (e) { - // Enhanced error handling for large payloads and parsing issues - const lineLength = line.length - const isLargePayload = lineLength > 10000 - - if (isLargePayload) { - logger.error( - `[${tracker.requestId}] Failed to parse large SSE event (${lineLength} chars)`, - { - error: e, - preview: `${line.substring(0, 200)}...`, - size: lineLength, - } - ) - } else { - logger.warn( - `[${tracker.requestId}] Failed to parse SSE event: "${line.substring(0, 200)}..."`, - e - ) - } - } - } else if (line.trim() && line !== 'data: [DONE]') { - logger.debug(`[${tracker.requestId}] Non-SSE line from sim agent: "${line}"`) - } - } - } - - // Process any remaining buffer - if (buffer.trim()) { - logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`) - if (buffer.startsWith('data: ')) { - try { - const jsonStr = buffer.slice(6) - const event = JSON.parse(jsonStr) - if (event.type === 'content' && event.data) { - assistantContent += event.data - } - // Forward remaining event, applying same error rewrite behavior - if (event?.type === 'error') { - const displayMessage: string = - (event?.data && (event.data.displayMessage as string)) || - 'Sorry, I encountered an error. Please try again.' - const formatted = `_${displayMessage}_` - assistantContent += formatted - try { - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n` - ) - ) - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`) - ) - } catch (enqueueErr) { - reader.cancel() - } - } else { - try { - controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`)) - } catch (enqueueErr) { - reader.cancel() - } - } - } catch (e) { - logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`) - } - } - } - - // Log final streaming summary - logger.info(`[${tracker.requestId}] Streaming complete summary:`, { - totalContentLength: assistantContent.length, - toolCallsCount: toolCalls.length, - hasContent: assistantContent.length > 0, - toolNames: toolCalls.map((tc) => tc?.name).filter(Boolean), - }) - - // NOTE: Messages are saved by the client via update-messages endpoint with full contentBlocks. - // Server only updates conversationId here to avoid overwriting client's richer save. - if (currentChat) { - // Persist only a safe conversationId to avoid continuing from a state that expects tool outputs - const previousConversationId = currentChat?.conversationId as string | undefined - const responseId = lastSafeDoneResponseId || previousConversationId || undefined - - if (responseId) { + // Start title generation if needed + if (capturedChatId && !capturedCurrentChat?.title && conversationHistory.length === 0) { + generateChatTitle(message) + .then(async (title) => { + if (title) { await db .update(copilotChats) - .set({ - updatedAt: new Date(), - conversationId: responseId, - }) - .where(eq(copilotChats.id, actualChatId!)) + .set({ title, updatedAt: new Date() }) + .where(eq(copilotChats.id, capturedChatId)) - logger.info( - `[${tracker.requestId}] Updated conversationId for chat ${actualChatId}`, - { - updatedConversationId: responseId, - } - ) + const titleEvent = `data: ${JSON.stringify({ type: 'title_updated', title })}\n\n` + await appendChunk(streamId, titleEvent).catch(() => {}) + logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`) } - } - } catch (error) { - logger.error(`[${tracker.requestId}] Error processing stream:`, error) + }) + .catch((error) => { + logger.error(`[${tracker.requestId}] Title generation failed:`, error) + }) + } - // Send an error event to the client before closing so it knows what happened - try { - const errorMessage = - error instanceof Error && error.message === 'terminated' - ? 'Connection to AI service was interrupted. Please try again.' - : 'An unexpected error occurred while processing the response.' - const encoder = new TextEncoder() - - // Send error as content so it shows in the chat - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n` - ) - ) - // Send done event to properly close the stream on client - controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)) - } catch (enqueueError) { - // Stream might already be closed, that's ok - logger.warn( - `[${tracker.requestId}] Could not send error event to client:`, - enqueueError - ) + try { + while (true) { + // Check for abort signal + const isAborted = await checkAbortSignal(streamId) + if (isAborted) { + logger.info(`[${tracker.requestId}] Background stream aborted via signal`, { streamId }) + bgReader.cancel() + break } - } finally { - try { - controller.close() - } catch { - // Controller might already be closed + + const { done, value } = await bgReader.read() + if (done) break + + const chunk = decoder.decode(value, { stream: true }) + buffer += chunk + + // Persist raw chunk for replay and publish to live subscribers + await appendChunk(streamId, chunk).catch(() => {}) + + // Refresh TTL periodically + const now = Date.now() + if (now - bgLastTTLRefresh > TTL_REFRESH_INTERVAL) { + bgLastTTLRefresh = now + refreshStreamTTL(streamId, capturedChatId).catch(() => {}) + } + + // Parse and track content/tool calls + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + for (const line of lines) { + if (!line.startsWith('data: ') || line.length <= 6) continue + try { + const event = JSON.parse(line.slice(6)) + switch (event.type) { + case 'content': + if (event.data) { + assistantContent += event.data + appendContent(streamId, event.data).catch(() => {}) + } + break + case 'tool_call': + if (!event.data?.partial && event.data?.id) { + toolCalls.push(event.data) + updateToolCall(streamId, event.data.id, { + id: event.data.id, + name: event.data.name, + args: event.data.arguments || {}, + state: 'pending', + }).catch(() => {}) + } + break + case 'tool_generating': + if (event.toolCallId) { + updateToolCall(streamId, event.toolCallId, { state: 'executing' }).catch(() => {}) + } + break + case 'tool_result': + if (event.toolCallId) { + updateToolCall(streamId, event.toolCallId, { + state: 'success', + result: event.result, + }).catch(() => {}) + } + break + case 'tool_error': + if (event.toolCallId) { + updateToolCall(streamId, event.toolCallId, { + state: 'error', + error: event.error, + }).catch(() => {}) + } + break + case 'done': + if (event.data?.responseId) { + lastSafeDoneResponseId = event.data.responseId + } + break + } + } catch {} } } - }, + + // Complete stream - save to DB + const finalConversationId = lastSafeDoneResponseId || (capturedCurrentChat?.conversationId as string | undefined) + await completeStream(streamId, finalConversationId) + + // Update conversationId in DB + if (capturedCurrentChat && lastSafeDoneResponseId) { + await db + .update(copilotChats) + .set({ updatedAt: new Date(), conversationId: lastSafeDoneResponseId }) + .where(eq(copilotChats.id, capturedChatId)) + } + + logger.info(`[${tracker.requestId}] Background stream processing complete`, { + streamId, + contentLength: assistantContent.length, + toolCallsCount: toolCalls.length, + }) + } catch (error) { + logger.error(`[${tracker.requestId}] Background stream error`, { streamId, error }) + await errorStream(streamId, error instanceof Error ? error.message : 'Unknown error') + } + })() + + // Use after() to ensure background task completes even after response is sent + after(() => backgroundTask) + + // Return streamId immediately - client will connect to stream endpoint + logger.info(`[${tracker.requestId}] Returning streamId for client to connect`, { + streamId, + chatId: capturedChatId, }) - const response = new Response(transformedStream, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - 'X-Accel-Buffering': 'no', - }, + return NextResponse.json({ + success: true, + streamId, + chatId: capturedChatId, }) - - logger.info(`[${tracker.requestId}] Returning streaming response to client`, { - duration: tracker.getDuration(), - chatId: actualChatId, - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - }, - }) - - return response } // For non-streaming responses @@ -899,7 +711,7 @@ export async function POST(req: NextRequest) { // Save messages if we have a chat if (currentChat && responseData.content) { const userMessage = { - id: userMessageIdToUse, // Consistent ID used for request and persistence + id: userMessageIdToUse, role: 'user', content: message, timestamp: new Date().toISOString(), diff --git a/apps/sim/app/api/copilot/stream/[streamId]/abort/route.ts b/apps/sim/app/api/copilot/stream/[streamId]/abort/route.ts new file mode 100644 index 000000000..6d92f6647 --- /dev/null +++ b/apps/sim/app/api/copilot/stream/[streamId]/abort/route.ts @@ -0,0 +1,64 @@ +/** + * POST /api/copilot/stream/[streamId]/abort + * + * Signal the server to abort an active stream. + * The original request handler will check for this signal and cancel the stream. + */ + +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { getSession } from '@/lib/auth' +import { getStreamMeta, setAbortSignal } from '@/lib/copilot/stream-persistence' + +const logger = createLogger('CopilotStreamAbortAPI') + +export async function POST( + req: NextRequest, + { params }: { params: Promise<{ streamId: string }> } +) { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { streamId } = await params + + logger.info('Stream abort request', { streamId, userId: session.user.id }) + + const meta = await getStreamMeta(streamId) + + if (!meta) { + logger.info('Stream not found for abort', { streamId }) + return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) + } + + // Verify ownership + if (meta.userId !== session.user.id) { + logger.warn('Unauthorized abort attempt', { + streamId, + requesterId: session.user.id, + ownerId: meta.userId, + }) + return NextResponse.json({ error: 'Forbidden' }, { status: 403 }) + } + + // Stream already finished + if (meta.status !== 'streaming') { + logger.info('Stream already finished, nothing to abort', { + streamId, + status: meta.status, + }) + return NextResponse.json({ + success: true, + message: 'Stream already finished', + }) + } + + // Set abort signal in Redis + await setAbortSignal(streamId) + + logger.info('Abort signal set for stream', { streamId }) + + return NextResponse.json({ success: true }) +} + diff --git a/apps/sim/app/api/copilot/stream/[streamId]/route.ts b/apps/sim/app/api/copilot/stream/[streamId]/route.ts new file mode 100644 index 000000000..66f641f94 --- /dev/null +++ b/apps/sim/app/api/copilot/stream/[streamId]/route.ts @@ -0,0 +1,160 @@ +/** + * GET /api/copilot/stream/[streamId] + * + * Resume an active copilot stream. + * - If stream is still active: returns SSE with replay of missed chunks + live updates via Redis Pub/Sub + * - If stream is completed: returns JSON indicating to load from database + * - If stream not found: returns 404 + */ + +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { getSession } from '@/lib/auth' +import { + getChunks, + getStreamMeta, + subscribeToStream, +} from '@/lib/copilot/stream-persistence' + +const logger = createLogger('CopilotStreamResumeAPI') + +const SSE_HEADERS = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', +} + +export async function GET( + req: NextRequest, + { params }: { params: Promise<{ streamId: string }> } +) { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { streamId } = await params + const fromChunk = parseInt(req.nextUrl.searchParams.get('from') || '0') + + logger.info('Stream resume request', { streamId, fromChunk, userId: session.user.id }) + + const meta = await getStreamMeta(streamId) + + if (!meta) { + logger.info('Stream not found or expired', { streamId }) + return NextResponse.json( + { + status: 'not_found', + message: 'Stream not found or expired. Reload chat from database.', + }, + { status: 404 } + ) + } + + // Verify ownership + if (meta.userId !== session.user.id) { + logger.warn('Unauthorized stream access attempt', { + streamId, + requesterId: session.user.id, + ownerId: meta.userId, + }) + return NextResponse.json({ error: 'Forbidden' }, { status: 403 }) + } + + // Stream completed - tell client to load from database + if (meta.status === 'completed') { + logger.info('Stream already completed', { streamId, chatId: meta.chatId }) + return NextResponse.json({ + status: 'completed', + chatId: meta.chatId, + message: 'Stream completed. Messages saved to database.', + }) + } + + // Stream errored + if (meta.status === 'error') { + logger.info('Stream encountered error', { streamId, chatId: meta.chatId }) + return NextResponse.json({ + status: 'error', + chatId: meta.chatId, + message: 'Stream encountered an error.', + }) + } + + // Stream still active - return SSE with replay + live updates + logger.info('Resuming active stream', { streamId, chatId: meta.chatId }) + + const encoder = new TextEncoder() + const abortController = new AbortController() + + // Handle client disconnect + req.signal.addEventListener('abort', () => { + logger.info('Client disconnected from resumed stream', { streamId }) + abortController.abort() + }) + + const responseStream = new ReadableStream({ + async start(controller) { + try { + // 1. Replay missed chunks (single read from Redis LIST) + const missedChunks = await getChunks(streamId, fromChunk) + logger.info('Replaying missed chunks', { + streamId, + fromChunk, + missedChunkCount: missedChunks.length, + }) + + for (const chunk of missedChunks) { + // Chunks are already in SSE format, just re-encode + controller.enqueue(encoder.encode(chunk)) + } + + // 2. Subscribe to live chunks via Redis Pub/Sub (blocking, no polling) + await subscribeToStream( + streamId, + (chunk) => { + try { + controller.enqueue(encoder.encode(chunk)) + } catch { + // Client disconnected + abortController.abort() + } + }, + () => { + // Stream complete - close connection + logger.info('Stream completed during resume', { streamId }) + try { + controller.close() + } catch { + // Already closed + } + }, + abortController.signal + ) + } catch (error) { + logger.error('Error in stream resume', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + try { + controller.close() + } catch { + // Already closed + } + } + }, + cancel() { + abortController.abort() + }, + }) + + return new Response(responseStream, { + headers: { + ...SSE_HEADERS, + 'X-Stream-Id': streamId, + 'X-Chat-Id': meta.chatId, + }, + }) +} + diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx index 03053ccf3..d3d1e4d39 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx @@ -191,26 +191,10 @@ export const Copilot = forwardRef(({ panelWidth }, ref }, [isInitialized, messages.length, scrollToBottom]) /** - * Cleanup on component unmount (page refresh, navigation, etc.) - * Uses a ref to track sending state to avoid stale closure issues - * Note: Parent workflow.tsx also has useStreamCleanup for page-level cleanup + * Note: We intentionally do NOT abort on component unmount. + * Streams continue server-side and can be resumed when user returns. + * The server persists chunks to Redis for resumption. */ - const isSendingRef = useRef(isSendingMessage) - isSendingRef.current = isSendingMessage - const abortMessageRef = useRef(abortMessage) - abortMessageRef.current = abortMessage - - useEffect(() => { - return () => { - // Use refs to check current values, not stale closure values - if (isSendingRef.current) { - abortMessageRef.current() - logger.info('Aborted active message streaming due to component unmount') - } - } - // Empty deps - only run cleanup on actual unmount, not on re-renders - // eslint-disable-next-line react-hooks/exhaustive-deps - }, []) /** * Container-level click capture to cancel edit mode when clicking outside the current edit area diff --git a/apps/sim/lib/copilot/api.ts b/apps/sim/lib/copilot/api.ts index c680f9751..8537e0af6 100644 --- a/apps/sim/lib/copilot/api.ts +++ b/apps/sim/lib/copilot/api.ts @@ -3,6 +3,15 @@ import type { CopilotMode, CopilotModelId, CopilotTransportMode } from '@/lib/co const logger = createLogger('CopilotAPI') +/** + * Response from chat initiation endpoint + */ +export interface ChatInitResponse { + success: boolean + streamId: string + chatId: string +} + /** * Citation interface for documentation references */ @@ -115,10 +124,16 @@ async function handleApiError(response: Response, defaultMessage: string): Promi /** * Send a streaming message to the copilot chat API * This is the main API endpoint that handles all chat operations + * + * Server-first architecture: + * 1. POST to /api/copilot/chat - starts background processing, returns { streamId, chatId } + * 2. Connect to /api/copilot/stream/{streamId} for SSE stream + * + * This ensures stream continues server-side even if client disconnects */ export async function sendStreamingMessage( request: SendMessageRequest -): Promise { +): Promise { try { const { abortSignal, ...requestBody } = request try { @@ -138,34 +153,83 @@ export async function sendStreamingMessage( contextsPreview: preview, }) } catch {} - const response = await fetch('/api/copilot/chat', { + + // Step 1: Initiate chat - server starts background processing + const initResponse = await fetch('/api/copilot/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ ...requestBody, stream: true }), - signal: abortSignal, - credentials: 'include', // Include cookies for session authentication + credentials: 'include', }) - if (!response.ok) { - const errorMessage = await handleApiError(response, 'Failed to send streaming message') + if (!initResponse.ok) { + const errorMessage = await handleApiError(initResponse, 'Failed to initiate chat') return { success: false, error: errorMessage, - status: response.status, + status: initResponse.status, } } - if (!response.body) { + const initData: ChatInitResponse = await initResponse.json() + if (!initData.success || !initData.streamId) { return { success: false, - error: 'No response body received', + error: 'Failed to get stream ID from server', status: 500, } } + logger.info('Chat initiated, connecting to stream', { + streamId: initData.streamId, + chatId: initData.chatId, + }) + + // Step 2: Connect to stream endpoint for SSE + const streamResponse = await fetch(`/api/copilot/stream/${initData.streamId}`, { + method: 'GET', + headers: { Accept: 'text/event-stream' }, + signal: abortSignal, + credentials: 'include', + }) + + if (!streamResponse.ok) { + // Handle completed/not found cases + if (streamResponse.status === 404) { + return { + success: false, + error: 'Stream not found or expired', + status: 404, + streamId: initData.streamId, + chatId: initData.chatId, + } + } + + const errorMessage = await handleApiError(streamResponse, 'Failed to connect to stream') + return { + success: false, + error: errorMessage, + status: streamResponse.status, + streamId: initData.streamId, + chatId: initData.chatId, + } + } + + if (!streamResponse.body) { + return { + success: false, + error: 'No stream body received', + status: 500, + streamId: initData.streamId, + chatId: initData.chatId, + } + } + return { success: true, - stream: response.body, + stream: streamResponse.body, + streamId: initData.streamId, + chatId: initData.chatId, } } catch (error) { // Handle AbortError gracefully - this is expected when user aborts diff --git a/apps/sim/lib/copilot/server-tool-executor.ts b/apps/sim/lib/copilot/server-tool-executor.ts new file mode 100644 index 000000000..929e57232 --- /dev/null +++ b/apps/sim/lib/copilot/server-tool-executor.ts @@ -0,0 +1,438 @@ +/** + * Server-Side Tool Executor for Copilot + * + * Executes copilot tools server-side when no client session is present. + * Handles routing to appropriate server implementations and marking tools complete. + */ + +import { db } from '@sim/db' +import { account, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { isClientOnlyTool } from '@/lib/copilot/tools/client/ui-config' +import { routeExecution } from '@/lib/copilot/tools/server/router' +import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants' +import { env } from '@/lib/core/config/env' +import { generateRequestId } from '@/lib/core/utils/request' +import { getBaseUrl } from '@/lib/core/utils/urls' +import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' +import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils' +import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' +import { executeTool } from '@/tools' +import { getTool, resolveToolId } from '@/tools/utils' + +const logger = createLogger('ServerToolExecutor') + +const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT + +/** + * Context for tool execution + */ +export interface ToolExecutionContext { + userId: string + workflowId: string + chatId: string + streamId: string + workspaceId?: string +} + +/** + * Result of tool execution + */ +export interface ToolExecutionResult { + success: boolean + status: number + message?: string + data?: unknown +} + +/** + * Tools that have dedicated server implementations in the router + */ +const SERVER_ROUTED_TOOLS = [ + 'edit_workflow', + 'get_workflow_data', + 'get_workflow_console', + 'get_blocks_and_tools', + 'get_blocks_metadata', + 'get_block_options', + 'get_block_config', + 'get_trigger_blocks', + 'knowledge_base', + 'set_environment_variables', + 'get_credentials', + 'search_documentation', + 'make_api_request', + 'search_online', +] + +/** + * Tools that execute workflows + */ +const WORKFLOW_EXECUTION_TOOLS = ['run_workflow'] + +/** + * Tools that handle deployments + */ +const DEPLOYMENT_TOOLS = ['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'] + +/** + * Execute a tool server-side. + * Returns result to be sent to Sim Agent via mark-complete. + */ +export async function executeToolServerSide( + toolName: string, + toolCallId: string, + args: Record, + context: ToolExecutionContext +): Promise { + logger.info('Executing tool server-side', { + toolName, + toolCallId, + userId: context.userId, + workflowId: context.workflowId, + }) + + // 1. Check if tool is client-only + if (isClientOnlyTool(toolName)) { + logger.info('Skipping client-only tool', { toolName, toolCallId }) + return { + success: true, + status: 200, + message: `Tool "${toolName}" requires a browser session and was skipped in API mode.`, + data: { skipped: true, reason: 'client_only' }, + } + } + + try { + // 2. Route to appropriate executor + if (SERVER_ROUTED_TOOLS.includes(toolName)) { + return executeServerRoutedTool(toolName, args, context) + } + + if (WORKFLOW_EXECUTION_TOOLS.includes(toolName)) { + return executeRunWorkflow(args, context) + } + + if (DEPLOYMENT_TOOLS.includes(toolName)) { + return executeDeploymentTool(toolName, args, context) + } + + // 3. Try integration tool execution (Slack, Gmail, etc.) + return executeIntegrationTool(toolName, toolCallId, args, context) + } catch (error) { + logger.error('Tool execution failed', { + toolName, + toolCallId, + error: error instanceof Error ? error.message : String(error), + }) + return { + success: false, + status: 500, + message: error instanceof Error ? error.message : 'Tool execution failed', + } + } +} + +/** + * Execute a tool that has a dedicated server implementation + */ +async function executeServerRoutedTool( + toolName: string, + args: Record, + context: ToolExecutionContext +): Promise { + try { + const result = await routeExecution(toolName, args, { userId: context.userId }) + return { + success: true, + status: 200, + data: result, + } + } catch (error) { + return { + success: false, + status: 500, + message: error instanceof Error ? error.message : 'Server tool execution failed', + } + } +} + +/** + * Execute the run_workflow tool + */ +async function executeRunWorkflow( + args: Record, + context: ToolExecutionContext +): Promise { + const workflowId = (args.workflowId as string) || context.workflowId + const input = (args.input as Record) || {} + + logger.info('Executing run_workflow', { workflowId, inputKeys: Object.keys(input) }) + + try { + const response = await fetch(`${getBaseUrl()}/api/workflows/${workflowId}/execute`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${await generateInternalToken()}`, + }, + body: JSON.stringify({ + input, + triggerType: 'copilot', + workflowId, // For internal auth + }), + }) + + if (!response.ok) { + const errorText = await response.text() + return { + success: false, + status: response.status, + message: `Workflow execution failed: ${errorText}`, + } + } + + const result = await response.json() + return { + success: true, + status: 200, + data: result, + } + } catch (error) { + return { + success: false, + status: 500, + message: error instanceof Error ? error.message : 'Workflow execution failed', + } + } +} + +/** + * Execute a deployment tool + */ +async function executeDeploymentTool( + toolName: string, + args: Record, + context: ToolExecutionContext +): Promise { + // Deployment tools modify workflow state and create deployments + // These can be executed server-side via the server router + try { + const result = await routeExecution(toolName, args, { userId: context.userId }) + return { + success: true, + status: 200, + data: result, + } + } catch (error) { + // If the tool isn't in the router, it might need to be added + // For now, return a skip result + logger.warn('Deployment tool not available server-side', { toolName }) + return { + success: true, + status: 200, + message: `Deployment tool "${toolName}" executed with limited functionality in API mode.`, + data: { skipped: true, reason: 'limited_api_support' }, + } + } +} + +/** + * Execute an integration tool (Slack, Gmail, etc.) + * Uses the same logic as /api/copilot/execute-tool + */ +async function executeIntegrationTool( + toolName: string, + toolCallId: string, + args: Record, + context: ToolExecutionContext +): Promise { + const resolvedToolName = resolveToolId(toolName) + const toolConfig = getTool(resolvedToolName) + + if (!toolConfig) { + // Tool not found - try server router as fallback + try { + const result = await routeExecution(toolName, args, { userId: context.userId }) + return { + success: true, + status: 200, + data: result, + } + } catch { + logger.warn('Tool not found', { toolName, resolvedToolName }) + return { + success: true, + status: 200, + message: `Tool "${toolName}" not found. Skipped.`, + data: { skipped: true, reason: 'not_found' }, + } + } + } + + // Get workspaceId for env vars + let workspaceId = context.workspaceId + if (!workspaceId && context.workflowId) { + const workflowResult = await db + .select({ workspaceId: workflow.workspaceId }) + .from(workflow) + .where(eq(workflow.id, context.workflowId)) + .limit(1) + workspaceId = workflowResult[0]?.workspaceId ?? undefined + } + + // Get decrypted environment variables + const decryptedEnvVars = await getEffectiveDecryptedEnv(context.userId, workspaceId) + + // Resolve env var references in arguments + const executionParams: Record = resolveEnvVarReferences( + args, + decryptedEnvVars, + { + resolveExactMatch: true, + allowEmbedded: true, + trimKeys: true, + onMissing: 'keep', + deep: true, + } + ) as Record + + // Resolve OAuth access token if required + if (toolConfig.oauth?.required && toolConfig.oauth.provider) { + const provider = toolConfig.oauth.provider + + try { + const accounts = await db + .select() + .from(account) + .where(and(eq(account.providerId, provider), eq(account.userId, context.userId))) + .limit(1) + + if (accounts.length > 0) { + const acc = accounts[0] + const requestId = generateRequestId() + const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id) + + if (accessToken) { + executionParams.accessToken = accessToken + } else { + return { + success: false, + status: 400, + message: `OAuth token not available for ${provider}. Please reconnect your account.`, + } + } + } else { + return { + success: false, + status: 400, + message: `No ${provider} account connected. Please connect your account first.`, + } + } + } catch (error) { + return { + success: false, + status: 500, + message: `Failed to get OAuth token for ${toolConfig.oauth.provider}`, + } + } + } + + // Check if tool requires an API key + const needsApiKey = toolConfig.params?.apiKey?.required + if (needsApiKey && !executionParams.apiKey) { + return { + success: false, + status: 400, + message: `API key not provided for ${toolName}.`, + } + } + + // Add execution context + executionParams._context = { + workflowId: context.workflowId, + userId: context.userId, + } + + // Special handling for function_execute + if (toolName === 'function_execute') { + executionParams.envVars = decryptedEnvVars + executionParams.workflowVariables = {} + executionParams.blockData = {} + executionParams.blockNameMapping = {} + executionParams.language = executionParams.language || 'javascript' + executionParams.timeout = executionParams.timeout || 30000 + } + + // Execute the tool + const result = await executeTool(resolvedToolName, executionParams, true) + + logger.info('Integration tool execution complete', { + toolName, + success: result.success, + }) + + return { + success: result.success, + status: result.success ? 200 : 500, + message: result.error, + data: result.output, + } +} + +/** + * Mark a tool as complete with Sim Agent + */ +export async function markToolComplete( + toolCallId: string, + toolName: string, + result: ToolExecutionResult +): Promise { + logger.info('Marking tool complete', { + toolCallId, + toolName, + success: result.success, + status: result.status, + }) + + try { + const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}), + }, + body: JSON.stringify({ + id: toolCallId, + name: toolName, + status: result.status, + message: result.message, + data: result.data, + }), + }) + + if (!response.ok) { + logger.error('Mark complete failed', { toolCallId, status: response.status }) + return false + } + + return true + } catch (error) { + logger.error('Mark complete error', { + toolCallId, + error: error instanceof Error ? error.message : String(error), + }) + return false + } +} + +/** + * Generate an internal authentication token for server-to-server calls + */ +async function generateInternalToken(): Promise { + // Use the same pattern as A2A for internal auth + const { generateInternalToken: genToken } = await import('@/app/api/a2a/serve/[agentId]/utils') + return genToken() +} + diff --git a/apps/sim/lib/copilot/stream-persistence.ts b/apps/sim/lib/copilot/stream-persistence.ts new file mode 100644 index 000000000..0d8597cce --- /dev/null +++ b/apps/sim/lib/copilot/stream-persistence.ts @@ -0,0 +1,453 @@ +/** + * Stream Persistence Service for Copilot + * + * Handles persisting copilot stream state to Redis (ephemeral) and database (permanent). + * Uses Redis LIST for chunk history and Pub/Sub for live updates (no polling). + * + * Redis Key Structure: + * - copilot:stream:{streamId}:meta → StreamMeta JSON (TTL: 10 min) + * - copilot:stream:{streamId}:chunks → LIST of chunks (for replay) + * - copilot:stream:{streamId} → Pub/Sub CHANNEL (for live updates) + * - copilot:active:{chatId} → streamId lookup + * - copilot:abort:{streamId} → abort signal flag + */ + +import { db } from '@sim/db' +import { copilotChats } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import type Redis from 'ioredis' +import { getRedisClient } from '@/lib/core/config/redis' + +const logger = createLogger('CopilotStreamPersistence') + +const STREAM_TTL = 60 * 10 // 10 minutes + +/** + * Tool call record stored in stream state + */ +export interface ToolCallRecord { + id: string + name: string + args: Record + state: 'pending' | 'executing' | 'success' | 'error' | 'skipped' + result?: unknown + error?: string +} + +/** + * Stream metadata stored in Redis + */ +export interface StreamMeta { + id: string + status: 'streaming' | 'completed' | 'error' + chatId: string + userId: string + workflowId: string + userMessageId: string + isClientSession: boolean + toolCalls: ToolCallRecord[] + assistantContent: string + conversationId?: string + createdAt: number + updatedAt: number +} + +/** + * Parameters for creating a new stream + */ +export interface CreateStreamParams { + streamId: string + chatId: string + userId: string + workflowId: string + userMessageId: string + isClientSession: boolean +} + +// ============ WRITE OPERATIONS (used by original request handler) ============ + +/** + * Create a new stream state in Redis + */ +export async function createStream(params: CreateStreamParams): Promise { + const redis = getRedisClient() + if (!redis) { + logger.warn('Redis not available, stream persistence disabled') + return + } + + const meta: StreamMeta = { + id: params.streamId, + status: 'streaming', + chatId: params.chatId, + userId: params.userId, + workflowId: params.workflowId, + userMessageId: params.userMessageId, + isClientSession: params.isClientSession, + toolCalls: [], + assistantContent: '', + createdAt: Date.now(), + updatedAt: Date.now(), + } + + const metaKey = `copilot:stream:${params.streamId}:meta` + const activeKey = `copilot:active:${params.chatId}` + + await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta)) + await redis.setex(activeKey, STREAM_TTL, params.streamId) + + logger.info('Created stream state', { streamId: params.streamId, chatId: params.chatId }) +} + +/** + * Append a chunk to the stream buffer and publish for live subscribers + */ +export async function appendChunk(streamId: string, chunk: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const listKey = `copilot:stream:${streamId}:chunks` + const channel = `copilot:stream:${streamId}` + + // Push to list for replay, publish for live subscribers + await redis.rpush(listKey, chunk) + await redis.expire(listKey, STREAM_TTL) + await redis.publish(channel, chunk) +} + +/** + * Append content to the accumulated assistant content + */ +export async function appendContent(streamId: string, content: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const metaKey = `copilot:stream:${streamId}:meta` + const raw = await redis.get(metaKey) + if (!raw) return + + const meta: StreamMeta = JSON.parse(raw) + meta.assistantContent += content + meta.updatedAt = Date.now() + + await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta)) +} + +/** + * Update stream metadata + */ +export async function updateMeta(streamId: string, update: Partial): Promise { + const redis = getRedisClient() + if (!redis) return + + const metaKey = `copilot:stream:${streamId}:meta` + const raw = await redis.get(metaKey) + if (!raw) return + + const meta: StreamMeta = { ...JSON.parse(raw), ...update, updatedAt: Date.now() } + await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta)) +} + +/** + * Update a specific tool call in the stream state + */ +export async function updateToolCall( + streamId: string, + toolCallId: string, + update: Partial +): Promise { + const redis = getRedisClient() + if (!redis) return + + const metaKey = `copilot:stream:${streamId}:meta` + const raw = await redis.get(metaKey) + if (!raw) return + + const meta: StreamMeta = JSON.parse(raw) + const toolCallIndex = meta.toolCalls.findIndex((tc) => tc.id === toolCallId) + + if (toolCallIndex >= 0) { + meta.toolCalls[toolCallIndex] = { ...meta.toolCalls[toolCallIndex], ...update } + } else { + // Add new tool call + meta.toolCalls.push({ + id: toolCallId, + name: update.name || 'unknown', + args: update.args || {}, + state: update.state || 'pending', + result: update.result, + error: update.error, + }) + } + + meta.updatedAt = Date.now() + await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta)) +} + +/** + * Complete the stream - save to database and cleanup Redis + */ +export async function completeStream(streamId: string, conversationId?: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const meta = await getStreamMeta(streamId) + if (!meta) return + + // Publish completion event for subscribers + await redis.publish(`copilot:stream:${streamId}`, JSON.stringify({ type: 'stream_complete' })) + + // Save to database + await saveToDatabase(meta, conversationId) + + // Cleanup Redis + await redis.del(`copilot:stream:${streamId}:meta`) + await redis.del(`copilot:stream:${streamId}:chunks`) + await redis.del(`copilot:active:${meta.chatId}`) + await redis.del(`copilot:abort:${streamId}`) + + logger.info('Completed stream', { streamId, chatId: meta.chatId }) +} + +/** + * Mark stream as errored and save partial content + */ +export async function errorStream(streamId: string, error: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const meta = await getStreamMeta(streamId) + if (!meta) return + + // Update status + meta.status = 'error' + + // Publish error event for subscribers + await redis.publish( + `copilot:stream:${streamId}`, + JSON.stringify({ type: 'stream_error', error }) + ) + + // Still save what we have to database + await saveToDatabase(meta) + + // Cleanup Redis + await redis.del(`copilot:stream:${streamId}:meta`) + await redis.del(`copilot:stream:${streamId}:chunks`) + await redis.del(`copilot:active:${meta.chatId}`) + await redis.del(`copilot:abort:${streamId}`) + + logger.info('Errored stream', { streamId, error }) +} + +/** + * Save stream content to database as assistant message + */ +async function saveToDatabase(meta: StreamMeta, conversationId?: string): Promise { + try { + const [chat] = await db + .select() + .from(copilotChats) + .where(eq(copilotChats.id, meta.chatId)) + .limit(1) + + if (!chat) { + logger.warn('Chat not found for stream save', { chatId: meta.chatId }) + return + } + + const existingMessages = Array.isArray(chat.messages) ? chat.messages : [] + + // Build the assistant message + const assistantMessage = { + id: crypto.randomUUID(), + role: 'assistant', + content: meta.assistantContent, + toolCalls: meta.toolCalls, + timestamp: new Date().toISOString(), + serverCompleted: true, // Mark that this was completed server-side + } + + const updatedMessages = [...existingMessages, assistantMessage] + + await db + .update(copilotChats) + .set({ + messages: updatedMessages, + conversationId: conversationId || (chat.conversationId as string | undefined), + updatedAt: new Date(), + }) + .where(eq(copilotChats.id, meta.chatId)) + + logger.info('Saved stream to database', { + streamId: meta.id, + chatId: meta.chatId, + contentLength: meta.assistantContent.length, + toolCallsCount: meta.toolCalls.length, + }) + } catch (error) { + logger.error('Failed to save stream to database', { streamId: meta.id, error }) + } +} + +// ============ READ OPERATIONS (used by resume handler) ============ + +/** + * Get stream metadata + */ +export async function getStreamMeta(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return null + + const raw = await redis.get(`copilot:stream:${streamId}:meta`) + return raw ? JSON.parse(raw) : null +} + +/** + * Get chunks from stream history (for replay) + */ +export async function getChunks(streamId: string, fromIndex: number = 0): Promise { + const redis = getRedisClient() + if (!redis) return [] + + const listKey = `copilot:stream:${streamId}:chunks` + return redis.lrange(listKey, fromIndex, -1) +} + +/** + * Get the number of chunks in the stream + */ +export async function getChunkCount(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return 0 + + const listKey = `copilot:stream:${streamId}:chunks` + return redis.llen(listKey) +} + +/** + * Get active stream ID for a chat (if any) + */ +export async function getActiveStreamForChat(chatId: string): Promise { + const redis = getRedisClient() + if (!redis) return null + + return redis.get(`copilot:active:${chatId}`) +} + +// ============ SUBSCRIPTION (for resume handler) ============ + +/** + * Subscribe to live stream updates. + * Uses Redis Pub/Sub - no polling, fully event-driven. + * + * @param streamId - Stream to subscribe to + * @param onChunk - Callback for each new chunk + * @param onComplete - Callback when stream completes + * @param signal - Optional AbortSignal to cancel subscription + */ +export async function subscribeToStream( + streamId: string, + onChunk: (chunk: string) => void, + onComplete: () => void, + signal?: AbortSignal +): Promise { + const redis = getRedisClient() + if (!redis) { + onComplete() + return + } + + // Create a separate Redis connection for subscription + const subscriber = redis.duplicate() + const channel = `copilot:stream:${streamId}` + + let isComplete = false + + const cleanup = () => { + if (!isComplete) { + isComplete = true + subscriber.unsubscribe(channel).catch(() => {}) + subscriber.quit().catch(() => {}) + } + } + + signal?.addEventListener('abort', cleanup) + + await subscriber.subscribe(channel) + + subscriber.on('message', (ch, message) => { + if (ch !== channel) return + + try { + const parsed = JSON.parse(message) + if (parsed.type === 'stream_complete' || parsed.type === 'stream_error') { + cleanup() + onComplete() + return + } + } catch { + // Not a control message, just a chunk + } + + onChunk(message) + }) + + subscriber.on('error', (err) => { + logger.error('Subscriber error', { streamId, error: err }) + cleanup() + onComplete() + }) +} + +// ============ ABORT HANDLING ============ + +/** + * Set abort signal for a stream. + * The original request handler should check this and cancel if set. + */ +export async function setAbortSignal(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + await redis.setex(`copilot:abort:${streamId}`, 60, '1') + // Also publish to channel so handler sees it immediately + await redis.publish(`copilot:stream:${streamId}`, JSON.stringify({ type: 'abort' })) + + logger.info('Set abort signal', { streamId }) +} + +/** + * Check if abort signal is set for a stream + */ +export async function checkAbortSignal(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return false + + const val = await redis.get(`copilot:abort:${streamId}`) + return val === '1' +} + +/** + * Clear abort signal for a stream + */ +export async function clearAbortSignal(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + await redis.del(`copilot:abort:${streamId}`) +} + +/** + * Refresh TTL on all stream keys (call periodically during long streams) + */ +export async function refreshStreamTTL(streamId: string, chatId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + await redis.expire(`copilot:stream:${streamId}:meta`, STREAM_TTL) + await redis.expire(`copilot:stream:${streamId}:chunks`, STREAM_TTL) + await redis.expire(`copilot:active:${chatId}`, STREAM_TTL) +} + diff --git a/apps/sim/lib/copilot/tools/client/init-tool-configs.ts b/apps/sim/lib/copilot/tools/client/init-tool-configs.ts index 9850c6594..3308adea8 100644 --- a/apps/sim/lib/copilot/tools/client/init-tool-configs.ts +++ b/apps/sim/lib/copilot/tools/client/init-tool-configs.ts @@ -5,6 +5,9 @@ * Import this module early in the app to ensure all tool configs are available. */ +// Navigation tools +import './navigation/navigate-ui' + // Other tools (subagents) import './other/auth' import './other/custom-tool' @@ -41,6 +44,7 @@ export { getToolUIConfig, hasInterrupt, type InterruptConfig, + isClientOnlyTool, isSpecialTool, isSubagentTool, type ParamsTableConfig, diff --git a/apps/sim/lib/copilot/tools/client/navigation/navigate-ui.ts b/apps/sim/lib/copilot/tools/client/navigation/navigate-ui.ts index 5b9d30c06..0a6f42840 100644 --- a/apps/sim/lib/copilot/tools/client/navigation/navigate-ui.ts +++ b/apps/sim/lib/copilot/tools/client/navigation/navigate-ui.ts @@ -5,6 +5,7 @@ import { type BaseClientToolMetadata, ClientToolCallState, } from '@/lib/copilot/tools/client/base-tool' +import { registerToolUIConfig } from '@/lib/copilot/tools/client/ui-config' import { useCopilotStore } from '@/stores/panel/copilot/store' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' @@ -239,3 +240,12 @@ export class NavigateUIClientTool extends BaseClientTool { await this.handleAccept(args) } } + +// Register UI config at module load - clientOnly because this requires browser navigation +registerToolUIConfig(NavigateUIClientTool.id, { + clientOnly: true, + interrupt: { + accept: { text: 'Open', icon: Navigation }, + reject: { text: 'Skip', icon: XCircle }, + }, +}) diff --git a/apps/sim/lib/copilot/tools/client/other/tour.ts b/apps/sim/lib/copilot/tools/client/other/tour.ts index 8faca5587..d382e7bf9 100644 --- a/apps/sim/lib/copilot/tools/client/other/tour.ts +++ b/apps/sim/lib/copilot/tools/client/other/tour.ts @@ -33,6 +33,7 @@ export class TourClientTool extends BaseClientTool { [ClientToolCallState.aborted]: { text: 'Aborted tour', icon: XCircle }, }, uiConfig: { + clientOnly: true, // Tour requires browser UI to guide the user subagent: { streamingLabel: 'Touring', completedLabel: 'Tour complete', diff --git a/apps/sim/lib/copilot/tools/client/ui-config.ts b/apps/sim/lib/copilot/tools/client/ui-config.ts index 6fac1645c..9f5f7b16d 100644 --- a/apps/sim/lib/copilot/tools/client/ui-config.ts +++ b/apps/sim/lib/copilot/tools/client/ui-config.ts @@ -172,6 +172,13 @@ export interface ToolUIConfig { * The tool-call component will use this to render specialized content. */ customRenderer?: 'code' | 'edit_summary' | 'none' + + /** + * Whether this tool requires a client/browser session to execute. + * Client-only tools (like navigate_ui, tour) cannot run in headless/API mode. + * In API-only mode, these tools will be skipped with a message. + */ + clientOnly?: boolean } /** @@ -215,6 +222,14 @@ export function hasInterrupt(toolName: string): boolean { return !!toolUIConfigs[toolName]?.interrupt } +/** + * Check if a tool is client-only (requires browser session). + * Client-only tools cannot execute in headless/API mode. + */ +export function isClientOnlyTool(toolName: string): boolean { + return !!toolUIConfigs[toolName]?.clientOnly +} + /** * Get subagent labels for a tool */ diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index d93cd30af..d61af0b36 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -1086,6 +1086,14 @@ const sseHandlers: Record = { await get().handleNewChatCreation(context.newChatId) } }, + stream_id: (_data, _context, get) => { + // Store stream ID for potential resumption + const streamId = _data?.streamId + if (streamId) { + get().setActiveStreamId(streamId) + logger.debug('[SSE] Received stream ID', { streamId }) + } + }, tool_result: (data, context, get, set) => { try { const toolCallId: string | undefined = data?.toolCallId || data?.data?.id @@ -1735,10 +1743,12 @@ const sseHandlers: Record = { updateStreamingMessage(set, context) } }, - done: (_data, context) => { + done: (_data, context, get) => { context.doneEventCount++ if (context.doneEventCount >= 1) { context.streamComplete = true + // Clear active stream ID when stream completes + get().setActiveStreamId(null) } }, error: (data, context, _get, set) => { @@ -2227,6 +2237,9 @@ const initialState = { autoAllowedTools: [] as string[], messageQueue: [] as import('./types').QueuedMessage[], suppressAbortContinueOption: false, + activeStreamId: null as string | null, + isResuming: false, + userInitiatedAbort: false, // Track if abort was user-initiated vs browser refresh } export const useCopilotStore = create()( @@ -2243,11 +2256,12 @@ export const useCopilotStore = create()( setWorkflowId: async (workflowId: string | null) => { const currentWorkflowId = get().workflowId if (currentWorkflowId === workflowId) return - const { isSendingMessage } = get() - if (isSendingMessage) get().abortMessage() - // Abort all in-progress tools and clear any diff preview - abortAllInProgressTools(set, get) + // Don't abort - let server-side stream continue for resumption + // Just reset client state; stream will be resumable when returning to the chat + // Don't abort tools either - they may still be running server-side + set({ isSendingMessage: false, abortController: null }) + try { useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false }) } catch {} @@ -2278,10 +2292,14 @@ export const useCopilotStore = create()( if (!workflowId) { return } - if (currentChat && currentChat.id !== chat.id && isSendingMessage) get().abortMessage() - // Abort in-progress tools and clear diff when changing chats - abortAllInProgressTools(set, get) + // Don't abort when switching chats - let server-side stream continue for resumption + // Just reset client state; stream will be resumable when returning to that chat + // Don't abort tools either - they may still be running server-side + if (currentChat && currentChat.id !== chat.id && isSendingMessage) { + set({ isSendingMessage: false, abortController: null }) + } + try { useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false }) } catch {} @@ -2367,14 +2385,29 @@ export const useCopilotStore = create()( } } } catch {} + + // Check for active stream that can be resumed + try { + const hasActiveStream = await get().checkForActiveStream(chat.id) + if (hasActiveStream && get().activeStreamId) { + logger.info('[Chat] Resuming active stream on chat select', { chatId: chat.id }) + await get().resumeActiveStream(get().activeStreamId!) + } + } catch (err) { + logger.warn('[Chat] Failed to check/resume active stream', { error: err }) + } }, createNewChat: async () => { const { isSendingMessage } = get() - if (isSendingMessage) get().abortMessage() - // Abort in-progress tools and clear diff on new chat - abortAllInProgressTools(set, get) + // Don't abort when creating new chat - let server-side stream continue for resumption + // Just reset client state; stream will be resumable when returning to that chat + // Don't abort tools either - they may still be running server-side + if (isSendingMessage) { + set({ isSendingMessage: false, abortController: null }) + } + try { useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false }) } catch {} @@ -2497,6 +2530,21 @@ export const useCopilotStore = create()( mode: refreshedMode, selectedModel: refreshedModel as CopilotStore['selectedModel'], }) + + // Check for active stream that can be resumed (e.g., after page refresh) + try { + const hasActiveStream = await get().checkForActiveStream(updatedCurrentChat.id) + if (hasActiveStream && get().activeStreamId) { + logger.info('[Chat] Resuming active stream on refresh', { + chatId: updatedCurrentChat.id, + }) + await get().resumeActiveStream(get().activeStreamId!) + } + } catch (err) { + logger.warn('[Chat] Failed to check/resume active stream on refresh', { + error: err, + }) + } } try { await get().loadMessageCheckpoints(updatedCurrentChat.id) @@ -2531,6 +2579,21 @@ export const useCopilotStore = create()( try { await get().loadMessageCheckpoints(mostRecentChat.id) } catch {} + + // Check for active stream that can be resumed (e.g., after page refresh) + try { + const hasActiveStream = await get().checkForActiveStream(mostRecentChat.id) + if (hasActiveStream && get().activeStreamId) { + logger.info('[Chat] Resuming active stream on auto-select', { + chatId: mostRecentChat.id, + }) + await get().resumeActiveStream(get().activeStreamId!) + } + } catch (err) { + logger.warn('[Chat] Failed to check/resume active stream on auto-select', { + error: err, + }) + } } } else { set({ currentChat: null, messages: [] }) @@ -2697,13 +2760,18 @@ export const useCopilotStore = create()( }) if (result.success && result.stream) { + // Store streamId for resumption if client disconnects + if (result.streamId) { + set({ activeStreamId: result.streamId }) + } await get().handleStreamingResponse( result.stream, streamingMessage.id, false, userMessage.id ) - set({ chatsLastLoadedAt: null, chatsLoadedForWorkflow: null }) + // Clear stream ID on completion + set({ activeStreamId: null, chatsLastLoadedAt: null, chatsLoadedForWorkflow: null }) } else { if (result.error === 'Request was aborted') { return @@ -2762,12 +2830,13 @@ export const useCopilotStore = create()( } }, - // Abort streaming + // Abort streaming (user-initiated) abortMessage: (options?: { suppressContinueOption?: boolean }) => { const { abortController, isSendingMessage, messages } = get() if (!isSendingMessage || !abortController) return const suppressContinueOption = options?.suppressContinueOption === true - set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption }) + // Mark this as a user-initiated abort (vs browser refresh which doesn't call this) + set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption, userInitiatedAbort: true }) try { abortController.abort() stopStreamingUpdates() @@ -2861,7 +2930,11 @@ export const useCopilotStore = create()( abortSignal: abortController.signal, }) if (result.success && result.stream) { + if (result.streamId) { + set({ activeStreamId: result.streamId }) + } await get().handleStreamingResponse(result.stream, newAssistantMessage.id, false) + set({ activeStreamId: null }) } else { if (result.error === 'Request was aborted') return const errorMessage = createErrorMessage( @@ -3206,16 +3279,30 @@ export const useCopilotStore = create()( reader.cancel() }, 600000) + // Track if this is a browser-initiated abort (not user clicking stop) + let browserAbort = false + try { for await (const data of parseSSEStream(reader, decoder)) { - const { abortController } = get() + const { abortController, userInitiatedAbort } = get() if (abortController?.signal.aborted) { - context.wasAborted = true - const { suppressAbortContinueOption } = get() - context.suppressContinueOption = suppressAbortContinueOption === true - if (suppressAbortContinueOption) { - set({ suppressAbortContinueOption: false }) + // Only treat as abort if user explicitly clicked stop (not browser refresh) + if (userInitiatedAbort) { + context.wasAborted = true + const { suppressAbortContinueOption } = get() + context.suppressContinueOption = suppressAbortContinueOption === true + if (suppressAbortContinueOption) { + set({ suppressAbortContinueOption: false }) + } + set({ userInitiatedAbort: false }) // Reset flag + } else { + // Browser refresh/navigation - don't update any UI, just exit + // The page is about to reload anyway + browserAbort = true + reader.cancel().catch(() => {}) + return // Exit immediately, skip all finalization } + // User-initiated abort: clean up and break context.pendingContent = '' finalizeThinkingBlock(context) stopStreamingUpdates() @@ -3503,8 +3590,7 @@ export const useCopilotStore = create()( createdAt: new Date(), updatedAt: new Date(), } - // Abort any in-progress tools and clear diff on new chat creation - abortAllInProgressTools(set, get) + // Don't abort tools during streaming - just clear diff try { useWorkflowDiffStore.getState().clearDiff() } catch {} @@ -3527,8 +3613,10 @@ export const useCopilotStore = create()( retrySave: async (_chatId: string) => {}, cleanup: () => { - const { isSendingMessage } = get() - if (isSendingMessage) get().abortMessage() + // Don't abort on cleanup - let server-side stream continue for resumption + // Just reset client state; stream will be resumable on page reload + set({ isSendingMessage: false, abortController: null }) + if (streamingUpdateRAF !== null) { cancelAnimationFrame(streamingUpdateRAF) streamingUpdateRAF = null @@ -3912,6 +4000,105 @@ export const useCopilotStore = create()( set({ messageQueue: [] }) logger.info('[Queue] Queue cleared') }, + + // ===================== + // Stream Resumption + // ===================== + + setActiveStreamId: (streamId) => { + set({ activeStreamId: streamId }) + }, + + checkForActiveStream: async (chatId) => { + try { + const response = await fetch(`/api/copilot/chat/${chatId}/active-stream`, { + credentials: 'include', + }) + + if (!response.ok) { + return false + } + + const data = await response.json() + + if (data.hasActiveStream && data.streamId) { + logger.info('[Resume] Found active stream', { + chatId, + streamId: data.streamId, + chunkCount: data.chunkCount, + }) + set({ activeStreamId: data.streamId }) + return true + } + + return false + } catch (error) { + logger.warn('[Resume] Failed to check for active stream', { chatId, error }) + return false + } + }, + + resumeActiveStream: async (streamId) => { + const state = get() + + if (state.isResuming) { + logger.warn('[Resume] Already resuming a stream') + return + } + + logger.info('[Resume] Resuming stream', { streamId }) + set({ isResuming: true, isSendingMessage: true }) + + try { + const response = await fetch(`/api/copilot/stream/${streamId}?from=0`, { + credentials: 'include', + }) + + if (!response.ok) { + const data = await response.json().catch(() => ({})) + + // Stream completed or errored - refresh messages from DB + if (data.status === 'completed' || data.status === 'error') { + logger.info('[Resume] Stream already finished', { streamId, status: data.status }) + // Reload the chat to get the saved messages + const currentChat = get().currentChat + if (currentChat) { + await get().selectChat(currentChat) + } + return + } + + logger.warn('[Resume] Failed to resume stream', { streamId, status: response.status }) + return + } + + if (!response.body) { + logger.warn('[Resume] No response body for resume stream') + return + } + + // Create a placeholder assistant message for the resumed stream + const resumeMessageId = crypto.randomUUID() + const messages = get().messages + const assistantMessage: CopilotMessage = { + id: resumeMessageId, + role: 'assistant', + content: '', + timestamp: new Date().toISOString(), + toolCalls: [], + contentBlocks: [], + } + + set({ messages: [...messages, assistantMessage] }) + + // Process the resumed stream + await get().handleStreamingResponse(response.body, resumeMessageId, true) + } catch (error) { + logger.error('[Resume] Stream resumption failed', { streamId, error }) + } finally { + set({ isResuming: false, isSendingMessage: false, activeStreamId: null }) + } + }, })) ) diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index 477275c3a..e7b151adf 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -156,6 +156,13 @@ export interface CopilotState { // Message queue for messages sent while another is in progress messageQueue: QueuedMessage[] + + // Stream resumption state + activeStreamId: string | null + isResuming: boolean + + // Track if abort was user-initiated (vs browser refresh) + userInitiatedAbort: boolean } export interface CopilotActions { @@ -249,6 +256,11 @@ export interface CopilotActions { moveUpInQueue: (id: string) => void sendNow: (id: string) => Promise clearQueue: () => void + + // Stream resumption actions + checkForActiveStream: (chatId: string) => Promise + resumeActiveStream: (streamId: string) => Promise + setActiveStreamId: (streamId: string | null) => void } export type CopilotStore = CopilotState & CopilotActions diff --git a/bun.lock b/bun.lock index 7de9501f3..1bb4262d6 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 1, "workspaces": { "": { "name": "simstudio",