From ad0571c752a22c1850889025f6b1c2fe98506682 Mon Sep 17 00:00:00 2001 From: Nicholas Tindle Date: Sun, 8 Feb 2026 21:22:16 -0600 Subject: [PATCH] =?UTF-8?q?Revert=20"fix(backend):=20replace=20multiproces?= =?UTF-8?q?sing=20queue=20with=20thread=20safe=20queue=20in=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 728c40def5f717004ceb35b53ad6e13c86759e97. --- .../backend/backend/data/execution.py | 15 ++--- .../backend/data/execution_queue_test.py | 60 ------------------- 2 files changed, 6 insertions(+), 69 deletions(-) delete mode 100644 autogpt_platform/backend/backend/data/execution_queue_test.py diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index def3d14fda..afb8c70538 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -1,8 +1,9 @@ import logging -import queue from collections import defaultdict from datetime import datetime, timedelta, timezone from enum import Enum +from multiprocessing import Manager +from queue import Empty from typing import ( TYPE_CHECKING, Annotated, @@ -1199,16 +1200,12 @@ class NodeExecutionEntry(BaseModel): class ExecutionQueue(Generic[T]): """ - Thread-safe queue for managing node execution within a single graph execution. - - Note: Uses queue.Queue (not multiprocessing.Queue) since all access is from - threads within the same process. If migrating back to ProcessPoolExecutor, - replace with multiprocessing.Manager().Queue() for cross-process safety. + Queue for managing the execution of agents. + This will be shared between different processes """ def __init__(self): - # Thread-safe queue (not multiprocessing) — see class docstring - self.queue: queue.Queue[T] = queue.Queue() + self.queue = Manager().Queue() def add(self, execution: T) -> T: self.queue.put(execution) @@ -1223,7 +1220,7 @@ class ExecutionQueue(Generic[T]): def get_or_none(self) -> T | None: try: return self.queue.get_nowait() - except queue.Empty: + except Empty: return None diff --git a/autogpt_platform/backend/backend/data/execution_queue_test.py b/autogpt_platform/backend/backend/data/execution_queue_test.py deleted file mode 100644 index ffe0fb265b..0000000000 --- a/autogpt_platform/backend/backend/data/execution_queue_test.py +++ /dev/null @@ -1,60 +0,0 @@ -"""Tests for ExecutionQueue thread-safety.""" - -import queue -import threading - -import pytest - -from backend.data.execution import ExecutionQueue - - -def test_execution_queue_uses_stdlib_queue(): - """Verify ExecutionQueue uses queue.Queue (not multiprocessing).""" - q = ExecutionQueue() - assert isinstance(q.queue, queue.Queue) - - -def test_basic_operations(): - """Test add, get, empty, and get_or_none.""" - q = ExecutionQueue() - - assert q.empty() is True - assert q.get_or_none() is None - - result = q.add("item1") - assert result == "item1" - assert q.empty() is False - - item = q.get() - assert item == "item1" - assert q.empty() is True - - -def test_thread_safety(): - """Test concurrent access from multiple threads.""" - q = ExecutionQueue() - results = [] - num_items = 100 - - def producer(): - for i in range(num_items): - q.add(f"item_{i}") - - def consumer(): - count = 0 - while count < num_items: - item = q.get_or_none() - if item is not None: - results.append(item) - count += 1 - - producer_thread = threading.Thread(target=producer) - consumer_thread = threading.Thread(target=consumer) - - producer_thread.start() - consumer_thread.start() - - producer_thread.join(timeout=5) - consumer_thread.join(timeout=5) - - assert len(results) == num_items