From 2fd5c46b420d0148cbb2c9b0b9b3e2c494d4215e Mon Sep 17 00:00:00 2001 From: Heitor Sammuel Carvalho Date: Thu, 18 Dec 2025 11:04:46 -0300 Subject: [PATCH] Emit FlowFailedEvent and finalise batch via added event listener --- .../events/listeners/tracing/trace_listener.py | 11 +++++++++++ lib/crewai/src/crewai/flow/flow.py | 12 ++++++++++++ 2 files changed, 23 insertions(+) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index 08afbeb11..bb17ace6e 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -33,6 +33,7 @@ from crewai.events.types.crew_events import ( ) from crewai.events.types.flow_events import ( FlowCreatedEvent, + FlowFailedEvent, FlowFinishedEvent, FlowPlotEvent, FlowStartedEvent, @@ -201,6 +202,16 @@ class TraceCollectionListener(BaseEventListener): else: self.batch_manager.finalize_batch() + @event_bus.on(FlowFailedEvent) + def on_flow_failed(source: Any, event: FlowFailedEvent) -> None: + self._handle_trace_event("flow_failed", source, event) + if self.batch_manager.batch_owner_type == "flow": + if self.first_time_handler.is_first_time: + self.first_time_handler.mark_events_collected() + self.first_time_handler.handle_execution_completion() + else: + self.batch_manager.finalize_batch() + @event_bus.on(FlowPlotEvent) def on_flow_plot(source: Any, event: FlowPlotEvent) -> None: self._handle_action_event("flow_plot", source, event) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 07fb5c7ff..bdc2fab75 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -40,6 +40,7 @@ from crewai.events.listeners.tracing.utils import ( ) from crewai.events.types.flow_events import ( FlowCreatedEvent, + FlowFailedEvent, FlowFinishedEvent, FlowPlotEvent, FlowStartedEvent, @@ -1019,6 +1020,17 @@ class Flow(Generic[T], metaclass=FlowMeta): self._event_futures.clear() return final_output + except Exception as e: + future = crewai_event_bus.emit( + self, + FlowFailedEvent( + flow_name=self.name or self.__class__.__name__, + error=e, + ), + ) + if future: + self._event_futures.append(future) + raise e finally: detach(flow_token)