mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-27 15:58:11 -05:00
654 lines
18 KiB
TypeScript
654 lines
18 KiB
TypeScript
import { createLogger } from '@sim/logger'
|
|
import { getBaseUrl } from '@/lib/core/utils/urls'
|
|
import {
|
|
containsUserFileWithMetadata,
|
|
hydrateUserFilesWithBase64,
|
|
} from '@/lib/uploads/utils/user-file-base64.server'
|
|
import {
|
|
BlockType,
|
|
buildResumeApiUrl,
|
|
buildResumeUiUrl,
|
|
DEFAULTS,
|
|
EDGE,
|
|
isSentinelBlockType,
|
|
isTriggerBehavior,
|
|
isWorkflowBlockType,
|
|
} from '@/executor/constants'
|
|
import type { DAGNode } from '@/executor/dag/builder'
|
|
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
|
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
|
|
import {
|
|
generatePauseContextId,
|
|
mapNodeMetadataToPauseScopes,
|
|
} from '@/executor/human-in-the-loop/utils.ts'
|
|
import type {
|
|
BlockHandler,
|
|
BlockLog,
|
|
BlockState,
|
|
ExecutionContext,
|
|
NormalizedBlockOutput,
|
|
} from '@/executor/types'
|
|
import { streamingResponseFormatProcessor } from '@/executor/utils'
|
|
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
|
|
import { validateBlockType } from '@/executor/utils/permission-check'
|
|
import type { VariableResolver } from '@/executor/variables/resolver'
|
|
import type { SerializedBlock } from '@/serializer/types'
|
|
import type { SubflowType } from '@/stores/workflows/workflow/types'
|
|
|
|
// Module-level logger shared by every BlockExecutor method; entries are tagged 'BlockExecutor'.
const logger = createLogger('BlockExecutor')
|
|
|
|
export class BlockExecutor {
|
|
/**
 * Wires the executor to its collaborators. All four are stored as private
 * parameter properties and are never reassigned, hence `readonly`.
 *
 * @param blockHandlers - Candidate handlers; the first one whose `canHandle` accepts a block runs it.
 * @param resolver - Turns a block's configured params into concrete inputs.
 * @param contextExtensions - Optional lifecycle callbacks (`onBlockStart`, `onBlockComplete`).
 * @param state - Writer used to set, replace and delete per-block state and outputs.
 */
constructor(
  private readonly blockHandlers: BlockHandler[],
  private readonly resolver: VariableResolver,
  private readonly contextExtensions: ContextExtensions,
  private readonly state: BlockStateWriter
) {}
|
|
|
|
/**
 * Executes one block: picks a handler, resolves inputs, runs the handler,
 * normalizes its output, records it in state/logs and fires lifecycle callbacks.
 *
 * Sentinel block types get no log entry and no start/complete callbacks.
 *
 * @param ctx - Execution context for the current run.
 * @param node - DAG node being executed; its id keys state and log entries.
 * @param block - Serialized block definition attached to the node.
 * @returns The normalized block output, or the error output when
 *   `handleBlockError` decides to return instead of throwing.
 * @throws The error built by `buildBlockExecutionError` when no handler accepts
 *   the block, and whatever `handleBlockError` throws for unrouted failures.
 */
async execute(
  ctx: ExecutionContext,
  node: DAGNode,
  block: SerializedBlock
): Promise<NormalizedBlockOutput> {
  const handler = this.findHandler(block)
  if (!handler) {
    const missingType = block.metadata?.id ?? 'unknown'
    throw buildBlockExecutionError({
      block,
      context: ctx,
      error: `No handler found for block type: ${missingType}`,
    })
  }

  const blockType = block.metadata?.id ?? ''
  const isSentinel = isSentinelBlockType(blockType)

  let logEntry: BlockLog | undefined
  if (!isSentinel) {
    logEntry = this.createBlockLog(ctx, node.id, block, node)
    ctx.blockLogs.push(logEntry)
    this.callOnBlockStart(ctx, node, block)
  }

  const startTime = Date.now()
  let inputs: Record<string, any> = {}

  // Single funnel for both failure phases; reads `logEntry`/`inputs` at call time.
  const fail = (
    error: unknown,
    phase: 'input_resolution' | 'execution'
  ): NormalizedBlockOutput =>
    this.handleBlockError(error, ctx, node, block, startTime, logEntry, inputs, isSentinel, phase)

  const nodeMetadata = this.buildNodeMetadata(node)

  const cleanupSelfReference =
    block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP
      ? this.preparePauseResumeSelfReference(ctx, node, block, nodeMetadata)
      : undefined

  try {
    if (!isSentinel && blockType) {
      await validateBlockType(ctx.userId, blockType, ctx)
    }

    inputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)

    if (logEntry) {
      logEntry.input = inputs
    }
  } catch (error) {
    // Undo the placeholder state BEFORE error handling: handleBlockError writes the
    // error output for this node, and a later cleanup would overwrite or delete it.
    cleanupSelfReference?.()
    return fail(error, 'input_resolution')
  }
  cleanupSelfReference?.()

