diff --git a/CHANGELOG.md b/CHANGELOG.md index 56a5d758c4..98e88317ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Docs: https://docs.openclaw.ai - Skills: remove duplicate `local-places` Google Places skill/proxy and keep `goplaces` as the single supported Google Places path. - Discord: send voice messages with waveform previews from local audio files (including silent delivery). (#7253) Thanks @nyanjou. - Discord: add configurable presence status/activity/type/url (custom status defaults to activity text). (#10855) Thanks @h0tp-ftw. +- Slack/Plugins: add thread-ownership outbound gating via `message_sending` hooks, including @-mention bypass tracking and Slack outbound hook wiring for cancel/modify behavior. (#15775) Thanks @DarlingtonDeveloper. ### Fixes diff --git a/extensions/thread-ownership/index.test.ts b/extensions/thread-ownership/index.test.ts new file mode 100644 index 0000000000..3690938a1b --- /dev/null +++ b/extensions/thread-ownership/index.test.ts @@ -0,0 +1,180 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import register from "./index.js"; + +describe("thread-ownership plugin", () => { + const hooks: Record = {}; + const api = { + pluginConfig: {}, + config: { + agents: { + list: [{ id: "test-agent", default: true, identity: { name: "TestBot" } }], + }, + }, + id: "thread-ownership", + name: "Thread Ownership", + logger: { info: vi.fn(), warn: vi.fn(), debug: vi.fn() }, + on: vi.fn((hookName: string, handler: Function) => { + hooks[hookName] = handler; + }), + }; + + let originalFetch: typeof globalThis.fetch; + + beforeEach(() => { + vi.clearAllMocks(); + for (const key of Object.keys(hooks)) delete hooks[key]; + + process.env.SLACK_FORWARDER_URL = "http://localhost:8750"; + process.env.SLACK_BOT_USER_ID = "U999"; + + originalFetch = globalThis.fetch; + globalThis.fetch = vi.fn(); + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + delete process.env.SLACK_FORWARDER_URL; + delete process.env.SLACK_BOT_USER_ID; + vi.restoreAllMocks(); + }); + + it("registers message_received and message_sending hooks", () => { + register(api as any); + + expect(api.on).toHaveBeenCalledTimes(2); + expect(api.on).toHaveBeenCalledWith("message_received", expect.any(Function)); + expect(api.on).toHaveBeenCalledWith("message_sending", expect.any(Function)); + }); + + describe("message_sending", () => { + beforeEach(() => { + register(api as any); + }); + + it("allows non-slack channels", async () => { + const result = await hooks.message_sending( + { content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" }, + { channelId: "discord", conversationId: "C123" }, + ); + + expect(result).toBeUndefined(); + expect(globalThis.fetch).not.toHaveBeenCalled(); + }); + + it("allows top-level messages (no threadTs)", async () => { + const result = await hooks.message_sending( + { content: "hello", metadata: {}, to: "C123" }, + { channelId: "slack", conversationId: "C123" }, + ); + + expect(result).toBeUndefined(); + expect(globalThis.fetch).not.toHaveBeenCalled(); + }); + + it("claims ownership successfully", async () => { + vi.mocked(globalThis.fetch).mockResolvedValue( + new Response(JSON.stringify({ owner: "test-agent" }), { status: 200 }), + ); + + const result = await hooks.message_sending( + { content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" }, + { channelId: "slack", conversationId: "C123" }, + ); + + expect(result).toBeUndefined(); + expect(globalThis.fetch).toHaveBeenCalledWith( + "http://localhost:8750/api/v1/ownership/C123/1234.5678", + expect.objectContaining({ + method: "POST", + body: JSON.stringify({ agent_id: "test-agent" }), + }), + ); + }); + + it("cancels when thread owned by another agent", async () => { + vi.mocked(globalThis.fetch).mockResolvedValue( + new Response(JSON.stringify({ owner: "other-agent" }), { status: 409 }), + ); + + const result = await hooks.message_sending( + { content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" }, + { channelId: "slack", conversationId: "C123" }, + ); + + expect(result).toEqual({ cancel: true }); + expect(api.logger.info).toHaveBeenCalledWith(expect.stringContaining("cancelled send")); + }); + + it("fails open on network error", async () => { + vi.mocked(globalThis.fetch).mockRejectedValue(new Error("ECONNREFUSED")); + + const result = await hooks.message_sending( + { content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" }, + { channelId: "slack", conversationId: "C123" }, + ); + + expect(result).toBeUndefined(); + expect(api.logger.warn).toHaveBeenCalledWith( + expect.stringContaining("ownership check failed"), + ); + }); + }); + + describe("message_received @-mention tracking", () => { + beforeEach(() => { + register(api as any); + }); + + it("tracks @-mentions and skips ownership check for mentioned threads", async () => { + // Simulate receiving a message that @-mentions the agent. + await hooks.message_received( + { content: "Hey @TestBot help me", metadata: { threadTs: "9999.0001", channelId: "C456" } }, + { channelId: "slack", conversationId: "C456" }, + ); + + // Now send in the same thread -- should skip the ownership HTTP call. + const result = await hooks.message_sending( + { content: "Sure!", metadata: { threadTs: "9999.0001", channelId: "C456" }, to: "C456" }, + { channelId: "slack", conversationId: "C456" }, + ); + + expect(result).toBeUndefined(); + expect(globalThis.fetch).not.toHaveBeenCalled(); + }); + + it("ignores @-mentions on non-slack channels", async () => { + // Use a unique thread key so module-level state from other tests doesn't interfere. + await hooks.message_received( + { content: "Hey @TestBot", metadata: { threadTs: "7777.0001", channelId: "C999" } }, + { channelId: "discord", conversationId: "C999" }, + ); + + // The mention should not have been tracked, so sending should still call fetch. + vi.mocked(globalThis.fetch).mockResolvedValue( + new Response(JSON.stringify({ owner: "test-agent" }), { status: 200 }), + ); + + await hooks.message_sending( + { content: "Sure!", metadata: { threadTs: "7777.0001", channelId: "C999" }, to: "C999" }, + { channelId: "slack", conversationId: "C999" }, + ); + + expect(globalThis.fetch).toHaveBeenCalled(); + }); + + it("tracks bot user ID mentions via <@U999> syntax", async () => { + await hooks.message_received( + { content: "Hey <@U999> help", metadata: { threadTs: "8888.0001", channelId: "C789" } }, + { channelId: "slack", conversationId: "C789" }, + ); + + const result = await hooks.message_sending( + { content: "On it!", metadata: { threadTs: "8888.0001", channelId: "C789" }, to: "C789" }, + { channelId: "slack", conversationId: "C789" }, + ); + + expect(result).toBeUndefined(); + expect(globalThis.fetch).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/extensions/thread-ownership/index.ts b/extensions/thread-ownership/index.ts new file mode 100644 index 0000000000..3db1ea94ff --- /dev/null +++ b/extensions/thread-ownership/index.ts @@ -0,0 +1,133 @@ +import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk"; + +type ThreadOwnershipConfig = { + forwarderUrl?: string; + abTestChannels?: string[]; +}; + +type AgentEntry = NonNullable["list"]>[number]; + +// In-memory set of {channel}:{thread} keys where this agent was @-mentioned. +// Entries expire after 5 minutes. +const mentionedThreads = new Map(); +const MENTION_TTL_MS = 5 * 60 * 1000; + +function cleanExpiredMentions(): void { + const now = Date.now(); + for (const [key, ts] of mentionedThreads) { + if (now - ts > MENTION_TTL_MS) { + mentionedThreads.delete(key); + } + } +} + +function resolveOwnershipAgent(config: OpenClawConfig): { id: string; name: string } { + const list = Array.isArray(config.agents?.list) + ? config.agents.list.filter((entry): entry is AgentEntry => + Boolean(entry && typeof entry === "object"), + ) + : []; + const selected = list.find((entry) => entry.default === true) ?? list[0]; + + const id = + typeof selected?.id === "string" && selected.id.trim() ? selected.id.trim() : "unknown"; + const identityName = + typeof selected?.identity?.name === "string" ? selected.identity.name.trim() : ""; + const fallbackName = typeof selected?.name === "string" ? selected.name.trim() : ""; + const name = identityName || fallbackName; + + return { id, name }; +} + +export default function register(api: OpenClawPluginApi) { + const pluginCfg = (api.pluginConfig ?? {}) as ThreadOwnershipConfig; + const forwarderUrl = ( + pluginCfg.forwarderUrl ?? + process.env.SLACK_FORWARDER_URL ?? + "http://slack-forwarder:8750" + ).replace(/\/$/, ""); + + const abTestChannels = new Set( + pluginCfg.abTestChannels ?? + process.env.THREAD_OWNERSHIP_CHANNELS?.split(",").filter(Boolean) ?? + [], + ); + + const { id: agentId, name: agentName } = resolveOwnershipAgent(api.config); + const botUserId = process.env.SLACK_BOT_USER_ID ?? ""; + + // --------------------------------------------------------------------------- + // message_received: track @-mentions so the agent can reply even if it + // doesn't own the thread. + // --------------------------------------------------------------------------- + api.on("message_received", async (event, ctx) => { + if (ctx.channelId !== "slack") return; + + const text = event.content ?? ""; + const threadTs = (event.metadata?.threadTs as string) ?? ""; + const channelId = (event.metadata?.channelId as string) ?? ctx.conversationId ?? ""; + + if (!threadTs || !channelId) return; + + // Check if this agent was @-mentioned. + const mentioned = + (agentName && text.includes(`@${agentName}`)) || + (botUserId && text.includes(`<@${botUserId}>`)); + + if (mentioned) { + cleanExpiredMentions(); + mentionedThreads.set(`${channelId}:${threadTs}`, Date.now()); + } + }); + + // --------------------------------------------------------------------------- + // message_sending: check thread ownership before sending to Slack. + // Returns { cancel: true } if another agent owns the thread. + // --------------------------------------------------------------------------- + api.on("message_sending", async (event, ctx) => { + if (ctx.channelId !== "slack") return; + + const threadTs = (event.metadata?.threadTs as string) ?? ""; + const channelId = (event.metadata?.channelId as string) ?? event.to; + + // Top-level messages (no thread) are always allowed. + if (!threadTs) return; + + // Only enforce in A/B test channels (if set is empty, skip entirely). + if (abTestChannels.size > 0 && !abTestChannels.has(channelId)) return; + + // If this agent was @-mentioned in this thread recently, skip ownership check. + cleanExpiredMentions(); + if (mentionedThreads.has(`${channelId}:${threadTs}`)) return; + + // Try to claim ownership via the forwarder HTTP API. + try { + const resp = await fetch(`${forwarderUrl}/api/v1/ownership/${channelId}/${threadTs}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ agent_id: agentId }), + signal: AbortSignal.timeout(3000), + }); + + if (resp.ok) { + // We own it (or just claimed it), proceed. + return; + } + + if (resp.status === 409) { + // Another agent owns this thread — cancel the send. + const body = (await resp.json()) as { owner?: string }; + api.logger.info?.( + `thread-ownership: cancelled send to ${channelId}:${threadTs} — owned by ${body.owner}`, + ); + return { cancel: true }; + } + + // Unexpected status — fail open. + api.logger.warn?.(`thread-ownership: unexpected status ${resp.status}, allowing send`); + } catch (err) { + // Network error — fail open. + api.logger.warn?.(`thread-ownership: ownership check failed (${String(err)}), allowing send`); + } + }); +} diff --git a/extensions/thread-ownership/openclaw.plugin.json b/extensions/thread-ownership/openclaw.plugin.json new file mode 100644 index 0000000000..2e020bdade --- /dev/null +++ b/extensions/thread-ownership/openclaw.plugin.json @@ -0,0 +1,28 @@ +{ + "id": "thread-ownership", + "name": "Thread Ownership", + "description": "Prevents multiple agents from responding in the same Slack thread. Uses HTTP calls to the slack-forwarder ownership API.", + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": { + "forwarderUrl": { + "type": "string" + }, + "abTestChannels": { + "type": "array", + "items": { "type": "string" } + } + } + }, + "uiHints": { + "forwarderUrl": { + "label": "Forwarder URL", + "help": "Base URL of the slack-forwarder ownership API (default: http://slack-forwarder:8750)" + }, + "abTestChannels": { + "label": "A/B Test Channels", + "help": "Slack channel IDs where thread ownership is enforced" + } + } +} diff --git a/src/channels/plugins/outbound/slack.test.ts b/src/channels/plugins/outbound/slack.test.ts new file mode 100644 index 0000000000..08863d24b7 --- /dev/null +++ b/src/channels/plugins/outbound/slack.test.ts @@ -0,0 +1,124 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("../../../slack/send.js", () => ({ + sendMessageSlack: vi.fn().mockResolvedValue({ ts: "1234.5678", channel: "C123" }), +})); + +vi.mock("../../../plugins/hook-runner-global.js", () => ({ + getGlobalHookRunner: vi.fn(), +})); + +import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; +import { sendMessageSlack } from "../../../slack/send.js"; +import { slackOutbound } from "./slack.js"; + +describe("slack outbound hook wiring", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("calls send without hooks when no hooks registered", async () => { + vi.mocked(getGlobalHookRunner).mockReturnValue(null); + + await slackOutbound.sendText({ + to: "C123", + text: "hello", + accountId: "default", + replyToId: "1111.2222", + }); + + expect(sendMessageSlack).toHaveBeenCalledWith("C123", "hello", { + threadTs: "1111.2222", + accountId: "default", + }); + }); + + it("calls message_sending hook before sending", async () => { + const mockRunner = { + hasHooks: vi.fn().mockReturnValue(true), + runMessageSending: vi.fn().mockResolvedValue(undefined), + }; + // oxlint-disable-next-line typescript/no-explicit-any + vi.mocked(getGlobalHookRunner).mockReturnValue(mockRunner as any); + + await slackOutbound.sendText({ + to: "C123", + text: "hello", + accountId: "default", + replyToId: "1111.2222", + }); + + expect(mockRunner.hasHooks).toHaveBeenCalledWith("message_sending"); + expect(mockRunner.runMessageSending).toHaveBeenCalledWith( + { to: "C123", content: "hello", metadata: { threadTs: "1111.2222", channelId: "C123" } }, + { channelId: "slack", accountId: "default" }, + ); + expect(sendMessageSlack).toHaveBeenCalledWith("C123", "hello", { + threadTs: "1111.2222", + accountId: "default", + }); + }); + + it("cancels send when hook returns cancel:true", async () => { + const mockRunner = { + hasHooks: vi.fn().mockReturnValue(true), + runMessageSending: vi.fn().mockResolvedValue({ cancel: true }), + }; + // oxlint-disable-next-line typescript/no-explicit-any + vi.mocked(getGlobalHookRunner).mockReturnValue(mockRunner as any); + + const result = await slackOutbound.sendText({ + to: "C123", + text: "hello", + accountId: "default", + replyToId: "1111.2222", + }); + + expect(sendMessageSlack).not.toHaveBeenCalled(); + expect(result.channel).toBe("slack"); + }); + + it("modifies text when hook returns content", async () => { + const mockRunner = { + hasHooks: vi.fn().mockReturnValue(true), + runMessageSending: vi.fn().mockResolvedValue({ content: "modified" }), + }; + // oxlint-disable-next-line typescript/no-explicit-any + vi.mocked(getGlobalHookRunner).mockReturnValue(mockRunner as any); + + await slackOutbound.sendText({ + to: "C123", + text: "original", + accountId: "default", + replyToId: "1111.2222", + }); + + expect(sendMessageSlack).toHaveBeenCalledWith("C123", "modified", { + threadTs: "1111.2222", + accountId: "default", + }); + }); + + it("skips hooks when runner has no message_sending hooks", async () => { + const mockRunner = { + hasHooks: vi.fn().mockReturnValue(false), + runMessageSending: vi.fn(), + }; + // oxlint-disable-next-line typescript/no-explicit-any + vi.mocked(getGlobalHookRunner).mockReturnValue(mockRunner as any); + + await slackOutbound.sendText({ + to: "C123", + text: "hello", + accountId: "default", + replyToId: "1111.2222", + }); + + expect(mockRunner.runMessageSending).not.toHaveBeenCalled(); + expect(sendMessageSlack).toHaveBeenCalled(); + }); +}); diff --git a/src/channels/plugins/outbound/slack.ts b/src/channels/plugins/outbound/slack.ts index 08d27bd707..dde9624553 100644 --- a/src/channels/plugins/outbound/slack.ts +++ b/src/channels/plugins/outbound/slack.ts @@ -1,4 +1,5 @@ import type { ChannelOutboundAdapter } from "../types.js"; +import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; import { sendMessageSlack } from "../../../slack/send.js"; export const slackOutbound: ChannelOutboundAdapter = { @@ -9,7 +10,29 @@ export const slackOutbound: ChannelOutboundAdapter = { const send = deps?.sendSlack ?? sendMessageSlack; // Use threadId fallback so routed tool notifications stay in the Slack thread. const threadTs = replyToId ?? (threadId != null ? String(threadId) : undefined); - const result = await send(to, text, { + let finalText = text; + + // Run message_sending hooks (e.g. thread-ownership can cancel the send). + const hookRunner = getGlobalHookRunner(); + if (hookRunner?.hasHooks("message_sending")) { + const hookResult = await hookRunner.runMessageSending( + { to, content: text, metadata: { threadTs, channelId: to } }, + { channelId: "slack", accountId: accountId ?? undefined }, + ); + if (hookResult?.cancel) { + return { + channel: "slack", + messageId: "cancelled-by-hook", + channelId: to, + meta: { cancelled: true }, + }; + } + if (hookResult?.content) { + finalText = hookResult.content; + } + } + + const result = await send(to, finalText, { threadTs, accountId: accountId ?? undefined, }); @@ -19,7 +42,29 @@ export const slackOutbound: ChannelOutboundAdapter = { const send = deps?.sendSlack ?? sendMessageSlack; // Use threadId fallback so routed tool notifications stay in the Slack thread. const threadTs = replyToId ?? (threadId != null ? String(threadId) : undefined); - const result = await send(to, text, { + let finalText = text; + + // Run message_sending hooks (e.g. thread-ownership can cancel the send). + const hookRunner = getGlobalHookRunner(); + if (hookRunner?.hasHooks("message_sending")) { + const hookResult = await hookRunner.runMessageSending( + { to, content: text, metadata: { threadTs, channelId: to, mediaUrl } }, + { channelId: "slack", accountId: accountId ?? undefined }, + ); + if (hookResult?.cancel) { + return { + channel: "slack", + messageId: "cancelled-by-hook", + channelId: to, + meta: { cancelled: true }, + }; + } + if (hookResult?.content) { + finalText = hookResult.content; + } + } + + const result = await send(to, finalText, { mediaUrl, threadTs, accountId: accountId ?? undefined,