diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index 4d4fd8d1c8..e051944dc9 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -98,69 +98,7 @@ describe("block streaming", () => { ]); }); - it("waits for block replies before returning final payloads", async () => { - await withTempHome(async (home) => { - let releaseTyping: (() => void) | undefined; - const typingGate = new Promise((resolve) => { - releaseTyping = resolve; - }); - let resolveOnReplyStart: (() => void) | undefined; - const onReplyStartCalled = new Promise((resolve) => { - resolveOnReplyStart = resolve; - }); - const onReplyStart = vi.fn(() => { - resolveOnReplyStart?.(); - return typingGate; - }); - const onBlockReply = vi.fn().mockResolvedValue(undefined); - - const impl = async (params: RunEmbeddedPiAgentParams) => { - void params.onBlockReply?.({ text: "hello" }); - return { - payloads: [{ text: "hello" }], - meta: { - durationMs: 5, - agentMeta: { sessionId: "s", provider: "p", model: "m" }, - }, - }; - }; - piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); - - const replyPromise = getReplyFromConfig( - { - Body: "ping", - From: "+1004", - To: "+2000", - MessageSid: "msg-123", - Provider: "discord", - }, - { - onReplyStart, - onBlockReply, - disableBlockStreaming: false, - }, - { - agents: { - defaults: { - model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw"), - }, - }, - channels: { whatsapp: { allowFrom: ["*"] } }, - session: { store: path.join(home, "sessions.json") }, - }, - ); - - await onReplyStartCalled; - releaseTyping?.(); - - const res = await replyPromise; - expect(res).toBeUndefined(); - expect(onBlockReply).toHaveBeenCalledTimes(1); - }); - }); - - it("preserves block reply ordering when typing start is slow", async () => { + it("waits for block replies and preserves ordering when typing start is slow", async () => { await withTempHome(async (home) => { let releaseTyping: (() => void) | undefined; const typingGate = new Promise((resolve) => { @@ -197,7 +135,7 @@ describe("block streaming", () => { Body: "ping", From: "+1004", To: "+2000", - MessageSid: "msg-125", + MessageSid: "msg-123", Provider: "telegram", }, { @@ -309,7 +247,7 @@ describe("block streaming", () => { }, { onBlockReply, - blockReplyTimeoutMs: 10, + blockReplyTimeoutMs: 1, disableBlockStreaming: false, }, { diff --git a/src/auto-reply/reply.raw-body.test.ts b/src/auto-reply/reply.raw-body.test.ts index 75d586bffe..e66b174e05 100644 --- a/src/auto-reply/reply.raw-body.test.ts +++ b/src/auto-reply/reply.raw-body.test.ts @@ -140,31 +140,6 @@ describe("RawBody directive parsing", () => { expectedIncludes: ["Thinking level set to high."], }); - await assertCommandReply({ - message: { - Body: "[Context]\nJake: /model status\n[from: Jake]", - RawBody: "/model status", - From: "+1222", - To: "+1222", - ChatType: "group", - CommandAuthorized: true, - }, - config: { - agents: { - defaults: { - model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw-2"), - models: { - "anthropic/claude-opus-4-5": {}, - }, - }, - }, - channels: { whatsapp: { allowFrom: ["*"] } }, - session: { store: path.join(home, "sessions-2.json") }, - }, - expectedIncludes: ["anthropic/claude-opus-4-5"], - }); - await assertCommandReply({ message: { Body: "[Context]\nJake: /verbose on\n[from: Jake]", @@ -178,11 +153,11 @@ describe("RawBody directive parsing", () => { agents: { defaults: { model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw-3"), + workspace: path.join(home, "openclaw-2"), }, }, channels: { whatsapp: { allowFrom: ["*"] } }, - session: { store: path.join(home, "sessions-3.json") }, + session: { store: path.join(home, "sessions-2.json") }, }, expectedIncludes: ["Verbose logging enabled."], }); @@ -204,11 +179,11 @@ describe("RawBody directive parsing", () => { agents: { defaults: { model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw-4"), + workspace: path.join(home, "openclaw-3"), }, }, channels: { whatsapp: { allowFrom: ["+1222"] } }, - session: { store: path.join(home, "sessions-4.json") }, + session: { store: path.join(home, "sessions-3.json") }, }, expectedIncludes: ["Session: agent:main:whatsapp:group:g1", "anthropic/claude-opus-4-5"], }); diff --git a/src/infra/transport-ready.test.ts b/src/infra/transport-ready.test.ts index adb2560ce1..2df90a6420 100644 --- a/src/infra/transport-ready.test.ts +++ b/src/infra/transport-ready.test.ts @@ -15,22 +15,22 @@ describe("waitForTransportReady", () => { let attempts = 0; const readyPromise = waitForTransportReady({ label: "test transport", - timeoutMs: 500, - logAfterMs: 120, - logIntervalMs: 100, - pollIntervalMs: 80, + timeoutMs: 220, + logAfterMs: 60, + logIntervalMs: 1_000, + pollIntervalMs: 50, runtime, check: async () => { attempts += 1; - if (attempts > 4) { + if (attempts > 2) { return { ok: true }; } return { ok: false, error: "not ready" }; }, }); - for (let i = 0; i < 5; i += 1) { - await vi.advanceTimersByTimeAsync(80); + for (let i = 0; i < 3; i += 1) { + await vi.advanceTimersByTimeAsync(50); } await readyPromise; @@ -41,14 +41,14 @@ describe("waitForTransportReady", () => { const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; const waitPromise = waitForTransportReady({ label: "test transport", - timeoutMs: 200, + timeoutMs: 110, logAfterMs: 0, - logIntervalMs: 100, + logIntervalMs: 1_000, pollIntervalMs: 50, runtime, check: async () => ({ ok: false, error: "still down" }), }); - await vi.advanceTimersByTimeAsync(250); + await vi.advanceTimersByTimeAsync(200); await expect(waitPromise).rejects.toThrow("test transport not ready"); expect(runtime.error).toHaveBeenCalled(); }); diff --git a/src/memory/index.test.ts b/src/memory/index.test.ts index 3030c45dbb..9f5d708a2b 100644 --- a/src/memory/index.test.ts +++ b/src/memory/index.test.ts @@ -280,7 +280,7 @@ describe("memory index", () => { expect(results[0]?.path).toContain("memory/2026-01-12.md"); }); - it("hybrid weights can favor vector-only matches over keyword-only matches", async () => { + it("hybrid weights shift ranking between vector and keyword matches", async () => { const manyAlpha = Array.from({ length: 50 }, () => "Alpha").join(" "); await fs.writeFile( path.join(workspaceDir, "memory", "vector-only.md"), @@ -291,7 +291,7 @@ describe("memory index", () => { `${manyAlpha} beta id123.`, ); - const cfg = { + const vectorWeightedCfg = { agents: { defaults: { workspace: workspaceDir, @@ -315,12 +315,15 @@ describe("memory index", () => { list: [{ id: "main", default: true }], }, }; - const result = await getMemorySearchManager({ cfg, agentId: "main" }); - expect(result.manager).not.toBeNull(); - if (!result.manager) { + const vectorWeighted = await getMemorySearchManager({ + cfg: vectorWeightedCfg, + agentId: "main", + }); + expect(vectorWeighted.manager).not.toBeNull(); + if (!vectorWeighted.manager) { throw new Error("manager missing"); } - manager = result.manager; + manager = vectorWeighted.manager; const status = manager.status(); if (!status.fts?.available) { @@ -328,28 +331,19 @@ describe("memory index", () => { } await manager.sync({ force: true }); - const results = await manager.search("alpha beta id123"); - expect(results.length).toBeGreaterThan(0); - const paths = results.map((r) => r.path); - expect(paths).toContain("memory/vector-only.md"); - expect(paths).toContain("memory/keyword-only.md"); - const vectorOnly = results.find((r) => r.path === "memory/vector-only.md"); - const keywordOnly = results.find((r) => r.path === "memory/keyword-only.md"); + const vectorResults = await manager.search("alpha beta id123"); + expect(vectorResults.length).toBeGreaterThan(0); + const vectorPaths = vectorResults.map((r) => r.path); + expect(vectorPaths).toContain("memory/vector-only.md"); + expect(vectorPaths).toContain("memory/keyword-only.md"); + const vectorOnly = vectorResults.find((r) => r.path === "memory/vector-only.md"); + const keywordOnly = vectorResults.find((r) => r.path === "memory/keyword-only.md"); expect((vectorOnly?.score ?? 0) > (keywordOnly?.score ?? 0)).toBe(true); - }); - it("hybrid weights can favor keyword matches when text weight dominates", async () => { - const manyAlpha = Array.from({ length: 50 }, () => "Alpha").join(" "); - await fs.writeFile( - path.join(workspaceDir, "memory", "vector-only.md"), - "Alpha beta. Alpha beta. Alpha beta. Alpha beta.", - ); - await fs.writeFile( - path.join(workspaceDir, "memory", "keyword-only.md"), - `${manyAlpha} beta id123.`, - ); + await manager.close(); + manager = null; - const cfg = { + const textWeightedCfg = { agents: { defaults: { workspace: workspaceDir, @@ -357,7 +351,7 @@ describe("memory index", () => { provider: "openai", model: "mock-embed", store: { path: indexPath, vector: { enabled: false } }, - sync: { watch: false, onSessionStart: false, onSearch: true }, + sync: { watch: false, onSessionStart: false, onSearch: false }, query: { minScore: 0, maxResults: 200, @@ -373,27 +367,21 @@ describe("memory index", () => { list: [{ id: "main", default: true }], }, }; - const result = await getMemorySearchManager({ cfg, agentId: "main" }); - expect(result.manager).not.toBeNull(); - if (!result.manager) { + + const textWeighted = await getMemorySearchManager({ cfg: textWeightedCfg, agentId: "main" }); + expect(textWeighted.manager).not.toBeNull(); + if (!textWeighted.manager) { throw new Error("manager missing"); } - manager = result.manager; - - const status = manager.status(); - if (!status.fts?.available) { - return; - } - - await manager.sync({ force: true }); - const results = await manager.search("alpha beta id123"); - expect(results.length).toBeGreaterThan(0); - const paths = results.map((r) => r.path); - expect(paths).toContain("memory/vector-only.md"); - expect(paths).toContain("memory/keyword-only.md"); - const vectorOnly = results.find((r) => r.path === "memory/vector-only.md"); - const keywordOnly = results.find((r) => r.path === "memory/keyword-only.md"); - expect((keywordOnly?.score ?? 0) > (vectorOnly?.score ?? 0)).toBe(true); + manager = textWeighted.manager; + const keywordResults = await manager.search("alpha beta id123"); + expect(keywordResults.length).toBeGreaterThan(0); + const keywordPaths = keywordResults.map((r) => r.path); + expect(keywordPaths).toContain("memory/vector-only.md"); + expect(keywordPaths).toContain("memory/keyword-only.md"); + const vectorOnlyAfter = keywordResults.find((r) => r.path === "memory/vector-only.md"); + const keywordOnlyAfter = keywordResults.find((r) => r.path === "memory/keyword-only.md"); + expect((keywordOnlyAfter?.score ?? 0) > (vectorOnlyAfter?.score ?? 0)).toBe(true); }); it("reports vector availability after probe", async () => { diff --git a/src/memory/manager.batch.test.ts b/src/memory/manager.batch.test.ts index 2ac5eeb5be..2cf1b30c05 100644 --- a/src/memory/manager.batch.test.ts +++ b/src/memory/manager.batch.test.ts @@ -281,7 +281,7 @@ describe("memory indexing with OpenAI batches", () => { expect(batchCreates).toBe(2); }); - it("falls back to non-batch on failure and resets failures after success", async () => { + it("tracks batch failures, resets on success, and disables after repeated failures", async () => { const content = ["flaky", "batch"].join("\n\n"); await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-09.md"), content); @@ -376,12 +376,14 @@ describe("memory indexing with OpenAI batches", () => { } manager = result.manager; + // First failure: fallback to regular embeddings and increment failure count. await manager.sync({ force: true }); expect(embedBatch).toHaveBeenCalled(); let status = manager.status(); expect(status.batch?.enabled).toBe(true); expect(status.batch?.failures).toBe(1); + // Success should reset failure count. embedBatch.mockClear(); mode = "ok"; await fs.writeFile( @@ -393,110 +395,33 @@ describe("memory indexing with OpenAI batches", () => { expect(status.batch?.enabled).toBe(true); expect(status.batch?.failures).toBe(0); expect(embedBatch).not.toHaveBeenCalled(); - }); - - it("disables batch after repeated failures and skips batch thereafter", async () => { - const content = ["repeat", "failures"].join("\n\n"); - await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-10.md"), content); - - let uploadedRequests: Array<{ custom_id?: string }> = []; - const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => { - const url = - typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; - if (url.endsWith("/files")) { - const body = init?.body; - if (!(body instanceof FormData)) { - throw new Error("expected FormData upload"); - } - for (const [key, value] of body.entries()) { - if (key !== "file") { - continue; - } - if (typeof value === "string") { - uploadedRequests = value - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } else { - const text = await value.text(); - uploadedRequests = text - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } - } - return new Response(JSON.stringify({ id: "file_1" }), { - status: 200, - headers: { "Content-Type": "application/json" }, - }); - } - if (url.endsWith("/batches")) { - return new Response("batch failed", { status: 500 }); - } - if (url.endsWith("/files/file_out/content")) { - const lines = uploadedRequests.map((request, index) => - JSON.stringify({ - custom_id: request.custom_id, - response: { - status_code: 200, - body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] }, - }, - }), - ); - return new Response(lines.join("\n"), { - status: 200, - headers: { "Content-Type": "application/jsonl" }, - }); - } - throw new Error(`unexpected fetch ${url}`); - }); - - vi.stubGlobal("fetch", fetchMock); - - const cfg = { - agents: { - defaults: { - workspace: workspaceDir, - memorySearch: { - provider: "openai", - model: "text-embedding-3-small", - store: { path: indexPath }, - sync: { watch: false, onSessionStart: false, onSearch: false }, - query: { minScore: 0 }, - remote: { batch: { enabled: true, wait: true, pollIntervalMs: 1 } }, - }, - }, - list: [{ id: "main", default: true }], - }, - }; - - const result = await getMemorySearchManager({ cfg, agentId: "main" }); - expect(result.manager).not.toBeNull(); - if (!result.manager) { - throw new Error("manager missing"); - } - manager = result.manager; + // Two more failures after reset should disable remote batching. + mode = "fail"; + await fs.writeFile( + path.join(workspaceDir, "memory", "2026-01-09.md"), + ["flaky", "batch", "fail-a"].join("\n\n"), + ); await manager.sync({ force: true }); - let status = manager.status(); + status = manager.status(); expect(status.batch?.enabled).toBe(true); expect(status.batch?.failures).toBe(1); - embedBatch.mockClear(); await fs.writeFile( - path.join(workspaceDir, "memory", "2026-01-10.md"), - ["repeat", "failures", "again"].join("\n\n"), + path.join(workspaceDir, "memory", "2026-01-09.md"), + ["flaky", "batch", "fail-b"].join("\n\n"), ); await manager.sync({ force: true }); status = manager.status(); expect(status.batch?.enabled).toBe(false); expect(status.batch?.failures).toBeGreaterThanOrEqual(2); + // Once disabled, batch endpoints are skipped and fallback embeddings run directly. const fetchCalls = fetchMock.mock.calls.length; embedBatch.mockClear(); await fs.writeFile( - path.join(workspaceDir, "memory", "2026-01-10.md"), - ["repeat", "failures", "fallback"].join("\n\n"), + path.join(workspaceDir, "memory", "2026-01-09.md"), + ["flaky", "batch", "fallback"].join("\n\n"), ); await manager.sync({ force: true }); expect(fetchMock.mock.calls.length).toBe(fetchCalls);