From 5ccf55cbb89cf6db8a11fd436f0a17230a58e020 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 28 Jan 2025 19:36:12 -0800 Subject: [PATCH] Extended and simplified executor and workflow execution hook, connections work. Added new unit test for Agent -> Function -> API and it passes. Tested Agent -> API in workflow UI, succeeds. --- app/w/hooks/use-workflow-execution.ts | 70 ++++--- executor/__tests__/executor.test.ts | 284 +++++++++++++++++++++++--- executor/index.ts | 188 ++++++++--------- 3 files changed, 383 insertions(+), 159 deletions(-) diff --git a/app/w/hooks/use-workflow-execution.ts b/app/w/hooks/use-workflow-execution.ts index 7988487996..869364ee78 100644 --- a/app/w/hooks/use-workflow-execution.ts +++ b/app/w/hooks/use-workflow-execution.ts @@ -1,48 +1,68 @@ -import { useState } from 'react' +import { useCallback, useState } from 'react' import { useWorkflowStore } from '@/stores/workflow/workflow-store' +import { Serializer } from '@/serializer' +import { Executor } from '@/executor' +import { ExecutionResult } from '@/executor/types' import { useNotificationStore } from '@/stores/notifications/notifications-store' -import { executeWorkflow } from '@/lib/workflow' export function useWorkflowExecution() { const [isExecuting, setIsExecuting] = useState(false) - const [executionResult, setExecutionResult] = useState(null) + const [executionResult, setExecutionResult] = useState(null) const { blocks, edges } = useWorkflowStore() const { addNotification } = useNotificationStore() - const handleRunWorkflow = async () => { + const handleRunWorkflow = useCallback(async () => { + setIsExecuting(true) try { - setIsExecuting(true) - setExecutionResult(null) + // Extract existing block states + const currentBlockStates = Object.entries(blocks).reduce((acc, [id, block]) => { + if (block.subBlocks?.response?.value !== undefined) { + acc[id] = { response: block.subBlocks.response.value } + } + return acc + }, {} as Record) - const result = await executeWorkflow( - blocks, - edges, - window.location.pathname.split('/').pop() || 'workflow' + // Execute workflow + const executor = new Executor( + new Serializer().serializeWorkflow(blocks, edges), + currentBlockStates ) - + + const result = await executor.execute(crypto.randomUUID()) setExecutionResult(result) + // Show execution result + addNotification( + result.success ? 'console' : 'error', + result.success + ? 'Workflow completed successfully' + : `Failed to execute workflow: ${result.error}` + ) + + // Log detailed result to console if (result.success) { - addNotification('console', 'Workflow completed successfully') - } else { - addNotification('error', `Failed to execute workflow: ${result.error}`) + console.group('Workflow Execution Result') + console.log('Status: ✅ Success') + console.log('Data:', result.data) + if (result.metadata) { + console.log('Duration:', result.metadata.duration + 'ms') + console.log('Start Time:', new Date(result.metadata.startTime).toLocaleTimeString()) + console.log('End Time:', new Date(result.metadata.endTime).toLocaleTimeString()) + } + console.groupEnd() } - } catch (error: any) { - console.error('Error executing workflow:', error) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' setExecutionResult({ success: false, - error: - error instanceof Error ? error.message : 'Unknown error occurred', + data: {}, + error: errorMessage }) - addNotification('error', `Failed to execute workflow: ${error.message}`) + addNotification('error', `Failed to execute workflow: ${errorMessage}`) } finally { setIsExecuting(false) } - } + }, [blocks, edges, addNotification]) - return { - isExecuting, - executionResult, - handleRunWorkflow - } + return { isExecuting, executionResult, handleRunWorkflow } } \ No newline at end of file diff --git a/executor/__tests__/executor.test.ts b/executor/__tests__/executor.test.ts index 3ac8a12ea5..277d796104 100644 --- a/executor/__tests__/executor.test.ts +++ b/executor/__tests__/executor.test.ts @@ -3,11 +3,6 @@ import { SerializedWorkflow } from '@/serializer/types' import { Tool } from '../types' import { tools } from '@/tools' -// Mock icons -jest.mock('@/components/icons', () => ({ - AgentIcon: () => null -})) - // Mock tools const createMockTool = ( id: string, @@ -327,76 +322,71 @@ describe('Executor', () => { const mockTool1 = createMockTool( 'tool-1', 'Tool 1', - { output: 'test data' } - ) + { response: 'test data' } + ); const mockTool2 = createMockTool( 'tool-2', 'Tool 2', - { result: 'processed data' } + { response: 'processed data' } ); (tools as any)['tool-1'] = mockTool1; - (tools as any)['tool-2'] = mockTool2 + (tools as any)['tool-2'] = mockTool2; const workflow: SerializedWorkflow = { version: '1.0', blocks: [ { - id: 'block-1', + id: 'block1', position: { x: 0, y: 0 }, config: { tool: 'tool-1', params: { input: 'initial' }, interface: { inputs: {}, - outputs: { output: 'string' } + outputs: { response: 'string' } } } }, { - id: 'block-2', + id: 'block2', position: { x: 200, y: 0 }, config: { tool: 'tool-2', - params: {}, + params: { + input: '' + }, interface: { inputs: { input: 'string' }, - outputs: { result: 'string' } + outputs: { response: 'string' } } } } ], - connections: [ - { - source: 'block-1', - target: 'block-2', - sourceHandle: 'output', - targetHandle: 'input' - } - ] - } + connections: [] + }; // Mock fetch for both tools global.fetch = jest.fn() .mockImplementationOnce(() => Promise.resolve({ ok: true, - json: () => Promise.resolve({ output: 'test data' }) + json: () => Promise.resolve({ response: 'test data' }) }) ) .mockImplementationOnce(() => Promise.resolve({ ok: true, - json: () => Promise.resolve({ result: 'processed data' }) + json: () => Promise.resolve({ response: 'processed data' }) }) - ) + ); - const executor = new Executor(workflow) - const result = await executor.execute('workflow-1') + const executor = new Executor(workflow); + const result = await executor.execute('workflow-1'); - expect(result.success).toBe(true) - expect(result.data).toEqual({ result: 'processed data' }) - expect(global.fetch).toHaveBeenCalledTimes(2) - }) + expect(result.success).toBe(true); + expect(result.data).toEqual({ response: 'processed data' }); + expect(global.fetch).toHaveBeenCalledTimes(2); + }); it('should handle cycles in workflow', async () => { const workflow: SerializedWorkflow = { @@ -450,4 +440,234 @@ describe('Executor', () => { expect(result.error).toContain('Workflow contains cycles') }) }) + + describe('Connection Tests', () => { + it('should execute an Agent -> Function -> API chain', async () => { + // Mock the OpenAI chat tool + const openaiTool: Tool = { + id: 'openai.chat', + name: 'OpenAI Chat', + description: 'Chat with OpenAI models', + version: '1.0.0', + params: { + systemPrompt: { + type: 'string', + required: true, + description: 'System prompt' + }, + apiKey: { + type: 'string', + required: true, + description: 'OpenAI API key' + } + }, + request: { + url: 'https://api.openai.com/v1/chat/completions', + method: 'POST', + headers: (params) => ({ + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${params.apiKey}` + }), + body: (params) => ({ + model: 'gpt-4', + messages: [ + { role: 'system', content: params.systemPrompt } + ] + }) + }, + transformResponse: async () => ({ + response: 'https://api.example.com/data' + }), + transformError: () => 'OpenAI error' + }; + + // Mock the Function execution tool + const functionTool: Tool = { + id: 'function.execute', + name: 'Execute Function', + description: 'Execute custom code', + version: '1.0.0', + params: { + code: { + type: 'string', + required: true, + description: 'Code to execute' + }, + url: { + type: 'string', + required: true, + description: 'URL to process' + } + }, + request: { + url: 'http://localhost:3000/api/function', + method: 'POST', + headers: () => ({ 'Content-Type': 'application/json' }), + body: (params) => ({ code: params.code, url: params.url }) + }, + transformResponse: async () => ({ + response: { method: 'GET', headers: { 'Accept': 'application/json' } } + }), + transformError: () => 'Function execution error' + }; + + // Mock the HTTP request tool + const httpTool: Tool = { + id: 'http.request', + name: 'HTTP Request', + description: 'Make HTTP requests', + version: '1.0.0', + params: { + url: { + type: 'string', + required: true, + description: 'URL to request' + }, + method: { + type: 'string', + required: true, + description: 'HTTP method' + } + }, + request: { + url: (params) => params.url, + method: 'GET', + headers: () => ({ 'Content-Type': 'application/json' }), + body: undefined + }, + transformResponse: async () => ({ + response: { status: 200, data: { message: 'Success!' } } + }), + transformError: () => 'HTTP request error' + }; + + (tools as any)['openai.chat'] = openaiTool; + (tools as any)['function.execute'] = functionTool; + (tools as any)['http.request'] = httpTool; + + const workflow: SerializedWorkflow = { + version: '1.0', + blocks: [ + { + id: 'agent1', + position: { x: 0, y: 0 }, + config: { + tool: 'openai.chat', + params: { + systemPrompt: 'Generate an API endpoint', + apiKey: 'test-key' + }, + interface: { + inputs: { + systemPrompt: 'string', + apiKey: 'string' + }, + outputs: { + response: 'string' + } + } + } + }, + { + id: 'function1', + position: { x: 200, y: 0 }, + config: { + tool: 'function.execute', + params: { + code: 'return { method: "GET", headers: { "Accept": "application/json" } }', + url: '' + }, + interface: { + inputs: { + code: 'string', + url: 'string' + }, + outputs: { + response: 'any' + } + } + } + }, + { + id: 'api1', + position: { x: 400, y: 0 }, + config: { + tool: 'http.request', + params: { + url: '', + method: '' + }, + interface: { + inputs: { + url: 'string', + method: 'string' + }, + outputs: { + response: 'any' + } + } + } + } + ], + connections: [] + }; + + // Mock fetch responses + global.fetch = jest.fn() + .mockImplementationOnce(() => + Promise.resolve({ + ok: true, + json: () => Promise.resolve({ + response: 'https://api.example.com/data' + }) + }) + ) + .mockImplementationOnce(() => + Promise.resolve({ + ok: true, + json: () => Promise.resolve({ + response: { method: 'GET', headers: { 'Accept': 'application/json' } } + }) + }) + ) + .mockImplementationOnce(() => + Promise.resolve({ + ok: true, + json: () => Promise.resolve({ + response: { status: 200, data: { message: 'Success!' } } + }) + }) + ); + + const executor = new Executor(workflow); + const result = await executor.execute('test-workflow'); + + expect(result.success).toBe(true); + expect(result.data).toEqual({ + response: { status: 200, data: { message: 'Success!' } } + }); + + // Verify the execution order and data flow + const fetchCalls = (global.fetch as jest.Mock).mock.calls; + expect(fetchCalls).toHaveLength(3); + + // First call - Agent generates API endpoint + expect(JSON.parse(fetchCalls[0][1].body)).toEqual({ + model: 'gpt-4', + messages: [ + { role: 'system', content: 'Generate an API endpoint' } + ] + }); + + // Second call - Function processes the URL + expect(JSON.parse(fetchCalls[1][1].body)).toEqual({ + code: 'return { method: "GET", headers: { "Accept": "application/json" } }', + url: 'https://api.example.com/data' + }); + + // Third call - API makes the request + expect(fetchCalls[2][0]).toBe('https://api.example.com/data'); + expect(fetchCalls[2][1].method).toBe('GET'); + }); + }); }) diff --git a/executor/index.ts b/executor/index.ts index 85b72b68f9..c95a746410 100644 --- a/executor/index.ts +++ b/executor/index.ts @@ -3,140 +3,132 @@ import { ExecutionContext, ExecutionResult, Tool } from './types' import { tools } from '@/tools' export class Executor { - private workflow: SerializedWorkflow - - constructor(workflow: SerializedWorkflow) { - this.workflow = workflow - } + constructor( + private workflow: SerializedWorkflow, + private initialBlockStates: Record = {} + ) {} private async executeBlock( block: SerializedBlock, inputs: Record, context: ExecutionContext ): Promise> { - const config = block.config - const toolId = config.tool - - if (!toolId) { - throw new Error(`Block ${block.id} does not specify a tool`) - } + const toolId = block.config.tool + if (!toolId) throw new Error(`Block ${block.id} does not specify a tool`) const tool = tools[toolId] - if (!tool) { - throw new Error(`Tool not found: ${toolId}`) - } + if (!tool) throw new Error(`Tool not found: ${toolId}`) - // Merge block parameters with runtime inputs - const params = { - ...config.params, - ...inputs - } - - // Validate tool parameters and apply defaults - const validatedParams: Record = {} - for (const [paramName, paramConfig] of Object.entries(tool.params)) { - if (paramName in params) { - validatedParams[paramName] = params[paramName] - } else if ('default' in paramConfig) { - validatedParams[paramName] = paramConfig.default - } else if (paramConfig.required) { - throw new Error(`Missing required parameter '${paramName}' for tool ${toolId}`) - } - } + const validatedParams = this.validateToolParams(tool, { ...block.config.params, ...inputs }) try { - // Make the HTTP request - const url = typeof tool.request.url === 'function' - ? tool.request.url(validatedParams) - : tool.request.url + const url = typeof tool.request.url === 'function' ? tool.request.url(validatedParams) : tool.request.url + const headers = tool.request.headers(validatedParams) + const method = typeof validatedParams.method === 'object' + ? validatedParams.method.method + : (validatedParams.method || tool.request.method) - const response = await fetch(url, { - method: tool.request.method, - headers: tool.request.headers(validatedParams), - body: tool.request.body ? JSON.stringify(tool.request.body(validatedParams)) : undefined - }) + const body = (method !== 'GET' && method !== 'HEAD' && tool.request.body) + ? JSON.stringify(tool.request.body(validatedParams)) + : undefined + + const response = await fetch(url, { method, headers, body }) if (!response.ok) { const error = await response.json().catch(() => ({ message: response.statusText })) throw new Error(tool.transformError(error)) } - return await tool.transformResponse(response) } catch (error) { throw new Error(`Tool ${toolId} execution failed: ${error instanceof Error ? error.message : 'Unknown error'}`) } } + private validateToolParams(tool: Tool, params: Record): Record { + return Object.entries(tool.params).reduce((validated, [name, config]) => { + if (name in params) validated[name] = params[name] + else if ('default' in config) validated[name] = config.default + else if (config.required) throw new Error(`Missing required parameter '${name}'`) + return validated + }, {} as Record) + } + private determineExecutionOrder(): string[] { const { blocks, connections } = this.workflow const order: string[] = [] const visited = new Set() - const inDegree = new Map() + const inDegree = new Map(blocks.map(block => [block.id, 0])) - blocks.forEach(block => inDegree.set(block.id, 0)) connections.forEach(conn => { - const target = conn.target - inDegree.set(target, (inDegree.get(target) || 0) + 1) + inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1) }) const queue = blocks - .filter(block => (inDegree.get(block.id) || 0) === 0) + .filter(block => inDegree.get(block.id) === 0) .map(block => block.id) while (queue.length > 0) { const blockId = queue.shift()! - if (visited.has(blockId)) continue - - visited.add(blockId) - order.push(blockId) - - connections - .filter(conn => conn.source === blockId) - .forEach(conn => { - const targetId = conn.target - inDegree.set(targetId, (inDegree.get(targetId) || 0) - 1) - - if (inDegree.get(targetId) === 0) { - queue.push(targetId) - } - }) - } - - if (order.length !== blocks.length) { - throw new Error('Workflow contains cycles') + if (!visited.has(blockId)) { + visited.add(blockId) + order.push(blockId) + + connections + .filter(conn => conn.source === blockId) + .forEach(conn => { + const newDegree = (inDegree.get(conn.target) || 0) - 1 + inDegree.set(conn.target, newDegree) + if (newDegree === 0) queue.push(conn.target) + }) + } } + if (order.length !== blocks.length) throw new Error('Workflow contains cycles') return order } - private resolveInputs( - block: SerializedBlock, - context: ExecutionContext - ): Record { - const inputs: Record = {} - - // Get all incoming connections for this block - const incomingConnections = this.workflow.connections.filter( - conn => conn.target === block.id + private resolveInputs(block: SerializedBlock, context: ExecutionContext): Record { + const inputs = { ...block.config.params } + const blockNameMap = new Map( + this.workflow.blocks + .map(b => { + const name = b.metadata?.title?.toLowerCase().replace(' ', '') || '' + return name ? [name, b.id] as [string, string] : null + }) + .filter((entry): entry is [string, string] => entry !== null) ) + + const blockStateMap = new Map( + Object.entries(this.initialBlockStates) + .filter(([_, state]) => state !== undefined) + ) + + const connectionPattern = /<([a-z0-9]+)\.(string|number|boolean|res|any)>/g - // Map outputs from previous blocks to inputs for this block - incomingConnections.forEach(conn => { - const sourceOutput = context.blockStates.get(conn.source) - if (sourceOutput && conn.sourceHandle && conn.targetHandle) { - inputs[conn.targetHandle] = sourceOutput[conn.sourceHandle] + return Object.entries(block.config.params || {}).reduce((acc, [key, value]) => { + if (typeof value === 'string') { + let resolvedValue = value + Array.from(value.matchAll(connectionPattern)).forEach(match => { + const [fullMatch, blockName, type] = match + const blockId = blockNameMap.get(blockName) || blockName + const sourceOutput = context.blockStates.get(blockId) || blockStateMap.get(blockId) + + if (sourceOutput) { + const replacementValue = type === 'res' + ? (sourceOutput.response?.method || sourceOutput.response || sourceOutput) + : (sourceOutput.output || sourceOutput.response) + + if (replacementValue !== undefined) { + resolvedValue = resolvedValue.replace(fullMatch, replacementValue.toString()) + } + } + }) + acc[key] = resolvedValue + } else { + acc[key] = value } - }) - - // If this is a start block with no inputs, use the block's params - if (Object.keys(inputs).length === 0) { - const targetBlock = this.workflow.blocks.find(b => b.id === block.id) - if (targetBlock) { - return targetBlock.config.params - } - } - - return inputs + return acc + }, inputs) } async execute(workflowId: string): Promise { @@ -144,9 +136,7 @@ export class Executor { const context: ExecutionContext = { workflowId, blockStates: new Map(), - metadata: { - startTime: startTime.toISOString() - } + metadata: { startTime: startTime.toISOString() } } try { @@ -154,22 +144,16 @@ export class Executor { for (const blockId of executionOrder) { const block = this.workflow.blocks.find(b => b.id === blockId) - if (!block) { - throw new Error(`Block ${blockId} not found in workflow`) - } + if (!block) throw new Error(`Block ${blockId} not found in workflow`) - const blockInputs = this.resolveInputs(block, context) - const result = await this.executeBlock(block, blockInputs, context) + const result = await this.executeBlock(block, this.resolveInputs(block, context), context) context.blockStates.set(blockId, result) } - const lastBlockId = executionOrder[executionOrder.length - 1] - const finalOutput = context.blockStates.get(lastBlockId) - const endTime = new Date() return { success: true, - data: finalOutput || {}, + data: context.blockStates.get(executionOrder[executionOrder.length - 1]) || {}, metadata: { duration: endTime.getTime() - startTime.getTime(), startTime: startTime.toISOString(),