Enhance HITL self-loop functionality in human feedback integration tests (#4493)

- Added tests to verify self-loop behavior in HITL routers, ensuring they can handle multiple rejections and immediate approvals.
- Implemented `test_hitl_self_loop_routes_back_to_same_method`, `test_hitl_self_loop_multiple_rejections`, and `test_hitl_self_loop_immediate_approval` to validate the expected execution order and outcomes.
- Updated the `or_()` listener to support looping back to the same method based on human feedback outcomes, improving flow control in complex scenarios.
This commit is contained in:
João Moura
2026-02-15 18:54:42 -08:00
committed by GitHub
parent 09e9229efc
commit 4aedd58829

View File

@@ -14,7 +14,7 @@ from unittest.mock import MagicMock, patch
import pytest
from pydantic import BaseModel
from crewai.flow import Flow, HumanFeedbackResult, human_feedback, listen, start
from crewai.flow import Flow, HumanFeedbackResult, human_feedback, listen, or_, start
from crewai.flow.flow import FlowState
@@ -271,6 +271,182 @@ class TestMultiStepFlows:
assert len(flow.human_feedback_history) == 1
assert flow.human_feedback_history[0].outcome == "rejected"
def test_hitl_self_loop_routes_back_to_same_method(self):
"""Test that a HITL router can loop back to itself via its own emit outcome.
Pattern: review_work listens to or_("do_work", "review") and emits
["review", "approved"]. When the human rejects (outcome="review"),
the method should re-execute. When approved, the flow should continue
to the approve_work listener.
"""
execution_order: list[str] = []
class SelfLoopFlow(Flow):
@start()
def initial_func(self):
execution_order.append("initial_func")
return "initial"
@listen(initial_func)
def do_work(self):
execution_order.append("do_work")
return "work output"
@human_feedback(
message="Do you approve this content?",
emit=["review", "approved"],
llm="gpt-4o-mini",
default_outcome="approved",
)
@listen(or_("do_work", "review"))
def review_work(self):
execution_order.append("review_work")
return "content for review"
@listen("approved")
def approve_work(self):
execution_order.append("approve_work")
return "published"
flow = SelfLoopFlow()
# First call: human rejects (outcome="review") -> self-loop
# Second call: human approves (outcome="approved") -> continue
with (
patch.object(
flow,
"_request_human_feedback",
side_effect=["needs changes", "looks good"],
),
patch.object(
flow,
"_collapse_to_outcome",
side_effect=["review", "approved"],
),
):
result = flow.kickoff()
assert execution_order == [
"initial_func",
"do_work",
"review_work", # first review -> rejected (review)
"review_work", # second review -> approved
"approve_work",
]
assert result == "published"
assert len(flow.human_feedback_history) == 2
assert flow.human_feedback_history[0].outcome == "review"
assert flow.human_feedback_history[1].outcome == "approved"
def test_hitl_self_loop_multiple_rejections(self):
"""Test that a HITL router can loop back multiple times before approving.
Verifies the self-loop works for more than one rejection cycle.
"""
execution_order: list[str] = []
class MultiRejectFlow(Flow):
@start()
def generate(self):
execution_order.append("generate")
return "draft"
@human_feedback(
message="Review this content:",
emit=["revise", "approved"],
llm="gpt-4o-mini",
default_outcome="approved",
)
@listen(or_("generate", "revise"))
def review(self):
execution_order.append("review")
return "content v" + str(execution_order.count("review"))
@listen("approved")
def publish(self):
execution_order.append("publish")
return "published"
flow = MultiRejectFlow()
# Three rejections, then approval
with (
patch.object(
flow,
"_request_human_feedback",
side_effect=["bad", "still bad", "not yet", "great"],
),
patch.object(
flow,
"_collapse_to_outcome",
side_effect=["revise", "revise", "revise", "approved"],
),
):
result = flow.kickoff()
assert execution_order == [
"generate",
"review", # 1st review -> revise
"review", # 2nd review -> revise
"review", # 3rd review -> revise
"review", # 4th review -> approved
"publish",
]
assert result == "published"
assert len(flow.human_feedback_history) == 4
assert [r.outcome for r in flow.human_feedback_history] == [
"revise", "revise", "revise", "approved"
]
def test_hitl_self_loop_immediate_approval(self):
"""Test that a HITL self-loop flow works when approved on the first try.
No looping occurs -- the flow should proceed straight through.
"""
execution_order: list[str] = []
class ImmediateApprovalFlow(Flow):
@start()
def generate(self):
execution_order.append("generate")
return "perfect draft"
@human_feedback(
message="Review:",
emit=["revise", "approved"],
llm="gpt-4o-mini",
)
@listen(or_("generate", "revise"))
def review(self):
execution_order.append("review")
return "content"
@listen("approved")
def publish(self):
execution_order.append("publish")
return "published"
flow = ImmediateApprovalFlow()
with (
patch.object(
flow,
"_request_human_feedback",
return_value="perfect",
),
patch.object(
flow,
"_collapse_to_outcome",
return_value="approved",
),
):
result = flow.kickoff()
assert execution_order == ["generate", "review", "publish"]
assert result == "published"
assert len(flow.human_feedback_history) == 1
assert flow.human_feedback_history[0].outcome == "approved"
def test_router_and_non_router_listeners_for_same_outcome(self):
"""Test that both router and non-router listeners fire for the same outcome."""
execution_order: list[str] = []