From a9dc2547dbb755102b5ea0697cbf64575fd5e4a6 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Mon, 16 Feb 2026 14:29:21 -0800 Subject: [PATCH 1/2] Cron: route reminders by session namespace --- src/agents/tools/cron-tool.e2e.test.ts | 40 +++++++++ src/agents/tools/cron-tool.ts | 19 +++-- src/cron/normalize.test.ts | 36 +++++++++ src/cron/normalize.ts | 14 ++++ ...runs-one-shot-main-job-disables-it.test.ts | 16 +++- src/cron/service.store.migration.test.ts | 2 + src/cron/service/jobs.ts | 5 ++ src/cron/service/normalize.ts | 8 ++ src/cron/service/state.ts | 13 ++- src/cron/service/store.ts | 9 +++ src/cron/service/timer.ts | 26 +++++- src/cron/types.ts | 2 + src/gateway/protocol/schema/cron.ts | 3 + src/gateway/server-cron.ts | 81 +++++++++++++++++-- ...tbeat-runner.returns-default-unset.test.ts | 75 +++++++++++++++++ src/infra/heartbeat-runner.scheduler.test.ts | 40 +++++++++ src/infra/heartbeat-runner.ts | 75 ++++++++++++++++- src/infra/heartbeat-wake.test.ts | 32 ++++++++ src/infra/heartbeat-wake.ts | 59 ++++++++++++-- 19 files changed, 525 insertions(+), 30 deletions(-) diff --git a/src/agents/tools/cron-tool.e2e.test.ts b/src/agents/tools/cron-tool.e2e.test.ts index 9ff69eec6c..7b6d1310e4 100644 --- a/src/agents/tools/cron-tool.e2e.test.ts +++ b/src/agents/tools/cron-tool.e2e.test.ts @@ -144,6 +144,46 @@ describe("cron tool", () => { expect(call?.params?.agentId).toBeNull(); }); + it("stamps cron.add with caller sessionKey when missing", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const callerSessionKey = "agent:main:discord:channel:ops"; + const tool = createCronTool({ agentSessionKey: callerSessionKey }); + await tool.execute("call-session-key", { + action: "add", + job: { + name: "wake-up", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "systemEvent", text: "hello" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { sessionKey?: string }; + }; + expect(call?.params?.sessionKey).toBe(callerSessionKey); + }); + + it("preserves explicit job.sessionKey on add", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ agentSessionKey: "agent:main:discord:channel:ops" }); + await tool.execute("call-explicit-session-key", { + action: "add", + job: { + name: "wake-up", + schedule: { at: new Date(123).toISOString() }, + sessionKey: "agent:main:telegram:group:-100123:topic:99", + payload: { kind: "systemEvent", text: "hello" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { sessionKey?: string }; + }; + expect(call?.params?.sessionKey).toBe("agent:main:telegram:group:-100123:topic:99"); + }); + it("adds recent context for systemEvent reminders when contextMessages > 0", async () => { callGatewayMock .mockResolvedValueOnce({ diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index be5f1e9b84..662ca377ba 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -332,13 +332,22 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con throw new Error("job required"); } const job = normalizeCronJobCreate(params.job) ?? params.job; - if (job && typeof job === "object" && !("agentId" in job)) { + if (job && typeof job === "object") { const cfg = loadConfig(); - const agentId = opts?.agentSessionKey - ? resolveSessionAgentId({ sessionKey: opts.agentSessionKey, config: cfg }) + const { mainKey, alias } = resolveMainSessionAlias(cfg); + const resolvedSessionKey = opts?.agentSessionKey + ? resolveInternalSessionKey({ key: opts.agentSessionKey, alias, mainKey }) : undefined; - if (agentId) { - (job as { agentId?: string }).agentId = agentId; + if (!("agentId" in job)) { + const agentId = opts?.agentSessionKey + ? resolveSessionAgentId({ sessionKey: opts.agentSessionKey, config: cfg }) + : undefined; + if (agentId) { + (job as { agentId?: string }).agentId = agentId; + } + } + if (!("sessionKey" in job) && resolvedSessionKey) { + (job as { sessionKey?: string }).sessionKey = resolvedSessionKey; } } diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index efbc324138..c07066fd82 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -79,6 +79,30 @@ describe("normalizeCronJobCreate", () => { expect(cleared.agentId).toBeNull(); }); + it("trims sessionKey and drops blanks", () => { + const normalized = normalizeCronJobCreate({ + name: "session-key", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + sessionKey: " agent:main:discord:channel:ops ", + payload: { kind: "systemEvent", text: "hi" }, + }) as unknown as Record; + expect(normalized.sessionKey).toBe("agent:main:discord:channel:ops"); + + const cleared = normalizeCronJobCreate({ + name: "session-key-clear", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + sessionKey: " ", + payload: { kind: "systemEvent", text: "hi" }, + }) as unknown as Record; + expect("sessionKey" in cleared).toBe(false); + }); + it("canonicalizes payload.channel casing", () => { const normalized = normalizeCronJobCreate({ name: "legacy provider", @@ -329,4 +353,16 @@ describe("normalizeCronJobPatch", () => { expect(payload.channel).toBe("telegram"); expect(payload.to).toBe("+15550001111"); }); + + it("preserves null sessionKey patches and trims string values", () => { + const trimmed = normalizeCronJobPatch({ + sessionKey: " agent:main:telegram:group:-100123 ", + }) as unknown as Record; + expect(trimmed.sessionKey).toBe("agent:main:telegram:group:-100123"); + + const cleared = normalizeCronJobPatch({ + sessionKey: null, + }) as unknown as Record; + expect(cleared.sessionKey).toBeNull(); + }); }); diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 2e09aefd57..9cab8a6086 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -301,6 +301,20 @@ export function normalizeCronJobInput( } } + if ("sessionKey" in base) { + const sessionKey = base.sessionKey; + if (sessionKey === null) { + next.sessionKey = null; + } else if (typeof sessionKey === "string") { + const trimmed = sessionKey.trim(); + if (trimmed) { + next.sessionKey = trimmed; + } else { + delete next.sessionKey; + } + } + } + if ("enabled" in base) { const enabled = base.enabled; if (typeof enabled === "boolean") { diff --git a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts index 6fa4c0fa62..d64b2219aa 100644 --- a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts +++ b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts @@ -444,7 +444,7 @@ describe("CronService", () => { await store.cleanup(); }); - it("passes agentId to runHeartbeatOnce for main-session wakeMode now jobs", async () => { + it("passes agentId + sessionKey to runHeartbeatOnce for main-session wakeMode now jobs", async () => { ensureDir(fixturesRoot); const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); @@ -465,9 +465,11 @@ describe("CronService", () => { }); await cron.start(); + const sessionKey = "agent:ops:discord:channel:alerts"; const job = await cron.add({ name: "wakeMode now with agent", agentId: "ops", + sessionKey, enabled: true, schedule: { kind: "at", at: new Date(1).toISOString() }, sessionTarget: "main", @@ -482,12 +484,13 @@ describe("CronService", () => { expect.objectContaining({ reason: `cron:${job.id}`, agentId: "ops", + sessionKey, }), ); expect(requestHeartbeatNow).not.toHaveBeenCalled(); expect(enqueueSystemEvent).toHaveBeenCalledWith( "hello", - expect.objectContaining({ agentId: "ops" }), + expect.objectContaining({ agentId: "ops", sessionKey }), ); cron.stop(); @@ -524,8 +527,10 @@ describe("CronService", () => { }); await cron.start(); + const sessionKey = "agent:main:discord:channel:ops"; const job = await cron.add({ name: "wakeMode now fallback", + sessionKey, enabled: true, schedule: { kind: "at", at: new Date(1).toISOString() }, sessionTarget: "main", @@ -536,7 +541,12 @@ describe("CronService", () => { await cron.run(job.id, "force"); expect(runHeartbeatOnce).toHaveBeenCalled(); - expect(requestHeartbeatNow).toHaveBeenCalled(); + expect(requestHeartbeatNow).toHaveBeenCalledWith( + expect.objectContaining({ + reason: `cron:${job.id}`, + sessionKey, + }), + ); expect(job.state.lastStatus).toBe("ok"); expect(job.state.lastError).toBeUndefined(); diff --git a/src/cron/service.store.migration.test.ts b/src/cron/service.store.migration.test.ts index 9dab820d4e..14adfc9c9e 100644 --- a/src/cron/service.store.migration.test.ts +++ b/src/cron/service.store.migration.test.ts @@ -58,6 +58,7 @@ describe("cron store migration", () => { const legacyJob = { id: "job-1", agentId: undefined, + sessionKey: " agent:main:discord:channel:ops ", name: "Legacy job", description: null, enabled: true, @@ -82,6 +83,7 @@ describe("cron store migration", () => { await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2)); const migrated = await migrateAndLoadFirstJob(store.storePath); + expect(migrated.sessionKey).toBe("agent:main:discord:channel:ops"); expect(migrated.delivery).toEqual({ mode: "announce", channel: "telegram", diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 0d529843e7..0d7537abbb 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -14,6 +14,7 @@ import { computeNextRunAtMs } from "../schedule.js"; import { normalizeHttpWebhookUrl } from "../webhook-url.js"; import { normalizeOptionalAgentId, + normalizeOptionalSessionKey, normalizeOptionalText, normalizePayloadToSystemText, normalizeRequiredName, @@ -287,6 +288,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo const job: CronJob = { id, agentId: normalizeOptionalAgentId(input.agentId), + sessionKey: normalizeOptionalSessionKey((input as { sessionKey?: unknown }).sessionKey), name: normalizeRequiredName(input.name), description: normalizeOptionalText(input.description), enabled, @@ -356,6 +358,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) { if ("agentId" in patch) { job.agentId = normalizeOptionalAgentId((patch as { agentId?: unknown }).agentId); } + if ("sessionKey" in patch) { + job.sessionKey = normalizeOptionalSessionKey((patch as { sessionKey?: unknown }).sessionKey); + } assertSupportedJobSpec(job); assertDeliverySupport(job); } diff --git a/src/cron/service/normalize.ts b/src/cron/service/normalize.ts index 15198c2610..1e4b172f49 100644 --- a/src/cron/service/normalize.ts +++ b/src/cron/service/normalize.ts @@ -39,6 +39,14 @@ export function normalizeOptionalAgentId(raw: unknown) { return normalizeAgentId(trimmed); } +export function normalizeOptionalSessionKey(raw: unknown) { + if (typeof raw !== "string") { + return undefined; + } + const trimmed = raw.trim(); + return trimmed || undefined; +} + export function inferLegacyName(job: { schedule?: { kind?: unknown; everyMs?: unknown; expr?: unknown }; payload?: { kind?: unknown; text?: unknown; message?: unknown }; diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 0ba0f86ac7..8451985365 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -35,9 +35,16 @@ export type CronServiceDeps = { resolveSessionStorePath?: (agentId?: string) => string; /** Path to the session store (sessions.json) for reaper use. */ sessionStorePath?: string; - enqueueSystemEvent: (text: string, opts?: { agentId?: string; contextKey?: string }) => void; - requestHeartbeatNow: (opts?: { reason?: string }) => void; - runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string }) => Promise; + enqueueSystemEvent: ( + text: string, + opts?: { agentId?: string; sessionKey?: string; contextKey?: string }, + ) => void; + requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void; + runHeartbeatOnce?: (opts?: { + reason?: string; + agentId?: string; + sessionKey?: string; + }) => Promise; /** * WakeMode=now: max time to wait for runHeartbeatOnce to stop returning * { status:"skipped", reason:"requests-in-flight" } before falling back to diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 9a171f9e07..b47d9ccc5f 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -264,6 +264,15 @@ export async function ensureLoaded( mutated = true; } + if ("sessionKey" in raw) { + const sessionKey = + typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined; + if (raw.sessionKey !== sessionKey) { + raw.sessionKey = sessionKey; + mutated = true; + } + } + if (typeof raw.enabled !== "boolean") { raw.enabled = true; mutated = true; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 1d26401afb..872e297176 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -441,6 +441,7 @@ async function executeJobCore( } state.deps.enqueueSystemEvent(text, { agentId: job.agentId, + sessionKey: job.sessionKey, contextKey: `cron:${job.id}`, }); if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { @@ -452,7 +453,11 @@ async function executeJobCore( let heartbeatResult: HeartbeatRunResult; for (;;) { - heartbeatResult = await state.deps.runHeartbeatOnce({ reason, agentId: job.agentId }); + heartbeatResult = await state.deps.runHeartbeatOnce({ + reason, + agentId: job.agentId, + sessionKey: job.sessionKey, + }); if ( heartbeatResult.status !== "skipped" || heartbeatResult.reason !== "requests-in-flight" @@ -460,7 +465,11 @@ async function executeJobCore( break; } if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { - state.deps.requestHeartbeatNow({ reason }); + state.deps.requestHeartbeatNow({ + reason, + agentId: job.agentId, + sessionKey: job.sessionKey, + }); return { status: "ok", summary: text }; } await delay(retryDelayMs); @@ -474,7 +483,11 @@ async function executeJobCore( return { status: "error", error: heartbeatResult.reason, summary: text }; } } else { - state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); + state.deps.requestHeartbeatNow({ + reason: `cron:${job.id}`, + agentId: job.agentId, + sessionKey: job.sessionKey, + }); return { status: "ok", summary: text }; } } @@ -502,10 +515,15 @@ async function executeJobCore( res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`; state.deps.enqueueSystemEvent(label, { agentId: job.agentId, + sessionKey: job.sessionKey, contextKey: `cron:${job.id}`, }); if (job.wakeMode === "now") { - state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); + state.deps.requestHeartbeatNow({ + reason: `cron:${job.id}`, + agentId: job.agentId, + sessionKey: job.sessionKey, + }); } } diff --git a/src/cron/types.ts b/src/cron/types.ts index 6c7e7bec02..75c65b74f5 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -68,6 +68,8 @@ export type CronJobState = { export type CronJob = { id: string; agentId?: string; + /** Origin session namespace for reminder delivery and wake routing. */ + sessionKey?: string; name: string; description?: string; enabled: boolean; diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index b4ce346970..0ed3d3de23 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -135,6 +135,7 @@ export const CronJobSchema = Type.Object( { id: NonEmptyString, agentId: Type.Optional(NonEmptyString), + sessionKey: Type.Optional(NonEmptyString), name: NonEmptyString, description: Type.Optional(Type.String()), enabled: Type.Boolean(), @@ -164,6 +165,7 @@ export const CronAddParamsSchema = Type.Object( { name: NonEmptyString, agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), + sessionKey: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), description: Type.Optional(Type.String()), enabled: Type.Optional(Type.Boolean()), deleteAfterRun: Type.Optional(Type.Boolean()), @@ -180,6 +182,7 @@ export const CronJobPatchSchema = Type.Object( { name: Type.Optional(NonEmptyString), agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), + sessionKey: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), description: Type.Optional(Type.String()), enabled: Type.Optional(Type.Boolean()), deleteAfterRun: Type.Optional(Type.Boolean()), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index a5d1c711b3..1d3282d2b4 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -1,7 +1,11 @@ import type { CliDeps } from "../cli/deps.js"; import { resolveDefaultAgentId } from "../agents/agent-scope.js"; import { loadConfig } from "../config/config.js"; -import { resolveAgentMainSessionKey } from "../config/sessions.js"; +import { + canonicalizeMainSessionAlias, + resolveAgentIdFromSessionKey, + resolveAgentMainSessionKey, +} from "../config/sessions.js"; import { resolveStorePath } from "../config/sessions/paths.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js"; @@ -82,6 +86,35 @@ export function buildGatewayCronService(params: { return { agentId, cfg: runtimeConfig }; }; + const resolveCronSessionKey = (params: { + runtimeConfig: ReturnType; + agentId: string; + requestedSessionKey?: string | null; + }) => { + const requested = params.requestedSessionKey?.trim(); + if (!requested) { + return resolveAgentMainSessionKey({ + cfg: params.runtimeConfig, + agentId: params.agentId, + }); + } + const canonical = canonicalizeMainSessionAlias({ + cfg: params.runtimeConfig, + agentId: params.agentId, + sessionKey: requested, + }); + if (canonical !== "global") { + const sessionAgentId = resolveAgentIdFromSessionKey(canonical); + if (normalizeAgentId(sessionAgentId) !== normalizeAgentId(params.agentId)) { + return resolveAgentMainSessionKey({ + cfg: params.runtimeConfig, + agentId: params.agentId, + }); + } + } + return canonical; + }; + const defaultAgentId = resolveDefaultAgentId(params.cfg); const resolveSessionStorePath = (agentId?: string) => resolveStorePath(params.cfg.session?.store, { @@ -99,20 +132,58 @@ export function buildGatewayCronService(params: { sessionStorePath, enqueueSystemEvent: (text, opts) => { const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId); - const sessionKey = resolveAgentMainSessionKey({ - cfg: runtimeConfig, + const sessionKey = resolveCronSessionKey({ + runtimeConfig, agentId, + requestedSessionKey: opts?.sessionKey, }); enqueueSystemEvent(text, { sessionKey, contextKey: opts?.contextKey }); }, - requestHeartbeatNow, + requestHeartbeatNow: (opts) => { + const runtimeConfig = loadConfig(); + const requestedAgentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined; + const derivedAgentId = + requestedAgentId ?? + (opts?.sessionKey + ? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey)) + : undefined); + const agentId = derivedAgentId || undefined; + const sessionKey = + opts?.sessionKey && agentId + ? resolveCronSessionKey({ + runtimeConfig, + agentId, + requestedSessionKey: opts.sessionKey, + }) + : undefined; + requestHeartbeatNow({ + reason: opts?.reason, + agentId, + sessionKey, + }); + }, runHeartbeatOnce: async (opts) => { const runtimeConfig = loadConfig(); - const agentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined; + const requestedAgentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined; + const derivedAgentId = + requestedAgentId ?? + (opts?.sessionKey + ? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey)) + : undefined); + const agentId = derivedAgentId || undefined; + const sessionKey = + opts?.sessionKey && agentId + ? resolveCronSessionKey({ + runtimeConfig, + agentId, + requestedSessionKey: opts.sessionKey, + }) + : undefined; return await runHeartbeatOnce({ cfg: runtimeConfig, reason: opts?.reason, agentId, + sessionKey, deps: { ...params.deps, runtime: defaultRuntime }, }); }, diff --git a/src/infra/heartbeat-runner.returns-default-unset.test.ts b/src/infra/heartbeat-runner.returns-default-unset.test.ts index 45fb3912b6..929e41d496 100644 --- a/src/infra/heartbeat-runner.returns-default-unset.test.ts +++ b/src/infra/heartbeat-runner.returns-default-unset.test.ts @@ -708,6 +708,81 @@ describe("runHeartbeatOnce", () => { } }); + it("runs heartbeats in forced session key overrides passed at call time", async () => { + const tmpDir = await createCaseDir("hb-forced-session-override"); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + try { + const cfg: OpenClawConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { + every: "5m", + target: "last", + }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + const mainSessionKey = resolveMainSessionKey(cfg); + const agentId = resolveAgentIdFromSessionKey(mainSessionKey); + const forcedSessionKey = buildAgentPeerSessionKey({ + agentId, + channel: "whatsapp", + peerKind: "dm", + peerId: "+15559990000", + }); + + await fs.writeFile( + storePath, + JSON.stringify({ + [mainSessionKey]: { + sessionId: "sid-main", + updatedAt: Date.now(), + lastChannel: "whatsapp", + lastTo: "+1555", + }, + [forcedSessionKey]: { + sessionId: "sid-forced", + updatedAt: Date.now() + 10_000, + lastChannel: "whatsapp", + lastTo: "+15559990000", + }, + }), + ); + + replySpy.mockResolvedValue([{ text: "Forced alert" }]); + const sendWhatsApp = vi.fn().mockResolvedValue({ + messageId: "m1", + toJid: "jid", + }); + + await runHeartbeatOnce({ + cfg, + sessionKey: forcedSessionKey, + deps: { + sendWhatsApp, + getQueueSize: () => 0, + nowMs: () => 0, + webAuthExists: async () => true, + hasActiveWebListener: () => true, + }, + }); + + expect(sendWhatsApp).toHaveBeenCalledTimes(1); + expect(sendWhatsApp).toHaveBeenCalledWith("+15559990000", "Forced alert", expect.any(Object)); + expect(replySpy).toHaveBeenCalledWith( + expect.objectContaining({ SessionKey: forcedSessionKey }), + expect.objectContaining({ isHeartbeat: true }), + cfg, + ); + } finally { + replySpy.mockRestore(); + } + }); + it("suppresses duplicate heartbeat payloads within 24h", async () => { const tmpDir = await createCaseDir("hb-dup-suppress"); const storePath = path.join(tmpDir, "sessions.json"); diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts index 23c5cbb812..c6908e07b8 100644 --- a/src/infra/heartbeat-runner.scheduler.test.ts +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -1,6 +1,7 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import { startHeartbeatRunner } from "./heartbeat-runner.js"; +import { requestHeartbeatNow, resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js"; describe("startHeartbeatRunner", () => { function startDefaultRunner(runOnce: (typeof startHeartbeatRunner)[0]["runOnce"]) { @@ -13,6 +14,7 @@ describe("startHeartbeatRunner", () => { } afterEach(() => { + resetHeartbeatWakeStateForTests(); vi.useRealTimers(); vi.restoreAllMocks(); }); @@ -162,4 +164,42 @@ describe("startHeartbeatRunner", () => { runner.stop(); }); + + it("routes targeted wake requests to the requested agent/session", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(0)); + + const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + const runner = startHeartbeatRunner({ + cfg: { + agents: { + defaults: { heartbeat: { every: "30m" } }, + list: [ + { id: "main", heartbeat: { every: "30m" } }, + { id: "ops", heartbeat: { every: "15m" } }, + ], + }, + } as OpenClawConfig, + runOnce: runSpy, + }); + + requestHeartbeatNow({ + reason: "cron:job-123", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + coalesceMs: 0, + }); + await vi.advanceTimersByTimeAsync(1); + + expect(runSpy).toHaveBeenCalledTimes(1); + expect(runSpy).toHaveBeenCalledWith( + expect.objectContaining({ + agentId: "ops", + reason: "cron:job-123", + sessionKey: "agent:ops:discord:channel:alerts", + }), + ); + + runner.stop(); + }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index b83693c2ff..8bf970e20e 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -257,6 +257,7 @@ function resolveHeartbeatSession( cfg: OpenClawConfig, agentId?: string, heartbeat?: HeartbeatConfig, + forcedSessionKey?: string, ) { const sessionCfg = cfg.session; const scope = sessionCfg?.scope ?? "per-sender"; @@ -274,6 +275,31 @@ function resolveHeartbeatSession( return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry }; } + const forced = forcedSessionKey?.trim(); + if (forced) { + const forcedCandidate = toAgentStoreSessionKey({ + agentId: resolvedAgentId, + requestKey: forced, + mainKey: cfg.session?.mainKey, + }); + const forcedCanonical = canonicalizeMainSessionAlias({ + cfg, + agentId: resolvedAgentId, + sessionKey: forcedCandidate, + }); + if (forcedCanonical !== "global") { + const sessionAgentId = resolveAgentIdFromSessionKey(forcedCanonical); + if (sessionAgentId === normalizeAgentId(resolvedAgentId)) { + return { + sessionKey: forcedCanonical, + storePath, + store, + entry: store[forcedCanonical], + }; + } + } + } + const trimmed = heartbeat?.session?.trim() ?? ""; if (!trimmed) { return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry }; @@ -377,6 +403,7 @@ function normalizeHeartbeatReply( export async function runHeartbeatOnce(opts: { cfg?: OpenClawConfig; agentId?: string; + sessionKey?: string; heartbeat?: HeartbeatConfig; reason?: string; deps?: HeartbeatDeps; @@ -433,7 +460,12 @@ export async function runHeartbeatOnce(opts: { // The LLM prompt says "if it exists" so this is expected behavior. } - const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId, heartbeat); + const { entry, sessionKey, storePath } = resolveHeartbeatSession( + cfg, + agentId, + heartbeat, + opts.sessionKey, + ); const previousUpdatedAt = entry?.updatedAt; const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat }); const heartbeatAccountId = heartbeat?.accountId?.trim(); @@ -896,11 +928,45 @@ export function startHeartbeatRunner(opts: { } const reason = params?.reason; + const requestedAgentId = params?.agentId ? normalizeAgentId(params.agentId) : undefined; + const requestedSessionKey = params?.sessionKey?.trim() || undefined; const isInterval = reason === "interval"; const startedAt = Date.now(); const now = startedAt; let ran = false; + if (requestedSessionKey || requestedAgentId) { + const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey); + const targetAgent = state.agents.get(targetAgentId); + if (!targetAgent) { + scheduleNext(); + return { status: "skipped", reason: "disabled" }; + } + try { + const res = await runOnce({ + cfg: state.cfg, + agentId: targetAgent.agentId, + heartbeat: targetAgent.heartbeat, + reason, + sessionKey: requestedSessionKey, + deps: { runtime: state.runtime }, + }); + if (res.status !== "skipped" || res.reason !== "disabled") { + advanceAgentSchedule(targetAgent, now); + } + scheduleNext(); + return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res; + } catch (err) { + const errMsg = formatErrorMessage(err); + log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, { + error: errMsg, + }); + advanceAgentSchedule(targetAgent, now); + scheduleNext(); + return { status: "failed", reason: errMsg }; + } + } + for (const agent of state.agents.values()) { if (isInterval && now < agent.nextDueMs) { continue; @@ -943,7 +1009,12 @@ export function startHeartbeatRunner(opts: { return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; - const wakeHandler: HeartbeatWakeHandler = async (params) => run({ reason: params.reason }); + const wakeHandler: HeartbeatWakeHandler = async (params) => + run({ + reason: params.reason, + agentId: params.agentId, + sessionKey: params.sessionKey, + }); const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler); updateConfig(state.cfg); diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts index 85a5f247e1..e0c8364b30 100644 --- a/src/infra/heartbeat-wake.test.ts +++ b/src/infra/heartbeat-wake.test.ts @@ -247,4 +247,36 @@ describe("heartbeat-wake", () => { expect(handler).toHaveBeenCalledWith({ reason: "manual" }); expect(hasPendingHeartbeatWake()).toBe(false); }); + + it("forwards wake target fields and preserves them across retries", async () => { + vi.useFakeTimers(); + const handler = vi + .fn() + .mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "ran", durationMs: 1 }); + setHeartbeatWakeHandler(handler); + + requestHeartbeatNow({ + reason: "cron:job-1", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + coalesceMs: 0, + }); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler.mock.calls[0]?.[0]).toEqual({ + reason: "cron:job-1", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + }); + + await vi.advanceTimersByTimeAsync(1000); + expect(handler).toHaveBeenCalledTimes(2); + expect(handler.mock.calls[1]?.[0]).toEqual({ + reason: "cron:job-1", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + }); + }); }); diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 6297b5ffb6..8543abb07a 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -3,13 +3,19 @@ export type HeartbeatRunResult = | { status: "skipped"; reason: string } | { status: "failed"; reason: string }; -export type HeartbeatWakeHandler = (opts: { reason?: string }) => Promise; +export type HeartbeatWakeHandler = (opts: { + reason?: string; + agentId?: string; + sessionKey?: string; +}) => Promise; type WakeTimerKind = "normal" | "retry"; type PendingWakeReason = { reason: string; priority: number; requestedAt: number; + agentId?: string; + sessionKey?: string; }; let handler: HeartbeatWakeHandler | null = null; @@ -56,12 +62,25 @@ function normalizeWakeReason(reason?: string): string { return trimmed.length > 0 ? trimmed : "requested"; } -function queuePendingWakeReason(reason?: string, requestedAt = Date.now()) { - const normalizedReason = normalizeWakeReason(reason); +function normalizeWakeTarget(value?: string): string | undefined { + const trimmed = typeof value === "string" ? value.trim() : ""; + return trimmed || undefined; +} + +function queuePendingWakeReason(params?: { + reason?: string; + requestedAt?: number; + agentId?: string; + sessionKey?: string; +}) { + const requestedAt = params?.requestedAt ?? Date.now(); + const normalizedReason = normalizeWakeReason(params?.reason); const next: PendingWakeReason = { reason: normalizedReason, priority: resolveReasonPriority(normalizedReason), requestedAt, + agentId: normalizeWakeTarget(params?.agentId), + sessionKey: normalizeWakeTarget(params?.sessionKey), }; if (!pendingWake) { pendingWake = next; @@ -113,18 +132,33 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { } const reason = pendingWake?.reason; + const agentId = pendingWake?.agentId; + const sessionKey = pendingWake?.sessionKey; pendingWake = null; running = true; try { - const res = await active({ reason: reason ?? undefined }); + const wakeOpts = { + reason: reason ?? undefined, + ...(agentId ? { agentId } : {}), + ...(sessionKey ? { sessionKey } : {}), + }; + const res = await active(wakeOpts); if (res.status === "skipped" && res.reason === "requests-in-flight") { // The main lane is busy; retry soon. - queuePendingWakeReason(reason ?? "retry"); + queuePendingWakeReason({ + reason: reason ?? "retry", + agentId, + sessionKey, + }); schedule(DEFAULT_RETRY_MS, "retry"); } } catch { // Error is already logged by the heartbeat runner; schedule a retry. - queuePendingWakeReason(reason ?? "retry"); + queuePendingWakeReason({ + reason: reason ?? "retry", + agentId, + sessionKey, + }); schedule(DEFAULT_RETRY_MS, "retry"); } finally { running = false; @@ -178,8 +212,17 @@ export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () = }; } -export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }) { - queuePendingWakeReason(opts?.reason); +export function requestHeartbeatNow(opts?: { + reason?: string; + coalesceMs?: number; + agentId?: string; + sessionKey?: string; +}) { + queuePendingWakeReason({ + reason: opts?.reason, + agentId: opts?.agentId, + sessionKey: opts?.sessionKey, + }); schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal"); } From aba0d5a9dc2b599b70d45355cc8703bdaf4af0f6 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Mon, 16 Feb 2026 14:37:22 -0800 Subject: [PATCH 2/2] Cron: dedupe gateway wake target resolution --- src/gateway/server-cron.ts | 54 ++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 1d3282d2b4..0ff6b098f8 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -115,6 +115,26 @@ export function buildGatewayCronService(params: { return canonical; }; + const resolveCronWakeTarget = (opts?: { agentId?: string; sessionKey?: string | null }) => { + const runtimeConfig = loadConfig(); + const requestedAgentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined; + const derivedAgentId = + requestedAgentId ?? + (opts?.sessionKey + ? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey)) + : undefined); + const agentId = derivedAgentId || undefined; + const sessionKey = + opts?.sessionKey && agentId + ? resolveCronSessionKey({ + runtimeConfig, + agentId, + requestedSessionKey: opts.sessionKey, + }) + : undefined; + return { runtimeConfig, agentId, sessionKey }; + }; + const defaultAgentId = resolveDefaultAgentId(params.cfg); const resolveSessionStorePath = (agentId?: string) => resolveStorePath(params.cfg.session?.store, { @@ -140,22 +160,7 @@ export function buildGatewayCronService(params: { enqueueSystemEvent(text, { sessionKey, contextKey: opts?.contextKey }); }, requestHeartbeatNow: (opts) => { - const runtimeConfig = loadConfig(); - const requestedAgentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined; - const derivedAgentId = - requestedAgentId ?? - (opts?.sessionKey - ? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey)) - : undefined); - const agentId = derivedAgentId || undefined; - const sessionKey = - opts?.sessionKey && agentId - ? resolveCronSessionKey({ - runtimeConfig, - agentId, - requestedSessionKey: opts.sessionKey, - }) - : undefined; + const { agentId, sessionKey } = resolveCronWakeTarget(opts); requestHeartbeatNow({ reason: opts?.reason, agentId, @@ -163,22 +168,7 @@ export function buildGatewayCronService(params: { }); }, runHeartbeatOnce: async (opts) => { - const runtimeConfig = loadConfig(); - const requestedAgentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined; - const derivedAgentId = - requestedAgentId ?? - (opts?.sessionKey - ? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey)) - : undefined); - const agentId = derivedAgentId || undefined; - const sessionKey = - opts?.sessionKey && agentId - ? resolveCronSessionKey({ - runtimeConfig, - agentId, - requestedSessionKey: opts.sessionKey, - }) - : undefined; + const { runtimeConfig, agentId, sessionKey } = resolveCronWakeTarget(opts); return await runHeartbeatOnce({ cfg: runtimeConfig, reason: opts?.reason,