fix(backend/copilot): prevent pending message duplication in stale-transcript gap

When use_resume=True and the transcript is stale, _build_query_message computes
a gap slice from session.messages[transcript_msg_count:-1].  Pending messages
drained at turn start are appended to session.messages AND concatenated into
current_message, so without the ceiling they appear in both gap_context and
current_message.

Capture _pre_drain_msg_count before drain_pending_messages() and pass it as
session_msg_ceiling to _build_query_message.  The gap slice is now bounded at
the pre-drain count, preventing pending messages from leaking into the gap.

Adds two regression tests in query_builder_test.py.
This commit is contained in:
majdyz
2026-04-11 08:25:14 +07:00
parent c58176365f
commit 1d05b06e43
2 changed files with 101 additions and 2 deletions

View File

@@ -226,6 +226,78 @@ async def test_build_query_no_resume_multi_message(monkeypatch):
assert was_compacted is False # mock returns False
@pytest.mark.asyncio
async def test_build_query_session_msg_ceiling_prevents_pending_duplication():
    """Pending messages drained mid-turn must not leak into the gap context.

    Transcript covers 2 messages; the session holds 2 historical messages,
    the current message, and 2 pending messages appended by the drain.
    Without the ceiling the gap slice would pick up the pending messages
    even though current_message already concatenates them — duplication.
    Passing session_msg_ceiling=3 (the pre-drain count) empties the gap
    slice so only current_message carries the pending content.
    """
    current = "current msg with pending1 pending2"
    # session.messages after the drain:
    # [hist1, hist2, current_msg, pending1, pending2]
    drained_state = [
        ("user", "hist1"),
        ("assistant", "hist2"),
        ("user", current),
        ("user", "pending1"),
        ("user", "pending2"),
    ]
    session = _make_session(
        [ChatMessage(role=role, content=text) for role, text in drained_state]
    )
    # Transcript already covers hist1+hist2; the pre-drain count was 3
    # (the two historical messages plus current_msg).
    result, was_compacted = await _build_query_message(
        current,
        session,
        use_resume=True,
        transcript_msg_count=2,
        session_id="test-session",
        session_msg_ceiling=3,  # len(session.messages) before drain
    )
    # transcript_msg_count == ceiling - 1 → empty gap → no history prepended.
    assert result == current
    assert was_compacted is False
    # Nothing before the current message may mention a pending message.
    assert "pending1" not in result.split("current msg")[0]
@pytest.mark.asyncio
async def test_build_query_session_msg_ceiling_preserves_real_gap():
    """A genuine stale-transcript gap still surfaces under session_msg_ceiling.

    Transcript covers 2 messages; the session holds 4 historical messages,
    the current message, and 2 drained pending messages. With the pre-drain
    ceiling of 5 the real gap is exactly messages 2-3 (hist3, hist4), and
    the pending messages stay excluded.
    """
    roles_and_texts = [
        ("user", "hist1"),
        ("assistant", "hist2"),
        ("user", "hist3"),
        ("assistant", "hist4"),
        ("user", "current"),
        ("user", "pending1"),
        ("user", "pending2"),
    ]
    session = _make_session(
        [ChatMessage(role=role, content=text) for role, text in roles_and_texts]
    )
    result, was_compacted = await _build_query_message(
        "current",
        session,
        use_resume=True,
        transcript_msg_count=2,
        session_id="test-session",
        session_msg_ceiling=5,  # pre-drain: [hist1..hist4, current]
    )
    # Gap slice = session.messages[2:4] = [hist3, hist4].
    assert "<conversation_history>" in result
    assert "hist3" in result
    assert "hist4" in result
    assert "Now, the user says:\ncurrent" in result
    # Drained pending messages must not appear in the gap context.
    assert "pending1" not in result
    assert "pending2" not in result
@pytest.mark.asyncio
async def test_build_query_no_resume_multi_message_compacted(monkeypatch):
"""When compression actually compacts, was_compacted should be True."""

View File

@@ -959,17 +959,33 @@ async def _build_query_message(
use_resume: bool,
transcript_msg_count: int,
session_id: str,
*,
session_msg_ceiling: int | None = None,
) -> tuple[str, bool]:
"""Build the query message with appropriate context.
Args:
session_msg_ceiling: If provided, treat ``session.messages`` as if it
only has this many entries when computing the gap slice. Pass
``len(session.messages)`` captured *before* appending any pending
messages so that mid-turn drains do not skew the gap calculation
and cause pending messages to be duplicated in both the gap context
and ``current_message``.
Returns:
Tuple of (query_message, was_compacted).
"""
msg_count = len(session.messages)
# Use the ceiling if supplied (prevents pending-message duplication when
# messages were appended to session.messages after the drain but before
# this function is called).
effective_count = (
session_msg_ceiling if session_msg_ceiling is not None else msg_count
)
if use_resume and transcript_msg_count > 0:
if transcript_msg_count < msg_count - 1:
gap = session.messages[transcript_msg_count:-1]
if transcript_msg_count < effective_count - 1:
gap = session.messages[transcript_msg_count : effective_count - 1]
compressed, was_compressed = await _compress_messages(gap)
gap_context = _format_conversation_context(compressed)
if gap_context:
@@ -2282,6 +2298,15 @@ async def stream_chat_completion_sdk(
if last_user:
current_message = last_user[-1].content or ""
# Capture the message count *before* draining so _build_query_message
# can compute the gap slice without including the newly-drained pending
# messages. Pending messages are both appended to session.messages AND
# concatenated into current_message; without the ceiling the gap slice
# would extend into the pending messages and duplicate them in the
# model's input context (gap_context + current_message both containing
# them).
_pre_drain_msg_count = len(session.messages)
# Drain any messages the user queued via POST /messages/pending
# while the previous turn was running (or since the session was
# idle). Messages are drained ATOMICALLY — one LPOP with count
@@ -2341,6 +2366,7 @@ async def stream_chat_completion_sdk(
use_resume,
transcript_msg_count,
session_id,
session_msg_ceiling=_pre_drain_msg_count,
)
# On the first turn inject user context into the message instead of the
# system prompt — the system prompt is now static (same for all users)
@@ -2478,6 +2504,7 @@ async def stream_chat_completion_sdk(
state.use_resume,
state.transcript_msg_count,
session_id,
session_msg_ceiling=_pre_drain_msg_count,
)
if attachments.hint:
state.query_message = f"{state.query_message}\n\n{attachments.hint}"