diff --git a/extensions/bluebubbles/src/monitor.test.ts b/extensions/bluebubbles/src/monitor.test.ts index d4f03f07cc..1e652ade28 100644 --- a/extensions/bluebubbles/src/monitor.test.ts +++ b/extensions/bluebubbles/src/monitor.test.ts @@ -1150,6 +1150,136 @@ describe("BlueBubbles webhook monitor", () => { }); }); + describe("inbound debouncing", () => { + it("coalesces text-only then attachment webhook events by messageId", async () => { + vi.useFakeTimers(); + try { + const account = createMockAccount({ dmPolicy: "open" }); + const config: OpenClawConfig = {}; + const core = createMockRuntime(); + + // Use a timing-aware debouncer test double that respects debounceMs/buildKey/shouldDebounce. + core.channel.debounce.createInboundDebouncer = vi.fn((params: any) => { + type Item = any; + const buckets = new Map | null }>(); + + const flush = async (key: string) => { + const bucket = buckets.get(key); + if (!bucket) return; + if (bucket.timer) { + clearTimeout(bucket.timer); + bucket.timer = null; + } + const items = bucket.items; + bucket.items = []; + if (items.length > 0) { + try { + await params.onFlush(items); + } catch (err) { + params.onError?.(err); + throw err; + } + } + }; + + return { + enqueue: async (item: Item) => { + if (params.shouldDebounce && !params.shouldDebounce(item)) { + await params.onFlush([item]); + return; + } + + const key = params.buildKey(item); + const existing = buckets.get(key); + const bucket = existing ?? { items: [], timer: null }; + bucket.items.push(item); + if (bucket.timer) clearTimeout(bucket.timer); + bucket.timer = setTimeout(async () => { + await flush(key); + }, params.debounceMs); + buckets.set(key, bucket); + }, + flushKey: vi.fn(async (key: string) => { + await flush(key); + }), + }; + }) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"]; + + setBlueBubblesRuntime(core); + + unregister = registerBlueBubblesWebhookTarget({ + account, + config, + runtime: { log: vi.fn(), error: vi.fn() }, + core, + path: "/bluebubbles-webhook", + }); + + const messageId = "race-msg-1"; + const chatGuid = "iMessage;-;+15551234567"; + + const payloadA = { + type: "new-message", + data: { + text: "hello", + handle: { address: "+15551234567" }, + isGroup: false, + isFromMe: false, + guid: messageId, + chatGuid, + date: Date.now(), + }, + }; + + const payloadB = { + type: "new-message", + data: { + text: "hello", + handle: { address: "+15551234567" }, + isGroup: false, + isFromMe: false, + guid: messageId, + chatGuid, + attachments: [ + { + guid: "att-1", + mimeType: "image/jpeg", + totalBytes: 1024, + }, + ], + date: Date.now(), + }, + }; + + await handleBlueBubblesWebhookRequest( + createMockRequest("POST", "/bluebubbles-webhook", payloadA), + createMockResponse(), + ); + + // Simulate the real-world delay where the attachment-bearing webhook arrives shortly after. + await vi.advanceTimersByTimeAsync(300); + + await handleBlueBubblesWebhookRequest( + createMockRequest("POST", "/bluebubbles-webhook", payloadB), + createMockResponse(), + ); + + // Not flushed yet; still within the debounce window. + expect(mockDispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled(); + + // After the debounce window, the combined message should be processed exactly once. + await vi.advanceTimersByTimeAsync(600); + + expect(mockDispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1); + const callArgs = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[0][0]; + expect(callArgs.ctx.MediaPaths).toEqual(["/tmp/test-media.jpg"]); + expect(callArgs.ctx.Body).toContain("hello"); + } finally { + vi.useRealTimers(); + } + }); + }); + describe("reply metadata", () => { it("surfaces reply fields in ctx when provided", async () => { const account = createMockAccount({ dmPolicy: "open" }); diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index cd5ff998b9..67a5947ab9 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -264,7 +264,7 @@ type BlueBubblesDebounceEntry = { * This helps combine URL text + link preview balloon messages that BlueBubbles * sends as separate webhook events when no explicit inbound debounce config exists. */ -const DEFAULT_INBOUND_DEBOUNCE_MS = 350; +const DEFAULT_INBOUND_DEBOUNCE_MS = 500; /** * Combines multiple debounced messages into a single message for processing. @@ -363,7 +363,23 @@ function getOrCreateDebouncer(target: WebhookTarget) { debounceMs: resolveBlueBubblesDebounceMs(config, core), buildKey: (entry) => { const msg = entry.message; - // Build key from account + chat + sender to coalesce messages from same source + // Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the + // same message (e.g., text-only then text+attachment). + // + // For balloons (URL previews, stickers, etc), BlueBubbles often uses a different + // messageId than the originating text. When present, key by associatedMessageGuid + // to keep text + balloon coalescing working. + const balloonBundleId = msg.balloonBundleId?.trim(); + const associatedMessageGuid = msg.associatedMessageGuid?.trim(); + if (balloonBundleId && associatedMessageGuid) { + return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`; + } + + const messageId = msg.messageId?.trim(); + if (messageId) { + return `bluebubbles:${account.accountId}:msg:${messageId}`; + } + const chatKey = msg.chatGuid?.trim() ?? msg.chatIdentifier?.trim() ?? @@ -372,13 +388,12 @@ function getOrCreateDebouncer(target: WebhookTarget) { }, shouldDebounce: (entry) => { const msg = entry.message; - // Skip debouncing for messages with attachments - process immediately - if (msg.attachments && msg.attachments.length > 0) return false; // Skip debouncing for from-me messages (they're just cached, not processed) if (msg.fromMe) return false; // Skip debouncing for control commands - process immediately if (core.channel.text.hasControlCommand(msg.text, config)) return false; - // Debounce normal text messages and URL balloon messages + // Debounce all other messages to coalesce rapid-fire webhook events + // (e.g., text+image arriving as separate webhooks for the same messageId) return true; }, onFlush: async (entries) => {