mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
refactor(backend/copilot): replace manual NX-poll loop with redis-py built-in Lock in _get_session_lock
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
@@ -848,25 +847,21 @@ async def update_session_title(
|
||||
|
||||
@asynccontextmanager
|
||||
async def _get_session_lock(session_id: str) -> AsyncIterator[None]:
|
||||
"""Distributed Redis NX lock for a session, usable as an async context manager.
|
||||
"""Distributed Redis lock for a session, usable as an async context manager.
|
||||
|
||||
Retries every 50ms for up to 2s so a waiter behind a concurrent writer keeps
|
||||
polling until the lock is free. The lock is always released explicitly on exit;
|
||||
the 10s TTL is a crash-safety net so a dead pod never holds it indefinitely.
|
||||
Uses redis-py's built-in Lock (Lua-script acquire/release) so lock acquisition
|
||||
is atomic and release is owner-verified. Blocks up to 2s for a concurrent
|
||||
writer to finish; the 10s TTL ensures a dead pod never holds the lock forever.
|
||||
On Redis failure the lock is skipped with a warning — callers should still
|
||||
apply an idempotency check as a fallback.
|
||||
"""
|
||||
_lock_key = f"copilot:session_lock:{session_id}"
|
||||
_redis = None
|
||||
lock = None
|
||||
acquired = False
|
||||
try:
|
||||
_redis = await get_redis_async()
|
||||
deadline = asyncio.get_event_loop().time() + 2.0
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
if await _redis.set(_lock_key, "1", nx=True, ex=10):
|
||||
acquired = True
|
||||
break
|
||||
await asyncio.sleep(0.05)
|
||||
lock = _redis.lock(_lock_key, timeout=10, blocking_timeout=2)
|
||||
acquired = await lock.acquire(blocking=True)
|
||||
if not acquired:
|
||||
logger.warning(
|
||||
"Could not acquire session lock for %s within 2s", session_id
|
||||
@@ -877,8 +872,8 @@ async def _get_session_lock(session_id: str) -> AsyncIterator[None]:
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if acquired and _redis is not None:
|
||||
if acquired and lock is not None:
|
||||
try:
|
||||
await _redis.delete(_lock_key)
|
||||
await lock.release()
|
||||
except Exception:
|
||||
pass # TTL will expire the key
|
||||
|
||||
Reference in New Issue
Block a user