From ec0e31e72fdee4c7e0bda668c53f82bcf24404ae Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Thu, 16 Apr 2026 19:29:15 +0700 Subject: [PATCH] fix(backend/copilot): replace dedup lock with idempotent append_and_save_message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Redis dedup lock (5s TTL) was overly complex and only partially effective: - On GeneratorExit (client disconnect), it deliberately held the lock until the 5s TTL expired to block infra retries — but the executor's cluster lock already prevents duplicate execution. - The only real gap was duplicate DB writes in the ~1s RabbitMQ transit window before the executor acquires the cluster lock. Fix: make append_and_save_message idempotent by checking whether the trailing message in the session already matches the incoming role+content before appending. This collapses infra/nginx retries at the DB layer without any Redis coordination, and removes the entire message_dedup module. --- .../backend/api/features/chat/routes.py | 48 +--- .../backend/api/features/chat/routes_test.py | 256 +----------------- .../backend/backend/copilot/message_dedup.py | 58 ---- .../backend/copilot/message_dedup_test.py | 94 ------- .../backend/backend/copilot/model.py | 11 + 5 files changed, 17 insertions(+), 450 deletions(-) delete mode 100644 autogpt_platform/backend/backend/copilot/message_dedup.py delete mode 100644 autogpt_platform/backend/backend/copilot/message_dedup_test.py diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index 5c96632bd2..7446ddea22 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -18,7 +18,6 @@ from backend.copilot import stream_registry from backend.copilot.config import ChatConfig, CopilotLlmModel, CopilotMode from backend.copilot.db import get_chat_messages_paginated from 
backend.copilot.executor.utils import enqueue_cancel_task, enqueue_copilot_turn -from backend.copilot.message_dedup import acquire_dedup_lock from backend.copilot.model import ( ChatMessage, ChatSession, @@ -846,9 +845,6 @@ async def stream_chat_post( # Also sanitise file_ids so only validated, workspace-scoped IDs are # forwarded downstream (e.g. to the executor via enqueue_copilot_turn). sanitized_file_ids: list[str] | None = None - # Capture the original message text BEFORE any mutation (attachment enrichment) - # so the idempotency hash is stable across retries. - original_message = request.message if request.file_ids and user_id: # Filter to valid UUIDs only to prevent DB abuse valid_ids = [fid for fid in request.file_ids if _UUID_RE.match(fid)] @@ -877,38 +873,10 @@ async def stream_chat_post( ) request.message += files_block - # ── Idempotency guard ──────────────────────────────────────────────────── - # Blocks duplicate executor tasks from concurrent/retried POSTs. - # See backend/copilot/message_dedup.py for the full lifecycle description. - dedup_lock = None - if request.is_user_message: - dedup_lock = await acquire_dedup_lock( - session_id, original_message, sanitized_file_ids - ) - if dedup_lock is None and (original_message or sanitized_file_ids): - - async def _empty_sse() -> AsyncGenerator[str, None]: - yield StreamFinish().to_sse() - yield "data: [DONE]\n\n" - - return StreamingResponse( - _empty_sse(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "X-Accel-Buffering": "no", - "Connection": "keep-alive", - "x-vercel-ai-ui-message-stream": "v1", - }, - ) - # Atomically append user message to session BEFORE creating task to avoid # race condition where GET_SESSION sees task as "running" but message isn't - # saved yet. append_and_save_message re-fetches inside a lock to prevent - # message loss from concurrent requests. 
- # - # If any of these operations raises, release the dedup lock before propagating - # so subsequent retries are not blocked for 30 s. + # saved yet. append_and_save_message is idempotent — duplicate POSTs with + # the same content are silently skipped at the DB layer. try: if request.message: message = ChatMessage( @@ -959,8 +927,6 @@ async def stream_chat_post( model=request.model, ) except Exception: - if dedup_lock: - await dedup_lock.release() raise setup_time = (time.perf_counter() - stream_start_time) * 1000 @@ -985,13 +951,6 @@ async def stream_chat_post( subscriber_queue = None first_chunk_yielded = False chunks_yielded = 0 - # On client disconnect the message is already persisted in the DB but - # the executor hasn't yet acquired its cluster lock. Releasing the dedup - # lock immediately would let an infra retry (k8s, nginx) re-save the - # same message before the cluster lock blocks it, causing a duplicate - # DB entry. The 5 s TTL covers the ~1 s RabbitMQ-transit window. - # All other exits (normal finish, error) release immediately. 
- release_dedup_lock_on_exit = True try: # Subscribe from the position we captured before enqueuing # This avoids replaying old messages while catching all new ones @@ -1061,7 +1020,6 @@ async def stream_chat_post( } }, ) - release_dedup_lock_on_exit = False except Exception as e: elapsed = (time_module.perf_counter() - event_gen_start) * 1000 logger.error( @@ -1077,8 +1035,6 @@ async def stream_chat_post( ).to_sse() yield StreamFinish().to_sse() finally: - if dedup_lock and release_dedup_lock_on_exit: - await dedup_lock.release() # Unsubscribe when client disconnects or stream ends if subscriber_queue is not None: try: diff --git a/autogpt_platform/backend/backend/api/features/chat/routes_test.py b/autogpt_platform/backend/backend/api/features/chat/routes_test.py index 597aad01ad..7b9d89dee3 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes_test.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes_test.py @@ -133,21 +133,12 @@ def test_stream_chat_rejects_too_many_file_ids(): assert response.status_code == 422 -def _mock_stream_internals( - mocker: pytest_mock.MockerFixture, - *, - redis_set_returns: object = True, -): +def _mock_stream_internals(mocker: pytest_mock.MockerFixture): """Mock the async internals of stream_chat_post so tests can exercise - validation and enrichment logic without needing Redis/RabbitMQ. - - Args: - redis_set_returns: Value returned by the mocked Redis ``set`` call. - ``True`` (default) simulates a fresh key (new message); - ``None`` simulates a collision (duplicate blocked). + validation and enrichment logic without needing RabbitMQ. Returns: - A namespace with ``redis``, ``save``, and ``enqueue`` mock objects so + A namespace with ``save`` and ``enqueue`` mock objects so callers can make additional assertions about side-effects. 
""" import types @@ -174,15 +165,7 @@ def _mock_stream_internals( "backend.api.features.chat.routes.track_user_message", return_value=None, ) - mock_redis = AsyncMock() - mock_redis.set = AsyncMock(return_value=redis_set_returns) - mocker.patch( - "backend.copilot.message_dedup.get_redis_async", - new_callable=AsyncMock, - return_value=mock_redis, - ) - ns = types.SimpleNamespace(redis=mock_redis, save=mock_save, enqueue=mock_enqueue) - return ns + return types.SimpleNamespace(save=mock_save, enqueue=mock_enqueue) def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockerFixture): @@ -706,237 +689,6 @@ class TestStripInjectedContext: assert result["content"] == "hello" -# ─── Idempotency / duplicate-POST guard ────────────────────────────── - - -def test_stream_chat_blocks_duplicate_post_returns_empty_sse( - mocker: pytest_mock.MockerFixture, -) -> None: - """A second POST with the same message within the 30-s window must return - an empty SSE stream (StreamFinish + [DONE]) so the frontend marks the - turn complete without creating a ghost response.""" - # redis_set_returns=None simulates a collision: the NX key already exists. - ns = _mock_stream_internals(mocker, redis_set_returns=None) - - response = client.post( - "/sessions/sess-dup/stream", - json={"message": "duplicate message", "is_user_message": True}, - ) - - assert response.status_code == 200 - body = response.text - # The response must contain StreamFinish (type=finish) and the SSE [DONE] terminator. - assert '"finish"' in body - assert "[DONE]" in body - # The empty SSE response must include the AI SDK protocol header so the - # frontend treats it as a valid stream and marks the turn complete. - assert response.headers.get("x-vercel-ai-ui-message-stream") == "v1" - # The duplicate guard must prevent save/enqueue side effects. 
- ns.save.assert_not_called() - ns.enqueue.assert_not_called() - - -def test_stream_chat_first_post_proceeds_normally( - mocker: pytest_mock.MockerFixture, -) -> None: - """The first POST (Redis NX key set successfully) must proceed through the - normal streaming path — no early return.""" - ns = _mock_stream_internals(mocker, redis_set_returns=True) - - response = client.post( - "/sessions/sess-new/stream", - json={"message": "first message", "is_user_message": True}, - ) - - assert response.status_code == 200 - # Redis set must have been called once with the NX flag. - ns.redis.set.assert_called_once() - call_kwargs = ns.redis.set.call_args - assert call_kwargs.kwargs.get("nx") is True - - -def test_stream_chat_dedup_skipped_for_non_user_messages( - mocker: pytest_mock.MockerFixture, -) -> None: - """System/assistant messages (is_user_message=False) bypass the dedup - guard — they are injected programmatically and must always be processed.""" - ns = _mock_stream_internals(mocker, redis_set_returns=None) - - response = client.post( - "/sessions/sess-sys/stream", - json={"message": "system context", "is_user_message": False}, - ) - - # Even though redis_set_returns=None (would block a user message), - # the endpoint must proceed because is_user_message=False. - assert response.status_code == 200 - ns.redis.set.assert_not_called() - - -def test_stream_chat_dedup_hash_uses_original_message_not_mutated( - mocker: pytest_mock.MockerFixture, -) -> None: - """The dedup hash must be computed from the original request message, - not the mutated version that has the [Attached files] block appended. - A file_id is sent so the route actually appends the [Attached files] block, - exercising the mutation path — the hash must still match the original text.""" - import hashlib - - ns = _mock_stream_internals(mocker, redis_set_returns=True) - - file_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" - # Mock workspace + prisma so the attachment block is actually appended. 
- mocker.patch( - "backend.api.features.chat.routes.get_or_create_workspace", - return_value=type("W", (), {"id": "ws-1"})(), - ) - fake_file = type( - "F", - (), - { - "id": file_id, - "name": "doc.pdf", - "mimeType": "application/pdf", - "sizeBytes": 1024, - }, - )() - mock_prisma = mocker.MagicMock() - mock_prisma.find_many = mocker.AsyncMock(return_value=[fake_file]) - mocker.patch( - "prisma.models.UserWorkspaceFile.prisma", - return_value=mock_prisma, - ) - - response = client.post( - "/sessions/sess-hash/stream", - json={ - "message": "plain message", - "is_user_message": True, - "file_ids": [file_id], - }, - ) - - assert response.status_code == 200 - ns.redis.set.assert_called_once() - call_args = ns.redis.set.call_args - dedup_key = call_args.args[0] - - # Hash must use the original message + sorted file IDs, not the mutated text. - expected_hash = hashlib.sha256( - f"sess-hash:plain message:{file_id}".encode() - ).hexdigest()[:16] - expected_key = f"chat:msg_dedup:sess-hash:{expected_hash}" - assert dedup_key == expected_key, ( - f"Dedup key {dedup_key!r} does not match expected {expected_key!r} — " - "hash may be using mutated message or wrong inputs" - ) - - -def test_stream_chat_dedup_key_released_after_stream_finish( - mocker: pytest_mock.MockerFixture, -) -> None: - """The dedup Redis key must be deleted after the turn completes (when - subscriber_queue is None the route yields StreamFinish immediately and - should release the key so the user can re-send the same message).""" - from unittest.mock import AsyncMock as _AsyncMock - - # Set up all internals manually so we can control subscribe_to_session. 
- mocker.patch( - "backend.api.features.chat.routes._validate_and_get_session", - return_value=None, - ) - mocker.patch( - "backend.api.features.chat.routes.append_and_save_message", - return_value=None, - ) - mocker.patch( - "backend.api.features.chat.routes.enqueue_copilot_turn", - return_value=None, - ) - mocker.patch( - "backend.api.features.chat.routes.track_user_message", - return_value=None, - ) - mock_registry = mocker.MagicMock() - mock_registry.create_session = _AsyncMock(return_value=None) - # None → early-finish path: StreamFinish yielded immediately, dedup key released. - mock_registry.subscribe_to_session = _AsyncMock(return_value=None) - mocker.patch( - "backend.api.features.chat.routes.stream_registry", - mock_registry, - ) - mock_redis = mocker.AsyncMock() - mock_redis.set = _AsyncMock(return_value=True) - mocker.patch( - "backend.copilot.message_dedup.get_redis_async", - new_callable=_AsyncMock, - return_value=mock_redis, - ) - - response = client.post( - "/sessions/sess-finish/stream", - json={"message": "hello", "is_user_message": True}, - ) - - assert response.status_code == 200 - body = response.text - assert '"finish"' in body - # The dedup key must be released so intentional re-sends are allowed. 
- mock_redis.delete.assert_called_once() - - -def test_stream_chat_dedup_key_released_even_when_redis_delete_raises( - mocker: pytest_mock.MockerFixture, -) -> None: - """The route must not crash when the dedup Redis delete fails on the - subscriber_queue-is-None early-finish path (except Exception: pass).""" - from unittest.mock import AsyncMock as _AsyncMock - - mocker.patch( - "backend.api.features.chat.routes._validate_and_get_session", - return_value=None, - ) - mocker.patch( - "backend.api.features.chat.routes.append_and_save_message", - return_value=None, - ) - mocker.patch( - "backend.api.features.chat.routes.enqueue_copilot_turn", - return_value=None, - ) - mocker.patch( - "backend.api.features.chat.routes.track_user_message", - return_value=None, - ) - mock_registry = mocker.MagicMock() - mock_registry.create_session = _AsyncMock(return_value=None) - mock_registry.subscribe_to_session = _AsyncMock(return_value=None) - mocker.patch( - "backend.api.features.chat.routes.stream_registry", - mock_registry, - ) - mock_redis = mocker.AsyncMock() - mock_redis.set = _AsyncMock(return_value=True) - # Make the delete raise so the except-pass branch is exercised. - mock_redis.delete = _AsyncMock(side_effect=RuntimeError("redis gone")) - mocker.patch( - "backend.copilot.message_dedup.get_redis_async", - new_callable=_AsyncMock, - return_value=mock_redis, - ) - - # Should not raise even though delete fails. - response = client.post( - "/sessions/sess-finish-err/stream", - json={"message": "hello", "is_user_message": True}, - ) - - assert response.status_code == 200 - assert '"finish"' in response.text - # delete must have been attempted — the except-pass branch silenced the error. 
- mock_redis.delete.assert_called_once() - - # ─── DELETE /sessions/{id}/stream — disconnect listeners ────────────── diff --git a/autogpt_platform/backend/backend/copilot/message_dedup.py b/autogpt_platform/backend/backend/copilot/message_dedup.py deleted file mode 100644 index 6486bc272a..0000000000 --- a/autogpt_platform/backend/backend/copilot/message_dedup.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Per-request idempotency lock for the /stream endpoint. - -Blocks duplicate executor tasks from concurrent or retried POSTs (e.g. k8s -rolling-deploy retries, nginx upstream retries, rapid double-clicks). - -The lock only needs to cover the brief window between the HTTP POST and the -executor acquiring the cluster-wide session lock (~1 s). The 5 s TTL is a -crash-safety fallback; the lock is always deleted explicitly on exit. -""" - -import hashlib -import logging - -from backend.data.redis_client import get_redis_async - -logger = logging.getLogger(__name__) - -_KEY_PREFIX = "chat:msg_dedup" -_TTL_SECONDS = 5 - - -class DedupLock: - def __init__(self, key: str, redis) -> None: - self._key = key - self._redis = redis - - async def release(self) -> None: - try: - await self._redis.delete(self._key) - except Exception: - pass - - -async def acquire_dedup_lock( - session_id: str, - message: str | None, - file_ids: list[str] | None, -) -> DedupLock | None: - """Return a DedupLock if this is a new request, or None if it is a duplicate.""" - if not message and not file_ids: - return None - - sorted_ids = ":".join(sorted(file_ids or [])) - content_hash = hashlib.sha256( - f"{session_id}:{message or ''}:{sorted_ids}".encode() - ).hexdigest()[:16] - key = f"{_KEY_PREFIX}:{session_id}:{content_hash}" - - redis = await get_redis_async() - if not await redis.set(key, "1", ex=_TTL_SECONDS, nx=True): - logger.warning( - "[STREAM] Duplicate message blocked session=%s hash=%s", - session_id, - content_hash, - ) - return None - - return DedupLock(key, redis) diff --git 
a/autogpt_platform/backend/backend/copilot/message_dedup_test.py b/autogpt_platform/backend/backend/copilot/message_dedup_test.py deleted file mode 100644 index 935ddd36b6..0000000000 --- a/autogpt_platform/backend/backend/copilot/message_dedup_test.py +++ /dev/null @@ -1,94 +0,0 @@ -"""Unit tests for backend.copilot.message_dedup.""" - -from unittest.mock import AsyncMock - -import pytest -import pytest_mock - -from backend.copilot.message_dedup import _KEY_PREFIX, acquire_dedup_lock - - -def _patch_redis(mocker: pytest_mock.MockerFixture, *, set_returns): - mock_redis = AsyncMock() - mock_redis.set = AsyncMock(return_value=set_returns) - mocker.patch( - "backend.copilot.message_dedup.get_redis_async", - new_callable=AsyncMock, - return_value=mock_redis, - ) - return mock_redis - - -@pytest.mark.asyncio -async def test_acquire_returns_none_when_no_message_no_files( - mocker: pytest_mock.MockerFixture, -) -> None: - """Nothing to deduplicate — no Redis call made, None returned.""" - mock_redis = _patch_redis(mocker, set_returns=True) - result = await acquire_dedup_lock("sess-1", None, None) - assert result is None - mock_redis.set.assert_not_called() - - -@pytest.mark.asyncio -async def test_acquire_returns_lock_on_first_request( - mocker: pytest_mock.MockerFixture, -) -> None: - """First request acquires the lock and returns a _DedupLock.""" - mock_redis = _patch_redis(mocker, set_returns=True) - lock = await acquire_dedup_lock("sess-1", "hello", None) - assert lock is not None - mock_redis.set.assert_called_once() - key_arg = mock_redis.set.call_args.args[0] - assert key_arg.startswith(f"{_KEY_PREFIX}:sess-1:") - - -@pytest.mark.asyncio -async def test_acquire_returns_none_on_duplicate( - mocker: pytest_mock.MockerFixture, -) -> None: - """Duplicate request (NX fails) returns None to signal the caller.""" - _patch_redis(mocker, set_returns=None) - result = await acquire_dedup_lock("sess-1", "hello", None) - assert result is None - - -@pytest.mark.asyncio -async 
def test_acquire_key_stable_across_file_order( - mocker: pytest_mock.MockerFixture, -) -> None: - """File IDs are sorted before hashing so order doesn't affect the key.""" - mock_redis_1 = _patch_redis(mocker, set_returns=True) - await acquire_dedup_lock("sess-1", "msg", ["b", "a"]) - key_ab = mock_redis_1.set.call_args.args[0] - - mock_redis_2 = _patch_redis(mocker, set_returns=True) - await acquire_dedup_lock("sess-1", "msg", ["a", "b"]) - key_ba = mock_redis_2.set.call_args.args[0] - - assert key_ab == key_ba - - -@pytest.mark.asyncio -async def test_release_deletes_key( - mocker: pytest_mock.MockerFixture, -) -> None: - """release() calls Redis delete exactly once.""" - mock_redis = _patch_redis(mocker, set_returns=True) - lock = await acquire_dedup_lock("sess-1", "hello", None) - assert lock is not None - await lock.release() - mock_redis.delete.assert_called_once() - - -@pytest.mark.asyncio -async def test_release_swallows_redis_error( - mocker: pytest_mock.MockerFixture, -) -> None: - """release() must not raise even when Redis delete fails.""" - mock_redis = _patch_redis(mocker, set_returns=True) - mock_redis.delete = AsyncMock(side_effect=RuntimeError("redis down")) - lock = await acquire_dedup_lock("sess-1", "hello", None) - assert lock is not None - await lock.release() # must not raise - mock_redis.delete.assert_called_once() diff --git a/autogpt_platform/backend/backend/copilot/model.py b/autogpt_platform/backend/backend/copilot/model.py index 39229b7210..f2d23b0bcc 100644 --- a/autogpt_platform/backend/backend/copilot/model.py +++ b/autogpt_platform/backend/backend/copilot/model.py @@ -665,6 +665,17 @@ async def append_and_save_message(session_id: str, message: ChatMessage) -> Chat if session is None: raise ValueError(f"Session {session_id} not found") + # Idempotency: skip if last message is identical (infra/nginx retry). 
+ # The cluster lock in the executor prevents duplicate execution; + # this prevents the duplicate DB write that would occur before + # the executor even picks up the task. + if ( + session.messages + and session.messages[-1].role == message.role + and session.messages[-1].content == message.content + ): + return session + session.messages.append(message) existing_message_count = await chat_db().get_next_sequence(session_id)