Lorenze/agent executor flow pattern (#3975)

* WIP gh pr refactor: update agent executor handling and introduce flow-based executor

* wip

* refactor: clean up comments and improve code clarity in agent executor flow

- Removed outdated comments and unnecessary explanations to enhance code readability.
- Simplified parameter updates in the agent executor to avoid confusion regarding executor recreation (a condensed sketch follows this list).
- Improved clarity to ensure non-final answers are handled properly without raising errors.
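A condensed sketch of that in-place update, mirroring the _update_executor_parameters helper added in the core.py hunk further down (signature abbreviated here; see the diff for the full parameter list):

    def _update_executor_parameters(self, task, tools, raw_tools, prompt, stop_words, rpm_limit_fn) -> None:
        # Reuse the existing executor instance instead of constructing a new one.
        self.agent_executor.task = task
        self.agent_executor.tools = tools
        self.agent_executor.original_tools = raw_tools
        self.agent_executor.prompt = prompt
        self.agent_executor.stop = stop_words
        self.agent_executor.request_within_rpm_limit = rpm_limit_fn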

* bumping pytest-randomly and numpy

* also bump versions of anthropic sdk

* ensure flow logs are not emitted when the flow is used as the executor

* revert anthropic bump

* fix

* refactor: update dependency markers in uv.lock for platform compatibility

- Enhanced dependency markers for several packages to ensure compatibility across platforms (Linux, Darwin, and architecture-specific conditions).
- Removed unnecessary event emission during kickoff.
- Cleaned up commented-out code for better readability and maintainability.

* drop duplicate

* test: enhance agent executor creation and stop word assertions

- Added calls to create_agent_executor in multiple test cases to ensure proper agent execution setup.
- Updated assertions for stop words in the agent tests to remove unnecessary checks and improve clarity.
- Ensured consistency in task handling by invoking create_agent_executor with the appropriate task parameter (a minimal setup sketch follows this list).
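A minimal sketch of the setup pattern these tests now follow (agent and task arguments are illustrative):

    agent = Agent(role="test role", goal="test goal", backstory="test backstory")
    task = Task(description="test task", agent=agent, expected_output="test output")
    # Executor creation is no longer triggered implicitly by cache/RPM setup,
    # so tests build it explicitly before patching or invoking it.
    agent.create_agent_executor(task=task)
    assert agent.agent_executor is not None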

* refactor: reorganize agent executor imports and introduce CrewAgentExecutorFlow

- Removed the old import of CrewAgentExecutorFlow and replaced it with the new import from the experimental module.
- Updated relevant references in the codebase to ensure compatibility with the new structure.
- Enhanced the organization of imports in core.py and base_agent.py for better clarity and maintainability.

* updating name

* dropped usage of the printer here in favor of the rich console and removed low-value logging

* address i18n

* Enhance concurrency control in CrewAgentExecutorFlow by introducing a threading lock to prevent concurrent executions. This change ensures that the executor instance cannot be invoked while already running, improving stability and reliability during flow execution.
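A condensed sketch of that guard, following the invoke() implementation shown in the new executor file below (error message shortened):

    def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
        with self._execution_lock:  # threading.Lock created in __init__
            if self._is_executing:
                raise RuntimeError("Executor is already running.")
            self._is_executing = True
        try:
            ...  # run the ReAct flow via self.kickoff() and collect the final answer
        finally:
            self._is_executing = False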

* string literal returns

* string literal returns

* Enhance CrewAgentExecutor initialization by allowing optional i18n parameter for improved internationalization support. This change ensures that the executor can utilize a provided i18n instance or fallback to the default, enhancing flexibility in multilingual contexts.
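The corresponding fallback, as it appears in the executor constructors in this diff (signature elided to the relevant parameter):

    def __init__(self, ..., i18n: I18N | None = None) -> None:
        # Prefer the caller-supplied translations; otherwise fall back to the global default.
        self._i18n: I18N = i18n or get_i18n()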

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Author: Lorenze Jay
Date: 2025-12-28 10:21:32 -08:00
Committed by: GitHub
Parent: c73b36a4c5
Commit: b9dd166a6b
16 changed files with 2010 additions and 595 deletions

View File

@@ -1,7 +1,7 @@
from __future__ import annotations

import asyncio
-from collections.abc import Sequence
+from collections.abc import Callable, Sequence
import shutil
import subprocess
import time
@@ -44,6 +44,7 @@ from crewai.events.types.memory_events import (
    MemoryRetrievalCompletedEvent,
    MemoryRetrievalStartedEvent,
)
+from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.lite_agent import LiteAgent
@@ -105,7 +106,7 @@ class Agent(BaseAgent):
    The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.

    Attributes:
-        agent_executor: An instance of the CrewAgentExecutor class.
+        agent_executor: An instance of the CrewAgentExecutor or CrewAgentExecutorFlow class.
        role: The role of the agent.
        goal: The objective of the agent.
        backstory: The backstory of the agent.
@@ -221,6 +222,10 @@ class Agent(BaseAgent):
        default=None,
        description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.",
    )
+    executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
+        default=CrewAgentExecutor,
+        description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use CrewAgentExecutorFlow.",
+    )

    @model_validator(mode="before")
    def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any:  # noqa: N805
@@ -721,29 +726,83 @@ class Agent(BaseAgent):
                self.response_template.split("{{ .Response }}")[1].strip()
            )

-        self.agent_executor = CrewAgentExecutor(
-            llm=self.llm,  # type: ignore[arg-type]
-            task=task,  # type: ignore[arg-type]
-            agent=self,
-            crew=self.crew,
-            tools=parsed_tools,
-            prompt=prompt,
-            original_tools=raw_tools,
-            stop_words=stop_words,
-            max_iter=self.max_iter,
-            tools_handler=self.tools_handler,
-            tools_names=get_tool_names(parsed_tools),
-            tools_description=render_text_description_and_args(parsed_tools),
-            step_callback=self.step_callback,
-            function_calling_llm=self.function_calling_llm,
-            respect_context_window=self.respect_context_window,
-            request_within_rpm_limit=(
-                self._rpm_controller.check_or_wait if self._rpm_controller else None
-            ),
-            callbacks=[TokenCalcHandler(self._token_process)],
-            response_model=task.response_model if task else None,
-        )
+        rpm_limit_fn = (
+            self._rpm_controller.check_or_wait if self._rpm_controller else None
+        )
+        if self.agent_executor is not None:
+            self._update_executor_parameters(
+                task=task,
+                tools=parsed_tools,
+                raw_tools=raw_tools,
+                prompt=prompt,
+                stop_words=stop_words,
+                rpm_limit_fn=rpm_limit_fn,
+            )
+        else:
+            self.agent_executor = self.executor_class(
+                llm=cast(BaseLLM, self.llm),
+                task=task,
+                i18n=self.i18n,
+                agent=self,
+                crew=self.crew,
+                tools=parsed_tools,
+                prompt=prompt,
+                original_tools=raw_tools,
+                stop_words=stop_words,
+                max_iter=self.max_iter,
+                tools_handler=self.tools_handler,
+                tools_names=get_tool_names(parsed_tools),
+                tools_description=render_text_description_and_args(parsed_tools),
+                step_callback=self.step_callback,
+                function_calling_llm=self.function_calling_llm,
+                respect_context_window=self.respect_context_window,
+                request_within_rpm_limit=rpm_limit_fn,
+                callbacks=[TokenCalcHandler(self._token_process)],
+                response_model=task.response_model if task else None,
+            )
+
+    def _update_executor_parameters(
+        self,
+        task: Task | None,
+        tools: list,
+        raw_tools: list[BaseTool],
+        prompt: dict,
+        stop_words: list[str],
+        rpm_limit_fn: Callable | None,
+    ) -> None:
+        """Update executor parameters without recreating instance.
+
+        Args:
+            task: Task to execute.
+            tools: Parsed tools.
+            raw_tools: Original tools.
+            prompt: Generated prompt.
+            stop_words: Stop words list.
+            rpm_limit_fn: RPM limit callback function.
+        """
+        self.agent_executor.task = task
+        self.agent_executor.tools = tools
+        self.agent_executor.original_tools = raw_tools
+        self.agent_executor.prompt = prompt
+        self.agent_executor.stop = stop_words
+        self.agent_executor.tools_names = get_tool_names(tools)
+        self.agent_executor.tools_description = render_text_description_and_args(tools)
+        self.agent_executor.response_model = task.response_model if task else None
+        self.agent_executor.tools_handler = self.tools_handler
+        self.agent_executor.request_within_rpm_limit = rpm_limit_fn
+        if self.agent_executor.llm:
+            existing_stop = getattr(self.agent_executor.llm, "stop", [])
+            self.agent_executor.llm.stop = list(
+                set(
+                    existing_stop + stop_words
+                    if isinstance(existing_stop, list)
+                    else stop_words
+                )
+            )

    def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
        agent_tools = AgentTools(agents=agents)
        return agent_tools.tools()

View File

@@ -457,7 +457,6 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
        if self.cache:
            self.cache_handler = cache_handler
            self.tools_handler.cache = cache_handler
-        self.create_agent_executor()

    def set_rpm_controller(self, rpm_controller: RPMController) -> None:
        """Set the rpm controller for the agent.
@@ -467,7 +466,6 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
""" """
if not self._rpm_controller: if not self._rpm_controller:
self._rpm_controller = rpm_controller self._rpm_controller = rpm_controller
self.create_agent_executor()
def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None: def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None:
pass pass

View File

@@ -91,6 +91,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
        request_within_rpm_limit: Callable[[], bool] | None = None,
        callbacks: list[Any] | None = None,
        response_model: type[BaseModel] | None = None,
+        i18n: I18N | None = None,
    ) -> None:
        """Initialize executor.
@@ -114,7 +115,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
            callbacks: Optional callbacks list.
            response_model: Optional Pydantic model for structured outputs.
        """
-        self._i18n: I18N = get_i18n()
+        self._i18n: I18N = i18n or get_i18n()
        self.llm = llm
        self.task = task
        self.agent = agent

View File

@@ -1,3 +1,4 @@
+from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.evaluation import (
    AgentEvaluationResult,
    AgentEvaluator,
@@ -23,6 +24,7 @@ __all__ = [
"AgentEvaluationResult", "AgentEvaluationResult",
"AgentEvaluator", "AgentEvaluator",
"BaseEvaluator", "BaseEvaluator",
"CrewAgentExecutorFlow",
"EvaluationScore", "EvaluationScore",
"EvaluationTraceCallback", "EvaluationTraceCallback",
"ExperimentResult", "ExperimentResult",

View File

@@ -0,0 +1,808 @@
from __future__ import annotations
from collections.abc import Callable
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
from rich.console import Console
from rich.text import Text
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.parser import (
AgentAction,
AgentFinish,
OutputParserError,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.flow.flow import Flow, listen, or_, router, start
from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.utilities.agent_utils import (
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
handle_agent_action_core,
handle_context_length,
handle_max_iterations_exceeded,
handle_output_parser_exception,
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
process_llm_response,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.printer import Printer
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.tools_handler import ToolsHandler
from crewai.crew import Crew
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities.prompts import StandardPromptResult, SystemPromptResult
class AgentReActState(BaseModel):
"""Structured state for agent ReAct flow execution.
Replaces scattered instance variables with validated immutable state.
Maps to: self.messages, self.iterations, formatted_answer in current executor.
"""
messages: list[LLMMessage] = Field(default_factory=list)
iterations: int = Field(default=0)
current_answer: AgentAction | AgentFinish | None = Field(default=None)
is_finished: bool = Field(default=False)
ask_for_human_input: bool = Field(default=False)
class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based executor matching CrewAgentExecutor interface.
Inherits from:
- Flow[AgentReActState]: Provides flow orchestration capabilities
- CrewAgentExecutorMixin: Provides memory methods (short/long/external term)
Note: Multiple instances may be created during agent initialization
(cache setup, RPM controller setup, etc.) but only the final instance
should execute tasks via invoke().
"""
def __init__(
self,
llm: BaseLLM,
task: Task,
crew: Crew,
agent: Agent,
prompt: SystemPromptResult | StandardPromptResult,
max_iter: int,
tools: list[CrewStructuredTool],
tools_names: str,
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
step_callback: Any = None,
original_tools: list[BaseTool] | None = None,
function_calling_llm: BaseLLM | Any | None = None,
respect_context_window: bool = False,
request_within_rpm_limit: Callable[[], bool] | None = None,
callbacks: list[Any] | None = None,
response_model: type[BaseModel] | None = None,
i18n: I18N | None = None,
) -> None:
"""Initialize the flow-based agent executor.
Args:
llm: Language model instance.
task: Task to execute.
crew: Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
tools: Available tools.
tools_names: Tool names string.
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
respect_context_window: Respect context limits.
request_within_rpm_limit: RPM limit check function.
callbacks: Optional callbacks list.
response_model: Optional Pydantic model for structured outputs.
"""
self._i18n: I18N = i18n or get_i18n()
self.llm = llm
self.task = task
self.agent = agent
self.crew = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
self.stop = stop_words
self.max_iter = max_iter
self.callbacks = callbacks or []
self._printer: Printer = Printer()
self.tools_handler = tools_handler
self.original_tools = original_tools or []
self.step_callback = step_callback
self.tools_description = tools_description
self.function_calling_llm = function_calling_llm
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.response_model = response_model
self.log_error_after = 3
self._console: Console = Console()
# Error context storage for recovery
self._last_parser_error: OutputParserError | None = None
self._last_context_error: Exception | None = None
# Execution guard to prevent concurrent/duplicate executions
self._execution_lock = threading.Lock()
self._is_executing: bool = False
self._has_been_invoked: bool = False
self._flow_initialized: bool = False
self._instance_id = str(uuid4())[:8]
self.before_llm_call_hooks: list[Callable] = []
self.after_llm_call_hooks: list[Callable] = []
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
if self.llm:
existing_stop = getattr(self.llm, "stop", [])
self.llm.stop = list(
set(
existing_stop + self.stop
if isinstance(existing_stop, list)
else self.stop
)
)
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
"""Ensure Flow.__init__() has been called.
This is deferred from __init__ to prevent FlowCreatedEvent emission
during agent setup when multiple executor instances are created.
Only the instance that actually executes via invoke() will emit events.
"""
if not self._flow_initialized:
# Now call Flow's __init__ which will replace self._state
# with Flow's managed state. Suppress flow events since this is
# an agent executor, not a user-facing flow.
super().__init__(
suppress_flow_events=True,
)
self._flow_initialized = True
@property
def use_stop_words(self) -> bool:
"""Check to determine if stop words are being used.
Returns:
bool: True if stop words should be used.
"""
return self.llm.supports_stop_words() if self.llm else False
@property
def state(self) -> AgentReActState:
"""Get state - returns temporary state if Flow not yet initialized.
Flow initialization is deferred to prevent event emission during agent setup.
Returns the temporary state until invoke() is called.
"""
return self._state
@property
def messages(self) -> list[LLMMessage]:
"""Compatibility property for mixin - returns state messages."""
return self._state.messages
@property
def iterations(self) -> int:
"""Compatibility property for mixin - returns state iterations."""
return self._state.iterations
@start()
def initialize_reasoning(self) -> Literal["initialized"]:
"""Initialize the reasoning flow and emit agent start logs."""
self._show_start_logs()
return "initialized"
@listen("force_final_answer")
def force_final_answer(self) -> Literal["agent_finished"]:
"""Force agent to provide final answer when max iterations exceeded."""
formatted_answer = handle_max_iterations_exceeded(
formatted_answer=None,
printer=self._printer,
i18n=self._i18n,
messages=list(self.state.messages),
llm=self.llm,
callbacks=self.callbacks,
)
self.state.current_answer = formatted_answer
self.state.is_finished = True
return "agent_finished"
@listen("continue_reasoning")
def call_llm_and_parse(self) -> Literal["parsed", "parser_error", "context_error"]:
"""Execute LLM call with hooks and parse the response.
Returns routing decision based on parsing result.
"""
try:
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
llm=self.llm,
messages=list(self.state.messages),
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Parse the LLM response
formatted_answer = process_llm_response(answer, self.use_stop_words)
self.state.current_answer = formatted_answer
if "Final Answer:" in answer and isinstance(formatted_answer, AgentAction):
warning_text = Text()
warning_text.append("⚠️ ", style="yellow bold")
warning_text.append(
f"LLM returned 'Final Answer:' but parsed as AgentAction (tool: {formatted_answer.tool})",
style="yellow",
)
self._console.print(warning_text)
preview_text = Text()
preview_text.append("Answer preview: ", style="yellow")
preview_text.append(f"{answer[:200]}...", style="yellow dim")
self._console.print(preview_text)
return "parsed"
except OutputParserError as e:
# Store error context for recovery
self._last_parser_error = e or OutputParserError(
error="Unknown parser error"
)
return "parser_error"
except Exception as e:
if is_context_length_exceeded(e):
self._last_context_error = e
return "context_error"
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e)
raise
@router(call_llm_and_parse)
def route_by_answer_type(self) -> Literal["execute_tool", "agent_finished"]:
"""Route based on whether answer is AgentAction or AgentFinish."""
if isinstance(self.state.current_answer, AgentAction):
return "execute_tool"
return "agent_finished"
@listen("execute_tool")
def execute_tool_action(self) -> Literal["tool_completed", "tool_result_is_final"]:
"""Execute the tool action and handle the result."""
try:
action = cast(AgentAction, self.state.current_answer)
# Extract fingerprint context for tool execution
fingerprint_context = {}
if (
self.agent
and hasattr(self.agent, "security_config")
and hasattr(self.agent.security_config, "fingerprint")
):
fingerprint_context = {
"agent_fingerprint": str(self.agent.security_config.fingerprint)
}
# Execute the tool
tool_result = execute_tool_and_check_finality(
agent_action=action,
fingerprint_context=fingerprint_context,
tools=self.tools,
i18n=self._i18n,
agent_key=self.agent.key if self.agent else None,
agent_role=self.agent.role if self.agent else None,
tools_handler=self.tools_handler,
task=self.task,
agent=self.agent,
function_calling_llm=self.function_calling_llm,
crew=self.crew,
)
# Handle agent action and append observation to messages
result = self._handle_agent_action(action, tool_result)
self.state.current_answer = result
# Invoke step callback if configured
self._invoke_step_callback(result)
# Append result message to conversation state
if hasattr(result, "text"):
self._append_message_to_state(result.text)
# Check if tool result became a final answer (result_as_answer flag)
if isinstance(result, AgentFinish):
self.state.is_finished = True
return "tool_result_is_final"
return "tool_completed"
except Exception as e:
error_text = Text()
error_text.append("❌ Error in tool execution: ", style="red bold")
error_text.append(str(e), style="red")
self._console.print(error_text)
raise
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
"""Bridge listener that connects iteration loop back to iteration check."""
return "check_iteration"
@router(or_(initialize_reasoning, continue_iteration))
def check_max_iterations(
self,
) -> Literal["force_final_answer", "continue_reasoning"]:
"""Check if max iterations reached before proceeding with reasoning."""
if has_reached_max_iterations(self.state.iterations, self.max_iter):
return "force_final_answer"
return "continue_reasoning"
@router(execute_tool_action)
def increment_and_continue(self) -> Literal["initialized"]:
"""Increment iteration counter and loop back for next iteration."""
self.state.iterations += 1
return "initialized"
@listen(or_("agent_finished", "tool_result_is_final"))
def finalize(self) -> Literal["completed", "skipped"]:
"""Finalize execution and emit completion logs."""
if self.state.current_answer is None:
skip_text = Text()
skip_text.append("⚠️ ", style="yellow bold")
skip_text.append(
"Finalize called but no answer in state - skipping", style="yellow"
)
self._console.print(skip_text)
return "skipped"
if not isinstance(self.state.current_answer, AgentFinish):
skip_text = Text()
skip_text.append("⚠️ ", style="yellow bold")
skip_text.append(
f"Finalize called with {type(self.state.current_answer).__name__} instead of AgentFinish - skipping",
style="yellow",
)
self._console.print(skip_text)
return "skipped"
self.state.is_finished = True
self._show_logs(self.state.current_answer)
return "completed"
@listen("parser_error")
def recover_from_parser_error(self) -> Literal["initialized"]:
"""Recover from output parser errors and retry."""
formatted_answer = handle_output_parser_exception(
e=self._last_parser_error,
messages=list(self.state.messages),
iterations=self.state.iterations,
log_error_after=self.log_error_after,
printer=self._printer,
)
if formatted_answer:
self.state.current_answer = formatted_answer
self.state.iterations += 1
return "initialized"
@listen("context_error")
def recover_from_context_length(self) -> Literal["initialized"]:
"""Recover from context length errors and retry."""
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.state.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
self.state.iterations += 1
return "initialized"
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent with given inputs.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output.
"""
self._ensure_flow_initialized()
with self._execution_lock:
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True
try:
# Reset state for fresh execution
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.ask_for_human_input = bool(
inputs.get("ask_for_human_input", False)
)
self.kickoff()
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
except AssertionError:
fail_text = Text()
fail_text.append("", style="red bold")
fail_text.append(
"Agent failed to reach a final answer. This is likely a bug - please report it.",
style="red",
)
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish:
"""Process agent action and tool execution result.
Args:
formatted_answer: Agent's action to execute.
tool_result: Result from tool execution.
Returns:
Updated action or final answer.
"""
add_image_tool = self._i18n.tools("add_image")
if (
isinstance(add_image_tool, dict)
and formatted_answer.tool.casefold().strip()
== add_image_tool.get("name", "").casefold().strip()
):
self.state.messages.append(
{"role": "assistant", "content": tool_result.result}
)
return formatted_answer
return handle_agent_action_core(
formatted_answer=formatted_answer,
tool_result=tool_result,
messages=self.state.messages,
step_callback=self.step_callback,
show_logs=self._show_logs,
)
def _invoke_step_callback(
self, formatted_answer: AgentAction | AgentFinish
) -> None:
"""Invoke step callback if configured.
Args:
formatted_answer: Current agent response.
"""
if self.step_callback:
self.step_callback(formatted_answer)
def _append_message_to_state(
self, text: str, role: Literal["user", "assistant", "system"] = "assistant"
) -> None:
"""Add message to state conversation history.
Args:
text: Message content.
role: Message role (default: assistant).
"""
self.state.messages.append(format_message_for_llm(text, role=role))
def _show_start_logs(self) -> None:
"""Emit agent start event."""
if self.agent is None:
raise ValueError("Agent cannot be None")
crewai_event_bus.emit(
self.agent,
AgentLogsStartedEvent(
agent_role=self.agent.role,
task_description=(self.task.description if self.task else "Not Found"),
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
),
)
def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None:
"""Emit agent execution event.
Args:
formatted_answer: Agent's response to log.
"""
if self.agent is None:
raise ValueError("Agent cannot be None")
crewai_event_bus.emit(
self.agent,
AgentLogsExecutionEvent(
agent_role=self.agent.role,
formatted_answer=formatted_answer,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
),
)
def _handle_crew_training_output(
self, result: AgentFinish, human_feedback: str | None = None
) -> None:
"""Save training data for crew training mode.
Args:
result: Agent's final output.
human_feedback: Optional feedback from human.
"""
agent_id = str(self.agent.id)
train_iteration = (
getattr(self.crew, "_train_iteration", None) if self.crew else None
)
if train_iteration is None or not isinstance(train_iteration, int):
train_error = Text()
train_error.append("", style="red bold")
train_error.append(
"Invalid or missing train iteration. Cannot save training data.",
style="red",
)
self._console.print(train_error)
return
training_handler = CrewTrainingHandler(TRAINING_DATA_FILE)
training_data = training_handler.load() or {}
# Initialize or retrieve agent's training data
agent_training_data = training_data.get(agent_id, {})
if human_feedback is not None:
# Save initial output and human feedback
agent_training_data[train_iteration] = {
"initial_output": result.output,
"human_feedback": human_feedback,
}
else:
# Save improved output
if train_iteration in agent_training_data:
agent_training_data[train_iteration]["improved_output"] = result.output
else:
train_error = Text()
train_error.append("", style="red bold")
train_error.append(
f"No existing training data for agent {agent_id} and iteration "
f"{train_iteration}. Cannot save improved output.",
style="red",
)
self._console.print(train_error)
return
# Update the training data and save
training_data[agent_id] = agent_training_data
training_handler.save(training_data)
@staticmethod
def _format_prompt(prompt: str, inputs: dict[str, str]) -> str:
"""Format prompt template with input values.
Args:
prompt: Template string.
inputs: Values to substitute.
Returns:
Formatted prompt.
"""
prompt = prompt.replace("{input}", inputs["input"])
prompt = prompt.replace("{tool_names}", inputs["tool_names"])
return prompt.replace("{tools}", inputs["tools"])
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Process human feedback and refine answer.
Args:
formatted_answer: Initial agent result.
Returns:
Final answer after feedback.
"""
human_feedback = self._ask_human_input(formatted_answer.output)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
return self._handle_regular_feedback(formatted_answer, human_feedback)
def _is_training_mode(self) -> bool:
"""Check if training mode is active.
Returns:
True if in training mode.
"""
return bool(self.crew and self.crew._train)
def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str
) -> AgentFinish:
"""Process training feedback and generate improved answer.
Args:
initial_answer: Initial agent output.
feedback: Training feedback.
Returns:
Improved answer.
"""
self._handle_crew_training_output(initial_answer, feedback)
self.state.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
# Re-run flow for improved answer
self.state.iterations = 0
self.state.is_finished = False
self.state.current_answer = None
self.kickoff()
# Get improved answer from state
improved_answer = self.state.current_answer
if not isinstance(improved_answer, AgentFinish):
raise RuntimeError(
"Training feedback iteration did not produce final answer"
)
self._handle_crew_training_output(improved_answer)
self.state.ask_for_human_input = False
return improved_answer
def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish:
"""Process regular feedback iteratively until user is satisfied.
Args:
current_answer: Current agent output.
initial_feedback: Initial user feedback.
Returns:
Final answer after iterations.
"""
feedback = initial_feedback
answer = current_answer
while self.state.ask_for_human_input:
if feedback.strip() == "":
self.state.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
feedback = self._ask_human_input(answer.output)
return answer
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration and generate updated response.
Args:
feedback: User feedback.
Returns:
Updated agent response.
"""
self.state.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
# Re-run flow
self.state.iterations = 0
self.state.is_finished = False
self.state.current_answer = None
self.kickoff()
# Get answer from state
answer = self.state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Feedback iteration did not produce final answer")
return answer
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: GetCoreSchemaHandler
) -> CoreSchema:
"""Generate Pydantic core schema for Protocol compatibility.
Allows the executor to be used in Pydantic models without
requiring arbitrary_types_allowed=True.
"""
return core_schema.any_schema()

View File

@@ -1,8 +1,9 @@
+from __future__ import annotations

from collections.abc import Sequence
import threading
-from typing import Any
+from typing import TYPE_CHECKING, Any

-from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
@@ -28,6 +29,10 @@ from crewai.experimental.evaluation.evaluation_listener import (
from crewai.task import Task

+if TYPE_CHECKING:
+    from crewai.agent import Agent


class ExecutionState:
    current_agent_id: str | None = None
    current_task_id: str | None = None

View File

@@ -1,17 +1,22 @@
+from __future__ import annotations

import abc
import enum
from enum import Enum
-from typing import Any
+from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, Field

-from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.llm import BaseLLM
from crewai.task import Task
from crewai.utilities.llm_utils import create_llm

+if TYPE_CHECKING:
+    from crewai.agent import Agent


class MetricCategory(enum.Enum):
    GOAL_ALIGNMENT = "goal_alignment"
    SEMANTIC_QUALITY = "semantic_quality"

View File

@@ -1,8 +1,9 @@
+from __future__ import annotations

from collections import defaultdict
from hashlib import md5
-from typing import Any
+from typing import TYPE_CHECKING, Any

-from crewai import Agent, Crew
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator
from crewai.experimental.evaluation.evaluation_display import (
@@ -17,6 +18,11 @@ from crewai.experimental.evaluation.experiment.result_display import (
)

+if TYPE_CHECKING:
+    from crewai.agent import Agent
+    from crewai.crew import Crew


class ExperimentRunner:
    def __init__(self, dataset: list[dict[str, Any]]):
        self.dataset = dataset or []

View File

@@ -1,6 +1,7 @@
-from typing import Any
+from __future__ import annotations

+from typing import TYPE_CHECKING, Any

-from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
    BaseEvaluator,
@@ -12,6 +13,10 @@ from crewai.task import Task
from crewai.utilities.types import LLMMessage

+if TYPE_CHECKING:
+    from crewai.agent import Agent


class GoalAlignmentEvaluator(BaseEvaluator):
    @property
    def metric_category(self) -> MetricCategory:

View File

@@ -6,15 +6,16 @@ This module provides evaluator implementations for:
- Thinking-to-action ratio
"""

+from __future__ import annotations

from collections.abc import Sequence
from enum import Enum
import logging
import re
-from typing import Any
+from typing import TYPE_CHECKING, Any

import numpy as np

-from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
    BaseEvaluator,
@@ -27,6 +28,10 @@ from crewai.tasks.task_output import TaskOutput
from crewai.utilities.types import LLMMessage

+if TYPE_CHECKING:
+    from crewai.agent import Agent


class ReasoningPatternType(Enum):
    EFFICIENT = "efficient"  # Good reasoning flow
    LOOP = "loop"  # Agent is stuck in a loop

View File

@@ -1,6 +1,7 @@
-from typing import Any
+from __future__ import annotations

+from typing import TYPE_CHECKING, Any

-from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
    BaseEvaluator,
@@ -12,6 +13,10 @@ from crewai.task import Task
from crewai.utilities.types import LLMMessage

+if TYPE_CHECKING:
+    from crewai.agent import Agent


class SemanticQualityEvaluator(BaseEvaluator):
    @property
    def metric_category(self) -> MetricCategory:

View File

@@ -1,7 +1,8 @@
-import json
+from __future__ import annotations

-from typing import Any
+import json
+from typing import TYPE_CHECKING, Any

-from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
    BaseEvaluator,
@@ -13,6 +14,10 @@ from crewai.task import Task
from crewai.utilities.types import LLMMessage

+if TYPE_CHECKING:
+    from crewai.agent import Agent


class ToolSelectionEvaluator(BaseEvaluator):
    @property
    def metric_category(self) -> MetricCategory:

View File

@@ -459,7 +459,10 @@ class FlowMeta(type):
            ):
                routers.add(attr_name)
                # Get router paths from the decorator attribute
-                if hasattr(attr_value, "__router_paths__") and attr_value.__router_paths__:
+                if (
+                    hasattr(attr_value, "__router_paths__")
+                    and attr_value.__router_paths__
+                ):
                    router_paths[attr_name] = attr_value.__router_paths__
                else:
                    possible_returns = get_possible_return_constants(attr_value)
@@ -501,6 +504,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
        self,
        persistence: FlowPersistence | None = None,
        tracing: bool | None = None,
+        suppress_flow_events: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize a new Flow instance.
@@ -508,6 +512,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
        Args:
            persistence: Optional persistence backend for storing flow states
            tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings
+            suppress_flow_events: Whether to suppress flow event emissions (internal use)
            **kwargs: Additional state values to initialize or override
        """
        # Initialize basic instance attributes
@@ -526,6 +531,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
        self.human_feedback_history: list[HumanFeedbackResult] = []
        self.last_human_feedback: HumanFeedbackResult | None = None
        self._pending_feedback_context: PendingFeedbackContext | None = None
+        self.suppress_flow_events: bool = suppress_flow_events

        # Initialize state with initial values
        self._state = self._create_initial_state()
@@ -539,13 +545,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
        if kwargs:
            self._initialize_state(kwargs)

-        crewai_event_bus.emit(
-            self,
-            FlowCreatedEvent(
-                type="flow_created",
-                flow_name=self.name or self.__class__.__name__,
-            ),
-        )
+        if not self.suppress_flow_events:
+            crewai_event_bus.emit(
+                self,
+                FlowCreatedEvent(
+                    type="flow_created",
+                    flow_name=self.name or self.__class__.__name__,
+                ),
+            )

        # Register all flow-related methods
        for method_name in dir(self):
@@ -672,6 +679,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
                result = flow.resume(feedback)
                return result

            # In an async handler, use resume_async instead:
            async def handle_feedback_async(flow_id: str, feedback: str):
                flow = MyFlow.from_pending(flow_id)
@@ -1307,19 +1315,20 @@ class Flow(Generic[T], metaclass=FlowMeta):
            self._initialize_state(filtered_inputs)

        # Emit FlowStartedEvent and log the start of the flow.
-        future = crewai_event_bus.emit(
-            self,
-            FlowStartedEvent(
-                type="flow_started",
-                flow_name=self.name or self.__class__.__name__,
-                inputs=inputs,
-            ),
-        )
-        if future:
-            self._event_futures.append(future)
-        self._log_flow_event(
-            f"Flow started with ID: {self.flow_id}", color="bold magenta"
-        )
+        if not self.suppress_flow_events:
+            future = crewai_event_bus.emit(
+                self,
+                FlowStartedEvent(
+                    type="flow_started",
+                    flow_name=self.name or self.__class__.__name__,
+                    inputs=inputs,
+                ),
+            )
+            if future:
+                self._event_futures.append(future)
+            self._log_flow_event(
+                f"Flow started with ID: {self.flow_id}", color="bold magenta"
+            )

        if inputs is not None and "id" not in inputs:
            self._initialize_state(inputs)
@@ -1391,17 +1400,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
        final_output = self._method_outputs[-1] if self._method_outputs else None

-        future = crewai_event_bus.emit(
-            self,
-            FlowFinishedEvent(
-                type="flow_finished",
-                flow_name=self.name or self.__class__.__name__,
-                result=final_output,
-                state=self._copy_and_serialize_state(),
-            ),
-        )
-        if future:
-            self._event_futures.append(future)
+        if not self.suppress_flow_events:
+            future = crewai_event_bus.emit(
+                self,
+                FlowFinishedEvent(
+                    type="flow_finished",
+                    flow_name=self.name or self.__class__.__name__,
+                    result=final_output,
+                    state=self._copy_and_serialize_state(),
+                ),
+            )
+            if future:
+                self._event_futures.append(future)

        if self._event_futures:
            await asyncio.gather(
@@ -1537,18 +1547,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
                kwargs or {}
            )

-            future = crewai_event_bus.emit(
-                self,
-                MethodExecutionStartedEvent(
-                    type="method_execution_started",
-                    method_name=method_name,
-                    flow_name=self.name or self.__class__.__name__,
-                    params=dumped_params,
-                    state=self._copy_and_serialize_state(),
-                ),
-            )
-            if future:
-                self._event_futures.append(future)
+            if not self.suppress_flow_events:
+                future = crewai_event_bus.emit(
+                    self,
+                    MethodExecutionStartedEvent(
+                        type="method_execution_started",
+                        method_name=method_name,
+                        flow_name=self.name or self.__class__.__name__,
+                        params=dumped_params,
+                        state=self._copy_and_serialize_state(),
+                    ),
+                )
+                if future:
+                    self._event_futures.append(future)

            result = (
                await method(*args, **kwargs)
@@ -1563,41 +1574,32 @@ class Flow(Generic[T], metaclass=FlowMeta):
            self._completed_methods.add(method_name)

-            future = crewai_event_bus.emit(
-                self,
-                MethodExecutionFinishedEvent(
-                    type="method_execution_finished",
-                    method_name=method_name,
-                    flow_name=self.name or self.__class__.__name__,
-                    state=self._copy_and_serialize_state(),
-                    result=result,
-                ),
-            )
-            if future:
-                self._event_futures.append(future)
-            return result
-        except Exception as e:
-            # Check if this is a HumanFeedbackPending exception (paused, not failed)
-            from crewai.flow.async_feedback.types import HumanFeedbackPending
-            if isinstance(e, HumanFeedbackPending):
-                # Emit paused event instead of failed
-                future = crewai_event_bus.emit(
-                    self,
-                    MethodExecutionPausedEvent(
-                        type="method_execution_paused",
-                        method_name=method_name,
-                        flow_name=self.name or self.__class__.__name__,
-                        state=self._copy_and_serialize_state(),
-                        flow_id=e.context.flow_id,
-                        message=e.context.message,
-                        emit=e.context.emit,
-                    ),
-                )
-                if future:
-                    self._event_futures.append(future)
-                raise e
+            if not self.suppress_flow_events:
+                future = crewai_event_bus.emit(
+                    self,
+                    MethodExecutionFinishedEvent(
+                        type="method_execution_finished",
+                        method_name=method_name,
+                        flow_name=self.name or self.__class__.__name__,
+                        state=self._copy_and_serialize_state(),
+                        result=result,
+                    ),
+                )
+                if future:
+                    self._event_futures.append(future)
+
+            return result
+        except Exception as e:
+            if not self.suppress_flow_events:
+                # Check if this is a HumanFeedbackPending exception (paused, not failed)
+                from crewai.flow.async_feedback.types import HumanFeedbackPending
+                if isinstance(e, HumanFeedbackPending):
+                    # Auto-save pending feedback (create default persistence if needed)
+                    if self._persistence is None:
+                        from crewai.flow.persistence import SQLiteFlowPersistence
+                        self._persistence = SQLiteFlowPersistence()

                # Regular failure
                future = crewai_event_bus.emit(
@@ -1644,7 +1646,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
""" """
# First, handle routers repeatedly until no router triggers anymore # First, handle routers repeatedly until no router triggers anymore
router_results = [] router_results = []
router_result_to_feedback: dict[str, Any] = {} # Map outcome -> HumanFeedbackResult router_result_to_feedback: dict[
str, Any
] = {} # Map outcome -> HumanFeedbackResult
current_trigger = trigger_method current_trigger = trigger_method
current_result = result # Track the result to pass to each router current_result = result # Track the result to pass to each router
@@ -1963,7 +1967,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
        # Show message and prompt for feedback
        formatter.console.print(message, style="yellow")
-        formatter.console.print("(Press Enter to skip, or type your feedback)\n", style="cyan")
+        formatter.console.print(
+            "(Press Enter to skip, or type your feedback)\n", style="cyan"
+        )

        feedback = input("Your feedback: ").strip()
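A brief sketch of how the suppress_flow_events flag added above can be consumed; CrewAgentExecutorFlow passes it when it defers Flow.__init__ (the QuietFlow subclass here is hypothetical):

    from crewai.flow.flow import Flow, start

    class QuietFlow(Flow):
        @start()
        def go(self) -> str:
            return "done"

    # suppress_flow_events=True skips FlowCreatedEvent/FlowStartedEvent/FlowFinishedEvent
    # emission, which is what the agent executor relies on to stay silent.
    flow = QuietFlow(suppress_flow_events=True)
    flow.kickoff()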

View File

@@ -1178,6 +1178,7 @@ def test_system_and_prompt_template():
{{ .Response }}<|eot_id|>""",
    )

+    agent.create_agent_executor()

    expected_prompt = """<|start_header_id|>system<|end_header_id|>
@@ -1442,6 +1443,8 @@ def test_agent_max_retry_limit():
        human_input=True,
    )

+    agent.create_agent_executor(task=task)
+
    error_message = "Error happening while sending prompt to model."
    with patch.object(
        CrewAgentExecutor, "invoke", wraps=agent.agent_executor.invoke
@@ -1503,9 +1506,8 @@ def test_agent_with_custom_stop_words():
    )
    assert isinstance(agent.llm, BaseLLM)
-    assert set(agent.llm.stop) == set([*stop_words, "\nObservation:"])
+    assert set(agent.llm.stop) == set(stop_words)
    assert all(word in agent.llm.stop for word in stop_words)
-    assert "\nObservation:" in agent.llm.stop


def test_agent_with_callbacks():
@@ -1629,6 +1631,8 @@ def test_handle_context_length_exceeds_limit_cli_no():
    )
    task = Task(description="test task", agent=agent, expected_output="test output")

+    agent.create_agent_executor(task=task)
+
    with patch.object(
        CrewAgentExecutor, "invoke", wraps=agent.agent_executor.invoke
    ) as private_mock:
@@ -1679,8 +1683,8 @@ def test_agent_with_all_llm_attributes():
    assert agent.llm.temperature == 0.7
    assert agent.llm.top_p == 0.9
    # assert agent.llm.n == 1
-    assert set(agent.llm.stop) == set(["STOP", "END", "\nObservation:"])
-    assert all(word in agent.llm.stop for word in ["STOP", "END", "\nObservation:"])
+    assert set(agent.llm.stop) == set(["STOP", "END"])
+    assert all(word in agent.llm.stop for word in ["STOP", "END"])
    assert agent.llm.max_tokens == 100
    assert agent.llm.presence_penalty == 0.1
    assert agent.llm.frequency_penalty == 0.1

View File

@@ -0,0 +1,479 @@
"""Unit tests for CrewAgentExecutorFlow.
Tests the Flow-based agent executor implementation including state management,
flow methods, routing logic, and error handling.
"""
from unittest.mock import Mock, patch
import pytest
from crewai.experimental.crew_agent_executor_flow import (
AgentReActState,
CrewAgentExecutorFlow,
)
from crewai.agents.parser import AgentAction, AgentFinish
class TestAgentReActState:
"""Test AgentReActState Pydantic model."""
def test_state_initialization(self):
"""Test AgentReActState initialization with defaults."""
state = AgentReActState()
assert state.iterations == 0
assert state.messages == []
assert state.current_answer is None
assert state.is_finished is False
assert state.ask_for_human_input is False
def test_state_with_values(self):
"""Test AgentReActState initialization with values."""
messages = [{"role": "user", "content": "test"}]
state = AgentReActState(
messages=messages,
iterations=5,
current_answer=AgentFinish(thought="thinking", output="done", text="final"),
is_finished=True,
ask_for_human_input=True,
)
assert state.messages == messages
assert state.iterations == 5
assert isinstance(state.current_answer, AgentFinish)
assert state.is_finished is True
assert state.ask_for_human_input is True
class TestCrewAgentExecutorFlow:
"""Test CrewAgentExecutorFlow class."""
@pytest.fixture
def mock_dependencies(self):
"""Create mock dependencies for executor."""
llm = Mock()
llm.supports_stop_words.return_value = True
task = Mock()
task.description = "Test task"
task.human_input = False
task.response_model = None
crew = Mock()
crew.verbose = False
crew._train = False
agent = Mock()
agent.id = "test-agent-id"
agent.role = "Test Agent"
agent.verbose = False
agent.key = "test-key"
prompt = {"prompt": "Test prompt with {input}, {tool_names}, {tools}"}
tools = []
tools_handler = Mock()
return {
"llm": llm,
"task": task,
"crew": crew,
"agent": agent,
"prompt": prompt,
"max_iter": 10,
"tools": tools,
"tools_names": "",
"stop_words": ["Observation"],
"tools_description": "",
"tools_handler": tools_handler,
}
def test_executor_initialization(self, mock_dependencies):
"""Test CrewAgentExecutorFlow initialization."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor.llm == mock_dependencies["llm"]
assert executor.task == mock_dependencies["task"]
assert executor.agent == mock_dependencies["agent"]
assert executor.crew == mock_dependencies["crew"]
assert executor.max_iter == 10
assert executor.use_stop_words is True
def test_initialize_reasoning(self, mock_dependencies):
"""Test flow entry point."""
with patch.object(
CrewAgentExecutorFlow, "_show_start_logs"
) as mock_show_start:
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.initialize_reasoning()
assert result == "initialized"
mock_show_start.assert_called_once()
def test_check_max_iterations_not_reached(self, mock_dependencies):
"""Test routing when iterations < max."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.iterations = 5
result = executor.check_max_iterations()
assert result == "continue_reasoning"
def test_check_max_iterations_reached(self, mock_dependencies):
"""Test routing when iterations >= max."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.iterations = 10
result = executor.check_max_iterations()
assert result == "force_final_answer"
def test_route_by_answer_type_action(self, mock_dependencies):
"""Test routing for AgentAction."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
result = executor.route_by_answer_type()
assert result == "execute_tool"
def test_route_by_answer_type_finish(self, mock_dependencies):
"""Test routing for AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thoughts", output="Final answer", text="complete"
)
result = executor.route_by_answer_type()
assert result == "agent_finished"
def test_continue_iteration(self, mock_dependencies):
"""Test iteration continuation."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.continue_iteration()
assert result == "check_iteration"
def test_finalize_success(self, mock_dependencies):
"""Test finalize with valid AgentFinish."""
with patch.object(CrewAgentExecutorFlow, "_show_logs") as mock_show_logs:
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thinking", output="Done", text="complete"
)
result = executor.finalize()
assert result == "completed"
assert executor.state.is_finished is True
mock_show_logs.assert_called_once()
def test_finalize_failure(self, mock_dependencies):
"""Test finalize skips when given AgentAction instead of AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
result = executor.finalize()
# Should return "skipped" and not set is_finished
assert result == "skipped"
assert executor.state.is_finished is False
def test_format_prompt(self, mock_dependencies):
"""Test prompt formatting."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
inputs = {"input": "test input", "tool_names": "tool1, tool2", "tools": "desc"}
result = executor._format_prompt("Prompt {input} {tool_names} {tools}", inputs)
assert "test input" in result
assert "tool1, tool2" in result
assert "desc" in result
def test_is_training_mode_false(self, mock_dependencies):
"""Test training mode detection when not in training."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor._is_training_mode() is False
def test_is_training_mode_true(self, mock_dependencies):
"""Test training mode detection when in training."""
mock_dependencies["crew"]._train = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor._is_training_mode() is True
def test_append_message_to_state(self, mock_dependencies):
"""Test message appending to state."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
initial_count = len(executor.state.messages)
executor._append_message_to_state("test message")
assert len(executor.state.messages) == initial_count + 1
assert executor.state.messages[-1]["content"] == "test message"
def test_invoke_step_callback(self, mock_dependencies):
"""Test step callback invocation."""
callback = Mock()
mock_dependencies["step_callback"] = callback
executor = CrewAgentExecutorFlow(**mock_dependencies)
answer = AgentFinish(thought="thinking", output="test", text="final")
executor._invoke_step_callback(answer)
callback.assert_called_once_with(answer)
def test_invoke_step_callback_none(self, mock_dependencies):
"""Test step callback when none provided."""
mock_dependencies["step_callback"] = None
executor = CrewAgentExecutorFlow(**mock_dependencies)
# Should not raise error
executor._invoke_step_callback(
AgentFinish(thought="thinking", output="test", text="final")
)
@patch("crewai.experimental.crew_agent_executor_flow.handle_output_parser_exception")
def test_recover_from_parser_error(
self, mock_handle_exception, mock_dependencies
):
"""Test recovery from OutputParserError."""
from crewai.agents.parser import OutputParserError
mock_handle_exception.return_value = None
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor._last_parser_error = OutputParserError("test error")
initial_iterations = executor.state.iterations
result = executor.recover_from_parser_error()
assert result == "initialized"
assert executor.state.iterations == initial_iterations + 1
mock_handle_exception.assert_called_once()
@patch("crewai.experimental.crew_agent_executor_flow.handle_context_length")
def test_recover_from_context_length(
self, mock_handle_context, mock_dependencies
):
"""Test recovery from context length error."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor._last_context_error = Exception("context too long")
initial_iterations = executor.state.iterations
result = executor.recover_from_context_length()
assert result == "initialized"
assert executor.state.iterations == initial_iterations + 1
mock_handle_context.assert_called_once()
def test_use_stop_words_property(self, mock_dependencies):
"""Test use_stop_words property."""
mock_dependencies["llm"].supports_stop_words.return_value = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor.use_stop_words is True
mock_dependencies["llm"].supports_stop_words.return_value = False
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor.use_stop_words is False
def test_compatibility_properties(self, mock_dependencies):
"""Test compatibility properties for mixin."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.messages = [{"role": "user", "content": "test"}]
executor.state.iterations = 5
# Test that compatibility properties return state values
assert executor.messages == executor.state.messages
assert executor.iterations == executor.state.iterations
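

# Illustrative sketch (not part of the suite above): how the routing steps exercised by
# the preceding tests are expected to chain together. It assumes the same module-level
# imports (Mock, CrewAgentExecutorFlow, AgentAction, AgentFinish) and mirrors the mock
# fixture shape used in these tests; the helper name below is hypothetical and is not
# collected by pytest.
def _example_routing_walkthrough() -> None:
    llm = Mock()
    llm.supports_stop_words.return_value = True
    task = Mock()
    task.description = "Example task"
    agent = Mock()
    agent.role = "Example Agent"
    agent.verbose = False
    executor = CrewAgentExecutorFlow(
        llm=llm,
        task=task,
        crew=Mock(),
        agent=agent,
        prompt={"prompt": "Prompt {input} {tool_names} {tools}"},
        max_iter=10,
        tools=[],
        tools_names="",
        stop_words=[],
        tools_description="",
        tools_handler=Mock(),
    )
    # Below the iteration budget, the flow keeps reasoning ...
    executor.state.iterations = 3
    assert executor.check_max_iterations() == "continue_reasoning"
    # ... an AgentAction in state routes to tool execution ...
    executor.state.current_answer = AgentAction(
        thought="t", tool="search", tool_input="q", text="action"
    )
    assert executor.route_by_answer_type() == "execute_tool"
    # ... and an AgentFinish routes to finalization.
    executor.state.current_answer = AgentFinish(thought="t", output="done", text="c")
    assert executor.route_by_answer_type() == "agent_finished"
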
class TestFlowErrorHandling:
"""Test error handling in flow methods."""
@pytest.fixture
def mock_dependencies(self):
"""Create mock dependencies."""
llm = Mock()
llm.supports_stop_words.return_value = True
task = Mock()
task.description = "Test task"
crew = Mock()
agent = Mock()
agent.role = "Test Agent"
agent.verbose = False
prompt = {"prompt": "Test {input}"}
return {
"llm": llm,
"task": task,
"crew": crew,
"agent": agent,
"prompt": prompt,
"max_iter": 10,
"tools": [],
"tools_names": "",
"stop_words": [],
"tools_description": "",
"tools_handler": Mock(),
}
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
def test_call_llm_parser_error(
self, mock_enforce_rpm, mock_get_llm, mock_dependencies
):
"""Test call_llm_and_parse handles OutputParserError."""
from crewai.agents.parser import OutputParserError
mock_enforce_rpm.return_value = None
mock_get_llm.side_effect = OutputParserError("parse failed")
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "parser_error"
assert executor._last_parser_error is not None
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
@patch("crewai.experimental.crew_agent_executor_flow.is_context_length_exceeded")
def test_call_llm_context_error(
self,
mock_is_context_exceeded,
mock_enforce_rpm,
mock_get_llm,
mock_dependencies,
):
"""Test call_llm_and_parse handles context length error."""
mock_enforce_rpm.return_value = None
mock_get_llm.side_effect = Exception("context length")
mock_is_context_exceeded.return_value = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "context_error"
assert executor._last_context_error is not None
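

# Minimal sketch (assumed, not a definitive usage pattern): the recovery loop that the
# error-handling tests above cover. A parse failure from the LLM call is recorded on the
# executor, and recover_from_parser_error() turns it back into another reasoning pass.
# Patch targets mirror the tests; the helper expects a dependency dict shaped like the
# mock_dependencies fixture above, and its name is hypothetical.
def _example_parser_error_recovery(mock_dependencies: dict) -> None:
    from crewai.agents.parser import OutputParserError

    with (
        patch(
            "crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit",
            return_value=None,
        ),
        patch(
            "crewai.experimental.crew_agent_executor_flow.get_llm_response",
            side_effect=OutputParserError("parse failed"),
        ),
        patch(
            "crewai.experimental.crew_agent_executor_flow.handle_output_parser_exception",
            return_value=None,
        ),
    ):
        executor = CrewAgentExecutorFlow(**mock_dependencies)
        # The failed LLM call routes to the parser-error branch ...
        assert executor.call_llm_and_parse() == "parser_error"
        before = executor.state.iterations
        # ... and recovery re-enters the loop with the iteration count bumped.
        assert executor.recover_from_parser_error() == "initialized"
        assert executor.state.iterations == before + 1
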
class TestFlowInvoke:
"""Test the invoke method that maintains backward compatibility."""
@pytest.fixture
def mock_dependencies(self):
"""Create mock dependencies."""
llm = Mock()
task = Mock()
task.description = "Test"
task.human_input = False
crew = Mock()
crew._short_term_memory = None
crew._long_term_memory = None
crew._entity_memory = None
crew._external_memory = None
agent = Mock()
agent.role = "Test"
agent.verbose = False
prompt = {"prompt": "Test {input} {tool_names} {tools}"}
return {
"llm": llm,
"task": task,
"crew": crew,
"agent": agent,
"prompt": prompt,
"max_iter": 10,
"tools": [],
"tools_names": "",
"stop_words": [],
"tools_description": "",
"tools_handler": Mock(),
}
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
def test_invoke_success(
self,
mock_external_memory,
mock_long_term_memory,
mock_short_term_memory,
mock_kickoff,
mock_dependencies,
):
"""Test successful invoke without human feedback."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
# Mock kickoff to set the final answer in state
def mock_kickoff_side_effect():
executor.state.current_answer = AgentFinish(
thought="final thinking", output="Final result", text="complete"
)
mock_kickoff.side_effect = mock_kickoff_side_effect
inputs = {"input": "test", "tool_names": "", "tools": ""}
result = executor.invoke(inputs)
assert result == {"output": "Final result"}
mock_kickoff.assert_called_once()
mock_short_term_memory.assert_called_once()
mock_long_term_memory.assert_called_once()
mock_external_memory.assert_called_once()
@patch.object(CrewAgentExecutorFlow, "kickoff")
def test_invoke_failure_no_agent_finish(self, mock_kickoff, mock_dependencies):
"""Test invoke fails without AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="test", tool_input="test", text="action text"
)
inputs = {"input": "test", "tool_names": "", "tools": ""}
with pytest.raises(RuntimeError, match="without reaching a final answer"):
executor.invoke(inputs)
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
def test_invoke_with_system_prompt(
self,
mock_external_memory,
mock_long_term_memory,
mock_short_term_memory,
mock_kickoff,
mock_dependencies,
):
"""Test invoke with system prompt configuration."""
mock_dependencies["prompt"] = {
"system": "System: {input}",
"user": "User: {input} {tool_names} {tools}",
}
executor = CrewAgentExecutorFlow(**mock_dependencies)
def mock_kickoff_side_effect():
executor.state.current_answer = AgentFinish(
thought="final thoughts", output="Done", text="complete"
)
mock_kickoff.side_effect = mock_kickoff_side_effect
inputs = {"input": "test", "tool_names": "", "tools": ""}
result = executor.invoke(inputs)
mock_short_term_memory.assert_called_once()
mock_long_term_memory.assert_called_once()
mock_external_memory.assert_called_once()
mock_kickoff.assert_called_once()
assert result == {"output": "Done"}
assert len(executor.state.messages) >= 2
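

# Contract sketch (inferred from the tests above, not a definitive implementation note):
# invoke() stays backward compatible with the legacy executor by returning
# {"output": <AgentFinish.output>} after kickoff, and raising RuntimeError when the flow
# stops without a final answer (the failure branch is covered by
# test_invoke_failure_no_agent_finish). The helper expects a dependency dict shaped like
# the TestFlowInvoke fixture above; its name is hypothetical.
def _example_invoke_contract(mock_dependencies: dict) -> None:
    with (
        patch.object(CrewAgentExecutorFlow, "kickoff") as mock_kickoff,
        patch.object(CrewAgentExecutorFlow, "_create_short_term_memory"),
        patch.object(CrewAgentExecutorFlow, "_create_long_term_memory"),
        patch.object(CrewAgentExecutorFlow, "_create_external_memory"),
    ):
        executor = CrewAgentExecutorFlow(**mock_dependencies)

        def finish() -> None:
            # Simulate the flow ending on an AgentFinish, as kickoff would.
            executor.state.current_answer = AgentFinish(
                thought="t", output="Final result", text="c"
            )

        mock_kickoff.side_effect = finish
        result = executor.invoke({"input": "test", "tool_names": "", "tools": ""})
        assert result == {"output": "Final result"}
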

uv.lock (generated, 978 changed lines): file diff suppressed because it is too large.