refactor(mattermost): extract websocket monitor and reconnect policies

This commit is contained in:
Peter Steinberger
2026-02-14 03:39:02 +01:00
parent 36726b52f4
commit edbd86074f
5 changed files with 473 additions and 119 deletions

View File

@@ -0,0 +1,173 @@
import type { RuntimeEnv } from "openclaw/plugin-sdk";
import { describe, expect, it, vi } from "vitest";
import {
createMattermostConnectOnce,
type MattermostWebSocketLike,
WebSocketClosedBeforeOpenError,
} from "./monitor-websocket.js";
import { runWithReconnect } from "./reconnect.js";
class FakeWebSocket implements MattermostWebSocketLike {
public readonly sent: string[] = [];
public closeCalls = 0;
public terminateCalls = 0;
private openListeners: Array<() => void> = [];
private messageListeners: Array<(data: Buffer) => void | Promise<void>> = [];
private closeListeners: Array<(code: number, reason: Buffer) => void> = [];
private errorListeners: Array<(err: unknown) => void> = [];
on(event: "open", listener: () => void): void;
on(event: "message", listener: (data: Buffer) => void | Promise<void>): void;
on(event: "close", listener: (code: number, reason: Buffer) => void): void;
on(event: "error", listener: (err: unknown) => void): void;
on(event: "open" | "message" | "close" | "error", listener: unknown): void {
if (event === "open") {
this.openListeners.push(listener as () => void);
return;
}
if (event === "message") {
this.messageListeners.push(listener as (data: Buffer) => void | Promise<void>);
return;
}
if (event === "close") {
this.closeListeners.push(listener as (code: number, reason: Buffer) => void);
return;
}
this.errorListeners.push(listener as (err: unknown) => void);
}
send(data: string): void {
this.sent.push(data);
}
close(): void {
this.closeCalls++;
}
terminate(): void {
this.terminateCalls++;
}
emitOpen(): void {
for (const listener of this.openListeners) {
listener();
}
}
emitMessage(data: Buffer): void {
for (const listener of this.messageListeners) {
void listener(data);
}
}
emitClose(code: number, reason = ""): void {
const buffer = Buffer.from(reason, "utf8");
for (const listener of this.closeListeners) {
listener(code, buffer);
}
}
emitError(err: unknown): void {
for (const listener of this.errorListeners) {
listener(err);
}
}
}
const testRuntime = (): RuntimeEnv =>
({
log: vi.fn(),
error: vi.fn(),
exit: ((code: number): never => {
throw new Error(`exit ${code}`);
}) as RuntimeEnv["exit"],
}) as RuntimeEnv;
describe("mattermost websocket monitor", () => {
it("rejects when websocket closes before open", async () => {
const socket = new FakeWebSocket();
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime: testRuntime(),
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
});
queueMicrotask(() => {
socket.emitClose(1006, "connection refused");
});
const failure = connectOnce();
await expect(failure).rejects.toBeInstanceOf(WebSocketClosedBeforeOpenError);
await expect(failure).rejects.toMatchObject({
message: "websocket closed before open (code 1006)",
});
});
it("retries when first attempt errors before open and next attempt succeeds", async () => {
const abort = new AbortController();
const reconnectDelays: number[] = [];
const onError = vi.fn();
const patches: Array<Record<string, unknown>> = [];
const sockets: FakeWebSocket[] = [];
let disconnects = 0;
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime: testRuntime(),
nextSeq: (() => {
let seq = 1;
return () => seq++;
})(),
onPosted: async () => {},
abortSignal: abort.signal,
statusSink: (patch) => {
patches.push(patch as Record<string, unknown>);
if (patch.lastDisconnect) {
disconnects++;
if (disconnects >= 2) {
abort.abort();
}
}
},
webSocketFactory: () => {
const socket = new FakeWebSocket();
const attempt = sockets.length;
sockets.push(socket);
queueMicrotask(() => {
if (attempt === 0) {
socket.emitError(new Error("boom"));
socket.emitClose(1006, "connection refused");
return;
}
socket.emitOpen();
socket.emitClose(1000);
});
return socket;
},
});
await runWithReconnect(connectOnce, {
abortSignal: abort.signal,
initialDelayMs: 1,
onError,
onReconnect: (delay) => reconnectDelays.push(delay),
});
expect(sockets).toHaveLength(2);
expect(sockets[0].closeCalls).toBe(1);
expect(sockets[1].sent).toHaveLength(1);
expect(JSON.parse(sockets[1].sent[0])).toMatchObject({
action: "authentication_challenge",
data: { token: "token" },
seq: 1,
});
expect(onError).toHaveBeenCalledTimes(1);
expect(reconnectDelays).toEqual([1]);
expect(patches.some((patch) => patch.connected === true)).toBe(true);
expect(patches.filter((patch) => patch.connected === false)).toHaveLength(2);
});
});

