Compare commits


7 Commits

Author SHA1 Message Date
Nicholas Tindle
446c71fec8 Merge branch 'dev' into gitbook 2026-01-15 12:59:51 -07:00
claude[bot]
ec4c2caa14 Merge remote-tracking branch 'origin/dev' into gitbook 2026-01-12 21:45:54 +00:00
Nicholas Tindle
516e8b4b25 fix: move files to the right places 2026-01-12 13:46:56 -06:00
Nicholas Tindle
e7e118b5a8 wip: fixes 2026-01-09 10:23:31 -07:00
Nicholas Tindle
92a7a7e6d6 wip: fixes 2026-01-09 10:21:06 -07:00
Nicholas Tindle
e16995347f Refactor/gitbook platform structure (#11739)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 11:17:32 -06:00
Nicholas Tindle
234d3acb4c refactor(docs): restructure platform docs for GitBook and remove MkDocs (#11738)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 11:09:17 -06:00
50 changed files with 628 additions and 7416 deletions

View File

@@ -1,9 +1,6 @@
# Ignore everything by default, selectively add things to context
*
# Documentation (for embeddings/search)
!docs/
# Platform - Libs
!autogpt_platform/autogpt_libs/autogpt_libs/
!autogpt_platform/autogpt_libs/pyproject.toml

View File

@@ -100,7 +100,6 @@ COPY autogpt_platform/backend/migrations /app/autogpt_platform/backend/migration
FROM server_dependencies AS server
COPY autogpt_platform/backend /app/autogpt_platform/backend
COPY docs /app/docs
RUN poetry install --no-ansi --only-root
ENV PORT=8000

View File

@@ -1,57 +1,21 @@
"""
External API Application
This module defines the main FastAPI application for the external API,
which mounts the v1 and v2 sub-applications.
"""
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from backend.api.middleware.security import SecurityHeadersMiddleware
from backend.monitoring.instrumentation import instrument_fastapi
from .v1.app import v1_app
from .v2.app import v2_app
DESCRIPTION = """
The external API provides programmatic access to the AutoGPT Platform for building
integrations, automations, and custom applications.
### API Versions
| Version | End of Life | Path | Documentation |
|---------------------|-------------|------------------------|---------------|
| **v2** | | `/external-api/v2/...` | [v2 docs](v2/docs) |
| **v1** (deprecated) | 2025-05-01 | `/external-api/v1/...` | [v1 docs](v1/docs) |
**Recommendation**: New integrations should use v2.
For authentication details and usage examples, see the
[API Integration Guide](https://docs.agpt.co/platform/integrating/api-guide/).
"""
from .v1.routes import v1_router
external_api = FastAPI(
title="AutoGPT Platform API",
summary="External API for AutoGPT Platform integrations",
description=DESCRIPTION,
version="2.0.0",
title="AutoGPT External API",
description="External API for AutoGPT integrations",
docs_url="/docs",
redoc_url="/redoc",
version="1.0",
)
external_api.add_middleware(SecurityHeadersMiddleware)
external_api.include_router(v1_router, prefix="/v1")
@external_api.get("/", include_in_schema=False)
async def root_redirect() -> RedirectResponse:
"""Redirect root to API documentation."""
return RedirectResponse(url="/docs")
# Mount versioned sub-applications
# Each sub-app has its own /docs page at /v1/docs and /v2/docs
external_api.mount("/v1", v1_app)
external_api.mount("/v2", v2_app)
# Add Prometheus instrumentation to the main app
# Add Prometheus instrumentation
instrument_fastapi(
external_api,
service_name="external-api",
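
A minimal, self-contained sketch of the sub-application mounting pattern shown in this hunk (hypothetical app titles and module name; not the platform's actual modules). Each mounted app serves its own OpenAPI schema, so the interactive docs end up at /v1/docs and /v2/docs:

from fastapi import FastAPI
from fastapi.responses import RedirectResponse

v1_app = FastAPI(title="Example API v1", version="1.0.0")
v2_app = FastAPI(title="Example API v2", version="2.0.0")

root_app = FastAPI(title="Example API")

@root_app.get("/", include_in_schema=False)
async def root_redirect() -> RedirectResponse:
    # Mirror the behaviour above: send callers of "/" to the interactive docs.
    return RedirectResponse(url="/docs")

# Each sub-app gets its own /docs and /openapi.json under its mount prefix.
root_app.mount("/v1", v1_app)
root_app.mount("/v2", v2_app)

# Run with e.g.: uvicorn example:root_app  (the "example" module name is an assumption)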

View File

@@ -1,39 +0,0 @@
"""
V1 External API Application
This module defines the FastAPI application for the v1 external API.
"""
from fastapi import FastAPI
from backend.api.middleware.security import SecurityHeadersMiddleware
from .routes import v1_router
DESCRIPTION = """
The v1 API provides access to core AutoGPT functionality for external integrations.
For authentication details and usage examples, see the
[API Integration Guide](https://docs.agpt.co/platform/integrating/api-guide/).
"""
v1_app = FastAPI(
title="AutoGPT Platform API",
summary="External API for AutoGPT Platform integrations (v1)",
description=DESCRIPTION,
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
openapi_tags=[
{"name": "user", "description": "User information"},
{"name": "blocks", "description": "Block operations"},
{"name": "graphs", "description": "Graph execution"},
{"name": "store", "description": "Marketplace agents and creators"},
{"name": "integrations", "description": "OAuth credential management"},
{"name": "tools", "description": "AI assistant tools"},
],
)
v1_app.add_middleware(SecurityHeadersMiddleware)
v1_app.include_router(v1_router)

View File

@@ -1,9 +0,0 @@
"""
V2 External API
This module provides the v2 external API for programmatic access to the AutoGPT Platform.
"""
from .routes import v2_router
__all__ = ["v2_router"]

View File

@@ -1,82 +0,0 @@
"""
V2 External API Application
This module defines the FastAPI application for the v2 external API.
"""
from fastapi import FastAPI
from backend.api.middleware.security import SecurityHeadersMiddleware
from .routes import v2_router
DESCRIPTION = """
The v2 API provides comprehensive access to the AutoGPT Platform for building
integrations, automations, and custom applications.
### Key Improvements over v1
- **Consistent naming**: Uses `graph_id`/`graph_version` consistently
- **Better pagination**: All list endpoints support pagination
- **Comprehensive coverage**: Access to library, runs, schedules, credits, and more
- **Human-in-the-loop**: Review and approve agent decisions via the API
For authentication details and usage examples, see the
[API Integration Guide](https://docs.agpt.co/platform/integrating/api-guide/).
### Pagination
List endpoints return paginated responses. Use `page` and `page_size` query
parameters to navigate results. Maximum page size is 100 items.
"""
v2_app = FastAPI(
title="AutoGPT Platform External API",
summary="External API for AutoGPT Platform integrations (v2)",
description=DESCRIPTION,
version="2.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
openapi_tags=[
{
"name": "graphs",
"description": "Create, update, and manage agent graphs",
},
{
"name": "schedules",
"description": "Manage scheduled graph executions",
},
{
"name": "blocks",
"description": "Discover available building blocks",
},
{
"name": "marketplace",
"description": "Browse agents and creators, manage submissions",
},
{
"name": "library",
"description": "Access your agent library and execute agents",
},
{
"name": "runs",
"description": "Monitor execution runs and human-in-the-loop reviews",
},
{
"name": "credits",
"description": "Check balance and view transaction history",
},
{
"name": "integrations",
"description": "Manage OAuth credentials for external services",
},
{
"name": "files",
"description": "Upload files for agent input",
},
],
)
v2_app.add_middleware(SecurityHeadersMiddleware)
v2_app.include_router(v2_router)
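
A hedged usage sketch of the pagination contract described in the v2 API description (page/page_size, 1-indexed, max 100 per page). The base URL, the exact /graphs path, and the "X-API-Key" header name are assumptions; the response fields (graphs, total_pages, ...) follow the list response models that appear further down:

import httpx

BASE_URL = "https://example-host/external-api/v2"  # assumption: deployment-specific
HEADERS = {"X-API-Key": "YOUR_API_KEY"}            # assumption: actual auth header may differ

def fetch_all_graphs(page_size: int = 100) -> list[dict]:
    """Walk every page of a paginated list endpoint."""
    items: list[dict] = []
    page = 1
    while True:
        resp = httpx.get(
            f"{BASE_URL}/graphs",
            params={"page": page, "page_size": page_size},
            headers=HEADERS,
        )
        resp.raise_for_status()
        body = resp.json()
        items.extend(body["graphs"])
        if page >= body["total_pages"]:
            break
        page += 1
    return items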

View File

@@ -1,140 +0,0 @@
"""
V2 External API - Blocks Endpoints
Provides read-only access to available building blocks.
"""
import logging
from typing import Any
from fastapi import APIRouter, Response, Security
from fastapi.concurrency import run_in_threadpool
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field
from backend.api.external.middleware import require_permission
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.block import get_blocks
from backend.util.cache import cached
from backend.util.json import dumps
logger = logging.getLogger(__name__)
blocks_router = APIRouter()
# ============================================================================
# Models
# ============================================================================
class BlockCost(BaseModel):
"""Cost information for a block."""
cost_type: str = Field(description="Type of cost (e.g., 'per_call', 'per_token')")
cost_filter: dict[str, Any] = Field(
default_factory=dict, description="Conditions for this cost"
)
cost_amount: int = Field(description="Cost amount in credits")
class Block(BaseModel):
"""A building block that can be used in graphs."""
id: str
name: str
description: str
categories: list[str] = Field(default_factory=list)
input_schema: dict[str, Any]
output_schema: dict[str, Any]
costs: list[BlockCost] = Field(default_factory=list)
disabled: bool = Field(default=False)
class BlocksListResponse(BaseModel):
"""Response for listing blocks."""
blocks: list[Block]
total_count: int
# ============================================================================
# Internal Functions
# ============================================================================
def _compute_blocks_sync() -> str:
"""
Synchronous function to compute blocks data.
This does the heavy lifting: instantiate 226+ blocks, compute costs, serialize.
"""
from backend.data.credit import get_block_cost
block_classes = get_blocks()
result = []
for block_class in block_classes.values():
block_instance = block_class()
if not block_instance.disabled:
costs = get_block_cost(block_instance)
# Convert BlockCost BaseModel objects to dictionaries
costs_dict = [
cost.model_dump() if isinstance(cost, BaseModel) else cost
for cost in costs
]
result.append({**block_instance.to_dict(), "costs": costs_dict})
return dumps(result)
@cached(ttl_seconds=3600)
async def _get_cached_blocks() -> str:
"""
Async cached function with thundering herd protection.
On cache miss: runs heavy work in thread pool
On cache hit: returns cached string immediately
"""
return await run_in_threadpool(_compute_blocks_sync)
# ============================================================================
# Endpoints
# ============================================================================
@blocks_router.get(
path="",
summary="List available blocks",
responses={
200: {
"description": "List of available building blocks",
"content": {
"application/json": {
"schema": {
"items": {"additionalProperties": True, "type": "object"},
"type": "array",
}
}
},
}
},
)
async def list_blocks(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_BLOCK)
),
) -> Response:
"""
List all available building blocks that can be used in graphs.
Each block represents a specific capability (e.g., HTTP request, text processing,
AI completion, etc.) that can be connected in a graph to create an agent.
The response includes input/output schemas for each block, as well as
cost information for blocks that consume credits.
"""
content = await _get_cached_blocks()
return Response(
content=content,
media_type="application/json",
)
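
The blocks endpoint above caches a pre-serialized JSON string for an hour and pushes the heavy block instantiation into a thread pool. A generic sketch of that caching shape follows; it is not the platform's backend.util.cache implementation, just an illustration of an async TTL cache with single-flight (thundering-herd) protection:

import asyncio
import time

from fastapi.concurrency import run_in_threadpool

def async_ttl_cached(ttl_seconds: float):
    """Cache an async function's result for ttl_seconds, computing it at most once at a time."""
    def decorator(fn):
        cache: dict[str, tuple[float, object]] = {}
        lock = asyncio.Lock()

        async def wrapper(*args, **kwargs):
            key = repr((args, kwargs))
            async with lock:  # concurrent callers wait here instead of recomputing
                hit = cache.get(key)
                if hit and time.monotonic() - hit[0] < ttl_seconds:
                    return hit[1]
                value = await fn(*args, **kwargs)
                cache[key] = (time.monotonic(), value)
                return value

        return wrapper
    return decorator

@async_ttl_cached(ttl_seconds=3600)
async def get_expensive_report() -> str:
    # Stand-in for _compute_blocks_sync: run blocking work off the event loop.
    return await run_in_threadpool(lambda: "precomputed JSON payload")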

View File

@@ -1,36 +0,0 @@
"""
Common utilities for V2 External API
"""
from typing import TypeVar
from pydantic import BaseModel, Field
# Constants for pagination
MAX_PAGE_SIZE = 100
DEFAULT_PAGE_SIZE = 20
class PaginationParams(BaseModel):
"""Common pagination parameters."""
page: int = Field(default=1, ge=1, description="Page number (1-indexed)")
page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Number of items per page (max {MAX_PAGE_SIZE})",
)
T = TypeVar("T")
class PaginatedResponse(BaseModel):
"""Generic paginated response wrapper."""
items: list
total_count: int = Field(description="Total number of items across all pages")
page: int = Field(description="Current page number (1-indexed)")
page_size: int = Field(description="Number of items per page")
total_pages: int = Field(description="Total number of pages")
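
A small worked example tying these shared models together; the "widget" items are hypothetical, and the model definitions are repeated so the snippet runs standalone:

import math
from pydantic import BaseModel, Field

MAX_PAGE_SIZE = 100
DEFAULT_PAGE_SIZE = 20

class PaginationParams(BaseModel):
    page: int = Field(default=1, ge=1)
    page_size: int = Field(default=DEFAULT_PAGE_SIZE, ge=1, le=MAX_PAGE_SIZE)

class PaginatedResponse(BaseModel):
    items: list
    total_count: int
    page: int
    page_size: int
    total_pages: int

params = PaginationParams(page=2, page_size=10)
widgets = [{"id": i} for i in range(25)]
start = (params.page - 1) * params.page_size
response = PaginatedResponse(
    items=widgets[start : start + params.page_size],
    total_count=len(widgets),
    page=params.page,
    page_size=params.page_size,
    total_pages=math.ceil(len(widgets) / params.page_size),  # 25 items / 10 per page -> 3 pages
)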

View File

@@ -1,141 +0,0 @@
"""
V2 External API - Credits Endpoints
Provides access to credit balance and transaction history.
"""
import logging
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Query, Security
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field
from backend.api.external.middleware import require_permission
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.credit import get_user_credit_model
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
logger = logging.getLogger(__name__)
credits_router = APIRouter()
# ============================================================================
# Models
# ============================================================================
class CreditBalance(BaseModel):
"""User's credit balance."""
balance: int = Field(description="Current credit balance")
class CreditTransaction(BaseModel):
"""A credit transaction."""
transaction_key: str
amount: int = Field(description="Transaction amount (positive or negative)")
type: str = Field(description="One of: TOP_UP, USAGE, GRANT, REFUND")
transaction_time: datetime
running_balance: Optional[int] = Field(
default=None, description="Balance after this transaction"
)
description: Optional[str] = None
class CreditTransactionsResponse(BaseModel):
"""Response for listing credit transactions."""
transactions: list[CreditTransaction]
total_count: int
page: int
page_size: int
total_pages: int
# ============================================================================
# Endpoints
# ============================================================================
@credits_router.get(
path="",
summary="Get credit balance",
response_model=CreditBalance,
)
async def get_balance(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_CREDITS)
),
) -> CreditBalance:
"""
Get the current credit balance for the authenticated user.
"""
user_credit_model = await get_user_credit_model(auth.user_id)
balance = await user_credit_model.get_credits(auth.user_id)
return CreditBalance(balance=balance)
@credits_router.get(
path="/transactions",
summary="Get transaction history",
response_model=CreditTransactionsResponse,
)
async def get_transactions(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_CREDITS)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
transaction_type: Optional[str] = Query(
default=None,
description="Filter by transaction type (TOP_UP, USAGE, GRANT, REFUND)",
),
) -> CreditTransactionsResponse:
"""
Get credit transaction history for the authenticated user.
Returns transactions sorted by most recent first.
"""
user_credit_model = await get_user_credit_model(auth.user_id)
history = await user_credit_model.get_transaction_history(
user_id=auth.user_id,
transaction_count_limit=page_size,
transaction_type=transaction_type,
)
transactions = [
CreditTransaction(
transaction_key=t.transaction_key,
amount=t.amount,
type=t.transaction_type.value,
transaction_time=t.transaction_time,
running_balance=t.running_balance,
description=t.description,
)
for t in history.transactions
]
# Note: The current credit module doesn't support true pagination,
# so we're returning what we have
total_count = len(transactions)
total_pages = 1 # Without true pagination support
return CreditTransactionsResponse(
transactions=transactions,
total_count=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)

View File

@@ -1,132 +0,0 @@
"""
V2 External API - Files Endpoints
Provides file upload functionality for agent inputs.
"""
import base64
import logging
from fastapi import APIRouter, File, HTTPException, Query, Security, UploadFile
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field
from backend.api.external.middleware import require_permission
from backend.data.auth.base import APIAuthorizationInfo
from backend.util.cloud_storage import get_cloud_storage_handler
from backend.util.settings import Settings
from backend.util.virus_scanner import scan_content_safe
logger = logging.getLogger(__name__)
settings = Settings()
files_router = APIRouter()
# ============================================================================
# Models
# ============================================================================
class UploadFileResponse(BaseModel):
"""Response after uploading a file."""
file_uri: str = Field(description="URI to reference the uploaded file in agents")
file_name: str
size: int = Field(description="File size in bytes")
content_type: str
expires_in_hours: int
# ============================================================================
# Endpoints
# ============================================================================
def _create_file_size_error(size_bytes: int, max_size_mb: int) -> HTTPException:
"""Create standardized file size error response."""
return HTTPException(
status_code=400,
detail=f"File size ({size_bytes} bytes) exceeds the maximum allowed size of {max_size_mb}MB",
)
@files_router.post(
path="/upload",
summary="Upload a file",
response_model=UploadFileResponse,
)
async def upload_file(
file: UploadFile = File(...),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.UPLOAD_FILES)
),
provider: str = Query(
default="gcs", description="Storage provider (gcs, s3, azure)"
),
expiration_hours: int = Query(
default=24, ge=1, le=48, description="Hours until file expires (1-48)"
),
) -> UploadFileResponse:
"""
Upload a file to cloud storage for use with agents.
The returned `file_uri` can be used as input to agents that accept file inputs
(e.g., FileStoreBlock, AgentFileInputBlock).
Files are automatically scanned for viruses before storage.
"""
# Check file size limit
max_size_mb = settings.config.upload_file_size_limit_mb
max_size_bytes = max_size_mb * 1024 * 1024
# Try to get file size from headers first
if hasattr(file, "size") and file.size is not None and file.size > max_size_bytes:
raise _create_file_size_error(file.size, max_size_mb)
# Read file content
content = await file.read()
content_size = len(content)
# Double-check file size after reading
if content_size > max_size_bytes:
raise _create_file_size_error(content_size, max_size_mb)
# Extract file info
file_name = file.filename or "uploaded_file"
content_type = file.content_type or "application/octet-stream"
# Virus scan the content
await scan_content_safe(content, filename=file_name)
# Check if cloud storage is configured
cloud_storage = await get_cloud_storage_handler()
if not cloud_storage.config.gcs_bucket_name:
# Fallback to base64 data URI when GCS is not configured
base64_content = base64.b64encode(content).decode("utf-8")
data_uri = f"data:{content_type};base64,{base64_content}"
return UploadFileResponse(
file_uri=data_uri,
file_name=file_name,
size=content_size,
content_type=content_type,
expires_in_hours=expiration_hours,
)
# Store in cloud storage
storage_path = await cloud_storage.store_file(
content=content,
filename=file_name,
provider=provider,
expiration_hours=expiration_hours,
user_id=auth.user_id,
)
return UploadFileResponse(
file_uri=storage_path,
file_name=file_name,
size=content_size,
content_type=content_type,
expires_in_hours=expiration_hours,
)
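
A hedged sketch of the upload flow described above. The base URL, the /files/upload path, and the "X-API-Key" header are assumptions; the multipart field name ("file"), the provider and expiration_hours query parameters, and the file_uri response field come from the endpoint definition:

import httpx

BASE_URL = "https://example-host/external-api/v2"  # assumption: deployment-specific
HEADERS = {"X-API-Key": "YOUR_API_KEY"}            # assumption: actual auth header may differ

with open("report.pdf", "rb") as fh:
    resp = httpx.post(
        f"{BASE_URL}/files/upload",
        params={"provider": "gcs", "expiration_hours": 24},
        files={"file": ("report.pdf", fh, "application/pdf")},
        headers=HEADERS,
    )
resp.raise_for_status()
upload = resp.json()
# file_uri is either a cloud storage path or, when GCS is not configured,
# a base64 data: URI; both can be passed to agents that accept file inputs.
file_uri = upload["file_uri"]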

View File

@@ -1,445 +0,0 @@
"""
V2 External API - Graphs Endpoints
Provides endpoints for managing agent graphs (CRUD operations).
"""
import logging
from fastapi import APIRouter, HTTPException, Query, Security
from prisma.enums import APIKeyPermission
from backend.api.external.middleware import require_permission
from backend.data import graph as graph_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.integrations.webhooks.graph_lifecycle_hooks import (
on_graph_activate,
on_graph_deactivate,
)
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import (
CreateGraphRequest,
DeleteGraphResponse,
GraphDetails,
GraphLink,
GraphMeta,
GraphNode,
GraphSettings,
GraphsListResponse,
SetActiveVersionRequest,
)
logger = logging.getLogger(__name__)
graphs_router = APIRouter()
def _convert_graph_meta(graph: graph_db.GraphMeta) -> GraphMeta:
"""Convert internal GraphMeta to v2 API model."""
return GraphMeta(
id=graph.id,
version=graph.version,
is_active=graph.is_active,
name=graph.name,
description=graph.description,
created_at=graph.created_at,
input_schema=graph.input_schema,
output_schema=graph.output_schema,
)
def _convert_graph_details(graph: graph_db.GraphModel) -> GraphDetails:
"""Convert internal GraphModel to v2 API GraphDetails model."""
return GraphDetails(
id=graph.id,
version=graph.version,
is_active=graph.is_active,
name=graph.name,
description=graph.description,
created_at=graph.created_at,
input_schema=graph.input_schema,
output_schema=graph.output_schema,
nodes=[
GraphNode(
id=node.id,
block_id=node.block_id,
input_default=node.input_default,
metadata=node.metadata,
)
for node in graph.nodes
],
links=[
GraphLink(
id=link.id,
source_id=link.source_id,
sink_id=link.sink_id,
source_name=link.source_name,
sink_name=link.sink_name,
is_static=link.is_static,
)
for link in graph.links
],
credentials_input_schema=graph.credentials_input_schema,
)
@graphs_router.get(
path="",
summary="List user's graphs",
response_model=GraphsListResponse,
)
async def list_graphs(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_GRAPH)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> GraphsListResponse:
"""
List all graphs owned by the authenticated user.
Returns a paginated list of graph metadata (not full graph details).
"""
graphs, pagination_info = await graph_db.list_graphs_paginated(
user_id=auth.user_id,
page=page,
page_size=page_size,
filter_by="active",
)
return GraphsListResponse(
graphs=[_convert_graph_meta(g) for g in graphs],
total_count=pagination_info.total_items,
page=pagination_info.current_page,
page_size=pagination_info.page_size,
total_pages=pagination_info.total_pages,
)
@graphs_router.post(
path="",
summary="Create a new graph",
response_model=GraphDetails,
)
async def create_graph(
create_graph_request: CreateGraphRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> GraphDetails:
"""
Create a new agent graph.
The graph will be validated and assigned a new ID. It will automatically
be added to the user's library.
"""
# Import here to avoid circular imports
from backend.api.features.library import db as library_db
# Convert v2 API Graph model to internal Graph model
internal_graph = graph_db.Graph(
id=create_graph_request.graph.id or "",
version=create_graph_request.graph.version,
is_active=create_graph_request.graph.is_active,
name=create_graph_request.graph.name,
description=create_graph_request.graph.description,
nodes=[
graph_db.Node(
id=node.id,
block_id=node.block_id,
input_default=node.input_default,
metadata=node.metadata,
)
for node in create_graph_request.graph.nodes
],
links=[
graph_db.Link(
id=link.id,
source_id=link.source_id,
sink_id=link.sink_id,
source_name=link.source_name,
sink_name=link.sink_name,
is_static=link.is_static,
)
for link in create_graph_request.graph.links
],
)
graph = graph_db.make_graph_model(internal_graph, auth.user_id)
graph.reassign_ids(user_id=auth.user_id, reassign_graph_id=True)
graph.validate_graph(for_run=False)
await graph_db.create_graph(graph, user_id=auth.user_id)
await library_db.create_library_agent(graph, user_id=auth.user_id)
activated_graph = await on_graph_activate(graph, user_id=auth.user_id)
return _convert_graph_details(activated_graph)
@graphs_router.get(
path="/{graph_id}",
summary="Get graph details",
response_model=GraphDetails,
)
async def get_graph(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_GRAPH)
),
version: int | None = Query(
default=None,
description="Specific version to retrieve (default: active version)",
),
) -> GraphDetails:
"""
Get detailed information about a specific graph.
By default returns the active version. Use the `version` query parameter
to retrieve a specific version.
"""
graph = await graph_db.get_graph(
graph_id,
version,
user_id=auth.user_id,
include_subgraphs=True,
)
if not graph:
raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
return _convert_graph_details(graph)
@graphs_router.put(
path="/{graph_id}",
summary="Update graph (creates new version)",
response_model=GraphDetails,
)
async def update_graph(
graph_id: str,
graph_request: CreateGraphRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> GraphDetails:
"""
Update a graph by creating a new version.
This does not modify existing versions - it creates a new version with
the provided content. The new version becomes the active version.
"""
# Import here to avoid circular imports
from backend.api.features.library import db as library_db
graph_data = graph_request.graph
if graph_data.id and graph_data.id != graph_id:
raise HTTPException(400, detail="Graph ID does not match ID in URI")
existing_versions = await graph_db.get_graph_all_versions(
graph_id, user_id=auth.user_id
)
if not existing_versions:
raise HTTPException(404, detail=f"Graph #{graph_id} not found")
latest_version_number = max(g.version for g in existing_versions)
# Convert v2 API Graph model to internal Graph model
internal_graph = graph_db.Graph(
id=graph_id,
version=latest_version_number + 1,
is_active=graph_data.is_active,
name=graph_data.name,
description=graph_data.description,
nodes=[
graph_db.Node(
id=node.id,
block_id=node.block_id,
input_default=node.input_default,
metadata=node.metadata,
)
for node in graph_data.nodes
],
links=[
graph_db.Link(
id=link.id,
source_id=link.source_id,
sink_id=link.sink_id,
source_name=link.source_name,
sink_name=link.sink_name,
is_static=link.is_static,
)
for link in graph_data.links
],
)
current_active_version = next((v for v in existing_versions if v.is_active), None)
graph = graph_db.make_graph_model(internal_graph, auth.user_id)
graph.reassign_ids(user_id=auth.user_id, reassign_graph_id=False)
graph.validate_graph(for_run=False)
new_graph_version = await graph_db.create_graph(graph, user_id=auth.user_id)
if new_graph_version.is_active:
await library_db.update_agent_version_in_library(
auth.user_id, new_graph_version.id, new_graph_version.version
)
new_graph_version = await on_graph_activate(
new_graph_version, user_id=auth.user_id
)
await graph_db.set_graph_active_version(
graph_id=graph_id, version=new_graph_version.version, user_id=auth.user_id
)
if current_active_version:
await on_graph_deactivate(current_active_version, user_id=auth.user_id)
new_graph_version_with_subgraphs = await graph_db.get_graph(
graph_id,
new_graph_version.version,
user_id=auth.user_id,
include_subgraphs=True,
)
assert new_graph_version_with_subgraphs
return _convert_graph_details(new_graph_version_with_subgraphs)
@graphs_router.delete(
path="/{graph_id}",
summary="Delete graph permanently",
response_model=DeleteGraphResponse,
)
async def delete_graph(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> DeleteGraphResponse:
"""
Permanently delete a graph and all its versions.
This action cannot be undone. All associated executions will remain
but will reference a deleted graph.
"""
if active_version := await graph_db.get_graph(
graph_id=graph_id, version=None, user_id=auth.user_id
):
await on_graph_deactivate(active_version, user_id=auth.user_id)
version_count = await graph_db.delete_graph(graph_id, user_id=auth.user_id)
return DeleteGraphResponse(version_count=version_count)
@graphs_router.get(
path="/{graph_id}/versions",
summary="List all graph versions",
response_model=list[GraphDetails],
)
async def list_graph_versions(
graph_id: str,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_GRAPH)
),
) -> list[GraphDetails]:
"""
Get all versions of a specific graph.
Returns a list of all versions, with the active version marked.
"""
graphs = await graph_db.get_graph_all_versions(graph_id, user_id=auth.user_id)
if not graphs:
raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
return [_convert_graph_details(g) for g in graphs]
@graphs_router.put(
path="/{graph_id}/versions/active",
summary="Set active graph version",
)
async def set_active_version(
graph_id: str,
request_body: SetActiveVersionRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> None:
"""
Set which version of a graph is the active version.
The active version is used when executing the graph without specifying
a version number.
"""
# Import here to avoid circular imports
from backend.api.features.library import db as library_db
new_active_version = request_body.active_graph_version
new_active_graph = await graph_db.get_graph(
graph_id, new_active_version, user_id=auth.user_id
)
if not new_active_graph:
raise HTTPException(404, f"Graph #{graph_id} v{new_active_version} not found")
current_active_graph = await graph_db.get_graph(
graph_id=graph_id,
version=None,
user_id=auth.user_id,
)
await on_graph_activate(new_active_graph, user_id=auth.user_id)
await graph_db.set_graph_active_version(
graph_id=graph_id,
version=new_active_version,
user_id=auth.user_id,
)
await library_db.update_agent_version_in_library(
auth.user_id, new_active_graph.id, new_active_graph.version
)
if current_active_graph and current_active_graph.version != new_active_version:
await on_graph_deactivate(current_active_graph, user_id=auth.user_id)
@graphs_router.patch(
path="/{graph_id}/settings",
summary="Update graph settings",
response_model=GraphSettings,
)
async def update_graph_settings(
graph_id: str,
settings: GraphSettings,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_GRAPH)
),
) -> GraphSettings:
"""
Update settings for a graph.
Currently supports:
- human_in_the_loop_safe_mode: Enable/disable safe mode for human-in-the-loop blocks
"""
# Import here to avoid circular imports
from backend.api.features.library import db as library_db
from backend.data.graph import GraphSettings as InternalGraphSettings
library_agent = await library_db.get_library_agent_by_graph_id(
graph_id=graph_id, user_id=auth.user_id
)
if not library_agent:
raise HTTPException(404, f"Graph #{graph_id} not found in user's library")
# Convert to internal model
internal_settings = InternalGraphSettings(
human_in_the_loop_safe_mode=settings.human_in_the_loop_safe_mode
)
updated_agent = await library_db.update_library_agent_settings(
user_id=auth.user_id,
agent_id=library_agent.id,
settings=internal_settings,
)
return GraphSettings(
human_in_the_loop_safe_mode=updated_agent.settings.human_in_the_loop_safe_mode
)
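
For reference, a hedged sketch of the request payload shape implied by the node/link conversion code above (the exact required and optional fields live in the removed models module). Block IDs, pin names, and node IDs are placeholders; a real payload must reference blocks that exist on the platform:

minimal_graph_payload = {
    "graph": {
        "version": 1,
        "is_active": True,
        "name": "Hello graph",
        "description": "Two hypothetical blocks wired together",
        "nodes": [
            {"id": "node-1", "block_id": "<input-block-id>", "input_default": {}, "metadata": {}},
            {"id": "node-2", "block_id": "<output-block-id>", "input_default": {}, "metadata": {}},
        ],
        "links": [
            {
                "id": "link-1",
                "source_id": "node-1",
                "sink_id": "node-2",
                "source_name": "result",   # output pin on node-1 (placeholder name)
                "sink_name": "value",      # input pin on node-2 (placeholder name)
                "is_static": False,
            }
        ],
    }
}
# POST this body to the graphs collection endpoint (WRITE_GRAPH permission);
# the server reassigns IDs and returns the created graph's details.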

View File

@@ -1,271 +0,0 @@
"""
V2 External API - Integrations Endpoints
Provides access to user's integration credentials.
"""
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Path, Security
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field
from backend.api.external.middleware import require_permission
from backend.api.features.library import db as library_db
from backend.data import graph as graph_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.model import Credentials, OAuth2Credentials
from backend.integrations.creds_manager import IntegrationCredentialsManager
logger = logging.getLogger(__name__)
integrations_router = APIRouter()
creds_manager = IntegrationCredentialsManager()
# ============================================================================
# Models
# ============================================================================
class Credential(BaseModel):
"""A user's credential for an integration."""
id: str
provider: str = Field(description="Integration provider name")
title: Optional[str] = Field(
default=None, description="User-assigned title for this credential"
)
scopes: list[str] = Field(default_factory=list, description="Granted scopes")
class CredentialsListResponse(BaseModel):
"""Response for listing credentials."""
credentials: list[Credential]
class CredentialRequirement(BaseModel):
"""A credential requirement for a graph or agent."""
provider: str = Field(description="Required provider name")
required_scopes: list[str] = Field(
default_factory=list, description="Required scopes"
)
matching_credentials: list[Credential] = Field(
default_factory=list,
description="User's credentials that match this requirement",
)
class CredentialRequirementsResponse(BaseModel):
"""Response for listing credential requirements."""
requirements: list[CredentialRequirement]
# ============================================================================
# Conversion Functions
# ============================================================================
def _convert_credential(cred: Credentials) -> Credential:
"""Convert internal credential to v2 API model."""
scopes: list[str] = []
if isinstance(cred, OAuth2Credentials):
scopes = cred.scopes or []
return Credential(
id=cred.id,
provider=cred.provider,
title=cred.title,
scopes=scopes,
)
# ============================================================================
# Endpoints
# ============================================================================
@integrations_router.get(
path="/credentials",
summary="List all credentials",
response_model=CredentialsListResponse,
)
async def list_credentials(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_INTEGRATIONS)
),
) -> CredentialsListResponse:
"""
List all integration credentials for the authenticated user.
This returns all OAuth credentials the user has connected, across
all integration providers.
"""
credentials = await creds_manager.store.get_all_creds(auth.user_id)
return CredentialsListResponse(
credentials=[_convert_credential(c) for c in credentials]
)
@integrations_router.get(
path="/credentials/{provider}",
summary="List credentials by provider",
response_model=CredentialsListResponse,
)
async def list_credentials_by_provider(
provider: str = Path(description="Provider name (e.g., 'github', 'google')"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_INTEGRATIONS)
),
) -> CredentialsListResponse:
"""
List integration credentials for a specific provider.
"""
all_credentials = await creds_manager.store.get_all_creds(auth.user_id)
# Filter by provider
filtered = [c for c in all_credentials if c.provider.lower() == provider.lower()]
return CredentialsListResponse(
credentials=[_convert_credential(c) for c in filtered]
)
@integrations_router.get(
path="/graphs/{graph_id}/credentials",
summary="List credentials matching graph requirements",
response_model=CredentialRequirementsResponse,
)
async def list_graph_credential_requirements(
graph_id: str = Path(description="Graph ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_INTEGRATIONS)
),
) -> CredentialRequirementsResponse:
"""
List credential requirements for a graph and matching user credentials.
This helps identify which credentials the user needs to provide
when executing a graph.
"""
# Get the graph
graph = await graph_db.get_graph(
graph_id=graph_id,
version=None, # Active version
user_id=auth.user_id,
include_subgraphs=True,
)
if not graph:
raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found")
# Get the credentials input schema which contains provider requirements
creds_schema = graph.credentials_input_schema
all_credentials = await creds_manager.store.get_all_creds(auth.user_id)
requirements = []
for field_name, field_schema in creds_schema.get("properties", {}).items():
# Extract provider from schema
# The schema structure varies, but typically has provider info
providers = []
if "anyOf" in field_schema:
for option in field_schema["anyOf"]:
if "provider" in option:
providers.append(option["provider"])
elif "provider" in field_schema:
providers.append(field_schema["provider"])
for provider in providers:
# Find matching credentials
matching = [
_convert_credential(c)
for c in all_credentials
if c.provider.lower() == provider.lower()
]
requirements.append(
CredentialRequirement(
provider=provider,
required_scopes=[], # Would need to extract from schema
matching_credentials=matching,
)
)
return CredentialRequirementsResponse(requirements=requirements)
@integrations_router.get(
path="/library/{agent_id}/credentials",
summary="List credentials matching library agent requirements",
response_model=CredentialRequirementsResponse,
)
async def list_library_agent_credential_requirements(
agent_id: str = Path(description="Library agent ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_INTEGRATIONS)
),
) -> CredentialRequirementsResponse:
"""
List credential requirements for a library agent and matching user credentials.
This helps identify which credentials the user needs to provide
when executing an agent from their library.
"""
# Get the library agent
try:
library_agent = await library_db.get_library_agent(
id=agent_id,
user_id=auth.user_id,
)
except Exception:
raise HTTPException(status_code=404, detail=f"Agent #{agent_id} not found")
# Get the underlying graph
graph = await graph_db.get_graph(
graph_id=library_agent.graph_id,
version=library_agent.graph_version,
user_id=auth.user_id,
include_subgraphs=True,
)
if not graph:
raise HTTPException(
status_code=404,
detail=f"Graph for agent #{agent_id} not found",
)
# Get the credentials input schema
creds_schema = graph.credentials_input_schema
all_credentials = await creds_manager.store.get_all_creds(auth.user_id)
requirements = []
for field_name, field_schema in creds_schema.get("properties", {}).items():
# Extract provider from schema
providers = []
if "anyOf" in field_schema:
for option in field_schema["anyOf"]:
if "provider" in option:
providers.append(option["provider"])
elif "provider" in field_schema:
providers.append(field_schema["provider"])
for provider in providers:
# Find matching credentials
matching = [
_convert_credential(c)
for c in all_credentials
if c.provider.lower() == provider.lower()
]
requirements.append(
CredentialRequirement(
provider=provider,
required_scopes=[],
matching_credentials=matching,
)
)
return CredentialRequirementsResponse(requirements=requirements)
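
A hypothetical credentials_input_schema fragment illustrating the shape the requirement-matching loops above expect. Real schemas are generated by the platform and may carry additional keys; only the properties/anyOf/provider structure is taken from the parsing logic:

example_creds_schema = {
    "properties": {
        "github_credentials": {
            "anyOf": [
                {"provider": "github", "type": "object"},
            ],
        },
        "openai_credentials": {"provider": "openai", "type": "object"},
    }
}

providers: list[str] = []
for field_name, field_schema in example_creds_schema["properties"].items():
    if "anyOf" in field_schema:
        providers += [o["provider"] for o in field_schema["anyOf"] if "provider" in o]
    elif "provider" in field_schema:
        providers.append(field_schema["provider"])

# providers == ["github", "openai"]; each provider is then matched against the
# user's stored credentials by case-insensitive provider name.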

View File

@@ -1,247 +0,0 @@
"""
V2 External API - Library Endpoints
Provides access to the user's agent library and agent execution.
"""
import logging
from fastapi import APIRouter, HTTPException, Path, Query, Security
from prisma.enums import APIKeyPermission
from backend.api.external.middleware import require_permission
from backend.api.features.library import db as library_db
from backend.api.features.library import model as library_model
from backend.data import execution as execution_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.credit import get_user_credit_model
from backend.executor import utils as execution_utils
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from .models import (
ExecuteAgentRequest,
LibraryAgent,
LibraryAgentsResponse,
Run,
RunsListResponse,
)
logger = logging.getLogger(__name__)
library_router = APIRouter()
# ============================================================================
# Conversion Functions
# ============================================================================
def _convert_library_agent(agent: library_model.LibraryAgent) -> LibraryAgent:
"""Convert internal LibraryAgent to v2 API model."""
return LibraryAgent(
id=agent.id,
graph_id=agent.graph_id,
graph_version=agent.graph_version,
name=agent.name,
description=agent.description,
is_favorite=agent.is_favorite,
can_access_graph=agent.can_access_graph,
is_latest_version=agent.is_latest_version,
image_url=agent.image_url,
creator_name=agent.creator_name,
input_schema=agent.input_schema,
output_schema=agent.output_schema,
created_at=agent.created_at,
updated_at=agent.updated_at,
)
def _convert_execution_to_run(exec: execution_db.GraphExecutionMeta) -> Run:
"""Convert internal execution to v2 API Run model."""
return Run(
id=exec.id,
graph_id=exec.graph_id,
graph_version=exec.graph_version,
status=exec.status.value,
started_at=exec.started_at,
ended_at=exec.ended_at,
inputs=exec.inputs,
cost=exec.stats.cost if exec.stats else 0,
duration=exec.stats.duration if exec.stats else 0,
node_count=exec.stats.node_exec_count if exec.stats else 0,
)
# ============================================================================
# Endpoints
# ============================================================================
@library_router.get(
path="/agents",
summary="List library agents",
response_model=LibraryAgentsResponse,
)
async def list_library_agents(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> LibraryAgentsResponse:
"""
List agents in the user's library.
The library contains agents the user has created or added from the marketplace.
"""
result = await library_db.list_library_agents(
user_id=auth.user_id,
page=page,
page_size=page_size,
)
return LibraryAgentsResponse(
agents=[_convert_library_agent(a) for a in result.agents],
total_count=result.pagination.total_items,
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_pages=result.pagination.total_pages,
)
@library_router.get(
path="/agents/favorites",
summary="List favorite agents",
response_model=LibraryAgentsResponse,
)
async def list_favorite_agents(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> LibraryAgentsResponse:
"""
List favorite agents in the user's library.
"""
result = await library_db.list_favorite_library_agents(
user_id=auth.user_id,
page=page,
page_size=page_size,
)
return LibraryAgentsResponse(
agents=[_convert_library_agent(a) for a in result.agents],
total_count=result.pagination.total_items,
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_pages=result.pagination.total_pages,
)
@library_router.post(
path="/agents/{agent_id}/runs",
summary="Execute an agent",
response_model=Run,
)
async def execute_agent(
request: ExecuteAgentRequest,
agent_id: str = Path(description="Library agent ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.RUN_AGENT)
),
) -> Run:
"""
Execute an agent from the library.
This creates a new run with the provided inputs. The run executes
asynchronously and you can poll the run status using GET /runs/{run_id}.
"""
# Check credit balance
user_credit_model = await get_user_credit_model(auth.user_id)
current_balance = await user_credit_model.get_credits(auth.user_id)
if current_balance <= 0:
raise HTTPException(
status_code=402,
detail="Insufficient balance to execute the agent. Please top up your account.",
)
# Get the library agent to find the graph ID and version
try:
library_agent = await library_db.get_library_agent(
id=agent_id,
user_id=auth.user_id,
)
except Exception:
raise HTTPException(status_code=404, detail=f"Agent #{agent_id} not found")
try:
result = await execution_utils.add_graph_execution(
graph_id=library_agent.graph_id,
user_id=auth.user_id,
inputs=request.inputs,
graph_version=library_agent.graph_version,
graph_credentials_inputs=request.credentials_inputs,
)
return _convert_execution_to_run(result)
except Exception as e:
logger.error(f"Failed to execute agent: {e}")
raise HTTPException(status_code=400, detail=str(e))
@library_router.get(
path="/agents/{agent_id}/runs",
summary="List runs for an agent",
response_model=RunsListResponse,
)
async def list_agent_runs(
agent_id: str = Path(description="Library agent ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_LIBRARY)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> RunsListResponse:
"""
List execution runs for a specific agent.
"""
# Get the library agent to find the graph ID
try:
library_agent = await library_db.get_library_agent(
id=agent_id,
user_id=auth.user_id,
)
except Exception:
raise HTTPException(status_code=404, detail=f"Agent #{agent_id} not found")
result = await execution_db.get_graph_executions_paginated(
graph_id=library_agent.graph_id,
user_id=auth.user_id,
page=page,
page_size=page_size,
)
return RunsListResponse(
runs=[_convert_execution_to_run(e) for e in result.executions],
total_count=result.pagination.total_items,
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_pages=result.pagination.total_pages,
)
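
A hedged sketch of the execute-and-poll flow described above. The base URL, the /library and /runs path prefixes, the "X-API-Key" header, and the status names in the loop are assumptions; the request body fields (inputs, credentials_inputs) and polling GET /runs/{run_id} come from the endpoint documentation:

import time
import httpx

BASE_URL = "https://example-host/external-api/v2"  # assumption: deployment-specific
HEADERS = {"X-API-Key": "YOUR_API_KEY"}            # assumption: actual auth header may differ

def run_agent(agent_id: str, inputs: dict) -> dict:
    resp = httpx.post(
        f"{BASE_URL}/library/agents/{agent_id}/runs",
        json={"inputs": inputs, "credentials_inputs": {}},
        headers=HEADERS,
    )
    resp.raise_for_status()
    run = resp.json()
    # Execution is asynchronous: poll until the run reaches a terminal state.
    # The status names below are assumptions about the platform's execution states.
    while run["status"] in ("QUEUED", "RUNNING"):
        time.sleep(2)
        run = httpx.get(f"{BASE_URL}/runs/{run['id']}", headers=HEADERS).json()
    return run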

View File

@@ -1,510 +0,0 @@
"""
V2 External API - Marketplace Endpoints
Provides access to the agent marketplace (store).
"""
import logging
import urllib.parse
from datetime import datetime
from typing import Literal, Optional
from fastapi import APIRouter, HTTPException, Path, Query, Security
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field
from backend.api.external.middleware import require_permission
from backend.api.features.store import cache as store_cache
from backend.api.features.store import db as store_db
from backend.api.features.store import model as store_model
from backend.data.auth.base import APIAuthorizationInfo
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
logger = logging.getLogger(__name__)
marketplace_router = APIRouter()
# ============================================================================
# Models
# ============================================================================
class MarketplaceAgent(BaseModel):
"""An agent available in the marketplace."""
slug: str
name: str
description: str
sub_heading: str
creator: str
creator_avatar: str
runs: int = Field(default=0, description="Number of times this agent has been run")
rating: float = Field(default=0.0, description="Average rating")
image_url: str = Field(default="")
class MarketplaceAgentDetails(BaseModel):
"""Detailed information about a marketplace agent."""
store_listing_version_id: str
slug: str
name: str
description: str
sub_heading: str
instructions: Optional[str] = None
creator: str
creator_avatar: str
categories: list[str] = Field(default_factory=list)
runs: int = Field(default=0)
rating: float = Field(default=0.0)
image_urls: list[str] = Field(default_factory=list)
video_url: str = Field(default="")
versions: list[str] = Field(default_factory=list, description="Available versions")
agent_graph_versions: list[str] = Field(default_factory=list)
agent_graph_id: str
last_updated: datetime
class MarketplaceAgentsResponse(BaseModel):
"""Response for listing marketplace agents."""
agents: list[MarketplaceAgent]
total_count: int
page: int
page_size: int
total_pages: int
class MarketplaceCreator(BaseModel):
"""A creator on the marketplace."""
name: str
username: str
description: str
avatar_url: str
num_agents: int
agent_rating: float
agent_runs: int
is_featured: bool = False
class MarketplaceCreatorDetails(BaseModel):
"""Detailed information about a marketplace creator."""
name: str
username: str
description: str
avatar_url: str
agent_rating: float
agent_runs: int
top_categories: list[str] = Field(default_factory=list)
links: list[str] = Field(default_factory=list)
class MarketplaceCreatorsResponse(BaseModel):
"""Response for listing marketplace creators."""
creators: list[MarketplaceCreator]
total_count: int
page: int
page_size: int
total_pages: int
class MarketplaceSubmission(BaseModel):
"""A marketplace submission."""
graph_id: str
graph_version: int
name: str
sub_heading: str
slug: str
description: str
instructions: Optional[str] = None
image_urls: list[str] = Field(default_factory=list)
date_submitted: datetime
status: str = Field(description="One of: DRAFT, PENDING, APPROVED, REJECTED")
runs: int = Field(default=0)
rating: float = Field(default=0.0)
store_listing_version_id: Optional[str] = None
version: Optional[int] = None
review_comments: Optional[str] = None
reviewed_at: Optional[datetime] = None
video_url: Optional[str] = None
categories: list[str] = Field(default_factory=list)
class SubmissionsListResponse(BaseModel):
"""Response for listing submissions."""
submissions: list[MarketplaceSubmission]
total_count: int
page: int
page_size: int
total_pages: int
class CreateSubmissionRequest(BaseModel):
"""Request to create a marketplace submission."""
graph_id: str = Field(description="ID of the graph to submit")
graph_version: int = Field(description="Version of the graph to submit")
name: str = Field(description="Display name for the agent")
slug: str = Field(description="URL-friendly identifier")
description: str = Field(description="Full description")
sub_heading: str = Field(description="Short tagline")
image_urls: list[str] = Field(default_factory=list)
video_url: Optional[str] = None
categories: list[str] = Field(default_factory=list)
# ============================================================================
# Conversion Functions
# ============================================================================
def _convert_store_agent(agent: store_model.StoreAgent) -> MarketplaceAgent:
"""Convert internal StoreAgent to v2 API model."""
return MarketplaceAgent(
slug=agent.slug,
name=agent.agent_name,
description=agent.description,
sub_heading=agent.sub_heading,
creator=agent.creator,
creator_avatar=agent.creator_avatar,
runs=agent.runs,
rating=agent.rating,
image_url=agent.agent_image,
)
def _convert_store_agent_details(
agent: store_model.StoreAgentDetails,
) -> MarketplaceAgentDetails:
"""Convert internal StoreAgentDetails to v2 API model."""
return MarketplaceAgentDetails(
store_listing_version_id=agent.store_listing_version_id,
slug=agent.slug,
name=agent.agent_name,
description=agent.description,
sub_heading=agent.sub_heading,
instructions=agent.instructions,
creator=agent.creator,
creator_avatar=agent.creator_avatar,
categories=agent.categories,
runs=agent.runs,
rating=agent.rating,
image_urls=agent.agent_image,
video_url=agent.agent_video,
versions=agent.versions,
agent_graph_versions=agent.agentGraphVersions,
agent_graph_id=agent.agentGraphId,
last_updated=agent.last_updated,
)
def _convert_creator(creator: store_model.Creator) -> MarketplaceCreator:
"""Convert internal Creator to v2 API model."""
return MarketplaceCreator(
name=creator.name,
username=creator.username,
description=creator.description,
avatar_url=creator.avatar_url,
num_agents=creator.num_agents,
agent_rating=creator.agent_rating,
agent_runs=creator.agent_runs,
is_featured=creator.is_featured,
)
def _convert_creator_details(
creator: store_model.CreatorDetails,
) -> MarketplaceCreatorDetails:
"""Convert internal CreatorDetails to v2 API model."""
return MarketplaceCreatorDetails(
name=creator.name,
username=creator.username,
description=creator.description,
avatar_url=creator.avatar_url,
agent_rating=creator.agent_rating,
agent_runs=creator.agent_runs,
top_categories=creator.top_categories,
links=creator.links,
)
def _convert_submission(sub: store_model.StoreSubmission) -> MarketplaceSubmission:
"""Convert internal StoreSubmission to v2 API model."""
return MarketplaceSubmission(
graph_id=sub.agent_id,
graph_version=sub.agent_version,
name=sub.name,
sub_heading=sub.sub_heading,
slug=sub.slug,
description=sub.description,
instructions=sub.instructions,
image_urls=sub.image_urls,
date_submitted=sub.date_submitted,
status=sub.status.value,
runs=sub.runs,
rating=sub.rating,
store_listing_version_id=sub.store_listing_version_id,
version=sub.version,
review_comments=sub.review_comments,
reviewed_at=sub.reviewed_at,
video_url=sub.video_url,
categories=sub.categories,
)
# ============================================================================
# Endpoints - Read (authenticated)
# ============================================================================
@marketplace_router.get(
path="/agents",
summary="List marketplace agents",
response_model=MarketplaceAgentsResponse,
)
async def list_agents(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
featured: bool = Query(default=False, description="Filter to featured agents only"),
creator: Optional[str] = Query(
default=None, description="Filter by creator username"
),
sorted_by: Optional[Literal["rating", "runs", "name", "updated_at"]] = Query(
default=None, description="Sort field"
),
search_query: Optional[str] = Query(default=None, description="Search query"),
category: Optional[str] = Query(default=None, description="Filter by category"),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> MarketplaceAgentsResponse:
"""
List agents available in the marketplace.
Supports filtering by featured status, creator, category, and search query.
Results can be sorted by rating, runs, name, or update time.
"""
result = await store_cache._get_cached_store_agents(
featured=featured,
creator=creator,
sorted_by=sorted_by,
search_query=search_query,
category=category,
page=page,
page_size=page_size,
)
return MarketplaceAgentsResponse(
agents=[_convert_store_agent(a) for a in result.agents],
total_count=result.pagination.total_items,
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_pages=result.pagination.total_pages,
)
@marketplace_router.get(
path="/agents/{username}/{agent_name}",
summary="Get agent details",
response_model=MarketplaceAgentDetails,
)
async def get_agent_details(
username: str = Path(description="Creator username"),
agent_name: str = Path(description="Agent slug/name"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
) -> MarketplaceAgentDetails:
"""
Get detailed information about a specific marketplace agent.
"""
username = urllib.parse.unquote(username).lower()
agent_name = urllib.parse.unquote(agent_name).lower()
agent = await store_cache._get_cached_agent_details(
username=username, agent_name=agent_name
)
return _convert_store_agent_details(agent)
@marketplace_router.get(
path="/creators",
summary="List marketplace creators",
response_model=MarketplaceCreatorsResponse,
)
async def list_creators(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
featured: bool = Query(
default=False, description="Filter to featured creators only"
),
search_query: Optional[str] = Query(default=None, description="Search query"),
sorted_by: Optional[Literal["agent_rating", "agent_runs", "num_agents"]] = Query(
default=None, description="Sort field"
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> MarketplaceCreatorsResponse:
"""
List creators on the marketplace.
Supports filtering by featured status and search query.
Results can be sorted by rating, runs, or number of agents.
"""
result = await store_cache._get_cached_store_creators(
featured=featured,
search_query=search_query,
sorted_by=sorted_by,
page=page,
page_size=page_size,
)
return MarketplaceCreatorsResponse(
creators=[_convert_creator(c) for c in result.creators],
total_count=result.pagination.total_items,
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_pages=result.pagination.total_pages,
)
@marketplace_router.get(
path="/creators/{username}",
summary="Get creator details",
response_model=MarketplaceCreatorDetails,
)
async def get_creator_details(
username: str = Path(description="Creator username"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
) -> MarketplaceCreatorDetails:
"""
Get detailed information about a specific marketplace creator.
"""
username = urllib.parse.unquote(username).lower()
creator = await store_cache._get_cached_creator_details(username=username)
return _convert_creator_details(creator)
# ============================================================================
# Endpoints - Submissions (CRUD)
# ============================================================================
@marketplace_router.get(
path="/submissions",
summary="List my submissions",
response_model=SubmissionsListResponse,
)
async def list_submissions(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_STORE)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> SubmissionsListResponse:
"""
List your marketplace submissions.
Returns all submissions you've created, including drafts, pending,
approved, and rejected submissions.
"""
result = await store_db.get_store_submissions(
user_id=auth.user_id,
page=page,
page_size=page_size,
)
return SubmissionsListResponse(
submissions=[_convert_submission(s) for s in result.submissions],
total_count=result.pagination.total_items,
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_pages=result.pagination.total_pages,
)
@marketplace_router.post(
path="/submissions",
summary="Create a submission",
response_model=MarketplaceSubmission,
)
async def create_submission(
request: CreateSubmissionRequest,
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_STORE)
),
) -> MarketplaceSubmission:
"""
Create a new marketplace submission.
This submits an agent for review to be published in the marketplace.
The submission will be in PENDING status until reviewed by the team.
"""
submission = await store_db.create_store_submission(
user_id=auth.user_id,
agent_id=request.graph_id,
agent_version=request.graph_version,
slug=request.slug,
name=request.name,
sub_heading=request.sub_heading,
description=request.description,
image_urls=request.image_urls,
video_url=request.video_url,
categories=request.categories,
)
return _convert_submission(submission)
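# Editor's sketch (not in the original file): a request body using the fields this
# endpoint reads from CreateSubmissionRequest. All values are placeholders; after a
# successful POST the submission sits in PENDING status until it is reviewed.
_EXAMPLE_CREATE_SUBMISSION_BODY = {
    "graph_id": "<graph-id>",
    "graph_version": 1,
    "slug": "daily-news-digest",
    "name": "Daily News Digest",
    "sub_heading": "Summarises the day's headlines",
    "description": "Collects articles from configured feeds and emails a summary.",
    "image_urls": ["https://example.invalid/cover.png"],
    "video_url": None,
    "categories": ["productivity"],
}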
@marketplace_router.delete(
path="/submissions/{submission_id}",
summary="Delete a submission",
)
async def delete_submission(
submission_id: str = Path(description="Submission ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_STORE)
),
) -> None:
"""
Delete a marketplace submission.
Only submissions in DRAFT status can be deleted.
"""
success = await store_db.delete_store_submission(
user_id=auth.user_id,
submission_id=submission_id,
)
if not success:
raise HTTPException(
status_code=404, detail=f"Submission #{submission_id} not found"
)

View File

@@ -1,552 +0,0 @@
"""
V2 External API - Request and Response Models
This module defines all request and response models for the v2 external API.
All models are self-contained and specific to the external API contract.
"""
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field
# ============================================================================
# Common/Shared Models
# ============================================================================
class PaginatedResponse(BaseModel):
"""Base class for paginated responses."""
total_count: int = Field(description="Total number of items across all pages")
page: int = Field(description="Current page number (1-indexed)")
page_size: int = Field(description="Number of items per page")
total_pages: int = Field(description="Total number of pages")
# ============================================================================
# Graph Models
# ============================================================================
class GraphLink(BaseModel):
"""A link between two nodes in a graph."""
id: str
source_id: str = Field(description="ID of the source node")
sink_id: str = Field(description="ID of the target node")
source_name: str = Field(description="Output pin name on source node")
sink_name: str = Field(description="Input pin name on target node")
is_static: bool = Field(
default=False, description="Whether this link provides static data"
)
class GraphNode(BaseModel):
"""A node in an agent graph."""
id: str
block_id: str = Field(description="ID of the block type")
input_default: dict[str, Any] = Field(
default_factory=dict, description="Default input values"
)
metadata: dict[str, Any] = Field(
default_factory=dict, description="Node metadata (e.g., position)"
)
class Graph(BaseModel):
"""Graph definition for creating or updating an agent."""
id: Optional[str] = Field(default=None, description="Graph ID (assigned by server)")
version: int = Field(default=1, description="Graph version")
is_active: bool = Field(default=True, description="Whether this version is active")
name: str = Field(description="Graph name")
description: str = Field(default="", description="Graph description")
nodes: list[GraphNode] = Field(default_factory=list, description="List of nodes")
links: list[GraphLink] = Field(
default_factory=list, description="Links between nodes"
)
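# Editor's sketch (illustration only): a minimal two-node graph built from the
# models above. Block IDs and pin names are placeholders, not real platform
# identifiers.
def _example_minimal_graph() -> Graph:
    node_a = GraphNode(id="node-a", block_id="<input-block-id>")
    node_b = GraphNode(id="node-b", block_id="<output-block-id>")
    link = GraphLink(
        id="link-1",
        source_id=node_a.id,
        sink_id=node_b.id,
        source_name="result",
        sink_name="value",
    )
    return Graph(
        name="Example graph",
        description="Two nodes joined by one link",
        nodes=[node_a, node_b],
        links=[link],
    )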
class GraphMeta(BaseModel):
"""Graph metadata (summary information)."""
id: str
version: int
is_active: bool
name: str
description: str
created_at: datetime
input_schema: dict[str, Any] = Field(description="Input schema for the graph")
output_schema: dict[str, Any] = Field(description="Output schema for the graph")
class GraphDetails(GraphMeta):
"""Full graph details including nodes and links."""
nodes: list[GraphNode]
links: list[GraphLink]
credentials_input_schema: dict[str, Any] = Field(
description="Schema for required credentials"
)
class GraphSettings(BaseModel):
"""Settings for a graph."""
human_in_the_loop_safe_mode: Optional[bool] = Field(
default=None, description="Enable safe mode for human-in-the-loop blocks"
)
class CreateGraphRequest(BaseModel):
"""Request to create a new graph."""
graph: Graph = Field(description="The graph definition")
class SetActiveVersionRequest(BaseModel):
"""Request to set the active graph version."""
active_graph_version: int = Field(description="Version number to set as active")
class GraphsListResponse(PaginatedResponse):
"""Response for listing graphs."""
graphs: list[GraphMeta]
class DeleteGraphResponse(BaseModel):
"""Response for deleting a graph."""
version_count: int = Field(description="Number of versions deleted")
# ============================================================================
# Schedule Models
# ============================================================================
class Schedule(BaseModel):
"""An execution schedule for a graph."""
id: str
name: str
graph_id: str
graph_version: int
cron: str = Field(description="Cron expression for the schedule")
input_data: dict[str, Any] = Field(
default_factory=dict, description="Input data for scheduled executions"
)
is_enabled: bool = Field(default=True, description="Whether schedule is enabled")
next_run_time: Optional[datetime] = Field(
default=None, description="Next scheduled run time"
)
class CreateScheduleRequest(BaseModel):
"""Request to create a schedule."""
name: str = Field(description="Display name for the schedule")
cron: str = Field(description="Cron expression (e.g., '0 9 * * *' for 9am daily)")
input_data: dict[str, Any] = Field(
default_factory=dict, description="Input data for scheduled executions"
)
credentials_inputs: dict[str, Any] = Field(
default_factory=dict, description="Credentials for the schedule"
)
graph_version: Optional[int] = Field(
default=None, description="Graph version (default: active version)"
)
timezone: Optional[str] = Field(
default=None,
description="Timezone for schedule (e.g., 'America/New_York')",
)
class SchedulesListResponse(PaginatedResponse):
"""Response for listing schedules."""
schedules: list[Schedule]
# ============================================================================
# Block Models
# ============================================================================
class BlockCost(BaseModel):
"""Cost information for a block."""
cost_type: str = Field(description="Type of cost (e.g., 'per_call', 'per_token')")
cost_filter: dict[str, Any] = Field(
default_factory=dict, description="Conditions for this cost"
)
cost_amount: int = Field(description="Cost amount in credits")
class Block(BaseModel):
"""A building block that can be used in graphs."""
id: str
name: str
description: str
categories: list[str] = Field(default_factory=list)
input_schema: dict[str, Any]
output_schema: dict[str, Any]
costs: list[BlockCost] = Field(default_factory=list)
class BlocksListResponse(BaseModel):
"""Response for listing blocks."""
blocks: list[Block]
# ============================================================================
# Marketplace Models
# ============================================================================
class MarketplaceAgent(BaseModel):
"""An agent available in the marketplace."""
slug: str
agent_name: str
agent_image: str
creator: str
creator_avatar: str
sub_heading: str
description: str
runs: int = Field(default=0, description="Number of times this agent has been run")
rating: float = Field(default=0.0, description="Average rating")
class MarketplaceAgentDetails(BaseModel):
"""Detailed information about a marketplace agent."""
store_listing_version_id: str
slug: str
agent_name: str
agent_video: str
agent_output_demo: str
agent_image: list[str]
creator: str
creator_avatar: str
sub_heading: str
description: str
instructions: Optional[str] = None
categories: list[str]
runs: int
rating: float
versions: list[str]
agent_graph_versions: list[str]
agent_graph_id: str
last_updated: datetime
recommended_schedule_cron: Optional[str] = None
class MarketplaceCreator(BaseModel):
"""A creator on the marketplace."""
name: str
username: str
description: str
avatar_url: str
num_agents: int
agent_rating: float
agent_runs: int
is_featured: bool = False
class MarketplaceAgentsResponse(PaginatedResponse):
"""Response for listing marketplace agents."""
agents: list[MarketplaceAgent]
class MarketplaceCreatorsResponse(PaginatedResponse):
"""Response for listing marketplace creators."""
creators: list[MarketplaceCreator]
# ============================================================================
# Submission Models
# ============================================================================
class MarketplaceSubmission(BaseModel):
"""A marketplace submission."""
agent_id: str
agent_version: int
name: str
sub_heading: str
slug: str
description: str
instructions: Optional[str] = None
image_urls: list[str] = Field(default_factory=list)
date_submitted: datetime
status: str = Field(description="One of: DRAFT, PENDING, APPROVED, REJECTED")
runs: int
rating: float
store_listing_version_id: Optional[str] = None
version: Optional[int] = None
# Review fields
review_comments: Optional[str] = None
reviewed_at: Optional[datetime] = None
# Additional optional fields
video_url: Optional[str] = None
categories: list[str] = Field(default_factory=list)
class CreateSubmissionRequest(BaseModel):
"""Request to create a marketplace submission."""
agent_id: str = Field(description="ID of the graph to submit")
agent_version: int = Field(description="Version of the graph to submit")
name: str = Field(description="Display name for the agent")
slug: str = Field(description="URL-friendly identifier")
description: str = Field(description="Full description")
sub_heading: str = Field(description="Short tagline")
image_urls: list[str] = Field(default_factory=list)
video_url: Optional[str] = None
categories: list[str] = Field(default_factory=list)
class UpdateSubmissionRequest(BaseModel):
"""Request to update a marketplace submission."""
name: Optional[str] = None
description: Optional[str] = None
sub_heading: Optional[str] = None
image_urls: Optional[list[str]] = None
video_url: Optional[str] = None
categories: Optional[list[str]] = None
class SubmissionsListResponse(PaginatedResponse):
"""Response for listing submissions."""
submissions: list[MarketplaceSubmission]
# ============================================================================
# Library Models
# ============================================================================
class LibraryAgent(BaseModel):
"""An agent in the user's library."""
id: str
graph_id: str
graph_version: int
name: str
description: str
is_favorite: bool = False
can_access_graph: bool = False
is_latest_version: bool = False
image_url: Optional[str] = None
creator_name: str
input_schema: dict[str, Any] = Field(description="Input schema for the agent")
output_schema: dict[str, Any] = Field(description="Output schema for the agent")
created_at: datetime
updated_at: datetime
class LibraryAgentsResponse(PaginatedResponse):
"""Response for listing library agents."""
agents: list[LibraryAgent]
class ExecuteAgentRequest(BaseModel):
"""Request to execute an agent."""
inputs: dict[str, Any] = Field(
default_factory=dict, description="Input values for the agent"
)
credentials_inputs: dict[str, Any] = Field(
default_factory=dict, description="Credentials for the agent"
)
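# Editor's sketch: an execution request for an agent whose input_schema exposes a
# single "topic" field. The key name is a placeholder dictated by the agent's own
# input schema; credentials_inputs stays empty when no credentials are required.
_EXAMPLE_EXECUTE_REQUEST = ExecuteAgentRequest(
    inputs={"topic": "weekly report"},
    credentials_inputs={},
)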
# ============================================================================
# Run Models
# ============================================================================
class Run(BaseModel):
"""An execution run."""
id: str
graph_id: str
graph_version: int
status: str = Field(
description="One of: INCOMPLETE, QUEUED, RUNNING, COMPLETED, TERMINATED, FAILED, REVIEW"
)
started_at: datetime
ended_at: Optional[datetime] = None
inputs: Optional[dict[str, Any]] = None
cost: int = Field(default=0, description="Cost in credits")
duration: float = Field(default=0, description="Duration in seconds")
node_count: int = Field(default=0, description="Number of nodes executed")
class RunDetails(Run):
"""Detailed information about a run including node executions."""
outputs: Optional[dict[str, list[Any]]] = None
node_executions: list[dict[str, Any]] = Field(
default_factory=list, description="Individual node execution results"
)
class RunsListResponse(PaginatedResponse):
"""Response for listing runs."""
runs: list[Run]
# ============================================================================
# Run Review Models (Human-in-the-loop)
# ============================================================================
class PendingReview(BaseModel):
"""A pending human-in-the-loop review."""
id: str # node_exec_id
run_id: str
graph_id: str
graph_version: int
payload: Any = Field(description="Data to be reviewed")
instructions: Optional[str] = Field(
default=None, description="Instructions for the reviewer"
)
editable: bool = Field(
default=True, description="Whether the reviewer can edit the data"
)
status: str = Field(description="One of: WAITING, APPROVED, REJECTED")
created_at: datetime
class PendingReviewsResponse(PaginatedResponse):
"""Response for listing pending reviews."""
reviews: list[PendingReview]
class ReviewDecision(BaseModel):
"""Decision for a single review item."""
node_exec_id: str = Field(description="Node execution ID (review ID)")
approved: bool = Field(description="Whether to approve the data")
edited_payload: Optional[Any] = Field(
default=None, description="Modified payload data (if editing)"
)
message: Optional[str] = Field(
default=None, description="Optional message from reviewer", max_length=2000
)
class SubmitReviewsRequest(BaseModel):
"""Request to submit review responses for all pending reviews of an execution."""
reviews: list[ReviewDecision] = Field(
description="All review decisions for the execution"
)
class SubmitReviewsResponse(BaseModel):
"""Response after submitting reviews."""
run_id: str
approved_count: int = Field(description="Number of reviews approved")
rejected_count: int = Field(description="Number of reviews rejected")
# ============================================================================
# Credit Models
# ============================================================================
class CreditBalance(BaseModel):
"""User's credit balance."""
balance: int = Field(description="Current credit balance")
class CreditTransaction(BaseModel):
"""A credit transaction."""
transaction_key: str
amount: int
transaction_type: str = Field(description="Transaction type")
transaction_time: datetime
running_balance: int
description: Optional[str] = None
class CreditTransactionsResponse(PaginatedResponse):
"""Response for listing credit transactions."""
transactions: list[CreditTransaction]
# ============================================================================
# Integration Models
# ============================================================================
class Credential(BaseModel):
"""A user's credential for an integration."""
id: str
provider: str = Field(description="Integration provider name")
title: Optional[str] = Field(
default=None, description="User-assigned title for this credential"
)
scopes: list[str] = Field(default_factory=list, description="Granted scopes")
class CredentialsListResponse(BaseModel):
"""Response for listing credentials."""
credentials: list[Credential]
class CredentialRequirement(BaseModel):
"""A credential requirement for a graph or agent."""
provider: str = Field(description="Required provider name")
required_scopes: list[str] = Field(
default_factory=list, description="Required scopes"
)
matching_credentials: list[Credential] = Field(
default_factory=list,
description="User's credentials that match this requirement",
)
class CredentialRequirementsResponse(BaseModel):
"""Response for listing credential requirements."""
requirements: list[CredentialRequirement]
# ============================================================================
# File Models
# ============================================================================
class UploadFileResponse(BaseModel):
"""Response after uploading a file."""
file_uri: str = Field(description="URI to reference the uploaded file")
file_name: str
size: int = Field(description="File size in bytes")
content_type: str
expires_in_hours: int

View File

@@ -1,35 +0,0 @@
"""
V2 External API Routes
This module defines the main v2 router that aggregates all v2 API endpoints.
"""
from fastapi import APIRouter
from .blocks import blocks_router
from .credits import credits_router
from .files import files_router
from .graphs import graphs_router
from .integrations import integrations_router
from .library import library_router
from .marketplace import marketplace_router
from .runs import runs_router
from .schedules import graph_schedules_router, schedules_router
v2_router = APIRouter()
# Include all sub-routers
v2_router.include_router(graphs_router, prefix="/graphs", tags=["graphs"])
v2_router.include_router(graph_schedules_router, prefix="/graphs", tags=["schedules"])
v2_router.include_router(schedules_router, prefix="/schedules", tags=["schedules"])
v2_router.include_router(blocks_router, prefix="/blocks", tags=["blocks"])
v2_router.include_router(
marketplace_router, prefix="/marketplace", tags=["marketplace"]
)
v2_router.include_router(library_router, prefix="/library", tags=["library"])
v2_router.include_router(runs_router, prefix="/runs", tags=["runs"])
v2_router.include_router(credits_router, prefix="/credits", tags=["credits"])
v2_router.include_router(
integrations_router, prefix="/integrations", tags=["integrations"]
)
v2_router.include_router(files_router, prefix="/files", tags=["files"])
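# Editor's sketch (not part of the original module): including this router on a
# FastAPI app yields paths such as /graphs/{graph_id}, /marketplace/agents and
# /runs/{run_id}; any additional prefix comes from wherever the app is mounted.
def _example_app():
    from fastapi import FastAPI

    app = FastAPI()
    app.include_router(v2_router)
    return app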

View File

@@ -1,451 +0,0 @@
"""
V2 External API - Runs Endpoints
Provides access to execution runs and human-in-the-loop reviews.
"""
import logging
from datetime import datetime
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Path, Query, Security
from prisma.enums import APIKeyPermission, ReviewStatus
from pydantic import BaseModel, Field
from backend.api.external.middleware import require_permission
from backend.api.features.executions.review.model import (
PendingHumanReviewModel,
SafeJsonData,
)
from backend.data import execution as execution_db
from backend.data import human_review as review_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.executor import utils as execution_utils
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
logger = logging.getLogger(__name__)
runs_router = APIRouter()
# ============================================================================
# Models
# ============================================================================
class Run(BaseModel):
"""An execution run."""
id: str
graph_id: str
graph_version: int
status: str = Field(
description="One of: INCOMPLETE, QUEUED, RUNNING, COMPLETED, TERMINATED, FAILED, REVIEW"
)
started_at: datetime
ended_at: Optional[datetime] = None
inputs: Optional[dict[str, Any]] = None
cost: int = Field(default=0, description="Cost in credits")
duration: float = Field(default=0, description="Duration in seconds")
node_count: int = Field(default=0, description="Number of nodes executed")
class RunDetails(Run):
"""Detailed information about a run including outputs and node executions."""
outputs: Optional[dict[str, list[Any]]] = None
node_executions: list[dict[str, Any]] = Field(
default_factory=list, description="Individual node execution results"
)
class RunsListResponse(BaseModel):
"""Response for listing runs."""
runs: list[Run]
total_count: int
page: int
page_size: int
total_pages: int
class PendingReview(BaseModel):
"""A pending human-in-the-loop review."""
id: str # node_exec_id
run_id: str
graph_id: str
graph_version: int
payload: SafeJsonData = Field(description="Data to be reviewed")
instructions: Optional[str] = Field(
default=None, description="Instructions for the reviewer"
)
editable: bool = Field(
default=True, description="Whether the reviewer can edit the data"
)
status: str = Field(description="One of: WAITING, APPROVED, REJECTED")
created_at: datetime
class PendingReviewsResponse(BaseModel):
"""Response for listing pending reviews."""
reviews: list[PendingReview]
total_count: int
page: int
page_size: int
total_pages: int
class ReviewDecision(BaseModel):
"""Decision for a single review item."""
node_exec_id: str = Field(description="Node execution ID (review ID)")
approved: bool = Field(description="Whether to approve the data")
edited_payload: Optional[SafeJsonData] = Field(
default=None, description="Modified payload data (if editing)"
)
message: Optional[str] = Field(
default=None, description="Optional message from reviewer", max_length=2000
)
class SubmitReviewsRequest(BaseModel):
"""Request to submit review responses for all pending reviews of an execution."""
reviews: list[ReviewDecision] = Field(
description="All review decisions for the execution"
)
class SubmitReviewsResponse(BaseModel):
"""Response after submitting reviews."""
run_id: str
approved_count: int = Field(description="Number of reviews approved")
rejected_count: int = Field(description="Number of reviews rejected")
# ============================================================================
# Conversion Functions
# ============================================================================
def _convert_execution_to_run(exec: execution_db.GraphExecutionMeta) -> Run:
"""Convert internal execution to v2 API Run model."""
return Run(
id=exec.id,
graph_id=exec.graph_id,
graph_version=exec.graph_version,
status=exec.status.value,
started_at=exec.started_at,
ended_at=exec.ended_at,
inputs=exec.inputs,
cost=exec.stats.cost if exec.stats else 0,
duration=exec.stats.duration if exec.stats else 0,
node_count=exec.stats.node_exec_count if exec.stats else 0,
)
def _convert_execution_to_run_details(
exec: execution_db.GraphExecutionWithNodes,
) -> RunDetails:
"""Convert internal execution with nodes to v2 API RunDetails model."""
return RunDetails(
id=exec.id,
graph_id=exec.graph_id,
graph_version=exec.graph_version,
status=exec.status.value,
started_at=exec.started_at,
ended_at=exec.ended_at,
inputs=exec.inputs,
outputs=exec.outputs,
cost=exec.stats.cost if exec.stats else 0,
duration=exec.stats.duration if exec.stats else 0,
node_count=exec.stats.node_exec_count if exec.stats else 0,
node_executions=[
{
"node_id": node.node_id,
"status": node.status.value,
"input_data": node.input_data,
"output_data": node.output_data,
"started_at": node.start_time,
"ended_at": node.end_time,
}
for node in exec.node_executions
],
)
def _convert_pending_review(review: PendingHumanReviewModel) -> PendingReview:
"""Convert internal PendingHumanReviewModel to v2 API PendingReview model."""
return PendingReview(
id=review.node_exec_id,
run_id=review.graph_exec_id,
graph_id=review.graph_id,
graph_version=review.graph_version,
payload=review.payload,
instructions=review.instructions,
editable=review.editable,
status=review.status.value,
created_at=review.created_at,
)
# ============================================================================
# Endpoints - Runs
# ============================================================================
@runs_router.get(
path="",
summary="List all runs",
response_model=RunsListResponse,
)
async def list_runs(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> RunsListResponse:
"""
List all execution runs for the authenticated user.
Returns runs across all agents, sorted by most recent first.
"""
result = await execution_db.get_graph_executions_paginated(
user_id=auth.user_id,
page=page,
page_size=page_size,
)
return RunsListResponse(
runs=[_convert_execution_to_run(e) for e in result.executions],
total_count=result.pagination.total_items,
page=result.pagination.current_page,
page_size=result.pagination.page_size,
total_pages=result.pagination.total_pages,
)
@runs_router.get(
path="/{run_id}",
summary="Get run details",
response_model=RunDetails,
)
async def get_run(
run_id: str = Path(description="Run ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN)
),
) -> RunDetails:
"""
Get detailed information about a specific run.
Includes outputs and individual node execution results.
"""
result = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
include_node_executions=True,
)
if not result:
raise HTTPException(status_code=404, detail=f"Run #{run_id} not found")
return _convert_execution_to_run_details(result)
@runs_router.post(
path="/{run_id}/stop",
summary="Stop a run",
)
async def stop_run(
run_id: str = Path(description="Run ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_RUN)
),
) -> Run:
"""
Stop a running execution.
Only runs in QUEUED or RUNNING status can be stopped.
"""
# Verify the run exists and belongs to the user
exec = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
)
if not exec:
raise HTTPException(status_code=404, detail=f"Run #{run_id} not found")
# Stop the execution
await execution_utils.stop_graph_execution(
graph_exec_id=run_id,
user_id=auth.user_id,
)
# Fetch updated execution
updated_exec = await execution_db.get_graph_execution(
user_id=auth.user_id,
execution_id=run_id,
)
if not updated_exec:
raise HTTPException(status_code=404, detail=f"Run #{run_id} not found")
return _convert_execution_to_run(updated_exec)
@runs_router.delete(
path="/{run_id}",
summary="Delete a run",
)
async def delete_run(
run_id: str = Path(description="Run ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_RUN)
),
) -> None:
"""
Delete an execution run.
This marks the run as deleted. The data may still be retained for
some time for recovery purposes.
"""
await execution_db.delete_graph_execution(
graph_exec_id=run_id,
user_id=auth.user_id,
)
# ============================================================================
# Endpoints - Reviews (Human-in-the-loop)
# ============================================================================
@runs_router.get(
path="/reviews",
summary="List all pending reviews",
response_model=PendingReviewsResponse,
)
async def list_pending_reviews(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN_REVIEW)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> PendingReviewsResponse:
"""
List all pending human-in-the-loop reviews.
These are blocks that require human approval or input before the
agent can continue execution.
"""
reviews = await review_db.get_pending_reviews_for_user(
user_id=auth.user_id,
page=page,
page_size=page_size,
)
# Note: get_pending_reviews_for_user returns a plain list rather than a
# paginated result, so the pagination metadata below is derived from the
# items that were returned.
total_count = len(reviews)
total_pages = max(1, (total_count + page_size - 1) // page_size)
return PendingReviewsResponse(
reviews=[_convert_pending_review(r) for r in reviews],
total_count=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@runs_router.get(
path="/{run_id}/reviews",
summary="List reviews for a run",
response_model=list[PendingReview],
)
async def list_run_reviews(
run_id: str = Path(description="Run ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_RUN_REVIEW)
),
) -> list[PendingReview]:
"""
List all human-in-the-loop reviews for a specific run.
"""
reviews = await review_db.get_pending_reviews_for_execution(
graph_exec_id=run_id,
user_id=auth.user_id,
)
return [_convert_pending_review(r) for r in reviews]
@runs_router.post(
path="/{run_id}/reviews",
summary="Submit review responses for a run",
response_model=SubmitReviewsResponse,
)
async def submit_reviews(
request: SubmitReviewsRequest,
run_id: str = Path(description="Run ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_RUN_REVIEW)
),
) -> SubmitReviewsResponse:
"""
Submit responses to all pending human-in-the-loop reviews for a run.
All pending reviews for the execution must be included. Approving
a review will allow the agent to continue; rejecting will terminate
execution at that point.
"""
# Build review decisions dict for process_all_reviews_for_execution
review_decisions: dict[
str, tuple[ReviewStatus, SafeJsonData | None, str | None]
] = {}
for decision in request.reviews:
status = ReviewStatus.APPROVED if decision.approved else ReviewStatus.REJECTED
review_decisions[decision.node_exec_id] = (
status,
decision.edited_payload,
decision.message,
)
try:
results = await review_db.process_all_reviews_for_execution(
user_id=auth.user_id,
review_decisions=review_decisions,
)
approved_count = sum(
1 for r in results.values() if r.status == ReviewStatus.APPROVED
)
rejected_count = sum(
1 for r in results.values() if r.status == ReviewStatus.REJECTED
)
return SubmitReviewsResponse(
run_id=run_id,
approved_count=approved_count,
rejected_count=rejected_count,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
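# Editor's sketch: building a SubmitReviewsRequest that approves every pending
# review of a run. The list of pending reviews would come from
# GET /runs/{run_id}/reviews; only the models defined above are used here.
def _example_approve_all(pending: list[PendingReview]) -> SubmitReviewsRequest:
    return SubmitReviewsRequest(
        reviews=[
            ReviewDecision(node_exec_id=review.id, approved=True)
            for review in pending
        ]
    )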

View File

@@ -1,250 +0,0 @@
"""
V2 External API - Schedules Endpoints
Provides endpoints for managing execution schedules.
"""
import logging
from datetime import datetime
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Path, Query, Security
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field
from backend.api.external.middleware import require_permission
from backend.data import graph as graph_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.user import get_user_by_id
from backend.executor import scheduler
from backend.util.clients import get_scheduler_client
from backend.util.timezone_utils import get_user_timezone_or_utc
from .common import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
logger = logging.getLogger(__name__)
schedules_router = APIRouter()
# ============================================================================
# Request/Response Models
# ============================================================================
class Schedule(BaseModel):
"""An execution schedule for a graph."""
id: str
name: str
graph_id: str
graph_version: int
cron: str = Field(description="Cron expression for the schedule")
input_data: dict[str, Any] = Field(
default_factory=dict, description="Input data for scheduled executions"
)
next_run_time: Optional[datetime] = Field(
default=None, description="Next scheduled run time"
)
is_enabled: bool = Field(default=True, description="Whether schedule is enabled")
class SchedulesListResponse(BaseModel):
"""Response for listing schedules."""
schedules: list[Schedule]
total_count: int
page: int
page_size: int
total_pages: int
class CreateScheduleRequest(BaseModel):
"""Request to create a schedule."""
name: str = Field(description="Display name for the schedule")
cron: str = Field(description="Cron expression (e.g., '0 9 * * *' for 9am daily)")
input_data: dict[str, Any] = Field(
default_factory=dict, description="Input data for scheduled executions"
)
credentials_inputs: dict[str, Any] = Field(
default_factory=dict, description="Credentials for the schedule"
)
graph_version: Optional[int] = Field(
default=None, description="Graph version (default: active version)"
)
timezone: Optional[str] = Field(
default=None,
description=(
"Timezone for schedule (e.g., 'America/New_York'). "
"Defaults to user's timezone."
),
)
def _convert_schedule(job: scheduler.GraphExecutionJobInfo) -> Schedule:
"""Convert internal schedule job info to v2 API model."""
# Parse the ISO format string to datetime
next_run = datetime.fromisoformat(job.next_run_time) if job.next_run_time else None
return Schedule(
id=job.id,
name=job.name or "",
graph_id=job.graph_id,
graph_version=job.graph_version,
cron=job.cron,
input_data=job.input_data,
next_run_time=next_run,
is_enabled=True, # All returned schedules are enabled
)
# ============================================================================
# Endpoints
# ============================================================================
@schedules_router.get(
path="",
summary="List all user schedules",
response_model=SchedulesListResponse,
)
async def list_all_schedules(
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_SCHEDULE)
),
page: int = Query(default=1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(
default=DEFAULT_PAGE_SIZE,
ge=1,
le=MAX_PAGE_SIZE,
description=f"Items per page (max {MAX_PAGE_SIZE})",
),
) -> SchedulesListResponse:
"""
List all schedules for the authenticated user across all graphs.
"""
schedules = await get_scheduler_client().get_execution_schedules(
user_id=auth.user_id
)
converted = [_convert_schedule(s) for s in schedules]
# Manual pagination (scheduler doesn't support pagination natively)
total_count = len(converted)
total_pages = (total_count + page_size - 1) // page_size if total_count > 0 else 1
start = (page - 1) * page_size
end = start + page_size
paginated = converted[start:end]
return SchedulesListResponse(
schedules=paginated,
total_count=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
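# Editor's sketch: the manual pagination above with concrete numbers. With 45
# schedules and page_size=20, total_pages is (45 + 19) // 20 == 3, and page 3
# covers items[40:60], i.e. the final five schedules.
def _example_pagination_window(
    total_count: int = 45, page: int = 3, page_size: int = 20
) -> tuple[int, slice]:
    total_pages = (total_count + page_size - 1) // page_size if total_count > 0 else 1
    start = (page - 1) * page_size
    return total_pages, slice(start, start + page_size)  # -> (3, slice(40, 60))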
@schedules_router.delete(
path="/{schedule_id}",
summary="Delete a schedule",
)
async def delete_schedule(
schedule_id: str = Path(description="Schedule ID to delete"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_SCHEDULE)
),
) -> None:
"""
Delete an execution schedule.
"""
try:
await get_scheduler_client().delete_schedule(
schedule_id=schedule_id,
user_id=auth.user_id,
)
except Exception as e:
if "not found" in str(e).lower():
raise HTTPException(
status_code=404, detail=f"Schedule #{schedule_id} not found"
)
raise
# ============================================================================
# Graph-specific Schedule Endpoints (nested under /graphs)
# These are included in the graphs router via include_router
# ============================================================================
graph_schedules_router = APIRouter()
@graph_schedules_router.get(
path="/{graph_id}/schedules",
summary="List schedules for a graph",
response_model=list[Schedule],
)
async def list_graph_schedules(
graph_id: str = Path(description="Graph ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.READ_SCHEDULE)
),
) -> list[Schedule]:
"""
List all schedules for a specific graph.
"""
schedules = await get_scheduler_client().get_execution_schedules(
user_id=auth.user_id,
graph_id=graph_id,
)
return [_convert_schedule(s) for s in schedules]
@graph_schedules_router.post(
path="/{graph_id}/schedules",
summary="Create a schedule for a graph",
response_model=Schedule,
)
async def create_graph_schedule(
request: CreateScheduleRequest,
graph_id: str = Path(description="Graph ID"),
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.WRITE_SCHEDULE)
),
) -> Schedule:
"""
Create a new execution schedule for a graph.
The schedule will execute the graph at times matching the cron expression,
using the provided input data.
"""
graph = await graph_db.get_graph(
graph_id=graph_id,
version=request.graph_version,
user_id=auth.user_id,
)
if not graph:
raise HTTPException(
status_code=404,
detail=f"Graph #{graph_id} v{request.graph_version} not found.",
)
# Determine timezone
if request.timezone:
user_timezone = request.timezone
else:
user = await get_user_by_id(auth.user_id)
user_timezone = get_user_timezone_or_utc(user.timezone if user else None)
result = await get_scheduler_client().add_execution_schedule(
user_id=auth.user_id,
graph_id=graph_id,
graph_version=graph.version,
name=request.name,
cron=request.cron,
input_data=request.input_data,
input_credentials=request.credentials_inputs,
user_timezone=user_timezone,
)
return _convert_schedule(result)
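# Editor's sketch: a schedule request that runs a graph at 09:00 on weekdays in
# an explicit timezone. The input key is a placeholder; omitting `timezone`
# falls back to the user's stored timezone as resolved above.
_EXAMPLE_SCHEDULE_REQUEST = CreateScheduleRequest(
    name="Weekday morning run",
    cron="0 9 * * 1-5",
    input_data={"topic": "daily summary"},
    timezone="America/New_York",
)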

View File

@@ -1,431 +0,0 @@
"""
Content Type Handlers for Unified Embeddings
Pluggable system for different content sources (store agents, blocks, docs).
Each handler knows how to fetch and process its content type for embedding.
"""
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from prisma.enums import ContentType
from backend.data.db import query_raw_with_schema
logger = logging.getLogger(__name__)
@dataclass
class ContentItem:
"""Represents a piece of content to be embedded."""
content_id: str # Unique identifier (DB ID or file path)
content_type: ContentType
searchable_text: str # Combined text for embedding
metadata: dict[str, Any] # Content-specific metadata
user_id: str | None = None # For user-scoped content
class ContentHandler(ABC):
"""Base handler for fetching and processing content for embeddings."""
@property
@abstractmethod
def content_type(self) -> ContentType:
"""The ContentType this handler manages."""
pass
@abstractmethod
async def get_missing_items(self, batch_size: int) -> list[ContentItem]:
"""
Fetch items that don't have embeddings yet.
Args:
batch_size: Maximum number of items to return
Returns:
List of ContentItem objects ready for embedding
"""
pass
@abstractmethod
async def get_stats(self) -> dict[str, int]:
"""
Get statistics about embedding coverage.
Returns:
Dict with keys: total, with_embeddings, without_embeddings
"""
pass
class StoreAgentHandler(ContentHandler):
"""Handler for marketplace store agent listings."""
@property
def content_type(self) -> ContentType:
return ContentType.STORE_AGENT
async def get_missing_items(self, batch_size: int) -> list[ContentItem]:
"""Fetch approved store listings without embeddings."""
from backend.api.features.store.embeddings import build_searchable_text
missing = await query_raw_with_schema(
"""
SELECT
slv.id,
slv.name,
slv.description,
slv."subHeading",
slv.categories
FROM {schema_prefix}"StoreListingVersion" slv
LEFT JOIN {schema_prefix}"UnifiedContentEmbedding" uce
ON slv.id = uce."contentId" AND uce."contentType" = 'STORE_AGENT'::{schema_prefix}"ContentType"
WHERE slv."submissionStatus" = 'APPROVED'
AND slv."isDeleted" = false
AND uce."contentId" IS NULL
LIMIT $1
""",
batch_size,
)
return [
ContentItem(
content_id=row["id"],
content_type=ContentType.STORE_AGENT,
searchable_text=build_searchable_text(
name=row["name"],
description=row["description"],
sub_heading=row["subHeading"],
categories=row["categories"] or [],
),
metadata={
"name": row["name"],
"categories": row["categories"] or [],
},
user_id=None, # Store agents are public
)
for row in missing
]
async def get_stats(self) -> dict[str, int]:
"""Get statistics about store agent embedding coverage."""
# Count approved versions
approved_result = await query_raw_with_schema(
"""
SELECT COUNT(*) as count
FROM {schema_prefix}"StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
AND "isDeleted" = false
"""
)
total_approved = approved_result[0]["count"] if approved_result else 0
# Count versions with embeddings
embedded_result = await query_raw_with_schema(
"""
SELECT COUNT(*) as count
FROM {schema_prefix}"StoreListingVersion" slv
JOIN {schema_prefix}"UnifiedContentEmbedding" uce ON slv.id = uce."contentId" AND uce."contentType" = 'STORE_AGENT'::{schema_prefix}"ContentType"
WHERE slv."submissionStatus" = 'APPROVED'
AND slv."isDeleted" = false
"""
)
with_embeddings = embedded_result[0]["count"] if embedded_result else 0
return {
"total": total_approved,
"with_embeddings": with_embeddings,
"without_embeddings": total_approved - with_embeddings,
}
class BlockHandler(ContentHandler):
"""Handler for block definitions (Python classes)."""
@property
def content_type(self) -> ContentType:
return ContentType.BLOCK
async def get_missing_items(self, batch_size: int) -> list[ContentItem]:
"""Fetch blocks without embeddings."""
from backend.data.block import get_blocks
# Get all available blocks
all_blocks = get_blocks()
# Check which ones have embeddings
if not all_blocks:
return []
block_ids = list(all_blocks.keys())
# Query for existing embeddings
placeholders = ",".join([f"${i+1}" for i in range(len(block_ids))])
existing_result = await query_raw_with_schema(
f"""
SELECT "contentId"
FROM {{schema_prefix}}"UnifiedContentEmbedding"
WHERE "contentType" = 'BLOCK'::{{schema_prefix}}"ContentType"
AND "contentId" = ANY(ARRAY[{placeholders}])
""",
*block_ids,
)
existing_ids = {row["contentId"] for row in existing_result}
missing_blocks = [
(block_id, block_cls)
for block_id, block_cls in all_blocks.items()
if block_id not in existing_ids
]
# Convert to ContentItem
items = []
for block_id, block_cls in missing_blocks[:batch_size]:
try:
block_instance = block_cls()
# Build searchable text from block metadata
parts = []
if hasattr(block_instance, "name") and block_instance.name:
parts.append(block_instance.name)
if (
hasattr(block_instance, "description")
and block_instance.description
):
parts.append(block_instance.description)
if hasattr(block_instance, "categories") and block_instance.categories:
# Convert BlockCategory enum to strings
parts.append(
" ".join(str(cat.value) for cat in block_instance.categories)
)
# Add input/output schema info
if hasattr(block_instance, "input_schema"):
schema = block_instance.input_schema
if hasattr(schema, "model_json_schema"):
schema_dict = schema.model_json_schema()
if "properties" in schema_dict:
for prop_name, prop_info in schema_dict[
"properties"
].items():
if "description" in prop_info:
parts.append(
f"{prop_name}: {prop_info['description']}"
)
searchable_text = " ".join(parts)
# Convert categories set of enums to list of strings for JSON serialization
categories = getattr(block_instance, "categories", set())
categories_list = (
[cat.value for cat in categories] if categories else []
)
items.append(
ContentItem(
content_id=block_id,
content_type=ContentType.BLOCK,
searchable_text=searchable_text,
metadata={
"name": getattr(block_instance, "name", ""),
"categories": categories_list,
},
user_id=None, # Blocks are public
)
)
except Exception as e:
logger.warning(f"Failed to process block {block_id}: {e}")
continue
return items
async def get_stats(self) -> dict[str, int]:
"""Get statistics about block embedding coverage."""
from backend.data.block import get_blocks
all_blocks = get_blocks()
total_blocks = len(all_blocks)
if total_blocks == 0:
return {"total": 0, "with_embeddings": 0, "without_embeddings": 0}
block_ids = list(all_blocks.keys())
placeholders = ",".join([f"${i+1}" for i in range(len(block_ids))])
embedded_result = await query_raw_with_schema(
f"""
SELECT COUNT(*) as count
FROM {{schema_prefix}}"UnifiedContentEmbedding"
WHERE "contentType" = 'BLOCK'::{{schema_prefix}}"ContentType"
AND "contentId" = ANY(ARRAY[{placeholders}])
""",
*block_ids,
)
with_embeddings = embedded_result[0]["count"] if embedded_result else 0
return {
"total": total_blocks,
"with_embeddings": with_embeddings,
"without_embeddings": total_blocks - with_embeddings,
}
class DocumentationHandler(ContentHandler):
"""Handler for documentation files (.md/.mdx)."""
@property
def content_type(self) -> ContentType:
return ContentType.DOCUMENTATION
def _get_docs_root(self) -> Path:
"""Get the documentation root directory."""
# content_handlers.py is at: backend/backend/api/features/store/content_handlers.py
# Need to go up to project root then into docs/
# In container: /app/autogpt_platform/backend/backend/api/features/store -> /app/docs
# In development: /repo/autogpt_platform/backend/backend/api/features/store -> /repo/docs
this_file = Path(
__file__
) # .../backend/backend/api/features/store/content_handlers.py
project_root = (
this_file.parent.parent.parent.parent.parent.parent.parent
) # -> /app or /repo
docs_root = project_root / "docs"
return docs_root
def _extract_title_and_content(self, file_path: Path) -> tuple[str, str]:
"""Extract title and content from markdown file."""
try:
content = file_path.read_text(encoding="utf-8")
# Try to extract title from first # heading
lines = content.split("\n")
title = ""
body_lines = []
for line in lines:
if line.startswith("# ") and not title:
title = line[2:].strip()
else:
body_lines.append(line)
# If no title found, use filename
if not title:
title = file_path.stem.replace("-", " ").replace("_", " ").title()
body = "\n".join(body_lines)
return title, body
except Exception as e:
logger.warning(f"Failed to read {file_path}: {e}")
return file_path.stem, ""
async def get_missing_items(self, batch_size: int) -> list[ContentItem]:
"""Fetch documentation files without embeddings."""
docs_root = self._get_docs_root()
if not docs_root.exists():
logger.warning(f"Documentation root not found: {docs_root}")
return []
# Find all .md and .mdx files
all_docs = list(docs_root.rglob("*.md")) + list(docs_root.rglob("*.mdx"))
# Get relative paths for content IDs
doc_paths = [str(doc.relative_to(docs_root)) for doc in all_docs]
if not doc_paths:
return []
# Check which ones have embeddings
placeholders = ",".join([f"${i+1}" for i in range(len(doc_paths))])
existing_result = await query_raw_with_schema(
f"""
SELECT "contentId"
FROM {{schema_prefix}}"UnifiedContentEmbedding"
WHERE "contentType" = 'DOCUMENTATION'::{{schema_prefix}}"ContentType"
AND "contentId" = ANY(ARRAY[{placeholders}])
""",
*doc_paths,
)
existing_ids = {row["contentId"] for row in existing_result}
missing_docs = [
(doc_path, doc_file)
for doc_path, doc_file in zip(doc_paths, all_docs)
if doc_path not in existing_ids
]
# Convert to ContentItem
items = []
for doc_path, doc_file in missing_docs[:batch_size]:
try:
title, content = self._extract_title_and_content(doc_file)
# Build searchable text
searchable_text = f"{title} {content}"
items.append(
ContentItem(
content_id=doc_path,
content_type=ContentType.DOCUMENTATION,
searchable_text=searchable_text,
metadata={
"title": title,
"path": doc_path,
},
user_id=None, # Documentation is public
)
)
except Exception as e:
logger.warning(f"Failed to process doc {doc_path}: {e}")
continue
return items
async def get_stats(self) -> dict[str, int]:
"""Get statistics about documentation embedding coverage."""
docs_root = self._get_docs_root()
if not docs_root.exists():
return {"total": 0, "with_embeddings": 0, "without_embeddings": 0}
# Count all .md and .mdx files
all_docs = list(docs_root.rglob("*.md")) + list(docs_root.rglob("*.mdx"))
total_docs = len(all_docs)
if total_docs == 0:
return {"total": 0, "with_embeddings": 0, "without_embeddings": 0}
doc_paths = [str(doc.relative_to(docs_root)) for doc in all_docs]
placeholders = ",".join([f"${i+1}" for i in range(len(doc_paths))])
embedded_result = await query_raw_with_schema(
f"""
SELECT COUNT(*) as count
FROM {{schema_prefix}}"UnifiedContentEmbedding"
WHERE "contentType" = 'DOCUMENTATION'::{{schema_prefix}}"ContentType"
AND "contentId" = ANY(ARRAY[{placeholders}])
""",
*doc_paths,
)
with_embeddings = embedded_result[0]["count"] if embedded_result else 0
return {
"total": total_docs,
"with_embeddings": with_embeddings,
"without_embeddings": total_docs - with_embeddings,
}
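# Editor's sketch: what the title extraction above produces for a small file.
# The caller supplies the temporary path; only the convention that the first
# "# " heading becomes the title comes from the handler itself.
def _example_title_extraction(tmp_file: Path) -> tuple[str, str]:
    tmp_file.write_text("# Getting Started\n\nThis is a guide.", encoding="utf-8")
    # Returns ("Getting Started", "\nThis is a guide."); the heading line is
    # removed from the body.
    return DocumentationHandler()._extract_title_and_content(tmp_file)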
# Content handler registry
CONTENT_HANDLERS: dict[ContentType, ContentHandler] = {
ContentType.STORE_AGENT: StoreAgentHandler(),
ContentType.BLOCK: BlockHandler(),
ContentType.DOCUMENTATION: DocumentationHandler(),
}
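# Editor's sketch: how a backfill job might walk the registry. The embedding call
# itself is omitted here; in this codebase it lives in
# backend.api.features.store.embeddings (exercised by the tests below).
async def _example_count_missing(batch_size: int = 10) -> dict[str, int]:
    missing: dict[str, int] = {}
    for content_type, handler in CONTENT_HANDLERS.items():
        items = await handler.get_missing_items(batch_size)
        missing[content_type.value] = len(items)
    return missing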

View File

@@ -1,215 +0,0 @@
"""
Integration tests for content handlers using real DB.
Run with: poetry run pytest backend/api/features/store/content_handlers_integration_test.py -xvs
These tests use the real database but mock OpenAI calls.
"""
from unittest.mock import patch
import pytest
from backend.api.features.store.content_handlers import (
CONTENT_HANDLERS,
BlockHandler,
DocumentationHandler,
StoreAgentHandler,
)
from backend.api.features.store.embeddings import (
EMBEDDING_DIM,
backfill_all_content_types,
ensure_content_embedding,
get_embedding_stats,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_store_agent_handler_real_db():
"""Test StoreAgentHandler with real database queries."""
handler = StoreAgentHandler()
# Get stats from real DB
stats = await handler.get_stats()
# Stats should have correct structure
assert "total" in stats
assert "with_embeddings" in stats
assert "without_embeddings" in stats
assert stats["total"] >= 0
assert stats["with_embeddings"] >= 0
assert stats["without_embeddings"] >= 0
# Get missing items (max 1 to keep test fast)
items = await handler.get_missing_items(batch_size=1)
# Items should be list (may be empty if all have embeddings)
assert isinstance(items, list)
if items:
item = items[0]
assert item.content_id is not None
assert item.content_type.value == "STORE_AGENT"
assert item.searchable_text != ""
assert item.user_id is None
@pytest.mark.asyncio(loop_scope="session")
async def test_block_handler_real_db():
"""Test BlockHandler with real database queries."""
handler = BlockHandler()
# Get stats from real DB
stats = await handler.get_stats()
# Stats should have correct structure
assert "total" in stats
assert "with_embeddings" in stats
assert "without_embeddings" in stats
assert stats["total"] >= 0 # Should have at least some blocks
assert stats["with_embeddings"] >= 0
assert stats["without_embeddings"] >= 0
# Get missing items (max 1 to keep test fast)
items = await handler.get_missing_items(batch_size=1)
# Items should be list
assert isinstance(items, list)
if items:
item = items[0]
assert item.content_id is not None # Should be block UUID
assert item.content_type.value == "BLOCK"
assert item.searchable_text != ""
assert item.user_id is None
@pytest.mark.asyncio(loop_scope="session")
async def test_documentation_handler_real_fs():
"""Test DocumentationHandler with real filesystem."""
handler = DocumentationHandler()
# Get stats from real filesystem
stats = await handler.get_stats()
# Stats should have correct structure
assert "total" in stats
assert "with_embeddings" in stats
assert "without_embeddings" in stats
assert stats["total"] >= 0
assert stats["with_embeddings"] >= 0
assert stats["without_embeddings"] >= 0
# Get missing items (max 1 to keep test fast)
items = await handler.get_missing_items(batch_size=1)
# Items should be list
assert isinstance(items, list)
if items:
item = items[0]
assert item.content_id is not None # Should be relative path
assert item.content_type.value == "DOCUMENTATION"
assert item.searchable_text != ""
assert item.user_id is None
@pytest.mark.asyncio(loop_scope="session")
async def test_get_embedding_stats_all_types():
"""Test get_embedding_stats aggregates all content types."""
stats = await get_embedding_stats()
# Should have structure with by_type and totals
assert "by_type" in stats
assert "totals" in stats
# Check each content type is present
by_type = stats["by_type"]
assert "STORE_AGENT" in by_type
assert "BLOCK" in by_type
assert "DOCUMENTATION" in by_type
# Check totals are aggregated
totals = stats["totals"]
assert totals["total"] >= 0
assert totals["with_embeddings"] >= 0
assert totals["without_embeddings"] >= 0
assert "coverage_percent" in totals
@pytest.mark.asyncio(loop_scope="session")
@patch("backend.api.features.store.embeddings.generate_embedding")
async def test_ensure_content_embedding_blocks(mock_generate):
"""Test creating embeddings for blocks (mocked OpenAI)."""
# Mock OpenAI to return fake embedding
mock_generate.return_value = [0.1] * EMBEDDING_DIM
# Get one block without embedding
handler = BlockHandler()
items = await handler.get_missing_items(batch_size=1)
if not items:
pytest.skip("No blocks without embeddings")
item = items[0]
# Try to create embedding (OpenAI mocked)
result = await ensure_content_embedding(
content_type=item.content_type,
content_id=item.content_id,
searchable_text=item.searchable_text,
metadata=item.metadata,
user_id=item.user_id,
)
# Should succeed with mocked OpenAI
assert result is True
mock_generate.assert_called_once()
@pytest.mark.asyncio(loop_scope="session")
@patch("backend.api.features.store.embeddings.generate_embedding")
async def test_backfill_all_content_types_dry_run(mock_generate):
"""Test backfill_all_content_types processes all handlers in order."""
# Mock OpenAI to return fake embedding
mock_generate.return_value = [0.1] * EMBEDDING_DIM
# Run backfill with batch_size=1 to process max 1 per type
result = await backfill_all_content_types(batch_size=1)
# Should have results for all content types
assert "by_type" in result
assert "totals" in result
by_type = result["by_type"]
assert "BLOCK" in by_type
assert "STORE_AGENT" in by_type
assert "DOCUMENTATION" in by_type
# Each type should have correct structure
for content_type, type_result in by_type.items():
assert "processed" in type_result
assert "success" in type_result
assert "failed" in type_result
# Totals should aggregate
totals = result["totals"]
assert totals["processed"] >= 0
assert totals["success"] >= 0
assert totals["failed"] >= 0
@pytest.mark.asyncio(loop_scope="session")
async def test_content_handler_registry():
"""Test all handlers are registered in correct order."""
from prisma.enums import ContentType
# All three types should be registered
assert ContentType.STORE_AGENT in CONTENT_HANDLERS
assert ContentType.BLOCK in CONTENT_HANDLERS
assert ContentType.DOCUMENTATION in CONTENT_HANDLERS
# Check handler types
assert isinstance(CONTENT_HANDLERS[ContentType.STORE_AGENT], StoreAgentHandler)
assert isinstance(CONTENT_HANDLERS[ContentType.BLOCK], BlockHandler)
assert isinstance(CONTENT_HANDLERS[ContentType.DOCUMENTATION], DocumentationHandler)

View File

@@ -1,324 +0,0 @@
"""
E2E tests for content handlers (blocks, store agents, documentation).
Tests the full flow: discovering content → generating embeddings → storing.
"""
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from prisma.enums import ContentType
from backend.api.features.store.content_handlers import (
CONTENT_HANDLERS,
BlockHandler,
DocumentationHandler,
StoreAgentHandler,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_store_agent_handler_get_missing_items(mocker):
"""Test StoreAgentHandler fetches approved agents without embeddings."""
handler = StoreAgentHandler()
# Mock database query
mock_missing = [
{
"id": "agent-1",
"name": "Test Agent",
"description": "A test agent",
"subHeading": "Test heading",
"categories": ["AI", "Testing"],
}
]
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
return_value=mock_missing,
):
items = await handler.get_missing_items(batch_size=10)
assert len(items) == 1
assert items[0].content_id == "agent-1"
assert items[0].content_type == ContentType.STORE_AGENT
assert "Test Agent" in items[0].searchable_text
assert "A test agent" in items[0].searchable_text
assert items[0].metadata["name"] == "Test Agent"
assert items[0].user_id is None
@pytest.mark.asyncio(loop_scope="session")
async def test_store_agent_handler_get_stats(mocker):
"""Test StoreAgentHandler returns correct stats."""
handler = StoreAgentHandler()
# Mock approved count query
mock_approved = [{"count": 50}]
# Mock embedded count query
mock_embedded = [{"count": 30}]
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
side_effect=[mock_approved, mock_embedded],
):
stats = await handler.get_stats()
assert stats["total"] == 50
assert stats["with_embeddings"] == 30
assert stats["without_embeddings"] == 20
@pytest.mark.asyncio(loop_scope="session")
async def test_block_handler_get_missing_items(mocker):
"""Test BlockHandler discovers blocks without embeddings."""
handler = BlockHandler()
# Mock get_blocks to return test blocks
mock_block_class = MagicMock()
mock_block_instance = MagicMock()
mock_block_instance.name = "Calculator Block"
mock_block_instance.description = "Performs calculations"
mock_block_instance.categories = [MagicMock(value="MATH")]
mock_block_instance.input_schema.model_json_schema.return_value = {
"properties": {"expression": {"description": "Math expression to evaluate"}}
}
mock_block_class.return_value = mock_block_instance
mock_blocks = {"block-uuid-1": mock_block_class}
# Mock existing embeddings query (no embeddings exist)
mock_existing = []
with patch(
"backend.data.block.get_blocks",
return_value=mock_blocks,
):
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
return_value=mock_existing,
):
items = await handler.get_missing_items(batch_size=10)
assert len(items) == 1
assert items[0].content_id == "block-uuid-1"
assert items[0].content_type == ContentType.BLOCK
assert "Calculator Block" in items[0].searchable_text
assert "Performs calculations" in items[0].searchable_text
assert "MATH" in items[0].searchable_text
assert "expression: Math expression" in items[0].searchable_text
assert items[0].user_id is None
@pytest.mark.asyncio(loop_scope="session")
async def test_block_handler_get_stats(mocker):
"""Test BlockHandler returns correct stats."""
handler = BlockHandler()
# Mock get_blocks
mock_blocks = {
"block-1": MagicMock(),
"block-2": MagicMock(),
"block-3": MagicMock(),
}
# Mock embedded count query (2 blocks have embeddings)
mock_embedded = [{"count": 2}]
with patch(
"backend.data.block.get_blocks",
return_value=mock_blocks,
):
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
return_value=mock_embedded,
):
stats = await handler.get_stats()
assert stats["total"] == 3
assert stats["with_embeddings"] == 2
assert stats["without_embeddings"] == 1
@pytest.mark.asyncio(loop_scope="session")
async def test_documentation_handler_get_missing_items(tmp_path, mocker):
"""Test DocumentationHandler discovers docs without embeddings."""
handler = DocumentationHandler()
# Create temporary docs directory with test files
docs_root = tmp_path / "docs"
docs_root.mkdir()
(docs_root / "guide.md").write_text("# Getting Started\n\nThis is a guide.")
(docs_root / "api.mdx").write_text("# API Reference\n\nAPI documentation.")
# Mock _get_docs_root to return temp dir
with patch.object(handler, "_get_docs_root", return_value=docs_root):
# Mock existing embeddings query (no embeddings exist)
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
return_value=[],
):
items = await handler.get_missing_items(batch_size=10)
assert len(items) == 2
# Check guide.md
guide_item = next(
(item for item in items if item.content_id == "guide.md"), None
)
assert guide_item is not None
assert guide_item.content_type == ContentType.DOCUMENTATION
assert "Getting Started" in guide_item.searchable_text
assert "This is a guide" in guide_item.searchable_text
assert guide_item.metadata["title"] == "Getting Started"
assert guide_item.user_id is None
# Check api.mdx
api_item = next(
(item for item in items if item.content_id == "api.mdx"), None
)
assert api_item is not None
assert "API Reference" in api_item.searchable_text
@pytest.mark.asyncio(loop_scope="session")
async def test_documentation_handler_get_stats(tmp_path, mocker):
"""Test DocumentationHandler returns correct stats."""
handler = DocumentationHandler()
# Create temporary docs directory
docs_root = tmp_path / "docs"
docs_root.mkdir()
(docs_root / "doc1.md").write_text("# Doc 1")
(docs_root / "doc2.md").write_text("# Doc 2")
(docs_root / "doc3.mdx").write_text("# Doc 3")
# Mock embedded count query (1 doc has embedding)
mock_embedded = [{"count": 1}]
with patch.object(handler, "_get_docs_root", return_value=docs_root):
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
return_value=mock_embedded,
):
stats = await handler.get_stats()
assert stats["total"] == 3
assert stats["with_embeddings"] == 1
assert stats["without_embeddings"] == 2
@pytest.mark.asyncio(loop_scope="session")
async def test_documentation_handler_title_extraction(tmp_path):
"""Test DocumentationHandler extracts title from markdown heading."""
handler = DocumentationHandler()
# Test with heading
doc_with_heading = tmp_path / "with_heading.md"
doc_with_heading.write_text("# My Title\n\nContent here")
title, content = handler._extract_title_and_content(doc_with_heading)
assert title == "My Title"
assert "# My Title" not in content
assert "Content here" in content
# Test without heading
doc_without_heading = tmp_path / "no-heading.md"
doc_without_heading.write_text("Just content, no heading")
title, content = handler._extract_title_and_content(doc_without_heading)
assert title == "No Heading" # Uses filename
assert "Just content" in content
@pytest.mark.asyncio(loop_scope="session")
async def test_content_handlers_registry():
"""Test all content types are registered."""
assert ContentType.STORE_AGENT in CONTENT_HANDLERS
assert ContentType.BLOCK in CONTENT_HANDLERS
assert ContentType.DOCUMENTATION in CONTENT_HANDLERS
assert isinstance(CONTENT_HANDLERS[ContentType.STORE_AGENT], StoreAgentHandler)
assert isinstance(CONTENT_HANDLERS[ContentType.BLOCK], BlockHandler)
assert isinstance(CONTENT_HANDLERS[ContentType.DOCUMENTATION], DocumentationHandler)
@pytest.mark.asyncio(loop_scope="session")
async def test_block_handler_handles_missing_attributes():
"""Test BlockHandler gracefully handles blocks with missing attributes."""
handler = BlockHandler()
# Mock block with minimal attributes
mock_block_class = MagicMock()
mock_block_instance = MagicMock()
mock_block_instance.name = "Minimal Block"
# No description, categories, or schema
del mock_block_instance.description
del mock_block_instance.categories
del mock_block_instance.input_schema
mock_block_class.return_value = mock_block_instance
mock_blocks = {"block-minimal": mock_block_class}
with patch(
"backend.data.block.get_blocks",
return_value=mock_blocks,
):
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
return_value=[],
):
items = await handler.get_missing_items(batch_size=10)
assert len(items) == 1
assert items[0].searchable_text == "Minimal Block"
@pytest.mark.asyncio(loop_scope="session")
async def test_block_handler_skips_failed_blocks():
"""Test BlockHandler skips blocks that fail to instantiate."""
handler = BlockHandler()
# Mock one good block and one bad block
good_block = MagicMock()
good_instance = MagicMock()
good_instance.name = "Good Block"
good_instance.description = "Works fine"
good_instance.categories = []
good_block.return_value = good_instance
bad_block = MagicMock()
bad_block.side_effect = Exception("Instantiation failed")
mock_blocks = {"good-block": good_block, "bad-block": bad_block}
with patch(
"backend.data.block.get_blocks",
return_value=mock_blocks,
):
with patch(
"backend.api.features.store.content_handlers.query_raw_with_schema",
return_value=[],
):
items = await handler.get_missing_items(batch_size=10)
# Should only get the good block
assert len(items) == 1
assert items[0].content_id == "good-block"
@pytest.mark.asyncio(loop_scope="session")
async def test_documentation_handler_missing_docs_directory():
"""Test DocumentationHandler handles missing docs directory gracefully."""
handler = DocumentationHandler()
# Mock _get_docs_root to return non-existent path
fake_path = Path("/nonexistent/docs")
with patch.object(handler, "_get_docs_root", return_value=fake_path):
items = await handler.get_missing_items(batch_size=10)
assert items == []
stats = await handler.get_stats()
assert stats["total"] == 0
assert stats["with_embeddings"] == 0
assert stats["without_embeddings"] == 0

View File

@@ -14,7 +14,6 @@ import prisma
from prisma.enums import ContentType
from tiktoken import encoding_for_model
from backend.api.features.store.content_handlers import CONTENT_HANDLERS
from backend.data.db import execute_raw_with_schema, query_raw_with_schema
from backend.util.clients import get_openai_client
from backend.util.json import dumps
@@ -24,9 +23,6 @@ logger = logging.getLogger(__name__)
# OpenAI embedding model configuration
EMBEDDING_MODEL = "text-embedding-3-small"
# Embedding dimension for the model above
# text-embedding-3-small: 1536, text-embedding-3-large: 3072
EMBEDDING_DIM = 1536
# OpenAI embedding token limit (8,191 with 1 token buffer for safety)
EMBEDDING_MAX_TOKENS = 8191
@@ -373,69 +369,55 @@ async def delete_content_embedding(
async def get_embedding_stats() -> dict[str, Any]:
"""
Get statistics about embedding coverage for all content types.
Get statistics about embedding coverage.
Returns stats per content type and overall totals.
Returns counts of:
- Total approved listing versions
- Versions with embeddings
- Versions without embeddings
"""
try:
stats_by_type = {}
total_items = 0
total_with_embeddings = 0
total_without_embeddings = 0
# Count approved versions
approved_result = await query_raw_with_schema(
"""
SELECT COUNT(*) as count
FROM {schema_prefix}"StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
AND "isDeleted" = false
"""
)
total_approved = approved_result[0]["count"] if approved_result else 0
# Aggregate stats from all handlers
for content_type, handler in CONTENT_HANDLERS.items():
try:
stats = await handler.get_stats()
stats_by_type[content_type.value] = {
"total": stats["total"],
"with_embeddings": stats["with_embeddings"],
"without_embeddings": stats["without_embeddings"],
"coverage_percent": (
round(stats["with_embeddings"] / stats["total"] * 100, 1)
if stats["total"] > 0
else 0
),
}
total_items += stats["total"]
total_with_embeddings += stats["with_embeddings"]
total_without_embeddings += stats["without_embeddings"]
except Exception as e:
logger.error(f"Failed to get stats for {content_type.value}: {e}")
stats_by_type[content_type.value] = {
"total": 0,
"with_embeddings": 0,
"without_embeddings": 0,
"coverage_percent": 0,
"error": str(e),
}
# Count versions with embeddings
embedded_result = await query_raw_with_schema(
"""
SELECT COUNT(*) as count
FROM {schema_prefix}"StoreListingVersion" slv
JOIN {schema_prefix}"UnifiedContentEmbedding" uce ON slv.id = uce."contentId" AND uce."contentType" = 'STORE_AGENT'::{schema_prefix}"ContentType"
WHERE slv."submissionStatus" = 'APPROVED'
AND slv."isDeleted" = false
"""
)
with_embeddings = embedded_result[0]["count"] if embedded_result else 0
return {
"by_type": stats_by_type,
"totals": {
"total": total_items,
"with_embeddings": total_with_embeddings,
"without_embeddings": total_without_embeddings,
"coverage_percent": (
round(total_with_embeddings / total_items * 100, 1)
if total_items > 0
else 0
),
},
"total_approved": total_approved,
"with_embeddings": with_embeddings,
"without_embeddings": total_approved - with_embeddings,
"coverage_percent": (
round(with_embeddings / total_approved * 100, 1)
if total_approved > 0
else 0
),
}
except Exception as e:
logger.error(f"Failed to get embedding stats: {e}")
return {
"by_type": {},
"totals": {
"total": 0,
"with_embeddings": 0,
"without_embeddings": 0,
"coverage_percent": 0,
},
"total_approved": 0,
"with_embeddings": 0,
"without_embeddings": 0,
"coverage_percent": 0,
"error": str(e),
}
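# A hedged usage sketch for the stats above; print_embedding_coverage is a
# hypothetical name. The per-type shape ("by_type"/"totals") belongs to the
# multi-content-type variant, while the flat store-agent-only variant exposes
# "total_approved" and "with_embeddings" at the top level instead.
async def print_embedding_coverage() -> None:
    stats = await get_embedding_stats()
    for content_type, type_stats in stats.get("by_type", {}).items():
        print(f"{content_type}: {type_stats['coverage_percent']}% embedded")
    totals = stats.get("totals", {})
    print(f"overall: {totals.get('with_embeddings', 0)}/{totals.get('total', 0)}")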
@@ -444,118 +426,73 @@ async def backfill_missing_embeddings(batch_size: int = 10) -> dict[str, Any]:
"""
Generate embeddings for approved listings that don't have them.
BACKWARD COMPATIBILITY: Maintained for existing usage.
This now delegates to backfill_all_content_types() to process all content types.
Args:
batch_size: Number of embeddings to generate per content type
batch_size: Number of embeddings to generate in one call
Returns:
Dict with success/failure counts aggregated across all content types
Dict with success/failure counts
"""
# Delegate to the new generic backfill system
result = await backfill_all_content_types(batch_size)
try:
# Find approved versions without embeddings
missing = await query_raw_with_schema(
"""
SELECT
slv.id,
slv.name,
slv.description,
slv."subHeading",
slv.categories
FROM {schema_prefix}"StoreListingVersion" slv
LEFT JOIN {schema_prefix}"UnifiedContentEmbedding" uce
ON slv.id = uce."contentId" AND uce."contentType" = 'STORE_AGENT'::{schema_prefix}"ContentType"
WHERE slv."submissionStatus" = 'APPROVED'
AND slv."isDeleted" = false
AND uce."contentId" IS NULL
LIMIT $1
""",
batch_size,
)
# Return in the old format for backward compatibility
return result["totals"]
async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
"""
Generate embeddings for all content types using registered handlers.
Processes content types in order: BLOCK → STORE_AGENT → DOCUMENTATION.
This ensures foundational content (blocks) is searchable first.
Args:
batch_size: Number of embeddings to generate per content type
Returns:
Dict with stats per content type and overall totals
"""
results_by_type = {}
total_processed = 0
total_success = 0
total_failed = 0
# Process content types in explicit order
processing_order = [
ContentType.BLOCK,
ContentType.STORE_AGENT,
ContentType.DOCUMENTATION,
]
for content_type in processing_order:
handler = CONTENT_HANDLERS.get(content_type)
if not handler:
logger.warning(f"No handler registered for {content_type.value}")
continue
try:
logger.info(f"Processing {content_type.value} content type...")
# Get missing items from handler
missing_items = await handler.get_missing_items(batch_size)
if not missing_items:
results_by_type[content_type.value] = {
"processed": 0,
"success": 0,
"failed": 0,
"message": "No missing embeddings",
}
continue
# Process embeddings concurrently for better performance
embedding_tasks = [
ensure_content_embedding(
content_type=item.content_type,
content_id=item.content_id,
searchable_text=item.searchable_text,
metadata=item.metadata,
user_id=item.user_id,
)
for item in missing_items
]
results = await asyncio.gather(*embedding_tasks, return_exceptions=True)
success = sum(1 for result in results if result is True)
failed = len(results) - success
results_by_type[content_type.value] = {
"processed": len(missing_items),
"success": success,
"failed": failed,
"message": f"Backfilled {success} embeddings, {failed} failed",
}
total_processed += len(missing_items)
total_success += success
total_failed += failed
logger.info(
f"{content_type.value}: processed {len(missing_items)}, "
f"success {success}, failed {failed}"
)
except Exception as e:
logger.error(f"Failed to process {content_type.value}: {e}")
results_by_type[content_type.value] = {
if not missing:
return {
"processed": 0,
"success": 0,
"failed": 0,
"error": str(e),
"message": "No missing embeddings",
}
return {
"by_type": results_by_type,
"totals": {
"processed": total_processed,
"success": total_success,
"failed": total_failed,
"message": f"Overall: {total_success} succeeded, {total_failed} failed",
},
}
# Process embeddings concurrently for better performance
embedding_tasks = [
ensure_embedding(
version_id=row["id"],
name=row["name"],
description=row["description"],
sub_heading=row["subHeading"],
categories=row["categories"] or [],
)
for row in missing
]
results = await asyncio.gather(*embedding_tasks, return_exceptions=True)
success = sum(1 for result in results if result is True)
failed = len(results) - success
return {
"processed": len(missing),
"success": success,
"failed": failed,
"message": f"Backfilled {success} embeddings, {failed} failed",
}
except Exception as e:
logger.error(f"Failed to backfill embeddings: {e}")
return {
"processed": 0,
"success": 0,
"failed": 0,
"error": str(e),
}
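# A hedged sketch of driving the backfill entry points above; run_backfill is a
# hypothetical name. backfill_all_content_types returns per-type results plus
# totals, while the backward-compatible backfill_missing_embeddings wrapper
# returns only the flat counts.
async def run_backfill(batch_size: int = 10) -> None:
    result = await backfill_all_content_types(batch_size)
    for content_type, type_result in result["by_type"].items():
        print(f"{content_type}: {type_result['success']} ok, {type_result['failed']} failed")
    print(result["totals"]["message"])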
async def embed_query(query: str) -> list[float] | None:
@@ -629,334 +566,3 @@ async def ensure_content_embedding(
except Exception as e:
logger.error(f"Failed to ensure embedding for {content_type}:{content_id}: {e}")
return False
async def cleanup_orphaned_embeddings() -> dict[str, Any]:
"""
Clean up embeddings for content that no longer exists or is no longer valid.
Compares current content with embeddings in database and removes orphaned records:
- STORE_AGENT: Removes embeddings for rejected/deleted store listings
- BLOCK: Removes embeddings for blocks no longer registered
- DOCUMENTATION: Removes embeddings for deleted doc files
Returns:
Dict with cleanup statistics per content type
"""
results_by_type = {}
total_deleted = 0
# Cleanup orphaned embeddings for all content types
cleanup_types = [
ContentType.STORE_AGENT,
ContentType.BLOCK,
ContentType.DOCUMENTATION,
]
for content_type in cleanup_types:
try:
handler = CONTENT_HANDLERS.get(content_type)
if not handler:
logger.warning(f"No handler registered for {content_type}")
results_by_type[content_type.value] = {
"deleted": 0,
"error": "No handler registered",
}
continue
# Get all current content IDs from handler
if content_type == ContentType.STORE_AGENT:
# Get IDs of approved store listing versions from non-deleted listings
valid_agents = await query_raw_with_schema(
"""
SELECT slv.id
FROM {schema_prefix}"StoreListingVersion" slv
JOIN {schema_prefix}"StoreListing" sl ON slv."storeListingId" = sl.id
WHERE slv."submissionStatus" = 'APPROVED'
AND slv."isDeleted" = false
AND sl."isDeleted" = false
""",
)
current_ids = {row["id"] for row in valid_agents}
elif content_type == ContentType.BLOCK:
from backend.data.block import get_blocks
current_ids = set(get_blocks().keys())
elif content_type == ContentType.DOCUMENTATION:
from pathlib import Path
# embeddings.py is at: backend/backend/api/features/store/embeddings.py
# Need to go up to project root then into docs/
this_file = Path(__file__)
project_root = (
this_file.parent.parent.parent.parent.parent.parent.parent
)
docs_root = project_root / "docs"
if docs_root.exists():
all_docs = list(docs_root.rglob("*.md")) + list(
docs_root.rglob("*.mdx")
)
current_ids = {str(doc.relative_to(docs_root)) for doc in all_docs}
else:
current_ids = set()
else:
# Skip unknown content types to avoid accidental deletion
logger.warning(
f"Skipping cleanup for unknown content type: {content_type}"
)
results_by_type[content_type.value] = {
"deleted": 0,
"error": "Unknown content type - skipped for safety",
}
continue
# Get all embedding IDs from database
db_embeddings = await query_raw_with_schema(
"""
SELECT "contentId"
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType"
""",
content_type,
)
db_ids = {row["contentId"] for row in db_embeddings}
# Find orphaned embeddings (in DB but not in current content)
orphaned_ids = db_ids - current_ids
if not orphaned_ids:
logger.info(f"{content_type.value}: No orphaned embeddings found")
results_by_type[content_type.value] = {
"deleted": 0,
"message": "No orphaned embeddings",
}
continue
# Delete orphaned embeddings in batch for better performance
orphaned_list = list(orphaned_ids)
try:
await execute_raw_with_schema(
"""
DELETE FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType"
AND "contentId" = ANY($2::text[])
""",
content_type,
orphaned_list,
)
deleted = len(orphaned_list)
except Exception as e:
logger.error(f"Failed to batch delete orphaned embeddings: {e}")
deleted = 0
logger.info(
f"{content_type.value}: Deleted {deleted}/{len(orphaned_ids)} orphaned embeddings"
)
results_by_type[content_type.value] = {
"deleted": deleted,
"orphaned": len(orphaned_ids),
"message": f"Deleted {deleted} orphaned embeddings",
}
total_deleted += deleted
except Exception as e:
logger.error(f"Failed to cleanup {content_type.value}: {e}")
results_by_type[content_type.value] = {
"deleted": 0,
"error": str(e),
}
return {
"by_type": results_by_type,
"totals": {
"deleted": total_deleted,
"message": f"Deleted {total_deleted} orphaned embeddings",
},
}
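# A minimal, hedged usage sketch for the cleanup routine above; run_cleanup is
# a hypothetical name. Each per-type entry carries a "deleted" count or an
# "error" message, and "totals" summarizes the run.
async def run_cleanup() -> None:
    result = await cleanup_orphaned_embeddings()
    for content_type, outcome in result["by_type"].items():
        print(content_type, outcome.get("deleted", 0), outcome.get("error", ""))
    print(result["totals"]["message"])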
async def semantic_search(
query: str,
content_types: list[ContentType] | None = None,
user_id: str | None = None,
limit: int = 20,
min_similarity: float = 0.5,
) -> list[dict[str, Any]]:
"""
Semantic search across content types using embeddings.
Performs vector similarity search on UnifiedContentEmbedding table.
Used directly for blocks/docs/library agents, or as the semantic component
within hybrid_search for store agents.
If embedding generation fails, the search falls back to lexical matching on searchableText.
Args:
query: Search query string
content_types: List of ContentType to search. Defaults to [BLOCK, STORE_AGENT, DOCUMENTATION]
user_id: Optional user ID for searching private content (library agents)
limit: Maximum number of results to return (default: 20)
min_similarity: Minimum cosine similarity threshold (0-1, default: 0.5)
Returns:
List of search results with the following structure:
[
{
"content_id": str,
"content_type": str, # "BLOCK", "STORE_AGENT", "DOCUMENTATION", or "LIBRARY_AGENT"
"searchable_text": str,
"metadata": dict,
"similarity": float, # Cosine similarity score (0-1)
},
...
]
Examples:
# Search blocks only
results = await semantic_search("calculate", content_types=[ContentType.BLOCK])
# Search blocks and documentation
results = await semantic_search(
"how to use API",
content_types=[ContentType.BLOCK, ContentType.DOCUMENTATION]
)
# Search all public content (default)
results = await semantic_search("AI agent")
# Search user's library agents
results = await semantic_search(
"my custom agent",
content_types=[ContentType.LIBRARY_AGENT],
user_id="user123"
)
"""
# Default to searching all public content types
if content_types is None:
content_types = [
ContentType.BLOCK,
ContentType.STORE_AGENT,
ContentType.DOCUMENTATION,
]
# Validate inputs
if not content_types:
return [] # Empty content_types would cause invalid SQL (IN ())
query = query.strip()
if not query:
return []
if limit < 1:
limit = 1
if limit > 100:
limit = 100
# Generate query embedding
query_embedding = await embed_query(query)
if query_embedding is not None:
# Semantic search with embeddings
embedding_str = embedding_to_vector_string(query_embedding)
# Build params in order: limit, then user_id (if provided), then content types
params: list[Any] = [limit]
user_filter = ""
if user_id is not None:
user_filter = 'AND "userId" = ${}'.format(len(params) + 1)
params.append(user_id)
# Add content type parameters and build placeholders dynamically
content_type_start_idx = len(params) + 1
content_type_placeholders = ", ".join(
f'${content_type_start_idx + i}::{{{{schema_prefix}}}}"ContentType"'
for i in range(len(content_types))
)
params.extend([ct.value for ct in content_types])
sql = f"""
SELECT
"contentId" as content_id,
"contentType" as content_type,
"searchableText" as searchable_text,
metadata,
1 - (embedding <=> '{embedding_str}'::vector) as similarity
FROM {{{{schema_prefix}}}}"UnifiedContentEmbedding"
WHERE "contentType" IN ({content_type_placeholders})
{user_filter}
AND 1 - (embedding <=> '{embedding_str}'::vector) >= ${len(params) + 1}
ORDER BY similarity DESC
LIMIT $1
"""
params.append(min_similarity)
try:
results = await query_raw_with_schema(
sql, *params, set_public_search_path=True
)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": float(row["similarity"]),
}
for row in results
]
except Exception as e:
logger.error(f"Semantic search failed: {e}")
# Fall through to lexical search below
# Fallback to lexical search if embeddings unavailable
logger.warning("Falling back to lexical search (embeddings unavailable)")
params_lexical: list[Any] = [limit]
user_filter = ""
if user_id is not None:
user_filter = 'AND "userId" = ${}'.format(len(params_lexical) + 1)
params_lexical.append(user_id)
# Add content type parameters and build placeholders dynamically
content_type_start_idx = len(params_lexical) + 1
content_type_placeholders_lexical = ", ".join(
f'${content_type_start_idx + i}::{{{{schema_prefix}}}}"ContentType"'
for i in range(len(content_types))
)
params_lexical.extend([ct.value for ct in content_types])
sql_lexical = f"""
SELECT
"contentId" as content_id,
"contentType" as content_type,
"searchableText" as searchable_text,
metadata,
0.0 as similarity
FROM {{{{schema_prefix}}}}"UnifiedContentEmbedding"
WHERE "contentType" IN ({content_type_placeholders_lexical})
{user_filter}
AND "searchableText" ILIKE ${len(params_lexical) + 1}
ORDER BY "updatedAt" DESC
LIMIT $1
"""
params_lexical.append(f"%{query}%")
try:
results = await query_raw_with_schema(
sql_lexical, *params_lexical, set_public_search_path=True
)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": 0.0, # Lexical search doesn't provide similarity
}
for row in results
]
except Exception as e:
logger.error(f"Lexical search failed: {e}")
return []

View File

@@ -1,666 +0,0 @@
"""
End-to-end database tests for embeddings and hybrid search.
These tests hit the actual database to verify SQL queries work correctly.
Tests cover:
1. Embedding storage (store_content_embedding)
2. Embedding retrieval (get_content_embedding)
3. Embedding deletion (delete_content_embedding)
4. Unified hybrid search across content types
5. Store agent hybrid search
"""
import uuid
from typing import AsyncGenerator
import pytest
from prisma.enums import ContentType
from backend.api.features.store import embeddings
from backend.api.features.store.embeddings import EMBEDDING_DIM
from backend.api.features.store.hybrid_search import (
hybrid_search,
unified_hybrid_search,
)
# ============================================================================
# Test Fixtures
# ============================================================================
@pytest.fixture
def test_content_id() -> str:
"""Generate unique content ID for test isolation."""
return f"test-content-{uuid.uuid4()}"
@pytest.fixture
def test_user_id() -> str:
"""Generate unique user ID for test isolation."""
return f"test-user-{uuid.uuid4()}"
@pytest.fixture
def mock_embedding() -> list[float]:
"""Generate a mock embedding vector."""
# Create a normalized embedding vector
import math
raw = [float(i % 10) / 10.0 for i in range(EMBEDDING_DIM)]
# Normalize to unit length so the mock behaves like a real embedding under cosine similarity
magnitude = math.sqrt(sum(x * x for x in raw))
return [x / magnitude for x in raw]
@pytest.fixture
def similar_embedding() -> list[float]:
"""Generate an embedding similar to mock_embedding."""
import math
# Similar but slightly different values
raw = [float(i % 10) / 10.0 + 0.01 for i in range(EMBEDDING_DIM)]
magnitude = math.sqrt(sum(x * x for x in raw))
return [x / magnitude for x in raw]
@pytest.fixture
def different_embedding() -> list[float]:
"""Generate an embedding very different from mock_embedding."""
import math
# Reversed pattern to be maximally different
raw = [float((EMBEDDING_DIM - i) % 10) / 10.0 for i in range(EMBEDDING_DIM)]
magnitude = math.sqrt(sum(x * x for x in raw))
return [x / magnitude for x in raw]
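# A hedged aside on why three fixtures exist: once vectors are unit length,
# cosine similarity reduces to a plain dot product, so the "similar" vector
# scores near 1.0 against the mock while the reversed pattern lands noticeably
# lower (roughly 0.79 for this particular construction). A self-contained
# sanity check mirroring the fixture logic (underscore-prefixed names are
# illustrative only):
import math

def _unit(raw: list[float]) -> list[float]:
    magnitude = math.sqrt(sum(x * x for x in raw))
    return [x / magnitude for x in raw]

_a = _unit([float(i % 10) / 10.0 for i in range(EMBEDDING_DIM)])
_b = _unit([float(i % 10) / 10.0 + 0.01 for i in range(EMBEDDING_DIM)])
_c = _unit([float((EMBEDDING_DIM - i) % 10) / 10.0 for i in range(EMBEDDING_DIM)])

_sim_ab = sum(x * y for x, y in zip(_a, _b))
_sim_ac = sum(x * y for x, y in zip(_a, _c))
assert _sim_ab > 0.99  # near-identical pattern
assert _sim_ac < 0.90 and _sim_ac < _sim_ab  # reversed pattern scores lower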
@pytest.fixture
async def cleanup_embeddings(
server,
) -> AsyncGenerator[list[tuple[ContentType, str, str | None]], None]:
"""
Fixture that tracks created embeddings and cleans them up after tests.
Yields a list to which tests can append (content_type, content_id, user_id) tuples.
"""
created_embeddings: list[tuple[ContentType, str, str | None]] = []
yield created_embeddings
# Cleanup all created embeddings
for content_type, content_id, user_id in created_embeddings:
try:
await embeddings.delete_content_embedding(content_type, content_id, user_id)
except Exception:
pass # Ignore cleanup errors
# ============================================================================
# store_content_embedding Tests
# ============================================================================
@pytest.mark.asyncio(loop_scope="session")
async def test_store_content_embedding_store_agent(
server,
test_content_id: str,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test storing embedding for STORE_AGENT content type."""
# Track for cleanup
cleanup_embeddings.append((ContentType.STORE_AGENT, test_content_id, None))
result = await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=test_content_id,
embedding=mock_embedding,
searchable_text="AI assistant for productivity tasks",
metadata={"name": "Test Agent", "categories": ["productivity"]},
user_id=None, # Store agents are public
)
assert result is True
# Verify it was stored
stored = await embeddings.get_content_embedding(
ContentType.STORE_AGENT, test_content_id, user_id=None
)
assert stored is not None
assert stored["contentId"] == test_content_id
assert stored["contentType"] == "STORE_AGENT"
assert stored["searchableText"] == "AI assistant for productivity tasks"
@pytest.mark.asyncio(loop_scope="session")
async def test_store_content_embedding_block(
server,
test_content_id: str,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test storing embedding for BLOCK content type."""
cleanup_embeddings.append((ContentType.BLOCK, test_content_id, None))
result = await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=test_content_id,
embedding=mock_embedding,
searchable_text="HTTP request block for API calls",
metadata={"name": "HTTP Request Block"},
user_id=None, # Blocks are public
)
assert result is True
stored = await embeddings.get_content_embedding(
ContentType.BLOCK, test_content_id, user_id=None
)
assert stored is not None
assert stored["contentType"] == "BLOCK"
@pytest.mark.asyncio(loop_scope="session")
async def test_store_content_embedding_documentation(
server,
test_content_id: str,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test storing embedding for DOCUMENTATION content type."""
cleanup_embeddings.append((ContentType.DOCUMENTATION, test_content_id, None))
result = await embeddings.store_content_embedding(
content_type=ContentType.DOCUMENTATION,
content_id=test_content_id,
embedding=mock_embedding,
searchable_text="Getting started guide for AutoGPT platform",
metadata={"title": "Getting Started", "url": "/docs/getting-started"},
user_id=None, # Docs are public
)
assert result is True
stored = await embeddings.get_content_embedding(
ContentType.DOCUMENTATION, test_content_id, user_id=None
)
assert stored is not None
assert stored["contentType"] == "DOCUMENTATION"
@pytest.mark.asyncio(loop_scope="session")
async def test_store_content_embedding_upsert(
server,
test_content_id: str,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test that storing embedding twice updates instead of duplicates."""
cleanup_embeddings.append((ContentType.BLOCK, test_content_id, None))
# Store first time
result1 = await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=test_content_id,
embedding=mock_embedding,
searchable_text="Original text",
metadata={"version": 1},
user_id=None,
)
assert result1 is True
# Store again with different text (upsert)
result2 = await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=test_content_id,
embedding=mock_embedding,
searchable_text="Updated text",
metadata={"version": 2},
user_id=None,
)
assert result2 is True
# Verify only one record with updated text
stored = await embeddings.get_content_embedding(
ContentType.BLOCK, test_content_id, user_id=None
)
assert stored is not None
assert stored["searchableText"] == "Updated text"
# ============================================================================
# get_content_embedding Tests
# ============================================================================
@pytest.mark.asyncio(loop_scope="session")
async def test_get_content_embedding_not_found(server):
"""Test retrieving non-existent embedding returns None."""
result = await embeddings.get_content_embedding(
ContentType.STORE_AGENT, "non-existent-id", user_id=None
)
assert result is None
@pytest.mark.asyncio(loop_scope="session")
async def test_get_content_embedding_with_metadata(
server,
test_content_id: str,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test that metadata is correctly stored and retrieved."""
cleanup_embeddings.append((ContentType.STORE_AGENT, test_content_id, None))
metadata = {
"name": "Test Agent",
"subHeading": "A test agent",
"categories": ["ai", "productivity"],
"customField": 123,
}
await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=test_content_id,
embedding=mock_embedding,
searchable_text="test",
metadata=metadata,
user_id=None,
)
stored = await embeddings.get_content_embedding(
ContentType.STORE_AGENT, test_content_id, user_id=None
)
assert stored is not None
assert stored["metadata"]["name"] == "Test Agent"
assert stored["metadata"]["categories"] == ["ai", "productivity"]
assert stored["metadata"]["customField"] == 123
# ============================================================================
# delete_content_embedding Tests
# ============================================================================
@pytest.mark.asyncio(loop_scope="session")
async def test_delete_content_embedding(
server,
test_content_id: str,
mock_embedding: list[float],
):
"""Test deleting embedding removes it from database."""
# Store embedding
await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=test_content_id,
embedding=mock_embedding,
searchable_text="To be deleted",
metadata=None,
user_id=None,
)
# Verify it exists
stored = await embeddings.get_content_embedding(
ContentType.BLOCK, test_content_id, user_id=None
)
assert stored is not None
# Delete it
result = await embeddings.delete_content_embedding(
ContentType.BLOCK, test_content_id, user_id=None
)
assert result is True
# Verify it's gone
stored = await embeddings.get_content_embedding(
ContentType.BLOCK, test_content_id, user_id=None
)
assert stored is None
@pytest.mark.asyncio(loop_scope="session")
async def test_delete_content_embedding_not_found(server):
"""Test deleting non-existent embedding doesn't error."""
result = await embeddings.delete_content_embedding(
ContentType.BLOCK, "non-existent-id", user_id=None
)
# Should succeed even if nothing to delete
assert result is True
# ============================================================================
# unified_hybrid_search Tests
# ============================================================================
@pytest.mark.asyncio(loop_scope="session")
async def test_unified_hybrid_search_finds_matching_content(
server,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test unified search finds content matching the query."""
# Create unique content IDs
agent_id = f"test-agent-{uuid.uuid4()}"
block_id = f"test-block-{uuid.uuid4()}"
doc_id = f"test-doc-{uuid.uuid4()}"
cleanup_embeddings.append((ContentType.STORE_AGENT, agent_id, None))
cleanup_embeddings.append((ContentType.BLOCK, block_id, None))
cleanup_embeddings.append((ContentType.DOCUMENTATION, doc_id, None))
# Store embeddings for different content types
await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=agent_id,
embedding=mock_embedding,
searchable_text="AI writing assistant for blog posts",
metadata={"name": "Writing Assistant"},
user_id=None,
)
await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=block_id,
embedding=mock_embedding,
searchable_text="Text generation block for creative writing",
metadata={"name": "Text Generator"},
user_id=None,
)
await embeddings.store_content_embedding(
content_type=ContentType.DOCUMENTATION,
content_id=doc_id,
embedding=mock_embedding,
searchable_text="How to use writing blocks in AutoGPT",
metadata={"title": "Writing Guide"},
user_id=None,
)
# Search for "writing" - should find all three
results, total = await unified_hybrid_search(
query="writing",
page=1,
page_size=20,
)
# Should find at least our test content (may find others too)
content_ids = [r["content_id"] for r in results]
assert agent_id in content_ids or total >= 1 # Lexical search should find it
@pytest.mark.asyncio(loop_scope="session")
async def test_unified_hybrid_search_filter_by_content_type(
server,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test unified search can filter by content type."""
agent_id = f"test-agent-{uuid.uuid4()}"
block_id = f"test-block-{uuid.uuid4()}"
cleanup_embeddings.append((ContentType.STORE_AGENT, agent_id, None))
cleanup_embeddings.append((ContentType.BLOCK, block_id, None))
# Store both types with same searchable text
await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=agent_id,
embedding=mock_embedding,
searchable_text="unique_search_term_xyz123",
metadata={},
user_id=None,
)
await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=block_id,
embedding=mock_embedding,
searchable_text="unique_search_term_xyz123",
metadata={},
user_id=None,
)
# Search only for BLOCK type
results, total = await unified_hybrid_search(
query="unique_search_term_xyz123",
content_types=[ContentType.BLOCK],
page=1,
page_size=20,
)
# All results should be BLOCK type
for r in results:
assert r["content_type"] == "BLOCK"
@pytest.mark.asyncio(loop_scope="session")
async def test_unified_hybrid_search_empty_query(server):
"""Test unified search with empty query returns empty results."""
results, total = await unified_hybrid_search(
query="",
page=1,
page_size=20,
)
assert results == []
assert total == 0
@pytest.mark.asyncio(loop_scope="session")
async def test_unified_hybrid_search_pagination(
server,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test unified search pagination works correctly."""
# Create multiple items
content_ids = []
for i in range(5):
content_id = f"test-pagination-{uuid.uuid4()}"
content_ids.append(content_id)
cleanup_embeddings.append((ContentType.BLOCK, content_id, None))
await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=content_id,
embedding=mock_embedding,
searchable_text=f"pagination test item number {i}",
metadata={"index": i},
user_id=None,
)
# Get first page
page1_results, total1 = await unified_hybrid_search(
query="pagination test",
content_types=[ContentType.BLOCK],
page=1,
page_size=2,
)
# Get second page
page2_results, total2 = await unified_hybrid_search(
query="pagination test",
content_types=[ContentType.BLOCK],
page=2,
page_size=2,
)
# Total should be consistent
assert total1 == total2
# Pages should have different content (if we have enough results)
if len(page1_results) > 0 and len(page2_results) > 0:
page1_ids = {r["content_id"] for r in page1_results}
page2_ids = {r["content_id"] for r in page2_results}
# No overlap between pages
assert page1_ids.isdisjoint(page2_ids)
@pytest.mark.asyncio(loop_scope="session")
async def test_unified_hybrid_search_min_score_filtering(
server,
mock_embedding: list[float],
cleanup_embeddings: list,
):
"""Test unified search respects min_score threshold."""
content_id = f"test-minscore-{uuid.uuid4()}"
cleanup_embeddings.append((ContentType.BLOCK, content_id, None))
await embeddings.store_content_embedding(
content_type=ContentType.BLOCK,
content_id=content_id,
embedding=mock_embedding,
searchable_text="completely unrelated content about bananas",
metadata={},
user_id=None,
)
# Search with very high min_score - should filter out low relevance
results_high, _ = await unified_hybrid_search(
query="quantum computing algorithms",
content_types=[ContentType.BLOCK],
min_score=0.9, # Very high threshold
page=1,
page_size=20,
)
# Search with low min_score
results_low, _ = await unified_hybrid_search(
query="quantum computing algorithms",
content_types=[ContentType.BLOCK],
min_score=0.01, # Very low threshold
page=1,
page_size=20,
)
# High threshold should have fewer or equal results
assert len(results_high) <= len(results_low)
# ============================================================================
# hybrid_search (Store Agents) Tests
# ============================================================================
@pytest.mark.asyncio(loop_scope="session")
async def test_hybrid_search_store_agents_sql_valid(server):
"""Test that hybrid_search SQL executes without errors."""
# This test verifies the SQL is syntactically correct
# even if no results are found
results, total = await hybrid_search(
query="test agent",
page=1,
page_size=20,
)
# Should not raise - verifies SQL is valid
assert isinstance(results, list)
assert isinstance(total, int)
assert total >= 0
@pytest.mark.asyncio(loop_scope="session")
async def test_hybrid_search_with_filters(server):
"""Test hybrid_search with various filter options."""
# Test with all filter types
results, total = await hybrid_search(
query="productivity",
featured=True,
creators=["test-creator"],
category="productivity",
page=1,
page_size=10,
)
# Should not raise - verifies filter SQL is valid
assert isinstance(results, list)
assert isinstance(total, int)
@pytest.mark.asyncio(loop_scope="session")
async def test_hybrid_search_pagination(server):
"""Test hybrid_search pagination."""
# Page 1
results1, total1 = await hybrid_search(
query="agent",
page=1,
page_size=5,
)
# Page 2
results2, total2 = await hybrid_search(
query="agent",
page=2,
page_size=5,
)
# Verify SQL executes without error
assert isinstance(results1, list)
assert isinstance(results2, list)
assert isinstance(total1, int)
assert isinstance(total2, int)
# If page 1 has results, total should be > 0
# Note: total from page 2 may be 0 if no results on that page (COUNT(*) OVER limitation)
if results1:
assert total1 > 0
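# A hedged illustration of the limitation noted above: the COUNT(*) OVER ()
# total rides on the returned rows, so a page past the last result comes back
# with no rows and therefore no total to read. read_total is a hypothetical
# helper, not part of the original tests.
def read_total(rows: list[dict]) -> int:
    return rows[0]["total"] if rows else 0

assert read_total([{"content_id": "a", "total": 6}, {"content_id": "b", "total": 6}]) == 6
assert read_total([]) == 0  # empty page -> total falls back to 0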
# ============================================================================
# SQL Validity Tests (verify queries don't break)
# ============================================================================
@pytest.mark.asyncio(loop_scope="session")
async def test_all_content_types_searchable(server):
"""Test that all content types can be searched without SQL errors."""
for content_type in [
ContentType.STORE_AGENT,
ContentType.BLOCK,
ContentType.DOCUMENTATION,
]:
results, total = await unified_hybrid_search(
query="test",
content_types=[content_type],
page=1,
page_size=10,
)
# Should not raise
assert isinstance(results, list)
assert isinstance(total, int)
@pytest.mark.asyncio(loop_scope="session")
async def test_multiple_content_types_searchable(server):
"""Test searching multiple content types at once."""
results, total = await unified_hybrid_search(
query="test",
content_types=[ContentType.BLOCK, ContentType.DOCUMENTATION],
page=1,
page_size=20,
)
# Should not raise
assert isinstance(results, list)
assert isinstance(total, int)
@pytest.mark.asyncio(loop_scope="session")
async def test_search_all_content_types_default(server):
"""Test searching all content types (default behavior)."""
results, total = await unified_hybrid_search(
query="test",
content_types=None, # Should search all
page=1,
page_size=20,
)
# Should not raise
assert isinstance(results, list)
assert isinstance(total, int)
if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])

View File

@@ -4,13 +4,12 @@ Integration tests for embeddings with schema handling.
These tests verify that embeddings operations work correctly across different database schemas.
"""
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
import pytest
from prisma.enums import ContentType
from backend.api.features.store import embeddings
from backend.api.features.store.embeddings import EMBEDDING_DIM
# Schema prefix tests removed - functionality moved to db.raw_with_schema() helper
@@ -29,7 +28,7 @@ async def test_store_content_embedding_with_schema():
result = await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
embedding=[0.1] * 1536,
searchable_text="test text",
metadata={"test": "data"},
user_id=None,
@@ -126,69 +125,84 @@ async def test_delete_content_embedding_with_schema():
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_get_embedding_stats_with_schema():
"""Test embedding statistics with proper schema handling via content handlers."""
# Mock handler to return stats
mock_handler = MagicMock()
mock_handler.get_stats = AsyncMock(
return_value={
"total": 100,
"with_embeddings": 80,
"without_embeddings": 20,
}
)
"""Test embedding statistics with proper schema handling."""
with patch("backend.data.db.get_database_schema") as mock_schema:
mock_schema.return_value = "platform"
with patch(
"backend.api.features.store.embeddings.CONTENT_HANDLERS",
{ContentType.STORE_AGENT: mock_handler},
):
result = await embeddings.get_embedding_stats()
with patch("prisma.get_client") as mock_get_client:
mock_client = AsyncMock()
# Mock both query results
mock_client.query_raw.side_effect = [
[{"count": 100}], # total_approved
[{"count": 80}], # with_embeddings
]
mock_get_client.return_value = mock_client
# Verify handler was called
mock_handler.get_stats.assert_called_once()
result = await embeddings.get_embedding_stats()
# Verify new result structure
assert "by_type" in result
assert "totals" in result
assert result["totals"]["total"] == 100
assert result["totals"]["with_embeddings"] == 80
assert result["totals"]["without_embeddings"] == 20
assert result["totals"]["coverage_percent"] == 80.0
# Verify both queries were called
assert mock_client.query_raw.call_count == 2
# Get both SQL queries
first_call = mock_client.query_raw.call_args_list[0]
second_call = mock_client.query_raw.call_args_list[1]
first_sql = first_call[0][0]
second_sql = second_call[0][0]
# Verify schema prefix in both queries
assert '"platform"."StoreListingVersion"' in first_sql
assert '"platform"."StoreListingVersion"' in second_sql
assert '"platform"."UnifiedContentEmbedding"' in second_sql
# Verify results
assert result["total_approved"] == 100
assert result["with_embeddings"] == 80
assert result["without_embeddings"] == 20
assert result["coverage_percent"] == 80.0
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_backfill_missing_embeddings_with_schema():
"""Test backfilling embeddings via content handlers."""
from backend.api.features.store.content_handlers import ContentItem
"""Test backfilling embeddings with proper schema handling."""
with patch("backend.data.db.get_database_schema") as mock_schema:
mock_schema.return_value = "platform"
# Create mock content item
mock_item = ContentItem(
content_id="version-1",
content_type=ContentType.STORE_AGENT,
searchable_text="Test Agent Test description",
metadata={"name": "Test Agent"},
)
with patch("prisma.get_client") as mock_get_client:
mock_client = AsyncMock()
# Mock missing embeddings query
mock_client.query_raw.return_value = [
{
"id": "version-1",
"name": "Test Agent",
"description": "Test description",
"subHeading": "Test heading",
"categories": ["test"],
}
]
mock_get_client.return_value = mock_client
# Mock handler
mock_handler = MagicMock()
mock_handler.get_missing_items = AsyncMock(return_value=[mock_item])
with patch(
"backend.api.features.store.embeddings.CONTENT_HANDLERS",
{ContentType.STORE_AGENT: mock_handler},
):
with patch(
"backend.api.features.store.embeddings.generate_embedding",
return_value=[0.1] * EMBEDDING_DIM,
):
with patch(
"backend.api.features.store.embeddings.store_content_embedding",
return_value=True,
):
"backend.api.features.store.embeddings.ensure_embedding"
) as mock_ensure:
mock_ensure.return_value = True
result = await embeddings.backfill_missing_embeddings(batch_size=10)
# Verify handler was called
mock_handler.get_missing_items.assert_called_once_with(10)
# Verify the query was called
assert mock_client.query_raw.called
# Get the SQL query
call_args = mock_client.query_raw.call_args
sql_query = call_args[0][0]
# Verify schema prefix in query
assert '"platform"."StoreListingVersion"' in sql_query
assert '"platform"."UnifiedContentEmbedding"' in sql_query
# Verify ensure_embedding was called
assert mock_ensure.called
# Verify results
assert result["processed"] == 1
@@ -212,7 +226,7 @@ async def test_ensure_content_embedding_with_schema():
with patch(
"backend.api.features.store.embeddings.generate_embedding"
) as mock_generate:
mock_generate.return_value = [0.1] * EMBEDDING_DIM
mock_generate.return_value = [0.1] * 1536
with patch(
"backend.api.features.store.embeddings.store_content_embedding"
@@ -246,7 +260,7 @@ async def test_backward_compatibility_store_embedding():
result = await embeddings.store_embedding(
version_id="test-version-id",
embedding=[0.1] * EMBEDDING_DIM,
embedding=[0.1] * 1536,
tx=None,
)
@@ -301,7 +315,7 @@ async def test_schema_handling_error_cases():
result = await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
embedding=[0.1] * 1536,
searchable_text="test",
metadata=None,
user_id=None,

View File

@@ -63,7 +63,7 @@ async def test_generate_embedding_success():
result = await embeddings.generate_embedding("test text")
assert result is not None
assert len(result) == embeddings.EMBEDDING_DIM
assert len(result) == 1536
assert result[0] == 0.1
mock_client.embeddings.create.assert_called_once_with(
@@ -110,7 +110,7 @@ async def test_generate_embedding_text_truncation():
mock_client = MagicMock()
mock_response = MagicMock()
mock_response.data = [MagicMock()]
mock_response.data[0].embedding = [0.1] * embeddings.EMBEDDING_DIM
mock_response.data[0].embedding = [0.1] * 1536
# Use AsyncMock for async embeddings.create method
mock_client.embeddings.create = AsyncMock(return_value=mock_response)
@@ -297,92 +297,72 @@ async def test_ensure_embedding_generation_fails(mock_get, mock_generate):
@pytest.mark.asyncio(loop_scope="session")
async def test_get_embedding_stats():
"""Test embedding statistics retrieval."""
# Mock handler stats for each content type
mock_handler = MagicMock()
mock_handler.get_stats = AsyncMock(
return_value={
"total": 100,
"with_embeddings": 75,
"without_embeddings": 25,
}
)
# Mock approved count query and embedded count query
mock_approved_result = [{"count": 100}]
mock_embedded_result = [{"count": 75}]
# Patch the CONTENT_HANDLERS where it's used (in embeddings module)
with patch(
"backend.api.features.store.embeddings.CONTENT_HANDLERS",
{ContentType.STORE_AGENT: mock_handler},
"backend.api.features.store.embeddings.query_raw_with_schema",
side_effect=[mock_approved_result, mock_embedded_result],
):
result = await embeddings.get_embedding_stats()
assert "by_type" in result
assert "totals" in result
assert result["totals"]["total"] == 100
assert result["totals"]["with_embeddings"] == 75
assert result["totals"]["without_embeddings"] == 25
assert result["totals"]["coverage_percent"] == 75.0
assert result["total_approved"] == 100
assert result["with_embeddings"] == 75
assert result["without_embeddings"] == 25
assert result["coverage_percent"] == 75.0
@pytest.mark.asyncio(loop_scope="session")
@patch("backend.api.features.store.embeddings.store_content_embedding")
async def test_backfill_missing_embeddings_success(mock_store):
@patch("backend.api.features.store.embeddings.ensure_embedding")
async def test_backfill_missing_embeddings_success(mock_ensure):
"""Test backfill with successful embedding generation."""
# Mock ContentItem from handlers
from backend.api.features.store.content_handlers import ContentItem
mock_items = [
ContentItem(
content_id="version-1",
content_type=ContentType.STORE_AGENT,
searchable_text="Agent 1 Description 1",
metadata={"name": "Agent 1"},
),
ContentItem(
content_id="version-2",
content_type=ContentType.STORE_AGENT,
searchable_text="Agent 2 Description 2",
metadata={"name": "Agent 2"},
),
# Mock missing embeddings query
mock_missing = [
{
"id": "version-1",
"name": "Agent 1",
"description": "Description 1",
"subHeading": "Heading 1",
"categories": ["AI"],
},
{
"id": "version-2",
"name": "Agent 2",
"description": "Description 2",
"subHeading": "Heading 2",
"categories": ["Productivity"],
},
]
# Mock handler to return missing items
mock_handler = MagicMock()
mock_handler.get_missing_items = AsyncMock(return_value=mock_items)
# Mock store_content_embedding to succeed for first, fail for second
mock_store.side_effect = [True, False]
# Mock ensure_embedding to succeed for first, fail for second
mock_ensure.side_effect = [True, False]
with patch(
"backend.api.features.store.embeddings.CONTENT_HANDLERS",
{ContentType.STORE_AGENT: mock_handler},
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=mock_missing,
):
with patch(
"backend.api.features.store.embeddings.generate_embedding",
return_value=[0.1] * embeddings.EMBEDDING_DIM,
):
result = await embeddings.backfill_missing_embeddings(batch_size=5)
result = await embeddings.backfill_missing_embeddings(batch_size=5)
assert result["processed"] == 2
assert result["success"] == 1
assert result["failed"] == 1
assert mock_store.call_count == 2
assert result["processed"] == 2
assert result["success"] == 1
assert result["failed"] == 1
assert mock_ensure.call_count == 2
@pytest.mark.asyncio(loop_scope="session")
async def test_backfill_missing_embeddings_no_missing():
"""Test backfill when no embeddings are missing."""
# Mock handler to return no missing items
mock_handler = MagicMock()
mock_handler.get_missing_items = AsyncMock(return_value=[])
with patch(
"backend.api.features.store.embeddings.CONTENT_HANDLERS",
{ContentType.STORE_AGENT: mock_handler},
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=[],
):
result = await embeddings.backfill_missing_embeddings(batch_size=5)
assert result["processed"] == 0
assert result["success"] == 0
assert result["failed"] == 0
assert result["message"] == "No missing embeddings"
@pytest.mark.asyncio(loop_scope="session")

View File

@@ -1,18 +1,16 @@
"""
Unified Hybrid Search
Hybrid Search for Store Agents
Combines semantic (embedding) search with lexical (tsvector) search
for improved relevance across all content types (agents, blocks, docs).
for improved relevance in marketplace agent discovery.
"""
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Literal
from prisma.enums import ContentType
from backend.api.features.store.embeddings import (
EMBEDDING_DIM,
embed_query,
embedding_to_vector_string,
)
@@ -22,299 +20,17 @@ logger = logging.getLogger(__name__)
@dataclass
class UnifiedSearchWeights:
"""Weights for unified search (no popularity signal)."""
class HybridSearchWeights:
"""Weights for combining search signals."""
semantic: float = 0.40 # Embedding cosine similarity
lexical: float = 0.40 # tsvector ts_rank_cd score
category: float = 0.10 # Category match boost (for types that have categories)
recency: float = 0.10 # Newer content ranked higher
semantic: float = 0.30 # Embedding cosine similarity
lexical: float = 0.30 # tsvector ts_rank_cd score
category: float = 0.20 # Category match boost
recency: float = 0.10 # Newer agents ranked higher
popularity: float = 0.10 # Agent usage/runs (PageRank-like)
def __post_init__(self):
"""Validate weights are non-negative and sum to approximately 1.0."""
total = self.semantic + self.lexical + self.category + self.recency
if any(
w < 0 for w in [self.semantic, self.lexical, self.category, self.recency]
):
raise ValueError("All weights must be non-negative")
if not (0.99 <= total <= 1.01):
raise ValueError(f"Weights must sum to ~1.0, got {total:.3f}")
# Default weights for unified search
DEFAULT_UNIFIED_WEIGHTS = UnifiedSearchWeights()
# Minimum relevance score thresholds
DEFAULT_MIN_SCORE = 0.15 # For unified search (more permissive)
DEFAULT_STORE_AGENT_MIN_SCORE = 0.20 # For store agent search (original threshold)
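# A hedged usage sketch (not in the original module) for the unified weights
# shown above: they must be non-negative and sum to roughly 1.0, otherwise
# __post_init__ raises. The commented call shows how custom weights would be
# passed to unified_hybrid_search, defined below.
lexical_heavy = UnifiedSearchWeights(semantic=0.30, lexical=0.50, category=0.10, recency=0.10)

try:
    UnifiedSearchWeights(semantic=0.5, lexical=0.5, category=0.2, recency=0.1)  # sums to 1.3
except ValueError:
    pass  # rejected by the sum check in __post_init__

# results, total = await unified_hybrid_search(
#     "summarize pdf", weights=lexical_heavy, min_score=DEFAULT_MIN_SCORE
# )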
async def unified_hybrid_search(
query: str,
content_types: list[ContentType] | None = None,
category: str | None = None,
page: int = 1,
page_size: int = 20,
weights: UnifiedSearchWeights | None = None,
min_score: float | None = None,
user_id: str | None = None,
) -> tuple[list[dict[str, Any]], int]:
"""
Unified hybrid search across all content types.
Searches UnifiedContentEmbedding using both semantic (vector) and lexical (tsvector) signals.
Args:
query: Search query string
content_types: List of content types to search. Defaults to all public types.
category: Filter by category (for content types that support it)
page: Page number (1-indexed)
page_size: Results per page
weights: Custom weights for search signals
min_score: Minimum relevance score threshold (0-1)
user_id: User ID for searching private content (library agents)
Returns:
Tuple of (results list, total count)
"""
# Validate inputs
query = query.strip()
if not query:
return [], 0
if page < 1:
page = 1
if page_size < 1:
page_size = 1
if page_size > 100:
page_size = 100
if content_types is None:
content_types = [
ContentType.STORE_AGENT,
ContentType.BLOCK,
ContentType.DOCUMENTATION,
]
if weights is None:
weights = DEFAULT_UNIFIED_WEIGHTS
if min_score is None:
min_score = DEFAULT_MIN_SCORE
offset = (page - 1) * page_size
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation if embedding unavailable
if query_embedding is None or not query_embedding:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search. "
"Check that openai_internal_api_key is configured and OpenAI API is accessible."
)
query_embedding = [0.0] * EMBEDDING_DIM
# Redistribute semantic weight to lexical
total_non_semantic = weights.lexical + weights.category + weights.recency
if total_non_semantic > 0:
factor = 1.0 / total_non_semantic
weights = UnifiedSearchWeights(
semantic=0.0,
lexical=weights.lexical * factor,
category=weights.category * factor,
recency=weights.recency * factor,
)
else:
weights = UnifiedSearchWeights(
semantic=0.0, lexical=1.0, category=0.0, recency=0.0
)
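# Worked example of the redistribution above with the default unified weights
# (semantic=0.40, lexical=0.40, category=0.10, recency=0.10):
#   total_non_semantic = 0.40 + 0.10 + 0.10 = 0.60, factor = 1 / 0.60 ≈ 1.667
#   -> lexical ≈ 0.667, category ≈ 0.167, recency ≈ 0.167 (sum ≈ 1.0, semantic = 0)
# so the lexical-only fallback still scores with a weight set that sums to ~1.0.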
# Build parameters
params: list[Any] = []
param_idx = 1
# Query for lexical search
params.append(query)
query_param = f"${param_idx}"
param_idx += 1
# Query lowercase for category matching
params.append(query.lower())
query_lower_param = f"${param_idx}"
param_idx += 1
# Embedding
embedding_str = embedding_to_vector_string(query_embedding)
params.append(embedding_str)
embedding_param = f"${param_idx}"
param_idx += 1
# Content types
content_type_values = [ct.value for ct in content_types]
params.append(content_type_values)
content_types_param = f"${param_idx}"
param_idx += 1
# User ID filter (for private content)
user_filter = ""
if user_id is not None:
params.append(user_id)
user_filter = f'AND (uce."userId" = ${param_idx} OR uce."userId" IS NULL)'
param_idx += 1
else:
user_filter = 'AND uce."userId" IS NULL'
# Weights
params.append(weights.semantic)
w_semantic = f"${param_idx}"
param_idx += 1
params.append(weights.lexical)
w_lexical = f"${param_idx}"
param_idx += 1
params.append(weights.category)
w_category = f"${param_idx}"
param_idx += 1
params.append(weights.recency)
w_recency = f"${param_idx}"
param_idx += 1
# Min score
params.append(min_score)
min_score_param = f"${param_idx}"
param_idx += 1
# Pagination
params.append(page_size)
limit_param = f"${param_idx}"
param_idx += 1
params.append(offset)
offset_param = f"${param_idx}"
param_idx += 1
# Unified search query on UnifiedContentEmbedding
sql_query = f"""
WITH candidates AS (
-- Lexical matches (uses GIN index on search column)
SELECT uce.id, uce."contentType", uce."contentId"
FROM {{schema_prefix}}"UnifiedContentEmbedding" uce
WHERE uce."contentType" = ANY({content_types_param}::{{schema_prefix}}"ContentType"[])
{user_filter}
AND uce.search @@ plainto_tsquery('english', {query_param})
UNION
-- Semantic matches (uses HNSW index on embedding)
(
SELECT uce.id, uce."contentType", uce."contentId"
FROM {{schema_prefix}}"UnifiedContentEmbedding" uce
WHERE uce."contentType" = ANY({content_types_param}::{{schema_prefix}}"ContentType"[])
{user_filter}
ORDER BY uce.embedding <=> {embedding_param}::vector
LIMIT 200
)
),
search_scores AS (
SELECT
uce."contentType" as content_type,
uce."contentId" as content_id,
uce."searchableText" as searchable_text,
uce.metadata,
uce."updatedAt" as updated_at,
-- Semantic score: cosine similarity (1 - distance)
COALESCE(1 - (uce.embedding <=> {embedding_param}::vector), 0) as semantic_score,
-- Lexical score: ts_rank_cd
COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
-- Category match from metadata
CASE
WHEN uce.metadata ? 'categories' AND EXISTS (
SELECT 1 FROM jsonb_array_elements_text(uce.metadata->'categories') cat
WHERE LOWER(cat) LIKE '%' || {query_lower_param} || '%'
)
THEN 1.0
ELSE 0.0
END as category_score,
-- Recency score: linear decay over 90 days
GREATEST(0, 1 - EXTRACT(EPOCH FROM (NOW() - uce."updatedAt")) / (90 * 24 * 3600)) as recency_score
FROM candidates c
INNER JOIN {{schema_prefix}}"UnifiedContentEmbedding" uce ON c.id = uce.id
),
max_lexical AS (
SELECT GREATEST(MAX(lexical_raw), 0.001) as max_val FROM search_scores
),
normalized AS (
SELECT
ss.*,
ss.lexical_raw / ml.max_val as lexical_score
FROM search_scores ss
CROSS JOIN max_lexical ml
),
scored AS (
SELECT
content_type,
content_id,
searchable_text,
metadata,
updated_at,
semantic_score,
lexical_score,
category_score,
recency_score,
(
{w_semantic} * semantic_score +
{w_lexical} * lexical_score +
{w_category} * category_score +
{w_recency} * recency_score
) as combined_score
FROM normalized
),
filtered AS (
SELECT
*,
COUNT(*) OVER () as total_count
FROM scored
WHERE combined_score >= {min_score_param}
)
SELECT * FROM filtered
ORDER BY combined_score DESC
LIMIT {limit_param} OFFSET {offset_param}
"""
results = await query_raw_with_schema(
sql_query, *params, set_public_search_path=True
)
total = results[0]["total_count"] if results else 0
# Clean up results
for result in results:
result.pop("total_count", None)
logger.info(f"Unified hybrid search: {len(results)} results, {total} total")
return results, total
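# A minimal calling sketch for unified_hybrid_search (async context assumed;
# the query string below is a placeholder):
#
#   results, total = await unified_hybrid_search(
#       query="summarize my emails",
#       content_types=[ContentType.STORE_AGENT, ContentType.BLOCK],
#       page=1,
#       page_size=20,
#   )
#   # each result dict carries content_type, content_id, searchable_text,
#   # metadata, updated_at, the per-signal scores, and combined_score.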
# ============================================================================
# Store Agent specific search (with full metadata)
# ============================================================================
@dataclass
class StoreAgentSearchWeights:
"""Weights for store agent search including popularity."""
semantic: float = 0.30
lexical: float = 0.30
category: float = 0.20
recency: float = 0.10
popularity: float = 0.10
def __post_init__(self):
total = (
self.semantic
+ self.lexical
@@ -322,6 +38,7 @@ class StoreAgentSearchWeights:
+ self.recency
+ self.popularity
)
if any(
w < 0
for w in [
@@ -333,11 +50,46 @@ class StoreAgentSearchWeights:
]
):
raise ValueError("All weights must be non-negative")
if not (0.99 <= total <= 1.01):
raise ValueError(f"Weights must sum to ~1.0, got {total:.3f}")
DEFAULT_STORE_AGENT_WEIGHTS = StoreAgentSearchWeights()
DEFAULT_WEIGHTS = HybridSearchWeights()
# Minimum relevance score threshold - agents below this are filtered out
# With weights (0.30 semantic + 0.30 lexical + 0.20 category + 0.10 recency + 0.10 popularity):
# - 0.20 requires a semantic score of at least ~0.67 (0.20 / 0.30) OR a strong lexical match
# - Ensures only genuinely relevant results are returned
# - Recency/popularity alone (0.10 each) won't pass the threshold
DEFAULT_MIN_SCORE = 0.20
@dataclass
class HybridSearchResult:
"""A single search result with score breakdown."""
slug: str
agent_name: str
agent_image: str
creator_username: str
creator_avatar: str
sub_heading: str
description: str
runs: int
rating: float
categories: list[str]
featured: bool
is_available: bool
updated_at: datetime
# Score breakdown (for debugging/tuning)
combined_score: float
semantic_score: float = 0.0
lexical_score: float = 0.0
category_score: float = 0.0
recency_score: float = 0.0
popularity_score: float = 0.0
async def hybrid_search(
@@ -350,263 +102,276 @@ async def hybrid_search(
) = None,
page: int = 1,
page_size: int = 20,
weights: StoreAgentSearchWeights | None = None,
weights: HybridSearchWeights | None = None,
min_score: float | None = None,
) -> tuple[list[dict[str, Any]], int]:
"""
Hybrid search for store agents with full metadata.
Perform hybrid search combining semantic and lexical signals.
Uses UnifiedContentEmbedding for search, joins to StoreAgent for metadata.
Args:
query: Search query string
featured: Filter for featured agents only
creators: Filter by creator usernames
category: Filter by category
sorted_by: Sort order (relevance uses hybrid scoring)
page: Page number (1-indexed)
page_size: Results per page
weights: Custom weights for search signals
min_score: Minimum relevance score threshold (0-1). Results below
this score are filtered out. Defaults to DEFAULT_MIN_SCORE.
Returns:
Tuple of (results list, total count). Returns empty list if no
results meet the minimum relevance threshold.
"""
# Validate inputs
query = query.strip()
if not query:
return [], 0
return [], 0 # Empty query returns no results
if page < 1:
page = 1
if page_size < 1:
page_size = 1
if page_size > 100:
if page_size > 100: # Cap at reasonable limit to prevent performance issues
page_size = 100
if weights is None:
weights = DEFAULT_STORE_AGENT_WEIGHTS
weights = DEFAULT_WEIGHTS
if min_score is None:
min_score = (
DEFAULT_STORE_AGENT_MIN_SCORE # Use original threshold for store agents
)
min_score = DEFAULT_MIN_SCORE
offset = (page - 1) * page_size
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation
if query_embedding is None or not query_embedding:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search."
)
query_embedding = [0.0] * EMBEDDING_DIM
total_non_semantic = (
weights.lexical + weights.category + weights.recency + weights.popularity
)
if total_non_semantic > 0:
factor = 1.0 / total_non_semantic
weights = StoreAgentSearchWeights(
semantic=0.0,
lexical=weights.lexical * factor,
category=weights.category * factor,
recency=weights.recency * factor,
popularity=weights.popularity * factor,
)
else:
weights = StoreAgentSearchWeights(
semantic=0.0, lexical=1.0, category=0.0, recency=0.0, popularity=0.0
)
# Build parameters
# Build WHERE clause conditions
where_parts: list[str] = ["sa.is_available = true"]
params: list[Any] = []
param_idx = 1
param_index = 1
# Add search query for lexical matching
params.append(query)
query_param = f"${param_idx}"
param_idx += 1
query_param = f"${param_index}"
param_index += 1
# Add lowercased query for category matching
params.append(query.lower())
query_lower_param = f"${param_idx}"
param_idx += 1
embedding_str = embedding_to_vector_string(query_embedding)
params.append(embedding_str)
embedding_param = f"${param_idx}"
param_idx += 1
# Build WHERE clause for StoreAgent filters
where_parts = ["sa.is_available = true"]
query_lower_param = f"${param_index}"
param_index += 1
if featured:
where_parts.append("sa.featured = true")
if creators:
where_parts.append(f"sa.creator_username = ANY(${param_index})")
params.append(creators)
where_parts.append(f"sa.creator_username = ANY(${param_idx})")
param_idx += 1
param_index += 1
if category:
where_parts.append(f"${param_index} = ANY(sa.categories)")
params.append(category)
where_parts.append(f"${param_idx} = ANY(sa.categories)")
param_idx += 1
param_index += 1
# Safe: where_parts only contains hardcoded strings with $N parameter placeholders
# No user input is concatenated directly into the SQL string
where_clause = " AND ".join(where_parts)
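# Illustrative where_clause produced above for featured=True, creators=["alice"],
# category="AI" (the exact $N placeholders depend on how many parameters were
# appended before the filters):
#   sa.is_available = true AND sa.featured = true
#     AND sa.creator_username = ANY($3) AND $4 = ANY(sa.categories)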
# Weights
# Embedding is required for hybrid search - fail fast if unavailable
if query_embedding is None or not query_embedding:
# Log detailed error server-side
logger.error(
"Failed to generate query embedding. "
"Check that openai_internal_api_key is configured and OpenAI API is accessible."
)
# Raise generic error to client
raise ValueError("Search service temporarily unavailable")
# Add embedding parameter
embedding_str = embedding_to_vector_string(query_embedding)
params.append(embedding_str)
embedding_param = f"${param_index}"
param_index += 1
# Add weight parameters for SQL calculation
params.append(weights.semantic)
w_semantic = f"${param_idx}"
param_idx += 1
weight_semantic_param = f"${param_index}"
param_index += 1
params.append(weights.lexical)
w_lexical = f"${param_idx}"
param_idx += 1
weight_lexical_param = f"${param_index}"
param_index += 1
params.append(weights.category)
w_category = f"${param_idx}"
param_idx += 1
weight_category_param = f"${param_index}"
param_index += 1
params.append(weights.recency)
w_recency = f"${param_idx}"
param_idx += 1
weight_recency_param = f"${param_index}"
param_index += 1
params.append(weights.popularity)
w_popularity = f"${param_idx}"
param_idx += 1
weight_popularity_param = f"${param_index}"
param_index += 1
# Add min_score parameter
params.append(min_score)
min_score_param = f"${param_idx}"
param_idx += 1
min_score_param = f"${param_index}"
param_index += 1
params.append(page_size)
limit_param = f"${param_idx}"
param_idx += 1
params.append(offset)
offset_param = f"${param_idx}"
param_idx += 1
# Query using UnifiedContentEmbedding for search, StoreAgent for metadata
# Optimized hybrid search query:
# 1. Direct join to UnifiedContentEmbedding via contentId=storeListingVersionId (no redundant JOINs)
# 2. UNION approach (deduplicates agents matching both branches)
# 3. COUNT(*) OVER() to get total count in single query
# 4. Optimized category matching with EXISTS + unnest
# 5. Pre-calculated max values for lexical and popularity normalization
# 6. Simplified recency calculation with linear decay
# 7. Logarithmic popularity scaling to prevent viral agents from dominating
sql_query = f"""
WITH candidates AS (
-- Lexical matches via UnifiedContentEmbedding.search
SELECT uce."contentId" as "storeListingVersionId"
FROM {{schema_prefix}}"UnifiedContentEmbedding" uce
INNER JOIN {{schema_prefix}}"StoreAgent" sa
ON uce."contentId" = sa."storeListingVersionId"
WHERE uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType"
AND uce."userId" IS NULL
AND uce.search @@ plainto_tsquery('english', {query_param})
AND {where_clause}
WITH candidates AS (
-- Lexical matches (uses GIN index on search column)
SELECT sa."storeListingVersionId"
FROM {{schema_prefix}}"StoreAgent" sa
WHERE {where_clause}
AND sa.search @@ plainto_tsquery('english', {query_param})
UNION
UNION
-- Semantic matches via UnifiedContentEmbedding.embedding
SELECT uce."contentId" as "storeListingVersionId"
FROM (
SELECT uce."contentId", uce.embedding
FROM {{schema_prefix}}"UnifiedContentEmbedding" uce
-- Semantic matches (uses HNSW index on embedding with KNN)
SELECT "storeListingVersionId"
FROM (
SELECT sa."storeListingVersionId", uce.embedding
FROM {{schema_prefix}}"StoreAgent" sa
INNER JOIN {{schema_prefix}}"UnifiedContentEmbedding" uce
ON sa."storeListingVersionId" = uce."contentId" AND uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType"
WHERE {where_clause}
ORDER BY uce.embedding <=> {embedding_param}::vector
LIMIT 200
) semantic_results
),
search_scores AS (
SELECT
sa.slug,
sa.agent_name,
sa.agent_image,
sa.creator_username,
sa.creator_avatar,
sa.sub_heading,
sa.description,
sa.runs,
sa.rating,
sa.categories,
sa.featured,
sa.is_available,
sa.updated_at,
-- Semantic score: cosine similarity (1 - distance)
COALESCE(1 - (uce.embedding <=> {embedding_param}::vector), 0) as semantic_score,
-- Lexical score: ts_rank_cd (will be normalized later)
COALESCE(ts_rank_cd(sa.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
-- Category match: optimized with unnest for better performance
CASE
WHEN EXISTS (
SELECT 1 FROM unnest(sa.categories) cat
WHERE LOWER(cat) LIKE '%' || {query_lower_param} || '%'
)
THEN 1.0
ELSE 0.0
END as category_score,
-- Recency score: linear decay over 90 days (simpler than exponential)
GREATEST(0, 1 - EXTRACT(EPOCH FROM (NOW() - sa.updated_at)) / (90 * 24 * 3600)) as recency_score,
-- Popularity raw: agent runs count (will be normalized with log scaling)
sa.runs as popularity_raw
FROM candidates c
INNER JOIN {{schema_prefix}}"StoreAgent" sa
ON uce."contentId" = sa."storeListingVersionId"
WHERE uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType"
AND uce."userId" IS NULL
AND {where_clause}
ORDER BY uce.embedding <=> {embedding_param}::vector
LIMIT 200
) uce
),
search_scores AS (
SELECT
sa.slug,
sa.agent_name,
sa.agent_image,
sa.creator_username,
sa.creator_avatar,
sa.sub_heading,
sa.description,
sa.runs,
sa.rating,
sa.categories,
sa.featured,
sa.is_available,
sa.updated_at,
-- Semantic score
COALESCE(1 - (uce.embedding <=> {embedding_param}::vector), 0) as semantic_score,
-- Lexical score (raw, will normalize)
COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
-- Category match
CASE
WHEN EXISTS (
SELECT 1 FROM unnest(sa.categories) cat
WHERE LOWER(cat) LIKE '%' || {query_lower_param} || '%'
)
THEN 1.0
ELSE 0.0
END as category_score,
-- Recency
GREATEST(0, 1 - EXTRACT(EPOCH FROM (NOW() - sa.updated_at)) / (90 * 24 * 3600)) as recency_score,
-- Popularity (raw)
sa.runs as popularity_raw
FROM candidates c
INNER JOIN {{schema_prefix}}"StoreAgent" sa
ON c."storeListingVersionId" = sa."storeListingVersionId"
INNER JOIN {{schema_prefix}}"UnifiedContentEmbedding" uce
ON sa."storeListingVersionId" = uce."contentId"
AND uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType"
),
max_vals AS (
SELECT
GREATEST(MAX(lexical_raw), 0.001) as max_lexical,
GREATEST(MAX(popularity_raw), 1) as max_popularity
FROM search_scores
),
normalized AS (
SELECT
ss.*,
ss.lexical_raw / mv.max_lexical as lexical_score,
CASE
WHEN ss.popularity_raw > 0
THEN LN(1 + ss.popularity_raw) / LN(1 + mv.max_popularity)
ELSE 0
END as popularity_score
FROM search_scores ss
CROSS JOIN max_vals mv
),
scored AS (
SELECT
slug,
agent_name,
agent_image,
creator_username,
creator_avatar,
sub_heading,
description,
runs,
rating,
categories,
featured,
is_available,
updated_at,
semantic_score,
lexical_score,
category_score,
recency_score,
popularity_score,
(
{w_semantic} * semantic_score +
{w_lexical} * lexical_score +
{w_category} * category_score +
{w_recency} * recency_score +
{w_popularity} * popularity_score
) as combined_score
FROM normalized
),
filtered AS (
SELECT *, COUNT(*) OVER () as total_count
FROM scored
WHERE combined_score >= {min_score_param}
)
SELECT * FROM filtered
ORDER BY combined_score DESC
LIMIT {limit_param} OFFSET {offset_param}
ON c."storeListingVersionId" = sa."storeListingVersionId"
LEFT JOIN {{schema_prefix}}"UnifiedContentEmbedding" uce
ON sa."storeListingVersionId" = uce."contentId" AND uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType"
),
max_lexical AS (
SELECT MAX(lexical_raw) as max_val FROM search_scores
),
max_popularity AS (
SELECT MAX(popularity_raw) as max_val FROM search_scores
),
normalized AS (
SELECT
ss.*,
-- Normalize lexical score by pre-calculated max
CASE
WHEN ml.max_val > 0
THEN ss.lexical_raw / ml.max_val
ELSE 0
END as lexical_score,
-- Normalize popularity with logarithmic scaling to prevent viral agents from dominating
-- LOG(1 + runs) / LOG(1 + max_runs) ensures score is 0-1 range
CASE
WHEN mp.max_val > 0 AND ss.popularity_raw > 0
THEN LN(1 + ss.popularity_raw) / LN(1 + mp.max_val)
ELSE 0
END as popularity_score
FROM search_scores ss
CROSS JOIN max_lexical ml
CROSS JOIN max_popularity mp
),
scored AS (
SELECT
slug,
agent_name,
agent_image,
creator_username,
creator_avatar,
sub_heading,
description,
runs,
rating,
categories,
featured,
is_available,
updated_at,
semantic_score,
lexical_score,
category_score,
recency_score,
popularity_score,
(
{weight_semantic_param} * semantic_score +
{weight_lexical_param} * lexical_score +
{weight_category_param} * category_score +
{weight_recency_param} * recency_score +
{weight_popularity_param} * popularity_score
) as combined_score
FROM normalized
),
filtered AS (
SELECT
*,
COUNT(*) OVER () as total_count
FROM scored
WHERE combined_score >= {min_score_param}
)
SELECT * FROM filtered
ORDER BY combined_score DESC
LIMIT ${param_index} OFFSET ${param_index + 1}
"""
# Add pagination params
params.extend([page_size, offset])
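# Worked example of the logarithmic popularity normalization in the SQL above:
# an agent with runs=100 when the top candidate has max_runs=10000 scores
#   LN(1 + 100) / LN(1 + 10000) ≈ 4.615 / 9.210 ≈ 0.50
# so a 100x gap in runs halves the popularity signal rather than shrinking it
# to 0.01 as linear normalization would.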
# Execute search query - includes total_count via window function
results = await query_raw_with_schema(
sql_query, *params, set_public_search_path=True
)
# Extract total count from first result (all rows have same count)
total = results[0]["total_count"] if results else 0
# Remove total_count from results before returning
for result in results:
result.pop("total_count", None)
logger.info(f"Hybrid search (store agents): {len(results)} results, {total} total")
# Log without sensitive query content
logger.info(f"Hybrid search: {len(results)} results, {total} total")
return results, total
@@ -616,10 +381,13 @@ async def hybrid_search_simple(
page: int = 1,
page_size: int = 20,
) -> tuple[list[dict[str, Any]], int]:
"""Simplified hybrid search for store agents."""
return await hybrid_search(query=query, page=page, page_size=page_size)
"""
Simplified hybrid search for common use cases.
# Backward compatibility alias - HybridSearchWeights maps to StoreAgentSearchWeights
# for existing code that expects the popularity parameter
HybridSearchWeights = StoreAgentSearchWeights
Uses default weights and no filters.
"""
return await hybrid_search(
query=query,
page=page,
page_size=page_size,
)
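A minimal calling sketch for the store-agent search above, usable inside an async function; the query, creator, and category values are placeholders, and the import path matches the one used by the tests that follow:

from backend.api.features.store.hybrid_search import hybrid_search

results, total = await hybrid_search(
    query="email assistant",       # placeholder query
    featured=True,                 # only featured listings
    creators=["example-creator"],  # placeholder creator username
    category="productivity",       # placeholder category
    page=1,
    page_size=20,
)
# Each result dict carries slug, agent_name, rating, runs, categories, etc.,
# plus the per-signal scores and combined_score used for ranking.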
View File
@@ -7,15 +7,8 @@ These tests verify that hybrid search works correctly across different database
from unittest.mock import patch
import pytest
from prisma.enums import ContentType
from backend.api.features.store import embeddings
from backend.api.features.store.hybrid_search import (
HybridSearchWeights,
UnifiedSearchWeights,
hybrid_search,
unified_hybrid_search,
)
from backend.api.features.store.hybrid_search import HybridSearchWeights, hybrid_search
@pytest.mark.asyncio(loop_scope="session")
@@ -56,7 +49,7 @@ async def test_hybrid_search_with_schema_handling():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM # Mock embedding
mock_embed.return_value = [0.1] * 1536 # Mock embedding
results, total = await hybrid_search(
query=query,
@@ -92,7 +85,7 @@ async def test_hybrid_search_with_public_schema():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
mock_embed.return_value = [0.1] * 1536
results, total = await hybrid_search(
query="test",
@@ -123,7 +116,7 @@ async def test_hybrid_search_with_custom_schema():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
mock_embed.return_value = [0.1] * 1536
results, total = await hybrid_search(
query="test",
@@ -141,52 +134,22 @@ async def test_hybrid_search_with_custom_schema():
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_hybrid_search_without_embeddings():
"""Test hybrid search gracefully degrades when embeddings are unavailable."""
# Mock database to return some results
mock_results = [
{
"slug": "test-agent",
"agent_name": "Test Agent",
"agent_image": "test.png",
"creator_username": "creator",
"creator_avatar": "avatar.png",
"sub_heading": "Test heading",
"description": "Test description",
"runs": 100,
"rating": 4.5,
"categories": ["AI"],
"featured": False,
"is_available": True,
"updated_at": "2025-01-01T00:00:00Z",
"semantic_score": 0.0, # Zero because no embedding
"lexical_score": 0.5,
"category_score": 0.0,
"recency_score": 0.1,
"popularity_score": 0.2,
"combined_score": 0.3,
"total_count": 1,
}
]
"""Test hybrid search fails fast when embeddings are unavailable."""
# Patch where the function is used, not where it's defined
with patch("backend.api.features.store.hybrid_search.embed_query") as mock_embed:
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
# Simulate embedding failure
mock_embed.return_value = None
mock_query.return_value = mock_results
# Simulate embedding failure
mock_embed.return_value = None
# Should NOT raise - graceful degradation
results, total = await hybrid_search(
# Should raise ValueError with helpful message
with pytest.raises(ValueError) as exc_info:
await hybrid_search(
query="test",
page=1,
page_size=20,
)
# Verify it returns results even without embeddings
assert len(results) == 1
assert results[0]["slug"] == "test-agent"
assert total == 1
# Verify error message is generic (doesn't leak implementation details)
assert "Search service temporarily unavailable" in str(exc_info.value)
@pytest.mark.asyncio(loop_scope="session")
@@ -201,7 +164,7 @@ async def test_hybrid_search_with_filters():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
mock_embed.return_value = [0.1] * 1536
# Test with featured filter
results, total = await hybrid_search(
@@ -241,7 +204,7 @@ async def test_hybrid_search_weights():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
mock_embed.return_value = [0.1] * 1536
results, total = await hybrid_search(
query="test",
@@ -285,7 +248,7 @@ async def test_hybrid_search_min_score_filtering():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
mock_embed.return_value = [0.1] * 1536
# Test with custom min_score
results, total = await hybrid_search(
@@ -320,7 +283,7 @@ async def test_hybrid_search_pagination():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
mock_embed.return_value = [0.1] * 1536
# Test page 2 with page_size 10
results, total = await hybrid_search(
@@ -354,7 +317,7 @@ async def test_hybrid_search_error_handling():
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
mock_embed.return_value = [0.1] * 1536
# Should raise exception
with pytest.raises(Exception) as exc_info:
@@ -367,301 +330,5 @@ async def test_hybrid_search_error_handling():
assert "Database connection error" in str(exc_info.value)
# =============================================================================
# Unified Hybrid Search Tests
# =============================================================================
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_basic():
"""Test basic unified hybrid search across all content types."""
mock_results = [
{
"content_type": "STORE_AGENT",
"content_id": "agent-1",
"searchable_text": "Test Agent Description",
"metadata": {"name": "Test Agent"},
"updated_at": "2025-01-01T00:00:00Z",
"semantic_score": 0.7,
"lexical_score": 0.8,
"category_score": 0.5,
"recency_score": 0.3,
"combined_score": 0.6,
"total_count": 2,
},
{
"content_type": "BLOCK",
"content_id": "block-1",
"searchable_text": "Test Block Description",
"metadata": {"name": "Test Block"},
"updated_at": "2025-01-01T00:00:00Z",
"semantic_score": 0.6,
"lexical_score": 0.7,
"category_score": 0.4,
"recency_score": 0.2,
"combined_score": 0.5,
"total_count": 2,
},
]
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = mock_results
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
results, total = await unified_hybrid_search(
query="test",
page=1,
page_size=20,
)
assert len(results) == 2
assert total == 2
assert results[0]["content_type"] == "STORE_AGENT"
assert results[1]["content_type"] == "BLOCK"
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_filter_by_content_type():
"""Test unified search filtering by specific content types."""
mock_results = [
{
"content_type": "BLOCK",
"content_id": "block-1",
"searchable_text": "Test Block",
"metadata": {},
"updated_at": "2025-01-01T00:00:00Z",
"semantic_score": 0.7,
"lexical_score": 0.8,
"category_score": 0.0,
"recency_score": 0.3,
"combined_score": 0.5,
"total_count": 1,
},
]
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = mock_results
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
results, total = await unified_hybrid_search(
query="test",
content_types=[ContentType.BLOCK],
page=1,
page_size=20,
)
# Verify content_types parameter was passed correctly
call_args = mock_query.call_args
params = call_args[0][1:]
# The content types should be in the params as a list
assert ["BLOCK"] in params
assert len(results) == 1
assert total == 1
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_with_user_id():
"""Test unified search with user_id for private content."""
mock_results = [
{
"content_type": "STORE_AGENT",
"content_id": "agent-1",
"searchable_text": "My Private Agent",
"metadata": {},
"updated_at": "2025-01-01T00:00:00Z",
"semantic_score": 0.7,
"lexical_score": 0.8,
"category_score": 0.0,
"recency_score": 0.3,
"combined_score": 0.6,
"total_count": 1,
},
]
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = mock_results
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
results, total = await unified_hybrid_search(
query="test",
user_id="user-123",
page=1,
page_size=20,
)
# Verify SQL contains user_id filter
call_args = mock_query.call_args
sql_template = call_args[0][0]
params = call_args[0][1:]
assert 'uce."userId"' in sql_template
assert "user-123" in params
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_custom_weights():
"""Test unified search with custom weights."""
custom_weights = UnifiedSearchWeights(
semantic=0.6,
lexical=0.2,
category=0.1,
recency=0.1,
)
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = []
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
results, total = await unified_hybrid_search(
query="test",
weights=custom_weights,
page=1,
page_size=20,
)
# Verify custom weights are in parameters
call_args = mock_query.call_args
params = call_args[0][1:]
assert 0.6 in params # semantic weight
assert 0.2 in params # lexical weight
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_graceful_degradation():
"""Test unified search gracefully degrades when embeddings unavailable."""
mock_results = [
{
"content_type": "DOCUMENTATION",
"content_id": "doc-1",
"searchable_text": "API Documentation",
"metadata": {},
"updated_at": "2025-01-01T00:00:00Z",
"semantic_score": 0.0, # Zero because no embedding
"lexical_score": 0.8,
"category_score": 0.0,
"recency_score": 0.2,
"combined_score": 0.5,
"total_count": 1,
},
]
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = mock_results
mock_embed.return_value = None # Embedding failure
# Should NOT raise - graceful degradation
results, total = await unified_hybrid_search(
query="test",
page=1,
page_size=20,
)
assert len(results) == 1
assert total == 1
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_empty_query():
"""Test unified search with empty query returns empty results."""
results, total = await unified_hybrid_search(
query="",
page=1,
page_size=20,
)
assert results == []
assert total == 0
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_pagination():
"""Test unified search pagination."""
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = []
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
results, total = await unified_hybrid_search(
query="test",
page=3,
page_size=15,
)
# Verify pagination parameters (last two params are LIMIT and OFFSET)
call_args = mock_query.call_args
params = call_args[0]
limit = params[-2]
offset = params[-1]
assert limit == 15 # page_size
assert offset == 30 # (page - 1) * page_size = (3 - 1) * 15
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_schema_prefix():
"""Test unified search uses schema_prefix placeholder."""
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = []
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
await unified_hybrid_search(
query="test",
page=1,
page_size=20,
)
call_args = mock_query.call_args
sql_template = call_args[0][0]
# Verify schema_prefix placeholder is used for table references
assert "{schema_prefix}" in sql_template
assert '"UnifiedContentEmbedding"' in sql_template
if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])
View File
@@ -221,23 +221,3 @@ class ReviewSubmissionRequest(pydantic.BaseModel):
is_approved: bool
comments: str # External comments visible to creator
internal_comments: str | None = None # Private admin notes
class UnifiedSearchResult(pydantic.BaseModel):
"""A single result from unified hybrid search across all content types."""
content_type: str # STORE_AGENT, BLOCK, DOCUMENTATION
content_id: str
searchable_text: str
metadata: dict | None = None
updated_at: datetime.datetime | None = None
combined_score: float | None = None
semantic_score: float | None = None
lexical_score: float | None = None
class UnifiedSearchResponse(pydantic.BaseModel):
"""Response model for unified search across all content types."""
results: list[UnifiedSearchResult]
pagination: Pagination
View File
@@ -7,15 +7,12 @@ from typing import Literal
import autogpt_libs.auth
import fastapi
import fastapi.responses
import prisma.enums
import backend.data.graph
import backend.util.json
from backend.util.models import Pagination
from . import cache as store_cache
from . import db as store_db
from . import hybrid_search as store_hybrid_search
from . import image_gen as store_image_gen
from . import media as store_media
from . import model as store_model
@@ -149,102 +146,6 @@ async def get_agents(
return agents
##############################################
############### Search Endpoints #############
##############################################
@router.get(
"/search",
summary="Unified search across all content types",
tags=["store", "public"],
response_model=store_model.UnifiedSearchResponse,
)
async def unified_search(
query: str,
content_types: list[str] | None = fastapi.Query(
default=None,
description="Content types to search: STORE_AGENT, BLOCK, DOCUMENTATION. If not specified, searches all.",
),
page: int = 1,
page_size: int = 20,
user_id: str | None = fastapi.Security(
autogpt_libs.auth.get_optional_user_id, use_cache=False
),
):
"""
Search across all content types (store agents, blocks, documentation) using hybrid search.
Combines semantic (embedding-based) and lexical (text-based) search for best results.
Args:
query: The search query string
content_types: Optional list of content types to filter by (STORE_AGENT, BLOCK, DOCUMENTATION)
page: Page number for pagination (default 1)
page_size: Number of results per page (default 20)
user_id: Optional authenticated user ID (for user-scoped content in future)
Returns:
UnifiedSearchResponse: Paginated list of search results with relevance scores
"""
if page < 1:
raise fastapi.HTTPException(
status_code=422, detail="Page must be greater than 0"
)
if page_size < 1:
raise fastapi.HTTPException(
status_code=422, detail="Page size must be greater than 0"
)
# Convert string content types to enum
content_type_enums: list[prisma.enums.ContentType] | None = None
if content_types:
try:
content_type_enums = [prisma.enums.ContentType(ct) for ct in content_types]
except ValueError as e:
raise fastapi.HTTPException(
status_code=422,
detail=f"Invalid content type. Valid values: STORE_AGENT, BLOCK, DOCUMENTATION. Error: {e}",
)
# Perform unified hybrid search
results, total = await store_hybrid_search.unified_hybrid_search(
query=query,
content_types=content_type_enums,
user_id=user_id,
page=page,
page_size=page_size,
)
# Convert results to response model
search_results = [
store_model.UnifiedSearchResult(
content_type=r["content_type"],
content_id=r["content_id"],
searchable_text=r.get("searchable_text", ""),
metadata=r.get("metadata"),
updated_at=r.get("updated_at"),
combined_score=r.get("combined_score"),
semantic_score=r.get("semantic_score"),
lexical_score=r.get("lexical_score"),
)
for r in results
]
total_pages = (total + page_size - 1) // page_size if total > 0 else 0
return store_model.UnifiedSearchResponse(
results=search_results,
pagination=Pagination(
total_items=total,
total_pages=total_pages,
current_page=page,
page_size=page_size,
),
)
@router.get(
"/agents/{username}/{agent_name}",
summary="Get specific agent",
View File
@@ -1,272 +0,0 @@
"""Tests for the semantic_search function."""
import pytest
from prisma.enums import ContentType
from backend.api.features.store.embeddings import EMBEDDING_DIM, semantic_search
@pytest.mark.asyncio
async def test_search_blocks_only(mocker):
"""Test searching only BLOCK content type."""
# Mock embed_query to return a test embedding
mock_embedding = [0.1] * EMBEDDING_DIM
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=mock_embedding,
)
# Mock query_raw_with_schema to return test results
mock_results = [
{
"content_id": "block-123",
"content_type": "BLOCK",
"searchable_text": "Calculator Block - Performs arithmetic operations",
"metadata": {"name": "Calculator", "categories": ["Math"]},
"similarity": 0.85,
}
]
mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=mock_results,
)
results = await semantic_search(
query="calculate numbers",
content_types=[ContentType.BLOCK],
)
assert len(results) == 1
assert results[0]["content_type"] == "BLOCK"
assert results[0]["content_id"] == "block-123"
assert results[0]["similarity"] == 0.85
@pytest.mark.asyncio
async def test_search_multiple_content_types(mocker):
"""Test searching multiple content types simultaneously."""
mock_embedding = [0.1] * EMBEDDING_DIM
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=mock_embedding,
)
mock_results = [
{
"content_id": "block-123",
"content_type": "BLOCK",
"searchable_text": "Calculator Block",
"metadata": {},
"similarity": 0.85,
},
{
"content_id": "doc-456",
"content_type": "DOCUMENTATION",
"searchable_text": "How to use Calculator",
"metadata": {},
"similarity": 0.75,
},
]
mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=mock_results,
)
results = await semantic_search(
query="calculator",
content_types=[ContentType.BLOCK, ContentType.DOCUMENTATION],
)
assert len(results) == 2
assert results[0]["content_type"] == "BLOCK"
assert results[1]["content_type"] == "DOCUMENTATION"
@pytest.mark.asyncio
async def test_search_with_min_similarity_threshold(mocker):
"""Test that results below min_similarity are filtered out."""
mock_embedding = [0.1] * EMBEDDING_DIM
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=mock_embedding,
)
# Only return results above 0.7 similarity
mock_results = [
{
"content_id": "block-123",
"content_type": "BLOCK",
"searchable_text": "Calculator Block",
"metadata": {},
"similarity": 0.85,
}
]
mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=mock_results,
)
results = await semantic_search(
query="calculate",
content_types=[ContentType.BLOCK],
min_similarity=0.7,
)
assert len(results) == 1
assert results[0]["similarity"] >= 0.7
@pytest.mark.asyncio
async def test_search_fallback_to_lexical(mocker):
"""Test fallback to lexical search when embeddings fail."""
# Mock embed_query to return None (embeddings unavailable)
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=None,
)
mock_lexical_results = [
{
"content_id": "block-123",
"content_type": "BLOCK",
"searchable_text": "Calculator Block performs calculations",
"metadata": {},
"similarity": 0.0,
}
]
mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=mock_lexical_results,
)
results = await semantic_search(
query="calculator",
content_types=[ContentType.BLOCK],
)
assert len(results) == 1
assert results[0]["similarity"] == 0.0 # Lexical search returns 0 similarity
@pytest.mark.asyncio
async def test_search_empty_query():
"""Test that empty query returns no results."""
results = await semantic_search(query="")
assert results == []
results = await semantic_search(query=" ")
assert results == []
@pytest.mark.asyncio
async def test_search_with_user_id_filter(mocker):
"""Test searching with user_id filter for private content."""
mock_embedding = [0.1] * EMBEDDING_DIM
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=mock_embedding,
)
mock_results = [
{
"content_id": "agent-789",
"content_type": "LIBRARY_AGENT",
"searchable_text": "My Custom Agent",
"metadata": {},
"similarity": 0.9,
}
]
mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=mock_results,
)
results = await semantic_search(
query="custom agent",
content_types=[ContentType.LIBRARY_AGENT],
user_id="user-123",
)
assert len(results) == 1
assert results[0]["content_type"] == "LIBRARY_AGENT"
@pytest.mark.asyncio
async def test_search_limit_parameter(mocker):
"""Test that limit parameter correctly limits results."""
mock_embedding = [0.1] * EMBEDDING_DIM
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=mock_embedding,
)
# Return 5 results
mock_results = [
{
"content_id": f"block-{i}",
"content_type": "BLOCK",
"searchable_text": f"Block {i}",
"metadata": {},
"similarity": 0.8,
}
for i in range(5)
]
mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=mock_results,
)
results = await semantic_search(
query="block",
content_types=[ContentType.BLOCK],
limit=5,
)
assert len(results) == 5
@pytest.mark.asyncio
async def test_search_default_content_types(mocker):
"""Test that default content_types includes BLOCK, STORE_AGENT, and DOCUMENTATION."""
mock_embedding = [0.1] * EMBEDDING_DIM
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=mock_embedding,
)
mock_query_raw = mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
return_value=[],
)
await semantic_search(query="test")
# Check that the SQL query includes all three default content types
call_args = mock_query_raw.call_args
assert "BLOCK" in str(call_args)
assert "STORE_AGENT" in str(call_args)
assert "DOCUMENTATION" in str(call_args)
@pytest.mark.asyncio
async def test_search_handles_database_error(mocker):
"""Test that database errors are handled gracefully."""
mock_embedding = [0.1] * EMBEDDING_DIM
mocker.patch(
"backend.api.features.store.embeddings.embed_query",
return_value=mock_embedding,
)
# Simulate database error
mocker.patch(
"backend.api.features.store.embeddings.query_raw_with_schema",
side_effect=Exception("Database connection failed"),
)
results = await semantic_search(
query="test",
content_types=[ContentType.BLOCK],
)
# Should return empty list on error
assert results == []
View File
@@ -693,13 +693,13 @@ class DeleteGraphResponse(TypedDict):
async def list_graphs(
user_id: Annotated[str, Security(get_user_id)],
) -> Sequence[graph_db.GraphMeta]:
graphs, _ = await graph_db.list_graphs_paginated(
paginated_result = await graph_db.list_graphs_paginated(
user_id=user_id,
page=1,
page_size=250,
filter_by="active",
)
return graphs
return paginated_result.graphs
@v1_router.get(
View File
@@ -804,7 +804,9 @@ class GraphModel(Graph):
)
class GraphMeta(GraphModel):
class GraphMeta(Graph):
user_id: str
# Easy work-around to prevent exposing nodes and links in the API response
nodes: list[NodeModel] = Field(default=[], exclude=True) # type: ignore
links: list[Link] = Field(default=[], exclude=True)
@@ -814,6 +816,13 @@ class GraphMeta(GraphModel):
return GraphMeta(**graph.model_dump())
class GraphsPaginated(BaseModel):
"""Response schema for paginated graphs."""
graphs: list[GraphMeta]
pagination: Pagination
# --------------------- CRUD functions --------------------- #
@@ -847,7 +856,7 @@ async def list_graphs_paginated(
page: int = 1,
page_size: int = 25,
filter_by: Literal["active"] | None = "active",
) -> tuple[list[GraphMeta], Pagination]:
) -> GraphsPaginated:
"""
Retrieves paginated graph metadata objects.
@@ -858,8 +867,7 @@ async def list_graphs_paginated(
filter_by: Optional filter for which graphs to return, e.g. "active" to return only active graphs.
Returns:
list[GraphMeta]: List of graph info objects.
Pagination: Pagination information.
GraphsPaginated: Paginated list of graph metadata.
"""
where_clause: AgentGraphWhereInput = {"userId": user_id}
@@ -892,11 +900,14 @@ async def list_graphs_paginated(
logger.error(f"Error processing graph {graph.id}: {e}")
continue
return graph_models, Pagination(
total_items=total_count,
total_pages=total_pages,
current_page=page,
page_size=page_size,
return GraphsPaginated(
graphs=graph_models,
pagination=Pagination(
total_items=total_count,
total_pages=total_pages,
current_page=page,
page_size=page_size,
),
)
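A minimal sketch of how a caller consumes the paginated return type defined above, assuming an async context and an existing user_id:

paginated = await list_graphs_paginated(user_id=user_id, page=1, page_size=25)
graphs = paginated.graphs                       # list[GraphMeta]; nodes/links are excluded
total_pages = paginated.pagination.total_pages  # Pagination metadata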
View File
@@ -9,7 +9,6 @@ from backend.api.features.library.db import (
from backend.api.features.store.db import get_store_agent_details, get_store_agents
from backend.api.features.store.embeddings import (
backfill_missing_embeddings,
cleanup_orphaned_embeddings,
get_embedding_stats,
)
from backend.data import db
@@ -222,7 +221,6 @@ class DatabaseManager(AppService):
# Store Embeddings
get_embedding_stats = _(get_embedding_stats)
backfill_missing_embeddings = _(backfill_missing_embeddings)
cleanup_orphaned_embeddings = _(cleanup_orphaned_embeddings)
# Summary data - async
get_user_execution_summary_data = _(get_user_execution_summary_data)
@@ -278,7 +276,6 @@ class DatabaseManagerClient(AppServiceClient):
# Store Embeddings
get_embedding_stats = _(d.get_embedding_stats)
backfill_missing_embeddings = _(d.backfill_missing_embeddings)
cleanup_orphaned_embeddings = _(d.cleanup_orphaned_embeddings)
class DatabaseManagerAsyncClient(AppServiceClient):
View File
@@ -28,7 +28,6 @@ from backend.data.auth.oauth import cleanup_expired_oauth_tokens
from backend.data.block import BlockInput
from backend.data.execution import GraphExecutionWithNodes
from backend.data.model import CredentialsMetaInput
from backend.data.onboarding import increment_onboarding_runs
from backend.executor import utils as execution_utils
from backend.monitoring import (
NotificationJobArgs,
@@ -157,7 +156,6 @@ async def _execute_graph(**kwargs):
inputs=args.input_data,
graph_credentials_inputs=args.input_credentials,
)
await increment_onboarding_runs(args.user_id)
elapsed = asyncio.get_event_loop().time() - start_time
logger.info(
f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} "
@@ -257,14 +255,14 @@ def execution_accuracy_alerts():
def ensure_embeddings_coverage():
"""
Ensure all content types (store agents, blocks, docs) have embeddings for search.
Ensure approved store agents have embeddings for hybrid search.
Processes ALL missing embeddings in batches of 10 per content type until 100% coverage.
Missing embeddings = content invisible in hybrid search.
Processes ALL missing embeddings in batches of 10 until 100% coverage.
Missing embeddings = agents invisible in hybrid search.
Schedule: Runs every 6 hours (balanced between coverage and API costs).
- Catches new content added between scheduled runs
- Batch size 10 per content type: gradual processing to avoid rate limits
- Catches agents approved between scheduled runs
- Batch size 10: gradual processing to avoid rate limits
- Manual trigger available via execute_ensure_embeddings_coverage endpoint
"""
db_client = get_database_manager_client()
@@ -275,91 +273,51 @@ def ensure_embeddings_coverage():
logger.error(
f"Failed to get embedding stats: {stats['error']} - skipping backfill"
)
return {
"backfill": {"processed": 0, "success": 0, "failed": 0},
"cleanup": {"deleted": 0},
"error": stats["error"],
}
return {"processed": 0, "success": 0, "failed": 0, "error": stats["error"]}
# Extract totals from new stats structure
totals = stats.get("totals", {})
without_embeddings = totals.get("without_embeddings", 0)
coverage_percent = totals.get("coverage_percent", 0)
if stats["without_embeddings"] == 0:
logger.info("All approved agents have embeddings, skipping backfill")
return {"processed": 0, "success": 0, "failed": 0}
logger.info(
f"Found {stats['without_embeddings']} agents without embeddings "
f"({stats['coverage_percent']}% coverage) - processing all"
)
total_processed = 0
total_success = 0
total_failed = 0
if without_embeddings == 0:
logger.info("All content has embeddings, skipping backfill")
else:
# Log per-content-type stats for visibility
by_type = stats.get("by_type", {})
for content_type, type_stats in by_type.items():
if type_stats.get("without_embeddings", 0) > 0:
logger.info(
f"{content_type}: {type_stats['without_embeddings']} items without embeddings "
f"({type_stats['coverage_percent']}% coverage)"
)
# Process in batches until no more missing embeddings
while True:
result = db_client.backfill_missing_embeddings(batch_size=10)
logger.info(
f"Total: {without_embeddings} items without embeddings "
f"({coverage_percent}% coverage) - processing all"
)
total_processed += result["processed"]
total_success += result["success"]
total_failed += result["failed"]
# Process in batches until no more missing embeddings
while True:
result = db_client.backfill_missing_embeddings(batch_size=10)
if result["processed"] == 0:
# No more missing embeddings
break
total_processed += result["processed"]
total_success += result["success"]
total_failed += result["failed"]
if result["success"] == 0 and result["processed"] > 0:
# All attempts in this batch failed - stop to avoid infinite loop
logger.error(
f"All {result['processed']} embedding attempts failed - stopping backfill"
)
break
if result["processed"] == 0:
# No more missing embeddings
break
if result["success"] == 0 and result["processed"] > 0:
# All attempts in this batch failed - stop to avoid infinite loop
logger.error(
f"All {result['processed']} embedding attempts failed - stopping backfill"
)
break
# Small delay between batches to avoid rate limits
time.sleep(1)
logger.info(
f"Embedding backfill completed: {total_success}/{total_processed} succeeded, "
f"{total_failed} failed"
)
# Clean up orphaned embeddings for blocks and docs
logger.info("Running cleanup for orphaned embeddings (blocks/docs)...")
cleanup_result = db_client.cleanup_orphaned_embeddings()
cleanup_totals = cleanup_result.get("totals", {})
cleanup_deleted = cleanup_totals.get("deleted", 0)
if cleanup_deleted > 0:
logger.info(f"Cleanup completed: deleted {cleanup_deleted} orphaned embeddings")
by_type = cleanup_result.get("by_type", {})
for content_type, type_result in by_type.items():
if type_result.get("deleted", 0) > 0:
logger.info(
f"{content_type}: deleted {type_result['deleted']} orphaned embeddings"
)
else:
logger.info("Cleanup completed: no orphaned embeddings found")
# Small delay between batches to avoid rate limits
time.sleep(1)
logger.info(
f"Embedding backfill completed: {total_success}/{total_processed} succeeded, "
f"{total_failed} failed"
)
return {
"backfill": {
"processed": total_processed,
"success": total_success,
"failed": total_failed,
},
"cleanup": {
"deleted": cleanup_deleted,
},
"processed": total_processed,
"success": total_success,
"failed": total_failed,
}
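An illustrative shape for the stats dict this job reads; the field names come from the nested totals/by_type accesses above, while the numbers are placeholders:

stats = {
    "totals": {"without_embeddings": 12, "coverage_percent": 97.3},
    "by_type": {
        "STORE_AGENT": {"without_embeddings": 2, "coverage_percent": 99.2},
        "BLOCK": {"without_embeddings": 10, "coverage_percent": 94.8},
    },
}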
View File
@@ -43,6 +43,4 @@ CREATE UNIQUE INDEX "UnifiedContentEmbedding_contentType_contentId_userId_key" O
-- CreateIndex
-- HNSW index for fast vector similarity search on embeddings
-- Uses cosine distance operator (<=>), which matches the query in hybrid_search.py
-- Note: Drop first in case Prisma created a btree index (Prisma doesn't support HNSW)
DROP INDEX IF EXISTS "UnifiedContentEmbedding_embedding_idx";
CREATE INDEX "UnifiedContentEmbedding_embedding_idx" ON "UnifiedContentEmbedding" USING hnsw ("embedding" public.vector_cosine_ops);
View File
@@ -1,35 +0,0 @@
-- Add tsvector search column to UnifiedContentEmbedding for unified full-text search
-- This enables hybrid search (semantic + lexical) across all content types
-- Add search column (IF NOT EXISTS for idempotency)
ALTER TABLE "UnifiedContentEmbedding" ADD COLUMN IF NOT EXISTS "search" tsvector DEFAULT ''::tsvector;
-- Create GIN index for fast full-text search
-- No @@index in schema.prisma - Prisma may generate DROP INDEX on migrate dev
-- If that happens, just let it drop and this migration will recreate it, or manually re-run:
-- CREATE INDEX IF NOT EXISTS "UnifiedContentEmbedding_search_idx" ON "UnifiedContentEmbedding" USING GIN ("search");
DROP INDEX IF EXISTS "UnifiedContentEmbedding_search_idx";
CREATE INDEX "UnifiedContentEmbedding_search_idx" ON "UnifiedContentEmbedding" USING GIN ("search");
-- Drop existing trigger/function if exists
DROP TRIGGER IF EXISTS "update_unified_tsvector" ON "UnifiedContentEmbedding";
DROP FUNCTION IF EXISTS update_unified_tsvector_column();
-- Create function to auto-update tsvector from searchableText
CREATE OR REPLACE FUNCTION update_unified_tsvector_column() RETURNS TRIGGER AS $$
BEGIN
NEW.search := to_tsvector('english', COALESCE(NEW."searchableText", ''));
RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = platform, pg_temp;
-- Create trigger to auto-update search column on insert/update
CREATE TRIGGER "update_unified_tsvector"
BEFORE INSERT OR UPDATE ON "UnifiedContentEmbedding"
FOR EACH ROW
EXECUTE FUNCTION update_unified_tsvector_column();
-- Backfill existing rows
UPDATE "UnifiedContentEmbedding"
SET search = to_tsvector('english', COALESCE("searchableText", ''))
WHERE search IS NULL OR search = ''::tsvector;
View File
@@ -1,90 +0,0 @@
-- Remove the old search column from StoreListingVersion
-- This column has been replaced by UnifiedContentEmbedding.search
-- which provides unified hybrid search across all content types
-- First drop the dependent view
DROP VIEW IF EXISTS "StoreAgent";
-- Drop the trigger and function for old search column
-- The original trigger was created in 20251016093049_add_full_text_search
DROP TRIGGER IF EXISTS "update_tsvector" ON "StoreListingVersion";
DROP FUNCTION IF EXISTS update_tsvector_column();
-- Drop the index
DROP INDEX IF EXISTS "StoreListingVersion_search_idx";
-- NOTE: Keeping search column for now to allow easy revert if needed
-- Uncomment to fully remove once migration is verified in production:
-- ALTER TABLE "StoreListingVersion" DROP COLUMN IF EXISTS "search";
-- Recreate the StoreAgent view WITHOUT the search column
-- (Search now handled by UnifiedContentEmbedding)
CREATE OR REPLACE VIEW "StoreAgent" AS
WITH latest_versions AS (
SELECT
"storeListingId",
MAX(version) AS max_version
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
GROUP BY "storeListingId"
),
agent_versions AS (
SELECT
"storeListingId",
array_agg(DISTINCT version::text ORDER BY version::text) AS versions
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
GROUP BY "storeListingId"
),
agent_graph_versions AS (
SELECT
"storeListingId",
array_agg(DISTINCT "agentGraphVersion"::text ORDER BY "agentGraphVersion"::text) AS graph_versions
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
GROUP BY "storeListingId"
)
SELECT
sl.id AS listing_id,
slv.id AS "storeListingVersionId",
slv."createdAt" AS updated_at,
sl.slug,
COALESCE(slv.name, '') AS agent_name,
slv."videoUrl" AS agent_video,
slv."agentOutputDemoUrl" AS agent_output_demo,
COALESCE(slv."imageUrls", ARRAY[]::text[]) AS agent_image,
slv."isFeatured" AS featured,
p.username AS creator_username,
p."avatarUrl" AS creator_avatar,
slv."subHeading" AS sub_heading,
slv.description,
slv.categories,
COALESCE(ar.run_count, 0::bigint) AS runs,
COALESCE(rs.avg_rating, 0.0)::double precision AS rating,
COALESCE(av.versions, ARRAY[slv.version::text]) AS versions,
COALESCE(agv.graph_versions, ARRAY[slv."agentGraphVersion"::text]) AS "agentGraphVersions",
slv."agentGraphId",
slv."isAvailable" AS is_available,
COALESCE(sl."useForOnboarding", false) AS "useForOnboarding"
FROM "StoreListing" sl
JOIN latest_versions lv
ON sl.id = lv."storeListingId"
JOIN "StoreListingVersion" slv
ON slv."storeListingId" = lv."storeListingId"
AND slv.version = lv.max_version
AND slv."submissionStatus" = 'APPROVED'
JOIN "AgentGraph" a
ON slv."agentGraphId" = a.id
AND slv."agentGraphVersion" = a.version
LEFT JOIN "Profile" p
ON sl."owningUserId" = p."userId"
LEFT JOIN "mv_review_stats" rs
ON sl.id = rs."storeListingId"
LEFT JOIN "mv_agent_run_counts" ar
ON a.id = ar."agentGraphId"
LEFT JOIN agent_versions av
ON sl.id = av."storeListingId"
LEFT JOIN agent_graph_versions agv
ON sl.id = agv."storeListingId"
WHERE sl."isDeleted" = false
AND sl."hasApprovedVersion" = true;
View File
@@ -937,7 +937,7 @@ model StoreListingVersion {
// Old versions can be made unavailable by the author if desired
isAvailable Boolean @default(true)
// Note: search column removed - now using UnifiedContentEmbedding.search
search Unsupported("tsvector")? @default(dbgenerated("''::tsvector"))
// Version workflow state
submissionStatus SubmissionStatus @default(DRAFT)
@@ -1002,7 +1002,6 @@ model UnifiedContentEmbedding {
// Search data
embedding Unsupported("vector(1536)") // pgvector embedding (extension in platform schema)
searchableText String // Combined text for search and fallback
search Unsupported("tsvector")? @default(dbgenerated("''::tsvector")) // Full-text search (auto-populated by trigger)
metadata Json @default("{}") // Content-specific metadata
@@unique([contentType, contentId, userId], map: "UnifiedContentEmbedding_contentType_contentId_userId_key")
@@ -1010,8 +1009,6 @@ model UnifiedContentEmbedding {
@@index([userId])
@@index([contentType, userId])
@@index([embedding], map: "UnifiedContentEmbedding_embedding_idx")
// NO @@index for search - GIN index "UnifiedContentEmbedding_search_idx" created via SQL migration
// Prisma may generate DROP INDEX on migrate dev - that's okay, migration recreates it
}
model StoreListingReview {
@@ -1040,31 +1037,16 @@ enum SubmissionStatus {
}
enum APIKeyPermission {
// Legacy v1 permissions (kept for backward compatibility)
IDENTITY // Info about the authenticated user
EXECUTE_GRAPH // Can execute agent graphs (v1 only)
EXECUTE_GRAPH // Can execute agent graphs
READ_GRAPH // Can get graph versions and details
EXECUTE_BLOCK // Can execute individual blocks (v1 only)
EXECUTE_BLOCK // Can execute individual blocks
READ_BLOCK // Can get block information
READ_STORE // Can read store/marketplace agents and creators
USE_TOOLS // Can use chat tools via external API (v1 only)
MANAGE_INTEGRATIONS // Can initiate OAuth flows and complete them (v1 only)
READ_STORE // Can read store agents and creators
USE_TOOLS // Can use chat tools via external API
MANAGE_INTEGRATIONS // Can initiate OAuth flows and complete them
READ_INTEGRATIONS // Can list credentials and providers
DELETE_INTEGRATIONS // Can delete credentials (v1 only)
// V2 permissions
WRITE_GRAPH // Can create, update, delete graphs
READ_SCHEDULE // Can list schedules
WRITE_SCHEDULE // Can create and delete schedules
WRITE_STORE // Can create, update, delete marketplace submissions
READ_LIBRARY // Can list library agents and runs
RUN_AGENT // Can run agents from library
READ_RUN // Can list and get run details
WRITE_RUN // Can stop and delete runs
READ_RUN_REVIEW // Can list pending human-in-the-loop reviews
WRITE_RUN_REVIEW // Can submit human-in-the-loop review responses
READ_CREDITS // Can get credit balance and transactions
UPLOAD_FILES // Can upload files for agent input
DELETE_INTEGRATIONS // Can delete credentials
}
model APIKey {

View File

@@ -1,8 +1,7 @@
import { useGraphStore } from "@/app/(platform)/build/stores/graphStore";
import { usePostV1ExecuteGraphAgent } from "@/app/api/__generated__/endpoints/graphs/graphs";
import { useToast } from "@/components/molecules/Toast/use-toast";
import {
ApiError,
CredentialsMetaInput,
GraphExecutionMeta,
} from "@/lib/autogpt-server-api";
@@ -10,9 +9,6 @@ import { parseAsInteger, parseAsString, useQueryStates } from "nuqs";
import { useMemo, useState } from "react";
import { uiSchema } from "../../../FlowEditor/nodes/uiSchema";
import { isCredentialFieldSchema } from "@/components/renderers/InputRenderer/custom/CredentialField/helpers";
import { useNodeStore } from "@/app/(platform)/build/stores/nodeStore";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { useReactFlow } from "@xyflow/react";
export const useRunInputDialog = ({
setIsOpen,
@@ -35,7 +31,6 @@ export const useRunInputDialog = ({
flowVersion: parseAsInteger,
});
const { toast } = useToast();
const { setViewport } = useReactFlow();
const { mutateAsync: executeGraph, isPending: isExecutingGraph } =
usePostV1ExecuteGraphAgent({
@@ -47,63 +42,13 @@ export const useRunInputDialog = ({
});
},
onError: (error) => {
if (error instanceof ApiError && error.isGraphValidationError()) {
const errorData = error.response?.detail;
Object.entries(errorData.node_errors).forEach(
([nodeId, nodeErrors]) => {
useNodeStore
.getState()
.updateNodeErrors(
nodeId,
nodeErrors as { [key: string]: string },
);
},
);
toast({
title: errorData?.message || "Graph validation failed",
description:
"Please fix the validation errors on the highlighted nodes and try again.",
variant: "destructive",
});
setIsOpen(false);
const firstBackendId = Object.keys(errorData.node_errors)[0];
if (firstBackendId) {
const firstErrorNode = useNodeStore
.getState()
.nodes.find(
(n) =>
n.data.metadata?.backend_id === firstBackendId ||
n.id === firstBackendId,
);
if (firstErrorNode) {
setTimeout(() => {
setViewport(
{
x:
-firstErrorNode.position.x * 0.8 +
window.innerWidth / 2 -
150,
y: -firstErrorNode.position.y * 0.8 + 50,
zoom: 0.8,
},
{ duration: 500 },
);
}, 50);
}
}
} else {
toast({
title: "Error running graph",
description:
(error as Error).message || "An unexpected error occurred.",
variant: "destructive",
});
setIsOpen(false);
}
// Reset running state on error
setIsGraphRunning(false);
toast({
title: (error.detail as string) ?? "An unexpected error occurred.",
description: "An unexpected error occurred.",
variant: "destructive",
});
},
},
});

View File

@@ -20,13 +20,11 @@ type Props = {
export const NodeHeader = ({ data, nodeId }: Props) => {
const updateNodeData = useNodeStore((state) => state.updateNodeData);
const title =
(data.metadata?.customized_name as string) ||
data.hardcodedValues.agent_name ||
data.title;
const title = (data.metadata?.customized_name as string) || data.title;
const [isEditingTitle, setIsEditingTitle] = useState(false);
const [editedTitle, setEditedTitle] = useState(title);
const [editedTitle, setEditedTitle] = useState(
beautifyString(title).replace("Block", "").trim(),
);
const handleTitleEdit = () => {
updateNodeData(nodeId, {

View File

@@ -31,6 +31,8 @@ export const OutputHandler = ({
const [isOutputVisible, setIsOutputVisible] = useState(true);
const brokenOutputs = useBrokenOutputs(nodeId);
console.log("brokenOutputs", brokenOutputs);
const showHandles = uiType !== BlockUIType.OUTPUT;
const renderOutputHandles = (

View File

@@ -5621,69 +5621,6 @@
"security": [{ "HTTPBearerJWT": [] }]
}
},
"/api/store/search": {
"get": {
"tags": ["v2", "store", "public"],
"summary": "Unified search across all content types",
"description": "Search across all content types (store agents, blocks, documentation) using hybrid search.\n\nCombines semantic (embedding-based) and lexical (text-based) search for best results.\n\nArgs:\n query: The search query string\n content_types: Optional list of content types to filter by (STORE_AGENT, BLOCK, DOCUMENTATION)\n page: Page number for pagination (default 1)\n page_size: Number of results per page (default 20)\n user_id: Optional authenticated user ID (for user-scoped content in future)\n\nReturns:\n UnifiedSearchResponse: Paginated list of search results with relevance scores",
"operationId": "getV2Unified search across all content types",
"security": [{ "HTTPBearer": [] }],
"parameters": [
{
"name": "query",
"in": "query",
"required": true,
"schema": { "type": "string", "title": "Query" }
},
{
"name": "content_types",
"in": "query",
"required": false,
"schema": {
"anyOf": [
{ "type": "array", "items": { "type": "string" } },
{ "type": "null" }
],
"description": "Content types to search: STORE_AGENT, BLOCK, DOCUMENTATION. If not specified, searches all.",
"title": "Content Types"
},
"description": "Content types to search: STORE_AGENT, BLOCK, DOCUMENTATION. If not specified, searches all."
},
{
"name": "page",
"in": "query",
"required": false,
"schema": { "type": "integer", "default": 1, "title": "Page" }
},
{
"name": "page_size",
"in": "query",
"required": false,
"schema": { "type": "integer", "default": 20, "title": "Page Size" }
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UnifiedSearchResponse"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
}
}
},
"/api/store/submissions": {
"get": {
"tags": ["v2", "store", "private"],
@@ -10962,57 +10899,6 @@
"required": ["name", "graph_id", "graph_version", "trigger_config"],
"title": "TriggeredPresetSetupRequest"
},
"UnifiedSearchResponse": {
"properties": {
"results": {
"items": { "$ref": "#/components/schemas/UnifiedSearchResult" },
"type": "array",
"title": "Results"
},
"pagination": { "$ref": "#/components/schemas/Pagination" }
},
"type": "object",
"required": ["results", "pagination"],
"title": "UnifiedSearchResponse",
"description": "Response model for unified search across all content types."
},
"UnifiedSearchResult": {
"properties": {
"content_type": { "type": "string", "title": "Content Type" },
"content_id": { "type": "string", "title": "Content Id" },
"searchable_text": { "type": "string", "title": "Searchable Text" },
"metadata": {
"anyOf": [
{ "additionalProperties": true, "type": "object" },
{ "type": "null" }
],
"title": "Metadata"
},
"updated_at": {
"anyOf": [
{ "type": "string", "format": "date-time" },
{ "type": "null" }
],
"title": "Updated At"
},
"combined_score": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"title": "Combined Score"
},
"semantic_score": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"title": "Semantic Score"
},
"lexical_score": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"title": "Lexical Score"
}
},
"type": "object",
"required": ["content_type", "content_id", "searchable_text"],
"title": "UnifiedSearchResult",
"description": "A single result from unified hybrid search across all content types."
},
"UpdateAppLogoRequest": {
"properties": {
"logo_url": {
@@ -11977,7 +11863,6 @@
"in": "header",
"name": "X-Postmark-Webhook-Token"
},
"HTTPBearer": { "type": "http", "scheme": "bearer" },
"HTTPBearerJWT": {
"type": "http",
"scheme": "bearer",

View File

@@ -30,8 +30,6 @@ export const FormRenderer = ({
return generateUiSchemaForCustomFields(preprocessedSchema, uiSchema);
}, [preprocessedSchema, uiSchema]);
console.log("preprocessedSchema", preprocessedSchema);
return (
<div className={"mb-6 mt-4"} data-tutorial-id="input-handles">
<Form

View File

@@ -63,7 +63,7 @@ export default function ArrayFieldTemplate(props: ArrayFieldTemplateProps) {
<div className="m-0 flex p-0">
<div className="m-0 w-full space-y-4 p-0">
{!fromAnyOf && (
<div className="flex items-center gap-2">
<div className="flex items-center">
<ArrayFieldTitleTemplate
fieldPathId={fieldPathId}
title={uiOptions.title || title}

View File

@@ -17,7 +17,6 @@ import {
import { useNodeStore } from "@/app/(platform)/build/stores/nodeStore";
import { useEdgeStore } from "@/app/(platform)/build/stores/edgeStore";
import { FieldError } from "./FieldError";
import { BlockUIType } from "@/app/(platform)/build/components/types";
export default function FieldTemplate(props: FieldTemplateProps) {
const {
@@ -40,7 +39,7 @@ export default function FieldTemplate(props: FieldTemplateProps) {
onRemoveProperty,
readonly,
} = props;
const { nodeId, uiType } = registry.formContext;
const { nodeId } = registry.formContext;
const { isInputConnected } = useEdgeStore();
const showAdvanced = useNodeStore(
@@ -51,10 +50,6 @@ export default function FieldTemplate(props: FieldTemplateProps) {
return <div className="hidden">{children}</div>;
}
if (uiType === BlockUIType.NOTE) {
return children;
}
const uiOptions = getUiOptions(uiSchema);
const TitleFieldTemplate = getTemplate(
"TitleFieldTemplate",

View File

@@ -1,23 +1,12 @@
import { BlockUIType } from "@/app/(platform)/build/components/types";
import { GoogleDrivePickerInput } from "@/components/contextual/GoogleDrivePicker/GoogleDrivePickerInput";
import { GoogleDrivePickerConfig } from "@/lib/autogpt-server-api";
import { FieldProps, getUiOptions } from "@rjsf/utils";
export const GoogleDrivePickerField = (props: FieldProps) => {
const { schema, uiSchema, onChange, fieldPathId, formData, registry } = props;
const { schema, uiSchema, onChange, fieldPathId, formData } = props;
const uiOptions = getUiOptions(uiSchema);
const config: GoogleDrivePickerConfig = schema.google_drive_picker_config;
const uiType = registry.formContext?.uiType;
if (uiType === BlockUIType.INPUT) {
return (
<div className="rounded-3xl border border-gray-200 p-2 pl-4 text-xs text-gray-500 hover:cursor-not-allowed">
Select files when you run the graph
</div>
);
}
return (
<div>
<GoogleDrivePickerInput

View File

@@ -3,10 +3,7 @@ import { CredentialsField } from "./CredentialField/CredentialField";
import { GoogleDrivePickerField } from "./GoogleDrivePickerField/GoogleDrivePickerField";
import { JsonTextField } from "./JsonTextField/JsonTextField";
import { MultiSelectField } from "./MultiSelectField/MultiSelectField";
import {
isGoogleDrivePickerSchema,
isMultiSelectSchema,
} from "../utils/schema-utils";
import { isMultiSelectSchema } from "../utils/schema-utils";
import { TableField } from "./TableField/TableField";
export interface CustomFieldDefinition {
@@ -32,7 +29,12 @@ export const CUSTOM_FIELDS: CustomFieldDefinition[] = [
},
{
id: "custom/google_drive_picker_field",
matcher: isGoogleDrivePickerSchema,
matcher: (schema: any) => {
return (
"google_drive_picker_config" in schema ||
("format" in schema && schema.format === "google-drive-picker")
);
},
component: GoogleDrivePickerField,
},
{

View File

@@ -55,38 +55,3 @@ export function isMultiSelectSchema(schema: RJSFSchema | undefined): boolean {
)
);
}
const isGoogleDriveFileObject = (obj: RJSFSchema): boolean => {
if (obj.type !== "object" || !obj.properties) {
return false;
}
const props = obj.properties;
const hasId = "id" in props;
const hasMimeType = "mimeType" in props || "mime_type" in props;
const hasIconUrl = "iconUrl" in props || "icon_url" in props;
const hasIsFolder = "isFolder" in props || "is_folder" in props;
return hasId && hasMimeType && (hasIconUrl || hasIsFolder);
};
export const isGoogleDrivePickerSchema = (
schema: RJSFSchema | undefined,
): boolean => {
if (!schema) {
return false;
}
// highest priority
if (
"google_drive_picker_config" in schema ||
("format" in schema && schema.format === "google-drive-picker")
) {
return true;
}
// In the Input type block, we do not add the format for the GoogleFile field, so we need to include this extra check.
if (isGoogleDriveFileObject(schema)) {
return true;
}
return false;
};

View File

@@ -2,7 +2,7 @@
## Overview
The AutoGPT Platform implements OAuth 2.0 in two distinct contexts:
The AutoGPT platform implements OAuth 2.0 in two distinct contexts:
1. **User Authentication (SSO)**: Handled by Supabase for platform login
2. **API Integration Credentials**: Custom OAuth implementation for third-party service access
@@ -324,7 +324,7 @@ stateDiagram-v2
### User Authentication (SSO) via Supabase
- **Purpose**: Authenticate users to access the AutoGPT Platform
- **Purpose**: Authenticate users to access the AutoGPT platform
- **Provider**: Supabase Auth (currently supports Google SSO)
- **Flow Path**: `/login` → Supabase OAuth → `/auth/callback`
- **Session Storage**: Supabase-managed cookies
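A minimal sketch of the SSO flow described above, using supabase-js. The project URL, anon key, and redirect origin are placeholders rather than values from this repository; only the provider and callback path come from the flow description.

```typescript
// Illustrative sketch of the /login → Supabase OAuth → /auth/callback flow.
// Project URL and anon key are placeholders, not real configuration.
import { createClient } from "@supabase/supabase-js";

const supabase = createClient("https://YOUR_PROJECT.supabase.co", "YOUR_ANON_KEY");

async function signInWithGoogle() {
  const { data, error } = await supabase.auth.signInWithOAuth({
    provider: "google",
    options: { redirectTo: `${window.location.origin}/auth/callback` },
  });
  if (error) throw error;
  return data; // contains the URL the browser is redirected to
}
```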

View File

@@ -1,18 +1,14 @@
# AutoGPT Platform External API Guide
The AutoGPT Platform provides an External API that allows you to programmatically interact with agents, blocks, the marketplace, and more.
The AutoGPT Platform provides an External API that allows you to programmatically interact with agents, blocks, the store, and more.
## API Documentation
Full API documentation with interactive examples is available at:
- **Main**: [https://backend.agpt.co/external-api/docs](https://backend.agpt.co/external-api/docs)
- **v2 API**: [https://backend.agpt.co/external-api/v2/docs](https://backend.agpt.co/external-api/v2/docs)
- **v1 API**: [https://backend.agpt.co/external-api/v1/docs](https://backend.agpt.co/external-api/v1/docs)
**[https://backend.agpt.co/external-api/docs](https://backend.agpt.co/external-api/docs)**
The Swagger UI documentation includes all available endpoints, request/response schemas, and allows you to try out API calls directly.
**Recommendation**: New integrations should use the v2 API.
This Swagger UI documentation includes all available endpoints, request/response schemas, and allows you to try out API calls directly.
## Authentication Methods
@@ -20,12 +16,11 @@ The External API supports two authentication methods:
### 1. API Keys
API keys are the simplest way to authenticate. Generate an API key from your AutoGPT Platform account settings and include it in your requests using the `X-API-Key` header:
API keys are the simplest way to authenticate. Generate an API key from your AutoGPT Platform account settings and include it in your requests:
```bash
# List available blocks
curl -H "X-API-Key: YOUR_API_KEY" \
https://backend.agpt.co/external-api/v1/blocks
```http
GET /external-api/v1/blocks
X-API-Key: your_api_key_here
```
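The same request as a rough TypeScript sketch; the environment variable name is an assumption, not part of the platform.

```typescript
// Illustrative TypeScript equivalent of the curl example above.
async function listBlocks(): Promise<unknown> {
  const res = await fetch("https://backend.agpt.co/external-api/v1/blocks", {
    headers: { "X-API-Key": process.env.AUTOGPT_API_KEY ?? "" },
  });
  if (!res.ok) throw new Error(`Request failed: ${res.status}`);
  return res.json();
}
```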
API keys are ideal for:
@@ -37,62 +32,51 @@ API keys are ideal for:
For applications that need to act on behalf of users, use OAuth 2.0. This allows users to authorize your application to access their AutoGPT resources.
To get started:
OAuth is ideal for:
- Third-party applications
- "Sign in with AutoGPT" (SSO, Single Sign-On) functionality
- Applications that need user-specific permissions
See the [SSO Integration Guide](sso-guide.md) for complete OAuth implementation details.
## Available Scopes
When using OAuth, request only the scopes your application needs:
| Scope | Description |
|-------|-------------|
| `IDENTITY` | Read user ID, e-mail, and timezone |
| `EXECUTE_GRAPH` | Run agents |
| `READ_GRAPH` | Read agent run results |
| `EXECUTE_BLOCK` | Run individual blocks |
| `READ_BLOCK` | Read block definitions |
| `READ_STORE` | Access the agent store |
| `USE_TOOLS` | Use platform tools |
| `MANAGE_INTEGRATIONS` | Create and update user integrations |
| `READ_INTEGRATIONS` | Read user integration status |
| `DELETE_INTEGRATIONS` | Remove user integrations |
## Quick Start
### Using an API Key
```bash
# List available blocks
curl -H "X-API-Key: YOUR_API_KEY" \
https://backend.agpt.co/external-api/v1/blocks
```
### Using OAuth
1. Register an OAuth application (contact platform administrator)
2. Implement the OAuth flow as described in the [OAuth Guide](oauth-guide.md)
3. Go through the OAuth flow to authorize your app and obtain an access token
4. Make API requests with the access token in the `Authorization: Bearer` header:
2. Implement the OAuth flow as described in the [SSO Guide](sso-guide.md)
3. Use the obtained access token:
```bash
curl -H "Authorization: Bearer agpt_xt_..." \
https://backend.agpt.co/external-api/v1/blocks
```
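As a rough sketch, the same call in TypeScript with an OAuth access token instead of an API key:

```typescript
// Illustrative sketch: bearer-token variant of the request above.
async function listBlocksWithToken(accessToken: string): Promise<unknown> {
  const res = await fetch("https://backend.agpt.co/external-api/v1/blocks", {
    headers: { Authorization: `Bearer ${accessToken}` },
  });
  if (!res.ok) throw new Error(`Request failed: ${res.status}`);
  return res.json();
}
```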
OAuth is ideal for:
- Third-party applications
- "Sign in with AutoGPT" (SSO, Single Sign-On) functionality
- Applications that need user-specific permissions
See the [OAuth Integration Guide](oauth-guide.md) for complete OAuth implementation details.
## Available Scopes
When creating API keys or using OAuth, request only the scopes your application needs.
### Core Scopes
| Scope | Description |
|-------|-------------|
| `IDENTITY` | Read user ID, e-mail, and timezone |
| `READ_GRAPH` | Read graph/agent definitions and versions |
| `WRITE_GRAPH` | Create, update, and delete graphs |
| `READ_BLOCK` | Read block definitions |
| `READ_STORE` | Access the agent marketplace |
| `WRITE_STORE` | Create, update, and delete marketplace submissions |
| `READ_LIBRARY` | List library agents and their runs |
| `RUN_AGENT` | Execute agents from your library |
| `READ_RUN` | List and get execution run details |
| `WRITE_RUN` | Stop and delete runs |
| `READ_RUN_REVIEW` | List pending human-in-the-loop reviews |
| `WRITE_RUN_REVIEW` | Submit human-in-the-loop review responses |
| `READ_SCHEDULE` | List execution schedules |
| `WRITE_SCHEDULE` | Create and delete schedules |
| `READ_CREDITS` | Get credit balance and transaction history |
| `READ_INTEGRATIONS` | List OAuth credentials |
| `UPLOAD_FILES` | Upload files for agent input |
### Legacy Scopes (v1 only)
| Scope | Description |
|-------|-------------|
| `EXECUTE_GRAPH` | Execute graphs directly (use `RUN_AGENT` in v2) |
| `EXECUTE_BLOCK` | Execute individual blocks |
| `USE_TOOLS` | Use chat tools via external API |
| `MANAGE_INTEGRATIONS` | Initiate and complete OAuth flows |
| `DELETE_INTEGRATIONS` | Delete OAuth credentials |
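As an illustrative sketch of the "request only the scopes you need" guidance above, an integration that runs library agents and reads run results might declare its scopes like this. The type and constant names are assumptions for illustration, not part of the API; the scope names themselves come from the tables above.

```typescript
// Scope names come from the tables above; the type and constant are illustrative only.
type Scope =
  | "IDENTITY" | "READ_GRAPH" | "WRITE_GRAPH" | "READ_BLOCK"
  | "READ_STORE" | "WRITE_STORE" | "READ_LIBRARY" | "RUN_AGENT"
  | "READ_RUN" | "WRITE_RUN" | "READ_RUN_REVIEW" | "WRITE_RUN_REVIEW"
  | "READ_SCHEDULE" | "WRITE_SCHEDULE" | "READ_CREDITS"
  | "READ_INTEGRATIONS" | "UPLOAD_FILES";

// An integration that only runs library agents and reads run results:
const requiredScopes: Scope[] = ["RUN_AGENT", "READ_RUN"];
```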
## Support
For issues or questions about API integration: