diff --git a/app/api/execute/route.ts b/app/api/execute/route.ts new file mode 100644 index 000000000..4b363eba3 --- /dev/null +++ b/app/api/execute/route.ts @@ -0,0 +1,89 @@ +import { NextRequest, NextResponse } from 'next/server' +import { Script, createContext } from 'vm' + +// Explicitly export allowed methods +export const dynamic = 'force-dynamic' // Disable static optimization +export const runtime = 'nodejs' // Use Node.js runtime + +export async function POST(req: NextRequest) { + const startTime = Date.now() + let stdout = '' + + try { + const body = await req.json() + + const { code, timeout = 3000 } = body + + // Check if code contains unresolved template variables + if (code.includes('<') && code.includes('>')) { + throw new Error( + 'Code contains unresolved template variables. Please ensure all variables are resolved before execution.' + ) + } + + // Create a secure context with console logging + const context = createContext({ + console: { + log: (...args: any[]) => { + const logMessage = + args + .map((arg) => (typeof arg === 'object' ? JSON.stringify(arg, null, 2) : String(arg))) + .join(' ') + '\n' + stdout += logMessage + }, + error: (...args: any[]) => { + const errorMessage = + args + .map((arg) => (typeof arg === 'object' ? JSON.stringify(arg, null, 2) : String(arg))) + .join(' ') + '\n' + console.error('❌ Code Console Error:', errorMessage.trim()) + stdout += 'ERROR: ' + errorMessage + }, + }, + }) + + // Create and run the script + const script = new Script(` + (async () => { + try { + ${code} + } catch (error) { + console.error(error); + throw error; + } + })() + `) + + const result = await script.runInContext(context, { + timeout, + displayErrors: true, + }) + + const executionTime = Date.now() - startTime + + const response = { + success: true, + output: { + result, + stdout, + executionTime, + }, + } + + return NextResponse.json(response) + } catch (error: any) { + const executionTime = Date.now() - startTime + + const errorResponse = { + success: false, + error: error.message || 'Code execution failed', + output: { + result: null, + stdout, + executionTime, + }, + } + + return NextResponse.json(errorResponse, { status: 500 }) + } +} diff --git a/app/w/[id]/components/workflow-block/components/sub-block/components/condition-input.tsx b/app/w/[id]/components/workflow-block/components/sub-block/components/condition-input.tsx index d560e3caf..5e0ff5b7e 100644 --- a/app/w/[id]/components/workflow-block/components/sub-block/components/condition-input.tsx +++ b/app/w/[id]/components/workflow-block/components/sub-block/components/condition-input.tsx @@ -205,7 +205,7 @@ export function ConditionInput({ blockId, subBlockId, isConnecting }: ConditionI for (let i = 0; i < height; i++) { numbers.push(
0 && 'invisible')} > {lineNumber} diff --git a/blocks/blocks/condition.ts b/blocks/blocks/condition.ts index c231a460b..e4d17f376 100644 --- a/blocks/blocks/condition.ts +++ b/blocks/blocks/condition.ts @@ -23,6 +23,7 @@ export const ConditionBlock: BlockConfig = { type: { result: 'any', stdout: 'string', + executionTime: 'number', }, }, }, diff --git a/blocks/blocks/function.ts b/blocks/blocks/function.ts index 745166ab0..2170ed2bc 100644 --- a/blocks/blocks/function.ts +++ b/blocks/blocks/function.ts @@ -17,12 +17,15 @@ export const FunctionBlock: BlockConfig = { workflow: { inputs: { code: { type: 'string', required: true }, + timeout: { type: 'number', required: false }, + memoryLimit: { type: 'number', required: false }, }, outputs: { response: { type: { result: 'any', stdout: 'string', + executionTime: 'number', }, }, }, diff --git a/executor/index.ts b/executor/index.ts index 84f0456a6..62a03a41b 100644 --- a/executor/index.ts +++ b/executor/index.ts @@ -1,12 +1,18 @@ /** - * "Executor" for running agentic workflows in parallel. + * Executor for running agentic workflows in parallel. * - * Notes & Features: - * • Uses a layered topological sort to allow parallel block execution for blocks with no remaining dependencies. - * • Each block's inputs are resolved through a template mechanism (e.g., ). - * • Stores block outputs in context.blockStates so subsequent blocks can reference them by ID or name. - * • Maintains robust error handling (if a block fails, throws an error for the entire workflow). - * • Returns per-block logs that can be displayed in the UI for better trace/debug. + * High-Level Overview: + * - This class is responsible for running workflows using a layered topological sort. + * - Blocks that have no unresolved dependencies are executed in parallel. + * - Depending on the block type (router, condition, agent, or regular tool), different execution + * logic is applied. For example, condition blocks evaluate multiple branches and record the + * chosen branch via its condition ID so that only that path is executed. + * - Each block's output is stored in the ExecutionContext so that subsequent blocks can reference them. + * - Detailed logs are collected for each block to assist with debugging. + * + * Error Handling: + * - If a block fails, an error is thrown, halting the workflow. + * - Meaningful error messages are provided. */ import { getAllBlocks } from '@/blocks' import { generateRouterPrompt } from '@/blocks/blocks/router' @@ -15,24 +21,24 @@ import { BlockConfig } from '@/blocks/types' import { executeProviderRequest } from '@/providers/service' import { getProviderFromModel } from '@/providers/utils' import { SerializedBlock, SerializedWorkflow } from '@/serializer/types' -import { executeTool, getTool, tools } from '@/tools' +import { executeTool, getTool } from '@/tools' import { BlockLog, ExecutionContext, ExecutionResult, Tool } from './types' export class Executor { constructor( private workflow: SerializedWorkflow, - // Initial block states can be passed in if you need to resume workflows or pre-populate data. + // Initial block states can be passed in (e.g., for resuming workflows or pre-populating data) private initialBlockStates: Record = {}, private environmentVariables: Record = {} ) {} /** - * Main entry point that executes the entire workflow in parallel layers. + * Main entry point that executes the entire workflow in layered parallel fashion. */ async execute(workflowId: string): Promise { const startTime = new Date() - // Build the ExecutionContext with new blockLogs array + // Build the execution context: holds outputs, logs, metadata, and environment variables. const context: ExecutionContext = { workflowId, blockStates: new Map(), @@ -43,19 +49,18 @@ export class Executor { environmentVariables: this.environmentVariables, } - // Pre-populate block states if initialBlockStates exist + // Pre-populate context with any initial block states. Object.entries(this.initialBlockStates).forEach(([blockId, output]) => { context.blockStates.set(blockId, output) }) try { - // Perform layered parallel execution + // Execute all blocks in parallel layers (using topological sorting). const lastOutput = await this.executeInParallel(context) const endTime = new Date() context.metadata.endTime = endTime.toISOString() - // Return full logs for the UI to consume return { success: true, output: lastOutput, @@ -67,7 +72,6 @@ export class Executor { logs: context.blockLogs, } } catch (error: any) { - // Ensure we return a meaningful error message return { success: false, output: { response: {} }, @@ -78,138 +82,184 @@ export class Executor { } /** - * Executes all blocks in a layered topological fashion, running each layer in parallel via Promise.all. - * If a cycle is detected, throws an error. + * Executes workflow blocks layer-by-layer. Blocks with no dependencies are processed together. + * + * Notes: + * - Maintains in-degrees and adjacency lists for blocks (i.e. dependencies). + * - Blocks with condition or router types update routing/conditional decisions. + * - Only the branch corresponding to the evaluated condition is executed. */ private async executeInParallel(context: ExecutionContext): Promise { const { blocks, connections } = this.workflow - // Build in-degree and adjacency list for each block + // Build dependency graphs: inDegree (number of incoming edges) and adjacency (outgoing connections) const inDegree = new Map() const adjacency = new Map() - // Initialize inDegree and adjacency for (const block of blocks) { inDegree.set(block.id, 0) adjacency.set(block.id, []) } - // Populate edges + // Populate inDegree and adjacency. For conditional connections, inDegree is handled dynamically. for (const conn of connections) { - inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1) + // Increase inDegree only for regular (non-conditional) connections. + if (!conn.condition) { + inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1) + } adjacency.get(conn.source)?.push(conn.target) } + // Maps for router and conditional decisions. + // routerDecisions: router block id -> chosen target block id. + // activeConditionalPaths: conditional block id -> selected condition id. + const routerDecisions = new Map() + const activeConditionalPaths = new Map() + + // Queue initially contains all blocks without dependencies. + const queue: string[] = [] + for (const [blockId, degree] of inDegree) { + if (degree === 0) { + queue.push(blockId) + } + } + + // This variable will store the output of the latest executed block. let lastOutput: BlockOutput = { response: {} } - let routerDecision: { routerId: string; chosenPath: string } | null = null - // Start with all blocks that have inDegree = 0 - let layer = blocks.filter((b) => (inDegree.get(b.id) || 0) === 0).map((b) => b.id) + // Process blocks layer by layer. + while (queue.length > 0) { + const currentLayer = [...queue] + queue.length = 0 - while (layer.length > 0) { - // Execute current layer in parallel, but only if blocks are in the chosen path - const results = await Promise.all( - layer - .filter((blockId) => { - // If we have a router decision, only execute blocks in the chosen path - if (routerDecision) { - return this.isInChosenPath( - blockId, - routerDecision.chosenPath, - routerDecision.routerId - ) + // Filtering: only execute blocks that match router and conditional decisions. + const executableBlocks = currentLayer.filter((blockId) => { + // Verify if block lies on the router's chosen path. + for (const [routerId, chosenPath] of routerDecisions) { + if (!this.isInChosenPath(blockId, chosenPath, routerId)) { + return false + } + } + + // Verify if block lies on the selected conditional path. + for (const [conditionBlockId, selectedConditionId] of activeConditionalPaths) { + const connection = connections.find( + (conn) => + conn.source === conditionBlockId && + conn.target === blockId && + conn.sourceHandle?.startsWith('condition-') + ) + if (connection) { + // Extract condition id from sourceHandle (format: "condition-") + const connConditionId = connection.sourceHandle?.replace('condition-', '') + if (connConditionId !== selectedConditionId) { + return false } - return true - }) - .map(async (blockId) => { - const block = blocks.find((b) => b.id === blockId) - if (!block) { - throw new Error(`Missing block ${blockId}`) + } + } + return true + }) + + // Execute blocks in the current layer in parallel. + const layerResults = await Promise.all( + executableBlocks.map(async (blockId) => { + const block = blocks.find((b) => b.id === blockId) + if (!block) { + throw new Error(`Block ${blockId} not found`) + } + + // Resolve inputs (including template variables and env vars) for the block. + const inputs = this.resolveInputs(block, context) + const result = await this.executeBlock(block, inputs, context) + // Store the block output in context for later reference. + context.blockStates.set(block.id, result) + // Update lastOutput to reflect the latest executed block. + lastOutput = result + + // For router or condition blocks, update decision maps accordingly. + if (block.metadata?.type === 'router') { + const routerResult = result as { + response: { + content: string + model: string + tokens: { + prompt: number + completion: number + total: number + } + selectedPath: { blockId: string } + } } - - // Skip disabled blocks - if (block.enabled === false) { - return { response: {} } - } - - try { - const resolvedInputs = this.resolveInputs(block, context) - const output = await this.executeBlock(block, resolvedInputs, context) - - // If this is a router block, store its decision - if ( - block.metadata?.type === 'router' && - output && - typeof output === 'object' && - 'response' in output && - output.response && - typeof output.response === 'object' && - 'selectedPath' in output.response - ) { - const routerResponse = output.response as { selectedPath: { blockId: string } } - routerDecision = { - routerId: block.id, - chosenPath: routerResponse.selectedPath.blockId, + routerDecisions.set(block.id, routerResult.response.selectedPath.blockId) + } else if (block.metadata?.type === 'condition') { + const conditionResult = result as { + response: { + condition: { + selectedConditionId: string + result: boolean } } - - context.blockStates.set(block.id, output) - return output - } catch (error) { - throw error } - }) + activeConditionalPaths.set( + block.id, + conditionResult.response.condition.selectedConditionId + ) + } + return blockId + }) ) - if (results.length > 0) { - lastOutput = results[results.length - 1] - } + // After executing a layer, update in-degrees for all adjacent blocks. + for (const finishedBlockId of layerResults) { + const neighbors = adjacency.get(finishedBlockId) || [] + for (const neighbor of neighbors) { + // Find the relevant connection from finishedBlockId to neighbor. + const connection = connections.find( + (conn) => conn.source === finishedBlockId && conn.target === neighbor + ) + if (!connection) continue - // Build the next layer by reducing in-degree of neighbors - const nextLayer: string[] = [] - for (const blockId of layer) { - const neighbors = adjacency.get(blockId) || [] - for (const targetId of neighbors) { - const deg = inDegree.get(targetId) ?? 0 - const newDeg = deg - 1 - inDegree.set(targetId, newDeg) - if (newDeg === 0) { - nextLayer.push(targetId) + // Regular (non-conditional) connection: always decrement. + if (!connection.sourceHandle || !connection.sourceHandle.startsWith('condition-')) { + const newDegree = (inDegree.get(neighbor) || 0) - 1 + inDegree.set(neighbor, newDegree) + if (newDegree === 0) { + queue.push(neighbor) + } + } else { + // For a conditional connection, only decrement if the active condition matches. + const conditionId = connection.sourceHandle.replace('condition-', '') + if (activeConditionalPaths.get(finishedBlockId) === conditionId) { + const newDegree = (inDegree.get(neighbor) || 0) - 1 + inDegree.set(neighbor, newDegree) + if (newDegree === 0) { + queue.push(neighbor) + } + } } } } - - layer = nextLayer - } - - // Validate that all blocks were executed. If not, the workflow has a cycle. - const executedCount = [...inDegree.values()].filter((x) => x === 0).length - if (executedCount !== blocks.length) { - throw new Error('Workflow contains cycles or invalid connections') } return lastOutput } /** - * Executes a single block by: - * 1) Determining which tool to call - * 2) Validating parameters - * 3) Making the request (for http blocks or LLM blocks, etc.) - * 4) Transforming the response via the tool's transformResponse + * Executes a single block. Deduces the tool to call, validates parameters, + * makes the request, and transforms the response. + * + * The result is logged and returned. */ private async executeBlock( block: SerializedBlock, inputs: Record, context: ExecutionContext ): Promise { - // Start timing const startTime = new Date() - const blockLog: BlockLog = { blockId: block.id, - blockTitle: block.metadata?.title, - blockType: block.metadata?.type, + blockTitle: block.metadata?.title || '', + blockType: block.metadata?.type || '', startedAt: startTime.toISOString(), endedAt: '', durationMs: 0, @@ -219,7 +269,7 @@ export class Executor { try { let output: BlockOutput - // Handle router blocks differently + // Execute block based on its type. if (block.metadata?.type === 'router') { const routerOutput = await this.executeRouterBlock(block, context) output = { @@ -230,18 +280,28 @@ export class Executor { selectedPath: routerOutput.selectedPath, }, } + } else if (block.metadata?.type === 'condition') { + const conditionResult = await this.executeConditionalBlock(block, context) + output = { + response: { + result: conditionResult.condition, + content: conditionResult.content, + condition: { + result: conditionResult.condition, + selectedPath: conditionResult.selectedPath, + selectedConditionId: conditionResult.selectedConditionId, + }, + }, + } } else if (block.metadata?.type === 'agent') { - // Special handling for agent blocks that use providers - let responseFormat = undefined + // Agent block: use a provider request. + let responseFormat: any = undefined if (inputs.responseFormat) { try { - // If it's already a string, parse it once - if (typeof inputs.responseFormat === 'string') { - responseFormat = JSON.parse(inputs.responseFormat) - } else { - // If it's somehow already an object, use it directly - responseFormat = inputs.responseFormat - } + responseFormat = + typeof inputs.responseFormat === 'string' + ? JSON.parse(inputs.responseFormat) + : inputs.responseFormat } catch (error: any) { console.error('Error parsing responseFormat:', error) throw new Error('Invalid response format: ' + error.message) @@ -251,20 +311,16 @@ export class Executor { const model = inputs.model || 'gpt-4o' const providerId = getProviderFromModel(model) - // Format tools if they exist - const tools = Array.isArray(inputs.tools) + // Format tools if provided. (Rename local variable to avoid conflict with imported "tools".) + const formattedTools = Array.isArray(inputs.tools) ? inputs.tools .map((tool: any) => { - const block = getAllBlocks().find((b: BlockConfig) => b.type === tool.type) - const toolId = block?.tools.access[0] - if (!toolId) { - return null - } + const blockFound = getAllBlocks().find((b: BlockConfig) => b.type === tool.type) + const toolId = blockFound?.tools.access[0] + if (!toolId) return null const toolConfig = getTool(toolId) - if (!toolConfig) { - return null - } + if (!toolConfig) return null return { id: toolConfig.id, @@ -296,10 +352,11 @@ export class Executor { const response = await executeProviderRequest(providerId, { model, systemPrompt: inputs.systemPrompt, - context: Array.isArray(inputs.context) - ? JSON.stringify(inputs.context, null, 2) - : inputs.context, - tools: tools.length > 0 ? tools : undefined, + context: + Array.isArray(inputs.context) === true + ? JSON.stringify(inputs.context, null, 2) + : inputs.context, + tools: formattedTools.length > 0 ? formattedTools : undefined, temperature: inputs.temperature, maxTokens: inputs.maxTokens, apiKey: inputs.apiKey, @@ -337,10 +394,10 @@ export class Executor { }, } } else { - // Regular tool execution + // Regular tool block execution. const tool = getTool(block.config.tool) if (!tool) { - throw new Error(`Tool ${block.config.tool} not found`) + throw new Error(`Tool not found: ${block.config.tool}`) } const result = await executeTool(block.config.tool, inputs) @@ -348,41 +405,35 @@ export class Executor { console.error('Tool execution failed:', result.error) throw new Error(result.error || `Tool ${block.config.tool} failed with no error message`) } - output = { response: result.output } } + // Mark block execution as successful and record timing. blockLog.success = true blockLog.output = output - - // Compute timing const endTime = new Date() blockLog.endedAt = endTime.toISOString() blockLog.durationMs = endTime.getTime() - startTime.getTime() - - // Add log entry context.blockLogs.push(blockLog) + // Ensure block output is available in the context for downstream blocks. + context.blockStates.set(block.id, output) return output } catch (error: any) { - // Update block log with error + // On error: log the error, update blockLog, and rethrow. blockLog.success = false - blockLog.error = error.message || `Block execution failed` - - // Compute timing + blockLog.error = error.message || 'Block execution failed' const endTime = new Date() blockLog.endedAt = endTime.toISOString() blockLog.durationMs = endTime.getTime() - startTime.getTime() - - // Add log entry context.blockLogs.push(blockLog) - throw error } } /** - * Validates required parameters for a Tool, or uses defaults if present. + * Validates required parameters for a given tool configuration. + * Uses defaults when available and throws an error if a required parameter is missing. */ private validateToolParams(tool: Tool, params: Record): Record { return Object.entries(tool.params).reduce( @@ -401,13 +452,14 @@ export class Executor { } /** - * Resolves any template references in a block's config params (e.g., ""), - * pulling from context.blockStates. This is how outputs from one block get wired as inputs to another. + * Resolves template references in a block's configuration (e.g., ""), + * as well as environment variables (format: "{{ENV_VAR}}"). + * The values are pulled from the context's blockStates and environmentVariables. */ private resolveInputs(block: SerializedBlock, context: ExecutionContext): Record { const inputs = { ...block.config.params } - // Create quick-lookup for blocks by ID and by normalized name + // Create quick lookups for blocks by ID and by normalized title. const blockById = new Map(this.workflow.blocks.map((b) => [b.id, b])) const blockByName = new Map( this.workflow.blocks.map((b) => [ @@ -416,7 +468,7 @@ export class Executor { ]) ) - // Helper function to resolve environment variables in a value + // Helper to resolve environment variables in a given value. const resolveEnvVars = (value: any): any => { if (typeof value === 'string') { const envMatches = value.match(/\{\{([^}]+)\}\}/g) @@ -425,11 +477,9 @@ export class Executor { for (const match of envMatches) { const envKey = match.slice(2, -2) const envValue = this.environmentVariables?.[envKey] - if (envValue === undefined) { throw new Error(`Environment variable "${envKey}" was not found.`) } - resolvedValue = resolvedValue.replace(match, envValue) } return resolvedValue @@ -453,76 +503,58 @@ export class Executor { if (typeof value === 'string') { let resolvedValue = value - // Handle block references with <> syntax + // Resolve block reference templates in the format "" const blockMatches = value.match(/<([^>]+)>/g) if (blockMatches) { for (const match of blockMatches) { // e.g. "" - const path = match.slice(1, -1) // remove < and > + const path = match.slice(1, -1) const [blockRef, ...pathParts] = path.split('.') - - // Try referencing as an ID, then as a normalized name. let sourceBlock = blockById.get(blockRef) if (!sourceBlock) { const normalized = blockRef.toLowerCase().replace(/\s+/g, '') sourceBlock = blockByName.get(normalized) } - if (!sourceBlock) { throw new Error(`Block reference "${blockRef}" was not found.`) } - - // Check if the referenced block is disabled. if (sourceBlock.enabled === false) { throw new Error( `Block "${sourceBlock.metadata?.title}" is disabled, and block "${block.metadata?.title}" depends on it.` ) } - const sourceState = context.blockStates.get(sourceBlock.id) if (!sourceState) { throw new Error( `No state found for block "${sourceBlock.metadata?.title}" (ID: ${sourceBlock.id}).` ) } - - // Drill into the path + // Drill into the property path. let replacementValue: any = sourceState for (const part of pathParts) { if (!replacementValue || typeof replacementValue !== 'object') { throw new Error( - `Invalid path part "${part}" in "${path}" for block "${block.metadata?.title}".` + `Invalid path "${part}" in "${path}" for block "${block.metadata?.title}".` ) } - - // Check if we have a response format and this is a field from it - const responseFormat = sourceBlock.config.params?.responseFormat - if (responseFormat && typeof responseFormat === 'string') { - try { - const parsedFormat = JSON.parse(responseFormat) - if (parsedFormat?.fields?.some((f: any) => f.name === part)) { - replacementValue = replacementValue[part] - continue - } - } catch (e) { - console.error('Error parsing response format:', e) - } - } - + // Optional: special-case formatting for response formats. replacementValue = replacementValue[part] } - - // If a valid leaf is found if (replacementValue !== undefined) { - // Special handling for context field to ensure proper formatting - if (key === 'context') { - // If it's not a string, stringify it for context + if (block.metadata?.type === 'function' && key === 'code') { + // For function blocks, format the code nicely. + resolvedValue = resolvedValue.replace( + match, + typeof replacementValue === 'object' + ? JSON.stringify(replacementValue, null, 2) + : JSON.stringify(String(replacementValue)) + ) + } else if (key === 'context') { resolvedValue = typeof replacementValue === 'string' ? replacementValue : JSON.stringify(replacementValue, null, 2) } else { - // For all other fields, use existing logic resolvedValue = resolvedValue.replace( match, typeof replacementValue === 'object' @@ -537,11 +569,8 @@ export class Executor { } } } - - // After all block references are resolved, resolve any environment variables + // Resolve environment variables. resolvedValue = resolveEnvVars(resolvedValue) - - // After all replacements are done, attempt JSON parse if it looks like JSON try { if (resolvedValue.startsWith('{') || resolvedValue.startsWith('[')) { acc[key] = JSON.parse(resolvedValue) @@ -552,7 +581,6 @@ export class Executor { acc[key] = resolvedValue } } else { - // For non-string values, still try to resolve any nested environment variables acc[key] = resolveEnvVars(value) } return acc @@ -563,6 +591,9 @@ export class Executor { return resolvedInputs } + /** + * Executes a router block which calculates branching decisions based on a prompt. + */ private async executeRouterBlock( block: SerializedBlock, context: ExecutionContext @@ -580,23 +611,19 @@ export class Executor { blockTitle: string } }> { - // First resolve all inputs including environment variables + // Resolve inputs for the router block. const resolvedInputs = this.resolveInputs(block, context) - const outgoingConnections = this.workflow.connections.filter((conn) => conn.source === block.id) - const targetBlocks = outgoingConnections.map((conn) => { const targetBlock = this.workflow.blocks.find((b) => b.id === conn.target) if (!targetBlock) { throw new Error(`Target block ${conn.target} not found`) } - return { id: targetBlock.id, type: targetBlock.metadata?.type, title: targetBlock.metadata?.title, description: targetBlock.metadata?.description, - category: targetBlock.metadata?.category, subBlocks: targetBlock.config.params, currentState: context.blockStates.get(targetBlock.id), } @@ -612,34 +639,24 @@ export class Executor { const model = routerConfig.model || 'gpt-4o' const providerId = getProviderFromModel(model) + // Generate and send the router prompt. const response = await executeProviderRequest(providerId, { model: routerConfig.model, systemPrompt: generateRouterPrompt(routerConfig.prompt, targetBlocks), - messages: [ - { - role: 'user', - content: routerConfig.prompt, - }, - ], + messages: [{ role: 'user', content: routerConfig.prompt }], temperature: routerConfig.temperature, apiKey: routerConfig.apiKey, }) const chosenBlockId = response.content.trim().toLowerCase() const chosenBlock = targetBlocks.find((b) => b.id === chosenBlockId) - if (!chosenBlock) { throw new Error(`Invalid routing decision: ${chosenBlockId}`) } - // Pass through the actual resolved content from the source block - const sourceContent = resolvedInputs.prompt - - // Ensure tokens are properly typed const tokens = response.tokens || { prompt: 0, completion: 0, total: 0 } - return { - content: sourceContent, // This now contains the actual resolved content from Agent 4 + content: resolvedInputs.prompt, model: response.model, tokens: { prompt: tokens.prompt || 0, @@ -654,6 +671,11 @@ export class Executor { } } + /** + * Determines whether a block is reachable along the chosen router path. + * + * This uses a breadth-first search starting from the chosen block id. + */ private isInChosenPath(blockId: string, chosenBlockId: string, routerId: string): boolean { const visited = new Set() const queue = [chosenBlockId] @@ -662,7 +684,6 @@ export class Executor { const currentId = queue.shift()! if (visited.has(currentId)) continue visited.add(currentId) - const connections = this.workflow.connections.filter((conn) => conn.source === currentId) for (const conn of connections) { queue.push(conn.target) @@ -671,4 +692,125 @@ export class Executor { return blockId === routerId || visited.has(blockId) } + + /** + * Executes a condition block that evaluates a set of conditions (if/else-if/else). + * + * The block: + * - Parses its conditions. + * - Uses the source block's output to evaluate each condition. + * - Selects the branch matching the evaluation (via sourceHandle in the connection). + * - Returns an output that includes the boolean result and the selected condition's ID. + */ + private async executeConditionalBlock( + block: SerializedBlock, + context: ExecutionContext + ): Promise<{ + content: string + condition: boolean + selectedConditionId: string + sourceOutput: BlockOutput + selectedPath: { + blockId: string + blockType: string + blockTitle: string + } + }> { + const conditions = JSON.parse(block.config.params.conditions) + console.log('Parsed conditions:', conditions) + + // Identify the source block that feeds into this condition block. + const sourceBlockId = this.workflow.connections.find((conn) => conn.target === block.id)?.source + + if (!sourceBlockId) { + throw new Error(`No source block found for condition block ${block.id}`) + } + + const sourceOutput = context.blockStates.get(sourceBlockId) + if (!sourceOutput) { + throw new Error(`No output found for source block ${sourceBlockId}`) + } + console.log('Source block output:', sourceOutput) + + const outgoingConnections = this.workflow.connections.filter((conn) => conn.source === block.id) + console.log('Outgoing connections:', outgoingConnections) + + let conditionMet = false + let selectedConnection: { target: string; sourceHandle?: string } | null = null + let selectedCondition: { id: string; title: string; value: string } | null = null + + // Evaluate conditions one by one. + for (const condition of conditions) { + try { + // Resolve the condition expression using the current context. + const resolvedCondition = this.resolveInputs( + { + id: block.id, + config: { params: { condition: condition.value }, tool: block.config.tool }, + metadata: block.metadata, + position: block.position, + inputs: block.inputs, + outputs: block.outputs, + enabled: block.enabled, + }, + context + ) + const evalContext = { + ...(typeof sourceOutput === 'object' && sourceOutput !== null ? sourceOutput : {}), + agent1: sourceOutput, + } + conditionMet = new Function( + 'context', + `with(context) { return ${resolvedCondition.condition} }` + )(evalContext) + + // Cast the connection so that TypeScript knows it has a target property. + const connection = outgoingConnections.find( + (conn) => conn.sourceHandle === `condition-${condition.id}` + ) as { target: string; sourceHandle?: string } | undefined + + if (connection) { + // For if/else-if, require conditionMet to be true. + // For else, unconditionally select it. + if ((condition.title === 'if' || condition.title === 'else if') && conditionMet) { + selectedConnection = connection + selectedCondition = condition + break + } else if (condition.title === 'else') { + selectedConnection = connection + selectedCondition = condition + break + } + } + } catch (error: any) { + console.error(`Failed to evaluate condition: ${error.message}`, { + condition, + error, + }) + throw new Error(`Failed to evaluate condition: ${error.message}`) + } + } + + if (!selectedConnection || !selectedCondition) { + throw new Error(`No matching path found for condition block ${block.id}`) + } + + // Identify the target block based on the selected connection. + const targetBlock = this.workflow.blocks.find((b) => b.id === selectedConnection!.target) + if (!targetBlock) { + throw new Error(`Target block ${selectedConnection!.target} not found`) + } + + return { + content: `Condition evaluated to ${conditionMet}`, + condition: conditionMet, + selectedConditionId: selectedCondition.id, + sourceOutput, + selectedPath: { + blockId: targetBlock.id, + blockType: targetBlock.metadata?.type || '', + blockTitle: targetBlock.metadata?.title || '', + }, + } + } } diff --git a/package-lock.json b/package-lock.json index 8f0967014..eb82911aa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35,6 +35,7 @@ "reactflow": "^11.11.4", "tailwind-merge": "^2.6.0", "tailwindcss-animate": "^1.0.7", + "vm2": "^3.9.19", "zod": "^3.24.1" }, "devDependencies": { @@ -3585,6 +3586,30 @@ "node": ">=6.5" } }, + "node_modules/acorn": { + "version": "8.14.0", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.14.0.tgz", + "integrity": "sha512-cl669nCJTZBsL97OF4kUQm5g5hC2uihk0NxY3WENAC0TYdILVkAyHymAntgxGkl7K+t0cXIrH5siy5S4XkFycA==", + "license": "MIT", + "bin": { + "acorn": "bin/acorn" + }, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/acorn-walk": { + "version": "8.3.4", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.3.4.tgz", + "integrity": "sha512-ueEepnujpqee2o5aIYnvHU6C0A42MNdsIDeqy5BydrkuC5R1ZuUFnm27EeFJGoEHJQgn3uleRvmTXaJgfXbt4g==", + "license": "MIT", + "dependencies": { + "acorn": "^8.11.0" + }, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/agentkeepalive": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/agentkeepalive/-/agentkeepalive-4.6.0.tgz", @@ -9268,6 +9293,23 @@ "node": ">=10.12.0" } }, + "node_modules/vm2": { + "version": "3.9.19", + "resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz", + "integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==", + "deprecated": "The library contains critical security issues and should not be used for production! The maintenance of the project has been discontinued. Consider migrating your code to isolated-vm.", + "license": "MIT", + "dependencies": { + "acorn": "^8.7.0", + "acorn-walk": "^8.2.0" + }, + "bin": { + "vm2": "bin/vm2" + }, + "engines": { + "node": ">=6.0" + } + }, "node_modules/walker": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/walker/-/walker-1.0.8.tgz", diff --git a/package.json b/package.json index 95a3d69be..fb7851ff9 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ "reactflow": "^11.11.4", "tailwind-merge": "^2.6.0", "tailwindcss-animate": "^1.0.7", + "vm2": "^3.9.19", "zod": "^3.24.1" }, "devDependencies": { diff --git a/serializer/types.ts b/serializer/types.ts index 2c2ed9aeb..d97d560ae 100644 --- a/serializer/types.ts +++ b/serializer/types.ts @@ -12,6 +12,10 @@ export interface SerializedConnection { target: string sourceHandle?: string targetHandle?: string + condition?: { + type: 'if' | 'else' | 'else if' + expression?: string // JavaScript expression to evaluate + } } export interface SerializedBlock { diff --git a/tools/function/execute.ts b/tools/function/execute.ts index 8f4f1df07..e5be8b609 100644 --- a/tools/function/execute.ts +++ b/tools/function/execute.ts @@ -2,21 +2,26 @@ import { ToolConfig, ToolResponse } from '../types' export interface CodeExecutionInput { code: Array<{ content: string; id: string }> | string - input?: Record + timeout?: number + memoryLimit?: number } export interface CodeExecutionOutput extends ToolResponse { output: { result: any stdout: string + executionTime: number } } +const DEFAULT_TIMEOUT = 3000 // 3 seconds +const DEFAULT_MEMORY_LIMIT = 512 // 512MB + export const functionExecuteTool: ToolConfig = { id: 'function_execute', name: 'Function Execute', description: - 'Execute code snippets in a secure, sandboxed environment with support for multiple languages. Captures both function output and stdout with proper error handling.', + 'Execute JavaScript code in a secure, sandboxed environment with proper isolation and resource limits.', version: '1.0.0', params: { @@ -25,71 +30,54 @@ export const functionExecuteTool: ToolConfig ({ - 'Content-Type': 'application/json', - Accept: 'application/json', - }), - body: (params) => { - const codeContent = Array.isArray(params.code) - ? params.code.map((c) => c.content).join('\n') - : params.code - - return { - language: 'js', - version: '*', - files: [ - { - name: 'code.js', - content: codeContent, - }, - ], - stdin: '', - args: [], - compile_timeout: 10000, - run_timeout: 3000, - compile_memory_limit: -1, - run_memory_limit: -1, - } + timeout: { + type: 'number', + required: false, + description: 'Execution timeout in milliseconds', + default: DEFAULT_TIMEOUT, + }, + memoryLimit: { + type: 'number', + required: false, + description: 'Memory limit in MB', + default: DEFAULT_MEMORY_LIMIT, }, }, - transformResponse: async (response) => { + request: { + url: '/api/execute', + method: 'POST', + headers: () => ({ + 'Content-Type': 'application/json', + }), + body: (params: CodeExecutionInput) => { + const codeContent = Array.isArray(params.code) + ? params.code.map((c: { content: string }) => c.content).join('\n') + : params.code + + return { + code: codeContent, + timeout: params.timeout || DEFAULT_TIMEOUT, + memoryLimit: params.memoryLimit || DEFAULT_MEMORY_LIMIT, + } + }, + isInternalRoute: true, + }, + + transformResponse: async (response: Response): Promise => { const result = await response.json() - if (!response.ok) { - throw new Error(result.message || 'Execution failed') + if (!response.ok || !result.success) { + throw new Error(result.error || 'Code execution failed') } - if (result.run?.stderr) { - throw new Error(result.run.stderr) - } - - const stdout = result.run?.stdout || '' - - try { - // Try parsing the output as JSON - const parsed = JSON.parse(stdout) - return { - success: true, - output: { - result: parsed, - stdout, - }, - } - } catch { - // If not JSON, return as string - return { - success: true, - output: { - result: stdout, - stdout, - }, - } + return { + success: true, + output: { + result: result.output.result, + stdout: result.output.stdout, + executionTime: result.output.executionTime, + }, } }, diff --git a/tools/index.ts b/tools/index.ts index b637fb338..c90ddd6f7 100644 --- a/tools/index.ts +++ b/tools/index.ts @@ -57,12 +57,33 @@ export function getTool(toolId: string): ToolConfig | undefined { return tools[toolId] } -// Execute a tool by calling the reverse proxy endpoint. +// Execute a tool by calling either the proxy for external APIs or directly for internal routes export async function executeTool( toolId: string, params: Record ): Promise { try { + const tool = getTool(toolId) + if (!tool) { + throw new Error(`Tool not found: ${toolId}`) + } + + // For internal routes, call the API directly + if (tool.request.isInternalRoute) { + const url = + typeof tool.request.url === 'function' ? tool.request.url(params) : tool.request.url + + const response = await fetch(url, { + method: tool.request.method, + headers: tool.request.headers(params), + body: JSON.stringify(tool.request.body ? tool.request.body(params) : params), + }) + + const result = await tool.transformResponse(response) + return result + } + + // For external APIs, use the proxy const response = await fetch('/api/proxy', { method: 'POST', headers: { 'Content-Type': 'application/json' }, diff --git a/tools/types.ts b/tools/types.ts index e9dd0a8e1..f6ec52fd8 100644 --- a/tools/types.ts +++ b/tools/types.ts @@ -31,6 +31,7 @@ export interface ToolConfig

{ method: string headers: (params: P) => Record body?: (params: P) => Record + isInternalRoute?: boolean // Whether this is an internal API route } // Response handling