Compare commits

...

2 Commits

Author SHA1 Message Date
Otto
48c8e541db Merge branch 'dev' into revert-11618-fix/execution-queue-performance
Resolve conflict: remove execution_queue_test.py (deleted by revert)
2026-02-09 08:54:54 +00:00
Nicholas Tindle
ad0571c752 Revert "fix(backend): replace multiprocessing queue with thread safe queue in…"
This reverts commit 728c40def5.
2026-02-08 21:22:16 -06:00
2 changed files with 6 additions and 67 deletions

View File

@@ -1,8 +1,9 @@
import logging
import queue
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from enum import Enum
from multiprocessing import Manager
from queue import Empty
from typing import (
TYPE_CHECKING,
Annotated,
@@ -1199,16 +1200,12 @@ class NodeExecutionEntry(BaseModel):
class ExecutionQueue(Generic[T]):
"""
Thread-safe queue for managing node execution within a single graph execution.
Note: Uses queue.Queue (not multiprocessing.Queue) since all access is from
threads within the same process. If migrating back to ProcessPoolExecutor,
replace with multiprocessing.Manager().Queue() for cross-process safety.
Queue for managing the execution of agents.
This will be shared between different processes
"""
def __init__(self):
# Thread-safe queue (not multiprocessing) — see class docstring
self.queue: queue.Queue[T] = queue.Queue()
self.queue = Manager().Queue()
def add(self, execution: T) -> T:
self.queue.put(execution)
@@ -1223,7 +1220,7 @@ class ExecutionQueue(Generic[T]):
def get_or_none(self) -> T | None:
try:
return self.queue.get_nowait()
except queue.Empty:
except Empty:
return None

View File

@@ -1,58 +0,0 @@
"""Tests for ExecutionQueue thread-safety."""
import queue
import threading
from backend.data.execution import ExecutionQueue
def test_execution_queue_uses_stdlib_queue():
"""Verify ExecutionQueue uses queue.Queue (not multiprocessing)."""
q = ExecutionQueue()
assert isinstance(q.queue, queue.Queue)
def test_basic_operations():
"""Test add, get, empty, and get_or_none."""
q = ExecutionQueue()
assert q.empty() is True
assert q.get_or_none() is None
result = q.add("item1")
assert result == "item1"
assert q.empty() is False
item = q.get()
assert item == "item1"
assert q.empty() is True
def test_thread_safety():
"""Test concurrent access from multiple threads."""
q = ExecutionQueue()
results = []
num_items = 100
def producer():
for i in range(num_items):
q.add(f"item_{i}")
def consumer():
count = 0
while count < num_items:
item = q.get_or_none()
if item is not None:
results.append(item)
count += 1
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join(timeout=5)
consumer_thread.join(timeout=5)
assert len(results) == num_items