From 728c40def5f717004ceb35b53ad6e13c86759e97 Mon Sep 17 00:00:00 2001 From: Nikhil Bhagat <55572863+NikeGunn@users.noreply.github.com> Date: Sun, 8 Feb 2026 22:13:04 +0545 Subject: [PATCH] fix(backend): replace multiprocessing queue with thread safe queue in ExecutionQueue (#11618) The `ExecutionQueue` class was using `multiprocessing.Manager().Queue()` which spawns a subprocess for inter-process communication. However, analysis showed that `ExecutionQueue` is only accessed from threads within the same process, not across processes. This caused: - Unnecessary subprocess spawning per graph execution - IPC overhead for every queue operation - Potential resource leaks if Manager processes weren't properly cleaned up - Limited scalability when many graphs execute concurrently ### Changes - Replaced `multiprocessing.Manager().Queue()` with `queue.Queue()` in `ExecutionQueue` class - Updated imports: removed `from multiprocessing import Manager` and `from queue import Empty`, added `import queue` - Updated exception handling from `except Empty:` to `except queue.Empty:` - Added comprehensive docstring explaining the bug and fix **File changed:** `autogpt_platform/backend/backend/data/execution.py` ### Checklist #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] Verified `ExecutionQueue` uses `queue.Queue` (not `multiprocessing.Manager().Queue()`) - [x] Tested all queue operations: `add()`, `get()`, `empty()`, `get_or_none()` - [x] Verified thread-safety with concurrent producer/consumer threads (100 items) - [x] Verified multi-producer/consumer scenario (3 producers, 2 consumers, 150 items) - [x] Confirmed no subprocess spawning when creating multiple queues - [x] Code passes Black formatting check #### For configuration changes: - [x] `.env.default` is updated or already compatible with my changes - [x] `docker-compose.yml` is updated or already compatible with my changes - [x] I have included a list of my configuration changes in the PR description (under **Changes**) > No configuration changes required - this is a code-only fix with no external API changes. --------- Co-authored-by: Otto Co-authored-by: Zamil Majdy Co-authored-by: Zamil Majdy --- .../backend/backend/data/execution.py | 15 +++-- .../backend/data/execution_queue_test.py | 60 +++++++++++++++++++ 2 files changed, 69 insertions(+), 6 deletions(-) create mode 100644 autogpt_platform/backend/backend/data/execution_queue_test.py diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index afb8c70538..def3d14fda 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -1,9 +1,8 @@ import logging +import queue from collections import defaultdict from datetime import datetime, timedelta, timezone from enum import Enum -from multiprocessing import Manager -from queue import Empty from typing import ( TYPE_CHECKING, Annotated, @@ -1200,12 +1199,16 @@ class NodeExecutionEntry(BaseModel): class ExecutionQueue(Generic[T]): """ - Queue for managing the execution of agents. - This will be shared between different processes + Thread-safe queue for managing node execution within a single graph execution. + + Note: Uses queue.Queue (not multiprocessing.Queue) since all access is from + threads within the same process. If migrating back to ProcessPoolExecutor, + replace with multiprocessing.Manager().Queue() for cross-process safety. """ def __init__(self): - self.queue = Manager().Queue() + # Thread-safe queue (not multiprocessing) — see class docstring + self.queue: queue.Queue[T] = queue.Queue() def add(self, execution: T) -> T: self.queue.put(execution) @@ -1220,7 +1223,7 @@ class ExecutionQueue(Generic[T]): def get_or_none(self) -> T | None: try: return self.queue.get_nowait() - except Empty: + except queue.Empty: return None diff --git a/autogpt_platform/backend/backend/data/execution_queue_test.py b/autogpt_platform/backend/backend/data/execution_queue_test.py new file mode 100644 index 0000000000..ffe0fb265b --- /dev/null +++ b/autogpt_platform/backend/backend/data/execution_queue_test.py @@ -0,0 +1,60 @@ +"""Tests for ExecutionQueue thread-safety.""" + +import queue +import threading + +import pytest + +from backend.data.execution import ExecutionQueue + + +def test_execution_queue_uses_stdlib_queue(): + """Verify ExecutionQueue uses queue.Queue (not multiprocessing).""" + q = ExecutionQueue() + assert isinstance(q.queue, queue.Queue) + + +def test_basic_operations(): + """Test add, get, empty, and get_or_none.""" + q = ExecutionQueue() + + assert q.empty() is True + assert q.get_or_none() is None + + result = q.add("item1") + assert result == "item1" + assert q.empty() is False + + item = q.get() + assert item == "item1" + assert q.empty() is True + + +def test_thread_safety(): + """Test concurrent access from multiple threads.""" + q = ExecutionQueue() + results = [] + num_items = 100 + + def producer(): + for i in range(num_items): + q.add(f"item_{i}") + + def consumer(): + count = 0 + while count < num_items: + item = q.get_or_none() + if item is not None: + results.append(item) + count += 1 + + producer_thread = threading.Thread(target=producer) + consumer_thread = threading.Thread(target=consumer) + + producer_thread.start() + consumer_thread.start() + + producer_thread.join(timeout=5) + consumer_thread.join(timeout=5) + + assert len(results) == num_items