mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-04 03:45:12 -05:00
feat(backend): persist long-running tool results to survive SSE disconnects (#11856)
## Summary Agent generation (`create_agent`, `edit_agent`) can take 1-5 minutes. Previously, if the user closed their browser tab during this time: 1. The SSE connection would die 2. The tool execution would be cancelled via `CancelledError` 3. The result would be lost - even if the agent-generator service completed successfully This PR ensures long-running tool operations survive SSE disconnections. ### Changes 🏗️ **Backend:** - **base.py**: Added `is_long_running` property to `BaseTool` for tools to opt-in to background execution - **create_agent.py / edit_agent.py**: Set `is_long_running = True` - **models.py**: Added `OperationStartedResponse`, `OperationPendingResponse`, `OperationInProgressResponse` types - **service.py**: Modified `_yield_tool_call()` to: - Check if tool is `is_long_running` - Save "pending" message to chat history immediately - Spawn background task that runs independently of SSE - Return `operation_started` immediately (don't wait) - Update chat history with result when background task completes - Track running operations for idempotency (prevents duplicate ops on refresh) - **db.py**: Added `update_tool_message_content()` to update pending messages - **model.py**: Added `invalidate_session_cache()` to clear Redis after background completion **Frontend:** - **useChatMessage.ts**: Added operation message types - **helpers.ts**: Handle `operation_started`, `operation_pending`, `operation_in_progress` response types - **PendingOperationWidget**: New component to display operation status with spinner - **ChatMessage.tsx**: Render `PendingOperationWidget` for operation messages ### How It Works ``` User Request → Save "pending" message → Spawn background task → Return immediately ↓ Task runs independently of SSE ↓ On completion: Update message in chat history ↓ User refreshes → Loads history → Sees result ``` ### User Experience 1. User requests agent creation 2. Sees "Agent creation started. You can close this tab - check your library in a few minutes." 3. Can close browser tab safely 4. When they return, chat shows the completed result (or error) ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] pyright passes (0 errors) - [x] TypeScript checks pass - [x] Formatters applied ### Test Plan 1. Start agent creation in copilot 2. Close browser tab immediately after seeing "operation_started" 3. Wait 2-3 minutes 4. Reopen chat 5. Verify: Chat history shows completion message and agent appears in library --------- Co-authored-by: Ubbe <hi@ubbe.dev>
This commit is contained in:
@@ -38,6 +38,12 @@ class ChatConfig(BaseSettings):
|
||||
default=3, description="Maximum number of agent schedules"
|
||||
)
|
||||
|
||||
# Long-running operation configuration
|
||||
long_running_operation_ttl: int = Field(
|
||||
default=600,
|
||||
description="TTL in seconds for long-running operation tracking in Redis (safety net if pod dies)",
|
||||
)
|
||||
|
||||
# Langfuse Prompt Management Configuration
|
||||
# Note: Langfuse credentials are in Settings().secrets (settings.py)
|
||||
langfuse_prompt_name: str = Field(
|
||||
|
||||
@@ -247,3 +247,45 @@ async def get_chat_session_message_count(session_id: str) -> int:
|
||||
"""Get the number of messages in a chat session."""
|
||||
count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id})
|
||||
return count
|
||||
|
||||
|
||||
async def update_tool_message_content(
|
||||
session_id: str,
|
||||
tool_call_id: str,
|
||||
new_content: str,
|
||||
) -> bool:
|
||||
"""Update the content of a tool message in chat history.
|
||||
|
||||
Used by background tasks to update pending operation messages with final results.
|
||||
|
||||
Args:
|
||||
session_id: The chat session ID.
|
||||
tool_call_id: The tool call ID to find the message.
|
||||
new_content: The new content to set.
|
||||
|
||||
Returns:
|
||||
True if a message was updated, False otherwise.
|
||||
"""
|
||||
try:
|
||||
result = await PrismaChatMessage.prisma().update_many(
|
||||
where={
|
||||
"sessionId": session_id,
|
||||
"toolCallId": tool_call_id,
|
||||
},
|
||||
data={
|
||||
"content": new_content,
|
||||
},
|
||||
)
|
||||
if result == 0:
|
||||
logger.warning(
|
||||
f"No message found to update for session {session_id}, "
|
||||
f"tool_call_id {tool_call_id}"
|
||||
)
|
||||
return False
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to update tool message for session {session_id}, "
|
||||
f"tool_call_id {tool_call_id}: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
@@ -295,6 +295,21 @@ async def cache_chat_session(session: ChatSession) -> None:
|
||||
await _cache_session(session)
|
||||
|
||||
|
||||
async def invalidate_session_cache(session_id: str) -> None:
|
||||
"""Invalidate a chat session from Redis cache.
|
||||
|
||||
Used by background tasks to ensure fresh data is loaded on next access.
|
||||
This is best-effort - Redis failures are logged but don't fail the operation.
|
||||
"""
|
||||
try:
|
||||
redis_key = _get_session_cache_key(session_id)
|
||||
async_redis = await get_redis_async()
|
||||
await async_redis.delete(redis_key)
|
||||
except Exception as e:
|
||||
# Best-effort: log but don't fail - cache will expire naturally
|
||||
logger.warning(f"Failed to invalidate session cache for {session_id}: {e}")
|
||||
|
||||
|
||||
async def _get_session_from_db(session_id: str) -> ChatSession | None:
|
||||
"""Get a chat session from the database."""
|
||||
prisma_session = await chat_db.get_chat_session(session_id)
|
||||
|
||||
@@ -17,6 +17,7 @@ from openai import (
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
|
||||
|
||||
from backend.data.redis_client import get_redis_async
|
||||
from backend.data.understanding import (
|
||||
format_understanding_for_prompt,
|
||||
get_business_understanding,
|
||||
@@ -24,6 +25,7 @@ from backend.data.understanding import (
|
||||
from backend.util.exceptions import NotFoundError
|
||||
from backend.util.settings import Settings
|
||||
|
||||
from . import db as chat_db
|
||||
from .config import ChatConfig
|
||||
from .model import (
|
||||
ChatMessage,
|
||||
@@ -31,6 +33,7 @@ from .model import (
|
||||
Usage,
|
||||
cache_chat_session,
|
||||
get_chat_session,
|
||||
invalidate_session_cache,
|
||||
update_session_title,
|
||||
upsert_chat_session,
|
||||
)
|
||||
@@ -48,8 +51,13 @@ from .response_model import (
|
||||
StreamToolOutputAvailable,
|
||||
StreamUsage,
|
||||
)
|
||||
from .tools import execute_tool, tools
|
||||
from .tools.models import ErrorResponse
|
||||
from .tools import execute_tool, get_tool, tools
|
||||
from .tools.models import (
|
||||
ErrorResponse,
|
||||
OperationInProgressResponse,
|
||||
OperationPendingResponse,
|
||||
OperationStartedResponse,
|
||||
)
|
||||
from .tracking import track_user_message
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -61,6 +69,43 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
|
||||
|
||||
langfuse = get_client()
|
||||
|
||||
# Redis key prefix for tracking running long-running operations
|
||||
# Used for idempotency across Kubernetes pods - prevents duplicate executions on browser refresh
|
||||
RUNNING_OPERATION_PREFIX = "chat:running_operation:"
|
||||
|
||||
# Module-level set to hold strong references to background tasks.
|
||||
# This prevents asyncio from garbage collecting tasks before they complete.
|
||||
# Tasks are automatically removed on completion via done_callback.
|
||||
_background_tasks: set[asyncio.Task] = set()
|
||||
|
||||
|
||||
async def _mark_operation_started(tool_call_id: str) -> bool:
|
||||
"""Mark a long-running operation as started (Redis-based).
|
||||
|
||||
Returns True if successfully marked (operation was not already running),
|
||||
False if operation was already running (lost race condition).
|
||||
Raises exception if Redis is unavailable (fail-closed).
|
||||
"""
|
||||
redis = await get_redis_async()
|
||||
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
|
||||
# SETNX with TTL - atomic "set if not exists"
|
||||
result = await redis.set(key, "1", ex=config.long_running_operation_ttl, nx=True)
|
||||
return result is not None
|
||||
|
||||
|
||||
async def _mark_operation_completed(tool_call_id: str) -> None:
|
||||
"""Mark a long-running operation as completed (remove Redis key).
|
||||
|
||||
This is best-effort - if Redis fails, the TTL will eventually clean up.
|
||||
"""
|
||||
try:
|
||||
redis = await get_redis_async()
|
||||
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
|
||||
await redis.delete(key)
|
||||
except Exception as e:
|
||||
# Non-critical: TTL will clean up eventually
|
||||
logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}")
|
||||
|
||||
|
||||
class LangfuseNotConfiguredError(Exception):
|
||||
"""Raised when Langfuse is required but not configured."""
|
||||
@@ -315,6 +360,7 @@ async def stream_chat_completion(
|
||||
has_yielded_end = False
|
||||
has_yielded_error = False
|
||||
has_done_tool_call = False
|
||||
has_long_running_tool_call = False # Track if we had a long-running tool call
|
||||
has_received_text = False
|
||||
text_streaming_ended = False
|
||||
tool_response_messages: list[ChatMessage] = []
|
||||
@@ -336,7 +382,6 @@ async def stream_chat_completion(
|
||||
system_prompt=system_prompt,
|
||||
text_block_id=text_block_id,
|
||||
):
|
||||
|
||||
if isinstance(chunk, StreamTextStart):
|
||||
# Emit text-start before first text delta
|
||||
if not has_received_text:
|
||||
@@ -394,13 +439,34 @@ async def stream_chat_completion(
|
||||
if isinstance(chunk.output, str)
|
||||
else orjson.dumps(chunk.output).decode("utf-8")
|
||||
)
|
||||
tool_response_messages.append(
|
||||
ChatMessage(
|
||||
role="tool",
|
||||
content=result_content,
|
||||
tool_call_id=chunk.toolCallId,
|
||||
# Skip saving long-running operation responses - messages already saved in _yield_tool_call
|
||||
# Use JSON parsing instead of substring matching to avoid false positives
|
||||
is_long_running_response = False
|
||||
try:
|
||||
parsed = orjson.loads(result_content)
|
||||
if isinstance(parsed, dict) and parsed.get("type") in (
|
||||
"operation_started",
|
||||
"operation_in_progress",
|
||||
):
|
||||
is_long_running_response = True
|
||||
except (orjson.JSONDecodeError, TypeError):
|
||||
pass # Not JSON or not a dict - treat as regular response
|
||||
if is_long_running_response:
|
||||
# Remove from accumulated_tool_calls since assistant message was already saved
|
||||
accumulated_tool_calls[:] = [
|
||||
tc
|
||||
for tc in accumulated_tool_calls
|
||||
if tc["id"] != chunk.toolCallId
|
||||
]
|
||||
has_long_running_tool_call = True
|
||||
else:
|
||||
tool_response_messages.append(
|
||||
ChatMessage(
|
||||
role="tool",
|
||||
content=result_content,
|
||||
tool_call_id=chunk.toolCallId,
|
||||
)
|
||||
)
|
||||
)
|
||||
has_done_tool_call = True
|
||||
# Track if any tool execution failed
|
||||
if not chunk.success:
|
||||
@@ -576,7 +642,14 @@ async def stream_chat_completion(
|
||||
logger.info(
|
||||
f"Extended session messages, new message_count={len(session.messages)}"
|
||||
)
|
||||
if messages_to_save or has_appended_streaming_message:
|
||||
# Save if there are regular (non-long-running) tool responses or streaming message.
|
||||
# Long-running tools save their own state, but we still need to save regular tools
|
||||
# that may be in the same response.
|
||||
has_regular_tool_responses = len(tool_response_messages) > 0
|
||||
if has_regular_tool_responses or (
|
||||
not has_long_running_tool_call
|
||||
and (messages_to_save or has_appended_streaming_message)
|
||||
):
|
||||
await upsert_chat_session(session)
|
||||
else:
|
||||
logger.info(
|
||||
@@ -585,7 +658,9 @@ async def stream_chat_completion(
|
||||
)
|
||||
|
||||
# If we did a tool call, stream the chat completion again to get the next response
|
||||
if has_done_tool_call:
|
||||
# Skip only if ALL tools were long-running (they handle their own completion)
|
||||
has_regular_tools = len(tool_response_messages) > 0
|
||||
if has_done_tool_call and (has_regular_tools or not has_long_running_tool_call):
|
||||
logger.info(
|
||||
"Tool call executed, streaming chat completion again to get assistant response"
|
||||
)
|
||||
@@ -1260,14 +1335,17 @@ async def _yield_tool_call(
|
||||
"""
|
||||
Yield a tool call and its execution result.
|
||||
|
||||
For long-running tools, yields heartbeat events every 15 seconds to keep
|
||||
the SSE connection alive through proxies and load balancers.
|
||||
For tools marked with `is_long_running=True` (like agent generation), spawns a
|
||||
background task so the operation survives SSE disconnections. For other tools,
|
||||
yields heartbeat events every 15 seconds to keep the SSE connection alive.
|
||||
|
||||
Raises:
|
||||
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
|
||||
KeyError: If expected tool call fields are missing
|
||||
TypeError: If tool call structure is invalid
|
||||
"""
|
||||
import uuid as uuid_module
|
||||
|
||||
tool_name = tool_calls[yield_idx]["function"]["name"]
|
||||
tool_call_id = tool_calls[yield_idx]["id"]
|
||||
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
|
||||
@@ -1285,7 +1363,151 @@ async def _yield_tool_call(
|
||||
input=arguments,
|
||||
)
|
||||
|
||||
# Run tool execution in background task with heartbeats to keep connection alive
|
||||
# Check if this tool is long-running (survives SSE disconnection)
|
||||
tool = get_tool(tool_name)
|
||||
if tool and tool.is_long_running:
|
||||
# Atomic check-and-set: returns False if operation already running (lost race)
|
||||
if not await _mark_operation_started(tool_call_id):
|
||||
logger.info(
|
||||
f"Tool call {tool_call_id} already in progress, returning status"
|
||||
)
|
||||
# Build dynamic message based on tool name
|
||||
if tool_name == "create_agent":
|
||||
in_progress_msg = "Agent creation already in progress. Please wait..."
|
||||
elif tool_name == "edit_agent":
|
||||
in_progress_msg = "Agent edit already in progress. Please wait..."
|
||||
else:
|
||||
in_progress_msg = f"{tool_name} already in progress. Please wait..."
|
||||
|
||||
yield StreamToolOutputAvailable(
|
||||
toolCallId=tool_call_id,
|
||||
toolName=tool_name,
|
||||
output=OperationInProgressResponse(
|
||||
message=in_progress_msg,
|
||||
tool_call_id=tool_call_id,
|
||||
).model_dump_json(),
|
||||
success=True,
|
||||
)
|
||||
return
|
||||
|
||||
# Generate operation ID
|
||||
operation_id = str(uuid_module.uuid4())
|
||||
|
||||
# Build a user-friendly message based on tool and arguments
|
||||
if tool_name == "create_agent":
|
||||
agent_desc = arguments.get("description", "")
|
||||
# Truncate long descriptions for the message
|
||||
desc_preview = (
|
||||
(agent_desc[:100] + "...") if len(agent_desc) > 100 else agent_desc
|
||||
)
|
||||
pending_msg = (
|
||||
f"Creating your agent: {desc_preview}"
|
||||
if desc_preview
|
||||
else "Creating agent... This may take a few minutes."
|
||||
)
|
||||
started_msg = (
|
||||
"Agent creation started. You can close this tab - "
|
||||
"check your library in a few minutes."
|
||||
)
|
||||
elif tool_name == "edit_agent":
|
||||
changes = arguments.get("changes", "")
|
||||
changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes
|
||||
pending_msg = (
|
||||
f"Editing agent: {changes_preview}"
|
||||
if changes_preview
|
||||
else "Editing agent... This may take a few minutes."
|
||||
)
|
||||
started_msg = (
|
||||
"Agent edit started. You can close this tab - "
|
||||
"check your library in a few minutes."
|
||||
)
|
||||
else:
|
||||
pending_msg = f"Running {tool_name}... This may take a few minutes."
|
||||
started_msg = (
|
||||
f"{tool_name} started. You can close this tab - "
|
||||
"check back in a few minutes."
|
||||
)
|
||||
|
||||
# Track appended messages for rollback on failure
|
||||
assistant_message: ChatMessage | None = None
|
||||
pending_message: ChatMessage | None = None
|
||||
|
||||
# Wrap session save and task creation in try-except to release lock on failure
|
||||
try:
|
||||
# Save assistant message with tool_call FIRST (required by LLM)
|
||||
assistant_message = ChatMessage(
|
||||
role="assistant",
|
||||
content="",
|
||||
tool_calls=[tool_calls[yield_idx]],
|
||||
)
|
||||
session.messages.append(assistant_message)
|
||||
|
||||
# Then save pending tool result
|
||||
pending_message = ChatMessage(
|
||||
role="tool",
|
||||
content=OperationPendingResponse(
|
||||
message=pending_msg,
|
||||
operation_id=operation_id,
|
||||
tool_name=tool_name,
|
||||
).model_dump_json(),
|
||||
tool_call_id=tool_call_id,
|
||||
)
|
||||
session.messages.append(pending_message)
|
||||
await upsert_chat_session(session)
|
||||
logger.info(
|
||||
f"Saved pending operation {operation_id} for tool {tool_name} "
|
||||
f"in session {session.session_id}"
|
||||
)
|
||||
|
||||
# Store task reference in module-level set to prevent GC before completion
|
||||
task = asyncio.create_task(
|
||||
_execute_long_running_tool(
|
||||
tool_name=tool_name,
|
||||
parameters=arguments,
|
||||
tool_call_id=tool_call_id,
|
||||
operation_id=operation_id,
|
||||
session_id=session.session_id,
|
||||
user_id=session.user_id,
|
||||
)
|
||||
)
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
except Exception as e:
|
||||
# Roll back appended messages to prevent data corruption on subsequent saves
|
||||
if (
|
||||
pending_message
|
||||
and session.messages
|
||||
and session.messages[-1] == pending_message
|
||||
):
|
||||
session.messages.pop()
|
||||
if (
|
||||
assistant_message
|
||||
and session.messages
|
||||
and session.messages[-1] == assistant_message
|
||||
):
|
||||
session.messages.pop()
|
||||
|
||||
# Release the Redis lock since the background task won't be spawned
|
||||
await _mark_operation_completed(tool_call_id)
|
||||
logger.error(
|
||||
f"Failed to setup long-running tool {tool_name}: {e}", exc_info=True
|
||||
)
|
||||
raise
|
||||
|
||||
# Return immediately - don't wait for completion
|
||||
yield StreamToolOutputAvailable(
|
||||
toolCallId=tool_call_id,
|
||||
toolName=tool_name,
|
||||
output=OperationStartedResponse(
|
||||
message=started_msg,
|
||||
operation_id=operation_id,
|
||||
tool_name=tool_name,
|
||||
).model_dump_json(),
|
||||
success=True,
|
||||
)
|
||||
return
|
||||
|
||||
# Normal flow: Run tool execution in background task with heartbeats
|
||||
tool_task = asyncio.create_task(
|
||||
execute_tool(
|
||||
tool_name=tool_name,
|
||||
@@ -1335,3 +1557,190 @@ async def _yield_tool_call(
|
||||
)
|
||||
|
||||
yield tool_execution_response
|
||||
|
||||
|
||||
async def _execute_long_running_tool(
|
||||
tool_name: str,
|
||||
parameters: dict[str, Any],
|
||||
tool_call_id: str,
|
||||
operation_id: str,
|
||||
session_id: str,
|
||||
user_id: str | None,
|
||||
) -> None:
|
||||
"""Execute a long-running tool in background and update chat history with result.
|
||||
|
||||
This function runs independently of the SSE connection, so the operation
|
||||
survives if the user closes their browser tab.
|
||||
"""
|
||||
try:
|
||||
# Load fresh session (not stale reference)
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
if not session:
|
||||
logger.error(f"Session {session_id} not found for background tool")
|
||||
return
|
||||
|
||||
# Execute the actual tool
|
||||
result = await execute_tool(
|
||||
tool_name=tool_name,
|
||||
parameters=parameters,
|
||||
tool_call_id=tool_call_id,
|
||||
user_id=user_id,
|
||||
session=session,
|
||||
)
|
||||
|
||||
# Update the pending message with result
|
||||
await _update_pending_operation(
|
||||
session_id=session_id,
|
||||
tool_call_id=tool_call_id,
|
||||
result=(
|
||||
result.output
|
||||
if isinstance(result.output, str)
|
||||
else orjson.dumps(result.output).decode("utf-8")
|
||||
),
|
||||
)
|
||||
|
||||
logger.info(f"Background tool {tool_name} completed for session {session_id}")
|
||||
|
||||
# Generate LLM continuation so user sees response when they poll/refresh
|
||||
await _generate_llm_continuation(session_id=session_id, user_id=user_id)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True)
|
||||
error_response = ErrorResponse(
|
||||
message=f"Tool {tool_name} failed: {str(e)}",
|
||||
)
|
||||
await _update_pending_operation(
|
||||
session_id=session_id,
|
||||
tool_call_id=tool_call_id,
|
||||
result=error_response.model_dump_json(),
|
||||
)
|
||||
finally:
|
||||
await _mark_operation_completed(tool_call_id)
|
||||
|
||||
|
||||
async def _update_pending_operation(
|
||||
session_id: str,
|
||||
tool_call_id: str,
|
||||
result: str,
|
||||
) -> None:
|
||||
"""Update the pending tool message with final result.
|
||||
|
||||
This is called by background tasks when long-running operations complete.
|
||||
"""
|
||||
# Update the message in database
|
||||
updated = await chat_db.update_tool_message_content(
|
||||
session_id=session_id,
|
||||
tool_call_id=tool_call_id,
|
||||
new_content=result,
|
||||
)
|
||||
|
||||
if updated:
|
||||
# Invalidate Redis cache so next load gets fresh data
|
||||
# Wrap in try/except to prevent cache failures from triggering error handling
|
||||
# that would overwrite our successful DB update
|
||||
try:
|
||||
await invalidate_session_cache(session_id)
|
||||
except Exception as e:
|
||||
# Non-critical: cache will eventually be refreshed on next load
|
||||
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
|
||||
logger.info(
|
||||
f"Updated pending operation for tool_call_id {tool_call_id} "
|
||||
f"in session {session_id}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Failed to update pending operation for tool_call_id {tool_call_id} "
|
||||
f"in session {session_id}"
|
||||
)
|
||||
|
||||
|
||||
async def _generate_llm_continuation(
|
||||
session_id: str,
|
||||
user_id: str | None,
|
||||
) -> None:
|
||||
"""Generate an LLM response after a long-running tool completes.
|
||||
|
||||
This is called by background tasks to continue the conversation
|
||||
after a tool result is saved. The response is saved to the database
|
||||
so users see it when they refresh or poll.
|
||||
"""
|
||||
try:
|
||||
# Load fresh session from DB (bypass cache to get the updated tool result)
|
||||
await invalidate_session_cache(session_id)
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
if not session:
|
||||
logger.error(f"Session {session_id} not found for LLM continuation")
|
||||
return
|
||||
|
||||
# Build system prompt
|
||||
system_prompt, _ = await _build_system_prompt(user_id)
|
||||
|
||||
# Build messages in OpenAI format
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
from openai.types.chat import ChatCompletionSystemMessageParam
|
||||
|
||||
system_message = ChatCompletionSystemMessageParam(
|
||||
role="system",
|
||||
content=system_prompt,
|
||||
)
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Build extra_body for tracing
|
||||
extra_body: dict[str, Any] = {
|
||||
"posthogProperties": {
|
||||
"environment": settings.config.app_env.value,
|
||||
},
|
||||
}
|
||||
if user_id:
|
||||
extra_body["user"] = user_id[:128]
|
||||
extra_body["posthogDistinctId"] = user_id
|
||||
if session_id:
|
||||
extra_body["session_id"] = session_id[:128]
|
||||
|
||||
# Make non-streaming LLM call (no tools - just text response)
|
||||
from typing import cast
|
||||
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
|
||||
# No tools parameter = text-only response (no tool calls)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
|
||||
if response.choices and response.choices[0].message.content:
|
||||
assistant_content = response.choices[0].message.content
|
||||
|
||||
# Reload session from DB to avoid race condition with user messages
|
||||
# that may have been sent while we were generating the LLM response
|
||||
fresh_session = await get_chat_session(session_id, user_id)
|
||||
if not fresh_session:
|
||||
logger.error(
|
||||
f"Session {session_id} disappeared during LLM continuation"
|
||||
)
|
||||
return
|
||||
|
||||
# Save assistant message to database
|
||||
assistant_message = ChatMessage(
|
||||
role="assistant",
|
||||
content=assistant_content,
|
||||
)
|
||||
fresh_session.messages.append(assistant_message)
|
||||
|
||||
# Save to database (not cache) to persist the response
|
||||
await upsert_chat_session(fresh_session)
|
||||
|
||||
# Invalidate cache so next poll/refresh gets fresh data
|
||||
await invalidate_session_cache(session_id)
|
||||
|
||||
logger.info(
|
||||
f"Generated LLM continuation for session {session_id}, "
|
||||
f"response length: {len(assistant_content)}"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"LLM continuation returned empty response for {session_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate LLM continuation: {e}", exc_info=True)
|
||||
|
||||
@@ -49,6 +49,11 @@ tools: list[ChatCompletionToolParam] = [
|
||||
]
|
||||
|
||||
|
||||
def get_tool(tool_name: str) -> BaseTool | None:
|
||||
"""Get a tool instance by name."""
|
||||
return TOOL_REGISTRY.get(tool_name)
|
||||
|
||||
|
||||
async def execute_tool(
|
||||
tool_name: str,
|
||||
parameters: dict[str, Any],
|
||||
@@ -57,7 +62,7 @@ async def execute_tool(
|
||||
tool_call_id: str,
|
||||
) -> "StreamToolOutputAvailable":
|
||||
"""Execute a tool by name."""
|
||||
tool = TOOL_REGISTRY.get(tool_name)
|
||||
tool = get_tool(tool_name)
|
||||
if not tool:
|
||||
raise ValueError(f"Tool {tool_name} not found")
|
||||
|
||||
|
||||
@@ -36,6 +36,16 @@ class BaseTool:
|
||||
"""Whether this tool requires authentication."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_long_running(self) -> bool:
|
||||
"""Whether this tool is long-running and should execute in background.
|
||||
|
||||
Long-running tools (like agent generation) are executed via background
|
||||
tasks to survive SSE disconnections. The result is persisted to chat
|
||||
history and visible when the user refreshes.
|
||||
"""
|
||||
return False
|
||||
|
||||
def as_openai_tool(self) -> ChatCompletionToolParam:
|
||||
"""Convert to OpenAI tool format."""
|
||||
return ChatCompletionToolParam(
|
||||
|
||||
@@ -42,6 +42,10 @@ class CreateAgentTool(BaseTool):
|
||||
def requires_auth(self) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def is_long_running(self) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
|
||||
@@ -42,6 +42,10 @@ class EditAgentTool(BaseTool):
|
||||
def requires_auth(self) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def is_long_running(self) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
|
||||
@@ -28,6 +28,10 @@ class ResponseType(str, Enum):
|
||||
BLOCK_OUTPUT = "block_output"
|
||||
DOC_SEARCH_RESULTS = "doc_search_results"
|
||||
DOC_PAGE = "doc_page"
|
||||
# Long-running operation types
|
||||
OPERATION_STARTED = "operation_started"
|
||||
OPERATION_PENDING = "operation_pending"
|
||||
OPERATION_IN_PROGRESS = "operation_in_progress"
|
||||
|
||||
|
||||
# Base response model
|
||||
@@ -334,3 +338,39 @@ class BlockOutputResponse(ToolResponseBase):
|
||||
block_name: str
|
||||
outputs: dict[str, list[Any]]
|
||||
success: bool = True
|
||||
|
||||
|
||||
# Long-running operation models
|
||||
class OperationStartedResponse(ToolResponseBase):
|
||||
"""Response when a long-running operation has been started in the background.
|
||||
|
||||
This is returned immediately to the client while the operation continues
|
||||
to execute. The user can close the tab and check back later.
|
||||
"""
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_STARTED
|
||||
operation_id: str
|
||||
tool_name: str
|
||||
|
||||
|
||||
class OperationPendingResponse(ToolResponseBase):
|
||||
"""Response stored in chat history while a long-running operation is executing.
|
||||
|
||||
This is persisted to the database so users see a pending state when they
|
||||
refresh before the operation completes.
|
||||
"""
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_PENDING
|
||||
operation_id: str
|
||||
tool_name: str
|
||||
|
||||
|
||||
class OperationInProgressResponse(ToolResponseBase):
|
||||
"""Response when an operation is already in progress.
|
||||
|
||||
Returned for idempotency when the same tool_call_id is requested again
|
||||
while the background task is still running.
|
||||
"""
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_IN_PROGRESS
|
||||
tool_call_id: str
|
||||
|
||||
@@ -359,8 +359,8 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
description="The port for the Agent Generator service",
|
||||
)
|
||||
agentgenerator_timeout: int = Field(
|
||||
default=120,
|
||||
description="The timeout in seconds for Agent Generator service requests",
|
||||
default=600,
|
||||
description="The timeout in seconds for Agent Generator service requests (includes retries for rate limits)",
|
||||
)
|
||||
|
||||
enable_example_blocks: bool = Field(
|
||||
|
||||
@@ -48,6 +48,15 @@ export function handleTextEnded(
|
||||
const completedText = deps.streamingChunksRef.current.join("");
|
||||
if (completedText.trim()) {
|
||||
deps.setMessages((prev) => {
|
||||
// Check if this exact message already exists to prevent duplicates
|
||||
const exists = prev.some(
|
||||
(msg) =>
|
||||
msg.type === "message" &&
|
||||
msg.role === "assistant" &&
|
||||
msg.content === completedText,
|
||||
);
|
||||
if (exists) return prev;
|
||||
|
||||
const assistantMessage: ChatMessageData = {
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
@@ -203,13 +212,24 @@ export function handleStreamEnd(
|
||||
]);
|
||||
}
|
||||
if (completedContent.trim()) {
|
||||
const assistantMessage: ChatMessageData = {
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: completedContent,
|
||||
timestamp: new Date(),
|
||||
};
|
||||
deps.setMessages((prev) => [...prev, assistantMessage]);
|
||||
deps.setMessages((prev) => {
|
||||
// Check if this exact message already exists to prevent duplicates
|
||||
const exists = prev.some(
|
||||
(msg) =>
|
||||
msg.type === "message" &&
|
||||
msg.role === "assistant" &&
|
||||
msg.content === completedContent,
|
||||
);
|
||||
if (exists) return prev;
|
||||
|
||||
const assistantMessage: ChatMessageData = {
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: completedContent,
|
||||
timestamp: new Date(),
|
||||
};
|
||||
return [...prev, assistantMessage];
|
||||
});
|
||||
}
|
||||
deps.setStreamingChunks([]);
|
||||
deps.streamingChunksRef.current = [];
|
||||
|
||||
@@ -304,6 +304,7 @@ export function parseToolResponse(
|
||||
if (isAgentArray(agentsData)) {
|
||||
return {
|
||||
type: "agent_carousel",
|
||||
toolId,
|
||||
toolName: "agent_carousel",
|
||||
agents: agentsData,
|
||||
totalCount: parsedResult.total_count as number | undefined,
|
||||
@@ -316,6 +317,7 @@ export function parseToolResponse(
|
||||
if (responseType === "execution_started") {
|
||||
return {
|
||||
type: "execution_started",
|
||||
toolId,
|
||||
toolName: "execution_started",
|
||||
executionId: (parsedResult.execution_id as string) || "",
|
||||
agentName: (parsedResult.graph_name as string) || undefined,
|
||||
@@ -341,6 +343,41 @@ export function parseToolResponse(
|
||||
timestamp: timestamp || new Date(),
|
||||
};
|
||||
}
|
||||
if (responseType === "operation_started") {
|
||||
return {
|
||||
type: "operation_started",
|
||||
toolName: (parsedResult.tool_name as string) || toolName,
|
||||
toolId,
|
||||
operationId: (parsedResult.operation_id as string) || "",
|
||||
message:
|
||||
(parsedResult.message as string) ||
|
||||
"Operation started. You can close this tab.",
|
||||
timestamp: timestamp || new Date(),
|
||||
};
|
||||
}
|
||||
if (responseType === "operation_pending") {
|
||||
return {
|
||||
type: "operation_pending",
|
||||
toolName: (parsedResult.tool_name as string) || toolName,
|
||||
toolId,
|
||||
operationId: (parsedResult.operation_id as string) || "",
|
||||
message:
|
||||
(parsedResult.message as string) ||
|
||||
"Operation in progress. Please wait...",
|
||||
timestamp: timestamp || new Date(),
|
||||
};
|
||||
}
|
||||
if (responseType === "operation_in_progress") {
|
||||
return {
|
||||
type: "operation_in_progress",
|
||||
toolName: (parsedResult.tool_name as string) || toolName,
|
||||
toolCallId: (parsedResult.tool_call_id as string) || toolId,
|
||||
message:
|
||||
(parsedResult.message as string) ||
|
||||
"Operation already in progress. Please wait...",
|
||||
timestamp: timestamp || new Date(),
|
||||
};
|
||||
}
|
||||
if (responseType === "need_login") {
|
||||
return {
|
||||
type: "login_needed",
|
||||
|
||||
@@ -82,11 +82,110 @@ export function useChatContainer({
|
||||
[sessionId, stopStreaming, activeStreams, subscribeToStream],
|
||||
);
|
||||
|
||||
const allMessages = useMemo(
|
||||
() => [...processInitialMessages(initialMessages), ...messages],
|
||||
[initialMessages, messages],
|
||||
// Collect toolIds from completed tool results in initialMessages
|
||||
// Used to filter out operation messages when their results arrive
|
||||
const completedToolIds = useMemo(() => {
|
||||
const processedInitial = processInitialMessages(initialMessages);
|
||||
const ids = new Set<string>();
|
||||
for (const msg of processedInitial) {
|
||||
if (
|
||||
msg.type === "tool_response" ||
|
||||
msg.type === "agent_carousel" ||
|
||||
msg.type === "execution_started"
|
||||
) {
|
||||
const toolId = (msg as any).toolId;
|
||||
if (toolId) {
|
||||
ids.add(toolId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ids;
|
||||
}, [initialMessages]);
|
||||
|
||||
// Clean up local operation messages when their completed results arrive from polling
|
||||
// This effect runs when completedToolIds changes (i.e., when polling brings new results)
|
||||
useEffect(
|
||||
function cleanupCompletedOperations() {
|
||||
if (completedToolIds.size === 0) return;
|
||||
|
||||
setMessages((prev) => {
|
||||
const filtered = prev.filter((msg) => {
|
||||
if (
|
||||
msg.type === "operation_started" ||
|
||||
msg.type === "operation_pending" ||
|
||||
msg.type === "operation_in_progress"
|
||||
) {
|
||||
const toolId = (msg as any).toolId || (msg as any).toolCallId;
|
||||
if (toolId && completedToolIds.has(toolId)) {
|
||||
return false; // Remove - operation completed
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
// Only update state if something was actually filtered
|
||||
return filtered.length === prev.length ? prev : filtered;
|
||||
});
|
||||
},
|
||||
[completedToolIds],
|
||||
);
|
||||
|
||||
// Combine initial messages from backend with local streaming messages,
|
||||
// then deduplicate to prevent duplicates when polling refreshes initialMessages
|
||||
const allMessages = useMemo(() => {
|
||||
const processedInitial = processInitialMessages(initialMessages);
|
||||
|
||||
// Filter local messages to remove operation messages for completed tools
|
||||
const filteredLocalMessages = messages.filter((msg) => {
|
||||
if (
|
||||
msg.type === "operation_started" ||
|
||||
msg.type === "operation_pending" ||
|
||||
msg.type === "operation_in_progress"
|
||||
) {
|
||||
const toolId = (msg as any).toolId || (msg as any).toolCallId;
|
||||
if (toolId && completedToolIds.has(toolId)) {
|
||||
return false; // Filter out - operation completed
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
const combined = [...processedInitial, ...filteredLocalMessages];
|
||||
|
||||
// Deduplicate by content+role+timestamp. When initialMessages is refreshed via polling,
|
||||
// it may contain messages that are also in the local `messages` state.
|
||||
// Including timestamp prevents dropping legitimate repeated messages (e.g., user sends "yes" twice)
|
||||
const seen = new Set<string>();
|
||||
return combined.filter((msg) => {
|
||||
// Create a key based on type, role, content, and timestamp for deduplication
|
||||
let key: string;
|
||||
if (msg.type === "message") {
|
||||
// Use timestamp (rounded to nearest second) to allow slight variations
|
||||
// while still catching true duplicates from SSE/polling overlap
|
||||
const ts = msg.timestamp
|
||||
? Math.floor(new Date(msg.timestamp).getTime() / 1000)
|
||||
: "";
|
||||
key = `msg:${msg.role}:${ts}:${msg.content}`;
|
||||
} else if (msg.type === "tool_call") {
|
||||
key = `toolcall:${msg.toolId}`;
|
||||
} else if (
|
||||
msg.type === "operation_started" ||
|
||||
msg.type === "operation_pending" ||
|
||||
msg.type === "operation_in_progress"
|
||||
) {
|
||||
// Dedupe operation messages by toolId or operationId
|
||||
key = `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`;
|
||||
} else {
|
||||
// For other types, use a combination of type and first few fields
|
||||
key = `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`;
|
||||
}
|
||||
if (seen.has(key)) {
|
||||
return false;
|
||||
}
|
||||
seen.add(key);
|
||||
return true;
|
||||
});
|
||||
}, [initialMessages, messages]);
|
||||
|
||||
async function sendMessage(
|
||||
content: string,
|
||||
isUserMessage: boolean = true,
|
||||
|
||||
@@ -16,6 +16,7 @@ import { AuthPromptWidget } from "../AuthPromptWidget/AuthPromptWidget";
|
||||
import { ChatCredentialsSetup } from "../ChatCredentialsSetup/ChatCredentialsSetup";
|
||||
import { ClarificationQuestionsWidget } from "../ClarificationQuestionsWidget/ClarificationQuestionsWidget";
|
||||
import { ExecutionStartedMessage } from "../ExecutionStartedMessage/ExecutionStartedMessage";
|
||||
import { PendingOperationWidget } from "../PendingOperationWidget/PendingOperationWidget";
|
||||
import { MarkdownContent } from "../MarkdownContent/MarkdownContent";
|
||||
import { NoResultsMessage } from "../NoResultsMessage/NoResultsMessage";
|
||||
import { ToolCallMessage } from "../ToolCallMessage/ToolCallMessage";
|
||||
@@ -71,6 +72,9 @@ export function ChatMessage({
|
||||
isLoginNeeded,
|
||||
isCredentialsNeeded,
|
||||
isClarificationNeeded,
|
||||
isOperationStarted,
|
||||
isOperationPending,
|
||||
isOperationInProgress,
|
||||
} = useChatMessage(message);
|
||||
const displayContent = getDisplayContent(message, isUser);
|
||||
|
||||
@@ -290,6 +294,42 @@ export function ChatMessage({
|
||||
);
|
||||
}
|
||||
|
||||
// Render operation_started messages (long-running background operations)
|
||||
if (isOperationStarted && message.type === "operation_started") {
|
||||
return (
|
||||
<PendingOperationWidget
|
||||
status="started"
|
||||
message={message.message}
|
||||
toolName={message.toolName}
|
||||
className={className}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
// Render operation_pending messages (operations in progress when refreshing)
|
||||
if (isOperationPending && message.type === "operation_pending") {
|
||||
return (
|
||||
<PendingOperationWidget
|
||||
status="pending"
|
||||
message={message.message}
|
||||
toolName={message.toolName}
|
||||
className={className}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
// Render operation_in_progress messages (duplicate request while operation running)
|
||||
if (isOperationInProgress && message.type === "operation_in_progress") {
|
||||
return (
|
||||
<PendingOperationWidget
|
||||
status="in_progress"
|
||||
message={message.message}
|
||||
toolName={message.toolName}
|
||||
className={className}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
// Render tool response messages (but skip agent_output if it's being rendered inside assistant message)
|
||||
if (isToolResponse && message.type === "tool_response") {
|
||||
return (
|
||||
|
||||
@@ -61,6 +61,7 @@ export type ChatMessageData =
|
||||
}
|
||||
| {
|
||||
type: "agent_carousel";
|
||||
toolId: string;
|
||||
toolName: string;
|
||||
agents: Array<{
|
||||
id: string;
|
||||
@@ -74,6 +75,7 @@ export type ChatMessageData =
|
||||
}
|
||||
| {
|
||||
type: "execution_started";
|
||||
toolId: string;
|
||||
toolName: string;
|
||||
executionId: string;
|
||||
agentName?: string;
|
||||
@@ -103,6 +105,29 @@ export type ChatMessageData =
|
||||
message: string;
|
||||
sessionId: string;
|
||||
timestamp?: string | Date;
|
||||
}
|
||||
| {
|
||||
type: "operation_started";
|
||||
toolName: string;
|
||||
toolId: string;
|
||||
operationId: string;
|
||||
message: string;
|
||||
timestamp?: string | Date;
|
||||
}
|
||||
| {
|
||||
type: "operation_pending";
|
||||
toolName: string;
|
||||
toolId: string;
|
||||
operationId: string;
|
||||
message: string;
|
||||
timestamp?: string | Date;
|
||||
}
|
||||
| {
|
||||
type: "operation_in_progress";
|
||||
toolName: string;
|
||||
toolCallId: string;
|
||||
message: string;
|
||||
timestamp?: string | Date;
|
||||
};
|
||||
|
||||
export function useChatMessage(message: ChatMessageData) {
|
||||
@@ -124,5 +149,8 @@ export function useChatMessage(message: ChatMessageData) {
|
||||
isExecutionStarted: message.type === "execution_started",
|
||||
isInputsNeeded: message.type === "inputs_needed",
|
||||
isClarificationNeeded: message.type === "clarification_needed",
|
||||
isOperationStarted: message.type === "operation_started",
|
||||
isOperationPending: message.type === "operation_pending",
|
||||
isOperationInProgress: message.type === "operation_in_progress",
|
||||
};
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ export function ClarificationQuestionsWidget({
|
||||
className,
|
||||
}: Props) {
|
||||
const [answers, setAnswers] = useState<Record<string, string>>({});
|
||||
const [isSubmitted, setIsSubmitted] = useState(false);
|
||||
|
||||
function handleAnswerChange(keyword: string, value: string) {
|
||||
setAnswers((prev) => ({ ...prev, [keyword]: value }));
|
||||
@@ -41,11 +42,42 @@ export function ClarificationQuestionsWidget({
|
||||
if (!allAnswered) {
|
||||
return;
|
||||
}
|
||||
setIsSubmitted(true);
|
||||
onSubmitAnswers(answers);
|
||||
}
|
||||
|
||||
const allAnswered = questions.every((q) => answers[q.keyword]?.trim());
|
||||
|
||||
// Show submitted state after answers are submitted
|
||||
if (isSubmitted) {
|
||||
return (
|
||||
<div
|
||||
className={cn(
|
||||
"group relative flex w-full justify-start gap-3 px-4 py-3",
|
||||
className,
|
||||
)}
|
||||
>
|
||||
<div className="flex w-full max-w-3xl gap-3">
|
||||
<div className="flex-shrink-0">
|
||||
<div className="flex h-7 w-7 items-center justify-center rounded-lg bg-green-500">
|
||||
<CheckCircleIcon className="h-4 w-4 text-white" weight="bold" />
|
||||
</div>
|
||||
</div>
|
||||
<div className="flex min-w-0 flex-1 flex-col">
|
||||
<Card className="p-4">
|
||||
<Text variant="h4" className="mb-1 text-slate-900">
|
||||
Answers submitted
|
||||
</Text>
|
||||
<Text variant="small" className="text-slate-600">
|
||||
Processing your responses...
|
||||
</Text>
|
||||
</Card>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<div
|
||||
className={cn(
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
"use client";
|
||||
|
||||
import { Card } from "@/components/atoms/Card/Card";
|
||||
import { Text } from "@/components/atoms/Text/Text";
|
||||
import { cn } from "@/lib/utils";
|
||||
import { CircleNotch, CheckCircle, XCircle } from "@phosphor-icons/react";
|
||||
|
||||
type OperationStatus =
|
||||
| "pending"
|
||||
| "started"
|
||||
| "in_progress"
|
||||
| "completed"
|
||||
| "error";
|
||||
|
||||
interface Props {
|
||||
status: OperationStatus;
|
||||
message: string;
|
||||
toolName?: string;
|
||||
className?: string;
|
||||
}
|
||||
|
||||
function getOperationTitle(toolName?: string): string {
|
||||
if (!toolName) return "Operation";
|
||||
// Convert tool name to human-readable format
|
||||
// e.g., "create_agent" -> "Creating Agent", "edit_agent" -> "Editing Agent"
|
||||
if (toolName === "create_agent") return "Creating Agent";
|
||||
if (toolName === "edit_agent") return "Editing Agent";
|
||||
// Default: capitalize and format tool name
|
||||
return toolName
|
||||
.split("_")
|
||||
.map((word) => word.charAt(0).toUpperCase() + word.slice(1))
|
||||
.join(" ");
|
||||
}
|
||||
|
||||
export function PendingOperationWidget({
|
||||
status,
|
||||
message,
|
||||
toolName,
|
||||
className,
|
||||
}: Props) {
|
||||
const isPending =
|
||||
status === "pending" || status === "started" || status === "in_progress";
|
||||
const isCompleted = status === "completed";
|
||||
const isError = status === "error";
|
||||
|
||||
const operationTitle = getOperationTitle(toolName);
|
||||
|
||||
return (
|
||||
<div
|
||||
className={cn(
|
||||
"group relative flex w-full justify-start gap-3 px-4 py-3",
|
||||
className,
|
||||
)}
|
||||
>
|
||||
<div className="flex w-full max-w-3xl gap-3">
|
||||
<div className="flex-shrink-0">
|
||||
<div
|
||||
className={cn(
|
||||
"flex h-7 w-7 items-center justify-center rounded-lg",
|
||||
isPending && "bg-blue-500",
|
||||
isCompleted && "bg-green-500",
|
||||
isError && "bg-red-500",
|
||||
)}
|
||||
>
|
||||
{isPending && (
|
||||
<CircleNotch
|
||||
className="h-4 w-4 animate-spin text-white"
|
||||
weight="bold"
|
||||
/>
|
||||
)}
|
||||
{isCompleted && (
|
||||
<CheckCircle className="h-4 w-4 text-white" weight="bold" />
|
||||
)}
|
||||
{isError && (
|
||||
<XCircle className="h-4 w-4 text-white" weight="bold" />
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="flex min-w-0 flex-1 flex-col">
|
||||
<Card className="space-y-2 p-4">
|
||||
<div>
|
||||
<Text variant="h4" className="mb-1 text-slate-900">
|
||||
{isPending && operationTitle}
|
||||
{isCompleted && `${operationTitle} Complete`}
|
||||
{isError && `${operationTitle} Failed`}
|
||||
</Text>
|
||||
<Text variant="small" className="text-slate-600">
|
||||
{message}
|
||||
</Text>
|
||||
</div>
|
||||
|
||||
{isPending && (
|
||||
<Text variant="small" className="italic text-slate-500">
|
||||
Check your library in a few minutes.
|
||||
</Text>
|
||||
)}
|
||||
|
||||
{toolName && (
|
||||
<Text variant="small" className="text-slate-400">
|
||||
Tool: {toolName}
|
||||
</Text>
|
||||
)}
|
||||
</Card>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
import {
|
||||
getGetV2GetSessionQueryKey,
|
||||
getGetV2GetSessionQueryOptions,
|
||||
getGetV2ListSessionsQueryKey,
|
||||
postV2CreateSession,
|
||||
useGetV2GetSession,
|
||||
usePatchV2SessionAssignUser,
|
||||
@@ -102,6 +103,100 @@ export function useChatSession({
|
||||
}
|
||||
}, [createError, loadError]);
|
||||
|
||||
// Check if there are any pending operations in the messages
|
||||
// Must check all operation types: operation_pending, operation_started, operation_in_progress
|
||||
const hasPendingOperations = useMemo(() => {
|
||||
if (!messages || messages.length === 0) return false;
|
||||
const pendingTypes = new Set([
|
||||
"operation_pending",
|
||||
"operation_in_progress",
|
||||
"operation_started",
|
||||
]);
|
||||
return messages.some((msg) => {
|
||||
if (msg.role !== "tool" || !msg.content) return false;
|
||||
try {
|
||||
const content =
|
||||
typeof msg.content === "string"
|
||||
? JSON.parse(msg.content)
|
||||
: msg.content;
|
||||
return pendingTypes.has(content?.type);
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}, [messages]);
|
||||
|
||||
// Refresh sessions list when a pending operation completes
|
||||
// (hasPendingOperations transitions from true to false)
|
||||
const prevHasPendingOperationsRef = useRef(hasPendingOperations);
|
||||
useEffect(
|
||||
function refreshSessionsListOnOperationComplete() {
|
||||
const wasHasPending = prevHasPendingOperationsRef.current;
|
||||
prevHasPendingOperationsRef.current = hasPendingOperations;
|
||||
|
||||
// Only invalidate when transitioning from pending to not pending
|
||||
if (wasHasPending && !hasPendingOperations && sessionId) {
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: getGetV2ListSessionsQueryKey(),
|
||||
});
|
||||
}
|
||||
},
|
||||
[hasPendingOperations, sessionId, queryClient],
|
||||
);
|
||||
|
||||
// Poll for updates when there are pending operations (long poll - 10s intervals with backoff)
|
||||
const pollAttemptRef = useRef(0);
|
||||
const hasPendingOperationsRef = useRef(hasPendingOperations);
|
||||
hasPendingOperationsRef.current = hasPendingOperations;
|
||||
|
||||
useEffect(
|
||||
function pollForPendingOperations() {
|
||||
if (!sessionId || !hasPendingOperations) {
|
||||
pollAttemptRef.current = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
let cancelled = false;
|
||||
let timeoutId: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
// Calculate delay with exponential backoff: 10s, 15s, 20s, 25s, 30s (max)
|
||||
const baseDelay = 10000;
|
||||
const maxDelay = 30000;
|
||||
|
||||
function schedule() {
|
||||
const delay = Math.min(
|
||||
baseDelay + pollAttemptRef.current * 5000,
|
||||
maxDelay,
|
||||
);
|
||||
timeoutId = setTimeout(async () => {
|
||||
if (cancelled) return;
|
||||
console.info(
|
||||
`[useChatSession] Polling for pending operation updates (attempt ${pollAttemptRef.current + 1})`,
|
||||
);
|
||||
pollAttemptRef.current += 1;
|
||||
try {
|
||||
await refetch();
|
||||
} catch (err) {
|
||||
console.error("[useChatSession] Poll failed:", err);
|
||||
} finally {
|
||||
// Continue polling if still pending and not cancelled
|
||||
if (!cancelled && hasPendingOperationsRef.current) {
|
||||
schedule();
|
||||
}
|
||||
}
|
||||
}, delay);
|
||||
}
|
||||
|
||||
schedule();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (timeoutId) clearTimeout(timeoutId);
|
||||
};
|
||||
},
|
||||
[sessionId, hasPendingOperations, refetch],
|
||||
);
|
||||
|
||||
async function createSession() {
|
||||
try {
|
||||
setError(null);
|
||||
@@ -228,6 +323,7 @@ export function useChatSession({
|
||||
isCreating,
|
||||
error,
|
||||
isSessionNotFound: isNotFoundError(loadError),
|
||||
hasPendingOperations,
|
||||
createSession,
|
||||
loadSession,
|
||||
refreshSession,
|
||||
|
||||
Reference in New Issue
Block a user