fix: preserve task outputs when mixing sync and async tasks (#4137)

This fixes issue #4137 where task outputs were lost when a Crew executed
a mix of synchronous and asynchronous tasks. The bug was caused by
_process_async_tasks and _aprocess_async_tasks returning a new list,
which then replaced the existing task_outputs list instead of extending it.

Changes:
- Changed task_outputs = self._process_async_tasks(...) to
  task_outputs.extend(self._process_async_tasks(...)) in _execute_tasks
- Changed task_outputs = await self._aprocess_async_tasks(...) to
  task_outputs.extend(await self._aprocess_async_tasks(...)) in _aexecute_tasks
- Applied the same fix to _handle_conditional_task and _ahandle_conditional_task

Added tests:
- test_sync_task_outputs_preserved_when_mixing_sync_async_tasks
- test_sync_task_outputs_preserved_when_crew_ends_with_async_task
- test_sync_multiple_sync_tasks_before_async_all_preserved
- TestMixedSyncAsyncTaskOutputs class with async variants

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-12-20 15:09:38 +00:00
parent be70a04153
commit 029eedfddb
3 changed files with 373 additions and 8 deletions

View File

@@ -957,8 +957,8 @@ class Crew(FlowTrackable, BaseModel):
pending_tasks.append((task, async_task, task_index))
else:
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(
pending_tasks, was_replayed
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
pending_tasks.clear()
@@ -973,7 +973,9 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
return self._create_crew_output(task_outputs)
@@ -987,7 +989,9 @@ class Crew(FlowTrackable, BaseModel):
) -> TaskOutput | None:
"""Handle conditional task evaluation using native async."""
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
pending_tasks.clear()
return check_conditional_skip(
@@ -1152,7 +1156,7 @@ class Crew(FlowTrackable, BaseModel):
futures.append((task, future, task_index))
else:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
futures.clear()
context = self._get_context(task, task_outputs)
@@ -1166,7 +1170,7 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
return self._create_crew_output(task_outputs)
@@ -1179,7 +1183,7 @@ class Crew(FlowTrackable, BaseModel):
was_replayed: bool,
) -> TaskOutput | None:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
futures.clear()
return check_conditional_skip(

View File

@@ -381,4 +381,171 @@ class TestAsyncProcessAsyncTasks:
async def test_aprocess_async_tasks_empty(self, test_crew: Crew) -> None:
"""Test processing empty list of async tasks."""
result = await test_crew._aprocess_async_tasks([])
assert result == []
assert result == []
class TestMixedSyncAsyncTaskOutputs:
"""Tests for issue #4137: Task outputs lost when mixing sync and async tasks.
These tests verify that when a Crew executes a mix of synchronous and
asynchronous tasks, all task outputs are preserved correctly.
"""
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_async_sync_task_before_async_task_outputs_preserved(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that sync task outputs before async tasks are preserved.
Scenario: sync -> async -> sync
Expected: All 3 task outputs should be in the result.
"""
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=test_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=test_agent,
async_execution=True,
)
task3 = Task(
description="Sync task 3",
expected_output="Output 3",
agent=test_agent,
async_execution=False,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2, task3],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Test Agent",
)
mock_output3 = TaskOutput(
description="Sync task 3",
raw="Result 3",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2, mock_output3]
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
assert result.tasks_output[2].raw == "Result 3"
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_async_crew_ending_with_async_task_preserves_outputs(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that outputs are preserved when crew ends with async task.
Scenario: sync -> async (final)
Expected: Both task outputs should be in the result.
"""
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=test_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=test_agent,
async_execution=True,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2]
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert len(result.tasks_output) == 2
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_async_multiple_sync_before_async_all_preserved(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that multiple sync task outputs before async are preserved.
Scenario: sync -> sync -> async -> sync
Expected: All 4 task outputs should be in the result.
"""
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=test_agent,
async_execution=False,
)
task2 = Task(
description="Sync task 2",
expected_output="Output 2",
agent=test_agent,
async_execution=False,
)
task3 = Task(
description="Async task 3",
expected_output="Output 3",
agent=test_agent,
async_execution=True,
)
task4 = Task(
description="Sync task 4",
expected_output="Output 4",
agent=test_agent,
async_execution=False,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2, task3, task4],
verbose=False,
)
mock_outputs = [
TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Test Agent")
for i in range(1, 5)
]
mock_execute.side_effect = mock_outputs
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert len(result.tasks_output) == 4
for i in range(4):
assert result.tasks_output[i].raw == f"Result {i + 1}"

View File

@@ -1251,6 +1251,200 @@ async def test_async_task_execution_call_count(researcher, writer):
assert mock_execute_sync.call_count == 1
def test_sync_task_outputs_preserved_when_mixing_sync_async_tasks():
"""Test for issue #4137: Task outputs lost when mixing sync and async tasks.
Scenario: sync -> async -> sync
Expected: All 3 task outputs should be in the result.
"""
researcher_agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
allow_delegation=False,
)
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=researcher_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=researcher_agent,
async_execution=True,
)
task3 = Task(
description="Sync task 3",
expected_output="Output 3",
agent=researcher_agent,
async_execution=False,
)
crew = Crew(
agents=[researcher_agent],
tasks=[task1, task2, task3],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Researcher",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Researcher",
)
mock_output3 = TaskOutput(
description="Sync task 3",
raw="Result 3",
agent="Researcher",
)
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = mock_output2
with (
patch.object(Task, "execute_sync", side_effect=[mock_output1, mock_output3]),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert result is not None
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
assert result.tasks_output[2].raw == "Result 3"
def test_sync_task_outputs_preserved_when_crew_ends_with_async_task():
"""Test for issue #4137: Task outputs preserved when crew ends with async task.
Scenario: sync -> async (final)
Expected: Both task outputs should be in the result.
"""
researcher_agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
allow_delegation=False,
)
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=researcher_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=researcher_agent,
async_execution=True,
)
crew = Crew(
agents=[researcher_agent],
tasks=[task1, task2],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Researcher",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Researcher",
)
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = mock_output2
with (
patch.object(Task, "execute_sync", return_value=mock_output1),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert result is not None
assert len(result.tasks_output) == 2
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
def test_sync_multiple_sync_tasks_before_async_all_preserved():
"""Test for issue #4137: Multiple sync task outputs before async are preserved.
Scenario: sync -> sync -> async -> sync
Expected: All 4 task outputs should be in the result.
"""
researcher_agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
allow_delegation=False,
)
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=researcher_agent,
async_execution=False,
)
task2 = Task(
description="Sync task 2",
expected_output="Output 2",
agent=researcher_agent,
async_execution=False,
)
task3 = Task(
description="Async task 3",
expected_output="Output 3",
agent=researcher_agent,
async_execution=True,
)
task4 = Task(
description="Sync task 4",
expected_output="Output 4",
agent=researcher_agent,
async_execution=False,
)
crew = Crew(
agents=[researcher_agent],
tasks=[task1, task2, task3, task4],
verbose=False,
)
mock_outputs = [
TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Researcher")
for i in range(1, 5)
]
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = mock_outputs[2]
with (
patch.object(
Task, "execute_sync", side_effect=[mock_outputs[0], mock_outputs[1], mock_outputs[3]]
),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert result is not None
assert len(result.tasks_output) == 4
for i in range(4):
assert result.tasks_output[i].raw == f"Result {i + 1}"
@pytest.mark.vcr()
def test_kickoff_for_each_single_input():
"""Tests if kickoff_for_each works with a single input."""