AutoGPT/autogpt_platform/backend/backend/api/rest_api.py
Zamil Majdy f9f358c526 feat(mcp): Add MCP tool block with OAuth, tool discovery, and standard credential integration (#12011)
## Summary

Full-stack MCP (Model Context Protocol) tool block integration that
allows users to connect to any MCP server, discover available tools,
authenticate via OAuth, and execute tools — all through the standard
AutoGPT credential system.
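
The discovery and execution steps described above travel as JSON-RPC 2.0 messages. The following is a minimal sketch, not the code in `blocks/mcp/client.py`: it assumes the standard MCP method names `tools/list` and `tools/call`, and the helper names `build_request` and `parse_response`, as well as the `search` tool, are hypothetical.

```python
import itertools
import json
from typing import Any, Optional

# Monotonic request IDs so responses can be matched to requests.
_request_ids = itertools.count(1)


def build_request(method: str, params: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 request envelope (hypothetical helper)."""
    request: dict[str, Any] = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
    }
    if params is not None:
        request["params"] = params
    return request


def parse_response(raw: str) -> Any:
    """Return the `result` member, raising on non-JSON bodies or RPC errors."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Server returned a non-JSON response: {raw[:80]!r}") from exc
    if not isinstance(payload, dict):
        raise ValueError("A JSON-RPC response must be a JSON object")
    error = payload.get("error")
    if error is not None:
        # Type-check the error field: servers do not always send an object here.
        message = error.get("message") if isinstance(error, dict) else error
        raise RuntimeError(f"MCP error: {message}")
    return payload.get("result")


# Tool discovery, then execution of a (hypothetical) tool named "search".
discover = build_request("tools/list")
call = build_request("tools/call", {"name": "search", "arguments": {"query": "autogpt"}})
```

Keeping envelope construction and response parsing as pure functions makes both testable without a live MCP server.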

### Backend

- **MCPToolBlock** (`blocks/mcp/block.py`): New block using
`CredentialsMetaInput` pattern with optional credentials (`default={}`),
supporting both authenticated (OAuth) and public MCP servers. Includes
auto-lookup fallback for backward compatibility.
- **MCP Client** (`blocks/mcp/client.py`): HTTP transport with JSON-RPC
2.0, tool discovery, and tool execution with robust error handling
(type-checked error fields, non-JSON response handling).
- **MCP OAuth Handler** (`blocks/mcp/oauth.py`): RFC 8414 discovery,
dynamic per-server OAuth with PKCE, token storage and refresh via
`raise_for_status=True`
- **MCP API Routes** (`api/features/mcp/routes.py`): `discover-tools`,
`oauth/login`, `oauth/callback` endpoints with credential cleanup,
defensive OAuth metadata validation
- **Credential system integration**:
  - `CredentialsMetaInput` model_validator normalizes legacy
    `"ProviderName.MCP"` format from Python 3.13's `str(StrEnum)` change
  - `CredentialsFieldInfo.combine()` supports URL-based credential
    discrimination (each MCP server gets its own credential entry)
  - `aggregate_credentials_inputs` checks block schema defaults for
    credential optionality
  - Executor normalizes credential data for both Pydantic and JSON schema
    validation paths
  - Chat credential matching handles MCP server URL filtering
  - `provider_matches()` helper used consistently for Python 3.13 StrEnum
    compatibility
- **Pre-run validation**: `_validate_graph_get_errors` now calls
`get_missing_input()` for custom block-level validation (MCP tool
arguments)
- **Security**: HTML tag stripping loop to prevent XSS bypass, SSRF
protection (removed trusted_origins)
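
To illustrate why the tag stripping mentioned in the Security item needs a loop: a single pass can leave a new tag behind when fragments are nested inside each other. This is a minimal sketch under assumptions, not the code from this commit; the regex and the function names `strip_tags_once` and `strip_tags` are hypothetical.

```python
import re

# Matches the innermost tags only: "<", any run without "<" or ">", then ">".
_TAG_RE = re.compile(r"<[^<>]*>")


def strip_tags_once(text: str) -> str:
    """Remove every tag matched in a single left-to-right pass."""
    return _TAG_RE.sub("", text)


def strip_tags(text: str) -> str:
    """Strip repeatedly until a pass changes nothing.

    Each effective pass shortens the string, so the loop always terminates.
    """
    while True:
        stripped = strip_tags_once(text)
        if stripped == text:
            return stripped
        text = stripped


payload = "<scr<script>ipt>alert(1)</scr</script>ipt>"
print(strip_tags_once(payload))  # <script>alert(1)</script>
print(strip_tags(payload))       # alert(1)
```

Tag stripping on its own is not a complete XSS defense; output encoding at render time is still needed.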

### Frontend

- **MCPToolDialog** (`MCPToolDialog.tsx`): Full tool discovery UI —
enter server URL, authenticate if needed, browse tools, select tool and
configure
- **OAuth popup** (`oauth-popup.ts`): Shared utility supporting
cross-origin MCP OAuth flows with BroadcastChannel + localStorage
fallback
- **Credential integration**: MCP-specific OAuth flow in
`useCredentialsInput`, server URL filtering in `useCredentials`, MCP
callback page
- **CredentialsSelect**: Auto-selects first available credential instead
of defaulting to "None", credentials listed before "None" in dropdown
- **Node rendering**: Dynamic tool input schema rendering on MCP nodes,
proper handling in both legacy and new flow editors
- **Block title persistence**: `customized_name` set at block creation
for both MCP and Agent blocks — no fallback logic needed, titles survive
save/load reliably
- **Stable credential ordering**: Removed `sortByUnsetFirst` that caused
credential inputs to jump when selected

### Tests (~2060 lines)

- Unit tests: block, client, tool execution
- Integration tests: mock MCP server with auth
- OAuth flow tests
- API endpoint tests
- Credential combining/optionality tests
- E2E tests (skipped in CI, run manually)

## Key Design Decisions

1. **Optional credentials via `default={}`**: MCP servers can be public
(no auth) or private (OAuth). The `credentials` field has `default={}`
making it optional at the schema level, so public servers work without
prompting for credentials.

2. **URL-based credential discrimination**: Each MCP server URL gets its
own credential entry in the "Run agent" form (via
`discriminator="server_url"`), so agents using multiple MCP servers
prompt for each independently.

3. **Model-level normalization**: Python 3.13 changed `str(StrEnum)` to
return `"ClassName.MEMBER"`. Rather than scattering fixes across the
codebase, a Pydantic `model_validator(mode="before")` on
`CredentialsMetaInput` handles normalization centrally, and
`provider_matches()` handles lookups.

4. **Credential auto-select**: `CredentialsSelect` component defaults to
the first available credential and notifies the parent state, ensuring
credentials are pre-filled in the "Run agent" dialog without requiring
manual selection.

5. **`customized_name` for block titles**: Both MCP and Agent blocks set
`customized_name` in metadata at creation time. This eliminates
convoluted runtime fallback logic (`agent_name`, hostname extraction) —
the title is persisted once and read directly.
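
The central normalization in decision 3 can be sketched as follows. This is an illustration under assumptions rather than the commit's implementation: `ProviderName` here is a two-member stand-in for the real enum, `normalize_provider` is a hypothetical helper, and the signature of `provider_matches` is assumed. In the real code the normalization runs inside a Pydantic `model_validator(mode="before")`, which is omitted here to keep the example dependency-free.

```python
from enum import Enum


class ProviderName(str, Enum):
    """Illustrative stand-in for the real provider enum."""

    GITHUB = "github"
    MCP = "mcp"


def normalize_provider(raw: str) -> str:
    """Map the legacy "ProviderName.MEMBER" form to the member's value.

    Strings that are not in the legacy form, or that name an unknown
    member, are returned unchanged.
    """
    prefix = f"{ProviderName.__name__}."
    if raw.startswith(prefix):
        member = raw[len(prefix):]
        if member in ProviderName.__members__:
            return ProviderName[member].value
    return raw


def provider_matches(stored: str, expected: str) -> bool:
    """Compare two provider strings after normalizing both (assumed signature)."""
    return normalize_provider(stored) == normalize_provider(expected)


print(normalize_provider("ProviderName.MCP"))       # mcp
print(provider_matches("ProviderName.MCP", "mcp"))  # True
```

Normalizing at the model boundary means stored graphs with the legacy string keep working without a data migration.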

## Test plan

- [x] Unit/integration tests pass (68 MCP + 11 graph = 79 tests)
- [x] Manual: MCP block with public server (DeepWiki) — no credentials
needed, tools discovered and executable
- [x] Manual: MCP block with OAuth server (Linear, Sentry) — OAuth flow
prompts correctly
- [x] Manual: "Run agent" form shows correct credential requirements per
MCP server
- [x] Manual: Credential auto-selects when exactly one matches,
pre-selects first when multiple exist
- [x] Manual: Credential ordering stays stable when
selecting/deselecting
- [x] Manual: MCP block title persists after save and refresh
- [x] Manual: Agent block title persists after save and refresh (via
customized_name)
- [ ] Manual: Shared agent with MCP block prompts new user for
credentials

---------

Co-authored-by: Otto <otto@agpt.co>
Co-authored-by: Ubbe <hi@ubbe.dev>
2026-02-13 16:17:03 +00:00

import contextlib
import logging
import platform
from enum import Enum
from typing import Any, Optional
import fastapi
import fastapi.responses
import pydantic
import starlette.middleware.cors
import uvicorn
from autogpt_libs.auth import add_auth_responses_to_openapi
from autogpt_libs.auth import verify_settings as verify_auth_settings
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.routing import APIRoute
from prisma.errors import PrismaError
import backend.api.features.admin.credit_admin_routes
import backend.api.features.admin.execution_analytics_routes
import backend.api.features.admin.store_admin_routes
import backend.api.features.builder
import backend.api.features.builder.routes
import backend.api.features.chat.routes as chat_routes
import backend.api.features.executions.review.routes
import backend.api.features.library.db
import backend.api.features.library.model
import backend.api.features.library.routes
import backend.api.features.mcp.routes as mcp_routes
import backend.api.features.oauth
import backend.api.features.otto.routes
import backend.api.features.postmark.postmark
import backend.api.features.store.model
import backend.api.features.store.routes
import backend.api.features.v1
import backend.api.features.workspace.routes as workspace_routes
import backend.data.block
import backend.data.db
import backend.data.graph
import backend.data.user
import backend.integrations.webhooks.utils
import backend.util.service
import backend.util.settings
from backend.api.features.chat.completion_consumer import (
    start_completion_consumer,
    stop_completion_consumer,
)
from backend.blocks.llm import DEFAULT_LLM_MODEL
from backend.data.model import Credentials
from backend.integrations.providers import ProviderName
from backend.monitoring.instrumentation import instrument_fastapi
from backend.util import json
from backend.util.cloud_storage import shutdown_cloud_storage_handler
from backend.util.exceptions import (
    MissingConfigError,
    NotAuthorizedError,
    NotFoundError,
)
from backend.util.feature_flag import initialize_launchdarkly, shutdown_launchdarkly
from backend.util.service import UnhealthyServiceError
from backend.util.workspace_storage import shutdown_workspace_storage
from .external.fastapi_app import external_api
from .features.analytics import router as analytics_router
from .features.integrations.router import router as integrations_router
from .middleware.security import SecurityHeadersMiddleware
from .utils.cors import build_cors_params
from .utils.openapi import sort_openapi
settings = backend.util.settings.Settings()
logger = logging.getLogger(__name__)
logging.getLogger("autogpt_libs").setLevel(logging.INFO)


@contextlib.contextmanager
def launch_darkly_context():
    if settings.config.app_env != backend.util.settings.AppEnvironment.LOCAL:
        initialize_launchdarkly()
        try:
            yield
        finally:
            shutdown_launchdarkly()
    else:
        yield


@contextlib.asynccontextmanager
async def lifespan_context(app: fastapi.FastAPI):
    verify_auth_settings()

    await backend.data.db.connect()

    # Configure thread pool for FastAPI sync operation performance
    # CRITICAL: FastAPI automatically runs ALL sync functions in this thread pool:
    # - Any endpoint defined with 'def' (not async def)
    # - Any dependency function defined with 'def' (not async def)
    # - Manual run_in_threadpool() calls (like JWT decoding)
    # Default pool size is only 40 threads, causing bottlenecks under high concurrency
    config = backend.util.settings.Config()
    try:
        import anyio.to_thread

        anyio.to_thread.current_default_thread_limiter().total_tokens = (
            config.fastapi_thread_pool_size
        )
        logger.info(
            f"Thread pool size set to {config.fastapi_thread_pool_size} for sync endpoint/dependency performance"
        )
    except (ImportError, AttributeError) as e:
        logger.warning(f"Could not configure thread pool size: {e}")
        # Continue without thread pool configuration

    # Ensure SDK auto-registration is patched before initializing blocks
    from backend.sdk.registry import AutoRegistry

    AutoRegistry.patch_integrations()

    await backend.data.block.initialize_blocks()

    await backend.data.user.migrate_and_encrypt_user_integrations()
    await backend.data.graph.fix_llm_provider_credentials()
    await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
    await backend.integrations.webhooks.utils.migrate_legacy_triggered_graphs()

    # Start chat completion consumer for Redis Streams notifications
    try:
        await start_completion_consumer()
    except Exception as e:
        logger.warning(f"Could not start chat completion consumer: {e}")

    with launch_darkly_context():
        yield

    # Stop chat completion consumer
    try:
        await stop_completion_consumer()
    except Exception as e:
        logger.warning(f"Error stopping chat completion consumer: {e}")

    try:
        await shutdown_cloud_storage_handler()
    except Exception as e:
        logger.warning(f"Error shutting down cloud storage handler: {e}")

    try:
        await shutdown_workspace_storage()
    except Exception as e:
        logger.warning(f"Error shutting down workspace storage: {e}")

    await backend.data.db.disconnect()


def custom_generate_unique_id(route: APIRoute):
    """Generate clean operation IDs for OpenAPI spec following the format:
    {method}{tag}{summary}
    """
    if not route.tags or not route.methods:
        return f"{route.name}"

    method = list(route.methods)[0].lower()
    first_tag = route.tags[0]
    if isinstance(first_tag, Enum):
        tag_str = first_tag.name
    else:
        tag_str = str(first_tag)

    tag = "".join(word.capitalize() for word in tag_str.split("_"))  # v1/v2

    summary = (
        route.summary if route.summary else route.name
    )  # need to be unique, a different version could have the same summary
    summary = "".join(word.capitalize() for word in str(summary).split("_"))

    if tag:
        return f"{method}{tag}{summary}"
    else:
        return f"{method}{summary}"


docs_url = (
    "/docs"
    if settings.config.app_env == backend.util.settings.AppEnvironment.LOCAL
    else None
)

app = fastapi.FastAPI(
    title="AutoGPT Agent Server",
    description=(
        "This server is used to execute agents that are created by the AutoGPT system."
    ),
    summary="AutoGPT Agent Server",
    version="0.1",
    lifespan=lifespan_context,
    docs_url=docs_url,
    generate_unique_id_function=custom_generate_unique_id,
)

app.add_middleware(SecurityHeadersMiddleware)

# Add GZip compression middleware for large responses (like /api/blocks)
app.add_middleware(GZipMiddleware, minimum_size=50_000)  # 50KB threshold

# Add 401 responses to authenticated endpoints in OpenAPI spec
add_auth_responses_to_openapi(app)

# Sort OpenAPI schema to eliminate diff on refactors
sort_openapi(app)

# Add Prometheus instrumentation
instrument_fastapi(
    app,
    service_name="rest-api",
    expose_endpoint=True,
    endpoint="/metrics",
    include_in_schema=settings.config.app_env
    == backend.util.settings.AppEnvironment.LOCAL,
)


def handle_internal_http_error(status_code: int = 500, log_error: bool = True):
    def handler(request: fastapi.Request, exc: Exception):
        if log_error:
            logger.exception(
                "%s %s failed. Investigate and resolve the underlying issue: %s",
                request.method,
                request.url.path,
                exc,
                exc_info=exc,
            )

        hint = (
            "Adjust the request and retry."
            if status_code < 500
            else "Check server logs and dependent services."
        )
        return fastapi.responses.JSONResponse(
            content={
                "message": f"Failed to process {request.method} {request.url.path}",
                "detail": str(exc),
                "hint": hint,
            },
            status_code=status_code,
        )

    return handler


async def validation_error_handler(
    request: fastapi.Request, exc: Exception
) -> fastapi.responses.Response:
    logger.error(
        "Validation failed for %s %s: %s. Fix the request payload and try again.",
        request.method,
        request.url.path,
        exc,
    )
    errors: list | str
    if hasattr(exc, "errors"):
        errors = exc.errors()  # type: ignore[call-arg]
    else:
        errors = str(exc)

    response_content = {
        "message": f"Invalid data for {request.method} {request.url.path}",
        "detail": errors,
        "hint": "Ensure the request matches the API schema.",
    }

    content_json = json.dumps(response_content)

    return fastapi.responses.Response(
        content=content_json,
        status_code=422,
        media_type="application/json",
    )


app.add_exception_handler(PrismaError, handle_internal_http_error(500))
app.add_exception_handler(NotFoundError, handle_internal_http_error(404, False))
app.add_exception_handler(NotAuthorizedError, handle_internal_http_error(403, False))
app.add_exception_handler(RequestValidationError, validation_error_handler)
app.add_exception_handler(pydantic.ValidationError, validation_error_handler)
app.add_exception_handler(MissingConfigError, handle_internal_http_error(503))
app.add_exception_handler(ValueError, handle_internal_http_error(400))
app.add_exception_handler(Exception, handle_internal_http_error(500))
app.include_router(backend.api.features.v1.v1_router, tags=["v1"], prefix="/api")
app.include_router(
    integrations_router,
    prefix="/api/integrations",
    tags=["v1", "integrations"],
)
app.include_router(
    analytics_router,
    prefix="/api/analytics",
    tags=["analytics"],
)
app.include_router(
    backend.api.features.store.routes.router, tags=["v2"], prefix="/api/store"
)
app.include_router(
    backend.api.features.builder.routes.router, tags=["v2"], prefix="/api/builder"
)
app.include_router(
    backend.api.features.admin.store_admin_routes.router,
    tags=["v2", "admin"],
    prefix="/api/store",
)
app.include_router(
    backend.api.features.admin.credit_admin_routes.router,
    tags=["v2", "admin"],
    prefix="/api/credits",
)
app.include_router(
    backend.api.features.admin.execution_analytics_routes.router,
    tags=["v2", "admin"],
    prefix="/api/executions",
)
app.include_router(
    backend.api.features.executions.review.routes.router,
    tags=["v2", "executions", "review"],
    prefix="/api/review",
)
app.include_router(
    backend.api.features.library.routes.router, tags=["v2"], prefix="/api/library"
)
app.include_router(
    backend.api.features.otto.routes.router, tags=["v2", "otto"], prefix="/api/otto"
)
app.include_router(
    backend.api.features.postmark.postmark.router,
    tags=["v1", "email"],
    prefix="/api/email",
)
app.include_router(
    chat_routes.router,
    tags=["v2", "chat"],
    prefix="/api/chat",
)
app.include_router(
    workspace_routes.router,
    tags=["workspace"],
    prefix="/api/workspace",
)
app.include_router(
    mcp_routes.router,
    tags=["v2", "mcp"],
    prefix="/api/mcp",
)
app.include_router(
    backend.api.features.oauth.router,
    tags=["oauth"],
    prefix="/api/oauth",
)

app.mount("/external-api", external_api)


@app.get(path="/health", tags=["health"], dependencies=[])
async def health():
    if not backend.data.db.is_connected():
        raise UnhealthyServiceError("Database is not connected")
    return {"status": "healthy"}


class AgentServer(backend.util.service.AppProcess):
    def run(self):
        cors_params = build_cors_params(
            settings.config.backend_cors_allow_origins,
            settings.config.app_env,
        )

        server_app = starlette.middleware.cors.CORSMiddleware(
            app=app,
            **cors_params,
            allow_credentials=True,
            allow_methods=["*"],  # Allows all methods
            allow_headers=["*"],  # Allows all headers
        )

        # Only add debug in local environment (not supported in all uvicorn versions)
        if settings.config.app_env == backend.util.settings.AppEnvironment.LOCAL:
            import os

            # Enable asyncio debug mode via environment variable
            os.environ["PYTHONASYNCIODEBUG"] = "1"

        # Configure uvicorn with performance optimizations from Kludex FastAPI tips
        uvicorn.run(
            app=server_app,
            host=settings.config.agent_api_host,
            port=settings.config.agent_api_port,
            log_config=None,
            # Use httptools for HTTP parsing (if available)
            http="httptools",
            # Only use uvloop on Unix-like systems (not supported on Windows)
            loop="uvloop" if platform.system() != "Windows" else "auto",
            # Disable WebSockets since this service doesn't have any WebSocket endpoints
            ws="none",
        )

    @staticmethod
    async def test_execute_graph(
        graph_id: str,
        user_id: str,
        graph_version: Optional[int] = None,
        node_input: Optional[dict[str, Any]] = None,
    ):
        return await backend.api.features.v1.execute_graph(
            user_id=user_id,
            graph_id=graph_id,
            graph_version=graph_version,
            inputs=node_input or {},
            credentials_inputs={},
        )

    @staticmethod
    async def test_get_graph(
        graph_id: str,
        graph_version: int,
        user_id: str,
        for_export: bool = False,
    ):
        return await backend.api.features.v1.get_graph(
            graph_id, user_id, graph_version, for_export
        )

    @staticmethod
    async def test_create_graph(
        create_graph: backend.api.features.v1.CreateGraph,
        user_id: str,
    ):
        return await backend.api.features.v1.create_new_graph(create_graph, user_id)

    @staticmethod
    async def test_get_graph_run_status(graph_exec_id: str, user_id: str):
        from backend.data.execution import get_graph_execution_meta

        execution = await get_graph_execution_meta(
            user_id=user_id, execution_id=graph_exec_id
        )
        if not execution:
            raise ValueError(f"Execution {graph_exec_id} not found")
        return execution.status

    @staticmethod
    async def test_delete_graph(graph_id: str, user_id: str):
        """Used for clean-up after a test run"""
        await backend.api.features.library.db.delete_library_agent_by_graph_id(
            graph_id=graph_id, user_id=user_id
        )
        return await backend.api.features.v1.delete_graph(graph_id, user_id)

    @staticmethod
    async def test_get_presets(user_id: str, page: int = 1, page_size: int = 10):
        return await backend.api.features.library.routes.presets.list_presets(
            user_id=user_id, page=page, page_size=page_size
        )

    @staticmethod
    async def test_get_preset(preset_id: str, user_id: str):
        return await backend.api.features.library.routes.presets.get_preset(
            preset_id=preset_id, user_id=user_id
        )

    @staticmethod
    async def test_create_preset(
        preset: backend.api.features.library.model.LibraryAgentPresetCreatable,
        user_id: str,
    ):
        return await backend.api.features.library.routes.presets.create_preset(
            preset=preset, user_id=user_id
        )

    @staticmethod
    async def test_update_preset(
        preset_id: str,
        preset: backend.api.features.library.model.LibraryAgentPresetUpdatable,
        user_id: str,
    ):
        return await backend.api.features.library.routes.presets.update_preset(
            preset_id=preset_id, preset=preset, user_id=user_id
        )

    @staticmethod
    async def test_delete_preset(preset_id: str, user_id: str):
        return await backend.api.features.library.routes.presets.delete_preset(
            preset_id=preset_id, user_id=user_id
        )

    @staticmethod
    async def test_execute_preset(
        preset_id: str,
        user_id: str,
        inputs: Optional[dict[str, Any]] = None,
    ):
        return await backend.api.features.library.routes.presets.execute_preset(
            preset_id=preset_id,
            user_id=user_id,
            inputs=inputs or {},
            credential_inputs={},
        )

    @staticmethod
    async def test_create_store_listing(
        request: backend.api.features.store.model.StoreSubmissionRequest, user_id: str
    ):
        return await backend.api.features.store.routes.create_submission(
            request, user_id
        )

    ### ADMIN ###

    @staticmethod
    async def test_review_store_listing(
        request: backend.api.features.store.model.ReviewSubmissionRequest,
        user_id: str,
    ):
        return await backend.api.features.admin.store_admin_routes.review_submission(
            request.store_listing_version_id, request, user_id
        )

    @staticmethod
    async def test_create_credentials(
        user_id: str,
        provider: ProviderName,
        credentials: Credentials,
    ) -> Credentials:
        from .features.integrations.router import create_credentials, get_credential

        try:
            return await create_credentials(
                user_id=user_id, provider=provider, credentials=credentials
            )
        except Exception as e:
            logger.error(f"Error creating credentials: {e}")
            return await get_credential(
                provider=provider,
                user_id=user_id,
                cred_id=credentials.id,
            )

    def set_test_dependency_overrides(self, overrides: dict):
        app.dependency_overrides.update(overrides)
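
The operation ID scheme used by `custom_generate_unique_id` can be tried without FastAPI. The sketch below mirrors that function's logic on plain arguments; `operation_id` and the `Tag` enum are hypothetical names introduced for illustration, and the example routes are made up.

```python
from enum import Enum
from typing import Optional, Sequence, Set, Union


def operation_id(
    name: str,
    methods: Set[str],
    tags: Sequence[Union[str, Enum]],
    summary: Optional[str] = None,
) -> str:
    """Build "{method}{tag}{summary}" the same way the route-based function does."""
    if not tags or not methods:
        return name

    # Like the original, this takes an arbitrary element for multi-method routes.
    method = list(methods)[0].lower()
    first_tag = tags[0]
    tag_str = first_tag.name if isinstance(first_tag, Enum) else str(first_tag)
    tag = "".join(word.capitalize() for word in tag_str.split("_"))

    label = summary if summary else name
    label = "".join(word.capitalize() for word in str(label).split("_"))
    return f"{method}{tag}{label}"


class Tag(Enum):
    """Hypothetical enum tag; only its member name is used."""

    STORE_ADMIN = "store-admin"


print(operation_id("get_graph", {"GET"}, ["v1"]))                       # getV1GetGraph
print(operation_id("review", {"POST"}, [Tag.STORE_ADMIN]))              # postStoreAdminReview
print(operation_id("list_presets", {"GET"}, ["v2"], "List presets"))    # getV2List presets
print(operation_id("health", {"GET"}, []))                              # health
```

Only underscores are treated as word separators, so a summary containing spaces keeps them in the resulting ID, as the third call shows.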