fix(reasoning): split streamed reasoning and answer lanes

This commit is contained in:
Ayaan Zaidi
2026-02-19 19:39:41 +05:30
parent 6af475f007
commit 071da127f1
7 changed files with 630 additions and 86 deletions

View File

@@ -165,9 +165,15 @@ export function handleMessageUpdate(
}
}
const streamedReasoning = extractThinkingFromTaggedStream(ctx.state.deltaBuffer);
if (streamedReasoning) {
// Keep the latest observed reasoning so reasoning=on can still emit a separate
// reasoning block when providers omit thinking blocks in final message content.
ctx.state.lastObservedReasoning = streamedReasoning;
}
if (ctx.state.streamReasoning) {
// Handle partial <think> tags: stream whatever reasoning is visible so far.
ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer));
ctx.emitReasoningStream(streamedReasoning);
}
const next = ctx
@@ -274,9 +280,13 @@ export function handleMessageEnd(
text: ctx.stripBlockTags(rawText, { thinking: false, final: false }),
messagingToolSentTexts: ctx.state.messagingToolSentTexts,
});
const streamedReasoningFallback =
ctx.state.lastObservedReasoning || extractThinkingFromTaggedStream(ctx.state.deltaBuffer);
const rawThinking =
ctx.state.includeReasoning || ctx.state.streamReasoning
? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText)
? extractAssistantThinking(assistantMessage) ||
streamedReasoningFallback ||
extractThinkingFromTaggedText(rawText)
: "";
const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : "";
const trimmedText = text.trim();

View File

@@ -51,6 +51,7 @@ export type EmbeddedPiSubscribeState = {
lastStreamedAssistantCleaned?: string;
emittedAssistantUpdate: boolean;
lastStreamedReasoning?: string;
lastObservedReasoning?: string;
lastBlockReplyText?: string;
reasoningStreamOpen: boolean;
assistantMessageIndex: number;

View File

@@ -68,4 +68,36 @@ describe("subscribeEmbeddedPiSession", () => {
]);
},
);
it.each(THINKING_TAG_CASES)(
"falls back to streamed <%s> reasoning when message_end has no thinking block",
({ open, close }) => {
const { emit, onBlockReply } = createReasoningBlockReplyHarness();
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: `${open}Because` },
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: ` it helps${close}\n\nFinal answer`,
},
});
emit({
type: "message_end",
message: {
role: "assistant",
content: [{ type: "text", text: "Final answer" }],
} as AssistantMessage,
});
expectReasoningAndAnswerCalls(onBlockReply);
},
);
});

View File

@@ -54,6 +54,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
lastStreamedAssistantCleaned: undefined,
emittedAssistantUpdate: false,
lastStreamedReasoning: undefined,
lastObservedReasoning: undefined,
lastBlockReplyText: undefined,
reasoningStreamOpen: false,
assistantMessageIndex: 0,
@@ -117,6 +118,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.emittedAssistantUpdate = false;
state.lastBlockReplyText = undefined;
state.lastStreamedReasoning = undefined;
state.lastObservedReasoning = undefined;
state.lastReasoningSent = undefined;
state.reasoningStreamOpen = false;
state.suppressBlockChunks = false;

View File

