fix(backend/copilot): flush buffered rounds before mid-loop pending drain and wrap turn-start persist

Address three review comments on the pending-message PR:

1. (Blocker) Mid-loop pending drain now flushes state.session_messages
   into session.messages before appending the pending user message, so
   assistant+tool entries from completed rounds land in chronological
   order. Without this, the next turn's replay could hit OpenAI tool-call
   ordering errors (user message interposed between assistant tool_call
   and its tool result).

2. (Should-Fix) Turn-start upsert_chat_session wrapped in try/except so
   a transient DB failure doesn't silently lose messages already popped
   from Redis.  Matches the pattern used in mid-loop and SDK drain paths.

3. (Nice-to-Have) Added TestMidLoopPendingFlushOrdering regression test
   in service_unit_test.py that replays the production flush sequence
   and asserts chronological ordering of assistant/tool/pending entries.
This commit is contained in:
majdyz
2026-04-11 14:55:46 +00:00
parent 6b390d6677
commit d49ffac0a1
2 changed files with 149 additions and 1 deletions

View File

@@ -957,7 +957,19 @@ async def stream_chat_completion_baseline(
# maybe_append_user_message dedup is wrong here.
session.messages.append(ChatMessage(role="user", content=content))
session = await upsert_chat_session(session)
# Persist the drained pending messages (if any) plus the current user
# message. Wrap in try/except so a transient DB failure here does not
# silently discard messages that were already popped from Redis — the
# turn can still proceed using the in-memory session.messages, and a
# later resume/replay will backfill from the DB on the next turn.
try:
session = await upsert_chat_session(session)
except Exception as _persist_err:
logger.warning(
"[Baseline] Failed to persist session at turn start "
"(pending drain may not be durable): %s",
_persist_err,
)
# Select model based on the per-request mode. 'fast' downgrades to
# the cheaper/faster model; everything else keeps the default.
@@ -1274,6 +1286,21 @@ async def stream_chat_completion_baseline(
continue
pending = await drain_pending_messages(session_id)
if pending:
# Flush any buffered assistant/tool messages from completed
# rounds into session.messages BEFORE appending the pending
# user message. ``_baseline_conversation_updater`` only
# records assistant+tool rounds into ``state.session_messages``
# — they are normally batch-flushed in the finally block.
# Without this in-order flush, the mid-loop pending user
# message lands before the preceding round's assistant/tool
# entries, producing chronologically-wrong session.messages
# on persist (user interposed between an assistant tool_call
# and its tool-result), which breaks OpenAI tool-call ordering
# invariants on the next turn's replay.
for _buffered in state.session_messages:
session.messages.append(_buffered)
state.session_messages.clear()
for pm in pending:
# ``format_pending_as_user_message`` embeds file
# attachments and context URL/page content into the

View File

@@ -828,3 +828,124 @@ class TestBaselineCostExtraction:
# response was never assigned so cost extraction must not raise
assert state.cost_usd is None
class TestMidLoopPendingFlushOrdering:
    """Regression test for the mid-loop pending drain ordering invariant.

    ``_baseline_conversation_updater`` records assistant+tool entries from
    each tool-call round into ``state.session_messages``; the finally block
    of ``stream_chat_completion_baseline`` batch-flushes them into
    ``session.messages`` at the end of the turn.

    The mid-loop pending drain appends pending user messages directly to
    ``session.messages``. Without flushing ``state.session_messages`` first,
    the pending user message lands BEFORE the preceding round's assistant+
    tool entries in the final persisted ``session.messages`` — which
    produces a malformed tool-call/tool-result ordering on the next turn's
    replay.

    This test documents the invariant by replaying the production flush
    sequence against an in-memory state.
    """

    def test_flush_then_append_preserves_chronological_order(self):
        """Mid-loop drain must flush state.session_messages before appending
        the pending user message, so the final order matches the
        chronological execution order.
        """
        # Initial state: user turn already appended by maybe_append_user_message
        session_messages: list[ChatMessage] = [
            ChatMessage(role="user", content="original user turn"),
        ]
        state = _BaselineStreamState()

        # Round 1 completes: conversation_updater buffers assistant+tool
        # entries into state.session_messages (but does NOT write to
        # session.messages yet).
        builder = TranscriptBuilder()
        builder.append_user("original user turn")
        response = LLMLoopResponse(
            response_text="calling search",
            tool_calls=[LLMToolCall(id="tc_1", name="search", arguments="{}")],
            raw_response=None,
            prompt_tokens=0,
            completion_tokens=0,
        )
        tool_results = [
            ToolCallResult(
                tool_call_id="tc_1", tool_name="search", content="search output"
            ),
        ]
        openai_messages: list = []
        _baseline_conversation_updater(
            openai_messages,
            response,
            tool_results=tool_results,
            transcript_builder=builder,
            state=state,
            model="test-model",
        )
        # state.session_messages should now hold the round-1 assistant + tool
        assert len(state.session_messages) == 2
        assert state.session_messages[0].role == "assistant"
        assert state.session_messages[1].role == "tool"

        # --- Mid-loop pending drain (production code pattern) ---
        # Flush first, THEN append pending. This is the ordering fix.
        for _buffered in state.session_messages:
            session_messages.append(_buffered)
        state.session_messages.clear()
        session_messages.append(
            ChatMessage(role="user", content="pending mid-loop message")
        )

        # Round 2 completes: new assistant+tool entries buffer again.
        response2 = LLMLoopResponse(
            response_text="another call",
            tool_calls=[LLMToolCall(id="tc_2", name="calc", arguments="{}")],
            raw_response=None,
            prompt_tokens=0,
            completion_tokens=0,
        )
        tool_results2 = [
            ToolCallResult(
                tool_call_id="tc_2", tool_name="calc", content="calc output"
            ),
        ]
        _baseline_conversation_updater(
            openai_messages,
            response2,
            tool_results=tool_results2,
            transcript_builder=builder,
            state=state,
            model="test-model",
        )

        # --- Finally-block flush (end of turn) ---
        # Mirrors the batch flush the production finally block performs.
        for msg in state.session_messages:
            session_messages.append(msg)

        # Assert chronological order: original user, round-1 assistant,
        # round-1 tool, pending user, round-2 assistant, round-2 tool.
        assert [m.role for m in session_messages] == [
            "user",
            "assistant",
            "tool",
            "user",
            "assistant",
            "tool",
        ]
        assert session_messages[0].content == "original user turn"
        assert session_messages[3].content == "pending mid-loop message"
        # The assistant message carrying tool_call tc_1 must be immediately
        # followed by its tool result — no user message interposed.
        # NOTE(review): assumes ChatMessage.tool_calls entries are dicts with
        # an "id" key (as indexed below) — confirm against the model class.
        assert session_messages[1].role == "assistant"
        assert session_messages[1].tool_calls is not None
        assert session_messages[1].tool_calls[0]["id"] == "tc_1"
        assert session_messages[2].role == "tool"
        assert session_messages[2].tool_call_id == "tc_1"
        # Same invariant for the round after the pending user.
        assert session_messages[4].tool_calls is not None
        assert session_messages[4].tool_calls[0]["id"] == "tc_2"
        assert session_messages[5].tool_call_id == "tc_2"