From 750e096f1562bd44737e42cce2da89ab11a8def7 Mon Sep 17 00:00:00 2001
From: Nikhil Bhagat
Date: Sun, 14 Dec 2025 19:04:14 +0545
Subject: [PATCH] fix(backend): replace multiprocessing.Manager().Queue() with
 queue.Queue()

ExecutionQueue was unnecessarily using multiprocessing.Manager().Queue(),
which spawns a subprocess for IPC. Since ExecutionQueue is only accessed
from threads within the same process, queue.Queue() is sufficient and more
efficient.

- Eliminates unnecessary subprocess spawning per graph execution
- Removes IPC overhead for queue operations
- Prevents potential resource leaks from Manager processes
- Improves scalability for concurrent graph executions
---
 .../backend/backend/data/execution.py | 21 ++++++++++++++-------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py
index 020a5a1906..14cc4b0fba 100644
--- a/autogpt_platform/backend/backend/data/execution.py
+++ b/autogpt_platform/backend/backend/data/execution.py
@@ -1,9 +1,8 @@
 import logging
+import queue
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from enum import Enum
-from multiprocessing import Manager
-from queue import Empty
 from typing import (
     TYPE_CHECKING,
     Annotated,
@@ -1164,27 +1163,37 @@ class NodeExecutionEntry(BaseModel):
 
 class ExecutionQueue(Generic[T]):
     """
-    Queue for managing the execution of agents.
-    This will be shared between different processes
+    Thread-safe queue for managing node execution within a single graph
+    execution.
+
+    NOTE: this previously used `multiprocessing.Manager().Queue()`, which
+    spawns a proxy subprocess and routes every operation through IPC. The
+    queue is only ever accessed from threads of a single process, so a
+    plain `queue.Queue` is sufficient and avoids the per-execution
+    subprocess and IPC overhead (see commit message for full rationale).
     """
 
     def __init__(self):
-        self.queue = Manager().Queue()
+        # queue.Queue is thread-safe; cross-process safety is not needed here.
+        self.queue: queue.Queue[T] = queue.Queue()
 
     def add(self, execution: T) -> T:
+        """Add an execution entry to the queue. Thread-safe."""
         self.queue.put(execution)
         return execution
 
     def get(self) -> T:
+        """Return the next entry, blocking while the queue is empty."""
         return self.queue.get()
 
     def empty(self) -> bool:
+        """Whether the queue is empty (approximate under concurrency)."""
         return self.queue.empty()
 
     def get_or_none(self) -> T | None:
+        """Non-blocking get: return the next entry, or None if empty."""
         try:
             return self.queue.get_nowait()
-        except Empty:
+        except queue.Empty:
             return None