fix(backend/copilot): address round-3 review — dedup, persist, guards

- Replace maybe_append_user_message with direct session.messages.append
  for the pending drain in both the baseline mid-loop and the SDK
  drain-at-start: pending messages are atomically popped from Redis and
  are never stale-cache duplicates, so the dedup wrongly drops them and
  lets openai_messages/transcript diverge from the DB record (see the
  dedup sketch after this list)
- Add immediate upsert_chat_session after SDK drain-at-start so a
  crash between drain and finally doesn't lose messages already removed
  from Redis
- Capture _pre_drain_msg_count before the baseline drain-at-start:
  use it for is_first_turn (prevents pending messages from flipping the
  flag to False on an actual first turn) and for _load_prior_transcript
  (prevents the stale-transcript check from firing on every turn that
  drains pending messages, which would block transcript upload forever)
- Remove redundant if user_id: guards in queue_pending_message — user_id
  is guaranteed non-empty by Security(auth.get_user_id); the guards made
  the rate-limit check silently optional
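
For context, the dedup being removed behaves roughly like the sketch
below. This is a hypothetical reconstruction based only on the
"dedupes trailing same-role repeats" comment in the SDK path, with
stand-in ChatMessage/ChatSession models; it is not the real helper.
It skips the append when the new content repeats the trailing
same-role message, which is exactly what two legitimately identical
drained pending messages look like, so one of them would be silently
dropped from the DB record while openai_messages and the transcript
still receive it:

    from dataclasses import dataclass, field

    @dataclass
    class ChatMessage:  # stand-in for the repo's ChatMessage model
        role: str
        content: str

    @dataclass
    class ChatSession:  # stand-in for the repo's session model
        messages: list[ChatMessage] = field(default_factory=list)

    def maybe_append_user_message(
        session: ChatSession, content: str, is_user_message: bool = True
    ) -> None:
        role = "user" if is_user_message else "assistant"
        last = session.messages[-1] if session.messages else None
        if last and last.role == role and last.content == content:
            # Treated as a stale-cache duplicate and dropped. For a message
            # just popped from Redis, this is a real message being lost.
            return
        session.messages.append(ChatMessage(role=role, content=content))
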
majdyz
2026-04-10 23:29:44 +07:00
parent ded048bdfb
commit a4dbcf4247
3 changed files with 58 additions and 30 deletions


@@ -1085,30 +1085,29 @@ async def queue_pending_message(
     # this, a client could bypass per-turn token limits by batching
     # their extra context through this endpoint while a cheap stream
     # is in flight.
-    if user_id:
-        try:
-            daily_limit, weekly_limit, _tier = await get_global_rate_limits(
-                user_id, config.daily_token_limit, config.weekly_token_limit
-            )
-            await check_rate_limit(
-                user_id=user_id,
-                daily_token_limit=daily_limit,
-                weekly_token_limit=weekly_limit,
-            )
-        except RateLimitExceeded as e:
-            raise HTTPException(status_code=429, detail=str(e)) from e
-    if user_id:
-        track_user_message(
-            user_id=user_id,
-            session_id=session_id,
-            message_length=len(request.message),
-        )
+    # user_id is guaranteed non-empty by Security(auth.get_user_id) — no guard needed.
+    try:
+        daily_limit, weekly_limit, _tier = await get_global_rate_limits(
+            user_id, config.daily_token_limit, config.weekly_token_limit
+        )
+        await check_rate_limit(
+            user_id=user_id,
+            daily_token_limit=daily_limit,
+            weekly_token_limit=weekly_limit,
+        )
+    except RateLimitExceeded as e:
+        raise HTTPException(status_code=429, detail=str(e)) from e
+    track_user_message(
+        user_id=user_id,
+        session_id=session_id,
+        message_length=len(request.message),
+    )
     # Sanitise file IDs to the user's own workspace (same logic as
     # stream_chat_post) so injection doesn't surface other users' files.
     sanitized_file_ids: list[str] = []
-    if request.file_ids and user_id:
+    if request.file_ids:
         valid_ids = [fid for fid in request.file_ids if _UUID_RE.match(fid)]
         if valid_ids:
             workspace = await get_or_create_workspace(user_id)

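Why dropping the if user_id: guards is safe here: FastAPI resolves a
Security(...) dependency before the handler body runs, and a dependency
that raises on failure can never hand the handler an empty value. A
minimal sketch of that pattern follows; the route path, bearer scheme,
and verify_token helper are stand-ins for illustration, not the repo's
actual auth module:

    from fastapi import FastAPI, HTTPException, Security
    from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

    app = FastAPI()
    bearer = HTTPBearer()

    def verify_token(token: str) -> str | None:
        # Stand-in for real token validation; returns None when invalid.
        return "user-123" if token else None

    async def get_user_id(
        creds: HTTPAuthorizationCredentials = Security(bearer),
    ) -> str:
        user_id = verify_token(creds.credentials)
        if not user_id:
            # Reject before any handler runs; callers never see an empty id.
            raise HTTPException(status_code=401, detail="Invalid token")
        return user_id

    @app.post("/sessions/{session_id}/messages/pending")
    async def queue_pending_message(
        session_id: str,
        user_id: str = Security(get_user_id),
    ) -> dict:
        # user_id is always a non-empty string here, so wrapping the
        # rate-limit check in `if user_id:` only makes it skippable.
        return {"queued_for": user_id, "session": session_id}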

@@ -934,6 +934,10 @@ async def stream_chat_completion_baseline(
         message_length=len(message or ""),
     )
+    # Capture count *before* the pending drain so is_first_turn and the
+    # transcript staleness check are not skewed by queued messages.
+    _pre_drain_msg_count = len(session.messages)
     # Drain any messages the user queued via POST /messages/pending
     # while this session was idle (or during a previous turn whose
     # mid-loop drains missed them). Atomic LPOP guarantees that a
@@ -948,7 +952,10 @@ async def stream_chat_completion_baseline(
         )
         for _pm in drained_at_start:
             _content = format_pending_as_user_message(_pm)["content"]
-            maybe_append_user_message(session, _content, is_user_message=True)
+            # Append directly — pending messages are atomically popped from
+            # Redis and are never stale-cache duplicates, so the
+            # maybe_append_user_message dedup is wrong here.
+            session.messages.append(ChatMessage(role="user", content=_content))
         session = await upsert_chat_session(session)
@@ -979,7 +986,9 @@ async def stream_chat_completion_baseline(
     # Build system prompt only on the first turn to avoid mid-conversation
     # changes from concurrent chats updating business understanding.
-    is_first_turn = len(session.messages) <= 1
+    # Use the pre-drain count so queued pending messages don't incorrectly
+    # flip is_first_turn to False on an actual first turn.
+    is_first_turn = _pre_drain_msg_count <= 1
     # Gate context fetch on both first turn AND user message so that assistant-
     # role calls (e.g. tool-result submissions) on the first turn don't trigger
     # a needless DB lookup for user understanding.
@@ -997,7 +1006,9 @@ async def stream_chat_completion_baseline(
             _load_prior_transcript(
                 user_id=user_id,
                 session_id=session_id,
-                session_msg_count=len(session.messages),
+                # Use pre-drain count so pending messages don't falsely
+                # mark the stored transcript as stale and prevent upload.
+                session_msg_count=_pre_drain_msg_count,
                 transcript_builder=transcript_builder,
             ),
             prompt_task,
@@ -1266,8 +1277,12 @@ async def stream_chat_completion_baseline(
                 # a faithful copy of what the model actually saw.
                 formatted = format_pending_as_user_message(pm)
                 content_for_db = formatted["content"]
-                maybe_append_user_message(
-                    session, content_for_db, is_user_message=True
+                # Append directly — pending messages are atomically popped
+                # from Redis and are never stale-cache duplicates, so the
+                # maybe_append_user_message dedup is wrong here and would
+                # cause openai_messages/transcript to diverge from session.
+                session.messages.append(
+                    ChatMessage(role="user", content=content_for_db)
                 )
                 openai_messages.append(formatted)
                 transcript_builder.append_user(content=content_for_db)

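The "Atomic LPOP" the drain comments lean on is what makes the
no-dedup append above safe: each queued item is handed to exactly one
reader, so a pending message can never be drained twice even when two
streams race. A minimal sketch of such a drain, assuming a Redis list
per session (the key layout and payload shape are guesses, not the
repo's actual implementation):

    import json

    from redis.asyncio import Redis

    redis = Redis()

    async def drain_pending_messages(session_id: str) -> list[dict]:
        key = f"copilot:pending:{session_id}"  # assumed key layout
        # LPOP with a count pops up to N items in a single server-side
        # command, so concurrent drains never receive the same item.
        raw = await redis.lpop(key, count=1000)
        return [json.loads(item) for item in (raw or [])]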

@@ -2291,11 +2291,12 @@ async def stream_chat_completion_sdk(
     #
     # The drained content is concatenated into ``current_message``
     # so the SDK CLI sees it in the new user message, AND appended
-    # to ``session.messages`` (via ``maybe_append_user_message``,
-    # which dedupes trailing same-role repeats) so the durable
-    # transcript records it too. The endpoint deliberately does
-    # NOT persist to session.messages — Redis is the single source
-    # of truth until this drain runs.
+    # directly to ``session.messages`` (no dedup — pending messages are
+    # atomically popped from Redis and are never stale-cache duplicates)
+    # so the durable transcript records it too. The session is persisted
+    # immediately after the drain so a crash doesn't lose the messages.
+    # The endpoint deliberately does NOT persist to session.messages —
+    # Redis is the single source of truth until this drain runs.
     pending_at_start = await drain_pending_messages(session_id)
     if pending_at_start:
         logger.info(
@@ -2307,11 +2308,24 @@ async def stream_chat_completion_sdk(
             format_pending_as_user_message(pm)["content"] for pm in pending_at_start
         ]
         for _pt in pending_texts:
-            maybe_append_user_message(session, _pt, is_user_message=True)
+            # Append directly — pending messages are atomically popped from
+            # Redis and are never stale-cache duplicates, so the
+            # maybe_append_user_message dedup is wrong here.
+            session.messages.append(ChatMessage(role="user", content=_pt))
         if current_message.strip():
             current_message = current_message + "\n\n" + "\n\n".join(pending_texts)
         else:
             current_message = "\n\n".join(pending_texts)
+        # Persist immediately so a crash between here and the finally block
+        # doesn't lose messages that were already drained from Redis.
+        try:
+            session = await upsert_chat_session(session)
+        except Exception as _persist_err:
+            logger.warning(
+                "%s Failed to persist drained pending messages: %s",
+                log_prefix,
+                _persist_err,
+            )
     if not current_message.strip():
         yield StreamError(
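
Every drain above funnels content through format_pending_as_user_message
before it reaches session.messages, openai_messages, or current_message.
Its body is not part of this diff; the only contract the drained code
relies on is sketched below, with the pending payload shape assumed to be
a plain dict (the real model may carry more fields):

    def format_pending_as_user_message(pending: dict) -> dict:
        # Assumed payload shape: {"message": str, "file_ids": list[str]}.
        content = pending.get("message", "")
        file_ids = pending.get("file_ids") or []
        if file_ids:
            content += "\n\nAttached file IDs: " + ", ".join(file_ids)
        # OpenAI-style user message; ["content"] is what the drains persist.
        return {"role": "user", "content": content}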