support user_password and host_scoped credentials

This commit is contained in:
Reinier van der Leer
2026-03-11 17:31:45 +01:00
parent 604b9f16c0
commit e816bf92d3
2 changed files with 143 additions and 18 deletions

View File

@@ -5,17 +5,21 @@ Provides endpoints for managing integration credentials.
"""
import logging
from typing import Optional
from typing import Annotated, Optional
from uuid import uuid4
from fastapi import APIRouter, HTTPException, Query, Security
from fastapi import APIRouter, Body, HTTPException, Query, Security
from prisma.enums import APIKeyPermission
from pydantic import SecretStr
from starlette import status
from backend.api.external.middleware import require_permission
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.model import APIKeyCredentials
from backend.data.model import (
APIKeyCredentials,
HostScopedCredentials,
UserPasswordCredentials,
)
from ..models import CredentialCreateRequest, CredentialInfo, CredentialListResponse
from .helpers import creds_manager
@@ -52,12 +56,12 @@ async def list_credentials(
@credentials_router.post(
path="/credentials",
summary="Add integration API key credential",
operation_id="addIntegrationAPIKeyCredential",
summary="Create integration credential",
operation_id="createIntegrationCredential",
status_code=status.HTTP_201_CREATED,
)
async def create_credential(
request: CredentialCreateRequest,
request: Annotated[CredentialCreateRequest, Body(discriminator="type")],
auth: APIAuthorizationInfo = Security(
require_permission(APIKeyPermission.MANAGE_INTEGRATIONS)
),
@@ -65,15 +69,34 @@ async def create_credential(
"""
Create a new integration credential.
Only API key credentials can be created via the external API.
Supports `api_key`, `user_password`, and `host_scoped` credential types.
OAuth credentials must be set up through the web UI.
"""
credentials = APIKeyCredentials(
id=str(uuid4()),
provider=request.provider,
title=request.title,
api_key=SecretStr(request.api_key),
)
cred_id = str(uuid4())
if request.type == "api_key":
credentials = APIKeyCredentials(
id=cred_id,
provider=request.provider,
title=request.title,
api_key=SecretStr(request.api_key),
)
elif request.type == "user_password":
credentials = UserPasswordCredentials(
id=cred_id,
provider=request.provider,
title=request.title,
username=SecretStr(request.username),
password=SecretStr(request.password),
)
else:
credentials = HostScopedCredentials(
id=cred_id,
provider=request.provider,
title=request.title,
host=request.host,
headers={k: SecretStr(v) for k, v in request.headers.items()},
)
await creds_manager.create(auth.user_id, credentials)
return CredentialInfo.from_internal(credentials)

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Literal, Optional, Self, TypeAlias
from pydantic import BaseModel, Field, JsonValue
from pydantic import BaseModel, Field, JsonValue, field_validator
import backend.blocks._base as block_types
@@ -978,14 +978,116 @@ class CredentialListResponse(BaseModel):
credentials: list[CredentialInfo]
class CredentialCreateRequest(BaseModel):
"""Request to create an API key credential."""
class _CredentialCreateBase(BaseModel):
provider: str = Field(description="Provider name (e.g., 'github', 'openai')")
title: Optional[str] = Field(
default=None, description="User-friendly name for this credential"
)
api_key: str = Field(description="API key value")
class APIKeyCredentialCreateRequest(_CredentialCreateBase):
"""Request to create an API key credential."""
type: Literal["api_key"]
api_key: str
class UserPasswordCredentialCreateRequest(_CredentialCreateBase):
"""Request to create a username/password credential."""
type: Literal["user_password"]
username: str
password: str
class HostScopedCredentialCreateRequest(_CredentialCreateBase):
"""Request to create a host-scoped credential with custom headers."""
type: Literal["host_scoped"]
host: str = Field(
description=(
"Host pattern to match against request URLs. "
"Supports exact hosts (api.example.com), wildcard subdomains "
"(*.example.com), and optional port (api.example.com:8080)"
),
)
headers: dict[str, str] = Field(
description="Key-value header map to add to matching requests"
)
@field_validator("host")
@classmethod
def validate_host(cls, v: str) -> str:
"""
Validates that `host` is a pattern compatible with
`HostScopedCredentials.matches_url()`, which supports exact hosts,
`*.wildcard` subdomains, and optional ports.
"""
import ipaddress
import re
from urllib3.util import parse_url
v = v.strip()
if not v:
raise ValueError("host must not be empty")
try:
parsed = parse_url(v)
except Exception:
# parse_url can't handle bare IPv6 like "::1";
# check if it's a valid IP before rejecting
try:
ipaddress.ip_address(v.strip("[]"))
return v
except ValueError:
pass
raise ValueError(f"Invalid host pattern: {v}")
# If a full URL was given, extract just the host part
if parsed.scheme:
raise ValueError(
f"host must be a host pattern, not a URL: "
f"omit the scheme ({parsed.scheme}://)"
)
if parsed.path and parsed.path != "/":
raise ValueError("host must be a host pattern without a path component")
# Validate the hostname portion (with optional *. prefix)
hostname = parsed.hostname or v.split(":")[0]
# Allow IPv4 and IPv6 addresses (matches_url handles them via exact match)
bare = hostname.strip("[]") # strip brackets from [::1]-style IPv6
try:
ipaddress.ip_address(bare)
return v # valid IP, skip domain validation
except ValueError:
pass
if hostname.startswith("*."):
domain = hostname[2:]
else:
domain = hostname
# Domain validation: labels separated by dots, no empty labels
if not re.match(
r"^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?)*$",
domain,
):
raise ValueError(
f"Invalid hostname: {hostname}. "
"Expected a domain like api.example.com, *.example.com, "
"or an IP address"
)
return v
CredentialCreateRequest = (
APIKeyCredentialCreateRequest
| UserPasswordCredentialCreateRequest
| HostScopedCredentialCreateRequest
)
class CredentialRequirement(BaseModel):