  try {
    let rawOutput
    if (handler.executeWithNode) {
      rawOutput = await handler.executeWithNode(ctx, block, inputs, nodeMetadata)
    } else {
      rawOutput = await handler.execute(ctx, block, inputs)
    }

    const looksLikeStreaming =
      rawOutput && typeof rawOutput === 'object' && 'stream' in rawOutput && 'execution' in rawOutput

    let result: NormalizedBlockOutput
    if (!looksLikeStreaming) {
      result = this.normalizeOutput(rawOutput)
    } else {
      const streamingExec = rawOutput as { stream: ReadableStream; execution: any }

      if (ctx.onStream) {
        await this.handleStreamingExecution(
          ctx,
          node,
          block,
          streamingExec,
          inputs,
          ctx.selectedOutputs ?? []
        )
      }

      // Read execution.output only after streaming finished: stream consumption may replace it.
      result = this.normalizeOutput(streamingExec.execution.output ?? streamingExec.execution)
    }

    if (ctx.includeFileBase64 && containsUserFileWithMetadata(result)) {
      const hydrated = await hydrateUserFilesWithBase64(result, {
        requestId: ctx.metadata.requestId,
        executionId: ctx.executionId,
        maxBytes: ctx.base64MaxBytes,
      })
      result = hydrated as NormalizedBlockOutput
    }

    const elapsedMs = Date.now() - startTime

    if (logEntry) {
      logEntry.endedAt = new Date().toISOString()
      logEntry.durationMs = elapsedMs
      logEntry.success = true
      logEntry.output = this.filterOutputForLog(block, result)
    }

    this.state.setBlockOutput(node.id, result, elapsedMs)

    if (!isSentinel) {
      this.callOnBlockComplete(
        ctx,
        node,
        block,
        inputs,
        this.filterOutputForDisplay(block, result),
        elapsedMs
      )
    }

    return result
  } catch (error) {
    return fail(error, 'execution')
  }
}
|
|
|
|
/**
 * Projects a DAG node onto the small metadata record passed to handlers and
 * pause helpers. Fields absent from `node.metadata` come back as `undefined`.
 *
 * @param node - Node whose id and subflow metadata are copied.
 * @returns The node id plus loop/parallel identifiers and branch position.
 */
private buildNodeMetadata(node: DAGNode): {
  nodeId: string
  loopId?: string
  parallelId?: string
  branchIndex?: number
  branchTotal?: number
} {
  const { loopId, parallelId, branchIndex, branchTotal } = node?.metadata ?? {}
  return { nodeId: node.id, loopId, parallelId, branchIndex, branchTotal }
}
|
|
|
|
/**
 * Returns the first registered handler that reports it can handle `block`.
 *
 * @param block - Block to be dispatched.
 * @returns The matching handler, or `undefined` when none accepts the block.
 */
