feat(backend): persist long-running tool results to survive SSE disconnects

Agent generation (create_agent, edit_agent) can take several minutes.
Previously, if the user closed their browser tab, the operation would be
cancelled via CancelledError and the result lost.

This change:
- Adds an `is_long_running` property to BaseTool so tools can opt in
- Long-running tools spawn background tasks that run independently of SSE
- Saves a "pending" message to chat history immediately
- Updates chat history with result when background task completes
- Adds idempotency check to prevent duplicate operations on refresh
- Invalidates Redis cache after completion so refresh loads fresh data

Users can now close their tab during agent generation and see the result
when they return to the chat.
This commit is contained in:
Zamil Majdy
2026-01-27 09:45:36 -06:00
parent 071b3bb5cd
commit a529a16d70
12 changed files with 486 additions and 6 deletions

View File

@@ -247,3 +247,45 @@ async def get_chat_session_message_count(session_id: str) -> int:
"""Get the number of messages in a chat session."""
count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id})
return count
async def update_tool_message_content(
    session_id: str,
    tool_call_id: str,
    new_content: str,
) -> bool:
    """Update the content of a tool message in chat history.

    Used by background tasks to update pending operation messages with final results.

    Args:
        session_id: The chat session ID.
        tool_call_id: The tool call ID to find the message.
        new_content: The new content to set.

    Returns:
        True if a message was updated. False if no message matched or if the
        update raised; in both cases the problem is logged, not propagated.
    """
    # Keep the try body limited to the database call so that only database
    # failures are converted into a False return value.
    try:
        updated_count = await PrismaChatMessage.prisma().update_many(
            where={
                "sessionId": session_id,
                "toolCallId": tool_call_id,
            },
            data={
                "content": new_content,
            },
        )
    except Exception as e:
        # exc_info keeps the traceback; this function is called from
        # background tasks where the log is the only record of the failure.
        logger.error(
            f"Failed to update tool message for session {session_id}, "
            f"tool_call_id {tool_call_id}: {e}",
            exc_info=True,
        )
        return False

    if updated_count == 0:
        logger.warning(
            f"No message found to update for session {session_id}, "
            f"tool_call_id {tool_call_id}"
        )
        return False
    return True

View File

@@ -295,6 +295,16 @@ async def cache_chat_session(session: ChatSession) -> None:
await _cache_session(session)
async def invalidate_session_cache(session_id: str) -> None:
    """Delete the Redis cache entry of a chat session.

    Background tasks call this so the next access loads fresh data instead
    of the cached copy.
    """
    cache_key = _get_session_cache_key(session_id)
    redis_client = await get_redis_async()
    await redis_client.delete(cache_key)
async def _get_session_from_db(session_id: str) -> ChatSession | None:
"""Get a chat session from the database."""
prisma_session = await chat_db.get_chat_session(session_id)

View File

