fix(backend): replace multiprocessing.Manager().Queue() with queue.Queue()

ExecutionQueue was unnecessarily using multiprocessing.Manager().Queue() which
spawns a subprocess for IPC. Since ExecutionQueue is only accessed from threads
within the same process, queue.Queue() is sufficient and more efficient.

- Eliminates unnecessary subprocess spawning per graph execution
- Removes IPC overhead for queue operations
- Prevents potential resource leaks from Manager processes
- Improves scalability for concurrent graph executions
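The core claim above — that `queue.Queue` is safe to share between threads in one process, with no Manager subprocess — can be illustrated with a minimal sketch (not the project's code; names are illustrative):

```python
import queue
import threading

# A plain queue.Queue shared between two threads in the same process.
# No multiprocessing.Manager(), no subprocess, no IPC.
q: queue.Queue[int] = queue.Queue()
results = []

def worker() -> None:
    # Drain items until the producer signals completion with a None sentinel.
    while True:
        item = q.get()
        if item is None:
            break
        results.append(item)

t = threading.Thread(target=worker)
t.start()
for i in range(5):
    q.put(i)  # thread-safe: queue.Queue uses internal locks
q.put(None)  # sentinel to stop the worker
t.join()
print(results)  # -> [0, 1, 2, 3, 4]
```

`queue.Queue` provides exactly the thread-safety guarantee needed here; `multiprocessing.Manager().Queue()` is only required when the producer and consumer live in different processes.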
Author: Nikhil Bhagat
Date:   2025-12-14 19:04:14 +05:45
parent  ff5c8f324b
commit  750e096f15


@@ -1,9 +1,8 @@
 import logging
+import queue
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from enum import Enum
-from multiprocessing import Manager
-from queue import Empty
 from typing import (
     TYPE_CHECKING,
     Annotated,
@@ -1164,27 +1163,68 @@ class NodeExecutionEntry(BaseModel):
 class ExecutionQueue(Generic[T]):
     """
-    Queue for managing the execution of agents.
-    This will be shared between different processes
+    Thread-safe queue for managing node execution within a single graph execution.
+
+    BUG FIX EXPLANATION:
+    ====================
+    This class previously used `multiprocessing.Manager().Queue()`, which spawns a
+    separate subprocess for inter-process communication (IPC). However, analysis of
+    the codebase revealed that ExecutionQueue is:
+
+    1. Created per graph execution in the `_on_graph_execution()` method
+    2. Only accessed from threads within the SAME process:
+       - the main execution thread (adds/gets items, checks if empty)
+       - async coroutines in the `node_evaluation_loop` thread (add items via
+         `_process_node_output`)
+
+    Since all access happens within a single process, only THREAD safety is
+    needed, not PROCESS safety. Using multiprocessing.Manager() was:
+    - spawning an unnecessary subprocess for each graph execution
+    - adding significant IPC overhead to every queue operation
+    - potentially leaking resources if Manager processes weren't cleaned up
+    - limiting scalability when many graphs execute concurrently
+
+    THE FIX:
+    ========
+    Replace `multiprocessing.Manager().Queue()` with `queue.Queue()`, which is:
+    - thread-safe (uses internal locks for synchronization)
+    - much faster (no IPC overhead)
+    - free of subprocess spawning
+    - cleaned up automatically by Python's garbage collector
+
+    This is a minimal, high-impact fix that improves performance and resource
+    usage without changing any external API or behavior.
     """
 
     def __init__(self):
-        self.queue = Manager().Queue()
+        # Use a thread-safe queue instead of a multiprocessing Manager queue.
+        # queue.Queue is sufficient because ExecutionQueue is only accessed from
+        # multiple threads within the same process, never across processes.
+        self.queue: queue.Queue[T] = queue.Queue()
 
     def add(self, execution: T) -> T:
+        """Add an execution entry to the queue. Thread-safe."""
         self.queue.put(execution)
         return execution
 
     def get(self) -> T:
+        """Get the next entry, blocking while the queue is empty. Thread-safe."""
         return self.queue.get()
 
     def empty(self) -> bool:
+        """Check whether the queue is empty. Thread-safe (approximate check)."""
         return self.queue.empty()
 
     def get_or_none(self) -> T | None:
+        """
+        Non-blocking get: return the next item, or None if the queue is empty.
+        Thread-safe.
+        """
         try:
             return self.queue.get_nowait()
-        except Empty:
+        except queue.Empty:
             return None