diff --git a/src/gateway/server.e2e-ws-harness.ts b/src/gateway/server.e2e-ws-harness.ts new file mode 100644 index 0000000000..c3775e53ce --- /dev/null +++ b/src/gateway/server.e2e-ws-harness.ts @@ -0,0 +1,39 @@ +import { WebSocket } from "ws"; +import { connectOk, getFreePort, startGatewayServer } from "./test-helpers.js"; + +export type GatewayWsClient = { + ws: WebSocket; + hello: unknown; +}; + +export type GatewayServerHarness = { + port: number; + server: Awaited>; + openClient: (opts?: Parameters[1]) => Promise; + close: () => Promise; +}; + +export async function startGatewayServerHarness(): Promise { + const previousToken = process.env.OPENCLAW_GATEWAY_TOKEN; + delete process.env.OPENCLAW_GATEWAY_TOKEN; + const port = await getFreePort(); + const server = await startGatewayServer(port); + + const openClient = async (opts?: Parameters[1]): Promise => { + const ws = new WebSocket(`ws://127.0.0.1:${port}`); + await new Promise((resolve) => ws.once("open", resolve)); + const hello = await connectOk(ws, opts); + return { ws, hello }; + }; + + const close = async () => { + await server.close(); + if (previousToken === undefined) { + delete process.env.OPENCLAW_GATEWAY_TOKEN; + } else { + process.env.OPENCLAW_GATEWAY_TOKEN = previousToken; + } + }; + + return { port, server, openClient, close }; +} diff --git a/src/gateway/server.health.e2e.test.ts b/src/gateway/server.health.e2e.test.ts index adab0dfd1a..f42e7b78b3 100644 --- a/src/gateway/server.health.e2e.test.ts +++ b/src/gateway/server.health.e2e.test.ts @@ -1,58 +1,26 @@ import { randomUUID } from "node:crypto"; -import os from "node:os"; -import path from "node:path"; import { afterAll, beforeAll, describe, expect, test } from "vitest"; -import { WebSocket } from "ws"; import { emitAgentEvent } from "../infra/agent-events.js"; -import { - loadOrCreateDeviceIdentity, - publicKeyRawBase64UrlFromPem, - signDevicePayload, -} from "../infra/device-identity.js"; import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; -import { buildDeviceAuthPayload } from "./device-auth.js"; -import { - connectOk, - getFreePort, - installGatewayTestHooks, - onceMessage, - startGatewayServer, - startServerWithClient, -} from "./test-helpers.js"; +import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js"; +import { installGatewayTestHooks, onceMessage } from "./test-helpers.js"; installGatewayTestHooks({ scope: "suite" }); -let server: Awaited>; -let port = 0; -let previousToken: string | undefined; +let harness: GatewayServerHarness; beforeAll(async () => { - previousToken = process.env.OPENCLAW_GATEWAY_TOKEN; - delete process.env.OPENCLAW_GATEWAY_TOKEN; - port = await getFreePort(); - server = await startGatewayServer(port); + harness = await startGatewayServerHarness(); }); afterAll(async () => { - await server.close(); - if (previousToken === undefined) { - delete process.env.OPENCLAW_GATEWAY_TOKEN; - } else { - process.env.OPENCLAW_GATEWAY_TOKEN = previousToken; - } + await harness.close(); }); -const openClient = async (opts?: Parameters[1]) => { - const ws = new WebSocket(`ws://127.0.0.1:${port}`); - await new Promise((resolve) => ws.once("open", resolve)); - await connectOk(ws, opts); - return ws; -}; - describe("gateway server health/presence", () => { test("connect + health + presence + status succeed", { timeout: 60_000 }, async () => { - const ws = await openClient(); + const { ws } = await harness.openClient(); const healthP = onceMessage(ws, (o) => o.type === "res" && o.id === "health1"); const statusP = onceMessage(ws, (o) => o.type === "res" && o.id === "status1"); @@ -101,7 +69,7 @@ describe("gateway server health/presence", () => { payload?: unknown; }; - const ws = await openClient(); + const { ws } = await harness.openClient(); const waitHeartbeat = onceMessage( ws, @@ -144,7 +112,7 @@ describe("gateway server health/presence", () => { }); test("presence events carry seq + stateVersion", { timeout: 8000 }, async () => { - const ws = await openClient(); + const { ws } = await harness.openClient(); const presenceEventP = onceMessage(ws, (o) => o.type === "event" && o.event === "presence"); ws.send( @@ -165,7 +133,7 @@ describe("gateway server health/presence", () => { }); test("agent events stream with seq", { timeout: 8000 }, async () => { - const ws = await openClient(); + const { ws } = await harness.openClient(); const runId = randomUUID(); const evtPromise = onceMessage( @@ -186,21 +154,24 @@ describe("gateway server health/presence", () => { }); test("shutdown event is broadcast on close", { timeout: 8000 }, async () => { - const { server, ws } = await startServerWithClient(); - await connectOk(ws); - + const localHarness = await startGatewayServerHarness(); + const { ws } = await localHarness.openClient(); const shutdownP = onceMessage(ws, (o) => o.type === "event" && o.event === "shutdown", 5000); - await server.close(); + await localHarness.close(); const evt = await shutdownP; expect(evt.payload?.reason).toBeDefined(); }); test("presence broadcast reaches multiple clients", { timeout: 8000 }, async () => { - const clients = await Promise.all([openClient(), openClient(), openClient()]); - const waits = clients.map((c) => - onceMessage(c, (o) => o.type === "event" && o.event === "presence"), + const clients = await Promise.all([ + harness.openClient(), + harness.openClient(), + harness.openClient(), + ]); + const waits = clients.map(({ ws }) => + onceMessage(ws, (o) => o.type === "event" && o.event === "presence"), ); - clients[0].send( + clients[0].ws.send( JSON.stringify({ type: "req", id: "broadcast", @@ -213,31 +184,17 @@ describe("gateway server health/presence", () => { expect(evt.payload?.presence?.length).toBeGreaterThan(0); expect(typeof evt.seq).toBe("number"); } - for (const c of clients) { - c.close(); + for (const { ws } of clients) { + ws.close(); } }); test("presence includes client fingerprint", async () => { - const identityPath = path.join(os.tmpdir(), `openclaw-device-${randomUUID()}.json`); - const identity = loadOrCreateDeviceIdentity(identityPath); - const token = process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined; const role = "operator"; const scopes: string[] = ["operator.admin"]; - const signedAtMs = Date.now(); - const payload = buildDeviceAuthPayload({ - deviceId: identity.deviceId, - clientId: GATEWAY_CLIENT_NAMES.FINGERPRINT, - clientMode: GATEWAY_CLIENT_MODES.UI, + const { ws } = await harness.openClient({ role, scopes, - signedAtMs, - token: token ?? null, - }); - const ws = await openClient({ - role, - scopes, - token, client: { id: GATEWAY_CLIENT_NAMES.FINGERPRINT, version: "9.9.9", @@ -247,12 +204,6 @@ describe("gateway server health/presence", () => { mode: GATEWAY_CLIENT_MODES.UI, instanceId: "abc", }, - device: { - id: identity.deviceId, - publicKey: publicKeyRawBase64UrlFromPem(identity.publicKeyPem), - signature: signDevicePayload(identity.privateKeyPem, payload), - signedAt: signedAtMs, - }, }); const presenceP = onceMessage(ws, (o) => o.type === "res" && o.id === "fingerprint", 4000); @@ -286,7 +237,7 @@ describe("gateway server health/presence", () => { test("cli connections are not tracked as instances", async () => { const cliId = `cli-${randomUUID()}`; - const ws = await openClient({ + const { ws } = await harness.openClient({ client: { id: GATEWAY_CLIENT_NAMES.CLI, version: "dev", diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts index 9d387c8ac6..16df39522c 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts @@ -2,16 +2,14 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from "vitest"; -import { WebSocket } from "ws"; import { DEFAULT_PROVIDER } from "../agents/defaults.js"; +import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js"; import { connectOk, embeddedRunMock, - getFreePort, installGatewayTestHooks, piSdkMock, rpcReq, - startGatewayServer, testState, writeSessionStore, } from "./test-helpers.js"; @@ -57,32 +55,17 @@ vi.mock("../hooks/internal-hooks.js", async () => { installGatewayTestHooks({ scope: "suite" }); -let server: Awaited>; -let port = 0; -let previousToken: string | undefined; +let harness: GatewayServerHarness; beforeAll(async () => { - previousToken = process.env.OPENCLAW_GATEWAY_TOKEN; - delete process.env.OPENCLAW_GATEWAY_TOKEN; - port = await getFreePort(); - server = await startGatewayServer(port); + harness = await startGatewayServerHarness(); }); afterAll(async () => { - await server.close(); - if (previousToken === undefined) { - delete process.env.OPENCLAW_GATEWAY_TOKEN; - } else { - process.env.OPENCLAW_GATEWAY_TOKEN = previousToken; - } + await harness.close(); }); -const openClient = async (opts?: Parameters[1]) => { - const ws = new WebSocket(`ws://127.0.0.1:${port}`); - await new Promise((resolve) => ws.once("open", resolve)); - const hello = await connectOk(ws, opts); - return { ws, hello }; -}; +const openClient = async (opts?: Parameters[1]) => await harness.openClient(opts); describe("gateway server sessions", () => { beforeEach(() => { @@ -143,7 +126,7 @@ describe("gateway server sessions", () => { }); const { ws, hello } = await openClient(); - expect((hello as unknown as { features?: { methods?: string[] } }).features?.methods).toEqual( + expect((hello as { features?: { methods?: string[] } }).features?.methods).toEqual( expect.arrayContaining([ "sessions.list", "sessions.preview",