fix(backend): Prevent leaking Redis connections in ws_api (#11869)

Fixing
https://github.com/Significant-Gravitas/AutoGPT/pull/11297#discussion_r2496833421

### Changes 🏗️

1. event_bus.py - Added close method to AsyncRedisEventBus
- Added __init__ method to track the _pubsub instance attribute
- Added async def close() method that closes the PubSub connection
safely
- Modified listen_events() to store the pubsub reference in self._pubsub

2. ws_api.py - Added cleanup in event_broadcaster
- Wrapped the worker coroutines in try/finally block
- The finally block calls close() on both event buses to ensure cleanup
happens on any exit (including exceptions before retry)
This commit is contained in:
Krzysztof Czerwinski
2026-02-03 17:07:48 +09:00
committed by GitHub
parent d81d1ce024
commit 14cee1670a
2 changed files with 30 additions and 10 deletions

View File

@@ -66,18 +66,24 @@ async def event_broadcaster(manager: ConnectionManager):
execution_bus = AsyncRedisExecutionEventBus()
notification_bus = AsyncRedisNotificationEventBus()
async def execution_worker():
async for event in execution_bus.listen("*"):
await manager.send_execution_update(event)
try:
async def notification_worker():
async for notification in notification_bus.listen("*"):
await manager.send_notification(
user_id=notification.user_id,
payload=notification.payload,
)
async def execution_worker():
async for event in execution_bus.listen("*"):
await manager.send_execution_update(event)
await asyncio.gather(execution_worker(), notification_worker())
async def notification_worker():
async for notification in notification_bus.listen("*"):
await manager.send_notification(
user_id=notification.user_id,
payload=notification.payload,
)
await asyncio.gather(execution_worker(), notification_worker())
finally:
# Ensure PubSub connections are closed on any exit to prevent leaks
await execution_bus.close()
await notification_bus.close()
async def authenticate_websocket(websocket: WebSocket) -> str:

View File

@@ -133,10 +133,23 @@ class RedisEventBus(BaseRedisEventBus[M], ABC):
class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
def __init__(self):
self._pubsub: AsyncPubSub | None = None
@property
async def connection(self) -> redis.AsyncRedis:
return await redis.get_redis_async()
async def close(self) -> None:
"""Close the PubSub connection if it exists."""
if self._pubsub is not None:
try:
await self._pubsub.close()
except Exception:
logger.warning("Failed to close PubSub connection", exc_info=True)
finally:
self._pubsub = None
async def publish_event(self, event: M, channel_key: str):
"""
Publish an event to Redis. Gracefully handles connection failures
@@ -157,6 +170,7 @@ class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
await self.connection, channel_key
)
assert isinstance(pubsub, AsyncPubSub)
self._pubsub = pubsub
if "*" in channel_key:
await pubsub.psubscribe(full_channel_name)