refactor: unify gateway restart deferral and dispatcher cleanup

This commit is contained in:
Peter Steinberger
2026-02-14 00:38:10 +01:00
parent 51296e770c
commit ad57e561c6
10 changed files with 337 additions and 155 deletions

View File

@@ -0,0 +1,61 @@
import { describe, expect, it, vi } from "vitest";
import type { ReplyDispatcher } from "./reply/reply-dispatcher.js";
import { withReplyDispatcher } from "./dispatch.js";
/**
 * Builds a stub ReplyDispatcher for the tests below.
 *
 * Every send method reports success, the queue counts are always zero, and the
 * two lifecycle methods append their own name to `record` so a test can assert
 * on the order in which they were invoked.
 */
function createDispatcher(record: string[]): ReplyDispatcher {
  const accept = () => true;
  const stub: ReplyDispatcher = {
    sendToolResult: accept,
    sendBlockReply: accept,
    sendFinalReply: accept,
    getQueuedCounts() {
      return { tool: 0, block: 0, final: 0 };
    },
    markComplete() {
      record.push("markComplete");
    },
    async waitForIdle() {
      record.push("waitForIdle");
    },
  };
  return stub;
}
// Both the success and the failure path must finish with the same cleanup
// sequence: markComplete -> waitForIdle -> onSettled.
describe("withReplyDispatcher", () => {
  const expectedSequence = ["run", "markComplete", "waitForIdle", "onSettled"];

  it("always marks complete and waits for idle after success", async () => {
    const calls: string[] = [];
    const run = async () => {
      calls.push("run");
      return "ok";
    };
    const onSettled = () => {
      calls.push("onSettled");
    };

    const result = await withReplyDispatcher({
      dispatcher: createDispatcher(calls),
      run,
      onSettled,
    });

    expect(result).toBe("ok");
    expect(calls).toEqual(expectedSequence);
  });

  it("still drains dispatcher after run throws", async () => {
    const calls: string[] = [];
    const onSettled = vi.fn(() => {
      calls.push("onSettled");
    });
    const failingRun = async () => {
      calls.push("run");
      throw new Error("boom");
    };

    const outcome = withReplyDispatcher({
      dispatcher: createDispatcher(calls),
      run: failingRun,
      onSettled,
    });

    // The original error must surface even though cleanup ran afterwards.
    await expect(outcome).rejects.toThrow("boom");
    expect(onSettled).toHaveBeenCalledTimes(1);
    expect(calls).toEqual(expectedSequence);
  });
});

View File

