From 32d12fcae9e1a4d4165d217037dfcbece6e2953d Mon Sep 17 00:00:00 2001 From: Sam Padilla <35386211+theSamPadilla@users.noreply.github.com> Date: Tue, 17 Feb 2026 03:14:18 -0600 Subject: [PATCH] feat(telegram): add channel_post support for bot-to-bot communication (#17857) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 27a343cd4d9c778a6017ff666d8285ae60256bf4 Co-authored-by: theSamPadilla <35386211+theSamPadilla@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus --- CHANGELOG.md | 1 + src/telegram/allowed-updates.ts | 3 + src/telegram/bot-handlers.ts | 456 +++++++++++++------ src/telegram/bot.create-telegram-bot.test.ts | 170 +++++++ 4 files changed, 488 insertions(+), 142 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f077ca498..95d6041400 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,7 @@ Docs: https://docs.openclaw.ai - Plugins: expose `llm_input` and `llm_output` hook payloads so extensions can observe prompt/input context and model output usage details. (#16724) Thanks @SecondThread. - Subagents: nested sub-agents (sub-sub-agents) with configurable depth. Set `agents.defaults.subagents.maxSpawnDepth: 2` to allow sub-agents to spawn their own children. Includes `maxChildrenPerAgent` limit (default 5), depth-aware tool policy, and proper announce chain routing. (#14447) Thanks @tyler6204. - Slack/Discord/Telegram: add per-channel ack reaction overrides (account/channel-level) to support platform-specific emoji formats. (#17092) Thanks @zerone0x. +- Telegram: add `channel_post` inbound support for channel-based bot-to-bot wake/trigger flows, with channel allowlist gating and message/media batching parity. - Cron/Gateway: add finished-run webhook delivery toggle (`notify`) and dedicated webhook auth token support (`cron.webhookToken`) for outbound cron webhook posts. (#14535) Thanks @advaitpaliwal. - Channels: deduplicate probe/token resolution base types across core + extensions while preserving per-channel error typing. (#16986) Thanks @iyoda and @thewilloftheshadow. - Memory: add MMR (Maximal Marginal Relevance) re-ranking for hybrid search diversity. Configurable via `memorySearch.query.hybrid.mmr`. Thanks @rodrigouroz. diff --git a/src/telegram/allowed-updates.ts b/src/telegram/allowed-updates.ts index e32fefd096..a081373e81 100644 --- a/src/telegram/allowed-updates.ts +++ b/src/telegram/allowed-updates.ts @@ -7,5 +7,8 @@ export function resolveTelegramAllowedUpdates(): ReadonlyArray { - mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(entry); - }) - .catch(() => undefined); - await mediaGroupProcessing; - }; - - const scheduleMediaGroupFlush = (mediaGroupId: string, entry: MediaGroupEntry) => { - clearTimeout(entry.timer); - entry.timer = setTimeout(async () => { - await enqueueMediaGroupFlush(mediaGroupId, entry); - }, mediaGroupTimeoutMs); - }; - - const getOrCreateMediaGroupEntry = (mediaGroupId: string) => { - const existing = mediaGroupBuffer.get(mediaGroupId); - if (existing) { - return existing; - } - const entry: MediaGroupEntry = { - messages: [], - timer: setTimeout(() => undefined, mediaGroupTimeoutMs), - }; - mediaGroupBuffer.set(mediaGroupId, entry); - return entry; - }; - const loadStoreAllowFrom = async () => readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); @@ -551,7 +522,176 @@ export const registerTelegramHandlers = ({ runtime.error?.(danger(`telegram reaction handler failed: ${String(err)}`)); } }); + const processInboundMessage = async (params: { + ctx: TelegramContext; + msg: Message; + chatId: number; + resolvedThreadId?: number; + storeAllowFrom: string[]; + sendOversizeWarning: boolean; + oversizeLogMessage: string; + }) => { + const { + ctx, + msg, + chatId, + resolvedThreadId, + storeAllowFrom, + sendOversizeWarning, + oversizeLogMessage, + } = params; + // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). + // We buffer “near-limit” messages and append immediately-following parts. + const text = typeof msg.text === "string" ? msg.text : undefined; + const isCommandLike = (text ?? "").trim().startsWith("/"); + if (text && !isCommandLike) { + const nowMs = Date.now(); + const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown"; + const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`; + const existing = textFragmentBuffer.get(key); + + if (existing) { + const last = existing.messages.at(-1); + const lastMsgId = last?.msg.message_id; + const lastReceivedAtMs = last?.receivedAtMs ?? nowMs; + const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity; + const timeGapMs = nowMs - lastReceivedAtMs; + const canAppend = + idGap > 0 && + idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP && + timeGapMs >= 0 && + timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS; + + if (canAppend) { + const currentTotalChars = existing.messages.reduce( + (sum, m) => sum + (m.msg.text?.length ?? 0), + 0, + ); + const nextTotalChars = currentTotalChars + text.length; + if ( + existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS && + nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS + ) { + existing.messages.push({ msg, ctx, receivedAtMs: nowMs }); + scheduleTextFragmentFlush(existing); + return; + } + } + + // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. + clearTimeout(existing.timer); + textFragmentBuffer.delete(key); + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(existing); + }) + .catch(() => undefined); + await textFragmentProcessing; + } + + const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; + if (shouldStart) { + const entry: TextFragmentEntry = { + key, + messages: [{ msg, ctx, receivedAtMs: nowMs }], + timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS), + }; + textFragmentBuffer.set(key, entry); + scheduleTextFragmentFlush(entry); + return; + } + } + + // Media group handling - buffer multi-image messages + const mediaGroupId = msg.media_group_id; + if (mediaGroupId) { + const existing = mediaGroupBuffer.get(mediaGroupId); + if (existing) { + clearTimeout(existing.timer); + existing.messages.push({ msg, ctx }); + existing.timer = setTimeout(async () => { + mediaGroupBuffer.delete(mediaGroupId); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(existing); + }) + .catch(() => undefined); + await mediaGroupProcessing; + }, mediaGroupTimeoutMs); + } else { + const entry: MediaGroupEntry = { + messages: [{ msg, ctx }], + timer: setTimeout(async () => { + mediaGroupBuffer.delete(mediaGroupId); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(entry); + }) + .catch(() => undefined); + await mediaGroupProcessing; + }, mediaGroupTimeoutMs), + }; + mediaGroupBuffer.set(mediaGroupId, entry); + } + return; + } + + let media: Awaited> = null; + try { + media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch); + } catch (mediaErr) { + const errMsg = String(mediaErr); + if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) { + if (sendOversizeWarning) { + const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); + await withTelegramApiErrorLogging({ + operation: "sendMessage", + runtime, + fn: () => + bot.api.sendMessage(chatId, `⚠️ File too large. Maximum size is ${limitMb}MB.`, { + reply_to_message_id: msg.message_id, + }), + }).catch(() => {}); + } + logger.warn({ chatId, error: errMsg }, oversizeLogMessage); + return; + } + throw mediaErr; + } + + // Skip sticker-only messages where the sticker was skipped (animated/video) + // These have no media and no text content to process. + const hasText = Boolean((msg.text ?? msg.caption ?? "").trim()); + if (msg.sticker && !media && !hasText) { + logVerbose("telegram: skipping sticker-only message (unsupported sticker type)"); + return; + } + + const allMedia = media + ? [ + { + path: media.path, + contentType: media.contentType, + stickerMetadata: media.stickerMetadata, + }, + ] + : []; + const senderId = msg.from?.id ? String(msg.from.id) : ""; + const conversationKey = + resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); + const debounceKey = senderId + ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}` + : null; + await inboundDebouncer.enqueue({ + ctx, + msg, + allMedia, + storeAllowFrom, + debounceKey, + botUsername: ctx.me?.username, + }); + }; bot.on("callback_query", async (ctx) => { const callback = ctx.callbackQuery; if (!callback) { @@ -945,124 +1085,156 @@ export const registerTelegramHandlers = ({ return; } - // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). - // We buffer “near-limit” messages and append immediately-following parts. - const text = typeof msg.text === "string" ? msg.text : undefined; - const isCommandLike = (text ?? "").trim().startsWith("/"); - if (text && !isCommandLike) { - const nowMs = Date.now(); - const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown"; - const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`; - const existing = textFragmentBuffer.get(key); - - if (existing) { - const last = existing.messages.at(-1); - const lastMsgId = last?.msg.message_id; - const lastReceivedAtMs = last?.receivedAtMs ?? nowMs; - const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity; - const timeGapMs = nowMs - lastReceivedAtMs; - const canAppend = - idGap > 0 && - idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP && - timeGapMs >= 0 && - timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS; - - if (canAppend) { - const currentTotalChars = existing.messages.reduce( - (sum, m) => sum + (m.msg.text?.length ?? 0), - 0, - ); - const nextTotalChars = currentTotalChars + text.length; - if ( - existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS && - nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS - ) { - existing.messages.push({ msg, ctx, receivedAtMs: nowMs }); - scheduleTextFragmentFlush(existing); - return; - } - } - - // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. - clearTimeout(existing.timer); - await runTextFragmentFlush(existing); - } - - const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; - if (shouldStart) { - const entry: TextFragmentEntry = { - key, - messages: [{ msg, ctx, receivedAtMs: nowMs }], - timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS), - }; - textFragmentBuffer.set(key, entry); - scheduleTextFragmentFlush(entry); - return; - } - } - - // Media group handling - buffer multi-image messages - const mediaGroupId = msg.media_group_id; - if (mediaGroupId) { - const entry = getOrCreateMediaGroupEntry(mediaGroupId); - entry.messages.push({ msg, ctx }); - scheduleMediaGroupFlush(mediaGroupId, entry); - return; - } - - let media: Awaited> = null; - try { - media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch); - } catch (mediaErr) { - const errMsg = String(mediaErr); - if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) { - const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); - await withTelegramApiErrorLogging({ - operation: "sendMessage", - runtime, - fn: () => - bot.api.sendMessage(chatId, `⚠️ File too large. Maximum size is ${limitMb}MB.`, { - reply_to_message_id: msg.message_id, - }), - }).catch(() => {}); - logger.warn({ chatId, error: errMsg }, "media exceeds size limit"); - return; - } - throw mediaErr; - } - - // Skip sticker-only messages where the sticker was skipped (animated/video) - // These have no media and no text content to process. - const hasText = Boolean((msg.text ?? msg.caption ?? "").trim()); - if (msg.sticker && !media && !hasText) { - logVerbose("telegram: skipping sticker-only message (unsupported sticker type)"); - return; - } - - const allMedia = media - ? [ - { - path: media.path, - contentType: media.contentType, - stickerMetadata: media.stickerMetadata, - }, - ] - : []; - const conversationKey = - resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); - const debounceKey = senderId - ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}` - : null; - await inboundDebouncer.enqueue({ + await processInboundMessage({ ctx, msg, - allMedia, + chatId, + resolvedThreadId, storeAllowFrom, - debounceKey, - botUsername: ctx.me?.username, + sendOversizeWarning: true, + oversizeLogMessage: "media exceeds size limit", }); } catch (err) { runtime.error?.(danger(`handler failed: ${String(err)}`)); } }); + + // Handle channel posts — enables bot-to-bot communication via Telegram channels. + // Telegram bots cannot see other bot messages in groups, but CAN in channels. + // This handler normalizes channel_post updates into the standard message pipeline. + bot.on("channel_post", async (ctx) => { + try { + const post = ctx.channelPost; + if (!post) { + return; + } + + // Deduplication check — same as the regular message handler + if (shouldSkipUpdate(ctx)) { + return; + } + + const chatId = post.chat.id; + + // Use the full group allow-from context for access control (same as message handler) + const groupAllowContext = await resolveTelegramGroupAllowFromContext({ + chatId, + accountId, + isForum: false, + messageThreadId: undefined, + groupAllowFrom, + resolveTelegramGroupConfig, + }); + const { storeAllowFrom, groupConfig, effectiveGroupAllow, hasGroupAllowOverride } = + groupAllowContext; + + // Check group allowlist (channels use the same groups config) + const groupAllowlist = resolveGroupPolicy(chatId); + if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { + return; + } + + if (!groupConfig || groupConfig.enabled === false) { + logVerbose(`Blocked telegram channel ${chatId} (channel disabled)`); + return; + } + + // Group policy filtering (same as message handler) + const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; + const groupPolicy = firstDefined( + groupConfig?.groupPolicy, + telegramCfg.groupPolicy, + defaultGroupPolicy, + "open", + ); + if (groupPolicy === "disabled") { + logVerbose(`Blocked telegram channel message (groupPolicy: disabled)`); + return; + } + + if (hasGroupAllowOverride) { + const senderId = post.sender_chat?.id ?? post.from?.id; + const senderUsername = post.sender_chat?.username ?? post.from?.username ?? ""; + const allowed = + senderId != null && + isSenderAllowed({ + allow: effectiveGroupAllow, + senderId: String(senderId), + senderUsername, + }); + if (!allowed) { + logVerbose( + `Blocked telegram channel sender ${senderId ?? "unknown"} (group allowFrom override)`, + ); + return; + } + } + + if (groupPolicy === "allowlist") { + const senderId = post.sender_chat?.id ?? post.from?.id; + if (senderId == null) { + logVerbose(`Blocked telegram channel message (no sender ID, groupPolicy: allowlist)`); + return; + } + if (!effectiveGroupAllow.hasEntries) { + logVerbose( + "Blocked telegram channel message (groupPolicy: allowlist, no allowlist entries)", + ); + return; + } + const senderUsername = post.sender_chat?.username ?? post.from?.username ?? ""; + if ( + !isSenderAllowed({ + allow: effectiveGroupAllow, + senderId: String(senderId), + senderUsername, + }) + ) { + logVerbose(`Blocked telegram channel message from ${senderId} (groupPolicy: allowlist)`); + return; + } + } + + // Build a synthetic `from` field since channel posts may not have one. + // Use sender_chat (the bot/user that posted) if available. + const syntheticFrom = post.sender_chat + ? { + id: post.sender_chat.id, + is_bot: true as const, + first_name: post.sender_chat.title || "Channel", + username: post.sender_chat.username, + } + : { + id: chatId, + is_bot: true as const, + first_name: post.chat.title || "Channel", + username: post.chat.username, + }; + + const syntheticMsg: Message = { + ...post, + from: post.from ?? syntheticFrom, + chat: { + ...post.chat, + type: "supergroup" as const, + }, + } as Message; + + const syntheticCtx = Object.create(ctx, { + message: { value: syntheticMsg, writable: true, enumerable: true }, + }); + + await processInboundMessage({ + ctx: syntheticCtx as TelegramContext, + msg: syntheticMsg, + chatId, + resolvedThreadId: undefined, + storeAllowFrom, + sendOversizeWarning: false, + oversizeLogMessage: "channel post media exceeds size limit", + }); + } catch (err) { + runtime.error?.(danger(`channel_post handler failed: ${String(err)}`)); + } + }); }; diff --git a/src/telegram/bot.create-telegram-bot.test.ts b/src/telegram/bot.create-telegram-bot.test.ts index 54dd77a591..172b98fcff 100644 --- a/src/telegram/bot.create-telegram-bot.test.ts +++ b/src/telegram/bot.create-telegram-bot.test.ts @@ -45,6 +45,14 @@ const mockMessage = (message: Pick & Partial): Message date: 0, ...message, }) as Message; +const TELEGRAM_TEST_TIMINGS = { + mediaGroupFlushMs: 20, + textFragmentGapMs: 30, +} as const; + +const sleep = async (ms: number) => { + await new Promise((resolve) => setTimeout(resolve, ms)); +}; describe("createTelegramBot", () => { beforeEach(() => { @@ -1864,6 +1872,168 @@ describe("createTelegramBot", () => { expect(sendMessageSpy).toHaveBeenCalledTimes(1); expect(sendMessageSpy.mock.calls[0]?.[1]).toContain("final reply"); }); + it("buffers channel_post media groups and processes them together", async () => { + onSpy.mockReset(); + replySpy.mockReset(); + + loadConfig.mockReturnValue({ + channels: { + telegram: { + groupPolicy: "open", + groups: { + "-100777111222": { + enabled: true, + requireMention: false, + }, + }, + }, + }, + }); + + const fetchSpy = vi.spyOn(globalThis, "fetch").mockImplementation( + async () => + new Response(new Uint8Array([0x89, 0x50, 0x4e, 0x47]), { + status: 200, + headers: { "content-type": "image/png" }, + }), + ); + + createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS }); + const handler = getOnHandler("channel_post") as (ctx: Record) => Promise; + + const first = handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 201, + caption: "album caption", + date: 1736380800, + media_group_id: "channel-album-1", + photo: [{ file_id: "p1" }], + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ file_path: "photos/p1.jpg" }), + }); + + const second = handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 202, + date: 1736380801, + media_group_id: "channel-album-1", + photo: [{ file_id: "p2" }], + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ file_path: "photos/p2.jpg" }), + }); + + await Promise.all([first, second]); + expect(replySpy).not.toHaveBeenCalled(); + await sleep(TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs + 80); + + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0]?.[0] as { Body?: string; MediaPaths?: string[] }; + expect(payload.Body).toContain("album caption"); + expect(payload.MediaPaths).toHaveLength(2); + + fetchSpy.mockRestore(); + }); + it("coalesces channel_post near-limit text fragments into one message", async () => { + onSpy.mockReset(); + replySpy.mockReset(); + + loadConfig.mockReturnValue({ + channels: { + telegram: { + groupPolicy: "open", + groups: { + "-100777111222": { + enabled: true, + requireMention: false, + }, + }, + }, + }, + }); + + createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS }); + const handler = getOnHandler("channel_post") as (ctx: Record) => Promise; + + const part1 = "A".repeat(4050); + const part2 = "B".repeat(50); + + await handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 301, + date: 1736380800, + text: part1, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }); + + await handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 302, + date: 1736380801, + text: part2, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }); + + expect(replySpy).not.toHaveBeenCalled(); + await sleep(TELEGRAM_TEST_TIMINGS.textFragmentGapMs + 100); + + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0]?.[0] as { RawBody?: string }; + expect(payload.RawBody).toContain(part1.slice(0, 32)); + expect(payload.RawBody).toContain(part2.slice(0, 32)); + }); + it("drops oversized channel_post media instead of dispatching a placeholder message", async () => { + onSpy.mockReset(); + replySpy.mockReset(); + + loadConfig.mockReturnValue({ + channels: { + telegram: { + groupPolicy: "open", + groups: { + "-100777111222": { + enabled: true, + requireMention: false, + }, + }, + }, + }, + }); + + const fetchSpy = vi.spyOn(globalThis, "fetch").mockImplementation( + async () => + new Response(new Uint8Array([0xff, 0xd8, 0xff, 0x00]), { + status: 200, + headers: { "content-type": "image/jpeg" }, + }), + ); + + createTelegramBot({ token: "tok", mediaMaxMb: 0 }); + const handler = getOnHandler("channel_post") as (ctx: Record) => Promise; + + await handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 401, + date: 1736380800, + photo: [{ file_id: "oversized" }], + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ file_path: "photos/oversized.jpg" }), + }); + + expect(replySpy).not.toHaveBeenCalled(); + fetchSpy.mockRestore(); + }); it("dedupes duplicate message updates by update_id", async () => { onSpy.mockReset(); replySpy.mockReset();