feat(platform): Chat system backend (#11230)

Implements the foundational backend infrastructure for a chat-based agent
interaction system. Users will be able to discover, configure, and run
marketplace agents through conversational AI.

**Note:** Chat routes are gated behind the `CHAT` feature flag and are disabled by default.

### Changes 🏗️

**Core Chat System:**
- Chat service with LLM orchestration (Claude 3.5 Sonnet, Haiku, GPT-4)
- REST API routes for sessions and messages
- Database layer for chat persistence
- System prompts and configuration

**5 Conversational Tools:**
1. `find_agent` - Search marketplace by keywords
2. `get_agent_details` - Fetch agent info, inputs, credentials
3. `get_required_setup_info` - Check user readiness, missing credentials
4. `run_agent` - Execute agents immediately
5. `setup_agent` - Configure scheduled execution with cron
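
For reference, here is a sketch of how one of these tools is surfaced to the LLM in OpenAI function-calling format (paraphrased from `find_agent`'s schema in this PR; exact wording lives in the tool classes):

```python
# Approximate schema produced by FindAgentTool.as_openai_tool(); descriptions paraphrased.
find_agent_tool = {
    "type": "function",
    "function": {
        "name": "find_agent",
        "description": "Discover agents from the marketplace based on capabilities and user needs.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query describing what the user wants to accomplish.",
                },
            },
            "required": ["query"],
        },
    },
}
```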

**Testing:**
- 28 tests across chat tools (23 passing, 5 skipped for scheduler)
- Test fixtures for simple, LLM, and Firecrawl agents
- Service and data layer tests

**Bug Fixes:**
- Fixed `setup_agent.py` to create schedules instead of immediate
execution
- Fixed graph lookup to use UUID instead of username/slug
- Fixed credential matching by provider/type instead of ID
- Fixed internal tool calls to use `._execute()` instead of `.execute()`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] All 28 chat tool tests pass (23 pass, 5 skip - require scheduler)
  - [x] Code formatting and linting pass
  - [x] Tool execution flow validated through unit tests
  - [x] Agent discovery, details, and execution tested
  - [x] Credential parsing and matching tested

#### For configuration changes:
- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

No configuration changes required - all existing settings compatible.
Committed by Swifty on 2025-11-05 14:49:01 +01:00 via GitHub (commit 2f87e13d17, parent 2ad5a88a5c).
27 changed files with 4985 additions and 5 deletions.

View File

@@ -1,5 +1,10 @@
from .config import verify_settings
from .dependencies import get_user_id, requires_admin_user, requires_user
from .dependencies import (
get_optional_user_id,
get_user_id,
requires_admin_user,
requires_user,
)
from .helpers import add_auth_responses_to_openapi
from .models import User
@@ -8,6 +13,7 @@ __all__ = [
"get_user_id",
"requires_admin_user",
"requires_user",
"get_optional_user_id",
"add_auth_responses_to_openapi",
"User",
]

View File

@@ -7,15 +7,50 @@ These are the high-level dependency functions used in route definitions.
import logging
import fastapi
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from .jwt_utils import get_jwt_payload, verify_user
from .models import User
logger = logging.getLogger(__name__)
optional_bearer = HTTPBearer(auto_error=False)
# Header name for admin impersonation
IMPERSONATION_HEADER_NAME = "X-Act-As-User-Id"
logger = logging.getLogger(__name__)
def get_optional_user_id(
credentials: HTTPAuthorizationCredentials | None = fastapi.Security(
optional_bearer
),
) -> str | None:
"""
Attempts to extract the user ID ("sub" claim) from a Bearer JWT if provided.
This dependency allows for both authenticated and anonymous access. If a valid bearer token is
supplied, it parses the JWT and extracts the user ID. If the token is missing or invalid, it returns None,
treating the request as anonymous.
Args:
credentials: Optional HTTPAuthorizationCredentials object from FastAPI Security dependency.
Returns:
The user ID (str) extracted from the JWT "sub" claim, or None if no valid token is present.
"""
if not credentials:
return None
try:
# Parse JWT token to get user ID
from autogpt_libs.auth.jwt_utils import parse_jwt_token
payload = parse_jwt_token(credentials.credentials)
return payload.get("sub")
except Exception as e:
logger.debug(f"Auth token validation failed (anonymous access): {e}")
return None
async def requires_user(jwt_payload: dict = fastapi.Security(get_jwt_payload)) -> User:
"""

View File

@@ -27,6 +27,7 @@ import backend.server.v2.admin.credit_admin_routes
import backend.server.v2.admin.store_admin_routes
import backend.server.v2.builder
import backend.server.v2.builder.routes
import backend.server.v2.chat.routes as chat_routes
import backend.server.v2.library.db
import backend.server.v2.library.model
import backend.server.v2.library.routes
@@ -49,7 +50,12 @@ from backend.util.exceptions import (
NotAuthorizedError,
NotFoundError,
)
from backend.util.feature_flag import initialize_launchdarkly, shutdown_launchdarkly
from backend.util.feature_flag import (
Flag,
create_feature_flag_dependency,
initialize_launchdarkly,
shutdown_launchdarkly,
)
from backend.util.service import UnhealthyServiceError
settings = backend.util.settings.Settings()
@@ -284,6 +290,14 @@ app.include_router(
tags=["v1", "email"],
prefix="/api/email",
)
app.include_router(
chat_routes.router,
tags=["v2", "chat"],
prefix="/api/chat",
dependencies=[
fastapi.Depends(create_feature_flag_dependency(Flag.CHAT, default=False))
],
)
app.mount("/external-api", external_app)

View File

@@ -0,0 +1,114 @@
"""Configuration management for chat system."""
import os
from pathlib import Path
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings
class ChatConfig(BaseSettings):
"""Configuration for the chat system."""
# OpenAI API Configuration
model: str = Field(
default="qwen/qwen3-235b-a22b-2507", description="Default model to use"
)
api_key: str | None = Field(default=None, description="OpenAI API key")
base_url: str | None = Field(
default="https://openrouter.ai/api/v1",
description="Base URL for API (e.g., for OpenRouter)",
)
# Session TTL Configuration - 12 hours
session_ttl: int = Field(default=43200, description="Session TTL in seconds")
# System Prompt Configuration
system_prompt_path: str = Field(
default="prompts/chat_system.md",
description="Path to system prompt file relative to chat module",
)
# Streaming Configuration
max_context_messages: int = Field(
default=50, ge=1, le=200, description="Maximum context messages"
)
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
max_retries: int = Field(default=3, description="Maximum number of retries")
@field_validator("api_key", mode="before")
@classmethod
def get_api_key(cls, v):
"""Get API key from environment if not provided."""
if v is None:
# Try to get from environment variables
# First check for CHAT_API_KEY (Pydantic prefix)
v = os.getenv("CHAT_API_KEY")
if not v:
# Fall back to OPEN_ROUTER_API_KEY
v = os.getenv("OPEN_ROUTER_API_KEY")
if not v:
# Fall back to OPENAI_API_KEY
v = os.getenv("OPENAI_API_KEY")
return v
@field_validator("base_url", mode="before")
@classmethod
def get_base_url(cls, v):
"""Get base URL from environment if not provided."""
if v is None:
# Check for OpenRouter or custom base URL
v = os.getenv("CHAT_BASE_URL")
if not v:
v = os.getenv("OPENROUTER_BASE_URL")
if not v:
v = os.getenv("OPENAI_BASE_URL")
if not v:
v = "https://openrouter.ai/api/v1"
return v
def get_system_prompt(self, **template_vars) -> str:
"""Load and render the system prompt from file.
Args:
**template_vars: Variables to substitute in the template
Returns:
Rendered system prompt string
"""
# Get the path relative to this module
module_dir = Path(__file__).parent
prompt_path = module_dir / self.system_prompt_path
# Check for .j2 extension first (Jinja2 template)
j2_path = Path(str(prompt_path) + ".j2")
if j2_path.exists():
try:
from jinja2 import Template
template = Template(j2_path.read_text())
return template.render(**template_vars)
except ImportError:
# Jinja2 not installed, fall back to reading as plain text
return j2_path.read_text()
# Check for markdown file
if prompt_path.exists():
content = prompt_path.read_text()
# Simple variable substitution if Jinja2 is not available
for key, value in template_vars.items():
placeholder = f"{{{key}}}"
content = content.replace(placeholder, str(value))
return content
raise FileNotFoundError(f"System prompt file not found: {prompt_path}")
class Config:
"""Pydantic config."""
env_file = ".env"
env_file_encoding = "utf-8"
extra = "ignore" # Ignore extra environment variables
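
A minimal usage sketch (not part of the diff) showing how `ChatConfig` resolves its API key and base URL from the environment via the validators above; the placeholder key and the `user_name` template variable are illustrative only:

```python
import os

from backend.server.v2.chat.config import ChatConfig

os.environ.setdefault("OPEN_ROUTER_API_KEY", "sk-or-placeholder")  # illustrative key

config = ChatConfig()
print(config.model)     # "qwen/qwen3-235b-a22b-2507" unless overridden
print(config.base_url)  # "https://openrouter.ai/api/v1" by default

# Template variables are substituted into prompts/chat_system.md (or the .j2 variant);
# "user_name" here is just an illustrative variable name.
system_prompt = config.get_system_prompt(user_name="Alice")
```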

View File

@@ -0,0 +1,201 @@
import logging
import uuid
from datetime import UTC, datetime
from openai.types.chat import (
ChatCompletionAssistantMessageParam,
ChatCompletionDeveloperMessageParam,
ChatCompletionFunctionMessageParam,
ChatCompletionMessageParam,
ChatCompletionSystemMessageParam,
ChatCompletionToolMessageParam,
ChatCompletionUserMessageParam,
)
from openai.types.chat.chat_completion_assistant_message_param import FunctionCall
from openai.types.chat.chat_completion_message_tool_call_param import (
ChatCompletionMessageToolCallParam,
Function,
)
from pydantic import BaseModel
from backend.data.redis_client import get_redis_async
from backend.server.v2.chat.config import ChatConfig
from backend.util.exceptions import RedisError
logger = logging.getLogger(__name__)
config = ChatConfig()
class ChatMessage(BaseModel):
role: str
content: str
name: str | None = None
tool_call_id: str | None = None
refusal: str | None = None
tool_calls: list[dict] | None = None
function_call: dict | None = None
class Usage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
class ChatSession(BaseModel):
session_id: str
user_id: str | None
messages: list[ChatMessage]
usage: list[Usage]
credentials: dict[str, dict] = {} # Map of provider -> credential metadata
started_at: datetime
updated_at: datetime
@staticmethod
def new(user_id: str | None) -> "ChatSession":
return ChatSession(
session_id=str(uuid.uuid4()),
user_id=user_id,
messages=[],
usage=[],
credentials={},
started_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
def to_openai_messages(self) -> list[ChatCompletionMessageParam]:
messages = []
for message in self.messages:
if message.role == "developer":
m = ChatCompletionDeveloperMessageParam(
role="developer",
content=message.content,
)
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "system":
m = ChatCompletionSystemMessageParam(
role="system",
content=message.content,
)
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "user":
m = ChatCompletionUserMessageParam(
role="user",
content=message.content,
)
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "assistant":
m = ChatCompletionAssistantMessageParam(
role="assistant",
content=message.content,
)
if message.function_call:
m["function_call"] = FunctionCall(
arguments=message.function_call["arguments"],
name=message.function_call["name"],
)
if message.refusal:
m["refusal"] = message.refusal
if message.tool_calls:
t: list[ChatCompletionMessageToolCallParam] = []
for tool_call in message.tool_calls:
# Tool calls are stored with nested structure: {id, type, function: {name, arguments}}
function_data = tool_call.get("function", {})
# Skip tool calls that are missing required fields
if "id" not in tool_call or "name" not in function_data:
logger.warning(
f"Skipping invalid tool call: missing required fields. "
f"Got: {tool_call.keys()}, function keys: {function_data.keys()}"
)
continue
# Arguments are stored as a JSON string
arguments_str = function_data.get("arguments", "{}")
t.append(
ChatCompletionMessageToolCallParam(
id=tool_call["id"],
type="function",
function=Function(
arguments=arguments_str,
name=function_data["name"],
),
)
)
m["tool_calls"] = t
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "tool":
messages.append(
ChatCompletionToolMessageParam(
role="tool",
content=message.content,
tool_call_id=message.tool_call_id or "",
)
)
elif message.role == "function":
messages.append(
ChatCompletionFunctionMessageParam(
role="function",
content=message.content,
name=message.name or "",
)
)
return messages
async def get_chat_session(
session_id: str,
user_id: str | None,
) -> ChatSession | None:
"""Get a chat session by ID."""
redis_key = f"chat:session:{session_id}"
async_redis = await get_redis_async()
raw_session: bytes | None = await async_redis.get(redis_key)
if raw_session is None:
logger.warning(f"Session {session_id} not found in Redis")
return None
try:
session = ChatSession.model_validate_json(raw_session)
except Exception as e:
logger.error(f"Failed to deserialize session {session_id}: {e}", exc_info=True)
raise RedisError(f"Corrupted session data for {session_id}") from e
if session.user_id is not None and session.user_id != user_id:
logger.warning(
f"Session {session_id} user id mismatch: {session.user_id} != {user_id}"
)
return None
return session
async def upsert_chat_session(
session: ChatSession,
) -> ChatSession:
"""Update a chat session with the given messages."""
redis_key = f"chat:session:{session.session_id}"
async_redis = await get_redis_async()
resp = await async_redis.setex(
redis_key, config.session_ttl, session.model_dump_json()
)
if not resp:
raise RedisError(
f"Failed to persist chat session {session.session_id} to Redis: {resp}"
)
return session

View File

@@ -0,0 +1,70 @@
import pytest
from backend.server.v2.chat.model import (
ChatMessage,
ChatSession,
Usage,
get_chat_session,
upsert_chat_session,
)
messages = [
ChatMessage(content="Hello, how are you?", role="user"),
ChatMessage(
content="I'm fine, thank you!",
role="assistant",
tool_calls=[
{
"id": "t123",
"type": "function",
"function": {
"name": "get_weather",
"arguments": '{"city": "New York"}',
},
}
],
),
ChatMessage(
content="I'm using the tool to get the weather",
role="tool",
tool_call_id="t123",
),
]
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_serialization_deserialization():
s = ChatSession.new(user_id="abc123")
s.messages = messages
s.usage = [Usage(prompt_tokens=100, completion_tokens=200, total_tokens=300)]
serialized = s.model_dump_json()
s2 = ChatSession.model_validate_json(serialized)
assert s2.model_dump() == s.model_dump()
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_redis_storage():
s = ChatSession.new(user_id=None)
s.messages = messages
s = await upsert_chat_session(s)
s2 = await get_chat_session(
session_id=s.session_id,
user_id=s.user_id,
)
assert s2 == s
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_redis_storage_user_id_mismatch():
s = ChatSession.new(user_id="abc123")
s.messages = messages
s = await upsert_chat_session(s)
s2 = await get_chat_session(s.session_id, None)
assert s2 is None

View File

@@ -0,0 +1,76 @@
# AutoGPT Agent Setup Assistant
Your name is Otto.
You work for AutoGPT as an AI Co-Pilot acting as an AI Forward Deployed Engineer.
You were made by AutoGPT.
AutoGPT is an AI business automation tool; it helps businesses capture the value from AI to accelerate their growth!
You help users find and set up AutoGPT agents to solve their business problems. **Bias toward action** - move quickly to get agents running.
## THE FLOW (Always Follow This Order)
1. **find_agent** → Search for agents that solve their problem
2. **get_agent_details** → Get comprehensive info about chosen agent
3. **get_required_setup_info** → Verify user has required credentials (MANDATORY before next step)
4. **schedule_agent** or **run_agent** → Execute the agent
## YOUR APPROACH
### STEP 1: UNDERSTAND THE PROBLEM (Quick)
- One or two targeted questions max
- What business problem are they trying to solve?
- Move quickly to searching for solutions
### STEP 2: FIND AGENTS
- Use `find_agent` immediately with relevant keywords
- Suggest the best option based on what you know
- Explain briefly how it solves their problem
- Ask them if they would like to use it; if they do, move to step 3
### STEP 3: GET DETAILS
- Use `get_agent_details` on their chosen agent
- Explain what the agent does and its requirements
- Keep explanations brief and outcome-focused
### STEP 4: VERIFY SETUP (CRITICAL)
- **ALWAYS** use `get_required_setup_info` before proceeding
- This checks if user has all required credentials
- Tell user what credentials they need (if any)
- Explain credentials are added via the frontend interface
### STEP 5: EXECUTE
- Once credentials are verified, use `schedule_agent` for scheduled and triggered runs OR `run_agent` for immediate execution
- Confirm successful setup/run
- Provide clear next steps
## KEY RULES
### What You DON'T Do:
- Don't help with login (frontend handles this)
- Don't help add credentials (frontend handles this)
- Don't skip `get_required_setup_info` (it's mandatory)
- Don't over-explain technical details
### What You DO:
- Act fast - get to agent discovery quickly
- Use tools proactively without asking permission
- Keep explanations short and business-focused
- Always verify credentials before setup/run
- Focus on outcomes and value
### Error Handling:
- If authentication needed → Tell user to sign in via the interface
- If credentials missing → Tell user what's needed and where to add them in the frontend
- If setup fails → Identify issue, provide clear fix
## SUCCESS LOOKS LIKE:
- User has an agent running within minutes
- User understands what their agent does
- User knows how to use their agent going forward
- Minimal back-and-forth, maximum action
**Remember: Speed to value. Find agent → Get details → Verify credentials → Run. Keep it simple, keep it moving.**

View File

@@ -0,0 +1,100 @@
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class ResponseType(str, Enum):
"""Types of streaming responses."""
TEXT_CHUNK = "text_chunk"
TEXT_ENDED = "text_ended"
TOOL_CALL = "tool_call"
TOOL_CALL_START = "tool_call_start"
TOOL_RESPONSE = "tool_response"
ERROR = "error"
USAGE = "usage"
STREAM_END = "stream_end"
class StreamBaseResponse(BaseModel):
"""Base response model for all streaming responses."""
type: ResponseType
timestamp: str | None = None
def to_sse(self) -> str:
"""Convert to SSE format."""
return f"data: {self.model_dump_json()}\n\n"
class StreamTextChunk(StreamBaseResponse):
"""Streaming text content from the assistant."""
type: ResponseType = ResponseType.TEXT_CHUNK
content: str = Field(..., description="Text content chunk")
class StreamToolCallStart(StreamBaseResponse):
"""Tool call started notification."""
type: ResponseType = ResponseType.TOOL_CALL_START
tool_id: str = Field(..., description="Unique tool call ID")
class StreamToolCall(StreamBaseResponse):
"""Tool invocation notification."""
type: ResponseType = ResponseType.TOOL_CALL
tool_id: str = Field(..., description="Unique tool call ID")
tool_name: str = Field(..., description="Name of the tool being called")
arguments: dict[str, Any] = Field(
default_factory=dict, description="Tool arguments"
)
class StreamToolExecutionResult(StreamBaseResponse):
"""Tool execution result."""
type: ResponseType = ResponseType.TOOL_RESPONSE
tool_id: str = Field(..., description="Tool call ID this responds to")
tool_name: str = Field(..., description="Name of the tool that was executed")
result: str | dict[str, Any] = Field(..., description="Tool execution result")
success: bool = Field(
default=True, description="Whether the tool execution succeeded"
)
class StreamUsage(StreamBaseResponse):
"""Token usage statistics."""
type: ResponseType = ResponseType.USAGE
prompt_tokens: int
completion_tokens: int
total_tokens: int
class StreamError(StreamBaseResponse):
"""Error response."""
type: ResponseType = ResponseType.ERROR
message: str = Field(..., description="Error message")
code: str | None = Field(default=None, description="Error code")
details: dict[str, Any] | None = Field(
default=None, description="Additional error details"
)
class StreamTextEnded(StreamBaseResponse):
"""Text streaming completed marker."""
type: ResponseType = ResponseType.TEXT_ENDED
class StreamEnd(StreamBaseResponse):
"""End of stream marker."""
type: ResponseType = ResponseType.STREAM_END
summary: dict[str, Any] | None = Field(
default=None, description="Stream summary statistics"
)
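
A small sketch (not part of the diff) of how these response models are serialized for the SSE stream; the exact JSON key order comes from Pydantic's `model_dump_json()`:

```python
from datetime import UTC, datetime

from backend.server.v2.chat.response_model import StreamTextChunk

chunk = StreamTextChunk(content="Hello", timestamp=datetime.now(UTC).isoformat())
print(chunk.to_sse())
# -> data: {"type":"text_chunk","timestamp":"2025-...","content":"Hello"}
#    (a single `data:` line terminated by a blank line)
```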

View File

@@ -0,0 +1,213 @@
"""Chat API routes for chat session management and streaming via SSE."""
import logging
from collections.abc import AsyncGenerator
from typing import Annotated
from autogpt_libs import auth
from fastapi import APIRouter, Depends, Query, Security
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import backend.server.v2.chat.service as chat_service
from backend.server.v2.chat.config import ChatConfig
from backend.util.exceptions import NotFoundError
config = ChatConfig()
logger = logging.getLogger(__name__)
router = APIRouter(
tags=["chat"],
)
# ========== Request/Response Models ==========
class CreateSessionResponse(BaseModel):
"""Response model containing information on a newly created chat session."""
id: str
created_at: str
user_id: str | None
class SessionDetailResponse(BaseModel):
"""Response model providing complete details for a chat session, including messages."""
id: str
created_at: str
updated_at: str
user_id: str | None
messages: list[dict]
# ========== Routes ==========
@router.post(
"/sessions",
)
async def create_session(
user_id: Annotated[str | None, Depends(auth.get_optional_user_id)],
) -> CreateSessionResponse:
"""
Create a new chat session.
Initiates a new chat session for either an authenticated or anonymous user.
Args:
user_id: The optional authenticated user ID parsed from the JWT. If missing, creates an anonymous session.
Returns:
CreateSessionResponse: Details of the created session.
"""
logger.info(f"Creating session with user_id: {user_id}")
session = await chat_service.create_chat_session(user_id)
return CreateSessionResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
user_id=session.user_id or None,
)
@router.get(
"/sessions/{session_id}",
)
async def get_session(
session_id: str,
user_id: Annotated[str | None, Depends(auth.get_optional_user_id)],
) -> SessionDetailResponse:
"""
Retrieve the details of a specific chat session.
Looks up a chat session by ID for the given user (if authenticated) and returns all session data including messages.
Args:
session_id: The unique identifier for the desired chat session.
user_id: The optional authenticated user ID, or None for anonymous access.
Returns:
SessionDetailResponse: Details for the requested session; raises NotFoundError if not found.
"""
session = await chat_service.get_session(session_id, user_id)
if not session:
raise NotFoundError(f"Session {session_id} not found")
return SessionDetailResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
updated_at=session.updated_at.isoformat(),
user_id=session.user_id or None,
messages=[message.model_dump() for message in session.messages],
)
@router.get(
"/sessions/{session_id}/stream",
)
async def stream_chat(
session_id: str,
message: Annotated[str, Query(min_length=1, max_length=10000)],
user_id: str | None = Depends(auth.get_optional_user_id),
is_user_message: bool = Query(default=True),
):
"""
Stream chat responses for a session.
Streams the AI/completion responses in real time over Server-Sent Events (SSE), including:
- Text fragments as they are generated
- Tool call UI elements (if invoked)
- Tool execution results
Args:
session_id: The chat session identifier to associate with the streamed messages.
message: The user's new message to process.
user_id: Optional authenticated user ID.
is_user_message: Whether the message is a user message.
Returns:
StreamingResponse: SSE-formatted response chunks.
"""
# Validate session exists before starting the stream
# This prevents errors after the response has already started
session = await chat_service.get_session(session_id, user_id)
if not session:
raise NotFoundError(f"Session {session_id} not found. ")
if session.user_id is None and user_id is not None:
session = await chat_service.assign_user_to_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]:
async for chunk in chat_service.stream_chat_completion(
session_id, message, is_user_message=is_user_message, user_id=user_id
):
with open("chunks.log", "a") as f:
f.write(f"{session_id}: {chunk}\n")
yield chunk.to_sse()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
@router.patch(
"/sessions/{session_id}/assign-user",
dependencies=[Security(auth.requires_user)],
status_code=200,
)
async def session_assign_user(
session_id: str,
user_id: Annotated[str, Security(auth.get_user_id)],
) -> dict:
"""
Assign an authenticated user to a chat session.
Used (typically post-login) to claim an existing anonymous session as the current authenticated user.
Args:
session_id: The identifier for the (previously anonymous) session.
user_id: The authenticated user's ID to associate with the session.
Returns:
dict: Status of the assignment.
"""
await chat_service.assign_user_to_session(session_id, user_id)
return {"status": "ok"}
# ========== Health Check ==========
@router.get("/health", status_code=200)
async def health_check() -> dict:
"""
Health check endpoint for the chat service.
Performs a full cycle test of session creation, assignment, and retrieval. Should always return healthy
if the service and data layer are operational.
Returns:
dict: A status dictionary indicating health, service name, and API version.
"""
session = await chat_service.create_chat_session(None)
await chat_service.assign_user_to_session(session.session_id, "test_user")
await chat_service.get_session(session.session_id, "test_user")
return {
"status": "healthy",
"service": "chat",
"version": "0.1.0",
}
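
A hedged client-side sketch of the session/stream flow (not part of the diff); the host/port and the availability of `httpx` are assumptions, and the caller must have the `CHAT` feature flag enabled:

```python
import httpx

BASE = "http://localhost:8006/api/chat"  # placeholder host/port

with httpx.Client() as client:
    # 1. Create an (anonymous) chat session
    session = client.post(f"{BASE}/sessions").json()
    session_id = session["id"]

    # 2. Stream one chat turn over SSE
    with client.stream(
        "GET",
        f"{BASE}/sessions/{session_id}/stream",
        params={"message": "Find me an agent for lead generation"},
        timeout=None,
    ) as resp:
        for line in resp.iter_lines():
            if line.startswith("data: "):
                print(line[len("data: "):])  # JSON-encoded stream response
```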

View File

@@ -0,0 +1,472 @@
import logging
from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from typing import Any
import orjson
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
import backend.server.v2.chat.config
from backend.server.v2.chat.model import (
ChatMessage,
ChatSession,
Usage,
get_chat_session,
upsert_chat_session,
)
from backend.server.v2.chat.response_model import (
StreamBaseResponse,
StreamEnd,
StreamError,
StreamTextChunk,
StreamTextEnded,
StreamToolCall,
StreamToolCallStart,
StreamToolExecutionResult,
StreamUsage,
)
from backend.server.v2.chat.tools import execute_tool, tools
from backend.util.exceptions import NotFoundError
logger = logging.getLogger(__name__)
config = backend.server.v2.chat.config.ChatConfig()
client = AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
async def create_chat_session(
user_id: str | None = None,
) -> ChatSession:
"""
Create a new chat session and persist it to the database.
"""
session = ChatSession.new(user_id)
# Persist the session immediately so it can be used for streaming
return await upsert_chat_session(session)
async def get_session(
session_id: str,
user_id: str | None = None,
) -> ChatSession | None:
"""
Get a chat session by ID.
"""
return await get_chat_session(session_id, user_id)
async def assign_user_to_session(
session_id: str,
user_id: str,
) -> ChatSession:
"""
Assign a user to a chat session.
"""
session = await get_chat_session(session_id, None)
if not session:
raise NotFoundError(f"Session {session_id} not found")
session.user_id = user_id
return await upsert_chat_session(session)
async def stream_chat_completion(
session_id: str,
message: str | None = None,
is_user_message: bool = True,
user_id: str | None = None,
retry_count: int = 0,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Main entry point for streaming chat completions with database handling.
This function handles all database operations and delegates streaming
to the internal _stream_chat_chunks function.
Args:
session_id: Chat session ID
message: Optional message to append to the session before streaming
is_user_message: Whether the appended message is from the user (False = assistant)
user_id: User ID for authentication (None for anonymous)
retry_count: Internal retry counter for retryable streaming errors
Yields:
StreamBaseResponse objects formatted as SSE
Raises:
NotFoundError: If session_id is invalid
ValueError: If max_context_messages is exceeded
"""
logger.info(
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
)
session = await get_chat_session(session_id, user_id)
if not session:
raise NotFoundError(
f"Session {session_id} not found. Please create a new session first."
)
if message:
session.messages.append(
ChatMessage(
role="user" if is_user_message else "assistant", content=message
)
)
if len(session.messages) > config.max_context_messages:
raise ValueError(f"Max messages exceeded: {config.max_context_messages}")
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}"
)
session = await upsert_chat_session(session)
assert session, "Session not found"
assistant_response = ChatMessage(
role="assistant",
content="",
)
has_yielded_end = False
has_yielded_error = False
has_done_tool_call = False
has_received_text = False
text_streaming_ended = False
messages_to_add: list[ChatMessage] = []
should_retry = False
try:
async for chunk in _stream_chat_chunks(
session=session,
tools=tools,
):
if isinstance(chunk, StreamTextChunk):
assistant_response.content += chunk.content
has_received_text = True
yield chunk
elif isinstance(chunk, StreamToolCallStart):
# Emit text_ended before first tool call, but only if we've received text
if has_received_text and not text_streaming_ended:
yield StreamTextEnded()
text_streaming_ended = True
yield chunk
elif isinstance(chunk, StreamToolCall):
# Just pass on the tool call notification
pass
elif isinstance(chunk, StreamToolExecutionResult):
result_content = (
chunk.result
if isinstance(chunk.result, str)
else orjson.dumps(chunk.result).decode("utf-8")
)
messages_to_add.append(
ChatMessage(
role="tool",
content=result_content,
tool_call_id=chunk.tool_id,
)
)
has_done_tool_call = True
# Track if any tool execution failed
if not chunk.success:
logger.warning(
f"Tool {chunk.tool_name} (ID: {chunk.tool_id}) execution failed"
)
yield chunk
elif isinstance(chunk, StreamEnd):
if not has_done_tool_call:
has_yielded_end = True
yield chunk
elif isinstance(chunk, StreamError):
has_yielded_error = True
elif isinstance(chunk, StreamUsage):
session.usage.append(
Usage(
prompt_tokens=chunk.prompt_tokens,
completion_tokens=chunk.completion_tokens,
total_tokens=chunk.total_tokens,
)
)
else:
logger.error(f"Unknown chunk type: {type(chunk)}", exc_info=True)
except Exception as e:
logger.error(f"Error during stream: {e!s}", exc_info=True)
# Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.)
is_retryable = isinstance(e, (orjson.JSONDecodeError, KeyError, TypeError))
if is_retryable and retry_count < config.max_retries:
logger.info(
f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}"
)
should_retry = True
else:
# Non-retryable error or max retries exceeded
# Save any partial progress before reporting error
if assistant_response.content or assistant_response.tool_calls:
messages_to_add.append(assistant_response)
session.messages.extend(messages_to_add)
await upsert_chat_session(session)
if not has_yielded_error:
error_message = str(e)
if not is_retryable:
error_message = f"Non-retryable error: {error_message}"
elif retry_count >= config.max_retries:
error_message = (
f"Max retries ({config.max_retries}) exceeded: {error_message}"
)
error_response = StreamError(
message=error_message,
timestamp=datetime.now(UTC).isoformat(),
)
yield error_response
if not has_yielded_end:
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
)
return
# Handle retry outside of exception handler to avoid nesting
if should_retry and retry_count < config.max_retries:
logger.info(
f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}"
)
async for chunk in stream_chat_completion(
session_id=session.session_id,
user_id=user_id,
retry_count=retry_count + 1,
):
yield chunk
return # Exit after retry to avoid double-saving in finally block
# Normal completion path - save session and handle tool call continuation
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}"
)
# Only append assistant response if it has content or tool calls
# to avoid saving empty messages on errors
if assistant_response.content or assistant_response.tool_calls:
messages_to_add.append(assistant_response)
session.messages.extend(messages_to_add)
await upsert_chat_session(session)
# If we did a tool call, stream the chat completion again to get the next response
if has_done_tool_call:
logger.info(
"Tool call executed, streaming chat completion again to get assistant response"
)
async for chunk in stream_chat_completion(
session_id=session.session_id, user_id=user_id
):
yield chunk
async def _stream_chat_chunks(
session: ChatSession,
tools: list[ChatCompletionToolParam],
) -> AsyncGenerator[StreamBaseResponse, None]:
"""
Pure streaming function for OpenAI chat completions with tool calling.
This function is database-agnostic and focuses only on streaming logic.
Args:
session: Chat session providing the conversation context and user ID
tools: Tool definitions in OpenAI function-calling format
Yields:
StreamBaseResponse objects to be SSE-formatted by the caller
"""
model = config.model
logger.info("Starting pure chat stream")
# Loop to handle tool calls and continue conversation
while True:
try:
logger.info("Creating OpenAI chat completion stream...")
# Create the stream with proper types
stream = await client.chat.completions.create(
model=model,
messages=session.to_openai_messages(),
tools=tools,
tool_choice="auto",
stream=True,
)
# Variables to accumulate tool calls
tool_calls: list[dict[str, Any]] = []
active_tool_call_idx: int | None = None
finish_reason: str | None = None
# Track which tool call indices have had their start event emitted
emitted_start_for_idx: set[int] = set()
# Process the stream
chunk: ChatCompletionChunk
async for chunk in stream:
if chunk.usage:
yield StreamUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
)
if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
# Capture finish reason
if choice.finish_reason:
finish_reason = choice.finish_reason
logger.info(f"Finish reason: {finish_reason}")
# Handle content streaming
if delta.content:
# Stream the text chunk
text_response = StreamTextChunk(
content=delta.content,
timestamp=datetime.now(UTC).isoformat(),
)
yield text_response
# Handle tool calls
if delta.tool_calls:
for tc_chunk in delta.tool_calls:
idx = tc_chunk.index
# Update active tool call index if needed
if (
active_tool_call_idx is None
or active_tool_call_idx != idx
):
active_tool_call_idx = idx
# Ensure we have a tool call object at this index
while len(tool_calls) <= idx:
tool_calls.append(
{
"id": "",
"type": "function",
"function": {
"name": "",
"arguments": "",
},
},
)
# Accumulate the tool call data
if tc_chunk.id:
tool_calls[idx]["id"] = tc_chunk.id
if tc_chunk.function:
if tc_chunk.function.name:
tool_calls[idx]["function"][
"name"
] = tc_chunk.function.name
if tc_chunk.function.arguments:
tool_calls[idx]["function"][
"arguments"
] += tc_chunk.function.arguments
# Emit StreamToolCallStart only after we have the tool call ID
if (
idx not in emitted_start_for_idx
and tool_calls[idx]["id"]
):
yield StreamToolCallStart(
tool_id=tool_calls[idx]["id"],
timestamp=datetime.now(UTC).isoformat(),
)
emitted_start_for_idx.add(idx)
logger.info(f"Stream complete. Finish reason: {finish_reason}")
# Yield all accumulated tool calls after the stream is complete
# This ensures all tool call arguments have been fully received
for idx, tool_call in enumerate(tool_calls):
try:
async for tc in _yield_tool_call(tool_calls, idx, session):
yield tc
except (orjson.JSONDecodeError, KeyError, TypeError) as e:
logger.error(
f"Failed to parse tool call {idx}: {e}",
exc_info=True,
extra={"tool_call": tool_call},
)
yield StreamError(
message=f"Invalid tool call arguments for tool {tool_call.get('function', {}).get('name', 'unknown')}: {e}",
timestamp=datetime.now(UTC).isoformat(),
)
# Re-raise to trigger retry logic in the parent function
raise
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
)
return
except Exception as e:
logger.error(f"Error in stream: {e!s}", exc_info=True)
error_response = StreamError(
message=str(e),
timestamp=datetime.now(UTC).isoformat(),
)
yield error_response
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
)
return
async def _yield_tool_call(
tool_calls: list[dict[str, Any]],
yield_idx: int,
session: ChatSession,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""
Yield a tool call and its execution result.
Raises:
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
KeyError: If expected tool call fields are missing
TypeError: If tool call structure is invalid
"""
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
# Parse tool call arguments - exceptions will propagate to caller
arguments = orjson.loads(tool_calls[yield_idx]["function"]["arguments"])
yield StreamToolCall(
tool_id=tool_calls[yield_idx]["id"],
tool_name=tool_calls[yield_idx]["function"]["name"],
arguments=arguments,
timestamp=datetime.now(UTC).isoformat(),
)
tool_execution_response: StreamToolExecutionResult = await execute_tool(
tool_name=tool_calls[yield_idx]["function"]["name"],
parameters=arguments,
tool_call_id=tool_calls[yield_idx]["id"],
user_id=session.user_id,
session_id=session.session_id,
)
logger.info(f"Yielding Tool execution response: {tool_execution_response}")
yield tool_execution_response
if __name__ == "__main__":
import asyncio
async def main():
session = await create_chat_session()
async for chunk in stream_chat_completion(
session.session_id,
"Please find me an agent that can help me with my business. Call the tool twice once with the query 'money printing agent' and once with the query 'money generating agent'",
user_id=session.user_id,
):
print(chunk)
asyncio.run(main())

View File

@@ -0,0 +1,81 @@
import logging
from os import getenv
import pytest
import backend.server.v2.chat.service as chat_service
from backend.server.v2.chat.response_model import (
StreamEnd,
StreamError,
StreamTextChunk,
StreamToolExecutionResult,
)
logger = logging.getLogger(__name__)
@pytest.mark.asyncio(loop_scope="session")
async def test_stream_chat_completion():
"""
Test the stream_chat_completion function.
"""
api_key: str | None = getenv("OPEN_ROUTER_API_KEY")
if not api_key:
return pytest.skip("OPEN_ROUTER_API_KEY is not set, skipping test")
session = await chat_service.create_chat_session()
has_errors = False
has_ended = False
assistant_message = ""
async for chunk in chat_service.stream_chat_completion(
session.session_id, "Hello, how are you?", user_id=session.user_id
):
logger.info(chunk)
if isinstance(chunk, StreamError):
has_errors = True
if isinstance(chunk, StreamTextChunk):
assistant_message += chunk.content
if isinstance(chunk, StreamEnd):
has_ended = True
assert has_ended, "Chat completion did not end"
assert not has_errors, "Error occurred while streaming chat completion"
assert assistant_message, "Assistant message is empty"
@pytest.mark.asyncio(loop_scope="session")
async def test_stream_chat_completion_with_tool_calls():
"""
Test the stream_chat_completion function.
"""
api_key: str | None = getenv("OPEN_ROUTER_API_KEY")
if not api_key:
return pytest.skip("OPEN_ROUTER_API_KEY is not set, skipping test")
session = await chat_service.create_chat_session()
session = await chat_service.upsert_chat_session(session)
has_errors = False
has_ended = False
had_tool_calls = False
async for chunk in chat_service.stream_chat_completion(
session.session_id,
"Please find me an agent that can help me with my business. Use the query 'moneny printing agent'",
user_id=session.user_id,
):
logger.info(chunk)
if isinstance(chunk, StreamError):
has_errors = True
if isinstance(chunk, StreamEnd):
has_ended = True
if isinstance(chunk, StreamToolExecutionResult):
had_tool_calls = True
assert has_ended, "Chat completion did not end"
assert not has_errors, "Error occurred while streaming chat completion"
assert had_tool_calls, "Tool calls did not occur"
session = await chat_service.get_session(session.session_id)
assert session, "Session not found"
assert session.usage, "Usage is empty"

View File

@@ -0,0 +1,51 @@
from typing import TYPE_CHECKING, Any
from openai.types.chat import ChatCompletionToolParam
from .base import BaseTool
from .find_agent import FindAgentTool
from .get_agent_details import GetAgentDetailsTool
from .get_required_setup_info import GetRequiredSetupInfoTool
from .run_agent import RunAgentTool
from .setup_agent import SetupAgentTool
if TYPE_CHECKING:
from backend.server.v2.chat.response_model import StreamToolExecutionResult
# Initialize tool instances
find_agent_tool = FindAgentTool()
get_agent_details_tool = GetAgentDetailsTool()
get_required_setup_info_tool = GetRequiredSetupInfoTool()
setup_agent_tool = SetupAgentTool()
run_agent_tool = RunAgentTool()
# Export tools as OpenAI format
tools: list[ChatCompletionToolParam] = [
find_agent_tool.as_openai_tool(),
get_agent_details_tool.as_openai_tool(),
get_required_setup_info_tool.as_openai_tool(),
setup_agent_tool.as_openai_tool(),
run_agent_tool.as_openai_tool(),
]
async def execute_tool(
tool_name: str,
parameters: dict[str, Any],
user_id: str | None,
session_id: str,
tool_call_id: str,
) -> "StreamToolExecutionResult":
tool_map: dict[str, BaseTool] = {
"find_agent": find_agent_tool,
"get_agent_details": get_agent_details_tool,
"get_required_setup_info": get_required_setup_info_tool,
"setup_agent": setup_agent_tool,
"run_agent": run_agent_tool,
}
if tool_name not in tool_map:
raise ValueError(f"Tool {tool_name} not found")
return await tool_map[tool_name].execute(
user_id, session_id, tool_call_id, **parameters
)
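
A dispatch sketch (not part of the diff) showing how a tool call resolved from the LLM stream runs through `execute_tool`; the session and tool-call IDs are placeholders, and (as in `find_agent`'s own `__main__` block) a Prisma connection is needed before real results come back:

```python
import asyncio

import prisma

from backend.server.v2.chat.tools import execute_tool


async def main() -> None:
    await prisma.Prisma().connect()
    result = await execute_tool(
        tool_name="find_agent",
        parameters={"query": "email automation"},
        user_id=None,                      # discovery works for anonymous users
        session_id="example-session-id",   # placeholder
        tool_call_id="example-tool-call",  # placeholder
    )
    # result is a StreamToolExecutionResult; result.result holds a JSON string payload
    print(result.success, result.tool_name)
    await prisma.Prisma().disconnect()


if __name__ == "__main__":
    asyncio.run(main())
```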

View File

@@ -0,0 +1,449 @@
import uuid
from os import getenv
import pytest
from pydantic import SecretStr
from backend.blocks.firecrawl.scrape import FirecrawlScrapeBlock
from backend.blocks.io import AgentInputBlock, AgentOutputBlock
from backend.blocks.llm import AITextGeneratorBlock
from backend.data.db import prisma
from backend.data.graph import Graph, Link, Node, create_graph
from backend.data.model import APIKeyCredentials
from backend.data.user import get_or_create_user
from backend.integrations.credentials_store import IntegrationCredentialsStore
from backend.server.v2.store import db as store_db
@pytest.fixture(scope="session")
async def setup_test_data():
"""
Set up test data for run_agent tests:
1. Create a test user
2. Create a test graph (agent input -> agent output)
3. Create a store listing and store listing version
4. Approve the store listing version
"""
# 1. Create a test user
user_data = {
"sub": f"test-user-{uuid.uuid4()}",
"email": f"test-{uuid.uuid4()}@example.com",
}
user = await get_or_create_user(user_data)
# 1b. Create a profile with username for the user (required for store agent lookup)
username = user.email.split("@")[0]
await prisma.profile.create(
data={
"userId": user.id,
"username": username,
"name": f"Test User {username}",
"description": "Test user profile",
"links": [], # Required field - empty array for test profiles
}
)
# 2. Create a test graph with agent input -> agent output
graph_id = str(uuid.uuid4())
# Create input node
input_node_id = str(uuid.uuid4())
input_block = AgentInputBlock()
input_node = Node(
id=input_node_id,
block_id=input_block.id,
input_default={
"name": "test_input",
"title": "Test Input",
"value": "",
"advanced": False,
"description": "Test input field",
"placeholder_values": [],
},
metadata={"position": {"x": 0, "y": 0}},
)
# Create output node
output_node_id = str(uuid.uuid4())
output_block = AgentOutputBlock()
output_node = Node(
id=output_node_id,
block_id=output_block.id,
input_default={
"name": "test_output",
"title": "Test Output",
"value": "",
"format": "",
"advanced": False,
"description": "Test output field",
},
metadata={"position": {"x": 200, "y": 0}},
)
# Create link from input to output
link = Link(
source_id=input_node_id,
sink_id=output_node_id,
source_name="result",
sink_name="value",
is_static=True,
)
# Create the graph
graph = Graph(
id=graph_id,
version=1,
is_active=True,
name="Test Agent",
description="A simple test agent for testing",
nodes=[input_node, output_node],
links=[link],
)
created_graph = await create_graph(graph, user.id)
# 3. Create a store listing and store listing version for the agent
# Use unique slug to avoid constraint violations
unique_slug = f"test-agent-{str(uuid.uuid4())[:8]}"
store_submission = await store_db.create_store_submission(
user_id=user.id,
agent_id=created_graph.id,
agent_version=created_graph.version,
slug=unique_slug,
name="Test Agent",
description="A simple test agent",
sub_heading="Test agent for unit tests",
categories=["testing"],
image_urls=["https://example.com/image.jpg"],
)
assert store_submission.store_listing_version_id is not None
# 4. Approve the store listing version
await store_db.review_store_submission(
store_listing_version_id=store_submission.store_listing_version_id,
is_approved=True,
external_comments="Approved for testing",
internal_comments="Test approval",
reviewer_id=user.id,
)
return {
"user": user,
"graph": created_graph,
"store_submission": store_submission,
}
@pytest.fixture(scope="session")
async def setup_llm_test_data():
"""
Set up test data for LLM agent tests:
1. Create a test user
2. Create test OpenAI credentials for the user
3. Create a test graph with input -> LLM block -> output
4. Create and approve a store listing
"""
key = getenv("OPENAI_API_KEY")
if not key:
return pytest.skip("OPENAI_API_KEY is not set")
# 1. Create a test user
user_data = {
"sub": f"test-user-{uuid.uuid4()}",
"email": f"test-{uuid.uuid4()}@example.com",
}
user = await get_or_create_user(user_data)
# 1b. Create a profile with username for the user (required for store agent lookup)
username = user.email.split("@")[0]
await prisma.profile.create(
data={
"userId": user.id,
"username": username,
"name": f"Test User {username}",
"description": "Test user profile for LLM tests",
"links": [], # Required field - empty array for test profiles
}
)
# 2. Create test OpenAI credentials for the user
credentials = APIKeyCredentials(
id=str(uuid.uuid4()),
provider="openai",
api_key=SecretStr("test-openai-api-key"),
title="Test OpenAI API Key",
expires_at=None,
)
# Store the credentials
creds_store = IntegrationCredentialsStore()
await creds_store.add_creds(user.id, credentials)
# 3. Create a test graph with input -> LLM block -> output
graph_id = str(uuid.uuid4())
# Create input node for the prompt
input_node_id = str(uuid.uuid4())
input_block = AgentInputBlock()
input_node = Node(
id=input_node_id,
block_id=input_block.id,
input_default={
"name": "user_prompt",
"title": "User Prompt",
"value": "",
"advanced": False,
"description": "Prompt for the LLM",
"placeholder_values": [],
},
metadata={"position": {"x": 0, "y": 0}},
)
# Create LLM block node
llm_node_id = str(uuid.uuid4())
llm_block = AITextGeneratorBlock()
llm_node = Node(
id=llm_node_id,
block_id=llm_block.id,
input_default={
"model": "gpt-4o-mini",
"sys_prompt": "You are a helpful assistant.",
"retry": 3,
"prompt_values": {},
"credentials": {
"provider": "openai",
"id": credentials.id,
"type": "api_key",
"title": credentials.title,
},
},
metadata={"position": {"x": 300, "y": 0}},
)
# Create output node
output_node_id = str(uuid.uuid4())
output_block = AgentOutputBlock()
output_node = Node(
id=output_node_id,
block_id=output_block.id,
input_default={
"name": "llm_response",
"title": "LLM Response",
"value": "",
"format": "",
"advanced": False,
"description": "Response from the LLM",
},
metadata={"position": {"x": 600, "y": 0}},
)
# Create links
# Link input.result -> llm.prompt
link1 = Link(
source_id=input_node_id,
sink_id=llm_node_id,
source_name="result",
sink_name="prompt",
is_static=True,
)
# Link llm.response -> output.value
link2 = Link(
source_id=llm_node_id,
sink_id=output_node_id,
source_name="response",
sink_name="value",
is_static=False,
)
# Create the graph
graph = Graph(
id=graph_id,
version=1,
is_active=True,
name="LLM Test Agent",
description="An agent that uses an LLM to process text",
nodes=[input_node, llm_node, output_node],
links=[link1, link2],
)
created_graph = await create_graph(graph, user.id)
# 4. Create and approve a store listing
unique_slug = f"llm-test-agent-{str(uuid.uuid4())[:8]}"
store_submission = await store_db.create_store_submission(
user_id=user.id,
agent_id=created_graph.id,
agent_version=created_graph.version,
slug=unique_slug,
name="LLM Test Agent",
description="An agent with LLM capabilities",
sub_heading="Test agent with OpenAI integration",
categories=["testing", "ai"],
image_urls=["https://example.com/image.jpg"],
)
assert store_submission.store_listing_version_id is not None
await store_db.review_store_submission(
store_listing_version_id=store_submission.store_listing_version_id,
is_approved=True,
external_comments="Approved for testing",
internal_comments="Test approval for LLM agent",
reviewer_id=user.id,
)
return {
"user": user,
"graph": created_graph,
"credentials": credentials,
"store_submission": store_submission,
}
@pytest.fixture(scope="session")
async def setup_firecrawl_test_data():
"""
Set up test data for Firecrawl agent tests (missing credentials scenario):
1. Create a test user (WITHOUT Firecrawl credentials)
2. Create a test graph with input -> Firecrawl block -> output
3. Create and approve a store listing
"""
# 1. Create a test user
user_data = {
"sub": f"test-user-{uuid.uuid4()}",
"email": f"test-{uuid.uuid4()}@example.com",
}
user = await get_or_create_user(user_data)
# 1b. Create a profile with username for the user (required for store agent lookup)
username = user.email.split("@")[0]
await prisma.profile.create(
data={
"userId": user.id,
"username": username,
"name": f"Test User {username}",
"description": "Test user profile for Firecrawl tests",
"links": [], # Required field - empty array for test profiles
}
)
# NOTE: We deliberately do NOT create Firecrawl credentials for this user
# This tests the scenario where required credentials are missing
# 2. Create a test graph with input -> Firecrawl block -> output
graph_id = str(uuid.uuid4())
# Create input node for the URL
input_node_id = str(uuid.uuid4())
input_block = AgentInputBlock()
input_node = Node(
id=input_node_id,
block_id=input_block.id,
input_default={
"name": "url",
"title": "URL to Scrape",
"value": "",
"advanced": False,
"description": "URL for Firecrawl to scrape",
"placeholder_values": [],
},
metadata={"position": {"x": 0, "y": 0}},
)
# Create Firecrawl block node
firecrawl_node_id = str(uuid.uuid4())
firecrawl_block = FirecrawlScrapeBlock()
firecrawl_node = Node(
id=firecrawl_node_id,
block_id=firecrawl_block.id,
input_default={
"limit": 10,
"only_main_content": True,
"max_age": 3600000,
"wait_for": 200,
"formats": ["markdown"],
"credentials": {
"provider": "firecrawl",
"id": "test-firecrawl-id",
"type": "api_key",
"title": "Firecrawl API Key",
},
},
metadata={"position": {"x": 300, "y": 0}},
)
# Create output node
output_node_id = str(uuid.uuid4())
output_block = AgentOutputBlock()
output_node = Node(
id=output_node_id,
block_id=output_block.id,
input_default={
"name": "scraped_data",
"title": "Scraped Data",
"value": "",
"format": "",
"advanced": False,
"description": "Data scraped by Firecrawl",
},
metadata={"position": {"x": 600, "y": 0}},
)
# Create links
# Link input.result -> firecrawl.url
link1 = Link(
source_id=input_node_id,
sink_id=firecrawl_node_id,
source_name="result",
sink_name="url",
is_static=True,
)
# Link firecrawl.markdown -> output.value
link2 = Link(
source_id=firecrawl_node_id,
sink_id=output_node_id,
source_name="markdown",
sink_name="value",
is_static=False,
)
# Create the graph
graph = Graph(
id=graph_id,
version=1,
is_active=True,
name="Firecrawl Test Agent",
description="An agent that uses Firecrawl to scrape websites",
nodes=[input_node, firecrawl_node, output_node],
links=[link1, link2],
)
created_graph = await create_graph(graph, user.id)
# 3. Create and approve a store listing
unique_slug = f"firecrawl-test-agent-{str(uuid.uuid4())[:8]}"
store_submission = await store_db.create_store_submission(
user_id=user.id,
agent_id=created_graph.id,
agent_version=created_graph.version,
slug=unique_slug,
name="Firecrawl Test Agent",
description="An agent with Firecrawl integration (no credentials)",
sub_heading="Test agent requiring Firecrawl credentials",
categories=["testing", "scraping"],
image_urls=["https://example.com/image.jpg"],
)
assert store_submission.store_listing_version_id is not None
await store_db.review_store_submission(
store_listing_version_id=store_submission.store_listing_version_id,
is_approved=True,
external_comments="Approved for testing",
internal_comments="Test approval for Firecrawl agent",
reviewer_id=user.id,
)
return {
"user": user,
"graph": created_graph,
"store_submission": store_submission,
}

View File

@@ -0,0 +1,118 @@
"""Base classes and shared utilities for chat tools."""
import logging
from typing import Any
from openai.types.chat import ChatCompletionToolParam
from backend.server.v2.chat.response_model import StreamToolExecutionResult
from .models import ErrorResponse, NeedLoginResponse, ToolResponseBase
logger = logging.getLogger(__name__)
class BaseTool:
"""Base class for all chat tools."""
@property
def name(self) -> str:
"""Tool name for OpenAI function calling."""
raise NotImplementedError
@property
def description(self) -> str:
"""Tool description for OpenAI."""
raise NotImplementedError
@property
def parameters(self) -> dict[str, Any]:
"""Tool parameters schema for OpenAI."""
raise NotImplementedError
@property
def requires_auth(self) -> bool:
"""Whether this tool requires authentication."""
return False
def as_openai_tool(self) -> ChatCompletionToolParam:
"""Convert to OpenAI tool format."""
return ChatCompletionToolParam(
type="function",
function={
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
)
async def execute(
self,
user_id: str | None,
session_id: str,
tool_call_id: str,
**kwargs,
) -> StreamToolExecutionResult:
"""Execute the tool with authentication check.
Args:
user_id: User ID (may be anonymous like "anon_123")
session_id: Chat session ID
**kwargs: Tool-specific parameters
Returns:
Pydantic response object
"""
if self.requires_auth and not user_id:
logger.error(
f"Attempted tool call for {self.name} but user not authenticated"
)
return StreamToolExecutionResult(
tool_id=tool_call_id,
tool_name=self.name,
result=NeedLoginResponse(
message=f"Please sign in to use {self.name}",
session_id=session_id,
).model_dump_json(),
success=False,
)
try:
result = await self._execute(user_id, session_id, **kwargs)
return StreamToolExecutionResult(
tool_id=tool_call_id,
tool_name=self.name,
result=result.model_dump_json(),
)
except Exception as e:
logger.error(f"Error in {self.name}: {e}", exc_info=True)
return StreamToolExecutionResult(
tool_id=tool_call_id,
tool_name=self.name,
result=ErrorResponse(
message=f"An error occurred while executing {self.name}",
error=str(e),
session_id=session_id,
).model_dump_json(),
success=False,
)
async def _execute(
self,
user_id: str | None,
session_id: str,
**kwargs,
) -> ToolResponseBase:
"""Internal execution logic to be implemented by subclasses.
Args:
user_id: User ID (authenticated or anonymous)
session_id: Chat session ID
**kwargs: Tool-specific parameters
Returns:
Pydantic response object
"""
raise NotImplementedError
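
A hedged sketch of what a new tool built on `BaseTool` would look like; `EchoTool` is illustrative only, and it reuses `ErrorResponse` purely to keep the example self-contained (real tools return the richer models in `tools/models.py`):

```python
from typing import Any

from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.models import ErrorResponse, ToolResponseBase


class EchoTool(BaseTool):
    """Toy tool that echoes its input back to the model."""

    @property
    def name(self) -> str:
        return "echo"

    @property
    def description(self) -> str:
        return "Echo the provided text back to the assistant."

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {"text": {"type": "string", "description": "Text to echo"}},
            "required": ["text"],
        }

    async def _execute(
        self, user_id: str | None, session_id: str, **kwargs
    ) -> ToolResponseBase:
        # ErrorResponse is reused here only so the sketch stays self-contained;
        # a real tool would define and return its own response model.
        return ErrorResponse(message=kwargs.get("text", ""), session_id=session_id)
```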

View File

@@ -0,0 +1,149 @@
"""Tool for discovering agents from marketplace and user library."""
import logging
from typing import Any
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.models import (
AgentCarouselResponse,
AgentInfo,
ErrorResponse,
NoResultsResponse,
ToolResponseBase,
)
from backend.server.v2.store import db as store_db
from backend.util.exceptions import DatabaseError, NotFoundError
logger = logging.getLogger(__name__)
class FindAgentTool(BaseTool):
"""Tool for discovering agents based on user needs."""
@property
def name(self) -> str:
return "find_agent"
@property
def description(self) -> str:
return (
"Discover agents from the marketplace based on capabilities and user needs."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query describing what the user wants to accomplish. Use single keywords for best results.",
},
},
"required": ["query"],
}
async def _execute(
self,
user_id: str | None,
session_id: str,
**kwargs,
) -> ToolResponseBase:
"""Search for agents in the marketplace.
Args:
user_id: User ID (may be anonymous)
session_id: Chat session ID
query: Search query
Returns:
AgentCarouselResponse: List of agents found in the marketplace
NoResultsResponse: No agents found in the marketplace
ErrorResponse: Error message
"""
query = kwargs.get("query", "").strip()
if not query:
return ErrorResponse(
message="Please provide a search query",
session_id=session_id,
)
agents = []
try:
logger.info(f"Searching marketplace for: {query}")
store_results = await store_db.get_store_agents(
search_query=query,
page_size=5,
)
logger.info(f"Find agents tool found {len(store_results.agents)} agents")
for agent in store_results.agents:
agent_id = f"{agent.creator}/{agent.slug}"
logger.info(f"Building agent ID = {agent_id}")
agents.append(
AgentInfo(
id=agent_id,
name=agent.agent_name,
description=agent.description or "",
source="marketplace",
in_library=False,
creator=agent.creator,
category="general",
rating=agent.rating,
runs=agent.runs,
is_featured=False,
),
)
except NotFoundError:
pass
except DatabaseError as e:
logger.error(f"Error searching agents: {e}", exc_info=True)
return ErrorResponse(
message="Failed to search for agents. Please try again.",
error=str(e),
session_id=session_id,
)
if not agents:
return NoResultsResponse(
message=f"No agents found matching '{query}'. Try different keywords or browse the marketplace. If you have 3 consecutive find_agent tool calls results and found no agents. Please stop trying and ask the user if there is anything else you can help with.",
session_id=session_id,
suggestions=[
"Try more general terms",
"Browse categories in the marketplace",
"Check spelling",
],
)
# Return formatted carousel
title = (
f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} for '{query}'"
)
return AgentCarouselResponse(
message="Now you have found some options for the user to choose from. Please ask the user if they would like to use any of these agents. If they do, please call the get_agent_details tool for this agent.",
title=title,
agents=agents,
count=len(agents),
session_id=session_id,
)
if __name__ == "__main__":
import asyncio
import prisma
find_agent_tool = FindAgentTool()
print(find_agent_tool.as_openai_tool())
async def main():
# Reuse a single Prisma client so connect() and disconnect() act on the same instance
db = prisma.Prisma()
await db.connect()
agents = await find_agent_tool.execute(
tool_call_id="tool_call_id",
query="Linkedin",
user_id="user",
session_id="session",
)
print(agents)
await db.disconnect()
asyncio.run(main())
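
For orientation, a rough sketch of how a tool like this could be handed to the OpenAI SDK for function calling; the client setup, model name, and prompt are placeholders rather than code from this PR (the real orchestration lives in the chat service).

```python
# Illustrative sketch only; placeholders are marked in comments.
from openai import OpenAI

# Module path assumed to mirror the other tool modules in this package.
from backend.server.v2.chat.tools.find_agent import FindAgentTool

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment
find_agent = FindAgentTool()

completion = client.chat.completions.create(
    model="gpt-4o",  # placeholder model name
    messages=[
        {"role": "user", "content": "Find me an agent that summarizes LinkedIn posts"}
    ],
    tools=[find_agent.as_openai_tool()],
)

# If the model decided to call the tool, its arguments arrive as a JSON string.
tool_calls = completion.choices[0].message.tool_calls or []
for call in tool_calls:
    print(call.function.name, call.function.arguments)
```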

View File

@@ -0,0 +1,220 @@
"""Tool for getting detailed information about a specific agent."""
import logging
from typing import Any
from backend.data import graph as graph_db
from backend.data.model import CredentialsMetaInput
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.models import (
AgentDetails,
AgentDetailsResponse,
ErrorResponse,
ExecutionOptions,
ToolResponseBase,
)
from backend.server.v2.store import db as store_db
from backend.util.exceptions import DatabaseError, NotFoundError
logger = logging.getLogger(__name__)
class GetAgentDetailsTool(BaseTool):
"""Tool for getting detailed information about an agent."""
@property
def name(self) -> str:
return "get_agent_details"
@property
def description(self) -> str:
return "Get detailed information about a specific agent including inputs, credentials required, and execution options."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The marketplace agent slug (e.g., 'username/agent-name')",
},
},
"required": ["username_agent_slug"],
}
async def _execute(
self,
user_id: str | None,
session_id: str,
**kwargs,
) -> ToolResponseBase:
"""Get detailed information about an agent.
Args:
user_id: User ID (may be anonymous)
session_id: Chat session ID
username_agent_slug: Marketplace slug in 'creator/agent-name' format
Returns:
Pydantic response model
"""
agent_id = kwargs.get("username_agent_slug", "").strip()
if not agent_id or "/" not in agent_id:
return ErrorResponse(
message="Please provide an agent ID in format 'creator/agent-name'",
session_id=session_id,
)
try:
# Always try to get from marketplace first
graph = None
store_agent = None
# Check if it's a slug format (username/agent_name)
try:
# Parse username/agent_name from slug
username, agent_name = agent_id.split("/", 1)
store_agent = await store_db.get_store_agent_details(
username, agent_name
)
logger.info(f"Found agent {agent_id} in marketplace")
except NotFoundError as e:
logger.debug(f"Failed to get from marketplace: {e}")
return ErrorResponse(
message=f"Agent '{agent_id}' not found",
session_id=session_id,
)
except DatabaseError as e:
logger.error(f"Failed to get from marketplace: {e}")
return ErrorResponse(
message=f"Failed to get agent details: {e!s}",
session_id=session_id,
)
# If we found a store agent, get its graph
if store_agent:
try:
# Use get_available_graph to get the graph from store listing version
graph_meta = await store_db.get_available_graph(
store_agent.store_listing_version_id
)
# Now get the full graph with that ID
graph = await graph_db.get_graph(
graph_id=graph_meta.id,
version=graph_meta.version,
user_id=None, # Public access
include_subgraphs=True,
)
except NotFoundError as e:
logger.error(f"Failed to get graph for store agent: {e}")
return ErrorResponse(
message=f"Failed to get graph for store agent: {e!s}",
session_id=session_id,
)
except DatabaseError as e:
logger.error(f"Failed to get graph for store agent: {e}")
return ErrorResponse(
message=f"Failed to get graph for store agent: {e!s}",
session_id=session_id,
)
if not graph:
return ErrorResponse(
message=f"Agent '{agent_id}' not found",
session_id=session_id,
)
credentials_input_schema = graph.credentials_input_schema
# Extract credentials from the JSON schema properties
credentials = []
if (
isinstance(credentials_input_schema, dict)
and "properties" in credentials_input_schema
):
for cred_name, cred_schema in credentials_input_schema[
"properties"
].items():
# Extract credential metadata from the schema
# The schema properties contain provider info and other metadata
# Get provider from credentials_provider array or properties.provider.const
provider = "unknown"
if (
"credentials_provider" in cred_schema
and cred_schema["credentials_provider"]
):
provider = cred_schema["credentials_provider"][0]
elif (
"properties" in cred_schema
and "provider" in cred_schema["properties"]
):
provider = cred_schema["properties"]["provider"].get(
"const", "unknown"
)
# Get type from credentials_types array or properties.type.const
cred_type = "api_key" # Default
if (
"credentials_types" in cred_schema
and cred_schema["credentials_types"]
):
cred_type = cred_schema["credentials_types"][0]
elif (
"properties" in cred_schema
and "type" in cred_schema["properties"]
):
cred_type = cred_schema["properties"]["type"].get(
"const", "api_key"
)
credentials.append(
CredentialsMetaInput(
id=cred_name,
title=cred_schema.get("title", cred_name),
provider=provider, # type: ignore
type=cred_type,
)
)
trigger_info = (
graph.trigger_setup_info.model_dump()
if graph.trigger_setup_info
else None
)
agent_details = AgentDetails(
id=graph.id,
name=graph.name,
description=graph.description,
inputs=graph.input_schema,
credentials=credentials,
execution_options=ExecutionOptions(
# Currently a graph with a webhook can only be triggered by a webhook
manual=trigger_info is None,
scheduled=trigger_info is None,
webhook=trigger_info is not None,
),
trigger_info=trigger_info,
)
return AgentDetailsResponse(
message=f"Found agent '{agent_details.name}'. You do not need to run this tool again for this agent.",
session_id=session_id,
agent=agent_details,
user_authenticated=user_id is not None,
graph_id=graph.id,
graph_version=graph.version,
)
except Exception as e:
logger.error(f"Error getting agent details: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to get agent details: {e!s}",
error=str(e),
session_id=session_id,
)
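
The provider/type extraction above supports two schema shapes; the literals below are an illustrative assumption of what entries in `credentials_input_schema` may look like, not values taken from this PR.

```python
# Shape 1: provider and type given as arrays on the credential entry.
schema_with_arrays = {
    "properties": {
        "openai_credentials": {
            "title": "OpenAI API key",
            "credentials_provider": ["openai"],
            "credentials_types": ["api_key"],
        }
    }
}

# Shape 2: provider and type given as const values under nested properties.
schema_with_consts = {
    "properties": {
        "github_credentials": {
            "title": "GitHub OAuth",
            "properties": {
                "provider": {"const": "github"},
                "type": {"const": "oauth2"},
            },
        }
    }
}
```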

View File

@@ -0,0 +1,309 @@
import uuid
import orjson
import pytest
from backend.server.v2.chat.tools._test_data import setup_llm_test_data, setup_test_data
from backend.server.v2.chat.tools.get_agent_details import GetAgentDetailsTool
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_success(setup_test_data):
"""Test successfully getting agent details from marketplace"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format: username/slug
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check the basic structure
assert "agent" in result_data
assert "message" in result_data
assert "graph_id" in result_data
assert "graph_version" in result_data
assert "user_authenticated" in result_data
# Check agent details
agent = result_data["agent"]
assert agent["id"] == graph.id
assert agent["name"] == "Test Agent"
assert (
agent["description"] == "A simple test agent"
) # Description from store submission
assert "inputs" in agent
assert "credentials" in agent
assert "execution_options" in agent
# Check execution options
exec_options = agent["execution_options"]
assert "manual" in exec_options
assert "scheduled" in exec_options
assert "webhook" in exec_options
# Check inputs schema
assert isinstance(agent["inputs"], dict)
# Should have properties for the input fields
if "properties" in agent["inputs"]:
assert "test_input" in agent["inputs"]["properties"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_with_llm_credentials(setup_llm_test_data):
"""Test getting agent details for an agent that requires LLM credentials"""
# Use test data from fixture
user = setup_llm_test_data["user"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check that agent details are returned
assert "agent" in result_data
agent = result_data["agent"]
# Check that credentials are listed
assert "credentials" in agent
credentials = agent["credentials"]
# The LLM agent should have OpenAI credentials listed
# Note: This depends on how the graph's credentials_input_schema is structured
assert isinstance(credentials, list)
# Check that inputs include the user_prompt
assert "inputs" in agent
if "properties" in agent["inputs"]:
assert "user_prompt" in agent["inputs"]["properties"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_invalid_format():
"""Test error handling when agent_id is not in correct format"""
tool = GetAgentDetailsTool()
# Execute with invalid format (no slash)
response = await tool.execute(
user_id=str(uuid.uuid4()),
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="invalid-format",
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert "creator/agent-name" in result_data["message"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_empty_slug():
"""Test error handling when agent_id is empty"""
tool = GetAgentDetailsTool()
# Execute with empty slug
response = await tool.execute(
user_id=str(uuid.uuid4()),
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="",
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert "creator/agent-name" in result_data["message"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_not_found():
"""Test error handling when agent is not found in marketplace"""
tool = GetAgentDetailsTool()
# Execute with non-existent agent
response = await tool.execute(
user_id=str(uuid.uuid4()),
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="nonexistent/agent",
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert "not found" in result_data["message"].lower()
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_anonymous_user(setup_test_data):
"""Test getting agent details as an anonymous user (no user_id)"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool without a user_id (anonymous)
response = await tool.execute(
user_id=None,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should still get agent details
assert "agent" in result_data
assert "user_authenticated" in result_data
# User should be marked as not authenticated
assert result_data["user_authenticated"] is False
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_authenticated_user(setup_test_data):
"""Test getting agent details as an authenticated user"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool with a user_id (authenticated)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should get agent details
assert "agent" in result_data
assert "user_authenticated" in result_data
# User should be marked as authenticated
assert result_data["user_authenticated"] is True
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_includes_execution_options(setup_test_data):
"""Test that agent details include execution options"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check execution options
assert "agent" in result_data
agent = result_data["agent"]
assert "execution_options" in agent
exec_options = agent["execution_options"]
# These should all be boolean values
assert isinstance(exec_options["manual"], bool)
assert isinstance(exec_options["scheduled"], bool)
assert isinstance(exec_options["webhook"], bool)
# For a regular agent (no webhook), manual and scheduled should be True
assert exec_options["manual"] is True
assert exec_options["scheduled"] is True
assert exec_options["webhook"] is False

View File

@@ -0,0 +1,183 @@
"""Tool for getting required setup information for an agent."""
import logging
from typing import Any
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.get_agent_details import GetAgentDetailsTool
from backend.server.v2.chat.tools.models import (
AgentDetailsResponse,
ErrorResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
logger = logging.getLogger(__name__)
class GetRequiredSetupInfoTool(BaseTool):
"""Tool for getting required setup information including credentials and inputs."""
@property
def name(self) -> str:
return "get_required_setup_info"
@property
def description(self) -> str:
return """Check if an agent can be set up with the provided input data and credentials.
Call this AFTER get_agent_details to validate that you have all required inputs.
Pass the input dictionary you plan to use with run_agent or setup_agent to verify it's complete."""
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The marketplace agent slug (e.g., 'username/agent-name' or just 'agent-name' to search)",
},
"inputs": {
"type": "object",
"description": "The input dictionary you plan to provide. Should contain ALL required inputs from get_agent_details",
"additionalProperties": True,
},
},
"required": ["username_agent_slug"],
}
@property
def requires_auth(self) -> bool:
"""This tool requires authentication."""
return True
async def _execute(
self,
user_id: str | None,
session_id: str,
**kwargs,
) -> ToolResponseBase:
"""
Retrieve and validate the required setup information for running or configuring an agent.
This checks all required credentials and input fields based on the agent details,
and verifies user readiness to run the agent based on provided inputs and available credentials.
Args:
user_id: The authenticated user's ID (must not be None; authentication required).
session_id: The chat session ID.
username_agent_slug: The agent's marketplace slug (e.g. 'username/agent-name').
inputs: (Optional) The input dictionary the caller plans to provide, used to detect missing fields.
Returns:
SetupRequirementsResponse containing:
- agent and graph info,
- credential and input requirements,
- user readiness and missing credentials/fields,
- setup instructions.
"""
assert (
user_id is not None
), "GetRequiredSetupInfoTool requires authentication; user_id must not be None"
# Call _execute directly since we're calling internally from another tool
agent_details = await GetAgentDetailsTool()._execute(
user_id, session_id, **kwargs
)
if isinstance(agent_details, ErrorResponse):
return agent_details
if not isinstance(agent_details, AgentDetailsResponse):
return ErrorResponse(
message="Failed to get agent details",
session_id=session_id,
)
available_creds = await IntegrationCredentialsManager().store.get_all_creds(
user_id
)
required_credentials = []
# Check if user has credentials matching the required provider/type
for c in agent_details.agent.credentials:
# Check if any available credential matches this provider and type
has_matching_cred = any(
cred.provider == c.provider and cred.type == c.type
for cred in available_creds
)
if not has_matching_cred:
required_credentials.append(c)
required_fields = set(agent_details.agent.inputs.get("required", []))
provided_inputs = kwargs.get("inputs", {})
missing_inputs = required_fields - set(provided_inputs.keys())
missing_credentials = {c.id: c.model_dump() for c in required_credentials}
user_readiness = UserReadiness(
has_all_credentials=len(required_credentials) == 0,
missing_credentials=missing_credentials,
ready_to_run=len(missing_inputs) == 0 and len(required_credentials) == 0,
)
# Convert execution options to list of available modes
exec_opts = agent_details.agent.execution_options
execution_modes = []
if exec_opts.manual:
execution_modes.append("manual")
if exec_opts.scheduled:
execution_modes.append("scheduled")
if exec_opts.webhook:
execution_modes.append("webhook")
# Convert input schema to list of input field info
inputs_list = []
if (
isinstance(agent_details.agent.inputs, dict)
and "properties" in agent_details.agent.inputs
):
for field_name, field_schema in agent_details.agent.inputs[
"properties"
].items():
inputs_list.append(
{
"name": field_name,
"title": field_schema.get("title", field_name),
"type": field_schema.get("type", "string"),
"description": field_schema.get("description", ""),
"required": field_name
in agent_details.agent.inputs.get("required", []),
}
)
requirements = {
"credentials": agent_details.agent.credentials,
"inputs": inputs_list,
"execution_modes": execution_modes,
}
message = ""
if len(agent_details.agent.credentials) > 0:
message = "The user needs to enter credentials before proceeding. Please wait until you have a message informing you that the credentials have been entered."
elif len(inputs_list) > 0:
message = (
"The user needs to enter inputs before proceeding. Please wait until you have a message informing you that the inputs have been entered. The inputs are: "
+ ", ".join([input["name"] for input in inputs_list])
)
else:
message = "The agent is ready to run. Please call the run_agent tool with the agent ID."
return SetupRequirementsResponse(
message=message,
session_id=session_id,
setup_info=SetupInfo(
agent_id=agent_details.agent.id,
agent_name=agent_details.agent.name,
user_readiness=user_readiness,
requirements=requirements,
),
graph_id=agent_details.graph_id,
graph_version=agent_details.graph_version,
)
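
A toy restatement of the readiness rule used above, on assumed standalone data (not code from this PR): the agent is only `ready_to_run` when every required input is supplied and every required credential has a provider/type match among the user's stored credentials.

```python
# Assumed data for illustration only.
required_fields = {"user_prompt", "max_results"}
provided_inputs = {"user_prompt": "What is 2+2?"}
missing_inputs = required_fields - set(provided_inputs)  # {"max_results"}

required_credentials = [("openai", "api_key")]
available_credentials = [("openai", "api_key"), ("github", "oauth2")]
missing_credentials = [
    (provider, cred_type)
    for provider, cred_type in required_credentials
    if (provider, cred_type) not in available_credentials
]  # [] -> user has all required credentials

ready_to_run = not missing_inputs and not missing_credentials  # False here
```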

View File

@@ -0,0 +1,394 @@
import uuid
import orjson
import pytest
from backend.server.v2.chat.tools._test_data import (
setup_firecrawl_test_data,
setup_llm_test_data,
setup_test_data,
)
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
)
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
setup_firecrawl_test_data = setup_firecrawl_test_data
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_success(setup_test_data):
"""Test successfully getting setup info for a simple agent"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Build the proper marketplace agent_id format: username/slug
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "Hello World"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check the basic structure
assert "setup_info" in result_data
setup_info = result_data["setup_info"]
# Check agent info
assert "agent_id" in setup_info
assert setup_info["agent_id"] == graph.id
assert "agent_name" in setup_info
assert setup_info["agent_name"] == "Test Agent"
# Check requirements
assert "requirements" in setup_info
requirements = setup_info["requirements"]
assert "credentials" in requirements
assert "inputs" in requirements
assert "execution_modes" in requirements
# Simple agent should have no credentials required
assert isinstance(requirements["credentials"], list)
assert len(requirements["credentials"]) == 0
# Check inputs format
assert isinstance(requirements["inputs"], list)
if len(requirements["inputs"]) > 0:
first_input = requirements["inputs"][0]
assert "name" in first_input
assert "title" in first_input
assert "type" in first_input
# Check execution modes
assert isinstance(requirements["execution_modes"], list)
assert "manual" in requirements["execution_modes"]
assert "scheduled" in requirements["execution_modes"]
# Check user readiness
assert "user_readiness" in setup_info
user_readiness = setup_info["user_readiness"]
assert "has_all_credentials" in user_readiness
assert "ready_to_run" in user_readiness
# Simple agent with inputs provided should be ready
assert user_readiness["ready_to_run"] is True
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_missing_credentials(setup_firecrawl_test_data):
"""Test getting setup info for an agent requiring missing credentials"""
# Use test data from fixture
user = setup_firecrawl_test_data["user"]
store_submission = setup_firecrawl_test_data["store_submission"]
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"url": "https://example.com"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check setup info
assert "setup_info" in result_data
setup_info = result_data["setup_info"]
# Check requirements
requirements = setup_info["requirements"]
# Should have Firecrawl credentials required
assert "credentials" in requirements
assert isinstance(requirements["credentials"], list)
assert len(requirements["credentials"]) > 0
# Check the credential requirement
firecrawl_cred = requirements["credentials"][0]
assert "provider" in firecrawl_cred
assert firecrawl_cred["provider"] == "firecrawl"
assert "type" in firecrawl_cred
assert firecrawl_cred["type"] == "api_key"
# Check user readiness - should NOT be ready since credentials are missing
user_readiness = setup_info["user_readiness"]
assert user_readiness["has_all_credentials"] is False
assert user_readiness["ready_to_run"] is False
# Check missing credentials
assert "missing_credentials" in user_readiness
assert isinstance(user_readiness["missing_credentials"], dict)
assert len(user_readiness["missing_credentials"]) > 0
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_with_available_credentials(setup_llm_test_data):
"""Test getting setup info when user has required credentials"""
# Use test data from fixture (includes OpenAI credentials)
user = setup_llm_test_data["user"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"user_prompt": "What is 2+2?"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check setup info
setup_info = result_data["setup_info"]
# Check user readiness - should be ready since credentials are available
user_readiness = setup_info["user_readiness"]
assert user_readiness["has_all_credentials"] is True
assert user_readiness["ready_to_run"] is True
# Missing credentials should be empty
assert "missing_credentials" in user_readiness
assert len(user_readiness["missing_credentials"]) == 0
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_missing_inputs(setup_test_data):
"""Test getting setup info when required inputs are not provided"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool WITHOUT providing inputs
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={}, # Empty inputs
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check setup info
setup_info = result_data["setup_info"]
# Check requirements
requirements = setup_info["requirements"]
assert "inputs" in requirements
assert isinstance(requirements["inputs"], list)
# User readiness depends on whether inputs are required or optional
user_readiness = setup_info["user_readiness"]
assert "ready_to_run" in user_readiness
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_invalid_agent():
"""Test getting setup info for a non-existent agent"""
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Execute with invalid agent ID
response = await tool.execute(
user_id=str(uuid.uuid4()),
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="invalid/agent",
inputs={},
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
# Should indicate failure or not found
assert any(
phrase in result_data["message"].lower()
for phrase in ["not found", "failed", "error"]
)
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_graph_metadata(setup_test_data):
"""Test that setup info includes graph metadata"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "test"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check that graph_id and graph_version are included
assert "graph_id" in result_data
assert result_data["graph_id"] == graph.id
assert "graph_version" in result_data
assert result_data["graph_version"] == graph.version
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_inputs_structure(setup_test_data):
"""Test that inputs are properly structured as a list"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check inputs structure
setup_info = result_data["setup_info"]
requirements = setup_info["requirements"]
# Inputs should be a list
assert isinstance(requirements["inputs"], list)
# Each input should have proper structure
for input_field in requirements["inputs"]:
assert isinstance(input_field, dict)
assert "name" in input_field
assert "title" in input_field
assert "type" in input_field
assert "description" in input_field
assert "required" in input_field
assert isinstance(input_field["required"], bool)
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_execution_modes_structure(setup_test_data):
"""Test that execution_modes are properly structured as a list"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetRequiredSetupInfoTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check execution modes structure
setup_info = result_data["setup_info"]
requirements = setup_info["requirements"]
# execution_modes should be a list of strings
assert isinstance(requirements["execution_modes"], list)
for mode in requirements["execution_modes"]:
assert isinstance(mode, str)
assert mode in ["manual", "scheduled", "webhook"]

View File

@@ -0,0 +1,279 @@
"""Pydantic models for tool responses."""
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
from backend.data.model import CredentialsMetaInput
class ResponseType(str, Enum):
"""Types of tool responses."""
AGENT_CAROUSEL = "agent_carousel"
AGENT_DETAILS = "agent_details"
AGENT_DETAILS_NEED_LOGIN = "agent_details_need_login"
AGENT_DETAILS_NEED_CREDENTIALS = "agent_details_need_credentials"
SETUP_REQUIREMENTS = "setup_requirements"
SCHEDULE_CREATED = "schedule_created"
WEBHOOK_CREATED = "webhook_created"
PRESET_CREATED = "preset_created"
EXECUTION_STARTED = "execution_started"
NEED_LOGIN = "need_login"
NEED_CREDENTIALS = "need_credentials"
INSUFFICIENT_CREDITS = "insufficient_credits"
VALIDATION_ERROR = "validation_error"
ERROR = "error"
NO_RESULTS = "no_results"
SUCCESS = "success"
# Base response model
class ToolResponseBase(BaseModel):
"""Base model for all tool responses."""
type: ResponseType
message: str
session_id: str | None = None
# Agent discovery models
class AgentInfo(BaseModel):
"""Information about an agent."""
id: str
name: str
description: str
source: str = Field(description="marketplace or library")
in_library: bool = False
creator: str | None = None
category: str | None = None
rating: float | None = None
runs: int | None = None
is_featured: bool | None = None
status: str | None = None
can_access_graph: bool | None = None
has_external_trigger: bool | None = None
new_output: bool | None = None
graph_id: str | None = None
class AgentCarouselResponse(ToolResponseBase):
"""Response for find_agent tool."""
type: ResponseType = ResponseType.AGENT_CAROUSEL
title: str = "Available Agents"
agents: list[AgentInfo]
count: int
class NoResultsResponse(ToolResponseBase):
"""Response when no agents found."""
type: ResponseType = ResponseType.NO_RESULTS
suggestions: list[str] = []
# Agent details models
class InputField(BaseModel):
"""Input field specification."""
name: str
type: str = "string"
description: str = ""
required: bool = False
default: Any | None = None
options: list[Any] | None = None
format: str | None = None
class ExecutionOptions(BaseModel):
"""Available execution options for an agent."""
manual: bool = True
scheduled: bool = True
webhook: bool = False
class AgentDetails(BaseModel):
"""Detailed agent information."""
id: str
name: str
description: str
in_library: bool = False
inputs: dict[str, Any] = {}
credentials: list[CredentialsMetaInput] = []
execution_options: ExecutionOptions = Field(default_factory=ExecutionOptions)
trigger_info: dict[str, Any] | None = None
class AgentDetailsResponse(ToolResponseBase):
"""Response for get_agent_details tool."""
type: ResponseType = ResponseType.AGENT_DETAILS
agent: AgentDetails
user_authenticated: bool = False
graph_id: str | None = None
graph_version: int | None = None
class AgentDetailsNeedLoginResponse(ToolResponseBase):
"""Response when agent details need login."""
type: ResponseType = ResponseType.AGENT_DETAILS_NEED_LOGIN
agent: AgentDetails
agent_info: dict[str, Any] | None = None
graph_id: str | None = None
graph_version: int | None = None
class AgentDetailsNeedCredentialsResponse(ToolResponseBase):
"""Response when agent needs credentials to be configured."""
type: ResponseType = ResponseType.NEED_CREDENTIALS
agent: AgentDetails
credentials_schema: dict[str, Any]
agent_info: dict[str, Any] | None = None
graph_id: str | None = None
graph_version: int | None = None
# Setup info models
class SetupRequirementInfo(BaseModel):
"""Setup requirement information."""
key: str
provider: str
required: bool = True
user_has: bool = False
credential_id: str | None = None
type: str | None = None
scopes: list[str] | None = None
description: str | None = None
class ExecutionModeInfo(BaseModel):
"""Execution mode information."""
type: str # manual, scheduled, webhook
description: str
supported: bool
config_required: dict[str, str] | None = None
trigger_info: dict[str, Any] | None = None
class UserReadiness(BaseModel):
"""User readiness status."""
has_all_credentials: bool = False
missing_credentials: dict[str, Any] = {}
ready_to_run: bool = False
class SetupInfo(BaseModel):
"""Complete setup information."""
agent_id: str
agent_name: str
requirements: dict[str, list[Any]] = Field(
default_factory=lambda: {
"credentials": [],
"inputs": [],
"execution_modes": [],
},
)
user_readiness: UserReadiness = Field(default_factory=UserReadiness)
setup_instructions: list[str] = []
class SetupRequirementsResponse(ToolResponseBase):
"""Response for get_required_setup_info tool."""
type: ResponseType = ResponseType.SETUP_REQUIREMENTS
setup_info: SetupInfo
graph_id: str | None = None
graph_version: int | None = None
# Setup agent models
class ScheduleCreatedResponse(ToolResponseBase):
"""Response for scheduled agent setup."""
type: ResponseType = ResponseType.SCHEDULE_CREATED
schedule_id: str
name: str
cron: str
timezone: str = "UTC"
next_run: str | None = None
graph_id: str
graph_name: str
class WebhookCreatedResponse(ToolResponseBase):
"""Response for webhook agent setup."""
type: ResponseType = ResponseType.WEBHOOK_CREATED
webhook_id: str
webhook_url: str
preset_id: str | None = None
name: str
graph_id: str
graph_name: str
class PresetCreatedResponse(ToolResponseBase):
"""Response for preset agent setup."""
type: ResponseType = ResponseType.PRESET_CREATED
preset_id: str
name: str
graph_id: str
graph_name: str
# Run agent models
class ExecutionStartedResponse(ToolResponseBase):
"""Response for agent execution started."""
type: ResponseType = ResponseType.EXECUTION_STARTED
execution_id: str
graph_id: str
graph_name: str
status: str = "QUEUED"
ended_at: str | None = None
outputs: dict[str, Any] | None = None
error: str | None = None
timeout_reached: bool | None = None
class InsufficientCreditsResponse(ToolResponseBase):
"""Response for insufficient credits."""
type: ResponseType = ResponseType.INSUFFICIENT_CREDITS
balance: float
class ValidationErrorResponse(ToolResponseBase):
"""Response for validation errors."""
type: ResponseType = ResponseType.VALIDATION_ERROR
error: str
details: dict[str, Any] | None = None
# Auth/error models
class NeedLoginResponse(ToolResponseBase):
"""Response when login is needed."""
type: ResponseType = ResponseType.NEED_LOGIN
agent_info: dict[str, Any] | None = None
class ErrorResponse(ToolResponseBase):
"""Response for errors."""
type: ResponseType = ResponseType.ERROR
error: str | None = None
details: dict[str, Any] | None = None
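
A short sketch (not part of this PR) of how a consumer could branch on the `type` discriminator after a response has been serialized the way `BaseTool.execute` does:

```python
# Illustrative consumer-side handling of serialized tool responses.
import orjson

from backend.server.v2.chat.tools.models import (
    NoResultsResponse,
    ResponseType,
)

payload = NoResultsResponse(
    message="No agents found matching 'foo'.",
    session_id="session-123",
    suggestions=["Try more general terms"],
).model_dump_json()

data = orjson.loads(payload)
if data["type"] == ResponseType.NO_RESULTS:
    print("Nothing found:", data["suggestions"])
elif data["type"] == ResponseType.ERROR:
    print("Tool failed:", data.get("error"))
```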

View File

@@ -0,0 +1,241 @@
"""Tool for running an agent manually (one-off execution)."""
import logging
from typing import Any
from backend.data.graph import get_graph
from backend.data.model import CredentialsMetaInput
from backend.executor import utils as execution_utils
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
)
from backend.server.v2.chat.tools.models import (
ErrorResponse,
ExecutionStartedResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
)
from backend.server.v2.library import db as library_db
from backend.server.v2.library import model as library_model
logger = logging.getLogger(__name__)
class RunAgentTool(BaseTool):
"""Tool for executing an agent manually with immediate results."""
@property
def name(self) -> str:
return "run_agent"
@property
def description(self) -> str:
return """Run an agent immediately (one-off manual execution).
IMPORTANT: Before calling this tool, you MUST first call get_agent_details to determine what inputs are required.
The 'inputs' parameter must be a dictionary containing ALL required input values identified by get_agent_details.
Example: If get_agent_details shows required inputs 'search_query' and 'max_results', you must pass:
inputs={"search_query": "user's query", "max_results": 10}"""
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The ID of the agent to run (graph ID or marketplace slug)",
},
"inputs": {
"type": "object",
"description": 'REQUIRED: Dictionary of input values. Must include ALL required inputs from get_agent_details. Format: {"input_name": value}',
"additionalProperties": True,
},
},
"required": ["username_agent_slug"],
}
@property
def requires_auth(self) -> bool:
"""This tool requires authentication."""
return True
async def _execute(
self,
user_id: str | None,
session_id: str,
**kwargs,
) -> ToolResponseBase:
"""Execute an agent manually.
Args:
user_id: Authenticated user ID
session_id: Chat session ID
**kwargs: Execution parameters
Returns:
Pydantic response object (serialized to JSON by BaseTool.execute)
"""
assert (
user_id is not None
), "User ID is required to run an agent. Superclass enforces authentication."
username_agent_slug = kwargs.get("username_agent_slug", "").strip()
inputs = kwargs.get("inputs", {})
# Call _execute directly since we're calling internally from another tool
response = await GetRequiredSetupInfoTool()._execute(
user_id, session_id, **kwargs
)
if not isinstance(response, SetupRequirementsResponse):
return ErrorResponse(
message="Failed to get required setup information",
session_id=session_id,
)
setup_info = SetupInfo.model_validate(response.setup_info)
if not setup_info.user_readiness.ready_to_run:
return ErrorResponse(
message=f"User is not ready to run the agent. User Readiness: {setup_info.user_readiness.model_dump_json()} Requirments: {setup_info.requirements}",
session_id=session_id,
)
# Get the graph using the graph_id and graph_version from the setup response
if not response.graph_id or not response.graph_version:
return ErrorResponse(
message=f"Graph information not available for {username_agent_slug}",
session_id=session_id,
)
graph = await get_graph(
graph_id=response.graph_id,
version=response.graph_version,
user_id=None, # Public access for store graphs
include_subgraphs=True,
)
if not graph:
return ErrorResponse(
message=f"Graph {username_agent_slug} ({response.graph_id}v{response.graph_version}) not found",
session_id=session_id,
)
# Check if we already have a library agent for this graph
existing_library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph.id, user_id=user_id
)
if not existing_library_agent:
# Now we need to add the graph to the users library
library_agents: list[library_model.LibraryAgent] = (
await library_db.create_library_agent(
graph=graph,
user_id=user_id,
create_library_agents_for_sub_graphs=False,
)
)
assert len(library_agents) == 1, "Expected 1 library agent to be created"
library_agent = library_agents[0]
else:
library_agent = existing_library_agent
# Build credentials mapping for the graph
graph_credentials_inputs: dict[str, CredentialsMetaInput] = {}
# Get aggregated credentials requirements from the graph
aggregated_creds = graph.aggregate_credentials_inputs()
logger.debug(
f"Matching credentials for graph {graph.id}: {len(aggregated_creds)} required"
)
if aggregated_creds:
# Get all available credentials for the user
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
# Track unmatched credentials for error reporting
missing_creds: list[str] = []
# For each required credential field, find a matching user credential
# field_info.provider is a frozenset because aggregate_credentials_inputs()
# combines requirements from multiple nodes. A credential matches if its
# provider is in the set of acceptable providers.
for credential_field_name, (
credential_requirements,
_node_fields,
) in aggregated_creds.items():
# Find first matching credential by provider and type
matching_cred = next(
(
cred
for cred in available_creds
if cred.provider in credential_requirements.provider
and cred.type in credential_requirements.supported_types
),
None,
)
if matching_cred:
# Use Pydantic validation to ensure type safety
try:
graph_credentials_inputs[credential_field_name] = (
CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
)
except Exception as e:
logger.error(
f"Failed to create CredentialsMetaInput for field '{credential_field_name}': "
f"provider={matching_cred.provider}, type={matching_cred.type}, "
f"credential_id={matching_cred.id}",
exc_info=True,
)
missing_creds.append(
f"{credential_field_name} (validation failed: {e})"
)
else:
missing_creds.append(
f"{credential_field_name} "
f"(requires provider in {list(credential_requirements.provider)}, "
f"type in {list(credential_requirements.supported_types)})"
)
# Fail fast if any required credentials are missing
if missing_creds:
logger.warning(
f"Cannot execute agent - missing credentials: {missing_creds}"
)
return ErrorResponse(
message=f"Cannot execute agent: missing {len(missing_creds)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials."
f"Please set up the following credentials: {', '.join(missing_creds)}",
session_id=session_id,
details={"missing_credentials": missing_creds},
)
logger.info(
f"Credential matching complete: {len(graph_credentials_inputs)}/{len(aggregated_creds)} matched"
)
# At this point we know the user is ready to run the agent
# So we can execute the agent
execution = await execution_utils.add_graph_execution(
graph_id=library_agent.graph_id,
user_id=user_id,
inputs=inputs,
graph_credentials_inputs=graph_credentials_inputs,
)
return ExecutionStartedResponse(
message="Agent execution successfully started. Do not run this tool again unless specifically asked to run the agent again.",
session_id=session_id,
execution_id=execution.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
)
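
The matching rule in the credential loop above, restated on assumed standalone data (`StoredCred` and the literals are hypothetical, not this PR's types): a stored credential satisfies a requirement when its provider is one of the acceptable providers and its type is one of the supported types.

```python
# Standalone illustration with assumed data.
from dataclasses import dataclass


@dataclass
class StoredCred:
    id: str
    provider: str
    type: str


requirement_providers = frozenset({"openai", "anthropic"})
requirement_types = {"api_key"}

available = [
    StoredCred(id="cred-1", provider="github", type="oauth2"),
    StoredCred(id="cred-2", provider="openai", type="api_key"),
]

# First credential whose provider AND type satisfy the requirement, else None.
match = next(
    (
        cred
        for cred in available
        if cred.provider in requirement_providers and cred.type in requirement_types
    ),
    None,
)
print(match)  # StoredCred(id='cred-2', provider='openai', type='api_key')
```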

View File

@@ -0,0 +1,151 @@
import uuid
import orjson
import pytest
from backend.server.v2.chat.tools._test_data import setup_llm_test_data, setup_test_data
from backend.server.v2.chat.tools.run_agent import RunAgentTool
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
@pytest.mark.asyncio(scope="session")
async def test_run_agent(setup_test_data):
"""Test that the run_agent tool successfully executes an approved agent"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = RunAgentTool()
# Build the proper marketplace agent_id format: username/slug
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "Hello World"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON to verify the execution started
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "execution_id" in result_data
assert "graph_id" in result_data
assert result_data["graph_id"] == graph.id
assert "graph_name" in result_data
assert result_data["graph_name"] == "Test Agent"
@pytest.mark.asyncio(scope="session")
async def test_run_agent_missing_inputs(setup_test_data):
"""Test that the run_agent tool returns error when inputs are missing"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = RunAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool without required inputs
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={}, # Missing required input
)
# Verify that we get an error response
assert response is not None
assert hasattr(response, "result")
# The tool should return an ErrorResponse when setup info indicates not ready
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
@pytest.mark.asyncio(scope="session")
async def test_run_agent_invalid_agent_id(setup_test_data):
"""Test that the run_agent tool returns error for invalid agent ID"""
# Use test data from fixture
user = setup_test_data["user"]
# Create the tool instance
tool = RunAgentTool()
# Execute the tool with invalid agent ID
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="invalid/agent-id",
inputs={"test_input": "Hello World"},
)
# Verify that we get an error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
# Should get an error about failed setup or not found
assert any(
phrase in result_data["message"].lower() for phrase in ["not found", "failed"]
)
@pytest.mark.asyncio(scope="session")
async def test_run_agent_with_llm_credentials(setup_llm_test_data):
"""Test that run_agent works with an agent requiring LLM credentials"""
# Use test data from fixture
user = setup_llm_test_data["user"]
graph = setup_llm_test_data["graph"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = RunAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute the tool with a prompt for the LLM
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"user_prompt": "What is 2+2?"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON to verify the execution started
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should successfully start execution since credentials are available
assert "execution_id" in result_data
assert "graph_id" in result_data
assert result_data["graph_id"] == graph.id
assert "graph_name" in result_data
assert result_data["graph_name"] == "LLM Test Agent"

View File

@@ -0,0 +1,376 @@
"""Tool for setting up an agent with credentials and configuration."""
import logging
from typing import Any
from pydantic import BaseModel
from backend.data.graph import get_graph
from backend.data.model import CredentialsMetaInput
from backend.data.user import get_user_by_id
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
)
from backend.server.v2.chat.tools.models import (
ExecutionStartedResponse,
SetupInfo,
SetupRequirementsResponse,
)
from backend.server.v2.library import db as library_db
from backend.server.v2.library import model as library_model
from backend.util.clients import get_scheduler_client
from backend.util.timezone_utils import (
convert_utc_time_to_user_timezone,
get_user_timezone_or_utc,
)
from .base import BaseTool
from .models import ErrorResponse, ToolResponseBase
logger = logging.getLogger(__name__)
class AgentDetails(BaseModel):
graph_name: str
graph_id: str
graph_version: int
recommended_schedule_cron: str | None
required_credentials: dict[str, CredentialsMetaInput]
class SetupAgentTool(BaseTool):
"""Tool for setting up an agent with scheduled execution or webhook triggers."""
@property
def name(self) -> str:
return "schedule_agent"
@property
def description(self) -> str:
return """Set up an agent with credentials and configure it for scheduled execution or webhook triggers.
IMPORTANT: Before calling this tool, you MUST first call get_agent_details to determine what inputs are required.
For SCHEDULED execution:
- Cron format: "minute hour day month weekday" (e.g., "0 9 * * 1-5" = 9am weekdays)
- Common patterns: "0 * * * *" (hourly), "0 0 * * *" (daily at midnight), "0 9 * * 1" (Mondays at 9am)
- Timezone: Use IANA timezone names like "America/New_York", "Europe/London", "Asia/Tokyo"
- The 'inputs' parameter must contain ALL required inputs from get_agent_details as a dictionary
For WEBHOOK triggers:
- The agent will be triggered by external events
- Still requires all input values from get_agent_details"""
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The marketplace agent slug (e.g., 'username/agent-name')",
},
"setup_type": {
"type": "string",
"enum": ["schedule", "webhook"],
"description": "Type of setup: 'schedule' for cron, 'webhook' for triggers.",
},
"name": {
"type": "string",
"description": "Name for this setup/schedule (e.g., 'Daily Report', 'Weekly Summary')",
},
"description": {
"type": "string",
"description": "Description of this setup",
},
"cron": {
"type": "string",
"description": "Cron expression (5 fields: minute hour day month weekday). Examples: '0 9 * * 1-5' (9am weekdays), '*/30 * * * *' (every 30 min)",
},
"timezone": {
"type": "string",
"description": "IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC'). Defaults to UTC if not specified.",
},
"inputs": {
"type": "object",
"description": 'REQUIRED: Dictionary with ALL required inputs from get_agent_details. Format: {"input_name": value}',
"additionalProperties": True,
},
"webhook_config": {
"type": "object",
"description": "Webhook configuration (required if setup_type is 'webhook')",
"additionalProperties": True,
},
},
"required": ["username_agent_slug", "setup_type"],
}
@property
def requires_auth(self) -> bool:
"""This tool requires authentication."""
return True
async def _execute(
self,
user_id: str | None,
session_id: str,
**kwargs,
) -> ToolResponseBase:
"""Set up an agent with configuration.
Args:
user_id: Authenticated user ID
session_id: Chat session ID
**kwargs: Setup parameters
Returns:
JSON formatted setup result
"""
assert (
user_id is not None
), "User ID is required to run an agent. Superclass enforces authentication."
setup_type = kwargs.get("setup_type", "schedule").strip()
if setup_type != "schedule":
return ErrorResponse(
message="Only schedule setup is supported at this time",
session_id=session_id,
)
else:
cron = kwargs.get("cron", "").strip()
cron_name = kwargs.get("name", "").strip()
if not cron or not cron_name:
return ErrorResponse(
message="Cron and name are required for schedule setup",
session_id=session_id,
)
username_agent_slug = kwargs.get("username_agent_slug", "").strip()
inputs = kwargs.get("inputs", {})
library_agent = await self._get_or_add_library_agent(
username_agent_slug, user_id, session_id, **kwargs
)
if not isinstance(library_agent, AgentDetails):
# library agent is an ErrorResponse
return library_agent
# At this point we know the user is ready to run the agent
# Create the schedule for the agent
from backend.server.v2.library import db as library_db
# Get the library agent model for scheduling
lib_agent = await library_db.get_library_agent_by_graph_id(
graph_id=library_agent.graph_id, user_id=user_id
)
if not lib_agent:
return ErrorResponse(
message=f"Library agent not found for graph {library_agent.graph_id}",
session_id=session_id,
)
return await self._add_graph_execution_schedule(
library_agent=lib_agent,
user_id=user_id,
cron=cron,
name=cron_name,
inputs=inputs,
credentials=library_agent.required_credentials,
session_id=session_id,
)
async def _add_graph_execution_schedule(
self,
library_agent: library_model.LibraryAgent,
user_id: str,
cron: str,
name: str,
inputs: dict[str, Any],
credentials: dict[str, CredentialsMetaInput],
session_id: str,
**kwargs,
) -> ExecutionStartedResponse | ErrorResponse:
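        """Create a scheduled execution for the agent via the scheduler service.

        Resolves the user's timezone, maps each required credential to one of the
        user's stored credentials (matching on provider and type), registers the
        cron schedule with the scheduler, and converts the reported next run time
        back into the user's timezone for display.
        """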
        # Resolve the user's timezone from their profile, defaulting to UTC
user = await get_user_by_id(user_id)
user_timezone = get_user_timezone_or_utc(user.timezone if user else None)
# Map required credentials (schema field names) to actual user credential IDs
# credentials param contains CredentialsMetaInput with schema field names as keys
# We need to find the user's actual credentials that match the provider/type
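        # Illustrative mapping only (field and provider names are hypothetical):
        #   {"llm_credentials": <meta provider="openai", type="api_key", id="llm_credentials">}
        #   becomes
        #   {"llm_credentials": <meta provider="openai", type="api_key", id="<stored credential id>">}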
creds_manager = IntegrationCredentialsManager()
user_credentials = await creds_manager.store.get_all_creds(user_id)
# Build a mapping from schema field name -> actual credential ID
resolved_credentials: dict[str, CredentialsMetaInput] = {}
missing_credentials: list[str] = []
for field_name, cred_meta in credentials.items():
# Find a matching credential from the user's credentials
matching_cred = next(
(
c
for c in user_credentials
if c.provider == cred_meta.provider and c.type == cred_meta.type
),
None,
)
if matching_cred:
# Use the actual credential ID instead of the schema field name
# Create a new CredentialsMetaInput with the actual credential ID
# but keep the same provider/type from the original meta
resolved_credentials[field_name] = CredentialsMetaInput(
id=matching_cred.id,
provider=cred_meta.provider,
type=cred_meta.type,
title=cred_meta.title,
)
else:
missing_credentials.append(
f"{cred_meta.title} ({cred_meta.provider}/{cred_meta.type})"
)
if missing_credentials:
return ErrorResponse(
message=f"Cannot execute agent: missing {len(missing_credentials)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials.",
session_id=session_id,
)
result = await get_scheduler_client().add_execution_schedule(
user_id=user_id,
graph_id=library_agent.graph_id,
graph_version=library_agent.graph_version,
name=name,
cron=cron,
input_data=inputs,
input_credentials=resolved_credentials,
user_timezone=user_timezone,
)
# Convert the next_run_time back to user timezone for display
if result.next_run_time:
result.next_run_time = convert_utc_time_to_user_timezone(
result.next_run_time, user_timezone
)
return ExecutionStartedResponse(
message="Agent execution successfully scheduled. Do not run this tool again unless specifically asked to run the agent again.",
session_id=session_id,
execution_id=result.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
)
async def _get_or_add_library_agent(
self, agent_id: str, user_id: str, session_id: str, **kwargs
) -> AgentDetails | ErrorResponse:
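        """Resolve a marketplace slug into AgentDetails, adding a library agent if needed.

        Reuses GetRequiredSetupInfoTool to verify the user is ready to run the agent,
        loads the underlying graph, derives the required credentials from the graph's
        credentials input schema, and ensures the graph exists in the user's library.
        """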
# Call _execute directly since we're calling internally from another tool
response = await GetRequiredSetupInfoTool()._execute(
user_id, session_id, **kwargs
)
if not isinstance(response, SetupRequirementsResponse):
return ErrorResponse(
message="Failed to get required setup information",
session_id=session_id,
)
setup_info = SetupInfo.model_validate(response.setup_info)
if not setup_info.user_readiness.ready_to_run:
return ErrorResponse(
message=f"User is not ready to run the agent. User Readiness: {setup_info.user_readiness.model_dump_json()} Requirments: {setup_info.requirements}",
session_id=session_id,
)
# Get the graph using the graph_id and graph_version from the setup response
if not response.graph_id or not response.graph_version:
return ErrorResponse(
message=f"Graph information not available for {agent_id}",
session_id=session_id,
)
graph = await get_graph(
graph_id=response.graph_id,
version=response.graph_version,
user_id=None, # Public access for store graphs
include_subgraphs=True,
)
if not graph:
return ErrorResponse(
message=f"Graph {agent_id} ({response.graph_id}v{response.graph_version}) not found",
session_id=session_id,
)
recommended_schedule_cron = graph.recommended_schedule_cron
# Extract credentials from the JSON schema properties
credentials_input_schema = graph.credentials_input_schema
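        # The schema is expected to look roughly like the following (field and
        # provider names are illustrative):
        #   {"properties": {"llm_credentials": {
        #       "title": "OpenAI API key",
        #       "credentials_provider": ["openai"],
        #       "credentials_types": ["api_key"]}}}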
required_credentials: dict[str, CredentialsMetaInput] = {}
if (
isinstance(credentials_input_schema, dict)
and "properties" in credentials_input_schema
):
for cred_name, cred_schema in credentials_input_schema[
"properties"
].items():
# Get provider from credentials_provider array or properties.provider.const
provider = "unknown"
if (
"credentials_provider" in cred_schema
and cred_schema["credentials_provider"]
):
provider = cred_schema["credentials_provider"][0]
elif (
"properties" in cred_schema
and "provider" in cred_schema["properties"]
):
provider = cred_schema["properties"]["provider"].get(
"const", "unknown"
)
# Get type from credentials_types array or properties.type.const
cred_type = "api_key" # Default
if (
"credentials_types" in cred_schema
and cred_schema["credentials_types"]
):
cred_type = cred_schema["credentials_types"][0]
elif (
"properties" in cred_schema and "type" in cred_schema["properties"]
):
cred_type = cred_schema["properties"]["type"].get(
"const", "api_key"
)
required_credentials[cred_name] = CredentialsMetaInput(
id=cred_name,
title=cred_schema.get("title", cred_name),
provider=provider, # type: ignore
type=cred_type,
)
# Check if we already have a library agent for this graph
existing_library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph.id, user_id=user_id
)
if not existing_library_agent:
# Now we need to add the graph to the users library
library_agents: list[library_model.LibraryAgent] = (
await library_db.create_library_agent(
graph=graph,
user_id=user_id,
create_library_agents_for_sub_graphs=False,
)
)
assert len(library_agents) == 1, "Expected 1 library agent to be created"
library_agent = library_agents[0]
else:
library_agent = existing_library_agent
return AgentDetails(
graph_name=graph.name,
graph_id=library_agent.graph_id,
graph_version=library_agent.graph_version,
recommended_schedule_cron=recommended_schedule_cron,
required_credentials=required_credentials,
)

View File

@@ -0,0 +1,394 @@
import uuid
import orjson
import pytest
from backend.server.v2.chat.tools._test_data import setup_llm_test_data, setup_test_data
from backend.server.v2.chat.tools.setup_agent import SetupAgentTool
from backend.util.clients import get_scheduler_client
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
@pytest.mark.asyncio(scope="session")
async def test_setup_agent_missing_cron(setup_test_data):
"""Test error when cron is missing for schedule setup"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute without cron
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
inputs={"test_input": "Hello World"},
# Missing: cron and name
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert (
"cron" in result_data["message"].lower()
or "name" in result_data["message"].lower()
)
@pytest.mark.asyncio(scope="session")
async def test_setup_agent_webhook_not_supported(setup_test_data):
"""Test error when webhook setup is attempted"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with webhook setup_type
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="webhook",
inputs={"test_input": "Hello World"},
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
message_lower = result_data["message"].lower()
assert "schedule" in message_lower and "supported" in message_lower
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_schedule_success(setup_test_data):
"""Test successfully setting up an agent with a schedule"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name="Test Schedule",
description="Test schedule description",
cron="0 9 * * *", # Daily at 9am
timezone="UTC",
inputs={"test_input": "Hello World"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check for execution started
assert "message" in result_data
assert "execution_id" in result_data
assert "graph_id" in result_data
assert "graph_name" in result_data
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_with_credentials(setup_llm_test_data):
"""Test setting up an agent that requires credentials"""
# Use test data from fixture (includes OpenAI credentials)
user = setup_llm_test_data["user"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name="LLM Schedule",
description="LLM schedule with credentials",
cron="*/30 * * * *", # Every 30 minutes
timezone="America/New_York",
inputs={"user_prompt": "What is 2+2?"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should succeed since user has OpenAI credentials
assert "execution_id" in result_data
assert "graph_id" in result_data
@pytest.mark.asyncio(scope="session")
async def test_setup_agent_invalid_agent(setup_test_data):
"""Test error when agent doesn't exist"""
# Use test data from fixture
user = setup_test_data["user"]
# Create the tool instance
tool = SetupAgentTool()
# Execute with non-existent agent
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="nonexistent/agent",
setup_type="schedule",
name="Test Schedule",
cron="0 9 * * *",
inputs={},
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
# Should fail to find the agent
assert any(
phrase in result_data["message"].lower()
for phrase in ["not found", "failed", "error"]
)
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_schedule_created_in_scheduler(setup_test_data):
"""Test that the schedule is actually created in the scheduler service"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Create a unique schedule name to identify this test
schedule_name = f"Test Schedule {uuid.uuid4()}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name=schedule_name,
description="Test schedule to verify credentials",
cron="0 0 * * *", # Daily at midnight
timezone="UTC",
inputs={"test_input": "Scheduled execution"},
)
# Verify the response
assert response is not None
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "execution_id" in result_data
# Now verify the schedule was created in the scheduler service
scheduler = get_scheduler_client()
schedules = await scheduler.get_execution_schedules(graph.id, user.id)
# Find our schedule
our_schedule = None
for schedule in schedules:
if schedule.name == schedule_name:
our_schedule = schedule
break
assert (
our_schedule is not None
), f"Schedule '{schedule_name}' not found in scheduler"
assert our_schedule.cron == "0 0 * * *"
assert our_schedule.graph_id == graph.id
# Clean up: delete the schedule
await scheduler.delete_schedule(our_schedule.id, user_id=user.id)
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_schedule_with_credentials_triggered(setup_llm_test_data):
"""Test that credentials are properly passed when a schedule is triggered"""
# Use test data from fixture (includes OpenAI credentials)
user = setup_llm_test_data["user"]
graph = setup_llm_test_data["graph"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Create a unique schedule name
schedule_name = f"LLM Test Schedule {uuid.uuid4()}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name=schedule_name,
description="Test LLM schedule with credentials",
cron="* * * * *", # Every minute (for testing)
timezone="UTC",
inputs={"user_prompt": "Test prompt for credentials"},
)
# Verify the response
assert response is not None
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "execution_id" in result_data
# Get the schedule from the scheduler
scheduler = get_scheduler_client()
schedules = await scheduler.get_execution_schedules(graph.id, user.id)
# Find our schedule
our_schedule = None
for schedule in schedules:
if schedule.name == schedule_name:
our_schedule = schedule
break
assert our_schedule is not None, f"Schedule '{schedule_name}' not found"
# Verify the schedule has the correct input data
assert our_schedule.input_data is not None
assert "user_prompt" in our_schedule.input_data
assert our_schedule.input_data["user_prompt"] == "Test prompt for credentials"
# Verify credentials are stored in the schedule
# The credentials should be stored as input_credentials
assert our_schedule.input_credentials is not None
# The credentials should contain the OpenAI provider credential
# Note: The exact structure depends on how credentials are serialized
# We're checking that credentials data exists and has the right provider
if our_schedule.input_credentials:
# Convert to dict if needed
creds_dict = (
our_schedule.input_credentials
if isinstance(our_schedule.input_credentials, dict)
else {}
)
# Check if any credential has openai provider
has_openai_cred = False
for cred_key, cred_value in creds_dict.items():
if isinstance(cred_value, dict):
if cred_value.get("provider") == "openai":
has_openai_cred = True
# Verify the credential has the expected structure
assert "id" in cred_value or "api_key" in cred_value
break
# If we have LLM block, we should have stored credentials
assert has_openai_cred, "OpenAI credentials not found in schedule"
# Clean up: delete the schedule
await scheduler.delete_schedule(our_schedule.id, user_id=user.id)
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_creates_library_agent(setup_test_data):
"""Test that setup creates a library agent for the user"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name="Library Test Schedule",
cron="0 12 * * *", # Daily at noon
inputs={"test_input": "Library test"},
)
# Verify the response
assert response is not None
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "graph_id" in result_data
assert result_data["graph_id"] == graph.id
# Verify library agent was created
from backend.server.v2.library import db as library_db
library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph.id, user_id=user.id
)
assert library_agent is not None
assert library_agent.graph_id == graph.id
assert library_agent.name == "Test Agent"

View File

@@ -98,3 +98,9 @@ class DatabaseError(Exception):
"""Raised when there is an error interacting with the database"""
pass
class RedisError(Exception):
"""Raised when there is an error interacting with Redis"""
pass

View File

@@ -5,7 +5,8 @@ from functools import wraps
from typing import Any, Awaitable, Callable, TypeVar
import ldclient
from fastapi import HTTPException
from autogpt_libs.auth.dependencies import get_optional_user_id
from fastapi import HTTPException, Security
from ldclient import Context, LDClient
from ldclient.config import Config
from typing_extensions import ParamSpec
@@ -36,6 +37,7 @@ class Flag(str, Enum):
BETA_BLOCKS = "beta-blocks"
AGENT_ACTIVITY = "agent-activity"
ENABLE_PLATFORM_PAYMENT = "enable-platform-payment"
CHAT = "chat"
def is_configured() -> bool:
@@ -252,6 +254,72 @@ def feature_flag(
return decorator
def create_feature_flag_dependency(
flag_key: Flag,
default: bool = False,
) -> Callable[[str | None], Awaitable[None]]:
"""
Create a FastAPI dependency that checks a feature flag.
This dependency automatically extracts the user_id from the JWT token
(if present) for proper LaunchDarkly user targeting, while still
supporting anonymous access.
Args:
flag_key: The Flag enum value to check
default: Default value if flag evaluation fails
Returns:
An async dependency function that raises HTTPException if flag is disabled
Example:
router = APIRouter(
dependencies=[Depends(create_feature_flag_dependency(Flag.CHAT))]
)
"""
async def check_feature_flag(
user_id: str | None = Security(get_optional_user_id),
) -> None:
"""Check if feature flag is enabled for the user.
The user_id is automatically injected from JWT authentication if present,
or None for anonymous access.
"""
# For routes that don't require authentication, use anonymous context
check_user_id = user_id or "anonymous"
if not is_configured():
logger.debug(
f"LaunchDarkly not configured, using default {flag_key.value}={default}"
)
if not default:
raise HTTPException(status_code=404, detail="Feature not available")
return
try:
client = get_client()
if not client.is_initialized():
logger.debug(
f"LaunchDarkly not initialized, using default {flag_key.value}={default}"
)
if not default:
raise HTTPException(status_code=404, detail="Feature not available")
return
is_enabled = await is_feature_enabled(flag_key, check_user_id, default)
if not is_enabled:
raise HTTPException(status_code=404, detail="Feature not available")
        except HTTPException:
            # Re-raise the intentional 404s above instead of converting them into a 500
            raise
        except Exception as e:
            logger.warning(
                f"LaunchDarkly error for flag {flag_key.value}: {e}, using default={default}"
            )
            raise HTTPException(status_code=500, detail="Failed to check feature flag")
return check_feature_flag
@contextlib.contextmanager
def mock_flag_variation(flag_key: str, return_value: Any):
"""Context manager for testing feature flags."""

View File

@@ -4795,6 +4795,181 @@
"security": [{ "APIKeyAuthenticator-X-Postmark-Webhook-Token": [] }]
}
},
"/api/chat/sessions": {
"post": {
"tags": ["v2", "chat", "chat"],
"summary": "Create Session",
"description": "Create a new chat session.\n\nInitiates a new chat session for either an authenticated or anonymous user.\n\nArgs:\n user_id: The optional authenticated user ID parsed from the JWT. If missing, creates an anonymous session.\n\nReturns:\n CreateSessionResponse: Details of the created session.",
"operationId": "postV2CreateSession",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/CreateSessionResponse"
}
}
}
}
},
"security": [{ "HTTPBearer": [] }]
}
},
"/api/chat/sessions/{session_id}": {
"get": {
"tags": ["v2", "chat", "chat"],
"summary": "Get Session",
"description": "Retrieve the details of a specific chat session.\n\nLooks up a chat session by ID for the given user (if authenticated) and returns all session data including messages.\n\nArgs:\n session_id: The unique identifier for the desired chat session.\n user_id: The optional authenticated user ID, or None for anonymous access.\n\nReturns:\n SessionDetailResponse: Details for the requested session; raises NotFoundError if not found.",
"operationId": "getV2GetSession",
"security": [{ "HTTPBearer": [] }],
"parameters": [
{
"name": "session_id",
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Session Id" }
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/SessionDetailResponse"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
}
}
},
"/api/chat/sessions/{session_id}/stream": {
"get": {
"tags": ["v2", "chat", "chat"],
"summary": "Stream Chat",
"description": "Stream chat responses for a session.\n\nStreams the AI/completion responses in real time over Server-Sent Events (SSE), including:\n - Text fragments as they are generated\n - Tool call UI elements (if invoked)\n - Tool execution results\n\nArgs:\n session_id: The chat session identifier to associate with the streamed messages.\n message: The user's new message to process.\n user_id: Optional authenticated user ID.\n is_user_message: Whether the message is a user message.\nReturns:\n StreamingResponse: SSE-formatted response chunks.",
"operationId": "getV2StreamChat",
"security": [{ "HTTPBearer": [] }],
"parameters": [
{
"name": "session_id",
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Session Id" }
},
{
"name": "message",
"in": "query",
"required": true,
"schema": {
"type": "string",
"minLength": 1,
"maxLength": 10000,
"title": "Message"
}
},
{
"name": "is_user_message",
"in": "query",
"required": false,
"schema": {
"type": "boolean",
"default": true,
"title": "Is User Message"
}
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": { "application/json": { "schema": {} } }
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
}
}
},
"/api/chat/sessions/{session_id}/assign-user": {
"patch": {
"tags": ["v2", "chat", "chat"],
"summary": "Session Assign User",
"description": "Assign an authenticated user to a chat session.\n\nUsed (typically post-login) to claim an existing anonymous session as the current authenticated user.\n\nArgs:\n session_id: The identifier for the (previously anonymous) session.\n user_id: The authenticated user's ID to associate with the session.\n\nReturns:\n dict: Status of the assignment.",
"operationId": "patchV2SessionAssignUser",
"security": [{ "HTTPBearer": [] }, { "HTTPBearerJWT": [] }],
"parameters": [
{
"name": "session_id",
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Session Id" }
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"type": "object",
"additionalProperties": true,
"title": "Response Patchv2Sessionassignuser"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
}
}
}
},
"/api/chat/health": {
"get": {
"tags": ["v2", "chat", "chat"],
"summary": "Health Check",
"description": "Health check endpoint for the chat service.\n\nPerforms a full cycle test of session creation, assignment, and retrieval. Should always return healthy\nif the service and data layer are operational.\n\nReturns:\n dict: A status dictionary indicating health, service name, and API version.",
"operationId": "getV2HealthCheck",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"additionalProperties": true,
"type": "object",
"title": "Response Getv2Healthcheck"
}
}
}
}
},
"security": [{ "HTTPBearer": [] }]
}
},
"/health": {
"get": {
"tags": ["health"],
@@ -5371,6 +5546,20 @@
"required": ["graph"],
"title": "CreateGraph"
},
"CreateSessionResponse": {
"properties": {
"id": { "type": "string", "title": "Id" },
"created_at": { "type": "string", "title": "Created At" },
"user_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "User Id"
}
},
"type": "object",
"required": ["id", "created_at", "user_id"],
"title": "CreateSessionResponse",
"description": "Response model containing information on a newly created chat session."
},
"Creator": {
"properties": {
"name": { "type": "string", "title": "Name" },
@@ -7500,6 +7689,26 @@
"required": ["items", "total_items", "page", "more_pages"],
"title": "SearchResponse"
},
"SessionDetailResponse": {
"properties": {
"id": { "type": "string", "title": "Id" },
"created_at": { "type": "string", "title": "Created At" },
"updated_at": { "type": "string", "title": "Updated At" },
"user_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "User Id"
},
"messages": {
"items": { "additionalProperties": true, "type": "object" },
"type": "array",
"title": "Messages"
}
},
"type": "object",
"required": ["id", "created_at", "updated_at", "user_id", "messages"],
"title": "SessionDetailResponse",
"description": "Response model providing complete details for a chat session, including messages."
},
"SetGraphActiveVersion": {
"properties": {
"active_graph_version": {
@@ -9654,7 +9863,8 @@
"type": "apiKey",
"in": "header",
"name": "X-Postmark-Webhook-Token"
}
},
"HTTPBearer": { "type": "http", "scheme": "bearer" }
},
"responses": {
"HTTP401NotAuthenticatedError": {