Files
AutoGPT/autogpt_platform/backend/test_requeue_integration.py
2026-02-12 03:25:19 +00:00

350 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Integration test for the requeue fix implementation.
Tests actual RabbitMQ behavior to verify that republishing sends messages to back of queue.
"""
import json
import time
from threading import Event
from typing import List
from backend.data.rabbitmq import SyncRabbitMQ
from backend.executor.utils import create_execution_queue_config
class QueueOrderTester:
"""Helper class to test message ordering in RabbitMQ using a dedicated test queue."""
def __init__(self):
self.received_messages: List[dict] = []
self.stop_consuming = Event()
self.queue_client = SyncRabbitMQ(create_execution_queue_config())
self.queue_client.connect()
# Use a dedicated test queue name to avoid conflicts
self.test_queue_name = "test_requeue_ordering"
self.test_exchange = "test_exchange"
self.test_routing_key = "test.requeue"
def setup_queue(self):
"""Set up a dedicated test queue for testing."""
channel = self.queue_client.get_channel()
# Declare test exchange
channel.exchange_declare(
exchange=self.test_exchange, exchange_type="direct", durable=True
)
# Declare test queue
channel.queue_declare(
queue=self.test_queue_name, durable=True, auto_delete=False
)
# Bind queue to exchange
channel.queue_bind(
exchange=self.test_exchange,
queue=self.test_queue_name,
routing_key=self.test_routing_key,
)
# Purge the queue to start fresh
channel.queue_purge(self.test_queue_name)
print(f"✅ Test queue {self.test_queue_name} setup and purged")
def create_test_message(self, message_id: str, user_id: str = "test-user") -> str:
"""Create a test graph execution message."""
return json.dumps(
{
"graph_exec_id": f"exec-{message_id}",
"graph_id": f"graph-{message_id}",
"user_id": user_id,
"execution_context": {"timezone": "UTC"},
"nodes_input_masks": {},
"starting_nodes_input": [],
}
)
def publish_message(self, message: str):
"""Publish a message to the test queue."""
channel = self.queue_client.get_channel()
channel.basic_publish(
exchange=self.test_exchange,
routing_key=self.test_routing_key,
body=message,
)
def consume_messages(self, max_messages: int = 10, timeout: float = 5.0):
"""Consume messages and track their order."""
def callback(ch, method, properties, body):
try:
message_data = json.loads(body.decode())
self.received_messages.append(message_data)
ch.basic_ack(delivery_tag=method.delivery_tag)
if len(self.received_messages) >= max_messages:
self.stop_consuming.set()
except Exception as e:
print(f"Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
# Use synchronous consumption with blocking
channel = self.queue_client.get_channel()
# Check if there are messages in the queue first
method_frame, header_frame, body = channel.basic_get(
queue=self.test_queue_name, auto_ack=False
)
if method_frame:
# There are messages, set up consumer
channel.basic_nack(
delivery_tag=method_frame.delivery_tag, requeue=True
) # Put message back
# Set up consumer
channel.basic_consume(
queue=self.test_queue_name,
on_message_callback=callback,
)
# Consume with timeout
start_time = time.time()
while (
not self.stop_consuming.is_set()
and (time.time() - start_time) < timeout
and len(self.received_messages) < max_messages
):
try:
channel.connection.process_data_events(time_limit=0.1)
except Exception as e:
print(f"Error during consumption: {e}")
break
# Cancel the consumer
try:
channel.cancel()
except Exception:
pass
else:
# No messages in queue - this might be expected for some tests
pass
return self.received_messages
def cleanup(self):
"""Clean up test resources."""
try:
channel = self.queue_client.get_channel()
channel.queue_delete(queue=self.test_queue_name)
channel.exchange_delete(exchange=self.test_exchange)
print(f"✅ Test queue {self.test_queue_name} cleaned up")
except Exception as e:
print(f"⚠️ Cleanup issue: {e}")
def test_queue_ordering_behavior():
"""
Integration test to verify that our republishing method sends messages to back of queue.
This tests the actual fix for the rate limiting queue blocking issue.
"""
tester = QueueOrderTester()
try:
tester.setup_queue()
print("🧪 Testing actual RabbitMQ queue ordering behavior...")
# Test 1: Normal FIFO behavior
print("1. Testing normal FIFO queue behavior")
# Publish messages in order: A, B, C
msg_a = tester.create_test_message("A")
msg_b = tester.create_test_message("B")
msg_c = tester.create_test_message("C")
tester.publish_message(msg_a)
tester.publish_message(msg_b)
tester.publish_message(msg_c)
# Consume and verify FIFO order: A, B, C
tester.received_messages = []
tester.stop_consuming.clear()
messages = tester.consume_messages(max_messages=3)
assert len(messages) == 3, f"Expected 3 messages, got {len(messages)}"
assert messages[0]["graph_exec_id"] == "exec-A", (
f"First message should be A, got {messages[0]['graph_exec_id']}"
)
assert messages[1]["graph_exec_id"] == "exec-B", (
f"Second message should be B, got {messages[1]['graph_exec_id']}"
)
assert messages[2]["graph_exec_id"] == "exec-C", (
f"Third message should be C, got {messages[2]['graph_exec_id']}"
)
print("✅ FIFO order confirmed: A -> B -> C")
# Test 2: Rate limiting simulation - the key test!
print("2. Testing rate limiting fix scenario")
# Simulate the scenario where user1 is rate limited
user1_msg = tester.create_test_message("RATE-LIMITED", "user1")
user2_msg1 = tester.create_test_message("USER2-1", "user2")
user2_msg2 = tester.create_test_message("USER2-2", "user2")
# Initially publish user1 message (gets consumed, then rate limited on retry)
tester.publish_message(user1_msg)
# Other users publish their messages
tester.publish_message(user2_msg1)
tester.publish_message(user2_msg2)
# Now simulate: user1 message gets "requeued" using our new republishing method
# This is what happens in manager.py when requeue_by_republishing=True
tester.publish_message(user1_msg) # Goes to back via our method
# Expected order: RATE-LIMITED, USER2-1, USER2-2, RATE-LIMITED (republished to back)
# This shows that user2 messages get processed instead of being blocked
tester.received_messages = []
tester.stop_consuming.clear()
messages = tester.consume_messages(max_messages=4)
assert len(messages) == 4, f"Expected 4 messages, got {len(messages)}"
# The key verification: user2 messages are NOT blocked by user1's rate-limited message
user2_messages = [msg for msg in messages if msg["user_id"] == "user2"]
assert len(user2_messages) == 2, "Both user2 messages should be processed"
assert user2_messages[0]["graph_exec_id"] == "exec-USER2-1"
assert user2_messages[1]["graph_exec_id"] == "exec-USER2-2"
print("✅ Rate limiting fix confirmed: user2 executions NOT blocked by user1")
# Test 3: Verify our method behaves like going to back of queue
print("3. Testing republishing sends messages to back")
# Start with message X in queue
msg_x = tester.create_test_message("X")
tester.publish_message(msg_x)
# Add message Y
msg_y = tester.create_test_message("Y")
tester.publish_message(msg_y)
# Republish X (simulates requeue using our method)
tester.publish_message(msg_x)
# Expected: X, Y, X (X was republished to back)
tester.received_messages = []
tester.stop_consuming.clear()
messages = tester.consume_messages(max_messages=3)
assert len(messages) == 3
# Y should come before the republished X
y_index = next(
i for i, msg in enumerate(messages) if msg["graph_exec_id"] == "exec-Y"
)
republished_x_index = next(
i
for i, msg in enumerate(messages[1:], 1)
if msg["graph_exec_id"] == "exec-X"
)
assert y_index < republished_x_index, (
f"Y should come before republished X, but got order: {[m['graph_exec_id'] for m in messages]}"
)
print("✅ Republishing confirmed: messages go to back of queue")
print("🎉 All integration tests passed!")
print("🎉 Our republishing method works correctly with real RabbitMQ")
print("🎉 Queue blocking issue is fixed!")
finally:
tester.cleanup()
def test_traditional_requeue_behavior():
"""
Test that traditional requeue (basic_nack with requeue=True) sends messages to FRONT of queue.
This validates our hypothesis about why queue blocking occurs.
"""
tester = QueueOrderTester()
try:
tester.setup_queue()
print("🧪 Testing traditional requeue behavior (basic_nack with requeue=True)")
# Step 1: Publish message A
msg_a = tester.create_test_message("A")
tester.publish_message(msg_a)
# Step 2: Publish message B
msg_b = tester.create_test_message("B")
tester.publish_message(msg_b)
# Step 3: Consume message A and requeue it using traditional method
channel = tester.queue_client.get_channel()
method_frame, header_frame, body = channel.basic_get(
queue=tester.test_queue_name, auto_ack=False
)
assert method_frame is not None, "Should have received message A"
consumed_msg = json.loads(body.decode())
assert consumed_msg["graph_exec_id"] == "exec-A", (
f"Should have consumed message A, got {consumed_msg['graph_exec_id']}"
)
# Traditional requeue: basic_nack with requeue=True (sends to FRONT)
channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
print(f"🔄 Traditional requeue (to FRONT): {consumed_msg['graph_exec_id']}")
# Step 4: Consume all messages using basic_get for reliability
received_messages = []
# Get first message
method_frame, header_frame, body = channel.basic_get(
queue=tester.test_queue_name, auto_ack=True
)
if method_frame:
msg = json.loads(body.decode())
received_messages.append(msg)
# Get second message
method_frame, header_frame, body = channel.basic_get(
queue=tester.test_queue_name, auto_ack=True
)
if method_frame:
msg = json.loads(body.decode())
received_messages.append(msg)
# CRITICAL ASSERTION: Traditional requeue should put A at FRONT
# Expected order: A (requeued to front), B
assert len(received_messages) == 2, (
f"Expected 2 messages, got {len(received_messages)}"
)
first_msg = received_messages[0]["graph_exec_id"]
second_msg = received_messages[1]["graph_exec_id"]
# This is the critical test: requeued message A should come BEFORE B
assert first_msg == "exec-A", (
f"Traditional requeue should put A at FRONT, but first message was: {first_msg}"
)
assert second_msg == "exec-B", (
f"B should come after requeued A, but second message was: {second_msg}"
)
print(
"✅ HYPOTHESIS CONFIRMED: Traditional requeue sends messages to FRONT of queue"
)
print(f" Order: {first_msg} (requeued to front) → {second_msg}")
print(" This explains why rate-limited messages block other users!")
finally:
tester.cleanup()
if __name__ == "__main__":
test_queue_ordering_behavior()