From 2f87e13d175212852f552b445963ebb4aec5c54c Mon Sep 17 00:00:00 2001 From: Swifty Date: Wed, 5 Nov 2025 14:49:01 +0100 Subject: [PATCH] feat(platform): Chat system backend (#11230) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements foundational backend infrastructure for chat-based agent interaction system. Users will be able to discover, configure, and run marketplace agents through conversational AI. **Note:** Chat routes are behind a feature flag ### Changes 🏗️ **Core Chat System:** - Chat service with LLM orchestration (Claude 3.5 Sonnet, Haiku, GPT-4) - REST API routes for sessions and messages - Database layer for chat persistence - System prompts and configuration **5 Conversational Tools:** 1. `find_agent` - Search marketplace by keywords 2. `get_agent_details` - Fetch agent info, inputs, credentials 3. `get_required_setup_info` - Check user readiness, missing credentials 4. `run_agent` - Execute agents immediately 5. `setup_agent` - Configure scheduled execution with cron **Testing:** - 28 tests across chat tools (23 passing, 5 skipped for scheduler) - Test fixtures for simple, LLM, and Firecrawl agents - Service and data layer tests **Bug Fixes:** - Fixed `setup_agent.py` to create schedules instead of immediate execution - Fixed graph lookup to use UUID instead of username/slug - Fixed credential matching by provider/type instead of ID - Fixed internal tool calls to use `._execute()` instead of `.execute()` ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] All 28 chat tool tests pass (23 pass, 5 skip - require scheduler) - [x] Code formatting and linting pass - [x] Tool execution flow validated through unit tests - [x] Agent discovery, details, and execution tested - [x] Credential parsing and matching tested #### For configuration changes: - [x] `.env.default` is updated or already compatible with my changes - [x] `docker-compose.yml` is updated or already compatible with my changes - [x] I have included a list of my configuration changes in the PR description (under **Changes**) No configuration changes required - all existing settings compatible. --- .../autogpt_libs/auth/__init__.py | 8 +- .../autogpt_libs/auth/dependencies.py | 37 +- .../backend/backend/server/rest_api.py | 16 +- .../backend/backend/server/v2/chat/config.py | 114 +++++ .../backend/backend/server/v2/chat/model.py | 201 ++++++++ .../backend/server/v2/chat/model_test.py | 70 +++ .../server/v2/chat/prompts/chat_system.md | 76 +++ .../backend/server/v2/chat/response_model.py | 100 ++++ .../backend/backend/server/v2/chat/routes.py | 213 ++++++++ .../backend/backend/server/v2/chat/service.py | 472 ++++++++++++++++++ .../backend/server/v2/chat/service_test.py | 81 +++ .../backend/server/v2/chat/tools/__init__.py | 51 ++ .../server/v2/chat/tools/_test_data.py | 449 +++++++++++++++++ .../backend/server/v2/chat/tools/base.py | 118 +++++ .../server/v2/chat/tools/find_agent.py | 149 ++++++ .../server/v2/chat/tools/get_agent_details.py | 220 ++++++++ .../v2/chat/tools/get_agent_details_test.py | 309 ++++++++++++ .../v2/chat/tools/get_required_setup_info.py | 183 +++++++ .../tools/get_required_setup_info_test.py | 394 +++++++++++++++ .../backend/server/v2/chat/tools/models.py | 279 +++++++++++ .../backend/server/v2/chat/tools/run_agent.py | 241 +++++++++ .../server/v2/chat/tools/run_agent_test.py | 151 ++++++ .../server/v2/chat/tools/setup_agent.py | 376 ++++++++++++++ .../server/v2/chat/tools/setup_agent_test.py | 394 +++++++++++++++ .../backend/backend/util/exceptions.py | 6 + .../backend/backend/util/feature_flag.py | 70 ++- .../frontend/src/app/api/openapi.json | 212 +++++++- 27 files changed, 4985 insertions(+), 5 deletions(-) create mode 100644 autogpt_platform/backend/backend/server/v2/chat/config.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/model.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/model_test.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/prompts/chat_system.md create mode 100644 autogpt_platform/backend/backend/server/v2/chat/response_model.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/routes.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/service.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/service_test.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/__init__.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/_test_data.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/base.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/find_agent.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details_test.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info_test.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/models.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/run_agent.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/run_agent_test.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent.py create mode 100644 autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent_test.py diff --git a/autogpt_platform/autogpt_libs/autogpt_libs/auth/__init__.py b/autogpt_platform/autogpt_libs/autogpt_libs/auth/__init__.py index 5202ebc769..edf0f4c29d 100644 --- a/autogpt_platform/autogpt_libs/autogpt_libs/auth/__init__.py +++ b/autogpt_platform/autogpt_libs/autogpt_libs/auth/__init__.py @@ -1,5 +1,10 @@ from .config import verify_settings -from .dependencies import get_user_id, requires_admin_user, requires_user +from .dependencies import ( + get_optional_user_id, + get_user_id, + requires_admin_user, + requires_user, +) from .helpers import add_auth_responses_to_openapi from .models import User @@ -8,6 +13,7 @@ __all__ = [ "get_user_id", "requires_admin_user", "requires_user", + "get_optional_user_id", "add_auth_responses_to_openapi", "User", ] diff --git a/autogpt_platform/autogpt_libs/autogpt_libs/auth/dependencies.py b/autogpt_platform/autogpt_libs/autogpt_libs/auth/dependencies.py index ff280713ae..3fcecb3544 100644 --- a/autogpt_platform/autogpt_libs/autogpt_libs/auth/dependencies.py +++ b/autogpt_platform/autogpt_libs/autogpt_libs/auth/dependencies.py @@ -7,15 +7,50 @@ These are the high-level dependency functions used in route definitions. import logging import fastapi +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from .jwt_utils import get_jwt_payload, verify_user from .models import User -logger = logging.getLogger(__name__) +optional_bearer = HTTPBearer(auto_error=False) # Header name for admin impersonation IMPERSONATION_HEADER_NAME = "X-Act-As-User-Id" +logger = logging.getLogger(__name__) + + +def get_optional_user_id( + credentials: HTTPAuthorizationCredentials | None = fastapi.Security( + optional_bearer + ), +) -> str | None: + """ + Attempts to extract the user ID ("sub" claim) from a Bearer JWT if provided. + + This dependency allows for both authenticated and anonymous access. If a valid bearer token is + supplied, it parses the JWT and extracts the user ID. If the token is missing or invalid, it returns None, + treating the request as anonymous. + + Args: + credentials: Optional HTTPAuthorizationCredentials object from FastAPI Security dependency. + + Returns: + The user ID (str) extracted from the JWT "sub" claim, or None if no valid token is present. + """ + if not credentials: + return None + + try: + # Parse JWT token to get user ID + from autogpt_libs.auth.jwt_utils import parse_jwt_token + + payload = parse_jwt_token(credentials.credentials) + return payload.get("sub") + except Exception as e: + logger.debug(f"Auth token validation failed (anonymous access): {e}") + return None + async def requires_user(jwt_payload: dict = fastapi.Security(get_jwt_payload)) -> User: """ diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py index 33f64918d9..8162b0ae3d 100644 --- a/autogpt_platform/backend/backend/server/rest_api.py +++ b/autogpt_platform/backend/backend/server/rest_api.py @@ -27,6 +27,7 @@ import backend.server.v2.admin.credit_admin_routes import backend.server.v2.admin.store_admin_routes import backend.server.v2.builder import backend.server.v2.builder.routes +import backend.server.v2.chat.routes as chat_routes import backend.server.v2.library.db import backend.server.v2.library.model import backend.server.v2.library.routes @@ -49,7 +50,12 @@ from backend.util.exceptions import ( NotAuthorizedError, NotFoundError, ) -from backend.util.feature_flag import initialize_launchdarkly, shutdown_launchdarkly +from backend.util.feature_flag import ( + Flag, + create_feature_flag_dependency, + initialize_launchdarkly, + shutdown_launchdarkly, +) from backend.util.service import UnhealthyServiceError settings = backend.util.settings.Settings() @@ -284,6 +290,14 @@ app.include_router( tags=["v1", "email"], prefix="/api/email", ) +app.include_router( + chat_routes.router, + tags=["v2", "chat"], + prefix="/api/chat", + dependencies=[ + fastapi.Depends(create_feature_flag_dependency(Flag.CHAT, default=False)) + ], +) app.mount("/external-api", external_app) diff --git a/autogpt_platform/backend/backend/server/v2/chat/config.py b/autogpt_platform/backend/backend/server/v2/chat/config.py new file mode 100644 index 0000000000..6e864d68b1 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/config.py @@ -0,0 +1,114 @@ +"""Configuration management for chat system.""" + +import os +from pathlib import Path + +from pydantic import Field, field_validator +from pydantic_settings import BaseSettings + + +class ChatConfig(BaseSettings): + """Configuration for the chat system.""" + + # OpenAI API Configuration + model: str = Field( + default="qwen/qwen3-235b-a22b-2507", description="Default model to use" + ) + api_key: str | None = Field(default=None, description="OpenAI API key") + base_url: str | None = Field( + default="https://openrouter.ai/api/v1", + description="Base URL for API (e.g., for OpenRouter)", + ) + + # Session TTL Configuration - 12 hours + session_ttl: int = Field(default=43200, description="Session TTL in seconds") + + # System Prompt Configuration + system_prompt_path: str = Field( + default="prompts/chat_system.md", + description="Path to system prompt file relative to chat module", + ) + + # Streaming Configuration + max_context_messages: int = Field( + default=50, ge=1, le=200, description="Maximum context messages" + ) + + stream_timeout: int = Field(default=300, description="Stream timeout in seconds") + max_retries: int = Field(default=3, description="Maximum number of retries") + + @field_validator("api_key", mode="before") + @classmethod + def get_api_key(cls, v): + """Get API key from environment if not provided.""" + if v is None: + # Try to get from environment variables + # First check for CHAT_API_KEY (Pydantic prefix) + v = os.getenv("CHAT_API_KEY") + if not v: + # Fall back to OPEN_ROUTER_API_KEY + v = os.getenv("OPEN_ROUTER_API_KEY") + if not v: + # Fall back to OPENAI_API_KEY + v = os.getenv("OPENAI_API_KEY") + return v + + @field_validator("base_url", mode="before") + @classmethod + def get_base_url(cls, v): + """Get base URL from environment if not provided.""" + if v is None: + # Check for OpenRouter or custom base URL + v = os.getenv("CHAT_BASE_URL") + if not v: + v = os.getenv("OPENROUTER_BASE_URL") + if not v: + v = os.getenv("OPENAI_BASE_URL") + if not v: + v = "https://openrouter.ai/api/v1" + return v + + def get_system_prompt(self, **template_vars) -> str: + """Load and render the system prompt from file. + + Args: + **template_vars: Variables to substitute in the template + + Returns: + Rendered system prompt string + + """ + # Get the path relative to this module + module_dir = Path(__file__).parent + prompt_path = module_dir / self.system_prompt_path + + # Check for .j2 extension first (Jinja2 template) + j2_path = Path(str(prompt_path) + ".j2") + if j2_path.exists(): + try: + from jinja2 import Template + + template = Template(j2_path.read_text()) + return template.render(**template_vars) + except ImportError: + # Jinja2 not installed, fall back to reading as plain text + return j2_path.read_text() + + # Check for markdown file + if prompt_path.exists(): + content = prompt_path.read_text() + + # Simple variable substitution if Jinja2 is not available + for key, value in template_vars.items(): + placeholder = f"{{{key}}}" + content = content.replace(placeholder, str(value)) + + return content + raise FileNotFoundError(f"System prompt file not found: {prompt_path}") + + class Config: + """Pydantic config.""" + + env_file = ".env" + env_file_encoding = "utf-8" + extra = "ignore" # Ignore extra environment variables diff --git a/autogpt_platform/backend/backend/server/v2/chat/model.py b/autogpt_platform/backend/backend/server/v2/chat/model.py new file mode 100644 index 0000000000..d9cc525d38 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/model.py @@ -0,0 +1,201 @@ +import logging +import uuid +from datetime import UTC, datetime + +from openai.types.chat import ( + ChatCompletionAssistantMessageParam, + ChatCompletionDeveloperMessageParam, + ChatCompletionFunctionMessageParam, + ChatCompletionMessageParam, + ChatCompletionSystemMessageParam, + ChatCompletionToolMessageParam, + ChatCompletionUserMessageParam, +) +from openai.types.chat.chat_completion_assistant_message_param import FunctionCall +from openai.types.chat.chat_completion_message_tool_call_param import ( + ChatCompletionMessageToolCallParam, + Function, +) +from pydantic import BaseModel + +from backend.data.redis_client import get_redis_async +from backend.server.v2.chat.config import ChatConfig +from backend.util.exceptions import RedisError + +logger = logging.getLogger(__name__) +config = ChatConfig() + + +class ChatMessage(BaseModel): + role: str + content: str + name: str | None = None + tool_call_id: str | None = None + refusal: str | None = None + tool_calls: list[dict] | None = None + function_call: dict | None = None + + +class Usage(BaseModel): + prompt_tokens: int + completion_tokens: int + total_tokens: int + + +class ChatSession(BaseModel): + session_id: str + user_id: str | None + messages: list[ChatMessage] + usage: list[Usage] + credentials: dict[str, dict] = {} # Map of provider -> credential metadata + started_at: datetime + updated_at: datetime + + @staticmethod + def new(user_id: str | None) -> "ChatSession": + return ChatSession( + session_id=str(uuid.uuid4()), + user_id=user_id, + messages=[], + usage=[], + credentials={}, + started_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + ) + + def to_openai_messages(self) -> list[ChatCompletionMessageParam]: + messages = [] + for message in self.messages: + if message.role == "developer": + m = ChatCompletionDeveloperMessageParam( + role="developer", + content=message.content, + ) + if message.name: + m["name"] = message.name + messages.append(m) + elif message.role == "system": + m = ChatCompletionSystemMessageParam( + role="system", + content=message.content, + ) + if message.name: + m["name"] = message.name + messages.append(m) + elif message.role == "user": + m = ChatCompletionUserMessageParam( + role="user", + content=message.content, + ) + if message.name: + m["name"] = message.name + messages.append(m) + elif message.role == "assistant": + m = ChatCompletionAssistantMessageParam( + role="assistant", + content=message.content, + ) + if message.function_call: + m["function_call"] = FunctionCall( + arguments=message.function_call["arguments"], + name=message.function_call["name"], + ) + if message.refusal: + m["refusal"] = message.refusal + if message.tool_calls: + t: list[ChatCompletionMessageToolCallParam] = [] + for tool_call in message.tool_calls: + # Tool calls are stored with nested structure: {id, type, function: {name, arguments}} + function_data = tool_call.get("function", {}) + + # Skip tool calls that are missing required fields + if "id" not in tool_call or "name" not in function_data: + logger.warning( + f"Skipping invalid tool call: missing required fields. " + f"Got: {tool_call.keys()}, function keys: {function_data.keys()}" + ) + continue + + # Arguments are stored as a JSON string + arguments_str = function_data.get("arguments", "{}") + + t.append( + ChatCompletionMessageToolCallParam( + id=tool_call["id"], + type="function", + function=Function( + arguments=arguments_str, + name=function_data["name"], + ), + ) + ) + m["tool_calls"] = t + if message.name: + m["name"] = message.name + messages.append(m) + elif message.role == "tool": + messages.append( + ChatCompletionToolMessageParam( + role="tool", + content=message.content, + tool_call_id=message.tool_call_id or "", + ) + ) + elif message.role == "function": + messages.append( + ChatCompletionFunctionMessageParam( + role="function", + content=message.content, + name=message.name or "", + ) + ) + return messages + + +async def get_chat_session( + session_id: str, + user_id: str | None, +) -> ChatSession | None: + """Get a chat session by ID.""" + redis_key = f"chat:session:{session_id}" + async_redis = await get_redis_async() + + raw_session: bytes | None = await async_redis.get(redis_key) + + if raw_session is None: + logger.warning(f"Session {session_id} not found in Redis") + return None + + try: + session = ChatSession.model_validate_json(raw_session) + except Exception as e: + logger.error(f"Failed to deserialize session {session_id}: {e}", exc_info=True) + raise RedisError(f"Corrupted session data for {session_id}") from e + + if session.user_id is not None and session.user_id != user_id: + logger.warning( + f"Session {session_id} user id mismatch: {session.user_id} != {user_id}" + ) + return None + + return session + + +async def upsert_chat_session( + session: ChatSession, +) -> ChatSession: + """Update a chat session with the given messages.""" + + redis_key = f"chat:session:{session.session_id}" + + async_redis = await get_redis_async() + resp = await async_redis.setex( + redis_key, config.session_ttl, session.model_dump_json() + ) + + if not resp: + raise RedisError( + f"Failed to persist chat session {session.session_id} to Redis: {resp}" + ) + + return session diff --git a/autogpt_platform/backend/backend/server/v2/chat/model_test.py b/autogpt_platform/backend/backend/server/v2/chat/model_test.py new file mode 100644 index 0000000000..f9c79b331b --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/model_test.py @@ -0,0 +1,70 @@ +import pytest + +from backend.server.v2.chat.model import ( + ChatMessage, + ChatSession, + Usage, + get_chat_session, + upsert_chat_session, +) + +messages = [ + ChatMessage(content="Hello, how are you?", role="user"), + ChatMessage( + content="I'm fine, thank you!", + role="assistant", + tool_calls=[ + { + "id": "t123", + "type": "function", + "function": { + "name": "get_weather", + "arguments": '{"city": "New York"}', + }, + } + ], + ), + ChatMessage( + content="I'm using the tool to get the weather", + role="tool", + tool_call_id="t123", + ), +] + + +@pytest.mark.asyncio(loop_scope="session") +async def test_chatsession_serialization_deserialization(): + s = ChatSession.new(user_id="abc123") + s.messages = messages + s.usage = [Usage(prompt_tokens=100, completion_tokens=200, total_tokens=300)] + serialized = s.model_dump_json() + s2 = ChatSession.model_validate_json(serialized) + assert s2.model_dump() == s.model_dump() + + +@pytest.mark.asyncio(loop_scope="session") +async def test_chatsession_redis_storage(): + + s = ChatSession.new(user_id=None) + s.messages = messages + + s = await upsert_chat_session(s) + + s2 = await get_chat_session( + session_id=s.session_id, + user_id=s.user_id, + ) + + assert s2 == s + + +@pytest.mark.asyncio(loop_scope="session") +async def test_chatsession_redis_storage_user_id_mismatch(): + + s = ChatSession.new(user_id="abc123") + s.messages = messages + s = await upsert_chat_session(s) + + s2 = await get_chat_session(s.session_id, None) + + assert s2 is None diff --git a/autogpt_platform/backend/backend/server/v2/chat/prompts/chat_system.md b/autogpt_platform/backend/backend/server/v2/chat/prompts/chat_system.md new file mode 100644 index 0000000000..79c6308259 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/prompts/chat_system.md @@ -0,0 +1,76 @@ +# AutoGPT Agent Setup Assistant +Your name is Otto. +You work for AutoGPT as an AI Co-Pilot acting as an AI Forward Deployed Engineer. +You were made by AutoGPT. + +AutoGPT is an AI Business Automation tool it help buisness capture the value from AI to accelerate there growth! + +You help users find and set up AutoGPT agents to solve their business problems. **Bias toward action** - move quickly to get agents running. + +## THE FLOW (Always Follow This Order) + +1. **find_agent** → Search for agents that solve their problem +2. **get_agent_details** → Get comprehensive info about chosen agent +3. **get_required_setup_info** → Verify user has required credentials (MANDATORY before next step) +4. **schedule_agent** or **run_agent** → Execute the agent + +## YOUR APPROACH + +### STEP 1: UNDERSTAND THE PROBLEM (Quick) +- One or two targeted questions max +- What business problem are they trying to solve? +- Move quickly to searching for solutions + +### STEP 2: FIND AGENTS +- Use `find_agent` immediately with relevant keywords +- Suggest the best option based on what you know +- Explain briefly how it solves their problem +- Ask them if they would like to use it, if they do move to step 3 + +### STEP 3: GET DETAILS +- Use `get_agent_details` on their chosen agent +- Explain what the agent does and its requirements +- Keep explanations brief and outcome-focused + +### STEP 4: VERIFY SETUP (CRITICAL) +- **ALWAYS** use `get_required_setup_info` before proceeding +- This checks if user has all required credentials +- Tell user what credentials they need (if any) +- Explain credentials are added via the frontend interface + +### STEP 5: EXECUTE +<<<<<<< Updated upstream +- Once credentials verified, use `schedule_agent` for scheduled and tirggered runs OR `run_agent` for immediate execution +======= +- Once credentials verified, use `schedule_agent` for scheduled runs OR `run_agent` for immediate execution +>>>>>>> Stashed changes +- Confirm successful setup/run +- Provide clear next steps + +## KEY RULES + +### What You DON'T Do: +- Don't help with login (frontend handles this) +- Don't help add credentials (frontend handles this) +- Don't skip `get_required_setup_info` (it's mandatory) +- Don't over-explain technical details + +### What You DO: +- Act fast - get to agent discovery quickly +- Use tools proactively without asking permission +- Keep explanations short and business-focused +- Always verify credentials before setup/run +- Focus on outcomes and value + +### Error Handling: +- If authentication needed → Tell user to sign in via the interface +- If credentials missing → Tell user what's needed and where to add them in the frontend +- If setup fails → Identify issue, provide clear fix + +## SUCCESS LOOKS LIKE: +- User has an agent running within minutes +- User understands what their agent does +- User knows how to use their agent going forward +- Minimal back-and-forth, maximum action + +**Remember: Speed to value. Find agent → Get details → Verify credentials → Run. Keep it simple, keep it moving.** \ No newline at end of file diff --git a/autogpt_platform/backend/backend/server/v2/chat/response_model.py b/autogpt_platform/backend/backend/server/v2/chat/response_model.py new file mode 100644 index 0000000000..7ca335ff85 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/response_model.py @@ -0,0 +1,100 @@ +from enum import Enum +from typing import Any + +from pydantic import BaseModel, Field + + +class ResponseType(str, Enum): + """Types of streaming responses.""" + + TEXT_CHUNK = "text_chunk" + TEXT_ENDED = "text_ended" + TOOL_CALL = "tool_call" + TOOL_CALL_START = "tool_call_start" + TOOL_RESPONSE = "tool_response" + ERROR = "error" + USAGE = "usage" + STREAM_END = "stream_end" + + +class StreamBaseResponse(BaseModel): + """Base response model for all streaming responses.""" + + type: ResponseType + timestamp: str | None = None + + def to_sse(self) -> str: + """Convert to SSE format.""" + return f"data: {self.model_dump_json()}\n\n" + + +class StreamTextChunk(StreamBaseResponse): + """Streaming text content from the assistant.""" + + type: ResponseType = ResponseType.TEXT_CHUNK + content: str = Field(..., description="Text content chunk") + + +class StreamToolCallStart(StreamBaseResponse): + """Tool call started notification.""" + + type: ResponseType = ResponseType.TOOL_CALL_START + tool_id: str = Field(..., description="Unique tool call ID") + + +class StreamToolCall(StreamBaseResponse): + """Tool invocation notification.""" + + type: ResponseType = ResponseType.TOOL_CALL + tool_id: str = Field(..., description="Unique tool call ID") + tool_name: str = Field(..., description="Name of the tool being called") + arguments: dict[str, Any] = Field( + default_factory=dict, description="Tool arguments" + ) + + +class StreamToolExecutionResult(StreamBaseResponse): + """Tool execution result.""" + + type: ResponseType = ResponseType.TOOL_RESPONSE + tool_id: str = Field(..., description="Tool call ID this responds to") + tool_name: str = Field(..., description="Name of the tool that was executed") + result: str | dict[str, Any] = Field(..., description="Tool execution result") + success: bool = Field( + default=True, description="Whether the tool execution succeeded" + ) + + +class StreamUsage(StreamBaseResponse): + """Token usage statistics.""" + + type: ResponseType = ResponseType.USAGE + prompt_tokens: int + completion_tokens: int + total_tokens: int + + +class StreamError(StreamBaseResponse): + """Error response.""" + + type: ResponseType = ResponseType.ERROR + message: str = Field(..., description="Error message") + code: str | None = Field(default=None, description="Error code") + details: dict[str, Any] | None = Field( + default=None, description="Additional error details" + ) + + +class StreamTextEnded(StreamBaseResponse): + """Text streaming completed marker.""" + + type: ResponseType = ResponseType.TEXT_ENDED + + +class StreamEnd(StreamBaseResponse): + """End of stream marker.""" + + type: ResponseType = ResponseType.STREAM_END + summary: dict[str, Any] | None = Field( + default=None, description="Stream summary statistics" + ) diff --git a/autogpt_platform/backend/backend/server/v2/chat/routes.py b/autogpt_platform/backend/backend/server/v2/chat/routes.py new file mode 100644 index 0000000000..4a00304b55 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/routes.py @@ -0,0 +1,213 @@ +"""Chat API routes for chat session management and streaming via SSE.""" + +import logging +from collections.abc import AsyncGenerator +from typing import Annotated + +from autogpt_libs import auth +from fastapi import APIRouter, Depends, Query, Security +from fastapi.responses import StreamingResponse +from pydantic import BaseModel + +import backend.server.v2.chat.service as chat_service +from backend.server.v2.chat.config import ChatConfig +from backend.util.exceptions import NotFoundError + +config = ChatConfig() + + +logger = logging.getLogger(__name__) + +router = APIRouter( + tags=["chat"], +) + +# ========== Request/Response Models ========== + + +class CreateSessionResponse(BaseModel): + """Response model containing information on a newly created chat session.""" + + id: str + created_at: str + user_id: str | None + + +class SessionDetailResponse(BaseModel): + """Response model providing complete details for a chat session, including messages.""" + + id: str + created_at: str + updated_at: str + user_id: str | None + messages: list[dict] + + +# ========== Routes ========== + + +@router.post( + "/sessions", +) +async def create_session( + user_id: Annotated[str | None, Depends(auth.get_optional_user_id)], +) -> CreateSessionResponse: + """ + Create a new chat session. + + Initiates a new chat session for either an authenticated or anonymous user. + + Args: + user_id: The optional authenticated user ID parsed from the JWT. If missing, creates an anonymous session. + + Returns: + CreateSessionResponse: Details of the created session. + + """ + logger.info(f"Creating session with user_id: {user_id}") + + session = await chat_service.create_chat_session(user_id) + + return CreateSessionResponse( + id=session.session_id, + created_at=session.started_at.isoformat(), + user_id=session.user_id or None, + ) + + +@router.get( + "/sessions/{session_id}", +) +async def get_session( + session_id: str, + user_id: Annotated[str | None, Depends(auth.get_optional_user_id)], +) -> SessionDetailResponse: + """ + Retrieve the details of a specific chat session. + + Looks up a chat session by ID for the given user (if authenticated) and returns all session data including messages. + + Args: + session_id: The unique identifier for the desired chat session. + user_id: The optional authenticated user ID, or None for anonymous access. + + Returns: + SessionDetailResponse: Details for the requested session; raises NotFoundError if not found. + + """ + session = await chat_service.get_session(session_id, user_id) + if not session: + raise NotFoundError(f"Session {session_id} not found") + return SessionDetailResponse( + id=session.session_id, + created_at=session.started_at.isoformat(), + updated_at=session.updated_at.isoformat(), + user_id=session.user_id or None, + messages=[message.model_dump() for message in session.messages], + ) + + +@router.get( + "/sessions/{session_id}/stream", +) +async def stream_chat( + session_id: str, + message: Annotated[str, Query(min_length=1, max_length=10000)], + user_id: str | None = Depends(auth.get_optional_user_id), + is_user_message: bool = Query(default=True), +): + """ + Stream chat responses for a session. + + Streams the AI/completion responses in real time over Server-Sent Events (SSE), including: + - Text fragments as they are generated + - Tool call UI elements (if invoked) + - Tool execution results + + Args: + session_id: The chat session identifier to associate with the streamed messages. + message: The user's new message to process. + user_id: Optional authenticated user ID. + is_user_message: Whether the message is a user message. + Returns: + StreamingResponse: SSE-formatted response chunks. + + """ + # Validate session exists before starting the stream + # This prevents errors after the response has already started + session = await chat_service.get_session(session_id, user_id) + + if not session: + raise NotFoundError(f"Session {session_id} not found. ") + if session.user_id is None and user_id is not None: + session = await chat_service.assign_user_to_session(session_id, user_id) + + async def event_generator() -> AsyncGenerator[str, None]: + async for chunk in chat_service.stream_chat_completion( + session_id, message, is_user_message=is_user_message, user_id=user_id + ): + with open("chunks.log", "a") as f: + f.write(f"{session_id}: {chunk}\n") + yield chunk.to_sse() + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + }, + ) + + +@router.patch( + "/sessions/{session_id}/assign-user", + dependencies=[Security(auth.requires_user)], + status_code=200, +) +async def session_assign_user( + session_id: str, + user_id: Annotated[str, Security(auth.get_user_id)], +) -> dict: + """ + Assign an authenticated user to a chat session. + + Used (typically post-login) to claim an existing anonymous session as the current authenticated user. + + Args: + session_id: The identifier for the (previously anonymous) session. + user_id: The authenticated user's ID to associate with the session. + + Returns: + dict: Status of the assignment. + + """ + await chat_service.assign_user_to_session(session_id, user_id) + return {"status": "ok"} + + +# ========== Health Check ========== + + +@router.get("/health", status_code=200) +async def health_check() -> dict: + """ + Health check endpoint for the chat service. + + Performs a full cycle test of session creation, assignment, and retrieval. Should always return healthy + if the service and data layer are operational. + + Returns: + dict: A status dictionary indicating health, service name, and API version. + + """ + session = await chat_service.create_chat_session(None) + await chat_service.assign_user_to_session(session.session_id, "test_user") + await chat_service.get_session(session.session_id, "test_user") + + return { + "status": "healthy", + "service": "chat", + "version": "0.1.0", + } diff --git a/autogpt_platform/backend/backend/server/v2/chat/service.py b/autogpt_platform/backend/backend/server/v2/chat/service.py new file mode 100644 index 0000000000..082e84fe3b --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/service.py @@ -0,0 +1,472 @@ +import logging +from collections.abc import AsyncGenerator +from datetime import UTC, datetime +from typing import Any + +import orjson +from openai import AsyncOpenAI +from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam + +import backend.server.v2.chat.config +from backend.server.v2.chat.model import ( + ChatMessage, + ChatSession, + Usage, + get_chat_session, + upsert_chat_session, +) +from backend.server.v2.chat.response_model import ( + StreamBaseResponse, + StreamEnd, + StreamError, + StreamTextChunk, + StreamTextEnded, + StreamToolCall, + StreamToolCallStart, + StreamToolExecutionResult, + StreamUsage, +) +from backend.server.v2.chat.tools import execute_tool, tools +from backend.util.exceptions import NotFoundError + +logger = logging.getLogger(__name__) + +config = backend.server.v2.chat.config.ChatConfig() +client = AsyncOpenAI(api_key=config.api_key, base_url=config.base_url) + + +async def create_chat_session( + user_id: str | None = None, +) -> ChatSession: + """ + Create a new chat session and persist it to the database. + """ + session = ChatSession.new(user_id) + # Persist the session immediately so it can be used for streaming + return await upsert_chat_session(session) + + +async def get_session( + session_id: str, + user_id: str | None = None, +) -> ChatSession | None: + """ + Get a chat session by ID. + """ + return await get_chat_session(session_id, user_id) + + +async def assign_user_to_session( + session_id: str, + user_id: str, +) -> ChatSession: + """ + Assign a user to a chat session. + """ + session = await get_chat_session(session_id, None) + if not session: + raise NotFoundError(f"Session {session_id} not found") + session.user_id = user_id + return await upsert_chat_session(session) + + +async def stream_chat_completion( + session_id: str, + message: str | None = None, + is_user_message: bool = True, + user_id: str | None = None, + retry_count: int = 0, +) -> AsyncGenerator[StreamBaseResponse, None]: + """Main entry point for streaming chat completions with database handling. + + This function handles all database operations and delegates streaming + to the internal _stream_chat_chunks function. + + Args: + session_id: Chat session ID + user_message: User's input message + user_id: User ID for authentication (None for anonymous) + + Yields: + StreamBaseResponse objects formatted as SSE + + Raises: + NotFoundError: If session_id is invalid + ValueError: If max_context_messages is exceeded + + """ + logger.info( + f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}" + ) + + session = await get_chat_session(session_id, user_id) + + if not session: + raise NotFoundError( + f"Session {session_id} not found. Please create a new session first." + ) + + if message: + session.messages.append( + ChatMessage( + role="user" if is_user_message else "assistant", content=message + ) + ) + + if len(session.messages) > config.max_context_messages: + raise ValueError(f"Max messages exceeded: {config.max_context_messages}") + + logger.info( + f"Upserting session: {session.session_id} with user id {session.user_id}" + ) + session = await upsert_chat_session(session) + assert session, "Session not found" + + assistant_response = ChatMessage( + role="assistant", + content="", + ) + + has_yielded_end = False + has_yielded_error = False + has_done_tool_call = False + has_received_text = False + text_streaming_ended = False + messages_to_add: list[ChatMessage] = [] + should_retry = False + + try: + async for chunk in _stream_chat_chunks( + session=session, + tools=tools, + ): + + if isinstance(chunk, StreamTextChunk): + assistant_response.content += chunk.content + has_received_text = True + yield chunk + elif isinstance(chunk, StreamToolCallStart): + # Emit text_ended before first tool call, but only if we've received text + if has_received_text and not text_streaming_ended: + yield StreamTextEnded() + text_streaming_ended = True + yield chunk + elif isinstance(chunk, StreamToolCall): + # Just pass on the tool call notification + pass + elif isinstance(chunk, StreamToolExecutionResult): + result_content = ( + chunk.result + if isinstance(chunk.result, str) + else orjson.dumps(chunk.result).decode("utf-8") + ) + messages_to_add.append( + ChatMessage( + role="tool", + content=result_content, + tool_call_id=chunk.tool_id, + ) + ) + has_done_tool_call = True + # Track if any tool execution failed + if not chunk.success: + logger.warning( + f"Tool {chunk.tool_name} (ID: {chunk.tool_id}) execution failed" + ) + yield chunk + elif isinstance(chunk, StreamEnd): + if not has_done_tool_call: + has_yielded_end = True + yield chunk + elif isinstance(chunk, StreamError): + has_yielded_error = True + elif isinstance(chunk, StreamUsage): + session.usage.append( + Usage( + prompt_tokens=chunk.prompt_tokens, + completion_tokens=chunk.completion_tokens, + total_tokens=chunk.total_tokens, + ) + ) + else: + logger.error(f"Unknown chunk type: {type(chunk)}", exc_info=True) + except Exception as e: + logger.error(f"Error during stream: {e!s}", exc_info=True) + + # Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.) + is_retryable = isinstance(e, (orjson.JSONDecodeError, KeyError, TypeError)) + + if is_retryable and retry_count < config.max_retries: + logger.info( + f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}" + ) + should_retry = True + else: + # Non-retryable error or max retries exceeded + # Save any partial progress before reporting error + if assistant_response.content or assistant_response.tool_calls: + messages_to_add.append(assistant_response) + session.messages.extend(messages_to_add) + await upsert_chat_session(session) + + if not has_yielded_error: + error_message = str(e) + if not is_retryable: + error_message = f"Non-retryable error: {error_message}" + elif retry_count >= config.max_retries: + error_message = ( + f"Max retries ({config.max_retries}) exceeded: {error_message}" + ) + + error_response = StreamError( + message=error_message, + timestamp=datetime.now(UTC).isoformat(), + ) + yield error_response + if not has_yielded_end: + yield StreamEnd( + timestamp=datetime.now(UTC).isoformat(), + ) + return + + # Handle retry outside of exception handler to avoid nesting + if should_retry and retry_count < config.max_retries: + logger.info( + f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}" + ) + async for chunk in stream_chat_completion( + session_id=session.session_id, + user_id=user_id, + retry_count=retry_count + 1, + ): + yield chunk + return # Exit after retry to avoid double-saving in finally block + + # Normal completion path - save session and handle tool call continuation + logger.info( + f"Upserting session: {session.session_id} with user id {session.user_id}" + ) + # Only append assistant response if it has content or tool calls + # to avoid saving empty messages on errors + if assistant_response.content or assistant_response.tool_calls: + messages_to_add.append(assistant_response) + session.messages.extend(messages_to_add) + await upsert_chat_session(session) + + # If we did a tool call, stream the chat completion again to get the next response + if has_done_tool_call: + logger.info( + "Tool call executed, streaming chat completion again to get assistant response" + ) + async for chunk in stream_chat_completion( + session_id=session.session_id, user_id=user_id + ): + yield chunk + + +async def _stream_chat_chunks( + session: ChatSession, + tools: list[ChatCompletionToolParam], +) -> AsyncGenerator[StreamBaseResponse, None]: + """ + Pure streaming function for OpenAI chat completions with tool calling. + + This function is database-agnostic and focuses only on streaming logic. + + Args: + messages: Conversation context as ChatCompletionMessageParam list + session_id: Session ID + user_id: User ID for tool execution + + Yields: + SSE formatted JSON response objects + + """ + model = config.model + + logger.info("Starting pure chat stream") + + # Loop to handle tool calls and continue conversation + while True: + try: + logger.info("Creating OpenAI chat completion stream...") + + # Create the stream with proper types + stream = await client.chat.completions.create( + model=model, + messages=session.to_openai_messages(), + tools=tools, + tool_choice="auto", + stream=True, + ) + + # Variables to accumulate tool calls + tool_calls: list[dict[str, Any]] = [] + active_tool_call_idx: int | None = None + finish_reason: str | None = None + # Track which tool call indices have had their start event emitted + emitted_start_for_idx: set[int] = set() + + # Process the stream + chunk: ChatCompletionChunk + async for chunk in stream: + if chunk.usage: + yield StreamUsage( + prompt_tokens=chunk.usage.prompt_tokens, + completion_tokens=chunk.usage.completion_tokens, + total_tokens=chunk.usage.total_tokens, + ) + + if chunk.choices: + choice = chunk.choices[0] + delta = choice.delta + + # Capture finish reason + if choice.finish_reason: + finish_reason = choice.finish_reason + logger.info(f"Finish reason: {finish_reason}") + + # Handle content streaming + if delta.content: + # Stream the text chunk + text_response = StreamTextChunk( + content=delta.content, + timestamp=datetime.now(UTC).isoformat(), + ) + yield text_response + + # Handle tool calls + if delta.tool_calls: + for tc_chunk in delta.tool_calls: + idx = tc_chunk.index + + # Update active tool call index if needed + if ( + active_tool_call_idx is None + or active_tool_call_idx != idx + ): + active_tool_call_idx = idx + + # Ensure we have a tool call object at this index + while len(tool_calls) <= idx: + tool_calls.append( + { + "id": "", + "type": "function", + "function": { + "name": "", + "arguments": "", + }, + }, + ) + + # Accumulate the tool call data + if tc_chunk.id: + tool_calls[idx]["id"] = tc_chunk.id + if tc_chunk.function: + if tc_chunk.function.name: + tool_calls[idx]["function"][ + "name" + ] = tc_chunk.function.name + if tc_chunk.function.arguments: + tool_calls[idx]["function"][ + "arguments" + ] += tc_chunk.function.arguments + + # Emit StreamToolCallStart only after we have the tool call ID + if ( + idx not in emitted_start_for_idx + and tool_calls[idx]["id"] + ): + yield StreamToolCallStart( + tool_id=tool_calls[idx]["id"], + timestamp=datetime.now(UTC).isoformat(), + ) + emitted_start_for_idx.add(idx) + logger.info(f"Stream complete. Finish reason: {finish_reason}") + + # Yield all accumulated tool calls after the stream is complete + # This ensures all tool call arguments have been fully received + for idx, tool_call in enumerate(tool_calls): + try: + async for tc in _yield_tool_call(tool_calls, idx, session): + yield tc + except (orjson.JSONDecodeError, KeyError, TypeError) as e: + logger.error( + f"Failed to parse tool call {idx}: {e}", + exc_info=True, + extra={"tool_call": tool_call}, + ) + yield StreamError( + message=f"Invalid tool call arguments for tool {tool_call.get('function', {}).get('name', 'unknown')}: {e}", + timestamp=datetime.now(UTC).isoformat(), + ) + # Re-raise to trigger retry logic in the parent function + raise + + yield StreamEnd( + timestamp=datetime.now(UTC).isoformat(), + ) + return + except Exception as e: + logger.error(f"Error in stream: {e!s}", exc_info=True) + error_response = StreamError( + message=str(e), + timestamp=datetime.now(UTC).isoformat(), + ) + yield error_response + yield StreamEnd( + timestamp=datetime.now(UTC).isoformat(), + ) + return + + +async def _yield_tool_call( + tool_calls: list[dict[str, Any]], + yield_idx: int, + session: ChatSession, +) -> AsyncGenerator[StreamBaseResponse, None]: + """ + Yield a tool call and its execution result. + + Raises: + orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON + KeyError: If expected tool call fields are missing + TypeError: If tool call structure is invalid + """ + logger.info(f"Yielding tool call: {tool_calls[yield_idx]}") + + # Parse tool call arguments - exceptions will propagate to caller + arguments = orjson.loads(tool_calls[yield_idx]["function"]["arguments"]) + + yield StreamToolCall( + tool_id=tool_calls[yield_idx]["id"], + tool_name=tool_calls[yield_idx]["function"]["name"], + arguments=arguments, + timestamp=datetime.now(UTC).isoformat(), + ) + + tool_execution_response: StreamToolExecutionResult = await execute_tool( + tool_name=tool_calls[yield_idx]["function"]["name"], + parameters=arguments, + tool_call_id=tool_calls[yield_idx]["id"], + user_id=session.user_id, + session_id=session.session_id, + ) + logger.info(f"Yielding Tool execution response: {tool_execution_response}") + yield tool_execution_response + + +if __name__ == "__main__": + import asyncio + + async def main(): + session = await create_chat_session() + async for chunk in stream_chat_completion( + session.session_id, + "Please find me an agent that can help me with my business. Call the tool twice once with the query 'money printing agent' and once with the query 'money generating agent'", + user_id=session.user_id, + ): + print(chunk) + + asyncio.run(main()) diff --git a/autogpt_platform/backend/backend/server/v2/chat/service_test.py b/autogpt_platform/backend/backend/server/v2/chat/service_test.py new file mode 100644 index 0000000000..df3d64976e --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/service_test.py @@ -0,0 +1,81 @@ +import logging +from os import getenv + +import pytest + +import backend.server.v2.chat.service as chat_service +from backend.server.v2.chat.response_model import ( + StreamEnd, + StreamError, + StreamTextChunk, + StreamToolExecutionResult, +) + +logger = logging.getLogger(__name__) + + +@pytest.mark.asyncio(loop_scope="session") +async def test_stream_chat_completion(): + """ + Test the stream_chat_completion function. + """ + api_key: str | None = getenv("OPEN_ROUTER_API_KEY") + if not api_key: + return pytest.skip("OPEN_ROUTER_API_KEY is not set, skipping test") + + session = await chat_service.create_chat_session() + + has_errors = False + has_ended = False + assistant_message = "" + async for chunk in chat_service.stream_chat_completion( + session.session_id, "Hello, how are you?", user_id=session.user_id + ): + logger.info(chunk) + if isinstance(chunk, StreamError): + has_errors = True + if isinstance(chunk, StreamTextChunk): + assistant_message += chunk.content + if isinstance(chunk, StreamEnd): + has_ended = True + + assert has_ended, "Chat completion did not end" + assert not has_errors, "Error occurred while streaming chat completion" + assert assistant_message, "Assistant message is empty" + + +@pytest.mark.asyncio(loop_scope="session") +async def test_stream_chat_completion_with_tool_calls(): + """ + Test the stream_chat_completion function. + """ + api_key: str | None = getenv("OPEN_ROUTER_API_KEY") + if not api_key: + return pytest.skip("OPEN_ROUTER_API_KEY is not set, skipping test") + + session = await chat_service.create_chat_session() + session = await chat_service.upsert_chat_session(session) + + has_errors = False + has_ended = False + had_tool_calls = False + async for chunk in chat_service.stream_chat_completion( + session.session_id, + "Please find me an agent that can help me with my business. Use the query 'moneny printing agent'", + user_id=session.user_id, + ): + logger.info(chunk) + if isinstance(chunk, StreamError): + has_errors = True + + if isinstance(chunk, StreamEnd): + has_ended = True + if isinstance(chunk, StreamToolExecutionResult): + had_tool_calls = True + + assert has_ended, "Chat completion did not end" + assert not has_errors, "Error occurred while streaming chat completion" + assert had_tool_calls, "Tool calls did not occur" + session = await chat_service.get_session(session.session_id) + assert session, "Session not found" + assert session.usage, "Usage is empty" diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/__init__.py b/autogpt_platform/backend/backend/server/v2/chat/tools/__init__.py new file mode 100644 index 0000000000..893873f58a --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/__init__.py @@ -0,0 +1,51 @@ +from typing import TYPE_CHECKING, Any + +from openai.types.chat import ChatCompletionToolParam + +from .base import BaseTool +from .find_agent import FindAgentTool +from .get_agent_details import GetAgentDetailsTool +from .get_required_setup_info import GetRequiredSetupInfoTool +from .run_agent import RunAgentTool +from .setup_agent import SetupAgentTool + +if TYPE_CHECKING: + from backend.server.v2.chat.response_model import StreamToolExecutionResult + +# Initialize tool instances +find_agent_tool = FindAgentTool() +get_agent_details_tool = GetAgentDetailsTool() +get_required_setup_info_tool = GetRequiredSetupInfoTool() +setup_agent_tool = SetupAgentTool() +run_agent_tool = RunAgentTool() + +# Export tools as OpenAI format +tools: list[ChatCompletionToolParam] = [ + find_agent_tool.as_openai_tool(), + get_agent_details_tool.as_openai_tool(), + get_required_setup_info_tool.as_openai_tool(), + setup_agent_tool.as_openai_tool(), + run_agent_tool.as_openai_tool(), +] + + +async def execute_tool( + tool_name: str, + parameters: dict[str, Any], + user_id: str | None, + session_id: str, + tool_call_id: str, +) -> "StreamToolExecutionResult": + + tool_map: dict[str, BaseTool] = { + "find_agent": find_agent_tool, + "get_agent_details": get_agent_details_tool, + "get_required_setup_info": get_required_setup_info_tool, + "setup_agent": setup_agent_tool, + "run_agent": run_agent_tool, + } + if tool_name not in tool_map: + raise ValueError(f"Tool {tool_name} not found") + return await tool_map[tool_name].execute( + user_id, session_id, tool_call_id, **parameters + ) diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/_test_data.py b/autogpt_platform/backend/backend/server/v2/chat/tools/_test_data.py new file mode 100644 index 0000000000..15221bc314 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/_test_data.py @@ -0,0 +1,449 @@ +import uuid +from os import getenv + +import pytest +from pydantic import SecretStr + +from backend.blocks.firecrawl.scrape import FirecrawlScrapeBlock +from backend.blocks.io import AgentInputBlock, AgentOutputBlock +from backend.blocks.llm import AITextGeneratorBlock +from backend.data.db import prisma +from backend.data.graph import Graph, Link, Node, create_graph +from backend.data.model import APIKeyCredentials +from backend.data.user import get_or_create_user +from backend.integrations.credentials_store import IntegrationCredentialsStore +from backend.server.v2.store import db as store_db + + +@pytest.fixture(scope="session") +async def setup_test_data(): + """ + Set up test data for run_agent tests: + 1. Create a test user + 2. Create a test graph (agent input -> agent output) + 3. Create a store listing and store listing version + 4. Approve the store listing version + """ + # 1. Create a test user + user_data = { + "sub": f"test-user-{uuid.uuid4()}", + "email": f"test-{uuid.uuid4()}@example.com", + } + user = await get_or_create_user(user_data) + + # 1b. Create a profile with username for the user (required for store agent lookup) + username = user.email.split("@")[0] + await prisma.profile.create( + data={ + "userId": user.id, + "username": username, + "name": f"Test User {username}", + "description": "Test user profile", + "links": [], # Required field - empty array for test profiles + } + ) + + # 2. Create a test graph with agent input -> agent output + graph_id = str(uuid.uuid4()) + + # Create input node + input_node_id = str(uuid.uuid4()) + input_block = AgentInputBlock() + input_node = Node( + id=input_node_id, + block_id=input_block.id, + input_default={ + "name": "test_input", + "title": "Test Input", + "value": "", + "advanced": False, + "description": "Test input field", + "placeholder_values": [], + }, + metadata={"position": {"x": 0, "y": 0}}, + ) + + # Create output node + output_node_id = str(uuid.uuid4()) + output_block = AgentOutputBlock() + output_node = Node( + id=output_node_id, + block_id=output_block.id, + input_default={ + "name": "test_output", + "title": "Test Output", + "value": "", + "format": "", + "advanced": False, + "description": "Test output field", + }, + metadata={"position": {"x": 200, "y": 0}}, + ) + + # Create link from input to output + link = Link( + source_id=input_node_id, + sink_id=output_node_id, + source_name="result", + sink_name="value", + is_static=True, + ) + + # Create the graph + graph = Graph( + id=graph_id, + version=1, + is_active=True, + name="Test Agent", + description="A simple test agent for testing", + nodes=[input_node, output_node], + links=[link], + ) + + created_graph = await create_graph(graph, user.id) + + # 3. Create a store listing and store listing version for the agent + # Use unique slug to avoid constraint violations + unique_slug = f"test-agent-{str(uuid.uuid4())[:8]}" + store_submission = await store_db.create_store_submission( + user_id=user.id, + agent_id=created_graph.id, + agent_version=created_graph.version, + slug=unique_slug, + name="Test Agent", + description="A simple test agent", + sub_heading="Test agent for unit tests", + categories=["testing"], + image_urls=["https://example.com/image.jpg"], + ) + + assert store_submission.store_listing_version_id is not None + # 4. Approve the store listing version + await store_db.review_store_submission( + store_listing_version_id=store_submission.store_listing_version_id, + is_approved=True, + external_comments="Approved for testing", + internal_comments="Test approval", + reviewer_id=user.id, + ) + + return { + "user": user, + "graph": created_graph, + "store_submission": store_submission, + } + + +@pytest.fixture(scope="session") +async def setup_llm_test_data(): + """ + Set up test data for LLM agent tests: + 1. Create a test user + 2. Create test OpenAI credentials for the user + 3. Create a test graph with input -> LLM block -> output + 4. Create and approve a store listing + """ + key = getenv("OPENAI_API_KEY") + if not key: + return pytest.skip("OPENAI_API_KEY is not set") + + # 1. Create a test user + user_data = { + "sub": f"test-user-{uuid.uuid4()}", + "email": f"test-{uuid.uuid4()}@example.com", + } + user = await get_or_create_user(user_data) + + # 1b. Create a profile with username for the user (required for store agent lookup) + username = user.email.split("@")[0] + await prisma.profile.create( + data={ + "userId": user.id, + "username": username, + "name": f"Test User {username}", + "description": "Test user profile for LLM tests", + "links": [], # Required field - empty array for test profiles + } + ) + + # 2. Create test OpenAI credentials for the user + credentials = APIKeyCredentials( + id=str(uuid.uuid4()), + provider="openai", + api_key=SecretStr("test-openai-api-key"), + title="Test OpenAI API Key", + expires_at=None, + ) + + # Store the credentials + creds_store = IntegrationCredentialsStore() + await creds_store.add_creds(user.id, credentials) + + # 3. Create a test graph with input -> LLM block -> output + graph_id = str(uuid.uuid4()) + + # Create input node for the prompt + input_node_id = str(uuid.uuid4()) + input_block = AgentInputBlock() + input_node = Node( + id=input_node_id, + block_id=input_block.id, + input_default={ + "name": "user_prompt", + "title": "User Prompt", + "value": "", + "advanced": False, + "description": "Prompt for the LLM", + "placeholder_values": [], + }, + metadata={"position": {"x": 0, "y": 0}}, + ) + + # Create LLM block node + llm_node_id = str(uuid.uuid4()) + llm_block = AITextGeneratorBlock() + llm_node = Node( + id=llm_node_id, + block_id=llm_block.id, + input_default={ + "model": "gpt-4o-mini", + "sys_prompt": "You are a helpful assistant.", + "retry": 3, + "prompt_values": {}, + "credentials": { + "provider": "openai", + "id": credentials.id, + "type": "api_key", + "title": credentials.title, + }, + }, + metadata={"position": {"x": 300, "y": 0}}, + ) + + # Create output node + output_node_id = str(uuid.uuid4()) + output_block = AgentOutputBlock() + output_node = Node( + id=output_node_id, + block_id=output_block.id, + input_default={ + "name": "llm_response", + "title": "LLM Response", + "value": "", + "format": "", + "advanced": False, + "description": "Response from the LLM", + }, + metadata={"position": {"x": 600, "y": 0}}, + ) + + # Create links + # Link input.result -> llm.prompt + link1 = Link( + source_id=input_node_id, + sink_id=llm_node_id, + source_name="result", + sink_name="prompt", + is_static=True, + ) + + # Link llm.response -> output.value + link2 = Link( + source_id=llm_node_id, + sink_id=output_node_id, + source_name="response", + sink_name="value", + is_static=False, + ) + + # Create the graph + graph = Graph( + id=graph_id, + version=1, + is_active=True, + name="LLM Test Agent", + description="An agent that uses an LLM to process text", + nodes=[input_node, llm_node, output_node], + links=[link1, link2], + ) + + created_graph = await create_graph(graph, user.id) + + # 4. Create and approve a store listing + unique_slug = f"llm-test-agent-{str(uuid.uuid4())[:8]}" + store_submission = await store_db.create_store_submission( + user_id=user.id, + agent_id=created_graph.id, + agent_version=created_graph.version, + slug=unique_slug, + name="LLM Test Agent", + description="An agent with LLM capabilities", + sub_heading="Test agent with OpenAI integration", + categories=["testing", "ai"], + image_urls=["https://example.com/image.jpg"], + ) + assert store_submission.store_listing_version_id is not None + await store_db.review_store_submission( + store_listing_version_id=store_submission.store_listing_version_id, + is_approved=True, + external_comments="Approved for testing", + internal_comments="Test approval for LLM agent", + reviewer_id=user.id, + ) + + return { + "user": user, + "graph": created_graph, + "credentials": credentials, + "store_submission": store_submission, + } + + +@pytest.fixture(scope="session") +async def setup_firecrawl_test_data(): + """ + Set up test data for Firecrawl agent tests (missing credentials scenario): + 1. Create a test user (WITHOUT Firecrawl credentials) + 2. Create a test graph with input -> Firecrawl block -> output + 3. Create and approve a store listing + """ + # 1. Create a test user + user_data = { + "sub": f"test-user-{uuid.uuid4()}", + "email": f"test-{uuid.uuid4()}@example.com", + } + user = await get_or_create_user(user_data) + + # 1b. Create a profile with username for the user (required for store agent lookup) + username = user.email.split("@")[0] + await prisma.profile.create( + data={ + "userId": user.id, + "username": username, + "name": f"Test User {username}", + "description": "Test user profile for Firecrawl tests", + "links": [], # Required field - empty array for test profiles + } + ) + + # NOTE: We deliberately do NOT create Firecrawl credentials for this user + # This tests the scenario where required credentials are missing + + # 2. Create a test graph with input -> Firecrawl block -> output + graph_id = str(uuid.uuid4()) + + # Create input node for the URL + input_node_id = str(uuid.uuid4()) + input_block = AgentInputBlock() + input_node = Node( + id=input_node_id, + block_id=input_block.id, + input_default={ + "name": "url", + "title": "URL to Scrape", + "value": "", + "advanced": False, + "description": "URL for Firecrawl to scrape", + "placeholder_values": [], + }, + metadata={"position": {"x": 0, "y": 0}}, + ) + + # Create Firecrawl block node + firecrawl_node_id = str(uuid.uuid4()) + firecrawl_block = FirecrawlScrapeBlock() + firecrawl_node = Node( + id=firecrawl_node_id, + block_id=firecrawl_block.id, + input_default={ + "limit": 10, + "only_main_content": True, + "max_age": 3600000, + "wait_for": 200, + "formats": ["markdown"], + "credentials": { + "provider": "firecrawl", + "id": "test-firecrawl-id", + "type": "api_key", + "title": "Firecrawl API Key", + }, + }, + metadata={"position": {"x": 300, "y": 0}}, + ) + + # Create output node + output_node_id = str(uuid.uuid4()) + output_block = AgentOutputBlock() + output_node = Node( + id=output_node_id, + block_id=output_block.id, + input_default={ + "name": "scraped_data", + "title": "Scraped Data", + "value": "", + "format": "", + "advanced": False, + "description": "Data scraped by Firecrawl", + }, + metadata={"position": {"x": 600, "y": 0}}, + ) + + # Create links + # Link input.result -> firecrawl.url + link1 = Link( + source_id=input_node_id, + sink_id=firecrawl_node_id, + source_name="result", + sink_name="url", + is_static=True, + ) + + # Link firecrawl.markdown -> output.value + link2 = Link( + source_id=firecrawl_node_id, + sink_id=output_node_id, + source_name="markdown", + sink_name="value", + is_static=False, + ) + + # Create the graph + graph = Graph( + id=graph_id, + version=1, + is_active=True, + name="Firecrawl Test Agent", + description="An agent that uses Firecrawl to scrape websites", + nodes=[input_node, firecrawl_node, output_node], + links=[link1, link2], + ) + + created_graph = await create_graph(graph, user.id) + + # 3. Create and approve a store listing + unique_slug = f"firecrawl-test-agent-{str(uuid.uuid4())[:8]}" + store_submission = await store_db.create_store_submission( + user_id=user.id, + agent_id=created_graph.id, + agent_version=created_graph.version, + slug=unique_slug, + name="Firecrawl Test Agent", + description="An agent with Firecrawl integration (no credentials)", + sub_heading="Test agent requiring Firecrawl credentials", + categories=["testing", "scraping"], + image_urls=["https://example.com/image.jpg"], + ) + assert store_submission.store_listing_version_id is not None + await store_db.review_store_submission( + store_listing_version_id=store_submission.store_listing_version_id, + is_approved=True, + external_comments="Approved for testing", + internal_comments="Test approval for Firecrawl agent", + reviewer_id=user.id, + ) + + return { + "user": user, + "graph": created_graph, + "store_submission": store_submission, + } diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/base.py b/autogpt_platform/backend/backend/server/v2/chat/tools/base.py new file mode 100644 index 0000000000..0648ef0cba --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/base.py @@ -0,0 +1,118 @@ +"""Base classes and shared utilities for chat tools.""" + +import logging +from typing import Any + +from openai.types.chat import ChatCompletionToolParam + +from backend.server.v2.chat.response_model import StreamToolExecutionResult + +from .models import ErrorResponse, NeedLoginResponse, ToolResponseBase + +logger = logging.getLogger(__name__) + + +class BaseTool: + """Base class for all chat tools.""" + + @property + def name(self) -> str: + """Tool name for OpenAI function calling.""" + raise NotImplementedError + + @property + def description(self) -> str: + """Tool description for OpenAI.""" + raise NotImplementedError + + @property + def parameters(self) -> dict[str, Any]: + """Tool parameters schema for OpenAI.""" + raise NotImplementedError + + @property + def requires_auth(self) -> bool: + """Whether this tool requires authentication.""" + return False + + def as_openai_tool(self) -> ChatCompletionToolParam: + """Convert to OpenAI tool format.""" + return ChatCompletionToolParam( + type="function", + function={ + "name": self.name, + "description": self.description, + "parameters": self.parameters, + }, + ) + + async def execute( + self, + user_id: str | None, + session_id: str, + tool_call_id: str, + **kwargs, + ) -> StreamToolExecutionResult: + """Execute the tool with authentication check. + + Args: + user_id: User ID (may be anonymous like "anon_123") + session_id: Chat session ID + **kwargs: Tool-specific parameters + + Returns: + Pydantic response object + + """ + if self.requires_auth and not user_id: + logger.error( + f"Attempted tool call for {self.name} but user not authenticated" + ) + return StreamToolExecutionResult( + tool_id=tool_call_id, + tool_name=self.name, + result=NeedLoginResponse( + message=f"Please sign in to use {self.name}", + session_id=session_id, + ).model_dump_json(), + success=False, + ) + + try: + result = await self._execute(user_id, session_id, **kwargs) + return StreamToolExecutionResult( + tool_id=tool_call_id, + tool_name=self.name, + result=result.model_dump_json(), + ) + except Exception as e: + logger.error(f"Error in {self.name}: {e}", exc_info=True) + return StreamToolExecutionResult( + tool_id=tool_call_id, + tool_name=self.name, + result=ErrorResponse( + message=f"An error occurred while executing {self.name}", + error=str(e), + session_id=session_id, + ).model_dump_json(), + success=False, + ) + + async def _execute( + self, + user_id: str | None, + session_id: str, + **kwargs, + ) -> ToolResponseBase: + """Internal execution logic to be implemented by subclasses. + + Args: + user_id: User ID (authenticated or anonymous) + session_id: Chat session ID + **kwargs: Tool-specific parameters + + Returns: + Pydantic response object + + """ + raise NotImplementedError diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/find_agent.py b/autogpt_platform/backend/backend/server/v2/chat/tools/find_agent.py new file mode 100644 index 0000000000..552ccc72d8 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/find_agent.py @@ -0,0 +1,149 @@ +"""Tool for discovering agents from marketplace and user library.""" + +import logging +from typing import Any + +from backend.server.v2.chat.tools.base import BaseTool +from backend.server.v2.chat.tools.models import ( + AgentCarouselResponse, + AgentInfo, + ErrorResponse, + NoResultsResponse, + ToolResponseBase, +) +from backend.server.v2.store import db as store_db +from backend.util.exceptions import DatabaseError, NotFoundError + +logger = logging.getLogger(__name__) + + +class FindAgentTool(BaseTool): + """Tool for discovering agents based on user needs.""" + + @property + def name(self) -> str: + return "find_agent" + + @property + def description(self) -> str: + return ( + "Discover agents from the marketplace based on capabilities and user needs." + ) + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query describing what the user wants to accomplish. Use single keywords for best results.", + }, + }, + "required": ["query"], + } + + async def _execute( + self, + user_id: str | None, + session_id: str, + **kwargs, + ) -> ToolResponseBase: + """Search for agents in the marketplace. + + Args: + user_id: User ID (may be anonymous) + session_id: Chat session ID + query: Search query + + Returns: + AgentCarouselResponse: List of agents found in the marketplace + NoResultsResponse: No agents found in the marketplace + ErrorResponse: Error message + """ + query = kwargs.get("query", "").strip() + + if not query: + return ErrorResponse( + message="Please provide a search query", + session_id=session_id, + ) + agents = [] + try: + logger.info(f"Searching marketplace for: {query}") + store_results = await store_db.get_store_agents( + search_query=query, + page_size=5, + ) + + logger.info(f"Find agents tool found {len(store_results.agents)} agents") + for agent in store_results.agents: + agent_id = f"{agent.creator}/{agent.slug}" + logger.info(f"Building agent ID = {agent_id}") + agents.append( + AgentInfo( + id=agent_id, + name=agent.agent_name, + description=agent.description or "", + source="marketplace", + in_library=False, + creator=agent.creator, + category="general", + rating=agent.rating, + runs=agent.runs, + is_featured=False, + ), + ) + except NotFoundError: + pass + except DatabaseError as e: + logger.error(f"Error searching agents: {e}", exc_info=True) + return ErrorResponse( + message="Failed to search for agents. Please try again.", + error=str(e), + session_id=session_id, + ) + if not agents: + return NoResultsResponse( + message=f"No agents found matching '{query}'. Try different keywords or browse the marketplace. If you have 3 consecutive find_agent tool calls results and found no agents. Please stop trying and ask the user if there is anything else you can help with.", + session_id=session_id, + suggestions=[ + "Try more general terms", + "Browse categories in the marketplace", + "Check spelling", + ], + ) + + # Return formatted carousel + title = ( + f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} for '{query}'" + ) + return AgentCarouselResponse( + message="Now you have found some options for the user to choose from. Please ask the user if they would like to use any of these agents. If they do, please call the get_agent_details tool for this agent.", + title=title, + agents=agents, + count=len(agents), + session_id=session_id, + ) + + +if __name__ == "__main__": + import asyncio + + import prisma + + find_agent_tool = FindAgentTool() + print(find_agent_tool.as_openai_tool()) + + async def main(): + await prisma.Prisma().connect() + agents = await find_agent_tool.execute( + tool_call_id="tool_call_id", + query="Linkedin", + user_id="user", + session_id="session", + ) + print(agents) + await prisma.Prisma().disconnect() + + asyncio.run(main()) diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details.py b/autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details.py new file mode 100644 index 0000000000..df928f1658 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details.py @@ -0,0 +1,220 @@ +"""Tool for getting detailed information about a specific agent.""" + +import logging +from typing import Any + +from backend.data import graph as graph_db +from backend.data.model import CredentialsMetaInput +from backend.server.v2.chat.tools.base import BaseTool +from backend.server.v2.chat.tools.models import ( + AgentDetails, + AgentDetailsResponse, + ErrorResponse, + ExecutionOptions, + ToolResponseBase, +) +from backend.server.v2.store import db as store_db +from backend.util.exceptions import DatabaseError, NotFoundError + +logger = logging.getLogger(__name__) + + +class GetAgentDetailsTool(BaseTool): + """Tool for getting detailed information about an agent.""" + + @property + def name(self) -> str: + return "get_agent_details" + + @property + def description(self) -> str: + return "Get detailed information about a specific agent including inputs, credentials required, and execution options." + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "username_agent_slug": { + "type": "string", + "description": "The marketplace agent slug (e.g., 'username/agent-name')", + }, + }, + "required": ["username_agent_slug"], + } + + async def _execute( + self, + user_id: str | None, + session_id: str, + **kwargs, + ) -> ToolResponseBase: + """Get detailed information about an agent. + + Args: + user_id: User ID (may be anonymous) + session_id: Chat session ID + username_agent_slug: Agent ID or slug + + Returns: + Pydantic response model + + """ + agent_id = kwargs.get("username_agent_slug", "").strip() + + if not agent_id or "/" not in agent_id: + return ErrorResponse( + message="Please provide an agent ID in format 'creator/agent-name'", + session_id=session_id, + ) + + try: + # Always try to get from marketplace first + graph = None + store_agent = None + + # Check if it's a slug format (username/agent_name) + try: + # Parse username/agent_name from slug + username, agent_name = agent_id.split("/", 1) + store_agent = await store_db.get_store_agent_details( + username, agent_name + ) + logger.info(f"Found agent {agent_id} in marketplace") + except NotFoundError as e: + logger.debug(f"Failed to get from marketplace: {e}") + return ErrorResponse( + message=f"Agent '{agent_id}' not found", + session_id=session_id, + ) + except DatabaseError as e: + logger.error(f"Failed to get from marketplace: {e}") + return ErrorResponse( + message=f"Failed to get agent details: {e!s}", + session_id=session_id, + ) + + # If we found a store agent, get its graph + if store_agent: + try: + # Use get_available_graph to get the graph from store listing version + graph_meta = await store_db.get_available_graph( + store_agent.store_listing_version_id + ) + # Now get the full graph with that ID + graph = await graph_db.get_graph( + graph_id=graph_meta.id, + version=graph_meta.version, + user_id=None, # Public access + include_subgraphs=True, + ) + + except NotFoundError as e: + logger.error(f"Failed to get graph for store agent: {e}") + return ErrorResponse( + message=f"Failed to get graph for store agent: {e!s}", + session_id=session_id, + ) + except DatabaseError as e: + logger.error(f"Failed to get graph for store agent: {e}") + return ErrorResponse( + message=f"Failed to get graph for store agent: {e!s}", + session_id=session_id, + ) + + if not graph: + return ErrorResponse( + message=f"Agent '{agent_id}' not found", + session_id=session_id, + ) + + credentials_input_schema = graph.credentials_input_schema + + # Extract credentials from the JSON schema properties + credentials = [] + if ( + isinstance(credentials_input_schema, dict) + and "properties" in credentials_input_schema + ): + for cred_name, cred_schema in credentials_input_schema[ + "properties" + ].items(): + # Extract credential metadata from the schema + # The schema properties contain provider info and other metadata + + # Get provider from credentials_provider array or properties.provider.const + provider = "unknown" + if ( + "credentials_provider" in cred_schema + and cred_schema["credentials_provider"] + ): + provider = cred_schema["credentials_provider"][0] + elif ( + "properties" in cred_schema + and "provider" in cred_schema["properties"] + ): + provider = cred_schema["properties"]["provider"].get( + "const", "unknown" + ) + + # Get type from credentials_types array or properties.type.const + cred_type = "api_key" # Default + if ( + "credentials_types" in cred_schema + and cred_schema["credentials_types"] + ): + cred_type = cred_schema["credentials_types"][0] + elif ( + "properties" in cred_schema + and "type" in cred_schema["properties"] + ): + cred_type = cred_schema["properties"]["type"].get( + "const", "api_key" + ) + + credentials.append( + CredentialsMetaInput( + id=cred_name, + title=cred_schema.get("title", cred_name), + provider=provider, # type: ignore + type=cred_type, + ) + ) + + trigger_info = ( + graph.trigger_setup_info.model_dump() + if graph.trigger_setup_info + else None + ) + + agent_details = AgentDetails( + id=graph.id, + name=graph.name, + description=graph.description, + inputs=graph.input_schema, + credentials=credentials, + execution_options=ExecutionOptions( + # Currently a graph with a webhook can only be triggered by a webhook + manual=trigger_info is None, + scheduled=trigger_info is None, + webhook=trigger_info is not None, + ), + trigger_info=trigger_info, + ) + + return AgentDetailsResponse( + message=f"Found agent '{agent_details.name}'. You do not need to run this tool again for this agent.", + session_id=session_id, + agent=agent_details, + user_authenticated=user_id is not None, + graph_id=graph.id, + graph_version=graph.version, + ) + + except Exception as e: + logger.error(f"Error getting agent details: {e}", exc_info=True) + return ErrorResponse( + message=f"Failed to get agent details: {e!s}", + error=str(e), + session_id=session_id, + ) diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details_test.py b/autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details_test.py new file mode 100644 index 0000000000..2c77b81659 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/get_agent_details_test.py @@ -0,0 +1,309 @@ +import uuid + +import orjson +import pytest + +from backend.server.v2.chat.tools._test_data import setup_llm_test_data, setup_test_data +from backend.server.v2.chat.tools.get_agent_details import GetAgentDetailsTool + +# This is so the formatter doesn't remove the fixture imports +setup_llm_test_data = setup_llm_test_data +setup_test_data = setup_test_data + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_success(setup_test_data): + """Test successfully getting agent details from marketplace""" + # Use test data from fixture + user = setup_test_data["user"] + graph = setup_test_data["graph"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetAgentDetailsTool() + + # Build the proper marketplace agent_id format: username/slug + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check the basic structure + assert "agent" in result_data + assert "message" in result_data + assert "graph_id" in result_data + assert "graph_version" in result_data + assert "user_authenticated" in result_data + + # Check agent details + agent = result_data["agent"] + assert agent["id"] == graph.id + assert agent["name"] == "Test Agent" + assert ( + agent["description"] == "A simple test agent" + ) # Description from store submission + assert "inputs" in agent + assert "credentials" in agent + assert "execution_options" in agent + + # Check execution options + exec_options = agent["execution_options"] + assert "manual" in exec_options + assert "scheduled" in exec_options + assert "webhook" in exec_options + + # Check inputs schema + assert isinstance(agent["inputs"], dict) + # Should have properties for the input fields + if "properties" in agent["inputs"]: + assert "test_input" in agent["inputs"]["properties"] + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_with_llm_credentials(setup_llm_test_data): + """Test getting agent details for an agent that requires LLM credentials""" + # Use test data from fixture + user = setup_llm_test_data["user"] + store_submission = setup_llm_test_data["store_submission"] + + # Create the tool instance + tool = GetAgentDetailsTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check that agent details are returned + assert "agent" in result_data + agent = result_data["agent"] + + # Check that credentials are listed + assert "credentials" in agent + credentials = agent["credentials"] + + # The LLM agent should have OpenAI credentials listed + # Note: This depends on how the graph's credentials_input_schema is structured + assert isinstance(credentials, list) + + # Check that inputs include the user_prompt + assert "inputs" in agent + if "properties" in agent["inputs"]: + assert "user_prompt" in agent["inputs"]["properties"] + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_invalid_format(): + """Test error handling when agent_id is not in correct format""" + tool = GetAgentDetailsTool() + + # Execute with invalid format (no slash) + response = await tool.execute( + user_id=str(uuid.uuid4()), + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug="invalid-format", + ) + + # Verify error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + assert "creator/agent-name" in result_data["message"] + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_empty_slug(): + """Test error handling when agent_id is empty""" + tool = GetAgentDetailsTool() + + # Execute with empty slug + response = await tool.execute( + user_id=str(uuid.uuid4()), + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug="", + ) + + # Verify error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + assert "creator/agent-name" in result_data["message"] + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_not_found(): + """Test error handling when agent is not found in marketplace""" + tool = GetAgentDetailsTool() + + # Execute with non-existent agent + response = await tool.execute( + user_id=str(uuid.uuid4()), + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug="nonexistent/agent", + ) + + # Verify error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + assert "not found" in result_data["message"].lower() + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_anonymous_user(setup_test_data): + """Test getting agent details as an anonymous user (no user_id)""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetAgentDetailsTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool without a user_id (anonymous) + response = await tool.execute( + user_id=None, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Should still get agent details + assert "agent" in result_data + assert "user_authenticated" in result_data + + # User should be marked as not authenticated + assert result_data["user_authenticated"] is False + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_authenticated_user(setup_test_data): + """Test getting agent details as an authenticated user""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetAgentDetailsTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool with a user_id (authenticated) + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Should get agent details + assert "agent" in result_data + assert "user_authenticated" in result_data + + # User should be marked as authenticated + assert result_data["user_authenticated"] is True + + +@pytest.mark.asyncio(scope="session") +async def test_get_agent_details_includes_execution_options(setup_test_data): + """Test that agent details include execution options""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetAgentDetailsTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check execution options + assert "agent" in result_data + agent = result_data["agent"] + assert "execution_options" in agent + + exec_options = agent["execution_options"] + + # These should all be boolean values + assert isinstance(exec_options["manual"], bool) + assert isinstance(exec_options["scheduled"], bool) + assert isinstance(exec_options["webhook"], bool) + + # For a regular agent (no webhook), manual and scheduled should be True + assert exec_options["manual"] is True + assert exec_options["scheduled"] is True + assert exec_options["webhook"] is False diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info.py b/autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info.py new file mode 100644 index 0000000000..e4b40d00e8 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info.py @@ -0,0 +1,183 @@ +"""Tool for getting required setup information for an agent.""" + +import logging +from typing import Any + +from backend.integrations.creds_manager import IntegrationCredentialsManager +from backend.server.v2.chat.tools.base import BaseTool +from backend.server.v2.chat.tools.get_agent_details import GetAgentDetailsTool +from backend.server.v2.chat.tools.models import ( + AgentDetailsResponse, + ErrorResponse, + SetupInfo, + SetupRequirementsResponse, + ToolResponseBase, + UserReadiness, +) + +logger = logging.getLogger(__name__) + + +class GetRequiredSetupInfoTool(BaseTool): + """Tool for getting required setup information including credentials and inputs.""" + + @property + def name(self) -> str: + return "get_required_setup_info" + + @property + def description(self) -> str: + return """Check if an agent can be set up with the provided input data and credentials. + Call this AFTER get_agent_details to validate that you have all required inputs. + Pass the input dictionary you plan to use with run_agent or setup_agent to verify it's complete.""" + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "username_agent_slug": { + "type": "string", + "description": "The marketplace agent slug (e.g., 'username/agent-name' or just 'agent-name' to search)", + }, + "inputs": { + "type": "object", + "description": "The input dictionary you plan to provide. Should contain ALL required inputs from get_agent_details", + "additionalProperties": True, + }, + }, + "required": ["username_agent_slug"], + } + + @property + def requires_auth(self) -> bool: + """This tool requires authentication.""" + return True + + async def _execute( + self, + user_id: str | None, + session_id: str, + **kwargs, + ) -> ToolResponseBase: + """ + Retrieve and validate the required setup information for running or configuring an agent. + + This checks all required credentials and input fields based on the agent details, + and verifies user readiness to run the agent based on provided inputs and available credentials. + + Args: + user_id: The authenticated user's ID (must not be None; authentication required). + session_id: The chat session ID. + agent_id: The agent's marketplace slug (e.g. 'username/agent-name'). Also accepts Graph ID. + agent_version: (Optional) Specific agent/graph version (if applicable). + + Returns: + SetupRequirementsResponse containing: + - agent and graph info, + - credential and input requirements, + - user readiness and missing credentials/fields, + - setup instructions. + """ + assert ( + user_id is not None + ), "GetRequiredSetupInfoTool - This should never happen user_id is None when auth is required" + + # Call _execute directly since we're calling internally from another tool + agent_details = await GetAgentDetailsTool()._execute( + user_id, session_id, **kwargs + ) + + if isinstance(agent_details, ErrorResponse): + return agent_details + + if not isinstance(agent_details, AgentDetailsResponse): + return ErrorResponse( + message="Failed to get agent details", + session_id=session_id, + ) + + available_creds = await IntegrationCredentialsManager().store.get_all_creds( + user_id + ) + required_credentials = [] + + # Check if user has credentials matching the required provider/type + for c in agent_details.agent.credentials: + # Check if any available credential matches this provider and type + has_matching_cred = any( + cred.provider == c.provider and cred.type == c.type + for cred in available_creds + ) + if not has_matching_cred: + required_credentials.append(c) + + required_fields = set(agent_details.agent.inputs.get("required", [])) + provided_inputs = kwargs.get("inputs", {}) + missing_inputs = required_fields - set(provided_inputs.keys()) + + missing_credentials = {c.id: c.model_dump() for c in required_credentials} + + user_readiness = UserReadiness( + has_all_credentials=len(required_credentials) == 0, + missing_credentials=missing_credentials, + ready_to_run=len(missing_inputs) == 0 and len(required_credentials) == 0, + ) + # Convert execution options to list of available modes + exec_opts = agent_details.agent.execution_options + execution_modes = [] + if exec_opts.manual: + execution_modes.append("manual") + if exec_opts.scheduled: + execution_modes.append("scheduled") + if exec_opts.webhook: + execution_modes.append("webhook") + + # Convert input schema to list of input field info + inputs_list = [] + if ( + isinstance(agent_details.agent.inputs, dict) + and "properties" in agent_details.agent.inputs + ): + for field_name, field_schema in agent_details.agent.inputs[ + "properties" + ].items(): + inputs_list.append( + { + "name": field_name, + "title": field_schema.get("title", field_name), + "type": field_schema.get("type", "string"), + "description": field_schema.get("description", ""), + "required": field_name + in agent_details.agent.inputs.get("required", []), + } + ) + + requirements = { + "credentials": agent_details.agent.credentials, + "inputs": inputs_list, + "execution_modes": execution_modes, + } + message = "" + if len(agent_details.agent.credentials) > 0: + message = "The user needs to enter credentials before proceeding. Please wait until you have a message informing you that the credentials have been entered." + elif len(inputs_list) > 0: + message = ( + "The user needs to enter inputs before proceeding. Please wait until you have a message informing you that the inputs have been entered. The inputs are: " + + ", ".join([input["name"] for input in inputs_list]) + ) + else: + message = "The agent is ready to run. Please call the run_agent tool with the agent ID." + + return SetupRequirementsResponse( + message=message, + session_id=session_id, + setup_info=SetupInfo( + agent_id=agent_details.agent.id, + agent_name=agent_details.agent.name, + user_readiness=user_readiness, + requirements=requirements, + ), + graph_id=agent_details.graph_id, + graph_version=agent_details.graph_version, + ) diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info_test.py b/autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info_test.py new file mode 100644 index 0000000000..382b39753a --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/get_required_setup_info_test.py @@ -0,0 +1,394 @@ +import uuid + +import orjson +import pytest + +from backend.server.v2.chat.tools._test_data import ( + setup_firecrawl_test_data, + setup_llm_test_data, + setup_test_data, +) +from backend.server.v2.chat.tools.get_required_setup_info import ( + GetRequiredSetupInfoTool, +) + +# This is so the formatter doesn't remove the fixture imports +setup_llm_test_data = setup_llm_test_data +setup_test_data = setup_test_data +setup_firecrawl_test_data = setup_firecrawl_test_data + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_success(setup_test_data): + """Test successfully getting setup info for a simple agent""" + # Use test data from fixture + user = setup_test_data["user"] + graph = setup_test_data["graph"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Build the proper marketplace agent_id format: username/slug + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={"test_input": "Hello World"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check the basic structure + assert "setup_info" in result_data + setup_info = result_data["setup_info"] + + # Check agent info + assert "agent_id" in setup_info + assert setup_info["agent_id"] == graph.id + assert "agent_name" in setup_info + assert setup_info["agent_name"] == "Test Agent" + + # Check requirements + assert "requirements" in setup_info + requirements = setup_info["requirements"] + assert "credentials" in requirements + assert "inputs" in requirements + assert "execution_modes" in requirements + + # Simple agent should have no credentials required + assert isinstance(requirements["credentials"], list) + assert len(requirements["credentials"]) == 0 + + # Check inputs format + assert isinstance(requirements["inputs"], list) + if len(requirements["inputs"]) > 0: + first_input = requirements["inputs"][0] + assert "name" in first_input + assert "title" in first_input + assert "type" in first_input + + # Check execution modes + assert isinstance(requirements["execution_modes"], list) + assert "manual" in requirements["execution_modes"] + assert "scheduled" in requirements["execution_modes"] + + # Check user readiness + assert "user_readiness" in setup_info + user_readiness = setup_info["user_readiness"] + assert "has_all_credentials" in user_readiness + assert "ready_to_run" in user_readiness + # Simple agent with inputs provided should be ready + assert user_readiness["ready_to_run"] is True + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_missing_credentials(setup_firecrawl_test_data): + """Test getting setup info for an agent requiring missing credentials""" + # Use test data from fixture + user = setup_firecrawl_test_data["user"] + store_submission = setup_firecrawl_test_data["store_submission"] + + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={"url": "https://example.com"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check setup info + assert "setup_info" in result_data + setup_info = result_data["setup_info"] + + # Check requirements + requirements = setup_info["requirements"] + + # Should have Firecrawl credentials required + assert "credentials" in requirements + assert isinstance(requirements["credentials"], list) + assert len(requirements["credentials"]) > 0 + + # Check the credential requirement + firecrawl_cred = requirements["credentials"][0] + assert "provider" in firecrawl_cred + assert firecrawl_cred["provider"] == "firecrawl" + assert "type" in firecrawl_cred + assert firecrawl_cred["type"] == "api_key" + + # Check user readiness - should NOT be ready since credentials are missing + user_readiness = setup_info["user_readiness"] + assert user_readiness["has_all_credentials"] is False + assert user_readiness["ready_to_run"] is False + + # Check missing credentials + assert "missing_credentials" in user_readiness + assert isinstance(user_readiness["missing_credentials"], dict) + assert len(user_readiness["missing_credentials"]) > 0 + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_with_available_credentials(setup_llm_test_data): + """Test getting setup info when user has required credentials""" + # Use test data from fixture (includes OpenAI credentials) + user = setup_llm_test_data["user"] + store_submission = setup_llm_test_data["store_submission"] + + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={"user_prompt": "What is 2+2?"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check setup info + setup_info = result_data["setup_info"] + + # Check user readiness - should be ready since credentials are available + user_readiness = setup_info["user_readiness"] + assert user_readiness["has_all_credentials"] is True + assert user_readiness["ready_to_run"] is True + + # Missing credentials should be empty + assert "missing_credentials" in user_readiness + assert len(user_readiness["missing_credentials"]) == 0 + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_missing_inputs(setup_test_data): + """Test getting setup info when required inputs are not provided""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool WITHOUT providing inputs + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={}, # Empty inputs + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check setup info + setup_info = result_data["setup_info"] + + # Check requirements + requirements = setup_info["requirements"] + assert "inputs" in requirements + assert isinstance(requirements["inputs"], list) + + # User readiness depends on whether inputs are required or optional + user_readiness = setup_info["user_readiness"] + assert "ready_to_run" in user_readiness + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_invalid_agent(): + """Test getting setup info for a non-existent agent""" + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Execute with invalid agent ID + response = await tool.execute( + user_id=str(uuid.uuid4()), + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug="invalid/agent", + inputs={}, + ) + + # Verify error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + # Should indicate failure or not found + assert any( + phrase in result_data["message"].lower() + for phrase in ["not found", "failed", "error"] + ) + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_graph_metadata(setup_test_data): + """Test that setup info includes graph metadata""" + # Use test data from fixture + user = setup_test_data["user"] + graph = setup_test_data["graph"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={"test_input": "test"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check that graph_id and graph_version are included + assert "graph_id" in result_data + assert result_data["graph_id"] == graph.id + assert "graph_version" in result_data + assert result_data["graph_version"] == graph.version + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_inputs_structure(setup_test_data): + """Test that inputs are properly structured as a list""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check inputs structure + setup_info = result_data["setup_info"] + requirements = setup_info["requirements"] + + # Inputs should be a list + assert isinstance(requirements["inputs"], list) + + # Each input should have proper structure + for input_field in requirements["inputs"]: + assert isinstance(input_field, dict) + assert "name" in input_field + assert "title" in input_field + assert "type" in input_field + assert "description" in input_field + assert "required" in input_field + assert isinstance(input_field["required"], bool) + + +@pytest.mark.asyncio(scope="session") +async def test_get_required_setup_info_execution_modes_structure(setup_test_data): + """Test that execution_modes are properly structured as a list""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = GetRequiredSetupInfoTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check execution modes structure + setup_info = result_data["setup_info"] + requirements = setup_info["requirements"] + + # execution_modes should be a list of strings + assert isinstance(requirements["execution_modes"], list) + for mode in requirements["execution_modes"]: + assert isinstance(mode, str) + assert mode in ["manual", "scheduled", "webhook"] diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/models.py b/autogpt_platform/backend/backend/server/v2/chat/tools/models.py new file mode 100644 index 0000000000..e8c2ab0ff1 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/models.py @@ -0,0 +1,279 @@ +"""Pydantic models for tool responses.""" + +from enum import Enum +from typing import Any + +from pydantic import BaseModel, Field + +from backend.data.model import CredentialsMetaInput + + +class ResponseType(str, Enum): + """Types of tool responses.""" + + AGENT_CAROUSEL = "agent_carousel" + AGENT_DETAILS = "agent_details" + AGENT_DETAILS_NEED_LOGIN = "agent_details_need_login" + AGENT_DETAILS_NEED_CREDENTIALS = "agent_details_need_credentials" + SETUP_REQUIREMENTS = "setup_requirements" + SCHEDULE_CREATED = "schedule_created" + WEBHOOK_CREATED = "webhook_created" + PRESET_CREATED = "preset_created" + EXECUTION_STARTED = "execution_started" + NEED_LOGIN = "need_login" + NEED_CREDENTIALS = "need_credentials" + INSUFFICIENT_CREDITS = "insufficient_credits" + VALIDATION_ERROR = "validation_error" + ERROR = "error" + NO_RESULTS = "no_results" + SUCCESS = "success" + + +# Base response model +class ToolResponseBase(BaseModel): + """Base model for all tool responses.""" + + type: ResponseType + message: str + session_id: str | None = None + + +# Agent discovery models +class AgentInfo(BaseModel): + """Information about an agent.""" + + id: str + name: str + description: str + source: str = Field(description="marketplace or library") + in_library: bool = False + creator: str | None = None + category: str | None = None + rating: float | None = None + runs: int | None = None + is_featured: bool | None = None + status: str | None = None + can_access_graph: bool | None = None + has_external_trigger: bool | None = None + new_output: bool | None = None + graph_id: str | None = None + + +class AgentCarouselResponse(ToolResponseBase): + """Response for find_agent tool.""" + + type: ResponseType = ResponseType.AGENT_CAROUSEL + title: str = "Available Agents" + agents: list[AgentInfo] + count: int + + +class NoResultsResponse(ToolResponseBase): + """Response when no agents found.""" + + type: ResponseType = ResponseType.NO_RESULTS + suggestions: list[str] = [] + + +# Agent details models +class InputField(BaseModel): + """Input field specification.""" + + name: str + type: str = "string" + description: str = "" + required: bool = False + default: Any | None = None + options: list[Any] | None = None + format: str | None = None + + +class ExecutionOptions(BaseModel): + """Available execution options for an agent.""" + + manual: bool = True + scheduled: bool = True + webhook: bool = False + + +class AgentDetails(BaseModel): + """Detailed agent information.""" + + id: str + name: str + description: str + in_library: bool = False + inputs: dict[str, Any] = {} + credentials: list[CredentialsMetaInput] = [] + execution_options: ExecutionOptions = Field(default_factory=ExecutionOptions) + trigger_info: dict[str, Any] | None = None + + +class AgentDetailsResponse(ToolResponseBase): + """Response for get_agent_details tool.""" + + type: ResponseType = ResponseType.AGENT_DETAILS + agent: AgentDetails + user_authenticated: bool = False + graph_id: str | None = None + graph_version: int | None = None + + +class AgentDetailsNeedLoginResponse(ToolResponseBase): + """Response when agent details need login.""" + + type: ResponseType = ResponseType.AGENT_DETAILS_NEED_LOGIN + agent: AgentDetails + agent_info: dict[str, Any] | None = None + graph_id: str | None = None + graph_version: int | None = None + + +class AgentDetailsNeedCredentialsResponse(ToolResponseBase): + """Response when agent needs credentials to be configured.""" + + type: ResponseType = ResponseType.NEED_CREDENTIALS + agent: AgentDetails + credentials_schema: dict[str, Any] + agent_info: dict[str, Any] | None = None + graph_id: str | None = None + graph_version: int | None = None + + +# Setup info models +class SetupRequirementInfo(BaseModel): + """Setup requirement information.""" + + key: str + provider: str + required: bool = True + user_has: bool = False + credential_id: str | None = None + type: str | None = None + scopes: list[str] | None = None + description: str | None = None + + +class ExecutionModeInfo(BaseModel): + """Execution mode information.""" + + type: str # manual, scheduled, webhook + description: str + supported: bool + config_required: dict[str, str] | None = None + trigger_info: dict[str, Any] | None = None + + +class UserReadiness(BaseModel): + """User readiness status.""" + + has_all_credentials: bool = False + missing_credentials: dict[str, Any] = {} + ready_to_run: bool = False + + +class SetupInfo(BaseModel): + """Complete setup information.""" + + agent_id: str + agent_name: str + requirements: dict[str, list[Any]] = Field( + default_factory=lambda: { + "credentials": [], + "inputs": [], + "execution_modes": [], + }, + ) + user_readiness: UserReadiness = Field(default_factory=UserReadiness) + setup_instructions: list[str] = [] + + +class SetupRequirementsResponse(ToolResponseBase): + """Response for get_required_setup_info tool.""" + + type: ResponseType = ResponseType.SETUP_REQUIREMENTS + setup_info: SetupInfo + graph_id: str | None = None + graph_version: int | None = None + + +# Setup agent models +class ScheduleCreatedResponse(ToolResponseBase): + """Response for scheduled agent setup.""" + + type: ResponseType = ResponseType.SCHEDULE_CREATED + schedule_id: str + name: str + cron: str + timezone: str = "UTC" + next_run: str | None = None + graph_id: str + graph_name: str + + +class WebhookCreatedResponse(ToolResponseBase): + """Response for webhook agent setup.""" + + type: ResponseType = ResponseType.WEBHOOK_CREATED + webhook_id: str + webhook_url: str + preset_id: str | None = None + name: str + graph_id: str + graph_name: str + + +class PresetCreatedResponse(ToolResponseBase): + """Response for preset agent setup.""" + + type: ResponseType = ResponseType.PRESET_CREATED + preset_id: str + name: str + graph_id: str + graph_name: str + + +# Run agent models +class ExecutionStartedResponse(ToolResponseBase): + """Response for agent execution started.""" + + type: ResponseType = ResponseType.EXECUTION_STARTED + execution_id: str + graph_id: str + graph_name: str + status: str = "QUEUED" + ended_at: str | None = None + outputs: dict[str, Any] | None = None + error: str | None = None + timeout_reached: bool | None = None + + +class InsufficientCreditsResponse(ToolResponseBase): + """Response for insufficient credits.""" + + type: ResponseType = ResponseType.INSUFFICIENT_CREDITS + balance: float + + +class ValidationErrorResponse(ToolResponseBase): + """Response for validation errors.""" + + type: ResponseType = ResponseType.VALIDATION_ERROR + error: str + details: dict[str, Any] | None = None + + +# Auth/error models +class NeedLoginResponse(ToolResponseBase): + """Response when login is needed.""" + + type: ResponseType = ResponseType.NEED_LOGIN + agent_info: dict[str, Any] | None = None + + +class ErrorResponse(ToolResponseBase): + """Response for errors.""" + + type: ResponseType = ResponseType.ERROR + error: str | None = None + details: dict[str, Any] | None = None diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/run_agent.py b/autogpt_platform/backend/backend/server/v2/chat/tools/run_agent.py new file mode 100644 index 0000000000..1ea177cc31 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/run_agent.py @@ -0,0 +1,241 @@ +"""Tool for running an agent manually (one-off execution).""" + +import logging +from typing import Any + +from backend.data.graph import get_graph +from backend.data.model import CredentialsMetaInput +from backend.executor import utils as execution_utils +from backend.integrations.creds_manager import IntegrationCredentialsManager +from backend.server.v2.chat.tools.base import BaseTool +from backend.server.v2.chat.tools.get_required_setup_info import ( + GetRequiredSetupInfoTool, +) +from backend.server.v2.chat.tools.models import ( + ErrorResponse, + ExecutionStartedResponse, + SetupInfo, + SetupRequirementsResponse, + ToolResponseBase, +) +from backend.server.v2.library import db as library_db +from backend.server.v2.library import model as library_model + +logger = logging.getLogger(__name__) + + +class RunAgentTool(BaseTool): + """Tool for executing an agent manually with immediate results.""" + + @property + def name(self) -> str: + return "run_agent" + + @property + def description(self) -> str: + return """Run an agent immediately (one-off manual execution). + IMPORTANT: Before calling this tool, you MUST first call get_agent_details to determine what inputs are required. + The 'inputs' parameter must be a dictionary containing ALL required input values identified by get_agent_details. + Example: If get_agent_details shows required inputs 'search_query' and 'max_results', you must pass: + inputs={"search_query": "user's query", "max_results": 10}""" + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "username_agent_slug": { + "type": "string", + "description": "The ID of the agent to run (graph ID or marketplace slug)", + }, + "inputs": { + "type": "object", + "description": 'REQUIRED: Dictionary of input values. Must include ALL required inputs from get_agent_details. Format: {"input_name": value}', + "additionalProperties": True, + }, + }, + "required": ["username_agent_slug"], + } + + @property + def requires_auth(self) -> bool: + """This tool requires authentication.""" + return True + + async def _execute( + self, + user_id: str | None, + session_id: str, + **kwargs, + ) -> ToolResponseBase: + """Execute an agent manually. + + Args: + user_id: Authenticated user ID + session_id: Chat session ID + **kwargs: Execution parameters + + Returns: + JSON formatted execution result + + """ + + assert ( + user_id is not None + ), "User ID is required to run an agent. Superclass enforces authentication." + + username_agent_slug = kwargs.get("username_agent_slug", "").strip() + inputs = kwargs.get("inputs", {}) + + # Call _execute directly since we're calling internally from another tool + response = await GetRequiredSetupInfoTool()._execute( + user_id, session_id, **kwargs + ) + + if not isinstance(response, SetupRequirementsResponse): + return ErrorResponse( + message="Failed to get required setup information", + session_id=session_id, + ) + + setup_info = SetupInfo.model_validate(response.setup_info) + + if not setup_info.user_readiness.ready_to_run: + return ErrorResponse( + message=f"User is not ready to run the agent. User Readiness: {setup_info.user_readiness.model_dump_json()} Requirments: {setup_info.requirements}", + session_id=session_id, + ) + + # Get the graph using the graph_id and graph_version from the setup response + if not response.graph_id or not response.graph_version: + return ErrorResponse( + message=f"Graph information not available for {username_agent_slug}", + session_id=session_id, + ) + + graph = await get_graph( + graph_id=response.graph_id, + version=response.graph_version, + user_id=None, # Public access for store graphs + include_subgraphs=True, + ) + + if not graph: + return ErrorResponse( + message=f"Graph {username_agent_slug} ({response.graph_id}v{response.graph_version}) not found", + session_id=session_id, + ) + + # Check if we already have a library agent for this graph + existing_library_agent = await library_db.get_library_agent_by_graph_id( + graph_id=graph.id, user_id=user_id + ) + if not existing_library_agent: + # Now we need to add the graph to the users library + library_agents: list[library_model.LibraryAgent] = ( + await library_db.create_library_agent( + graph=graph, + user_id=user_id, + create_library_agents_for_sub_graphs=False, + ) + ) + assert len(library_agents) == 1, "Expected 1 library agent to be created" + library_agent = library_agents[0] + else: + library_agent = existing_library_agent + + # Build credentials mapping for the graph + graph_credentials_inputs: dict[str, CredentialsMetaInput] = {} + + # Get aggregated credentials requirements from the graph + aggregated_creds = graph.aggregate_credentials_inputs() + logger.debug( + f"Matching credentials for graph {graph.id}: {len(aggregated_creds)} required" + ) + + if aggregated_creds: + # Get all available credentials for the user + creds_manager = IntegrationCredentialsManager() + available_creds = await creds_manager.store.get_all_creds(user_id) + + # Track unmatched credentials for error reporting + missing_creds: list[str] = [] + + # For each required credential field, find a matching user credential + # field_info.provider is a frozenset because aggregate_credentials_inputs() + # combines requirements from multiple nodes. A credential matches if its + # provider is in the set of acceptable providers. + for credential_field_name, ( + credential_requirements, + _node_fields, + ) in aggregated_creds.items(): + # Find first matching credential by provider and type + matching_cred = next( + ( + cred + for cred in available_creds + if cred.provider in credential_requirements.provider + and cred.type in credential_requirements.supported_types + ), + None, + ) + + if matching_cred: + # Use Pydantic validation to ensure type safety + try: + graph_credentials_inputs[credential_field_name] = ( + CredentialsMetaInput( + id=matching_cred.id, + provider=matching_cred.provider, # type: ignore + type=matching_cred.type, + title=matching_cred.title, + ) + ) + except Exception as e: + logger.error( + f"Failed to create CredentialsMetaInput for field '{credential_field_name}': " + f"provider={matching_cred.provider}, type={matching_cred.type}, " + f"credential_id={matching_cred.id}", + exc_info=True, + ) + missing_creds.append( + f"{credential_field_name} (validation failed: {e})" + ) + else: + missing_creds.append( + f"{credential_field_name} " + f"(requires provider in {list(credential_requirements.provider)}, " + f"type in {list(credential_requirements.supported_types)})" + ) + + # Fail fast if any required credentials are missing + if missing_creds: + logger.warning( + f"Cannot execute agent - missing credentials: {missing_creds}" + ) + return ErrorResponse( + message=f"Cannot execute agent: missing {len(missing_creds)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials." + f"Please set up the following credentials: {', '.join(missing_creds)}", + session_id=session_id, + details={"missing_credentials": missing_creds}, + ) + + logger.info( + f"Credential matching complete: {len(graph_credentials_inputs)}/{len(aggregated_creds)} matched" + ) + + # At this point we know the user is ready to run the agent + # So we can execute the agent + execution = await execution_utils.add_graph_execution( + graph_id=library_agent.graph_id, + user_id=user_id, + inputs=inputs, + graph_credentials_inputs=graph_credentials_inputs, + ) + return ExecutionStartedResponse( + message="Agent execution successfully started. Do not run this tool again unless specifically asked to run the agent again.", + session_id=session_id, + execution_id=execution.id, + graph_id=library_agent.graph_id, + graph_name=library_agent.name, + ) diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/run_agent_test.py b/autogpt_platform/backend/backend/server/v2/chat/tools/run_agent_test.py new file mode 100644 index 0000000000..68e5b0102e --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/run_agent_test.py @@ -0,0 +1,151 @@ +import uuid + +import orjson +import pytest + +from backend.server.v2.chat.tools._test_data import setup_llm_test_data, setup_test_data +from backend.server.v2.chat.tools.run_agent import RunAgentTool + +# This is so the formatter doesn't remove the fixture imports +setup_llm_test_data = setup_llm_test_data +setup_test_data = setup_test_data + + +@pytest.mark.asyncio(scope="session") +async def test_run_agent(setup_test_data): + """Test that the run_agent tool successfully executes an approved agent""" + # Use test data from fixture + user = setup_test_data["user"] + graph = setup_test_data["graph"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = RunAgentTool() + + # Build the proper marketplace agent_id format: username/slug + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={"test_input": "Hello World"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + # Parse the result JSON to verify the execution started + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "execution_id" in result_data + assert "graph_id" in result_data + assert result_data["graph_id"] == graph.id + assert "graph_name" in result_data + assert result_data["graph_name"] == "Test Agent" + + +@pytest.mark.asyncio(scope="session") +async def test_run_agent_missing_inputs(setup_test_data): + """Test that the run_agent tool returns error when inputs are missing""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = RunAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool without required inputs + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={}, # Missing required input + ) + + # Verify that we get an error response + assert response is not None + assert hasattr(response, "result") + # The tool should return an ErrorResponse when setup info indicates not ready + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + + +@pytest.mark.asyncio(scope="session") +async def test_run_agent_invalid_agent_id(setup_test_data): + """Test that the run_agent tool returns error for invalid agent ID""" + # Use test data from fixture + user = setup_test_data["user"] + + # Create the tool instance + tool = RunAgentTool() + + # Execute the tool with invalid agent ID + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug="invalid/agent-id", + inputs={"test_input": "Hello World"}, + ) + + # Verify that we get an error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + # Should get an error about failed setup or not found + assert any( + phrase in result_data["message"].lower() for phrase in ["not found", "failed"] + ) + + +@pytest.mark.asyncio(scope="session") +async def test_run_agent_with_llm_credentials(setup_llm_test_data): + """Test that run_agent works with an agent requiring LLM credentials""" + # Use test data from fixture + user = setup_llm_test_data["user"] + graph = setup_llm_test_data["graph"] + store_submission = setup_llm_test_data["store_submission"] + + # Create the tool instance + tool = RunAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute the tool with a prompt for the LLM + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + inputs={"user_prompt": "What is 2+2?"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON to verify the execution started + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Should successfully start execution since credentials are available + assert "execution_id" in result_data + assert "graph_id" in result_data + assert result_data["graph_id"] == graph.id + assert "graph_name" in result_data + assert result_data["graph_name"] == "LLM Test Agent" diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent.py b/autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent.py new file mode 100644 index 0000000000..313c40c5f9 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent.py @@ -0,0 +1,376 @@ +"""Tool for setting up an agent with credentials and configuration.""" + +import logging +from typing import Any + +from pydantic import BaseModel + +from backend.data.graph import get_graph +from backend.data.model import CredentialsMetaInput +from backend.data.user import get_user_by_id +from backend.integrations.creds_manager import IntegrationCredentialsManager +from backend.server.v2.chat.tools.get_required_setup_info import ( + GetRequiredSetupInfoTool, +) +from backend.server.v2.chat.tools.models import ( + ExecutionStartedResponse, + SetupInfo, + SetupRequirementsResponse, +) +from backend.server.v2.library import db as library_db +from backend.server.v2.library import model as library_model +from backend.util.clients import get_scheduler_client +from backend.util.timezone_utils import ( + convert_utc_time_to_user_timezone, + get_user_timezone_or_utc, +) + +from .base import BaseTool +from .models import ErrorResponse, ToolResponseBase + +logger = logging.getLogger(__name__) + + +class AgentDetails(BaseModel): + graph_name: str + graph_id: str + graph_version: int + recommended_schedule_cron: str | None + required_credentials: dict[str, CredentialsMetaInput] + + +class SetupAgentTool(BaseTool): + """Tool for setting up an agent with scheduled execution or webhook triggers.""" + + @property + def name(self) -> str: + return "schedule_agent" + + @property + def description(self) -> str: + return """Set up an agent with credentials and configure it for scheduled execution or webhook triggers. + IMPORTANT: Before calling this tool, you MUST first call get_agent_details to determine what inputs are required. + + For SCHEDULED execution: + - Cron format: "minute hour day month weekday" (e.g., "0 9 * * 1-5" = 9am weekdays) + - Common patterns: "0 * * * *" (hourly), "0 0 * * *" (daily at midnight), "0 9 * * 1" (Mondays at 9am) + - Timezone: Use IANA timezone names like "America/New_York", "Europe/London", "Asia/Tokyo" + - The 'inputs' parameter must contain ALL required inputs from get_agent_details as a dictionary + + For WEBHOOK triggers: + - The agent will be triggered by external events + - Still requires all input values from get_agent_details""" + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "username_agent_slug": { + "type": "string", + "description": "The marketplace agent slug (e.g., 'username/agent-name')", + }, + "setup_type": { + "type": "string", + "enum": ["schedule", "webhook"], + "description": "Type of setup: 'schedule' for cron, 'webhook' for triggers.", + }, + "name": { + "type": "string", + "description": "Name for this setup/schedule (e.g., 'Daily Report', 'Weekly Summary')", + }, + "description": { + "type": "string", + "description": "Description of this setup", + }, + "cron": { + "type": "string", + "description": "Cron expression (5 fields: minute hour day month weekday). Examples: '0 9 * * 1-5' (9am weekdays), '*/30 * * * *' (every 30 min)", + }, + "timezone": { + "type": "string", + "description": "IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC'). Defaults to UTC if not specified.", + }, + "inputs": { + "type": "object", + "description": 'REQUIRED: Dictionary with ALL required inputs from get_agent_details. Format: {"input_name": value}', + "additionalProperties": True, + }, + "webhook_config": { + "type": "object", + "description": "Webhook configuration (required if setup_type is 'webhook')", + "additionalProperties": True, + }, + }, + "required": ["username_agent_slug", "setup_type"], + } + + @property + def requires_auth(self) -> bool: + """This tool requires authentication.""" + return True + + async def _execute( + self, + user_id: str | None, + session_id: str, + **kwargs, + ) -> ToolResponseBase: + """Set up an agent with configuration. + + Args: + user_id: Authenticated user ID + session_id: Chat session ID + **kwargs: Setup parameters + + Returns: + JSON formatted setup result + + """ + assert ( + user_id is not None + ), "User ID is required to run an agent. Superclass enforces authentication." + setup_type = kwargs.get("setup_type", "schedule").strip() + if setup_type != "schedule": + return ErrorResponse( + message="Only schedule setup is supported at this time", + session_id=session_id, + ) + else: + cron = kwargs.get("cron", "").strip() + cron_name = kwargs.get("name", "").strip() + if not cron or not cron_name: + return ErrorResponse( + message="Cron and name are required for schedule setup", + session_id=session_id, + ) + + username_agent_slug = kwargs.get("username_agent_slug", "").strip() + inputs = kwargs.get("inputs", {}) + + library_agent = await self._get_or_add_library_agent( + username_agent_slug, user_id, session_id, **kwargs + ) + if not isinstance(library_agent, AgentDetails): + # library agent is an ErrorResponse + return library_agent + + # At this point we know the user is ready to run the agent + # Create the schedule for the agent + from backend.server.v2.library import db as library_db + + # Get the library agent model for scheduling + lib_agent = await library_db.get_library_agent_by_graph_id( + graph_id=library_agent.graph_id, user_id=user_id + ) + if not lib_agent: + return ErrorResponse( + message=f"Library agent not found for graph {library_agent.graph_id}", + session_id=session_id, + ) + + return await self._add_graph_execution_schedule( + library_agent=lib_agent, + user_id=user_id, + cron=cron, + name=cron_name, + inputs=inputs, + credentials=library_agent.required_credentials, + session_id=session_id, + ) + + async def _add_graph_execution_schedule( + self, + library_agent: library_model.LibraryAgent, + user_id: str, + cron: str, + name: str, + inputs: dict[str, Any], + credentials: dict[str, CredentialsMetaInput], + session_id: str, + **kwargs, + ) -> ExecutionStartedResponse | ErrorResponse: + # Use timezone from request if provided, otherwise fetch from user profile + user = await get_user_by_id(user_id) + user_timezone = get_user_timezone_or_utc(user.timezone if user else None) + + # Map required credentials (schema field names) to actual user credential IDs + # credentials param contains CredentialsMetaInput with schema field names as keys + # We need to find the user's actual credentials that match the provider/type + creds_manager = IntegrationCredentialsManager() + user_credentials = await creds_manager.store.get_all_creds(user_id) + + # Build a mapping from schema field name -> actual credential ID + resolved_credentials: dict[str, CredentialsMetaInput] = {} + missing_credentials: list[str] = [] + + for field_name, cred_meta in credentials.items(): + # Find a matching credential from the user's credentials + matching_cred = next( + ( + c + for c in user_credentials + if c.provider == cred_meta.provider and c.type == cred_meta.type + ), + None, + ) + + if matching_cred: + # Use the actual credential ID instead of the schema field name + # Create a new CredentialsMetaInput with the actual credential ID + # but keep the same provider/type from the original meta + resolved_credentials[field_name] = CredentialsMetaInput( + id=matching_cred.id, + provider=cred_meta.provider, + type=cred_meta.type, + title=cred_meta.title, + ) + else: + missing_credentials.append( + f"{cred_meta.title} ({cred_meta.provider}/{cred_meta.type})" + ) + + if missing_credentials: + return ErrorResponse( + message=f"Cannot execute agent: missing {len(missing_credentials)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials.", + session_id=session_id, + ) + + result = await get_scheduler_client().add_execution_schedule( + user_id=user_id, + graph_id=library_agent.graph_id, + graph_version=library_agent.graph_version, + name=name, + cron=cron, + input_data=inputs, + input_credentials=resolved_credentials, + user_timezone=user_timezone, + ) + + # Convert the next_run_time back to user timezone for display + if result.next_run_time: + result.next_run_time = convert_utc_time_to_user_timezone( + result.next_run_time, user_timezone + ) + return ExecutionStartedResponse( + message="Agent execution successfully scheduled. Do not run this tool again unless specifically asked to run the agent again.", + session_id=session_id, + execution_id=result.id, + graph_id=library_agent.graph_id, + graph_name=library_agent.name, + ) + + async def _get_or_add_library_agent( + self, agent_id: str, user_id: str, session_id: str, **kwargs + ) -> AgentDetails | ErrorResponse: + # Call _execute directly since we're calling internally from another tool + response = await GetRequiredSetupInfoTool()._execute( + user_id, session_id, **kwargs + ) + + if not isinstance(response, SetupRequirementsResponse): + return ErrorResponse( + message="Failed to get required setup information", + session_id=session_id, + ) + + setup_info = SetupInfo.model_validate(response.setup_info) + + if not setup_info.user_readiness.ready_to_run: + return ErrorResponse( + message=f"User is not ready to run the agent. User Readiness: {setup_info.user_readiness.model_dump_json()} Requirments: {setup_info.requirements}", + session_id=session_id, + ) + + # Get the graph using the graph_id and graph_version from the setup response + if not response.graph_id or not response.graph_version: + return ErrorResponse( + message=f"Graph information not available for {agent_id}", + session_id=session_id, + ) + + graph = await get_graph( + graph_id=response.graph_id, + version=response.graph_version, + user_id=None, # Public access for store graphs + include_subgraphs=True, + ) + if not graph: + return ErrorResponse( + message=f"Graph {agent_id} ({response.graph_id}v{response.graph_version}) not found", + session_id=session_id, + ) + + recommended_schedule_cron = graph.recommended_schedule_cron + + # Extract credentials from the JSON schema properties + credentials_input_schema = graph.credentials_input_schema + required_credentials: dict[str, CredentialsMetaInput] = {} + if ( + isinstance(credentials_input_schema, dict) + and "properties" in credentials_input_schema + ): + for cred_name, cred_schema in credentials_input_schema[ + "properties" + ].items(): + # Get provider from credentials_provider array or properties.provider.const + provider = "unknown" + if ( + "credentials_provider" in cred_schema + and cred_schema["credentials_provider"] + ): + provider = cred_schema["credentials_provider"][0] + elif ( + "properties" in cred_schema + and "provider" in cred_schema["properties"] + ): + provider = cred_schema["properties"]["provider"].get( + "const", "unknown" + ) + + # Get type from credentials_types array or properties.type.const + cred_type = "api_key" # Default + if ( + "credentials_types" in cred_schema + and cred_schema["credentials_types"] + ): + cred_type = cred_schema["credentials_types"][0] + elif ( + "properties" in cred_schema and "type" in cred_schema["properties"] + ): + cred_type = cred_schema["properties"]["type"].get( + "const", "api_key" + ) + + required_credentials[cred_name] = CredentialsMetaInput( + id=cred_name, + title=cred_schema.get("title", cred_name), + provider=provider, # type: ignore + type=cred_type, + ) + + # Check if we already have a library agent for this graph + existing_library_agent = await library_db.get_library_agent_by_graph_id( + graph_id=graph.id, user_id=user_id + ) + if not existing_library_agent: + # Now we need to add the graph to the users library + library_agents: list[library_model.LibraryAgent] = ( + await library_db.create_library_agent( + graph=graph, + user_id=user_id, + create_library_agents_for_sub_graphs=False, + ) + ) + assert len(library_agents) == 1, "Expected 1 library agent to be created" + library_agent = library_agents[0] + else: + library_agent = existing_library_agent + + return AgentDetails( + graph_name=graph.name, + graph_id=library_agent.graph_id, + graph_version=library_agent.graph_version, + recommended_schedule_cron=recommended_schedule_cron, + required_credentials=required_credentials, + ) diff --git a/autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent_test.py b/autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent_test.py new file mode 100644 index 0000000000..18e594e025 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/chat/tools/setup_agent_test.py @@ -0,0 +1,394 @@ +import uuid + +import orjson +import pytest + +from backend.server.v2.chat.tools._test_data import setup_llm_test_data, setup_test_data +from backend.server.v2.chat.tools.setup_agent import SetupAgentTool +from backend.util.clients import get_scheduler_client + +# This is so the formatter doesn't remove the fixture imports +setup_llm_test_data = setup_llm_test_data +setup_test_data = setup_test_data + + +@pytest.mark.asyncio(scope="session") +async def test_setup_agent_missing_cron(setup_test_data): + """Test error when cron is missing for schedule setup""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = SetupAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute without cron + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + setup_type="schedule", + inputs={"test_input": "Hello World"}, + # Missing: cron and name + ) + + # Verify error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + assert ( + "cron" in result_data["message"].lower() + or "name" in result_data["message"].lower() + ) + + +@pytest.mark.asyncio(scope="session") +async def test_setup_agent_webhook_not_supported(setup_test_data): + """Test error when webhook setup is attempted""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = SetupAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute with webhook setup_type + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + setup_type="webhook", + inputs={"test_input": "Hello World"}, + ) + + # Verify error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + message_lower = result_data["message"].lower() + assert "schedule" in message_lower and "supported" in message_lower + + +@pytest.mark.asyncio(scope="session") +@pytest.mark.skip(reason="Requires scheduler service to be running") +async def test_setup_agent_schedule_success(setup_test_data): + """Test successfully setting up an agent with a schedule""" + # Use test data from fixture + user = setup_test_data["user"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = SetupAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute with schedule setup + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + setup_type="schedule", + name="Test Schedule", + description="Test schedule description", + cron="0 9 * * *", # Daily at 9am + timezone="UTC", + inputs={"test_input": "Hello World"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Check for execution started + assert "message" in result_data + assert "execution_id" in result_data + assert "graph_id" in result_data + assert "graph_name" in result_data + + +@pytest.mark.asyncio(scope="session") +@pytest.mark.skip(reason="Requires scheduler service to be running") +async def test_setup_agent_with_credentials(setup_llm_test_data): + """Test setting up an agent that requires credentials""" + # Use test data from fixture (includes OpenAI credentials) + user = setup_llm_test_data["user"] + store_submission = setup_llm_test_data["store_submission"] + + # Create the tool instance + tool = SetupAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute with schedule setup + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + setup_type="schedule", + name="LLM Schedule", + description="LLM schedule with credentials", + cron="*/30 * * * *", # Every 30 minutes + timezone="America/New_York", + inputs={"user_prompt": "What is 2+2?"}, + ) + + # Verify the response + assert response is not None + assert hasattr(response, "result") + + # Parse the result JSON + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + + # Should succeed since user has OpenAI credentials + assert "execution_id" in result_data + assert "graph_id" in result_data + + +@pytest.mark.asyncio(scope="session") +async def test_setup_agent_invalid_agent(setup_test_data): + """Test error when agent doesn't exist""" + # Use test data from fixture + user = setup_test_data["user"] + + # Create the tool instance + tool = SetupAgentTool() + + # Execute with non-existent agent + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug="nonexistent/agent", + setup_type="schedule", + name="Test Schedule", + cron="0 9 * * *", + inputs={}, + ) + + # Verify error response + assert response is not None + assert hasattr(response, "result") + + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "message" in result_data + # Should fail to find the agent + assert any( + phrase in result_data["message"].lower() + for phrase in ["not found", "failed", "error"] + ) + + +@pytest.mark.asyncio(scope="session") +@pytest.mark.skip(reason="Requires scheduler service to be running") +async def test_setup_agent_schedule_created_in_scheduler(setup_test_data): + """Test that the schedule is actually created in the scheduler service""" + # Use test data from fixture + user = setup_test_data["user"] + graph = setup_test_data["graph"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = SetupAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Create a unique schedule name to identify this test + schedule_name = f"Test Schedule {uuid.uuid4()}" + + # Execute with schedule setup + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + setup_type="schedule", + name=schedule_name, + description="Test schedule to verify credentials", + cron="0 0 * * *", # Daily at midnight + timezone="UTC", + inputs={"test_input": "Scheduled execution"}, + ) + + # Verify the response + assert response is not None + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "execution_id" in result_data + + # Now verify the schedule was created in the scheduler service + scheduler = get_scheduler_client() + schedules = await scheduler.get_execution_schedules(graph.id, user.id) + + # Find our schedule + our_schedule = None + for schedule in schedules: + if schedule.name == schedule_name: + our_schedule = schedule + break + + assert ( + our_schedule is not None + ), f"Schedule '{schedule_name}' not found in scheduler" + assert our_schedule.cron == "0 0 * * *" + assert our_schedule.graph_id == graph.id + + # Clean up: delete the schedule + await scheduler.delete_schedule(our_schedule.id, user_id=user.id) + + +@pytest.mark.asyncio(scope="session") +@pytest.mark.skip(reason="Requires scheduler service to be running") +async def test_setup_agent_schedule_with_credentials_triggered(setup_llm_test_data): + """Test that credentials are properly passed when a schedule is triggered""" + # Use test data from fixture (includes OpenAI credentials) + user = setup_llm_test_data["user"] + graph = setup_llm_test_data["graph"] + store_submission = setup_llm_test_data["store_submission"] + + # Create the tool instance + tool = SetupAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Create a unique schedule name + schedule_name = f"LLM Test Schedule {uuid.uuid4()}" + + # Execute with schedule setup + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + setup_type="schedule", + name=schedule_name, + description="Test LLM schedule with credentials", + cron="* * * * *", # Every minute (for testing) + timezone="UTC", + inputs={"user_prompt": "Test prompt for credentials"}, + ) + + # Verify the response + assert response is not None + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "execution_id" in result_data + + # Get the schedule from the scheduler + scheduler = get_scheduler_client() + schedules = await scheduler.get_execution_schedules(graph.id, user.id) + + # Find our schedule + our_schedule = None + for schedule in schedules: + if schedule.name == schedule_name: + our_schedule = schedule + break + + assert our_schedule is not None, f"Schedule '{schedule_name}' not found" + + # Verify the schedule has the correct input data + assert our_schedule.input_data is not None + assert "user_prompt" in our_schedule.input_data + assert our_schedule.input_data["user_prompt"] == "Test prompt for credentials" + + # Verify credentials are stored in the schedule + # The credentials should be stored as input_credentials + assert our_schedule.input_credentials is not None + + # The credentials should contain the OpenAI provider credential + # Note: The exact structure depends on how credentials are serialized + # We're checking that credentials data exists and has the right provider + if our_schedule.input_credentials: + # Convert to dict if needed + creds_dict = ( + our_schedule.input_credentials + if isinstance(our_schedule.input_credentials, dict) + else {} + ) + + # Check if any credential has openai provider + has_openai_cred = False + for cred_key, cred_value in creds_dict.items(): + if isinstance(cred_value, dict): + if cred_value.get("provider") == "openai": + has_openai_cred = True + # Verify the credential has the expected structure + assert "id" in cred_value or "api_key" in cred_value + break + + # If we have LLM block, we should have stored credentials + assert has_openai_cred, "OpenAI credentials not found in schedule" + + # Clean up: delete the schedule + await scheduler.delete_schedule(our_schedule.id, user_id=user.id) + + +@pytest.mark.asyncio(scope="session") +@pytest.mark.skip(reason="Requires scheduler service to be running") +async def test_setup_agent_creates_library_agent(setup_test_data): + """Test that setup creates a library agent for the user""" + # Use test data from fixture + user = setup_test_data["user"] + graph = setup_test_data["graph"] + store_submission = setup_test_data["store_submission"] + + # Create the tool instance + tool = SetupAgentTool() + + # Build the proper marketplace agent_id format + agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}" + + # Execute with schedule setup + response = await tool.execute( + user_id=user.id, + session_id=str(uuid.uuid4()), + tool_call_id=str(uuid.uuid4()), + username_agent_slug=agent_marketplace_id, + setup_type="schedule", + name="Library Test Schedule", + cron="0 12 * * *", # Daily at noon + inputs={"test_input": "Library test"}, + ) + + # Verify the response + assert response is not None + assert isinstance(response.result, str) + result_data = orjson.loads(response.result) + assert "graph_id" in result_data + assert result_data["graph_id"] == graph.id + + # Verify library agent was created + from backend.server.v2.library import db as library_db + + library_agent = await library_db.get_library_agent_by_graph_id( + graph_id=graph.id, user_id=user.id + ) + assert library_agent is not None + assert library_agent.graph_id == graph.id + assert library_agent.name == "Test Agent" diff --git a/autogpt_platform/backend/backend/util/exceptions.py b/autogpt_platform/backend/backend/util/exceptions.py index d17dc7487a..c5129f0ede 100644 --- a/autogpt_platform/backend/backend/util/exceptions.py +++ b/autogpt_platform/backend/backend/util/exceptions.py @@ -98,3 +98,9 @@ class DatabaseError(Exception): """Raised when there is an error interacting with the database""" pass + + +class RedisError(Exception): + """Raised when there is an error interacting with Redis""" + + pass diff --git a/autogpt_platform/backend/backend/util/feature_flag.py b/autogpt_platform/backend/backend/util/feature_flag.py index 06f685102a..fbd3573112 100644 --- a/autogpt_platform/backend/backend/util/feature_flag.py +++ b/autogpt_platform/backend/backend/util/feature_flag.py @@ -5,7 +5,8 @@ from functools import wraps from typing import Any, Awaitable, Callable, TypeVar import ldclient -from fastapi import HTTPException +from autogpt_libs.auth.dependencies import get_optional_user_id +from fastapi import HTTPException, Security from ldclient import Context, LDClient from ldclient.config import Config from typing_extensions import ParamSpec @@ -36,6 +37,7 @@ class Flag(str, Enum): BETA_BLOCKS = "beta-blocks" AGENT_ACTIVITY = "agent-activity" ENABLE_PLATFORM_PAYMENT = "enable-platform-payment" + CHAT = "chat" def is_configured() -> bool: @@ -252,6 +254,72 @@ def feature_flag( return decorator +def create_feature_flag_dependency( + flag_key: Flag, + default: bool = False, +) -> Callable[[str | None], Awaitable[None]]: + """ + Create a FastAPI dependency that checks a feature flag. + + This dependency automatically extracts the user_id from the JWT token + (if present) for proper LaunchDarkly user targeting, while still + supporting anonymous access. + + Args: + flag_key: The Flag enum value to check + default: Default value if flag evaluation fails + + Returns: + An async dependency function that raises HTTPException if flag is disabled + + Example: + router = APIRouter( + dependencies=[Depends(create_feature_flag_dependency(Flag.CHAT))] + ) + """ + + async def check_feature_flag( + user_id: str | None = Security(get_optional_user_id), + ) -> None: + """Check if feature flag is enabled for the user. + + The user_id is automatically injected from JWT authentication if present, + or None for anonymous access. + """ + # For routes that don't require authentication, use anonymous context + check_user_id = user_id or "anonymous" + + if not is_configured(): + logger.debug( + f"LaunchDarkly not configured, using default {flag_key.value}={default}" + ) + if not default: + raise HTTPException(status_code=404, detail="Feature not available") + return + + try: + client = get_client() + if not client.is_initialized(): + logger.debug( + f"LaunchDarkly not initialized, using default {flag_key.value}={default}" + ) + if not default: + raise HTTPException(status_code=404, detail="Feature not available") + return + + is_enabled = await is_feature_enabled(flag_key, check_user_id, default) + + if not is_enabled: + raise HTTPException(status_code=404, detail="Feature not available") + except Exception as e: + logger.warning( + f"LaunchDarkly error for flag {flag_key.value}: {e}, using default={default}" + ) + raise HTTPException(status_code=500, detail="Failed to check feature flag") + + return check_feature_flag + + @contextlib.contextmanager def mock_flag_variation(flag_key: str, return_value: Any): """Context manager for testing feature flags.""" diff --git a/autogpt_platform/frontend/src/app/api/openapi.json b/autogpt_platform/frontend/src/app/api/openapi.json index 3f9c487aeb..43ee6b4fc2 100644 --- a/autogpt_platform/frontend/src/app/api/openapi.json +++ b/autogpt_platform/frontend/src/app/api/openapi.json @@ -4795,6 +4795,181 @@ "security": [{ "APIKeyAuthenticator-X-Postmark-Webhook-Token": [] }] } }, + "/api/chat/sessions": { + "post": { + "tags": ["v2", "chat", "chat"], + "summary": "Create Session", + "description": "Create a new chat session.\n\nInitiates a new chat session for either an authenticated or anonymous user.\n\nArgs:\n user_id: The optional authenticated user ID parsed from the JWT. If missing, creates an anonymous session.\n\nReturns:\n CreateSessionResponse: Details of the created session.", + "operationId": "postV2CreateSession", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/CreateSessionResponse" + } + } + } + } + }, + "security": [{ "HTTPBearer": [] }] + } + }, + "/api/chat/sessions/{session_id}": { + "get": { + "tags": ["v2", "chat", "chat"], + "summary": "Get Session", + "description": "Retrieve the details of a specific chat session.\n\nLooks up a chat session by ID for the given user (if authenticated) and returns all session data including messages.\n\nArgs:\n session_id: The unique identifier for the desired chat session.\n user_id: The optional authenticated user ID, or None for anonymous access.\n\nReturns:\n SessionDetailResponse: Details for the requested session; raises NotFoundError if not found.", + "operationId": "getV2GetSession", + "security": [{ "HTTPBearer": [] }], + "parameters": [ + { + "name": "session_id", + "in": "path", + "required": true, + "schema": { "type": "string", "title": "Session Id" } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/SessionDetailResponse" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { "$ref": "#/components/schemas/HTTPValidationError" } + } + } + } + } + } + }, + "/api/chat/sessions/{session_id}/stream": { + "get": { + "tags": ["v2", "chat", "chat"], + "summary": "Stream Chat", + "description": "Stream chat responses for a session.\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.", + "operationId": "getV2StreamChat", + "security": [{ "HTTPBearer": [] }], + "parameters": [ + { + "name": "session_id", + "in": "path", + "required": true, + "schema": { "type": "string", "title": "Session Id" } + }, + { + "name": "message", + "in": "query", + "required": true, + "schema": { + "type": "string", + "minLength": 1, + "maxLength": 10000, + "title": "Message" + } + }, + { + "name": "is_user_message", + "in": "query", + "required": false, + "schema": { + "type": "boolean", + "default": true, + "title": "Is User Message" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { "application/json": { "schema": {} } } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { "$ref": "#/components/schemas/HTTPValidationError" } + } + } + } + } + } + }, + "/api/chat/sessions/{session_id}/assign-user": { + "patch": { + "tags": ["v2", "chat", "chat"], + "summary": "Session Assign User", + "description": "Assign an authenticated user to a chat session.\n\nUsed (typically post-login) to claim an existing anonymous session as the current authenticated user.\n\nArgs:\n session_id: The identifier for the (previously anonymous) session.\n user_id: The authenticated user's ID to associate with the session.\n\nReturns:\n dict: Status of the assignment.", + "operationId": "patchV2SessionAssignUser", + "security": [{ "HTTPBearer": [] }, { "HTTPBearerJWT": [] }], + "parameters": [ + { + "name": "session_id", + "in": "path", + "required": true, + "schema": { "type": "string", "title": "Session Id" } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "type": "object", + "additionalProperties": true, + "title": "Response Patchv2Sessionassignuser" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { "$ref": "#/components/schemas/HTTPValidationError" } + } + } + }, + "401": { + "$ref": "#/components/responses/HTTP401NotAuthenticatedError" + } + } + } + }, + "/api/chat/health": { + "get": { + "tags": ["v2", "chat", "chat"], + "summary": "Health Check", + "description": "Health check endpoint for the chat service.\n\nPerforms a full cycle test of session creation, assignment, and retrieval. Should always return healthy\nif the service and data layer are operational.\n\nReturns:\n dict: A status dictionary indicating health, service name, and API version.", + "operationId": "getV2HealthCheck", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "additionalProperties": true, + "type": "object", + "title": "Response Getv2Healthcheck" + } + } + } + } + }, + "security": [{ "HTTPBearer": [] }] + } + }, "/health": { "get": { "tags": ["health"], @@ -5371,6 +5546,20 @@ "required": ["graph"], "title": "CreateGraph" }, + "CreateSessionResponse": { + "properties": { + "id": { "type": "string", "title": "Id" }, + "created_at": { "type": "string", "title": "Created At" }, + "user_id": { + "anyOf": [{ "type": "string" }, { "type": "null" }], + "title": "User Id" + } + }, + "type": "object", + "required": ["id", "created_at", "user_id"], + "title": "CreateSessionResponse", + "description": "Response model containing information on a newly created chat session." + }, "Creator": { "properties": { "name": { "type": "string", "title": "Name" }, @@ -7500,6 +7689,26 @@ "required": ["items", "total_items", "page", "more_pages"], "title": "SearchResponse" }, + "SessionDetailResponse": { + "properties": { + "id": { "type": "string", "title": "Id" }, + "created_at": { "type": "string", "title": "Created At" }, + "updated_at": { "type": "string", "title": "Updated At" }, + "user_id": { + "anyOf": [{ "type": "string" }, { "type": "null" }], + "title": "User Id" + }, + "messages": { + "items": { "additionalProperties": true, "type": "object" }, + "type": "array", + "title": "Messages" + } + }, + "type": "object", + "required": ["id", "created_at", "updated_at", "user_id", "messages"], + "title": "SessionDetailResponse", + "description": "Response model providing complete details for a chat session, including messages." + }, "SetGraphActiveVersion": { "properties": { "active_graph_version": { @@ -9654,7 +9863,8 @@ "type": "apiKey", "in": "header", "name": "X-Postmark-Webhook-Token" - } + }, + "HTTPBearer": { "type": "http", "scheme": "bearer" } }, "responses": { "HTTP401NotAuthenticatedError": {