refactor(telegram): unify lane ingest and cleanup

This commit is contained in:
Ayaan Zaidi
2026-02-19 20:35:12 +05:30
parent 735644e002
commit 813fd60f21
2 changed files with 35 additions and 29 deletions

View File

@@ -261,7 +261,7 @@ export const dispatchTelegramMessage = async ({
},
});
};
const updateDraftLanesFromPartial = (text: string | undefined) => {
const ingestDraftLaneSegments = (text: string | undefined) => {
for (const segment of splitTextIntoLaneSegments(text)) {
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
@@ -655,7 +655,7 @@ export const dispatchTelegramMessage = async ({
disableBlockStreaming,
onPartialReply:
answerLane.stream || reasoningLane.stream
? (payload) => updateDraftLanesFromPartial(payload.text)
? (payload) => ingestDraftLaneSegments(payload.text)
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) => {
@@ -667,13 +667,7 @@ export const dispatchTelegramMessage = async ({
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
for (const segment of splitTextIntoLaneSegments(payload.text)) {
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
}
updateDraftFromPartial(lanes[segment.lane], segment.text);
}
ingestDraftLaneSegments(payload.text);
}
: undefined,
onAssistantMessageStart: answerLane.stream
@@ -696,19 +690,32 @@ export const dispatchTelegramMessage = async ({
},
}));
} finally {
// Must stop() first to flush debounced content before clear() wipes state
const streamsShareHandle =
Boolean(answerLane.stream) &&
Boolean(reasoningLane.stream) &&
answerLane.stream === reasoningLane.stream;
await answerLane.stream?.stop();
if (!finalizedPreviewByLane.answer) {
await answerLane.stream?.clear();
// Must stop() first to flush debounced content before clear() wipes state.
const streamCleanupStates = new Map<
NonNullable<DraftLaneState["stream"]>,
{ shouldClear: boolean }
>();
const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [
{ laneName: "answer", lane: answerLane },
{ laneName: "reasoning", lane: reasoningLane },
];
for (const laneState of lanesToCleanup) {
const stream = laneState.lane.stream;
if (!stream) {
continue;
}
const shouldClear = !finalizedPreviewByLane[laneState.laneName];
const existing = streamCleanupStates.get(stream);
if (!existing) {
streamCleanupStates.set(stream, { shouldClear });
continue;
}
existing.shouldClear = existing.shouldClear && shouldClear;
}
if (!streamsShareHandle) {
await reasoningLane.stream?.stop();
if (!finalizedPreviewByLane.reasoning) {
await reasoningLane.stream?.clear();
for (const [stream, cleanupState] of streamCleanupStates) {
await stream.stop();
if (cleanupState.shouldClear) {
await stream.clear();
}
}
}

View File

@@ -67,21 +67,21 @@ export type BufferedFinalAnswer = {
};
export function createTelegramReasoningStepState() {
let reasoningHint = false;
let reasoningDelivered = false;
let reasoningStatus: "none" | "hinted" | "delivered" = "none";
let bufferedFinalAnswer: BufferedFinalAnswer | undefined;
const noteReasoningHint = () => {
reasoningHint = true;
if (reasoningStatus === "none") {
reasoningStatus = "hinted";
}
};
const noteReasoningDelivered = () => {
reasoningHint = true;
reasoningDelivered = true;
reasoningStatus = "delivered";
};
const shouldBufferFinalAnswer = () => {
return reasoningHint && !reasoningDelivered && !bufferedFinalAnswer;
return reasoningStatus === "hinted" && !bufferedFinalAnswer;
};
const bufferFinalAnswer = (value: BufferedFinalAnswer) => {
@@ -95,8 +95,7 @@ export function createTelegramReasoningStepState() {
};
const resetForNextStep = () => {
reasoningHint = false;
reasoningDelivered = false;
reasoningStatus = "none";
bufferedFinalAnswer = undefined;
};