diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index d8e74fc08..58a59debe 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1804,8 +1804,13 @@ class Flow(Generic[T], metaclass=FlowMeta): self._pending_and_listeners.clear() self._clear_or_listeners() else: - # We're restoring from persistence, set the flag - self._is_execution_resuming = True + # Only enter resumption mode if there are completed methods to + # replay. When _completed_methods is empty (e.g. a pure + # state-reload via kickoff(inputs={"id": ...})), the flow + # executes from scratch and the flag would incorrectly + # suppress cyclic re-execution on the second iteration. + if self._completed_methods: + self._is_execution_resuming = True if inputs: # Override the id in the state if it exists in inputs diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index 0ec4b3063..585b6881e 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -1772,3 +1772,74 @@ def test_cyclic_flow_multiple_or_listeners_fire_every_iteration(): f"'{method}' should fire every iteration, " f"got {len(events)} fires: {execution_order}" ) + + +def test_cyclic_flow_works_with_persist_and_id_input(): + """Cyclic router flows must complete all iterations when persistence is + enabled and 'id' is passed in inputs. + + Regression test: passing ``inputs={"id": ...}`` with a persistence backend + previously caused ``_is_execution_resuming`` to be set even though + ``_completed_methods`` was empty. The flag was never cleared during + execution, so on the second cycle iteration the resumption path in + ``_execute_single_listener`` short-circuited the router with ``(None, None)`` + and the flow silently terminated after a single iteration. + """ + from uuid import uuid4 + + from crewai.flow.persistence import SQLiteFlowPersistence + + execution_order: list[str] = [] + + class PersistCyclicFlow(Flow): + iteration: int = 0 + max_iterations: int = 3 + + @start() + def begin(self): + execution_order.append("begin") + + @router(or_(begin, "capture")) + def classify(self): + self.iteration += 1 + execution_order.append(f"classify_{self.iteration}") + if self.iteration <= self.max_iterations: + return "type_a" + return "exit" + + @listen("type_a") + def handle(self): + execution_order.append(f"handle_{self.iteration}") + + @listen(or_(handle,)) + def send(self): + execution_order.append(f"send_{self.iteration}") + + @listen("send") + def capture(self): + execution_order.append(f"capture_{self.iteration}") + + @listen("exit") + def finish(self): + execution_order.append("finish") + + persistence = SQLiteFlowPersistence() + flow = PersistCyclicFlow(persistence=persistence) + flow.kickoff(inputs={"id": str(uuid4())}) + + assert "finish" in execution_order, ( + f"Flow should have reached 'finish', got: {execution_order}" + ) + # The router fires max_iterations+1 times (3 cycles + the final "exit") + classify_events = [e for e in execution_order if e.startswith("classify_")] + assert len(classify_events) == 4, ( + f"'classify' should fire 4 times (3 cycles + exit), " + f"got {len(classify_events)}: {execution_order}" + ) + # The other methods fire once per "type_a" cycle + for method in ["handle", "send", "capture"]: + events = [e for e in execution_order if e.startswith(f"{method}_")] + assert len(events) == 3, ( + f"'{method}' should fire 3 times, " + f"got {len(events)}: {execution_order}" + )