mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-03 03:03:24 -04:00
fix: reset stale execution state after SIGUSR1 in-process restart (#15195)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 676f9ec451
Co-authored-by: joeykrug <5925937+joeykrug@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
@@ -23,6 +23,7 @@ import {
|
||||
enqueueCommandInLane,
|
||||
getActiveTaskCount,
|
||||
getQueueSize,
|
||||
resetAllLanes,
|
||||
setCommandLaneConcurrency,
|
||||
waitForActiveTasks,
|
||||
} from "./command-queue.js";
|
||||
@@ -36,6 +37,12 @@ describe("command queue", () => {
|
||||
diagnosticMocks.diag.error.mockClear();
|
||||
});
|
||||
|
||||
// Resetting must neither throw nor change the active-task count when
// nothing is running.
it("resetAllLanes is safe when no lanes have been created", () => {
  expect(getActiveTaskCount()).toBe(0);
  expect(() => resetAllLanes()).not.toThrow();
  expect(getActiveTaskCount()).toBe(0);
});
|
||||
|
||||
it("runs tasks one at a time in order", async () => {
|
||||
let active = 0;
|
||||
let maxActive = 0;
|
||||
@@ -162,6 +169,49 @@ describe("command queue", () => {
|
||||
await task;
|
||||
});
|
||||
|
||||
// Regression test for the in-process restart path: work that was queued
// behind a task still running at reset time must start right after the
// reset, without anything new being enqueued.
it("resetAllLanes drains queued work immediately after reset", async () => {
  // Unique lane name so state left behind by other tests cannot interfere.
  const lane = `reset-test-${Date.now()}-${Math.random().toString(16).slice(2)}`;
  setCommandLaneConcurrency(lane, 1);

  // Assigned synchronously inside the Promise executor below.
  let resolve1!: () => void;
  const blocker = new Promise<void>((r) => {
    resolve1 = r;
  });

  // Start a task that blocks the lane
  const task1 = enqueueCommandInLane(lane, async () => {
    await blocker;
  });

  await vi.waitFor(() => {
    expect(getActiveTaskCount()).toBeGreaterThanOrEqual(1);
  });

  // Enqueue another task — it should be stuck behind the blocker
  let task2Ran = false;
  const task2 = enqueueCommandInLane(lane, async () => {
    task2Ran = true;
  });

  await vi.waitFor(() => {
    expect(getQueueSize(lane)).toBeGreaterThanOrEqual(2);
  });
  expect(task2Ran).toBe(false);

  // Simulate SIGUSR1: reset all lanes. Queued work (task2) should be
  // drained immediately — no fresh enqueue needed.
  resetAllLanes();

  // Complete the stale in-flight task; generation mismatch makes its
  // completion path a no-op for queue bookkeeping.
  resolve1();
  await task1;

  // task2 should have been pumped by resetAllLanes's drain pass.
  await task2;
  expect(task2Ran).toBe(true);
});
|
||||
|
||||
it("waitForActiveTasks ignores tasks that start after the call", async () => {
|
||||
const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`;
|
||||
setCommandLaneConcurrency(lane, 2);
|
||||
|
||||
@@ -29,10 +29,10 @@ type QueueEntry = {
|
||||
/** Runtime bookkeeping for a single command lane. */
type LaneState = {
  /** Lane name; also the key this state is stored under in `lanes`. */
  lane: string;
  /** Entries that have not been started yet; the drain loop takes them from the front. */
  queue: QueueEntry[];
  // NOTE(review): numeric running-task counter. Every updated read in this
  // change uses `activeTaskIds.size` instead and `resetAllLanes()` never
  // resets this field, so it looks like a pre-change line — confirm it can go.
  active: number;
  /** Ids of tasks that have been started and not yet recorded as complete. */
  activeTaskIds: Set<number>;
  /** Upper bound on `activeTaskIds.size` that the drain loop respects when starting work. */
  maxConcurrent: number;
  /** Guard flag checked and set by `drainLane()`. */
  draining: boolean;
  /**
   * Incremented by `resetAllLanes()`. A task remembers the value it started
   * under; `completeTask()` ignores completions whose value no longer matches.
   */
  generation: number;
};
|
||||
|
||||
const lanes = new Map<string, LaneState>();
|
||||
@@ -46,15 +46,23 @@ function getLaneState(lane: string): LaneState {
|
||||
const created: LaneState = {
|
||||
lane,
|
||||
queue: [],
|
||||
active: 0,
|
||||
activeTaskIds: new Set(),
|
||||
maxConcurrent: 1,
|
||||
draining: false,
|
||||
generation: 0,
|
||||
};
|
||||
lanes.set(lane, created);
|
||||
return created;
|
||||
}
|
||||
|
||||
/**
 * Records that a task finished, unless the lane was reset while it ran.
 *
 * @param state - Lane state the task was started in.
 * @param taskId - Id of the finished task.
 * @param taskGeneration - Value of `state.generation` captured when the task started.
 * @returns `true` when the task belongs to the lane's current generation and
 *   its id was removed from `state.activeTaskIds`; `false` when the
 *   generations differ, in which case `state` is left untouched.
 */
function completeTask(state: LaneState, taskId: number, taskGeneration: number): boolean {
  if (taskGeneration !== state.generation) {
    // Started before a reset: the active set has since been cleared and may
    // already be tracking newer tasks, so this completion must not touch it.
    return false;
  }
  state.activeTaskIds.delete(taskId);
  return true;
}
|
||||
|
||||
function drainLane(lane: string) {
|
||||
const state = getLaneState(lane);
|
||||
if (state.draining) {
|
||||
@@ -63,7 +71,7 @@ function drainLane(lane: string) {
|
||||
state.draining = true;
|
||||
|
||||
const pump = () => {
|
||||
while (state.active < state.maxConcurrent && state.queue.length > 0) {
|
||||
while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) {
|
||||
const entry = state.queue.shift() as QueueEntry;
|
||||
const waitedMs = Date.now() - entry.enqueuedAt;
|
||||
if (waitedMs >= entry.warnAfterMs) {
|
||||
@@ -74,29 +82,31 @@ function drainLane(lane: string) {
|
||||
}
|
||||
logLaneDequeue(lane, waitedMs, state.queue.length);
|
||||
const taskId = nextTaskId++;
|
||||
state.active += 1;
|
||||
const taskGeneration = state.generation;
|
||||
state.activeTaskIds.add(taskId);
|
||||
void (async () => {
|
||||
const startTime = Date.now();
|
||||
try {
|
||||
const result = await entry.task();
|
||||
state.active -= 1;
|
||||
state.activeTaskIds.delete(taskId);
|
||||
diag.debug(
|
||||
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`,
|
||||
);
|
||||
pump();
|
||||
const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
|
||||
if (completedCurrentGeneration) {
|
||||
diag.debug(
|
||||
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.activeTaskIds.size} queued=${state.queue.length}`,
|
||||
);
|
||||
pump();
|
||||
}
|
||||
entry.resolve(result);
|
||||
} catch (err) {
|
||||
state.active -= 1;
|
||||
state.activeTaskIds.delete(taskId);
|
||||
const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
|
||||
const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
|
||||
if (!isProbeLane) {
|
||||
diag.error(
|
||||
`lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`,
|
||||
);
|
||||
}
|
||||
pump();
|
||||
if (completedCurrentGeneration) {
|
||||
pump();
|
||||
}
|
||||
entry.reject(err);
|
||||
}
|
||||
})();
|
||||
@@ -134,7 +144,7 @@ export function enqueueCommandInLane<T>(
|
||||
warnAfterMs,
|
||||
onWait: opts?.onWait,
|
||||
});
|
||||
logLaneEnqueue(cleaned, state.queue.length + state.active);
|
||||
logLaneEnqueue(cleaned, state.queue.length + state.activeTaskIds.size);
|
||||
drainLane(cleaned);
|
||||
});
|
||||
}
|
||||
@@ -155,13 +165,13 @@ export function getQueueSize(lane: string = CommandLane.Main) {
|
||||
if (!state) {
|
||||
return 0;
|
||||
}
|
||||
return state.queue.length + state.active;
|
||||
return state.queue.length + state.activeTaskIds.size;
|
||||
}
|
||||
|
||||
/**
 * Returns the number of queued plus currently running entries, summed over
 * every lane.
 */
export function getTotalQueueSize(): number {
  let total = 0;
  for (const s of lanes.values()) {
    // Running work is counted through the tracked task ids — the same set
    // that resetAllLanes() clears — so the total cannot be inflated by a
    // counter that a reset never touches.
    total += s.queue.length + s.activeTaskIds.size;
  }
  return total;
}
|
||||
@@ -180,6 +190,36 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
return removed;
|
||||
}
|
||||
|
||||
/**
 * Reset all lane runtime state to idle. Used after SIGUSR1 in-process
 * restarts where interrupted tasks' finally blocks may not run, leaving
 * stale active task IDs that permanently block new work from draining.
 *
 * Bumps each lane's generation and clears its active task IDs so that
 * completions from old in-flight tasks are ignored. Queued entries are
 * intentionally preserved — they represent pending user work that should
 * still execute after restart.
 *
 * After resetting, drains any lanes that still have queued entries so
 * preserved work is pumped immediately rather than waiting for a future
 * `enqueueCommandInLane()` call (which may never come).
 */
export function resetAllLanes(): void {
  const lanesToDrain: string[] = [];
  for (const state of lanes.values()) {
    // A new generation makes completeTask() ignore tasks started before the reset.
    state.generation += 1;
    state.activeTaskIds.clear();
    // Clear the guard flag so the drainLane() calls below are not skipped.
    state.draining = false;
    if (state.queue.length > 0) {
      lanesToDrain.push(state.lane);
    }
  }
  // Drain after the full reset pass so all lanes are in a clean state first.
  for (const lane of lanesToDrain) {
    drainLane(lane);
  }
}
|
||||
|
||||
/**
|
||||
* Returns the total number of actively executing tasks across all lanes
|
||||
* (excludes queued-but-not-started entries).
|
||||
@@ -187,7 +227,7 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
export function getActiveTaskCount(): number {
  let total = 0;
  for (const s of lanes.values()) {
    // Count tracked task ids only: this is the set resetAllLanes() clears,
    // so the result drops back to zero after a reset.
    total += s.activeTaskIds.size;
  }
  return total;
}
|
||||
|
||||
18
src/process/restart-recovery.test.ts
Normal file
18
src/process/restart-recovery.test.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createRestartIterationHook } from "./restart-recovery.js";
|
||||
|
||||
describe("restart-recovery", () => {
  // The hook must treat its first invocation as initial startup (no callback,
  // returns false) and every later invocation as a restart (callback fired
  // once per call, returns true).
  it("skips recovery on first iteration and runs on subsequent iterations", () => {
    const onRestart = vi.fn();
    const onIteration = createRestartIterationHook(onRestart);

    expect(onIteration()).toBe(false);
    expect(onRestart).not.toHaveBeenCalled();

    expect(onIteration()).toBe(true);
    expect(onRestart).toHaveBeenCalledTimes(1);

    expect(onIteration()).toBe(true);
    expect(onRestart).toHaveBeenCalledTimes(2);
  });
});
|
||||
16
src/process/restart-recovery.ts
Normal file
16
src/process/restart-recovery.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
/**
 * Returns an iteration hook for in-process restart loops.
 * The first call is considered initial startup and does nothing.
 * Each subsequent call represents a restart iteration and invokes `onRestart`.
 *
 * @param onRestart - Callback run synchronously on every call after the first.
 *   Anything it throws propagates to the caller of the hook.
 * @returns A function that returns `false` on its first call (without invoking
 *   `onRestart`) and `true` on every later call, after `onRestart` has returned.
 */
export function createRestartIterationHook(onRestart: () => void): () => boolean {
  // Per-hook state: each hook returned by this factory tracks its own first call.
  let isFirstIteration = true;
  return () => {
    if (isFirstIteration) {
      isFirstIteration = false;
      return false;
    }
    onRestart();
    return true;
  };
}
|
||||
Reference in New Issue
Block a user