diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py
index 2e19ea8ca3..6d057b0270 100644
--- a/autogpt_platform/backend/backend/api/features/chat/routes.py
+++ b/autogpt_platform/backend/backend/api/features/chat/routes.py
@@ -1085,30 +1085,29 @@ async def queue_pending_message(
     # this, a client could bypass per-turn token limits by batching
     # their extra context through this endpoint while a cheap stream
     # is in flight.
-    if user_id:
-        try:
-            daily_limit, weekly_limit, _tier = await get_global_rate_limits(
-                user_id, config.daily_token_limit, config.weekly_token_limit
-            )
-            await check_rate_limit(
-                user_id=user_id,
-                daily_token_limit=daily_limit,
-                weekly_token_limit=weekly_limit,
-            )
-        except RateLimitExceeded as e:
-            raise HTTPException(status_code=429, detail=str(e)) from e
-
-    if user_id:
-        track_user_message(
-            user_id=user_id,
-            session_id=session_id,
-            message_length=len(request.message),
+    # user_id is guaranteed non-empty by Security(auth.get_user_id) — no guard needed.
+    try:
+        daily_limit, weekly_limit, _tier = await get_global_rate_limits(
+            user_id, config.daily_token_limit, config.weekly_token_limit
         )
+        await check_rate_limit(
+            user_id=user_id,
+            daily_token_limit=daily_limit,
+            weekly_token_limit=weekly_limit,
+        )
+    except RateLimitExceeded as e:
+        raise HTTPException(status_code=429, detail=str(e)) from e
+
+    track_user_message(
+        user_id=user_id,
+        session_id=session_id,
+        message_length=len(request.message),
+    )
 
     # Sanitise file IDs to the user's own workspace (same logic as
     # stream_chat_post) so injection doesn't surface other users' files.
     sanitized_file_ids: list[str] = []
-    if request.file_ids and user_id:
+    if request.file_ids:
         valid_ids = [fid for fid in request.file_ids if _UUID_RE.match(fid)]
         if valid_ids:
             workspace = await get_or_create_workspace(user_id)
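For reference, a minimal sketch of the enforce-before-queue pattern the route now applies unconditionally. The RateLimitExceeded and check_rate_limit names mirror the diff; the in-memory counter, route path, and hard-coded user_id are illustrative stand-ins, not the platform's actual implementation.

from fastapi import FastAPI, HTTPException

app = FastAPI()


class RateLimitExceeded(Exception):
    """Raised when a user's token budget is exhausted."""


# Illustrative in-memory usage table; the real helper reads shared storage.
_usage: dict[str, int] = {}


async def check_rate_limit(user_id: str, daily_token_limit: int) -> None:
    if _usage.get(user_id, 0) >= daily_token_limit:
        raise RateLimitExceeded(f"daily token limit of {daily_token_limit} reached")


@app.post("/sessions/{session_id}/messages/pending")  # hypothetical path
async def queue_pending_message(session_id: str, message: str) -> dict:
    user_id = "user-123"  # stand-in for Security(auth.get_user_id)
    # Enforce limits before the message is queued; surfacing 429 here is
    # what closes the batching loophole described in the diff comment.
    try:
        await check_rate_limit(user_id, daily_token_limit=100_000)
    except RateLimitExceeded as e:
        raise HTTPException(status_code=429, detail=str(e)) from e
    _usage[user_id] = _usage.get(user_id, 0) + len(message)
    return {"queued": True}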
diff --git a/autogpt_platform/backend/backend/copilot/baseline/service.py b/autogpt_platform/backend/backend/copilot/baseline/service.py
index f9b5a7d9ea..f46c31ff21 100644
--- a/autogpt_platform/backend/backend/copilot/baseline/service.py
+++ b/autogpt_platform/backend/backend/copilot/baseline/service.py
@@ -934,6 +934,10 @@ async def stream_chat_completion_baseline(
         message_length=len(message or ""),
     )
 
+    # Capture count *before* the pending drain so is_first_turn and the
+    # transcript staleness check are not skewed by queued messages.
+    _pre_drain_msg_count = len(session.messages)
+
     # Drain any messages the user queued via POST /messages/pending
     # while this session was idle (or during a previous turn whose
     # mid-loop drains missed them). Atomic LPOP guarantees that a
@@ -948,7 +952,10 @@
     )
     for _pm in drained_at_start:
         _content = format_pending_as_user_message(_pm)["content"]
-        maybe_append_user_message(session, _content, is_user_message=True)
+        # Append directly — pending messages are atomically popped from
+        # Redis and are never stale-cache duplicates, so the
+        # maybe_append_user_message dedup is wrong here.
+        session.messages.append(ChatMessage(role="user", content=_content))
     session = await upsert_chat_session(session)
 
@@ -979,7 +986,9 @@
 
     # Build system prompt only on the first turn to avoid mid-conversation
     # changes from concurrent chats updating business understanding.
-    is_first_turn = len(session.messages) <= 1
+    # Use the pre-drain count so queued pending messages don't incorrectly
+    # flip is_first_turn to False on an actual first turn.
+    is_first_turn = _pre_drain_msg_count <= 1
     # Gate context fetch on both first turn AND user message so that assistant-
     # role calls (e.g. tool-result submissions) on the first turn don't trigger
     # a needless DB lookup for user understanding.
@@ -997,7 +1006,9 @@
         _load_prior_transcript(
             user_id=user_id,
             session_id=session_id,
-            session_msg_count=len(session.messages),
+            # Use pre-drain count so pending messages don't falsely
+            # mark the stored transcript as stale and prevent upload.
+            session_msg_count=_pre_drain_msg_count,
             transcript_builder=transcript_builder,
         ),
         prompt_task,
@@ -1266,8 +1277,12 @@
                 # a faithful copy of what the model actually saw.
                 formatted = format_pending_as_user_message(pm)
                 content_for_db = formatted["content"]
-                maybe_append_user_message(
-                    session, content_for_db, is_user_message=True
+                # Append directly — pending messages are atomically popped
+                # from Redis and are never stale-cache duplicates, so the
+                # maybe_append_user_message dedup is wrong here and would
+                # cause openai_messages/transcript to diverge from session.
+                session.messages.append(
+                    ChatMessage(role="user", content=content_for_db)
                 )
                 openai_messages.append(formatted)
                 transcript_builder.append_user(content=content_for_db)
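A toy reproduction of the ordering hazard that _pre_drain_msg_count guards against; the dataclasses below are stand-ins for the real ChatMessage and session types.

from dataclasses import dataclass, field


@dataclass
class ChatMessage:
    role: str
    content: str


@dataclass
class Session:
    messages: list[ChatMessage] = field(default_factory=list)


def drain_then_check(session: Session, pending: list[str]) -> bool:
    pre_drain_count = len(session.messages)  # capture BEFORE the drain
    for text in pending:
        # Direct append: pending items were atomically popped from Redis,
        # so a trailing-duplicate dedup could silently drop a user message
        # that legitimately repeats the previous one.
        session.messages.append(ChatMessage(role="user", content=text))
    return pre_drain_count <= 1  # is_first_turn


session = Session()
assert drain_then_check(session, ["hi", "one more thing"]) is True
# Checking len(session.messages) <= 1 *after* the drain would have
# returned False and skipped the first-turn system prompt build.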
diff --git a/autogpt_platform/backend/backend/copilot/sdk/service.py b/autogpt_platform/backend/backend/copilot/sdk/service.py
index 0b3eba5ee0..a818b66d08 100644
--- a/autogpt_platform/backend/backend/copilot/sdk/service.py
+++ b/autogpt_platform/backend/backend/copilot/sdk/service.py
@@ -2291,11 +2291,12 @@ async def stream_chat_completion_sdk(
     #
     # The drained content is concatenated into ``current_message``
     # so the SDK CLI sees it in the new user message, AND appended
-    # to ``session.messages`` (via ``maybe_append_user_message``,
-    # which dedupes trailing same-role repeats) so the durable
-    # transcript records it too. The endpoint deliberately does
-    # NOT persist to session.messages — Redis is the single source
-    # of truth until this drain runs.
+    # directly to ``session.messages`` (no dedup — pending messages are
+    # atomically popped from Redis and are never stale-cache duplicates)
+    # so the durable transcript records it too. The session is persisted
+    # immediately after the drain so a crash doesn't lose the messages.
+    # The endpoint deliberately does NOT persist to session.messages —
+    # Redis is the single source of truth until this drain runs.
     pending_at_start = await drain_pending_messages(session_id)
     if pending_at_start:
         logger.info(
@@ -2307,11 +2308,24 @@
             format_pending_as_user_message(pm)["content"] for pm in pending_at_start
         ]
         for _pt in pending_texts:
-            maybe_append_user_message(session, _pt, is_user_message=True)
+            # Append directly — pending messages are atomically popped from
+            # Redis and are never stale-cache duplicates, so the
+            # maybe_append_user_message dedup is wrong here.
+            session.messages.append(ChatMessage(role="user", content=_pt))
         if current_message.strip():
             current_message = current_message + "\n\n" + "\n\n".join(pending_texts)
         else:
             current_message = "\n\n".join(pending_texts)
+        # Persist immediately so a crash between here and the finally block
+        # doesn't lose messages that were already drained from Redis.
+        try:
+            session = await upsert_chat_session(session)
+        except Exception as _persist_err:
+            logger.warning(
+                "%s Failed to persist drained pending messages: %s",
+                log_prefix,
+                _persist_err,
+            )
 
     if not current_message.strip():
         yield StreamError(
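A sketch of the drain-then-persist sequence, under the assumption that pending messages live in a per-session Redis list; the real drain_pending_messages and upsert_chat_session helpers may differ.

import json
import logging
from dataclasses import dataclass, field

from redis.asyncio import Redis

logger = logging.getLogger(__name__)


@dataclass
class Session:  # stand-in for the real chat session model
    messages: list[dict] = field(default_factory=list)


async def upsert_chat_session(session: Session) -> Session:
    return session  # stand-in for the platform's session store write


async def drain_pending_messages(redis: Redis, session_id: str) -> list[dict]:
    # LPOP with a count pops up to N items in one atomic command
    # (Redis >= 6.2), so two concurrent streams can never both
    # receive the same queued message.
    raw = await redis.lpop(f"chat:pending:{session_id}", 100) or []
    return [json.loads(item) for item in raw]


async def drain_and_persist(redis: Redis, session: Session, session_id: str) -> Session:
    for pm in await drain_pending_messages(redis, session_id):
        session.messages.append({"role": "user", "content": pm["content"]})
    try:
        # After LPOP the Redis copy is gone, so persist right away: a crash
        # before the turn's normal end-of-stream save would otherwise lose
        # the drained messages for good.
        return await upsert_chat_session(session)
    except Exception as err:
        # Mirror the diff: log and keep streaming rather than failing the
        # whole turn over a persistence hiccup.
        logger.warning("failed to persist drained pending messages: %s", err)
        return session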