fix: separate telegram reasoning and answer draft streams

Ayaan Zaidi
2026-02-19 14:38:29 +05:30
parent a688ccf24a
commit 321e71c8a5
2 changed files with 317 additions and 128 deletions
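
The change below gives Telegram streaming two independent preview lanes: one draft message for the assistant's answer and one for reasoning output, so reasoning updates no longer overwrite the in-progress answer bubble. A minimal sketch of that lane-routing idea, using illustrative stand-ins (DraftStream, makeStream, pushPartial) rather than the project's actual helpers:

// Sketch only: each lane owns its own Telegram draft message and its own
// last-seen snapshot, so updates from one stream never touch the other.
type DraftStream = {
  update: (text: string) => void;
};

type DraftLane = {
  stream?: DraftStream;
  lastPartialText: string;
};

function makeStream(label: string): DraftStream {
  // Stand-in for the real draft-stream handle; just logs the preview edit.
  return { update: (text) => console.log(`[${label} preview] ${text}`) };
}

function pushPartial(lane: DraftLane, text?: string) {
  if (!lane.stream || !text || text === lane.lastPartialText) {
    return; // nothing new for this lane's preview
  }
  lane.lastPartialText = text;
  lane.stream.update(text);
}

const answerLane: DraftLane = { stream: makeStream("answer"), lastPartialText: "" };
const reasoningLane: DraftLane = { stream: makeStream("reasoning"), lastPartialText: "" };

// Reasoning and answer snapshots land in different bubbles.
pushPartial(reasoningLane, "Reasoning:\n_Working on it..._");
pushPartial(answerLane, "Checking the directory...");

In the dispatcher below, the same routing is carried by DraftLaneState, updateDraftFromPartial, and the onPartialReply / onReasoningStream callbacks.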

View File

@@ -52,6 +52,15 @@ describe("dispatchTelegramMessage draft streaming", () => {
};
}
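// Mirrors the dispatcher's creation order: the first createTelegramDraftStream call
// becomes the answer lane and the second becomes the reasoning lane.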
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
const answerDraftStream = createDraftStream(params?.answerMessageId);
const reasoningDraftStream = createDraftStream(params?.reasoningMessageId);
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
return { answerDraftStream, reasoningDraftStream };
}
function createContext(overrides?: Partial<TelegramMessageContext>): TelegramMessageContext {
const base = {
ctxPayload: {},
@@ -152,6 +161,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect.objectContaining({
chatId: 123,
thread: { id: 777, scope: "dm" },
minInitialChars: 1,
}),
);
expect(draftStream.update).toHaveBeenCalledWith("Hello");
@@ -172,6 +182,27 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
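// Partial mode lowers the first-send threshold to a single character; block mode keeps
// the dispatcher's 30-char DRAFT_MIN_INITIAL_CHARS threshold.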
it("keeps a higher initial debounce threshold in block stream mode", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "block" });
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
minInitialChars: 30,
}),
);
});
it("keeps block streaming enabled when account config enables it", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
@@ -428,12 +459,12 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
});
it("does not force new message on reasoning end without previous output", async () => {
it("forces new message on reasoning end after streamed reasoning output", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// Reasoning starts immediately (no previous text output)
// Reasoning starts immediately (no assistant-answer output yet)
await replyOptions?.onReasoningStream?.({ text: "Thinking..." });
// Reasoning ends
await replyOptions?.onReasoningEnd?.();
@@ -447,8 +478,109 @@ describe("dispatchTelegramMessage draft streaming", () => {
await dispatchWithContext({ context: createContext(), streamMode: "block" });
// No previous text output, so no forceNewMessage needed
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
// Reasoning stream produced preview output, so split for final answer.
expect(draftStream.forceNewMessage).toHaveBeenCalled();
});
it("does not finalize preview with reasoning payloads before answer payloads", async () => {
setupDraftStreams({ answerMessageId: 999 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hi, I did what you asked and..." });
await dispatcherOptions.deliver({ text: "Reasoning:\n_step one_" }, { kind: "final" });
await dispatcherOptions.deliver(
{ text: "Hi, I did what you asked and..." },
{ kind: "final" },
);
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
// Keep reasoning as its own message.
expect(deliverReplies).toHaveBeenCalledTimes(1);
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: "Reasoning:\n_step one_" })],
}),
);
// Finalize preview with the actual answer instead of overwriting with reasoning.
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
999,
"Hi, I did what you asked and...",
expect.any(Object),
);
});
it("wires reasoning stream updates into telegram draft previews", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Thinking..._" });
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Thinking..._");
});
it("keeps reasoning and answer streaming in separate preview lanes", async () => {
const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Working on it..._" });
await replyOptions?.onPartialReply?.({ text: "Checking the directory..." });
await dispatcherOptions.deliver({ text: "Checking the directory..." }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Working on it..._");
expect(answerDraftStream.update).toHaveBeenCalledWith("Checking the directory...");
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled();
});
it("does not edit reasoning preview bubble with final answer when no assistant partial arrived yet", async () => {
setupDraftStreams({ reasoningMessageId: 999 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Working on it..._" });
await dispatcherOptions.deliver({ text: "Here's what I found." }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: "Here's what I found." })],
}),
);
});
it("does not edit preview message when final payload is an error", async () => {

View File

@@ -33,6 +33,17 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
/** Minimum chars before sending first streaming message (improves push notification UX) */
const DRAFT_MIN_INITIAL_CHARS = 30;
const REASONING_MESSAGE_PREFIX = "Reasoning:\n";
function isReasoningMessage(text?: string): boolean {
if (typeof text !== "string") {
return false;
}
const trimmed = text.trim();
return (
trimmed.startsWith(REASONING_MESSAGE_PREFIX) && trimmed.length > REASONING_MESSAGE_PREFIX.length
);
}
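// Illustrative examples: isReasoningMessage("Reasoning:\n_step one_") is true, while
// isReasoningMessage("Reasoning:\n") (nothing after the prefix) and plain answer text are false.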
async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) {
try {
@@ -97,101 +108,147 @@ export const dispatchTelegramMessage = async ({
const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled;
const draftReplyToMessageId =
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
const draftStream = canStreamDraft
const draftMinInitialChars = streamMode === "partial" ? 1 : DRAFT_MIN_INITIAL_CHARS;
const answerDraftStream = canStreamDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
maxChars: draftMaxChars,
thread: threadSpec,
replyToMessageId: draftReplyToMessageId,
minInitialChars: DRAFT_MIN_INITIAL_CHARS,
minInitialChars: draftMinInitialChars,
log: logVerbose,
warn: logVerbose,
})
: undefined;
const draftChunking =
draftStream && streamMode === "block"
const reasoningDraftStream = canStreamDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
maxChars: draftMaxChars,
thread: threadSpec,
replyToMessageId: draftReplyToMessageId,
minInitialChars: draftMinInitialChars,
log: logVerbose,
warn: logVerbose,
})
: undefined;
const answerDraftChunking =
answerDraftStream && streamMode === "block"
? resolveTelegramDraftStreamingChunking(cfg, route.accountId)
: undefined;
const shouldSplitPreviewMessages = streamMode === "block";
const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined;
const answerDraftChunker = answerDraftChunking
? new EmbeddedBlockChunker(answerDraftChunking)
: undefined;
const reasoningDraftChunking =
reasoningDraftStream && streamMode === "block"
? resolveTelegramDraftStreamingChunking(cfg, route.accountId)
: undefined;
const reasoningDraftChunker = reasoningDraftChunking
? new EmbeddedBlockChunker(reasoningDraftChunking)
: undefined;
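// In block mode each lane gets its own EmbeddedBlockChunker so answer and reasoning
// block boundaries are tracked independently; in partial mode neither chunker exists.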
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
let lastPartialText = "";
let draftText = "";
let hasStreamedMessage = false;
const updateDraftFromPartial = (text?: string) => {
if (!draftStream || !text) {
type DraftLaneState = {
stream: ReturnType<typeof createTelegramDraftStream> | undefined;
lastPartialText: string;
draftText: string;
hasStreamedMessage: boolean;
chunker: EmbeddedBlockChunker | undefined;
};
const answerLane: DraftLaneState = {
stream: answerDraftStream,
lastPartialText: "",
draftText: "",
hasStreamedMessage: false,
chunker: answerDraftChunker,
};
const reasoningLane: DraftLaneState = {
stream: reasoningDraftStream,
lastPartialText: "",
draftText: "",
hasStreamedMessage: false,
chunker: reasoningDraftChunker,
};
const resetDraftLaneState = (lane: DraftLaneState) => {
lane.lastPartialText = "";
lane.draftText = "";
lane.hasStreamedMessage = false;
lane.chunker?.reset();
};
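// Shared partial-update path for both lanes: skips duplicate snapshots, edits one
// preview per lane in partial mode, and in block mode feeds deltas through the lane's
// chunker before updating the preview.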
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
const laneStream = lane.stream;
if (!laneStream || !text) {
return;
}
if (text === lastPartialText) {
if (text === lane.lastPartialText) {
return;
}
// Mark that we've received streaming content (for forceNewMessage decision).
hasStreamedMessage = true;
lane.hasStreamedMessage = true;
if (streamMode === "partial") {
// Some providers briefly emit a shorter prefix snapshot (for example
// "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid
// visible punctuation flicker.
if (
lastPartialText &&
lastPartialText.startsWith(text) &&
text.length < lastPartialText.length
lane.lastPartialText &&
lane.lastPartialText.startsWith(text) &&
text.length < lane.lastPartialText.length
) {
return;
}
lastPartialText = text;
draftStream.update(text);
lane.lastPartialText = text;
laneStream.update(text);
return;
}
let delta = text;
if (text.startsWith(lastPartialText)) {
delta = text.slice(lastPartialText.length);
if (text.startsWith(lane.lastPartialText)) {
delta = text.slice(lane.lastPartialText.length);
} else {
// Streaming buffer reset (or non-monotonic stream). Start fresh.
draftChunker?.reset();
draftText = "";
lane.chunker?.reset();
lane.draftText = "";
}
lastPartialText = text;
lane.lastPartialText = text;
if (!delta) {
return;
}
if (!draftChunker) {
draftText = text;
draftStream.update(draftText);
if (!lane.chunker) {
lane.draftText = text;
laneStream.update(lane.draftText);
return;
}
draftChunker.append(delta);
draftChunker.drain({
lane.chunker.append(delta);
lane.chunker.drain({
force: false,
emit: (chunk) => {
draftText += chunk;
draftStream.update(draftText);
lane.draftText += chunk;
laneStream.update(lane.draftText);
},
});
};
const flushDraft = async () => {
if (!draftStream) {
const flushDraftLane = async (lane: DraftLaneState) => {
if (!lane.stream) {
return;
}
if (draftChunker?.hasBuffered()) {
draftChunker.drain({
if (lane.chunker?.hasBuffered()) {
lane.chunker.drain({
force: true,
emit: (chunk) => {
draftText += chunk;
lane.draftText += chunk;
},
});
draftChunker.reset();
if (draftText) {
draftStream.update(draftText);
lane.chunker.reset();
if (lane.draftText) {
lane.stream.update(lane.draftText);
}
}
await draftStream.flush();
await lane.stream.flush();
};
const disableBlockStreaming =
typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: draftStream || streamMode === "off"
: answerDraftStream || streamMode === "off"
? true
: undefined;
@@ -290,6 +347,48 @@ export const dispatchTelegramMessage = async ({
linkPreview: telegramCfg.linkPreview,
replyQuoteText,
};
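// Stops the lane's draft stream and, when a preview bubble exists, finalizes it in place
// with the final text (ignoring regressive shorter finals). Returns false when there is
// no preview message or the edit fails, so the caller falls back to a standard send.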
const tryFinalizePreviewForLane = async (params: {
lane: DraftLaneState;
laneName: "answer" | "reasoning";
finalText: string;
previewButtons?: TelegramInlineButtons;
}): Promise<boolean> => {
const { lane, laneName, finalText, previewButtons } = params;
if (!lane.stream) {
return false;
}
const currentPreviewText = streamMode === "block" ? lane.draftText : lane.lastPartialText;
await lane.stream.stop();
const previewMessageId = lane.stream.messageId();
if (typeof previewMessageId !== "number") {
return false;
}
if (
currentPreviewText &&
currentPreviewText.startsWith(finalText) &&
finalText.length < currentPreviewText.length
) {
// Avoid regressive punctuation/wording flicker from occasional shorter finals.
deliveryState.delivered = true;
return true;
}
try {
await editMessageTelegram(chatId, previewMessageId, finalText, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
deliveryState.delivered = true;
return true;
} catch (err) {
logVerbose(
`telegram: ${laneName} preview final edit failed; falling back to standard send (${String(err)})`,
);
return false;
}
};
let queuedFinal = false;
try {
@@ -300,51 +399,41 @@ export const dispatchTelegramMessage = async ({
...prefixOptions,
deliver: async (payload, info) => {
if (info.kind === "final") {
await flushDraft();
await flushDraftLane(answerLane);
await flushDraftLane(reasoningLane);
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const previewMessageId = draftStream?.messageId();
const finalText = payload.text;
const currentPreviewText = streamMode === "block" ? draftText : lastPartialText;
const reasoningMessage = isReasoningMessage(finalText);
const previewButtons = (
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
)?.buttons;
let draftStoppedForPreviewEdit = false;
// Skip preview edit for error payloads to avoid overwriting previous content
const canFinalizeViaPreviewEdit =
!finalizedViaPreviewMessage &&
!hasMedia &&
typeof finalText === "string" &&
finalText.length > 0 &&
typeof previewMessageId === "number" &&
finalText.length <= draftMaxChars &&
!payload.isError;
if (canFinalizeViaPreviewEdit) {
await draftStream?.stop();
draftStoppedForPreviewEdit = true;
if (
currentPreviewText &&
currentPreviewText.startsWith(finalText) &&
finalText.length < currentPreviewText.length
) {
// Ignore regressive final edits (e.g., "Okay." -> "Ok"), which
// can appear transiently in some provider streams.
if (canFinalizeViaPreviewEdit && reasoningMessage) {
const finalizedReasoning = await tryFinalizePreviewForLane({
lane: reasoningLane,
laneName: "reasoning",
finalText,
previewButtons,
});
if (finalizedReasoning) {
return;
}
try {
await editMessageTelegram(chatId, previewMessageId, finalText, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
}
if (canFinalizeViaPreviewEdit && !reasoningMessage && !finalizedViaPreviewMessage) {
const finalizedAnswer = await tryFinalizePreviewForLane({
lane: answerLane,
laneName: "answer",
finalText,
previewButtons,
});
if (finalizedAnswer) {
finalizedViaPreviewMessage = true;
deliveryState.delivered = true;
return;
} catch (err) {
logVerbose(
`telegram: preview final edit failed; falling back to standard send (${String(err)})`,
);
}
}
if (
@@ -357,38 +446,8 @@ export const dispatchTelegramMessage = async ({
`telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`,
);
}
if (!draftStoppedForPreviewEdit) {
await draftStream?.stop();
}
// Check if stop() sent a message (debounce released on isFinal)
// If so, edit that message instead of sending a new one
const messageIdAfterStop = draftStream?.messageId();
if (
!finalizedViaPreviewMessage &&
typeof messageIdAfterStop === "number" &&
typeof finalText === "string" &&
finalText.length > 0 &&
finalText.length <= draftMaxChars &&
!hasMedia &&
!payload.isError
) {
try {
await editMessageTelegram(chatId, messageIdAfterStop, finalText, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
finalizedViaPreviewMessage = true;
deliveryState.delivered = true;
return;
} catch (err) {
logVerbose(
`telegram: post-stop preview edit failed; falling back to standard send (${String(err)})`,
);
}
}
await answerLane.stream?.stop();
await reasoningLane.stream?.stop();
}
const result = await deliverReplies({
...deliveryBaseOptions,
@@ -422,32 +481,29 @@ export const dispatchTelegramMessage = async ({
replyOptions: {
skillFilter,
disableBlockStreaming,
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
onAssistantMessageStart: draftStream
onPartialReply: answerLane.stream
? (payload) => updateDraftFromPartial(answerLane, payload.text)
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) => updateDraftFromPartial(reasoningLane, payload.text)
: undefined,
onAssistantMessageStart: answerLane.stream
? () => {
// Only split preview bubbles in block mode. In partial mode, keep
// editing one preview message to avoid flooding the chat.
logVerbose(
`telegram: onAssistantMessageStart called, hasStreamedMessage=${hasStreamedMessage}`,
);
if (shouldSplitPreviewMessages && hasStreamedMessage) {
logVerbose(`telegram: calling forceNewMessage()`);
draftStream.forceNewMessage();
// Keep answer blocks separated in block mode; partial mode keeps one answer lane.
if (streamMode === "block" && answerLane.hasStreamedMessage) {
answerLane.stream?.forceNewMessage();
}
lastPartialText = "";
draftText = "";
draftChunker?.reset();
resetDraftLaneState(answerLane);
}
: undefined,
onReasoningEnd: draftStream
onReasoningEnd: reasoningLane.stream
? () => {
// Same policy as assistant-message boundaries: split only in block mode.
if (shouldSplitPreviewMessages && hasStreamedMessage) {
draftStream.forceNewMessage();
// Reasoning should appear in its own lane and each block should keep
// a separate bubble for readability.
if (reasoningLane.hasStreamedMessage) {
reasoningLane.stream?.forceNewMessage();
}
lastPartialText = "";
draftText = "";
draftChunker?.reset();
resetDraftLaneState(reasoningLane);
}
: undefined,
onModelSelected,
@@ -455,10 +511,11 @@ export const dispatchTelegramMessage = async ({
}));
} finally {
// Must stop() first to flush debounced content before clear() wipes state
await draftStream?.stop();
await answerLane.stream?.stop();
if (!finalizedViaPreviewMessage) {
await draftStream?.clear();
await answerLane.stream?.clear();
}
await reasoningLane.stream?.stop();
}
let sentFallback = false;
if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) {