Checkpoint

This commit is contained in:
Siddharth Ganesan
2026-02-03 16:30:16 -08:00
parent 1ddcebfe57
commit 6bfd50d0ac
8 changed files with 858 additions and 41 deletions

View File

@@ -25,6 +25,11 @@ import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
import { tools } from '@/tools/registry'
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import {
appendStreamEvent,
resetStreamBuffer,
setStreamMeta,
} from '@/lib/copilot/orchestrator/stream-buffer'
const logger = createLogger('CopilotChatAPI')
@@ -483,16 +488,30 @@ export async function POST(req: NextRequest) {
} catch {}
if (stream) {
const streamId = userMessageIdToUse
const transformedStream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
await resetStreamBuffer(streamId)
await setStreamMeta(streamId, { status: 'active', userId: authenticatedUserId })
const pushEvent = async (event: Record<string, any>) => {
const entry = await appendStreamEvent(streamId, event)
const payload = {
...event,
eventId: entry.eventId,
streamId,
}
try {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
} catch {
// Client disconnected - keep buffering
}
}
if (actualChatId) {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'chat_id', chatId: actualChatId })}\n\n`
)
)
await pushEvent({ type: 'chat_id', chatId: actualChatId })
}
if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
@@ -506,9 +525,7 @@ export async function POST(req: NextRequest) {
updatedAt: new Date(),
})
.where(eq(copilotChats.id, actualChatId!))
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'title_updated', title })}\n\n`)
)
await pushEvent({ type: 'title_updated', title })
}
})
.catch((error) => {
@@ -524,11 +541,7 @@ export async function POST(req: NextRequest) {
autoExecuteTools: true,
interactive: true,
onEvent: async (event) => {
try {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`))
} catch {
controller.error('Failed to forward SSE event')
}
await pushEvent(event)
},
})
@@ -541,19 +554,20 @@ export async function POST(req: NextRequest) {
})
.where(eq(copilotChats.id, actualChatId!))
}
await setStreamMeta(streamId, { status: 'complete', userId: authenticatedUserId })
} catch (error) {
logger.error(`[${tracker.requestId}] Orchestration error:`, error)
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
type: 'error',
data: {
displayMessage:
'An unexpected error occurred while processing the response.',
},
})}\n\n`
)
)
await setStreamMeta(streamId, {
status: 'error',
userId: authenticatedUserId,
error: error instanceof Error ? error.message : 'Stream error',
})
await pushEvent({
type: 'error',
data: {
displayMessage: 'An unexpected error occurred while processing the response.',
},
})
} finally {
controller.close()
}
@@ -698,10 +712,7 @@ export async function GET(req: NextRequest) {
try {
const { searchParams } = new URL(req.url)
const workflowId = searchParams.get('workflowId')
if (!workflowId) {
return createBadRequestResponse('workflowId is required')
}
const chatId = searchParams.get('chatId')
// Get authenticated user using consolidated helper
const { userId: authenticatedUserId, isAuthenticated } =
@@ -710,6 +721,47 @@ export async function GET(req: NextRequest) {
return createUnauthorizedResponse()
}
// If chatId is provided, fetch a single chat
if (chatId) {
const [chat] = await db
.select({
id: copilotChats.id,
title: copilotChats.title,
model: copilotChats.model,
messages: copilotChats.messages,
planArtifact: copilotChats.planArtifact,
config: copilotChats.config,
createdAt: copilotChats.createdAt,
updatedAt: copilotChats.updatedAt,
})
.from(copilotChats)
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, authenticatedUserId)))
.limit(1)
if (!chat) {
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
}
const transformedChat = {
id: chat.id,
title: chat.title,
model: chat.model,
messages: Array.isArray(chat.messages) ? chat.messages : [],
messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0,
planArtifact: chat.planArtifact || null,
config: chat.config || null,
createdAt: chat.createdAt,
updatedAt: chat.updatedAt,
}
logger.info(`Retrieved chat ${chatId}`)
return NextResponse.json({ success: true, chat: transformedChat })
}
if (!workflowId) {
return createBadRequestResponse('workflowId or chatId is required')
}
// Fetch chats for this user and workflow
const chats = await db
.select({

View File

@@ -0,0 +1,133 @@
import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@sim/logger'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
import {
getStreamMeta,
readStreamEvents,
type StreamMeta,
} from '@/lib/copilot/orchestrator/stream-buffer'
const logger = createLogger('CopilotChatStreamAPI')
const POLL_INTERVAL_MS = 250
const MAX_STREAM_MS = 10 * 60 * 1000
function encodeEvent(event: Record<string, any>): Uint8Array {
return new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`)
}
export async function GET(request: NextRequest) {
const { userId: authenticatedUserId, isAuthenticated } =
await authenticateCopilotRequestSessionOnly()
if (!isAuthenticated || !authenticatedUserId) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const url = new URL(request.url)
const streamId = url.searchParams.get('streamId') || ''
const fromParam = url.searchParams.get('from') || '0'
const fromEventId = Number(fromParam || 0)
// If batch=true, return buffered events as JSON instead of SSE
const batchMode = url.searchParams.get('batch') === 'true'
const toParam = url.searchParams.get('to')
const toEventId = toParam ? Number(toParam) : undefined
if (!streamId) {
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
}
const meta = (await getStreamMeta(streamId)) as StreamMeta | null
logger.info('[Resume] Stream lookup', {
streamId,
fromEventId,
toEventId,
batchMode,
hasMeta: !!meta,
metaStatus: meta?.status,
})
if (!meta) {
return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
}
if (meta.userId && meta.userId !== authenticatedUserId) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
}
// Batch mode: return all buffered events as JSON
if (batchMode) {
const events = await readStreamEvents(streamId, fromEventId)
const filteredEvents = toEventId
? events.filter((e) => e.eventId <= toEventId)
: events
logger.info('[Resume] Batch response', {
streamId,
fromEventId,
toEventId,
eventCount: filteredEvents.length,
})
return NextResponse.json({
success: true,
events: filteredEvents,
status: meta.status,
})
}
const startTime = Date.now()
const stream = new ReadableStream({
async start(controller) {
let lastEventId = Number.isFinite(fromEventId) ? fromEventId : 0
const flushEvents = async () => {
const events = await readStreamEvents(streamId, lastEventId)
if (events.length > 0) {
logger.info('[Resume] Flushing events', {
streamId,
fromEventId: lastEventId,
eventCount: events.length,
})
}
for (const entry of events) {
lastEventId = entry.eventId
const payload = {
...entry.event,
eventId: entry.eventId,
streamId: entry.streamId,
}
controller.enqueue(encodeEvent(payload))
}
}
try {
await flushEvents()
while (Date.now() - startTime < MAX_STREAM_MS) {
const currentMeta = await getStreamMeta(streamId)
if (!currentMeta) break
await flushEvents()
if (currentMeta.status === 'complete' || currentMeta.status === 'error') {
break
}
if (request.signal.aborted) {
break
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
}
} catch (error) {
logger.warn('Stream replay failed', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
} finally {
controller.close()
}
},
})
return new Response(stream, { headers: SSE_HEADERS })
}

View File

@@ -114,6 +114,7 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
clearPlanArtifact,
savePlanArtifact,
loadAutoAllowedTools,
resumeActiveStream,
} = useCopilotStore()
// Initialize copilot
@@ -126,6 +127,7 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
loadAutoAllowedTools,
currentChat,
isSendingMessage,
resumeActiveStream,
})
// Handle scroll management (80px stickiness for copilot)
@@ -421,8 +423,8 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
</div>
</div>
{/* Show loading state until fully initialized */}
{!isInitialized ? (
{/* Show loading state until fully initialized, but skip if actively streaming (resume case) */}
{!isInitialized && !isSendingMessage ? (
<div className='flex h-full w-full items-center justify-center'>
<div className='flex flex-col items-center gap-3'>
<p className='text-muted-foreground text-sm'>Loading copilot</p>

View File

@@ -14,6 +14,7 @@ interface UseCopilotInitializationProps {
loadAutoAllowedTools: () => Promise<void>
currentChat: any
isSendingMessage: boolean
resumeActiveStream: () => Promise<boolean>
}
/**
@@ -32,11 +33,13 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) {
loadAutoAllowedTools,
currentChat,
isSendingMessage,
resumeActiveStream,
} = props
const [isInitialized, setIsInitialized] = useState(false)
const lastWorkflowIdRef = useRef<string | null>(null)
const hasMountedRef = useRef(false)
const hasResumedRef = useRef(false)
/** Initialize on mount - loads chats if needed. Never loads during streaming */
useEffect(() => {
@@ -105,6 +108,16 @@ export function useCopilotInitialization(props: UseCopilotInitializationProps) {
isSendingMessage,
])
/** Try to resume active stream on mount - runs early, before waiting for chats */
useEffect(() => {
if (hasResumedRef.current || isSendingMessage) return
hasResumedRef.current = true
// Resume immediately on mount - don't wait for isInitialized
resumeActiveStream().catch((err) => {
logger.warn('[Copilot] Failed to resume active stream', err)
})
}, [isSendingMessage, resumeActiveStream])
/** Load auto-allowed tools once on mount - runs immediately, independent of workflow */
const hasLoadedAutoAllowedToolsRef = useRef(false)
useEffect(() => {

View File

@@ -82,6 +82,7 @@ export interface SendMessageRequest {
executionId?: string
}>
commands?: string[]
resumeFromEventId?: number
}
/**
@@ -120,7 +121,7 @@ export async function sendStreamingMessage(
request: SendMessageRequest
): Promise<StreamingResponse> {
try {
const { abortSignal, ...requestBody } = request
const { abortSignal, resumeFromEventId, ...requestBody } = request
try {
const preview = Array.isArray((requestBody as any).contexts)
? (requestBody as any).contexts.map((c: any) => ({
@@ -136,8 +137,51 @@ export async function sendStreamingMessage(
? (requestBody as any).contexts.length
: 0,
contextsPreview: preview,
resumeFromEventId,
})
} catch {}
const streamId = request.userMessageId
if (typeof resumeFromEventId === 'number') {
if (!streamId) {
return {
success: false,
error: 'streamId is required to resume a stream',
status: 400,
}
}
const url = `/api/copilot/chat/stream?streamId=${encodeURIComponent(
streamId
)}&from=${encodeURIComponent(String(resumeFromEventId))}`
const response = await fetch(url, {
method: 'GET',
signal: abortSignal,
credentials: 'include',
})
if (!response.ok) {
const errorMessage = await handleApiError(response, 'Failed to resume streaming message')
return {
success: false,
error: errorMessage,
status: response.status,
}
}
if (!response.body) {
return {
success: false,
error: 'No response body received',
status: 500,
}
}
return {
success: true,
stream: response.body,
}
}
const response = await fetch('/api/copilot/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },

View File

@@ -0,0 +1,152 @@
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('CopilotStreamBuffer')
const STREAM_TTL_SECONDS = 60 * 60
const STREAM_EVENT_LIMIT = 5000
function getStreamKeyPrefix(streamId: string) {
return `copilot_stream:${streamId}`
}
function getEventsKey(streamId: string) {
return `${getStreamKeyPrefix(streamId)}:events`
}
function getSeqKey(streamId: string) {
return `${getStreamKeyPrefix(streamId)}:seq`
}
function getMetaKey(streamId: string) {
return `${getStreamKeyPrefix(streamId)}:meta`
}
export type StreamStatus = 'active' | 'complete' | 'error'
export type StreamMeta = {
status: StreamStatus
userId?: string
updatedAt?: string
error?: string
}
export type StreamEventEntry = {
eventId: number
streamId: string
event: Record<string, any>
}
export async function resetStreamBuffer(streamId: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
try {
await redis.del(getEventsKey(streamId), getSeqKey(streamId), getMetaKey(streamId))
} catch (error) {
logger.warn('Failed to reset stream buffer', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
}
}
export async function setStreamMeta(
streamId: string,
meta: StreamMeta
): Promise<void> {
const redis = getRedisClient()
if (!redis) return
try {
const payload: Record<string, string> = {
status: meta.status,
updatedAt: meta.updatedAt || new Date().toISOString(),
}
if (meta.userId) payload.userId = meta.userId
if (meta.error) payload.error = meta.error
await redis.hset(getMetaKey(streamId), payload)
await redis.expire(getMetaKey(streamId), STREAM_TTL_SECONDS)
} catch (error) {
logger.warn('Failed to update stream meta', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
}
}
export async function getStreamMeta(streamId: string): Promise<StreamMeta | null> {
const redis = getRedisClient()
if (!redis) return null
try {
const meta = await redis.hgetall(getMetaKey(streamId))
if (!meta || Object.keys(meta).length === 0) return null
return meta as StreamMeta
} catch (error) {
logger.warn('Failed to read stream meta', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
return null
}
}
export async function appendStreamEvent(
streamId: string,
event: Record<string, any>
): Promise<StreamEventEntry> {
const redis = getRedisClient()
if (!redis) {
return { eventId: 0, streamId, event }
}
try {
const nextId = await redis.incr(getSeqKey(streamId))
const entry: StreamEventEntry = { eventId: nextId, streamId, event }
await redis.zadd(getEventsKey(streamId), nextId, JSON.stringify(entry))
const count = await redis.zcard(getEventsKey(streamId))
if (count > STREAM_EVENT_LIMIT) {
const trimCount = count - STREAM_EVENT_LIMIT
if (trimCount > 0) {
await redis.zremrangebyrank(getEventsKey(streamId), 0, trimCount - 1)
}
}
await redis.expire(getEventsKey(streamId), STREAM_TTL_SECONDS)
await redis.expire(getSeqKey(streamId), STREAM_TTL_SECONDS)
return entry
} catch (error) {
logger.warn('Failed to append stream event', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
return { eventId: 0, streamId, event }
}
}
export async function readStreamEvents(
streamId: string,
afterEventId: number
): Promise<StreamEventEntry[]> {
const redis = getRedisClient()
if (!redis) return []
try {
const raw = await redis.zrangebyscore(getEventsKey(streamId), afterEventId + 1, '+inf')
return raw
.map((entry) => {
try {
return JSON.parse(entry) as StreamEventEntry
} catch {
return null
}
})
.filter((entry): entry is StreamEventEntry => Boolean(entry))
} catch (error) {
logger.warn('Failed to read stream events', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
return []
}
}

View File

@@ -80,6 +80,7 @@ import { subscriptionKeys } from '@/hooks/queries/subscription'
import type {
ChatContext,
CopilotMessage,
CopilotStreamInfo,
CopilotStore,
CopilotToolCall,
MessageFileAttachment,
@@ -92,6 +93,69 @@ import type { WorkflowState } from '@/stores/workflows/workflow/types'
const logger = createLogger('CopilotStore')
const STREAM_STORAGE_KEY = 'copilot_active_stream'
/**
* Flag set on beforeunload to suppress continue option during page refresh/close.
* Aborts during unload should NOT show the continue button.
*/
let isPageUnloading = false
if (typeof window !== 'undefined') {
window.addEventListener('beforeunload', () => {
isPageUnloading = true
})
}
function readActiveStreamFromStorage(): CopilotStreamInfo | null {
if (typeof window === 'undefined') return null
try {
const raw = window.sessionStorage.getItem(STREAM_STORAGE_KEY)
logger.info('[Copilot] Reading stream from storage', {
hasRaw: !!raw,
rawPreview: raw ? raw.substring(0, 100) : null,
})
if (!raw) return null
const parsed = JSON.parse(raw) as CopilotStreamInfo
return parsed?.streamId ? parsed : null
} catch (e) {
logger.warn('[Copilot] Failed to read stream from storage', { error: String(e) })
return null
}
}
function writeActiveStreamToStorage(info: CopilotStreamInfo | null): void {
if (typeof window === 'undefined') return
try {
if (!info) {
logger.info('[Copilot] Clearing stream from storage', {
isPageUnloading,
stack: new Error().stack?.split('\n').slice(1, 4).join(' <- '),
})
window.sessionStorage.removeItem(STREAM_STORAGE_KEY)
return
}
logger.info('[Copilot] Writing stream to storage', {
streamId: info.streamId,
lastEventId: info.lastEventId,
})
window.sessionStorage.setItem(STREAM_STORAGE_KEY, JSON.stringify(info))
} catch {}
}
function updateActiveStreamEventId(
get: () => CopilotStore,
set: (next: Partial<CopilotStore>) => void,
streamId: string,
eventId: number
): void {
const current = get().activeStream
if (!current || current.streamId !== streamId) return
if (eventId <= (current.lastEventId || 0)) return
const next = { ...current, lastEventId: eventId }
set({ activeStream: next })
writeActiveStreamToStorage(next)
}
// On module load, clear any lingering diff preview (fresh page refresh)
try {
const diffStore = useWorkflowDiffStore.getState()
@@ -1033,6 +1097,28 @@ function appendContinueOptionBlock(blocks: any[]): any[] {
]
}
function stripContinueOption(content: string): string {
if (!content || !content.includes(CONTINUE_OPTIONS_TAG)) return content
const next = content.replace(CONTINUE_OPTIONS_TAG, '')
return next.replace(/\n{2,}\s*$/g, '\n').trimEnd()
}
function stripContinueOptionFromBlocks(blocks: any[]): any[] {
if (!Array.isArray(blocks)) return blocks
return blocks.flatMap((block) => {
if (
block?.type === TEXT_BLOCK_TYPE &&
typeof block.content === 'string' &&
block.content.includes(CONTINUE_OPTIONS_TAG)
) {
const nextContent = stripContinueOption(block.content)
if (!nextContent.trim()) return []
return [{ ...block, content: nextContent }]
}
return [block]
})
}
function beginThinkingBlock(context: StreamingContext) {
if (!context.currentThinkingBlock) {
context.currentThinkingBlock = contentBlockPool.get()
@@ -1118,12 +1204,18 @@ function appendSubAgentText(context: StreamingContext, parentToolCallId: string,
}
const sseHandlers: Record<string, SSEHandler> = {
chat_id: async (data, context, get) => {
chat_id: async (data, context, get, set) => {
context.newChatId = data.chatId
const { currentChat } = get()
const { currentChat, activeStream } = get()
if (!currentChat && context.newChatId) {
await get().handleNewChatCreation(context.newChatId)
}
// Update activeStream with chatId for resume purposes
if (activeStream && context.newChatId && !activeStream.chatId) {
const updatedStream = { ...activeStream, chatId: context.newChatId }
set({ activeStream: updatedStream })
writeActiveStreamToStorage(updatedStream)
}
},
title_updated: (_data, _context, get, set) => {
const title = _data.title
@@ -2072,6 +2164,7 @@ const initialState = {
toolCallsById: {} as Record<string, CopilotToolCall>,
suppressAutoSelect: false,
autoAllowedTools: [] as string[],
activeStream: null as CopilotStreamInfo | null,
messageQueue: [] as import('./types').QueuedMessage[],
suppressAbortContinueOption: false,
sensitiveCredentialIds: new Set<string>(),
@@ -2492,6 +2585,22 @@ export const useCopilotStore = create<CopilotStore>()(
currentUserMessageId: userMessage.id,
}))
const activeStream: CopilotStreamInfo = {
streamId: userMessage.id,
workflowId,
chatId: currentChat?.id,
userMessageId: userMessage.id,
assistantMessageId: streamingMessage.id,
lastEventId: 0,
resumeAttempts: 0,
userMessageContent: message,
fileAttachments,
contexts,
startedAt: Date.now(),
}
set({ activeStream })
writeActiveStreamToStorage(activeStream)
if (isFirstMessage) {
const optimisticTitle = message.length > 50 ? `${message.substring(0, 47)}...` : message
set((state) => ({
@@ -2616,6 +2725,8 @@ export const useCopilotStore = create<CopilotStore>()(
isSendingMessage: false,
abortController: null,
}))
set({ activeStream: null })
writeActiveStreamToStorage(null)
}
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') return
@@ -2629,14 +2740,240 @@ export const useCopilotStore = create<CopilotStore>()(
isSendingMessage: false,
abortController: null,
}))
set({ activeStream: null })
writeActiveStreamToStorage(null)
}
},
resumeActiveStream: async () => {
const stored = get().activeStream || readActiveStreamFromStorage()
logger.info('[Copilot] Resume check', {
hasStored: !!stored,
streamId: stored?.streamId,
lastEventId: stored?.lastEventId,
storedWorkflowId: stored?.workflowId,
storedChatId: stored?.chatId,
currentWorkflowId: get().workflowId,
isSendingMessage: get().isSendingMessage,
resumeAttempts: stored?.resumeAttempts,
})
if (!stored || !stored.streamId) return false
if (get().isSendingMessage) return false
if (get().workflowId && stored.workflowId !== get().workflowId) return false
if (stored.resumeAttempts >= 3) {
logger.warn('[Copilot] Too many resume attempts, giving up')
return false
}
const nextStream: CopilotStreamInfo = {
...stored,
resumeAttempts: (stored.resumeAttempts || 0) + 1,
}
set({ activeStream: nextStream })
writeActiveStreamToStorage(nextStream)
// Load existing chat messages from database if we have a chatId but no messages
let messages = get().messages
// Track if this is a fresh page load (no messages in memory)
const isFreshResume = messages.length === 0
if (isFreshResume && nextStream.chatId) {
try {
logger.info('[Copilot] Loading chat for resume', { chatId: nextStream.chatId })
const response = await fetch(`/api/copilot/chat?chatId=${nextStream.chatId}`)
if (response.ok) {
const data = await response.json()
if (data.success && data.chat) {
const normalizedMessages = normalizeMessagesForUI(data.chat.messages || [])
const toolCallsById = buildToolCallsById(normalizedMessages)
set({
currentChat: data.chat,
messages: normalizedMessages,
toolCallsById,
streamingPlanContent: data.chat.planArtifact || '',
})
messages = normalizedMessages
logger.info('[Copilot] Loaded chat for resume', {
chatId: nextStream.chatId,
messageCount: normalizedMessages.length,
})
}
}
} catch (e) {
logger.warn('[Copilot] Failed to load chat for resume', { error: String(e) })
}
}
// ALWAYS fetch buffered events when resuming (to ensure we have complete content)
let bufferedContent = ''
if (nextStream.lastEventId > 0) {
try {
logger.info('[Copilot] Fetching buffered events', {
streamId: nextStream.streamId,
lastEventId: nextStream.lastEventId,
isFreshResume,
})
const batchUrl = `/api/copilot/chat/stream?streamId=${encodeURIComponent(
nextStream.streamId
)}&from=0&to=${nextStream.lastEventId}&batch=true`
const batchResponse = await fetch(batchUrl, { credentials: 'include' })
if (batchResponse.ok) {
const batchData = await batchResponse.json()
if (batchData.success && Array.isArray(batchData.events)) {
// Extract text content from buffered events
for (const entry of batchData.events) {
const event = entry.event
if (event?.type === 'content' && typeof event.data === 'string') {
bufferedContent += event.data
}
}
logger.info('[Copilot] Loaded buffered content', {
eventCount: batchData.events.length,
contentLength: bufferedContent.length,
contentPreview: bufferedContent.slice(0, 100),
})
} else {
logger.warn('[Copilot] Batch response missing events', {
success: batchData.success,
hasEvents: Array.isArray(batchData.events),
})
}
} else {
logger.warn('[Copilot] Failed to fetch buffered events', {
status: batchResponse.status,
})
}
} catch (e) {
logger.warn('[Copilot] Failed to fetch buffered events', { error: String(e) })
}
}
let nextMessages = messages
let cleanedExisting = false
nextMessages = nextMessages.map((m) => {
if (m.id !== nextStream.assistantMessageId) return m
const hasContinueTag =
(typeof m.content === 'string' && m.content.includes(CONTINUE_OPTIONS_TAG)) ||
(Array.isArray(m.contentBlocks) &&
m.contentBlocks.some(
(b: any) =>
b?.type === TEXT_BLOCK_TYPE &&
typeof b.content === 'string' &&
b.content.includes(CONTINUE_OPTIONS_TAG)
))
if (!hasContinueTag) return m
cleanedExisting = true
return {
...m,
content: stripContinueOption(m.content || ''),
contentBlocks: stripContinueOptionFromBlocks(m.contentBlocks || []),
}
})
if (!messages.some((m) => m.id === nextStream.userMessageId)) {
const userMessage = createUserMessage(
nextStream.userMessageContent || '',
nextStream.fileAttachments,
nextStream.contexts,
nextStream.userMessageId
)
nextMessages = [...nextMessages, userMessage]
}
if (!nextMessages.some((m) => m.id === nextStream.assistantMessageId)) {
// Create assistant message with buffered content pre-loaded
const assistantMessage: CopilotMessage = {
...createStreamingMessage(),
id: nextStream.assistantMessageId,
content: bufferedContent,
contentBlocks: bufferedContent
? [{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() }]
: [],
}
nextMessages = [...nextMessages, assistantMessage]
} else if (bufferedContent) {
// Update existing assistant message with buffered content
nextMessages = nextMessages.map((m) =>
m.id === nextStream.assistantMessageId
? {
...m,
content: bufferedContent,
contentBlocks: [
{ type: TEXT_BLOCK_TYPE, content: bufferedContent, timestamp: Date.now() },
],
}
: m
)
}
if (cleanedExisting || nextMessages !== messages || bufferedContent) {
set({ messages: nextMessages, currentUserMessageId: nextStream.userMessageId })
} else {
set({ currentUserMessageId: nextStream.userMessageId })
}
const abortController = new AbortController()
set({ isSendingMessage: true, abortController })
try {
logger.info('[Copilot] Attempting to resume stream', {
streamId: nextStream.streamId,
lastEventId: nextStream.lastEventId,
isFreshResume,
bufferedContentLength: bufferedContent.length,
assistantMessageId: nextStream.assistantMessageId,
chatId: nextStream.chatId,
})
const result = await sendStreamingMessage({
message: nextStream.userMessageContent || '',
userMessageId: nextStream.userMessageId,
workflowId: nextStream.workflowId,
chatId: nextStream.chatId || get().currentChat?.id || undefined,
mode: get().mode === 'ask' ? 'ask' : get().mode === 'plan' ? 'plan' : 'agent',
model: get().selectedModel,
prefetch: get().agentPrefetch,
stream: true,
resumeFromEventId: nextStream.lastEventId,
abortSignal: abortController.signal,
})
logger.info('[Copilot] Resume stream result', {
success: result.success,
hasStream: !!result.stream,
error: result.error,
})
if (result.success && result.stream) {
await get().handleStreamingResponse(
result.stream,
nextStream.assistantMessageId,
true,
nextStream.userMessageId
)
return true
}
set({ isSendingMessage: false, abortController: null })
} catch (error) {
// Handle AbortError gracefully - expected when user aborts
if (error instanceof Error && (error.name === 'AbortError' || error.message.includes('aborted'))) {
logger.info('[Copilot] Resume stream aborted by user')
set({ isSendingMessage: false, abortController: null })
return false
}
logger.error('[Copilot] Failed to resume stream', {
error: error instanceof Error ? error.message : String(error),
})
set({ isSendingMessage: false, abortController: null })
}
return false
},
// Abort streaming
abortMessage: (options?: { suppressContinueOption?: boolean }) => {
const { abortController, isSendingMessage, messages } = get()
if (!isSendingMessage || !abortController) return
const suppressContinueOption = options?.suppressContinueOption === true
// Suppress continue option if explicitly requested OR if page is unloading (refresh/close)
const suppressContinueOption = options?.suppressContinueOption === true || isPageUnloading
set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption })
try {
abortController.abort()
@@ -2678,6 +3015,13 @@ export const useCopilotStore = create<CopilotStore>()(
})
}
// Only clear active stream for user-initiated aborts, NOT page unload
// During page unload, keep the stream info so we can resume after refresh
if (!isPageUnloading) {
set({ activeStream: null })
writeActiveStreamToStorage(null)
}
// Immediately put all in-progress tools into aborted state
abortAllInProgressTools(set, get)
@@ -2704,6 +3048,11 @@ export const useCopilotStore = create<CopilotStore>()(
}
} catch {
set({ isSendingMessage: false, isAborting: false })
// Only clear active stream for user-initiated aborts, NOT page unload
if (!isPageUnloading) {
set({ activeStream: null })
writeActiveStreamToStorage(null)
}
}
},
@@ -3051,15 +3400,42 @@ export const useCopilotStore = create<CopilotStore>()(
subAgentToolCalls: {},
subAgentBlocks: {},
}
if (isContinuation) {
context.suppressContinueOption = true
}
if (isContinuation) {
const { messages } = get()
const existingMessage = messages.find((m) => m.id === assistantMessageId)
logger.info('[Copilot] Continuation init', {
hasMessage: !!existingMessage,
contentLength: existingMessage?.content?.length || 0,
contentPreview: existingMessage?.content?.slice(0, 100) || '',
contentBlocksCount: existingMessage?.contentBlocks?.length || 0,
})
if (existingMessage) {
if (existingMessage.content) context.accumulatedContent.append(existingMessage.content)
context.contentBlocks = existingMessage.contentBlocks
? [...existingMessage.contentBlocks]
: []
// Initialize with existing text content (should be buffered content we set earlier)
const existingContent = existingMessage.content || ''
if (existingContent) {
context.accumulatedContent.append(existingContent)
}
// Create fresh text block with existing content (don't reuse to avoid mutation issues)
if (existingContent) {
const textBlock = contentBlockPool.get()
textBlock.type = TEXT_BLOCK_TYPE
textBlock.content = existingContent
textBlock.timestamp = Date.now()
context.contentBlocks = [textBlock]
context.currentTextBlock = textBlock
}
// Copy over any non-text blocks (tool calls, thinking, etc) from existing message
if (existingMessage.contentBlocks) {
for (const block of existingMessage.contentBlocks) {
if (block.type !== TEXT_BLOCK_TYPE) {
context.contentBlocks.push({ ...block })
}
}
}
}
}
@@ -3074,7 +3450,8 @@ export const useCopilotStore = create<CopilotStore>()(
if (abortController?.signal.aborted) {
context.wasAborted = true
const { suppressAbortContinueOption } = get()
context.suppressContinueOption = suppressAbortContinueOption === true
// Suppress continue option if explicitly requested OR if page is unloading (refresh/close)
context.suppressContinueOption = suppressAbortContinueOption === true || isPageUnloading
if (suppressAbortContinueOption) {
set({ suppressAbortContinueOption: false })
}
@@ -3085,6 +3462,12 @@ export const useCopilotStore = create<CopilotStore>()(
break
}
const eventId = typeof data?.eventId === 'number' ? data.eventId : undefined
const streamId = typeof data?.streamId === 'string' ? data.streamId : undefined
if (eventId && streamId) {
updateActiveStreamEventId(get, set, streamId, eventId)
}
// Log SSE events for debugging
logger.info('[SSE] Received event', {
type: data.type,
@@ -3202,10 +3585,20 @@ export const useCopilotStore = create<CopilotStore>()(
: block
)
}
if (isContinuation) {
sanitizedContentBlocks = stripContinueOptionFromBlocks(sanitizedContentBlocks)
}
if (context.wasAborted && !context.suppressContinueOption) {
sanitizedContentBlocks = appendContinueOptionBlock(sanitizedContentBlocks)
}
if (!context.streamComplete && !context.wasAborted) {
const resumed = await get().resumeActiveStream()
if (resumed) {
return
}
}
if (context.contentBlocks) {
context.contentBlocks.forEach((block) => {
if (block.type === TEXT_BLOCK_TYPE || block.type === THINKING_BLOCK_TYPE) {
@@ -3215,10 +3608,13 @@ export const useCopilotStore = create<CopilotStore>()(
}
const finalContent = stripTodoTags(context.accumulatedContent.toString())
const finalContentStripped = isContinuation
? stripContinueOption(finalContent)
: finalContent
const finalContentWithOptions =
context.wasAborted && !context.suppressContinueOption
? appendContinueOption(finalContent)
: finalContent
: finalContentStripped
set((state) => {
const snapshotId = state.currentUserMessageId
const nextSnapshots =
@@ -3229,7 +3625,7 @@ export const useCopilotStore = create<CopilotStore>()(
return updated
})()
: state.messageSnapshots
return {
const nextState: Partial<CopilotStore> = {
messages: state.messages.map((msg) =>
msg.id === assistantMessageId
? {
@@ -3245,8 +3641,15 @@ export const useCopilotStore = create<CopilotStore>()(
currentUserMessageId: null,
messageSnapshots: nextSnapshots,
}
return nextState
})
// Only clear active stream if stream completed normally or user aborted (not page unload)
if ((context.streamComplete || context.wasAborted) && !isPageUnloading) {
set({ activeStream: null })
writeActiveStreamToStorage(null)
}
if (context.newChatId && !get().currentChat) {
await get().handleNewChatCreation(context.newChatId)
}

View File

@@ -33,6 +33,20 @@ export interface CopilotToolCall {
subAgentStreaming?: boolean
}
export interface CopilotStreamInfo {
streamId: string
workflowId: string
chatId?: string
userMessageId: string
assistantMessageId: string
lastEventId: number
resumeAttempts: number
userMessageContent: string
fileAttachments?: MessageFileAttachment[]
contexts?: ChatContext[]
startedAt: number
}
export interface MessageFileAttachment {
id: string
key: string
@@ -154,6 +168,9 @@ export interface CopilotState {
// Auto-allowed integration tools (tools that can run without confirmation)
autoAllowedTools: string[]
// Active stream metadata for reconnect/replay
activeStream: CopilotStreamInfo | null
// Message queue for messages sent while another is in progress
messageQueue: QueuedMessage[]
@@ -194,6 +211,7 @@ export interface CopilotActions {
toolCallState: 'accepted' | 'rejected' | 'error',
toolCallId?: string
) => void
resumeActiveStream: () => Promise<boolean>
setToolCallState: (toolCall: any, newState: ClientToolCallState, options?: any) => void
updateToolCallParams: (toolCallId: string, params: Record<string, any>) => void
sendDocsMessage: (query: string, options?: { stream?: boolean; topK?: number }) => Promise<void>