diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 8893628fd1..5f59feb77a 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -52,6 +52,15 @@ describe("dispatchTelegramMessage draft streaming", () => { }; } + function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) { + const answerDraftStream = createDraftStream(params?.answerMessageId); + const reasoningDraftStream = createDraftStream(params?.reasoningMessageId); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + return { answerDraftStream, reasoningDraftStream }; + } + function createContext(overrides?: Partial): TelegramMessageContext { const base = { ctxPayload: {}, @@ -152,6 +161,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ chatId: 123, thread: { id: 777, scope: "dm" }, + minInitialChars: 1, }), ); expect(draftStream.update).toHaveBeenCalledWith("Hello"); @@ -172,6 +182,27 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.clear).toHaveBeenCalledTimes(1); }); + it("keeps a higher initial debounce threshold in block stream mode", async () => { + const draftStream = createDraftStream(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Hello" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "block" }); + + expect(createTelegramDraftStream).toHaveBeenCalledWith( + expect.objectContaining({ + minInitialChars: 30, + }), + ); + }); + it("keeps block streaming enabled when account config enables it", async () => { dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); @@ -428,12 +459,12 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); }); - it("does not force new message on reasoning end without previous output", async () => { + it("forces new message on reasoning end after streamed reasoning output", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - // Reasoning starts immediately (no previous text output) + // Reasoning starts immediately (no assistant-answer output yet) await replyOptions?.onReasoningStream?.({ text: "Thinking..." }); // Reasoning ends await replyOptions?.onReasoningEnd?.(); @@ -447,8 +478,109 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode: "block" }); - // No previous text output, so no forceNewMessage needed - expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + // Reasoning stream produced preview output, so split for final answer. + expect(draftStream.forceNewMessage).toHaveBeenCalled(); + }); + + it("does not finalize preview with reasoning payloads before answer payloads", async () => { + setupDraftStreams({ answerMessageId: 999 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Hi, I did what you asked and..." }); + await dispatcherOptions.deliver({ text: "Reasoning:\n_step one_" }, { kind: "final" }); + await dispatcherOptions.deliver( + { text: "Hi, I did what you asked and..." }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + // Keep reasoning as its own message. + expect(deliverReplies).toHaveBeenCalledTimes(1); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Reasoning:\n_step one_" })], + }), + ); + // Finalize preview with the actual answer instead of overwriting with reasoning. + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "Hi, I did what you asked and...", + expect.any(Object), + ); + }); + + it("wires reasoning stream updates into telegram draft previews", async () => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Thinking..._" }); + await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Thinking..._"); + }); + + it("keeps reasoning and answer streaming in separate preview lanes", async () => { + const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Working on it..._" }); + await replyOptions?.onPartialReply?.({ text: "Checking the directory..." }); + await dispatcherOptions.deliver({ text: "Checking the directory..." }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Working on it..._"); + expect(answerDraftStream.update).toHaveBeenCalledWith("Checking the directory..."); + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); + }); + + it("does not edit reasoning preview bubble with final answer when no assistant partial arrived yet", async () => { + setupDraftStreams({ reasoningMessageId: 999 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Working on it..._" }); + await dispatcherOptions.deliver({ text: "Here's what I found." }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).not.toHaveBeenCalled(); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Here's what I found." })], + }), + ); }); it("does not edit preview message when final payload is an error", async () => { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 7cfd077879..06054da705 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -33,6 +33,17 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; /** Minimum chars before sending first streaming message (improves push notification UX) */ const DRAFT_MIN_INITIAL_CHARS = 30; +const REASONING_MESSAGE_PREFIX = "Reasoning:\n"; + +function isReasoningMessage(text?: string): boolean { + if (typeof text !== "string") { + return false; + } + const trimmed = text.trim(); + return ( + trimmed.startsWith(REASONING_MESSAGE_PREFIX) && trimmed.length > REASONING_MESSAGE_PREFIX.length + ); +} async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) { try { @@ -97,101 +108,147 @@ export const dispatchTelegramMessage = async ({ const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled; const draftReplyToMessageId = replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; - const draftStream = canStreamDraft + const draftMinInitialChars = streamMode === "partial" ? 1 : DRAFT_MIN_INITIAL_CHARS; + const answerDraftStream = canStreamDraft ? createTelegramDraftStream({ api: bot.api, chatId, maxChars: draftMaxChars, thread: threadSpec, replyToMessageId: draftReplyToMessageId, - minInitialChars: DRAFT_MIN_INITIAL_CHARS, + minInitialChars: draftMinInitialChars, log: logVerbose, warn: logVerbose, }) : undefined; - const draftChunking = - draftStream && streamMode === "block" + const reasoningDraftStream = canStreamDraft + ? createTelegramDraftStream({ + api: bot.api, + chatId, + maxChars: draftMaxChars, + thread: threadSpec, + replyToMessageId: draftReplyToMessageId, + minInitialChars: draftMinInitialChars, + log: logVerbose, + warn: logVerbose, + }) + : undefined; + const answerDraftChunking = + answerDraftStream && streamMode === "block" ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) : undefined; - const shouldSplitPreviewMessages = streamMode === "block"; - const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined; + const answerDraftChunker = answerDraftChunking + ? new EmbeddedBlockChunker(answerDraftChunking) + : undefined; + const reasoningDraftChunking = + reasoningDraftStream && streamMode === "block" + ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) + : undefined; + const reasoningDraftChunker = reasoningDraftChunking + ? new EmbeddedBlockChunker(reasoningDraftChunking) + : undefined; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); - let lastPartialText = ""; - let draftText = ""; - let hasStreamedMessage = false; - const updateDraftFromPartial = (text?: string) => { - if (!draftStream || !text) { + type DraftLaneState = { + stream: ReturnType | undefined; + lastPartialText: string; + draftText: string; + hasStreamedMessage: boolean; + chunker: EmbeddedBlockChunker | undefined; + }; + const answerLane: DraftLaneState = { + stream: answerDraftStream, + lastPartialText: "", + draftText: "", + hasStreamedMessage: false, + chunker: answerDraftChunker, + }; + const reasoningLane: DraftLaneState = { + stream: reasoningDraftStream, + lastPartialText: "", + draftText: "", + hasStreamedMessage: false, + chunker: reasoningDraftChunker, + }; + const resetDraftLaneState = (lane: DraftLaneState) => { + lane.lastPartialText = ""; + lane.draftText = ""; + lane.hasStreamedMessage = false; + lane.chunker?.reset(); + }; + const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => { + const laneStream = lane.stream; + if (!laneStream || !text) { return; } - if (text === lastPartialText) { + if (text === lane.lastPartialText) { return; } // Mark that we've received streaming content (for forceNewMessage decision). - hasStreamedMessage = true; + lane.hasStreamedMessage = true; if (streamMode === "partial") { // Some providers briefly emit a shorter prefix snapshot (for example // "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid // visible punctuation flicker. if ( - lastPartialText && - lastPartialText.startsWith(text) && - text.length < lastPartialText.length + lane.lastPartialText && + lane.lastPartialText.startsWith(text) && + text.length < lane.lastPartialText.length ) { return; } - lastPartialText = text; - draftStream.update(text); + lane.lastPartialText = text; + laneStream.update(text); return; } let delta = text; - if (text.startsWith(lastPartialText)) { - delta = text.slice(lastPartialText.length); + if (text.startsWith(lane.lastPartialText)) { + delta = text.slice(lane.lastPartialText.length); } else { // Streaming buffer reset (or non-monotonic stream). Start fresh. - draftChunker?.reset(); - draftText = ""; + lane.chunker?.reset(); + lane.draftText = ""; } - lastPartialText = text; + lane.lastPartialText = text; if (!delta) { return; } - if (!draftChunker) { - draftText = text; - draftStream.update(draftText); + if (!lane.chunker) { + lane.draftText = text; + laneStream.update(lane.draftText); return; } - draftChunker.append(delta); - draftChunker.drain({ + lane.chunker.append(delta); + lane.chunker.drain({ force: false, emit: (chunk) => { - draftText += chunk; - draftStream.update(draftText); + lane.draftText += chunk; + laneStream.update(lane.draftText); }, }); }; - const flushDraft = async () => { - if (!draftStream) { + const flushDraftLane = async (lane: DraftLaneState) => { + if (!lane.stream) { return; } - if (draftChunker?.hasBuffered()) { - draftChunker.drain({ + if (lane.chunker?.hasBuffered()) { + lane.chunker.drain({ force: true, emit: (chunk) => { - draftText += chunk; + lane.draftText += chunk; }, }); - draftChunker.reset(); - if (draftText) { - draftStream.update(draftText); + lane.chunker.reset(); + if (lane.draftText) { + lane.stream.update(lane.draftText); } } - await draftStream.flush(); + await lane.stream.flush(); }; const disableBlockStreaming = typeof telegramCfg.blockStreaming === "boolean" ? !telegramCfg.blockStreaming - : draftStream || streamMode === "off" + : answerDraftStream || streamMode === "off" ? true : undefined; @@ -290,6 +347,48 @@ export const dispatchTelegramMessage = async ({ linkPreview: telegramCfg.linkPreview, replyQuoteText, }; + const tryFinalizePreviewForLane = async (params: { + lane: DraftLaneState; + laneName: "answer" | "reasoning"; + finalText: string; + previewButtons?: TelegramInlineButtons; + }): Promise => { + const { lane, laneName, finalText, previewButtons } = params; + if (!lane.stream) { + return false; + } + const currentPreviewText = streamMode === "block" ? lane.draftText : lane.lastPartialText; + await lane.stream.stop(); + const previewMessageId = lane.stream.messageId(); + if (typeof previewMessageId !== "number") { + return false; + } + if ( + currentPreviewText && + currentPreviewText.startsWith(finalText) && + finalText.length < currentPreviewText.length + ) { + // Avoid regressive punctuation/wording flicker from occasional shorter finals. + deliveryState.delivered = true; + return true; + } + try { + await editMessageTelegram(chatId, previewMessageId, finalText, { + api: bot.api, + cfg, + accountId: route.accountId, + linkPreview: telegramCfg.linkPreview, + buttons: previewButtons, + }); + deliveryState.delivered = true; + return true; + } catch (err) { + logVerbose( + `telegram: ${laneName} preview final edit failed; falling back to standard send (${String(err)})`, + ); + return false; + } + }; let queuedFinal = false; try { @@ -300,51 +399,41 @@ export const dispatchTelegramMessage = async ({ ...prefixOptions, deliver: async (payload, info) => { if (info.kind === "final") { - await flushDraft(); + await flushDraftLane(answerLane); + await flushDraftLane(reasoningLane); const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const previewMessageId = draftStream?.messageId(); const finalText = payload.text; - const currentPreviewText = streamMode === "block" ? draftText : lastPartialText; + const reasoningMessage = isReasoningMessage(finalText); const previewButtons = ( payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined )?.buttons; - let draftStoppedForPreviewEdit = false; - // Skip preview edit for error payloads to avoid overwriting previous content const canFinalizeViaPreviewEdit = - !finalizedViaPreviewMessage && !hasMedia && typeof finalText === "string" && finalText.length > 0 && - typeof previewMessageId === "number" && finalText.length <= draftMaxChars && !payload.isError; - if (canFinalizeViaPreviewEdit) { - await draftStream?.stop(); - draftStoppedForPreviewEdit = true; - if ( - currentPreviewText && - currentPreviewText.startsWith(finalText) && - finalText.length < currentPreviewText.length - ) { - // Ignore regressive final edits (e.g., "Okay." -> "Ok"), which - // can appear transiently in some provider streams. + if (canFinalizeViaPreviewEdit && reasoningMessage) { + const finalizedReasoning = await tryFinalizePreviewForLane({ + lane: reasoningLane, + laneName: "reasoning", + finalText, + previewButtons, + }); + if (finalizedReasoning) { return; } - try { - await editMessageTelegram(chatId, previewMessageId, finalText, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); + } + if (canFinalizeViaPreviewEdit && !reasoningMessage && !finalizedViaPreviewMessage) { + const finalizedAnswer = await tryFinalizePreviewForLane({ + lane: answerLane, + laneName: "answer", + finalText, + previewButtons, + }); + if (finalizedAnswer) { finalizedViaPreviewMessage = true; - deliveryState.delivered = true; return; - } catch (err) { - logVerbose( - `telegram: preview final edit failed; falling back to standard send (${String(err)})`, - ); } } if ( @@ -357,38 +446,8 @@ export const dispatchTelegramMessage = async ({ `telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`, ); } - if (!draftStoppedForPreviewEdit) { - await draftStream?.stop(); - } - // Check if stop() sent a message (debounce released on isFinal) - // If so, edit that message instead of sending a new one - const messageIdAfterStop = draftStream?.messageId(); - if ( - !finalizedViaPreviewMessage && - typeof messageIdAfterStop === "number" && - typeof finalText === "string" && - finalText.length > 0 && - finalText.length <= draftMaxChars && - !hasMedia && - !payload.isError - ) { - try { - await editMessageTelegram(chatId, messageIdAfterStop, finalText, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); - finalizedViaPreviewMessage = true; - deliveryState.delivered = true; - return; - } catch (err) { - logVerbose( - `telegram: post-stop preview edit failed; falling back to standard send (${String(err)})`, - ); - } - } + await answerLane.stream?.stop(); + await reasoningLane.stream?.stop(); } const result = await deliverReplies({ ...deliveryBaseOptions, @@ -422,32 +481,29 @@ export const dispatchTelegramMessage = async ({ replyOptions: { skillFilter, disableBlockStreaming, - onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined, - onAssistantMessageStart: draftStream + onPartialReply: answerLane.stream + ? (payload) => updateDraftFromPartial(answerLane, payload.text) + : undefined, + onReasoningStream: reasoningLane.stream + ? (payload) => updateDraftFromPartial(reasoningLane, payload.text) + : undefined, + onAssistantMessageStart: answerLane.stream ? () => { - // Only split preview bubbles in block mode. In partial mode, keep - // editing one preview message to avoid flooding the chat. - logVerbose( - `telegram: onAssistantMessageStart called, hasStreamedMessage=${hasStreamedMessage}`, - ); - if (shouldSplitPreviewMessages && hasStreamedMessage) { - logVerbose(`telegram: calling forceNewMessage()`); - draftStream.forceNewMessage(); + // Keep answer blocks separated in block mode; partial mode keeps one answer lane. + if (streamMode === "block" && answerLane.hasStreamedMessage) { + answerLane.stream?.forceNewMessage(); } - lastPartialText = ""; - draftText = ""; - draftChunker?.reset(); + resetDraftLaneState(answerLane); } : undefined, - onReasoningEnd: draftStream + onReasoningEnd: reasoningLane.stream ? () => { - // Same policy as assistant-message boundaries: split only in block mode. - if (shouldSplitPreviewMessages && hasStreamedMessage) { - draftStream.forceNewMessage(); + // Reasoning should appear in its own lane and each block should keep + // a separate bubble for readability. + if (reasoningLane.hasStreamedMessage) { + reasoningLane.stream?.forceNewMessage(); } - lastPartialText = ""; - draftText = ""; - draftChunker?.reset(); + resetDraftLaneState(reasoningLane); } : undefined, onModelSelected, @@ -455,10 +511,11 @@ export const dispatchTelegramMessage = async ({ })); } finally { // Must stop() first to flush debounced content before clear() wipes state - await draftStream?.stop(); + await answerLane.stream?.stop(); if (!finalizedViaPreviewMessage) { - await draftStream?.clear(); + await answerLane.stream?.clear(); } + await reasoningLane.stream?.stop(); } let sentFallback = false; if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) {