From 862c0d9efb165e679dbb0b6decf827edd264d03f Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Thu, 16 Apr 2026 19:44:25 +0700 Subject: [PATCH] fix(backend/copilot): add Redis distributed lock to append_and_save_message The asyncio.Lock is process-local and only serialises concurrent calls within one pod. In a multi-replica deployment, two infra retries routed to different pods can both read a stale session state and both write the same message to the DB. Fix: add a short-lived Redis NX lock (10s TTL) inside append_and_save_message. The lock covers the full read-check-write cycle and is released as soon as the write completes. A replica that cannot acquire the lock waits 0.5s for the first write to land, then hits the idempotency check which short-circuits. The asyncio.Lock is kept for within-pod serialisation (avoids redundant Redis round trips for concurrent in-process calls). --- .../backend/backend/copilot/model.py | 102 ++++++++++-------- 1 file changed, 60 insertions(+), 42 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/model.py b/autogpt_platform/backend/backend/copilot/model.py index 6b9dff9ec5..56c49d8952 100644 --- a/autogpt_platform/backend/backend/copilot/model.py +++ b/autogpt_platform/backend/backend/copilot/model.py @@ -654,54 +654,72 @@ async def _save_session_to_db( async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession: """Atomically append a message to a session and persist it. - Acquires the session lock, re-fetches the latest session state, - appends the message, and saves — preventing message loss when - concurrent requests modify the same session. + Two-layer locking prevents duplicate DB writes from infra/nginx retries: + - asyncio.Lock (process-local): serialises concurrent calls within one pod. + - Redis NX lock (distributed): prevents the same race across replicas. + + The Redis lock is released as soon as the write completes; the TTL (10s) + is a crash-safety net so a dead pod never holds the lock indefinitely. """ lock = await _get_session_lock(session_id) async with lock: - session = await get_chat_session(session_id) - if session is None: - raise ValueError(f"Session {session_id} not found") + # Distributed lock — protects the read-check-write across replicas. + # A retry routed to a different pod acquires this lock, waits for the + # first pod to finish writing, then hits the idempotency check below. + redis_lock_key = f"copilot:msg_append:{session_id}" + redis = await get_redis_async() + acquired = await redis.set(redis_lock_key, "1", nx=True, ex=10) + if not acquired: + # Another pod is mid-write for this session. Wait briefly so it + # finishes, then let the idempotency check below short-circuit. + await asyncio.sleep(0.5) + + try: + session = await get_chat_session(session_id) + if session is None: + raise ValueError(f"Session {session_id} not found") + + # Idempotency: skip if the trailing message already matches this one. + # This collapses infra/nginx retries whether they land on the same + # pod (asyncio lock serialises them) or a different pod (Redis lock + # + this check covers the cross-replica case). + # + # Legit same-text messages are distinguished by the assistant turn + # between them: if the user said "yes", got a response, and says + # "yes" again, session.messages[-1] is the assistant reply, so the + # role check fails and the second message goes through normally. + # + # Edge case: if a turn dies without writing any assistant message, + # the user's next send of the same text is blocked here permanently. + # The fix is to ensure failed turns always write an error/timeout + # assistant message so the session always ends on an assistant turn. + if ( + session.messages + and session.messages[-1].role == message.role + and session.messages[-1].content == message.content + ): + return session + + session.messages.append(message) + existing_message_count = await chat_db().get_next_sequence(session_id) + + try: + await _save_session_to_db(session, existing_message_count) + except Exception as e: + raise DatabaseError( + f"Failed to persist message to session {session_id}" + ) from e + + try: + await cache_chat_session(session) + except Exception as e: + logger.warning(f"Cache write failed for session {session_id}: {e}") - # Idempotency: skip if the trailing message already matches this one. - # This collapses infra/nginx retries that arrive before the executor - # picks up the turn (the executor's cluster lock handles duplicate - # *execution*; this handles duplicate *DB writes*). - # - # Legit same-text messages are distinguished by the assistant turn - # between them: if the user said "yes", got a response, and says "yes" - # again, session.messages[-1] is the assistant reply, so the role check - # fails and the second message goes through normally. - # - # Edge case: if a turn dies without writing any assistant message, the - # user's next send of the same text will be blocked here permanently. - # The fix for that is to ensure failed turns always write an error/ - # timeout assistant message so the session always ends on an assistant. - if ( - session.messages - and session.messages[-1].role == message.role - and session.messages[-1].content == message.content - ): return session - - session.messages.append(message) - existing_message_count = await chat_db().get_next_sequence(session_id) - - try: - await _save_session_to_db(session, existing_message_count) - except Exception as e: - raise DatabaseError( - f"Failed to persist message to session {session_id}" - ) from e - - try: - await cache_chat_session(session) - except Exception as e: - logger.warning(f"Cache write failed for session {session_id}: {e}") - - return session + finally: + if acquired: + await redis.delete(redis_lock_key) async def create_chat_session(user_id: str, *, dry_run: bool) -> ChatSession: