mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-11 15:25:16 -05:00
fix(frontend): API stream issues leaking into prompt (#12063)
## Changes 🏗️ <img width="800" height="621" alt="Screenshot 2026-02-11 at 19 32 39" src="https://github.com/user-attachments/assets/e97be1a7-972e-4ae0-8dfa-6ade63cf287b" /> When the BE API has an error, prevent it from leaking into the stream and instead handle it gracefully via toast. ## Checklist 📋 ### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] Run the app locally and trust the changes <!-- greptile_comment --> <h2>Greptile Overview</h2> <details><summary><h3>Greptile Summary</h3></summary> This PR fixes an issue where backend API stream errors were leaking into the chat prompt instead of being handled gracefully. The fix involves both backend and frontend changes to ensure error events conform to the AI SDK's strict schema. **Key Changes:** - **Backend (`response_model.py`)**: Added custom `to_sse()` method for `StreamError` that only emits `type` and `errorText` fields, stripping extra fields like `code` and `details` that cause AI SDK validation failures - **Backend (`prompt.py`)**: Added validation step after context compression to remove orphaned tool responses without matching tool calls, preventing "unexpected tool_use_id" API errors - **Frontend (`route.ts`)**: Implemented SSE stream normalization with `normalizeSSEStream()` and `normalizeSSEEvent()` functions to strip non-conforming fields from error events before they reach the AI SDK - **Frontend (`ChatMessagesContainer.tsx`)**: Added toast notifications for errors and improved error display UI with deduplication logic The changes ensure a clean separation between internal error metadata (useful for logging/debugging) and the strict schema required by the AI SDK on the frontend. </details> <details><summary><h3>Confidence Score: 4/5</h3></summary> - This PR is safe to merge with low risk - The changes are well-structured and address a specific bug with proper error handling. The dual-layer approach (backend filtering in `to_sse()` + frontend normalization) provides defense-in-depth. However, the lack of automated tests for the new error normalization logic and the potential for edge cases in SSE parsing prevent a perfect score. - Pay close attention to `autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts` - the SSE normalization logic should be tested with various error scenarios </details> <details><summary><h3>Sequence Diagram</h3></summary> ```mermaid sequenceDiagram participant User participant Frontend as ChatMessagesContainer participant Proxy as /api/chat/.../stream participant Backend as Backend API participant AISDK as AI SDK User->>Frontend: Send message Frontend->>Proxy: POST with message Proxy->>Backend: Forward request with auth Backend->>Backend: Process message alt Success Path Backend->>Proxy: SSE stream (text-delta, etc.) Proxy->>Proxy: normalizeSSEStream (pass through) Proxy->>AISDK: Forward SSE events AISDK->>Frontend: Update messages Frontend->>User: Display response else Error Path Backend->>Backend: StreamError.to_sse() Note over Backend: Only emit {type, errorText} Backend->>Proxy: SSE error event Proxy->>Proxy: normalizeSSEEvent() Note over Proxy: Strip extra fields (code, details) Proxy->>AISDK: {type: "error", errorText: "..."} AISDK->>Frontend: error state updated Frontend->>Frontend: Toast notification (deduplicated) Frontend->>User: Show error UI + toast end ``` </details> <!-- greptile_other_comments_section --> <!-- /greptile_comment --> --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: Otto-AGPT <otto@agpt.co>
This commit is contained in:
@@ -10,6 +10,8 @@ from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from backend.util.json import dumps as json_dumps
|
||||
|
||||
|
||||
class ResponseType(str, Enum):
|
||||
"""Types of streaming responses following AI SDK protocol."""
|
||||
@@ -193,6 +195,18 @@ class StreamError(StreamBaseResponse):
|
||||
default=None, description="Additional error details"
|
||||
)
|
||||
|
||||
def to_sse(self) -> str:
|
||||
"""Convert to SSE format, only emitting fields required by AI SDK protocol.
|
||||
|
||||
The AI SDK uses z.strictObject({type, errorText}) which rejects
|
||||
any extra fields like `code` or `details`.
|
||||
"""
|
||||
data = {
|
||||
"type": self.type.value,
|
||||
"errorText": self.errorText,
|
||||
}
|
||||
return f"data: {json_dumps(data)}\n\n"
|
||||
|
||||
|
||||
class StreamHeartbeat(StreamBaseResponse):
|
||||
"""Heartbeat to keep SSE connection alive during long-running operations.
|
||||
|
||||
@@ -364,6 +364,44 @@ def _remove_orphan_tool_responses(
|
||||
return result
|
||||
|
||||
|
||||
def validate_and_remove_orphan_tool_responses(
|
||||
messages: list[dict],
|
||||
log_warning: bool = True,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Validate tool_call/tool_response pairs and remove orphaned responses.
|
||||
|
||||
Scans messages in order, tracking all tool_call IDs. Any tool response
|
||||
referencing an ID not seen in a preceding message is considered orphaned
|
||||
and removed. This prevents API errors like Anthropic's "unexpected tool_use_id".
|
||||
|
||||
Args:
|
||||
messages: List of messages to validate (OpenAI or Anthropic format)
|
||||
log_warning: Whether to log a warning when orphans are found
|
||||
|
||||
Returns:
|
||||
A new list with orphaned tool responses removed
|
||||
"""
|
||||
available_ids: set[str] = set()
|
||||
orphan_ids: set[str] = set()
|
||||
|
||||
for msg in messages:
|
||||
available_ids |= _extract_tool_call_ids_from_message(msg)
|
||||
for resp_id in _extract_tool_response_ids_from_message(msg):
|
||||
if resp_id not in available_ids:
|
||||
orphan_ids.add(resp_id)
|
||||
|
||||
if not orphan_ids:
|
||||
return messages
|
||||
|
||||
if log_warning:
|
||||
logger.warning(
|
||||
f"Removing {len(orphan_ids)} orphan tool response(s): {orphan_ids}"
|
||||
)
|
||||
|
||||
return _remove_orphan_tool_responses(messages, orphan_ids)
|
||||
|
||||
|
||||
def _ensure_tool_pairs_intact(
|
||||
recent_messages: list[dict],
|
||||
all_messages: list[dict],
|
||||
@@ -723,6 +761,13 @@ async def compress_context(
|
||||
|
||||
# Filter out any None values that may have been introduced
|
||||
final_msgs: list[dict] = [m for m in msgs if m is not None]
|
||||
|
||||
# ---- STEP 6: Final tool-pair validation ---------------------------------
|
||||
# After all compression steps, verify that every tool response has a
|
||||
# matching tool_call in a preceding assistant message. Remove orphans
|
||||
# to prevent API errors (e.g., Anthropic's "unexpected tool_use_id").
|
||||
final_msgs = validate_and_remove_orphan_tool_responses(final_msgs)
|
||||
|
||||
final_count = sum(_msg_tokens(m, enc) for m in final_msgs)
|
||||
error = None
|
||||
if final_count + reserve > target_tokens:
|
||||
|
||||
@@ -10,8 +10,9 @@ import {
|
||||
MessageResponse,
|
||||
} from "@/components/ai-elements/message";
|
||||
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
|
||||
import { toast } from "@/components/molecules/Toast/use-toast";
|
||||
import { ToolUIPart, UIDataTypes, UIMessage, UITools } from "ai";
|
||||
import { useEffect, useState } from "react";
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import { CreateAgentTool } from "../../tools/CreateAgent/CreateAgent";
|
||||
import { EditAgentTool } from "../../tools/EditAgent/EditAgent";
|
||||
import { FindAgentsTool } from "../../tools/FindAgents/FindAgents";
|
||||
@@ -121,6 +122,7 @@ export const ChatMessagesContainer = ({
|
||||
isLoading,
|
||||
}: ChatMessagesContainerProps) => {
|
||||
const [thinkingPhrase, setThinkingPhrase] = useState(getRandomPhrase);
|
||||
const lastToastTimeRef = useRef(0);
|
||||
|
||||
useEffect(() => {
|
||||
if (status === "submitted") {
|
||||
@@ -128,6 +130,20 @@ export const ChatMessagesContainer = ({
|
||||
}
|
||||
}, [status]);
|
||||
|
||||
// Show a toast when a new error occurs, debounced to avoid spam
|
||||
useEffect(() => {
|
||||
if (!error) return;
|
||||
const now = Date.now();
|
||||
if (now - lastToastTimeRef.current < 3_000) return;
|
||||
lastToastTimeRef.current = now;
|
||||
toast({
|
||||
variant: "destructive",
|
||||
title: "Something went wrong",
|
||||
description:
|
||||
"The assistant encountered an error. Please try sending your message again.",
|
||||
});
|
||||
}, [error]);
|
||||
|
||||
const lastMessage = messages[messages.length - 1];
|
||||
const lastAssistantHasVisibleContent =
|
||||
lastMessage?.role === "assistant" &&
|
||||
@@ -263,8 +279,12 @@ export const ChatMessagesContainer = ({
|
||||
</Message>
|
||||
)}
|
||||
{error && (
|
||||
<div className="rounded-lg bg-red-50 p-3 text-red-600">
|
||||
Error: {error.message}
|
||||
<div className="rounded-lg bg-red-50 p-4 text-sm text-red-700">
|
||||
<p className="font-medium">Something went wrong</p>
|
||||
<p className="mt-1 text-red-600">
|
||||
The assistant encountered an error. Please try sending your
|
||||
message again.
|
||||
</p>
|
||||
</div>
|
||||
)}
|
||||
</ConversationContent>
|
||||
|
||||
@@ -4,7 +4,6 @@ import { WarningDiamondIcon } from "@phosphor-icons/react";
|
||||
import type { ToolUIPart } from "ai";
|
||||
import { useCopilotChatActions } from "../../components/CopilotChatActionsProvider/useCopilotChatActions";
|
||||
import { MorphingTextAnimation } from "../../components/MorphingTextAnimation/MorphingTextAnimation";
|
||||
import { OrbitLoader } from "../../components/OrbitLoader/OrbitLoader";
|
||||
import { ProgressBar } from "../../components/ProgressBar/ProgressBar";
|
||||
import {
|
||||
ContentCardDescription,
|
||||
@@ -77,7 +76,7 @@ function getAccordionMeta(output: CreateAgentToolOutput) {
|
||||
isOperationInProgressOutput(output)
|
||||
) {
|
||||
return {
|
||||
icon: <OrbitLoader size={32} />,
|
||||
icon,
|
||||
title: "Creating agent, this may take a few minutes. Sit back and relax.",
|
||||
};
|
||||
}
|
||||
|
||||
@@ -203,7 +203,7 @@ export function getAccordionMeta(output: RunAgentToolOutput): {
|
||||
? output.status.trim()
|
||||
: "started";
|
||||
return {
|
||||
icon: <OrbitLoader size={28} className="text-neutral-700" />,
|
||||
icon,
|
||||
title: output.graph_name,
|
||||
description: `Status: ${statusText}`,
|
||||
};
|
||||
|
||||
@@ -149,7 +149,7 @@ export function getAccordionMeta(output: RunBlockToolOutput): {
|
||||
if (isRunBlockBlockOutput(output)) {
|
||||
const keys = Object.keys(output.outputs ?? {});
|
||||
return {
|
||||
icon: <OrbitLoader size={24} className="text-neutral-700" />,
|
||||
icon,
|
||||
title: output.block_name,
|
||||
description:
|
||||
keys.length > 0
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
import { environment } from "@/services/environment";
|
||||
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
|
||||
import { NextRequest } from "next/server";
|
||||
import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
|
||||
|
||||
/**
|
||||
* SSE Proxy for chat streaming.
|
||||
* Supports POST with context (page content + URL) in the request body.
|
||||
*/
|
||||
export async function POST(
|
||||
request: NextRequest,
|
||||
{ params }: { params: Promise<{ sessionId: string }> },
|
||||
@@ -23,17 +20,14 @@ export async function POST(
|
||||
);
|
||||
}
|
||||
|
||||
// Get auth token from server-side session
|
||||
const token = await getServerAuthToken();
|
||||
|
||||
// Build backend URL
|
||||
const backendUrl = environment.getAGPTServerBaseUrl();
|
||||
const streamUrl = new URL(
|
||||
`/api/chat/sessions/${sessionId}/stream`,
|
||||
backendUrl,
|
||||
);
|
||||
|
||||
// Forward request to backend with auth header
|
||||
const headers: Record<string, string> = {
|
||||
"Content-Type": "application/json",
|
||||
Accept: "text/event-stream",
|
||||
@@ -63,14 +57,15 @@ export async function POST(
|
||||
});
|
||||
}
|
||||
|
||||
// Return the SSE stream directly
|
||||
return new Response(response.body, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
if (!response.body) {
|
||||
return new Response(
|
||||
JSON.stringify({ error: "Empty response from chat service" }),
|
||||
{ status: 502, headers: { "Content-Type": "application/json" } },
|
||||
);
|
||||
}
|
||||
|
||||
return new Response(normalizeSSEStream(response.body), {
|
||||
headers: SSE_HEADERS,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("SSE proxy error:", error);
|
||||
@@ -87,13 +82,6 @@ export async function POST(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume an active stream for a session.
|
||||
*
|
||||
* Called by the AI SDK's `useChat(resume: true)` on page load.
|
||||
* Proxies to the backend which checks for an active stream and either
|
||||
* replays it (200 + SSE) or returns 204 No Content.
|
||||
*/
|
||||
export async function GET(
|
||||
_request: NextRequest,
|
||||
{ params }: { params: Promise<{ sessionId: string }> },
|
||||
@@ -124,7 +112,6 @@ export async function GET(
|
||||
headers,
|
||||
});
|
||||
|
||||
// 204 = no active stream to resume
|
||||
if (response.status === 204) {
|
||||
return new Response(null, { status: 204 });
|
||||
}
|
||||
@@ -137,12 +124,13 @@ export async function GET(
|
||||
});
|
||||
}
|
||||
|
||||
return new Response(response.body, {
|
||||
if (!response.body) {
|
||||
return new Response(null, { status: 204 });
|
||||
}
|
||||
|
||||
return new Response(normalizeSSEStream(response.body), {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
...SSE_HEADERS,
|
||||
"x-vercel-ai-ui-message-stream": "v1",
|
||||
},
|
||||
});
|
||||
|
||||
72
autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts
Normal file
72
autogpt_platform/frontend/src/app/api/chat/sse-helpers.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
export const SSE_HEADERS = {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
} as const;
|
||||
|
||||
export function normalizeSSEStream(
|
||||
input: ReadableStream<Uint8Array>,
|
||||
): ReadableStream<Uint8Array> {
|
||||
const decoder = new TextDecoder();
|
||||
const encoder = new TextEncoder();
|
||||
let buffer = "";
|
||||
|
||||
return input.pipeThrough(
|
||||
new TransformStream<Uint8Array, Uint8Array>({
|
||||
transform(chunk, controller) {
|
||||
buffer += decoder.decode(chunk, { stream: true });
|
||||
|
||||
const parts = buffer.split("\n\n");
|
||||
buffer = parts.pop() ?? "";
|
||||
|
||||
for (const part of parts) {
|
||||
const normalized = normalizeSSEEvent(part);
|
||||
controller.enqueue(encoder.encode(normalized + "\n\n"));
|
||||
}
|
||||
},
|
||||
flush(controller) {
|
||||
if (buffer.trim()) {
|
||||
const normalized = normalizeSSEEvent(buffer);
|
||||
controller.enqueue(encoder.encode(normalized + "\n\n"));
|
||||
}
|
||||
},
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeSSEEvent(event: string): string {
|
||||
const lines = event.split("\n");
|
||||
const dataLines: string[] = [];
|
||||
const otherLines: string[] = [];
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.startsWith("data: ")) {
|
||||
dataLines.push(line.slice(6));
|
||||
} else {
|
||||
otherLines.push(line);
|
||||
}
|
||||
}
|
||||
|
||||
if (dataLines.length === 0) return event;
|
||||
|
||||
const dataStr = dataLines.join("\n");
|
||||
try {
|
||||
const parsed = JSON.parse(dataStr) as Record<string, unknown>;
|
||||
if (parsed.type === "error") {
|
||||
const normalized = {
|
||||
type: "error",
|
||||
errorText:
|
||||
typeof parsed.errorText === "string"
|
||||
? parsed.errorText
|
||||
: "An unexpected error occurred",
|
||||
};
|
||||
const newData = `data: ${JSON.stringify(normalized)}`;
|
||||
return [...otherLines.filter((l) => l.length > 0), newData].join("\n");
|
||||
}
|
||||
} catch {
|
||||
// Not valid JSON — pass through as-is
|
||||
}
|
||||
|
||||
return event;
|
||||
}
|
||||
@@ -1,20 +1,8 @@
|
||||
import { environment } from "@/services/environment";
|
||||
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
|
||||
import { NextRequest } from "next/server";
|
||||
import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
|
||||
|
||||
/**
|
||||
* SSE Proxy for task stream reconnection.
|
||||
*
|
||||
* This endpoint allows clients to reconnect to an ongoing or recently completed
|
||||
* background task's stream. It replays missed messages from Redis Streams and
|
||||
* subscribes to live updates if the task is still running.
|
||||
*
|
||||
* Client contract:
|
||||
* 1. When receiving an operation_started event, store the task_id
|
||||
* 2. To reconnect: GET /api/chat/tasks/{taskId}/stream?last_message_id={idx}
|
||||
* 3. Messages are replayed from the last_message_id position
|
||||
* 4. Stream ends when "finish" event is received
|
||||
*/
|
||||
export async function GET(
|
||||
request: NextRequest,
|
||||
{ params }: { params: Promise<{ taskId: string }> },
|
||||
@@ -24,15 +12,12 @@ export async function GET(
|
||||
const lastMessageId = searchParams.get("last_message_id") || "0-0";
|
||||
|
||||
try {
|
||||
// Get auth token from server-side session
|
||||
const token = await getServerAuthToken();
|
||||
|
||||
// Build backend URL
|
||||
const backendUrl = environment.getAGPTServerBaseUrl();
|
||||
const streamUrl = new URL(`/api/chat/tasks/${taskId}/stream`, backendUrl);
|
||||
streamUrl.searchParams.set("last_message_id", lastMessageId);
|
||||
|
||||
// Forward request to backend with auth header
|
||||
const headers: Record<string, string> = {
|
||||
Accept: "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
@@ -56,14 +41,12 @@ export async function GET(
|
||||
});
|
||||
}
|
||||
|
||||
// Return the SSE stream directly
|
||||
return new Response(response.body, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
if (!response.body) {
|
||||
return new Response(null, { status: 204 });
|
||||
}
|
||||
|
||||
return new Response(normalizeSSEStream(response.body), {
|
||||
headers: SSE_HEADERS,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("Task stream proxy error:", error);
|
||||
|
||||
Reference in New Issue
Block a user