diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index 9e8d3b137..39a933dd1 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -134,12 +134,18 @@ function writeActiveStreamToStorage(info: CopilotStreamInfo | null): void { window.sessionStorage.removeItem(STREAM_STORAGE_KEY) return } + const payload = JSON.stringify(info) + window.sessionStorage.setItem(STREAM_STORAGE_KEY, payload) + const verified = window.sessionStorage.getItem(STREAM_STORAGE_KEY) === payload logger.info('[Copilot] Writing stream to storage', { streamId: info.streamId, lastEventId: info.lastEventId, + userMessageContent: info.userMessageContent?.slice(0, 30), + verified, }) - window.sessionStorage.setItem(STREAM_STORAGE_KEY, JSON.stringify(info)) - } catch {} + } catch (e) { + logger.error('[Copilot] Failed to write stream to storage', { error: String(e) }) + } } function updateActiveStreamEventId( @@ -1046,6 +1052,7 @@ interface StreamingContext { subAgentToolCalls: Record /** Track subagent streaming blocks per parent tool call */ subAgentBlocks: Record + suppressStreamingUpdates?: boolean } type SSEHandler = ( @@ -2018,6 +2025,92 @@ const subAgentSSEHandlers: Record = { }, } +async function applySseEvent( + data: any, + context: StreamingContext, + get: () => CopilotStore, + set: (next: Partial | ((state: CopilotStore) => Partial)) => void +): Promise { + if (data.type === 'subagent_start') { + const toolCallId = data.data?.tool_call_id + if (toolCallId) { + context.subAgentParentToolCallId = toolCallId + const { toolCallsById } = get() + const parentToolCall = toolCallsById[toolCallId] + if (parentToolCall) { + const updatedToolCall: CopilotToolCall = { + ...parentToolCall, + subAgentStreaming: true, + } + const updatedMap = { ...toolCallsById, [toolCallId]: updatedToolCall } + set({ toolCallsById: updatedMap }) + } + logger.info('[SSE] Subagent session started', { + subagent: data.subagent, + parentToolCallId: toolCallId, + }) + } + return true + } + + if (data.type === 'subagent_end') { + const parentToolCallId = context.subAgentParentToolCallId + if (parentToolCallId) { + const { toolCallsById } = get() + const parentToolCall = toolCallsById[parentToolCallId] + if (parentToolCall) { + const updatedToolCall: CopilotToolCall = { + ...parentToolCall, + subAgentContent: context.subAgentContent[parentToolCallId] || '', + subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] || [], + subAgentBlocks: context.subAgentBlocks[parentToolCallId] || [], + subAgentStreaming: false, + } + const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall } + set({ toolCallsById: updatedMap }) + logger.info('[SSE] Subagent session ended', { + subagent: data.subagent, + parentToolCallId, + contentLength: context.subAgentContent[parentToolCallId]?.length || 0, + toolCallCount: context.subAgentToolCalls[parentToolCallId]?.length || 0, + }) + } + } + context.subAgentParentToolCallId = undefined + return true + } + + if (data.subagent) { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId) { + logger.warn('[SSE] Subagent event without parent tool call ID', { + type: data.type, + subagent: data.subagent, + }) + return true + } + + logger.info('[SSE] Processing subagent event', { + type: data.type, + subagent: data.subagent, + parentToolCallId, + hasHandler: !!subAgentSSEHandlers[data.type], + }) + + const subAgentHandler = subAgentSSEHandlers[data.type] + if (subAgentHandler) { + await subAgentHandler(data, context, get, set) + } else { + logger.warn('[SSE] No handler for subagent event type', { type: data.type }) + } + return !context.streamComplete + } + + const handler = sseHandlers[data.type] || sseHandlers.default + await handler(data, context, get, set) + return !context.streamComplete +} + // Debounced UI update queue for smoother streaming const streamingUpdateQueue = new Map() let streamingUpdateRAF: number | null = null @@ -2034,7 +2127,7 @@ function stopStreamingUpdates() { streamingUpdateQueue.clear() } -/** Flush pending streaming updates immediately (apply them to state before clearing) */ +/** Flush pending streaming updates immediately (apply them to state before clearing). */ function flushStreamingUpdates(set: any) { if (streamingUpdateRAF !== null) { cancelAnimationFrame(streamingUpdateRAF) @@ -2066,6 +2159,76 @@ function flushStreamingUpdates(set: any) { }) } +function cloneContentBlocks(blocks: any[]): any[] { + if (!Array.isArray(blocks)) return [] + return blocks.map((block) => (block ? { ...block } : block)) +} + +function extractTextFromBlocks(blocks: any[]): string { + if (!Array.isArray(blocks)) return '' + return blocks + .filter((block) => block?.type === TEXT_BLOCK_TYPE && typeof block.content === 'string') + .map((block) => block.content) + .join('') +} + +function appendTextToBlocks(blocks: any[], text: string): any[] { + const nextBlocks = cloneContentBlocks(blocks) + if (!text) return nextBlocks + const lastIndex = nextBlocks.length - 1 + const lastBlock = nextBlocks[lastIndex] + if (lastBlock?.type === TEXT_BLOCK_TYPE) { + const current = typeof lastBlock.content === 'string' ? lastBlock.content : '' + nextBlocks[lastIndex] = { ...lastBlock, content: current + text } + return nextBlocks + } + nextBlocks.push({ type: TEXT_BLOCK_TYPE, content: text, timestamp: Date.now() }) + return nextBlocks +} + +function findLastTextBlock(blocks: any[]): any | null { + if (!Array.isArray(blocks) || blocks.length === 0) return null + const lastBlock = blocks[blocks.length - 1] + return lastBlock?.type === TEXT_BLOCK_TYPE ? lastBlock : null +} + +function replaceTextBlocks(blocks: any[], text: string): any[] { + const next: any[] = [] + let inserted = false + for (const block of blocks || []) { + if (block?.type === TEXT_BLOCK_TYPE) { + if (!inserted && text) { + next.push({ type: TEXT_BLOCK_TYPE, content: text, timestamp: Date.now() }) + inserted = true + } + continue + } + next.push(block ? { ...block } : block) + } + if (!inserted && text) { + next.push({ type: TEXT_BLOCK_TYPE, content: text, timestamp: Date.now() }) + } + return next +} + +function createStreamingContext(messageId: string): StreamingContext { + return { + messageId, + accumulatedContent: new StringBuilder(), + contentBlocks: [], + currentTextBlock: null, + isInThinkingBlock: false, + currentThinkingBlock: null, + isInDesignWorkflowBlock: false, + designWorkflowContent: '', + pendingContent: '', + doneEventCount: 0, + subAgentContent: {}, + subAgentToolCalls: {}, + subAgentBlocks: {}, + } +} + function createOptimizedContentBlocks(contentBlocks: any[]): any[] { const result: any[] = new Array(contentBlocks.length) for (let i = 0; i < contentBlocks.length; i++) { @@ -2074,8 +2237,9 @@ function createOptimizedContentBlocks(contentBlocks: any[]): any[] { } return result } - +`` function updateStreamingMessage(set: any, context: StreamingContext) { + if (context.suppressStreamingUpdates) return const now = performance.now() streamingUpdateQueue.set(context.messageId, context) const timeSinceLastBatch = now - lastBatchTime @@ -2617,6 +2781,8 @@ export const useCopilotStore = create()( currentUserMessageId: userMessage.id, })) + // Create new stream info and write to storage BEFORE starting the stream + // This ensures that if the user refreshes, they get the correct stream const activeStream: CopilotStreamInfo = { streamId: userMessage.id, workflowId, @@ -2630,6 +2796,12 @@ export const useCopilotStore = create()( contexts, startedAt: Date.now(), } + logger.info('[Copilot] Creating new active stream', { + streamId: activeStream.streamId, + workflowId: activeStream.workflowId, + chatId: activeStream.chatId, + userMessageContent: message.slice(0, 50), + }) set({ activeStream }) writeActiveStreamToStorage(activeStream) @@ -2712,7 +2884,8 @@ export const useCopilotStore = create()( result.stream, streamingMessage.id, false, - userMessage.id + userMessage.id, + nextAbortController.signal ) set({ chatsLastLoadedAt: null, chatsLoadedForWorkflow: null }) } else { @@ -2778,13 +2951,18 @@ export const useCopilotStore = create()( }, resumeActiveStream: async () => { - const stored = get().activeStream || readActiveStreamFromStorage() + const inMemoryStream = get().activeStream + const storedStream = readActiveStreamFromStorage() + const stored = inMemoryStream || storedStream logger.info('[Copilot] Resume check', { - hasStored: !!stored, + hasInMemory: !!inMemoryStream, + hasStored: !!storedStream, + usingStream: inMemoryStream ? 'memory' : storedStream ? 'storage' : 'none', streamId: stored?.streamId, lastEventId: stored?.lastEventId, storedWorkflowId: stored?.workflowId, storedChatId: stored?.chatId, + userMessageContent: stored?.userMessageContent?.slice(0, 50), currentWorkflowId: get().workflowId, isSendingMessage: get().isSendingMessage, resumeAttempts: stored?.resumeAttempts, @@ -2798,7 +2976,7 @@ export const useCopilotStore = create()( return false } - const nextStream: CopilotStreamInfo = { + let nextStream: CopilotStreamInfo = { ...stored, resumeAttempts: (stored.resumeAttempts || 0) + 1, } @@ -2836,8 +3014,8 @@ export const useCopilotStore = create()( } } - // Fetch ALL buffered events and display instantly, then continue streaming from highest event let bufferedContent = '' + let replayBlocks: any[] | null = null let resumeFromEventId = nextStream.lastEventId if (nextStream.lastEventId > 0) { try { @@ -2845,7 +3023,6 @@ export const useCopilotStore = create()( streamId: nextStream.streamId, savedLastEventId: nextStream.lastEventId, }) - // Fetch ALL events (no 'to' limit) so we get everything buffered while disconnected const batchUrl = `/api/copilot/chat/stream?streamId=${encodeURIComponent( nextStream.streamId )}&from=0&batch=true` @@ -2853,17 +3030,19 @@ export const useCopilotStore = create()( if (batchResponse.ok) { const batchData = await batchResponse.json() if (batchData.success && Array.isArray(batchData.events)) { - // Extract text content and track highest event ID + const replayContext = createStreamingContext(nextStream.assistantMessageId) + replayContext.suppressStreamingUpdates = true for (const entry of batchData.events) { const event = entry.event - if (event?.type === 'content' && typeof event.data === 'string') { - bufferedContent += event.data + if (event) { + await applySseEvent(event, replayContext, get, set) } - // Track highest event ID so we resume from there (not the old lastEventId) if (typeof entry.eventId === 'number' && entry.eventId > resumeFromEventId) { resumeFromEventId = entry.eventId } } + bufferedContent = replayContext.accumulatedContent.toString() + replayBlocks = replayContext.contentBlocks logger.info('[Copilot] Loaded buffered content instantly', { eventCount: batchData.events.length, contentLength: bufferedContent.length, @@ -2884,6 +3063,11 @@ export const useCopilotStore = create()( logger.warn('[Copilot] Failed to fetch buffered events', { error: String(e) }) } } + if (resumeFromEventId > nextStream.lastEventId) { + nextStream = { ...nextStream, lastEventId: resumeFromEventId } + set({ activeStream: nextStream }) + writeActiveStreamToStorage(nextStream) + } let nextMessages = messages let cleanedExisting = false @@ -2918,29 +3102,44 @@ export const useCopilotStore = create()( } if (!nextMessages.some((m) => m.id === nextStream.assistantMessageId)) { - // Create assistant message with buffered content pre-loaded const assistantMessage: CopilotMessage = { ...createStreamingMessage(), id: nextStream.assistantMessageId, content: bufferedContent, - contentBlocks: bufferedContent - ? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }] - : [], + contentBlocks: + replayBlocks && replayBlocks.length > 0 + ? replayBlocks + : bufferedContent + ? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }] + : [], } nextMessages = [...nextMessages, assistantMessage] - } else if (bufferedContent) { - // Update existing assistant message with buffered content - nextMessages = nextMessages.map((m) => - m.id === nextStream.assistantMessageId - ? { - ...m, - content: bufferedContent, - contentBlocks: [ - { type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }, - ], - } - : m - ) + } else if (bufferedContent || (replayBlocks && replayBlocks.length > 0)) { + nextMessages = nextMessages.map((m) => { + if (m.id !== nextStream.assistantMessageId) return m + let nextBlocks = replayBlocks && replayBlocks.length > 0 ? replayBlocks : null + if (!nextBlocks) { + const existingBlocks = Array.isArray(m.contentBlocks) ? m.contentBlocks : [] + const existingText = extractTextFromBlocks(existingBlocks) + if (existingText && bufferedContent.startsWith(existingText)) { + const delta = bufferedContent.slice(existingText.length) + nextBlocks = delta + ? appendTextToBlocks(existingBlocks, delta) + : cloneContentBlocks(existingBlocks) + } else if (!existingText && existingBlocks.length === 0) { + nextBlocks = bufferedContent + ? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }] + : [] + } else { + nextBlocks = replaceTextBlocks(existingBlocks, bufferedContent) + } + } + return { + ...m, + content: bufferedContent, + contentBlocks: nextBlocks || [], + } + }) } if (cleanedExisting || nextMessages !== messages || bufferedContent) { @@ -2986,7 +3185,8 @@ export const useCopilotStore = create()( result.stream, nextStream.assistantMessageId, true, - nextStream.userMessageId + nextStream.userMessageId, + abortController.signal ) return true } @@ -3015,9 +3215,7 @@ export const useCopilotStore = create()( set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption }) try { abortController.abort() - // Flush pending streaming updates to preserve content before stopping flushStreamingUpdates(set) - // Re-read messages after flush to get the latest content const { messages: updatedMessages } = get() const lastMessage = updatedMessages[updatedMessages.length - 1] if (lastMessage && lastMessage.role === 'assistant') { @@ -3121,7 +3319,13 @@ export const useCopilotStore = create()( abortSignal: abortController.signal, }) if (result.success && result.stream) { - await get().handleStreamingResponse(result.stream, newAssistantMessage.id, false) + await get().handleStreamingResponse( + result.stream, + newAssistantMessage.id, + false, + undefined, + abortController.signal + ) } else { if (result.error === 'Request was aborted') return const errorMessage = createErrorMessage( @@ -3420,27 +3624,15 @@ export const useCopilotStore = create()( stream: ReadableStream, assistantMessageId: string, isContinuation = false, - triggerUserMessageId?: string + triggerUserMessageId?: string, + abortSignal?: AbortSignal ) => { const reader = stream.getReader() const decoder = new TextDecoder() const startTimeMs = Date.now() + const expectedStreamId = triggerUserMessageId - const context: StreamingContext = { - messageId: assistantMessageId, - accumulatedContent: new StringBuilder(), - contentBlocks: [], - currentTextBlock: null, - isInThinkingBlock: false, - currentThinkingBlock: null, - isInDesignWorkflowBlock: false, - designWorkflowContent: '', - pendingContent: '', - doneEventCount: 0, - subAgentContent: {}, - subAgentToolCalls: {}, - subAgentBlocks: {}, - } + const context = createStreamingContext(assistantMessageId) if (isContinuation) { context.suppressContinueOption = true } @@ -3455,27 +3647,25 @@ export const useCopilotStore = create()( contentBlocksCount: existingMessage?.contentBlocks?.length || 0, }) if (existingMessage) { - // Initialize with existing text content (should be buffered content we set earlier) - const existingContent = existingMessage.content || '' - if (existingContent) { - context.accumulatedContent.append(existingContent) - } - // Create fresh text block with existing content (don't reuse to avoid mutation issues) - if (existingContent) { + const existingBlocks = Array.isArray(existingMessage.contentBlocks) + ? existingMessage.contentBlocks + : [] + if (existingBlocks.length > 0) { + const existingText = extractTextFromBlocks(existingBlocks) + if (existingText) { + context.accumulatedContent.append(existingText) + } + const clonedBlocks = cloneContentBlocks(existingBlocks) + context.contentBlocks = clonedBlocks + context.currentTextBlock = findLastTextBlock(clonedBlocks) + } else if (existingMessage.content) { const textBlock = contentBlockPool.get() textBlock.type = TEXT_BLOCK_TYPE - textBlock.content = existingContent + textBlock.content = existingMessage.content textBlock.timestamp = Date.now() context.contentBlocks = [textBlock] context.currentTextBlock = textBlock - } - // Copy over any non-text blocks (tool calls, thinking, etc) from existing message - if (existingMessage.contentBlocks) { - for (const block of existingMessage.contentBlocks) { - if (block.type !== TEXT_BLOCK_TYPE) { - context.contentBlocks.push({ ...block }) - } - } + context.accumulatedContent.append(existingMessage.content) } } } @@ -3487,24 +3677,30 @@ export const useCopilotStore = create()( try { for await (const data of parseSSEStream(reader, decoder)) { - const { abortController } = get() - if (abortController?.signal.aborted) { + if (abortSignal?.aborted) { context.wasAborted = true const { suppressAbortContinueOption } = get() - // Suppress continue option if explicitly requested OR if page is unloading (refresh/close) context.suppressContinueOption = suppressAbortContinueOption === true || isPageUnloading if (suppressAbortContinueOption) { set({ suppressAbortContinueOption: false }) } context.pendingContent = '' finalizeThinkingBlock(context) - stopStreamingUpdates() + flushStreamingUpdates(set) reader.cancel() break } const eventId = typeof data?.eventId === 'number' ? data.eventId : undefined const streamId = typeof data?.streamId === 'string' ? data.streamId : undefined + if (expectedStreamId && streamId && streamId !== expectedStreamId) { + logger.warn('[SSE] Ignoring event for mismatched stream', { + expectedStreamId, + streamId, + type: data.type, + }) + continue + } if (eventId && streamId) { updateActiveStreamEventId(get, set, streamId, eventId) } @@ -3520,91 +3716,8 @@ export const useCopilotStore = create()( : JSON.stringify(data.data)?.substring(0, 100), }) - // Handle subagent_start to track parent tool call - if (data.type === 'subagent_start') { - const toolCallId = data.data?.tool_call_id - if (toolCallId) { - context.subAgentParentToolCallId = toolCallId - // Mark the parent tool call as streaming - const { toolCallsById } = get() - const parentToolCall = toolCallsById[toolCallId] - if (parentToolCall) { - const updatedToolCall: CopilotToolCall = { - ...parentToolCall, - subAgentStreaming: true, - } - const updatedMap = { ...toolCallsById, [toolCallId]: updatedToolCall } - set({ toolCallsById: updatedMap }) - } - logger.info('[SSE] Subagent session started', { - subagent: data.subagent, - parentToolCallId: toolCallId, - }) - } - continue - } - - // Handle subagent_end to finalize subagent content - if (data.type === 'subagent_end') { - const parentToolCallId = context.subAgentParentToolCallId - if (parentToolCallId) { - // Mark subagent streaming as complete - const { toolCallsById } = get() - const parentToolCall = toolCallsById[parentToolCallId] - if (parentToolCall) { - const updatedToolCall: CopilotToolCall = { - ...parentToolCall, - subAgentContent: context.subAgentContent[parentToolCallId] || '', - subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] || [], - subAgentBlocks: context.subAgentBlocks[parentToolCallId] || [], - subAgentStreaming: false, // Done streaming - } - const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall } - set({ toolCallsById: updatedMap }) - logger.info('[SSE] Subagent session ended', { - subagent: data.subagent, - parentToolCallId, - contentLength: context.subAgentContent[parentToolCallId]?.length || 0, - toolCallCount: context.subAgentToolCalls[parentToolCallId]?.length || 0, - }) - } - } - context.subAgentParentToolCallId = undefined - continue - } - - // Check if this is a subagent event (has subagent field) - if (data.subagent) { - const parentToolCallId = context.subAgentParentToolCallId - if (!parentToolCallId) { - logger.warn('[SSE] Subagent event without parent tool call ID', { - type: data.type, - subagent: data.subagent, - }) - continue - } - - logger.info('[SSE] Processing subagent event', { - type: data.type, - subagent: data.subagent, - parentToolCallId, - hasHandler: !!subAgentSSEHandlers[data.type], - }) - - const subAgentHandler = subAgentSSEHandlers[data.type] - if (subAgentHandler) { - await subAgentHandler(data, context, get, set) - } else { - logger.warn('[SSE] No handler for subagent event type', { type: data.type }) - } - // Skip regular handlers for subagent events - if (context.streamComplete) break - continue - } - - const handler = sseHandlers[data.type] || sseHandlers.default - await handler(data, context, get, set) - if (context.streamComplete) break + const shouldContinue = await applySseEvent(data, context, get, set) + if (!shouldContinue) break } if (!context.wasAborted && sseHandlers.stream_end) { diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index 706ff2948..9357ebd6b 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -246,7 +246,8 @@ export interface CopilotActions { stream: ReadableStream, messageId: string, isContinuation?: boolean, - triggerUserMessageId?: string + triggerUserMessageId?: string, + abortSignal?: AbortSignal ) => Promise handleNewChatCreation: (newChatId: string) => Promise loadAutoAllowedTools: () => Promise