hotfix(backend/executor): Fix RabbitMQ channel retry logic in executor (#10661)

## Summary
**HOTFIX for production** - Fixes executor being stuck in infinite retry
loop when RabbitMQ channels are closed
- Ensures proper reconnection by checking channel state before
attempting to consume messages
- Prevents accumulation of thousands of retry attempts (was seeing 7000+
retries)

## Changes
The executor was stuck repeatedly failing with "Channel is closed"
errors because the `continuous_retry` decorator was attempting to reuse
closed channels instead of creating new ones.

Added channel state checks (`is_ready`) before connecting in both:
- `_consume_execution_run()` 
- `_consume_execution_cancel()`

When a channel is not ready (closed), the code now:
1. Disconnects the client (safe operation, checks if already
disconnected)
2. Establishes a fresh connection with new channel
3. Proceeds with message consumption

## Test plan
- [x] Verified the disconnect() method is safe to call on already
disconnected clients
- [x] Confirmed is_ready property checks both connection and channel
state
- [ ] Deploy to environment and verify executors reconnect properly
after channel failures
- [ ] Monitor logs to ensure no more "Channel is closed" retry loops

## Related Issues
Fixes critical production issue where:
- Executor pods show repeated "Channel is closed" errors
- 757 messages stuck in `graph_execution_queue`
- 102,286 messages in `failed_notifications` queue
- RabbitMQ logs show connections being closed due to missed heartbeats

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Zamil Majdy
2025-08-17 02:06:06 +04:00
committed by GitHub
parent af7d56612d
commit bf92e7dbc8

View File

@@ -1208,6 +1208,9 @@ class ExecutionManager(AppProcess):
)
return
# Check if channel is closed and force reconnection if needed
if not self.cancel_client.is_ready:
self.cancel_client.disconnect()
self.cancel_client.connect()
cancel_channel = self.cancel_client.get_channel()
cancel_channel.basic_consume(
@@ -1237,6 +1240,9 @@ class ExecutionManager(AppProcess):
)
return
# Check if channel is closed and force reconnection if needed
if not self.run_client.is_ready:
self.run_client.disconnect()
self.run_client.connect()
run_channel = self.run_client.get_channel()
run_channel.basic_qos(prefetch_count=self.pool_size)