From 965f1bc65f5e9ab508908bcba83f218230fa75eb Mon Sep 17 00:00:00 2001 From: waleed Date: Fri, 6 Feb 2026 11:35:48 -0800 Subject: [PATCH] fix(executor): isolated-vm worker pool to prevent single-worker bottleneck --- apps/sim/lib/core/config/env.ts | 7 + apps/sim/lib/execution/isolated-vm.ts | 414 +++++++++++++++++--------- 2 files changed, 283 insertions(+), 138 deletions(-) diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 8440de3bc..961b10b84 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -180,6 +180,13 @@ export const env = createEnv({ EXECUTION_TIMEOUT_ASYNC_TEAM: z.string().optional().default('5400'), // 90 minutes EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: z.string().optional().default('5400'), // 90 minutes + // Isolated-VM Worker Pool Configuration + IVM_POOL_SIZE: z.string().optional().default('4'), // Max worker processes in pool + IVM_MAX_CONCURRENT: z.string().optional().default('10000'), // Max concurrent executions globally + IVM_MAX_PER_WORKER: z.string().optional().default('2500'), // Max concurrent executions per worker + IVM_WORKER_IDLE_TIMEOUT_MS: z.string().optional().default('60000'), // Worker idle cleanup timeout (ms) + IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms) + // Knowledge Base Processing Configuration - Shared across all processing methods KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes) KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 472fc12b2..f3f0ed8cc 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -3,6 +3,7 @@ import fs from 'node:fs' import path from 'node:path' import { fileURLToPath } from 'node:url' import { createLogger } from '@sim/logger' +import { env } from '@/lib/core/config/env' import { validateProxyUrl } from '@/lib/core/security/input-validation' const logger = createLogger('IsolatedVMExecution') @@ -44,48 +45,40 @@ export interface IsolatedVMError { lineContent?: string } +const POOL_SIZE = Number.parseInt(env.IVM_POOL_SIZE) || 4 +const MAX_CONCURRENT = Number.parseInt(env.IVM_MAX_CONCURRENT) || 10000 +const MAX_PER_WORKER = Number.parseInt(env.IVM_MAX_PER_WORKER) || 2500 +const WORKER_IDLE_TIMEOUT_MS = Number.parseInt(env.IVM_WORKER_IDLE_TIMEOUT_MS) || 60000 +const QUEUE_TIMEOUT_MS = Number.parseInt(env.IVM_QUEUE_TIMEOUT_MS) || 300000 + interface PendingExecution { resolve: (result: IsolatedVMExecutionResult) => void timeout: ReturnType } -let worker: ChildProcess | null = null -let workerReady = false -let workerReadyPromise: Promise | null = null -let workerIdleTimeout: ReturnType | null = null -const pendingExecutions = new Map() +interface WorkerInfo { + process: ChildProcess + ready: boolean + readyPromise: Promise | null + activeExecutions: number + pendingExecutions: Map + idleTimeout: ReturnType | null + id: number +} + +interface QueuedExecution { + req: IsolatedVMExecutionRequest + resolve: (result: IsolatedVMExecutionResult) => void + queueTimeout: ReturnType +} + +const workers: Map = new Map() +const executionQueue: QueuedExecution[] = [] +let totalActiveExecutions = 0 let executionIdCounter = 0 +let nextWorkerId = 0 +let spawnInProgress = 0 -const WORKER_IDLE_TIMEOUT_MS = 60000 - -function cleanupWorker() { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) - workerIdleTimeout = null - } - if (worker) { - worker.kill() - worker = null - } - workerReady = false - workerReadyPromise = null -} - -function resetIdleTimeout() { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) - } - workerIdleTimeout = setTimeout(() => { - if (pendingExecutions.size === 0) { - logger.info('Cleaning up idle isolated-vm worker') - cleanupWorker() - } - }, WORKER_IDLE_TIMEOUT_MS) -} - -/** - * Secure fetch wrapper that validates URLs to prevent SSRF attacks - */ async function secureFetch(requestId: string, url: string, options?: RequestInit): Promise { const validation = validateProxyUrl(url) if (!validation.isValid) { @@ -115,19 +108,22 @@ async function secureFetch(requestId: string, url: string, options?: RequestInit } } -/** - * Handle IPC messages from the Node.js worker - */ -function handleWorkerMessage(message: unknown) { +function handleWorkerMessage(workerId: number, message: unknown) { if (typeof message !== 'object' || message === null) return const msg = message as Record + const workerInfo = workers.get(workerId) if (msg.type === 'result') { - const pending = pendingExecutions.get(msg.executionId as number) + const execId = msg.executionId as number + const pending = workerInfo?.pendingExecutions.get(execId) if (pending) { clearTimeout(pending.timeout) - pendingExecutions.delete(msg.executionId as number) + workerInfo!.pendingExecutions.delete(execId) + workerInfo!.activeExecutions-- + totalActiveExecutions-- pending.resolve(msg.result as IsolatedVMExecutionResult) + resetWorkerIdleTimeout(workerId) + drainQueue() } return } @@ -144,7 +140,7 @@ function handleWorkerMessage(message: unknown) { try { options = JSON.parse(optionsJson) } catch { - worker?.send({ + workerInfo?.process.send({ type: 'fetchResponse', fetchId, response: JSON.stringify({ error: 'Invalid fetch options JSON' }), @@ -155,14 +151,14 @@ function handleWorkerMessage(message: unknown) { secureFetch(requestId, url, options) .then((response) => { try { - worker?.send({ type: 'fetchResponse', fetchId, response }) + workerInfo?.process.send({ type: 'fetchResponse', fetchId, response }) } catch (err) { - logger.error('Failed to send fetch response to worker', { err, fetchId }) + logger.error('Failed to send fetch response to worker', { err, fetchId, workerId }) } }) .catch((err) => { try { - worker?.send({ + workerInfo?.process.send({ type: 'fetchResponse', fetchId, response: JSON.stringify({ @@ -170,21 +166,74 @@ function handleWorkerMessage(message: unknown) { }), }) } catch (sendErr) { - logger.error('Failed to send fetch error to worker', { sendErr, fetchId }) + logger.error('Failed to send fetch error to worker', { sendErr, fetchId, workerId }) } }) } } -/** - * Start the Node.js worker process - */ -async function ensureWorker(): Promise { - if (workerReady && worker) return - if (workerReadyPromise) return workerReadyPromise +function cleanupWorker(workerId: number) { + const workerInfo = workers.get(workerId) + if (!workerInfo) return - workerReadyPromise = new Promise((resolve, reject) => { + if (workerInfo.idleTimeout) { + clearTimeout(workerInfo.idleTimeout) + } + + workerInfo.process.kill() + + for (const [id, pending] of workerInfo.pendingExecutions) { + clearTimeout(pending.timeout) + totalActiveExecutions-- + pending.resolve({ + result: null, + stdout: '', + error: { message: 'Worker process exited unexpectedly', name: 'WorkerError' }, + }) + workerInfo.pendingExecutions.delete(id) + } + + workers.delete(workerId) + logger.info('Worker removed from pool', { workerId, poolSize: workers.size }) +} + +function resetWorkerIdleTimeout(workerId: number) { + const workerInfo = workers.get(workerId) + if (!workerInfo) return + + if (workerInfo.idleTimeout) { + clearTimeout(workerInfo.idleTimeout) + workerInfo.idleTimeout = null + } + + if (workerInfo.activeExecutions === 0) { + workerInfo.idleTimeout = setTimeout(() => { + const w = workers.get(workerId) + if (w && w.activeExecutions === 0) { + logger.info('Cleaning up idle worker', { workerId }) + cleanupWorker(workerId) + } + }, WORKER_IDLE_TIMEOUT_MS) + } +} + +function spawnWorker(): Promise { + const workerId = nextWorkerId++ + spawnInProgress++ + + const workerInfo: WorkerInfo = { + process: null as unknown as ChildProcess, + ready: false, + readyPromise: null, + activeExecutions: 0, + pendingExecutions: new Map(), + idleTimeout: null, + id: workerId, + } + + workerInfo.readyPromise = new Promise((resolve, reject) => { if (!checkNodeAvailable()) { + spawnInProgress-- reject( new Error( 'Node.js is required for code execution but was not found. ' + @@ -198,28 +247,29 @@ async function ensureWorker(): Promise { const workerPath = path.join(currentDir, 'isolated-vm-worker.cjs') if (!fs.existsSync(workerPath)) { + spawnInProgress-- reject(new Error(`Worker file not found at ${workerPath}`)) return } import('node:child_process').then(({ spawn }) => { - worker = spawn('node', [workerPath], { + const proc = spawn('node', [workerPath], { stdio: ['ignore', 'pipe', 'pipe', 'ipc'], serialization: 'json', }) + workerInfo.process = proc - worker.on('message', handleWorkerMessage) + proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message)) let stderrData = '' - worker.stderr?.on('data', (data: Buffer) => { + proc.stderr?.on('data', (data: Buffer) => { stderrData += data.toString() }) const startTimeout = setTimeout(() => { - worker?.kill() - worker = null - workerReady = false - workerReadyPromise = null + proc.kill() + spawnInProgress-- + workers.delete(workerId) reject(new Error('Worker failed to start within timeout')) }, 10000) @@ -229,110 +279,198 @@ async function ensureWorker(): Promise { message !== null && (message as { type?: string }).type === 'ready' ) { - workerReady = true + workerInfo.ready = true + spawnInProgress-- clearTimeout(startTimeout) - worker?.off('message', readyHandler) + proc.off('message', readyHandler) + workers.set(workerId, workerInfo) + resetWorkerIdleTimeout(workerId) + logger.info('Worker spawned and ready', { workerId, poolSize: workers.size }) resolve() } } - worker.on('message', readyHandler) + proc.on('message', readyHandler) - worker.on('exit', (code) => { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) - workerIdleTimeout = null - } - - const wasStartupFailure = !workerReady && workerReadyPromise - - worker = null - workerReady = false - workerReadyPromise = null - - let errorMessage = 'Worker process exited unexpectedly' - if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) { - errorMessage = - 'Code execution requires the isolated-vm native module which failed to load. ' + - 'This usually means the module needs to be rebuilt for your Node.js version. ' + - 'Please run: cd node_modules/isolated-vm && npm rebuild' - logger.error('isolated-vm module failed to load', { stderr: stderrData }) - } else if (stderrData) { - errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}` - logger.error('Worker process failed', { stderr: stderrData }) - } + proc.on('exit', () => { + const wasStartupFailure = !workerInfo.ready if (wasStartupFailure) { + spawnInProgress-- clearTimeout(startTimeout) + + let errorMessage = 'Worker process exited unexpectedly' + if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) { + errorMessage = + 'Code execution requires the isolated-vm native module which failed to load. ' + + 'This usually means the module needs to be rebuilt for your Node.js version. ' + + 'Please run: cd node_modules/isolated-vm && npm rebuild' + logger.error('isolated-vm module failed to load', { stderr: stderrData, workerId }) + } else if (stderrData) { + errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}` + logger.error('Worker process failed', { stderr: stderrData, workerId }) + } + reject(new Error(errorMessage)) return } - for (const [id, pending] of pendingExecutions) { - clearTimeout(pending.timeout) - pending.resolve({ - result: null, - stdout: '', - error: { message: errorMessage, name: 'WorkerError' }, - }) - pendingExecutions.delete(id) - } + cleanupWorker(workerId) + drainQueue() }) }) }) - return workerReadyPromise + return workerInfo.readyPromise.then(() => workerInfo) +} + +/** + * Returns the ready worker with the fewest active executions that still + * has capacity, or null if none available. + */ +function selectWorker(): WorkerInfo | null { + let best: WorkerInfo | null = null + for (const w of workers.values()) { + if (!w.ready) continue + if (w.activeExecutions >= MAX_PER_WORKER) continue + if (!best || w.activeExecutions < best.activeExecutions) { + best = w + } + } + return best +} + +/** + * Tries to get an existing worker with capacity, or spawns a new one if the + * pool is not full. Returns null when the pool is at capacity and all workers + * are saturated (caller should enqueue). + */ +async function acquireWorker(): Promise { + const existing = selectWorker() + if (existing) return existing + + const currentPoolSize = workers.size + spawnInProgress + if (currentPoolSize < POOL_SIZE) { + try { + return await spawnWorker() + } catch (error) { + logger.error('Failed to spawn worker', { error }) + return null + } + } + + return null +} + +function dispatchToWorker( + workerInfo: WorkerInfo, + req: IsolatedVMExecutionRequest, + resolve: (result: IsolatedVMExecutionResult) => void +) { + const execId = ++executionIdCounter + + if (workerInfo.idleTimeout) { + clearTimeout(workerInfo.idleTimeout) + workerInfo.idleTimeout = null + } + + const timeout = setTimeout(() => { + workerInfo.pendingExecutions.delete(execId) + workerInfo.activeExecutions-- + totalActiveExecutions-- + resolve({ + result: null, + stdout: '', + error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' }, + }) + resetWorkerIdleTimeout(workerInfo.id) + drainQueue() + }, req.timeoutMs + 1000) + + workerInfo.pendingExecutions.set(execId, { resolve, timeout }) + workerInfo.activeExecutions++ + totalActiveExecutions++ + + try { + workerInfo.process.send({ type: 'execute', executionId: execId, request: req }) + } catch { + clearTimeout(timeout) + workerInfo.pendingExecutions.delete(execId) + workerInfo.activeExecutions-- + totalActiveExecutions-- + resolve({ + result: null, + stdout: '', + error: { message: 'Failed to send execution request to worker', name: 'WorkerError' }, + }) + resetWorkerIdleTimeout(workerInfo.id) + drainQueue() + } +} + +function enqueueExecution( + req: IsolatedVMExecutionRequest, + resolve: (result: IsolatedVMExecutionResult) => void +) { + const queueTimeout = setTimeout(() => { + const idx = executionQueue.findIndex((q) => q.resolve === resolve) + if (idx !== -1) { + executionQueue.splice(idx, 1) + resolve({ + result: null, + stdout: '', + error: { + message: `Execution queued too long (${QUEUE_TIMEOUT_MS}ms). All workers are busy.`, + name: 'QueueTimeoutError', + }, + }) + } + }, QUEUE_TIMEOUT_MS) + + executionQueue.push({ req, resolve, queueTimeout }) + logger.info('Execution queued', { + queueLength: executionQueue.length, + totalActive: totalActiveExecutions, + poolSize: workers.size, + }) +} + +/** + * Called after every completion or worker spawn — dispatches queued + * executions to available workers. + */ +function drainQueue() { + while (executionQueue.length > 0 && totalActiveExecutions < MAX_CONCURRENT) { + const worker = selectWorker() + if (!worker) { + const currentPoolSize = workers.size + spawnInProgress + if (currentPoolSize < POOL_SIZE) { + spawnWorker() + .then(() => drainQueue()) + .catch((err) => logger.error('Failed to spawn worker during drain', { err })) + } + break + } + + const queued = executionQueue.shift()! + clearTimeout(queued.queueTimeout) + dispatchToWorker(worker, queued.req, queued.resolve) + } } /** * Execute JavaScript code in an isolated V8 isolate via Node.js subprocess. - * The worker's V8 isolate enforces timeoutMs internally. The parent timeout - * (timeoutMs + 1000) is a safety buffer for IPC communication. */ export async function executeInIsolatedVM( req: IsolatedVMExecutionRequest ): Promise { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) - workerIdleTimeout = null + if (totalActiveExecutions >= MAX_CONCURRENT) { + return new Promise((resolve) => enqueueExecution(req, resolve)) } - await ensureWorker() - - if (!worker) { - return { - result: null, - stdout: '', - error: { message: 'Failed to start isolated-vm worker', name: 'WorkerError' }, - } + const workerInfo = await acquireWorker() + if (!workerInfo) { + return new Promise((resolve) => enqueueExecution(req, resolve)) } - const executionId = ++executionIdCounter - - return new Promise((resolve) => { - const timeout = setTimeout(() => { - pendingExecutions.delete(executionId) - resolve({ - result: null, - stdout: '', - error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' }, - }) - }, req.timeoutMs + 1000) - - pendingExecutions.set(executionId, { resolve, timeout }) - - try { - worker!.send({ type: 'execute', executionId, request: req }) - } catch { - clearTimeout(timeout) - pendingExecutions.delete(executionId) - resolve({ - result: null, - stdout: '', - error: { message: 'Failed to send execution request to worker', name: 'WorkerError' }, - }) - return - } - - resetIdleTimeout() - }) + return new Promise((resolve) => dispatchToWorker(workerInfo, req, resolve)) }