# Files
# AutoGPT/autogpt_platform/backend/test_execution_queue.py
#
# 237 lines
# 7.1 KiB
# Python

"""
Test script to verify the ExecutionQueue fix in execution.py
This script tests:
1. That ExecutionQueue uses queue.Queue (not multiprocessing.Manager().Queue())
2. All queue operations work correctly
3. Thread-safety works as expected
"""
import sys
import threading
import time
sys.path.insert(0, ".")
import queue
def test_queue_type():
    """Check that ExecutionQueue is backed by a standard-library ``queue.Queue``."""
    from backend.data.execution import ExecutionQueue

    execution_queue = ExecutionQueue()
    backing = execution_queue.queue

    # The backing store must be the in-process stdlib queue, not a
    # multiprocessing manager proxy.
    assert isinstance(
        backing, queue.Queue
    ), f"FAIL: Expected queue.Queue, got {type(backing)}"

    print("✓ ExecutionQueue uses queue.Queue (not multiprocessing.Manager().Queue())")
def test_basic_operations():
    """Exercise add(), get(), empty() and get_or_none() on a single queue.

    The steps run in a fixed order because each one depends on the queue
    state left behind by the previous step.
    """
    from backend.data.execution import ExecutionQueue

    execution_queue = ExecutionQueue()

    # add() is expected to hand back the item it was given.
    added = execution_queue.add("item1")
    assert added == "item1", f"FAIL: add() should return the item, got {added}"
    print("✓ add() works correctly")

    # One item is queued, so empty() must report False.
    assert (
        execution_queue.empty() is False
    ), "FAIL: empty() should return False when queue has items"
    print("✓ empty() returns False when queue has items")

    # get() should return the item that was just queued.
    fetched = execution_queue.get()
    assert fetched == "item1", f"FAIL: get() returned {fetched}, expected 'item1'"
    print("✓ get() works correctly")

    # The only item was consumed, so the queue is empty again.
    assert (
        execution_queue.empty() is True
    ), "FAIL: empty() should return True when queue is empty"
    print("✓ empty() returns True when queue is empty")

    # get_or_none() on an empty queue yields None.
    maybe_item = execution_queue.get_or_none()
    assert (
        maybe_item is None
    ), f"FAIL: get_or_none() should return None, got {maybe_item}"
    print("✓ get_or_none() returns None when queue is empty")

    # get_or_none() on a non-empty queue yields the queued item.
    execution_queue.add("item2")
    maybe_item = execution_queue.get_or_none()
    assert (
        maybe_item == "item2"
    ), f"FAIL: get_or_none() returned {maybe_item}, expected 'item2'"
    print("✓ get_or_none() returns item when queue has items")
def test_thread_safety():
    """Transfer items from one producer thread to one consumer thread.

    The producer adds ``num_items`` items; the consumer polls with
    ``get_or_none()`` until it has collected them all. The test passes when
    neither thread recorded an exception and every item arrived.

    The consumer gives up after ``timeout_seconds`` and both threads are
    daemons, so a lost item or a failed producer makes the test fail
    instead of leaving a thread polling forever and blocking interpreter
    exit.
    """
    from backend.data.execution import ExecutionQueue

    q = ExecutionQueue()
    results = []
    errors = []
    num_items = 100
    timeout_seconds = 5

    def producer():
        try:
            for i in range(num_items):
                q.add(f"item_{i}")
        except Exception as e:
            errors.append(f"Producer error: {e}")

    def consumer():
        try:
            # Bound the polling loop: without a deadline the consumer would
            # spin forever if fewer than num_items items ever show up.
            deadline = time.monotonic() + timeout_seconds
            count = 0
            while count < num_items and time.monotonic() < deadline:
                item = q.get_or_none()
                if item is not None:
                    results.append(item)
                    count += 1
                else:
                    time.sleep(0.001)  # Small delay to avoid busy waiting
        except Exception as e:
            errors.append(f"Consumer error: {e}")

    # Daemon threads cannot keep the process alive if one of them gets stuck.
    producer_thread = threading.Thread(target=producer, daemon=True)
    consumer_thread = threading.Thread(target=consumer, daemon=True)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join(timeout=timeout_seconds)
    consumer_thread.join(timeout=timeout_seconds)

    assert len(errors) == 0, f"FAIL: Thread errors occurred: {errors}"
    assert len(results) == num_items, f"FAIL: Expected {num_items} items, got {len(results)}"

    print(f"✓ Thread-safety test passed ({num_items} items transferred between threads)")