View File

@@ -0,0 +1,190 @@
import type { ChannelAccountSnapshot, RuntimeEnv } from "openclaw/plugin-sdk";
import WebSocket from "ws";
import type { MattermostPost } from "./client.js";
import { rawDataToString } from "./monitor-helpers.js";
export type MattermostEventPayload = {
event?: string;
data?: {
post?: string;
channel_id?: string;
channel_name?: string;
channel_display_name?: string;
channel_type?: string;
sender_name?: string;
team_id?: string;
};
broadcast?: {
channel_id?: string;
team_id?: string;
user_id?: string;
};
};
export type MattermostWebSocketLike = {
on(event: "open", listener: () => void): void;
on(event: "message", listener: (data: WebSocket.RawData) => void | Promise<void>): void;
on(event: "close", listener: (code: number, reason: Buffer) => void): void;
on(event: "error", listener: (err: unknown) => void): void;
send(data: string): void;
close(): void;
terminate(): void;
};
export type MattermostWebSocketFactory = (url: string) => MattermostWebSocketLike;
export class WebSocketClosedBeforeOpenError extends Error {
constructor(
public readonly code: number,
public readonly reason?: string,
) {
super(`websocket closed before open (code ${code})`);
this.name = "WebSocketClosedBeforeOpenError";
}
}
type CreateMattermostConnectOnceOpts = {
wsUrl: string;
botToken: string;
abortSignal?: AbortSignal;
statusSink?: (patch: Partial<ChannelAccountSnapshot>) => void;
runtime: RuntimeEnv;
nextSeq: () => number;
onPosted: (post: MattermostPost, payload: MattermostEventPayload) => Promise<void>;
webSocketFactory?: MattermostWebSocketFactory;
};
export const defaultMattermostWebSocketFactory: MattermostWebSocketFactory = (url) =>
new WebSocket(url) as MattermostWebSocketLike;
export function parsePostedEvent(
data: WebSocket.RawData,
): { payload: MattermostEventPayload; post: MattermostPost } | null {
const raw = rawDataToString(data);
let payload: MattermostEventPayload;
try {
payload = JSON.parse(raw) as MattermostEventPayload;
} catch {
return null;
}
if (payload.event !== "posted") {
return null;
}
const postData = payload.data?.post;
if (!postData) {
return null;
}
let post: MattermostPost | null = null;
if (typeof postData === "string") {
try {
post = JSON.parse(postData) as MattermostPost;
} catch {
return null;
}
} else if (typeof postData === "object") {
post = postData as MattermostPost;
}
if (!post) {
return null;
}
return { payload, post };
}
export function createMattermostConnectOnce(
opts: CreateMattermostConnectOnceOpts,
): () => Promise<void> {
const webSocketFactory = opts.webSocketFactory ?? defaultMattermostWebSocketFactory;
return async () => {
const ws = webSocketFactory(opts.wsUrl);
const onAbort = () => ws.terminate();
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
try {
return await new Promise<void>((resolve, reject) => {
let opened = false;
let settled = false;
const resolveOnce = () => {
if (settled) {
return;
}
settled = true;
resolve();
};
const rejectOnce = (error: Error) => {
if (settled) {
return;
}
settled = true;
reject(error);
};
ws.on("open", () => {
opened = true;
opts.statusSink?.({
connected: true,
lastConnectedAt: Date.now(),
lastError: null,
});
ws.send(
JSON.stringify({
seq: opts.nextSeq(),
action: "authentication_challenge",
data: { token: opts.botToken },
}),
);
});
ws.on("message", async (data) => {
const parsed = parsePostedEvent(data);
if (!parsed) {
return;
}
try {
await opts.onPosted(parsed.post, parsed.payload);
} catch (err) {
opts.runtime.error?.(`mattermost handler failed: ${String(err)}`);
}
});
ws.on("close", (code, reason) => {
const message = reasonToString(reason);
opts.statusSink?.({
connected: false,
lastDisconnect: {
at: Date.now(),
status: code,
error: message || undefined,
},
});
if (opened) {
resolveOnce();
return;
}
rejectOnce(new WebSocketClosedBeforeOpenError(code, message || undefined));
});
ws.on("error", (err) => {
opts.runtime.error?.(`mattermost websocket error: ${String(err)}`);
opts.statusSink?.({
lastError: String(err),
});
try {
ws.close();
} catch {}
});
});
} finally {
opts.abortSignal?.removeEventListener("abort", onAbort);
}
};
}
function reasonToString(reason: Buffer | string | undefined): string {
if (!reason) {
return "";
}
if (typeof reason === "string") {
return reason;
}
return reason.length > 0 ? reason.toString("utf8") : "";
}

