fix(reply): track messaging media aliases for dedupe

This commit is contained in:
Peter Steinberger
2026-02-17 03:27:17 +01:00
parent 1f850374f6
commit a75e95be02
4 changed files with 137 additions and 26 deletions

View File

@@ -1,14 +1,14 @@
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { describe, expect, it, vi } from "vitest";
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
import {
handleToolExecutionEnd,
handleToolExecutionStart,
} from "./pi-embedded-subscribe.handlers.tools.js";
import type {
ToolCallSummary,
ToolHandlerContext,
} from "./pi-embedded-subscribe.handlers.types.js";
import {
handleToolExecutionEnd,
handleToolExecutionStart,
} from "./pi-embedded-subscribe.handlers.tools.js";
type ToolExecutionStartEvent = Extract<AgentEvent, { type: "tool_execution_start" }>;
type ToolExecutionEndEvent = Extract<AgentEvent, { type: "tool_execution_end" }>;
@@ -39,7 +39,7 @@ function createTestContext(): {
toolSummaryById: new Set<string>(),
pendingMessagingTargets: new Map<string, MessagingToolSend>(),
pendingMessagingTexts: new Map<string, string>(),
pendingMessagingMediaUrls: new Map<string, string>(),
pendingMessagingMediaUrls: new Map<string, string[]>(),
messagingToolSentTexts: [],
messagingToolSentTextsNormalized: [],
messagingToolSentMediaUrls: [],
@@ -145,19 +145,19 @@ describe("handleToolExecutionEnd cron.add commitment tracking", () => {
});
describe("messaging tool media URL tracking", () => {
it("tracks mediaUrl arg from messaging tool as pending", async () => {
it("tracks media arg from messaging tool as pending", async () => {
const { ctx } = createTestContext();
const evt: ToolExecutionStartEvent = {
type: "tool_execution_start",
toolName: "message",
toolCallId: "tool-m1",
args: { action: "send", to: "channel:123", content: "hi", mediaUrl: "file:///img.jpg" },
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img.jpg" },
};
await handleToolExecutionStart(ctx, evt);
expect(ctx.state.pendingMessagingMediaUrls.get("tool-m1")).toBe("file:///img.jpg");
expect(ctx.state.pendingMessagingMediaUrls.get("tool-m1")).toEqual(["file:///img.jpg"]);
});
it("commits pending media URL on tool success", async () => {
@@ -168,7 +168,7 @@ describe("messaging tool media URL tracking", () => {
type: "tool_execution_start",
toolName: "message",
toolCallId: "tool-m2",
args: { action: "send", to: "channel:123", content: "hi", mediaUrl: "file:///img.jpg" },
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img.jpg" },
};
await handleToolExecutionStart(ctx, startEvt);
@@ -188,6 +188,41 @@ describe("messaging tool media URL tracking", () => {
expect(ctx.state.pendingMessagingMediaUrls.has("tool-m2")).toBe(false);
});
it("commits mediaUrls from tool result payload", async () => {
const { ctx } = createTestContext();
const startEvt: ToolExecutionStartEvent = {
type: "tool_execution_start",
toolName: "message",
toolCallId: "tool-m2b",
args: { action: "send", to: "channel:123", content: "hi" },
};
await handleToolExecutionStart(ctx, startEvt);
const endEvt: ToolExecutionEndEvent = {
type: "tool_execution_end",
toolName: "message",
toolCallId: "tool-m2b",
isError: false,
result: {
content: [
{
type: "text",
text: JSON.stringify({
mediaUrls: ["file:///img-a.jpg", "file:///img-b.jpg"],
}),
},
],
},
};
await handleToolExecutionEnd(ctx, endEvt);
expect(ctx.state.messagingToolSentMediaUrls).toEqual([
"file:///img-a.jpg",
"file:///img-b.jpg",
]);
});
it("trims messagingToolSentMediaUrls to 200 on commit (FIFO)", async () => {
const { ctx } = createTestContext();
@@ -220,7 +255,7 @@ describe("messaging tool media URL tracking", () => {
type: "tool_execution_start",
toolName: "message",
toolCallId: "tool-cap",
args: { action: "send", to: "channel:123", content: "hi", mediaUrl: "file:///img-new.jpg" },
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img-new.jpg" },
};
await handleToolExecutionStart(ctx, startEvt);
@@ -247,7 +282,7 @@ describe("messaging tool media URL tracking", () => {
type: "tool_execution_start",
toolName: "message",
toolCallId: "tool-m3",
args: { action: "send", to: "channel:123", content: "hi", mediaUrl: "file:///img.jpg" },
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img.jpg" },
};
await handleToolExecutionStart(ctx, startEvt);

View File

@@ -1,13 +1,13 @@
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { emitAgentEvent } from "../infra/agent-events.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import type { PluginHookAfterToolCallEvent } from "../plugins/types.js";
import { normalizeTextForComparison } from "./pi-embedded-helpers.js";
import { isMessagingTool, isMessagingToolSendAction } from "./pi-embedded-messaging.js";
import type {
ToolCallSummary,
ToolHandlerContext,
} from "./pi-embedded-subscribe.handlers.types.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { normalizeTextForComparison } from "./pi-embedded-helpers.js";
import { isMessagingTool, isMessagingToolSendAction } from "./pi-embedded-messaging.js";
import {
extractToolErrorMessage,
extractToolResultMediaPaths,
@@ -63,6 +63,71 @@ function extendExecMeta(toolName: string, args: unknown, meta?: string): string
return meta ? `${meta} · ${suffix}` : suffix;
}
function pushUniqueMediaUrl(urls: string[], seen: Set<string>, value: unknown): void {
if (typeof value !== "string") {
return;
}
const normalized = value.trim();
if (!normalized || seen.has(normalized)) {
return;
}
seen.add(normalized);
urls.push(normalized);
}
function collectMessagingMediaUrlsFromRecord(record: Record<string, unknown>): string[] {
const urls: string[] = [];
const seen = new Set<string>();
pushUniqueMediaUrl(urls, seen, record.media);
pushUniqueMediaUrl(urls, seen, record.mediaUrl);
pushUniqueMediaUrl(urls, seen, record.path);
pushUniqueMediaUrl(urls, seen, record.filePath);
const mediaUrls = record.mediaUrls;
if (Array.isArray(mediaUrls)) {
for (const mediaUrl of mediaUrls) {
pushUniqueMediaUrl(urls, seen, mediaUrl);
}
}
return urls;
}
function collectMessagingMediaUrlsFromToolResult(result: unknown): string[] {
const urls: string[] = [];
const seen = new Set<string>();
const appendFromRecord = (value: unknown) => {
if (!value || typeof value !== "object") {
return;
}
const extracted = collectMessagingMediaUrlsFromRecord(value as Record<string, unknown>);
for (const url of extracted) {
if (seen.has(url)) {
continue;
}
seen.add(url);
urls.push(url);
}
};
appendFromRecord(result);
if (result && typeof result === "object") {
appendFromRecord((result as Record<string, unknown>).details);
}
const outputText = extractToolResultText(result);
if (outputText) {
try {
appendFromRecord(JSON.parse(outputText));
} catch {
// Ignore non-JSON tool output.
}
}
return urls;
}
export async function handleToolExecutionStart(
ctx: ToolHandlerContext,
evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown },
@@ -145,10 +210,10 @@ export async function handleToolExecutionStart(
ctx.state.pendingMessagingTexts.set(toolCallId, text);
ctx.log.debug(`Tracking pending messaging text: tool=${toolName} len=${text.length}`);
}
// Track media URL from messaging tool args (pending until tool_execution_end)
const mediaUrl = argsRecord.mediaUrl ?? argsRecord.path ?? argsRecord.filePath;
if (mediaUrl && typeof mediaUrl === "string") {
ctx.state.pendingMessagingMediaUrls.set(toolCallId, mediaUrl);
// Track media URLs from messaging tool args (pending until tool_execution_end).
const mediaUrls = collectMessagingMediaUrlsFromRecord(argsRecord);
if (mediaUrls.length > 0) {
ctx.state.pendingMessagingMediaUrls.set(toolCallId, mediaUrls);
}
}
}
@@ -253,11 +318,22 @@ export async function handleToolExecutionEnd(
ctx.trimMessagingToolSent();
}
}
const pendingMediaUrl = ctx.state.pendingMessagingMediaUrls.get(toolCallId);
if (pendingMediaUrl) {
ctx.state.pendingMessagingMediaUrls.delete(toolCallId);
if (!isToolError) {
ctx.state.messagingToolSentMediaUrls.push(pendingMediaUrl);
const pendingMediaUrls = ctx.state.pendingMessagingMediaUrls.get(toolCallId) ?? [];
ctx.state.pendingMessagingMediaUrls.delete(toolCallId);
const startArgs =
startData?.args && typeof startData.args === "object"
? (startData.args as Record<string, unknown>)
: {};
const isMessagingSend =
pendingMediaUrls.length > 0 ||
(isMessagingTool(toolName) && isMessagingToolSendAction(toolName, startArgs));
if (!isToolError && isMessagingSend) {
const committedMediaUrls = [
...pendingMediaUrls,
...collectMessagingMediaUrlsFromToolResult(result),
];
if (committedMediaUrls.length > 0) {
ctx.state.messagingToolSentMediaUrls.push(...committedMediaUrls);
ctx.trimMessagingToolSent();
}
}

View File

@@ -74,7 +74,7 @@ export type EmbeddedPiSubscribeState = {
pendingMessagingTexts: Map<string, string>;
pendingMessagingTargets: Map<string, MessagingToolSend>;
successfulCronAdds: number;
pendingMessagingMediaUrls: Map<string, string>;
pendingMessagingMediaUrls: Map<string, string[]>;
lastAssistant?: AgentMessage;
};

View File

@@ -41,7 +41,7 @@ function createToolHandlerCtx(params: {
lastToolError: undefined,
pendingMessagingTexts: new Map<string, string>(),
pendingMessagingTargets: new Map<string, unknown>(),
pendingMessagingMediaUrls: new Map<string, string>(),
pendingMessagingMediaUrls: new Map<string, string[]>(),
messagingToolSentTexts: [] as string[],
messagingToolSentTextsNormalized: [] as string[],
messagingToolSentMediaUrls: [] as string[],