feat(chat): implement session stream resumption endpoint

- Refactored the existing GET endpoint to allow resuming an active chat session stream without requiring a new message.
- Updated the backend logic to check for an active task and return the appropriate SSE stream or a 204 No Content response if no task is running.
- Modified the frontend to support the new resume functionality, enhancing user experience by allowing seamless continuation of chat sessions.
- Updated OpenAPI documentation to reflect changes in endpoint behavior and parameters.
This commit is contained in:
abhi1992002
2026-02-06 13:07:32 +05:30
parent 251d26a643
commit 1c9680b6f2
4 changed files with 78 additions and 91 deletions

View File

@@ -6,7 +6,7 @@ from collections.abc import AsyncGenerator
from typing import Annotated
from autogpt_libs import auth
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Security
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Response, Security
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
@@ -393,63 +393,73 @@ async def stream_chat_post(
@router.get(
"/sessions/{session_id}/stream",
)
async def stream_chat_get(
async def resume_session_stream(
session_id: str,
message: Annotated[str, Query(min_length=1, max_length=10000)],
user_id: str | None = Depends(auth.get_user_id),
is_user_message: bool = Query(default=True),
):
"""
Stream chat responses for a session (GET - legacy endpoint).
Resume an active stream for a session.
Streams the AI/completion responses in real time over Server-Sent Events (SSE), including:
- Text fragments as they are generated
- Tool call UI elements (if invoked)
- Tool execution results
Called by the AI SDK's ``useChat(resume: true)`` on page load.
Checks for an active (in-progress) task on the session and either replays
the full SSE stream or returns 204 No Content if nothing is running.
Args:
session_id: The chat session identifier to associate with the streamed messages.
message: The user's new message to process.
session_id: The chat session identifier.
user_id: Optional authenticated user ID.
is_user_message: Whether the message is a user message.
Returns:
StreamingResponse: SSE-formatted response chunks.
Returns:
StreamingResponse (SSE) when an active stream exists,
or 204 No Content when there is nothing to resume.
"""
session = await _validate_and_get_session(session_id, user_id)
import asyncio
active_task, _last_id = await stream_registry.get_active_task_for_session(
session_id, user_id
)
if not active_task:
return Response(status_code=204)
subscriber_queue = await stream_registry.subscribe_to_task(
task_id=active_task.task_id,
user_id=user_id,
last_message_id="0-0", # Full replay so useChat rebuilds the message
)
if subscriber_queue is None:
return Response(status_code=204)
async def event_generator() -> AsyncGenerator[str, None]:
chunk_count = 0
first_chunk_type: str | None = None
async for chunk in chat_service.stream_chat_completion(
session_id,
message,
is_user_message=is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
):
if chunk_count < 3:
logger.info(
"Chat stream chunk",
extra={
"session_id": session_id,
"chunk_type": str(chunk.type),
},
try:
while True:
try:
chunk = await asyncio.wait_for(
subscriber_queue.get(), timeout=30.0
)
yield chunk.to_sse()
if isinstance(chunk, StreamFinish):
break
except asyncio.TimeoutError:
yield StreamHeartbeat().to_sse()
except GeneratorExit:
pass
except Exception as e:
logger.error(
f"Error in resume stream for session {session_id}: {e}"
)
finally:
try:
await stream_registry.unsubscribe_from_task(
active_task.task_id, subscriber_queue
)
if not first_chunk_type:
first_chunk_type = str(chunk.type)
chunk_count += 1
yield chunk.to_sse()
logger.info(
"Chat stream completed",
extra={
"session_id": session_id,
"chunk_count": chunk_count,
"first_chunk_type": first_chunk_type,
},
)
# AI SDK protocol termination
yield "data: [DONE]\n\n"
except Exception as unsub_err:
logger.error(
f"Error unsubscribing from task {active_task.task_id}: {unsub_err}",
exc_info=True,
)
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
@@ -457,8 +467,8 @@ async def stream_chat_get(
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header
"X-Accel-Buffering": "no",
"x-vercel-ai-ui-message-stream": "v1",
},
)

View File

@@ -37,12 +37,17 @@ export function useCopilotPage() {
},
};
},
// Resume uses GET on the same endpoint (no message param → backend resumes)
prepareReconnectToStreamRequest: () => ({
api: `/api/chat/sessions/${sessionId}/stream`,
}),
})
: null;
const { messages, sendMessage, status, error, setMessages } = useChat({
id: sessionId ?? undefined,
transport: transport ?? undefined,
resume: !!sessionId,
});
useEffect(() => {

View File

@@ -88,39 +88,27 @@ export async function POST(
}
/**
* Legacy GET endpoint for backward compatibility
* Resume an active stream for a session.
*
* Called by the AI SDK's `useChat(resume: true)` on page load.
* Proxies to the backend which checks for an active stream and either
* replays it (200 + SSE) or returns 204 No Content.
*/
export async function GET(
request: NextRequest,
_request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
) {
const { sessionId } = await params;
const searchParams = request.nextUrl.searchParams;
const message = searchParams.get("message");
const isUserMessage = searchParams.get("is_user_message");
if (!message) {
return new Response("Missing message parameter", { status: 400 });
}
try {
// Get auth token from server-side session
const token = await getServerAuthToken();
// Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(
`/api/chat/sessions/${sessionId}/stream`,
backendUrl,
);
streamUrl.searchParams.set("message", message);
// Pass is_user_message parameter if provided
if (isUserMessage !== null) {
streamUrl.searchParams.set("is_user_message", isUserMessage);
}
// Forward request to backend with auth header
const headers: Record<string, string> = {
Accept: "text/event-stream",
"Cache-Control": "no-cache",
@@ -136,6 +124,11 @@ export async function GET(
headers,
});
// 204 = no active stream to resume
if (response.status === 204) {
return new Response(null, { status: 204 });
}
if (!response.ok) {
const error = await response.text();
return new Response(error, {
@@ -144,17 +137,17 @@ export async function GET(
});
}
// Return the SSE stream directly
return new Response(response.body, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
"x-vercel-ai-ui-message-stream": "v1",
},
});
} catch (error) {
console.error("SSE proxy error:", error);
console.error("Resume stream proxy error:", error);
return new Response(
JSON.stringify({
error: "Failed to connect to chat service",

View File

@@ -1234,9 +1234,9 @@
"/api/chat/sessions/{session_id}/stream": {
"get": {
"tags": ["v2", "chat", "chat"],
"summary": "Stream Chat Get",
"description": "Stream chat responses for a session (GET - legacy endpoint).\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
"operationId": "getV2StreamChatGet",
"summary": "Resume Session Stream",
"description": "Resume an active stream for a session.\n\nCalled by the AI SDK's ``useChat(resume: true)`` on page load.\nChecks for an active (in-progress) task on the session and either replays\nthe full SSE stream or returns 204 No Content if nothing is running.\n\nArgs:\n session_id: The chat session identifier.\n user_id: Optional authenticated user ID.\n\nReturns:\n StreamingResponse (SSE) when an active stream exists,\n or 204 No Content when there is nothing to resume.",
"operationId": "getV2ResumeSessionStream",
"security": [{ "HTTPBearerJWT": [] }],
"parameters": [
{
@@ -1244,27 +1244,6 @@
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Session Id" }
},
{
"name": "message",
"in": "query",
"required": true,
"schema": {
"type": "string",
"minLength": 1,
"maxLength": 10000,
"title": "Message"
}
},
{
"name": "is_user_message",
"in": "query",
"required": false,
"schema": {
"type": "boolean",
"default": true,
"title": "Is User Message"
}
}
],
"responses": {