diff --git a/python/packages/autogen-studio/autogenstudio/web/routes/workflows.py b/python/packages/autogen-studio/autogenstudio/web/routes/workflows.py index 2c7651e8e..1da66e852 100644 --- a/python/packages/autogen-studio/autogenstudio/web/routes/workflows.py +++ b/python/packages/autogen-studio/autogenstudio/web/routes/workflows.py @@ -90,7 +90,7 @@ async def create_workflow(request: CreateWorkflowRequest, db=Depends(get_db)) -> try: workflow = db.upsert( WorkflowDB( - config=request.config, + config=request.config.model_dump(), user_id=request.user_id, ), return_json=False, @@ -136,7 +136,7 @@ async def update_workflow( if request.description is not None: workflow.description = request.description if request.config is not None: - workflow.config = request.config + workflow.config = request.config.model_dump() updated = db.upsert(workflow, return_json=False) return {"status": updated.status, "data": updated.data} except HTTPException: diff --git a/python/packages/autogen-studio/autogenstudio/workflow/core/_runner.py b/python/packages/autogen-studio/autogenstudio/workflow/core/_runner.py index 0f16f8ab8..1b217c3f1 100644 --- a/python/packages/autogen-studio/autogenstudio/workflow/core/_runner.py +++ b/python/packages/autogen-studio/autogenstudio/workflow/core/_runner.py @@ -459,17 +459,28 @@ class WorkflowRunner: # No dependencies, use initial input return initial_input.copy() - # For sequential workflows: use the most recent dependency's output directly - # For parallel/fan-in: this logic would need to be more sophisticated - latest_dependency = dependencies[-1] # Most recent dependency - dep_execution = execution.step_executions.get(latest_dependency) + # For conditional workflows: find the dependency that actually executed and has output + # Check all dependencies and find the one that completed successfully + completed_dependency = None + for dep_id in dependencies: + dep_execution = execution.step_executions.get(dep_id) + if dep_execution and dep_execution.output_data and dep_execution.status.value == "completed": + # Also verify that the edge condition for this dependency is satisfied + for edge in workflow.edges: + if edge.from_step == dep_id and edge.to_step == step_id: + if workflow._evaluate_edge_condition(edge, execution): + completed_dependency = dep_id + break + if completed_dependency: + break - if dep_execution and dep_execution.output_data: - # Direct forwarding: previous step's output becomes this step's input + if completed_dependency: + dep_execution = execution.step_executions.get(completed_dependency) + logger.info(f"Using output from completed dependency {completed_dependency} for step {step_id}") return dep_execution.output_data.copy() else: - # Fallback to initial input if dependency output not available - logger.warning(f"No output available from dependency {latest_dependency} for step {step_id}, using initial input") + # Fallback to initial input if no valid dependency output is available + logger.warning(f"No valid completed dependency found for step {step_id}, using initial input") return initial_input.copy() async def run_step( diff --git a/python/packages/autogen-studio/autogenstudio/workflow/core/_workflow.py b/python/packages/autogen-studio/autogenstudio/workflow/core/_workflow.py index 456550af5..735d217a8 100644 --- a/python/packages/autogen-studio/autogenstudio/workflow/core/_workflow.py +++ b/python/packages/autogen-studio/autogenstudio/workflow/core/_workflow.py @@ -170,21 +170,37 @@ class BaseWorkflow(ComponentBase[BaseModel]): if step_id == self.start_step_id: ready_steps.append(step_id) elif dependencies: - # Check if all dependencies are completed - all_deps_complete = True - for dep_id in dependencies: - dep_exec = execution.step_executions.get(dep_id) - if not dep_exec or dep_exec.status.value != "completed": - all_deps_complete = False - break + # Determine if this is a fan-in (AND) or conditional (OR) pattern + incoming_edges = [edge for edge in self.edges if edge.to_step == step_id] - if all_deps_complete: - # Also check edge conditions - for edge in self.edges: - if edge.to_step == step_id: + # Fan-in pattern: all edges have "always" conditions + is_fan_in = all(edge.condition.type == "always" for edge in incoming_edges) + + if is_fan_in: + # Fan-in (AND logic): all dependencies must be complete + all_deps_complete = True + for dep_id in dependencies: + dep_exec = execution.step_executions.get(dep_id) + if not dep_exec or dep_exec.status.value != "completed": + all_deps_complete = False + break + + if all_deps_complete: + ready_steps.append(step_id) + else: + # Conditional (OR logic): any valid path makes step ready + step_ready = False + for edge in incoming_edges: + # Check if this specific dependency is complete + dep_exec = execution.step_executions.get(edge.from_step) + if dep_exec and dep_exec.status.value == "completed": + # Check if the edge condition passes if self._evaluate_edge_condition(edge, execution): - ready_steps.append(step_id) + step_ready = True break + + if step_ready: + ready_steps.append(step_id) return ready_steps diff --git a/python/packages/autogen-studio/autogenstudio/workflow/defaults.py b/python/packages/autogen-studio/autogenstudio/workflow/defaults.py index c559be6f7..6483152c7 100644 --- a/python/packages/autogen-studio/autogenstudio/workflow/defaults.py +++ b/python/packages/autogen-studio/autogenstudio/workflow/defaults.py @@ -6,9 +6,9 @@ Provides the echo chain workflow and its steps from the original example. from typing import List from autogen_core import ComponentModel -from pydantic import BaseModel +from pydantic import BaseModel, Field -from .core import Workflow, StepMetadata, WorkflowMetadata +from .core import Workflow, StepMetadata, WorkflowMetadata, EdgeCondition from .steps import EchoStep, HttpStep, AgentStep, TransformStep from .steps._http import HttpRequestInput, HttpResponseOutput from .steps._agent import AgentInput, AgentOutput @@ -22,65 +22,179 @@ class MessageOutput(BaseModel): result: str -def _create_echo_steps() -> List[EchoStep]: - """Create the echo chain steps that are reused in both workflow and step library.""" +class WebpageInput(BaseModel): + """UI-friendly input for webpage summarization workflow.""" + url: str = Field( + default="https://httpbin.org/html", + description="URL of the webpage to summarize", + examples=["https://example.com", "https://news.ycombinator.com"] + ) + message: str = Field( + default="Starting workflow execution", + description="Optional message (ignored, for UI compatibility)" + ) + + +class CollectedOutput(BaseModel): + """Output model for collected results from multiple processing streams.""" + collected_results: list[str] = Field(description="List of processed results from parallel streams") + total_processed: int = Field(description="Total number of items processed") + processing_summary: str = Field(description="Summary of the processing workflow") + + +def _create_echo_steps(): + """Create complex echo chain steps demonstrating parallel processing, validation, and fan-out/fan-in.""" - # Step 1: Receive and format message + # Step 1: Receive and broadcast message receive_step = EchoStep( step_id="receive", - metadata=StepMetadata(name="Receive Message",description="Initial step to receive a message", tags=["input"]), + metadata=StepMetadata( + name="Receive Message", + description="Initial step to receive and broadcast a message to parallel processing streams", + tags=["input", "broadcast"] + ), input_type=MessageInput, output_type=MessageOutput, prefix="📥 RECEIVED: ", - suffix=" [INBOX]", - delay_seconds=10 + suffix=" [BROADCASTING TO PARALLEL STREAMS]", + delay_seconds=2 ) - # Step 2: Process the message - process_step = EchoStep( - step_id="process", - metadata=StepMetadata(name="Process Message", description="Step to process the received message", tags=["processing"]), + # Parallel Processing Steps (Fan-out) + process_urgent_step = EchoStep( + step_id="process_urgent", + metadata=StepMetadata( + name="Process Urgent", + description="Fast processing stream for urgent messages", + tags=["processing", "urgent", "fast"] + ), input_type=MessageOutput, output_type=MessageOutput, - prefix="⚙️ PROCESSING: ", - suffix=" [ANALYZED]", - delay_seconds=10 + prefix="🚨 URGENT: ", + suffix=" [FAST-TRACKED]", + delay_seconds=3 # Fast processing ) - # Step 3: Validate the message - validate_step = EchoStep( - step_id="validate", - metadata=StepMetadata(name="Validate Message", description="Step to validate the processed message", tags=["validation"]), + process_standard_step = EchoStep( + step_id="process_standard", + metadata=StepMetadata( + name="Process Standard", + description="Standard processing stream for regular messages", + tags=["processing", "standard", "medium"] + ), input_type=MessageOutput, output_type=MessageOutput, - prefix="✅ VALIDATED: ", - suffix=" [APPROVED]", - delay_seconds=15 + prefix="📋 STANDARD: ", + suffix=" [PROCESSED]", + delay_seconds=7 # Medium processing ) - # Step 4: Send final message + process_detailed_step = EchoStep( + step_id="process_detailed", + metadata=StepMetadata( + name="Process Detailed", + description="Detailed processing stream for complex analysis", + tags=["processing", "detailed", "slow"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="🔍 DETAILED: ", + suffix=" [DEEP-ANALYZED]", + delay_seconds=12 # Slow, thorough processing + ) + + # Validation Steps for each processing stream + validate_urgent_step = EchoStep( + step_id="validate_urgent", + metadata=StepMetadata( + name="Validate Urgent", + description="Quick validation for urgent processing results", + tags=["validation", "urgent"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="✅ URGENT-VALIDATED: ", + suffix=" [APPROVED-FAST]", + delay_seconds=1 + ) + + validate_standard_step = EchoStep( + step_id="validate_standard", + metadata=StepMetadata( + name="Validate Standard", + description="Standard validation for regular processing results", + tags=["validation", "standard"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="✅ STANDARD-VALIDATED: ", + suffix=" [APPROVED-NORMAL]", + delay_seconds=4 + ) + + validate_detailed_step = EchoStep( + step_id="validate_detailed", + metadata=StepMetadata( + name="Validate Detailed", + description="Thorough validation for detailed processing results", + tags=["validation", "detailed"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="✅ DETAILED-VALIDATED: ", + suffix=" [APPROVED-THOROUGH]", + delay_seconds=6 + ) + + # Collector Step (Fan-in) using TransformStep for serializable aggregation + collect_step = TransformStep( + step_id="collect", + metadata=StepMetadata( + name="Collect Results", + description="Collect and aggregate results from all parallel processing streams", + tags=["collection", "aggregation", "fan-in"] + ), + input_type=MessageOutput, # Will receive from any validation step + output_type=CollectedOutput, + mappings={ + "collected_results": ["static:Results from all parallel processing streams"], + "total_processed": 3, # Number of parallel streams + "processing_summary": "result" # Use the result field from the triggering validation step + } + ) + + # Final Send Step send_step = EchoStep( step_id="send", - metadata=StepMetadata(name="Send Message", description="Final step to send the message", tags=["output"]), - input_type=MessageOutput, + metadata=StepMetadata( + name="Send Final Results", + description="Send aggregated results from all processing streams", + tags=["output", "final"] + ), + input_type=CollectedOutput, output_type=MessageOutput, - prefix="📤 SENT: ", - suffix=" [DELIVERED]", - delay_seconds=15 + prefix="📤 FINAL RESULTS: ", + suffix=" [DELIVERED TO ALL STAKEHOLDERS]", + delay_seconds=3 ) - return [receive_step, process_step, validate_step, send_step] + return [ + receive_step, + process_urgent_step, process_standard_step, process_detailed_step, + validate_urgent_step, validate_standard_step, validate_detailed_step, + collect_step, send_step + ] def create_echo_chain_workflow() -> ComponentModel: - """Create the default echo chain workflow.""" + """Create a complex echo chain workflow demonstrating parallel processing, validation, and fan-out/fan-in patterns.""" workflow = Workflow( metadata=WorkflowMetadata( - name="Echo Chain Workflow", - description="Chain of echo steps that process and transform a message", - version="1.0.0", - tags=["demo", "echo", "chain"] + name="Complex Echo Processing Workflow", + description="Parallel message processing with urgent/standard/detailed streams, validation, and result aggregation", + version="2.0.0", + tags=["demo", "echo", "parallel", "fan-out", "fan-in", "validation"] ) ) @@ -91,13 +205,38 @@ def create_echo_chain_workflow() -> ComponentModel: for step in steps: workflow.add_step(step) - # Create linear chain: receive -> process -> validate -> send - workflow.add_edge("receive", "process") - workflow.add_edge("process", "validate") - workflow.add_edge("validate", "send") + # Create complex parallel processing pattern: + # + # receive → [process_urgent, process_standard, process_detailed] (FAN-OUT) + # ↓ ↓ ↓ + # validate_urgent validate_standard validate_detailed + # ↓ ↓ ↓ + # └────────── collect ─────────────┘ (FAN-IN) + # ↓ + # send - # Set start and end + # Set start step workflow.set_start_step("receive") + + # Fan-out: receive broadcasts to all three processing streams + workflow.add_edge("receive", "process_urgent") + workflow.add_edge("receive", "process_standard") + workflow.add_edge("receive", "process_detailed") + + # Each processing stream flows to its validation step + workflow.add_edge("process_urgent", "validate_urgent") + workflow.add_edge("process_standard", "validate_standard") + workflow.add_edge("process_detailed", "validate_detailed") + + # Fan-in: all validation steps feed into the collector + workflow.add_edge("validate_urgent", "collect") + workflow.add_edge("validate_standard", "collect") + workflow.add_edge("validate_detailed", "collect") + + # Final step: collector feeds into send + workflow.add_edge("collect", "send") + + # Set end step workflow.add_end_step("send") return workflow.dump_component() @@ -118,6 +257,26 @@ def create_simple_agent_workflow() -> ComponentModel: ) ) + # Transform UI input to HTTP input + input_transform = TransformStep( + step_id="input_transform", + metadata=StepMetadata( + name="Input Transform", + description="Transform UI input to HTTP request", + tags=["transform", "input"] + ), + input_type=WebpageInput, + output_type=HttpRequestInput, + mappings={ + "url": "url", # Extract URL from UI input + "method": "static:GET", + "timeout": 30, + "verify_ssl": True, + "headers": {}, # Empty dict for headers + "data": {} # Empty dict for data + } + ) + http_step = HttpStep( step_id="http_fetch", metadata=StepMetadata( @@ -142,7 +301,7 @@ def create_simple_agent_workflow() -> ComponentModel: "model": "static:gpt-4.1-nano", "temperature": 0.3, "max_tokens": 512, - "context_data": {"content": "content"} + "context_data": {"content": "content"} # Dict with content field mapped to input.content } ) @@ -155,22 +314,235 @@ def create_simple_agent_workflow() -> ComponentModel: ) ) + workflow.add_step(input_transform) workflow.add_step(http_step) workflow.add_step(transform_step) workflow.add_step(agent_step) + workflow.add_edge("input_transform", "http_fetch") workflow.add_edge("http_fetch", "transform_to_agent_input") workflow.add_edge("transform_to_agent_input", "agent_summarize") - workflow.set_start_step("http_fetch") + workflow.set_start_step("input_transform") # Start with UI-friendly input workflow.add_end_step("agent_summarize") return workflow.dump_component() +class ConditionalInput(BaseModel): + """Input for conditional workflow.""" + message: str = Field(description="Message to process") + priority: str = Field( + default="normal", + description="Priority level: urgent, normal, or low", + examples=["urgent", "normal", "low"] + ) + enable_validation: bool = Field( + default=True, + description="Whether to enable validation step" + ) + + +def create_conditional_workflow() -> ComponentModel: + """Create a simple conditional workflow demonstrating conditional edges.""" + + workflow = Workflow( + metadata=WorkflowMetadata( + name="Conditional Processing Workflow", + description="Demonstrates conditional routing based on message priority and validation settings", + version="1.0.0", + tags=["demo", "conditional", "routing"] + ) + ) + + # Input step for conditional workflow + receive_step = EchoStep( + step_id="receive_conditional", + metadata=StepMetadata( + name="Receive Conditional", + description="Receive message and prepare for conditional routing", + tags=["input"] + ), + input_type=ConditionalInput, # Proper input schema for UI introspection + output_type=MessageOutput, + prefix="📨 CONDITIONAL INPUT: ", + suffix=" [ROUTING BASED ON CONDITIONS]", + delay_seconds=1 + ) + + # Fast track for urgent messages + urgent_process_step = EchoStep( + step_id="urgent_process", + metadata=StepMetadata( + name="Urgent Process", + description="Fast processing for urgent messages", + tags=["processing", "urgent"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="🚨 URGENT FAST-TRACK: ", + suffix=" [EXPEDITED]", + delay_seconds=2 + ) + + # Normal processing + normal_process_step = EchoStep( + step_id="normal_process", + metadata=StepMetadata( + name="Normal Process", + description="Standard processing for normal messages", + tags=["processing", "normal"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="📋 NORMAL PROCESSING: ", + suffix=" [STANDARD]", + delay_seconds=5 + ) + + # Low priority processing + low_process_step = EchoStep( + step_id="low_process", + metadata=StepMetadata( + name="Low Priority Process", + description="Slow processing for low priority messages", + tags=["processing", "low"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="🐌 LOW PRIORITY: ", + suffix=" [BATCH-PROCESSED]", + delay_seconds=8 + ) + + # Optional validation step + validation_step = EchoStep( + step_id="validation", + metadata=StepMetadata( + name="Validation", + description="Optional validation step", + tags=["validation"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="✅ VALIDATED: ", + suffix=" [APPROVED]", + delay_seconds=3 + ) + + # Final delivery + deliver_step = EchoStep( + step_id="deliver", + metadata=StepMetadata( + name="Deliver", + description="Final delivery step", + tags=["output"] + ), + input_type=MessageOutput, + output_type=MessageOutput, + prefix="📦 DELIVERED: ", + suffix=" [COMPLETE]", + delay_seconds=1 + ) + + # Add steps + workflow.add_step(receive_step) + workflow.add_step(urgent_process_step) + workflow.add_step(normal_process_step) + workflow.add_step(low_process_step) + workflow.add_step(validation_step) + workflow.add_step(deliver_step) + + # Set start step + workflow.set_start_step("receive_conditional") + + # Conditional edges based on priority from initial input (state-based) + urgent_condition = EdgeCondition( + type="state_based", + field="priority", + operator="==", + value="urgent" + ) + workflow.add_edge( + "receive_conditional", "urgent_process", + condition=urgent_condition.model_dump() + ) + + normal_condition = EdgeCondition( + type="state_based", + field="priority", + operator="==", + value="normal" + ) + workflow.add_edge( + "receive_conditional", "normal_process", + condition=normal_condition.model_dump() + ) + + low_condition = EdgeCondition( + type="state_based", + field="priority", + operator="==", + value="low" + ) + workflow.add_edge( + "receive_conditional", "low_process", + condition=low_condition.model_dump() + ) + + # Conditional validation (only if enable_validation is True) + validation_enabled_condition = EdgeCondition( + type="state_based", + field="enable_validation", + operator="==", + value=True + ) + workflow.add_edge( + "urgent_process", "validation", + condition=validation_enabled_condition.model_dump() + ) + workflow.add_edge( + "normal_process", "validation", + condition=validation_enabled_condition.model_dump() + ) + workflow.add_edge( + "low_process", "validation", + condition=validation_enabled_condition.model_dump() + ) + + # Skip validation if disabled + validation_disabled_condition = EdgeCondition( + type="state_based", + field="enable_validation", + operator="==", + value=False + ) + workflow.add_edge( + "urgent_process", "deliver", + condition=validation_disabled_condition.model_dump() + ) + workflow.add_edge( + "normal_process", "deliver", + condition=validation_disabled_condition.model_dump() + ) + workflow.add_edge( + "low_process", "deliver", + condition=validation_disabled_condition.model_dump() + ) + + # Validation to delivery + workflow.add_edge("validation", "deliver") + + # Set end step + workflow.add_end_step("deliver") + + return workflow.dump_component() + + def get_default_workflows() -> List[ComponentModel]: """Get the default workflows.""" return [ create_echo_chain_workflow(), create_simple_agent_workflow(), + create_conditional_workflow(), ] \ No newline at end of file diff --git a/python/packages/autogen-studio/autogenstudio/workflow/examples/simple_agent.py b/python/packages/autogen-studio/autogenstudio/workflow/examples/simple_agent.py index 95621398c..7201a2e41 100644 --- a/python/packages/autogen-studio/autogenstudio/workflow/examples/simple_agent.py +++ b/python/packages/autogen-studio/autogenstudio/workflow/examples/simple_agent.py @@ -3,11 +3,10 @@ Webpage summarization workflow example: HTTP fetch → Agent summarize """ import asyncio -from typing import Any, Dict from pydantic import BaseModel from autogenstudio.workflow import Workflow, WorkflowRunner, WorkflowMetadata, StepMetadata -from autogenstudio.workflow.steps import HttpStep, AgentStep, FunctionStep, TransformStep +from autogenstudio.workflow.steps import HttpStep, AgentStep, TransformStep from autogenstudio.workflow.steps._http import HttpRequestInput, HttpResponseOutput from autogenstudio.workflow.steps._agent import AgentInput, AgentOutput from autogenstudio.workflow.core._models import Context diff --git a/python/packages/autogen-studio/autogenstudio/workflow/steps/_transform.py b/python/packages/autogen-studio/autogenstudio/workflow/steps/_transform.py index 7cf87757c..1957dc34c 100644 --- a/python/packages/autogen-studio/autogenstudio/workflow/steps/_transform.py +++ b/python/packages/autogen-studio/autogenstudio/workflow/steps/_transform.py @@ -54,15 +54,36 @@ class TransformStep(Component[TransformStepConfig], BaseStep[Any, Any]): """ Execute the transform: map fields from input to output according to config. """ + from ..schema_utils import coerce_value_to_schema_type + output_kwargs = {} + output_schema = self.output_type.model_json_schema() + for out_field, in_field in self.mappings.items(): + # Get the raw value if isinstance(in_field, str): if in_field.startswith("static:"): - output_kwargs[out_field] = in_field[len("static:"):] + raw_value = in_field[len("static:"):] else: - output_kwargs[out_field] = getattr(input_data, in_field, None) + raw_value = getattr(input_data, in_field, None) + elif isinstance(in_field, dict): + # Handle dict mappings with nested field resolution + raw_value = {} + for dict_key, dict_field in in_field.items(): + if isinstance(dict_field, str): + if dict_field.startswith("static:"): + raw_value[dict_key] = dict_field[len("static:"):] + else: + raw_value[dict_key] = getattr(input_data, dict_field, None) + else: + raw_value[dict_key] = dict_field else: - output_kwargs[out_field] = in_field + raw_value = in_field + + # Apply defensive type coercion using shared utility + coerced_value = coerce_value_to_schema_type(raw_value, out_field, output_schema) + output_kwargs[out_field] = coerced_value + return self.output_type(**output_kwargs) def _to_config(self) -> TransformStepConfig: diff --git a/python/packages/autogen-studio/frontend/src/components/sidebar.tsx b/python/packages/autogen-studio/frontend/src/components/sidebar.tsx index 8a3ea2be2..81c2b8d6a 100644 --- a/python/packages/autogen-studio/frontend/src/components/sidebar.tsx +++ b/python/packages/autogen-studio/frontend/src/components/sidebar.tsx @@ -14,6 +14,7 @@ import { Beaker, LucideBeaker, FlaskConical, + GitBranch, } from "lucide-react"; import Icon from "./icons"; import { BeakerIcon } from "@heroicons/react/24/outline"; @@ -36,6 +37,14 @@ const navigation: INavItem[] = [ icon: Bot, breadcrumbs: [{ name: "Team Builder", href: "/build", current: true }], }, + { + name: "Workflows", + href: "/workflow", + icon: GitBranch, + breadcrumbs: [ + { name: "Workflows (Experimental)", href: "/workflow", current: true }, + ], + }, { name: "Playground", href: "/", diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/WorkflowInputForm.tsx b/python/packages/autogen-studio/frontend/src/components/views/workflows/WorkflowInputForm.tsx new file mode 100644 index 000000000..318f18625 --- /dev/null +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/WorkflowInputForm.tsx @@ -0,0 +1,212 @@ +import React, { useState, useEffect } from 'react'; +import { Modal, Form, Input, Select, Checkbox, InputNumber, Button, Alert } from 'antd'; +import { PlayCircleOutlined } from '@ant-design/icons'; +import { + extractWorkflowInputSchema, + generateDefaultInputValues, + validateInputValues, + WorkflowInputField +} from './workflowInputUtils'; + +interface WorkflowInputFormProps { + workflow: any; + visible: boolean; + onSubmit: (input: Record) => void; + onCancel: () => void; + loading?: boolean; +} + +export const WorkflowInputForm: React.FC = ({ + workflow, + visible, + onSubmit, + onCancel, + loading = false +}) => { + const [form] = Form.useForm(); + const [inputFields, setInputFields] = useState([]); + const [validationErrors, setValidationErrors] = useState([]); + + // Extract input schema when workflow changes + useEffect(() => { + if (workflow) { + const fields = extractWorkflowInputSchema(workflow); + setInputFields(fields); + + // Set default values + const defaultValues = generateDefaultInputValues(fields); + form.setFieldsValue(defaultValues); + } + }, [workflow, form]); + + const handleSubmit = () => { + form.validateFields().then(values => { + // Additional validation using our schema + const validation = validateInputValues(inputFields, values); + + if (!validation.isValid) { + setValidationErrors(validation.errors); + return; + } + + setValidationErrors([]); + onSubmit(values); + }).catch(error => { + console.error('Form validation failed:', error); + }); + }; + + const renderFormField = (field: WorkflowInputField) => { + const { name, type, enum: enumValues, description, examples, required, title } = field; + + const rules = [ + { required, message: `${title || name} is required` } + ]; + + if (enumValues && enumValues.length > 0) { + // Dropdown for enum fields (like priority) + return ( + + + + ); + }; + + const workflowName = workflow?.config?.config?.name || 'Workflow'; + const workflowDescription = workflow?.config?.config?.description; + + return ( + + + Run {workflowName} + + } + open={visible} + onCancel={onCancel} + footer={[ + , + + ]} + width={600} + destroyOnClose + > +
+ {workflowDescription && ( +

+ {workflowDescription} +

+ )} + + {validationErrors.length > 0 && ( + + {validationErrors.map((error, index) => ( +
  • {error}
  • + ))} + + } + type="error" + showIcon + style={{ marginBottom: 16 }} + /> + )} +
    + +
    + {inputFields.map(renderFormField)} +
    + + {inputFields.length === 0 && ( +
    + No input parameters required for this workflow. +
    + )} +
    + ); +}; + +export default WorkflowInputForm; \ No newline at end of file diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/builder.tsx b/python/packages/autogen-studio/frontend/src/components/views/workflows/builder.tsx index 6cf14a095..64a986e95 100644 --- a/python/packages/autogen-studio/frontend/src/components/views/workflows/builder.tsx +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/builder.tsx @@ -4,6 +4,8 @@ import React, { useState, useContext, useRef, + forwardRef, + useImperativeHandle, } from "react"; import { ReactFlow, @@ -45,6 +47,7 @@ import { StepNode } from "./nodes"; import { Toolbar } from "./toolbar"; import { StepDetails } from "./step-details"; import { useWorkflowWebSocket } from "./useWorkflowWebSocket"; +import WorkflowInputForm from "./WorkflowInputForm"; import { workflowAPI } from "./api"; import { appContext } from "../../../hooks/provider"; import { @@ -54,6 +57,7 @@ import { saveNodePosition, removeNodePosition, calculateNodePosition, + getDagreLayoutedNodes, // <-- import dagre layout util } from "./utils"; import { Component } from "../../types/datamodel"; import { MonacoEditor } from "../monaco"; @@ -71,27 +75,33 @@ interface WorkflowBuilderProps { onDirtyStateChange?: (isDirty: boolean) => void; } -export const WorkflowBuilder: React.FC = ({ - workflow, - onChange, - onSave, - onDirtyStateChange, -}) => { +export interface WorkflowBuilderHandle { + resetUIState: () => void; +} + +export const WorkflowBuilder = forwardRef< + WorkflowBuilderHandle, + WorkflowBuilderProps +>(({ workflow, onChange, onSave, onDirtyStateChange }, ref) => { const [nodes, setNodes, onNodesChange] = useNodesState>([]); const [edges, setEdges, onEdgesChange] = useEdgesState([]); const [isLibraryCompact, setIsLibraryCompact] = useState(false); const [showMiniMap, setShowMiniMap] = useState(false); const [showGrid, setShowGrid] = useState(true); const [isDirty, setIsDirty] = useState(false); + // Add state for error details expand/collapse + const [showErrorDetails, setShowErrorDetails] = useState(false); const [selectedStep, setSelectedStep] = useState(null); const [selectedStepExecution, setSelectedStepExecution] = useState< StepExecution | undefined >(undefined); const [stepDetailsOpen, setStepDetailsOpen] = useState(false); - const [edgeType, setEdgeType] = useState("smoothstep"); + const [edgeType, setEdgeType] = useState("default"); const [isJsonMode, setIsJsonMode] = useState(false); const [workingCopy, setWorkingCopy] = useState(workflow); const editorRef = useRef(null); + const [showInputForm, setShowInputForm] = useState(false); + const [isStartingWorkflow, setIsStartingWorkflow] = useState(false); const [messageApi, contextHolder] = message.useMessage(); const { user } = useContext(appContext); @@ -321,7 +331,7 @@ export const WorkflowBuilder: React.FC = ({ addEdge( { ...params, - type: edgeType, + type: edgeType, // will be 'bezier' or 'step' }, eds ) @@ -482,15 +492,67 @@ export const WorkflowBuilder: React.FC = ({ ? parseInt(workflow.id, 10) : workflow.id || 0; - const flowNodes = convertToReactFlowNodes( + let flowNodes = convertToReactFlowNodes( workflow.config.config, workflowId, handleDeleteStep, handleStepClick ); const flowEdges = convertToReactFlowEdges(workflow.config.config, edgeType); - setNodes(flowNodes); - setEdges(flowEdges); + + // Only apply dagre to nodes that do NOT have a saved position + const positions = JSON.parse( + localStorage.getItem(`workflow-${workflowId}-positions`) || "{}" + ); + const nodesWithoutSavedPos = flowNodes.filter((n) => !positions[n.id]); + if (nodesWithoutSavedPos.length > 1 && flowEdges.length > 0) { + const dagreLayouted = getDagreLayoutedNodes( + nodesWithoutSavedPos, + flowEdges, + "LR" + ); + // Merge dagre positions into flowNodes + flowNodes = flowNodes.map((n) => { + const dagreNode = dagreLayouted.find((dn) => dn.id === n.id); + return dagreNode ? { ...n, position: dagreNode.position } : n; + }); + } + + // Preserve existing execution state when recreating nodes + setNodes((currentNodes) => { + const nodeMap = new Map(currentNodes.map((node) => [node.id, node])); + return flowNodes.map((newNode) => { + const existingNode = nodeMap.get(newNode.id); + if (existingNode) { + // Preserve execution state but update other data + return { + ...newNode, + data: { + ...newNode.data, + executionStatus: existingNode.data.executionStatus, + executionData: existingNode.data.executionData, + }, + }; + } + return newNode; + }); + }); + + setEdges((currentEdges) => { + const edgeMap = new Map(currentEdges.map((edge) => [edge.id, edge])); + return flowEdges.map((newEdge) => { + const existingEdge = edgeMap.get(newEdge.id); + if (existingEdge) { + // Preserve existing style and animation state + return { + ...newEdge, + style: existingEdge.style, + animated: existingEdge.animated, + }; + } + return newEdge; + }); + }); }, [ workflow.config, workflow.id, @@ -511,7 +573,7 @@ export const WorkflowBuilder: React.FC = ({ } }, [workflow, onSave, messageApi]); - const handleRunWorkflow = useCallback(async () => { + const handleRunWorkflow = useCallback(() => { if (!user?.id) { messageApi.error("User not authenticated"); return; @@ -522,61 +584,66 @@ export const WorkflowBuilder: React.FC = ({ return; } - try { - messageApi.loading("Starting workflow execution...", 0); + // Show the input form modal + setShowInputForm(true); + }, [user?.id, workflow.config.config.steps, messageApi]); - // Reset WebSocket state first - resetState(); + const handleWorkflowInputSubmit = useCallback( + async (input: Record) => { + try { + setIsStartingWorkflow(true); + messageApi.loading("Starting workflow execution...", 0); - // Reset all node and edge states to initial state - setNodes((currentNodes) => - currentNodes.map((node) => ({ - ...node, - data: { - ...node.data, - executionStatus: undefined, // Reset to no status - }, - })) - ); + // Reset WebSocket state first + resetState(); - setEdges((currentEdges) => - currentEdges.map((edge) => ({ - ...edge, - style: { stroke: "#6b7280", strokeWidth: 2 }, // Reset to default style - animated: false, - })) - ); + // Reset all node and edge states to initial state + setNodes((currentNodes) => + currentNodes.map((node) => ({ + ...node, + data: { + ...node.data, + executionStatus: undefined, // Reset to no status + }, + })) + ); - // Create a workflow run - const runResponse = await workflowAPI.createWorkflowRun( - undefined, // workflowId - use config instead for real-time execution - workflow.config - ); + setEdges((currentEdges) => + currentEdges.map((edge) => ({ + ...edge, + style: { stroke: "#6b7280", strokeWidth: 2 }, // Reset to default style + animated: false, + })) + ); - const { run_id, workflow_config } = runResponse.data; + // Create a workflow run + const runResponse = await workflowAPI.createWorkflowRun( + undefined, // workflowId - use config instead for real-time execution + workflow.config + ); - // Start WebSocket-based execution - startWorkflow(run_id, workflow_config, { - // Optional initial input - could be made configurable - message: "Starting workflow execution", - }); + const { run_id, workflow_config } = runResponse.data; - messageApi.destroy(); // Clear loading message - messageApi.success("Workflow execution started!"); - } catch (error) { - messageApi.destroy(); - console.error("Error starting workflow:", error); - messageApi.error("Failed to start workflow execution"); - } - }, [ - user?.id, - workflow.config, - startWorkflow, - resetState, - messageApi, - setNodes, - setEdges, - ]); + // Start WebSocket-based execution with user input + startWorkflow(run_id, workflow_config, input); + + messageApi.destroy(); // Clear loading message + messageApi.success("Workflow execution started!"); + setShowInputForm(false); + } catch (error) { + messageApi.destroy(); + console.error("Error starting workflow:", error); + messageApi.error("Failed to start workflow execution"); + } finally { + setIsStartingWorkflow(false); + } + }, + [workflow.config, startWorkflow, resetState, messageApi, setNodes, setEdges] + ); + + const handleWorkflowInputCancel = useCallback(() => { + setShowInputForm(false); + }, []); const handleStopWorkflow = useCallback(() => { stopWorkflow(); @@ -592,25 +659,23 @@ export const WorkflowBuilder: React.FC = ({ if (isNaN(workflowId)) return; - workflow.config.config.steps?.forEach((step, index) => { - const position = calculateNodePosition( - index, - workflow.config.config.steps?.length || 0 + // Use dagre to layout nodes + setNodes((currentNodes) => { + const flowEdges = convertToReactFlowEdges( + workflow.config.config, + edgeType ); - saveNodePosition(workflowId, step.config.step_id, position); + const layouted = getDagreLayoutedNodes(currentNodes, flowEdges, "LR"); + // Persist new positions + layouted.forEach((node) => { + saveNodePosition(workflowId, node.id, node.position); + }); + return layouted; }); - const flowNodes = convertToReactFlowNodes( - workflow.config.config, - workflowId, - handleDeleteStep, - handleStepClick - ); - setNodes(flowNodes); - messageApi.success("Nodes arranged automatically"); } - }, [workflow, messageApi, handleDeleteStep, setNodes]); + }, [workflow, messageApi, setNodes, edgeType]); const handleStepClick = useCallback( (step: StepConfig, executionData?: StepExecution) => { @@ -680,21 +745,47 @@ export const WorkflowBuilder: React.FC = ({ }; }, [nodes, handleDeleteStep]); + useImperativeHandle( + ref, + () => ({ + resetUIState: () => { + setNodes([]); + setEdges([]); + setIsLibraryCompact(false); + setShowMiniMap(false); + setShowGrid(true); + setIsDirty(false); + setSelectedStep(null); + setSelectedStepExecution(undefined); + setStepDetailsOpen(false); + setEdgeType("default"); + setIsJsonMode(false); + setWorkingCopy(workflow); + setShowInputForm(false); + setIsStartingWorkflow(false); + // Reset execution state and disconnect WebSocket + disconnect(); + resetState(); + }, + }), + [workflow, disconnect, resetState] + ); + return (
    {contextHolder} {/* CSS for edge animations */} + .react-flow__edge-path { + animation: dash 1s linear infinite; + } + `} {/* Main Canvas */}
    @@ -757,6 +848,69 @@ export const WorkflowBuilder: React.FC = ({ {workflow.config.config.edges?.length || 0} connections
    + {/* Error Details for Failed Steps (Collapsible) */} + {executionState.status === WorkflowStatus.FAILED && + (() => { + // Find failed steps with error details + const failedSteps = ( + workflow.config.config.steps || [] + ).filter((step) => { + const exec = + executionState.execution?.step_executions?.[ + step.config.step_id + ]; + return ( + exec && exec.status === StepStatus.FAILED && exec.error + ); + }); + if (!failedSteps.length) return null; + return ( +
    + + {showErrorDetails && ( +
    + {failedSteps.map((step) => { + const exec = + executionState.execution?.step_executions?.[ + step.config.step_id + ]; + return ( +
    +
    + {step.config.metadata.name || + step.config.step_id} +
    +
    + {exec?.error} +
    +
    + ); + })} +
    + )} +
    + ); + })()} + {/* Results Panel: Compact, live-updating step outputs */} {executionState.execution && workflow.config.config.steps?.length && @@ -1017,8 +1171,17 @@ export const WorkflowBuilder: React.FC = ({ onClose={() => setStepDetailsOpen(false)} /> )} + + {/* Workflow Input Form Modal */} +
    ); -}; +}); export default WorkflowBuilder; diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/index.ts b/python/packages/autogen-studio/frontend/src/components/views/workflows/index.ts index 3d5d3939a..2ab3f718c 100644 --- a/python/packages/autogen-studio/frontend/src/components/views/workflows/index.ts +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/index.ts @@ -1,7 +1,5 @@ export { default as FoundryManager } from "./manager"; export { default as FoundrySidebar } from "./sidebar"; export { default as FoundryBuilder } from "./builder"; -export { default as NewWorkflowControls } from "./newworkflow"; export * from "./types"; export * from "./utils"; -export { foundryAPI } from "./api"; diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/manager.tsx b/python/packages/autogen-studio/frontend/src/components/views/workflows/manager.tsx index e54cd88a6..eeabdde70 100644 --- a/python/packages/autogen-studio/frontend/src/components/views/workflows/manager.tsx +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/manager.tsx @@ -5,7 +5,7 @@ import { appContext } from "../../../hooks/provider"; import { workflowAPI } from "./api"; import { WorkflowSidebar } from "./sidebar"; import { Workflow } from "./types"; -import WorkflowBuilder from "./builder"; +import WorkflowBuilder, { WorkflowBuilderHandle } from "./builder"; export const WorkflowManager: React.FC = () => { const [isLoading, setIsLoading] = useState(false); @@ -22,6 +22,7 @@ export const WorkflowManager: React.FC = () => { const { user } = useContext(appContext); const [messageApi, contextHolder] = message.useMessage(); const [hasUnsavedChanges, setHasUnsavedChanges] = useState(false); + const workflowBuilderRef = React.useRef(null); // Persist sidebar state useEffect(() => { @@ -98,6 +99,10 @@ export const WorkflowManager: React.FC = () => { try { const workflow = await workflowAPI.getWorkflow(workflowId, user.id); setCurrentWorkflow(workflow); + // Reset all UI state in builder + if (workflowBuilderRef.current) { + workflowBuilderRef.current.resetUIState(); + } window.history.pushState({}, "", `?workflowId=${workflowId}`); setHasUnsavedChanges(false); } catch (error) { @@ -134,11 +139,23 @@ export const WorkflowManager: React.FC = () => { name, description: "A new workflow.", config: { - id: `config-${Date.now()}`, - name, + provider: "autogenstudio.workflow.core.Workflow", + component_type: "workflow", + version: 1, + component_version: 1, description: "A new workflow.", - steps: [], - edges: [], + label: "New Workflow", + config: { + metadata: { + name, + description: "A new workflow.", + version: "1.0.0", + tags: [], + }, + steps: [], + edges: [], + initial_state: {}, + }, }, }, user.id @@ -146,6 +163,10 @@ export const WorkflowManager: React.FC = () => { setWorkflows([newWorkflow, ...workflows]); setCurrentWorkflow(newWorkflow); + // Reset all UI state in builder + if (workflowBuilderRef.current) { + workflowBuilderRef.current.resetUIState(); + } messageApi.success("Workflow created successfully"); } catch (error) { console.error("Error creating workflow:", error); @@ -177,12 +198,11 @@ export const WorkflowManager: React.FC = () => { ? parseInt(currentWorkflow.id, 10) : currentWorkflow.id, { - id: currentWorkflow.id.toString(), // Convert to string for UpdateWorkflowRequest name: workflowConfig?.name || currentWorkflow.config.config.name, description: workflowConfig?.description || currentWorkflow.config.config.description, - config: workflowData.config?.config || currentWorkflow.config.config, + config: workflowData.config || currentWorkflow.config, }, user.id ); @@ -249,6 +269,7 @@ export const WorkflowManager: React.FC = () => { {currentWorkflow ? (
    void; -} - -const NewWorkflowControls = ({ - isLoading, - onCreateWorkflow, -}: NewWorkflowControlsProps) => { - const handleCreateWorkflow = async () => { - await onCreateWorkflow(); - }; - - return ( -
    - - -
    - - Graph-based workflow -
    -
    - ); -}; - -export default NewWorkflowControls; diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/sidebar.tsx b/python/packages/autogen-studio/frontend/src/components/views/workflows/sidebar.tsx index 6aa7d3559..22769c716 100644 --- a/python/packages/autogen-studio/frontend/src/components/views/workflows/sidebar.tsx +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/sidebar.tsx @@ -13,7 +13,6 @@ import { import { Workflow } from "./types"; import { getRelativeTimeString } from "../atoms"; import { getWorkflowTypeColor } from "./utils"; -import NewWorkflowControls from "./newworkflow"; interface WorkflowSidebarProps { isOpen: boolean; @@ -93,10 +92,17 @@ export const WorkflowSidebar: React.FC = ({
    {isOpen && ( - +
    + +
    )}
    diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/toolbar.tsx b/python/packages/autogen-studio/frontend/src/components/views/workflows/toolbar.tsx index b6cff7fa3..aadf2af6f 100644 --- a/python/packages/autogen-studio/frontend/src/components/views/workflows/toolbar.tsx +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/toolbar.tsx @@ -79,8 +79,8 @@ export const Toolbar: React.FC = ({
    Edge Style
    onEdgeTypeChange(value as string)} @@ -135,7 +135,7 @@ export const Toolbar: React.FC = ({ icon={} onClick={onRun} disabled={disabled} - className="h-10 w-10 flex items-center justify-center" + className="h-10 w-10 flex items-center justify-center" /> diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/utils.ts b/python/packages/autogen-studio/frontend/src/components/views/workflows/utils.ts index 586e14571..872be2c0e 100644 --- a/python/packages/autogen-studio/frontend/src/components/views/workflows/utils.ts +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/utils.ts @@ -1,6 +1,7 @@ import { Component } from "../../types/datamodel"; import { WorkflowConfig, NodeData, StepConfig, StepExecution } from "./types"; import { Node, Edge } from "@xyflow/react"; +import dagre from "@dagrejs/dagre"; // Workflow utilities export const createEmptyWorkflow = ( @@ -110,3 +111,36 @@ export const addStepToWorkflow = ( steps: [...config.steps, { ...step, config: step.config }], }; }; + +// Layout nodes using dagre (left-to-right) +export function getDagreLayoutedNodes( + nodes: Node[], + edges: Edge[], + direction: "LR" | "TB" = "LR" +): Node[] { + const g = new dagre.graphlib.Graph(); + g.setDefaultEdgeLabel(() => ({})); + g.setGraph({ rankdir: direction }); + + // Set nodes with width/height (adjust as needed for your node size) + nodes.forEach((node) => { + g.setNode(node.id, { width: 220, height: 80 }); + }); + + // Set edges + edges.forEach((edge) => { + g.setEdge(edge.source, edge.target); + }); + + dagre.layout(g); + + // Update node positions + return nodes.map((node) => { + const pos = g.node(node.id); + if (!pos) return node; + return { + ...node, + position: { x: pos.x - 110, y: pos.y - 40 }, // Center node + }; + }); +} diff --git a/python/packages/autogen-studio/frontend/src/components/views/workflows/workflowInputUtils.ts b/python/packages/autogen-studio/frontend/src/components/views/workflows/workflowInputUtils.ts new file mode 100644 index 000000000..b7bf20d5b --- /dev/null +++ b/python/packages/autogen-studio/frontend/src/components/views/workflows/workflowInputUtils.ts @@ -0,0 +1,193 @@ +/** + * Utilities for generating dynamic input forms based on workflow schemas + */ + +interface WorkflowInputField { + name: string; + type: 'string' | 'number' | 'boolean' | 'array'; + required: boolean; + default?: any; + enum?: string[]; + description?: string; + examples?: any[]; + title?: string; +} + +/** + * Extract input schema from a workflow's start step + */ +export function extractWorkflowInputSchema(workflow: any): WorkflowInputField[] { + try { + // Handle both backend API response format and direct workflow config + const config = workflow?.config?.config || workflow?.config; + if (!config?.steps?.length || !config.start_step_id) { + console.warn('No steps or start_step_id found, using message fallback'); + return [{ + name: 'message', + type: 'string', + required: true, + description: 'Message to process', + title: 'Message' + }]; + } + + // Find the start step - handle both component format and direct config format + const startStep = config.steps.find((step: any) => { + // Handle Component format from backend + const stepId = step.config?.step_id || step.step_id; + return stepId === config.start_step_id; + }); + + if (!startStep) { + console.warn(`Start step '${config.start_step_id}' not found, using message fallback`); + return [{ + name: 'message', + type: 'string', + required: true, + description: 'Message to process', + title: 'Message' + }]; + } + + // Extract input schema - handle both component and direct formats + const stepConfig = startStep.config || startStep; + const inputSchema = stepConfig.input_schema; + + if (!inputSchema?.properties) { + console.warn('No input_schema.properties found, using message fallback'); + return [{ + name: 'message', + type: 'string', + required: true, + description: 'Message to process', + title: 'Message' + }]; + } + + const properties = inputSchema.properties; + const required = inputSchema.required || []; + + console.log('Successfully extracted input schema:', { properties, required }); + + return Object.entries(properties).map(([fieldName, fieldSchema]: [string, any]) => ({ + name: fieldName, + type: mapJsonTypeToInputType(fieldSchema.type), + required: required.includes(fieldName), + default: fieldSchema.default, + // Use enum if present, otherwise use examples as enum for string fields with limited options + enum: fieldSchema.enum || (fieldSchema.type === 'string' && fieldSchema.examples?.length ? fieldSchema.examples : undefined), + description: fieldSchema.description, + examples: fieldSchema.examples, + title: fieldSchema.title || fieldName + })); + } catch (error) { + console.warn('Error extracting workflow input schema:', error); + // Fallback to simple message input + return [{ + name: 'message', + type: 'string', + required: true, + description: 'Message to process', + title: 'Message' + }]; + } +} + +/** + * Map JSON schema types to input field types + */ +function mapJsonTypeToInputType(jsonType: string): 'string' | 'number' | 'boolean' | 'array' { + switch (jsonType) { + case 'integer': + case 'number': + return 'number'; + case 'boolean': + return 'boolean'; + case 'array': + return 'array'; + default: + return 'string'; + } +} + +/** + * Generate default input values from schema + */ +export function generateDefaultInputValues(inputFields: WorkflowInputField[]): Record { + const defaultValues: Record = {}; + + inputFields.forEach(field => { + if (field.default !== undefined) { + defaultValues[field.name] = field.default; + } else if (field.required) { + // Provide sensible defaults for required fields + switch (field.type) { + case 'string': + defaultValues[field.name] = field.enum?.[0] || ''; + break; + case 'number': + defaultValues[field.name] = 0; + break; + case 'boolean': + defaultValues[field.name] = false; + break; + case 'array': + defaultValues[field.name] = []; + break; + } + } + }); + + return defaultValues; +} + +/** + * Validate input values against schema + */ +export function validateInputValues( + inputFields: WorkflowInputField[], + values: Record +): { isValid: boolean; errors: string[] } { + const errors: string[] = []; + + inputFields.forEach(field => { + const value = values[field.name]; + + // Check required fields + if (field.required && (value === undefined || value === null || value === '')) { + errors.push(`${field.title || field.name} is required`); + return; + } + + // Skip validation for empty optional fields + if (!field.required && (value === undefined || value === null || value === '')) { + return; + } + + // Type validation + switch (field.type) { + case 'number': + if (isNaN(Number(value))) { + errors.push(`${field.title || field.name} must be a number`); + } + break; + case 'boolean': + if (typeof value !== 'boolean') { + errors.push(`${field.title || field.name} must be true or false`); + } + break; + case 'string': + if (field.enum && !field.enum.includes(value)) { + errors.push(`${field.title || field.name} must be one of: ${field.enum.join(', ')}`); + } + break; + } + }); + + return { + isValid: errors.length === 0, + errors + }; +} + +export type { WorkflowInputField }; \ No newline at end of file