Compare commits

...

32 Commits

Author SHA1 Message Date
Reinier van der Leer ab1bbba7cf yeet migrations that we don't need anymore (left over from conflict resolution on marketplace DB layer cleanup) 2026-03-11 19:11:21 +01:00
Reinier van der Leer e05257d5b9 Merge branch 'dev' into pwuts/open-2923-v2-external-api 2026-03-11 18:26:53 +01:00
Reinier van der Leer 648567cf9c add GET /graphs/{graph_id}/marketplace-listing 2026-03-11 17:35:03 +01:00
Reinier van der Leer 842b72809d fix docstrings 2026-03-11 17:33:07 +01:00
Reinier van der Leer e816bf92d3 support user_password and host_scoped credentials 2026-03-11 17:31:45 +01:00
Reinier van der Leer 604b9f16c0 add missing fields 2026-03-11 17:02:38 +01:00
Reinier van der Leer fda3c5d236 add missing tools 2026-03-11 15:04:42 +01:00
Reinier van der Leer 7212c8be38 add FastAPI exception handler for Prisma's RecordNotFoundError 2026-03-11 12:56:35 +01:00
Reinier van der Leer 42bd0a20fe improve endpoint self-documentation, add explicit operation IDs 2026-03-11 12:54:55 +01:00
Reinier van der Leer 4cc25a5476 fix error handling 2026-03-10 22:49:58 +01:00
Reinier van der Leer f88220b93b improve endpoint self-documentation 2026-03-10 22:40:04 +01:00
Reinier van der Leer 2dffcf14a2 Merge branch 'dev' into pwuts/open-2923-v2-external-api 2026-03-10 16:10:28 +01:00
Reinier van der Leer caad72f390 fix secret push errors 2026-03-08 10:00:39 +01:00
Reinier van der Leer b896ef0b39 clean up 2026-03-08 09:39:31 +01:00
Reinier van der Leer a8a729b9e1 Merge branch 'dev' into pwuts/open-2923-v2-external-api 2026-03-06 18:43:14 +01:00
Reinier van der Leer d49358ce7f clean up marketplace APIs 2026-03-03 15:47:01 +01:00
Reinier van der Leer 3644b8522a add MCP tool server 2026-03-03 12:41:21 +01:00
Reinier van der Leer aa86d6cd06 allow easy v2 API schema export 2026-02-28 23:36:15 +01:00
Reinier van der Leer 8659ee617a remove newline from snapshot 2026-02-28 22:01:38 +01:00
Reinier van der Leer 95aebae6e3 merge overlapping listing endpoints (and optimize pre-commit config) 2026-02-28 21:42:44 +01:00
Reinier van der Leer caf6009ae5 eliminate no-op GraphSettings.from_graph() and update_library_agent_version_and_settings() 2026-02-28 14:17:05 +01:00
Reinier van der Leer a26480db31 make store DB relations and queries more logical and efficient 2026-02-28 13:32:43 +01:00
Reinier van der Leer b0f024a78c fix credential requirement routes 2026-02-27 18:45:07 +01:00
Reinier van der Leer e7d494aae7 add missing endpoints 2026-02-27 18:21:11 +01:00
Reinier van der Leer fd726d688a Merge branch 'dev' into pwuts/open-2923-v2-external-api 2026-02-26 22:29:12 +01:00
Reinier van der Leer 47e0511a44 Merge branch 'dev' into pwuts/open-2923-v2-external-api 2026-02-26 19:25:22 +01:00
Reinier van der Leer d6e3f7d108 add POST /marketplace/agents/{username}/{agent_name}/add-to-library 2026-02-25 17:17:30 +01:00
Reinier van der Leer f31814ed65 fix graph ID handling 2026-02-25 16:51:58 +01:00
Reinier van der Leer 6b219ca5ad clean up API models 2026-02-25 16:10:50 +01:00
Reinier van der Leer 9b18fb930d Merge branch 'dev' into pwuts/open-2923-v2-external-api 2026-02-25 12:39:19 +01:00
Reinier van der Leer a301ae4879 Merge branch 'dev' into pwuts/open-2923-v2-external-api 2026-01-16 14:48:02 +01:00
Reinier van der Leer 87a63ecdba first draft (missing migrations) 2026-01-05 15:30:01 +01:00
84 changed files with 6034 additions and 711 deletions

View File

@@ -27,6 +27,103 @@ repos:
exclude: pnpm-lock\.yaml$
stages: [pre-push]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.7.2
hooks:
- id: ruff
name: Lint (Ruff) - AutoGPT Platform - Backend
alias: ruff-lint-platform-backend
files: ^autogpt_platform/backend/
args: [--fix]
- id: ruff
name: Lint (Ruff) - AutoGPT Platform - Libs
alias: ruff-lint-platform-libs
files: ^autogpt_platform/autogpt_libs/
args: [--fix]
- id: ruff-format
name: Format (Ruff) - AutoGPT Platform - Libs
alias: ruff-format-platform-libs
files: ^autogpt_platform/autogpt_libs/
- repo: local
# isort needs the context of which packages are installed to function, so we
# can't use a vendored isort pre-commit hook (which runs in its own isolated venv).
hooks:
- id: isort
name: Lint (isort) - AutoGPT Platform - Backend
alias: isort-platform-backend
entry: poetry -P autogpt_platform/backend run isort -p backend
files: ^autogpt_platform/backend/
types: [file, python]
language: system
- id: isort
name: Lint (isort) - Classic - AutoGPT
alias: isort-classic-autogpt
entry: poetry -P classic/original_autogpt run isort -p autogpt
files: ^classic/original_autogpt/
types: [file, python]
language: system
- id: isort
name: Lint (isort) - Classic - Forge
alias: isort-classic-forge
entry: poetry -P classic/forge run isort -p forge
files: ^classic/forge/
types: [file, python]
language: system
- id: isort
name: Lint (isort) - Classic - Benchmark
alias: isort-classic-benchmark
entry: poetry -P classic/benchmark run isort -p agbenchmark
files: ^classic/benchmark/
types: [file, python]
language: system
- repo: https://github.com/psf/black
rev: 24.10.0
# Black has sensible defaults, doesn't need package context, and ignores
# everything in .gitignore, so it works fine without any config or arguments.
hooks:
- id: black
name: Format (Black)
- repo: https://github.com/PyCQA/flake8
rev: 7.0.0
# To have flake8 load the config of the individual subprojects, we have to call
# them separately.
hooks:
- id: flake8
name: Lint (Flake8) - Classic - AutoGPT
alias: flake8-classic-autogpt
files: ^classic/original_autogpt/(autogpt|scripts|tests)/
args: [--config=classic/original_autogpt/.flake8]
- id: flake8
name: Lint (Flake8) - Classic - Forge
alias: flake8-classic-forge
files: ^classic/forge/(forge|tests)/
args: [--config=classic/forge/.flake8]
- id: flake8
name: Lint (Flake8) - Classic - Benchmark
alias: flake8-classic-benchmark
files: ^classic/benchmark/(agbenchmark|tests)/((?!reports).)*[/.]
args: [--config=classic/benchmark/.flake8]
- repo: local
hooks:
- id: prettier
name: Format (Prettier) - AutoGPT Platform - Frontend
alias: format-platform-frontend
entry: bash -c 'cd autogpt_platform/frontend && npx prettier --write $(echo "$@" | sed "s|autogpt_platform/frontend/||g")' --
files: ^autogpt_platform/frontend/
types: [file]
language: system
- repo: local
# For proper type checking, all dependencies need to be up-to-date.
# It's also a good idea to check that poetry.lock is consistent with pyproject.toml.
@@ -164,7 +261,7 @@ repos:
entry: >
bash -c '
cd autogpt_platform/backend
&& poetry run export-api-schema --output ../frontend/src/app/api/openapi.json
&& poetry run export-api-schema --api internal --output ../frontend/src/app/api/openapi.json
&& cd ../frontend
&& pnpm prettier --write ./src/app/api/openapi.json
'
@@ -190,103 +287,6 @@ repos:
pass_filenames: false
stages: [pre-commit, post-checkout]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.7.2
hooks:
- id: ruff
name: Lint (Ruff) - AutoGPT Platform - Backend
alias: ruff-lint-platform-backend
files: ^autogpt_platform/backend/
args: [--fix]
- id: ruff
name: Lint (Ruff) - AutoGPT Platform - Libs
alias: ruff-lint-platform-libs
files: ^autogpt_platform/autogpt_libs/
args: [--fix]
- id: ruff-format
name: Format (Ruff) - AutoGPT Platform - Libs
alias: ruff-format-platform-libs
files: ^autogpt_platform/autogpt_libs/
- repo: local
# isort needs the context of which packages are installed to function, so we
# can't use a vendored isort pre-commit hook (which runs in its own isolated venv).
hooks:
- id: isort
name: Lint (isort) - AutoGPT Platform - Backend
alias: isort-platform-backend
entry: poetry -P autogpt_platform/backend run isort -p backend
files: ^autogpt_platform/backend/
types: [file, python]
language: system
- id: isort
name: Lint (isort) - Classic - AutoGPT
alias: isort-classic-autogpt
entry: poetry -P classic/original_autogpt run isort -p autogpt
files: ^classic/original_autogpt/
types: [file, python]
language: system
- id: isort
name: Lint (isort) - Classic - Forge
alias: isort-classic-forge
entry: poetry -P classic/forge run isort -p forge
files: ^classic/forge/
types: [file, python]
language: system
- id: isort
name: Lint (isort) - Classic - Benchmark
alias: isort-classic-benchmark
entry: poetry -P classic/benchmark run isort -p agbenchmark
files: ^classic/benchmark/
types: [file, python]
language: system
- repo: https://github.com/psf/black
rev: 24.10.0
# Black has sensible defaults, doesn't need package context, and ignores
# everything in .gitignore, so it works fine without any config or arguments.
hooks:
- id: black
name: Format (Black)
- repo: https://github.com/PyCQA/flake8
rev: 7.0.0
# To have flake8 load the config of the individual subprojects, we have to call
# them separately.
hooks:
- id: flake8
name: Lint (Flake8) - Classic - AutoGPT
alias: flake8-classic-autogpt
files: ^classic/original_autogpt/(autogpt|scripts|tests)/
args: [--config=classic/original_autogpt/.flake8]
- id: flake8
name: Lint (Flake8) - Classic - Forge
alias: flake8-classic-forge
files: ^classic/forge/(forge|tests)/
args: [--config=classic/forge/.flake8]
- id: flake8
name: Lint (Flake8) - Classic - Benchmark
alias: flake8-classic-benchmark
files: ^classic/benchmark/(agbenchmark|tests)/((?!reports).)*[/.]
args: [--config=classic/benchmark/.flake8]
- repo: local
hooks:
- id: prettier
name: Format (Prettier) - AutoGPT Platform - Frontend
alias: format-platform-frontend
entry: bash -c 'cd autogpt_platform/frontend && npx prettier --write $(echo "$@" | sed "s|autogpt_platform/frontend/||g")' --
files: ^autogpt_platform/frontend/
types: [file]
language: system
- repo: local
# To have watertight type checking, we check *all* the files in an affected
# project. To trigger on poetry.lock we also reset the file `types` filter.

View File

@@ -5,7 +5,7 @@ from .dependencies import (
requires_admin_user,
requires_user,
)
from .helpers import add_auth_responses_to_openapi
from .jwt_utils import add_auth_responses_to_openapi
from .models import User
__all__ = [

View File

@@ -1,9 +1,9 @@
from fastapi import FastAPI
from .jwt_utils import bearer_jwt_auth
def add_auth_responses_to_openapi(app: FastAPI) -> None:
def add_auth_responses_to_openapi(
app: FastAPI, supported_auth_schemes: list[str]
) -> None:
"""
Patch a FastAPI instance's `openapi()` method to add 401 responses
to all authenticated endpoints.
@@ -29,7 +29,7 @@ def add_auth_responses_to_openapi(app: FastAPI) -> None:
for auth_option in details.get("security", [])
for schema in auth_option.keys()
]
if bearer_jwt_auth.scheme_name not in security_schemas:
if not any(s in security_schemas for s in supported_auth_schemes):
continue
if "responses" not in details:
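The hunk above generalizes the 401-patching helper from one hardcoded bearer scheme to a caller-supplied list of scheme names. The core logic reduces to a walk over the OpenAPI `paths` mapping; a minimal stdlib-only sketch of that walk (the name `add_401_responses` and the plain-dict interface are illustrative — the real helper wraps a FastAPI app's `openapi()` method):

```python
def add_401_responses(openapi_schema: dict, supported_auth_schemes: list[str]) -> dict:
    """Add a 401 response to every operation secured by one of the given schemes."""
    for path_item in openapi_schema.get("paths", {}).values():
        for details in path_item.values():
            # Collect every security scheme name this operation references
            used_schemes = [
                scheme
                for auth_option in details.get("security", [])
                for scheme in auth_option
            ]
            if not any(s in used_schemes for s in supported_auth_schemes):
                continue  # unauthenticated, or secured by an unrelated scheme
            details.setdefault("responses", {}).setdefault(
                "401", {"description": "Not authenticated"}
            )
    return openapi_schema
```

Passing the scheme list in, rather than importing `bearer_jwt_auth` directly, is what lets the external API reuse the helper for both API-key and bearer-token schemes.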

View File

@@ -8,8 +8,7 @@ from unittest import mock
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from autogpt_libs.auth.helpers import add_auth_responses_to_openapi
from autogpt_libs.auth.jwt_utils import bearer_jwt_auth
from autogpt_libs.auth.jwt_utils import add_auth_responses_to_openapi, bearer_jwt_auth
def test_add_auth_responses_to_openapi_basic():

View File

@@ -2,7 +2,7 @@ import logging
from typing import Any
import jwt
from fastapi import HTTPException, Security
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from .config import get_settings
@@ -78,3 +78,12 @@ def verify_user(jwt_payload: dict | None, admin_only: bool) -> User:
raise HTTPException(status_code=403, detail="Admin access required")
return User.from_payload(jwt_payload)
def add_auth_responses_to_openapi(app: FastAPI) -> None:
"""
Add 401 responses to all endpoints that use the bearer JWT authentication scheme.
"""
from .helpers import add_auth_responses_to_openapi
add_auth_responses_to_openapi(app, [bearer_jwt_auth.scheme_name])

View File

@@ -1,21 +1,57 @@
from fastapi import FastAPI
"""
External API Application
This module defines the main FastAPI application for the external API,
which mounts the v1 and v2 sub-applications.
"""
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from backend.api.middleware.security import SecurityHeadersMiddleware
from backend.monitoring.instrumentation import instrument_fastapi
from .v1.routes import v1_router
from .v1.app import v1_app
from .v2.app import v2_app
DESCRIPTION = """
The external API provides programmatic access to the AutoGPT Platform for building
integrations, automations, and custom applications.
### API Versions
| Version | End of Life | Path | Documentation |
|---------------------|-------------|------------------------|---------------|
| **v2** | | `/external-api/v2/...` | [v2 docs](v2/docs) |
| **v1** (deprecated) | 2025-05-01 | `/external-api/v1/...` | [v1 docs](v1/docs) |
**Recommendation**: New integrations should use v2.
For authentication details and usage examples, see the
[API Integration Guide](https://docs.agpt.co/platform/integrating/api-guide/).
"""
external_api = FastAPI(
title="AutoGPT External API",
description="External API for AutoGPT integrations",
title="AutoGPT Platform API",
summary="External API for AutoGPT Platform integrations",
description=DESCRIPTION,
version="2.0.0",
docs_url="/docs",
version="1.0",
redoc_url="/redoc",
)
external_api.add_middleware(SecurityHeadersMiddleware)
external_api.include_router(v1_router, prefix="/v1")
# Add Prometheus instrumentation
@external_api.get("/", include_in_schema=False)
async def root_redirect() -> RedirectResponse:
"""Redirect root to API documentation."""
return RedirectResponse(url="/docs")
# Mount versioned sub-applications
# Each sub-app has its own /docs page at /v1/docs and /v2/docs
external_api.mount("/v1", v1_app)
external_api.mount("/v2", v2_app)
# Add Prometheus instrumentation to the main app
instrument_fastapi(
external_api,
service_name="external-api",
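The app above composes versioned APIs by mounting `v1_app` and `v2_app` under path prefixes. The dispatch behind mounting can be sketched with a toy router (illustrative only; Starlette's real mounting also rewrites ASGI scope fields such as `root_path`):

```python
class MiniApp:
    """Toy prefix-routing app to illustrate sub-application mounting."""

    def __init__(self):
        self.routes = {}  # exact path -> handler
        self.mounts = {}  # prefix -> sub-app

    def route(self, path, handler):
        self.routes[path] = handler

    def mount(self, prefix, sub_app):
        self.mounts[prefix] = sub_app

    def handle(self, path):
        # A mounted sub-app sees the path with the mount prefix stripped,
        # which is why each sub-app can serve its own /docs at /v1/docs
        # and /v2/docs without knowing where it is mounted.
        for prefix, sub_app in self.mounts.items():
            if path == prefix or path.startswith(prefix + "/"):
                return sub_app.handle(path[len(prefix):] or "/")
        return self.routes[path]()

v1, v2, root = MiniApp(), MiniApp(), MiniApp()
v1.route("/docs", lambda: "v1 docs")
v2.route("/docs", lambda: "v2 docs")
root.mount("/v1", v1)
root.mount("/v2", v2)
```

This prefix-stripping is also why middleware and exception handlers registered on the parent do not automatically apply inside the sub-apps: each mounted app is a complete application handling its own requests.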

View File

@@ -1,4 +1,4 @@
from fastapi import HTTPException, Security, status
from fastapi import FastAPI, HTTPException, Security, status
from fastapi.security import APIKeyHeader, HTTPAuthorizationCredentials, HTTPBearer
from prisma.enums import APIKeyPermission
@@ -96,7 +96,9 @@ def require_permission(*permissions: APIKeyPermission):
"""
async def check_permissions(
auth: APIAuthorizationInfo = Security(require_auth),
auth: APIAuthorizationInfo = Security(
require_auth, scopes=[p.value for p in permissions]
),
) -> APIAuthorizationInfo:
missing = [p for p in permissions if p not in auth.scopes]
if missing:
@@ -108,3 +110,15 @@ def require_permission(*permissions: APIKeyPermission):
return auth
return check_permissions
def add_auth_responses_to_openapi(app: FastAPI) -> None:
"""
Add 401 responses to all endpoints secured with `require_auth`,
`require_api_key`, or `require_access_token` middleware.
"""
from autogpt_libs.auth.helpers import add_auth_responses_to_openapi
add_auth_responses_to_openapi(
app, [api_key_header.scheme_name, bearer_auth.scheme_name]
)
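`require_permission` above is a dependency factory: the new `scopes=[p.value for p in permissions]` argument declares the needed scopes on the `Security` dependency (so they appear in the OpenAPI spec), while the body re-checks them at request time. Stripped of FastAPI wiring, the checking half reduces to this stand-alone sketch (hypothetical names; the real factory raises `HTTPException` with a 403, not `PermissionError`):

```python
def require_permission(*permissions):
    """Return a checker that rejects callers missing any required permission."""

    def check_permissions(granted_scopes):
        missing = [p for p in permissions if p not in granted_scopes]
        if missing:
            raise PermissionError(f"Missing required permissions: {missing}")
        return granted_scopes

    return check_permissions
```

The factory pattern means each route can state its requirements declaratively, e.g. `Security(require_permission(APIKeyPermission.READ_BLOCK))`, and the check runs before the endpoint body.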

View File

@@ -0,0 +1,50 @@
"""
V1 External API Application
This module defines the FastAPI application for the v1 external API.
"""
from fastapi import FastAPI
from backend.api.external.middleware import add_auth_responses_to_openapi
from backend.api.middleware.security import SecurityHeadersMiddleware
from backend.api.utils.exceptions import add_exception_handlers
from backend.api.utils.openapi import sort_openapi
from .routes import v1_router
DESCRIPTION = """
The v1 API provides access to core AutoGPT functionality for external integrations.
For authentication details and usage examples, see the
[API Integration Guide](https://docs.agpt.co/platform/integrating/api-guide/).
"""
v1_app = FastAPI(
title="AutoGPT Platform API",
summary="External API for AutoGPT Platform integrations (v1)",
description=DESCRIPTION,
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
openapi_tags=[
{"name": "user", "description": "User information"},
{"name": "blocks", "description": "Block operations"},
{"name": "graphs", "description": "Graph execution"},
{"name": "store", "description": "Marketplace agents and creators"},
{"name": "integrations", "description": "OAuth credential management"},
{"name": "tools", "description": "AI assistant tools"},
],
)
v1_app.add_middleware(SecurityHeadersMiddleware)
v1_app.include_router(v1_router)
# Mounted sub-apps do NOT inherit exception handlers from the parent app.
add_exception_handlers(v1_app)
# Add 401 responses to authenticated endpoints in OpenAPI spec
add_auth_responses_to_openapi(v1_app)
# Sort OpenAPI schema to eliminate diff on refactors
sort_openapi(v1_app)

View File

@@ -0,0 +1,9 @@
"""
V2 External API
This module provides the v2 external API for programmatic access to the AutoGPT Platform.
"""
from .routes import v2_router
__all__ = ["v2_router"]

View File

@@ -0,0 +1,112 @@
"""
V2 External API Application
This module defines the FastAPI application for the v2 external API.
"""
from fastapi import FastAPI
from backend.api.external.middleware import add_auth_responses_to_openapi
from backend.api.middleware.security import SecurityHeadersMiddleware
from backend.api.utils.exceptions import add_exception_handlers
from backend.api.utils.openapi import sort_openapi
from .mcp_server import create_mcp_app
from .routes import v2_router
DESCRIPTION = """
The v2 API provides comprehensive access to the AutoGPT Platform for building
integrations, automations, and custom applications.
### Key Improvements over v1
- **Consistent naming**: Uses `graph_id`/`graph_version` consistently
- **Better pagination**: All list endpoints support pagination
- **Comprehensive coverage**: Access to library, runs, schedules, credits, and more
- **Human-in-the-loop**: Review and approve agent decisions via the API
For authentication details and usage examples, see the
[API Integration Guide](https://docs.agpt.co/platform/integrating/api-guide/).
### Pagination
List endpoints return paginated responses. Use `page` and `page_size` query
parameters to navigate results. Maximum page size is 100 items.
""".strip()
v2_app = FastAPI(
title="AutoGPT Platform External API",
summary="External API for AutoGPT Platform integrations (v2)",
description=DESCRIPTION,
version="2.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
openapi_tags=[
{
"name": "graphs",
"description": "Create, update, and manage agent graphs",
},
{
"name": "schedules",
"description": "Manage scheduled graph executions",
},
{
"name": "blocks",
"description": "Discover available building blocks",
},
{
"name": "search",
"description": "Cross-domain hybrid search across agents, blocks, and docs",
},
{
"name": "marketplace",
"description": "Browse agents and creators, manage submissions",
},
{
"name": "library",
"description": (
"Manage your agent library (agents and presets), "
"execute agents, organize with folders"
),
},
{
"name": "presets",
"description": "Agent execution presets with webhook triggers",
},
{
"name": "runs",
"description": (
"Monitor, stop, delete, and share agent runs; "
"manage human-in-the-loop reviews"
),
},
{
"name": "credits",
"description": "Check balance and view transaction history",
},
{
"name": "integrations",
"description": "List, create, and delete integration credentials",
},
{
"name": "files",
"description": "Upload, list, download, and delete workspace files",
},
],
)
v2_app.add_middleware(SecurityHeadersMiddleware)
v2_app.include_router(v2_router)
# Mounted sub-apps do NOT inherit exception handlers from the parent app,
# so we must register them here for the v2 API specifically.
add_exception_handlers(v2_app)
# Mount MCP server (Copilot tools via Streamable HTTP)
v2_app.mount("/mcp", create_mcp_app())
# Add 401 responses to authenticated endpoints in OpenAPI spec
add_auth_responses_to_openapi(v2_app)
# Sort OpenAPI schema to eliminate diff on refactors
sort_openapi(v2_app)

View File

@@ -0,0 +1,276 @@
"""
Tests for v2 API error handling behavior.
The v2 app registers its own exception handlers (since mounted sub-apps don't
inherit handlers from the parent app). These tests verify that exceptions from
the DB/service layer are correctly mapped to HTTP status codes.
We construct a lightweight test app rather than importing the full v2_app,
because the latter eagerly loads the MCP server, block registry, and other
heavy dependencies that are irrelevant for error handling tests.
"""
import json
from datetime import datetime, timezone
from unittest.mock import AsyncMock
import fastapi
import fastapi.testclient
import pytest
import pytest_mock
from prisma.enums import APIKeyPermission
from pytest_snapshot.plugin import Snapshot
from backend.api.external.middleware import require_auth
from backend.api.utils.exceptions import add_exception_handlers
from backend.data.auth.base import APIAuthorizationInfo
from backend.util.exceptions import DatabaseError, NotFoundError
from .library.agents import agents_router
from .marketplace import marketplace_router
TEST_USER_ID = "test-user-id"
_mock_auth = APIAuthorizationInfo(
user_id=TEST_USER_ID,
scopes=list(APIKeyPermission),
type="api_key",
created_at=datetime.now(tz=timezone.utc),
)
# ---------------------------------------------------------------------------
# Build a lightweight test app with the shared exception handlers
# but only the routers we need for testing.
# ---------------------------------------------------------------------------
app = fastapi.FastAPI()
app.include_router(agents_router, prefix="/library")
app.include_router(marketplace_router, prefix="/marketplace")
add_exception_handlers(app)
@pytest.fixture(autouse=True)
def _override_auth():
"""Bypass API key / OAuth auth for all tests in this module."""
async def fake_auth() -> APIAuthorizationInfo:
return _mock_auth
app.dependency_overrides[require_auth] = fake_auth
yield
app.dependency_overrides.clear()
client = fastapi.testclient.TestClient(app, raise_server_exceptions=False)
# ============================================================================
# NotFoundError → 404
# ============================================================================
def test_not_found_error_returns_404(
mocker: pytest_mock.MockFixture,
snapshot: Snapshot,
) -> None:
"""NotFoundError raised by the DB layer should become a 404 response."""
mocker.patch(
"backend.api.features.library.db.get_library_agent",
new_callable=AsyncMock,
side_effect=NotFoundError("Agent #nonexistent not found"),
)
response = client.get("/library/agents/nonexistent")
assert response.status_code == 404
body = response.json()
assert body["detail"] == "Agent #nonexistent not found"
assert "message" in body
assert body["hint"] == "Adjust the request and retry."
snapshot.snapshot_dir = "snapshots"
snapshot.assert_match(
json.dumps(body, indent=2, sort_keys=True),
"v2_not_found_error_404",
)
def test_not_found_error_on_delete_returns_404(
mocker: pytest_mock.MockFixture,
) -> None:
"""NotFoundError on DELETE should return 404, not 204 or 500."""
mocker.patch(
"backend.api.features.library.db.delete_library_agent",
new_callable=AsyncMock,
side_effect=NotFoundError("Agent #gone not found"),
)
response = client.delete("/library/agents/gone")
assert response.status_code == 404
assert response.json()["detail"] == "Agent #gone not found"
assert "message" in response.json()
def test_not_found_error_on_marketplace_returns_404(
mocker: pytest_mock.MockFixture,
) -> None:
"""NotFoundError from store DB layer should become a 404."""
mocker.patch(
"backend.api.features.store.db.get_store_agent_by_version_id",
new_callable=AsyncMock,
side_effect=NotFoundError("Store listing not found"),
)
response = client.get("/marketplace/agents/by-version/nonexistent")
assert response.status_code == 404
assert response.json()["detail"] == "Store listing not found"
assert "message" in response.json()
# ============================================================================
# ValueError → 400
# ============================================================================
def test_value_error_returns_400(
mocker: pytest_mock.MockFixture,
snapshot: Snapshot,
) -> None:
"""ValueError raised by the service layer should become a 400 response."""
mocker.patch(
"backend.api.features.library.db.update_library_agent",
new_callable=AsyncMock,
side_effect=ValueError("Invalid graph version: -1"),
)
response = client.patch(
"/library/agents/some-id",
json={"graph_version": -1},
)
assert response.status_code == 400
body = response.json()
assert body["detail"] == "Invalid graph version: -1"
assert "message" in body
assert body["hint"] == "Adjust the request and retry."
snapshot.snapshot_dir = "snapshots"
snapshot.assert_match(
json.dumps(body, indent=2, sort_keys=True),
"v2_value_error_400",
)
# ============================================================================
# NotFoundError is a ValueError subclass — verify specificity wins
# ============================================================================
def test_not_found_error_takes_precedence_over_value_error(
mocker: pytest_mock.MockFixture,
) -> None:
"""
NotFoundError(ValueError) should match the NotFoundError handler (404),
not the ValueError handler (400).
"""
mocker.patch(
"backend.api.features.library.db.get_library_agent",
new_callable=AsyncMock,
side_effect=NotFoundError("Specific not found"),
)
response = client.get("/library/agents/test-id")
# Must be 404, not 400
assert response.status_code == 404
# ============================================================================
# Unhandled Exception → 500
# ============================================================================
def test_unhandled_exception_returns_500(
mocker: pytest_mock.MockFixture,
snapshot: Snapshot,
) -> None:
"""
Unexpected exceptions should return a generic 500 without leaking
internal details.
"""
mocker.patch(
"backend.api.features.library.db.get_library_agent",
new_callable=AsyncMock,
side_effect=DatabaseError("connection refused"),
)
response = client.get("/library/agents/some-id")
assert response.status_code == 500
body = response.json()
assert "message" in body
assert "detail" in body
assert body["hint"] == "Check server logs and dependent services."
snapshot.snapshot_dir = "snapshots"
snapshot.assert_match(
json.dumps(body, indent=2, sort_keys=True),
"v2_unhandled_exception_500",
)
def test_runtime_error_returns_500(
mocker: pytest_mock.MockFixture,
) -> None:
"""RuntimeError (not ValueError) should hit the catch-all 500 handler."""
mocker.patch(
"backend.api.features.library.db.delete_library_agent",
new_callable=AsyncMock,
side_effect=RuntimeError("something broke"),
)
response = client.delete("/library/agents/some-id")
assert response.status_code == 500
assert "detail" in response.json()
assert response.json()["hint"] == "Check server logs and dependent services."
# ============================================================================
# Response format consistency
# ============================================================================
def test_all_error_responses_have_consistent_format(
mocker: pytest_mock.MockFixture,
) -> None:
"""All error responses should use {"message": ..., "detail": ..., "hint": ...} format."""
cases = [
(NotFoundError("not found"), 404),
(ValueError("bad value"), 400),
(RuntimeError("boom"), 500),
]
for exc, expected_status in cases:
mocker.patch(
"backend.api.features.library.db.get_library_agent",
new_callable=AsyncMock,
side_effect=exc,
)
response = client.get("/library/agents/test-id")
assert response.status_code == expected_status, (
f"Expected {expected_status} for {type(exc).__name__}, "
f"got {response.status_code}"
)
body = response.json()
assert (
"message" in body
), f"Missing 'message' key for {type(exc).__name__}: {body}"
assert (
"detail" in body
), f"Missing 'detail' key for {type(exc).__name__}: {body}"
assert "hint" in body, f"Missing 'hint' key for {type(exc).__name__}: {body}"
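The precedence test above depends on handler lookup preferring the most specific registered exception class. That dispatch can be sketched by walking the exception's MRO (illustrative; Starlette's exception middleware resolves handlers along the same lines):

```python
class NotFoundError(ValueError):
    """Mirrors the project's NotFoundError, a ValueError subclass."""

# exception class -> HTTP status, as registered by add_exception_handlers
HANDLERS = {NotFoundError: 404, ValueError: 400, Exception: 500}

def status_for(exc: Exception) -> int:
    # Walk the exception's MRO so the most specific registered handler
    # wins: NotFoundError maps to 404 even though it is also a ValueError.
    for cls in type(exc).__mro__:
        if cls in HANDLERS:
            return HANDLERS[cls]
    return 500
```

Because `__mro__` lists `NotFoundError` before `ValueError` before `Exception`, registration order of the handlers does not matter — specificity does.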

View File

@@ -0,0 +1,68 @@
"""
V2 External API - Blocks Endpoints
Provides read-only access to available building blocks.
"""
import logging
from fastapi import APIRouter, Security
from fastapi.concurrency import run_in_threadpool
from prisma.enums import APIKeyPermission
from backend.api.external.middleware import require_permission
from backend.blocks import get_blocks
from backend.data.auth.base import APIAuthorizationInfo
from backend.util.cache import cached
from .models import BlockInfo
logger = logging.getLogger(__name__)
blocks_router = APIRouter(tags=["blocks"])
# ============================================================================
# Internal Functions
# ============================================================================
def _compute_blocks_sync() -> list[BlockInfo]:
"""
Synchronous function to compute blocks data.
This does the heavy lifting: instantiate 226+ blocks, compute costs, serialize.
"""
return [
BlockInfo.from_internal(block)
for block_class in get_blocks().values()
if not (block := block_class()).disabled
]
@cached(ttl_seconds=3600)
async def _get_cached_blocks() -> list[BlockInfo]:
"""
Async cached function with thundering herd protection.
On cache miss: runs heavy work in thread pool
On cache hit: returns cached list immediately
"""
return await run_in_threadpool(_compute_blocks_sync)
# ============================================================================
# Endpoints
# ============================================================================
@blocks_router.get(
path="",
summary="List available blocks",
operation_id="listAvailableBlocks",
)
async def list_available_blocks(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_BLOCK)
),
) -> list[BlockInfo]:
"""List all available blocks with their input/output schemas and cost information."""
return await _get_cached_blocks()

View File

@@ -0,0 +1,7 @@
"""
Common utilities for V2 External API
"""
# Constants for pagination
MAX_PAGE_SIZE = 100
DEFAULT_PAGE_SIZE = 20

View File

@@ -0,0 +1,90 @@
"""
V2 External API - Credits Endpoints
Provides access to credit balance and transaction history.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Query, Security
from prisma.enums import APIKeyPermission
from backend.api.external.middleware import require_permission
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.credit import get_user_credit_model
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import CreditBalance, CreditTransaction, CreditTransactionsResponse
logger = logging.getLogger(__name__)
credits_router = APIRouter(tags=["credits"])
# ============================================================================
# Endpoints
# ============================================================================
@credits_router.get(
path="",
summary="Get credit balance",
operation_id="getCreditBalance",
)
async def get_balance(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_CREDITS)
),
) -> CreditBalance:
"""Get the current credit balance for the authenticated user."""
user_credit_model = await get_user_credit_model(auth.user_id)
balance = await user_credit_model.get_credits(auth.user_id)
return CreditBalance(balance=balance)
@credits_router.get(
path="/transactions",
summary="Get credit transaction history",
operation_id="listCreditTransactions",
)
async def get_transactions(
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
transaction_type: Optional[str] = Query(
default=None,
description="Filter by transaction type (TOP_UP, USAGE, GRANT, REFUND)",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_CREDITS)
),
) -> CreditTransactionsResponse:
"""Get credit transaction history for the authenticated user."""
user_credit_model = await get_user_credit_model(auth.user_id)
history = await user_credit_model.get_transaction_history(
user_id=auth.user_id,
transaction_count_limit=page_size,
transaction_type=transaction_type,
)
transactions = [CreditTransaction.from_internal(t) for t in history.transactions]
# Note: the credit module doesn't support offset-based pagination yet,
# so the `page` parameter is effectively ignored and all fetched
# transactions are returned as a single page.
total_count = len(transactions)
total_pages = 1
return CreditTransactionsResponse(
transactions=transactions,
page=page,
page_size=page_size,
total_count=total_count,
total_pages=total_pages,
)
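The transactions endpoint above hard-codes `total_pages = 1` because the credit module cannot paginate by offset. For reference, this is the ceiling-division formula the other list endpoints in this API use (a standalone sketch, not code from this module):

```python
def total_pages(total_count: int, page_size: int) -> int:
    # Ceiling division: smallest number of pages that holds total_count items
    return (total_count + page_size - 1) // page_size if total_count > 0 else 0

print(total_pages(0, 25))   # 0
print(total_pages(25, 25))  # 1
print(total_pages(26, 25))  # 2
```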


@@ -0,0 +1,341 @@
"""
V2 External API - Files Endpoints
Provides file upload, download, listing, metadata, and deletion functionality.
"""
import base64
import logging
import re
from urllib.parse import quote
from fastapi import APIRouter, File, HTTPException, Query, Security, UploadFile
from fastapi.responses import RedirectResponse, Response
from prisma.enums import APIKeyPermission
from starlette import status
from backend.api.external.middleware import require_permission
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.workspace import (
count_workspace_files,
get_workspace,
get_workspace_file,
list_workspace_files,
soft_delete_workspace_file,
)
from backend.util.cloud_storage import get_cloud_storage_handler
from backend.util.settings import Settings
from backend.util.virus_scanner import scan_content_safe
from backend.util.workspace_storage import get_workspace_storage
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import (
UploadWorkspaceFileResponse,
WorkspaceFileInfo,
WorkspaceFileListResponse,
)
from .rate_limit import file_upload_limiter
logger = logging.getLogger(__name__)
settings = Settings()
file_workspace_router = APIRouter(tags=["files"])
# ============================================================================
# Endpoints
# ============================================================================
@file_workspace_router.get(
path="",
summary="List workspace files",
operation_id="listWorkspaceFiles",
)
async def list_files(
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_FILES)
),
) -> WorkspaceFileListResponse:
"""List files in the user's workspace."""
workspace = await get_workspace(auth.user_id)
if workspace is None:
return WorkspaceFileListResponse(
files=[], page=page, page_size=page_size, total_count=0, total_pages=0
)
total_count = await count_workspace_files(workspace.id)
total_pages = (total_count + page_size - 1) // page_size if total_count > 0 else 0
offset = (page - 1) * page_size
files = await list_workspace_files(
workspace_id=workspace.id,
limit=page_size,
offset=offset,
)
return WorkspaceFileListResponse(
files=[
WorkspaceFileInfo(
id=f.id,
name=f.name,
path=f.path,
mime_type=f.mime_type,
size_bytes=f.size_bytes,
created_at=f.created_at,
updated_at=f.updated_at,
)
for f in files
],
page=page,
page_size=page_size,
total_count=total_count,
total_pages=total_pages,
)
@file_workspace_router.get(
path="/{file_id}",
summary="Get workspace file metadata",
operation_id="getWorkspaceFileInfo",
)
async def get_file(
file_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_FILES)
),
) -> WorkspaceFileInfo:
"""Get metadata for a specific file in the user's workspace."""
workspace = await get_workspace(auth.user_id)
if workspace is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Workspace not found",
)
file = await get_workspace_file(file_id, workspace.id)
if file is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File #{file_id} not found",
)
return WorkspaceFileInfo(
id=file.id,
name=file.name,
path=file.path,
mime_type=file.mime_type,
size_bytes=file.size_bytes,
created_at=file.created_at,
updated_at=file.updated_at,
)
@file_workspace_router.delete(
path="/{file_id}",
summary="Delete file from workspace",
operation_id="deleteWorkspaceFile",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_file(
file_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_FILES)
),
) -> None:
"""Soft-delete a file from the user's workspace."""
workspace = await get_workspace(auth.user_id)
if workspace is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Workspace not found",
)
result = await soft_delete_workspace_file(file_id, workspace.id)
if result is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File #{file_id} not found",
)
def _create_file_size_error(size_bytes: int, max_size_mb: int) -> HTTPException:
"""Create standardized file size error response."""
return HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"File size ({size_bytes} bytes) exceeds "
f"the maximum allowed size of {max_size_mb}MB"
),
)
@file_workspace_router.post(
path="/upload",
summary="Upload file to workspace",
operation_id="uploadWorkspaceFile",
)
async def upload_file(
file: UploadFile = File(...),
expiration_hours: int = Query(
default=24, ge=1, le=48, description="Hours until file expires (1-48)"
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_FILES)
),
) -> UploadWorkspaceFileResponse:
"""
Upload a file to cloud storage for use with agents.
Returns a `file_uri` that can be passed to agent graph/node file inputs.
Uploaded files are virus-scanned before storage.
"""
file_upload_limiter.check(auth.user_id)
# Check file size limit
max_size_mb = settings.config.upload_file_size_limit_mb
max_size_bytes = max_size_mb * 1024 * 1024
# Try to get file size from headers first
if hasattr(file, "size") and file.size is not None and file.size > max_size_bytes:
raise _create_file_size_error(file.size, max_size_mb)
# Read file content
content = await file.read()
content_size = len(content)
# Double-check file size after reading
if content_size > max_size_bytes:
raise _create_file_size_error(content_size, max_size_mb)
# Extract file info
file_name = file.filename or "uploaded_file"
content_type = file.content_type or "application/octet-stream"
# Virus scan the content
await scan_content_safe(content, filename=file_name)
# Check if cloud storage is configured
cloud_storage = await get_cloud_storage_handler()
if not cloud_storage.config.gcs_bucket_name:
# Fallback to base64 data URI when GCS is not configured
base64_content = base64.b64encode(content).decode("utf-8")
data_uri = f"data:{content_type};base64,{base64_content}"
return UploadWorkspaceFileResponse(
file_uri=data_uri,
file_name=file_name,
size=content_size,
content_type=content_type,
expires_in_hours=expiration_hours,
)
# Store in cloud storage
storage_path = await cloud_storage.store_file(
content=content,
filename=file_name,
expiration_hours=expiration_hours,
user_id=auth.user_id,
)
return UploadWorkspaceFileResponse(
file_uri=storage_path,
file_name=file_name,
size=content_size,
content_type=content_type,
expires_in_hours=expiration_hours,
)
# ============================================================================
# Endpoints - Download
# ============================================================================
def _sanitize_filename_for_header(filename: str) -> str:
"""Sanitize filename for Content-Disposition header."""
sanitized = re.sub(r"[\r\n\x00]", "", filename)
sanitized = sanitized.replace('"', '\\"')
try:
sanitized.encode("ascii")
return f'attachment; filename="{sanitized}"'
except UnicodeEncodeError:
encoded = quote(sanitized, safe="")
return f"attachment; filename*=UTF-8''{encoded}"
@file_workspace_router.get(
path="/{file_id}/download",
summary="Download file from workspace",
operation_id="getWorkspaceFileDownload",
)
async def download_file(
file_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_FILES)
),
) -> Response:
"""Download a file from the user's workspace."""
workspace = await get_workspace(auth.user_id)
if workspace is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Workspace not found",
)
file = await get_workspace_file(file_id, workspace.id)
if file is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File #{file_id} not found",
)
storage = await get_workspace_storage()
# For local storage, stream directly
if file.storage_path.startswith("local://"):
content = await storage.retrieve(file.storage_path)
return Response(
content=content,
media_type=file.mime_type,
headers={
"Content-Disposition": _sanitize_filename_for_header(file.name),
"Content-Length": str(len(content)),
},
)
# For cloud storage, try signed URL redirect, fall back to streaming
try:
url = await storage.get_download_url(file.storage_path, expires_in=300)
if url.startswith("/api/"):
content = await storage.retrieve(file.storage_path)
return Response(
content=content,
media_type=file.mime_type,
headers={
"Content-Disposition": _sanitize_filename_for_header(file.name),
"Content-Length": str(len(content)),
},
)
return RedirectResponse(url=url, status_code=status.HTTP_302_FOUND)
except Exception:
logger.error(
f"Failed to get download URL for file {file.id}, falling back to stream",
exc_info=True,
)
content = await storage.retrieve(file.storage_path)
return Response(
content=content,
media_type=file.mime_type,
headers={
"Content-Disposition": _sanitize_filename_for_header(file.name),
"Content-Length": str(len(content)),
},
)
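`_sanitize_filename_for_header` above implements the RFC 6266 fallback pattern: a plain `filename=` parameter for ASCII names and a percent-encoded `filename*=UTF-8''` parameter otherwise. The same logic as a standalone sketch:

```python
import re
from urllib.parse import quote

def content_disposition(filename: str) -> str:
    # Strip CR/LF/NUL to prevent header injection, then escape double quotes
    sanitized = re.sub(r"[\r\n\x00]", "", filename)
    sanitized = sanitized.replace('"', '\\"')
    try:
        sanitized.encode("ascii")
        return f'attachment; filename="{sanitized}"'
    except UnicodeEncodeError:
        # Non-ASCII names fall back to the RFC 5987 extended parameter
        return f"attachment; filename*=UTF-8''{quote(sanitized, safe='')}"

print(content_disposition("report.pdf"))
# attachment; filename="report.pdf"
print(content_disposition("résumé.pdf"))
# attachment; filename*=UTF-8''r%C3%A9sum%C3%A9.pdf
```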


@@ -0,0 +1,458 @@
"""
V2 External API - Graphs Endpoints
Provides endpoints for managing agent graphs (CRUD operations).
"""
import logging
from typing import Optional
from uuid import uuid4
from fastapi import APIRouter, HTTPException, Query, Security
from prisma.enums import APIKeyPermission
from starlette import status
from backend.api.external.middleware import require_permission
from backend.api.features.library import db as library_db
from backend.data import graph as graph_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.integrations.webhooks.graph_lifecycle_hooks import (
on_graph_activate,
on_graph_deactivate,
)
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .integrations.helpers import get_credential_requirements
from .models import (
BlockInfo,
CredentialRequirementsResponse,
Graph,
GraphCreateRequest,
GraphListResponse,
GraphMeta,
GraphSetActiveVersionRequest,
GraphSettings,
LibraryAgent,
MarketplaceAgentDetails,
)
logger = logging.getLogger(__name__)
graphs_router = APIRouter(tags=["graphs"])
@graphs_router.get(
path="",
summary="List graphs",
operation_id="listGraphs",
)
async def list_graphs(
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_GRAPH)
),
) -> GraphListResponse:
"""List all graphs owned by the authenticated user."""
graphs, pagination_info = await graph_db.list_graphs_paginated(
user_id=auth.user_id,
page=page,
page_size=page_size,
filter_by="active",
)
return GraphListResponse(
graphs=[GraphMeta.from_internal(g) for g in graphs],
page=pagination_info.current_page,
page_size=pagination_info.page_size,
total_count=pagination_info.total_items,
total_pages=pagination_info.total_pages,
)
@graphs_router.get(
path="/{graph_id}",
summary="Get graph details",
operation_id="getGraphDetails",
)
async def get_graph(
graph_id: str,
version: Optional[int] = Query(
default=None,
description="Specific version to retrieve (default: active version)",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_GRAPH)
),
) -> Graph:
"""
Get detailed information about a specific graph.
Returns the active version by default. Pass `version` to retrieve
a specific version instead.
"""
graph = await graph_db.get_graph(
graph_id,
version,
user_id=auth.user_id,
include_subgraphs=True,
)
if not graph:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph #{graph_id} not found.",
)
return Graph.from_internal(graph)
@graphs_router.post(
path="",
summary="Create graph",
operation_id="createGraph",
)
async def create_graph(
create_graph: GraphCreateRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> Graph:
"""Create a new agent graph."""
internal_graph = create_graph.to_internal(id=str(uuid4()), version=1)
graph = graph_db.make_graph_model(internal_graph, auth.user_id)
graph.reassign_ids(user_id=auth.user_id, reassign_graph_id=True)
graph.validate_graph(for_run=False)
await graph_db.create_graph(graph, user_id=auth.user_id)
await library_db.create_library_agent(graph, user_id=auth.user_id)
activated_graph = await on_graph_activate(graph, user_id=auth.user_id)
return Graph.from_internal(activated_graph)
@graphs_router.put(
path="/{graph_id}",
summary="Update graph by creating a new version",
operation_id="updateGraphCreateVersion",
)
async def update_graph(
graph_id: str,
update_graph: GraphCreateRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> Graph:
"""
Update a graph by creating a new version.
This does not modify existing versions; it creates a new version
with the provided graph definition.
"""
existing_versions = await graph_db.get_graph_all_versions(
graph_id, user_id=auth.user_id
)
if not existing_versions:
raise HTTPException(
status.HTTP_404_NOT_FOUND, detail=f"Graph #{graph_id} not found"
)
latest_version_number = max(g.version for g in existing_versions)
internal_graph = update_graph.to_internal(
id=graph_id, version=latest_version_number + 1
)
current_active_version = next((v for v in existing_versions if v.is_active), None)
graph = graph_db.make_graph_model(internal_graph, auth.user_id)
graph.reassign_ids(user_id=auth.user_id, reassign_graph_id=False)
graph.validate_graph(for_run=False)
new_graph_version = await graph_db.create_graph(graph, user_id=auth.user_id)
if new_graph_version.is_active:
await library_db.update_agent_version_in_library(
auth.user_id, new_graph_version.id, new_graph_version.version
)
new_graph_version = await on_graph_activate(
new_graph_version, user_id=auth.user_id
)
await graph_db.set_graph_active_version(
graph_id=graph_id, version=new_graph_version.version, user_id=auth.user_id
)
if current_active_version:
await on_graph_deactivate(current_active_version, user_id=auth.user_id)
new_graph_version_with_subgraphs = await graph_db.get_graph(
graph_id,
new_graph_version.version,
user_id=auth.user_id,
include_subgraphs=True,
)
assert new_graph_version_with_subgraphs
return Graph.from_internal(new_graph_version_with_subgraphs)
# NOTE: we don't expose graph deletion in the UI, so this is commented for now
# @graphs_router.delete(
# path="/{graph_id}",
# summary="Delete graph permanently",
# status_code=status.HTTP_204_NO_CONTENT,
# )
# async def delete_graph(
# graph_id: str,
# auth: APIAuthorizationInfo = Security(
# require_permission(APIKeyPermission.WRITE_GRAPH)
# ),
# ) -> None:
# """
# Permanently delete a graph and all its versions.
# This action cannot be undone. All associated executions will remain
# but will reference a deleted graph.
# """
# if active_version := await graph_db.get_graph(
# graph_id=graph_id, version=None, user_id=auth.user_id
# ):
# await on_graph_deactivate(active_version, user_id=auth.user_id)
# # FIXME: maybe only expose delete for library agents?
# deleted_count = await graph_db.delete_graph(graph_id, user_id=auth.user_id)
# if deleted_count == 0:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND, detail=f"Graph {graph_id} not found"
# )
@graphs_router.get(
path="/{graph_id}/versions",
summary="List graph versions",
operation_id="listGraphVersions",
)
async def list_graph_versions(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_GRAPH)
),
) -> list[Graph]:
"""Get all versions of a specific graph."""
graphs = await graph_db.get_graph_all_versions(graph_id, user_id=auth.user_id)
if not graphs:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph #{graph_id} not found.",
)
return [Graph.from_internal(g) for g in graphs]
@graphs_router.put(
path="/{graph_id}/versions/active",
summary="Set active graph version",
operation_id="updateGraphSetActiveVersion",
)
async def set_active_version(
graph_id: str,
request_body: GraphSetActiveVersionRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> None:
"""
Set which version of a graph is the active version.
The active version is the one used when executing the graph
and what is shown to users in the UI.
"""
new_active_version = request_body.active_graph_version
new_active_graph = await graph_db.get_graph(
graph_id, new_active_version, user_id=auth.user_id
)
if not new_active_graph:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"Graph #{graph_id} v{new_active_version} not found",
)
current_active_graph = await graph_db.get_graph(
graph_id=graph_id,
version=None,
user_id=auth.user_id,
)
await on_graph_activate(new_active_graph, user_id=auth.user_id)
await graph_db.set_graph_active_version(
graph_id=graph_id,
version=new_active_version,
user_id=auth.user_id,
)
await library_db.update_agent_version_in_library(
auth.user_id, new_active_graph.id, new_active_graph.version
)
if current_active_graph and current_active_graph.version != new_active_version:
await on_graph_deactivate(current_active_graph, user_id=auth.user_id)
@graphs_router.patch(
path="/{graph_id}/settings",
summary="Update graph settings",
operation_id="updateGraphSettings",
)
async def update_graph_settings(
graph_id: str,
settings: GraphSettings,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> GraphSettings:
"""Update settings for a graph."""
library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph_id, user_id=auth.user_id
)
if not library_agent:
raise HTTPException(
status.HTTP_404_NOT_FOUND, f"Graph #{graph_id} not found in user's library"
)
updated_agent = await library_db.update_library_agent(
user_id=auth.user_id,
library_agent_id=library_agent.id,
settings=settings.to_internal(),
)
return GraphSettings(
human_in_the_loop_safe_mode=updated_agent.settings.human_in_the_loop_safe_mode
)
@graphs_router.get(
path="/{graph_id}/library-agent",
summary="Get library agent for graph",
operation_id="getLibraryAgentForGraph",
)
async def get_library_agent_by_graph(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> LibraryAgent:
"""Get the library agent associated with a specific graph."""
agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph_id,
user_id=auth.user_id,
)
if not agent:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No library agent found for graph #{graph_id}",
)
return LibraryAgent.from_internal(agent)
@graphs_router.get(
path="/{graph_id}/blocks",
summary="List blocks used in a graph",
operation_id="listBlocksInGraph",
)
async def list_graph_blocks(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_GRAPH)
),
) -> list[BlockInfo]:
"""List the unique blocks used by a graph."""
from backend.blocks import get_block
graph = await graph_db.get_graph(
graph_id,
version=None,
user_id=auth.user_id,
include_subgraphs=True,
)
if not graph:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph #{graph_id} not found.",
)
seen_block_ids: set[str] = set()
blocks: list[BlockInfo] = []
for node in graph.nodes:
if node.block_id in seen_block_ids:
continue
seen_block_ids.add(node.block_id)
block = get_block(node.block_id)
if block and not block.disabled:
blocks.append(BlockInfo.from_internal(block))
return blocks
@graphs_router.get(
path="/{graph_id}/credentials",
summary="Get graph credentials",
operation_id="getCredentialRequirementsForGraph",
)
async def list_graph_credential_requirements(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_INTEGRATIONS)
),
) -> CredentialRequirementsResponse:
"""List credential requirements for a graph and matching user credentials."""
graph = await graph_db.get_graph(
graph_id=graph_id,
version=None,
user_id=auth.user_id,
include_subgraphs=True,
)
if not graph:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=f"Graph #{graph_id} not found"
)
requirements = await get_credential_requirements(
graph.credentials_input_schema, auth.user_id
)
return CredentialRequirementsResponse(requirements=requirements)
@graphs_router.get(
path="/{graph_id}/marketplace-listing",
summary="Get marketplace listing for graph",
operation_id="getMarketplaceListingForGraph",
)
async def get_marketplace_listing_for_graph(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
) -> MarketplaceAgentDetails:
"""Get the marketplace listing for a given graph, if one exists."""
import prisma.models
from backend.api.features.store.model import StoreAgentDetails
agent = await prisma.models.StoreAgent.prisma().find_first(
where={"graph_id": graph_id}
)
if not agent:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No marketplace listing found for graph {graph_id}",
)
return MarketplaceAgentDetails.from_internal(StoreAgentDetails.from_db(agent))
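The `update_graph` flow above never mutates an existing version: it computes the next version number from the highest existing one, saves the new version, then swaps the active flag and deactivates the previous version. The version bookkeeping in isolation (hypothetical dict-shaped version records; real ones are full graph models):

```python
# Hypothetical version records standing in for graph model objects
versions = [
    {"version": 1, "is_active": False},
    {"version": 2, "is_active": True},
]

# New version number = highest existing version + 1
next_version = max(v["version"] for v in versions) + 1

# Remember the currently active version so it can be deactivated after the swap
current_active = next((v for v in versions if v["is_active"]), None)

print(next_version)               # 3
print(current_active["version"])  # 2
```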


@@ -0,0 +1,13 @@
"""
V2 External API - Integrations Package
Aggregates all integration-related sub-routers.
"""
from fastapi import APIRouter
from .credentials import credentials_router
integrations_router = APIRouter(tags=["integrations"])
integrations_router.include_router(credentials_router)


@@ -0,0 +1,131 @@
"""
V2 External API - Credential CRUD Endpoints
Provides endpoints for managing integration credentials.
"""
import logging
from typing import Annotated, Optional
from uuid import uuid4
from fastapi import APIRouter, Body, HTTPException, Query, Security
from prisma.enums import APIKeyPermission
from pydantic import SecretStr
from starlette import status
from backend.api.external.middleware import require_permission
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.model import (
APIKeyCredentials,
HostScopedCredentials,
UserPasswordCredentials,
)
from ..models import CredentialCreateRequest, CredentialInfo, CredentialListResponse
from .helpers import creds_manager
logger = logging.getLogger(__name__)
credentials_router = APIRouter()
@credentials_router.get(
path="/credentials",
summary="List integration credentials",
operation_id="listIntegrationCredentials",
)
async def list_credentials(
provider: Optional[str] = Query(
default=None,
description="Filter by provider name (e.g., 'github', 'google')",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_INTEGRATIONS)
),
) -> CredentialListResponse:
"""List integration credentials for the authenticated user."""
credentials = await creds_manager.store.get_all_creds(auth.user_id)
if provider:
credentials = [c for c in credentials if c.provider.lower() == provider.lower()]
return CredentialListResponse(
credentials=[CredentialInfo.from_internal(c) for c in credentials]
)
@credentials_router.post(
path="/credentials",
summary="Create integration credential",
operation_id="createIntegrationCredential",
status_code=status.HTTP_201_CREATED,
)
async def create_credential(
request: Annotated[CredentialCreateRequest, Body(discriminator="type")],
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.MANAGE_INTEGRATIONS)
),
) -> CredentialInfo:
"""
Create a new integration credential.
Supports `api_key`, `user_password`, and `host_scoped` credential types.
OAuth credentials must be set up through the web UI.
"""
cred_id = str(uuid4())
if request.type == "api_key":
credentials = APIKeyCredentials(
id=cred_id,
provider=request.provider,
title=request.title,
api_key=SecretStr(request.api_key),
)
elif request.type == "user_password":
credentials = UserPasswordCredentials(
id=cred_id,
provider=request.provider,
title=request.title,
username=SecretStr(request.username),
password=SecretStr(request.password),
)
else:
credentials = HostScopedCredentials(
id=cred_id,
provider=request.provider,
title=request.title,
host=request.host,
headers={k: SecretStr(v) for k, v in request.headers.items()},
)
await creds_manager.create(auth.user_id, credentials)
return CredentialInfo.from_internal(credentials)
@credentials_router.delete(
path="/credentials/{credential_id}",
summary="Delete integration credential",
operation_id="deleteIntegrationCredential",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_credential(
credential_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.DELETE_INTEGRATIONS)
),
) -> None:
"""
Delete an integration credential.
Any agents using this credential will fail on their next run.
"""
existing = await creds_manager.store.get_creds_by_id(
user_id=auth.user_id, credentials_id=credential_id
)
if not existing:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Credential #{credential_id} not found",
)
await creds_manager.delete(auth.user_id, credential_id)
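`create_credential` above dispatches on the `type` discriminator of the request union: `api_key` and `user_password` are matched explicitly, and anything else is treated as `host_scoped`. A minimal sketch of that dispatch, with plain dicts standing in for the Pydantic request models:

```python
def credential_class_for(request: dict) -> str:
    # Mirrors the if/elif/else in create_credential: explicit branches for
    # "api_key" and "user_password", host_scoped as the fall-through
    if request["type"] == "api_key":
        return "APIKeyCredentials"
    elif request["type"] == "user_password":
        return "UserPasswordCredentials"
    else:
        return "HostScopedCredentials"

print(credential_class_for({"type": "api_key"}))      # APIKeyCredentials
print(credential_class_for({"type": "host_scoped"}))  # HostScopedCredentials
```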


@@ -0,0 +1,49 @@
"""
V2 External API - Integration Helpers
Shared logic for credential-related operations.
"""
from backend.integrations.creds_manager import IntegrationCredentialsManager
from ..models import CredentialInfo, CredentialRequirement
creds_manager = IntegrationCredentialsManager()
async def get_credential_requirements(
creds_schema: dict,
user_id: str,
) -> list[CredentialRequirement]:
"""
Extract credential requirements from a graph's credentials input schema
and match them against the user's existing credentials.
"""
all_credentials = await creds_manager.store.get_all_creds(user_id)
requirements = []
for field_name, field_schema in creds_schema.get("properties", {}).items():
providers: list[str] = []
if "anyOf" in field_schema:
for option in field_schema["anyOf"]:
if "provider" in option:
providers.append(option["provider"])
elif "provider" in field_schema:
providers.append(field_schema["provider"])
for provider in providers:
matching = [
CredentialInfo.from_internal(c)
for c in all_credentials
if c.provider.lower() == provider.lower()
]
requirements.append(
CredentialRequirement(
provider=provider,
required_scopes=[],
matching_credentials=matching,
)
)
return requirements
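`get_credential_requirements` walks the `properties` of the credentials input schema, taking a provider either directly from the field or from each `anyOf` option. The traversal on a sample schema (a sketch; the field names here are made up):

```python
def extract_providers(creds_schema: dict) -> list[str]:
    providers: list[str] = []
    for field_schema in creds_schema.get("properties", {}).values():
        if "anyOf" in field_schema:
            # Multi-provider field: one option per acceptable provider
            providers += [o["provider"] for o in field_schema["anyOf"] if "provider" in o]
        elif "provider" in field_schema:
            providers.append(field_schema["provider"])
    return providers

schema = {
    "properties": {
        "github_creds": {"provider": "github"},
        "llm_creds": {"anyOf": [{"provider": "openai"}, {"provider": "anthropic"}]},
    }
}
print(extract_providers(schema))  # ['github', 'openai', 'anthropic']
```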


@@ -0,0 +1,17 @@
"""
V2 External API - Library Package
Aggregates all library-related sub-routers (agents, folders, presets).
"""
from fastapi import APIRouter
from .agents import agents_router
from .folders import folders_router
from .presets import presets_router
library_router = APIRouter()
library_router.include_router(agents_router)
library_router.include_router(folders_router)
library_router.include_router(presets_router)


@@ -0,0 +1,239 @@
"""V2 External API - Library Agent Endpoints"""
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, Security
from prisma.enums import APIKeyPermission
from starlette import status
from backend.api.external.middleware import require_permission
from backend.api.features.library import db as library_db
from backend.data import graph as graph_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.credit import get_user_credit_model
from backend.executor import utils as execution_utils
from ..common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from ..integrations.helpers import get_credential_requirements
from ..models import (
AgentGraphRun,
AgentRunRequest,
CredentialRequirementsResponse,
LibraryAgent,
LibraryAgentListResponse,
LibraryAgentUpdateRequest,
)
from ..rate_limit import execute_limiter
logger = logging.getLogger(__name__)
agents_router = APIRouter(tags=["library"])
# ============================================================================
# Endpoints
# ============================================================================
@agents_router.get(
path="/agents",
summary="List library agents",
operation_id="listLibraryAgents",
)
async def list_library_agents(
published: Optional[bool] = Query(
default=None,
description="Filter by marketplace publish status",
),
favorite: Optional[bool] = Query(
default=None,
description="Filter by `isFavorite` attribute",
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> LibraryAgentListResponse:
"""List agents in the user's library."""
result = await library_db.list_library_agents(
user_id=auth.user_id,
page=page,
page_size=page_size,
published=published,
favorite=favorite,
)
return LibraryAgentListResponse(
agents=[LibraryAgent.from_internal(a) for a in result.agents],
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_count=result.pagination.total_items,
total_pages=result.pagination.total_pages,
)
@agents_router.get(
path="/agents/{agent_id}",
summary="Get library agent",
operation_id="getLibraryAgent",
)
async def get_library_agent(
agent_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> LibraryAgent:
"""Get detailed information about a specific agent in the user's library."""
agent = await library_db.get_library_agent(
id=agent_id,
user_id=auth.user_id,
)
return LibraryAgent.from_internal(agent)
@agents_router.patch(
path="/agents/{agent_id}",
summary="Update library agent",
operation_id="updateLibraryAgent",
)
async def update_library_agent(
request: LibraryAgentUpdateRequest,
agent_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> LibraryAgent:
"""Update properties of a library agent."""
updated = await library_db.update_library_agent(
library_agent_id=agent_id,
user_id=auth.user_id,
auto_update_version=request.auto_update_version,
graph_version=request.graph_version,
is_favorite=request.is_favorite,
is_archived=request.is_archived,
folder_id=request.folder_id,
)
return LibraryAgent.from_internal(updated)
@agents_router.delete(
path="/agents/{agent_id}",
summary="Delete library agent",
operation_id="deleteLibraryAgent",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_library_agent(
agent_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> None:
"""Remove an agent from the user's library."""
await library_db.delete_library_agent(
library_agent_id=agent_id,
user_id=auth.user_id,
)
@agents_router.post(
path="/agents/{agent_id}/fork",
summary="Fork library agent",
operation_id="forkLibraryAgent",
status_code=status.HTTP_201_CREATED,
)
async def fork_library_agent(
agent_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> LibraryAgent:
"""Fork (clone) a library agent.
Creates a deep copy of the agent's underlying graph and all its nodes,
assigning new IDs. The cloned graph is added to the user's library as
an independent agent that can be modified without affecting the original.
"""
forked = await library_db.fork_library_agent(
library_agent_id=agent_id,
user_id=auth.user_id,
)
return LibraryAgent.from_internal(forked)
@agents_router.post(
path="/agents/{agent_id}/runs",
summary="Execute library agent",
operation_id="executeLibraryAgent",
)
async def execute_agent(
request: AgentRunRequest,
agent_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.RUN_AGENT)
),
) -> AgentGraphRun:
"""Execute an agent from the library."""
execute_limiter.check(auth.user_id)
# Check credit balance
user_credit_model = await get_user_credit_model(auth.user_id)
current_balance = await user_credit_model.get_credits(auth.user_id)
if current_balance <= 0:
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail="Insufficient balance to execute the agent. Please top up your account.",
)
# Get the library agent to find the graph ID and version
library_agent = await library_db.get_library_agent(
id=agent_id,
user_id=auth.user_id,
)
result = await execution_utils.add_graph_execution(
graph_id=library_agent.graph_id,
user_id=auth.user_id,
inputs=request.inputs,
graph_version=library_agent.graph_version,
graph_credentials_inputs=request.credentials_inputs,
)
return AgentGraphRun.from_internal(result)
@agents_router.get(
path="/agents/{agent_id}/credentials",
summary="Get library agent credential requirements",
operation_id="getCredentialRequirementsForLibraryAgent",
)
async def list_agent_credential_requirements(
agent_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_INTEGRATIONS)
),
) -> CredentialRequirementsResponse:
"""List credential requirements and matching user credentials for a library agent."""
library_agent = await library_db.get_library_agent(agent_id, user_id=auth.user_id)
graph = await graph_db.get_graph(
graph_id=library_agent.graph_id,
version=library_agent.graph_version,
user_id=auth.user_id,
include_subgraphs=True,
)
if not graph:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph for agent #{agent_id} not found",
)
requirements = await get_credential_requirements(
graph.credentials_input_schema, auth.user_id
)
return CredentialRequirementsResponse(requirements=requirements)
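The `executeLibraryAgent` endpoint above takes an `AgentRunRequest` body and returns an `AgentGraphRun`. A minimal client-side sketch of building that call with the stdlib; the mount path (`/external-api/v2/library`) and the `X-API-Key` header name are assumptions here, so check the deployment's OpenAPI schema before relying on them:

```python
import json
import urllib.request

def build_execute_request(base_url: str, agent_id: str, api_key: str,
                          inputs: dict) -> urllib.request.Request:
    """Build (but don't send) a POST request for the executeLibraryAgent endpoint."""
    body = json.dumps({"inputs": inputs, "credentials_inputs": {}}).encode()
    return urllib.request.Request(
        url=f"{base_url}/agents/{agent_id}/runs",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-API-Key": api_key},
    )

req = build_execute_request(
    "https://platform.example.com/external-api/v2/library",
    "agent-123", "sk-example-key", {"topic": "weekly report"},
)
```

Sending it with `urllib.request.urlopen(req)` would then return the serialized `AgentGraphRun`, or a 402 if the account balance is zero.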


@@ -0,0 +1,175 @@
"""V2 External API - Library Folder Endpoints"""
import logging
from typing import Optional
from fastapi import APIRouter, Query, Security
from prisma.enums import APIKeyPermission
from starlette import status
from backend.api.external.middleware import require_permission
from backend.api.features.library import db as library_db
from backend.data.auth.base import APIAuthorizationInfo
from ..models import (
LibraryFolder,
LibraryFolderCreateRequest,
LibraryFolderListResponse,
LibraryFolderMoveRequest,
LibraryFolderTree,
LibraryFolderTreeResponse,
LibraryFolderUpdateRequest,
)
logger = logging.getLogger(__name__)
folders_router = APIRouter(tags=["library"])
@folders_router.get(
path="/folders",
summary="List folders in library",
operation_id="listLibraryFolders",
)
async def list_folders(
parent_id: Optional[str] = Query(
default=None, description="Filter by parent folder ID. Omit for root folders."
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> LibraryFolderListResponse:
"""List folders in the user's library."""
folders = await library_db.list_folders(
user_id=auth.user_id,
parent_id=parent_id,
)
return LibraryFolderListResponse(
folders=[LibraryFolder.from_internal(f) for f in folders],
)
@folders_router.get(
path="/folders/tree",
summary="Get library folder tree",
operation_id="getLibraryFolderTree",
)
async def get_folder_tree(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> LibraryFolderTreeResponse:
"""Get the full folder tree for the user's library."""
tree = await library_db.get_folder_tree(user_id=auth.user_id)
return LibraryFolderTreeResponse(
tree=[LibraryFolderTree.from_internal(f) for f in tree],
)
@folders_router.get(
path="/folders/{folder_id}",
summary="Get folder in library",
operation_id="getLibraryFolder",
)
async def get_folder(
folder_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> LibraryFolder:
"""Get details of a specific folder."""
folder = await library_db.get_folder(
folder_id=folder_id,
user_id=auth.user_id,
)
return LibraryFolder.from_internal(folder)
@folders_router.post(
path="/folders",
summary="Create folder in library",
operation_id="createLibraryFolder",
status_code=status.HTTP_201_CREATED,
)
async def create_folder(
request: LibraryFolderCreateRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> LibraryFolder:
"""Create a new folder in the user's library."""
folder = await library_db.create_folder(
user_id=auth.user_id,
name=request.name,
parent_id=request.parent_id,
icon=request.icon,
color=request.color,
)
return LibraryFolder.from_internal(folder)
@folders_router.patch(
path="/folders/{folder_id}",
summary="Update folder in library",
operation_id="updateLibraryFolder",
)
async def update_folder(
request: LibraryFolderUpdateRequest,
folder_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> LibraryFolder:
"""Update properties of a folder."""
folder = await library_db.update_folder(
folder_id=folder_id,
user_id=auth.user_id,
name=request.name,
icon=request.icon,
color=request.color,
)
return LibraryFolder.from_internal(folder)
@folders_router.post(
path="/folders/{folder_id}/move",
summary="Move folder in library",
operation_id="moveLibraryFolder",
)
async def move_folder(
request: LibraryFolderMoveRequest,
folder_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> LibraryFolder:
"""Move a folder to a new parent. Set target_parent_id to null to move to root."""
folder = await library_db.move_folder(
folder_id=folder_id,
user_id=auth.user_id,
target_parent_id=request.target_parent_id,
)
return LibraryFolder.from_internal(folder)
@folders_router.delete(
path="/folders/{folder_id}",
summary="Delete folder in library",
operation_id="deleteLibraryFolder",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_folder(
folder_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> None:
"""
Delete a folder and its subfolders. Agents in this folder will be moved to root.
"""
await library_db.delete_folder(
folder_id=folder_id,
user_id=auth.user_id,
)
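`get_folder_tree` presumably assembles the flat `(id, parent_id)` folder records into nested trees, with `parent_id=None` marking roots. A minimal sketch of that shape, using hypothetical dict records (the internal model surely differs):

```python
def build_tree(folders: list[dict]) -> list[dict]:
    """Group flat folder records into a nested tree keyed on parent_id (None = root)."""
    nodes = {f["id"]: {**f, "children": []} for f in folders}
    roots = []
    for node in nodes.values():
        parent_id = node["parent_id"]
        if parent_id is None:
            roots.append(node)
        else:
            nodes[parent_id]["children"].append(node)
    return roots

tree = build_tree([
    {"id": "a", "parent_id": None, "name": "Work"},
    {"id": "b", "parent_id": "a", "name": "Reports"},
])
```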


@@ -0,0 +1,262 @@
"""
V2 External API - Library Preset Endpoints
Provides endpoints for managing agent presets (saved run configurations).
"""
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, Security
from prisma.enums import APIKeyPermission
from starlette import status
from backend.api.external.middleware import require_permission
from backend.api.features.library import db as library_db
from backend.api.features.library.model import LibraryAgentPresetCreatable
from backend.api.features.library.model import (
TriggeredPresetSetupRequest as _TriggeredPresetSetupRequest,
)
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.credit import get_user_credit_model
from backend.executor import utils as execution_utils
from ..common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from ..models import (
AgentGraphRun,
AgentPreset,
AgentPresetCreateRequest,
AgentPresetListResponse,
AgentPresetRunRequest,
AgentPresetUpdateRequest,
AgentTriggerSetupRequest,
)
from ..rate_limit import execute_limiter
logger = logging.getLogger(__name__)
presets_router = APIRouter(tags=["library", "presets"])
@presets_router.get(
path="/presets",
summary="List agent execution presets",
operation_id="listAgentRunPresets",
)
async def list_presets(
graph_id: Optional[str] = Query(default=None, description="Filter by graph ID"),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> AgentPresetListResponse:
"""List presets in the user's library, optionally filtered by graph ID."""
result = await library_db.list_presets(
user_id=auth.user_id,
page=page,
page_size=page_size,
graph_id=graph_id,
)
return AgentPresetListResponse(
presets=[AgentPreset.from_internal(p) for p in result.presets],
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_count=result.pagination.total_items,
total_pages=result.pagination.total_pages,
)
@presets_router.get(
path="/presets/{preset_id}",
summary="Get agent execution preset",
operation_id="getAgentRunPreset",
)
async def get_preset(
preset_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
) -> AgentPreset:
"""Get details of a specific preset."""
preset = await library_db.get_preset(
user_id=auth.user_id,
preset_id=preset_id,
)
if not preset:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Preset #{preset_id} not found",
)
return AgentPreset.from_internal(preset)
@presets_router.post(
path="/presets",
summary="Create agent execution preset",
operation_id="createAgentRunPreset",
status_code=status.HTTP_201_CREATED,
)
async def create_preset(
request: AgentPresetCreateRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> AgentPreset:
"""Create a new preset with saved inputs and credentials for an agent."""
creatable = LibraryAgentPresetCreatable(
graph_id=request.graph_id,
graph_version=request.graph_version,
name=request.name,
description=request.description,
inputs=request.inputs,
credentials=request.credentials,
is_active=request.is_active,
)
preset = await library_db.create_preset(
user_id=auth.user_id,
preset=creatable,
)
return AgentPreset.from_internal(preset)
@presets_router.post(
path="/presets/setup-trigger",
summary="Setup triggered preset",
operation_id="setupAgentRunTrigger",
status_code=status.HTTP_201_CREATED,
)
async def setup_trigger(
request: AgentTriggerSetupRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> AgentPreset:
"""
Create a preset with a webhook trigger for automatic execution.
The agent's `trigger_setup_info` describes the required trigger configuration
schema and credentials. Use it to populate `trigger_config` and
`agent_credentials`.
"""
# Use internal trigger setup endpoint to avoid logic duplication:
from backend.api.features.library.routes.presets import (
setup_trigger as _internal_setup_trigger,
)
internal_request = _TriggeredPresetSetupRequest(
name=request.name,
description=request.description,
graph_id=request.graph_id,
graph_version=request.graph_version,
trigger_config=request.trigger_config,
agent_credentials=request.agent_credentials,
)
preset = await _internal_setup_trigger(
params=internal_request,
user_id=auth.user_id,
)
return AgentPreset.from_internal(preset)
@presets_router.patch(
path="/presets/{preset_id}",
operation_id="updateAgentRunPreset",
summary="Update agent execution preset",
)
async def update_preset(
request: AgentPresetUpdateRequest,
preset_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> AgentPreset:
"""Update properties of a preset. Only provided fields will be updated."""
preset = await library_db.update_preset(
user_id=auth.user_id,
preset_id=preset_id,
name=request.name,
description=request.description,
inputs=request.inputs,
credentials=request.credentials,
is_active=request.is_active,
)
return AgentPreset.from_internal(preset)
@presets_router.delete(
path="/presets/{preset_id}",
summary="Delete agent execution preset",
operation_id="deleteAgentRunPreset",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_preset(
preset_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> None:
"""Delete a preset."""
await library_db.delete_preset(
user_id=auth.user_id,
preset_id=preset_id,
)
@presets_router.post(
path="/presets/{preset_id}/execute",
summary="Execute agent preset",
operation_id="executeAgentRunPreset",
)
async def execute_preset(
preset_id: str,
request: AgentPresetRunRequest = AgentPresetRunRequest(),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.RUN_AGENT)
),
) -> AgentGraphRun:
"""Execute a preset, optionally overriding saved inputs and credentials."""
execute_limiter.check(auth.user_id)
# Check credit balance
user_credit_model = await get_user_credit_model(auth.user_id)
current_balance = await user_credit_model.get_credits(auth.user_id)
if current_balance <= 0:
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail="Insufficient balance to execute the agent. Please top up your account.",
)
# Fetch preset
preset = await library_db.get_preset(
user_id=auth.user_id,
preset_id=preset_id,
)
if not preset:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Preset #{preset_id} not found",
)
# Merge preset inputs with overrides
merged_inputs = {**preset.inputs, **request.inputs}
merged_credentials = {**preset.credentials, **request.credentials_inputs}
result = await execution_utils.add_graph_execution(
graph_id=preset.graph_id,
user_id=auth.user_id,
inputs=merged_inputs,
graph_version=preset.graph_version,
graph_credentials_inputs=merged_credentials,
preset_id=preset_id,
)
return AgentGraphRun.from_internal(result)
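The `{**preset.inputs, **request.inputs}` merge in `execute_preset` means request-level overrides win key by key, while keys not supplied in the request keep the preset's saved values:

```python
# Illustrative values; real preset inputs are whatever the graph's input schema defines.
preset_inputs = {"query": "weekly sales", "limit": 10}
overrides = {"limit": 50}

# Later keys win on collision, so the override replaces "limit"
# while "query" falls back to the saved preset value.
merged = {**preset_inputs, **overrides}
```

The same pattern applies to `credentials_inputs`, merged over the preset's saved credentials.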


@@ -0,0 +1,443 @@
"""
V2 External API - Marketplace Endpoints
Provides access to the agent marketplace (store).
"""
import logging
import urllib.parse
from typing import Literal, Optional
from fastapi import APIRouter, File, HTTPException, Path, Query, Security, UploadFile
from prisma.enums import APIKeyPermission
from starlette import status
from backend.api.external.middleware import require_auth, require_permission
from backend.api.features.store import cache as store_cache
from backend.api.features.store import db as store_db
from backend.api.features.store import media as store_media
from backend.api.features.store.db import (
StoreAgentsSortOptions,
StoreCreatorsSortOptions,
)
from backend.data.auth.base import APIAuthorizationInfo
from backend.util.virus_scanner import scan_content_safe
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import (
LibraryAgent,
MarketplaceAgent,
MarketplaceAgentDetails,
MarketplaceAgentListResponse,
MarketplaceAgentSubmission,
MarketplaceAgentSubmissionCreateRequest,
MarketplaceAgentSubmissionEditRequest,
MarketplaceAgentSubmissionsListResponse,
MarketplaceCreatorDetails,
MarketplaceCreatorsResponse,
MarketplaceMediaUploadResponse,
MarketplaceUserProfile,
MarketplaceUserProfileUpdateRequest,
)
from .rate_limit import media_upload_limiter
logger = logging.getLogger(__name__)
marketplace_router = APIRouter(tags=["marketplace"])
# ============================================================================
# Agents
# ============================================================================
@marketplace_router.get(
path="/agents",
summary="List or search marketplace agents",
operation_id="listMarketplaceAgents",
)
async def list_agents(
featured: bool = Query(
default=False, description="Filter to only show featured agents"
),
creator: Optional[str] = Query(
default=None, description="Filter by creator username"
),
category: Optional[str] = Query(default=None, description="Filter by category"),
search_query: Optional[str] = Query(
default=None, description="Literal + semantic search on names and descriptions"
),
sorted_by: Optional[Literal["rating", "runs", "name", "updated_at"]] = Query(
default=None,
description="Property to sort results by. Ignored if search_query is provided.",
),
page: int = Query(ge=1, default=1),
page_size: int = Query(ge=1, le=MAX_PAGE_SIZE, default=DEFAULT_PAGE_SIZE),
# This data is public, but we still require auth for access tracking and rate limits
auth: APIAuthorizationInfo = Security(require_auth),
) -> MarketplaceAgentListResponse:
"""List agents available in the marketplace, with optional filtering and sorting."""
result = await store_cache._get_cached_store_agents(
featured=featured,
creator=creator,
sorted_by=StoreAgentsSortOptions(sorted_by) if sorted_by else None,
search_query=search_query,
category=category,
page=page,
page_size=page_size,
)
return MarketplaceAgentListResponse(
agents=[MarketplaceAgent.from_internal(a) for a in result.agents],
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_count=result.pagination.total_items,
total_pages=result.pagination.total_pages,
)
@marketplace_router.get(
path="/agents/by-version/{version_id}",
summary="Get marketplace agent by version ID",
operation_id="getMarketplaceAgentByListingVersion",
)
async def get_agent_by_version(
version_id: str,
# This data is public, but we still require auth for access tracking and rate limits
auth: APIAuthorizationInfo = Security(require_auth),
) -> MarketplaceAgentDetails:
"""Get details of a marketplace agent by its store listing version ID."""
agent = await store_db.get_store_agent_by_version_id(version_id)
return MarketplaceAgentDetails.from_internal(agent)
@marketplace_router.get(
path="/agents/{username}/{agent_name}",
summary="Get marketplace agent details",
operation_id="getMarketplaceAgent",
)
async def get_agent_details(
username: str,
agent_name: str,
# This data is public, but we still require auth for access tracking and rate limits
auth: APIAuthorizationInfo = Security(require_auth),
) -> MarketplaceAgentDetails:
"""Get details of a specific marketplace agent."""
username = urllib.parse.unquote(username).lower()
agent_name = urllib.parse.unquote(agent_name).lower()
agent = await store_cache._get_cached_agent_details(
username=username, agent_name=agent_name
)
return MarketplaceAgentDetails.from_internal(agent)
@marketplace_router.post(
path="/agents/{username}/{agent_name}/add-to-library",
summary="Add marketplace agent to library",
operation_id="addMarketplaceAgentToLibrary",
status_code=status.HTTP_201_CREATED,
)
async def add_agent_to_library(
username: str,
agent_name: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_LIBRARY)
),
) -> LibraryAgent:
"""Add a marketplace agent to the authenticated user's library."""
from backend.api.features.library import db as library_db
username = urllib.parse.unquote(username).lower()
agent_name = urllib.parse.unquote(agent_name).lower()
agent_details = await store_cache._get_cached_agent_details(
username=username, agent_name=agent_name
)
agent = await library_db.add_store_agent_to_library(
store_listing_version_id=agent_details.store_listing_version_id,
user_id=auth.user_id,
)
return LibraryAgent.from_internal(agent)
# ============================================================================
# Creators
# ============================================================================
@marketplace_router.get(
path="/creators",
summary="List marketplace creators",
operation_id="listMarketplaceCreators",
)
async def list_creators(
featured: bool = Query(
default=False, description="Filter to featured creators only"
),
search_query: Optional[str] = Query(
default=None, description="Literal + semantic search on names and descriptions"
),
sorted_by: Optional[Literal["agent_rating", "agent_runs", "num_agents"]] = Query(
default=None, description="Sort field"
),
page: int = Query(ge=1, default=1),
page_size: int = Query(ge=1, le=MAX_PAGE_SIZE, default=DEFAULT_PAGE_SIZE),
# This data is public, but we still require auth for access tracking and rate limits
auth: APIAuthorizationInfo = Security(require_auth),
) -> MarketplaceCreatorsResponse:
"""List or search marketplace creators."""
result = await store_cache._get_cached_store_creators(
featured=featured,
search_query=search_query,
sorted_by=StoreCreatorsSortOptions(sorted_by) if sorted_by else None,
page=page,
page_size=page_size,
)
return MarketplaceCreatorsResponse(
creators=[MarketplaceCreatorDetails.from_internal(c) for c in result.creators],
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_count=result.pagination.total_items,
total_pages=result.pagination.total_pages,
)
@marketplace_router.get(
path="/creators/{username}",
summary="Get marketplace creator details",
operation_id="getMarketplaceCreator",
)
async def get_creator_details(
username: str,
# This data is public, but we still require auth for access tracking and rate limits
auth: APIAuthorizationInfo = Security(require_auth),
) -> MarketplaceCreatorDetails:
"""Get a marketplace creator's profile w/ stats."""
username = urllib.parse.unquote(username).lower()
creator = await store_cache._get_cached_creator_details(username=username)
return MarketplaceCreatorDetails.from_internal(creator)
# ============================================================================
# Profile
# ============================================================================
@marketplace_router.get(
path="/profile",
summary="Get my marketplace profile",
operation_id="getMarketplaceMyProfile",
)
async def get_profile(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
) -> MarketplaceCreatorDetails:
"""Get the authenticated user's marketplace profile w/ creator stats."""
profile = await store_db.get_user_profile(auth.user_id)
if not profile:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Profile not found",
)
creator = await store_cache._get_cached_creator_details(username=profile.username)
return MarketplaceCreatorDetails.from_internal(creator)
@marketplace_router.patch(
path="/profile",
summary="Update my marketplace profile",
operation_id="updateMarketplaceMyProfile",
)
async def update_profile(
request: MarketplaceUserProfileUpdateRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_STORE)
),
) -> MarketplaceUserProfile:
"""Update the authenticated user's marketplace profile."""
from backend.api.features.store.model import ProfileUpdateRequest
profile = ProfileUpdateRequest(
name=request.name,
username=request.username,
description=request.description,
links=request.links,
avatar_url=request.avatar_url,
)
updated_profile = await store_db.update_profile(auth.user_id, profile)
return MarketplaceUserProfile.from_internal(updated_profile)
# ============================================================================
# Submissions
# ============================================================================
@marketplace_router.get(
path="/submissions",
summary="List my marketplace submissions",
operation_id="listMarketplaceSubmissions",
)
async def list_submissions(
page: int = Query(ge=1, default=1),
page_size: int = Query(ge=1, le=MAX_PAGE_SIZE, default=DEFAULT_PAGE_SIZE),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
) -> MarketplaceAgentSubmissionsListResponse:
"""List the authenticated user's marketplace listing submissions."""
result = await store_db.get_store_submissions(
user_id=auth.user_id,
page=page,
page_size=page_size,
)
return MarketplaceAgentSubmissionsListResponse(
submissions=[
MarketplaceAgentSubmission.from_internal(s) for s in result.submissions
],
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_count=result.pagination.total_items,
total_pages=result.pagination.total_pages,
)
@marketplace_router.post(
path="/submissions",
summary="Create marketplace submission",
operation_id="createMarketplaceSubmission",
)
async def create_submission(
request: MarketplaceAgentSubmissionCreateRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_STORE)
),
) -> MarketplaceAgentSubmission:
"""Submit a new marketplace listing for review."""
submission = await store_db.create_store_submission(
user_id=auth.user_id,
graph_id=request.graph_id,
graph_version=request.graph_version,
slug=request.slug,
name=request.name,
sub_heading=request.sub_heading,
description=request.description,
instructions=request.instructions,
categories=request.categories,
image_urls=request.image_urls,
video_url=request.video_url,
agent_output_demo_url=request.agent_output_demo_url,
changes_summary=request.changes_summary or "Initial Submission",
recommended_schedule_cron=request.recommended_schedule_cron,
)
return MarketplaceAgentSubmission.from_internal(submission)
@marketplace_router.put(
path="/submissions/{version_id}",
summary="Edit marketplace submission",
operation_id="updateMarketplaceSubmission",
)
async def edit_submission(
request: MarketplaceAgentSubmissionEditRequest,
version_id: str = Path(description="Store listing version ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_STORE)
),
) -> MarketplaceAgentSubmission:
"""Update a pending marketplace listing submission."""
try:
submission = await store_db.edit_store_submission(
user_id=auth.user_id,
store_listing_version_id=version_id,
name=request.name,
sub_heading=request.sub_heading,
description=request.description,
image_urls=request.image_urls,
video_url=request.video_url,
agent_output_demo_url=request.agent_output_demo_url,
categories=request.categories,
changes_summary=request.changes_summary,
recommended_schedule_cron=request.recommended_schedule_cron,
instructions=request.instructions,
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
) from e
return MarketplaceAgentSubmission.from_internal(submission)
@marketplace_router.delete(
path="/submissions/{version_id}",
summary="Delete marketplace submission",
operation_id="deleteMarketplaceSubmission",
)
async def delete_submission(
version_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_STORE)
),
) -> None:
"""Delete a marketplace listing submission. Approved listings can not be deleted."""
success = await store_db.delete_store_submission(
user_id=auth.user_id,
store_listing_version_id=version_id,
)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Submission #{version_id} not found",
)
# ============================================================================
# Submission Media
# ============================================================================
@marketplace_router.post(
path="/submissions/media",
summary="Upload marketplace submission media",
operation_id="uploadMarketplaceSubmissionMedia",
)
async def upload_submission_media(
file: UploadFile = File(...),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_STORE)
),
) -> MarketplaceMediaUploadResponse:
"""Upload an image or video for a marketplace submission. Max size: 10MB."""
media_upload_limiter.check(auth.user_id)
max_size = 10 * 1024 * 1024 # 10MB limit for external API
content = await file.read()
if len(content) > max_size:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File size ({len(content)} bytes) exceeds the 10MB limit",
)
# Virus scan
await scan_content_safe(content, filename=file.filename or "upload")
# Reset file position for store_media to read
await file.seek(0)
url = await store_media.upload_media(
user_id=auth.user_id,
file=file,
)
return MarketplaceMediaUploadResponse(url=url)
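Several marketplace routes normalize their path parameters with `urllib.parse.unquote(...).lower()`, so percent-encoded and mixed-case usernames or agent names resolve to the same listing:

```python
import urllib.parse

def normalize(segment: str) -> str:
    """Decode percent-escapes and lowercase, as the marketplace routes do."""
    return urllib.parse.unquote(segment).lower()

slug = normalize("My%20Agent")  # "My Agent" after decoding, then lowercased
```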


@@ -0,0 +1,197 @@
"""
V2 External API - MCP Server Endpoint
Exposes the platform's Copilot tools as an MCP (Model Context Protocol) server,
allowing external MCP clients (Claude Desktop, Cursor, etc.) to interact with
agents, runs, library, and other platform features programmatically.
Uses Streamable HTTP transport with stateless sessions, authenticated via the
same API key / OAuth bearer token mechanism as the rest of the external API.
"""
import logging
from typing import Any, Sequence
import pydantic
from mcp.server.auth.middleware.auth_context import get_access_token
from mcp.server.auth.provider import AccessToken, TokenVerifier
from mcp.server.auth.settings import AuthSettings
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.server import Context
from mcp.server.fastmcp.tools.base import Tool as MCPTool
from mcp.server.fastmcp.utilities.func_metadata import ArgModelBase, FuncMetadata
from prisma.enums import APIKeyPermission
from pydantic import AnyHttpUrl
from starlette.applications import Starlette
from backend.copilot.model import ChatSession
from backend.copilot.sdk.tool_adapter import _build_input_schema, _execute_tool_sync
from backend.copilot.tools import TOOL_REGISTRY
from backend.copilot.tools.base import BaseTool
from backend.data.auth.api_key import validate_api_key
from backend.data.auth.oauth import (
InvalidClientError,
InvalidTokenError,
validate_access_token,
)
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Server factory
# ---------------------------------------------------------------------------
def create_mcp_server() -> FastMCP:
"""Create the MCP server with all eligible Copilot tools registered."""
settings = Settings()
base_url = settings.config.platform_base_url or "https://platform.agpt.co"
server = FastMCP(
name="autogpt-platform",
instructions=(
"AutoGPT Platform MCP Server. "
"Use these tools to find, create, run, and manage AI agents."
),
token_verifier=ExternalAPITokenVerifier(),
auth=AuthSettings(
issuer_url=AnyHttpUrl(base_url),
resource_server_url=AnyHttpUrl(f"{base_url}/external-api/v2/mcp"),
),
stateless_http=True,
streamable_http_path="/",
)
registered: list[str] = []
for tool in TOOL_REGISTRY.values():
allowed, required_perms = tool.allow_external_use
if not allowed or required_perms is None:
logger.debug(f"Skipping MCP tool {tool.name} (not allowed externally)")
continue
_register_tool(server, tool, required_perms)
registered.append(tool.name)
logger.info(f"MCP server created with {len(registered)} tools: {registered}")
return server
def create_mcp_app() -> Starlette:
"""Create the Starlette ASGI app for the MCP server."""
server = create_mcp_server()
return server.streamable_http_app()
# ---------------------------------------------------------------------------
# Token verification — reuses existing external API auth infrastructure
# ---------------------------------------------------------------------------
class ExternalAPITokenVerifier(TokenVerifier):
"""Validates API keys and OAuth tokens via external API auth."""
async def verify_token(self, token: str) -> AccessToken | None:
# Try API key first
api_key_info = await validate_api_key(token)
if api_key_info:
return AccessToken(
token=token,
client_id=api_key_info.user_id,
scopes=[s.value for s in api_key_info.scopes],
)
# Try OAuth bearer token
try:
token_info, _ = await validate_access_token(token)
return AccessToken(
token=token,
client_id=token_info.user_id,
scopes=[s.value for s in token_info.scopes],
)
except (InvalidClientError, InvalidTokenError):
return None
# ---------------------------------------------------------------------------
# Tool registration
# ---------------------------------------------------------------------------
def _create_tool_handler(
tool: BaseTool,
required_scopes: Sequence[str],
):
"""Create an async MCP tool handler that wraps a BaseTool subclass.
The handler checks that the caller's API key / OAuth token
has all `required_scopes` before executing the tool.
"""
async def handler(ctx: Context, **kwargs: Any) -> str:
access_token = get_access_token()
if not access_token:
return "Authentication required"
# Enforce per-tool permission scopes
if required_scopes:
missing = [s for s in required_scopes if s not in access_token.scopes]
if missing:
return f"Missing required permission(s): " f"{', '.join(missing)}"
user_id = access_token.client_id
session = ChatSession.new(user_id)
result = await _execute_tool_sync(tool, user_id, session, kwargs)
parts = []
for block in result.get("content", []):
if block.get("type") == "text":
parts.append(block["text"])
return "\n".join(parts) if parts else ""
return handler
def _register_tool(
server: FastMCP, tool: BaseTool, required_perms: Sequence[APIKeyPermission]
) -> None:
"""Register a Copilot tool on the MCP server."""
required_scopes = [p.value for p in required_perms]
handler = _create_tool_handler(tool, required_scopes)
mcp_tool = MCPTool(
fn=handler,
name=tool.name,
title=None,
description=tool.description,
parameters=_build_input_schema(tool),
fn_metadata=_PASSTHROUGH_META,
is_async=True,
context_kwarg="ctx",
annotations=None,
)
server._tool_manager._tools[tool.name] = mcp_tool
# ---------------------------------------------------------------------------
# Passthrough arg model — lets us specify JSON Schema directly instead of
# having FastMCP introspect the handler function's signature.
# ---------------------------------------------------------------------------
class _PassthroughArgs(ArgModelBase):
"""Accepts any fields and passes them through as kwargs."""
model_config = pydantic.ConfigDict(extra="allow")
def model_dump_one_level(self, **_kwargs: Any) -> dict[str, Any]:
return dict(self.__pydantic_extra__ or {})
_PASSTHROUGH_META = FuncMetadata(
arg_model=_PassthroughArgs,
output_schema=None,
output_model=None,
wrap_output=False,
)

File diff suppressed because it is too large


@@ -0,0 +1,43 @@
"""
V2 External API - Rate Limiting
Simple in-memory sliding window rate limiter per user.
"""
import time
from collections import defaultdict
from fastapi import HTTPException
class RateLimiter:
"""Sliding window rate limiter."""
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self._requests: dict[str, list[float]] = defaultdict(list)
def check(self, key: str) -> None:
"""Check if the request is within rate limits. Raises 429 if exceeded."""
now = time.monotonic()
cutoff = now - self.window_seconds
# Remove expired timestamps
timestamps = self._requests[key]
self._requests[key] = [t for t in timestamps if t > cutoff]
if len(self._requests[key]) >= self.max_requests:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Max {self.max_requests} requests per {self.window_seconds}s.",
)
self._requests[key].append(now)
# Pre-configured rate limiters for specific endpoints
media_upload_limiter = RateLimiter(max_requests=10, window_seconds=300) # 10 / 5min
search_limiter = RateLimiter(max_requests=30, window_seconds=60) # 30 / min
execute_limiter = RateLimiter(max_requests=60, window_seconds=60) # 60 / min
file_upload_limiter = RateLimiter(max_requests=20, window_seconds=300) # 20 / 5min
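The same sliding-window algorithm as a self-contained sketch with an injectable clock (no FastAPI; a plain `RuntimeError` stands in for the 429 `HTTPException`):

```python
import time
from collections import defaultdict


class SlidingWindowLimiter:
    """Sliding-window limiter with an injectable clock, for testability."""

    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._requests: dict[str, list[float]] = defaultdict(list)

    def check(self, key: str) -> None:
        now = self.clock()
        cutoff = now - self.window_seconds
        # Drop timestamps that have aged out of the window
        self._requests[key] = [t for t in self._requests[key] if t > cutoff]
        if len(self._requests[key]) >= self.max_requests:
            raise RuntimeError("rate limit exceeded")
        self._requests[key].append(now)


# Simulated clock: 3 requests allowed per 10-second window
t = [0.0]
limiter = SlidingWindowLimiter(3, 10.0, clock=lambda: t[0])
for _ in range(3):
    limiter.check("user-1")   # first three pass
try:
    limiter.check("user-1")   # fourth in the same window is rejected
    blocked = False
except RuntimeError:
    blocked = True
t[0] = 11.0                   # advance past the window; old timestamps expire
limiter.check("user-1")       # allowed again
```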


@@ -0,0 +1,33 @@
"""
V2 External API Routes
This module defines the main v2 router that aggregates all v2 API endpoints.
"""
from fastapi import APIRouter
from .blocks import blocks_router
from .credits import credits_router
from .files import file_workspace_router
from .graphs import graphs_router
from .integrations import integrations_router
from .library import library_router
from .marketplace import marketplace_router
from .runs import runs_router
from .schedules import graph_schedules_router, schedules_router
from .search import search_router
v2_router = APIRouter()
# Include all sub-routers
v2_router.include_router(blocks_router, prefix="/blocks")
v2_router.include_router(credits_router, prefix="/credits")
v2_router.include_router(file_workspace_router, prefix="/files")
v2_router.include_router(graph_schedules_router, prefix="/graphs")
v2_router.include_router(graphs_router, prefix="/graphs")
v2_router.include_router(integrations_router, prefix="/integrations")
v2_router.include_router(library_router, prefix="/library")
v2_router.include_router(marketplace_router, prefix="/marketplace")
v2_router.include_router(runs_router, prefix="/runs")
v2_router.include_router(schedules_router, prefix="/schedules")
v2_router.include_router(search_router, prefix="/search")


@@ -0,0 +1,345 @@
"""
V2 External API - Runs Endpoints
Provides access to agent runs and human-in-the-loop reviews.
"""
import logging
import uuid
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, HTTPException, Path, Query, Security
from prisma.enums import APIKeyPermission, ReviewStatus
from pydantic import JsonValue
from starlette import status
from backend.api.external.middleware import require_permission
from backend.data import execution as execution_db
from backend.data import human_review as review_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.executor import utils as execution_utils
from backend.util.settings import Settings
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import (
AgentGraphRun,
AgentGraphRunDetails,
AgentRunListResponse,
AgentRunReview,
AgentRunReviewsResponse,
AgentRunReviewsSubmitRequest,
AgentRunReviewsSubmitResponse,
AgentRunShareResponse,
)
logger = logging.getLogger(__name__)
settings = Settings()
runs_router = APIRouter(tags=["runs"])
# ============================================================================
# Endpoints - Runs
# ============================================================================
@runs_router.get(
path="",
summary="List agent runs",
operation_id="listAgentRuns",
)
async def list_runs(
graph_id: Optional[str] = Query(default=None, description="Filter by graph ID"),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN)
),
) -> AgentRunListResponse:
"""List agent runs, optionally filtered by graph ID."""
result = await execution_db.get_graph_executions_paginated(
user_id=auth.user_id,
graph_id=graph_id,
page=page,
page_size=page_size,
)
return AgentRunListResponse(
runs=[AgentGraphRun.from_internal(e) for e in result.executions],
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_count=result.pagination.total_items,
total_pages=result.pagination.total_pages,
)
@runs_router.get(
path="/{run_id}",
summary="Get agent run details",
operation_id="getAgentRunDetails",
)
async def get_run(
run_id: str = Path(description="Graph Execution ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN)
),
) -> AgentGraphRunDetails:
"""Get detailed information about a specific run."""
result = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
include_node_executions=True,
)
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Run #{run_id} not found",
)
return AgentGraphRunDetails.from_internal(result)
@runs_router.post(
path="/{run_id}/stop",
summary="Stop agent run",
operation_id="stopAgentRun",
)
async def stop_run(
run_id: str = Path(description="Graph Execution ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_RUN)
),
) -> AgentGraphRun:
"""
Stop a running execution.
Only runs with status QUEUED or RUNNING can be stopped.
"""
# Verify the run exists and belongs to the user
execution = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
)
if not execution:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Run #{run_id} not found",
)
# Stop the execution
await execution_utils.stop_graph_execution(
graph_exec_id=run_id,
user_id=auth.user_id,
)
# Fetch updated execution
updated_exec = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
)
if not updated_exec:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Run #{run_id} not found",
)
return AgentGraphRun.from_internal(updated_exec)
@runs_router.delete(
path="/{run_id}",
summary="Delete agent run",
operation_id="deleteAgentRun",
)
async def delete_run(
run_id: str = Path(description="Graph Execution ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_RUN)
),
) -> None:
"""Delete an agent run."""
await execution_db.delete_graph_execution(
graph_exec_id=run_id,
user_id=auth.user_id,
)
# ============================================================================
# Endpoints - Sharing
# ============================================================================
@runs_router.post(
path="/{run_id}/share",
summary="Enable sharing for an agent run",
operation_id="enableAgentRunShare",
)
async def enable_sharing(
run_id: str = Path(description="Graph Execution ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN, APIKeyPermission.SHARE_RUN)
),
) -> AgentRunShareResponse:
"""Enable public sharing for a run."""
execution = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
)
if not execution:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Run #{run_id} not found",
)
share_token = str(uuid.uuid4())
await execution_db.update_graph_execution_share_status(
execution_id=run_id,
user_id=auth.user_id,
is_shared=True,
share_token=share_token,
shared_at=datetime.now(timezone.utc),
)
frontend_url = settings.config.frontend_base_url or "http://localhost:3000"
share_url = f"{frontend_url}/share/{share_token}"
return AgentRunShareResponse(share_url=share_url, share_token=share_token)
@runs_router.delete(
path="/{run_id}/share",
summary="Disable sharing for an agent run",
operation_id="disableAgentRunShare",
status_code=status.HTTP_204_NO_CONTENT,
)
async def disable_sharing(
run_id: str = Path(description="Graph Execution ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.SHARE_RUN)
),
) -> None:
"""Disable public sharing for a run."""
execution = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
)
if not execution:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Run #{run_id} not found",
)
await execution_db.update_graph_execution_share_status(
execution_id=run_id,
user_id=auth.user_id,
is_shared=False,
share_token=None,
shared_at=None,
)
# ============================================================================
# Endpoints - Reviews (Human-in-the-loop)
# ============================================================================
@runs_router.get(
path="/reviews",
summary="List agent run human-in-the-loop reviews",
operation_id="listAgentRunReviews",
)
async def list_reviews(
run_id: Optional[str] = Query(
default=None, description="Filter by graph execution ID"
),
status: Optional[ReviewStatus] = Query(
default=None, description="Filter by review status"
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN_REVIEW)
),
) -> AgentRunReviewsResponse:
"""
List human-in-the-loop reviews for agent runs.
Returns reviews with status WAITING if no status filter is given.
"""
reviews, pagination = await review_db.get_reviews(
user_id=auth.user_id,
graph_exec_id=run_id,
status=status,
page=page,
page_size=page_size,
)
return AgentRunReviewsResponse(
reviews=[AgentRunReview.from_internal(r) for r in reviews],
page=pagination.current_page,
page_size=pagination.page_size,
total_count=pagination.total_items,
total_pages=pagination.total_pages,
)
@runs_router.post(
path="/{run_id}/reviews",
summary="Submit agent run human-in-the-loop reviews",
operation_id="submitAgentRunReviews",
)
async def submit_reviews(
request: AgentRunReviewsSubmitRequest,
run_id: str = Path(description="Graph Execution ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_RUN_REVIEW)
),
) -> AgentRunReviewsSubmitResponse:
"""
Submit responses to all pending human-in-the-loop reviews for a run.
All pending reviews for the run must be included in the request.
Approving a review continues execution; rejecting terminates that branch.
"""
# Build review decisions dict for process_all_reviews_for_execution
review_decisions: dict[str, tuple[ReviewStatus, JsonValue | None, str | None]] = {}
for decision in request.reviews:
status = ReviewStatus.APPROVED if decision.approved else ReviewStatus.REJECTED
review_decisions[decision.node_exec_id] = (
status,
decision.edited_payload,
decision.message,
)
results = await review_db.process_all_reviews_for_execution(
user_id=auth.user_id,
review_decisions=review_decisions,
)
approved_count = sum(
1 for r in results.values() if r.status == ReviewStatus.APPROVED
)
rejected_count = sum(
1 for r in results.values() if r.status == ReviewStatus.REJECTED
)
return AgentRunReviewsSubmitResponse(
run_id=run_id,
approved_count=approved_count,
rejected_count=rejected_count,
)


@@ -0,0 +1,155 @@
"""
V2 External API - Schedules Endpoints
Provides endpoints for managing execution schedules.
"""
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, Security
from prisma.enums import APIKeyPermission
from starlette import status
from backend.api.external.middleware import require_permission
from backend.data import graph as graph_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.user import get_user_by_id
from backend.util.clients import get_scheduler_client
from backend.util.timezone_utils import get_user_timezone_or_utc
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import (
AgentRunSchedule,
AgentRunScheduleCreateRequest,
AgentRunScheduleListResponse,
)
logger = logging.getLogger(__name__)
schedules_router = APIRouter(tags=["graphs", "schedules"])
# ============================================================================
# Endpoints
# ============================================================================
@schedules_router.get(
path="",
summary="List run schedules",
operation_id="listGraphRunSchedules",
)
async def list_all_schedules(
graph_id: Optional[str] = Query(default=None, description="Filter by graph ID"),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_SCHEDULE)
),
) -> AgentRunScheduleListResponse:
"""List schedules for the authenticated user."""
schedules = await get_scheduler_client().get_execution_schedules(
user_id=auth.user_id,
graph_id=graph_id,
)
converted = [AgentRunSchedule.from_internal(s) for s in schedules]
# Manual pagination (scheduler doesn't support pagination natively)
total_count = len(converted)
total_pages = (total_count + page_size - 1) // page_size if total_count > 0 else 1
start = (page - 1) * page_size
end = start + page_size
paginated = converted[start:end]
return AgentRunScheduleListResponse(
schedules=paginated,
page=page,
page_size=page_size,
total_count=total_count,
total_pages=total_pages,
)
@schedules_router.delete(
path="/{schedule_id}",
summary="Delete run schedule",
operation_id="deleteGraphRunSchedule",
)
async def delete_schedule(
schedule_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_SCHEDULE)
),
) -> None:
"""Delete an execution schedule."""
try:
await get_scheduler_client().delete_schedule(
schedule_id=schedule_id,
user_id=auth.user_id,
)
except Exception as e:
if "not found" in str(e).lower():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Schedule #{schedule_id} not found",
)
raise
# ============================================================================
# Graph-specific Schedule Endpoints (nested under /graphs)
# These are included in the graphs router via include_router
# ============================================================================
graph_schedules_router = APIRouter(tags=["graphs"])
@graph_schedules_router.post(
path="/{graph_id}/schedules",
summary="Create run schedule",
operation_id="createGraphRunSchedule",
)
async def create_graph_schedule(
request: AgentRunScheduleCreateRequest,
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_SCHEDULE)
),
) -> AgentRunSchedule:
"""Create a new execution schedule for a graph."""
graph = await graph_db.get_graph(
graph_id=graph_id,
version=request.graph_version,
user_id=auth.user_id,
)
if not graph:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph #{graph_id} v{request.graph_version} not found.",
)
# Determine timezone
if request.timezone:
user_timezone = request.timezone
else:
user = await get_user_by_id(auth.user_id)
user_timezone = get_user_timezone_or_utc(user.timezone if user else None)
result = await get_scheduler_client().add_execution_schedule(
user_id=auth.user_id,
graph_id=graph_id,
graph_version=graph.version,
name=request.name,
cron=request.cron,
input_data=request.input_data,
input_credentials=request.credentials_inputs,
user_timezone=user_timezone,
)
return AgentRunSchedule.from_internal(result)
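The manual pagination in `list_all_schedules` reduces to a slice plus ceiling division. A minimal sketch of that arithmetic:

```python
def paginate(items: list, page: int, page_size: int) -> tuple[list, int]:
    """Return the requested 1-indexed page and the total page count (at least 1)."""
    total_count = len(items)
    # Ceiling division: (n + d - 1) // d, with a floor of 1 page for empty lists
    total_pages = (total_count + page_size - 1) // page_size if total_count > 0 else 1
    start = (page - 1) * page_size
    return items[start:start + page_size], total_pages


# Page 3 of 25 items at 10 per page -> items 20..24, 3 pages total
page_items, pages = paginate(list(range(25)), page=3, page_size=10)
```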


@@ -0,0 +1,76 @@
"""
V2 External API - Search Endpoints
Cross-domain hybrid search across agents, blocks, and documentation.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Query, Security
from prisma.enums import ContentType as SearchContentType
from backend.api.external.middleware import require_auth
from backend.api.features.store.hybrid_search import unified_hybrid_search
from backend.data.auth.base import APIAuthorizationInfo
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import MarketplaceSearchResponse, MarketplaceSearchResult
from .rate_limit import search_limiter
logger = logging.getLogger(__name__)
search_router = APIRouter(tags=["search"])
@search_router.get(
path="",
summary="Search content and capabilities of the platform",
operation_id="search",
)
async def search(
query: str = Query(description="Search query"),
content_types: Optional[list[SearchContentType]] = Query(
default=None, description="Content types to filter by"
),
category: Optional[str] = Query(default=None, description="Filter by category"),
page: int = Query(ge=1, default=1),
page_size: int = Query(ge=1, le=MAX_PAGE_SIZE, default=DEFAULT_PAGE_SIZE),
auth: APIAuthorizationInfo = Security(require_auth),
) -> MarketplaceSearchResponse:
"""
Search the platform's content and capabilities (hybrid search: literal + semantic).
Searches across agents, blocks, and documentation. Results are ranked
by a combination of keyword matching and semantic similarity.
"""
search_limiter.check(auth.user_id)
results, total_count = await unified_hybrid_search(
query=query,
content_types=content_types,
category=category,
page=page,
page_size=page_size,
user_id=auth.user_id,
)
total_pages = max(1, (total_count + page_size - 1) // page_size)
return MarketplaceSearchResponse(
results=[
MarketplaceSearchResult(
content_type=r.get("content_type", ""),
content_id=r.get("content_id", ""),
searchable_text=r.get("searchable_text", ""),
metadata=r.get("metadata"),
updated_at=r.get("updated_at"),
combined_score=r.get("combined_score"),
)
for r in results
],
page=page,
page_size=page_size,
total_count=total_count,
total_pages=total_pages,
)


@@ -4,9 +4,11 @@ import logging
from typing import Literal, Optional
import fastapi
import prisma.enums
import prisma.errors
import prisma.models
import prisma.types
from prisma.enums import SubmissionStatus
import backend.api.features.store.image_gen as store_image_gen
import backend.api.features.store.media as store_media
@@ -46,6 +48,8 @@ integration_creds_manager = IntegrationCredentialsManager()
async def list_library_agents(
user_id: str,
search_term: Optional[str] = None,
published: Optional[bool] = None,
favorite: Optional[bool] = None,
sort_by: library_model.LibraryAgentSort = library_model.LibraryAgentSort.UPDATED_AT,
page: int = 1,
page_size: int = 50,
@@ -59,6 +63,8 @@ async def list_library_agents(
Args:
user_id: The ID of the user whose LibraryAgents we want to retrieve.
search_term: Optional string to filter agents by name/description.
published: Allows filtering by marketplace publish status;
`True` -> only published agents, `False` -> only unpublished agents.
sort_by: Sorting field (createdAt, updatedAt, isFavorite, isCreatedByUser).
page: Current page (1-indexed).
page_size: Number of items per page.
@@ -117,6 +123,28 @@ async def list_library_agents(
},
]
# Filter by marketplace publish status
if published is not None:
active_listing_filter: prisma.types.StoreListingVersionWhereInput = {
"isAvailable": True,
"isDeleted": False,
"submissionStatus": prisma.enums.SubmissionStatus.APPROVED,
"StoreListing": {"is": {"isDeleted": False}},
}
where_clause["AgentGraph"] = {
"is": {
"StoreListingVersions": (
{"some": active_listing_filter}
if published
else {"none": active_listing_filter}
)
}
}
# Filter by favorite status
if favorite is not None:
where_clause["isFavorite"] = favorite
order_by: prisma.types.LibraryAgentOrderByInput | None = None
if sort_by == library_model.LibraryAgentSort.CREATED_AT:
@@ -259,32 +287,12 @@ async def get_library_agent(id: str, user_id: str) -> library_model.LibraryAgent
"userId": user_id,
"isDeleted": False,
},
include=library_agent_include(user_id),
include=library_agent_include(user_id, include_store_listing=True),
)
if not library_agent:
raise NotFoundError(f"Library agent #{id} not found")
# Fetch marketplace listing if the agent has been published
store_listing = None
profile = None
if library_agent.AgentGraph:
store_listing = await prisma.models.StoreListing.prisma().find_first(
where={
"agentGraphId": library_agent.AgentGraph.id,
"isDeleted": False,
"hasApprovedVersion": True,
},
include={
"ActiveVersion": True,
},
)
if store_listing and store_listing.ActiveVersion and store_listing.owningUserId:
# Fetch Profile separately since User doesn't have a direct Profile relation
profile = await prisma.models.Profile.prisma().find_first(
where={"userId": store_listing.owningUserId}
)
return library_model.LibraryAgent.from_db(
library_agent,
sub_graphs=(
@@ -292,8 +300,6 @@ async def get_library_agent(id: str, user_id: str) -> library_model.LibraryAgent
if library_agent.AgentGraph
else None
),
store_listing=store_listing,
profile=profile,
)
@@ -447,9 +453,8 @@ async def create_library_agent(
}
},
settings=SafeJson(
GraphSettings.from_graph(
graph_entry,
hitl_safe_mode=hitl_safe_mode,
GraphSettings(
human_in_the_loop_safe_mode=hitl_safe_mode,
sensitive_action_safe_mode=sensitive_action_safe_mode,
).model_dump()
),
@@ -586,8 +591,8 @@ async def update_graph_in_library(
if not library_agent:
raise NotFoundError(f"Library agent not found for graph {created_graph.id}")
library_agent = await update_library_agent_version_and_settings(
user_id, created_graph
library_agent = await update_agent_version_in_library(
user_id, created_graph.id, created_graph.version
)
if created_graph.is_active:
@@ -603,27 +608,6 @@ async def update_graph_in_library(
return created_graph, library_agent
async def update_library_agent_version_and_settings(
user_id: str, agent_graph: graph_db.GraphModel
) -> library_model.LibraryAgent:
"""Update library agent to point to new graph version and sync settings."""
library = await update_agent_version_in_library(
user_id, agent_graph.id, agent_graph.version
)
updated_settings = GraphSettings.from_graph(
graph=agent_graph,
hitl_safe_mode=library.settings.human_in_the_loop_safe_mode,
sensitive_action_safe_mode=library.settings.sensitive_action_safe_mode,
)
if updated_settings != library.settings:
library = await update_library_agent(
library_agent_id=library.id,
user_id=user_id,
settings=updated_settings,
)
return library
async def update_library_agent(
library_agent_id: str,
user_id: str,
@@ -823,7 +807,7 @@ async def add_store_agent_to_library(
Args:
store_listing_version_id: The ID of the store listing version containing the agent.
user_id: The users library to which the agent is being added.
user_id: The user's library to which the agent is being added.
Returns:
The newly created LibraryAgent if successfully added, the existing corresponding one if any.
@@ -837,34 +821,30 @@ async def add_store_agent_to_library(
f"to library for user #{user_id}"
)
store_listing_version = (
await prisma.models.StoreListingVersion.prisma().find_unique(
where={"id": store_listing_version_id}, include={"AgentGraph": True}
)
listing_version = await prisma.models.StoreListingVersion.prisma().find_unique(
where={"id": store_listing_version_id}
)
if not store_listing_version or not store_listing_version.AgentGraph:
logger.warning(f"Store listing version not found: {store_listing_version_id}")
if (
not listing_version
or listing_version.submissionStatus != SubmissionStatus.APPROVED
or listing_version.isDeleted
):
logger.warning(
"Store listing version not found or not available: "
f"{store_listing_version_id}"
)
raise NotFoundError(
f"Store listing version {store_listing_version_id} not found or invalid"
f"Store listing version {store_listing_version_id} not found "
"or not available"
)
graph = store_listing_version.AgentGraph
# Convert to GraphModel to check for HITL blocks
graph_model = await graph_db.get_graph(
graph_id=graph.id,
version=graph.version,
user_id=user_id,
include_subgraphs=False,
)
if not graph_model:
raise NotFoundError(
f"Graph #{graph.id} v{graph.version} not found or accessible"
)
graph_id = listing_version.agentGraphId
graph_version = listing_version.agentGraphVersion
# Check if user already has this agent (non-deleted)
if existing := await get_library_agent_by_graph_id(
user_id, graph.id, graph.version
user_id, graph_id, graph_version
):
return existing
@@ -873,8 +853,8 @@ async def add_store_agent_to_library(
where={
"userId_agentGraphId_agentGraphVersion": {
"userId": user_id,
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"agentGraphId": graph_id,
"agentGraphVersion": graph_version,
}
},
)
@@ -887,20 +867,20 @@ async def add_store_agent_to_library(
"User": {"connect": {"id": user_id}},
"AgentGraph": {
"connect": {
"graphVersionId": {"id": graph.id, "version": graph.version}
"graphVersionId": {"id": graph_id, "version": graph_version}
}
},
"isCreatedByUser": False,
"useGraphIsActiveVersion": False,
"settings": SafeJson(GraphSettings.from_graph(graph_model).model_dump()),
"settings": SafeJson(GraphSettings().model_dump()),
},
include=library_agent_include(
user_id, include_nodes=False, include_executions=False
),
)
logger.debug(
f"Added graph #{graph.id} v{graph.version}"
f"for store listing version #{store_listing_version.id} "
f"Added graph #{graph_id} v{graph_version} "
f"for store listing version #{listing_version.id} "
f"to library for user #{user_id}"
)
return library_model.LibraryAgent.from_db(added_agent)
@@ -911,37 +891,6 @@ async def add_store_agent_to_library(
##############################################
async def _fetch_user_folders(
user_id: str,
extra_where: Optional[prisma.types.LibraryFolderWhereInput] = None,
include_relations: bool = True,
) -> list[prisma.models.LibraryFolder]:
"""
Shared helper to fetch folders for a user with consistent query params.
Args:
user_id: The ID of the user.
extra_where: Additional where-clause filters to merge in.
include_relations: Whether to include LibraryAgents and Children relations
(used to derive counts via len(); Prisma Python has no _count include).
Returns:
A list of raw Prisma LibraryFolder records.
"""
where_clause: prisma.types.LibraryFolderWhereInput = {
"userId": user_id,
"isDeleted": False,
}
if extra_where:
where_clause.update(extra_where)
return await prisma.models.LibraryFolder.prisma().find_many(
where=where_clause,
order={"createdAt": "asc"},
include=LIBRARY_FOLDER_INCLUDE if include_relations else None,
)
async def list_folders(
user_id: str,
parent_id: Optional[str] = None,
@@ -1019,6 +968,37 @@ async def get_folder_tree(
return root_folders
async def _fetch_user_folders(
user_id: str,
extra_where: Optional[prisma.types.LibraryFolderWhereInput] = None,
include_relations: bool = True,
) -> list[prisma.models.LibraryFolder]:
"""
Shared helper to fetch folders for a user with consistent query params.
Args:
user_id: The ID of the user.
extra_where: Additional where-clause filters to merge in.
include_relations: Whether to include LibraryAgents and Children relations
(used to derive counts via len(); Prisma Python has no _count include).
Returns:
A list of raw Prisma LibraryFolder records.
"""
where_clause: prisma.types.LibraryFolderWhereInput = {
"userId": user_id,
"isDeleted": False,
}
if extra_where:
where_clause.update(extra_where)
return await prisma.models.LibraryFolder.prisma().find_many(
where=where_clause,
order={"createdAt": "asc"},
include=LIBRARY_FOLDER_INCLUDE if include_relations else None,
)
async def get_folder(
folder_id: str,
user_id: str,
@@ -1055,43 +1035,6 @@ async def get_folder(
)
async def _is_descendant_of(
folder_id: str,
potential_ancestor_id: str,
user_id: str,
) -> bool:
"""
Check if folder_id is a descendant of (or equal to) potential_ancestor_id.
Fetches all user folders in a single query and walks the parent chain
in memory to avoid N database round-trips.
Args:
folder_id: The ID of the folder to check.
potential_ancestor_id: The ID of the potential ancestor.
user_id: The ID of the user.
Returns:
True if folder_id is a descendant of (or equal to) potential_ancestor_id.
"""
all_folders = await prisma.models.LibraryFolder.prisma().find_many(
where={"userId": user_id, "isDeleted": False},
)
parent_map = {f.id: f.parentId for f in all_folders}
visited: set[str] = set()
current_id: str | None = folder_id
while current_id:
if current_id == potential_ancestor_id:
return True
if current_id in visited:
break # cycle detected
visited.add(current_id)
current_id = parent_map.get(current_id)
return False
async def create_folder(
user_id: str,
name: str,
@@ -1303,6 +1246,43 @@ async def move_folder(
)
async def _is_descendant_of(
folder_id: str,
potential_ancestor_id: str,
user_id: str,
) -> bool:
"""
Check if folder_id is a descendant of (or equal to) potential_ancestor_id.
Fetches all user folders in a single query and walks the parent chain
in memory to avoid N database round-trips.
Args:
folder_id: The ID of the folder to check.
potential_ancestor_id: The ID of the potential ancestor.
user_id: The ID of the user.
Returns:
True if folder_id is a descendant of (or equal to) potential_ancestor_id.
"""
all_folders = await prisma.models.LibraryFolder.prisma().find_many(
where={"userId": user_id, "isDeleted": False},
)
parent_map = {f.id: f.parentId for f in all_folders}
visited: set[str] = set()
current_id: str | None = folder_id
while current_id:
if current_id == potential_ancestor_id:
return True
if current_id in visited:
break # cycle detected
visited.add(current_id)
current_id = parent_map.get(current_id)
return False
async def delete_folder(
folder_id: str,
user_id: str,


@@ -220,8 +220,6 @@ class LibraryAgent(pydantic.BaseModel):
def from_db(
agent: prisma.models.LibraryAgent,
sub_graphs: Optional[list[prisma.models.AgentGraph]] = None,
store_listing: Optional[prisma.models.StoreListing] = None,
profile: Optional[prisma.models.Profile] = None,
) -> "LibraryAgent":
"""
Factory method that constructs a LibraryAgent from a Prisma LibraryAgent
@@ -306,19 +304,33 @@ class LibraryAgent(pydantic.BaseModel):
can_access_graph = agent.AgentGraph.userId == agent.userId
is_latest_version = True
marketplace_listing_data = None
if store_listing and store_listing.ActiveVersion and profile:
creator_data = MarketplaceListingCreator(
name=profile.name,
id=profile.id,
slug=profile.username,
)
marketplace_listing_data = MarketplaceListing(
# NOTE: this access pattern is designed for use with
# `library_agent_include(..., include_store_listing=True)`
active_listing = (
agent.AgentGraph.StoreListingVersions[0]
if agent.AgentGraph.StoreListingVersions
else None
)
store_listing = active_listing.StoreListing if active_listing else None
active_listing = store_listing.ActiveVersion if store_listing else None
creator_profile = store_listing.CreatorProfile if store_listing else None
marketplace_listing_info = (
MarketplaceListing(
id=store_listing.id,
name=store_listing.ActiveVersion.name,
name=active_listing.name,
slug=store_listing.slug,
creator=creator_data,
creator=MarketplaceListingCreator(
name=creator_profile.name,
id=creator_profile.id,
slug=creator_profile.username,
),
)
if store_listing
and active_listing
and creator_profile
and not store_listing.isDeleted
else None
)
return LibraryAgent(
id=agent.id,
@@ -355,7 +367,7 @@ class LibraryAgent(pydantic.BaseModel):
folder_name=agent.Folder.name if agent.Folder else None,
recommended_schedule_cron=agent.AgentGraph.recommendedScheduleCron,
settings=_parse_settings(agent.settings),
marketplace_listing=marketplace_listing_data,
marketplace_listing=marketplace_listing_info,
)
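The rewritten `from_db` unwraps one optional relation per line and then guards the final construction on every piece being present. A minimal sketch of that shape, using hypothetical dataclasses (`Graph`, `ListingVersion`, `Listing` are illustrative stand-ins, not the real Prisma models):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Listing:
    slug: str
    is_deleted: bool


@dataclass
class ListingVersion:
    name: str
    listing: Optional[Listing]


@dataclass
class Graph:
    listing_versions: list[ListingVersion]


def active_listing_slug(graph: Graph) -> Optional[str]:
    # One optional hop per line, as in the diff...
    version = graph.listing_versions[0] if graph.listing_versions else None
    listing = version.listing if version else None
    # ...then a single conditional expression guarding the result.
    return (
        listing.slug
        if version and listing and not listing.is_deleted
        else None
    )


g = Graph([ListingVersion("v1", Listing("my-agent", is_deleted=False))])
print(active_listing_slug(g))          # my-agent
print(active_listing_slug(Graph([])))  # None
```

Flattening the chain this way keeps each null-check explicit instead of nesting `if` blocks, and makes the "all parts present and not deleted" condition readable in one place.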

View File

@@ -282,7 +282,7 @@ class TestOAuthLogin:
)
mock_register.return_value = {
"client_id": "registered-client-id",
"client_secret": "registered-secret",
"client_secret": "registered-secret", # pragma: allowlist secret
}
mock_cm.store.store_state_token = AsyncMock(
return_value=("state-token-123", "code-challenge-abc")
@@ -383,7 +383,7 @@ class TestOAuthCallback:
"authorize_url": "https://auth.sentry.io/authorize",
"token_url": "https://auth.sentry.io/token",
"client_id": "test-client-id",
"client_secret": "test-secret",
"client_secret": "test-secret", # pragma: allowlist secret
"server_url": "https://mcp.sentry.dev/mcp",
}
mock_state.scopes = ["openid"]

View File

@@ -518,22 +518,22 @@ async def get_store_submissions(
async def delete_store_submission(
user_id: str,
submission_id: str,
store_listing_version_id: str,
) -> bool:
"""
Delete a store submission version as the submitting user.
Args:
user_id: ID of the authenticated user
submission_id: StoreListingVersion ID to delete
store_listing_version_id: StoreListingVersion ID to delete
Returns:
bool: True if successfully deleted
"""
try:
# Find the submission version with ownership check
version = await prisma.models.StoreListingVersion.prisma().find_first(
where={"id": submission_id}, include={"StoreListing": True}
version = await prisma.models.StoreListingVersion.prisma().find_unique(
where={"id": store_listing_version_id}, include={"StoreListing": True}
)
if (
@@ -546,7 +546,7 @@ async def delete_store_submission(
# Prevent deletion of approved submissions
if version.submissionStatus == prisma.enums.SubmissionStatus.APPROVED:
raise store_exceptions.InvalidOperationError(
"Cannot delete approved submissions"
"Cannot delete approved store listings"
)
# Delete the version
@@ -916,7 +916,7 @@ async def get_user_profile(
async def update_profile(
user_id: str, profile: store_model.Profile
user_id: str, profile: store_model.ProfileUpdateRequest
) -> store_model.ProfileDetails:
"""
Update the store profile for a user or create a new one if it doesn't exist.
@@ -930,11 +930,6 @@ async def update_profile(
"""
logger.info(f"Updating profile for user {user_id} with data: {profile}")
try:
# Sanitize username to allow only letters, numbers, and hyphens
username = "".join(
c if c.isalpha() or c == "-" or c.isnumeric() else ""
for c in profile.username
).lower()
# Check if profile exists for the given user_id
existing_profile = await prisma.models.Profile.prisma().find_first(
where={"userId": user_id}
@@ -957,17 +952,26 @@ async def update_profile(
logger.debug(f"Updating existing profile for user {user_id}")
# Prepare update data, only including non-None values
update_data = {}
update_data: prisma.types.ProfileUpdateInput = {}
if profile.name is not None:
update_data["name"] = profile.name
update_data["name"] = profile.name.strip()
if profile.username is not None:
update_data["username"] = username
# Sanitize username to allow only letters, numbers, and hyphens
update_data["username"] = "".join(
c if c.isalpha() or c == "-" or c.isnumeric() else ""
for c in profile.username
).lower()
if profile.description is not None:
update_data["description"] = profile.description
update_data["description"] = profile.description.strip()
if profile.links is not None:
update_data["links"] = profile.links
update_data["links"] = [
# Filter out empty links
link
for _link in profile.links
if (link := _link.strip())
]
if profile.avatar_url is not None:
update_data["avatarUrl"] = profile.avatar_url
update_data["avatarUrl"] = profile.avatar_url.strip() or None
# Update the existing profile
updated_profile = await prisma.models.Profile.prisma().update(
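The `links` branch above strips each entry and drops the ones that become empty in a single comprehension. A minimal standalone sketch of that walrus-operator pattern (function name assumed for illustration):

```python
def clean_links(links: list[str]) -> list[str]:
    # The walrus operator binds the stripped value once, so it is both
    # the filter condition and the emitted element.
    return [link for raw in links if (link := raw.strip())]


print(clean_links(["https://a.example ", "", "   ", "https://b.example"]))
# ['https://a.example', 'https://b.example']
```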
@@ -996,12 +1000,13 @@ async def get_my_agents(
try:
search_filter: prisma.types.LibraryAgentWhereInput = {
"userId": user_id,
# Filter for unpublished agents only:
# Filter for unsubmitted agents only:
"AgentGraph": {
"is": {
"StoreListingVersions": {
"none": {
"isAvailable": True,
"isDeleted": False,
"StoreListing": {"is": {"isDeleted": False}},
}
}

View File

@@ -7,7 +7,7 @@ import pytest
from prisma import Prisma
from . import db
from .model import Profile
from .model import ProfileUpdateRequest
@pytest.fixture(autouse=True)
@@ -297,7 +297,7 @@ async def test_update_profile(mocker):
mock_profile_db.return_value.update = mocker.AsyncMock(return_value=mock_profile)
# Test data
profile = Profile(
profile = ProfileUpdateRequest(
name="Test Creator",
username="creator",
description="Test description",

View File

@@ -117,19 +117,24 @@ class StoreAgentDetails(pydantic.BaseModel):
)
class Profile(pydantic.BaseModel):
class ProfileUpdateRequest(pydantic.BaseModel):
"""Marketplace user profile (only attributes that the user can update)"""
username: str | None = None
name: str | None = None
description: str | None = None
avatar_url: str | None = None
links: list[str] | None = None
class ProfileDetails(pydantic.BaseModel):
"""Marketplace user profile (including read-only fields)"""
username: str
name: str
description: str
avatar_url: str | None
links: list[str]
class ProfileDetails(Profile):
"""Marketplace user profile (including read-only fields)"""
is_featured: bool
@classmethod

View File

@@ -54,7 +54,7 @@ async def get_profile(
dependencies=[Security(autogpt_libs.auth.requires_user)],
)
async def update_or_create_profile(
profile: store_model.Profile,
profile: store_model.ProfileUpdateRequest,
user_id: str = Security(autogpt_libs.auth.get_user_id),
) -> store_model.ProfileDetails:
"""Update the store profile for the authenticated user."""
@@ -354,7 +354,7 @@ async def delete_submission(
"""Delete a marketplace listing submission"""
result = await store_db.delete_store_submission(
user_id=user_id,
submission_id=submission_id,
store_listing_version_id=submission_id,
)
return result

View File

@@ -736,13 +736,13 @@ class DeleteGraphResponse(TypedDict):
async def list_graphs(
user_id: Annotated[str, Security(get_user_id)],
) -> Sequence[graph_db.GraphMeta]:
paginated_result = await graph_db.list_graphs_paginated(
graphs, _ = await graph_db.list_graphs_paginated(
user_id=user_id,
page=1,
page_size=250,
filter_by="active",
)
return paginated_result.graphs
return graphs
@v1_router.get(
@@ -859,8 +859,8 @@ async def update_graph(
new_graph_version = await graph_db.create_graph(graph, user_id=user_id)
if new_graph_version.is_active:
await library_db.update_library_agent_version_and_settings(
user_id, new_graph_version
await library_db.update_agent_version_in_library(
user_id, new_graph_version.id, new_graph_version.version
)
new_graph_version = await on_graph_activate(new_graph_version, user_id=user_id)
await graph_db.set_graph_active_version(
@@ -913,8 +913,8 @@ async def set_graph_active_version(
)
# Keep the library agent up to date with the new active version
await library_db.update_library_agent_version_and_settings(
user_id, new_active_graph
await library_db.update_agent_version_in_library(
user_id, new_active_graph.id, new_active_graph.version
)
if current_active_graph and current_active_graph.version != new_active_version:

View File

@@ -5,16 +5,12 @@ from enum import Enum
from typing import Any, Optional
import fastapi
import fastapi.responses
import pydantic
import starlette.middleware.cors
import uvicorn
from autogpt_libs.auth import add_auth_responses_to_openapi
from autogpt_libs.auth import verify_settings as verify_auth_settings
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.routing import APIRoute
from prisma.errors import PrismaError
import backend.api.features.admin.credit_admin_routes
import backend.api.features.admin.execution_analytics_routes
@@ -41,22 +37,12 @@ import backend.data.user
import backend.integrations.webhooks.utils
import backend.util.service
import backend.util.settings
from backend.api.features.library.exceptions import (
FolderAlreadyExistsError,
FolderValidationError,
)
from backend.api.utils.exceptions import add_exception_handlers
from backend.blocks.llm import DEFAULT_LLM_MODEL
from backend.data.model import Credentials
from backend.integrations.providers import ProviderName
from backend.monitoring.instrumentation import instrument_fastapi
from backend.util import json
from backend.util.cloud_storage import shutdown_cloud_storage_handler
from backend.util.exceptions import (
MissingConfigError,
NotAuthorizedError,
NotFoundError,
PreconditionFailed,
)
from backend.util.feature_flag import initialize_launchdarkly, shutdown_launchdarkly
from backend.util.service import UnhealthyServiceError
from backend.util.workspace_storage import shutdown_workspace_storage
@@ -207,77 +193,7 @@ instrument_fastapi(
)
def handle_internal_http_error(status_code: int = 500, log_error: bool = True):
def handler(request: fastapi.Request, exc: Exception):
if log_error:
logger.exception(
"%s %s failed. Investigate and resolve the underlying issue: %s",
request.method,
request.url.path,
exc,
exc_info=exc,
)
hint = (
"Adjust the request and retry."
if status_code < 500
else "Check server logs and dependent services."
)
return fastapi.responses.JSONResponse(
content={
"message": f"Failed to process {request.method} {request.url.path}",
"detail": str(exc),
"hint": hint,
},
status_code=status_code,
)
return handler
async def validation_error_handler(
request: fastapi.Request, exc: Exception
) -> fastapi.responses.Response:
logger.error(
"Validation failed for %s %s: %s. Fix the request payload and try again.",
request.method,
request.url.path,
exc,
)
errors: list | str
if hasattr(exc, "errors"):
errors = exc.errors() # type: ignore[call-arg]
else:
errors = str(exc)
response_content = {
"message": f"Invalid data for {request.method} {request.url.path}",
"detail": errors,
"hint": "Ensure the request matches the API schema.",
}
content_json = json.dumps(response_content)
return fastapi.responses.Response(
content=content_json,
status_code=422,
media_type="application/json",
)
app.add_exception_handler(PrismaError, handle_internal_http_error(500))
app.add_exception_handler(
FolderAlreadyExistsError, handle_internal_http_error(409, False)
)
app.add_exception_handler(FolderValidationError, handle_internal_http_error(400, False))
app.add_exception_handler(NotFoundError, handle_internal_http_error(404, False))
app.add_exception_handler(NotAuthorizedError, handle_internal_http_error(403, False))
app.add_exception_handler(RequestValidationError, validation_error_handler)
app.add_exception_handler(pydantic.ValidationError, validation_error_handler)
app.add_exception_handler(MissingConfigError, handle_internal_http_error(503))
app.add_exception_handler(ValueError, handle_internal_http_error(400))
app.add_exception_handler(PreconditionFailed, handle_internal_http_error(428))
app.add_exception_handler(Exception, handle_internal_http_error(500))
add_exception_handlers(app)
app.include_router(backend.api.features.v1.v1_router, tags=["v1"], prefix="/api")
app.include_router(

View File

@@ -0,0 +1,119 @@
"""
Shared exception handlers for FastAPI applications.
Provides a single `add_exception_handlers` function that registers a consistent
set of exception-to-HTTP-status mappings on any FastAPI app instance. This
ensures that all mounted sub-apps (v1, v2, main) handle errors uniformly.
"""
import json
import logging
import fastapi
import fastapi.responses
import pydantic
from fastapi.exceptions import RequestValidationError
from prisma.errors import PrismaError
from prisma.errors import RecordNotFoundError as PrismaRecordNotFoundError
from starlette import status
from backend.api.features.library.exceptions import (
FolderAlreadyExistsError,
FolderValidationError,
)
from backend.util.exceptions import (
MissingConfigError,
NotAuthorizedError,
NotFoundError,
PreconditionFailed,
)
logger = logging.getLogger(__name__)
def add_exception_handlers(app: fastapi.FastAPI) -> None:
"""
Register standard exception handlers on the given FastAPI app.
Mounted sub-apps do NOT inherit exception handlers from the parent app,
so each app instance must register its own handlers.
"""
for exception, handler in {
# It's the client's problem: HTTP 4XX
NotFoundError: _handle_error(status.HTTP_404_NOT_FOUND, log_error=False),
NotAuthorizedError: _handle_error(status.HTTP_403_FORBIDDEN, log_error=False),
PreconditionFailed: _handle_error(status.HTTP_428_PRECONDITION_REQUIRED),
RequestValidationError: _handle_validation_error,
pydantic.ValidationError: _handle_validation_error,
PrismaRecordNotFoundError: _handle_error(status.HTTP_404_NOT_FOUND),
FolderAlreadyExistsError: _handle_error(
status.HTTP_409_CONFLICT, log_error=False
),
FolderValidationError: _handle_error(
status.HTTP_400_BAD_REQUEST, log_error=False
),
ValueError: _handle_error(status.HTTP_400_BAD_REQUEST),
# It's the backend's problem: HTTP 5XX
MissingConfigError: _handle_error(status.HTTP_503_SERVICE_UNAVAILABLE),
PrismaError: _handle_error(status.HTTP_500_INTERNAL_SERVER_ERROR),
Exception: _handle_error(status.HTTP_500_INTERNAL_SERVER_ERROR),
}.items():
app.add_exception_handler(exception, handler)
def _handle_error(status_code: int = 500, log_error: bool = True):
def handler(request: fastapi.Request, exc: Exception):
if log_error:
logger.exception(
"%s %s failed. Investigate and resolve the underlying issue: %s",
request.method,
request.url.path,
exc,
exc_info=exc,
)
hint = (
"Adjust the request and retry."
if status_code < 500
else "Check server logs and dependent services."
)
return fastapi.responses.JSONResponse(
content={
"message": f"Failed to process {request.method} {request.url.path}",
"detail": str(exc),
"hint": hint,
},
status_code=status_code,
)
return handler
async def _handle_validation_error(
request: fastapi.Request, exc: Exception
) -> fastapi.responses.Response:
logger.error(
"Validation failed for %s %s: %s. Fix the request payload and try again.",
request.method,
request.url.path,
exc,
)
errors: list | str
if hasattr(exc, "errors"):
errors = exc.errors() # type: ignore[call-arg]
else:
errors = str(exc)
response_content = {
"message": f"Invalid data for {request.method} {request.url.path}",
"detail": errors,
"hint": "Ensure the request matches the API schema.",
}
content_json = json.dumps(response_content)
return fastapi.responses.Response(
content=content_json,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
media_type="application/json",
)
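The dict-driven registration above maps exception classes to handlers, ordered from client errors to the `Exception` catch-all. A minimal stdlib sketch of the underlying idea, resolving a status code from an exception's class hierarchy (the exception names here are illustrative placeholders; FastAPI performs its own class-based handler lookup, this only sketches the mapping):

```python
class NotFoundError(Exception): ...
class RecordNotFound(NotFoundError): ...


STATUS_BY_EXC: dict[type[Exception], int] = {
    RecordNotFound: 404,
    NotFoundError: 404,
    ValueError: 400,
    Exception: 500,  # catch-all, like the handler table above
}


def status_for(exc: Exception) -> int:
    # Walk the exception's MRO so subclasses inherit their parent's
    # mapping when they have no entry of their own.
    for cls in type(exc).__mro__:
        if cls in STATUS_BY_EXC:
            return STATUS_BY_EXC[cls]
    return 500


print(status_for(RecordNotFound()))  # 404
print(status_for(KeyError("x")))     # 500
```

Centralizing the table in one function, as the diff does, is what keeps mounted sub-apps consistent: each app calls the same registration instead of copy-pasting its own handler list.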

View File

@@ -43,7 +43,12 @@ def test_server_host_standard_url():
def test_server_host_strips_credentials():
"""hostname must not expose user:pass."""
assert server_host("https://user:secret@mcp.example.com/mcp") == "mcp.example.com"
assert (
server_host(
"https://user:secret@mcp.example.com/mcp" # pragma: allowlist secret
)
== "mcp.example.com"
)
def test_server_host_with_port():

View File

@@ -160,7 +160,6 @@ async def add_test_data(db):
data={
"slug": f"test-agent-{graph.id[:8]}",
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"hasApprovedVersion": True,
"owningUserId": graph.userId,
}

View File

@@ -6,9 +6,9 @@ This script imports the FastAPI app from backend.api.rest_api and outputs
the OpenAPI specification as JSON to stdout or a specified file.
Usage:
`poetry run python generate_openapi_json.py`
`poetry run python generate_openapi_json.py --output openapi.json`
`poetry run python generate_openapi_json.py --indent 4 --output openapi.json`
`poetry run export-api-schema`
`poetry run export-api-schema --output openapi.json`
`poetry run export-api-schema --api v2 --output openapi.json`
"""
import json
@@ -17,8 +17,16 @@ from pathlib import Path
import click
API_CHOICES = ["internal", "v1", "v2"]
@click.command()
@click.option(
"--api",
type=click.Choice(API_CHOICES),
default="internal",
help="Which API schema to export (default: internal)",
)
@click.option(
"--output",
type=click.Path(dir_okay=False, path_type=Path),
@@ -26,13 +34,12 @@ import click
)
@click.option(
"--pretty",
type=click.BOOL,
default=False,
is_flag=True,
help="Pretty-print JSON output (indented 2 spaces)",
)
def main(output: Path, pretty: bool):
def main(api: str, output: Path, pretty: bool):
"""Generate and output the OpenAPI JSON specification."""
openapi_schema = get_openapi_schema()
openapi_schema = get_openapi_schema(api)
json_output = json.dumps(
openapi_schema, indent=2 if pretty else None, ensure_ascii=False
@@ -46,11 +53,22 @@ def main(output: Path, pretty: bool):
print(json_output)
def get_openapi_schema():
"""Get the OpenAPI schema from the FastAPI app"""
from backend.api.rest_api import app
def get_openapi_schema(api: str = "internal"):
"""Get the OpenAPI schema from the specified FastAPI app."""
if api == "internal":
from backend.api.rest_api import app
return app.openapi()
return app.openapi()
elif api == "v1":
from backend.api.external.v1.app import v1_app
return v1_app.openapi()
elif api == "v2":
from backend.api.external.v2.app import v2_app
return v2_app.openapi()
else:
raise click.BadParameter(f"Unknown API: {api}. Choose from {API_CHOICES}")
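The `if/elif` chain above could equally be expressed as a dict of lazy loaders keyed by API name, which keeps `API_CHOICES` and the dispatch from drifting apart. A hedged sketch (the loader bodies are stand-ins; the real ones would import the FastAPI apps and call `.openapi()`):

```python
from typing import Callable


def _load_internal() -> dict:
    return {"title": "internal"}  # stand-in for app.openapi()


def _load_v1() -> dict:
    return {"title": "v1"}  # stand-in for v1_app.openapi()


SCHEMA_LOADERS: dict[str, Callable[[], dict]] = {
    "internal": _load_internal,
    "v1": _load_v1,
}


def get_schema(api: str) -> dict:
    try:
        return SCHEMA_LOADERS[api]()
    except KeyError:
        raise ValueError(
            f"Unknown API: {api}. Choose from {sorted(SCHEMA_LOADERS)}"
        )


print(get_schema("v1")["title"])  # v1
```

With this shape, the click choices could be derived as `list(SCHEMA_LOADERS)`, so adding a new API touches one place.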
if __name__ == "__main__":

View File

@@ -36,7 +36,7 @@ class TestSetupLangfuseOtel:
"""OTEL env vars should be set when Langfuse credentials exist."""
mock_settings = MagicMock()
mock_settings.secrets.langfuse_public_key = "pk-test-123"
mock_settings.secrets.langfuse_secret_key = "sk-test-456"
mock_settings.secrets.langfuse_secret_key = "sk-test-456" # pragma: allowlist secret # noqa: E501; fmt: skip
mock_settings.secrets.langfuse_host = "https://langfuse.example.com"
mock_settings.secrets.langfuse_tracing_environment = "test"
@@ -91,7 +91,7 @@ class TestSetupLangfuseOtel:
"""Explicit env-var overrides should not be clobbered."""
mock_settings = MagicMock()
mock_settings.secrets.langfuse_public_key = "pk-test"
mock_settings.secrets.langfuse_secret_key = "sk-test"
mock_settings.secrets.langfuse_secret_key = "sk-test" # pragma: allowlist secret # noqa: E501; fmt: skip
mock_settings.secrets.langfuse_host = "https://langfuse.example.com"
with (

View File

@@ -53,49 +53,52 @@ logger = logging.getLogger(__name__)
# Single source of truth for all tools
TOOL_REGISTRY: dict[str, BaseTool] = {
"add_understanding": AddUnderstandingTool(),
"create_agent": CreateAgentTool(),
"customize_agent": CustomizeAgentTool(),
"edit_agent": EditAgentTool(),
"find_agent": FindAgentTool(),
"find_block": FindBlockTool(),
"find_library_agent": FindLibraryAgentTool(),
# Folder management tools
"create_folder": CreateFolderTool(),
"list_folders": ListFoldersTool(),
"update_folder": UpdateFolderTool(),
"move_folder": MoveFolderTool(),
"delete_folder": DeleteFolderTool(),
"move_agents_to_folder": MoveAgentsToFolderTool(),
"run_agent": RunAgentTool(),
"run_block": RunBlockTool(),
"continue_run_block": ContinueRunBlockTool(),
"run_mcp_tool": RunMCPToolTool(),
"get_mcp_guide": GetMCPGuideTool(),
"view_agent_output": AgentOutputTool(),
"search_docs": SearchDocsTool(),
"get_doc_page": GetDocPageTool(),
"get_agent_building_guide": GetAgentBuildingGuideTool(),
# Web fetch for safe URL retrieval
"web_fetch": WebFetchTool(),
# Agent-browser multi-step automation (navigate, act, screenshot)
"browser_navigate": BrowserNavigateTool(),
"browser_act": BrowserActTool(),
"browser_screenshot": BrowserScreenshotTool(),
# Sandboxed code execution (bubblewrap)
"bash_exec": BashExecTool(),
# Persistent workspace tools (cloud storage, survives across sessions)
# Feature request tools
"search_feature_requests": SearchFeatureRequestsTool(),
"create_feature_request": CreateFeatureRequestTool(),
# Agent generation tools (local validation/fixing)
"validate_agent_graph": ValidateAgentGraphTool(),
"fix_agent_graph": FixAgentGraphTool(),
# Workspace tools for CoPilot file operations
"list_workspace_files": ListWorkspaceFilesTool(),
"read_workspace_file": ReadWorkspaceFileTool(),
"write_workspace_file": WriteWorkspaceFileTool(),
"delete_workspace_file": DeleteWorkspaceFileTool(),
tool.name: tool
for tool in [
AddUnderstandingTool(),
CreateAgentTool(),
CustomizeAgentTool(),
EditAgentTool(),
FindAgentTool(),
FindBlockTool(),
FindLibraryAgentTool(),
# Folder management tools
CreateFolderTool(),
ListFoldersTool(),
UpdateFolderTool(),
MoveFolderTool(),
DeleteFolderTool(),
MoveAgentsToFolderTool(),
RunAgentTool(),
RunBlockTool(),
ContinueRunBlockTool(),
RunMCPToolTool(),
GetMCPGuideTool(),
AgentOutputTool(),
SearchDocsTool(),
GetDocPageTool(),
GetAgentBuildingGuideTool(),
# Web fetch for safe URL retrieval
WebFetchTool(),
# Agent-browser multi-step automation (navigate, act, screenshot)
BrowserNavigateTool(),
BrowserActTool(),
BrowserScreenshotTool(),
# Sandboxed code execution (bubblewrap)
BashExecTool(),
# Persistent workspace tools (cloud storage, survives across sessions)
# Feature request tools
SearchFeatureRequestsTool(),
CreateFeatureRequestTool(),
# Agent generation tools (local validation/fixing)
ValidateAgentGraphTool(),
FixAgentGraphTool(),
# Workspace tools for CoPilot file operations
ListWorkspaceFilesTool(),
ReadWorkspaceFileTool(),
WriteWorkspaceFileTool(),
DeleteWorkspaceFileTool(),
]
}
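The refactor above replaces hand-written string keys with keys derived from each tool's own `name` property, so a key can never drift from the instance it maps to. A minimal sketch with placeholder tool classes (`EchoTool`, `PingTool` are illustrative):

```python
class BaseTool:
    @property
    def name(self) -> str:
        raise NotImplementedError


class EchoTool(BaseTool):
    @property
    def name(self) -> str:
        return "echo"


class PingTool(BaseTool):
    @property
    def name(self) -> str:
        return "ping"


# Keys come from the instances themselves, as in the diff's TOOL_REGISTRY.
TOOL_REGISTRY: dict[str, BaseTool] = {
    tool.name: tool
    for tool in [EchoTool(), PingTool()]
}

print(sorted(TOOL_REGISTRY))  # ['echo', 'ping']
```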
# Export individual tool instances for backwards compatibility

View File

@@ -4,7 +4,7 @@ import logging
import re
import uuid
from collections.abc import Sequence
from typing import Any, NotRequired, TypedDict
from typing import TYPE_CHECKING, Any, NotRequired, TypedDict
from backend.data.db_accessors import graph_db, library_db, store_db
from backend.data.graph import Graph, Link, Node
@@ -12,6 +12,10 @@ from backend.util.exceptions import DatabaseError, NotFoundError
from .helpers import UUID_RE_STR
if TYPE_CHECKING:
from backend.api.features.library.model import LibraryAgent
logger = logging.getLogger(__name__)
@@ -69,7 +73,7 @@ class DecompositionResult(TypedDict, total=False):
error_type: str
AgentSummary = LibraryAgentSummary | MarketplaceAgentSummary | dict[str, Any]
AgentSummary = LibraryAgentSummary | MarketplaceAgentSummary
_UUID_PATTERN = re.compile(UUID_RE_STR, re.IGNORECASE)
@@ -388,7 +392,7 @@ async def get_all_relevant_agents_for_generation(
def extract_search_terms_from_steps(
decomposition_result: DecompositionResult | dict[str, Any],
decomposition_result: DecompositionResult,
) -> list[str]:
"""Extract search terms from decomposed instruction steps.
@@ -431,12 +435,12 @@ def extract_search_terms_from_steps(
async def enrich_library_agents_from_steps(
user_id: str,
decomposition_result: DecompositionResult | dict[str, Any],
existing_agents: Sequence[AgentSummary] | Sequence[dict[str, Any]],
decomposition_result: DecompositionResult,
existing_agents: Sequence[AgentSummary],
exclude_graph_id: str | None = None,
include_marketplace: bool = True,
max_additional_results: int = 10,
) -> list[AgentSummary] | list[dict[str, Any]]:
) -> list[AgentSummary]:
"""Enrich library agents list with additional searches based on decomposed steps.
This implements two-phase search: after decomposition, we search for additional
@@ -469,7 +473,7 @@ async def enrich_library_agents_from_steps(
if graph_id and isinstance(graph_id, str):
existing_ids.add(graph_id)
all_agents: list[AgentSummary] | list[dict[str, Any]] = list(existing_agents)
all_agents: list[AgentSummary] = list(existing_agents)
for term in search_terms[:3]:
try:
@@ -599,7 +603,7 @@ async def save_agent_to_library(
user_id: str,
is_update: bool = False,
folder_id: str | None = None,
) -> tuple[Graph, Any]:
) -> tuple[Graph, "LibraryAgent"]:
"""Save agent to database and user's library.
Args:

View File

@@ -5,6 +5,7 @@ import re
from datetime import datetime, timedelta, timezone
from typing import Any
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field, field_validator
from backend.api.features.library.model import LibraryAgent
@@ -106,6 +107,10 @@ class AgentOutputTool(BaseTool):
def name(self) -> str:
return "view_agent_output"
@property
def allow_external_use(self):
return True, [APIKeyPermission.READ_RUN]
@property
def description(self) -> str:
return """Retrieve execution outputs from agents in the user's library.

View File

@@ -2,9 +2,10 @@
import json
import logging
from typing import Any
from typing import Any, Literal, Sequence
from openai.types.chat import ChatCompletionToolParam
from prisma.enums import APIKeyPermission
from backend.copilot.model import ChatSession
from backend.copilot.response_model import StreamToolOutputAvailable
@@ -119,11 +120,6 @@ class BaseTool:
"""Tool parameters schema for OpenAI."""
raise NotImplementedError
@property
def requires_auth(self) -> bool:
"""Whether this tool requires authentication."""
return False
@property
def is_available(self) -> bool:
"""Whether this tool is available in the current environment.
@@ -134,6 +130,21 @@ class BaseTool:
"""
return True
@property
def requires_auth(self) -> bool:
"""Whether this tool requires an authenticated end user."""
return False
@property
def allow_external_use(
self,
) -> tuple[Literal[False], None] | tuple[Literal[True], Sequence[APIKeyPermission]]:
"""
Whether this tool may be used through our external MCP server.
Returns `True` and a list of required permissions if so.
"""
return False, None
def as_openai_tool(self) -> ChatCompletionToolParam:
"""Convert to OpenAI tool format."""
return ChatCompletionToolParam(

View File

@@ -4,6 +4,8 @@ import logging
import uuid
from typing import Any
from prisma.enums import APIKeyPermission
from backend.copilot.model import ChatSession
from .agent_generator.pipeline import fetch_library_agents, fix_validate_and_save
@@ -20,6 +22,14 @@ class CreateAgentTool(BaseTool):
def name(self) -> str:
return "create_agent"
@property
def allow_external_use(self):
return True, [
APIKeyPermission.WRITE_GRAPH,
APIKeyPermission.WRITE_LIBRARY,
APIKeyPermission.READ_LIBRARY, # for finding relevant library (sub-)agents
]
@property
def description(self) -> str:
return (

View File

@@ -4,6 +4,8 @@ import logging
import uuid
from typing import Any
from prisma.enums import APIKeyPermission
from backend.copilot.model import ChatSession
from .agent_generator.pipeline import fetch_library_agents, fix_validate_and_save
@@ -20,6 +22,14 @@ class CustomizeAgentTool(BaseTool):
def name(self) -> str:
return "customize_agent"
@property
def allow_external_use(self):
return True, [
APIKeyPermission.WRITE_GRAPH,
APIKeyPermission.WRITE_LIBRARY,
# READ_STORE permission not needed since we only use public marketplace data
]
@property
def description(self) -> str:
return (

View File

@@ -26,7 +26,7 @@ from .e2b_sandbox import (
)
_SESSION_ID = "sess-123"
_API_KEY = "test-api-key"
_API_KEY = "test-api-key" # pragma: allowlist secret
_SANDBOX_ID = "sb-abc"
_TIMEOUT = 300

View File

@@ -3,6 +3,8 @@
import logging
from typing import Any
from prisma.enums import APIKeyPermission
from backend.copilot.model import ChatSession
from .agent_generator import get_agent_as_json
@@ -20,6 +22,10 @@ class EditAgentTool(BaseTool):
def name(self) -> str:
return "edit_agent"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_GRAPH, APIKeyPermission.WRITE_LIBRARY]
@property
def description(self) -> str:
return (

View File

@@ -132,6 +132,10 @@ class SearchFeatureRequestsTool(BaseTool):
def name(self) -> str:
return "search_feature_requests"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -16,6 +16,11 @@ class FindAgentTool(BaseTool):
def name(self) -> str:
return "find_agent"
@property
def allow_external_use(self):
# READ_STORE permission not needed since we only use public marketplace data
return True, []
@property
def description(self) -> str:
return (

View File

@@ -1,7 +1,7 @@
import logging
from typing import Any
from prisma.enums import ContentType
from prisma.enums import APIKeyPermission, ContentType
from backend.blocks import get_block
from backend.blocks._base import BlockType
@@ -49,6 +49,10 @@ class FindBlockTool(BaseTool):
def name(self) -> str:
return "find_block"
@property
def allow_external_use(self):
return True, [APIKeyPermission.READ_BLOCK]
@property
def description(self) -> str:
return (

View File

@@ -2,6 +2,8 @@
from typing import Any
from prisma.enums import APIKeyPermission
from backend.copilot.model import ChatSession
from .agent_search import search_agents
@@ -16,6 +18,10 @@ class FindLibraryAgentTool(BaseTool):
def name(self) -> str:
return "find_library_agent"
@property
def allow_external_use(self):
return True, [APIKeyPermission.READ_LIBRARY]
@property
def description(self) -> str:
return (

View File

@@ -19,6 +19,10 @@ class FixAgentGraphTool(BaseTool):
def name(self) -> str:
return "fix_agent_graph"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -40,6 +40,10 @@ class GetAgentBuildingGuideTool(BaseTool):
def name(self) -> str:
return "get_agent_building_guide"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -22,6 +22,10 @@ class GetDocPageTool(BaseTool):
def name(self) -> str:
return "get_doc_page"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -36,6 +36,10 @@ class GetMCPGuideTool(BaseTool):
def name(self) -> str:
return "get_mcp_guide"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -2,6 +2,8 @@
from typing import Any
from prisma.enums import APIKeyPermission
from backend.api.features.library import model as library_model
from backend.api.features.library.db import collect_tree_ids
from backend.copilot.model import ChatSession
@@ -86,6 +88,10 @@ class CreateFolderTool(BaseTool):
def name(self) -> str:
return "create_folder"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_LIBRARY]
@property
def description(self) -> str:
return (
@@ -172,6 +178,10 @@ class ListFoldersTool(BaseTool):
def name(self) -> str:
return "list_folders"
@property
def allow_external_use(self):
return True, [APIKeyPermission.READ_LIBRARY]
@property
def description(self) -> str:
return (
@@ -275,6 +285,10 @@ class UpdateFolderTool(BaseTool):
def name(self) -> str:
return "update_folder"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_LIBRARY]
@property
def description(self) -> str:
return "Update a folder's name, icon, or color."
@@ -355,6 +369,10 @@ class MoveFolderTool(BaseTool):
def name(self) -> str:
return "move_folder"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_LIBRARY]
@property
def description(self) -> str:
return (
@@ -431,6 +449,10 @@ class DeleteFolderTool(BaseTool):
def name(self) -> str:
return "delete_folder"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_LIBRARY]
@property
def description(self) -> str:
return (
@@ -497,6 +519,10 @@ class MoveAgentsToFolderTool(BaseTool):
def name(self) -> str:
return "move_agents_to_folder"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_LIBRARY]
@property
def description(self) -> str:
return (

View File

@@ -3,6 +3,7 @@
import logging
from typing import Any
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field, field_validator
from backend.copilot.config import ChatConfig
@@ -102,6 +103,10 @@ class RunAgentTool(BaseTool):
def name(self) -> str:
return "run_agent"
@property
def allow_external_use(self):
return True, [APIKeyPermission.RUN_AGENT]
@property
def description(self) -> str:
return """Run or schedule an agent from the marketplace or user's library.

View File

@@ -36,6 +36,10 @@ class SearchDocsTool(BaseTool):
def name(self) -> str:
return "search_docs"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -54,7 +54,12 @@ def test_server_host_plain_url():
def test_server_host_strips_credentials():
"""netloc would expose user:pass — hostname must not."""
assert server_host("https://user:secret@mcp.example.com/mcp") == "mcp.example.com"
assert (
server_host(
"https://user:secret@mcp.example.com/mcp" # pragma: allowlist secret
)
== "mcp.example.com"
)
def test_server_host_with_port():
@@ -122,7 +127,7 @@ async def test_credential_bearing_url_returns_error():
response = await tool._execute(
user_id=_USER_ID,
session=session,
server_url="https://user:secret@mcp.example.com/mcp",
server_url="https://user:secret@mcp.example.com/mcp", # pragma: allowlist secret # noqa: E501; fmt: skip
)
assert isinstance(response, ErrorResponse)
assert (

View File
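The tests above pin down `server_host`'s behavior: it must return only the hostname, never embedded `user:pass` credentials or a port. A sketch consistent with those assertions (the real implementation may differ):

```python
from urllib.parse import urlsplit


def server_host(url: str) -> str:
    # .hostname, unlike .netloc, excludes userinfo ("user:secret@") and the port
    return urlsplit(url).hostname or ""
```

Using `.netloc` here would leak the credentials, which is exactly what `test_server_host_strips_credentials` guards against.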

@@ -19,6 +19,10 @@ class ValidateAgentGraphTool(BaseTool):
def name(self) -> str:
return "validate_agent_graph"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -57,6 +57,10 @@ class WebFetchTool(BaseTool):
def name(self) -> str:
return "web_fetch"
@property
def allow_external_use(self):
return True, []
@property
def description(self) -> str:
return (

View File

@@ -5,6 +5,7 @@ import logging
import os
from typing import Any, Optional
from prisma.enums import APIKeyPermission
from pydantic import BaseModel
from backend.copilot.context import (
@@ -325,6 +326,10 @@ class ListWorkspaceFilesTool(BaseTool):
def name(self) -> str:
return "list_workspace_files"
@property
def allow_external_use(self):
return True, [APIKeyPermission.READ_FILES]
@property
def description(self) -> str:
return (
@@ -439,6 +444,10 @@ class ReadWorkspaceFileTool(BaseTool):
def name(self) -> str:
return "read_workspace_file"
@property
def allow_external_use(self):
return True, [APIKeyPermission.READ_FILES]
@property
def description(self) -> str:
return (
@@ -656,6 +665,10 @@ class WriteWorkspaceFileTool(BaseTool):
def name(self) -> str:
return "write_workspace_file"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_FILES]
@property
def description(self) -> str:
return (
@@ -846,6 +859,10 @@ class DeleteWorkspaceFileTool(BaseTool):
def name(self) -> str:
return "delete_workspace_file"
@property
def allow_external_use(self):
return True, [APIKeyPermission.WRITE_FILES]
@property
def description(self) -> str:
return (

View File

@@ -100,7 +100,10 @@ async def test_block_credit_usage(server: SpinTestServer):
graph_exec_id="test_graph_exec",
node_exec_id="test_node_exec",
block_id=AITextGeneratorBlock().id,
inputs={"model": "gpt-4-turbo", "api_key": "owned_api_key"},
inputs={
"model": "gpt-4-turbo",
"api_key": "owned_api_key", # pragma: allowlist secret
},
execution_context=ExecutionContext(user_timezone="UTC"),
),
)

View File

@@ -63,21 +63,6 @@ class GraphSettings(BaseModel):
bool, BeforeValidator(lambda v: v if v is not None else False)
] = False
@classmethod
def from_graph(
cls,
graph: "GraphModel",
hitl_safe_mode: bool | None = None,
sensitive_action_safe_mode: bool = False,
) -> "GraphSettings":
# Default to True if not explicitly set
if hitl_safe_mode is None:
hitl_safe_mode = True
return cls(
human_in_the_loop_safe_mode=hitl_safe_mode,
sensitive_action_safe_mode=sensitive_action_safe_mode,
)
class Link(BaseDbModel):
source_id: str
@@ -973,13 +958,6 @@ class GraphModelWithoutNodes(GraphModel):
sub_graphs: list[BaseGraph] = Field(default_factory=list, exclude=True)
class GraphsPaginated(BaseModel):
"""Response schema for paginated graphs."""
graphs: list[GraphMeta]
pagination: Pagination
# --------------------- CRUD functions --------------------- #
@@ -1013,7 +991,7 @@ async def list_graphs_paginated(
page: int = 1,
page_size: int = 25,
filter_by: Literal["active"] | None = "active",
) -> GraphsPaginated:
) -> tuple[list[GraphMeta], Pagination]:
"""
Retrieves paginated graph metadata objects.
@@ -1024,7 +1002,8 @@ async def list_graphs_paginated(
filter_by: Optional filter; "active" (the default) selects only active graphs.
Returns:
GraphsPaginated: Paginated list of graph metadata.
list[GraphMeta]: List of graph info objects.
Pagination: Pagination information.
"""
where_clause: AgentGraphWhereInput = {"userId": user_id}
@@ -1047,14 +1026,11 @@ async def list_graphs_paginated(
graph_models = [GraphMeta.from_db(graph) for graph in graphs]
return GraphsPaginated(
graphs=graph_models,
pagination=Pagination(
total_items=total_count,
total_pages=total_pages,
current_page=page,
page_size=page_size,
),
return graph_models, Pagination(
total_items=total_count,
total_pages=total_pages,
current_page=page,
page_size=page_size,
)

View File

@@ -6,11 +6,11 @@ Handles all database operations for pending human reviews.
import asyncio
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from typing import Awaitable, Optional
from prisma.enums import ReviewStatus
from prisma.models import AgentNodeExecution, PendingHumanReview
from prisma.types import PendingHumanReviewUpdateInput
from prisma.types import PendingHumanReviewUpdateInput, PendingHumanReviewWhereInput
from pydantic import BaseModel
from backend.api.features.executions.review.model import (
@@ -23,9 +23,7 @@ from backend.copilot.constants import (
)
from backend.data.execution import get_graph_execution_meta
from backend.util.json import SafeJson
if TYPE_CHECKING:
pass
from backend.util.models import Pagination
logger = logging.getLogger(__name__)
@@ -239,7 +237,7 @@ async def get_or_create_human_review(
async def get_pending_review_by_node_exec_id(
node_exec_id: str, user_id: str
) -> Optional["PendingHumanReviewModel"]:
) -> Optional[PendingHumanReviewModel]:
"""
Get a pending review by its node execution ID.
@@ -342,6 +340,60 @@ async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
return count > 0
async def get_reviews(
user_id: str,
graph_exec_id: Optional[str] = None,
status: Optional[ReviewStatus] = None,
page: int = 1,
page_size: int = 25,
) -> tuple[list[PendingHumanReviewModel], Pagination]:
"""
Get reviews for a user with pagination, optionally filtered by execution and status.
Args:
user_id: User ID to get reviews for
graph_exec_id: Optional graph execution ID to scope to
status: Optional review status filter
page: Page number (1-indexed)
page_size: Number of reviews per page
Returns:
List of reviews and pagination info
"""
from backend.data.execution import get_node_execution
where: PendingHumanReviewWhereInput = {"userId": user_id}
if graph_exec_id:
where["graphExecId"] = graph_exec_id
if status:
where["status"] = status
offset = (page - 1) * page_size
total_count = await PendingHumanReview.prisma().count(where=where)
_reviews = await PendingHumanReview.prisma().find_many(
where=where,
order={"createdAt": "desc"},
skip=offset,
take=page_size,
)
reviews = []
for _review in _reviews:
node_id = await _resolve_node_id(_review.nodeExecId, get_node_execution)
reviews.append(PendingHumanReviewModel.from_db(_review, node_id=node_id))
total_pages = max(1, (total_count + page_size - 1) // page_size)
return reviews, Pagination(
total_items=total_count,
total_pages=total_pages,
current_page=page,
page_size=page_size,
)
async def _resolve_node_id(node_exec_id: str, get_node_execution) -> str:
"""Resolve node_id from a node_exec_id.
@@ -357,72 +409,22 @@ async def _resolve_node_id(node_exec_id: str, get_node_execution) -> str:
async def get_pending_reviews_for_user(
user_id: str, page: int = 1, page_size: int = 25
) -> list["PendingHumanReviewModel"]:
"""
Get all pending reviews for a user with pagination.
Args:
user_id: User ID to get reviews for
page: Page number (1-indexed)
page_size: Number of reviews per page
Returns:
List of pending review models with node_id included
"""
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
# Calculate offset for pagination
offset = (page - 1) * page_size
reviews = await PendingHumanReview.prisma().find_many(
where={"userId": user_id, "status": ReviewStatus.WAITING},
order={"createdAt": "desc"},
skip=offset,
take=page_size,
) -> list[PendingHumanReviewModel]:
"""Get all pending reviews for a user with pagination."""
reviews, _ = await get_reviews(
user_id=user_id, status=ReviewStatus.WAITING, page=page, page_size=page_size
)
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
return reviews
async def get_pending_reviews_for_execution(
graph_exec_id: str, user_id: str
) -> list[PendingHumanReviewModel]:
"""
Get all pending reviews for a specific graph execution.
Args:
graph_exec_id: Graph execution ID
user_id: User ID for security validation
Returns:
List of pending review models with node_id included
"""
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
reviews = await PendingHumanReview.prisma().find_many(
where={
"userId": user_id,
"graphExecId": graph_exec_id,
"status": ReviewStatus.WAITING,
},
order={"createdAt": "asc"},
"""Get all pending reviews for a specific graph execution."""
reviews, _ = await get_reviews(
user_id=user_id, graph_exec_id=graph_exec_id, status=ReviewStatus.WAITING
)
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
return reviews
async def process_all_reviews_for_execution(
@@ -455,8 +457,8 @@ async def process_all_reviews_for_execution(
)
# Separate into pending and already-processed reviews
reviews_to_process = []
already_processed = []
reviews_to_process: list[PendingHumanReview] = []
already_processed: list[PendingHumanReview] = []
for review in all_reviews:
if review.status == ReviewStatus.WAITING:
reviews_to_process.append(review)
@@ -490,7 +492,7 @@ async def process_all_reviews_for_execution(
)
# Create parallel update tasks for reviews that still need processing
update_tasks = []
update_tasks: list[Awaitable[PendingHumanReview | None]] = []
for review in reviews_to_process:
new_status, reviewed_data, message = review_decisions[review.nodeExecId]
@@ -529,8 +531,11 @@ async def process_all_reviews_for_execution(
# Combine updated reviews with already-processed ones (for idempotent response)
all_result_reviews = list(updated_reviews) + already_processed
result = {}
result: dict[str, PendingHumanReviewModel] = {}
for review in all_result_reviews:
if review is None:
continue
if is_copilot_synthetic_id(review.nodeExecId):
# CoPilot synthetic node_exec_ids encode node_id as "{node_id}:{random}"
node_id = parse_node_id_from_exec_id(review.nodeExecId)

View File
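`get_reviews` computes `total_pages` with integer ceiling division, clamped to at least one page. The arithmetic in isolation, with field names mirroring the `Pagination` model used above:

```python
def page_info(total_items: int, page: int, page_size: int) -> dict:
    # ceiling division: 11 items at 10 per page -> 2 pages; 0 items still yields 1 page
    total_pages = max(1, (total_items + page_size - 1) // page_size)
    return {
        "total_items": total_items,
        "total_pages": total_pages,
        "current_page": page,
        "page_size": page_size,
    }
```

The same `skip = (page - 1) * page_size` offset convention is what the updated tests assert (`skip == 10` for page 2 with page size 10).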

@@ -128,8 +128,9 @@ async def test_get_pending_reviews_for_user(
sample_db_review,
):
"""Test getting pending reviews for a user with pagination"""
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_prisma.return_value.count = AsyncMock(return_value=11)
mock_prisma.return_value.find_many = AsyncMock(return_value=[sample_db_review])
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
@@ -146,7 +147,7 @@ async def test_get_pending_reviews_for_user(
assert result[0].node_id == "test_node_def_789"
# Verify pagination parameters
call_args = mock_find_many.return_value.find_many.call_args
call_args = mock_prisma.return_value.find_many.call_args
assert call_args.kwargs["skip"] == 10 # (page-1) * page_size = (2-1) * 10
assert call_args.kwargs["take"] == 10
@@ -157,8 +158,9 @@ async def test_get_pending_reviews_for_execution(
sample_db_review,
):
"""Test getting pending reviews for specific execution"""
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_prisma.return_value.count = AsyncMock(return_value=1)
mock_prisma.return_value.find_many = AsyncMock(return_value=[sample_db_review])
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
@@ -177,7 +179,7 @@ async def test_get_pending_reviews_for_execution(
assert result[0].node_id == "test_node_def_789"
# Verify it filters by execution and user
call_args = mock_find_many.return_value.find_many.call_args
call_args = mock_prisma.return_value.find_many.call_args
where_clause = call_args.kwargs["where"]
assert where_clause["userId"] == "test-user-123"
assert where_clause["graphExecId"] == "test_graph_exec_456"

View File

@@ -89,6 +89,7 @@ def library_agent_include(
user_id: str,
include_nodes: bool = True,
include_executions: bool = True,
include_store_listing: bool = False,
execution_limit: int = MAX_LIBRARY_AGENT_EXECUTIONS_FETCH,
) -> prisma.types.LibraryAgentInclude:
"""
@@ -98,6 +99,7 @@ def library_agent_include(
user_id: User ID for filtering user-specific data
include_nodes: Whether to include graph nodes (default: True, needed for get_sub_graphs)
include_executions: Whether to include executions (default: True, safe with execution_limit)
include_store_listing: Whether to include marketplace listing info (default: False, adds extra joins)
execution_limit: Limit on executions to fetch (default: MAX_LIBRARY_AGENT_EXECUTIONS_FETCH)
Defaults maintain backward compatibility and safety: everything needed for all functionality is included.
@@ -116,7 +118,7 @@ def library_agent_include(
# Build AgentGraph include based on requested options
if include_nodes or include_executions:
agent_graph_include = {}
agent_graph_include: prisma.types.AgentGraphIncludeFromAgentGraph = {}
# Add nodes if requested (always full nodes)
if include_nodes:
@@ -130,12 +132,26 @@ def library_agent_include(
"take": execution_limit,
}
result["AgentGraph"] = cast(
prisma.types.AgentGraphArgsFromLibraryAgent,
{"include": agent_graph_include},
)
result["AgentGraph"] = {"include": agent_graph_include}
else:
# Default: Basic metadata only (fast - recommended for most use cases)
result["AgentGraph"] = True # Basic graph metadata (name, description, id)
if include_store_listing:
if result["AgentGraph"] is True:
result["AgentGraph"] = {"include": {}}
# We can't get StoreListing directly from AgentGraph, so we take the
# latest published listing version and get it from there:
result["AgentGraph"]["include"]["StoreListingVersions"] = {
"order_by": {"version": "desc"},
"take": 1,
"where": {
"submissionStatus": prisma.enums.SubmissionStatus.APPROVED,
"isDeleted": False,
"isAvailable": True,
},
"include": {"StoreListing": {"include": {"CreatorProfile": True}}},
}
return result

View File
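`library_agent_include` layers optional include sections onto one dict; when `include_store_listing` is set, it upgrades a bare `AgentGraph: True` into a nested include before attaching the latest approved listing version. The shape in plain dicts (Prisma type annotations and the approval filters omitted; this is a sketch of the branching, not the real function):

```python
def build_include(include_nodes: bool, include_store_listing: bool) -> dict:
    result: dict = {}
    if include_nodes:
        result["AgentGraph"] = {"include": {"Nodes": True}}
    else:
        result["AgentGraph"] = True  # basic graph metadata only (fast path)
    if include_store_listing:
        if result["AgentGraph"] is True:
            # upgrade the fast path to a nested include so we can attach relations
            result["AgentGraph"] = {"include": {}}
        # latest published listing version: newest first, take one
        result["AgentGraph"]["include"]["StoreListingVersions"] = {
            "order_by": {"version": "desc"},
            "take": 1,
        }
    return result
```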

@@ -44,7 +44,7 @@ class NotFoundError(ValueError):
"""The requested record was not found, resulting in an error condition"""
class GraphNotFoundError(ValueError):
class GraphNotFoundError(NotFoundError):
"""The requested Agent Graph was not found, resulting in an error condition"""

View File
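Re-parenting `GraphNotFoundError` under `NotFoundError` lets one handler catch every not-found variant while both still behave as `ValueError`. A minimal illustration — the `classify` helper is hypothetical, not part of the codebase:

```python
class NotFoundError(ValueError):
    """The requested record was not found, resulting in an error condition"""


class GraphNotFoundError(NotFoundError):
    """The requested Agent Graph was not found, resulting in an error condition"""


def classify(exc: Exception) -> int:
    # one isinstance check now covers GraphNotFoundError and any future subclass
    return 404 if isinstance(exc, NotFoundError) else 500
```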

@@ -1,32 +0,0 @@
BEGIN;
-- Drop illogical column StoreListing.agentGraphVersion;
ALTER TABLE "StoreListing" DROP CONSTRAINT "StoreListing_agentGraphId_agentGraphVersion_fkey";
DROP INDEX "StoreListing_agentGraphId_agentGraphVersion_idx";
ALTER TABLE "StoreListing" DROP COLUMN "agentGraphVersion";
-- Add uniqueness constraint to Profile.userId and remove invalid data
--
-- Delete any profiles with null userId (which is invalid and doesn't occur in theory)
DELETE FROM "Profile" WHERE "userId" IS NULL;
--
-- Delete duplicate profiles per userId, keeping the most recently updated one
DELETE FROM "Profile"
WHERE "id" IN (
SELECT "id" FROM (
SELECT "id", ROW_NUMBER() OVER (
PARTITION BY "userId" ORDER BY "updatedAt" DESC, "id" DESC
) AS rn
FROM "Profile"
) ranked
WHERE rn > 1
);
--
-- Add userId uniqueness constraint
ALTER TABLE "Profile" ALTER COLUMN "userId" SET NOT NULL;
CREATE UNIQUE INDEX "Profile_userId_key" ON "Profile"("userId");
-- Add formal relation StoreListing.owningUserId -> Profile.userId
ALTER TABLE "StoreListing" ADD CONSTRAINT "StoreListing_owner_Profile_fkey" FOREIGN KEY ("owningUserId") REFERENCES "Profile"("userId") ON DELETE CASCADE ON UPDATE CASCADE;
COMMIT;

View File

@@ -992,7 +992,7 @@ model StoreListing {
ActiveVersion StoreListingVersion? @relation("ActiveVersion", fields: [activeVersionId], references: [id])
// The agent link here is only so we can do lookup on agentId
agentGraphId String @unique
agentGraphId String @unique
owningUserId String
OwningUser User @relation(fields: [owningUserId], references: [id])
@@ -1137,18 +1137,34 @@ enum SubmissionStatus {
}
enum APIKeyPermission {
// Legacy v1 permissions (kept for backward compatibility)
IDENTITY // Info about the authenticated user
EXECUTE_GRAPH // Can execute agent graphs
EXECUTE_GRAPH // Can execute agent graphs (v1 only)
READ_GRAPH // Can get graph versions and details
WRITE_GRAPH // Can create and update agent graphs
EXECUTE_BLOCK // Can execute individual blocks
EXECUTE_BLOCK // Can execute individual blocks (v1 only)
READ_BLOCK // Can get block information
READ_STORE // Can read store agents and creators
WRITE_LIBRARY // Can add agents to library
USE_TOOLS // Can use chat tools via external API
MANAGE_INTEGRATIONS // Can initiate OAuth flows and complete them
USE_TOOLS // Can use chat tools via external API (v1 only)
MANAGE_INTEGRATIONS // Can initiate OAuth flows and complete them (v1 only)
READ_INTEGRATIONS // Can list credentials and providers
DELETE_INTEGRATIONS // Can delete credentials
DELETE_INTEGRATIONS // Can delete credentials (v1 only)
// V2 permissions
READ_SCHEDULE // Can list schedules
WRITE_SCHEDULE // Can create and delete schedules
READ_STORE // Can read marketplace submissions (listings are public and accessible without this permission)
WRITE_STORE // Can create, update, delete marketplace submissions
READ_LIBRARY // Can list library agents and runs
RUN_AGENT // Can run agents from library
READ_RUN // Can list and get run details
WRITE_RUN // Can stop and delete runs
SHARE_RUN // Can share/unshare agent runs
READ_RUN_REVIEW // Can list pending human-in-the-loop reviews
WRITE_RUN_REVIEW // Can submit human-in-the-loop review responses
READ_CREDITS // Can get credit balance and transactions
READ_FILES // Can list and download workspace files
WRITE_FILES // Can upload and delete workspace files
}
model APIKey {

View File

@@ -0,0 +1,5 @@
{
"detail": "Agent #nonexistent not found",
"hint": "Adjust the request and retry.",
"message": "Failed to process GET /library/agents/nonexistent"
}

View File

@@ -0,0 +1,5 @@
{
"detail": "connection refused",
"hint": "Check server logs and dependent services.",
"message": "Failed to process GET /library/agents/some-id"
}

View File

@@ -0,0 +1,5 @@
{
"detail": "Invalid graph version: -1",
"hint": "Adjust the request and retry.",
"message": "Failed to process PATCH /library/agents/some-id"
}

View File
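The three snapshots above share one error envelope: `detail`, `hint`, and a `message` of the form `Failed to process <METHOD> <path>`. A sketch of a builder producing that shape (hypothetical helper; the real construction lives in the FastAPI exception handlers):

```python
def error_payload(method: str, path: str, detail: str, hint: str) -> dict[str, str]:
    # envelope shape taken from the snapshot files: detail, hint, message
    return {
        "detail": detail,
        "hint": hint,
        "message": f"Failed to process {method} {path}",
    }
```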

@@ -377,7 +377,7 @@ class TestExtractSearchTermsFromSteps:
def test_extracts_terms_from_instructions_type(self):
"""Test extraction from valid instructions decomposition result."""
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [
{
@@ -397,7 +397,7 @@ class TestExtractSearchTermsFromSteps:
def test_returns_empty_for_non_instructions_type(self):
"""Test that non-instructions types return empty list."""
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "clarifying_questions",
"questions": [{"question": "What email?"}],
}
@@ -408,7 +408,7 @@ class TestExtractSearchTermsFromSteps:
def test_deduplicates_terms_case_insensitively(self):
"""Test that duplicate terms are removed (case-insensitive)."""
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [
{"description": "Send Email", "name": "send email"},
@@ -424,7 +424,7 @@ class TestExtractSearchTermsFromSteps:
def test_filters_short_terms(self):
"""Test that terms with 3 or fewer characters are filtered out."""
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [
{"description": "ab", "action": "xyz"}, # Both too short
@@ -440,7 +440,7 @@ class TestExtractSearchTermsFromSteps:
def test_handles_empty_steps(self):
"""Test handling of empty steps list."""
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [],
}
@@ -456,7 +456,7 @@ class TestEnrichLibraryAgentsFromSteps:
@pytest.mark.asyncio
async def test_enriches_with_additional_agents(self):
"""Test that additional agents are found based on steps."""
existing_agents = [
existing_agents: list[core.LibraryAgentSummary] = [
{
"graph_id": "existing-123",
"graph_version": 1,
@@ -478,7 +478,7 @@ class TestEnrichLibraryAgentsFromSteps:
}
]
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [
{"description": "Send email notification"},
@@ -506,7 +506,7 @@ class TestEnrichLibraryAgentsFromSteps:
@pytest.mark.asyncio
async def test_deduplicates_by_graph_id(self):
"""Test that agents with same graph_id are not duplicated."""
existing_agents = [
existing_agents: list[core.LibraryAgentSummary] = [
{
"graph_id": "agent-123",
"graph_version": 1,
@@ -529,7 +529,7 @@ class TestEnrichLibraryAgentsFromSteps:
}
]
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [{"description": "Some action"}],
}
@@ -552,7 +552,7 @@ class TestEnrichLibraryAgentsFromSteps:
@pytest.mark.asyncio
async def test_deduplicates_by_name(self):
"""Test that agents with same name are not duplicated."""
existing_agents = [
existing_agents: list[core.LibraryAgentSummary] = [
{
"graph_id": "agent-123",
"graph_version": 1,
@@ -575,7 +575,7 @@ class TestEnrichLibraryAgentsFromSteps:
}
]
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [{"description": "Send email"}],
}
@@ -599,7 +599,7 @@ class TestEnrichLibraryAgentsFromSteps:
@pytest.mark.asyncio
async def test_returns_existing_when_no_steps(self):
"""Test that existing agents are returned when no search terms extracted."""
existing_agents = [
existing_agents: list[core.LibraryAgentSummary] = [
{
"graph_id": "existing-123",
"graph_version": 1,
@@ -610,7 +610,7 @@ class TestEnrichLibraryAgentsFromSteps:
}
]
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "clarifying_questions", # Not instructions type
"questions": [],
}
@@ -629,7 +629,7 @@ class TestEnrichLibraryAgentsFromSteps:
"""Test that only first 3 search terms are used."""
existing_agents = []
decomposition_result = {
decomposition_result: core.DecompositionResult = {
"type": "instructions",
"steps": [
{"description": "First action"},

View File

@@ -21,8 +21,8 @@ x-backend-env: &backend-env # Docker internal service hostnames (override localh
SUPABASE_URL: http://kong:8000
# Database connection string for Docker network
# This cannot be constructed like in .env because we cannot interpolate values set here (DB_HOST)
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform # pragma: allowlist secret
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform # pragma: allowlist secret
# Common env_file configuration for backend services
x-backend-env-files: &backend-env-files

View File

@@ -6277,7 +6277,7 @@
"requestBody": {
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/Profile" }
"schema": { "$ref": "#/components/schemas/ProfileUpdateRequest" }
}
},
"required": true
@@ -6921,12 +6921,25 @@
"WRITE_GRAPH",
"EXECUTE_BLOCK",
"READ_BLOCK",
"READ_STORE",
"WRITE_LIBRARY",
"USE_TOOLS",
"MANAGE_INTEGRATIONS",
"READ_INTEGRATIONS",
"DELETE_INTEGRATIONS"
"DELETE_INTEGRATIONS",
"READ_SCHEDULE",
"WRITE_SCHEDULE",
"READ_STORE",
"WRITE_STORE",
"READ_LIBRARY",
"RUN_AGENT",
"READ_RUN",
"WRITE_RUN",
"SHARE_RUN",
"READ_RUN_REVIEW",
"WRITE_RUN_REVIEW",
"READ_CREDITS",
"READ_FILES",
"WRITE_FILES"
],
"title": "APIKeyPermission"
},
@@ -11347,26 +11360,6 @@
],
"title": "PostmarkSubscriptionChangeWebhook"
},
"Profile": {
"properties": {
"username": { "type": "string", "title": "Username" },
"name": { "type": "string", "title": "Name" },
"description": { "type": "string", "title": "Description" },
"avatar_url": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Avatar Url"
},
"links": {
"items": { "type": "string" },
"type": "array",
"title": "Links"
}
},
"type": "object",
"required": ["username", "name", "description", "avatar_url", "links"],
"title": "Profile",
"description": "Marketplace user profile (only attributes that the user can update)"
},
"ProfileDetails": {
"properties": {
"username": { "type": "string", "title": "Username" },
@@ -11395,6 +11388,36 @@
"title": "ProfileDetails",
"description": "Marketplace user profile (including read-only fields)"
},
"ProfileUpdateRequest": {
"properties": {
"username": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Username"
},
"name": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Name"
},
"description": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Description"
},
"avatar_url": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Avatar Url"
},
"links": {
"anyOf": [
{ "items": { "type": "string" }, "type": "array" },
{ "type": "null" }
],
"title": "Links"
}
},
"type": "object",
"title": "ProfileUpdateRequest",
"description": "Marketplace user profile (only attributes that the user can update)"
},
"Provider": {
"properties": {
"name": {

View File

@@ -2,7 +2,7 @@
## Overview
The AutoGPT platform implements OAuth 2.0 in two distinct contexts:
The AutoGPT Platform implements OAuth 2.0 in two distinct contexts:
1. **User Authentication (SSO)**: Handled by Supabase for platform login
2. **API Integration Credentials**: Custom OAuth implementation for third-party service access
@@ -324,7 +324,7 @@ stateDiagram-v2
### User Authentication (SSO) via Supabase
- **Purpose**: Authenticate users to access the AutoGPT platform
- **Purpose**: Authenticate users to access the AutoGPT Platform
- **Provider**: Supabase Auth (currently supports Google SSO)
- **Flow Path**: `/login` → Supabase OAuth → `/auth/callback`
- **Session Storage**: Supabase-managed cookies

View File

@@ -1,14 +1,18 @@
# AutoGPT Platform External API Guide
The AutoGPT Platform provides an External API that allows you to programmatically interact with agents, blocks, the store, and more.
The AutoGPT Platform provides an External API that allows you to programmatically interact with agents, blocks, the marketplace, and more.
## API Documentation
Full API documentation with interactive examples is available at:
**[https://backend.agpt.co/external-api/docs](https://backend.agpt.co/external-api/docs)**
- **Main**: [https://backend.agpt.co/external-api/docs](https://backend.agpt.co/external-api/docs)
- **v2 API**: [https://backend.agpt.co/external-api/v2/docs](https://backend.agpt.co/external-api/v2/docs)
- **v1 API**: [https://backend.agpt.co/external-api/v1/docs](https://backend.agpt.co/external-api/v1/docs)
This Swagger UI documentation includes all available endpoints, request/response schemas, and allows you to try out API calls directly.
The Swagger UI documentation includes all available endpoints, request/response schemas, and allows you to try out API calls directly.
**Recommendation**: New integrations should use the v2 API.
## Authentication Methods
@@ -16,11 +20,12 @@ The External API supports two authentication methods:
### 1. API Keys
API keys are the simplest way to authenticate. Generate an API key from your AutoGPT Platform account settings and include it in your requests:
API keys are the simplest way to authenticate. Generate an API key from your AutoGPT Platform account settings and include it in your requests using the `X-API-Key` header:
```http
GET /external-api/v1/blocks
X-API-Key: your_api_key_here
```bash
# List available blocks
curl -H "X-API-Key: YOUR_API_KEY" \
https://backend.agpt.co/external-api/v1/blocks
```
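The same request in Python, using only the standard library; the base URL and `X-API-Key` header come from the docs above, while the helper name is illustrative:

```python
import urllib.request


def build_request(api_key: str, path: str) -> urllib.request.Request:
    # illustrative helper: v1 and v2 endpoints both live under /external-api
    return urllib.request.Request(
        f"https://backend.agpt.co/external-api{path}",
        headers={"X-API-Key": api_key},
    )


req = build_request("YOUR_API_KEY", "/v1/blocks")
# blocks = json.load(urllib.request.urlopen(req))  # performs the network call
```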
API keys are ideal for:
@@ -32,51 +37,62 @@ API keys are ideal for:
For applications that need to act on behalf of users, use OAuth 2.0. This allows users to authorize your application to access their AutoGPT resources.
OAuth is ideal for:
- Third-party applications
- "Sign in with AutoGPT" (SSO, Single Sign-On) functionality
- Applications that need user-specific permissions
See the [SSO Integration Guide](sso-guide.md) for complete OAuth implementation details.
## Available Scopes
When using OAuth, request only the scopes your application needs:
| Scope | Description |
|-------|-------------|
| `IDENTITY` | Read user ID, e-mail, and timezone |
| `EXECUTE_GRAPH` | Run agents |
| `READ_GRAPH` | Read agent run results |
| `EXECUTE_BLOCK` | Run individual blocks |
| `READ_BLOCK` | Read block definitions |
| `READ_STORE` | Access the agent store |
| `USE_TOOLS` | Use platform tools |
| `MANAGE_INTEGRATIONS` | Create and update user integrations |
| `READ_INTEGRATIONS` | Read user integration status |
| `DELETE_INTEGRATIONS` | Remove user integrations |
## Quick Start
### Using an API Key
```bash
# List available blocks
curl -H "X-API-Key: YOUR_API_KEY" \
https://backend.agpt.co/external-api/v1/blocks
```
### Using OAuth
To get started:
1. Register an OAuth application (contact platform administrator)
2. Implement the OAuth flow as described in the [SSO Guide](sso-guide.md)
3. Use the obtained access token:
2. Implement the OAuth flow as described in the [OAuth Guide](oauth-guide.md)
3. Go through the OAuth flow to authorize your app and obtain an access token
4. Make API requests with the access token in the `Authorization: Bearer` header:
```bash
curl -H "Authorization: Bearer agpt_xt_..." \
https://backend.agpt.co/external-api/v1/blocks
```
OAuth is ideal for:
- Third-party applications
- "Sign in with AutoGPT" (SSO, Single Sign-On) functionality
- Applications that need user-specific permissions
See the [OAuth Integration Guide](oauth-guide.md) for complete OAuth implementation details.
## Available Scopes
When creating API keys or using OAuth, request only the scopes your application needs.
### Core Scopes
| Scope | Description |
|-------|-------------|
| `IDENTITY` | Read user ID, e-mail, and timezone |
| `READ_GRAPH` | Read graph/agent definitions and versions |
| `WRITE_GRAPH` | Create, update, and delete graphs |
| `READ_BLOCK` | Read block definitions |
| `READ_STORE` | Access the agent marketplace |
| `WRITE_STORE` | Create, update, and delete marketplace submissions |
| `READ_LIBRARY` | List library agents and their runs |
| `WRITE_LIBRARY` | Add agents to your library |
| `RUN_AGENT` | Execute agents from your library |
| `READ_RUN` | List and get agent run details |
| `WRITE_RUN` | Stop and delete runs |
| `SHARE_RUN` | Share and unshare agent runs |
| `READ_RUN_REVIEW` | List pending human-in-the-loop reviews |
| `WRITE_RUN_REVIEW` | Submit human-in-the-loop review responses |
| `READ_SCHEDULE` | List execution schedules |
| `WRITE_SCHEDULE` | Create and delete schedules |
| `READ_CREDITS` | Get credit balance and transaction history |
| `READ_INTEGRATIONS` | List OAuth credentials |
| `READ_FILES` | List and download workspace files |
| `WRITE_FILES` | Upload and delete workspace files |
### Legacy Scopes (v1 only)
| Scope | Description |
|-------|-------------|
| `EXECUTE_GRAPH` | Execute graphs directly (use `RUN_AGENT` in v2) |
| `EXECUTE_BLOCK` | Execute individual blocks |
| `USE_TOOLS` | Use chat tools via external API |
| `MANAGE_INTEGRATIONS` | Initiate and complete OAuth flows |
| `DELETE_INTEGRATIONS` | Delete OAuth credentials |
## Support
For issues or questions about API integration: