mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
fix(gateway): block cross-session fallback in node event delivery
This commit is contained in:
@@ -30,18 +30,20 @@ vi.mock("./session-utils.js", () => ({
|
||||
}));
|
||||
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import { agentCommand } from "../commands/agent.js";
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import type { NodeEventContext } from "./server-node-events-types.js";
|
||||
import { agentCommand } from "../commands/agent.js";
|
||||
import { updateSessionStore } from "../config/sessions.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import type { NodeEventContext } from "./server-node-events-types.js";
|
||||
import { handleNodeEvent } from "./server-node-events.js";
|
||||
import { loadSessionEntry } from "./session-utils.js";
|
||||
|
||||
const enqueueSystemEventMock = vi.mocked(enqueueSystemEvent);
|
||||
const requestHeartbeatNowMock = vi.mocked(requestHeartbeatNow);
|
||||
const agentCommandMock = vi.mocked(agentCommand);
|
||||
const updateSessionStoreMock = vi.mocked(updateSessionStore);
|
||||
const loadSessionEntryMock = vi.mocked(loadSessionEntry);
|
||||
|
||||
function buildCtx(): NodeEventContext {
|
||||
return {
|
||||
@@ -267,3 +269,80 @@ describe("voice transcript events", () => {
|
||||
expect(warn).toHaveBeenCalledWith(expect.stringContaining("voice session-store update failed"));
|
||||
});
|
||||
});
|
||||
|
||||
describe("agent request events", () => {
|
||||
beforeEach(() => {
|
||||
agentCommandMock.mockReset();
|
||||
updateSessionStoreMock.mockReset();
|
||||
loadSessionEntryMock.mockReset();
|
||||
agentCommandMock.mockResolvedValue({ status: "ok" } as never);
|
||||
updateSessionStoreMock.mockImplementation(async (_storePath, update) => {
|
||||
update({});
|
||||
});
|
||||
loadSessionEntryMock.mockImplementation((sessionKey: string) => ({
|
||||
storePath: "/tmp/sessions.json",
|
||||
entry: { sessionId: `sid-${sessionKey}` },
|
||||
canonicalKey: sessionKey,
|
||||
}));
|
||||
});
|
||||
|
||||
it("disables delivery when route is unresolved instead of falling back globally", async () => {
|
||||
const warn = vi.fn();
|
||||
const ctx = buildCtx();
|
||||
ctx.logGateway = { warn };
|
||||
|
||||
await handleNodeEvent(ctx, "node-route-miss", {
|
||||
event: "agent.request",
|
||||
payloadJSON: JSON.stringify({
|
||||
message: "summarize this",
|
||||
sessionKey: "agent:main:main",
|
||||
deliver: true,
|
||||
}),
|
||||
});
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
const [opts] = agentCommandMock.mock.calls[0] ?? [];
|
||||
expect(opts).toMatchObject({
|
||||
message: "summarize this",
|
||||
sessionKey: "agent:main:main",
|
||||
deliver: false,
|
||||
channel: undefined,
|
||||
to: undefined,
|
||||
});
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("agent delivery disabled node=node-route-miss"),
|
||||
);
|
||||
});
|
||||
|
||||
it("reuses the current session route when delivery target is omitted", async () => {
|
||||
const ctx = buildCtx();
|
||||
loadSessionEntryMock.mockReturnValueOnce({
|
||||
storePath: "/tmp/sessions.json",
|
||||
entry: {
|
||||
sessionId: "sid-current",
|
||||
lastChannel: "telegram",
|
||||
lastTo: "123",
|
||||
},
|
||||
canonicalKey: "agent:main:main",
|
||||
});
|
||||
|
||||
await handleNodeEvent(ctx, "node-route-hit", {
|
||||
event: "agent.request",
|
||||
payloadJSON: JSON.stringify({
|
||||
message: "route on session",
|
||||
sessionKey: "agent:main:main",
|
||||
deliver: true,
|
||||
}),
|
||||
});
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
const [opts] = agentCommandMock.mock.calls[0] ?? [];
|
||||
expect(opts).toMatchObject({
|
||||
message: "route on session",
|
||||
sessionKey: "agent:main:main",
|
||||
deliver: true,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type { NodeEvent, NodeEventContext } from "./server-node-events-types.js";
|
||||
import { resolveSessionAgentId } from "../agents/agent-scope.js";
|
||||
import { normalizeChannelId } from "../channels/plugins/index.js";
|
||||
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
|
||||
import { agentCommand } from "../commands/agent.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { loadSessionStore } from "../config/sessions.js";
|
||||
import { updateSessionStore } from "../config/sessions.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
|
||||
@@ -14,7 +14,6 @@ import { normalizeMainKey } from "../routing/session-key.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { parseMessageWithAttachments } from "./chat-attachments.js";
|
||||
import { normalizeRpcAttachmentsToChatAttachments } from "./server-methods/attachment-normalize.js";
|
||||
import type { NodeEvent, NodeEventContext } from "./server-node-events-types.js";
|
||||
import {
|
||||
loadSessionEntry,
|
||||
pruneLegacyStoreKeys,
|
||||
@@ -185,45 +184,6 @@ function queueSessionStoreTouch(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function resolveFallbackDeliveryRoute(params: {
|
||||
storePath: LoadedSessionEntry["storePath"];
|
||||
preferredChannel?: string;
|
||||
}): { channel?: string; to?: string } {
|
||||
const { storePath, preferredChannel } = params;
|
||||
if (!storePath) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const targetChannel = preferredChannel?.trim().toLowerCase();
|
||||
const store = loadSessionStore(storePath);
|
||||
const candidates = Object.values(store)
|
||||
.filter((entry) => {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
return false;
|
||||
}
|
||||
const channel = typeof entry.lastChannel === "string" ? entry.lastChannel.trim() : "";
|
||||
const to = typeof entry.lastTo === "string" ? entry.lastTo.trim() : "";
|
||||
if (!channel || !to) {
|
||||
return false;
|
||||
}
|
||||
if (targetChannel && channel.toLowerCase() !== targetChannel) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
})
|
||||
.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
|
||||
|
||||
const winner = candidates[0];
|
||||
if (!winner) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return {
|
||||
channel: typeof winner.lastChannel === "string" ? winner.lastChannel.trim() : undefined,
|
||||
to: typeof winner.lastTo === "string" ? winner.lastTo.trim() : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function parseSessionKeyFromPayloadJSON(payloadJSON: string): string | null {
|
||||
let payload: unknown;
|
||||
try {
|
||||
@@ -394,7 +354,7 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
const channelRaw = typeof link?.channel === "string" ? link.channel.trim() : "";
|
||||
let channel = normalizeChannelId(channelRaw) ?? undefined;
|
||||
let to = typeof link?.to === "string" && link.to.trim() ? link.to.trim() : undefined;
|
||||
const deliver = Boolean(link?.deliver);
|
||||
const deliverRequested = Boolean(link?.deliver);
|
||||
const wantsReceipt = Boolean(link?.receipt);
|
||||
const receiptTextRaw = typeof link?.receiptText === "string" ? link.receiptText.trim() : "";
|
||||
const receiptText =
|
||||
@@ -408,7 +368,7 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
const sessionId = entry?.sessionId ?? randomUUID();
|
||||
await touchSessionStore({ cfg, sessionKey, storePath, canonicalKey, entry, sessionId, now });
|
||||
|
||||
if (deliver && (!channel || !to)) {
|
||||
if (deliverRequested && (!channel || !to)) {
|
||||
const entryChannel =
|
||||
typeof entry?.lastChannel === "string"
|
||||
? normalizeChannelId(entry.lastChannel)
|
||||
@@ -421,33 +381,30 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
to = entryTo;
|
||||
}
|
||||
}
|
||||
if (deliver && (!channel || !to)) {
|
||||
const fallback = resolveFallbackDeliveryRoute({
|
||||
storePath,
|
||||
preferredChannel: channel ?? cfg.channels?.default ?? "telegram",
|
||||
});
|
||||
if (!channel && fallback.channel) {
|
||||
channel = normalizeChannelId(fallback.channel) ?? channel;
|
||||
}
|
||||
if (!to && fallback.to) {
|
||||
to = fallback.to;
|
||||
}
|
||||
const deliver = deliverRequested && Boolean(channel && to);
|
||||
const deliveryChannel = deliver ? channel : undefined;
|
||||
const deliveryTo = deliver ? to : undefined;
|
||||
|
||||
if (deliverRequested && !deliver) {
|
||||
ctx.logGateway.warn(
|
||||
`agent delivery disabled node=${nodeId}: missing session delivery route (channel=${channel ?? "-"} to=${to ?? "-"})`,
|
||||
);
|
||||
}
|
||||
|
||||
if (wantsReceipt && channel && to) {
|
||||
if (wantsReceipt && deliveryChannel && deliveryTo) {
|
||||
void sendReceiptAck({
|
||||
cfg,
|
||||
deps: ctx.deps,
|
||||
sessionKey: canonicalKey,
|
||||
channel,
|
||||
to,
|
||||
channel: deliveryChannel,
|
||||
to: deliveryTo,
|
||||
text: receiptText,
|
||||
}).catch((err) => {
|
||||
ctx.logGateway.warn(`agent receipt failed node=${nodeId}: ${formatForLog(err)}`);
|
||||
});
|
||||
} else if (wantsReceipt) {
|
||||
ctx.logGateway.warn(
|
||||
`agent receipt skipped node=${nodeId}: missing delivery route (channel=${channel ?? "-"} to=${to ?? "-"})`,
|
||||
`agent receipt skipped node=${nodeId}: missing delivery route (channel=${deliveryChannel ?? "-"} to=${deliveryTo ?? "-"})`,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -459,8 +416,8 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
sessionKey: canonicalKey,
|
||||
thinking: link?.thinking ?? undefined,
|
||||
deliver,
|
||||
to,
|
||||
channel,
|
||||
to: deliveryTo,
|
||||
channel: deliveryChannel,
|
||||
timeout:
|
||||
typeof link?.timeoutSeconds === "number" ? link.timeoutSeconds.toString() : undefined,
|
||||
messageChannel: "node",
|
||||
|
||||
Reference in New Issue
Block a user