diff --git a/autogpt_platform/backend/backend/api/features/chat/response_model.py b/autogpt_platform/backend/backend/api/features/chat/response_model.py
index 1ae836f7d1..8ea0c1f97a 100644
--- a/autogpt_platform/backend/backend/api/features/chat/response_model.py
+++ b/autogpt_platform/backend/backend/api/features/chat/response_model.py
@@ -10,6 +10,8 @@ from typing import Any
from pydantic import BaseModel, Field
+from backend.util.json import dumps as json_dumps
+
class ResponseType(str, Enum):
"""Types of streaming responses following AI SDK protocol."""
@@ -193,6 +195,18 @@ class StreamError(StreamBaseResponse):
default=None, description="Additional error details"
)
+ def to_sse(self) -> str:
+ """Convert to SSE format, only emitting fields required by AI SDK protocol.
+
+ The AI SDK uses z.strictObject({type, errorText}) which rejects
+ any extra fields like `code` or `details`.
+ """
+ data = {
+ "type": self.type.value,
+ "errorText": self.errorText,
+ }
+ return f"data: {json_dumps(data)}\n\n"
+
class StreamHeartbeat(StreamBaseResponse):
"""Heartbeat to keep SSE connection alive during long-running operations.
diff --git a/autogpt_platform/backend/backend/util/prompt.py b/autogpt_platform/backend/backend/util/prompt.py
index 5f904bbc8a..3ec25dd61b 100644
--- a/autogpt_platform/backend/backend/util/prompt.py
+++ b/autogpt_platform/backend/backend/util/prompt.py
@@ -364,6 +364,44 @@ def _remove_orphan_tool_responses(
return result
+def validate_and_remove_orphan_tool_responses(
+ messages: list[dict],
+ log_warning: bool = True,
+) -> list[dict]:
+ """
+ Validate tool_call/tool_response pairs and remove orphaned responses.
+
+ Scans messages in order, tracking all tool_call IDs. Any tool response
+ referencing an ID not seen in a preceding message is considered orphaned
+ and removed. This prevents API errors like Anthropic's "unexpected tool_use_id".
+
+ Args:
+ messages: List of messages to validate (OpenAI or Anthropic format)
+ log_warning: Whether to log a warning when orphans are found
+
+ Returns:
+ A new list with orphaned tool responses removed
+ """
+ available_ids: set[str] = set()
+ orphan_ids: set[str] = set()
+
+ for msg in messages:
+ available_ids |= _extract_tool_call_ids_from_message(msg)
+ for resp_id in _extract_tool_response_ids_from_message(msg):
+ if resp_id not in available_ids:
+ orphan_ids.add(resp_id)
+
+ if not orphan_ids:
+ return messages
+
+ if log_warning:
+ logger.warning(
+ f"Removing {len(orphan_ids)} orphan tool response(s): {orphan_ids}"
+ )
+
+ return _remove_orphan_tool_responses(messages, orphan_ids)
+
+
def _ensure_tool_pairs_intact(
recent_messages: list[dict],
all_messages: list[dict],
@@ -723,6 +761,13 @@ async def compress_context(
# Filter out any None values that may have been introduced
final_msgs: list[dict] = [m for m in msgs if m is not None]
+
+ # ---- STEP 6: Final tool-pair validation ---------------------------------
+ # After all compression steps, verify that every tool response has a
+ # matching tool_call in a preceding assistant message. Remove orphans
+ # to prevent API errors (e.g., Anthropic's "unexpected tool_use_id").
+ final_msgs = validate_and_remove_orphan_tool_responses(final_msgs)
+
final_count = sum(_msg_tokens(m, enc) for m in final_msgs)
error = None
if final_count + reserve > target_tokens:
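
For illustration, the same single-pass scan re-expressed in TypeScript; the message shape and accessors here are hypothetical stand-ins for `_extract_tool_call_ids_from_message` and `_extract_tool_response_ids_from_message`:

```ts
// Sketch only: a simplified message shape, not the OpenAI/Anthropic
// formats the Python helpers actually parse.
type Message = { toolCallIds?: string[]; toolResponseId?: string };

function removeOrphanToolResponses(messages: Message[]): Message[] {
  const available = new Set<string>();
  const orphans = new Set<string>();

  for (const msg of messages) {
    for (const id of msg.toolCallIds ?? []) available.add(id);
    // Orphaned: the response's ID was never declared by a preceding call.
    if (msg.toolResponseId && !available.has(msg.toolResponseId)) {
      orphans.add(msg.toolResponseId);
    }
  }

  if (orphans.size === 0) return messages;
  return messages.filter(
    (m) => !(m.toolResponseId && orphans.has(m.toolResponseId)),
  );
}
```

Because IDs are collected in document order, a response survives only when its call appears earlier in the list, which is exactly the invariant Anthropic enforces with its "unexpected tool_use_id" error.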
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/components/ChatMessagesContainer/ChatMessagesContainer.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/components/ChatMessagesContainer/ChatMessagesContainer.tsx
index 4578b268e3..fbe1c03d1d 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/components/ChatMessagesContainer/ChatMessagesContainer.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/components/ChatMessagesContainer/ChatMessagesContainer.tsx
@@ -10,8 +10,9 @@ import {
MessageResponse,
} from "@/components/ai-elements/message";
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
+import { toast } from "@/components/molecules/Toast/use-toast";
import { ToolUIPart, UIDataTypes, UIMessage, UITools } from "ai";
-import { useEffect, useState } from "react";
+import { useEffect, useRef, useState } from "react";
import { CreateAgentTool } from "../../tools/CreateAgent/CreateAgent";
import { EditAgentTool } from "../../tools/EditAgent/EditAgent";
import { FindAgentsTool } from "../../tools/FindAgents/FindAgents";
@@ -121,6 +122,7 @@ export const ChatMessagesContainer = ({
isLoading,
}: ChatMessagesContainerProps) => {
const [thinkingPhrase, setThinkingPhrase] = useState(getRandomPhrase);
+ const lastToastTimeRef = useRef(0);
useEffect(() => {
if (status === "submitted") {
@@ -128,6 +130,20 @@ export const ChatMessagesContainer = ({
}
}, [status]);
+ // Show a toast when a new error occurs, debounced to avoid spam
+ useEffect(() => {
+ if (!error) return;
+ const now = Date.now();
+ if (now - lastToastTimeRef.current < 3_000) return;
+ lastToastTimeRef.current = now;
+ toast({
+ variant: "destructive",
+ title: "Something went wrong",
+ description:
+ "The assistant encountered an error. Please try sending your message again.",
+ });
+ }, [error]);
+
const lastMessage = messages[messages.length - 1];
const lastAssistantHasVisibleContent =
lastMessage?.role === "assistant" &&
@@ -263,8 +279,12 @@ export const ChatMessagesContainer = ({
)}
{error && (
- <div>
-   Error: {error.message}
- </div>
+ <div>
+   <p>Something went wrong</p>
+   <p>
+     The assistant encountered an error. Please try sending your
+     message again.
+   </p>
+ </div>
)}
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx
index 0d023d0529..88b1c491d7 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/CreateAgent/CreateAgent.tsx
@@ -4,7 +4,6 @@ import { WarningDiamondIcon } from "@phosphor-icons/react";
import type { ToolUIPart } from "ai";
import { useCopilotChatActions } from "../../components/CopilotChatActionsProvider/useCopilotChatActions";
import { MorphingTextAnimation } from "../../components/MorphingTextAnimation/MorphingTextAnimation";
-import { OrbitLoader } from "../../components/OrbitLoader/OrbitLoader";
import { ProgressBar } from "../../components/ProgressBar/ProgressBar";
import {
ContentCardDescription,
@@ -77,7 +76,7 @@ function getAccordionMeta(output: CreateAgentToolOutput) {
isOperationInProgressOutput(output)
) {
return {
- icon: <OrbitLoader />,
+ icon,
title: "Creating agent, this may take a few minutes. Sit back and relax.",
};
}
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx
index 816c661230..2b75ed9c97 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunAgent/helpers.tsx
@@ -203,7 +203,7 @@ export function getAccordionMeta(output: RunAgentToolOutput): {
? output.status.trim()
: "started";
return {
- icon: ,
+ icon,
title: output.graph_name,
description: `Status: ${statusText}`,
};
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx
index c9b903876a..b8625988cd 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/RunBlock/helpers.tsx
@@ -149,7 +149,7 @@ export function getAccordionMeta(output: RunBlockToolOutput): {
if (isRunBlockBlockOutput(output)) {
const keys = Object.keys(output.outputs ?? {});
return {
- icon: ,
+ icon,
title: output.block_name,
description:
keys.length > 0
diff --git a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts
index 6facf80c58..bd27c77963 100644
--- a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts
+++ b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts
@@ -1,11 +1,8 @@
import { environment } from "@/services/environment";
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
+import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
-/**
- * SSE Proxy for chat streaming.
- * Supports POST with context (page content + URL) in the request body.
- */
export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
@@ -23,17 +20,14 @@ export async function POST(
);
}
- // Get auth token from server-side session
const token = await getServerAuthToken();
- // Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(
`/api/chat/sessions/${sessionId}/stream`,
backendUrl,
);
- // Forward request to backend with auth header
const headers: Record<string, string> = {
"Content-Type": "application/json",
Accept: "text/event-stream",
@@ -63,14 +57,15 @@ export async function POST(
});
}
- // Return the SSE stream directly
- return new Response(response.body, {
- headers: {
- "Content-Type": "text/event-stream",
- "Cache-Control": "no-cache, no-transform",
- Connection: "keep-alive",
- "X-Accel-Buffering": "no",
- },
+ if (!response.body) {
+ return new Response(
+ JSON.stringify({ error: "Empty response from chat service" }),
+ { status: 502, headers: { "Content-Type": "application/json" } },
+ );
+ }
+
+ return new Response(normalizeSSEStream(response.body), {
+ headers: SSE_HEADERS,
});
} catch (error) {
console.error("SSE proxy error:", error);
@@ -87,13 +82,6 @@ export async function POST(
}
}
-/**
- * Resume an active stream for a session.
- *
- * Called by the AI SDK's `useChat(resume: true)` on page load.
- * Proxies to the backend which checks for an active stream and either
- * replays it (200 + SSE) or returns 204 No Content.
- */
export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
@@ -124,7 +112,6 @@ export async function GET(
headers,
});
- // 204 = no active stream to resume
if (response.status === 204) {
return new Response(null, { status: 204 });
}
@@ -137,12 +124,13 @@ export async function GET(
});
}
- return new Response(response.body, {
+ if (!response.body) {
+ return new Response(null, { status: 204 });
+ }
+
+ return new Response(normalizeSSEStream(response.body), {
headers: {
- "Content-Type": "text/event-stream",
- "Cache-Control": "no-cache, no-transform",
- Connection: "keep-alive",
- "X-Accel-Buffering": "no",
+ ...SSE_HEADERS,
"x-vercel-ai-ui-message-stream": "v1",
},
});
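
For context, the removed docstring described the client side of this route: the AI SDK calls it on page load when stream resumption is enabled. A hedged sketch of that caller (option names follow the deleted comment; the exact `useChat` signature depends on the AI SDK version in use):

```ts
import { useChat } from "@ai-sdk/react";

// Sketch: with resume: true, on mount the SDK GETs
// /api/chat/sessions/{sessionId}/stream and either replays the active
// stream (200 + SSE) or receives 204 No Content and does nothing.
const chat = useChat({
  id: sessionId, // assumed to map to the route's sessionId param
  resume: true,
});
```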
diff --git a/autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts b/autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts
new file mode 100644
index 0000000000..a5c76cf872
--- /dev/null
+++ b/autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts
@@ -0,0 +1,72 @@
+export const SSE_HEADERS = {
+ "Content-Type": "text/event-stream",
+ "Cache-Control": "no-cache, no-transform",
+ Connection: "keep-alive",
+ "X-Accel-Buffering": "no",
+} as const;
+
+export function normalizeSSEStream(
+ input: ReadableStream<Uint8Array>,
+): ReadableStream<Uint8Array> {
+ const decoder = new TextDecoder();
+ const encoder = new TextEncoder();
+ let buffer = "";
+
+ return input.pipeThrough(
+ new TransformStream({
+ transform(chunk, controller) {
+ buffer += decoder.decode(chunk, { stream: true });
+
+ const parts = buffer.split("\n\n");
+ buffer = parts.pop() ?? "";
+
+ for (const part of parts) {
+ const normalized = normalizeSSEEvent(part);
+ controller.enqueue(encoder.encode(normalized + "\n\n"));
+ }
+ },
+ flush(controller) {
+ if (buffer.trim()) {
+ const normalized = normalizeSSEEvent(buffer);
+ controller.enqueue(encoder.encode(normalized + "\n\n"));
+ }
+ },
+ }),
+ );
+}
+
+function normalizeSSEEvent(event: string): string {
+ const lines = event.split("\n");
+ const dataLines: string[] = [];
+ const otherLines: string[] = [];
+
+ for (const line of lines) {
+ if (line.startsWith("data: ")) {
+ dataLines.push(line.slice(6));
+ } else {
+ otherLines.push(line);
+ }
+ }
+
+ if (dataLines.length === 0) return event;
+
+ const dataStr = dataLines.join("\n");
+ try {
+ const parsed = JSON.parse(dataStr) as Record<string, unknown>;
+ if (parsed.type === "error") {
+ const normalized = {
+ type: "error",
+ errorText:
+ typeof parsed.errorText === "string"
+ ? parsed.errorText
+ : "An unexpected error occurred",
+ };
+ const newData = `data: ${JSON.stringify(normalized)}`;
+ return [...otherLines.filter((l) => l.length > 0), newData].join("\n");
+ }
+ } catch {
+ // Not valid JSON — pass through as-is
+ }
+
+ return event;
+}
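
A quick way to see the normalization in action, e.g. in a unit test: feed one backend error frame that still carries extra fields and read back the rewritten stream (frame contents assumed for illustration):

```ts
import { normalizeSSEStream } from "./sse-helpers";

async function demo(): Promise<void> {
  const encoder = new TextEncoder();
  const input = new ReadableStream<Uint8Array>({
    start(controller) {
      // Backend StreamError before trimming: carries `code` on top of errorText.
      controller.enqueue(
        encoder.encode(
          'data: {"type":"error","errorText":"rate limited","code":429}\n\n',
        ),
      );
      controller.close();
    },
  });

  const out = await new Response(normalizeSSEStream(input)).text();
  console.log(out); // data: {"type":"error","errorText":"rate limited"}\n\n
}
```

Non-error events and non-JSON payloads pass through untouched, so the transform is safe to apply to the whole stream.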
diff --git a/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts b/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts
index 336786bfdb..238fdebb06 100644
--- a/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts
+++ b/autogpt_platform/frontend/src/app/api/chat/tasks/[taskId]/stream/route.ts
@@ -1,20 +1,8 @@
import { environment } from "@/services/environment";
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
+import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
-/**
- * SSE Proxy for task stream reconnection.
- *
- * This endpoint allows clients to reconnect to an ongoing or recently completed
- * background task's stream. It replays missed messages from Redis Streams and
- * subscribes to live updates if the task is still running.
- *
- * Client contract:
- * 1. When receiving an operation_started event, store the task_id
- * 2. To reconnect: GET /api/chat/tasks/{taskId}/stream?last_message_id={idx}
- * 3. Messages are replayed from the last_message_id position
- * 4. Stream ends when "finish" event is received
- */
export async function GET(
request: NextRequest,
{ params }: { params: Promise<{ taskId: string }> },
@@ -24,15 +12,12 @@ export async function GET(
const lastMessageId = searchParams.get("last_message_id") || "0-0";
try {
- // Get auth token from server-side session
const token = await getServerAuthToken();
- // Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(`/api/chat/tasks/${taskId}/stream`, backendUrl);
streamUrl.searchParams.set("last_message_id", lastMessageId);
- // Forward request to backend with auth header
const headers: Record<string, string> = {
Accept: "text/event-stream",
"Cache-Control": "no-cache",
@@ -56,14 +41,12 @@ export async function GET(
});
}
- // Return the SSE stream directly
- return new Response(response.body, {
- headers: {
- "Content-Type": "text/event-stream",
- "Cache-Control": "no-cache, no-transform",
- Connection: "keep-alive",
- "X-Accel-Buffering": "no",
- },
+ if (!response.body) {
+ return new Response(null, { status: 204 });
+ }
+
+ return new Response(normalizeSSEStream(response.body), {
+ headers: SSE_HEADERS,
});
} catch (error) {
console.error("Task stream proxy error:", error);