diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index 228b7b66e9..d37ad95d48 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -1,8 +1,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { ChannelId } from "../channels/plugins/types.js"; import type { ChannelAccountSnapshot } from "../channels/plugins/types.js"; -import { startChannelHealthMonitor } from "./channel-health-monitor.js"; import type { ChannelManager, ChannelRuntimeSnapshot } from "./server-channels.js"; +import { startChannelHealthMonitor } from "./channel-health-monitor.js"; function createMockChannelManager(overrides?: Partial): ChannelManager { return { @@ -321,6 +321,43 @@ describe("channel-health-monitor", () => { monitor.stop(); }); + it("runs checks single-flight when restart work is still in progress", async () => { + let releaseStart: (() => void) | null = null; + const startGate = new Promise((resolve) => { + releaseStart = resolve; + }); + const manager = createMockChannelManager({ + getRuntimeSnapshot: vi.fn(() => + snapshotWith({ + telegram: { + default: { + running: false, + enabled: true, + configured: true, + lastError: "stopped", + }, + }, + }), + ), + startChannel: vi.fn(async () => { + await startGate; + }), + }); + const monitor = startChannelHealthMonitor({ + channelManager: manager, + checkIntervalMs: 100, + startupGraceMs: 0, + cooldownCycles: 0, + }); + await vi.advanceTimersByTimeAsync(120); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(500); + expect(manager.startChannel).toHaveBeenCalledTimes(1); + releaseStart?.(); + await Promise.resolve(); + monitor.stop(); + }); + it("stops cleanly", async () => { const manager = createMockChannelManager(); const monitor = startChannelHealthMonitor({ diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index 4b26efcec9..4461c4cb91 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -1,6 +1,6 @@ import type { ChannelId } from "../channels/plugins/types.js"; -import { createSubsystemLogger } from "../logging/subsystem.js"; import type { ChannelManager } from "./server-channels.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; const log = createSubsystemLogger("gateway/health-monitor"); @@ -64,6 +64,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann const restartRecords = new Map(); const startedAt = Date.now(); let stopped = false; + let checkInFlight = false; let timer: ReturnType | null = null; const rKey = (channelId: string, accountId: string) => `${channelId}:${accountId}`; @@ -73,74 +74,81 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann } async function runCheck() { - if (stopped) { + if (stopped || checkInFlight) { return; } + checkInFlight = true; - const now = Date.now(); - if (now - startedAt < startupGraceMs) { - return; - } - - const snapshot = channelManager.getRuntimeSnapshot(); - - for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) { - if (!accounts) { - continue; + try { + const now = Date.now(); + if (now - startedAt < startupGraceMs) { + return; } - for (const [accountId, status] of Object.entries(accounts)) { - if (!status) { + + const snapshot = channelManager.getRuntimeSnapshot(); + + for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) { + if (!accounts) { continue; } - if (!isManagedAccount(status)) { - continue; - } - if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) { - continue; - } - if (isChannelHealthy(status)) { - continue; - } - - const key = rKey(channelId, accountId); - const record = restartRecords.get(key) ?? { - lastRestartAt: 0, - restartsThisHour: [], - }; - - if (now - record.lastRestartAt <= cooldownMs) { - continue; - } - - pruneOldRestarts(record, now); - if (record.restartsThisHour.length >= maxRestartsPerHour) { - log.warn?.( - `[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit, skipping`, - ); - continue; - } - - const reason = !status.running - ? status.reconnectAttempts && status.reconnectAttempts >= 10 - ? "gave-up" - : "stopped" - : "stuck"; - - log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`); - - try { - if (status.running) { - await channelManager.stopChannel(channelId as ChannelId, accountId); + for (const [accountId, status] of Object.entries(accounts)) { + if (!status) { + continue; + } + if (!isManagedAccount(status)) { + continue; + } + if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) { + continue; + } + if (isChannelHealthy(status)) { + continue; + } + + const key = rKey(channelId, accountId); + const record = restartRecords.get(key) ?? { + lastRestartAt: 0, + restartsThisHour: [], + }; + + if (now - record.lastRestartAt <= cooldownMs) { + continue; + } + + pruneOldRestarts(record, now); + if (record.restartsThisHour.length >= maxRestartsPerHour) { + log.warn?.( + `[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit, skipping`, + ); + continue; + } + + const reason = !status.running + ? status.reconnectAttempts && status.reconnectAttempts >= 10 + ? "gave-up" + : "stopped" + : "stuck"; + + log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`); + + try { + if (status.running) { + await channelManager.stopChannel(channelId as ChannelId, accountId); + } + channelManager.resetRestartAttempts(channelId as ChannelId, accountId); + await channelManager.startChannel(channelId as ChannelId, accountId); + record.lastRestartAt = now; + record.restartsThisHour.push({ at: now }); + restartRecords.set(key, record); + } catch (err) { + log.error?.( + `[${channelId}:${accountId}] health-monitor: restart failed: ${String(err)}`, + ); } - channelManager.resetRestartAttempts(channelId as ChannelId, accountId); - await channelManager.startChannel(channelId as ChannelId, accountId); - record.lastRestartAt = now; - record.restartsThisHour.push({ at: now }); - restartRecords.set(key, record); - } catch (err) { - log.error?.(`[${channelId}:${accountId}] health-monitor: restart failed: ${String(err)}`); } } + } finally { + checkInFlight = false; } }