mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-07 05:05:15 -05:00
fix(function): isolated-vm worker pool to prevent single-worker bottleneck + execution user id resolution (#3155)
* fix(executor): isolated-vm worker pool to prevent single-worker bottleneck * chore(helm): add isolated-vm worker pool env vars to values.yaml * fix(userid): resolution for fair scheduling * add fallback back * add to helm charts * remove constant fallbacks * fix * address bugbot comments * fix fallbacks * one more bugbot comment --------- Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
This commit is contained in:
@@ -845,6 +845,8 @@ export async function POST(req: NextRequest) {
|
||||
contextVariables,
|
||||
timeoutMs: timeout,
|
||||
requestId,
|
||||
ownerKey: `user:${auth.userId}`,
|
||||
ownerWeight: 1,
|
||||
})
|
||||
|
||||
const executionTime = Date.now() - startTime
|
||||
|
||||
@@ -325,6 +325,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
requestId
|
||||
)
|
||||
|
||||
// Client-side sessions and personal API keys bill/permission-check the
|
||||
// authenticated user, not the workspace billed account.
|
||||
const useAuthenticatedUserAsActor =
|
||||
isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal')
|
||||
|
||||
const preprocessResult = await preprocessExecution({
|
||||
workflowId,
|
||||
userId,
|
||||
@@ -334,6 +339,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
checkDeployment: !shouldUseDraftState,
|
||||
loggingSession,
|
||||
useDraftState: shouldUseDraftState,
|
||||
useAuthenticatedUserAsActor,
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
|
||||
@@ -326,6 +326,7 @@ export class AgentBlockHandler implements BlockHandler {
|
||||
_context: {
|
||||
workflowId: ctx.workflowId,
|
||||
workspaceId: ctx.workspaceId,
|
||||
userId: ctx.userId,
|
||||
isDeployedContext: ctx.isDeployedContext,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -72,6 +72,7 @@ export class ApiBlockHandler implements BlockHandler {
|
||||
workflowId: ctx.workflowId,
|
||||
workspaceId: ctx.workspaceId,
|
||||
executionId: ctx.executionId,
|
||||
userId: ctx.userId,
|
||||
isDeployedContext: ctx.isDeployedContext,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -48,6 +48,7 @@ export async function evaluateConditionExpression(
|
||||
_context: {
|
||||
workflowId: ctx.workflowId,
|
||||
workspaceId: ctx.workspaceId,
|
||||
userId: ctx.userId,
|
||||
isDeployedContext: ctx.isDeployedContext,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -39,6 +39,7 @@ export class FunctionBlockHandler implements BlockHandler {
|
||||
_context: {
|
||||
workflowId: ctx.workflowId,
|
||||
workspaceId: ctx.workspaceId,
|
||||
userId: ctx.userId,
|
||||
isDeployedContext: ctx.isDeployedContext,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -66,6 +66,7 @@ export class GenericBlockHandler implements BlockHandler {
|
||||
workflowId: ctx.workflowId,
|
||||
workspaceId: ctx.workspaceId,
|
||||
executionId: ctx.executionId,
|
||||
userId: ctx.userId,
|
||||
isDeployedContext: ctx.isDeployedContext,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -605,6 +605,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler {
|
||||
_context: {
|
||||
workflowId: ctx.workflowId,
|
||||
workspaceId: ctx.workspaceId,
|
||||
userId: ctx.userId,
|
||||
isDeployedContext: ctx.isDeployedContext,
|
||||
},
|
||||
blockData: blockDataWithPause,
|
||||
|
||||
@@ -511,6 +511,8 @@ export class LoopOrchestrator {
|
||||
contextVariables: {},
|
||||
timeoutMs: LOOP_CONDITION_TIMEOUT_MS,
|
||||
requestId,
|
||||
ownerKey: `user:${ctx.userId}`,
|
||||
ownerWeight: 1,
|
||||
})
|
||||
|
||||
if (vmResult.error) {
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
import { db } from '@sim/db'
|
||||
import { workflow } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service'
|
||||
import { getSession } from '@/lib/auth'
|
||||
@@ -13,35 +10,33 @@ export interface AuthResult {
|
||||
success: boolean
|
||||
userId?: string
|
||||
authType?: 'session' | 'api_key' | 'internal_jwt'
|
||||
apiKeyType?: 'personal' | 'workspace'
|
||||
error?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves userId from a verified internal JWT token.
|
||||
* Extracts workflowId/userId from URL params or POST body, then looks up userId if needed.
|
||||
* Extracts userId from the JWT payload, URL search params, or POST body.
|
||||
*/
|
||||
async function resolveUserFromJwt(
|
||||
request: NextRequest,
|
||||
verificationUserId: string | null,
|
||||
options: { requireWorkflowId?: boolean }
|
||||
): Promise<AuthResult> {
|
||||
let workflowId: string | null = null
|
||||
let userId: string | null = verificationUserId
|
||||
|
||||
const { searchParams } = new URL(request.url)
|
||||
workflowId = searchParams.get('workflowId')
|
||||
if (!userId) {
|
||||
const { searchParams } = new URL(request.url)
|
||||
userId = searchParams.get('userId')
|
||||
}
|
||||
|
||||
if (!workflowId && !userId && request.method === 'POST') {
|
||||
if (!userId && request.method === 'POST') {
|
||||
try {
|
||||
const clonedRequest = request.clone()
|
||||
const bodyText = await clonedRequest.text()
|
||||
if (bodyText) {
|
||||
const body = JSON.parse(bodyText)
|
||||
workflowId = body.workflowId || body._context?.workflowId
|
||||
userId = userId || body.userId || body._context?.userId
|
||||
userId = body.userId || body._context?.userId || null
|
||||
}
|
||||
} catch {
|
||||
// Ignore JSON parse errors
|
||||
@@ -52,22 +47,8 @@ async function resolveUserFromJwt(
|
||||
return { success: true, userId, authType: 'internal_jwt' }
|
||||
}
|
||||
|
||||
if (workflowId) {
|
||||
const [workflowData] = await db
|
||||
.select({ userId: workflow.userId })
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, workflowId))
|
||||
.limit(1)
|
||||
|
||||
if (!workflowData) {
|
||||
return { success: false, error: 'Workflow not found' }
|
||||
}
|
||||
|
||||
return { success: true, userId: workflowData.userId, authType: 'internal_jwt' }
|
||||
}
|
||||
|
||||
if (options.requireWorkflowId !== false) {
|
||||
return { success: false, error: 'workflowId or userId required for internal JWT calls' }
|
||||
return { success: false, error: 'userId required for internal JWT calls' }
|
||||
}
|
||||
|
||||
return { success: true, authType: 'internal_jwt' }
|
||||
@@ -222,6 +203,7 @@ export async function checkHybridAuth(
|
||||
success: true,
|
||||
userId: result.userId!,
|
||||
authType: 'api_key',
|
||||
apiKeyType: result.keyType,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -183,6 +183,24 @@ export const env = createEnv({
|
||||
EXECUTION_TIMEOUT_ASYNC_TEAM: z.string().optional().default('5400'), // 90 minutes
|
||||
EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: z.string().optional().default('5400'), // 90 minutes
|
||||
|
||||
// Isolated-VM Worker Pool Configuration
|
||||
IVM_POOL_SIZE: z.string().optional().default('4'), // Max worker processes in pool
|
||||
IVM_MAX_CONCURRENT: z.string().optional().default('10000'), // Max concurrent executions globally
|
||||
IVM_MAX_PER_WORKER: z.string().optional().default('2500'), // Max concurrent executions per worker
|
||||
IVM_WORKER_IDLE_TIMEOUT_MS: z.string().optional().default('60000'), // Worker idle cleanup timeout (ms)
|
||||
IVM_MAX_QUEUE_SIZE: z.string().optional().default('10000'), // Max pending queued executions in memory
|
||||
IVM_MAX_FETCH_RESPONSE_BYTES: z.string().optional().default('8388608'),// Max bytes read from sandbox fetch responses
|
||||
IVM_MAX_FETCH_RESPONSE_CHARS: z.string().optional().default('4000000'),// Max chars returned to sandbox from fetch body
|
||||
IVM_MAX_FETCH_OPTIONS_JSON_CHARS: z.string().optional().default('262144'), // Max JSON payload size for sandbox fetch options
|
||||
IVM_MAX_FETCH_URL_LENGTH: z.string().optional().default('8192'), // Max URL length accepted by sandbox fetch
|
||||
IVM_MAX_STDOUT_CHARS: z.string().optional().default('200000'), // Max captured stdout characters per execution
|
||||
IVM_MAX_ACTIVE_PER_OWNER: z.string().optional().default('200'), // Max active executions per owner (per process)
|
||||
IVM_MAX_QUEUED_PER_OWNER: z.string().optional().default('2000'), // Max queued executions per owner (per process)
|
||||
IVM_MAX_OWNER_WEIGHT: z.string().optional().default('5'), // Max accepted weight for weighted owner scheduling
|
||||
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER:z.string().optional().default('2200'), // Max owner in-flight leases across replicas
|
||||
IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: z.string().optional().default('120000'), // Min TTL for distributed in-flight leases (ms)
|
||||
IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms)
|
||||
|
||||
// Knowledge Base Processing Configuration - Shared across all processing methods
|
||||
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
|
||||
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
|
||||
|
||||
@@ -103,6 +103,7 @@ export interface SecureFetchOptions {
|
||||
body?: string | Buffer | Uint8Array
|
||||
timeout?: number
|
||||
maxRedirects?: number
|
||||
maxResponseBytes?: number
|
||||
}
|
||||
|
||||
export class SecureFetchHeaders {
|
||||
@@ -165,6 +166,7 @@ export async function secureFetchWithPinnedIP(
|
||||
redirectCount = 0
|
||||
): Promise<SecureFetchResponse> {
|
||||
const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS
|
||||
const maxResponseBytes = options.maxResponseBytes
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const parsed = new URL(url)
|
||||
@@ -237,14 +239,32 @@ export async function secureFetchWithPinnedIP(
|
||||
}
|
||||
|
||||
const chunks: Buffer[] = []
|
||||
let totalBytes = 0
|
||||
let responseTerminated = false
|
||||
|
||||
res.on('data', (chunk: Buffer) => chunks.push(chunk))
|
||||
res.on('data', (chunk: Buffer) => {
|
||||
if (responseTerminated) return
|
||||
|
||||
totalBytes += chunk.length
|
||||
if (
|
||||
typeof maxResponseBytes === 'number' &&
|
||||
maxResponseBytes > 0 &&
|
||||
totalBytes > maxResponseBytes
|
||||
) {
|
||||
responseTerminated = true
|
||||
res.destroy(new Error(`Response exceeded maximum size of ${maxResponseBytes} bytes`))
|
||||
return
|
||||
}
|
||||
|
||||
chunks.push(chunk)
|
||||
})
|
||||
|
||||
res.on('error', (error) => {
|
||||
reject(error)
|
||||
})
|
||||
|
||||
res.on('end', () => {
|
||||
if (responseTerminated) return
|
||||
const bodyBuffer = Buffer.concat(chunks)
|
||||
const body = bodyBuffer.toString('utf-8')
|
||||
const headersRecord: Record<string, string> = {}
|
||||
|
||||
@@ -9,6 +9,21 @@ const USER_CODE_START_LINE = 4
|
||||
const pendingFetches = new Map()
|
||||
let fetchIdCounter = 0
|
||||
const FETCH_TIMEOUT_MS = 300000 // 5 minutes
|
||||
const MAX_STDOUT_CHARS = Number.parseInt(process.env.IVM_MAX_STDOUT_CHARS || '', 10) || 200000
|
||||
const MAX_FETCH_OPTIONS_JSON_CHARS =
|
||||
Number.parseInt(process.env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS || '', 10) || 256 * 1024
|
||||
|
||||
function stringifyLogValue(value) {
|
||||
if (typeof value !== 'object' || value === null) {
|
||||
return String(value)
|
||||
}
|
||||
|
||||
try {
|
||||
return JSON.stringify(value)
|
||||
} catch {
|
||||
return '[unserializable]'
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract line and column from error stack or message
|
||||
@@ -101,8 +116,32 @@ function convertToCompatibleError(errorInfo, userCode) {
|
||||
async function executeCode(request) {
|
||||
const { code, params, envVars, contextVariables, timeoutMs, requestId } = request
|
||||
const stdoutChunks = []
|
||||
let stdoutLength = 0
|
||||
let stdoutTruncated = false
|
||||
let isolate = null
|
||||
|
||||
const appendStdout = (line) => {
|
||||
if (stdoutTruncated || !line) return
|
||||
|
||||
const remaining = MAX_STDOUT_CHARS - stdoutLength
|
||||
if (remaining <= 0) {
|
||||
stdoutTruncated = true
|
||||
stdoutChunks.push('[stdout truncated]\n')
|
||||
return
|
||||
}
|
||||
|
||||
if (line.length <= remaining) {
|
||||
stdoutChunks.push(line)
|
||||
stdoutLength += line.length
|
||||
return
|
||||
}
|
||||
|
||||
stdoutChunks.push(line.slice(0, remaining))
|
||||
stdoutChunks.push('\n[stdout truncated]\n')
|
||||
stdoutLength = MAX_STDOUT_CHARS
|
||||
stdoutTruncated = true
|
||||
}
|
||||
|
||||
try {
|
||||
isolate = new ivm.Isolate({ memoryLimit: 128 })
|
||||
const context = await isolate.createContext()
|
||||
@@ -111,18 +150,14 @@ async function executeCode(request) {
|
||||
await jail.set('global', jail.derefInto())
|
||||
|
||||
const logCallback = new ivm.Callback((...args) => {
|
||||
const message = args
|
||||
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg)))
|
||||
.join(' ')
|
||||
stdoutChunks.push(`${message}\n`)
|
||||
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
|
||||
appendStdout(`${message}\n`)
|
||||
})
|
||||
await jail.set('__log', logCallback)
|
||||
|
||||
const errorCallback = new ivm.Callback((...args) => {
|
||||
const message = args
|
||||
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg)))
|
||||
.join(' ')
|
||||
stdoutChunks.push(`ERROR: ${message}\n`)
|
||||
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
|
||||
appendStdout(`ERROR: ${message}\n`)
|
||||
})
|
||||
await jail.set('__error', errorCallback)
|
||||
|
||||
@@ -178,6 +213,9 @@ async function executeCode(request) {
|
||||
} catch {
|
||||
throw new Error('fetch options must be JSON-serializable');
|
||||
}
|
||||
if (optionsJson.length > ${MAX_FETCH_OPTIONS_JSON_CHARS}) {
|
||||
throw new Error('fetch options exceed maximum payload size');
|
||||
}
|
||||
}
|
||||
const resultJson = await __fetchRef.apply(undefined, [url, optionsJson], { result: { promise: true } });
|
||||
let result;
|
||||
|
||||
500
apps/sim/lib/execution/isolated-vm.test.ts
Normal file
500
apps/sim/lib/execution/isolated-vm.test.ts
Normal file
@@ -0,0 +1,500 @@
|
||||
import { EventEmitter } from 'node:events'
|
||||
import { afterEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
type MockProc = EventEmitter & {
|
||||
connected: boolean
|
||||
stderr: EventEmitter
|
||||
send: (message: unknown) => boolean
|
||||
kill: () => boolean
|
||||
}
|
||||
|
||||
type SpawnFactory = () => MockProc
|
||||
type RedisEval = (...args: any[]) => unknown | Promise<unknown>
|
||||
type SecureFetchImpl = (...args: any[]) => unknown | Promise<unknown>
|
||||
|
||||
function createBaseProc(): MockProc {
|
||||
const proc = new EventEmitter() as MockProc
|
||||
proc.connected = true
|
||||
proc.stderr = new EventEmitter()
|
||||
proc.send = () => true
|
||||
proc.kill = () => {
|
||||
if (!proc.connected) return true
|
||||
proc.connected = false
|
||||
setImmediate(() => proc.emit('exit', 0))
|
||||
return true
|
||||
}
|
||||
return proc
|
||||
}
|
||||
|
||||
function createStartupFailureProc(): MockProc {
|
||||
const proc = createBaseProc()
|
||||
setImmediate(() => {
|
||||
proc.connected = false
|
||||
proc.emit('exit', 1)
|
||||
})
|
||||
return proc
|
||||
}
|
||||
|
||||
function createReadyProc(result: unknown): MockProc {
|
||||
const proc = createBaseProc()
|
||||
proc.send = (message: unknown) => {
|
||||
const msg = message as { type?: string; executionId?: number }
|
||||
if (msg.type === 'execute') {
|
||||
setImmediate(() => {
|
||||
proc.emit('message', {
|
||||
type: 'result',
|
||||
executionId: msg.executionId,
|
||||
result: { result, stdout: '' },
|
||||
})
|
||||
})
|
||||
}
|
||||
return true
|
||||
}
|
||||
setImmediate(() => proc.emit('message', { type: 'ready' }))
|
||||
return proc
|
||||
}
|
||||
|
||||
function createReadyProcWithDelay(delayMs: number): MockProc {
|
||||
const proc = createBaseProc()
|
||||
proc.send = (message: unknown) => {
|
||||
const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } }
|
||||
if (msg.type === 'execute') {
|
||||
setTimeout(() => {
|
||||
proc.emit('message', {
|
||||
type: 'result',
|
||||
executionId: msg.executionId,
|
||||
result: { result: msg.request?.requestId ?? 'unknown', stdout: '' },
|
||||
})
|
||||
}, delayMs)
|
||||
}
|
||||
return true
|
||||
}
|
||||
setImmediate(() => proc.emit('message', { type: 'ready' }))
|
||||
return proc
|
||||
}
|
||||
|
||||
function createReadyFetchProxyProc(fetchMessage: { url: string; optionsJson?: string }): MockProc {
|
||||
const proc = createBaseProc()
|
||||
let currentExecutionId = 0
|
||||
|
||||
proc.send = (message: unknown) => {
|
||||
const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } }
|
||||
|
||||
if (msg.type === 'execute') {
|
||||
currentExecutionId = msg.executionId ?? 0
|
||||
setImmediate(() => {
|
||||
proc.emit('message', {
|
||||
type: 'fetch',
|
||||
fetchId: 1,
|
||||
requestId: msg.request?.requestId ?? 'fetch-test',
|
||||
url: fetchMessage.url,
|
||||
optionsJson: fetchMessage.optionsJson,
|
||||
})
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
if (msg.type === 'fetchResponse') {
|
||||
const fetchResponse = message as { response?: string }
|
||||
setImmediate(() => {
|
||||
proc.emit('message', {
|
||||
type: 'result',
|
||||
executionId: currentExecutionId,
|
||||
result: { result: fetchResponse.response ?? '', stdout: '' },
|
||||
})
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
setImmediate(() => proc.emit('message', { type: 'ready' }))
|
||||
return proc
|
||||
}
|
||||
|
||||
async function loadExecutionModule(options: {
|
||||
envOverrides?: Record<string, string>
|
||||
spawns: SpawnFactory[]
|
||||
redisEvalImpl?: RedisEval
|
||||
secureFetchImpl?: SecureFetchImpl
|
||||
}) {
|
||||
vi.resetModules()
|
||||
|
||||
const spawnQueue = [...options.spawns]
|
||||
const spawnMock = vi.fn(() => {
|
||||
const next = spawnQueue.shift()
|
||||
if (!next) {
|
||||
throw new Error('No mock spawn factory configured')
|
||||
}
|
||||
return next() as any
|
||||
})
|
||||
|
||||
vi.doMock('@sim/logger', () => ({
|
||||
createLogger: () => ({
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
}),
|
||||
}))
|
||||
|
||||
const secureFetchMock = vi.fn(
|
||||
options.secureFetchImpl ??
|
||||
(async () => ({
|
||||
ok: true,
|
||||
status: 200,
|
||||
statusText: 'OK',
|
||||
headers: new Map<string, string>(),
|
||||
text: async () => '',
|
||||
json: async () => ({}),
|
||||
arrayBuffer: async () => new ArrayBuffer(0),
|
||||
}))
|
||||
)
|
||||
vi.doMock('@/lib/core/security/input-validation.server', () => ({
|
||||
secureFetchWithValidation: secureFetchMock,
|
||||
}))
|
||||
|
||||
vi.doMock('@/lib/core/config/env', () => ({
|
||||
env: {
|
||||
IVM_POOL_SIZE: '1',
|
||||
IVM_MAX_CONCURRENT: '100',
|
||||
IVM_MAX_PER_WORKER: '100',
|
||||
IVM_WORKER_IDLE_TIMEOUT_MS: '60000',
|
||||
IVM_MAX_QUEUE_SIZE: '10',
|
||||
IVM_MAX_ACTIVE_PER_OWNER: '100',
|
||||
IVM_MAX_QUEUED_PER_OWNER: '10',
|
||||
IVM_MAX_OWNER_WEIGHT: '5',
|
||||
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '100',
|
||||
IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: '1000',
|
||||
IVM_QUEUE_TIMEOUT_MS: '1000',
|
||||
...(options.envOverrides ?? {}),
|
||||
},
|
||||
}))
|
||||
|
||||
const redisEval = options.redisEvalImpl ? vi.fn(options.redisEvalImpl) : undefined
|
||||
vi.doMock('@/lib/core/config/redis', () => ({
|
||||
getRedisClient: vi.fn(() =>
|
||||
redisEval
|
||||
? ({
|
||||
eval: redisEval,
|
||||
} as any)
|
||||
: null
|
||||
),
|
||||
}))
|
||||
|
||||
vi.doMock('node:child_process', () => ({
|
||||
execSync: vi.fn(() => Buffer.from('v23.11.0')),
|
||||
spawn: spawnMock,
|
||||
}))
|
||||
|
||||
const mod = await import('./isolated-vm')
|
||||
return { ...mod, spawnMock, secureFetchMock }
|
||||
}
|
||||
|
||||
describe('isolated-vm scheduler', () => {
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks()
|
||||
vi.resetModules()
|
||||
})
|
||||
|
||||
it('recovers from an initial spawn failure and drains queued work', async () => {
|
||||
const { executeInIsolatedVM, spawnMock } = await loadExecutionModule({
|
||||
spawns: [createStartupFailureProc, () => createReadyProc('ok')],
|
||||
})
|
||||
|
||||
const result = await executeInIsolatedVM({
|
||||
code: 'return "ok"',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-1',
|
||||
})
|
||||
|
||||
expect(result.error).toBeUndefined()
|
||||
expect(result.result).toBe('ok')
|
||||
expect(spawnMock).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('rejects new requests when the queue is full', async () => {
|
||||
const { executeInIsolatedVM } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
IVM_MAX_QUEUE_SIZE: '1',
|
||||
IVM_QUEUE_TIMEOUT_MS: '200',
|
||||
},
|
||||
spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc],
|
||||
})
|
||||
|
||||
const firstPromise = executeInIsolatedVM({
|
||||
code: 'return 1',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-2',
|
||||
ownerKey: 'user:a',
|
||||
})
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 25))
|
||||
|
||||
const second = await executeInIsolatedVM({
|
||||
code: 'return 2',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-3',
|
||||
ownerKey: 'user:b',
|
||||
})
|
||||
|
||||
expect(second.error?.message).toContain('at capacity')
|
||||
|
||||
const first = await firstPromise
|
||||
expect(first.error?.message).toContain('timed out waiting')
|
||||
})
|
||||
|
||||
it('enforces per-owner queued limit', async () => {
|
||||
const { executeInIsolatedVM } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
IVM_MAX_QUEUED_PER_OWNER: '1',
|
||||
IVM_QUEUE_TIMEOUT_MS: '200',
|
||||
},
|
||||
spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc],
|
||||
})
|
||||
|
||||
const firstPromise = executeInIsolatedVM({
|
||||
code: 'return 1',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-4',
|
||||
ownerKey: 'user:hog',
|
||||
})
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 25))
|
||||
|
||||
const second = await executeInIsolatedVM({
|
||||
code: 'return 2',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-5',
|
||||
ownerKey: 'user:hog',
|
||||
})
|
||||
|
||||
expect(second.error?.message).toContain('Too many concurrent')
|
||||
|
||||
const first = await firstPromise
|
||||
expect(first.error?.message).toContain('timed out waiting')
|
||||
})
|
||||
|
||||
it('enforces distributed owner in-flight lease limit when Redis is configured', async () => {
|
||||
const { executeInIsolatedVM } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '1',
|
||||
REDIS_URL: 'redis://localhost:6379',
|
||||
},
|
||||
spawns: [() => createReadyProc('ok')],
|
||||
redisEvalImpl: (...args: any[]) => {
|
||||
const script = String(args[0] ?? '')
|
||||
if (script.includes('ZREMRANGEBYSCORE')) {
|
||||
return 0
|
||||
}
|
||||
return 1
|
||||
},
|
||||
})
|
||||
|
||||
const result = await executeInIsolatedVM({
|
||||
code: 'return "blocked"',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-6',
|
||||
ownerKey: 'user:distributed',
|
||||
})
|
||||
|
||||
expect(result.error?.message).toContain('Too many concurrent')
|
||||
})
|
||||
|
||||
it('fails closed when Redis is configured but unavailable', async () => {
|
||||
const { executeInIsolatedVM } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
REDIS_URL: 'redis://localhost:6379',
|
||||
},
|
||||
spawns: [() => createReadyProc('ok')],
|
||||
})
|
||||
|
||||
const result = await executeInIsolatedVM({
|
||||
code: 'return "blocked"',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-7',
|
||||
ownerKey: 'user:redis-down',
|
||||
})
|
||||
|
||||
expect(result.error?.message).toContain('temporarily unavailable')
|
||||
})
|
||||
|
||||
it('fails closed when Redis lease evaluation errors', async () => {
|
||||
const { executeInIsolatedVM } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
REDIS_URL: 'redis://localhost:6379',
|
||||
},
|
||||
spawns: [() => createReadyProc('ok')],
|
||||
redisEvalImpl: (...args: any[]) => {
|
||||
const script = String(args[0] ?? '')
|
||||
if (script.includes('ZREMRANGEBYSCORE')) {
|
||||
throw new Error('redis timeout')
|
||||
}
|
||||
return 1
|
||||
},
|
||||
})
|
||||
|
||||
const result = await executeInIsolatedVM({
|
||||
code: 'return "blocked"',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-8',
|
||||
ownerKey: 'user:redis-error',
|
||||
})
|
||||
|
||||
expect(result.error?.message).toContain('temporarily unavailable')
|
||||
})
|
||||
|
||||
it('applies weighted owner scheduling when draining queued executions', async () => {
|
||||
const { executeInIsolatedVM } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
IVM_MAX_PER_WORKER: '1',
|
||||
},
|
||||
spawns: [() => createReadyProcWithDelay(10)],
|
||||
})
|
||||
|
||||
const completionOrder: string[] = []
|
||||
const pushCompletion = (label: string) => (res: { result: unknown }) => {
|
||||
completionOrder.push(String(res.result ?? label))
|
||||
return res
|
||||
}
|
||||
|
||||
const p1 = executeInIsolatedVM({
|
||||
code: 'return 1',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 500,
|
||||
requestId: 'a-1',
|
||||
ownerKey: 'user:a',
|
||||
ownerWeight: 2,
|
||||
}).then(pushCompletion('a-1'))
|
||||
|
||||
const p2 = executeInIsolatedVM({
|
||||
code: 'return 2',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 500,
|
||||
requestId: 'a-2',
|
||||
ownerKey: 'user:a',
|
||||
ownerWeight: 2,
|
||||
}).then(pushCompletion('a-2'))
|
||||
|
||||
const p3 = executeInIsolatedVM({
|
||||
code: 'return 3',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 500,
|
||||
requestId: 'b-1',
|
||||
ownerKey: 'user:b',
|
||||
ownerWeight: 1,
|
||||
}).then(pushCompletion('b-1'))
|
||||
|
||||
const p4 = executeInIsolatedVM({
|
||||
code: 'return 4',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 500,
|
||||
requestId: 'b-2',
|
||||
ownerKey: 'user:b',
|
||||
ownerWeight: 1,
|
||||
}).then(pushCompletion('b-2'))
|
||||
|
||||
const p5 = executeInIsolatedVM({
|
||||
code: 'return 5',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 500,
|
||||
requestId: 'a-3',
|
||||
ownerKey: 'user:a',
|
||||
ownerWeight: 2,
|
||||
}).then(pushCompletion('a-3'))
|
||||
|
||||
await Promise.all([p1, p2, p3, p4, p5])
|
||||
|
||||
expect(completionOrder.slice(0, 3)).toEqual(['a-1', 'a-2', 'a-3'])
|
||||
expect(completionOrder).toEqual(['a-1', 'a-2', 'a-3', 'b-1', 'b-2'])
|
||||
})
|
||||
|
||||
it('rejects oversized fetch options payloads before outbound call', async () => {
|
||||
const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
IVM_MAX_FETCH_OPTIONS_JSON_CHARS: '50',
|
||||
},
|
||||
spawns: [
|
||||
() =>
|
||||
createReadyFetchProxyProc({
|
||||
url: 'https://example.com',
|
||||
optionsJson: 'x'.repeat(100),
|
||||
}),
|
||||
],
|
||||
})
|
||||
|
||||
const result = await executeInIsolatedVM({
|
||||
code: 'return "fetch-options"',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-fetch-options',
|
||||
})
|
||||
|
||||
const payload = JSON.parse(String(result.result))
|
||||
expect(payload.error).toContain('Fetch options exceed maximum payload size')
|
||||
expect(secureFetchMock).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('rejects overly long fetch URLs before outbound call', async () => {
|
||||
const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({
|
||||
envOverrides: {
|
||||
IVM_MAX_FETCH_URL_LENGTH: '30',
|
||||
},
|
||||
spawns: [
|
||||
() =>
|
||||
createReadyFetchProxyProc({
|
||||
url: 'https://example.com/path/to/a/very/long/resource',
|
||||
}),
|
||||
],
|
||||
})
|
||||
|
||||
const result = await executeInIsolatedVM({
|
||||
code: 'return "fetch-url"',
|
||||
params: {},
|
||||
envVars: {},
|
||||
contextVariables: {},
|
||||
timeoutMs: 100,
|
||||
requestId: 'req-fetch-url',
|
||||
})
|
||||
|
||||
const payload = JSON.parse(String(result.result))
|
||||
expect(payload.error).toContain('fetch URL exceeds maximum length')
|
||||
expect(secureFetchMock).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
File diff suppressed because it is too large
Load Diff
@@ -124,6 +124,7 @@ export interface PreprocessExecutionOptions {
|
||||
workspaceId?: string // If known, used for billing resolution
|
||||
loggingSession?: LoggingSession // If provided, will be used for error logging
|
||||
isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
|
||||
useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys)
|
||||
/** @deprecated No longer used - background/async executions always use deployed state */
|
||||
useDraftState?: boolean
|
||||
}
|
||||
@@ -170,6 +171,7 @@ export async function preprocessExecution(
|
||||
workspaceId: providedWorkspaceId,
|
||||
loggingSession: providedLoggingSession,
|
||||
isResumeContext = false,
|
||||
useAuthenticatedUserAsActor = false,
|
||||
} = options
|
||||
|
||||
logger.info(`[${requestId}] Starting execution preprocessing`, {
|
||||
@@ -257,7 +259,14 @@ export async function preprocessExecution(
|
||||
let actorUserId: string | null = null
|
||||
|
||||
try {
|
||||
if (workspaceId) {
|
||||
// For client-side executions and personal API keys, the authenticated
|
||||
// user is the billing and permission actor — not the workspace owner.
|
||||
if (useAuthenticatedUserAsActor && userId) {
|
||||
actorUserId = userId
|
||||
logger.info(`[${requestId}] Using authenticated user as actor: ${actorUserId}`)
|
||||
}
|
||||
|
||||
if (!actorUserId && workspaceId) {
|
||||
actorUserId = await getWorkspaceBilledAccountUserId(workspaceId)
|
||||
if (actorUserId) {
|
||||
logger.info(`[${requestId}] Using workspace billed account: ${actorUserId}`)
|
||||
|
||||
@@ -247,7 +247,8 @@ export async function executeTool(
|
||||
// If it's a custom tool, use the async version with workflowId
|
||||
if (isCustomTool(normalizedToolId)) {
|
||||
const workflowId = params._context?.workflowId
|
||||
tool = await getToolAsync(normalizedToolId, workflowId)
|
||||
const userId = params._context?.userId
|
||||
tool = await getToolAsync(normalizedToolId, workflowId, userId)
|
||||
if (!tool) {
|
||||
logger.error(`[${requestId}] Custom tool not found: ${normalizedToolId}`)
|
||||
}
|
||||
@@ -286,26 +287,25 @@ export async function executeTool(
|
||||
try {
|
||||
const baseUrl = getBaseUrl()
|
||||
|
||||
const workflowId = contextParams._context?.workflowId
|
||||
const userId = contextParams._context?.userId
|
||||
|
||||
const tokenPayload: OAuthTokenPayload = {
|
||||
credentialId: contextParams.credential as string,
|
||||
}
|
||||
|
||||
// Add workflowId if it exists in params, context, or executionContext
|
||||
const workflowId =
|
||||
contextParams.workflowId ||
|
||||
contextParams._context?.workflowId ||
|
||||
executionContext?.workflowId
|
||||
if (workflowId) {
|
||||
tokenPayload.workflowId = workflowId
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] Fetching access token from ${baseUrl}/api/auth/oauth/token`)
|
||||
|
||||
// Build token URL and also include workflowId in query so server auth can read it
|
||||
const tokenUrlObj = new URL('/api/auth/oauth/token', baseUrl)
|
||||
if (workflowId) {
|
||||
tokenUrlObj.searchParams.set('workflowId', workflowId)
|
||||
}
|
||||
if (userId) {
|
||||
tokenUrlObj.searchParams.set('userId', userId)
|
||||
}
|
||||
|
||||
// Always send Content-Type; add internal auth on server-side runs
|
||||
const tokenHeaders: Record<string, string> = { 'Content-Type': 'application/json' }
|
||||
@@ -609,6 +609,10 @@ async function executeToolRequest(
|
||||
if (workflowId) {
|
||||
fullUrlObj.searchParams.set('workflowId', workflowId)
|
||||
}
|
||||
const userId = params._context?.userId
|
||||
if (userId) {
|
||||
fullUrlObj.searchParams.set('userId', userId)
|
||||
}
|
||||
}
|
||||
|
||||
const fullUrl = fullUrlObj.toString()
|
||||
|
||||
@@ -311,7 +311,8 @@ export function getTool(toolId: string): ToolConfig | undefined {
|
||||
// Get a tool by its ID asynchronously (supports server-side)
|
||||
export async function getToolAsync(
|
||||
toolId: string,
|
||||
workflowId?: string
|
||||
workflowId?: string,
|
||||
userId?: string
|
||||
): Promise<ToolConfig | undefined> {
|
||||
// Check for built-in tools
|
||||
const builtInTool = tools[toolId]
|
||||
@@ -319,7 +320,7 @@ export async function getToolAsync(
|
||||
|
||||
// Check if it's a custom tool
|
||||
if (isCustomTool(toolId)) {
|
||||
return fetchCustomToolFromAPI(toolId, workflowId)
|
||||
return fetchCustomToolFromAPI(toolId, workflowId, userId)
|
||||
}
|
||||
|
||||
return undefined
|
||||
@@ -366,7 +367,8 @@ function createToolConfig(customTool: any, customToolId: string): ToolConfig {
|
||||
// Create a tool config from a custom tool definition by fetching from API
|
||||
async function fetchCustomToolFromAPI(
|
||||
customToolId: string,
|
||||
workflowId?: string
|
||||
workflowId?: string,
|
||||
userId?: string
|
||||
): Promise<ToolConfig | undefined> {
|
||||
const identifier = customToolId.replace('custom_', '')
|
||||
|
||||
@@ -374,10 +376,12 @@ async function fetchCustomToolFromAPI(
|
||||
const baseUrl = getBaseUrl()
|
||||
const url = new URL('/api/tools/custom', baseUrl)
|
||||
|
||||
// Add workflowId as a query parameter if available
|
||||
if (workflowId) {
|
||||
url.searchParams.append('workflowId', workflowId)
|
||||
}
|
||||
if (userId) {
|
||||
url.searchParams.append('userId', userId)
|
||||
}
|
||||
|
||||
// For server-side calls (during workflow execution), use internal JWT token
|
||||
const headers: Record<string, string> = {}
|
||||
|
||||
@@ -139,7 +139,25 @@ app:
|
||||
EXECUTION_TIMEOUT_ASYNC_PRO: "5400" # Pro tier async timeout (90 minutes)
|
||||
EXECUTION_TIMEOUT_ASYNC_TEAM: "5400" # Team tier async timeout (90 minutes)
|
||||
EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: "5400" # Enterprise tier async timeout (90 minutes)
|
||||
|
||||
|
||||
# Isolated-VM Worker Pool Configuration
|
||||
IVM_POOL_SIZE: "4" # Max worker processes in pool
|
||||
IVM_MAX_CONCURRENT: "10000" # Max concurrent executions globally
|
||||
IVM_MAX_PER_WORKER: "2500" # Max concurrent executions per worker
|
||||
IVM_WORKER_IDLE_TIMEOUT_MS: "60000" # Worker idle cleanup timeout (ms)
|
||||
IVM_QUEUE_TIMEOUT_MS: "300000" # Max queue wait before rejection (ms)
|
||||
IVM_MAX_QUEUE_SIZE: "10000" # Max queued executions globally
|
||||
IVM_MAX_ACTIVE_PER_OWNER: "200" # Max concurrent executions per user
|
||||
IVM_MAX_QUEUED_PER_OWNER: "2000" # Max queued executions per user
|
||||
IVM_MAX_OWNER_WEIGHT: "5" # Max scheduling weight per user
|
||||
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: "2200" # Max in-flight per user across instances (Redis)
|
||||
IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: "120000" # Min distributed lease TTL (ms)
|
||||
IVM_MAX_FETCH_RESPONSE_BYTES: "8388608" # Max fetch response size (8MB)
|
||||
IVM_MAX_FETCH_RESPONSE_CHARS: "4000000" # Max fetch response chars
|
||||
IVM_MAX_FETCH_URL_LENGTH: "8192" # Max fetch URL length
|
||||
IVM_MAX_FETCH_OPTIONS_JSON_CHARS: "262144" # Max fetch options payload (256KB)
|
||||
IVM_MAX_STDOUT_CHARS: "200000" # Max stdout capture per execution
|
||||
|
||||
# UI Branding & Whitelabeling Configuration
|
||||
NEXT_PUBLIC_BRAND_NAME: "Sim" # Custom brand name
|
||||
NEXT_PUBLIC_BRAND_LOGO_URL: "" # Custom logo URL (leave empty for default)
|
||||
|
||||
Reference in New Issue
Block a user