private findHandler(block: SerializedBlock): BlockHandler | undefined {
  for (const candidate of this.blockHandlers) {
    if (candidate.canHandle(block)) {
      return candidate
    }
  }
  return undefined
}
|
|
|
|
/**
 * Records a block failure and decides whether it is returned or thrown.
 *
 * Stores an `{ error }` output for the node, completes the log entry, logs the
 * failure and fires `onBlockComplete` for non-sentinel blocks. When the node has
 * an outgoing error-port edge the error output is returned; otherwise a block
 * execution error is thrown.
 *
 * @param error - The caught value.
 * @param ctx - Execution context for the current run.
 * @param node - Node that failed.
 * @param block - Serialized block definition.
 * @param startTime - Epoch milliseconds at which execution started.
 * @param blockLog - Log entry to complete, if one was created.
 * @param resolvedInputs - Inputs resolved so far; may be empty if resolution failed.
 * @param isSentinel - Whether the block is a sentinel (suppresses callbacks).
 * @param phase - Which stage failed; only selects the log message.
 * @returns The error output when the node has an error-port edge.
 * @throws The result of `buildBlockExecutionError` when there is no error port.
 */
private handleBlockError(
  error: unknown,
  ctx: ExecutionContext,
  node: DAGNode,
  block: SerializedBlock,
  startTime: number,
  blockLog: BlockLog | undefined,
  resolvedInputs: Record<string, any>,
  isSentinel: boolean,
  phase: 'input_resolution' | 'execution'
): NormalizedBlockOutput {
  const duration = Date.now() - startTime
  const errorMessage = normalizeError(error)

  // Prefer the resolved inputs; fall back to the raw configured params when nothing was resolved.
  const hasResolvedInputs =
    resolvedInputs && typeof resolvedInputs === 'object' && Object.keys(resolvedInputs).length > 0
  const input = hasResolvedInputs
    ? resolvedInputs
    : ((block.config?.params as Record<string, any> | undefined) ?? {})

  const errorOutput: NormalizedBlockOutput = { error: errorMessage }

  if (ChildWorkflowError.isChildWorkflowError(error)) {
    errorOutput.childTraceSpans = error.childTraceSpans
    errorOutput.childWorkflowName = error.childWorkflowName
  }

  this.state.setBlockOutput(node.id, errorOutput, duration)

  if (blockLog) {
    blockLog.endedAt = new Date().toISOString()
    blockLog.durationMs = duration
    blockLog.success = false
    blockLog.error = errorMessage
    blockLog.input = input
    blockLog.output = this.filterOutputForLog(block, errorOutput)
  }

  let failureSummary: string
  if (phase === 'input_resolution') {
    failureSummary = 'Failed to resolve block inputs'
  } else {
    failureSummary = 'Block execution failed'
  }
  logger.error(failureSummary, {
    blockId: node.id,
    blockType: block.metadata?.id,
    error: errorMessage,
  })

  if (!isSentinel) {
    this.callOnBlockComplete(
      ctx,
      node,
      block,
      input,
      this.filterOutputForDisplay(block, errorOutput),
      duration
    )
  }

  if (this.hasErrorPortEdge(node)) {
    logger.info('Block has error port - returning error output instead of throwing', {
      blockId: node.id,
      error: errorMessage,
    })
    return errorOutput
  }

  // Keep the original Error instance when there is one; wrap anything else.
  const errorToThrow = error instanceof Error ? error : new Error(errorMessage)

  throw buildBlockExecutionError({
    block,
    error: errorToThrow,
    context: ctx,
    additionalInfo: {
      nodeId: node.id,
      executionTime: duration,
    },
  })
}
|
|
|
|
/**
 * Tells whether any outgoing edge of `node` leaves through the error handle.
 *
 * @param node - Node whose `outgoingEdges` entries are inspected.
 * @returns `true` if at least one edge has `sourceHandle === EDGE.ERROR`.
 */
private hasErrorPortEdge(node: DAGNode): boolean {
  return Array.from(node.outgoingEdges).some(([, edge]) => edge.sourceHandle === EDGE.ERROR)
}
|
|
|
|
/**
 * Builds the initial (not yet completed) log entry for a block run.
 *
 * For parallel branches and loop nodes the display name gets an
 * ` (iteration N)` suffix and the iteration index is recorded. The entry starts
 * with `success: false`, an empty `endedAt` and zero duration.
 *
 * @param ctx - Execution context; consulted for the current loop iteration.
 * @param blockId - Id stored on the log entry and used as name fallback.
 * @param block - Serialized block supplying name and type.
 * @param node - DAG node supplying loop/parallel metadata.
 * @returns A freshly created `BlockLog`.
 */
