From 266526f08c7e68f0d4206961606dd9951f94ddac Mon Sep 17 00:00:00 2001 From: Bentlybro Date: Fri, 13 Feb 2026 15:12:55 +0000 Subject: [PATCH] fix(ws): close registry pubsub connection on shutdown Track the registry_pubsub connection and close it in the finally block to prevent Redis connection leaks on WebSocket server shutdown. --- autogpt_platform/backend/backend/api/ws_api.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/autogpt_platform/backend/backend/api/ws_api.py b/autogpt_platform/backend/backend/api/ws_api.py index d837473f63..4845518445 100644 --- a/autogpt_platform/backend/backend/api/ws_api.py +++ b/autogpt_platform/backend/backend/api/ws_api.py @@ -79,19 +79,23 @@ async def event_broadcaster(manager: ConnectionManager): payload=notification.payload, ) + # Track registry pubsub for cleanup + registry_pubsub = None + async def registry_refresh_worker(): """Listen for LLM registry refresh notifications and broadcast to all clients.""" + nonlocal registry_pubsub from backend.data.llm_registry import REGISTRY_REFRESH_CHANNEL from backend.data.redis_client import connect_async redis = await connect_async() - pubsub = redis.pubsub() - await pubsub.subscribe(REGISTRY_REFRESH_CHANNEL) + registry_pubsub = redis.pubsub() + await registry_pubsub.subscribe(REGISTRY_REFRESH_CHANNEL) logger.info( "Subscribed to LLM registry refresh notifications for WebSocket broadcast" ) - async for message in pubsub.listen(): + async for message in registry_pubsub.listen(): if ( message["type"] == "message" and message["channel"] == REGISTRY_REFRESH_CHANNEL @@ -116,6 +120,8 @@ async def event_broadcaster(manager: ConnectionManager): # Ensure PubSub connections are closed on any exit to prevent leaks await execution_bus.close() await notification_bus.close() + if registry_pubsub: + await registry_pubsub.close() async def authenticate_websocket(websocket: WebSocket) -> str: