- Error: {error.message}
+
+
Something went wrong
+
+ The assistant encountered an error. Please try sending your
+ message again.
+
)}
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx
index 0d023d0529..88b1c491d7 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx
@@ -4,7 +4,6 @@ import { WarningDiamondIcon } from "@phosphor-icons/react";
import type { ToolUIPart } from "ai";
import { useCopilotChatActions } from "../../components/CopilotChatActionsProvider/useCopilotChatActions";
import { MorphingTextAnimation } from "../../components/MorphingTextAnimation/MorphingTextAnimation";
-import { OrbitLoader } from "../../components/OrbitLoader/OrbitLoader";
import { ProgressBar } from "../../components/ProgressBar/ProgressBar";
import {
ContentCardDescription,
@@ -77,7 +76,7 @@ function getAccordionMeta(output: CreateAgentToolOutput) {
isOperationInProgressOutput(output)
) {
return {
- icon:
,
+ icon,
title: "Creating agent, this may take a few minutes. Sit back and relax.",
};
}
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx
index 816c661230..2b75ed9c97 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx
@@ -203,7 +203,7 @@ export function getAccordionMeta(output: RunAgentToolOutput): {
? output.status.trim()
: "started";
return {
- icon:
,
+ icon,
title: output.graph_name,
description: `Status: ${statusText}`,
};
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx
index c9b903876a..b8625988cd 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx
@@ -149,7 +149,7 @@ export function getAccordionMeta(output: RunBlockToolOutput): {
if (isRunBlockBlockOutput(output)) {
const keys = Object.keys(output.outputs ?? {});
return {
- icon:
,
+ icon,
title: output.block_name,
description:
keys.length > 0
diff --git a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts
index 6facf80c58..bd27c77963 100644
--- a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts
+++ b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts
@@ -1,11 +1,8 @@
import { environment } from "@/services/environment";
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
+import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
-/**
- * SSE Proxy for chat streaming.
- * Supports POST with context (page content + URL) in the request body.
- */
export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
@@ -23,17 +20,14 @@ export async function POST(
);
}
- // Get auth token from server-side session
const token = await getServerAuthToken();
- // Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(
`/api/chat/sessions/${sessionId}/stream`,
backendUrl,
);
- // Forward request to backend with auth header
const headers: Record
= {
"Content-Type": "application/json",
Accept: "text/event-stream",
@@ -63,14 +57,15 @@ export async function POST(
});
}
- // Return the SSE stream directly
- return new Response(response.body, {
- headers: {
- "Content-Type": "text/event-stream",
- "Cache-Control": "no-cache, no-transform",
- Connection: "keep-alive",
- "X-Accel-Buffering": "no",
- },
+ if (!response.body) {
+ return new Response(
+ JSON.stringify({ error: "Empty response from chat service" }),
+ { status: 502, headers: { "Content-Type": "application/json" } },
+ );
+ }
+
+ return new Response(normalizeSSEStream(response.body), {
+ headers: SSE_HEADERS,
});
} catch (error) {
console.error("SSE proxy error:", error);
@@ -87,13 +82,6 @@ export async function POST(
}
}
-/**
- * Resume an active stream for a session.
- *
- * Called by the AI SDK's `useChat(resume: true)` on page load.
- * Proxies to the backend which checks for an active stream and either
- * replays it (200 + SSE) or returns 204 No Content.
- */
export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
@@ -124,7 +112,6 @@ export async function GET(
headers,
});
- // 204 = no active stream to resume
if (response.status === 204) {
return new Response(null, { status: 204 });
}
@@ -137,12 +124,13 @@ export async function GET(
});
}
- return new Response(response.body, {
+ if (!response.body) {
+ return new Response(null, { status: 204 });
+ }
+
+ return new Response(normalizeSSEStream(response.body), {
headers: {
- "Content-Type": "text/event-stream",
- "Cache-Control": "no-cache, no-transform",
- Connection: "keep-alive",
- "X-Accel-Buffering": "no",
+ ...SSE_HEADERS,
"x-vercel-ai-ui-message-stream": "v1",
},
});
diff --git a/autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts b/autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts
new file mode 100644
index 0000000000..a5c76cf872
--- /dev/null
+++ b/autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts
@@ -0,0 +1,72 @@
+export const SSE_HEADERS = {
+ "Content-Type": "text/event-stream",
+ "Cache-Control": "no-cache, no-transform",
+ Connection: "keep-alive",
+ "X-Accel-Buffering": "no",
+} as const;
+
+export function normalizeSSEStream(
+ input: ReadableStream,
+): ReadableStream {
+ const decoder = new TextDecoder();
+ const encoder = new TextEncoder();
+ let buffer = "";
+
+ return input.pipeThrough(
+ new TransformStream({
+ transform(chunk, controller) {
+ buffer += decoder.decode(chunk, { stream: true });
+
+ const parts = buffer.split("\n\n");
+ buffer = parts.pop() ?? "";
+
+ for (const part of parts) {
+ const normalized = normalizeSSEEvent(part);
+ controller.enqueue(encoder.encode(normalized + "\n\n"));
+ }
+ },
+ flush(controller) {
+ if (buffer.trim()) {
+ const normalized = normalizeSSEEvent(buffer);
+ controller.enqueue(encoder.encode(normalized + "\n\n"));
+ }
+ },
+ }),
+ );
+}
+
+function normalizeSSEEvent(event: string): string {
+ const lines = event.split("\n");
+ const dataLines: string[] = [];
+ const otherLines: string[] = [];
+
+ for (const line of lines) {
+ if (line.startsWith("data: ")) {
+ dataLines.push(line.slice(6));
+ } else {
+ otherLines.push(line);
+ }
+ }
+
+ if (dataLines.length === 0) return event;
+
+ const dataStr = dataLines.join("\n");
+ try {
+ const parsed = JSON.parse(dataStr) as Record;
+ if (parsed.type === "error") {
+ const normalized = {
+ type: "error",
+ errorText:
+ typeof parsed.errorText === "string"
+ ? parsed.errorText
+ : "An unexpected error occurred",
+ };
+ const newData = `data: ${JSON.stringify(normalized)}`;
+ return [...otherLines.filter((l) => l.length > 0), newData].join("\n");
+ }
+ } catch {
+ // Not valid JSON — pass through as-is
+ }
+
+ return event;
+}
diff --git a/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts b/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts
index 336786bfdb..238fdebb06 100644
--- a/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts
+++ b/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts
@@ -1,20 +1,8 @@
import { environment } from "@/services/environment";
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
+import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
-/**
- * SSE Proxy for task stream reconnection.
- *
- * This endpoint allows clients to reconnect to an ongoing or recently completed
- * background task's stream. It replays missed messages from Redis Streams and
- * subscribes to live updates if the task is still running.
- *
- * Client contract:
- * 1. When receiving an operation_started event, store the task_id
- * 2. To reconnect: GET /api/chat/tasks/{taskId}/stream?last_message_id={idx}
- * 3. Messages are replayed from the last_message_id position
- * 4. Stream ends when "finish" event is received
- */
export async function GET(
request: NextRequest,
{ params }: { params: Promise<{ taskId: string }> },
@@ -24,15 +12,12 @@ export async function GET(
const lastMessageId = searchParams.get("last_message_id") || "0-0";
try {
- // Get auth token from server-side session
const token = await getServerAuthToken();
- // Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(`/api/chat/tasks/${taskId}/stream`, backendUrl);
streamUrl.searchParams.set("last_message_id", lastMessageId);
- // Forward request to backend with auth header
const headers: Record = {
Accept: "text/event-stream",
"Cache-Control": "no-cache",
@@ -56,14 +41,12 @@ export async function GET(
});
}
- // Return the SSE stream directly
- return new Response(response.body, {
- headers: {
- "Content-Type": "text/event-stream",
- "Cache-Control": "no-cache, no-transform",
- Connection: "keep-alive",
- "X-Accel-Buffering": "no",
- },
+ if (!response.body) {
+ return new Response(null, { status: 204 });
+ }
+
+ return new Response(normalizeSSEStream(response.body), {
+ headers: SSE_HEADERS,
});
} catch (error) {
console.error("Task stream proxy error:", error);
From 36aeb0b2b3d0ee0e8f23236771108040425a6cd5 Mon Sep 17 00:00:00 2001
From: Otto
Date: Wed, 11 Feb 2026 15:43:58 +0000
Subject: [PATCH 3/4] docs(blocks): clarify HumanInTheLoop output descriptions
for agent builder (#12069)
## Problem
The agent builder (LLM) misinterprets the HumanInTheLoop block outputs.
It thinks `approved_data` and `rejected_data` will yield status strings
like "APPROVED" or "REJECTED" instead of understanding that the actual
input data passes through.
This leads to unnecessary complexity - the agent builder adds comparison
blocks to check for status strings that don't exist.
## Solution
Enriched the block docstring and all input/output field descriptions to
make it explicit that:
1. The output is the actual data itself, not a status string
2. The routing is determined by which output pin fires
3. How to use the block correctly (connect downstream blocks to
appropriate output pins)
## Changes
- Updated block docstring with clear "How it works" and "Example usage"
sections
- Enhanced `data` input description to explain data flow
- Enhanced `name` input description for reviewer context
- Enhanced `approved_data` output to explicitly state it's NOT a status
string
- Enhanced `rejected_data` output to explicitly state it's NOT a status
string
- Enhanced `review_message` output for clarity
## Testing
Documentation-only change to schema descriptions. No functional changes.
Fixes SECRT-1930
Greptile Overview
Greptile Summary
Enhanced documentation for the `HumanInTheLoopBlock` to clarify how
output pins work. The key improvement explicitly states that output pins
(`approved_data` and `rejected_data`) yield the actual input data, not
status strings like "APPROVED" or "REJECTED". This prevents the agent
builder (LLM) from misinterpreting the block's behavior and adding
unnecessary comparison blocks.
**Key changes:**
- Added "How it works" and "Example usage" sections to the block
docstring
- Clarified that routing is determined by which output pin fires, not by
comparing output values
- Enhanced all input/output field descriptions with explicit data flow
explanations
- Emphasized that downstream blocks should be connected to the
appropriate output pin based on desired workflow path
This is a documentation-only change with no functional modifications to
the code logic.
Confidence Score: 5/5
- This PR is safe to merge with no risk
- Documentation-only change that accurately reflects the existing code
behavior. No functional changes, no runtime impact, and the enhanced
descriptions correctly explain how the block outputs work based on
verification of the implementation code.
- No files require special attention
Co-authored-by: Zamil Majdy
---
.../backend/blocks/human_in_the_loop.py | 56 ++++++++++++++-----
docs/integrations/README.md | 2 +-
docs/integrations/block-integrations/basic.md | 14 ++---
3 files changed, 50 insertions(+), 22 deletions(-)
diff --git a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
index 568ac4b33f..d31f90ec81 100644
--- a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
+++ b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
@@ -21,43 +21,71 @@ logger = logging.getLogger(__name__)
class HumanInTheLoopBlock(Block):
"""
- This block pauses execution and waits for human approval or modification of the data.
+ Pauses execution and waits for human approval or rejection of the data.
- When executed, it creates a pending review entry and sets the node execution status
- to REVIEW. The execution will remain paused until a human user either:
- - Approves the data (with or without modifications)
- - Rejects the data
+ When executed, this block creates a pending review entry and sets the node execution
+ status to REVIEW. The execution remains paused until a human user either approves
+ or rejects the data.
- This is useful for workflows that require human validation or intervention before
- proceeding to the next steps.
+ **How it works:**
+ - The input data is presented to a human reviewer
+ - The reviewer can approve or reject (and optionally modify the data if editable)
+ - On approval: the data flows out through the `approved_data` output pin
+ - On rejection: the data flows out through the `rejected_data` output pin
+
+ **Important:** The output pins yield the actual data itself, NOT status strings.
+ The approval/rejection decision determines WHICH output pin fires, not the value.
+ You do NOT need to compare the output to "APPROVED" or "REJECTED" - simply connect
+ downstream blocks to the appropriate output pin for each case.
+
+ **Example usage:**
+ - Connect `approved_data` → next step in your workflow (data was approved)
+ - Connect `rejected_data` → error handling or notification (data was rejected)
"""
class Input(BlockSchemaInput):
- data: Any = SchemaField(description="The data to be reviewed by a human user")
+ data: Any = SchemaField(
+ description="The data to be reviewed by a human user. "
+ "This exact data will be passed through to either approved_data or "
+ "rejected_data output based on the reviewer's decision."
+ )
name: str = SchemaField(
- description="A descriptive name for what this data represents",
+ description="A descriptive name for what this data represents. "
+ "This helps the reviewer understand what they are reviewing.",
)
editable: bool = SchemaField(
- description="Whether the human reviewer can edit the data",
+ description="Whether the human reviewer can edit the data before "
+ "approving or rejecting it",
default=True,
advanced=True,
)
class Output(BlockSchemaOutput):
approved_data: Any = SchemaField(
- description="The data when approved (may be modified by reviewer)"
+ description="Outputs the input data when the reviewer APPROVES it. "
+ "The value is the actual data itself (not a status string like 'APPROVED'). "
+ "If the reviewer edited the data, this contains the modified version. "
+ "Connect downstream blocks here for the 'approved' workflow path."
)
rejected_data: Any = SchemaField(
- description="The data when rejected (may be modified by reviewer)"
+ description="Outputs the input data when the reviewer REJECTS it. "
+ "The value is the actual data itself (not a status string like 'REJECTED'). "
+ "If the reviewer edited the data, this contains the modified version. "
+ "Connect downstream blocks here for the 'rejected' workflow path."
)
review_message: str = SchemaField(
- description="Any message provided by the reviewer", default=""
+ description="Optional message provided by the reviewer explaining their "
+ "decision. Only outputs when the reviewer provides a message; "
+ "this pin does not fire if no message was given.",
+ default="",
)
def __init__(self):
super().__init__(
id="8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d",
- description="Pause execution and wait for human approval or modification of data",
+ description="Pause execution for human review. Data flows through "
+ "approved_data or rejected_data output based on the reviewer's decision. "
+ "Outputs contain the actual data, not status strings.",
categories={BlockCategory.BASIC},
input_schema=HumanInTheLoopBlock.Input,
output_schema=HumanInTheLoopBlock.Output,
diff --git a/docs/integrations/README.md b/docs/integrations/README.md
index 97a4d98709..a471ef3533 100644
--- a/docs/integrations/README.md
+++ b/docs/integrations/README.md
@@ -61,7 +61,7 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| [Get List Item](block-integrations/basic.md#get-list-item) | Returns the element at the given index |
| [Get Store Agent Details](block-integrations/system/store_operations.md#get-store-agent-details) | Get detailed information about an agent from the store |
| [Get Weather Information](block-integrations/basic.md#get-weather-information) | Retrieves weather information for a specified location using OpenWeatherMap API |
-| [Human In The Loop](block-integrations/basic.md#human-in-the-loop) | Pause execution and wait for human approval or modification of data |
+| [Human In The Loop](block-integrations/basic.md#human-in-the-loop) | Pause execution for human review |
| [List Is Empty](block-integrations/basic.md#list-is-empty) | Checks if a list is empty |
| [List Library Agents](block-integrations/system/library_operations.md#list-library-agents) | List all agents in your personal library |
| [Note](block-integrations/basic.md#note) | A visual annotation block that displays a sticky note in the workflow editor for documentation and organization purposes |
diff --git a/docs/integrations/block-integrations/basic.md b/docs/integrations/block-integrations/basic.md
index 5a73fd5a03..08def38ede 100644
--- a/docs/integrations/block-integrations/basic.md
+++ b/docs/integrations/block-integrations/basic.md
@@ -975,7 +975,7 @@ A travel planning application could use this block to provide users with current
## Human In The Loop
### What it is
-Pause execution and wait for human approval or modification of data
+Pause execution for human review. Data flows through approved_data or rejected_data output based on the reviewer's decision. Outputs contain the actual data, not status strings.
### How it works
@@ -988,18 +988,18 @@ This enables human oversight at critical points in automated workflows, ensuring
| Input | Description | Type | Required |
|-------|-------------|------|----------|
-| data | The data to be reviewed by a human user | Data | Yes |
-| name | A descriptive name for what this data represents | str | Yes |
-| editable | Whether the human reviewer can edit the data | bool | No |
+| data | The data to be reviewed by a human user. This exact data will be passed through to either approved_data or rejected_data output based on the reviewer's decision. | Data | Yes |
+| name | A descriptive name for what this data represents. This helps the reviewer understand what they are reviewing. | str | Yes |
+| editable | Whether the human reviewer can edit the data before approving or rejecting it | bool | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
-| approved_data | The data when approved (may be modified by reviewer) | Approved Data |
-| rejected_data | The data when rejected (may be modified by reviewer) | Rejected Data |
-| review_message | Any message provided by the reviewer | str |
+| approved_data | Outputs the input data when the reviewer APPROVES it. The value is the actual data itself (not a status string like 'APPROVED'). If the reviewer edited the data, this contains the modified version. Connect downstream blocks here for the 'approved' workflow path. | Approved Data |
+| rejected_data | Outputs the input data when the reviewer REJECTS it. The value is the actual data itself (not a status string like 'REJECTED'). If the reviewer edited the data, this contains the modified version. Connect downstream blocks here for the 'rejected' workflow path. | Rejected Data |
+| review_message | Optional message provided by the reviewer explaining their decision. Only outputs when the reviewer provides a message; this pin does not fire if no message was given. | str |
### Possible use case
From a78145505b50f158a34ce4f101b5399f1115727e Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Thu, 12 Feb 2026 05:52:17 +0400
Subject: [PATCH 4/4] fix(copilot): merge split assistant messages to prevent
Anthropic API errors (#12062)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
## Summary
- When the copilot model responds with both text content AND a
long-running tool call (e.g., `create_agent`), the streaming code
created two separate consecutive assistant messages — one with text, one
with `tool_calls`. This caused Anthropic's API to reject with
`"unexpected tool_use_id found in tool_result blocks"` because the
`tool_result` couldn't find a matching `tool_use` in the immediately
preceding assistant message.
- Added a defensive merge of consecutive assistant messages in
`to_openai_messages()` (fixes existing corrupt sessions too)
- Fixed `_yield_tool_call` to add tool_calls to the existing
current-turn assistant message instead of creating a new one
- Changed `accumulated_tool_calls` assignment to use `extend` to prevent
overwriting tool_calls added by long-running tool flow
## Test plan
- [x] All 23 chat feature tests pass (`backend/api/features/chat/`)
- [x] All 44 prompt utility tests pass (`backend/util/prompt_test.py`)
- [x] All pre-commit hooks pass (ruff, isort, black, pyright)
- [ ] Manual test: create an agent via copilot, then ask a follow-up
question — should no longer get 400 error
Greptile Overview
Greptile Summary
Fixes a critical bug where long-running tool calls (like `create_agent`)
caused Anthropic API 400 errors due to split assistant messages. The fix
ensures tool calls are added to the existing assistant message instead
of creating new ones, and adds a defensive merge function to repair any
existing corrupt sessions.
**Key changes:**
- Added `_merge_consecutive_assistant_messages()` to defensively merge
split assistant messages in `to_openai_messages()`
- Modified `_yield_tool_call()` to append tool calls to the current-turn
assistant message instead of creating a new one
- Changed `accumulated_tool_calls` from assignment to `extend` to
preserve tool calls already added by long-running tool flow
**Impact:** Resolves the issue where users received 400 errors after
creating agents via copilot and asking follow-up questions.
Confidence Score: 4/5
- Safe to merge with minor verification recommended
- The changes are well-targeted and solve a real API compatibility
issue. The logic is sound: searching backwards for the current assistant
message is correct, and using `extend` instead of assignment prevents
overwriting. The defensive merge in `to_openai_messages()` also fixes
existing corrupt sessions. All existing tests pass according to the PR
description.
- No files require special attention - changes are localized and
defensive
Sequence Diagram
```mermaid
sequenceDiagram
participant User
participant StreamAPI as stream_chat_completion
participant Chunks as _stream_chat_chunks
participant ToolCall as _yield_tool_call
participant Session as ChatSession
User->>StreamAPI: Send message
StreamAPI->>Chunks: Stream chat chunks
alt Text + Long-running tool call
Chunks->>StreamAPI: Text delta (content)
StreamAPI->>Session: Append assistant message with content
Chunks->>ToolCall: Tool call detected
Note over ToolCall: OLD: Created new assistant message
NEW: Appends to existing assistant
ToolCall->>Session: Search backwards for current assistant
ToolCall->>Session: Append tool_call to existing message
ToolCall->>Session: Add pending tool result
end
StreamAPI->>StreamAPI: Merge accumulated_tool_calls
Note over StreamAPI: Use extend (not assign)
to preserve existing tool_calls
StreamAPI->>Session: to_openai_messages()
Session->>Session: _merge_consecutive_assistant_messages()
Note over Session: Defensive: Merges any split
assistant messages
Session-->>StreamAPI: Merged messages
StreamAPI->>User: Return response
```
---
.../backend/api/features/chat/model.py | 65 +++++-
.../backend/api/features/chat/model_test.py | 214 ++++++++++++++++++
.../backend/api/features/chat/service.py | 18 +-
3 files changed, 286 insertions(+), 11 deletions(-)
diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/api/features/chat/model.py
index 7318ef88d7..35418f174f 100644
--- a/autogpt_platform/backend/backend/api/features/chat/model.py
+++ b/autogpt_platform/backend/backend/api/features/chat/model.py
@@ -2,7 +2,7 @@ import asyncio
import logging
import uuid
from datetime import UTC, datetime
-from typing import Any
+from typing import Any, cast
from weakref import WeakValueDictionary
from openai.types.chat import (
@@ -104,6 +104,26 @@ class ChatSession(BaseModel):
successful_agent_runs: dict[str, int] = {}
successful_agent_schedules: dict[str, int] = {}
+ def add_tool_call_to_current_turn(self, tool_call: dict) -> None:
+ """Attach a tool_call to the current turn's assistant message.
+
+ Searches backwards for the most recent assistant message (stopping at
+ any user message boundary). If found, appends the tool_call to it.
+ Otherwise creates a new assistant message with the tool_call.
+ """
+ for msg in reversed(self.messages):
+ if msg.role == "user":
+ break
+ if msg.role == "assistant":
+ if not msg.tool_calls:
+ msg.tool_calls = []
+ msg.tool_calls.append(tool_call)
+ return
+
+ self.messages.append(
+ ChatMessage(role="assistant", content="", tool_calls=[tool_call])
+ )
+
@staticmethod
def new(user_id: str) -> "ChatSession":
return ChatSession(
@@ -172,6 +192,47 @@ class ChatSession(BaseModel):
successful_agent_schedules=successful_agent_schedules,
)
+ @staticmethod
+ def _merge_consecutive_assistant_messages(
+ messages: list[ChatCompletionMessageParam],
+ ) -> list[ChatCompletionMessageParam]:
+ """Merge consecutive assistant messages into single messages.
+
+ Long-running tool flows can create split assistant messages: one with
+ text content and another with tool_calls. Anthropic's API requires
+ tool_result blocks to reference a tool_use in the immediately preceding
+ assistant message, so these splits cause 400 errors via OpenRouter.
+ """
+ if len(messages) < 2:
+ return messages
+
+ result: list[ChatCompletionMessageParam] = [messages[0]]
+ for msg in messages[1:]:
+ prev = result[-1]
+ if prev.get("role") != "assistant" or msg.get("role") != "assistant":
+ result.append(msg)
+ continue
+
+ prev = cast(ChatCompletionAssistantMessageParam, prev)
+ curr = cast(ChatCompletionAssistantMessageParam, msg)
+
+ curr_content = curr.get("content") or ""
+ if curr_content:
+ prev_content = prev.get("content") or ""
+ prev["content"] = (
+ f"{prev_content}\n{curr_content}" if prev_content else curr_content
+ )
+
+ curr_tool_calls = curr.get("tool_calls")
+ if curr_tool_calls:
+ prev_tool_calls = prev.get("tool_calls")
+ prev["tool_calls"] = (
+ list(prev_tool_calls) + list(curr_tool_calls)
+ if prev_tool_calls
+ else list(curr_tool_calls)
+ )
+ return result
+
def to_openai_messages(self) -> list[ChatCompletionMessageParam]:
messages = []
for message in self.messages:
@@ -258,7 +319,7 @@ class ChatSession(BaseModel):
name=message.name or "",
)
)
- return messages
+ return self._merge_consecutive_assistant_messages(messages)
async def _get_session_from_cache(session_id: str) -> ChatSession | None:
diff --git a/autogpt_platform/backend/backend/api/features/chat/model_test.py b/autogpt_platform/backend/backend/api/features/chat/model_test.py
index c230b00f9c..239137844d 100644
--- a/autogpt_platform/backend/backend/api/features/chat/model_test.py
+++ b/autogpt_platform/backend/backend/api/features/chat/model_test.py
@@ -1,4 +1,16 @@
+from typing import cast
+
import pytest
+from openai.types.chat import (
+ ChatCompletionAssistantMessageParam,
+ ChatCompletionMessageParam,
+ ChatCompletionToolMessageParam,
+ ChatCompletionUserMessageParam,
+)
+from openai.types.chat.chat_completion_message_tool_call_param import (
+ ChatCompletionMessageToolCallParam,
+ Function,
+)
from .model import (
ChatMessage,
@@ -117,3 +129,205 @@ async def test_chatsession_db_storage(setup_test_user, test_user_id):
loaded.tool_calls is not None
), f"Tool calls missing for {orig.role} message"
assert len(orig.tool_calls) == len(loaded.tool_calls)
+
+
+# --------------------------------------------------------------------------- #
+# _merge_consecutive_assistant_messages #
+# --------------------------------------------------------------------------- #
+
+_tc = ChatCompletionMessageToolCallParam(
+ id="tc1", type="function", function=Function(name="do_stuff", arguments="{}")
+)
+_tc2 = ChatCompletionMessageToolCallParam(
+ id="tc2", type="function", function=Function(name="other", arguments="{}")
+)
+
+
+def test_merge_noop_when_no_consecutive_assistants():
+ """Messages without consecutive assistants are returned unchanged."""
+ msgs = [
+ ChatCompletionUserMessageParam(role="user", content="hi"),
+ ChatCompletionAssistantMessageParam(role="assistant", content="hello"),
+ ChatCompletionUserMessageParam(role="user", content="bye"),
+ ]
+ merged = ChatSession._merge_consecutive_assistant_messages(msgs)
+ assert len(merged) == 3
+ assert [m["role"] for m in merged] == ["user", "assistant", "user"]
+
+
+def test_merge_splits_text_and_tool_calls():
+ """The exact bug scenario: text-only assistant followed by tool_calls-only assistant."""
+ msgs = [
+ ChatCompletionUserMessageParam(role="user", content="build agent"),
+ ChatCompletionAssistantMessageParam(
+ role="assistant", content="Let me build that"
+ ),
+ ChatCompletionAssistantMessageParam(
+ role="assistant", content="", tool_calls=[_tc]
+ ),
+ ChatCompletionToolMessageParam(role="tool", content="ok", tool_call_id="tc1"),
+ ]
+ merged = ChatSession._merge_consecutive_assistant_messages(msgs)
+
+ assert len(merged) == 3
+ assert merged[0]["role"] == "user"
+ assert merged[2]["role"] == "tool"
+ a = cast(ChatCompletionAssistantMessageParam, merged[1])
+ assert a["role"] == "assistant"
+ assert a.get("content") == "Let me build that"
+ assert a.get("tool_calls") == [_tc]
+
+
+def test_merge_combines_tool_calls_from_both():
+ """Both consecutive assistants have tool_calls — they get merged."""
+ msgs: list[ChatCompletionAssistantMessageParam] = [
+ ChatCompletionAssistantMessageParam(
+ role="assistant", content="text", tool_calls=[_tc]
+ ),
+ ChatCompletionAssistantMessageParam(
+ role="assistant", content="", tool_calls=[_tc2]
+ ),
+ ]
+ merged = ChatSession._merge_consecutive_assistant_messages(msgs) # type: ignore[arg-type]
+
+ assert len(merged) == 1
+ a = cast(ChatCompletionAssistantMessageParam, merged[0])
+ assert a.get("tool_calls") == [_tc, _tc2]
+ assert a.get("content") == "text"
+
+
+def test_merge_three_consecutive_assistants():
+ """Three consecutive assistants collapse into one."""
+ msgs: list[ChatCompletionAssistantMessageParam] = [
+ ChatCompletionAssistantMessageParam(role="assistant", content="a"),
+ ChatCompletionAssistantMessageParam(role="assistant", content="b"),
+ ChatCompletionAssistantMessageParam(
+ role="assistant", content="", tool_calls=[_tc]
+ ),
+ ]
+ merged = ChatSession._merge_consecutive_assistant_messages(msgs) # type: ignore[arg-type]
+
+ assert len(merged) == 1
+ a = cast(ChatCompletionAssistantMessageParam, merged[0])
+ assert a.get("content") == "a\nb"
+ assert a.get("tool_calls") == [_tc]
+
+
+def test_merge_empty_and_single_message():
+ """Edge cases: empty list and single message."""
+ assert ChatSession._merge_consecutive_assistant_messages([]) == []
+
+ single: list[ChatCompletionMessageParam] = [
+ ChatCompletionUserMessageParam(role="user", content="hi")
+ ]
+ assert ChatSession._merge_consecutive_assistant_messages(single) == single
+
+
+# --------------------------------------------------------------------------- #
+# add_tool_call_to_current_turn #
+# --------------------------------------------------------------------------- #
+
+_raw_tc = {
+ "id": "tc1",
+ "type": "function",
+ "function": {"name": "f", "arguments": "{}"},
+}
+_raw_tc2 = {
+ "id": "tc2",
+ "type": "function",
+ "function": {"name": "g", "arguments": "{}"},
+}
+
+
+def test_add_tool_call_appends_to_existing_assistant():
+ """When the last assistant is from the current turn, tool_call is added to it."""
+ session = ChatSession.new(user_id="u")
+ session.messages = [
+ ChatMessage(role="user", content="hi"),
+ ChatMessage(role="assistant", content="working on it"),
+ ]
+ session.add_tool_call_to_current_turn(_raw_tc)
+
+ assert len(session.messages) == 2 # no new message created
+ assert session.messages[1].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_creates_assistant_when_none_exists():
+ """When there's no current-turn assistant, a new one is created."""
+ session = ChatSession.new(user_id="u")
+ session.messages = [
+ ChatMessage(role="user", content="hi"),
+ ]
+ session.add_tool_call_to_current_turn(_raw_tc)
+
+ assert len(session.messages) == 2
+ assert session.messages[1].role == "assistant"
+ assert session.messages[1].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_does_not_cross_user_boundary():
+ """A user message acts as a boundary — previous assistant is not modified."""
+ session = ChatSession.new(user_id="u")
+ session.messages = [
+ ChatMessage(role="assistant", content="old turn"),
+ ChatMessage(role="user", content="new message"),
+ ]
+ session.add_tool_call_to_current_turn(_raw_tc)
+
+ assert len(session.messages) == 3 # new assistant was created
+ assert session.messages[0].tool_calls is None # old assistant untouched
+ assert session.messages[2].role == "assistant"
+ assert session.messages[2].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_multiple_times():
+ """Multiple long-running tool calls accumulate on the same assistant."""
+ session = ChatSession.new(user_id="u")
+ session.messages = [
+ ChatMessage(role="user", content="hi"),
+ ChatMessage(role="assistant", content="doing stuff"),
+ ]
+ session.add_tool_call_to_current_turn(_raw_tc)
+ # Simulate a pending tool result in between (like _yield_tool_call does)
+ session.messages.append(
+ ChatMessage(role="tool", content="pending", tool_call_id="tc1")
+ )
+ session.add_tool_call_to_current_turn(_raw_tc2)
+
+ assert len(session.messages) == 3 # user, assistant, tool — no extra assistant
+ assert session.messages[1].tool_calls == [_raw_tc, _raw_tc2]
+
+
+def test_to_openai_messages_merges_split_assistants():
+ """End-to-end: session with split assistants produces valid OpenAI messages."""
+ session = ChatSession.new(user_id="u")
+ session.messages = [
+ ChatMessage(role="user", content="build agent"),
+ ChatMessage(role="assistant", content="Let me build that"),
+ ChatMessage(
+ role="assistant",
+ content="",
+ tool_calls=[
+ {
+ "id": "tc1",
+ "type": "function",
+ "function": {"name": "create_agent", "arguments": "{}"},
+ }
+ ],
+ ),
+ ChatMessage(role="tool", content="done", tool_call_id="tc1"),
+ ChatMessage(role="assistant", content="Saved!"),
+ ChatMessage(role="user", content="show me an example run"),
+ ]
+ openai_msgs = session.to_openai_messages()
+
+ # The two consecutive assistants at index 1,2 should be merged
+ roles = [m["role"] for m in openai_msgs]
+ assert roles == ["user", "assistant", "tool", "assistant", "user"]
+
+ # The merged assistant should have both content and tool_calls
+ merged = cast(ChatCompletionAssistantMessageParam, openai_msgs[1])
+ assert merged.get("content") == "Let me build that"
+ tc_list = merged.get("tool_calls")
+ assert tc_list is not None and len(list(tc_list)) == 1
+ assert list(tc_list)[0]["id"] == "tc1"
diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py
index 072ea88fd5..193566ea01 100644
--- a/autogpt_platform/backend/backend/api/features/chat/service.py
+++ b/autogpt_platform/backend/backend/api/features/chat/service.py
@@ -800,9 +800,13 @@ async def stream_chat_completion(
# Build the messages list in the correct order
messages_to_save: list[ChatMessage] = []
- # Add assistant message with tool_calls if any
+ # Add assistant message with tool_calls if any.
+ # Use extend (not assign) to preserve tool_calls already added by
+ # _yield_tool_call for long-running tools.
if accumulated_tool_calls:
- assistant_response.tool_calls = accumulated_tool_calls
+ if not assistant_response.tool_calls:
+ assistant_response.tool_calls = []
+ assistant_response.tool_calls.extend(accumulated_tool_calls)
logger.info(
f"Added {len(accumulated_tool_calls)} tool calls to assistant message"
)
@@ -1404,13 +1408,9 @@ async def _yield_tool_call(
operation_id=operation_id,
)
- # Save assistant message with tool_call FIRST (required by LLM)
- assistant_message = ChatMessage(
- role="assistant",
- content="",
- tool_calls=[tool_calls[yield_idx]],
- )
- session.messages.append(assistant_message)
+ # Attach the tool_call to the current turn's assistant message
+ # (or create one if this is a tool-only response with no text).
+ session.add_tool_call_to_current_turn(tool_calls[yield_idx])
# Then save pending tool result
pending_message = ChatMessage(