diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 77f746d35c..ec582fdcb8 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -11,6 +11,7 @@ import { resetAllLanes, waitForActiveTasks, } from "../../process/command-queue.js"; +import { createRestartIterationHook } from "../../process/restart-recovery.js"; const gatewayLog = createSubsystemLogger("gateway"); @@ -115,21 +116,21 @@ export async function runGatewayLoop(params: { process.on("SIGUSR1", onSigusr1); try { + const onIteration = createRestartIterationHook(() => { + // After an in-process restart (SIGUSR1), reset command-queue lane state. + // Interrupted tasks from the previous lifecycle may have left `active` + // counts elevated (their finally blocks never ran), permanently blocking + // new work from draining. This must happen here — at the restart + // coordinator level — rather than inside individual subsystem init + // functions, to avoid surprising cross-cutting side effects. + resetAllLanes(); + }); + // Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required). // SIGTERM/SIGINT still exit after a graceful shutdown. // eslint-disable-next-line no-constant-condition - let isFirstIteration = true; while (true) { - if (!isFirstIteration) { - // After an in-process restart (SIGUSR1), reset command-queue lane state. - // Interrupted tasks from the previous lifecycle may have left `active` - // counts elevated (their finally blocks never ran), permanently blocking - // new work from draining. This must happen here — at the restart - // coordinator level — rather than inside individual subsystem init - // functions, to avoid surprising cross-cutting side effects. - resetAllLanes(); - } - isFirstIteration = false; + onIteration(); server = await params.start(); await new Promise((resolve) => { restartResolver = resolve; diff --git a/src/macos/gateway-daemon.ts b/src/macos/gateway-daemon.ts index 908008c99e..38fd5485ff 100644 --- a/src/macos/gateway-daemon.ts +++ b/src/macos/gateway-daemon.ts @@ -53,6 +53,7 @@ async function main() { { defaultRuntime }, { enableConsoleCapture, setConsoleTimestampPrefix }, commandQueueMod, + { createRestartIterationHook }, ] = await Promise.all([ import("../config/config.js"), import("../gateway/server.js"), @@ -63,6 +64,7 @@ async function main() { import("../runtime.js"), import("../logging.js"), import("../process/command-queue.js"), + import("../process/restart-recovery.js"), ] as const); enableConsoleCapture(); @@ -134,14 +136,32 @@ async function main() { `gateway: received ${signal}; ${isRestart ? "restarting" : "shutting down"}`, ); + const DRAIN_TIMEOUT_MS = 30_000; + const SHUTDOWN_TIMEOUT_MS = 5_000; + const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS; forceExitTimer = setTimeout(() => { defaultRuntime.error("gateway: shutdown timed out; exiting without full cleanup"); cleanupSignals(); process.exit(0); - }, 5000); + }, forceExitMs); void (async () => { try { + if (isRestart) { + const activeTasks = commandQueueMod.getActiveTaskCount(); + if (activeTasks > 0) { + defaultRuntime.log( + `gateway: draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`, + ); + const { drained } = await commandQueueMod.waitForActiveTasks(DRAIN_TIMEOUT_MS); + if (drained) { + defaultRuntime.log("gateway: all active tasks drained"); + } else { + defaultRuntime.log("gateway: drain timeout reached; proceeding with restart"); + } + } + } + await server?.close({ reason: isRestart ? "gateway restarting" : "gateway stopping", restartExpectedMs: isRestart ? 1500 : null, @@ -198,17 +218,17 @@ async function main() { } throw err; } + const onIteration = createRestartIterationHook(() => { + // After an in-process restart (SIGUSR1), reset command-queue lane state. + // Interrupted tasks from the previous lifecycle may have left `active` + // counts elevated (their finally blocks never ran), permanently blocking + // new work from draining. + commandQueueMod.resetAllLanes(); + }); + // eslint-disable-next-line no-constant-condition - let isFirstIteration = true; while (true) { - if (!isFirstIteration) { - // After an in-process restart (SIGUSR1), reset command-queue lane state. - // Interrupted tasks from the previous lifecycle may have left `active` - // counts elevated (their finally blocks never ran), permanently blocking - // new work from draining. - commandQueueMod.resetAllLanes(); - } - isFirstIteration = false; + onIteration(); try { server = await startGatewayServer(port, { bind }); } catch (err) { diff --git a/src/process/restart-recovery.test.ts b/src/process/restart-recovery.test.ts new file mode 100644 index 0000000000..5091d7b992 --- /dev/null +++ b/src/process/restart-recovery.test.ts @@ -0,0 +1,18 @@ +import { describe, expect, it, vi } from "vitest"; +import { createRestartIterationHook } from "./restart-recovery.js"; + +describe("restart-recovery", () => { + it("skips recovery on first iteration and runs on subsequent iterations", () => { + const onRestart = vi.fn(); + const onIteration = createRestartIterationHook(onRestart); + + expect(onIteration()).toBe(false); + expect(onRestart).not.toHaveBeenCalled(); + + expect(onIteration()).toBe(true); + expect(onRestart).toHaveBeenCalledTimes(1); + + expect(onIteration()).toBe(true); + expect(onRestart).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/process/restart-recovery.ts b/src/process/restart-recovery.ts new file mode 100644 index 0000000000..2f9818d7f5 --- /dev/null +++ b/src/process/restart-recovery.ts @@ -0,0 +1,16 @@ +/** + * Returns an iteration hook for in-process restart loops. + * The first call is considered initial startup and does nothing. + * Each subsequent call represents a restart iteration and invokes `onRestart`. + */ +export function createRestartIterationHook(onRestart: () => void): () => boolean { + let isFirstIteration = true; + return () => { + if (isFirstIteration) { + isFirstIteration = false; + return false; + } + onRestart(); + return true; + }; +}