# Tuning knobs for the per-session message-append lock.
_MSG_APPEND_LOCK_TTL_S = 10  # crash-safety net so a dead pod cannot hold the lock forever
_MSG_APPEND_LOCK_WAIT_S = 2.0  # how long a waiter polls before giving up on the lock
_MSG_APPEND_LOCK_POLL_S = 0.05  # delay between SET NX attempts

# Compare-and-delete: only remove the lock if it still carries *our* token.
# A plain DELETE would remove a lock that another writer acquired after our
# TTL expired (e.g. because the DB write took longer than the TTL).
_MSG_APPEND_RELEASE_LUA = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


async def _acquire_msg_append_lock(session_id: str, lock_key: str, token: str):
    """Try to take the per-session append lock, polling until a deadline.

    Returns a ``(redis_client, acquired)`` pair. ``redis_client`` is ``None``
    when no client could be obtained; ``acquired`` is ``False`` when the lock
    was not obtained (timeout or Redis error). Never raises for Redis errors:
    they are logged and reported as ``acquired=False`` so the caller can
    continue without the lock.
    """
    redis_client = None
    try:
        redis_client = await get_redis_async()
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine; fetch it once instead of on every iteration.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + _MSG_APPEND_LOCK_WAIT_S
        while loop.time() < deadline:
            if await redis_client.set(
                lock_key, token, nx=True, ex=_MSG_APPEND_LOCK_TTL_S
            ):
                return redis_client, True
            await asyncio.sleep(_MSG_APPEND_LOCK_POLL_S)
        logger.warning(
            "Could not acquire msg_append lock for session %s within %gs",
            session_id,
            _MSG_APPEND_LOCK_WAIT_S,
        )
    except Exception as e:
        logger.warning(
            "Redis unavailable for msg_append lock on session %s: %s",
            session_id,
            e,
        )
    return redis_client, False


async def _release_msg_append_lock(
    redis_client, session_id: str, lock_key: str, token: str
) -> None:
    """Release the append lock if this caller still owns it (best-effort).

    A failure here must not turn a successful write into an error response
    (which would invite a duplicate retry), so errors are logged and
    swallowed; the key's TTL will expire it.
    """
    try:
        # NOTE(review): assumes the client exposes redis-py's
        # ``eval(script, numkeys, *keys_and_args)`` — confirm for the client
        # returned by get_redis_async().
        await redis_client.eval(_MSG_APPEND_RELEASE_LUA, 1, lock_key, token)
    except Exception as e:
        logger.warning(
            "Failed to release msg_append lock for session %s "
            "(TTL will expire it): %s",
            session_id,
            e,
        )


async def _invalidate_session_cache(session_id: str, redis_client) -> None:
    """Drop the cached session entry after a failed cache write (best-effort).

    Removing the stale entry makes future reads fall back to the DB, so a
    retry cannot bypass the idempotency check by reading an out-of-date
    cached session.
    """
    try:
        # The lock phase may have failed to obtain a client; try again here
        # rather than silently leaving the stale entry in place.
        client = redis_client if redis_client is not None else await get_redis_async()
        await client.delete(_get_session_cache_key(session_id))
    except Exception as e:
        logger.warning(
            "Failed to invalidate cached session %s after cache write failure: %s",
            session_id,
            e,
        )


async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession:
    """Append a message to a session and persist it, collapsing duplicates.

    A Redis ``SET NX`` lock serialises concurrent writers — both within a pod
    and across replicas — so the idempotency check below sees the state left
    by the previous writer. Waiters poll for the lock for a bounded time
    instead of sleeping once. The lock is released as soon as the write
    completes; its TTL is only a crash-safety net.

    If the lock cannot be obtained (timeout, or Redis unavailable) the call
    logs a warning and proceeds *without* the lock; in that case only the
    idempotency check guards against duplicate appends.

    Args:
        session_id: Identifier of the chat session to append to.
        message: The message to append.

    Returns:
        The session — unchanged if the trailing message already matches
        ``message`` (same role and content), otherwise with ``message``
        appended and persisted.

    Raises:
        ValueError: If the session does not exist.
        DatabaseError: If persisting the session to the database fails.
    """
    import uuid  # local import: only needed for the lock ownership token

    lock_key = f"copilot:msg_append:{session_id}"
    # Unique per call so release can verify ownership before deleting.
    lock_token = uuid.uuid4().hex
    redis_client, acquired = await _acquire_msg_append_lock(
        session_id, lock_key, lock_token
    )

    try:
        session = await get_chat_session(session_id)
        if session is None:
            raise ValueError(f"Session {session_id} not found")

        # Idempotency: skip if the trailing message already matches this one.
        # This collapses infra/nginx retries whether they land on the same
        # pod or a different one.
        #
        # Legit same-text messages are distinguished by the assistant turn
        # between them: if the user said "yes", got a response, and says
        # "yes" again, session.messages[-1] is the assistant reply, so the
        # role check fails and the second message goes through normally.
        #
        # Edge case: if a turn dies without writing any assistant message,
        # the user's next send of the same text is blocked here permanently.
        # The fix is to ensure failed turns always write an error/timeout
        # assistant message so the session always ends on an assistant turn.
        if (
            session.messages
            and session.messages[-1].role == message.role
            and session.messages[-1].content == message.content
        ):
            return session

        session.messages.append(message)
        existing_message_count = await chat_db().get_next_sequence(session_id)

        try:
            await _save_session_to_db(session, existing_message_count)
        except Exception as e:
            raise DatabaseError(
                f"Failed to persist message to session {session_id}"
            ) from e

        try:
            await cache_chat_session(session)
        except Exception as e:
            # The DB write succeeded, so this is not fatal — but the cache
            # may now hold a stale session, so drop it.
            logger.warning("Cache write failed for session %s: %s", session_id, e)
            await _invalidate_session_cache(session_id, redis_client)

        return session
    finally:
        if acquired and redis_client is not None:
            await _release_msg_append_lock(
                redis_client, session_id, lock_key, lock_token
            )