From 6fe8f2aa0492d2a5b68f164d43bc497bb0981d6a Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Wed, 21 Jan 2026 10:24:58 -0800 Subject: [PATCH] Temp --- apps/sim/app/api/copilot/chat/route.ts | 597 +++++++----------- .../hooks/use-copilot-initialization.ts | 31 +- apps/sim/hooks/use-stream-cleanup.ts | 44 +- apps/sim/stores/panel/copilot/store.ts | 8 +- apps/sim/stores/panel/copilot/types.ts | 2 +- 5 files changed, 289 insertions(+), 393 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 9d31bf5c3..6ea871ca1 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -2,6 +2,7 @@ import { db } from '@sim/db' import { copilotChats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, desc, eq } from 'drizzle-orm' +import { after } from 'next/server' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' @@ -16,16 +17,35 @@ import { createRequestTracker, createUnauthorizedResponse, } from '@/lib/copilot/request-helpers' +import { + type RenderEvent, + serializeRenderEvent, +} from '@/lib/copilot/render-events' +import { + appendChunk, + appendContent, + checkAbortSignal, + completeStream, + createStream, + errorStream, + refreshStreamTTL, + updateToolCall, +} from '@/lib/copilot/stream-persistence' +import { transformStream } from '@/lib/copilot/stream-transformer' import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials' import type { CopilotProviderConfig } from '@/lib/copilot/types' import { env } from '@/lib/core/config/env' -import { CopilotFiles } from '@/lib/uploads' +import { CopilotFiles } from '@/lib/uploads' import { createFileContent } from '@/lib/uploads/utils/file-utils' import { tools } from '@/tools/registry' import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils' const logger = createLogger('CopilotChatAPI') +export const dynamic = 'force-dynamic' +export const fetchCache = 'force-no-store' +export const runtime = 'nodejs' + const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT const FileAttachmentSchema = z.object({ @@ -492,385 +512,250 @@ export async function POST(req: NextRequest) { ) } - // If streaming is requested, forward the stream and update chat later + // If streaming is requested, return a DIRECT SSE stream for low latency + // Also persist to Redis in background for stream resumption if (stream && simAgentResponse.body) { - // Create user message to save - const userMessage = { - id: userMessageIdToUse, // Consistent ID used for request and persistence - role: 'user', - content: message, - timestamp: new Date().toISOString(), - ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), - ...(Array.isArray(contexts) && contexts.length > 0 && { contexts }), - ...(Array.isArray(contexts) && - contexts.length > 0 && { - contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }], - }), + // Create stream ID for persistence and resumption + const streamId = crypto.randomUUID() + + // Initialize stream state in Redis (fire-and-forget) + createStream({ + streamId, + chatId: actualChatId!, + userId: authenticatedUserId, + workflowId, + userMessageId: userMessageIdToUse, + isClientSession: true, + }).catch(() => {}) + + // Save user message to database immediately so it's available on refresh + // This is critical for stream resumption - user message must be persisted before stream starts + if (currentChat) { + const existingMessages = Array.isArray(currentChat.messages) ? currentChat.messages : [] + const userMessage = { + id: userMessageIdToUse, + role: 'user', + content: message, + timestamp: new Date().toISOString(), + ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), + ...(Array.isArray(contexts) && contexts.length > 0 && { contexts }), + ...(Array.isArray(contexts) && + contexts.length > 0 && { + contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }], + }), + } + + // Fire-and-forget - don't block the stream + db.update(copilotChats) + .set({ + messages: [...existingMessages, userMessage], + updatedAt: new Date(), + }) + .where(eq(copilotChats.id, actualChatId!)) + .catch(() => {}) + + logger.info(`[${tracker.requestId}] Saving user message (async)`, { + chatId: actualChatId, + messageId: userMessageIdToUse, + }) } - // Create a pass-through stream that captures the response - const transformedStream = new ReadableStream({ - async start(controller) { - const encoder = new TextEncoder() - let assistantContent = '' - const toolCalls: any[] = [] - let buffer = '' - const isFirstDone = true - let responseIdFromStart: string | undefined - let responseIdFromDone: string | undefined - // Track tool call progress to identify a safe done event - const announcedToolCallIds = new Set() - const startedToolExecutionIds = new Set() - const completedToolExecutionIds = new Set() - let lastDoneResponseId: string | undefined - let lastSafeDoneResponseId: string | undefined + // Capture needed values + const capturedChatId = actualChatId! + const capturedCurrentChat = currentChat + const assistantMessageId = crypto.randomUUID() - // Send chatId as first event - if (actualChatId) { - const chatIdEvent = `data: ${JSON.stringify({ - type: 'chat_id', - chatId: actualChatId, - })}\n\n` - controller.enqueue(encoder.encode(chatIdEvent)) - logger.debug(`[${tracker.requestId}] Sent initial chatId event to client`) - } - - // Start title generation in parallel if needed - if (actualChatId && !currentChat?.title && conversationHistory.length === 0) { - generateChatTitle(message) - .then(async (title) => { - if (title) { - await db - .update(copilotChats) - .set({ - title, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, actualChatId!)) - - const titleEvent = `data: ${JSON.stringify({ - type: 'title_updated', - title: title, - })}\n\n` - controller.enqueue(encoder.encode(titleEvent)) - logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`) - } - }) - .catch((error) => { - logger.error(`[${tracker.requestId}] Title generation failed:`, error) - }) - } else { - logger.debug(`[${tracker.requestId}] Skipping title generation`) - } - - // Forward the sim agent stream and capture assistant response - const reader = simAgentResponse.body!.getReader() - const decoder = new TextDecoder() - - try { - while (true) { - const { done, value } = await reader.read() - if (done) { - break - } - - // Decode and parse SSE events for logging and capturing content - const decodedChunk = decoder.decode(value, { stream: true }) - buffer += decodedChunk - - const lines = buffer.split('\n') - buffer = lines.pop() || '' // Keep incomplete line in buffer - - for (const line of lines) { - if (line.trim() === '') continue // Skip empty lines - - if (line.startsWith('data: ') && line.length > 6) { - try { - const jsonStr = line.slice(6) - - // Check if the JSON string is unusually large (potential streaming issue) - if (jsonStr.length > 50000) { - // 50KB limit - logger.warn(`[${tracker.requestId}] Large SSE event detected`, { - size: jsonStr.length, - preview: `${jsonStr.substring(0, 100)}...`, - }) - } - - const event = JSON.parse(jsonStr) - - // Log different event types comprehensively - switch (event.type) { - case 'content': - if (event.data) { - assistantContent += event.data - } - break - - case 'reasoning': - logger.debug( - `[${tracker.requestId}] Reasoning chunk received (${(event.data || event.content || '').length} chars)` - ) - break - - case 'tool_call': - if (!event.data?.partial) { - toolCalls.push(event.data) - if (event.data?.id) { - announcedToolCallIds.add(event.data.id) - } - } - break - - case 'tool_generating': - if (event.toolCallId) { - startedToolExecutionIds.add(event.toolCallId) - } - break - - case 'tool_result': - if (event.toolCallId) { - completedToolExecutionIds.add(event.toolCallId) - } - break - - case 'tool_error': - logger.error(`[${tracker.requestId}] Tool error:`, { - toolCallId: event.toolCallId, - toolName: event.toolName, - error: event.error, - success: event.success, - }) - if (event.toolCallId) { - completedToolExecutionIds.add(event.toolCallId) - } - break - - case 'start': - if (event.data?.responseId) { - responseIdFromStart = event.data.responseId - } - break - - case 'done': - if (event.data?.responseId) { - responseIdFromDone = event.data.responseId - lastDoneResponseId = responseIdFromDone - - // Mark this done as safe only if no tool call is currently in progress or pending - const announced = announcedToolCallIds.size - const completed = completedToolExecutionIds.size - const started = startedToolExecutionIds.size - const hasToolInProgress = announced > completed || started > completed - if (!hasToolInProgress) { - lastSafeDoneResponseId = responseIdFromDone - } - } - break - - case 'error': - break - - default: - } - - // Emit to client: rewrite 'error' events into user-friendly assistant message - if (event?.type === 'error') { - try { - const displayMessage: string = - (event?.data && (event.data.displayMessage as string)) || - 'Sorry, I encountered an error. Please try again.' - const formatted = `_${displayMessage}_` - // Accumulate so it persists to DB as assistant content - assistantContent += formatted - // Send as content chunk - try { - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n` - ) - ) - } catch (enqueueErr) { - reader.cancel() - break - } - // Then close this response cleanly for the client - try { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`) - ) - } catch (enqueueErr) { - reader.cancel() - break - } - } catch {} - // Do not forward the original error event - } else { - // Forward original event to client - try { - controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`)) - } catch (enqueueErr) { - reader.cancel() - break - } - } - } catch (e) { - // Enhanced error handling for large payloads and parsing issues - const lineLength = line.length - const isLargePayload = lineLength > 10000 - - if (isLargePayload) { - logger.error( - `[${tracker.requestId}] Failed to parse large SSE event (${lineLength} chars)`, - { - error: e, - preview: `${line.substring(0, 200)}...`, - size: lineLength, - } - ) - } else { - logger.warn( - `[${tracker.requestId}] Failed to parse SSE event: "${line.substring(0, 200)}..."`, - e - ) - } - } - } else if (line.trim() && line !== 'data: [DONE]') { - logger.debug(`[${tracker.requestId}] Non-SSE line from sim agent: "${line}"`) - } - } + // Start title generation if needed (runs in parallel) + if (capturedChatId && !capturedCurrentChat?.title && conversationHistory.length === 0) { + generateChatTitle(message) + .then(async (title) => { + if (title) { + await db + .update(copilotChats) + .set({ title, updatedAt: new Date() }) + .where(eq(copilotChats.id, capturedChatId)) + logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`) } + }) + .catch((error) => { + logger.error(`[${tracker.requestId}] Title generation failed:`, error) + }) + } - // Process any remaining buffer - if (buffer.trim()) { - logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`) - if (buffer.startsWith('data: ')) { - try { - const jsonStr = buffer.slice(6) - const event = JSON.parse(jsonStr) - if (event.type === 'content' && event.data) { - assistantContent += event.data - } - // Forward remaining event, applying same error rewrite behavior - if (event?.type === 'error') { - const displayMessage: string = - (event?.data && (event.data.displayMessage as string)) || - 'Sorry, I encountered an error. Please try again.' - const formatted = `_${displayMessage}_` - assistantContent += formatted - try { - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n` - ) - ) - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`) - ) - } catch (enqueueErr) { - reader.cancel() - } - } else { - try { - controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`)) - } catch (enqueueErr) { - reader.cancel() - } - } - } catch (e) { - logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`) - } - } - } + // Track accumulated content for final persistence + let accumulatedContent = '' + const accumulatedToolCalls: Array<{ + id: string + name: string + args: Record + state: string + result?: unknown + }> = [] - // Log final streaming summary - logger.info(`[${tracker.requestId}] Streaming complete summary:`, { - totalContentLength: assistantContent.length, - toolCallsCount: toolCalls.length, - hasContent: assistantContent.length > 0, - toolNames: toolCalls.map((tc) => tc?.name).filter(Boolean), - }) + const encoder = new TextEncoder() - // NOTE: Messages are saved by the client via update-messages endpoint with full contentBlocks. - // Server only updates conversationId here to avoid overwriting client's richer save. - if (currentChat) { - // Persist only a safe conversationId to avoid continuing from a state that expects tool outputs - const previousConversationId = currentChat?.conversationId as string | undefined - const responseId = lastSafeDoneResponseId || previousConversationId || undefined + // Track if client is still connected + let clientConnected = true - if (responseId) { - await db - .update(copilotChats) - .set({ - updatedAt: new Date(), - conversationId: responseId, - }) - .where(eq(copilotChats.id, actualChatId!)) + // Create the stream processing promise - this runs independently of client connection + // and is scheduled via after() to ensure it completes even if client disconnects + const streamProcessingPromise = transformStream(simAgentResponse.body!, { + streamId, + chatId: capturedChatId, + userId: authenticatedUserId, + workflowId, + userMessageId: userMessageIdToUse, + assistantMessageId, - logger.info( - `[${tracker.requestId}] Updated conversationId for chat ${actualChatId}`, - { - updatedConversationId: responseId, - } - ) - } - } - } catch (error) { - logger.error(`[${tracker.requestId}] Error processing stream:`, error) + // Emit render events - try to send to client, always persist to Redis + onRenderEvent: async (event: RenderEvent) => { + const serialized = serializeRenderEvent(event) - // Send an error event to the client before closing so it knows what happened + // 1. Persist to Redis FIRST (critical for resumption) + appendChunk(streamId, serialized).catch(() => {}) + + // 2. Try to send to client if still connected (best effort) + if (clientConnected) { try { - const errorMessage = - error instanceof Error && error.message === 'terminated' - ? 'Connection to AI service was interrupted. Please try again.' - : 'An unexpected error occurred while processing the response.' - const encoder = new TextEncoder() - - // Send error as content so it shows in the chat - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n` - ) - ) - // Send done event to properly close the stream on client - controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)) - } catch (enqueueError) { - // Stream might already be closed, that's ok - logger.warn( - `[${tracker.requestId}] Could not send error event to client:`, - enqueueError - ) + streamController?.enqueue(encoder.encode(serialized)) + } catch { + // Client disconnected - mark as disconnected and continue processing + clientConnected = false + logger.info(`[${tracker.requestId}] Client disconnected, continuing server-side`, { + streamId, + }) } - } finally { + } + + // Update stream metadata for specific events + switch (event.type) { + case 'text_delta': + accumulatedContent += (event as any).content || '' + appendContent(streamId, (event as any).content || '').catch(() => {}) + break + case 'tool_pending': + updateToolCall(streamId, (event as any).toolCallId, { + id: (event as any).toolCallId, + name: (event as any).toolName, + args: (event as any).args || {}, + state: 'pending', + }).catch(() => {}) + break + case 'tool_executing': + updateToolCall(streamId, (event as any).toolCallId, { + state: 'executing', + }).catch(() => {}) + break + case 'tool_success': + updateToolCall(streamId, (event as any).toolCallId, { + state: 'success', + result: (event as any).result, + }).catch(() => {}) + accumulatedToolCalls.push({ + id: (event as any).toolCallId, + name: (event as any).display?.label || '', + args: {}, + state: 'success', + result: (event as any).result, + }) + break + case 'tool_error': + updateToolCall(streamId, (event as any).toolCallId, { + state: 'error', + error: (event as any).error, + }).catch(() => {}) + accumulatedToolCalls.push({ + id: (event as any).toolCallId, + name: (event as any).display?.label || '', + args: {}, + state: 'error', + }) + break + } + }, + + onPersist: async (data) => { + if (data.type === 'message_complete') { + completeStream(streamId, undefined).catch(() => {}) + } + }, + + // Never abort based on client - let stream complete + isAborted: () => false, + }) + .then(() => { + if (capturedCurrentChat) { + db.update(copilotChats) + .set({ updatedAt: new Date() }) + .where(eq(copilotChats.id, capturedChatId)) + .catch(() => {}) + } + + logger.info(`[${tracker.requestId}] Stream processing complete`, { + streamId, + contentLength: accumulatedContent.length, + toolCallsCount: accumulatedToolCalls.length, + clientWasConnected: clientConnected, + }) + }) + .catch((error) => { + logger.error(`[${tracker.requestId}] Stream error`, { streamId, error }) + errorStream(streamId, error instanceof Error ? error.message : 'Unknown error').catch( + () => {} + ) + }) + + // Use after() to ensure stream processing completes even if client disconnects + // This is critical for serverless environments where the handler might be killed + after(streamProcessingPromise) + + // Controller reference for the client stream + let streamController: ReadableStreamDefaultController | null = null + + // Create ReadableStream for client - this is just a view into the processing + const readable = new ReadableStream({ + start(controller) { + streamController = controller + + // Prime the SSE stream to avoid buffering proxies + try { + controller.enqueue(encoder.encode(': ping\n\n')) + } catch {} + + // When stream processing completes, close the client stream + streamProcessingPromise.finally(() => { try { controller.close() } catch { - // Controller might already be closed + // Already closed } - } + }) + }, + cancel() { + // Client cancelled the stream (e.g., navigated away) + clientConnected = false + logger.info(`[${tracker.requestId}] Client stream cancelled, server continues`, { + streamId, + }) }, }) - const response = new Response(transformedStream, { + // Return direct SSE stream with streamId in header for resumption + logger.info(`[${tracker.requestId}] Returning direct SSE stream`, { + streamId, + chatId: capturedChatId, + }) + + return new Response(readable, { headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache, no-transform', Connection: 'keep-alive', 'X-Accel-Buffering': 'no', + 'X-Stream-Id': streamId, + 'X-Chat-Id': capturedChatId, }, }) - - logger.info(`[${tracker.requestId}] Returning streaming response to client`, { - duration: tracker.getDuration(), - chatId: actualChatId, - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - }, - }) - - return response } // For non-streaming responses @@ -899,7 +784,7 @@ export async function POST(req: NextRequest) { // Save messages if we have a chat if (currentChat && responseData.content) { const userMessage = { - id: userMessageIdToUse, // Consistent ID used for request and persistence + id: userMessageIdToUse, role: 'user', content: message, timestamp: new Date().toISOString(), diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts index 809263a18..735528dd6 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts @@ -38,6 +38,17 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { const lastWorkflowIdRef = useRef(null) const hasMountedRef = useRef(false) + // Use refs to store stable references to store functions + // This prevents the useEffect from triggering when functions are recreated + const setCopilotWorkflowIdRef = useRef(setCopilotWorkflowId) + const loadChatsRef = useRef(loadChats) + const loadAutoAllowedToolsRef = useRef(loadAutoAllowedTools) + + // Update refs on each render + setCopilotWorkflowIdRef.current = setCopilotWorkflowId + loadChatsRef.current = loadChats + loadAutoAllowedToolsRef.current = loadAutoAllowedTools + /** Initialize on mount - loads chats if needed. Never loads during streaming */ useEffect(() => { if (activeWorkflowId && !hasMountedRef.current && !isSendingMessage) { @@ -45,10 +56,10 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { setIsInitialized(false) lastWorkflowIdRef.current = null - setCopilotWorkflowId(activeWorkflowId) - loadChats(false) + setCopilotWorkflowIdRef.current(activeWorkflowId) + loadChatsRef.current(false) } - }, [activeWorkflowId, setCopilotWorkflowId, loadChats, isSendingMessage]) + }, [activeWorkflowId, isSendingMessage]) /** Handles genuine workflow changes, preventing re-init on every render */ useEffect(() => { @@ -65,8 +76,8 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { }) setIsInitialized(false) lastWorkflowIdRef.current = activeWorkflowId - setCopilotWorkflowId(activeWorkflowId) - loadChats(false) + setCopilotWorkflowIdRef.current(activeWorkflowId) + loadChatsRef.current(false) } if ( @@ -82,8 +93,8 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { }) setIsInitialized(false) lastWorkflowIdRef.current = activeWorkflowId - setCopilotWorkflowId(activeWorkflowId) - loadChats(false) + setCopilotWorkflowIdRef.current(activeWorkflowId) + loadChatsRef.current(false) } if ( @@ -100,8 +111,6 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { isLoadingChats, chatsLoadedForWorkflow, isInitialized, - setCopilotWorkflowId, - loadChats, isSendingMessage, ]) @@ -110,11 +119,11 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { useEffect(() => { if (hasMountedRef.current && !hasLoadedAutoAllowedToolsRef.current) { hasLoadedAutoAllowedToolsRef.current = true - loadAutoAllowedTools().catch((err) => { + loadAutoAllowedToolsRef.current().catch((err) => { logger.warn('[Copilot] Failed to load auto-allowed tools', err) }) } - }, [loadAutoAllowedTools]) + }, []) return { isInitialized, diff --git a/apps/sim/hooks/use-stream-cleanup.ts b/apps/sim/hooks/use-stream-cleanup.ts index 060f8b3c0..8f375afad 100644 --- a/apps/sim/hooks/use-stream-cleanup.ts +++ b/apps/sim/hooks/use-stream-cleanup.ts @@ -1,34 +1,32 @@ 'use client' -import { useCallback, useEffect } from 'react' +import { useEffect, useRef } from 'react' /** - * Generic hook to handle stream cleanup on page unload and component unmount - * This ensures that ongoing streams are properly terminated when: - * - Page is refreshed - * - User navigates away - * - Component unmounts - * - Tab is closed + * Generic hook to handle stream cleanup on component unmount. + * + * IMPORTANT: This hook intentionally does NOT cleanup on page refresh/unload. + * The server stream continues running independently and can be resumed when + * the client reconnects. Only cleanup on explicit navigation within the app. */ export function useStreamCleanup(cleanup: () => void) { - const stableCleanup = useCallback(() => { - try { - cleanup() - } catch (error) { - console.warn('Error during stream cleanup:', error) - } - }, [cleanup]) + // Use ref to store cleanup function to avoid recreating effects + const cleanupRef = useRef(cleanup) + cleanupRef.current = cleanup useEffect(() => { - const handleBeforeUnload = () => { - stableCleanup() - } - - window.addEventListener('beforeunload', handleBeforeUnload) - + // Only cleanup on component unmount (navigation within app) + // NOT on page unload/refresh - server stream continues independently return () => { - window.removeEventListener('beforeunload', handleBeforeUnload) - stableCleanup() + // Check if this is a navigation within the app vs page unload + // document.visibilityState is 'hidden' during page unload + if (typeof document !== 'undefined' && document.visibilityState !== 'hidden') { + try { + cleanupRef.current() + } catch (error) { + console.warn('Error during stream cleanup:', error) + } + } } - }, [stableCleanup]) + }, []) // Empty deps - only run on mount/unmount } diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index aa4f10cbf..e6f2c16e1 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -3553,9 +3553,13 @@ export const useCopilotStore = create()( clearCheckpointError: () => set({ checkpointError: null }), retrySave: async (_chatId: string) => {}, - cleanup: () => { + cleanup: (options?: { preserveServerStream?: boolean }) => { const { isSendingMessage } = get() - if (isSendingMessage) get().abortMessage() + // Only abort if explicitly requested (e.g., navigation within app) + // Don't abort on page unload - let server stream continue for resumption + if (isSendingMessage && !options?.preserveServerStream) { + get().abortMessage() + } if (streamingUpdateRAF !== null) { cancelAnimationFrame(streamingUpdateRAF) streamingUpdateRAF = null diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index 29c244cc2..980148fe4 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -209,7 +209,7 @@ export interface CopilotActions { clearSaveError: () => void clearCheckpointError: () => void retrySave: (chatId: string) => Promise - cleanup: () => void + cleanup: (options?: { preserveServerStream?: boolean }) => void reset: () => void setInputValue: (value: string) => void