fix(userid): userId resolution for fair scheduling

Vikhyath Mondreti
2026-02-06 15:03:44 -08:00
parent 1c97e21b72
commit 2f02d2c649
18 changed files with 1293 additions and 146 deletions

View File

@@ -845,6 +845,8 @@ export async function POST(req: NextRequest) {
contextVariables,
timeoutMs: timeout,
requestId,
ownerKey: `user:${auth.userId}`,
ownerWeight: 1,
})
const executionTime = Date.now() - startTime
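
This hunk threads a per-user owner key and a unit weight into the sandbox executor so the scheduler below can apply per-owner fairness. A minimal sketch of how such a key might be derived for other auth shapes — the helper is hypothetical; only the `user:` prefix and the `anonymous` fallback appear in this commit:

```ts
// Hypothetical helper: derive a stable, namespaced owner key for fair
// scheduling. `user:${id}` matches the key format used in this commit;
// the workspace variant is an illustrative extension.
function deriveOwnerKey(auth: { userId?: string; workspaceId?: string }): string {
  if (auth.userId) return `user:${auth.userId}`
  if (auth.workspaceId) return `workspace:${auth.workspaceId}`
  return 'anonymous' // the scheduler's normalizeOwnerKey uses the same fallback
}
```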

View File

@@ -325,6 +325,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId
)
// Client-side sessions and personal API keys bill/permission-check the
// authenticated user, not the workspace billed account.
const useAuthenticatedUserAsActor =
isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal')
const preprocessResult = await preprocessExecution({
workflowId,
userId,
@@ -334,6 +339,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
checkDeployment: !shouldUseDraftState,
loggingSession,
useDraftState: shouldUseDraftState,
useAuthenticatedUserAsActor,
})
if (!preprocessResult.success) {
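
The flag encodes a simple rule: client-side sessions and personal API keys act (and bill) as the authenticated user, while workspace API keys and internal JWT calls fall through to the workspace billed account. A condensed sketch of the decision, with the auth shape assumed from the fields referenced above:

```ts
// Assumed auth shape, inferred from the hunk above.
declare const isClientSession: boolean
declare const auth: {
  authType: 'session' | 'api_key' | 'internal_jwt'
  apiKeyType?: 'personal' | 'workspace'
}

// Personal credentials act as the user; workspace credentials do not.
const useAuthenticatedUserAsActor =
  isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal')
```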

View File

@@ -307,6 +307,7 @@ export class AgentBlockHandler implements BlockHandler {
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},

View File

@@ -72,6 +72,7 @@ export class ApiBlockHandler implements BlockHandler {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
executionId: ctx.executionId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},

View File

@@ -48,6 +48,7 @@ export async function evaluateConditionExpression(
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},

View File

@@ -39,6 +39,7 @@ export class FunctionBlockHandler implements BlockHandler {
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},

View File

@@ -66,6 +66,7 @@ export class GenericBlockHandler implements BlockHandler {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
executionId: ctx.executionId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},

View File

@@ -605,6 +605,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler {
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
blockData: blockDataWithPause,
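
Each of the handler hunks above adds the same field: `userId` now rides along in the `_context` object passed to tool execution. A sketch of the assumed context shape, with field names collected from the hunks — this is not the executor package's exported type:

```ts
// Assumed shape of the per-call context forwarded by every block handler.
interface ToolCallContext {
  workflowId: string
  workspaceId?: string
  executionId?: string
  userId?: string // newly threaded through in this commit
  isDeployedContext?: boolean
}
```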

View File

@@ -511,6 +511,8 @@ export class LoopOrchestrator {
contextVariables: {},
timeoutMs: LOOP_CONDITION_TIMEOUT_MS,
requestId,
ownerKey: `user:${ctx.userId}`,
ownerWeight: 1,
})
if (vmResult.error) {

View File

@@ -1,7 +1,4 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service'
import { getSession } from '@/lib/auth'
@@ -13,35 +10,33 @@ export interface AuthResult {
success: boolean
userId?: string
authType?: 'session' | 'api_key' | 'internal_jwt'
apiKeyType?: 'personal' | 'workspace'
error?: string
}
/**
* Resolves userId from a verified internal JWT token.
* Extracts workflowId/userId from URL params or POST body, then looks up userId if needed.
* Extracts userId from the JWT payload, URL search params, or POST body.
*/
async function resolveUserFromJwt(
request: NextRequest,
verificationUserId: string | null,
options: { requireWorkflowId?: boolean }
): Promise<AuthResult> {
let workflowId: string | null = null
let userId: string | null = verificationUserId
const { searchParams } = new URL(request.url)
workflowId = searchParams.get('workflowId')
if (!userId) {
const { searchParams } = new URL(request.url)
userId = searchParams.get('userId')
}
if (!workflowId && !userId && request.method === 'POST') {
if (!userId && request.method === 'POST') {
try {
const clonedRequest = request.clone()
const bodyText = await clonedRequest.text()
if (bodyText) {
const body = JSON.parse(bodyText)
workflowId = body.workflowId || body._context?.workflowId
userId = userId || body.userId || body._context?.userId
userId = body.userId || body._context?.userId || null
}
} catch {
// Ignore JSON parse errors
@@ -52,22 +47,8 @@ async function resolveUserFromJwt(
return { success: true, userId, authType: 'internal_jwt' }
}
if (workflowId) {
const [workflowData] = await db
.select({ userId: workflow.userId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowData) {
return { success: false, error: 'Workflow not found' }
}
return { success: true, userId: workflowData.userId, authType: 'internal_jwt' }
}
if (options.requireWorkflowId !== false) {
return { success: false, error: 'workflowId or userId required for internal JWT calls' }
return { success: false, error: 'userId required for internal JWT calls' }
}
return { success: true, authType: 'internal_jwt' }
@@ -222,6 +203,7 @@ export async function checkHybridAuth(
success: true,
userId: result.userId!,
authType: 'api_key',
apiKeyType: result.keyType,
}
}
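
The net effect of this hunk is that internal-JWT user resolution no longer consults the workflow table at all; it is a pure read of the JWT payload, then the `userId` query param, then the POST body. A standalone sketch of that order, using a Fetch-style request as assumed from the surrounding code:

```ts
// Sketch: resolution order after this change (payload -> query -> body).
async function resolveUserId(req: Request, jwtUserId: string | null): Promise<string | null> {
  if (jwtUserId) return jwtUserId
  const fromQuery = new URL(req.url).searchParams.get('userId')
  if (fromQuery) return fromQuery
  if (req.method === 'POST') {
    try {
      const body = await req.clone().json()
      return body.userId ?? body._context?.userId ?? null
    } catch {
      // ignore non-JSON bodies, as the original does
    }
  }
  return null
}
```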

View File

@@ -185,6 +185,17 @@ export const env = createEnv({
IVM_MAX_CONCURRENT: z.string().optional().default('10000'), // Max concurrent executions globally
IVM_MAX_PER_WORKER: z.string().optional().default('2500'), // Max concurrent executions per worker
IVM_WORKER_IDLE_TIMEOUT_MS: z.string().optional().default('60000'), // Worker idle cleanup timeout (ms)
IVM_MAX_QUEUE_SIZE: z.string().optional().default('10000'), // Max pending queued executions in memory
IVM_MAX_FETCH_RESPONSE_BYTES: z.string().optional().default('8388608'), // Max bytes read from sandbox fetch responses
IVM_MAX_FETCH_RESPONSE_CHARS: z.string().optional().default('4000000'), // Max chars returned to sandbox from fetch body
IVM_MAX_FETCH_OPTIONS_JSON_CHARS: z.string().optional().default('262144'), // Max JSON payload size for sandbox fetch options
IVM_MAX_FETCH_URL_LENGTH: z.string().optional().default('8192'), // Max URL length accepted by sandbox fetch
IVM_MAX_STDOUT_CHARS: z.string().optional().default('200000'), // Max captured stdout characters per execution
IVM_MAX_ACTIVE_PER_OWNER: z.string().optional().default('200'), // Max active executions per owner (per process)
IVM_MAX_QUEUED_PER_OWNER: z.string().optional().default('2000'), // Max queued executions per owner (per process)
IVM_MAX_OWNER_WEIGHT: z.string().optional().default('5'), // Max accepted weight for weighted owner scheduling
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: z.string().optional().default('2200'), // Max owner in-flight leases across replicas
IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: z.string().optional().default('120000'), // Min TTL for distributed in-flight leases (ms)
IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms)
// Knowledge Base Processing Configuration - Shared across all processing methods
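
All of these knobs are strings parsed with a parse-or-default pattern in the scheduler (see the isolated-vm module below). A small sketch of that pattern; the helper name is illustrative:

```ts
// Parse-or-default: Number.parseInt('') is NaN, which is falsy, so the
// fallback applies whenever the variable is unset or malformed.
function intFromEnv(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? '', 10)
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback
}

const maxQueueSize = intFromEnv(process.env.IVM_MAX_QUEUE_SIZE, 10_000)
```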

View File

@@ -103,6 +103,7 @@ export interface SecureFetchOptions {
body?: string | Buffer | Uint8Array
timeout?: number
maxRedirects?: number
maxResponseBytes?: number
}
export class SecureFetchHeaders {
@@ -165,6 +166,7 @@ export async function secureFetchWithPinnedIP(
redirectCount = 0
): Promise<SecureFetchResponse> {
const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS
const maxResponseBytes = options.maxResponseBytes
return new Promise((resolve, reject) => {
const parsed = new URL(url)
@@ -237,14 +239,32 @@ export async function secureFetchWithPinnedIP(
}
const chunks: Buffer[] = []
let totalBytes = 0
let responseTerminated = false
res.on('data', (chunk: Buffer) => chunks.push(chunk))
res.on('data', (chunk: Buffer) => {
if (responseTerminated) return
totalBytes += chunk.length
if (
typeof maxResponseBytes === 'number' &&
maxResponseBytes > 0 &&
totalBytes > maxResponseBytes
) {
responseTerminated = true
res.destroy(new Error(`Response exceeded maximum size of ${maxResponseBytes} bytes`))
return
}
chunks.push(chunk)
})
res.on('error', (error) => {
reject(error)
})
res.on('end', () => {
if (responseTerminated) return
const bodyBuffer = Buffer.concat(chunks)
const body = bodyBuffer.toString('utf-8')
const headersRecord: Record<string, string> = {}
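
With `maxResponseBytes` set, the response stream is destroyed as soon as the running byte count crosses the cap, and the failure surfaces through the `error` handler. A hedged usage sketch, assuming the `(url, options)` call shape shown above; the import path and URL are illustrative:

```ts
import { secureFetchWithPinnedIP } from './secure-fetch' // path is illustrative

async function fetchCapped(): Promise<void> {
  try {
    const response = await secureFetchWithPinnedIP('https://example.com/data', {
      method: 'GET',
      maxResponseBytes: 1024 * 1024, // 1 MiB cap
    })
    console.log(response.status)
  } catch (error) {
    // Oversized bodies land here: "Response exceeded maximum size of ... bytes"
    console.error(error)
  }
}
```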

View File

@@ -9,6 +9,21 @@ const USER_CODE_START_LINE = 4
const pendingFetches = new Map()
let fetchIdCounter = 0
const FETCH_TIMEOUT_MS = 300000 // 5 minutes
const MAX_STDOUT_CHARS = Number.parseInt(process.env.IVM_MAX_STDOUT_CHARS || '', 10) || 200000
const MAX_FETCH_OPTIONS_JSON_CHARS =
Number.parseInt(process.env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS || '', 10) || 256 * 1024
function stringifyLogValue(value) {
if (typeof value !== 'object' || value === null) {
return String(value)
}
try {
return JSON.stringify(value)
} catch {
return '[unserializable]'
}
}
/**
* Extract line and column from error stack or message
@@ -101,8 +116,32 @@ function convertToCompatibleError(errorInfo, userCode) {
async function executeCode(request) {
const { code, params, envVars, contextVariables, timeoutMs, requestId } = request
const stdoutChunks = []
let stdoutLength = 0
let stdoutTruncated = false
let isolate = null
const appendStdout = (line) => {
if (stdoutTruncated || !line) return
const remaining = MAX_STDOUT_CHARS - stdoutLength
if (remaining <= 0) {
stdoutTruncated = true
stdoutChunks.push('[stdout truncated]\n')
return
}
if (line.length <= remaining) {
stdoutChunks.push(line)
stdoutLength += line.length
return
}
stdoutChunks.push(line.slice(0, remaining))
stdoutChunks.push('\n[stdout truncated]\n')
stdoutLength = MAX_STDOUT_CHARS
stdoutTruncated = true
}
try {
isolate = new ivm.Isolate({ memoryLimit: 128 })
const context = await isolate.createContext()
@@ -111,18 +150,14 @@ async function executeCode(request) {
await jail.set('global', jail.derefInto())
const logCallback = new ivm.Callback((...args) => {
const message = args
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg)))
.join(' ')
stdoutChunks.push(`${message}\n`)
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
appendStdout(`${message}\n`)
})
await jail.set('__log', logCallback)
const errorCallback = new ivm.Callback((...args) => {
const message = args
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg)))
.join(' ')
stdoutChunks.push(`ERROR: ${message}\n`)
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
appendStdout(`ERROR: ${message}\n`)
})
await jail.set('__error', errorCallback)
@@ -178,6 +213,9 @@ async function executeCode(request) {
} catch {
throw new Error('fetch options must be JSON-serializable');
}
if (optionsJson.length > ${MAX_FETCH_OPTIONS_JSON_CHARS}) {
throw new Error('fetch options exceed maximum payload size');
}
}
const resultJson = await __fetchRef.apply(undefined, [url, optionsJson], { result: { promise: true } });
let result;
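
The stdout cap is cooperative rather than exact: each appended line is sliced to the remaining budget, and a single truncation marker is emitted once. A standalone behavior sketch with a deliberately tiny limit:

```ts
// Behavior sketch of appendStdout with a 10-char cap for illustration.
const chunks: string[] = []
let length = 0
let truncated = false
const MAX = 10

function appendStdout(line: string) {
  if (truncated || !line) return
  const remaining = MAX - length
  if (remaining <= 0) {
    truncated = true
    chunks.push('[stdout truncated]\n')
    return
  }
  chunks.push(line.slice(0, remaining))
  length += Math.min(line.length, remaining)
  if (line.length > remaining) {
    chunks.push('\n[stdout truncated]\n')
    truncated = true
  }
}

appendStdout('hello ')  // kept in full (6 chars)
appendStdout('world!!') // 'worl' kept, then the single truncation marker
console.log(chunks.join('')) // "hello worl\n[stdout truncated]\n"
```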

View File

@@ -0,0 +1,500 @@
import { EventEmitter } from 'node:events'
import { afterEach, describe, expect, it, vi } from 'vitest'
type MockProc = EventEmitter & {
connected: boolean
stderr: EventEmitter
send: (message: unknown) => boolean
kill: () => boolean
}
type SpawnFactory = () => MockProc
type RedisEval = (...args: any[]) => unknown | Promise<unknown>
type SecureFetchImpl = (...args: any[]) => unknown | Promise<unknown>
function createBaseProc(): MockProc {
const proc = new EventEmitter() as MockProc
proc.connected = true
proc.stderr = new EventEmitter()
proc.send = () => true
proc.kill = () => {
if (!proc.connected) return true
proc.connected = false
setImmediate(() => proc.emit('exit', 0))
return true
}
return proc
}
function createStartupFailureProc(): MockProc {
const proc = createBaseProc()
setImmediate(() => {
proc.connected = false
proc.emit('exit', 1)
})
return proc
}
function createReadyProc(result: unknown): MockProc {
const proc = createBaseProc()
proc.send = (message: unknown) => {
const msg = message as { type?: string; executionId?: number }
if (msg.type === 'execute') {
setImmediate(() => {
proc.emit('message', {
type: 'result',
executionId: msg.executionId,
result: { result, stdout: '' },
})
})
}
return true
}
setImmediate(() => proc.emit('message', { type: 'ready' }))
return proc
}
function createReadyProcWithDelay(delayMs: number): MockProc {
const proc = createBaseProc()
proc.send = (message: unknown) => {
const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } }
if (msg.type === 'execute') {
setTimeout(() => {
proc.emit('message', {
type: 'result',
executionId: msg.executionId,
result: { result: msg.request?.requestId ?? 'unknown', stdout: '' },
})
}, delayMs)
}
return true
}
setImmediate(() => proc.emit('message', { type: 'ready' }))
return proc
}
function createReadyFetchProxyProc(fetchMessage: { url: string; optionsJson?: string }): MockProc {
const proc = createBaseProc()
let currentExecutionId = 0
proc.send = (message: unknown) => {
const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } }
if (msg.type === 'execute') {
currentExecutionId = msg.executionId ?? 0
setImmediate(() => {
proc.emit('message', {
type: 'fetch',
fetchId: 1,
requestId: msg.request?.requestId ?? 'fetch-test',
url: fetchMessage.url,
optionsJson: fetchMessage.optionsJson,
})
})
return true
}
if (msg.type === 'fetchResponse') {
const fetchResponse = message as { response?: string }
setImmediate(() => {
proc.emit('message', {
type: 'result',
executionId: currentExecutionId,
result: { result: fetchResponse.response ?? '', stdout: '' },
})
})
return true
}
return true
}
setImmediate(() => proc.emit('message', { type: 'ready' }))
return proc
}
async function loadExecutionModule(options: {
envOverrides?: Record<string, string>
spawns: SpawnFactory[]
redisEvalImpl?: RedisEval
secureFetchImpl?: SecureFetchImpl
}) {
vi.resetModules()
const spawnQueue = [...options.spawns]
const spawnMock = vi.fn(() => {
const next = spawnQueue.shift()
if (!next) {
throw new Error('No mock spawn factory configured')
}
return next() as any
})
vi.doMock('@sim/logger', () => ({
createLogger: () => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
}),
}))
const secureFetchMock = vi.fn(
options.secureFetchImpl ??
(async () => ({
ok: true,
status: 200,
statusText: 'OK',
headers: new Map<string, string>(),
text: async () => '',
json: async () => ({}),
arrayBuffer: async () => new ArrayBuffer(0),
}))
)
vi.doMock('@/lib/core/security/input-validation.server', () => ({
secureFetchWithValidation: secureFetchMock,
}))
vi.doMock('@/lib/core/config/env', () => ({
env: {
IVM_POOL_SIZE: '1',
IVM_MAX_CONCURRENT: '100',
IVM_MAX_PER_WORKER: '100',
IVM_WORKER_IDLE_TIMEOUT_MS: '60000',
IVM_MAX_QUEUE_SIZE: '10',
IVM_MAX_ACTIVE_PER_OWNER: '100',
IVM_MAX_QUEUED_PER_OWNER: '10',
IVM_MAX_OWNER_WEIGHT: '5',
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '100',
IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: '1000',
IVM_QUEUE_TIMEOUT_MS: '1000',
...(options.envOverrides ?? {}),
},
}))
const redisEval = options.redisEvalImpl ? vi.fn(options.redisEvalImpl) : undefined
vi.doMock('@/lib/core/config/redis', () => ({
getRedisClient: vi.fn(() =>
redisEval
? ({
eval: redisEval,
} as any)
: null
),
}))
vi.doMock('node:child_process', () => ({
execSync: vi.fn(() => Buffer.from('v23.11.0')),
spawn: spawnMock,
}))
const mod = await import('./isolated-vm')
return { ...mod, spawnMock, secureFetchMock }
}
describe('isolated-vm scheduler', () => {
afterEach(() => {
vi.restoreAllMocks()
vi.resetModules()
})
it('recovers from an initial spawn failure and drains queued work', async () => {
const { executeInIsolatedVM, spawnMock } = await loadExecutionModule({
spawns: [createStartupFailureProc, () => createReadyProc('ok')],
})
const result = await executeInIsolatedVM({
code: 'return "ok"',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-1',
})
expect(result.error).toBeUndefined()
expect(result.result).toBe('ok')
expect(spawnMock).toHaveBeenCalledTimes(2)
})
it('rejects new requests when the queue is full', async () => {
const { executeInIsolatedVM } = await loadExecutionModule({
envOverrides: {
IVM_MAX_QUEUE_SIZE: '1',
IVM_QUEUE_TIMEOUT_MS: '200',
},
spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc],
})
const firstPromise = executeInIsolatedVM({
code: 'return 1',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-2',
ownerKey: 'user:a',
})
await new Promise((resolve) => setTimeout(resolve, 25))
const second = await executeInIsolatedVM({
code: 'return 2',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-3',
ownerKey: 'user:b',
})
expect(second.error?.name).toBe('QueueFullError')
const first = await firstPromise
expect(first.error?.name).toBe('QueueTimeoutError')
})
it('enforces per-owner queued limit', async () => {
const { executeInIsolatedVM } = await loadExecutionModule({
envOverrides: {
IVM_MAX_QUEUED_PER_OWNER: '1',
IVM_QUEUE_TIMEOUT_MS: '200',
},
spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc],
})
const firstPromise = executeInIsolatedVM({
code: 'return 1',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-4',
ownerKey: 'user:hog',
})
await new Promise((resolve) => setTimeout(resolve, 25))
const second = await executeInIsolatedVM({
code: 'return 2',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-5',
ownerKey: 'user:hog',
})
expect(second.error?.name).toBe('OwnerQueueLimitError')
const first = await firstPromise
expect(first.error?.name).toBe('QueueTimeoutError')
})
it('enforces distributed owner in-flight lease limit when Redis is configured', async () => {
const { executeInIsolatedVM } = await loadExecutionModule({
envOverrides: {
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '1',
REDIS_URL: 'redis://localhost:6379',
},
spawns: [() => createReadyProc('ok')],
redisEvalImpl: (...args: any[]) => {
const script = String(args[0] ?? '')
if (script.includes('ZREMRANGEBYSCORE')) {
return 0
}
return 1
},
})
const result = await executeInIsolatedVM({
code: 'return "blocked"',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-6',
ownerKey: 'user:distributed',
})
expect(result.error?.name).toBe('OwnerInFlightLimitError')
})
it('fails closed when Redis is configured but unavailable', async () => {
const { executeInIsolatedVM } = await loadExecutionModule({
envOverrides: {
REDIS_URL: 'redis://localhost:6379',
},
spawns: [() => createReadyProc('ok')],
})
const result = await executeInIsolatedVM({
code: 'return "blocked"',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-7',
ownerKey: 'user:redis-down',
})
expect(result.error?.name).toBe('DistributedFairnessUnavailableError')
})
it('fails closed when Redis lease evaluation errors', async () => {
const { executeInIsolatedVM } = await loadExecutionModule({
envOverrides: {
REDIS_URL: 'redis://localhost:6379',
},
spawns: [() => createReadyProc('ok')],
redisEvalImpl: (...args: any[]) => {
const script = String(args[0] ?? '')
if (script.includes('ZREMRANGEBYSCORE')) {
throw new Error('redis timeout')
}
return 1
},
})
const result = await executeInIsolatedVM({
code: 'return "blocked"',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-8',
ownerKey: 'user:redis-error',
})
expect(result.error?.name).toBe('DistributedFairnessUnavailableError')
})
it('applies weighted owner scheduling when draining queued executions', async () => {
const { executeInIsolatedVM } = await loadExecutionModule({
envOverrides: {
IVM_MAX_PER_WORKER: '1',
},
spawns: [() => createReadyProcWithDelay(10)],
})
const completionOrder: string[] = []
const pushCompletion = (label: string) => (res: { result: unknown }) => {
completionOrder.push(String(res.result ?? label))
return res
}
const p1 = executeInIsolatedVM({
code: 'return 1',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 500,
requestId: 'a-1',
ownerKey: 'user:a',
ownerWeight: 2,
}).then(pushCompletion('a-1'))
const p2 = executeInIsolatedVM({
code: 'return 2',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 500,
requestId: 'a-2',
ownerKey: 'user:a',
ownerWeight: 2,
}).then(pushCompletion('a-2'))
const p3 = executeInIsolatedVM({
code: 'return 3',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 500,
requestId: 'b-1',
ownerKey: 'user:b',
ownerWeight: 1,
}).then(pushCompletion('b-1'))
const p4 = executeInIsolatedVM({
code: 'return 4',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 500,
requestId: 'b-2',
ownerKey: 'user:b',
ownerWeight: 1,
}).then(pushCompletion('b-2'))
const p5 = executeInIsolatedVM({
code: 'return 5',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 500,
requestId: 'a-3',
ownerKey: 'user:a',
ownerWeight: 2,
}).then(pushCompletion('a-3'))
await Promise.all([p1, p2, p3, p4, p5])
expect(completionOrder.slice(0, 3)).toEqual(['a-1', 'a-2', 'a-3'])
expect(completionOrder).toEqual(['a-1', 'a-2', 'a-3', 'b-1', 'b-2'])
})
it('rejects oversized fetch options payloads before outbound call', async () => {
const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({
envOverrides: {
IVM_MAX_FETCH_OPTIONS_JSON_CHARS: '50',
},
spawns: [
() =>
createReadyFetchProxyProc({
url: 'https://example.com',
optionsJson: 'x'.repeat(100),
}),
],
})
const result = await executeInIsolatedVM({
code: 'return "fetch-options"',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-fetch-options',
})
const payload = JSON.parse(String(result.result))
expect(payload.error).toContain('Fetch options exceed maximum payload size')
expect(secureFetchMock).not.toHaveBeenCalled()
})
it('rejects overly long fetch URLs before outbound call', async () => {
const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({
envOverrides: {
IVM_MAX_FETCH_URL_LENGTH: '30',
},
spawns: [
() =>
createReadyFetchProxyProc({
url: 'https://example.com/path/to/a/very/long/resource',
}),
],
})
const result = await executeInIsolatedVM({
code: 'return "fetch-url"',
params: {},
envVars: {},
contextVariables: {},
timeoutMs: 100,
requestId: 'req-fetch-url',
})
const payload = JSON.parse(String(result.result))
expect(payload.error).toContain('fetch URL exceeds maximum length')
expect(secureFetchMock).not.toHaveBeenCalled()
})
})

View File

@@ -4,7 +4,12 @@ import path from 'node:path'
import { fileURLToPath } from 'node:url'
import { createLogger } from '@sim/logger'
import { env } from '@/lib/core/config/env'
import { validateProxyUrl } from '@/lib/core/security/input-validation'
import { getRedisClient } from '@/lib/core/config/redis'
import {
type SecureFetchOptions,
secureFetchWithValidation,
} from '@/lib/core/security/input-validation.server'
import { sanitizeUrlForLog } from '@/lib/core/utils/logging'
const logger = createLogger('IsolatedVMExecution')
@@ -28,6 +33,8 @@ export interface IsolatedVMExecutionRequest {
contextVariables: Record<string, unknown>
timeoutMs: number
requestId: string
ownerKey?: string
ownerWeight?: number
}
export interface IsolatedVMExecutionResult {
@@ -50,10 +57,28 @@ const MAX_CONCURRENT = Number.parseInt(env.IVM_MAX_CONCURRENT) || 10000
const MAX_PER_WORKER = Number.parseInt(env.IVM_MAX_PER_WORKER) || 2500
const WORKER_IDLE_TIMEOUT_MS = Number.parseInt(env.IVM_WORKER_IDLE_TIMEOUT_MS) || 60000
const QUEUE_TIMEOUT_MS = Number.parseInt(env.IVM_QUEUE_TIMEOUT_MS) || 300000
const MAX_QUEUE_SIZE = Number.parseInt(env.IVM_MAX_QUEUE_SIZE) || 10000
const MAX_FETCH_RESPONSE_BYTES =
Number.parseInt(env.IVM_MAX_FETCH_RESPONSE_BYTES) || 8 * 1024 * 1024
const MAX_FETCH_RESPONSE_CHARS = Number.parseInt(env.IVM_MAX_FETCH_RESPONSE_CHARS) || 4_000_000
const MAX_FETCH_URL_LENGTH = Number.parseInt(env.IVM_MAX_FETCH_URL_LENGTH) || 8192
const MAX_FETCH_OPTIONS_JSON_CHARS =
Number.parseInt(env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS) || 256 * 1024
const MAX_ACTIVE_PER_OWNER = Number.parseInt(env.IVM_MAX_ACTIVE_PER_OWNER) || 200
const MAX_QUEUED_PER_OWNER = Number.parseInt(env.IVM_MAX_QUEUED_PER_OWNER) || 2000
const MAX_OWNER_WEIGHT = Number.parseInt(env.IVM_MAX_OWNER_WEIGHT) || 5
const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER =
Number.parseInt(env.IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER) ||
MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER
const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000
const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner'
const QUEUE_RETRY_DELAY_MS = 1000
const DISTRIBUTED_LEASE_GRACE_MS = 30000
interface PendingExecution {
resolve: (result: IsolatedVMExecutionResult) => void
timeout: ReturnType<typeof setTimeout>
ownerKey: string
}
interface WorkerInfo {
@@ -67,47 +92,419 @@ interface WorkerInfo {
}
interface QueuedExecution {
id: number
ownerKey: string
req: IsolatedVMExecutionRequest
resolve: (result: IsolatedVMExecutionResult) => void
queueTimeout: ReturnType<typeof setTimeout>
}
interface QueueNode {
ownerKey: string
value: QueuedExecution
prev: QueueNode | null
next: QueueNode | null
}
interface OwnerState {
ownerKey: string
weight: number
activeExecutions: number
queueHead: QueueNode | null
queueTail: QueueNode | null
queueLength: number
burstRemaining: number
}
const workers: Map<number, WorkerInfo> = new Map()
const executionQueue: QueuedExecution[] = []
const ownerStates: Map<string, OwnerState> = new Map()
const queuedOwnerRing: string[] = []
let queuedOwnerCursor = 0
let queueSize = 0
const queueNodes: Map<number, QueueNode> = new Map()
let totalActiveExecutions = 0
let executionIdCounter = 0
let queueIdCounter = 0
let nextWorkerId = 0
let spawnInProgress = 0
let queueDrainRetryTimeout: ReturnType<typeof setTimeout> | null = null
async function secureFetch(requestId: string, url: string, options?: RequestInit): Promise<string> {
const validation = validateProxyUrl(url)
if (!validation.isValid) {
logger.warn(`[${requestId}] Blocked fetch request due to SSRF validation`, {
url: url.substring(0, 100),
error: validation.error,
type IsolatedFetchOptions = RequestInit & {
timeout?: number
maxRedirects?: number
}
function truncateString(value: string, maxChars: number): { value: string; truncated: boolean } {
if (value.length <= maxChars) {
return { value, truncated: false }
}
return {
value: `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`,
truncated: true,
}
}
function normalizeFetchOptions(options?: IsolatedFetchOptions): SecureFetchOptions {
if (!options) return {}
const normalized: SecureFetchOptions = {
maxResponseBytes: MAX_FETCH_RESPONSE_BYTES,
}
if (typeof options.method === 'string' && options.method.length > 0) {
normalized.method = options.method
}
if (
typeof options.timeout === 'number' &&
Number.isFinite(options.timeout) &&
options.timeout > 0
) {
normalized.timeout = Math.floor(options.timeout)
}
if (
typeof options.maxRedirects === 'number' &&
Number.isFinite(options.maxRedirects) &&
options.maxRedirects >= 0
) {
normalized.maxRedirects = Math.floor(options.maxRedirects)
}
if (options.headers) {
const headers: Record<string, string> = {}
if (options.headers instanceof Headers) {
options.headers.forEach((value, key) => {
headers[key] = value
})
} else if (Array.isArray(options.headers)) {
for (const [key, value] of options.headers) {
headers[String(key)] = String(value)
}
} else {
for (const [key, value] of Object.entries(options.headers)) {
headers[key] = String(value)
}
}
normalized.headers = headers
}
if (
typeof options.body === 'string' ||
options.body instanceof Buffer ||
options.body instanceof Uint8Array
) {
normalized.body = options.body
} else if (options.body !== undefined && options.body !== null) {
normalized.body = String(options.body)
}
return normalized
}
async function secureFetch(
requestId: string,
url: string,
options?: IsolatedFetchOptions
): Promise<string> {
if (url.length > MAX_FETCH_URL_LENGTH) {
return JSON.stringify({
error: `Security Error: fetch URL exceeds maximum length (${MAX_FETCH_URL_LENGTH})`,
})
return JSON.stringify({ error: `Security Error: ${validation.error}` })
}
try {
const response = await fetch(url, options)
const body = await response.text()
const response = await secureFetchWithValidation(
url,
normalizeFetchOptions(options),
'fetchUrl'
)
const bodyResult = truncateString(await response.text(), MAX_FETCH_RESPONSE_CHARS)
const headers: Record<string, string> = {}
response.headers.forEach((value, key) => {
for (const [key, value] of response.headers) {
headers[key] = value
})
}
return JSON.stringify({
ok: response.ok,
status: response.status,
statusText: response.statusText,
body,
body: bodyResult.value,
bodyTruncated: bodyResult.truncated,
headers,
})
} catch (error: unknown) {
logger.warn(`[${requestId}] Isolated fetch failed`, {
url: sanitizeUrlForLog(url),
error: error instanceof Error ? error.message : String(error),
})
return JSON.stringify({ error: error instanceof Error ? error.message : 'Unknown fetch error' })
}
}
function normalizeOwnerKey(ownerKey?: string): string {
if (!ownerKey) return 'anonymous'
const normalized = ownerKey.trim()
return normalized || 'anonymous'
}
function normalizeOwnerWeight(ownerWeight?: number): number {
if (!Number.isFinite(ownerWeight) || ownerWeight === undefined) return 1
return Math.max(1, Math.min(MAX_OWNER_WEIGHT, Math.floor(ownerWeight)))
}
function ownerRedisKey(ownerKey: string): string {
return `${DISTRIBUTED_KEY_PREFIX}:${ownerKey}`
}
type LeaseAcquireResult = 'acquired' | 'limit_exceeded' | 'unavailable'
async function tryAcquireDistributedLease(
ownerKey: string,
leaseId: string,
timeoutMs: number
): Promise<LeaseAcquireResult> {
// Redis not configured: explicit local-mode fallback is allowed.
if (!env.REDIS_URL) return 'acquired'
const redis = getRedisClient()
if (!redis) {
logger.error('Redis is configured but unavailable for distributed lease acquisition', {
ownerKey,
})
return 'unavailable'
}
const now = Date.now()
const leaseTtlMs = Math.max(
timeoutMs + QUEUE_TIMEOUT_MS + DISTRIBUTED_LEASE_GRACE_MS,
DISTRIBUTED_LEASE_MIN_TTL_MS
)
const expiresAt = now + leaseTtlMs
const key = ownerRedisKey(ownerKey)
const script = `
redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', ARGV[1])
local current = redis.call('ZCARD', KEYS[1])
if current >= tonumber(ARGV[2]) then
return 0
end
redis.call('ZADD', KEYS[1], ARGV[3], ARGV[4])
redis.call('PEXPIRE', KEYS[1], ARGV[5])
return 1
`
try {
const result = await redis.eval(
script,
1,
key,
now.toString(),
DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
expiresAt.toString(),
leaseId,
leaseTtlMs.toString()
)
return Number(result) === 1 ? 'acquired' : 'limit_exceeded'
} catch (error) {
logger.error('Failed to acquire distributed owner lease', { ownerKey, error })
return 'unavailable'
}
}
async function releaseDistributedLease(ownerKey: string, leaseId: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const key = ownerRedisKey(ownerKey)
const script = `
redis.call('ZREM', KEYS[1], ARGV[1])
if redis.call('ZCARD', KEYS[1]) == 0 then
redis.call('DEL', KEYS[1])
end
return 1
`
try {
await redis.eval(script, 1, key, leaseId)
} catch (error) {
logger.error('Failed to release distributed owner lease', { ownerKey, error })
}
}
function queueLength(): number {
return queueSize
}
function maybeClearDrainRetry() {
if (queueSize === 0 && queueDrainRetryTimeout) {
clearTimeout(queueDrainRetryTimeout)
queueDrainRetryTimeout = null
}
}
function getOrCreateOwnerState(ownerKey: string, ownerWeight: number): OwnerState {
const existing = ownerStates.get(ownerKey)
if (existing) {
existing.weight = Math.max(existing.weight, ownerWeight)
return existing
}
const ownerState: OwnerState = {
ownerKey,
weight: ownerWeight,
activeExecutions: 0,
queueHead: null,
queueTail: null,
queueLength: 0,
burstRemaining: 0,
}
ownerStates.set(ownerKey, ownerState)
return ownerState
}
function addOwnerToRing(ownerKey: string) {
if (queuedOwnerRing.includes(ownerKey)) return
queuedOwnerRing.push(ownerKey)
}
function removeOwnerFromRing(ownerKey: string) {
const idx = queuedOwnerRing.indexOf(ownerKey)
if (idx === -1) return
queuedOwnerRing.splice(idx, 1)
if (queuedOwnerRing.length === 0) {
queuedOwnerCursor = 0
return
}
if (idx < queuedOwnerCursor) {
queuedOwnerCursor--
} else if (queuedOwnerCursor >= queuedOwnerRing.length) {
queuedOwnerCursor = 0
}
}
function maybeCleanupOwner(ownerKey: string) {
const owner = ownerStates.get(ownerKey)
if (!owner) return
if (owner.queueLength === 0) {
removeOwnerFromRing(ownerKey)
}
if (owner.queueLength === 0 && owner.activeExecutions === 0) {
ownerStates.delete(ownerKey)
}
}
function removeQueueNode(node: QueueNode): QueuedExecution {
const owner = ownerStates.get(node.ownerKey)
if (!owner) {
queueNodes.delete(node.value.id)
queueSize = Math.max(0, queueSize - 1)
maybeClearDrainRetry()
return node.value
}
const { prev, next, value } = node
if (prev) prev.next = next
else owner.queueHead = next
if (next) next.prev = prev
else owner.queueTail = prev
node.prev = null
node.next = null
queueNodes.delete(value.id)
owner.queueLength--
queueSize--
maybeCleanupOwner(owner.ownerKey)
maybeClearDrainRetry()
return value
}
function shiftQueuedExecutionForOwner(owner: OwnerState): QueuedExecution | null {
if (!owner.queueHead) return null
return removeQueueNode(owner.queueHead)
}
function removeQueuedExecutionById(queueId: number): QueuedExecution | null {
const node = queueNodes.get(queueId)
if (!node) return null
return removeQueueNode(node)
}
function pushQueuedExecution(owner: OwnerState, queued: QueuedExecution) {
const node: QueueNode = {
ownerKey: owner.ownerKey,
value: queued,
prev: owner.queueTail,
next: null,
}
if (owner.queueTail) {
owner.queueTail.next = node
} else {
owner.queueHead = node
}
owner.queueTail = node
owner.queueLength++
owner.burstRemaining = 0
addOwnerToRing(owner.ownerKey)
queueNodes.set(queued.id, node)
queueSize++
}
function selectOwnerForDispatch(): OwnerState | null {
if (queuedOwnerRing.length === 0) return null
let visited = 0
while (queuedOwnerRing.length > 0 && visited < queuedOwnerRing.length) {
if (queuedOwnerCursor >= queuedOwnerRing.length) {
queuedOwnerCursor = 0
}
const ownerKey = queuedOwnerRing[queuedOwnerCursor]
if (!ownerKey) return null
const owner = ownerStates.get(ownerKey)
if (!owner) {
removeOwnerFromRing(ownerKey)
continue
}
if (owner.queueLength === 0) {
owner.burstRemaining = 0
removeOwnerFromRing(ownerKey)
visited++
continue
}
if (owner.activeExecutions >= MAX_ACTIVE_PER_OWNER) {
owner.burstRemaining = 0
queuedOwnerCursor = (queuedOwnerCursor + 1) % queuedOwnerRing.length
visited++
continue
}
if (owner.burstRemaining <= 0) {
owner.burstRemaining = owner.weight
}
owner.burstRemaining--
if (owner.burstRemaining <= 0) {
queuedOwnerCursor = (queuedOwnerCursor + 1) % queuedOwnerRing.length
}
return owner
}
return null
}
function scheduleDrainRetry() {
if (queueDrainRetryTimeout || queueSize === 0) return
queueDrainRetryTimeout = setTimeout(() => {
queueDrainRetryTimeout = null
if (queueSize === 0) return
drainQueue()
}, QUEUE_RETRY_DELAY_MS)
}
function handleWorkerMessage(workerId: number, message: unknown) {
if (typeof message !== 'object' || message === null) return
const msg = message as Record<string, unknown>
@@ -121,6 +518,11 @@ function handleWorkerMessage(workerId: number, message: unknown) {
workerInfo!.pendingExecutions.delete(execId)
workerInfo!.activeExecutions--
totalActiveExecutions--
const owner = ownerStates.get(pending.ownerKey)
if (owner) {
owner.activeExecutions = Math.max(0, owner.activeExecutions - 1)
maybeCleanupOwner(owner.ownerKey)
}
pending.resolve(msg.result as IsolatedVMExecutionResult)
resetWorkerIdleTimeout(workerId)
drainQueue()
@@ -135,7 +537,26 @@ function handleWorkerMessage(workerId: number, message: unknown) {
url: string
optionsJson?: string
}
let options: RequestInit | undefined
if (typeof url !== 'string' || url.length === 0) {
workerInfo?.process.send({
type: 'fetchResponse',
fetchId,
response: JSON.stringify({ error: 'Invalid fetch URL' }),
})
return
}
if (optionsJson && optionsJson.length > MAX_FETCH_OPTIONS_JSON_CHARS) {
workerInfo?.process.send({
type: 'fetchResponse',
fetchId,
response: JSON.stringify({
error: `Fetch options exceed maximum payload size (${MAX_FETCH_OPTIONS_JSON_CHARS} chars)`,
}),
})
return
}
let options: IsolatedFetchOptions | undefined
if (optionsJson) {
try {
options = JSON.parse(optionsJson)
@@ -185,6 +606,11 @@ function cleanupWorker(workerId: number) {
for (const [id, pending] of workerInfo.pendingExecutions) {
clearTimeout(pending.timeout)
totalActiveExecutions--
const owner = ownerStates.get(pending.ownerKey)
if (owner) {
owner.activeExecutions = Math.max(0, owner.activeExecutions - 1)
maybeCleanupOwner(owner.ownerKey)
}
pending.resolve({
result: null,
stdout: '',
@@ -192,6 +618,7 @@ function cleanupWorker(workerId: number) {
})
workerInfo.pendingExecutions.delete(id)
}
workerInfo.activeExecutions = 0
workers.delete(workerId)
logger.info('Worker removed from pool', { workerId, poolSize: workers.size })
@@ -220,6 +647,16 @@ function resetWorkerIdleTimeout(workerId: number) {
function spawnWorker(): Promise<WorkerInfo> {
const workerId = nextWorkerId++
spawnInProgress++
let spawnSettled = false
const settleSpawnInProgress = () => {
if (spawnSettled) {
return false
}
spawnSettled = true
spawnInProgress--
return true
}
const workerInfo: WorkerInfo = {
process: null as unknown as ChildProcess,
@@ -233,7 +670,7 @@ function spawnWorker(): Promise<WorkerInfo> {
workerInfo.readyPromise = new Promise<void>((resolve, reject) => {
if (!checkNodeAvailable()) {
spawnInProgress--
settleSpawnInProgress()
reject(
new Error(
'Node.js is required for code execution but was not found. ' +
@@ -247,77 +684,85 @@ function spawnWorker(): Promise<WorkerInfo> {
const workerPath = path.join(currentDir, 'isolated-vm-worker.cjs')
if (!fs.existsSync(workerPath)) {
spawnInProgress--
settleSpawnInProgress()
reject(new Error(`Worker file not found at ${workerPath}`))
return
}
import('node:child_process').then(({ spawn }) => {
const proc = spawn('node', [workerPath], {
stdio: ['ignore', 'pipe', 'pipe', 'ipc'],
serialization: 'json',
})
workerInfo.process = proc
import('node:child_process')
.then(({ spawn }) => {
const proc = spawn('node', [workerPath], {
stdio: ['ignore', 'pipe', 'pipe', 'ipc'],
serialization: 'json',
})
workerInfo.process = proc
proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message))
proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message))
let stderrData = ''
proc.stderr?.on('data', (data: Buffer) => {
stderrData += data.toString()
})
let stderrData = ''
proc.stderr?.on('data', (data: Buffer) => {
stderrData += data.toString()
})
const startTimeout = setTimeout(() => {
proc.kill()
spawnInProgress--
workers.delete(workerId)
reject(new Error('Worker failed to start within timeout'))
}, 10000)
const startTimeout = setTimeout(() => {
proc.kill()
workers.delete(workerId)
if (!settleSpawnInProgress()) return
reject(new Error('Worker failed to start within timeout'))
}, 10000)
const readyHandler = (message: unknown) => {
if (
typeof message === 'object' &&
message !== null &&
(message as { type?: string }).type === 'ready'
) {
workerInfo.ready = true
spawnInProgress--
clearTimeout(startTimeout)
proc.off('message', readyHandler)
workers.set(workerId, workerInfo)
resetWorkerIdleTimeout(workerId)
logger.info('Worker spawned and ready', { workerId, poolSize: workers.size })
resolve()
const readyHandler = (message: unknown) => {
if (
typeof message === 'object' &&
message !== null &&
(message as { type?: string }).type === 'ready'
) {
if (!settleSpawnInProgress()) {
proc.off('message', readyHandler)
return
}
workerInfo.ready = true
clearTimeout(startTimeout)
proc.off('message', readyHandler)
workers.set(workerId, workerInfo)
resetWorkerIdleTimeout(workerId)
logger.info('Worker spawned and ready', { workerId, poolSize: workers.size })
resolve()
}
}
}
proc.on('message', readyHandler)
proc.on('message', readyHandler)
proc.on('exit', () => {
const wasStartupFailure = !workerInfo.ready
proc.on('exit', () => {
const wasStartupFailure = !workerInfo.ready
if (wasStartupFailure) {
spawnInProgress--
clearTimeout(startTimeout)
if (wasStartupFailure) {
clearTimeout(startTimeout)
if (!settleSpawnInProgress()) return
let errorMessage = 'Worker process exited unexpectedly'
if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) {
errorMessage =
'Code execution requires the isolated-vm native module which failed to load. ' +
'This usually means the module needs to be rebuilt for your Node.js version. ' +
'Please run: cd node_modules/isolated-vm && npm rebuild'
logger.error('isolated-vm module failed to load', { stderr: stderrData, workerId })
} else if (stderrData) {
errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}`
logger.error('Worker process failed', { stderr: stderrData, workerId })
let errorMessage = 'Worker process exited unexpectedly'
if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) {
errorMessage =
'Code execution requires the isolated-vm native module which failed to load. ' +
'This usually means the module needs to be rebuilt for your Node.js version. ' +
'Please run: cd node_modules/isolated-vm && npm rebuild'
logger.error('isolated-vm module failed to load', { stderr: stderrData, workerId })
} else if (stderrData) {
errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}`
logger.error('Worker process failed', { stderr: stderrData, workerId })
}
reject(new Error(errorMessage))
return
}
reject(new Error(errorMessage))
return
}
cleanupWorker(workerId)
drainQueue()
cleanupWorker(workerId)
drainQueue()
})
})
.catch((error) => {
if (!settleSpawnInProgress()) return
reject(error instanceof Error ? error : new Error('Failed to load child_process module'))
})
})
})
return workerInfo.readyPromise.then(() => workerInfo)
@@ -363,6 +808,7 @@ async function acquireWorker(): Promise<WorkerInfo | null> {
function dispatchToWorker(
workerInfo: WorkerInfo,
ownerState: OwnerState,
req: IsolatedVMExecutionRequest,
resolve: (result: IsolatedVMExecutionResult) => void
) {
@@ -377,6 +823,8 @@ function dispatchToWorker(
workerInfo.pendingExecutions.delete(execId)
workerInfo.activeExecutions--
totalActiveExecutions--
ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1)
maybeCleanupOwner(ownerState.ownerKey)
resolve({
result: null,
stdout: '',
@@ -386,9 +834,10 @@ function dispatchToWorker(
drainQueue()
}, req.timeoutMs + 1000)
workerInfo.pendingExecutions.set(execId, { resolve, timeout })
workerInfo.pendingExecutions.set(execId, { resolve, timeout, ownerKey: ownerState.ownerKey })
workerInfo.activeExecutions++
totalActiveExecutions++
ownerState.activeExecutions++
try {
workerInfo.process.send({ type: 'execute', executionId: execId, request: req })
@@ -397,6 +846,8 @@ function dispatchToWorker(
workerInfo.pendingExecutions.delete(execId)
workerInfo.activeExecutions--
totalActiveExecutions--
ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1)
maybeCleanupOwner(ownerState.ownerKey)
resolve({
result: null,
stdout: '',
@@ -408,30 +859,62 @@ function dispatchToWorker(
}
function enqueueExecution(
ownerState: OwnerState,
req: IsolatedVMExecutionRequest,
resolve: (result: IsolatedVMExecutionResult) => void
) {
if (queueLength() >= MAX_QUEUE_SIZE) {
resolve({
result: null,
stdout: '',
error: {
message: `Execution queue is full (${MAX_QUEUE_SIZE}). Please retry later.`,
name: 'QueueFullError',
},
})
return
}
if (ownerState.queueLength >= MAX_QUEUED_PER_OWNER) {
resolve({
result: null,
stdout: '',
error: {
message: `Owner queue limit reached (${MAX_QUEUED_PER_OWNER}). Please retry later.`,
name: 'OwnerQueueLimitError',
},
})
return
}
const queueId = ++queueIdCounter
const queueTimeout = setTimeout(() => {
const idx = executionQueue.findIndex((q) => q.resolve === resolve)
if (idx !== -1) {
executionQueue.splice(idx, 1)
resolve({
result: null,
stdout: '',
error: {
message: `Execution queued too long (${QUEUE_TIMEOUT_MS}ms). All workers are busy.`,
name: 'QueueTimeoutError',
},
})
}
const queued = removeQueuedExecutionById(queueId)
if (!queued) return
resolve({
result: null,
stdout: '',
error: {
message: `Execution queued too long (${QUEUE_TIMEOUT_MS}ms). All workers are busy.`,
name: 'QueueTimeoutError',
},
})
}, QUEUE_TIMEOUT_MS)
executionQueue.push({ req, resolve, queueTimeout })
pushQueuedExecution(ownerState, {
id: queueId,
ownerKey: ownerState.ownerKey,
req,
resolve,
queueTimeout,
})
logger.info('Execution queued', {
queueLength: executionQueue.length,
queueLength: queueLength(),
ownerKey: ownerState.ownerKey,
ownerQueueLength: ownerState.queueLength,
totalActive: totalActiveExecutions,
poolSize: workers.size,
})
drainQueue()
}
/**
@@ -439,21 +922,35 @@ function enqueueExecution(
* executions to available workers.
*/
function drainQueue() {
while (executionQueue.length > 0 && totalActiveExecutions < MAX_CONCURRENT) {
while (queueLength() > 0 && totalActiveExecutions < MAX_CONCURRENT) {
const worker = selectWorker()
if (!worker) {
const currentPoolSize = workers.size + spawnInProgress
if (currentPoolSize < POOL_SIZE) {
spawnWorker()
.then(() => drainQueue())
.catch((err) => logger.error('Failed to spawn worker during drain', { err }))
.catch((err) => {
logger.error('Failed to spawn worker during drain', { err })
scheduleDrainRetry()
})
}
break
}
const queued = executionQueue.shift()!
const owner = selectOwnerForDispatch()
if (!owner) {
scheduleDrainRetry()
break
}
const queued = shiftQueuedExecutionForOwner(owner)
if (!queued) {
owner.burstRemaining = 0
maybeCleanupOwner(owner.ownerKey)
continue
}
clearTimeout(queued.queueTimeout)
dispatchToWorker(worker, queued.req, queued.resolve)
dispatchToWorker(worker, owner, queued.req, queued.resolve)
}
}
@@ -463,14 +960,80 @@ function drainQueue() {
export async function executeInIsolatedVM(
req: IsolatedVMExecutionRequest
): Promise<IsolatedVMExecutionResult> {
if (totalActiveExecutions >= MAX_CONCURRENT) {
return new Promise((resolve) => enqueueExecution(req, resolve))
const ownerKey = normalizeOwnerKey(req.ownerKey)
const ownerWeight = normalizeOwnerWeight(req.ownerWeight)
const ownerState = getOrCreateOwnerState(ownerKey, ownerWeight)
const distributedLeaseId = `${req.requestId}:${Date.now()}:${Math.random().toString(36).slice(2, 10)}`
const leaseAcquireResult = await tryAcquireDistributedLease(
ownerKey,
distributedLeaseId,
req.timeoutMs
)
if (leaseAcquireResult === 'limit_exceeded') {
return {
result: null,
stdout: '',
error: {
message: `Owner in-flight limit reached (${DISTRIBUTED_MAX_INFLIGHT_PER_OWNER}). Please retry later.`,
name: 'OwnerInFlightLimitError',
},
}
}
if (leaseAcquireResult === 'unavailable') {
return {
result: null,
stdout: '',
error: {
message: 'Distributed fairness is temporarily unavailable. Please retry.',
name: 'DistributedFairnessUnavailableError',
},
}
}
const workerInfo = await acquireWorker()
if (!workerInfo) {
return new Promise((resolve) => enqueueExecution(req, resolve))
let settled = false
const releaseLease = () => {
if (settled) return
settled = true
releaseDistributedLease(ownerKey, distributedLeaseId).catch((error) => {
logger.error('Failed to release distributed lease', { ownerKey, error })
})
}
return new Promise((resolve) => dispatchToWorker(workerInfo, req, resolve))
return new Promise<IsolatedVMExecutionResult>((resolve) => {
const resolveWithRelease = (result: IsolatedVMExecutionResult) => {
releaseLease()
resolve(result)
}
if (
totalActiveExecutions >= MAX_CONCURRENT ||
ownerState.activeExecutions >= MAX_ACTIVE_PER_OWNER
) {
enqueueExecution(ownerState, req, resolveWithRelease)
return
}
acquireWorker()
.then((workerInfo) => {
if (!workerInfo) {
enqueueExecution(ownerState, req, resolveWithRelease)
return
}
dispatchToWorker(workerInfo, ownerState, req, resolveWithRelease)
if (queueLength() > 0) {
drainQueue()
}
})
.catch((error) => {
logger.error('Failed to acquire worker for execution', { error, ownerKey })
enqueueExecution(ownerState, req, resolveWithRelease)
})
}).finally(() => {
releaseLease()
if (ownerState.queueLength === 0 && ownerState.activeExecutions === 0) {
maybeCleanupOwner(ownerState.ownerKey)
}
})
}
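
The core of the fairness change is `selectOwnerForDispatch`: a round-robin ring of queued owners where each owner spends a burst of `weight` consecutive dispatches before the cursor advances. A standalone sketch of just that selection loop — it omits the per-owner active cap and ring cleanup the real function also handles:

```ts
interface Owner { key: string; weight: number; burst: number; queue: string[] }

function nextDispatch(ring: Owner[], cursor: { i: number }): string | null {
  for (let visited = 0; visited < ring.length; visited++) {
    const owner = ring[cursor.i % ring.length]
    if (owner.queue.length === 0) {
      cursor.i++
      continue
    }
    if (owner.burst <= 0) owner.burst = owner.weight // refill at burst start
    owner.burst--
    if (owner.burst <= 0) cursor.i++ // burst spent: move to the next owner
    return owner.queue.shift()!
  }
  return null
}

const ring: Owner[] = [
  { key: 'a', weight: 2, burst: 0, queue: ['a1', 'a2', 'a3', 'a4'] },
  { key: 'b', weight: 1, burst: 0, queue: ['b1', 'b2'] },
]
const cursor = { i: 0 }
const order: string[] = []
for (let next = nextDispatch(ring, cursor); next !== null; next = nextDispatch(ring, cursor)) {
  order.push(next)
}
console.log(order) // ['a1', 'a2', 'b1', 'a3', 'a4', 'b2'] — a 2:1 interleave
```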

View File

@@ -124,6 +124,7 @@ export interface PreprocessExecutionOptions {
workspaceId?: string // If known, used for billing resolution
loggingSession?: LoggingSession // If provided, will be used for error logging
isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys)
/** @deprecated No longer used - background/async executions always use deployed state */
useDraftState?: boolean
}
@@ -170,6 +171,7 @@ export async function preprocessExecution(
workspaceId: providedWorkspaceId,
loggingSession: providedLoggingSession,
isResumeContext = false,
useAuthenticatedUserAsActor = false,
} = options
logger.info(`[${requestId}] Starting execution preprocessing`, {
@@ -257,7 +259,14 @@ export async function preprocessExecution(
let actorUserId: string | null = null
try {
if (workspaceId) {
// For client-side executions and personal API keys, the authenticated
// user is the billing and permission actor — not the workspace owner.
if (useAuthenticatedUserAsActor && userId) {
actorUserId = userId
logger.info(`[${requestId}] Using authenticated user as actor: ${actorUserId}`)
}
if (!actorUserId && workspaceId) {
actorUserId = await getWorkspaceBilledAccountUserId(workspaceId)
if (actorUserId) {
logger.info(`[${requestId}] Using workspace billed account: ${actorUserId}`)
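
Read together with the route change earlier in this commit, actor resolution now has an explicit precedence: the authenticated user when `useAuthenticatedUserAsActor` is set, otherwise the workspace billed account, otherwise the existing fallbacks below this hunk. A compact sketch of that ordering; the function shape is illustrative:

```ts
declare function getWorkspaceBilledAccountUserId(workspaceId: string): Promise<string | null>

// Illustrative precedence only; the real code logs and handles errors inline.
async function resolveActorUserId(opts: {
  useAuthenticatedUserAsActor: boolean
  userId?: string
  workspaceId?: string
}): Promise<string | null> {
  if (opts.useAuthenticatedUserAsActor && opts.userId) return opts.userId
  if (opts.workspaceId) return getWorkspaceBilledAccountUserId(opts.workspaceId)
  return null // existing fallbacks not shown here
}
```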

View File

@@ -221,7 +221,8 @@ export async function executeTool(
// If it's a custom tool, use the async version with workflowId
if (isCustomTool(normalizedToolId)) {
const workflowId = params._context?.workflowId
tool = await getToolAsync(normalizedToolId, workflowId)
const userId = params._context?.userId
tool = await getToolAsync(normalizedToolId, workflowId, userId)
if (!tool) {
logger.error(`[${requestId}] Custom tool not found: ${normalizedToolId}`)
}
@@ -260,26 +261,25 @@ export async function executeTool(
try {
const baseUrl = getBaseUrl()
const workflowId = contextParams._context?.workflowId
const userId = contextParams._context?.userId
const tokenPayload: OAuthTokenPayload = {
credentialId: contextParams.credential as string,
}
// Add workflowId if it exists in params, context, or executionContext
const workflowId =
contextParams.workflowId ||
contextParams._context?.workflowId ||
executionContext?.workflowId
if (workflowId) {
tokenPayload.workflowId = workflowId
}
logger.info(`[${requestId}] Fetching access token from ${baseUrl}/api/auth/oauth/token`)
// Build token URL and also include workflowId in query so server auth can read it
const tokenUrlObj = new URL('/api/auth/oauth/token', baseUrl)
if (workflowId) {
tokenUrlObj.searchParams.set('workflowId', workflowId)
}
if (userId) {
tokenUrlObj.searchParams.set('userId', userId)
}
// Always send Content-Type; add internal auth on server-side runs
const tokenHeaders: Record<string, string> = { 'Content-Type': 'application/json' }
@@ -583,6 +583,10 @@ async function executeToolRequest(
if (workflowId) {
fullUrlObj.searchParams.set('workflowId', workflowId)
}
const userId = params._context?.userId
if (userId) {
fullUrlObj.searchParams.set('userId', userId)
}
}
const fullUrl = fullUrlObj.toString()
@@ -929,8 +933,8 @@ async function executeMcpTool(
)
}
const workspaceId = params._context?.workspaceId || executionContext?.workspaceId
const workflowId = params._context?.workflowId || executionContext?.workflowId
const workspaceId = params._context?.workspaceId
const workflowId = params._context?.workflowId
if (!workspaceId) {
return {
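
The same `userId` threading shows up in the OAuth token fetch: both identifiers now travel as query params so the server-side auth layer (see the auth hunk above) can resolve the user without the removed workflow-table lookup. A minimal sketch of the URL construction; the base URL is illustrative:

```ts
declare const workflowId: string | undefined
declare const userId: string | undefined

const tokenUrl = new URL('/api/auth/oauth/token', 'https://app.example.invalid')
if (workflowId) tokenUrl.searchParams.set('workflowId', workflowId)
if (userId) tokenUrl.searchParams.set('userId', userId)
```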

View File

@@ -311,7 +311,8 @@ export function getTool(toolId: string): ToolConfig | undefined {
// Get a tool by its ID asynchronously (supports server-side)
export async function getToolAsync(
toolId: string,
workflowId?: string
workflowId?: string,
userId?: string
): Promise<ToolConfig | undefined> {
// Check for built-in tools
const builtInTool = tools[toolId]
@@ -319,7 +320,7 @@ export async function getToolAsync(
// Check if it's a custom tool
if (isCustomTool(toolId)) {
return fetchCustomToolFromAPI(toolId, workflowId)
return fetchCustomToolFromAPI(toolId, workflowId, userId)
}
return undefined
@@ -366,7 +367,8 @@ function createToolConfig(customTool: any, customToolId: string): ToolConfig {
// Create a tool config from a custom tool definition by fetching from API
async function fetchCustomToolFromAPI(
customToolId: string,
workflowId?: string
workflowId?: string,
userId?: string
): Promise<ToolConfig | undefined> {
const identifier = customToolId.replace('custom_', '')
@@ -374,10 +376,12 @@ async function fetchCustomToolFromAPI(
const baseUrl = getBaseUrl()
const url = new URL('/api/tools/custom', baseUrl)
// Add workflowId as a query parameter if available
if (workflowId) {
url.searchParams.append('workflowId', workflowId)
}
if (userId) {
url.searchParams.append('userId', userId)
}
// For server-side calls (during workflow execution), use internal JWT token
const headers: Record<string, string> = {}
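
A hedged usage sketch of the widened signature: callers that know the executing user can pass it straight through, and `/api/tools/custom` receives it as a `userId` query param. The tool ID below is hypothetical:

```ts
declare function getToolAsync(
  toolId: string,
  workflowId?: string,
  userId?: string
): Promise<unknown>

async function loadCustomTool(workflowId?: string, userId?: string) {
  // Both extra arguments are optional; omitting them keeps the old behavior.
  return getToolAsync('custom_example_tool', workflowId, userId)
}
```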