diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a1f13109d..4f8766f398 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ - Discord: allow emoji/sticker uploads + channel actions in config defaults. (#870) — thanks @JDIVE. - Fix: sanitize user-facing error text + strip `` tags across reply pipelines. (#975) — thanks @ThomsenDrake. - Fix: normalize pairing CLI aliases, allow extension channels, and harden Zalo webhook payload parsing. (#991) — thanks @longmaba. +- Fix: stream block replies between tool calls when block streaming is off. (#992) — thanks @tyler6204. ## 2026.1.14-1 diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 6abb4c05eb..cbe5cabc7a 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -109,14 +109,23 @@ export function handleToolExecutionUpdate( partialResult: sanitized, }, }); - void ctx.params.onAgentEvent?.({ - stream: "tool", - data: { - phase: "update", - name: toolName, - toolCallId, - }, - }); + const onAgentEvent = ctx.params.onAgentEvent; + if (onAgentEvent) { + void Promise.resolve() + .then(() => + onAgentEvent({ + stream: "tool", + data: { + phase: "update", + name: toolName, + toolCallId, + }, + }), + ) + .catch((err: unknown) => { + ctx.log.debug(`tool_execution_update handler failed: ${String(err)}`); + }); + } } export function handleToolExecutionEnd( @@ -171,16 +180,25 @@ export function handleToolExecutionEnd( result: sanitizedResult, }, }); - void ctx.params.onAgentEvent?.({ - stream: "tool", - data: { - phase: "result", - name: toolName, - toolCallId, - meta, - isError: isToolError, - }, - }); + const onAgentEvent = ctx.params.onAgentEvent; + if (onAgentEvent) { + void Promise.resolve() + .then(() => + onAgentEvent({ + stream: "tool", + data: { + phase: "result", + name: toolName, + toolCallId, + meta, + isError: isToolError, + }, + }), + ) + .catch((err: unknown) => { + ctx.log.debug(`tool_execution_result handler failed: ${String(err)}`); + }); + } ctx.log.debug( `embedded run tool end: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`, diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.swallows-tool-event-rejections.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.swallows-tool-event-rejections.test.ts new file mode 100644 index 0000000000..689d93791e --- /dev/null +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.swallows-tool-event-rejections.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it, vi } from "vitest"; +import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; + +type StubSession = { + subscribe: (fn: (evt: unknown) => void) => () => void; +}; + +type SessionEventHandler = (evt: unknown) => void; + +describe("subscribeEmbeddedPiSession", () => { + it("swallows onAgentEvent rejections for tool update/result events", async () => { + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onAgentEvent = vi.fn().mockRejectedValue(new Error("boom")); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onAgentEvent, + }); + + expect(() => { + handler?.({ + type: "tool_execution_update", + toolName: "bash", + toolCallId: "tool-1", + partialResult: "ok", + }); + handler?.({ + type: "tool_execution_end", + toolName: "bash", + toolCallId: "tool-1", + isError: false, + result: "ok", + }); + }).not.toThrow(); + + // Allow async rejection handling to settle. + await Promise.resolve(); + + expect(onAgentEvent).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index abecf58be6..f939054389 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -322,8 +322,12 @@ export async function runAgentTurnWithFallback(params: { } else { // Send directly when flushing before tool execution (no streaming). // Track sent key to avoid duplicate in final payloads. - directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); - await params.opts?.onBlockReply?.(blockPayload); + try { + await params.opts?.onBlockReply?.(blockPayload); + directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); + } catch (err) { + logVerbose(`block reply direct delivery failed: ${String(err)}`); + } } } : undefined,