diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index 0ff078ebb6..8ab4fa27df 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -878,28 +878,31 @@ async def stream_chat_post( # saved yet. append_and_save_message returns None when a duplicate is # detected — in that case skip enqueue to avoid processing the message twice. is_duplicate_message = False - try: - if request.message: - message = ChatMessage( - role="user" if request.is_user_message else "assistant", - content=request.message, + if request.message: + message = ChatMessage( + role="user" if request.is_user_message else "assistant", + content=request.message, + ) + if request.is_user_message: + track_user_message( + user_id=user_id, + session_id=session_id, + message_length=len(request.message), ) - if request.is_user_message: - track_user_message( - user_id=user_id, - session_id=session_id, - message_length=len(request.message), - ) - logger.info(f"[STREAM] Saving user message to session {session_id}") - is_duplicate_message = ( - await append_and_save_message(session_id, message) - ) is None - logger.info(f"[STREAM] User message saved for session {session_id}") + logger.info(f"[STREAM] Saving user message to session {session_id}") + is_duplicate_message = ( + await append_and_save_message(session_id, message) + ) is None + logger.info(f"[STREAM] User message saved for session {session_id}") - # Create a task in the stream registry for reconnection support - turn_id = str(uuid4()) - log_meta["turn_id"] = turn_id + # Create a task in the stream registry for reconnection support. + # For duplicate messages, skip create_session entirely so the infra-retry + # client subscribes to the *existing* turn's Redis stream and receives the + # in-progress executor output rather than an empty stream. + turn_id = str(uuid4()) + log_meta["turn_id"] = turn_id + if not is_duplicate_message: session_create_start = time.perf_counter() await stream_registry.create_session( session_id=session_id, @@ -917,25 +920,21 @@ async def stream_chat_post( } }, ) - - if not is_duplicate_message: - await enqueue_copilot_turn( - session_id=session_id, - user_id=user_id, - message=request.message, - turn_id=turn_id, - is_user_message=request.is_user_message, - context=request.context, - file_ids=sanitized_file_ids, - mode=request.mode, - model=request.model, - ) - else: - logger.info( - f"[STREAM] Duplicate message detected for session {session_id}, skipping enqueue" - ) - except Exception: - raise + await enqueue_copilot_turn( + session_id=session_id, + user_id=user_id, + message=request.message, + turn_id=turn_id, + is_user_message=request.is_user_message, + context=request.context, + file_ids=sanitized_file_ids, + mode=request.mode, + model=request.model, + ) + else: + logger.info( + f"[STREAM] Duplicate message detected for session {session_id}, skipping enqueue" + ) setup_time = (time.perf_counter() - stream_start_time) * 1000 logger.info( diff --git a/autogpt_platform/backend/backend/api/features/chat/routes_test.py b/autogpt_platform/backend/backend/api/features/chat/routes_test.py index ff776bccb2..fdc586494f 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes_test.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes_test.py @@ -165,7 +165,9 @@ def _mock_stream_internals(mocker: pytest_mock.MockerFixture): "backend.api.features.chat.routes.track_user_message", return_value=None, ) - return types.SimpleNamespace(save=mock_save, enqueue=mock_enqueue) + return types.SimpleNamespace( + save=mock_save, enqueue=mock_enqueue, registry=mock_registry + ) def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockerFixture): @@ -201,7 +203,9 @@ def test_stream_chat_skips_enqueue_for_duplicate_message( mocker: pytest_mock.MockerFixture, ): """When append_and_save_message returns None (duplicate detected), - enqueue_copilot_turn must NOT be called to avoid double-processing.""" + enqueue_copilot_turn and stream_registry.create_session must NOT be called + to avoid double-processing and to prevent overwriting the active stream's + turn_id in Redis (which would cause reconnecting clients to miss the response).""" mocks = _mock_stream_internals(mocker) # Override save to return None — signalling a duplicate mocks.save.return_value = None @@ -212,6 +216,7 @@ def test_stream_chat_skips_enqueue_for_duplicate_message( ) assert response.status_code == 200 mocks.enqueue.assert_not_called() + mocks.registry.create_session.assert_not_called() # ─── UUID format filtering ─────────────────────────────────────────────