fix: unify telegram thread handling

This commit is contained in:
Ayaan Zaidi
2026-02-02 08:53:42 +05:30
committed by Ayaan Zaidi
parent 5020bfa2a9
commit 19b8416a81
10 changed files with 151 additions and 46 deletions

View File

@@ -51,7 +51,7 @@ import {
describeReplyTarget,
extractTelegramLocation,
hasBotMention,
resolveTelegramForumThreadId,
resolveTelegramThreadSpec,
} from "./bot/helpers.js";
type TelegramMediaRef = {
@@ -158,11 +158,13 @@ export const buildTelegramMessageContext = async ({
const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup";
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
const isForum = (msg.chat as { is_forum?: boolean }).is_forum === true;
const resolvedThreadId = resolveTelegramForumThreadId({
const threadSpec = resolveTelegramThreadSpec({
isGroup,
isForum,
messageThreadId,
});
const replyThreadId = isGroup ? resolvedThreadId : messageThreadId;
const resolvedThreadId = threadSpec.scope === "forum" ? threadSpec.id : undefined;
const replyThreadId = threadSpec.id;
const { groupConfig, topicConfig } = resolveTelegramGroupConfig(chatId, resolvedThreadId);
const peerId = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId);
const route = resolveAgentRoute({
@@ -175,8 +177,8 @@ export const buildTelegramMessageContext = async ({
},
});
const baseSessionKey = route.sessionKey;
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
const dmThreadId = !isGroup ? messageThreadId : undefined;
// DMs: use raw messageThreadId for thread sessions (not forum topic ids)
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) })
@@ -621,8 +623,8 @@ export const buildTelegramMessageContext = async ({
Sticker: allMedia[0]?.stickerMetadata,
...(locationData ? toLocationContext(locationData) : undefined),
CommandAuthorized: commandAuthorized,
// For groups: use resolvedThreadId (forum topics only); for DMs: use raw messageThreadId
MessageThreadId: isGroup ? resolvedThreadId : messageThreadId,
// For groups: use resolved forum topic id; for DMs: use raw messageThreadId
MessageThreadId: threadSpec.id,
IsForum: isForum,
// Originating channel for reply routing.
OriginatingChannel: "telegram" as const,
@@ -675,6 +677,7 @@ export const buildTelegramMessageContext = async ({
chatId,
isGroup,
resolvedThreadId,
threadSpec,
replyThreadId,
isForum,
historyKey,

View File

@@ -59,6 +59,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
isGroup: false,
resolvedThreadId: undefined,
replyThreadId: 777,
threadSpec: { id: 777, scope: "dm" },
historyKey: undefined,
historyLimit: 0,
groupHistories: new Map(),
@@ -88,13 +89,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
chatId: 123,
messageThreadId: 777,
thread: { id: 777, scope: "dm" },
}),
);
expect(draftStream.update).toHaveBeenCalledWith("Hello");
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
messageThreadId: 777,
thread: { id: 777, scope: "dm" },
}),
);
});

View File