View File

@@ -18,7 +18,6 @@ import {
resolveChannelMediaMaxBytes,
type HistoryEntry,
} from "openclaw/plugin-sdk";
import WebSocket from "ws";
import { getMattermostRuntime } from "../runtime.js";
import { resolveMattermostAccount } from "./accounts.js";
import {
@@ -35,10 +34,14 @@ import {
import {
createDedupeCache,
formatInboundFromLabel,
rawDataToString,
resolveThreadSessionKeys,
} from "./monitor-helpers.js";
import { resolveOncharPrefixes, stripOncharPrefix } from "./monitor-onchar.js";
import {
createMattermostConnectOnce,
type MattermostEventPayload,
type MattermostWebSocketFactory,
} from "./monitor-websocket.js";
import { runWithReconnect } from "./reconnect.js";
import { sendMessageMattermost } from "./send.js";
@@ -50,29 +53,12 @@ export type MonitorMattermostOpts = {
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
statusSink?: (patch: Partial<ChannelAccountSnapshot>) => void;
webSocketFactory?: MattermostWebSocketFactory;
};
type FetchLike = (input: URL | RequestInfo, init?: RequestInit) => Promise<Response>;
type MediaKind = "image" | "audio" | "video" | "document" | "unknown";
type MattermostEventPayload = {
event?: string;
data?: {
post?: string;
channel_id?: string;
channel_name?: string;
channel_display_name?: string;
channel_type?: string;
sender_name?: string;
team_id?: string;
};
broadcast?: {
channel_id?: string;
team_id?: string;
user_id?: string;
};
};
const RECENT_MATTERMOST_MESSAGE_TTL_MS = 5 * 60_000;
const RECENT_MATTERMOST_MESSAGE_MAX = 2000;
const CHANNEL_CACHE_TTL_MS = 5 * 60_000;
@@ -889,99 +875,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
const wsUrl = buildMattermostWsUrl(baseUrl);
let seq = 1;
const connectOnce = async (): Promise<void> => {
const ws = new WebSocket(wsUrl);
const onAbort = () => ws.terminate();
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
try {
return await new Promise((resolve, reject) => {
let opened = false;
ws.on("open", () => {
opened = true;
opts.statusSink?.({
connected: true,
lastConnectedAt: Date.now(),
lastError: null,
});
ws.send(
JSON.stringify({
seq: seq++,
action: "authentication_challenge",
data: { token: botToken },
}),
);
});
ws.on("message", async (data) => {
const raw = rawDataToString(data);
let payload: MattermostEventPayload;
try {
payload = JSON.parse(raw) as MattermostEventPayload;
} catch {
return;
}
if (payload.event !== "posted") {
return;
}
const postData = payload.data?.post;
if (!postData) {
return;
}
let post: MattermostPost | null = null;
if (typeof postData === "string") {
try {
post = JSON.parse(postData) as MattermostPost;
} catch {
return;
}
} else if (typeof postData === "object") {
post = postData as MattermostPost;
}
if (!post) {
return;
}
try {
await debouncer.enqueue({ post, payload });
} catch (err) {
runtime.error?.(`mattermost handler failed: ${String(err)}`);
}
});
ws.on("close", (code, reason) => {
const message = reason.length > 0 ? reason.toString("utf8") : "";
opts.statusSink?.({
connected: false,
lastDisconnect: {
at: Date.now(),
status: code,
error: message || undefined,
},
});
if (opened) {
resolve();
} else {
reject(new Error(`websocket closed before open (code ${code})`));
}
});
ws.on("error", (err) => {
runtime.error?.(`mattermost websocket error: ${String(err)}`);
opts.statusSink?.({
lastError: String(err),
});
ws.close();
});
});
} finally {
opts.abortSignal?.removeEventListener("abort", onAbort);
}
};
const connectOnce = createMattermostConnectOnce({
wsUrl,
botToken,
abortSignal: opts.abortSignal,
statusSink: opts.statusSink,
runtime,
webSocketFactory: opts.webSocketFactory,
nextSeq: () => seq++,
onPosted: async (post, payload) => {
await debouncer.enqueue({ post, payload });
},
});
await runWithReconnect(connectOnce, {
abortSignal: opts.abortSignal,
jitterRatio: 0.2,
onError: (err) => {
runtime.error?.(`mattermost connection failed: ${String(err)}`);
opts.statusSink?.({ lastError: String(err), connected: false });

View File

@@ -148,4 +148,44 @@ describe("runWithReconnect", () => {
expect(connectFn).toHaveBeenCalledTimes(1);
expect(elapsed).toBeLessThan(5000);
});
it("applies jitter to reconnect delay when configured", async () => {
const abort = new AbortController();
const delays: number[] = [];
let callCount = 0;
const connectFn = vi.fn(async () => {
callCount++;
if (callCount === 1) {
throw new Error("connection refused");
}
abort.abort();
});
await runWithReconnect(connectFn, {
abortSignal: abort.signal,
onReconnect: (delayMs) => delays.push(delayMs),
initialDelayMs: 100,
jitterRatio: 0.5,
random: () => 1,
});
expect(connectFn).toHaveBeenCalledTimes(2);
expect(delays).toEqual([150]);
});
it("supports strategy hook to stop reconnecting after failure", async () => {
const onReconnect = vi.fn();
const connectFn = vi.fn(async () => {
throw new Error("fatal");
});
await runWithReconnect(connectFn, {
initialDelayMs: 1,
onReconnect,
shouldReconnect: (params) => params.outcome !== "rejected",
});
expect(connectFn).toHaveBeenCalledTimes(1);
expect(onReconnect).not.toHaveBeenCalled();
});
});

View File

@@ -1,3 +1,23 @@
export type ReconnectOutcome = "resolved" | "rejected";
export type ShouldReconnectParams = {
attempt: number;
delayMs: number;
outcome: ReconnectOutcome;
error?: unknown;
};
export type RunWithReconnectOpts = {
abortSignal?: AbortSignal;
onError?: (err: unknown) => void;
onReconnect?: (delayMs: number) => void;
initialDelayMs?: number;
maxDelayMs?: number;
jitterRatio?: number;
random?: () => number;
shouldReconnect?: (params: ShouldReconnectParams) => boolean;
};
/**
* Reconnection loop with exponential backoff.
*
@@ -8,19 +28,18 @@
*/
export async function runWithReconnect(
connectFn: () => Promise<void>,
opts: {
abortSignal?: AbortSignal;
onError?: (err: unknown) => void;
onReconnect?: (delayMs: number) => void;
initialDelayMs?: number;
maxDelayMs?: number;
} = {},
opts: RunWithReconnectOpts = {},
): Promise<void> {
const { initialDelayMs = 2000, maxDelayMs = 60_000 } = opts;
const jitterRatio = Math.max(0, opts.jitterRatio ?? 0);
const random = opts.random ?? Math.random;
let retryDelay = initialDelayMs;
let attempt = 0;
while (!opts.abortSignal?.aborted) {
let shouldIncreaseDelay = false;
let outcome: ReconnectOutcome = "resolved";
let error: unknown;
try {
await connectFn();
retryDelay = initialDelayMs;
@@ -28,20 +47,43 @@ export async function runWithReconnect(
if (opts.abortSignal?.aborted) {
return;
}
outcome = "rejected";
error = err;
opts.onError?.(err);
shouldIncreaseDelay = true;
}
if (opts.abortSignal?.aborted) {
return;
}
opts.onReconnect?.(retryDelay);
await sleepAbortable(retryDelay, opts.abortSignal);
const delayMs = withJitter(retryDelay, jitterRatio, random);
const shouldReconnect =
opts.shouldReconnect?.({
attempt,
delayMs,
outcome,
error,
}) ?? true;
if (!shouldReconnect) {
return;
}
opts.onReconnect?.(delayMs);
await sleepAbortable(delayMs, opts.abortSignal);
if (shouldIncreaseDelay) {
retryDelay = Math.min(retryDelay * 2, maxDelayMs);
}
attempt++;
}
}
function withJitter(baseMs: number, jitterRatio: number, random: () => number): number {
if (jitterRatio <= 0) {
return baseMs;
}
const normalized = Math.max(0, Math.min(1, random()));
const spread = baseMs * jitterRatio;
return Math.max(1, Math.round(baseMs - spread + normalized * spread * 2));
}
function sleepAbortable(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise((resolve) => {
if (signal?.aborted) {