From d7bd68ff24143217fcbcb138866e47e31aa0c6a7 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Mon, 9 Feb 2026 08:35:53 +0530 Subject: [PATCH] fix: recover telegram sends from stale thread ids --- CHANGELOG.md | 1 + src/config/sessions.test.ts | 45 ++++ src/config/sessions/store.ts | 19 +- ...send.returns-undefined-empty-input.test.ts | 96 +++++++ src/telegram/send.ts | 249 +++++++++++++----- 5 files changed, 343 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4a2d57c85..b68d8cfe7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai - Subagents: stabilize announce timing, preserve compaction metrics across retries, clamp overflow-prone long timeouts, and cap impossible context usage token totals. (#11551) Thanks @tyler6204. - Agents: recover from context overflow caused by oversized tool results (pre-emptive capping + fallback truncation). (#11579) Thanks @tyler6204. - Telegram: render markdown spoilers with `` HTML tags. (#11543) Thanks @ezhikkk. +- Telegram: recover proactive sends when stale topic thread IDs are used by retrying without `message_thread_id`, and clear explicit no-thread route updates instead of inheriting stale thread state. (#11620) - Gateway/CLI: when `gateway.bind=lan`, use a LAN IP for probe URLs and Control UI links. (#11448) Thanks @AnonO6. - Memory: set Voyage embeddings `input_type` for improved retrieval. (#10818) Thanks @mcinteerj. - Memory/QMD: run boot refresh in background by default, add configurable QMD maintenance timeouts, and retry QMD after fallback failures. (#9690, #9705) diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index 345a306442..0f867e9307 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -176,6 +176,51 @@ describe("sessions", () => { }); }); + it("updateLastRoute clears threadId when deliveryContext explicitly omits it", async () => { + const mainSessionKey = "agent:main:main"; + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-")); + const storePath = path.join(dir, "sessions.json"); + await fs.writeFile( + storePath, + JSON.stringify( + { + [mainSessionKey]: { + sessionId: "sess-1", + updatedAt: 123, + deliveryContext: { + channel: "telegram", + to: "222", + threadId: "42", + }, + lastChannel: "telegram", + lastTo: "222", + lastThreadId: "42", + }, + }, + null, + 2, + ), + "utf-8", + ); + + await updateLastRoute({ + storePath, + sessionKey: mainSessionKey, + deliveryContext: { + channel: "telegram", + to: "222", + threadId: undefined, + }, + }); + + const store = loadSessionStore(storePath); + expect(store[mainSessionKey]?.deliveryContext).toEqual({ + channel: "telegram", + to: "222", + }); + expect(store[mainSessionKey]?.lastThreadId).toBeUndefined(); + }); + it("updateLastRoute records origin + group metadata when ctx is provided", async () => { const sessionKey = "agent:main:whatsapp:group:123@g.us"; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-")); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index ae239ae880..8f9ebf1d03 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -86,6 +86,15 @@ function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { }; } +function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { + if (!context || context.threadId == null) { + return context; + } + const next: DeliveryContext = { ...context }; + delete next.threadId; + return next; +} + function normalizeSessionStore(store: Record): void { for (const [key, entry] of Object.entries(store)) { if (!entry) { @@ -430,7 +439,15 @@ export async function updateLastRoute(params: { threadId, }); const mergedInput = mergeDeliveryContext(explicitContext, inlineContext); - const merged = mergeDeliveryContext(mergedInput, deliveryContextFromSession(existing)); + const explicitDeliveryContext = params.deliveryContext; + const clearThreadFromFallback = + explicitDeliveryContext != null && + Object.prototype.hasOwnProperty.call(explicitDeliveryContext, "threadId") && + explicitDeliveryContext.threadId == null; + const fallbackContext = clearThreadFromFallback + ? removeThreadFromDeliveryContext(deliveryContextFromSession(existing)) + : deliveryContextFromSession(existing); + const merged = mergeDeliveryContext(mergedInput, fallbackContext); const normalized = normalizeSessionDeliveryFields({ deliveryContext: { channel: merged?.channel, diff --git a/src/telegram/send.returns-undefined-empty-input.test.ts b/src/telegram/send.returns-undefined-empty-input.test.ts index 000708dcaf..d6c71c44d5 100644 --- a/src/telegram/send.returns-undefined-empty-input.test.ts +++ b/src/telegram/send.returns-undefined-empty-input.test.ts @@ -478,6 +478,36 @@ describe("sendMessageTelegram", () => { }); }); + it("retries without message_thread_id when Telegram reports missing thread", async () => { + const chatId = "123"; + const threadErr = new Error("400: Bad Request: message thread not found"); + const sendMessage = vi + .fn() + .mockRejectedValueOnce(threadErr) + .mockResolvedValueOnce({ + message_id: 58, + chat: { id: chatId }, + }); + const api = { sendMessage } as unknown as { + sendMessage: typeof sendMessage; + }; + + const res = await sendMessageTelegram(chatId, "hello forum", { + token: "tok", + api, + messageThreadId: 271, + }); + + expect(sendMessage).toHaveBeenNthCalledWith(1, chatId, "hello forum", { + parse_mode: "HTML", + message_thread_id: 271, + }); + expect(sendMessage).toHaveBeenNthCalledWith(2, chatId, "hello forum", { + parse_mode: "HTML", + }); + expect(res.messageId).toBe("58"); + }); + it("sets disable_notification when silent is true", async () => { const chatId = "123"; const sendMessage = vi.fn().mockResolvedValue({ @@ -566,6 +596,45 @@ describe("sendMessageTelegram", () => { reply_to_message_id: 500, }); }); + + it("retries media sends without message_thread_id when thread is missing", async () => { + const chatId = "123"; + const threadErr = new Error("400: Bad Request: message thread not found"); + const sendPhoto = vi + .fn() + .mockRejectedValueOnce(threadErr) + .mockResolvedValueOnce({ + message_id: 59, + chat: { id: chatId }, + }); + const api = { sendPhoto } as unknown as { + sendPhoto: typeof sendPhoto; + }; + + loadWebMedia.mockResolvedValueOnce({ + buffer: Buffer.from("fake-image"), + contentType: "image/jpeg", + fileName: "photo.jpg", + }); + + const res = await sendMessageTelegram(chatId, "photo", { + token: "tok", + api, + mediaUrl: "https://example.com/photo.jpg", + messageThreadId: 271, + }); + + expect(sendPhoto).toHaveBeenNthCalledWith(1, chatId, expect.anything(), { + caption: "photo", + parse_mode: "HTML", + message_thread_id: 271, + }); + expect(sendPhoto).toHaveBeenNthCalledWith(2, chatId, expect.anything(), { + caption: "photo", + parse_mode: "HTML", + }); + expect(res.messageId).toBe("59"); + }); }); describe("sendStickerTelegram", () => { @@ -626,6 +695,33 @@ describe("sendStickerTelegram", () => { }); }); + it("retries sticker sends without message_thread_id when thread is missing", async () => { + const chatId = "123"; + const threadErr = new Error("400: Bad Request: message thread not found"); + const sendSticker = vi + .fn() + .mockRejectedValueOnce(threadErr) + .mockResolvedValueOnce({ + message_id: 109, + chat: { id: chatId }, + }); + const api = { sendSticker } as unknown as { + sendSticker: typeof sendSticker; + }; + + const res = await sendStickerTelegram(chatId, "fileId123", { + token: "tok", + api, + messageThreadId: 271, + }); + + expect(sendSticker).toHaveBeenNthCalledWith(1, chatId, "fileId123", { + message_thread_id: 271, + }); + expect(sendSticker).toHaveBeenNthCalledWith(2, chatId, "fileId123", undefined); + expect(res.messageId).toBe("109"); + }); + it("includes reply_to_message_id for threaded replies", async () => { const chatId = "123"; const fileId = "CAACAgIAAxkBAAI...sticker_file_id"; diff --git a/src/telegram/send.ts b/src/telegram/send.ts index 29cbede999..570f68aa73 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -69,6 +69,7 @@ type TelegramReactionOpts = { }; const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i; +const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i; const diagLogger = createSubsystemLogger("telegram/diagnostic"); function createTelegramHttpLogger(cfg: ReturnType) { @@ -173,6 +174,25 @@ function normalizeMessageId(raw: string | number): number { throw new Error("Message id is required for Telegram actions"); } +function isTelegramThreadNotFoundError(err: unknown): boolean { + return THREAD_NOT_FOUND_RE.test(formatErrorMessage(err)); +} + +function hasMessageThreadIdParam(params?: Record): boolean { + return Boolean(params && Object.hasOwn(params, "message_thread_id")); +} + +function removeMessageThreadIdParam( + params?: Record, +): Record | undefined { + if (!params || !hasMessageThreadIdParam(params)) { + return params; + } + const next = { ...params }; + delete next.message_thread_id; + return Object.keys(next).length > 0 ? next : undefined; +} + export function buildInlineKeyboard( buttons?: TelegramSendOpts["buttons"], ): InlineKeyboardMarkup | undefined { @@ -265,6 +285,30 @@ export async function sendMessageTelegram( ); }; + const sendWithThreadFallback = async ( + params: Record | undefined, + label: string, + attempt: ( + effectiveParams: Record | undefined, + effectiveLabel: string, + ) => Promise, + ): Promise => { + try { + return await attempt(params, label); + } catch (err) { + if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { + throw err; + } + if (opts.verbose) { + console.warn( + `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, + ); + } + const retriedParams = removeMessageThreadIdParam(params); + return await attempt(retriedParams, `${label}-threadless`); + } + }; + const textMode = opts.textMode ?? "markdown"; const tableMode = resolveMarkdownTableMode({ cfg, @@ -282,43 +326,48 @@ export async function sendMessageTelegram( params?: Record, fallbackText?: string, ) => { - const htmlText = renderHtmlText(rawText); - const baseParams = params ? { ...params } : {}; - if (linkPreviewOptions) { - baseParams.link_preview_options = linkPreviewOptions; - } - const hasBaseParams = Object.keys(baseParams).length > 0; - const sendParams = { - parse_mode: "HTML" as const, - ...baseParams, - ...(opts.silent === true ? { disable_notification: true } : {}), - }; - const res = await requestWithDiag( - () => api.sendMessage(chatId, htmlText, sendParams), - "message", - ).catch(async (err) => { - // Telegram rejects malformed HTML (e.g., unsupported tags or entities). - // When that happens, fall back to plain text so the message still delivers. - const errText = formatErrorMessage(err); - if (PARSE_ERR_RE.test(errText)) { - if (opts.verbose) { - console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`); - } - const fallback = fallbackText ?? rawText; - const plainParams = hasBaseParams ? baseParams : undefined; - return await requestWithDiag( - () => - plainParams - ? api.sendMessage(chatId, fallback, plainParams) - : api.sendMessage(chatId, fallback), - "message-plain", - ).catch((err2) => { - throw wrapChatNotFound(err2); - }); + return await sendWithThreadFallback(params, "message", async (effectiveParams, label) => { + const htmlText = renderHtmlText(rawText); + const baseParams = effectiveParams ? { ...effectiveParams } : {}; + if (linkPreviewOptions) { + baseParams.link_preview_options = linkPreviewOptions; } - throw wrapChatNotFound(err); + const hasBaseParams = Object.keys(baseParams).length > 0; + const sendParams = { + parse_mode: "HTML" as const, + ...baseParams, + ...(opts.silent === true ? { disable_notification: true } : {}), + }; + const res = await requestWithDiag( + () => + api.sendMessage(chatId, htmlText, sendParams as Parameters[2]), + label, + ).catch(async (err) => { + // Telegram rejects malformed HTML (e.g., unsupported tags or entities). + // When that happens, fall back to plain text so the message still delivers. + const errText = formatErrorMessage(err); + if (PARSE_ERR_RE.test(errText)) { + if (opts.verbose) { + console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`); + } + const fallback = fallbackText ?? rawText; + const plainParams = hasBaseParams + ? (baseParams as Parameters[2]) + : undefined; + return await requestWithDiag( + () => + plainParams + ? api.sendMessage(chatId, fallback, plainParams) + : api.sendMessage(chatId, fallback), + `${label}-plain`, + ).catch((err2) => { + throw wrapChatNotFound(err2); + }); + } + throw wrapChatNotFound(err); + }); + return res; }); - return res; }; if (mediaUrl) { @@ -355,23 +404,39 @@ export async function sendMessageTelegram( | Awaited> | Awaited>; if (isGif) { - result = await requestWithDiag( - () => api.sendAnimation(chatId, file, mediaParams), + result = await sendWithThreadFallback( + mediaParams, "animation", - ).catch((err) => { - throw wrapChatNotFound(err); - }); + async (effectiveParams, label) => + requestWithDiag( + () => + api.sendAnimation( + chatId, + file, + effectiveParams as Parameters[2], + ), + label, + ).catch((err) => { + throw wrapChatNotFound(err); + }), + ); } else if (kind === "image") { - result = await requestWithDiag(() => api.sendPhoto(chatId, file, mediaParams), "photo").catch( - (err) => { + result = await sendWithThreadFallback(mediaParams, "photo", async (effectiveParams, label) => + requestWithDiag( + () => api.sendPhoto(chatId, file, effectiveParams as Parameters[2]), + label, + ).catch((err) => { throw wrapChatNotFound(err); - }, + }), ); } else if (kind === "video") { - result = await requestWithDiag(() => api.sendVideo(chatId, file, mediaParams), "video").catch( - (err) => { + result = await sendWithThreadFallback(mediaParams, "video", async (effectiveParams, label) => + requestWithDiag( + () => api.sendVideo(chatId, file, effectiveParams as Parameters[2]), + label, + ).catch((err) => { throw wrapChatNotFound(err); - }, + }), ); } else if (kind === "audio") { const { useVoice } = resolveTelegramVoiceSend({ @@ -381,27 +446,49 @@ export async function sendMessageTelegram( logFallback: logVerbose, }); if (useVoice) { - result = await requestWithDiag( - () => api.sendVoice(chatId, file, mediaParams), + result = await sendWithThreadFallback( + mediaParams, "voice", - ).catch((err) => { - throw wrapChatNotFound(err); - }); + async (effectiveParams, label) => + requestWithDiag( + () => + api.sendVoice(chatId, file, effectiveParams as Parameters[2]), + label, + ).catch((err) => { + throw wrapChatNotFound(err); + }), + ); } else { - result = await requestWithDiag( - () => api.sendAudio(chatId, file, mediaParams), + result = await sendWithThreadFallback( + mediaParams, "audio", - ).catch((err) => { - throw wrapChatNotFound(err); - }); + async (effectiveParams, label) => + requestWithDiag( + () => + api.sendAudio(chatId, file, effectiveParams as Parameters[2]), + label, + ).catch((err) => { + throw wrapChatNotFound(err); + }), + ); } } else { - result = await requestWithDiag( - () => api.sendDocument(chatId, file, mediaParams), + result = await sendWithThreadFallback( + mediaParams, "document", - ).catch((err) => { - throw wrapChatNotFound(err); - }); + async (effectiveParams, label) => + requestWithDiag( + () => + api.sendDocument( + chatId, + file, + effectiveParams as Parameters[2], + ), + label, + ).catch((err) => { + throw wrapChatNotFound(err); + }), + ); } const mediaMessageId = String(result?.message_id ?? "unknown"); const resolvedChatId = String(result?.chat?.id ?? chatId); @@ -730,14 +817,44 @@ export async function sendStickerTelegram( ); }; + const sendWithThreadFallback = async ( + params: Record | undefined, + label: string, + attempt: ( + effectiveParams: Record | undefined, + effectiveLabel: string, + ) => Promise, + ): Promise => { + try { + return await attempt(params, label); + } catch (err) { + if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { + throw err; + } + if (opts.verbose) { + console.warn( + `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, + ); + } + const retriedParams = removeMessageThreadIdParam(params) as + | Record + | undefined; + return await attempt(retriedParams, `${label}-threadless`); + } + }; + const stickerParams = hasThreadParams ? threadParams : undefined; - const result = await requestWithDiag( - () => api.sendSticker(chatId, fileId.trim(), stickerParams), + const result = await sendWithThreadFallback( + stickerParams, "sticker", - ).catch((err) => { - throw wrapChatNotFound(err); - }); + async (effectiveParams, label) => + requestWithDiag(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label).catch( + (err) => { + throw wrapChatNotFound(err); + }, + ), + ); const messageId = String(result?.message_id ?? "unknown"); const resolvedChatId = String(result?.chat?.id ?? chatId);