Files
AutoGPT/autogpt_platform/backend/backend/data/model.py
Zamil Majdy f9f358c526 feat(mcp): Add MCP tool block with OAuth, tool discovery, and standard credential integration (#12011)
## Summary

<img width="1000" alt="image"
src="https://github.com/user-attachments/assets/18e8ef34-d222-453c-8b0a-1b25ef8cf806"
/>


<img width="250" alt="image"
src="https://github.com/user-attachments/assets/ba97556c-09c5-4f76-9f4e-49a2e8e57468"
/>

<img width="250" alt="image"
src="https://github.com/user-attachments/assets/68f7804a-fe74-442d-9849-39a229c052cf"
/>

<img width="250" alt="image"
src="https://github.com/user-attachments/assets/700690ba-f9fe-4726-8871-3bfbab586001"
/>

Full-stack MCP (Model Context Protocol) tool block integration that
allows users to connect to any MCP server, discover available tools,
authenticate via OAuth, and execute tools — all through the standard
AutoGPT credential system.

### Backend

- **MCPToolBlock** (`blocks/mcp/block.py`): New block using
`CredentialsMetaInput` pattern with optional credentials (`default={}`),
supporting both authenticated (OAuth) and public MCP servers. Includes
auto-lookup fallback for backward compatibility.
- **MCP Client** (`blocks/mcp/client.py`): HTTP transport with JSON-RPC
2.0, tool discovery, tool execution with robust error handling
(type-checked error fields, non-JSON response handling)
- **MCP OAuth Handler** (`blocks/mcp/oauth.py`): RFC 8414 discovery,
dynamic per-server OAuth with PKCE, token storage and refresh via
`raise_for_status=True`
- **MCP API Routes** (`api/features/mcp/routes.py`): `discover-tools`,
`oauth/login`, `oauth/callback` endpoints with credential cleanup,
defensive OAuth metadata validation
- **Credential system integration**:
- `CredentialsMetaInput` model_validator normalizes legacy
`"ProviderName.MCP"` format from Python 3.13's `str(StrEnum)` change
- `CredentialsFieldInfo.combine()` supports URL-based credential
discrimination (each MCP server gets its own credential entry)
- `aggregate_credentials_inputs` checks block schema defaults for
credential optionality
- Executor normalizes credential data for both Pydantic and JSON schema
validation paths
  - Chat credential matching handles MCP server URL filtering
- `provider_matches()` helper used consistently for Python 3.13 StrEnum
compatibility
- **Pre-run validation**: `_validate_graph_get_errors` now calls
`get_missing_input()` for custom block-level validation (MCP tool
arguments)
- **Security**: HTML tag stripping loop to prevent XSS bypass, SSRF
protection (removed trusted_origins)

### Frontend

- **MCPToolDialog** (`MCPToolDialog.tsx`): Full tool discovery UI —
enter server URL, authenticate if needed, browse tools, select tool and
configure
- **OAuth popup** (`oauth-popup.ts`): Shared utility supporting
cross-origin MCP OAuth flows with BroadcastChannel + localStorage
fallback
- **Credential integration**: MCP-specific OAuth flow in
`useCredentialsInput`, server URL filtering in `useCredentials`, MCP
callback page
- **CredentialsSelect**: Auto-selects first available credential instead
of defaulting to "None", credentials listed before "None" in dropdown
- **Node rendering**: Dynamic tool input schema rendering on MCP nodes,
proper handling in both legacy and new flow editors
- **Block title persistence**: `customized_name` set at block creation
for both MCP and Agent blocks — no fallback logic needed, titles survive
save/load reliably
- **Stable credential ordering**: Removed `sortByUnsetFirst` that caused
credential inputs to jump when selected

### Tests (~2060 lines)

- Unit tests: block, client, tool execution
- Integration tests: mock MCP server with auth
- OAuth flow tests
- API endpoint tests
- Credential combining/optionality tests
- E2e tests (skipped in CI, run manually)

## Key Design Decisions

1. **Optional credentials via `default={}`**: MCP servers can be public
(no auth) or private (OAuth). The `credentials` field has `default={}`
making it optional at the schema level, so public servers work without
prompting for credentials.

2. **URL-based credential discrimination**: Each MCP server URL gets its
own credential entry in the "Run agent" form (via
`discriminator="server_url"`), so agents using multiple MCP servers
prompt for each independently.

3. **Model-level normalization**: Python 3.13 changed `str(StrEnum)` to
return `"ClassName.MEMBER"`. Rather than scattering fixes across the
codebase, a Pydantic `model_validator(mode="before")` on
`CredentialsMetaInput` handles normalization centrally, and
`provider_matches()` handles lookups.

4. **Credential auto-select**: `CredentialsSelect` component defaults to
the first available credential and notifies the parent state, ensuring
credentials are pre-filled in the "Run agent" dialog without requiring
manual selection.

5. **customized_name for block titles**: Both MCP and Agent blocks set
`customized_name` in metadata at creation time. This eliminates
convoluted runtime fallback logic (`agent_name`, hostname extraction) —
the title is persisted once and read directly.

## Test plan

- [x] Unit/integration tests pass (68 MCP + 11 graph = 79 tests)
- [x] Manual: MCP block with public server (DeepWiki) — no credentials
needed, tools discovered and executable
- [x] Manual: MCP block with OAuth server (Linear, Sentry) — OAuth flow
prompts correctly
- [x] Manual: "Run agent" form shows correct credential requirements per
MCP server
- [x] Manual: Credential auto-selects when exactly one matches,
pre-selects first when multiple exist
- [x] Manual: Credential ordering stays stable when
selecting/deselecting
- [x] Manual: MCP block title persists after save and refresh
- [x] Manual: Agent block title persists after save and refresh (via
customized_name)
- [ ] Manual: Shared agent with MCP block prompts new user for
credentials

---------

Co-authored-by: Otto <otto@agpt.co>
Co-authored-by: Ubbe <hi@ubbe.dev>
2026-02-13 16:17:03 +00:00

927 lines
32 KiB
Python

from __future__ import annotations
import base64
import enum
import logging
from collections import defaultdict
from datetime import datetime, timezone
from json import JSONDecodeError
from typing import (
TYPE_CHECKING,
Annotated,
Any,
Callable,
ClassVar,
Generic,
Literal,
Optional,
TypeVar,
cast,
get_args,
)
from uuid import uuid4
from prisma.enums import CreditTransactionType, OnboardingStep
from pydantic import (
BaseModel,
ConfigDict,
Field,
GetCoreSchemaHandler,
SecretStr,
field_serializer,
model_validator,
)
from pydantic_core import (
CoreSchema,
PydanticUndefined,
PydanticUndefinedType,
ValidationError,
core_schema,
)
from typing_extensions import TypedDict
from backend.integrations.providers import ProviderName
from backend.util.json import loads as json_loads
from backend.util.request import parse_url
from backend.util.settings import Secrets
# Type alias for any provider name (including custom ones)
AnyProviderName = str # Will be validated as ProviderName at runtime
USER_TIMEZONE_NOT_SET = "not-set"
class User(BaseModel):
"""Application-layer User model with snake_case convention."""
model_config = ConfigDict(
extra="forbid",
str_strip_whitespace=True,
)
id: str = Field(..., description="User ID")
email: str = Field(..., description="User email address")
email_verified: bool = Field(default=True, description="Whether email is verified")
name: Optional[str] = Field(None, description="User display name")
created_at: datetime = Field(..., description="When user was created")
updated_at: datetime = Field(..., description="When user was last updated")
metadata: dict[str, Any] = Field(
default_factory=dict, description="User metadata as dict"
)
integrations: str = Field(default="", description="Encrypted integrations data")
stripe_customer_id: Optional[str] = Field(None, description="Stripe customer ID")
top_up_config: Optional["AutoTopUpConfig"] = Field(
None, description="Top up configuration"
)
# Notification preferences
max_emails_per_day: int = Field(default=3, description="Maximum emails per day")
notify_on_agent_run: bool = Field(default=True, description="Notify on agent run")
notify_on_zero_balance: bool = Field(
default=True, description="Notify on zero balance"
)
notify_on_low_balance: bool = Field(
default=True, description="Notify on low balance"
)
notify_on_block_execution_failed: bool = Field(
default=True, description="Notify on block execution failure"
)
notify_on_continuous_agent_error: bool = Field(
default=True, description="Notify on continuous agent error"
)
notify_on_daily_summary: bool = Field(
default=True, description="Notify on daily summary"
)
notify_on_weekly_summary: bool = Field(
default=True, description="Notify on weekly summary"
)
notify_on_monthly_summary: bool = Field(
default=True, description="Notify on monthly summary"
)
# User timezone for scheduling and time display
timezone: str = Field(
default=USER_TIMEZONE_NOT_SET,
description="User timezone (IANA timezone identifier or 'not-set')",
)
@classmethod
def from_db(cls, prisma_user: "PrismaUser") -> "User":
"""Convert a database User object to application User model."""
# Handle metadata field - convert from JSON string or dict to dict
metadata = {}
if prisma_user.metadata:
if isinstance(prisma_user.metadata, str):
try:
metadata = json_loads(prisma_user.metadata)
except (JSONDecodeError, TypeError):
metadata = {}
elif isinstance(prisma_user.metadata, dict):
metadata = prisma_user.metadata
# Handle topUpConfig field
top_up_config = None
if prisma_user.topUpConfig:
if isinstance(prisma_user.topUpConfig, str):
try:
config_dict = json_loads(prisma_user.topUpConfig)
top_up_config = AutoTopUpConfig.model_validate(config_dict)
except (JSONDecodeError, TypeError, ValueError):
top_up_config = None
elif isinstance(prisma_user.topUpConfig, dict):
try:
top_up_config = AutoTopUpConfig.model_validate(
prisma_user.topUpConfig
)
except ValueError:
top_up_config = None
return cls(
id=prisma_user.id,
email=prisma_user.email,
email_verified=prisma_user.emailVerified or True,
name=prisma_user.name,
created_at=prisma_user.createdAt,
updated_at=prisma_user.updatedAt,
metadata=metadata,
integrations=prisma_user.integrations or "",
stripe_customer_id=prisma_user.stripeCustomerId,
top_up_config=top_up_config,
max_emails_per_day=prisma_user.maxEmailsPerDay or 3,
notify_on_agent_run=prisma_user.notifyOnAgentRun or True,
notify_on_zero_balance=prisma_user.notifyOnZeroBalance or True,
notify_on_low_balance=prisma_user.notifyOnLowBalance or True,
notify_on_block_execution_failed=prisma_user.notifyOnBlockExecutionFailed
or True,
notify_on_continuous_agent_error=prisma_user.notifyOnContinuousAgentError
or True,
notify_on_daily_summary=prisma_user.notifyOnDailySummary or True,
notify_on_weekly_summary=prisma_user.notifyOnWeeklySummary or True,
notify_on_monthly_summary=prisma_user.notifyOnMonthlySummary or True,
timezone=prisma_user.timezone or USER_TIMEZONE_NOT_SET,
)
if TYPE_CHECKING:
from prisma.models import User as PrismaUser
T = TypeVar("T")
logger = logging.getLogger(__name__)
GraphInput = dict[str, Any]
class BlockSecret:
def __init__(self, key: Optional[str] = None, value: Optional[str] = None):
if value is not None:
trimmed_value = value.strip()
if value != trimmed_value:
logger.debug(BlockSecret.TRIMMING_VALUE_MSG)
self._value = trimmed_value
return
self._value = self.__get_secret(key)
if self._value is None:
raise ValueError(f"Secret {key} not found.")
trimmed_value = self._value.strip()
if self._value != trimmed_value:
logger.debug(BlockSecret.TRIMMING_VALUE_MSG)
self._value = trimmed_value
TRIMMING_VALUE_MSG: ClassVar[str] = "Provided secret value got trimmed."
STR: ClassVar[str] = "<secret>"
SECRETS: ClassVar[Secrets] = Secrets()
def __repr__(self):
return BlockSecret.STR
def __str__(self):
return BlockSecret.STR
@staticmethod
def __get_secret(key: str | None):
if not key or not hasattr(BlockSecret.SECRETS, key):
return None
return getattr(BlockSecret.SECRETS, key)
def get_secret_value(self):
trimmed_value = str(self._value).strip()
if self._value != trimmed_value:
logger.info(BlockSecret.TRIMMING_VALUE_MSG)
return trimmed_value
@classmethod
def parse_value(cls, value: Any) -> BlockSecret:
if isinstance(value, BlockSecret):
return value
return BlockSecret(value=value)
@classmethod
def __get_pydantic_json_schema__(
cls, source_type: Any, handler: GetCoreSchemaHandler
) -> dict[str, Any]:
return {
"type": "string",
}
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler: GetCoreSchemaHandler
) -> CoreSchema:
validate_fun = core_schema.no_info_plain_validator_function(cls.parse_value)
return core_schema.json_or_python_schema(
json_schema=validate_fun,
python_schema=validate_fun,
serialization=core_schema.plain_serializer_function_ser_schema(
lambda val: BlockSecret.STR
),
)
def SecretField(
value: Optional[str] = None,
key: Optional[str] = None,
title: Optional[str] = None,
description: Optional[str] = None,
placeholder: Optional[str] = None,
**kwargs,
) -> BlockSecret:
return SchemaField(
BlockSecret(key=key, value=value),
title=title,
description=description,
placeholder=placeholder,
secret=True,
**kwargs,
)
def SchemaField(
default: T | PydanticUndefinedType = PydanticUndefined,
*args,
default_factory: Optional[Callable[[], T]] = None,
title: Optional[str] = None,
description: Optional[str] = None,
placeholder: Optional[str] = None,
advanced: Optional[bool] = None,
secret: bool = False,
exclude: bool = False,
hidden: Optional[bool] = None,
depends_on: Optional[list[str]] = None,
ge: Optional[float] = None,
le: Optional[float] = None,
min_length: Optional[int] = None,
max_length: Optional[int] = None,
discriminator: Optional[str] = None,
format: Optional[str] = None,
json_schema_extra: Optional[dict[str, Any]] = None,
) -> T:
if default is PydanticUndefined and default_factory is None:
advanced = False
elif advanced is None:
advanced = True
json_schema_extra = {
k: v
for k, v in {
"placeholder": placeholder,
"secret": secret,
"advanced": advanced,
"hidden": hidden,
"depends_on": depends_on,
"format": format,
**(json_schema_extra or {}),
}.items()
if v is not None
}
return Field(
default,
*args,
default_factory=default_factory,
title=title,
description=description,
exclude=exclude,
ge=ge,
le=le,
min_length=min_length,
max_length=max_length,
discriminator=discriminator,
json_schema_extra=json_schema_extra,
) # type: ignore
class _BaseCredentials(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
provider: str
title: Optional[str] = None
@field_serializer("*")
def dump_secret_strings(value: Any, _info):
if isinstance(value, SecretStr):
return value.get_secret_value()
return value
class OAuth2Credentials(_BaseCredentials):
type: Literal["oauth2"] = "oauth2"
username: Optional[str] = None
"""Username of the third-party service user that these credentials belong to"""
access_token: SecretStr
access_token_expires_at: Optional[int] = None
"""Unix timestamp (seconds) indicating when the access token expires (if at all)"""
refresh_token: Optional[SecretStr] = None
refresh_token_expires_at: Optional[int] = None
"""Unix timestamp (seconds) indicating when the refresh token expires (if at all)"""
scopes: list[str]
metadata: dict[str, Any] = Field(default_factory=dict)
def auth_header(self) -> str:
return f"Bearer {self.access_token.get_secret_value()}"
class APIKeyCredentials(_BaseCredentials):
type: Literal["api_key"] = "api_key"
api_key: SecretStr
expires_at: Optional[int] = Field(
default=None,
description="Unix timestamp (seconds) indicating when the API key expires (if at all)",
)
"""Unix timestamp (seconds) indicating when the API key expires (if at all)"""
def auth_header(self) -> str:
# Linear API keys should not have Bearer prefix
if self.provider == "linear":
return self.api_key.get_secret_value()
return f"Bearer {self.api_key.get_secret_value()}"
class UserPasswordCredentials(_BaseCredentials):
type: Literal["user_password"] = "user_password"
username: SecretStr
password: SecretStr
def auth_header(self) -> str:
# Converting the string to bytes using encode()
# Base64 encoding it with base64.b64encode()
# Converting the resulting bytes back to a string with decode()
return f"Basic {base64.b64encode(f'{self.username.get_secret_value()}:{self.password.get_secret_value()}'.encode()).decode()}"
class HostScopedCredentials(_BaseCredentials):
type: Literal["host_scoped"] = "host_scoped"
host: str = Field(description="The host/URI pattern to match against request URLs")
headers: dict[str, SecretStr] = Field(
description="Key-value header map to add to matching requests",
default_factory=dict,
)
def _extract_headers(self, headers: dict[str, SecretStr]) -> dict[str, str]:
"""Helper to extract secret values from headers."""
return {key: value.get_secret_value() for key, value in headers.items()}
@field_serializer("headers")
def serialize_headers(self, headers: dict[str, SecretStr]) -> dict[str, str]:
"""Serialize headers by extracting secret values."""
return self._extract_headers(headers)
def get_headers_dict(self) -> dict[str, str]:
"""Get headers with secret values extracted."""
return self._extract_headers(self.headers)
def auth_header(self) -> str:
"""Get authorization header for backward compatibility."""
auth_headers = self.get_headers_dict()
if "Authorization" in auth_headers:
return auth_headers["Authorization"]
return ""
def matches_url(self, url: str) -> bool:
"""Check if this credential should be applied to the given URL."""
request_host, request_port = _extract_host_from_url(url)
cred_scope_host, cred_scope_port = _extract_host_from_url(self.host)
if not request_host:
return False
# If a port is specified in credential host, the request host port must match
if cred_scope_port is not None and request_port != cred_scope_port:
return False
# Non-standard ports are only allowed if explicitly specified in credential host
elif cred_scope_port is None and request_port not in (80, 443, None):
return False
# Simple host matching
if cred_scope_host == request_host:
return True
# Support wildcard matching (e.g., "*.example.com" matches "api.example.com")
if cred_scope_host.startswith("*."):
domain = cred_scope_host[2:] # Remove "*."
return request_host.endswith(f".{domain}") or request_host == domain
return False
Credentials = Annotated[
OAuth2Credentials
| APIKeyCredentials
| UserPasswordCredentials
| HostScopedCredentials,
Field(discriminator="type"),
]
CredentialsType = Literal["api_key", "oauth2", "user_password", "host_scoped"]
class OAuthState(BaseModel):
token: str
provider: str
expires_at: int
code_verifier: Optional[str] = None
"""Unix timestamp (seconds) indicating when this OAuth state expires"""
scopes: list[str]
# Fields for external API OAuth flows
callback_url: Optional[str] = None
"""External app's callback URL for OAuth redirect"""
state_metadata: dict[str, Any] = Field(default_factory=dict)
"""Metadata to echo back to external app on completion"""
initiated_by_api_key_id: Optional[str] = None
"""ID of the API key that initiated this OAuth flow"""
@property
def is_external(self) -> bool:
"""Whether this OAuth flow was initiated via external API."""
return self.callback_url is not None
class UserMetadata(BaseModel):
integration_credentials: list[Credentials] = Field(default_factory=list)
"""⚠️ Deprecated; use `UserIntegrations.credentials` instead"""
integration_oauth_states: list[OAuthState] = Field(default_factory=list)
"""⚠️ Deprecated; use `UserIntegrations.oauth_states` instead"""
class UserMetadataRaw(TypedDict, total=False):
integration_credentials: list[dict]
"""⚠️ Deprecated; use `UserIntegrations.credentials` instead"""
integration_oauth_states: list[dict]
"""⚠️ Deprecated; use `UserIntegrations.oauth_states` instead"""
class UserIntegrations(BaseModel):
class ManagedCredentials(BaseModel):
"""Integration credentials managed by us, rather than by the user"""
ayrshare_profile_key: Optional[SecretStr] = None
@field_serializer("*")
def dump_secret_strings(value: Any, _info):
if isinstance(value, SecretStr):
return value.get_secret_value()
return value
managed_credentials: ManagedCredentials = Field(default_factory=ManagedCredentials)
credentials: list[Credentials] = Field(default_factory=list)
oauth_states: list[OAuthState] = Field(default_factory=list)
CP = TypeVar("CP", bound=ProviderName)
CT = TypeVar("CT", bound=CredentialsType)
def is_credentials_field_name(field_name: str) -> bool:
return field_name == "credentials" or field_name.endswith("_credentials")
class CredentialsMetaInput(BaseModel, Generic[CP, CT]):
id: str
title: Optional[str] = None
provider: CP
type: CT
@model_validator(mode="before")
@classmethod
def _normalize_legacy_provider(cls, data: Any) -> Any:
"""Fix ``ProviderName.X`` format from Python 3.13 ``str(Enum)`` bug.
Python 3.13 changed ``str(StrEnum)`` to return ``"ClassName.MEMBER"``
instead of the plain value. Old stored credential references may have
``provider: "ProviderName.MCP"`` instead of ``"mcp"``.
"""
if isinstance(data, dict):
prov = data.get("provider", "")
if isinstance(prov, str) and prov.startswith("ProviderName."):
member = prov.removeprefix("ProviderName.")
try:
data = {**data, "provider": ProviderName[member].value}
except KeyError:
pass
return data
@classmethod
def allowed_providers(cls) -> tuple[ProviderName, ...] | None:
return get_args(cls.model_fields["provider"].annotation)
@classmethod
def allowed_cred_types(cls) -> tuple[CredentialsType, ...]:
return get_args(cls.model_fields["type"].annotation)
@staticmethod
def validate_credentials_field_schema(
field_schema: dict[str, Any], field_name: str
):
"""Validates the schema of a credentials input field"""
try:
field_info = CredentialsFieldInfo[CP, CT].model_validate(field_schema)
except ValidationError as e:
if "Field required [type=missing" not in str(e):
raise
raise TypeError(
"Field 'credentials' JSON schema lacks required extra items: "
f"{field_schema}"
) from e
providers = field_info.provider
if (
providers is not None
and len(providers) > 1
and not field_info.discriminator
):
raise TypeError(
f"Multi-provider CredentialsField '{field_name}' "
"requires discriminator!"
)
@staticmethod
def _add_json_schema_extra(schema: dict, model_class: type):
# Use model_class for allowed_providers/cred_types
if hasattr(model_class, "allowed_providers") and hasattr(
model_class, "allowed_cred_types"
):
allowed_providers = model_class.allowed_providers()
# If no specific providers (None), allow any string
if allowed_providers is None:
schema["credentials_provider"] = ["string"] # Allow any string provider
else:
schema["credentials_provider"] = allowed_providers
schema["credentials_types"] = model_class.allowed_cred_types()
# Do not return anything, just mutate schema in place
model_config = ConfigDict(
json_schema_extra=_add_json_schema_extra, # type: ignore
)
def _extract_host_from_url(url: str) -> tuple[str, int | None]:
"""Extract host and port from URL for grouping host-scoped credentials."""
try:
parsed = parse_url(url)
return parsed.hostname or url, parsed.port
except Exception:
return "", None
class CredentialsFieldInfo(BaseModel, Generic[CP, CT]):
# TODO: move discrimination mechanism out of CredentialsField (frontend + backend)
provider: frozenset[CP] = Field(..., alias="credentials_provider")
supported_types: frozenset[CT] = Field(..., alias="credentials_types")
required_scopes: Optional[frozenset[str]] = Field(None, alias="credentials_scopes")
discriminator: Optional[str] = None
discriminator_mapping: Optional[dict[str, CP]] = None
discriminator_values: set[Any] = Field(default_factory=set)
@classmethod
def combine(
cls, *fields: tuple[CredentialsFieldInfo[CP, CT], T]
) -> dict[str, tuple[CredentialsFieldInfo[CP, CT], set[T]]]:
"""
Combines multiple CredentialsFieldInfo objects into as few as possible.
Rules:
- Items can only be combined if they have the same supported credentials types
and the same supported providers.
- When combining items, the `required_scopes` of the result is a join
of the `required_scopes` of the original items.
Params:
*fields: (CredentialsFieldInfo, key) objects to group and combine
Returns:
A sequence of tuples containing combined CredentialsFieldInfo objects and
the set of keys of the respective original items that were grouped together.
"""
if not fields:
return {}
# Group fields by their provider and supported_types
# For HTTP host-scoped credentials, also group by host
grouped_fields: defaultdict[
tuple[frozenset[CP], frozenset[CT]],
list[tuple[T, CredentialsFieldInfo[CP, CT]]],
] = defaultdict(list)
for field, key in fields:
if (
field.discriminator
and not field.discriminator_mapping
and field.discriminator_values
):
# URL-based discrimination (e.g. HTTP host-scoped, MCP server URL):
# Each unique host gets its own credential entry.
provider_prefix = next(iter(field.provider))
# Use .value for enum types to get the plain string (e.g. "mcp" not "ProviderName.MCP")
prefix_str = getattr(provider_prefix, "value", str(provider_prefix))
providers = frozenset(
[cast(CP, prefix_str)]
+ [
cast(CP, parse_url(str(value)).netloc)
for value in field.discriminator_values
]
)
else:
providers = frozenset(field.provider)
group_key = (providers, frozenset(field.supported_types))
grouped_fields[group_key].append((key, field))
# Combine fields within each group
result: dict[str, tuple[CredentialsFieldInfo[CP, CT], set[T]]] = {}
for key, group in grouped_fields.items():
# Start with the first field in the group
_, combined = group[0]
# Track the keys that were combined
combined_keys = {key for key, _ in group}
# Combine required_scopes from all fields in the group
all_scopes = set()
for _, field in group:
if field.required_scopes:
all_scopes.update(field.required_scopes)
# Combine discriminator_values from all fields in the group (removing duplicates)
all_discriminator_values = []
for _, field in group:
for value in field.discriminator_values:
if value not in all_discriminator_values:
all_discriminator_values.append(value)
# Generate the key for the combined result
providers_key, supported_types_key = key
group_key = (
"-".join(sorted(providers_key))
+ "_"
+ "-".join(sorted(supported_types_key))
+ "_credentials"
)
result[group_key] = (
CredentialsFieldInfo[CP, CT](
credentials_provider=combined.provider,
credentials_types=combined.supported_types,
credentials_scopes=frozenset(all_scopes) or None,
discriminator=combined.discriminator,
discriminator_mapping=combined.discriminator_mapping,
discriminator_values=set(all_discriminator_values),
),
combined_keys,
)
return result
def discriminate(self, discriminator_value: Any) -> CredentialsFieldInfo:
if not (self.discriminator and self.discriminator_mapping):
return self
try:
provider = self.discriminator_mapping[discriminator_value]
except KeyError:
raise ValueError(
f"Model '{discriminator_value}' is not supported. "
"It may have been deprecated. Please update your agent configuration."
)
return CredentialsFieldInfo(
credentials_provider=frozenset([provider]),
credentials_types=self.supported_types,
credentials_scopes=self.required_scopes,
discriminator=self.discriminator,
discriminator_mapping=self.discriminator_mapping,
discriminator_values=self.discriminator_values,
)
def CredentialsField(
required_scopes: set[str] = set(),
*,
discriminator: Optional[str] = None,
discriminator_mapping: Optional[dict[str, Any]] = None,
discriminator_values: Optional[set[Any]] = None,
title: Optional[str] = None,
description: Optional[str] = None,
**kwargs,
) -> CredentialsMetaInput:
"""
`CredentialsField` must and can only be used on fields named `credentials`.
This is enforced by the `BlockSchema` base class.
"""
field_schema_extra = {
k: v
for k, v in {
"credentials_scopes": list(required_scopes) or None,
"discriminator": discriminator,
"discriminator_mapping": discriminator_mapping,
"discriminator_values": discriminator_values,
}.items()
if v is not None
}
# Merge any json_schema_extra passed in kwargs
if "json_schema_extra" in kwargs:
extra_schema = kwargs.pop("json_schema_extra")
field_schema_extra.update(extra_schema)
return Field(
title=title,
description=description,
json_schema_extra=field_schema_extra, # validated on BlockSchema init
**kwargs,
)
class ContributorDetails(BaseModel):
name: str = Field(title="Name", description="The name of the contributor.")
class TopUpType(enum.Enum):
AUTO = "AUTO"
MANUAL = "MANUAL"
UNCATEGORIZED = "UNCATEGORIZED"
class AutoTopUpConfig(BaseModel):
amount: int
"""Amount of credits to top up."""
threshold: int
"""Threshold to trigger auto top up."""
class UserTransaction(BaseModel):
transaction_key: str = ""
transaction_time: datetime = datetime.min.replace(tzinfo=timezone.utc)
transaction_type: CreditTransactionType = CreditTransactionType.USAGE
amount: int = 0
running_balance: int = 0
current_balance: int = 0
description: str | None = None
usage_graph_id: str | None = None
usage_execution_id: str | None = None
usage_node_count: int = 0
usage_start_time: datetime = datetime.max.replace(tzinfo=timezone.utc)
user_id: str
user_email: str | None = None
reason: str | None = None
admin_email: str | None = None
extra_data: str | None = None
class TransactionHistory(BaseModel):
transactions: list[UserTransaction]
next_transaction_time: datetime | None
class RefundRequest(BaseModel):
id: str
user_id: str
transaction_key: str
amount: int
reason: str
result: str | None = None
status: str
created_at: datetime
updated_at: datetime
class NodeExecutionStats(BaseModel):
"""Execution statistics for a node execution."""
model_config = ConfigDict(
extra="allow",
arbitrary_types_allowed=True,
)
error: Optional[BaseException | str] = None
walltime: float = 0
cputime: float = 0
input_size: int = 0
output_size: int = 0
llm_call_count: int = 0
llm_retry_count: int = 0
input_token_count: int = 0
output_token_count: int = 0
extra_cost: int = 0
extra_steps: int = 0
# Moderation fields
cleared_inputs: Optional[dict[str, list[str]]] = None
cleared_outputs: Optional[dict[str, list[str]]] = None
def __iadd__(self, other: "NodeExecutionStats") -> "NodeExecutionStats":
"""Mutate this instance by adding another NodeExecutionStats."""
if not isinstance(other, NodeExecutionStats):
return NotImplemented
stats_dict = other.model_dump()
current_stats = self.model_dump()
for key, value in stats_dict.items():
if key not in current_stats:
# Field doesn't exist yet, just set it
setattr(self, key, value)
elif isinstance(value, dict) and isinstance(current_stats[key], dict):
current_stats[key].update(value)
setattr(self, key, current_stats[key])
elif isinstance(value, (int, float)) and isinstance(
current_stats[key], (int, float)
):
setattr(self, key, current_stats[key] + value)
elif isinstance(value, list) and isinstance(current_stats[key], list):
current_stats[key].extend(value)
setattr(self, key, current_stats[key])
else:
setattr(self, key, value)
return self
class GraphExecutionStats(BaseModel):
"""Execution statistics for a graph execution."""
model_config = ConfigDict(
extra="allow",
arbitrary_types_allowed=True,
)
error: Optional[Exception | str] = None
walltime: float = Field(
default=0, description="Time between start and end of run (seconds)"
)
cputime: float = 0
nodes_walltime: float = Field(
default=0, description="Total node execution time (seconds)"
)
nodes_cputime: float = 0
node_count: int = Field(default=0, description="Total number of node executions")
node_error_count: int = Field(
default=0, description="Total number of errors generated"
)
cost: int = Field(default=0, description="Total execution cost (cents)")
activity_status: Optional[str] = Field(
default=None, description="AI-generated summary of what the agent did"
)
correctness_score: Optional[float] = Field(
default=None,
description="AI-generated score (0.0-1.0) indicating how well the execution achieved its intended purpose",
)
class UserExecutionSummaryStats(BaseModel):
"""Summary of user statistics for a specific user."""
model_config = ConfigDict(
extra="allow",
arbitrary_types_allowed=True,
)
total_credits_used: float = Field(default=0)
total_executions: int = Field(default=0)
successful_runs: int = Field(default=0)
failed_runs: int = Field(default=0)
most_used_agent: str = Field(default="")
total_execution_time: float = Field(default=0)
average_execution_time: float = Field(default=0)
cost_breakdown: dict[str, float] = Field(default_factory=dict)
class UserOnboarding(BaseModel):
userId: str
completedSteps: list[OnboardingStep]
walletShown: bool
notified: list[OnboardingStep]
rewardedFor: list[OnboardingStep]
usageReason: Optional[str]
integrations: list[str]
otherIntegrations: Optional[str]
selectedStoreListingVersionId: Optional[str]
agentInput: Optional[dict[str, Any]]
onboardingAgentExecutionId: Optional[str]
agentRuns: int
lastRunAt: Optional[datetime]
consecutiveRunDays: int