mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-02-20 03:00:31 -05:00
Fix cyclic flows silently breaking when persistence ID is passed in inputs (#4501)
* Implement user input handling in Flow class - Introduced `FlowInputRequestedEvent` and `FlowInputReceivedEvent` to manage user input requests and responses during flow execution. - Added `InputProvider` protocol and `InputResponse` dataclass for customizable input handling. - Enhanced `Flow` class with `ask()` method to request user input, including timeout handling and state checkpointing. - Updated `FlowConfig` to support custom input providers. - Created `input_provider.py` for default input provider implementations, including a console-based provider. - Added comprehensive tests for `ask()` functionality, covering basic usage, timeout behavior, and integration with flow machinery. * Potential fix for pull request finding 'Unused import' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> * Refactor test_flow_ask.py to streamline flow kickoff calls - Removed unnecessary variable assignments for the result of `flow.kickoff()` in two test cases, improving code clarity. - Updated assertions to ensure the expected execution log entries are present after the flow kickoff, enhancing test reliability. * Add current_flow_method_name context variable for flow method tracking - Introduced a new context variable, `current_flow_method_name`, to store the name of the currently executing flow method, defaulting to "unknown". - Updated the Flow class to set and reset this context variable during method execution, enhancing the ability to track method calls without stack inspection. - Removed the obsolete `_resolve_calling_method_name` method, streamlining the code and improving clarity. * Enhance input history management in Flow class - Introduced a new `InputHistoryEntry` TypedDict to structure user input history for the `ask()` method, capturing details such as the question, user response, method name, timestamp, and associated metadata. - Updated the `_input_history` attribute in the Flow class to utilize the new `InputHistoryEntry` type, improving type safety and clarity in input history management. * Enhance timeout handling in Flow class input requests - Updated the `ask()` method to improve timeout management by manually managing the `ThreadPoolExecutor`, preventing potential deadlocks when the provider call exceeds the timeout duration. - Added clarifications in the documentation regarding the behavior of the timeout and the underlying request handling, ensuring better understanding for users. * Enhance memory reset functionality in CLI commands - Introduced flow memory reset capabilities in the `reset_memories_command`, allowing for both crew and flow memory resets. - Added a new utility function `_reset_flow_memory` to handle memory resets for individual flow instances, improving modularity and clarity. - Updated the `get_flows` utility to discover flow instances from project files, enhancing the CLI's ability to manage flow states. - Expanded test coverage to validate the new flow memory reset features, ensuring robust functionality and error handling. * LINTER * Fix resumption flag logic in Flow class and add regression test for cyclic flow persistence - Updated the logic for setting the `_is_execution_resuming` flag to ensure it only activates when there are completed methods to replay, preventing incorrect suppression of cyclic re-execution during state reloads. - Added a regression test to validate that cyclic router flows complete all iterations when persistence is enabled and an 'id' is passed in inputs, ensuring robust handling of flow execution in these scenarios. --------- Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
This commit is contained in:
@@ -1804,8 +1804,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self._pending_and_listeners.clear()
|
||||
self._clear_or_listeners()
|
||||
else:
|
||||
# We're restoring from persistence, set the flag
|
||||
self._is_execution_resuming = True
|
||||
# Only enter resumption mode if there are completed methods to
|
||||
# replay. When _completed_methods is empty (e.g. a pure
|
||||
# state-reload via kickoff(inputs={"id": ...})), the flow
|
||||
# executes from scratch and the flag would incorrectly
|
||||
# suppress cyclic re-execution on the second iteration.
|
||||
if self._completed_methods:
|
||||
self._is_execution_resuming = True
|
||||
|
||||
if inputs:
|
||||
# Override the id in the state if it exists in inputs
|
||||
|
||||
@@ -1772,3 +1772,74 @@ def test_cyclic_flow_multiple_or_listeners_fire_every_iteration():
|
||||
f"'{method}' should fire every iteration, "
|
||||
f"got {len(events)} fires: {execution_order}"
|
||||
)
|
||||
|
||||
|
||||
def test_cyclic_flow_works_with_persist_and_id_input():
|
||||
"""Cyclic router flows must complete all iterations when persistence is
|
||||
enabled and 'id' is passed in inputs.
|
||||
|
||||
Regression test: passing ``inputs={"id": ...}`` with a persistence backend
|
||||
previously caused ``_is_execution_resuming`` to be set even though
|
||||
``_completed_methods`` was empty. The flag was never cleared during
|
||||
execution, so on the second cycle iteration the resumption path in
|
||||
``_execute_single_listener`` short-circuited the router with ``(None, None)``
|
||||
and the flow silently terminated after a single iteration.
|
||||
"""
|
||||
from uuid import uuid4
|
||||
|
||||
from crewai.flow.persistence import SQLiteFlowPersistence
|
||||
|
||||
execution_order: list[str] = []
|
||||
|
||||
class PersistCyclicFlow(Flow):
|
||||
iteration: int = 0
|
||||
max_iterations: int = 3
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
execution_order.append("begin")
|
||||
|
||||
@router(or_(begin, "capture"))
|
||||
def classify(self):
|
||||
self.iteration += 1
|
||||
execution_order.append(f"classify_{self.iteration}")
|
||||
if self.iteration <= self.max_iterations:
|
||||
return "type_a"
|
||||
return "exit"
|
||||
|
||||
@listen("type_a")
|
||||
def handle(self):
|
||||
execution_order.append(f"handle_{self.iteration}")
|
||||
|
||||
@listen(or_(handle,))
|
||||
def send(self):
|
||||
execution_order.append(f"send_{self.iteration}")
|
||||
|
||||
@listen("send")
|
||||
def capture(self):
|
||||
execution_order.append(f"capture_{self.iteration}")
|
||||
|
||||
@listen("exit")
|
||||
def finish(self):
|
||||
execution_order.append("finish")
|
||||
|
||||
persistence = SQLiteFlowPersistence()
|
||||
flow = PersistCyclicFlow(persistence=persistence)
|
||||
flow.kickoff(inputs={"id": str(uuid4())})
|
||||
|
||||
assert "finish" in execution_order, (
|
||||
f"Flow should have reached 'finish', got: {execution_order}"
|
||||
)
|
||||
# The router fires max_iterations+1 times (3 cycles + the final "exit")
|
||||
classify_events = [e for e in execution_order if e.startswith("classify_")]
|
||||
assert len(classify_events) == 4, (
|
||||
f"'classify' should fire 4 times (3 cycles + exit), "
|
||||
f"got {len(classify_events)}: {execution_order}"
|
||||
)
|
||||
# The other methods fire once per "type_a" cycle
|
||||
for method in ["handle", "send", "capture"]:
|
||||
events = [e for e in execution_order if e.startswith(f"{method}_")]
|
||||
assert len(events) == 3, (
|
||||
f"'{method}' should fire 3 times, "
|
||||
f"got {len(events)}: {execution_order}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user