fix(gateway): harden channel health monitor recovery

This commit is contained in:
Peter Steinberger
2026-02-17 03:26:13 +01:00
parent 4aed4eedb7
commit 1f850374f6
7 changed files with 282 additions and 18 deletions

View File

@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Gateway/Channels: wire `gateway.channelHealthCheckMinutes` into strict config validation, treat implicit account status as managed for health checks, and harden channel auto-restart flow (preserve restart-attempt caps across crash loops, propagate enabled/configured runtime flags, and stop pending restart backoff after manual stop). Thanks @steipete.
- UI/Usage: replace lingering undefined `var(--text-muted)` usage with `var(--muted)` in usage date-range and chart styles to keep muted text visible across themes. (#17975) Thanks @jogelin.
- UI/Usage: preserve selected-range totals when timeline data is downsampled by bucket-aggregating timeseries points (instead of dropping intermediate points), so filtered tokens/cost stay accurate. (#17959) Thanks @jogelin.
- Mattermost: harden reaction handling by requiring an explicit boolean `remove` flag and routing reaction websocket events to the reaction handler, preventing string `"true"` values from being treated as removes and avoiding double-processing of reaction events as posts. (#18608) Thanks @echo931.

View File

@@ -153,6 +153,29 @@ describe("gateway.tools config", () => {
});
});
describe("gateway.channelHealthCheckMinutes", () => {
it("accepts zero to disable monitor", () => {
const res = validateConfigObject({
gateway: {
channelHealthCheckMinutes: 0,
},
});
expect(res.ok).toBe(true);
});
it("rejects negative intervals", () => {
const res = validateConfigObject({
gateway: {
channelHealthCheckMinutes: -1,
},
});
expect(res.ok).toBe(false);
if (!res.ok) {
expect(res.issues[0]?.path).toBe("gateway.channelHealthCheckMinutes");
}
});
});
describe("cron webhook schema", () => {
it("accepts cron.webhookToken and legacy cron.webhook", () => {
const res = OpenClawSchema.safeParse({

View File

@@ -442,6 +442,7 @@ export const OpenClawSchema = z
})
.strict()
.optional(),
channelHealthCheckMinutes: z.number().int().min(0).optional(),
tailscale: z
.object({
mode: z.union([z.literal("off"), z.literal("serve"), z.literal("funnel")]).optional(),

View File

@@ -1,8 +1,8 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ChannelId } from "../channels/plugins/types.js";
import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
import type { ChannelManager, ChannelRuntimeSnapshot } from "./server-channels.js";
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
function createMockChannelManager(overrides?: Partial<ChannelManager>): ChannelManager {
return {
@@ -236,6 +236,29 @@ describe("channel-health-monitor", () => {
monitor.stop();
});
it("treats missing enabled/configured flags as managed accounts", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
telegram: {
default: {
running: false,
lastError: "polling stopped unexpectedly",
},
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.startChannel).toHaveBeenCalledWith("telegram", "default");
monitor.stop();
});
it("applies cooldown — skips recently restarted channels for 2 cycles", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>

View File

@@ -1,6 +1,6 @@
import type { ChannelId } from "../channels/plugins/types.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import type { ChannelManager } from "./server-channels.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
const log = createSubsystemLogger("gateway/health-monitor");
@@ -28,13 +28,17 @@ type RestartRecord = {
restartsThisHour: { at: number }[];
};
function isManagedAccount(snapshot: { enabled?: boolean; configured?: boolean }): boolean {
return snapshot.enabled !== false && snapshot.configured !== false;
}
function isChannelHealthy(snapshot: {
running?: boolean;
connected?: boolean;
enabled?: boolean;
configured?: boolean;
}): boolean {
if (!snapshot.enabled || !snapshot.configured) {
if (!isManagedAccount(snapshot)) {
return true;
}
if (!snapshot.running) {
@@ -88,7 +92,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
if (!status) {
continue;
}
if (!status.enabled || !status.configured) {
if (!isManagedAccount(status)) {
continue;
}
if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) {

View File

@@ -0,0 +1,168 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { RuntimeEnv } from "../runtime.js";
import { type ChannelId, type ChannelPlugin } from "../channels/plugins/types.js";
import {
createSubsystemLogger,
type SubsystemLogger,
runtimeForLogger,
} from "../logging/subsystem.js";
import { createEmptyPluginRegistry, type PluginRegistry } from "../plugins/registry.js";
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js";
import { createChannelManager } from "./server-channels.js";
const hoisted = vi.hoisted(() => {
const computeBackoff = vi.fn(() => 10);
const sleepWithAbort = vi.fn((ms: number, abortSignal?: AbortSignal) => {
return new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => resolve(), ms);
abortSignal?.addEventListener(
"abort",
() => {
clearTimeout(timer);
reject(new Error("aborted"));
},
{ once: true },
);
});
});
return { computeBackoff, sleepWithAbort };
});
vi.mock("../infra/backoff.js", () => ({
computeBackoff: hoisted.computeBackoff,
sleepWithAbort: hoisted.sleepWithAbort,
}));
type TestAccount = {
enabled?: boolean;
configured?: boolean;
};
function createTestPlugin(params?: {
account?: TestAccount;
startAccount?: NonNullable<ChannelPlugin<TestAccount>["gateway"]>["startAccount"];
includeDescribeAccount?: boolean;
}): ChannelPlugin<TestAccount> {
const account = params?.account ?? { enabled: true, configured: true };
const includeDescribeAccount = params?.includeDescribeAccount !== false;
const config: ChannelPlugin<TestAccount>["config"] = {
listAccountIds: () => [DEFAULT_ACCOUNT_ID],
resolveAccount: () => account,
isEnabled: (resolved) => resolved.enabled !== false,
};
if (includeDescribeAccount) {
config.describeAccount = (resolved) => ({
accountId: DEFAULT_ACCOUNT_ID,
enabled: resolved.enabled !== false,
configured: resolved.configured !== false,
});
}
const gateway: NonNullable<ChannelPlugin<TestAccount>["gateway"]> = {};
if (params?.startAccount) {
gateway.startAccount = params.startAccount;
}
return {
id: "discord",
meta: {
id: "discord",
label: "Discord",
selectionLabel: "Discord",
docsPath: "/channels/discord",
blurb: "test stub",
},
capabilities: { chatTypes: ["direct"] },
config,
gateway,
};
}
function installTestRegistry(plugin: ChannelPlugin<TestAccount>) {
const registry = createEmptyPluginRegistry();
registry.channels.push({
pluginId: plugin.id,
source: "test",
plugin,
});
setActivePluginRegistry(registry);
}
function createManager() {
const log = createSubsystemLogger("gateway/server-channels-test");
const channelLogs = { discord: log } as Record<ChannelId, SubsystemLogger>;
const runtime = runtimeForLogger(log);
const channelRuntimeEnvs = { discord: runtime } as Record<ChannelId, RuntimeEnv>;
return createChannelManager({
loadConfig: () => ({}),
channelLogs,
channelRuntimeEnvs,
});
}
describe("server-channels auto restart", () => {
let previousRegistry: PluginRegistry | null = null;
beforeEach(() => {
previousRegistry = getActivePluginRegistry();
vi.useFakeTimers();
hoisted.computeBackoff.mockClear();
hoisted.sleepWithAbort.mockClear();
});
afterEach(() => {
vi.useRealTimers();
setActivePluginRegistry(previousRegistry ?? createEmptyPluginRegistry());
});
it("caps crash-loop restarts after max attempts", async () => {
const startAccount = vi.fn(async () => {});
installTestRegistry(
createTestPlugin({
startAccount,
}),
);
const manager = createManager();
await manager.startChannels();
await vi.advanceTimersByTimeAsync(500);
expect(startAccount).toHaveBeenCalledTimes(11);
const snapshot = manager.getRuntimeSnapshot();
const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID];
expect(account?.running).toBe(false);
expect(account?.reconnectAttempts).toBe(10);
await vi.advanceTimersByTimeAsync(500);
expect(startAccount).toHaveBeenCalledTimes(11);
});
it("does not auto-restart after manual stop during backoff", async () => {
const startAccount = vi.fn(async () => {});
installTestRegistry(
createTestPlugin({
startAccount,
}),
);
const manager = createManager();
await manager.startChannels();
vi.runAllTicks();
await manager.stopChannel("discord", DEFAULT_ACCOUNT_ID);
await vi.advanceTimersByTimeAsync(500);
expect(startAccount).toHaveBeenCalledTimes(1);
});
it("marks enabled/configured when account descriptors omit them", () => {
installTestRegistry(
createTestPlugin({
includeDescribeAccount: false,
}),
);
const manager = createManager();
const snapshot = manager.getRuntimeSnapshot();
const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID];
expect(account?.enabled).toBe(true);
expect(account?.configured).toBe(true);
});
});

View File

@@ -1,13 +1,13 @@
import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js";
import { type ChannelId, getChannelPlugin, listChannelPlugins } from "../channels/plugins/index.js";
import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
import type { OpenClawConfig } from "../config/config.js";
import type { createSubsystemLogger } from "../logging/subsystem.js";
import type { RuntimeEnv } from "../runtime.js";
import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js";
import { type ChannelId, getChannelPlugin, listChannelPlugins } from "../channels/plugins/index.js";
import { type BackoffPolicy, computeBackoff, sleepWithAbort } from "../infra/backoff.js";
import { formatErrorMessage } from "../infra/errors.js";
import { resetDirectoryCache } from "../infra/outbound/target-resolver.js";
import type { createSubsystemLogger } from "../logging/subsystem.js";
import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js";
import type { RuntimeEnv } from "../runtime.js";
const CHANNEL_RESTART_POLICY: BackoffPolicy = {
initialMs: 5_000,
@@ -61,6 +61,11 @@ type ChannelManagerOptions = {
channelRuntimeEnvs: Record<ChannelId, RuntimeEnv>;
};
type StartChannelOptions = {
preserveRestartAttempts?: boolean;
preserveManualStop?: boolean;
};
export type ChannelManager = {
getRuntimeSnapshot: () => ChannelRuntimeSnapshot;
startChannels: () => Promise<void>;
@@ -110,12 +115,17 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
return next;
};
const startChannel = async (channelId: ChannelId, accountId?: string) => {
const startChannelInternal = async (
channelId: ChannelId,
accountId?: string,
opts: StartChannelOptions = {},
) => {
const plugin = getChannelPlugin(channelId);
const startAccount = plugin?.gateway?.startAccount;
if (!startAccount) {
return;
}
const { preserveRestartAttempts = false, preserveManualStop = false } = opts;
const cfg = loadConfig();
resetDirectoryCache({ channel: channelId, accountId });
const store = getStore(channelId);
@@ -136,6 +146,8 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
if (!enabled) {
setRuntime(channelId, id, {
accountId: id,
enabled: false,
configured: true,
running: false,
lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled",
});
@@ -149,6 +161,8 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
if (!configured) {
setRuntime(channelId, id, {
accountId: id,
enabled: true,
configured: false,
running: false,
lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured",
});
@@ -156,17 +170,23 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
}
const rKey = restartKey(channelId, id);
manuallyStopped.delete(rKey);
if (!preserveManualStop) {
manuallyStopped.delete(rKey);
}
const abort = new AbortController();
store.aborts.set(id, abort);
restartAttempts.delete(rKey);
if (!preserveRestartAttempts) {
restartAttempts.delete(rKey);
}
setRuntime(channelId, id, {
accountId: id,
enabled: true,
configured: true,
running: true,
lastStartAt: Date.now(),
lastError: null,
reconnectAttempts: 0,
reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(rKey) ?? 0) : 0,
});
const log = channelLogs[channelId];
@@ -180,15 +200,13 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
getStatus: () => getRuntime(channelId, id),
setStatus: (next) => setRuntime(channelId, id, next),
});
const tracked = Promise.resolve(task)
const trackedPromise = Promise.resolve(task)
.catch((err) => {
const message = formatErrorMessage(err);
setRuntime(channelId, id, { accountId: id, lastError: message });
log.error?.(`[${id}] channel exited: ${message}`);
})
.finally(() => {
store.aborts.delete(id);
store.tasks.delete(id);
setRuntime(channelId, id, {
accountId: id,
running: false,
@@ -214,17 +232,41 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
reconnectAttempts: attempt,
});
try {
await sleepWithAbort(delayMs);
await startChannel(channelId, id);
await sleepWithAbort(delayMs, abort.signal);
if (manuallyStopped.has(rKey)) {
return;
}
if (store.tasks.get(id) === trackedPromise) {
store.tasks.delete(id);
}
if (store.aborts.get(id) === abort) {
store.aborts.delete(id);
}
await startChannelInternal(channelId, id, {
preserveRestartAttempts: true,
preserveManualStop: true,
});
} catch {
// abort or startup failure — next crash will retry
}
})
.finally(() => {
if (store.tasks.get(id) === trackedPromise) {
store.tasks.delete(id);
}
if (store.aborts.get(id) === abort) {
store.aborts.delete(id);
}
});
store.tasks.set(id, tracked);
store.tasks.set(id, trackedPromise);
}),
);
};
const startChannel = async (channelId: ChannelId, accountId?: string) => {
await startChannelInternal(channelId, accountId);
};
const stopChannel = async (channelId: ChannelId, accountId?: string) => {
const plugin = getChannelPlugin(channelId);
const store = getStore(channelId);
@@ -333,6 +375,8 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
const configured = described?.configured;
const current = store.runtimes.get(id) ?? cloneDefaultRuntime(plugin.id, id);
const next = { ...current, accountId: id };
next.enabled = enabled;
next.configured = typeof configured === "boolean" ? configured : (next.configured ?? true);
if (!next.running) {
if (!enabled) {
next.lastError ??= plugin.config.disabledReason?.(account, cfg) ?? "disabled";