@@ -24,6 +24,7 @@ from backend.data.understanding import (
from backend.util.exceptions import NotFoundError
from backend.util.settings import Settings
from . import db as chat_db
from .config import ChatConfig
from .model import (
ChatMessage,
@@ -31,6 +32,7 @@ from .model import (
Usage,
cache_chat_session,
get_chat_session,
invalidate_session_cache,
update_session_title,
upsert_chat_session,
)
@@ -48,8 +50,13 @@ from .response_model import (
StreamToolOutputAvailable,
StreamUsage,
)
from .tools import execute_tool, tools
from .tools.models import ErrorResponse
from .tools import execute_tool, get_tool, tools
from .tools.models import (
ErrorResponse,
OperationInProgressResponse,
OperationPendingResponse,
OperationStartedResponse,
)
from .tracking import track_user_message
logger = logging.getLogger(__name__)
@@ -61,6 +68,10 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()
# In-memory tracking of running long-running operations (tool_call_id -> asyncio.Task).
# Used for idempotency: while a tool_call_id maps to an unfinished task, the same
# tool call is not started a second time (e.g. on browser refresh).
# NOTE(review): as a module-level dict this only covers the current process —
# confirm whether multi-worker deployments or restarts need a shared store.
_running_operations: dict[str, asyncio.Task] = {}
class LangfuseNotConfiguredError(Exception):
"""Raised when Langfuse is required but not configured."""
@@ -1260,14 +1271,17 @@ async def _yield_tool_call(
"""
Yield a tool call and its execution result.
For long-running tools, yields heartbeat events every 15 seconds to keep
the SSE connection alive through proxies and load balancers.
For tools marked with `is_long_running=True` (like agent generation), spawns a
background task so the operation survives SSE disconnections. For other tools,
yields heartbeat events every 15 seconds to keep the SSE connection alive.
Raises:
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
KeyError: If expected tool call fields are missing
TypeError: If tool call structure is invalid
"""
import uuid as uuid_module
tool_name = tool_calls[yield_idx]["function"]["name"]
tool_call_id = tool_calls[yield_idx]["id"]
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
@@ -1285,7 +1299,79 @@ async def _yield_tool_call(
input=arguments,
)
# Run tool execution in background task with heartbeats to keep connection alive
# Check if this tool is long-running (survives SSE disconnection)
tool = get_tool(tool_name)
if tool and tool.is_long_running:
# Idempotency check - if this tool_call_id is already running, return status
if tool_call_id in _running_operations:
existing_task = _running_operations[tool_call_id]
if not existing_task.done():
logger.info(
f"Tool call {tool_call_id} already in progress, returning status"
)
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationInProgressResponse(
message="Agent creation already in progress. Please wait...",
tool_call_id=tool_call_id,
).model_dump_json(),
success=True,
)
return
# Generate operation ID
operation_id = str(uuid_module.uuid4())
# Save "pending" tool response to chat history immediately
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message="Creating agent... This may take a few minutes.",
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
logger.info(
f"Saved pending operation {operation_id} for tool {tool_name} "
f"in session {session.session_id}"
)
# Start background task (NOT tied to SSE)
task = asyncio.create_task(
_execute_long_running_tool(
tool_name=tool_name,
parameters=arguments,
tool_call_id=tool_call_id,
operation_id=operation_id,
session_id=session.session_id,
user_id=session.user_id,
)
)
# Track for idempotency and cleanup
_running_operations[tool_call_id] = task
task.add_done_callback(lambda _: _running_operations.pop(tool_call_id, None))
# Return immediately - don't wait for completion
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationStartedResponse(
message=(
"Agent creation started. You can close this tab - "
"check your library in a few minutes."
),
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
success=True,
)
return
# Normal flow: Run tool execution in background task with heartbeats
tool_task = asyncio.create_task(
execute_tool(
tool_name=tool_name,
@@ -1335,3 +1421,91 @@ async def _yield_tool_call(
)
yield tool_execution_response
async def _execute_long_running_tool(
    tool_name: str,
    parameters: dict[str, Any],
    tool_call_id: str,
    operation_id: str,
    session_id: str,
    user_id: str | None,
) -> None:
    """Execute a long-running tool in background and update chat history with result.

    This function runs independently of the SSE connection, so the operation
    survives if the user closes their browser tab.

    Tool execution and result persistence are separate steps: a failure while
    persisting (for example during cache invalidation) must not replace an
    already computed tool result with an "Agent creation failed" message.

    Args:
        tool_name: Name of the tool to execute.
        parameters: Arguments passed through to the tool.
        tool_call_id: ID of the tool call whose pending message is updated.
        operation_id: ID generated for this operation; not used in this body.
        session_id: Chat session the pending message belongs to.
        user_id: Owner of the session, or None.
    """
    tool_raised = False

    # Step 1: run the tool and turn its outcome into message content.
    try:
        # Load fresh session (not stale reference)
        session = await get_chat_session(session_id, user_id)
        if not session:
            logger.error(f"Session {session_id} not found for background tool")
            return

        result = await execute_tool(
            tool_name=tool_name,
            parameters=parameters,
            tool_call_id=tool_call_id,
            user_id=user_id,
            session=session,
        )
        if isinstance(result.output, str):
            final_content = result.output
        else:
            final_content = orjson.dumps(result.output).decode("utf-8")
        succeeded = result.success
    except Exception as e:
        logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True)
        tool_raised = True
        error_response = {
            "type": "error",
            "message": f"Agent creation failed: {str(e)}",
        }
        final_content = orjson.dumps(error_response).decode("utf-8")
        succeeded = False

    # Step 2: persist the outcome. Errors here are logged instead of being
    # routed through the tool-failure handler above, and instead of escaping
    # the task where nobody would retrieve them.
    try:
        await _update_pending_operation(
            session_id=session_id,
            tool_call_id=tool_call_id,
            result=final_content,
            success=succeeded,
        )
    except Exception as e:
        logger.error(
            f"Failed to persist result of background tool {tool_name} "
            f"for session {session_id}: {e}",
            exc_info=True,
        )
        return

    if not tool_raised:
        logger.info(f"Background tool {tool_name} completed for session {session_id}")
async def _update_pending_operation(
    session_id: str,
    tool_call_id: str,
    result: str,
    success: bool,
) -> None:
    """Replace the pending tool message with the operation's final result.

    Background tasks call this once a long-running operation has finished.
    The ``success`` flag is accepted but not used by this body.
    """
    # Write the final content to the stored chat message first.
    was_updated = await chat_db.update_tool_message_content(
        session_id=session_id,
        tool_call_id=tool_call_id,
        new_content=result,
    )

    if not was_updated:
        logger.warning(
            f"Failed to update pending operation for tool_call_id {tool_call_id} "
            f"in session {session_id}"
        )
        return

    # Drop the cached session so the next load reads the updated message.
    await invalidate_session_cache(session_id)
    logger.info(
        f"Updated pending operation for tool_call_id {tool_call_id} "
        f"in session {session_id}"
    )

View File

@@ -49,6 +49,11 @@ tools: list[ChatCompletionToolParam] = [
]
def get_tool(tool_name: str) -> BaseTool | None:
    """Look up a tool instance in the registry by its name."""
    registered_tool = TOOL_REGISTRY.get(tool_name)
    return registered_tool
async def execute_tool(
tool_name: str,
parameters: dict[str, Any],
@@ -57,7 +62,7 @@ async def execute_tool(
tool_call_id: str,
) -> "StreamToolOutputAvailable":
"""Execute a tool by name."""
tool = TOOL_REGISTRY.get(tool_name)
tool = get_tool(tool_name)
if not tool:
raise ValueError(f"Tool {tool_name} not found")

View File

@@ -36,6 +36,16 @@ class BaseTool:
"""Whether this tool requires authentication."""
return False
@property
def is_long_running(self) -> bool:
    """Tell whether this tool should be executed in the background.

    Tools that take a long time (agent generation, for instance) run as
    background tasks so they outlive an SSE disconnection; their result is
    stored in chat history and shows up after the user refreshes.
    The base implementation opts out.
    """
    return False
def as_openai_tool(self) -> ChatCompletionToolParam:
"""Convert to OpenAI tool format."""
return ChatCompletionToolParam(

View File

@@ -42,6 +42,10 @@ class CreateAgentTool(BaseTool):
def requires_auth(self) -> bool:
return True
@property
def is_long_running(self) -> bool:
    """Mark this tool as long-running."""
    return True
@property
def parameters(self) -> dict[str, Any]:
return {

View File

@@ -42,6 +42,10 @@ class EditAgentTool(BaseTool):
def requires_auth(self) -> bool:
return True
@property
def is_long_running(self) -> bool:
    """Mark this tool as long-running."""
    return True
@property
def parameters(self) -> dict[str, Any]:
return {

View File

@@ -28,6 +28,10 @@ class ResponseType(str, Enum):
BLOCK_OUTPUT = "block_output"
DOC_SEARCH_RESULTS = "doc_search_results"
DOC_PAGE = "doc_page"
# Long-running operation types
OPERATION_STARTED = "operation_started"
OPERATION_PENDING = "operation_pending"
OPERATION_IN_PROGRESS = "operation_in_progress"
# Base response model
@@ -334,3 +338,39 @@ class BlockOutputResponse(ToolResponseBase):
block_name: str
outputs: dict[str, list[Any]]
success: bool = True
# Long-running operation models
class OperationStartedResponse(ToolResponseBase):
    """Tool response sent right after a long-running operation is launched.

    The client receives it immediately while the operation keeps executing
    in the background, so the user may close the tab and come back later.
    """

    type: ResponseType = ResponseType.OPERATION_STARTED
    operation_id: str
    tool_name: str
class OperationPendingResponse(ToolResponseBase):
    """Tool response kept in chat history while a long-running operation runs.

    It is written to the database so that a user who refreshes before the
    operation has finished is shown a pending state.
    """

    type: ResponseType = ResponseType.OPERATION_PENDING
    operation_id: str
    tool_name: str
class OperationInProgressResponse(ToolResponseBase):
    """Tool response for a request that targets an operation already running.

    Serves idempotency: it is returned when the same tool_call_id arrives
    again while its background task has not finished yet.
    """

    type: ResponseType = ResponseType.OPERATION_IN_PROGRESS
    tool_call_id: str

View File

@@ -341,6 +341,39 @@ export function parseToolResponse(
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_started") {
return {
type: "operation_started",
toolName: parsedResult.tool_name as string,
operationId: (parsedResult.operation_id as string) || "",
message:
(parsedResult.message as string) ||
"Operation started. You can close this tab.",
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_pending") {
return {
type: "operation_pending",
toolName: parsedResult.tool_name as string,
operationId: (parsedResult.operation_id as string) || "",
message:
(parsedResult.message as string) ||
"Operation in progress. Please wait...",
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_in_progress") {
return {
type: "operation_in_progress",
toolName: parsedResult.tool_name as string,
toolCallId: (parsedResult.tool_call_id as string) || "",
message:
(parsedResult.message as string) ||
"Operation already in progress. Please wait...",
timestamp: timestamp || new Date(),
};
}
if (responseType === "need_login") {
return {
type: "login_needed",

View File

@@ -16,6 +16,7 @@ import { AuthPromptWidget } from "../AuthPromptWidget/AuthPromptWidget";
import { ChatCredentialsSetup } from "../ChatCredentialsSetup/ChatCredentialsSetup";
import { ClarificationQuestionsWidget } from "../ClarificationQuestionsWidget/ClarificationQuestionsWidget";
import { ExecutionStartedMessage } from "../ExecutionStartedMessage/ExecutionStartedMessage";
import { PendingOperationWidget } from "../PendingOperationWidget/PendingOperationWidget";
import { MarkdownContent } from "../MarkdownContent/MarkdownContent";
import { NoResultsMessage } from "../NoResultsMessage/NoResultsMessage";
import { ToolCallMessage } from "../ToolCallMessage/ToolCallMessage";
@@ -71,6 +72,9 @@ export function ChatMessage({
isLoginNeeded,
isCredentialsNeeded,
isClarificationNeeded,
isOperationStarted,
isOperationPending,
isOperationInProgress,
} = useChatMessage(message);
const displayContent = getDisplayContent(message, isUser);
@@ -290,6 +294,42 @@ export function ChatMessage({
);
}
// Render operation_started messages (long-running background operations)
if (isOperationStarted && message.type === "operation_started") {
return (
<PendingOperationWidget
status="started"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render operation_pending messages (operations in progress when refreshing)
if (isOperationPending && message.type === "operation_pending") {
return (
<PendingOperationWidget
status="pending"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render operation_in_progress messages (duplicate request while operation running)
if (isOperationInProgress && message.type === "operation_in_progress") {
return (
<PendingOperationWidget
status="in_progress"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render tool response messages (but skip agent_output if it's being rendered inside assistant message)
if (isToolResponse && message.type === "tool_response") {
return (

View File

@@ -103,6 +103,27 @@ export type ChatMessageData =
message: string;
sessionId: string;
timestamp?: string | Date;
}
| {
type: "operation_started";
toolName: string;
operationId: string;
message: string;
timestamp?: string | Date;
}
| {
type: "operation_pending";
toolName: string;
operationId: string;
message: string;
timestamp?: string | Date;
}
| {
type: "operation_in_progress";
toolName: string;
toolCallId: string;
message: string;
timestamp?: string | Date;
};
export function useChatMessage(message: ChatMessageData) {
@@ -124,5 +145,8 @@ export function useChatMessage(message: ChatMessageData) {
isExecutionStarted: message.type === "execution_started",
isInputsNeeded: message.type === "inputs_needed",
isClarificationNeeded: message.type === "clarification_needed",
isOperationStarted: message.type === "operation_started",
isOperationPending: message.type === "operation_pending",
isOperationInProgress: message.type === "operation_in_progress",
};
}

View File

@@ -0,0 +1,94 @@
"use client";
import { Card } from "@/components/atoms/Card/Card";
import { Text } from "@/components/atoms/Text/Text";
import { cn } from "@/lib/utils";
import { CircleNotch, CheckCircle, XCircle } from "@phosphor-icons/react";
/**
 * Display states of the widget. "pending", "started" and "in_progress" are
 * all rendered as an ongoing operation; "completed" and "error" are final.
 */
type OperationStatus =
  | "pending"
  | "started"
  | "in_progress"
  | "completed"
  | "error";
interface Props {
  /** State of the operation; selects icon, color and heading. */
  status: OperationStatus;
  /** Text shown below the heading. */
  message: string;
  /** When set, rendered as a "Tool: …" footer line. */
  toolName?: string;
  /** Extra classes merged into the outer wrapper element. */
  className?: string;
}
/**
 * Heading shown while an operation is still running.
 *
 * The widget is rendered for every long-running tool, so the heading is
 * derived from the tool name instead of always claiming an agent is being
 * created. Unknown or missing tool names get a neutral heading.
 */
function getPendingTitle(toolName?: string): string {
  switch (toolName) {
    case "create_agent":
      return "Creating Agent";
    case "edit_agent":
      return "Editing Agent";
    default:
      return "Operation in Progress";
  }
}

/**
 * Chat widget describing a long-running background operation.
 *
 * Shows a spinner while the operation is pending/started/in progress, a
 * check mark once completed and a cross on error, together with the given
 * message and, when provided, the tool name.
 */
export function PendingOperationWidget({
  status,
  message,
  toolName,
  className,
}: Props) {
  const isPending =
    status === "pending" || status === "started" || status === "in_progress";
  const isCompleted = status === "completed";
  const isError = status === "error";

  return (
    <div
      className={cn(
        "group relative flex w-full justify-start gap-3 px-4 py-3",
        className,
      )}
    >
      <div className="flex w-full max-w-3xl gap-3">
        <div className="flex-shrink-0">
          <div
            className={cn(
              "flex h-7 w-7 items-center justify-center rounded-lg",
              isPending && "bg-blue-500",
              isCompleted && "bg-green-500",
              isError && "bg-red-500",
            )}
          >
            {isPending && (
              <CircleNotch
                className="h-4 w-4 animate-spin text-white"
                weight="bold"
              />
            )}
            {isCompleted && (
              <CheckCircle className="h-4 w-4 text-white" weight="bold" />
            )}
            {isError && (
              <XCircle className="h-4 w-4 text-white" weight="bold" />
            )}
          </div>
        </div>
        <div className="flex min-w-0 flex-1 flex-col">
          <Card className="space-y-2 p-4">
            <div>
              <Text variant="h4" className="mb-1 text-slate-900">
                {isPending && getPendingTitle(toolName)}
                {isCompleted && "Operation Complete"}
                {isError && "Operation Failed"}
              </Text>
              <Text variant="small" className="text-slate-600">
                {message}
              </Text>
            </div>
            {isPending && (
              <Text variant="small" className="italic text-slate-500">
                You can close this tab. Check your library in a few minutes.
              </Text>
            )}
            {toolName && (
              <Text variant="small" className="text-slate-400">
                Tool: {toolName}
              </Text>
            )}
          </Card>
        </div>
      </div>
    </div>
  );
}