fix: harden heartbeat wake scheduling and add retry regressions (openclaw#14901) thanks @joeykrug

This commit is contained in:
Gustavo Madeira Santana
2026-02-12 15:33:06 -05:00
parent 69bc49b2ae
commit 022efbfef9
3 changed files with 107 additions and 6 deletions

View File

@@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
- Cron: prevent duplicate fires when multiple jobs trigger simultaneously. (#14256) Thanks @xinhuagu.
- Cron: isolate scheduler errors so one bad job does not break all jobs. (#14385) Thanks @MarvinDontPanic.
- Cron: prevent one-shot `at` jobs from re-firing on restart after skipped/errored runs. (#13878) Thanks @lailoo.
- Heartbeat: prevent scheduler stalls on unexpected run errors and avoid immediate rerun loops after `requests-in-flight` skips. (#14901) Thanks @joeykrug.
- WhatsApp: convert Markdown bold/strikethrough to WhatsApp formatting. (#14285) Thanks @Raikan10.
- WhatsApp: allow media-only sends and normalize leading blank payloads. (#14408) Thanks @karimnaguib.
- WhatsApp: default MIME type for voice messages when Baileys omits it. (#14444) Thanks @mcaxtr.

View File

@@ -797,6 +797,11 @@ export function startHeartbeatRunner(opts: {
return now + intervalMs;
};
const advanceAgentSchedule = (agent: HeartbeatAgentState, now: number) => {
agent.lastRunMs = now;
agent.nextDueMs = now + agent.intervalMs;
};
const scheduleNext = () => {
if (state.stopped) {
return;
@@ -911,19 +916,16 @@ export function startHeartbeatRunner(opts: {
// advance the timer and call scheduleNext so heartbeats keep firing.
const errMsg = formatErrorMessage(err);
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
agent.lastRunMs = now;
agent.nextDueMs = now + agent.intervalMs;
advanceAgentSchedule(agent, now);
continue;
}
if (res.status === "skipped" && res.reason === "requests-in-flight") {
agent.lastRunMs = now;
agent.nextDueMs = now + agent.intervalMs;
advanceAgentSchedule(agent, now);
scheduleNext();
return res;
}
if (res.status !== "skipped" || res.reason !== "disabled") {
agent.lastRunMs = now;
agent.nextDueMs = now + agent.intervalMs;
advanceAgentSchedule(agent, now);
}
if (res.status === "ran") {
ran = true;

View File

@@ -0,0 +1,98 @@
import { afterEach, describe, expect, it, vi } from "vitest";
async function loadWakeModule() {
vi.resetModules();
return import("./heartbeat-wake.js");
}
describe("heartbeat-wake", () => {
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});
it("coalesces multiple wake requests into one run", async () => {
vi.useFakeTimers();
const wake = await loadWakeModule();
const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" });
wake.setHeartbeatWakeHandler(handler);
wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 200 });
wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 200 });
wake.requestHeartbeatNow({ reason: "retry", coalesceMs: 200 });
expect(wake.hasPendingHeartbeatWake()).toBe(true);
await vi.advanceTimersByTimeAsync(199);
expect(handler).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith({ reason: "retry" });
expect(wake.hasPendingHeartbeatWake()).toBe(false);
});
it("retries requests-in-flight after the default retry delay", async () => {
vi.useFakeTimers();
const wake = await loadWakeModule();
const handler = vi
.fn()
.mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" })
.mockResolvedValueOnce({ status: "ran", durationMs: 1 });
wake.setHeartbeatWakeHandler(handler);
wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
await vi.advanceTimersByTimeAsync(1);
expect(handler).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(500);
expect(handler).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(500);
expect(handler).toHaveBeenCalledTimes(2);
expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "interval" });
});
it("retries thrown handler errors after the default retry delay", async () => {
vi.useFakeTimers();
const wake = await loadWakeModule();
const handler = vi
.fn()
.mockRejectedValueOnce(new Error("boom"))
.mockResolvedValueOnce({ status: "skipped", reason: "disabled" });
wake.setHeartbeatWakeHandler(handler);
wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 0 });
await vi.advanceTimersByTimeAsync(1);
expect(handler).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(500);
expect(handler).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(500);
expect(handler).toHaveBeenCalledTimes(2);
expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "exec-event" });
});
it("drains pending wake once a handler is registered", async () => {
vi.useFakeTimers();
const wake = await loadWakeModule();
wake.requestHeartbeatNow({ reason: "manual", coalesceMs: 0 });
await vi.advanceTimersByTimeAsync(1);
expect(wake.hasPendingHeartbeatWake()).toBe(true);
const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" });
wake.setHeartbeatWakeHandler(handler);
await vi.advanceTimersByTimeAsync(249);
expect(handler).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith({ reason: "manual" });
expect(wake.hasPendingHeartbeatWake()).toBe(false);
});
});