refactor(memory): share batch create retry

This commit is contained in:
Peter Steinberger
2026-02-15 06:43:20 +00:00
parent 99da4c8d56
commit ebb54d71ef
3 changed files with 68 additions and 76 deletions

38
src/memory/batch-http.ts Normal file
View File

@@ -0,0 +1,38 @@
import { retryAsync } from "../infra/retry.js";
/**
 * POST a JSON body to `url` and return the parsed JSON response.
 *
 * Transient failures — HTTP 429 (rate limit) and any 5xx — are retried up to
 * 3 attempts with jittered exponential backoff (300ms..2000ms). Non-transient
 * error statuses throw immediately with the response body text appended to
 * `errorPrefix` for context.
 *
 * @param params.url          Target endpoint.
 * @param params.headers      Request headers (caller supplies content-type/auth).
 * @param params.body         Payload; serialized with JSON.stringify.
 * @param params.errorPrefix  Prefix for the thrown Error's message on non-OK responses.
 * @returns The response body parsed as JSON, cast to `T` (not validated at runtime).
 * @throws Error (with a `status` field) when the final attempt is not OK.
 */
export async function postJsonWithRetry<T>(params: {
  url: string;
  headers: Record<string, string>;
  body: unknown;
  errorPrefix: string;
}): Promise<T> {
  // Only rate-limit and server-side errors are worth retrying; 4xx (other
  // than 429) indicates a caller mistake that a retry cannot fix.
  const isTransient = (err: unknown): boolean => {
    const status = (err as { status?: number }).status;
    return status === 429 || (typeof status === "number" && status >= 500);
  };

  const response = await retryAsync(
    async () => {
      const attempt = await fetch(params.url, {
        method: "POST",
        headers: params.headers,
        body: JSON.stringify(params.body),
      });
      if (attempt.ok) {
        return attempt;
      }
      // Capture the body text so the error message carries the server's
      // explanation, and attach the status for the retry predicate.
      const detail = await attempt.text();
      const failure = new Error(
        `${params.errorPrefix}: ${attempt.status} ${detail}`,
      ) as Error & { status?: number };
      failure.status = attempt.status;
      throw failure;
    },
    {
      attempts: 3,
      minDelayMs: 300,
      maxDelayMs: 2000,
      jitter: 0.2,
      shouldRetry: isTransient,
    },
  );
  return (await response.json()) as T;
}

View File

@@ -1,5 +1,5 @@
import type { OpenAiEmbeddingClient } from "./embeddings-openai.js";
import { retryAsync } from "../infra/retry.js";
import { postJsonWithRetry } from "./batch-http.js";
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
import { hashText, runWithConcurrency } from "./internal.js";
@@ -96,43 +96,20 @@ async function submitOpenAiBatch(params: {
throw new Error("openai batch file upload failed: missing file id");
}
const batchRes = await retryAsync(
async () => {
const res = await fetch(`${baseUrl}/batches`, {
method: "POST",
headers: getOpenAiHeaders(params.openAi, { json: true }),
body: JSON.stringify({
input_file_id: filePayload.id,
endpoint: OPENAI_BATCH_ENDPOINT,
completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
metadata: {
source: "openclaw-memory",
agent: params.agentId,
},
}),
});
if (!res.ok) {
const text = await res.text();
const err = new Error(`openai batch create failed: ${res.status} ${text}`) as Error & {
status?: number;
};
err.status = res.status;
throw err;
}
return res;
},
{
attempts: 3,
minDelayMs: 300,
maxDelayMs: 2000,
jitter: 0.2,
shouldRetry: (err) => {
const status = (err as { status?: number }).status;
return status === 429 || (typeof status === "number" && status >= 500);
return await postJsonWithRetry<OpenAiBatchStatus>({
url: `${baseUrl}/batches`,
headers: getOpenAiHeaders(params.openAi, { json: true }),
body: {
input_file_id: filePayload.id,
endpoint: OPENAI_BATCH_ENDPOINT,
completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
metadata: {
source: "openclaw-memory",
agent: params.agentId,
},
},
);
return (await batchRes.json()) as OpenAiBatchStatus;
errorPrefix: "openai batch create failed",
});
}
async function fetchOpenAiBatchStatus(params: {

View File

@@ -1,7 +1,7 @@
import { createInterface } from "node:readline";
import { Readable } from "node:stream";
import type { VoyageEmbeddingClient } from "./embeddings-voyage.js";
import { retryAsync } from "../infra/retry.js";
import { postJsonWithRetry } from "./batch-http.js";
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
import { hashText, runWithConcurrency } from "./internal.js";
@@ -101,47 +101,24 @@ async function submitVoyageBatch(params: {
}
// 2. Create batch job using Voyage Batches API
const batchRes = await retryAsync(
async () => {
const res = await fetch(`${baseUrl}/batches`, {
method: "POST",
headers: getVoyageHeaders(params.client, { json: true }),
body: JSON.stringify({
input_file_id: filePayload.id,
endpoint: VOYAGE_BATCH_ENDPOINT,
completion_window: VOYAGE_BATCH_COMPLETION_WINDOW,
request_params: {
model: params.client.model,
input_type: "document",
},
metadata: {
source: "clawdbot-memory",
agent: params.agentId,
},
}),
});
if (!res.ok) {
const text = await res.text();
const err = new Error(`voyage batch create failed: ${res.status} ${text}`) as Error & {
status?: number;
};
err.status = res.status;
throw err;
}
return res;
},
{
attempts: 3,
minDelayMs: 300,
maxDelayMs: 2000,
jitter: 0.2,
shouldRetry: (err) => {
const status = (err as { status?: number }).status;
return status === 429 || (typeof status === "number" && status >= 500);
return await postJsonWithRetry<VoyageBatchStatus>({
url: `${baseUrl}/batches`,
headers: getVoyageHeaders(params.client, { json: true }),
body: {
input_file_id: filePayload.id,
endpoint: VOYAGE_BATCH_ENDPOINT,
completion_window: VOYAGE_BATCH_COMPLETION_WINDOW,
request_params: {
model: params.client.model,
input_type: "document",
},
metadata: {
source: "clawdbot-memory",
agent: params.agentId,
},
},
);
return (await batchRes.json()) as VoyageBatchStatus;
errorPrefix: "voyage batch create failed",
});
}
async function fetchVoyageBatchStatus(params: {