@@ -56,7 +56,7 @@ export const dispatchTelegramMessage = async ({
msg,
chatId,
isGroup,
replyThreadId,
threadSpec,
historyKey,
historyLimit,
groupHistories,
@@ -70,8 +70,7 @@ export const dispatchTelegramMessage = async ({
} = context;
const isPrivateChat = msg.chat.type === "private";
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
const draftThreadId = replyThreadId ?? messageThreadId;
const draftThreadId = threadSpec.id;
const draftMaxChars = Math.min(textLimit, 4096);
const canStreamDraft =
streamMode !== "off" &&
@@ -84,7 +83,7 @@ export const dispatchTelegramMessage = async ({
chatId,
draftId: msg.message_id || Date.now(),
maxChars: draftMaxChars,
messageThreadId: draftThreadId,
thread: threadSpec,
log: logVerbose,
warn: logVerbose,
})
@@ -243,7 +242,7 @@ export const dispatchTelegramMessage = async ({
bot,
replyToMode,
textLimit,
messageThreadId: replyThreadId,
thread: threadSpec,
tableMode,
chunkMode,
onVoiceRecording: sendRecordVoice,
@@ -294,7 +293,7 @@ export const dispatchTelegramMessage = async ({
bot,
replyToMode,
textLimit,
messageThreadId: replyThreadId,
thread: threadSpec,
tableMode,
chunkMode,
linkPreview: telegramCfg.linkPreview,

View File

@@ -45,10 +45,12 @@ import { TelegramUpdateKeyContext } from "./bot-updates.js";
import { TelegramBotOptions } from "./bot.js";
import { deliverReplies } from "./bot/delivery.js";
import {
buildTelegramThreadParams,
buildSenderName,
buildTelegramGroupFrom,
buildTelegramGroupPeerId,
resolveTelegramForumThreadId,
resolveTelegramThreadSpec,
} from "./bot/helpers.js";
import { buildInlineKeyboard } from "./send.js";
@@ -409,7 +411,12 @@ export const registerTelegramNativeCommands = ({
commandAuthorized,
} = auth;
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
const threadIdForSend = isGroup ? resolvedThreadId : messageThreadId;
const threadSpec = resolveTelegramThreadSpec({
isGroup,
isForum,
messageThreadId,
});
const threadParams = buildTelegramThreadParams(threadSpec) ?? {};
const commandDefinition = findCommandByNativeName(command.name, "telegram");
const rawText = ctx.match?.trim() ?? "";
@@ -456,7 +463,7 @@ export const registerTelegramNativeCommands = ({
fn: () =>
bot.api.sendMessage(chatId, title, {
...(replyMarkup ? { reply_markup: replyMarkup } : {}),
...(threadIdForSend != null ? { message_thread_id: threadIdForSend } : {}),
...threadParams,
}),
});
return;
@@ -472,7 +479,7 @@ export const registerTelegramNativeCommands = ({
});
const baseSessionKey = route.sessionKey;
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
const dmThreadId = !isGroup ? messageThreadId : undefined;
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({
@@ -521,7 +528,7 @@ export const registerTelegramNativeCommands = ({
SessionKey: `telegram:slash:${senderId || chatId}`,
AccountId: route.accountId,
CommandTargetSessionKey: sessionKey,
MessageThreadId: threadIdForSend,
MessageThreadId: threadSpec.id,
IsForum: isForum,
// Originating context for sub-agent announce routing
OriginatingChannel: "telegram" as const,
@@ -553,7 +560,7 @@ export const registerTelegramNativeCommands = ({
bot,
replyToMode,
textLimit,
messageThreadId: threadIdForSend,
thread: threadSpec,
tableMode,
chunkMode,
linkPreview: telegramCfg.linkPreview,
@@ -585,7 +592,7 @@ export const registerTelegramNativeCommands = ({
bot,
replyToMode,
textLimit,
messageThreadId: threadIdForSend,
thread: threadSpec,
tableMode,
chunkMode,
linkPreview: telegramCfg.linkPreview,
@@ -630,9 +637,13 @@ export const registerTelegramNativeCommands = ({
if (!auth) {
return;
}
const { resolvedThreadId, senderId, commandAuthorized, isGroup } = auth;
const { senderId, commandAuthorized, isGroup, isForum } = auth;
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
const threadIdForSend = isGroup ? resolvedThreadId : messageThreadId;
const threadSpec = resolveTelegramThreadSpec({
isGroup,
isForum,
messageThreadId,
});
const result = await executePluginCommand({
command: match.command,
@@ -658,7 +669,7 @@ export const registerTelegramNativeCommands = ({
bot,
replyToMode,
textLimit,
messageThreadId: threadIdForSend,
thread: threadSpec,
tableMode,
chunkMode,
linkPreview: telegramCfg.linkPreview,

View File

@@ -138,6 +138,34 @@ describe("deliverReplies", () => {
);
});
it("keeps message_thread_id=1 when allowed", async () => {
const runtime = { error: vi.fn(), log: vi.fn() };
const sendMessage = vi.fn().mockResolvedValue({
message_id: 4,
chat: { id: "123" },
});
const bot = { api: { sendMessage } } as unknown as Bot;
await deliverReplies({
replies: [{ text: "Hello" }],
chatId: "123",
token: "tok",
runtime,
bot,
replyToMode: "off",
textLimit: 4000,
thread: { id: 1, scope: "dm" },
});
expect(sendMessage).toHaveBeenCalledWith(
"123",
expect.any(String),
expect.objectContaining({
message_thread_id: 1,
}),
);
});
it("does not include link_preview_options when linkPreview is true", async () => {
const runtime = { error: vi.fn(), log: vi.fn() };
const sendMessage = vi.fn().mockResolvedValue({

View File

@@ -22,7 +22,11 @@ import {
import { buildInlineKeyboard } from "../send.js";
import { cacheSticker, getCachedSticker } from "../sticker-cache.js";
import { resolveTelegramVoiceSend } from "../voice.js";
import { buildTelegramThreadParams, resolveTelegramReplyId } from "./helpers.js";
import {
buildTelegramThreadParams,
resolveTelegramReplyId,
type TelegramThreadSpec,
} from "./helpers.js";
const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i;
const VOICE_FORBIDDEN_RE = /VOICE_MESSAGES_FORBIDDEN/;
@@ -35,7 +39,7 @@ export async function deliverReplies(params: {
bot: Bot;
replyToMode: ReplyToMode;
textLimit: number;
messageThreadId?: number;
thread?: TelegramThreadSpec | number | null;
tableMode?: MarkdownTableMode;
chunkMode?: ChunkMode;
/** Callback invoked before sending a voice message to switch typing indicator. */
@@ -52,7 +56,7 @@ export async function deliverReplies(params: {
bot,
replyToMode,
textLimit,
messageThreadId,
thread,
linkPreview,
replyQuoteText,
} = params;
@@ -114,7 +118,7 @@ export async function deliverReplies(params: {
replyToMessageId:
replyToId && (replyToMode === "all" || !hasReplied) ? replyToId : undefined,
replyQuoteText,
messageThreadId,
thread,
textMode: "html",
plainText: chunk.text,
linkPreview,
@@ -162,8 +166,8 @@ export async function deliverReplies(params: {
...(shouldAttachButtonsToMedia ? { reply_markup: replyMarkup } : {}),
...buildTelegramSendParams({
replyToMessageId,
messageThreadId,
replyQuoteText,
thread,
}),
};
if (isGif) {
@@ -227,7 +231,7 @@ export async function deliverReplies(params: {
replyToId,
replyToMode,
hasReplied,
messageThreadId,
thread,
linkPreview,
replyMarkup,
replyQuoteText,
@@ -268,7 +272,7 @@ export async function deliverReplies(params: {
replyToId && (replyToMode === "all" || !hasReplied) ? replyToId : undefined;
await sendTelegramText(bot, chatId, chunk.html, runtime, {
replyToMessageId: replyToMessageIdFollowup,
messageThreadId,
thread,
textMode: "html",
plainText: chunk.text,
linkPreview,
@@ -447,7 +451,7 @@ async function sendTelegramVoiceFallbackText(opts: {
replyToId?: number;
replyToMode: ReplyToMode;
hasReplied: boolean;
messageThreadId?: number;
thread?: TelegramThreadSpec | number | null;
linkPreview?: boolean;
replyMarkup?: ReturnType<typeof buildInlineKeyboard>;
replyQuoteText?: string;
@@ -460,7 +464,7 @@ async function sendTelegramVoiceFallbackText(opts: {
replyToMessageId:
opts.replyToId && (opts.replyToMode === "all" || !hasReplied) ? opts.replyToId : undefined,
replyQuoteText: opts.replyQuoteText,
messageThreadId: opts.messageThreadId,
thread: opts.thread,
textMode: "html",
plainText: chunk.text,
linkPreview: opts.linkPreview,
@@ -475,10 +479,10 @@ async function sendTelegramVoiceFallbackText(opts: {
function buildTelegramSendParams(opts?: {
replyToMessageId?: number;
messageThreadId?: number;
thread?: TelegramThreadSpec | number | null;
replyQuoteText?: string;
}): Record<string, unknown> {
const threadParams = buildTelegramThreadParams(opts?.messageThreadId);
const threadParams = buildTelegramThreadParams(opts?.thread);
const params: Record<string, unknown> = {};
const quoteText = opts?.replyQuoteText?.trim();
if (opts?.replyToMessageId) {
@@ -505,7 +509,7 @@ async function sendTelegramText(
opts?: {
replyToMessageId?: number;
replyQuoteText?: string;
messageThreadId?: number;
thread?: TelegramThreadSpec | number | null;
textMode?: "markdown" | "html";
plainText?: string;
linkPreview?: boolean;
@@ -515,7 +519,7 @@ async function sendTelegramText(
const baseParams = buildTelegramSendParams({
replyToMessageId: opts?.replyToMessageId,
replyQuoteText: opts?.replyQuoteText,
messageThreadId: opts?.messageThreadId,
thread: opts?.thread,
});
// Add link_preview_options when link preview is disabled.
const linkPreviewEnabled = opts?.linkPreview ?? true;

View File

@@ -41,6 +41,12 @@ describe("buildTelegramThreadParams", () => {
expect(buildTelegramThreadParams(99)).toEqual({ message_thread_id: 99 });
});
it("keeps thread id=1 for dm threads", () => {
expect(buildTelegramThreadParams({ id: 1, scope: "dm" })).toEqual({
message_thread_id: 1,
});
});
it("normalizes thread ids to integers", () => {
expect(buildTelegramThreadParams(42.9)).toEqual({ message_thread_id: 42 });
});

View File

@@ -12,6 +12,13 @@ import { formatLocationText, type NormalizedLocation } from "../../channels/loca
const TELEGRAM_GENERAL_TOPIC_ID = 1;
export type TelegramThreadScope = "dm" | "forum" | "none";
export type TelegramThreadSpec = {
id?: number;
scope: TelegramThreadScope;
};
/**
* Resolve the thread ID for Telegram forum topics.
* For non-forum groups, returns undefined even if messageThreadId is present
@@ -33,17 +40,47 @@ export function resolveTelegramForumThreadId(params: {
return params.messageThreadId;
}
export function resolveTelegramThreadSpec(params: {
isGroup: boolean;
isForum?: boolean;
messageThreadId?: number | null;
}): TelegramThreadSpec {
if (params.isGroup) {
const id = resolveTelegramForumThreadId({
isForum: params.isForum,
messageThreadId: params.messageThreadId,
});
return {
id,
scope: params.isForum ? "forum" : "none",
};
}
if (params.messageThreadId == null) {
return { scope: "dm" };
}
return {
id: params.messageThreadId,
scope: "dm",
};
}
/**
* Build thread params for Telegram API calls (messages, media).
* General forum topic (id=1) must be treated like a regular supergroup send:
* Telegram rejects sendMessage/sendMedia with message_thread_id=1 ("thread not found").
*/
export function buildTelegramThreadParams(messageThreadId?: number) {
if (messageThreadId == null) {
export function buildTelegramThreadParams(thread?: TelegramThreadSpec | number | null) {
let spec: TelegramThreadSpec | undefined;
if (typeof thread === "number") {
spec = { id: thread, scope: "forum" };
} else if (thread && typeof thread === "object") {
spec = thread;
}
if (!spec?.id) {
return undefined;
}
const normalized = Math.trunc(messageThreadId);
if (normalized === TELEGRAM_GENERAL_TOPIC_ID) {
const normalized = Math.trunc(spec.id);
if (normalized === TELEGRAM_GENERAL_TOPIC_ID && spec.scope === "forum") {
return undefined;
}
return { message_thread_id: normalized };

View File

@@ -8,7 +8,7 @@ describe("createTelegramDraftStream", () => {
api: api as any,
chatId: 123,
draftId: 42,
messageThreadId: 99,
thread: { id: 99, scope: "forum" },
});
stream.update("Hello");
@@ -24,11 +24,27 @@ describe("createTelegramDraftStream", () => {
api: api as any,
chatId: 123,
draftId: 42,
messageThreadId: 1,
thread: { id: 1, scope: "forum" },
});
stream.update("Hello");
expect(api.sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", undefined);
});
it("keeps message_thread_id for dm threads", () => {
const api = { sendMessageDraft: vi.fn().mockResolvedValue(true) };
const stream = createTelegramDraftStream({
api: api as any,
chatId: 123,
draftId: 42,
thread: { id: 1, scope: "dm" },
});
stream.update("Hello");
expect(api.sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", {
message_thread_id: 1,
});
});
});

View File

@@ -1,5 +1,5 @@
import type { Bot } from "grammy";
import { buildTelegramThreadParams } from "./bot/helpers.js";
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
const TELEGRAM_DRAFT_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 300;
@@ -15,7 +15,7 @@ export function createTelegramDraftStream(params: {
chatId: number;
draftId: number;
maxChars?: number;
messageThreadId?: number;
thread?: TelegramThreadSpec | number | null;
throttleMs?: number;
log?: (message: string) => void;
warn?: (message: string) => void;
@@ -25,7 +25,7 @@ export function createTelegramDraftStream(params: {
const rawDraftId = Number.isFinite(params.draftId) ? Math.trunc(params.draftId) : 1;
const draftId = rawDraftId === 0 ? 1 : Math.abs(rawDraftId);
const chatId = params.chatId;
const threadParams = buildTelegramThreadParams(params.messageThreadId);
const threadParams = buildTelegramThreadParams(params.thread);
let lastSentText = "";
let lastSentAt = 0;