Added function execution logic locally instead of 3P API using vm, modified executor to handle connection block (still testing)

This commit is contained in:
Waleed Latif
2025-02-08 14:21:41 -08:00
parent f366b557d5
commit 30d3e934db
11 changed files with 557 additions and 265 deletions

89
app/api/execute/route.ts Normal file
View File

@@ -0,0 +1,89 @@
import { NextRequest, NextResponse } from 'next/server'
import { Script, createContext } from 'vm'
// Explicitly export allowed methods
export const dynamic = 'force-dynamic' // Disable static optimization
export const runtime = 'nodejs' // Use Node.js runtime
export async function POST(req: NextRequest) {
const startTime = Date.now()
let stdout = ''
try {
const body = await req.json()
const { code, timeout = 3000 } = body
// Check if code contains unresolved template variables
if (code.includes('<') && code.includes('>')) {
throw new Error(
'Code contains unresolved template variables. Please ensure all variables are resolved before execution.'
)
}
// Create a secure context with console logging
const context = createContext({
console: {
log: (...args: any[]) => {
const logMessage =
args
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg, null, 2) : String(arg)))
.join(' ') + '\n'
stdout += logMessage
},
error: (...args: any[]) => {
const errorMessage =
args
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg, null, 2) : String(arg)))
.join(' ') + '\n'
console.error('❌ Code Console Error:', errorMessage.trim())
stdout += 'ERROR: ' + errorMessage
},
},
})
// Create and run the script
const script = new Script(`
(async () => {
try {
${code}
} catch (error) {
console.error(error);
throw error;
}
})()
`)
const result = await script.runInContext(context, {
timeout,
displayErrors: true,
})
const executionTime = Date.now() - startTime
const response = {
success: true,
output: {
result,
stdout,
executionTime,
},
}
return NextResponse.json(response)
} catch (error: any) {
const executionTime = Date.now() - startTime
const errorResponse = {
success: false,
error: error.message || 'Code execution failed',
output: {
result: null,
stdout,
executionTime,
},
}
return NextResponse.json(errorResponse, { status: 500 })
}
}

View File

