fix: guard tool-block replies + events (#992) (thanks @tyler6204)

This commit is contained in:
Peter Steinberger
2026-01-16 07:25:51 +00:00
parent e1c0b8276f
commit 7dead2530b
4 changed files with 92 additions and 20 deletions

View File

@@ -39,6 +39,7 @@
- Discord: allow emoji/sticker uploads + channel actions in config defaults. (#870) — thanks @JDIVE.
- Fix: sanitize user-facing error text + strip `<final>` tags across reply pipelines. (#975) — thanks @ThomsenDrake.
- Fix: normalize pairing CLI aliases, allow extension channels, and harden Zalo webhook payload parsing. (#991) — thanks @longmaba.
- Fix: stream block replies between tool calls when block streaming is off. (#992) — thanks @tyler6204.
## 2026.1.14-1

View File

@@ -109,14 +109,23 @@ export function handleToolExecutionUpdate(
partialResult: sanitized,
},
});
void ctx.params.onAgentEvent?.({
stream: "tool",
data: {
phase: "update",
name: toolName,
toolCallId,
},
});
const onAgentEvent = ctx.params.onAgentEvent;
if (onAgentEvent) {
void Promise.resolve()
.then(() =>
onAgentEvent({
stream: "tool",
data: {
phase: "update",
name: toolName,
toolCallId,
},
}),
)
.catch((err: unknown) => {
ctx.log.debug(`tool_execution_update handler failed: ${String(err)}`);
});
}
}
export function handleToolExecutionEnd(
@@ -171,16 +180,25 @@ export function handleToolExecutionEnd(
result: sanitizedResult,
},
});
void ctx.params.onAgentEvent?.({
stream: "tool",
data: {
phase: "result",
name: toolName,
toolCallId,
meta,
isError: isToolError,
},
});
const onAgentEvent = ctx.params.onAgentEvent;
if (onAgentEvent) {
void Promise.resolve()
.then(() =>
onAgentEvent({
stream: "tool",
data: {
phase: "result",
name: toolName,
toolCallId,
meta,
isError: isToolError,
},
}),
)
.catch((err: unknown) => {
ctx.log.debug(`tool_execution_result handler failed: ${String(err)}`);
});
}
ctx.log.debug(
`embedded run tool end: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,

View File

@@ -0,0 +1,49 @@
import { describe, expect, it, vi } from "vitest";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
subscribe: (fn: (evt: unknown) => void) => () => void;
};
type SessionEventHandler = (evt: unknown) => void;
describe("subscribeEmbeddedPiSession", () => {
it("swallows onAgentEvent rejections for tool update/result events", async () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onAgentEvent = vi.fn().mockRejectedValue(new Error("boom"));
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
onAgentEvent,
});
expect(() => {
handler?.({
type: "tool_execution_update",
toolName: "bash",
toolCallId: "tool-1",
partialResult: "ok",
});
handler?.({
type: "tool_execution_end",
toolName: "bash",
toolCallId: "tool-1",
isError: false,
result: "ok",
});
}).not.toThrow();
// Allow async rejection handling to settle.
await Promise.resolve();
expect(onAgentEvent).toHaveBeenCalledTimes(2);
});
});

View File

@@ -322,8 +322,12 @@ export async function runAgentTurnWithFallback(params: {
} else {
// Send directly when flushing before tool execution (no streaming).
// Track sent key to avoid duplicate in final payloads.
directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
await params.opts?.onBlockReply?.(blockPayload);
try {
await params.opts?.onBlockReply?.(blockPayload);
directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
} catch (err) {
logVerbose(`block reply direct delivery failed: ${String(err)}`);
}
}
}
: undefined,