From 8a9fddedc9ad3b5ca64ce8eb63cd08ab6de3bab5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 18 Feb 2026 04:48:30 +0000 Subject: [PATCH] refactor: extract shared install and embedding utilities --- src/agents/tools/nodes-utils.ts | 55 ++-------- src/cli/nodes-cli/types.ts | 57 ++--------- src/hooks/install.ts | 53 +++------- src/infra/install-source-utils.ts | 78 +++++++++++++++ src/memory/manager-embedding-ops.ts | 149 +++++++++++++--------------- src/plugins/install.ts | 61 ++++-------- src/shared/node-list-types.ts | 51 ++++++++++ 7 files changed, 247 insertions(+), 257 deletions(-) create mode 100644 src/infra/install-source-utils.ts create mode 100644 src/shared/node-list-types.ts diff --git a/src/agents/tools/nodes-utils.ts b/src/agents/tools/nodes-utils.ts index 121a65400c..49f43c6000 100644 --- a/src/agents/tools/nodes-utils.ts +++ b/src/agents/tools/nodes-utils.ts @@ -1,54 +1,13 @@ +import type { + NodeListNode, + PairedNode, + PairingList, + PendingRequest, +} from "../../shared/node-list-types.js"; import { resolveNodeIdFromCandidates } from "../../shared/node-match.js"; import { callGatewayTool, type GatewayCallOptions } from "./gateway.js"; -export type NodeListNode = { - nodeId: string; - displayName?: string; - platform?: string; - version?: string; - coreVersion?: string; - uiVersion?: string; - remoteIp?: string; - deviceFamily?: string; - modelIdentifier?: string; - caps?: string[]; - commands?: string[]; - permissions?: Record; - paired?: boolean; - connected?: boolean; -}; - -type PendingRequest = { - requestId: string; - nodeId: string; - displayName?: string; - platform?: string; - version?: string; - coreVersion?: string; - uiVersion?: string; - remoteIp?: string; - isRepair?: boolean; - ts: number; -}; - -type PairedNode = { - nodeId: string; - token?: string; - displayName?: string; - platform?: string; - version?: string; - coreVersion?: string; - uiVersion?: string; - remoteIp?: string; - permissions?: Record; - createdAtMs?: number; - approvedAtMs?: number; -}; - -type PairingList = { - pending: PendingRequest[]; - paired: PairedNode[]; -}; +export type { NodeListNode }; function parseNodeList(value: unknown): NodeListNode[] { const obj = typeof value === "object" && value !== null ? (value as Record) : {}; diff --git a/src/cli/nodes-cli/types.ts b/src/cli/nodes-cli/types.ts index 77f10318e8..c978fa6f0a 100644 --- a/src/cli/nodes-cli/types.ts +++ b/src/cli/nodes-cli/types.ts @@ -43,54 +43,9 @@ export type NodesRpcOpts = { audio?: boolean; }; -export type NodeListNode = { - nodeId: string; - displayName?: string; - platform?: string; - version?: string; - coreVersion?: string; - uiVersion?: string; - remoteIp?: string; - deviceFamily?: string; - modelIdentifier?: string; - pathEnv?: string; - caps?: string[]; - commands?: string[]; - permissions?: Record; - paired?: boolean; - connected?: boolean; - connectedAtMs?: number; -}; - -export type PendingRequest = { - requestId: string; - nodeId: string; - displayName?: string; - platform?: string; - version?: string; - coreVersion?: string; - uiVersion?: string; - remoteIp?: string; - isRepair?: boolean; - ts: number; -}; - -export type PairedNode = { - nodeId: string; - token?: string; - displayName?: string; - platform?: string; - version?: string; - coreVersion?: string; - uiVersion?: string; - remoteIp?: string; - permissions?: Record; - createdAtMs?: number; - approvedAtMs?: number; - lastConnectedAtMs?: number; -}; - -export type PairingList = { - pending: PendingRequest[]; - paired: PairedNode[]; -}; +export type { + NodeListNode, + PairedNode, + PairingList, + PendingRequest, +} from "../../shared/node-list-types.js"; diff --git a/src/hooks/install.ts b/src/hooks/install.ts index 99ffb26343..2a1daac746 100644 --- a/src/hooks/install.ts +++ b/src/hooks/install.ts @@ -1,5 +1,4 @@ import fs from "node:fs/promises"; -import os from "node:os"; import path from "node:path"; import { MANIFEST_KEY } from "../compat/legacy-names.js"; import { @@ -11,8 +10,12 @@ import { } from "../infra/archive.js"; import { installPackageDir } from "../infra/install-package-dir.js"; import { resolveSafeInstallDir, unscopedPackageName } from "../infra/install-safe-path.js"; +import { + packNpmSpecToArchive, + resolveArchiveSourcePath, + withTempDir, +} from "../infra/install-source-utils.js"; import { validateRegistryNpmSpec } from "../infra/npm-registry-spec.js"; -import { runCommandWithTimeout } from "../process/exec.js"; import { CONFIG_DIR, resolveUserPath } from "../utils.js"; import { parseFrontmatter } from "./frontmatter.js"; @@ -105,15 +108,6 @@ function resolveTimedHookInstallModeOptions(params: { }; } -async function withTempDir(prefix: string, fn: (tmpDir: string) => Promise): Promise { - const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); - try { - return await fn(tmpDir); - } finally { - await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined); - } -} - async function resolveInstallTargetDir( id: string, hooksDir?: string, @@ -325,15 +319,11 @@ export async function installHooksFromArchive(params: { }): Promise { const logger = params.logger ?? defaultLogger; const timeoutMs = params.timeoutMs ?? 120_000; - - const archivePath = resolveUserPath(params.archivePath); - if (!(await fileExists(archivePath))) { - return { ok: false, error: `archive not found: ${archivePath}` }; - } - - if (!resolveArchiveKind(archivePath)) { - return { ok: false, error: `unsupported archive: ${archivePath}` }; + const archivePathResult = await resolveArchiveSourcePath(params.archivePath); + if (!archivePathResult.ok) { + return archivePathResult; } + const archivePath = archivePathResult.path; return await withTempDir("openclaw-hook-", async (tmpDir) => { const extractDir = path.join(tmpDir, "extract"); @@ -396,30 +386,17 @@ export async function installHooksFromNpmSpec(params: { return await withTempDir("openclaw-hook-pack-", async (tmpDir) => { logger.info?.(`Downloading ${spec}…`); - const res = await runCommandWithTimeout(["npm", "pack", spec, "--ignore-scripts"], { - timeoutMs: Math.max(timeoutMs, 300_000), + const packedResult = await packNpmSpecToArchive({ + spec, + timeoutMs, cwd: tmpDir, - env: { - COREPACK_ENABLE_DOWNLOAD_PROMPT: "0", - NPM_CONFIG_IGNORE_SCRIPTS: "true", - }, }); - if (res.code !== 0) { - return { ok: false, error: `npm pack failed: ${res.stderr.trim() || res.stdout.trim()}` }; + if (!packedResult.ok) { + return packedResult; } - const packed = (res.stdout || "") - .split("\n") - .map((l) => l.trim()) - .filter(Boolean) - .pop(); - if (!packed) { - return { ok: false, error: "npm pack produced no archive" }; - } - - const archivePath = path.join(tmpDir, packed); return await installHooksFromArchive({ - archivePath, + archivePath: packedResult.archivePath, hooksDir: params.hooksDir, timeoutMs, logger, diff --git a/src/infra/install-source-utils.ts b/src/infra/install-source-utils.ts new file mode 100644 index 0000000000..f51be1e7e4 --- /dev/null +++ b/src/infra/install-source-utils.ts @@ -0,0 +1,78 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { runCommandWithTimeout } from "../process/exec.js"; +import { resolveUserPath } from "../utils.js"; +import { fileExists, resolveArchiveKind } from "./archive.js"; + +export async function withTempDir( + prefix: string, + fn: (tmpDir: string) => Promise, +): Promise { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); + try { + return await fn(tmpDir); + } finally { + await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined); + } +} + +export async function resolveArchiveSourcePath(archivePath: string): Promise< + | { + ok: true; + path: string; + } + | { + ok: false; + error: string; + } +> { + const resolved = resolveUserPath(archivePath); + if (!(await fileExists(resolved))) { + return { ok: false, error: `archive not found: ${resolved}` }; + } + + if (!resolveArchiveKind(resolved)) { + return { ok: false, error: `unsupported archive: ${resolved}` }; + } + + return { ok: true, path: resolved }; +} + +export async function packNpmSpecToArchive(params: { + spec: string; + timeoutMs: number; + cwd: string; +}): Promise< + | { + ok: true; + archivePath: string; + } + | { + ok: false; + error: string; + } +> { + const res = await runCommandWithTimeout(["npm", "pack", params.spec, "--ignore-scripts"], { + timeoutMs: Math.max(params.timeoutMs, 300_000), + cwd: params.cwd, + env: { + COREPACK_ENABLE_DOWNLOAD_PROMPT: "0", + NPM_CONFIG_IGNORE_SCRIPTS: "true", + }, + }); + if (res.code !== 0) { + return { ok: false, error: `npm pack failed: ${res.stderr.trim() || res.stdout.trim()}` }; + } + + const packed = (res.stdout || "") + .split("\n") + .map((line) => line.trim()) + .filter(Boolean) + .pop(); + if (!packed) { + return { ok: false, error: "npm pack produced no archive" }; + } + + return { ok: true, archivePath: path.join(params.cwd, packed) }; +} diff --git a/src/memory/manager-embedding-ops.ts b/src/memory/manager-embedding-ops.ts index 45ebd5626c..51e95b136b 100644 --- a/src/memory/manager-embedding-ops.ts +++ b/src/memory/manager-embedding-ops.ts @@ -366,40 +366,49 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps { }; } - private async embedChunksWithVoyageBatch( - chunks: MemoryChunk[], - entry: MemoryFileEntry | SessionFileEntry, - source: MemorySource, - ): Promise { - const voyage = this.voyage; - if (!voyage) { - return this.embedChunksInBatches(chunks); + private async embedChunksWithProviderBatch(params: { + chunks: MemoryChunk[]; + entry: MemoryFileEntry | SessionFileEntry; + source: MemorySource; + provider: "voyage" | "openai" | "gemini"; + enabled: boolean; + buildRequest: (chunk: MemoryChunk) => Omit; + runBatch: (runnerOptions: { + agentId: string; + requests: TRequest[]; + wait: boolean; + concurrency: number; + pollIntervalMs: number; + timeoutMs: number; + debug: (message: string, data?: Record) => void; + }) => Promise | number[][]>; + }): Promise { + if (!params.enabled) { + return this.embedChunksInBatches(params.chunks); } - if (chunks.length === 0) { + if (params.chunks.length === 0) { return []; } - const { embeddings, missing } = this.collectCachedEmbeddings(chunks); + const { embeddings, missing } = this.collectCachedEmbeddings(params.chunks); if (missing.length === 0) { return embeddings; } - const { requests, mapping } = this.buildBatchRequests({ + const { requests, mapping } = this.buildBatchRequests({ missing, - entry, - source, - build: (chunk) => ({ - body: { input: chunk.text }, - }), + entry: params.entry, + source: params.source, + build: params.buildRequest, + }); + const runnerOptions = this.buildEmbeddingBatchRunnerOptions({ + requests, + chunks: params.chunks, + source: params.source, }); - const runnerOptions = this.buildEmbeddingBatchRunnerOptions({ requests, chunks, source }); const batchResult = await this.runBatchWithFallback({ - provider: "voyage", - run: async () => - await runVoyageEmbeddingBatches({ - client: voyage, - ...runnerOptions, - }), - fallback: async () => await this.embedChunksInBatches(chunks), + provider: params.provider, + run: async () => await params.runBatch(runnerOptions), + fallback: async () => await this.embedChunksInBatches(params.chunks), }); if (Array.isArray(batchResult)) { return batchResult; @@ -408,51 +417,55 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps { return embeddings; } + private async embedChunksWithVoyageBatch( + chunks: MemoryChunk[], + entry: MemoryFileEntry | SessionFileEntry, + source: MemorySource, + ): Promise { + const voyage = this.voyage; + return await this.embedChunksWithProviderBatch({ + chunks, + entry, + source, + provider: "voyage", + enabled: Boolean(voyage), + buildRequest: (chunk) => ({ + body: { input: chunk.text }, + }), + runBatch: async (runnerOptions) => + await runVoyageEmbeddingBatches({ + client: voyage!, + ...runnerOptions, + }), + }); + } + private async embedChunksWithOpenAiBatch( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, source: MemorySource, ): Promise { const openAi = this.openAi; - if (!openAi) { - return this.embedChunksInBatches(chunks); - } - if (chunks.length === 0) { - return []; - } - const { embeddings, missing } = this.collectCachedEmbeddings(chunks); - if (missing.length === 0) { - return embeddings; - } - - const { requests, mapping } = this.buildBatchRequests({ - missing, + return await this.embedChunksWithProviderBatch({ + chunks, entry, source, - build: (chunk) => ({ + provider: "openai", + enabled: Boolean(openAi), + buildRequest: (chunk) => ({ method: "POST", url: OPENAI_BATCH_ENDPOINT, body: { - model: this.openAi?.model ?? this.provider?.model ?? "text-embedding-3-small", + model: openAi?.model ?? this.provider?.model ?? "text-embedding-3-small", input: chunk.text, }, }), - }); - const runnerOptions = this.buildEmbeddingBatchRunnerOptions({ requests, chunks, source }); - const batchResult = await this.runBatchWithFallback({ - provider: "openai", - run: async () => + runBatch: async (runnerOptions) => await runOpenAiEmbeddingBatches({ - openAi, + openAi: openAi!, ...runnerOptions, }), - fallback: async () => await this.embedChunksInBatches(chunks), }); - if (Array.isArray(batchResult)) { - return batchResult; - } - this.applyBatchEmbeddings({ byCustomId: batchResult, mapping, embeddings }); - return embeddings; } private async embedChunksWithGeminiBatch( @@ -461,42 +474,22 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps { source: MemorySource, ): Promise { const gemini = this.gemini; - if (!gemini) { - return this.embedChunksInBatches(chunks); - } - if (chunks.length === 0) { - return []; - } - const { embeddings, missing } = this.collectCachedEmbeddings(chunks); - if (missing.length === 0) { - return embeddings; - } - - const { requests, mapping } = this.buildBatchRequests({ - missing, + return await this.embedChunksWithProviderBatch({ + chunks, entry, source, - build: (chunk) => ({ + provider: "gemini", + enabled: Boolean(gemini), + buildRequest: (chunk) => ({ content: { parts: [{ text: chunk.text }] }, taskType: "RETRIEVAL_DOCUMENT", }), - }); - const runnerOptions = this.buildEmbeddingBatchRunnerOptions({ requests, chunks, source }); - - const batchResult = await this.runBatchWithFallback({ - provider: "gemini", - run: async () => + runBatch: async (runnerOptions) => await runGeminiEmbeddingBatches({ - gemini, + gemini: gemini!, ...runnerOptions, }), - fallback: async () => await this.embedChunksInBatches(chunks), }); - if (Array.isArray(batchResult)) { - return batchResult; - } - this.applyBatchEmbeddings({ byCustomId: batchResult, mapping, embeddings }); - return embeddings; } protected async embedBatchWithRetry(texts: string[]): Promise { diff --git a/src/plugins/install.ts b/src/plugins/install.ts index c50dbee294..537ece54a2 100644 --- a/src/plugins/install.ts +++ b/src/plugins/install.ts @@ -1,5 +1,4 @@ import fs from "node:fs/promises"; -import os from "node:os"; import path from "node:path"; import { MANIFEST_KEY } from "../compat/legacy-names.js"; import { @@ -15,8 +14,12 @@ import { safeDirName, unscopedPackageName, } from "../infra/install-safe-path.js"; +import { + packNpmSpecToArchive, + resolveArchiveSourcePath, + withTempDir, +} from "../infra/install-source-utils.js"; import { validateRegistryNpmSpec } from "../infra/npm-registry-spec.js"; -import { runCommandWithTimeout } from "../process/exec.js"; import { extensionUsesSkippedScannerPath, isPathInside } from "../security/scan-paths.js"; import * as skillScanner from "../security/skill-scanner.js"; import { CONFIG_DIR, resolveUserPath } from "../utils.js"; @@ -298,18 +301,13 @@ export async function installPluginFromArchive(params: { const logger = params.logger ?? defaultLogger; const timeoutMs = params.timeoutMs ?? 120_000; const mode = params.mode ?? "install"; - - const archivePath = resolveUserPath(params.archivePath); - if (!(await fileExists(archivePath))) { - return { ok: false, error: `archive not found: ${archivePath}` }; + const archivePathResult = await resolveArchiveSourcePath(params.archivePath); + if (!archivePathResult.ok) { + return archivePathResult; } + const archivePath = archivePathResult.path; - if (!resolveArchiveKind(archivePath)) { - return { ok: false, error: `unsupported archive: ${archivePath}` }; - } - - const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-plugin-")); - try { + return await withTempDir("openclaw-plugin-", async (tmpDir) => { const extractDir = path.join(tmpDir, "extract"); await fs.mkdir(extractDir, { recursive: true }); @@ -341,9 +339,7 @@ export async function installPluginFromArchive(params: { dryRun: params.dryRun, expectedPluginId: params.expectedPluginId, }); - } finally { - await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined); - } + }); } export async function installPluginFromDir(params: { @@ -433,36 +429,19 @@ export async function installPluginFromNpmSpec(params: { return { ok: false, error: specError }; } - const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-npm-pack-")); - try { + return await withTempDir("openclaw-npm-pack-", async (tmpDir) => { logger.info?.(`Downloading ${spec}…`); - const res = await runCommandWithTimeout(["npm", "pack", spec, "--ignore-scripts"], { - timeoutMs: Math.max(timeoutMs, 300_000), + const packedResult = await packNpmSpecToArchive({ + spec, + timeoutMs, cwd: tmpDir, - env: { - COREPACK_ENABLE_DOWNLOAD_PROMPT: "0", - NPM_CONFIG_IGNORE_SCRIPTS: "true", - }, }); - if (res.code !== 0) { - return { - ok: false, - error: `npm pack failed: ${res.stderr.trim() || res.stdout.trim()}`, - }; + if (!packedResult.ok) { + return packedResult; } - const packed = (res.stdout || "") - .split("\n") - .map((l) => l.trim()) - .filter(Boolean) - .pop(); - if (!packed) { - return { ok: false, error: "npm pack produced no archive" }; - } - - const archivePath = path.join(tmpDir, packed); return await installPluginFromArchive({ - archivePath, + archivePath: packedResult.archivePath, extensionsDir: params.extensionsDir, timeoutMs, logger, @@ -470,9 +449,7 @@ export async function installPluginFromNpmSpec(params: { dryRun, expectedPluginId, }); - } finally { - await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined); - } + }); } export async function installPluginFromPath(params: { diff --git a/src/shared/node-list-types.ts b/src/shared/node-list-types.ts new file mode 100644 index 0000000000..04bd587abe --- /dev/null +++ b/src/shared/node-list-types.ts @@ -0,0 +1,51 @@ +export type NodeListNode = { + nodeId: string; + displayName?: string; + platform?: string; + version?: string; + coreVersion?: string; + uiVersion?: string; + remoteIp?: string; + deviceFamily?: string; + modelIdentifier?: string; + pathEnv?: string; + caps?: string[]; + commands?: string[]; + permissions?: Record; + paired?: boolean; + connected?: boolean; + connectedAtMs?: number; +}; + +export type PendingRequest = { + requestId: string; + nodeId: string; + displayName?: string; + platform?: string; + version?: string; + coreVersion?: string; + uiVersion?: string; + remoteIp?: string; + isRepair?: boolean; + ts: number; +}; + +export type PairedNode = { + nodeId: string; + token?: string; + displayName?: string; + platform?: string; + version?: string; + coreVersion?: string; + uiVersion?: string; + remoteIp?: string; + permissions?: Record; + createdAtMs?: number; + approvedAtMs?: number; + lastConnectedAtMs?: number; +}; + +export type PairingList = { + pending: PendingRequest[]; + paired: PairedNode[]; +};