From 14cee1670ace3497f62db406770afa663c210056 Mon Sep 17 00:00:00 2001 From: Krzysztof Czerwinski <34861343+kcze@users.noreply.github.com> Date: Tue, 3 Feb 2026 17:07:48 +0900 Subject: [PATCH] fix(backend): Prevent leaking Redis connections in `ws_api` (#11869) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixing https://github.com/Significant-Gravitas/AutoGPT/pull/11297#discussion_r2496833421 ### Changes 🏗️ 1. event_bus.py - Added close method to AsyncRedisEventBus - Added __init__ method to track the _pubsub instance attribute - Added async def close() method that closes the PubSub connection safely - Modified listen_events() to store the pubsub reference in self._pubsub 2. ws_api.py - Added cleanup in event_broadcaster - Wrapped the worker coroutines in try/finally block - The finally block calls close() on both event buses to ensure cleanup happens on any exit (including exceptions before retry) --- .../backend/backend/api/ws_api.py | 26 ++++++++++++------- .../backend/backend/data/event_bus.py | 14 ++++++++++ 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/autogpt_platform/backend/backend/api/ws_api.py b/autogpt_platform/backend/backend/api/ws_api.py index b71fdb3526..e254d4b4db 100644 --- a/autogpt_platform/backend/backend/api/ws_api.py +++ b/autogpt_platform/backend/backend/api/ws_api.py @@ -66,18 +66,24 @@ async def event_broadcaster(manager: ConnectionManager): execution_bus = AsyncRedisExecutionEventBus() notification_bus = AsyncRedisNotificationEventBus() - async def execution_worker(): - async for event in execution_bus.listen("*"): - await manager.send_execution_update(event) + try: - async def notification_worker(): - async for notification in notification_bus.listen("*"): - await manager.send_notification( - user_id=notification.user_id, - payload=notification.payload, - ) + async def execution_worker(): + async for event in execution_bus.listen("*"): + await manager.send_execution_update(event) - await asyncio.gather(execution_worker(), notification_worker()) + async def notification_worker(): + async for notification in notification_bus.listen("*"): + await manager.send_notification( + user_id=notification.user_id, + payload=notification.payload, + ) + + await asyncio.gather(execution_worker(), notification_worker()) + finally: + # Ensure PubSub connections are closed on any exit to prevent leaks + await execution_bus.close() + await notification_bus.close() async def authenticate_websocket(websocket: WebSocket) -> str: diff --git a/autogpt_platform/backend/backend/data/event_bus.py b/autogpt_platform/backend/backend/data/event_bus.py index d8a1c5b729..614fb158b2 100644 --- a/autogpt_platform/backend/backend/data/event_bus.py +++ b/autogpt_platform/backend/backend/data/event_bus.py @@ -133,10 +133,23 @@ class RedisEventBus(BaseRedisEventBus[M], ABC): class AsyncRedisEventBus(BaseRedisEventBus[M], ABC): + def __init__(self): + self._pubsub: AsyncPubSub | None = None + @property async def connection(self) -> redis.AsyncRedis: return await redis.get_redis_async() + async def close(self) -> None: + """Close the PubSub connection if it exists.""" + if self._pubsub is not None: + try: + await self._pubsub.close() + except Exception: + logger.warning("Failed to close PubSub connection", exc_info=True) + finally: + self._pubsub = None + async def publish_event(self, event: M, channel_key: str): """ Publish an event to Redis. Gracefully handles connection failures @@ -157,6 +170,7 @@ class AsyncRedisEventBus(BaseRedisEventBus[M], ABC): await self.connection, channel_key ) assert isinstance(pubsub, AsyncPubSub) + self._pubsub = pubsub if "*" in channel_key: await pubsub.psubscribe(full_channel_name)