feat(platform): Simplify the chat tool system to use only 2 tools (#11464)

This PR simplifies the chat tool system down to two tools: `find_agent` and `run_agent`.

### Changes 🏗️

- Remove the old `get_agent_details`, `get_required_setup_info`, and `setup_agent` tools
- Expand the `run_agent` tool to cover all stages: input discovery, credential checks, immediate execution, and scheduling (see the sketch below)
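
As an illustration of the consolidated flow, here is a minimal sketch (not part of the PR) of calling the new `/run-agent` endpoint in its three modes. The base URL and API-key header name are assumptions; the request fields mirror the `RunAgentRequest` model introduced in this diff.

```
# Hypothetical sketch of the consolidated run_agent flow against the tools API.
# BASE_URL and the API key header name are assumptions; the request fields
# (username_agent_slug, inputs, use_defaults, schedule_name, cron, timezone)
# come from the new RunAgentRequest model in this PR.
import requests

BASE_URL = "https://example.com/api/chat/tools"  # assumed prefix
HEADERS = {"X-API-Key": "YOUR_API_KEY"}          # assumed header name

# 1) First call with no inputs: returns the agent's available inputs
#    (and any missing credentials) instead of executing.
discover = requests.post(
    f"{BASE_URL}/run-agent",
    json={"username_agent_slug": "creator/agent-name"},
    headers=HEADERS,
).json()

# 2) Second call: execute with explicit inputs (or use_defaults=True
#    once the user has confirmed).
run = requests.post(
    f"{BASE_URL}/run-agent",
    json={
        "username_agent_slug": "creator/agent-name",
        "inputs": {"topic": "AI news"},
    },
    headers=HEADERS,
).json()

# 3) The same endpoint schedules instead of running when schedule_name
#    and cron are provided.
schedule = requests.post(
    f"{BASE_URL}/run-agent",
    json={
        "username_agent_slug": "creator/agent-name",
        "inputs": {"topic": "AI news"},
        "schedule_name": "Daily Report",
        "cron": "0 9 * * *",
        "timezone": "America/New_York",
    },
    headers=HEADERS,
).json()
```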

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Tested that adding credentials works
  - [x] Tested that running an agent works
  - [x] Tested that scheduling an agent works
Author: Swifty
Date: 2025-12-01 20:56:16 +01:00
Committed by: GitHub
Parent: 7dc3b201b7
Commit: 9f37342bc6
16 changed files with 1006 additions and 2356 deletions

View File

@@ -17,13 +17,7 @@ from pydantic import BaseModel, Field
from backend.data.api_key import APIKeyInfo
from backend.server.external.middleware import require_permission
from backend.server.v2.chat.model import ChatSession
from backend.server.v2.chat.tools import (
find_agent_tool,
get_agent_details_tool,
get_required_setup_info_tool,
run_agent_tool,
setup_agent_tool,
)
from backend.server.v2.chat.tools import find_agent_tool, run_agent_tool
from backend.server.v2.chat.tools.models import ToolResponseBase
logger = logging.getLogger(__name__)
@@ -40,34 +34,32 @@ class FindAgentRequest(BaseModel):
query: str = Field(..., description="Search query for finding agents")
class AgentSlugRequest(BaseModel):
class RunAgentRequest(BaseModel):
"""Request to run or schedule an agent.
The tool automatically handles the setup flow:
- First call returns available inputs so user can decide what values to use
- Returns missing credentials if user needs to configure them
- Executes when inputs are provided OR use_defaults=true
- Schedules execution if schedule_name and cron are provided
"""
username_agent_slug: str = Field(
...,
description="The marketplace agent slug (e.g., 'username/agent-name')",
)
class GetRequiredSetupInfoRequest(AgentSlugRequest):
inputs: dict[str, Any] = Field(
default_factory=dict,
description="The input dictionary you plan to provide",
)
class RunAgentRequest(AgentSlugRequest):
inputs: dict[str, Any] = Field(
default_factory=dict,
description="Dictionary of input values for the agent",
)
class SetupAgentRequest(AgentSlugRequest):
setup_type: str = Field(
default="schedule",
description="Type of setup: 'schedule' for cron, 'webhook' for triggers",
use_defaults: bool = Field(
default=False,
description="Set to true to run with default values (user must confirm)",
)
schedule_name: str | None = Field(
None,
description="Name for scheduled execution (triggers scheduling mode)",
)
name: str = Field(..., description="Name for this setup/schedule")
description: str | None = Field(None, description="Description of this setup")
cron: str | None = Field(
None,
description="Cron expression (5 fields: minute hour day month weekday)",
@@ -76,27 +68,16 @@ class SetupAgentRequest(AgentSlugRequest):
default="UTC",
description="IANA timezone (e.g., 'America/New_York', 'UTC')",
)
inputs: dict[str, Any] = Field(
default_factory=dict,
description="Dictionary with required inputs for the agent",
)
webhook_config: dict[str, Any] | None = Field(
None,
description="Webhook configuration (required if setup_type is 'webhook')",
)
def _create_ephemeral_session(user_id: str | None) -> ChatSession:
"""Create an ephemeral session for stateless API requests.
Note: These sessions are NOT persisted to Redis, so session-based rate
limiting (max_agent_runs, max_agent_schedules) will not be enforced
across requests.
"""
"""Create an ephemeral session for stateless API requests."""
return ChatSession.new(user_id)
@tools_router.post(path="/find-agent")
@tools_router.post(
path="/find-agent",
)
async def find_agent(
request: FindAgentRequest,
api_key: APIKeyInfo = Security(require_permission(APIKeyPermission.USE_TOOLS)),
@@ -119,71 +100,34 @@ async def find_agent(
return _response_to_dict(result)
@tools_router.post(path="/get-agent-details")
async def get_agent_details(
request: AgentSlugRequest,
api_key: APIKeyInfo = Security(require_permission(APIKeyPermission.USE_TOOLS)),
) -> dict[str, Any]:
"""
Get detailed information about a specific agent including inputs,
credentials required, and execution options.
Args:
request: Agent slug in format 'username/agent-name'
Returns:
Detailed agent information
"""
session = _create_ephemeral_session(api_key.user_id)
result = await get_agent_details_tool._execute(
user_id=api_key.user_id,
session=session,
username_agent_slug=request.username_agent_slug,
)
return _response_to_dict(result)
@tools_router.post(path="/get-required-setup-info")
async def get_required_setup_info(
request: GetRequiredSetupInfoRequest,
api_key: APIKeyInfo = Security(require_permission(APIKeyPermission.USE_TOOLS)),
) -> dict[str, Any]:
"""
Check if an agent can be set up with the provided input data and credentials.
Validates that you have all required inputs before running or scheduling.
Args:
request: Agent slug and optional inputs to validate
Returns:
Setup requirements and user readiness status
"""
session = _create_ephemeral_session(api_key.user_id)
result = await get_required_setup_info_tool._execute(
user_id=api_key.user_id,
session=session,
username_agent_slug=request.username_agent_slug,
inputs=request.inputs,
)
return _response_to_dict(result)
@tools_router.post(path="/run-agent")
@tools_router.post(
path="/run-agent",
)
async def run_agent(
request: RunAgentRequest,
api_key: APIKeyInfo = Security(require_permission(APIKeyPermission.USE_TOOLS)),
) -> dict[str, Any]:
"""
Run an agent immediately (one-off manual execution).
Run or schedule an agent from the marketplace.
IMPORTANT: Before calling this endpoint, first call get-agent-details
to determine what inputs are required.
The endpoint automatically handles the setup flow:
- Returns missing inputs if required fields are not provided
- Returns missing credentials if user needs to configure them
- Executes immediately if all requirements are met
- Schedules execution if schedule_name and cron are provided
For scheduled execution:
- Cron format: "minute hour day month weekday"
- Examples: "0 9 * * 1-5" (9am weekdays), "0 0 * * *" (daily at midnight)
- Timezone: Use IANA timezone names like "America/New_York"
Args:
request: Agent slug and input values
request: Agent slug, inputs, and optional schedule config
Returns:
Execution started response with execution_id
- setup_requirements: If inputs or credentials are missing
- execution_started: If agent was run or scheduled successfully
- error: If something went wrong
"""
session = _create_ephemeral_session(api_key.user_id)
result = await run_agent_tool._execute(
@@ -191,45 +135,10 @@ async def run_agent(
session=session,
username_agent_slug=request.username_agent_slug,
inputs=request.inputs,
)
return _response_to_dict(result)
@tools_router.post(path="/setup-agent")
async def setup_agent(
request: SetupAgentRequest,
api_key: APIKeyInfo = Security(require_permission(APIKeyPermission.USE_TOOLS)),
) -> dict[str, Any]:
"""
Set up an agent with credentials and configure it for scheduled execution
or webhook triggers.
For SCHEDULED execution:
- Cron format: "minute hour day month weekday"
- Examples: "0 9 * * 1-5" (9am weekdays), "0 0 * * *" (daily at midnight)
- Timezone: Use IANA timezone names like "America/New_York", "Europe/London"
For WEBHOOK triggers:
- The agent will be triggered by external events
Args:
request: Agent slug, setup type, schedule configuration, and inputs
Returns:
Schedule or webhook created response
"""
session = _create_ephemeral_session(api_key.user_id)
result = await setup_agent_tool._execute(
user_id=api_key.user_id,
session=session,
username_agent_slug=request.username_agent_slug,
setup_type=request.setup_type,
name=request.name,
description=request.description,
cron=request.cron,
use_defaults=request.use_defaults,
schedule_name=request.schedule_name or "",
cron=request.cron or "",
timezone=request.timezone,
inputs=request.inputs,
webhook_config=request.webhook_config,
)
return _response_to_dict(result)
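
Since this endpoint can now answer with either setup requirements or an execution confirmation, a caller has to branch on the response type. Below is a minimal sketch, assuming the serialized dict exposes the `type` values from `ResponseType` ("setup_requirements", "execution_started", "error") and the fields shown in the response models elsewhere in this diff.

```
# Minimal sketch of branching on the run-agent response. Field names are
# taken from the response models in this diff; the exact serialized shape
# produced by _response_to_dict is an assumption.
def handle_run_agent_response(result: dict) -> str:
    kind = result.get("type")
    if kind == "setup_requirements":
        # Missing inputs and/or credentials; ask the user before retrying.
        return f"Setup needed: {result.get('message')}"
    if kind == "execution_started":
        # Agent was run or scheduled; execution_id and graph_name come from
        # ExecutionStartedResponse.
        return f"Started {result.get('graph_name')} ({result.get('execution_id')})"
    return f"Error: {result.get('message')}"
```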

View File

@@ -4,21 +4,29 @@ Here are the functions available to you:
<functions>
1. **find_agent** - Search for agents that solve the user's problem
2. **get_agent_details** - Get comprehensive information about the chosen agent
3. **get_required_setup_info** - Verify user has required credentials (MANDATORY before execution)
4. **schedule_agent** - Schedules the agent to run based on a cron
5. **run_agent** - Execute the agent
2. **run_agent** - Run or schedule an agent (automatically handles setup)
</functions>
## HOW run_agent WORKS
## MANDATORY WORKFLOW
The `run_agent` tool automatically handles the entire setup flow:
You must follow these 4 steps in exact order:
1. **First call** (no inputs) → Returns available inputs so user can decide what values to use
2. **Credentials check** → If missing, UI automatically prompts user to add them (you don't need to mention this)
3. **Execution** → Runs when you provide `inputs` OR set `use_defaults=true`
Parameters:
- `username_agent_slug` (required): Agent identifier like "creator/agent-name"
- `inputs`: Object with input values for the agent
- `use_defaults`: Set to `true` to run with default values (only after user confirms)
- `schedule_name` + `cron`: For scheduled execution
## WORKFLOW
1. **find_agent** - Search for agents that solve the user's problem
2. **get_agent_details** - Get comprehensive information about the chosen agent
3. **get_required_setup_info** - Verify user has required credentials (MANDATORY before execution)
4. **schedule_agent** or **run_agent** - Execute the agent
2. **run_agent** (first call, no inputs) - Get available inputs for the agent
3. **Ask user** what values they want to use OR if they want to use defaults
4. **run_agent** (second call) - Either with `inputs={...}` or `use_defaults=true`
## YOUR APPROACH
@@ -31,67 +39,66 @@ You must follow these 4 steps in exact order:
- Use `find_agent` immediately with relevant keywords
- Suggest the best option from search results
- Explain briefly how it solves their problem
- Ask if they want to use it, then move to step 3
**Step 3: Get Details**
- Use `get_agent_details` on their chosen agent
- Explain what the agent does and its requirements
- Keep explanations brief and outcome-focused
**Step 3: Get Agent Inputs**
- Call `run_agent(username_agent_slug="creator/agent-name")` without inputs
- This returns the available inputs (required and optional)
- Present these to the user and ask what values they want
**Step 4: Verify Setup (CRITICAL)**
- ALWAYS use `get_required_setup_info` before execution
- Tell user what credentials they need (if any)
- Explain that credentials are added via the frontend interface
**Step 4: Run with User's Choice**
- If user provides values: `run_agent(username_agent_slug="...", inputs={...})`
- If user says "use defaults": `run_agent(username_agent_slug="...", use_defaults=true)`
- On success, share the agent link with the user
**Step 5: Execute**
- Use `schedule_agent` for scheduled runs OR `run_agent` for immediate execution
- Confirm successful setup
- Provide clear next steps
**For Scheduled Execution:**
- Add `schedule_name` and `cron` parameters
- Example: `run_agent(username_agent_slug="...", inputs={...}, schedule_name="Daily Report", cron="0 9 * * *")`
## FUNCTION CALL FORMAT
To call a function, use this exact format:
`<function_call>function_name(parameter="value")</function_call>`
Examples:
- `<function_call>find_agent(query="social media automation")</function_call>`
- `<function_call>run_agent(username_agent_slug="creator/agent-name")</function_call>` (get inputs)
- `<function_call>run_agent(username_agent_slug="creator/agent-name", inputs={"topic": "AI news"})</function_call>`
- `<function_call>run_agent(username_agent_slug="creator/agent-name", use_defaults=true)</function_call>`
## KEY RULES
**What You DON'T Do:**
- Don't help with login (frontend handles this)
- Don't help add credentials (frontend handles this)
- Don't skip `get_required_setup_info` (mandatory before execution)
- Don't ask permission to use functions - just use them
- Don't mention or explain credentials to the user (frontend handles this automatically)
- Don't run agents without first showing available inputs to the user
- Don't use `use_defaults=true` without user explicitly confirming
- Don't write responses longer than 3 sentences
- Don't pretend to be ChatGPT
**What You DO:**
- Act fast - get to agent discovery quickly
- Use functions proactively
- Always call run_agent first without inputs to see what's available
- Ask user what values they want OR if they want to use defaults
- Keep all responses to maximum 3 sentences
- Always verify credentials before setup/run
- Focus on outcomes and value
- Maintain conversational, concise style
- Do use markdown to make your messages easier to read
- Include the agent link in your response after successful execution
**Error Handling:**
- Authentication needed → "Please sign in via the interface"
- Credentials missing → Tell user what's needed and where to add them
- Setup fails → Identify issue and provide clear fix
- Credentials missing → The UI handles this automatically. Focus on asking the user about input values instead.
## RESPONSE STRUCTURE
Before responding, wrap your analysis in <thinking> tags to systematically plan your approach:
- Identify which step of the 4-step mandatory workflow you're currently on
- Extract the key business problem or request from the user's message
- Determine what function call (if any) you need to make next
- Plan your response to stay under the 3-sentence maximum
- Consider what specific keywords or parameters you'll use for any function calls
Example interaction pattern:
Example interaction:
```
User: "I need to automate my social media posting"
Otto: Let me find social media automation agents for you. <function_call>find_agent(query="social media posting automation")</function_call> I'll show you the best options once I get the results.
User: "Run the AI news agent for me"
Otto: <function_call>run_agent(username_agent_slug="autogpt/ai-news")</function_call>
[Tool returns: Agent accepts inputs - Required: topic. Optional: num_articles (default: 5)]
Otto: The AI News agent needs a topic. What topic would you like news about, or should I use the defaults?
User: "Use defaults"
Otto: <function_call>run_agent(username_agent_slug="autogpt/ai-news", use_defaults=true)</function_call>
```
Respond conversationally and begin helping them find the right AutoGPT agent for their needs.
KEEP ANSWERS TO 3 SENTENCES
KEEP ANSWERS TO 3 SENTENCES
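
The cron and timezone examples in this prompt can be sanity-checked locally. The sketch below uses the third-party croniter package purely for illustration; it is not necessarily how the platform's scheduler computes next runs.

```
# Check when the prompt's example cron expressions would next fire in an
# IANA timezone. croniter is an illustrative assumption, not a project
# dependency confirmed by this diff.
from datetime import datetime
from zoneinfo import ZoneInfo

from croniter import croniter

tz = ZoneInfo("America/New_York")
now = datetime.now(tz)

for expr in ("0 9 * * 1-5", "0 0 * * *"):
    next_run = croniter(expr, now).get_next(datetime)
    print(f"{expr!r} next fires at {next_run.isoformat()}")
```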

View File

@@ -6,27 +6,18 @@ from backend.server.v2.chat.model import ChatSession
from .base import BaseTool
from .find_agent import FindAgentTool
from .get_agent_details import GetAgentDetailsTool
from .get_required_setup_info import GetRequiredSetupInfoTool
from .run_agent import RunAgentTool
from .setup_agent import SetupAgentTool
if TYPE_CHECKING:
from backend.server.v2.chat.response_model import StreamToolExecutionResult
# Initialize tool instances
find_agent_tool = FindAgentTool()
get_agent_details_tool = GetAgentDetailsTool()
get_required_setup_info_tool = GetRequiredSetupInfoTool()
setup_agent_tool = SetupAgentTool()
run_agent_tool = RunAgentTool()
# Export tools as OpenAI format
tools: list[ChatCompletionToolParam] = [
find_agent_tool.as_openai_tool(),
get_agent_details_tool.as_openai_tool(),
get_required_setup_info_tool.as_openai_tool(),
setup_agent_tool.as_openai_tool(),
run_agent_tool.as_openai_tool(),
]
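
For reference, each `as_openai_tool()` entry is expected to roughly follow OpenAI's function-tool shape. The exact wrapper produced by `BaseTool.as_openai_tool()` is an assumption; the name and parameter schema below are taken from `RunAgentTool` in this PR.

```
# Approximate ChatCompletionToolParam produced for run_agent. The outer
# {"type": "function", "function": {...}} wrapper is the standard OpenAI
# format (assumed here); the inner schema matches RunAgentTool.parameters.
run_agent_tool_param = {
    "type": "function",
    "function": {
        "name": "run_agent",
        "description": "Run or schedule an agent from the marketplace.",
        "parameters": {
            "type": "object",
            "properties": {
                "username_agent_slug": {"type": "string"},
                "inputs": {"type": "object", "additionalProperties": True},
                "use_defaults": {"type": "boolean"},
                "schedule_name": {"type": "string"},
                "cron": {"type": "string"},
                "timezone": {"type": "string"},
            },
            "required": ["username_agent_slug"],
        },
    },
}
```
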
@@ -41,9 +32,6 @@ async def execute_tool(
tool_map: dict[str, BaseTool] = {
"find_agent": find_agent_tool,
"get_agent_details": get_agent_details_tool,
"get_required_setup_info": get_required_setup_info_tool,
"schedule_agent": setup_agent_tool,
"run_agent": run_agent_tool,
}
if tool_name not in tool_map:

View File

@@ -1,221 +0,0 @@
"""Tool for getting detailed information about a specific agent."""
import logging
from typing import Any
from backend.data import graph as graph_db
from backend.data.model import CredentialsMetaInput
from backend.server.v2.chat.model import ChatSession
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.models import (
AgentDetails,
AgentDetailsResponse,
ErrorResponse,
ExecutionOptions,
ToolResponseBase,
)
from backend.server.v2.store import db as store_db
from backend.util.exceptions import DatabaseError, NotFoundError
logger = logging.getLogger(__name__)
class GetAgentDetailsTool(BaseTool):
"""Tool for getting detailed information about an agent."""
@property
def name(self) -> str:
return "get_agent_details"
@property
def description(self) -> str:
return "Get detailed information about a specific agent including inputs, credentials required, and execution options."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The marketplace agent slug (e.g., 'username/agent-name')",
},
},
"required": ["username_agent_slug"],
}
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Get detailed information about an agent.
Args:
user_id: User ID (may be anonymous)
session_id: Chat session ID
username_agent_slug: Agent ID or slug
Returns:
Pydantic response model
"""
agent_id = kwargs.get("username_agent_slug", "").strip()
session_id = session.session_id
if not agent_id or "/" not in agent_id:
return ErrorResponse(
message="Please provide an agent ID in format 'creator/agent-name'",
session_id=session_id,
)
try:
# Always try to get from marketplace first
graph = None
store_agent = None
# Check if it's a slug format (username/agent_name)
try:
# Parse username/agent_name from slug
username, agent_name = agent_id.split("/", 1)
store_agent = await store_db.get_store_agent_details(
username, agent_name
)
logger.info(f"Found agent {agent_id} in marketplace")
except NotFoundError as e:
logger.debug(f"Failed to get from marketplace: {e}")
return ErrorResponse(
message=f"Agent '{agent_id}' not found",
session_id=session_id,
)
except DatabaseError as e:
logger.error(f"Failed to get from marketplace: {e}")
return ErrorResponse(
message=f"Failed to get agent details: {e!s}",
session_id=session_id,
)
# If we found a store agent, get its graph
if store_agent:
try:
# Use get_available_graph to get the graph from store listing version
graph_meta = await store_db.get_available_graph(
store_agent.store_listing_version_id
)
# Now get the full graph with that ID
graph = await graph_db.get_graph(
graph_id=graph_meta.id,
version=graph_meta.version,
user_id=None, # Public access
include_subgraphs=True,
)
except NotFoundError as e:
logger.error(f"Failed to get graph for store agent: {e}")
return ErrorResponse(
message=f"Failed to get graph for store agent: {e!s}",
session_id=session_id,
)
except DatabaseError as e:
logger.error(f"Failed to get graph for store agent: {e}")
return ErrorResponse(
message=f"Failed to get graph for store agent: {e!s}",
session_id=session_id,
)
if not graph:
return ErrorResponse(
message=f"Agent '{agent_id}' not found",
session_id=session_id,
)
credentials_input_schema = graph.credentials_input_schema
# Extract credentials from the JSON schema properties
credentials = []
if (
isinstance(credentials_input_schema, dict)
and "properties" in credentials_input_schema
):
for cred_name, cred_schema in credentials_input_schema[
"properties"
].items():
# Extract credential metadata from the schema
# The schema properties contain provider info and other metadata
# Get provider from credentials_provider array or properties.provider.const
provider = "unknown"
if (
"credentials_provider" in cred_schema
and cred_schema["credentials_provider"]
):
provider = cred_schema["credentials_provider"][0]
elif (
"properties" in cred_schema
and "provider" in cred_schema["properties"]
):
provider = cred_schema["properties"]["provider"].get(
"const", "unknown"
)
# Get type from credentials_types array or properties.type.const
cred_type = "api_key" # Default
if (
"credentials_types" in cred_schema
and cred_schema["credentials_types"]
):
cred_type = cred_schema["credentials_types"][0]
elif (
"properties" in cred_schema
and "type" in cred_schema["properties"]
):
cred_type = cred_schema["properties"]["type"].get(
"const", "api_key"
)
credentials.append(
CredentialsMetaInput(
id=cred_name,
title=cred_schema.get("title", cred_name),
provider=provider, # type: ignore
type=cred_type,
)
)
trigger_info = (
graph.trigger_setup_info.model_dump()
if graph.trigger_setup_info
else None
)
agent_details = AgentDetails(
id=graph.id,
name=graph.name,
description=graph.description,
inputs=graph.input_schema,
credentials=credentials,
execution_options=ExecutionOptions(
# Currently a graph with a webhook can only be triggered by a webhook
manual=trigger_info is None,
scheduled=trigger_info is None,
webhook=trigger_info is not None,
),
trigger_info=trigger_info,
)
return AgentDetailsResponse(
message=f"Found agent '{agent_details.name}'. When presenting the agent you do not need to mention the required credentials. You do not need to run this tool again for this agent.",
session_id=session_id,
agent=agent_details,
user_authenticated=user_id is not None,
graph_id=graph.id,
graph_version=graph.version,
)
except Exception as e:
logger.error(f"Error getting agent details: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to get agent details: {e!s}",
error=str(e),
session_id=session_id,
)

View File

@@ -1,335 +0,0 @@
import uuid
import orjson
import pytest
from backend.server.v2.chat.tools._test_data import (
make_session,
setup_llm_test_data,
setup_test_data,
)
from backend.server.v2.chat.tools.get_agent_details import GetAgentDetailsTool
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_success(setup_test_data):
"""Test successfully getting agent details from marketplace"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format: username/slug
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Build session
session = make_session()
# Execute the tool
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check the basic structure
assert "agent" in result_data
assert "message" in result_data
assert "graph_id" in result_data
assert "graph_version" in result_data
assert "user_authenticated" in result_data
# Check agent details
agent = result_data["agent"]
assert agent["id"] == graph.id
assert agent["name"] == "Test Agent"
assert (
agent["description"] == "A simple test agent"
) # Description from store submission
assert "inputs" in agent
assert "credentials" in agent
assert "execution_options" in agent
# Check execution options
exec_options = agent["execution_options"]
assert "manual" in exec_options
assert "scheduled" in exec_options
assert "webhook" in exec_options
# Check inputs schema
assert isinstance(agent["inputs"], dict)
# Should have properties for the input fields
if "properties" in agent["inputs"]:
assert "test_input" in agent["inputs"]["properties"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_with_llm_credentials(setup_llm_test_data):
"""Test getting agent details for an agent that requires LLM credentials"""
# Use test data from fixture
user = setup_llm_test_data["user"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
# Execute the tool
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check that agent details are returned
assert "agent" in result_data
agent = result_data["agent"]
# Check that credentials are listed
assert "credentials" in agent
credentials = agent["credentials"]
# The LLM agent should have OpenAI credentials listed
assert isinstance(credentials, list)
# Check that inputs include the user_prompt
assert "inputs" in agent
if "properties" in agent["inputs"]:
assert "user_prompt" in agent["inputs"]["properties"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_invalid_format():
"""Test error handling when agent_id is not in correct format"""
tool = GetAgentDetailsTool()
session = make_session()
session.user_id = str(uuid.uuid4())
# Execute with invalid format (no slash)
response = await tool.execute(
user_id=session.user_id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug="invalid-format",
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert "creator/agent-name" in result_data["message"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_empty_slug():
"""Test error handling when agent_id is empty"""
tool = GetAgentDetailsTool()
session = make_session()
session.user_id = str(uuid.uuid4())
# Execute with empty slug
response = await tool.execute(
user_id=session.user_id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug="",
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert "creator/agent-name" in result_data["message"]
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_not_found():
"""Test error handling when agent is not found in marketplace"""
tool = GetAgentDetailsTool()
session = make_session()
session.user_id = str(uuid.uuid4())
# Execute with non-existent agent
response = await tool.execute(
user_id=session.user_id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug="nonexistent/agent",
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert "not found" in result_data["message"].lower()
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_anonymous_user(setup_test_data):
"""Test getting agent details as an anonymous user (no user_id)"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session()
# session.user_id stays as None
# Execute the tool without a user_id (anonymous)
response = await tool.execute(
user_id=None,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should still get agent details
assert "agent" in result_data
assert "user_authenticated" in result_data
# User should be marked as not authenticated
assert result_data["user_authenticated"] is False
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_authenticated_user(setup_test_data):
"""Test getting agent details as an authenticated user"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session()
session.user_id = user.id
# Execute the tool with a user_id (authenticated)
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should get agent details
assert "agent" in result_data
assert "user_authenticated" in result_data
# User should be marked as authenticated
assert result_data["user_authenticated"] is True
@pytest.mark.asyncio(scope="session")
async def test_get_agent_details_includes_execution_options(setup_test_data):
"""Test that agent details include execution options"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = GetAgentDetailsTool()
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session()
session.user_id = user.id
# Execute the tool
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check execution options
assert "agent" in result_data
agent = result_data["agent"]
assert "execution_options" in agent
exec_options = agent["execution_options"]
# These should all be boolean values
assert isinstance(exec_options["manual"], bool)
assert isinstance(exec_options["scheduled"], bool)
assert isinstance(exec_options["webhook"], bool)
# For a regular agent (no webhook), manual and scheduled should be True
assert exec_options["manual"] is True
assert exec_options["scheduled"] is True
assert exec_options["webhook"] is False

View File

@@ -1,182 +0,0 @@
"""Tool for getting required setup information for an agent."""
import logging
from typing import Any
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.model import ChatSession
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.get_agent_details import GetAgentDetailsTool
from backend.server.v2.chat.tools.models import (
AgentDetailsResponse,
ErrorResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
logger = logging.getLogger(__name__)
class GetRequiredSetupInfoTool(BaseTool):
"""Tool for getting required setup information including credentials and inputs."""
@property
def name(self) -> str:
return "get_required_setup_info"
@property
def description(self) -> str:
return """Check if an agent can be set up with the provided input data and credentials.
Call this AFTER get_agent_details to validate that you have all required inputs.
Pass the input dictionary you plan to use with run_agent or setup_agent to verify it's complete."""
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The marketplace agent slug (e.g., 'username/agent-name' or just 'agent-name' to search)",
},
"inputs": {
"type": "object",
"description": "The input dictionary you plan to provide. Should contain ALL required inputs from get_agent_details",
"additionalProperties": True,
},
},
"required": ["username_agent_slug"],
}
@property
def requires_auth(self) -> bool:
"""This tool requires authentication."""
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""
Retrieve and validate the required setup information for running or configuring an agent.
This checks all required credentials and input fields based on the agent details,
and verifies user readiness to run the agent based on provided inputs and available credentials.
Args:
user_id: The authenticated user's ID (must not be None; authentication required).
session_id: The chat session ID.
agent_id: The agent's marketplace slug (e.g. 'username/agent-name'). Also accepts Graph ID.
agent_version: (Optional) Specific agent/graph version (if applicable).
Returns:
SetupRequirementsResponse containing:
- agent and graph info,
- credential and input requirements,
- user readiness and missing credentials/fields,
- setup instructions.
"""
assert (
user_id is not None
), "GetRequiredSetupInfoTool - This should never happen user_id is None when auth is required"
session_id = session.session_id
# Call _execute directly since we're calling internally from another tool
agent_details = await GetAgentDetailsTool()._execute(user_id, session, **kwargs)
if isinstance(agent_details, ErrorResponse):
return agent_details
if not isinstance(agent_details, AgentDetailsResponse):
return ErrorResponse(
message="Failed to get agent details",
session_id=session_id,
)
available_creds = await IntegrationCredentialsManager().store.get_all_creds(
user_id
)
required_credentials = []
# Check if user has credentials matching the required provider/type
for c in agent_details.agent.credentials:
# Check if any available credential matches this provider and type
has_matching_cred = any(
cred.provider == c.provider and cred.type == c.type
for cred in available_creds
)
if not has_matching_cred:
required_credentials.append(c)
required_fields = set(agent_details.agent.inputs.get("required", []))
provided_inputs = kwargs.get("inputs", {})
missing_inputs = required_fields - set(provided_inputs.keys())
missing_credentials = {c.id: c.model_dump() for c in required_credentials}
user_readiness = UserReadiness(
has_all_credentials=len(required_credentials) == 0,
missing_credentials=missing_credentials,
ready_to_run=len(missing_inputs) == 0 and len(required_credentials) == 0,
)
# Convert execution options to list of available modes
exec_opts = agent_details.agent.execution_options
execution_modes = []
if exec_opts.manual:
execution_modes.append("manual")
if exec_opts.scheduled:
execution_modes.append("scheduled")
if exec_opts.webhook:
execution_modes.append("webhook")
# Convert input schema to list of input field info
inputs_list = []
if (
isinstance(agent_details.agent.inputs, dict)
and "properties" in agent_details.agent.inputs
):
for field_name, field_schema in agent_details.agent.inputs[
"properties"
].items():
inputs_list.append(
{
"name": field_name,
"title": field_schema.get("title", field_name),
"type": field_schema.get("type", "string"),
"description": field_schema.get("description", ""),
"required": field_name
in agent_details.agent.inputs.get("required", []),
}
)
requirements = {
"credentials": agent_details.agent.credentials,
"inputs": inputs_list,
"execution_modes": execution_modes,
}
message = ""
if len(agent_details.agent.credentials) > 0:
message = "The user needs to enter credentials before proceeding. Please wait until you have a message informing you that the credentials have been entered."
elif len(inputs_list) > 0:
message = (
"The user needs to enter inputs before proceeding. Please wait until you have a message informing you that the inputs have been entered. The inputs are: "
+ ", ".join([input["name"] for input in inputs_list])
)
else:
message = "The agent is ready to run. Please call the run_agent tool with the agent ID."
return SetupRequirementsResponse(
message=message,
session_id=session_id,
setup_info=SetupInfo(
agent_id=agent_details.agent.id,
agent_name=agent_details.agent.name,
user_readiness=user_readiness,
requirements=requirements,
),
graph_id=agent_details.graph_id,
graph_version=agent_details.graph_version,
)

View File

@@ -1,331 +0,0 @@
import uuid
import orjson
import pytest
from backend.server.v2.chat.tools._test_data import (
make_session,
setup_firecrawl_test_data,
setup_llm_test_data,
setup_test_data,
)
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
)
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
setup_firecrawl_test_data = setup_firecrawl_test_data
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_success(setup_test_data):
"""Test successfully getting setup info for a simple agent"""
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
tool = GetRequiredSetupInfoTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "Hello World"},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "setup_info" in result_data
setup_info = result_data["setup_info"]
assert "agent_id" in setup_info
assert setup_info["agent_id"] == graph.id
assert "agent_name" in setup_info
assert setup_info["agent_name"] == "Test Agent"
assert "requirements" in setup_info
requirements = setup_info["requirements"]
assert "credentials" in requirements
assert "inputs" in requirements
assert "execution_modes" in requirements
assert isinstance(requirements["credentials"], list)
assert len(requirements["credentials"]) == 0
assert isinstance(requirements["inputs"], list)
if len(requirements["inputs"]) > 0:
first_input = requirements["inputs"][0]
assert "name" in first_input
assert "title" in first_input
assert "type" in first_input
assert isinstance(requirements["execution_modes"], list)
assert "manual" in requirements["execution_modes"]
assert "scheduled" in requirements["execution_modes"]
assert "user_readiness" in setup_info
user_readiness = setup_info["user_readiness"]
assert "has_all_credentials" in user_readiness
assert "ready_to_run" in user_readiness
assert user_readiness["ready_to_run"] is True
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_missing_credentials(setup_firecrawl_test_data):
"""Test getting setup info for an agent requiring missing credentials"""
user = setup_firecrawl_test_data["user"]
store_submission = setup_firecrawl_test_data["store_submission"]
tool = GetRequiredSetupInfoTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"url": "https://example.com"},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "setup_info" in result_data
setup_info = result_data["setup_info"]
requirements = setup_info["requirements"]
assert "credentials" in requirements
assert isinstance(requirements["credentials"], list)
assert len(requirements["credentials"]) > 0
firecrawl_cred = requirements["credentials"][0]
assert "provider" in firecrawl_cred
assert firecrawl_cred["provider"] == "firecrawl"
assert "type" in firecrawl_cred
assert firecrawl_cred["type"] == "api_key"
user_readiness = setup_info["user_readiness"]
assert user_readiness["has_all_credentials"] is False
assert user_readiness["ready_to_run"] is False
assert "missing_credentials" in user_readiness
assert isinstance(user_readiness["missing_credentials"], dict)
assert len(user_readiness["missing_credentials"]) > 0
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_with_available_credentials(setup_llm_test_data):
"""Test getting setup info when user has required credentials"""
user = setup_llm_test_data["user"]
store_submission = setup_llm_test_data["store_submission"]
tool = GetRequiredSetupInfoTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"user_prompt": "What is 2+2?"},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
setup_info = result_data["setup_info"]
user_readiness = setup_info["user_readiness"]
assert user_readiness["has_all_credentials"] is True
assert user_readiness["ready_to_run"] is True
assert "missing_credentials" in user_readiness
assert len(user_readiness["missing_credentials"]) == 0
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_missing_inputs(setup_test_data):
"""Test getting setup info when required inputs are not provided"""
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
tool = GetRequiredSetupInfoTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={}, # Empty inputs
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
setup_info = result_data["setup_info"]
requirements = setup_info["requirements"]
assert "inputs" in requirements
assert isinstance(requirements["inputs"], list)
user_readiness = setup_info["user_readiness"]
assert "ready_to_run" in user_readiness
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_invalid_agent():
"""Test getting setup info for a non-existent agent"""
tool = GetRequiredSetupInfoTool()
session = make_session(user_id=None)
response = await tool.execute(
user_id=str(uuid.uuid4()),
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="invalid/agent",
inputs={},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert any(
phrase in result_data["message"].lower()
for phrase in ["not found", "failed", "error"]
)
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_graph_metadata(setup_test_data):
"""Test that setup info includes graph metadata"""
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
tool = GetRequiredSetupInfoTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "test"},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "graph_id" in result_data
assert result_data["graph_id"] == graph.id
assert "graph_version" in result_data
assert result_data["graph_version"] == graph.version
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_inputs_structure(setup_test_data):
"""Test that inputs are properly structured as a list"""
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
tool = GetRequiredSetupInfoTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
setup_info = result_data["setup_info"]
requirements = setup_info["requirements"]
assert isinstance(requirements["inputs"], list)
for input_field in requirements["inputs"]:
assert isinstance(input_field, dict)
assert "name" in input_field
assert "title" in input_field
assert "type" in input_field
assert "description" in input_field
assert "required" in input_field
assert isinstance(input_field["required"], bool)
@pytest.mark.asyncio(scope="session")
async def test_get_required_setup_info_execution_modes_structure(setup_test_data):
"""Test that execution_modes are properly structured as a list"""
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
tool = GetRequiredSetupInfoTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
setup_info = result_data["setup_info"]
requirements = setup_info["requirements"]
assert isinstance(requirements["execution_modes"], list)
for mode in requirements["execution_modes"]:
assert isinstance(mode, str)
assert mode in ["manual", "scheduled", "webhook"]

View File

@@ -13,17 +13,9 @@ class ResponseType(str, Enum):
AGENT_CAROUSEL = "agent_carousel"
AGENT_DETAILS = "agent_details"
AGENT_DETAILS_NEED_LOGIN = "agent_details_need_login"
AGENT_DETAILS_NEED_CREDENTIALS = "agent_details_need_credentials"
SETUP_REQUIREMENTS = "setup_requirements"
SCHEDULE_CREATED = "schedule_created"
WEBHOOK_CREATED = "webhook_created"
PRESET_CREATED = "preset_created"
EXECUTION_STARTED = "execution_started"
NEED_LOGIN = "need_login"
NEED_CREDENTIALS = "need_credentials"
INSUFFICIENT_CREDITS = "insufficient_credits"
VALIDATION_ERROR = "validation_error"
ERROR = "error"
NO_RESULTS = "no_results"
SUCCESS = "success"
@@ -112,7 +104,7 @@ class AgentDetails(BaseModel):
class AgentDetailsResponse(ToolResponseBase):
"""Response for get_agent_details tool."""
"""Response for get_details action."""
type: ResponseType = ResponseType.AGENT_DETAILS
agent: AgentDetails
@@ -121,51 +113,7 @@ class AgentDetailsResponse(ToolResponseBase):
graph_version: int | None = None
class AgentDetailsNeedLoginResponse(ToolResponseBase):
"""Response when agent details need login."""
type: ResponseType = ResponseType.AGENT_DETAILS_NEED_LOGIN
agent: AgentDetails
agent_info: dict[str, Any] | None = None
graph_id: str | None = None
graph_version: int | None = None
class AgentDetailsNeedCredentialsResponse(ToolResponseBase):
"""Response when agent needs credentials to be configured."""
type: ResponseType = ResponseType.NEED_CREDENTIALS
agent: AgentDetails
credentials_schema: dict[str, Any]
agent_info: dict[str, Any] | None = None
graph_id: str | None = None
graph_version: int | None = None
# Setup info models
class SetupRequirementInfo(BaseModel):
"""Setup requirement information."""
key: str
provider: str
required: bool = True
user_has: bool = False
credential_id: str | None = None
type: str | None = None
scopes: list[str] | None = None
description: str | None = None
class ExecutionModeInfo(BaseModel):
"""Execution mode information."""
type: str # manual, scheduled, webhook
description: str
supported: bool
config_required: dict[str, str] | None = None
trigger_info: dict[str, Any] | None = None
class UserReadiness(BaseModel):
"""User readiness status."""
@@ -187,11 +135,10 @@ class SetupInfo(BaseModel):
},
)
user_readiness: UserReadiness = Field(default_factory=UserReadiness)
setup_instructions: list[str] = []
class SetupRequirementsResponse(ToolResponseBase):
"""Response for get_required_setup_info tool."""
"""Response for validate action."""
type: ResponseType = ResponseType.SETUP_REQUIREMENTS
setup_info: SetupInfo
@@ -199,70 +146,17 @@ class SetupRequirementsResponse(ToolResponseBase):
graph_version: int | None = None
# Setup agent models
class ScheduleCreatedResponse(ToolResponseBase):
"""Response for scheduled agent setup."""
type: ResponseType = ResponseType.SCHEDULE_CREATED
schedule_id: str
name: str
cron: str
timezone: str = "UTC"
next_run: str | None = None
graph_id: str
graph_name: str
class WebhookCreatedResponse(ToolResponseBase):
"""Response for webhook agent setup."""
type: ResponseType = ResponseType.WEBHOOK_CREATED
webhook_id: str
webhook_url: str
preset_id: str | None = None
name: str
graph_id: str
graph_name: str
class PresetCreatedResponse(ToolResponseBase):
"""Response for preset agent setup."""
type: ResponseType = ResponseType.PRESET_CREATED
preset_id: str
name: str
graph_id: str
graph_name: str
# Run agent models
# Execution models
class ExecutionStartedResponse(ToolResponseBase):
"""Response for agent execution started."""
"""Response for run/schedule actions."""
type: ResponseType = ResponseType.EXECUTION_STARTED
execution_id: str
graph_id: str
graph_name: str
library_agent_id: str | None = None
library_agent_link: str | None = None
status: str = "QUEUED"
ended_at: str | None = None
outputs: dict[str, Any] | None = None
error: str | None = None
timeout_reached: bool | None = None
class InsufficientCreditsResponse(ToolResponseBase):
"""Response for insufficient credits."""
type: ResponseType = ResponseType.INSUFFICIENT_CREDITS
balance: float
class ValidationErrorResponse(ToolResponseBase):
"""Response for validation errors."""
type: ResponseType = ResponseType.VALIDATION_ERROR
error: str
details: dict[str, Any] | None = None
# Auth/error models
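
For context, here is a hypothetical construction of the execution response that survives this refactor. The import path and field names come from this diff; treating `message` and `session_id` as fields on `ToolResponseBase`, and the link format, are assumptions.

```
# Illustrative only: builds an ExecutionStartedResponse with the fields
# shown in this diff. message/session_id are assumed to live on
# ToolResponseBase; the library agent link format is an assumption.
from backend.server.v2.chat.tools.models import ExecutionStartedResponse

started = ExecutionStartedResponse(
    message="Execution started",
    session_id="session-123",
    execution_id="exec-456",
    graph_id="graph-789",
    graph_name="AI News",
    library_agent_link="https://platform.example.com/library/agents/abc",  # assumed shape
)
```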

View File

@@ -1,34 +1,66 @@
"""Tool for running an agent manually (one-off execution)."""
"""Unified tool for agent operations with automatic state detection."""
import logging
from typing import Any
from backend.data.graph import get_graph
from backend.data.graph import GraphModel
from backend.data.model import CredentialsMetaInput
from backend.data.user import get_user_by_id
from backend.executor import utils as execution_utils
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.config import ChatConfig
from backend.server.v2.chat.model import ChatSession
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
)
from backend.server.v2.chat.tools.models import (
AgentDetails,
AgentDetailsResponse,
ErrorResponse,
ExecutionOptions,
ExecutionStartedResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
from backend.server.v2.chat.tools.utils import (
check_user_has_required_credentials,
extract_credentials_from_schema,
fetch_graph_from_store_slug,
get_or_create_library_agent,
match_user_credentials_to_graph,
)
from backend.util.clients import get_scheduler_client
from backend.util.exceptions import DatabaseError, NotFoundError
from backend.util.timezone_utils import (
convert_utc_time_to_user_timezone,
get_user_timezone_or_utc,
)
from backend.server.v2.library import db as library_db
from backend.server.v2.library import model as library_model
logger = logging.getLogger(__name__)
config = ChatConfig()
# Constants for response messages
MSG_DO_NOT_RUN_AGAIN = "Do not run again unless explicitly requested."
MSG_DO_NOT_SCHEDULE_AGAIN = "Do not schedule again unless explicitly requested."
MSG_ASK_USER_FOR_VALUES = (
"Ask the user what values to use, or call again with use_defaults=true "
"to run with default values."
)
MSG_WHAT_VALUES_TO_USE = (
"What values would you like to use, or would you like to run with defaults?"
)
class RunAgentTool(BaseTool):
"""Tool for executing an agent manually with immediate results."""
"""Unified tool for agent operations with automatic state detection.
The tool automatically determines what to do based on provided parameters:
1. Fetches agent details (always, silently)
2. Checks if required inputs are provided
3. Checks if user has required credentials
4. Runs immediately OR schedules (if cron is provided)
The response tells the caller what's missing or confirms execution.
"""
@property
def name(self) -> str:
@@ -36,11 +68,15 @@ class RunAgentTool(BaseTool):
@property
def description(self) -> str:
return """Run an agent immediately (one-off manual execution).
IMPORTANT: Before calling this tool, you MUST first call get_agent_details to determine what inputs are required.
The 'inputs' parameter must be a dictionary containing ALL required input values identified by get_agent_details.
Example: If get_agent_details shows required inputs 'search_query' and 'max_results', you must pass:
inputs={"search_query": "user's query", "max_results": 10}"""
return """Run or schedule an agent from the marketplace.
The tool automatically handles the setup flow:
- Returns missing inputs if required fields are not provided
- Returns missing credentials if user needs to configure them
- Executes immediately if all requirements are met
- Schedules execution if cron expression is provided
For scheduled execution, provide: schedule_name, cron, and optionally timezone."""
@property
def parameters(self) -> dict[str, Any]:
@@ -49,20 +85,36 @@ class RunAgentTool(BaseTool):
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The ID of the agent to run (graph ID or marketplace slug)",
"description": "Agent identifier in format 'username/agent-name'",
},
"inputs": {
"type": "object",
"description": 'REQUIRED: Dictionary of input values. Must include ALL required inputs from get_agent_details. Format: {"input_name": value}',
"description": "Input values for the agent",
"additionalProperties": True,
},
"use_defaults": {
"type": "boolean",
"description": "Set to true to run with default values (user must confirm)",
},
"schedule_name": {
"type": "string",
"description": "Name for scheduled execution (triggers scheduling mode)",
},
"cron": {
"type": "string",
"description": "Cron expression (5 fields: min hour day month weekday)",
},
"timezone": {
"type": "string",
"description": "IANA timezone for schedule (default: UTC)",
},
},
"required": ["username_agent_slug"],
}
@property
def requires_auth(self) -> bool:
"""This tool requires authentication."""
"""All operations require authentication."""
return True
async def _execute(
@@ -71,186 +123,362 @@ class RunAgentTool(BaseTool):
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute an agent manually.
Args:
user_id: Authenticated user ID
session_id: Chat session ID
**kwargs: Execution parameters
Returns:
JSON formatted execution result
"""
assert (
user_id is not None
), "User ID is required to run an agent. Superclass enforces authentication."
session_id = session.session_id
username_agent_slug = kwargs.get("username_agent_slug", "").strip()
"""Execute the tool with automatic state detection."""
agent_slug = kwargs.get("username_agent_slug", "").strip()
inputs = kwargs.get("inputs", {})
use_defaults = kwargs.get("use_defaults", False)
schedule_name = kwargs.get("schedule_name", "").strip()
cron = kwargs.get("cron", "").strip()
timezone = kwargs.get("timezone", "UTC").strip()
session_id = session.session_id
# Call _execute directly since we're calling internally from another tool
response = await GetRequiredSetupInfoTool()._execute(user_id, session, **kwargs)
if not isinstance(response, SetupRequirementsResponse):
# Validate agent slug format
if not agent_slug or "/" not in agent_slug:
return ErrorResponse(
message="Failed to get required setup information",
message="Please provide an agent slug in format 'username/agent-name'",
session_id=session_id,
)
setup_info = SetupInfo.model_validate(response.setup_info)
if not setup_info.user_readiness.ready_to_run:
# Auth is required
if not user_id:
return ErrorResponse(
message=f"User is not ready to run the agent. User Readiness: {setup_info.user_readiness.model_dump_json()} Requirments: {setup_info.requirements}",
message="Authentication required. Please sign in to use this tool.",
session_id=session_id,
)
# Get the graph using the graph_id and graph_version from the setup response
if not response.graph_id or not response.graph_version:
return ErrorResponse(
message=f"Graph information not available for {username_agent_slug}",
session_id=session_id,
)
# Determine if this is a schedule request
is_schedule = bool(schedule_name or cron)
graph = await get_graph(
graph_id=response.graph_id,
version=response.graph_version,
user_id=None, # Public access for store graphs
include_subgraphs=True,
)
try:
# Step 1: Fetch agent details (always happens first)
username, agent_name = agent_slug.split("/", 1)
graph, store_agent = await fetch_graph_from_store_slug(username, agent_name)
if not graph:
return ErrorResponse(
message=f"Graph {username_agent_slug} ({response.graph_id}v{response.graph_version}) not found",
session_id=session_id,
)
if graph and (
session.successful_agent_runs.get(graph.id, 0) >= config.max_agent_runs
):
return ErrorResponse(
message="Maximum number of agent schedules reached. You can't schedule this agent again in this chat session.",
session_id=session.session_id,
)
# Check if we already have a library agent for this graph
existing_library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph.id, user_id=user_id
)
if not existing_library_agent:
# Now we need to add the graph to the user's library
library_agents: list[library_model.LibraryAgent] = (
await library_db.create_library_agent(
graph=graph,
user_id=user_id,
create_library_agents_for_sub_graphs=False,
)
)
assert len(library_agents) == 1, "Expected 1 library agent to be created"
library_agent = library_agents[0]
else:
library_agent = existing_library_agent
# Build credentials mapping for the graph
graph_credentials_inputs: dict[str, CredentialsMetaInput] = {}
# Get aggregated credentials requirements from the graph
aggregated_creds = graph.aggregate_credentials_inputs()
logger.debug(
f"Matching credentials for graph {graph.id}: {len(aggregated_creds)} required"
)
if aggregated_creds:
# Get all available credentials for the user
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
# Track unmatched credentials for error reporting
missing_creds: list[str] = []
# For each required credential field, find a matching user credential
# field_info.provider is a frozenset because aggregate_credentials_inputs()
# combines requirements from multiple nodes. A credential matches if its
# provider is in the set of acceptable providers.
for credential_field_name, (
credential_requirements,
_node_fields,
) in aggregated_creds.items():
# Find first matching credential by provider and type
matching_cred = next(
(
cred
for cred in available_creds
if cred.provider in credential_requirements.provider
and cred.type in credential_requirements.supported_types
),
None,
)
if matching_cred:
# Use Pydantic validation to ensure type safety
try:
graph_credentials_inputs[credential_field_name] = (
CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
)
except Exception as e:
logger.error(
f"Failed to create CredentialsMetaInput for field '{credential_field_name}': "
f"provider={matching_cred.provider}, type={matching_cred.type}, "
f"credential_id={matching_cred.id}",
exc_info=True,
)
missing_creds.append(
f"{credential_field_name} (validation failed: {e})"
)
else:
missing_creds.append(
f"{credential_field_name} "
f"(requires provider in {list(credential_requirements.provider)}, "
f"type in {list(credential_requirements.supported_types)})"
)
# Fail fast if any required credentials are missing
if missing_creds:
logger.warning(
f"Cannot execute agent - missing credentials: {missing_creds}"
)
if not graph:
return ErrorResponse(
message=f"Cannot execute agent: missing {len(missing_creds)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials."
f"Please set up the following credentials: {', '.join(missing_creds)}",
message=f"Agent '{agent_slug}' not found in marketplace",
session_id=session_id,
details={"missing_credentials": missing_creds},
)
logger.info(
f"Credential matching complete: {len(graph_credentials_inputs)}/{len(aggregated_creds)} matched"
# Step 2: Check credentials
graph_credentials, missing_creds = await match_user_credentials_to_graph(
user_id, graph
)
# At this point we know the user is ready to run the agent
# So we can execute the agent
if missing_creds:
# Return credentials needed response with input data info
# The UI handles credential setup automatically, so the message
# focuses on asking about input data
credentials = extract_credentials_from_schema(
graph.credentials_input_schema
)
missing_creds_check = await check_user_has_required_credentials(
user_id, credentials
)
missing_credentials_dict = {
c.id: c.model_dump() for c in missing_creds_check
}
return SetupRequirementsResponse(
message=self._build_inputs_message(graph, MSG_WHAT_VALUES_TO_USE),
session_id=session_id,
setup_info=SetupInfo(
agent_id=graph.id,
agent_name=graph.name,
user_readiness=UserReadiness(
has_all_credentials=False,
missing_credentials=missing_credentials_dict,
ready_to_run=False,
),
requirements={
"credentials": [c.model_dump() for c in credentials],
"inputs": self._get_inputs_list(graph.input_schema),
"execution_modes": self._get_execution_modes(graph),
},
),
graph_id=graph.id,
graph_version=graph.version,
)
# Step 3: Check inputs
# Get all available input fields from schema
input_properties = graph.input_schema.get("properties", {})
required_fields = set(graph.input_schema.get("required", []))
provided_inputs = set(inputs.keys())
# If agent has inputs but none were provided AND use_defaults is not set,
# always show what's available first so user can decide
if input_properties and not provided_inputs and not use_defaults:
credentials = extract_credentials_from_schema(
graph.credentials_input_schema
)
return AgentDetailsResponse(
message=self._build_inputs_message(graph, MSG_ASK_USER_FOR_VALUES),
session_id=session_id,
agent=self._build_agent_details(graph, credentials),
user_authenticated=True,
graph_id=graph.id,
graph_version=graph.version,
)
# Check if required inputs are missing (and not using defaults)
missing_inputs = required_fields - provided_inputs
if missing_inputs and not use_defaults:
# Return agent details with missing inputs info
credentials = extract_credentials_from_schema(
graph.credentials_input_schema
)
return AgentDetailsResponse(
message=(
f"Agent '{graph.name}' is missing required inputs: "
f"{', '.join(missing_inputs)}. "
"Please provide these values to run the agent."
),
session_id=session_id,
agent=self._build_agent_details(graph, credentials),
user_authenticated=True,
graph_id=graph.id,
graph_version=graph.version,
)
# Step 4: Execute or Schedule
if is_schedule:
return await self._schedule_agent(
user_id=user_id,
session=session,
graph=graph,
graph_credentials=graph_credentials,
inputs=inputs,
schedule_name=schedule_name,
cron=cron,
timezone=timezone,
)
else:
return await self._run_agent(
user_id=user_id,
session=session,
graph=graph,
graph_credentials=graph_credentials,
inputs=inputs,
)
except NotFoundError as e:
return ErrorResponse(
message=f"Agent '{agent_slug}' not found",
error=str(e) if str(e) else "not_found",
session_id=session_id,
)
except DatabaseError as e:
logger.error(f"Database error: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to process request: {e!s}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Error processing agent request: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to process request: {e!s}",
error=str(e),
session_id=session_id,
)
def _get_inputs_list(self, input_schema: dict[str, Any]) -> list[dict[str, Any]]:
"""Extract inputs list from schema."""
inputs_list = []
if isinstance(input_schema, dict) and "properties" in input_schema:
for field_name, field_schema in input_schema["properties"].items():
inputs_list.append(
{
"name": field_name,
"title": field_schema.get("title", field_name),
"type": field_schema.get("type", "string"),
"description": field_schema.get("description", ""),
"required": field_name in input_schema.get("required", []),
}
)
return inputs_list
def _get_execution_modes(self, graph: GraphModel) -> list[str]:
"""Get available execution modes for the graph."""
trigger_info = graph.trigger_setup_info
if trigger_info is None:
return ["manual", "scheduled"]
return ["webhook"]
def _build_inputs_message(
self,
graph: GraphModel,
suffix: str,
) -> str:
"""Build a message describing available inputs for an agent."""
inputs_list = self._get_inputs_list(graph.input_schema)
required_names = [i["name"] for i in inputs_list if i["required"]]
optional_names = [i["name"] for i in inputs_list if not i["required"]]
message_parts = [f"Agent '{graph.name}' accepts the following inputs:"]
if required_names:
message_parts.append(f"Required: {', '.join(required_names)}.")
if optional_names:
message_parts.append(
f"Optional (have defaults): {', '.join(optional_names)}."
)
if not inputs_list:
message_parts = [f"Agent '{graph.name}' has no required inputs."]
message_parts.append(suffix)
return " ".join(message_parts)
def _build_agent_details(
self,
graph: GraphModel,
credentials: list[CredentialsMetaInput],
) -> AgentDetails:
"""Build AgentDetails from a graph."""
trigger_info = (
graph.trigger_setup_info.model_dump() if graph.trigger_setup_info else None
)
return AgentDetails(
id=graph.id,
name=graph.name,
description=graph.description,
inputs=graph.input_schema,
credentials=credentials,
execution_options=ExecutionOptions(
manual=trigger_info is None,
scheduled=trigger_info is None,
webhook=trigger_info is not None,
),
trigger_info=trigger_info,
)
async def _run_agent(
self,
user_id: str,
session: ChatSession,
graph: GraphModel,
graph_credentials: dict[str, CredentialsMetaInput],
inputs: dict[str, Any],
) -> ToolResponseBase:
"""Execute an agent immediately."""
session_id = session.session_id
# Check rate limits
if session.successful_agent_runs.get(graph.id, 0) >= config.max_agent_runs:
return ErrorResponse(
message="Maximum agent runs reached for this session. Please try again later.",
session_id=session_id,
)
# Get or create library agent
library_agent = await get_or_create_library_agent(graph, user_id)
# Execute
execution = await execution_utils.add_graph_execution(
graph_id=library_agent.graph_id,
user_id=user_id,
inputs=inputs,
graph_credentials_inputs=graph_credentials_inputs,
graph_credentials_inputs=graph_credentials,
)
# Track successful run
session.successful_agent_runs[library_agent.graph_id] = (
session.successful_agent_runs.get(library_agent.graph_id, 0) + 1
)
library_agent_link = f"/library/agents/{library_agent.id}"
return ExecutionStartedResponse(
message=f"Agent execution successfully started. You can add a link to the agent at: /library/agents/{library_agent.id}. Do not run this tool again unless specifically asked to run the agent again.",
message=(
f"Agent '{library_agent.name}' execution started successfully. "
f"View at {library_agent_link}. "
f"{MSG_DO_NOT_RUN_AGAIN}"
),
session_id=session_id,
execution_id=execution.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
library_agent_id=library_agent.id,
library_agent_link=library_agent_link,
)
async def _schedule_agent(
self,
user_id: str,
session: ChatSession,
graph: GraphModel,
graph_credentials: dict[str, CredentialsMetaInput],
inputs: dict[str, Any],
schedule_name: str,
cron: str,
timezone: str,
) -> ToolResponseBase:
"""Set up scheduled execution for an agent."""
session_id = session.session_id
# Validate schedule params
if not schedule_name:
return ErrorResponse(
message="schedule_name is required for scheduled execution",
session_id=session_id,
)
if not cron:
return ErrorResponse(
message="cron expression is required for scheduled execution",
session_id=session_id,
)
# Check rate limits
if (
session.successful_agent_schedules.get(graph.id, 0)
>= config.max_agent_schedules
):
return ErrorResponse(
message="Maximum agent schedules reached for this session.",
session_id=session_id,
)
# Get or create library agent
library_agent = await get_or_create_library_agent(graph, user_id)
# Get user timezone
user = await get_user_by_id(user_id)
user_timezone = get_user_timezone_or_utc(user.timezone if user else timezone)
# Create schedule
result = await get_scheduler_client().add_execution_schedule(
user_id=user_id,
graph_id=library_agent.graph_id,
graph_version=library_agent.graph_version,
name=schedule_name,
cron=cron,
input_data=inputs,
input_credentials=graph_credentials,
user_timezone=user_timezone,
)
# Convert next_run_time to user timezone for display
if result.next_run_time:
result.next_run_time = convert_utc_time_to_user_timezone(
result.next_run_time, user_timezone
)
# Track successful schedule
session.successful_agent_schedules[library_agent.graph_id] = (
session.successful_agent_schedules.get(library_agent.graph_id, 0) + 1
)
library_agent_link = f"/library/agents/{library_agent.id}"
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' scheduled successfully as '{schedule_name}'. "
f"View at {library_agent_link}. "
f"{MSG_DO_NOT_SCHEDULE_AGAIN}"
),
session_id=session_id,
execution_id=result.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
library_agent_id=library_agent.id,
library_agent_link=library_agent_link,
)
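
For reference, a minimal sketch of the argument shapes the consolidated run_agent tool accepts, based on the parameter schema above. The slug, input values, and schedule fields are illustrative placeholders, not real marketplace data; the cron pattern follows the 5-field format described in the schema.

```python
# Illustrative argument payloads for the consolidated run_agent tool.
# Field names follow the parameter schema above; all values are placeholders.

# Immediate run with explicit inputs:
run_call = {
    "username_agent_slug": "exampleuser/example-agent",
    "inputs": {"search_query": "weekly AI news", "max_results": 10},
}

# Immediate run accepting the agent's default values (user must confirm first):
run_with_defaults = {
    "username_agent_slug": "exampleuser/example-agent",
    "use_defaults": True,
}

# Scheduled run: providing schedule_name and cron switches the tool into
# scheduling mode; timezone is an optional IANA name (defaults to UTC).
scheduled_call = {
    "username_agent_slug": "exampleuser/example-agent",
    "inputs": {"search_query": "weekly AI news"},
    "schedule_name": "Weekday morning digest",
    "cron": "0 9 * * 1-5",  # 09:00, Monday through Friday
    "timezone": "America/New_York",
}
```

Calling the tool with no inputs and `use_defaults` unset returns the agent's available inputs instead of executing, which is the behaviour exercised by the first new test below.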

View File

@@ -5,6 +5,7 @@ import pytest
from backend.server.v2.chat.tools._test_data import (
make_session,
setup_firecrawl_test_data,
setup_llm_test_data,
setup_test_data,
)
@@ -13,6 +14,7 @@ from backend.server.v2.chat.tools.run_agent import RunAgentTool
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
setup_firecrawl_test_data = setup_firecrawl_test_data
@pytest.mark.asyncio(scope="session")
@@ -169,3 +171,221 @@ async def test_run_agent_with_llm_credentials(setup_llm_test_data):
assert result_data["graph_id"] == graph.id
assert "graph_name" in result_data
assert result_data["graph_name"] == "LLM Test Agent"
@pytest.mark.asyncio(scope="session")
async def test_run_agent_shows_available_inputs_when_none_provided(setup_test_data):
"""Test that run_agent returns available inputs when called without inputs or use_defaults."""
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
tool = RunAgentTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
# Execute without inputs and without use_defaults
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={},
use_defaults=False,
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should return agent_details type showing available inputs
assert result_data.get("type") == "agent_details"
assert "agent" in result_data
assert "message" in result_data
# Message should mention inputs
assert "inputs" in result_data["message"].lower()
@pytest.mark.asyncio(scope="session")
async def test_run_agent_with_use_defaults(setup_test_data):
"""Test that run_agent executes successfully with use_defaults=True."""
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
tool = RunAgentTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
# Execute with use_defaults=True (no explicit inputs)
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={},
use_defaults=True,
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should execute successfully
assert "execution_id" in result_data
assert result_data["graph_id"] == graph.id
@pytest.mark.asyncio(scope="session")
async def test_run_agent_missing_credentials(setup_firecrawl_test_data):
"""Test that run_agent returns setup_requirements when credentials are missing."""
user = setup_firecrawl_test_data["user"]
store_submission = setup_firecrawl_test_data["store_submission"]
tool = RunAgentTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
# Execute - user doesn't have firecrawl credentials
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"url": "https://example.com"},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should return setup_requirements type with missing credentials
assert result_data.get("type") == "setup_requirements"
assert "setup_info" in result_data
setup_info = result_data["setup_info"]
assert "user_readiness" in setup_info
assert setup_info["user_readiness"]["has_all_credentials"] is False
assert len(setup_info["user_readiness"]["missing_credentials"]) > 0
@pytest.mark.asyncio(scope="session")
async def test_run_agent_invalid_slug_format(setup_test_data):
"""Test that run_agent returns error for invalid slug format (no slash)."""
user = setup_test_data["user"]
tool = RunAgentTool()
session = make_session(user_id=user.id)
# Execute with invalid slug format
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="no-slash-here",
inputs={},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should return error
assert result_data.get("type") == "error"
assert "username/agent-name" in result_data["message"]
@pytest.mark.asyncio(scope="session")
async def test_run_agent_unauthenticated():
"""Test that run_agent returns need_login for unauthenticated users."""
tool = RunAgentTool()
session = make_session(user_id=None)
# Execute without user_id
response = await tool.execute(
user_id=None,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug="test/test-agent",
inputs={},
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Base tool returns need_login type for unauthenticated users
assert result_data.get("type") == "need_login"
assert "sign in" in result_data["message"].lower()
@pytest.mark.asyncio(scope="session")
async def test_run_agent_schedule_without_cron(setup_test_data):
"""Test that run_agent returns error when scheduling without cron expression."""
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
tool = RunAgentTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
# Try to schedule without cron
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "test"},
schedule_name="My Schedule",
cron="", # Empty cron
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should return error about missing cron
assert result_data.get("type") == "error"
assert "cron" in result_data["message"].lower()
@pytest.mark.asyncio(scope="session")
async def test_run_agent_schedule_without_name(setup_test_data):
"""Test that run_agent returns error when scheduling without schedule_name."""
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
tool = RunAgentTool()
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
session = make_session(user_id=user.id)
# Try to schedule without schedule_name
response = await tool.execute(
user_id=user.id,
session_id=str(uuid.uuid4()),
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "test"},
schedule_name="", # Empty name
cron="0 9 * * *",
session=session,
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should return error about missing schedule_name
assert result_data.get("type") == "error"
assert "schedule_name" in result_data["message"].lower()

View File

@@ -1,395 +0,0 @@
"""Tool for setting up an agent with credentials and configuration."""
import logging
from typing import Any
from pydantic import BaseModel
from backend.data.graph import get_graph
from backend.data.model import CredentialsMetaInput
from backend.data.user import get_user_by_id
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.config import ChatConfig
from backend.server.v2.chat.model import ChatSession
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
)
from backend.server.v2.chat.tools.models import (
ExecutionStartedResponse,
SetupInfo,
SetupRequirementsResponse,
)
from backend.server.v2.library import db as library_db
from backend.server.v2.library import model as library_model
from backend.util.clients import get_scheduler_client
from backend.util.timezone_utils import (
convert_utc_time_to_user_timezone,
get_user_timezone_or_utc,
)
from .base import BaseTool
from .models import ErrorResponse, ToolResponseBase
config = ChatConfig()
logger = logging.getLogger(__name__)
class AgentDetails(BaseModel):
graph_name: str
graph_id: str
graph_version: int
recommended_schedule_cron: str | None
required_credentials: dict[str, CredentialsMetaInput]
class SetupAgentTool(BaseTool):
"""Tool for setting up an agent with scheduled execution or webhook triggers."""
@property
def name(self) -> str:
return "schedule_agent"
@property
def description(self) -> str:
return """Set up an agent with credentials and configure it for scheduled execution or webhook triggers.
IMPORTANT: Before calling this tool, you MUST first call get_agent_details to determine what inputs are required.
For SCHEDULED execution:
- Cron format: "minute hour day month weekday" (e.g., "0 9 * * 1-5" = 9am weekdays)
- Common patterns: "0 * * * *" (hourly), "0 0 * * *" (daily at midnight), "0 9 * * 1" (Mondays at 9am)
- Timezone: Use IANA timezone names like "America/New_York", "Europe/London", "Asia/Tokyo"
- The 'inputs' parameter must contain ALL required inputs from get_agent_details as a dictionary
For WEBHOOK triggers:
- The agent will be triggered by external events
- Still requires all input values from get_agent_details"""
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"username_agent_slug": {
"type": "string",
"description": "The marketplace agent slug (e.g., 'username/agent-name')",
},
"setup_type": {
"type": "string",
"enum": ["schedule", "webhook"],
"description": "Type of setup: 'schedule' for cron, 'webhook' for triggers.",
},
"name": {
"type": "string",
"description": "Name for this setup/schedule (e.g., 'Daily Report', 'Weekly Summary')",
},
"description": {
"type": "string",
"description": "Description of this setup",
},
"cron": {
"type": "string",
"description": "Cron expression (5 fields: minute hour day month weekday). Examples: '0 9 * * 1-5' (9am weekdays), '*/30 * * * *' (every 30 min)",
},
"timezone": {
"type": "string",
"description": "IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC'). Defaults to UTC if not specified.",
},
"inputs": {
"type": "object",
"description": 'REQUIRED: Dictionary with ALL required inputs from get_agent_details. Format: {"input_name": value}',
"additionalProperties": True,
},
"webhook_config": {
"type": "object",
"description": "Webhook configuration (required if setup_type is 'webhook')",
"additionalProperties": True,
},
},
"required": ["username_agent_slug", "setup_type"],
}
@property
def requires_auth(self) -> bool:
"""This tool requires authentication."""
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Set up an agent with configuration.
Args:
user_id: Authenticated user ID
session_id: Chat session ID
**kwargs: Setup parameters
Returns:
JSON formatted setup result
"""
assert (
user_id is not None
), "User ID is required to run an agent. Superclass enforces authentication."
session_id = session.session_id
setup_type = kwargs.get("setup_type", "schedule").strip()
if setup_type != "schedule":
return ErrorResponse(
message="Only schedule setup is supported at this time",
session_id=session_id,
)
else:
cron = kwargs.get("cron", "").strip()
cron_name = kwargs.get("name", "").strip()
if not cron or not cron_name:
return ErrorResponse(
message="Cron and name are required for schedule setup",
session_id=session_id,
)
username_agent_slug = kwargs.get("username_agent_slug", "").strip()
inputs = kwargs.get("inputs", {})
library_agent = await self._get_or_add_library_agent(
username_agent_slug, user_id, session, **kwargs
)
if not isinstance(library_agent, AgentDetails):
# library agent is an ErrorResponse
return library_agent
if library_agent and (
session.successful_agent_schedules.get(library_agent.graph_id, 0)
if isinstance(library_agent, AgentDetails)
else 0 >= config.max_agent_schedules
):
return ErrorResponse(
message="Maximum number of agent schedules reached. You can't schedule this agent again in this chat session.",
session_id=session.session_id,
)
# At this point we know the user is ready to run the agent
# Create the schedule for the agent
from backend.server.v2.library import db as library_db
# Get the library agent model for scheduling
lib_agent = await library_db.get_library_agent_by_graph_id(
graph_id=library_agent.graph_id, user_id=user_id
)
if not lib_agent:
return ErrorResponse(
message=f"Library agent not found for graph {library_agent.graph_id}",
session_id=session_id,
)
return await self._add_graph_execution_schedule(
library_agent=lib_agent,
user_id=user_id,
cron=cron,
name=cron_name,
inputs=inputs,
credentials=library_agent.required_credentials,
session=session,
)
async def _add_graph_execution_schedule(
self,
library_agent: library_model.LibraryAgent,
user_id: str,
cron: str,
name: str,
inputs: dict[str, Any],
credentials: dict[str, CredentialsMetaInput],
session: ChatSession,
**kwargs,
) -> ExecutionStartedResponse | ErrorResponse:
# Use timezone from request if provided, otherwise fetch from user profile
user = await get_user_by_id(user_id)
user_timezone = get_user_timezone_or_utc(user.timezone if user else None)
session_id = session.session_id
# Map required credentials (schema field names) to actual user credential IDs
# credentials param contains CredentialsMetaInput with schema field names as keys
# We need to find the user's actual credentials that match the provider/type
creds_manager = IntegrationCredentialsManager()
user_credentials = await creds_manager.store.get_all_creds(user_id)
# Build a mapping from schema field name -> actual credential ID
resolved_credentials: dict[str, CredentialsMetaInput] = {}
missing_credentials: list[str] = []
for field_name, cred_meta in credentials.items():
# Find a matching credential from the user's credentials
matching_cred = next(
(
c
for c in user_credentials
if c.provider == cred_meta.provider and c.type == cred_meta.type
),
None,
)
if matching_cred:
# Use the actual credential ID instead of the schema field name
# Create a new CredentialsMetaInput with the actual credential ID
# but keep the same provider/type from the original meta
resolved_credentials[field_name] = CredentialsMetaInput(
id=matching_cred.id,
provider=cred_meta.provider,
type=cred_meta.type,
title=cred_meta.title,
)
else:
missing_credentials.append(
f"{cred_meta.title} ({cred_meta.provider}/{cred_meta.type})"
)
if missing_credentials:
return ErrorResponse(
message=f"Cannot execute agent: missing {len(missing_credentials)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials.",
session_id=session_id,
)
result = await get_scheduler_client().add_execution_schedule(
user_id=user_id,
graph_id=library_agent.graph_id,
graph_version=library_agent.graph_version,
name=name,
cron=cron,
input_data=inputs,
input_credentials=resolved_credentials,
user_timezone=user_timezone,
)
# Convert the next_run_time back to user timezone for display
if result.next_run_time:
result.next_run_time = convert_utc_time_to_user_timezone(
result.next_run_time, user_timezone
)
session.successful_agent_schedules[library_agent.graph_id] = (
session.successful_agent_schedules.get(library_agent.graph_id, 0) + 1
)
return ExecutionStartedResponse(
message=f"Agent execution successfully scheduled. You can add a link to the agent at: /library/agents/{library_agent.id}. Do not run this tool again unless specifically asked to run the agent again.",
session_id=session_id,
execution_id=result.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
)
async def _get_or_add_library_agent(
self, agent_id: str, user_id: str, session: ChatSession, **kwargs
) -> AgentDetails | ErrorResponse:
# Call _execute directly since we're calling internally from another tool
session_id = session.session_id
response = await GetRequiredSetupInfoTool()._execute(user_id, session, **kwargs)
if not isinstance(response, SetupRequirementsResponse):
return ErrorResponse(
message="Failed to get required setup information",
session_id=session_id,
)
setup_info = SetupInfo.model_validate(response.setup_info)
if not setup_info.user_readiness.ready_to_run:
return ErrorResponse(
message=f"User is not ready to run the agent. User Readiness: {setup_info.user_readiness.model_dump_json()} Requirments: {setup_info.requirements}",
session_id=session_id,
)
# Get the graph using the graph_id and graph_version from the setup response
if not response.graph_id or not response.graph_version:
return ErrorResponse(
message=f"Graph information not available for {agent_id}",
session_id=session_id,
)
graph = await get_graph(
graph_id=response.graph_id,
version=response.graph_version,
user_id=None, # Public access for store graphs
include_subgraphs=True,
)
if not graph:
return ErrorResponse(
message=f"Graph {agent_id} ({response.graph_id}v{response.graph_version}) not found",
session_id=session_id,
)
recommended_schedule_cron = graph.recommended_schedule_cron
# Extract credentials from the JSON schema properties
credentials_input_schema = graph.credentials_input_schema
required_credentials: dict[str, CredentialsMetaInput] = {}
if (
isinstance(credentials_input_schema, dict)
and "properties" in credentials_input_schema
):
for cred_name, cred_schema in credentials_input_schema[
"properties"
].items():
# Get provider from credentials_provider array or properties.provider.const
provider = "unknown"
if (
"credentials_provider" in cred_schema
and cred_schema["credentials_provider"]
):
provider = cred_schema["credentials_provider"][0]
elif (
"properties" in cred_schema
and "provider" in cred_schema["properties"]
):
provider = cred_schema["properties"]["provider"].get(
"const", "unknown"
)
# Get type from credentials_types array or properties.type.const
cred_type = "api_key" # Default
if (
"credentials_types" in cred_schema
and cred_schema["credentials_types"]
):
cred_type = cred_schema["credentials_types"][0]
elif (
"properties" in cred_schema and "type" in cred_schema["properties"]
):
cred_type = cred_schema["properties"]["type"].get(
"const", "api_key"
)
required_credentials[cred_name] = CredentialsMetaInput(
id=cred_name,
title=cred_schema.get("title", cred_name),
provider=provider, # type: ignore
type=cred_type,
)
# Check if we already have a library agent for this graph
existing_library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph.id, user_id=user_id
)
if not existing_library_agent:
# Now we need to add the graph to the user's library
library_agents: list[library_model.LibraryAgent] = (
await library_db.create_library_agent(
graph=graph,
user_id=user_id,
create_library_agents_for_sub_graphs=False,
)
)
assert len(library_agents) == 1, "Expected 1 library agent to be created"
library_agent = library_agents[0]
else:
library_agent = existing_library_agent
return AgentDetails(
graph_name=graph.name,
graph_id=library_agent.graph_id,
graph_version=library_agent.graph_version,
recommended_schedule_cron=recommended_schedule_cron,
required_credentials=required_credentials,
)

View File

@@ -1,422 +0,0 @@
import uuid
import orjson
import pytest
from backend.server.v2.chat.tools._test_data import (
make_session,
setup_llm_test_data,
setup_test_data,
)
from backend.server.v2.chat.tools.setup_agent import SetupAgentTool
from backend.util.clients import get_scheduler_client
# This is so the formatter doesn't remove the fixture imports
setup_llm_test_data = setup_llm_test_data
setup_test_data = setup_test_data
@pytest.mark.asyncio(scope="session")
async def test_setup_agent_missing_cron(setup_test_data):
"""Test error when cron is missing for schedule setup"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute without cron
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
inputs={"test_input": "Hello World"},
# Missing: cron and name
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
assert (
"cron" in result_data["message"].lower()
or "name" in result_data["message"].lower()
)
@pytest.mark.asyncio(scope="session")
async def test_setup_agent_webhook_not_supported(setup_test_data):
"""Test error when webhook setup is attempted"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with webhook setup_type
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="webhook",
inputs={"test_input": "Hello World"},
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
message_lower = result_data["message"].lower()
assert "schedule" in message_lower and "supported" in message_lower
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_schedule_success(setup_test_data):
"""Test successfully setting up an agent with a schedule"""
# Use test data from fixture
user = setup_test_data["user"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name="Test Schedule",
description="Test schedule description",
cron="0 9 * * *", # Daily at 9am
timezone="UTC",
inputs={"test_input": "Hello World"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Check for execution started
assert "message" in result_data
assert "execution_id" in result_data
assert "graph_id" in result_data
assert "graph_name" in result_data
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_with_credentials(setup_llm_test_data):
"""Test setting up an agent that requires credentials"""
# Use test data from fixture (includes OpenAI credentials)
user = setup_llm_test_data["user"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name="LLM Schedule",
description="LLM schedule with credentials",
cron="*/30 * * * *", # Every 30 minutes
timezone="America/New_York",
inputs={"user_prompt": "What is 2+2?"},
)
# Verify the response
assert response is not None
assert hasattr(response, "result")
# Parse the result JSON
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
# Should succeed since user has OpenAI credentials
assert "execution_id" in result_data
assert "graph_id" in result_data
@pytest.mark.asyncio(scope="session")
async def test_setup_agent_invalid_agent(setup_test_data):
"""Test error when agent doesn't exist"""
# Use test data from fixture
user = setup_test_data["user"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Execute with non-existent agent
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug="nonexistent/agent",
setup_type="schedule",
name="Test Schedule",
cron="0 9 * * *",
inputs={},
)
# Verify error response
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "message" in result_data
# Should fail to find the agent
assert any(
phrase in result_data["message"].lower()
for phrase in ["not found", "failed", "error"]
)
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_schedule_created_in_scheduler(setup_test_data):
"""Test that the schedule is actually created in the scheduler service"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Create a unique schedule name to identify this test
schedule_name = f"Test Schedule {uuid.uuid4()}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name=schedule_name,
description="Test schedule to verify credentials",
cron="0 0 * * *", # Daily at midnight
timezone="UTC",
inputs={"test_input": "Scheduled execution"},
)
# Verify the response
assert response is not None
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "execution_id" in result_data
# Now verify the schedule was created in the scheduler service
scheduler = get_scheduler_client()
schedules = await scheduler.get_execution_schedules(graph.id, user.id)
# Find our schedule
our_schedule = None
for schedule in schedules:
if schedule.name == schedule_name:
our_schedule = schedule
break
assert (
our_schedule is not None
), f"Schedule '{schedule_name}' not found in scheduler"
assert our_schedule.cron == "0 0 * * *"
assert our_schedule.graph_id == graph.id
# Clean up: delete the schedule
await scheduler.delete_schedule(our_schedule.id, user_id=user.id)
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_schedule_with_credentials_triggered(setup_llm_test_data):
"""Test that credentials are properly passed when a schedule is triggered"""
# Use test data from fixture (includes OpenAI credentials)
user = setup_llm_test_data["user"]
graph = setup_llm_test_data["graph"]
store_submission = setup_llm_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Create a unique schedule name
schedule_name = f"LLM Test Schedule {uuid.uuid4()}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name=schedule_name,
description="Test LLM schedule with credentials",
cron="* * * * *", # Every minute (for testing)
timezone="UTC",
inputs={"user_prompt": "Test prompt for credentials"},
)
# Verify the response
assert response is not None
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "execution_id" in result_data
# Get the schedule from the scheduler
scheduler = get_scheduler_client()
schedules = await scheduler.get_execution_schedules(graph.id, user.id)
# Find our schedule
our_schedule = None
for schedule in schedules:
if schedule.name == schedule_name:
our_schedule = schedule
break
assert our_schedule is not None, f"Schedule '{schedule_name}' not found"
# Verify the schedule has the correct input data
assert our_schedule.input_data is not None
assert "user_prompt" in our_schedule.input_data
assert our_schedule.input_data["user_prompt"] == "Test prompt for credentials"
# Verify credentials are stored in the schedule
# The credentials should be stored as input_credentials
assert our_schedule.input_credentials is not None
# The credentials should contain the OpenAI provider credential
# Note: The exact structure depends on how credentials are serialized
# We're checking that credentials data exists and has the right provider
if our_schedule.input_credentials:
# Convert to dict if needed
creds_dict = (
our_schedule.input_credentials
if isinstance(our_schedule.input_credentials, dict)
else {}
)
# Check if any credential has openai provider
has_openai_cred = False
for cred_key, cred_value in creds_dict.items():
if isinstance(cred_value, dict):
if cred_value.get("provider") == "openai":
has_openai_cred = True
# Verify the credential has the expected structure
assert "id" in cred_value or "api_key" in cred_value
break
# If we have LLM block, we should have stored credentials
assert has_openai_cred, "OpenAI credentials not found in schedule"
# Clean up: delete the schedule
await scheduler.delete_schedule(our_schedule.id, user_id=user.id)
@pytest.mark.asyncio(scope="session")
@pytest.mark.skip(reason="Requires scheduler service to be running")
async def test_setup_agent_creates_library_agent(setup_test_data):
"""Test that setup creates a library agent for the user"""
# Use test data from fixture
user = setup_test_data["user"]
graph = setup_test_data["graph"]
store_submission = setup_test_data["store_submission"]
# Create the tool instance
tool = SetupAgentTool()
# Build the session
session = make_session(user_id=user.id)
# Build the proper marketplace agent_id format
agent_marketplace_id = f"{user.email.split('@')[0]}/{store_submission.slug}"
# Execute with schedule setup
response = await tool.execute(
user_id=user.id,
session=session,
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
setup_type="schedule",
name="Library Test Schedule",
cron="0 12 * * *", # Daily at noon
inputs={"test_input": "Library test"},
)
# Verify the response
assert response is not None
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert "graph_id" in result_data
assert result_data["graph_id"] == graph.id
# Verify library agent was created
from backend.server.v2.library import db as library_db
library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph.id, user_id=user.id
)
assert library_agent is not None
assert library_agent.graph_id == graph.id
assert library_agent.name == "Test Agent"

View File

@@ -0,0 +1,288 @@
"""Shared utilities for chat tools."""
import logging
from typing import Any
from backend.data import graph as graph_db
from backend.data.graph import GraphModel
from backend.data.model import CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.library import db as library_db
from backend.server.v2.library import model as library_model
from backend.server.v2.store import db as store_db
from backend.util.exceptions import NotFoundError
logger = logging.getLogger(__name__)
async def fetch_graph_from_store_slug(
username: str,
agent_name: str,
) -> tuple[GraphModel | None, Any | None]:
"""
Fetch graph from store by username/agent_name slug.
Args:
username: Creator's username
agent_name: Agent name/slug
Returns:
tuple[GraphModel | None, StoreAgentDetails | None]: The graph and store agent details,
or (None, None) if not found.
Raises:
DatabaseError: If there's a database error during lookup.
"""
try:
store_agent = await store_db.get_store_agent_details(username, agent_name)
except NotFoundError:
return None, None
# Get the graph from store listing version
graph_meta = await store_db.get_available_graph(
store_agent.store_listing_version_id
)
graph = await graph_db.get_graph(
graph_id=graph_meta.id,
version=graph_meta.version,
user_id=None, # Public access
include_subgraphs=True,
)
return graph, store_agent
def extract_credentials_from_schema(
credentials_input_schema: dict[str, Any] | None,
) -> list[CredentialsMetaInput]:
"""
Extract credential requirements from graph's credentials_input_schema.
This consolidates duplicated logic from get_agent_details.py and setup_agent.py.
Args:
credentials_input_schema: The credentials_input_schema from a Graph object
Returns:
List of CredentialsMetaInput with provider and type info
"""
credentials: list[CredentialsMetaInput] = []
if (
not isinstance(credentials_input_schema, dict)
or "properties" not in credentials_input_schema
):
return credentials
for cred_name, cred_schema in credentials_input_schema["properties"].items():
provider = _extract_provider_from_schema(cred_schema)
cred_type = _extract_credential_type_from_schema(cred_schema)
credentials.append(
CredentialsMetaInput(
id=cred_name,
title=cred_schema.get("title", cred_name),
provider=provider, # type: ignore
type=cred_type, # type: ignore
)
)
return credentials
def extract_credentials_as_dict(
credentials_input_schema: dict[str, Any] | None,
) -> dict[str, CredentialsMetaInput]:
"""
Extract credential requirements as a dict keyed by field name.
Args:
credentials_input_schema: The credentials_input_schema from a Graph object
Returns:
Dict mapping field name to CredentialsMetaInput
"""
credentials: dict[str, CredentialsMetaInput] = {}
if (
not isinstance(credentials_input_schema, dict)
or "properties" not in credentials_input_schema
):
return credentials
for cred_name, cred_schema in credentials_input_schema["properties"].items():
provider = _extract_provider_from_schema(cred_schema)
cred_type = _extract_credential_type_from_schema(cred_schema)
credentials[cred_name] = CredentialsMetaInput(
id=cred_name,
title=cred_schema.get("title", cred_name),
provider=provider, # type: ignore
type=cred_type, # type: ignore
)
return credentials
def _extract_provider_from_schema(cred_schema: dict[str, Any]) -> str:
"""Extract provider from credential schema."""
if "credentials_provider" in cred_schema and cred_schema["credentials_provider"]:
return cred_schema["credentials_provider"][0]
if "properties" in cred_schema and "provider" in cred_schema["properties"]:
return cred_schema["properties"]["provider"].get("const", "unknown")
return "unknown"
def _extract_credential_type_from_schema(cred_schema: dict[str, Any]) -> str:
"""Extract credential type from credential schema."""
if "credentials_types" in cred_schema and cred_schema["credentials_types"]:
return cred_schema["credentials_types"][0]
if "properties" in cred_schema and "type" in cred_schema["properties"]:
return cred_schema["properties"]["type"].get("const", "api_key")
return "api_key"
async def get_or_create_library_agent(
graph: GraphModel,
user_id: str,
) -> library_model.LibraryAgent:
"""
Get existing library agent or create new one.
This consolidates duplicated logic from run_agent.py and setup_agent.py.
Args:
graph: The Graph to add to library
user_id: The user's ID
Returns:
LibraryAgent instance
"""
existing = await library_db.get_library_agent_by_graph_id(
graph_id=graph.id, user_id=user_id
)
if existing:
return existing
library_agents = await library_db.create_library_agent(
graph=graph,
user_id=user_id,
create_library_agents_for_sub_graphs=False,
)
assert len(library_agents) == 1, "Expected 1 library agent to be created"
return library_agents[0]
async def match_user_credentials_to_graph(
user_id: str,
graph: GraphModel,
) -> tuple[dict[str, CredentialsMetaInput], list[str]]:
"""
Match user's available credentials against graph's required credentials.
Uses graph.aggregate_credentials_inputs() which handles credentials from
multiple nodes and uses frozensets for provider matching.
Args:
user_id: The user's ID
graph: The Graph with credential requirements
Returns:
tuple[matched_credentials dict, missing_credential_descriptions list]
"""
graph_credentials_inputs: dict[str, CredentialsMetaInput] = {}
missing_creds: list[str] = []
# Get aggregated credentials requirements from the graph
aggregated_creds = graph.aggregate_credentials_inputs()
logger.debug(
f"Matching credentials for graph {graph.id}: {len(aggregated_creds)} required"
)
if not aggregated_creds:
return graph_credentials_inputs, missing_creds
# Get all available credentials for the user
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
# For each required credential field, find a matching user credential
# field_info.provider is a frozenset because aggregate_credentials_inputs()
# combines requirements from multiple nodes. A credential matches if its
# provider is in the set of acceptable providers.
for credential_field_name, (
credential_requirements,
_node_fields,
) in aggregated_creds.items():
# Find first matching credential by provider and type
matching_cred = next(
(
cred
for cred in available_creds
if cred.provider in credential_requirements.provider
and cred.type in credential_requirements.supported_types
),
None,
)
if matching_cred:
try:
graph_credentials_inputs[credential_field_name] = CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
except Exception as e:
logger.error(
f"Failed to create CredentialsMetaInput for field '{credential_field_name}': "
f"provider={matching_cred.provider}, type={matching_cred.type}, "
f"credential_id={matching_cred.id}",
exc_info=True,
)
missing_creds.append(
f"{credential_field_name} (validation failed: {e})"
)
else:
missing_creds.append(
f"{credential_field_name} "
f"(requires provider in {list(credential_requirements.provider)}, "
f"type in {list(credential_requirements.supported_types)})"
)
logger.info(
f"Credential matching complete: {len(graph_credentials_inputs)}/{len(aggregated_creds)} matched"
)
return graph_credentials_inputs, missing_creds
async def check_user_has_required_credentials(
user_id: str,
required_credentials: list[CredentialsMetaInput],
) -> list[CredentialsMetaInput]:
"""
Check which required credentials the user is missing.
Args:
user_id: The user's ID
required_credentials: List of required credentials
Returns:
List of missing credentials (empty if user has all)
"""
if not required_credentials:
return []
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
missing: list[CredentialsMetaInput] = []
for required in required_credentials:
has_matching = any(
cred.provider == required.provider and cred.type == required.type
for cred in available_creds
)
if not has_matching:
missing.append(required)
return missing
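
Taken together, these helpers cover the setup path the run_agent tool walks through: resolve the marketplace slug to a graph, match the user's credentials, and make sure a library agent exists before executing or scheduling. A rough composition sketch follows; the module path in the import is an assumption (the diff does not show the new file's name), and the user ID and slug are placeholders.

```python
# Sketch only: composes the shared helpers added above.
# The module path "backend.server.v2.chat.tools.utils" is assumed.
from backend.server.v2.chat.tools.utils import (  # assumed module path
    fetch_graph_from_store_slug,
    get_or_create_library_agent,
    match_user_credentials_to_graph,
)


async def prepare_agent(user_id: str, slug: str):
    """Resolve, credential-check, and library-register a marketplace agent (sketch)."""
    username, agent_name = slug.split("/", 1)

    # 1. Resolve the marketplace slug to a concrete graph version.
    graph, _store_agent = await fetch_graph_from_store_slug(username, agent_name)
    if graph is None:
        raise ValueError(f"Agent '{slug}' not found in the marketplace")

    # 2. Match the user's stored credentials against the graph's requirements.
    matched_creds, missing_creds = await match_user_credentials_to_graph(user_id, graph)
    if missing_creds:
        raise ValueError(f"Missing credentials: {missing_creds}")

    # 3. Ensure the graph exists in the user's library before running or scheduling.
    library_agent = await get_or_create_library_agent(graph, user_id)
    return library_agent, matched_creds
```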

View File

@@ -159,8 +159,9 @@ export function parseToolResponse(
type: "execution_started",
toolName: "execution_started",
executionId: (parsedResult.execution_id as string) || "",
agentName: parsedResult.agent_name as string | undefined,
agentName: (parsedResult.graph_name as string) || undefined,
message: parsedResult.message as string | undefined,
libraryAgentLink: parsedResult.library_agent_link as string | undefined,
timestamp: timestamp || new Date(),
};
}
@@ -263,7 +264,7 @@ export function extractCredentialsNeeded(
}));
return {
type: "credentials_needed",
toolName: "get_required_setup_info",
toolName: "run_agent",
credentials,
message: `To run ${agentName}, you need to add ${credentials.length === 1 ? "credentials" : `${credentials.length} credentials`}.`,
agentName,

View File

@@ -100,9 +100,9 @@ export function handleToolResponse(
parsedResult = null;
}
if (
chunk.tool_name === "get_required_setup_info" &&
chunk.tool_name === "run_agent" &&
chunk.success &&
parsedResult
parsedResult?.type === "setup_requirements"
) {
const credentialsMessage = extractCredentialsNeeded(parsedResult);
if (credentialsMessage) {

View File

@@ -75,6 +75,7 @@ export type ChatMessageData =
executionId: string;
agentName?: string;
message?: string;
libraryAgentLink?: string;
timestamp?: string | Date;
};