From 1c9680b6f2d08017c8c50ce2d23688f3a6e1e38f Mon Sep 17 00:00:00 2001 From: abhi1992002 Date: Fri, 6 Feb 2026 13:07:32 +0530 Subject: [PATCH] feat(chat): implement session stream resumption endpoint - Refactored the existing GET endpoint to allow resuming an active chat session stream without requiring a new message. - Updated the backend logic to check for an active task and return the appropriate SSE stream or a 204 No Content response if no task is running. - Modified the frontend to support the new resume functionality, enhancing user experience by allowing seamless continuation of chat sessions. - Updated OpenAPI documentation to reflect changes in endpoint behavior and parameters. --- .../backend/api/features/chat/routes.py | 104 ++++++++++-------- .../(platform)/copilot-2/useCopilotPage.ts | 5 + .../chat/sessions/[sessionId]/stream/route.ts | 33 +++--- .../frontend/src/app/api/openapi.json | 27 +---- 4 files changed, 78 insertions(+), 91 deletions(-) diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index cb855f2c62..6b1889bf97 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -6,7 +6,7 @@ from collections.abc import AsyncGenerator from typing import Annotated from autogpt_libs import auth -from fastapi import APIRouter, Depends, Header, HTTPException, Query, Security +from fastapi import APIRouter, Depends, Header, HTTPException, Query, Response, Security from fastapi.responses import StreamingResponse from pydantic import BaseModel @@ -393,63 +393,73 @@ async def stream_chat_post( @router.get( "/sessions/{session_id}/stream", ) -async def stream_chat_get( +async def resume_session_stream( session_id: str, - message: Annotated[str, Query(min_length=1, max_length=10000)], user_id: str | None = Depends(auth.get_user_id), - is_user_message: bool = Query(default=True), ): """ - Stream chat responses for a session (GET - legacy endpoint). + Resume an active stream for a session. - Streams the AI/completion responses in real time over Server-Sent Events (SSE), including: - - Text fragments as they are generated - - Tool call UI elements (if invoked) - - Tool execution results + Called by the AI SDK's ``useChat(resume: true)`` on page load. + Checks for an active (in-progress) task on the session and either replays + the full SSE stream or returns 204 No Content if nothing is running. Args: - session_id: The chat session identifier to associate with the streamed messages. - message: The user's new message to process. + session_id: The chat session identifier. user_id: Optional authenticated user ID. - is_user_message: Whether the message is a user message. - Returns: - StreamingResponse: SSE-formatted response chunks. + Returns: + StreamingResponse (SSE) when an active stream exists, + or 204 No Content when there is nothing to resume. """ - session = await _validate_and_get_session(session_id, user_id) + import asyncio + + active_task, _last_id = await stream_registry.get_active_task_for_session( + session_id, user_id + ) + + if not active_task: + return Response(status_code=204) + + subscriber_queue = await stream_registry.subscribe_to_task( + task_id=active_task.task_id, + user_id=user_id, + last_message_id="0-0", # Full replay so useChat rebuilds the message + ) + + if subscriber_queue is None: + return Response(status_code=204) async def event_generator() -> AsyncGenerator[str, None]: - chunk_count = 0 - first_chunk_type: str | None = None - async for chunk in chat_service.stream_chat_completion( - session_id, - message, - is_user_message=is_user_message, - user_id=user_id, - session=session, # Pass pre-fetched session to avoid double-fetch - ): - if chunk_count < 3: - logger.info( - "Chat stream chunk", - extra={ - "session_id": session_id, - "chunk_type": str(chunk.type), - }, + try: + while True: + try: + chunk = await asyncio.wait_for( + subscriber_queue.get(), timeout=30.0 + ) + yield chunk.to_sse() + + if isinstance(chunk, StreamFinish): + break + except asyncio.TimeoutError: + yield StreamHeartbeat().to_sse() + except GeneratorExit: + pass + except Exception as e: + logger.error( + f"Error in resume stream for session {session_id}: {e}" + ) + finally: + try: + await stream_registry.unsubscribe_from_task( + active_task.task_id, subscriber_queue ) - if not first_chunk_type: - first_chunk_type = str(chunk.type) - chunk_count += 1 - yield chunk.to_sse() - logger.info( - "Chat stream completed", - extra={ - "session_id": session_id, - "chunk_count": chunk_count, - "first_chunk_type": first_chunk_type, - }, - ) - # AI SDK protocol termination - yield "data: [DONE]\n\n" + except Exception as unsub_err: + logger.error( + f"Error unsubscribing from task {active_task.task_id}: {unsub_err}", + exc_info=True, + ) + yield "data: [DONE]\n\n" return StreamingResponse( event_generator(), @@ -457,8 +467,8 @@ async def stream_chat_get( headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", - "X-Accel-Buffering": "no", # Disable nginx buffering - "x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header + "X-Accel-Buffering": "no", + "x-vercel-ai-ui-message-stream": "v1", }, ) diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot-2/useCopilotPage.ts b/autogpt_platform/frontend/src/app/(platform)/copilot-2/useCopilotPage.ts index 01dd98ee9a..4e2f24e1c1 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot-2/useCopilotPage.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot-2/useCopilotPage.ts @@ -37,12 +37,17 @@ export function useCopilotPage() { }, }; }, + // Resume uses GET on the same endpoint (no message param → backend resumes) + prepareReconnectToStreamRequest: () => ({ + api: `/api/chat/sessions/${sessionId}/stream`, + }), }) : null; const { messages, sendMessage, status, error, setMessages } = useChat({ id: sessionId ?? undefined, transport: transport ?? undefined, + resume: !!sessionId, }); useEffect(() => { diff --git a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts index d63eed0ca2..6facf80c58 100644 --- a/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts +++ b/autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts @@ -88,39 +88,27 @@ export async function POST( } /** - * Legacy GET endpoint for backward compatibility + * Resume an active stream for a session. + * + * Called by the AI SDK's `useChat(resume: true)` on page load. + * Proxies to the backend which checks for an active stream and either + * replays it (200 + SSE) or returns 204 No Content. */ export async function GET( - request: NextRequest, + _request: NextRequest, { params }: { params: Promise<{ sessionId: string }> }, ) { const { sessionId } = await params; - const searchParams = request.nextUrl.searchParams; - const message = searchParams.get("message"); - const isUserMessage = searchParams.get("is_user_message"); - - if (!message) { - return new Response("Missing message parameter", { status: 400 }); - } try { - // Get auth token from server-side session const token = await getServerAuthToken(); - // Build backend URL const backendUrl = environment.getAGPTServerBaseUrl(); const streamUrl = new URL( `/api/chat/sessions/${sessionId}/stream`, backendUrl, ); - streamUrl.searchParams.set("message", message); - // Pass is_user_message parameter if provided - if (isUserMessage !== null) { - streamUrl.searchParams.set("is_user_message", isUserMessage); - } - - // Forward request to backend with auth header const headers: Record = { Accept: "text/event-stream", "Cache-Control": "no-cache", @@ -136,6 +124,11 @@ export async function GET( headers, }); + // 204 = no active stream to resume + if (response.status === 204) { + return new Response(null, { status: 204 }); + } + if (!response.ok) { const error = await response.text(); return new Response(error, { @@ -144,17 +137,17 @@ export async function GET( }); } - // Return the SSE stream directly return new Response(response.body, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", "X-Accel-Buffering": "no", + "x-vercel-ai-ui-message-stream": "v1", }, }); } catch (error) { - console.error("SSE proxy error:", error); + console.error("Resume stream proxy error:", error); return new Response( JSON.stringify({ error: "Failed to connect to chat service", diff --git a/autogpt_platform/frontend/src/app/api/openapi.json b/autogpt_platform/frontend/src/app/api/openapi.json index f269f432f2..3249d91dd7 100644 --- a/autogpt_platform/frontend/src/app/api/openapi.json +++ b/autogpt_platform/frontend/src/app/api/openapi.json @@ -1234,9 +1234,9 @@ "/api/chat/sessions/{session_id}/stream": { "get": { "tags": ["v2", "chat", "chat"], - "summary": "Stream Chat Get", - "description": "Stream chat responses for a session (GET - legacy endpoint).\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.", - "operationId": "getV2StreamChatGet", + "summary": "Resume Session Stream", + "description": "Resume an active stream for a session.\n\nCalled by the AI SDK's ``useChat(resume: true)`` on page load.\nChecks for an active (in-progress) task on the session and either replays\nthe full SSE stream or returns 204 No Content if nothing is running.\n\nArgs:\n session_id: The chat session identifier.\n user_id: Optional authenticated user ID.\n\nReturns:\n StreamingResponse (SSE) when an active stream exists,\n or 204 No Content when there is nothing to resume.", + "operationId": "getV2ResumeSessionStream", "security": [{ "HTTPBearerJWT": [] }], "parameters": [ { @@ -1244,27 +1244,6 @@ "in": "path", "required": true, "schema": { "type": "string", "title": "Session Id" } - }, - { - "name": "message", - "in": "query", - "required": true, - "schema": { - "type": "string", - "minLength": 1, - "maxLength": 10000, - "title": "Message" - } - }, - { - "name": "is_user_message", - "in": "query", - "required": false, - "schema": { - "type": "boolean", - "default": true, - "title": "Is User Message" - } } ], "responses": {