mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-09 07:08:09 -05:00
## Summary

This PR implements a graph-level Safe Mode toggle system for Human-in-the-Loop (HITL) blocks. When Safe Mode is ON (default), HITL blocks require manual review before proceeding. When OFF, they execute automatically.

## 🔧 Backend Changes

- **Database**: Added `metadata` JSON column to `AgentGraph` table with migration
- **API**: Updated `execute_graph` endpoint to accept `safe_mode` parameter
- **Execution**: Enhanced execution context to use graph metadata as default with API override capability
- **Auto-detection**: Automatically populate `has_human_in_the_loop` for graphs containing HITL blocks
- **Block Detection**: HITL block ID: `8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d`

## 🎨 Frontend Changes

- **Component**: New `FloatingSafeModeToggle` with dual variants:
  - **White variant**: For library pages, integrates with action buttons
  - **Black variant**: For builders, floating positioned
- **Integration**: Added toggles to both new/legacy builders and library pages
- **API Integration**: Direct graph metadata updates via `usePutV1UpdateGraphVersion`
- **Query Management**: React Query cache invalidation for consistent UI updates
- **Conditional Display**: Toggle only appears when graph contains HITL blocks

## 🛠 Technical Implementation

- **Safe Mode ON** (default): HITL blocks require manual review before proceeding
- **Safe Mode OFF**: HITL blocks execute automatically without intervention
- **Priority**: Backend API `safe_mode` parameter takes precedence over graph metadata
- **Detection**: Auto-populates `has_human_in_the_loop` metadata field
- **Positioning**: Proper z-index and responsive positioning for floating elements

## 🚧 Known Issues (Work in Progress)

### High Priority

- [ ] **Toggle state persistence**: Always shows "ON" regardless of actual state - query invalidation issue
- [ ] **LibraryAgent metadata**: Missing metadata field causing TypeScript errors
- [ ] **Tooltip z-index**: Still covered by some UI elements despite high z-index

### Medium Priority

- [ ] **HITL detection**: Logic needs improvement for reliable block detection
- [ ] **Error handling**: Removing HITL blocks from graph causes save errors
- [ ] **TypeScript**: Fix type mismatches between GraphModel and LibraryAgent

### Low Priority

- [ ] **Frontend API**: Add `safe_mode` parameter to execution calls once OpenAPI is regenerated
- [ ] **Performance**: Consider debouncing rapid toggle clicks

## 🧪 Test Plan

- [ ] Verify toggle appears only when graph has HITL blocks
- [ ] Test toggle persistence across page refreshes
- [ ] Confirm API calls update graph metadata correctly
- [ ] Validate execution behavior respects safe mode setting
- [ ] Check styling consistency across builder and library contexts

## 🔗 Related

- Addresses requirements for graph-level HITL configuration
- Builds on existing FloatingReviewsPanel infrastructure
- Integrates with existing graph metadata system

