mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-14 00:35:02 -05:00
fix(ws): close registry pubsub connection on shutdown
Track the registry_pubsub connection and close it in the finally block to prevent Redis connection leaks on WebSocket server shutdown.
This commit is contained in:
@@ -79,19 +79,23 @@ async def event_broadcaster(manager: ConnectionManager):
|
||||
payload=notification.payload,
|
||||
)
|
||||
|
||||
# Track registry pubsub for cleanup
|
||||
registry_pubsub = None
|
||||
|
||||
async def registry_refresh_worker():
|
||||
"""Listen for LLM registry refresh notifications and broadcast to all clients."""
|
||||
nonlocal registry_pubsub
|
||||
from backend.data.llm_registry import REGISTRY_REFRESH_CHANNEL
|
||||
from backend.data.redis_client import connect_async
|
||||
|
||||
redis = await connect_async()
|
||||
pubsub = redis.pubsub()
|
||||
await pubsub.subscribe(REGISTRY_REFRESH_CHANNEL)
|
||||
registry_pubsub = redis.pubsub()
|
||||
await registry_pubsub.subscribe(REGISTRY_REFRESH_CHANNEL)
|
||||
logger.info(
|
||||
"Subscribed to LLM registry refresh notifications for WebSocket broadcast"
|
||||
)
|
||||
|
||||
async for message in pubsub.listen():
|
||||
async for message in registry_pubsub.listen():
|
||||
if (
|
||||
message["type"] == "message"
|
||||
and message["channel"] == REGISTRY_REFRESH_CHANNEL
|
||||
@@ -116,6 +120,8 @@ async def event_broadcaster(manager: ConnectionManager):
|
||||
# Ensure PubSub connections are closed on any exit to prevent leaks
|
||||
await execution_bus.close()
|
||||
await notification_bus.close()
|
||||
if registry_pubsub:
|
||||
await registry_pubsub.close()
|
||||
|
||||
|
||||
async def authenticate_websocket(websocket: WebSocket) -> str:
|
||||
|
||||
Reference in New Issue
Block a user