From 813fd60f21da0405b5ae8a47fd44de2dc5c3a2a7 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Thu, 19 Feb 2026 20:35:12 +0530 Subject: [PATCH] refactor(telegram): unify lane ingest and cleanup --- src/telegram/bot-message-dispatch.ts | 49 ++++++++++++---------- src/telegram/reasoning-lane-coordinator.ts | 15 ++++--- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index e9a5fb0a4d..c1382b630e 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -261,7 +261,7 @@ export const dispatchTelegramMessage = async ({ }, }); }; - const updateDraftLanesFromPartial = (text: string | undefined) => { + const ingestDraftLaneSegments = (text: string | undefined) => { for (const segment of splitTextIntoLaneSegments(text)) { if (segment.lane === "reasoning") { reasoningStepState.noteReasoningHint(); @@ -655,7 +655,7 @@ export const dispatchTelegramMessage = async ({ disableBlockStreaming, onPartialReply: answerLane.stream || reasoningLane.stream - ? (payload) => updateDraftLanesFromPartial(payload.text) + ? (payload) => ingestDraftLaneSegments(payload.text) : undefined, onReasoningStream: reasoningLane.stream ? (payload) => { @@ -667,13 +667,7 @@ export const dispatchTelegramMessage = async ({ resetDraftLaneState(reasoningLane); splitReasoningOnNextStream = false; } - for (const segment of splitTextIntoLaneSegments(payload.text)) { - if (segment.lane === "reasoning") { - reasoningStepState.noteReasoningHint(); - reasoningStepState.noteReasoningDelivered(); - } - updateDraftFromPartial(lanes[segment.lane], segment.text); - } + ingestDraftLaneSegments(payload.text); } : undefined, onAssistantMessageStart: answerLane.stream @@ -696,19 +690,32 @@ export const dispatchTelegramMessage = async ({ }, })); } finally { - // Must stop() first to flush debounced content before clear() wipes state - const streamsShareHandle = - Boolean(answerLane.stream) && - Boolean(reasoningLane.stream) && - answerLane.stream === reasoningLane.stream; - await answerLane.stream?.stop(); - if (!finalizedPreviewByLane.answer) { - await answerLane.stream?.clear(); + // Must stop() first to flush debounced content before clear() wipes state. + const streamCleanupStates = new Map< + NonNullable, + { shouldClear: boolean } + >(); + const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [ + { laneName: "answer", lane: answerLane }, + { laneName: "reasoning", lane: reasoningLane }, + ]; + for (const laneState of lanesToCleanup) { + const stream = laneState.lane.stream; + if (!stream) { + continue; + } + const shouldClear = !finalizedPreviewByLane[laneState.laneName]; + const existing = streamCleanupStates.get(stream); + if (!existing) { + streamCleanupStates.set(stream, { shouldClear }); + continue; + } + existing.shouldClear = existing.shouldClear && shouldClear; } - if (!streamsShareHandle) { - await reasoningLane.stream?.stop(); - if (!finalizedPreviewByLane.reasoning) { - await reasoningLane.stream?.clear(); + for (const [stream, cleanupState] of streamCleanupStates) { + await stream.stop(); + if (cleanupState.shouldClear) { + await stream.clear(); } } } diff --git a/src/telegram/reasoning-lane-coordinator.ts b/src/telegram/reasoning-lane-coordinator.ts index c510e4ae90..61bef050a2 100644 --- a/src/telegram/reasoning-lane-coordinator.ts +++ b/src/telegram/reasoning-lane-coordinator.ts @@ -67,21 +67,21 @@ export type BufferedFinalAnswer = { }; export function createTelegramReasoningStepState() { - let reasoningHint = false; - let reasoningDelivered = false; + let reasoningStatus: "none" | "hinted" | "delivered" = "none"; let bufferedFinalAnswer: BufferedFinalAnswer | undefined; const noteReasoningHint = () => { - reasoningHint = true; + if (reasoningStatus === "none") { + reasoningStatus = "hinted"; + } }; const noteReasoningDelivered = () => { - reasoningHint = true; - reasoningDelivered = true; + reasoningStatus = "delivered"; }; const shouldBufferFinalAnswer = () => { - return reasoningHint && !reasoningDelivered && !bufferedFinalAnswer; + return reasoningStatus === "hinted" && !bufferedFinalAnswer; }; const bufferFinalAnswer = (value: BufferedFinalAnswer) => { @@ -95,8 +95,7 @@ export function createTelegramReasoningStepState() { }; const resetForNextStep = () => { - reasoningHint = false; - reasoningDelivered = false; + reasoningStatus = "none"; bufferedFinalAnswer = undefined; };