Files
AutoGPT/autogpt_platform/backend/backend/data/execution_queue_test.py
Nikhil Bhagat 728c40def5 fix(backend): replace multiprocessing queue with thread safe queue in ExecutionQueue (#11618)
<!-- Clearly explain the need for these changes: -->

The `ExecutionQueue` class was using `multiprocessing.Manager().Queue()`
which spawns a subprocess for inter-process communication. However,
analysis showed that `ExecutionQueue` is only accessed from threads
within the same process, not across processes. This caused:
- Unnecessary subprocess spawning per graph execution
- IPC overhead for every queue operation
- Potential resource leaks if Manager processes weren't properly cleaned
up
- Limited scalability when many graphs execute concurrently

### Changes 

<!-- Concisely describe all of the changes made in this pull request:
-->

- Replaced `multiprocessing.Manager().Queue()` with `queue.Queue()` in
`ExecutionQueue` class
- Updated imports: removed `from multiprocessing import Manager` and
`from queue import Empty`, added `import queue`
- Updated exception handling from `except Empty:` to `except
queue.Empty:`
- Added comprehensive docstring explaining the bug and fix

**File changed:** `autogpt_platform/backend/backend/data/execution.py`

### Checklist 

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
- [x] Verified `ExecutionQueue` uses `queue.Queue` (not
`multiprocessing.Manager().Queue()`)
- [x] Tested all queue operations: `add()`, `get()`, `empty()`,
`get_or_none()`
- [x] Verified thread-safety with concurrent producer/consumer threads
(100 items)
- [x] Verified multi-producer/consumer scenario (3 producers, 2
consumers, 150 items)
  - [x] Confirmed no subprocess spawning when creating multiple queues
  - [x] Code passes Black formatting check

#### For configuration changes:

- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

> No configuration changes required - this is a code-only fix with no
external API changes.

---------

Co-authored-by: Otto <otto@agpt.co>
Co-authored-by: Zamil Majdy <majdyz@users.noreply.github.com>
Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
2026-02-08 16:28:04 +00:00

61 lines
1.4 KiB
Python

"""Tests for ExecutionQueue thread-safety."""
import queue
import threading
import pytest
from backend.data.execution import ExecutionQueue
def test_execution_queue_uses_stdlib_queue():
"""Verify ExecutionQueue uses queue.Queue (not multiprocessing)."""
q = ExecutionQueue()
assert isinstance(q.queue, queue.Queue)
def test_basic_operations():
"""Test add, get, empty, and get_or_none."""
q = ExecutionQueue()
assert q.empty() is True
assert q.get_or_none() is None
result = q.add("item1")
assert result == "item1"
assert q.empty() is False
item = q.get()
assert item == "item1"
assert q.empty() is True
def test_thread_safety():
"""Test concurrent access from multiple threads."""
q = ExecutionQueue()
results = []
num_items = 100
def producer():
for i in range(num_items):
q.add(f"item_{i}")
def consumer():
count = 0
while count < num_items:
item = q.get_or_none()
if item is not None:
results.append(item)
count += 1
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join(timeout=5)
consumer_thread.join(timeout=5)
assert len(results) == num_items