mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-09 06:15:41 -05:00
fix(backend): replace multiprocessing queue with thread safe queue in ExecutionQueue (#11618)
<!-- Clearly explain the need for these changes: --> The `ExecutionQueue` class was using `multiprocessing.Manager().Queue()` which spawns a subprocess for inter-process communication. However, analysis showed that `ExecutionQueue` is only accessed from threads within the same process, not across processes. This caused: - Unnecessary subprocess spawning per graph execution - IPC overhead for every queue operation - Potential resource leaks if Manager processes weren't properly cleaned up - Limited scalability when many graphs execute concurrently ### Changes <!-- Concisely describe all of the changes made in this pull request: --> - Replaced `multiprocessing.Manager().Queue()` with `queue.Queue()` in `ExecutionQueue` class - Updated imports: removed `from multiprocessing import Manager` and `from queue import Empty`, added `import queue` - Updated exception handling from `except Empty:` to `except queue.Empty:` - Added comprehensive docstring explaining the bug and fix **File changed:** `autogpt_platform/backend/backend/data/execution.py` ### Checklist #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [x] Verified `ExecutionQueue` uses `queue.Queue` (not `multiprocessing.Manager().Queue()`) - [x] Tested all queue operations: `add()`, `get()`, `empty()`, `get_or_none()` - [x] Verified thread-safety with concurrent producer/consumer threads (100 items) - [x] Verified multi-producer/consumer scenario (3 producers, 2 consumers, 150 items) - [x] Confirmed no subprocess spawning when creating multiple queues - [x] Code passes Black formatting check #### For configuration changes: - [x] `.env.default` is updated or already compatible with my changes - [x] `docker-compose.yml` is updated or already compatible with my changes - [x] I have included a list of my configuration changes in the PR description (under **Changes**) > No configuration changes required - this is a code-only fix with no external API changes. --------- Co-authored-by: Otto <otto@agpt.co> Co-authored-by: Zamil Majdy <majdyz@users.noreply.github.com> Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
@@ -1,9 +1,8 @@
|
||||
import logging
|
||||
import queue
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from enum import Enum
|
||||
from multiprocessing import Manager
|
||||
from queue import Empty
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Annotated,
|
||||
@@ -1200,12 +1199,16 @@ class NodeExecutionEntry(BaseModel):
|
||||
|
||||
class ExecutionQueue(Generic[T]):
|
||||
"""
|
||||
Queue for managing the execution of agents.
|
||||
This will be shared between different processes
|
||||
Thread-safe queue for managing node execution within a single graph execution.
|
||||
|
||||
Note: Uses queue.Queue (not multiprocessing.Queue) since all access is from
|
||||
threads within the same process. If migrating back to ProcessPoolExecutor,
|
||||
replace with multiprocessing.Manager().Queue() for cross-process safety.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.queue = Manager().Queue()
|
||||
# Thread-safe queue (not multiprocessing) — see class docstring
|
||||
self.queue: queue.Queue[T] = queue.Queue()
|
||||
|
||||
def add(self, execution: T) -> T:
|
||||
self.queue.put(execution)
|
||||
@@ -1220,7 +1223,7 @@ class ExecutionQueue(Generic[T]):
|
||||
def get_or_none(self) -> T | None:
|
||||
try:
|
||||
return self.queue.get_nowait()
|
||||
except Empty:
|
||||
except queue.Empty:
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
"""Tests for ExecutionQueue thread-safety."""
|
||||
|
||||
import queue
|
||||
import threading
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.data.execution import ExecutionQueue
|
||||
|
||||
|
||||
def test_execution_queue_uses_stdlib_queue():
|
||||
"""Verify ExecutionQueue uses queue.Queue (not multiprocessing)."""
|
||||
q = ExecutionQueue()
|
||||
assert isinstance(q.queue, queue.Queue)
|
||||
|
||||
|
||||
def test_basic_operations():
|
||||
"""Test add, get, empty, and get_or_none."""
|
||||
q = ExecutionQueue()
|
||||
|
||||
assert q.empty() is True
|
||||
assert q.get_or_none() is None
|
||||
|
||||
result = q.add("item1")
|
||||
assert result == "item1"
|
||||
assert q.empty() is False
|
||||
|
||||
item = q.get()
|
||||
assert item == "item1"
|
||||
assert q.empty() is True
|
||||
|
||||
|
||||
def test_thread_safety():
|
||||
"""Test concurrent access from multiple threads."""
|
||||
q = ExecutionQueue()
|
||||
results = []
|
||||
num_items = 100
|
||||
|
||||
def producer():
|
||||
for i in range(num_items):
|
||||
q.add(f"item_{i}")
|
||||
|
||||
def consumer():
|
||||
count = 0
|
||||
while count < num_items:
|
||||
item = q.get_or_none()
|
||||
if item is not None:
|
||||
results.append(item)
|
||||
count += 1
|
||||
|
||||
producer_thread = threading.Thread(target=producer)
|
||||
consumer_thread = threading.Thread(target=consumer)
|
||||
|
||||
producer_thread.start()
|
||||
consumer_thread.start()
|
||||
|
||||
producer_thread.join(timeout=5)
|
||||
consumer_thread.join(timeout=5)
|
||||
|
||||
assert len(results) == num_items
|
||||
Reference in New Issue
Block a user