diff --git a/autogpt_platform/backend/backend/api/features/chat/db.py b/autogpt_platform/backend/backend/api/features/chat/db.py
index 05a3553cc8..d34b4e5b07 100644
--- a/autogpt_platform/backend/backend/api/features/chat/db.py
+++ b/autogpt_platform/backend/backend/api/features/chat/db.py
@@ -247,3 +247,45 @@ async def get_chat_session_message_count(session_id: str) -> int:
"""Get the number of messages in a chat session."""
count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id})
return count
+
+
+async def update_tool_message_content(
+ session_id: str,
+ tool_call_id: str,
+ new_content: str,
+) -> bool:
+ """Update the content of a tool message in chat history.
+
+ Used by background tasks to update pending operation messages with final results.
+
+ Args:
+ session_id: The chat session ID.
+ tool_call_id: The tool call ID to find the message.
+ new_content: The new content to set.
+
+ Returns:
+ True if a message was updated, False otherwise.
+ """
+ try:
+ result = await PrismaChatMessage.prisma().update_many(
+ where={
+ "sessionId": session_id,
+ "toolCallId": tool_call_id,
+ },
+ data={
+ "content": new_content,
+ },
+ )
+ if result == 0:
+ logger.warning(
+ f"No message found to update for session {session_id}, "
+ f"tool_call_id {tool_call_id}"
+ )
+ return False
+ return True
+ except Exception as e:
+ logger.error(
+ f"Failed to update tool message for session {session_id}, "
+ f"tool_call_id {tool_call_id}: {e}"
+ )
+ return False
diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/api/features/chat/model.py
index 75bda11127..738c02cd80 100644
--- a/autogpt_platform/backend/backend/api/features/chat/model.py
+++ b/autogpt_platform/backend/backend/api/features/chat/model.py
@@ -295,6 +295,16 @@ async def cache_chat_session(session: ChatSession) -> None:
    await _cache_session(session)


