From a78145505b50f158a34ce4f101b5399f1115727e Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Thu, 12 Feb 2026 05:52:17 +0400
Subject: [PATCH] fix(copilot): merge split assistant messages to prevent
 Anthropic API errors (#12062)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Summary

- When the copilot model responds with both text content AND a long-running tool call (e.g., `create_agent`), the streaming code created two separate consecutive assistant messages — one with text, one with `tool_calls`. This caused Anthropic's API to reject the request with `"unexpected tool_use_id found in tool_result blocks"`, because the `tool_result` couldn't find a matching `tool_use` in the immediately preceding assistant message.
- Added a defensive merge of consecutive assistant messages in `to_openai_messages()` (this also repairs existing corrupt sessions)
- Fixed `_yield_tool_call` to add tool_calls to the existing current-turn assistant message instead of creating a new one
- Changed the `accumulated_tool_calls` assignment to use `extend`, to prevent overwriting tool_calls added by the long-running tool flow

## Test plan

- [x] All 23 chat feature tests pass (`backend/api/features/chat/`)
- [x] All 44 prompt utility tests pass (`backend/util/prompt_test.py`)
- [x] All pre-commit hooks pass (ruff, isort, black, pyright)
- [ ] Manual test: create an agent via copilot, then ask a follow-up question — should no longer return a 400 error

Greptile Overview

Greptile Summary

Fixes a critical bug where long-running tool calls (like `create_agent`) caused Anthropic API 400 errors due to split assistant messages. The fix ensures tool calls are added to the existing assistant message instead of creating new ones, and adds a defensive merge function to repair any existing corrupt sessions.

**Key changes:**

- Added `_merge_consecutive_assistant_messages()` to defensively merge split assistant messages in `to_openai_messages()`
- Modified `_yield_tool_call()` to append tool calls to the current-turn assistant message instead of creating a new one
- Changed `accumulated_tool_calls` from assignment to `extend` to preserve tool calls already added by the long-running tool flow

**Impact:** Resolves the issue where users received 400 errors after creating agents via copilot and asking follow-up questions.
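To make the failure mode concrete, here is a minimal, self-contained sketch of the merge logic; it uses plain dicts rather than the OpenAI typed params, and `merge_consecutive_assistant_messages` is a simplified stand-in for the helper added in `model.py`, not the production code itself:

```python
# Simplified sketch: assumes plain dict messages, not the typed params in model.py.
def merge_consecutive_assistant_messages(messages: list[dict]) -> list[dict]:
    if len(messages) < 2:
        return messages
    result = [messages[0]]
    for msg in messages[1:]:
        prev = result[-1]
        if prev.get("role") != "assistant" or msg.get("role") != "assistant":
            result.append(msg)
            continue
        # Fold the second assistant into the first: concatenate text, combine tool_calls.
        if msg.get("content"):
            prev["content"] = (
                f"{prev['content']}\n{msg['content']}"
                if prev.get("content")
                else msg["content"]
            )
        if msg.get("tool_calls"):
            prev["tool_calls"] = list(prev.get("tool_calls") or []) + list(msg["tool_calls"])
    return result


# The bug scenario: a text-only assistant followed by a tool_calls-only assistant.
split = [
    {"role": "user", "content": "build agent"},
    {"role": "assistant", "content": "Let me build that"},
    {
        "role": "assistant",
        "content": "",
        "tool_calls": [
            {"id": "tc1", "type": "function",
             "function": {"name": "create_agent", "arguments": "{}"}}
        ],
    },
    {"role": "tool", "content": "ok", "tool_call_id": "tc1"},
]
merged = merge_consecutive_assistant_messages(split)
assert [m["role"] for m in merged] == ["user", "assistant", "tool"]
# The tool_result's tool_use_id now matches the immediately preceding assistant.
assert merged[1]["tool_calls"][0]["id"] == "tc1"
```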

Confidence Score: 4/5

- Safe to merge with minor verification recommended
- The changes are well-targeted and solve a real API compatibility issue. The logic is sound: searching backwards for the current assistant message is correct, and using `extend` instead of assignment prevents overwriting. The defensive merge in `to_openai_messages()` also fixes existing corrupt sessions. All existing tests pass according to the PR description.
- No files require special attention - changes are localized and defensive
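A quick illustration of the `extend`-vs-assign point (a hypothetical minimal model; `assistant_response` and `accumulated_tool_calls` mirror the variable names in `service.py` but the class is a stand-in):

```python
# Hypothetical stand-in for the ChatMessage object used in service.py.
class AssistantMessage:
    def __init__(self) -> None:
        self.tool_calls: list[dict] | None = None

assistant_response = AssistantMessage()
# _yield_tool_call already attached a long-running tool call this turn:
assistant_response.tool_calls = [{"id": "tc1"}]

accumulated_tool_calls = [{"id": "tc2"}]  # regular tool calls from the stream

# Old behavior: plain assignment would silently drop tc1.
# New behavior: extend preserves tc1 alongside tc2.
if not assistant_response.tool_calls:
    assistant_response.tool_calls = []
assistant_response.tool_calls.extend(accumulated_tool_calls)
assert [tc["id"] for tc in assistant_response.tool_calls] == ["tc1", "tc2"]
```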

Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant StreamAPI as stream_chat_completion
    participant Chunks as _stream_chat_chunks
    participant ToolCall as _yield_tool_call
    participant Session as ChatSession

    User->>StreamAPI: Send message
    StreamAPI->>Chunks: Stream chat chunks
    alt Text + Long-running tool call
        Chunks->>StreamAPI: Text delta (content)
        StreamAPI->>Session: Append assistant message with content
        Chunks->>ToolCall: Tool call detected
        Note over ToolCall: OLD: Created new assistant message<br/>NEW: Appends to existing assistant
        ToolCall->>Session: Search backwards for current assistant
        ToolCall->>Session: Append tool_call to existing message
        ToolCall->>Session: Add pending tool result
    end
    StreamAPI->>StreamAPI: Merge accumulated_tool_calls
    Note over StreamAPI: Use extend (not assign)<br/>to preserve existing tool_calls
    StreamAPI->>Session: to_openai_messages()
    Session->>Session: _merge_consecutive_assistant_messages()
    Note over Session: Defensive: Merges any split<br/>assistant messages
    Session-->>StreamAPI: Merged messages
    StreamAPI->>User: Return response
```
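The "search backwards" step in the diagram stops at the first user message, so an earlier turn's assistant is never mutated. A standalone sketch of that boundary rule, under the assumption of plain dict messages rather than the `ChatMessage` model:

```python
# Simplified version of add_tool_call_to_current_turn: walk backwards until a
# user message (the turn boundary); attach to the first assistant found,
# otherwise create a fresh assistant message.
def add_tool_call_to_current_turn(messages: list[dict], tool_call: dict) -> None:
    for msg in reversed(messages):
        if msg["role"] == "user":
            break  # previous turn: never touch earlier assistants
        if msg["role"] == "assistant":
            msg.setdefault("tool_calls", []).append(tool_call)
            return
    messages.append({"role": "assistant", "content": "", "tool_calls": [tool_call]})


history = [
    {"role": "assistant", "content": "old turn"},
    {"role": "user", "content": "new message"},
]
add_tool_call_to_current_turn(history, {"id": "tc1"})
assert "tool_calls" not in history[0]                # old assistant untouched
assert history[-1]["tool_calls"] == [{"id": "tc1"}]  # new assistant appended
```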
---
 .../backend/api/features/chat/model.py       |  65 +++++-
 .../backend/api/features/chat/model_test.py  | 214 ++++++++++++++++++
 .../backend/api/features/chat/service.py     |  18 +-
 3 files changed, 286 insertions(+), 11 deletions(-)

diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/api/features/chat/model.py
index 7318ef88d7..35418f174f 100644
--- a/autogpt_platform/backend/backend/api/features/chat/model.py
+++ b/autogpt_platform/backend/backend/api/features/chat/model.py
@@ -2,7 +2,7 @@ import asyncio
 import logging
 import uuid
 from datetime import UTC, datetime
-from typing import Any
+from typing import Any, cast
 from weakref import WeakValueDictionary
 
 from openai.types.chat import (
@@ -104,6 +104,26 @@ class ChatSession(BaseModel):
     successful_agent_runs: dict[str, int] = {}
     successful_agent_schedules: dict[str, int] = {}
 
+    def add_tool_call_to_current_turn(self, tool_call: dict) -> None:
+        """Attach a tool_call to the current turn's assistant message.
+
+        Searches backwards for the most recent assistant message (stopping at
+        any user message boundary). If found, appends the tool_call to it.
+        Otherwise creates a new assistant message with the tool_call.
+        """
+        for msg in reversed(self.messages):
+            if msg.role == "user":
+                break
+            if msg.role == "assistant":
+                if not msg.tool_calls:
+                    msg.tool_calls = []
+                msg.tool_calls.append(tool_call)
+                return
+
+        self.messages.append(
+            ChatMessage(role="assistant", content="", tool_calls=[tool_call])
+        )
+
     @staticmethod
     def new(user_id: str) -> "ChatSession":
         return ChatSession(
@@ -172,6 +192,47 @@ class ChatSession(BaseModel):
             successful_agent_schedules=successful_agent_schedules,
         )
 
+    @staticmethod
+    def _merge_consecutive_assistant_messages(
+        messages: list[ChatCompletionMessageParam],
+    ) -> list[ChatCompletionMessageParam]:
+        """Merge consecutive assistant messages into single messages.
+
+        Long-running tool flows can create split assistant messages: one with
+        text content and another with tool_calls. Anthropic's API requires
+        tool_result blocks to reference a tool_use in the immediately preceding
+        assistant message, so these splits cause 400 errors via OpenRouter.
+        """
+        if len(messages) < 2:
+            return messages
+
+        result: list[ChatCompletionMessageParam] = [messages[0]]
+        for msg in messages[1:]:
+            prev = result[-1]
+            if prev.get("role") != "assistant" or msg.get("role") != "assistant":
+                result.append(msg)
+                continue
+
+            prev = cast(ChatCompletionAssistantMessageParam, prev)
+            curr = cast(ChatCompletionAssistantMessageParam, msg)
+
+            curr_content = curr.get("content") or ""
+            if curr_content:
+                prev_content = prev.get("content") or ""
+                prev["content"] = (
+                    f"{prev_content}\n{curr_content}" if prev_content else curr_content
+                )
+
+            curr_tool_calls = curr.get("tool_calls")
+            if curr_tool_calls:
+                prev_tool_calls = prev.get("tool_calls")
+                prev["tool_calls"] = (
+                    list(prev_tool_calls) + list(curr_tool_calls)
+                    if prev_tool_calls
+                    else list(curr_tool_calls)
+                )
+        return result
+
     def to_openai_messages(self) -> list[ChatCompletionMessageParam]:
         messages = []
         for message in self.messages:
@@ -258,7 +319,7 @@ class ChatSession(BaseModel):
                     name=message.name or "",
                 )
             )
-        return messages
+        return self._merge_consecutive_assistant_messages(messages)
 
 
 async def _get_session_from_cache(session_id: str) -> ChatSession | None:
diff --git a/autogpt_platform/backend/backend/api/features/chat/model_test.py b/autogpt_platform/backend/backend/api/features/chat/model_test.py
index c230b00f9c..239137844d 100644
--- a/autogpt_platform/backend/backend/api/features/chat/model_test.py
+++ b/autogpt_platform/backend/backend/api/features/chat/model_test.py
@@ -1,4 +1,16 @@
+from typing import cast
+
 import pytest
+from openai.types.chat import (
+    ChatCompletionAssistantMessageParam,
+    ChatCompletionMessageParam,
+    ChatCompletionToolMessageParam,
+    ChatCompletionUserMessageParam,
+)
+from openai.types.chat.chat_completion_message_tool_call_param import (
+    ChatCompletionMessageToolCallParam,
+    Function,
+)
 
 from .model import (
     ChatMessage,
@@ -117,3 +129,205 @@
             loaded.tool_calls is not None
         ), f"Tool calls missing for {orig.role} message"
         assert len(orig.tool_calls) == len(loaded.tool_calls)
+
+
+# --------------------------------------------------------------------------- #
+# _merge_consecutive_assistant_messages                                        #
+# --------------------------------------------------------------------------- #
+
+_tc = ChatCompletionMessageToolCallParam(
+    id="tc1", type="function", function=Function(name="do_stuff", arguments="{}")
+)
+_tc2 = ChatCompletionMessageToolCallParam(
+    id="tc2", type="function", function=Function(name="other", arguments="{}")
+)
+
+
+def test_merge_noop_when_no_consecutive_assistants():
+    """Messages without consecutive assistants are returned unchanged."""
+    msgs = [
+        ChatCompletionUserMessageParam(role="user", content="hi"),
+        ChatCompletionAssistantMessageParam(role="assistant", content="hello"),
+        ChatCompletionUserMessageParam(role="user", content="bye"),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)
+    assert len(merged) == 3
+    assert [m["role"] for m in merged] == ["user", "assistant", "user"]
+
+
+def test_merge_splits_text_and_tool_calls():
+    """The exact bug scenario: text-only assistant followed by tool_calls-only assistant."""
+    msgs = [
+        ChatCompletionUserMessageParam(role="user", content="build agent"),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="Let me build that"
+        ),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="", tool_calls=[_tc]
+        ),
+        ChatCompletionToolMessageParam(role="tool", content="ok", tool_call_id="tc1"),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)
+
+    assert len(merged) == 3
+    assert merged[0]["role"] == "user"
+    assert merged[2]["role"] == "tool"
+    a = cast(ChatCompletionAssistantMessageParam, merged[1])
+    assert a["role"] == "assistant"
+    assert a.get("content") == "Let me build that"
+    assert a.get("tool_calls") == [_tc]
+
+
+def test_merge_combines_tool_calls_from_both():
+    """Both consecutive assistants have tool_calls — they get merged."""
+    msgs: list[ChatCompletionAssistantMessageParam] = [
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="text", tool_calls=[_tc]
+        ),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="", tool_calls=[_tc2]
+        ),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)  # type: ignore[arg-type]
+
+    assert len(merged) == 1
+    a = cast(ChatCompletionAssistantMessageParam, merged[0])
+    assert a.get("tool_calls") == [_tc, _tc2]
+    assert a.get("content") == "text"
+
+
+def test_merge_three_consecutive_assistants():
+    """Three consecutive assistants collapse into one."""
+    msgs: list[ChatCompletionAssistantMessageParam] = [
+        ChatCompletionAssistantMessageParam(role="assistant", content="a"),
+        ChatCompletionAssistantMessageParam(role="assistant", content="b"),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="", tool_calls=[_tc]
+        ),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)  # type: ignore[arg-type]
+
+    assert len(merged) == 1
+    a = cast(ChatCompletionAssistantMessageParam, merged[0])
+    assert a.get("content") == "a\nb"
+    assert a.get("tool_calls") == [_tc]
+
+
+def test_merge_empty_and_single_message():
+    """Edge cases: empty list and single message."""
+    assert ChatSession._merge_consecutive_assistant_messages([]) == []
+
+    single: list[ChatCompletionMessageParam] = [
+        ChatCompletionUserMessageParam(role="user", content="hi")
+    ]
+    assert ChatSession._merge_consecutive_assistant_messages(single) == single
+
+
+# --------------------------------------------------------------------------- #
+# add_tool_call_to_current_turn                                                #
+# --------------------------------------------------------------------------- #
+
+_raw_tc = {
+    "id": "tc1",
+    "type": "function",
+    "function": {"name": "f", "arguments": "{}"},
+}
+_raw_tc2 = {
+    "id": "tc2",
+    "type": "function",
+    "function": {"name": "g", "arguments": "{}"},
+}
+
+
+def test_add_tool_call_appends_to_existing_assistant():
+    """When the last assistant is from the current turn, tool_call is added to it."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="hi"),
+        ChatMessage(role="assistant", content="working on it"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+
+    assert len(session.messages) == 2  # no new message created
+    assert session.messages[1].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_creates_assistant_when_none_exists():
+    """When there's no current-turn assistant, a new one is created."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="hi"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+
+    assert len(session.messages) == 2
+    assert session.messages[1].role == "assistant"
+    assert session.messages[1].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_does_not_cross_user_boundary():
+    """A user message acts as a boundary — previous assistant is not modified."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="assistant", content="old turn"),
+        ChatMessage(role="user", content="new message"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+
+    assert len(session.messages) == 3  # new assistant was created
+    assert session.messages[0].tool_calls is None  # old assistant untouched
+    assert session.messages[2].role == "assistant"
+    assert session.messages[2].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_multiple_times():
+    """Multiple long-running tool calls accumulate on the same assistant."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="hi"),
+        ChatMessage(role="assistant", content="doing stuff"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+    # Simulate a pending tool result in between (like _yield_tool_call does)
+    session.messages.append(
+        ChatMessage(role="tool", content="pending", tool_call_id="tc1")
+    )
+    session.add_tool_call_to_current_turn(_raw_tc2)
+
+    assert len(session.messages) == 3  # user, assistant, tool — no extra assistant
+    assert session.messages[1].tool_calls == [_raw_tc, _raw_tc2]
+
+
+def test_to_openai_messages_merges_split_assistants():
+    """End-to-end: session with split assistants produces valid OpenAI messages."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="build agent"),
+        ChatMessage(role="assistant", content="Let me build that"),
+        ChatMessage(
+            role="assistant",
+            content="",
+            tool_calls=[
+                {
+                    "id": "tc1",
+                    "type": "function",
+                    "function": {"name": "create_agent", "arguments": "{}"},
+                }
+            ],
+        ),
+        ChatMessage(role="tool", content="done", tool_call_id="tc1"),
+        ChatMessage(role="assistant", content="Saved!"),
+        ChatMessage(role="user", content="show me an example run"),
+    ]
+    openai_msgs = session.to_openai_messages()
+
+    # The two consecutive assistants at index 1,2 should be merged
+    roles = [m["role"] for m in openai_msgs]
+    assert roles == ["user", "assistant", "tool", "assistant", "user"]
+
+    # The merged assistant should have both content and tool_calls
+    merged = cast(ChatCompletionAssistantMessageParam, openai_msgs[1])
+    assert merged.get("content") == "Let me build that"
+    tc_list = merged.get("tool_calls")
+    assert tc_list is not None and len(list(tc_list)) == 1
+    assert list(tc_list)[0]["id"] == "tc1"
diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py
index 072ea88fd5..193566ea01 100644
--- a/autogpt_platform/backend/backend/api/features/chat/service.py
+++ b/autogpt_platform/backend/backend/api/features/chat/service.py
@@ -800,9 +800,13 @@ async def stream_chat_completion(
     # Build the messages list in the correct order
     messages_to_save: list[ChatMessage] = []
 
-    # Add assistant message with tool_calls if any
+    # Add assistant message with tool_calls if any.
+    # Use extend (not assign) to preserve tool_calls already added by
+    # _yield_tool_call for long-running tools.
     if accumulated_tool_calls:
-        assistant_response.tool_calls = accumulated_tool_calls
+        if not assistant_response.tool_calls:
+            assistant_response.tool_calls = []
+        assistant_response.tool_calls.extend(accumulated_tool_calls)
         logger.info(
             f"Added {len(accumulated_tool_calls)} tool calls to assistant message"
         )
@@ -1404,13 +1408,9 @@ async def _yield_tool_call(
         operation_id=operation_id,
     )
 
-    # Save assistant message with tool_call FIRST (required by LLM)
-    assistant_message = ChatMessage(
-        role="assistant",
-        content="",
-        tool_calls=[tool_calls[yield_idx]],
-    )
-    session.messages.append(assistant_message)
+    # Attach the tool_call to the current turn's assistant message
+    # (or create one if this is a tool-only response with no text).
+    session.add_tool_call_to_current_turn(tool_calls[yield_idx])
 
     # Then save pending tool result
     pending_message = ChatMessage(