private createBlockLog(
  ctx: ExecutionContext,
  blockId: string,
  block: SerializedBlock,
  node: DAGNode
): BlockLog {
  const meta = node?.metadata

  let blockName = block.metadata?.name ?? blockId
  let loopId: string | undefined
  let parallelId: string | undefined
  let iterationIndex: number | undefined

  if (meta && meta.branchIndex !== undefined && meta.parallelId) {
    // Parallel branch: the branch index doubles as the iteration number.
    iterationIndex = meta.branchIndex
    parallelId = meta.parallelId
    blockName = `${blockName} (iteration ${meta.branchIndex})`
  } else if (meta && meta.isLoopNode && meta.loopId) {
    loopId = meta.loopId
    const currentIteration = ctx.loopExecutions?.get(loopId)?.iteration
    if (currentIteration === undefined) {
      logger.warn('Loop scope not found for block', { blockId, loopId })
    } else {
      iterationIndex = currentIteration
      blockName = `${blockName} (iteration ${currentIteration})`
    }
  }

  const entry: BlockLog = {
    blockId,
    blockName,
    blockType: block.metadata?.id ?? DEFAULTS.BLOCK_TYPE,
    startedAt: new Date().toISOString(),
    endedAt: '',
    durationMs: 0,
    success: false,
    loopId,
    parallelId,
    iterationIndex,
  }
  return entry
}
|
|
|
|
/**
 * Coerces an arbitrary handler result into an object-shaped block output.
 *
 * @param output - Raw value returned by a handler.
 * @returns `{}` for `null`/`undefined`, the value itself for non-array objects,
 *   and `{ result: output }` for arrays and primitives.
 */
private normalizeOutput(output: unknown): NormalizedBlockOutput {
  if (output == null) {
    return {}
  }

  const isNonArrayObject = typeof output === 'object' && !Array.isArray(output)
  return isNonArrayObject ? (output as NormalizedBlockOutput) : { result: output }
}
|
|
|
|
/**
 * Produces the version of a block output that is stored in the block log.
 *
 * - Human-in-the-loop blocks: keys starting with `_` and the `response` key are dropped.
 * - Blocks with trigger behavior: the `webhook` and `workflowId` keys are dropped.
 * - Everything else: the output is returned unchanged (same object).
 *
 * @param block - Block that produced the output; selects the filtering rule.
 * @param output - Normalized output to filter.
 * @returns A filtered copy, or `output` itself when no rule applies.
 */
private filterOutputForLog(
  block: SerializedBlock,
  output: NormalizedBlockOutput
): NormalizedBlockOutput {
  // Shallow-copies `output`, skipping every key the predicate rejects.
  const copyWithout = (shouldOmit: (key: string) => boolean): NormalizedBlockOutput => {
    const kept: NormalizedBlockOutput = {}
    for (const [key, value] of Object.entries(output)) {
      if (shouldOmit(key)) continue
      kept[key] = value
    }
    return kept
  }

  if (block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP) {
    return copyWithout((key) => key.startsWith('_') || key === 'response')
  }

  if (isTriggerBehavior(block)) {
    const internalKeys = ['webhook', 'workflowId']
    return copyWithout((key) => internalKeys.includes(key))
  }

  return output
}
|
|
|
|
/**
 * Produces the version of a block output handed to `onBlockComplete`.
 *
 * Applies the log filter first; for workflow block types it additionally strips
 * `childTraceSpans` from a shallow copy.
 *
 * @param block - Block that produced the output.
 * @param output - Normalized output to filter.
 * @returns The display-safe output.
 */
