From 5147656d6533158904f8eccafa0449a6f94fabcf Mon Sep 17 00:00:00 2001 From: Joseph Krug Date: Thu, 12 Feb 2026 16:38:46 -0400 Subject: [PATCH] fix: prevent heartbeat scheduler death when runOnce throws (#14901) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 022efbfef959f4c4225d7ab1a49540c8f39accd3 Co-authored-by: joeykrug <5925937+joeykrug@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + src/infra/heartbeat-runner.scheduler.test.ts | 63 +++++++++++++ src/infra/heartbeat-runner.ts | 34 +++++-- src/infra/heartbeat-wake.test.ts | 98 ++++++++++++++++++++ 4 files changed, 187 insertions(+), 9 deletions(-) create mode 100644 src/infra/heartbeat-wake.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ee9ffdde4b..b433974298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai - Cron: prevent duplicate fires when multiple jobs trigger simultaneously. (#14256) Thanks @xinhuagu. - Cron: isolate scheduler errors so one bad job does not break all jobs. (#14385) Thanks @MarvinDontPanic. - Cron: prevent one-shot `at` jobs from re-firing on restart after skipped/errored runs. (#13878) Thanks @lailoo. +- Heartbeat: prevent scheduler stalls on unexpected run errors and avoid immediate rerun loops after `requests-in-flight` skips. (#14901) Thanks @joeykrug. - WhatsApp: convert Markdown bold/strikethrough to WhatsApp formatting. (#14285) Thanks @Raikan10. - WhatsApp: allow media-only sends and normalize leading blank payloads. (#14408) Thanks @karimnaguib. - WhatsApp: default MIME type for voice messages when Baileys omits it. (#14444) Thanks @mcaxtr. diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts index e95058880a..e1923371ac 100644 --- a/src/infra/heartbeat-runner.scheduler.test.ts +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -54,4 +54,67 @@ describe("startHeartbeatRunner", () => { runner.stop(); }); + + it("continues scheduling after runOnce throws an unhandled error", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(0)); + + let callCount = 0; + const runSpy = vi.fn().mockImplementation(async () => { + callCount++; + if (callCount === 1) { + // First call throws (simulates crash during session compaction) + throw new Error("session compaction error"); + } + return { status: "ran", durationMs: 1 }; + }); + + const runner = startHeartbeatRunner({ + cfg: { + agents: { defaults: { heartbeat: { every: "30m" } } }, + } as OpenClawConfig, + runOnce: runSpy, + }); + + // First heartbeat fires and throws + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(1); + + // Second heartbeat should still fire (scheduler must not be dead) + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(2); + + runner.stop(); + }); + + it("reschedules timer when runOnce returns requests-in-flight", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(0)); + + let callCount = 0; + const runSpy = vi.fn().mockImplementation(async () => { + callCount++; + if (callCount === 1) { + return { status: "skipped", reason: "requests-in-flight" }; + } + return { status: "ran", durationMs: 1 }; + }); + + const runner = startHeartbeatRunner({ + cfg: { + agents: { defaults: { heartbeat: { every: "30m" } } }, + } as OpenClawConfig, + runOnce: runSpy, + }); + + // First heartbeat returns requests-in-flight + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(1); + + // Timer should be rescheduled; next heartbeat should still fire + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy).toHaveBeenCalledTimes(2); + + runner.stop(); + }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index a51a8ec563..1771875c04 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -797,6 +797,11 @@ export function startHeartbeatRunner(opts: { return now + intervalMs; }; + const advanceAgentSchedule = (agent: HeartbeatAgentState, now: number) => { + agent.lastRunMs = now; + agent.nextDueMs = now + agent.intervalMs; + }; + const scheduleNext = () => { if (state.stopped) { return; @@ -897,19 +902,30 @@ export function startHeartbeatRunner(opts: { continue; } - const res = await runOnce({ - cfg: state.cfg, - agentId: agent.agentId, - heartbeat: agent.heartbeat, - reason, - deps: { runtime: state.runtime }, - }); + let res: HeartbeatRunResult; + try { + res = await runOnce({ + cfg: state.cfg, + agentId: agent.agentId, + heartbeat: agent.heartbeat, + reason, + deps: { runtime: state.runtime }, + }); + } catch (err) { + // If runOnce throws (e.g. during session compaction), we must still + // advance the timer and call scheduleNext so heartbeats keep firing. + const errMsg = formatErrorMessage(err); + log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg }); + advanceAgentSchedule(agent, now); + continue; + } if (res.status === "skipped" && res.reason === "requests-in-flight") { + advanceAgentSchedule(agent, now); + scheduleNext(); return res; } if (res.status !== "skipped" || res.reason !== "disabled") { - agent.lastRunMs = now; - agent.nextDueMs = now + agent.intervalMs; + advanceAgentSchedule(agent, now); } if (res.status === "ran") { ran = true; diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts new file mode 100644 index 0000000000..cd703ed406 --- /dev/null +++ b/src/infra/heartbeat-wake.test.ts @@ -0,0 +1,98 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +async function loadWakeModule() { + vi.resetModules(); + return import("./heartbeat-wake.js"); +} + +describe("heartbeat-wake", () => { + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("coalesces multiple wake requests into one run", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 200 }); + wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 200 }); + wake.requestHeartbeatNow({ reason: "retry", coalesceMs: 200 }); + + expect(wake.hasPendingHeartbeatWake()).toBe(true); + + await vi.advanceTimersByTimeAsync(199); + expect(handler).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith({ reason: "retry" }); + expect(wake.hasPendingHeartbeatWake()).toBe(false); + }); + + it("retries requests-in-flight after the default retry delay", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi + .fn() + .mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "ran", durationMs: 1 }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(2); + expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "interval" }); + }); + + it("retries thrown handler errors after the default retry delay", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce({ status: "skipped", reason: "disabled" }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 0 }); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(2); + expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "exec-event" }); + }); + + it("drains pending wake once a handler is registered", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + + wake.requestHeartbeatNow({ reason: "manual", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(wake.hasPendingHeartbeatWake()).toBe(true); + + const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" }); + wake.setHeartbeatWakeHandler(handler); + + await vi.advanceTimersByTimeAsync(249); + expect(handler).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith({ reason: "manual" }); + expect(wake.hasPendingHeartbeatWake()).toBe(false); + }); +});