From 022efbfef959f4c4225d7ab1a49540c8f39accd3 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Thu, 12 Feb 2026 15:33:06 -0500 Subject: [PATCH] fix: harden heartbeat wake scheduling and add retry regressions (openclaw#14901) thanks @joeykrug --- CHANGELOG.md | 1 + src/infra/heartbeat-runner.ts | 14 +++-- src/infra/heartbeat-wake.test.ts | 98 ++++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 6 deletions(-) create mode 100644 src/infra/heartbeat-wake.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ee9ffdde4b..b433974298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai - Cron: prevent duplicate fires when multiple jobs trigger simultaneously. (#14256) Thanks @xinhuagu. - Cron: isolate scheduler errors so one bad job does not break all jobs. (#14385) Thanks @MarvinDontPanic. - Cron: prevent one-shot `at` jobs from re-firing on restart after skipped/errored runs. (#13878) Thanks @lailoo. +- Heartbeat: prevent scheduler stalls on unexpected run errors and avoid immediate rerun loops after `requests-in-flight` skips. (#14901) Thanks @joeykrug. - WhatsApp: convert Markdown bold/strikethrough to WhatsApp formatting. (#14285) Thanks @Raikan10. - WhatsApp: allow media-only sends and normalize leading blank payloads. (#14408) Thanks @karimnaguib. - WhatsApp: default MIME type for voice messages when Baileys omits it. (#14444) Thanks @mcaxtr. diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 44defb25e8..1771875c04 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -797,6 +797,11 @@ export function startHeartbeatRunner(opts: { return now + intervalMs; }; + const advanceAgentSchedule = (agent: HeartbeatAgentState, now: number) => { + agent.lastRunMs = now; + agent.nextDueMs = now + agent.intervalMs; + }; + const scheduleNext = () => { if (state.stopped) { return; @@ -911,19 +916,16 @@ export function startHeartbeatRunner(opts: { // advance the timer and call scheduleNext so heartbeats keep firing. const errMsg = formatErrorMessage(err); log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg }); - agent.lastRunMs = now; - agent.nextDueMs = now + agent.intervalMs; + advanceAgentSchedule(agent, now); continue; } if (res.status === "skipped" && res.reason === "requests-in-flight") { - agent.lastRunMs = now; - agent.nextDueMs = now + agent.intervalMs; + advanceAgentSchedule(agent, now); scheduleNext(); return res; } if (res.status !== "skipped" || res.reason !== "disabled") { - agent.lastRunMs = now; - agent.nextDueMs = now + agent.intervalMs; + advanceAgentSchedule(agent, now); } if (res.status === "ran") { ran = true; diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts new file mode 100644 index 0000000000..cd703ed406 --- /dev/null +++ b/src/infra/heartbeat-wake.test.ts @@ -0,0 +1,98 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +async function loadWakeModule() { + vi.resetModules(); + return import("./heartbeat-wake.js"); +} + +describe("heartbeat-wake", () => { + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("coalesces multiple wake requests into one run", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 200 }); + wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 200 }); + wake.requestHeartbeatNow({ reason: "retry", coalesceMs: 200 }); + + expect(wake.hasPendingHeartbeatWake()).toBe(true); + + await vi.advanceTimersByTimeAsync(199); + expect(handler).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith({ reason: "retry" }); + expect(wake.hasPendingHeartbeatWake()).toBe(false); + }); + + it("retries requests-in-flight after the default retry delay", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi + .fn() + .mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "ran", durationMs: 1 }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(2); + expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "interval" }); + }); + + it("retries thrown handler errors after the default retry delay", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce({ status: "skipped", reason: "disabled" }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 0 }); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(500); + expect(handler).toHaveBeenCalledTimes(2); + expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "exec-event" }); + }); + + it("drains pending wake once a handler is registered", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + + wake.requestHeartbeatNow({ reason: "manual", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(wake.hasPendingHeartbeatWake()).toBe(true); + + const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" }); + wake.setHeartbeatWakeHandler(handler); + + await vi.advanceTimersByTimeAsync(249); + expect(handler).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith({ reason: "manual" }); + expect(wake.hasPendingHeartbeatWake()).toBe(false); + }); +});