private filterOutputForDisplay(
  block: SerializedBlock,
  output: NormalizedBlockOutput
): NormalizedBlockOutput {
  const logSafe = this.filterOutputForLog(block, output)

  if (!isWorkflowBlockType(block.metadata?.id)) {
    return logSafe
  }

  // Copy before deleting so the stored/logged output keeps its trace spans.
  const displayOutput: NormalizedBlockOutput = { ...logSafe }
  delete displayOutput.childTraceSpans
  return displayOutput
}
|
|
|
|
/**
 * Invokes the optional `onBlockStart` extension callback for a block.
 *
 * @param ctx - Execution context; used to derive the iteration context.
 * @param node - Node being started; its id is reported as the block id.
 * @param block - Serialized block supplying display name and type.
 */
private callOnBlockStart(ctx: ExecutionContext, node: DAGNode, block: SerializedBlock): void {
  const { id: blockId } = node
  const displayName = block.metadata?.name ?? blockId
  const typeId = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
  const iterationContext = this.getIterationContext(ctx, node)

  this.contextExtensions.onBlockStart?.(blockId, displayName, typeId, iterationContext)
}
|
|
|
|
/**
 * Invokes the optional `onBlockComplete` extension callback for a block.
 *
 * @param ctx - Execution context; used to derive the iteration context.
 * @param node - Node that finished; its id is reported as the block id.
 * @param block - Serialized block supplying display name and type.
 * @param input - Inputs reported to the callback.
 * @param output - Output reported to the callback.
 * @param duration - Execution time reported as `executionTime`.
 */
private callOnBlockComplete(
  ctx: ExecutionContext,
  node: DAGNode,
  block: SerializedBlock,
  input: Record<string, any>,
  output: NormalizedBlockOutput,
  duration: number
): void {
  const { id: blockId } = node
  const displayName = block.metadata?.name ?? blockId
  const typeId = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
  const iterationContext = this.getIterationContext(ctx, node)

  this.contextExtensions.onBlockComplete?.(
    blockId,
    displayName,
    typeId,
    { input, output, executionTime: duration },
    iterationContext
  )
}
|
|
|
|
/**
 * Derives the iteration position of a node inside a parallel or loop subflow.
 *
 * A parallel context requires a defined `branchIndex` and a truthy
 * `branchTotal`; a loop context requires a loop node whose scope in
 * `ctx.loopExecutions` has a defined `iteration` and a truthy `maxIterations`.
 *
 * @param ctx - Execution context holding loop scopes.
 * @param node - Node whose metadata is inspected.
 * @returns The iteration context, or `undefined` when the node is not iterating.
 */
private getIterationContext(
  ctx: ExecutionContext,
  node: DAGNode
): { iterationCurrent: number; iterationTotal: number; iterationType: SubflowType } | undefined {
  const meta = node?.metadata
  if (!meta) return undefined

  const { branchIndex, branchTotal, isLoopNode, loopId } = meta

  if (branchIndex !== undefined && branchTotal) {
    return {
      iterationCurrent: branchIndex,
      iterationTotal: branchTotal,
      iterationType: 'parallel',
    }
  }

  if (!(isLoopNode && loopId)) return undefined

  const scope = ctx.loopExecutions?.get(loopId)
  if (!scope || scope.iteration === undefined || !scope.maxIterations) {
    return undefined
  }

  return {
    iterationCurrent: scope.iteration,
    iterationTotal: scope.maxIterations,
    iterationType: 'loop',
  }
}
|
|
|
|
/**
 * Temporarily installs a placeholder state for a block whose output exposes its
 * own resume links (`url`, `resumeEndpoint`), and returns a function that undoes it.
 *
 * Nothing is installed (and `undefined` is returned) when the block already has
 * an executed state, or when the execution id or workflow id is unavailable.
 * NOTE(review): presumably this lets the block's own params reference those
 * links during input resolution; confirm against the resolver.
 *
 * @param ctx - Execution context supplying ids and current block states.
 * @param node - Node whose id keys the block state.
 * @param block - Serialized block; its id feeds the pause context id.
 * @param nodeMetadata - Loop/parallel metadata used to derive the pause context id.
 * @returns A cleanup function restoring the previous state (or deleting the
 *   placeholder if there was none), or `undefined` if nothing was installed.
 */
