Lorenze/metrics for human feedback flows (#4188)

* feat: measure human feedback usage

* add some tests
Lorenze Jay authored 2026-01-06 16:12:34 -08:00, committed by GitHub
parent b787d7e591
commit 8945457883
4 changed files with 202 additions and 3 deletions

View File

@@ -10,7 +10,7 @@ This module provides the event infrastructure that allows users to:
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.depends import Depends
@@ -34,6 +34,8 @@ from crewai.events.types.flow_events import (
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
HumanFeedbackReceivedEvent,
HumanFeedbackRequestedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
@@ -145,6 +147,8 @@ __all__ = [
"FlowFinishedEvent",
"FlowPlotEvent",
"FlowStartedEvent",
"HumanFeedbackReceivedEvent",
"HumanFeedbackRequestedEvent",
"KnowledgeQueryCompletedEvent",
"KnowledgeQueryFailedEvent",
"KnowledgeQueryStartedEvent",
@@ -205,7 +209,7 @@ _AGENT_EVENT_MAPPING = {
}
def __getattr__(name: str):
def __getattr__(name: str) -> Any:
"""Lazy import for agent events to avoid circular imports."""
if name in _AGENT_EVENT_MAPPING:
import importlib

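The hunk is cut off after the importlib import. For readers unfamiliar with the pattern, the usual PEP 562 completion of such a lazy-import hook looks roughly like the sketch below; it assumes _AGENT_EVENT_MAPPING maps attribute names to module paths, which the diff does not show.

# Sketch only; the mapping's values being module paths is an assumption.
def __getattr__(name: str) -> Any:
    """Lazy import for agent events to avoid circular imports."""
    if name in _AGENT_EVENT_MAPPING:
        import importlib

        module = importlib.import_module(_AGENT_EVENT_MAPPING[name])
        return getattr(module, name)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")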
View File

@@ -37,6 +37,8 @@ from crewai.events.types.flow_events import (
FlowFinishedEvent,
FlowPausedEvent,
FlowStartedEvent,
HumanFeedbackReceivedEvent,
HumanFeedbackRequestedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionPausedEvent,
@@ -67,7 +69,6 @@ from crewai.events.types.mcp_events import (
MCPConnectionCompletedEvent,
MCPConnectionFailedEvent,
MCPConnectionStartedEvent,
MCPToolExecutionCompletedEvent,
MCPToolExecutionFailedEvent,
MCPToolExecutionStartedEvent,
)
@@ -329,6 +330,33 @@ class EventListener(BaseEventListener):
"paused",
)
# ----------- HUMAN FEEDBACK EVENTS -----------
@crewai_event_bus.on(HumanFeedbackRequestedEvent)
def on_human_feedback_requested(
_: Any, event: HumanFeedbackRequestedEvent
) -> None:
"""Handle human feedback requested event."""
has_routing = event.emit is not None and len(event.emit) > 0
self._telemetry.human_feedback_span(
event_type="requested",
has_routing=has_routing,
num_outcomes=len(event.emit) if event.emit else 0,
)
@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def on_human_feedback_received(
_: Any, event: HumanFeedbackReceivedEvent
) -> None:
"""Handle human feedback received event."""
has_routing = event.outcome is not None
self._telemetry.human_feedback_span(
event_type="received",
has_routing=has_routing,
num_outcomes=0,
feedback_provided=bool(event.feedback and event.feedback.strip()),
outcome=event.outcome,
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source: Any, event: ToolUsageStartedEvent) -> None:

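The two handlers above wire the new events into telemetry; user code can subscribe to the same events through the event bus decorator. A minimal sketch, with the crewai_event_bus import path assumed (the event-type import path is taken from this diff):

# Sketch of a custom subscriber; the bus import path is an assumption.
from crewai.events import crewai_event_bus
from crewai.events.types.flow_events import (
    HumanFeedbackReceivedEvent,
    HumanFeedbackRequestedEvent,
)

@crewai_event_bus.on(HumanFeedbackRequestedEvent)
def log_requested(source, event):
    # message and emit are attributes the tests below assert on.
    print(f"feedback requested: {event.message!r}, emit={event.emit}")

@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def log_received(source, event):
    # feedback is the raw user input; outcome may be None at emission time.
    print(f"feedback received: {event.feedback!r}, outcome={event.outcome}")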
View File

@@ -969,3 +969,35 @@ class Telemetry:
close_span(span)
self._safe_telemetry_operation(_operation)
def human_feedback_span(
self,
event_type: str,
has_routing: bool,
num_outcomes: int = 0,
feedback_provided: bool | None = None,
outcome: str | None = None,
) -> None:
"""Records human feedback feature usage.
Args:
event_type: Type of event, either "requested" or "received".
has_routing: Whether emit options were configured for routing.
num_outcomes: Number of possible outcomes when routing is used.
feedback_provided: Whether the user provided feedback or skipped (None for "requested" events).
outcome: The collapsed outcome string when routing was used.
"""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Human Feedback")
self._add_attribute(span, "event_type", event_type)
self._add_attribute(span, "has_routing", has_routing)
self._add_attribute(span, "num_outcomes", num_outcomes)
if feedback_provided is not None:
self._add_attribute(span, "feedback_provided", feedback_provided)
if outcome is not None:
self._add_attribute(span, "outcome", outcome)
close_span(span)
self._safe_telemetry_operation(_operation)
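For illustration, a direct call to the new method (hypothetical; in practice it is only driven by the event handlers above) records a span named "Human Feedback" carrying only flags, counts, and the collapsed outcome, never the raw feedback text:

# Hypothetical invocation, for illustration only.
telemetry.human_feedback_span(
    event_type="received",
    has_routing=True,
    feedback_provided=True,  # bool(event.feedback and event.feedback.strip())
    outcome="approved",      # the collapsed outcome string
)
# Resulting span attributes: event_type="received", has_routing=True,
# num_outcomes=0, feedback_provided=True, outcome="approved"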

View File

@@ -25,6 +25,8 @@ from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowStartedEvent,
HumanFeedbackReceivedEvent,
HumanFeedbackRequestedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
@@ -45,6 +47,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
)
from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback
from crewai.llm import LLM
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
@@ -1273,3 +1276,135 @@ def test_llm_emits_event_with_lite_agent():
assert set(all_agent_roles) == {agent.role}
assert set(all_agent_id) == {str(agent.id)}
# ----------- HUMAN FEEDBACK EVENTS -----------
@patch("builtins.input", return_value="looks good")
@patch("builtins.print")
def test_human_feedback_emits_requested_and_received_events(mock_print, mock_input):
"""Test that @human_feedback decorator emits HumanFeedbackRequested and Received events."""
requested_events = []
received_events = []
events_received = threading.Event()
@crewai_event_bus.on(HumanFeedbackRequestedEvent)
def handle_requested(source, event):
requested_events.append(event)
@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def handle_received(source, event):
received_events.append(event)
events_received.set()
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "test content"
flow = TestFlow()
with patch.object(flow, "_collapse_to_outcome", return_value="approved"):
flow.kickoff()
assert events_received.wait(timeout=5), (
"Timeout waiting for human feedback events"
)
assert len(requested_events) == 1
assert requested_events[0].type == "human_feedback_requested"
assert requested_events[0].emit == ["approved", "rejected"]
assert requested_events[0].message == "Review:"
assert requested_events[0].output == "test content"
assert len(received_events) == 1
assert received_events[0].type == "human_feedback_received"
assert received_events[0].feedback == "looks good"
assert received_events[0].outcome is None
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "approved"
@patch("builtins.input", return_value="feedback text")
@patch("builtins.print")
def test_human_feedback_without_routing_emits_events(mock_print, mock_input):
"""Test that @human_feedback without emit still emits events."""
requested_events = []
received_events = []
events_received = threading.Event()
@crewai_event_bus.on(HumanFeedbackRequestedEvent)
def handle_requested(source, event):
requested_events.append(event)
@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def handle_received(source, event):
received_events.append(event)
events_received.set()
class SimpleFlow(Flow):
@start()
@human_feedback(message="Please review:")
def review(self):
return "content to review"
flow = SimpleFlow()
flow.kickoff()
assert events_received.wait(timeout=5), (
"Timeout waiting for human feedback events"
)
assert len(requested_events) == 1
assert requested_events[0].emit is None
assert len(received_events) == 1
assert received_events[0].feedback == "feedback text"
assert received_events[0].outcome is None
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_human_feedback_empty_feedback_emits_events(mock_print, mock_input):
"""Test that empty feedback (skipped) still emits events correctly."""
received_events = []
events_received = threading.Event()
@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def handle_received(source, event):
received_events.append(event)
events_received.set()
class SkipFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
default_outcome="rejected",
)
def review(self):
return "content"
flow = SkipFlow()
flow.kickoff()
assert events_received.wait(timeout=5), (
"Timeout waiting for human feedback events"
)
assert len(received_events) == 1
assert received_events[0].feedback == ""
assert received_events[0].outcome is None
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "rejected"
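Taken together, the tests confirm the decorator's parameters (message, emit, llm, default_outcome) and the last_human_feedback attribute on the flow. A minimal end-to-end usage sketch, assembled only from what the tests exercise (the interactive input() prompt behavior is inferred from the mocks):

from crewai.flow.flow import Flow, start
from crewai.flow.human_feedback import human_feedback

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Review:",              # prompt shown when feedback is requested
        emit=["approved", "rejected"],  # outcomes the free-text feedback collapses to
        llm="gpt-4o-mini",              # model used to collapse feedback to an outcome
        default_outcome="rejected",     # used when the user skips (empty feedback)
    )
    def review(self):
        return "content to review"

flow = ReviewFlow()
flow.kickoff()  # blocks on input() to collect feedback
print(flow.last_human_feedback.outcome)  # "approved" or "rejected"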