Files
sim/apps/sim/executor/execution/block-executor.ts
Vikhyath Mondreti 47eb060311 feat(enterprise): permission groups, access control (#2736)
* feat(permission-groups): integration/model access controls for enterprise

* feat: enterprise gating for BYOK, SSO, credential sets with org admin/owner checks

* execution time enforcement of mcp and custom tools

* add admin routes to cleanup permission group data

* fix not being on enterprise checks

* separate out orgs from billing system

* update the docs

* add custom tool blockers based on perm configs

* add migrations

* fix

* address greptile comments

* regen migrations

* fix default model picking based on user config

* cleaned up UI
2026-01-09 20:16:22 -08:00

684 lines
19 KiB
TypeScript

import { db } from '@sim/db'
import { mcpServers } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull } from 'drizzle-orm'
import { getBaseUrl } from '@/lib/core/utils/urls'
import {
BlockType,
buildResumeApiUrl,
buildResumeUiUrl,
DEFAULTS,
EDGE,
isSentinelBlockType,
} from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import {
generatePauseContextId,
mapNodeMetadataToPauseScopes,
} from '@/executor/human-in-the-loop/utils.ts'
import type {
BlockHandler,
BlockLog,
BlockState,
ExecutionContext,
NormalizedBlockOutput,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
import { validateBlockType } from '@/executor/utils/permission-check'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedBlock } from '@/serializer/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('BlockExecutor')
export class BlockExecutor {
constructor(
private blockHandlers: BlockHandler[],
private resolver: VariableResolver,
private contextExtensions: ContextExtensions,
private state: BlockStateWriter
) {}
async execute(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock
): Promise<NormalizedBlockOutput> {
const handler = this.findHandler(block)
if (!handler) {
throw buildBlockExecutionError({
block,
context: ctx,
error: `No handler found for block type: ${block.metadata?.id ?? 'unknown'}`,
})
}
const blockType = block.metadata?.id ?? ''
const isSentinel = isSentinelBlockType(blockType)
let blockLog: BlockLog | undefined
if (!isSentinel) {
blockLog = this.createBlockLog(ctx, node.id, block, node)
ctx.blockLogs.push(blockLog)
this.callOnBlockStart(ctx, node, block)
}
const startTime = Date.now()
let resolvedInputs: Record<string, any> = {}
const nodeMetadata = this.buildNodeMetadata(node)
let cleanupSelfReference: (() => void) | undefined
if (block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP) {
cleanupSelfReference = this.preparePauseResumeSelfReference(ctx, node, block, nodeMetadata)
}
try {
if (!isSentinel && blockType) {
await validateBlockType(ctx.userId, blockType, ctx)
}
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
if (block.metadata?.id === BlockType.AGENT && resolvedInputs.tools) {
resolvedInputs = await this.filterUnavailableMcpToolsForLog(ctx, resolvedInputs)
}
if (blockLog) {
blockLog.input = resolvedInputs
}
} catch (error) {
cleanupSelfReference?.()
return this.handleBlockError(
error,
ctx,
node,
block,
startTime,
blockLog,
resolvedInputs,
isSentinel,
'input_resolution'
)
}
cleanupSelfReference?.()
try {
const output = handler.executeWithNode
? await handler.executeWithNode(ctx, block, resolvedInputs, nodeMetadata)
: await handler.execute(ctx, block, resolvedInputs)
const isStreamingExecution =
output && typeof output === 'object' && 'stream' in output && 'execution' in output
let normalizedOutput: NormalizedBlockOutput
if (isStreamingExecution) {
const streamingExec = output as { stream: ReadableStream; execution: any }
if (ctx.onStream) {
await this.handleStreamingExecution(
ctx,
node,
block,
streamingExec,
resolvedInputs,
ctx.selectedOutputs ?? []
)
}
normalizedOutput = this.normalizeOutput(
streamingExec.execution.output ?? streamingExec.execution
)
} else {
normalizedOutput = this.normalizeOutput(output)
}
const duration = Date.now() - startTime
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = true
blockLog.output = this.filterOutputForLog(block, normalizedOutput)
}
this.state.setBlockOutput(node.id, normalizedOutput, duration)
if (!isSentinel) {
const filteredOutput = this.filterOutputForLog(block, normalizedOutput)
this.callOnBlockComplete(ctx, node, block, resolvedInputs, filteredOutput, duration)
}
return normalizedOutput
} catch (error) {
return this.handleBlockError(
error,
ctx,
node,
block,
startTime,
blockLog,
resolvedInputs,
isSentinel,
'execution'
)
}
}
private buildNodeMetadata(node: DAGNode): {
nodeId: string
loopId?: string
parallelId?: string
branchIndex?: number
branchTotal?: number
} {
const metadata = node?.metadata ?? {}
return {
nodeId: node.id,
loopId: metadata.loopId,
parallelId: metadata.parallelId,
branchIndex: metadata.branchIndex,
branchTotal: metadata.branchTotal,
}
}
private findHandler(block: SerializedBlock): BlockHandler | undefined {
return this.blockHandlers.find((h) => h.canHandle(block))
}
private handleBlockError(
error: unknown,
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
startTime: number,
blockLog: BlockLog | undefined,
resolvedInputs: Record<string, any>,
isSentinel: boolean,
phase: 'input_resolution' | 'execution'
): NormalizedBlockOutput {
const duration = Date.now() - startTime
const errorMessage = normalizeError(error)
const hasResolvedInputs =
resolvedInputs && typeof resolvedInputs === 'object' && Object.keys(resolvedInputs).length > 0
const input =
hasResolvedInputs && resolvedInputs
? resolvedInputs
: ((block.config?.params as Record<string, any> | undefined) ?? {})
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = input
}
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
errorOutput.childTraceSpans = (error as any).childTraceSpans
}
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error(
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
{
blockId: node.id,
blockType: block.metadata?.id,
error: errorMessage,
}
)
if (!isSentinel) {
this.callOnBlockComplete(ctx, node, block, input, errorOutput, duration)
}
const hasErrorPort = this.hasErrorPortEdge(node)
if (hasErrorPort) {
logger.info('Block has error port - returning error output instead of throwing', {
blockId: node.id,
error: errorMessage,
})
return errorOutput
}
const errorToThrow = error instanceof Error ? error : new Error(errorMessage)
throw buildBlockExecutionError({
block,
error: errorToThrow,
context: ctx,
additionalInfo: {
nodeId: node.id,
executionTime: duration,
},
})
}
private hasErrorPortEdge(node: DAGNode): boolean {
for (const [_, edge] of node.outgoingEdges) {
if (edge.sourceHandle === EDGE.ERROR) {
return true
}
}
return false
}
private createBlockLog(
ctx: ExecutionContext,
blockId: string,
block: SerializedBlock,
node: DAGNode
): BlockLog {
let blockName = block.metadata?.name ?? blockId
let loopId: string | undefined
let parallelId: string | undefined
let iterationIndex: number | undefined
if (node?.metadata) {
if (node.metadata.branchIndex !== undefined && node.metadata.parallelId) {
blockName = `${blockName} (iteration ${node.metadata.branchIndex})`
iterationIndex = node.metadata.branchIndex
parallelId = node.metadata.parallelId
} else if (node.metadata.isLoopNode && node.metadata.loopId) {
loopId = node.metadata.loopId
const loopScope = ctx.loopExecutions?.get(loopId)
if (loopScope && loopScope.iteration !== undefined) {
blockName = `${blockName} (iteration ${loopScope.iteration})`
iterationIndex = loopScope.iteration
} else {
logger.warn('Loop scope not found for block', { blockId, loopId })
}
}
}
return {
blockId,
blockName,
blockType: block.metadata?.id ?? DEFAULTS.BLOCK_TYPE,
startedAt: new Date().toISOString(),
endedAt: '',
durationMs: 0,
success: false,
loopId,
parallelId,
iterationIndex,
}
}
private normalizeOutput(output: unknown): NormalizedBlockOutput {
if (output === null || output === undefined) {
return {}
}
if (typeof output === 'object' && !Array.isArray(output)) {
return output as NormalizedBlockOutput
}
return { result: output }
}
private filterOutputForLog(
block: SerializedBlock,
output: NormalizedBlockOutput
): NormalizedBlockOutput {
if (block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP) {
const filtered: NormalizedBlockOutput = {}
for (const [key, value] of Object.entries(output)) {
if (key.startsWith('_')) continue
if (key === 'response') continue
filtered[key] = value
}
return filtered
}
const isTrigger =
block.metadata?.category === 'triggers' ||
block.config?.params?.triggerMode === true ||
block.metadata?.id === BlockType.STARTER
if (isTrigger) {
const filtered: NormalizedBlockOutput = {}
const internalKeys = ['webhook', 'workflowId']
for (const [key, value] of Object.entries(output)) {
if (internalKeys.includes(key)) continue
filtered[key] = value
}
return filtered
}
return output
}
private callOnBlockStart(ctx: ExecutionContext, node: DAGNode, block: SerializedBlock): void {
const blockId = node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
const iterationContext = this.getIterationContext(ctx, node)
if (this.contextExtensions.onBlockStart) {
this.contextExtensions.onBlockStart(blockId, blockName, blockType, iterationContext)
}
}
private callOnBlockComplete(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
input: Record<string, any>,
output: NormalizedBlockOutput,
duration: number
): void {
const blockId = node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
const iterationContext = this.getIterationContext(ctx, node)
if (this.contextExtensions.onBlockComplete) {
this.contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
input,
output,
executionTime: duration,
},
iterationContext
)
}
}
private getIterationContext(
ctx: ExecutionContext,
node: DAGNode
): { iterationCurrent: number; iterationTotal: number; iterationType: SubflowType } | undefined {
if (!node?.metadata) return undefined
if (node.metadata.branchIndex !== undefined && node.metadata.branchTotal) {
return {
iterationCurrent: node.metadata.branchIndex,
iterationTotal: node.metadata.branchTotal,
iterationType: 'parallel',
}
}
if (node.metadata.isLoopNode && node.metadata.loopId) {
const loopScope = ctx.loopExecutions?.get(node.metadata.loopId)
if (loopScope && loopScope.iteration !== undefined && loopScope.maxIterations) {
return {
iterationCurrent: loopScope.iteration,
iterationTotal: loopScope.maxIterations,
iterationType: 'loop',
}
}
}
return undefined
}
/**
* Filters out unavailable MCP tools from agent inputs for logging.
* Only includes tools from servers with 'connected' status.
*/
private async filterUnavailableMcpToolsForLog(
ctx: ExecutionContext,
inputs: Record<string, any>
): Promise<Record<string, any>> {
const tools = inputs.tools
if (!Array.isArray(tools) || tools.length === 0) return inputs
const mcpTools = tools.filter((t: any) => t.type === 'mcp')
if (mcpTools.length === 0) return inputs
const serverIds = [
...new Set(mcpTools.map((t: any) => t.params?.serverId).filter(Boolean)),
] as string[]
if (serverIds.length === 0) return inputs
const availableServerIds = new Set<string>()
if (ctx.workspaceId && serverIds.length > 0) {
try {
const servers = await db
.select({ id: mcpServers.id, connectionStatus: mcpServers.connectionStatus })
.from(mcpServers)
.where(
and(
eq(mcpServers.workspaceId, ctx.workspaceId),
inArray(mcpServers.id, serverIds),
isNull(mcpServers.deletedAt)
)
)
for (const server of servers) {
if (server.connectionStatus === 'connected') {
availableServerIds.add(server.id)
}
}
} catch (error) {
logger.warn('Failed to check MCP server availability for logging:', error)
return inputs
}
}
const filteredTools = tools.filter((tool: any) => {
if (tool.type !== 'mcp') return true
const serverId = tool.params?.serverId
if (!serverId) return false
return availableServerIds.has(serverId)
})
return { ...inputs, tools: filteredTools }
}
private preparePauseResumeSelfReference(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
nodeMetadata: {
nodeId: string
loopId?: string
parallelId?: string
branchIndex?: number
branchTotal?: number
}
): (() => void) | undefined {
const blockId = node.id
const existingState = ctx.blockStates.get(blockId)
if (existingState?.executed) {
return undefined
}
const executionId = ctx.executionId ?? ctx.metadata?.executionId
const workflowId = ctx.workflowId
if (!executionId || !workflowId) {
return undefined
}
const { loopScope } = mapNodeMetadataToPauseScopes(ctx, nodeMetadata)
const contextId = generatePauseContextId(block.id, nodeMetadata, loopScope)
let resumeLinks: { apiUrl: string; uiUrl: string }
try {
const baseUrl = getBaseUrl()
resumeLinks = {
apiUrl: buildResumeApiUrl(baseUrl, workflowId, executionId, contextId),
uiUrl: buildResumeUiUrl(baseUrl, workflowId, executionId),
}
} catch {
resumeLinks = {
apiUrl: buildResumeApiUrl(undefined, workflowId, executionId, contextId),
uiUrl: buildResumeUiUrl(undefined, workflowId, executionId),
}
}
let previousState: BlockState | undefined
if (existingState) {
previousState = { ...existingState }
}
const hadPrevious = existingState !== undefined
const placeholderState: BlockState = {
output: {
url: resumeLinks.uiUrl,
resumeEndpoint: resumeLinks.apiUrl,
},
executed: false,
executionTime: existingState?.executionTime ?? 0,
}
this.state.setBlockState(blockId, placeholderState)
return () => {
if (hadPrevious && previousState) {
this.state.setBlockState(blockId, previousState)
} else {
this.state.deleteBlockState(blockId)
}
}
}
private async handleStreamingExecution(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
streamingExec: { stream: ReadableStream; execution: any },
resolvedInputs: Record<string, any>,
selectedOutputs: string[]
): Promise<void> {
const blockId = node.id
const responseFormat =
resolvedInputs?.responseFormat ??
(block.config?.params as Record<string, any> | undefined)?.responseFormat ??
(block.config as Record<string, any> | undefined)?.responseFormat
const stream = streamingExec.stream
if (typeof stream.tee !== 'function') {
await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
return
}
const [clientStream, executorStream] = stream.tee()
const processedClientStream = streamingResponseFormatProcessor.processStream(
clientStream,
blockId,
selectedOutputs,
responseFormat
)
const clientStreamingExec = {
...streamingExec,
stream: processedClientStream,
}
const executorConsumption = this.consumeExecutorStream(
executorStream,
streamingExec,
blockId,
responseFormat
)
const clientConsumption = (async () => {
try {
await ctx.onStream?.(clientStreamingExec)
} catch (error) {
logger.error('Error in onStream callback', { blockId, error })
}
})()
await Promise.all([clientConsumption, executorConsumption])
}
private async forwardStream(
ctx: ExecutionContext,
blockId: string,
streamingExec: { stream: ReadableStream; execution: any },
stream: ReadableStream,
responseFormat: any,
selectedOutputs: string[]
): Promise<void> {
const processedStream = streamingResponseFormatProcessor.processStream(
stream,
blockId,
selectedOutputs,
responseFormat
)
try {
await ctx.onStream?.({
...streamingExec,
stream: processedStream,
})
} catch (error) {
logger.error('Error in onStream callback', { blockId, error })
}
}
private async consumeExecutorStream(
stream: ReadableStream,
streamingExec: { execution: any },
blockId: string,
responseFormat: any
): Promise<void> {
const reader = stream.getReader()
const decoder = new TextDecoder()
let fullContent = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
fullContent += decoder.decode(value, { stream: true })
}
} catch (error) {
logger.error('Error reading executor stream for block', { blockId, error })
} finally {
try {
reader.releaseLock()
} catch {}
}
if (!fullContent) {
return
}
const executionOutput = streamingExec.execution?.output
if (!executionOutput || typeof executionOutput !== 'object') {
return
}
if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())
streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
}
return
} catch (error) {
logger.warn('Failed to parse streamed content for response format', { blockId, error })
}
}
executionOutput.content = fullContent
}
}