Fix stream reconnect

This commit is contained in:
Siddharth Ganesan
2026-04-04 17:26:36 -07:00
parent fefeb010de
commit 8848780f56
2 changed files with 313 additions and 168 deletions

View File

@@ -86,6 +86,7 @@ export async function GET(request: NextRequest) {
const url = new URL(request.url)
const streamId = url.searchParams.get('streamId') || ''
const afterCursor = url.searchParams.get('after') || ''
const batchMode = url.searchParams.get('batch') === 'true'
if (!streamId) {
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
@@ -101,6 +102,7 @@ export async function GET(request: NextRequest) {
logger.info('[Resume] Stream lookup', {
streamId,
afterCursor,
batchMode,
hasRun: !!run,
runStatus: run?.status,
})
@@ -108,6 +110,27 @@ export async function GET(request: NextRequest) {
return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
}
if (batchMode) {
const afterSeq = afterCursor || '0'
const events = await readEvents(streamId, afterSeq)
const batchEvents = events.map((envelope) => ({
eventId: envelope.seq,
streamId: envelope.stream.streamId,
event: envelope,
}))
logger.info('[Resume] Batch response', {
streamId,
afterCursor: afterSeq,
eventCount: batchEvents.length,
runStatus: run.status,
})
return NextResponse.json({
success: true,
events: batchEvents,
status: run.status,
})
}
const startTime = Date.now()
const stream = new ReadableStream({

View File

@@ -122,10 +122,38 @@ type StreamToolUI = {
clientExecutable?: boolean
}
type StreamRecoveryResult = {
attached: boolean
hadStreamError: boolean
aborted: boolean
type StreamBatchEvent = {
eventId: number
streamId: string
event: Record<string, unknown>
}
type StreamBatchResponse = {
success: boolean
events: StreamBatchEvent[]
status: string
}
const TERMINAL_STREAM_STATUSES = new Set(['complete', 'error', 'cancelled'])
function isTerminalStreamStatus(status: string | null | undefined): boolean {
return TERMINAL_STREAM_STATUSES.has(status ?? '')
}
const sseEncoder = new TextEncoder()
function buildReplayStream(events: StreamBatchEvent[]): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
const payload = events
.map(
(entry) =>
`data: ${JSON.stringify({ ...entry.event, eventId: entry.eventId, streamId: entry.streamId })}\n\n`
)
.join('')
controller.enqueue(sseEncoder.encode(payload))
controller.close()
},
})
}
function asPayloadRecord(value: unknown): StreamPayload | undefined {
@@ -313,15 +341,9 @@ export function useChat(
options?: { preserveExistingState?: boolean }
) => Promise<{ sawStreamError: boolean; sawComplete: boolean }>
>(async () => ({ sawStreamError: false, sawComplete: false }))
const reattachToStreamRef = useRef<
(params: {
assistantId: string
expectedGen: number
abortController: AbortController
preferredStreamId?: string
chatId?: string
}) => Promise<StreamRecoveryResult>
>(async () => ({ attached: false, hadStreamError: false, aborted: true }))
const retryReconnectRef = useRef<
(opts: { streamId: string; assistantId: string; gen: number }) => Promise<boolean>
>(async () => false)
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
const abortControllerRef = useRef<AbortController | null>(null)
@@ -551,25 +573,27 @@ export function useChat(
const abortController = new AbortController()
abortControllerRef.current = abortController
streamIdRef.current = activeStreamId
lastCursorRef.current = '0'
sendingRef.current = true
const assistantId = crypto.randomUUID()
const reconnect = async () => {
const recovery = await reattachToStreamRef.current({
const succeeded = await retryReconnectRef.current({
streamId: activeStreamId,
assistantId,
expectedGen: gen,
abortController,
preferredStreamId: activeStreamId,
chatId: chatHistory.id,
gen,
})
if (recovery.aborted) {
return
}
if (streamGenRef.current === gen) {
finalizeRef.current(
recovery.attached && !recovery.hadStreamError ? undefined : { error: true }
)
if (!succeeded && streamGenRef.current === gen) {
try {
finalizeRef.current({ error: true })
} catch {
sendingRef.current = false
setIsSending(false)
setIsReconnecting(false)
abortControllerRef.current = null
setError('Failed to reconnect to the active stream')
}
}
}
reconnect()
@@ -1256,135 +1280,234 @@ export function useChat(
[queryClient]
)
const reattachToStream = useCallback(
async (params: {
const fetchStreamBatch = useCallback(
async (
streamId: string,
afterCursor: string,
signal?: AbortSignal
): Promise<StreamBatchResponse> => {
const response = await fetch(
`/api/copilot/chat/stream?streamId=${encodeURIComponent(streamId)}&after=${encodeURIComponent(afterCursor)}&batch=true`,
{ signal }
)
if (!response.ok) {
throw new Error(`Stream resume batch failed: ${response.status}`)
}
return response.json()
},
[]
)
const attachToExistingStream = useCallback(
async (opts: {
streamId: string
assistantId: string
expectedGen: number
abortController: AbortController
preferredStreamId?: string
chatId?: string
}): Promise<StreamRecoveryResult> => {
const { assistantId, expectedGen, abortController, preferredStreamId, chatId } = params
let streamId = preferredStreamId
let lastError = RECONNECT_TAIL_ERROR
const isStale = () => streamGenRef.current !== expectedGen
const waitForRetry = async (ms: number) => {
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
abortController.signal.removeEventListener('abort', onAbort)
resolve()
}, ms)
const onAbort = () => {
clearTimeout(timeout)
abortController.signal.removeEventListener('abort', onAbort)
resolve()
}
abortController.signal.addEventListener('abort', onAbort, { once: true })
})
}
initialBatch?: StreamBatchResponse | null
afterCursor?: string
}): Promise<{ error: boolean; aborted: boolean }> => {
const { streamId, assistantId, expectedGen, afterCursor = '0' } = opts
let latestCursor = afterCursor
let seedEvents = opts.initialBatch?.events ?? []
let streamStatus = opts.initialBatch?.status ?? 'unknown'
setIsSending(true)
setIsReconnecting(true)
setError(null)
try {
for (let attempt = 0; attempt <= MAX_RECONNECT_ATTEMPTS; attempt++) {
if (abortController.signal.aborted || isStale()) {
return { attached: false, hadStreamError: false, aborted: true }
}
if (attempt > 0) {
const delayMs = Math.min(
RECONNECT_BASE_DELAY_MS * 2 ** (attempt - 1),
RECONNECT_MAX_DELAY_MS
while (streamGenRef.current === expectedGen) {
if (seedEvents.length > 0) {
const replayResult = await processSSEStreamRef.current(
buildReplayStream(seedEvents).getReader(),
assistantId,
expectedGen,
{ preserveExistingState: true }
)
logger.warn('Reconnect attempt', {
streamId,
attempt,
maxAttempts: MAX_RECONNECT_ATTEMPTS,
delayMs,
})
setIsReconnecting(true)
await waitForRetry(delayMs)
if (abortController.signal.aborted || isStale()) {
return { attached: false, hadStreamError: false, aborted: true }
latestCursor = String(seedEvents[seedEvents.length - 1]?.eventId ?? latestCursor)
lastCursorRef.current = latestCursor
seedEvents = []
if (replayResult.sawStreamError) {
return { error: true, aborted: false }
}
}
if (!streamId && chatId) {
streamId = (await getActiveStreamIdForChat(chatId)) ?? undefined
}
if (!streamId) {
lastError = RECONNECT_TAIL_ERROR
} else {
try {
streamIdRef.current = streamId
const resumeAfter = lastCursorRef.current || '0'
const response = await fetch(
`/api/copilot/chat/stream?streamId=${streamId}&after=${encodeURIComponent(resumeAfter)}`,
{ signal: abortController.signal }
)
if (response.ok && response.body) {
setIsReconnecting(false)
const result = await processSSEStreamRef.current(
response.body.getReader(),
assistantId,
expectedGen,
{ preserveExistingState: true }
)
if (
!result.sawComplete &&
!result.sawStreamError &&
!isStale() &&
!abortController.signal.aborted
) {
continue
}
return { attached: true, hadStreamError: result.sawStreamError, aborted: false }
}
const errorData = await response.json().catch(() => ({}))
lastError =
(typeof errorData.error === 'string' ? errorData.error : undefined) ||
`Reconnect failed: ${response.status}`
if (chatId) {
streamId =
(typeof errorData.activeStreamId === 'string'
? errorData.activeStreamId
: undefined) ||
((await getActiveStreamIdForChat(chatId)) ?? undefined)
}
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
return { attached: false, hadStreamError: false, aborted: true }
}
lastError =
error instanceof Error ? error.message : 'Failed to reconnect to the active stream'
if (chatId) {
streamId = (await getActiveStreamIdForChat(chatId)) ?? streamId
}
if (isTerminalStreamStatus(streamStatus)) {
if (streamStatus === 'error') {
setError(RECONNECT_TAIL_ERROR)
}
return { error: streamStatus === 'error', aborted: false }
}
const activeAbort = abortControllerRef.current
if (!activeAbort || activeAbort.signal.aborted) {
return { error: false, aborted: true }
}
logger.info('Opening live stream tail', { streamId, afterCursor: latestCursor })
const sseRes = await fetch(
`/api/copilot/chat/stream?streamId=${encodeURIComponent(streamId)}&after=${encodeURIComponent(latestCursor)}`,
{ signal: activeAbort.signal }
)
if (!sseRes.ok || !sseRes.body) {
throw new Error(RECONNECT_TAIL_ERROR)
}
}
logger.error('All reconnect attempts exhausted', {
streamId,
maxAttempts: MAX_RECONNECT_ATTEMPTS,
})
setError(lastError)
return { attached: false, hadStreamError: true, aborted: false }
} finally {
if (!abortController.signal.aborted && !isStale()) {
setIsReconnecting(false)
const liveResult = await processSSEStreamRef.current(
sseRes.body.getReader(),
assistantId,
expectedGen,
{ preserveExistingState: true }
)
if (liveResult.sawStreamError) {
return { error: true, aborted: false }
}
if (liveResult.sawComplete) {
return { error: false, aborted: false }
}
setIsReconnecting(true)
latestCursor = lastCursorRef.current || latestCursor
logger.warn('Live stream ended without terminal event, fetching batch', {
streamId,
latestCursor,
})
const batch = await fetchStreamBatch(streamId, latestCursor, activeAbort.signal)
seedEvents = batch.events
streamStatus = batch.status
if (batch.events.length > 0) {
latestCursor = String(batch.events[batch.events.length - 1].eventId)
lastCursorRef.current = latestCursor
}
if (batch.events.length === 0 && !isTerminalStreamStatus(batch.status)) {
if (activeAbort.signal.aborted || streamGenRef.current !== expectedGen) {
return { error: false, aborted: true }
}
}
}
return { error: false, aborted: true }
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') {
return { error: false, aborted: true }
}
throw err
} finally {
setIsReconnecting(false)
}
},
[getActiveStreamIdForChat]
[fetchStreamBatch]
)
reattachToStreamRef.current = reattachToStream
const resumeOrFinalize = useCallback(
async (opts: {
streamId: string
assistantId: string
gen: number
afterCursor: string
signal?: AbortSignal
}): Promise<void> => {
const { streamId, assistantId, gen, afterCursor, signal } = opts
const batch = await fetchStreamBatch(streamId, afterCursor, signal)
if (streamGenRef.current !== gen) return
if (isTerminalStreamStatus(batch.status)) {
if (batch.events.length > 0) {
await processSSEStreamRef.current(
buildReplayStream(batch.events).getReader(),
assistantId,
gen,
{ preserveExistingState: true }
)
}
finalizeRef.current(batch.status === 'error' ? { error: true } : undefined)
return
}
const reconnectResult = await attachToExistingStream({
streamId,
assistantId,
expectedGen: gen,
initialBatch: batch,
afterCursor:
batch.events.length > 0
? String(batch.events[batch.events.length - 1].eventId)
: afterCursor,
})
if (streamGenRef.current === gen && !reconnectResult.aborted) {
finalizeRef.current(reconnectResult.error ? { error: true } : undefined)
}
},
[fetchStreamBatch, attachToExistingStream]
)
const retryReconnect = useCallback(
async (opts: { streamId: string; assistantId: string; gen: number }): Promise<boolean> => {
const { streamId, assistantId, gen } = opts
for (let attempt = 0; attempt <= MAX_RECONNECT_ATTEMPTS; attempt++) {
if (streamGenRef.current !== gen) return true
if (abortControllerRef.current?.signal.aborted) return true
if (attempt > 0) {
const delayMs = Math.min(
RECONNECT_BASE_DELAY_MS * 2 ** (attempt - 1),
RECONNECT_MAX_DELAY_MS
)
logger.warn('Reconnect attempt', {
streamId,
attempt,
maxAttempts: MAX_RECONNECT_ATTEMPTS,
delayMs,
})
setIsReconnecting(true)
await new Promise((resolve) => setTimeout(resolve, delayMs))
if (streamGenRef.current !== gen) return true
if (abortControllerRef.current?.signal.aborted) return true
}
try {
await resumeOrFinalize({
streamId,
assistantId,
gen,
afterCursor: lastCursorRef.current || '0',
signal: abortControllerRef.current?.signal,
})
return true
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return true
logger.warn('Reconnect attempt failed', {
streamId,
attempt: attempt + 1,
error: err instanceof Error ? err.message : String(err),
})
}
}
logger.error('All reconnect attempts exhausted', {
streamId,
maxAttempts: MAX_RECONNECT_ATTEMPTS,
})
setIsReconnecting(false)
return false
},
[resumeOrFinalize]
)
retryReconnectRef.current = retryReconnect
const persistPartialResponse = useCallback(async () => {
const chatId = chatIdRef.current
@@ -1613,17 +1736,19 @@ export function useChat(
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
if (response.status === 409) {
const recovery = await reattachToStream({
const conflictStreamId =
typeof errorData.activeStreamId === 'string'
? errorData.activeStreamId
: userMessageId
streamIdRef.current = conflictStreamId
const succeeded = await retryReconnect({
streamId: conflictStreamId,
assistantId,
expectedGen: gen,
abortController,
preferredStreamId:
typeof errorData.activeStreamId === 'string' ? errorData.activeStreamId : undefined,
chatId: requestChatId,
gen,
})
if (recovery.aborted) return
if (succeeded) return
if (streamGenRef.current === gen) {
finalize(recovery.attached && !recovery.hadStreamError ? undefined : { error: true })
finalize({ error: true })
}
return
}
@@ -1636,41 +1761,38 @@ export function useChat(
if (streamGenRef.current === gen) {
if (streamResult.sawStreamError) {
finalize({ error: true })
} else if (!streamResult.sawComplete) {
const recovery = await reattachToStream({
assistantId,
expectedGen: gen,
abortController,
preferredStreamId: userMessageId,
chatId: requestChatId,
})
if (!recovery.aborted && streamGenRef.current === gen) {
finalize(recovery.attached && !recovery.hadStreamError ? undefined : { error: true })
}
} else {
finalize()
return
}
await resumeOrFinalize({
streamId: streamIdRef.current || userMessageId,
assistantId,
gen,
afterCursor: lastCursorRef.current || '0',
signal: abortController.signal,
})
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
const recovery = await reattachToStream({
assistantId,
expectedGen: gen,
abortController,
preferredStreamId: userMessageId,
chatId: requestChatId,
})
if (recovery.aborted) return
if (!recovery.attached) {
setError(err instanceof Error ? err.message : 'Failed to send message')
const activeStreamId = streamIdRef.current
if (activeStreamId && streamGenRef.current === gen) {
const succeeded = await retryReconnect({
streamId: activeStreamId,
assistantId,
gen,
})
if (succeeded) return
}
setError(err instanceof Error ? err.message : 'Failed to send message')
if (streamGenRef.current === gen) {
finalize(recovery.attached && !recovery.hadStreamError ? undefined : { error: true })
finalize({ error: true })
}
return
}
},
[workspaceId, queryClient, processSSEStream, finalize, reattachToStream]
[workspaceId, queryClient, processSSEStream, finalize, resumeOrFinalize, retryReconnect]
)
sendMessageRef.current = sendMessage