diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts index efda188384..322ecc9fd7 100644 --- a/src/memory/batch-openai.ts +++ b/src/memory/batch-openai.ts @@ -1,5 +1,6 @@ import type { OpenAiEmbeddingClient } from "./embeddings-openai.js"; import { retryAsync } from "../infra/retry.js"; +import { applyEmbeddingBatchOutputLine } from "./batch-output.js"; import { hashText, runWithConcurrency } from "./internal.js"; export type OpenAiBatchRequest = { @@ -313,32 +314,7 @@ export async function runOpenAiEmbeddingBatches(params: { const remaining = new Set(group.map((request) => request.custom_id)); for (const line of outputLines) { - const customId = line.custom_id; - if (!customId) { - continue; - } - remaining.delete(customId); - if (line.error?.message) { - errors.push(`${customId}: ${line.error.message}`); - continue; - } - const response = line.response; - const statusCode = response?.status_code ?? 0; - if (statusCode >= 400) { - const message = - response?.body?.error?.message ?? - (typeof response?.body === "string" ? response.body : undefined) ?? - "unknown error"; - errors.push(`${customId}: ${message}`); - continue; - } - const data = response?.body?.data ?? []; - const embedding = data[0]?.embedding ?? []; - if (embedding.length === 0) { - errors.push(`${customId}: empty embedding`); - continue; - } - byCustomId.set(customId, embedding); + applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId }); } if (errors.length > 0) { diff --git a/src/memory/batch-output.ts b/src/memory/batch-output.ts new file mode 100644 index 0000000000..e2a75a878d --- /dev/null +++ b/src/memory/batch-output.ts @@ -0,0 +1,55 @@ +export type EmbeddingBatchOutputLine = { + custom_id?: string; + error?: { message?: string }; + response?: { + status_code?: number; + body?: + | { + data?: Array<{ + embedding?: number[]; + }>; + error?: { message?: string }; + } + | string; + }; +}; + +export function applyEmbeddingBatchOutputLine(params: { + line: EmbeddingBatchOutputLine; + remaining: Set; + errors: string[]; + byCustomId: Map; +}) { + const customId = params.line.custom_id; + if (!customId) { + return; + } + params.remaining.delete(customId); + + const errorMessage = params.line.error?.message; + if (errorMessage) { + params.errors.push(`${customId}: ${errorMessage}`); + return; + } + + const response = params.line.response; + const statusCode = response?.status_code ?? 0; + if (statusCode >= 400) { + const messageFromObject = + response?.body && typeof response.body === "object" + ? (response.body as { error?: { message?: string } }).error?.message + : undefined; + const messageFromString = typeof response?.body === "string" ? response.body : undefined; + params.errors.push(`${customId}: ${messageFromObject ?? messageFromString ?? "unknown error"}`); + return; + } + + const data = + response?.body && typeof response.body === "object" ? (response.body.data ?? []) : []; + const embedding = data[0]?.embedding ?? []; + if (embedding.length === 0) { + params.errors.push(`${customId}: empty embedding`); + return; + } + params.byCustomId.set(customId, embedding); +} diff --git a/src/memory/batch-voyage.ts b/src/memory/batch-voyage.ts index b559e92da9..855d64bbe6 100644 --- a/src/memory/batch-voyage.ts +++ b/src/memory/batch-voyage.ts @@ -2,6 +2,7 @@ import { createInterface } from "node:readline"; import { Readable } from "node:stream"; import type { VoyageEmbeddingClient } from "./embeddings-voyage.js"; import { retryAsync } from "../infra/retry.js"; +import { applyEmbeddingBatchOutputLine } from "./batch-output.js"; import { hashText, runWithConcurrency } from "./internal.js"; /** @@ -322,32 +323,7 @@ export async function runVoyageEmbeddingBatches(params: { continue; } const line = JSON.parse(rawLine) as VoyageBatchOutputLine; - const customId = line.custom_id; - if (!customId) { - continue; - } - remaining.delete(customId); - if (line.error?.message) { - errors.push(`${customId}: ${line.error.message}`); - continue; - } - const response = line.response; - const statusCode = response?.status_code ?? 0; - if (statusCode >= 400) { - const message = - response?.body?.error?.message ?? - (typeof response?.body === "string" ? response.body : undefined) ?? - "unknown error"; - errors.push(`${customId}: ${message}`); - continue; - } - const data = response?.body?.data ?? []; - const embedding = data[0]?.embedding ?? []; - if (embedding.length === 0) { - errors.push(`${customId}: empty embedding`); - continue; - } - byCustomId.set(customId, embedding); + applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId }); } }