Modified workflow, serializer, and executor to use block state from stores instead of ReactFlow nodes

This commit is contained in:
Waleed Latif
2025-01-18 22:59:52 -08:00
parent 9e3a440017
commit f9764816de
6 changed files with 378 additions and 291 deletions

View File

@@ -236,33 +236,6 @@ function WorkflowCanvas() {
initializeStateLogger()
}, [])
/**
* Gets the initial node in the workflow by finding the node with no incoming edges
* @returns {Object} Object containing the initial block and its configuration
* @throws {Error} If no initial block is found or block configuration is invalid
*/
const getInitialNode = () => {
const initialBlockId = Object.values(blocks).find(
(block) => !edges.some((edge) => edge.target === block.id)
)?.id
if (!initialBlockId) {
throw new Error('Could not determine the initial block in the workflow')
}
const blockConfig = getBlock(blocks[initialBlockId].type)
if (!blockConfig) {
throw new Error(
`Block configuration not found for type: ${blocks[initialBlockId].type}`
)
}
return {
block: blocks[initialBlockId],
config: blockConfig,
}
}
/**
* Determines the initial input parameters based on the block type
* @param {BlockState} block - The block to get initial input for
@@ -339,27 +312,20 @@ function WorkflowCanvas() {
setIsExecuting(true)
setExecutionResult(null)
// 1. Get initial node
const { block: initialBlock, config: initialBlockConfig } =
getInitialNode()
// 2. Serialize the workflow
// 1. Serialize the workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
Object.values(blocks).map(serializeBlock),
blocks,
edges
)
// 3. Create executor and run workflow
// 2. Create executor and run workflow
const executor = new Executor(serializedWorkflow)
const initialInput = getInitialInput(initialBlock, initialBlockConfig)
const result = await executor.execute(
window.location.pathname.split('/').pop() || 'workflow',
initialInput
window.location.pathname.split('/').pop() || 'workflow'
)
// 4. Handle result
// 3. Handle result
setExecutionResult(result)
if (result.success) {
@@ -403,6 +369,13 @@ function WorkflowCanvas() {
<div className="relative w-full h-[calc(100vh-56px)]">
<NotificationList />
<style>{keyframeStyles}</style>
{/* <button
onClick={handleRunWorkflow}
disabled={isExecuting}
className="absolute top-4 right-4 z-10 px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 disabled:bg-gray-400 disabled:cursor-not-allowed"
>
{isExecuting ? 'Running...' : 'Test Run'}
</button> */}
<ReactFlow
nodes={nodes}
edges={edges}

View File

