diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 6123994e3..b6467ba51 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1032,6 +1032,20 @@ class Flow(Generic[T], metaclass=FlowMeta): finally: detach(flow_token) + async def akickoff( + self, inputs: dict[str, Any] | None = None + ) -> Any | FlowStreamingOutput: + """Native async method to start the flow execution. Alias for kickoff_async. + + + Args: + inputs: Optional dictionary containing input values and/or a state ID for restoration. + + Returns: + The final output from the flow, which is the result of the last executed method. + """ + return await self.kickoff_async(inputs) + async def _execute_start_method(self, start_method_name: FlowMethodName) -> None: """Executes a flow's start method and its triggered listeners. diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index f9f046c68..7a5bccb53 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -1492,3 +1492,144 @@ def test_flow_copy_state_with_dict_state(): flow.state["test"] = "modified" assert copied_state["test"] == "value" + + +class TestFlowAkickoff: + """Tests for the native async akickoff method.""" + + @pytest.mark.asyncio + async def test_akickoff_basic(self): + """Test basic akickoff execution.""" + execution_order = [] + + class SimpleFlow(Flow): + @start() + def step_1(self): + execution_order.append("step_1") + return "step_1_result" + + @listen(step_1) + def step_2(self, result): + execution_order.append("step_2") + return "final_result" + + flow = SimpleFlow() + result = await flow.akickoff() + + assert execution_order == ["step_1", "step_2"] + assert result == "final_result" + + @pytest.mark.asyncio + async def test_akickoff_with_inputs(self): + """Test akickoff with inputs.""" + + class InputFlow(Flow): + @start() + def process_input(self): + return self.state.get("value", "default") + + flow = InputFlow() + result = await flow.akickoff(inputs={"value": "custom_value"}) + + assert result == "custom_value" + + @pytest.mark.asyncio + async def test_akickoff_with_async_methods(self): + """Test akickoff with async flow methods.""" + execution_order = [] + + class AsyncMethodFlow(Flow): + @start() + async def async_step_1(self): + execution_order.append("async_step_1") + await asyncio.sleep(0.01) + return "async_result" + + @listen(async_step_1) + async def async_step_2(self, result): + execution_order.append("async_step_2") + await asyncio.sleep(0.01) + return f"final_{result}" + + flow = AsyncMethodFlow() + result = await flow.akickoff() + + assert execution_order == ["async_step_1", "async_step_2"] + assert result == "final_async_result" + + @pytest.mark.asyncio + async def test_akickoff_equivalent_to_kickoff_async(self): + """Test that akickoff produces the same results as kickoff_async.""" + execution_order_akickoff = [] + execution_order_kickoff_async = [] + + class TestFlow(Flow): + def __init__(self, execution_list): + super().__init__() + self._execution_list = execution_list + + @start() + def step_1(self): + self._execution_list.append("step_1") + return "result_1" + + @listen(step_1) + def step_2(self, result): + self._execution_list.append("step_2") + return "result_2" + + flow1 = TestFlow(execution_order_akickoff) + result1 = await flow1.akickoff() + + flow2 = TestFlow(execution_order_kickoff_async) + result2 = await flow2.kickoff_async() + + assert execution_order_akickoff == execution_order_kickoff_async + assert result1 == result2 + + @pytest.mark.asyncio + async def test_akickoff_with_multiple_starts(self): + """Test akickoff with multiple start methods.""" + execution_order = [] + + class MultiStartFlow(Flow): + @start() + def start_a(self): + execution_order.append("start_a") + + @start() + def start_b(self): + execution_order.append("start_b") + + flow = MultiStartFlow() + await flow.akickoff() + + assert "start_a" in execution_order + assert "start_b" in execution_order + + @pytest.mark.asyncio + async def test_akickoff_with_router(self): + """Test akickoff with router method.""" + execution_order = [] + + class RouterFlow(Flow): + @start() + def begin(self): + execution_order.append("begin") + return "data" + + @router(begin) + def route(self, data): + execution_order.append("route") + return "PATH_A" + + @listen("PATH_A") + def handle_path_a(self): + execution_order.append("path_a") + return "path_a_result" + + flow = RouterFlow() + result = await flow.akickoff() + + assert execution_order == ["begin", "route", "path_a"] + assert result == "path_a_result"