diff --git a/src/memory/batch-http.ts b/src/memory/batch-http.ts
new file mode 100644
index 0000000000..24405e20ba
--- /dev/null
+++ b/src/memory/batch-http.ts
@@ -0,0 +1,44 @@
+import { retryAsync } from "../infra/retry.js";
+
+/**
+ * POST a JSON body and parse the JSON response, retrying transient failures.
+ *
+ * Retries (3 attempts, 300-2000ms backoff with jitter) on HTTP 429 and 5xx.
+ * Non-OK responses are thrown as an Error prefixed with `errorPrefix` and
+ * carrying a `status` field so callers/retry logic can inspect the code.
+ */
+export async function postJsonWithRetry<T>(params: {
+  url: string;
+  headers: Record<string, string>;
+  body: unknown;
+  errorPrefix: string;
+}): Promise<T> {
+  const res = await retryAsync(
+    async () => {
+      const res = await fetch(params.url, {
+        method: "POST",
+        headers: params.headers,
+        body: JSON.stringify(params.body),
+      });
+      if (!res.ok) {
+        const text = await res.text();
+        const err = new Error(`${params.errorPrefix}: ${res.status} ${text}`) as Error & {
+          status?: number;
+        };
+        err.status = res.status;
+        throw err;
+      }
+      return res;
+    },
+    {
+      attempts: 3,
+      minDelayMs: 300,
+      maxDelayMs: 2000,
+      jitter: 0.2,
+      shouldRetry: (err) => {
+        const status = (err as { status?: number }).status;
+        return status === 429 || (typeof status === "number" && status >= 500);
+      },
+    },
+  );
+  return (await res.json()) as T;
+}
diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts
index 322ecc9fd7..6c35a831b3 100644
--- a/src/memory/batch-openai.ts
+++ b/src/memory/batch-openai.ts
@@ -1,5 +1,5 @@
 import type { OpenAiEmbeddingClient } from "./embeddings-openai.js";
-import { retryAsync } from "../infra/retry.js";
+import { postJsonWithRetry } from "./batch-http.js";
 import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
 import { hashText, runWithConcurrency } from "./internal.js";
 
@@ -96,43 +96,20 @@ async function submitOpenAiBatch(params: {
     throw new Error("openai batch file upload failed: missing file id");
   }
 
-  const batchRes = await retryAsync(
-    async () => {
-      const res = await fetch(`${baseUrl}/batches`, {
-        method: "POST",
-        headers: getOpenAiHeaders(params.openAi, { json: true }),
-        body: JSON.stringify({
-          input_file_id: filePayload.id,
-          endpoint: OPENAI_BATCH_ENDPOINT,
-          completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
-          metadata: {
-            source: "openclaw-memory",
-            agent: params.agentId,
-          },
-        }),
-      });
-      if (!res.ok) {
-        const text = await res.text();
-        const err = new Error(`openai batch create failed: ${res.status} ${text}`) as Error & {
-          status?: number;
-        };
-        err.status = res.status;
-        throw err;
-      }
-      return res;
-    },
-    {
-      attempts: 3,
-      minDelayMs: 300,
-      maxDelayMs: 2000,
-      jitter: 0.2,
-      shouldRetry: (err) => {
-        const status = (err as { status?: number }).status;
-        return status === 429 || (typeof status === "number" && status >= 500);
+  return await postJsonWithRetry<OpenAiBatchStatus>({
+    url: `${baseUrl}/batches`,
+    headers: getOpenAiHeaders(params.openAi, { json: true }),
+    body: {
+      input_file_id: filePayload.id,
+      endpoint: OPENAI_BATCH_ENDPOINT,
+      completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
+      metadata: {
+        source: "openclaw-memory",
+        agent: params.agentId,
       },
     },
-  );
-  return (await batchRes.json()) as OpenAiBatchStatus;
+    errorPrefix: "openai batch create failed",
+  });
 }
 
 async function fetchOpenAiBatchStatus(params: {
diff --git a/src/memory/batch-voyage.ts b/src/memory/batch-voyage.ts
index 855d64bbe6..72fdf09a2f 100644
--- a/src/memory/batch-voyage.ts
+++ b/src/memory/batch-voyage.ts
@@ -1,7 +1,7 @@
 import { createInterface } from "node:readline";
 import { Readable } from "node:stream";
 import type { VoyageEmbeddingClient } from "./embeddings-voyage.js";
-import { retryAsync } from "../infra/retry.js";
+import { postJsonWithRetry } from "./batch-http.js";
 import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
 import { hashText, runWithConcurrency } from "./internal.js";
 
@@ -101,47 +101,24 @@ async function submitVoyageBatch(params: {
   }
 
   // 2. Create batch job using Voyage Batches API
-  const batchRes = await retryAsync(
-    async () => {
-      const res = await fetch(`${baseUrl}/batches`, {
-        method: "POST",
-        headers: getVoyageHeaders(params.client, { json: true }),
-        body: JSON.stringify({
-          input_file_id: filePayload.id,
-          endpoint: VOYAGE_BATCH_ENDPOINT,
-          completion_window: VOYAGE_BATCH_COMPLETION_WINDOW,
-          request_params: {
-            model: params.client.model,
-            input_type: "document",
-          },
-          metadata: {
-            source: "clawdbot-memory",
-            agent: params.agentId,
-          },
-        }),
-      });
-      if (!res.ok) {
-        const text = await res.text();
-        const err = new Error(`voyage batch create failed: ${res.status} ${text}`) as Error & {
-          status?: number;
-        };
-        err.status = res.status;
-        throw err;
-      }
-      return res;
-    },
-    {
-      attempts: 3,
-      minDelayMs: 300,
-      maxDelayMs: 2000,
-      jitter: 0.2,
-      shouldRetry: (err) => {
-        const status = (err as { status?: number }).status;
-        return status === 429 || (typeof status === "number" && status >= 500);
+  return await postJsonWithRetry<VoyageBatchStatus>({
+    url: `${baseUrl}/batches`,
+    headers: getVoyageHeaders(params.client, { json: true }),
+    body: {
+      input_file_id: filePayload.id,
+      endpoint: VOYAGE_BATCH_ENDPOINT,
+      completion_window: VOYAGE_BATCH_COMPLETION_WINDOW,
+      request_params: {
+        model: params.client.model,
+        input_type: "document",
+      },
+      metadata: {
+        source: "clawdbot-memory",
+        agent: params.agentId,
      },
    },
-  );
-  return (await batchRes.json()) as VoyageBatchStatus;
+    errorPrefix: "voyage batch create failed",
+  });
 }
 
 async function fetchVoyageBatchStatus(params: {