refactor: extract shared install and embedding utilities

This commit is contained in:
Peter Steinberger
2026-02-18 04:48:30 +00:00
parent 4d3403b7ac
commit 8a9fddedc9
7 changed files with 247 additions and 257 deletions

View File

@@ -1,54 +1,13 @@
import type {
NodeListNode,
PairedNode,
PairingList,
PendingRequest,
} from "../../shared/node-list-types.js";
import { resolveNodeIdFromCandidates } from "../../shared/node-match.js";
import { callGatewayTool, type GatewayCallOptions } from "./gateway.js";
export type NodeListNode = {
nodeId: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
deviceFamily?: string;
modelIdentifier?: string;
caps?: string[];
commands?: string[];
permissions?: Record<string, boolean>;
paired?: boolean;
connected?: boolean;
};
type PendingRequest = {
requestId: string;
nodeId: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
isRepair?: boolean;
ts: number;
};
type PairedNode = {
nodeId: string;
token?: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
permissions?: Record<string, boolean>;
createdAtMs?: number;
approvedAtMs?: number;
};
type PairingList = {
pending: PendingRequest[];
paired: PairedNode[];
};
export type { NodeListNode };
function parseNodeList(value: unknown): NodeListNode[] {
const obj = typeof value === "object" && value !== null ? (value as Record<string, unknown>) : {};

View File

@@ -43,54 +43,9 @@ export type NodesRpcOpts = {
audio?: boolean;
};
export type NodeListNode = {
nodeId: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
deviceFamily?: string;
modelIdentifier?: string;
pathEnv?: string;
caps?: string[];
commands?: string[];
permissions?: Record<string, boolean>;
paired?: boolean;
connected?: boolean;
connectedAtMs?: number;
};
export type PendingRequest = {
requestId: string;
nodeId: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
isRepair?: boolean;
ts: number;
};
export type PairedNode = {
nodeId: string;
token?: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
permissions?: Record<string, boolean>;
createdAtMs?: number;
approvedAtMs?: number;
lastConnectedAtMs?: number;
};
export type PairingList = {
pending: PendingRequest[];
paired: PairedNode[];
};
export type {
NodeListNode,
PairedNode,
PairingList,
PendingRequest,
} from "../../shared/node-list-types.js";

View File

@@ -1,5 +1,4 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { MANIFEST_KEY } from "../compat/legacy-names.js";
import {
@@ -11,8 +10,12 @@ import {
} from "../infra/archive.js";
import { installPackageDir } from "../infra/install-package-dir.js";
import { resolveSafeInstallDir, unscopedPackageName } from "../infra/install-safe-path.js";
import {
packNpmSpecToArchive,
resolveArchiveSourcePath,
withTempDir,
} from "../infra/install-source-utils.js";
import { validateRegistryNpmSpec } from "../infra/npm-registry-spec.js";
import { runCommandWithTimeout } from "../process/exec.js";
import { CONFIG_DIR, resolveUserPath } from "../utils.js";
import { parseFrontmatter } from "./frontmatter.js";
@@ -105,15 +108,6 @@ function resolveTimedHookInstallModeOptions(params: {
};
}
async function withTempDir<T>(prefix: string, fn: (tmpDir: string) => Promise<T>): Promise<T> {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
try {
return await fn(tmpDir);
} finally {
await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined);
}
}
async function resolveInstallTargetDir(
id: string,
hooksDir?: string,
@@ -325,15 +319,11 @@ export async function installHooksFromArchive(params: {
}): Promise<InstallHooksResult> {
const logger = params.logger ?? defaultLogger;
const timeoutMs = params.timeoutMs ?? 120_000;
const archivePath = resolveUserPath(params.archivePath);
if (!(await fileExists(archivePath))) {
return { ok: false, error: `archive not found: ${archivePath}` };
}
if (!resolveArchiveKind(archivePath)) {
return { ok: false, error: `unsupported archive: ${archivePath}` };
const archivePathResult = await resolveArchiveSourcePath(params.archivePath);
if (!archivePathResult.ok) {
return archivePathResult;
}
const archivePath = archivePathResult.path;
return await withTempDir("openclaw-hook-", async (tmpDir) => {
const extractDir = path.join(tmpDir, "extract");
@@ -396,30 +386,17 @@ export async function installHooksFromNpmSpec(params: {
return await withTempDir("openclaw-hook-pack-", async (tmpDir) => {
logger.info?.(`Downloading ${spec}`);
const res = await runCommandWithTimeout(["npm", "pack", spec, "--ignore-scripts"], {
timeoutMs: Math.max(timeoutMs, 300_000),
const packedResult = await packNpmSpecToArchive({
spec,
timeoutMs,
cwd: tmpDir,
env: {
COREPACK_ENABLE_DOWNLOAD_PROMPT: "0",
NPM_CONFIG_IGNORE_SCRIPTS: "true",
},
});
if (res.code !== 0) {
return { ok: false, error: `npm pack failed: ${res.stderr.trim() || res.stdout.trim()}` };
if (!packedResult.ok) {
return packedResult;
}
const packed = (res.stdout || "")
.split("\n")
.map((l) => l.trim())
.filter(Boolean)
.pop();
if (!packed) {
return { ok: false, error: "npm pack produced no archive" };
}
const archivePath = path.join(tmpDir, packed);
return await installHooksFromArchive({
archivePath,
archivePath: packedResult.archivePath,
hooksDir: params.hooksDir,
timeoutMs,
logger,

View File

@@ -0,0 +1,78 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { runCommandWithTimeout } from "../process/exec.js";
import { resolveUserPath } from "../utils.js";
import { fileExists, resolveArchiveKind } from "./archive.js";
export async function withTempDir<T>(
prefix: string,
fn: (tmpDir: string) => Promise<T>,
): Promise<T> {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
try {
return await fn(tmpDir);
} finally {
await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined);
}
}
export async function resolveArchiveSourcePath(archivePath: string): Promise<
| {
ok: true;
path: string;
}
| {
ok: false;
error: string;
}
> {
const resolved = resolveUserPath(archivePath);
if (!(await fileExists(resolved))) {
return { ok: false, error: `archive not found: ${resolved}` };
}
if (!resolveArchiveKind(resolved)) {
return { ok: false, error: `unsupported archive: ${resolved}` };
}
return { ok: true, path: resolved };
}
export async function packNpmSpecToArchive(params: {
spec: string;
timeoutMs: number;
cwd: string;
}): Promise<
| {
ok: true;
archivePath: string;
}
| {
ok: false;
error: string;
}
> {
const res = await runCommandWithTimeout(["npm", "pack", params.spec, "--ignore-scripts"], {
timeoutMs: Math.max(params.timeoutMs, 300_000),
cwd: params.cwd,
env: {
COREPACK_ENABLE_DOWNLOAD_PROMPT: "0",
NPM_CONFIG_IGNORE_SCRIPTS: "true",
},
});
if (res.code !== 0) {
return { ok: false, error: `npm pack failed: ${res.stderr.trim() || res.stdout.trim()}` };
}
const packed = (res.stdout || "")
.split("\n")
.map((line) => line.trim())
.filter(Boolean)
.pop();
if (!packed) {
return { ok: false, error: "npm pack produced no archive" };
}
return { ok: true, archivePath: path.join(params.cwd, packed) };
}

View File

@@ -366,40 +366,49 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
};
}
private async embedChunksWithVoyageBatch(
chunks: MemoryChunk[],
entry: MemoryFileEntry | SessionFileEntry,
source: MemorySource,
): Promise<number[][]> {
const voyage = this.voyage;
if (!voyage) {
return this.embedChunksInBatches(chunks);
private async embedChunksWithProviderBatch<TRequest extends { custom_id: string }>(params: {
chunks: MemoryChunk[];
entry: MemoryFileEntry | SessionFileEntry;
source: MemorySource;
provider: "voyage" | "openai" | "gemini";
enabled: boolean;
buildRequest: (chunk: MemoryChunk) => Omit<TRequest, "custom_id">;
runBatch: (runnerOptions: {
agentId: string;
requests: TRequest[];
wait: boolean;
concurrency: number;
pollIntervalMs: number;
timeoutMs: number;
debug: (message: string, data?: Record<string, unknown>) => void;
}) => Promise<Map<string, number[]> | number[][]>;
}): Promise<number[][]> {
if (!params.enabled) {
return this.embedChunksInBatches(params.chunks);
}
if (chunks.length === 0) {
if (params.chunks.length === 0) {
return [];
}
const { embeddings, missing } = this.collectCachedEmbeddings(chunks);
const { embeddings, missing } = this.collectCachedEmbeddings(params.chunks);
if (missing.length === 0) {
return embeddings;
}
const { requests, mapping } = this.buildBatchRequests<VoyageBatchRequest>({
const { requests, mapping } = this.buildBatchRequests<TRequest>({
missing,
entry,
source,
build: (chunk) => ({
body: { input: chunk.text },
}),
entry: params.entry,
source: params.source,
build: params.buildRequest,
});
const runnerOptions = this.buildEmbeddingBatchRunnerOptions({
requests,
chunks: params.chunks,
source: params.source,
});
const runnerOptions = this.buildEmbeddingBatchRunnerOptions({ requests, chunks, source });
const batchResult = await this.runBatchWithFallback({
provider: "voyage",
run: async () =>
await runVoyageEmbeddingBatches({
client: voyage,
...runnerOptions,
}),
fallback: async () => await this.embedChunksInBatches(chunks),
provider: params.provider,
run: async () => await params.runBatch(runnerOptions),
fallback: async () => await this.embedChunksInBatches(params.chunks),
});
if (Array.isArray(batchResult)) {
return batchResult;
@@ -408,51 +417,55 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
return embeddings;
}
private async embedChunksWithVoyageBatch(
chunks: MemoryChunk[],
entry: MemoryFileEntry | SessionFileEntry,
source: MemorySource,
): Promise<number[][]> {
const voyage = this.voyage;
return await this.embedChunksWithProviderBatch<VoyageBatchRequest>({
chunks,
entry,
source,
provider: "voyage",
enabled: Boolean(voyage),
buildRequest: (chunk) => ({
body: { input: chunk.text },
}),
runBatch: async (runnerOptions) =>
await runVoyageEmbeddingBatches({
client: voyage!,
...runnerOptions,
}),
});
}
private async embedChunksWithOpenAiBatch(
chunks: MemoryChunk[],
entry: MemoryFileEntry | SessionFileEntry,
source: MemorySource,
): Promise<number[][]> {
const openAi = this.openAi;
if (!openAi) {
return this.embedChunksInBatches(chunks);
}
if (chunks.length === 0) {
return [];
}
const { embeddings, missing } = this.collectCachedEmbeddings(chunks);
if (missing.length === 0) {
return embeddings;
}
const { requests, mapping } = this.buildBatchRequests<OpenAiBatchRequest>({
missing,
return await this.embedChunksWithProviderBatch<OpenAiBatchRequest>({
chunks,
entry,
source,
build: (chunk) => ({
provider: "openai",
enabled: Boolean(openAi),
buildRequest: (chunk) => ({
method: "POST",
url: OPENAI_BATCH_ENDPOINT,
body: {
model: this.openAi?.model ?? this.provider?.model ?? "text-embedding-3-small",
model: openAi?.model ?? this.provider?.model ?? "text-embedding-3-small",
input: chunk.text,
},
}),
});
const runnerOptions = this.buildEmbeddingBatchRunnerOptions({ requests, chunks, source });
const batchResult = await this.runBatchWithFallback({
provider: "openai",
run: async () =>
runBatch: async (runnerOptions) =>
await runOpenAiEmbeddingBatches({
openAi,
openAi: openAi!,
...runnerOptions,
}),
fallback: async () => await this.embedChunksInBatches(chunks),
});
if (Array.isArray(batchResult)) {
return batchResult;
}
this.applyBatchEmbeddings({ byCustomId: batchResult, mapping, embeddings });
return embeddings;
}
private async embedChunksWithGeminiBatch(
@@ -461,42 +474,22 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
source: MemorySource,
): Promise<number[][]> {
const gemini = this.gemini;
if (!gemini) {
return this.embedChunksInBatches(chunks);
}
if (chunks.length === 0) {
return [];
}
const { embeddings, missing } = this.collectCachedEmbeddings(chunks);
if (missing.length === 0) {
return embeddings;
}
const { requests, mapping } = this.buildBatchRequests<GeminiBatchRequest>({
missing,
return await this.embedChunksWithProviderBatch<GeminiBatchRequest>({
chunks,
entry,
source,
build: (chunk) => ({
provider: "gemini",
enabled: Boolean(gemini),
buildRequest: (chunk) => ({
content: { parts: [{ text: chunk.text }] },
taskType: "RETRIEVAL_DOCUMENT",
}),
});
const runnerOptions = this.buildEmbeddingBatchRunnerOptions({ requests, chunks, source });
const batchResult = await this.runBatchWithFallback({
provider: "gemini",
run: async () =>
runBatch: async (runnerOptions) =>
await runGeminiEmbeddingBatches({
gemini,
gemini: gemini!,
...runnerOptions,
}),
fallback: async () => await this.embedChunksInBatches(chunks),
});
if (Array.isArray(batchResult)) {
return batchResult;
}
this.applyBatchEmbeddings({ byCustomId: batchResult, mapping, embeddings });
return embeddings;
}
protected async embedBatchWithRetry(texts: string[]): Promise<number[][]> {

View File

@@ -1,5 +1,4 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { MANIFEST_KEY } from "../compat/legacy-names.js";
import {
@@ -15,8 +14,12 @@ import {
safeDirName,
unscopedPackageName,
} from "../infra/install-safe-path.js";
import {
packNpmSpecToArchive,
resolveArchiveSourcePath,
withTempDir,
} from "../infra/install-source-utils.js";
import { validateRegistryNpmSpec } from "../infra/npm-registry-spec.js";
import { runCommandWithTimeout } from "../process/exec.js";
import { extensionUsesSkippedScannerPath, isPathInside } from "../security/scan-paths.js";
import * as skillScanner from "../security/skill-scanner.js";
import { CONFIG_DIR, resolveUserPath } from "../utils.js";
@@ -298,18 +301,13 @@ export async function installPluginFromArchive(params: {
const logger = params.logger ?? defaultLogger;
const timeoutMs = params.timeoutMs ?? 120_000;
const mode = params.mode ?? "install";
const archivePath = resolveUserPath(params.archivePath);
if (!(await fileExists(archivePath))) {
return { ok: false, error: `archive not found: ${archivePath}` };
const archivePathResult = await resolveArchiveSourcePath(params.archivePath);
if (!archivePathResult.ok) {
return archivePathResult;
}
const archivePath = archivePathResult.path;
if (!resolveArchiveKind(archivePath)) {
return { ok: false, error: `unsupported archive: ${archivePath}` };
}
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-plugin-"));
try {
return await withTempDir("openclaw-plugin-", async (tmpDir) => {
const extractDir = path.join(tmpDir, "extract");
await fs.mkdir(extractDir, { recursive: true });
@@ -341,9 +339,7 @@ export async function installPluginFromArchive(params: {
dryRun: params.dryRun,
expectedPluginId: params.expectedPluginId,
});
} finally {
await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined);
}
});
}
export async function installPluginFromDir(params: {
@@ -433,36 +429,19 @@ export async function installPluginFromNpmSpec(params: {
return { ok: false, error: specError };
}
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-npm-pack-"));
try {
return await withTempDir("openclaw-npm-pack-", async (tmpDir) => {
logger.info?.(`Downloading ${spec}`);
const res = await runCommandWithTimeout(["npm", "pack", spec, "--ignore-scripts"], {
timeoutMs: Math.max(timeoutMs, 300_000),
const packedResult = await packNpmSpecToArchive({
spec,
timeoutMs,
cwd: tmpDir,
env: {
COREPACK_ENABLE_DOWNLOAD_PROMPT: "0",
NPM_CONFIG_IGNORE_SCRIPTS: "true",
},
});
if (res.code !== 0) {
return {
ok: false,
error: `npm pack failed: ${res.stderr.trim() || res.stdout.trim()}`,
};
if (!packedResult.ok) {
return packedResult;
}
const packed = (res.stdout || "")
.split("\n")
.map((l) => l.trim())
.filter(Boolean)
.pop();
if (!packed) {
return { ok: false, error: "npm pack produced no archive" };
}
const archivePath = path.join(tmpDir, packed);
return await installPluginFromArchive({
archivePath,
archivePath: packedResult.archivePath,
extensionsDir: params.extensionsDir,
timeoutMs,
logger,
@@ -470,9 +449,7 @@ export async function installPluginFromNpmSpec(params: {
dryRun,
expectedPluginId,
});
} finally {
await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined);
}
});
}
export async function installPluginFromPath(params: {

View File

@@ -0,0 +1,51 @@
export type NodeListNode = {
nodeId: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
deviceFamily?: string;
modelIdentifier?: string;
pathEnv?: string;
caps?: string[];
commands?: string[];
permissions?: Record<string, boolean>;
paired?: boolean;
connected?: boolean;
connectedAtMs?: number;
};
export type PendingRequest = {
requestId: string;
nodeId: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
isRepair?: boolean;
ts: number;
};
export type PairedNode = {
nodeId: string;
token?: string;
displayName?: string;
platform?: string;
version?: string;
coreVersion?: string;
uiVersion?: string;
remoteIp?: string;
permissions?: Record<string, boolean>;
createdAtMs?: number;
approvedAtMs?: number;
lastConnectedAtMs?: number;
};
export type PairingList = {
pending: PendingRequest[];
paired: PairedNode[];
};