From 4aedd58829d6eb7678e8c53ae725a931372a5c5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Moura?= Date: Sun, 15 Feb 2026 18:54:42 -0800 Subject: [PATCH] Enhance HITL self-loop functionality in human feedback integration tests (#4493) - Added tests to verify self-loop behavior in HITL routers, ensuring they can handle multiple rejections and immediate approvals. - Implemented `test_hitl_self_loop_routes_back_to_same_method`, `test_hitl_self_loop_multiple_rejections`, and `test_hitl_self_loop_immediate_approval` to validate the expected execution order and outcomes. - Updated the `or_()` listener to support looping back to the same method based on human feedback outcomes, improving flow control in complex scenarios. --- .../tests/test_human_feedback_integration.py | 178 +++++++++++++++++- 1 file changed, 177 insertions(+), 1 deletion(-) diff --git a/lib/crewai/tests/test_human_feedback_integration.py b/lib/crewai/tests/test_human_feedback_integration.py index d2d6a6f31..15f1e364c 100644 --- a/lib/crewai/tests/test_human_feedback_integration.py +++ b/lib/crewai/tests/test_human_feedback_integration.py @@ -14,7 +14,7 @@ from unittest.mock import MagicMock, patch import pytest from pydantic import BaseModel -from crewai.flow import Flow, HumanFeedbackResult, human_feedback, listen, start +from crewai.flow import Flow, HumanFeedbackResult, human_feedback, listen, or_, start from crewai.flow.flow import FlowState @@ -271,6 +271,182 @@ class TestMultiStepFlows: assert len(flow.human_feedback_history) == 1 assert flow.human_feedback_history[0].outcome == "rejected" + def test_hitl_self_loop_routes_back_to_same_method(self): + """Test that a HITL router can loop back to itself via its own emit outcome. + + Pattern: review_work listens to or_("do_work", "review") and emits + ["review", "approved"]. When the human rejects (outcome="review"), + the method should re-execute. When approved, the flow should continue + to the approve_work listener. + """ + execution_order: list[str] = [] + + class SelfLoopFlow(Flow): + @start() + def initial_func(self): + execution_order.append("initial_func") + return "initial" + + @listen(initial_func) + def do_work(self): + execution_order.append("do_work") + return "work output" + + @human_feedback( + message="Do you approve this content?", + emit=["review", "approved"], + llm="gpt-4o-mini", + default_outcome="approved", + ) + @listen(or_("do_work", "review")) + def review_work(self): + execution_order.append("review_work") + return "content for review" + + @listen("approved") + def approve_work(self): + execution_order.append("approve_work") + return "published" + + flow = SelfLoopFlow() + + # First call: human rejects (outcome="review") -> self-loop + # Second call: human approves (outcome="approved") -> continue + with ( + patch.object( + flow, + "_request_human_feedback", + side_effect=["needs changes", "looks good"], + ), + patch.object( + flow, + "_collapse_to_outcome", + side_effect=["review", "approved"], + ), + ): + result = flow.kickoff() + + assert execution_order == [ + "initial_func", + "do_work", + "review_work", # first review -> rejected (review) + "review_work", # second review -> approved + "approve_work", + ] + assert result == "published" + assert len(flow.human_feedback_history) == 2 + assert flow.human_feedback_history[0].outcome == "review" + assert flow.human_feedback_history[1].outcome == "approved" + + def test_hitl_self_loop_multiple_rejections(self): + """Test that a HITL router can loop back multiple times before approving. + + Verifies the self-loop works for more than one rejection cycle. + """ + execution_order: list[str] = [] + + class MultiRejectFlow(Flow): + @start() + def generate(self): + execution_order.append("generate") + return "draft" + + @human_feedback( + message="Review this content:", + emit=["revise", "approved"], + llm="gpt-4o-mini", + default_outcome="approved", + ) + @listen(or_("generate", "revise")) + def review(self): + execution_order.append("review") + return "content v" + str(execution_order.count("review")) + + @listen("approved") + def publish(self): + execution_order.append("publish") + return "published" + + flow = MultiRejectFlow() + + # Three rejections, then approval + with ( + patch.object( + flow, + "_request_human_feedback", + side_effect=["bad", "still bad", "not yet", "great"], + ), + patch.object( + flow, + "_collapse_to_outcome", + side_effect=["revise", "revise", "revise", "approved"], + ), + ): + result = flow.kickoff() + + assert execution_order == [ + "generate", + "review", # 1st review -> revise + "review", # 2nd review -> revise + "review", # 3rd review -> revise + "review", # 4th review -> approved + "publish", + ] + assert result == "published" + assert len(flow.human_feedback_history) == 4 + assert [r.outcome for r in flow.human_feedback_history] == [ + "revise", "revise", "revise", "approved" + ] + + def test_hitl_self_loop_immediate_approval(self): + """Test that a HITL self-loop flow works when approved on the first try. + + No looping occurs -- the flow should proceed straight through. + """ + execution_order: list[str] = [] + + class ImmediateApprovalFlow(Flow): + @start() + def generate(self): + execution_order.append("generate") + return "perfect draft" + + @human_feedback( + message="Review:", + emit=["revise", "approved"], + llm="gpt-4o-mini", + ) + @listen(or_("generate", "revise")) + def review(self): + execution_order.append("review") + return "content" + + @listen("approved") + def publish(self): + execution_order.append("publish") + return "published" + + flow = ImmediateApprovalFlow() + + with ( + patch.object( + flow, + "_request_human_feedback", + return_value="perfect", + ), + patch.object( + flow, + "_collapse_to_outcome", + return_value="approved", + ), + ): + result = flow.kickoff() + + assert execution_order == ["generate", "review", "publish"] + assert result == "published" + assert len(flow.human_feedback_history) == 1 + assert flow.human_feedback_history[0].outcome == "approved" + def test_router_and_non_router_listeners_for_same_outcome(self): """Test that both router and non-router listeners fire for the same outcome.""" execution_order: list[str] = []