@@ -205,7 +205,7 @@ export function ConditionInput({ blockId, subBlockId, isConnecting }: ConditionI
for (let i = 0; i < height; i++) {
numbers.push(
<div
key={`${lineNumber}-${i}`}
key={`${blockId}-${lineNumber}-${i}`}
className={cn('text-xs text-muted-foreground leading-[21px]', i > 0 && 'invisible')}
>
{lineNumber}

View File

@@ -23,6 +23,7 @@ export const ConditionBlock: BlockConfig<CodeExecutionOutput> = {
type: {
result: 'any',
stdout: 'string',
executionTime: 'number',
},
},
},

View File

@@ -17,12 +17,15 @@ export const FunctionBlock: BlockConfig<CodeExecutionOutput> = {
workflow: {
inputs: {
code: { type: 'string', required: true },
timeout: { type: 'number', required: false },
memoryLimit: { type: 'number', required: false },
},
outputs: {
response: {
type: {
result: 'any',
stdout: 'string',
executionTime: 'number',
},
},
},

View File

@@ -1,12 +1,18 @@
/**
* "Executor" for running agentic workflows in parallel.
* Executor for running agentic workflows in parallel.
*
* Notes & Features:
* • Uses a layered topological sort to allow parallel block execution for blocks with no remaining dependencies.
* • Each block's inputs are resolved through a template mechanism (e.g., <blockId.property>).
* • Stores block outputs in context.blockStates so subsequent blocks can reference them by ID or name.
* • Maintains robust error handling (if a block fails, throws an error for the entire workflow).
* • Returns per-block logs that can be displayed in the UI for better trace/debug.
* High-Level Overview:
* - This class is responsible for running workflows using a layered topological sort.
* - Blocks that have no unresolved dependencies are executed in parallel.
* - Depending on the block type (router, condition, agent, or regular tool), different execution
* logic is applied. For example, condition blocks evaluate multiple branches and record the
* chosen branch via its condition ID so that only that path is executed.
* - Each block's output is stored in the ExecutionContext so that subsequent blocks can reference them.
* - Detailed logs are collected for each block to assist with debugging.
*
* Error Handling:
* - If a block fails, an error is thrown, halting the workflow.
* - Meaningful error messages are provided.
*/
import { getAllBlocks } from '@/blocks'
import { generateRouterPrompt } from '@/blocks/blocks/router'
@@ -15,24 +21,24 @@ import { BlockConfig } from '@/blocks/types'
import { executeProviderRequest } from '@/providers/service'
import { getProviderFromModel } from '@/providers/utils'
import { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
import { executeTool, getTool, tools } from '@/tools'
import { executeTool, getTool } from '@/tools'
import { BlockLog, ExecutionContext, ExecutionResult, Tool } from './types'
export class Executor {
constructor(
private workflow: SerializedWorkflow,
// Initial block states can be passed in if you need to resume workflows or pre-populate data.
// Initial block states can be passed in (e.g., for resuming workflows or pre-populating data)
private initialBlockStates: Record<string, BlockOutput> = {},
private environmentVariables: Record<string, string> = {}
) {}
/**
* Main entry point that executes the entire workflow in parallel layers.
* Main entry point that executes the entire workflow in layered parallel fashion.
*/
async execute(workflowId: string): Promise<ExecutionResult> {
const startTime = new Date()
// Build the ExecutionContext with new blockLogs array
// Build the execution context: holds outputs, logs, metadata, and environment variables.
const context: ExecutionContext = {
workflowId,
blockStates: new Map<string, BlockOutput>(),
@@ -43,19 +49,18 @@ export class Executor {
environmentVariables: this.environmentVariables,
}
// Pre-populate block states if initialBlockStates exist
// Pre-populate context with any initial block states.
Object.entries(this.initialBlockStates).forEach(([blockId, output]) => {
context.blockStates.set(blockId, output)
})
try {
// Perform layered parallel execution
// Execute all blocks in parallel layers (using topological sorting).
const lastOutput = await this.executeInParallel(context)
const endTime = new Date()
context.metadata.endTime = endTime.toISOString()
// Return full logs for the UI to consume
return {
success: true,
output: lastOutput,
@@ -67,7 +72,6 @@ export class Executor {
logs: context.blockLogs,
}
} catch (error: any) {
// Ensure we return a meaningful error message
return {
success: false,
output: { response: {} },
@@ -78,138 +82,184 @@ export class Executor {
}
/**
* Executes all blocks in a layered topological fashion, running each layer in parallel via Promise.all.
* If a cycle is detected, throws an error.
* Executes workflow blocks layer-by-layer. Blocks with no dependencies are processed together.
*
* Notes:
* - Maintains in-degrees and adjacency lists for blocks (i.e. dependencies).
* - Blocks with condition or router types update routing/conditional decisions.
* - Only the branch corresponding to the evaluated condition is executed.
*/
private async executeInParallel(context: ExecutionContext): Promise<BlockOutput> {
const { blocks, connections } = this.workflow
// Build in-degree and adjacency list for each block
// Build dependency graphs: inDegree (number of incoming edges) and adjacency (outgoing connections)
const inDegree = new Map<string, number>()
const adjacency = new Map<string, string[]>()
// Initialize inDegree and adjacency
for (const block of blocks) {
inDegree.set(block.id, 0)
adjacency.set(block.id, [])
}
// Populate edges
// Populate inDegree and adjacency. For conditional connections, inDegree is handled dynamically.
for (const conn of connections) {
inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1)
// Increase inDegree only for regular (non-conditional) connections.
if (!conn.condition) {
inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1)
}
adjacency.get(conn.source)?.push(conn.target)
}
// Maps for router and conditional decisions.
// routerDecisions: router block id -> chosen target block id.
// activeConditionalPaths: conditional block id -> selected condition id.
const routerDecisions = new Map<string, string>()
const activeConditionalPaths = new Map<string, string>()
// Queue initially contains all blocks without dependencies.
const queue: string[] = []
for (const [blockId, degree] of inDegree) {
if (degree === 0) {
queue.push(blockId)
}
}
// This variable will store the output of the latest executed block.
let lastOutput: BlockOutput = { response: {} }
let routerDecision: { routerId: string; chosenPath: string } | null = null
// Start with all blocks that have inDegree = 0
let layer = blocks.filter((b) => (inDegree.get(b.id) || 0) === 0).map((b) => b.id)
// Process blocks layer by layer.
while (queue.length > 0) {
const currentLayer = [...queue]
queue.length = 0
while (layer.length > 0) {
// Execute current layer in parallel, but only if blocks are in the chosen path
const results = await Promise.all(
layer
.filter((blockId) => {
// If we have a router decision, only execute blocks in the chosen path
if (routerDecision) {
return this.isInChosenPath(
blockId,
routerDecision.chosenPath,
routerDecision.routerId
)
// Filtering: only execute blocks that match router and conditional decisions.
const executableBlocks = currentLayer.filter((blockId) => {
// Verify if block lies on the router's chosen path.
for (const [routerId, chosenPath] of routerDecisions) {
if (!this.isInChosenPath(blockId, chosenPath, routerId)) {
return false
}
}
// Verify if block lies on the selected conditional path.
for (const [conditionBlockId, selectedConditionId] of activeConditionalPaths) {
const connection = connections.find(
(conn) =>
conn.source === conditionBlockId &&
conn.target === blockId &&
conn.sourceHandle?.startsWith('condition-')
)
if (connection) {
// Extract condition id from sourceHandle (format: "condition-<conditionId>")
const connConditionId = connection.sourceHandle?.replace('condition-', '')
if (connConditionId !== selectedConditionId) {
return false
}
return true
})
.map(async (blockId) => {
const block = blocks.find((b) => b.id === blockId)
if (!block) {
throw new Error(`Missing block ${blockId}`)
}
}
return true
})
// Execute blocks in the current layer in parallel.
const layerResults = await Promise.all(
executableBlocks.map(async (blockId) => {
const block = blocks.find((b) => b.id === blockId)
if (!block) {
throw new Error(`Block ${blockId} not found`)
}
// Resolve inputs (including template variables and env vars) for the block.
const inputs = this.resolveInputs(block, context)
const result = await this.executeBlock(block, inputs, context)
// Store the block output in context for later reference.
context.blockStates.set(block.id, result)
// Update lastOutput to reflect the latest executed block.
lastOutput = result
// For router or condition blocks, update decision maps accordingly.
if (block.metadata?.type === 'router') {
const routerResult = result as {
response: {
content: string
model: string
tokens: {
prompt: number
completion: number
total: number
}
selectedPath: { blockId: string }
}
}
// Skip disabled blocks
if (block.enabled === false) {
return { response: {} }
}
try {
const resolvedInputs = this.resolveInputs(block, context)
const output = await this.executeBlock(block, resolvedInputs, context)
// If this is a router block, store its decision
if (
block.metadata?.type === 'router' &&
output &&
typeof output === 'object' &&
'response' in output &&
output.response &&
typeof output.response === 'object' &&
'selectedPath' in output.response
) {
const routerResponse = output.response as { selectedPath: { blockId: string } }
routerDecision = {
routerId: block.id,
chosenPath: routerResponse.selectedPath.blockId,
routerDecisions.set(block.id, routerResult.response.selectedPath.blockId)
} else if (block.metadata?.type === 'condition') {
const conditionResult = result as {
response: {
condition: {
selectedConditionId: string
result: boolean
}
}
context.blockStates.set(block.id, output)
return output
} catch (error) {
throw error
}
})
activeConditionalPaths.set(
block.id,
conditionResult.response.condition.selectedConditionId
)
}
return blockId
})
)
if (results.length > 0) {
lastOutput = results[results.length - 1]
}
// After executing a layer, update in-degrees for all adjacent blocks.
for (const finishedBlockId of layerResults) {
const neighbors = adjacency.get(finishedBlockId) || []
for (const neighbor of neighbors) {
// Find the relevant connection from finishedBlockId to neighbor.
const connection = connections.find(
(conn) => conn.source === finishedBlockId && conn.target === neighbor
)
if (!connection) continue
// Build the next layer by reducing in-degree of neighbors
const nextLayer: string[] = []
for (const blockId of layer) {
const neighbors = adjacency.get(blockId) || []
for (const targetId of neighbors) {
const deg = inDegree.get(targetId) ?? 0
const newDeg = deg - 1
inDegree.set(targetId, newDeg)
if (newDeg === 0) {
nextLayer.push(targetId)
// Regular (non-conditional) connection: always decrement.
if (!connection.sourceHandle || !connection.sourceHandle.startsWith('condition-')) {
const newDegree = (inDegree.get(neighbor) || 0) - 1
inDegree.set(neighbor, newDegree)
if (newDegree === 0) {
queue.push(neighbor)
}
} else {
// For a conditional connection, only decrement if the active condition matches.
const conditionId = connection.sourceHandle.replace('condition-', '')
if (activeConditionalPaths.get(finishedBlockId) === conditionId) {
const newDegree = (inDegree.get(neighbor) || 0) - 1
inDegree.set(neighbor, newDegree)
if (newDegree === 0) {
queue.push(neighbor)
}
}
}
}
}
layer = nextLayer
}
// Validate that all blocks were executed. If not, the workflow has a cycle.
const executedCount = [...inDegree.values()].filter((x) => x === 0).length
if (executedCount !== blocks.length) {
throw new Error('Workflow contains cycles or invalid connections')
}
return lastOutput
}
/**
* Executes a single block by:
* 1) Determining which tool to call
* 2) Validating parameters
* 3) Making the request (for http blocks or LLM blocks, etc.)
* 4) Transforming the response via the tool's transformResponse
* Executes a single block. Deduces the tool to call, validates parameters,
* makes the request, and transforms the response.
*
* The result is logged and returned.
*/
private async executeBlock(
block: SerializedBlock,
inputs: Record<string, any>,
context: ExecutionContext
): Promise<BlockOutput> {
// Start timing
const startTime = new Date()
const blockLog: BlockLog = {
blockId: block.id,
blockTitle: block.metadata?.title,
blockType: block.metadata?.type,
blockTitle: block.metadata?.title || '',
blockType: block.metadata?.type || '',
startedAt: startTime.toISOString(),
endedAt: '',
durationMs: 0,
@@ -219,7 +269,7 @@ export class Executor {
try {
let output: BlockOutput
// Handle router blocks differently
// Execute block based on its type.
if (block.metadata?.type === 'router') {
const routerOutput = await this.executeRouterBlock(block, context)
output = {
@@ -230,18 +280,28 @@ export class Executor {
selectedPath: routerOutput.selectedPath,
},
}
} else if (block.metadata?.type === 'condition') {
const conditionResult = await this.executeConditionalBlock(block, context)
output = {
response: {
result: conditionResult.condition,
content: conditionResult.content,
condition: {
result: conditionResult.condition,
selectedPath: conditionResult.selectedPath,
selectedConditionId: conditionResult.selectedConditionId,
},
},
}
} else if (block.metadata?.type === 'agent') {
// Special handling for agent blocks that use providers
let responseFormat = undefined
// Agent block: use a provider request.
let responseFormat: any = undefined
if (inputs.responseFormat) {
try {
// If it's already a string, parse it once
if (typeof inputs.responseFormat === 'string') {
responseFormat = JSON.parse(inputs.responseFormat)
} else {
// If it's somehow already an object, use it directly
responseFormat = inputs.responseFormat
}
responseFormat =
typeof inputs.responseFormat === 'string'
? JSON.parse(inputs.responseFormat)
: inputs.responseFormat
} catch (error: any) {
console.error('Error parsing responseFormat:', error)
throw new Error('Invalid response format: ' + error.message)
@@ -251,20 +311,16 @@ export class Executor {
const model = inputs.model || 'gpt-4o'
const providerId = getProviderFromModel(model)
// Format tools if they exist
const tools = Array.isArray(inputs.tools)
// Format tools if provided. (Rename local variable to avoid conflict with imported "tools".)
const formattedTools = Array.isArray(inputs.tools)
? inputs.tools
.map((tool: any) => {
const block = getAllBlocks().find((b: BlockConfig) => b.type === tool.type)
const toolId = block?.tools.access[0]
if (!toolId) {
return null
}
const blockFound = getAllBlocks().find((b: BlockConfig) => b.type === tool.type)
const toolId = blockFound?.tools.access[0]
if (!toolId) return null
const toolConfig = getTool(toolId)
if (!toolConfig) {
return null
}
if (!toolConfig) return null
return {
id: toolConfig.id,
@@ -296,10 +352,11 @@ export class Executor {
const response = await executeProviderRequest(providerId, {
model,
systemPrompt: inputs.systemPrompt,
context: Array.isArray(inputs.context)
? JSON.stringify(inputs.context, null, 2)
: inputs.context,
tools: tools.length > 0 ? tools : undefined,
context:
Array.isArray(inputs.context) === true
? JSON.stringify(inputs.context, null, 2)
: inputs.context,
tools: formattedTools.length > 0 ? formattedTools : undefined,
temperature: inputs.temperature,
maxTokens: inputs.maxTokens,
apiKey: inputs.apiKey,
@@ -337,10 +394,10 @@ export class Executor {
},
}
} else {
// Regular tool execution
// Regular tool block execution.
const tool = getTool(block.config.tool)
if (!tool) {
throw new Error(`Tool ${block.config.tool} not found`)
throw new Error(`Tool not found: ${block.config.tool}`)
}
const result = await executeTool(block.config.tool, inputs)
@@ -348,41 +405,35 @@ export class Executor {
console.error('Tool execution failed:', result.error)
throw new Error(result.error || `Tool ${block.config.tool} failed with no error message`)
}
output = { response: result.output }
}
// Mark block execution as successful and record timing.
blockLog.success = true
blockLog.output = output
// Compute timing
const endTime = new Date()
blockLog.endedAt = endTime.toISOString()
blockLog.durationMs = endTime.getTime() - startTime.getTime()
// Add log entry
context.blockLogs.push(blockLog)
// Ensure block output is available in the context for downstream blocks.
context.blockStates.set(block.id, output)
return output
} catch (error: any) {
// Update block log with error
// On error: log the error, update blockLog, and rethrow.
blockLog.success = false
blockLog.error = error.message || `Block execution failed`
// Compute timing
blockLog.error = error.message || 'Block execution failed'
const endTime = new Date()
blockLog.endedAt = endTime.toISOString()
blockLog.durationMs = endTime.getTime() - startTime.getTime()
// Add log entry
context.blockLogs.push(blockLog)
throw error
}
}
/**
* Validates required parameters for a Tool, or uses defaults if present.
* Validates required parameters for a given tool configuration.
* Uses defaults when available and throws an error if a required parameter is missing.
*/
private validateToolParams(tool: Tool, params: Record<string, any>): Record<string, any> {
return Object.entries(tool.params).reduce(
@@ -401,13 +452,14 @@ export class Executor {
}
/**
* Resolves any template references in a block's config params (e.g., "<someBlockId.response>"),
* pulling from context.blockStates. This is how outputs from one block get wired as inputs to another.
* Resolves template references in a block's configuration (e.g., "<blockId.property>"),
* as well as environment variables (format: "{{ENV_VAR}}").
* The values are pulled from the context's blockStates and environmentVariables.
*/
private resolveInputs(block: SerializedBlock, context: ExecutionContext): Record<string, any> {
const inputs = { ...block.config.params }
// Create quick-lookup for blocks by ID and by normalized name
// Create quick lookups for blocks by ID and by normalized title.
const blockById = new Map(this.workflow.blocks.map((b) => [b.id, b]))
const blockByName = new Map(
this.workflow.blocks.map((b) => [
@@ -416,7 +468,7 @@ export class Executor {
])
)
// Helper function to resolve environment variables in a value
// Helper to resolve environment variables in a given value.
const resolveEnvVars = (value: any): any => {
if (typeof value === 'string') {
const envMatches = value.match(/\{\{([^}]+)\}\}/g)
@@ -425,11 +477,9 @@ export class Executor {
for (const match of envMatches) {
const envKey = match.slice(2, -2)
const envValue = this.environmentVariables?.[envKey]
if (envValue === undefined) {
throw new Error(`Environment variable "${envKey}" was not found.`)
}
resolvedValue = resolvedValue.replace(match, envValue)
}
return resolvedValue
@@ -453,76 +503,58 @@ export class Executor {
if (typeof value === 'string') {
let resolvedValue = value
// Handle block references with <> syntax
// Resolve block reference templates in the format "<blockId.property>"
const blockMatches = value.match(/<([^>]+)>/g)
if (blockMatches) {
for (const match of blockMatches) {
// e.g. "<someBlockId.response>"
const path = match.slice(1, -1) // remove < and >
const path = match.slice(1, -1)
const [blockRef, ...pathParts] = path.split('.')
// Try referencing as an ID, then as a normalized name.
let sourceBlock = blockById.get(blockRef)
if (!sourceBlock) {
const normalized = blockRef.toLowerCase().replace(/\s+/g, '')
sourceBlock = blockByName.get(normalized)
}
if (!sourceBlock) {
throw new Error(`Block reference "${blockRef}" was not found.`)
}
// Check if the referenced block is disabled.
if (sourceBlock.enabled === false) {
throw new Error(
`Block "${sourceBlock.metadata?.title}" is disabled, and block "${block.metadata?.title}" depends on it.`
)
}
const sourceState = context.blockStates.get(sourceBlock.id)
if (!sourceState) {
throw new Error(
`No state found for block "${sourceBlock.metadata?.title}" (ID: ${sourceBlock.id}).`
)
}
// Drill into the path
// Drill into the property path.
let replacementValue: any = sourceState
for (const part of pathParts) {
if (!replacementValue || typeof replacementValue !== 'object') {
throw new Error(
`Invalid path part "${part}" in "${path}" for block "${block.metadata?.title}".`
`Invalid path "${part}" in "${path}" for block "${block.metadata?.title}".`
)
}
// Check if we have a response format and this is a field from it
const responseFormat = sourceBlock.config.params?.responseFormat
if (responseFormat && typeof responseFormat === 'string') {
try {
const parsedFormat = JSON.parse(responseFormat)
if (parsedFormat?.fields?.some((f: any) => f.name === part)) {
replacementValue = replacementValue[part]
continue
}
} catch (e) {
console.error('Error parsing response format:', e)
}
}
// Optional: special-case formatting for response formats.
replacementValue = replacementValue[part]
}
// If a valid leaf is found
if (replacementValue !== undefined) {
// Special handling for context field to ensure proper formatting
if (key === 'context') {
// If it's not a string, stringify it for context
if (block.metadata?.type === 'function' && key === 'code') {
// For function blocks, format the code nicely.
resolvedValue = resolvedValue.replace(
match,
typeof replacementValue === 'object'
? JSON.stringify(replacementValue, null, 2)
: JSON.stringify(String(replacementValue))
)
} else if (key === 'context') {
resolvedValue =
typeof replacementValue === 'string'
? replacementValue
: JSON.stringify(replacementValue, null, 2)
} else {
// For all other fields, use existing logic
resolvedValue = resolvedValue.replace(
match,
typeof replacementValue === 'object'
@@ -537,11 +569,8 @@ export class Executor {
}
}
}
// After all block references are resolved, resolve any environment variables
// Resolve environment variables.
resolvedValue = resolveEnvVars(resolvedValue)
// After all replacements are done, attempt JSON parse if it looks like JSON
try {
if (resolvedValue.startsWith('{') || resolvedValue.startsWith('[')) {
acc[key] = JSON.parse(resolvedValue)
@@ -552,7 +581,6 @@ export class Executor {
acc[key] = resolvedValue
}
} else {
// For non-string values, still try to resolve any nested environment variables
acc[key] = resolveEnvVars(value)
}
return acc
@@ -563,6 +591,9 @@ export class Executor {
return resolvedInputs
}
/**
* Executes a router block which calculates branching decisions based on a prompt.
*/
private async executeRouterBlock(
block: SerializedBlock,
context: ExecutionContext
@@ -580,23 +611,19 @@ export class Executor {
blockTitle: string
}
}> {
// First resolve all inputs including environment variables
// Resolve inputs for the router block.
const resolvedInputs = this.resolveInputs(block, context)
const outgoingConnections = this.workflow.connections.filter((conn) => conn.source === block.id)
const targetBlocks = outgoingConnections.map((conn) => {
const targetBlock = this.workflow.blocks.find((b) => b.id === conn.target)
if (!targetBlock) {
throw new Error(`Target block ${conn.target} not found`)
}
return {
id: targetBlock.id,
type: targetBlock.metadata?.type,
title: targetBlock.metadata?.title,
description: targetBlock.metadata?.description,
category: targetBlock.metadata?.category,
subBlocks: targetBlock.config.params,
currentState: context.blockStates.get(targetBlock.id),
}
@@ -612,34 +639,24 @@ export class Executor {
const model = routerConfig.model || 'gpt-4o'
const providerId = getProviderFromModel(model)
// Generate and send the router prompt.
const response = await executeProviderRequest(providerId, {
model: routerConfig.model,
systemPrompt: generateRouterPrompt(routerConfig.prompt, targetBlocks),
messages: [
{
role: 'user',
content: routerConfig.prompt,
},
],
messages: [{ role: 'user', content: routerConfig.prompt }],
temperature: routerConfig.temperature,
apiKey: routerConfig.apiKey,
})
const chosenBlockId = response.content.trim().toLowerCase()
const chosenBlock = targetBlocks.find((b) => b.id === chosenBlockId)
if (!chosenBlock) {
throw new Error(`Invalid routing decision: ${chosenBlockId}`)
}
// Pass through the actual resolved content from the source block
const sourceContent = resolvedInputs.prompt
// Ensure tokens are properly typed
const tokens = response.tokens || { prompt: 0, completion: 0, total: 0 }
return {
content: sourceContent, // This now contains the actual resolved content from Agent 4
content: resolvedInputs.prompt,
model: response.model,
tokens: {
prompt: tokens.prompt || 0,
@@ -654,6 +671,11 @@ export class Executor {
}
}
/**
* Determines whether a block is reachable along the chosen router path.
*
* This uses a breadth-first search starting from the chosen block id.
*/
private isInChosenPath(blockId: string, chosenBlockId: string, routerId: string): boolean {
const visited = new Set<string>()
const queue = [chosenBlockId]
@@ -662,7 +684,6 @@ export class Executor {
const currentId = queue.shift()!
if (visited.has(currentId)) continue
visited.add(currentId)
const connections = this.workflow.connections.filter((conn) => conn.source === currentId)
for (const conn of connections) {
queue.push(conn.target)
@@ -671,4 +692,125 @@ export class Executor {
return blockId === routerId || visited.has(blockId)
}
/**
* Executes a condition block that evaluates a set of conditions (if/else-if/else).
*
* The block:
* - Parses its conditions.
* - Uses the source block's output to evaluate each condition.
* - Selects the branch matching the evaluation (via sourceHandle in the connection).
* - Returns an output that includes the boolean result and the selected condition's ID.
*/
private async executeConditionalBlock(
block: SerializedBlock,
context: ExecutionContext
): Promise<{
content: string
condition: boolean
selectedConditionId: string
sourceOutput: BlockOutput
selectedPath: {
blockId: string
blockType: string
blockTitle: string
}
}> {
const conditions = JSON.parse(block.config.params.conditions)
console.log('Parsed conditions:', conditions)
// Identify the source block that feeds into this condition block.
const sourceBlockId = this.workflow.connections.find((conn) => conn.target === block.id)?.source
if (!sourceBlockId) {
throw new Error(`No source block found for condition block ${block.id}`)
}
const sourceOutput = context.blockStates.get(sourceBlockId)
if (!sourceOutput) {
throw new Error(`No output found for source block ${sourceBlockId}`)
}
console.log('Source block output:', sourceOutput)
const outgoingConnections = this.workflow.connections.filter((conn) => conn.source === block.id)
console.log('Outgoing connections:', outgoingConnections)
let conditionMet = false
let selectedConnection: { target: string; sourceHandle?: string } | null = null
let selectedCondition: { id: string; title: string; value: string } | null = null
// Evaluate conditions one by one.
for (const condition of conditions) {
try {
// Resolve the condition expression using the current context.
const resolvedCondition = this.resolveInputs(
{
id: block.id,
config: { params: { condition: condition.value }, tool: block.config.tool },
metadata: block.metadata,
position: block.position,
inputs: block.inputs,
outputs: block.outputs,
enabled: block.enabled,
},
context
)
const evalContext = {
...(typeof sourceOutput === 'object' && sourceOutput !== null ? sourceOutput : {}),
agent1: sourceOutput,
}
conditionMet = new Function(
'context',
`with(context) { return ${resolvedCondition.condition} }`
)(evalContext)
// Cast the connection so that TypeScript knows it has a target property.
const connection = outgoingConnections.find(
(conn) => conn.sourceHandle === `condition-${condition.id}`
) as { target: string; sourceHandle?: string } | undefined
if (connection) {
// For if/else-if, require conditionMet to be true.
// For else, unconditionally select it.
if ((condition.title === 'if' || condition.title === 'else if') && conditionMet) {
selectedConnection = connection
selectedCondition = condition
break
} else if (condition.title === 'else') {
selectedConnection = connection
selectedCondition = condition
break
}
}
} catch (error: any) {
console.error(`Failed to evaluate condition: ${error.message}`, {
condition,
error,
})
throw new Error(`Failed to evaluate condition: ${error.message}`)
}
}
if (!selectedConnection || !selectedCondition) {
throw new Error(`No matching path found for condition block ${block.id}`)
}
// Identify the target block based on the selected connection.
const targetBlock = this.workflow.blocks.find((b) => b.id === selectedConnection!.target)
if (!targetBlock) {
throw new Error(`Target block ${selectedConnection!.target} not found`)
}
return {
content: `Condition evaluated to ${conditionMet}`,
condition: conditionMet,
selectedConditionId: selectedCondition.id,
sourceOutput,
selectedPath: {
blockId: targetBlock.id,
blockType: targetBlock.metadata?.type || '',
blockTitle: targetBlock.metadata?.title || '',
},
}
}
}

42
package-lock.json generated
View File

@@ -35,6 +35,7 @@
"reactflow": "^11.11.4",
"tailwind-merge": "^2.6.0",
"tailwindcss-animate": "^1.0.7",
"vm2": "^3.9.19",
"zod": "^3.24.1"
},
"devDependencies": {
@@ -3585,6 +3586,30 @@
"node": ">=6.5"
}
},
"node_modules/acorn": {
"version": "8.14.0",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.14.0.tgz",
"integrity": "sha512-cl669nCJTZBsL97OF4kUQm5g5hC2uihk0NxY3WENAC0TYdILVkAyHymAntgxGkl7K+t0cXIrH5siy5S4XkFycA==",
"license": "MIT",
"bin": {
"acorn": "bin/acorn"
},
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/acorn-walk": {
"version": "8.3.4",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.3.4.tgz",
"integrity": "sha512-ueEepnujpqee2o5aIYnvHU6C0A42MNdsIDeqy5BydrkuC5R1ZuUFnm27EeFJGoEHJQgn3uleRvmTXaJgfXbt4g==",
"license": "MIT",
"dependencies": {
"acorn": "^8.11.0"
},
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/agentkeepalive": {
"version": "4.6.0",
"resolved": "https://registry.npmjs.org/agentkeepalive/-/agentkeepalive-4.6.0.tgz",
@@ -9268,6 +9293,23 @@
"node": ">=10.12.0"
}
},
"node_modules/vm2": {
"version": "3.9.19",
"resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz",
"integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==",
"deprecated": "The library contains critical security issues and should not be used for production! The maintenance of the project has been discontinued. Consider migrating your code to isolated-vm.",
"license": "MIT",
"dependencies": {
"acorn": "^8.7.0",
"acorn-walk": "^8.2.0"
},
"bin": {
"vm2": "bin/vm2"
},
"engines": {
"node": ">=6.0"
}
},
"node_modules/walker": {
"version": "1.0.8",
"resolved": "https://registry.npmjs.org/walker/-/walker-1.0.8.tgz",

View File

@@ -41,6 +41,7 @@
"reactflow": "^11.11.4",
"tailwind-merge": "^2.6.0",
"tailwindcss-animate": "^1.0.7",
"vm2": "^3.9.19",
"zod": "^3.24.1"
},
"devDependencies": {

View File

@@ -12,6 +12,10 @@ export interface SerializedConnection {
target: string
sourceHandle?: string
targetHandle?: string
condition?: {
type: 'if' | 'else' | 'else if'
expression?: string // JavaScript expression to evaluate
}
}
export interface SerializedBlock {

View File

@@ -2,21 +2,26 @@ import { ToolConfig, ToolResponse } from '../types'
export interface CodeExecutionInput {
code: Array<{ content: string; id: string }> | string
input?: Record<string, any>
timeout?: number
memoryLimit?: number
}
export interface CodeExecutionOutput extends ToolResponse {
output: {
result: any
stdout: string
executionTime: number
}
}
const DEFAULT_TIMEOUT = 3000 // 3 seconds
const DEFAULT_MEMORY_LIMIT = 512 // 512MB
export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOutput> = {
id: 'function_execute',
name: 'Function Execute',
description:
'Execute code snippets in a secure, sandboxed environment with support for multiple languages. Captures both function output and stdout with proper error handling.',
'Execute JavaScript code in a secure, sandboxed environment with proper isolation and resource limits.',
version: '1.0.0',
params: {
@@ -25,71 +30,54 @@ export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOu
required: true,
description: 'The code to execute',
},
},
request: {
url: 'https://emkc.org/api/v2/piston/execute',
method: 'POST',
headers: () => ({
'Content-Type': 'application/json',
Accept: 'application/json',
}),
body: (params) => {
const codeContent = Array.isArray(params.code)
? params.code.map((c) => c.content).join('\n')
: params.code
return {
language: 'js',
version: '*',
files: [
{
name: 'code.js',
content: codeContent,
},
],
stdin: '',
args: [],
compile_timeout: 10000,
run_timeout: 3000,
compile_memory_limit: -1,
run_memory_limit: -1,
}
timeout: {
type: 'number',
required: false,
description: 'Execution timeout in milliseconds',
default: DEFAULT_TIMEOUT,
},
memoryLimit: {
type: 'number',
required: false,
description: 'Memory limit in MB',
default: DEFAULT_MEMORY_LIMIT,
},
},
transformResponse: async (response) => {
request: {
url: '/api/execute',
method: 'POST',
headers: () => ({
'Content-Type': 'application/json',
}),
body: (params: CodeExecutionInput) => {
const codeContent = Array.isArray(params.code)
? params.code.map((c: { content: string }) => c.content).join('\n')
: params.code
return {
code: codeContent,
timeout: params.timeout || DEFAULT_TIMEOUT,
memoryLimit: params.memoryLimit || DEFAULT_MEMORY_LIMIT,
}
},
isInternalRoute: true,
},
transformResponse: async (response: Response): Promise<CodeExecutionOutput> => {
const result = await response.json()
if (!response.ok) {
throw new Error(result.message || 'Execution failed')
if (!response.ok || !result.success) {
throw new Error(result.error || 'Code execution failed')
}
if (result.run?.stderr) {
throw new Error(result.run.stderr)
}
const stdout = result.run?.stdout || ''
try {
// Try parsing the output as JSON
const parsed = JSON.parse(stdout)
return {
success: true,
output: {
result: parsed,
stdout,
},
}
} catch {
// If not JSON, return as string
return {
success: true,
output: {
result: stdout,
stdout,
},
}
return {
success: true,
output: {
result: result.output.result,
stdout: result.output.stdout,
executionTime: result.output.executionTime,
},
}
},

View File

@@ -57,12 +57,33 @@ export function getTool(toolId: string): ToolConfig | undefined {
return tools[toolId]
}
// Execute a tool by calling the reverse proxy endpoint.
// Execute a tool by calling either the proxy for external APIs or directly for internal routes
export async function executeTool(
toolId: string,
params: Record<string, any>
): Promise<ToolResponse> {
try {
const tool = getTool(toolId)
if (!tool) {
throw new Error(`Tool not found: ${toolId}`)
}
// For internal routes, call the API directly
if (tool.request.isInternalRoute) {
const url =
typeof tool.request.url === 'function' ? tool.request.url(params) : tool.request.url
const response = await fetch(url, {
method: tool.request.method,
headers: tool.request.headers(params),
body: JSON.stringify(tool.request.body ? tool.request.body(params) : params),
})
const result = await tool.transformResponse(response)
return result
}
// For external APIs, use the proxy
const response = await fetch('/api/proxy', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },

View File

@@ -31,6 +31,7 @@ export interface ToolConfig<P = any, R extends ToolResponse = ToolResponse> {
method: string
headers: (params: P) => Record<string, string>
body?: (params: P) => Record<string, any>
isInternalRoute?: boolean // Whether this is an internal API route
}
// Response handling