diff --git a/autogpt_platform/backend/backend/copilot/sdk/query_builder_test.py b/autogpt_platform/backend/backend/copilot/sdk/query_builder_test.py index 57f037baba..4042dae590 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/query_builder_test.py +++ b/autogpt_platform/backend/backend/copilot/sdk/query_builder_test.py @@ -226,6 +226,78 @@ async def test_build_query_no_resume_multi_message(monkeypatch): assert was_compacted is False # mock returns False +@pytest.mark.asyncio +async def test_build_query_session_msg_ceiling_prevents_pending_duplication(): + """session_msg_ceiling stops pending messages from leaking into the gap. + + Scenario: transcript covers 2 messages, session has 2 historical + 1 current + + 2 pending drained at turn start. Without the ceiling the gap would include + the pending messages AND current_message already has them → duplication. + With session_msg_ceiling=3 (pre-drain count) the gap slice is empty and + only current_message carries the pending content. + """ + # session.messages after drain: [hist1, hist2, current_msg, pending1, pending2] + session = _make_session( + [ + ChatMessage(role="user", content="hist1"), + ChatMessage(role="assistant", content="hist2"), + ChatMessage(role="user", content="current msg with pending1 pending2"), + ChatMessage(role="user", content="pending1"), + ChatMessage(role="user", content="pending2"), + ] + ) + # transcript covers hist1+hist2 (2 messages); pre-drain count was 3 (includes current_msg) + result, was_compacted = await _build_query_message( + "current msg with pending1 pending2", + session, + use_resume=True, + transcript_msg_count=2, + session_id="test-session", + session_msg_ceiling=3, # len(session.messages) before drain + ) + # Gap should be empty (transcript_msg_count == ceiling - 1), so no history prepended + assert result == "current msg with pending1 pending2" + assert was_compacted is False + # Pending messages must NOT appear in gap context + assert "pending1" not in result.split("current msg")[0] + + +@pytest.mark.asyncio +async def test_build_query_session_msg_ceiling_preserves_real_gap(): + """session_msg_ceiling still surfaces a genuine stale-transcript gap. + + Scenario: transcript covers 2 messages, session has 4 historical + 1 current + + 2 pending. Ceiling = 5 (pre-drain). Real gap = messages 2-3 (hist3, hist4). + """ + session = _make_session( + [ + ChatMessage(role="user", content="hist1"), + ChatMessage(role="assistant", content="hist2"), + ChatMessage(role="user", content="hist3"), + ChatMessage(role="assistant", content="hist4"), + ChatMessage(role="user", content="current"), + ChatMessage(role="user", content="pending1"), + ChatMessage(role="user", content="pending2"), + ] + ) + result, was_compacted = await _build_query_message( + "current", + session, + use_resume=True, + transcript_msg_count=2, + session_id="test-session", + session_msg_ceiling=5, # pre-drain: [hist1..hist4, current] + ) + # Gap = session.messages[2:4] = [hist3, hist4] + assert "" in result + assert "hist3" in result + assert "hist4" in result + assert "Now, the user says:\ncurrent" in result + # Pending messages must NOT appear in gap + assert "pending1" not in result + assert "pending2" not in result + + @pytest.mark.asyncio async def test_build_query_no_resume_multi_message_compacted(monkeypatch): """When compression actually compacts, was_compacted should be True.""" diff --git a/autogpt_platform/backend/backend/copilot/sdk/service.py b/autogpt_platform/backend/backend/copilot/sdk/service.py index 39299ba14b..4d53611021 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/service.py +++ b/autogpt_platform/backend/backend/copilot/sdk/service.py @@ -959,17 +959,33 @@ async def _build_query_message( use_resume: bool, transcript_msg_count: int, session_id: str, + *, + session_msg_ceiling: int | None = None, ) -> tuple[str, bool]: """Build the query message with appropriate context. + Args: + session_msg_ceiling: If provided, treat ``session.messages`` as if it + only has this many entries when computing the gap slice. Pass + ``len(session.messages)`` captured *before* appending any pending + messages so that mid-turn drains do not skew the gap calculation + and cause pending messages to be duplicated in both the gap context + and ``current_message``. + Returns: Tuple of (query_message, was_compacted). """ msg_count = len(session.messages) + # Use the ceiling if supplied (prevents pending-message duplication when + # messages were appended to session.messages after the drain but before + # this function is called). + effective_count = ( + session_msg_ceiling if session_msg_ceiling is not None else msg_count + ) if use_resume and transcript_msg_count > 0: - if transcript_msg_count < msg_count - 1: - gap = session.messages[transcript_msg_count:-1] + if transcript_msg_count < effective_count - 1: + gap = session.messages[transcript_msg_count : effective_count - 1] compressed, was_compressed = await _compress_messages(gap) gap_context = _format_conversation_context(compressed) if gap_context: @@ -2282,6 +2298,15 @@ async def stream_chat_completion_sdk( if last_user: current_message = last_user[-1].content or "" + # Capture the message count *before* draining so _build_query_message + # can compute the gap slice without including the newly-drained pending + # messages. Pending messages are both appended to session.messages AND + # concatenated into current_message; without the ceiling the gap slice + # would extend into the pending messages and duplicate them in the + # model's input context (gap_context + current_message both containing + # them). + _pre_drain_msg_count = len(session.messages) + # Drain any messages the user queued via POST /messages/pending # while the previous turn was running (or since the session was # idle). Messages are drained ATOMICALLY — one LPOP with count @@ -2341,6 +2366,7 @@ async def stream_chat_completion_sdk( use_resume, transcript_msg_count, session_id, + session_msg_ceiling=_pre_drain_msg_count, ) # On the first turn inject user context into the message instead of the # system prompt — the system prompt is now static (same for all users) @@ -2478,6 +2504,7 @@ async def stream_chat_completion_sdk( state.use_resume, state.transcript_msg_count, session_id, + session_msg_ceiling=_pre_drain_msg_count, ) if attachments.hint: state.query_message = f"{state.query_message}\n\n{attachments.hint}"