@@ -14,6 +14,24 @@ import {
export type DispatchInboundResult = DispatchFromConfigResult;
/**
 * Runs `params.run()` and then always drains the dispatcher, whether the run
 * resolved or rejected.
 *
 * Cleanup order on every exit path:
 *   1. `dispatcher.markComplete()`
 *   2. `await dispatcher.waitForIdle()`
 *   3. `await onSettled?.()` (runs even if waiting for idle rejects)
 *
 * @returns whatever `run` resolved with.
 * @throws the rejection from `run`, unless a cleanup step itself throws, in
 *   which case the cleanup error is what propagates.
 */
export async function withReplyDispatcher<T>(params: {
  dispatcher: ReplyDispatcher;
  run: () => Promise<T>;
  onSettled?: () => void | Promise<void>;
}): Promise<T> {
  // Releases the dispatcher reservation, waits for queued replies to flush,
  // and finally notifies the caller that everything has settled.
  const drain = async (): Promise<void> => {
    params.dispatcher.markComplete();
    try {
      await params.dispatcher.waitForIdle();
    } finally {
      await params.onSettled?.();
    }
  };

  try {
    return await params.run();
  } finally {
    await drain();
  }
}
export async function dispatchInboundMessage(params: {
ctx: MsgContext | FinalizedMsgContext;
cfg: OpenClawConfig;
@@ -41,20 +59,23 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping(
params.dispatcherOptions,
);
const result = await dispatchInboundMessage({
ctx: params.ctx,
cfg: params.cfg,
return await withReplyDispatcher({
dispatcher,
replyResolver: params.replyResolver,
replyOptions: {
...params.replyOptions,
...replyOptions,
run: async () =>
dispatchInboundMessage({
ctx: params.ctx,
cfg: params.cfg,
dispatcher,
replyResolver: params.replyResolver,
replyOptions: {
...params.replyOptions,
...replyOptions,
},
}),
onSettled: () => {
markDispatchIdle();
},
});
markDispatchIdle();
return result;
}
export async function dispatchInboundMessageWithDispatcher(params: {
@@ -65,13 +86,15 @@ export async function dispatchInboundMessageWithDispatcher(params: {
replyResolver?: typeof import("./reply.js").getReplyFromConfig;
}): Promise<DispatchInboundResult> {
const dispatcher = createReplyDispatcher(params.dispatcherOptions);
const result = await dispatchInboundMessage({
ctx: params.ctx,
cfg: params.cfg,
return await withReplyDispatcher({
dispatcher,
replyResolver: params.replyResolver,
replyOptions: params.replyOptions,
run: async () =>
dispatchInboundMessage({
ctx: params.ctx,
cfg: params.cfg,
dispatcher,
replyResolver: params.replyResolver,
replyOptions: params.replyOptions,
}),
});
await dispatcher.waitForIdle();
return result;
}

View File

@@ -5,6 +5,7 @@ const acquireGatewayLock = vi.fn(async () => ({
}));
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
const markGatewaySigusr1RestartHandled = vi.fn();
const getActiveTaskCount = vi.fn(() => 0);
const waitForActiveTasks = vi.fn(async () => ({ drained: true }));
const resetAllLanes = vi.fn();
@@ -22,6 +23,7 @@ vi.mock("../../infra/gateway-lock.js", () => ({
vi.mock("../../infra/restart.js", () => ({
consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(),
isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(),
markGatewaySigusr1RestartHandled: () => markGatewaySigusr1RestartHandled(),
}));
vi.mock("../../process/command-queue.js", () => ({
@@ -100,6 +102,7 @@ describe("runGatewayLoop", () => {
reason: "gateway restarting",
restartExpectedMs: 1500,
});
expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(1);
expect(resetAllLanes).toHaveBeenCalledTimes(1);
process.emit("SIGUSR1");
@@ -109,6 +112,7 @@ describe("runGatewayLoop", () => {
reason: "gateway restarting",
restartExpectedMs: 1500,
});
expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(2);
expect(resetAllLanes).toHaveBeenCalledTimes(2);
} finally {
removeNewSignalListeners("SIGTERM", beforeSigterm);

View File

@@ -4,6 +4,7 @@ import { acquireGatewayLock } from "../../infra/gateway-lock.js";
import {
consumeGatewaySigusr1RestartAuthorization,
isGatewaySigusr1RestartExternallyAllowed,
markGatewaySigusr1RestartHandled,
} from "../../infra/restart.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import {
@@ -108,6 +109,7 @@ export async function runGatewayLoop(params: {
);
return;
}
markGatewaySigusr1RestartHandled();
request("restart", "SIGUSR1");
};

View File

@@ -6,7 +6,7 @@ import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { resolveThinkingDefault } from "../../agents/model-selection.js";
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import { dispatchInboundMessage, withReplyDispatcher } from "../../auto-reply/dispatch.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
import { resolveSessionFilePath } from "../../config/sessions.js";
@@ -524,36 +524,40 @@ export const chatHandlers: GatewayRequestHandlers = {
});
let agentRunStarted = false;
void dispatchInboundMessage({
ctx,
cfg,
void withReplyDispatcher({
dispatcher,
replyOptions: {
runId: clientRunId,
abortSignal: abortController.signal,
images: parsedImages.length > 0 ? parsedImages : undefined,
disableBlockStreaming: true,
onAgentRunStart: (runId) => {
agentRunStarted = true;
const connId = typeof client?.connId === "string" ? client.connId : undefined;
const wantsToolEvents = hasGatewayClientCap(
client?.connect?.caps,
GATEWAY_CLIENT_CAPS.TOOL_EVENTS,
);
if (connId && wantsToolEvents) {
context.registerToolEventRecipient(runId, connId);
// Register for any other active runs *in the same session* so
// late-joining clients (e.g. page refresh mid-response) receive
// in-progress tool events without leaking cross-session data.
for (const [activeRunId, active] of context.chatAbortControllers) {
if (activeRunId !== runId && active.sessionKey === p.sessionKey) {
context.registerToolEventRecipient(activeRunId, connId);
run: () =>
dispatchInboundMessage({
ctx,
cfg,
dispatcher,
replyOptions: {
runId: clientRunId,
abortSignal: abortController.signal,
images: parsedImages.length > 0 ? parsedImages : undefined,
disableBlockStreaming: true,
onAgentRunStart: (runId) => {
agentRunStarted = true;
const connId = typeof client?.connId === "string" ? client.connId : undefined;
const wantsToolEvents = hasGatewayClientCap(
client?.connect?.caps,
GATEWAY_CLIENT_CAPS.TOOL_EVENTS,
);
if (connId && wantsToolEvents) {
context.registerToolEventRecipient(runId, connId);
// Register for any other active runs *in the same session* so
// late-joining clients (e.g. page refresh mid-response) receive
// in-progress tool events without leaking cross-session data.
for (const [activeRunId, active] of context.chatAbortControllers) {
if (activeRunId !== runId && active.sessionKey === p.sessionKey) {
context.registerToolEventRecipient(activeRunId, connId);
}
}
}
}
}
},
onModelSelected,
},
},
onModelSelected,
},
}),
})
.then(() => {
if (!agentRunStarted) {

View File

@@ -8,7 +8,11 @@ import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../conf
import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js";
import { isTruthyEnvValue } from "../infra/env.js";
import { resetDirectoryCache } from "../infra/outbound/target-resolver.js";
import { emitGatewayRestart, setGatewaySigusr1RestartPolicy } from "../infra/restart.js";
import {
deferGatewayRestartUntilIdle,
emitGatewayRestart,
setGatewaySigusr1RestartPolicy,
} from "../infra/restart.js";
import { setCommandLaneConcurrency, getTotalQueueSize } from "../process/command-queue.js";
import { CommandLane } from "../process/lanes.js";
import { resolveHooksConfig } from "./hooks.js";
@@ -155,13 +159,33 @@ export function createGatewayReloadHandlers(params: {
return;
}
// Check if there are active operations (commands in queue, pending replies, or embedded runs)
const queueSize = getTotalQueueSize();
const pendingReplies = getTotalPendingReplies();
const embeddedRuns = getActiveEmbeddedRunCount();
const totalActive = queueSize + pendingReplies + embeddedRuns;
// Takes a fresh snapshot of the work that should hold a restart back: queued
// commands, pending replies and embedded runs, plus their sum.
const getActiveCounts = () => {
  const snapshot = {
    queueSize: getTotalQueueSize(),
    pendingReplies: getTotalPendingReplies(),
    embeddedRuns: getActiveEmbeddedRunCount(),
  };
  return {
    ...snapshot,
    totalActive: snapshot.queueSize + snapshot.pendingReplies + snapshot.embeddedRuns,
  };
};
// Renders the non-zero counters as human-readable fragments for log messages,
// always in the order: operations, replies, embedded runs.
const formatActiveDetails = (counts: ReturnType<typeof getActiveCounts>) => {
  const labelled: Array<[number, string]> = [
    [counts.queueSize, "operation(s)"],
    [counts.pendingReplies, "reply(ies)"],
    [counts.embeddedRuns, "embedded run(s)"],
  ];
  return labelled
    .filter(([amount]) => amount > 0)
    .map(([amount, label]) => `${amount} ${label}`);
};
const active = getActiveCounts();
if (totalActive > 0) {
if (active.totalActive > 0) {
// Avoid spinning up duplicate polling loops from repeated config changes.
if (restartPending) {
params.logReload.info(
@@ -170,63 +194,40 @@ export function createGatewayReloadHandlers(params: {
return;
}
restartPending = true;
const details = [];
if (queueSize > 0) {
details.push(`${queueSize} queued operation(s)`);
}
if (pendingReplies > 0) {
details.push(`${pendingReplies} pending reply(ies)`);
}
if (embeddedRuns > 0) {
details.push(`${embeddedRuns} embedded run(s)`);
}
const initialDetails = formatActiveDetails(active);
params.logReload.warn(
`config change requires gateway restart (${reasons}) — deferring until ${details.join(", ")} complete`,
`config change requires gateway restart (${reasons}) — deferring until ${initialDetails.join(", ")} complete`,
);
// Wait for all operations and replies to complete before restarting (max 30 seconds)
const maxWaitMs = 30_000;
const checkIntervalMs = 500;
const startTime = Date.now();
const checkAndRestart = () => {
const currentQueueSize = getTotalQueueSize();
const currentPendingReplies = getTotalPendingReplies();
const currentEmbeddedRuns = getActiveEmbeddedRunCount();
const currentTotalActive = currentQueueSize + currentPendingReplies + currentEmbeddedRuns;
const elapsed = Date.now() - startTime;
if (currentTotalActive === 0) {
restartPending = false;
params.logReload.info("all operations and replies completed; restarting gateway now");
emitGatewayRestart();
} else if (elapsed >= maxWaitMs) {
const remainingDetails = [];
if (currentQueueSize > 0) {
remainingDetails.push(`${currentQueueSize} operation(s)`);
}
if (currentPendingReplies > 0) {
remainingDetails.push(`${currentPendingReplies} reply(ies)`);
}
if (currentEmbeddedRuns > 0) {
remainingDetails.push(`${currentEmbeddedRuns} embedded run(s)`);
}
restartPending = false;
params.logReload.warn(
`restart timeout after ${elapsed}ms with ${remainingDetails.join(", ")} still active; restarting anyway`,
);
emitGatewayRestart();
} else {
// Check again soon
setTimeout(checkAndRestart, checkIntervalMs);
}
};
setTimeout(checkAndRestart, checkIntervalMs);
deferGatewayRestartUntilIdle({
getPendingCount: () => getActiveCounts().totalActive,
hooks: {
onReady: () => {
restartPending = false;
params.logReload.info("all operations and replies completed; restarting gateway now");
},
onTimeout: (_pending, elapsedMs) => {
const remaining = formatActiveDetails(getActiveCounts());
restartPending = false;
params.logReload.warn(
`restart timeout after ${elapsedMs}ms with ${remaining.join(", ")} still active; restarting anyway`,
);
},
onCheckError: (err) => {
restartPending = false;
params.logReload.warn(
`restart deferral check failed (${String(err)}); restarting gateway now`,
);
},
},
});
} else {
// No active operations or pending replies, restart immediately
params.logReload.warn(`config change requires gateway restart (${reasons})`);
emitGatewayRestart();
const emitted = emitGatewayRestart();
if (!emitted) {
params.logReload.info("gateway restart already scheduled; skipping duplicate signal");
}
}
};

View File

@@ -3,7 +3,7 @@ import type { IMessagePayload, MonitorIMessageOpts } from "./types.js";
import { resolveHumanDelayConfig } from "../../agents/identity.js";
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
import { hasControlCommand } from "../../auto-reply/command-detection.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import { dispatchInboundMessage, withReplyDispatcher } from "../../auto-reply/dispatch.js";
import {
formatInboundEnvelope,
formatInboundFromLabel,
@@ -647,17 +647,21 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
},
});
const { queuedFinal } = await dispatchInboundMessage({
ctx: ctxPayload,
cfg,
const { queuedFinal } = await withReplyDispatcher({
dispatcher,
replyOptions: {
disableBlockStreaming:
typeof accountInfo.config.blockStreaming === "boolean"
? !accountInfo.config.blockStreaming
: undefined,
onModelSelected,
},
run: () =>
dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
disableBlockStreaming:
typeof accountInfo.config.blockStreaming === "boolean"
? !accountInfo.config.blockStreaming
: undefined,
onModelSelected,
},
}),
});
if (!queuedFinal) {

View File

@@ -6,7 +6,9 @@ import { ensureBinary } from "./binaries.js";
import {
__testing,
consumeGatewaySigusr1RestartAuthorization,
emitGatewayRestart,
isGatewaySigusr1RestartExternallyAllowed,
markGatewaySigusr1RestartHandled,
scheduleGatewaySigusr1Restart,
setGatewaySigusr1RestartPolicy,
setPreRestartDeferralCheck,
@@ -100,6 +102,25 @@ describe("infra runtime", () => {
setGatewaySigusr1RestartPolicy({ allowExternal: true });
expect(isGatewaySigusr1RestartExternallyAllowed()).toBe(true);
});
// Verifies the emit guard: a second emitGatewayRestart() is rejected until the
// run loop acknowledges the first one via markGatewaySigusr1RestartHandled().
it("suppresses duplicate emit until the restart cycle is marked handled", () => {
  // Spy on process.emit so the number of SIGUSR1 emissions can be counted.
  const emitSpy = vi.spyOn(process, "emit");
  // A no-op listener so the emitted SIGUSR1 has a handler while the test runs.
  const handler = () => {};
  process.on("SIGUSR1", handler);
  try {
    expect(emitGatewayRestart()).toBe(true);
    // Still unacknowledged: the duplicate request must be suppressed.
    expect(emitGatewayRestart()).toBe(false);
    expect(consumeGatewaySigusr1RestartAuthorization()).toBe(true);
    markGatewaySigusr1RestartHandled();
    // Cycle acknowledged: a fresh restart may be emitted again.
    expect(emitGatewayRestart()).toBe(true);
    const sigusr1Emits = emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1");
    expect(sigusr1Emits.length).toBe(2);
  } finally {
    process.removeListener("SIGUSR1", handler);
    // Put the real process.emit back so the spy does not leak into later tests.
    emitSpy.mockRestore();
  }
});
});
describe("pre-restart deferral check", () => {

View File

@@ -13,12 +13,20 @@ export type RestartAttempt = {
const SPAWN_TIMEOUT_MS = 2000;
const SIGUSR1_AUTH_GRACE_MS = 5000;
const DEFAULT_DEFERRAL_POLL_MS = 500;
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 30_000;
let sigusr1AuthorizedCount = 0;
let sigusr1AuthorizedUntil = 0;
let sigusr1ExternalAllowed = false;
let preRestartCheck: (() => number) | null = null;
let sigusr1Emitted = false;
let restartCycleToken = 0;
let emittedRestartToken = 0;
let consumedRestartToken = 0;
/**
 * Reports whether the most recently emitted restart cycle is still waiting to
 * be marked as handled, i.e. the consumed token lags behind the emitted one.
 */
function hasUnconsumedRestartSignal(): boolean {
  return consumedRestartToken < emittedRestartToken;
}
/**
* Register a callback that scheduleGatewaySigusr1Restart checks before emitting SIGUSR1.
@@ -35,10 +43,11 @@ export function setPreRestartDeferralCheck(fn: () => number): void {
* to ensure only one restart fires.
*/
export function emitGatewayRestart(): boolean {
if (sigusr1Emitted) {
if (hasUnconsumedRestartSignal()) {
return false;
}
sigusr1Emitted = true;
const cycleToken = ++restartCycleToken;
emittedRestartToken = cycleToken;
authorizeGatewaySigusr1Restart();
try {
if (process.listenerCount("SIGUSR1") > 0) {
@@ -47,7 +56,9 @@ export function emitGatewayRestart(): boolean {
process.kill(process.pid, "SIGUSR1");
}
} catch {
/* ignore */
// Roll back the cycle marker so future restart requests can still proceed.
emittedRestartToken = consumedRestartToken;
return false;
}
return true;
}
@@ -85,10 +96,6 @@ export function consumeGatewaySigusr1RestartAuthorization(): boolean {
if (sigusr1AuthorizedCount <= 0) {
return false;
}
// Reset the emission guard so the next restart cycle can fire.
// The run loop re-enters startGatewayServer() after close(), which
// re-registers setPreRestartDeferralCheck and can schedule new restarts.
sigusr1Emitted = false;
sigusr1AuthorizedCount -= 1;
if (sigusr1AuthorizedCount <= 0) {
sigusr1AuthorizedUntil = 0;
@@ -96,6 +103,80 @@ export function consumeGatewaySigusr1RestartAuthorization(): boolean {
return true;
}
/**
 * Acknowledge the in-flight SIGUSR1 restart cycle on behalf of the run loop.
 *
 * Advances the consumed token to the emitted token so that a later
 * emitGatewayRestart() is no longer treated as a duplicate. The cycle state is
 * advanced here explicitly rather than as a side effect of
 * consumeGatewaySigusr1RestartAuthorization(). Does nothing when no emitted
 * signal is awaiting acknowledgement.
 */
export function markGatewaySigusr1RestartHandled(): void {
  if (!hasUnconsumedRestartSignal()) {
    return;
  }
  consumedRestartToken = emittedRestartToken;
}
/**
 * Optional observer callbacks invoked by deferGatewayRestartUntilIdle() as it
 * moves through its states. Every hook is optional.
 */
export type RestartDeferralHooks = {
/** Called once with the initial pending count when the first check finds work (> 0), before polling starts. */
onDeferring?: (pending: number) => void;
/** Called when a check reports no pending work (<= 0), right before the restart is emitted. */
onReady?: () => void;
/** Called with the latest pending count and the elapsed milliseconds once the maximum wait is reached, right before the restart is emitted. */
onTimeout?: (pending: number, elapsedMs: number) => void;
/** Called with the value thrown by getPendingCount(), right before the restart is emitted. */
onCheckError?: (err: unknown) => void;
};
/**
 * Poll pending work until it drains (or times out), then emit one restart signal.
 * Shared by both the direct RPC restart path and the config watcher path.
 *
 * The first check runs synchronously. If nothing is pending, or the check
 * throws, the restart is emitted before this function returns. Otherwise the
 * pending count is re-checked every `pollMs` until it reaches zero, the check
 * throws, or `maxWaitMs` has elapsed; each of those outcomes emits the restart.
 *
 * Hooks are observers only. A hook that throws still has its error propagated,
 * but it can never prevent the restart from being emitted or the polling loop
 * from starting.
 *
 * @param opts.getPendingCount Returns the amount of work still in flight; a
 *   value <= 0 means idle.
 * @param opts.hooks Optional callbacks for each state transition.
 * @param opts.pollMs Poll interval in ms (floored, minimum 10). Defaults to
 *   DEFAULT_DEFERRAL_POLL_MS when omitted or NaN.
 * @param opts.maxWaitMs Upper bound on the wait in ms (floored, never below
 *   the poll interval). Defaults to DEFAULT_DEFERRAL_MAX_WAIT_MS when omitted
 *   or NaN.
 */
export function deferGatewayRestartUntilIdle(opts: {
  getPendingCount: () => number;
  hooks?: RestartDeferralHooks;
  pollMs?: number;
  maxWaitMs?: number;
}): void {
  // Math.max() returns NaN as soon as one argument is NaN, and
  // `elapsedMs >= NaN` is always false, so a NaN option would disable the
  // timeout entirely. Treat NaN like "not provided".
  const resolveMs = (value: number | undefined, fallback: number): number =>
    value == null || Number.isNaN(value) ? fallback : value;
  const pollMs = Math.max(10, Math.floor(resolveMs(opts.pollMs, DEFAULT_DEFERRAL_POLL_MS)));
  const maxWaitMs = Math.max(
    pollMs,
    Math.floor(resolveMs(opts.maxWaitMs, DEFAULT_DEFERRAL_MAX_WAIT_MS)),
  );

  // Run an observer hook, then emit the restart even if the hook threw.
  const restartAfter = (notify: () => void): void => {
    try {
      notify();
    } finally {
      emitGatewayRestart();
    }
  };

  let pending: number;
  try {
    pending = opts.getPendingCount();
  } catch (err) {
    // A broken check must not block the restart.
    restartAfter(() => opts.hooks?.onCheckError?.(err));
    return;
  }
  if (pending <= 0) {
    restartAfter(() => opts.hooks?.onReady?.());
    return;
  }

  const startPolling = (): void => {
    const startedAt = Date.now();
    const poll = setInterval(() => {
      let current: number;
      try {
        current = opts.getPendingCount();
      } catch (err) {
        clearInterval(poll);
        restartAfter(() => opts.hooks?.onCheckError?.(err));
        return;
      }
      if (current <= 0) {
        clearInterval(poll);
        restartAfter(() => opts.hooks?.onReady?.());
        return;
      }
      const elapsedMs = Date.now() - startedAt;
      if (elapsedMs >= maxWaitMs) {
        // Give up waiting; restart with work still in flight.
        clearInterval(poll);
        restartAfter(() => opts.hooks?.onTimeout?.(current, elapsedMs));
      }
    }, pollMs);
  };

  try {
    opts.hooks?.onDeferring?.(pending);
  } finally {
    // Polling must start even when the onDeferring observer throws.
    startPolling();
  }
}
function formatSpawnDetail(result: {
error?: unknown;
status?: number | null;
@@ -227,40 +308,14 @@ export function scheduleGatewaySigusr1Restart(opts?: {
typeof opts?.reason === "string" && opts.reason.trim()
? opts.reason.trim().slice(0, 200)
: undefined;
const DEFERRAL_POLL_MS = 500;
const DEFERRAL_MAX_WAIT_MS = 30_000;
setTimeout(() => {
if (!preRestartCheck) {
const pendingCheck = preRestartCheck;
if (!pendingCheck) {
emitGatewayRestart();
return;
}
let pending: number;
try {
pending = preRestartCheck();
} catch {
emitGatewayRestart();
return;
}
if (pending <= 0) {
emitGatewayRestart();
return;
}
// Poll until pending work drains or timeout
let waited = 0;
const poll = setInterval(() => {
waited += DEFERRAL_POLL_MS;
let current: number;
try {
current = preRestartCheck!();
} catch {
current = 0;
}
if (current <= 0 || waited >= DEFERRAL_MAX_WAIT_MS) {
clearInterval(poll);
emitGatewayRestart();
}
}, DEFERRAL_POLL_MS);
deferGatewayRestartUntilIdle({ getPendingCount: pendingCheck });
}, delayMs);
return {
ok: true,
@@ -278,6 +333,8 @@ export const __testing = {
sigusr1AuthorizedUntil = 0;
sigusr1ExternalAllowed = false;
preRestartCheck = null;
sigusr1Emitted = false;
restartCycleToken = 0;
emittedRestartToken = 0;
consumedRestartToken = 0;
},
};

View File

@@ -49,7 +49,11 @@ async function main() {
{ setGatewayWsLogStyle },
{ setVerbose },
{ acquireGatewayLock, GatewayLockError },
{ consumeGatewaySigusr1RestartAuthorization, isGatewaySigusr1RestartExternallyAllowed },
{
consumeGatewaySigusr1RestartAuthorization,
isGatewaySigusr1RestartExternallyAllowed,
markGatewaySigusr1RestartHandled,
},
{ defaultRuntime },
{ enableConsoleCapture, setConsoleTimestampPrefix },
commandQueueMod,
@@ -201,6 +205,7 @@ async function main() {
);
return;
}
markGatewaySigusr1RestartHandled();
request("restart", "SIGUSR1");
};