From 72d1fa4da56aa68bd5ce36862ed80130b1b8919c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 3 Jan 2026 01:48:38 +0100 Subject: [PATCH] fix: dedupe repeated block replies --- src/agents/pi-embedded-subscribe.test.ts | 49 ++++++++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 15 ++++++++ 2 files changed, 64 insertions(+) diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 4b16f5fcb4..97ab7a62c1 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -182,6 +182,55 @@ describe("subscribeEmbeddedPiSession", () => { expect(subscription.assistantTexts).toEqual(["Hello block"]); }); + it("does not emit duplicate block replies when text_end repeats", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + const subscription = subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "Hello block", + }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_end", + }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_end", + }, + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(subscription.assistantTexts).toEqual(["Hello block"]); + }); + it("waits for auto-compaction retry and clears buffered text", async () => { const listeners: SessionEventHandler[] = []; const session = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 646dc96c25..fb2b8b6f87 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -78,6 +78,7 @@ export function subscribeEmbeddedPiSession(params: { const blockReplyBreak = params.blockReplyBreak ?? "text_end"; let deltaBuffer = ""; let lastStreamedAssistant: string | undefined; + let lastBlockReplyText: string | undefined; let assistantTextBaseline = 0; let compactionInFlight = false; let pendingCompactionRetry = 0; @@ -152,6 +153,7 @@ export function subscribeEmbeddedPiSession(params: { toolMetaById.clear(); deltaBuffer = ""; lastStreamedAssistant = undefined; + lastBlockReplyText = undefined; assistantTextBaseline = 0; toolDebouncer.flush(); }; @@ -304,6 +306,12 @@ export function subscribeEmbeddedPiSession(params: { } if (evtType === "text_end" && blockReplyBreak === "text_end") { + if (next && next === lastBlockReplyText) { + deltaBuffer = ""; + lastStreamedAssistant = undefined; + return; + } + lastBlockReplyText = next || undefined; if (next) assistantTexts.push(next); if (next && params.onBlockReply) { const { text: cleanedText, mediaUrls } = @@ -349,6 +357,12 @@ export function subscribeEmbeddedPiSession(params: { text && params.onBlockReply ) { + if (text === lastBlockReplyText) { + deltaBuffer = ""; + lastStreamedAssistant = undefined; + return; + } + lastBlockReplyText = text; const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { void params.onBlockReply({ @@ -359,6 +373,7 @@ export function subscribeEmbeddedPiSession(params: { } deltaBuffer = ""; lastStreamedAssistant = undefined; + lastBlockReplyText = undefined; } }