def test_multiple_producers_consumers():
    """Transfer items from several producer threads to several consumer threads.

    Each producer adds ``items_per_producer`` items. The total is split
    across ``num_consumers`` consumers, with any remainder spread over the
    first consumers so that no item is left unclaimed when the total is
    not evenly divisible. The test passes when no thread recorded an
    exception and every produced item was collected.
    """
    from backend.data.execution import ExecutionQueue

    q = ExecutionQueue()
    results = []
    results_lock = threading.Lock()
    errors = []
    items_per_producer = 50
    num_producers = 3
    num_consumers = 2
    total_items = items_per_producer * num_producers

    # Per-consumer quotas that always sum to total_items.
    base_quota, remainder = divmod(total_items, num_consumers)
    quotas = [
        base_quota + (1 if consumer_id < remainder else 0)
        for consumer_id in range(num_consumers)
    ]

    def producer(producer_id):
        try:
            for i in range(items_per_producer):
                q.add(f"producer_{producer_id}_item_{i}")
        except Exception as e:
            errors.append(f"Producer {producer_id} error: {e}")

    def consumer(consumer_id, target_count):
        try:
            count = 0
            # Cap the number of empty polls so a consumer cannot spin
            # forever when items never arrive.
            max_attempts = target_count * 100
            attempts = 0
            while count < target_count and attempts < max_attempts:
                item = q.get_or_none()
                if item is not None:
                    with results_lock:
                        results.append(item)
                    count += 1
                else:
                    time.sleep(0.001)
                    attempts += 1
        except Exception as e:
            errors.append(f"Consumer {consumer_id} error: {e}")

    # Daemon threads cannot keep the process alive if one of them gets stuck.
    producer_threads = [
        threading.Thread(target=producer, args=(i,), daemon=True)
        for i in range(num_producers)
    ]
    consumer_threads = [
        threading.Thread(target=consumer, args=(i, quota), daemon=True)
        for i, quota in enumerate(quotas)
    ]

    for t in producer_threads:
        t.start()
    for t in consumer_threads:
        t.start()

    for t in producer_threads:
        t.join(timeout=10)
    for t in consumer_threads:
        t.join(timeout=10)

    assert len(errors) == 0, f"FAIL: Thread errors occurred: {errors}"
    assert len(results) == total_items, f"FAIL: Expected {total_items} items, got {len(results)}"

    print(f"✓ Multi-producer/consumer test passed ({num_producers} producers, {num_consumers} consumers, {total_items} items)")
def test_no_subprocess_spawned():
    """Verify that creating ExecutionQueue instances starts no child process.

    Snapshots the live ``multiprocessing`` children of this process before
    and after creating several queues and asserts that no new child
    appeared. Each queue is also round-tripped with one item.
    """
    import multiprocessing

    from backend.data.execution import ExecutionQueue

    children_before = {child.pid for child in multiprocessing.active_children()}

    # Create multiple queues (this would spawn subprocesses with Manager()).
    # The list keeps every queue referenced while the children are counted.
    queues = [ExecutionQueue() for _ in range(5)]

    # A multiprocessing.Manager() runs its server in a child process of the
    # caller, so a manager-backed queue is expected to show up here.
    children_after = {child.pid for child in multiprocessing.active_children()}
    spawned = sorted(children_after - children_before)
    assert not spawned, (
        f"FAIL: Creating ExecutionQueue instances spawned child processes: {spawned}"
    )

    for q in queues:
        q.add("test")
        assert q.get() == "test"

    print("✓ No subprocess spawning (5 queues created without spawning manager processes)")
def main():
    """Run every verification test and print a summary.

    Returns:
        0 when all tests pass, 1 when at least one test fails.
    """
    banner = "=" * 60

    print(banner)
    print("ExecutionQueue Fix Verification Tests")
    print(banner)
    print()

    suite = [
        ("Queue Type Check", test_queue_type),
        ("Basic Operations", test_basic_operations),
        ("Thread Safety", test_thread_safety),
        ("Multiple Producers/Consumers", test_multiple_producers_consumers),
        ("No Subprocess Spawning", test_no_subprocess_spawned),
    ]

    passed = 0
    failed = 0

    for title, run_test in suite:
        print(f"\n--- {title} ---")
        try:
            run_test()
        except AssertionError as failure:
            # Assertion messages already carry their own "FAIL:" prefix.
            print(f"{failure}")
            failed += 1
        except Exception as unexpected:
            print(f"✗ Unexpected error: {unexpected}")
            failed += 1
        else:
            passed += 1

    all_passed = failed == 0

    print()
    print(banner)
    if all_passed:
        print(f"✅ ALL TESTS PASSED ({passed}/{passed})")
        print("The ExecutionQueue fix is working correctly!")
    else:
        print(f"❌ TESTS FAILED: {failed} failed, {passed} passed")
    print(banner)

    if all_passed:
        return 0
    return 1
# Script entry point: run the suite and use its result as the exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)