diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py index 5864a4995..cc4c9d4d8 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py @@ -3,6 +3,7 @@ from __future__ import annotations import time from typing import TYPE_CHECKING +from crewai.agents.parser import AgentFinish from crewai.events.event_listener import event_listener from crewai.memory.entity.entity_memory_item import EntityMemoryItem from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem @@ -29,7 +30,7 @@ class CrewAgentExecutorMixin: _i18n: I18N _printer: Printer = Printer() - def _create_short_term_memory(self, output) -> None: + def _create_short_term_memory(self, output: AgentFinish) -> None: """Create and save a short-term memory item if conditions are met.""" if ( self.crew @@ -53,7 +54,7 @@ class CrewAgentExecutorMixin: "error", f"Failed to add to short term memory: {e}" ) - def _create_external_memory(self, output) -> None: + def _create_external_memory(self, output: AgentFinish) -> None: """Create and save a external-term memory item if conditions are met.""" if ( self.crew @@ -75,7 +76,7 @@ class CrewAgentExecutorMixin: "error", f"Failed to add to external memory: {e}" ) - def _create_long_term_memory(self, output) -> None: + def _create_long_term_memory(self, output: AgentFinish) -> None: """Create and save long-term and entity memory items based on evaluation.""" if ( self.crew @@ -136,40 +137,50 @@ class CrewAgentExecutorMixin: ) def _ask_human_input(self, final_answer: str) -> str: - """Prompt human input with mode-appropriate messaging.""" - event_listener.formatter.pause_live_updates() - try: - self._printer.print( - content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m" - ) + """Prompt human input with mode-appropriate messaging. + Note: The final answer is already displayed via the AgentLogsExecutionEvent + panel, so we only show the feedback prompt here. + """ + from rich.panel import Panel + from rich.text import Text + + formatter = event_listener.formatter + formatter.pause_live_updates() + + try: # Training mode prompt (single iteration) if self.crew and getattr(self.crew, "_train", False): - prompt = ( - "\n\n=====\n" - "## TRAINING MODE: Provide feedback to improve the agent's performance.\n" + prompt_text = ( + "TRAINING MODE: Provide feedback to improve the agent's performance.\n\n" "This will be used to train better versions of the agent.\n" - "Please provide detailed feedback about the result quality and reasoning process.\n" - "=====\n" + "Please provide detailed feedback about the result quality and reasoning process." ) + title = "🎓 Training Feedback Required" # Regular human-in-the-loop prompt (multiple iterations) else: - prompt = ( - "\n\n=====\n" - "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n" - "Please follow these guidelines:\n" - " - If you are happy with the result, simply hit Enter without typing anything.\n" - " - Otherwise, provide specific improvement requests.\n" - " - You can provide multiple rounds of feedback until satisfied.\n" - "=====\n" + prompt_text = ( + "Provide feedback on the Final Result above.\n\n" + "• If you are happy with the result, simply hit Enter without typing anything.\n" + "• Otherwise, provide specific improvement requests.\n" + "• You can provide multiple rounds of feedback until satisfied." ) + title = "💬 Human Feedback Required" + + content = Text() + content.append(prompt_text, style="yellow") + + prompt_panel = Panel( + content, + title=title, + border_style="yellow", + padding=(1, 2), + ) + formatter.console.print(prompt_panel) - self._printer.print(content=prompt, color="bold_yellow") response = input() if response.strip() != "": - self._printer.print( - content="\nProcessing your feedback...", color="cyan" - ) + formatter.console.print("\n[cyan]Processing your feedback...[/cyan]") return response finally: - event_listener.formatter.resume_live_updates() + formatter.resume_live_updates() diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index e11e3e80a..45d4f84f3 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -7,6 +7,7 @@ and memory management. from __future__ import annotations from collections.abc import Callable +import logging from typing import TYPE_CHECKING, Any, Literal, cast from pydantic import BaseModel, GetCoreSchemaHandler @@ -51,6 +52,8 @@ from crewai.utilities.tool_utils import ( from crewai.utilities.training_handler import CrewTrainingHandler +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from crewai.agent import Agent from crewai.agents.tools_handler import ToolsHandler @@ -541,7 +544,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if self.agent is None: raise ValueError("Agent cannot be None") - crewai_event_bus.emit( + future = crewai_event_bus.emit( self.agent, AgentLogsExecutionEvent( agent_role=self.agent.role, @@ -551,6 +554,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ), ) + if future is not None: + try: + future.result(timeout=5.0) + except Exception as e: + logger.error(f"Failed to show logs for agent execution event: {e}") + def _handle_crew_training_output( self, result: AgentFinish, human_feedback: str | None = None ) -> None: diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 1c7602587..36c37e9c9 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -1,7 +1,6 @@ from __future__ import annotations from io import StringIO -import threading from typing import TYPE_CHECKING, Any from pydantic import Field, PrivateAttr @@ -17,8 +16,6 @@ from crewai.events.types.a2a_events import ( A2AResponseReceivedEvent, ) from crewai.events.types.agent_events import ( - AgentExecutionCompletedEvent, - AgentExecutionStartedEvent, LiteAgentExecutionCompletedEvent, LiteAgentExecutionErrorEvent, LiteAgentExecutionStartedEvent, @@ -48,7 +45,6 @@ from crewai.events.types.flow_events import ( from crewai.events.types.knowledge_events import ( KnowledgeQueryCompletedEvent, KnowledgeQueryFailedEvent, - KnowledgeQueryStartedEvent, KnowledgeRetrievalCompletedEvent, KnowledgeRetrievalStartedEvent, KnowledgeSearchQueryFailedEvent, @@ -112,7 +108,6 @@ class EventListener(BaseEventListener): text_stream: StringIO = StringIO() knowledge_retrieval_in_progress: bool = False knowledge_query_in_progress: bool = False - method_branches: dict[str, Any] = Field(default_factory=dict) def __new__(cls) -> EventListener: if cls._instance is None: @@ -126,10 +121,8 @@ class EventListener(BaseEventListener): self._telemetry = Telemetry() self._telemetry.set_tracer() self.execution_spans = {} - self.method_branches = {} self._initialized = True self.formatter = ConsoleFormatter(verbose=True) - self._crew_tree_lock = threading.Condition() # Initialize trace listener with formatter for memory event handling trace_listener = TraceCollectionListener() @@ -140,12 +133,10 @@ class EventListener(BaseEventListener): def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None: @crewai_event_bus.on(CrewKickoffStartedEvent) def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None: - with self._crew_tree_lock: - self.formatter.create_crew_tree(event.crew_name or "Crew", source.id) - source._execution_span = self._telemetry.crew_execution_span( - source, event.inputs - ) - self._crew_tree_lock.notify_all() + self.formatter.handle_crew_started(event.crew_name or "Crew", source.id) + source._execution_span = self._telemetry.crew_execution_span( + source, event.inputs + ) @crewai_event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None: @@ -153,8 +144,7 @@ class EventListener(BaseEventListener): final_string_output = event.output.raw self._telemetry.end_crew(source, final_string_output) - self.formatter.update_crew_tree( - self.formatter.current_crew_tree, + self.formatter.handle_crew_status( event.crew_name or "Crew", source.id, "completed", @@ -163,8 +153,7 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None: - self.formatter.update_crew_tree( - self.formatter.current_crew_tree, + self.formatter.handle_crew_status( event.crew_name or "Crew", source.id, "failed", @@ -197,23 +186,22 @@ class EventListener(BaseEventListener): # ----------- TASK EVENTS ----------- + def get_task_name(source: Any) -> str | None: + return ( + source.name + if hasattr(source, "name") and source.name + else source.description + if hasattr(source, "description") and source.description + else None + ) + @crewai_event_bus.on(TaskStartedEvent) def on_task_started(source: Any, event: TaskStartedEvent) -> None: span = self._telemetry.task_started(crew=source.agent.crew, task=source) self.execution_spans[source] = span - with self._crew_tree_lock: - self._crew_tree_lock.wait_for( - lambda: self.formatter.current_crew_tree is not None, timeout=5.0 - ) - - if self.formatter.current_crew_tree is not None: - task_name = ( - source.name if hasattr(source, "name") and source.name else None - ) - self.formatter.create_task_branch( - self.formatter.current_crew_tree, source.id, task_name - ) + task_name = get_task_name(source) + self.formatter.handle_task_started(source.id, task_name) @crewai_event_bus.on(TaskCompletedEvent) def on_task_completed(source: Any, event: TaskCompletedEvent) -> None: @@ -224,13 +212,9 @@ class EventListener(BaseEventListener): self.execution_spans[source] = None # Pass task name if it exists - task_name = source.name if hasattr(source, "name") and source.name else None - self.formatter.update_task_status( - self.formatter.current_crew_tree, - source.id, - source.agent.role, - "completed", - task_name, + task_name = get_task_name(source) + self.formatter.handle_task_status( + source.id, source.agent.role, "completed", task_name ) @crewai_event_bus.on(TaskFailedEvent) @@ -242,37 +226,12 @@ class EventListener(BaseEventListener): self.execution_spans[source] = None # Pass task name if it exists - task_name = source.name if hasattr(source, "name") and source.name else None - self.formatter.update_task_status( - self.formatter.current_crew_tree, - source.id, - source.agent.role, - "failed", - task_name, + task_name = get_task_name(source) + self.formatter.handle_task_status( + source.id, source.agent.role, "failed", task_name ) # ----------- AGENT EVENTS ----------- - - @crewai_event_bus.on(AgentExecutionStartedEvent) - def on_agent_execution_started( - _: Any, event: AgentExecutionStartedEvent - ) -> None: - self.formatter.create_agent_branch( - self.formatter.current_task_branch, - event.agent.role, - self.formatter.current_crew_tree, - ) - - @crewai_event_bus.on(AgentExecutionCompletedEvent) - def on_agent_execution_completed( - _: Any, event: AgentExecutionCompletedEvent - ) -> None: - self.formatter.update_agent_status( - self.formatter.current_agent_branch, - event.agent.role, - self.formatter.current_crew_tree, - ) - # ----------- LITE AGENT EVENTS ----------- @crewai_event_bus.on(LiteAgentExecutionStartedEvent) @@ -316,79 +275,61 @@ class EventListener(BaseEventListener): self._telemetry.flow_execution_span( event.flow_name, list(source._methods.keys()) ) - tree = self.formatter.create_flow_tree(event.flow_name, str(source.flow_id)) - self.formatter.current_flow_tree = tree - self.formatter.start_flow(event.flow_name, str(source.flow_id)) + self.formatter.handle_flow_created(event.flow_name, str(source.flow_id)) + self.formatter.handle_flow_started(event.flow_name, str(source.flow_id)) @crewai_event_bus.on(FlowFinishedEvent) def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: - self.formatter.update_flow_status( - self.formatter.current_flow_tree, event.flow_name, source.flow_id + self.formatter.handle_flow_status( + event.flow_name, + source.flow_id, ) @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started( _: Any, event: MethodExecutionStartedEvent ) -> None: - method_branch = self.method_branches.get(event.method_name) - updated_branch = self.formatter.update_method_status( - method_branch, - self.formatter.current_flow_tree, + self.formatter.handle_method_status( event.method_name, "running", ) - self.method_branches[event.method_name] = updated_branch @crewai_event_bus.on(MethodExecutionFinishedEvent) def on_method_execution_finished( _: Any, event: MethodExecutionFinishedEvent ) -> None: - method_branch = self.method_branches.get(event.method_name) - updated_branch = self.formatter.update_method_status( - method_branch, - self.formatter.current_flow_tree, + self.formatter.handle_method_status( event.method_name, "completed", ) - self.method_branches[event.method_name] = updated_branch @crewai_event_bus.on(MethodExecutionFailedEvent) def on_method_execution_failed( _: Any, event: MethodExecutionFailedEvent ) -> None: - method_branch = self.method_branches.get(event.method_name) - updated_branch = self.formatter.update_method_status( - method_branch, - self.formatter.current_flow_tree, + self.formatter.handle_method_status( event.method_name, "failed", ) - self.method_branches[event.method_name] = updated_branch @crewai_event_bus.on(MethodExecutionPausedEvent) def on_method_execution_paused( _: Any, event: MethodExecutionPausedEvent ) -> None: - method_branch = self.method_branches.get(event.method_name) - updated_branch = self.formatter.update_method_status( - method_branch, - self.formatter.current_flow_tree, + self.formatter.handle_method_status( event.method_name, "paused", ) - self.method_branches[event.method_name] = updated_branch @crewai_event_bus.on(FlowPausedEvent) def on_flow_paused(_: Any, event: FlowPausedEvent) -> None: - self.formatter.update_flow_status( - self.formatter.current_flow_tree, + self.formatter.handle_flow_status( event.flow_name, event.flow_id, "paused", ) # ----------- TOOL USAGE EVENTS ----------- - @crewai_event_bus.on(ToolUsageStartedEvent) def on_tool_usage_started(source: Any, event: ToolUsageStartedEvent) -> None: if isinstance(source, LLM): @@ -398,9 +339,9 @@ class EventListener(BaseEventListener): ) else: self.formatter.handle_tool_usage_started( - self.formatter.current_agent_branch, event.tool_name, - self.formatter.current_crew_tree, + event.tool_args, + event.run_attempts, ) @crewai_event_bus.on(ToolUsageFinishedEvent) @@ -409,12 +350,6 @@ class EventListener(BaseEventListener): self.formatter.handle_llm_tool_usage_finished( event.tool_name, ) - else: - self.formatter.handle_tool_usage_finished( - self.formatter.current_tool_branch, - event.tool_name, - self.formatter.current_crew_tree, - ) @crewai_event_bus.on(ToolUsageErrorEvent) def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None: @@ -425,10 +360,9 @@ class EventListener(BaseEventListener): ) else: self.formatter.handle_tool_usage_error( - self.formatter.current_tool_branch, event.tool_name, event.error, - self.formatter.current_crew_tree, + event.run_attempts, ) # ----------- LLM EVENTS ----------- @@ -437,32 +371,15 @@ class EventListener(BaseEventListener): def on_llm_call_started(_: Any, event: LLMCallStartedEvent) -> None: self.text_stream = StringIO() self.next_chunk = 0 - # Capture the returned tool branch and update the current_tool_branch reference - thinking_branch = self.formatter.handle_llm_call_started( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, - ) - # Update the formatter's current_tool_branch to ensure proper cleanup - if thinking_branch is not None: - self.formatter.current_tool_branch = thinking_branch @crewai_event_bus.on(LLMCallCompletedEvent) def on_llm_call_completed(_: Any, event: LLMCallCompletedEvent) -> None: self.formatter.handle_llm_stream_completed() - self.formatter.handle_llm_call_completed( - self.formatter.current_tool_branch, - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, - ) @crewai_event_bus.on(LLMCallFailedEvent) def on_llm_call_failed(_: Any, event: LLMCallFailedEvent) -> None: self.formatter.handle_llm_stream_completed() - self.formatter.handle_llm_call_failed( - self.formatter.current_tool_branch, - event.error, - self.formatter.current_crew_tree, - ) + self.formatter.handle_llm_call_failed(event.error) @crewai_event_bus.on(LLMStreamChunkEvent) def on_llm_stream_chunk(_: Any, event: LLMStreamChunkEvent) -> None: @@ -473,9 +390,7 @@ class EventListener(BaseEventListener): accumulated_text = self.text_stream.getvalue() self.formatter.handle_llm_stream_chunk( - event.chunk, accumulated_text, - self.formatter.current_crew_tree, event.call_type, ) @@ -515,7 +430,6 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(CrewTestCompletedEvent) def on_crew_test_completed(_: Any, event: CrewTestCompletedEvent) -> None: self.formatter.handle_crew_test_completed( - self.formatter.current_flow_tree, event.crew_name or "Crew", ) @@ -532,10 +446,7 @@ class EventListener(BaseEventListener): self.knowledge_retrieval_in_progress = True - self.formatter.handle_knowledge_retrieval_started( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, - ) + self.formatter.handle_knowledge_retrieval_started() @crewai_event_bus.on(KnowledgeRetrievalCompletedEvent) def on_knowledge_retrieval_completed( @@ -546,24 +457,13 @@ class EventListener(BaseEventListener): self.knowledge_retrieval_in_progress = False self.formatter.handle_knowledge_retrieval_completed( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, event.retrieved_knowledge, + event.query, ) - @crewai_event_bus.on(KnowledgeQueryStartedEvent) - def on_knowledge_query_started( - _: Any, event: KnowledgeQueryStartedEvent - ) -> None: - pass - @crewai_event_bus.on(KnowledgeQueryFailedEvent) def on_knowledge_query_failed(_: Any, event: KnowledgeQueryFailedEvent) -> None: - self.formatter.handle_knowledge_query_failed( - self.formatter.current_agent_branch, - event.error, - self.formatter.current_crew_tree, - ) + self.formatter.handle_knowledge_query_failed(event.error) @crewai_event_bus.on(KnowledgeQueryCompletedEvent) def on_knowledge_query_completed( @@ -575,11 +475,7 @@ class EventListener(BaseEventListener): def on_knowledge_search_query_failed( _: Any, event: KnowledgeSearchQueryFailedEvent ) -> None: - self.formatter.handle_knowledge_search_query_failed( - self.formatter.current_agent_branch, - event.error, - self.formatter.current_crew_tree, - ) + self.formatter.handle_knowledge_search_query_failed(event.error) # ----------- REASONING EVENTS ----------- @@ -587,11 +483,7 @@ class EventListener(BaseEventListener): def on_agent_reasoning_started( _: Any, event: AgentReasoningStartedEvent ) -> None: - self.formatter.handle_reasoning_started( - self.formatter.current_agent_branch, - event.attempt, - self.formatter.current_crew_tree, - ) + self.formatter.handle_reasoning_started(event.attempt) @crewai_event_bus.on(AgentReasoningCompletedEvent) def on_agent_reasoning_completed( @@ -600,14 +492,12 @@ class EventListener(BaseEventListener): self.formatter.handle_reasoning_completed( event.plan, event.ready, - self.formatter.current_crew_tree, ) @crewai_event_bus.on(AgentReasoningFailedEvent) def on_agent_reasoning_failed(_: Any, event: AgentReasoningFailedEvent) -> None: self.formatter.handle_reasoning_failed( event.error, - self.formatter.current_crew_tree, ) # ----------- AGENT LOGGING EVENTS ----------- @@ -734,18 +624,6 @@ class EventListener(BaseEventListener): event.tool_args, ) - @crewai_event_bus.on(MCPToolExecutionCompletedEvent) - def on_mcp_tool_execution_completed( - _: Any, event: MCPToolExecutionCompletedEvent - ) -> None: - self.formatter.handle_mcp_tool_execution_completed( - event.server_name, - event.tool_name, - event.tool_args, - event.result, - event.execution_duration_ms, - ) - @crewai_event_bus.on(MCPToolExecutionFailedEvent) def on_mcp_tool_execution_failed( _: Any, event: MCPToolExecutionFailedEvent diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index c8f7000cd..6e7fba0ef 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,7 +1,7 @@ """Trace collection listener for orchestrating trace collection.""" import os -from typing import Any, ClassVar +from typing import Any, ClassVar, cast import uuid from typing_extensions import Self @@ -105,7 +105,7 @@ class TraceCollectionListener(BaseEventListener): """Create or return singleton instance.""" if cls._instance is None: cls._instance = super().__new__(cls) - return cls._instance + return cast(Self, cls._instance) def __init__( self, @@ -319,21 +319,12 @@ class TraceCollectionListener(BaseEventListener): source: Any, event: MemoryQueryCompletedEvent ) -> None: self._handle_action_event("memory_query_completed", source, event) - if self.formatter and self.memory_retrieval_in_progress: - self.formatter.handle_memory_query_completed( - self.formatter.current_agent_branch, - event.source_type or "memory", - event.query_time_ms, - self.formatter.current_crew_tree, - ) @event_bus.on(MemoryQueryFailedEvent) def on_memory_query_failed(source: Any, event: MemoryQueryFailedEvent) -> None: self._handle_action_event("memory_query_failed", source, event) if self.formatter and self.memory_retrieval_in_progress: self.formatter.handle_memory_query_failed( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, event.error, event.source_type or "memory", ) @@ -347,10 +338,7 @@ class TraceCollectionListener(BaseEventListener): self.memory_save_in_progress = True - self.formatter.handle_memory_save_started( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, - ) + self.formatter.handle_memory_save_started() @event_bus.on(MemorySaveCompletedEvent) def on_memory_save_completed( @@ -364,8 +352,6 @@ class TraceCollectionListener(BaseEventListener): self.memory_save_in_progress = False self.formatter.handle_memory_save_completed( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, event.save_time_ms, event.source_type or "memory", ) @@ -375,10 +361,8 @@ class TraceCollectionListener(BaseEventListener): self._handle_action_event("memory_save_failed", source, event) if self.formatter and self.memory_save_in_progress: self.formatter.handle_memory_save_failed( - self.formatter.current_agent_branch, event.error, event.source_type or "memory", - self.formatter.current_crew_tree, ) @event_bus.on(MemoryRetrievalStartedEvent) @@ -391,10 +375,7 @@ class TraceCollectionListener(BaseEventListener): self.memory_retrieval_in_progress = True - self.formatter.handle_memory_retrieval_started( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, - ) + self.formatter.handle_memory_retrieval_started() @event_bus.on(MemoryRetrievalCompletedEvent) def on_memory_retrieval_completed( @@ -406,8 +387,6 @@ class TraceCollectionListener(BaseEventListener): self.memory_retrieval_in_progress = False self.formatter.handle_memory_retrieval_completed( - self.formatter.current_agent_branch, - self.formatter.current_crew_tree, event.memory_content, event.retrieval_time_ms, ) diff --git a/lib/crewai/src/crewai/events/utils/console_formatter.py b/lib/crewai/src/crewai/events/utils/console_formatter.py index b6a5c7d6b..a395db39f 100644 --- a/lib/crewai/src/crewai/events/utils/console_formatter.py +++ b/lib/crewai/src/crewai/events/utils/console_formatter.py @@ -4,41 +4,23 @@ from typing import Any, ClassVar from rich.console import Console from rich.live import Live from rich.panel import Panel -from rich.syntax import Syntax from rich.text import Text -from rich.tree import Tree class ConsoleFormatter: - current_crew_tree: Tree | None = None - current_task_branch: Tree | None = None - current_agent_branch: Tree | None = None - current_tool_branch: Tree | None = None - current_flow_tree: Tree | None = None - current_method_branch: Tree | None = None - current_lite_agent_branch: Tree | None = None tool_usage_counts: ClassVar[dict[str, int]] = {} - current_reasoning_branch: Tree | None = None - _live_paused: bool = False - current_llm_tool_tree: Tree | None = None - current_a2a_conversation_branch: Tree | str | None = None + current_a2a_turn_count: int = 0 _pending_a2a_message: str | None = None _pending_a2a_agent_role: str | None = None _pending_a2a_turn_number: int | None = None - _a2a_turn_branches: ClassVar[dict[int, Tree]] = {} _current_a2a_agent_name: str | None = None + crew_completion_printed: ClassVar[threading.Event] = threading.Event() def __init__(self, verbose: bool = False): self.console = Console(width=None) self.verbose = verbose - # Live instance to dynamically update a Tree renderable (e.g. the Crew tree) - # When multiple Tree objects are printed sequentially we reuse this Live - # instance so the previous render is replaced instead of writing a new one. - # Once any non-Tree renderable is printed we stop the Live session so the - # final Tree persists on the terminal. - self._live: Live | None = None self._streaming_live: Live | None = None self._is_streaming: bool = False self._just_streamed_final_answer: bool = False @@ -95,98 +77,44 @@ To enable tracing, do any one of these: """Create standardized status content with consistent formatting.""" content = Text() content.append(f"{title}\n", style=f"{status_style} bold") - content.append("Name: ", style="white") + content.append("Name: \n", style="white") content.append(f"{name}\n", style=status_style) for label, value in fields.items(): - content.append(f"{label}: ", style="white") + content.append(f"{label}: \n", style="white") content.append( f"{value}\n", style=fields.get(f"{label}_style", status_style) ) - content.append("Tool Args: ", style="white") - content.append(f"{tool_args}\n", style=status_style) + if tool_args: + content.append("Tool Args: \n", style="white") + content.append(f"{tool_args}\n", style=status_style) return content - def update_tree_label( - self, - tree: Tree, - prefix: str, - name: str, - style: str = "blue", - status: str | None = None, - ) -> None: - """Update tree label with consistent formatting.""" - label = Text() - label.append(f"{prefix} ", style=f"{style} bold") - label.append(name, style=style) - if status: - label.append("\nStatus: ", style="white") - label.append(status, style=f"{style} bold") - tree.label = label - - def add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree: - """Add a node to the tree with consistent styling.""" - return parent.add(Text(text, style=style)) - def print(self, *args: Any, **kwargs: Any) -> None: - """Custom print that replaces consecutive Tree renders. - - * If the argument is a single ``Tree`` instance, we either start a - ``Live`` session (first tree) or update the existing one (subsequent - trees). This results in the tree being rendered in-place instead of - being appended repeatedly to the log. - - * A blank call (no positional arguments) is ignored while a Live - session is active so it does not prematurely terminate the tree - rendering. - - * Any other renderable will terminate the Live session (if one is - active) so the last tree stays on screen and the new content is - printed normally. - """ - - # Case 1: updating / starting live Tree rendering - if len(args) == 1 and isinstance(args[0], Tree): - tree = args[0] - - if self._is_streaming: - return - - if not self._live: - # Start a new Live session for the first tree - self._live = Live(tree, console=self.console, refresh_per_second=4) - self._live.start() - else: - # Update existing Live session - self._live.update(tree, refresh=True) - return # Nothing else to do - - # Case 2: blank line while a live session is running - ignore so we - # don't break the in-place rendering behaviour - if len(args) == 0 and self._live: + """Print to console. Simplified to only handle panel-based output.""" + # Skip blank lines during streaming + if len(args) == 0 and self._is_streaming: return - - # Case 3: printing something other than a Tree → terminate live session - if self._live: - self._live.stop() - self._live = None - - # Finally, pass through to the regular Console.print implementation self.console.print(*args, **kwargs) def pause_live_updates(self) -> None: - """Pause Live session updates to allow for human input without interference.""" - if not self._live_paused: - if self._live: - self._live.stop() - self._live = None - self._live_paused = True + """Pause Live session updates to allow for human input without interference. + + This stops any active streaming Live session to prevent console refresh + interference during HITL (Human-in-the-Loop) user input. + """ + if self._streaming_live: + self._streaming_live.stop() + self._streaming_live = None def resume_live_updates(self) -> None: - """Resume Live session updates after human input is complete.""" - if self._live_paused: - self._live_paused = False + """Resume Live session updates after human input is complete. + + New streaming sessions will be created on-demand when needed. + This method exists for API compatibility with HITL callers. + """ + pass def print_panel( self, content: Text, title: str, style: str = "blue", is_flow: bool = False @@ -201,38 +129,30 @@ To enable tracing, do any one of these: self.print(panel) self.print() - def update_crew_tree( + def handle_crew_status( self, - tree: Tree | None, crew_name: str, source_id: str, status: str = "completed", final_string_output: str = "", ) -> None: - """Handle crew tree updates with consistent formatting.""" - if not self.verbose or tree is None: + """Handle crew completion/failure with panel display.""" + if not self.verbose: return if status == "completed": - prefix, style = "✅ Crew:", "green" + style = "green" title = "Crew Completion" content_title = "Crew Execution Completed" elif status == "failed": - prefix, style = "❌ Crew:", "red" + style = "red" title = "Crew Failure" content_title = "Crew Execution Failed" else: - prefix, style = "🚀 Crew:", "cyan" + style = "cyan" title = "Crew Execution" content_title = "Crew Execution Started" - self.update_tree_label( - tree, - prefix, - crew_name or "Crew", - style, - ) - content = self.create_status_content( content_title, crew_name or "Crew", @@ -243,29 +163,23 @@ To enable tracing, do any one of these: if status == "failed" and final_string_output: content.append("Error:\n", style="white bold") content.append(f"{final_string_output}\n", style="red") - else: + elif final_string_output: content.append(f"Final Output: {final_string_output}\n", style="white") self.print_panel(content, title, style) if status in ["completed", "failed"]: self.crew_completion_printed.set() - - # Show tracing disabled message after crew completion self._show_tracing_disabled_message_if_needed() - def create_crew_tree(self, crew_name: str, source_id: str) -> Tree | None: - """Create and initialize a new crew tree with initial status.""" + def handle_crew_started(self, crew_name: str, source_id: str) -> None: + """Show crew started panel.""" if not self.verbose: - return None + return # Reset the crew completion event for this new crew execution ConsoleFormatter.crew_completion_printed.clear() - tree = Tree( - Text("🚀 Crew: ", style="cyan bold") + Text(crew_name, style="cyan") - ) - content = self.create_status_content( "Crew Execution Started", crew_name, @@ -273,219 +187,85 @@ To enable tracing, do any one of these: ID=source_id, ) - self.print_panel(content, "Crew Execution Started", "cyan") + self.print_panel(content, "🚀 Crew Execution Started", "cyan") - # Set the current_crew_tree attribute directly - self.current_crew_tree = tree - - return tree - - def create_task_branch( - self, crew_tree: Tree | None, task_id: str, task_name: str | None = None - ) -> Tree | None: - """Create and initialize a task branch.""" + def handle_task_started(self, task_id: str, task_name: str | None = None) -> None: + """Show task started panel.""" if not self.verbose: - return None + return - task_content = Text() + content = Text() + display_name = task_name if task_name else task_id - # Display task name if available, otherwise just the ID - if task_name: - task_content.append("📋 Task: ", style="yellow bold") - task_content.append(f"{task_name}", style="yellow bold") - task_content.append(f" (ID: {task_id})", style="yellow dim") - else: - task_content.append(f"📋 Task: {task_id}", style="yellow bold") + content.append("Task Started\n", style="yellow bold") + content.append("Name: ", style="white") + content.append(f"{display_name}\n", style="yellow") + content.append("ID: ", style="white") + content.append(f"{task_id}\n", style="yellow ") - task_content.append("\nStatus: ", style="white") - task_content.append("Executing Task...", style="yellow dim") + self.print_panel(content, "📋 Task Started", "yellow") - task_branch = None - if crew_tree: - task_branch = crew_tree.add(task_content) - self.print(crew_tree) - else: - self.print_panel(task_content, "Task Started", "yellow") - - self.print() - - # Set the current_task_branch attribute directly - self.current_task_branch = task_branch - - return task_branch - - def update_task_status( + def handle_task_status( self, - crew_tree: Tree | None, task_id: str, agent_role: str, status: str = "completed", task_name: str | None = None, ) -> None: - """Update task status in the tree.""" - if not self.verbose or crew_tree is None: + """Show task completion/failure panel.""" + if not self.verbose: return if status == "completed": style = "green" - status_text = "✅ Completed" - panel_title = "Task Completion" + panel_title = "📋 Task Completion" else: style = "red" - status_text = "❌ Failed" - panel_title = "Task Failure" + panel_title = "📋 Task Failure" - # Update tree label - for branch in crew_tree.children: - if str(task_id) in str(branch.label): - # Build label without introducing stray blank lines - task_content = Text() - # First line: Task ID/name - if task_name: - task_content.append("📋 Task: ", style=f"{style} bold") - task_content.append(f"{task_name}", style=f"{style} bold") - task_content.append(f" (ID: {task_id})", style=f"{style} dim") - else: - task_content.append(f"📋 Task: {task_id}", style=f"{style} bold") - - # Second line: Assigned to - task_content.append("\nAssigned to: ", style="white") - task_content.append(agent_role, style=style) - - # Third line: Status - task_content.append("\nStatus: ", style="white") - task_content.append(status_text, style=f"{style} bold") - branch.label = task_content - self.print(crew_tree) - break - - # Show status panel display_name = task_name if task_name else str(task_id) content = self.create_status_content( f"Task {status.title()}", display_name, style, Agent=agent_role ) self.print_panel(content, panel_title, style) - def create_agent_branch( - self, task_branch: Tree | None, agent_role: str, crew_tree: Tree | None - ) -> Tree | None: - """Create and initialize an agent branch.""" - if not self.verbose or not task_branch or not crew_tree: - return None - - # Instead of creating a separate Agent node, we treat the task branch - # itself as the logical agent branch so that Reasoning/Tool nodes are - # nested under the task without an extra visual level. - - # Store the task branch as the current_agent_branch for future nesting. - self.current_agent_branch = task_branch - - # No additional tree modification needed; return the task branch so - # caller logic remains unchanged. - return task_branch - - def update_agent_status( - self, - agent_branch: Tree | None, - agent_role: str, - crew_tree: Tree | None, - status: str = "completed", - ) -> None: - """Update agent status in the tree.""" - # We no longer render a separate agent branch, so this method simply - # updates the stored branch reference (already the task branch) without - # altering the tree. Keeping it a no-op avoids duplicate status lines. - return - - def create_flow_tree(self, flow_name: str, flow_id: str) -> Tree | None: - """Create and initialize a flow tree.""" + def handle_flow_created(self, flow_name: str, flow_id: str) -> None: + """Show flow started panel.""" content = self.create_status_content( "Starting Flow Execution", flow_name, "blue", ID=flow_id ) - self.print_panel(content, "Flow Execution", "blue", is_flow=True) + self.print_panel(content, "🌊 Flow Execution", "blue", is_flow=True) - # Create initial tree with flow ID - flow_label = Text() - flow_label.append("🌊 Flow: ", style="blue bold") - flow_label.append(flow_name, style="blue") - flow_label.append("\nID: ", style="white") - flow_label.append(flow_id, style="blue") + def handle_flow_started(self, flow_name: str, flow_id: str) -> None: + """Show flow started panel.""" + content = Text() + content.append("Flow Started\n", style="blue bold") + content.append("Name: ", style="white") + content.append(f"{flow_name}\n", style="blue") + content.append("ID: ", style="white") + content.append(f"{flow_id}\n", style="blue") - flow_tree = Tree(flow_label) - self.add_tree_node(flow_tree, "✨ Created", "blue") - self.add_tree_node(flow_tree, "✅ Initialization Complete", "green") + self.print_panel(content, "🌊 Flow Started", "blue", is_flow=True) - return flow_tree - - def start_flow(self, flow_name: str, flow_id: str) -> Tree | None: - """Initialize or update a flow execution tree.""" - if self.current_flow_tree is not None: - for child in self.current_flow_tree.children: - if "Starting Flow" in str(child.label): - child.label = Text("🚀 Flow Started", style="green") - break - return self.current_flow_tree - - flow_tree = Tree("") - flow_label = Text() - flow_label.append("🌊 Flow: ", style="blue bold") - flow_label.append(flow_name, style="blue") - flow_label.append("\nID: ", style="white") - flow_label.append(flow_id, style="blue") - flow_tree.label = flow_label - - self.add_tree_node(flow_tree, "🧠 Starting Flow...", "yellow") - - self.print(flow_tree) - self.print() - - self.current_flow_tree = flow_tree - return flow_tree - - def update_flow_status( + def handle_flow_status( self, - flow_tree: Tree | None, flow_name: str, flow_id: str, status: str = "completed", ) -> None: - """Update flow status in the tree.""" - if flow_tree is None: - return - - # Determine status-specific labels and styles + """Show flow status panel.""" if status == "completed": - label_prefix = "✅ Flow Finished:" style = "green" - node_text = "✅ Flow Completed" content_text = "Flow Execution Completed" - panel_title = "Flow Completion" + panel_title = "✅ Flow Completion" elif status == "paused": - label_prefix = "⏳ Flow Paused:" style = "cyan" - node_text = "⏳ Waiting for Human Feedback" content_text = "Flow Paused - Waiting for Feedback" - panel_title = "Flow Paused" + panel_title = "⏳ Flow Paused" else: - label_prefix = "❌ Flow Failed:" style = "red" - node_text = "❌ Flow Failed" content_text = "Flow Execution Failed" - panel_title = "Flow Failure" - - # Update main flow label - self.update_tree_label( - flow_tree, - label_prefix, - flow_name, - style, - ) - - # Update initialization node status - for child in flow_tree.children: - if "Starting Flow" in str(child.label): - child.label = Text(node_text, style=style) - break + panel_title = "❌ Flow Failure" content = self.create_status_content( content_text, @@ -493,405 +273,141 @@ To enable tracing, do any one of these: style, ID=flow_id, ) - self.print(flow_tree) - self.print_panel(content, panel_title, style) + self.print_panel(content, panel_title, style, is_flow=True) - def update_method_status( + def handle_method_status( self, - method_branch: Tree | None, - flow_tree: Tree | None, method_name: str, status: str = "running", - ) -> Tree | None: - """Update method status in the flow tree.""" - if not flow_tree: - return None + ) -> None: + """Show method status panel.""" + if not self.verbose: + return if status == "running": - prefix, style = "🔄 Running:", "yellow" + style = "yellow" + panel_title = "🔄 Flow Method Running" elif status == "completed": - prefix, style = "✅ Completed:", "green" - for child in flow_tree.children: - if "Starting Flow" in str(child.label): - child.label = Text("Flow Method Step", style="white") - break + style = "green" + panel_title = "✅ Flow Method Completed" elif status == "paused": - prefix, style = "⏳ Paused:", "cyan" - for child in flow_tree.children: - if "Starting Flow" in str(child.label): - child.label = Text("⏳ Waiting for Feedback", style="cyan") - break + style = "cyan" + panel_title = "⏳ Flow Method Paused" else: - prefix, style = "❌ Failed:", "red" - for child in flow_tree.children: - if "Starting Flow" in str(child.label): - child.label = Text("❌ Flow Step Failed", style="red") - break + style = "red" + panel_title = "❌ Flow Method Failed" - if method_branch is not None: - if method_branch in flow_tree.children: - method_branch.label = Text(prefix, style=f"{style} bold") + Text( - f" {method_name}", style=style - ) - self.print(flow_tree) - self.print() - return method_branch + content = Text() + content.append(f"Method: {method_name}\n", style=f"{style} bold") + content.append("Status: ", style="white") + content.append(f"{status.title()}\n", style=style) - for branch in flow_tree.children: - label_str = str(branch.label) - if f" {method_name}" in label_str and ( - "Running:" in label_str - or "Completed:" in label_str - or "Failed:" in label_str - ): - method_branch = branch - break - - if method_branch is None: - method_branch = flow_tree.add("") - - method_branch.label = Text(prefix, style=f"{style} bold") + Text( - f" {method_name}", style=style - ) - - self.print(flow_tree) - self.print() - - return method_branch - - def get_llm_tree(self, tool_name: str) -> Tree: - text = Text() - text.append(f"🔧 Using {tool_name} from LLM available_function", style="yellow") - - tree = self.current_flow_tree or self.current_crew_tree - - if tree: - tree.add(text) - - return tree or Tree(text) + self.print_panel(content, panel_title, style, is_flow=True) def handle_llm_tool_usage_started( self, tool_name: str, tool_args: dict[str, Any] | str, - ) -> Tree: - # Create status content for the tool usage + ) -> None: + """Handle LLM tool usage started with panel display.""" content = self.create_status_content( "Tool Usage Started", tool_name, Status="In Progress", tool_args=tool_args ) - - # Create and print the panel - self.print_panel(content, "Tool Usage", "green") - self.print() - - # Still return the tree for compatibility with existing code - return self.get_llm_tree(tool_name) + self.print_panel(content, "🔧 LLM Tool Usage", "yellow") def handle_llm_tool_usage_finished( self, tool_name: str, ) -> None: - tree = self.get_llm_tree(tool_name) - self.add_tree_node(tree, "✅ Tool Usage Completed", "green") - self.print(tree) - self.print() + """Handle LLM tool usage finished with panel display.""" + content = Text() + content.append("Tool Usage Completed\n", style="green bold") + content.append("Tool: ", style="white") + content.append(f"{tool_name}\n", style="green") + + self.print_panel(content, "✅ LLM Tool Completed", "green") def handle_llm_tool_usage_error( self, tool_name: str, error: str, ) -> None: - tree = self.get_llm_tree(tool_name) - self.add_tree_node(tree, "❌ Tool Usage Failed", "red") - self.print(tree) - self.print() - + """Handle LLM tool usage error with panel display.""" error_content = self.create_status_content( "Tool Usage Failed", tool_name, "red", Error=error ) - self.print_panel(error_content, "Tool Error", "red") + self.print_panel(error_content, "❌ LLM Tool Error", "red") def handle_tool_usage_started( self, - agent_branch: Tree | None, tool_name: str, - crew_tree: Tree | None, tool_args: dict[str, Any] | str = "", - ) -> Tree | None: - """Handle tool usage started event.""" + run_attempts: int | None = None, + ) -> None: + """Handle tool usage started event with panel display.""" if not self.verbose: - return None - - # Parent for tool usage: LiteAgent > Agent > Task - branch_to_use = ( - self.current_lite_agent_branch or agent_branch or self.current_task_branch - ) - - # Render full crew tree when available for consistent live updates - tree_to_use = self.current_crew_tree or crew_tree or branch_to_use - - if branch_to_use is None or tree_to_use is None: - # If we don't have a valid branch, default to crew_tree if provided - if crew_tree is not None: - branch_to_use = tree_to_use = crew_tree - else: - return None + return # Update tool usage count self.tool_usage_counts[tool_name] = self.tool_usage_counts.get(tool_name, 0) + 1 + iteration = self.tool_usage_counts[tool_name] - # Find or create tool node - tool_branch = self.current_tool_branch - if tool_branch is None: - tool_branch = branch_to_use.add("") - self.current_tool_branch = tool_branch + content = Text() + content.append("Tool: ", style="white") + content.append(f"{tool_name}\n", style="yellow bold") - # Update label with current count - self.update_tree_label( - tool_branch, - "🔧", - f"Using {tool_name} ({self.tool_usage_counts[tool_name]})", - "yellow", - ) + if tool_args: + content.append("Args: ", style="white") + args_str = ( + str(tool_args)[:200] + "..." + if len(str(tool_args)) > 200 + else str(tool_args) + ) + content.append(f"{args_str}\n", style="yellow ") - # Print updated tree immediately - self.print(tree_to_use) - self.print() - - return tool_branch - - def handle_tool_usage_finished( - self, - tool_branch: Tree | None, - tool_name: str, - crew_tree: Tree | None, - ) -> None: - """Handle tool usage finished event.""" - if not self.verbose or tool_branch is None: - return - - # Decide which tree to render: prefer full crew tree, else parent branch - tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch - if tree_to_use is None: - return - - # Update the existing tool node's label - self.update_tree_label( - tool_branch, - "🔧", - f"Used {tool_name} ({self.tool_usage_counts[tool_name]})", - "green", - ) - - # Clear the current tool branch as we're done with it - self.current_tool_branch = None - - # Only print if we have a valid tree and the tool node is still in it - if isinstance(tree_to_use, Tree) and tool_branch in tree_to_use.children: - self.print(tree_to_use) - self.print() + self.print_panel(content, f"🔧 Tool Execution Started (#{iteration})", "yellow") def handle_tool_usage_error( self, - tool_branch: Tree | None, tool_name: str, error: str, - crew_tree: Tree | None, + run_attempts: int | None = None, ) -> None: - """Handle tool usage error event.""" + """Handle tool usage error event with panel display.""" if not self.verbose: return - # Decide which tree to render: prefer full crew tree, else parent branch - tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch + iteration = self.tool_usage_counts.get(tool_name, 1) - if tool_branch: - self.update_tree_label( - tool_branch, - "🔧 Failed", - f"{tool_name} ({self.tool_usage_counts[tool_name]})", - "red", - ) - if tree_to_use: - self.print(tree_to_use) - self.print() + content = Text() + content.append("Tool Failed\n", style="red bold") + content.append("Tool: ", style="white") + content.append(f"{tool_name}\n", style="red bold") + content.append("Iteration: ", style="white") + content.append(f"{iteration}\n", style="red") + if run_attempts is not None: + content.append("Attempt: ", style="white") + content.append(f"{run_attempts}\n", style="red") + content.append("Error: ", style="white") + content.append(f"{error}\n", style="red") - # Show error panel - error_content = self.create_status_content( - "Tool Usage Failed", tool_name, "red", Error=error - ) - self.print_panel(error_content, "Tool Error", "red") + self.print_panel(content, f"🔧 Tool Error (#{iteration})", "red") - def handle_llm_call_started( - self, - agent_branch: Tree | None, - crew_tree: Tree | None, - ) -> Tree | None: - """Handle LLM call started event.""" - if not self.verbose: - return None - - # Parent for tool usage: LiteAgent > Agent > Task - branch_to_use = ( - self.current_lite_agent_branch or agent_branch or self.current_task_branch - ) - - # Render full crew tree when available for consistent live updates - tree_to_use = self.current_crew_tree or crew_tree or branch_to_use - - if branch_to_use is None or tree_to_use is None: - # If we don't have a valid branch, default to crew_tree if provided - if crew_tree is not None: - branch_to_use = tree_to_use = crew_tree - else: - return None - - # Only add thinking status if we don't have a current tool branch - # or if the current tool branch is not a thinking node - should_add_thinking = self.current_tool_branch is None or "Thinking" not in str( - self.current_tool_branch.label - ) - - if should_add_thinking: - tool_branch = branch_to_use.add("") - self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue") - self.current_tool_branch = tool_branch - self.print(tree_to_use) - self.print() - return tool_branch - - # Return the existing tool branch if it's already a thinking node - return self.current_tool_branch - - def handle_llm_call_completed( - self, - tool_branch: Tree | None, - agent_branch: Tree | None, - crew_tree: Tree | None, - ) -> None: - """Handle LLM call completed event.""" + def handle_llm_call_failed(self, error: str) -> None: + """Handle LLM call failed event with panel display.""" if not self.verbose: return - # Decide which tree to render: prefer full crew tree, else parent branch - tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch - if tree_to_use is None: - return - - # Try to remove the thinking status node - first try the provided tool_branch - thinking_branch_to_remove = None - removed = False - - # Method 1: Use the provided tool_branch if it's a thinking/streaming node - if tool_branch is not None and ( - "Thinking" in str(tool_branch.label) - or "Streaming" in str(tool_branch.label) - ): - thinking_branch_to_remove = tool_branch - - # Method 2: Fallback - search for any thinking/streaming node if tool_branch is None or not found - if thinking_branch_to_remove is None: - parents = [ - self.current_lite_agent_branch, - self.current_agent_branch, - self.current_task_branch, - tree_to_use, - ] - for parent in parents: - if isinstance(parent, Tree): - for child in parent.children: - label_str = str(child.label) - if "Thinking" in label_str or "Streaming" in label_str: - thinking_branch_to_remove = child - break - if thinking_branch_to_remove: - break - - # Remove the thinking node if found - if thinking_branch_to_remove: - parents = [ - self.current_lite_agent_branch, - self.current_agent_branch, - self.current_task_branch, - tree_to_use, - ] - for parent in parents: - if ( - isinstance(parent, Tree) - and thinking_branch_to_remove in parent.children - ): - parent.children.remove(thinking_branch_to_remove) - removed = True - break - - # Clear pointer if we just removed the current_tool_branch - if self.current_tool_branch is thinking_branch_to_remove: - self.current_tool_branch = None - - if removed: - self.print(tree_to_use) - self.print() - - def handle_llm_call_failed( - self, tool_branch: Tree | None, error: str, crew_tree: Tree | None - ) -> None: - """Handle LLM call failed event.""" - if not self.verbose: - return - - # Decide which tree to render: prefer full crew tree, else parent branch - tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch - - # Find the thinking branch to update (similar to completion logic) - thinking_branch_to_update = None - - if tool_branch is not None and ( - "Thinking" in str(tool_branch.label) - or "Streaming" in str(tool_branch.label) - ): - thinking_branch_to_update = tool_branch - - # Method 2: Fallback - search for any thinking/streaming node if tool_branch is None or not found - if thinking_branch_to_update is None: - parents = [ - self.current_lite_agent_branch, - self.current_agent_branch, - self.current_task_branch, - tree_to_use, - ] - for parent in parents: - if isinstance(parent, Tree): - for child in parent.children: - label_str = str(child.label) - if "Thinking" in label_str or "Streaming" in label_str: - thinking_branch_to_update = child - break - if thinking_branch_to_update: - break - - # Update the thinking branch to show failure - if thinking_branch_to_update: - thinking_branch_to_update.label = Text("❌ LLM Failed", style="red bold") - # Clear the current_tool_branch reference - if self.current_tool_branch is thinking_branch_to_update: - self.current_tool_branch = None - if tree_to_use: - self.print(tree_to_use) - self.print() - - # Show error panel error_content = Text() - error_content.append("❌ LLM Call Failed\n", style="red bold") + error_content.append("LLM Call Failed\n", style="red bold") error_content.append("Error: ", style="white") error_content.append(str(error), style="red") - self.print_panel(error_content, "LLM Error", "red") + self.print_panel(error_content, "❌ LLM Error", "red") def handle_llm_stream_chunk( self, - chunk: str, accumulated_text: str, - crew_tree: Tree | None, call_type: Any = None, ) -> None: """Handle LLM stream chunk event - display streaming text in a panel. @@ -899,7 +415,7 @@ To enable tracing, do any one of these: Args: chunk: The new chunk of text received. accumulated_text: All text accumulated so far. - crew_tree: The current crew tree for rendering. + crew_tree: Unused (kept for API compatibility). call_type: The type of LLM call (LLM_CALL or TOOL_CALL). """ if not self.verbose: @@ -908,10 +424,6 @@ To enable tracing, do any one of these: self._is_streaming = True self._last_stream_call_type = call_type - if self._live: - self._live.stop() - self._live = None - display_text = accumulated_text max_lines = 20 lines = display_text.split("\n") @@ -966,65 +478,29 @@ To enable tracing, do any one of these: def handle_crew_test_started( self, crew_name: str, source_id: str, n_iterations: int - ) -> Tree | None: - """Handle crew test started event.""" - if not self.verbose: - return None - - # Create initial panel - content = Text() - content.append("🧪 Starting Crew Test\n\n", style="blue bold") - content.append("Crew: ", style="white") - content.append(f"{crew_name}\n", style="blue") - content.append("ID: ", style="white") - content.append(str(source_id), style="blue") - content.append("\nIterations: ", style="white") - content.append(str(n_iterations), style="yellow") - - self.print() - self.print_panel(content, "Test Execution", "blue") - self.print() - - # Create and display the test tree - test_label = Text() - test_label.append("🧪 Test: ", style="blue bold") - test_label.append(crew_name or "Crew", style="blue") - test_label.append("\nStatus: ", style="white") - test_label.append("In Progress", style="yellow") - - test_tree = Tree(test_label) - self.add_tree_node(test_tree, "🔄 Running tests...", "yellow") - - self.print(test_tree) - self.print() - return test_tree - - def handle_crew_test_completed( - self, flow_tree: Tree | None, crew_name: str ) -> None: - """Handle crew test completed event.""" + """Handle crew test started event with panel display.""" if not self.verbose: return - if flow_tree: - # Update test tree label to show completion - test_label = Text() - test_label.append("✅ Test: ", style="green bold") - test_label.append(crew_name or "Crew", style="green") - test_label.append("\nStatus: ", style="white") - test_label.append("Completed", style="green bold") - flow_tree.label = test_label + content = Text() + content.append("Starting Crew Test\n", style="blue bold") + content.append("Crew: ", style="white") + content.append(f"{crew_name}\n", style="blue") + content.append("ID: ", style="white") + content.append(f"{source_id}\n", style="blue") + content.append("Iterations: ", style="white") + content.append(f"{n_iterations}\n", style="yellow") + content.append("Status: ", style="white") + content.append("Running...", style="yellow") - # Update the running tests node - for child in flow_tree.children: - if "Running tests" in str(child.label): - child.label = Text("✅ Tests completed successfully", style="green") - break + self.print_panel(content, "🧪 Test Execution Started", "blue") - self.print(flow_tree) - self.print() + def handle_crew_test_completed(self, crew_name: str) -> None: + """Handle crew test completed event with panel display.""" + if not self.verbose: + return - # Create completion panel completion_content = Text() completion_content.append("Test Execution Completed\n", style="green bold") completion_content.append("Crew: ", style="white") @@ -1090,68 +566,50 @@ To enable tracing, do any one of these: self.print_panel(failure_content, "Test Failure", "red") self.print() - def create_lite_agent_branch(self, lite_agent_role: str) -> Tree | None: - """Create and initialize a lite agent branch.""" + def create_lite_agent_branch(self, lite_agent_role: str) -> None: + """Show lite agent started panel.""" if not self.verbose: - return None + return - # Create initial tree for LiteAgent if it doesn't exist - if not self.current_lite_agent_branch: - lite_agent_label = Text() - lite_agent_label.append("🤖 LiteAgent: ", style="cyan bold") - lite_agent_label.append(lite_agent_role, style="cyan") - lite_agent_label.append("\nStatus: ", style="white") - lite_agent_label.append("In Progress", style="yellow") + content = Text() + content.append("LiteAgent Started\n", style="cyan bold") + content.append("Role: ", style="white") + content.append(f"{lite_agent_role}\n", style="cyan") + content.append("Status: ", style="white") + content.append("In Progress\n", style="yellow") - lite_agent_tree = Tree(lite_agent_label) - self.current_lite_agent_branch = lite_agent_tree - self.print(lite_agent_tree) - self.print() - - return self.current_lite_agent_branch + self.print_panel(content, "🤖 LiteAgent Started", "cyan") def update_lite_agent_status( self, - lite_agent_branch: Tree | None, lite_agent_role: str, status: str = "completed", **fields: dict[str, Any], ) -> None: - """Update lite agent status in the tree.""" - if not self.verbose or lite_agent_branch is None: + """Show lite agent status panel.""" + if not self.verbose: return - # Determine style based on status if status == "completed": - prefix, style = "✅ LiteAgent:", "green" - status_text = "Completed" - title = "LiteAgent Completion" + style = "green" + title = "✅ LiteAgent Completed" elif status == "failed": - prefix, style = "❌ LiteAgent:", "red" - status_text = "Failed" - title = "LiteAgent Error" + style = "red" + title = "❌ LiteAgent Failed" else: - prefix, style = "🤖 LiteAgent:", "yellow" - status_text = "In Progress" - title = "LiteAgent Status" + style = "yellow" + title = "🤖 LiteAgent Status" - # Update the tree label - lite_agent_label = Text() - lite_agent_label.append(f"{prefix} ", style=f"{style} bold") - lite_agent_label.append(lite_agent_role, style=style) - lite_agent_label.append("\nStatus: ", style="white") - lite_agent_label.append(status_text, style=f"{style} bold") - lite_agent_branch.label = lite_agent_label + content = Text() + content.append(f"LiteAgent {status.title()}\n", style=f"{style} bold") + content.append("Role: ", style="white") + content.append(f"{lite_agent_role}\n", style=style) - self.print(lite_agent_branch) - self.print() + for field_name, field_value in fields.items(): + content.append(f"{field_name}: ", style="white") + content.append(f"{field_value}\n", style=style) - # Show status panel if additional fields are provided - if fields: - content = self.create_status_content( - f"LiteAgent {status.title()}", lite_agent_role, style, **fields - ) - self.print_panel(content, title, style) + self.print_panel(content, title, style) def handle_lite_agent_execution( self, @@ -1160,346 +618,174 @@ To enable tracing, do any one of these: error: Any = None, **fields: dict[str, Any], ) -> None: - """Handle lite agent execution events with consistent formatting.""" + """Handle lite agent execution events with panel display.""" if not self.verbose: return if status == "started": - # Create or get the LiteAgent branch - lite_agent_branch = self.create_lite_agent_branch(lite_agent_role) - if lite_agent_branch and fields: - # Show initial status panel + self.create_lite_agent_branch(lite_agent_role) + if fields: content = self.create_status_content( "LiteAgent Session Started", lite_agent_role, "cyan", **fields ) - self.print_panel(content, "LiteAgent Started", "cyan") + self.print_panel(content, "🤖 LiteAgent Started", "cyan") else: - # Update existing LiteAgent branch if error: fields["Error"] = error - self.update_lite_agent_status( - self.current_lite_agent_branch, lite_agent_role, status, **fields - ) + self.update_lite_agent_status(lite_agent_role, status, **fields) def handle_knowledge_retrieval_started( self, - agent_branch: Tree | None, - crew_tree: Tree | None, - ) -> Tree | None: - """Handle knowledge retrieval started event.""" + ) -> None: + """Handle knowledge retrieval started event with panel display.""" if not self.verbose: - return None + return - branch_to_use = agent_branch or self.current_lite_agent_branch - tree_to_use = branch_to_use or crew_tree + content = Text() + content.append("Knowledge Retrieval Started\n", style="blue bold") + content.append("Status: ", style="white") + content.append("Retrieving...\n", style="blue") - if branch_to_use is None or tree_to_use is None: - # If we don't have a valid branch, default to crew_tree if provided - if crew_tree is not None: - branch_to_use = tree_to_use = crew_tree - else: - return None - - knowledge_branch = branch_to_use.add("") - self.update_tree_label( - knowledge_branch, "🔍", "Knowledge Retrieval Started", "blue" - ) - - self.print(tree_to_use) - self.print() - return knowledge_branch + self.print_panel(content, "🔍 Knowledge Retrieval", "blue") def handle_knowledge_retrieval_completed( self, - agent_branch: Tree | None, - crew_tree: Tree | None, retrieved_knowledge: Any, + search_query: str, ) -> None: - """Handle knowledge retrieval completed event.""" + """Handle knowledge retrieval completed event with panel display.""" if not self.verbose: return - branch_to_use = self.current_lite_agent_branch or agent_branch - tree_to_use = branch_to_use or crew_tree - - if branch_to_use is None and tree_to_use is not None: - branch_to_use = tree_to_use - - if branch_to_use is None or tree_to_use is None: - if retrieved_knowledge: - knowledge_text = str(retrieved_knowledge) - if len(knowledge_text) > 500: - knowledge_text = knowledge_text[:497] + "..." - - knowledge_panel = Panel( - Text(knowledge_text, style="white"), - title="📚 Retrieved Knowledge", - border_style="green", - padding=(1, 2), - ) - self.print(knowledge_panel) - self.print() - return - - knowledge_branch_found = False - for child in branch_to_use.children: - if "Knowledge Retrieval Started" in str(child.label): - self.update_tree_label( - child, "✅", "Knowledge Retrieval Completed", "green" - ) - knowledge_branch_found = True - break - - if not knowledge_branch_found: - for child in branch_to_use.children: - if ( - "Knowledge Retrieval" in str(child.label) - and "Started" not in str(child.label) - and "Completed" not in str(child.label) - ): - self.update_tree_label( - child, "✅", "Knowledge Retrieval Completed", "green" - ) - knowledge_branch_found = True - break - - if not knowledge_branch_found: - knowledge_branch = branch_to_use.add("") - self.update_tree_label( - knowledge_branch, "✅", "Knowledge Retrieval Completed", "green" - ) - - self.print(tree_to_use) - + content = Text() + content.append("Search Query:\n", style="white") + content.append(f"{search_query}\n", style="green") + content.append("Knowledge Retrieved: \n", style="white") if retrieved_knowledge: knowledge_text = str(retrieved_knowledge) if len(knowledge_text) > 500: knowledge_text = knowledge_text[:497] + "..." + content.append(f"{knowledge_text}\n", style="green ") + else: + content.append("No knowledge retrieved\n", style="yellow") - knowledge_panel = Panel( - Text(knowledge_text, style="white"), - title="📚 Retrieved Knowledge", - border_style="green", - padding=(1, 2), - ) - self.print(knowledge_panel) - - self.print() + self.print_panel(content, "📚 Knowledge Retrieved", "green") def handle_knowledge_query_started( self, - agent_branch: Tree | None, task_prompt: str, - crew_tree: Tree | None, ) -> None: - """Handle knowledge query generated event.""" + """Handle knowledge query started event with panel display.""" if not self.verbose: return - branch_to_use = self.current_lite_agent_branch or agent_branch - tree_to_use = branch_to_use or crew_tree - if branch_to_use is None or tree_to_use is None: - return - - query_branch = branch_to_use.add("") - self.update_tree_label( - query_branch, "🔎", f"Query: {task_prompt[:50]}...", "yellow" + content = Text() + content.append("Knowledge Query Started\n", style="yellow bold") + content.append("Query: ", style="white") + query_preview = ( + task_prompt[:100] + "..." if len(task_prompt) > 100 else task_prompt ) + content.append(f"{query_preview}\n", style="yellow") - self.print(tree_to_use) - self.print() + self.print_panel(content, "🔎 Knowledge Query", "yellow") def handle_knowledge_query_failed( self, - agent_branch: Tree | None, error: str, - crew_tree: Tree | None, ) -> None: - """Handle knowledge query failed event.""" + """Handle knowledge query failed event with panel display.""" if not self.verbose: return - tree_to_use = self.current_lite_agent_branch or crew_tree - branch_to_use = self.current_lite_agent_branch or agent_branch - - if branch_to_use and tree_to_use: - query_branch = branch_to_use.add("") - self.update_tree_label(query_branch, "❌", "Knowledge Query Failed", "red") - self.print(tree_to_use) - self.print() - - # Show error panel error_content = self.create_status_content( "Knowledge Query Failed", "Query Error", "red", Error=error ) - self.print_panel(error_content, "Knowledge Error", "red") + self.print_panel(error_content, "❌ Knowledge Error", "red") - def handle_knowledge_query_completed( - self, - agent_branch: Tree | None, - crew_tree: Tree | None, - ) -> None: - """Handle knowledge query completed event.""" + def handle_knowledge_query_completed(self) -> None: + """Handle knowledge query completed event with panel display.""" if not self.verbose: return - branch_to_use = self.current_lite_agent_branch or agent_branch - tree_to_use = branch_to_use or crew_tree + content = Text() + content.append("Knowledge Query Completed\n", style="green bold") - if branch_to_use is None or tree_to_use is None: - return - - query_branch = branch_to_use.add("") - self.update_tree_label(query_branch, "✅", "Knowledge Query Completed", "green") - - self.print(tree_to_use) - self.print() + self.print_panel(content, "✅ Knowledge Query Complete", "green") def handle_knowledge_search_query_failed( self, - agent_branch: Tree | None, error: str, - crew_tree: Tree | None, ) -> None: - """Handle knowledge search query failed event.""" + """Handle knowledge search query failed event with panel display.""" if not self.verbose: return - tree_to_use = self.current_lite_agent_branch or crew_tree - branch_to_use = self.current_lite_agent_branch or agent_branch - - if branch_to_use and tree_to_use: - query_branch = branch_to_use.add("") - self.update_tree_label(query_branch, "❌", "Knowledge Search Failed", "red") - self.print(tree_to_use) - self.print() - - # Show error panel error_content = self.create_status_content( "Knowledge Search Failed", "Search Error", "red", Error=error ) - self.print_panel(error_content, "Search Error", "red") + self.print_panel(error_content, "❌ Search Error", "red") # ----------- AGENT REASONING EVENTS ----------- def handle_reasoning_started( self, - agent_branch: Tree | None, attempt: int, - crew_tree: Tree | None, - ) -> Tree | None: - """Handle agent reasoning started (or refinement) event.""" + ) -> None: + """Handle agent reasoning started event with panel display.""" if not self.verbose: - return None + return - # Prefer LiteAgent > Agent > Task branch as the parent for reasoning - branch_to_use = ( - self.current_lite_agent_branch or agent_branch or self.current_task_branch + content = Text() + content.append("Reasoning Started\n", style="blue bold") + content.append("Attempt: ", style="white") + content.append(f"{attempt}\n", style="blue") + content.append("Status: ", style="white") + content.append("Thinking...\n", style="blue") + + panel_title = ( + f"🧠 Reasoning (Attempt #{attempt})" if attempt > 1 else "🧠 Reasoning" ) - - # We always want to render the full crew tree when possible so the - # Live view updates coherently. Fallbacks: crew tree → branch itself. - tree_to_use = self.current_crew_tree or crew_tree or branch_to_use - - if branch_to_use is None: - # Nothing to attach to, abort - return None - - # Reuse existing reasoning branch if present - reasoning_branch = self.current_reasoning_branch - if reasoning_branch is None: - reasoning_branch = branch_to_use.add("") - self.current_reasoning_branch = reasoning_branch - - # Build label text depending on attempt - status_text = ( - f"Reasoning (Attempt {attempt})" if attempt > 1 else "Reasoning..." - ) - self.update_tree_label(reasoning_branch, "🧠", status_text, "blue") - - self.print(tree_to_use) - self.print() - - return reasoning_branch + self.print_panel(content, panel_title, "blue") def handle_reasoning_completed( self, plan: str, ready: bool, - crew_tree: Tree | None, ) -> None: - """Handle agent reasoning completed event.""" + """Handle agent reasoning completed event with panel display.""" if not self.verbose: return - reasoning_branch = self.current_reasoning_branch - tree_to_use = ( - self.current_crew_tree - or self.current_lite_agent_branch - or self.current_task_branch - or crew_tree - ) - style = "green" if ready else "yellow" - status_text = ( - "Reasoning Completed" if ready else "Reasoning Completed (Not Ready)" - ) + status_text = "Ready" if ready else "Not Ready" - if reasoning_branch is not None: - self.update_tree_label(reasoning_branch, "✅", status_text, style) + content = Text() + content.append("Reasoning Completed\n", style=f"{style} bold") + content.append("Status: ", style="white") + content.append(f"{status_text}\n", style=style) - if tree_to_use is not None: - self.print(tree_to_use) - - # Show plan in a panel (trim very long plans) if plan: - plan_panel = Panel( - Text(plan, style="white"), - title="🧠 Reasoning Plan", - border_style=style, - padding=(1, 2), - ) - self.print(plan_panel) + plan_preview = plan[:500] + "..." if len(plan) > 500 else plan + content.append("Plan: ", style="white") + content.append(f"{plan_preview}\n", style=style) - self.print() - - # Clear stored branch after completion - self.current_reasoning_branch = None + self.print_panel(content, "✅ Reasoning Complete", style) def handle_reasoning_failed( self, error: str, - crew_tree: Tree | None, ) -> None: - """Handle agent reasoning failure event.""" + """Handle agent reasoning failure event with panel display.""" if not self.verbose: return - reasoning_branch = self.current_reasoning_branch - tree_to_use = ( - self.current_crew_tree - or self.current_lite_agent_branch - or self.current_task_branch - or crew_tree - ) - - if reasoning_branch is not None: - self.update_tree_label(reasoning_branch, "❌", "Reasoning Failed", "red") - - if tree_to_use is not None: - self.print(tree_to_use) - - # Error panel error_content = self.create_status_content( "Reasoning Failed", "Error", "red", Error=error, ) - self.print_panel(error_content, "Reasoning Error", "red") - - # Clear stored branch after failure - self.current_reasoning_branch = None + self.print_panel(error_content, "❌ Reasoning Error", "red") # ----------- AGENT LOGGING EVENTS ----------- @@ -1545,74 +831,12 @@ To enable tracing, do any one of these: return import json - import re from crewai.agents.parser import AgentAction, AgentFinish agent_role = agent_role.partition("\n")[0] if isinstance(formatted_answer, AgentAction): - thought = re.sub(r"\n+", "\n", formatted_answer.thought) - formatted_json = json.dumps( - json.loads(formatted_answer.tool_input), - indent=2, - ensure_ascii=False, - ) - - # Create content for the action panel - content = Text() - content.append("Agent: ", style="white") - content.append(f"{agent_role}\n\n", style="bright_green bold") - - if thought and thought != "": - content.append("Thought: ", style="white") - content.append(f"{thought}\n\n", style="bright_green") - - content.append("Using Tool: ", style="white") - content.append(f"{formatted_answer.tool}\n\n", style="bright_green bold") - - content.append("Tool Input:\n", style="white") - - # Create a syntax-highlighted JSON code block - json_syntax = Syntax( - formatted_json, - "json", - theme="monokai", - line_numbers=False, - background_color="default", - word_wrap=True, - ) - - content.append("\n") - - # Create separate panels for better organization - main_content = Text() - main_content.append("Agent: ", style="white") - main_content.append(f"{agent_role}\n\n", style="bright_green bold") - - if thought and thought != "": - main_content.append("Thought: ", style="white") - main_content.append(f"{thought}\n\n", style="bright_green") - - main_content.append("Using Tool: ", style="white") - main_content.append(f"{formatted_answer.tool}", style="bright_green bold") - - # Create the main action panel - action_panel = Panel( - main_content, - title="🔧 Agent Tool Execution", - border_style="magenta", - padding=(1, 2), - ) - - # Create the JSON input panel - input_panel = Panel( - json_syntax, - title="Tool Input", - border_style="blue", - padding=(1, 2), - ) - # Create tool output content with better formatting output_text = str(formatted_answer.result) if len(output_text) > 2000: @@ -1626,8 +850,6 @@ To enable tracing, do any one of these: ) # Print all panels - self.print(action_panel) - self.print(input_panel) self.print(output_panel) self.print() @@ -1668,246 +890,111 @@ To enable tracing, do any one of these: self.print(finish_panel) self.print() - def handle_memory_retrieval_started( - self, - agent_branch: Tree | None, - crew_tree: Tree | None, - ) -> Tree | None: + def handle_memory_retrieval_started(self) -> None: + """Handle memory retrieval started event with panel display.""" if not self.verbose: - return None + return - branch_to_use = agent_branch or self.current_lite_agent_branch - tree_to_use = branch_to_use or crew_tree + content = Text() + content.append("Memory Retrieval Started\n", style="blue bold") + content.append("Status: ", style="white") + content.append("Retrieving...\n", style="blue") - if branch_to_use is None or tree_to_use is None: - if crew_tree is not None: - branch_to_use = tree_to_use = crew_tree - else: - return None - - memory_branch = branch_to_use.add("") - self.update_tree_label(memory_branch, "🧠", "Memory Retrieval Started", "blue") - - self.print(tree_to_use) - self.print() - return memory_branch + self.print_panel(content, "🧠 Memory Retrieval", "blue") def handle_memory_retrieval_completed( self, - agent_branch: Tree | None, - crew_tree: Tree | None, memory_content: str, retrieval_time_ms: float, ) -> None: + """Handle memory retrieval completed event with panel display.""" if not self.verbose: return - branch_to_use = self.current_lite_agent_branch or agent_branch - tree_to_use = branch_to_use or crew_tree - - if branch_to_use is None and tree_to_use is not None: - branch_to_use = tree_to_use - - def add_panel() -> None: - memory_text = str(memory_content) - if len(memory_text) > 500: - memory_text = memory_text[:497] + "..." - - memory_panel = Panel( - Text(memory_text, style="white"), - title="🧠 Retrieved Memory", - subtitle=f"Retrieval Time: {retrieval_time_ms:.2f}ms", - border_style="green", - padding=(1, 2), - ) - self.print(memory_panel) - self.print() - - if branch_to_use is None or tree_to_use is None: - add_panel() - return - - memory_branch_found = False - for child in branch_to_use.children: - if "Memory Retrieval Started" in str(child.label): - self.update_tree_label( - child, "✅", "Memory Retrieval Completed", "green" - ) - memory_branch_found = True - break - - if not memory_branch_found: - for child in branch_to_use.children: - if ( - "Memory Retrieval" in str(child.label) - and "Started" not in str(child.label) - and "Completed" not in str(child.label) - ): - self.update_tree_label( - child, "✅", "Memory Retrieval Completed", "green" - ) - memory_branch_found = True - break - - if not memory_branch_found: - memory_branch = branch_to_use.add("") - self.update_tree_label( - memory_branch, "✅", "Memory Retrieval Completed", "green" - ) - - self.print(tree_to_use) + content = Text() + content.append("Memory Retrieval Completed\n", style="green bold") + content.append("Time: ", style="white") + content.append(f"{retrieval_time_ms:.2f}ms\n", style="green") if memory_content: - add_panel() + memory_text = str(memory_content) - def handle_memory_query_completed( - self, - agent_branch: Tree | None, - source_type: str, - query_time_ms: float, - crew_tree: Tree | None, - ) -> None: - if not self.verbose: - return + content.append("Content: \n", style="white") + content.append(f"{memory_text}\n", style="green ") - branch_to_use = self.current_lite_agent_branch or agent_branch - tree_to_use = branch_to_use or crew_tree - - if branch_to_use is None and tree_to_use is not None: - branch_to_use = tree_to_use - - if branch_to_use is None: - return - - memory_type = source_type.replace("_", " ").title() - - for child in branch_to_use.children: - if "Memory Retrieval" in str(child.label): - for inner_child in child.children: - sources_branch = inner_child - if "Sources Used" in str(inner_child.label): - sources_branch.add(f"✅ {memory_type} ({query_time_ms:.2f}ms)") - break - else: - sources_branch = child.add("Sources Used") - sources_branch.add(f"✅ {memory_type} ({query_time_ms:.2f}ms)") - break + self.print_panel(content, "🧠 Memory Retrieved", "green") def handle_memory_query_failed( self, - agent_branch: Tree | None, - crew_tree: Tree | None, error: str, source_type: str, ) -> None: + """Handle memory query failed event with panel display.""" if not self.verbose: return - branch_to_use = self.current_lite_agent_branch or agent_branch - tree_to_use = branch_to_use or crew_tree - - if branch_to_use is None and tree_to_use is not None: - branch_to_use = tree_to_use - - if branch_to_use is None: - return - memory_type = source_type.replace("_", " ").title() - for child in branch_to_use.children: - if "Memory Retrieval" in str(child.label): - for inner_child in child.children: - sources_branch = inner_child - if "Sources Used" in str(inner_child.label): - sources_branch.add(f"❌ {memory_type} - Error: {error}") - break - else: - sources_branch = child.add("🧠 Sources Used") - sources_branch.add(f"❌ {memory_type} - Error: {error}") - break + content = Text() + content.append("Memory Query Failed\n", style="red bold") + content.append("Source: ", style="white") + content.append(f"{memory_type}\n", style="red") + content.append("Error: ", style="white") + content.append(f"{error}\n", style="red") - def handle_memory_save_started( - self, agent_branch: Tree | None, crew_tree: Tree | None - ) -> None: + self.print_panel(content, "❌ Memory Query Error", "red") + + def handle_memory_save_started(self) -> None: + """Handle memory save started event with panel display.""" if not self.verbose: return - branch_to_use = agent_branch or self.current_lite_agent_branch - tree_to_use = branch_to_use or crew_tree + content = Text() + content.append("Memory Save Started\n", style="blue bold") + content.append("Status: ", style="white") + content.append("Saving...\n", style="blue") - if tree_to_use is None: - return - - for child in tree_to_use.children: - if "Memory Update" in str(child.label): - break - else: - memory_branch = tree_to_use.add("") - self.update_tree_label( - memory_branch, "🧠", "Memory Update Overall", "white" - ) - - self.print(tree_to_use) - self.print() + self.print_panel(content, "🧠 Memory Save", "blue") def handle_memory_save_completed( self, - agent_branch: Tree | None, - crew_tree: Tree | None, save_time_ms: float, source_type: str, ) -> None: + """Handle memory save completed event with panel display.""" if not self.verbose: return - branch_to_use = agent_branch or self.current_lite_agent_branch - tree_to_use = branch_to_use or crew_tree - - if tree_to_use is None: - return - memory_type = source_type.replace("_", " ").title() - content = f"✅ {memory_type} Memory Saved ({save_time_ms:.2f}ms)" - for child in tree_to_use.children: - if "Memory Update" in str(child.label): - child.add(content) - break - else: - memory_branch = tree_to_use.add("") - memory_branch.add(content) + content = Text() + content.append("Memory Save Completed\n", style="green bold") + content.append("Source: ", style="white") + content.append(f"{memory_type}\n", style="green") + content.append("Time: ", style="white") + content.append(f"{save_time_ms:.2f}ms\n", style="green") - self.print(tree_to_use) - self.print() + self.print_panel(content, "✅ Memory Saved", "green") def handle_memory_save_failed( self, - agent_branch: Tree | None, error: str, source_type: str, - crew_tree: Tree | None, ) -> None: + """Handle memory save failed event with panel display.""" if not self.verbose: return - branch_to_use = agent_branch or self.current_lite_agent_branch - tree_to_use = branch_to_use or crew_tree - - if branch_to_use is None or tree_to_use is None: - return - memory_type = source_type.replace("_", " ").title() - content = f"❌ {memory_type} Memory Save Failed" - for child in branch_to_use.children: - if "Memory Update" in str(child.label): - child.add(content) - break - else: - memory_branch = branch_to_use.add("") - memory_branch.add(content) - self.print(tree_to_use) - self.print() + content = Text() + content.append("Memory Save Failed\n", style="red bold") + content.append("Source: ", style="white") + content.append(f"{memory_type}\n", style="red") + content.append("Error: ", style="white") + content.append(f"{error}\n", style="red") + + self.print_panel(content, "❌ Memory Save Error", "red") def handle_guardrail_started( self, @@ -1974,94 +1061,30 @@ To enable tracing, do any one of these: agent_id: str, is_multiturn: bool = False, turn_number: int = 1, - ) -> Tree | None: - """Handle A2A delegation started event. - - Args: - endpoint: A2A agent endpoint URL - task_description: Task being delegated - agent_id: A2A agent identifier - is_multiturn: Whether this is part of a multiturn conversation - turn_number: Current turn number in conversation (1-indexed) - """ - branch_to_use = self.current_lite_agent_branch or self.current_task_branch - tree_to_use = self.current_crew_tree or branch_to_use - a2a_branch: Tree | None = None - + ) -> None: + """Handle A2A delegation started event with panel display.""" if is_multiturn: - if self.current_a2a_turn_count == 0 and not isinstance( - self.current_a2a_conversation_branch, Tree - ): - if branch_to_use is not None and tree_to_use is not None: - self.current_a2a_conversation_branch = branch_to_use.add("") - self.update_tree_label( - self.current_a2a_conversation_branch, - "💬", - f"Multiturn A2A Conversation ({agent_id})", - "cyan", - ) - self.print(tree_to_use) - self.print() - else: - self.current_a2a_conversation_branch = "MULTITURN_NO_TREE" - - content = Text() - content.append( - "Multiturn A2A Conversation Started\n\n", style="cyan bold" - ) - content.append("Agent ID: ", style="white") - content.append(f"{agent_id}\n", style="cyan") - content.append("Note: ", style="white dim") - content.append( - "Conversation will be tracked in tree view", style="cyan dim" - ) - - panel = self.create_panel( - content, "💬 Multiturn Conversation", "cyan" - ) - self.print(panel) - self.print() - self.current_a2a_turn_count = turn_number - return ( - self.current_a2a_conversation_branch - if isinstance(self.current_a2a_conversation_branch, Tree) - else None - ) - - if branch_to_use is not None and tree_to_use is not None: - a2a_branch = branch_to_use.add("") - self.update_tree_label( - a2a_branch, - "🔗", - f"Delegating to A2A Agent ({agent_id})", - "cyan", - ) - - self.print(tree_to_use) - self.print() - content = Text() - content.append("A2A Delegation Started\n\n", style="cyan bold") + content.append("A2A Delegation Started\n", style="cyan bold") content.append("Agent ID: ", style="white") content.append(f"{agent_id}\n", style="cyan") content.append("Endpoint: ", style="white") - content.append(f"{endpoint}\n\n", style="cyan dim") - content.append("Task Description:\n", style="white") - + content.append(f"{endpoint}\n", style="cyan") + if is_multiturn: + content.append("Turn: ", style="white") + content.append(f"{turn_number}\n", style="cyan") + content.append("Task: ", style="white") task_preview = ( - task_description - if len(task_description) <= 200 - else task_description[:197] + "..." + task_description[:200] + "..." + if len(task_description) > 200 + else task_description ) - content.append(task_preview, style="cyan") + content.append(f"{task_preview}\n", style="cyan") - panel = self.create_panel(content, "🔗 A2A Delegation", "cyan") - self.print(panel) - self.print() - - return a2a_branch + self.print_panel(content, "🔗 A2A Delegation", "cyan") + return def handle_a2a_delegation_completed( self, @@ -2070,154 +1093,58 @@ To enable tracing, do any one of these: error: str | None = None, is_multiturn: bool = False, ) -> None: - """Handle A2A delegation completed event. - - Args: - status: Completion status - result: Optional result message - error: Optional error message (or response for input_required) - is_multiturn: Whether this is part of a multiturn conversation - """ - tree_to_use = self.current_crew_tree or self.current_task_branch - a2a_branch = None - - if is_multiturn and self.current_a2a_conversation_branch: - has_tree = isinstance(self.current_a2a_conversation_branch, Tree) - - if status == "input_required" and error: - pass - elif status == "completed": - if has_tree and isinstance(self.current_a2a_conversation_branch, Tree): - final_turn = self.current_a2a_conversation_branch.add("") - self.update_tree_label( - final_turn, - "✅", - "Conversation Completed", - "green", - ) - - if tree_to_use: - self.print(tree_to_use) - self.print() - - self.current_a2a_conversation_branch = None + """Handle A2A delegation completed event with panel display.""" + if is_multiturn: + if status in ["completed", "failed"]: self.current_a2a_turn_count = 0 - elif status == "failed": - if has_tree and isinstance(self.current_a2a_conversation_branch, Tree): - error_turn = self.current_a2a_conversation_branch.add("") - error_msg = ( - error[:150] + "..." if error and len(error) > 150 else error - ) - self.update_tree_label( - error_turn, - "❌", - f"Failed: {error_msg}" if error else "Conversation Failed", - "red", - ) - - if tree_to_use: - self.print(tree_to_use) - self.print() - - self.current_a2a_conversation_branch = None - self.current_a2a_turn_count = 0 - - return - - if a2a_branch and tree_to_use: - if status == "completed": - self.update_tree_label( - a2a_branch, - "✅", - "A2A Delegation Completed", - "green", - ) - elif status == "failed": - self.update_tree_label( - a2a_branch, - "❌", - "A2A Delegation Failed", - "red", - ) - else: - self.update_tree_label( - a2a_branch, - "⚠️", - f"A2A Delegation {status.replace('_', ' ').title()}", - "yellow", - ) - - self.print(tree_to_use) - self.print() if status == "completed" and result: content = Text() - content.append("A2A Delegation Completed\n\n", style="green bold") - content.append("Result:\n", style="white") + content.append("A2A Delegation Completed\n", style="green bold") + content.append("Result: ", style="white") + result_preview = result[:500] + "..." if len(result) > 500 else result + content.append(f"{result_preview}\n", style="green") - result_preview = result if len(result) <= 500 else result[:497] + "..." - content.append(result_preview, style="green") - - panel = self.create_panel(content, "✅ A2A Success", "green") - self.print(panel) - self.print() + self.print_panel(content, "✅ A2A Success", "green") elif status == "input_required" and error: content = Text() - content.append("A2A Response\n\n", style="cyan bold") - content.append("Message:\n", style="white") + content.append("A2A Response Received\n", style="cyan bold") + content.append("Message: ", style="white") + response_preview = error[:500] + "..." if len(error) > 500 else error + content.append(f"{response_preview}\n", style="cyan") - response_preview = error if len(error) <= 500 else error[:497] + "..." - content.append(response_preview, style="cyan") - - panel = self.create_panel(content, "💬 A2A Response", "cyan") - self.print(panel) - self.print() - elif error: + self.print_panel(content, "💬 A2A Response", "cyan") + elif status == "failed": content = Text() - content.append( - "A2A Delegation Issue\n\n", - style="red bold" if status == "failed" else "yellow bold", - ) - content.append("Status: ", style="white") - content.append( - f"{status}\n\n", style="red" if status == "failed" else "yellow" - ) - content.append("Message:\n", style="white") - content.append(error, style="red" if status == "failed" else "yellow") + content.append("A2A Delegation Failed\n", style="red bold") + if error: + content.append("Error: ", style="white") + content.append(f"{error}\n", style="red") - panel_style = "red" if status == "failed" else "yellow" - panel_title = "❌ A2A Failed" if status == "failed" else "⚠️ A2A Status" - panel = self.create_panel(content, panel_title, panel_style) - self.print(panel) - self.print() + self.print_panel(content, "❌ A2A Failed", "red") + else: + content = Text() + content.append(f"A2A Delegation {status.title()}\n", style="yellow bold") + if error: + content.append("Message: ", style="white") + content.append(f"{error}\n", style="yellow") + + self.print_panel(content, "⚠️ A2A Status", "yellow") def handle_a2a_conversation_started( self, agent_id: str, endpoint: str, ) -> None: - """Handle A2A conversation started event. + """Handle A2A conversation started event with panel display.""" + content = Text() + content.append("A2A Conversation Started\n", style="cyan bold") + content.append("Agent ID: ", style="white") + content.append(f"{agent_id}\n", style="cyan") + content.append("Endpoint: ", style="white") + content.append(f"{endpoint}\n", style="cyan ") - Args: - agent_id: A2A agent identifier - endpoint: A2A agent endpoint URL - """ - branch_to_use = self.current_lite_agent_branch or self.current_task_branch - tree_to_use = self.current_crew_tree or branch_to_use - - if not isinstance(self.current_a2a_conversation_branch, Tree): - if branch_to_use is not None and tree_to_use is not None: - self.current_a2a_conversation_branch = branch_to_use.add("") - self.update_tree_label( - self.current_a2a_conversation_branch, - "💬", - f"Multiturn A2A Conversation ({agent_id})", - "cyan", - ) - self.print(tree_to_use) - self.print() - else: - self.current_a2a_conversation_branch = "MULTITURN_NO_TREE" + self.print_panel(content, "💬 A2A Conversation", "cyan") def handle_a2a_message_sent( self, @@ -2225,13 +1152,7 @@ To enable tracing, do any one of these: turn_number: int, agent_role: str | None = None, ) -> None: - """Handle A2A message sent event. - - Args: - message: Message content sent to the A2A agent - turn_number: Current turn number - agent_role: Role of the CrewAI agent sending the message - """ + """Handle A2A message sent event - store for display with response.""" self._pending_a2a_message = message self._pending_a2a_agent_role = agent_role self._pending_a2a_turn_number = turn_number @@ -2243,91 +1164,56 @@ To enable tracing, do any one of these: status: str, agent_role: str | None = None, ) -> None: - """Handle A2A response received event. + """Handle A2A response received event with panel display.""" + crewai_agent_role = self._pending_a2a_agent_role or agent_role or "User" + message_content = self._pending_a2a_message or "" - Args: - response: Response content from the A2A agent - turn_number: Current turn number - status: Response status (input_required, completed, etc.) - agent_role: Role of the CrewAI agent (for display) - """ - if self.current_a2a_conversation_branch and isinstance( - self.current_a2a_conversation_branch, Tree - ): - if turn_number in self._a2a_turn_branches: - turn_branch = self._a2a_turn_branches[turn_number] - else: - turn_branch = self.current_a2a_conversation_branch.add("") - self.update_tree_label( - turn_branch, - "💬", - f"Turn {turn_number}", - "cyan", - ) - self._a2a_turn_branches[turn_number] = turn_branch + # Determine status styling + if status == "completed": + style = "green" + status_indicator = "✓" + elif status == "input_required": + style = "yellow" + status_indicator = "❓" + elif status == "failed": + style = "red" + status_indicator = "✗" + elif status == "auth_required": + style = "magenta" + status_indicator = "🔒" + elif status == "canceled": + style = "" + status_indicator = "⊘" + else: + style = "cyan" + status_indicator = "" - crewai_agent_role = self._pending_a2a_agent_role or agent_role or "User" - message_content = self._pending_a2a_message or "sent message" + content = Text() + content.append(f"A2A Turn {turn_number}\n", style="cyan bold") + content.append("Status: ", style="white") + content.append(f"{status_indicator} {status}\n", style=style) - message_preview = ( - message_content[:100] + "..." - if len(message_content) > 100 + if message_content: + content.append(f"{crewai_agent_role}: ", style="blue bold") + msg_preview = ( + message_content[:200] + "..." + if len(message_content) > 200 else message_content ) + content.append(f"{msg_preview}\n", style="blue") - user_node = turn_branch.add("") - self.update_tree_label( - user_node, - f"{crewai_agent_role} 👤 : ", - f'"{message_preview}"', - "blue", - ) + content.append( + f"{self._current_a2a_agent_name or 'A2A Agent'}: ", style=f"{style} bold" + ) + response_preview = response[:200] + "..." if len(response) > 200 else response + content.append(f"{response_preview}\n", style=style) - agent_node = turn_branch.add("") - response_preview = ( - response[:100] + "..." if len(response) > 100 else response - ) + self.print_panel(content, f"💬 A2A Turn #{turn_number}", style) - a2a_agent_display = f"{self._current_a2a_agent_name} \U0001f916: " - - if status == "completed": - response_color = "green" - status_indicator = "✓" - elif status == "input_required": - response_color = "yellow" - status_indicator = "❓" - elif status == "failed": - response_color = "red" - status_indicator = "✗" - elif status == "auth_required": - response_color = "magenta" - status_indicator = "🔒" - elif status == "canceled": - response_color = "dim" - status_indicator = "⊘" - else: - response_color = "cyan" - status_indicator = "" - - label = f'"{response_preview}"' - if status_indicator: - label = f"{status_indicator} {label}" - - self.update_tree_label( - agent_node, - a2a_agent_display, - label, - response_color, - ) - - self._pending_a2a_message = None - self._pending_a2a_agent_role = None - self._pending_a2a_turn_number = None - - tree_to_use = self.current_crew_tree or self.current_task_branch - if tree_to_use: - self.print(tree_to_use) - self.print() + # Clear pending state + self._pending_a2a_message = None + self._pending_a2a_agent_role = None + self._pending_a2a_turn_number = None def handle_a2a_conversation_completed( self, @@ -2336,68 +1222,38 @@ To enable tracing, do any one of these: error: str | None, total_turns: int, ) -> None: - """Handle A2A conversation completed event. - - Args: - status: Final status (completed, failed, etc.) - final_result: Final result if completed successfully - error: Error message if failed - total_turns: Total number of turns in the conversation - """ - if self.current_a2a_conversation_branch and isinstance( - self.current_a2a_conversation_branch, Tree - ): - if status == "completed": - if self._pending_a2a_message and self._pending_a2a_agent_role: - if total_turns in self._a2a_turn_branches: - turn_branch = self._a2a_turn_branches[total_turns] - else: - turn_branch = self.current_a2a_conversation_branch.add("") - self.update_tree_label( - turn_branch, - "💬", - f"Turn {total_turns}", - "cyan", - ) - self._a2a_turn_branches[total_turns] = turn_branch - - crewai_agent_role = self._pending_a2a_agent_role - message_content = self._pending_a2a_message - - message_preview = ( - message_content[:100] + "..." - if len(message_content) > 100 - else message_content - ) - - user_node = turn_branch.add("") - self.update_tree_label( - user_node, - f"{crewai_agent_role} 👤 : ", - f'"{message_preview}"', - "green", - ) - - self._pending_a2a_message = None - self._pending_a2a_agent_role = None - self._pending_a2a_turn_number = None - elif status == "failed": - error_turn = self.current_a2a_conversation_branch.add("") - error_msg = error[:150] + "..." if error and len(error) > 150 else error - self.update_tree_label( - error_turn, - "❌", - f"Failed: {error_msg}" if error else "Conversation Failed", - "red", + """Handle A2A conversation completed event with panel display.""" + if status == "completed": + content = Text() + content.append("A2A Conversation Completed\n", style="green bold") + content.append("Total Turns: ", style="white") + content.append(f"{total_turns}\n", style="green") + if final_result: + content.append("Result: ", style="white") + result_preview = ( + final_result[:500] + "..." + if len(final_result) > 500 + else final_result ) + content.append(f"{result_preview}\n", style="green") - tree_to_use = self.current_crew_tree or self.current_task_branch - if tree_to_use: - self.print(tree_to_use) - self.print() + self.print_panel(content, "✅ A2A Complete", "green") + elif status == "failed": + content = Text() + content.append("A2A Conversation Failed\n", style="red bold") + content.append("Total Turns: ", style="white") + content.append(f"{total_turns}\n", style="red") + if error: + content.append("Error: ", style="white") + content.append(f"{error}\n", style="red") - self.current_a2a_conversation_branch = None + self.print_panel(content, "❌ A2A Failed", "red") + + # Reset state self.current_a2a_turn_count = 0 + self._pending_a2a_message = None + self._pending_a2a_agent_role = None + self._pending_a2a_turn_number = None # ----------- MCP EVENTS ----------- @@ -2416,12 +1272,10 @@ To enable tracing, do any one of these: content = Text() reconnect_text = " (Reconnecting)" if is_reconnect else "" content.append(f"MCP Connection Started{reconnect_text}\n\n", style="cyan bold") - content.append("Server: ", style="white") - content.append(f"{server_name}\n", style="cyan") if server_url: content.append("URL: ", style="white") - content.append(f"{server_url}\n", style="cyan dim") + content.append(f"{server_url}\n", style="cyan ") if transport_type: content.append("Transport: ", style="white") @@ -2457,7 +1311,7 @@ To enable tracing, do any one of these: if server_url: content.append("URL: ", style="white") - content.append(f"{server_url}\n", style="green dim") + content.append(f"{server_url}\n", style="green ") if transport_type: content.append("Transport: ", style="white") @@ -2490,7 +1344,7 @@ To enable tracing, do any one of these: if server_url: content.append("URL: ", style="white") - content.append(f"{server_url}\n", style="red dim") + content.append(f"{server_url}\n", style="red ") if transport_type: content.append("Transport: ", style="white") @@ -2520,49 +1374,14 @@ To enable tracing, do any one of these: return content = self.create_status_content( - "MCP Tool Execution Started", + "MCP Tool Started", tool_name, "yellow", tool_args=tool_args or {}, Server=server_name, ) - panel = self.create_panel(content, "🔧 MCP Tool", "yellow") - self.print(panel) - self.print() - - def handle_mcp_tool_execution_completed( - self, - server_name: str, - tool_name: str, - tool_args: dict[str, Any] | None = None, - result: Any | None = None, - execution_duration_ms: float | None = None, - ) -> None: - """Handle MCP tool execution completed event.""" - if not self.verbose: - return - - content = self.create_status_content( - "MCP Tool Execution Completed", - tool_name, - "green", - tool_args=tool_args or {}, - Server=server_name, - ) - - if execution_duration_ms is not None: - content.append("Duration: ", style="white") - content.append(f"{execution_duration_ms:.2f}ms\n", style="green") - - if result is not None: - result_str = str(result) - if len(result_str) > 500: - result_str = result_str[:497] + "..." - content.append("\nResult: ", style="white bold") - content.append(f"{result_str}\n", style="green") - - panel = self.create_panel(content, "✅ MCP Tool Completed", "green") + panel = self.create_panel(content, "🔧 MCP Tool Started", "yellow") self.print(panel) self.print() diff --git a/lib/crewai/src/crewai/tools/tool_usage.py b/lib/crewai/src/crewai/tools/tool_usage.py index 8f753f412..ab3d0fc25 100644 --- a/lib/crewai/src/crewai/tools/tool_usage.py +++ b/lib/crewai/src/crewai/tools/tool_usage.py @@ -249,6 +249,7 @@ class ToolUsage: "tool_args": self.action.tool_input, "tool_class": self.action.tool, "agent": self.agent, + "run_attempts": self._run_attempts, } if self.agent.fingerprint: # type: ignore @@ -435,6 +436,7 @@ class ToolUsage: "tool_args": self.action.tool_input, "tool_class": self.action.tool, "agent": self.agent, + "run_attempts": self._run_attempts, } # TODO: Investigate fingerprint attribute availability on BaseAgent/LiteAgent diff --git a/lib/crewai/tests/test_flow_human_input_integration.py b/lib/crewai/tests/test_flow_human_input_integration.py index 63f6308ed..e60cfe514 100644 --- a/lib/crewai/tests/test_flow_human_input_integration.py +++ b/lib/crewai/tests/test_flow_human_input_integration.py @@ -7,22 +7,19 @@ from crewai.events.event_listener import event_listener class TestFlowHumanInputIntegration: """Test integration between Flow execution and human input functionality.""" - def test_console_formatter_pause_resume_methods(self): - """Test that ConsoleFormatter pause/resume methods work correctly.""" + def test_console_formatter_pause_resume_methods_exist(self): + """Test that ConsoleFormatter pause/resume methods exist and are callable.""" formatter = event_listener.formatter - original_paused_state = formatter._live_paused + # Methods should exist and be callable + assert hasattr(formatter, "pause_live_updates") + assert hasattr(formatter, "resume_live_updates") + assert callable(formatter.pause_live_updates) + assert callable(formatter.resume_live_updates) - try: - formatter._live_paused = False - - formatter.pause_live_updates() - assert formatter._live_paused - - formatter.resume_live_updates() - assert not formatter._live_paused - finally: - formatter._live_paused = original_paused_state + # Should not raise + formatter.pause_live_updates() + formatter.resume_live_updates() @patch("builtins.input", return_value="") def test_human_input_pauses_flow_updates(self, mock_input): @@ -38,23 +35,16 @@ class TestFlowHumanInputIntegration: formatter = event_listener.formatter - original_paused_state = formatter._live_paused + with ( + patch.object(formatter, "pause_live_updates") as mock_pause, + patch.object(formatter, "resume_live_updates") as mock_resume, + ): + result = executor._ask_human_input("Test result") - try: - formatter._live_paused = False - - with ( - patch.object(formatter, "pause_live_updates") as mock_pause, - patch.object(formatter, "resume_live_updates") as mock_resume, - ): - result = executor._ask_human_input("Test result") - - mock_pause.assert_called_once() - mock_resume.assert_called_once() - mock_input.assert_called_once() - assert result == "" - finally: - formatter._live_paused = original_paused_state + mock_pause.assert_called_once() + mock_resume.assert_called_once() + mock_input.assert_called_once() + assert result == "" @patch("builtins.input", side_effect=["feedback", ""]) def test_multiple_human_input_rounds(self, mock_input): @@ -70,53 +60,46 @@ class TestFlowHumanInputIntegration: formatter = event_listener.formatter - original_paused_state = formatter._live_paused + pause_calls = [] + resume_calls = [] - try: - pause_calls = [] - resume_calls = [] + def track_pause(): + pause_calls.append(True) - def track_pause(): - pause_calls.append(True) + def track_resume(): + resume_calls.append(True) - def track_resume(): - resume_calls.append(True) + with ( + patch.object(formatter, "pause_live_updates", side_effect=track_pause), + patch.object( + formatter, "resume_live_updates", side_effect=track_resume + ), + ): + result1 = executor._ask_human_input("Test result 1") + assert result1 == "feedback" - with ( - patch.object(formatter, "pause_live_updates", side_effect=track_pause), - patch.object( - formatter, "resume_live_updates", side_effect=track_resume - ), - ): - result1 = executor._ask_human_input("Test result 1") - assert result1 == "feedback" + result2 = executor._ask_human_input("Test result 2") + assert result2 == "" - result2 = executor._ask_human_input("Test result 2") - assert result2 == "" - - assert len(pause_calls) == 2 - assert len(resume_calls) == 2 - finally: - formatter._live_paused = original_paused_state + assert len(pause_calls) == 2 + assert len(resume_calls) == 2 def test_pause_resume_with_no_live_session(self): """Test pause/resume methods handle case when no Live session exists.""" formatter = event_listener.formatter - original_live = formatter._live - original_paused_state = formatter._live_paused + original_streaming_live = formatter._streaming_live try: - formatter._live = None - formatter._live_paused = False + formatter._streaming_live = None + # Should not raise when no session exists formatter.pause_live_updates() formatter.resume_live_updates() - assert not formatter._live_paused + assert formatter._streaming_live is None finally: - formatter._live = original_live - formatter._live_paused = original_paused_state + formatter._streaming_live = original_streaming_live def test_pause_resume_exception_handling(self): """Test that resume is called even if exception occurs during human input.""" @@ -131,23 +114,18 @@ class TestFlowHumanInputIntegration: formatter = event_listener.formatter - original_paused_state = formatter._live_paused + with ( + patch.object(formatter, "pause_live_updates") as mock_pause, + patch.object(formatter, "resume_live_updates") as mock_resume, + patch( + "builtins.input", side_effect=KeyboardInterrupt("Test exception") + ), + ): + with pytest.raises(KeyboardInterrupt): + executor._ask_human_input("Test result") - try: - with ( - patch.object(formatter, "pause_live_updates") as mock_pause, - patch.object(formatter, "resume_live_updates") as mock_resume, - patch( - "builtins.input", side_effect=KeyboardInterrupt("Test exception") - ), - ): - with pytest.raises(KeyboardInterrupt): - executor._ask_human_input("Test result") - - mock_pause.assert_called_once() - mock_resume.assert_called_once() - finally: - formatter._live_paused = original_paused_state + mock_pause.assert_called_once() + mock_resume.assert_called_once() def test_training_mode_human_input(self): """Test human input in training mode.""" @@ -162,28 +140,25 @@ class TestFlowHumanInputIntegration: formatter = event_listener.formatter - original_paused_state = formatter._live_paused + with ( + patch.object(formatter, "pause_live_updates") as mock_pause, + patch.object(formatter, "resume_live_updates") as mock_resume, + patch.object(formatter.console, "print") as mock_console_print, + patch("builtins.input", return_value="training feedback"), + ): + result = executor._ask_human_input("Test result") - try: - with ( - patch.object(formatter, "pause_live_updates") as mock_pause, - patch.object(formatter, "resume_live_updates") as mock_resume, - patch("builtins.input", return_value="training feedback"), - ): - result = executor._ask_human_input("Test result") + mock_pause.assert_called_once() + mock_resume.assert_called_once() + assert result == "training feedback" - mock_pause.assert_called_once() - mock_resume.assert_called_once() - assert result == "training feedback" - - executor._printer.print.assert_called() - call_args = [ - call[1]["content"] - for call in executor._printer.print.call_args_list - ] - training_prompt_found = any( - "TRAINING MODE" in content for content in call_args - ) - assert training_prompt_found - finally: - formatter._live_paused = original_paused_state + # Verify the training panel was printed via formatter's console + mock_console_print.assert_called() + # Check that a Panel with training title was printed + call_args = mock_console_print.call_args_list + training_panel_found = any( + hasattr(call[0][0], "title") and "Training" in str(call[0][0].title) + for call in call_args + if call[0] + ) + assert training_panel_found diff --git a/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py b/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py index 6a64852e1..0964a0756 100644 --- a/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py +++ b/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py @@ -1,116 +1,107 @@ from unittest.mock import MagicMock, patch -from rich.tree import Tree from rich.live import Live from crewai.events.utils.console_formatter import ConsoleFormatter class TestConsoleFormatterPauseResume: - """Test ConsoleFormatter pause/resume functionality.""" + """Test ConsoleFormatter pause/resume functionality for HITL features.""" - def test_pause_live_updates_with_active_session(self): - """Test pausing when Live session is active.""" + def test_pause_stops_active_streaming_session(self): + """Test pausing stops an active streaming Live session.""" formatter = ConsoleFormatter() mock_live = MagicMock(spec=Live) - formatter._live = mock_live - formatter._live_paused = False + formatter._streaming_live = mock_live formatter.pause_live_updates() mock_live.stop.assert_called_once() - assert formatter._live_paused + assert formatter._streaming_live is None - def test_pause_live_updates_when_already_paused(self): - """Test pausing when already paused does nothing.""" + def test_pause_is_safe_when_no_session(self): + """Test pausing when no streaming session exists doesn't error.""" + formatter = ConsoleFormatter() + formatter._streaming_live = None + + # Should not raise + formatter.pause_live_updates() + + assert formatter._streaming_live is None + + def test_multiple_pauses_are_safe(self): + """Test calling pause multiple times is safe.""" formatter = ConsoleFormatter() mock_live = MagicMock(spec=Live) - formatter._live = mock_live - formatter._live_paused = True + formatter._streaming_live = mock_live formatter.pause_live_updates() + mock_live.stop.assert_called_once() + assert formatter._streaming_live is None - mock_live.stop.assert_not_called() - assert formatter._live_paused - - def test_pause_live_updates_with_no_session(self): - """Test pausing when no Live session exists.""" - formatter = ConsoleFormatter() - - formatter._live = None - formatter._live_paused = False - + # Second pause should not error (no session to stop) formatter.pause_live_updates() - assert formatter._live_paused - - def test_resume_live_updates_when_paused(self): - """Test resuming when paused.""" + def test_resume_is_safe(self): + """Test resume method exists and doesn't error.""" formatter = ConsoleFormatter() - formatter._live_paused = True - + # Should not raise formatter.resume_live_updates() - assert not formatter._live_paused - - def test_resume_live_updates_when_not_paused(self): - """Test resuming when not paused does nothing.""" + def test_streaming_after_pause_resume_creates_new_session(self): + """Test that streaming after pause/resume creates new Live session.""" formatter = ConsoleFormatter() + formatter.verbose = True - formatter._live_paused = False + # Simulate having an active session + mock_live = MagicMock(spec=Live) + formatter._streaming_live = mock_live + # Pause stops the session + formatter.pause_live_updates() + assert formatter._streaming_live is None + + # Resume (no-op, sessions created on demand) formatter.resume_live_updates() - assert not formatter._live_paused + # After resume, streaming should be able to start a new session + with patch("crewai.events.utils.console_formatter.Live") as mock_live_class: + mock_live_instance = MagicMock() + mock_live_class.return_value = mock_live_instance - def test_print_after_resume_restarts_live_session(self): - """Test that printing a Tree after resume creates new Live session.""" + # Simulate streaming chunk (this creates a new Live session) + formatter.handle_llm_stream_chunk("test chunk", call_type=None) + + mock_live_class.assert_called_once() + mock_live_instance.start.assert_called_once() + assert formatter._streaming_live == mock_live_instance + + def test_pause_resume_cycle_with_streaming(self): + """Test full pause/resume cycle during streaming.""" formatter = ConsoleFormatter() - - formatter._live_paused = True - formatter._live = None - - formatter.resume_live_updates() - assert not formatter._live_paused - - tree = Tree("Test") + formatter.verbose = True with patch("crewai.events.utils.console_formatter.Live") as mock_live_class: mock_live_instance = MagicMock() mock_live_class.return_value = mock_live_instance - formatter.print(tree) + # Start streaming + formatter.handle_llm_stream_chunk("chunk 1", call_type=None) + assert formatter._streaming_live == mock_live_instance - mock_live_class.assert_called_once() - mock_live_instance.start.assert_called_once() - assert formatter._live == mock_live_instance + # Pause should stop the session + formatter.pause_live_updates() + mock_live_instance.stop.assert_called_once() + assert formatter._streaming_live is None - def test_multiple_pause_resume_cycles(self): - """Test multiple pause/resume cycles work correctly.""" - formatter = ConsoleFormatter() + # Resume (no-op) + formatter.resume_live_updates() - mock_live = MagicMock(spec=Live) - formatter._live = mock_live - formatter._live_paused = False + # Create a new mock for the next session + mock_live_instance_2 = MagicMock() + mock_live_class.return_value = mock_live_instance_2 - formatter.pause_live_updates() - assert formatter._live_paused - mock_live.stop.assert_called_once() - assert formatter._live is None # Live session should be cleared - - formatter.resume_live_updates() - assert not formatter._live_paused - - formatter.pause_live_updates() - assert formatter._live_paused - - formatter.resume_live_updates() - assert not formatter._live_paused - - def test_pause_resume_state_initialization(self): - """Test that _live_paused is properly initialized.""" - formatter = ConsoleFormatter() - - assert hasattr(formatter, "_live_paused") - assert not formatter._live_paused + # Streaming again creates new session + formatter.handle_llm_stream_chunk("chunk 2", call_type=None) + assert formatter._streaming_live == mock_live_instance_2