fix(executor): add isolated-vm worker pool to prevent single-worker bottleneck

waleed
2026-02-06 11:35:48 -08:00
parent ed5ed97c07
commit 965f1bc65f
2 changed files with 283 additions and 138 deletions

View File

@@ -180,6 +180,13 @@ export const env = createEnv({
EXECUTION_TIMEOUT_ASYNC_TEAM: z.string().optional().default('5400'), // 90 minutes
EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: z.string().optional().default('5400'), // 90 minutes
// Isolated-VM Worker Pool Configuration
IVM_POOL_SIZE: z.string().optional().default('4'), // Max worker processes in pool
IVM_MAX_CONCURRENT: z.string().optional().default('10000'), // Max concurrent executions globally
IVM_MAX_PER_WORKER: z.string().optional().default('2500'), // Max concurrent executions per worker
IVM_WORKER_IDLE_TIMEOUT_MS: z.string().optional().default('60000'), // Worker idle cleanup timeout (ms)
IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms)
// Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts

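The defaults are sized so the pool covers the global cap: 4 workers at 2,500 concurrent executions each matches the 10,000-execution IVM_MAX_CONCURRENT limit. Below is a minimal TypeScript sketch of how a deployment could sanity-check overridden values at startup; the variable names come from the schema above, but the check itself is only an illustration, not something this commit adds.

// Illustrative startup check (not from the diff): warn when pool capacity
// cannot absorb the global concurrency cap.
const poolSize = Number.parseInt(process.env.IVM_POOL_SIZE ?? '4', 10)
const maxPerWorker = Number.parseInt(process.env.IVM_MAX_PER_WORKER ?? '2500', 10)
const maxConcurrent = Number.parseInt(process.env.IVM_MAX_CONCURRENT ?? '10000', 10)

if (poolSize * maxPerWorker < maxConcurrent) {
  // Excess executions would sit in the queue until IVM_QUEUE_TIMEOUT_MS and
  // then be rejected, so surface the mismatch early.
  console.warn(
    `isolated-vm pool capacity ${poolSize * maxPerWorker} is below IVM_MAX_CONCURRENT ${maxConcurrent}`
  )
}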
View File

@@ -3,6 +3,7 @@ import fs from 'node:fs'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
import { createLogger } from '@sim/logger'
import { env } from '@/lib/core/config/env'
import { validateProxyUrl } from '@/lib/core/security/input-validation'
const logger = createLogger('IsolatedVMExecution')
@@ -44,48 +45,40 @@ export interface IsolatedVMError {
lineContent?: string
}
const POOL_SIZE = Number.parseInt(env.IVM_POOL_SIZE) || 4
const MAX_CONCURRENT = Number.parseInt(env.IVM_MAX_CONCURRENT) || 10000
const MAX_PER_WORKER = Number.parseInt(env.IVM_MAX_PER_WORKER) || 2500
const WORKER_IDLE_TIMEOUT_MS = Number.parseInt(env.IVM_WORKER_IDLE_TIMEOUT_MS) || 60000
const QUEUE_TIMEOUT_MS = Number.parseInt(env.IVM_QUEUE_TIMEOUT_MS) || 300000
interface PendingExecution {
resolve: (result: IsolatedVMExecutionResult) => void
timeout: ReturnType<typeof setTimeout>
}
let worker: ChildProcess | null = null
let workerReady = false
let workerReadyPromise: Promise<void> | null = null
let workerIdleTimeout: ReturnType<typeof setTimeout> | null = null
const pendingExecutions = new Map<number, PendingExecution>()
interface WorkerInfo {
process: ChildProcess
ready: boolean
readyPromise: Promise<void> | null
activeExecutions: number
pendingExecutions: Map<number, PendingExecution>
idleTimeout: ReturnType<typeof setTimeout> | null
id: number
}
interface QueuedExecution {
req: IsolatedVMExecutionRequest
resolve: (result: IsolatedVMExecutionResult) => void
queueTimeout: ReturnType<typeof setTimeout>
}
const workers: Map<number, WorkerInfo> = new Map()
const executionQueue: QueuedExecution[] = []
let totalActiveExecutions = 0
let executionIdCounter = 0
let nextWorkerId = 0
let spawnInProgress = 0
const WORKER_IDLE_TIMEOUT_MS = 60000
function cleanupWorker() {
if (workerIdleTimeout) {
clearTimeout(workerIdleTimeout)
workerIdleTimeout = null
}
if (worker) {
worker.kill()
worker = null
}
workerReady = false
workerReadyPromise = null
}
function resetIdleTimeout() {
if (workerIdleTimeout) {
clearTimeout(workerIdleTimeout)
}
workerIdleTimeout = setTimeout(() => {
if (pendingExecutions.size === 0) {
logger.info('Cleaning up idle isolated-vm worker')
cleanupWorker()
}
}, WORKER_IDLE_TIMEOUT_MS)
}
/**
* Secure fetch wrapper that validates URLs to prevent SSRF attacks
*/
async function secureFetch(requestId: string, url: string, options?: RequestInit): Promise<string> {
const validation = validateProxyUrl(url)
if (!validation.isValid) {
@@ -115,19 +108,22 @@ async function secureFetch(requestId: string, url: string, options?: RequestInit
}
}
/**
* Handle IPC messages from the Node.js worker
*/
function handleWorkerMessage(message: unknown) {
function handleWorkerMessage(workerId: number, message: unknown) {
if (typeof message !== 'object' || message === null) return
const msg = message as Record<string, unknown>
const workerInfo = workers.get(workerId)
if (msg.type === 'result') {
const pending = pendingExecutions.get(msg.executionId as number)
const execId = msg.executionId as number
const pending = workerInfo?.pendingExecutions.get(execId)
if (pending) {
clearTimeout(pending.timeout)
pendingExecutions.delete(msg.executionId as number)
workerInfo!.pendingExecutions.delete(execId)
workerInfo!.activeExecutions--
totalActiveExecutions--
pending.resolve(msg.result as IsolatedVMExecutionResult)
resetWorkerIdleTimeout(workerId)
drainQueue()
}
return
}
@@ -144,7 +140,7 @@ function handleWorkerMessage(message: unknown) {
try {
options = JSON.parse(optionsJson)
} catch {
worker?.send({
workerInfo?.process.send({
type: 'fetchResponse',
fetchId,
response: JSON.stringify({ error: 'Invalid fetch options JSON' }),
@@ -155,14 +151,14 @@ function handleWorkerMessage(message: unknown) {
secureFetch(requestId, url, options)
.then((response) => {
try {
worker?.send({ type: 'fetchResponse', fetchId, response })
workerInfo?.process.send({ type: 'fetchResponse', fetchId, response })
} catch (err) {
logger.error('Failed to send fetch response to worker', { err, fetchId })
logger.error('Failed to send fetch response to worker', { err, fetchId, workerId })
}
})
.catch((err) => {
try {
worker?.send({
workerInfo?.process.send({
type: 'fetchResponse',
fetchId,
response: JSON.stringify({
@@ -170,21 +166,74 @@ function handleWorkerMessage(message: unknown) {
}),
})
} catch (sendErr) {
logger.error('Failed to send fetch error to worker', { sendErr, fetchId })
logger.error('Failed to send fetch error to worker', { sendErr, fetchId, workerId })
}
})
}
}
/**
* Start the Node.js worker process
*/
async function ensureWorker(): Promise<void> {
if (workerReady && worker) return
if (workerReadyPromise) return workerReadyPromise
function cleanupWorker(workerId: number) {
const workerInfo = workers.get(workerId)
if (!workerInfo) return
workerReadyPromise = new Promise<void>((resolve, reject) => {
if (workerInfo.idleTimeout) {
clearTimeout(workerInfo.idleTimeout)
}
workerInfo.process.kill()
for (const [id, pending] of workerInfo.pendingExecutions) {
clearTimeout(pending.timeout)
totalActiveExecutions--
pending.resolve({
result: null,
stdout: '',
error: { message: 'Worker process exited unexpectedly', name: 'WorkerError' },
})
workerInfo.pendingExecutions.delete(id)
}
workers.delete(workerId)
logger.info('Worker removed from pool', { workerId, poolSize: workers.size })
}
function resetWorkerIdleTimeout(workerId: number) {
const workerInfo = workers.get(workerId)
if (!workerInfo) return
if (workerInfo.idleTimeout) {
clearTimeout(workerInfo.idleTimeout)
workerInfo.idleTimeout = null
}
if (workerInfo.activeExecutions === 0) {
workerInfo.idleTimeout = setTimeout(() => {
const w = workers.get(workerId)
if (w && w.activeExecutions === 0) {
logger.info('Cleaning up idle worker', { workerId })
cleanupWorker(workerId)
}
}, WORKER_IDLE_TIMEOUT_MS)
}
}
function spawnWorker(): Promise<WorkerInfo> {
const workerId = nextWorkerId++
spawnInProgress++
const workerInfo: WorkerInfo = {
process: null as unknown as ChildProcess,
ready: false,
readyPromise: null,
activeExecutions: 0,
pendingExecutions: new Map(),
idleTimeout: null,
id: workerId,
}
workerInfo.readyPromise = new Promise<void>((resolve, reject) => {
if (!checkNodeAvailable()) {
spawnInProgress--
reject(
new Error(
'Node.js is required for code execution but was not found. ' +
@@ -198,28 +247,29 @@ async function ensureWorker(): Promise<void> {
const workerPath = path.join(currentDir, 'isolated-vm-worker.cjs')
if (!fs.existsSync(workerPath)) {
spawnInProgress--
reject(new Error(`Worker file not found at ${workerPath}`))
return
}
import('node:child_process').then(({ spawn }) => {
worker = spawn('node', [workerPath], {
const proc = spawn('node', [workerPath], {
stdio: ['ignore', 'pipe', 'pipe', 'ipc'],
serialization: 'json',
})
workerInfo.process = proc
worker.on('message', handleWorkerMessage)
proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message))
let stderrData = ''
worker.stderr?.on('data', (data: Buffer) => {
proc.stderr?.on('data', (data: Buffer) => {
stderrData += data.toString()
})
const startTimeout = setTimeout(() => {
worker?.kill()
worker = null
workerReady = false
workerReadyPromise = null
proc.kill()
spawnInProgress--
workers.delete(workerId)
reject(new Error('Worker failed to start within timeout'))
}, 10000)
@@ -229,110 +279,198 @@ async function ensureWorker(): Promise<void> {
message !== null &&
(message as { type?: string }).type === 'ready'
) {
workerReady = true
workerInfo.ready = true
spawnInProgress--
clearTimeout(startTimeout)
worker?.off('message', readyHandler)
proc.off('message', readyHandler)
workers.set(workerId, workerInfo)
resetWorkerIdleTimeout(workerId)
logger.info('Worker spawned and ready', { workerId, poolSize: workers.size })
resolve()
}
}
worker.on('message', readyHandler)
proc.on('message', readyHandler)
worker.on('exit', (code) => {
if (workerIdleTimeout) {
clearTimeout(workerIdleTimeout)
workerIdleTimeout = null
}
const wasStartupFailure = !workerReady && workerReadyPromise
worker = null
workerReady = false
workerReadyPromise = null
let errorMessage = 'Worker process exited unexpectedly'
if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) {
errorMessage =
'Code execution requires the isolated-vm native module which failed to load. ' +
'This usually means the module needs to be rebuilt for your Node.js version. ' +
'Please run: cd node_modules/isolated-vm && npm rebuild'
logger.error('isolated-vm module failed to load', { stderr: stderrData })
} else if (stderrData) {
errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}`
logger.error('Worker process failed', { stderr: stderrData })
}
proc.on('exit', () => {
const wasStartupFailure = !workerInfo.ready
if (wasStartupFailure) {
spawnInProgress--
clearTimeout(startTimeout)
let errorMessage = 'Worker process exited unexpectedly'
if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) {
errorMessage =
'Code execution requires the isolated-vm native module which failed to load. ' +
'This usually means the module needs to be rebuilt for your Node.js version. ' +
'Please run: cd node_modules/isolated-vm && npm rebuild'
logger.error('isolated-vm module failed to load', { stderr: stderrData, workerId })
} else if (stderrData) {
errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}`
logger.error('Worker process failed', { stderr: stderrData, workerId })
}
reject(new Error(errorMessage))
return
}
for (const [id, pending] of pendingExecutions) {
clearTimeout(pending.timeout)
pending.resolve({
result: null,
stdout: '',
error: { message: errorMessage, name: 'WorkerError' },
})
pendingExecutions.delete(id)
}
cleanupWorker(workerId)
drainQueue()
})
})
})
return workerReadyPromise
return workerInfo.readyPromise.then(() => workerInfo)
}
/**
* Returns the ready worker with the fewest active executions that still
* has capacity, or null if none available.
*/
function selectWorker(): WorkerInfo | null {
let best: WorkerInfo | null = null
for (const w of workers.values()) {
if (!w.ready) continue
if (w.activeExecutions >= MAX_PER_WORKER) continue
if (!best || w.activeExecutions < best.activeExecutions) {
best = w
}
}
return best
}
/**
* Tries to get an existing worker with capacity, or spawns a new one if the
* pool is not full. Returns null when the pool is at capacity and all workers
* are saturated (caller should enqueue).
*/
async function acquireWorker(): Promise<WorkerInfo | null> {
const existing = selectWorker()
if (existing) return existing
const currentPoolSize = workers.size + spawnInProgress
if (currentPoolSize < POOL_SIZE) {
try {
return await spawnWorker()
} catch (error) {
logger.error('Failed to spawn worker', { error })
return null
}
}
return null
}
function dispatchToWorker(
workerInfo: WorkerInfo,
req: IsolatedVMExecutionRequest,
resolve: (result: IsolatedVMExecutionResult) => void
) {
const execId = ++executionIdCounter
if (workerInfo.idleTimeout) {
clearTimeout(workerInfo.idleTimeout)
workerInfo.idleTimeout = null
}
const timeout = setTimeout(() => {
workerInfo.pendingExecutions.delete(execId)
workerInfo.activeExecutions--
totalActiveExecutions--
resolve({
result: null,
stdout: '',
error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' },
})
resetWorkerIdleTimeout(workerInfo.id)
drainQueue()
}, req.timeoutMs + 1000)
workerInfo.pendingExecutions.set(execId, { resolve, timeout })
workerInfo.activeExecutions++
totalActiveExecutions++
try {
workerInfo.process.send({ type: 'execute', executionId: execId, request: req })
} catch {
clearTimeout(timeout)
workerInfo.pendingExecutions.delete(execId)
workerInfo.activeExecutions--
totalActiveExecutions--
resolve({
result: null,
stdout: '',
error: { message: 'Failed to send execution request to worker', name: 'WorkerError' },
})
resetWorkerIdleTimeout(workerInfo.id)
drainQueue()
}
}
function enqueueExecution(
req: IsolatedVMExecutionRequest,
resolve: (result: IsolatedVMExecutionResult) => void
) {
const queueTimeout = setTimeout(() => {
const idx = executionQueue.findIndex((q) => q.resolve === resolve)
if (idx !== -1) {
executionQueue.splice(idx, 1)
resolve({
result: null,
stdout: '',
error: {
message: `Execution queued too long (${QUEUE_TIMEOUT_MS}ms). All workers are busy.`,
name: 'QueueTimeoutError',
},
})
}
}, QUEUE_TIMEOUT_MS)
executionQueue.push({ req, resolve, queueTimeout })
logger.info('Execution queued', {
queueLength: executionQueue.length,
totalActive: totalActiveExecutions,
poolSize: workers.size,
})
}
/**
* Called after every completion or worker spawn — dispatches queued
* executions to available workers.
*/
function drainQueue() {
while (executionQueue.length > 0 && totalActiveExecutions < MAX_CONCURRENT) {
const worker = selectWorker()
if (!worker) {
const currentPoolSize = workers.size + spawnInProgress
if (currentPoolSize < POOL_SIZE) {
spawnWorker()
.then(() => drainQueue())
.catch((err) => logger.error('Failed to spawn worker during drain', { err }))
}
break
}
const queued = executionQueue.shift()!
clearTimeout(queued.queueTimeout)
dispatchToWorker(worker, queued.req, queued.resolve)
}
}
/**
* Execute JavaScript code in an isolated V8 isolate via Node.js subprocess.
* The worker's V8 isolate enforces timeoutMs internally. The parent timeout
* (timeoutMs + 1000) is a safety buffer for IPC communication.
*/
export async function executeInIsolatedVM(
req: IsolatedVMExecutionRequest
): Promise<IsolatedVMExecutionResult> {
if (workerIdleTimeout) {
clearTimeout(workerIdleTimeout)
workerIdleTimeout = null
if (totalActiveExecutions >= MAX_CONCURRENT) {
return new Promise((resolve) => enqueueExecution(req, resolve))
}
await ensureWorker()
if (!worker) {
return {
result: null,
stdout: '',
error: { message: 'Failed to start isolated-vm worker', name: 'WorkerError' },
}
const workerInfo = await acquireWorker()
if (!workerInfo) {
return new Promise((resolve) => enqueueExecution(req, resolve))
}
const executionId = ++executionIdCounter
return new Promise((resolve) => {
const timeout = setTimeout(() => {
pendingExecutions.delete(executionId)
resolve({
result: null,
stdout: '',
error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' },
})
}, req.timeoutMs + 1000)
pendingExecutions.set(executionId, { resolve, timeout })
try {
worker!.send({ type: 'execute', executionId, request: req })
} catch {
clearTimeout(timeout)
pendingExecutions.delete(executionId)
resolve({
result: null,
stdout: '',
error: { message: 'Failed to send execution request to worker', name: 'WorkerError' },
})
return
}
resetIdleTimeout()
})
return new Promise((resolve) => dispatchToWorker(workerInfo, req, resolve))
}
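
For context, a hedged usage sketch of the exported entry point. Only timeoutMs appears on the request in this diff; the other fields and the import path below are assumptions, so read this as an illustration of the pool's queueing behavior rather than the exact API.

// Usage sketch: request fields other than timeoutMs and the import path
// are assumed, since the request interface is not shown in this diff.
import { executeInIsolatedVM } from './isolated-vm-execution'

async function runSnippets(snippets: string[]) {
  const results = await Promise.all(
    snippets.map((code, i) =>
      executeInIsolatedVM({
        requestId: `demo-${i}`, // assumed field
        code, // assumed field
        timeoutMs: 5_000, // enforced inside the worker's V8 isolate
      })
    )
  )
  // Requests beyond POOL_SIZE * MAX_PER_WORKER in-flight executions are
  // queued and, if still waiting after IVM_QUEUE_TIMEOUT_MS, resolved with a
  // QueueTimeoutError result instead of spawning unbounded workers.
  return results.filter((r) => r.error == null).length
}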