From 07fdceb5fd5dc214f1b7f50c7dfa6925596c7d74 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Wed, 18 Feb 2026 00:02:51 -0500 Subject: [PATCH] refactor: centralize presence routing and version precedence coverage (#19609) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 10d9df5263f5e14712fa4f9f62b7a686dc55e6ae Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + src/gateway/server-methods/system.ts | 18 ++--- src/gateway/server.auth.e2e.test.ts | 80 +++++++++++++++++++ src/gateway/server/presence-events.test.ts | 35 ++++++++ src/gateway/server/presence-events.ts | 22 +++++ src/gateway/server/ws-connection.ts | 18 +---- .../server/ws-connection/message-handler.ts | 3 +- src/infra/system-presence.test.ts | 27 ++++++- src/infra/system-presence.ts | 7 +- src/infra/system-presence.version.test.ts | 60 ++++++++++++++ src/version.test.ts | 40 ++++++++++ src/version.ts | 27 +++++++ 12 files changed, 305 insertions(+), 33 deletions(-) create mode 100644 src/gateway/server/presence-events.test.ts create mode 100644 src/gateway/server/presence-events.ts create mode 100644 src/infra/system-presence.version.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 6036ca9b59..61cfa81c3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ Docs: https://docs.openclaw.ai - Agents/Tools: strip duplicated `read` truncation payloads from tool-result `details` and make pre-call context guarding account for heavy tool-result metadata, so repeated `read` calls no longer bypass compaction and overflow model context windows. Thanks @tyler6204. - Reply threading: keep reply context sticky across streamed/split chunks and preserve `replyToId` on all chunk sends across shared and channel-specific delivery paths (including iMessage, BlueBubbles, Telegram, Discord, and Matrix), so follow-up bubbles stay attached to the same referenced message. Thanks @tyler6204. - Gateway/Agent: defer transient lifecycle `error` snapshots with a short grace window so `agent.wait` does not resolve early during retry/failover. Thanks @tyler6204. +- Gateway/Presence: centralize presence snapshot broadcasts and unify runtime version precedence (`OPENCLAW_VERSION` > `OPENCLAW_SERVICE_VERSION` > `npm_package_version`) so self-presence and websocket `hello-ok` report consistent versions. - Hooks/Automation: bridge outbound/inbound message lifecycle into internal hook events (`message:received`, `message:sent`) with session-key correlation guards, while keeping per-payload success/error reporting accurate for chunked and best-effort deliveries. (PR #9387) - Media understanding: honor `agents.defaults.imageModel` during auto-discovery so implicit image analysis uses configured primary/fallback image models. (PR #7607) - iOS/Onboarding: stop auth Step 3 retry-loop churn by pausing reconnect attempts on unauthorized/missing-token gateway errors and keeping auth/pairing issue state sticky during manual retry. (#19153) Thanks @mbelinky. diff --git a/src/gateway/server-methods/system.ts b/src/gateway/server-methods/system.ts index b9c5e64ca0..7ee8ac35d7 100644 --- a/src/gateway/server-methods/system.ts +++ b/src/gateway/server-methods/system.ts @@ -4,6 +4,7 @@ import { setHeartbeatsEnabled } from "../../infra/heartbeat-runner.js"; import { enqueueSystemEvent, isSystemEventContextChanged } from "../../infra/system-events.js"; import { listSystemPresence, updateSystemPresence } from "../../infra/system-presence.js"; import { ErrorCodes, errorShape } from "../protocol/index.js"; +import { broadcastPresenceSnapshot } from "../server/presence-events.js"; import type { GatewayRequestHandlers } from "./types.js"; export const systemHandlers: GatewayRequestHandlers = { @@ -123,18 +124,11 @@ export const systemHandlers: GatewayRequestHandlers = { } else { enqueueSystemEvent(text, { sessionKey }); } - const nextPresenceVersion = context.incrementPresenceVersion(); - context.broadcast( - "presence", - { presence: listSystemPresence() }, - { - dropIfSlow: true, - stateVersion: { - presence: nextPresenceVersion, - health: context.getHealthVersion(), - }, - }, - ); + broadcastPresenceSnapshot({ + broadcast: context.broadcast, + incrementPresenceVersion: context.incrementPresenceVersion, + getHealthVersion: context.getHealthVersion, + }); respond(true, { ok: true }, undefined); }, }; diff --git a/src/gateway/server.auth.e2e.test.ts b/src/gateway/server.auth.e2e.test.ts index f270adb18e..fbf588a407 100644 --- a/src/gateway/server.auth.e2e.test.ts +++ b/src/gateway/server.auth.e2e.test.ts @@ -1,5 +1,6 @@ import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from "vitest"; import { WebSocket } from "ws"; +import { withEnvAsync } from "../test-utils/env.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { buildDeviceAuthPayload } from "./device-auth.js"; import { PROTOCOL_VERSION } from "./protocol/index.js"; @@ -63,6 +64,13 @@ function restoreGatewayToken(prevToken: string | undefined) { } } +async function withRuntimeVersionEnv( + env: Record, + run: () => Promise, +): Promise { + return withEnvAsync(env, run); +} + const TEST_OPERATOR_CLIENT = { id: GATEWAY_CLIENT_NAMES.TEST, version: "1.0.0", @@ -235,6 +243,78 @@ describe("gateway server auth/connect", () => { ws.close(); }); + test("connect (req) handshake prefers service version fallback in hello-ok payload", async () => { + await withRuntimeVersionEnv( + { + OPENCLAW_VERSION: " ", + OPENCLAW_SERVICE_VERSION: "2.4.6-service", + npm_package_version: "1.0.0-package", + }, + async () => { + const ws = await openWs(port); + const res = await connectReq(ws); + expect(res.ok).toBe(true); + const payload = res.payload as + | { + type?: unknown; + server?: { version?: string }; + } + | undefined; + expect(payload?.type).toBe("hello-ok"); + expect(payload?.server?.version).toBe("2.4.6-service"); + ws.close(); + }, + ); + }); + + test("connect (req) handshake prefers OPENCLAW_VERSION over service version", async () => { + await withRuntimeVersionEnv( + { + OPENCLAW_VERSION: "9.9.9-cli", + OPENCLAW_SERVICE_VERSION: "2.4.6-service", + npm_package_version: "1.0.0-package", + }, + async () => { + const ws = await openWs(port); + const res = await connectReq(ws); + expect(res.ok).toBe(true); + const payload = res.payload as + | { + type?: unknown; + server?: { version?: string }; + } + | undefined; + expect(payload?.type).toBe("hello-ok"); + expect(payload?.server?.version).toBe("9.9.9-cli"); + ws.close(); + }, + ); + }); + + test("connect (req) handshake falls back to npm_package_version when higher-precedence env values are blank", async () => { + await withRuntimeVersionEnv( + { + OPENCLAW_VERSION: " ", + OPENCLAW_SERVICE_VERSION: "\t", + npm_package_version: "1.0.0-package", + }, + async () => { + const ws = await openWs(port); + const res = await connectReq(ws); + expect(res.ok).toBe(true); + const payload = res.payload as + | { + type?: unknown; + server?: { version?: string }; + } + | undefined; + expect(payload?.type).toBe("hello-ok"); + expect(payload?.server?.version).toBe("1.0.0-package"); + ws.close(); + }, + ); + }); + test("does not grant admin when scopes are empty", async () => { const ws = await openWs(port); const res = await connectReq(ws, { scopes: [] }); diff --git a/src/gateway/server/presence-events.test.ts b/src/gateway/server/presence-events.test.ts new file mode 100644 index 0000000000..51372bfa4a --- /dev/null +++ b/src/gateway/server/presence-events.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, it, vi } from "vitest"; +import { broadcastPresenceSnapshot } from "./presence-events.js"; + +describe("broadcastPresenceSnapshot", () => { + it("increments version and broadcasts presence with state versions", () => { + const broadcast = vi.fn(); + const incrementPresenceVersion = vi.fn(() => 7); + const getHealthVersion = vi.fn(() => 11); + + const presenceVersion = broadcastPresenceSnapshot({ + broadcast, + incrementPresenceVersion, + getHealthVersion, + }); + + expect(presenceVersion).toBe(7); + expect(incrementPresenceVersion).toHaveBeenCalledTimes(1); + expect(getHealthVersion).toHaveBeenCalledTimes(1); + expect(broadcast).toHaveBeenCalledTimes(1); + + const [event, payload, opts] = broadcast.mock.calls[0] as [ + string, + unknown, + { dropIfSlow?: boolean; stateVersion?: { presence?: number; health?: number } } | undefined, + ]; + + expect(event).toBe("presence"); + if (!payload || typeof payload !== "object" || Array.isArray(payload)) { + throw new Error("expected object payload"); + } + expect(Array.isArray((payload as { presence?: unknown }).presence)).toBe(true); + expect(opts?.dropIfSlow).toBe(true); + expect(opts?.stateVersion).toEqual({ presence: 7, health: 11 }); + }); +}); diff --git a/src/gateway/server/presence-events.ts b/src/gateway/server/presence-events.ts new file mode 100644 index 0000000000..cf5b5b832b --- /dev/null +++ b/src/gateway/server/presence-events.ts @@ -0,0 +1,22 @@ +import { listSystemPresence } from "../../infra/system-presence.js"; +import type { GatewayBroadcastFn } from "../server-broadcast.js"; + +export function broadcastPresenceSnapshot(params: { + broadcast: GatewayBroadcastFn; + incrementPresenceVersion: () => number; + getHealthVersion: () => number; +}): number { + const presenceVersion = params.incrementPresenceVersion(); + params.broadcast( + "presence", + { presence: listSystemPresence() }, + { + dropIfSlow: true, + stateVersion: { + presence: presenceVersion, + health: params.getHealthVersion(), + }, + }, + ); + return presenceVersion; +} diff --git a/src/gateway/server/ws-connection.ts b/src/gateway/server/ws-connection.ts index c02dc337b0..e7c9d458f8 100644 --- a/src/gateway/server/ws-connection.ts +++ b/src/gateway/server/ws-connection.ts @@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto"; import type { WebSocket, WebSocketServer } from "ws"; import { resolveCanvasHostUrl } from "../../infra/canvas-host-url.js"; import { removeRemoteNodeInfo } from "../../infra/skills-remote.js"; -import { listSystemPresence, upsertPresence } from "../../infra/system-presence.js"; +import { upsertPresence } from "../../infra/system-presence.js"; import type { createSubsystemLogger } from "../../logging/subsystem.js"; import { truncateUtf16Safe } from "../../utils.js"; import { isWebchatClient } from "../../utils/message-channel.js"; @@ -13,7 +13,8 @@ import { getHandshakeTimeoutMs } from "../server-constants.js"; import type { GatewayRequestContext, GatewayRequestHandlers } from "../server-methods/types.js"; import { formatError } from "../server-utils.js"; import { logWs } from "../ws-log.js"; -import { getHealthVersion, getPresenceVersion, incrementPresenceVersion } from "./health-state.js"; +import { getHealthVersion, incrementPresenceVersion } from "./health-state.js"; +import { broadcastPresenceSnapshot } from "./presence-events.js"; import { attachGatewayWsMessageHandler } from "./ws-connection/message-handler.js"; import type { GatewayWsClient } from "./ws-types.js"; @@ -227,18 +228,7 @@ export function attachGatewayWsConnectionHandler(params: { } if (client?.presenceKey) { upsertPresence(client.presenceKey, { reason: "disconnect" }); - incrementPresenceVersion(); - broadcast( - "presence", - { presence: listSystemPresence() }, - { - dropIfSlow: true, - stateVersion: { - presence: getPresenceVersion(), - health: getHealthVersion(), - }, - }, - ); + broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion }); } if (client?.connect?.role === "node") { const context = buildRequestContext(); diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index 51008a3590..7a50d1cc8a 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -22,6 +22,7 @@ import { loadVoiceWakeConfig } from "../../../infra/voicewake.js"; import { rawDataToString } from "../../../infra/ws.js"; import type { createSubsystemLogger } from "../../../logging/subsystem.js"; import { isGatewayCliClient, isWebchatClient } from "../../../utils/message-channel.js"; +import { resolveRuntimeServiceVersion } from "../../../version.js"; import { AUTH_RATE_LIMIT_SCOPE_DEVICE_TOKEN, AUTH_RATE_LIMIT_SCOPE_SHARED_SECRET, @@ -791,7 +792,7 @@ export function attachGatewayWsMessageHandler(params: { type: "hello-ok", protocol: PROTOCOL_VERSION, server: { - version: process.env.OPENCLAW_VERSION ?? process.env.npm_package_version ?? "dev", + version: resolveRuntimeServiceVersion(process.env, "dev"), commit: process.env.GIT_COMMIT, host: os.hostname(), connId, diff --git a/src/infra/system-presence.test.ts b/src/infra/system-presence.test.ts index 982c7d6112..1092911560 100644 --- a/src/infra/system-presence.test.ts +++ b/src/infra/system-presence.test.ts @@ -1,8 +1,12 @@ import { randomUUID } from "node:crypto"; -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { listSystemPresence, updateSystemPresence, upsertPresence } from "./system-presence.js"; describe("system-presence", () => { + afterEach(() => { + vi.useRealTimers(); + }); + it("dedupes entries across sources by case-insensitive instanceId key", () => { const instanceIdUpper = `AaBb-${randomUUID()}`.toUpperCase(); const instanceIdLower = instanceIdUpper.toLowerCase(); @@ -56,4 +60,25 @@ describe("system-presence", () => { expect(entry?.roles).toEqual(expect.arrayContaining(["operator", "node"])); expect(entry?.scopes).toEqual(expect.arrayContaining(["operator.admin", "system.run"])); }); + + it("prunes stale non-self entries after TTL", () => { + vi.useFakeTimers(); + vi.setSystemTime(Date.now()); + + const deviceId = randomUUID(); + upsertPresence(deviceId, { + deviceId, + host: "stale-host", + mode: "ui", + reason: "connect", + }); + + expect(listSystemPresence().some((entry) => entry.deviceId === deviceId)).toBe(true); + + vi.advanceTimersByTime(5 * 60 * 1000 + 1); + + const entries = listSystemPresence(); + expect(entries.some((entry) => entry.deviceId === deviceId)).toBe(false); + expect(entries.some((entry) => entry.reason === "self")).toBe(true); + }); }); diff --git a/src/infra/system-presence.ts b/src/infra/system-presence.ts index 4ac6aacfea..a6e5863b23 100644 --- a/src/infra/system-presence.ts +++ b/src/infra/system-presence.ts @@ -1,6 +1,7 @@ import { spawnSync } from "node:child_process"; import os from "node:os"; import { pickPrimaryLanIPv4 } from "../gateway/net.js"; +import { resolveRuntimeServiceVersion } from "../version.js"; export type SystemPresence = { host?: string; @@ -50,11 +51,7 @@ function resolvePrimaryIPv4(): string | undefined { function initSelfPresence() { const host = os.hostname(); const ip = resolvePrimaryIPv4() ?? undefined; - const version = - process.env.OPENCLAW_VERSION ?? - process.env.OPENCLAW_SERVICE_VERSION ?? - process.env.npm_package_version ?? - "unknown"; + const version = resolveRuntimeServiceVersion(process.env, "unknown"); const modelIdentifier = (() => { const p = os.platform(); if (p === "darwin") { diff --git a/src/infra/system-presence.version.test.ts b/src/infra/system-presence.version.test.ts new file mode 100644 index 0000000000..1eb68efbe6 --- /dev/null +++ b/src/infra/system-presence.version.test.ts @@ -0,0 +1,60 @@ +import { describe, expect, it, vi } from "vitest"; +import { withEnvAsync } from "../test-utils/env.js"; + +async function withPresenceModule( + env: Record, + run: (module: typeof import("./system-presence.js")) => Promise | T, +): Promise { + return withEnvAsync(env, async () => { + vi.resetModules(); + try { + const module = await import("./system-presence.js"); + return await run(module); + } finally { + vi.resetModules(); + } + }); +} + +describe("system-presence version fallback", () => { + it("uses OPENCLAW_SERVICE_VERSION when OPENCLAW_VERSION is not set", async () => { + await withPresenceModule( + { + OPENCLAW_SERVICE_VERSION: "2.4.6-service", + npm_package_version: "1.0.0-package", + }, + ({ listSystemPresence }) => { + const selfEntry = listSystemPresence().find((entry) => entry.reason === "self"); + expect(selfEntry?.version).toBe("2.4.6-service"); + }, + ); + }); + + it("prefers OPENCLAW_VERSION over OPENCLAW_SERVICE_VERSION", async () => { + await withPresenceModule( + { + OPENCLAW_VERSION: "9.9.9-cli", + OPENCLAW_SERVICE_VERSION: "2.4.6-service", + npm_package_version: "1.0.0-package", + }, + ({ listSystemPresence }) => { + const selfEntry = listSystemPresence().find((entry) => entry.reason === "self"); + expect(selfEntry?.version).toBe("9.9.9-cli"); + }, + ); + }); + + it("uses npm_package_version when OPENCLAW_VERSION and OPENCLAW_SERVICE_VERSION are blank", async () => { + await withPresenceModule( + { + OPENCLAW_VERSION: " ", + OPENCLAW_SERVICE_VERSION: "\t", + npm_package_version: "1.0.0-package", + }, + ({ listSystemPresence }) => { + const selfEntry = listSystemPresence().find((entry) => entry.reason === "self"); + expect(selfEntry?.version).toBe("1.0.0-package"); + }, + ); + }); +}); diff --git a/src/version.test.ts b/src/version.test.ts index 8806d00de8..d8d1f4fa5d 100644 --- a/src/version.test.ts +++ b/src/version.test.ts @@ -6,6 +6,7 @@ import { describe, expect, it } from "vitest"; import { readVersionFromBuildInfoForModuleUrl, readVersionFromPackageJsonForModuleUrl, + resolveRuntimeServiceVersion, resolveVersionFromModuleUrl, } from "./version.js"; @@ -83,4 +84,43 @@ describe("version resolution", () => { expect(resolveVersionFromModuleUrl(moduleUrl)).toBeNull(); }); }); + + it("prefers OPENCLAW_VERSION over service and package versions", () => { + expect( + resolveRuntimeServiceVersion({ + OPENCLAW_VERSION: "9.9.9", + OPENCLAW_SERVICE_VERSION: "2.2.2", + npm_package_version: "1.1.1", + }), + ).toBe("9.9.9"); + }); + + it("uses service and package fallbacks and ignores blank env values", () => { + expect( + resolveRuntimeServiceVersion({ + OPENCLAW_VERSION: " ", + OPENCLAW_SERVICE_VERSION: " 2.0.0 ", + npm_package_version: "1.0.0", + }), + ).toBe("2.0.0"); + + expect( + resolveRuntimeServiceVersion({ + OPENCLAW_VERSION: " ", + OPENCLAW_SERVICE_VERSION: "\t", + npm_package_version: " 1.0.0-package ", + }), + ).toBe("1.0.0-package"); + + expect( + resolveRuntimeServiceVersion( + { + OPENCLAW_VERSION: "", + OPENCLAW_SERVICE_VERSION: " ", + npm_package_version: "", + }, + "fallback", + ), + ).toBe("fallback"); + }); }); diff --git a/src/version.ts b/src/version.ts index bf2d1e44e6..18c3c968dd 100644 --- a/src/version.ts +++ b/src/version.ts @@ -44,6 +44,16 @@ function readVersionFromJsonCandidates( } } +function firstNonEmpty(...values: Array): string | undefined { + for (const value of values) { + const trimmed = value?.trim(); + if (trimmed) { + return trimmed; + } + } + return undefined; +} + export function readVersionFromPackageJsonForModuleUrl(moduleUrl: string): string | null { return readVersionFromJsonCandidates(moduleUrl, PACKAGE_JSON_CANDIDATES, { requirePackageName: true, @@ -61,6 +71,23 @@ export function resolveVersionFromModuleUrl(moduleUrl: string): string | null { ); } +export type RuntimeVersionEnv = { + [key: string]: string | undefined; +}; + +export function resolveRuntimeServiceVersion( + env: RuntimeVersionEnv = process.env as RuntimeVersionEnv, + fallback = "dev", +): string { + return ( + firstNonEmpty( + env["OPENCLAW_VERSION"], + env["OPENCLAW_SERVICE_VERSION"], + env["npm_package_version"], + ) ?? fallback + ); +} + // Single source of truth for the current OpenClaw version. // - Embedded/bundled builds: injected define or env var. // - Dev/npm builds: package.json.