@@ -7,6 +7,8 @@ const createTelegramDraftStream = vi.hoisted(() => vi.fn());
const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() => vi.fn());
const deliverReplies = vi.hoisted(() => vi.fn());
const editMessageTelegram = vi.hoisted(() => vi.fn());
const loadSessionStore = vi.hoisted(() => vi.fn());
const resolveStorePath = vi.hoisted(() => vi.fn(() => "/tmp/sessions.json"));
vi.mock("./draft-stream.js", () => ({
createTelegramDraftStream,
@@ -24,6 +26,11 @@ vi.mock("./send.js", () => ({
editMessageTelegram,
}));
vi.mock("../config/sessions.js", async () => ({
loadSessionStore,
resolveStorePath,
}));
vi.mock("./sticker-cache.js", () => ({
cacheSticker: vi.fn(),
describeStickerImage: vi.fn(),
@@ -39,6 +46,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
dispatchReplyWithBufferedBlockDispatcher.mockReset();
deliverReplies.mockReset();
editMessageTelegram.mockReset();
loadSessionStore.mockReset();
resolveStorePath.mockReset();
resolveStorePath.mockReturnValue("/tmp/sessions.json");
loadSessionStore.mockReturnValue({});
});
function createDraftStream(messageId?: number) {
@@ -226,6 +237,66 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("keeps block streaming enabled when session reasoning level is on", async () => {
loadSessionStore.mockReturnValue({
s1: { reasoningLevel: "on" },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Reasoning:\n_step_" }, { kind: "block" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"],
}),
});
expect(createTelegramDraftStream).not.toHaveBeenCalled();
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith(
expect.objectContaining({
replyOptions: expect.objectContaining({
disableBlockStreaming: false,
}),
}),
);
expect(loadSessionStore).toHaveBeenCalledWith("/tmp/sessions.json", { skipCache: true });
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: "Reasoning:\n_step_" })],
}),
);
});
it("streams reasoning draft updates even when answer stream mode is off", async () => {
loadSessionStore.mockReturnValue({
s1: { reasoningLevel: "stream" },
});
const reasoningDraftStream = createDraftStream(111);
createTelegramDraftStream.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_step_" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
});
expect(createTelegramDraftStream).toHaveBeenCalledTimes(1);
expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_step_");
expect(loadSessionStore).toHaveBeenCalledWith("/tmp/sessions.json", { skipCache: true });
});
it("finalizes text-only replies by editing the preview message in place", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
@@ -707,6 +778,141 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("routes think-tag partials to reasoning lane and keeps answer lane clean", async () => {
const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({
text: "<think>Counting letters in strawberry</think>3",
});
await dispatcherOptions.deliver({ text: "3" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(reasoningDraftStream.update).toHaveBeenCalledWith(
"Reasoning:\n_Counting letters in strawberry_",
);
expect(answerDraftStream.update).toHaveBeenCalledWith("3");
expect(
answerDraftStream.update.mock.calls.some((call) => String(call[0] ?? "").includes("<think>")),
).toBe(false);
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "3", expect.any(Object));
});
it("routes unmatched think partials to reasoning lane without leaking answer lane", async () => {
const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({
text: "<think>Counting letters in strawberry",
});
await dispatcherOptions.deliver(
{ text: "There are 3 r's in strawberry." },
{ kind: "final" },
);
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(reasoningDraftStream.update).toHaveBeenCalledWith(
"Reasoning:\n_Counting letters in strawberry_",
);
expect(
answerDraftStream.update.mock.calls.some((call) => String(call[0] ?? "").includes("<")),
).toBe(false);
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
999,
"There are 3 r's in strawberry.",
expect.any(Object),
);
});
it("keeps reasoning preview message when reasoning is streamed but final is answer-only", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({
text: "<think>Word: strawberry. r appears at 3, 8, 9.</think>",
});
await dispatcherOptions.deliver(
{ text: "There are 3 r's in strawberry." },
{ kind: "final" },
);
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(reasoningDraftStream.update).toHaveBeenCalledWith(
"Reasoning:\n_Word: strawberry. r appears at 3, 8, 9._",
);
expect(reasoningDraftStream.clear).not.toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
999,
"There are 3 r's in strawberry.",
expect.any(Object),
);
});
it("splits think-tag final payload into reasoning and answer lanes", async () => {
setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver(
{
text: "<think>Word: strawberry. r appears at 3, 8, 9.</think>There are 3 r's in strawberry.",
},
{ kind: "final" },
);
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
111,
"Reasoning:\n_Word: strawberry. r appears at 3, 8, 9._",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
999,
"There are 3 r's in strawberry.",
expect.any(Object),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("splits reasoning preview only when next reasoning block starts in partial mode", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,

View File

@@ -10,11 +10,13 @@ import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js";
import { resolveChunkMode } from "../auto-reply/chunk.js";
import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js";
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { removeAckReactionAfterReply } from "../channels/ack-reactions.js";
import { logAckFailure, logTypingFailure } from "../channels/logging.js";
import { createReplyPrefixOptions } from "../channels/reply-prefix.js";
import { createTypingCallbacks } from "../channels/typing.js";
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
import { danger, logVerbose } from "../globals.js";
import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js";
@@ -27,6 +29,10 @@ import type { TelegramInlineButtons } from "./button-types.js";
import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js";
import { createTelegramDraftStream } from "./draft-stream.js";
import { renderTelegramHtmlText } from "./format.js";
import {
createTelegramReasoningStepState,
splitTelegramReasoningText,
} from "./reasoning-lane-coordinator.js";
import { editMessageTelegram } from "./send.js";
import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
@@ -34,17 +40,6 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
/** Minimum chars before sending first streaming message (improves push notification UX) */
const DRAFT_MIN_INITIAL_CHARS = 30;
const REASONING_MESSAGE_PREFIX = "Reasoning:\n";

/**
 * Reports whether a payload text belongs to the reasoning lane: it must start
 * with the "Reasoning:\n" prefix (after trimming surrounding whitespace) and
 * carry at least one character of content beyond the prefix itself.
 */
function isReasoningMessage(text?: string): boolean {
  if (typeof text !== "string") {
    return false;
  }
  const candidate = text.trim();
  if (!candidate.startsWith(REASONING_MESSAGE_PREFIX)) {
    return false;
  }
  // A bare prefix with no body does not count as a reasoning message.
  return candidate.length > REASONING_MESSAGE_PREFIX.length;
}
async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) {
try {
@@ -72,6 +67,31 @@ type DispatchTelegramMessageParams = {
opts: Pick<TelegramBotOptions, "token">;
};
type TelegramReasoningLevel = "off" | "on" | "stream";

/**
 * Resolves the per-session reasoning level from the on-disk session store.
 * Returns "on" or "stream" only when the session entry explicitly carries that
 * level; every other situation — no session key, missing entry, or a store
 * read failure — degrades to "off" so dispatch is never blocked.
 */
function resolveTelegramReasoningLevel(params: {
  cfg: OpenClawConfig;
  sessionKey?: string;
  agentId: string;
}): TelegramReasoningLevel {
  const { cfg, sessionKey, agentId } = params;
  if (!sessionKey) {
    return "off";
  }
  try {
    const storePath = resolveStorePath(cfg.session?.store, { agentId });
    const store = loadSessionStore(storePath, { skipCache: true });
    // Keys are normally stored lowercased; fall back to the raw key just in case.
    const entry = store[sessionKey.toLowerCase()] ?? store[sessionKey];
    switch (entry?.reasoningLevel) {
      case "on":
        return "on";
      case "stream":
        return "stream";
      default:
        return "off";
    }
  } catch {
    // Best effort: a broken or unreadable store means no reasoning override.
    return "off";
  }
}
export const dispatchTelegramMessage = async ({
context,
bot,
@@ -115,11 +135,21 @@ export const dispatchTelegramMessage = async ({
typeof telegramCfg.blockStreaming === "boolean"
? telegramCfg.blockStreaming
: cfg.agents?.defaults?.blockStreamingDefault === "on";
const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled;
const resolvedReasoningLevel = resolveTelegramReasoningLevel({
cfg,
sessionKey: ctxPayload.SessionKey,
agentId: route.agentId,
});
const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on";
const streamReasoningDraft = resolvedReasoningLevel === "stream";
const canStreamAnswerDraft =
streamMode !== "off" && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning;
const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft;
const draftReplyToMessageId =
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
const draftMinInitialChars = streamMode === "partial" ? 1 : DRAFT_MIN_INITIAL_CHARS;
const answerDraftStream = canStreamDraft
const draftMinInitialChars =
streamMode === "partial" || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS;
const answerDraftStream = canStreamAnswerDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
@@ -132,7 +162,7 @@ export const dispatchTelegramMessage = async ({
warn: logVerbose,
})
: undefined;
const reasoningDraftStream = canStreamDraft
const reasoningDraftStream = canStreamReasoningDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
@@ -182,6 +212,7 @@ export const dispatchTelegramMessage = async ({
chunker: reasoningDraftChunker,
};
let splitReasoningOnNextStream = false;
const reasoningStepState = createTelegramReasoningStepState();
const resetDraftLaneState = (lane: DraftLaneState) => {
lane.lastPartialText = "";
lane.draftText = "";
@@ -239,6 +270,20 @@ export const dispatchTelegramMessage = async ({
},
});
};
// Routes a partial-reply chunk into the reasoning and/or answer draft lanes.
// Think-tagged content goes to the reasoning preview; the stripped remainder
// (if any) goes to the answer preview.
const updateDraftLanesFromPartial = (text: string | undefined) => {
if (!text) {
return;
}
const split = splitTelegramReasoningText(text);
if (split.reasoningText) {
// Mark reasoning as hinted AND delivered: the reasoning draft is visible to
// the user already, so a later final answer must not be buffered behind it.
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
updateDraftFromPartial(reasoningLane, split.reasoningText);
}
if (split.answerText) {
updateDraftFromPartial(answerLane, split.answerText);
}
};
const flushDraftLane = async (lane: DraftLaneState) => {
if (!lane.stream) {
return;
@@ -258,10 +303,11 @@ export const dispatchTelegramMessage = async ({
await lane.stream.flush();
};
const disableBlockStreaming =
typeof telegramCfg.blockStreaming === "boolean"
const disableBlockStreaming = forceBlockStreamingForReasoning
? false
: typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: answerDraftStream || streamMode === "off"
: canStreamAnswerDraft || streamMode === "off"
? true
: undefined;
@@ -336,6 +382,7 @@ export const dispatchTelegramMessage = async ({
skippedNonSilent: 0,
};
let finalizedViaPreviewMessage = false;
let finalizedReasoningViaPreviewMessage = false;
const clearGroupHistory = () => {
if (isGroup && historyKey) {
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
@@ -442,6 +489,51 @@ export const dispatchTelegramMessage = async ({
return false;
}
};
// Returns a payload whose text equals `text`, reusing the original object when
// the text already matches (avoids needless copies on the hot delivery path).
const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => {
if (payload.text === text) {
return payload;
}
return { ...payload, text };
};
// Sends a single payload through the standard delivery pipeline and records
// whether anything was actually delivered. Returns the delivered flag.
const sendPayload = async (payload: ReplyPayload) => {
const result = await deliverReplies({
...deliveryBaseOptions,
replies: [payload],
onVoiceRecording: sendRecordVoice,
});
if (result.delivered) {
deliveryState.delivered = true;
}
return result.delivered;
};
// Attempts to finalize a lane by editing its streamed preview message in place
// instead of sending a new message. Returns true on success; false means the
// caller must fall back to a standard send (media, errors, oversized text, or
// an already-finalized lane all disqualify the preview-edit path).
const tryFinalizeLaneText = async (params: {
lane: DraftLaneState;
laneName: "answer" | "reasoning";
text: string;
previewButtons?: TelegramInlineButtons;
alreadyFinalized?: boolean;
payload: ReplyPayload;
}): Promise<boolean> => {
const { lane, laneName, text, previewButtons, alreadyFinalized, payload } = params;
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
// Preview edits only work for plain, non-empty text that fits one message.
const canFinalizeViaPreviewEdit =
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
if (!canFinalizeViaPreviewEdit || alreadyFinalized) {
if (!hasMedia && !payload.isError && text.length > draftMaxChars) {
logVerbose(
`telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`,
);
}
return false;
}
// Flush debounced draft content before the final edit so nothing is lost.
await flushDraftLane(lane);
return tryFinalizePreviewForLane({
lane,
laneName,
finalText: text,
previewButtons,
});
};
let queuedFinal = false;
try {
@@ -451,75 +543,147 @@ export const dispatchTelegramMessage = async ({
dispatcherOptions: {
...prefixOptions,
deliver: async (payload, info) => {
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const finalText = payload.text;
const reasoningMessage = isReasoningMessage(finalText);
const previewButtons = (
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
)?.buttons;
const canFinalizeViaPreviewEdit =
!hasMedia &&
typeof finalText === "string" &&
finalText.length > 0 &&
finalText.length <= draftMaxChars &&
!payload.isError;
if (info.kind === "final") {
await flushDraftLane(answerLane);
await flushDraftLane(reasoningLane);
if (canFinalizeViaPreviewEdit && reasoningMessage) {
const finalizedReasoning = await tryFinalizePreviewForLane({
lane: reasoningLane,
laneName: "reasoning",
finalText,
previewButtons,
});
if (finalizedReasoning) {
return;
}
}
if (canFinalizeViaPreviewEdit && !reasoningMessage && !finalizedViaPreviewMessage) {
const finalizedAnswer = await tryFinalizePreviewForLane({
lane: answerLane,
laneName: "answer",
finalText,
previewButtons,
});
if (finalizedAnswer) {
finalizedViaPreviewMessage = true;
return;
}
}
if (
!hasMedia &&
!payload.isError &&
typeof finalText === "string" &&
finalText.length > draftMaxChars
) {
logVerbose(
`telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`,
);
}
await answerLane.stream?.stop();
await reasoningLane.stream?.stop();
}
if (info.kind !== "final" && canFinalizeViaPreviewEdit && reasoningMessage) {
const updatedReasoning = await tryEditExistingPreviewForLane({
lane: reasoningLane,
laneName: "reasoning",
finalText,
previewButtons,
});
if (updatedReasoning) {
const split = splitTelegramReasoningText(payload.text);
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const flushBufferedFinalAnswer = async () => {
const buffered = reasoningStepState.takeBufferedFinalAnswer();
if (!buffered) {
return;
}
const bufferedButtons = (
buffered.payload.channelData?.telegram as
| { buttons?: TelegramInlineButtons }
| undefined
)?.buttons;
const finalizedBufferedAnswer = await tryFinalizeLaneText({
lane: answerLane,
laneName: "answer",
text: buffered.text,
previewButtons: bufferedButtons,
alreadyFinalized: finalizedViaPreviewMessage,
payload: buffered.payload,
});
if (finalizedBufferedAnswer) {
finalizedViaPreviewMessage = true;
reasoningStepState.resetForNextStep();
return;
}
await answerLane.stream?.stop();
await sendPayload(applyTextToPayload(buffered.payload, buffered.text));
reasoningStepState.resetForNextStep();
};
const deliverReasoningText = async (text: string) => {
reasoningStepState.noteReasoningHint();
if (info.kind === "final") {
const finalizedReasoning = await tryFinalizeLaneText({
lane: reasoningLane,
laneName: "reasoning",
text,
previewButtons,
payload,
});
if (finalizedReasoning) {
finalizedReasoningViaPreviewMessage = true;
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
return;
}
await reasoningLane.stream?.stop();
const delivered = await sendPayload(applyTextToPayload(payload, text));
if (delivered) {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
}
return;
}
const canEditReasoningPreview =
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
if (canEditReasoningPreview) {
const updatedReasoning = await tryEditExistingPreviewForLane({
lane: reasoningLane,
laneName: "reasoning",
finalText: text,
previewButtons,
});
if (updatedReasoning) {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
return;
}
}
const delivered = await sendPayload(applyTextToPayload(payload, text));
if (delivered) {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
}
};
const deliverAnswerText = async (text: string) => {
if (info.kind === "final" && reasoningStepState.shouldBufferFinalAnswer()) {
reasoningStepState.bufferFinalAnswer({ payload, text });
return;
}
if (info.kind === "final") {
const finalizedAnswer = await tryFinalizeLaneText({
lane: answerLane,
laneName: "answer",
text,
previewButtons,
alreadyFinalized: finalizedViaPreviewMessage,
payload,
});
if (finalizedAnswer && !finalizedViaPreviewMessage) {
finalizedViaPreviewMessage = true;
if (reasoningLane.hasStreamedMessage) {
finalizedReasoningViaPreviewMessage = true;
}
reasoningStepState.resetForNextStep();
return;
}
await answerLane.stream?.stop();
}
await sendPayload(applyTextToPayload(payload, text));
if (info.kind === "final") {
if (reasoningLane.hasStreamedMessage) {
finalizedReasoningViaPreviewMessage = true;
}
reasoningStepState.resetForNextStep();
}
};
if (split.reasoningText) {
await deliverReasoningText(split.reasoningText);
}
const result = await deliverReplies({
...deliveryBaseOptions,
replies: [payload],
onVoiceRecording: sendRecordVoice,
});
if (result.delivered) {
deliveryState.delivered = true;
if (split.answerText) {
await deliverAnswerText(split.answerText);
return;
}
if (split.reasoningText) {
return;
}
if (info.kind === "final") {
await answerLane.stream?.stop();
await reasoningLane.stream?.stop();
reasoningStepState.resetForNextStep();
}
const canSendAsIs =
hasMedia || typeof payload.text !== "string" || payload.text.length > 0;
if (!canSendAsIs) {
if (info.kind === "final") {
await flushBufferedFinalAnswer();
}
return;
}
await sendPayload(payload);
if (info.kind === "final") {
await flushBufferedFinalAnswer();
}
},
onSkip: (_payload, info) => {
@@ -546,7 +710,7 @@ export const dispatchTelegramMessage = async ({
skillFilter,
disableBlockStreaming,
onPartialReply: answerLane.stream
? (payload) => updateDraftFromPartial(answerLane, payload.text)
? (payload) => updateDraftLanesFromPartial(payload.text)
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) => {
@@ -558,11 +722,20 @@ export const dispatchTelegramMessage = async ({
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
updateDraftFromPartial(reasoningLane, payload.text);
const split = splitTelegramReasoningText(payload.text);
if (split.reasoningText) {
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
updateDraftFromPartial(reasoningLane, split.reasoningText);
}
if (split.answerText) {
updateDraftFromPartial(answerLane, split.answerText);
}
}
: undefined,
onAssistantMessageStart: answerLane.stream
? () => {
reasoningStepState.resetForNextStep();
// Keep answer blocks separated in block mode; partial mode keeps one answer lane.
if (streamMode === "block" && answerLane.hasStreamedMessage) {
answerLane.stream?.forceNewMessage();
@@ -581,11 +754,20 @@ export const dispatchTelegramMessage = async ({
}));
} finally {
// Must stop() first to flush debounced content before clear() wipes state
const streamsShareHandle =
Boolean(answerLane.stream) &&
Boolean(reasoningLane.stream) &&
answerLane.stream === reasoningLane.stream;
await answerLane.stream?.stop();
if (!finalizedViaPreviewMessage) {
await answerLane.stream?.clear();
}
await reasoningLane.stream?.stop();
if (!streamsShareHandle) {
await reasoningLane.stream?.stop();
if (!finalizedReasoningViaPreviewMessage) {
await reasoningLane.stream?.clear();
}
}
}
let sentFallback = false;
if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) {

View File

@@ -0,0 +1,111 @@
import {
extractThinkingFromTaggedStream,
formatReasoningMessage,
} from "../agents/pi-embedded-utils.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { stripReasoningTagsFromText } from "../shared/text/reasoning-tags.js";
const REASONING_MESSAGE_PREFIX = "Reasoning:\n";
const REASONING_TAG_PREFIXES = [
  "<think",
  "<thinking",
  "<thought",
  "<antthinking",
  "</think",
  "</thinking",
  "</thought",
  "</antthinking",
];

/**
 * Returns true when `text` looks like the beginning of an (opening or closing)
 * reasoning tag that has not yet received its ">" — e.g. "<th" or "</thinking".
 * Such fragments are withheld from both lanes until more of the stream arrives.
 */
function isPartialReasoningTagPrefix(text: string): boolean {
  const candidate = text.trimStart().toLowerCase();
  // Anything not starting with "<", or already containing ">", is not a
  // dangling tag prefix.
  if (!candidate.startsWith("<") || candidate.includes(">")) {
    return false;
  }
  for (const prefix of REASONING_TAG_PREFIXES) {
    if (prefix.startsWith(candidate)) {
      return true;
    }
  }
  return false;
}
/** Result of routing a payload's text between the reasoning and answer lanes. */
export type TelegramReasoningSplit = {
reasoningText?: string;
answerText?: string;
};
/**
 * Splits a streamed or final payload text into reasoning and answer portions.
 * - A dangling tag prefix (e.g. "<thin") yields an empty split so partial tags
 *   never leak into either lane.
 * - Text already carrying the "Reasoning:\n" prefix goes wholly to the
 *   reasoning lane, unchanged apart from trimming.
 * - Otherwise think-tagged content is extracted for the reasoning lane and the
 *   tag-stripped remainder becomes the answer text.
 */
export function splitTelegramReasoningText(text?: string): TelegramReasoningSplit {
if (typeof text !== "string") {
return {};
}
const trimmed = text.trim();
// Withhold output while only a partial "<think"-style tag has streamed in.
if (isPartialReasoningTagPrefix(trimmed)) {
return {};
}
if (
trimmed.startsWith(REASONING_MESSAGE_PREFIX) &&
trimmed.length > REASONING_MESSAGE_PREFIX.length
) {
return { reasoningText: trimmed };
}
const taggedReasoning = extractThinkingFromTaggedStream(text);
const strippedAnswer = stripReasoningTagsFromText(text, { mode: "strict", trim: "both" });
// Fast path: no reasoning tags at all — the whole text is answer content.
if (!taggedReasoning && strippedAnswer === text) {
return { answerText: text };
}
const reasoningText = taggedReasoning ? formatReasoningMessage(taggedReasoning) : undefined;
// An empty stripped remainder means the payload was reasoning-only.
const answerText = strippedAnswer || undefined;
return { reasoningText, answerText };
}
/** A final answer payload held back until its reasoning step has been sent. */
export type BufferedFinalAnswer = {
  payload: ReplyPayload;
  text: string;
};

/**
 * Tracks reasoning/answer ordering for a single assistant step. When reasoning
 * has been hinted but not yet delivered, the final answer is buffered so the
 * reasoning message always lands before the answer.
 */
export function createTelegramReasoningStepState() {
  let reasoningHint = false;
  let reasoningDelivered = false;
  let bufferedFinalAnswer: BufferedFinalAnswer | undefined;
  return {
    /** Records that reasoning content was observed for this step. */
    noteReasoningHint() {
      reasoningHint = true;
    },
    /** Records that reasoning was actually sent (which implies a hint too). */
    noteReasoningDelivered() {
      reasoningHint = true;
      reasoningDelivered = true;
    },
    /** True while reasoning is pending and no answer is buffered yet. */
    shouldBufferFinalAnswer() {
      return reasoningHint && !reasoningDelivered && !bufferedFinalAnswer;
    },
    /** Holds a final answer until the reasoning step has been delivered. */
    bufferFinalAnswer(value: BufferedFinalAnswer) {
      bufferedFinalAnswer = value;
    },
    /** Removes and returns the buffered final answer, if any. */
    takeBufferedFinalAnswer(): BufferedFinalAnswer | undefined {
      const value = bufferedFinalAnswer;
      bufferedFinalAnswer = undefined;
      return value;
    },
    /** Clears all per-step flags before the next assistant step begins. */
    resetForNextStep() {
      reasoningHint = false;
      reasoningDelivered = false;
      bufferedFinalAnswer = undefined;
    },
  };
}