Fix stream buffer

Siddharth Ganesan
2026-02-04 12:33:53 -08:00
parent 8cea43d926
commit 728463ace7
3 changed files with 131 additions and 5 deletions

View File

@@ -26,7 +26,7 @@ import { tools } from '@/tools/registry'
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import {
- appendStreamEvent,
+ createStreamEventWriter,
resetStreamBuffer,
setStreamMeta,
} from '@/lib/copilot/orchestrator/stream-buffer'
@@ -489,24 +489,44 @@ export async function POST(req: NextRequest) {
if (stream) {
const streamId = userMessageIdToUse
let eventWriter: ReturnType<typeof createStreamEventWriter> | null = null
let clientDisconnected = false
const transformedStream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
await resetStreamBuffer(streamId)
await setStreamMeta(streamId, { status: 'active', userId: authenticatedUserId })
eventWriter = createStreamEventWriter(streamId)
const shouldFlushEvent = (event: Record<string, any>) =>
event.type === 'tool_call' ||
event.type === 'tool_result' ||
event.type === 'tool_error' ||
event.type === 'subagent_end' ||
event.type === 'structured_result' ||
event.type === 'subagent_result' ||
event.type === 'done' ||
event.type === 'error'
const pushEvent = async (event: Record<string, any>) => {
- const entry = await appendStreamEvent(streamId, event)
+ if (!eventWriter) return
+ const entry = await eventWriter.write(event)
+ if (shouldFlushEvent(event)) {
+ await eventWriter.flush()
+ }
const payload = {
...event,
eventId: entry.eventId,
streamId,
}
try {
- controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
+ if (!clientDisconnected) {
+ controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
+ }
} catch {
// Client disconnected - keep buffering
clientDisconnected = true
await eventWriter.flush()
}
}
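
Each frame the route emits is a plain SSE `data:` payload carrying the buffered `eventId` alongside the original event. A minimal consumption sketch, assuming a fetch-based client and this payload shape (the endpoint path and request body are assumptions, not the repo's actual API):

```ts
// Sketch: consume the SSE frames emitted above and track the last eventId
// so a dropped connection can later be resumed from the Redis buffer.
// Endpoint path and request body are assumptions.
async function consumeCopilotStream(body: unknown): Promise<number> {
  const res = await fetch('/api/copilot/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  })
  const reader = res.body!.getReader()
  const decoder = new TextDecoder()
  let buffered = ''
  let lastEventId = 0
  while (true) {
    const { value, done } = await reader.read()
    if (done) break
    buffered += decoder.decode(value, { stream: true })
    const frames = buffered.split('\n\n')
    buffered = frames.pop() ?? '' // keep a partial frame for the next chunk
    for (const frame of frames) {
      if (!frame.startsWith('data: ')) continue
      const payload = JSON.parse(frame.slice('data: '.length))
      lastEventId = payload.eventId // id assigned by the stream-buffer writer
    }
  }
  return lastEventId
}
```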
@@ -554,9 +574,11 @@ export async function POST(req: NextRequest) {
})
.where(eq(copilotChats.id, actualChatId!))
}
await eventWriter.close()
await setStreamMeta(streamId, { status: 'complete', userId: authenticatedUserId })
} catch (error) {
logger.error(`[${tracker.requestId}] Orchestration error:`, error)
await eventWriter.close()
await setStreamMeta(streamId, {
status: 'error',
userId: authenticatedUserId,
@@ -572,6 +594,12 @@ export async function POST(req: NextRequest) {
controller.close()
}
},
async cancel() {
clientDisconnected = true
if (eventWriter) {
await eventWriter.flush()
}
},
})
return new Response(transformedStream, {
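
The hunk is cut off before the `Response` init. For orientation, a conventional init for a server-sent-events response looks like this sketch; the headers are standard SSE practice, not values read from the commit:

```ts
// Sketch: a conventional Response init for SSE. The commit's actual
// headers are outside the diff context, so these values are assumptions.
return new Response(transformedStream, {
  headers: {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-transform',
    Connection: 'keep-alive',
  },
})
```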

View File

@@ -5,6 +5,9 @@ const logger = createLogger('CopilotStreamBuffer')
const STREAM_TTL_SECONDS = 60 * 60
const STREAM_EVENT_LIMIT = 5000
const STREAM_RESERVE_BATCH = 200
const STREAM_FLUSH_INTERVAL_MS = 15
const STREAM_FLUSH_MAX_BATCH = 200
const APPEND_STREAM_EVENT_LUA = `
local seqKey = KEYS[1]
@@ -56,6 +59,12 @@ export type StreamEventEntry = {
event: Record<string, any>
}
export type StreamEventWriter = {
write: (event: Record<string, any>) => Promise<StreamEventEntry>
flush: () => Promise<void>
close: () => Promise<void>
}
export async function resetStreamBuffer(streamId: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
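
The `StreamEventWriter` contract separates enqueueing from durability: `write` buffers in memory and resolves with the reserved id, `flush` forces the pending batch into Redis, and `close` performs a final flush. A small lifecycle sketch under those assumptions (the event shape is illustrative):

```ts
// Sketch: writer lifecycle as defined by the StreamEventWriter type above.
const writer = createStreamEventWriter('stream-123')

// write() resolves once the entry is buffered in memory; the returned
// entry carries the eventId reserved for it.
const entry = await writer.write({ type: 'content', text: 'hello' })
console.log(entry.eventId) // e.g. 1

// flush() pushes the pending batch to Redis immediately; the route calls
// it for significant events such as tool_call, done, and error.
await writer.flush()

// close() clears the pending timer and performs a final flush.
await writer.close()
```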
@@ -140,6 +149,95 @@ export async function appendStreamEvent(
}
}
export function createStreamEventWriter(streamId: string): StreamEventWriter {
const redis = getRedisClient()
if (!redis) {
return {
write: async (event) => ({ eventId: 0, streamId, event }),
flush: async () => {},
close: async () => {},
}
}
let pending: StreamEventEntry[] = []
let nextEventId = 0
let maxReservedId = 0
let flushTimer: ReturnType<typeof setTimeout> | null = null
let isFlushing = false
const scheduleFlush = () => {
if (flushTimer) return
flushTimer = setTimeout(() => {
flushTimer = null
void flush()
}, STREAM_FLUSH_INTERVAL_MS)
}
const reserveIds = async (minCount: number) => {
const reserveCount = Math.max(STREAM_RESERVE_BATCH, minCount)
const newMax = await redis.incrby(getSeqKey(streamId), reserveCount)
const startId = newMax - reserveCount + 1
if (nextEventId === 0 || nextEventId > maxReservedId) {
nextEventId = startId
maxReservedId = newMax
}
}
const flush = async () => {
if (isFlushing || pending.length === 0) return
isFlushing = true
const batch = pending
pending = []
try {
const key = getEventsKey(streamId)
const zaddArgs: (string | number)[] = []
for (const entry of batch) {
zaddArgs.push(entry.eventId, JSON.stringify(entry))
}
const pipeline = redis.pipeline()
pipeline.zadd(key, ...(zaddArgs as any))
pipeline.expire(key, STREAM_TTL_SECONDS)
pipeline.expire(getSeqKey(streamId), STREAM_TTL_SECONDS)
pipeline.zremrangebyrank(key, 0, -STREAM_EVENT_LIMIT - 1)
await pipeline.exec()
} catch (error) {
logger.warn('Failed to flush stream events', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
pending = batch.concat(pending)
} finally {
isFlushing = false
if (pending.length > 0) scheduleFlush()
}
}
const write = async (event: Record<string, any>) => {
if (nextEventId === 0 || nextEventId > maxReservedId) {
await reserveIds(1)
}
const eventId = nextEventId++
const entry: StreamEventEntry = { eventId, streamId, event }
pending.push(entry)
if (pending.length >= STREAM_FLUSH_MAX_BATCH) {
await flush()
} else {
scheduleFlush()
}
return entry
}
const close = async () => {
if (flushTimer) {
clearTimeout(flushTimer)
flushTimer = null
}
await flush()
}
return { write, flush, close }
}
export async function readStreamEvents(
streamId: string,
afterEventId: number

View File

@@ -3039,7 +3039,7 @@ export const useCopilotStore = create<CopilotStore>()(
})
const batchUrl = `/api/copilot/chat/stream?streamId=${encodeURIComponent(
nextStream.streamId
- )}&from=0&batch=true`
+ )}&from=0&to=${encodeURIComponent(String(nextStream.lastEventId))}&batch=true`
const batchResponse = await fetch(batchUrl, { credentials: 'include' })
if (batchResponse.ok) {
const batchData = await batchResponse.json()
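
Passing `to=lastEventId` bounds the batch to events the client is known to have missed. A sketch of the resume flow implied here (the query parameters match the URL above; the response shape and the `applyEvent` handler are assumptions):

```ts
// Sketch: replay buffered events after a reconnect. The query parameters
// match the URL built above; the response shape and applyEvent are
// assumptions, not taken from the repo.
async function resumeStream(
  streamId: string,
  lastEventId: number,
  applyEvent: (event: Record<string, any>) => void
): Promise<void> {
  const url =
    `/api/copilot/chat/stream?streamId=${encodeURIComponent(streamId)}` +
    `&from=0&to=${encodeURIComponent(String(lastEventId))}&batch=true`
  const res = await fetch(url, { credentials: 'include' })
  if (!res.ok) return
  const batch = await res.json()
  for (const entry of batch.events ?? []) {
    applyEvent(entry.event) // entries arrive ordered by eventId
  }
}
```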