mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(backend/copilot): add Redis distributed lock to append_and_save_message
The asyncio.Lock is process-local and only serialises concurrent calls within one pod. In a multi-replica deployment, two infra retries routed to different pods can both read a stale session state and both write the same message to the DB. Fix: add a short-lived Redis NX lock (10s TTL) inside append_and_save_message. The lock covers the full read-check-write cycle and is released as soon as the write completes. A replica that cannot acquire the lock waits 0.5s for the first write to land, then hits the idempotency check which short-circuits. The asyncio.Lock is kept for within-pod serialisation (avoids redundant Redis round trips for concurrent in-process calls).
This commit is contained in:
@@ -654,54 +654,72 @@ async def _save_session_to_db(
|
||||
async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession:
    """Atomically append a message to a session and persist it.

    Acquires the session lock, re-fetches the latest session state,
    appends the message, and saves — preventing message loss when
    concurrent requests modify the same session.

    Two-layer locking prevents duplicate DB writes from infra/nginx retries:
    - asyncio.Lock (process-local): serialises concurrent calls within one pod.
    - Redis NX lock (distributed): prevents the same race across replicas.

    The Redis lock is released as soon as the write completes; the TTL (10s)
    is a crash-safety net so a dead pod never holds the lock indefinitely.

    Args:
        session_id: Identifier of the session to append to.
        message: The chat message to append.

    Returns:
        The updated (or unchanged, if the append was a duplicate) session.

    Raises:
        ValueError: If the session does not exist.
        DatabaseError: If persisting the message to the DB fails.
    """
    lock = await _get_session_lock(session_id)

    async with lock:
        # Distributed lock — protects the read-check-write across replicas.
        # A retry routed to a different pod acquires this lock, waits for the
        # first pod to finish writing, then hits the idempotency check below.
        redis_lock_key = f"copilot:msg_append:{session_id}"
        redis = await get_redis_async()
        acquired = await redis.set(redis_lock_key, "1", nx=True, ex=10)
        if not acquired:
            # Another pod is mid-write for this session. Wait briefly so it
            # finishes, then let the idempotency check below short-circuit.
            await asyncio.sleep(0.5)

        try:
            # Fetch AFTER lock acquisition so we see the other pod's write
            # (fetching earlier would re-introduce the stale-read race).
            session = await get_chat_session(session_id)
            if session is None:
                raise ValueError(f"Session {session_id} not found")

            # Idempotency: skip if the trailing message already matches this one.
            # This collapses infra/nginx retries whether they land on the same
            # pod (asyncio lock serialises them) or a different pod (Redis lock
            # + this check covers the cross-replica case).
            #
            # Legit same-text messages are distinguished by the assistant turn
            # between them: if the user said "yes", got a response, and says
            # "yes" again, session.messages[-1] is the assistant reply, so the
            # role check fails and the second message goes through normally.
            #
            # Edge case: if a turn dies without writing any assistant message,
            # the user's next send of the same text is blocked here permanently.
            # The fix is to ensure failed turns always write an error/timeout
            # assistant message so the session always ends on an assistant turn.
            if (
                session.messages
                and session.messages[-1].role == message.role
                and session.messages[-1].content == message.content
            ):
                return session

            session.messages.append(message)
            existing_message_count = await chat_db().get_next_sequence(session_id)

            try:
                await _save_session_to_db(session, existing_message_count)
            except Exception as e:
                raise DatabaseError(
                    f"Failed to persist message to session {session_id}"
                ) from e

            # Cache write is best-effort: the DB is the source of truth, so a
            # cache failure must not fail the request.
            try:
                await cache_chat_session(session)
            except Exception as e:
                logger.warning(f"Cache write failed for session {session_id}: {e}")

            return session
        finally:
            # Release promptly rather than waiting for the TTL; only the pod
            # that actually acquired the lock may delete it.
            if acquired:
                await redis.delete(redis_lock_key)
||||
async def create_chat_session(user_id: str, *, dry_run: bool) -> ChatSession:
|
||||
|
||||
Reference in New Issue
Block a user