Merge branch 'hackathon/copilot' of github.com:Significant-Gravitas/AutoGPT into hackathon/copilot

Swifty
2025-12-16 16:31:27 +01:00
8 changed files with 418 additions and 87 deletions

View File

@@ -5,7 +5,7 @@ from collections.abc import AsyncGenerator
from typing import Annotated
from autogpt_libs import auth
from fastapi import APIRouter, Depends, Query, Security
from fastapi import APIRouter, Depends, Query, Request, Security
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
@@ -25,6 +25,13 @@ router = APIRouter(
# ========== Request/Response Models ==========
class StreamChatRequest(BaseModel):
"""Request model for streaming chat with optional context."""
message: str
is_user_message: bool = True
context: dict[str, str] | None = None # {url: str, content: str}
class CreateSessionResponse(BaseModel):
"""Response model containing information on a newly created chat session."""
@@ -110,17 +117,72 @@ async def get_session(
)
@router.post(
"/sessions/{session_id}/stream",
)
async def stream_chat_post(
session_id: str,
request: StreamChatRequest,
user_id: str | None = Depends(auth.get_user_id),
):
"""
Stream chat responses for a session (POST with context support).
Streams the AI/completion responses in real time over Server-Sent Events (SSE), including:
- Text fragments as they are generated
- Tool call UI elements (if invoked)
- Tool execution results
Args:
session_id: The chat session identifier to associate with the streamed messages.
request: Request body containing message, is_user_message, and optional context.
user_id: Optional authenticated user ID.
Returns:
StreamingResponse: SSE-formatted response chunks.
"""
# Validate session exists before starting the stream
# This prevents errors after the response has already started
session = await chat_service.get_session(session_id, user_id)
if not session:
raise NotFoundError(f"Session {session_id} not found.")
if session.user_id is None and user_id is not None:
session = await chat_service.assign_user_to_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]:
async for chunk in chat_service.stream_chat_completion(
session_id,
request.message,
is_user_message=request.is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
context=request.context,
):
yield chunk.to_sse()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
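For reference, a minimal TypeScript consumer sketch for this endpoint. It mirrors the fetch-based frontend hook later in this diff; `streamOnce` and the logging are illustrative, while the "data:" framing, the [DONE] sentinel, and the "stream_end" chunk type come from the frontend parser below.

async function streamOnce(sessionId: string, message: string): Promise<void> {
  const res = await fetch(`/api/chat/sessions/${sessionId}/stream`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Accept: "text/event-stream",
    },
    body: JSON.stringify({ message, is_user_message: true, context: null }),
  });
  if (!res.ok || !res.body) throw new Error(`HTTP ${res.status}`);
  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    // Frames arrive as "data: {...}" lines; a "stream_end" chunk (or the
    // literal [DONE] sentinel) marks the end of the stream.
    console.log(decoder.decode(value, { stream: true }));
  }
}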
@router.get(
"/sessions/{session_id}/stream",
)
async def stream_chat(
async def stream_chat_get(
session_id: str,
message: Annotated[str, Query(min_length=1, max_length=10000)],
user_id: str | None = Depends(auth.get_user_id),
is_user_message: bool = Query(default=True),
):
"""
Stream chat responses for a session.
Stream chat responses for a session (GET - legacy endpoint).
Streams the AI/completion responses in real time over Server-Sent Events (SSE), including:
- Text fragments as they are generated

View File

@@ -102,6 +102,7 @@ async def stream_chat_completion(
user_id: str | None = None,
retry_count: int = 0,
session: ChatSession | None = None,
context: dict[str, str] | None = None, # {url: str, content: str}
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Main entry point for streaming chat completions with database handling.
@@ -145,9 +146,18 @@ async def stream_chat_completion(
)
if message:
# Build message content with context if provided
message_content = message
if context and context.get("url") and context.get("content"):
context_text = f"Page URL: {context['url']}\n\nPage Content:\n{context['content']}\n\n---\n\nUser Message: {message}"
message_content = context_text
logger.info(
f"Including page context: URL={context['url']}, content_length={len(context['content'])}"
)
session.messages.append(
ChatMessage(
role="user" if is_user_message else "assistant", content=message
role="user" if is_user_message else "assistant", content=message_content
)
)
logger.info(

View File

@@ -4,8 +4,91 @@ import { NextRequest } from "next/server";
/**
* SSE Proxy for chat streaming.
* EventSource doesn't support custom headers, so we need a server-side proxy
* that adds authentication and forwards the SSE stream to the client.
* Supports POST with context (page content + URL) in the request body.
*/
export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
) {
const { sessionId } = await params;
try {
const body = await request.json();
const { message, is_user_message, context } = body;
if (!message) {
return new Response(
JSON.stringify({ error: "Missing message parameter" }),
{ status: 400, headers: { "Content-Type": "application/json" } },
);
}
// Get auth token from server-side session
const token = await getServerAuthToken();
// Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(
`/api/chat/sessions/${sessionId}/stream`,
backendUrl,
);
// Forward request to backend with auth header
const headers: Record<string, string> = {
"Content-Type": "application/json",
Accept: "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (token) {
headers["Authorization"] = `Bearer ${token}`;
}
const response = await fetch(streamUrl.toString(), {
method: "POST",
headers,
body: JSON.stringify({
message,
is_user_message: is_user_message ?? true,
context: context || null,
}),
});
if (!response.ok) {
const error = await response.text();
return new Response(error, {
status: response.status,
headers: { "Content-Type": "application/json" },
});
}
// Return the SSE stream directly
return new Response(response.body, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
});
} catch (error) {
console.error("SSE proxy error:", error);
return new Response(
JSON.stringify({
error: "Failed to connect to chat service",
detail: error instanceof Error ? error.message : String(error),
}),
{
status: 500,
headers: { "Content-Type": "application/json" },
},
);
}
}
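The constraint that motivates this proxy, shown concretely (the session id below is hypothetical): the browser's native EventSource constructor accepts only a URL plus a withCredentials flag, so an Authorization header cannot be attached to the stream request directly.

// EventSource has no headers option, so a Bearer token cannot ride along;
// authentication therefore has to happen in the server-side proxy above.
const es = new EventSource("/api/chat/sessions/abc123/stream?message=hi");
es.onmessage = (event) => console.log(event.data);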
/**
* Legacy GET endpoint for backward compatibility
*/
export async function GET(
request: NextRequest,

View File

@@ -5252,11 +5252,51 @@
}
},
"/api/chat/sessions/{session_id}/stream": {
"post": {
"tags": ["v2", "chat", "chat"],
"summary": "Stream Chat Post",
"description": "Stream chat responses for a session (POST with context support).\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n request: Request body containing message, is_user_message, and optional context.\n user_id: Optional authenticated user ID.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
"operationId": "postV2StreamChatPost",
"security": [{ "HTTPBearerJWT": [] }],
"parameters": [
{
"name": "session_id",
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Session Id" }
}
],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/StreamChatRequest" }
}
}
},
"responses": {
"200": {
"description": "Successful Response",
"content": { "application/json": { "schema": {} } }
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
}
}
},
"get": {
"tags": ["v2", "chat", "chat"],
"summary": "Stream Chat",
"description": "Stream chat responses for a session.\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
"operationId": "getV2StreamChat",
"summary": "Stream Chat Get",
"description": "Stream chat responses for a session (GET - legacy endpoint).\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
"operationId": "getV2StreamChatGet",
"security": [{ "HTTPBearerJWT": [] }],
"parameters": [
{
@@ -9236,6 +9276,30 @@
"required": ["submissions", "pagination"],
"title": "StoreSubmissionsResponse"
},
"StreamChatRequest": {
"properties": {
"message": { "type": "string", "title": "Message" },
"is_user_message": {
"type": "boolean",
"title": "Is User Message",
"default": true
},
"context": {
"anyOf": [
{
"additionalProperties": { "type": "string" },
"type": "object"
},
{ "type": "null" }
],
"title": "Context"
}
},
"type": "object",
"required": ["message"],
"title": "StreamChatRequest",
"description": "Request model for streaming chat with optional context."
},
"SubmissionStatus": {
"type": "string",
"enum": ["DRAFT", "PENDING", "APPROVED", "REJECTED"],

View File

@@ -1,5 +1,7 @@
import type { SessionDetailResponse } from "@/app/api/__generated__/models/sessionDetailResponse";
import { cn } from "@/lib/utils";
import { useCallback } from "react";
import { usePageContext } from "../../usePageContext";
import { ChatInput } from "../ChatInput/ChatInput";
import { MessageList } from "../MessageList/MessageList";
import { QuickActionsWelcome } from "../QuickActionsWelcome/QuickActionsWelcome";
@@ -24,6 +26,16 @@ export function ChatContainer({
initialMessages,
onRefreshSession,
});
const { capturePageContext } = usePageContext();
// Wrap sendMessage to automatically capture page context
const sendMessageWithContext = useCallback(
async (content: string, isUserMessage: boolean = true) => {
const context = capturePageContext();
await sendMessage(content, isUserMessage, context);
},
[sendMessage, capturePageContext],
);
const quickActions = [
"Find agents for social media management",
@@ -40,7 +52,7 @@ export function ChatContainer({
title="Welcome to AutoGPT Chat"
description="Start a conversation to discover and run AI agents."
actions={quickActions}
onActionClick={sendMessage}
onActionClick={sendMessageWithContext}
disabled={isStreaming || !sessionId}
/>
) : (
@@ -48,7 +60,7 @@ export function ChatContainer({
messages={messages}
streamingChunks={streamingChunks}
isStreaming={isStreaming}
onSendMessage={sendMessage}
onSendMessage={sendMessageWithContext}
className="flex-1"
/>
)}
@@ -56,7 +68,7 @@ export function ChatContainer({
{/* Input - Always visible */}
<div className="border-t border-zinc-200 p-4">
<ChatInput
onSend={sendMessage}
onSend={sendMessageWithContext}
disabled={isStreaming || !sessionId}
placeholder={
sessionId ? "Type your message..." : "Creating session..."

View File

@@ -86,7 +86,11 @@ export function useChatContainer({
}, [initialMessages, messages]);
const sendMessage = useCallback(
async function sendMessage(content: string, isUserMessage: boolean = true) {
async function sendMessage(
content: string,
isUserMessage: boolean = true,
context?: { url: string; content: string },
) {
if (!sessionId) {
console.error("Cannot send message: no session ID");
return;
@@ -110,7 +114,13 @@ export function useChatContainer({
setIsStreamingInitiated,
});
try {
await sendStreamMessage(sessionId, content, dispatcher, isUserMessage);
await sendStreamMessage(
sessionId,
content,
dispatcher,
isUserMessage,
context,
);
} catch (err) {
console.error("Failed to send message:", err);
setIsStreamingInitiated(false);

View File

@@ -44,7 +44,6 @@ export interface StreamChunk {
export function useChatStream() {
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState<Error | null>(null);
const eventSourceRef = useRef<EventSource | null>(null);
const retryCountRef = useRef<number>(0);
const retryTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const abortControllerRef = useRef<AbortController | null>(null);
@@ -54,10 +53,6 @@ export function useChatStream() {
abortControllerRef.current.abort();
abortControllerRef.current = null;
}
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
if (retryTimeoutRef.current) {
clearTimeout(retryTimeoutRef.current);
retryTimeoutRef.current = null;
@@ -78,6 +73,7 @@ export function useChatStream() {
message: string,
onChunk: (chunk: StreamChunk) => void,
isUserMessage: boolean = true,
context?: { url: string; content: string },
) => {
stopStreaming();
@@ -93,96 +89,147 @@ export function useChatStream() {
setError(null);
try {
-const url = `/api/chat/sessions/${sessionId}/stream?message=${encodeURIComponent(
+const url = `/api/chat/sessions/${sessionId}/stream`;
+const body = JSON.stringify({
   message,
-)}&is_user_message=${isUserMessage}`;
-const eventSource = new EventSource(url);
-eventSourceRef.current = eventSource;
-abortController.signal.addEventListener("abort", () => {
-  eventSource.close();
-  eventSourceRef.current = null;
+  is_user_message: isUserMessage,
+  context: context || null,
 });
+const response = await fetch(url, {
+  method: "POST",
+  headers: {
+    "Content-Type": "application/json",
+    Accept: "text/event-stream",
+  },
+  body,
+  signal: abortController.signal,
+});
+if (!response.ok) {
+  const errorText = await response.text();
+  throw new Error(errorText || `HTTP ${response.status}`);
+}
+if (!response.body) {
+  throw new Error("Response body is null");
+}
+const reader = response.body.getReader();
+const decoder = new TextDecoder();
+let buffer = "";
 return new Promise<void>((resolve, reject) => {
   const cleanup = () => {
-    eventSource.removeEventListener("message", messageHandler);
-    eventSource.removeEventListener("error", errorHandler);
+    reader.cancel().catch(() => {
+      // Ignore cancel errors
+    });
   };
-  const messageHandler = (event: MessageEvent) => {
+  const readStream = async () => {
     try {
-      const chunk = JSON.parse(event.data) as StreamChunk;
+      while (true) {
+        const { done, value } = await reader.read();
-      if (retryCountRef.current > 0) {
-        retryCountRef.current = 0;
-      }
+        if (done) {
+          cleanup();
+          stopStreaming();
+          resolve();
+          return;
+        }
-      // Call the chunk handler
-      onChunk(chunk);
+        buffer += decoder.decode(value, { stream: true });
+        const lines = buffer.split("\n");
+        buffer = lines.pop() || "";
-      // Handle stream lifecycle
-      if (chunk.type === "stream_end") {
-        cleanup();
-        stopStreaming();
-        resolve();
-      } else if (chunk.type === "error") {
-        cleanup();
-        reject(
-          new Error(chunk.message || chunk.content || "Stream error"),
-        );
-      }
+        for (const line of lines) {
+          if (line.startsWith("data: ")) {
+            const data = line.slice(6);
+            if (data === "[DONE]") {
+              cleanup();
+              stopStreaming();
+              resolve();
+              return;
+            }
+            try {
+              const chunk = JSON.parse(data) as StreamChunk;
+              if (retryCountRef.current > 0) {
+                retryCountRef.current = 0;
+              }
+              // Call the chunk handler
+              onChunk(chunk);
+              // Handle stream lifecycle
+              if (chunk.type === "stream_end") {
+                cleanup();
+                stopStreaming();
+                resolve();
+                return;
+              } else if (chunk.type === "error") {
+                cleanup();
+                reject(
+                  new Error(
+                    chunk.message || chunk.content || "Stream error",
+                  ),
+                );
+                return;
+              }
+            } catch (err) {
+              // Skip invalid JSON lines
+              console.warn("Failed to parse SSE chunk:", err, data);
+            }
+          }
+        }
+      }
     } catch (err) {
-      const parseError =
+      if (err instanceof Error && err.name === "AbortError") {
+        cleanup();
+        return;
+      }
+      const streamError =
         err instanceof Error
           ? err
-          : new Error("Failed to parse stream chunk");
-      setError(parseError);
-      cleanup();
-      reject(parseError);
-    }
-  };
+          : new Error("Failed to read stream");
-  const errorHandler = () => {
-    if (eventSourceRef.current) {
-      eventSourceRef.current.close();
-      eventSourceRef.current = null;
-    }
-    if (retryCountRef.current < MAX_RETRIES) {
-      retryCountRef.current += 1;
-      const retryDelay =
-        INITIAL_RETRY_DELAY *
-        Math.pow(2, retryCountRef.current - 1);
+      if (retryCountRef.current < MAX_RETRIES) {
+        retryCountRef.current += 1;
+        const retryDelay =
+          INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current - 1);
-      toast.info("Connection interrupted", {
-        description: `Retrying in ${retryDelay / 1000} seconds...`,
-      });
+        toast.info("Connection interrupted", {
+          description: `Retrying in ${retryDelay / 1000} seconds...`,
+        });
-      retryTimeoutRef.current = setTimeout(() => {
-        sendMessage(sessionId, message, onChunk, isUserMessage).catch(
-          (_err) => {
+        retryTimeoutRef.current = setTimeout(() => {
+          sendMessage(
+            sessionId,
+            message,
+            onChunk,
+            isUserMessage,
+            context,
+          ).catch((_err) => {
             // Retry failed
-          },
-        );
-      }, retryDelay);
-    } else {
-      const streamError = new Error(
-        "Stream connection failed after multiple retries",
-      );
-      setError(streamError);
-      toast.error("Connection Failed", {
-        description:
-          "Unable to connect to chat service. Please try again.",
-      });
-      cleanup();
-      stopStreaming();
-      reject(streamError);
+          });
+        }, retryDelay);
+      } else {
+        setError(streamError);
+        toast.error("Connection Failed", {
+          description:
+            "Unable to connect to chat service. Please try again.",
+        });
+        cleanup();
+        stopStreaming();
+        reject(streamError);
       }
     }
   };
-  eventSource.addEventListener("message", messageHandler);
-  eventSource.addEventListener("error", errorHandler);
+  readStream();
});
} catch (err) {
const streamError =

View File

@@ -0,0 +1,43 @@
import { useCallback } from "react";
export interface PageContext {
url: string;
content: string;
}
/**
* Hook to capture the current page context (URL + full page content)
*/
export function usePageContext() {
const capturePageContext = useCallback((): PageContext => {
if (typeof window === "undefined") {
return { url: "", content: "" };
}
const url = window.location.href;
// Capture full page text content
// Remove script and style elements, then get text
const clone = document.cloneNode(true) as Document;
const scripts = clone.querySelectorAll("script, style, noscript");
scripts.forEach((el) => el.remove());
// Get text content from body
const body = clone.body;
const content = body?.textContent || body?.innerText || "";
// Clean up whitespace: collapse blank lines first, then runs of spaces/tabs.
// (Replacing /\s+/ with " " first would remove every newline, leaving the
// blank-line collapse with nothing to match.)
const cleanedContent = content
.replace(/\n\s*\n/g, "\n")
.replace(/[ \t]+/g, " ")
.trim();
return {
url,
content: cleanedContent,
};
}, []);
return { capturePageContext };
}
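A usage sketch, mirroring how ChatContainer wires this hook up earlier in this diff (`sendMessage` stands in for the sender provided by useChatContainer):

const { capturePageContext } = usePageContext();

async function sendWithContext(content: string) {
  const context = capturePageContext(); // { url, content } for the current page
  await sendMessage(content, true, context);
}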