From 22298c24fdceb8f5940f144ad5ffab1dcdedf674 Mon Sep 17 00:00:00 2001
From: Lluis Agusti
Date: Tue, 16 Dec 2025 16:21:59 +0100
Subject: [PATCH] chore: add page content and url to stream message

---
 .../backend/backend/server/v2/chat/routes.py  |  68 +++++-
 .../backend/backend/server/v2/chat/service.py |  12 +-
 .../chat/sessions/[sessionId]/stream/route.ts |  87 +++++++-
 .../frontend/src/app/api/openapi.json         |  70 ++++++-
 .../ChatContainer/ChatContainer.tsx           |  18 +-
 .../ChatContainer/useChatContainer.ts         |  14 +-
 .../contextual/Chat/useChatStream.ts          | 193 +++++++++++-------
 .../contextual/Chat/usePageContext.ts         |  43 ++++
 8 files changed, 418 insertions(+), 87 deletions(-)
 create mode 100644 autogpt_platform/frontend/src/components/contextual/Chat/usePageContext.ts

diff --git a/autogpt_platform/backend/backend/server/v2/chat/routes.py b/autogpt_platform/backend/backend/server/v2/chat/routes.py
index 86bcf861c0..754bd5c1b5 100644
--- a/autogpt_platform/backend/backend/server/v2/chat/routes.py
+++ b/autogpt_platform/backend/backend/server/v2/chat/routes.py
@@ -5,7 +5,7 @@ from collections.abc import AsyncGenerator
 from typing import Annotated
 
 from autogpt_libs import auth
-from fastapi import APIRouter, Depends, Query, Security
+from fastapi import APIRouter, Depends, Query, Request, Security
 from fastapi.responses import StreamingResponse
 from pydantic import BaseModel
 
@@ -25,6 +25,13 @@ router = APIRouter(
 # ========== Request/Response Models ==========
 
 
+class StreamChatRequest(BaseModel):
+    """Request model for streaming chat with optional context."""
+    message: str
+    is_user_message: bool = True
+    context: dict[str, str] | None = None  # {url: str, content: str}
+
+
 class CreateSessionResponse(BaseModel):
     """Response model containing information on a newly created chat session."""
 
@@ -110,17 +117,72 @@ async def get_session(
     )
 
 
+@router.post(
+    "/sessions/{session_id}/stream",
+)
+async def stream_chat_post(
+    session_id: str,
+    request: StreamChatRequest,
+    user_id: str | None = Depends(auth.get_user_id),
+):
+    """
+    Stream chat responses for a session (POST with context support).
+
+    Streams the AI/completion responses in real time over Server-Sent Events (SSE), including:
+    - Text fragments as they are generated
+    - Tool call UI elements (if invoked)
+    - Tool execution results
+
+    Args:
+        session_id: The chat session identifier to associate with the streamed messages.
+        request: Request body containing message, is_user_message, and optional context.
+        user_id: Optional authenticated user ID.
+    Returns:
+        StreamingResponse: SSE-formatted response chunks.
+    """
+    # Validate session exists before starting the stream
+    # This prevents errors after the response has already started
+    session = await chat_service.get_session(session_id, user_id)
+
+    if not session:
+        raise NotFoundError(f"Session {session_id} not found.")
") + if session.user_id is None and user_id is not None: + session = await chat_service.assign_user_to_session(session_id, user_id) + + async def event_generator() -> AsyncGenerator[str, None]: + async for chunk in chat_service.stream_chat_completion( + session_id, + request.message, + is_user_message=request.is_user_message, + user_id=user_id, + session=session, # Pass pre-fetched session to avoid double-fetch + context=request.context, + ): + yield chunk.to_sse() + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + }, + ) + + @router.get( "/sessions/{session_id}/stream", ) -async def stream_chat( +async def stream_chat_get( session_id: str, message: Annotated[str, Query(min_length=1, max_length=10000)], user_id: str | None = Depends(auth.get_user_id), is_user_message: bool = Query(default=True), ): """ - Stream chat responses for a session. + Stream chat responses for a session (GET - legacy endpoint). Streams the AI/completion responses in real time over Server-Sent Events (SSE), including: - Text fragments as they are generated diff --git a/autogpt_platform/backend/backend/server/v2/chat/service.py b/autogpt_platform/backend/backend/server/v2/chat/service.py index 4328deb016..a673ca7a31 100644 --- a/autogpt_platform/backend/backend/server/v2/chat/service.py +++ b/autogpt_platform/backend/backend/server/v2/chat/service.py @@ -77,6 +77,7 @@ async def stream_chat_completion( user_id: str | None = None, retry_count: int = 0, session: ChatSession | None = None, + context: dict[str, str] | None = None, # {url: str, content: str} ) -> AsyncGenerator[StreamBaseResponse, None]: """Main entry point for streaming chat completions with database handling. @@ -120,9 +121,18 @@ async def stream_chat_completion( ) if message: + # Build message content with context if provided + message_content = message + if context and context.get("url") and context.get("content"): + context_text = f"Page URL: {context['url']}\n\nPage Content:\n{context['content']}\n\n---\n\nUser Message: {message}" + message_content = context_text + logger.info( + f"Including page context: URL={context['url']}, content_length={len(context['content'])}" + ) + session.messages.append( ChatMessage( - role="user" if is_user_message else "assistant", content=message + role="user" if is_user_message else "assistant", content=message_content ) ) logger.info( diff --git a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts index ca33483587..d63eed0ca2 100644 --- a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts +++ b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts @@ -4,8 +4,91 @@ import { NextRequest } from "next/server"; /** * SSE Proxy for chat streaming. - * EventSource doesn't support custom headers, so we need a server-side proxy - * that adds authentication and forwards the SSE stream to the client. + * Supports POST with context (page content + URL) in the request body. 
+ */ +export async function POST( + request: NextRequest, + { params }: { params: Promise<{ sessionId: string }> }, +) { + const { sessionId } = await params; + + try { + const body = await request.json(); + const { message, is_user_message, context } = body; + + if (!message) { + return new Response( + JSON.stringify({ error: "Missing message parameter" }), + { status: 400, headers: { "Content-Type": "application/json" } }, + ); + } + + // Get auth token from server-side session + const token = await getServerAuthToken(); + + // Build backend URL + const backendUrl = environment.getAGPTServerBaseUrl(); + const streamUrl = new URL( + `/api/chat/sessions/${sessionId}/stream`, + backendUrl, + ); + + // Forward request to backend with auth header + const headers: Record = { + "Content-Type": "application/json", + Accept: "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }; + + if (token) { + headers["Authorization"] = `Bearer ${token}`; + } + + const response = await fetch(streamUrl.toString(), { + method: "POST", + headers, + body: JSON.stringify({ + message, + is_user_message: is_user_message ?? true, + context: context || null, + }), + }); + + if (!response.ok) { + const error = await response.text(); + return new Response(error, { + status: response.status, + headers: { "Content-Type": "application/json" }, + }); + } + + // Return the SSE stream directly + return new Response(response.body, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }, + }); + } catch (error) { + console.error("SSE proxy error:", error); + return new Response( + JSON.stringify({ + error: "Failed to connect to chat service", + detail: error instanceof Error ? 
+      }),
+      {
+        status: 500,
+        headers: { "Content-Type": "application/json" },
+      },
+    );
+  }
+}
+
+/**
+ * Legacy GET endpoint for backward compatibility
+ */
 export async function GET(
   request: NextRequest,
diff --git a/autogpt_platform/frontend/src/app/api/openapi.json b/autogpt_platform/frontend/src/app/api/openapi.json
index f8c5563476..d3f1ae4f88 100644
--- a/autogpt_platform/frontend/src/app/api/openapi.json
+++ b/autogpt_platform/frontend/src/app/api/openapi.json
@@ -5252,11 +5252,51 @@
       }
     },
     "/api/chat/sessions/{session_id}/stream": {
+      "post": {
+        "tags": ["v2", "chat", "chat"],
+        "summary": "Stream Chat Post",
+        "description": "Stream chat responses for a session (POST with context support).\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n request: Request body containing message, is_user_message, and optional context.\n user_id: Optional authenticated user ID.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
+        "operationId": "postV2StreamChatPost",
+        "security": [{ "HTTPBearerJWT": [] }],
+        "parameters": [
+          {
+            "name": "session_id",
+            "in": "path",
+            "required": true,
+            "schema": { "type": "string", "title": "Session Id" }
+          }
+        ],
+        "requestBody": {
+          "required": true,
+          "content": {
+            "application/json": {
+              "schema": { "$ref": "#/components/schemas/StreamChatRequest" }
+            }
+          }
+        },
+        "responses": {
+          "200": {
+            "description": "Successful Response",
+            "content": { "application/json": { "schema": {} } }
+          },
+          "422": {
+            "description": "Validation Error",
+            "content": {
+              "application/json": {
+                "schema": { "$ref": "#/components/schemas/HTTPValidationError" }
+              }
+            }
+          },
+          "401": {
+            "$ref": "#/components/responses/HTTP401NotAuthenticatedError"
+          }
+        }
+      },
       "get": {
         "tags": ["v2", "chat", "chat"],
-        "summary": "Stream Chat",
-        "description": "Stream chat responses for a session.\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
-        "operationId": "getV2StreamChat",
+        "summary": "Stream Chat Get",
+        "description": "Stream chat responses for a session (GET - legacy endpoint).\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
+        "operationId": "getV2StreamChatGet",
         "security": [{ "HTTPBearerJWT": [] }],
         "parameters": [
           {
@@ -9236,6 +9276,30 @@
       "required": ["submissions", "pagination"],
       "title": "StoreSubmissionsResponse"
     },
+    "StreamChatRequest": {
+      "properties": {
+        "message": { "type": "string", "title": "Message" },
"Message" }, + "is_user_message": { + "type": "boolean", + "title": "Is User Message", + "default": true + }, + "context": { + "anyOf": [ + { + "additionalProperties": { "type": "string" }, + "type": "object" + }, + { "type": "null" } + ], + "title": "Context" + } + }, + "type": "object", + "required": ["message"], + "title": "StreamChatRequest", + "description": "Request model for streaming chat with optional context." + }, "SubmissionStatus": { "type": "string", "enum": ["DRAFT", "PENDING", "APPROVED", "REJECTED"], diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx index 1eee26ee09..eae822f37f 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx @@ -1,5 +1,7 @@ import type { SessionDetailResponse } from "@/app/api/__generated__/models/sessionDetailResponse"; import { cn } from "@/lib/utils"; +import { useCallback } from "react"; +import { usePageContext } from "../../usePageContext"; import { ChatInput } from "../ChatInput/ChatInput"; import { MessageList } from "../MessageList/MessageList"; import { QuickActionsWelcome } from "../QuickActionsWelcome/QuickActionsWelcome"; @@ -24,6 +26,16 @@ export function ChatContainer({ initialMessages, onRefreshSession, }); + const { capturePageContext } = usePageContext(); + + // Wrap sendMessage to automatically capture page context + const sendMessageWithContext = useCallback( + async (content: string, isUserMessage: boolean = true) => { + const context = capturePageContext(); + await sendMessage(content, isUserMessage, context); + }, + [sendMessage, capturePageContext], + ); const quickActions = [ "Find agents for social media management", @@ -40,7 +52,7 @@ export function ChatContainer({ title="Welcome to AutoGPT Chat" description="Start a conversation to discover and run AI agents." actions={quickActions} - onActionClick={sendMessage} + onActionClick={sendMessageWithContext} disabled={isStreaming || !sessionId} /> ) : ( @@ -48,7 +60,7 @@ export function ChatContainer({ messages={messages} streamingChunks={streamingChunks} isStreaming={isStreaming} - onSendMessage={sendMessage} + onSendMessage={sendMessageWithContext} className="flex-1" /> )} @@ -56,7 +68,7 @@ export function ChatContainer({ {/* Input - Always visible */}
(null);
-  const eventSourceRef = useRef<EventSource | null>(null);
   const retryCountRef = useRef(0);
   const retryTimeoutRef = useRef<NodeJS.Timeout | null>(null);
   const abortControllerRef = useRef<AbortController | null>(null);
@@ -54,10 +53,6 @@ export function useChatStream() {
       abortControllerRef.current.abort();
       abortControllerRef.current = null;
     }
-    if (eventSourceRef.current) {
-      eventSourceRef.current.close();
-      eventSourceRef.current = null;
-    }
     if (retryTimeoutRef.current) {
       clearTimeout(retryTimeoutRef.current);
       retryTimeoutRef.current = null;
@@ -78,6 +73,7 @@
       message: string,
       onChunk: (chunk: StreamChunk) => void,
       isUserMessage: boolean = true,
+      context?: { url: string; content: string },
     ) => {
       stopStreaming();
 
@@ -93,96 +89,147 @@
      setError(null);

      try {
-        const url = `/api/chat/sessions/${sessionId}/stream?message=${encodeURIComponent(
+        const url = `/api/chat/sessions/${sessionId}/stream`;
+        const body = JSON.stringify({
           message,
-        )}&is_user_message=${isUserMessage}`;
-
-        const eventSource = new EventSource(url);
-        eventSourceRef.current = eventSource;
-
-        abortController.signal.addEventListener("abort", () => {
-          eventSource.close();
-          eventSourceRef.current = null;
+          is_user_message: isUserMessage,
+          context: context || null,
         });
 
+        const response = await fetch(url, {
+          method: "POST",
+          headers: {
+            "Content-Type": "application/json",
+            Accept: "text/event-stream",
+          },
+          body,
+          signal: abortController.signal,
+        });
+
+        if (!response.ok) {
+          const errorText = await response.text();
+          throw new Error(errorText || `HTTP ${response.status}`);
+        }
+
+        if (!response.body) {
+          throw new Error("Response body is null");
+        }
+
+        const reader = response.body.getReader();
+        const decoder = new TextDecoder();
+        let buffer = "";
+
         return new Promise<void>((resolve, reject) => {
           const cleanup = () => {
-            eventSource.removeEventListener("message", messageHandler);
-            eventSource.removeEventListener("error", errorHandler);
+            reader.cancel().catch(() => {
+              // Ignore cancel errors
+            });
           };
 
-          const messageHandler = (event: MessageEvent) => {
+          const readStream = async () => {
             try {
-              const chunk = JSON.parse(event.data) as StreamChunk;
+              while (true) {
+                const { done, value } = await reader.read();
 
-              if (retryCountRef.current > 0) {
-                retryCountRef.current = 0;
-              }
+                if (done) {
+                  cleanup();
+                  stopStreaming();
+                  resolve();
+                  return;
+                }
 
-              // Call the chunk handler
-              onChunk(chunk);
+                buffer += decoder.decode(value, { stream: true });
+                const lines = buffer.split("\n");
+                buffer = lines.pop() || "";
 
-              // Handle stream lifecycle
-              if (chunk.type === "stream_end") {
-                cleanup();
-                stopStreaming();
-                resolve();
-              } else if (chunk.type === "error") {
-                cleanup();
-                reject(
-                  new Error(chunk.message || chunk.content || "Stream error"),
-                );
+                for (const line of lines) {
+                  if (line.startsWith("data: ")) {
+                    const data = line.slice(6);
+                    if (data === "[DONE]") {
+                      cleanup();
+                      stopStreaming();
+                      resolve();
+                      return;
+                    }
+
+                    try {
+                      const chunk = JSON.parse(data) as StreamChunk;
+
+                      if (retryCountRef.current > 0) {
+                        retryCountRef.current = 0;
+                      }
+
+                      // Call the chunk handler
+                      onChunk(chunk);
+
+                      // Handle stream lifecycle
+                      if (chunk.type === "stream_end") {
+                        cleanup();
+                        stopStreaming();
+                        resolve();
+                        return;
+                      } else if (chunk.type === "error") {
+                        cleanup();
+                        reject(
+                          new Error(
+                            chunk.message || chunk.content || "Stream error",
+                          ),
+                        );
+                        return;
+                      }
+                    } catch (err) {
+                      // Skip invalid JSON lines
+                      console.warn("Failed to parse SSE chunk:", err, data);
+                    }
+                  }
+                }
+              }
             } catch (err) {
-              const parseError =
+              if (err instanceof Error && err.name === "AbortError") {
+                cleanup();
+                return;
+              }
+
+              const streamError =
                 err instanceof Error
                   ? err
-                  : new Error("Failed to parse stream chunk");
-              setError(parseError);
-              cleanup();
-              reject(parseError);
-            }
-          };
+                  : new Error("Failed to read stream");
 
-          const errorHandler = () => {
-            if (eventSourceRef.current) {
-              eventSourceRef.current.close();
-              eventSourceRef.current = null;
-            }
+              if (retryCountRef.current < MAX_RETRIES) {
+                retryCountRef.current += 1;
+                const retryDelay =
+                  INITIAL_RETRY_DELAY *
+                  Math.pow(2, retryCountRef.current - 1);
 
-            if (retryCountRef.current < MAX_RETRIES) {
-              retryCountRef.current += 1;
-              const retryDelay =
-                INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current - 1);
+                toast.info("Connection interrupted", {
+                  description: `Retrying in ${retryDelay / 1000} seconds...`,
+                });
 
-              toast.info("Connection interrupted", {
-                description: `Retrying in ${retryDelay / 1000} seconds...`,
-              });
-
-              retryTimeoutRef.current = setTimeout(() => {
-                sendMessage(sessionId, message, onChunk, isUserMessage).catch(
-                  (_err) => {
+                retryTimeoutRef.current = setTimeout(() => {
+                  sendMessage(
+                    sessionId,
+                    message,
+                    onChunk,
+                    isUserMessage,
+                    context,
+                  ).catch((_err) => {
                     // Retry failed
-                  },
-                );
-              }, retryDelay);
-            } else {
-              const streamError = new Error(
-                "Stream connection failed after multiple retries",
-              );
-              setError(streamError);
-              toast.error("Connection Failed", {
-                description:
-                  "Unable to connect to chat service. Please try again.",
-              });
-              cleanup();
-              stopStreaming();
-              reject(streamError);
+                  });
+                }, retryDelay);
+              } else {
+                setError(streamError);
+                toast.error("Connection Failed", {
+                  description:
+                    "Unable to connect to chat service. Please try again.",
+                });
+                cleanup();
+                stopStreaming();
+                reject(streamError);
+              }
             }
           };
 
-          eventSource.addEventListener("message", messageHandler);
-          eventSource.addEventListener("error", errorHandler);
+          readStream();
         });
       } catch (err) {
         const streamError =
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/usePageContext.ts b/autogpt_platform/frontend/src/components/contextual/Chat/usePageContext.ts
new file mode 100644
index 0000000000..616106dcaf
--- /dev/null
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/usePageContext.ts
@@ -0,0 +1,43 @@
+import { useCallback } from "react";
+
+export interface PageContext {
+  url: string;
+  content: string;
+}
+
+/**
+ * Hook to capture the current page context (URL + full page content)
+ */
+export function usePageContext() {
+  const capturePageContext = useCallback((): PageContext => {
+    if (typeof window === "undefined") {
+      return { url: "", content: "" };
+    }
+
+    const url = window.location.href;
+
+    // Capture full page text content
+    // Remove script and style elements, then get text
+    const clone = document.cloneNode(true) as Document;
+    const scripts = clone.querySelectorAll("script, style, noscript");
+    scripts.forEach((el) => el.remove());
+
+    // Get text content from body
+    const body = clone.body;
+    const content = body?.textContent || body?.innerText || "";
+
+    // Clean up whitespace
+    const cleanedContent = content
+      .replace(/\s+/g, " ")
+      .replace(/\n\s*\n/g, "\n")
+      .trim();
+
+    return {
+      url,
+      content: cleanedContent,
+    };
+  }, []);
+
+  return { capturePageContext };
+}
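
A minimal consumer sketch for the POST streaming endpoint this patch adds, handy when exercising the route by hand. The path, the request body shape (message, is_user_message, context), the "data: " SSE framing, and the "[DONE]" / "stream_end" / "error" signals are taken from the diff above; BASE_URL, the function name, and the reduced StreamChunk type (only the fields the hook inspects) are illustrative assumptions, not part of the patch.

// streamExample.ts: hypothetical standalone client for the POST SSE endpoint.
interface StreamChunk {
  type: string; // e.g. "stream_end" or "error"; other values carry content
  content?: string;
  message?: string;
}

const BASE_URL = "http://localhost:3000"; // assumed Next.js proxy host

export async function streamWithPageContext(
  sessionId: string,
  message: string,
  context: { url: string; content: string } | null,
  onChunk: (chunk: StreamChunk) => void,
): Promise<void> {
  const response = await fetch(
    `${BASE_URL}/api/chat/sessions/${sessionId}/stream`,
    {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "text/event-stream",
      },
      body: JSON.stringify({ message, is_user_message: true, context }),
    },
  );

  if (!response.ok || !response.body) {
    throw new Error(`Stream request failed: HTTP ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) return;

    // SSE frames are newline-delimited "data: <json>" lines; a frame can be
    // split across network reads, so keep the trailing partial line buffered.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const data = line.slice(6);
      if (data === "[DONE]") return;

      const chunk = JSON.parse(data) as StreamChunk;
      if (chunk.type === "error") {
        throw new Error(chunk.message || chunk.content || "Stream error");
      }
      if (chunk.type === "stream_end") return;
      onChunk(chunk);
    }
  }
}

When both context.url and context.content are present, the backend (service.py above) stores the user turn as "Page URL: <url>", then "Page Content:" with the captured text, a "---" separator, and finally "User Message: <message>", so the model sees the page alongside the question.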