diff --git a/CHANGELOG.md b/CHANGELOG.md index 72931764f6..6853fa126f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Clawdock: avoid Zsh readonly variable collisions in helper scripts. (#15501) Thanks @nkelner. - Discord: route autoThread replies to existing threads instead of the root channel. (#8302) Thanks @gavinbmoore, @thewilloftheshadow. - Discord/Agents: apply channel/group `historyLimit` during embedded-runner history compaction to prevent long-running channel sessions from bypassing truncation and overflowing context windows. (#11224) Thanks @shadril238. +- Mattermost (plugin): retry websocket monitor connections with exponential backoff and abort-aware teardown so transient connect failures no longer permanently stop monitoring. (#14962) Thanks @mcaxtr. - Telegram: scope skill commands to the resolved agent for default accounts so `setMyCommands` no longer triggers `BOT_COMMANDS_TOO_MUCH` when multiple agents are configured. (#15599) - Plugins/Hooks: fire `before_tool_call` hook exactly once per tool invocation in embedded runs by removing duplicate dispatch paths while preserving parameter mutation semantics. (#15635) Thanks @lailoo. - Agents/Image tool: cap image-analysis completion `maxTokens` by model capability (`min(4096, model.maxTokens)`) to avoid over-limit provider failures while still preventing truncation. (#11770) Thanks @detecti1. 
diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 8d4f3d95e9..4a6e5489a1 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -39,6 +39,7 @@ import { resolveThreadSessionKeys, } from "./monitor-helpers.js"; import { resolveOncharPrefixes, stripOncharPrefix } from "./monitor-onchar.js"; +import { runWithReconnect } from "./reconnect.js"; import { sendMessageMattermost } from "./send.js"; export type MonitorMattermostOpts = { @@ -891,88 +892,102 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} const connectOnce = async (): Promise => { const ws = new WebSocket(wsUrl); - const onAbort = () => ws.close(); + const onAbort = () => ws.terminate(); opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); - return await new Promise((resolve) => { - ws.on("open", () => { - opts.statusSink?.({ - connected: true, - lastConnectedAt: Date.now(), - lastError: null, - }); - ws.send( - JSON.stringify({ - seq: seq++, - action: "authentication_challenge", - data: { token: botToken }, - }), - ); - }); + try { + return await new Promise((resolve, reject) => { + let opened = false; - ws.on("message", async (data) => { - const raw = rawDataToString(data); - let payload: MattermostEventPayload; - try { - payload = JSON.parse(raw) as MattermostEventPayload; - } catch { - return; - } - if (payload.event !== "posted") { - return; - } - const postData = payload.data?.post; - if (!postData) { - return; - } - let post: MattermostPost | null = null; - if (typeof postData === "string") { + ws.on("open", () => { + opened = true; + opts.statusSink?.({ + connected: true, + lastConnectedAt: Date.now(), + lastError: null, + }); + ws.send( + JSON.stringify({ + seq: seq++, + action: "authentication_challenge", + data: { token: botToken }, + }), + ); + }); + + ws.on("message", async (data) => { + const raw = 
rawDataToString(data); + let payload: MattermostEventPayload; try { - post = JSON.parse(postData) as MattermostPost; + payload = JSON.parse(raw) as MattermostEventPayload; } catch { return; } - } else if (typeof postData === "object") { - post = postData as MattermostPost; - } - if (!post) { - return; - } - try { - await debouncer.enqueue({ post, payload }); - } catch (err) { - runtime.error?.(`mattermost handler failed: ${String(err)}`); - } - }); - - ws.on("close", (code, reason) => { - const message = reason.length > 0 ? reason.toString("utf8") : ""; - opts.statusSink?.({ - connected: false, - lastDisconnect: { - at: Date.now(), - status: code, - error: message || undefined, - }, + if (payload.event !== "posted") { + return; + } + const postData = payload.data?.post; + if (!postData) { + return; + } + let post: MattermostPost | null = null; + if (typeof postData === "string") { + try { + post = JSON.parse(postData) as MattermostPost; + } catch { + return; + } + } else if (typeof postData === "object") { + post = postData as MattermostPost; + } + if (!post) { + return; + } + try { + await debouncer.enqueue({ post, payload }); + } catch (err) { + runtime.error?.(`mattermost handler failed: ${String(err)}`); + } }); - opts.abortSignal?.removeEventListener("abort", onAbort); - resolve(); - }); - ws.on("error", (err) => { - runtime.error?.(`mattermost websocket error: ${String(err)}`); - opts.statusSink?.({ - lastError: String(err), + ws.on("close", (code, reason) => { + const message = reason.length > 0 ? 
reason.toString("utf8") : ""; + opts.statusSink?.({ + connected: false, + lastDisconnect: { + at: Date.now(), + status: code, + error: message || undefined, + }, + }); + if (opened) { + resolve(); + } else { + reject(new Error(`websocket closed before open (code ${code})`)); + } + }); + + ws.on("error", (err) => { + runtime.error?.(`mattermost websocket error: ${String(err)}`); + opts.statusSink?.({ + lastError: String(err), + }); + ws.close(); }); }); - }); + } finally { + opts.abortSignal?.removeEventListener("abort", onAbort); + } }; - while (!opts.abortSignal?.aborted) { - await connectOnce(); - if (opts.abortSignal?.aborted) { - return; - } - await new Promise((resolve) => setTimeout(resolve, 2000)); - } + await runWithReconnect(connectOnce, { + abortSignal: opts.abortSignal, + onError: (err) => { + runtime.error?.(`mattermost connection failed: ${String(err)}`); + opts.statusSink?.({ lastError: String(err), connected: false }); + }, + onReconnect: (delayMs) => { + runtime.log?.(`mattermost reconnecting in ${Math.round(delayMs / 1000)}s`); + }, + }); } diff --git a/extensions/mattermost/src/mattermost/reconnect.test.ts b/extensions/mattermost/src/mattermost/reconnect.test.ts new file mode 100644 index 0000000000..ebcac2ec85 --- /dev/null +++ b/extensions/mattermost/src/mattermost/reconnect.test.ts @@ -0,0 +1,151 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { runWithReconnect } from "./reconnect.js"; + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("runWithReconnect", () => { + it("retries after connectFn resolves (normal close)", async () => { + let callCount = 0; + const abort = new AbortController(); + const connectFn = vi.fn(async () => { + callCount++; + if (callCount >= 3) { + abort.abort(); + } + }); + + await runWithReconnect(connectFn, { + abortSignal: abort.signal, + initialDelayMs: 1, + }); + + expect(connectFn).toHaveBeenCalledTimes(3); + }); + + it("retries after connectFn throws (connection error)", async 
() => { + let callCount = 0; + const abort = new AbortController(); + const onError = vi.fn(); + const connectFn = vi.fn(async () => { + callCount++; + if (callCount < 3) { + throw new Error("fetch failed"); + } + abort.abort(); + }); + + await runWithReconnect(connectFn, { + abortSignal: abort.signal, + onError, + initialDelayMs: 1, + }); + + expect(connectFn).toHaveBeenCalledTimes(3); + expect(onError).toHaveBeenCalledTimes(2); + expect(onError).toHaveBeenCalledWith(expect.objectContaining({ message: "fetch failed" })); + }); + + it("uses exponential backoff on consecutive errors, capped at maxDelayMs", async () => { + const abort = new AbortController(); + const delays: number[] = []; + let callCount = 0; + const connectFn = vi.fn(async () => { + callCount++; + if (callCount >= 6) { + abort.abort(); + return; + } + throw new Error("connection refused"); + }); + + await runWithReconnect(connectFn, { + abortSignal: abort.signal, + onReconnect: (delayMs) => delays.push(delayMs), + initialDelayMs: 100, + maxDelayMs: 1000, + }); + + expect(connectFn).toHaveBeenCalledTimes(6); + // 5 errors produce delays: 100, 200, 400, 800, 1000(cap) + // 6th succeeds -> delay resets to 100 + // But 6th also aborts → onReconnect NOT called (abort check fires first) + expect(delays).toEqual([100, 200, 400, 800, 1000]); + }); + + it("resets backoff after successful connection", async () => { + const abort = new AbortController(); + const delays: number[] = []; + let callCount = 0; + const connectFn = vi.fn(async () => { + callCount++; + if (callCount === 1) { + throw new Error("first failure"); + } + if (callCount === 2) { + return; // success + } + if (callCount === 3) { + throw new Error("second failure"); + } + abort.abort(); + }); + + await runWithReconnect(connectFn, { + abortSignal: abort.signal, + onReconnect: (delayMs) => delays.push(delayMs), + initialDelayMs: 100, + maxDelayMs: 60_000, + }); + + expect(connectFn).toHaveBeenCalledTimes(4); + // call 1: fail -> delay 100 + // 
call 2: success → delay resets to 100 + // call 3: fail -> delay 100 (reset held) + // call 4: success + abort → no onReconnect + expect(delays).toEqual([100, 100, 100]); + }); + + it("stops immediately when abort signal is pre-fired", async () => { + const abort = new AbortController(); + abort.abort(); + const connectFn = vi.fn(async () => {}); + + await runWithReconnect(connectFn, { abortSignal: abort.signal }); + + expect(connectFn).not.toHaveBeenCalled(); + }); + + it("stops after current connection when abort fires mid-connection", async () => { + const abort = new AbortController(); + const connectFn = vi.fn(async () => { + abort.abort(); + }); + + await runWithReconnect(connectFn, { + abortSignal: abort.signal, + initialDelayMs: 1, + }); + + expect(connectFn).toHaveBeenCalledTimes(1); + }); + + it("abort signal interrupts backoff sleep immediately", async () => { + const abort = new AbortController(); + const connectFn = vi.fn(async () => { + // Schedule abort to fire 10ms into the 60s sleep + setTimeout(() => abort.abort(), 10); + }); + + const start = Date.now(); + await runWithReconnect(connectFn, { + abortSignal: abort.signal, + initialDelayMs: 60_000, + }); + const elapsed = Date.now() - start; + + expect(connectFn).toHaveBeenCalledTimes(1); + expect(elapsed).toBeLessThan(5000); + }); +}); diff --git a/extensions/mattermost/src/mattermost/reconnect.ts b/extensions/mattermost/src/mattermost/reconnect.ts new file mode 100644 index 0000000000..a2782979d1 --- /dev/null +++ b/extensions/mattermost/src/mattermost/reconnect.ts @@ -0,0 +1,61 @@ +/** + * Reconnection loop with exponential backoff. + * + * Calls `connectFn` in a while loop. On normal resolve (connection closed), + * the backoff resets. On thrown error (connection failed), the current delay is + * used, then doubled for the next retry. + * The loop exits when `abortSignal` fires. 
/**
 * Reconnection loop with exponential backoff.
 *
 * Calls `connectFn` repeatedly until `abortSignal` fires.
 *
 * - When `connectFn` resolves (the connection was established and later
 *   closed), the backoff resets to the initial delay.
 * - When `connectFn` rejects (the connection attempt failed), `onError` is
 *   invoked, the current delay is used for the upcoming sleep, and the delay
 *   is then doubled (capped at `maxDelayMs`) for the following retry.
 *
 * @param connectFn Establishes one connection; settles when that connection ends.
 * @param opts.abortSignal Stops the loop; also interrupts an in-progress backoff sleep.
 * @param opts.onError Called with the rejection reason of a failed `connectFn`
 *   (not called when the signal is already aborted).
 * @param opts.onReconnect Called with the delay (ms) right before each backoff sleep.
 * @param opts.initialDelayMs First/reset delay in ms (default 2000). Clamped to `maxDelayMs`.
 * @param opts.maxDelayMs Upper bound for any delay in ms (default 60000).
 * @returns Resolves once the abort signal has fired; never rejects because of `connectFn`.
 */
export async function runWithReconnect(
  connectFn: () => Promise<void>,
  opts: {
    abortSignal?: AbortSignal;
    onError?: (err: unknown) => void;
    onReconnect?: (delayMs: number) => void;
    initialDelayMs?: number;
    maxDelayMs?: number;
  } = {},
): Promise<void> {
  const { initialDelayMs = 2000, maxDelayMs = 60_000 } = opts;
  // The cap applies to every delay, including the very first one and the
  // value the backoff resets to after a successful connection.
  const baseDelayMs = Math.min(initialDelayMs, maxDelayMs);
  let retryDelay = baseDelayMs;

  while (!opts.abortSignal?.aborted) {
    let shouldIncreaseDelay = false;
    try {
      await connectFn();
      // A connection that opened and then closed is not a failure: start over.
      retryDelay = baseDelayMs;
    } catch (err) {
      // Errors caused by an abort-driven teardown are expected; exit quietly.
      if (opts.abortSignal?.aborted) {
        return;
      }
      opts.onError?.(err);
      shouldIncreaseDelay = true;
    }
    if (opts.abortSignal?.aborted) {
      return;
    }
    opts.onReconnect?.(retryDelay);
    await sleepAbortable(retryDelay, opts.abortSignal);
    // Double only after the current delay has been used, so the first retry
    // after a failure waits exactly the base delay.
    if (shouldIncreaseDelay) {
      retryDelay = Math.min(retryDelay * 2, maxDelayMs);
    }
  }
}

/**
 * Sleeps for `ms` milliseconds, resolving early if `signal` aborts.
 *
 * Always resolves (never rejects); callers check `signal.aborted` afterwards
 * to tell a finished sleep from an interrupted one.
 */
function sleepAbortable(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const onAbort = () => {
      // `timer` is initialised before this listener can possibly run.
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      // Drop the listener so long-lived signals do not accumulate handlers.
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}