fix(gateway): make health monitor checks single-flight
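
Each setInterval tick previously invoked runCheck() unconditionally. When a
restart awaited channel stop/start work for longer than checkIntervalMs, the
next tick could enter runCheck() concurrently and trigger a duplicate
startChannel call for the same account. runCheck() now sets a checkInFlight
flag, skips ticks while a check is still running, and clears the flag in a
finally block so a throwing restart cannot wedge the monitor.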

Peter Steinberger
2026-02-17 23:47:29 +01:00
parent 96f7d35dd7
commit 442b45e54e
2 changed files with 106 additions and 61 deletions


@@ -1,8 +1,8 @@
 import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
-import type { ChannelId } from "../channels/plugins/types.js";
+import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
+import { startChannelHealthMonitor } from "./channel-health-monitor.js";
 import type { ChannelManager, ChannelRuntimeSnapshot } from "./server-channels.js";
-import { startChannelHealthMonitor } from "./channel-health-monitor.js";
 
 function createMockChannelManager(overrides?: Partial<ChannelManager>): ChannelManager {
   return {
@@ -321,6 +321,43 @@ describe("channel-health-monitor", () => {
     monitor.stop();
   });
 
+  it("runs checks single-flight when restart work is still in progress", async () => {
+    let releaseStart: (() => void) | null = null;
+    const startGate = new Promise<void>((resolve) => {
+      releaseStart = resolve;
+    });
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          telegram: {
+            default: {
+              running: false,
+              enabled: true,
+              configured: true,
+              lastError: "stopped",
+            },
+          },
+        }),
+      ),
+      startChannel: vi.fn(async () => {
+        await startGate;
+      }),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 100,
+      startupGraceMs: 0,
+      cooldownCycles: 0,
+    });
+    await vi.advanceTimersByTimeAsync(120);
+    expect(manager.startChannel).toHaveBeenCalledTimes(1);
+    await vi.advanceTimersByTimeAsync(500);
+    expect(manager.startChannel).toHaveBeenCalledTimes(1);
+    releaseStart?.();
+    await Promise.resolve();
+    monitor.stop();
+  });
+
   it("stops cleanly", async () => {
     const manager = createMockChannelManager();
     const monitor = startChannelHealthMonitor({

@@ -1,6 +1,6 @@
 import type { ChannelId } from "../channels/plugins/types.js";
+import { createSubsystemLogger } from "../logging/subsystem.js";
 import type { ChannelManager } from "./server-channels.js";
-import { createSubsystemLogger } from "../logging/subsystem.js";
 
 const log = createSubsystemLogger("gateway/health-monitor");
@@ -64,6 +64,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
   const restartRecords = new Map<string, RestartRecord>();
   const startedAt = Date.now();
   let stopped = false;
+  let checkInFlight = false;
   let timer: ReturnType<typeof setInterval> | null = null;
 
   const rKey = (channelId: string, accountId: string) => `${channelId}:${accountId}`;
@@ -73,74 +74,81 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
   }
 
   async function runCheck() {
-    if (stopped) {
+    if (stopped || checkInFlight) {
       return;
     }
+    checkInFlight = true;
-    const now = Date.now();
-    if (now - startedAt < startupGraceMs) {
-      return;
-    }
-    const snapshot = channelManager.getRuntimeSnapshot();
-    for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) {
-      if (!accounts) {
-        continue;
-      }
-      for (const [accountId, status] of Object.entries(accounts)) {
-        if (!status) {
-          continue;
-        }
-        if (!isManagedAccount(status)) {
-          continue;
-        }
-        if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) {
-          continue;
-        }
-        if (isChannelHealthy(status)) {
-          continue;
-        }
-        const key = rKey(channelId, accountId);
-        const record = restartRecords.get(key) ?? {
-          lastRestartAt: 0,
-          restartsThisHour: [],
-        };
-        if (now - record.lastRestartAt <= cooldownMs) {
-          continue;
-        }
-        pruneOldRestarts(record, now);
-        if (record.restartsThisHour.length >= maxRestartsPerHour) {
-          log.warn?.(
-            `[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit, skipping`,
-          );
-          continue;
-        }
-        const reason = !status.running
-          ? status.reconnectAttempts && status.reconnectAttempts >= 10
-            ? "gave-up"
-            : "stopped"
-          : "stuck";
-        log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`);
-        try {
-          if (status.running) {
-            await channelManager.stopChannel(channelId as ChannelId, accountId);
-          }
-          channelManager.resetRestartAttempts(channelId as ChannelId, accountId);
-          await channelManager.startChannel(channelId as ChannelId, accountId);
-          record.lastRestartAt = now;
-          record.restartsThisHour.push({ at: now });
-          restartRecords.set(key, record);
-        } catch (err) {
-          log.error?.(`[${channelId}:${accountId}] health-monitor: restart failed: ${String(err)}`);
-        }
-      }
-    }
+    try {
+      const now = Date.now();
+      if (now - startedAt < startupGraceMs) {
+        return;
+      }
+      const snapshot = channelManager.getRuntimeSnapshot();
+      for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) {
+        if (!accounts) {
+          continue;
+        }
+        for (const [accountId, status] of Object.entries(accounts)) {
+          if (!status) {
+            continue;
+          }
+          if (!isManagedAccount(status)) {
+            continue;
+          }
+          if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) {
+            continue;
+          }
+          if (isChannelHealthy(status)) {
+            continue;
+          }
+          const key = rKey(channelId, accountId);
+          const record = restartRecords.get(key) ?? {
+            lastRestartAt: 0,
+            restartsThisHour: [],
+          };
+          if (now - record.lastRestartAt <= cooldownMs) {
+            continue;
+          }
+          pruneOldRestarts(record, now);
+          if (record.restartsThisHour.length >= maxRestartsPerHour) {
+            log.warn?.(
+              `[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit, skipping`,
+            );
+            continue;
+          }
+          const reason = !status.running
+            ? status.reconnectAttempts && status.reconnectAttempts >= 10
+              ? "gave-up"
+              : "stopped"
+            : "stuck";
+          log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`);
+          try {
+            if (status.running) {
+              await channelManager.stopChannel(channelId as ChannelId, accountId);
+            }
+            channelManager.resetRestartAttempts(channelId as ChannelId, accountId);
+            await channelManager.startChannel(channelId as ChannelId, accountId);
+            record.lastRestartAt = now;
+            record.restartsThisHour.push({ at: now });
+            restartRecords.set(key, record);
+          } catch (err) {
+            log.error?.(
+              `[${channelId}:${accountId}] health-monitor: restart failed: ${String(err)}`,
+            );
+          }
+        }
+      }
+    } finally {
+      checkInFlight = false;
+    }
   }
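
Taken together, the change is the standard single-flight guard for async work
driven by setInterval. A minimal standalone sketch of the pattern (singleFlight
and task are illustrative names, not identifiers from this commit):

  // Wrap an async task so overlapping timer ticks are dropped, not queued.
  function singleFlight(task: () => Promise<void>): () => void {
    let inFlight = false;
    return () => {
      if (inFlight) {
        return; // previous run is still awaiting async work; skip this tick
      }
      inFlight = true;
      void task().finally(() => {
        inFlight = false; // always clear, even when the task throws
      });
    };
  }

  // Usage sketch: ticks fire every 100ms, yet a 250ms task never overlaps itself.
  const timer = setInterval(
    singleFlight(async () => {
      await new Promise((resolve) => setTimeout(resolve, 250));
    }),
    100,
  );
  // Call clearInterval(timer) to shut down, mirroring monitor.stop().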