mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
refactor(test): share gateway ws e2e harness
This commit is contained in:
39
src/gateway/server.e2e-ws-harness.ts
Normal file
39
src/gateway/server.e2e-ws-harness.ts
Normal file
@@ -0,0 +1,39 @@
|
||||
import { WebSocket } from "ws";
|
||||
import { connectOk, getFreePort, startGatewayServer } from "./test-helpers.js";
|
||||
|
||||
export type GatewayWsClient = {
|
||||
ws: WebSocket;
|
||||
hello: unknown;
|
||||
};
|
||||
|
||||
export type GatewayServerHarness = {
|
||||
port: number;
|
||||
server: Awaited<ReturnType<typeof startGatewayServer>>;
|
||||
openClient: (opts?: Parameters<typeof connectOk>[1]) => Promise<GatewayWsClient>;
|
||||
close: () => Promise<void>;
|
||||
};
|
||||
|
||||
export async function startGatewayServerHarness(): Promise<GatewayServerHarness> {
|
||||
const previousToken = process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
delete process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
const port = await getFreePort();
|
||||
const server = await startGatewayServer(port);
|
||||
|
||||
const openClient = async (opts?: Parameters<typeof connectOk>[1]): Promise<GatewayWsClient> => {
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
const hello = await connectOk(ws, opts);
|
||||
return { ws, hello };
|
||||
};
|
||||
|
||||
const close = async () => {
|
||||
await server.close();
|
||||
if (previousToken === undefined) {
|
||||
delete process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
} else {
|
||||
process.env.OPENCLAW_GATEWAY_TOKEN = previousToken;
|
||||
}
|
||||
};
|
||||
|
||||
return { port, server, openClient, close };
|
||||
}
|
||||
@@ -1,58 +1,26 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, describe, expect, test } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import {
|
||||
loadOrCreateDeviceIdentity,
|
||||
publicKeyRawBase64UrlFromPem,
|
||||
signDevicePayload,
|
||||
} from "../infra/device-identity.js";
|
||||
import { emitHeartbeatEvent } from "../infra/heartbeat-events.js";
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
|
||||
import { buildDeviceAuthPayload } from "./device-auth.js";
|
||||
import {
|
||||
connectOk,
|
||||
getFreePort,
|
||||
installGatewayTestHooks,
|
||||
onceMessage,
|
||||
startGatewayServer,
|
||||
startServerWithClient,
|
||||
} from "./test-helpers.js";
|
||||
import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js";
|
||||
import { installGatewayTestHooks, onceMessage } from "./test-helpers.js";
|
||||
|
||||
installGatewayTestHooks({ scope: "suite" });
|
||||
|
||||
let server: Awaited<ReturnType<typeof startGatewayServer>>;
|
||||
let port = 0;
|
||||
let previousToken: string | undefined;
|
||||
let harness: GatewayServerHarness;
|
||||
|
||||
beforeAll(async () => {
|
||||
previousToken = process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
delete process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
port = await getFreePort();
|
||||
server = await startGatewayServer(port);
|
||||
harness = await startGatewayServerHarness();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await server.close();
|
||||
if (previousToken === undefined) {
|
||||
delete process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
} else {
|
||||
process.env.OPENCLAW_GATEWAY_TOKEN = previousToken;
|
||||
}
|
||||
await harness.close();
|
||||
});
|
||||
|
||||
const openClient = async (opts?: Parameters<typeof connectOk>[1]) => {
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
await connectOk(ws, opts);
|
||||
return ws;
|
||||
};
|
||||
|
||||
describe("gateway server health/presence", () => {
|
||||
test("connect + health + presence + status succeed", { timeout: 60_000 }, async () => {
|
||||
const ws = await openClient();
|
||||
const { ws } = await harness.openClient();
|
||||
|
||||
const healthP = onceMessage(ws, (o) => o.type === "res" && o.id === "health1");
|
||||
const statusP = onceMessage(ws, (o) => o.type === "res" && o.id === "status1");
|
||||
@@ -101,7 +69,7 @@ describe("gateway server health/presence", () => {
|
||||
payload?: unknown;
|
||||
};
|
||||
|
||||
const ws = await openClient();
|
||||
const { ws } = await harness.openClient();
|
||||
|
||||
const waitHeartbeat = onceMessage<EventFrame>(
|
||||
ws,
|
||||
@@ -144,7 +112,7 @@ describe("gateway server health/presence", () => {
|
||||
});
|
||||
|
||||
test("presence events carry seq + stateVersion", { timeout: 8000 }, async () => {
|
||||
const ws = await openClient();
|
||||
const { ws } = await harness.openClient();
|
||||
|
||||
const presenceEventP = onceMessage(ws, (o) => o.type === "event" && o.event === "presence");
|
||||
ws.send(
|
||||
@@ -165,7 +133,7 @@ describe("gateway server health/presence", () => {
|
||||
});
|
||||
|
||||
test("agent events stream with seq", { timeout: 8000 }, async () => {
|
||||
const ws = await openClient();
|
||||
const { ws } = await harness.openClient();
|
||||
|
||||
const runId = randomUUID();
|
||||
const evtPromise = onceMessage(
|
||||
@@ -186,21 +154,24 @@ describe("gateway server health/presence", () => {
|
||||
});
|
||||
|
||||
test("shutdown event is broadcast on close", { timeout: 8000 }, async () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const localHarness = await startGatewayServerHarness();
|
||||
const { ws } = await localHarness.openClient();
|
||||
const shutdownP = onceMessage(ws, (o) => o.type === "event" && o.event === "shutdown", 5000);
|
||||
await server.close();
|
||||
await localHarness.close();
|
||||
const evt = await shutdownP;
|
||||
expect(evt.payload?.reason).toBeDefined();
|
||||
});
|
||||
|
||||
test("presence broadcast reaches multiple clients", { timeout: 8000 }, async () => {
|
||||
const clients = await Promise.all([openClient(), openClient(), openClient()]);
|
||||
const waits = clients.map((c) =>
|
||||
onceMessage(c, (o) => o.type === "event" && o.event === "presence"),
|
||||
const clients = await Promise.all([
|
||||
harness.openClient(),
|
||||
harness.openClient(),
|
||||
harness.openClient(),
|
||||
]);
|
||||
const waits = clients.map(({ ws }) =>
|
||||
onceMessage(ws, (o) => o.type === "event" && o.event === "presence"),
|
||||
);
|
||||
clients[0].send(
|
||||
clients[0].ws.send(
|
||||
JSON.stringify({
|
||||
type: "req",
|
||||
id: "broadcast",
|
||||
@@ -213,31 +184,17 @@ describe("gateway server health/presence", () => {
|
||||
expect(evt.payload?.presence?.length).toBeGreaterThan(0);
|
||||
expect(typeof evt.seq).toBe("number");
|
||||
}
|
||||
for (const c of clients) {
|
||||
c.close();
|
||||
for (const { ws } of clients) {
|
||||
ws.close();
|
||||
}
|
||||
});
|
||||
|
||||
test("presence includes client fingerprint", async () => {
|
||||
const identityPath = path.join(os.tmpdir(), `openclaw-device-${randomUUID()}.json`);
|
||||
const identity = loadOrCreateDeviceIdentity(identityPath);
|
||||
const token = process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined;
|
||||
const role = "operator";
|
||||
const scopes: string[] = ["operator.admin"];
|
||||
const signedAtMs = Date.now();
|
||||
const payload = buildDeviceAuthPayload({
|
||||
deviceId: identity.deviceId,
|
||||
clientId: GATEWAY_CLIENT_NAMES.FINGERPRINT,
|
||||
clientMode: GATEWAY_CLIENT_MODES.UI,
|
||||
const { ws } = await harness.openClient({
|
||||
role,
|
||||
scopes,
|
||||
signedAtMs,
|
||||
token: token ?? null,
|
||||
});
|
||||
const ws = await openClient({
|
||||
role,
|
||||
scopes,
|
||||
token,
|
||||
client: {
|
||||
id: GATEWAY_CLIENT_NAMES.FINGERPRINT,
|
||||
version: "9.9.9",
|
||||
@@ -247,12 +204,6 @@ describe("gateway server health/presence", () => {
|
||||
mode: GATEWAY_CLIENT_MODES.UI,
|
||||
instanceId: "abc",
|
||||
},
|
||||
device: {
|
||||
id: identity.deviceId,
|
||||
publicKey: publicKeyRawBase64UrlFromPem(identity.publicKeyPem),
|
||||
signature: signDevicePayload(identity.privateKeyPem, payload),
|
||||
signedAt: signedAtMs,
|
||||
},
|
||||
});
|
||||
|
||||
const presenceP = onceMessage(ws, (o) => o.type === "res" && o.id === "fingerprint", 4000);
|
||||
@@ -286,7 +237,7 @@ describe("gateway server health/presence", () => {
|
||||
|
||||
test("cli connections are not tracked as instances", async () => {
|
||||
const cliId = `cli-${randomUUID()}`;
|
||||
const ws = await openClient({
|
||||
const { ws } = await harness.openClient({
|
||||
client: {
|
||||
id: GATEWAY_CLIENT_NAMES.CLI,
|
||||
version: "dev",
|
||||
|
||||
@@ -2,16 +2,14 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { DEFAULT_PROVIDER } from "../agents/defaults.js";
|
||||
import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js";
|
||||
import {
|
||||
connectOk,
|
||||
embeddedRunMock,
|
||||
getFreePort,
|
||||
installGatewayTestHooks,
|
||||
piSdkMock,
|
||||
rpcReq,
|
||||
startGatewayServer,
|
||||
testState,
|
||||
writeSessionStore,
|
||||
} from "./test-helpers.js";
|
||||
@@ -57,32 +55,17 @@ vi.mock("../hooks/internal-hooks.js", async () => {
|
||||
|
||||
installGatewayTestHooks({ scope: "suite" });
|
||||
|
||||
let server: Awaited<ReturnType<typeof startGatewayServer>>;
|
||||
let port = 0;
|
||||
let previousToken: string | undefined;
|
||||
let harness: GatewayServerHarness;
|
||||
|
||||
beforeAll(async () => {
|
||||
previousToken = process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
delete process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
port = await getFreePort();
|
||||
server = await startGatewayServer(port);
|
||||
harness = await startGatewayServerHarness();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await server.close();
|
||||
if (previousToken === undefined) {
|
||||
delete process.env.OPENCLAW_GATEWAY_TOKEN;
|
||||
} else {
|
||||
process.env.OPENCLAW_GATEWAY_TOKEN = previousToken;
|
||||
}
|
||||
await harness.close();
|
||||
});
|
||||
|
||||
const openClient = async (opts?: Parameters<typeof connectOk>[1]) => {
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
const hello = await connectOk(ws, opts);
|
||||
return { ws, hello };
|
||||
};
|
||||
const openClient = async (opts?: Parameters<typeof connectOk>[1]) => await harness.openClient(opts);
|
||||
|
||||
describe("gateway server sessions", () => {
|
||||
beforeEach(() => {
|
||||
@@ -143,7 +126,7 @@ describe("gateway server sessions", () => {
|
||||
});
|
||||
|
||||
const { ws, hello } = await openClient();
|
||||
expect((hello as unknown as { features?: { methods?: string[] } }).features?.methods).toEqual(
|
||||
expect((hello as { features?: { methods?: string[] } }).features?.methods).toEqual(
|
||||
expect.arrayContaining([
|
||||
"sessions.list",
|
||||
"sessions.preview",
|
||||
|
||||
Reference in New Issue
Block a user