@@ -1,7 +1,7 @@
import { Executor } from '../index';
import { SerializedWorkflow } from '@/serializer/types';
import { Tool } from '../types';
import { tools } from '@/tools/registry';
import { tools } from '@/tools';
// Mock tools
const createMockTool = (
@@ -41,6 +41,10 @@ const createMockTool = (
transformError: () => mockError || 'Mock error'
});
jest.mock('@/tools', () => ({
tools: {}
}));
describe('Executor', () => {
beforeEach(() => {
// Reset tools mock
@@ -82,7 +86,7 @@ describe('Executor', () => {
);
const executor = new Executor(workflow);
const result = await executor.execute('workflow-1', { input: 'test' });
const result = await executor.execute('workflow-1');
expect(result.success).toBe(true);
expect(result.data).toEqual({ result: 'test processed' });
@@ -125,7 +129,7 @@ describe('Executor', () => {
};
const executor = new Executor(workflow);
const result = await executor.execute('workflow-1', {});
const result = await executor.execute('workflow-1');
expect(result.success).toBe(false);
expect(result.error).toContain('Missing required parameter');
@@ -166,7 +170,7 @@ describe('Executor', () => {
);
const executor = new Executor(workflow);
const result = await executor.execute('workflow-1', { input: 'test' });
const result = await executor.execute('workflow-1');
expect(result.success).toBe(false);
expect(result.error).toContain('API Error');
@@ -200,7 +204,7 @@ describe('Executor', () => {
};
const executor = new Executor(workflow);
const result = await executor.execute('workflow-1', { input: 42 });
const result = await executor.execute('workflow-1');
expect(result.success).toBe(false);
expect(result.error).toContain('Invalid type for input');
@@ -240,7 +244,7 @@ describe('Executor', () => {
);
const executor = new Executor(workflow);
const result = await executor.execute('workflow-1', { input: 'test' });
const result = await executor.execute('workflow-1');
expect(result.success).toBe(false);
expect(result.error).toContain('Tool output missing required field');
@@ -248,40 +252,40 @@ describe('Executor', () => {
});
describe('Complex Workflows', () => {
it('should execute a workflow with multiple connected blocks', async () => {
const processorTool = createMockTool(
'processor',
'Processor Tool',
{ processed: 'TEST' }
it('should execute blocks in correct order and pass data between them', async () => {
const mockTool1 = createMockTool(
'tool-1',
'Tool 1',
{ output: 'test data' }
);
const formatterTool = createMockTool(
'formatter',
'Formatter Tool',
{ result: '<TEST>' }
const mockTool2 = createMockTool(
'tool-2',
'Tool 2',
{ result: 'processed data' }
);
(tools as any)['processor'] = processorTool;
(tools as any)['formatter'] = formatterTool;
(tools as any)['tool-1'] = mockTool1;
(tools as any)['tool-2'] = mockTool2;
const workflow: SerializedWorkflow = {
version: '1.0',
blocks: [
{
id: 'process',
id: 'block-1',
position: { x: 0, y: 0 },
config: {
tool: 'processor',
params: { input: 'test' },
tool: 'tool-1',
params: { input: 'initial' },
interface: {
inputs: { input: 'string' },
outputs: { processed: 'string' }
inputs: {},
outputs: { output: 'string' }
}
}
},
{
id: 'format',
position: { x: 100, y: 0 },
id: 'block-2',
position: { x: 200, y: 0 },
config: {
tool: 'formatter',
tool: 'tool-2',
params: {},
interface: {
inputs: { input: 'string' },
@@ -290,31 +294,89 @@ describe('Executor', () => {
}
}
],
connections: [{
source: 'process',
target: 'format',
sourceHandle: 'processed',
targetHandle: 'input'
}]
connections: [
{
source: 'block-1',
target: 'block-2',
sourceHandle: 'output',
targetHandle: 'input'
}
]
};
// Mock fetch for both tools
global.fetch = jest.fn()
.mockImplementationOnce(() => Promise.resolve({
ok: true,
json: () => Promise.resolve({ processed: 'TEST' })
}))
.mockImplementationOnce(() => Promise.resolve({
ok: true,
json: () => Promise.resolve({ result: '<TEST>' })
}));
.mockImplementationOnce(() =>
Promise.resolve({
ok: true,
json: () => Promise.resolve({ output: 'test data' })
})
)
.mockImplementationOnce(() =>
Promise.resolve({
ok: true,
json: () => Promise.resolve({ result: 'processed data' })
})
);
const executor = new Executor(workflow);
const result = await executor.execute('workflow-1', { input: 'test' });
const result = await executor.execute('workflow-1');
expect(result.success).toBe(true);
expect(result.data).toEqual({ result: '<TEST>' });
expect(result.data).toEqual({ result: 'processed data' });
expect(global.fetch).toHaveBeenCalledTimes(2);
});
it('should handle cycles in workflow', async () => {
const workflow: SerializedWorkflow = {
version: '1.0',
blocks: [
{
id: 'block-1',
position: { x: 0, y: 0 },
config: {
tool: 'test-tool',
params: {},
interface: {
inputs: {},
outputs: {}
}
}
},
{
id: 'block-2',
position: { x: 200, y: 0 },
config: {
tool: 'test-tool',
params: {},
interface: {
inputs: {},
outputs: {}
}
}
}
],
connections: [
{
source: 'block-1',
target: 'block-2',
sourceHandle: 'output',
targetHandle: 'input'
},
{
source: 'block-2',
target: 'block-1',
sourceHandle: 'output',
targetHandle: 'input'
}
]
};
const executor = new Executor(workflow);
const result = await executor.execute('workflow-1');
expect(result.success).toBe(false);
expect(result.error).toContain('Workflow contains cycles');
});
});
});

View File

@@ -1,6 +1,7 @@
import { SerializedWorkflow, SerializedBlock, BlockConfig } from '@/serializer/types';
import { SerializedWorkflow, SerializedBlock } from '@/serializer/types';
import { ExecutionContext, ExecutionResult, Tool } from './types';
import { tools } from '@/tools';
import { BlockState } from '@/stores/workflow/types';
export class Executor {
private workflow: SerializedWorkflow;
@@ -14,7 +15,7 @@ export class Executor {
inputs: Record<string, any>,
context: ExecutionContext
): Promise<Record<string, any>> {
const config = block.config as BlockConfig;
const config = block.config;
const toolId = config.tool;
if (!toolId) {
@@ -113,15 +114,17 @@ export class Executor {
return typeof value === 'number';
case 'boolean':
return typeof value === 'boolean';
case 'object':
return typeof value === 'object' && value !== null;
case 'array':
return Array.isArray(value);
case 'function':
return typeof value === 'function';
case 'json':
try {
if (typeof value === 'string') {
JSON.parse(value);
}
return true;
} catch {
return false;
}
default:
// For complex types like 'Record<string, any>', 'string[]', etc.
// We just do basic object/array validation
// For complex types, we just do basic object/array validation
return true;
}
}
@@ -187,20 +190,22 @@ export class Executor {
}
});
// If this is a start block, pass through workflow inputs
if (Object.keys(inputs).length === 0 && context.input) {
return context.input;
// If this is a start block with no inputs, use the block's params
if (Object.keys(inputs).length === 0) {
const targetBlock = this.workflow.blocks.find(b => b.id === block.id);
if (targetBlock) {
return targetBlock.config.params;
}
}
return inputs;
}
async execute(workflowId: string, input: Record<string, any>): Promise<ExecutionResult> {
async execute(workflowId: string): Promise<ExecutionResult> {
const startTime = new Date();
const context: ExecutionContext = {
workflowId,
blockStates: new Map(),
input,
metadata: {
startTime: startTime.toISOString()
}

View File

@@ -28,7 +28,7 @@ export interface ToolRegistry {
export interface ExecutionContext {
workflowId: string;
blockStates: Map<string, any>;
input: Record<string, any>;
input?: Record<string, any>;
metadata?: Record<string, any>;
}

View File

@@ -1,6 +1,47 @@
import { Node, Edge } from 'reactflow';
import { Edge } from 'reactflow';
import { Serializer } from '../index';
import { SerializedWorkflow } from '../types';
import { BlockState } from '@/stores/workflow/types';
import { OutputType } from '@/blocks/types';
jest.mock('@/blocks', () => ({
getBlock: (type: string) => {
if (type === 'http') {
return {
type,
workflow: {
tools: {
access: ['http.request'],
config: {
tool: () => 'http.request'
}
},
inputs: {
url: 'string',
method: 'string'
},
outputType: 'json'
}
};
}
// Default agent block config
return {
type,
workflow: {
tools: {
access: ['openai.chat'],
config: {
tool: () => 'openai.chat'
}
},
inputs: {
prompt: 'string'
},
outputType: 'string'
}
};
}
}));
describe('Serializer', () => {
let serializer: Serializer;
@@ -11,61 +52,51 @@ describe('Serializer', () => {
describe('serializeWorkflow', () => {
it('should serialize a workflow with agent and http blocks', () => {
const blocks: Node[] = [
{
const blocks: Record<string, BlockState> = {
'agent-1': {
id: 'agent-1',
type: 'custom',
type: 'agent',
name: 'GPT-4o Agent',
position: { x: 100, y: 100 },
data: {
tool: 'openai.chat',
params: {
model: 'gpt-4o',
systemPrompt: 'You are helpful',
temperature: 0.7
subBlocks: {
'model': {
id: 'model',
type: 'dropdown',
value: 'gpt-4o'
},
interface: {
inputs: {
prompt: 'string'
},
outputs: {
response: 'string',
tokens: 'number'
}
'systemPrompt': {
id: 'systemPrompt',
type: 'long-input',
value: 'You are helpful'
},
title: 'GPT-4o Agent',
description: 'Language model block',
category: 'AI',
icon: 'brain',
color: '#7F2FFF'
}
'temperature': {
id: 'temperature',
type: 'slider',
value: 0.7
}
},
outputType: 'string'
},
{
'http-1': {
id: 'http-1',
type: 'custom',
type: 'http',
name: 'API Call',
position: { x: 400, y: 100 },
data: {
tool: 'http.request',
params: {
url: 'https://api.example.com',
method: 'GET'
subBlocks: {
'url': {
id: 'url',
type: 'short-input',
value: 'https://api.example.com'
},
interface: {
inputs: {
body: 'object'
},
outputs: {
data: 'object',
status: 'number'
}
},
title: 'API Call',
description: 'HTTP request block',
category: 'Web',
icon: 'globe',
color: '#00FF00'
}
'method': {
id: 'method',
type: 'dropdown',
value: 'GET'
}
},
outputType: 'json'
}
];
};
const connections: Edge[] = [
{
@@ -93,13 +124,6 @@ describe('Serializer', () => {
systemPrompt: 'You are helpful',
temperature: 0.7
});
expect(agentBlock?.metadata).toEqual({
title: 'GPT-4o Agent',
description: 'Language model block',
category: 'AI',
icon: 'brain',
color: '#7F2FFF'
});
// Test http block serialization
const httpBlock = serialized.blocks.find(b => b.id === 'http-1');
@@ -112,21 +136,22 @@ describe('Serializer', () => {
});
it('should handle blocks with minimal required configuration', () => {
const blocks: Node[] = [{
id: 'minimal-1',
type: 'custom',
position: { x: 0, y: 0 },
data: {
tool: 'openai.chat',
params: {
model: 'gpt-4o'
const blocks: Record<string, BlockState> = {
'minimal-1': {
id: 'minimal-1',
type: 'agent',
name: 'Minimal Agent',
position: { x: 0, y: 0 },
subBlocks: {
'model': {
id: 'model',
type: 'dropdown',
value: 'gpt-4o'
}
},
interface: {
inputs: {},
outputs: {}
}
outputType: 'string'
}
}];
};
const serialized = serializer.serializeWorkflow(blocks, []);
const block = serialized.blocks[0];
@@ -134,71 +159,68 @@ describe('Serializer', () => {
expect(block.id).toBe('minimal-1');
expect(block.config.tool).toBe('openai.chat');
expect(block.config.params).toEqual({ model: 'gpt-4o' });
expect(block.metadata).toBeUndefined();
});
it('should handle complex workflow with multiple interconnected blocks', () => {
const blocks: Node[] = [
{
const blocks: Record<string, BlockState> = {
'input-1': {
id: 'input-1',
type: 'custom',
type: 'http',
name: 'Data Input',
position: { x: 100, y: 100 },
data: {
tool: 'http.request',
params: {
url: 'https://api.data.com',
method: 'GET'
subBlocks: {
'url': {
id: 'url',
type: 'short-input',
value: 'https://api.data.com'
},
interface: {
inputs: {},
outputs: {
data: 'object'
}
'method': {
id: 'method',
type: 'dropdown',
value: 'GET'
}
}
},
outputType: 'json'
},
{
'process-1': {
id: 'process-1',
type: 'custom',
type: 'agent',
name: 'Data Processor',
position: { x: 300, y: 100 },
data: {
tool: 'openai.chat',
params: {
model: 'gpt-4o',
systemPrompt: 'Process this data'
subBlocks: {
'model': {
id: 'model',
type: 'dropdown',
value: 'gpt-4o'
},
interface: {
inputs: {
data: 'object',
config: 'object'
},
outputs: {
result: 'string'
}
'systemPrompt': {
id: 'systemPrompt',
type: 'long-input',
value: 'Process this data'
}
}
},
outputType: 'string'
},
{
'output-1': {
id: 'output-1',
type: 'custom',
type: 'http',
name: 'Data Output',
position: { x: 500, y: 100 },
data: {
tool: 'http.request',
params: {
url: 'https://api.output.com',
method: 'POST'
subBlocks: {
'url': {
id: 'url',
type: 'short-input',
value: 'https://api.output.com'
},
interface: {
inputs: {
body: 'string'
},
outputs: {
status: 'number'
}
'method': {
id: 'method',
type: 'dropdown',
value: 'POST'
}
}
},
outputType: 'json'
}
];
};
const connections: Edge[] = [
{
@@ -230,57 +252,56 @@ describe('Serializer', () => {
expect(conn1.target).toBe('process-1');
expect(conn2.source).toBe('process-1');
expect(conn2.target).toBe('output-1');
// Verify interface matching
const process = serialized.blocks.find(b => b.id === 'process-1');
expect(process?.config.interface.inputs).toHaveProperty('data');
expect(process?.config.interface.outputs).toHaveProperty('result');
});
it('should preserve tool-specific parameters', () => {
const blocks: Node[] = [{
id: 'agent-1',
type: 'custom',
position: { x: 0, y: 0 },
data: {
tool: 'openai.chat',
params: {
model: 'gpt-4o',
temperature: 0.7,
maxTokens: 1000,
topP: 0.9,
frequencyPenalty: 0.1,
presencePenalty: 0.1
const blocks: Record<string, BlockState> = {
'agent-1': {
id: 'agent-1',
type: 'agent',
name: 'Advanced Agent',
position: { x: 0, y: 0 },
subBlocks: {
'model': {
id: 'model',
type: 'dropdown',
value: 'gpt-4o'
},
'temperature': {
id: 'temperature',
type: 'slider',
value: 0.7
},
'maxTokens': {
id: 'maxTokens',
type: 'slider',
value: 1000
}
},
interface: {
inputs: { prompt: 'string' },
outputs: { response: 'string' }
}
outputType: 'string'
}
}];
};
const serialized = serializer.serializeWorkflow(blocks, []);
const block = serialized.blocks[0];
expect(block.config.tool).toBe('openai.chat');
expect(block.config.params).toEqual({
model: 'gpt-4o',
temperature: 0.7,
maxTokens: 1000,
topP: 0.9,
frequencyPenalty: 0.1,
presencePenalty: 0.1
maxTokens: 1000
});
});
});
describe('deserializeWorkflow', () => {
it('should deserialize a workflow back to ReactFlow format', () => {
it('should deserialize a workflow back to blocks and connections', () => {
const workflow: SerializedWorkflow = {
version: '1.0',
blocks: [
{
id: 'agent-1',
position: { x: 100, y: 100 },
position: { x: 0, y: 0 },
config: {
tool: 'openai.chat',
params: {
@@ -289,27 +310,21 @@ describe('Serializer', () => {
},
interface: {
inputs: { prompt: 'string' },
outputs: { response: 'string' }
outputs: { output: 'string' }
}
},
metadata: {
title: 'GPT-4o Agent',
category: 'AI'
}
}
],
connections: []
};
const { blocks, connections } = serializer.deserializeWorkflow(workflow);
const { blocks } = serializer.deserializeWorkflow(workflow);
const block = blocks['agent-1'];
expect(blocks).toHaveLength(1);
const block = blocks[0];
expect(block.id).toBe('agent-1');
expect(block.type).toBe('custom');
expect(block.data.tool).toBe('openai.chat');
expect(block.data.params.model).toBe('gpt-4o');
expect(block.data.title).toBe('GPT-4o Agent');
expect(block.type).toBe('openai.chat');
expect(block.subBlocks.model.value).toBe('gpt-4o');
expect(block.subBlocks.systemPrompt.value).toBe('You are helpful');
expect(block.outputType).toBe('string');
});
});
});

View File

@@ -1,11 +1,14 @@
import { Node, Edge } from "reactflow";
import { SerializedBlock, SerializedConnection, SerializedWorkflow } from "./types";
import { BlockState, SubBlockState } from '@/stores/workflow/types';
import { Edge } from 'reactflow';
import { SerializedBlock, SerializedConnection, SerializedWorkflow } from './types';
import { getBlock } from '@/blocks';
import { OutputType, SubBlockType } from '@/blocks/types';
export class Serializer {
serializeWorkflow(blocks: Node[], connections: Edge[]): SerializedWorkflow {
serializeWorkflow(blocks: Record<string, BlockState>, connections: Edge[]): SerializedWorkflow {
return {
version: '1.0',
blocks: blocks.map(block => this.serializeBlock(block)),
blocks: Object.values(blocks).map(block => this.serializeBlock(block)),
connections: connections.map(conn => ({
source: conn.source,
target: conn.target,
@@ -15,45 +18,60 @@ export class Serializer {
};
}
private serializeBlock(block: Node): SerializedBlock {
const { data } = block;
const serialized: SerializedBlock = {
private serializeBlock(block: BlockState): SerializedBlock {
const blockConfig = getBlock(block.type);
if (!blockConfig) {
throw new Error(`Block configuration not found for type: ${block.type}`);
}
// Get the tool ID from the block's configuration
const tools = blockConfig.workflow.tools;
if (!tools?.access || tools.access.length === 0) {
throw new Error(`No tools specified for block type: ${block.type}`);
}
// Get all values from subBlocks
const params: Record<string, any> = {};
Object.entries(block.subBlocks || {}).forEach(([id, subBlock]) => {
if (subBlock?.value !== undefined) {
params[id] = subBlock.value;
}
});
// Get the tool ID from the block's configuration
const toolId = tools.config?.tool?.(params) || params.tool || tools.access[0];
if (!toolId || !tools.access.includes(toolId)) {
throw new Error(`Invalid or unauthorized tool: ${toolId}`);
}
return {
id: block.id,
position: {
x: block.position.x,
y: block.position.y
},
position: block.position,
config: {
tool: data.tool,
params: data.params || {},
tool: toolId,
params: params,
interface: {
inputs: data.interface?.inputs || {},
outputs: data.interface?.outputs || {}
inputs: blockConfig.workflow.inputs || {},
outputs: {
output: block.outputType
}
}
}
};
const metadata = {
title: data.title,
description: data.description,
category: data.category,
icon: data.icon,
color: data.color
};
if (Object.values(metadata).some(value => value !== undefined)) {
serialized.metadata = metadata;
}
return serialized;
}
deserializeWorkflow(serialized: SerializedWorkflow): {
blocks: Node[];
blocks: Record<string, BlockState>;
connections: Edge[];
} {
const blocks: Record<string, BlockState> = {};
serialized.blocks.forEach(block => {
const deserialized = this.deserializeBlock(block);
blocks[deserialized.id] = deserialized;
});
return {
blocks: serialized.blocks.map(block => this.deserializeBlock(block)),
blocks,
connections: serialized.connections.map(conn => ({
id: `${conn.source}-${conn.target}`,
source: conn.source,
@@ -64,17 +82,31 @@ export class Serializer {
};
}
private deserializeBlock(serialized: SerializedBlock): Node {
private deserializeBlock(serialized: SerializedBlock): BlockState {
return {
id: serialized.id,
type: 'custom',
type: serialized.config.tool,
name: `${serialized.config.tool} Block`,
position: serialized.position,
data: {
tool: serialized.config.tool,
params: serialized.config.params,
interface: serialized.config.interface,
...(serialized.metadata || {})
}
subBlocks: Object.entries(serialized.config.params).reduce((acc, [key, value]) => {
acc[key] = {
id: key,
type: this.inferSubBlockType(value),
value: value
};
return acc;
}, {} as Record<string, SubBlockState>),
outputType: serialized.config.interface.outputs.output as OutputType
};
}
private inferSubBlockType(value: any): SubBlockType {
if (Array.isArray(value) && Array.isArray(value[0])) {
return 'table';
}
if (typeof value === 'string' && value.length > 100) {
return 'long-input';
}
return 'short-input';
}
}