mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-02 10:55:14 -05:00
fix(backend): Handle stale RabbitMQ channels on connection drop
When the RabbitMQ connection drops, connect_robust reconnects the
underlying connection but the cached channel object becomes invalid.
The is_ready check passes (channel doesn't report as closed) but
operations fail with ChannelInvalidStateError ('No active transport
in channel'), generating ~39K Sentry errors (AUTOGPT-SERVER-1TN).
Fix:
- Add _ensure_channel() helper that resets stale channels before
reconnecting, so connect() creates a fresh channel
- Catch ChannelInvalidStateError in publish_message and do a full
reconnect (reset both channel and connection) before retrying
- Simplify get_channel() to use the same resilient helper
The existing @func_retry decorator provides additional retry
resilience on top of the explicit reconnect handling.
This commit is contained in:
@@ -291,6 +291,18 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
exchange, routing_key=queue.routing_key or queue.name
|
||||
)
|
||||
|
||||
async def _ensure_channel(self) -> aio_pika.abc.AbstractChannel:
    """Return the cached channel, reconnecting first if the client is not ready.

    When ``self.is_ready`` is false the cached channel is discarded and
    ``connect()`` is awaited, so that a new channel is established instead
    of the old object being reused (``connect()`` is defined elsewhere;
    presumably it only opens a channel when none is cached -- confirm).

    NOTE(review): a channel that still passes the ``is_ready`` check but has
    lost its transport is not detected here. ``publish_message`` covers that
    case separately by catching ``ChannelInvalidStateError``.

    Returns:
        The channel stored on ``self._channel``.

    Raises:
        RuntimeError: if ``connect()`` completes without setting a channel.
    """
    if not self.is_ready:
        # Drop the stale channel so connect() creates a fresh one.
        self._channel = None
        await self.connect()

    channel = self._channel
    if channel is None:
        raise RuntimeError("Channel should be established after connect")

    return channel
|
||||
|
||||
@func_retry
|
||||
async def publish_message(
|
||||
self,
|
||||
@@ -299,32 +311,58 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
exchange: Optional[Exchange] = None,
|
||||
persistent: bool = True,
|
||||
) -> None:
|
||||
if not self.is_ready:
|
||||
try:
|
||||
channel = await self._ensure_channel()
|
||||
except Exception:
|
||||
# Force full reconnect on channel acquisition failure
|
||||
self._channel = None
|
||||
self._connection = None
|
||||
channel = await self._ensure_channel()
|
||||
|
||||
try:
|
||||
if exchange:
|
||||
exchange_obj = await channel.get_exchange(exchange.name)
|
||||
else:
|
||||
exchange_obj = channel.default_exchange
|
||||
|
||||
await exchange_obj.publish(
|
||||
aio_pika.Message(
|
||||
body=message.encode(),
|
||||
delivery_mode=(
|
||||
aio_pika.DeliveryMode.PERSISTENT
|
||||
if persistent
|
||||
else aio_pika.DeliveryMode.NOT_PERSISTENT
|
||||
),
|
||||
),
|
||||
routing_key=routing_key,
|
||||
)
|
||||
except aio_pika.exceptions.ChannelInvalidStateError:
|
||||
logger.warning(
|
||||
"RabbitMQ channel invalid, reconnecting and retrying publish"
|
||||
)
|
||||
self._channel = None
|
||||
self._connection = None
|
||||
await self.connect()
|
||||
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after connect")
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after reconnect")
|
||||
|
||||
if exchange:
|
||||
exchange_obj = await self._channel.get_exchange(exchange.name)
|
||||
else:
|
||||
exchange_obj = self._channel.default_exchange
|
||||
if exchange:
|
||||
exchange_obj = await self._channel.get_exchange(exchange.name)
|
||||
else:
|
||||
exchange_obj = self._channel.default_exchange
|
||||
|
||||
await exchange_obj.publish(
|
||||
aio_pika.Message(
|
||||
body=message.encode(),
|
||||
delivery_mode=(
|
||||
aio_pika.DeliveryMode.PERSISTENT
|
||||
if persistent
|
||||
else aio_pika.DeliveryMode.NOT_PERSISTENT
|
||||
await exchange_obj.publish(
|
||||
aio_pika.Message(
|
||||
body=message.encode(),
|
||||
delivery_mode=(
|
||||
aio_pika.DeliveryMode.PERSISTENT
|
||||
if persistent
|
||||
else aio_pika.DeliveryMode.NOT_PERSISTENT
|
||||
),
|
||||
),
|
||||
),
|
||||
routing_key=routing_key,
|
||||
)
|
||||
routing_key=routing_key,
|
||||
)
|
||||
|
||||
async def get_channel(self) -> aio_pika.abc.AbstractChannel:
    """Return a usable channel, reconnecting first if the client is not ready.

    Delegates to ``_ensure_channel()`` so that this accessor and
    ``publish_message`` share one channel-acquisition path, rather than
    repeating the connect-and-check logic inline.

    Returns:
        The channel returned by ``_ensure_channel()``.

    Raises:
        RuntimeError: propagated from ``_ensure_channel()`` when no channel
            exists after connecting.
    """
    return await self._ensure_channel()
|
||||
|
||||
Reference in New Issue
Block a user