diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index d982280ab4..18c037789c 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -164,7 +164,7 @@ describe("block streaming", () => { }); }); - it("falls back to final payloads when block reply send times out", async () => { + it("falls back to final payloads and respects telegram streamMode block", async () => { await withTempHome(async (home) => { let sawAbort = false; const onBlockReply = vi.fn((_, context) => { @@ -220,32 +220,26 @@ describe("block streaming", () => { const res = await replyPromise; expect(res).toMatchObject({ text: "final" }); expect(sawAbort).toBe(true); - }); - }); - it("does not enable block streaming for telegram streamMode block", async () => { - await withTempHome(async (home) => { - const onBlockReply = vi.fn().mockResolvedValue(undefined); - - const impl = async () => ({ + const onBlockReplyStreamMode = vi.fn().mockResolvedValue(undefined); + piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(async () => ({ payloads: [{ text: "final" }], meta: { durationMs: 5, agentMeta: { sessionId: "s", provider: "p", model: "m" }, }, - }); - piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); + })); - const res = await getReplyFromConfig( + const resStreamMode = await getReplyFromConfig( { Body: "ping", From: "+1004", To: "+2000", - MessageSid: "msg-126", + MessageSid: "msg-127", Provider: "telegram", }, { - onBlockReply, + onBlockReply: onBlockReplyStreamMode, }, { agents: { @@ -259,8 +253,8 @@ describe("block streaming", () => { }, ); - expect(res?.text).toBe("final"); - expect(onBlockReply).not.toHaveBeenCalled(); + expect(resStreamMode?.text).toBe("final"); + expect(onBlockReplyStreamMode).not.toHaveBeenCalled(); }); }); }); diff --git a/src/auto-reply/reply.raw-body.test.ts b/src/auto-reply/reply.raw-body.test.ts index 0b19df8a12..8ec67b88af 100644 --- a/src/auto-reply/reply.raw-body.test.ts +++ b/src/auto-reply/reply.raw-body.test.ts @@ -102,7 +102,7 @@ describe("RawBody directive parsing", () => { vi.clearAllMocks(); }); - it("detects command directives from RawBody/CommandBody in wrapped group messages", async () => { + it("handles directives, history, and non-default agent session files", async () => { await withTempHome(async (home) => { const assertCommandReply = async (input: { message: ReplyMessage; @@ -161,11 +161,7 @@ describe("RawBody directive parsing", () => { }, expectedIncludes: ["Verbose logging enabled."], }); - }); - }); - it("preserves history and reuses non-default agent session files", async () => { - await withTempHome(async (home) => { vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ payloads: [{ text: "ok" }], meta: { diff --git a/src/gateway/server-reload.config-during-reply.test.ts b/src/gateway/server-reload.config-during-reply.test.ts index 326e9de759..c0a7265090 100644 --- a/src/gateway/server-reload.config-during-reply.test.ts +++ b/src/gateway/server-reload.config-during-reply.test.ts @@ -35,8 +35,8 @@ describe("gateway config reload during reply", () => { let deliveredReplies: string[] = []; const dispatcher = createReplyDispatcher({ deliver: async (payload) => { - // Simulate async reply delivery - await new Promise((resolve) => setTimeout(resolve, 20)); + // Keep delivery asynchronous without real wall-clock delay. + await Promise.resolve(); deliveredReplies.push(payload.text ?? ""); }, onError: (err) => { diff --git a/src/gateway/server-reload.integration.test.ts b/src/gateway/server-reload.integration.test.ts index 3bd1bc80e3..698b1041fd 100644 --- a/src/gateway/server-reload.integration.test.ts +++ b/src/gateway/server-reload.integration.test.ts @@ -30,8 +30,8 @@ describe("gateway restart deferral integration", () => { const deliveredReplies: Array<{ text: string; timestamp: number }> = []; const dispatcher = createReplyDispatcher({ deliver: async (payload) => { - // Simulate network delay - await new Promise((resolve) => setTimeout(resolve, 20)); + // Keep delivery asynchronous without real wall-clock delay. + await Promise.resolve(); deliveredReplies.push({ text: payload.text ?? "", timestamp: Date.now(), diff --git a/src/gateway/server-reload.real-scenario.test.ts b/src/gateway/server-reload.real-scenario.test.ts index 19ece2234a..dc10891ff7 100644 --- a/src/gateway/server-reload.real-scenario.test.ts +++ b/src/gateway/server-reload.real-scenario.test.ts @@ -4,6 +4,16 @@ */ import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + describe("real scenario: config change during message processing", () => { let replyErrors: string[] = []; @@ -26,8 +36,10 @@ describe("real scenario: config change during message processing", () => { let rpcConnected = true; const deliveredReplies: string[] = []; + const deliveryStarted = createDeferred(); + const allowDelivery = createDeferred(); - // Create dispatcher with slow delivery (simulates real network delay) + // Hold delivery open so restart checks run while reply is in-flight. const dispatcher = createReplyDispatcher({ deliver: async (payload) => { if (!rpcConnected) { @@ -35,8 +47,8 @@ describe("real scenario: config change during message processing", () => { replyErrors.push(error); throw new Error(error); } - // Slow delivery — restart checks will run during this window - await new Promise((resolve) => setTimeout(resolve, 150)); + deliveryStarted.resolve(); + await allowDelivery.promise; deliveredReplies.push(payload.text ?? ""); }, onError: () => { @@ -49,6 +61,7 @@ describe("real scenario: config change during message processing", () => { // keeping pending > 0 is the in-flight delivery itself. dispatcher.sendFinalReply({ text: "Configuration updated!" }); dispatcher.markComplete(); + await deliveryStarted.promise; // At this point: markComplete flagged, delivery is in flight. // pending > 0 because the in-flight delivery keeps it alive. @@ -59,7 +72,7 @@ describe("real scenario: config change during message processing", () => { // If the tracking is broken, pending would be 0 and we'd restart. let restartTriggered = false; for (let i = 0; i < 3; i++) { - await new Promise((resolve) => setTimeout(resolve, 25)); + await Promise.resolve(); const pending = getTotalPendingReplies(); if (pending === 0) { restartTriggered = true; @@ -68,6 +81,7 @@ describe("real scenario: config change during message processing", () => { } } + allowDelivery.resolve(); // Wait for delivery to complete await dispatcher.waitForIdle(); @@ -83,10 +97,11 @@ describe("real scenario: config change during message processing", () => { it("should keep pending > 0 until reply is actually enqueued", async () => { const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); + const allowDelivery = createDeferred(); const dispatcher = createReplyDispatcher({ deliver: async (_payload) => { - await new Promise((resolve) => setTimeout(resolve, 10)); + await allowDelivery.promise; }, }); @@ -94,7 +109,7 @@ describe("real scenario: config change during message processing", () => { expect(getTotalPendingReplies()).toBe(1); // Simulate command processing delay BEFORE reply is enqueued - await new Promise((resolve) => setTimeout(resolve, 20)); + await Promise.resolve(); // During this delay, pending should STILL be 1 (reservation active) expect(getTotalPendingReplies()).toBe(1); @@ -112,6 +127,7 @@ describe("real scenario: config change during message processing", () => { const pendingAfterMarkComplete = getTotalPendingReplies(); expect(pendingAfterMarkComplete).toBeGreaterThan(0); + allowDelivery.resolve(); // Wait for reply to send await dispatcher.waitForIdle(); diff --git a/src/gateway/server.nodes.late-invoke.test.ts b/src/gateway/server.nodes.late-invoke.test.ts index b965e77346..8219b87842 100644 --- a/src/gateway/server.nodes.late-invoke.test.ts +++ b/src/gateway/server.nodes.late-invoke.test.ts @@ -15,26 +15,25 @@ vi.mock("../infra/update-runner.js", () => ({ import { connectOk, + getFreePort, installGatewayTestHooks, rpcReq, - startServerWithClient, + startGatewayServer, } from "./test-helpers.js"; +import { testState } from "./test-helpers.mocks.js"; installGatewayTestHooks({ scope: "suite" }); -let server: Awaited>["server"]; -let ws: WebSocket; +let server: Awaited>; let port: number; let nodeWs: WebSocket; let nodeId: string; beforeAll(async () => { const token = "test-gateway-token-1234567890"; - const started = await startServerWithClient(token); - server = started.server; - ws = started.ws; - port = started.port; - await connectOk(ws, { token }); + testState.gatewayAuth = { mode: "token", token }; + port = await getFreePort(); + server = await startGatewayServer(port, { bind: "loopback" }); nodeWs = new WebSocket(`ws://127.0.0.1:${port}`); await new Promise((resolve) => nodeWs.once("open", resolve)); @@ -55,8 +54,7 @@ beforeAll(async () => { }); afterAll(async () => { - nodeWs.close(); - ws.close(); + nodeWs.terminate(); await server.close(); }); diff --git a/src/gateway/tools-invoke-http.test.ts b/src/gateway/tools-invoke-http.test.ts index 0db60b7188..d373c27410 100644 --- a/src/gateway/tools-invoke-http.test.ts +++ b/src/gateway/tools-invoke-http.test.ts @@ -46,7 +46,7 @@ const invokeAgentsList = async (params: { } return await fetch(`http://127.0.0.1:${params.port}/tools/invoke`, { method: "POST", - headers: { "content-type": "application/json", ...params.headers }, + headers: { "content-type": "application/json", connection: "close", ...params.headers }, body: JSON.stringify(body), }); }; @@ -71,7 +71,7 @@ const invokeTool = async (params: { } return await fetch(`http://127.0.0.1:${params.port}/tools/invoke`, { method: "POST", - headers: { "content-type": "application/json", ...params.headers }, + headers: { "content-type": "application/json", connection: "close", ...params.headers }, body: JSON.stringify(body), }); }; @@ -144,41 +144,6 @@ describe("POST /tools/invoke", () => { expect(implicitBody.ok).toBe(true); }); - it("handles dedicated auth modes for password accept and token reject", async () => { - allowAgentsListForMain(); - - const passwordPort = await getFreePort(); - const passwordServer = await startGatewayServer(passwordPort, { - bind: "loopback", - auth: { mode: "password", password: "secret" }, - }); - try { - const passwordRes = await invokeAgentsList({ - port: passwordPort, - headers: { authorization: "Bearer secret" }, - sessionKey: "main", - }); - expect(passwordRes.status).toBe(200); - } finally { - await passwordServer.close(); - } - - const tokenPort = await getFreePort(); - const tokenServer = await startGatewayServer(tokenPort, { - bind: "loopback", - auth: { mode: "token", token: "t" }, - }); - try { - const tokenRes = await invokeAgentsList({ - port: tokenPort, - sessionKey: "main", - }); - expect(tokenRes.status).toBe(401); - } finally { - await tokenServer.close(); - } - }); - it("routes tools invoke before plugin HTTP handlers", async () => { const pluginHandler = vi.fn(async (_req: IncomingMessage, res: ServerResponse) => { res.statusCode = 418;