+async def invalidate_session_cache(session_id: str) -> None:
+ """Invalidate a chat session from Redis cache.
+
+ Used by background tasks to ensure fresh data is loaded on next access.
+ """
+ redis_key = _get_session_cache_key(session_id)
+ async_redis = await get_redis_async()
+ await async_redis.delete(redis_key)
+
+
async def _get_session_from_db(session_id: str) -> ChatSession | None:
"""Get a chat session from the database."""
prisma_session = await chat_db.get_chat_session(session_id)
diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py
index 386b37784d..f43f13f05e 100644
--- a/autogpt_platform/backend/backend/api/features/chat/service.py
+++ b/autogpt_platform/backend/backend/api/features/chat/service.py
@@ -24,6 +24,7 @@ from backend.data.understanding import (
from backend.util.exceptions import NotFoundError
from backend.util.settings import Settings

+from . import db as chat_db
from .config import ChatConfig
from .model import (
ChatMessage,
@@ -31,6 +32,7 @@ from .model import (
Usage,
cache_chat_session,
get_chat_session,
+ invalidate_session_cache,
update_session_title,
upsert_chat_session,
)
@@ -48,8 +50,13 @@ from .response_model import (
StreamToolOutputAvailable,
StreamUsage,
)
-from .tools import execute_tool, tools
-from .tools.models import ErrorResponse
+from .tools import execute_tool, get_tool, tools
+from .tools.models import (
+ ErrorResponse,
+ OperationInProgressResponse,
+ OperationPendingResponse,
+ OperationStartedResponse,
+)
from .tracking import track_user_message

logger = logging.getLogger(__name__)
@@ -61,6 +68,10 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()


+# In-memory tracking of running long-running operations (tool_call_id -> asyncio.Task)
+# Used for idempotency - prevents duplicate executions on browser refresh
+_running_operations: dict[str, asyncio.Task] = {}
+
class LangfuseNotConfiguredError(Exception):
"""Raised when Langfuse is required but not configured."""
@@ -1260,14 +1271,17 @@ async def _yield_tool_call(
"""
    Yield a tool call and its execution result.

- For long-running tools, yields heartbeat events every 15 seconds to keep
- the SSE connection alive through proxies and load balancers.
+ For tools marked with `is_long_running=True` (like agent generation), spawns a
+ background task so the operation survives SSE disconnections. For other tools,
+ yields heartbeat events every 15 seconds to keep the SSE connection alive.

    Raises:
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
KeyError: If expected tool call fields are missing
TypeError: If tool call structure is invalid
"""
+ import uuid as uuid_module
+
tool_name = tool_calls[yield_idx]["function"]["name"]
tool_call_id = tool_calls[yield_idx]["id"]
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
@@ -1285,7 +1299,79 @@ async def _yield_tool_call(
input=arguments,
    )

- # Run tool execution in background task with heartbeats to keep connection alive
+ # Check if this tool is long-running (survives SSE disconnection)
+ tool = get_tool(tool_name)
+ if tool and tool.is_long_running:
+ # Idempotency check - if this tool_call_id is already running, return status
+ if tool_call_id in _running_operations:
+ existing_task = _running_operations[tool_call_id]
+ if not existing_task.done():
+ logger.info(
+ f"Tool call {tool_call_id} already in progress, returning status"
+ )
+ yield StreamToolOutputAvailable(
+ toolCallId=tool_call_id,
+ toolName=tool_name,
+ output=OperationInProgressResponse(
+ message="Agent creation already in progress. Please wait...",
+ tool_call_id=tool_call_id,
+ ).model_dump_json(),
+ success=True,
+ )
+ return
+
+ # Generate operation ID
+ operation_id = str(uuid_module.uuid4())
+
+ # Save "pending" tool response to chat history immediately
+ pending_message = ChatMessage(
+ role="tool",
+ content=OperationPendingResponse(
+ message="Creating agent... This may take a few minutes.",
+ operation_id=operation_id,
+ tool_name=tool_name,
+ ).model_dump_json(),
+ tool_call_id=tool_call_id,
+ )
+ session.messages.append(pending_message)
+ await upsert_chat_session(session)
+ logger.info(
+ f"Saved pending operation {operation_id} for tool {tool_name} "
+ f"in session {session.session_id}"
+ )
+
+ # Start background task (NOT tied to SSE)
+ task = asyncio.create_task(
+ _execute_long_running_tool(
+ tool_name=tool_name,
+ parameters=arguments,
+ tool_call_id=tool_call_id,
+ operation_id=operation_id,
+ session_id=session.session_id,
+ user_id=session.user_id,
+ )
+ )
+ # Track for idempotency and cleanup
+ _running_operations[tool_call_id] = task
+ task.add_done_callback(lambda _: _running_operations.pop(tool_call_id, None))
+
+ # Return immediately - don't wait for completion
+ yield StreamToolOutputAvailable(
+ toolCallId=tool_call_id,
+ toolName=tool_name,
+ output=OperationStartedResponse(
+ message=(
+ "Agent creation started. You can close this tab - "
+ "check your library in a few minutes."
+ ),
+ operation_id=operation_id,
+ tool_name=tool_name,
+ ).model_dump_json(),
+ success=True,
+ )
+ return
+
+ # Normal flow: Run tool execution in background task with heartbeats
tool_task = asyncio.create_task(
execute_tool(
tool_name=tool_name,
@@ -1335,3 +1421,91 @@ async def _yield_tool_call(
)
yield tool_execution_response
+
+
+async def _execute_long_running_tool(
+ tool_name: str,
+ parameters: dict[str, Any],
+ tool_call_id: str,
+ operation_id: str,
+ session_id: str,
+ user_id: str | None,
+) -> None:
+ """Execute a long-running tool in background and update chat history with result.
+
+ This function runs independently of the SSE connection, so the operation
+ survives if the user closes their browser tab.
+ """
+ try:
+ # Load fresh session (not stale reference)
+ session = await get_chat_session(session_id, user_id)
+ if not session:
+ logger.error(f"Session {session_id} not found for background tool")
+ return
+
+ # Execute the actual tool
+ result = await execute_tool(
+ tool_name=tool_name,
+ parameters=parameters,
+ tool_call_id=tool_call_id,
+ user_id=user_id,
+ session=session,
+ )
+
+ # Update the pending message with result
+ await _update_pending_operation(
+ session_id=session_id,
+ tool_call_id=tool_call_id,
+ result=(
+ result.output
+ if isinstance(result.output, str)
+ else orjson.dumps(result.output).decode("utf-8")
+ ),
+ success=result.success,
+ )
+
+ logger.info(f"Background tool {tool_name} completed for session {session_id}")
+
+ except Exception as e:
+ logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True)
+ error_response = {
+ "type": "error",
+ "message": f"Agent creation failed: {str(e)}",
+ }
+ await _update_pending_operation(
+ session_id=session_id,
+ tool_call_id=tool_call_id,
+ result=orjson.dumps(error_response).decode("utf-8"),
+ success=False,
+ )
+
+
+async def _update_pending_operation(
+ session_id: str,
+ tool_call_id: str,
+ result: str,
+ success: bool,
+) -> None:
+ """Update the pending tool message with final result.
+
+ This is called by background tasks when long-running operations complete.
+ """
+ # Update the message in database
+ updated = await chat_db.update_tool_message_content(
+ session_id=session_id,
+ tool_call_id=tool_call_id,
+ new_content=result,
+ )
+
+ if updated:
+ # Invalidate Redis cache so next load gets fresh data
+ await invalidate_session_cache(session_id)
+ logger.info(
+ f"Updated pending operation for tool_call_id {tool_call_id} "
+ f"in session {session_id}"
+ )
+ else:
+ logger.warning(
+ f"Failed to update pending operation for tool_call_id {tool_call_id} "
+ f"in session {session_id}"
+ )
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py
index 284273f3b9..beeb128ae9 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py
@@ -49,6 +49,11 @@ tools: list[ChatCompletionToolParam] = [
]


+def get_tool(tool_name: str) -> BaseTool | None:
+ """Get a tool instance by name."""
+ return TOOL_REGISTRY.get(tool_name)
+
+
async def execute_tool(
tool_name: str,
parameters: dict[str, Any],
@@ -57,7 +62,7 @@ async def execute_tool(
tool_call_id: str,
) -> "StreamToolOutputAvailable":
"""Execute a tool by name."""
- tool = TOOL_REGISTRY.get(tool_name)
+ tool = get_tool(tool_name)
if not tool:
        raise ValueError(f"Tool {tool_name} not found")

diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/base.py b/autogpt_platform/backend/backend/api/features/chat/tools/base.py
index 1dc40c18c7..809e06632b 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/base.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/base.py
@@ -36,6 +36,16 @@ class BaseTool:
"""Whether this tool requires authentication."""
        return False

+ @property
+ def is_long_running(self) -> bool:
+ """Whether this tool is long-running and should execute in background.
+
+ Long-running tools (like agent generation) are executed via background
+ tasks to survive SSE disconnections. The result is persisted to chat
+ history and visible when the user refreshes.
+ """
+ return False
+
def as_openai_tool(self) -> ChatCompletionToolParam:
"""Convert to OpenAI tool format."""
return ChatCompletionToolParam(
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py
index 87ca5ebca7..6b3784e323 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py
@@ -42,6 +42,10 @@ class CreateAgentTool(BaseTool):
def requires_auth(self) -> bool:
        return True

+ @property
+ def is_long_running(self) -> bool:
+ return True
+
@property
def parameters(self) -> dict[str, Any]:
return {
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py
index d65b050f06..7c4da8ad43 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py
@@ -42,6 +42,10 @@ class EditAgentTool(BaseTool):
def requires_auth(self) -> bool:
        return True

+ @property
+ def is_long_running(self) -> bool:
+ return True
+
@property
def parameters(self) -> dict[str, Any]:
return {
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/models.py b/autogpt_platform/backend/backend/api/features/chat/tools/models.py
index 1736ddb9a8..8552681d03 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/models.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/models.py
@@ -28,6 +28,10 @@ class ResponseType(str, Enum):
BLOCK_OUTPUT = "block_output"
DOC_SEARCH_RESULTS = "doc_search_results"
DOC_PAGE = "doc_page"
+ # Long-running operation types
+ OPERATION_STARTED = "operation_started"
+ OPERATION_PENDING = "operation_pending"
+ OPERATION_IN_PROGRESS = "operation_in_progress"


# Base response model
@@ -334,3 +338,39 @@ class BlockOutputResponse(ToolResponseBase):
block_name: str
outputs: dict[str, list[Any]]
success: bool = True
+
+
+# Long-running operation models
+class OperationStartedResponse(ToolResponseBase):
+ """Response when a long-running operation has been started in the background.
+
+ This is returned immediately to the client while the operation continues
+ to execute. The user can close the tab and check back later.
+ """
+
+ type: ResponseType = ResponseType.OPERATION_STARTED
+ operation_id: str
+ tool_name: str
+
+
+class OperationPendingResponse(ToolResponseBase):
+ """Response stored in chat history while a long-running operation is executing.
+
+ This is persisted to the database so users see a pending state when they
+ refresh before the operation completes.
+ """
+
+ type: ResponseType = ResponseType.OPERATION_PENDING
+ operation_id: str
+ tool_name: str
+
+
+class OperationInProgressResponse(ToolResponseBase):
+ """Response when an operation is already in progress.
+
+ Returned for idempotency when the same tool_call_id is requested again
+ while the background task is still running.
+ """
+
+ type: ResponseType = ResponseType.OPERATION_IN_PROGRESS
+ tool_call_id: str
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts
index 7dee924634..216671615c 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts
@@ -341,6 +341,39 @@ export function parseToolResponse(
timestamp: timestamp || new Date(),
};
}
+ if (responseType === "operation_started") {
+ return {
+ type: "operation_started",
+ toolName: parsedResult.tool_name as string,
+ operationId: (parsedResult.operation_id as string) || "",
+ message:
+ (parsedResult.message as string) ||
+ "Operation started. You can close this tab.",
+ timestamp: timestamp || new Date(),
+ };
+ }
+ if (responseType === "operation_pending") {
+ return {
+ type: "operation_pending",
+ toolName: parsedResult.tool_name as string,
+ operationId: (parsedResult.operation_id as string) || "",
+ message:
+ (parsedResult.message as string) ||
+ "Operation in progress. Please wait...",
+ timestamp: timestamp || new Date(),
+ };
+ }
+ if (responseType === "operation_in_progress") {
+ return {
+ type: "operation_in_progress",
+ toolName: parsedResult.tool_name as string,
+ toolCallId: (parsedResult.tool_call_id as string) || "",
+ message:
+ (parsedResult.message as string) ||
+ "Operation already in progress. Please wait...",
+ timestamp: timestamp || new Date(),
+ };
+ }
if (responseType === "need_login") {
return {
type: "login_needed",
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx
index 29e3a60a8c..c922d0da76 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx
@@ -16,6 +16,7 @@ import { AuthPromptWidget } from "../AuthPromptWidget/AuthPromptWidget";
import { ChatCredentialsSetup } from "../ChatCredentialsSetup/ChatCredentialsSetup";
import { ClarificationQuestionsWidget } from "../ClarificationQuestionsWidget/ClarificationQuestionsWidget";
import { ExecutionStartedMessage } from "../ExecutionStartedMessage/ExecutionStartedMessage";
+import { PendingOperationWidget } from "../PendingOperationWidget/PendingOperationWidget";
import { MarkdownContent } from "../MarkdownContent/MarkdownContent";
import { NoResultsMessage } from "../NoResultsMessage/NoResultsMessage";
import { ToolCallMessage } from "../ToolCallMessage/ToolCallMessage";
@@ -71,6 +72,9 @@ export function ChatMessage({
isLoginNeeded,
isCredentialsNeeded,
isClarificationNeeded,
+ isOperationStarted,
+ isOperationPending,
+ isOperationInProgress,
  } = useChatMessage(message);

const displayContent = getDisplayContent(message, isUser);
@@ -290,6 +294,42 @@ export function ChatMessage({
);
}
+ // Render operation_started messages (long-running background operations)
+ if (isOperationStarted && message.type === "operation_started") {
+ return (
+