mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(backend/copilot): harden Redis NX lock in append_and_save_message
- Replace sleep anti-pattern with a proper retry loop (polls every 50ms for up to 2s) so waiters always acquire the lock before writing, never proceed lockless after a single sleep. - Wrap redis.delete() in try/except so a cleanup error after a successful write doesn't surface a false 500 and invite duplicate retries. - Invalidate the stale cache entry on cache-write failure so future reads fall back to authoritative DB, preventing a retry from bypassing the idempotency check with stale cache state. - Remove the now-redundant asyncio.Lock wrapper: the Redis retry loop serialises within-pod races too (Redis SET NX is atomic), and when Redis is down the whole chat stack is broken regardless.
This commit is contained in:
@@ -654,74 +654,92 @@ async def _save_session_to_db(
|
||||
async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession:
|
||||
"""Atomically append a message to a session and persist it.
|
||||
|
||||
Two-layer locking prevents duplicate DB writes from infra/nginx retries:
|
||||
- asyncio.Lock (process-local): serialises concurrent calls within one pod.
|
||||
- Redis NX lock (distributed): prevents the same race across replicas.
|
||||
|
||||
The Redis lock is released as soon as the write completes; the TTL (10s)
|
||||
is a crash-safety net so a dead pod never holds the lock indefinitely.
|
||||
A Redis NX lock serialises concurrent writers — both within a pod and across
|
||||
replicas — so the idempotency check below always sees the authoritative state.
|
||||
The lock is released as soon as the write completes; the TTL (10s) is a
|
||||
crash-safety net. Redis errors degrade gracefully: the idempotency check
|
||||
still prevents most duplicates even without the distributed lock.
|
||||
"""
|
||||
lock = await _get_session_lock(session_id)
|
||||
|
||||
async with lock:
|
||||
# Distributed lock — protects the read-check-write across replicas.
|
||||
# A retry routed to a different pod acquires this lock, waits for the
|
||||
# first pod to finish writing, then hits the idempotency check below.
|
||||
# TTL (10s) is a crash-safety net; the lock is always released explicitly
|
||||
# as soon as the write completes (same pattern as rate_limit.py).
|
||||
# Distributed lock — protects the read-check-write across replicas.
|
||||
# Retries for up to 2s so a pod waiting behind a concurrent writer keeps
|
||||
# polling until the lock is free, then hits the idempotency check below.
|
||||
_lock_key = f"copilot:msg_append:{session_id}"
|
||||
_redis = None
|
||||
acquired = False
|
||||
try:
|
||||
_redis = await get_redis_async()
|
||||
_lock_key = f"copilot:msg_append:{session_id}"
|
||||
acquired = bool(await _redis.set(_lock_key, "1", nx=True, ex=10))
|
||||
deadline = asyncio.get_event_loop().time() + 2.0
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
if await _redis.set(_lock_key, "1", nx=True, ex=10):
|
||||
acquired = True
|
||||
break
|
||||
await asyncio.sleep(0.05)
|
||||
if not acquired:
|
||||
# Another pod is mid-write for this session. Wait briefly so it
|
||||
# finishes, then let the idempotency check below short-circuit.
|
||||
await asyncio.sleep(0.5)
|
||||
logger.warning(
|
||||
"Could not acquire msg_append lock for session %s within 2s",
|
||||
session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Redis unavailable for msg_append lock on session %s: %s",
|
||||
session_id,
|
||||
e,
|
||||
)
|
||||
|
||||
try:
|
||||
session = await get_chat_session(session_id)
|
||||
if session is None:
|
||||
raise ValueError(f"Session {session_id} not found")
|
||||
|
||||
# Idempotency: skip if the trailing message already matches this one.
|
||||
# This collapses infra/nginx retries whether they land on the same pod
|
||||
# (serialised by the Redis lock above) or a different pod.
|
||||
#
|
||||
# Legit same-text messages are distinguished by the assistant turn
|
||||
# between them: if the user said "yes", got a response, and says
|
||||
# "yes" again, session.messages[-1] is the assistant reply, so the
|
||||
# role check fails and the second message goes through normally.
|
||||
#
|
||||
# Edge case: if a turn dies without writing any assistant message,
|
||||
# the user's next send of the same text is blocked here permanently.
|
||||
# The fix is to ensure failed turns always write an error/timeout
|
||||
# assistant message so the session always ends on an assistant turn.
|
||||
if (
|
||||
session.messages
|
||||
and session.messages[-1].role == message.role
|
||||
and session.messages[-1].content == message.content
|
||||
):
|
||||
return session
|
||||
|
||||
session.messages.append(message)
|
||||
existing_message_count = await chat_db().get_next_sequence(session_id)
|
||||
|
||||
try:
|
||||
session = await get_chat_session(session_id)
|
||||
if session is None:
|
||||
raise ValueError(f"Session {session_id} not found")
|
||||
await _save_session_to_db(session, existing_message_count)
|
||||
except Exception as e:
|
||||
raise DatabaseError(
|
||||
f"Failed to persist message to session {session_id}"
|
||||
) from e
|
||||
|
||||
# Idempotency: skip if the trailing message already matches this one.
|
||||
# This collapses infra/nginx retries whether they land on the same
|
||||
# pod (asyncio lock serialises them) or a different pod (Redis lock
|
||||
# + this check covers the cross-replica case).
|
||||
#
|
||||
# Legit same-text messages are distinguished by the assistant turn
|
||||
# between them: if the user said "yes", got a response, and says
|
||||
# "yes" again, session.messages[-1] is the assistant reply, so the
|
||||
# role check fails and the second message goes through normally.
|
||||
#
|
||||
# Edge case: if a turn dies without writing any assistant message,
|
||||
# the user's next send of the same text is blocked here permanently.
|
||||
# The fix is to ensure failed turns always write an error/timeout
|
||||
# assistant message so the session always ends on an assistant turn.
|
||||
if (
|
||||
session.messages
|
||||
and session.messages[-1].role == message.role
|
||||
and session.messages[-1].content == message.content
|
||||
):
|
||||
return session
|
||||
|
||||
session.messages.append(message)
|
||||
existing_message_count = await chat_db().get_next_sequence(session_id)
|
||||
try:
|
||||
await cache_chat_session(session)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache write failed for session {session_id}: {e}")
|
||||
# Invalidate the stale entry so future reads fall back to DB,
|
||||
# preventing a retry from bypassing the idempotency check above.
|
||||
if _redis is not None:
|
||||
try:
|
||||
await _redis.delete(_get_session_cache_key(session_id))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return session
|
||||
finally:
|
||||
if acquired and _redis is not None:
|
||||
try:
|
||||
await _save_session_to_db(session, existing_message_count)
|
||||
except Exception as e:
|
||||
raise DatabaseError(
|
||||
f"Failed to persist message to session {session_id}"
|
||||
) from e
|
||||
|
||||
try:
|
||||
await cache_chat_session(session)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache write failed for session {session_id}: {e}")
|
||||
|
||||
return session
|
||||
finally:
|
||||
if acquired:
|
||||
await _redis.delete(_lock_key)
|
||||
except Exception:
|
||||
pass # TTL will expire the key
|
||||
|
||||
|
||||
async def create_chat_session(user_id: str, *, dry_run: bool) -> ChatSession:
|
||||
|
||||
Reference in New Issue
Block a user