From 171ff6e776a5f4b77e916b9103c8554538ea6453 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Tue, 27 Jan 2026 16:09:34 -0600 Subject: [PATCH] feat(backend): persist long-running tool results to survive SSE disconnects (#11856) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Agent generation (`create_agent`, `edit_agent`) can take 1-5 minutes. Previously, if the user closed their browser tab during this time: 1. The SSE connection would die 2. The tool execution would be cancelled via `CancelledError` 3. The result would be lost - even if the agent-generator service completed successfully This PR ensures long-running tool operations survive SSE disconnections. ### Changes 🏗️ **Backend:** - **base.py**: Added `is_long_running` property to `BaseTool` for tools to opt-in to background execution - **create_agent.py / edit_agent.py**: Set `is_long_running = True` - **models.py**: Added `OperationStartedResponse`, `OperationPendingResponse`, `OperationInProgressResponse` types - **service.py**: Modified `_yield_tool_call()` to: - Check if tool is `is_long_running` - Save "pending" message to chat history immediately - Spawn background task that runs independently of SSE - Return `operation_started` immediately (don't wait) - Update chat history with result when background task completes - Track running operations for idempotency (prevents duplicate ops on refresh) - **db.py**: Added `update_tool_message_content()` to update pending messages - **model.py**: Added `invalidate_session_cache()` to clear Redis after background completion **Frontend:** - **useChatMessage.ts**: Added operation message types - **helpers.ts**: Handle `operation_started`, `operation_pending`, `operation_in_progress` response types - **PendingOperationWidget**: New component to display operation status with spinner - **ChatMessage.tsx**: Render `PendingOperationWidget` for operation messages ### How It Works ``` User Request → Save "pending" message → Spawn background task → Return immediately ↓ Task runs independently of SSE ↓ On completion: Update message in chat history ↓ User refreshes → Loads history → Sees result ``` ### User Experience 1. User requests agent creation 2. Sees "Agent creation started. You can close this tab - check your library in a few minutes." 3. Can close browser tab safely 4. When they return, chat shows the completed result (or error) ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] pyright passes (0 errors) - [x] TypeScript checks pass - [x] Formatters applied ### Test Plan 1. Start agent creation in copilot 2. Close browser tab immediately after seeing "operation_started" 3. Wait 2-3 minutes 4. Reopen chat 5. 
Verify: Chat history shows completion message and agent appears in library --------- Co-authored-by: Ubbe --- .../backend/api/features/chat/config.py | 6 + .../backend/backend/api/features/chat/db.py | 42 ++ .../backend/api/features/chat/model.py | 15 + .../backend/api/features/chat/service.py | 437 +++++++++++++++++- .../api/features/chat/tools/__init__.py | 7 +- .../backend/api/features/chat/tools/base.py | 10 + .../api/features/chat/tools/create_agent.py | 4 + .../api/features/chat/tools/edit_agent.py | 4 + .../backend/api/features/chat/tools/models.py | 40 ++ .../backend/backend/util/settings.py | 4 +- .../Chat/components/ChatContainer/handlers.ts | 34 +- .../Chat/components/ChatContainer/helpers.ts | 37 ++ .../ChatContainer/useChatContainer.ts | 105 ++++- .../components/ChatMessage/ChatMessage.tsx | 40 ++ .../components/ChatMessage/useChatMessage.ts | 28 ++ .../ClarificationQuestionsWidget.tsx | 32 ++ .../PendingOperationWidget.tsx | 109 +++++ .../contextual/Chat/useChatSession.ts | 96 ++++ 18 files changed, 1023 insertions(+), 27 deletions(-) create mode 100644 autogpt_platform/frontend/src/components/contextual/Chat/components/PendingOperationWidget/PendingOperationWidget.tsx diff --git a/autogpt_platform/backend/backend/api/features/chat/config.py b/autogpt_platform/backend/backend/api/features/chat/config.py index 95aef7f2ed..d782ab7242 100644 --- a/autogpt_platform/backend/backend/api/features/chat/config.py +++ b/autogpt_platform/backend/backend/api/features/chat/config.py @@ -38,6 +38,12 @@ class ChatConfig(BaseSettings): default=3, description="Maximum number of agent schedules" ) + # Long-running operation configuration + long_running_operation_ttl: int = Field( + default=600, + description="TTL in seconds for long-running operation tracking in Redis (safety net if pod dies)", + ) + # Langfuse Prompt Management Configuration # Note: Langfuse credentials are in Settings().secrets (settings.py) langfuse_prompt_name: str = Field( diff --git a/autogpt_platform/backend/backend/api/features/chat/db.py b/autogpt_platform/backend/backend/api/features/chat/db.py index 05a3553cc8..d34b4e5b07 100644 --- a/autogpt_platform/backend/backend/api/features/chat/db.py +++ b/autogpt_platform/backend/backend/api/features/chat/db.py @@ -247,3 +247,45 @@ async def get_chat_session_message_count(session_id: str) -> int: """Get the number of messages in a chat session.""" count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id}) return count + + +async def update_tool_message_content( + session_id: str, + tool_call_id: str, + new_content: str, +) -> bool: + """Update the content of a tool message in chat history. + + Used by background tasks to update pending operation messages with final results. + + Args: + session_id: The chat session ID. + tool_call_id: The tool call ID to find the message. + new_content: The new content to set. + + Returns: + True if a message was updated, False otherwise. 
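+
+    Example (illustrative only - the session and tool-call IDs here are
+    hypothetical, and the payload just reuses this PR's ErrorResponse shape):
+        >>> ok = await update_tool_message_content(
+        ...     session_id="sess-123",
+        ...     tool_call_id="call-abc",
+        ...     new_content='{"type": "error", "message": "Tool create_agent failed"}',
+        ... )
+        >>> ok
+        True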
+ """ + try: + result = await PrismaChatMessage.prisma().update_many( + where={ + "sessionId": session_id, + "toolCallId": tool_call_id, + }, + data={ + "content": new_content, + }, + ) + if result == 0: + logger.warning( + f"No message found to update for session {session_id}, " + f"tool_call_id {tool_call_id}" + ) + return False + return True + except Exception as e: + logger.error( + f"Failed to update tool message for session {session_id}, " + f"tool_call_id {tool_call_id}: {e}" + ) + return False diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/api/features/chat/model.py index 75bda11127..7318ef88d7 100644 --- a/autogpt_platform/backend/backend/api/features/chat/model.py +++ b/autogpt_platform/backend/backend/api/features/chat/model.py @@ -295,6 +295,21 @@ async def cache_chat_session(session: ChatSession) -> None: await _cache_session(session) +async def invalidate_session_cache(session_id: str) -> None: + """Invalidate a chat session from Redis cache. + + Used by background tasks to ensure fresh data is loaded on next access. + This is best-effort - Redis failures are logged but don't fail the operation. + """ + try: + redis_key = _get_session_cache_key(session_id) + async_redis = await get_redis_async() + await async_redis.delete(redis_key) + except Exception as e: + # Best-effort: log but don't fail - cache will expire naturally + logger.warning(f"Failed to invalidate session cache for {session_id}: {e}") + + async def _get_session_from_db(session_id: str) -> ChatSession | None: """Get a chat session from the database.""" prisma_session = await chat_db.get_chat_session(session_id) diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py index 386b37784d..6d99aab583 100644 --- a/autogpt_platform/backend/backend/api/features/chat/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/service.py @@ -17,6 +17,7 @@ from openai import ( ) from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam +from backend.data.redis_client import get_redis_async from backend.data.understanding import ( format_understanding_for_prompt, get_business_understanding, @@ -24,6 +25,7 @@ from backend.data.understanding import ( from backend.util.exceptions import NotFoundError from backend.util.settings import Settings +from . import db as chat_db from .config import ChatConfig from .model import ( ChatMessage, @@ -31,6 +33,7 @@ from .model import ( Usage, cache_chat_session, get_chat_session, + invalidate_session_cache, update_session_title, upsert_chat_session, ) @@ -48,8 +51,13 @@ from .response_model import ( StreamToolOutputAvailable, StreamUsage, ) -from .tools import execute_tool, tools -from .tools.models import ErrorResponse +from .tools import execute_tool, get_tool, tools +from .tools.models import ( + ErrorResponse, + OperationInProgressResponse, + OperationPendingResponse, + OperationStartedResponse, +) from .tracking import track_user_message logger = logging.getLogger(__name__) @@ -61,6 +69,43 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url) langfuse = get_client() +# Redis key prefix for tracking running long-running operations +# Used for idempotency across Kubernetes pods - prevents duplicate executions on browser refresh +RUNNING_OPERATION_PREFIX = "chat:running_operation:" + +# Module-level set to hold strong references to background tasks. 
+# This prevents asyncio from garbage collecting tasks before they complete. +# Tasks are automatically removed on completion via done_callback. +_background_tasks: set[asyncio.Task] = set() + + +async def _mark_operation_started(tool_call_id: str) -> bool: + """Mark a long-running operation as started (Redis-based). + + Returns True if successfully marked (operation was not already running), + False if operation was already running (lost race condition). + Raises exception if Redis is unavailable (fail-closed). + """ + redis = await get_redis_async() + key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}" + # SETNX with TTL - atomic "set if not exists" + result = await redis.set(key, "1", ex=config.long_running_operation_ttl, nx=True) + return result is not None + + +async def _mark_operation_completed(tool_call_id: str) -> None: + """Mark a long-running operation as completed (remove Redis key). + + This is best-effort - if Redis fails, the TTL will eventually clean up. + """ + try: + redis = await get_redis_async() + key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}" + await redis.delete(key) + except Exception as e: + # Non-critical: TTL will clean up eventually + logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}") + class LangfuseNotConfiguredError(Exception): """Raised when Langfuse is required but not configured.""" @@ -315,6 +360,7 @@ async def stream_chat_completion( has_yielded_end = False has_yielded_error = False has_done_tool_call = False + has_long_running_tool_call = False # Track if we had a long-running tool call has_received_text = False text_streaming_ended = False tool_response_messages: list[ChatMessage] = [] @@ -336,7 +382,6 @@ async def stream_chat_completion( system_prompt=system_prompt, text_block_id=text_block_id, ): - if isinstance(chunk, StreamTextStart): # Emit text-start before first text delta if not has_received_text: @@ -394,13 +439,34 @@ async def stream_chat_completion( if isinstance(chunk.output, str) else orjson.dumps(chunk.output).decode("utf-8") ) - tool_response_messages.append( - ChatMessage( - role="tool", - content=result_content, - tool_call_id=chunk.toolCallId, + # Skip saving long-running operation responses - messages already saved in _yield_tool_call + # Use JSON parsing instead of substring matching to avoid false positives + is_long_running_response = False + try: + parsed = orjson.loads(result_content) + if isinstance(parsed, dict) and parsed.get("type") in ( + "operation_started", + "operation_in_progress", + ): + is_long_running_response = True + except (orjson.JSONDecodeError, TypeError): + pass # Not JSON or not a dict - treat as regular response + if is_long_running_response: + # Remove from accumulated_tool_calls since assistant message was already saved + accumulated_tool_calls[:] = [ + tc + for tc in accumulated_tool_calls + if tc["id"] != chunk.toolCallId + ] + has_long_running_tool_call = True + else: + tool_response_messages.append( + ChatMessage( + role="tool", + content=result_content, + tool_call_id=chunk.toolCallId, + ) ) - ) has_done_tool_call = True # Track if any tool execution failed if not chunk.success: @@ -576,7 +642,14 @@ async def stream_chat_completion( logger.info( f"Extended session messages, new message_count={len(session.messages)}" ) - if messages_to_save or has_appended_streaming_message: + # Save if there are regular (non-long-running) tool responses or streaming message. 
+ # Long-running tools save their own state, but we still need to save regular tools + # that may be in the same response. + has_regular_tool_responses = len(tool_response_messages) > 0 + if has_regular_tool_responses or ( + not has_long_running_tool_call + and (messages_to_save or has_appended_streaming_message) + ): await upsert_chat_session(session) else: logger.info( @@ -585,7 +658,9 @@ async def stream_chat_completion( ) # If we did a tool call, stream the chat completion again to get the next response - if has_done_tool_call: + # Skip only if ALL tools were long-running (they handle their own completion) + has_regular_tools = len(tool_response_messages) > 0 + if has_done_tool_call and (has_regular_tools or not has_long_running_tool_call): logger.info( "Tool call executed, streaming chat completion again to get assistant response" ) @@ -1260,14 +1335,17 @@ async def _yield_tool_call( """ Yield a tool call and its execution result. - For long-running tools, yields heartbeat events every 15 seconds to keep - the SSE connection alive through proxies and load balancers. + For tools marked with `is_long_running=True` (like agent generation), spawns a + background task so the operation survives SSE disconnections. For other tools, + yields heartbeat events every 15 seconds to keep the SSE connection alive. Raises: orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON KeyError: If expected tool call fields are missing TypeError: If tool call structure is invalid """ + import uuid as uuid_module + tool_name = tool_calls[yield_idx]["function"]["name"] tool_call_id = tool_calls[yield_idx]["id"] logger.info(f"Yielding tool call: {tool_calls[yield_idx]}") @@ -1285,7 +1363,151 @@ async def _yield_tool_call( input=arguments, ) - # Run tool execution in background task with heartbeats to keep connection alive + # Check if this tool is long-running (survives SSE disconnection) + tool = get_tool(tool_name) + if tool and tool.is_long_running: + # Atomic check-and-set: returns False if operation already running (lost race) + if not await _mark_operation_started(tool_call_id): + logger.info( + f"Tool call {tool_call_id} already in progress, returning status" + ) + # Build dynamic message based on tool name + if tool_name == "create_agent": + in_progress_msg = "Agent creation already in progress. Please wait..." + elif tool_name == "edit_agent": + in_progress_msg = "Agent edit already in progress. Please wait..." + else: + in_progress_msg = f"{tool_name} already in progress. Please wait..." + + yield StreamToolOutputAvailable( + toolCallId=tool_call_id, + toolName=tool_name, + output=OperationInProgressResponse( + message=in_progress_msg, + tool_call_id=tool_call_id, + ).model_dump_json(), + success=True, + ) + return + + # Generate operation ID + operation_id = str(uuid_module.uuid4()) + + # Build a user-friendly message based on tool and arguments + if tool_name == "create_agent": + agent_desc = arguments.get("description", "") + # Truncate long descriptions for the message + desc_preview = ( + (agent_desc[:100] + "...") if len(agent_desc) > 100 else agent_desc + ) + pending_msg = ( + f"Creating your agent: {desc_preview}" + if desc_preview + else "Creating agent... This may take a few minutes." + ) + started_msg = ( + "Agent creation started. You can close this tab - " + "check your library in a few minutes." 
+ ) + elif tool_name == "edit_agent": + changes = arguments.get("changes", "") + changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes + pending_msg = ( + f"Editing agent: {changes_preview}" + if changes_preview + else "Editing agent... This may take a few minutes." + ) + started_msg = ( + "Agent edit started. You can close this tab - " + "check your library in a few minutes." + ) + else: + pending_msg = f"Running {tool_name}... This may take a few minutes." + started_msg = ( + f"{tool_name} started. You can close this tab - " + "check back in a few minutes." + ) + + # Track appended messages for rollback on failure + assistant_message: ChatMessage | None = None + pending_message: ChatMessage | None = None + + # Wrap session save and task creation in try-except to release lock on failure + try: + # Save assistant message with tool_call FIRST (required by LLM) + assistant_message = ChatMessage( + role="assistant", + content="", + tool_calls=[tool_calls[yield_idx]], + ) + session.messages.append(assistant_message) + + # Then save pending tool result + pending_message = ChatMessage( + role="tool", + content=OperationPendingResponse( + message=pending_msg, + operation_id=operation_id, + tool_name=tool_name, + ).model_dump_json(), + tool_call_id=tool_call_id, + ) + session.messages.append(pending_message) + await upsert_chat_session(session) + logger.info( + f"Saved pending operation {operation_id} for tool {tool_name} " + f"in session {session.session_id}" + ) + + # Store task reference in module-level set to prevent GC before completion + task = asyncio.create_task( + _execute_long_running_tool( + tool_name=tool_name, + parameters=arguments, + tool_call_id=tool_call_id, + operation_id=operation_id, + session_id=session.session_id, + user_id=session.user_id, + ) + ) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) + except Exception as e: + # Roll back appended messages to prevent data corruption on subsequent saves + if ( + pending_message + and session.messages + and session.messages[-1] == pending_message + ): + session.messages.pop() + if ( + assistant_message + and session.messages + and session.messages[-1] == assistant_message + ): + session.messages.pop() + + # Release the Redis lock since the background task won't be spawned + await _mark_operation_completed(tool_call_id) + logger.error( + f"Failed to setup long-running tool {tool_name}: {e}", exc_info=True + ) + raise + + # Return immediately - don't wait for completion + yield StreamToolOutputAvailable( + toolCallId=tool_call_id, + toolName=tool_name, + output=OperationStartedResponse( + message=started_msg, + operation_id=operation_id, + tool_name=tool_name, + ).model_dump_json(), + success=True, + ) + return + + # Normal flow: Run tool execution in background task with heartbeats tool_task = asyncio.create_task( execute_tool( tool_name=tool_name, @@ -1335,3 +1557,190 @@ async def _yield_tool_call( ) yield tool_execution_response + + +async def _execute_long_running_tool( + tool_name: str, + parameters: dict[str, Any], + tool_call_id: str, + operation_id: str, + session_id: str, + user_id: str | None, +) -> None: + """Execute a long-running tool in background and update chat history with result. + + This function runs independently of the SSE connection, so the operation + survives if the user closes their browser tab. 
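+
+    Lifecycle sketch (as implemented below): load a fresh session, run
+    execute_tool(), overwrite the pending message with the result via
+    _update_pending_operation(), then trigger _generate_llm_continuation().
+    The Redis "running" marker is always cleared in the finally block.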
+ """ + try: + # Load fresh session (not stale reference) + session = await get_chat_session(session_id, user_id) + if not session: + logger.error(f"Session {session_id} not found for background tool") + return + + # Execute the actual tool + result = await execute_tool( + tool_name=tool_name, + parameters=parameters, + tool_call_id=tool_call_id, + user_id=user_id, + session=session, + ) + + # Update the pending message with result + await _update_pending_operation( + session_id=session_id, + tool_call_id=tool_call_id, + result=( + result.output + if isinstance(result.output, str) + else orjson.dumps(result.output).decode("utf-8") + ), + ) + + logger.info(f"Background tool {tool_name} completed for session {session_id}") + + # Generate LLM continuation so user sees response when they poll/refresh + await _generate_llm_continuation(session_id=session_id, user_id=user_id) + + except Exception as e: + logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True) + error_response = ErrorResponse( + message=f"Tool {tool_name} failed: {str(e)}", + ) + await _update_pending_operation( + session_id=session_id, + tool_call_id=tool_call_id, + result=error_response.model_dump_json(), + ) + finally: + await _mark_operation_completed(tool_call_id) + + +async def _update_pending_operation( + session_id: str, + tool_call_id: str, + result: str, +) -> None: + """Update the pending tool message with final result. + + This is called by background tasks when long-running operations complete. + """ + # Update the message in database + updated = await chat_db.update_tool_message_content( + session_id=session_id, + tool_call_id=tool_call_id, + new_content=result, + ) + + if updated: + # Invalidate Redis cache so next load gets fresh data + # Wrap in try/except to prevent cache failures from triggering error handling + # that would overwrite our successful DB update + try: + await invalidate_session_cache(session_id) + except Exception as e: + # Non-critical: cache will eventually be refreshed on next load + logger.warning(f"Failed to invalidate cache for session {session_id}: {e}") + logger.info( + f"Updated pending operation for tool_call_id {tool_call_id} " + f"in session {session_id}" + ) + else: + logger.warning( + f"Failed to update pending operation for tool_call_id {tool_call_id} " + f"in session {session_id}" + ) + + +async def _generate_llm_continuation( + session_id: str, + user_id: str | None, +) -> None: + """Generate an LLM response after a long-running tool completes. + + This is called by background tasks to continue the conversation + after a tool result is saved. The response is saved to the database + so users see it when they refresh or poll. 
+ """ + try: + # Load fresh session from DB (bypass cache to get the updated tool result) + await invalidate_session_cache(session_id) + session = await get_chat_session(session_id, user_id) + if not session: + logger.error(f"Session {session_id} not found for LLM continuation") + return + + # Build system prompt + system_prompt, _ = await _build_system_prompt(user_id) + + # Build messages in OpenAI format + messages = session.to_openai_messages() + if system_prompt: + from openai.types.chat import ChatCompletionSystemMessageParam + + system_message = ChatCompletionSystemMessageParam( + role="system", + content=system_prompt, + ) + messages = [system_message] + messages + + # Build extra_body for tracing + extra_body: dict[str, Any] = { + "posthogProperties": { + "environment": settings.config.app_env.value, + }, + } + if user_id: + extra_body["user"] = user_id[:128] + extra_body["posthogDistinctId"] = user_id + if session_id: + extra_body["session_id"] = session_id[:128] + + # Make non-streaming LLM call (no tools - just text response) + from typing import cast + + from openai.types.chat import ChatCompletionMessageParam + + # No tools parameter = text-only response (no tool calls) + response = await client.chat.completions.create( + model=config.model, + messages=cast(list[ChatCompletionMessageParam], messages), + extra_body=extra_body, + ) + + if response.choices and response.choices[0].message.content: + assistant_content = response.choices[0].message.content + + # Reload session from DB to avoid race condition with user messages + # that may have been sent while we were generating the LLM response + fresh_session = await get_chat_session(session_id, user_id) + if not fresh_session: + logger.error( + f"Session {session_id} disappeared during LLM continuation" + ) + return + + # Save assistant message to database + assistant_message = ChatMessage( + role="assistant", + content=assistant_content, + ) + fresh_session.messages.append(assistant_message) + + # Save to database (not cache) to persist the response + await upsert_chat_session(fresh_session) + + # Invalidate cache so next poll/refresh gets fresh data + await invalidate_session_cache(session_id) + + logger.info( + f"Generated LLM continuation for session {session_id}, " + f"response length: {len(assistant_content)}" + ) + else: + logger.warning(f"LLM continuation returned empty response for {session_id}") + + except Exception as e: + logger.error(f"Failed to generate LLM continuation: {e}", exc_info=True) diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py index 284273f3b9..beeb128ae9 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py @@ -49,6 +49,11 @@ tools: list[ChatCompletionToolParam] = [ ] +def get_tool(tool_name: str) -> BaseTool | None: + """Get a tool instance by name.""" + return TOOL_REGISTRY.get(tool_name) + + async def execute_tool( tool_name: str, parameters: dict[str, Any], @@ -57,7 +62,7 @@ async def execute_tool( tool_call_id: str, ) -> "StreamToolOutputAvailable": """Execute a tool by name.""" - tool = TOOL_REGISTRY.get(tool_name) + tool = get_tool(tool_name) if not tool: raise ValueError(f"Tool {tool_name} not found") diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/base.py b/autogpt_platform/backend/backend/api/features/chat/tools/base.py index 1dc40c18c7..809e06632b 100644 --- 
a/autogpt_platform/backend/backend/api/features/chat/tools/base.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/base.py @@ -36,6 +36,16 @@ class BaseTool: """Whether this tool requires authentication.""" return False + @property + def is_long_running(self) -> bool: + """Whether this tool is long-running and should execute in background. + + Long-running tools (like agent generation) are executed via background + tasks to survive SSE disconnections. The result is persisted to chat + history and visible when the user refreshes. + """ + return False + def as_openai_tool(self) -> ChatCompletionToolParam: """Convert to OpenAI tool format.""" return ChatCompletionToolParam( diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py index 87ca5ebca7..6b3784e323 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py @@ -42,6 +42,10 @@ class CreateAgentTool(BaseTool): def requires_auth(self) -> bool: return True + @property + def is_long_running(self) -> bool: + return True + @property def parameters(self) -> dict[str, Any]: return { diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py index d65b050f06..7c4da8ad43 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py @@ -42,6 +42,10 @@ class EditAgentTool(BaseTool): def requires_auth(self) -> bool: return True + @property + def is_long_running(self) -> bool: + return True + @property def parameters(self) -> dict[str, Any]: return { diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/models.py b/autogpt_platform/backend/backend/api/features/chat/tools/models.py index 1736ddb9a8..8552681d03 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/models.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/models.py @@ -28,6 +28,10 @@ class ResponseType(str, Enum): BLOCK_OUTPUT = "block_output" DOC_SEARCH_RESULTS = "doc_search_results" DOC_PAGE = "doc_page" + # Long-running operation types + OPERATION_STARTED = "operation_started" + OPERATION_PENDING = "operation_pending" + OPERATION_IN_PROGRESS = "operation_in_progress" # Base response model @@ -334,3 +338,39 @@ class BlockOutputResponse(ToolResponseBase): block_name: str outputs: dict[str, list[Any]] success: bool = True + + +# Long-running operation models +class OperationStartedResponse(ToolResponseBase): + """Response when a long-running operation has been started in the background. + + This is returned immediately to the client while the operation continues + to execute. The user can close the tab and check back later. + """ + + type: ResponseType = ResponseType.OPERATION_STARTED + operation_id: str + tool_name: str + + +class OperationPendingResponse(ToolResponseBase): + """Response stored in chat history while a long-running operation is executing. + + This is persisted to the database so users see a pending state when they + refresh before the operation completes. + """ + + type: ResponseType = ResponseType.OPERATION_PENDING + operation_id: str + tool_name: str + + +class OperationInProgressResponse(ToolResponseBase): + """Response when an operation is already in progress. 
+ + Returned for idempotency when the same tool_call_id is requested again + while the background task is still running. + """ + + type: ResponseType = ResponseType.OPERATION_IN_PROGRESS + tool_call_id: str diff --git a/autogpt_platform/backend/backend/util/settings.py b/autogpt_platform/backend/backend/util/settings.py index 8d34292803..a42a4d29b4 100644 --- a/autogpt_platform/backend/backend/util/settings.py +++ b/autogpt_platform/backend/backend/util/settings.py @@ -359,8 +359,8 @@ class Config(UpdateTrackingModel["Config"], BaseSettings): description="The port for the Agent Generator service", ) agentgenerator_timeout: int = Field( - default=120, - description="The timeout in seconds for Agent Generator service requests", + default=600, + description="The timeout in seconds for Agent Generator service requests (includes retries for rate limits)", ) enable_example_blocks: bool = Field( diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts index 96198a0386..f406d33db4 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts @@ -48,6 +48,15 @@ export function handleTextEnded( const completedText = deps.streamingChunksRef.current.join(""); if (completedText.trim()) { deps.setMessages((prev) => { + // Check if this exact message already exists to prevent duplicates + const exists = prev.some( + (msg) => + msg.type === "message" && + msg.role === "assistant" && + msg.content === completedText, + ); + if (exists) return prev; + const assistantMessage: ChatMessageData = { type: "message", role: "assistant", @@ -203,13 +212,24 @@ export function handleStreamEnd( ]); } if (completedContent.trim()) { - const assistantMessage: ChatMessageData = { - type: "message", - role: "assistant", - content: completedContent, - timestamp: new Date(), - }; - deps.setMessages((prev) => [...prev, assistantMessage]); + deps.setMessages((prev) => { + // Check if this exact message already exists to prevent duplicates + const exists = prev.some( + (msg) => + msg.type === "message" && + msg.role === "assistant" && + msg.content === completedContent, + ); + if (exists) return prev; + + const assistantMessage: ChatMessageData = { + type: "message", + role: "assistant", + content: completedContent, + timestamp: new Date(), + }; + return [...prev, assistantMessage]; + }); } deps.setStreamingChunks([]); deps.streamingChunksRef.current = []; diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts index 7dee924634..e744c9bc34 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/helpers.ts @@ -304,6 +304,7 @@ export function parseToolResponse( if (isAgentArray(agentsData)) { return { type: "agent_carousel", + toolId, toolName: "agent_carousel", agents: agentsData, totalCount: parsedResult.total_count as number | undefined, @@ -316,6 +317,7 @@ export function parseToolResponse( if (responseType === "execution_started") { return { type: "execution_started", + toolId, toolName: "execution_started", executionId: (parsedResult.execution_id as string) || 
"", agentName: (parsedResult.graph_name as string) || undefined, @@ -341,6 +343,41 @@ export function parseToolResponse( timestamp: timestamp || new Date(), }; } + if (responseType === "operation_started") { + return { + type: "operation_started", + toolName: (parsedResult.tool_name as string) || toolName, + toolId, + operationId: (parsedResult.operation_id as string) || "", + message: + (parsedResult.message as string) || + "Operation started. You can close this tab.", + timestamp: timestamp || new Date(), + }; + } + if (responseType === "operation_pending") { + return { + type: "operation_pending", + toolName: (parsedResult.tool_name as string) || toolName, + toolId, + operationId: (parsedResult.operation_id as string) || "", + message: + (parsedResult.message as string) || + "Operation in progress. Please wait...", + timestamp: timestamp || new Date(), + }; + } + if (responseType === "operation_in_progress") { + return { + type: "operation_in_progress", + toolName: (parsedResult.tool_name as string) || toolName, + toolCallId: (parsedResult.tool_call_id as string) || toolId, + message: + (parsedResult.message as string) || + "Operation already in progress. Please wait...", + timestamp: timestamp || new Date(), + }; + } if (responseType === "need_login") { return { type: "login_needed", diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts index b7f9d305dd..83730cc308 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts @@ -82,11 +82,110 @@ export function useChatContainer({ [sessionId, stopStreaming, activeStreams, subscribeToStream], ); - const allMessages = useMemo( - () => [...processInitialMessages(initialMessages), ...messages], - [initialMessages, messages], + // Collect toolIds from completed tool results in initialMessages + // Used to filter out operation messages when their results arrive + const completedToolIds = useMemo(() => { + const processedInitial = processInitialMessages(initialMessages); + const ids = new Set(); + for (const msg of processedInitial) { + if ( + msg.type === "tool_response" || + msg.type === "agent_carousel" || + msg.type === "execution_started" + ) { + const toolId = (msg as any).toolId; + if (toolId) { + ids.add(toolId); + } + } + } + return ids; + }, [initialMessages]); + + // Clean up local operation messages when their completed results arrive from polling + // This effect runs when completedToolIds changes (i.e., when polling brings new results) + useEffect( + function cleanupCompletedOperations() { + if (completedToolIds.size === 0) return; + + setMessages((prev) => { + const filtered = prev.filter((msg) => { + if ( + msg.type === "operation_started" || + msg.type === "operation_pending" || + msg.type === "operation_in_progress" + ) { + const toolId = (msg as any).toolId || (msg as any).toolCallId; + if (toolId && completedToolIds.has(toolId)) { + return false; // Remove - operation completed + } + } + return true; + }); + // Only update state if something was actually filtered + return filtered.length === prev.length ? 
prev : filtered; + }); + }, + [completedToolIds], ); + // Combine initial messages from backend with local streaming messages, + // then deduplicate to prevent duplicates when polling refreshes initialMessages + const allMessages = useMemo(() => { + const processedInitial = processInitialMessages(initialMessages); + + // Filter local messages to remove operation messages for completed tools + const filteredLocalMessages = messages.filter((msg) => { + if ( + msg.type === "operation_started" || + msg.type === "operation_pending" || + msg.type === "operation_in_progress" + ) { + const toolId = (msg as any).toolId || (msg as any).toolCallId; + if (toolId && completedToolIds.has(toolId)) { + return false; // Filter out - operation completed + } + } + return true; + }); + + const combined = [...processedInitial, ...filteredLocalMessages]; + + // Deduplicate by content+role+timestamp. When initialMessages is refreshed via polling, + // it may contain messages that are also in the local `messages` state. + // Including timestamp prevents dropping legitimate repeated messages (e.g., user sends "yes" twice) + const seen = new Set(); + return combined.filter((msg) => { + // Create a key based on type, role, content, and timestamp for deduplication + let key: string; + if (msg.type === "message") { + // Use timestamp (rounded to nearest second) to allow slight variations + // while still catching true duplicates from SSE/polling overlap + const ts = msg.timestamp + ? Math.floor(new Date(msg.timestamp).getTime() / 1000) + : ""; + key = `msg:${msg.role}:${ts}:${msg.content}`; + } else if (msg.type === "tool_call") { + key = `toolcall:${msg.toolId}`; + } else if ( + msg.type === "operation_started" || + msg.type === "operation_pending" || + msg.type === "operation_in_progress" + ) { + // Dedupe operation messages by toolId or operationId + key = `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`; + } else { + // For other types, use a combination of type and first few fields + key = `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`; + } + if (seen.has(key)) { + return false; + } + seen.add(key); + return true; + }); + }, [initialMessages, messages]); + async function sendMessage( content: string, isUserMessage: boolean = true, diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx index 29e3a60a8c..c922d0da76 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/ChatMessage.tsx @@ -16,6 +16,7 @@ import { AuthPromptWidget } from "../AuthPromptWidget/AuthPromptWidget"; import { ChatCredentialsSetup } from "../ChatCredentialsSetup/ChatCredentialsSetup"; import { ClarificationQuestionsWidget } from "../ClarificationQuestionsWidget/ClarificationQuestionsWidget"; import { ExecutionStartedMessage } from "../ExecutionStartedMessage/ExecutionStartedMessage"; +import { PendingOperationWidget } from "../PendingOperationWidget/PendingOperationWidget"; import { MarkdownContent } from "../MarkdownContent/MarkdownContent"; import { NoResultsMessage } from "../NoResultsMessage/NoResultsMessage"; import { ToolCallMessage } from "../ToolCallMessage/ToolCallMessage"; @@ -71,6 +72,9 @@ export function ChatMessage({ isLoginNeeded, isCredentialsNeeded, isClarificationNeeded, + 
isOperationStarted,
+    isOperationPending,
+    isOperationInProgress,
   } = useChatMessage(message);
 
   const displayContent = getDisplayContent(message, isUser);
@@ -290,6 +294,42 @@ export function ChatMessage({
     );
   }
 
+  // Render operation_started messages (long-running background operations)
+  if (isOperationStarted && message.type === "operation_started") {
+    return (
+      <PendingOperationWidget
+        status="started"
+        message={message.message}
+        toolName={message.toolName}
+      />
+    );
+  }
+
+  // Render operation_pending messages (operations in progress when refreshing)
+  if (isOperationPending && message.type === "operation_pending") {
+    return (
+      <PendingOperationWidget
+        status="pending"
+        message={message.message}
+        toolName={message.toolName}
+      />
+    );
+  }
+
+  // Render operation_in_progress messages (duplicate request while operation running)
+  if (isOperationInProgress && message.type === "operation_in_progress") {
+    return (
+      <PendingOperationWidget
+        status="in_progress"
+        message={message.message}
+        toolName={message.toolName}
+      />
+    );
+  }
+
   // Render tool response messages (but skip agent_output if it's being rendered inside assistant message)
   if (isToolResponse && message.type === "tool_response") {
     return (
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts
index 142b140c8b..d6526c78ab 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatMessage/useChatMessage.ts
@@ -61,6 +61,7 @@ export type ChatMessageData =
   }
   | {
       type: "agent_carousel";
+      toolId: string;
       toolName: string;
       agents: Array<{
         id: string;
@@ -74,6 +75,7 @@ export type ChatMessageData =
   }
   | {
       type: "execution_started";
+      toolId: string;
       toolName: string;
       executionId: string;
       agentName?: string;
@@ -103,6 +105,29 @@ export type ChatMessageData =
       message: string;
       sessionId: string;
       timestamp?: string | Date;
+    }
+  | {
+      type: "operation_started";
+      toolName: string;
+      toolId: string;
+      operationId: string;
+      message: string;
+      timestamp?: string | Date;
+    }
+  | {
+      type: "operation_pending";
+      toolName: string;
+      toolId: string;
+      operationId: string;
+      message: string;
+      timestamp?: string | Date;
+    }
+  | {
+      type: "operation_in_progress";
+      toolName: string;
+      toolCallId: string;
+      message: string;
+      timestamp?: string | Date;
     };
 
 export function useChatMessage(message: ChatMessageData) {
@@ -124,5 +149,8 @@
     isExecutionStarted: message.type === "execution_started",
     isInputsNeeded: message.type === "inputs_needed",
     isClarificationNeeded: message.type === "clarification_needed",
+    isOperationStarted: message.type === "operation_started",
+    isOperationPending: message.type === "operation_pending",
+    isOperationInProgress: message.type === "operation_in_progress",
   };
 }
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ClarificationQuestionsWidget/ClarificationQuestionsWidget.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/components/ClarificationQuestionsWidget/ClarificationQuestionsWidget.tsx
index b2d3608254..a3bd17dd3f 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ClarificationQuestionsWidget/ClarificationQuestionsWidget.tsx
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ClarificationQuestionsWidget/ClarificationQuestionsWidget.tsx
@@ -30,6 +30,7 @@ export function ClarificationQuestionsWidget({
   className,
 }: Props) {
   const [answers, setAnswers] = useState<Record<string, string>>({});
+  const [isSubmitted, setIsSubmitted] = useState(false);
 
   function handleAnswerChange(keyword: string, value: string) {
     setAnswers((prev) => ({ ...prev, [keyword]: value }));
@@ -41,11 +42,42 @@ export function ClarificationQuestionsWidget({
     if (!allAnswered) {
       return;
     }
+    setIsSubmitted(true);
     onSubmitAnswers(answers);
   }
 
   const allAnswered = questions.every((q) => answers[q.keyword]?.trim());
 
+  // Show submitted state after answers are submitted
+  if (isSubmitted) {
+    return (
+      <div className={className}>
+        <span aria-label="submitted">✓</span>
+        <div>
+          <span>Answers submitted</span>
+          <span>Processing your responses...</span>
+        </div>
+      </div>
+    );
+  }
+
   return (
"Creating Agent", "edit_agent" -> "Editing Agent" + if (toolName === "create_agent") return "Creating Agent"; + if (toolName === "edit_agent") return "Editing Agent"; + // Default: capitalize and format tool name + return toolName + .split("_") + .map((word) => word.charAt(0).toUpperCase() + word.slice(1)) + .join(" "); +} + +export function PendingOperationWidget({ + status, + message, + toolName, + className, +}: Props) { + const isPending = + status === "pending" || status === "started" || status === "in_progress"; + const isCompleted = status === "completed"; + const isError = status === "error"; + + const operationTitle = getOperationTitle(toolName); + + return ( +
+
+
+
+ {isPending && ( + + )} + {isCompleted && ( + + )} + {isError && ( + + )} +
+
+ +
+ +
+ + {isPending && operationTitle} + {isCompleted && `${operationTitle} Complete`} + {isError && `${operationTitle} Failed`} + + + {message} + +
+ + {isPending && ( + + Check your library in a few minutes. + + )} + + {toolName && ( + + Tool: {toolName} + + )} +
+
+
+
+  );
+}
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts
index e02a15605b..3fe4f801c6 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts
@@ -1,6 +1,7 @@
 import {
   getGetV2GetSessionQueryKey,
   getGetV2GetSessionQueryOptions,
+  getGetV2ListSessionsQueryKey,
   postV2CreateSession,
   useGetV2GetSession,
   usePatchV2SessionAssignUser,
@@ -102,6 +103,100 @@ export function useChatSession({
     }
   }, [createError, loadError]);
 
+  // Check if there are any pending operations in the messages
+  // Must check all operation types: operation_pending, operation_started, operation_in_progress
+  const hasPendingOperations = useMemo(() => {
+    if (!messages || messages.length === 0) return false;
+    const pendingTypes = new Set([
+      "operation_pending",
+      "operation_in_progress",
+      "operation_started",
+    ]);
+    return messages.some((msg) => {
+      if (msg.role !== "tool" || !msg.content) return false;
+      try {
+        const content =
+          typeof msg.content === "string"
+            ? JSON.parse(msg.content)
+            : msg.content;
+        return pendingTypes.has(content?.type);
+      } catch {
+        return false;
+      }
+    });
+  }, [messages]);
+
+  // Refresh sessions list when a pending operation completes
+  // (hasPendingOperations transitions from true to false)
+  const prevHasPendingOperationsRef = useRef(hasPendingOperations);
+  useEffect(
+    function refreshSessionsListOnOperationComplete() {
+      const wasHasPending = prevHasPendingOperationsRef.current;
+      prevHasPendingOperationsRef.current = hasPendingOperations;
+
+      // Only invalidate when transitioning from pending to not pending
+      if (wasHasPending && !hasPendingOperations && sessionId) {
+        queryClient.invalidateQueries({
+          queryKey: getGetV2ListSessionsQueryKey(),
+        });
+      }
+    },
+    [hasPendingOperations, sessionId, queryClient],
+  );
+
+  // Poll for updates when there are pending operations (long poll - 10s intervals with backoff)
+  const pollAttemptRef = useRef(0);
+  const hasPendingOperationsRef = useRef(hasPendingOperations);
+  hasPendingOperationsRef.current = hasPendingOperations;
+
+  useEffect(
+    function pollForPendingOperations() {
+      if (!sessionId || !hasPendingOperations) {
+        pollAttemptRef.current = 0;
+        return;
+      }
+
+      let cancelled = false;
+      let timeoutId: ReturnType<typeof setTimeout> | null = null;
+
+      // Calculate delay with linear backoff: 10s, 15s, 20s, 25s, 30s (max)
+      const baseDelay = 10000;
+      const maxDelay = 30000;
+
+      function schedule() {
+        const delay = Math.min(
+          baseDelay + pollAttemptRef.current * 5000,
+          maxDelay,
+        );
+        timeoutId = setTimeout(async () => {
+          if (cancelled) return;
+          console.info(
+            `[useChatSession] Polling for pending operation updates (attempt ${pollAttemptRef.current + 1})`,
+          );
+          pollAttemptRef.current += 1;
+          try {
+            await refetch();
+          } catch (err) {
+            console.error("[useChatSession] Poll failed:", err);
+          } finally {
+            // Continue polling if still pending and not cancelled
+            if (!cancelled && hasPendingOperationsRef.current) {
+              schedule();
+            }
+          }
+        }, delay);
+      }
+
+      schedule();
+
+      return () => {
+        cancelled = true;
+        if (timeoutId) clearTimeout(timeoutId);
+      };
+    },
+    [sessionId, hasPendingOperations, refetch],
+  );
+
   async function createSession() {
     try {
       setError(null);
@@ -228,6 +323,7 @@
     isCreating,
     error,
     isSessionNotFound: isNotFoundError(loadError),
+    hasPendingOperations,
     createSession,
loadSession, refreshSession,