private preparePauseResumeSelfReference(
  ctx: ExecutionContext,
  node: DAGNode,
  block: SerializedBlock,
  nodeMetadata: {
    nodeId: string
    loopId?: string
    parallelId?: string
    branchIndex?: number
    branchTotal?: number
  }
): (() => void) | undefined {
  const blockId = node.id
  const existingState = ctx.blockStates.get(blockId)
  if (existingState?.executed) return undefined

  const executionId = ctx.executionId ?? ctx.metadata?.executionId
  const workflowId = ctx.workflowId
  if (!executionId || !workflowId) return undefined

  const { loopScope } = mapNodeMetadataToPauseScopes(ctx, nodeMetadata)
  const contextId = generatePauseContextId(block.id, nodeMetadata, loopScope)

  const linksFor = (baseUrl: string | undefined): { apiUrl: string; uiUrl: string } => ({
    apiUrl: buildResumeApiUrl(baseUrl, workflowId, executionId, contextId),
    uiUrl: buildResumeUiUrl(baseUrl, workflowId, executionId),
  })

  let resumeLinks: { apiUrl: string; uiUrl: string }
  try {
    resumeLinks = linksFor(getBaseUrl())
  } catch {
    // Base URL unavailable (or link building failed): retry without a base URL.
    resumeLinks = linksFor(undefined)
  }

  // Snapshot now so the cleanup restores the state as it was before the placeholder.
  const previousState: BlockState | undefined = existingState ? { ...existingState } : undefined

  const placeholderState: BlockState = {
    output: {
      url: resumeLinks.uiUrl,
      resumeEndpoint: resumeLinks.apiUrl,
    },
    executed: false,
    executionTime: existingState?.executionTime ?? 0,
  }
  this.state.setBlockState(blockId, placeholderState)

  return () => {
    if (previousState) {
      this.state.setBlockState(blockId, previousState)
      return
    }
    this.state.deleteBlockState(blockId)
  }
}
|
|
|
|
/**
 * Delivers a streaming handler result to `ctx.onStream` while the executor
 * reads the same stream to capture the final content.
 *
 * The source stream is split with `tee()`: one branch is passed through the
 * response-format processor and handed to the client callback, the other is
 * consumed by `consumeExecutorStream`. If the stream has no `tee`, it is only
 * forwarded to the client. Errors thrown by `onStream` are logged, not rethrown.
 *
 * @param ctx - Execution context providing `onStream`.
 * @param node - Node being executed; its id labels the stream.
 * @param block - Serialized block; fallback source for `responseFormat`.
 * @param streamingExec - Handler result holding the stream and execution record.
 * @param resolvedInputs - Resolved inputs; primary source for `responseFormat`.
 * @param selectedOutputs - Output selection passed to the stream processor.
 */
private async handleStreamingExecution(
  ctx: ExecutionContext,
  node: DAGNode,
  block: SerializedBlock,
  streamingExec: { stream: ReadableStream; execution: any },
  resolvedInputs: Record<string, any>,
  selectedOutputs: string[]
): Promise<void> {
  const blockId = node.id

  // Lookup order: resolved inputs, then block params, then block config.
  const responseFormat =
    resolvedInputs?.responseFormat ??
    (block.config?.params as Record<string, any> | undefined)?.responseFormat ??
    (block.config as Record<string, any> | undefined)?.responseFormat

  const source = streamingExec.stream
  if (typeof source.tee !== 'function') {
    await this.forwardStream(ctx, blockId, streamingExec, source, responseFormat, selectedOutputs)
    return
  }

  const [clientBranch, executorBranch] = source.tee()

  const clientStreamingExec = {
    ...streamingExec,
    stream: streamingResponseFormatProcessor.processStream(
      clientBranch,
      blockId,
      selectedOutputs,
      responseFormat
    ),
  }

  const notifyClient = async (): Promise<void> => {
    try {
      await ctx.onStream?.(clientStreamingExec)
    } catch (error) {
      logger.error('Error in onStream callback', { blockId, error })
    }
  }

  // Start the executor-side reader first, then the client callback; wait for both.
  const executorDone = this.consumeExecutorStream(
    executorBranch,
    streamingExec,
    blockId,
    responseFormat
  )
  const clientDone = notifyClient()

  await Promise.all([clientDone, executorDone])
}
|
|
|
|
/**
 * Passes a stream through the response-format processor and hands it to
 * `ctx.onStream` without reading it on the executor side.
 *
 * Errors thrown by the callback are logged and swallowed.
 *
 * @param ctx - Execution context providing `onStream`.
 * @param blockId - Id used to label the stream and log entries.
 * @param streamingExec - Original streaming result; copied with the processed stream.
 * @param stream - Stream to process and forward.
 * @param responseFormat - Response format passed to the processor.
 * @param selectedOutputs - Output selection passed to the processor.
 */
