refactor: share restart iteration hook and drain macOS restarts

This commit is contained in:
Gustavo Madeira Santana
2026-02-13 15:25:52 -05:00
parent 5a9d9b3324
commit 676f9ec451
4 changed files with 76 additions and 21 deletions

View File

@@ -11,6 +11,7 @@ import {
resetAllLanes,
waitForActiveTasks,
} from "../../process/command-queue.js";
import { createRestartIterationHook } from "../../process/restart-recovery.js";
const gatewayLog = createSubsystemLogger("gateway");
@@ -115,21 +116,21 @@ export async function runGatewayLoop(params: {
process.on("SIGUSR1", onSigusr1);
try {
const onIteration = createRestartIterationHook(() => {
// After an in-process restart (SIGUSR1), reset command-queue lane state.
// Interrupted tasks from the previous lifecycle may have left `active`
// counts elevated (their finally blocks never ran), permanently blocking
// new work from draining. This must happen here — at the restart
// coordinator level — rather than inside individual subsystem init
// functions, to avoid surprising cross-cutting side effects.
resetAllLanes();
});
// Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required).
// SIGTERM/SIGINT still exit after a graceful shutdown.
// eslint-disable-next-line no-constant-condition
let isFirstIteration = true;
while (true) {
if (!isFirstIteration) {
// After an in-process restart (SIGUSR1), reset command-queue lane state.
// Interrupted tasks from the previous lifecycle may have left `active`
// counts elevated (their finally blocks never ran), permanently blocking
// new work from draining. This must happen here — at the restart
// coordinator level — rather than inside individual subsystem init
// functions, to avoid surprising cross-cutting side effects.
resetAllLanes();
}
isFirstIteration = false;
onIteration();
server = await params.start();
await new Promise<void>((resolve) => {
restartResolver = resolve;

View File

@@ -53,6 +53,7 @@ async function main() {
{ defaultRuntime },
{ enableConsoleCapture, setConsoleTimestampPrefix },
commandQueueMod,
{ createRestartIterationHook },
] = await Promise.all([
import("../config/config.js"),
import("../gateway/server.js"),
@@ -63,6 +64,7 @@ async function main() {
import("../runtime.js"),
import("../logging.js"),
import("../process/command-queue.js"),
import("../process/restart-recovery.js"),
] as const);
enableConsoleCapture();
@@ -134,14 +136,32 @@ async function main() {
`gateway: received ${signal}; ${isRestart ? "restarting" : "shutting down"}`,
);
const DRAIN_TIMEOUT_MS = 30_000;
const SHUTDOWN_TIMEOUT_MS = 5_000;
const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
forceExitTimer = setTimeout(() => {
defaultRuntime.error("gateway: shutdown timed out; exiting without full cleanup");
cleanupSignals();
process.exit(0);
}, 5000);
}, forceExitMs);
void (async () => {
try {
if (isRestart) {
const activeTasks = commandQueueMod.getActiveTaskCount();
if (activeTasks > 0) {
defaultRuntime.log(
`gateway: draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`,
);
const { drained } = await commandQueueMod.waitForActiveTasks(DRAIN_TIMEOUT_MS);
if (drained) {
defaultRuntime.log("gateway: all active tasks drained");
} else {
defaultRuntime.log("gateway: drain timeout reached; proceeding with restart");
}
}
}
await server?.close({
reason: isRestart ? "gateway restarting" : "gateway stopping",
restartExpectedMs: isRestart ? 1500 : null,
@@ -198,17 +218,17 @@ async function main() {
}
throw err;
}
const onIteration = createRestartIterationHook(() => {
// After an in-process restart (SIGUSR1), reset command-queue lane state.
// Interrupted tasks from the previous lifecycle may have left `active`
// counts elevated (their finally blocks never ran), permanently blocking
// new work from draining.
commandQueueMod.resetAllLanes();
});
// eslint-disable-next-line no-constant-condition
let isFirstIteration = true;
while (true) {
if (!isFirstIteration) {
// After an in-process restart (SIGUSR1), reset command-queue lane state.
// Interrupted tasks from the previous lifecycle may have left `active`
// counts elevated (their finally blocks never ran), permanently blocking
// new work from draining.
commandQueueMod.resetAllLanes();
}
isFirstIteration = false;
onIteration();
try {
server = await startGatewayServer(port, { bind });
} catch (err) {

View File

@@ -0,0 +1,18 @@
import { describe, expect, it, vi } from "vitest";
import { createRestartIterationHook } from "./restart-recovery.js";
describe("restart-recovery", () => {
  it("skips recovery on first iteration and runs on subsequent iterations", () => {
    const restartSpy = vi.fn();
    const hook = createRestartIterationHook(restartSpy);

    // Initial startup: the hook reports "not a restart" and leaves the callback alone.
    const initialResult = hook();
    expect(initialResult).toBe(false);
    expect(restartSpy).not.toHaveBeenCalled();

    // First restart: the callback fires exactly once.
    const firstRestartResult = hook();
    expect(firstRestartResult).toBe(true);
    expect(restartSpy).toHaveBeenCalledTimes(1);

    // Second restart: the callback fires again, so the hook is not one-shot.
    const secondRestartResult = hook();
    expect(secondRestartResult).toBe(true);
    expect(restartSpy).toHaveBeenCalledTimes(2);
  });
});

View File

@@ -0,0 +1,16 @@
/**
 * Builds a per-loop hook that tells initial startup apart from restart iterations.
 *
 * Call the returned function once at the top of every loop iteration. Each hook
 * created here keeps its own private "has started" state.
 *
 * @param onRestart - Callback run synchronously on every call after the first.
 *   Any exception it throws propagates to the caller of the hook.
 * @returns A function that returns `false` on its first call (without invoking
 *   `onRestart`) and `true` on every later call (after invoking `onRestart`).
 */
export function createRestartIterationHook(onRestart: () => void): () => boolean {
  let hasStarted = false;
  return () => {
    // Capture the state before flipping it so the very first call reports `false`.
    const isRestart = hasStarted;
    hasStarted = true;
    if (isRestart) {
      onRestart();
    }
    return isRestart;
  };
}