🤖 Generated with [Claude Code](https://claude.ai/code)
350 lines
13 KiB
Python
350 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Integration test for the requeue fix implementation.
|
|
Tests actual RabbitMQ behavior to verify that republishing sends messages to back of queue.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from threading import Event
|
|
from typing import List
|
|
|
|
from backend.data.rabbitmq import SyncRabbitMQ
|
|
from backend.executor.utils import create_execution_queue_config
|
|
|
|
|
|
class QueueOrderTester:
    """Helper class to test message ordering in RabbitMQ using a dedicated test queue.

    Connects through ``SyncRabbitMQ`` using the execution queue config, but
    publishes to and consumes from its own exchange/queue pair so the tests
    never touch the real execution queue.
    """

    def __init__(self):
        # Decoded message bodies in delivery order. consume_messages appends to
        # this list across calls; tests reset it before each consumption round.
        self.received_messages: List[dict] = []
        # Set by the consumer callback once the requested message count is reached.
        self.stop_consuming = Event()
        self.queue_client = SyncRabbitMQ(create_execution_queue_config())
        self.queue_client.connect()

        # Use a dedicated test queue name to avoid conflicts
        self.test_queue_name = "test_requeue_ordering"
        self.test_exchange = "test_exchange"
        self.test_routing_key = "test.requeue"

    def setup_queue(self):
        """Set up a dedicated test queue for testing.

        Declares a durable direct exchange and a durable queue, binds them with
        the test routing key, and purges the queue so each test starts empty.
        """
        channel = self.queue_client.get_channel()

        # Declare test exchange
        channel.exchange_declare(
            exchange=self.test_exchange, exchange_type="direct", durable=True
        )

        # Declare test queue
        channel.queue_declare(
            queue=self.test_queue_name, durable=True, auto_delete=False
        )

        # Bind queue to exchange
        channel.queue_bind(
            exchange=self.test_exchange,
            queue=self.test_queue_name,
            routing_key=self.test_routing_key,
        )

        # Purge the queue to start fresh
        channel.queue_purge(self.test_queue_name)
        print(f"✅ Test queue {self.test_queue_name} setup and purged")

    def create_test_message(self, message_id: str, user_id: str = "test-user") -> str:
        """Create a test graph execution message.

        Args:
            message_id: Suffix used to build ``graph_exec_id`` ("exec-<id>")
                and ``graph_id`` ("graph-<id>").
            user_id: Value for the ``user_id`` field.

        Returns:
            The message serialized as a JSON string.
        """
        return json.dumps(
            {
                "graph_exec_id": f"exec-{message_id}",
                "graph_id": f"graph-{message_id}",
                "user_id": user_id,
                "execution_context": {"timezone": "UTC"},
                "nodes_input_masks": {},
                "starting_nodes_input": [],
            }
        )

    def publish_message(self, message: str):
        """Publish a message to the test queue via the test exchange."""
        channel = self.queue_client.get_channel()
        channel.basic_publish(
            exchange=self.test_exchange,
            routing_key=self.test_routing_key,
            body=message,
        )

    def consume_messages(self, max_messages: int = 10, timeout: float = 5.0):
        """Consume messages from the test queue and record them in delivery order.

        Args:
            max_messages: Stop once ``self.received_messages`` holds this many
                entries. The count includes entries already in the list, so
                callers should reset ``received_messages`` and clear
                ``stop_consuming`` before each round.
            timeout: Upper bound, in seconds, on how long to wait for messages.

        Returns:
            ``self.received_messages``, the same list object. If the queue is
            empty when called, returns immediately without waiting.
        """

        def callback(ch, method, properties, body):
            # One process_data_events call can dispatch several deliveries.
            # Once the target count is reached, requeue the extras instead of
            # acking them, which would silently drop them from the queue.
            if len(self.received_messages) >= max_messages:
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                return
            try:
                message_data = json.loads(body.decode())
                self.received_messages.append(message_data)
                ch.basic_ack(delivery_tag=method.delivery_tag)

                if len(self.received_messages) >= max_messages:
                    self.stop_consuming.set()
            except Exception as e:
                print(f"Error processing message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

        channel = self.queue_client.get_channel()

        # Peek first so an empty queue returns at once instead of waiting out
        # the full timeout.
        method_frame, header_frame, body = channel.basic_get(
            queue=self.test_queue_name, auto_ack=False
        )
        if not method_frame:
            # No messages in queue - this might be expected for some tests
            return self.received_messages

        # Put the peeked message back. RabbitMQ requeues it at its original
        # position where possible.
        channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)

        consumer_tag = channel.basic_consume(
            queue=self.test_queue_name,
            on_message_callback=callback,
        )
        try:
            # Consume with timeout
            start_time = time.monotonic()
            while (
                not self.stop_consuming.is_set()
                and (time.monotonic() - start_time) < timeout
                and len(self.received_messages) < max_messages
            ):
                try:
                    channel.connection.process_data_events(time_limit=0.1)
                except Exception as e:
                    print(f"Error during consumption: {e}")
                    break
        finally:
            # channel.cancel() only cancels consumers created by channel.consume().
            # A basic_consume consumer must be cancelled by its tag. Otherwise it
            # stays registered and grabs messages meant for later rounds.
            try:
                channel.basic_cancel(consumer_tag)
            except Exception as e:
                print(f"⚠️ Failed to cancel test consumer: {e}")

        return self.received_messages

    def cleanup(self):
        """Clean up test resources.

        Deletes the test queue and exchange. Failures are reported but not
        raised, so cleanup in a ``finally`` never masks the test's own error.
        """
        try:
            channel = self.queue_client.get_channel()
            channel.queue_delete(queue=self.test_queue_name)
            channel.exchange_delete(exchange=self.test_exchange)
            print(f"✅ Test queue {self.test_queue_name} cleaned up")
        except Exception as e:
            print(f"⚠️ Cleanup issue: {e}")
|
|
|
|
|
|
def test_queue_ordering_behavior():
    """
    Integration test to verify that our republishing method sends messages to back of queue.

    This tests the actual fix for the rate limiting queue blocking issue.
    Requires a reachable RabbitMQ broker. Uses a dedicated test queue that is
    deleted afterwards.
    """
    tester = QueueOrderTester()

    try:
        tester.setup_queue()

        print("🧪 Testing actual RabbitMQ queue ordering behavior...")

        # Test 1: Normal FIFO behavior
        print("1. Testing normal FIFO queue behavior")

        # Publish messages in order: A, B, C
        msg_a = tester.create_test_message("A")
        msg_b = tester.create_test_message("B")
        msg_c = tester.create_test_message("C")

        tester.publish_message(msg_a)
        tester.publish_message(msg_b)
        tester.publish_message(msg_c)

        # Consume and verify FIFO order: A, B, C
        tester.received_messages = []
        tester.stop_consuming.clear()
        messages = tester.consume_messages(max_messages=3)

        assert len(messages) == 3, f"Expected 3 messages, got {len(messages)}"
        assert (
            messages[0]["graph_exec_id"] == "exec-A"
        ), f"First message should be A, got {messages[0]['graph_exec_id']}"
        assert (
            messages[1]["graph_exec_id"] == "exec-B"
        ), f"Second message should be B, got {messages[1]['graph_exec_id']}"
        assert (
            messages[2]["graph_exec_id"] == "exec-C"
        ), f"Third message should be C, got {messages[2]['graph_exec_id']}"

        print("✅ FIFO order confirmed: A -> B -> C")

        # Test 2: Rate limiting simulation - the key test!
        print("2. Testing rate limiting fix scenario")

        # Simulate the scenario where user1 is rate limited
        user1_msg = tester.create_test_message("RATE-LIMITED", "user1")
        user2_msg1 = tester.create_test_message("USER2-1", "user2")
        user2_msg2 = tester.create_test_message("USER2-2", "user2")

        # Initially publish user1 message (gets consumed, then rate limited on retry)
        tester.publish_message(user1_msg)

        # Other users publish their messages
        tester.publish_message(user2_msg1)
        tester.publish_message(user2_msg2)

        # Now simulate: user1 message gets "requeued" using our new republishing method
        # This is what happens in manager.py when requeue_by_republishing=True
        tester.publish_message(user1_msg)  # Goes to back via our method

        # Expected order: RATE-LIMITED, USER2-1, USER2-2, RATE-LIMITED (republished to back)
        # This shows that user2 messages get processed instead of being blocked
        tester.received_messages = []
        tester.stop_consuming.clear()
        messages = tester.consume_messages(max_messages=4)

        assert len(messages) == 4, f"Expected 4 messages, got {len(messages)}"

        # Check the full order, not just presence: the republished copy must
        # arrive after both user2 messages.
        order = [msg["graph_exec_id"] for msg in messages]
        assert order == [
            "exec-RATE-LIMITED",
            "exec-USER2-1",
            "exec-USER2-2",
            "exec-RATE-LIMITED",
        ], f"Unexpected delivery order: {order}"

        # The key verification: user2 messages are NOT blocked by user1's rate-limited message
        user2_messages = [msg for msg in messages if msg["user_id"] == "user2"]
        assert len(user2_messages) == 2, "Both user2 messages should be processed"
        assert user2_messages[0]["graph_exec_id"] == "exec-USER2-1"
        assert user2_messages[1]["graph_exec_id"] == "exec-USER2-2"

        print("✅ Rate limiting fix confirmed: user2 executions NOT blocked by user1")

        # Test 3: Verify our method behaves like going to back of queue
        print("3. Testing republishing sends messages to back")

        # Start with message X in queue
        msg_x = tester.create_test_message("X")
        tester.publish_message(msg_x)

        # Add message Y
        msg_y = tester.create_test_message("Y")
        tester.publish_message(msg_y)

        # Republish X (simulates requeue using our method)
        tester.publish_message(msg_x)

        # Expected: X, Y, X (X was republished to back)
        tester.received_messages = []
        tester.stop_consuming.clear()
        messages = tester.consume_messages(max_messages=3)

        assert len(messages) == 3, f"Expected 3 messages, got {len(messages)}"

        # Y must sit between the original X and the republished X. An exact
        # comparison also reports the observed order if a message is missing,
        # instead of raising a bare StopIteration.
        order = [msg["graph_exec_id"] for msg in messages]
        assert order == [
            "exec-X",
            "exec-Y",
            "exec-X",
        ], f"Y should come before republished X, but got order: {order}"

        print("✅ Republishing confirmed: messages go to back of queue")

        print("🎉 All integration tests passed!")
        print("🎉 Our republishing method works correctly with real RabbitMQ")
        print("🎉 Queue blocking issue is fixed!")

    finally:
        tester.cleanup()
|
|
|
|
|
|
def test_traditional_requeue_behavior():
    """
    Check that classic requeueing (``basic_nack`` with ``requeue=True``) puts a
    message back at the FRONT of the queue.

    Publishes A then B, takes A without acking it, nacks it back, then reads
    two messages and expects A before B. This behavior is what let a
    rate-limited message block other users.
    """
    tester = QueueOrderTester()

    try:
        tester.setup_queue()
        print("🧪 Testing traditional requeue behavior (basic_nack with requeue=True)")

        # Seed the queue with A followed by B.
        for label in ("A", "B"):
            tester.publish_message(tester.create_test_message(label))

        queue = tester.test_queue_name
        channel = tester.queue_client.get_channel()

        # Pull A without acknowledging it so it can be handed back.
        frame, _props, payload = channel.basic_get(queue=queue, auto_ack=False)
        assert frame is not None, "Should have received message A"
        taken_id = json.loads(payload.decode())["graph_exec_id"]
        assert taken_id == "exec-A", f"Should have consumed message A, got {taken_id}"

        # Classic requeue: return A to the broker.
        channel.basic_nack(delivery_tag=frame.delivery_tag, requeue=True)
        print(f"🔄 Traditional requeue (to FRONT): {taken_id}")

        # Drain up to two messages with auto-ack to observe the resulting order.
        drained = []
        for _ in range(2):
            frame, _props, payload = channel.basic_get(queue=queue, auto_ack=True)
            if frame:
                drained.append(json.loads(payload.decode()))

        assert len(drained) == 2, f"Expected 2 messages, got {len(drained)}"

        first_msg, second_msg = (entry["graph_exec_id"] for entry in drained)

        # The requeued A must be delivered before B.
        assert first_msg == "exec-A", (
            f"Traditional requeue should put A at FRONT, but first message was: {first_msg}"
        )
        assert second_msg == "exec-B", (
            f"B should come after requeued A, but second message was: {second_msg}"
        )

        print("✅ HYPOTHESIS CONFIRMED: Traditional requeue sends messages to FRONT of queue")
        print(f" Order: {first_msg} (requeued to front) → {second_msg}")
        print(" This explains why rate-limited messages block other users!")

    finally:
        tester.cleanup()
|
|
|
|
|
|
if __name__ == "__main__":
    # When run as a script, execute both integration tests. Under pytest, both
    # are collected automatically.
    test_queue_ordering_behavior()
    test_traditional_requeue_behavior()
|