diff --git a/CHANGELOG.md b/CHANGELOG.md index 61cfa81c3f..ef328c5d0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Telegram/Cron/Heartbeat: honor explicit Telegram topic targets in cron and heartbeat delivery (`:topic:`) so scheduled sends land in the configured topic instead of the last active thread. (#19367) Thanks @Lukavyi. - Commands/Doctor: avoid rewriting invalid configs with new `gateway.auth.token` defaults during repair and only write when real config changes are detected, preventing accidental token duplication and backup churn. - Sandbox/Registry: serialize container and browser registry writes with shared file locks and atomic replacement to prevent lost updates and delete rollback races from desyncing `sandbox list`, `prune`, and `recreate --all`. Thanks @kexinoh. - Security/Exec: require `tools.exec.safeBins` binaries to resolve from trusted bin directories (system defaults plus gateway startup `PATH`) so PATH-hijacked trojan binaries cannot bypass allowlist checks. Thanks @jackhax for reporting. diff --git a/docs/gateway/heartbeat.md b/docs/gateway/heartbeat.md index a450218f2c..36550e35ae 100644 --- a/docs/gateway/heartbeat.md +++ b/docs/gateway/heartbeat.md @@ -176,7 +176,7 @@ Use `accountId` to target a specific account on multi-account channels like Tele heartbeat: { every: "1h", target: "telegram", - to: "12345678", + to: "12345678:topic:42", // optional: route to a specific topic/thread accountId: "ops-bot", }, }, @@ -205,7 +205,7 @@ Use `accountId` to target a specific account on multi-account channels like Tele - `last` (default): deliver to the last used external channel. - explicit channel: `whatsapp` / `telegram` / `discord` / `googlechat` / `slack` / `msteams` / `signal` / `imessage`. - `none`: run the heartbeat but **do not deliver** externally. -- `to`: optional recipient override (channel-specific id, e.g. E.164 for WhatsApp or a Telegram chat id). +- `to`: optional recipient override (channel-specific id, e.g. E.164 for WhatsApp or a Telegram chat id). For Telegram topics/threads, use `:topic:`. - `accountId`: optional account id for multi-account channels. When `target: "last"`, the account id applies to the resolved last channel if it supports accounts; otherwise it is ignored. If the account id does not match a configured account for the resolved channel, delivery is skipped. - `prompt`: overrides the default prompt body (not merged). - `ackMaxChars`: max chars allowed after `HEARTBEAT_OK` before delivery. diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index 1829c71839..c89d44261c 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -217,7 +217,7 @@ export type AgentDefaultsConfig = { session?: string; /** Delivery target ("last", "none", or a channel id). */ target?: "last" | "none" | ChannelId; - /** Optional delivery override (E.164 for WhatsApp, chat id for Telegram). */ + /** Optional delivery override (E.164 for WhatsApp, chat id for Telegram). Supports :topic:NNN suffix for Telegram topics. */ to?: string; /** Optional account id for multi-account channels. */ accountId?: string; diff --git a/src/cron/isolated-agent.delivery-target-thread-session.test.ts b/src/cron/isolated-agent.delivery-target-thread-session.test.ts new file mode 100644 index 0000000000..088609bcdb --- /dev/null +++ b/src/cron/isolated-agent.delivery-target-thread-session.test.ts @@ -0,0 +1,131 @@ +import { describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; + +// Mock session store so we can control what entries exist. +const mockStore: Record> = {}; +vi.mock("../config/sessions.js", () => ({ + loadSessionStore: vi.fn((storePath: string) => mockStore[storePath] ?? {}), + resolveAgentMainSessionKey: vi.fn(({ agentId }: { agentId: string }) => `agent:${agentId}:main`), + resolveStorePath: vi.fn((_store: unknown, _opts: unknown) => "/mock/store.json"), +})); + +// Mock channel-selection to avoid real config resolution. +vi.mock("../infra/outbound/channel-selection.js", () => ({ + resolveMessageChannelSelection: vi.fn(async () => ({ channel: "telegram" })), +})); + +// Minimal mock for channel plugins (Telegram resolveTarget is an identity). +vi.mock("../channels/plugins/index.js", () => ({ + getChannelPlugin: vi.fn(() => ({ + meta: { label: "Telegram" }, + config: {}, + outbound: { + resolveTarget: ({ to }: { to?: string }) => + to ? { ok: true, to } : { ok: false, error: new Error("missing") }, + }, + })), + normalizeChannelId: vi.fn((id: string) => id), +})); + +const { resolveDeliveryTarget } = await import("./isolated-agent/delivery-target.js"); + +describe("resolveDeliveryTarget thread session lookup", () => { + const cfg: OpenClawConfig = {}; + + it("uses thread session entry when sessionKey is provided and entry exists", async () => { + mockStore["/mock/store.json"] = { + "agent:main:main": { + sessionId: "s1", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100111", + }, + "agent:main:main:thread:9999": { + sessionId: "s2", + updatedAt: 2, + lastChannel: "telegram", + lastTo: "-100111", + lastThreadId: 9999, + }, + }; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "last", + sessionKey: "agent:main:main:thread:9999", + }); + + expect(result.to).toBe("-100111"); + expect(result.threadId).toBe(9999); + expect(result.channel).toBe("telegram"); + }); + + it("falls back to main session when sessionKey entry does not exist", async () => { + mockStore["/mock/store.json"] = { + "agent:main:main": { + sessionId: "s1", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100222", + }, + }; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "last", + sessionKey: "agent:main:main:thread:nonexistent", + }); + + expect(result.to).toBe("-100222"); + expect(result.threadId).toBeUndefined(); + expect(result.channel).toBe("telegram"); + }); + + it("falls back to main session when no sessionKey is provided", async () => { + mockStore["/mock/store.json"] = { + "agent:main:main": { + sessionId: "s1", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100333", + }, + }; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "last", + }); + + expect(result.to).toBe("-100333"); + expect(result.threadId).toBeUndefined(); + }); + + it("preserves threadId from :topic: in delivery.to on first run (no session history)", async () => { + mockStore["/mock/store.json"] = {}; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "telegram", + to: "63448508:topic:1008013", + }); + + expect(result.to).toBe("63448508"); + expect(result.threadId).toBe(1008013); + expect(result.channel).toBe("telegram"); + }); + + it("preserves threadId from :topic: when lastTo differs", async () => { + mockStore["/mock/store.json"] = { + "agent:main:main": { + sessionId: "s1", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100999", + }, + }; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "telegram", + to: "63448508:topic:1008013", + }); + + expect(result.to).toBe("63448508"); + expect(result.threadId).toBe(1008013); + }); +}); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index c0a59827bb..b7516778e6 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -21,6 +21,7 @@ export async function resolveDeliveryTarget( jobPayload: { channel?: "last" | ChannelId; to?: string; + sessionKey?: string; }, ): Promise<{ channel: Exclude; @@ -38,7 +39,12 @@ export async function resolveDeliveryTarget( const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); const storePath = resolveStorePath(sessionCfg?.store, { agentId }); const store = loadSessionStore(storePath); - const main = store[mainSessionKey]; + + // Look up thread-specific session first (e.g. agent:main:main:thread:1234), + // then fall back to the main session entry. + const threadSessionKey = jobPayload.sessionKey?.trim(); + const threadEntry = threadSessionKey ? store[threadSessionKey] : undefined; + const main = threadEntry ?? store[mainSessionKey]; const preliminary = resolveSessionDeliveryTarget({ entry: main, @@ -86,12 +92,13 @@ export async function resolveDeliveryTarget( } } - // Only carry threadId when delivering to the same recipient as the session's - // last conversation. This prevents stale thread IDs (e.g. from a Telegram - // supergroup topic) from being sent to a different target (e.g. a private - // chat) where they would cause API errors. + // Carry threadId when it was explicitly set (from :topic: parsing or config) + // or when delivering to the same recipient as the session's last conversation. + // Session-derived threadIds are dropped when the target differs to prevent + // stale thread IDs from leaking to a different chat. const threadId = - resolved.threadId && resolved.to && resolved.to === resolved.lastTo + resolved.threadId && + (resolved.threadIdExplicit || (resolved.to && resolved.to === resolved.lastTo)) ? resolved.threadId : undefined; diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index fdcdc43564..b59ba33d88 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -363,6 +363,7 @@ export async function runCronIsolatedAgentTurn(params: { const resolvedDelivery = await resolveDeliveryTarget(cfgWithAgentDefaults, agentId, { channel: deliveryPlan.channel ?? "last", to: deliveryPlan.to, + sessionKey: params.job.sessionKey, }); const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now); diff --git a/src/infra/heartbeat-runner.returns-default-unset.test.ts b/src/infra/heartbeat-runner.returns-default-unset.test.ts index f866af4470..299a738219 100644 --- a/src/infra/heartbeat-runner.returns-default-unset.test.ts +++ b/src/infra/heartbeat-runner.returns-default-unset.test.ts @@ -305,6 +305,46 @@ describe("resolveHeartbeatDeliveryTarget", () => { }); }); + it("keeps explicit telegram targets", () => { + const cfg: OpenClawConfig = { + agents: { defaults: { heartbeat: { target: "telegram", to: "123" } } }, + }; + expect(resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry })).toEqual({ + channel: "telegram", + to: "123", + accountId: undefined, + lastChannel: undefined, + lastAccountId: undefined, + }); + }); + + it("parses threadId from :topic: suffix in heartbeat to", () => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + heartbeat: { target: "telegram", to: "-100111:topic:42" }, + }, + }, + }; + const result = resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry }); + expect(result.channel).toBe("telegram"); + expect(result.to).toBe("-100111"); + expect(result.threadId).toBe(42); + }); + + it("heartbeat to without :topic: has no threadId", () => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + heartbeat: { target: "telegram", to: "-100111" }, + }, + }, + }; + const result = resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry }); + expect(result.to).toBe("-100111"); + expect(result.threadId).toBeUndefined(); + }); + it("uses explicit heartbeat accountId when provided", () => { const cfg: OpenClawConfig = { agents: { diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index fef8972bcc..d8b0f5db92 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -637,6 +637,7 @@ export async function runHeartbeatOnce(opts: { channel: delivery.channel, to: delivery.to, accountId: delivery.accountId, + threadId: delivery.threadId, payloads: [{ text: heartbeatOkText }], agentId, deps: opts.deps, @@ -832,6 +833,7 @@ export async function runHeartbeatOnce(opts: { to: delivery.to, accountId: deliveryAccountId, agentId, + threadId: delivery.threadId, payloads: [ ...reasoningPayloads, ...(shouldSkipMain diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index d113f7530c..8095da3c6b 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -1000,6 +1000,7 @@ describe("resolveSessionDeliveryTarget", () => { to: "+1555", accountId: "acct-1", threadId: undefined, + threadIdExplicit: false, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", @@ -1024,6 +1025,7 @@ describe("resolveSessionDeliveryTarget", () => { to: undefined, accountId: undefined, threadId: undefined, + threadIdExplicit: false, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", @@ -1049,6 +1051,7 @@ describe("resolveSessionDeliveryTarget", () => { to: "+1555", accountId: undefined, threadId: undefined, + threadIdExplicit: false, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", @@ -1074,6 +1077,7 @@ describe("resolveSessionDeliveryTarget", () => { to: undefined, accountId: undefined, threadId: undefined, + threadIdExplicit: false, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", diff --git a/src/infra/outbound/targets.test.ts b/src/infra/outbound/targets.test.ts new file mode 100644 index 0000000000..68a242ad1a --- /dev/null +++ b/src/infra/outbound/targets.test.ts @@ -0,0 +1,220 @@ +import { describe, expect, it } from "vitest"; +import { resolveSessionDeliveryTarget } from "./targets.js"; + +describe("resolveSessionDeliveryTarget", () => { + it("derives implicit delivery from the last route", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-1", + updatedAt: 1, + lastChannel: " whatsapp ", + lastTo: " +1555 ", + lastAccountId: " acct-1 ", + }, + requestedChannel: "last", + }); + + expect(resolved).toEqual({ + channel: "whatsapp", + to: "+1555", + accountId: "acct-1", + threadId: undefined, + threadIdExplicit: false, + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: "acct-1", + lastThreadId: undefined, + }); + }); + + it("prefers explicit targets without reusing lastTo", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-2", + updatedAt: 1, + lastChannel: "whatsapp", + lastTo: "+1555", + }, + requestedChannel: "telegram", + }); + + expect(resolved).toEqual({ + channel: "telegram", + to: undefined, + accountId: undefined, + threadId: undefined, + threadIdExplicit: false, + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: undefined, + lastThreadId: undefined, + }); + }); + + it("allows mismatched lastTo when configured", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-3", + updatedAt: 1, + lastChannel: "whatsapp", + lastTo: "+1555", + }, + requestedChannel: "telegram", + allowMismatchedLastTo: true, + }); + + expect(resolved).toEqual({ + channel: "telegram", + to: "+1555", + accountId: undefined, + threadId: undefined, + threadIdExplicit: false, + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: undefined, + lastThreadId: undefined, + }); + }); + + it("passes through explicitThreadId when provided", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-thread", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100123", + lastThreadId: 999, + }, + requestedChannel: "last", + explicitThreadId: 42, + }); + + expect(resolved.threadId).toBe(42); + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-100123"); + }); + + it("uses session lastThreadId when no explicitThreadId", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-thread-2", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100123", + lastThreadId: 999, + }, + requestedChannel: "last", + }); + + expect(resolved.threadId).toBe(999); + }); + + it("falls back to a provided channel when requested is unsupported", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-4", + updatedAt: 1, + lastChannel: "whatsapp", + lastTo: "+1555", + }, + requestedChannel: "webchat", + fallbackChannel: "slack", + }); + + expect(resolved).toEqual({ + channel: "slack", + to: undefined, + accountId: undefined, + threadId: undefined, + threadIdExplicit: false, + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: undefined, + lastThreadId: undefined, + }); + }); + + it("parses :topic:NNN from explicitTo into threadId", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-topic", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "63448508", + }, + requestedChannel: "last", + explicitTo: "63448508:topic:1008013", + }); + + expect(resolved.to).toBe("63448508"); + expect(resolved.threadId).toBe(1008013); + }); + + it("parses :topic:NNN even when lastTo is absent", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-no-last", + updatedAt: 1, + lastChannel: "telegram", + }, + requestedChannel: "last", + explicitTo: "63448508:topic:1008013", + }); + + expect(resolved.to).toBe("63448508"); + expect(resolved.threadId).toBe(1008013); + }); + + it("skips :topic: parsing for non-telegram channels", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-slack", + updatedAt: 1, + lastChannel: "slack", + lastTo: "C12345", + }, + requestedChannel: "last", + explicitTo: "C12345:topic:999", + }); + + expect(resolved.to).toBe("C12345:topic:999"); + expect(resolved.threadId).toBeUndefined(); + }); + + it("skips :topic: parsing when channel is explicitly non-telegram even if lastChannel was telegram", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-cross", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "63448508", + }, + requestedChannel: "slack", + explicitTo: "C12345:topic:999", + }); + + expect(resolved.to).toBe("C12345:topic:999"); + expect(resolved.threadId).toBeUndefined(); + }); + + it("explicitThreadId takes priority over :topic: parsed value", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-priority", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "63448508", + }, + requestedChannel: "last", + explicitTo: "63448508:topic:1008013", + explicitThreadId: 42, + }); + + expect(resolved.threadId).toBe(42); + expect(resolved.to).toBe("63448508"); + }); +}); diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index 4776f5110c..d6b756fc9b 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -5,6 +5,7 @@ import type { OpenClawConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; import { normalizeAccountId } from "../../routing/session-key.js"; +import { parseTelegramTarget } from "../../telegram/targets.js"; import { deliveryContextFromSession } from "../../utils/delivery-context.js"; import type { DeliverableMessageChannel, @@ -26,6 +27,7 @@ export type OutboundTarget = { to?: string; reason?: string; accountId?: string; + threadId?: string | number; lastChannel?: DeliverableMessageChannel; lastAccountId?: string; }; @@ -43,6 +45,8 @@ export type SessionDeliveryTarget = { to?: string; accountId?: string; threadId?: string | number; + /** Whether threadId came from an explicit source (config/param/:topic: parsing) vs session history. */ + threadIdExplicit?: boolean; mode: ChannelOutboundTargetMode; lastChannel?: DeliverableMessageChannel; lastTo?: string; @@ -75,20 +79,33 @@ export function resolveSessionDeliveryTarget(params: { ? requested : undefined; - const explicitTo = + const rawExplicitTo = typeof params.explicitTo === "string" && params.explicitTo.trim() ? params.explicitTo.trim() : undefined; - const explicitThreadId = - params.explicitThreadId != null && params.explicitThreadId !== "" - ? params.explicitThreadId - : undefined; let channel = requestedChannel === "last" ? lastChannel : requestedChannel; if (!channel && params.fallbackChannel && isDeliverableMessageChannel(params.fallbackChannel)) { channel = params.fallbackChannel; } + // Parse :topic:NNN from explicitTo (Telegram topic syntax). + // Only applies when we positively know the channel is Telegram. + // When channel is unknown, the downstream send path (resolveTelegramSession) + // handles :topic: parsing independently. + const isTelegramContext = channel === "telegram" || (!channel && lastChannel === "telegram"); + let explicitTo = rawExplicitTo; + let parsedThreadId: number | undefined; + if (isTelegramContext && rawExplicitTo && rawExplicitTo.includes(":topic:")) { + const parsed = parseTelegramTarget(rawExplicitTo); + explicitTo = parsed.chatId; + parsedThreadId = parsed.messageThreadId; + } + const explicitThreadId = + params.explicitThreadId != null && params.explicitThreadId !== "" + ? params.explicitThreadId + : parsedThreadId; + let to = explicitTo; if (!to && lastTo) { if (channel && channel === lastChannel) { @@ -102,11 +119,13 @@ export function resolveSessionDeliveryTarget(params: { const threadId = channel && channel === lastChannel ? lastThreadId : undefined; const mode = params.mode ?? (explicitTo ? "explicit" : "implicit"); + const resolvedThreadId = explicitThreadId ?? threadId; return { channel, to, accountId, - threadId: explicitThreadId ?? threadId, + threadId: resolvedThreadId, + threadIdExplicit: resolvedThreadId != null && explicitThreadId != null, mode, lastChannel, lastTo, @@ -281,6 +300,7 @@ export function resolveHeartbeatDeliveryTarget(params: { to: resolved.to, reason, accountId: effectiveAccountId, + threadId: resolvedTarget.threadId, lastChannel: resolvedTarget.lastChannel, lastAccountId: resolvedTarget.lastAccountId, };