refactor(memory): share batch output parsing

This commit is contained in:
Peter Steinberger
2026-02-14 14:36:59 +00:00
parent 4f61a3f527
commit 7bd073340a
3 changed files with 59 additions and 52 deletions

View File

@@ -1,5 +1,6 @@
import type { OpenAiEmbeddingClient } from "./embeddings-openai.js";
import { retryAsync } from "../infra/retry.js";
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
import { hashText, runWithConcurrency } from "./internal.js";
export type OpenAiBatchRequest = {
@@ -313,32 +314,7 @@ export async function runOpenAiEmbeddingBatches(params: {
const remaining = new Set(group.map((request) => request.custom_id));
for (const line of outputLines) {
const customId = line.custom_id;
if (!customId) {
continue;
}
remaining.delete(customId);
if (line.error?.message) {
errors.push(`${customId}: ${line.error.message}`);
continue;
}
const response = line.response;
const statusCode = response?.status_code ?? 0;
if (statusCode >= 400) {
const message =
response?.body?.error?.message ??
(typeof response?.body === "string" ? response.body : undefined) ??
"unknown error";
errors.push(`${customId}: ${message}`);
continue;
}
const data = response?.body?.data ?? [];
const embedding = data[0]?.embedding ?? [];
if (embedding.length === 0) {
errors.push(`${customId}: empty embedding`);
continue;
}
byCustomId.set(customId, embedding);
applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId });
}
if (errors.length > 0) {

View File

@@ -0,0 +1,55 @@
export type EmbeddingBatchOutputLine = {
custom_id?: string;
error?: { message?: string };
response?: {
status_code?: number;
body?:
| {
data?: Array<{
embedding?: number[];
}>;
error?: { message?: string };
}
| string;
};
};
export function applyEmbeddingBatchOutputLine(params: {
line: EmbeddingBatchOutputLine;
remaining: Set<string>;
errors: string[];
byCustomId: Map<string, number[]>;
}) {
const customId = params.line.custom_id;
if (!customId) {
return;
}
params.remaining.delete(customId);
const errorMessage = params.line.error?.message;
if (errorMessage) {
params.errors.push(`${customId}: ${errorMessage}`);
return;
}
const response = params.line.response;
const statusCode = response?.status_code ?? 0;
if (statusCode >= 400) {
const messageFromObject =
response?.body && typeof response.body === "object"
? (response.body as { error?: { message?: string } }).error?.message
: undefined;
const messageFromString = typeof response?.body === "string" ? response.body : undefined;
params.errors.push(`${customId}: ${messageFromObject ?? messageFromString ?? "unknown error"}`);
return;
}
const data =
response?.body && typeof response.body === "object" ? (response.body.data ?? []) : [];
const embedding = data[0]?.embedding ?? [];
if (embedding.length === 0) {
params.errors.push(`${customId}: empty embedding`);
return;
}
params.byCustomId.set(customId, embedding);
}

View File

@@ -2,6 +2,7 @@ import { createInterface } from "node:readline";
import { Readable } from "node:stream";
import type { VoyageEmbeddingClient } from "./embeddings-voyage.js";
import { retryAsync } from "../infra/retry.js";
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
import { hashText, runWithConcurrency } from "./internal.js";
/**
@@ -322,32 +323,7 @@ export async function runVoyageEmbeddingBatches(params: {
continue;
}
const line = JSON.parse(rawLine) as VoyageBatchOutputLine;
const customId = line.custom_id;
if (!customId) {
continue;
}
remaining.delete(customId);
if (line.error?.message) {
errors.push(`${customId}: ${line.error.message}`);
continue;
}
const response = line.response;
const statusCode = response?.status_code ?? 0;
if (statusCode >= 400) {
const message =
response?.body?.error?.message ??
(typeof response?.body === "string" ? response.body : undefined) ??
"unknown error";
errors.push(`${customId}: ${message}`);
continue;
}
const data = response?.body?.data ?? [];
const embedding = data[0]?.embedding ?? [];
if (embedding.length === 0) {
errors.push(`${customId}: empty embedding`);
continue;
}
byCustomId.set(customId, embedding);
applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId });
}
}