diff --git a/apps/sim/app/api/function/execute/route.ts b/apps/sim/app/api/function/execute/route.ts index 4ccbd8d7c..441bf788d 100644 --- a/apps/sim/app/api/function/execute/route.ts +++ b/apps/sim/app/api/function/execute/route.ts @@ -845,6 +845,8 @@ export async function POST(req: NextRequest) { contextVariables, timeoutMs: timeout, requestId, + ownerKey: `user:${auth.userId}`, + ownerWeight: 1, }) const executionTime = Date.now() - startTime diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 7c4cdc9db..06984a3e2 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -325,6 +325,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: requestId ) + // Client-side sessions and personal API keys bill/permission-check the + // authenticated user, not the workspace billed account. + const useAuthenticatedUserAsActor = + isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal') + const preprocessResult = await preprocessExecution({ workflowId, userId, @@ -334,6 +339,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: checkDeployment: !shouldUseDraftState, loggingSession, useDraftState: shouldUseDraftState, + useAuthenticatedUserAsActor, }) if (!preprocessResult.success) { diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index 40c7b9ba8..1a883d0af 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -307,6 +307,7 @@ export class AgentBlockHandler implements BlockHandler { _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/api/api-handler.ts b/apps/sim/executor/handlers/api/api-handler.ts index 562067cdf..83c710bef 100644 --- a/apps/sim/executor/handlers/api/api-handler.ts +++ b/apps/sim/executor/handlers/api/api-handler.ts @@ -72,6 +72,7 @@ export class ApiBlockHandler implements BlockHandler { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, executionId: ctx.executionId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/condition/condition-handler.ts b/apps/sim/executor/handlers/condition/condition-handler.ts index 96fe0db4b..0c88e0e78 100644 --- a/apps/sim/executor/handlers/condition/condition-handler.ts +++ b/apps/sim/executor/handlers/condition/condition-handler.ts @@ -48,6 +48,7 @@ export async function evaluateConditionExpression( _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/function/function-handler.ts b/apps/sim/executor/handlers/function/function-handler.ts index 624a262d3..d8e1209e5 100644 --- a/apps/sim/executor/handlers/function/function-handler.ts +++ b/apps/sim/executor/handlers/function/function-handler.ts @@ -39,6 +39,7 @@ export class FunctionBlockHandler implements BlockHandler { _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/generic/generic-handler.ts b/apps/sim/executor/handlers/generic/generic-handler.ts index 558a37dee..c6a6b7e9f 100644 --- 
a/apps/sim/executor/handlers/generic/generic-handler.ts +++ b/apps/sim/executor/handlers/generic/generic-handler.ts @@ -66,6 +66,7 @@ export class GenericBlockHandler implements BlockHandler { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, executionId: ctx.executionId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts b/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts index dd53a0a0e..2a23c622c 100644 --- a/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts +++ b/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts @@ -605,6 +605,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler { _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, blockData: blockDataWithPause, diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index 8bdf8edd2..456838d1e 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -511,6 +511,8 @@ export class LoopOrchestrator { contextVariables: {}, timeoutMs: LOOP_CONDITION_TIMEOUT_MS, requestId, + ownerKey: `user:${ctx.userId}`, + ownerWeight: 1, }) if (vmResult.error) { diff --git a/apps/sim/lib/auth/hybrid.ts b/apps/sim/lib/auth/hybrid.ts index 2b49d7158..1c34286f6 100644 --- a/apps/sim/lib/auth/hybrid.ts +++ b/apps/sim/lib/auth/hybrid.ts @@ -1,7 +1,4 @@ -import { db } from '@sim/db' -import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service' import { getSession } from '@/lib/auth' @@ -13,35 +10,33 @@ export interface AuthResult { success: boolean userId?: string authType?: 'session' | 'api_key' | 'internal_jwt' + apiKeyType?: 'personal' | 'workspace' error?: string } /** * Resolves userId from a verified internal JWT token. - * Extracts workflowId/userId from URL params or POST body, then looks up userId if needed. + * Extracts userId from the JWT payload, URL search params, or POST body. 
*/ async function resolveUserFromJwt( request: NextRequest, verificationUserId: string | null, options: { requireWorkflowId?: boolean } ): Promise<AuthResult> { - let workflowId: string | null = null let userId: string | null = verificationUserId - const { searchParams } = new URL(request.url) - workflowId = searchParams.get('workflowId') if (!userId) { + const { searchParams } = new URL(request.url) userId = searchParams.get('userId') } - if (!workflowId && !userId && request.method === 'POST') { + if (!userId && request.method === 'POST') { try { const clonedRequest = request.clone() const bodyText = await clonedRequest.text() if (bodyText) { const body = JSON.parse(bodyText) - workflowId = body.workflowId || body._context?.workflowId - userId = userId || body.userId || body._context?.userId + userId = body.userId || body._context?.userId || null } } catch { // Ignore JSON parse errors @@ -52,22 +47,8 @@ async function resolveUserFromJwt( return { success: true, userId, authType: 'internal_jwt' } } - if (workflowId) { - const [workflowData] = await db - .select({ userId: workflow.userId }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - - if (!workflowData) { - return { success: false, error: 'Workflow not found' } - } - - return { success: true, userId: workflowData.userId, authType: 'internal_jwt' } - } - if (options.requireWorkflowId !== false) { - return { success: false, error: 'workflowId or userId required for internal JWT calls' } + return { success: false, error: 'userId required for internal JWT calls' } } return { success: true, authType: 'internal_jwt' } @@ -222,6 +203,7 @@ export async function checkHybridAuth( success: true, userId: result.userId!, authType: 'api_key', + apiKeyType: result.keyType, } } diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 961b10b84..1921738ed 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -185,6 +185,17 @@ export const env = createEnv({ IVM_MAX_CONCURRENT: z.string().optional().default('10000'), // Max concurrent executions globally IVM_MAX_PER_WORKER: z.string().optional().default('2500'), // Max concurrent executions per worker IVM_WORKER_IDLE_TIMEOUT_MS: z.string().optional().default('60000'), // Worker idle cleanup timeout (ms) + IVM_MAX_QUEUE_SIZE: z.string().optional().default('10000'), // Max pending queued executions in memory + IVM_MAX_FETCH_RESPONSE_BYTES: z.string().optional().default('8388608'), // Max bytes read from sandbox fetch responses + IVM_MAX_FETCH_RESPONSE_CHARS: z.string().optional().default('4000000'), // Max chars returned to sandbox from fetch body + IVM_MAX_FETCH_OPTIONS_JSON_CHARS: z.string().optional().default('262144'), // Max JSON payload size for sandbox fetch options + IVM_MAX_FETCH_URL_LENGTH: z.string().optional().default('8192'), // Max URL length accepted by sandbox fetch + IVM_MAX_STDOUT_CHARS: z.string().optional().default('200000'), // Max captured stdout characters per execution + IVM_MAX_ACTIVE_PER_OWNER: z.string().optional().default('200'), // Max active executions per owner (per process) + IVM_MAX_QUEUED_PER_OWNER: z.string().optional().default('2000'), // Max queued executions per owner (per process) + IVM_MAX_OWNER_WEIGHT: z.string().optional().default('5'), // Max accepted weight for weighted owner scheduling + IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: z.string().optional().default('2200'), // Max owner in-flight leases across replicas + IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: z.string().optional().default('120000'), // Min TTL for distributed in-flight leases (ms) IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms) // Knowledge Base Processing Configuration - Shared across all processing methods
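These limits arrive as strings and are parsed where they are consumed. A minimal sketch of the pattern used in `isolated-vm.ts` later in this diff; note that the `||` fallback means `'0'` or any non-numeric value silently restores the default, so only positive integers take effect:

```ts
// How the string-typed env values above are consumed downstream
// (mirrors the Number.parseInt(...) || fallback pattern in isolated-vm.ts;
// `env` here is the validated object exported by env.ts).
const MAX_QUEUE_SIZE = Number.parseInt(env.IVM_MAX_QUEUE_SIZE) || 10000

// '0' parses to 0, which is falsy, so the default comes back:
Number.parseInt('0', 10) || 10000 // -> 10000
```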
diff --git a/apps/sim/lib/core/security/input-validation.server.ts b/apps/sim/lib/core/security/input-validation.server.ts index e8c0ec861..2a912240c 100644 --- a/apps/sim/lib/core/security/input-validation.server.ts +++ b/apps/sim/lib/core/security/input-validation.server.ts @@ -103,6 +103,7 @@ export interface SecureFetchOptions { body?: string | Buffer | Uint8Array timeout?: number maxRedirects?: number + maxResponseBytes?: number } export class SecureFetchHeaders { @@ -165,6 +166,7 @@ export async function secureFetchWithPinnedIP( redirectCount = 0 ): Promise { const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS + const maxResponseBytes = options.maxResponseBytes return new Promise((resolve, reject) => { const parsed = new URL(url) @@ -237,14 +239,32 @@ } const chunks: Buffer[] = [] + let totalBytes = 0 + let responseTerminated = false - res.on('data', (chunk: Buffer) => chunks.push(chunk)) + res.on('data', (chunk: Buffer) => { + if (responseTerminated) return + + totalBytes += chunk.length + if ( + typeof maxResponseBytes === 'number' && + maxResponseBytes > 0 && + totalBytes > maxResponseBytes + ) { + responseTerminated = true + res.destroy(new Error(`Response exceeded maximum size of ${maxResponseBytes} bytes`)) + return + } + + chunks.push(chunk) + }) res.on('error', (error) => { reject(error) }) res.on('end', () => { + if (responseTerminated) return const bodyBuffer = Buffer.concat(chunks) const body = bodyBuffer.toString('utf-8') const headersRecord: Record<string, string> = {} diff --git a/apps/sim/lib/execution/isolated-vm-worker.cjs b/apps/sim/lib/execution/isolated-vm-worker.cjs index 3deb76166..2641b80e1 100644 --- a/apps/sim/lib/execution/isolated-vm-worker.cjs +++ b/apps/sim/lib/execution/isolated-vm-worker.cjs @@ -9,6 +9,21 @@ const USER_CODE_START_LINE = 4 const pendingFetches = new Map() let fetchIdCounter = 0 const FETCH_TIMEOUT_MS = 300000 // 5 minutes +const MAX_STDOUT_CHARS = Number.parseInt(process.env.IVM_MAX_STDOUT_CHARS || '', 10) || 200000 +const MAX_FETCH_OPTIONS_JSON_CHARS = + Number.parseInt(process.env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS || '', 10) || 256 * 1024 + +function stringifyLogValue(value) { if (typeof value !== 'object' || value === null) { return String(value) } try { return JSON.stringify(value) } catch { return '[unserializable]' } +} /** * Extract line and column from error stack or message @@ -101,8 +116,32 @@ function convertToCompatibleError(errorInfo, userCode) { async function executeCode(request) { const { code, params, envVars, contextVariables, timeoutMs, requestId } = request const stdoutChunks = [] + let stdoutLength = 0 + let stdoutTruncated = false let isolate = null + const appendStdout = (line) => { + if (stdoutTruncated || !line) return + + const remaining = MAX_STDOUT_CHARS - stdoutLength + if (remaining <= 0) { + stdoutTruncated = true + stdoutChunks.push('[stdout truncated]\n') + return + } + + if (line.length <= remaining) { + stdoutChunks.push(line) + stdoutLength += line.length + return + } + + stdoutChunks.push(line.slice(0, remaining)) + stdoutChunks.push('\n[stdout truncated]\n') + stdoutLength = MAX_STDOUT_CHARS + stdoutTruncated = true + } + try { isolate = new ivm.Isolate({ memoryLimit: 128 }) const context = await
isolate.createContext() @@ -111,18 +150,14 @@ async function executeCode(request) { await jail.set('global', jail.derefInto()) const logCallback = new ivm.Callback((...args) => { - const message = args - .map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg))) - .join(' ') - stdoutChunks.push(`${message}\n`) + const message = args.map((arg) => stringifyLogValue(arg)).join(' ') + appendStdout(`${message}\n`) }) await jail.set('__log', logCallback) const errorCallback = new ivm.Callback((...args) => { - const message = args - .map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg))) - .join(' ') - stdoutChunks.push(`ERROR: ${message}\n`) + const message = args.map((arg) => stringifyLogValue(arg)).join(' ') + appendStdout(`ERROR: ${message}\n`) }) await jail.set('__error', errorCallback) @@ -178,6 +213,9 @@ } catch { throw new Error('fetch options must be JSON-serializable'); } + if (optionsJson.length > ${MAX_FETCH_OPTIONS_JSON_CHARS}) { + throw new Error('fetch options exceed maximum payload size'); + } } const resultJson = await __fetchRef.apply(undefined, [url, optionsJson], { result: { promise: true } }); let result; diff --git a/apps/sim/lib/execution/isolated-vm.test.ts b/apps/sim/lib/execution/isolated-vm.test.ts new file mode 100644 index 000000000..ad5023e4d --- /dev/null +++ b/apps/sim/lib/execution/isolated-vm.test.ts @@ -0,0 +1,500 @@ +import { EventEmitter } from 'node:events' +import { afterEach, describe, expect, it, vi } from 'vitest' + +type MockProc = EventEmitter & { + connected: boolean + stderr: EventEmitter + send: (message: unknown) => boolean + kill: () => boolean +} + +type SpawnFactory = () => MockProc +type RedisEval = (...args: any[]) => unknown | Promise<unknown> +type SecureFetchImpl = (...args: any[]) => unknown | Promise<unknown> + +function createBaseProc(): MockProc { + const proc = new EventEmitter() as MockProc + proc.connected = true + proc.stderr = new EventEmitter() + proc.send = () => true + proc.kill = () => { + if (!proc.connected) return true + proc.connected = false + setImmediate(() => proc.emit('exit', 0)) + return true + } + return proc +} + +function createStartupFailureProc(): MockProc { + const proc = createBaseProc() + setImmediate(() => { + proc.connected = false + proc.emit('exit', 1) + }) + return proc +} + +function createReadyProc(result: unknown): MockProc { + const proc = createBaseProc() + proc.send = (message: unknown) => { + const msg = message as { type?: string; executionId?: number } + if (msg.type === 'execute') { + setImmediate(() => { + proc.emit('message', { + type: 'result', + executionId: msg.executionId, + result: { result, stdout: '' }, + }) + }) + } + return true + } + setImmediate(() => proc.emit('message', { type: 'ready' })) + return proc +} + +function createReadyProcWithDelay(delayMs: number): MockProc { + const proc = createBaseProc() + proc.send = (message: unknown) => { + const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } } + if (msg.type === 'execute') { + setTimeout(() => { + proc.emit('message', { + type: 'result', + executionId: msg.executionId, + result: { result: msg.request?.requestId ??
'unknown', stdout: '' }, + }) + }, delayMs) + } + return true + } + setImmediate(() => proc.emit('message', { type: 'ready' })) + return proc +} + +function createReadyFetchProxyProc(fetchMessage: { url: string; optionsJson?: string }): MockProc { + const proc = createBaseProc() + let currentExecutionId = 0 + + proc.send = (message: unknown) => { + const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } } + + if (msg.type === 'execute') { + currentExecutionId = msg.executionId ?? 0 + setImmediate(() => { + proc.emit('message', { + type: 'fetch', + fetchId: 1, + requestId: msg.request?.requestId ?? 'fetch-test', + url: fetchMessage.url, + optionsJson: fetchMessage.optionsJson, + }) + }) + return true + } + + if (msg.type === 'fetchResponse') { + const fetchResponse = message as { response?: string } + setImmediate(() => { + proc.emit('message', { + type: 'result', + executionId: currentExecutionId, + result: { result: fetchResponse.response ?? '', stdout: '' }, + }) + }) + return true + } + + return true + } + + setImmediate(() => proc.emit('message', { type: 'ready' })) + return proc +} + +async function loadExecutionModule(options: { + envOverrides?: Record<string, string> + spawns: SpawnFactory[] + redisEvalImpl?: RedisEval + secureFetchImpl?: SecureFetchImpl +}) { + vi.resetModules() + + const spawnQueue = [...options.spawns] + const spawnMock = vi.fn(() => { + const next = spawnQueue.shift() + if (!next) { + throw new Error('No mock spawn factory configured') + } + return next() as any + }) + + vi.doMock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }), + })) + + const secureFetchMock = vi.fn( + options.secureFetchImpl ?? + (async () => ({ + ok: true, + status: 200, + statusText: 'OK', + headers: new Map(), + text: async () => '', + json: async () => ({}), + arrayBuffer: async () => new ArrayBuffer(0), + })) + ) + vi.doMock('@/lib/core/security/input-validation.server', () => ({ + secureFetchWithValidation: secureFetchMock, + })) + + vi.doMock('@/lib/core/config/env', () => ({ + env: { + IVM_POOL_SIZE: '1', + IVM_MAX_CONCURRENT: '100', + IVM_MAX_PER_WORKER: '100', + IVM_WORKER_IDLE_TIMEOUT_MS: '60000', + IVM_MAX_QUEUE_SIZE: '10', + IVM_MAX_ACTIVE_PER_OWNER: '100', + IVM_MAX_QUEUED_PER_OWNER: '10', + IVM_MAX_OWNER_WEIGHT: '5', + IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '100', + IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: '1000', + IVM_QUEUE_TIMEOUT_MS: '1000', + ...(options.envOverrides ?? {}), + }, + })) + + const redisEval = options.redisEvalImpl ? vi.fn(options.redisEvalImpl) : undefined + vi.doMock('@/lib/core/config/redis', () => ({ + getRedisClient: vi.fn(() => + redisEval + ?
({ + eval: redisEval, + } as any) + : null + ), + })) + + vi.doMock('node:child_process', () => ({ + execSync: vi.fn(() => Buffer.from('v23.11.0')), + spawn: spawnMock, + })) + + const mod = await import('./isolated-vm') + return { ...mod, spawnMock, secureFetchMock } +} + +describe('isolated-vm scheduler', () => { + afterEach(() => { + vi.restoreAllMocks() + vi.resetModules() + }) + + it('recovers from an initial spawn failure and drains queued work', async () => { + const { executeInIsolatedVM, spawnMock } = await loadExecutionModule({ + spawns: [createStartupFailureProc, () => createReadyProc('ok')], + }) + + const result = await executeInIsolatedVM({ + code: 'return "ok"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-1', + }) + + expect(result.error).toBeUndefined() + expect(result.result).toBe('ok') + expect(spawnMock).toHaveBeenCalledTimes(2) + }) + + it('rejects new requests when the queue is full', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_QUEUE_SIZE: '1', + IVM_QUEUE_TIMEOUT_MS: '200', + }, + spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc], + }) + + const firstPromise = executeInIsolatedVM({ + code: 'return 1', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-2', + ownerKey: 'user:a', + }) + + await new Promise((resolve) => setTimeout(resolve, 25)) + + const second = await executeInIsolatedVM({ + code: 'return 2', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-3', + ownerKey: 'user:b', + }) + + expect(second.error?.name).toBe('QueueFullError') + + const first = await firstPromise + expect(first.error?.name).toBe('QueueTimeoutError') + }) + + it('enforces per-owner queued limit', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_QUEUED_PER_OWNER: '1', + IVM_QUEUE_TIMEOUT_MS: '200', + }, + spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc], + }) + + const firstPromise = executeInIsolatedVM({ + code: 'return 1', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-4', + ownerKey: 'user:hog', + }) + + await new Promise((resolve) => setTimeout(resolve, 25)) + + const second = await executeInIsolatedVM({ + code: 'return 2', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-5', + ownerKey: 'user:hog', + }) + + expect(second.error?.name).toBe('OwnerQueueLimitError') + + const first = await firstPromise + expect(first.error?.name).toBe('QueueTimeoutError') + }) + + it('enforces distributed owner in-flight lease limit when Redis is configured', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '1', + REDIS_URL: 'redis://localhost:6379', + }, + spawns: [() => createReadyProc('ok')], + redisEvalImpl: (...args: any[]) => { + const script = String(args[0] ?? 
'') + if (script.includes('ZREMRANGEBYSCORE')) { + return 0 + } + return 1 + }, + }) + + const result = await executeInIsolatedVM({ + code: 'return "blocked"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-6', + ownerKey: 'user:distributed', + }) + + expect(result.error?.name).toBe('OwnerInFlightLimitError') + }) + + it('fails closed when Redis is configured but unavailable', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + REDIS_URL: 'redis://localhost:6379', + }, + spawns: [() => createReadyProc('ok')], + }) + + const result = await executeInIsolatedVM({ + code: 'return "blocked"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-7', + ownerKey: 'user:redis-down', + }) + + expect(result.error?.name).toBe('DistributedFairnessUnavailableError') + }) + + it('fails closed when Redis lease evaluation errors', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + REDIS_URL: 'redis://localhost:6379', + }, + spawns: [() => createReadyProc('ok')], + redisEvalImpl: (...args: any[]) => { + const script = String(args[0] ?? '') + if (script.includes('ZREMRANGEBYSCORE')) { + throw new Error('redis timeout') + } + return 1 + }, + }) + + const result = await executeInIsolatedVM({ + code: 'return "blocked"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-8', + ownerKey: 'user:redis-error', + }) + + expect(result.error?.name).toBe('DistributedFairnessUnavailableError') + }) + + it('applies weighted owner scheduling when draining queued executions', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_PER_WORKER: '1', + }, + spawns: [() => createReadyProcWithDelay(10)], + }) + + const completionOrder: string[] = [] + const pushCompletion = (label: string) => (res: { result: unknown }) => { + completionOrder.push(String(res.result ?? 
label)) + return res + } + + const p1 = executeInIsolatedVM({ + code: 'return 1', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'a-1', + ownerKey: 'user:a', + ownerWeight: 2, + }).then(pushCompletion('a-1')) + + const p2 = executeInIsolatedVM({ + code: 'return 2', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'a-2', + ownerKey: 'user:a', + ownerWeight: 2, + }).then(pushCompletion('a-2')) + + const p3 = executeInIsolatedVM({ + code: 'return 3', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'b-1', + ownerKey: 'user:b', + ownerWeight: 1, + }).then(pushCompletion('b-1')) + + const p4 = executeInIsolatedVM({ + code: 'return 4', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'b-2', + ownerKey: 'user:b', + ownerWeight: 1, + }).then(pushCompletion('b-2')) + + const p5 = executeInIsolatedVM({ + code: 'return 5', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'a-3', + ownerKey: 'user:a', + ownerWeight: 2, + }).then(pushCompletion('a-3')) + + await Promise.all([p1, p2, p3, p4, p5]) + + expect(completionOrder.slice(0, 3)).toEqual(['a-1', 'a-2', 'a-3']) + expect(completionOrder).toEqual(['a-1', 'a-2', 'a-3', 'b-1', 'b-2']) + }) + + it('rejects oversized fetch options payloads before outbound call', async () => { + const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_FETCH_OPTIONS_JSON_CHARS: '50', + }, + spawns: [ + () => + createReadyFetchProxyProc({ + url: 'https://example.com', + optionsJson: 'x'.repeat(100), + }), + ], + }) + + const result = await executeInIsolatedVM({ + code: 'return "fetch-options"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-fetch-options', + }) + + const payload = JSON.parse(String(result.result)) + expect(payload.error).toContain('Fetch options exceed maximum payload size') + expect(secureFetchMock).not.toHaveBeenCalled() + }) + + it('rejects overly long fetch URLs before outbound call', async () => { + const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_FETCH_URL_LENGTH: '30', + }, + spawns: [ + () => + createReadyFetchProxyProc({ + url: 'https://example.com/path/to/a/very/long/resource', + }), + ], + }) + + const result = await executeInIsolatedVM({ + code: 'return "fetch-url"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-fetch-url', + }) + + const payload = JSON.parse(String(result.result)) + expect(payload.error).toContain('fetch URL exceeds maximum length') + expect(secureFetchMock).not.toHaveBeenCalled() + }) +})
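The process mocks above stand in for real worker processes by speaking the host↔worker IPC protocol directly. Roughly, as reconstructed from the mocks and the worker changes in this diff (an illustrative subset, not the full message set):

```ts
// Messages the worker emits to the host:
type WorkerMessage =
  | { type: 'ready' }
  | { type: 'result'; executionId: number; result: { result: unknown; stdout: string } }
  | { type: 'fetch'; fetchId: number; requestId: string; url: string; optionsJson?: string }

// Messages the host sends to the worker:
type HostMessage =
  | { type: 'execute'; executionId: number; request: { requestId: string } }
  | { type: 'fetchResponse'; fetchId: number; response: string }
```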
diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index f3f0ed8cc..1ac2ef1ee 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -4,7 +4,12 @@ import path from 'node:path' import { fileURLToPath } from 'node:url' import { createLogger } from '@sim/logger' import { env } from '@/lib/core/config/env' -import { validateProxyUrl } from '@/lib/core/security/input-validation' +import { getRedisClient } from '@/lib/core/config/redis' +import { + type SecureFetchOptions, + secureFetchWithValidation, +} from '@/lib/core/security/input-validation.server' +import { sanitizeUrlForLog } from '@/lib/core/utils/logging' const logger = createLogger('IsolatedVMExecution') @@ -28,6 +33,8 @@ export interface IsolatedVMExecutionRequest { contextVariables: Record<string, unknown> timeoutMs: number requestId: string + ownerKey?: string + ownerWeight?: number } export interface IsolatedVMExecutionResult { @@ -50,10 +57,28 @@ const MAX_CONCURRENT = Number.parseInt(env.IVM_MAX_CONCURRENT) || 10000 const MAX_PER_WORKER = Number.parseInt(env.IVM_MAX_PER_WORKER) || 2500 const WORKER_IDLE_TIMEOUT_MS = Number.parseInt(env.IVM_WORKER_IDLE_TIMEOUT_MS) || 60000 const QUEUE_TIMEOUT_MS = Number.parseInt(env.IVM_QUEUE_TIMEOUT_MS) || 300000 +const MAX_QUEUE_SIZE = Number.parseInt(env.IVM_MAX_QUEUE_SIZE) || 10000 +const MAX_FETCH_RESPONSE_BYTES = + Number.parseInt(env.IVM_MAX_FETCH_RESPONSE_BYTES) || 8 * 1024 * 1024 +const MAX_FETCH_RESPONSE_CHARS = Number.parseInt(env.IVM_MAX_FETCH_RESPONSE_CHARS) || 4_000_000 +const MAX_FETCH_URL_LENGTH = Number.parseInt(env.IVM_MAX_FETCH_URL_LENGTH) || 8192 +const MAX_FETCH_OPTIONS_JSON_CHARS = + Number.parseInt(env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS) || 256 * 1024 +const MAX_ACTIVE_PER_OWNER = Number.parseInt(env.IVM_MAX_ACTIVE_PER_OWNER) || 200 +const MAX_QUEUED_PER_OWNER = Number.parseInt(env.IVM_MAX_QUEUED_PER_OWNER) || 2000 +const MAX_OWNER_WEIGHT = Number.parseInt(env.IVM_MAX_OWNER_WEIGHT) || 5 +const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER = + Number.parseInt(env.IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER) || + MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER +const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000 +const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner' +const QUEUE_RETRY_DELAY_MS = 1000 +const DISTRIBUTED_LEASE_GRACE_MS = 30000 interface PendingExecution { resolve: (result: IsolatedVMExecutionResult) => void timeout: ReturnType<typeof setTimeout> + ownerKey: string } interface WorkerInfo { @@ -67,47 +92,419 @@ } interface QueuedExecution { + id: number + ownerKey: string req: IsolatedVMExecutionRequest resolve: (result: IsolatedVMExecutionResult) => void queueTimeout: ReturnType<typeof setTimeout> } +interface QueueNode { + ownerKey: string + value: QueuedExecution + prev: QueueNode | null + next: QueueNode | null +} + +interface OwnerState { + ownerKey: string + weight: number + activeExecutions: number + queueHead: QueueNode | null + queueTail: QueueNode | null + queueLength: number + burstRemaining: number +} + const workers: Map<number, WorkerInfo> = new Map() -const executionQueue: QueuedExecution[] = [] +const ownerStates: Map<string, OwnerState> = new Map() +const queuedOwnerRing: string[] = [] +let queuedOwnerCursor = 0 +let queueSize = 0 +const queueNodes: Map<number, QueueNode> = new Map() let totalActiveExecutions = 0 let executionIdCounter = 0 +let queueIdCounter = 0 let nextWorkerId = 0 let spawnInProgress = 0 +let queueDrainRetryTimeout: ReturnType<typeof setTimeout> | null = null -async function secureFetch(requestId: string, url: string, options?: RequestInit): Promise<string> { - const validation = validateProxyUrl(url) - if (!validation.isValid) { - logger.warn(`[${requestId}] Blocked fetch request due to SSRF validation`, { - url: url.substring(0, 100), - error: validation.error, +type IsolatedFetchOptions = RequestInit & { + timeout?: number + maxRedirects?: number +} +
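Two independent caps now guard sandbox fetches: `MAX_FETCH_RESPONSE_BYTES` is enforced while streaming (via the new `maxResponseBytes` option in `secureFetchWithPinnedIP`, which destroys the response and rejects), while `MAX_FETCH_RESPONSE_CHARS` is applied after decoding by `truncateString`, which cuts the body and flags it. A self-contained demo of the character-level behavior:

```ts
// Mirrors truncateString below (a cap of 10 chars, chosen for illustration):
function capChars(value: string, maxChars: number) {
  if (value.length <= maxChars) return { value, truncated: false }
  return {
    value: `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`,
    truncated: true,
  }
}

console.log(capChars('a'.repeat(12), 10))
// -> { value: 'aaaaaaaaaa... [truncated 2 chars]', truncated: true }
```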
+function truncateString(value: string, maxChars: number): { value: string; truncated: boolean } { + if (value.length <= maxChars) { + return { value, truncated: false } + } + return { + value: `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`, + truncated: true, + } +} + +function normalizeFetchOptions(options?: IsolatedFetchOptions): SecureFetchOptions { + if (!options) return {} + + const normalized: SecureFetchOptions = { + maxResponseBytes: MAX_FETCH_RESPONSE_BYTES, + } + + if (typeof options.method === 'string' && options.method.length > 0) { + normalized.method = options.method + } + + if ( + typeof options.timeout === 'number' && + Number.isFinite(options.timeout) && + options.timeout > 0 + ) { + normalized.timeout = Math.floor(options.timeout) + } + + if ( + typeof options.maxRedirects === 'number' && + Number.isFinite(options.maxRedirects) && + options.maxRedirects >= 0 + ) { + normalized.maxRedirects = Math.floor(options.maxRedirects) + } + + if (options.headers) { + const headers: Record<string, string> = {} + if (options.headers instanceof Headers) { + options.headers.forEach((value, key) => { + headers[key] = value + }) + } else if (Array.isArray(options.headers)) { + for (const [key, value] of options.headers) { + headers[String(key)] = String(value) + } + } else { + for (const [key, value] of Object.entries(options.headers)) { + headers[key] = String(value) + } + } + normalized.headers = headers + } + + if ( + typeof options.body === 'string' || + options.body instanceof Buffer || + options.body instanceof Uint8Array + ) { + normalized.body = options.body + } else if (options.body !== undefined && options.body !== null) { + normalized.body = String(options.body) + } + + return normalized +} + +async function secureFetch( + requestId: string, + url: string, + options?: IsolatedFetchOptions +): Promise<string> { + if (url.length > MAX_FETCH_URL_LENGTH) { + return JSON.stringify({ + error: `Security Error: fetch URL exceeds maximum length (${MAX_FETCH_URL_LENGTH})`, }) - return JSON.stringify({ error: `Security Error: ${validation.error}` }) } try { - const response = await fetch(url, options) - const body = await response.text() + const response = await secureFetchWithValidation( + url, + normalizeFetchOptions(options), + 'fetchUrl' + ) + const bodyResult = truncateString(await response.text(), MAX_FETCH_RESPONSE_CHARS) const headers: Record<string, string> = {} - response.headers.forEach((value, key) => { + for (const [key, value] of response.headers) { headers[key] = value - }) + } return JSON.stringify({ ok: response.ok, status: response.status, statusText: response.statusText, - body, + body: bodyResult.value, + bodyTruncated: bodyResult.truncated, headers, }) } catch (error: unknown) { + logger.warn(`[${requestId}] Isolated fetch failed`, { + url: sanitizeUrlForLog(url), + error: error instanceof Error ? error.message : String(error), + }) return JSON.stringify({ error: error instanceof Error ? error.message : 'Unknown fetch error' }) } } +function normalizeOwnerKey(ownerKey?: string): string { + if (!ownerKey) return 'anonymous' + const normalized = ownerKey.trim() + return normalized || 'anonymous' +} + +function normalizeOwnerWeight(ownerWeight?: number): number { + if (!Number.isFinite(ownerWeight) || ownerWeight === undefined) return 1 + return Math.max(1, Math.min(MAX_OWNER_WEIGHT, Math.floor(ownerWeight))) +} + +function ownerRedisKey(ownerKey: string): string { + return `${DISTRIBUTED_KEY_PREFIX}:${ownerKey}` +} + +type LeaseAcquireResult = 'acquired' | 'limit_exceeded' | 'unavailable' + +async function tryAcquireDistributedLease( + ownerKey: string, + leaseId: string, + timeoutMs: number +): Promise<LeaseAcquireResult> { + // Redis not configured: explicit local-mode fallback is allowed.
+ if (!env.REDIS_URL) return 'acquired' + + const redis = getRedisClient() + if (!redis) { + logger.error('Redis is configured but unavailable for distributed lease acquisition', { + ownerKey, + }) + return 'unavailable' + } + + const now = Date.now() + const leaseTtlMs = Math.max( + timeoutMs + QUEUE_TIMEOUT_MS + DISTRIBUTED_LEASE_GRACE_MS, + DISTRIBUTED_LEASE_MIN_TTL_MS + ) + const expiresAt = now + leaseTtlMs + const key = ownerRedisKey(ownerKey) + + const script = ` + redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', ARGV[1]) + local current = redis.call('ZCARD', KEYS[1]) + if current >= tonumber(ARGV[2]) then + return 0 + end + redis.call('ZADD', KEYS[1], ARGV[3], ARGV[4]) + redis.call('PEXPIRE', KEYS[1], ARGV[5]) + return 1 + ` + + try { + const result = await redis.eval( + script, + 1, + key, + now.toString(), + DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(), + expiresAt.toString(), + leaseId, + leaseTtlMs.toString() + ) + return Number(result) === 1 ? 'acquired' : 'limit_exceeded' + } catch (error) { + logger.error('Failed to acquire distributed owner lease', { ownerKey, error }) + return 'unavailable' + } +} + +async function releaseDistributedLease(ownerKey: string, leaseId: string): Promise<void> { + const redis = getRedisClient() + if (!redis) return + + const key = ownerRedisKey(ownerKey) + const script = ` + redis.call('ZREM', KEYS[1], ARGV[1]) + if redis.call('ZCARD', KEYS[1]) == 0 then + redis.call('DEL', KEYS[1]) + end + return 1 + ` + + try { + await redis.eval(script, 1, key, leaseId) + } catch (error) { + logger.error('Failed to release distributed owner lease', { ownerKey, error }) + } +}
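The two Lua scripts above model each owner's in-flight executions as a Redis sorted set of lease IDs scored by absolute expiry: acquisition prunes expired members, rejects if the live count is at the cap, otherwise adds the lease; release removes the member and deletes the key once empty. A worked acquire for an owner already at its cap (key and numbers illustrative):

```ts
// ZREMRANGEBYSCORE ivm:fair:v1:owner:user:42 -inf <now>  -- drop expired leases
// ZCARD            ivm:fair:v1:owner:user:42             -- live leases, say 2
// 2 >= cap(2) -> script returns 0, which the host maps to 'limit_exceeded'.
//
// On success instead:
// ZADD    ivm:fair:v1:owner:user:42 <expiresAt> <leaseId>
// PEXPIRE ivm:fair:v1:owner:user:42 <leaseTtlMs>  -- idle owner keys self-expire
```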
+ +function queueLength(): number { + return queueSize +} + +function maybeClearDrainRetry() { + if (queueSize === 0 && queueDrainRetryTimeout) { + clearTimeout(queueDrainRetryTimeout) + queueDrainRetryTimeout = null + } +} + +function getOrCreateOwnerState(ownerKey: string, ownerWeight: number): OwnerState { + const existing = ownerStates.get(ownerKey) + if (existing) { + existing.weight = Math.max(existing.weight, ownerWeight) + return existing + } + + const ownerState: OwnerState = { + ownerKey, + weight: ownerWeight, + activeExecutions: 0, + queueHead: null, + queueTail: null, + queueLength: 0, + burstRemaining: 0, + } + ownerStates.set(ownerKey, ownerState) + return ownerState +} + +function addOwnerToRing(ownerKey: string) { + if (queuedOwnerRing.includes(ownerKey)) return + queuedOwnerRing.push(ownerKey) +} + +function removeOwnerFromRing(ownerKey: string) { + const idx = queuedOwnerRing.indexOf(ownerKey) + if (idx === -1) return + queuedOwnerRing.splice(idx, 1) + if (queuedOwnerRing.length === 0) { + queuedOwnerCursor = 0 + return + } + if (idx < queuedOwnerCursor) { + queuedOwnerCursor-- + } else if (queuedOwnerCursor >= queuedOwnerRing.length) { + queuedOwnerCursor = 0 + } +} + +function maybeCleanupOwner(ownerKey: string) { + const owner = ownerStates.get(ownerKey) + if (!owner) return + if (owner.queueLength === 0) { + removeOwnerFromRing(ownerKey) + } + if (owner.queueLength === 0 && owner.activeExecutions === 0) { + ownerStates.delete(ownerKey) + } +} + +function removeQueueNode(node: QueueNode): QueuedExecution { + const owner = ownerStates.get(node.ownerKey) + if (!owner) { + queueNodes.delete(node.value.id) + queueSize = Math.max(0, queueSize - 1) + maybeClearDrainRetry() + return node.value + } + + const { prev, next, value } = node + if (prev) prev.next = next + else owner.queueHead = next + if (next) next.prev = prev + else owner.queueTail = prev + + node.prev = null + node.next = null + + queueNodes.delete(value.id) + owner.queueLength-- + queueSize-- + maybeCleanupOwner(owner.ownerKey) + maybeClearDrainRetry() + return value +} + +function shiftQueuedExecutionForOwner(owner: OwnerState): QueuedExecution | null { + if (!owner.queueHead) return null + return removeQueueNode(owner.queueHead) +} + +function removeQueuedExecutionById(queueId: number): QueuedExecution | null { + const node = queueNodes.get(queueId) + if (!node) return null + return removeQueueNode(node) +} + +function pushQueuedExecution(owner: OwnerState, queued: QueuedExecution) { + const node: QueueNode = { + ownerKey: owner.ownerKey, + value: queued, + prev: owner.queueTail, + next: null, + } + if (owner.queueTail) { + owner.queueTail.next = node + } else { + owner.queueHead = node + } + owner.queueTail = node + owner.queueLength++ + owner.burstRemaining = 0 + addOwnerToRing(owner.ownerKey) + queueNodes.set(queued.id, node) + queueSize++ +} + +function selectOwnerForDispatch(): OwnerState | null { + if (queuedOwnerRing.length === 0) return null + + let visited = 0 + while (queuedOwnerRing.length > 0 && visited < queuedOwnerRing.length) { + if (queuedOwnerCursor >= queuedOwnerRing.length) { + queuedOwnerCursor = 0 + } + const ownerKey = queuedOwnerRing[queuedOwnerCursor] + if (!ownerKey) return null + + const owner = ownerStates.get(ownerKey) + if (!owner) { + removeOwnerFromRing(ownerKey) + continue + } + + if (owner.queueLength === 0) { + owner.burstRemaining = 0 + removeOwnerFromRing(ownerKey) + visited++ + continue + } + + if (owner.activeExecutions >= MAX_ACTIVE_PER_OWNER) { + owner.burstRemaining = 0 + queuedOwnerCursor = (queuedOwnerCursor + 1) % queuedOwnerRing.length + visited++ + continue + } + + if (owner.burstRemaining <= 0) { + owner.burstRemaining = owner.weight + } + + owner.burstRemaining-- + if (owner.burstRemaining <= 0) { + queuedOwnerCursor = (queuedOwnerCursor + 1) % queuedOwnerRing.length + } + + return owner + } + + return null +} + +function scheduleDrainRetry() { + if (queueDrainRetryTimeout || queueSize === 0) return + queueDrainRetryTimeout = setTimeout(() => { + queueDrainRetryTimeout = null + if (queueSize === 0) return + drainQueue() + }, QUEUE_RETRY_DELAY_MS) +} + function handleWorkerMessage(workerId: number, message: unknown) { if (typeof message !== 'object' || message === null) return const msg = message as Record<string, unknown>
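`selectOwnerForDispatch` above is a deficit-style weighted round-robin: when the cursor reaches an owner whose burst is spent, the burst refills to the owner's weight, and the cursor only advances once the burst runs out. A worked trace with a single worker slot, matching the weighted-scheduling test in `isolated-vm.test.ts`:

```ts
// Queued after a-1 dispatched directly: a-2, a-3 (owner a, weight 2); b-1, b-2 (owner b, weight 1).
//
// slot 1: cursor at a, burst 0 -> refill 2; dispatch a-2 (burst 1, cursor stays)
// slot 2: cursor at a, burst 1;             dispatch a-3 (burst 0, cursor advances)
// slot 3: cursor at b, burst 0 -> refill 1; dispatch b-1 (burst 0, cursor advances)
// slot 4: cursor wraps to a: queue empty, a leaves the ring; dispatch b-2
//
// With equal weights the cursor would advance after every dispatch,
// interleaving the two owners instead of granting a two-execution bursts.
```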
@@ -121,6 +518,11 @@ function handleWorkerMessage(workerId: number, message: unknown) { workerInfo!.pendingExecutions.delete(execId) workerInfo!.activeExecutions-- totalActiveExecutions-- + const owner = ownerStates.get(pending.ownerKey) + if (owner) { + owner.activeExecutions = Math.max(0, owner.activeExecutions - 1) + maybeCleanupOwner(owner.ownerKey) + } pending.resolve(msg.result as IsolatedVMExecutionResult) resetWorkerIdleTimeout(workerId) drainQueue() @@ -135,7 +537,26 @@ url: string optionsJson?: string } - let options: RequestInit | undefined + if (typeof url !== 'string' || url.length === 0) { + workerInfo?.process.send({ + type: 'fetchResponse', + fetchId, + response: JSON.stringify({ error: 'Invalid fetch URL' }), + }) + return + } + if (optionsJson && optionsJson.length > MAX_FETCH_OPTIONS_JSON_CHARS) { + workerInfo?.process.send({ + type: 'fetchResponse', + fetchId, + response: JSON.stringify({ + error: `Fetch options exceed maximum payload size (${MAX_FETCH_OPTIONS_JSON_CHARS} chars)`, + }), + }) + return + } + + let options: IsolatedFetchOptions | undefined if (optionsJson) { try { options = JSON.parse(optionsJson) @@ -185,6 +606,11 @@ function cleanupWorker(workerId: number) { for (const [id, pending] of workerInfo.pendingExecutions) { clearTimeout(pending.timeout) totalActiveExecutions-- + const owner = ownerStates.get(pending.ownerKey) + if (owner) { + owner.activeExecutions = Math.max(0, owner.activeExecutions - 1) + maybeCleanupOwner(owner.ownerKey) + } pending.resolve({ result: null, stdout: '', @@ -192,6 +618,7 @@ }) workerInfo.pendingExecutions.delete(id) } + workerInfo.activeExecutions = 0 workers.delete(workerId) logger.info('Worker removed from pool', { workerId, poolSize: workers.size }) @@ -220,6 +647,16 @@ function resetWorkerIdleTimeout(workerId: number) { function spawnWorker(): Promise<WorkerInfo> { const workerId = nextWorkerId++ spawnInProgress++ + let spawnSettled = false + + const settleSpawnInProgress = () => { + if (spawnSettled) { + return false + } + spawnSettled = true + spawnInProgress-- + return true + } const workerInfo: WorkerInfo = { process: null as unknown as ChildProcess, @@ -233,7 +670,7 @@ workerInfo.readyPromise = new Promise((resolve, reject) => { if (!checkNodeAvailable()) { - spawnInProgress-- + settleSpawnInProgress() reject( new Error( 'Node.js is required for code execution but was not found. ' + @@ -247,77 +684,85 @@ const workerPath = path.join(currentDir, 'isolated-vm-worker.cjs') if (!fs.existsSync(workerPath)) { - spawnInProgress-- + settleSpawnInProgress() reject(new Error(`Worker file not found at ${workerPath}`)) return } - import('node:child_process').then(({ spawn }) => { - const proc = spawn('node', [workerPath], { - stdio: ['ignore', 'pipe', 'pipe', 'ipc'], - serialization: 'json', - }) - workerInfo.process = proc + import('node:child_process') + .then(({ spawn }) => { + const proc = spawn('node', [workerPath], { + stdio: ['ignore', 'pipe', 'pipe', 'ipc'], + serialization: 'json', + }) + workerInfo.process = proc - proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message)) + proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message)) - let stderrData = '' - proc.stderr?.on('data', (data: Buffer) => { - stderrData += data.toString() - }) + let stderrData = '' + proc.stderr?.on('data', (data: Buffer) => { + stderrData += data.toString() + }) - const startTimeout = setTimeout(() => { - proc.kill() - spawnInProgress-- - workers.delete(workerId) - reject(new Error('Worker failed to start within timeout')) - }, 10000) + const startTimeout = setTimeout(() => { + proc.kill() + workers.delete(workerId) + if (!settleSpawnInProgress()) return + reject(new Error('Worker failed to start within timeout')) + }, 10000) - const readyHandler = (message: unknown) => { - if ( - typeof message === 'object' && - message !== null && - (message as { type?: string }).type === 'ready' - ) { - workerInfo.ready = true - spawnInProgress-- - clearTimeout(startTimeout) - proc.off('message', readyHandler) - workers.set(workerId, workerInfo) - resetWorkerIdleTimeout(workerId) - logger.info('Worker spawned and ready', { workerId, poolSize: workers.size }) - resolve() + const readyHandler = (message: unknown) => { + if ( + typeof message === 'object' && + message !== null && + (message as { type?: string }).type === 'ready' + ) { + if (!settleSpawnInProgress()) { + proc.off('message', readyHandler) + return + } + workerInfo.ready = true + clearTimeout(startTimeout) + proc.off('message',
readyHandler) + workers.set(workerId, workerInfo) + resetWorkerIdleTimeout(workerId) + logger.info('Worker spawned and ready', { workerId, poolSize: workers.size }) + resolve() + } } - } - proc.on('message', readyHandler) + proc.on('message', readyHandler) - proc.on('exit', () => { - const wasStartupFailure = !workerInfo.ready + proc.on('exit', () => { + const wasStartupFailure = !workerInfo.ready - if (wasStartupFailure) { - spawnInProgress-- - clearTimeout(startTimeout) + if (wasStartupFailure) { + clearTimeout(startTimeout) + if (!settleSpawnInProgress()) return - let errorMessage = 'Worker process exited unexpectedly' - if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) { - errorMessage = - 'Code execution requires the isolated-vm native module which failed to load. ' + - 'This usually means the module needs to be rebuilt for your Node.js version. ' + - 'Please run: cd node_modules/isolated-vm && npm rebuild' - logger.error('isolated-vm module failed to load', { stderr: stderrData, workerId }) - } else if (stderrData) { - errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}` - logger.error('Worker process failed', { stderr: stderrData, workerId }) + let errorMessage = 'Worker process exited unexpectedly' + if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) { + errorMessage = + 'Code execution requires the isolated-vm native module which failed to load. ' + + 'This usually means the module needs to be rebuilt for your Node.js version. ' + + 'Please run: cd node_modules/isolated-vm && npm rebuild' + logger.error('isolated-vm module failed to load', { stderr: stderrData, workerId }) + } else if (stderrData) { + errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}` + logger.error('Worker process failed', { stderr: stderrData, workerId }) + } + + reject(new Error(errorMessage)) + return } - reject(new Error(errorMessage)) - return - } - - cleanupWorker(workerId) - drainQueue() + cleanupWorker(workerId) + drainQueue() + }) + }) + .catch((error) => { + if (!settleSpawnInProgress()) return + reject(error instanceof Error ? 
error : new Error('Failed to load child_process module')) }) - }) }) return workerInfo.readyPromise.then(() => workerInfo) @@ -363,6 +808,7 @@ async function acquireWorker(): Promise { function dispatchToWorker( workerInfo: WorkerInfo, + ownerState: OwnerState, req: IsolatedVMExecutionRequest, resolve: (result: IsolatedVMExecutionResult) => void ) { @@ -377,6 +823,8 @@ function dispatchToWorker( workerInfo.pendingExecutions.delete(execId) workerInfo.activeExecutions-- totalActiveExecutions-- + ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1) + maybeCleanupOwner(ownerState.ownerKey) resolve({ result: null, stdout: '', @@ -386,9 +834,10 @@ function dispatchToWorker( drainQueue() }, req.timeoutMs + 1000) - workerInfo.pendingExecutions.set(execId, { resolve, timeout }) + workerInfo.pendingExecutions.set(execId, { resolve, timeout, ownerKey: ownerState.ownerKey }) workerInfo.activeExecutions++ totalActiveExecutions++ + ownerState.activeExecutions++ try { workerInfo.process.send({ type: 'execute', executionId: execId, request: req }) @@ -397,6 +846,8 @@ function dispatchToWorker( workerInfo.pendingExecutions.delete(execId) workerInfo.activeExecutions-- totalActiveExecutions-- + ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1) + maybeCleanupOwner(ownerState.ownerKey) resolve({ result: null, stdout: '', @@ -408,30 +859,62 @@ function dispatchToWorker( } function enqueueExecution( + ownerState: OwnerState, req: IsolatedVMExecutionRequest, resolve: (result: IsolatedVMExecutionResult) => void ) { + if (queueLength() >= MAX_QUEUE_SIZE) { + resolve({ + result: null, + stdout: '', + error: { + message: `Execution queue is full (${MAX_QUEUE_SIZE}). Please retry later.`, + name: 'QueueFullError', + }, + }) + return + } + if (ownerState.queueLength >= MAX_QUEUED_PER_OWNER) { + resolve({ + result: null, + stdout: '', + error: { + message: `Owner queue limit reached (${MAX_QUEUED_PER_OWNER}). Please retry later.`, + name: 'OwnerQueueLimitError', + }, + }) + return + } + + const queueId = ++queueIdCounter const queueTimeout = setTimeout(() => { - const idx = executionQueue.findIndex((q) => q.resolve === resolve) - if (idx !== -1) { - executionQueue.splice(idx, 1) - resolve({ - result: null, - stdout: '', - error: { - message: `Execution queued too long (${QUEUE_TIMEOUT_MS}ms). All workers are busy.`, - name: 'QueueTimeoutError', - }, - }) - } + const queued = removeQueuedExecutionById(queueId) + if (!queued) return + resolve({ + result: null, + stdout: '', + error: { + message: `Execution queued too long (${QUEUE_TIMEOUT_MS}ms). All workers are busy.`, + name: 'QueueTimeoutError', + }, + }) }, QUEUE_TIMEOUT_MS) - executionQueue.push({ req, resolve, queueTimeout }) + pushQueuedExecution(ownerState, { + id: queueId, + ownerKey: ownerState.ownerKey, + req, + resolve, + queueTimeout, + }) logger.info('Execution queued', { - queueLength: executionQueue.length, + queueLength: queueLength(), + ownerKey: ownerState.ownerKey, + ownerQueueLength: ownerState.queueLength, totalActive: totalActiveExecutions, poolSize: workers.size, }) + drainQueue() } /** @@ -439,21 +922,35 @@ function enqueueExecution( * executions to available workers. 
*/ function drainQueue() { - while (executionQueue.length > 0 && totalActiveExecutions < MAX_CONCURRENT) { + while (queueLength() > 0 && totalActiveExecutions < MAX_CONCURRENT) { const worker = selectWorker() if (!worker) { const currentPoolSize = workers.size + spawnInProgress if (currentPoolSize < POOL_SIZE) { spawnWorker() .then(() => drainQueue()) - .catch((err) => logger.error('Failed to spawn worker during drain', { err })) + .catch((err) => { + logger.error('Failed to spawn worker during drain', { err }) + scheduleDrainRetry() + }) } break } - const queued = executionQueue.shift()! + const owner = selectOwnerForDispatch() + if (!owner) { + scheduleDrainRetry() + break + } + + const queued = shiftQueuedExecutionForOwner(owner) + if (!queued) { + owner.burstRemaining = 0 + maybeCleanupOwner(owner.ownerKey) + continue + } clearTimeout(queued.queueTimeout) - dispatchToWorker(worker, queued.req, queued.resolve) + dispatchToWorker(worker, owner, queued.req, queued.resolve) } } @@ -463,14 +960,80 @@ export async function executeInIsolatedVM( req: IsolatedVMExecutionRequest ): Promise<IsolatedVMExecutionResult> { - if (totalActiveExecutions >= MAX_CONCURRENT) { - return new Promise((resolve) => enqueueExecution(req, resolve)) + const ownerKey = normalizeOwnerKey(req.ownerKey) + const ownerWeight = normalizeOwnerWeight(req.ownerWeight) + const ownerState = getOrCreateOwnerState(ownerKey, ownerWeight) + + const distributedLeaseId = `${req.requestId}:${Date.now()}:${Math.random().toString(36).slice(2, 10)}` + const leaseAcquireResult = await tryAcquireDistributedLease( + ownerKey, + distributedLeaseId, + req.timeoutMs + ) + if (leaseAcquireResult === 'limit_exceeded') { + return { + result: null, + stdout: '', + error: { + message: `Owner in-flight limit reached (${DISTRIBUTED_MAX_INFLIGHT_PER_OWNER}). Please retry later.`, + name: 'OwnerInFlightLimitError', + }, + } + } + if (leaseAcquireResult === 'unavailable') { + return { + result: null, + stdout: '', + error: { + message: 'Distributed fairness is temporarily unavailable.
Please retry.', + name: 'DistributedFairnessUnavailableError', + }, + } } - const workerInfo = await acquireWorker() - if (!workerInfo) { - return new Promise((resolve) => enqueueExecution(req, resolve)) + let settled = false + const releaseLease = () => { + if (settled) return + settled = true + releaseDistributedLease(ownerKey, distributedLeaseId).catch((error) => { + logger.error('Failed to release distributed lease', { ownerKey, error }) + }) } - return new Promise((resolve) => dispatchToWorker(workerInfo, req, resolve)) + return new Promise((resolve) => { + const resolveWithRelease = (result: IsolatedVMExecutionResult) => { + releaseLease() + resolve(result) + } + + if ( + totalActiveExecutions >= MAX_CONCURRENT || + ownerState.activeExecutions >= MAX_ACTIVE_PER_OWNER + ) { + enqueueExecution(ownerState, req, resolveWithRelease) + return + } + + acquireWorker() + .then((workerInfo) => { + if (!workerInfo) { + enqueueExecution(ownerState, req, resolveWithRelease) + return + } + + dispatchToWorker(workerInfo, ownerState, req, resolveWithRelease) + if (queueLength() > 0) { + drainQueue() + } + }) + .catch((error) => { + logger.error('Failed to acquire worker for execution', { error, ownerKey }) + enqueueExecution(ownerState, req, resolveWithRelease) + }) + }).finally(() => { + releaseLease() + if (ownerState.queueLength === 0 && ownerState.activeExecutions === 0) { + maybeCleanupOwner(ownerState.ownerKey) + } + }) }
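Call sites opt into fairness by tagging executions with an owner, as the function/execute route and `LoopOrchestrator` changes earlier in this diff do. A hedged sketch of a caller handling the new backpressure results (the wrapper name and return mapping are illustrative; the error names are the ones introduced above):

```ts
import { executeInIsolatedVM } from '@/lib/execution/isolated-vm'

async function runUserCode(code: string, userId: string, requestId: string) {
  const res = await executeInIsolatedVM({
    code,
    params: {},
    envVars: {},
    contextVariables: {},
    timeoutMs: 30_000,
    requestId,
    ownerKey: `user:${userId}`, // omitted -> normalized to 'anonymous'
    ownerWeight: 1, // clamped into [1, IVM_MAX_OWNER_WEIGHT]
  })

  switch (res.error?.name) {
    case 'QueueFullError': // global MAX_QUEUE_SIZE reached
    case 'OwnerQueueLimitError': // this owner's local queued cap reached
    case 'OwnerInFlightLimitError': // distributed per-owner lease cap reached
    case 'QueueTimeoutError': // waited longer than IVM_QUEUE_TIMEOUT_MS
    case 'DistributedFairnessUnavailableError': // Redis configured but unreachable
      // These all resolve rather than throw; map them to 429/503-style retries.
      return { retryable: true, error: res.error }
    default:
      return { retryable: false, result: res.result }
  }
}
```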
diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 9a0236fd1..3eb14813e 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -124,6 +124,7 @@ export interface PreprocessExecutionOptions { workspaceId?: string // If known, used for billing resolution loggingSession?: LoggingSession // If provided, will be used for error logging isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes) + useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys) /** @deprecated No longer used - background/async executions always use deployed state */ useDraftState?: boolean } @@ -170,6 +171,7 @@ export async function preprocessExecution( workspaceId: providedWorkspaceId, loggingSession: providedLoggingSession, isResumeContext = false, + useAuthenticatedUserAsActor = false, } = options logger.info(`[${requestId}] Starting execution preprocessing`, { @@ -257,7 +259,14 @@ let actorUserId: string | null = null try { - if (workspaceId) { + // For client-side executions and personal API keys, the authenticated + // user is the billing and permission actor — not the workspace owner. + if (useAuthenticatedUserAsActor && userId) { + actorUserId = userId + logger.info(`[${requestId}] Using authenticated user as actor: ${actorUserId}`) + } + + if (!actorUserId && workspaceId) { actorUserId = await getWorkspaceBilledAccountUserId(workspaceId) if (actorUserId) { logger.info(`[${requestId}] Using workspace billed account: ${actorUserId}`) diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 77e9b4d76..4d1a8a277 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -221,7 +221,8 @@ export async function executeTool( // If it's a custom tool, use the async version with workflowId if (isCustomTool(normalizedToolId)) { const workflowId = params._context?.workflowId - tool = await getToolAsync(normalizedToolId, workflowId) + const userId = params._context?.userId + tool = await getToolAsync(normalizedToolId, workflowId, userId) if (!tool) { logger.error(`[${requestId}] Custom tool not found: ${normalizedToolId}`) } @@ -260,26 +261,25 @@ try { const baseUrl = getBaseUrl() + const workflowId = contextParams._context?.workflowId + const userId = contextParams._context?.userId + const tokenPayload: OAuthTokenPayload = { credentialId: contextParams.credential as string, } - - // Add workflowId if it exists in params, context, or executionContext - const workflowId = - contextParams.workflowId || - contextParams._context?.workflowId || - executionContext?.workflowId if (workflowId) { tokenPayload.workflowId = workflowId } logger.info(`[${requestId}] Fetching access token from ${baseUrl}/api/auth/oauth/token`) - // Build token URL and also include workflowId in query so server auth can read it const tokenUrlObj = new URL('/api/auth/oauth/token', baseUrl) if (workflowId) { tokenUrlObj.searchParams.set('workflowId', workflowId) } + if (userId) { + tokenUrlObj.searchParams.set('userId', userId) + } // Always send Content-Type; add internal auth on server-side runs const tokenHeaders: Record<string, string> = { 'Content-Type': 'application/json' } @@ -583,6 +583,10 @@ async function executeToolRequest( if (workflowId) { fullUrlObj.searchParams.set('workflowId', workflowId) } + const userId = params._context?.userId + if (userId) { + fullUrlObj.searchParams.set('userId', userId) + } } const fullUrl = fullUrlObj.toString() @@ -929,8 +933,8 @@ async function executeMcpTool( ) } - const workspaceId = params._context?.workspaceId || executionContext?.workspaceId - const workflowId = params._context?.workflowId || executionContext?.workflowId + const workspaceId = params._context?.workspaceId + const workflowId = params._context?.workflowId if (!workspaceId) { return { diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index e5364e415..0a7b635fa 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -311,7 +311,8 @@ export function getTool(toolId: string): ToolConfig | undefined { // Get a tool by its ID asynchronously (supports server-side) export async function getToolAsync( toolId: string, - workflowId?: string + workflowId?: string, + userId?: string ): Promise<ToolConfig | undefined> { // Check for built-in tools const builtInTool = tools[toolId] @@ -319,7 +320,7 @@ // Check if it's a custom tool if (isCustomTool(toolId)) { - return fetchCustomToolFromAPI(toolId, workflowId) + return fetchCustomToolFromAPI(toolId, workflowId, userId) } return undefined @@ -366,7 +367,8 @@ function createToolConfig(customTool: any, customToolId: string): ToolConfig { // Create a tool config from a custom tool definition
by fetching from API async function fetchCustomToolFromAPI( customToolId: string, - workflowId?: string + workflowId?: string, + userId?: string ): Promise<ToolConfig | undefined> { const identifier = customToolId.replace('custom_', '') @@ -374,10 +376,12 @@ const baseUrl = getBaseUrl() const url = new URL('/api/tools/custom', baseUrl) - // Add workflowId as a query parameter if available if (workflowId) { url.searchParams.append('workflowId', workflowId) } + if (userId) { + url.searchParams.append('userId', userId) + } // For server-side calls (during workflow execution), use internal JWT token const headers: Record<string, string> = {}
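With `userId` appended alongside `workflowId`, custom tool resolution now carries enough context for the internal-JWT auth path to identify the actor without the old workflow lookup (see the `hybrid.ts` changes above). The resulting request shape, with an illustrative host and IDs:

```ts
const url = new URL('/api/tools/custom', 'https://sim.example.com')
url.searchParams.append('workflowId', 'wf_123')
url.searchParams.append('userId', 'user_456')
console.log(url.toString())
// -> https://sim.example.com/api/tools/custom?workflowId=wf_123&userId=user_456
```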