diff --git a/CHANGELOG.md b/CHANGELOG.md index 1db8fea7a9..71ee4c64e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,8 @@ Docs: https://docs.openclaw.ai - Agents/Tools: strip duplicated `read` truncation payloads from tool-result `details` and make pre-call context guarding account for heavy tool-result metadata, so repeated `read` calls no longer bypass compaction and overflow model context windows. Thanks @tyler6204. - Reply threading: keep reply context sticky across streamed/split chunks and preserve `replyToId` on all chunk sends across shared and channel-specific delivery paths (including iMessage, BlueBubbles, Telegram, Discord, and Matrix), so follow-up bubbles stay attached to the same referenced message. Thanks @tyler6204. - Gateway/Agent: defer transient lifecycle `error` snapshots with a short grace window so `agent.wait` does not resolve early during retry/failover. Thanks @tyler6204. +- Hooks/Automation: bridge outbound/inbound message lifecycle into internal hook events (`message:received`, `message:sent`) with session-key correlation guards, while keeping per-payload success/error reporting accurate for chunked and best-effort deliveries. (PR #9387) +- Media understanding: honor `agents.defaults.imageModel` during auto-discovery so implicit image analysis uses configured primary/fallback image models. (PR #7607) - iOS/Onboarding: stop auth Step 3 retry-loop churn by pausing reconnect attempts on unauthorized/missing-token gateway errors and keeping auth/pairing issue state sticky during manual retry. (#19153) Thanks @mbelinky. - Voice-call: auto-end calls when media streams disconnect to prevent stuck active calls. (#18435) Thanks @JayMishra-source. - Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky. diff --git a/docs/automation/hooks.md b/docs/automation/hooks.md index ffdf32ab79..55c04e9990 100644 --- a/docs/automation/hooks.md +++ b/docs/automation/hooks.md @@ -207,12 +207,13 @@ Each event includes: ```typescript { - type: 'command' | 'session' | 'agent' | 'gateway', - action: string, // e.g., 'new', 'reset', 'stop' + type: 'command' | 'session' | 'agent' | 'gateway' | 'message', + action: string, // e.g., 'new', 'reset', 'stop', 'received', 'sent' sessionKey: string, // Session identifier timestamp: Date, // When the event occurred messages: string[], // Push messages here to send to user context: { + // Command events: sessionEntry?: SessionEntry, sessionId?: string, sessionFile?: string, @@ -220,7 +221,13 @@ Each event includes: senderId?: string, workspaceDir?: string, bootstrapFiles?: WorkspaceBootstrapFile[], - cfg?: OpenClawConfig + cfg?: OpenClawConfig, + // Message events (see Message Events section for full details): + from?: string, // message:received + to?: string, // message:sent + content?: string, + channelId?: string, + success?: boolean, // message:sent } } ``` @@ -246,6 +253,70 @@ Triggered when the gateway starts: - **`gateway:startup`**: After channels start and hooks are loaded +### Message Events + +Triggered when messages are received or sent: + +- **`message`**: All message events (general listener) +- **`message:received`**: When an inbound message is received from any channel +- **`message:sent`**: When an outbound message is successfully sent + +#### Message Event Context + +Message events include rich context about the message: + +```typescript +// message:received context +{ + from: string, // Sender identifier (phone number, user ID, etc.) + content: string, // Message content + timestamp?: number, // Unix timestamp when received + channelId: string, // Channel (e.g., "whatsapp", "telegram", "discord") + accountId?: string, // Provider account ID for multi-account setups + conversationId?: string, // Chat/conversation ID + messageId?: string, // Message ID from the provider + metadata?: { // Additional provider-specific data + to?: string, + provider?: string, + surface?: string, + threadId?: string, + senderId?: string, + senderName?: string, + senderUsername?: string, + senderE164?: string, + } +} + +// message:sent context +{ + to: string, // Recipient identifier + content: string, // Message content that was sent + success: boolean, // Whether the send succeeded + error?: string, // Error message if sending failed + channelId: string, // Channel (e.g., "whatsapp", "telegram", "discord") + accountId?: string, // Provider account ID + conversationId?: string, // Chat/conversation ID + messageId?: string, // Message ID returned by the provider +} +``` + +#### Example: Message Logger Hook + +```typescript +import type { HookHandler } from "../../src/hooks/hooks.js"; +import { isMessageReceivedEvent, isMessageSentEvent } from "../../src/hooks/internal-hooks.js"; + +const handler: HookHandler = async (event) => { + if (isMessageReceivedEvent(event)) { + console.log(`[message-logger] Received from ${event.context.from}: ${event.context.content}`); + } else if (isMessageSentEvent(event)) { + console.log(`[message-logger] Sent to ${event.context.to}: ${event.context.content}`); + } +}; + +export default handler; +``` + ### Tool Result Hooks (Plugin API) These hooks are not event-stream listeners; they let plugins synchronously adjust tool results before OpenClaw persists them. @@ -259,8 +330,6 @@ Planned event types: - **`session:start`**: When a new session begins - **`session:end`**: When a session ends - **`agent:error`**: When an agent encounters an error -- **`message:sent`**: When a message is sent -- **`message:received`**: When a message is received ## Creating Custom Hooks diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index e8f8ccbf79..c7dfe5d3a7 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -25,6 +25,19 @@ const hookMocks = vi.hoisted(() => ({ runMessageReceived: vi.fn(async () => {}), }, })); +const internalHookMocks = vi.hoisted(() => ({ + createInternalHookEvent: vi.fn( + (type: string, action: string, sessionKey: string, context: Record) => ({ + type, + action, + sessionKey, + context, + timestamp: new Date(), + messages: [], + }), + ), + triggerInternalHook: vi.fn(async () => {}), +})); vi.mock("./route-reply.js", () => ({ isRoutableChannel: (channel: string | undefined) => @@ -55,6 +68,10 @@ vi.mock("../../logging/diagnostic.js", () => ({ vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: () => hookMocks.runner, })); +vi.mock("../../hooks/internal-hooks.js", () => ({ + createInternalHookEvent: internalHookMocks.createInternalHookEvent, + triggerInternalHook: internalHookMocks.triggerInternalHook, +})); const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js"); const { resetInboundDedupe } = await import("./inbound-dedupe.js"); @@ -104,6 +121,8 @@ describe("dispatchReplyFromConfig", () => { hookMocks.runner.hasHooks.mockReset(); hookMocks.runner.hasHooks.mockReturnValue(false); hookMocks.runner.runMessageReceived.mockReset(); + internalHookMocks.createInternalHookEvent.mockClear(); + internalHookMocks.triggerInternalHook.mockClear(); }); it("does not route when Provider matches OriginatingChannel (even if Surface is missing)", async () => { setNoAbort(); @@ -423,6 +442,53 @@ describe("dispatchReplyFromConfig", () => { ); }); + it("emits internal message:received hook when a session key is available", async () => { + setNoAbort(); + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "telegram", + Surface: "telegram", + SessionKey: "agent:main:main", + CommandBody: "/help", + MessageSid: "msg-42", + }); + + const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith( + "message", + "received", + "agent:main:main", + expect.objectContaining({ + from: ctx.From, + content: "/help", + channelId: "telegram", + messageId: "msg-42", + }), + ); + expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1); + }); + + it("skips internal message:received hook when session key is unavailable", async () => { + setNoAbort(); + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "telegram", + Surface: "telegram", + CommandBody: "/help", + }); + (ctx as MsgContext).SessionKey = undefined; + + const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(internalHookMocks.createInternalHookEvent).not.toHaveBeenCalled(); + expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled(); + }); + it("emits diagnostics when enabled", async () => { setNoAbort(); const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 0b8da28cc9..5965594cbd 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -1,7 +1,11 @@ -import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import type { OpenClawConfig } from "../../config/config.js"; +import type { FinalizedMsgContext } from "../templating.js"; +import type { GetReplyOptions, ReplyPayload } from "../types.js"; +import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; +import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { loadSessionStore, resolveStorePath } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; import { logMessageProcessed, @@ -11,11 +15,8 @@ import { import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js"; import { getReplyFromConfig } from "../reply.js"; -import type { FinalizedMsgContext } from "../templating.js"; -import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { formatAbortReplyText, tryFastAbortFromMessage } from "./abort.js"; import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js"; -import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; const AUDIO_PLACEHOLDER_RE = /^(\s*\([^)]*\))?$/i; @@ -148,24 +149,25 @@ export async function dispatchReplyFromConfig(params: { const inboundAudio = isInboundAudioContext(ctx); const sessionTtsAuto = resolveSessionTtsAuto(ctx, cfg); const hookRunner = getGlobalHookRunner(); - if (hookRunner?.hasHooks("message_received")) { - const timestamp = - typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) - ? ctx.Timestamp - : undefined; - const messageIdForHook = - ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; - const content = - typeof ctx.BodyForCommands === "string" - ? ctx.BodyForCommands - : typeof ctx.RawBody === "string" - ? ctx.RawBody - : typeof ctx.Body === "string" - ? ctx.Body - : ""; - const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); - const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined; + // Extract message context for hooks (plugin and internal) + const timestamp = + typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined; + const messageIdForHook = + ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; + const content = + typeof ctx.BodyForCommands === "string" + ? ctx.BodyForCommands + : typeof ctx.RawBody === "string" + ? ctx.RawBody + : typeof ctx.Body === "string" + ? ctx.Body + : ""; + const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); + const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined; + + // Trigger plugin hooks (fire-and-forget) + if (hookRunner?.hasHooks("message_received")) { void hookRunner .runMessageReceived( { @@ -193,10 +195,37 @@ export async function dispatchReplyFromConfig(params: { }, ) .catch((err) => { - logVerbose(`dispatch-from-config: message_received hook failed: ${String(err)}`); + logVerbose(`dispatch-from-config: message_received plugin hook failed: ${String(err)}`); }); } + // Bridge to internal hooks (HOOK.md discovery system) - refs #8807 + if (sessionKey) { + void triggerInternalHook( + createInternalHookEvent("message", "received", sessionKey, { + from: ctx.From ?? "", + content, + timestamp, + channelId, + accountId: ctx.AccountId, + conversationId, + messageId: messageIdForHook, + metadata: { + to: ctx.To, + provider: ctx.Provider, + surface: ctx.Surface, + threadId: ctx.MessageThreadId, + senderId: ctx.SenderId, + senderName: ctx.SenderName, + senderUsername: ctx.SenderUsername, + senderE164: ctx.SenderE164, + }, + }), + ).catch((err) => { + logVerbose(`dispatch-from-config: message_received internal hook failed: ${String(err)}`); + }); + } + // Check if we should route replies to originating channel instead of dispatcher. // Only route when the originating channel is DIFFERENT from the current surface. // This handles cross-provider routing (e.g., message from Telegram being processed diff --git a/src/hooks/internal-hooks.test.ts b/src/hooks/internal-hooks.test.ts index dd5353866a..b1ea70ab0e 100644 --- a/src/hooks/internal-hooks.test.ts +++ b/src/hooks/internal-hooks.test.ts @@ -4,10 +4,14 @@ import { createInternalHookEvent, getRegisteredEventKeys, isAgentBootstrapEvent, + isMessageReceivedEvent, + isMessageSentEvent, registerInternalHook, triggerInternalHook, unregisterInternalHook, type AgentBootstrapHookContext, + type MessageReceivedHookContext, + type MessageSentHookContext, } from "./internal-hooks.js"; describe("hooks", () => { @@ -181,6 +185,191 @@ describe("hooks", () => { }); }); + describe("isMessageReceivedEvent", () => { + it("returns true for message:received events with expected context", () => { + const context: MessageReceivedHookContext = { + from: "+1234567890", + content: "Hello world", + channelId: "whatsapp", + conversationId: "chat-123", + timestamp: Date.now(), + }; + const event = createInternalHookEvent("message", "received", "test-session", context); + expect(isMessageReceivedEvent(event)).toBe(true); + }); + + it("returns false for non-message events", () => { + const event = createInternalHookEvent("command", "new", "test-session"); + expect(isMessageReceivedEvent(event)).toBe(false); + }); + + it("returns false for message:sent events", () => { + const context: MessageSentHookContext = { + to: "+1234567890", + content: "Hello world", + success: true, + channelId: "whatsapp", + }; + const event = createInternalHookEvent("message", "sent", "test-session", context); + expect(isMessageReceivedEvent(event)).toBe(false); + }); + + it("returns false when context is missing required fields", () => { + const event = createInternalHookEvent("message", "received", "test-session", { + from: "+1234567890", + // missing channelId + }); + expect(isMessageReceivedEvent(event)).toBe(false); + }); + }); + + describe("isMessageSentEvent", () => { + it("returns true for message:sent events with expected context", () => { + const context: MessageSentHookContext = { + to: "+1234567890", + content: "Hello world", + success: true, + channelId: "telegram", + conversationId: "chat-456", + messageId: "msg-789", + }; + const event = createInternalHookEvent("message", "sent", "test-session", context); + expect(isMessageSentEvent(event)).toBe(true); + }); + + it("returns true when success is false (error case)", () => { + const context: MessageSentHookContext = { + to: "+1234567890", + content: "Hello world", + success: false, + error: "Network error", + channelId: "whatsapp", + }; + const event = createInternalHookEvent("message", "sent", "test-session", context); + expect(isMessageSentEvent(event)).toBe(true); + }); + + it("returns false for non-message events", () => { + const event = createInternalHookEvent("command", "new", "test-session"); + expect(isMessageSentEvent(event)).toBe(false); + }); + + it("returns false for message:received events", () => { + const context: MessageReceivedHookContext = { + from: "+1234567890", + content: "Hello world", + channelId: "whatsapp", + }; + const event = createInternalHookEvent("message", "received", "test-session", context); + expect(isMessageSentEvent(event)).toBe(false); + }); + + it("returns false when context is missing required fields", () => { + const event = createInternalHookEvent("message", "sent", "test-session", { + to: "+1234567890", + channelId: "whatsapp", + // missing success + }); + expect(isMessageSentEvent(event)).toBe(false); + }); + }); + + describe("message hooks", () => { + it("should trigger message:received handlers", async () => { + const handler = vi.fn(); + registerInternalHook("message:received", handler); + + const context: MessageReceivedHookContext = { + from: "+1234567890", + content: "Hello world", + channelId: "whatsapp", + conversationId: "chat-123", + }; + const event = createInternalHookEvent("message", "received", "test-session", context); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledWith(event); + }); + + it("should trigger message:sent handlers", async () => { + const handler = vi.fn(); + registerInternalHook("message:sent", handler); + + const context: MessageSentHookContext = { + to: "+1234567890", + content: "Hello world", + success: true, + channelId: "telegram", + messageId: "msg-123", + }; + const event = createInternalHookEvent("message", "sent", "test-session", context); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledWith(event); + }); + + it("should trigger general message handlers for both received and sent", async () => { + const handler = vi.fn(); + registerInternalHook("message", handler); + + const receivedContext: MessageReceivedHookContext = { + from: "+1234567890", + content: "Hello", + channelId: "whatsapp", + }; + const receivedEvent = createInternalHookEvent( + "message", + "received", + "test-session", + receivedContext, + ); + await triggerInternalHook(receivedEvent); + + const sentContext: MessageSentHookContext = { + to: "+1234567890", + content: "World", + success: true, + channelId: "whatsapp", + }; + const sentEvent = createInternalHookEvent("message", "sent", "test-session", sentContext); + await triggerInternalHook(sentEvent); + + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenNthCalledWith(1, receivedEvent); + expect(handler).toHaveBeenNthCalledWith(2, sentEvent); + }); + + it("should handle hook errors without breaking message processing", async () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + const errorHandler = vi.fn(() => { + throw new Error("Hook failed"); + }); + const successHandler = vi.fn(); + + registerInternalHook("message:received", errorHandler); + registerInternalHook("message:received", successHandler); + + const context: MessageReceivedHookContext = { + from: "+1234567890", + content: "Hello", + channelId: "whatsapp", + }; + const event = createInternalHookEvent("message", "received", "test-session", context); + await triggerInternalHook(event); + + // Both handlers were called + expect(errorHandler).toHaveBeenCalled(); + expect(successHandler).toHaveBeenCalled(); + // Error was logged but didn't prevent second handler + expect(consoleError).toHaveBeenCalledWith( + expect.stringContaining("Hook error"), + expect.stringContaining("Hook failed"), + ); + + consoleError.mockRestore(); + }); + }); + describe("getRegisteredEventKeys", () => { it("should return all registered event keys", () => { registerInternalHook("command:new", vi.fn()); diff --git a/src/hooks/internal-hooks.ts b/src/hooks/internal-hooks.ts index e92b193668..428c5ddf41 100644 --- a/src/hooks/internal-hooks.ts +++ b/src/hooks/internal-hooks.ts @@ -8,7 +8,7 @@ import type { WorkspaceBootstrapFile } from "../agents/workspace.js"; import type { OpenClawConfig } from "../config/config.js"; -export type InternalHookEventType = "command" | "session" | "agent" | "gateway"; +export type InternalHookEventType = "command" | "session" | "agent" | "gateway" | "message"; export type AgentBootstrapHookContext = { workspaceDir: string; @@ -25,6 +25,60 @@ export type AgentBootstrapHookEvent = InternalHookEvent & { context: AgentBootstrapHookContext; }; +// ============================================================================ +// Message Hook Events +// ============================================================================ + +export type MessageReceivedHookContext = { + /** Sender identifier (e.g., phone number, user ID) */ + from: string; + /** Message content */ + content: string; + /** Unix timestamp when the message was received */ + timestamp?: number; + /** Channel identifier (e.g., "telegram", "whatsapp") */ + channelId: string; + /** Provider account ID for multi-account setups */ + accountId?: string; + /** Conversation/chat ID */ + conversationId?: string; + /** Message ID from the provider */ + messageId?: string; + /** Additional provider-specific metadata */ + metadata?: Record; +}; + +export type MessageReceivedHookEvent = InternalHookEvent & { + type: "message"; + action: "received"; + context: MessageReceivedHookContext; +}; + +export type MessageSentHookContext = { + /** Recipient identifier */ + to: string; + /** Message content */ + content: string; + /** Whether the message was sent successfully */ + success: boolean; + /** Error message if sending failed */ + error?: string; + /** Channel identifier (e.g., "telegram", "whatsapp") */ + channelId: string; + /** Provider account ID for multi-account setups */ + accountId?: string; + /** Conversation/chat ID */ + conversationId?: string; + /** Message ID returned by the provider */ + messageId?: string; +}; + +export type MessageSentHookEvent = InternalHookEvent & { + type: "message"; + action: "sent"; + context: MessageSentHookContext; +}; + export interface InternalHookEvent { /** The type of event (command, session, agent, gateway, etc.) */ type: InternalHookEventType; @@ -179,3 +233,31 @@ export function isAgentBootstrapEvent(event: InternalHookEvent): event is AgentB } return Array.isArray(context.bootstrapFiles); } + +export function isMessageReceivedEvent( + event: InternalHookEvent, +): event is MessageReceivedHookEvent { + if (event.type !== "message" || event.action !== "received") { + return false; + } + const context = event.context as Partial | null; + if (!context || typeof context !== "object") { + return false; + } + return typeof context.from === "string" && typeof context.channelId === "string"; +} + +export function isMessageSentEvent(event: InternalHookEvent): event is MessageSentHookEvent { + if (event.type !== "message" || event.action !== "sent") { + return false; + } + const context = event.context as Partial | null; + if (!context || typeof context !== "object") { + return false; + } + return ( + typeof context.to === "string" && + typeof context.channelId === "string" && + typeof context.success === "boolean" + ); +} diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index bc2031c8ac..f2d01f4d60 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -1,9 +1,9 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../config/config.js"; import { signalOutbound } from "../../channels/plugins/outbound/signal.js"; import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js"; import { whatsappOutbound } from "../../channels/plugins/outbound/whatsapp.js"; -import type { OpenClawConfig } from "../../config/config.js"; import { STATE_DIR } from "../../config/paths.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; import { markdownToSignalTextChunks } from "../../signal/format.js"; @@ -19,6 +19,19 @@ const hookMocks = vi.hoisted(() => ({ runMessageSent: vi.fn(async () => {}), }, })); +const internalHookMocks = vi.hoisted(() => ({ + createInternalHookEvent: vi.fn( + (type: string, action: string, sessionKey: string, context: Record) => ({ + type, + action, + sessionKey, + context, + timestamp: new Date(), + messages: [], + }), + ), + triggerInternalHook: vi.fn(async () => {}), +})); const queueMocks = vi.hoisted(() => ({ enqueueDelivery: vi.fn(async () => "mock-queue-id"), ackDelivery: vi.fn(async () => {}), @@ -37,6 +50,10 @@ vi.mock("../../config/sessions.js", async () => { vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: () => hookMocks.runner, })); +vi.mock("../../hooks/internal-hooks.js", () => ({ + createInternalHookEvent: internalHookMocks.createInternalHookEvent, + triggerInternalHook: internalHookMocks.triggerInternalHook, +})); vi.mock("./delivery-queue.js", () => ({ enqueueDelivery: queueMocks.enqueueDelivery, ackDelivery: queueMocks.ackDelivery, @@ -76,6 +93,8 @@ describe("deliverOutboundPayloads", () => { hookMocks.runner.hasHooks.mockReturnValue(false); hookMocks.runner.runMessageSent.mockReset(); hookMocks.runner.runMessageSent.mockResolvedValue(undefined); + internalHookMocks.createInternalHookEvent.mockClear(); + internalHookMocks.triggerInternalHook.mockClear(); queueMocks.enqueueDelivery.mockReset(); queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id"); queueMocks.ackDelivery.mockReset(); @@ -449,6 +468,58 @@ describe("deliverOutboundPayloads", () => { expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]); }); + it("emits internal message:sent hook with success=true for chunked payload delivery", async () => { + const sendWhatsApp = vi + .fn() + .mockResolvedValueOnce({ messageId: "w1", toJid: "jid" }) + .mockResolvedValueOnce({ messageId: "w2", toJid: "jid" }); + const cfg: OpenClawConfig = { + channels: { whatsapp: { textChunkLimit: 2 } }, + }; + + await deliverOutboundPayloads({ + cfg, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "abcd" }], + deps: { sendWhatsApp }, + mirror: { + sessionKey: "agent:main:main", + }, + }); + + expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1); + expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith( + "message", + "sent", + "agent:main:main", + expect.objectContaining({ + to: "+1555", + content: "abcd", + success: true, + channelId: "whatsapp", + conversationId: "+1555", + messageId: "w2", + }), + ); + expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1); + }); + + it("does not emit internal message:sent hook when mirror sessionKey is missing", async () => { + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + + await deliverOutboundPayloads({ + cfg: whatsappChunkConfig, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + deps: { sendWhatsApp }, + }); + + expect(internalHookMocks.createInternalHookEvent).not.toHaveBeenCalled(); + expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled(); + }); + it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => { const sendWhatsApp = vi .fn() diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 77dba9bfc7..018eea5cc1 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -1,37 +1,38 @@ +import type { ReplyPayload } from "../../auto-reply/types.js"; +import type { + ChannelOutboundAdapter, + ChannelOutboundContext, +} from "../../channels/plugins/types.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { sendMessageDiscord } from "../../discord/send.js"; +import type { sendMessageIMessage } from "../../imessage/send.js"; +import type { sendMessageSlack } from "../../slack/send.js"; +import type { sendMessageTelegram } from "../../telegram/send.js"; +import type { sendMessageWhatsApp } from "../../web/outbound.js"; +import type { OutboundIdentity } from "./identity.js"; +import type { NormalizedOutboundPayload } from "./payloads.js"; +import type { OutboundChannel } from "./targets.js"; import { chunkByParagraph, chunkMarkdownTextWithMode, resolveChunkMode, resolveTextChunkLimit, } from "../../auto-reply/chunk.js"; -import type { ReplyPayload } from "../../auto-reply/types.js"; import { resolveChannelMediaMaxBytes } from "../../channels/plugins/media-limits.js"; import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js"; -import type { - ChannelOutboundAdapter, - ChannelOutboundContext, -} from "../../channels/plugins/types.js"; -import type { OpenClawConfig } from "../../config/config.js"; import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; import { appendAssistantMessageToSessionTranscript, resolveMirroredTranscriptText, } from "../../config/sessions.js"; -import type { sendMessageDiscord } from "../../discord/send.js"; -import type { sendMessageIMessage } from "../../imessage/send.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getAgentScopedMediaLocalRoots } from "../../media/local-roots.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js"; import { sendMessageSignal } from "../../signal/send.js"; -import type { sendMessageSlack } from "../../slack/send.js"; -import type { sendMessageTelegram } from "../../telegram/send.js"; -import type { sendMessageWhatsApp } from "../../web/outbound.js"; import { throwIfAborted } from "./abort.js"; import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js"; -import type { OutboundIdentity } from "./identity.js"; -import type { NormalizedOutboundPayload } from "./payloads.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; -import type { OutboundChannel } from "./targets.js"; export type { NormalizedOutboundPayload } from "./payloads.js"; export { normalizeOutboundPayloads } from "./payloads.js"; @@ -443,30 +444,51 @@ async function deliverOutboundPayloadsCore( return normalized ? [normalized] : []; }); const hookRunner = getGlobalHookRunner(); + const sessionKeyForInternalHooks = params.mirror?.sessionKey; for (const payload of normalizedPayloads) { const payloadSummary: NormalizedOutboundPayload = { text: payload.text ?? "", mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []), channelData: payload.channelData, }; - const emitMessageSent = (success: boolean, error?: string) => { - if (!hookRunner?.hasHooks("message_sent")) { + const emitMessageSent = (params: { + success: boolean; + content: string; + error?: string; + messageId?: string; + }) => { + if (hookRunner?.hasHooks("message_sent")) { + void hookRunner + .runMessageSent( + { + to, + content: params.content, + success: params.success, + ...(params.error ? { error: params.error } : {}), + }, + { + channelId: channel, + accountId: accountId ?? undefined, + conversationId: to, + }, + ) + .catch(() => {}); + } + if (!sessionKeyForInternalHooks) { return; } - void hookRunner - .runMessageSent( - { - to, - content: payloadSummary.text, - success, - ...(error ? { error } : {}), - }, - { - channelId: channel, - accountId: accountId ?? undefined, - }, - ) - .catch(() => {}); + void triggerInternalHook( + createInternalHookEvent("message", "sent", sessionKeyForInternalHooks, { + to, + content: params.content, + success: params.success, + ...(params.error ? { error: params.error } : {}), + channelId: channel, + accountId: accountId ?? undefined, + conversationId: to, + messageId: params.messageId, + }), + ).catch(() => {}); }; try { throwIfAborted(abortSignal); @@ -504,34 +526,58 @@ async function deliverOutboundPayloadsCore( threadId: params.threadId ?? undefined, }; if (handler.sendPayload && effectivePayload.channelData) { - results.push(await handler.sendPayload(effectivePayload, sendOverrides)); - emitMessageSent(true); + const delivery = await handler.sendPayload(effectivePayload, sendOverrides); + results.push(delivery); + emitMessageSent({ + success: true, + content: payloadSummary.text, + messageId: delivery.messageId, + }); continue; } if (payloadSummary.mediaUrls.length === 0) { + const beforeCount = results.length; if (isSignalChannel) { await sendSignalTextChunks(payloadSummary.text); } else { await sendTextChunks(payloadSummary.text, sendOverrides); } - emitMessageSent(true); + const messageId = results.at(-1)?.messageId; + emitMessageSent({ + success: results.length > beforeCount, + content: payloadSummary.text, + messageId, + }); continue; } let first = true; + let lastMessageId: string | undefined; for (const url of payloadSummary.mediaUrls) { throwIfAborted(abortSignal); const caption = first ? payloadSummary.text : ""; first = false; if (isSignalChannel) { - results.push(await sendSignalMedia(caption, url)); + const delivery = await sendSignalMedia(caption, url); + results.push(delivery); + lastMessageId = delivery.messageId; } else { - results.push(await handler.sendMedia(caption, url, sendOverrides)); + const delivery = await handler.sendMedia(caption, url, sendOverrides); + results.push(delivery); + lastMessageId = delivery.messageId; } } - emitMessageSent(true); + emitMessageSent({ + success: true, + content: payloadSummary.text, + messageId: lastMessageId, + }); } catch (err) { - emitMessageSent(false, err instanceof Error ? err.message : String(err)); + emitMessageSent({ + success: false, + content: payloadSummary.text, + error: err instanceof Error ? err.message : String(err), + }); if (!params.bestEffort) { throw err; } @@ -551,5 +597,6 @@ async function deliverOutboundPayloadsCore( }); } } + return results; }