Compare commits

...

2 Commits

Author SHA1 Message Date
Bentlybro
73651d7f38 fix(backend): Handle stale RabbitMQ channels on connection drop
When the RabbitMQ connection drops, connect_robust reconnects the
underlying connection but the cached channel object becomes invalid.
The is_ready check passes (channel doesn't report as closed) but
operations fail with ChannelInvalidStateError ('No active transport
in channel'), generating ~39K Sentry errors (AUTOGPT-SERVER-1TN).

Fix:
- Add _ensure_channel() helper that resets stale channels before
  reconnecting, so connect() creates a fresh channel
- Catch ChannelInvalidStateError in publish_message and do a full
  reconnect (reset both channel and connection) before retrying
- Simplify get_channel() to use the same resilient helper

The existing @func_retry decorator provides additional retry
resilience on top of the explicit reconnect handling.
2026-02-02 15:10:25 +00:00
Guofang.Tang
1081590384 feat(backend): cover webhook ingress URL route (#11747)
### Changes 🏗️

- Add a unit test to verify webhook ingress URL generation matches the
FastAPI route.

  ### Checklist 📋

  #### For code changes:

  - [x] I have clearly listed my changes in the PR description
  - [x] I have made a test plan
  - [x] I have tested my changes according to the test plan:
- [x] poetry run pytest backend/integrations/webhooks/utils_test.py
--confcutdir=backend/integrations/webhooks

  #### For configuration changes:

  - [x] .env.default is updated or already compatible with my changes
- [x] docker-compose.yml is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under Changes)



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Tests**
* Added a unit test that validates webhook ingress URL generation
matches the application's resolved route (scheme, host, and path) for
provider-specific webhook endpoints, improving confidence in routing
behavior and helping prevent regressions.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2026-02-01 20:29:15 +00:00
2 changed files with 99 additions and 22 deletions

View File

@@ -291,6 +291,18 @@ class AsyncRabbitMQ(RabbitMQBase):
exchange, routing_key=queue.routing_key or queue.name
)
async def _ensure_channel(self) -> aio_pika.abc.AbstractChannel:
"""Get a valid channel, reconnecting if the current one is stale."""
if not self.is_ready:
# Reset stale channel so connect() creates a fresh one
self._channel = None
await self.connect()
if self._channel is None:
raise RuntimeError("Channel should be established after connect")
return self._channel
@func_retry
async def publish_message(
self,
@@ -299,32 +311,58 @@ class AsyncRabbitMQ(RabbitMQBase):
exchange: Optional[Exchange] = None,
persistent: bool = True,
) -> None:
if not self.is_ready:
try:
channel = await self._ensure_channel()
except Exception:
# Force full reconnect on channel acquisition failure
self._channel = None
self._connection = None
channel = await self._ensure_channel()
try:
if exchange:
exchange_obj = await channel.get_exchange(exchange.name)
else:
exchange_obj = channel.default_exchange
await exchange_obj.publish(
aio_pika.Message(
body=message.encode(),
delivery_mode=(
aio_pika.DeliveryMode.PERSISTENT
if persistent
else aio_pika.DeliveryMode.NOT_PERSISTENT
),
),
routing_key=routing_key,
)
except aio_pika.exceptions.ChannelInvalidStateError:
logger.warning(
"RabbitMQ channel invalid, reconnecting and retrying publish"
)
self._channel = None
self._connection = None
await self.connect()
if self._channel is None:
raise RuntimeError("Channel should be established after connect")
if self._channel is None:
raise RuntimeError("Channel should be established after reconnect")
if exchange:
exchange_obj = await self._channel.get_exchange(exchange.name)
else:
exchange_obj = self._channel.default_exchange
if exchange:
exchange_obj = await self._channel.get_exchange(exchange.name)
else:
exchange_obj = self._channel.default_exchange
await exchange_obj.publish(
aio_pika.Message(
body=message.encode(),
delivery_mode=(
aio_pika.DeliveryMode.PERSISTENT
if persistent
else aio_pika.DeliveryMode.NOT_PERSISTENT
await exchange_obj.publish(
aio_pika.Message(
body=message.encode(),
delivery_mode=(
aio_pika.DeliveryMode.PERSISTENT
if persistent
else aio_pika.DeliveryMode.NOT_PERSISTENT
),
),
),
routing_key=routing_key,
)
routing_key=routing_key,
)
async def get_channel(self) -> aio_pika.abc.AbstractChannel:
if not self.is_ready:
await self.connect()
if self._channel is None:
raise RuntimeError("Channel should be established after connect")
return self._channel
return await self._ensure_channel()

View File

@@ -0,0 +1,39 @@
from urllib.parse import urlparse
import fastapi
from fastapi.routing import APIRoute
from backend.api.features.integrations.router import router as integrations_router
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks import utils as webhooks_utils
def test_webhook_ingress_url_matches_route(monkeypatch) -> None:
app = fastapi.FastAPI()
app.include_router(integrations_router, prefix="/api/integrations")
provider = ProviderName.GITHUB
webhook_id = "webhook_123"
base_url = "https://example.com"
monkeypatch.setattr(webhooks_utils.app_config, "platform_base_url", base_url)
route = next(
route
for route in integrations_router.routes
if isinstance(route, APIRoute)
and route.path == "/{provider}/webhooks/{webhook_id}/ingress"
and "POST" in route.methods
)
expected_path = f"/api/integrations{route.path}".format(
provider=provider.value,
webhook_id=webhook_id,
)
actual_url = urlparse(webhooks_utils.webhook_ingress_url(provider, webhook_id))
expected_base = urlparse(base_url)
assert (actual_url.scheme, actual_url.netloc) == (
expected_base.scheme,
expected_base.netloc,
)
assert actual_url.path == expected_path