diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index 7446ddea22..6c876f3be5 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -875,8 +875,9 @@ async def stream_chat_post( # Atomically append user message to session BEFORE creating task to avoid # race condition where GET_SESSION sees task as "running" but message isn't - # saved yet. append_and_save_message is idempotent — duplicate POSTs with - # the same content are silently skipped at the DB layer. + # saved yet. append_and_save_message returns None when a duplicate is + # detected — in that case skip enqueue to avoid processing the message twice. + is_duplicate_message = False try: if request.message: message = ChatMessage( @@ -890,7 +891,8 @@ async def stream_chat_post( message_length=len(request.message), ) logger.info(f"[STREAM] Saving user message to session {session_id}") - await append_and_save_message(session_id, message) + saved = await append_and_save_message(session_id, message) + is_duplicate_message = saved is None logger.info(f"[STREAM] User message saved for session {session_id}") # Create a task in the stream registry for reconnection support @@ -915,17 +917,22 @@ async def stream_chat_post( }, ) - await enqueue_copilot_turn( - session_id=session_id, - user_id=user_id, - message=request.message, - turn_id=turn_id, - is_user_message=request.is_user_message, - context=request.context, - file_ids=sanitized_file_ids, - mode=request.mode, - model=request.model, - ) + if not is_duplicate_message: + await enqueue_copilot_turn( + session_id=session_id, + user_id=user_id, + message=request.message, + turn_id=turn_id, + is_user_message=request.is_user_message, + context=request.context, + file_ids=sanitized_file_ids, + mode=request.mode, + model=request.model, + ) + else: + logger.info( + f"[STREAM] Duplicate message detected for session {session_id}, skipping enqueue" + ) except Exception: raise diff --git a/autogpt_platform/backend/backend/api/features/chat/routes_test.py b/autogpt_platform/backend/backend/api/features/chat/routes_test.py index 7b9d89dee3..ff776bccb2 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes_test.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes_test.py @@ -149,7 +149,7 @@ def _mock_stream_internals(mocker: pytest_mock.MockerFixture): ) mock_save = mocker.patch( "backend.api.features.chat.routes.append_and_save_message", - return_value=None, + return_value=MagicMock(), # non-None = message was saved (not a duplicate) ) mock_registry = mocker.MagicMock() mock_registry.create_session = mocker.AsyncMock(return_value=None) @@ -194,6 +194,26 @@ def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockerFixture): assert response.status_code == 200 +# ─── Duplicate message dedup ────────────────────────────────────────── + + +def test_stream_chat_skips_enqueue_for_duplicate_message( + mocker: pytest_mock.MockerFixture, +): + """When append_and_save_message returns None (duplicate detected), + enqueue_copilot_turn must NOT be called to avoid double-processing.""" + mocks = _mock_stream_internals(mocker) + # Override save to return None — signalling a duplicate + mocks.save.return_value = None + + response = client.post( + "/sessions/sess-1/stream", + json={"message": "hello"}, + ) + assert response.status_code == 200 + mocks.enqueue.assert_not_called() + + # ─── UUID format filtering ───────────────────────────────────────────── diff --git a/autogpt_platform/backend/backend/copilot/model.py b/autogpt_platform/backend/backend/copilot/model.py index 2255e67dd4..fed3d8e8d4 100644 --- a/autogpt_platform/backend/backend/copilot/model.py +++ b/autogpt_platform/backend/backend/copilot/model.py @@ -647,9 +647,15 @@ async def _save_session_to_db( msg.sequence = existing_message_count + i -async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession: +async def append_and_save_message( + session_id: str, message: ChatMessage +) -> ChatSession | None: """Atomically append a message to a session and persist it. + Returns the updated session, or None if the message was detected as a + duplicate (idempotency guard). Callers must check for None and skip any + downstream work (e.g. enqueuing a new LLM turn) when a duplicate is detected. + Uses _get_session_lock (Redis NX) to serialise concurrent writers across replicas. The idempotency check below provides a last-resort guard when the lock degrades. """ @@ -676,7 +682,7 @@ async def append_and_save_message(session_id: str, message: ChatMessage) -> Chat and session.messages[-1].role == message.role and session.messages[-1].content == message.content ): - return session + return None # duplicate — caller should skip enqueue session.messages.append(message) existing_message_count = await chat_db().get_next_sequence(session_id)