private async forwardStream(
  ctx: ExecutionContext,
  blockId: string,
  streamingExec: { stream: ReadableStream; execution: any },
  stream: ReadableStream,
  responseFormat: any,
  selectedOutputs: string[]
): Promise<void> {
  const forwardedStream = streamingResponseFormatProcessor.processStream(
    stream,
    blockId,
    selectedOutputs,
    responseFormat
  )

  try {
    const payload = { ...streamingExec, stream: forwardedStream }
    await ctx.onStream?.(payload)
  } catch (error) {
    logger.error('Error in onStream callback', { blockId, error })
  }
}
|
|
|
|
/**
 * Reads the executor-side stream to the end and folds the accumulated text back
 * into `streamingExec.execution.output`.
 *
 * - Read errors are logged; whatever was read so far is still used.
 * - Nothing is written when no text was read or when `execution.output` is not
 *   an object.
 * - With a `responseFormat`, the text is parsed as JSON. If it is a JSON object,
 *   `execution.output` is replaced by its fields plus the original `tokens`,
 *   `toolCalls`, `providerTiming`, `cost` and `model`. If parsing fails, or the
 *   value is not a plain object (primitive, array, `null`), a warning is logged
 *   and the raw text is stored in `content` instead; spreading such a value
 *   would yield index keys or nothing at all and lose the streamed text.
 * - Without a `responseFormat`, the raw text is stored in `content`.
 *
 * @param stream - Executor branch of the block's output stream.
 * @param streamingExec - Holder of the execution record to update in place.
 * @param blockId - Id used in log entries.
 * @param responseFormat - When truthy, the streamed text is expected to be JSON.
 */
private async consumeExecutorStream(
  stream: ReadableStream,
  streamingExec: { execution: any },
  blockId: string,
  responseFormat: any
): Promise<void> {
  const reader = stream.getReader()
  const decoder = new TextDecoder()
  let fullContent = ''

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      fullContent += decoder.decode(value, { stream: true })
    }
    // Flush the streaming decoder: bytes of an incomplete trailing sequence are
    // buffered by `{ stream: true }` and would otherwise be dropped silently.
    fullContent += decoder.decode()
  } catch (error) {
    logger.error('Error reading executor stream for block', { blockId, error })
  } finally {
    try {
      reader.releaseLock()
    } catch {
      // Best effort: releasing can throw if the stream is already in a bad state.
    }
  }

  if (!fullContent) {
    return
  }

  const executionOutput = streamingExec.execution?.output
  if (!executionOutput || typeof executionOutput !== 'object') {
    return
  }

  if (responseFormat) {
    try {
      const parsed: unknown = JSON.parse(fullContent.trim())

      if (parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed)) {
        streamingExec.execution.output = {
          ...parsed,
          tokens: executionOutput.tokens,
          toolCalls: executionOutput.toolCalls,
          providerTiming: executionOutput.providerTiming,
          cost: executionOutput.cost,
          model: executionOutput.model,
        }
        return
      }

      logger.warn('Streamed content for response format is not a JSON object', { blockId })
    } catch (error) {
      logger.warn('Failed to parse streamed content for response format', { blockId, error })
    }
  }

  // Fallback for plain text, unparsable JSON, and non-object JSON values.
  executionOutput.content = fullContent
}
|
|
}
|