diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 9cdcf18652..9486e199e3 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -6,6 +6,7 @@ import { isGatewaySigusr1RestartExternallyAllowed, } from "../../infra/restart.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { getActiveTaskCount, waitForActiveTasks } from "../../process/command-queue.js"; const gatewayLog = createSubsystemLogger("gateway"); @@ -26,6 +27,9 @@ export async function runGatewayLoop(params: { process.removeListener("SIGUSR1", onSigusr1); }; + const DRAIN_TIMEOUT_MS = 30_000; + const SHUTDOWN_TIMEOUT_MS = 5_000; + const request = (action: GatewayRunSignalAction, signal: string) => { if (shuttingDown) { gatewayLog.info(`received ${signal} during shutdown; ignoring`); @@ -35,14 +39,33 @@ export async function runGatewayLoop(params: { const isRestart = action === "restart"; gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); + // Allow extra time for draining active turns on restart. + const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS; const forceExitTimer = setTimeout(() => { gatewayLog.error("shutdown timed out; exiting without full cleanup"); cleanupSignals(); params.runtime.exit(0); - }, 5000); + }, forceExitMs); void (async () => { try { + // On restart, wait for in-flight agent turns to finish before + // tearing down the server so buffered messages are delivered. + if (isRestart) { + const activeTasks = getActiveTaskCount(); + if (activeTasks > 0) { + gatewayLog.info( + `draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`, + ); + const { drained } = await waitForActiveTasks(DRAIN_TIMEOUT_MS); + if (drained) { + gatewayLog.info("all active tasks drained"); + } else { + gatewayLog.warn("drain timeout reached; proceeding with restart"); + } + } + } + await server?.close({ reason: isRestart ? "gateway restarting" : "gateway stopping", restartExpectedMs: isRestart ? 1500 : null, diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 8ede381da9..d08688347c 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -16,7 +16,14 @@ vi.mock("../logging/diagnostic.js", () => ({ diagnosticLogger: diagnosticMocks.diag, })); -import { enqueueCommand, getQueueSize } from "./command-queue.js"; +import { + enqueueCommand, + enqueueCommandInLane, + getActiveTaskCount, + getQueueSize, + setCommandLaneConcurrency, + waitForActiveTasks, +} from "./command-queue.js"; describe("command queue", () => { beforeEach(() => { @@ -85,4 +92,106 @@ describe("command queue", () => { expect(waited as number).toBeGreaterThanOrEqual(5); expect(queuedAhead).toBe(0); }); + + it("getActiveTaskCount returns count of currently executing tasks", async () => { + let resolve1!: () => void; + const blocker = new Promise((r) => { + resolve1 = r; + }); + + const task = enqueueCommand(async () => { + await blocker; + }); + + // Give the event loop a tick for the task to start. + await new Promise((r) => setTimeout(r, 5)); + expect(getActiveTaskCount()).toBe(1); + + resolve1(); + await task; + expect(getActiveTaskCount()).toBe(0); + }); + + it("waitForActiveTasks resolves immediately when no tasks are active", async () => { + const { drained } = await waitForActiveTasks(1000); + expect(drained).toBe(true); + }); + + it("waitForActiveTasks waits for active tasks to finish", async () => { + let resolve1!: () => void; + const blocker = new Promise((r) => { + resolve1 = r; + }); + + const task = enqueueCommand(async () => { + await blocker; + }); + + // Give the task a tick to start. + await new Promise((r) => setTimeout(r, 5)); + + const drainPromise = waitForActiveTasks(5000); + + // Resolve the blocker after a short delay. + setTimeout(() => resolve1(), 50); + + const { drained } = await drainPromise; + expect(drained).toBe(true); + + await task; + }); + + it("waitForActiveTasks returns drained=false on timeout", async () => { + let resolve1!: () => void; + const blocker = new Promise((r) => { + resolve1 = r; + }); + + const task = enqueueCommand(async () => { + await blocker; + }); + + await new Promise((r) => setTimeout(r, 5)); + + const { drained } = await waitForActiveTasks(50); + expect(drained).toBe(false); + + resolve1(); + await task; + }); + + it("waitForActiveTasks ignores tasks that start after the call", async () => { + const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 2); + + let resolve1!: () => void; + const blocker1 = new Promise((r) => { + resolve1 = r; + }); + let resolve2!: () => void; + const blocker2 = new Promise((r) => { + resolve2 = r; + }); + + const first = enqueueCommandInLane(lane, async () => { + await blocker1; + }); + await new Promise((r) => setTimeout(r, 5)); + + const drainPromise = waitForActiveTasks(2000); + + // Starts after waitForActiveTasks snapshot and should not block drain completion. + const second = enqueueCommandInLane(lane, async () => { + await blocker2; + }); + await new Promise((r) => setTimeout(r, 5)); + expect(getActiveTaskCount()).toBeGreaterThanOrEqual(2); + + resolve1(); + const { drained } = await drainPromise; + expect(drained).toBe(true); + + resolve2(); + await Promise.all([first, second]); + }); }); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index f9f2f0093f..205cb16013 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -19,11 +19,13 @@ type LaneState = { lane: string; queue: QueueEntry[]; active: number; + activeTaskIds: Set; maxConcurrent: number; draining: boolean; }; const lanes = new Map(); +let nextTaskId = 1; function getLaneState(lane: string): LaneState { const existing = lanes.get(lane); @@ -34,6 +36,7 @@ function getLaneState(lane: string): LaneState { lane, queue: [], active: 0, + activeTaskIds: new Set(), maxConcurrent: 1, draining: false, }; @@ -59,12 +62,15 @@ function drainLane(lane: string) { ); } logLaneDequeue(lane, waitedMs, state.queue.length); + const taskId = nextTaskId++; state.active += 1; + state.activeTaskIds.add(taskId); void (async () => { const startTime = Date.now(); try { const result = await entry.task(); state.active -= 1; + state.activeTaskIds.delete(taskId); diag.debug( `lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`, ); @@ -72,6 +78,7 @@ function drainLane(lane: string) { entry.resolve(result); } catch (err) { state.active -= 1; + state.activeTaskIds.delete(taskId); const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-"); if (!isProbeLane) { diag.error( @@ -158,3 +165,67 @@ export function clearCommandLane(lane: string = CommandLane.Main) { state.queue.length = 0; return removed; } + +/** + * Returns the total number of actively executing tasks across all lanes + * (excludes queued-but-not-started entries). + */ +export function getActiveTaskCount(): number { + let total = 0; + for (const s of lanes.values()) { + total += s.active; + } + return total; +} + +/** + * Wait for all currently active tasks across all lanes to finish. + * Polls at a short interval; resolves when no tasks are active or + * when `timeoutMs` elapses (whichever comes first). + * + * New tasks enqueued after this call are ignored — only tasks that are + * already executing are waited on. + */ +export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> { + const POLL_INTERVAL_MS = 250; + const deadline = Date.now() + timeoutMs; + const activeAtStart = new Set(); + for (const state of lanes.values()) { + for (const taskId of state.activeTaskIds) { + activeAtStart.add(taskId); + } + } + + return new Promise((resolve) => { + const check = () => { + if (activeAtStart.size === 0) { + resolve({ drained: true }); + return; + } + + let hasPending = false; + for (const state of lanes.values()) { + for (const taskId of state.activeTaskIds) { + if (activeAtStart.has(taskId)) { + hasPending = true; + break; + } + } + if (hasPending) { + break; + } + } + + if (!hasPending) { + resolve({ drained: true }); + return; + } + if (Date.now() >= deadline) { + resolve({ drained: false }); + return; + } + setTimeout(check, POLL_INTERVAL_MS); + }; + check(); + }); +}