diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index 972159d43f..ef9ff66f96 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -30,18 +30,20 @@ vi.mock("./session-utils.js", () => ({ })); import type { CliDeps } from "../cli/deps.js"; -import { agentCommand } from "../commands/agent.js"; import type { HealthSummary } from "../commands/health.js"; +import type { NodeEventContext } from "./server-node-events-types.js"; +import { agentCommand } from "../commands/agent.js"; import { updateSessionStore } from "../config/sessions.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; -import type { NodeEventContext } from "./server-node-events-types.js"; import { handleNodeEvent } from "./server-node-events.js"; +import { loadSessionEntry } from "./session-utils.js"; const enqueueSystemEventMock = vi.mocked(enqueueSystemEvent); const requestHeartbeatNowMock = vi.mocked(requestHeartbeatNow); const agentCommandMock = vi.mocked(agentCommand); const updateSessionStoreMock = vi.mocked(updateSessionStore); +const loadSessionEntryMock = vi.mocked(loadSessionEntry); function buildCtx(): NodeEventContext { return { @@ -267,3 +269,80 @@ describe("voice transcript events", () => { expect(warn).toHaveBeenCalledWith(expect.stringContaining("voice session-store update failed")); }); }); + +describe("agent request events", () => { + beforeEach(() => { + agentCommandMock.mockReset(); + updateSessionStoreMock.mockReset(); + loadSessionEntryMock.mockReset(); + agentCommandMock.mockResolvedValue({ status: "ok" } as never); + updateSessionStoreMock.mockImplementation(async (_storePath, update) => { + update({}); + }); + loadSessionEntryMock.mockImplementation((sessionKey: string) => ({ + storePath: "/tmp/sessions.json", + entry: { sessionId: `sid-${sessionKey}` }, + canonicalKey: sessionKey, + })); + }); + + it("disables delivery when route is unresolved instead of falling back globally", async () => { + const warn = vi.fn(); + const ctx = buildCtx(); + ctx.logGateway = { warn }; + + await handleNodeEvent(ctx, "node-route-miss", { + event: "agent.request", + payloadJSON: JSON.stringify({ + message: "summarize this", + sessionKey: "agent:main:main", + deliver: true, + }), + }); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + const [opts] = agentCommandMock.mock.calls[0] ?? []; + expect(opts).toMatchObject({ + message: "summarize this", + sessionKey: "agent:main:main", + deliver: false, + channel: undefined, + to: undefined, + }); + expect(warn).toHaveBeenCalledWith( + expect.stringContaining("agent delivery disabled node=node-route-miss"), + ); + }); + + it("reuses the current session route when delivery target is omitted", async () => { + const ctx = buildCtx(); + loadSessionEntryMock.mockReturnValueOnce({ + storePath: "/tmp/sessions.json", + entry: { + sessionId: "sid-current", + lastChannel: "telegram", + lastTo: "123", + }, + canonicalKey: "agent:main:main", + }); + + await handleNodeEvent(ctx, "node-route-hit", { + event: "agent.request", + payloadJSON: JSON.stringify({ + message: "route on session", + sessionKey: "agent:main:main", + deliver: true, + }), + }); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + const [opts] = agentCommandMock.mock.calls[0] ?? []; + expect(opts).toMatchObject({ + message: "route on session", + sessionKey: "agent:main:main", + deliver: true, + channel: "telegram", + to: "123", + }); + }); +}); diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 05d2d59dc8..85f21bf4aa 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -1,10 +1,10 @@ import { randomUUID } from "node:crypto"; +import type { NodeEvent, NodeEventContext } from "./server-node-events-types.js"; import { resolveSessionAgentId } from "../agents/agent-scope.js"; import { normalizeChannelId } from "../channels/plugins/index.js"; import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import { agentCommand } from "../commands/agent.js"; import { loadConfig } from "../config/config.js"; -import { loadSessionStore } from "../config/sessions.js"; import { updateSessionStore } from "../config/sessions.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; @@ -14,7 +14,6 @@ import { normalizeMainKey } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; import { parseMessageWithAttachments } from "./chat-attachments.js"; import { normalizeRpcAttachmentsToChatAttachments } from "./server-methods/attachment-normalize.js"; -import type { NodeEvent, NodeEventContext } from "./server-node-events-types.js"; import { loadSessionEntry, pruneLegacyStoreKeys, @@ -185,45 +184,6 @@ function queueSessionStoreTouch(params: { }); } -function resolveFallbackDeliveryRoute(params: { - storePath: LoadedSessionEntry["storePath"]; - preferredChannel?: string; -}): { channel?: string; to?: string } { - const { storePath, preferredChannel } = params; - if (!storePath) { - return {}; - } - - const targetChannel = preferredChannel?.trim().toLowerCase(); - const store = loadSessionStore(storePath); - const candidates = Object.values(store) - .filter((entry) => { - if (!entry || typeof entry !== "object") { - return false; - } - const channel = typeof entry.lastChannel === "string" ? entry.lastChannel.trim() : ""; - const to = typeof entry.lastTo === "string" ? entry.lastTo.trim() : ""; - if (!channel || !to) { - return false; - } - if (targetChannel && channel.toLowerCase() !== targetChannel) { - return false; - } - return true; - }) - .toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0)); - - const winner = candidates[0]; - if (!winner) { - return {}; - } - - return { - channel: typeof winner.lastChannel === "string" ? winner.lastChannel.trim() : undefined, - to: typeof winner.lastTo === "string" ? winner.lastTo.trim() : undefined, - }; -} - function parseSessionKeyFromPayloadJSON(payloadJSON: string): string | null { let payload: unknown; try { @@ -394,7 +354,7 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt const channelRaw = typeof link?.channel === "string" ? link.channel.trim() : ""; let channel = normalizeChannelId(channelRaw) ?? undefined; let to = typeof link?.to === "string" && link.to.trim() ? link.to.trim() : undefined; - const deliver = Boolean(link?.deliver); + const deliverRequested = Boolean(link?.deliver); const wantsReceipt = Boolean(link?.receipt); const receiptTextRaw = typeof link?.receiptText === "string" ? link.receiptText.trim() : ""; const receiptText = @@ -408,7 +368,7 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt const sessionId = entry?.sessionId ?? randomUUID(); await touchSessionStore({ cfg, sessionKey, storePath, canonicalKey, entry, sessionId, now }); - if (deliver && (!channel || !to)) { + if (deliverRequested && (!channel || !to)) { const entryChannel = typeof entry?.lastChannel === "string" ? normalizeChannelId(entry.lastChannel) @@ -421,33 +381,30 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt to = entryTo; } } - if (deliver && (!channel || !to)) { - const fallback = resolveFallbackDeliveryRoute({ - storePath, - preferredChannel: channel ?? cfg.channels?.default ?? "telegram", - }); - if (!channel && fallback.channel) { - channel = normalizeChannelId(fallback.channel) ?? channel; - } - if (!to && fallback.to) { - to = fallback.to; - } + const deliver = deliverRequested && Boolean(channel && to); + const deliveryChannel = deliver ? channel : undefined; + const deliveryTo = deliver ? to : undefined; + + if (deliverRequested && !deliver) { + ctx.logGateway.warn( + `agent delivery disabled node=${nodeId}: missing session delivery route (channel=${channel ?? "-"} to=${to ?? "-"})`, + ); } - if (wantsReceipt && channel && to) { + if (wantsReceipt && deliveryChannel && deliveryTo) { void sendReceiptAck({ cfg, deps: ctx.deps, sessionKey: canonicalKey, - channel, - to, + channel: deliveryChannel, + to: deliveryTo, text: receiptText, }).catch((err) => { ctx.logGateway.warn(`agent receipt failed node=${nodeId}: ${formatForLog(err)}`); }); } else if (wantsReceipt) { ctx.logGateway.warn( - `agent receipt skipped node=${nodeId}: missing delivery route (channel=${channel ?? "-"} to=${to ?? "-"})`, + `agent receipt skipped node=${nodeId}: missing delivery route (channel=${deliveryChannel ?? "-"} to=${deliveryTo ?? "-"})`, ); } @@ -459,8 +416,8 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt sessionKey: canonicalKey, thinking: link?.thinking ?? undefined, deliver, - to, - channel, + to: deliveryTo, + channel: deliveryChannel, timeout: typeof link?.timeoutSeconds === "number" ? link.timeoutSeconds.toString() : undefined, messageChannel: "node",