From 227c60abd31cdecfa141f7f96e89b620f0b02667 Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Wed, 15 Apr 2026 18:54:59 +0700
Subject: [PATCH] fix(backend/copilot): idempotency guard + frontend dedup fix for duplicate messages (#12788)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Why

After merging #12782 to dev, a k8s rolling deployment triggered infrastructure-level POST retries — nginx detected the old pod's connection reset mid-stream and resent the same POST to a new pod. Both pods independently saved the user message and ran the executor, producing duplicate entries in the DB (seq 159, 161, 163) and a duplicate response in the chat. The model saw the same question 3× in its context window and spent its response commenting on that instead of answering.

Two compounding issues:

1. **No backend idempotency**: `append_and_save_message` saves unconditionally — k8s/nginx retries silently produce duplicate turns.
2. **Frontend dedup cleared after success**: `lastSubmittedMsgRef.current = null` after every completed turn wipes the dedup guard, so any rapid re-submit of the same text (from a stalled UI or user double-click) slips through.

## What

**Backend** — Redis idempotency gate in `stream_chat_post`:

- Before saving the user message, compute `sha256(session_id + message + sorted file_ids)[:16]` and `SET NX ex=30` in Redis. The hash uses the original message text, captured before attachment enrichment mutates it, so it stays stable across retries.
- If the key already exists → duplicate: return an empty SSE stream (`StreamFinish` + `[DONE]`) immediately, skipping the save and the executor enqueue.
- User messages only (`is_user_message=True`); system/assistant messages bypass the check.
- The key is released when the turn finishes or errors, so intentional re-sends are never blocked; on client disconnect it is left to expire via the TTL, because the backend turn is still running.

**Frontend** — Keep `lastSubmittedMsgRef` populated after success:

- Remove `lastSubmittedMsgRef.current = null` on stream complete.
- `getSendSuppressionReason` already has a two-condition check — `ref === text AND lastUserMsg === text` — so legitimate re-asks (after a different question was answered) still work; only rapid re-sends of the exact same text while it is still the last user message are blocked.

## How

- The 30 s Redis TTL covers infrastructure retry windows (k8s SIGTERM → connection reset → ingress retry is typically < 5 s).
- The empty SSE response is well-formed (`StreamFinish` + `[DONE]`), so the frontend AI SDK marks the turn complete without rendering a ghost message.
- Keeping the frontend ref live means: submit "foo" → success → submit "foo" again instantly → suppressed. Submit "foo" → success → submit "bar" → proceeds (different text updates the ref).
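For reference, a minimal sketch of the gate's core `SET NX` pattern, assuming a plain `redis.asyncio` client and a reachable Redis instance. `make_dedup_key` and `first_delivery` are illustrative names, not the actual helpers; the real implementation lives in `backend/copilot/message_dedup.py`.

```python
# Minimal sketch of the idempotency gate (illustrative, not the real helpers).
import asyncio
import hashlib

import redis.asyncio as redis

TTL_SECONDS = 30  # long enough to cover the k8s/nginx retry window


def make_dedup_key(
    session_id: str, message: str, file_ids: list[str] | None = None
) -> str:
    # Hash the ORIGINAL message text plus sorted file IDs so the key is
    # stable across retries and unaffected by attachment enrichment.
    sorted_ids = ":".join(sorted(file_ids or []))
    digest = hashlib.sha256(
        f"{session_id}:{message}:{sorted_ids}".encode()
    ).hexdigest()[:16]
    return f"chat:msg_dedup:{session_id}:{digest}"


async def first_delivery(r: redis.Redis, session_id: str, message: str) -> bool:
    # SET NX is atomic: of N concurrent POSTs, exactly one write succeeds.
    # The winner processes the turn; losers return an empty SSE stream.
    return bool(
        await r.set(make_dedup_key(session_id, message), "1", nx=True, ex=TTL_SECONDS)
    )


async def main() -> None:
    r = redis.Redis()
    print(await first_delivery(r, "sess-1", "hello"))  # True: first POST wins
    print(await first_delivery(r, "sess-1", "hello"))  # False: retry is blocked


asyncio.run(main())
```

The `NX` write is what makes the gate safe under concurrency: when two pods race, Redis guarantees exactly one `SET` succeeds, so the loser can short-circuit without ever touching the DB or the executor queue.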
## Tests - 3 new backend route tests: duplicate blocked, first POST proceeds, non-user messages bypass - 5 new frontend `getSendSuppressionReason` unit tests: fresh ref, reconnecting, duplicate suppressed, different-turn re-ask allowed, different text allowed ## Checklist - [x] I have read the [AutoGPT Contributing Guide](https://github.com/Significant-Gravitas/AutoGPT/blob/master/CONTRIBUTING.md) - [x] I have performed a self-review of my code - [x] I have added tests that prove the fix is effective - [x] I have run `poetry run format` and `pnpm format` + `pnpm lint` --- .../backend/api/features/chat/routes.py | 149 ++++++---- .../backend/api/features/chat/routes_test.py | 278 +++++++++++++++++- .../backend/backend/copilot/message_dedup.py | 71 +++++ .../backend/copilot/message_dedup_test.py | 94 ++++++ .../copilot/__tests__/helpers.test.ts | 71 ++++- .../(platform)/copilot/useCopilotStream.ts | 7 +- 6 files changed, 606 insertions(+), 64 deletions(-) create mode 100644 autogpt_platform/backend/backend/copilot/message_dedup.py create mode 100644 autogpt_platform/backend/backend/copilot/message_dedup_test.py diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index ac7325e201..496e958e17 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -18,6 +18,7 @@ from backend.copilot import stream_registry from backend.copilot.config import ChatConfig, CopilotLlmModel, CopilotMode from backend.copilot.db import get_chat_messages_paginated from backend.copilot.executor.utils import enqueue_cancel_task, enqueue_copilot_turn +from backend.copilot.message_dedup import acquire_dedup_lock from backend.copilot.model import ( ChatMessage, ChatSession, @@ -840,6 +841,9 @@ async def stream_chat_post( # Also sanitise file_ids so only validated, workspace-scoped IDs are # forwarded downstream (e.g. to the executor via enqueue_copilot_turn). sanitized_file_ids: list[str] | None = None + # Capture the original message text BEFORE any mutation (attachment enrichment) + # so the idempotency hash is stable across retries. + original_message = request.message if request.file_ids and user_id: # Filter to valid UUIDs only to prevent DB abuse valid_ids = [fid for fid in request.file_ids if _UUID_RE.match(fid)] @@ -868,61 +872,91 @@ async def stream_chat_post( ) request.message += files_block + # ── Idempotency guard ──────────────────────────────────────────────────── + # Blocks duplicate executor tasks from concurrent/retried POSTs. + # See backend/copilot/message_dedup.py for the full lifecycle description. + dedup_lock = None + if request.is_user_message: + dedup_lock = await acquire_dedup_lock( + session_id, original_message, sanitized_file_ids + ) + if dedup_lock is None and (original_message or sanitized_file_ids): + + async def _empty_sse() -> AsyncGenerator[str, None]: + yield StreamFinish().to_sse() + yield "data: [DONE]\n\n" + + return StreamingResponse( + _empty_sse(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + "Connection": "keep-alive", + "x-vercel-ai-ui-message-stream": "v1", + }, + ) + # Atomically append user message to session BEFORE creating task to avoid # race condition where GET_SESSION sees task as "running" but message isn't # saved yet. append_and_save_message re-fetches inside a lock to prevent # message loss from concurrent requests. 
- if request.message: - message = ChatMessage( - role="user" if request.is_user_message else "assistant", - content=request.message, - ) - if request.is_user_message: - track_user_message( - user_id=user_id, - session_id=session_id, - message_length=len(request.message), + # + # If any of these operations raises, release the dedup lock before propagating + # so subsequent retries are not blocked for 30 s. + try: + if request.message: + message = ChatMessage( + role="user" if request.is_user_message else "assistant", + content=request.message, ) - logger.info(f"[STREAM] Saving user message to session {session_id}") - await append_and_save_message(session_id, message) - logger.info(f"[STREAM] User message saved for session {session_id}") + if request.is_user_message: + track_user_message( + user_id=user_id, + session_id=session_id, + message_length=len(request.message), + ) + logger.info(f"[STREAM] Saving user message to session {session_id}") + await append_and_save_message(session_id, message) + logger.info(f"[STREAM] User message saved for session {session_id}") - # Create a task in the stream registry for reconnection support - turn_id = str(uuid4()) - log_meta["turn_id"] = turn_id + # Create a task in the stream registry for reconnection support + turn_id = str(uuid4()) + log_meta["turn_id"] = turn_id - session_create_start = time.perf_counter() - await stream_registry.create_session( - session_id=session_id, - user_id=user_id, - tool_call_id="chat_stream", - tool_name="chat", - turn_id=turn_id, - ) - logger.info( - f"[TIMING] create_session completed in {(time.perf_counter() - session_create_start) * 1000:.1f}ms", - extra={ - "json_fields": { - **log_meta, - "duration_ms": (time.perf_counter() - session_create_start) * 1000, - } - }, - ) + session_create_start = time.perf_counter() + await stream_registry.create_session( + session_id=session_id, + user_id=user_id, + tool_call_id="chat_stream", + tool_name="chat", + turn_id=turn_id, + ) + logger.info( + f"[TIMING] create_session completed in {(time.perf_counter() - session_create_start) * 1000:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "duration_ms": (time.perf_counter() - session_create_start) * 1000, + } + }, + ) - # Per-turn stream is always fresh (unique turn_id), subscribe from beginning - subscribe_from_id = "0-0" - - await enqueue_copilot_turn( - session_id=session_id, - user_id=user_id, - message=request.message, - turn_id=turn_id, - is_user_message=request.is_user_message, - context=request.context, - file_ids=sanitized_file_ids, - mode=request.mode, - model=request.model, - ) + await enqueue_copilot_turn( + session_id=session_id, + user_id=user_id, + message=request.message, + turn_id=turn_id, + is_user_message=request.is_user_message, + context=request.context, + file_ids=sanitized_file_ids, + mode=request.mode, + model=request.model, + ) + except Exception: + if dedup_lock: + await dedup_lock.release() + raise setup_time = (time.perf_counter() - stream_start_time) * 1000 logger.info( @@ -930,6 +964,9 @@ async def stream_chat_post( extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}}, ) + # Per-turn stream is always fresh (unique turn_id), subscribe from beginning + subscribe_from_id = "0-0" + # SSE endpoint that subscribes to the task's stream async def event_generator() -> AsyncGenerator[str, None]: import time as time_module @@ -943,6 +980,12 @@ async def stream_chat_post( subscriber_queue = None first_chunk_yielded = False chunks_yielded = 0 + # True for every exit path except GeneratorExit (client 
disconnect). + # On disconnect the backend turn is still running — releasing the lock + # there would reopen the infra-retry duplicate window. The 30 s TTL + # is the fallback. All other exits (normal finish, early return, error) + # should release so the user can re-send the same message. + release_dedup_lock_on_exit = True try: # Subscribe from the position we captured before enqueuing # This avoids replaying old messages while catching all new ones @@ -954,8 +997,7 @@ async def stream_chat_post( if subscriber_queue is None: yield StreamFinish().to_sse() - yield "data: [DONE]\n\n" - return + return # finally releases dedup_lock # Read from the subscriber queue and yield to SSE logger.info( @@ -984,7 +1026,6 @@ async def stream_chat_post( yield chunk.to_sse() - # Check for finish signal if isinstance(chunk, StreamFinish): total_time = time_module.perf_counter() - event_gen_start logger.info( @@ -998,7 +1039,8 @@ async def stream_chat_post( } }, ) - break + break # finally releases dedup_lock + except asyncio.TimeoutError: yield StreamHeartbeat().to_sse() @@ -1013,7 +1055,7 @@ async def stream_chat_post( } }, ) - pass # Client disconnected - background task continues + release_dedup_lock_on_exit = False except Exception as e: elapsed = (time_module.perf_counter() - event_gen_start) * 1000 logger.error( @@ -1028,7 +1070,10 @@ async def stream_chat_post( code="stream_error", ).to_sse() yield StreamFinish().to_sse() + # finally releases dedup_lock finally: + if dedup_lock and release_dedup_lock_on_exit: + await dedup_lock.release() # Unsubscribe when client disconnects or stream ends if subscriber_queue is not None: try: diff --git a/autogpt_platform/backend/backend/api/features/chat/routes_test.py b/autogpt_platform/backend/backend/api/features/chat/routes_test.py index 74259b3463..597aad01ad 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes_test.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes_test.py @@ -133,14 +133,30 @@ def test_stream_chat_rejects_too_many_file_ids(): assert response.status_code == 422 -def _mock_stream_internals(mocker: pytest_mock.MockFixture): +def _mock_stream_internals( + mocker: pytest_mock.MockerFixture, + *, + redis_set_returns: object = True, +): """Mock the async internals of stream_chat_post so tests can exercise - validation and enrichment logic without needing Redis/RabbitMQ.""" + validation and enrichment logic without needing Redis/RabbitMQ. + + Args: + redis_set_returns: Value returned by the mocked Redis ``set`` call. + ``True`` (default) simulates a fresh key (new message); + ``None`` simulates a collision (duplicate blocked). + + Returns: + A namespace with ``redis``, ``save``, and ``enqueue`` mock objects so + callers can make additional assertions about side-effects. 
+ """ + import types + mocker.patch( "backend.api.features.chat.routes._validate_and_get_session", return_value=None, ) - mocker.patch( + mock_save = mocker.patch( "backend.api.features.chat.routes.append_and_save_message", return_value=None, ) @@ -150,7 +166,7 @@ def _mock_stream_internals(mocker: pytest_mock.MockFixture): "backend.api.features.chat.routes.stream_registry", mock_registry, ) - mocker.patch( + mock_enqueue = mocker.patch( "backend.api.features.chat.routes.enqueue_copilot_turn", return_value=None, ) @@ -158,9 +174,18 @@ def _mock_stream_internals(mocker: pytest_mock.MockFixture): "backend.api.features.chat.routes.track_user_message", return_value=None, ) + mock_redis = AsyncMock() + mock_redis.set = AsyncMock(return_value=redis_set_returns) + mocker.patch( + "backend.copilot.message_dedup.get_redis_async", + new_callable=AsyncMock, + return_value=mock_redis, + ) + ns = types.SimpleNamespace(redis=mock_redis, save=mock_save, enqueue=mock_enqueue) + return ns -def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockFixture): +def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockerFixture): """Exactly 20 file_ids should be accepted (not rejected by validation).""" _mock_stream_internals(mocker) # Patch workspace lookup as imported by the routes module @@ -189,7 +214,7 @@ def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockFixture): # ─── UUID format filtering ───────────────────────────────────────────── -def test_file_ids_filters_invalid_uuids(mocker: pytest_mock.MockFixture): +def test_file_ids_filters_invalid_uuids(mocker: pytest_mock.MockerFixture): """Non-UUID strings in file_ids should be silently filtered out and NOT passed to the database query.""" _mock_stream_internals(mocker) @@ -228,7 +253,7 @@ def test_file_ids_filters_invalid_uuids(mocker: pytest_mock.MockFixture): # ─── Cross-workspace file_ids ───────────────────────────────────────── -def test_file_ids_scoped_to_workspace(mocker: pytest_mock.MockFixture): +def test_file_ids_scoped_to_workspace(mocker: pytest_mock.MockerFixture): """The batch query should scope to the user's workspace.""" _mock_stream_internals(mocker) mocker.patch( @@ -257,7 +282,7 @@ def test_file_ids_scoped_to_workspace(mocker: pytest_mock.MockFixture): # ─── Rate limit → 429 ───────────────────────────────────────────────── -def test_stream_chat_returns_429_on_daily_rate_limit(mocker: pytest_mock.MockFixture): +def test_stream_chat_returns_429_on_daily_rate_limit(mocker: pytest_mock.MockerFixture): """When check_rate_limit raises RateLimitExceeded for daily limit the endpoint returns 429.""" from backend.copilot.rate_limit import RateLimitExceeded @@ -278,7 +303,9 @@ def test_stream_chat_returns_429_on_daily_rate_limit(mocker: pytest_mock.MockFix assert "daily" in response.json()["detail"].lower() -def test_stream_chat_returns_429_on_weekly_rate_limit(mocker: pytest_mock.MockFixture): +def test_stream_chat_returns_429_on_weekly_rate_limit( + mocker: pytest_mock.MockerFixture, +): """When check_rate_limit raises RateLimitExceeded for weekly limit the endpoint returns 429.""" from backend.copilot.rate_limit import RateLimitExceeded @@ -301,7 +328,7 @@ def test_stream_chat_returns_429_on_weekly_rate_limit(mocker: pytest_mock.MockFi assert "resets in" in detail -def test_stream_chat_429_includes_reset_time(mocker: pytest_mock.MockFixture): +def test_stream_chat_429_includes_reset_time(mocker: pytest_mock.MockerFixture): """The 429 response detail should include the human-readable reset time.""" from 
backend.copilot.rate_limit import RateLimitExceeded @@ -679,6 +706,237 @@ class TestStripInjectedContext: assert result["content"] == "hello" +# ─── Idempotency / duplicate-POST guard ────────────────────────────── + + +def test_stream_chat_blocks_duplicate_post_returns_empty_sse( + mocker: pytest_mock.MockerFixture, +) -> None: + """A second POST with the same message within the 30-s window must return + an empty SSE stream (StreamFinish + [DONE]) so the frontend marks the + turn complete without creating a ghost response.""" + # redis_set_returns=None simulates a collision: the NX key already exists. + ns = _mock_stream_internals(mocker, redis_set_returns=None) + + response = client.post( + "/sessions/sess-dup/stream", + json={"message": "duplicate message", "is_user_message": True}, + ) + + assert response.status_code == 200 + body = response.text + # The response must contain StreamFinish (type=finish) and the SSE [DONE] terminator. + assert '"finish"' in body + assert "[DONE]" in body + # The empty SSE response must include the AI SDK protocol header so the + # frontend treats it as a valid stream and marks the turn complete. + assert response.headers.get("x-vercel-ai-ui-message-stream") == "v1" + # The duplicate guard must prevent save/enqueue side effects. + ns.save.assert_not_called() + ns.enqueue.assert_not_called() + + +def test_stream_chat_first_post_proceeds_normally( + mocker: pytest_mock.MockerFixture, +) -> None: + """The first POST (Redis NX key set successfully) must proceed through the + normal streaming path — no early return.""" + ns = _mock_stream_internals(mocker, redis_set_returns=True) + + response = client.post( + "/sessions/sess-new/stream", + json={"message": "first message", "is_user_message": True}, + ) + + assert response.status_code == 200 + # Redis set must have been called once with the NX flag. + ns.redis.set.assert_called_once() + call_kwargs = ns.redis.set.call_args + assert call_kwargs.kwargs.get("nx") is True + + +def test_stream_chat_dedup_skipped_for_non_user_messages( + mocker: pytest_mock.MockerFixture, +) -> None: + """System/assistant messages (is_user_message=False) bypass the dedup + guard — they are injected programmatically and must always be processed.""" + ns = _mock_stream_internals(mocker, redis_set_returns=None) + + response = client.post( + "/sessions/sess-sys/stream", + json={"message": "system context", "is_user_message": False}, + ) + + # Even though redis_set_returns=None (would block a user message), + # the endpoint must proceed because is_user_message=False. + assert response.status_code == 200 + ns.redis.set.assert_not_called() + + +def test_stream_chat_dedup_hash_uses_original_message_not_mutated( + mocker: pytest_mock.MockerFixture, +) -> None: + """The dedup hash must be computed from the original request message, + not the mutated version that has the [Attached files] block appended. + A file_id is sent so the route actually appends the [Attached files] block, + exercising the mutation path — the hash must still match the original text.""" + import hashlib + + ns = _mock_stream_internals(mocker, redis_set_returns=True) + + file_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + # Mock workspace + prisma so the attachment block is actually appended. 
+ mocker.patch( + "backend.api.features.chat.routes.get_or_create_workspace", + return_value=type("W", (), {"id": "ws-1"})(), + ) + fake_file = type( + "F", + (), + { + "id": file_id, + "name": "doc.pdf", + "mimeType": "application/pdf", + "sizeBytes": 1024, + }, + )() + mock_prisma = mocker.MagicMock() + mock_prisma.find_many = mocker.AsyncMock(return_value=[fake_file]) + mocker.patch( + "prisma.models.UserWorkspaceFile.prisma", + return_value=mock_prisma, + ) + + response = client.post( + "/sessions/sess-hash/stream", + json={ + "message": "plain message", + "is_user_message": True, + "file_ids": [file_id], + }, + ) + + assert response.status_code == 200 + ns.redis.set.assert_called_once() + call_args = ns.redis.set.call_args + dedup_key = call_args.args[0] + + # Hash must use the original message + sorted file IDs, not the mutated text. + expected_hash = hashlib.sha256( + f"sess-hash:plain message:{file_id}".encode() + ).hexdigest()[:16] + expected_key = f"chat:msg_dedup:sess-hash:{expected_hash}" + assert dedup_key == expected_key, ( + f"Dedup key {dedup_key!r} does not match expected {expected_key!r} — " + "hash may be using mutated message or wrong inputs" + ) + + +def test_stream_chat_dedup_key_released_after_stream_finish( + mocker: pytest_mock.MockerFixture, +) -> None: + """The dedup Redis key must be deleted after the turn completes (when + subscriber_queue is None the route yields StreamFinish immediately and + should release the key so the user can re-send the same message).""" + from unittest.mock import AsyncMock as _AsyncMock + + # Set up all internals manually so we can control subscribe_to_session. + mocker.patch( + "backend.api.features.chat.routes._validate_and_get_session", + return_value=None, + ) + mocker.patch( + "backend.api.features.chat.routes.append_and_save_message", + return_value=None, + ) + mocker.patch( + "backend.api.features.chat.routes.enqueue_copilot_turn", + return_value=None, + ) + mocker.patch( + "backend.api.features.chat.routes.track_user_message", + return_value=None, + ) + mock_registry = mocker.MagicMock() + mock_registry.create_session = _AsyncMock(return_value=None) + # None → early-finish path: StreamFinish yielded immediately, dedup key released. + mock_registry.subscribe_to_session = _AsyncMock(return_value=None) + mocker.patch( + "backend.api.features.chat.routes.stream_registry", + mock_registry, + ) + mock_redis = mocker.AsyncMock() + mock_redis.set = _AsyncMock(return_value=True) + mocker.patch( + "backend.copilot.message_dedup.get_redis_async", + new_callable=_AsyncMock, + return_value=mock_redis, + ) + + response = client.post( + "/sessions/sess-finish/stream", + json={"message": "hello", "is_user_message": True}, + ) + + assert response.status_code == 200 + body = response.text + assert '"finish"' in body + # The dedup key must be released so intentional re-sends are allowed. 
+ mock_redis.delete.assert_called_once() + + +def test_stream_chat_dedup_key_released_even_when_redis_delete_raises( + mocker: pytest_mock.MockerFixture, +) -> None: + """The route must not crash when the dedup Redis delete fails on the + subscriber_queue-is-None early-finish path (except Exception: pass).""" + from unittest.mock import AsyncMock as _AsyncMock + + mocker.patch( + "backend.api.features.chat.routes._validate_and_get_session", + return_value=None, + ) + mocker.patch( + "backend.api.features.chat.routes.append_and_save_message", + return_value=None, + ) + mocker.patch( + "backend.api.features.chat.routes.enqueue_copilot_turn", + return_value=None, + ) + mocker.patch( + "backend.api.features.chat.routes.track_user_message", + return_value=None, + ) + mock_registry = mocker.MagicMock() + mock_registry.create_session = _AsyncMock(return_value=None) + mock_registry.subscribe_to_session = _AsyncMock(return_value=None) + mocker.patch( + "backend.api.features.chat.routes.stream_registry", + mock_registry, + ) + mock_redis = mocker.AsyncMock() + mock_redis.set = _AsyncMock(return_value=True) + # Make the delete raise so the except-pass branch is exercised. + mock_redis.delete = _AsyncMock(side_effect=RuntimeError("redis gone")) + mocker.patch( + "backend.copilot.message_dedup.get_redis_async", + new_callable=_AsyncMock, + return_value=mock_redis, + ) + + # Should not raise even though delete fails. + response = client.post( + "/sessions/sess-finish-err/stream", + json={"message": "hello", "is_user_message": True}, + ) + + assert response.status_code == 200 + assert '"finish"' in response.text + # delete must have been attempted — the except-pass branch silenced the error. + mock_redis.delete.assert_called_once() + + # ─── DELETE /sessions/{id}/stream — disconnect listeners ────────────── diff --git a/autogpt_platform/backend/backend/copilot/message_dedup.py b/autogpt_platform/backend/backend/copilot/message_dedup.py new file mode 100644 index 0000000000..2af13b559a --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/message_dedup.py @@ -0,0 +1,71 @@ +"""Per-request idempotency lock for the /stream endpoint. + +Prevents duplicate executor tasks from concurrent or retried POSTs (e.g. k8s +rolling-deploy retries, nginx upstream retries, rapid double-clicks). + +Lifecycle +--------- +1. ``acquire()`` — computes a stable hash of (session_id, message, file_ids) + and atomically sets a Redis NX key. Returns a ``_DedupLock`` on success or + ``None`` when the key already exists (duplicate request). +2. ``release()`` — deletes the key. Must be called on turn completion or turn + error so the next legitimate send is never blocked. +3. On client disconnect (``GeneratorExit``) the lock must NOT be released — + the backend turn is still running, and releasing would reopen the duplicate + window for infra-level retries. The 30 s TTL is the safety net. +""" + +import hashlib +import logging + +from backend.data.redis_client import get_redis_async + +logger = logging.getLogger(__name__) + +_KEY_PREFIX = "chat:msg_dedup" +_TTL_SECONDS = 30 + + +class _DedupLock: + def __init__(self, key: str, redis) -> None: + self._key = key + self._redis = redis + + async def release(self) -> None: + """Best-effort key deletion. 
The TTL handles failures silently.""" + try: + await self._redis.delete(self._key) + except Exception: + pass + + +async def acquire_dedup_lock( + session_id: str, + message: str | None, + file_ids: list[str] | None, +) -> _DedupLock | None: + """Acquire the idempotency lock for this (session, message, files) tuple. + + Returns a ``_DedupLock`` when the lock is freshly acquired (first request). + Returns ``None`` when a duplicate is detected (lock already held). + Returns ``None`` when there is nothing to deduplicate (no message, no files). + """ + if not message and not file_ids: + return None + + sorted_ids = ":".join(sorted(file_ids or [])) + content_hash = hashlib.sha256( + f"{session_id}:{message or ''}:{sorted_ids}".encode() + ).hexdigest()[:16] + key = f"{_KEY_PREFIX}:{session_id}:{content_hash}" + + redis = await get_redis_async() + acquired = await redis.set(key, "1", ex=_TTL_SECONDS, nx=True) + if not acquired: + logger.warning( + f"[STREAM] Duplicate user message blocked for session {session_id}, " + f"hash={content_hash} — returning empty SSE", + ) + return None + + return _DedupLock(key, redis) diff --git a/autogpt_platform/backend/backend/copilot/message_dedup_test.py b/autogpt_platform/backend/backend/copilot/message_dedup_test.py new file mode 100644 index 0000000000..935ddd36b6 --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/message_dedup_test.py @@ -0,0 +1,94 @@ +"""Unit tests for backend.copilot.message_dedup.""" + +from unittest.mock import AsyncMock + +import pytest +import pytest_mock + +from backend.copilot.message_dedup import _KEY_PREFIX, acquire_dedup_lock + + +def _patch_redis(mocker: pytest_mock.MockerFixture, *, set_returns): + mock_redis = AsyncMock() + mock_redis.set = AsyncMock(return_value=set_returns) + mocker.patch( + "backend.copilot.message_dedup.get_redis_async", + new_callable=AsyncMock, + return_value=mock_redis, + ) + return mock_redis + + +@pytest.mark.asyncio +async def test_acquire_returns_none_when_no_message_no_files( + mocker: pytest_mock.MockerFixture, +) -> None: + """Nothing to deduplicate — no Redis call made, None returned.""" + mock_redis = _patch_redis(mocker, set_returns=True) + result = await acquire_dedup_lock("sess-1", None, None) + assert result is None + mock_redis.set.assert_not_called() + + +@pytest.mark.asyncio +async def test_acquire_returns_lock_on_first_request( + mocker: pytest_mock.MockerFixture, +) -> None: + """First request acquires the lock and returns a _DedupLock.""" + mock_redis = _patch_redis(mocker, set_returns=True) + lock = await acquire_dedup_lock("sess-1", "hello", None) + assert lock is not None + mock_redis.set.assert_called_once() + key_arg = mock_redis.set.call_args.args[0] + assert key_arg.startswith(f"{_KEY_PREFIX}:sess-1:") + + +@pytest.mark.asyncio +async def test_acquire_returns_none_on_duplicate( + mocker: pytest_mock.MockerFixture, +) -> None: + """Duplicate request (NX fails) returns None to signal the caller.""" + _patch_redis(mocker, set_returns=None) + result = await acquire_dedup_lock("sess-1", "hello", None) + assert result is None + + +@pytest.mark.asyncio +async def test_acquire_key_stable_across_file_order( + mocker: pytest_mock.MockerFixture, +) -> None: + """File IDs are sorted before hashing so order doesn't affect the key.""" + mock_redis_1 = _patch_redis(mocker, set_returns=True) + await acquire_dedup_lock("sess-1", "msg", ["b", "a"]) + key_ab = mock_redis_1.set.call_args.args[0] + + mock_redis_2 = _patch_redis(mocker, set_returns=True) + await acquire_dedup_lock("sess-1", 
"msg", ["a", "b"]) + key_ba = mock_redis_2.set.call_args.args[0] + + assert key_ab == key_ba + + +@pytest.mark.asyncio +async def test_release_deletes_key( + mocker: pytest_mock.MockerFixture, +) -> None: + """release() calls Redis delete exactly once.""" + mock_redis = _patch_redis(mocker, set_returns=True) + lock = await acquire_dedup_lock("sess-1", "hello", None) + assert lock is not None + await lock.release() + mock_redis.delete.assert_called_once() + + +@pytest.mark.asyncio +async def test_release_swallows_redis_error( + mocker: pytest_mock.MockerFixture, +) -> None: + """release() must not raise even when Redis delete fails.""" + mock_redis = _patch_redis(mocker, set_returns=True) + mock_redis.delete = AsyncMock(side_effect=RuntimeError("redis down")) + lock = await acquire_dedup_lock("sess-1", "hello", None) + assert lock is not None + await lock.release() # must not raise + mock_redis.delete.assert_called_once() diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/helpers.test.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/helpers.test.ts index 712aaaf508..9580ef349a 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/helpers.test.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/helpers.test.ts @@ -1,6 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { IMPERSONATION_HEADER_NAME } from "@/lib/constants"; -import { getCopilotAuthHeaders } from "../helpers"; +import { getCopilotAuthHeaders, getSendSuppressionReason } from "../helpers"; +import type { UIMessage } from "ai"; vi.mock("@/lib/supabase/actions", () => ({ getWebSocketToken: vi.fn(), @@ -72,3 +73,71 @@ describe("getCopilotAuthHeaders", () => { ); }); }); + +// ─── getSendSuppressionReason ───────────────────────────────────────────────── + +function makeUserMsg(text: string): UIMessage { + return { + id: "msg-1", + role: "user", + content: text, + parts: [{ type: "text", text }], + } as UIMessage; +} + +describe("getSendSuppressionReason", () => { + it("returns null when no dedup context exists (fresh ref)", () => { + const result = getSendSuppressionReason({ + text: "hello", + isReconnectScheduled: false, + lastSubmittedText: null, + messages: [], + }); + expect(result).toBeNull(); + }); + + it("returns 'reconnecting' when reconnect is scheduled regardless of text", () => { + const result = getSendSuppressionReason({ + text: "hello", + isReconnectScheduled: true, + lastSubmittedText: null, + messages: [], + }); + expect(result).toBe("reconnecting"); + }); + + it("returns 'duplicate' when same text was submitted and is the last user message", () => { + // This is the core regression test: after a successful turn the ref + // is intentionally NOT cleared to null, so submitting the same text + // again is caught here. + const result = getSendSuppressionReason({ + text: "hello", + isReconnectScheduled: false, + lastSubmittedText: "hello", + messages: [makeUserMsg("hello")], + }); + expect(result).toBe("duplicate"); + }); + + it("returns null when same ref text but different last user message (different question)", () => { + // User asked "hello" before, got a reply, then asked a different question + // — the last user message in chat is now different, so no suppression. 
+ const result = getSendSuppressionReason({ + text: "hello", + isReconnectScheduled: false, + lastSubmittedText: "hello", + messages: [makeUserMsg("hello"), makeUserMsg("something else")], + }); + expect(result).toBeNull(); + }); + + it("returns null when text differs from lastSubmittedText", () => { + const result = getSendSuppressionReason({ + text: "new question", + isReconnectScheduled: false, + lastSubmittedText: "old question", + messages: [makeUserMsg("old question")], + }); + expect(result).toBeNull(); + }); +}); diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts index 85709f23d9..666b87bfba 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts @@ -497,7 +497,12 @@ export function useCopilotStream({ if (status === "ready") { reconnectAttemptsRef.current = 0; hasShownDisconnectToast.current = false; - lastSubmittedMsgRef.current = null; + // Intentionally NOT clearing lastSubmittedMsgRef here: keeping the last + // submitted text prevents getSendSuppressionReason from allowing a + // duplicate POST of the same message immediately after a successful turn + // (the "duplicate" branch checks both the ref and the visible last user + // message, so legitimate re-sends after a different reply are still + // allowed). setReconnectExhausted(false); } }