fix: recover telegram sends from stale thread ids

This commit is contained in:
Ayaan Zaidi
2026-02-09 08:35:53 +05:30
committed by Ayaan Zaidi
parent 5ac1be9cb6
commit d7bd68ff24
5 changed files with 343 additions and 67 deletions

View File

@@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
- Subagents: stabilize announce timing, preserve compaction metrics across retries, clamp overflow-prone long timeouts, and cap impossible context usage token totals. (#11551) Thanks @tyler6204.
- Agents: recover from context overflow caused by oversized tool results (pre-emptive capping + fallback truncation). (#11579) Thanks @tyler6204.
- Telegram: render markdown spoilers with `<tg-spoiler>` HTML tags. (#11543) Thanks @ezhikkk.
- Telegram: recover proactive sends when stale topic thread IDs are used by retrying without `message_thread_id`, and clear explicit no-thread route updates instead of inheriting stale thread state. (#11620)
- Gateway/CLI: when `gateway.bind=lan`, use a LAN IP for probe URLs and Control UI links. (#11448) Thanks @AnonO6.
- Memory: set Voyage embeddings `input_type` for improved retrieval. (#10818) Thanks @mcinteerj.
- Memory/QMD: run boot refresh in background by default, add configurable QMD maintenance timeouts, and retry QMD after fallback failures. (#9690, #9705)

View File

@@ -176,6 +176,51 @@ describe("sessions", () => {
});
});
it("updateLastRoute clears threadId when deliveryContext explicitly omits it", async () => {
const mainSessionKey = "agent:main:main";
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(
storePath,
JSON.stringify(
{
[mainSessionKey]: {
sessionId: "sess-1",
updatedAt: 123,
deliveryContext: {
channel: "telegram",
to: "222",
threadId: "42",
},
lastChannel: "telegram",
lastTo: "222",
lastThreadId: "42",
},
},
null,
2,
),
"utf-8",
);
await updateLastRoute({
storePath,
sessionKey: mainSessionKey,
deliveryContext: {
channel: "telegram",
to: "222",
threadId: undefined,
},
});
const store = loadSessionStore(storePath);
expect(store[mainSessionKey]?.deliveryContext).toEqual({
channel: "telegram",
to: "222",
});
expect(store[mainSessionKey]?.lastThreadId).toBeUndefined();
});
it("updateLastRoute records origin + group metadata when ctx is provided", async () => {
const sessionKey = "agent:main:whatsapp:group:123@g.us";
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-"));

View File

@@ -86,6 +86,15 @@ function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
};
}
function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
if (!context || context.threadId == null) {
return context;
}
const next: DeliveryContext = { ...context };
delete next.threadId;
return next;
}
function normalizeSessionStore(store: Record<string, SessionEntry>): void {
for (const [key, entry] of Object.entries(store)) {
if (!entry) {
@@ -430,7 +439,15 @@ export async function updateLastRoute(params: {
threadId,
});
const mergedInput = mergeDeliveryContext(explicitContext, inlineContext);
const merged = mergeDeliveryContext(mergedInput, deliveryContextFromSession(existing));
const explicitDeliveryContext = params.deliveryContext;
const clearThreadFromFallback =
explicitDeliveryContext != null &&
Object.prototype.hasOwnProperty.call(explicitDeliveryContext, "threadId") &&
explicitDeliveryContext.threadId == null;
const fallbackContext = clearThreadFromFallback
? removeThreadFromDeliveryContext(deliveryContextFromSession(existing))
: deliveryContextFromSession(existing);
const merged = mergeDeliveryContext(mergedInput, fallbackContext);
const normalized = normalizeSessionDeliveryFields({
deliveryContext: {
channel: merged?.channel,

View File

@@ -478,6 +478,36 @@ describe("sendMessageTelegram", () => {
});
});
it("retries without message_thread_id when Telegram reports missing thread", async () => {
const chatId = "123";
const threadErr = new Error("400: Bad Request: message thread not found");
const sendMessage = vi
.fn()
.mockRejectedValueOnce(threadErr)
.mockResolvedValueOnce({
message_id: 58,
chat: { id: chatId },
});
const api = { sendMessage } as unknown as {
sendMessage: typeof sendMessage;
};
const res = await sendMessageTelegram(chatId, "hello forum", {
token: "tok",
api,
messageThreadId: 271,
});
expect(sendMessage).toHaveBeenNthCalledWith(1, chatId, "hello forum", {
parse_mode: "HTML",
message_thread_id: 271,
});
expect(sendMessage).toHaveBeenNthCalledWith(2, chatId, "hello forum", {
parse_mode: "HTML",
});
expect(res.messageId).toBe("58");
});
it("sets disable_notification when silent is true", async () => {
const chatId = "123";
const sendMessage = vi.fn().mockResolvedValue({
@@ -566,6 +596,45 @@ describe("sendMessageTelegram", () => {
reply_to_message_id: 500,
});
});
it("retries media sends without message_thread_id when thread is missing", async () => {
const chatId = "123";
const threadErr = new Error("400: Bad Request: message thread not found");
const sendPhoto = vi
.fn()
.mockRejectedValueOnce(threadErr)
.mockResolvedValueOnce({
message_id: 59,
chat: { id: chatId },
});
const api = { sendPhoto } as unknown as {
sendPhoto: typeof sendPhoto;
};
loadWebMedia.mockResolvedValueOnce({
buffer: Buffer.from("fake-image"),
contentType: "image/jpeg",
fileName: "photo.jpg",
});
const res = await sendMessageTelegram(chatId, "photo", {
token: "tok",
api,
mediaUrl: "https://example.com/photo.jpg",
messageThreadId: 271,
});
expect(sendPhoto).toHaveBeenNthCalledWith(1, chatId, expect.anything(), {
caption: "photo",
parse_mode: "HTML",
message_thread_id: 271,
});
expect(sendPhoto).toHaveBeenNthCalledWith(2, chatId, expect.anything(), {
caption: "photo",
parse_mode: "HTML",
});
expect(res.messageId).toBe("59");
});
});
describe("sendStickerTelegram", () => {
@@ -626,6 +695,33 @@ describe("sendStickerTelegram", () => {
});
});
it("retries sticker sends without message_thread_id when thread is missing", async () => {
const chatId = "123";
const threadErr = new Error("400: Bad Request: message thread not found");
const sendSticker = vi
.fn()
.mockRejectedValueOnce(threadErr)
.mockResolvedValueOnce({
message_id: 109,
chat: { id: chatId },
});
const api = { sendSticker } as unknown as {
sendSticker: typeof sendSticker;
};
const res = await sendStickerTelegram(chatId, "fileId123", {
token: "tok",
api,
messageThreadId: 271,
});
expect(sendSticker).toHaveBeenNthCalledWith(1, chatId, "fileId123", {
message_thread_id: 271,
});
expect(sendSticker).toHaveBeenNthCalledWith(2, chatId, "fileId123", undefined);
expect(res.messageId).toBe("109");
});
it("includes reply_to_message_id for threaded replies", async () => {
const chatId = "123";
const fileId = "CAACAgIAAxkBAAI...sticker_file_id";

View File

@@ -69,6 +69,7 @@ type TelegramReactionOpts = {
};
const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i;
const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i;
const diagLogger = createSubsystemLogger("telegram/diagnostic");
function createTelegramHttpLogger(cfg: ReturnType<typeof loadConfig>) {
@@ -173,6 +174,25 @@ function normalizeMessageId(raw: string | number): number {
throw new Error("Message id is required for Telegram actions");
}
function isTelegramThreadNotFoundError(err: unknown): boolean {
return THREAD_NOT_FOUND_RE.test(formatErrorMessage(err));
}
function hasMessageThreadIdParam(params?: Record<string, unknown>): boolean {
return Boolean(params && Object.hasOwn(params, "message_thread_id"));
}
function removeMessageThreadIdParam(
params?: Record<string, unknown>,
): Record<string, unknown> | undefined {
if (!params || !hasMessageThreadIdParam(params)) {
return params;
}
const next = { ...params };
delete next.message_thread_id;
return Object.keys(next).length > 0 ? next : undefined;
}
export function buildInlineKeyboard(
buttons?: TelegramSendOpts["buttons"],
): InlineKeyboardMarkup | undefined {
@@ -265,6 +285,30 @@ export async function sendMessageTelegram(
);
};
const sendWithThreadFallback = async <T>(
params: Record<string, unknown> | undefined,
label: string,
attempt: (
effectiveParams: Record<string, unknown> | undefined,
effectiveLabel: string,
) => Promise<T>,
): Promise<T> => {
try {
return await attempt(params, label);
} catch (err) {
if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) {
throw err;
}
if (opts.verbose) {
console.warn(
`telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`,
);
}
const retriedParams = removeMessageThreadIdParam(params);
return await attempt(retriedParams, `${label}-threadless`);
}
};
const textMode = opts.textMode ?? "markdown";
const tableMode = resolveMarkdownTableMode({
cfg,
@@ -282,43 +326,48 @@ export async function sendMessageTelegram(
params?: Record<string, unknown>,
fallbackText?: string,
) => {
const htmlText = renderHtmlText(rawText);
const baseParams = params ? { ...params } : {};
if (linkPreviewOptions) {
baseParams.link_preview_options = linkPreviewOptions;
}
const hasBaseParams = Object.keys(baseParams).length > 0;
const sendParams = {
parse_mode: "HTML" as const,
...baseParams,
...(opts.silent === true ? { disable_notification: true } : {}),
};
const res = await requestWithDiag(
() => api.sendMessage(chatId, htmlText, sendParams),
"message",
).catch(async (err) => {
// Telegram rejects malformed HTML (e.g., unsupported tags or entities).
// When that happens, fall back to plain text so the message still delivers.
const errText = formatErrorMessage(err);
if (PARSE_ERR_RE.test(errText)) {
if (opts.verbose) {
console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`);
}
const fallback = fallbackText ?? rawText;
const plainParams = hasBaseParams ? baseParams : undefined;
return await requestWithDiag(
() =>
plainParams
? api.sendMessage(chatId, fallback, plainParams)
: api.sendMessage(chatId, fallback),
"message-plain",
).catch((err2) => {
throw wrapChatNotFound(err2);
});
return await sendWithThreadFallback(params, "message", async (effectiveParams, label) => {
const htmlText = renderHtmlText(rawText);
const baseParams = effectiveParams ? { ...effectiveParams } : {};
if (linkPreviewOptions) {
baseParams.link_preview_options = linkPreviewOptions;
}
throw wrapChatNotFound(err);
const hasBaseParams = Object.keys(baseParams).length > 0;
const sendParams = {
parse_mode: "HTML" as const,
...baseParams,
...(opts.silent === true ? { disable_notification: true } : {}),
};
const res = await requestWithDiag(
() =>
api.sendMessage(chatId, htmlText, sendParams as Parameters<typeof api.sendMessage>[2]),
label,
).catch(async (err) => {
// Telegram rejects malformed HTML (e.g., unsupported tags or entities).
// When that happens, fall back to plain text so the message still delivers.
const errText = formatErrorMessage(err);
if (PARSE_ERR_RE.test(errText)) {
if (opts.verbose) {
console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`);
}
const fallback = fallbackText ?? rawText;
const plainParams = hasBaseParams
? (baseParams as Parameters<typeof api.sendMessage>[2])
: undefined;
return await requestWithDiag(
() =>
plainParams
? api.sendMessage(chatId, fallback, plainParams)
: api.sendMessage(chatId, fallback),
`${label}-plain`,
).catch((err2) => {
throw wrapChatNotFound(err2);
});
}
throw wrapChatNotFound(err);
});
return res;
});
return res;
};
if (mediaUrl) {
@@ -355,23 +404,39 @@ export async function sendMessageTelegram(
| Awaited<ReturnType<typeof api.sendAnimation>>
| Awaited<ReturnType<typeof api.sendDocument>>;
if (isGif) {
result = await requestWithDiag(
() => api.sendAnimation(chatId, file, mediaParams),
result = await sendWithThreadFallback(
mediaParams,
"animation",
).catch((err) => {
throw wrapChatNotFound(err);
});
async (effectiveParams, label) =>
requestWithDiag(
() =>
api.sendAnimation(
chatId,
file,
effectiveParams as Parameters<typeof api.sendAnimation>[2],
),
label,
).catch((err) => {
throw wrapChatNotFound(err);
}),
);
} else if (kind === "image") {
result = await requestWithDiag(() => api.sendPhoto(chatId, file, mediaParams), "photo").catch(
(err) => {
result = await sendWithThreadFallback(mediaParams, "photo", async (effectiveParams, label) =>
requestWithDiag(
() => api.sendPhoto(chatId, file, effectiveParams as Parameters<typeof api.sendPhoto>[2]),
label,
).catch((err) => {
throw wrapChatNotFound(err);
},
}),
);
} else if (kind === "video") {
result = await requestWithDiag(() => api.sendVideo(chatId, file, mediaParams), "video").catch(
(err) => {
result = await sendWithThreadFallback(mediaParams, "video", async (effectiveParams, label) =>
requestWithDiag(
() => api.sendVideo(chatId, file, effectiveParams as Parameters<typeof api.sendVideo>[2]),
label,
).catch((err) => {
throw wrapChatNotFound(err);
},
}),
);
} else if (kind === "audio") {
const { useVoice } = resolveTelegramVoiceSend({
@@ -381,27 +446,49 @@ export async function sendMessageTelegram(
logFallback: logVerbose,
});
if (useVoice) {
result = await requestWithDiag(
() => api.sendVoice(chatId, file, mediaParams),
result = await sendWithThreadFallback(
mediaParams,
"voice",
).catch((err) => {
throw wrapChatNotFound(err);
});
async (effectiveParams, label) =>
requestWithDiag(
() =>
api.sendVoice(chatId, file, effectiveParams as Parameters<typeof api.sendVoice>[2]),
label,
).catch((err) => {
throw wrapChatNotFound(err);
}),
);
} else {
result = await requestWithDiag(
() => api.sendAudio(chatId, file, mediaParams),
result = await sendWithThreadFallback(
mediaParams,
"audio",
).catch((err) => {
throw wrapChatNotFound(err);
});
async (effectiveParams, label) =>
requestWithDiag(
() =>
api.sendAudio(chatId, file, effectiveParams as Parameters<typeof api.sendAudio>[2]),
label,
).catch((err) => {
throw wrapChatNotFound(err);
}),
);
}
} else {
result = await requestWithDiag(
() => api.sendDocument(chatId, file, mediaParams),
result = await sendWithThreadFallback(
mediaParams,
"document",
).catch((err) => {
throw wrapChatNotFound(err);
});
async (effectiveParams, label) =>
requestWithDiag(
() =>
api.sendDocument(
chatId,
file,
effectiveParams as Parameters<typeof api.sendDocument>[2],
),
label,
).catch((err) => {
throw wrapChatNotFound(err);
}),
);
}
const mediaMessageId = String(result?.message_id ?? "unknown");
const resolvedChatId = String(result?.chat?.id ?? chatId);
@@ -730,14 +817,44 @@ export async function sendStickerTelegram(
);
};
const sendWithThreadFallback = async <T>(
params: Record<string, number> | undefined,
label: string,
attempt: (
effectiveParams: Record<string, number> | undefined,
effectiveLabel: string,
) => Promise<T>,
): Promise<T> => {
try {
return await attempt(params, label);
} catch (err) {
if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) {
throw err;
}
if (opts.verbose) {
console.warn(
`telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`,
);
}
const retriedParams = removeMessageThreadIdParam(params) as
| Record<string, number>
| undefined;
return await attempt(retriedParams, `${label}-threadless`);
}
};
const stickerParams = hasThreadParams ? threadParams : undefined;
const result = await requestWithDiag(
() => api.sendSticker(chatId, fileId.trim(), stickerParams),
const result = await sendWithThreadFallback(
stickerParams,
"sticker",
).catch((err) => {
throw wrapChatNotFound(err);
});
async (effectiveParams, label) =>
requestWithDiag(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label).catch(
(err) => {
throw wrapChatNotFound(err);
},
),
);
const messageId = String(result?.message_id ?? "unknown");
const resolvedChatId = String(result?.chat?.id ?? chatId);