diff --git a/app/w/hooks/use-workflow-execution.ts b/app/w/hooks/use-workflow-execution.ts index e65e0471c..dbf0dd309 100644 --- a/app/w/hooks/use-workflow-execution.ts +++ b/app/w/hooks/use-workflow-execution.ts @@ -26,32 +26,44 @@ export function useWorkflowExecution() { }, {} as Record) // Execute workflow - const executor = new Executor( - new Serializer().serializeWorkflow(blocks, edges), - currentBlockStates - ) + const workflow = new Serializer().serializeWorkflow(blocks, edges) + const executor = new Executor(workflow, currentBlockStates) - const result = await executor.execute(crypto.randomUUID()) + const result = await executor.execute('my-run-id') setExecutionResult(result) + if (result.logs) { + console.group('Detailed Block Logs') + result.logs.forEach((log) => { + console.log(`Block ${log.blockTitle}: Success=${log.success}`, { + output: log.output, + error: log.error, + durationMs: log.durationMs, + startedAt: log.startedAt, + endedAt: log.endedAt + }) + }) + console.groupEnd() + } + // Show execution result with workflowId addNotification( result.success ? 'console' : 'error', result.success ? 'Workflow completed successfully' - : `Failed to execute workflow: ${result.error}`, + : `Workflow execution failed: ${result.error}`, activeWorkflowId ) - // Log detailed result to console + // Also log final output info if (result.success) { - console.group('Workflow Execution Result') + console.group('Final Output') console.log('Status: ✅ Success') console.log('Output:', result.output) if (result.metadata) { console.log('Duration:', result.metadata.duration + 'ms') - console.log('Start Time:', new Date(result.metadata.startTime).toLocaleTimeString()) - console.log('End Time:', new Date(result.metadata.endTime).toLocaleTimeString()) + console.log('StartedAt:', new Date(result.metadata.startTime).toLocaleTimeString()) + console.log('EndedAt:', new Date(result.metadata.endTime).toLocaleTimeString()) } console.groupEnd() } @@ -62,7 +74,7 @@ export function useWorkflowExecution() { output: { response: {} }, error: errorMessage }) - addNotification('error', `Failed to execute workflow: ${errorMessage}`, activeWorkflowId) + addNotification('error', `Workflow execution failed: ${errorMessage}`, activeWorkflowId) } finally { setIsExecuting(false) } diff --git a/executor/index.ts b/executor/index.ts index 340f2b780..df2c73b84 100644 --- a/executor/index.ts +++ b/executor/index.ts @@ -1,234 +1,386 @@ +/** + * "Executor" for running agentic workflows in parallel. + * + * Notes & Features: + * • Uses a layered topological sort to allow parallel block execution for blocks with no remaining dependencies. + * • Each block's inputs are resolved through a template mechanism (e.g., ). + * • Stores block outputs in context.blockStates so subsequent blocks can reference them by ID or name. + * • Maintains robust error handling (if a block fails, throws an error for the entire workflow). + * • Returns per-block logs that can be displayed in the UI for better trace/debug. + */ + import { SerializedWorkflow, SerializedBlock } from '@/serializer/types' -import { ExecutionContext, ExecutionResult, Tool } from './types' -import { tools } from '@/tools' import { BlockOutput } from '@/blocks/types' +import { + Tool, + ExecutionContext, + ExecutionResult, + BlockLog +} from './types' +import { tools } from '@/tools' export class Executor { constructor( private workflow: SerializedWorkflow, + // Initial block states can be passed in if you need to resume workflows or pre-populate data. private initialBlockStates: Record = {} ) {} + /** + * Main entry point that executes the entire workflow in parallel layers. + */ + async execute(workflowId: string): Promise { + const startTime = new Date() + + // Build the ExecutionContext with new blockLogs array + const context: ExecutionContext = { + workflowId, + blockStates: new Map(), + blockLogs: [], + metadata: { + startTime: startTime.toISOString() + }, + } + + // Pre-populate block states if initialBlockStates exist + Object.entries(this.initialBlockStates).forEach(([blockId, output]) => { + context.blockStates.set(blockId, output) + }) + + try { + // Perform layered parallel execution + const lastOutput = await this.executeInParallel(context) + + const endTime = new Date() + context.metadata.endTime = endTime.toISOString() + + // Return full logs for the UI to consume + return { + success: true, + output: lastOutput, + metadata: { + duration: endTime.getTime() - startTime.getTime(), + startTime: context.metadata.startTime!, + endTime: context.metadata.endTime!, + }, + logs: context.blockLogs, + } + } catch (error) { + return { + success: false, + output: { response: {} }, + error: error instanceof Error ? error.message : 'Unknown error', + logs: context.blockLogs, + } + } + } + + /** + * Executes all blocks in a layered topological fashion, running each layer in parallel via Promise.all. + * If a cycle is detected, throws an error. + */ + private async executeInParallel(context: ExecutionContext): Promise { + const { blocks, connections } = this.workflow + + // Build in-degree and adjacency list for each block + const inDegree = new Map() + const adjacency = new Map() + + // Initialize inDegree and adjacency + for (const block of blocks) { + inDegree.set(block.id, 0) + adjacency.set(block.id, []) + } + + // Populate edges + for (const conn of connections) { + inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1) + adjacency.get(conn.source)?.push(conn.target) + } + + // Start with all blocks that have inDegree = 0 + let layer = blocks + .filter((b) => (inDegree.get(b.id) || 0) === 0) + .map((b) => b.id) + + // Track the final output from the "last" block or set of blocks + let lastOutput: BlockOutput = { response: {} } + + while (layer.length > 0) { + // Execute current layer in parallel + const results = await Promise.all( + layer.map(async (blockId) => { + // Find the block object + const block = blocks.find((b) => b.id === blockId) + if (!block) { + throw new Error(`Missing block ${blockId}`) + } + + // Resolve template references in block config params + const resolvedInputs = this.resolveInputs(block, context) + + // Execute the block, store the result + const output = await this.executeBlock(block, resolvedInputs, context) + context.blockStates.set(block.id, output) + return output + }) + ) + + // Keep track of the "most recent" result as lastOutput + if (results.length > 0) { + lastOutput = results[results.length - 1] + } + + // Build the next layer by reducing in-degree of neighbors + const nextLayer: string[] = [] + for (const blockId of layer) { + const neighbors = adjacency.get(blockId) || [] + for (const targetId of neighbors) { + const deg = inDegree.get(targetId) ?? 0 + const newDeg = deg - 1 + inDegree.set(targetId, newDeg) + if (newDeg === 0) { + nextLayer.push(targetId) + } + } + } + + layer = nextLayer + } + + // Validate that all blocks were executed. If not, the workflow has a cycle. + const executedCount = [...inDegree.values()].filter((x) => x === 0).length + if (executedCount !== blocks.length) { + throw new Error('Workflow contains cycles or invalid connections') + } + + return lastOutput + } + + /** + * Executes a single block by: + * 1) Determining which tool to call + * 2) Validating parameters + * 3) Making the request (for http blocks or LLM blocks, etc.) + * 4) Transforming the response via the tool's transformResponse + */ private async executeBlock( block: SerializedBlock, inputs: Record, context: ExecutionContext ): Promise { const toolId = block.config.tool - if (!toolId) throw new Error(`Block ${block.id} does not specify a tool`) + if (!toolId) { + throw new Error(`Block "${block.id}" does not specify a tool`) + } - const tool = tools[toolId] - if (!tool) throw new Error(`Tool not found: ${toolId}`) + const tool: Tool | undefined = tools[toolId] + if (!tool) { + throw new Error(`Tool not found: ${toolId}`) + } - const validatedParams = this.validateToolParams(tool, { ...block.config.params, ...inputs }) + // Merge block's static params with dynamic inputs + const validatedParams = this.validateToolParams(tool, { + ...block.config.params, + ...inputs, + }) + + // Prepare a new blockLog entry + const blockLog: Partial = { + blockId: block.id, + blockTitle: block.metadata?.title || 'Unnamed Block', + startedAt: new Date().toISOString(), + } try { - const url = typeof tool.request.url === 'function' ? tool.request.url(validatedParams) : tool.request.url - const headers = tool.request.headers(validatedParams) - const method = typeof validatedParams.method === 'object' - ? validatedParams.method.method - : (validatedParams.method || tool.request.method) + if (!tool.request) { + throw new Error(`Tool "${toolId}" has no request config.`) + } - const body = (method !== 'GET' && method !== 'HEAD' && tool.request.body) - ? JSON.stringify(tool.request.body(validatedParams)) + const { url: urlOrFn, method: defaultMethod, headers: headersFn, body: bodyFn } = + tool.request + + // Build the URL + const url = typeof urlOrFn === 'function' ? urlOrFn(validatedParams) : urlOrFn + // Determine HTTP method + const methodFromParams = + typeof validatedParams.method === 'object' + ? validatedParams.method.method + : validatedParams.method + const method = methodFromParams || defaultMethod || 'GET' + + // Safely compute headers + const headers = headersFn?.(validatedParams) ?? {} + + // Build body if needed + const bodyNeeded = method !== 'GET' && method !== 'HEAD' && !!bodyFn + const body = bodyNeeded + ? JSON.stringify(bodyFn!(validatedParams)) : undefined - const response = await fetch(url, { method, headers, body }) - + // Perform fetch() + const response = await fetch(url || '', { method, headers, body }) if (!response.ok) { - const error = await response.json().catch(() => ({ message: response.statusText })) - throw new Error(tool.transformError(error)) + // In case there is a custom transformError + const transformError = tool.transformError ?? (() => 'Unknown error') + const errorBody = await response.json().catch(() => ({ + message: response.statusText, + })) + throw new Error(transformError(errorBody)) } - const result = await tool.transformResponse(response) - + // Transform the response + const transformResponse = + tool.transformResponse ?? + (async (resp: Response) => ({ + success: true, + output: await resp.json(), + })) + + const result = await transformResponse(response) if (!result.success) { - throw new Error(tool.transformError(result)) + const transformError = tool.transformError ?? (() => 'Tool returned an error object') + throw new Error(transformError(result)) } - - return { - response: result.output + + // Success: update the blockLog with success & output + blockLog.success = true + blockLog.output = result.output + + return { response: result.output } + } catch (e) { + blockLog.success = false + blockLog.error = e instanceof Error ? e.message : 'Unknown error' + throw e + } finally { + // Compute the end time and duration + const end = new Date() + blockLog.endedAt = end.toISOString() + + if (blockLog.startedAt) { + const started = new Date(blockLog.startedAt).getTime() + blockLog.durationMs = end.getTime() - started + } else { + blockLog.durationMs = 0 } - } catch (error) { - throw new Error(`Tool ${toolId} execution failed: ${error instanceof Error ? error.message : 'Unknown error'}`) + + // Push the log entry + context.blockLogs.push(blockLog as BlockLog) } } + /** + * Validates required parameters for a Tool, or uses defaults if present. + */ private validateToolParams(tool: Tool, params: Record): Record { - return Object.entries(tool.params).reduce((validated, [name, config]) => { - if (name in params) validated[name] = params[name] - else if ('default' in config) validated[name] = config.default - else if (config.required) throw new Error(`Missing required parameter '${name}'`) - return validated + return Object.entries(tool.params).reduce((acc, [name, config]) => { + if (name in params) { + acc[name] = params[name] + } else if ('default' in config) { + acc[name] = config.default + } else if (config.required) { + throw new Error(`Missing required parameter '${name}'`) + } + return acc }, {} as Record) } - private determineExecutionOrder(): string[] { - const { blocks, connections } = this.workflow - const order: string[] = [] - const visited = new Set() - const inDegree = new Map(blocks.map(block => [block.id, 0])) - - connections.forEach(conn => { - inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1) - }) - - const queue = blocks - .filter(block => inDegree.get(block.id) === 0) - .map(block => block.id) - - while (queue.length > 0) { - const blockId = queue.shift()! - if (!visited.has(blockId)) { - visited.add(blockId) - order.push(blockId) - - connections - .filter(conn => conn.source === blockId) - .forEach(conn => { - const newDegree = (inDegree.get(conn.target) || 0) - 1 - inDegree.set(conn.target, newDegree) - if (newDegree === 0) queue.push(conn.target) - }) - } - } - - if (order.length !== blocks.length) throw new Error('Workflow contains cycles') - return order - } - - private resolveInputs(block: SerializedBlock, context: ExecutionContext): Record { + /** + * Resolves any template references in a block's config params (e.g., ""), + * pulling from context.blockStates. This is how outputs from one block get wired as inputs to another. + */ + private resolveInputs( + block: SerializedBlock, + context: ExecutionContext + ): Record { const inputs = { ...block.config.params } - // Create maps for both ID and name lookups - const blockById = new Map( - this.workflow.blocks.map(b => [b.id, b]) - ) + // Create quick-lookup for blocks by ID and by normalized name + const blockById = new Map(this.workflow.blocks.map((b) => [b.id, b])) const blockByName = new Map( - this.workflow.blocks.map(b => [ + this.workflow.blocks.map((b) => [ b.metadata?.title?.toLowerCase().replace(/\s+/g, '') || '', b ]) ) - const resolvedInputs = Object.entries(inputs).reduce((acc, [key, value]) => { - if (typeof value === 'string') { - const matches = value.match(/<([^>]+)>/g) - - if (matches) { - let resolvedValue = value - - matches.forEach(match => { - const path = match.slice(1, -1) // Remove < and > - const [blockRef, ...pathParts] = path.split('.') - - // Try to find block by ID first, then by normalized name - let sourceBlock = blockById.get(blockRef) - if (!sourceBlock) { - const normalizedName = blockRef.toLowerCase().replace(/\s+/g, '') - sourceBlock = blockByName.get(normalizedName) - } + const resolvedInputs = Object.entries(inputs).reduce( + (acc, [key, value]) => { + if (typeof value === 'string') { + const matches = value.match(/<([^>]+)>/g) + if (matches) { + let resolvedValue = value + for (const match of matches) { + // e.g. "" + const path = match.slice(1, -1) // remove < and > + const [blockRef, ...pathParts] = path.split('.') - if (!sourceBlock) { - console.warn(`Block ${blockRef} not found by ID or name`) - return - } - - const sourceState = context.blockStates.get(sourceBlock.id) - if (!sourceState) { - console.warn(`No state found for block ${sourceBlock.id}`) - return - } - - // Start with the block's state - let replacementValue: any = sourceState - - // Traverse the path parts to get the final value - for (const part of pathParts) { - if (!replacementValue || typeof replacementValue !== 'object') { - console.warn(`Invalid path part ${part} in ${path}`) - return + // Try blockRef as ID, then as normalized name + let sourceBlock = blockById.get(blockRef) + if (!sourceBlock) { + const normalized = blockRef.toLowerCase().replace(/\s+/g, '') + sourceBlock = blockByName.get(normalized) + } + + if (!sourceBlock) { + console.warn(`Block reference "${blockRef}" not found by ID or name.`) + continue + } + + const sourceState = context.blockStates.get(sourceBlock.id) + if (!sourceState) { + console.warn(`No state found for block ID "${sourceBlock.id}".`) + continue + } + + // Drill into the path + let replacementValue: any = sourceState + for (const part of pathParts) { + if (!replacementValue || typeof replacementValue !== 'object') { + console.warn(`Invalid path part "${part}" in "${path}".`) + replacementValue = undefined + break + } + replacementValue = replacementValue[part] + } + + // If a valid leaf is found + if (replacementValue !== undefined) { + // Replace the placeholder in the string + resolvedValue = resolvedValue.replace( + match, + typeof replacementValue === 'object' + ? JSON.stringify(replacementValue) + : String(replacementValue) + ) + } else { + console.warn(`No value found at path "${path}".`) } - replacementValue = replacementValue[part] } - if (replacementValue !== undefined) { - // Replace the entire template expression with the resolved value - resolvedValue = resolvedValue.replace(match, - typeof replacementValue === 'object' - ? JSON.stringify(replacementValue) - : String(replacementValue) - ) - } else { - console.warn(`No value found at path ${path}`) - } - }) - - // Try to parse the value if it looks like JSON - try { - if (resolvedValue.startsWith('{') || resolvedValue.startsWith('[')) { - acc[key] = JSON.parse(resolvedValue) - } else { + // Attempt JSON parse if it looks like JSON + try { + if (resolvedValue.startsWith('{') || resolvedValue.startsWith('[')) { + acc[key] = JSON.parse(resolvedValue) + } else { + acc[key] = resolvedValue + } + } catch { acc[key] = resolvedValue } - } catch { - acc[key] = resolvedValue + } else { + // No placeholders + acc[key] = value } } else { + // Not a string param acc[key] = value } - } else { - acc[key] = value - } - - return acc - }, {} as Record) + return acc + }, + {} as Record + ) return resolvedInputs } - - async execute(workflowId: string): Promise { - const startTime = new Date() - - const context: ExecutionContext = { - workflowId, - blockStates: new Map(), - metadata: { startTime: startTime.toISOString() } - } - - try { - const executionOrder = this.determineExecutionOrder() - - for (const blockId of executionOrder) { - const block = this.workflow.blocks.find(b => b.id === blockId) - if (!block) throw new Error(`Block ${blockId} not found in workflow`) - - const output = await this.executeBlock(block, this.resolveInputs(block, context), context) - context.blockStates.set(blockId, output) - } - - const endTime = new Date() - const lastOutput = context.blockStates.get(executionOrder[executionOrder.length - 1]) - - if (!lastOutput) { - throw new Error('No output from workflow execution') - } - - return { - success: true, - output: lastOutput, - metadata: { - duration: endTime.getTime() - startTime.getTime(), - startTime: startTime.toISOString(), - endTime: endTime.toISOString() - } - } - } catch (error) { - return { - success: false, - output: { response: {} }, - error: error instanceof Error ? error.message : 'Unknown error' - } - } - } } diff --git a/executor/types.ts b/executor/types.ts index 21d0f2ebd..9cb450946 100644 --- a/executor/types.ts +++ b/executor/types.ts @@ -1,49 +1,88 @@ import { BlockOutput } from '@/blocks/types' +/** + * Describes a single block's logs, including timing and success/failure state. + */ +export interface BlockLog { + blockId: string + blockTitle?: string + success: boolean + error?: string + startedAt: string + endedAt: string + durationMs: number + output?: any +} + +/** + * Describes the runtime context for executing a workflow, + * including all block outputs (blockStates), metadata for timing, and block logs. + */ +export interface ExecutionContext { + workflowId: string + blockStates: Map + // Make metadata non-optional so we can assign .startTime or .endTime without TS warnings + metadata: { + startTime?: string + endTime?: string + // You can keep an index signature if you want to store extra fields + [key: string]: any + } + // We store logs in an array so the final result includes a step-by-step record + blockLogs: BlockLog[] +} + +/** + * The complete result from executing the workflow. Includes success/fail, + * the "last block" output, optional error, timing metadata, and logs of each block's run. + */ +export interface ExecutionResult { + success: boolean + output: BlockOutput + error?: string + metadata?: { + duration: number + startTime: string + endTime: string + } + // Detailed logs of what happened in each block + logs?: BlockLog[] +} + +/** + * Defines how a particular tool is invoked (URLs, headers, etc.), how it transforms responses + * and handles errors. Used by blocks that reference a particular tool ID. + */ export interface Tool

> { - id: string - name: string - description: string - version: string + id: string + name: string + description: string + version: string params: { [key: string]: { - type: string - required?: boolean - description?: string - default?: any - } - } - request: { - url: string | ((params: P) => string) - method: string - headers: (params: P) => Record - body?: (params: P) => Record - } - transformResponse: (response: any) => Promise<{ + type: string + required?: boolean + description?: string + default?: any + } + } + request?: { + url?: string | ((params: P) => string) + method?: string + headers?: (params: P) => Record + body?: (params: P) => Record + } + transformResponse?: (response: any) => Promise<{ success: boolean output: O error?: string }> - transformError: (error: any) => string + transformError?: (error: any) => string } +/** + * A registry of Tools, keyed by their IDs or names. + */ export interface ToolRegistry { - [key: string]: Tool -} - -export interface ExecutionContext { - workflowId: string - blockStates: Map - metadata?: Record -} - -export interface ExecutionResult { - success: boolean - output: BlockOutput - error?: string - metadata?: { - duration: number - startTime: string - endTime: string - } + [key: string]: Tool } \ No newline at end of file