From 0ff506140db2bae3a38c04ca48c9e9b29e2edf78 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Wed, 18 Feb 2026 23:33:55 -0800 Subject: [PATCH] fix: clear matched tool errors and dedupe reasoning end --- ...pi-embedded-subscribe.handlers.messages.ts | 22 +++++++- .../pi-embedded-subscribe.handlers.types.ts | 1 + ...ion.subscribeembeddedpisession.e2e.test.ts | 53 +++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 2 + src/agents/tool-mutation.test.ts | 6 ++- src/agents/tool-mutation.ts | 7 ++- 6 files changed, 87 insertions(+), 4 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 9daf724803..9aa445a1ab 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -30,6 +30,14 @@ const stripTrailingDirective = (text: string): string => { return text.slice(0, openIndex); }; +function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) { + if (!ctx.state.reasoningStreamOpen) { + return; + } + ctx.state.reasoningStreamOpen = false; + void ctx.params.onReasoningEnd?.(); +} + export function resolveSilentReplyFallbackText(params: { text: string; messagingToolSentTexts: string[]; @@ -83,6 +91,9 @@ export function handleMessageUpdate( const evtType = typeof assistantRecord?.type === "string" ? assistantRecord.type : ""; if (evtType === "thinking_start" || evtType === "thinking_delta" || evtType === "thinking_end") { + if (evtType === "thinking_start" || evtType === "thinking_delta") { + ctx.state.reasoningStreamOpen = true; + } const thinkingDelta = typeof assistantRecord?.delta === "string" ? assistantRecord.delta : ""; const thinkingContent = typeof assistantRecord?.content === "string" ? assistantRecord.content : ""; @@ -101,7 +112,10 @@ export function handleMessageUpdate( ctx.emitReasoningStream(partialThinking || thinkingContent || thinkingDelta); } if (evtType === "thinking_end") { - void ctx.params.onReasoningEnd?.(); + if (!ctx.state.reasoningStreamOpen) { + ctx.state.reasoningStreamOpen = true; + } + emitReasoningEnd(ctx); } return; } @@ -166,9 +180,12 @@ export function handleMessageUpdate( if (next) { const wasThinking = ctx.state.partialBlockState.thinking; const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : ""; + if (!wasThinking && ctx.state.partialBlockState.thinking) { + ctx.state.reasoningStreamOpen = true; + } // Detect when thinking block ends ( tag processed) if (wasThinking && !ctx.state.partialBlockState.thinking) { - void ctx.params.onReasoningEnd?.(); + emitReasoningEnd(ctx); } const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null; const parsedFull = parseReplyDirectives(stripTrailingDirective(next)); @@ -414,4 +431,5 @@ export function handleMessageEnd( ctx.state.blockState.inlineCode = createInlineCodeState(); ctx.state.lastStreamedAssistant = undefined; ctx.state.lastStreamedAssistantCleaned = undefined; + ctx.state.reasoningStreamOpen = false; } diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 435325601d..d5c725528c 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -52,6 +52,7 @@ export type EmbeddedPiSubscribeState = { emittedAssistantUpdate: boolean; lastStreamedReasoning?: string; lastBlockReplyText?: string; + reasoningStreamOpen: boolean; assistantMessageIndex: number; lastAssistantTextMessageIndex: number; lastAssistantTextNormalized?: string; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts index 0bee38b133..b91857c2d7 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts @@ -251,6 +251,59 @@ describe("subscribeEmbeddedPiSession", () => { expect(onReasoningEnd).toHaveBeenCalledTimes(1); }); + it("emits reasoning end once when native and tagged reasoning end overlap", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onReasoningEnd = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + reasoningMode: "stream", + onReasoningStream: vi.fn(), + onReasoningEnd, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "Checking", + }, + }); + + handler?.({ + type: "message_update", + message: { + role: "assistant", + content: [{ type: "thinking", thinking: "Checking" }], + }, + assistantMessageEvent: { + type: "thinking_end", + }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: " files\nFinal answer", + }, + }); + + expect(onReasoningEnd).toHaveBeenCalledTimes(1); + }); + it("emits delta chunks in agent events for streaming assistant text", () => { const { emit, onAgentEvent } = createAgentEventHarness(); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 594cc43862..8e7a7fec29 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -55,6 +55,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar emittedAssistantUpdate: false, lastStreamedReasoning: undefined, lastBlockReplyText: undefined, + reasoningStreamOpen: false, assistantMessageIndex: 0, lastAssistantTextMessageIndex: -1, lastAssistantTextNormalized: undefined, @@ -117,6 +118,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.lastBlockReplyText = undefined; state.lastStreamedReasoning = undefined; state.lastReasoningSent = undefined; + state.reasoningStreamOpen = false; state.suppressBlockChunks = false; state.assistantMessageIndex += 1; state.lastAssistantTextMessageIndex = -1; diff --git a/src/agents/tool-mutation.test.ts b/src/agents/tool-mutation.test.ts index 3eb417a71b..ab618f8da5 100644 --- a/src/agents/tool-mutation.test.ts +++ b/src/agents/tool-mutation.test.ts @@ -27,7 +27,11 @@ describe("tool mutation helpers", () => { expect(writeFingerprint).toContain("tool=write"); expect(writeFingerprint).toContain("path=/tmp/demo.txt"); expect(writeFingerprint).toContain("id=42"); - expect(writeFingerprint).toContain("meta=write /tmp/demo.txt"); + expect(writeFingerprint).not.toContain("meta=write /tmp/demo.txt"); + + const metaOnlyFingerprint = buildToolActionFingerprint("exec", { command: "ls -la" }, "ls -la"); + expect(metaOnlyFingerprint).toContain("tool=exec"); + expect(metaOnlyFingerprint).toContain("meta=ls -la"); const readFingerprint = buildToolActionFingerprint("read", { path: "/tmp/demo.txt" }); expect(readFingerprint).toBeUndefined(); diff --git a/src/agents/tool-mutation.ts b/src/agents/tool-mutation.ts index 22b0e7af9d..a88bbadfd2 100644 --- a/src/agents/tool-mutation.ts +++ b/src/agents/tool-mutation.ts @@ -151,6 +151,7 @@ export function buildToolActionFingerprint( if (action) { parts.push(`action=${action}`); } + let hasStableTarget = false; for (const key of [ "path", "filePath", @@ -167,10 +168,14 @@ export function buildToolActionFingerprint( const value = normalizeFingerprintValue(record?.[key]); if (value) { parts.push(`${key.toLowerCase()}=${value}`); + hasStableTarget = true; } } const normalizedMeta = meta?.trim().replace(/\s+/g, " ").toLowerCase(); - if (normalizedMeta) { + // Meta text often carries volatile details (for example "N chars"). + // Prefer stable arg-derived keys for matching; only fall back to meta + // when no stable target key is available. + if (normalizedMeta && !hasStableTarget) { parts.push(`meta=${normalizedMeta}`); } return parts.join("|");