fix(telegram): scope stream fix to telegram lane handling

This commit is contained in:
Ayaan Zaidi
2026-02-19 15:47:59 +05:30
parent 99fa7d4b1d
commit 2947c69ae4
5 changed files with 8 additions and 98 deletions

View File

@@ -16,7 +16,6 @@ import {
extractThinkingFromTaggedText,
formatReasoningMessage,
promoteThinkingTagsToBlocks,
stripTrailingPartialThinkingTagFragment,
} from "./pi-embedded-utils.js";
const stripTrailingDirective = (text: string): string => {
@@ -190,7 +189,7 @@ export function handleMessageUpdate(
}
const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null;
const parsedFull = parseReplyDirectives(stripTrailingDirective(next));
const cleanedText = stripTrailingPartialThinkingTagFragment(parsedFull.text);
const cleanedText = parsedFull.text;
const mediaUrls = parsedDelta?.mediaUrls;
const hasMedia = Boolean(mediaUrls && mediaUrls.length > 0);
const hasAudio = Boolean(parsedDelta?.audioAsVoice);
@@ -280,10 +279,6 @@ export function handleMessageEnd(
? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText)
: "";
const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : "";
if (ctx.state.streamReasoning && rawThinking) {
// Emit final reasoning snapshot before answer finalization paths.
ctx.emitReasoningStream(rawThinking);
}
const trimmedText = text.trim();
const parsedText = trimmedText ? parseReplyDirectives(stripTrailingDirective(trimmedText)) : null;
let cleanedText = parsedText?.text ?? "";
@@ -400,6 +395,9 @@ export function handleMessageEnd(
if (!shouldEmitReasoningBeforeAnswer) {
maybeEmitReasoning();
}
if (ctx.state.streamReasoning && rawThinking) {
ctx.emitReasoningStream(rawThinking);
}
if (ctx.state.blockReplyBreak === "text_end" && onBlockReply) {
const tailResult = ctx.consumeReplyDirectives("", { final: true });

View File

@@ -298,61 +298,6 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onReasoningEnd).toHaveBeenCalledTimes(1);
});
it("does not leak partial think tag fragments into reasoning or assistant streams", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onReasoningStream = vi.fn();
const onPartialReply = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
reasoningMode: "stream",
onReasoningStream,
onPartialReply,
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "<think>step one" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: " and two</think" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: ">\nfinal answer" },
});
const reasoningTexts = onReasoningStream.mock.calls
.map((call) => call[0]?.text)
.filter((value): value is string => typeof value === "string");
const partialTexts = onPartialReply.mock.calls
.map((call) => call[0]?.text)
.filter((value): value is string => typeof value === "string");
expect(reasoningTexts.at(-1)).toContain("Reasoning:\n_step one and two_");
for (const text of reasoningTexts) {
expect(text).not.toContain("</think");
expect(text).not.toContain("<think");
}
for (const text of partialTexts) {
expect(text).not.toContain("</think");
expect(text).not.toContain("<think");
}
});
it("emits delta chunks in agent events for streaming assistant text", () => {
const { emit, onAgentEvent } = createAgentEventHarness();

View File

@@ -2,9 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it } from "vitest";
import {
extractAssistantText,
extractThinkingFromTaggedStream,
formatReasoningMessage,
stripTrailingPartialThinkingTagFragment,
stripDowngradedToolCallText,
} from "./pi-embedded-utils.js";
@@ -605,19 +603,6 @@ describe("formatReasoningMessage", () => {
});
});
describe("thinking tag fragment handling", () => {
it("strips dangling closing tag fragments from tagged stream extraction", () => {
expect(extractThinkingFromTaggedStream("<think>step one and two</")).toBe("step one and two");
expect(extractThinkingFromTaggedStream("<think>step one and two</th")).toBe("step one and two");
});
it("strips dangling opening/closing think fragments while preserving plain text", () => {
expect(stripTrailingPartialThinkingTagFragment("Reasoning line <th")).toBe("Reasoning line");
expect(stripTrailingPartialThinkingTagFragment("Reasoning line </")).toBe("Reasoning line");
expect(stripTrailingPartialThinkingTagFragment("2 < 3")).toBe("2 < 3");
});
});
describe("stripDowngradedToolCallText", () => {
it("strips [Historical context: ...] blocks", () => {
const text = `[Historical context: a different model called tool "exec" with arguments {"command":"git status"}]`;

View File

@@ -390,31 +390,13 @@ export function extractThinkingFromTaggedText(text: string): string {
return result.trim();
}
export function stripTrailingPartialThinkingTagFragment(text: string): string {
if (!text) {
return text;
}
const match = text.match(/<\s*\/?\s*([a-z]*)\s*$/i);
if (!match || typeof match.index !== "number") {
return text;
}
const prefix = (match[1] ?? "").toLowerCase();
const targets = ["think", "thinking", "thought", "antthinking"];
const isThinkingPrefix =
prefix.length === 0 || targets.some((target) => target.startsWith(prefix));
if (!isThinkingPrefix) {
return text;
}
return text.slice(0, match.index).trimEnd();
}
export function extractThinkingFromTaggedStream(text: string): string {
if (!text) {
return "";
}
const closed = extractThinkingFromTaggedText(text);
if (closed) {
return stripTrailingPartialThinkingTagFragment(closed);
return closed;
}
const openRe = /<\s*(?:think(?:ing)?|thought|antthinking)\s*>/gi;
@@ -427,10 +409,10 @@ export function extractThinkingFromTaggedStream(text: string): string {
const lastOpen = openMatches[openMatches.length - 1];
const lastClose = closeMatches[closeMatches.length - 1];
if (lastClose && (lastClose.index ?? -1) > (lastOpen.index ?? -1)) {
return stripTrailingPartialThinkingTagFragment(closed);
return closed;
}
const start = (lastOpen.index ?? 0) + lastOpen[0].length;
return stripTrailingPartialThinkingTagFragment(text.slice(start).trim());
return text.slice(start).trim();
}
export function inferToolMetaFromArgs(toolName: string, args: unknown): string | undefined {

View File

@@ -301,11 +301,11 @@ export async function runAgentTurnWithFallback(params: {
onReasoningStream:
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
? async (payload) => {
await params.typingSignals.signalReasoningDelta();
await params.opts?.onReasoningStream?.({
text: payload.text,
mediaUrls: payload.mediaUrls,
});
await params.typingSignals.signalReasoningDelta();
}
: undefined,
onReasoningEnd: params.opts?.onReasoningEnd,