Compare commits

...

4 Commits

Author SHA1 Message Date
Otto
9b20f4cd13 refactor: simplify ExecutionQueue docstrings and move test file
- Trim verbose BUG FIX docstring to concise 3-line note
- Remove redundant method docstrings (add, get, empty)
- Move test file to backend/data/ with proper pytest conventions
- Add note about ProcessPoolExecutor migration for future devs

Co-authored-by: Zamil Majdy <majdyz@users.noreply.github.com>
2026-02-08 16:11:35 +00:00
Nikhil Bhagat
a3d0f9cbd2 fix(backend): format test_execution_queue.py and remove unused variable 2025-12-14 19:37:29 +05:45
Nikhil Bhagat
02ddb51446 Added test_execution_queue.py and test the execution part and the test got passed 2025-12-14 19:05:14 +05:45
Nikhil Bhagat
750e096f15 fix(backend): replace multiprocessing.Manager().Queue() with queue.Queue()
ExecutionQueue was unnecessarily using multiprocessing.Manager().Queue() which
spawns a subprocess for IPC. Since ExecutionQueue is only accessed from threads
within the same process, queue.Queue() is sufficient and more efficient.

- Eliminates unnecessary subprocess spawning per graph execution
- Removes IPC overhead for queue operations
- Prevents potential resource leaks from Manager processes
- Improves scalability for concurrent graph executions
2025-12-14 19:04:14 +05:45
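For illustration only (not part of this changeset), a minimal sketch of the difference this commit describes, using only stdlib calls: multiprocessing.Manager().Queue() starts a dedicated manager subprocess and forwards every put/get over IPC, while queue.Queue() is an in-process, thread-safe queue with no extra process.

import multiprocessing
import queue

if __name__ == "__main__":
    # Manager-backed queue: a manager subprocess is spawned and every
    # put/get is forwarded to it over IPC via a proxy object.
    manager = multiprocessing.Manager()
    proxied_q = manager.Queue()
    proxied_q.put("node-1")
    print(proxied_q.get_nowait())  # "node-1", round-tripped through the manager
    manager.shutdown()             # stops the manager subprocess

    # Plain stdlib queue: lock-based, lives in this process, no IPC.
    local_q: queue.Queue[str] = queue.Queue()
    local_q.put("node-1")
    print(local_q.get_nowait())    # "node-1", no extra process involved

The diff below makes exactly this swap inside ExecutionQueue.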
2 changed files with 69 additions and 6 deletions

View File: backend/data/execution.py

@@ -1,9 +1,8 @@
 import logging
+import queue
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from enum import Enum
-from multiprocessing import Manager
-from queue import Empty
 from typing import (
     TYPE_CHECKING,
     Annotated,
@@ -1164,12 +1163,16 @@ class NodeExecutionEntry(BaseModel):
 class ExecutionQueue(Generic[T]):
     """
-    Queue for managing the execution of agents.
-    This will be shared between different processes
+    Thread-safe queue for managing node execution within a single graph execution.
+
+    Note: Uses queue.Queue (not multiprocessing.Queue) since all access is from
+    threads within the same process. If migrating back to ProcessPoolExecutor,
+    replace with multiprocessing.Manager().Queue() for cross-process safety.
     """
 
     def __init__(self):
-        self.queue = Manager().Queue()
+        # Thread-safe queue (not multiprocessing) — see class docstring
+        self.queue: queue.Queue[T] = queue.Queue()
 
     def add(self, execution: T) -> T:
         self.queue.put(execution)
 
@@ -1184,7 +1187,7 @@ class ExecutionQueue(Generic[T]):
 
     def get_or_none(self) -> T | None:
         try:
             return self.queue.get_nowait()
-        except Empty:
+        except queue.Empty:
             return None
 

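As a usage illustration rather than repository code, the sketch below shows the pattern the new docstring describes: one ExecutionQueue instance shared between threads of a single process. The backend.data.execution import path is taken from the new test file; the worker function and node IDs are hypothetical.

import threading

from backend.data.execution import ExecutionQueue

q: ExecutionQueue[str] = ExecutionQueue()
for node_id in ("node-a", "node-b", "node-c"):
    q.add(node_id)

def worker() -> None:
    # Drain whatever is currently queued; queue.Queue makes this safe
    # across threads without any Manager process.
    while (entry := q.get_or_none()) is not None:
        print("executing", entry)

t = threading.Thread(target=worker)
t.start()
t.join()

If execution ever moves back to a ProcessPoolExecutor, this object would need the Manager-backed queue mentioned in the docstring, since a plain queue.Queue cannot be shared across processes.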
View File: backend/data/test_execution_queue.py

@@ -0,0 +1,60 @@
"""Tests for ExecutionQueue thread-safety."""
import queue
import threading
import pytest
from backend.data.execution import ExecutionQueue
def test_execution_queue_uses_stdlib_queue():
"""Verify ExecutionQueue uses queue.Queue (not multiprocessing)."""
q = ExecutionQueue()
assert isinstance(q.queue, queue.Queue)
def test_basic_operations():
"""Test add, get, empty, and get_or_none."""
q = ExecutionQueue()
assert q.empty() is True
assert q.get_or_none() is None
result = q.add("item1")
assert result == "item1"
assert q.empty() is False
item = q.get()
assert item == "item1"
assert q.empty() is True
def test_thread_safety():
"""Test concurrent access from multiple threads."""
q = ExecutionQueue()
results = []
num_items = 100
def producer():
for i in range(num_items):
q.add(f"item_{i}")
def consumer():
count = 0
while count < num_items:
item = q.get_or_none()
if item is not None:
results.append(item)
count += 1
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join(timeout=5)
consumer_thread.join(timeout=5)
assert len(results) == num_items