refactor: move registry_pubsub cleanup into worker function

Instead of tracking pubsub externally with nonlocal, let the worker
function own and clean up its own resource in a local finally block.
This is cleaner and makes the code easier to understand for type checkers.
This commit is contained in:
Bentlybro
2026-03-02 17:14:59 +00:00
parent 14da8508da
commit 35e2157b73

View File

@@ -79,39 +79,36 @@ async def event_broadcaster(manager: ConnectionManager):
payload=notification.payload,
)
# Track registry pubsub for cleanup
from redis.asyncio.client import PubSub
registry_pubsub: PubSub | None = None
async def registry_refresh_worker():
"""Listen for LLM registry refresh notifications and broadcast to all clients."""
nonlocal registry_pubsub
from backend.data.llm_registry import REGISTRY_REFRESH_CHANNEL
from backend.data.redis_client import connect_async
redis = await connect_async()
registry_pubsub = redis.pubsub()
await registry_pubsub.subscribe(REGISTRY_REFRESH_CHANNEL)
logger.info(
"Subscribed to LLM registry refresh notifications for WebSocket broadcast"
)
pubsub = redis.pubsub()
try:
await pubsub.subscribe(REGISTRY_REFRESH_CHANNEL)
logger.info(
"Subscribed to LLM registry refresh notifications for WebSocket broadcast"
)
async for message in registry_pubsub.listen():
if (
message["type"] == "message"
and message["channel"] == REGISTRY_REFRESH_CHANNEL
):
logger.info(
"Broadcasting LLM registry refresh to all WebSocket clients"
)
await manager.broadcast_to_all(
method=WSMethod.NOTIFICATION,
data={
"type": "LLM_REGISTRY_REFRESH",
"event": "registry_updated",
},
)
async for message in pubsub.listen():
if (
message["type"] == "message"
and message["channel"] == REGISTRY_REFRESH_CHANNEL
):
logger.info(
"Broadcasting LLM registry refresh to all WebSocket clients"
)
await manager.broadcast_to_all(
method=WSMethod.NOTIFICATION,
data={
"type": "LLM_REGISTRY_REFRESH",
"event": "registry_updated",
},
)
finally:
await pubsub.close()
await asyncio.gather(
execution_worker(),
@@ -122,8 +119,6 @@ async def event_broadcaster(manager: ConnectionManager):
# Ensure PubSub connections are closed on any exit to prevent leaks
await execution_bus.close()
await notification_bus.close()
if registry_pubsub is not None: # type: ignore[reportPossiblyUnboundVariable]
await registry_pubsub.close() # type: ignore[reportUnboundVariable]
async def authenticate_websocket(websocket: WebSocket) -> str: