diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 2c7f583b9..60492c383 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -957,8 +957,8 @@ class Crew(FlowTrackable, BaseModel): pending_tasks.append((task, async_task, task_index)) else: if pending_tasks: - task_outputs = await self._aprocess_async_tasks( - pending_tasks, was_replayed + task_outputs.extend( + await self._aprocess_async_tasks(pending_tasks, was_replayed) ) pending_tasks.clear() @@ -973,7 +973,9 @@ class Crew(FlowTrackable, BaseModel): self._store_execution_log(task, task_output, task_index, was_replayed) if pending_tasks: - task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed) + task_outputs.extend( + await self._aprocess_async_tasks(pending_tasks, was_replayed) + ) return self._create_crew_output(task_outputs) @@ -987,7 +989,9 @@ class Crew(FlowTrackable, BaseModel): ) -> TaskOutput | None: """Handle conditional task evaluation using native async.""" if pending_tasks: - task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed) + task_outputs.extend( + await self._aprocess_async_tasks(pending_tasks, was_replayed) + ) pending_tasks.clear() return check_conditional_skip( @@ -1152,7 +1156,7 @@ class Crew(FlowTrackable, BaseModel): futures.append((task, future, task_index)) else: if futures: - task_outputs = self._process_async_tasks(futures, was_replayed) + task_outputs.extend(self._process_async_tasks(futures, was_replayed)) futures.clear() context = self._get_context(task, task_outputs) @@ -1166,7 +1170,7 @@ class Crew(FlowTrackable, BaseModel): self._store_execution_log(task, task_output, task_index, was_replayed) if futures: - task_outputs = self._process_async_tasks(futures, was_replayed) + task_outputs.extend(self._process_async_tasks(futures, was_replayed)) return self._create_crew_output(task_outputs) @@ -1179,7 +1183,7 @@ class Crew(FlowTrackable, BaseModel): was_replayed: bool, ) -> TaskOutput | None: if futures: - task_outputs = self._process_async_tasks(futures, was_replayed) + task_outputs.extend(self._process_async_tasks(futures, was_replayed)) futures.clear() return check_conditional_skip( diff --git a/lib/crewai/tests/crew/test_async_crew.py b/lib/crewai/tests/crew/test_async_crew.py index aaaffa64f..ebe53fa24 100644 --- a/lib/crewai/tests/crew/test_async_crew.py +++ b/lib/crewai/tests/crew/test_async_crew.py @@ -381,4 +381,171 @@ class TestAsyncProcessAsyncTasks: async def test_aprocess_async_tasks_empty(self, test_crew: Crew) -> None: """Test processing empty list of async tasks.""" result = await test_crew._aprocess_async_tasks([]) - assert result == [] \ No newline at end of file + assert result == [] + + +class TestMixedSyncAsyncTaskOutputs: + """Tests for issue #4137: Task outputs lost when mixing sync and async tasks. + + These tests verify that when a Crew executes a mix of synchronous and + asynchronous tasks, all task outputs are preserved correctly. + """ + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_async_sync_task_before_async_task_outputs_preserved( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test that sync task outputs before async tasks are preserved. + + Scenario: sync -> async -> sync + Expected: All 3 task outputs should be in the result. + """ + task1 = Task( + description="Sync task 1", + expected_output="Output 1", + agent=test_agent, + async_execution=False, + ) + task2 = Task( + description="Async task 2", + expected_output="Output 2", + agent=test_agent, + async_execution=True, + ) + task3 = Task( + description="Sync task 3", + expected_output="Output 3", + agent=test_agent, + async_execution=False, + ) + crew = Crew( + agents=[test_agent], + tasks=[task1, task2, task3], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="Sync task 1", + raw="Result 1", + agent="Test Agent", + ) + mock_output2 = TaskOutput( + description="Async task 2", + raw="Result 2", + agent="Test Agent", + ) + mock_output3 = TaskOutput( + description="Sync task 3", + raw="Result 3", + agent="Test Agent", + ) + mock_execute.side_effect = [mock_output1, mock_output2, mock_output3] + + result = await crew._aexecute_tasks(crew.tasks) + + assert result is not None + assert len(result.tasks_output) == 3 + assert result.tasks_output[0].raw == "Result 1" + assert result.tasks_output[1].raw == "Result 2" + assert result.tasks_output[2].raw == "Result 3" + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_async_crew_ending_with_async_task_preserves_outputs( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test that outputs are preserved when crew ends with async task. + + Scenario: sync -> async (final) + Expected: Both task outputs should be in the result. + """ + task1 = Task( + description="Sync task 1", + expected_output="Output 1", + agent=test_agent, + async_execution=False, + ) + task2 = Task( + description="Async task 2", + expected_output="Output 2", + agent=test_agent, + async_execution=True, + ) + crew = Crew( + agents=[test_agent], + tasks=[task1, task2], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="Sync task 1", + raw="Result 1", + agent="Test Agent", + ) + mock_output2 = TaskOutput( + description="Async task 2", + raw="Result 2", + agent="Test Agent", + ) + mock_execute.side_effect = [mock_output1, mock_output2] + + result = await crew._aexecute_tasks(crew.tasks) + + assert result is not None + assert len(result.tasks_output) == 2 + assert result.tasks_output[0].raw == "Result 1" + assert result.tasks_output[1].raw == "Result 2" + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_async_multiple_sync_before_async_all_preserved( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test that multiple sync task outputs before async are preserved. + + Scenario: sync -> sync -> async -> sync + Expected: All 4 task outputs should be in the result. + """ + task1 = Task( + description="Sync task 1", + expected_output="Output 1", + agent=test_agent, + async_execution=False, + ) + task2 = Task( + description="Sync task 2", + expected_output="Output 2", + agent=test_agent, + async_execution=False, + ) + task3 = Task( + description="Async task 3", + expected_output="Output 3", + agent=test_agent, + async_execution=True, + ) + task4 = Task( + description="Sync task 4", + expected_output="Output 4", + agent=test_agent, + async_execution=False, + ) + crew = Crew( + agents=[test_agent], + tasks=[task1, task2, task3, task4], + verbose=False, + ) + + mock_outputs = [ + TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Test Agent") + for i in range(1, 5) + ] + mock_execute.side_effect = mock_outputs + + result = await crew._aexecute_tasks(crew.tasks) + + assert result is not None + assert len(result.tasks_output) == 4 + for i in range(4): + assert result.tasks_output[i].raw == f"Result {i + 1}" diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 4f485f207..8bd8c049e 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -1251,6 +1251,200 @@ async def test_async_task_execution_call_count(researcher, writer): assert mock_execute_sync.call_count == 1 +def test_sync_task_outputs_preserved_when_mixing_sync_async_tasks(): + """Test for issue #4137: Task outputs lost when mixing sync and async tasks. + + Scenario: sync -> async -> sync + Expected: All 3 task outputs should be in the result. + """ + researcher_agent = Agent( + role="Researcher", + goal="Research topics", + backstory="Expert researcher", + allow_delegation=False, + ) + + task1 = Task( + description="Sync task 1", + expected_output="Output 1", + agent=researcher_agent, + async_execution=False, + ) + task2 = Task( + description="Async task 2", + expected_output="Output 2", + agent=researcher_agent, + async_execution=True, + ) + task3 = Task( + description="Sync task 3", + expected_output="Output 3", + agent=researcher_agent, + async_execution=False, + ) + + crew = Crew( + agents=[researcher_agent], + tasks=[task1, task2, task3], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="Sync task 1", + raw="Result 1", + agent="Researcher", + ) + mock_output2 = TaskOutput( + description="Async task 2", + raw="Result 2", + agent="Researcher", + ) + mock_output3 = TaskOutput( + description="Sync task 3", + raw="Result 3", + agent="Researcher", + ) + + mock_future = MagicMock(spec=Future) + mock_future.result.return_value = mock_output2 + + with ( + patch.object(Task, "execute_sync", side_effect=[mock_output1, mock_output3]), + patch.object(Task, "execute_async", return_value=mock_future), + ): + result = crew.kickoff() + + assert result is not None + assert len(result.tasks_output) == 3 + assert result.tasks_output[0].raw == "Result 1" + assert result.tasks_output[1].raw == "Result 2" + assert result.tasks_output[2].raw == "Result 3" + + +def test_sync_task_outputs_preserved_when_crew_ends_with_async_task(): + """Test for issue #4137: Task outputs preserved when crew ends with async task. + + Scenario: sync -> async (final) + Expected: Both task outputs should be in the result. + """ + researcher_agent = Agent( + role="Researcher", + goal="Research topics", + backstory="Expert researcher", + allow_delegation=False, + ) + + task1 = Task( + description="Sync task 1", + expected_output="Output 1", + agent=researcher_agent, + async_execution=False, + ) + task2 = Task( + description="Async task 2", + expected_output="Output 2", + agent=researcher_agent, + async_execution=True, + ) + + crew = Crew( + agents=[researcher_agent], + tasks=[task1, task2], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="Sync task 1", + raw="Result 1", + agent="Researcher", + ) + mock_output2 = TaskOutput( + description="Async task 2", + raw="Result 2", + agent="Researcher", + ) + + mock_future = MagicMock(spec=Future) + mock_future.result.return_value = mock_output2 + + with ( + patch.object(Task, "execute_sync", return_value=mock_output1), + patch.object(Task, "execute_async", return_value=mock_future), + ): + result = crew.kickoff() + + assert result is not None + assert len(result.tasks_output) == 2 + assert result.tasks_output[0].raw == "Result 1" + assert result.tasks_output[1].raw == "Result 2" + + +def test_sync_multiple_sync_tasks_before_async_all_preserved(): + """Test for issue #4137: Multiple sync task outputs before async are preserved. + + Scenario: sync -> sync -> async -> sync + Expected: All 4 task outputs should be in the result. + """ + researcher_agent = Agent( + role="Researcher", + goal="Research topics", + backstory="Expert researcher", + allow_delegation=False, + ) + + task1 = Task( + description="Sync task 1", + expected_output="Output 1", + agent=researcher_agent, + async_execution=False, + ) + task2 = Task( + description="Sync task 2", + expected_output="Output 2", + agent=researcher_agent, + async_execution=False, + ) + task3 = Task( + description="Async task 3", + expected_output="Output 3", + agent=researcher_agent, + async_execution=True, + ) + task4 = Task( + description="Sync task 4", + expected_output="Output 4", + agent=researcher_agent, + async_execution=False, + ) + + crew = Crew( + agents=[researcher_agent], + tasks=[task1, task2, task3, task4], + verbose=False, + ) + + mock_outputs = [ + TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Researcher") + for i in range(1, 5) + ] + + mock_future = MagicMock(spec=Future) + mock_future.result.return_value = mock_outputs[2] + + with ( + patch.object( + Task, "execute_sync", side_effect=[mock_outputs[0], mock_outputs[1], mock_outputs[3]] + ), + patch.object(Task, "execute_async", return_value=mock_future), + ): + result = crew.kickoff() + + assert result is not None + assert len(result.tasks_output) == 4 + for i in range(4): + assert result.tasks_output[i].raw == f"Result {i + 1}" + + @pytest.mark.vcr() def test_kickoff_for_each_single_input(): """Tests if kickoff_for_each works with a single input."""