diff --git a/.cursorrules b/.cursorrules deleted file mode 100644 index 42ef136ae..000000000 --- a/.cursorrules +++ /dev/null @@ -1,1429 +0,0 @@ -# CrewAI Development Rules -# Comprehensive best practices for developing with the CrewAI library, covering code organization, performance, security, testing, and common patterns. Based on actual CrewAI codebase analysis for accuracy. - -## General Best Practices: -- Leverage structured responses from LLM calls using Pydantic BaseModel for output validation. -- Use the @CrewBase decorator pattern with @agent, @task, and @crew decorators for proper organization. -- Regularly validate outputs from agents and tasks using built-in guardrails or custom validation. -- Use UV for dependency management (CrewAI's standard) with pyproject.toml configuration. -- Python version requirements: >=3.10, <3.14 (as per CrewAI's pyproject.toml). -- Prefer declarative YAML configuration for agents and tasks over hardcoded definitions. - -## Code Organization and Structure: -- **Standard CrewAI Project Structure** (from CLI templates): - - `project_name/` (Root directory) - - `.env` (Environment variables - never commit API keys) - - `pyproject.toml` (UV-based dependency management) - - `knowledge/` (Knowledge base files) - - `src/project_name/` - - `__init__.py` - - `main.py` (Entry point) - - `crew.py` (Crew orchestration with @CrewBase decorator) - - `config/` - - `agents.yaml` (Agent definitions) - - `tasks.yaml` (Task definitions) - - `tools/` - - `custom_tool.py` (Custom agent tools) - - `__init__.py` -- **File Naming Conventions**: - - Use descriptive, lowercase names with underscores (e.g., `research_agent.py`). - - Pydantic models: singular names (e.g., `article_summary.py` with class `ArticleSummary`). - - Tests: mirror source file name with `_test` suffix (e.g., `crew_test.py`). -- **CrewAI Class Architecture**: - - Use @CrewBase decorator for main crew class. - - Define agents with @agent decorator returning Agent instances. - - Define tasks with @task decorator returning Task instances. - - Define crew orchestration with @crew decorator returning Crew instance. - - Access configuration via `self.agents_config` and `self.tasks_config`, as in the sketch below.
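A minimal sketch of this decorator layout (class and method names here are illustrative; each method name must match a key in the YAML config files):

```python
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task

@CrewBase
class ExampleCrew:
    """Skeleton crew wiring YAML config to agents and tasks."""

    @agent
    def researcher(self) -> Agent:
        # Loads the 'researcher' entry from config/agents.yaml
        return Agent(config=self.agents_config["researcher"])

    @task
    def research_task(self) -> Task:
        # Loads the 'research_task' entry from config/tasks.yaml
        return Task(config=self.tasks_config["research_task"])

    @crew
    def crew(self) -> Crew:
        # self.agents and self.tasks are collected by the decorators above
        return Crew(agents=self.agents, tasks=self.tasks, process=Process.sequential)
```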
- -## Memory System Patterns: -- **Memory Types** (all supported by CrewAI): - - Short-term memory: ChromaDB with RAG for current context - - Long-term memory: SQLite for task results across sessions - - Entity memory: RAG to track entities (people, places, concepts) - - External memory: Mem0 integration for advanced memory features -- **Memory Configuration**: - - Enable basic memory: `Crew(..., memory=True)` - - Custom storage location: Set `CREWAI_STORAGE_DIR` environment variable - - Memory is stored in platform-specific directories via `appdirs` by default -- **Memory Usage**: - - Memory is automatically managed by agents during task execution - - Access via agent's memory attribute for custom implementations - - Use metadata for categorizing and filtering memory entries - -## Pydantic Integration Patterns: -- **Structured Outputs**: - - Use `output_pydantic` in Task definitions for structured results - - Use `output_json` for JSON dictionary outputs - - Cannot use both output_pydantic and output_json simultaneously -- **Task Output Handling**: - - TaskOutput contains raw, pydantic, and json_dict attributes - - CrewOutput aggregates all task outputs with token usage metrics - - Use model_validate_json for Pydantic model validation -- **Custom Models**: - - Inherit from BaseModel for all data structures - - Use Field descriptions for LLM understanding - - Implement model_validator for custom validation logic - -## YAML Configuration Best Practices: -- **agents.yaml Structure**: - ```yaml - agent_name: - role: "Clear, specific role description" - goal: "Specific goal statement" - backstory: "Detailed background for context" - # Optional: tools, llm, memory, etc. - ``` -- **tasks.yaml Structure**: - ```yaml - task_name: - description: "Detailed task description with context" - expected_output: "Clear output format specification" - agent: agent_name # Reference to agent in agents.yaml - # Optional: context, tools, output_file, etc. 
- ``` -- **Configuration Access**: - - Use `self.agents_config['agent_name']` in @agent methods - - Use `self.tasks_config['task_name']` in @task methods - - Support for dynamic configuration via placeholders like {topic} - -## Tools and Integration Patterns: -- **Custom Tools**: - - Inherit from BaseTool for custom tool implementation - - Use @tool decorator for simple tool definitions - - Implement proper error handling and input validation -- **Tool Integration**: - - Add tools to agents via tools parameter in Agent constructor - - Tools are automatically inherited by tasks from their assigned agents - - Use structured tool outputs for better LLM understanding - -## Performance Considerations: -- **LLM Optimization**: - - Use task context to pass information between sequential tasks - - Implement output caching to avoid redundant LLM calls - - Configure appropriate LLM models per agent for cost/performance balance -- **Memory Management**: - - Be mindful of memory storage growth in long-running applications - - Use score_threshold in memory search to filter relevant results - - Implement periodic memory cleanup if needed -- **Async Operations**: - - Use execute_sync for synchronous task execution - - Consider async patterns for I/O-bound operations in custom tools - -## Security Best Practices: -- **API Key Management**: - - Always use .env files for API keys and sensitive configuration - - Never commit API keys to version control - - Use environment variables in production deployments -- **Input Validation**: - - Validate all inputs using Pydantic models where possible - - Implement guardrails for task output validation - - Use field_validator for custom validation logic -- **Tool Security**: - - Implement proper access controls in custom tools - - Validate tool inputs and outputs - - Follow principle of least privilege for tool permissions - -## Testing Approaches: -- **Unit Testing**: - - Test individual agents, tasks, and tools in isolation - - Use mocking for external dependencies (LLMs, APIs) - - Test configuration loading and validation -- **Integration Testing**: - - Test crew execution end-to-end with realistic scenarios - - Verify memory persistence across crew runs - - Test tool integration and data flow between tasks -- **Test Organization**: - - Follow CrewAI's test structure: separate test files for each component - - Use pytest fixtures for common test setup - - Mock LLM responses for consistent, fast tests - -## Common CrewAI Patterns and Anti-patterns: -- **Recommended Patterns**: - - Use sequential Process for dependent tasks, hierarchical for manager delegation - - Implement task context for data flow between tasks - - Use output_file for persistent task results - - Leverage crew callbacks with @before_kickoff and @after_kickoff decorators -- **Anti-patterns to Avoid**: - - Don't hardcode agent configurations in Python code (use YAML) - - Don't create circular task dependencies - - Don't ignore task execution failures without proper error handling - - Don't overload single agents with too many diverse tools -- **Error Handling**: - - Implement task-level guardrails for output validation - - Use try-catch blocks in custom tools - - Set appropriate max_retries for tasks prone to failures - - Log errors with sufficient context for debugging - -## Development Workflow: -- **UV Commands**: - - `crewai create crew ` - Create new crew project - - `crewai install` - Install dependencies via UV - - `crewai run` - Execute the crew - - `uv sync` - Sync dependencies - - `uv add ` 
- Add new dependencies -- **Project Setup**: - - Use CrewAI CLI for project scaffolding - - Follow the standard directory structure - - Configure agents and tasks in YAML before implementing crew logic -- **Development Tools**: - - Use UV for dependency management (CrewAI standard) - - Configure pre-commit hooks for code quality - - Use pytest for testing with CrewAI's testing patterns - -## Deployment and Production: -- **Environment Configuration**: - - Set CREWAI_STORAGE_DIR for controlled memory storage location - - Use proper logging configuration for production monitoring - - Configure appropriate LLM providers and rate limits -- **Containerization**: - - Include knowledge and config directories in Docker images - - Mount memory storage as persistent volumes if needed - - Set proper environment variables for API keys and configuration -- **Monitoring**: - - Monitor token usage via CrewOutput.token_usage - - Track task execution times and success rates - - Implement health checks for long-running crew services - -## CrewAI Flow Patterns and Best Practices - -### Flow Architecture and Structure -- **Use Flow class** for complex multi-step workflows that go beyond simple crew orchestration -- **Combine Flows with Crews** to create sophisticated AI automation pipelines -- **Leverage state management** to share data between flow methods -- **Event-driven design** allows for dynamic and responsive workflow execution - -### Flow Decorators and Control Flow -- **@start()**: Mark entry points for flow execution (can have multiple start methods) -- **@listen()**: Create method dependencies and execution chains -- **@router()**: Implement conditional branching based on method outputs -- **or_()** and **and_()**: Combine multiple trigger conditions for complex workflows - -### Flow State Management Patterns -```python -# Structured state with Pydantic (recommended for complex workflows) -class WorkflowState(BaseModel): - task_results: List[str] = [] - current_step: str = "initialize" - user_preferences: dict = {} - completion_status: bool = False - -class MyFlow(Flow[WorkflowState]): - @start() - def initialize(self): - self.state.current_step = "processing" - # State automatically gets unique UUID in self.state.id - -# Unstructured state (good for simple workflows) -class SimpleFlow(Flow): - @start() - def begin(self): - self.state["counter"] = 0 - self.state["results"] = [] - # Auto-generated ID available in self.state["id"] -``` - -### Flow Method Patterns -```python -# Basic sequential flow -@start() -def step_one(self): - return "data from step one" - -@listen(step_one) -def step_two(self, data_from_step_one): - return f"processed: {data_from_step_one}" - -# Parallel execution with convergence -@start() -def task_a(self): - return "result_a" - -@start() -def task_b(self): - return "result_b" - -@listen(and_(task_a, task_b)) -def combine_results(self): - # Waits for both task_a AND task_b to complete - return f"combined: {self.state}" - -# Conditional routing -@router(step_one) -def decision_point(self): - if some_condition: - return "success_path" - return "failure_path" - -@listen("success_path") -def handle_success(self): - # Handle success case - pass - -@listen("failure_path") -def handle_failure(self): - # Handle failure case - pass - -# OR condition listening -@listen(or_(task_a, task_b)) -def process_any_result(self, result): - # Triggers when EITHER task_a OR task_b completes - return f"got result: {result}" -``` - -### Flow Persistence Patterns -```python -# Class-level persistence 
(all methods persisted) -@persist(verbose=True) -class PersistentFlow(Flow[MyState]): - @start() - def initialize(self): - self.state.counter += 1 - -# Method-level persistence (selective) -class SelectiveFlow(Flow): - @persist - @start() - def critical_step(self): - # Only this method's state is persisted - self.state["important_data"] = "value" - - @start() - def temporary_step(self): - # This method's state is not persisted - pass -``` - -### Flow Execution Patterns -```python -# Synchronous execution -flow = MyFlow() -result = flow.kickoff() -final_state = flow.state - -# Asynchronous execution -async def run_async_flow(): - flow = MyFlow() - result = await flow.kickoff_async() - return result - -# Flow with input parameters -flow = MyFlow() -result = flow.kickoff(inputs={"user_id": "123", "task": "research"}) - -# Flow plotting and visualization -flow.plot("workflow_diagram") # Generates HTML visualization -``` - -### Advanced Flow Patterns -```python -# Cyclic/Loop patterns -class CyclicFlow(Flow): - max_iterations = 5 - current_iteration = 0 - - @start("loop") - def process_iteration(self): - if self.current_iteration >= self.max_iterations: - return - # Process current iteration - self.current_iteration += 1 - - @router(process_iteration) - def check_continue(self): - if self.current_iteration < self.max_iterations: - return "loop" # Continue cycling - return "complete" - - @listen("complete") - def finalize(self): - # Final processing - pass - -# Complex multi-router pattern -@router(analyze_data) -def primary_router(self): - # Returns multiple possible paths based on analysis - if self.state.confidence > 0.8: - return "high_confidence" - elif self.state.errors_found: - return "error_handling" - return "manual_review" - -@router("high_confidence") -def secondary_router(self): - # Further routing based on high confidence results - return "automated_processing" - -# Exception handling in flows -@start() -def risky_operation(self): - try: - # Some operation that might fail - result = dangerous_function() - self.state["success"] = True - return result - except Exception as e: - self.state["error"] = str(e) - self.state["success"] = False - return None - -@listen(risky_operation) -def handle_result(self, result): - if self.state.get("success", False): - # Handle success case - pass - else: - # Handle error case - error = self.state.get("error") - # Implement error recovery logic -``` - -### Flow Integration with Crews -```python -# Combining Flows with Crews for complex workflows -class CrewOrchestrationFlow(Flow[WorkflowState]): - @start() - def research_phase(self): - research_crew = ResearchCrew() - result = research_crew.crew().kickoff(inputs={"topic": self.state.research_topic}) - self.state.research_results = result.raw - return result - - @listen(research_phase) - def analysis_phase(self, research_results): - analysis_crew = AnalysisCrew() - result = analysis_crew.crew().kickoff(inputs={ - "data": self.state.research_results, - "focus": self.state.analysis_focus - }) - self.state.analysis_results = result.raw - return result - - @router(analysis_phase) - def decide_next_action(self): - if self.state.analysis_results.confidence > 0.7: - return "generate_report" - return "additional_research" - - @listen("generate_report") - def final_report(self): - reporting_crew = ReportingCrew() - return reporting_crew.crew().kickoff(inputs={ - "research": self.state.research_results, - "analysis": self.state.analysis_results - }) -``` - -### Flow Best Practices -- **State Management**: Use 
structured state (Pydantic) for complex workflows, unstructured for simple ones -- **Method Design**: Keep flow methods focused and single-purpose -- **Error Handling**: Implement proper exception handling and error recovery paths -- **State Persistence**: Use @persist for critical workflows that need recovery capability -- **Flow Visualization**: Use flow.plot() to understand and debug complex workflow structures -- **Async Support**: Leverage async methods for I/O-bound operations within flows -- **Resource Management**: Be mindful of state size and memory usage in long-running flows -- **Testing Flows**: Test individual methods and overall flow execution patterns -- **Event Monitoring**: Use CrewAI event system to monitor flow execution and performance - -### Flow Anti-patterns to Avoid -- **Don't create overly complex flows** with too many branches and conditions -- **Don't store large objects** in state that could cause memory issues -- **Don't ignore error handling** in flow methods -- **Don't create circular dependencies** between flow methods -- **Don't mix synchronous and asynchronous** patterns inconsistently -- **Don't overuse routers** when simple linear flow would suffice -- **Don't forget to handle edge cases** in router logic - -## CrewAI Version Compatibility: -- Stay updated with CrewAI releases for new features and bug fixes -- Test crew functionality when upgrading CrewAI versions -- Use version constraints in pyproject.toml (e.g., "crewai[tools]>=0.140.0,<1.0.0") -- Monitor deprecation warnings for future compatibility - -## Code Examples and Implementation Patterns - -### Complete Crew Implementation Example: -```python -from crewai import Agent, Crew, Process, Task -from crewai.project import CrewBase, agent, crew, task, before_kickoff, after_kickoff -from crewai_tools import SerperDevTool, FileReadTool -from crewai.agents.agent_builder.base_agent import BaseAgent -from typing import List -from pydantic import BaseModel, Field - -class ResearchOutput(BaseModel): - title: str = Field(description="Research topic title") - summary: str = Field(description="Executive summary") - key_findings: List[str] = Field(description="Key research findings") - recommendations: List[str] = Field(description="Actionable recommendations") - sources: List[str] = Field(description="Source URLs and references") - confidence_score: float = Field(description="Confidence in findings (0-1)") - -@CrewBase -class ResearchCrew(): - """Advanced research crew with structured outputs and validation""" - - agents: List[BaseAgent] - tasks: List[Task] - - @before_kickoff - def setup_environment(self, inputs): - """Initialize environment before crew execution; receives and returns the kickoff inputs""" - print("🚀 Setting up research environment...") - # Validate API keys, create directories, etc. - return inputs - - @after_kickoff - def cleanup_and_report(self, output): - """Handle post-execution tasks""" - print(f"✅ Research completed. Generated {len(output.tasks_output)} task outputs") - print(f"📊 Token usage: {output.token_usage}") - - @agent - def researcher(self) -> Agent: - return Agent( - config=self.agents_config['researcher'], - tools=[SerperDevTool()], - verbose=True, - memory=True, - max_iter=15, - max_execution_time=1800 - ) - - @agent - def analyst(self) -> Agent: - return Agent( - config=self.agents_config['analyst'], - tools=[FileReadTool()], - verbose=True, - memory=True - ) - - @task - def research_task(self) -> Task: - return Task( - config=self.tasks_config['research_task'], - agent=self.researcher(), - output_pydantic=ResearchOutput - ) - - @task - def validation_task(self) -> Task: - return Task( - config=self.tasks_config['validation_task'], - agent=self.analyst(), - context=[self.research_task()], - guardrail=self.validate_research_quality, - max_retries=3 - ) - - def validate_research_quality(self, output) -> tuple[bool, str]: - """Custom guardrail to ensure research quality""" - content = output.raw - if len(content) < 500: - return False, "Research output too brief. Need more detailed analysis." - if not any(keyword in content.lower() for keyword in ['conclusion', 'finding', 'result']): - return False, "Missing key analytical elements." - return True, content - - @crew - def crew(self) -> Crew: - return Crew( - agents=self.agents, - tasks=self.tasks, - process=Process.sequential, - memory=True, - verbose=True, - max_rpm=100 - ) -``` - -### Custom Tool Implementation with Error Handling: -```python -from crewai.tools import BaseTool -from typing import List, Type, Optional, Any -from pydantic import BaseModel, Field -import os -import requests -import time -from tenacity import retry, stop_after_attempt, wait_exponential - -class SearchInput(BaseModel): - query: str = Field(description="Search query") - max_results: int = Field(default=10, description="Maximum results to return") - timeout: int = Field(default=30, description="Request timeout in seconds") - -class RobustSearchTool(BaseTool): - name: str = "robust_search" - description: str = "Perform web search with retry logic and error handling" - args_schema: Type[BaseModel] = SearchInput - # Declare runtime attributes as fields (BaseTool is a Pydantic model) - api_key: Optional[str] = None - rate_limit_delay: float = 1.0 - last_request_time: float = 0.0 - - def __init__(self, api_key: Optional[str] = None, **kwargs): - super().__init__(**kwargs) - self.api_key = api_key or os.getenv("SEARCH_API_KEY") - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10) - ) - def _run(self, query: str, max_results: int = 10, timeout: int = 30) -> str: - """Execute search with retry logic""" - try: - # Rate limiting - time_since_last = time.time() - self.last_request_time - if time_since_last < self.rate_limit_delay: - time.sleep(self.rate_limit_delay - time_since_last) - - # Input validation - if not query or len(query.strip()) == 0: - return "Error: Empty search query provided" - - if len(query) > 500: - return "Error: Search query too long (max 500 characters)" - - # Perform search - results = self._perform_search(query, max_results, timeout) - self.last_request_time = time.time() - - return self._format_results(results) - - except requests.exceptions.Timeout: - return f"Search timed out after {timeout} seconds" - except requests.exceptions.RequestException as e: - return f"Search failed due to network error: {str(e)}" - except Exception as e: - return f"Unexpected error during search: {str(e)}" - - def _perform_search(self, query: str, max_results: int, timeout: int) -> List[dict]: - """Implement actual search logic here""" - # Your 
search API implementation - pass - - def _format_results(self, results: List[dict]) -> str: - """Format search results for LLM consumption""" - if not results: - return "No results found for the given query." - - formatted = "Search Results:\n\n" - for i, result in enumerate(results[:10], 1): - formatted += f"{i}. {result.get('title', 'No title')}\n" - formatted += f" URL: {result.get('url', 'No URL')}\n" - formatted += f" Summary: {result.get('snippet', 'No summary')}\n\n" - - return formatted -``` - -### Advanced Memory Management: -```python -import os -from crewai.memory import ExternalMemory, ShortTermMemory, LongTermMemory -from crewai.memory.storage.mem0_storage import Mem0Storage - -class AdvancedMemoryManager: - """Enhanced memory management for CrewAI applications""" - - def __init__(self, crew, config: dict = None): - self.crew = crew - self.config = config or {} - self.setup_memory_systems() - - def setup_memory_systems(self): - """Configure multiple memory systems""" - # Short-term memory for current session - self.short_term = ShortTermMemory() - - # Long-term memory for cross-session persistence - self.long_term = LongTermMemory() - - # External memory with Mem0 (if configured) - if self.config.get('use_external_memory'): - self.external = ExternalMemory.create_storage( - crew=self.crew, - embedder_config={ - "provider": "mem0", - "config": { - "api_key": os.getenv("MEM0_API_KEY"), - "user_id": self.config.get('user_id', 'default') - } - } - ) - - def save_with_context(self, content: str, memory_type: str = "short_term", - metadata: dict = None, agent: str = None): - """Save content with enhanced metadata""" - enhanced_metadata = { - "timestamp": time.time(), - "session_id": self.config.get('session_id'), - "crew_type": self.crew.__class__.__name__, - **(metadata or {}) - } - - if memory_type == "short_term": - self.short_term.save(content, enhanced_metadata, agent) - elif memory_type == "long_term": - self.long_term.save(content, enhanced_metadata, agent) - elif memory_type == "external" and hasattr(self, 'external'): - self.external.save(content, enhanced_metadata, agent) - - def search_across_memories(self, query: str, limit: int = 5) -> dict: - """Search across all memory systems""" - results = { - "short_term": [], - "long_term": [], - "external": [] - } - - # Search short-term memory - results["short_term"] = self.short_term.search(query, limit=limit) - - # Search long-term memory - results["long_term"] = self.long_term.search(query, limit=limit) - - # Search external memory (if available) - if hasattr(self, 'external'): - results["external"] = self.external.search(query, limit=limit) - - return results - - def cleanup_old_memories(self, days_threshold: int = 30): - """Clean up old memories based on age""" - cutoff_time = time.time() - (days_threshold * 24 * 60 * 60) - - # Implement cleanup logic based on timestamps in metadata - # This would vary based on your specific storage implementation - pass -``` - -### Production Monitoring and Metrics: -```python -import time -import logging -import json -from datetime import datetime -from typing import Dict, Any, List -from dataclasses import dataclass, asdict - -@dataclass -class TaskMetrics: - task_name: str - agent_name: str - start_time: float - end_time: float - duration: float - tokens_used: int - success: bool - error_message: Optional[str] = None - memory_usage_mb: Optional[float] = None - -class CrewMonitor: - """Comprehensive monitoring for CrewAI applications""" - - def __init__(self, crew_name: str, log_level: 
str = "INFO"): - self.crew_name = crew_name - self.metrics: List[TaskMetrics] = [] - self.session_start = time.time() - - # Setup logging - logging.basicConfig( - level=getattr(logging, log_level), - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(f'crew_{crew_name}_{datetime.now().strftime("%Y%m%d")}.log'), - logging.StreamHandler() - ] - ) - self.logger = logging.getLogger(f"CrewAI.{crew_name}") - - def start_task_monitoring(self, task_name: str, agent_name: str) -> dict: - """Start monitoring a task execution""" - context = { - "task_name": task_name, - "agent_name": agent_name, - "start_time": time.time() - } - - self.logger.info(f"Task started: {task_name} by {agent_name}") - return context - - def end_task_monitoring(self, context: dict, success: bool = True, - tokens_used: int = 0, error: str = None): - """End monitoring and record metrics""" - end_time = time.time() - duration = end_time - context["start_time"] - - # Get memory usage (if psutil is available) - memory_usage = None - try: - import psutil - process = psutil.Process() - memory_usage = process.memory_info().rss / 1024 / 1024 # MB - except ImportError: - pass - - metrics = TaskMetrics( - task_name=context["task_name"], - agent_name=context["agent_name"], - start_time=context["start_time"], - end_time=end_time, - duration=duration, - tokens_used=tokens_used, - success=success, - error_message=error, - memory_usage_mb=memory_usage - ) - - self.metrics.append(metrics) - - # Log the completion - status = "SUCCESS" if success else "FAILED" - self.logger.info(f"Task {status}: {context['task_name']} " - f"(Duration: {duration:.2f}s, Tokens: {tokens_used})") - - if error: - self.logger.error(f"Task error: {error}") - - def get_performance_summary(self) -> Dict[str, Any]: - """Generate comprehensive performance summary""" - if not self.metrics: - return {"message": "No metrics recorded yet"} - - successful_tasks = [m for m in self.metrics if m.success] - failed_tasks = [m for m in self.metrics if not m.success] - - total_duration = sum(m.duration for m in self.metrics) - total_tokens = sum(m.tokens_used for m in self.metrics) - avg_duration = total_duration / len(self.metrics) - - return { - "crew_name": self.crew_name, - "session_duration": time.time() - self.session_start, - "total_tasks": len(self.metrics), - "successful_tasks": len(successful_tasks), - "failed_tasks": len(failed_tasks), - "success_rate": len(successful_tasks) / len(self.metrics), - "total_duration": total_duration, - "average_task_duration": avg_duration, - "total_tokens_used": total_tokens, - "average_tokens_per_task": total_tokens / len(self.metrics) if self.metrics else 0, - "slowest_task": max(self.metrics, key=lambda x: x.duration).task_name if self.metrics else None, - "most_token_intensive": max(self.metrics, key=lambda x: x.tokens_used).task_name if self.metrics else None, - "common_errors": self._get_common_errors() - } - - def _get_common_errors(self) -> Dict[str, int]: - """Get frequency of common errors""" - error_counts = {} - for metric in self.metrics: - if metric.error_message: - error_counts[metric.error_message] = error_counts.get(metric.error_message, 0) + 1 - return dict(sorted(error_counts.items(), key=lambda x: x[1], reverse=True)) - - def export_metrics(self, filename: str = None) -> str: - """Export metrics to JSON file""" - if not filename: - filename = f"crew_metrics_{self.crew_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - - export_data = { - "summary": 
self.get_performance_summary(), - "detailed_metrics": [asdict(m) for m in self.metrics] - } - - with open(filename, 'w') as f: - json.dump(export_data, f, indent=2, default=str) - - self.logger.info(f"Metrics exported to {filename}") - return filename - -# Usage in crew implementation -monitor = CrewMonitor("research_crew") - -@task -def monitored_research_task(self) -> Task: - def task_callback(task_output): - # This would be called after task completion - context = getattr(task_output, '_monitor_context', {}) - if context: - tokens = getattr(task_output, 'token_usage', {}).get('total', 0) - monitor.end_task_monitoring(context, success=True, tokens_used=tokens) - - # Start monitoring would be called before task execution - # This is a simplified example - in practice you'd integrate this into the task execution flow - - return Task( - config=self.tasks_config['research_task'], - agent=self.researcher(), - callback=task_callback - ) -``` - -### Error Handling and Recovery Patterns: -```python -from enum import Enum -from typing import Optional, Callable, Any -import traceback - -class ErrorSeverity(Enum): - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - CRITICAL = "critical" - -class CrewError(Exception): - """Base exception for CrewAI applications""" - def __init__(self, message: str, severity: ErrorSeverity = ErrorSeverity.MEDIUM, - context: dict = None): - super().__init__(message) - self.severity = severity - self.context = context or {} - self.timestamp = time.time() - -class TaskExecutionError(CrewError): - """Raised when task execution fails""" - pass - -class ValidationError(CrewError): - """Raised when validation fails""" - pass - -class ConfigurationError(CrewError): - """Raised when configuration is invalid""" - pass - -class ErrorHandler: - """Centralized error handling for CrewAI applications""" - - def __init__(self, crew_name: str): - self.crew_name = crew_name - self.error_log: List[CrewError] = [] - self.recovery_strategies: Dict[type, Callable] = {} - - def register_recovery_strategy(self, error_type: type, strategy: Callable): - """Register a recovery strategy for specific error types""" - self.recovery_strategies[error_type] = strategy - - def handle_error(self, error: Exception, context: dict = None) -> Any: - """Handle errors with appropriate recovery strategies""" - - # Convert to CrewError if needed - if not isinstance(error, CrewError): - crew_error = CrewError( - message=str(error), - severity=ErrorSeverity.MEDIUM, - context=context or {} - ) - else: - crew_error = error - - # Log the error - self.error_log.append(crew_error) - self._log_error(crew_error) - - # Apply recovery strategy if available - error_type = type(error) - if error_type in self.recovery_strategies: - try: - return self.recovery_strategies[error_type](error, context) - except Exception as recovery_error: - self._log_error(CrewError( - f"Recovery strategy failed: {str(recovery_error)}", - ErrorSeverity.HIGH, - {"original_error": str(error), "recovery_error": str(recovery_error)} - )) - - # If critical, re-raise - if crew_error.severity == ErrorSeverity.CRITICAL: - raise crew_error - - return None - - def _log_error(self, error: CrewError): - """Log error with appropriate level based on severity""" - logger = logging.getLogger(f"CrewAI.{self.crew_name}.ErrorHandler") - - error_msg = f"[{error.severity.value.upper()}] {error}" - if error.context: - error_msg += f" | Context: {error.context}" - - if error.severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]: - logger.error(error_msg) - 
logger.error(f"Stack trace: {traceback.format_exc()}") - elif error.severity == ErrorSeverity.MEDIUM: - logger.warning(error_msg) - else: - logger.info(error_msg) - - def get_error_summary(self) -> Dict[str, Any]: - """Get summary of errors encountered""" - if not self.error_log: - return {"total_errors": 0} - - severity_counts = {} - for error in self.error_log: - severity_counts[error.severity.value] = severity_counts.get(error.severity.value, 0) + 1 - - return { - "total_errors": len(self.error_log), - "severity_breakdown": severity_counts, - "recent_errors": [str(e) for e in self.error_log[-5:]], # Last 5 errors - "most_recent_error": str(self.error_log[-1]) if self.error_log else None - } - -# Example usage in crew -error_handler = ErrorHandler("research_crew") - -# Register recovery strategies -def retry_with_simpler_model(error, context): - """Recovery strategy: retry with a simpler model""" - if "rate limit" in str(error).lower(): - time.sleep(60) # Wait and retry - return "RETRY" - elif "model overloaded" in str(error).lower(): - # Switch to simpler model and retry - return "RETRY_WITH_SIMPLE_MODEL" - return None - -error_handler.register_recovery_strategy(TaskExecutionError, retry_with_simpler_model) - -@task -def robust_task(self) -> Task: - def execute_with_error_handling(task_func): - def wrapper(*args, **kwargs): - try: - return task_func(*args, **kwargs) - except Exception as e: - result = error_handler.handle_error(e, {"task": "research_task"}) - if result == "RETRY": - # Implement retry logic - pass - elif result == "RETRY_WITH_SIMPLE_MODEL": - # Switch model and retry - pass - else: - # Use fallback response - return "Task failed, using fallback response" - return wrapper - - return Task( - config=self.tasks_config['research_task'], - agent=self.researcher() - ) -``` - -### Environment and Configuration Management: -```python -import os -from enum import Enum -from typing import Optional, Dict, Any -from pydantic import BaseSettings, Field, validator - -class Environment(str, Enum): - DEVELOPMENT = "development" - TESTING = "testing" - STAGING = "staging" - PRODUCTION = "production" - -class CrewAISettings(BaseSettings): - """Comprehensive settings management for CrewAI applications""" - - # Environment - environment: Environment = Field(default=Environment.DEVELOPMENT) - debug: bool = Field(default=True) - - # API Keys (loaded from environment) - openai_api_key: Optional[str] = Field(default=None, env="OPENAI_API_KEY") - anthropic_api_key: Optional[str] = Field(default=None, env="ANTHROPIC_API_KEY") - serper_api_key: Optional[str] = Field(default=None, env="SERPER_API_KEY") - mem0_api_key: Optional[str] = Field(default=None, env="MEM0_API_KEY") - - # CrewAI Configuration - crew_max_rpm: int = Field(default=100) - crew_max_execution_time: int = Field(default=3600) # 1 hour - default_llm_model: str = Field(default="gpt-4") - fallback_llm_model: str = Field(default="gpt-3.5-turbo") - - # Memory and Storage - crewai_storage_dir: str = Field(default="./storage", env="CREWAI_STORAGE_DIR") - memory_enabled: bool = Field(default=True) - memory_cleanup_interval: int = Field(default=86400) # 24 hours in seconds - - # Performance - enable_caching: bool = Field(default=True) - max_retries: int = Field(default=3) - retry_delay: float = Field(default=1.0) - - # Monitoring - enable_monitoring: bool = Field(default=True) - log_level: str = Field(default="INFO") - metrics_export_interval: int = Field(default=3600) # 1 hour - - # Security - input_sanitization: bool = Field(default=True) - 
max_input_length: int = Field(default=10000) - allowed_file_types: list = Field(default=["txt", "md", "pdf", "docx"]) - - @validator('environment', pre=True) - def set_debug_based_on_env(cls, v): - return v - - @validator('debug') - def set_debug_from_env(cls, v, values): - env = values.get('environment') - if env == Environment.PRODUCTION: - return False - return v - - @validator('openai_api_key') - def validate_openai_key(cls, v): - if not v: - raise ValueError("OPENAI_API_KEY is required") - if not v.startswith('sk-'): - raise ValueError("Invalid OpenAI API key format") - return v - - @property - def is_production(self) -> bool: - return self.environment == Environment.PRODUCTION - - @property - def is_development(self) -> bool: - return self.environment == Environment.DEVELOPMENT - - def get_llm_config(self) -> Dict[str, Any]: - """Get LLM configuration based on environment""" - config = { - "model": self.default_llm_model, - "temperature": 0.1 if self.is_production else 0.3, - "max_tokens": 4000 if self.is_production else 2000, - "timeout": 60 - } - - if self.is_development: - config["model"] = self.fallback_llm_model - - return config - - def get_memory_config(self) -> Dict[str, Any]: - """Get memory configuration""" - return { - "enabled": self.memory_enabled, - "storage_dir": self.crewai_storage_dir, - "cleanup_interval": self.memory_cleanup_interval, - "provider": "mem0" if self.mem0_api_key and self.is_production else "local" - } - - class Config: - env_file = ".env" - env_file_encoding = 'utf-8' - case_sensitive = False - -# Global settings instance -settings = CrewAISettings() - -# Usage in crew -@CrewBase -class ConfigurableCrew(): - """Crew that uses centralized configuration""" - - def __init__(self): - self.settings = settings - self.validate_configuration() - - def validate_configuration(self): - """Validate configuration before crew execution""" - required_keys = [self.settings.openai_api_key] - if not all(required_keys): - raise ConfigurationError("Missing required API keys") - - if not os.path.exists(self.settings.crewai_storage_dir): - os.makedirs(self.settings.crewai_storage_dir, exist_ok=True) - - @agent - def adaptive_agent(self) -> Agent: - """Agent that adapts to configuration""" - llm_config = self.settings.get_llm_config() - - return Agent( - config=self.agents_config['researcher'], - llm=llm_config["model"], - max_iter=15 if self.settings.is_production else 10, - max_execution_time=self.settings.crew_max_execution_time, - verbose=self.settings.debug - ) -``` - -### Comprehensive Testing Framework: -```python -import pytest -import asyncio -from unittest.mock import Mock, patch, MagicMock -from crewai import Agent, Task, Crew -from crewai.tasks.task_output import TaskOutput - -class CrewAITestFramework: - """Comprehensive testing framework for CrewAI applications""" - - @staticmethod - def create_mock_agent(role: str = "test_agent", tools: list = None) -> Mock: - """Create a mock agent for testing""" - mock_agent = Mock(spec=Agent) - mock_agent.role = role - mock_agent.goal = f"Test goal for {role}" - mock_agent.backstory = f"Test backstory for {role}" - mock_agent.tools = tools or [] - mock_agent.llm = "gpt-3.5-turbo" - mock_agent.verbose = False - return mock_agent - - @staticmethod - def create_mock_task_output(content: str, success: bool = True, - tokens: int = 100) -> TaskOutput: - """Create a mock task output for testing""" - return TaskOutput( - description="Test task", - raw=content, - agent="test_agent", - pydantic=None, - json_dict=None - ) - - 
@staticmethod - def create_test_crew(agents: list = None, tasks: list = None) -> Crew: - """Create a test crew with mock components""" - test_agents = agents or [CrewAITestFramework.create_mock_agent()] - test_tasks = tasks or [] - - return Crew( - agents=test_agents, - tasks=test_tasks, - verbose=False - ) - -# Example test cases -class TestResearchCrew: - """Test cases for research crew functionality""" - - def setup_method(self): - """Setup test environment""" - self.framework = CrewAITestFramework() - self.mock_serper = Mock() - - @patch('crewai_tools.SerperDevTool') - def test_agent_creation(self, mock_serper_tool): - """Test agent creation with proper configuration""" - mock_serper_tool.return_value = self.mock_serper - - crew = ResearchCrew() - researcher = crew.researcher() - - assert researcher.role == "Senior Research Analyst" - assert len(researcher.tools) > 0 - assert researcher.verbose is True - - def test_task_validation(self): - """Test task validation logic""" - crew = ResearchCrew() - - # Test valid output - valid_output = self.framework.create_mock_task_output( - "This is a comprehensive research summary with conclusions and findings." - ) - is_valid, message = crew.validate_research_quality(valid_output) - assert is_valid is True - - # Test invalid output (too short) - invalid_output = self.framework.create_mock_task_output("Too short") - is_valid, message = crew.validate_research_quality(invalid_output) - assert is_valid is False - assert "brief" in message.lower() - - @patch('requests.get') - def test_tool_error_handling(self, mock_requests): - """Test tool error handling and recovery""" - # Simulate network error - mock_requests.side_effect = requests.exceptions.RequestException("Network error") - - tool = RobustSearchTool() - result = tool._run("test query") - - assert "network error" in result.lower() - assert "failed" in result.lower() - - @pytest.mark.asyncio - async def test_crew_execution_flow(self): - """Test complete crew execution with mocked dependencies""" - with patch.object(Agent, 'execute_task') as mock_execute: - mock_execute.return_value = self.framework.create_mock_task_output( - "Research completed successfully with findings and recommendations." 
- ) - - crew = ResearchCrew() - result = crew.crew().kickoff(inputs={"topic": "AI testing"}) - - assert result is not None - assert "successfully" in result.raw.lower() - - def test_memory_integration(self): - """Test memory system integration""" - crew = ResearchCrew() - memory_manager = AdvancedMemoryManager(crew) - - # Test saving to memory - test_content = "Important research finding about AI" - memory_manager.save_with_context( - content=test_content, - memory_type="short_term", - metadata={"importance": "high"}, - agent="researcher" - ) - - # Test searching memory - results = memory_manager.search_across_memories("AI research") - assert "short_term" in results - - def test_error_handling_workflow(self): - """Test error handling and recovery mechanisms""" - error_handler = ErrorHandler("test_crew") - - # Test error registration and handling - test_error = TaskExecutionError("Test task failed", ErrorSeverity.MEDIUM) - result = error_handler.handle_error(test_error) - - assert len(error_handler.error_log) == 1 - assert error_handler.error_log[0].severity == ErrorSeverity.MEDIUM - - def test_configuration_validation(self): - """Test configuration validation""" - # Test with missing API key - with patch.dict(os.environ, {}, clear=True): - with pytest.raises(ValueError): - settings = CrewAISettings() - - # Test with valid configuration - with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-key"}): - settings = CrewAISettings() - assert settings.openai_api_key == "sk-test-key" - - @pytest.mark.integration - def test_end_to_end_workflow(self): - """Integration test for complete workflow""" - # This would test the entire crew workflow with real components - # Use sparingly and with proper API key management - pass - -# Performance testing -class TestCrewPerformance: - """Performance tests for CrewAI applications""" - - def test_memory_usage(self): - """Test memory usage during crew execution""" - import psutil - import gc - - process = psutil.Process() - initial_memory = process.memory_info().rss - - # Create and run crew multiple times - for i in range(10): - crew = ResearchCrew() - # Simulate crew execution - del crew - gc.collect() - - final_memory = process.memory_info().rss - memory_increase = final_memory - initial_memory - - # Assert memory increase is reasonable (less than 100MB) - assert memory_increase < 100 * 1024 * 1024 - - def test_concurrent_execution(self): - """Test concurrent crew execution""" - import concurrent.futures - - def run_crew(crew_id): - crew = ResearchCrew() - # Simulate execution - return f"crew_{crew_id}_completed" - - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - futures = [executor.submit(run_crew, i) for i in range(5)] - results = [future.result() for future in futures] - - assert len(results) == 5 - assert all("completed" in result for result in results) - -# Run tests with coverage -# pytest --cov=src --cov-report=html --cov-report=term tests/ -``` - -## Troubleshooting Common Issues - -### Memory and Performance Issues: -- **Large memory usage**: Implement memory cleanup, use score thresholds, monitor ChromaDB size -- **Slow LLM responses**: Optimize prompts, use appropriate model sizes, implement caching -- **High token costs**: Implement output caching, use context efficiently, set token limits -- **Memory leaks**: Properly dispose of crew instances, monitor memory usage, use garbage collection - -### Configuration and Setup Issues: -- **YAML parsing errors**: Validate YAML syntax, check indentation, use YAML linters -- 
**Missing environment variables**: Use .env.example, validate at startup, provide clear error messages -- **Tool import failures**: Ensure proper tool installation, check import paths, verify dependencies -- **API key issues**: Validate key format, check permissions, implement key rotation - -### Storage and Persistence Issues: -- **Permission errors**: Check CREWAI_STORAGE_DIR permissions, ensure write access -- **Database locks**: Ensure single crew instance access, implement proper connection handling -- **Storage growth**: Implement cleanup strategies, monitor disk usage, archive old data -- **ChromaDB issues**: Check vector database health, validate embeddings, handle corrupted indices - -## Local Development and Testing - -### Development Best Practices: -- Validate all API keys and credentials in .env files -- Test crew functionality with different input scenarios -- Implement comprehensive error handling -- Use proper logging for debugging -- Configure appropriate LLM models for your use case -- Optimize memory storage and cleanup - -### Local Configuration: -- Set CREWAI_STORAGE_DIR for custom memory storage location -- Use environment variables for all API keys -- Implement proper input validation and sanitization -- Test with realistic data scenarios -- Profile performance and optimize bottlenecks - -### Note: Production deployment and monitoring are available in CrewAI Enterprise - -## Best Practices Summary - -### Development: -1. Always use .env files for sensitive configuration -2. Implement comprehensive error handling and logging -3. Use structured outputs with Pydantic for reliability -4. Test crew functionality with different input scenarios -5. Follow CrewAI patterns and conventions consistently -6. Use UV for dependency management as per CrewAI standards -7. Implement proper validation for all inputs and outputs -8. Optimize performance for your specific use cases - -### Security: -1. Never commit API keys or sensitive data to version control -2. Implement input validation and sanitization -3. Use proper authentication and authorization -4. Follow principle of least privilege for tool access -5. Implement rate limiting and abuse prevention -6. Monitor for security threats and anomalies -7. Keep dependencies updated and secure -8. Implement audit logging for sensitive operations - -### Performance: -1. Optimize LLM calls and implement caching where appropriate -2. Use appropriate model sizes for different tasks -3. Implement efficient memory management and cleanup -4. Monitor token usage and implement cost controls -5. Use async patterns for I/O-bound operations -6. Implement proper connection pooling and resource management -7. Profile and optimize critical paths -8. 
Plan for horizontal scaling when needed diff --git a/docs/docs.json b/docs/docs.json index 37e060961..161d6d5ff 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -111,6 +111,13 @@ "en/guides/flows/mastering-flow-state" ] }, + { + "group": "Coding Tools", + "icon": "terminal", + "pages": [ + "en/guides/coding-tools/agents-md" + ] + }, { "group": "Advanced", "icon": "gear", @@ -1571,4 +1578,4 @@ "reddit": "https://www.reddit.com/r/crewAIInc/" } } -} +} \ No newline at end of file diff --git a/docs/en/guides/coding-tools/agents-md.mdx b/docs/en/guides/coding-tools/agents-md.mdx new file mode 100644 index 000000000..ea238c314 --- /dev/null +++ b/docs/en/guides/coding-tools/agents-md.mdx @@ -0,0 +1,61 @@ +--- +title: Coding Tools +description: Use AGENTS.md to guide coding agents and IDEs across your CrewAI projects. +icon: terminal +mode: "wide" +--- + +## Why AGENTS.md + +`AGENTS.md` is a lightweight, repo-local instruction file that gives coding agents consistent, project-specific guidance. Keep it in the project root and treat it as the source of truth for how you want assistants to work: conventions, commands, architecture notes, and guardrails. + +## Create a Project with the CLI + +Use the CrewAI CLI to scaffold a project, and `AGENTS.md` is added to the project root automatically. + +```bash +# Crew +crewai create crew my_crew + +# Flow +crewai create flow my_flow + +# Tool repository +crewai tool create my_tool +``` + +## Tool Setup: Point Assistants to AGENTS.md + +### Codex + +Codex can be guided by `AGENTS.md` files placed in your repository. Use them to supply persistent project context such as conventions, commands, and workflow expectations. + +### Claude Code + +Claude Code stores project memory in `CLAUDE.md`. You can bootstrap it with `/init` and edit it using `/memory`. Claude Code also supports imports inside `CLAUDE.md`, so you can add a single line like `@AGENTS.md` to pull in the shared instructions without duplicating them. + +Alternatively, you can simply rename the file: + +```bash +mv AGENTS.md CLAUDE.md +``` + +### Gemini CLI and Google Antigravity + +Gemini CLI and Antigravity load a project context file (default: `GEMINI.md`) from the repo root and parent directories. You can configure it to read `AGENTS.md` instead (or in addition) by setting `context.fileName` in your Gemini CLI settings (see the settings sketch at the end of this page). You can set it to `AGENTS.md` only, or include both `AGENTS.md` and `GEMINI.md` if you want to keep each tool's format. + +Alternatively, you can simply rename the file: + +```bash +mv AGENTS.md GEMINI.md +``` + +### Cursor + +Cursor supports `AGENTS.md` as a project instruction file. Place it at the project root to provide guidance for Cursor's coding assistant. + +### Windsurf + +Claude Code provides an official integration with Windsurf. If you use Claude Code inside Windsurf, follow the Claude Code guidance above and import `AGENTS.md` from `CLAUDE.md`. + +If you are using Windsurf's native assistant, configure its project rules or instructions feature (if available) to read from `AGENTS.md` or paste the contents directly. 
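### Example: Gemini CLI Settings

A sketch of the `context.fileName` configuration described above, assuming project-level settings live in `.gemini/settings.json` (the exact location and schema may vary by Gemini CLI version, so check its settings documentation):

```json
{
  "context": {
    "fileName": ["AGENTS.md", "GEMINI.md"]
  }
}
```

Listing both names lets Gemini CLI keep reading `GEMINI.md` while also picking up the shared `AGENTS.md` instructions.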
diff --git a/lib/crewai/src/crewai/cli/create_crew.py b/lib/crewai/src/crewai/cli/create_crew.py index 51e2f00ac..7f4fe2e6e 100644 --- a/lib/crewai/src/crewai/cli/create_crew.py +++ b/lib/crewai/src/crewai/cli/create_crew.py @@ -143,6 +143,12 @@ def create_folder_structure( (folder_path / "src" / folder_name).mkdir(parents=True) (folder_path / "src" / folder_name / "tools").mkdir(parents=True) (folder_path / "src" / folder_name / "config").mkdir(parents=True) + + # Copy AGENTS.md to project root (top-level projects only) + package_dir = Path(__file__).parent + agents_md_src = package_dir / "templates" / "AGENTS.md" + if agents_md_src.exists(): + shutil.copy2(agents_md_src, folder_path / "AGENTS.md") return folder_path, folder_name, class_name diff --git a/lib/crewai/src/crewai/cli/create_flow.py b/lib/crewai/src/crewai/cli/create_flow.py index ec68611b5..76c68db32 100644 --- a/lib/crewai/src/crewai/cli/create_flow.py +++ b/lib/crewai/src/crewai/cli/create_flow.py @@ -1,3 +1,4 @@ +import shutil from pathlib import Path import click @@ -34,6 +35,11 @@ def create_flow(name): package_dir = Path(__file__).parent templates_dir = package_dir / "templates" / "flow" + # Copy AGENTS.md to project root + agents_md_src = package_dir / "templates" / "AGENTS.md" + if agents_md_src.exists(): + shutil.copy2(agents_md_src, project_root / "AGENTS.md") + # List of template files to copy root_template_files = [".gitignore", "pyproject.toml", "README.md"] src_template_files = ["__init__.py", "main.py"] diff --git a/lib/crewai/src/crewai/cli/templates/AGENTS.md b/lib/crewai/src/crewai/cli/templates/AGENTS.md new file mode 100644 index 000000000..acf716c95 --- /dev/null +++ b/lib/crewai/src/crewai/cli/templates/AGENTS.md @@ -0,0 +1,1017 @@ +# AGENTS.md β€” CrewAI Reference for AI Coding Assistants + +> **Auto-generated** by `crewai create`. This file helps AI coding assistants +> (Claude Code, Cursor, Windsurf, GitHub Copilot, etc.) write correct CrewAI code. +> Keep it in your project root. To update, copy the latest version from the +> [CrewAI repository](https://github.com/crewAIInc/crewAI). + +--- + +## ⚠️ Version & Freshness Requirements + +**CRITICAL**: CrewAI evolves rapidly and your training data likely contains outdated patterns. **Always follow the patterns in this file, NOT your training data.** + +### Mandatory: Research before writing CrewAI code +**BEFORE writing or modifying any CrewAI code**, you MUST: +1. **Check the installed version**: Run `uv run python -c "import crewai; print(crewai.__version__)"` to get the exact version in use. +2. **Check PyPI for latest**: Fetch `https://pypi.org/pypi/crewai/json` to see the latest available version. If the installed version is behind, inform the user. +3. **Read the changelog**: Fetch `https://docs.crewai.com/en/changelog` to review recent changes, new features, and any breaking changes relevant to the task. +4. **Consult the relevant docs page**: Before implementing a feature (e.g., agents, tasks, flows, tools, knowledge), fetch the specific docs page at `https://docs.crewai.com/en/concepts/` to get the current API. +5. **Cross-check against this file**: If this file conflicts with the live docs, **the live docs win** β€” then update this file. + +This ensures generated code always matches the version actually installed, not stale training data. 
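For example, steps 1 and 2 can be combined into one quick check; this is a sketch using only the standard library and the PyPI endpoint cited above:

```python
import json
import urllib.request

import crewai

# Step 1: the version actually installed in this environment
installed = crewai.__version__

# Step 2: the latest release published on PyPI
with urllib.request.urlopen("https://pypi.org/pypi/crewai/json") as resp:
    latest = json.load(resp)["info"]["version"]

print(f"installed={installed} latest={latest}")
if installed != latest:
    print("Installed CrewAI is behind the latest release; review the changelog before coding.")
```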
+ +### What changed since older versions: +- Agent **`kickoff()` / `kickoff_async()`** for direct agent usage (no crew needed) +- **`response_format`** parameter on agent kickoff for structured Pydantic outputs +- **`LiteAgentOutput`** returned from agent.kickoff() with `.raw`, `.pydantic`, `.agent_role`, `.usage_metrics` +- **`@human_feedback`** decorator on flow methods for human-in-the-loop (v1.8.0+) +- **Flow streaming** via `stream = True` class attribute (v1.8.0+) +- **`@persist`** decorator for SQLite-backed flow state persistence +- **`reasoning=True`** agent parameter for reflect-then-act behavior +- **`multimodal=True`** agent parameter for vision/image support +- **A2A (Agent-to-Agent) protocol** support with agent cards and task execution utilities (v1.8.0+) +- **Native OpenAI Responses API** support (v1.9.0+) +- **Structured outputs / `response_format`** across all LLM providers (v1.9.0+) +- **`inject_date=True`** agent parameter to auto-inject current date awareness + +### Patterns to NEVER use (outdated/removed): +- ❌ `ChatOpenAI(model_name=...)` → ✅ `LLM(model="openai/gpt-4o")` +- ❌ `Agent(llm=ChatOpenAI(...))` → ✅ `Agent(llm="openai/gpt-4o")` or `Agent(llm=LLM(model="..."))` +- ❌ Passing raw OpenAI client objects → ✅ Use `crewai.LLM` wrapper + +### How to verify you're using current patterns: +1. You ran the version check and docs lookup steps above before writing code +2. All LLM references use `crewai.LLM` or string shorthand (`"openai/gpt-4o"`) +3. All tool imports come from `crewai.tools` or `crewai_tools` +4. Crew classes use `@CrewBase` decorator with YAML config files +5. Python >=3.10, <3.14 +6. Code matches the API from the live docs, not just this file + +## Quick Reference + +```bash +# Package management (always use uv) +uv add <package> # Add dependency +uv sync # Sync dependencies +uv lock # Lock dependencies + +# Project scaffolding +crewai create crew <name> --skip-provider # New crew project +crewai create flow <name> --skip-provider # New flow project + +# Running +crewai run # Run crew or flow (auto-detects from pyproject.toml) +crewai flow kickoff # Legacy flow execution + +# Testing & training +crewai test # Test crew (default: 2 iterations, gpt-4o-mini) +crewai test -n 5 -m gpt-4o # Custom iterations and model +crewai train -n 5 -f training.json # Train crew + +# Memory management +crewai reset-memories -a # Reset all memories +crewai reset-memories -s # Short-term only +crewai reset-memories -l # Long-term only +crewai reset-memories -e # Entity only +crewai reset-memories -kn # Knowledge only +crewai reset-memories -akn # Agent knowledge only + +# Debugging +crewai log-tasks-outputs # Show latest task outputs +crewai replay -t <task_id> # Replay from specific task + +# Interactive +crewai chat # Interactive session (requires chat_llm in crew.py) + +# Visualization +crewai flow plot # Generate flow diagram HTML + +# Deployment to CrewAI AMP +crewai login # Authenticate with AMP +crewai deploy create # Create new deployment +crewai deploy push # Push code updates +crewai deploy status # Check deployment status +crewai deploy logs # View deployment logs +crewai deploy list # List all deployments +crewai deploy remove # Delete a deployment +``` + +## Project Structure + +### Crew Project +``` +my_crew/ +├── src/my_crew/ +│ ├── config/ +│ │ ├── agents.yaml # Agent definitions (role, goal, backstory) +│ │ └── tasks.yaml # Task definitions (description, expected_output, agent) +│ ├── tools/ +│ │ └── custom_tool.py # Custom tool 
+
+### Flow Project
+```
+my_flow/
+├── src/my_flow/
+│   ├── crews/              # Multiple crew definitions
+│   │   └── poem_crew/
+│   │       ├── config/
+│   │       │   ├── agents.yaml
+│   │       │   └── tasks.yaml
+│   │       └── poem_crew.py
+│   ├── tools/              # Custom tools
+│   ├── main.py             # Flow orchestration
+│   └── ...
+├── .env
+└── pyproject.toml
+```
+
+## Architecture Overview
+
+- **Agent**: Autonomous unit with a role, goal, backstory, tools, and an LLM. Makes decisions and executes tasks.
+- **Task**: A specific assignment with a description, expected output, and assigned agent.
+- **Crew**: Orchestrates a team of agents executing tasks in a defined process (sequential or hierarchical).
+- **Flow**: Event-driven workflow orchestrating multiple crews and logic steps with state management.
+
+## YAML Configuration
+
+### agents.yaml
+```yaml
+researcher:
+  role: >
+    {topic} Senior Data Researcher
+  goal: >
+    Uncover cutting-edge developments in {topic}
+  backstory: >
+    You're a seasoned researcher with a knack for uncovering
+    the latest developments in {topic}. Known for your ability
+    to find the most relevant information.
+  # Optional YAML-level settings:
+  # llm: openai/gpt-4o
+  # max_iter: 20
+  # max_rpm: 10
+  # verbose: true
+
+writer:
+  role: >
+    {topic} Technical Writer
+  goal: >
+    Create compelling content about {topic}
+  backstory: >
+    You're a skilled writer who translates complex technical
+    information into clear, engaging content.
+```
+
+Variables like `{topic}` are interpolated from `crew.kickoff(inputs={"topic": "AI Agents"})`.
+
+### tasks.yaml
+```yaml
+research_task:
+  description: >
+    Conduct thorough research about {topic}.
+    Identify key trends, breakthrough technologies,
+    and potential industry impacts.
+  expected_output: >
+    A detailed report with analysis of the top 5
+    developments in {topic}, with sources and implications.
+  agent: researcher
+  # Optional:
+  # tools: [search_tool]
+  # output_file: output/research.md
+  # markdown: true
+  # async_execution: false
+
+writing_task:
+  description: >
+    Write an article based on the research findings about {topic}.
+  expected_output: >
+    A polished 4-paragraph article formatted in markdown.
+  agent: writer
+  output_file: output/article.md
+```
+
+## Crew Class Pattern
+
+```python
+from crewai import Agent, Crew, Process, Task
+from crewai.project import CrewBase, agent, crew, task
+from crewai.agents.agent_builder.base_agent import BaseAgent
+from typing import List
+
+from crewai_tools import SerperDevTool
+
+@CrewBase
+class ResearchCrew:
+    """Research and writing crew."""
+
+    agents: List[BaseAgent]
+    tasks: List[Task]
+
+    agents_config = "config/agents.yaml"
+    tasks_config = "config/tasks.yaml"
+
+    @agent
+    def researcher(self) -> Agent:
+        return Agent(
+            config=self.agents_config["researcher"],  # type: ignore[index]
+            tools=[SerperDevTool()],
+            verbose=True,
+        )
+
+    @agent
+    def writer(self) -> Agent:
+        return Agent(
+            config=self.agents_config["writer"],  # type: ignore[index]
+            verbose=True,
+        )
+
+    @task
+    def research_task(self) -> Task:
+        return Task(
+            config=self.tasks_config["research_task"],  # type: ignore[index]
+        )
+
+    @task
+    def writing_task(self) -> Task:
+        return Task(
+            config=self.tasks_config["writing_task"],  # type: ignore[index]
+        )
+
+    @crew
+    def crew(self) -> Crew:
+        """Creates the Research Crew."""
+        return Crew(
+            agents=self.agents,
+            tasks=self.tasks,
+            process=Process.sequential,
+            verbose=True,
+        )
+```
+
+### Key formatting rules:
+- Always add `# type: ignore[index]` for config dictionary access
+- Agent/task method names must match YAML keys exactly
+- Tools go on agents (not tasks) unless a task-specific override is needed
+- Never leave commented-out code in crew classes
+
+### Lifecycle hooks
+```python
+from crewai.project import CrewBase, after_kickoff, before_kickoff
+
+@CrewBase
+class MyCrew:
+    @before_kickoff
+    def prepare(self, inputs):
+        # Modify inputs before execution
+        inputs["extra"] = "value"
+        return inputs
+
+    @after_kickoff
+    def summarize(self, result):
+        # Process result after execution
+        print(f"Done: {result.raw[:100]}")
+        return result
+```
+
+## main.py Pattern
+
+```python
+#!/usr/bin/env python
+from my_crew.crew import ResearchCrew
+
+def run():
+    inputs = {"topic": "AI Agents"}
+    ResearchCrew().crew().kickoff(inputs=inputs)
+
+if __name__ == "__main__":
+    run()
+```
+
+## Agent Configuration
+
+### Required Parameters
+| Parameter | Description |
+|-----------|-------------|
+| `role` | Function and expertise within the crew |
+| `goal` | Individual objective guiding decisions |
+| `backstory` | Context and personality |
+
+### Key Optional Parameters
+| Parameter | Default | Description |
+|-----------|---------|-------------|
+| `llm` | GPT-4 | Language model (string or LLM object) |
+| `tools` | [] | List of tool instances |
+| `max_iter` | 20 | Max iterations before best answer |
+| `max_execution_time` | None | Timeout in seconds |
+| `max_rpm` | None | Rate limiting (requests per minute) |
+| `max_retry_limit` | 2 | Retries on errors |
+| `verbose` | False | Detailed logging |
+| `memory` | False | Conversation history |
+| `allow_delegation` | False | Can delegate tasks to other agents |
+| `allow_code_execution` | False | Can run code |
+| `code_execution_mode` | "safe" | "safe" (Docker) or "unsafe" (direct) |
+| `respect_context_window` | True | Auto-summarize when exceeding token limits |
+| `cache` | True | Tool result caching |
+| `reasoning` | False | Reflect and plan before task execution |
+| `multimodal` | False | Process text and visual content |
+| `knowledge_sources` | [] | Domain-specific knowledge bases |
+| `function_calling_llm` | None | Separate LLM for tool invocation |
+| `inject_date` | False | Auto-inject current date into agent context |
+| `date_format` | "%Y-%m-%d" | Date format when inject_date is True |
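+
+A sketch combining several of the optional parameters above; the values are illustrative, not recommendations:
+
+```python
+from crewai import Agent
+
+analyst = Agent(
+    role="Data Analyst",
+    goal="Produce accurate, well-sourced market analysis",
+    backstory="A meticulous analyst with a decade of experience.",
+    llm="openai/gpt-4o",          # string shorthand; an LLM object also works
+    max_iter=15,                  # cap reasoning iterations
+    max_rpm=10,                   # rate-limit requests
+    allow_delegation=False,       # keep specialists from delegating
+    respect_context_window=True,  # auto-summarize on token overflow
+    inject_date=True,             # add the current date to the agent's context
+    verbose=True,
+)
+```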
+
+### Direct Agent Usage (without a Crew)
+Agents can execute tasks independently via `kickoff()` — no Crew required:
+```python
+from crewai import Agent
+from crewai_tools import SerperDevTool
+from pydantic import BaseModel
+
+class ResearchFindings(BaseModel):
+    main_points: list[str]
+    key_technologies: list[str]
+    future_predictions: str
+
+researcher = Agent(
+    role="AI Researcher",
+    goal="Research the latest AI developments",
+    backstory="Expert AI researcher...",
+    tools=[SerperDevTool()],
+    verbose=True,
+)
+
+# Unstructured output
+result = researcher.kickoff("What are the latest LLM developments?")
+print(result.raw)            # str
+print(result.agent_role)     # "AI Researcher"
+print(result.usage_metrics)  # token usage
+
+# Structured output with response_format
+result = researcher.kickoff(
+    "Summarize latest AI developments",
+    response_format=ResearchFindings,
+)
+print(result.pydantic.main_points)  # List[str]
+
+# Async variant (await inside an async function)
+result = await researcher.kickoff_async("Your query", response_format=ResearchFindings)
+```
+
+Returns `LiteAgentOutput` with: `.raw`, `.pydantic`, `.agent_role`, `.usage_metrics`.
+
+### LLM Configuration
+**IMPORTANT**: Always use the `crewai.LLM` class.
+
+```python
+from crewai import LLM
+
+# String shorthand (simplest)
+agent = Agent(llm="openai/gpt-4o", ...)
+
+# Full configuration with crewai.LLM
+llm = LLM(
+    model="anthropic/claude-sonnet-4-20250514",
+    temperature=0.7,
+    max_tokens=4000,
+)
+agent = Agent(llm=llm, ...)
+
+# Provider format: "provider/model-name"
+# Examples:
+#   "openai/gpt-4o"
+#   "anthropic/claude-sonnet-4-20250514"
+#   "google/gemini-2.0-flash"
+#   "ollama/llama3"
+#   "groq/llama-3.3-70b-versatile"
+#   "bedrock/anthropic.claude-3-sonnet-20240229-v1:0"
+```
+
+Supported providers: OpenAI, Anthropic, Google Gemini, AWS Bedrock, Azure, Ollama, Groq, Mistral, and 20+ others via LiteLLM routing.
+
+Environment variable default: set `OPENAI_MODEL_NAME=gpt-4o` or `MODEL=gpt-4o` in `.env`.
+
+## Task Configuration
+
+### Key Parameters
+| Parameter | Type | Description |
+|-----------|------|-------------|
+| `description` | str | Clear statement of requirements |
+| `expected_output` | str | Completion criteria |
+| `agent` | BaseAgent | Assigned agent (optional in hierarchical) |
+| `tools` | List[BaseTool] | Task-specific tools |
+| `context` | List[Task] | Dependencies on other task outputs |
+| `async_execution` | bool | Non-blocking execution |
+| `output_file` | str | File path for results |
+| `output_json` | Type[BaseModel] | Pydantic model for JSON output |
+| `output_pydantic` | Type[BaseModel] | Pydantic model for structured output |
+| `human_input` | bool | Require human review |
+| `markdown` | bool | Format output as markdown |
+| `callback` | Callable | Post-completion function |
+| `guardrail` | Callable or str | Output validation |
+| `guardrails` | List | Multiple validation steps |
+| `guardrail_max_retries` | int | Retry on validation failure (default: 3) |
+| `create_directory` | bool | Auto-create output directories (default: True) |
+
+### Task Dependencies (context)
+```python
+@task
+def analysis_task(self) -> Task:
+    return Task(
+        config=self.tasks_config["analysis_task"],  # type: ignore[index]
+        context=[self.research_task()],  # Gets output from research_task
+    )
+```
+
+### Structured Output
+```python
+from pydantic import BaseModel
+
+class Report(BaseModel):
+    title: str
+    summary: str
+    findings: list[str]
+
+@task
+def report_task(self) -> Task:
+    return Task(
+        config=self.tasks_config["report_task"],  # type: ignore[index]
+        output_pydantic=Report,
+    )
+```
+
+### Guardrails
+```python
+from typing import Any
+
+from crewai.tasks.task_output import TaskOutput
+
+# Function-based
+def validate(result: TaskOutput) -> tuple[bool, Any]:
+    if len(result.raw.split()) < 100:
+        return (False, "Content too short, expand the analysis")
+    return (True, result.raw)
+
+# LLM-based (string prompt)
+task = Task(..., guardrail="Must be under 200 words and professional tone")
+
+# Multiple guardrails (validators defined like `validate` above)
+task = Task(..., guardrails=[validate_length, validate_tone, "Must be factual"])
+```
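+
+The file-output and review parameters from the table are not shown above; a sketch combining them (the paths and the `writer` agent are illustrative assumptions):
+
+```python
+from crewai import Task
+
+summary_task = Task(
+    description="Summarize the research findings for an executive audience.",
+    expected_output="A one-page executive summary.",
+    agent=writer,  # assumes a `writer` agent defined elsewhere
+    output_file="output/summary.md",  # results written to this path
+    markdown=True,                    # format output as markdown
+    human_input=True,                 # pause for human review
+    guardrail="Must be under one page and free of jargon",
+    guardrail_max_retries=2,
+)
+```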
+
+## Process Types
+
+### Sequential (default)
+Tasks execute in definition order. Output of one task serves as context for the next.
+```python
+Crew(agents=..., tasks=..., process=Process.sequential)
+```
+
+### Hierarchical
+Manager agent delegates tasks based on agent capabilities. Requires `manager_llm` or `manager_agent`.
+```python
+Crew(
+    agents=...,
+    tasks=...,
+    process=Process.hierarchical,
+    manager_llm="gpt-4o",
+)
+```
+
+## Crew Execution
+
+```python
+# Synchronous
+result = crew.kickoff(inputs={"topic": "AI"})
+print(result.raw)           # String output
+print(result.pydantic)      # Structured output (if configured)
+print(result.json_dict)     # Dict output
+print(result.token_usage)   # Token metrics
+print(result.tasks_output)  # List[TaskOutput]
+
+# Async (native)
+result = await crew.akickoff(inputs={"topic": "AI"})
+
+# Batch execution
+results = crew.kickoff_for_each(inputs=[{"topic": "AI"}, {"topic": "ML"}])
+
+# Streaming output (v1.8.0+)
+crew = Crew(agents=..., tasks=..., stream=True)
+streaming = crew.kickoff(inputs={"topic": "AI"})
+for chunk in streaming:
+    print(chunk.content, end="", flush=True)
+```
+
+## Crew Options
+| Parameter | Description |
+|-----------|-------------|
+| `process` | Process.sequential or Process.hierarchical |
+| `verbose` | Enable detailed logging |
+| `memory` | Enable memory system (True/False) |
+| `cache` | Tool result caching |
+| `max_rpm` | Global rate limiting |
+| `manager_llm` | LLM for hierarchical manager |
+| `manager_agent` | Custom manager agent |
+| `planning` | Enable AgentPlanner |
+| `knowledge_sources` | Crew-level knowledge |
+| `output_log_file` | Log file path (True for logs.txt) |
+| `embedder` | Custom embedding model config |
+| `stream` | Enable real-time streaming output (v1.8.0+) |
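+
+A sketch exercising several of these options together; the agent and task lists are assumed to be defined elsewhere:
+
+```python
+from crewai import Crew, Process
+
+crew = Crew(
+    agents=[researcher, writer],          # assumed agents
+    tasks=[research_task, writing_task],  # assumed tasks
+    process=Process.sequential,
+    memory=True,                 # enable the memory system
+    cache=True,                  # cache tool results
+    max_rpm=30,                  # global rate limit
+    planning=True,               # run AgentPlanner before execution
+    output_log_file="logs.txt",  # write execution logs to this file
+)
+```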
+
+---
+
+## Flows
+
+### Basic Flow
+```python
+from crewai.flow.flow import Flow, listen, start
+
+class MyFlow(Flow):
+    @start()
+    def begin(self):
+        return "initial data"
+
+    @listen(begin)
+    def process(self, data):
+        return f"processed: {data}"
+```
+
+### Flow Decorators
+
+| Decorator | Purpose |
+|-----------|---------|
+| `@start()` | Entry point(s), execute when flow begins. Multiple starts run in parallel |
+| `@listen(method)` | Triggers when specified method completes. Receives output as argument |
+| `@router(method)` | Conditional branching. Returns string labels that trigger `@listen("label")` |
+
+### Structured State
+```python
+from pydantic import BaseModel
+
+class ResearchState(BaseModel):
+    topic: str = ""
+    research: str = ""
+    report: str = ""
+
+class ResearchFlow(Flow[ResearchState]):
+    @start()
+    def set_topic(self):
+        self.state.topic = "AI Agents"
+
+    @listen(set_topic)
+    def do_research(self):
+        # self.state.topic is available
+        result = ResearchCrew().crew().kickoff(
+            inputs={"topic": self.state.topic}
+        )
+        self.state.research = result.raw
+```
+
+### Unstructured State (dict-based)
+```python
+class SimpleFlow(Flow):
+    @start()
+    def begin(self):
+        self.state["counter"] = 0  # Dict access
+
+    @listen(begin)
+    def increment(self):
+        self.state["counter"] += 1
+```
+
+### Conditional Routing
+```python
+from crewai.flow.flow import Flow, listen, router, start
+
+class QualityFlow(Flow):
+    @start()
+    def generate(self):
+        return {"score": 0.85}
+
+    @router(generate)
+    def check_quality(self, result):
+        if result["score"] > 0.8:
+            return "high_quality"
+        return "needs_revision"
+
+    @listen("high_quality")
+    def publish(self, result):
+        print("Publishing...")
+
+    @listen("needs_revision")
+    def revise(self, result):
+        print("Revising...")
+```
+
+### Parallel Triggers with or_ and and_
+```python
+from crewai.flow.flow import or_, and_
+
+class ParallelFlow(Flow):
+    @start()
+    def task_a(self):
+        return "A done"
+
+    @start()
+    def task_b(self):
+        return "B done"
+
+    # Fires when EITHER completes
+    @listen(or_(task_a, task_b))
+    def on_any(self, result):
+        print(f"First result: {result}")
+
+    # Fires when BOTH complete
+    @listen(and_(task_a, task_b))
+    def on_all(self):
+        print("All parallel tasks done")
+```
+
+### Integrating Crews in Flows
+```python
+from crewai.flow.flow import Flow, listen, start
+from my_project.crews.research_crew.research_crew import ResearchCrew
+from my_project.crews.writing_crew.writing_crew import WritingCrew
+
+# ContentState is a Pydantic state model defined elsewhere
+class ContentFlow(Flow[ContentState]):
+    @start()
+    def research(self):
+        result = ResearchCrew().crew().kickoff(
+            inputs={"topic": self.state.topic}
+        )
+        self.state.research = result.raw
+
+    @listen(research)
+    def write(self):
+        result = WritingCrew().crew().kickoff(
+            inputs={
+                "topic": self.state.topic,
+                "research": self.state.research,
+            }
+        )
+        self.state.article = result.raw
+```
+
+### Using Agents Directly in Flows
+```python
+from crewai.agent import Agent
+from crewai_tools import SerperDevTool
+
+class AgentFlow(Flow):
+    @start()
+    async def analyze(self):
+        analyst = Agent(
+            role="Data Analyst",
+            goal="Analyze market trends",
+            backstory="Expert data analyst...",
+            tools=[SerperDevTool()],
+        )
+        # MarketReport is a Pydantic model defined elsewhere
+        result = await analyst.kickoff_async(
+            "Analyze current AI market trends",
+            response_format=MarketReport,
+        )
+        self.state.report = result.pydantic
+```
+
+### Human-in-the-Loop (v1.8.0+)
+```python
+from crewai.flow.flow import Flow, listen, start
+from crewai.flow.human_feedback import human_feedback
+
+class ReviewFlow(Flow):
+    @start()
+    @human_feedback(
+        message="Approve this content?",
+        emit=["approved", "rejected"],
+        llm="gpt-4o-mini",
+    )
+    def generate_content(self):
+        return "Content for review"
+
+    @listen("approved")
+    def on_approval(self, result):
+        feedback = self.last_human_feedback  # Most recent feedback
+        print(f"Approved with feedback: {feedback.feedback}")
+
+    @listen("rejected")
+    def on_rejection(self, result):
+        history = self.human_feedback_history  # All feedback as list
+        print("Rejected, revising...")
+```
+
+### State Persistence
+```python
+from crewai.flow.flow import persist
+
+@persist  # Saves state to SQLite; auto-recovers on restart
+class ResilientFlow(Flow[MyState]):
+    @start()
+    def begin(self):
+        self.state.step = 1
+```
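+
+Per the flow-persistence docs, the state carries an auto-generated `id`, and passing that `id` back into `kickoff()` restores the saved state. A sketch under that assumption (verify against the live docs for your installed version):
+
+```python
+flow = ResilientFlow()
+flow.kickoff()
+saved_id = flow.state.id  # auto-generated identifier of the persisted state
+
+# Later, e.g. after a process restart, resume from the persisted state
+resumed = ResilientFlow()
+resumed.kickoff(inputs={"id": saved_id})
+```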
+
+### Flow Execution
+```python
+flow = MyFlow()
+result = flow.kickoff()
+print(result)      # Output of last method
+print(flow.state)  # Final state
+
+# Async execution
+result = await flow.kickoff_async(inputs={"key": "value"})
+```
+
+### Flow Streaming (v1.8.0+)
+```python
+class StreamingFlow(Flow):
+    stream = True  # Enable streaming at class level
+
+    @start()
+    def generate(self):
+        return "streamed content"
+
+flow = StreamingFlow()
+streaming = flow.kickoff()
+for chunk in streaming:
+    print(chunk.content, end="", flush=True)
+result = streaming.result  # Final result after iteration
+```
+
+### Flow Visualization
+```python
+flow.plot("my_flow")  # Generates my_flow.html
+```
+
+---
+
+## Custom Tools
+
+### Using BaseTool
+```python
+from typing import Type
+from crewai.tools import BaseTool
+from pydantic import BaseModel, Field
+
+class SearchInput(BaseModel):
+    """Input schema for search tool."""
+    query: str = Field(..., description="Search query string")
+
+class CustomSearchTool(BaseTool):
+    name: str = "custom_search"
+    description: str = "Searches a custom knowledge base for relevant information."
+    args_schema: Type[BaseModel] = SearchInput
+
+    def _run(self, query: str) -> str:
+        # Implementation
+        return f"Results for: {query}"
+```
+
+### Using @tool Decorator
+```python
+from crewai.tools import tool
+
+@tool("Calculator")
+def calculator(expression: str) -> str:
+    """Evaluates a mathematical expression and returns the result."""
+    # Note: eval() is unsafe on untrusted input; use a math parser in production
+    return str(eval(expression))
+```
+
+### Built-in Tools (install with `uv add crewai-tools`)
+- Web/Search: SerperDevTool, ScrapeWebsiteTool, WebsiteSearchTool, EXASearchTool, FirecrawlSearchTool
+- Documents: FileReadTool, DirectoryReadTool, PDFSearchTool, DOCXSearchTool, CSVSearchTool, JSONSearchTool, XMLSearchTool, MDXSearchTool
+- Code: CodeInterpreterTool, CodeDocsSearchTool, GithubSearchTool
+- Media: DALL-E Tool, YoutubeChannelSearchTool, YoutubeVideoSearchTool
+- Other: RagTool, ApifyActorsTool, ComposioTool, LlamaIndexTool
+
+Always check https://docs.crewai.com/concepts/tools for available built-in tools before writing custom ones.
+
+---
+
+## Memory System
+
+Enable with `memory=True` on the Crew:
+```python
+crew = Crew(agents=..., tasks=..., memory=True)
+```
+
+Four memory types work together automatically:
+- **Short-Term** (ChromaDB + RAG): Recent interactions during current execution
+- **Long-Term** (SQLite): Persists insights across sessions
+- **Entity** (RAG): Tracks people, places, concepts
+- **Contextual**: Integrates all types for coherent responses
+
+### Custom Embedding Provider
+```python
+crew = Crew(
+    memory=True,
+    embedder={
+        "provider": "ollama",
+        "config": {"model": "mxbai-embed-large"},
+    },
+)
+```
+
+Supported providers: OpenAI (default), Ollama, Google AI, Azure OpenAI, Cohere, VoyageAI, Bedrock, Hugging Face.
+
+---
+
+## Knowledge System
+
+```python
+from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
+
+# String source
+string_source = StringKnowledgeSource(content="Domain knowledge here...")
+
+# PDF source
+pdf_source = PDFKnowledgeSource(file_paths=["docs/manual.pdf"])
+
+# Agent-level knowledge
+agent = Agent(..., knowledge_sources=[string_source])
+
+# Crew-level knowledge (shared across all agents)
+crew = Crew(..., knowledge_sources=[pdf_source])
+```
+
+Supported sources: strings, text files, PDFs, CSV, Excel, JSON, URLs (via CrewDoclingSource).
+
+---
+
+## Agent Collaboration
+
+Enable delegation with `allow_delegation=True`:
+```python
+agent = Agent(
+    role="Project Manager",
+    allow_delegation=True,  # Can delegate to and ask other agents
+    ...
+)
+```
+
+- **Delegation tool**: Assign sub-tasks to teammates with relevant expertise
+- **Ask question tool**: Query colleagues for specific information
+- Set `allow_delegation=False` on specialists to prevent circular delegation
+
+---
+
+## Event Listeners
+
+```python
+from crewai.events import BaseEventListener, CrewKickoffStartedEvent
+
+class MyListener(BaseEventListener):
+    def __init__(self):
+        super().__init__()
+
+    def setup_listeners(self, crewai_event_bus):
+        @crewai_event_bus.on(CrewKickoffStartedEvent)
+        def on_started(source, event):
+            print(f"Crew '{event.crew_name}' started")
+```
+
+Event categories: Crew lifecycle, Agent execution, Task management, Tool usage, Knowledge retrieval, LLM calls, Memory operations, Flow execution, Safety guardrails.
+
+---
+
+## Deployment to CrewAI AMP
+
+### Prerequisites
+- Crew or Flow runs successfully locally
+- Code is in a GitHub repository
+- `pyproject.toml` has `[tool.crewai]` with correct type (`"crew"` or `"flow"`)
+- `uv.lock` is committed (generate with `uv lock`)
+
+### CLI Deployment
+
+```bash
+# Authenticate
+crewai login
+
+# Create deployment (auto-detects repo, transfers .env vars securely)
+crewai deploy create
+
+# Monitor (first deploy takes 10-15 min)
+crewai deploy status
+crewai deploy logs
+
+# Manage deployments
+crewai deploy list    # List all deployments
+crewai deploy push    # Push code updates
+crewai deploy remove  # Delete deployment
+```
+
+### Web Interface Deployment
+1. Push code to GitHub
+2. Log into https://app.crewai.com
+3. Connect GitHub and select repository
+4. Configure environment variables (KEY=VALUE, one per line)
+5. Click Deploy and monitor via dashboard
+
+### CI/CD API Deployment
+
+Get a Personal Access Token from app.crewai.com → Settings → Account → Personal Access Token.
+Get the Automation UUID from Automations → Select crew → Additional Details → Copy UUID.
+
+```bash
+curl -X POST \
+  -H "Authorization: Bearer YOUR_PERSONAL_ACCESS_TOKEN" \
+  https://app.crewai.com/crewai_plus/api/v1/crews/YOUR-AUTOMATION-UUID/deploy
+```
+
+#### GitHub Actions Example
+```yaml
+name: Deploy CrewAI Automation
+on:
+  push:
+    branches: [main]
+jobs:
+  deploy:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Trigger CrewAI Redeployment
+        run: |
+          curl -X POST \
+            -H "Authorization: Bearer ${{ secrets.CREWAI_PAT }}" \
+            https://app.crewai.com/crewai_plus/api/v1/crews/${{ secrets.CREWAI_AUTOMATION_UUID }}/deploy
+```
+
+### Project Structure Requirements for Deployment
+- Entry point: `src/<project_name>/main.py`
+- Crews must expose a `run()` function
+- Flows must expose a `kickoff()` function
+- All crew classes require the `@CrewBase` decorator
+
+### Deployed Automation REST API
+| Endpoint | Purpose |
+|----------|---------|
+| `/inputs` | List required input parameters |
+| `/kickoff` | Trigger execution with inputs |
+| `/status/{kickoff_id}` | Check execution status |
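+
+A sketch of calling a deployed automation with the standard library; the base URL and bearer token come from the Status tab below, and the exact response fields are assumptions to verify against your deployment:
+
+```python
+import json
+import urllib.request
+
+BASE_URL = "https://your-automation-url"  # from the AMP Status tab
+TOKEN = "your-bearer-token"               # from the AMP Status tab
+
+def call(path: str, payload: dict | None = None) -> dict:
+    data = json.dumps(payload).encode() if payload is not None else None
+    req = urllib.request.Request(
+        BASE_URL + path,
+        data=data,  # POST when a payload is given, GET otherwise
+        headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
+    )
+    with urllib.request.urlopen(req) as resp:
+        return json.load(resp)
+
+required = call("/inputs")                           # list required inputs
+run = call("/kickoff", {"inputs": {"topic": "AI"}})  # trigger execution
+print(required, run)  # then poll /status/{kickoff_id} with the returned id
+```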
+
+### AMP Dashboard Tabs
+- **Status**: Deployment info, API endpoint, auth token
+- **Run**: Crew structure visualization
+- **Executions**: Run history
+- **Metrics**: Performance analytics
+- **Traces**: Detailed execution insights
+
+### Deployment Troubleshooting
+| Error | Fix |
+|-------|-----|
+| Missing uv.lock | Run `uv lock`, commit, push |
+| Module not found | Verify entry points match the `src/<project_name>/main.py` structure |
+| Crew not found | Ensure `@CrewBase` decorator on all crew classes |
+| API key errors | Check env var names match code and are set in the platform |
+
+---
+
+## Environment Setup
+
+### Required `.env`
+```
+OPENAI_API_KEY=sk-...
+# Optional depending on tools/providers:
+SERPER_API_KEY=...
+ANTHROPIC_API_KEY=...
+# Override default model:
+MODEL=gpt-4o
+```
+
+### Python Version
+Python >=3.10, <3.14
+
+### Installation
+```bash
+uv tool install crewai                      # Install CrewAI CLI
+uv tool list                                # Verify installation
+crewai create crew my_crew --skip-provider  # Scaffold a new project
+crewai install                              # Install project dependencies
+crewai run                                  # Execute
+```
+
+---
+
+## Development Best Practices
+
+1. **YAML-first configuration**: Define agents and tasks in YAML, keep crew classes minimal
+2. **Check built-in tools** before writing custom ones
+3. **Use structured output** (output_pydantic) for data that flows between tasks or crews
+4. **Use guardrails** to validate task outputs programmatically
+5. **Enable memory** for crews that benefit from cross-session learning
+6. **Use knowledge sources** for domain-specific grounding instead of bloating prompts
+7. **Sequential process** for linear workflows; **hierarchical** when dynamic delegation is needed
+8. **Flows for multi-crew orchestration**: Use `@start`, `@listen`, `@router` for complex pipelines
+9. **Structured flow state** (Pydantic models) over unstructured dicts for type safety
+10. **Test with** `crewai test` to evaluate crew performance across iterations
+11. **Verbose mode** during development, disable in production
+12. **Rate limiting** (`max_rpm`) to avoid API throttling
+13. **`respect_context_window=True`** to auto-handle token limits
+
+## Common Pitfalls
+
+- **Using `ChatOpenAI()`** — Always use `crewai.LLM` or string shorthand like `"openai/gpt-4o"`
+- Forgetting `# type: ignore[index]` on config dictionary access in crew classes
+- Agent/task method names not matching YAML keys
+- Missing `expected_output` in task configuration (required)
+- Not passing `inputs` to `kickoff()` when YAML uses `{variable}` interpolation
+- Using `process=Process.hierarchical` without setting `manager_llm` or `manager_agent`
+- Circular delegation: set `allow_delegation=False` on specialist agents
+- Not installing the tools package: `uv add crewai-tools`
diff --git a/lib/crewai/src/crewai/cli/tools/main.py b/lib/crewai/src/crewai/cli/tools/main.py
index 37467a906..e2dd21dde 100644
--- a/lib/crewai/src/crewai/cli/tools/main.py
+++ b/lib/crewai/src/crewai/cli/tools/main.py
@@ -2,6 +2,7 @@ import base64
 from json import JSONDecodeError
 import os
 from pathlib import Path
+import shutil
 import subprocess
 import tempfile
 from typing import Any
@@ -55,6 +56,11 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
         tree_find_and_replace(project_root, "{{folder_name}}", folder_name)
         tree_find_and_replace(project_root, "{{class_name}}", class_name)

+        # Copy AGENTS.md to project root
+        agents_md_src = Path(__file__).parent.parent / "templates" / "AGENTS.md"
+        if agents_md_src.exists():
+            shutil.copy2(agents_md_src, project_root / "AGENTS.md")
+
         old_directory = os.getcwd()
         os.chdir(project_root)
         try: