Things are broken

This commit is contained in:
Siddharth Ganesan
2026-02-03 18:33:47 -08:00
parent 2216e7a855
commit 71701f1049
2 changed files with 271 additions and 157 deletions

View File

@@ -134,12 +134,18 @@ function writeActiveStreamToStorage(info: CopilotStreamInfo | null): void {
window.sessionStorage.removeItem(STREAM_STORAGE_KEY)
return
}
const payload = JSON.stringify(info)
window.sessionStorage.setItem(STREAM_STORAGE_KEY, payload)
const verified = window.sessionStorage.getItem(STREAM_STORAGE_KEY) === payload
logger.info('[Copilot] Writing stream to storage', {
streamId: info.streamId,
lastEventId: info.lastEventId,
userMessageContent: info.userMessageContent?.slice(0, 30),
verified,
})
window.sessionStorage.setItem(STREAM_STORAGE_KEY, JSON.stringify(info))
} catch {}
} catch (e) {
logger.error('[Copilot] Failed to write stream to storage', { error: String(e) })
}
}
function updateActiveStreamEventId(
@@ -1046,6 +1052,7 @@ interface StreamingContext {
subAgentToolCalls: Record<string, CopilotToolCall[]>
/** Track subagent streaming blocks per parent tool call */
subAgentBlocks: Record<string, any[]>
suppressStreamingUpdates?: boolean
}
type SSEHandler = (
@@ -2018,6 +2025,92 @@ const subAgentSSEHandlers: Record<string, SSEHandler> = {
},
}
async function applySseEvent(
data: any,
context: StreamingContext,
get: () => CopilotStore,
set: (next: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)) => void
): Promise<boolean> {
if (data.type === 'subagent_start') {
const toolCallId = data.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
const { toolCallsById } = get()
const parentToolCall = toolCallsById[toolCallId]
if (parentToolCall) {
const updatedToolCall: CopilotToolCall = {
...parentToolCall,
subAgentStreaming: true,
}
const updatedMap = { ...toolCallsById, [toolCallId]: updatedToolCall }
set({ toolCallsById: updatedMap })
}
logger.info('[SSE] Subagent session started', {
subagent: data.subagent,
parentToolCallId: toolCallId,
})
}
return true
}
if (data.type === 'subagent_end') {
const parentToolCallId = context.subAgentParentToolCallId
if (parentToolCallId) {
const { toolCallsById } = get()
const parentToolCall = toolCallsById[parentToolCallId]
if (parentToolCall) {
const updatedToolCall: CopilotToolCall = {
...parentToolCall,
subAgentContent: context.subAgentContent[parentToolCallId] || '',
subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] || [],
subAgentBlocks: context.subAgentBlocks[parentToolCallId] || [],
subAgentStreaming: false,
}
const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall }
set({ toolCallsById: updatedMap })
logger.info('[SSE] Subagent session ended', {
subagent: data.subagent,
parentToolCallId,
contentLength: context.subAgentContent[parentToolCallId]?.length || 0,
toolCallCount: context.subAgentToolCalls[parentToolCallId]?.length || 0,
})
}
}
context.subAgentParentToolCallId = undefined
return true
}
if (data.subagent) {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) {
logger.warn('[SSE] Subagent event without parent tool call ID', {
type: data.type,
subagent: data.subagent,
})
return true
}
logger.info('[SSE] Processing subagent event', {
type: data.type,
subagent: data.subagent,
parentToolCallId,
hasHandler: !!subAgentSSEHandlers[data.type],
})
const subAgentHandler = subAgentSSEHandlers[data.type]
if (subAgentHandler) {
await subAgentHandler(data, context, get, set)
} else {
logger.warn('[SSE] No handler for subagent event type', { type: data.type })
}
return !context.streamComplete
}
const handler = sseHandlers[data.type] || sseHandlers.default
await handler(data, context, get, set)
return !context.streamComplete
}
// Debounced UI update queue for smoother streaming
const streamingUpdateQueue = new Map<string, StreamingContext>()
let streamingUpdateRAF: number | null = null
@@ -2034,7 +2127,7 @@ function stopStreamingUpdates() {
streamingUpdateQueue.clear()
}
/** Flush pending streaming updates immediately (apply them to state before clearing) */
/** Flush pending streaming updates immediately (apply them to state before clearing). */
function flushStreamingUpdates(set: any) {
if (streamingUpdateRAF !== null) {
cancelAnimationFrame(streamingUpdateRAF)
@@ -2066,6 +2159,76 @@ function flushStreamingUpdates(set: any) {
})
}
function cloneContentBlocks(blocks: any[]): any[] {
if (!Array.isArray(blocks)) return []
return blocks.map((block) => (block ? { ...block } : block))
}
function extractTextFromBlocks(blocks: any[]): string {
if (!Array.isArray(blocks)) return ''
return blocks
.filter((block) => block?.type === TEXT_BLOCK_TYPE && typeof block.content === 'string')
.map((block) => block.content)
.join('')
}
function appendTextToBlocks(blocks: any[], text: string): any[] {
const nextBlocks = cloneContentBlocks(blocks)
if (!text) return nextBlocks
const lastIndex = nextBlocks.length - 1
const lastBlock = nextBlocks[lastIndex]
if (lastBlock?.type === TEXT_BLOCK_TYPE) {
const current = typeof lastBlock.content === 'string' ? lastBlock.content : ''
nextBlocks[lastIndex] = { ...lastBlock, content: current + text }
return nextBlocks
}
nextBlocks.push({ type: TEXT_BLOCK_TYPE, content: text, timestamp: Date.now() })
return nextBlocks
}
function findLastTextBlock(blocks: any[]): any | null {
if (!Array.isArray(blocks) || blocks.length === 0) return null
const lastBlock = blocks[blocks.length - 1]
return lastBlock?.type === TEXT_BLOCK_TYPE ? lastBlock : null
}
function replaceTextBlocks(blocks: any[], text: string): any[] {
const next: any[] = []
let inserted = false
for (const block of blocks || []) {
if (block?.type === TEXT_BLOCK_TYPE) {
if (!inserted && text) {
next.push({ type: TEXT_BLOCK_TYPE, content: text, timestamp: Date.now() })
inserted = true
}
continue
}
next.push(block ? { ...block } : block)
}
if (!inserted && text) {
next.push({ type: TEXT_BLOCK_TYPE, content: text, timestamp: Date.now() })
}
return next
}
function createStreamingContext(messageId: string): StreamingContext {
return {
messageId,
accumulatedContent: new StringBuilder(),
contentBlocks: [],
currentTextBlock: null,
isInThinkingBlock: false,
currentThinkingBlock: null,
isInDesignWorkflowBlock: false,
designWorkflowContent: '',
pendingContent: '',
doneEventCount: 0,
subAgentContent: {},
subAgentToolCalls: {},
subAgentBlocks: {},
}
}
function createOptimizedContentBlocks(contentBlocks: any[]): any[] {
const result: any[] = new Array(contentBlocks.length)
for (let i = 0; i < contentBlocks.length; i++) {
@@ -2074,8 +2237,9 @@ function createOptimizedContentBlocks(contentBlocks: any[]): any[] {
}
return result
}
``
function updateStreamingMessage(set: any, context: StreamingContext) {
if (context.suppressStreamingUpdates) return
const now = performance.now()
streamingUpdateQueue.set(context.messageId, context)
const timeSinceLastBatch = now - lastBatchTime
@@ -2617,6 +2781,8 @@ export const useCopilotStore = create<CopilotStore>()(
currentUserMessageId: userMessage.id,
}))
// Create new stream info and write to storage BEFORE starting the stream
// This ensures that if the user refreshes, they get the correct stream
const activeStream: CopilotStreamInfo = {
streamId: userMessage.id,
workflowId,
@@ -2630,6 +2796,12 @@ export const useCopilotStore = create<CopilotStore>()(
contexts,
startedAt: Date.now(),
}
logger.info('[Copilot] Creating new active stream', {
streamId: activeStream.streamId,
workflowId: activeStream.workflowId,
chatId: activeStream.chatId,
userMessageContent: message.slice(0, 50),
})
set({ activeStream })
writeActiveStreamToStorage(activeStream)
@@ -2712,7 +2884,8 @@ export const useCopilotStore = create<CopilotStore>()(
result.stream,
streamingMessage.id,
false,
userMessage.id
userMessage.id,
nextAbortController.signal
)
set({ chatsLastLoadedAt: null, chatsLoadedForWorkflow: null })
} else {
@@ -2778,13 +2951,18 @@ export const useCopilotStore = create<CopilotStore>()(
},
resumeActiveStream: async () => {
const stored = get().activeStream || readActiveStreamFromStorage()
const inMemoryStream = get().activeStream
const storedStream = readActiveStreamFromStorage()
const stored = inMemoryStream || storedStream
logger.info('[Copilot] Resume check', {
hasStored: !!stored,
hasInMemory: !!inMemoryStream,
hasStored: !!storedStream,
usingStream: inMemoryStream ? 'memory' : storedStream ? 'storage' : 'none',
streamId: stored?.streamId,
lastEventId: stored?.lastEventId,
storedWorkflowId: stored?.workflowId,
storedChatId: stored?.chatId,
userMessageContent: stored?.userMessageContent?.slice(0, 50),
currentWorkflowId: get().workflowId,
isSendingMessage: get().isSendingMessage,
resumeAttempts: stored?.resumeAttempts,
@@ -2798,7 +2976,7 @@ export const useCopilotStore = create<CopilotStore>()(
return false
}
const nextStream: CopilotStreamInfo = {
let nextStream: CopilotStreamInfo = {
...stored,
resumeAttempts: (stored.resumeAttempts || 0) + 1,
}
@@ -2836,8 +3014,8 @@ export const useCopilotStore = create<CopilotStore>()(
}
}
// Fetch ALL buffered events and display instantly, then continue streaming from highest event
let bufferedContent = ''
let replayBlocks: any[] | null = null
let resumeFromEventId = nextStream.lastEventId
if (nextStream.lastEventId > 0) {
try {
@@ -2845,7 +3023,6 @@ export const useCopilotStore = create<CopilotStore>()(
streamId: nextStream.streamId,
savedLastEventId: nextStream.lastEventId,
})
// Fetch ALL events (no 'to' limit) so we get everything buffered while disconnected
const batchUrl = `/api/copilot/chat/stream?streamId=${encodeURIComponent(
nextStream.streamId
)}&from=0&batch=true`
@@ -2853,17 +3030,19 @@ export const useCopilotStore = create<CopilotStore>()(
if (batchResponse.ok) {
const batchData = await batchResponse.json()
if (batchData.success && Array.isArray(batchData.events)) {
// Extract text content and track highest event ID
const replayContext = createStreamingContext(nextStream.assistantMessageId)
replayContext.suppressStreamingUpdates = true
for (const entry of batchData.events) {
const event = entry.event
if (event?.type === 'content' && typeof event.data === 'string') {
bufferedContent += event.data
if (event) {
await applySseEvent(event, replayContext, get, set)
}
// Track highest event ID so we resume from there (not the old lastEventId)
if (typeof entry.eventId === 'number' && entry.eventId > resumeFromEventId) {
resumeFromEventId = entry.eventId
}
}
bufferedContent = replayContext.accumulatedContent.toString()
replayBlocks = replayContext.contentBlocks
logger.info('[Copilot] Loaded buffered content instantly', {
eventCount: batchData.events.length,
contentLength: bufferedContent.length,
@@ -2884,6 +3063,11 @@ export const useCopilotStore = create<CopilotStore>()(
logger.warn('[Copilot] Failed to fetch buffered events', { error: String(e) })
}
}
if (resumeFromEventId > nextStream.lastEventId) {
nextStream = { ...nextStream, lastEventId: resumeFromEventId }
set({ activeStream: nextStream })
writeActiveStreamToStorage(nextStream)
}
let nextMessages = messages
let cleanedExisting = false
@@ -2918,29 +3102,44 @@ export const useCopilotStore = create<CopilotStore>()(
}
if (!nextMessages.some((m) => m.id === nextStream.assistantMessageId)) {
// Create assistant message with buffered content pre-loaded
const assistantMessage: CopilotMessage = {
...createStreamingMessage(),
id: nextStream.assistantMessageId,
content: bufferedContent,
contentBlocks: bufferedContent
? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }]
: [],
contentBlocks:
replayBlocks && replayBlocks.length > 0
? replayBlocks
: bufferedContent
? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }]
: [],
}
nextMessages = [...nextMessages, assistantMessage]
} else if (bufferedContent) {
// Update existing assistant message with buffered content
nextMessages = nextMessages.map((m) =>
m.id === nextStream.assistantMessageId
? {
...m,
content: bufferedContent,
contentBlocks: [
{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() },
],
}
: m
)
} else if (bufferedContent || (replayBlocks && replayBlocks.length > 0)) {
nextMessages = nextMessages.map((m) => {
if (m.id !== nextStream.assistantMessageId) return m
let nextBlocks = replayBlocks && replayBlocks.length > 0 ? replayBlocks : null
if (!nextBlocks) {
const existingBlocks = Array.isArray(m.contentBlocks) ? m.contentBlocks : []
const existingText = extractTextFromBlocks(existingBlocks)
if (existingText && bufferedContent.startsWith(existingText)) {
const delta = bufferedContent.slice(existingText.length)
nextBlocks = delta
? appendTextToBlocks(existingBlocks, delta)
: cloneContentBlocks(existingBlocks)
} else if (!existingText && existingBlocks.length === 0) {
nextBlocks = bufferedContent
? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }]
: []
} else {
nextBlocks = replaceTextBlocks(existingBlocks, bufferedContent)
}
}
return {
...m,
content: bufferedContent,
contentBlocks: nextBlocks || [],
}
})
}
if (cleanedExisting || nextMessages !== messages || bufferedContent) {
@@ -2986,7 +3185,8 @@ export const useCopilotStore = create<CopilotStore>()(
result.stream,
nextStream.assistantMessageId,
true,
nextStream.userMessageId
nextStream.userMessageId,
abortController.signal
)
return true
}
@@ -3015,9 +3215,7 @@ export const useCopilotStore = create<CopilotStore>()(
set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption })
try {
abortController.abort()
// Flush pending streaming updates to preserve content before stopping
flushStreamingUpdates(set)
// Re-read messages after flush to get the latest content
const { messages: updatedMessages } = get()
const lastMessage = updatedMessages[updatedMessages.length - 1]
if (lastMessage && lastMessage.role === 'assistant') {
@@ -3121,7 +3319,13 @@ export const useCopilotStore = create<CopilotStore>()(
abortSignal: abortController.signal,
})
if (result.success && result.stream) {
await get().handleStreamingResponse(result.stream, newAssistantMessage.id, false)
await get().handleStreamingResponse(
result.stream,
newAssistantMessage.id,
false,
undefined,
abortController.signal
)
} else {
if (result.error === 'Request was aborted') return
const errorMessage = createErrorMessage(
@@ -3420,27 +3624,15 @@ export const useCopilotStore = create<CopilotStore>()(
stream: ReadableStream,
assistantMessageId: string,
isContinuation = false,
triggerUserMessageId?: string
triggerUserMessageId?: string,
abortSignal?: AbortSignal
) => {
const reader = stream.getReader()
const decoder = new TextDecoder()
const startTimeMs = Date.now()
const expectedStreamId = triggerUserMessageId
const context: StreamingContext = {
messageId: assistantMessageId,
accumulatedContent: new StringBuilder(),
contentBlocks: [],
currentTextBlock: null,
isInThinkingBlock: false,
currentThinkingBlock: null,
isInDesignWorkflowBlock: false,
designWorkflowContent: '',
pendingContent: '',
doneEventCount: 0,
subAgentContent: {},
subAgentToolCalls: {},
subAgentBlocks: {},
}
const context = createStreamingContext(assistantMessageId)
if (isContinuation) {
context.suppressContinueOption = true
}
@@ -3455,27 +3647,25 @@ export const useCopilotStore = create<CopilotStore>()(
contentBlocksCount: existingMessage?.contentBlocks?.length || 0,
})
if (existingMessage) {
// Initialize with existing text content (should be buffered content we set earlier)
const existingContent = existingMessage.content || ''
if (existingContent) {
context.accumulatedContent.append(existingContent)
}
// Create fresh text block with existing content (don't reuse to avoid mutation issues)
if (existingContent) {
const existingBlocks = Array.isArray(existingMessage.contentBlocks)
? existingMessage.contentBlocks
: []
if (existingBlocks.length > 0) {
const existingText = extractTextFromBlocks(existingBlocks)
if (existingText) {
context.accumulatedContent.append(existingText)
}
const clonedBlocks = cloneContentBlocks(existingBlocks)
context.contentBlocks = clonedBlocks
context.currentTextBlock = findLastTextBlock(clonedBlocks)
} else if (existingMessage.content) {
const textBlock = contentBlockPool.get()
textBlock.type = TEXT_BLOCK_TYPE
textBlock.content = existingContent
textBlock.content = existingMessage.content
textBlock.timestamp = Date.now()
context.contentBlocks = [textBlock]
context.currentTextBlock = textBlock
}
// Copy over any non-text blocks (tool calls, thinking, etc) from existing message
if (existingMessage.contentBlocks) {
for (const block of existingMessage.contentBlocks) {
if (block.type !== TEXT_BLOCK_TYPE) {
context.contentBlocks.push({ ...block })
}
}
context.accumulatedContent.append(existingMessage.content)
}
}
}
@@ -3487,24 +3677,30 @@ export const useCopilotStore = create<CopilotStore>()(
try {
for await (const data of parseSSEStream(reader, decoder)) {
const { abortController } = get()
if (abortController?.signal.aborted) {
if (abortSignal?.aborted) {
context.wasAborted = true
const { suppressAbortContinueOption } = get()
// Suppress continue option if explicitly requested OR if page is unloading (refresh/close)
context.suppressContinueOption = suppressAbortContinueOption === true || isPageUnloading
if (suppressAbortContinueOption) {
set({ suppressAbortContinueOption: false })
}
context.pendingContent = ''
finalizeThinkingBlock(context)
stopStreamingUpdates()
flushStreamingUpdates(set)
reader.cancel()
break
}
const eventId = typeof data?.eventId === 'number' ? data.eventId : undefined
const streamId = typeof data?.streamId === 'string' ? data.streamId : undefined
if (expectedStreamId && streamId && streamId !== expectedStreamId) {
logger.warn('[SSE] Ignoring event for mismatched stream', {
expectedStreamId,
streamId,
type: data.type,
})
continue
}
if (eventId && streamId) {
updateActiveStreamEventId(get, set, streamId, eventId)
}
@@ -3520,91 +3716,8 @@ export const useCopilotStore = create<CopilotStore>()(
: JSON.stringify(data.data)?.substring(0, 100),
})
// Handle subagent_start to track parent tool call
if (data.type === 'subagent_start') {
const toolCallId = data.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
// Mark the parent tool call as streaming
const { toolCallsById } = get()
const parentToolCall = toolCallsById[toolCallId]
if (parentToolCall) {
const updatedToolCall: CopilotToolCall = {
...parentToolCall,
subAgentStreaming: true,
}
const updatedMap = { ...toolCallsById, [toolCallId]: updatedToolCall }
set({ toolCallsById: updatedMap })
}
logger.info('[SSE] Subagent session started', {
subagent: data.subagent,
parentToolCallId: toolCallId,
})
}
continue
}
// Handle subagent_end to finalize subagent content
if (data.type === 'subagent_end') {
const parentToolCallId = context.subAgentParentToolCallId
if (parentToolCallId) {
// Mark subagent streaming as complete
const { toolCallsById } = get()
const parentToolCall = toolCallsById[parentToolCallId]
if (parentToolCall) {
const updatedToolCall: CopilotToolCall = {
...parentToolCall,
subAgentContent: context.subAgentContent[parentToolCallId] || '',
subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] || [],
subAgentBlocks: context.subAgentBlocks[parentToolCallId] || [],
subAgentStreaming: false, // Done streaming
}
const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall }
set({ toolCallsById: updatedMap })
logger.info('[SSE] Subagent session ended', {
subagent: data.subagent,
parentToolCallId,
contentLength: context.subAgentContent[parentToolCallId]?.length || 0,
toolCallCount: context.subAgentToolCalls[parentToolCallId]?.length || 0,
})
}
}
context.subAgentParentToolCallId = undefined
continue
}
// Check if this is a subagent event (has subagent field)
if (data.subagent) {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) {
logger.warn('[SSE] Subagent event without parent tool call ID', {
type: data.type,
subagent: data.subagent,
})
continue
}
logger.info('[SSE] Processing subagent event', {
type: data.type,
subagent: data.subagent,
parentToolCallId,
hasHandler: !!subAgentSSEHandlers[data.type],
})
const subAgentHandler = subAgentSSEHandlers[data.type]
if (subAgentHandler) {
await subAgentHandler(data, context, get, set)
} else {
logger.warn('[SSE] No handler for subagent event type', { type: data.type })
}
// Skip regular handlers for subagent events
if (context.streamComplete) break
continue
}
const handler = sseHandlers[data.type] || sseHandlers.default
await handler(data, context, get, set)
if (context.streamComplete) break
const shouldContinue = await applySseEvent(data, context, get, set)
if (!shouldContinue) break
}
if (!context.wasAborted && sseHandlers.stream_end) {

View File

@@ -246,7 +246,8 @@ export interface CopilotActions {
stream: ReadableStream,
messageId: string,
isContinuation?: boolean,
triggerUserMessageId?: string
triggerUserMessageId?: string,
abortSignal?: AbortSignal
) => Promise<void>
handleNewChatCreation: (newChatId: string) => Promise<void>
loadAutoAllowedTools: () => Promise<void>