fix(client): network drops reconnecting behaviour (#3775)

* fix(client): network drops reconnecting behaviour

* address bugbot comments

* address comments

* address queued message conflicts during retries

* fix more review comments

* fix branch

* fix non-clear bug

* fix
This commit is contained in:
Vikhyath Mondreti
2026-03-25 17:34:23 -07:00
committed by GitHub
parent 104ad03004
commit 5a5c33d326
6 changed files with 677 additions and 148 deletions

View File

@@ -23,6 +23,7 @@ import type { ChatContext } from '@/stores/panel'
interface MothershipChatProps {
messages: ChatMessage[]
isSending: boolean
isReconnecting?: boolean
onSubmit: (
text: string,
fileAttachments?: FileAttachmentForApi[],
@@ -71,6 +72,7 @@ const LAYOUT_STYLES = {
export function MothershipChat({
messages,
isSending,
isReconnecting = false,
onSubmit,
onStopGeneration,
messageQueue,
@@ -88,7 +90,8 @@ export function MothershipChat({
className,
}: MothershipChatProps) {
const styles = LAYOUT_STYLES[layout]
const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isSending)
const isStreamActive = isSending || isReconnecting
const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isStreamActive)
const hasMessages = messages.length > 0
const initialScrollDoneRef = useRef(false)
@@ -131,7 +134,7 @@ export function MothershipChat({
msg.content ?? ''
)
const isLastAssistant = index === messages.length - 1
const isThisStreaming = isSending && isLastAssistant
const isThisStreaming = isStreamActive && isLastAssistant
if (!hasAnyBlocks && !msg.content?.trim() && isThisStreaming) {
return <PendingTagIndicator key={msg.id} />
@@ -175,7 +178,7 @@ export function MothershipChat({
/>
<UserInput
onSubmit={onSubmit}
isSending={isSending}
isSending={isStreamActive}
onStopGeneration={onStopGeneration}
isInitialView={false}
userId={userId}

View File

@@ -148,6 +148,7 @@ export function Home({ chatId }: HomeProps = {}) {
const {
messages,
isSending,
isReconnecting,
sendMessage,
stopGeneration,
resolvedChatId,
@@ -335,6 +336,7 @@ export function Home({ chatId }: HomeProps = {}) {
<MothershipChat
messages={messages}
isSending={isSending}
isReconnecting={isReconnecting}
onSubmit={handleSubmit}
onStopGeneration={stopGeneration}
messageQueue={messageQueue}

View File

@@ -8,7 +8,11 @@ import {
markRunToolManuallyStopped,
reportManualRunToolStop,
} from '@/lib/copilot/client-sse/run-tool-execution'
import { COPILOT_CHAT_API_PATH, MOTHERSHIP_CHAT_API_PATH } from '@/lib/copilot/constants'
import {
COPILOT_CHAT_API_PATH,
COPILOT_CHAT_STREAM_API_PATH,
MOTHERSHIP_CHAT_API_PATH,
} from '@/lib/copilot/constants'
import {
extractResourcesFromToolResult,
isResourceToolName,
@@ -19,6 +23,8 @@ import { getNextWorkflowColor } from '@/lib/workflows/colors'
import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry'
import { deploymentKeys } from '@/hooks/queries/deployments'
import {
fetchChatHistory,
type StreamSnapshot,
type TaskChatHistory,
type TaskStoredContentBlock,
type TaskStoredFileAttachment,
@@ -85,6 +91,76 @@ const STATE_TO_STATUS: Record<string, ToolCallStatus> = {
const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'])
const RECONNECT_TAIL_ERROR =
'Live reconnect failed before the stream finished. The latest response may be incomplete.'
const TERMINAL_STREAM_STATUSES = new Set(['complete', 'error', 'cancelled'])
interface StreamEventEnvelope {
eventId: number
streamId: string
event: Record<string, unknown>
}
interface StreamBatchResponse {
success: boolean
events: StreamEventEnvelope[]
status: string
}
interface StreamTerminationResult {
sawStreamError: boolean
sawDoneEvent: boolean
lastEventId: number
}
interface StreamProcessingOptions {
expectedGen?: number
initialLastEventId?: number
preserveExistingState?: boolean
}
interface AttachToStreamOptions {
streamId: string
assistantId: string
expectedGen: number
snapshot?: StreamSnapshot | null
initialLastEventId?: number
}
interface AttachToStreamResult {
aborted: boolean
error: boolean
}
interface PendingStreamRecovery {
streamId: string
snapshot?: StreamSnapshot | null
}
function isTerminalStreamStatus(status?: string | null): boolean {
return Boolean(status && TERMINAL_STREAM_STATUSES.has(status))
}
function isActiveStreamConflictError(input: unknown): boolean {
if (typeof input !== 'string') return false
return input.includes('A response is already in progress for this chat')
}
function buildReplayStream(events: StreamEventEnvelope[]): ReadableStream<Uint8Array> {
const encoder = new TextEncoder()
return new ReadableStream<Uint8Array>({
start(controller) {
if (events.length > 0) {
const payload = events
.map(
(entry) =>
`data: ${JSON.stringify({ ...entry.event, eventId: entry.eventId, streamId: entry.streamId })}\n\n`
)
.join('')
controller.enqueue(encoder.encode(payload))
}
controller.close()
},
})
}
function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
const mapped: ContentBlock = {
@@ -339,15 +415,22 @@ export function useChat(
const [messageQueue, setMessageQueue] = useState<QueuedMessage[]>([])
const messageQueueRef = useRef<QueuedMessage[]>([])
messageQueueRef.current = messageQueue
const [pendingRecoveryMessage, setPendingRecoveryMessage] = useState<QueuedMessage | null>(null)
const pendingRecoveryMessageRef = useRef<QueuedMessage | null>(null)
pendingRecoveryMessageRef.current = pendingRecoveryMessage
const sendMessageRef = useRef<UseChatReturn['sendMessage']>(async () => {})
const processSSEStreamRef = useRef<
(
reader: ReadableStreamDefaultReader<Uint8Array>,
assistantId: string,
expectedGen?: number
) => Promise<boolean>
>(async () => false)
options?: StreamProcessingOptions
) => Promise<StreamTerminationResult>
>(async () => ({
sawStreamError: false,
sawDoneEvent: false,
lastEventId: 0,
}))
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
const abortControllerRef = useRef<AbortController | null>(null)
@@ -359,10 +442,12 @@ export function useChat(
const appliedChatIdRef = useRef<string | undefined>(undefined)
const pendingUserMsgRef = useRef<{ id: string; content: string } | null>(null)
const streamIdRef = useRef<string | undefined>(undefined)
const lastEventIdRef = useRef(0)
const sendingRef = useRef(false)
const streamGenRef = useRef(0)
const streamingContentRef = useRef('')
const streamingBlocksRef = useRef<ContentBlock[]>([])
const clientExecutionStartedRef = useRef<Set<string>>(new Set())
const executionStream = useExecutionStream()
const isHomePage = pathname.endsWith('/home')
@@ -420,6 +505,10 @@ export function useChat(
abortControllerRef.current = null
sendingRef.current = false
setIsSending(false)
setIsReconnecting(false)
lastEventIdRef.current = 0
pendingRecoveryMessageRef.current = null
setPendingRecoveryMessage(null)
if (abandonedChatId) {
queryClient.invalidateQueries({ queryKey: taskKeys.detail(abandonedChatId) })
}
@@ -441,6 +530,10 @@ export function useChat(
setStreamingFile(null)
streamingFileRef.current = null
setMessageQueue([])
lastEventIdRef.current = 0
clientExecutionStartedRef.current.clear()
pendingRecoveryMessageRef.current = null
setPendingRecoveryMessage(null)
}, [initialChatId, queryClient])
useEffect(() => {
@@ -461,180 +554,374 @@ export function useChat(
setStreamingFile(null)
streamingFileRef.current = null
setMessageQueue([])
lastEventIdRef.current = 0
clientExecutionStartedRef.current.clear()
pendingRecoveryMessageRef.current = null
setPendingRecoveryMessage(null)
}, [isHomePage])
const fetchStreamBatch = useCallback(
async (
streamId: string,
fromEventId: number,
signal?: AbortSignal
): Promise<StreamBatchResponse> => {
const response = await fetch(
`${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${fromEventId}&batch=true`,
{ signal }
)
if (!response.ok) {
throw new Error(`Stream resume batch failed: ${response.status}`)
}
return response.json()
},
[]
)
const attachToExistingStream = useCallback(
async ({
streamId,
assistantId,
expectedGen,
snapshot,
initialLastEventId = 0,
}: AttachToStreamOptions): Promise<AttachToStreamResult> => {
let latestEventId = initialLastEventId
let seedEvents = snapshot?.events ?? []
let streamStatus = snapshot?.status ?? 'unknown'
let attachAttempt = 0
setIsSending(true)
setIsReconnecting(true)
setError(null)
logger.info('Attaching to existing stream', {
streamId,
expectedGen,
initialLastEventId,
seedEventCount: seedEvents.length,
streamStatus,
})
try {
while (streamGenRef.current === expectedGen) {
if (seedEvents.length > 0) {
const replayResult = await processSSEStreamRef.current(
buildReplayStream(seedEvents).getReader(),
assistantId,
{
expectedGen,
initialLastEventId: latestEventId,
preserveExistingState: true,
}
)
latestEventId = Math.max(
replayResult.lastEventId,
seedEvents[seedEvents.length - 1]?.eventId ?? latestEventId
)
lastEventIdRef.current = latestEventId
seedEvents = []
if (replayResult.sawStreamError) {
logger.warn('Replay stream ended with error event', { streamId, latestEventId })
return { aborted: false, error: true }
}
}
if (isTerminalStreamStatus(streamStatus)) {
logger.info('Existing stream already reached terminal status', {
streamId,
latestEventId,
streamStatus,
})
if (streamStatus === 'error') {
setError(RECONNECT_TAIL_ERROR)
}
return { aborted: false, error: streamStatus === 'error' }
}
const activeAbortController = abortControllerRef.current
if (!activeAbortController) {
return { aborted: true, error: false }
}
logger.info('Opening live stream tail', {
streamId,
fromEventId: latestEventId,
attempt: attachAttempt,
})
const sseRes = await fetch(
`${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${latestEventId}`,
{ signal: activeAbortController.signal }
)
if (!sseRes.ok || !sseRes.body) {
throw new Error(RECONNECT_TAIL_ERROR)
}
setIsReconnecting(false)
const liveResult = await processSSEStreamRef.current(
sseRes.body.getReader(),
assistantId,
{
expectedGen,
initialLastEventId: latestEventId,
preserveExistingState: true,
}
)
latestEventId = Math.max(latestEventId, liveResult.lastEventId)
lastEventIdRef.current = latestEventId
if (liveResult.sawStreamError) {
logger.warn('Live stream tail ended with error event', { streamId, latestEventId })
return { aborted: false, error: true }
}
attachAttempt += 1
setIsReconnecting(true)
logger.warn('Live stream ended without terminal event, fetching replay batch', {
streamId,
latestEventId,
attempt: attachAttempt,
})
const batch = await fetchStreamBatch(
streamId,
latestEventId,
activeAbortController.signal
)
seedEvents = batch.events
streamStatus = batch.status
if (batch.events.length > 0) {
latestEventId = batch.events[batch.events.length - 1].eventId
lastEventIdRef.current = latestEventId
}
logger.info('Fetched replay batch after non-terminal stream close', {
streamId,
latestEventId,
streamStatus,
eventCount: batch.events.length,
attempt: attachAttempt,
})
if (batch.events.length === 0 && !isTerminalStreamStatus(batch.status)) {
logger.info('No new replay events yet; reopening active stream tail', {
streamId,
latestEventId,
streamStatus,
attempt: attachAttempt,
})
if (activeAbortController.signal.aborted || streamGenRef.current !== expectedGen) {
return { aborted: true, error: false }
}
}
}
return { aborted: true, error: false }
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') {
return { aborted: true, error: false }
}
logger.warn('Failed to attach to existing stream', {
streamId,
latestEventId,
error: err instanceof Error ? err.message : String(err),
})
setError(err instanceof Error ? err.message : RECONNECT_TAIL_ERROR)
return { aborted: false, error: true }
} finally {
setIsReconnecting(false)
}
},
[fetchStreamBatch]
)
const applyChatHistorySnapshot = useCallback(
(history: TaskChatHistory, options?: { preserveActiveStreamingMessage?: boolean }) => {
const preserveActiveStreamingMessage = options?.preserveActiveStreamingMessage ?? false
const activeStreamId = history.activeStreamId
appliedChatIdRef.current = history.id
const mappedMessages = history.messages.map(mapStoredMessage)
const shouldPreserveActiveStreamingMessage =
preserveActiveStreamingMessage &&
sendingRef.current &&
Boolean(activeStreamId) &&
activeStreamId === streamIdRef.current
if (shouldPreserveActiveStreamingMessage) {
setMessages((prev) => {
const localStreamingAssistant = prev[prev.length - 1]
if (localStreamingAssistant?.role !== 'assistant') {
return mappedMessages
}
const nextMessages =
mappedMessages[mappedMessages.length - 1]?.role === 'assistant'
? mappedMessages.slice(0, -1)
: mappedMessages
return [...nextMessages, localStreamingAssistant]
})
} else {
setMessages(mappedMessages)
}
if (history.resources.some((r) => r.id === 'streaming-file')) {
fetch('/api/copilot/chat/resources', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
chatId: history.id,
resourceType: 'file',
resourceId: 'streaming-file',
}),
}).catch(() => {})
}
const persistedResources = history.resources.filter((r) => r.id !== 'streaming-file')
if (persistedResources.length > 0) {
setResources(persistedResources)
setActiveResourceId(persistedResources[persistedResources.length - 1].id)
for (const resource of persistedResources) {
if (resource.type !== 'workflow') continue
ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
}
} else if (history.resources.some((r) => r.id === 'streaming-file')) {
setResources([])
setActiveResourceId(null)
}
},
[workspaceId]
)
const preparePendingStreamRecovery = useCallback(
async (chatId: string): Promise<PendingStreamRecovery | null> => {
const latestHistory = await fetchChatHistory(chatId)
queryClient.setQueryData(taskKeys.detail(chatId), latestHistory)
applyChatHistorySnapshot(latestHistory)
if (!latestHistory.activeStreamId) {
return null
}
return {
streamId: latestHistory.activeStreamId,
snapshot: latestHistory.streamSnapshot,
}
},
[applyChatHistorySnapshot, queryClient]
)
useEffect(() => {
if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return
const activeStreamId = chatHistory.activeStreamId
const snapshot = chatHistory.streamSnapshot
appliedChatIdRef.current = chatHistory.id
const mappedMessages = chatHistory.messages.map(mapStoredMessage)
const shouldPreserveActiveStreamingMessage =
sendingRef.current && Boolean(activeStreamId) && activeStreamId === streamIdRef.current
if (shouldPreserveActiveStreamingMessage) {
setMessages((prev) => {
const localStreamingAssistant = prev[prev.length - 1]
if (localStreamingAssistant?.role !== 'assistant') {
return mappedMessages
}
const nextMessages =
mappedMessages[mappedMessages.length - 1]?.role === 'assistant'
? mappedMessages.slice(0, -1)
: mappedMessages
return [...nextMessages, localStreamingAssistant]
})
} else {
setMessages(mappedMessages)
}
if (chatHistory.resources.some((r) => r.id === 'streaming-file')) {
fetch('/api/copilot/chat/resources', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
chatId: chatHistory.id,
resourceType: 'file',
resourceId: 'streaming-file',
}),
}).catch(() => {})
}
const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file')
if (persistedResources.length > 0) {
setResources(persistedResources)
setActiveResourceId(persistedResources[persistedResources.length - 1].id)
for (const resource of persistedResources) {
if (resource.type !== 'workflow') continue
ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
}
} else if (chatHistory.resources.some((r) => r.id === 'streaming-file')) {
setResources([])
setActiveResourceId(null)
}
applyChatHistorySnapshot(chatHistory, { preserveActiveStreamingMessage: true })
if (activeStreamId && !sendingRef.current) {
const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
streamIdRef.current = activeStreamId
lastEventIdRef.current = snapshot?.events?.[snapshot.events.length - 1]?.eventId ?? 0
sendingRef.current = true
setIsReconnecting(true)
streamingContentRef.current = ''
streamingBlocksRef.current = []
clientExecutionStartedRef.current.clear()
const assistantId = crypto.randomUUID()
const reconnect = async () => {
let reconnectFailed = false
try {
const encoder = new TextEncoder()
const batchEvents = snapshot?.events ?? []
const streamStatus = snapshot?.status ?? ''
if (batchEvents.length === 0 && streamStatus === 'unknown') {
reconnectFailed = true
setError(RECONNECT_TAIL_ERROR)
return
}
setIsSending(true)
setIsReconnecting(false)
const lastEventId =
batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0
const isStreamDone =
streamStatus === 'complete' || streamStatus === 'error' || streamStatus === 'cancelled'
const combinedStream = new ReadableStream<Uint8Array>({
async start(controller) {
if (batchEvents.length > 0) {
const sseText = batchEvents
.map((e) => `data: ${JSON.stringify(e.event)}\n`)
.join('\n')
controller.enqueue(encoder.encode(`${sseText}\n`))
}
if (!isStreamDone) {
try {
const sseRes = await fetch(
`/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`,
{ signal: abortController.signal }
)
if (!sseRes.ok || !sseRes.body) {
reconnectFailed = true
logger.warn('SSE tail reconnect returned no readable body', {
status: sseRes.status,
streamId: activeStreamId,
})
setError(RECONNECT_TAIL_ERROR)
} else {
const reader = sseRes.body.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
controller.enqueue(value)
}
}
} catch (err) {
if (!(err instanceof Error && err.name === 'AbortError')) {
reconnectFailed = true
logger.warn('SSE tail failed during reconnect', err)
setError(RECONNECT_TAIL_ERROR)
}
}
}
controller.close()
},
})
const hadStreamError = await processSSEStreamRef.current(
combinedStream.getReader(),
const result = await attachToExistingStream({
streamId: activeStreamId,
assistantId,
gen
)
if (hadStreamError) {
reconnectFailed = true
expectedGen: gen,
snapshot,
initialLastEventId: lastEventIdRef.current,
})
if (streamGenRef.current === gen && !result.aborted) {
finalizeRef.current(result.error ? { error: true } : undefined)
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
reconnectFailed = true
} finally {
setIsReconnecting(false)
logger.warn('Unexpected error during reconnect', {
streamId: activeStreamId,
chatId: chatHistory.id,
error: err instanceof Error ? err.message : String(err),
})
if (streamGenRef.current === gen) {
finalizeRef.current(reconnectFailed ? { error: true } : undefined)
try {
finalizeRef.current({ error: true })
} catch (finalizeError) {
logger.error('Reconnect fallback finalize failed', {
streamId: activeStreamId,
chatId: chatHistory.id,
error:
finalizeError instanceof Error ? finalizeError.message : String(finalizeError),
})
sendingRef.current = false
setIsSending(false)
setIsReconnecting(false)
abortControllerRef.current = null
setError('Failed to reconnect to the active stream')
}
}
} finally {
if (abortControllerRef.current === abortController) {
abortControllerRef.current = null
}
}
}
reconnect()
}
}, [chatHistory, workspaceId, queryClient])
}, [applyChatHistorySnapshot, attachToExistingStream, chatHistory, queryClient])
const processSSEStream = useCallback(
async (
reader: ReadableStreamDefaultReader<Uint8Array>,
assistantId: string,
expectedGen?: number
options?: StreamProcessingOptions
) => {
const { expectedGen, initialLastEventId = 0, preserveExistingState = false } = options ?? {}
const decoder = new TextDecoder()
streamReaderRef.current = reader
let buffer = ''
const blocks: ContentBlock[] = []
const blocks: ContentBlock[] = preserveExistingState ? [...streamingBlocksRef.current] : []
const toolMap = new Map<string, number>()
const toolArgsMap = new Map<string, Record<string, unknown>>()
const clientExecutionStarted = new Set<string>()
const clientExecutionStarted = clientExecutionStartedRef.current
let activeSubagent: string | undefined
let activeCompactionId: string | undefined
let runningText = ''
let runningText = preserveExistingState ? streamingContentRef.current : ''
let lastContentSource: 'main' | 'subagent' | null = null
let streamRequestId: string | undefined
let lastEventId = initialLastEventId
let sawDoneEvent = false
streamingContentRef.current = ''
streamingBlocksRef.current = []
if (!preserveExistingState) {
streamingContentRef.current = ''
streamingBlocksRef.current = []
}
for (const [index, block] of blocks.entries()) {
if (block.type === 'tool_call' && block.toolCall?.id) {
toolMap.set(block.toolCall.id, index)
if (block.toolCall.params) {
toolArgsMap.set(block.toolCall.id, block.toolCall.params)
}
}
}
const ensureTextBlock = (): ContentBlock => {
const last = blocks[blocks.length - 1]
@@ -716,6 +1003,14 @@ export function useChat(
continue
}
if (typeof (parsed as SSEPayload & { eventId?: unknown }).eventId === 'number') {
lastEventId = Math.max(
lastEventId,
(parsed as SSEPayload & { eventId: number }).eventId
)
lastEventIdRef.current = lastEventId
}
logger.debug('SSE event received', parsed)
switch (parsed.type) {
case 'chat_id': {
@@ -1167,6 +1462,10 @@ export function useChat(
appendInlineErrorTag(buildInlineErrorTag(parsed))
break
}
case 'done': {
sawDoneEvent = true
break
}
}
}
}
@@ -1175,7 +1474,11 @@ export function useChat(
streamReaderRef.current = null
}
}
return sawStreamError
return {
sawStreamError,
sawDoneEvent,
lastEventId,
}
},
[workspaceId, queryClient, addResource, removeResource]
)
@@ -1247,6 +1550,16 @@ export function useChat(
const messagesRef = useRef(messages)
messagesRef.current = messages
const visibleMessageQueue = useMemo(
() =>
pendingRecoveryMessage
? [
pendingRecoveryMessage,
...messageQueue.filter((msg) => msg.id !== pendingRecoveryMessage.id),
]
: messageQueue,
[messageQueue, pendingRecoveryMessage]
)
const finalize = useCallback(
(options?: { error?: boolean }) => {
@@ -1267,6 +1580,21 @@ export function useChat(
return
}
const recoveryMessage = pendingRecoveryMessageRef.current
if (recoveryMessage) {
setPendingRecoveryMessage(null)
const gen = streamGenRef.current
queueMicrotask(() => {
if (streamGenRef.current !== gen) return
sendMessageRef.current(
recoveryMessage.content,
recoveryMessage.fileAttachments,
recoveryMessage.contexts
)
})
return
}
const next = messageQueueRef.current[0]
if (next) {
setMessageQueue((prev) => prev.filter((m) => m.id !== next.id))
@@ -1307,6 +1635,8 @@ export function useChat(
pendingUserMsgRef.current = { id: userMessageId, content: message }
streamIdRef.current = userMessageId
lastEventIdRef.current = 0
clientExecutionStartedRef.current.clear()
const storedAttachments: TaskStoredFileAttachment[] | undefined =
fileAttachments && fileAttachments.length > 0
@@ -1320,6 +1650,9 @@ export function useChat(
: undefined
const requestChatId = selectedChatIdRef.current ?? chatIdRef.current
const previousChatHistory = requestChatId
? queryClient.getQueryData<TaskChatHistory>(taskKeys.detail(requestChatId))
: undefined
if (requestChatId) {
const cachedUserMsg: TaskStoredMessage = {
id: userMessageId,
@@ -1339,6 +1672,7 @@ export function useChat(
}
const userAttachments = storedAttachments?.map(toDisplayAttachment)
const previousMessages = messagesRef.current
const messageContexts = contexts?.map((c) => ({
kind: c.kind,
@@ -1402,20 +1736,132 @@ export function useChat(
if (!response.body) throw new Error('No response body')
const hadStreamError = await processSSEStream(response.body.getReader(), assistantId, gen)
const termination = await processSSEStream(response.body.getReader(), assistantId, {
expectedGen: gen,
})
if (streamGenRef.current === gen) {
finalize(hadStreamError ? { error: true } : undefined)
if (termination.sawStreamError) {
finalize({ error: true })
return
}
const batch = await fetchStreamBatch(
userMessageId,
termination.lastEventId,
abortController.signal
)
if (streamGenRef.current !== gen) {
return
}
if (isTerminalStreamStatus(batch.status)) {
finalize(batch.status === 'error' ? { error: true } : undefined)
return
}
logger.warn(
'Primary stream ended without terminal event, attempting in-place reconnect',
{
streamId: userMessageId,
lastEventId: termination.lastEventId,
streamStatus: batch.status,
sawDoneEvent: termination.sawDoneEvent,
}
)
const reconnectResult = await attachToExistingStream({
streamId: userMessageId,
assistantId,
expectedGen: gen,
snapshot: {
events: batch.events,
status: batch.status,
},
initialLastEventId:
batch.events[batch.events.length - 1]?.eventId ?? termination.lastEventId,
})
if (streamGenRef.current === gen && !reconnectResult.aborted) {
finalize(reconnectResult.error ? { error: true } : undefined)
}
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
setError(err instanceof Error ? err.message : 'Failed to send message')
const errorMessage = err instanceof Error ? err.message : 'Failed to send message'
if (requestChatId && isActiveStreamConflictError(errorMessage)) {
logger.info('Active stream conflict detected while sending message; reattaching', {
chatId: requestChatId,
attemptedStreamId: userMessageId,
})
if (previousChatHistory) {
queryClient.setQueryData(taskKeys.detail(requestChatId), previousChatHistory)
}
setMessages(previousMessages)
const queuedMessage: QueuedMessage = {
id: crypto.randomUUID(),
content: message,
fileAttachments,
contexts,
}
pendingRecoveryMessageRef.current = queuedMessage
setPendingRecoveryMessage(queuedMessage)
try {
const pendingRecovery = await preparePendingStreamRecovery(requestChatId)
if (!pendingRecovery) {
setError(errorMessage)
if (streamGenRef.current === gen) {
finalize({ error: true })
}
return
}
streamIdRef.current = pendingRecovery.streamId
lastEventIdRef.current =
pendingRecovery.snapshot?.events?.[pendingRecovery.snapshot.events.length - 1]
?.eventId ?? 0
const rehydratedMessages = messagesRef.current
const lastAssistantMsg = [...rehydratedMessages]
.reverse()
.find((m) => m.role === 'assistant')
const recoveryAssistantId = lastAssistantMsg?.id ?? assistantId
const reconnectResult = await attachToExistingStream({
streamId: pendingRecovery.streamId,
assistantId: recoveryAssistantId,
expectedGen: gen,
snapshot: pendingRecovery.snapshot,
initialLastEventId: lastEventIdRef.current,
})
if (streamGenRef.current === gen && !reconnectResult.aborted) {
finalize(reconnectResult.error ? { error: true } : undefined)
}
return
} catch (recoveryError) {
logger.warn('Failed to recover active stream after conflict', {
chatId: requestChatId,
error: recoveryError instanceof Error ? recoveryError.message : String(recoveryError),
})
}
}
setError(errorMessage)
if (streamGenRef.current === gen) {
finalize({ error: true })
}
return
}
},
[workspaceId, queryClient, processSSEStream, finalize]
[
workspaceId,
queryClient,
processSSEStream,
finalize,
attachToExistingStream,
preparePendingStreamRecovery,
]
)
sendMessageRef.current = sendMessage
@@ -1434,6 +1880,10 @@ export function useChat(
abortControllerRef.current = null
sendingRef.current = false
setIsSending(false)
setIsReconnecting(false)
lastEventIdRef.current = 0
pendingRecoveryMessageRef.current = null
setPendingRecoveryMessage(null)
setMessages((prev) =>
prev.map((msg) => {
@@ -1521,24 +1971,47 @@ export function useChat(
}, [invalidateChatQueries, persistPartialResponse, executionStream])
const removeFromQueue = useCallback((id: string) => {
if (pendingRecoveryMessageRef.current?.id === id) {
pendingRecoveryMessageRef.current = null
setPendingRecoveryMessage(null)
return
}
messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
setMessageQueue((prev) => prev.filter((m) => m.id !== id))
}, [])
const sendNow = useCallback(
async (id: string) => {
const msg = messageQueueRef.current.find((m) => m.id === id)
const recoveryMessage = pendingRecoveryMessageRef.current
const msg =
recoveryMessage?.id === id
? recoveryMessage
: messageQueueRef.current.find((m) => m.id === id)
if (!msg) return
// Eagerly update ref so a rapid second click finds the message already gone
messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
if (recoveryMessage?.id === id) {
pendingRecoveryMessageRef.current = null
setPendingRecoveryMessage(null)
} else {
messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
}
await stopGeneration()
setMessageQueue((prev) => prev.filter((m) => m.id !== id))
if (recoveryMessage?.id !== id) {
setMessageQueue((prev) => prev.filter((m) => m.id !== id))
}
await sendMessage(msg.content, msg.fileAttachments, msg.contexts)
},
[stopGeneration, sendMessage]
)
const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => {
const recoveryMessage = pendingRecoveryMessageRef.current
if (recoveryMessage?.id === id) {
pendingRecoveryMessageRef.current = null
setPendingRecoveryMessage(null)
return recoveryMessage
}
const msg = messageQueueRef.current.find((m) => m.id === id)
if (!msg) return undefined
messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
@@ -1552,6 +2025,9 @@ export function useChat(
abortControllerRef.current = null
streamGenRef.current++
sendingRef.current = false
lastEventIdRef.current = 0
clientExecutionStartedRef.current.clear()
pendingRecoveryMessageRef.current = null
}
}, [])
@@ -1569,7 +2045,7 @@ export function useChat(
addResource,
removeResource,
reorderResources,
messageQueue,
messageQueue: visibleMessageQueue,
removeFromQueue,
sendNow,
editQueuedMessage,

View File

@@ -318,6 +318,7 @@ export const Panel = memo(function Panel() {
const {
messages: copilotMessages,
isSending: copilotIsSending,
isReconnecting: copilotIsReconnecting,
sendMessage: copilotSendMessage,
stopGeneration: copilotStopGeneration,
resolvedChatId: copilotResolvedChatId,
@@ -812,6 +813,7 @@ export const Panel = memo(function Panel() {
className='min-h-0 flex-1'
messages={copilotMessages}
isSending={copilotIsSending}
isReconnecting={copilotIsReconnecting}
onSubmit={handleCopilotSubmit}
onStopGeneration={copilotStopGeneration}
messageQueue={copilotMessageQueue}

View File

@@ -129,7 +129,10 @@ export function useTasks(workspaceId?: string) {
})
}
async function fetchChatHistory(chatId: string, signal?: AbortSignal): Promise<TaskChatHistory> {
export async function fetchChatHistory(
chatId: string,
signal?: AbortSignal
): Promise<TaskChatHistory> {
const response = await fetch(`/api/copilot/chat?chatId=${chatId}`, { signal })
if (!response.ok) {

View File

@@ -294,6 +294,21 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
return new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
const markClientDisconnected = (reason: string) => {
if (clientDisconnected) return
clientDisconnected = true
logger.info(
appendCopilotLogContext('Client disconnected from live SSE stream', {
requestId,
messageId,
}),
{
streamId,
runId,
reason,
}
)
}
await resetStreamBuffer(streamId)
await setStreamMeta(streamId, { status: 'active', userId, executionId, runId })
@@ -381,7 +396,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
)
}
} catch {
clientDisconnected = true
markClientDisconnected('enqueue_failed')
}
}
@@ -424,7 +439,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
try {
controller.enqueue(encoder.encode(': keepalive\n\n'))
} catch {
clientDisconnected = true
markClientDisconnected('keepalive_failed')
}
}, 15_000)
@@ -498,6 +513,18 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
await eventWriter.close()
await setStreamMeta(streamId, { status: 'complete', userId, executionId, runId })
await updateRunStatus(runId, 'complete', { completedAt: new Date() }).catch(() => {})
if (clientDisconnected) {
logger.info(
appendCopilotLogContext('Orchestration completed after client disconnect', {
requestId,
messageId,
}),
{
streamId,
runId,
}
)
}
} catch (error) {
if (abortController.signal.aborted) {
logger.error(
@@ -544,6 +571,12 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
error: errorMessage,
}).catch(() => {})
} finally {
logger.info(appendCopilotLogContext('Closing live SSE stream', { requestId, messageId }), {
streamId,
runId,
clientDisconnected,
aborted: abortController.signal.aborted,
})
clearInterval(keepaliveInterval)
if (abortPoller) {
clearInterval(abortPoller)
@@ -566,6 +599,16 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
}
},
cancel() {
logger.info(
appendCopilotLogContext('ReadableStream cancel received from client', {
requestId,
messageId,
}),
{
streamId,
runId,
}
)
clientDisconnected = true
if (eventWriter) {
eventWriter.flush().catch(() => {})