From 728463ace773d8530ab1db7b68f282a474023380 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Wed, 4 Feb 2026 12:33:53 -0800 Subject: [PATCH] Fix stream buffer --- apps/sim/app/api/copilot/chat/route.ts | 36 ++++++- .../lib/copilot/orchestrator/stream-buffer.ts | 98 +++++++++++++++++++ apps/sim/stores/panel/copilot/store.ts | 2 +- 3 files changed, 131 insertions(+), 5 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 631a7e54e..2188f80d3 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -26,7 +26,7 @@ import { tools } from '@/tools/registry' import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { - appendStreamEvent, + createStreamEventWriter, resetStreamBuffer, setStreamMeta, } from '@/lib/copilot/orchestrator/stream-buffer' @@ -489,24 +489,44 @@ export async function POST(req: NextRequest) { if (stream) { const streamId = userMessageIdToUse + let eventWriter: ReturnType | null = null + let clientDisconnected = false const transformedStream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder() await resetStreamBuffer(streamId) await setStreamMeta(streamId, { status: 'active', userId: authenticatedUserId }) + eventWriter = createStreamEventWriter(streamId) + + const shouldFlushEvent = (event: Record) => + event.type === 'tool_call' || + event.type === 'tool_result' || + event.type === 'tool_error' || + event.type === 'subagent_end' || + event.type === 'structured_result' || + event.type === 'subagent_result' || + event.type === 'done' || + event.type === 'error' const pushEvent = async (event: Record) => { - const entry = await appendStreamEvent(streamId, event) + if (!eventWriter) return + const entry = await eventWriter.write(event) + if (shouldFlushEvent(event)) { + await eventWriter.flush() + } const payload = { ...event, eventId: entry.eventId, streamId, } try { - controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`)) + if (!clientDisconnected) { + controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`)) + } } catch { - // Client disconnected - keep buffering + clientDisconnected = true + await eventWriter.flush() } } @@ -554,9 +574,11 @@ export async function POST(req: NextRequest) { }) .where(eq(copilotChats.id, actualChatId!)) } + await eventWriter.close() await setStreamMeta(streamId, { status: 'complete', userId: authenticatedUserId }) } catch (error) { logger.error(`[${tracker.requestId}] Orchestration error:`, error) + await eventWriter.close() await setStreamMeta(streamId, { status: 'error', userId: authenticatedUserId, @@ -572,6 +594,12 @@ export async function POST(req: NextRequest) { controller.close() } }, + async cancel() { + clientDisconnected = true + if (eventWriter) { + await eventWriter.flush() + } + }, }) return new Response(transformedStream, { diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts index b1b37bb55..236047841 100644 --- a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts @@ -5,6 +5,9 @@ const logger = createLogger('CopilotStreamBuffer') const STREAM_TTL_SECONDS = 60 * 60 const STREAM_EVENT_LIMIT = 5000 +const STREAM_RESERVE_BATCH = 200 +const STREAM_FLUSH_INTERVAL_MS = 15 +const STREAM_FLUSH_MAX_BATCH = 200 const APPEND_STREAM_EVENT_LUA = ` local seqKey = KEYS[1] @@ -56,6 +59,12 @@ export type StreamEventEntry = { event: Record } +export type StreamEventWriter = { + write: (event: Record) => Promise + flush: () => Promise + close: () => Promise +} + export async function resetStreamBuffer(streamId: string): Promise { const redis = getRedisClient() if (!redis) return @@ -140,6 +149,95 @@ export async function appendStreamEvent( } } +export function createStreamEventWriter(streamId: string): StreamEventWriter { + const redis = getRedisClient() + if (!redis) { + return { + write: async (event) => ({ eventId: 0, streamId, event }), + flush: async () => {}, + close: async () => {}, + } + } + + let pending: StreamEventEntry[] = [] + let nextEventId = 0 + let maxReservedId = 0 + let flushTimer: ReturnType | null = null + let isFlushing = false + + const scheduleFlush = () => { + if (flushTimer) return + flushTimer = setTimeout(() => { + flushTimer = null + void flush() + }, STREAM_FLUSH_INTERVAL_MS) + } + + const reserveIds = async (minCount: number) => { + const reserveCount = Math.max(STREAM_RESERVE_BATCH, minCount) + const newMax = await redis.incrby(getSeqKey(streamId), reserveCount) + const startId = newMax - reserveCount + 1 + if (nextEventId === 0 || nextEventId > maxReservedId) { + nextEventId = startId + maxReservedId = newMax + } + } + + const flush = async () => { + if (isFlushing || pending.length === 0) return + isFlushing = true + const batch = pending + pending = [] + try { + const key = getEventsKey(streamId) + const zaddArgs: (string | number)[] = [] + for (const entry of batch) { + zaddArgs.push(entry.eventId, JSON.stringify(entry)) + } + const pipeline = redis.pipeline() + pipeline.zadd(key, ...(zaddArgs as any)) + pipeline.expire(key, STREAM_TTL_SECONDS) + pipeline.expire(getSeqKey(streamId), STREAM_TTL_SECONDS) + pipeline.zremrangebyrank(key, 0, -STREAM_EVENT_LIMIT - 1) + await pipeline.exec() + } catch (error) { + logger.warn('Failed to flush stream events', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + pending = batch.concat(pending) + } finally { + isFlushing = false + if (pending.length > 0) scheduleFlush() + } + } + + const write = async (event: Record) => { + if (nextEventId === 0 || nextEventId > maxReservedId) { + await reserveIds(1) + } + const eventId = nextEventId++ + const entry: StreamEventEntry = { eventId, streamId, event } + pending.push(entry) + if (pending.length >= STREAM_FLUSH_MAX_BATCH) { + await flush() + } else { + scheduleFlush() + } + return entry + } + + const close = async () => { + if (flushTimer) { + clearTimeout(flushTimer) + flushTimer = null + } + await flush() + } + + return { write, flush, close } +} + export async function readStreamEvents( streamId: string, afterEventId: number diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index 9d7bdb6dc..95cbfee48 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -3039,7 +3039,7 @@ export const useCopilotStore = create()( }) const batchUrl = `/api/copilot/chat/stream?streamId=${encodeURIComponent( nextStream.streamId - )}&from=0&batch=true` + )}&from=0&to=${encodeURIComponent(String(nextStream.lastEventId))}&batch=true` const batchResponse = await fetch(batchUrl, { credentials: 'include' }) if (batchResponse.ok) { const batchData = await batchResponse.json()