From 1f850374f6e7bc8e178f548bddb8095f32ef2f9a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 17 Feb 2026 03:26:13 +0100 Subject: [PATCH] fix(gateway): harden channel health monitor recovery --- CHANGELOG.md | 1 + src/config/config-misc.test.ts | 23 +++ src/config/zod-schema.ts | 1 + src/gateway/channel-health-monitor.test.ts | 25 ++- src/gateway/channel-health-monitor.ts | 10 +- src/gateway/server-channels.test.ts | 168 +++++++++++++++++++++ src/gateway/server-channels.ts | 72 +++++++-- 7 files changed, 282 insertions(+), 18 deletions(-) create mode 100644 src/gateway/server-channels.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3749d93a61..fe83be615e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Gateway/Channels: wire `gateway.channelHealthCheckMinutes` into strict config validation, treat implicit account status as managed for health checks, and harden channel auto-restart flow (preserve restart-attempt caps across crash loops, propagate enabled/configured runtime flags, and stop pending restart backoff after manual stop). Thanks @steipete. - UI/Usage: replace lingering undefined `var(--text-muted)` usage with `var(--muted)` in usage date-range and chart styles to keep muted text visible across themes. (#17975) Thanks @jogelin. - UI/Usage: preserve selected-range totals when timeline data is downsampled by bucket-aggregating timeseries points (instead of dropping intermediate points), so filtered tokens/cost stay accurate. (#17959) Thanks @jogelin. - Mattermost: harden reaction handling by requiring an explicit boolean `remove` flag and routing reaction websocket events to the reaction handler, preventing string `"true"` values from being treated as removes and avoiding double-processing of reaction events as posts. (#18608) Thanks @echo931. diff --git a/src/config/config-misc.test.ts b/src/config/config-misc.test.ts index 4f6eaf41bd..e2ad2046dd 100644 --- a/src/config/config-misc.test.ts +++ b/src/config/config-misc.test.ts @@ -153,6 +153,29 @@ describe("gateway.tools config", () => { }); }); +describe("gateway.channelHealthCheckMinutes", () => { + it("accepts zero to disable monitor", () => { + const res = validateConfigObject({ + gateway: { + channelHealthCheckMinutes: 0, + }, + }); + expect(res.ok).toBe(true); + }); + + it("rejects negative intervals", () => { + const res = validateConfigObject({ + gateway: { + channelHealthCheckMinutes: -1, + }, + }); + expect(res.ok).toBe(false); + if (!res.ok) { + expect(res.issues[0]?.path).toBe("gateway.channelHealthCheckMinutes"); + } + }); +}); + describe("cron webhook schema", () => { it("accepts cron.webhookToken and legacy cron.webhook", () => { const res = OpenClawSchema.safeParse({ diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index f767437fad..9ca6f14b71 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -442,6 +442,7 @@ export const OpenClawSchema = z }) .strict() .optional(), + channelHealthCheckMinutes: z.number().int().min(0).optional(), tailscale: z .object({ mode: z.union([z.literal("off"), z.literal("serve"), z.literal("funnel")]).optional(), diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index c6ae340d6d..730a219f02 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -1,8 +1,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { ChannelId } from "../channels/plugins/types.js"; import type { ChannelAccountSnapshot } from "../channels/plugins/types.js"; -import { startChannelHealthMonitor } from "./channel-health-monitor.js"; import type { ChannelManager, ChannelRuntimeSnapshot } from "./server-channels.js"; +import { startChannelHealthMonitor } from "./channel-health-monitor.js"; function createMockChannelManager(overrides?: Partial): ChannelManager { return { @@ -236,6 +236,29 @@ describe("channel-health-monitor", () => { monitor.stop(); }); + it("treats missing enabled/configured flags as managed accounts", async () => { + const manager = createMockChannelManager({ + getRuntimeSnapshot: vi.fn(() => + snapshotWith({ + telegram: { + default: { + running: false, + lastError: "polling stopped unexpectedly", + }, + }, + }), + ), + }); + const monitor = startChannelHealthMonitor({ + channelManager: manager, + checkIntervalMs: 5_000, + startupGraceMs: 0, + }); + await vi.advanceTimersByTimeAsync(5_500); + expect(manager.startChannel).toHaveBeenCalledWith("telegram", "default"); + monitor.stop(); + }); + it("applies cooldown — skips recently restarted channels for 2 cycles", async () => { const manager = createMockChannelManager({ getRuntimeSnapshot: vi.fn(() => diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index 87d5cc8253..8349df0ed1 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -1,6 +1,6 @@ import type { ChannelId } from "../channels/plugins/types.js"; -import { createSubsystemLogger } from "../logging/subsystem.js"; import type { ChannelManager } from "./server-channels.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; const log = createSubsystemLogger("gateway/health-monitor"); @@ -28,13 +28,17 @@ type RestartRecord = { restartsThisHour: { at: number }[]; }; +function isManagedAccount(snapshot: { enabled?: boolean; configured?: boolean }): boolean { + return snapshot.enabled !== false && snapshot.configured !== false; +} + function isChannelHealthy(snapshot: { running?: boolean; connected?: boolean; enabled?: boolean; configured?: boolean; }): boolean { - if (!snapshot.enabled || !snapshot.configured) { + if (!isManagedAccount(snapshot)) { return true; } if (!snapshot.running) { @@ -88,7 +92,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann if (!status) { continue; } - if (!status.enabled || !status.configured) { + if (!isManagedAccount(status)) { continue; } if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) { diff --git a/src/gateway/server-channels.test.ts b/src/gateway/server-channels.test.ts new file mode 100644 index 0000000000..1e18c286e0 --- /dev/null +++ b/src/gateway/server-channels.test.ts @@ -0,0 +1,168 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { RuntimeEnv } from "../runtime.js"; +import { type ChannelId, type ChannelPlugin } from "../channels/plugins/types.js"; +import { + createSubsystemLogger, + type SubsystemLogger, + runtimeForLogger, +} from "../logging/subsystem.js"; +import { createEmptyPluginRegistry, type PluginRegistry } from "../plugins/registry.js"; +import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js"; +import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js"; +import { createChannelManager } from "./server-channels.js"; + +const hoisted = vi.hoisted(() => { + const computeBackoff = vi.fn(() => 10); + const sleepWithAbort = vi.fn((ms: number, abortSignal?: AbortSignal) => { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => resolve(), ms); + abortSignal?.addEventListener( + "abort", + () => { + clearTimeout(timer); + reject(new Error("aborted")); + }, + { once: true }, + ); + }); + }); + return { computeBackoff, sleepWithAbort }; +}); + +vi.mock("../infra/backoff.js", () => ({ + computeBackoff: hoisted.computeBackoff, + sleepWithAbort: hoisted.sleepWithAbort, +})); + +type TestAccount = { + enabled?: boolean; + configured?: boolean; +}; + +function createTestPlugin(params?: { + account?: TestAccount; + startAccount?: NonNullable["gateway"]>["startAccount"]; + includeDescribeAccount?: boolean; +}): ChannelPlugin { + const account = params?.account ?? { enabled: true, configured: true }; + const includeDescribeAccount = params?.includeDescribeAccount !== false; + const config: ChannelPlugin["config"] = { + listAccountIds: () => [DEFAULT_ACCOUNT_ID], + resolveAccount: () => account, + isEnabled: (resolved) => resolved.enabled !== false, + }; + if (includeDescribeAccount) { + config.describeAccount = (resolved) => ({ + accountId: DEFAULT_ACCOUNT_ID, + enabled: resolved.enabled !== false, + configured: resolved.configured !== false, + }); + } + const gateway: NonNullable["gateway"]> = {}; + if (params?.startAccount) { + gateway.startAccount = params.startAccount; + } + return { + id: "discord", + meta: { + id: "discord", + label: "Discord", + selectionLabel: "Discord", + docsPath: "/channels/discord", + blurb: "test stub", + }, + capabilities: { chatTypes: ["direct"] }, + config, + gateway, + }; +} + +function installTestRegistry(plugin: ChannelPlugin) { + const registry = createEmptyPluginRegistry(); + registry.channels.push({ + pluginId: plugin.id, + source: "test", + plugin, + }); + setActivePluginRegistry(registry); +} + +function createManager() { + const log = createSubsystemLogger("gateway/server-channels-test"); + const channelLogs = { discord: log } as Record; + const runtime = runtimeForLogger(log); + const channelRuntimeEnvs = { discord: runtime } as Record; + return createChannelManager({ + loadConfig: () => ({}), + channelLogs, + channelRuntimeEnvs, + }); +} + +describe("server-channels auto restart", () => { + let previousRegistry: PluginRegistry | null = null; + + beforeEach(() => { + previousRegistry = getActivePluginRegistry(); + vi.useFakeTimers(); + hoisted.computeBackoff.mockClear(); + hoisted.sleepWithAbort.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + setActivePluginRegistry(previousRegistry ?? createEmptyPluginRegistry()); + }); + + it("caps crash-loop restarts after max attempts", async () => { + const startAccount = vi.fn(async () => {}); + installTestRegistry( + createTestPlugin({ + startAccount, + }), + ); + const manager = createManager(); + + await manager.startChannels(); + await vi.advanceTimersByTimeAsync(500); + + expect(startAccount).toHaveBeenCalledTimes(11); + const snapshot = manager.getRuntimeSnapshot(); + const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID]; + expect(account?.running).toBe(false); + expect(account?.reconnectAttempts).toBe(10); + + await vi.advanceTimersByTimeAsync(500); + expect(startAccount).toHaveBeenCalledTimes(11); + }); + + it("does not auto-restart after manual stop during backoff", async () => { + const startAccount = vi.fn(async () => {}); + installTestRegistry( + createTestPlugin({ + startAccount, + }), + ); + const manager = createManager(); + + await manager.startChannels(); + vi.runAllTicks(); + await manager.stopChannel("discord", DEFAULT_ACCOUNT_ID); + + await vi.advanceTimersByTimeAsync(500); + expect(startAccount).toHaveBeenCalledTimes(1); + }); + + it("marks enabled/configured when account descriptors omit them", () => { + installTestRegistry( + createTestPlugin({ + includeDescribeAccount: false, + }), + ); + const manager = createManager(); + const snapshot = manager.getRuntimeSnapshot(); + const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID]; + expect(account?.enabled).toBe(true); + expect(account?.configured).toBe(true); + }); +}); diff --git a/src/gateway/server-channels.ts b/src/gateway/server-channels.ts index 6cb5ec2418..e2c7a4e4fc 100644 --- a/src/gateway/server-channels.ts +++ b/src/gateway/server-channels.ts @@ -1,13 +1,13 @@ -import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js"; -import { type ChannelId, getChannelPlugin, listChannelPlugins } from "../channels/plugins/index.js"; import type { ChannelAccountSnapshot } from "../channels/plugins/types.js"; import type { OpenClawConfig } from "../config/config.js"; +import type { createSubsystemLogger } from "../logging/subsystem.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js"; +import { type ChannelId, getChannelPlugin, listChannelPlugins } from "../channels/plugins/index.js"; import { type BackoffPolicy, computeBackoff, sleepWithAbort } from "../infra/backoff.js"; import { formatErrorMessage } from "../infra/errors.js"; import { resetDirectoryCache } from "../infra/outbound/target-resolver.js"; -import type { createSubsystemLogger } from "../logging/subsystem.js"; import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js"; -import type { RuntimeEnv } from "../runtime.js"; const CHANNEL_RESTART_POLICY: BackoffPolicy = { initialMs: 5_000, @@ -61,6 +61,11 @@ type ChannelManagerOptions = { channelRuntimeEnvs: Record; }; +type StartChannelOptions = { + preserveRestartAttempts?: boolean; + preserveManualStop?: boolean; +}; + export type ChannelManager = { getRuntimeSnapshot: () => ChannelRuntimeSnapshot; startChannels: () => Promise; @@ -110,12 +115,17 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage return next; }; - const startChannel = async (channelId: ChannelId, accountId?: string) => { + const startChannelInternal = async ( + channelId: ChannelId, + accountId?: string, + opts: StartChannelOptions = {}, + ) => { const plugin = getChannelPlugin(channelId); const startAccount = plugin?.gateway?.startAccount; if (!startAccount) { return; } + const { preserveRestartAttempts = false, preserveManualStop = false } = opts; const cfg = loadConfig(); resetDirectoryCache({ channel: channelId, accountId }); const store = getStore(channelId); @@ -136,6 +146,8 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage if (!enabled) { setRuntime(channelId, id, { accountId: id, + enabled: false, + configured: true, running: false, lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled", }); @@ -149,6 +161,8 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage if (!configured) { setRuntime(channelId, id, { accountId: id, + enabled: true, + configured: false, running: false, lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured", }); @@ -156,17 +170,23 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage } const rKey = restartKey(channelId, id); - manuallyStopped.delete(rKey); + if (!preserveManualStop) { + manuallyStopped.delete(rKey); + } const abort = new AbortController(); store.aborts.set(id, abort); - restartAttempts.delete(rKey); + if (!preserveRestartAttempts) { + restartAttempts.delete(rKey); + } setRuntime(channelId, id, { accountId: id, + enabled: true, + configured: true, running: true, lastStartAt: Date.now(), lastError: null, - reconnectAttempts: 0, + reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0, }); const log = channelLogs[channelId]; @@ -180,15 +200,13 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage getStatus: () => getRuntime(channelId, id), setStatus: (next) => setRuntime(channelId, id, next), }); - const tracked = Promise.resolve(task) + const trackedPromise = Promise.resolve(task) .catch((err) => { const message = formatErrorMessage(err); setRuntime(channelId, id, { accountId: id, lastError: message }); log.error?.(`[${id}] channel exited: ${message}`); }) .finally(() => { - store.aborts.delete(id); - store.tasks.delete(id); setRuntime(channelId, id, { accountId: id, running: false, @@ -214,17 +232,41 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage reconnectAttempts: attempt, }); try { - await sleepWithAbort(delayMs); - await startChannel(channelId, id); + await sleepWithAbort(delayMs, abort.signal); + if (manuallyStopped.has(rKey)) { + return; + } + if (store.tasks.get(id) === trackedPromise) { + store.tasks.delete(id); + } + if (store.aborts.get(id) === abort) { + store.aborts.delete(id); + } + await startChannelInternal(channelId, id, { + preserveRestartAttempts: true, + preserveManualStop: true, + }); } catch { // abort or startup failure — next crash will retry } + }) + .finally(() => { + if (store.tasks.get(id) === trackedPromise) { + store.tasks.delete(id); + } + if (store.aborts.get(id) === abort) { + store.aborts.delete(id); + } }); - store.tasks.set(id, tracked); + store.tasks.set(id, trackedPromise); }), ); }; + const startChannel = async (channelId: ChannelId, accountId?: string) => { + await startChannelInternal(channelId, accountId); + }; + const stopChannel = async (channelId: ChannelId, accountId?: string) => { const plugin = getChannelPlugin(channelId); const store = getStore(channelId); @@ -333,6 +375,8 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage const configured = described?.configured; const current = store.runtimes.get(id) ?? cloneDefaultRuntime(plugin.id, id); const next = { ...current, accountId: id }; + next.enabled = enabled; + next.configured = typeof configured === "boolean" ? configured : (next.configured ?? true); if (!next.running) { if (!enabled) { next.lastError ??= plugin.config.disabledReason?.(account, cfg) ?? "disabled";