From a529a16d70f04ed32b6fed4e8f0d6b0772c5593a Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Tue, 27 Jan 2026 09:45:36 -0600 Subject: [PATCH] feat(backend): persist long-running tool results to survive SSE disconnects Agent generation (create_agent, edit_agent) can take several minutes. Previously, if the user closed their browser tab, the operation would be cancelled via CancelledError and the result lost. This change: - Adds `is_long_running` property to BaseTool for tools to opt-in - Long-running tools spawn background tasks that run independently of SSE - Saves "pending" message to chat history immediately - Updates chat history with result when background task completes - Adds idempotency check to prevent duplicate operations on refresh - Invalidates Redis cache after completion so refresh loads fresh data User can now close their tab during agent generation and see the result when they return to the chat. --- .../backend/backend/api/features/chat/db.py | 42 ++++ .../backend/api/features/chat/model.py | 10 + .../backend/api/features/chat/service.py | 184 +++++++++++++++++- .../api/features/chat/tools/__init__.py | 7 +- .../backend/api/features/chat/tools/base.py | 10 + .../api/features/chat/tools/create_agent.py | 4 + .../api/features/chat/tools/edit_agent.py | 4 + .../backend/api/features/chat/tools/models.py | 40 ++++ .../Chat/components/ChatContainer/helpers.ts | 33 ++++ .../components/ChatMessage/ChatMessage.tsx | 40 ++++ .../components/ChatMessage/useChatMessage.ts | 24 +++ .../PendingOperationWidget.tsx | 94 +++++++++ 12 files changed, 486 insertions(+), 6 deletions(-) create mode 100644 autogpt_platform/frontend/src/components/contextual/Chat/components/PendingOperationWidget/PendingOperationWidget.tsx diff --git a/autogpt_platform/backend/backend/api/features/chat/db.py b/autogpt_platform/backend/backend/api/features/chat/db.py index 05a3553cc8..d34b4e5b07 100644 --- a/autogpt_platform/backend/backend/api/features/chat/db.py +++ b/autogpt_platform/backend/backend/api/features/chat/db.py @@ -247,3 +247,45 @@ async def get_chat_session_message_count(session_id: str) -> int: """Get the number of messages in a chat session.""" count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id}) return count + + +async def update_tool_message_content( + session_id: str, + tool_call_id: str, + new_content: str, +) -> bool: + """Update the content of a tool message in chat history. + + Used by background tasks to update pending operation messages with final results. + + Args: + session_id: The chat session ID. + tool_call_id: The tool call ID to find the message. + new_content: The new content to set. + + Returns: + True if a message was updated, False otherwise. + """ + try: + result = await PrismaChatMessage.prisma().update_many( + where={ + "sessionId": session_id, + "toolCallId": tool_call_id, + }, + data={ + "content": new_content, + }, + ) + if result == 0: + logger.warning( + f"No message found to update for session {session_id}, " + f"tool_call_id {tool_call_id}" + ) + return False + return True + except Exception as e: + logger.error( + f"Failed to update tool message for session {session_id}, " + f"tool_call_id {tool_call_id}: {e}" + ) + return False diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/api/features/chat/model.py index 75bda11127..738c02cd80 100644 --- a/autogpt_platform/backend/backend/api/features/chat/model.py +++ b/autogpt_platform/backend/backend/api/features/chat/model.py @@ -295,6 +295,16 @@ async def cache_chat_session(session: ChatSession) -> None: await _cache_session(session) +async def invalidate_session_cache(session_id: str) -> None: + """Invalidate a chat session from Redis cache. + + Used by background tasks to ensure fresh data is loaded on next access. + """ + redis_key = _get_session_cache_key(session_id) + async_redis = await get_redis_async() + await async_redis.delete(redis_key) + + async def _get_session_from_db(session_id: str) -> ChatSession | None: """Get a chat session from the database.""" prisma_session = await chat_db.get_chat_session(session_id) diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py index 386b37784d..f43f13f05e 100644 --- a/autogpt_platform/backend/backend/api/features/chat/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/service.py @@ -24,6 +24,7 @@ from backend.data.understanding import ( from backend.util.exceptions import NotFoundError from backend.util.settings import Settings +from . import db as chat_db from .config import ChatConfig from .model import ( ChatMessage, @@ -31,6 +32,7 @@ from .model import ( Usage, cache_chat_session, get_chat_session, + invalidate_session_cache, update_session_title, upsert_chat_session, ) @@ -48,8 +50,13 @@ from .response_model import ( StreamToolOutputAvailable, StreamUsage, ) -from .tools import execute_tool, tools -from .tools.models import ErrorResponse +from .tools import execute_tool, get_tool, tools +from .tools.models import ( + ErrorResponse, + OperationInProgressResponse, + OperationPendingResponse, + OperationStartedResponse, +) from .tracking import track_user_message logger = logging.getLogger(__name__) @@ -61,6 +68,10 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url) langfuse = get_client() +# In-memory tracking of running long-running operations (tool_call_id -> asyncio.Task) +# Used for idempotency - prevents duplicate executions on browser refresh +_running_operations: dict[str, asyncio.Task] = {} + class LangfuseNotConfiguredError(Exception): """Raised when Langfuse is required but not configured.""" @@ -1260,14 +1271,17 @@ async def _yield_tool_call( """ Yield a tool call and its execution result. - For long-running tools, yields heartbeat events every 15 seconds to keep - the SSE connection alive through proxies and load balancers. + For tools marked with `is_long_running=True` (like agent generation), spawns a + background task so the operation survives SSE disconnections. For other tools, + yields heartbeat events every 15 seconds to keep the SSE connection alive. Raises: orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON KeyError: If expected tool call fields are missing TypeError: If tool call structure is invalid """ + import uuid as uuid_module + tool_name = tool_calls[yield_idx]["function"]["name"] tool_call_id = tool_calls[yield_idx]["id"] logger.info(f"Yielding tool call: {tool_calls[yield_idx]}") @@ -1285,7 +1299,79 @@ async def _yield_tool_call( input=arguments, ) - # Run tool execution in background task with heartbeats to keep connection alive + # Check if this tool is long-running (survives SSE disconnection) + tool = get_tool(tool_name) + if tool and tool.is_long_running: + # Idempotency check - if this tool_call_id is already running, return status + if tool_call_id in _running_operations: + existing_task = _running_operations[tool_call_id] + if not existing_task.done(): + logger.info( + f"Tool call {tool_call_id} already in progress, returning status" + ) + yield StreamToolOutputAvailable( + toolCallId=tool_call_id, + toolName=tool_name, + output=OperationInProgressResponse( + message="Agent creation already in progress. Please wait...", + tool_call_id=tool_call_id, + ).model_dump_json(), + success=True, + ) + return + + # Generate operation ID + operation_id = str(uuid_module.uuid4()) + + # Save "pending" tool response to chat history immediately + pending_message = ChatMessage( + role="tool", + content=OperationPendingResponse( + message="Creating agent... This may take a few minutes.", + operation_id=operation_id, + tool_name=tool_name, + ).model_dump_json(), + tool_call_id=tool_call_id, + ) + session.messages.append(pending_message) + await upsert_chat_session(session) + logger.info( + f"Saved pending operation {operation_id} for tool {tool_name} " + f"in session {session.session_id}" + ) + + # Start background task (NOT tied to SSE) + task = asyncio.create_task( + _execute_long_running_tool( + tool_name=tool_name, + parameters=arguments, + tool_call_id=tool_call_id, + operation_id=operation_id, + session_id=session.session_id, + user_id=session.user_id, + ) + ) + # Track for idempotency and cleanup + _running_operations[tool_call_id] = task + task.add_done_callback(lambda _: _running_operations.pop(tool_call_id, None)) + + # Return immediately - don't wait for completion + yield StreamToolOutputAvailable( + toolCallId=tool_call_id, + toolName=tool_name, + output=OperationStartedResponse( + message=( + "Agent creation started. You can close this tab - " + "check your library in a few minutes." + ), + operation_id=operation_id, + tool_name=tool_name, + ).model_dump_json(), + success=True, + ) + return + + # Normal flow: Run tool execution in background task with heartbeats tool_task = asyncio.create_task( execute_tool( tool_name=tool_name, @@ -1335,3 +1421,91 @@ async def _yield_tool_call( ) yield tool_execution_response + + +async def _execute_long_running_tool( + tool_name: str, + parameters: dict[str, Any], + tool_call_id: str, + operation_id: str, + session_id: str, + user_id: str | None, +) -> None: + """Execute a long-running tool in background and update chat history with result. + + This function runs independently of the SSE connection, so the operation + survives if the user closes their browser tab. + """ + try: + # Load fresh session (not stale reference) + session = await get_chat_session(session_id, user_id) + if not session: + logger.error(f"Session {session_id} not found for background tool") + return + + # Execute the actual tool + result = await execute_tool( + tool_name=tool_name, + parameters=parameters, + tool_call_id=tool_call_id, + user_id=user_id, + session=session, + ) + + # Update the pending message with result + await _update_pending_operation( + session_id=session_id, + tool_call_id=tool_call_id, + result=( + result.output + if isinstance(result.output, str) + else orjson.dumps(result.output).decode("utf-8") + ), + success=result.success, + ) + + logger.info(f"Background tool {tool_name} completed for session {session_id}") + + except Exception as e: + logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True) + error_response = { + "type": "error", + "message": f"Agent creation failed: {str(e)}", + } + await _update_pending_operation( + session_id=session_id, + tool_call_id=tool_call_id, + result=orjson.dumps(error_response).decode("utf-8"), + success=False, + ) + + +async def _update_pending_operation( + session_id: str, + tool_call_id: str, + result: str, + success: bool, +) -> None: + """Update the pending tool message with final result. + + This is called by background tasks when long-running operations complete. + """ + # Update the message in database + updated = await chat_db.update_tool_message_content( + session_id=session_id, + tool_call_id=tool_call_id, + new_content=result, + ) + + if updated: + # Invalidate Redis cache so next load gets fresh data + await invalidate_session_cache(session_id) + logger.info( + f"Updated pending operation for tool_call_id {tool_call_id} " + f"in session {session_id}" + ) + else: + logger.warning( + f"Failed to update pending operation for tool_call_id {tool_call_id} " + f"in session {session_id}" + ) diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py index 284273f3b9..beeb128ae9 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py @@ -49,6 +49,11 @@ tools: list[ChatCompletionToolParam] = [ ] +def get_tool(tool_name: str) -> BaseTool | None: + """Get a tool instance by name.""" + return TOOL_REGISTRY.get(tool_name) + + async def execute_tool( tool_name: str, parameters: dict[str, Any], @@ -57,7 +62,7 @@ async def execute_tool( tool_call_id: str, ) -> "StreamToolOutputAvailable": """Execute a tool by name.""" - tool = TOOL_REGISTRY.get(tool_name) + tool = get_tool(tool_name) if not tool: raise ValueError(f"Tool {tool_name} not found") diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/base.py b/autogpt_platform/backend/backend/api/features/chat/tools/base.py index 1dc40c18c7..809e06632b 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/base.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/base.py @@ -36,6 +36,16 @@ class BaseTool: """Whether this tool requires authentication.""" return False + @property + def is_long_running(self) -> bool: + """Whether this tool is long-running and should execute in background. + + Long-running tools (like agent generation) are executed via background + tasks to survive SSE disconnections. The result is persisted to chat + history and visible when the user refreshes. + """ + return False + def as_openai_tool(self) -> ChatCompletionToolParam: """Convert to OpenAI tool format.""" return ChatCompletionToolParam( diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py index 87ca5ebca7..6b3784e323 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py @@ -42,6 +42,10 @@ class CreateAgentTool(BaseTool): def requires_auth(self) -> bool: return True + @property + def is_long_running(self) -> bool: + return True + @property def parameters(self) -> dict[str, Any]: return { diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py index d65b050f06..7c4da8ad43 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py @@ -42,6 +42,10 @@ class EditAgentTool(BaseTool): def requires_auth(self) -> bool: return True + @property + def is_long_running(self) -> bool: + return True + @property def parameters(self) -> dict[str, Any]: return { diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/models.py b/autogpt_platform/backend/backend/api/features/chat/tools/models.py index 1736ddb9a8..8552681d03 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/models.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/models.py @@ -28,6 +28,10 @@ class ResponseType(str, Enum): BLOCK_OUTPUT = "block_output" DOC_SEARCH_RESULTS = "doc_search_results" DOC_PAGE = "doc_page" + # Long-running operation types + OPERATION_STARTED = "operation_started" + OPERATION_PENDING = "operation_pending" + OPERATION_IN_PROGRESS = "operation_in_progress" # Base response model @@ -334,3 +338,39 @@ class BlockOutputResponse(ToolResponseBase): block_name: str outputs: dict[str, list[Any]] success: bool = True + + +# Long-running operation models +class OperationStartedResponse(ToolResponseBase): + """Response when a long-running operation has been started in the background. + + This is returned immediately to the client while the operation continues + to execute. The user can close the tab and check back later. + """ + + type: ResponseType = ResponseType.OPERATION_STARTED + operation_id: str + tool_name: str + + +class OperationPendingResponse(ToolResponseBase): + """Response stored in chat history while a long-running operation is executing. + + This is persisted to the database so users see a pending state when they + refresh before the operation completes. + """ + + type: ResponseType = ResponseType.OPERATION_PENDING + operation_id: str + tool_name: str + + +class OperationInProgressResponse(ToolResponseBase): + """Response when an operation is already in progress. + + Returned for idempotency when the same tool_call_id is requested again + while the background task is still running. + """ + + type: ResponseType = ResponseType.OPERATION_IN_PROGRESS + tool_call_id: str diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts index 7dee924634..216671615c 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts @@ -341,6 +341,39 @@ export function parseToolResponse( timestamp: timestamp || new Date(), }; } + if (responseType === "operation_started") { + return { + type: "operation_started", + toolName: parsedResult.tool_name as string, + operationId: (parsedResult.operation_id as string) || "", + message: + (parsedResult.message as string) || + "Operation started. You can close this tab.", + timestamp: timestamp || new Date(), + }; + } + if (responseType === "operation_pending") { + return { + type: "operation_pending", + toolName: parsedResult.tool_name as string, + operationId: (parsedResult.operation_id as string) || "", + message: + (parsedResult.message as string) || + "Operation in progress. Please wait...", + timestamp: timestamp || new Date(), + }; + } + if (responseType === "operation_in_progress") { + return { + type: "operation_in_progress", + toolName: parsedResult.tool_name as string, + toolCallId: (parsedResult.tool_call_id as string) || "", + message: + (parsedResult.message as string) || + "Operation already in progress. Please wait...", + timestamp: timestamp || new Date(), + }; + } if (responseType === "need_login") { return { type: "login_needed", diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx index 29e3a60a8c..c922d0da76 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx @@ -16,6 +16,7 @@ import { AuthPromptWidget } from "../AuthPromptWidget/AuthPromptWidget"; import { ChatCredentialsSetup } from "../ChatCredentialsSetup/ChatCredentialsSetup"; import { ClarificationQuestionsWidget } from "../ClarificationQuestionsWidget/ClarificationQuestionsWidget"; import { ExecutionStartedMessage } from "../ExecutionStartedMessage/ExecutionStartedMessage"; +import { PendingOperationWidget } from "../PendingOperationWidget/PendingOperationWidget"; import { MarkdownContent } from "../MarkdownContent/MarkdownContent"; import { NoResultsMessage } from "../NoResultsMessage/NoResultsMessage"; import { ToolCallMessage } from "../ToolCallMessage/ToolCallMessage"; @@ -71,6 +72,9 @@ export function ChatMessage({ isLoginNeeded, isCredentialsNeeded, isClarificationNeeded, + isOperationStarted, + isOperationPending, + isOperationInProgress, } = useChatMessage(message); const displayContent = getDisplayContent(message, isUser); @@ -290,6 +294,42 @@ export function ChatMessage({ ); } + // Render operation_started messages (long-running background operations) + if (isOperationStarted && message.type === "operation_started") { + return ( + + ); + } + + // Render operation_pending messages (operations in progress when refreshing) + if (isOperationPending && message.type === "operation_pending") { + return ( + + ); + } + + // Render operation_in_progress messages (duplicate request while operation running) + if (isOperationInProgress && message.type === "operation_in_progress") { + return ( + + ); + } + // Render tool response messages (but skip agent_output if it's being rendered inside assistant message) if (isToolResponse && message.type === "tool_response") { return ( diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts index 142b140c8b..e2ef9cf276 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts @@ -103,6 +103,27 @@ export type ChatMessageData = message: string; sessionId: string; timestamp?: string | Date; + } + | { + type: "operation_started"; + toolName: string; + operationId: string; + message: string; + timestamp?: string | Date; + } + | { + type: "operation_pending"; + toolName: string; + operationId: string; + message: string; + timestamp?: string | Date; + } + | { + type: "operation_in_progress"; + toolName: string; + toolCallId: string; + message: string; + timestamp?: string | Date; }; export function useChatMessage(message: ChatMessageData) { @@ -124,5 +145,8 @@ export function useChatMessage(message: ChatMessageData) { isExecutionStarted: message.type === "execution_started", isInputsNeeded: message.type === "inputs_needed", isClarificationNeeded: message.type === "clarification_needed", + isOperationStarted: message.type === "operation_started", + isOperationPending: message.type === "operation_pending", + isOperationInProgress: message.type === "operation_in_progress", }; } diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/PendingOperationWidget/PendingOperationWidget.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/components/PendingOperationWidget/PendingOperationWidget.tsx new file mode 100644 index 0000000000..514e750eeb --- /dev/null +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/PendingOperationWidget/PendingOperationWidget.tsx @@ -0,0 +1,94 @@ +"use client"; + +import { Card } from "@/components/atoms/Card/Card"; +import { Text } from "@/components/atoms/Text/Text"; +import { cn } from "@/lib/utils"; +import { CircleNotch, CheckCircle, XCircle } from "@phosphor-icons/react"; + +type OperationStatus = + | "pending" + | "started" + | "in_progress" + | "completed" + | "error"; + +interface Props { + status: OperationStatus; + message: string; + toolName?: string; + className?: string; +} + +export function PendingOperationWidget({ + status, + message, + toolName, + className, +}: Props) { + const isPending = + status === "pending" || status === "started" || status === "in_progress"; + const isCompleted = status === "completed"; + const isError = status === "error"; + + return ( +
+
+
+
+ {isPending && ( + + )} + {isCompleted && ( + + )} + {isError && ( + + )} +
+
+ +
+ +
+ + {isPending && "Creating Agent"} + {isCompleted && "Operation Complete"} + {isError && "Operation Failed"} + + + {message} + +
+ + {isPending && ( + + You can close this tab. Check your library in a few minutes. + + )} + + {toolName && ( + + Tool: {toolName} + + )} +
+
+
+
+ ); +}