From 0bf1b38cc0dbf8feffc4fb60298631661175ab97 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Tue, 17 Feb 2026 22:52:46 -0500 Subject: [PATCH] Agents: fix subagent completion thread routing --- ...gents.sessions-spawn.lifecycle.e2e.test.ts | 6 +-- ...subagents.sessions-spawn.model.e2e.test.ts | 2 +- .../sessions-spawn-threadid.e2e.test.ts | 31 ++++++------- .../subagent-announce.format.e2e.test.ts | 44 +++++++++++++++++-- src/agents/subagent-announce.ts | 5 +++ src/gateway/protocol/schema/agent.ts | 2 + src/gateway/server-methods/send.test.ts | 18 ++++++++ src/gateway/server-methods/send.ts | 7 +++ 8 files changed, 90 insertions(+), 25 deletions(-) diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts index bc78429e5c..f45653d7bb 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts @@ -212,7 +212,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { expect(send?.sessionKey).toBe("agent:main:main"); expect(send?.channel).toBe("whatsapp"); expect(send?.to).toBe("+123"); - expect(send?.message).toBe("done"); + expect(send?.message).toBe("✅ Subagent main finished\n\ndone"); expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true); }); @@ -297,7 +297,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { expect(send?.sessionKey).toBe("agent:main:discord:group:req"); expect(send?.channel).toBe("discord"); expect(send?.to).toBe("discord:dm:u123"); - expect(send?.message).toContain("completed successfully"); + expect(send?.message).toBe("✅ Subagent main finished"); expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true); }); @@ -364,7 +364,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { expect(send?.sessionKey).toBe("agent:main:discord:group:req"); expect(send?.channel).toBe("discord"); expect(send?.to).toBe("discord:dm:u123"); - expect(send?.message).toBe("done"); + expect(send?.message).toBe("✅ Subagent main finished\n\ndone"); // Session should be deleted expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true); diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn.model.e2e.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn.model.e2e.test.ts index e81719c572..579f72f1c7 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn.model.e2e.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn.model.e2e.test.ts @@ -83,7 +83,7 @@ describe("openclaw-tools: subagents (sessions_spawn model + thinking)", () => { }); expect(result.details).toMatchObject({ status: "accepted", - note: "auto-announces on completion, do not poll", + note: "auto-announces on completion, do not poll/sleep. The response will be sent back as an agent message.", modelApplied: true, }); diff --git a/src/agents/sessions-spawn-threadid.e2e.test.ts b/src/agents/sessions-spawn-threadid.e2e.test.ts index 0b14533100..9f57566c49 100644 --- a/src/agents/sessions-spawn-threadid.e2e.test.ts +++ b/src/agents/sessions-spawn-threadid.e2e.test.ts @@ -1,10 +1,10 @@ import { beforeEach, describe, expect, it } from "vitest"; -import { createOpenClawTools } from "./openclaw-tools.js"; import "./test-helpers/fast-core-tools.js"; import { - callGatewayMock, - setSubagentsConfigOverride, -} from "./openclaw-tools.subagents.test-harness.js"; + getCallGatewayMock, + getSessionsSpawnTool, + setSessionsSpawnConfigOverride, +} from "./openclaw-tools.subagents.sessions-spawn.test-harness.js"; import { listSubagentRunsForRequester, resetSubagentRegistryForTests, @@ -12,9 +12,10 @@ import { describe("sessions_spawn requesterOrigin threading", () => { beforeEach(() => { + const callGatewayMock = getCallGatewayMock(); resetSubagentRegistryForTests(); callGatewayMock.mockReset(); - setSubagentsConfigOverride({ + setSessionsSpawnConfigOverride({ session: { mainKey: "main", scope: "per-sender", @@ -35,20 +36,18 @@ describe("sessions_spawn requesterOrigin threading", () => { }); it("captures threadId in requesterOrigin", async () => { - const tool = createOpenClawTools({ + const tool = await getSessionsSpawnTool({ agentSessionKey: "main", agentChannel: "telegram", agentTo: "telegram:123", agentThreadId: 42, - }).find((candidate) => candidate.name === "sessions_spawn"); - if (!tool) { - throw new Error("missing sessions_spawn tool"); - } + }); - await tool.execute("call", { + const result = await tool.execute("call", { task: "do thing", runTimeoutSeconds: 1, }); + expect(result.details).toMatchObject({ status: "accepted", runId: "run-1" }); const runs = listSubagentRunsForRequester("main"); expect(runs).toHaveLength(1); @@ -60,19 +59,17 @@ describe("sessions_spawn requesterOrigin threading", () => { }); it("stores requesterOrigin without threadId when none is provided", async () => { - const tool = createOpenClawTools({ + const tool = await getSessionsSpawnTool({ agentSessionKey: "main", agentChannel: "telegram", agentTo: "telegram:123", - }).find((candidate) => candidate.name === "sessions_spawn"); - if (!tool) { - throw new Error("missing sessions_spawn tool"); - } + }); - await tool.execute("call", { + const result = await tool.execute("call", { task: "do thing", runTimeoutSeconds: 1, }); + expect(result.details).toMatchObject({ status: "accepted", runId: "run-1" }); const runs = listSubagentRunsForRequester("main"); expect(runs).toHaveLength(1); diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 6dc8bf6a39..7ee781a74f 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -372,11 +372,8 @@ describe("subagent announce formatting", () => { expect(call?.params?.channel).toBe("discord"); expect(call?.params?.to).toBe("channel:12345"); expect(call?.params?.sessionKey).toBe("agent:main:main"); - expect(msg).toContain("[System Message]"); - expect(msg).toContain('subagent task "do thing"'); - expect(msg).toContain("Result:"); + expect(msg).toContain("✅ Subagent main finished"); expect(msg).toContain("final answer: 2"); - expect(msg).toContain("Stats:"); expect(msg).not.toContain("Convert the result above into your normal assistant voice"); }); @@ -413,6 +410,45 @@ describe("subagent announce formatting", () => { const call = sendSpy.mock.calls[0]?.[0] as { params?: Record }; expect(call?.params?.channel).toBe("discord"); expect(call?.params?.to).toBe("channel:12345"); + expect(call?.params?.threadId).toBeUndefined(); + }); + + it("passes requesterOrigin.threadId for manual completion direct-send", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + sessionStore = { + "agent:main:subagent:test": { + sessionId: "child-session-direct-thread-pass", + }, + "agent:main:main": { + sessionId: "requester-session-thread-pass", + }, + }; + chatHistoryMock.mockResolvedValueOnce({ + messages: [{ role: "assistant", content: [{ type: "text", text: "done" }] }], + }); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-direct-thread-pass", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + requesterOrigin: { + channel: "discord", + to: "channel:12345", + accountId: "acct-1", + threadId: 99, + }, + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + }); + + expect(didAnnounce).toBe(true); + expect(sendSpy).toHaveBeenCalledTimes(1); + expect(agentSpy).not.toHaveBeenCalled(); + const call = sendSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.channel).toBe("discord"); + expect(call?.params?.to).toBe("channel:12345"); + expect(call?.params?.threadId).toBe("99"); }); it("steers announcements into an active run when queue mode is steer", async () => { diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index f44d3034bf..6582478f83 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -463,12 +463,17 @@ async function sendSubagentAnnounceDirectly(params: { hasCompletionDirectTarget && params.completionMessage?.trim() ) { + const completionThreadId = + completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== "" + ? String(completionDirectOrigin.threadId) + : undefined; await callGateway({ method: "send", params: { channel: completionChannel, to: completionTo, accountId: completionDirectOrigin?.accountId, + threadId: completionThreadId, sessionKey: canonicalRequesterSessionKey, message: params.completionMessage, idempotencyKey: params.directIdempotencyKey, diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index 9eba8b8359..41a13855bf 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -22,6 +22,8 @@ export const SendParamsSchema = Type.Object( gifPlayback: Type.Optional(Type.Boolean()), channel: Type.Optional(Type.String()), accountId: Type.Optional(Type.String()), + /** Thread id (channel-specific meaning, e.g. Telegram forum topic id). */ + threadId: Type.Optional(Type.String()), /** Optional session key for mirroring delivered output back into the transcript. */ sessionKey: Type.Optional(Type.String()), idempotencyKey: NonEmptyString, diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts index 167132ccad..be4dc69003 100644 --- a/src/gateway/server-methods/send.test.ts +++ b/src/gateway/server-methods/send.test.ts @@ -235,4 +235,22 @@ describe("gateway send mirroring", () => { }), ); }); + + it("forwards threadId to outbound delivery when provided", async () => { + mocks.deliverOutboundPayloads.mockResolvedValue([{ messageId: "m-thread", channel: "slack" }]); + + await runSend({ + to: "channel:C1", + message: "hi", + channel: "slack", + threadId: "1710000000.9999", + idempotencyKey: "idem-thread", + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + threadId: "1710000000.9999", + }), + ); + }); }); diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 550839acdb..527eec4248 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -64,6 +64,7 @@ export const sendHandlers: GatewayRequestHandlers = { gifPlayback?: boolean; channel?: string; accountId?: string; + threadId?: string; sessionKey?: string; idempotencyKey: string; }; @@ -130,6 +131,10 @@ export const sendHandlers: GatewayRequestHandlers = { typeof request.accountId === "string" && request.accountId.trim().length ? request.accountId.trim() : undefined; + const threadId = + typeof request.threadId === "string" && request.threadId.trim().length + ? request.threadId.trim() + : undefined; const outboundChannel = channel; const plugin = getChannelPlugin(channel); if (!plugin) { @@ -182,6 +187,7 @@ export const sendHandlers: GatewayRequestHandlers = { agentId: derivedAgentId, accountId, target: resolved.to, + threadId, }) : null; if (derivedRoute) { @@ -203,6 +209,7 @@ export const sendHandlers: GatewayRequestHandlers = { ? resolveSessionAgentId({ sessionKey: providedSessionKey, config: cfg }) : derivedAgentId, gifPlayback: request.gifPlayback, + threadId: threadId ?? null, deps: outboundDeps, mirror: providedSessionKey ? {