From 6bfd50d0acbce87338ff306a1edfd453188a06e2 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 3 Feb 2026 16:30:16 -0800 Subject: [PATCH] Checkpoint --- apps/sim/app/api/copilot/chat/route.ts | 108 +++-- apps/sim/app/api/copilot/chat/stream/route.ts | 133 ++++++ .../panel/components/copilot/copilot.tsx | 6 +- .../hooks/use-copilot-initialization.ts | 13 + apps/sim/lib/copilot/api.ts | 46 +- .../lib/copilot/orchestrator/stream-buffer.ts | 152 +++++++ apps/sim/stores/panel/copilot/store.ts | 423 +++++++++++++++++- apps/sim/stores/panel/copilot/types.ts | 18 + 8 files changed, 858 insertions(+), 41 deletions(-) create mode 100644 apps/sim/app/api/copilot/chat/stream/route.ts create mode 100644 apps/sim/lib/copilot/orchestrator/stream-buffer.ts diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 89839a7e4..631a7e54e 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -25,6 +25,11 @@ import { resolveWorkflowIdForUser } from '@/lib/workflows/utils' import { tools } from '@/tools/registry' import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' +import { + appendStreamEvent, + resetStreamBuffer, + setStreamMeta, +} from '@/lib/copilot/orchestrator/stream-buffer' const logger = createLogger('CopilotChatAPI') @@ -483,16 +488,30 @@ export async function POST(req: NextRequest) { } catch {} if (stream) { + const streamId = userMessageIdToUse const transformedStream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder() + await resetStreamBuffer(streamId) + await setStreamMeta(streamId, { status: 'active', userId: authenticatedUserId }) + + const pushEvent = async (event: Record) => { + const entry = await appendStreamEvent(streamId, event) + const payload = { + ...event, + eventId: entry.eventId, + streamId, + } + try { + controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`)) + } catch { + // Client disconnected - keep buffering + } + } + if (actualChatId) { - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ type: 'chat_id', chatId: actualChatId })}\n\n` - ) - ) + await pushEvent({ type: 'chat_id', chatId: actualChatId }) } if (actualChatId && !currentChat?.title && conversationHistory.length === 0) { @@ -506,9 +525,7 @@ export async function POST(req: NextRequest) { updatedAt: new Date(), }) .where(eq(copilotChats.id, actualChatId!)) - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'title_updated', title })}\n\n`) - ) + await pushEvent({ type: 'title_updated', title }) } }) .catch((error) => { @@ -524,11 +541,7 @@ export async function POST(req: NextRequest) { autoExecuteTools: true, interactive: true, onEvent: async (event) => { - try { - controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`)) - } catch { - controller.error('Failed to forward SSE event') - } + await pushEvent(event) }, }) @@ -541,19 +554,20 @@ export async function POST(req: NextRequest) { }) .where(eq(copilotChats.id, actualChatId!)) } + await setStreamMeta(streamId, { status: 'complete', userId: authenticatedUserId }) } catch (error) { logger.error(`[${tracker.requestId}] Orchestration error:`, error) - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ - type: 'error', - data: { - displayMessage: - 'An unexpected error occurred while processing the response.', - }, - })}\n\n` - ) - ) + await setStreamMeta(streamId, { + status: 'error', + userId: authenticatedUserId, + error: error instanceof Error ? error.message : 'Stream error', + }) + await pushEvent({ + type: 'error', + data: { + displayMessage: 'An unexpected error occurred while processing the response.', + }, + }) } finally { controller.close() } @@ -698,10 +712,7 @@ export async function GET(req: NextRequest) { try { const { searchParams } = new URL(req.url) const workflowId = searchParams.get('workflowId') - - if (!workflowId) { - return createBadRequestResponse('workflowId is required') - } + const chatId = searchParams.get('chatId') // Get authenticated user using consolidated helper const { userId: authenticatedUserId, isAuthenticated } = @@ -710,6 +721,47 @@ export async function GET(req: NextRequest) { return createUnauthorizedResponse() } + // If chatId is provided, fetch a single chat + if (chatId) { + const [chat] = await db + .select({ + id: copilotChats.id, + title: copilotChats.title, + model: copilotChats.model, + messages: copilotChats.messages, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + createdAt: copilotChats.createdAt, + updatedAt: copilotChats.updatedAt, + }) + .from(copilotChats) + .where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, authenticatedUserId))) + .limit(1) + + if (!chat) { + return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 }) + } + + const transformedChat = { + id: chat.id, + title: chat.title, + model: chat.model, + messages: Array.isArray(chat.messages) ? chat.messages : [], + messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0, + planArtifact: chat.planArtifact || null, + config: chat.config || null, + createdAt: chat.createdAt, + updatedAt: chat.updatedAt, + } + + logger.info(`Retrieved chat ${chatId}`) + return NextResponse.json({ success: true, chat: transformedChat }) + } + + if (!workflowId) { + return createBadRequestResponse('workflowId or chatId is required') + } + // Fetch chats for this user and workflow const chats = await db .select({ diff --git a/apps/sim/app/api/copilot/chat/stream/route.ts b/apps/sim/app/api/copilot/chat/stream/route.ts new file mode 100644 index 000000000..e04271171 --- /dev/null +++ b/apps/sim/app/api/copilot/chat/stream/route.ts @@ -0,0 +1,133 @@ +import { type NextRequest, NextResponse } from 'next/server' +import { createLogger } from '@sim/logger' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers' +import { + getStreamMeta, + readStreamEvents, + type StreamMeta, +} from '@/lib/copilot/orchestrator/stream-buffer' + +const logger = createLogger('CopilotChatStreamAPI') +const POLL_INTERVAL_MS = 250 +const MAX_STREAM_MS = 10 * 60 * 1000 + +function encodeEvent(event: Record): Uint8Array { + return new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`) +} + +export async function GET(request: NextRequest) { + const { userId: authenticatedUserId, isAuthenticated } = + await authenticateCopilotRequestSessionOnly() + + if (!isAuthenticated || !authenticatedUserId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const url = new URL(request.url) + const streamId = url.searchParams.get('streamId') || '' + const fromParam = url.searchParams.get('from') || '0' + const fromEventId = Number(fromParam || 0) + // If batch=true, return buffered events as JSON instead of SSE + const batchMode = url.searchParams.get('batch') === 'true' + const toParam = url.searchParams.get('to') + const toEventId = toParam ? Number(toParam) : undefined + + if (!streamId) { + return NextResponse.json({ error: 'streamId is required' }, { status: 400 }) + } + + const meta = (await getStreamMeta(streamId)) as StreamMeta | null + logger.info('[Resume] Stream lookup', { + streamId, + fromEventId, + toEventId, + batchMode, + hasMeta: !!meta, + metaStatus: meta?.status, + }) + if (!meta) { + return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) + } + if (meta.userId && meta.userId !== authenticatedUserId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 403 }) + } + + // Batch mode: return all buffered events as JSON + if (batchMode) { + const events = await readStreamEvents(streamId, fromEventId) + const filteredEvents = toEventId + ? events.filter((e) => e.eventId <= toEventId) + : events + logger.info('[Resume] Batch response', { + streamId, + fromEventId, + toEventId, + eventCount: filteredEvents.length, + }) + return NextResponse.json({ + success: true, + events: filteredEvents, + status: meta.status, + }) + } + + const startTime = Date.now() + + const stream = new ReadableStream({ + async start(controller) { + let lastEventId = Number.isFinite(fromEventId) ? fromEventId : 0 + + const flushEvents = async () => { + const events = await readStreamEvents(streamId, lastEventId) + if (events.length > 0) { + logger.info('[Resume] Flushing events', { + streamId, + fromEventId: lastEventId, + eventCount: events.length, + }) + } + for (const entry of events) { + lastEventId = entry.eventId + const payload = { + ...entry.event, + eventId: entry.eventId, + streamId: entry.streamId, + } + controller.enqueue(encodeEvent(payload)) + } + } + + try { + await flushEvents() + + while (Date.now() - startTime < MAX_STREAM_MS) { + const currentMeta = await getStreamMeta(streamId) + if (!currentMeta) break + + await flushEvents() + + if (currentMeta.status === 'complete' || currentMeta.status === 'error') { + break + } + + if (request.signal.aborted) { + break + } + + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)) + } + } catch (error) { + logger.warn('Stream replay failed', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + } finally { + controller.close() + } + }, + }) + + return new Response(stream, { headers: SSE_HEADERS }) +} + diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx index f0c9a59f0..e52898e15 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx @@ -114,6 +114,7 @@ export const Copilot = forwardRef(({ panelWidth }, ref clearPlanArtifact, savePlanArtifact, loadAutoAllowedTools, + resumeActiveStream, } = useCopilotStore() // Initialize copilot @@ -126,6 +127,7 @@ export const Copilot = forwardRef(({ panelWidth }, ref loadAutoAllowedTools, currentChat, isSendingMessage, + resumeActiveStream, }) // Handle scroll management (80px stickiness for copilot) @@ -421,8 +423,8 @@ export const Copilot = forwardRef(({ panelWidth }, ref - {/* Show loading state until fully initialized */} - {!isInitialized ? ( + {/* Show loading state until fully initialized, but skip if actively streaming (resume case) */} + {!isInitialized && !isSendingMessage ? (

Loading copilot

diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts index 48a3ead80..1ffe80216 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-copilot-initialization.ts @@ -14,6 +14,7 @@ interface UseCopilotInitializationProps { loadAutoAllowedTools: () => Promise currentChat: any isSendingMessage: boolean + resumeActiveStream: () => Promise } /** @@ -32,11 +33,13 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { loadAutoAllowedTools, currentChat, isSendingMessage, + resumeActiveStream, } = props const [isInitialized, setIsInitialized] = useState(false) const lastWorkflowIdRef = useRef(null) const hasMountedRef = useRef(false) + const hasResumedRef = useRef(false) /** Initialize on mount - loads chats if needed. Never loads during streaming */ useEffect(() => { @@ -105,6 +108,16 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) { isSendingMessage, ]) + /** Try to resume active stream on mount - runs early, before waiting for chats */ + useEffect(() => { + if (hasResumedRef.current || isSendingMessage) return + hasResumedRef.current = true + // Resume immediately on mount - don't wait for isInitialized + resumeActiveStream().catch((err) => { + logger.warn('[Copilot] Failed to resume active stream', err) + }) + }, [isSendingMessage, resumeActiveStream]) + /** Load auto-allowed tools once on mount - runs immediately, independent of workflow */ const hasLoadedAutoAllowedToolsRef = useRef(false) useEffect(() => { diff --git a/apps/sim/lib/copilot/api.ts b/apps/sim/lib/copilot/api.ts index c680f9751..089d6bac7 100644 --- a/apps/sim/lib/copilot/api.ts +++ b/apps/sim/lib/copilot/api.ts @@ -82,6 +82,7 @@ export interface SendMessageRequest { executionId?: string }> commands?: string[] + resumeFromEventId?: number } /** @@ -120,7 +121,7 @@ export async function sendStreamingMessage( request: SendMessageRequest ): Promise { try { - const { abortSignal, ...requestBody } = request + const { abortSignal, resumeFromEventId, ...requestBody } = request try { const preview = Array.isArray((requestBody as any).contexts) ? (requestBody as any).contexts.map((c: any) => ({ @@ -136,8 +137,51 @@ export async function sendStreamingMessage( ? (requestBody as any).contexts.length : 0, contextsPreview: preview, + resumeFromEventId, }) } catch {} + + const streamId = request.userMessageId + if (typeof resumeFromEventId === 'number') { + if (!streamId) { + return { + success: false, + error: 'streamId is required to resume a stream', + status: 400, + } + } + const url = `/api/copilot/chat/stream?streamId=${encodeURIComponent( + streamId + )}&from=${encodeURIComponent(String(resumeFromEventId))}` + const response = await fetch(url, { + method: 'GET', + signal: abortSignal, + credentials: 'include', + }) + + if (!response.ok) { + const errorMessage = await handleApiError(response, 'Failed to resume streaming message') + return { + success: false, + error: errorMessage, + status: response.status, + } + } + + if (!response.body) { + return { + success: false, + error: 'No response body received', + status: 500, + } + } + + return { + success: true, + stream: response.body, + } + } + const response = await fetch('/api/copilot/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts new file mode 100644 index 000000000..11f651870 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts @@ -0,0 +1,152 @@ +import { createLogger } from '@sim/logger' +import { getRedisClient } from '@/lib/core/config/redis' + +const logger = createLogger('CopilotStreamBuffer') + +const STREAM_TTL_SECONDS = 60 * 60 +const STREAM_EVENT_LIMIT = 5000 + +function getStreamKeyPrefix(streamId: string) { + return `copilot_stream:${streamId}` +} + +function getEventsKey(streamId: string) { + return `${getStreamKeyPrefix(streamId)}:events` +} + +function getSeqKey(streamId: string) { + return `${getStreamKeyPrefix(streamId)}:seq` +} + +function getMetaKey(streamId: string) { + return `${getStreamKeyPrefix(streamId)}:meta` +} + +export type StreamStatus = 'active' | 'complete' | 'error' + +export type StreamMeta = { + status: StreamStatus + userId?: string + updatedAt?: string + error?: string +} + +export type StreamEventEntry = { + eventId: number + streamId: string + event: Record +} + +export async function resetStreamBuffer(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return + try { + await redis.del(getEventsKey(streamId), getSeqKey(streamId), getMetaKey(streamId)) + } catch (error) { + logger.warn('Failed to reset stream buffer', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + } +} + +export async function setStreamMeta( + streamId: string, + meta: StreamMeta +): Promise { + const redis = getRedisClient() + if (!redis) return + try { + const payload: Record = { + status: meta.status, + updatedAt: meta.updatedAt || new Date().toISOString(), + } + if (meta.userId) payload.userId = meta.userId + if (meta.error) payload.error = meta.error + await redis.hset(getMetaKey(streamId), payload) + await redis.expire(getMetaKey(streamId), STREAM_TTL_SECONDS) + } catch (error) { + logger.warn('Failed to update stream meta', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + } +} + +export async function getStreamMeta(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return null + try { + const meta = await redis.hgetall(getMetaKey(streamId)) + if (!meta || Object.keys(meta).length === 0) return null + return meta as StreamMeta + } catch (error) { + logger.warn('Failed to read stream meta', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + return null + } +} + +export async function appendStreamEvent( + streamId: string, + event: Record +): Promise { + const redis = getRedisClient() + if (!redis) { + return { eventId: 0, streamId, event } + } + + try { + const nextId = await redis.incr(getSeqKey(streamId)) + const entry: StreamEventEntry = { eventId: nextId, streamId, event } + await redis.zadd(getEventsKey(streamId), nextId, JSON.stringify(entry)) + + const count = await redis.zcard(getEventsKey(streamId)) + if (count > STREAM_EVENT_LIMIT) { + const trimCount = count - STREAM_EVENT_LIMIT + if (trimCount > 0) { + await redis.zremrangebyrank(getEventsKey(streamId), 0, trimCount - 1) + } + } + + await redis.expire(getEventsKey(streamId), STREAM_TTL_SECONDS) + await redis.expire(getSeqKey(streamId), STREAM_TTL_SECONDS) + + return entry + } catch (error) { + logger.warn('Failed to append stream event', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + return { eventId: 0, streamId, event } + } +} + +export async function readStreamEvents( + streamId: string, + afterEventId: number +): Promise { + const redis = getRedisClient() + if (!redis) return [] + try { + const raw = await redis.zrangebyscore(getEventsKey(streamId), afterEventId + 1, '+inf') + return raw + .map((entry) => { + try { + return JSON.parse(entry) as StreamEventEntry + } catch { + return null + } + }) + .filter((entry): entry is StreamEventEntry => Boolean(entry)) + } catch (error) { + logger.warn('Failed to read stream events', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + return [] + } +} + diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index 55b366ecd..bfb6e2c8e 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -80,6 +80,7 @@ import { subscriptionKeys } from '@/hooks/queries/subscription' import type { ChatContext, CopilotMessage, + CopilotStreamInfo, CopilotStore, CopilotToolCall, MessageFileAttachment, @@ -92,6 +93,69 @@ import type { WorkflowState } from '@/stores/workflows/workflow/types' const logger = createLogger('CopilotStore') +const STREAM_STORAGE_KEY = 'copilot_active_stream' + +/** + * Flag set on beforeunload to suppress continue option during page refresh/close. + * Aborts during unload should NOT show the continue button. + */ +let isPageUnloading = false +if (typeof window !== 'undefined') { + window.addEventListener('beforeunload', () => { + isPageUnloading = true + }) +} + +function readActiveStreamFromStorage(): CopilotStreamInfo | null { + if (typeof window === 'undefined') return null + try { + const raw = window.sessionStorage.getItem(STREAM_STORAGE_KEY) + logger.info('[Copilot] Reading stream from storage', { + hasRaw: !!raw, + rawPreview: raw ? raw.substring(0, 100) : null, + }) + if (!raw) return null + const parsed = JSON.parse(raw) as CopilotStreamInfo + return parsed?.streamId ? parsed : null + } catch (e) { + logger.warn('[Copilot] Failed to read stream from storage', { error: String(e) }) + return null + } +} + +function writeActiveStreamToStorage(info: CopilotStreamInfo | null): void { + if (typeof window === 'undefined') return + try { + if (!info) { + logger.info('[Copilot] Clearing stream from storage', { + isPageUnloading, + stack: new Error().stack?.split('\n').slice(1, 4).join(' <- '), + }) + window.sessionStorage.removeItem(STREAM_STORAGE_KEY) + return + } + logger.info('[Copilot] Writing stream to storage', { + streamId: info.streamId, + lastEventId: info.lastEventId, + }) + window.sessionStorage.setItem(STREAM_STORAGE_KEY, JSON.stringify(info)) + } catch {} +} + +function updateActiveStreamEventId( + get: () => CopilotStore, + set: (next: Partial) => void, + streamId: string, + eventId: number +): void { + const current = get().activeStream + if (!current || current.streamId !== streamId) return + if (eventId <= (current.lastEventId || 0)) return + const next = { ...current, lastEventId: eventId } + set({ activeStream: next }) + writeActiveStreamToStorage(next) +} + // On module load, clear any lingering diff preview (fresh page refresh) try { const diffStore = useWorkflowDiffStore.getState() @@ -1033,6 +1097,28 @@ function appendContinueOptionBlock(blocks: any[]): any[] { ] } +function stripContinueOption(content: string): string { + if (!content || !content.includes(CONTINUE_OPTIONS_TAG)) return content + const next = content.replace(CONTINUE_OPTIONS_TAG, '') + return next.replace(/\n{2,}\s*$/g, '\n').trimEnd() +} + +function stripContinueOptionFromBlocks(blocks: any[]): any[] { + if (!Array.isArray(blocks)) return blocks + return blocks.flatMap((block) => { + if ( + block?.type === TEXT_BLOCK_TYPE && + typeof block.content === 'string' && + block.content.includes(CONTINUE_OPTIONS_TAG) + ) { + const nextContent = stripContinueOption(block.content) + if (!nextContent.trim()) return [] + return [{ ...block, content: nextContent }] + } + return [block] + }) +} + function beginThinkingBlock(context: StreamingContext) { if (!context.currentThinkingBlock) { context.currentThinkingBlock = contentBlockPool.get() @@ -1118,12 +1204,18 @@ function appendSubAgentText(context: StreamingContext, parentToolCallId: string, } const sseHandlers: Record = { - chat_id: async (data, context, get) => { + chat_id: async (data, context, get, set) => { context.newChatId = data.chatId - const { currentChat } = get() + const { currentChat, activeStream } = get() if (!currentChat && context.newChatId) { await get().handleNewChatCreation(context.newChatId) } + // Update activeStream with chatId for resume purposes + if (activeStream && context.newChatId && !activeStream.chatId) { + const updatedStream = { ...activeStream, chatId: context.newChatId } + set({ activeStream: updatedStream }) + writeActiveStreamToStorage(updatedStream) + } }, title_updated: (_data, _context, get, set) => { const title = _data.title @@ -2072,6 +2164,7 @@ const initialState = { toolCallsById: {} as Record, suppressAutoSelect: false, autoAllowedTools: [] as string[], + activeStream: null as CopilotStreamInfo | null, messageQueue: [] as import('./types').QueuedMessage[], suppressAbortContinueOption: false, sensitiveCredentialIds: new Set(), @@ -2492,6 +2585,22 @@ export const useCopilotStore = create()( currentUserMessageId: userMessage.id, })) + const activeStream: CopilotStreamInfo = { + streamId: userMessage.id, + workflowId, + chatId: currentChat?.id, + userMessageId: userMessage.id, + assistantMessageId: streamingMessage.id, + lastEventId: 0, + resumeAttempts: 0, + userMessageContent: message, + fileAttachments, + contexts, + startedAt: Date.now(), + } + set({ activeStream }) + writeActiveStreamToStorage(activeStream) + if (isFirstMessage) { const optimisticTitle = message.length > 50 ? `${message.substring(0, 47)}...` : message set((state) => ({ @@ -2616,6 +2725,8 @@ export const useCopilotStore = create()( isSendingMessage: false, abortController: null, })) + set({ activeStream: null }) + writeActiveStreamToStorage(null) } } catch (error) { if (error instanceof Error && error.name === 'AbortError') return @@ -2629,14 +2740,240 @@ export const useCopilotStore = create()( isSendingMessage: false, abortController: null, })) + set({ activeStream: null }) + writeActiveStreamToStorage(null) } }, + resumeActiveStream: async () => { + const stored = get().activeStream || readActiveStreamFromStorage() + logger.info('[Copilot] Resume check', { + hasStored: !!stored, + streamId: stored?.streamId, + lastEventId: stored?.lastEventId, + storedWorkflowId: stored?.workflowId, + storedChatId: stored?.chatId, + currentWorkflowId: get().workflowId, + isSendingMessage: get().isSendingMessage, + resumeAttempts: stored?.resumeAttempts, + }) + if (!stored || !stored.streamId) return false + if (get().isSendingMessage) return false + if (get().workflowId && stored.workflowId !== get().workflowId) return false + + if (stored.resumeAttempts >= 3) { + logger.warn('[Copilot] Too many resume attempts, giving up') + return false + } + + const nextStream: CopilotStreamInfo = { + ...stored, + resumeAttempts: (stored.resumeAttempts || 0) + 1, + } + set({ activeStream: nextStream }) + writeActiveStreamToStorage(nextStream) + + // Load existing chat messages from database if we have a chatId but no messages + let messages = get().messages + // Track if this is a fresh page load (no messages in memory) + const isFreshResume = messages.length === 0 + if (isFreshResume && nextStream.chatId) { + try { + logger.info('[Copilot] Loading chat for resume', { chatId: nextStream.chatId }) + const response = await fetch(`/api/copilot/chat?chatId=${nextStream.chatId}`) + if (response.ok) { + const data = await response.json() + if (data.success && data.chat) { + const normalizedMessages = normalizeMessagesForUI(data.chat.messages || []) + const toolCallsById = buildToolCallsById(normalizedMessages) + set({ + currentChat: data.chat, + messages: normalizedMessages, + toolCallsById, + streamingPlanContent: data.chat.planArtifact || '', + }) + messages = normalizedMessages + logger.info('[Copilot] Loaded chat for resume', { + chatId: nextStream.chatId, + messageCount: normalizedMessages.length, + }) + } + } + } catch (e) { + logger.warn('[Copilot] Failed to load chat for resume', { error: String(e) }) + } + } + + // ALWAYS fetch buffered events when resuming (to ensure we have complete content) + let bufferedContent = '' + if (nextStream.lastEventId > 0) { + try { + logger.info('[Copilot] Fetching buffered events', { + streamId: nextStream.streamId, + lastEventId: nextStream.lastEventId, + isFreshResume, + }) + const batchUrl = `/api/copilot/chat/stream?streamId=${encodeURIComponent( + nextStream.streamId + )}&from=0&to=${nextStream.lastEventId}&batch=true` + const batchResponse = await fetch(batchUrl, { credentials: 'include' }) + if (batchResponse.ok) { + const batchData = await batchResponse.json() + if (batchData.success && Array.isArray(batchData.events)) { + // Extract text content from buffered events + for (const entry of batchData.events) { + const event = entry.event + if (event?.type === 'content' && typeof event.data === 'string') { + bufferedContent += event.data + } + } + logger.info('[Copilot] Loaded buffered content', { + eventCount: batchData.events.length, + contentLength: bufferedContent.length, + contentPreview: bufferedContent.slice(0, 100), + }) + } else { + logger.warn('[Copilot] Batch response missing events', { + success: batchData.success, + hasEvents: Array.isArray(batchData.events), + }) + } + } else { + logger.warn('[Copilot] Failed to fetch buffered events', { + status: batchResponse.status, + }) + } + } catch (e) { + logger.warn('[Copilot] Failed to fetch buffered events', { error: String(e) }) + } + } + + let nextMessages = messages + let cleanedExisting = false + nextMessages = nextMessages.map((m) => { + if (m.id !== nextStream.assistantMessageId) return m + const hasContinueTag = + (typeof m.content === 'string' && m.content.includes(CONTINUE_OPTIONS_TAG)) || + (Array.isArray(m.contentBlocks) && + m.contentBlocks.some( + (b: any) => + b?.type === TEXT_BLOCK_TYPE && + typeof b.content === 'string' && + b.content.includes(CONTINUE_OPTIONS_TAG) + )) + if (!hasContinueTag) return m + cleanedExisting = true + return { + ...m, + content: stripContinueOption(m.content || ''), + contentBlocks: stripContinueOptionFromBlocks(m.contentBlocks || []), + } + }) + + if (!messages.some((m) => m.id === nextStream.userMessageId)) { + const userMessage = createUserMessage( + nextStream.userMessageContent || '', + nextStream.fileAttachments, + nextStream.contexts, + nextStream.userMessageId + ) + nextMessages = [...nextMessages, userMessage] + } + + if (!nextMessages.some((m) => m.id === nextStream.assistantMessageId)) { + // Create assistant message with buffered content pre-loaded + const assistantMessage: CopilotMessage = { + ...createStreamingMessage(), + id: nextStream.assistantMessageId, + content: bufferedContent, + contentBlocks: bufferedContent + ? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }] + : [], + } + nextMessages = [...nextMessages, assistantMessage] + } else if (bufferedContent) { + // Update existing assistant message with buffered content + nextMessages = nextMessages.map((m) => + m.id === nextStream.assistantMessageId + ? { + ...m, + content: bufferedContent, + contentBlocks: [ + { type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }, + ], + } + : m + ) + } + + if (cleanedExisting || nextMessages !== messages || bufferedContent) { + set({ messages: nextMessages, currentUserMessageId: nextStream.userMessageId }) + } else { + set({ currentUserMessageId: nextStream.userMessageId }) + } + + const abortController = new AbortController() + set({ isSendingMessage: true, abortController }) + + try { + logger.info('[Copilot] Attempting to resume stream', { + streamId: nextStream.streamId, + lastEventId: nextStream.lastEventId, + isFreshResume, + bufferedContentLength: bufferedContent.length, + assistantMessageId: nextStream.assistantMessageId, + chatId: nextStream.chatId, + }) + const result = await sendStreamingMessage({ + message: nextStream.userMessageContent || '', + userMessageId: nextStream.userMessageId, + workflowId: nextStream.workflowId, + chatId: nextStream.chatId || get().currentChat?.id || undefined, + mode: get().mode === 'ask' ? 'ask' : get().mode === 'plan' ? 'plan' : 'agent', + model: get().selectedModel, + prefetch: get().agentPrefetch, + stream: true, + resumeFromEventId: nextStream.lastEventId, + abortSignal: abortController.signal, + }) + + logger.info('[Copilot] Resume stream result', { + success: result.success, + hasStream: !!result.stream, + error: result.error, + }) + + if (result.success && result.stream) { + await get().handleStreamingResponse( + result.stream, + nextStream.assistantMessageId, + true, + nextStream.userMessageId + ) + return true + } + set({ isSendingMessage: false, abortController: null }) + } catch (error) { + // Handle AbortError gracefully - expected when user aborts + if (error instanceof Error && (error.name === 'AbortError' || error.message.includes('aborted'))) { + logger.info('[Copilot] Resume stream aborted by user') + set({ isSendingMessage: false, abortController: null }) + return false + } + logger.error('[Copilot] Failed to resume stream', { + error: error instanceof Error ? error.message : String(error), + }) + set({ isSendingMessage: false, abortController: null }) + } + return false + }, + // Abort streaming abortMessage: (options?: { suppressContinueOption?: boolean }) => { const { abortController, isSendingMessage, messages } = get() if (!isSendingMessage || !abortController) return - const suppressContinueOption = options?.suppressContinueOption === true + // Suppress continue option if explicitly requested OR if page is unloading (refresh/close) + const suppressContinueOption = options?.suppressContinueOption === true || isPageUnloading set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption }) try { abortController.abort() @@ -2678,6 +3015,13 @@ export const useCopilotStore = create()( }) } + // Only clear active stream for user-initiated aborts, NOT page unload + // During page unload, keep the stream info so we can resume after refresh + if (!isPageUnloading) { + set({ activeStream: null }) + writeActiveStreamToStorage(null) + } + // Immediately put all in-progress tools into aborted state abortAllInProgressTools(set, get) @@ -2704,6 +3048,11 @@ export const useCopilotStore = create()( } } catch { set({ isSendingMessage: false, isAborting: false }) + // Only clear active stream for user-initiated aborts, NOT page unload + if (!isPageUnloading) { + set({ activeStream: null }) + writeActiveStreamToStorage(null) + } } }, @@ -3051,15 +3400,42 @@ export const useCopilotStore = create()( subAgentToolCalls: {}, subAgentBlocks: {}, } + if (isContinuation) { + context.suppressContinueOption = true + } if (isContinuation) { const { messages } = get() const existingMessage = messages.find((m) => m.id === assistantMessageId) + logger.info('[Copilot] Continuation init', { + hasMessage: !!existingMessage, + contentLength: existingMessage?.content?.length || 0, + contentPreview: existingMessage?.content?.slice(0, 100) || '', + contentBlocksCount: existingMessage?.contentBlocks?.length || 0, + }) if (existingMessage) { - if (existingMessage.content) context.accumulatedContent.append(existingMessage.content) - context.contentBlocks = existingMessage.contentBlocks - ? [...existingMessage.contentBlocks] - : [] + // Initialize with existing text content (should be buffered content we set earlier) + const existingContent = existingMessage.content || '' + if (existingContent) { + context.accumulatedContent.append(existingContent) + } + // Create fresh text block with existing content (don't reuse to avoid mutation issues) + if (existingContent) { + const textBlock = contentBlockPool.get() + textBlock.type = TEXT_BLOCK_TYPE + textBlock.content = existingContent + textBlock.timestamp = Date.now() + context.contentBlocks = [textBlock] + context.currentTextBlock = textBlock + } + // Copy over any non-text blocks (tool calls, thinking, etc) from existing message + if (existingMessage.contentBlocks) { + for (const block of existingMessage.contentBlocks) { + if (block.type !== TEXT_BLOCK_TYPE) { + context.contentBlocks.push({ ...block }) + } + } + } } } @@ -3074,7 +3450,8 @@ export const useCopilotStore = create()( if (abortController?.signal.aborted) { context.wasAborted = true const { suppressAbortContinueOption } = get() - context.suppressContinueOption = suppressAbortContinueOption === true + // Suppress continue option if explicitly requested OR if page is unloading (refresh/close) + context.suppressContinueOption = suppressAbortContinueOption === true || isPageUnloading if (suppressAbortContinueOption) { set({ suppressAbortContinueOption: false }) } @@ -3085,6 +3462,12 @@ export const useCopilotStore = create()( break } + const eventId = typeof data?.eventId === 'number' ? data.eventId : undefined + const streamId = typeof data?.streamId === 'string' ? data.streamId : undefined + if (eventId && streamId) { + updateActiveStreamEventId(get, set, streamId, eventId) + } + // Log SSE events for debugging logger.info('[SSE] Received event', { type: data.type, @@ -3202,10 +3585,20 @@ export const useCopilotStore = create()( : block ) } + if (isContinuation) { + sanitizedContentBlocks = stripContinueOptionFromBlocks(sanitizedContentBlocks) + } if (context.wasAborted && !context.suppressContinueOption) { sanitizedContentBlocks = appendContinueOptionBlock(sanitizedContentBlocks) } + if (!context.streamComplete && !context.wasAborted) { + const resumed = await get().resumeActiveStream() + if (resumed) { + return + } + } + if (context.contentBlocks) { context.contentBlocks.forEach((block) => { if (block.type === TEXT_BLOCK_TYPE || block.type === THINKING_BLOCK_TYPE) { @@ -3215,10 +3608,13 @@ export const useCopilotStore = create()( } const finalContent = stripTodoTags(context.accumulatedContent.toString()) + const finalContentStripped = isContinuation + ? stripContinueOption(finalContent) + : finalContent const finalContentWithOptions = context.wasAborted && !context.suppressContinueOption ? appendContinueOption(finalContent) - : finalContent + : finalContentStripped set((state) => { const snapshotId = state.currentUserMessageId const nextSnapshots = @@ -3229,7 +3625,7 @@ export const useCopilotStore = create()( return updated })() : state.messageSnapshots - return { + const nextState: Partial = { messages: state.messages.map((msg) => msg.id === assistantMessageId ? { @@ -3245,8 +3641,15 @@ export const useCopilotStore = create()( currentUserMessageId: null, messageSnapshots: nextSnapshots, } + return nextState }) + // Only clear active stream if stream completed normally or user aborted (not page unload) + if ((context.streamComplete || context.wasAborted) && !isPageUnloading) { + set({ activeStream: null }) + writeActiveStreamToStorage(null) + } + if (context.newChatId && !get().currentChat) { await get().handleNewChatCreation(context.newChatId) } diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index 6736f5a60..706ff2948 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -33,6 +33,20 @@ export interface CopilotToolCall { subAgentStreaming?: boolean } +export interface CopilotStreamInfo { + streamId: string + workflowId: string + chatId?: string + userMessageId: string + assistantMessageId: string + lastEventId: number + resumeAttempts: number + userMessageContent: string + fileAttachments?: MessageFileAttachment[] + contexts?: ChatContext[] + startedAt: number +} + export interface MessageFileAttachment { id: string key: string @@ -154,6 +168,9 @@ export interface CopilotState { // Auto-allowed integration tools (tools that can run without confirmation) autoAllowedTools: string[] + // Active stream metadata for reconnect/replay + activeStream: CopilotStreamInfo | null + // Message queue for messages sent while another is in progress messageQueue: QueuedMessage[] @@ -194,6 +211,7 @@ export interface CopilotActions { toolCallState: 'accepted' | 'rejected' | 'error', toolCallId?: string ) => void + resumeActiveStream: () => Promise setToolCallState: (toolCall: any, newState: ClientToolCallState, options?: any) => void updateToolCallParams: (toolCallId: string, params: Record) => void sendDocsMessage: (query: string, options?: { stream?: boolean; topK?: number }) => Promise