mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
fix(mattermost): add WebSocket reconnection with exponential backoff (#14962)
* fix(mattermost): add WebSocket reconnection with exponential backoff Fixes #13980 The Mattermost WebSocket monitor had no error handling around the reconnection loop. When connectOnce() threw (e.g. 'fetch failed' from network issues), the error propagated through the while loop, causing the gateway to log 'channel exited' and never restart. Extract runWithReconnect() utility that: - Catches thrown errors from connectFn and retries - Uses exponential backoff (2s→4s→8s→...→60s cap) - Resets backoff after successful connections - Stops cleanly on abort signal - Reports errors and reconnect delays via callbacks * fix(mattermost): make backoff sleep abort-aware and reject on WS connect failure * fix(mattermost): clean up abort listener on normal timeout to prevent leak * fix(mattermost): skip error reporting when abort causes connection rejection * fix(mattermost): use try/finally for abort listener cleanup in connectOnce * fix: force-close WebSocket on error to prevent reconnect hang * fix: use ws.terminate() on abort for reliable teardown during CONNECTING state * fix(mattermost): use initial retry delay for reconnect backoff --------- Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Clawdock: avoid Zsh readonly variable collisions in helper scripts. (#15501) Thanks @nkelner.
|
||||
- Discord: route autoThread replies to existing threads instead of the root channel. (#8302) Thanks @gavinbmoore, @thewilloftheshadow.
|
||||
- Discord/Agents: apply channel/group `historyLimit` during embedded-runner history compaction to prevent long-running channel sessions from bypassing truncation and overflowing context windows. (#11224) Thanks @shadril238.
|
||||
- Mattermost (plugin): retry websocket monitor connections with exponential backoff and abort-aware teardown so transient connect failures no longer permanently stop monitoring. (#14962) Thanks @mcaxtr.
|
||||
- Telegram: scope skill commands to the resolved agent for default accounts so `setMyCommands` no longer triggers `BOT_COMMANDS_TOO_MUCH` when multiple agents are configured. (#15599)
|
||||
- Plugins/Hooks: fire `before_tool_call` hook exactly once per tool invocation in embedded runs by removing duplicate dispatch paths while preserving parameter mutation semantics. (#15635) Thanks @lailoo.
|
||||
- Agents/Image tool: cap image-analysis completion `maxTokens` by model capability (`min(4096, model.maxTokens)`) to avoid over-limit provider failures while still preventing truncation. (#11770) Thanks @detecti1.
|
||||
|
||||
@@ -39,6 +39,7 @@ import {
|
||||
resolveThreadSessionKeys,
|
||||
} from "./monitor-helpers.js";
|
||||
import { resolveOncharPrefixes, stripOncharPrefix } from "./monitor-onchar.js";
|
||||
import { runWithReconnect } from "./reconnect.js";
|
||||
import { sendMessageMattermost } from "./send.js";
|
||||
|
||||
export type MonitorMattermostOpts = {
|
||||
@@ -891,88 +892,102 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
|
||||
|
||||
const connectOnce = async (): Promise<void> => {
|
||||
const ws = new WebSocket(wsUrl);
|
||||
const onAbort = () => ws.close();
|
||||
const onAbort = () => ws.terminate();
|
||||
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
return await new Promise((resolve) => {
|
||||
ws.on("open", () => {
|
||||
opts.statusSink?.({
|
||||
connected: true,
|
||||
lastConnectedAt: Date.now(),
|
||||
lastError: null,
|
||||
});
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
seq: seq++,
|
||||
action: "authentication_challenge",
|
||||
data: { token: botToken },
|
||||
}),
|
||||
);
|
||||
});
|
||||
try {
|
||||
return await new Promise((resolve, reject) => {
|
||||
let opened = false;
|
||||
|
||||
ws.on("message", async (data) => {
|
||||
const raw = rawDataToString(data);
|
||||
let payload: MattermostEventPayload;
|
||||
try {
|
||||
payload = JSON.parse(raw) as MattermostEventPayload;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
if (payload.event !== "posted") {
|
||||
return;
|
||||
}
|
||||
const postData = payload.data?.post;
|
||||
if (!postData) {
|
||||
return;
|
||||
}
|
||||
let post: MattermostPost | null = null;
|
||||
if (typeof postData === "string") {
|
||||
ws.on("open", () => {
|
||||
opened = true;
|
||||
opts.statusSink?.({
|
||||
connected: true,
|
||||
lastConnectedAt: Date.now(),
|
||||
lastError: null,
|
||||
});
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
seq: seq++,
|
||||
action: "authentication_challenge",
|
||||
data: { token: botToken },
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
ws.on("message", async (data) => {
|
||||
const raw = rawDataToString(data);
|
||||
let payload: MattermostEventPayload;
|
||||
try {
|
||||
post = JSON.parse(postData) as MattermostPost;
|
||||
payload = JSON.parse(raw) as MattermostEventPayload;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
} else if (typeof postData === "object") {
|
||||
post = postData as MattermostPost;
|
||||
}
|
||||
if (!post) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await debouncer.enqueue({ post, payload });
|
||||
} catch (err) {
|
||||
runtime.error?.(`mattermost handler failed: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on("close", (code, reason) => {
|
||||
const message = reason.length > 0 ? reason.toString("utf8") : "";
|
||||
opts.statusSink?.({
|
||||
connected: false,
|
||||
lastDisconnect: {
|
||||
at: Date.now(),
|
||||
status: code,
|
||||
error: message || undefined,
|
||||
},
|
||||
if (payload.event !== "posted") {
|
||||
return;
|
||||
}
|
||||
const postData = payload.data?.post;
|
||||
if (!postData) {
|
||||
return;
|
||||
}
|
||||
let post: MattermostPost | null = null;
|
||||
if (typeof postData === "string") {
|
||||
try {
|
||||
post = JSON.parse(postData) as MattermostPost;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
} else if (typeof postData === "object") {
|
||||
post = postData as MattermostPost;
|
||||
}
|
||||
if (!post) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await debouncer.enqueue({ post, payload });
|
||||
} catch (err) {
|
||||
runtime.error?.(`mattermost handler failed: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
opts.abortSignal?.removeEventListener("abort", onAbort);
|
||||
resolve();
|
||||
});
|
||||
|
||||
ws.on("error", (err) => {
|
||||
runtime.error?.(`mattermost websocket error: ${String(err)}`);
|
||||
opts.statusSink?.({
|
||||
lastError: String(err),
|
||||
ws.on("close", (code, reason) => {
|
||||
const message = reason.length > 0 ? reason.toString("utf8") : "";
|
||||
opts.statusSink?.({
|
||||
connected: false,
|
||||
lastDisconnect: {
|
||||
at: Date.now(),
|
||||
status: code,
|
||||
error: message || undefined,
|
||||
},
|
||||
});
|
||||
if (opened) {
|
||||
resolve();
|
||||
} else {
|
||||
reject(new Error(`websocket closed before open (code ${code})`));
|
||||
}
|
||||
});
|
||||
|
||||
ws.on("error", (err) => {
|
||||
runtime.error?.(`mattermost websocket error: ${String(err)}`);
|
||||
opts.statusSink?.({
|
||||
lastError: String(err),
|
||||
});
|
||||
ws.close();
|
||||
});
|
||||
});
|
||||
});
|
||||
} finally {
|
||||
opts.abortSignal?.removeEventListener("abort", onAbort);
|
||||
}
|
||||
};
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
await connectOnce();
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000));
|
||||
}
|
||||
await runWithReconnect(connectOnce, {
|
||||
abortSignal: opts.abortSignal,
|
||||
onError: (err) => {
|
||||
runtime.error?.(`mattermost connection failed: ${String(err)}`);
|
||||
opts.statusSink?.({ lastError: String(err), connected: false });
|
||||
},
|
||||
onReconnect: (delayMs) => {
|
||||
runtime.log?.(`mattermost reconnecting in ${Math.round(delayMs / 1000)}s`);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
151
extensions/mattermost/src/mattermost/reconnect.test.ts
Normal file
151
extensions/mattermost/src/mattermost/reconnect.test.ts
Normal file
@@ -0,0 +1,151 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { runWithReconnect } from "./reconnect.js";
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("runWithReconnect", () => {
|
||||
it("retries after connectFn resolves (normal close)", async () => {
|
||||
let callCount = 0;
|
||||
const abort = new AbortController();
|
||||
const connectFn = vi.fn(async () => {
|
||||
callCount++;
|
||||
if (callCount >= 3) {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
await runWithReconnect(connectFn, {
|
||||
abortSignal: abort.signal,
|
||||
initialDelayMs: 1,
|
||||
});
|
||||
|
||||
expect(connectFn).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it("retries after connectFn throws (connection error)", async () => {
|
||||
let callCount = 0;
|
||||
const abort = new AbortController();
|
||||
const onError = vi.fn();
|
||||
const connectFn = vi.fn(async () => {
|
||||
callCount++;
|
||||
if (callCount < 3) {
|
||||
throw new Error("fetch failed");
|
||||
}
|
||||
abort.abort();
|
||||
});
|
||||
|
||||
await runWithReconnect(connectFn, {
|
||||
abortSignal: abort.signal,
|
||||
onError,
|
||||
initialDelayMs: 1,
|
||||
});
|
||||
|
||||
expect(connectFn).toHaveBeenCalledTimes(3);
|
||||
expect(onError).toHaveBeenCalledTimes(2);
|
||||
expect(onError).toHaveBeenCalledWith(expect.objectContaining({ message: "fetch failed" }));
|
||||
});
|
||||
|
||||
it("uses exponential backoff on consecutive errors, capped at maxDelayMs", async () => {
|
||||
const abort = new AbortController();
|
||||
const delays: number[] = [];
|
||||
let callCount = 0;
|
||||
const connectFn = vi.fn(async () => {
|
||||
callCount++;
|
||||
if (callCount >= 6) {
|
||||
abort.abort();
|
||||
return;
|
||||
}
|
||||
throw new Error("connection refused");
|
||||
});
|
||||
|
||||
await runWithReconnect(connectFn, {
|
||||
abortSignal: abort.signal,
|
||||
onReconnect: (delayMs) => delays.push(delayMs),
|
||||
initialDelayMs: 100,
|
||||
maxDelayMs: 1000,
|
||||
});
|
||||
|
||||
expect(connectFn).toHaveBeenCalledTimes(6);
|
||||
// 5 errors produce delays: 100, 200, 400, 800, 1000(cap)
|
||||
// 6th succeeds -> delay resets to 100
|
||||
// But 6th also aborts → onReconnect NOT called (abort check fires first)
|
||||
expect(delays).toEqual([100, 200, 400, 800, 1000]);
|
||||
});
|
||||
|
||||
it("resets backoff after successful connection", async () => {
|
||||
const abort = new AbortController();
|
||||
const delays: number[] = [];
|
||||
let callCount = 0;
|
||||
const connectFn = vi.fn(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) {
|
||||
throw new Error("first failure");
|
||||
}
|
||||
if (callCount === 2) {
|
||||
return; // success
|
||||
}
|
||||
if (callCount === 3) {
|
||||
throw new Error("second failure");
|
||||
}
|
||||
abort.abort();
|
||||
});
|
||||
|
||||
await runWithReconnect(connectFn, {
|
||||
abortSignal: abort.signal,
|
||||
onReconnect: (delayMs) => delays.push(delayMs),
|
||||
initialDelayMs: 100,
|
||||
maxDelayMs: 60_000,
|
||||
});
|
||||
|
||||
expect(connectFn).toHaveBeenCalledTimes(4);
|
||||
// call 1: fail -> delay 100
|
||||
// call 2: success → delay resets to 100
|
||||
// call 3: fail -> delay 100 (reset held)
|
||||
// call 4: success + abort → no onReconnect
|
||||
expect(delays).toEqual([100, 100, 100]);
|
||||
});
|
||||
|
||||
it("stops immediately when abort signal is pre-fired", async () => {
|
||||
const abort = new AbortController();
|
||||
abort.abort();
|
||||
const connectFn = vi.fn(async () => {});
|
||||
|
||||
await runWithReconnect(connectFn, { abortSignal: abort.signal });
|
||||
|
||||
expect(connectFn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("stops after current connection when abort fires mid-connection", async () => {
|
||||
const abort = new AbortController();
|
||||
const connectFn = vi.fn(async () => {
|
||||
abort.abort();
|
||||
});
|
||||
|
||||
await runWithReconnect(connectFn, {
|
||||
abortSignal: abort.signal,
|
||||
initialDelayMs: 1,
|
||||
});
|
||||
|
||||
expect(connectFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("abort signal interrupts backoff sleep immediately", async () => {
|
||||
const abort = new AbortController();
|
||||
const connectFn = vi.fn(async () => {
|
||||
// Schedule abort to fire 10ms into the 60s sleep
|
||||
setTimeout(() => abort.abort(), 10);
|
||||
});
|
||||
|
||||
const start = Date.now();
|
||||
await runWithReconnect(connectFn, {
|
||||
abortSignal: abort.signal,
|
||||
initialDelayMs: 60_000,
|
||||
});
|
||||
const elapsed = Date.now() - start;
|
||||
|
||||
expect(connectFn).toHaveBeenCalledTimes(1);
|
||||
expect(elapsed).toBeLessThan(5000);
|
||||
});
|
||||
});
|
||||
61
extensions/mattermost/src/mattermost/reconnect.ts
Normal file
61
extensions/mattermost/src/mattermost/reconnect.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
/**
|
||||
* Reconnection loop with exponential backoff.
|
||||
*
|
||||
* Calls `connectFn` in a while loop. On normal resolve (connection closed),
|
||||
* the backoff resets. On thrown error (connection failed), the current delay is
|
||||
* used, then doubled for the next retry.
|
||||
* The loop exits when `abortSignal` fires.
|
||||
*/
|
||||
export async function runWithReconnect(
|
||||
connectFn: () => Promise<void>,
|
||||
opts: {
|
||||
abortSignal?: AbortSignal;
|
||||
onError?: (err: unknown) => void;
|
||||
onReconnect?: (delayMs: number) => void;
|
||||
initialDelayMs?: number;
|
||||
maxDelayMs?: number;
|
||||
} = {},
|
||||
): Promise<void> {
|
||||
const { initialDelayMs = 2000, maxDelayMs = 60_000 } = opts;
|
||||
let retryDelay = initialDelayMs;
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
let shouldIncreaseDelay = false;
|
||||
try {
|
||||
await connectFn();
|
||||
retryDelay = initialDelayMs;
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
opts.onError?.(err);
|
||||
shouldIncreaseDelay = true;
|
||||
}
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
opts.onReconnect?.(retryDelay);
|
||||
await sleepAbortable(retryDelay, opts.abortSignal);
|
||||
if (shouldIncreaseDelay) {
|
||||
retryDelay = Math.min(retryDelay * 2, maxDelayMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function sleepAbortable(ms: number, signal?: AbortSignal): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
if (signal?.aborted) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
const onAbort = () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
};
|
||||
const timer = setTimeout(() => {
|
||||
signal?.removeEventListener("abort", onAbort);
|
||||
resolve();
|
||||
}, ms);
|
||||
signal?.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user