diff --git a/src/memory/manager.batch.test.ts b/src/memory/manager.batch.test.ts index bd29d8f6e7..eb17760a12 100644 --- a/src/memory/manager.batch.test.ts +++ b/src/memory/manager.batch.test.ts @@ -59,6 +59,95 @@ describe("memory indexing with OpenAI batches", () => { return () => spy.mockRestore(); } + async function readOpenAIBatchUploadRequests(body: FormData) { + let uploadedRequests: Array<{ custom_id?: string }> = []; + for (const [key, value] of body.entries()) { + if (key !== "file") { + continue; + } + const text = typeof value === "string" ? value : await value.text(); + uploadedRequests = text + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line) as { custom_id?: string }); + } + return uploadedRequests; + } + + function createOpenAIBatchFetchMock(options?: { + onCreateBatch?: (ctx: { batchCreates: number }) => Response | Promise; + }) { + let uploadedRequests: Array<{ custom_id?: string }> = []; + const state = { batchCreates: 0 }; + const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => { + const url = + typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; + if (url.endsWith("/files")) { + const body = init?.body; + if (!(body instanceof FormData)) { + throw new Error("expected FormData upload"); + } + uploadedRequests = await readOpenAIBatchUploadRequests(body); + return new Response(JSON.stringify({ id: "file_1" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + if (url.endsWith("/batches")) { + state.batchCreates += 1; + if (options?.onCreateBatch) { + return await options.onCreateBatch({ batchCreates: state.batchCreates }); + } + return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + if (url.endsWith("/batches/batch_1")) { + return new Response( + JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ); + } + if (url.endsWith("/files/file_out/content")) { + const lines = uploadedRequests.map((request, index) => + JSON.stringify({ + custom_id: request.custom_id, + response: { + status_code: 200, + body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] }, + }, + }), + ); + return new Response(lines.join("\n"), { + status: 200, + headers: { "Content-Type": "application/jsonl" }, + }); + } + throw new Error(`unexpected fetch ${url}`); + }); + return { fetchMock, state }; + } + + function createBatchCfg() { + return { + agents: { + defaults: { + workspace: workspaceDir, + memorySearch: { + provider: "openai", + model: "text-embedding-3-small", + store: { path: indexPath, vector: { enabled: false } }, + sync: { watch: false, onSessionStart: false, onSearch: false }, + query: { minScore: 0, hybrid: { enabled: false } }, + remote: { batch: { enabled: true, wait: true, pollIntervalMs: 1 } }, + }, + }, + list: [{ id: "main", default: true }], + }, + }; + } + beforeAll(async () => { fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-batch-")); }); @@ -91,85 +180,11 @@ describe("memory indexing with OpenAI batches", () => { const content = ["hello", "from", "batch"].join("\n\n"); await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-07.md"), content); - let uploadedRequests: Array<{ custom_id?: string }> = []; - const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => { - const url = - typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; - if (url.endsWith("/files")) { - const body = init?.body; - if (!(body instanceof FormData)) { - throw new Error("expected FormData upload"); - } - for (const [key, value] of body.entries()) { - if (key !== "file") { - continue; - } - if (typeof value === "string") { - uploadedRequests = value - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } else { - const text = await value.text(); - uploadedRequests = text - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } - } - return new Response(JSON.stringify({ id: "file_1" }), { - status: 200, - headers: { "Content-Type": "application/json" }, - }); - } - if (url.endsWith("/batches")) { - return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), { - status: 200, - headers: { "Content-Type": "application/json" }, - }); - } - if (url.endsWith("/batches/batch_1")) { - return new Response( - JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }), - { status: 200, headers: { "Content-Type": "application/json" } }, - ); - } - if (url.endsWith("/files/file_out/content")) { - const lines = uploadedRequests.map((request, index) => - JSON.stringify({ - custom_id: request.custom_id, - response: { - status_code: 200, - body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] }, - }, - }), - ); - return new Response(lines.join("\n"), { - status: 200, - headers: { "Content-Type": "application/jsonl" }, - }); - } - throw new Error(`unexpected fetch ${url}`); - }); + const { fetchMock } = createOpenAIBatchFetchMock(); vi.stubGlobal("fetch", fetchMock); - const cfg = { - agents: { - defaults: { - workspace: workspaceDir, - memorySearch: { - provider: "openai", - model: "text-embedding-3-small", - store: { path: indexPath, vector: { enabled: false } }, - sync: { watch: false, onSessionStart: false, onSearch: false }, - query: { minScore: 0, hybrid: { enabled: false } }, - remote: { batch: { enabled: true, wait: true, pollIntervalMs: 1 } }, - }, - }, - list: [{ id: "main", default: true }], - }, - }; + const cfg = createBatchCfg(); try { const result = await getMemorySearchManager({ cfg, agentId: "main" }); @@ -202,40 +217,8 @@ describe("memory indexing with OpenAI batches", () => { const content = ["retry", "the", "batch"].join("\n\n"); await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-08.md"), content); - let uploadedRequests: Array<{ custom_id?: string }> = []; - let batchCreates = 0; - const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => { - const url = - typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; - if (url.endsWith("/files")) { - const body = init?.body; - if (!(body instanceof FormData)) { - throw new Error("expected FormData upload"); - } - for (const [key, value] of body.entries()) { - if (key !== "file") { - continue; - } - if (typeof value === "string") { - uploadedRequests = value - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } else { - const text = await value.text(); - uploadedRequests = text - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } - } - return new Response(JSON.stringify({ id: "file_1" }), { - status: 200, - headers: { "Content-Type": "application/json" }, - }); - } - if (url.endsWith("/batches")) { - batchCreates += 1; + const { fetchMock, state } = createOpenAIBatchFetchMock({ + onCreateBatch: ({ batchCreates }) => { if (batchCreates === 1) { return new Response("upstream connect error", { status: 503 }); } @@ -243,49 +226,12 @@ describe("memory indexing with OpenAI batches", () => { status: 200, headers: { "Content-Type": "application/json" }, }); - } - if (url.endsWith("/batches/batch_1")) { - return new Response( - JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }), - { status: 200, headers: { "Content-Type": "application/json" } }, - ); - } - if (url.endsWith("/files/file_out/content")) { - const lines = uploadedRequests.map((request, index) => - JSON.stringify({ - custom_id: request.custom_id, - response: { - status_code: 200, - body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] }, - }, - }), - ); - return new Response(lines.join("\n"), { - status: 200, - headers: { "Content-Type": "application/jsonl" }, - }); - } - throw new Error(`unexpected fetch ${url}`); + }, }); vi.stubGlobal("fetch", fetchMock); - const cfg = { - agents: { - defaults: { - workspace: workspaceDir, - memorySearch: { - provider: "openai", - model: "text-embedding-3-small", - store: { path: indexPath, vector: { enabled: false } }, - sync: { watch: false, onSessionStart: false, onSearch: false }, - query: { minScore: 0, hybrid: { enabled: false } }, - remote: { batch: { enabled: true, wait: true, pollIntervalMs: 1 } }, - }, - }, - list: [{ id: "main", default: true }], - }, - }; + const cfg = createBatchCfg(); try { const result = await getMemorySearchManager({ cfg, agentId: "main" }); @@ -298,7 +244,7 @@ describe("memory indexing with OpenAI batches", () => { const status = manager.status(); expect(status.chunks).toBeGreaterThan(0); - expect(batchCreates).toBe(2); + expect(state.batchCreates).toBe(2); } finally { restoreTimeouts(); } @@ -318,39 +264,9 @@ describe("memory indexing with OpenAI batches", () => { await fs.writeFile(memoryFile, content); await touch(); - let uploadedRequests: Array<{ custom_id?: string }> = []; let mode: "fail" | "ok" = "fail"; - const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => { - const url = - typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; - if (url.endsWith("/files")) { - const body = init?.body; - if (!(body instanceof FormData)) { - throw new Error("expected FormData upload"); - } - for (const [key, value] of body.entries()) { - if (key !== "file") { - continue; - } - if (typeof value === "string") { - uploadedRequests = value - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } else { - const text = await value.text(); - uploadedRequests = text - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line) as { custom_id?: string }); - } - } - return new Response(JSON.stringify({ id: "file_1" }), { - status: 200, - headers: { "Content-Type": "application/json" }, - }); - } - if (url.endsWith("/batches")) { + const { fetchMock } = createOpenAIBatchFetchMock({ + onCreateBatch: () => { if (mode === "fail") { return new Response("batch failed", { status: 400 }); } @@ -358,49 +274,12 @@ describe("memory indexing with OpenAI batches", () => { status: 200, headers: { "Content-Type": "application/json" }, }); - } - if (url.endsWith("/batches/batch_1")) { - return new Response( - JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }), - { status: 200, headers: { "Content-Type": "application/json" } }, - ); - } - if (url.endsWith("/files/file_out/content")) { - const lines = uploadedRequests.map((request, index) => - JSON.stringify({ - custom_id: request.custom_id, - response: { - status_code: 200, - body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] }, - }, - }), - ); - return new Response(lines.join("\n"), { - status: 200, - headers: { "Content-Type": "application/jsonl" }, - }); - } - throw new Error(`unexpected fetch ${url}`); + }, }); vi.stubGlobal("fetch", fetchMock); - const cfg = { - agents: { - defaults: { - workspace: workspaceDir, - memorySearch: { - provider: "openai", - model: "text-embedding-3-small", - store: { path: indexPath, vector: { enabled: false } }, - sync: { watch: false, onSessionStart: false, onSearch: false }, - query: { minScore: 0, hybrid: { enabled: false } }, - remote: { batch: { enabled: true, wait: true, pollIntervalMs: 1 } }, - }, - }, - list: [{ id: "main", default: true }], - }, - }; + const cfg = createBatchCfg(); try { const result = await getMemorySearchManager({ cfg, agentId: "main" });