From edbd86074f58a0929949da22a8297510bc555285 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 14 Feb 2026 03:39:02 +0100 Subject: [PATCH] refactor(mattermost): extract websocket monitor and reconnect policies --- .../src/mattermost/monitor-websocket.test.ts | 173 ++++++++++++++++ .../src/mattermost/monitor-websocket.ts | 190 ++++++++++++++++++ .../mattermost/src/mattermost/monitor.ts | 129 ++---------- .../src/mattermost/reconnect.test.ts | 40 ++++ .../mattermost/src/mattermost/reconnect.ts | 60 +++++- 5 files changed, 473 insertions(+), 119 deletions(-) create mode 100644 extensions/mattermost/src/mattermost/monitor-websocket.test.ts create mode 100644 extensions/mattermost/src/mattermost/monitor-websocket.ts diff --git a/extensions/mattermost/src/mattermost/monitor-websocket.test.ts b/extensions/mattermost/src/mattermost/monitor-websocket.test.ts new file mode 100644 index 0000000000..fee581b62c --- /dev/null +++ b/extensions/mattermost/src/mattermost/monitor-websocket.test.ts @@ -0,0 +1,173 @@ +import type { RuntimeEnv } from "openclaw/plugin-sdk"; +import { describe, expect, it, vi } from "vitest"; +import { + createMattermostConnectOnce, + type MattermostWebSocketLike, + WebSocketClosedBeforeOpenError, +} from "./monitor-websocket.js"; +import { runWithReconnect } from "./reconnect.js"; + +class FakeWebSocket implements MattermostWebSocketLike { + public readonly sent: string[] = []; + public closeCalls = 0; + public terminateCalls = 0; + private openListeners: Array<() => void> = []; + private messageListeners: Array<(data: Buffer) => void | Promise> = []; + private closeListeners: Array<(code: number, reason: Buffer) => void> = []; + private errorListeners: Array<(err: unknown) => void> = []; + + on(event: "open", listener: () => void): void; + on(event: "message", listener: (data: Buffer) => void | Promise): void; + on(event: "close", listener: (code: number, reason: Buffer) => void): void; + on(event: "error", listener: (err: unknown) => void): void; + on(event: "open" | "message" | "close" | "error", listener: unknown): void { + if (event === "open") { + this.openListeners.push(listener as () => void); + return; + } + if (event === "message") { + this.messageListeners.push(listener as (data: Buffer) => void | Promise); + return; + } + if (event === "close") { + this.closeListeners.push(listener as (code: number, reason: Buffer) => void); + return; + } + this.errorListeners.push(listener as (err: unknown) => void); + } + + send(data: string): void { + this.sent.push(data); + } + + close(): void { + this.closeCalls++; + } + + terminate(): void { + this.terminateCalls++; + } + + emitOpen(): void { + for (const listener of this.openListeners) { + listener(); + } + } + + emitMessage(data: Buffer): void { + for (const listener of this.messageListeners) { + void listener(data); + } + } + + emitClose(code: number, reason = ""): void { + const buffer = Buffer.from(reason, "utf8"); + for (const listener of this.closeListeners) { + listener(code, buffer); + } + } + + emitError(err: unknown): void { + for (const listener of this.errorListeners) { + listener(err); + } + } +} + +const testRuntime = (): RuntimeEnv => + ({ + log: vi.fn(), + error: vi.fn(), + exit: ((code: number): never => { + throw new Error(`exit ${code}`); + }) as RuntimeEnv["exit"], + }) as RuntimeEnv; + +describe("mattermost websocket monitor", () => { + it("rejects when websocket closes before open", async () => { + const socket = new FakeWebSocket(); + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime: testRuntime(), + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + }); + + queueMicrotask(() => { + socket.emitClose(1006, "connection refused"); + }); + + const failure = connectOnce(); + await expect(failure).rejects.toBeInstanceOf(WebSocketClosedBeforeOpenError); + await expect(failure).rejects.toMatchObject({ + message: "websocket closed before open (code 1006)", + }); + }); + + it("retries when first attempt errors before open and next attempt succeeds", async () => { + const abort = new AbortController(); + const reconnectDelays: number[] = []; + const onError = vi.fn(); + const patches: Array> = []; + const sockets: FakeWebSocket[] = []; + let disconnects = 0; + + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime: testRuntime(), + nextSeq: (() => { + let seq = 1; + return () => seq++; + })(), + onPosted: async () => {}, + abortSignal: abort.signal, + statusSink: (patch) => { + patches.push(patch as Record); + if (patch.lastDisconnect) { + disconnects++; + if (disconnects >= 2) { + abort.abort(); + } + } + }, + webSocketFactory: () => { + const socket = new FakeWebSocket(); + const attempt = sockets.length; + sockets.push(socket); + queueMicrotask(() => { + if (attempt === 0) { + socket.emitError(new Error("boom")); + socket.emitClose(1006, "connection refused"); + return; + } + socket.emitOpen(); + socket.emitClose(1000); + }); + return socket; + }, + }); + + await runWithReconnect(connectOnce, { + abortSignal: abort.signal, + initialDelayMs: 1, + onError, + onReconnect: (delay) => reconnectDelays.push(delay), + }); + + expect(sockets).toHaveLength(2); + expect(sockets[0].closeCalls).toBe(1); + expect(sockets[1].sent).toHaveLength(1); + expect(JSON.parse(sockets[1].sent[0])).toMatchObject({ + action: "authentication_challenge", + data: { token: "token" }, + seq: 1, + }); + expect(onError).toHaveBeenCalledTimes(1); + expect(reconnectDelays).toEqual([1]); + expect(patches.some((patch) => patch.connected === true)).toBe(true); + expect(patches.filter((patch) => patch.connected === false)).toHaveLength(2); + }); +}); diff --git a/extensions/mattermost/src/mattermost/monitor-websocket.ts b/extensions/mattermost/src/mattermost/monitor-websocket.ts new file mode 100644 index 0000000000..72fae6be87 --- /dev/null +++ b/extensions/mattermost/src/mattermost/monitor-websocket.ts @@ -0,0 +1,190 @@ +import type { ChannelAccountSnapshot, RuntimeEnv } from "openclaw/plugin-sdk"; +import WebSocket from "ws"; +import type { MattermostPost } from "./client.js"; +import { rawDataToString } from "./monitor-helpers.js"; + +export type MattermostEventPayload = { + event?: string; + data?: { + post?: string; + channel_id?: string; + channel_name?: string; + channel_display_name?: string; + channel_type?: string; + sender_name?: string; + team_id?: string; + }; + broadcast?: { + channel_id?: string; + team_id?: string; + user_id?: string; + }; +}; + +export type MattermostWebSocketLike = { + on(event: "open", listener: () => void): void; + on(event: "message", listener: (data: WebSocket.RawData) => void | Promise): void; + on(event: "close", listener: (code: number, reason: Buffer) => void): void; + on(event: "error", listener: (err: unknown) => void): void; + send(data: string): void; + close(): void; + terminate(): void; +}; + +export type MattermostWebSocketFactory = (url: string) => MattermostWebSocketLike; + +export class WebSocketClosedBeforeOpenError extends Error { + constructor( + public readonly code: number, + public readonly reason?: string, + ) { + super(`websocket closed before open (code ${code})`); + this.name = "WebSocketClosedBeforeOpenError"; + } +} + +type CreateMattermostConnectOnceOpts = { + wsUrl: string; + botToken: string; + abortSignal?: AbortSignal; + statusSink?: (patch: Partial) => void; + runtime: RuntimeEnv; + nextSeq: () => number; + onPosted: (post: MattermostPost, payload: MattermostEventPayload) => Promise; + webSocketFactory?: MattermostWebSocketFactory; +}; + +export const defaultMattermostWebSocketFactory: MattermostWebSocketFactory = (url) => + new WebSocket(url) as MattermostWebSocketLike; + +export function parsePostedEvent( + data: WebSocket.RawData, +): { payload: MattermostEventPayload; post: MattermostPost } | null { + const raw = rawDataToString(data); + let payload: MattermostEventPayload; + try { + payload = JSON.parse(raw) as MattermostEventPayload; + } catch { + return null; + } + if (payload.event !== "posted") { + return null; + } + const postData = payload.data?.post; + if (!postData) { + return null; + } + let post: MattermostPost | null = null; + if (typeof postData === "string") { + try { + post = JSON.parse(postData) as MattermostPost; + } catch { + return null; + } + } else if (typeof postData === "object") { + post = postData as MattermostPost; + } + if (!post) { + return null; + } + return { payload, post }; +} + +export function createMattermostConnectOnce( + opts: CreateMattermostConnectOnceOpts, +): () => Promise { + const webSocketFactory = opts.webSocketFactory ?? defaultMattermostWebSocketFactory; + return async () => { + const ws = webSocketFactory(opts.wsUrl); + const onAbort = () => ws.terminate(); + opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); + + try { + return await new Promise((resolve, reject) => { + let opened = false; + let settled = false; + const resolveOnce = () => { + if (settled) { + return; + } + settled = true; + resolve(); + }; + const rejectOnce = (error: Error) => { + if (settled) { + return; + } + settled = true; + reject(error); + }; + + ws.on("open", () => { + opened = true; + opts.statusSink?.({ + connected: true, + lastConnectedAt: Date.now(), + lastError: null, + }); + ws.send( + JSON.stringify({ + seq: opts.nextSeq(), + action: "authentication_challenge", + data: { token: opts.botToken }, + }), + ); + }); + + ws.on("message", async (data) => { + const parsed = parsePostedEvent(data); + if (!parsed) { + return; + } + try { + await opts.onPosted(parsed.post, parsed.payload); + } catch (err) { + opts.runtime.error?.(`mattermost handler failed: ${String(err)}`); + } + }); + + ws.on("close", (code, reason) => { + const message = reasonToString(reason); + opts.statusSink?.({ + connected: false, + lastDisconnect: { + at: Date.now(), + status: code, + error: message || undefined, + }, + }); + if (opened) { + resolveOnce(); + return; + } + rejectOnce(new WebSocketClosedBeforeOpenError(code, message || undefined)); + }); + + ws.on("error", (err) => { + opts.runtime.error?.(`mattermost websocket error: ${String(err)}`); + opts.statusSink?.({ + lastError: String(err), + }); + try { + ws.close(); + } catch {} + }); + }); + } finally { + opts.abortSignal?.removeEventListener("abort", onAbort); + } + }; +} + +function reasonToString(reason: Buffer | string | undefined): string { + if (!reason) { + return ""; + } + if (typeof reason === "string") { + return reason; + } + return reason.length > 0 ? reason.toString("utf8") : ""; +} diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 4a6e5489a1..ddc6dce702 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -18,7 +18,6 @@ import { resolveChannelMediaMaxBytes, type HistoryEntry, } from "openclaw/plugin-sdk"; -import WebSocket from "ws"; import { getMattermostRuntime } from "../runtime.js"; import { resolveMattermostAccount } from "./accounts.js"; import { @@ -35,10 +34,14 @@ import { import { createDedupeCache, formatInboundFromLabel, - rawDataToString, resolveThreadSessionKeys, } from "./monitor-helpers.js"; import { resolveOncharPrefixes, stripOncharPrefix } from "./monitor-onchar.js"; +import { + createMattermostConnectOnce, + type MattermostEventPayload, + type MattermostWebSocketFactory, +} from "./monitor-websocket.js"; import { runWithReconnect } from "./reconnect.js"; import { sendMessageMattermost } from "./send.js"; @@ -50,29 +53,12 @@ export type MonitorMattermostOpts = { runtime?: RuntimeEnv; abortSignal?: AbortSignal; statusSink?: (patch: Partial) => void; + webSocketFactory?: MattermostWebSocketFactory; }; type FetchLike = (input: URL | RequestInfo, init?: RequestInit) => Promise; type MediaKind = "image" | "audio" | "video" | "document" | "unknown"; -type MattermostEventPayload = { - event?: string; - data?: { - post?: string; - channel_id?: string; - channel_name?: string; - channel_display_name?: string; - channel_type?: string; - sender_name?: string; - team_id?: string; - }; - broadcast?: { - channel_id?: string; - team_id?: string; - user_id?: string; - }; -}; - const RECENT_MATTERMOST_MESSAGE_TTL_MS = 5 * 60_000; const RECENT_MATTERMOST_MESSAGE_MAX = 2000; const CHANNEL_CACHE_TTL_MS = 5 * 60_000; @@ -889,99 +875,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} const wsUrl = buildMattermostWsUrl(baseUrl); let seq = 1; - - const connectOnce = async (): Promise => { - const ws = new WebSocket(wsUrl); - const onAbort = () => ws.terminate(); - opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); - - try { - return await new Promise((resolve, reject) => { - let opened = false; - - ws.on("open", () => { - opened = true; - opts.statusSink?.({ - connected: true, - lastConnectedAt: Date.now(), - lastError: null, - }); - ws.send( - JSON.stringify({ - seq: seq++, - action: "authentication_challenge", - data: { token: botToken }, - }), - ); - }); - - ws.on("message", async (data) => { - const raw = rawDataToString(data); - let payload: MattermostEventPayload; - try { - payload = JSON.parse(raw) as MattermostEventPayload; - } catch { - return; - } - if (payload.event !== "posted") { - return; - } - const postData = payload.data?.post; - if (!postData) { - return; - } - let post: MattermostPost | null = null; - if (typeof postData === "string") { - try { - post = JSON.parse(postData) as MattermostPost; - } catch { - return; - } - } else if (typeof postData === "object") { - post = postData as MattermostPost; - } - if (!post) { - return; - } - try { - await debouncer.enqueue({ post, payload }); - } catch (err) { - runtime.error?.(`mattermost handler failed: ${String(err)}`); - } - }); - - ws.on("close", (code, reason) => { - const message = reason.length > 0 ? reason.toString("utf8") : ""; - opts.statusSink?.({ - connected: false, - lastDisconnect: { - at: Date.now(), - status: code, - error: message || undefined, - }, - }); - if (opened) { - resolve(); - } else { - reject(new Error(`websocket closed before open (code ${code})`)); - } - }); - - ws.on("error", (err) => { - runtime.error?.(`mattermost websocket error: ${String(err)}`); - opts.statusSink?.({ - lastError: String(err), - }); - ws.close(); - }); - }); - } finally { - opts.abortSignal?.removeEventListener("abort", onAbort); - } - }; + const connectOnce = createMattermostConnectOnce({ + wsUrl, + botToken, + abortSignal: opts.abortSignal, + statusSink: opts.statusSink, + runtime, + webSocketFactory: opts.webSocketFactory, + nextSeq: () => seq++, + onPosted: async (post, payload) => { + await debouncer.enqueue({ post, payload }); + }, + }); await runWithReconnect(connectOnce, { abortSignal: opts.abortSignal, + jitterRatio: 0.2, onError: (err) => { runtime.error?.(`mattermost connection failed: ${String(err)}`); opts.statusSink?.({ lastError: String(err), connected: false }); diff --git a/extensions/mattermost/src/mattermost/reconnect.test.ts b/extensions/mattermost/src/mattermost/reconnect.test.ts index ebcac2ec85..e67a7fafdf 100644 --- a/extensions/mattermost/src/mattermost/reconnect.test.ts +++ b/extensions/mattermost/src/mattermost/reconnect.test.ts @@ -148,4 +148,44 @@ describe("runWithReconnect", () => { expect(connectFn).toHaveBeenCalledTimes(1); expect(elapsed).toBeLessThan(5000); }); + + it("applies jitter to reconnect delay when configured", async () => { + const abort = new AbortController(); + const delays: number[] = []; + let callCount = 0; + const connectFn = vi.fn(async () => { + callCount++; + if (callCount === 1) { + throw new Error("connection refused"); + } + abort.abort(); + }); + + await runWithReconnect(connectFn, { + abortSignal: abort.signal, + onReconnect: (delayMs) => delays.push(delayMs), + initialDelayMs: 100, + jitterRatio: 0.5, + random: () => 1, + }); + + expect(connectFn).toHaveBeenCalledTimes(2); + expect(delays).toEqual([150]); + }); + + it("supports strategy hook to stop reconnecting after failure", async () => { + const onReconnect = vi.fn(); + const connectFn = vi.fn(async () => { + throw new Error("fatal"); + }); + + await runWithReconnect(connectFn, { + initialDelayMs: 1, + onReconnect, + shouldReconnect: (params) => params.outcome !== "rejected", + }); + + expect(connectFn).toHaveBeenCalledTimes(1); + expect(onReconnect).not.toHaveBeenCalled(); + }); }); diff --git a/extensions/mattermost/src/mattermost/reconnect.ts b/extensions/mattermost/src/mattermost/reconnect.ts index a2782979d1..7de004d1c1 100644 --- a/extensions/mattermost/src/mattermost/reconnect.ts +++ b/extensions/mattermost/src/mattermost/reconnect.ts @@ -1,3 +1,23 @@ +export type ReconnectOutcome = "resolved" | "rejected"; + +export type ShouldReconnectParams = { + attempt: number; + delayMs: number; + outcome: ReconnectOutcome; + error?: unknown; +}; + +export type RunWithReconnectOpts = { + abortSignal?: AbortSignal; + onError?: (err: unknown) => void; + onReconnect?: (delayMs: number) => void; + initialDelayMs?: number; + maxDelayMs?: number; + jitterRatio?: number; + random?: () => number; + shouldReconnect?: (params: ShouldReconnectParams) => boolean; +}; + /** * Reconnection loop with exponential backoff. * @@ -8,19 +28,18 @@ */ export async function runWithReconnect( connectFn: () => Promise, - opts: { - abortSignal?: AbortSignal; - onError?: (err: unknown) => void; - onReconnect?: (delayMs: number) => void; - initialDelayMs?: number; - maxDelayMs?: number; - } = {}, + opts: RunWithReconnectOpts = {}, ): Promise { const { initialDelayMs = 2000, maxDelayMs = 60_000 } = opts; + const jitterRatio = Math.max(0, opts.jitterRatio ?? 0); + const random = opts.random ?? Math.random; let retryDelay = initialDelayMs; + let attempt = 0; while (!opts.abortSignal?.aborted) { let shouldIncreaseDelay = false; + let outcome: ReconnectOutcome = "resolved"; + let error: unknown; try { await connectFn(); retryDelay = initialDelayMs; @@ -28,20 +47,43 @@ export async function runWithReconnect( if (opts.abortSignal?.aborted) { return; } + outcome = "rejected"; + error = err; opts.onError?.(err); shouldIncreaseDelay = true; } if (opts.abortSignal?.aborted) { return; } - opts.onReconnect?.(retryDelay); - await sleepAbortable(retryDelay, opts.abortSignal); + const delayMs = withJitter(retryDelay, jitterRatio, random); + const shouldReconnect = + opts.shouldReconnect?.({ + attempt, + delayMs, + outcome, + error, + }) ?? true; + if (!shouldReconnect) { + return; + } + opts.onReconnect?.(delayMs); + await sleepAbortable(delayMs, opts.abortSignal); if (shouldIncreaseDelay) { retryDelay = Math.min(retryDelay * 2, maxDelayMs); } + attempt++; } } +function withJitter(baseMs: number, jitterRatio: number, random: () => number): number { + if (jitterRatio <= 0) { + return baseMs; + } + const normalized = Math.max(0, Math.min(1, random())); + const spread = baseMs * jitterRatio; + return Math.max(1, Math.round(baseMs - spread + normalized * spread * 2)); +} + function sleepAbortable(ms: number, signal?: AbortSignal): Promise { return new Promise((resolve) => { if (signal?.aborted) {