feat(platform): generic managed credential system with AgentMail auto-provisioning (#12537)

### Why / What / How

**Why:** We need a third credential type: **system-provided but unique
per user** (managed credentials). Currently we have system credentials
(same for all users) and user credentials (user provides their own
keys). Managed credentials bridge the gap — the platform provisions them
automatically, one per user, for integrations like AgentMail where each
user needs their own pod-scoped API key.

**What:**
- Generic **managed credential provider registry** — any integration can
register a provider that auto-provisions per-user credentials
- **AgentMail** is the first consumer: creates a pod + pod-scoped API
key using the org-level API key
- Managed credentials appear in the credential dropdown like normal API
keys but with `autogpt_managed=True` — users **cannot update or delete**
them
- **Auto-provisioning** on `GET /credentials` — lazily creates managed
credentials when users browse their credential list
- **Account deletion cleanup** utility — revokes external resources
(pods, API keys) before user deletion
- **Frontend UX** — hides the delete button for managed credentials on
the integrations page

**How:**

### Backend

**New files:**
- `backend/integrations/managed_credentials.py` —
`ManagedCredentialProvider` ABC, global registry,
`ensure_managed_credentials()` (with per-user asyncio lock +
`asyncio.gather` for concurrency), `cleanup_managed_credentials()`
- `backend/integrations/managed_providers/__init__.py` —
`register_all()` called at startup
- `backend/integrations/managed_providers/agentmail.py` —
`AgentMailManagedProvider` with `provision()` (creates pod + API key via
agentmail SDK) and `deprovision()` (deletes pod)

**Modified files:**
- `credentials_store.py` — `autogpt_managed` guards on update/delete,
`has_managed_credential()` / `add_managed_credential()` helpers
- `model.py` — `autogpt_managed: bool` + `metadata: dict` on
`_BaseCredentials`
- `router.py` — calls `ensure_managed_credentials()` in list endpoints,
removed explicit `/agentmail/connect` endpoint
- `user.py` — `cleanup_user_managed_credentials()` for account deletion
- `rest_api.py` — registers managed providers at startup
- `settings.py` — `agentmail_api_key` setting

### Frontend
- Added `autogpt_managed` to `CredentialsMetaResponse` type
- Conditionally hides delete button on integrations page for managed
credentials

### Key design decisions
- **Auto-provision in API layer, not data layer** — keeps
`get_all_creds()` side-effect-free
- **Race-safe** — per-(user, provider) asyncio lock with double-check
pattern prevents duplicate pods
- **Idempotent** — AgentMail SDK `client_id` ensures pod creation is
idempotent; `add_managed_credential()` uses upsert under Redis lock
- **Error-resilient** — provisioning failures are logged but never block
credential listing

### Changes 🏗️

| File | Action | Description |
|------|--------|-------------|
| `backend/integrations/managed_credentials.py` | NEW | ABC, registry,
ensure/cleanup |
| `backend/integrations/managed_providers/__init__.py` | NEW | Registers
all providers at startup |
| `backend/integrations/managed_providers/agentmail.py` | NEW |
AgentMail provisioning/deprovisioning |
| `backend/integrations/credentials_store.py` | MODIFY | Guards +
managed credential helpers |
| `backend/data/model.py` | MODIFY | `autogpt_managed` + `metadata`
fields |
| `backend/api/features/integrations/router.py` | MODIFY |
Auto-provision on list, removed `/agentmail/connect` |
| `backend/data/user.py` | MODIFY | Account deletion cleanup |
| `backend/api/rest_api.py` | MODIFY | Provider registration at startup
|
| `backend/util/settings.py` | MODIFY | `agentmail_api_key` setting |
| `frontend/.../integrations/page.tsx` | MODIFY | Hide delete for
managed creds |
| `frontend/.../types.ts` | MODIFY | `autogpt_managed` field |

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [x] 23 tests pass in `router_test.py` (9 new tests for
ensure/cleanup/auto-provisioning)
  - [x] `poetry run format && poetry run lint` — clean
  - [x] OpenAPI schema regenerated
- [x] Manual: verify managed credential appears in AgentMail block
dropdown
  - [x] Manual: verify delete button hidden for managed credentials
- [x] Manual: verify managed credential cannot be deleted via API (403)

#### For configuration changes:
- [x] `.env.default` is updated with `AGENTMAIL_API_KEY=`

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
Abhimanyu Yadav
2026-03-31 18:26:18 +05:30
committed by GitHub
parent a20188ae59
commit 57b17dc8e1
16 changed files with 810 additions and 37 deletions

View File

@@ -178,6 +178,7 @@ SMTP_USERNAME=
SMTP_PASSWORD=
# Business & Marketing Tools
AGENTMAIL_API_KEY=
APOLLO_API_KEY=
ENRICHLAYER_API_KEY=
AYRSHARE_API_KEY=

View File

@@ -31,7 +31,10 @@ from backend.data.model import (
UserPasswordCredentials,
is_sdk_default,
)
from backend.integrations.credentials_store import provider_matches
from backend.integrations.credentials_store import (
is_system_credential,
provider_matches,
)
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.oauth import CREDENTIALS_BY_PROVIDER, HANDLERS_BY_NAME
from backend.integrations.providers import ProviderName
@@ -618,6 +621,11 @@ async def delete_credential(
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Credentials not found"
)
if is_system_credential(cred_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="System-managed credentials cannot be deleted",
)
creds = await creds_manager.store.get_creds_by_id(auth.user_id, cred_id)
if not creds:
raise HTTPException(

View File

@@ -40,11 +40,15 @@ from backend.data.onboarding import OnboardingStep, complete_onboarding_step
from backend.data.user import get_user_integrations
from backend.executor.utils import add_graph_execution
from backend.integrations.ayrshare import AyrshareClient, SocialPlatform
from backend.integrations.credentials_store import provider_matches
from backend.integrations.credentials_store import (
is_system_credential,
provider_matches,
)
from backend.integrations.creds_manager import (
IntegrationCredentialsManager,
create_mcp_oauth_handler,
)
from backend.integrations.managed_credentials import ensure_managed_credentials
from backend.integrations.oauth import CREDENTIALS_BY_PROVIDER, HANDLERS_BY_NAME
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks import get_webhook_manager
@@ -110,6 +114,7 @@ class CredentialsMetaResponse(BaseModel):
default=None,
description="Host pattern for host-scoped or MCP server URL for MCP credentials",
)
is_managed: bool = False
@model_validator(mode="before")
@classmethod
@@ -148,6 +153,7 @@ def to_meta_response(cred: Credentials) -> CredentialsMetaResponse:
scopes=cred.scopes if isinstance(cred, OAuth2Credentials) else None,
username=cred.username if isinstance(cred, OAuth2Credentials) else None,
host=CredentialsMetaResponse.get_host(cred),
is_managed=cred.is_managed,
)
@@ -224,6 +230,9 @@ async def callback(
async def list_credentials(
user_id: Annotated[str, Security(get_user_id)],
) -> list[CredentialsMetaResponse]:
# Fire-and-forget: provision missing managed credentials in the background.
# The credential appears on the next page load; listing is never blocked.
asyncio.create_task(ensure_managed_credentials(user_id, creds_manager.store))
credentials = await creds_manager.store.get_all_creds(user_id)
return [
@@ -238,6 +247,7 @@ async def list_credentials_by_provider(
],
user_id: Annotated[str, Security(get_user_id)],
) -> list[CredentialsMetaResponse]:
asyncio.create_task(ensure_managed_credentials(user_id, creds_manager.store))
credentials = await creds_manager.store.get_creds_by_provider(user_id, provider)
return [
@@ -332,6 +342,11 @@ async def delete_credentials(
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Credentials not found"
)
if is_system_credential(cred_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="System-managed credentials cannot be deleted",
)
creds = await creds_manager.store.get_creds_by_id(user_id, cred_id)
if not creds:
raise HTTPException(
@@ -342,6 +357,11 @@ async def delete_credentials(
status_code=status.HTTP_404_NOT_FOUND,
detail="Credentials not found",
)
if creds.is_managed:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="AutoGPT-managed credentials cannot be deleted",
)
try:
await remove_all_webhooks_for_credentials(user_id, creds, force)

View File

@@ -1,6 +1,7 @@
"""Tests for credentials API security: no secret leakage, SDK defaults filtered."""
from unittest.mock import AsyncMock, patch
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch
import fastapi
import fastapi.testclient
@@ -276,3 +277,294 @@ class TestCreateCredentialNoSecretInResponse:
assert resp.status_code == 403
mock_mgr.create.assert_not_called()
class TestManagedCredentials:
"""AutoGPT-managed credentials cannot be deleted by users."""
def test_delete_is_managed_returns_403(self):
cred = APIKeyCredentials(
id="managed-cred-1",
provider="agent_mail",
title="AgentMail (managed by AutoGPT)",
api_key=SecretStr("sk-managed-key"),
is_managed=True,
)
with patch(
"backend.api.features.integrations.router.creds_manager"
) as mock_mgr:
mock_mgr.store.get_creds_by_id = AsyncMock(return_value=cred)
resp = client.request("DELETE", "/agent_mail/credentials/managed-cred-1")
assert resp.status_code == 403
assert "AutoGPT-managed" in resp.json()["detail"]
def test_list_credentials_includes_is_managed_field(self):
managed = APIKeyCredentials(
id="managed-1",
provider="agent_mail",
title="AgentMail (managed)",
api_key=SecretStr("sk-key"),
is_managed=True,
)
regular = APIKeyCredentials(
id="regular-1",
provider="openai",
title="My Key",
api_key=SecretStr("sk-key"),
)
with patch(
"backend.api.features.integrations.router.creds_manager"
) as mock_mgr:
mock_mgr.store.get_all_creds = AsyncMock(return_value=[managed, regular])
resp = client.get("/credentials")
assert resp.status_code == 200
data = resp.json()
managed_cred = next(c for c in data if c["id"] == "managed-1")
regular_cred = next(c for c in data if c["id"] == "regular-1")
assert managed_cred["is_managed"] is True
assert regular_cred["is_managed"] is False
# ---------------------------------------------------------------------------
# Managed credential provisioning infrastructure
# ---------------------------------------------------------------------------
def _make_managed_cred(
provider: str = "agent_mail", pod_id: str = "pod-abc"
) -> APIKeyCredentials:
return APIKeyCredentials(
id="managed-auto",
provider=provider,
title="AgentMail (managed by AutoGPT)",
api_key=SecretStr("sk-pod-key"),
is_managed=True,
metadata={"pod_id": pod_id},
)
def _make_store_mock(**kwargs) -> MagicMock:
"""Create a store mock with a working async ``locks()`` context manager."""
@asynccontextmanager
async def _noop_locked(key):
yield
locks_obj = MagicMock()
locks_obj.locked = _noop_locked
store = MagicMock(**kwargs)
store.locks = AsyncMock(return_value=locks_obj)
return store
class TestEnsureManagedCredentials:
"""Unit tests for the ensure/cleanup helpers in managed_credentials.py."""
@pytest.mark.asyncio
async def test_provisions_when_missing(self):
"""Provider.provision() is called when no managed credential exists."""
from backend.integrations.managed_credentials import (
_PROVIDERS,
_provisioned_users,
ensure_managed_credentials,
)
cred = _make_managed_cred()
provider = MagicMock()
provider.provider_name = "test_provider"
provider.is_available = AsyncMock(return_value=True)
provider.provision = AsyncMock(return_value=cred)
store = _make_store_mock()
store.has_managed_credential = AsyncMock(return_value=False)
store.add_managed_credential = AsyncMock()
saved = dict(_PROVIDERS)
_PROVIDERS.clear()
_PROVIDERS["test_provider"] = provider
_provisioned_users.pop("user-1", None)
try:
await ensure_managed_credentials("user-1", store)
finally:
_PROVIDERS.clear()
_PROVIDERS.update(saved)
_provisioned_users.pop("user-1", None)
provider.provision.assert_awaited_once_with("user-1")
store.add_managed_credential.assert_awaited_once_with("user-1", cred)
@pytest.mark.asyncio
async def test_skips_when_already_exists(self):
"""Provider.provision() is NOT called when managed credential exists."""
from backend.integrations.managed_credentials import (
_PROVIDERS,
_provisioned_users,
ensure_managed_credentials,
)
provider = MagicMock()
provider.provider_name = "test_provider"
provider.is_available = AsyncMock(return_value=True)
provider.provision = AsyncMock()
store = _make_store_mock()
store.has_managed_credential = AsyncMock(return_value=True)
saved = dict(_PROVIDERS)
_PROVIDERS.clear()
_PROVIDERS["test_provider"] = provider
_provisioned_users.pop("user-1", None)
try:
await ensure_managed_credentials("user-1", store)
finally:
_PROVIDERS.clear()
_PROVIDERS.update(saved)
_provisioned_users.pop("user-1", None)
provider.provision.assert_not_awaited()
@pytest.mark.asyncio
async def test_skips_when_unavailable(self):
"""Provider.provision() is NOT called when provider is not available."""
from backend.integrations.managed_credentials import (
_PROVIDERS,
_provisioned_users,
ensure_managed_credentials,
)
provider = MagicMock()
provider.provider_name = "test_provider"
provider.is_available = AsyncMock(return_value=False)
provider.provision = AsyncMock()
store = _make_store_mock()
store.has_managed_credential = AsyncMock()
saved = dict(_PROVIDERS)
_PROVIDERS.clear()
_PROVIDERS["test_provider"] = provider
_provisioned_users.pop("user-1", None)
try:
await ensure_managed_credentials("user-1", store)
finally:
_PROVIDERS.clear()
_PROVIDERS.update(saved)
_provisioned_users.pop("user-1", None)
provider.provision.assert_not_awaited()
store.has_managed_credential.assert_not_awaited()
@pytest.mark.asyncio
async def test_provision_failure_does_not_propagate(self):
"""A failed provision is logged but does not raise."""
from backend.integrations.managed_credentials import (
_PROVIDERS,
_provisioned_users,
ensure_managed_credentials,
)
provider = MagicMock()
provider.provider_name = "test_provider"
provider.is_available = AsyncMock(return_value=True)
provider.provision = AsyncMock(side_effect=RuntimeError("boom"))
store = _make_store_mock()
store.has_managed_credential = AsyncMock(return_value=False)
saved = dict(_PROVIDERS)
_PROVIDERS.clear()
_PROVIDERS["test_provider"] = provider
_provisioned_users.pop("user-1", None)
try:
await ensure_managed_credentials("user-1", store)
finally:
_PROVIDERS.clear()
_PROVIDERS.update(saved)
_provisioned_users.pop("user-1", None)
# No exception raised — provisioning failure is swallowed.
class TestCleanupManagedCredentials:
"""Unit tests for cleanup_managed_credentials."""
@pytest.mark.asyncio
async def test_calls_deprovision_for_managed_creds(self):
from backend.integrations.managed_credentials import (
_PROVIDERS,
cleanup_managed_credentials,
)
cred = _make_managed_cred()
provider = MagicMock()
provider.provider_name = "agent_mail"
provider.deprovision = AsyncMock()
store = MagicMock()
store.get_all_creds = AsyncMock(return_value=[cred])
saved = dict(_PROVIDERS)
_PROVIDERS.clear()
_PROVIDERS["agent_mail"] = provider
try:
await cleanup_managed_credentials("user-1", store)
finally:
_PROVIDERS.clear()
_PROVIDERS.update(saved)
provider.deprovision.assert_awaited_once_with("user-1", cred)
@pytest.mark.asyncio
async def test_skips_non_managed_creds(self):
from backend.integrations.managed_credentials import (
_PROVIDERS,
cleanup_managed_credentials,
)
regular = _make_api_key_cred()
provider = MagicMock()
provider.provider_name = "openai"
provider.deprovision = AsyncMock()
store = MagicMock()
store.get_all_creds = AsyncMock(return_value=[regular])
saved = dict(_PROVIDERS)
_PROVIDERS.clear()
_PROVIDERS["openai"] = provider
try:
await cleanup_managed_credentials("user-1", store)
finally:
_PROVIDERS.clear()
_PROVIDERS.update(saved)
provider.deprovision.assert_not_awaited()
@pytest.mark.asyncio
async def test_deprovision_failure_does_not_propagate(self):
from backend.integrations.managed_credentials import (
_PROVIDERS,
cleanup_managed_credentials,
)
cred = _make_managed_cred()
provider = MagicMock()
provider.provider_name = "agent_mail"
provider.deprovision = AsyncMock(side_effect=RuntimeError("boom"))
store = MagicMock()
store.get_all_creds = AsyncMock(return_value=[cred])
saved = dict(_PROVIDERS)
_PROVIDERS.clear()
_PROVIDERS["agent_mail"] = provider
try:
await cleanup_managed_credentials("user-1", store)
finally:
_PROVIDERS.clear()
_PROVIDERS.update(saved)
# No exception raised — cleanup failure is swallowed.

View File

@@ -12,6 +12,7 @@ Tests cover:
5. Complete OAuth flow end-to-end
"""
import asyncio
import base64
import hashlib
import secrets
@@ -58,14 +59,27 @@ async def test_user(server, test_user_id: str):
yield test_user_id
# Cleanup - delete in correct order due to foreign key constraints
await PrismaOAuthAccessToken.prisma().delete_many(where={"userId": test_user_id})
await PrismaOAuthRefreshToken.prisma().delete_many(where={"userId": test_user_id})
await PrismaOAuthAuthorizationCode.prisma().delete_many(
where={"userId": test_user_id}
)
await PrismaOAuthApplication.prisma().delete_many(where={"ownerId": test_user_id})
await PrismaUser.prisma().delete(where={"id": test_user_id})
# Cleanup - delete in correct order due to foreign key constraints.
# Wrap in try/except because the event loop or Prisma engine may already
# be closed during session teardown on Python 3.12+.
try:
await asyncio.gather(
PrismaOAuthAccessToken.prisma().delete_many(where={"userId": test_user_id}),
PrismaOAuthRefreshToken.prisma().delete_many(
where={"userId": test_user_id}
),
PrismaOAuthAuthorizationCode.prisma().delete_many(
where={"userId": test_user_id}
),
)
await asyncio.gather(
PrismaOAuthApplication.prisma().delete_many(
where={"ownerId": test_user_id}
),
PrismaUser.prisma().delete(where={"id": test_user_id}),
)
except RuntimeError:
pass
@pytest_asyncio.fixture

View File

@@ -118,6 +118,11 @@ async def lifespan_context(app: fastapi.FastAPI):
AutoRegistry.patch_integrations()
# Register managed credential providers (e.g. AgentMail)
from backend.integrations.managed_providers import register_all
register_all()
await backend.data.block.initialize_blocks()
await backend.data.user.migrate_and_encrypt_user_integrations()

View File

@@ -325,6 +325,8 @@ class _BaseCredentials(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
provider: str
title: Optional[str] = None
is_managed: bool = False
metadata: dict[str, Any] = Field(default_factory=dict)
@field_serializer("*")
def dump_secret_strings(value: Any, _info):
@@ -344,7 +346,6 @@ class OAuth2Credentials(_BaseCredentials):
refresh_token_expires_at: Optional[int] = None
"""Unix timestamp (seconds) indicating when the refresh token expires (if at all)"""
scopes: list[str]
metadata: dict[str, Any] = Field(default_factory=dict)
def auth_header(self) -> str:
return f"Bearer {self.access_token.get_secret_value()}"

View File

@@ -3,7 +3,7 @@ import hashlib
import hmac
import logging
from datetime import datetime, timedelta
from typing import Optional, cast
from typing import TYPE_CHECKING, Optional, cast
from urllib.parse import quote_plus
from autogpt_libs.auth.models import DEFAULT_USER_ID
@@ -21,6 +21,9 @@ from backend.util.exceptions import DatabaseError
from backend.util.json import SafeJson
from backend.util.settings import Settings
if TYPE_CHECKING:
from backend.integrations.credentials_store import IntegrationCredentialsStore
logger = logging.getLogger(__name__)
settings = Settings()
@@ -453,6 +456,27 @@ async def unsubscribe_user_by_token(token: str) -> None:
raise DatabaseError(f"Failed to unsubscribe user by token {token}: {e}") from e
async def cleanup_user_managed_credentials(
user_id: str,
store: Optional["IntegrationCredentialsStore"] = None,
) -> None:
"""Revoke all externally-provisioned managed credentials for *user_id*.
Call this before deleting a user account so that external resources
(e.g. AgentMail pods, pod-scoped API keys) are properly cleaned up.
The credential rows themselves are cascade-deleted with the User row.
Pass an existing *store* for testability; when omitted a fresh instance
is created.
"""
from backend.integrations.credentials_store import IntegrationCredentialsStore
from backend.integrations.managed_credentials import cleanup_managed_credentials
if store is None:
store = IntegrationCredentialsStore()
await cleanup_managed_credentials(user_id, store)
async def update_user_timezone(user_id: str, timezone: str) -> User:
"""Update a user's timezone setting."""
try:

View File

@@ -1,5 +1,6 @@
import base64
import hashlib
import logging
import secrets
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
@@ -21,6 +22,7 @@ from backend.data.redis_client import get_redis_async
from backend.util.settings import Settings
settings = Settings()
logger = logging.getLogger(__name__)
def provider_matches(stored: str, expected: str) -> bool:
@@ -284,6 +286,7 @@ DEFAULT_CREDENTIALS = [
elevenlabs_credentials,
]
SYSTEM_CREDENTIAL_IDS = {cred.id for cred in DEFAULT_CREDENTIALS}
# Set of providers that have system credentials available
@@ -323,20 +326,45 @@ class IntegrationCredentialsStore:
return get_database_manager_async_client()
# =============== USER-MANAGED CREDENTIALS =============== #
async def _get_persisted_user_creds_unlocked(
self, user_id: str
) -> list[Credentials]:
"""Return only the persisted (user-stored) credentials — no side effects.
**Caller must already hold ``locked_user_integrations(user_id)``.**
"""
return list((await self._get_user_integrations(user_id)).credentials)
async def add_creds(self, user_id: str, credentials: Credentials) -> None:
async with await self.locked_user_integrations(user_id):
if await self.get_creds_by_id(user_id, credentials.id):
# Check system/managed IDs without triggering provisioning
if credentials.id in SYSTEM_CREDENTIAL_IDS:
raise ValueError(
f"Can not re-create existing credentials #{credentials.id} "
f"for user #{user_id}"
)
await self._set_user_integration_creds(
user_id, [*(await self.get_all_creds(user_id)), credentials]
)
persisted = await self._get_persisted_user_creds_unlocked(user_id)
if any(c.id == credentials.id for c in persisted):
raise ValueError(
f"Can not re-create existing credentials #{credentials.id} "
f"for user #{user_id}"
)
await self._set_user_integration_creds(user_id, [*persisted, credentials])
async def get_all_creds(self, user_id: str) -> list[Credentials]:
users_credentials = (await self._get_user_integrations(user_id)).credentials
all_credentials = users_credentials
"""Public entry point — acquires lock, then delegates."""
async with await self.locked_user_integrations(user_id):
return await self._get_all_creds_unlocked(user_id)
async def _get_all_creds_unlocked(self, user_id: str) -> list[Credentials]:
"""Return all credentials for *user_id*.
**Caller must already hold ``locked_user_integrations(user_id)``.**
"""
user_integrations = await self._get_user_integrations(user_id)
all_credentials = list(user_integrations.credentials)
# These will always be added
all_credentials.append(ollama_credentials)
@@ -417,13 +445,22 @@ class IntegrationCredentialsStore:
return list(set(c.provider for c in credentials))
async def update_creds(self, user_id: str, updated: Credentials) -> None:
if updated.id in SYSTEM_CREDENTIAL_IDS:
raise ValueError(
f"System credential #{updated.id} cannot be updated directly"
)
async with await self.locked_user_integrations(user_id):
current = await self.get_creds_by_id(user_id, updated.id)
persisted = await self._get_persisted_user_creds_unlocked(user_id)
current = next((c for c in persisted if c.id == updated.id), None)
if not current:
raise ValueError(
f"Credentials with ID {updated.id} "
f"for user with ID {user_id} not found"
)
if current.is_managed:
raise ValueError(
f"AutoGPT-managed credential #{updated.id} cannot be updated"
)
if type(current) is not type(updated):
raise TypeError(
f"Can not update credentials with ID {updated.id} "
@@ -443,22 +480,53 @@ class IntegrationCredentialsStore:
f"to more restrictive set of scopes {updated.scopes}"
)
# Update the credentials
# Update only persisted credentials — no side-effectful provisioning
updated_credentials_list = [
updated if c.id == updated.id else c
for c in await self.get_all_creds(user_id)
updated if c.id == updated.id else c for c in persisted
]
await self._set_user_integration_creds(user_id, updated_credentials_list)
async def delete_creds_by_id(self, user_id: str, credentials_id: str) -> None:
if credentials_id in SYSTEM_CREDENTIAL_IDS:
raise ValueError(f"System credential #{credentials_id} cannot be deleted")
async with await self.locked_user_integrations(user_id):
filtered_credentials = [
c for c in await self.get_all_creds(user_id) if c.id != credentials_id
]
persisted = await self._get_persisted_user_creds_unlocked(user_id)
target = next((c for c in persisted if c.id == credentials_id), None)
if target and target.is_managed:
raise ValueError(
f"AutoGPT-managed credential #{credentials_id} cannot be deleted"
)
filtered_credentials = [c for c in persisted if c.id != credentials_id]
await self._set_user_integration_creds(user_id, filtered_credentials)
# ============== SYSTEM-MANAGED CREDENTIALS ============== #
async def has_managed_credential(self, user_id: str, provider: str) -> bool:
"""Check if a managed credential exists for *provider*."""
user_integrations = await self._get_user_integrations(user_id)
return any(
c.provider == provider and c.is_managed
for c in user_integrations.credentials
)
async def add_managed_credential(
self, user_id: str, credential: Credentials
) -> None:
"""Upsert a managed credential.
Removes any existing managed credential for the same provider,
then appends the new one. The credential MUST have is_managed=True.
"""
if not credential.is_managed:
raise ValueError("credential.is_managed must be True")
async with self.edit_user_integrations(user_id) as user_integrations:
user_integrations.credentials = [
c
for c in user_integrations.credentials
if not (c.provider == credential.provider and c.is_managed)
]
user_integrations.credentials.append(credential)
async def set_ayrshare_profile_key(self, user_id: str, profile_key: str) -> None:
"""Set the Ayrshare profile key for a user.

View File

@@ -0,0 +1,188 @@
"""Generic infrastructure for system-provided, per-user managed credentials.
Managed credentials are provisioned automatically by the platform (e.g. an
AgentMail pod-scoped API key) and stored alongside regular user credentials
with ``is_managed=True``. Users cannot update or delete them.
New integrations register a :class:`ManagedCredentialProvider` at import time;
the two entry-points consumed by the rest of the application are:
* :func:`ensure_managed_credentials` fired as a background task from the
credential-listing endpoints (non-blocking).
* :func:`cleanup_managed_credentials` called during account deletion to
revoke external resources (API keys, pods, etc.).
"""
from __future__ import annotations
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from cachetools import TTLCache
if TYPE_CHECKING:
from backend.data.model import Credentials
from backend.integrations.credentials_store import IntegrationCredentialsStore
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Abstract provider
# ---------------------------------------------------------------------------
class ManagedCredentialProvider(ABC):
"""Base class for integrations that auto-provision per-user credentials."""
provider_name: str
"""Must match the ``provider`` field on the resulting credential."""
@abstractmethod
async def is_available(self) -> bool:
"""Return ``True`` when the org-level configuration is present."""
@abstractmethod
async def provision(self, user_id: str) -> Credentials:
"""Create external resources and return a credential.
The returned credential **must** have ``is_managed=True``.
"""
@abstractmethod
async def deprovision(self, user_id: str, credential: Credentials) -> None:
"""Revoke external resources during account deletion."""
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
_PROVIDERS: dict[str, ManagedCredentialProvider] = {}
# Users whose managed credentials have already been verified recently.
# Avoids redundant DB checks on every GET /credentials call.
# maxsize caps memory; TTL re-checks periodically (e.g. when new providers
# are added). ~100K entries ≈ 4-8 MB.
_provisioned_users: TTLCache[str, bool] = TTLCache(maxsize=100_000, ttl=3600)
def register_managed_provider(provider: ManagedCredentialProvider) -> None:
_PROVIDERS[provider.provider_name] = provider
def get_managed_provider(name: str) -> ManagedCredentialProvider | None:
return _PROVIDERS.get(name)
def get_managed_providers() -> dict[str, ManagedCredentialProvider]:
return dict(_PROVIDERS)
# ---------------------------------------------------------------------------
# Public helpers
# ---------------------------------------------------------------------------
async def _ensure_one(
user_id: str,
store: IntegrationCredentialsStore,
name: str,
provider: ManagedCredentialProvider,
) -> bool:
"""Provision a single managed credential under a distributed Redis lock.
Returns ``True`` if the credential already exists or was successfully
provisioned, ``False`` on transient failure so the caller knows not to
cache the user as fully provisioned.
"""
try:
if not await provider.is_available():
return True
# Use a distributed Redis lock so the check-then-provision operation
# is atomic across all workers, preventing duplicate external
# resource provisioning (e.g. AgentMail API keys).
locks = await store.locks()
key = (f"user:{user_id}", f"managed-provision:{name}")
async with locks.locked(key):
# Re-check under lock to avoid duplicate provisioning.
if await store.has_managed_credential(user_id, name):
return True
credential = await provider.provision(user_id)
await store.add_managed_credential(user_id, credential)
logger.info(
"Provisioned managed credential for provider=%s user=%s",
name,
user_id,
)
return True
except Exception:
logger.warning(
"Failed to provision managed credential for provider=%s user=%s",
name,
user_id,
exc_info=True,
)
return False
async def ensure_managed_credentials(
user_id: str,
store: IntegrationCredentialsStore,
) -> None:
"""Provision missing managed credentials for *user_id*.
Fired as a non-blocking background task from the credential-listing
endpoints. Failures are logged but never propagated — the user simply
will not see the managed credential until the next page load.
Skips entirely if this user has already been checked during the current
process lifetime (in-memory cache). Resets on restart — just a
performance optimisation, not a correctness guarantee.
Providers are checked concurrently via ``asyncio.gather``.
"""
if user_id in _provisioned_users:
return
results = await asyncio.gather(
*(_ensure_one(user_id, store, n, p) for n, p in _PROVIDERS.items())
)
# Only cache the user as provisioned when every provider succeeded or
# was already present. A transient failure (network timeout, Redis
# blip) returns False, so the next page load will retry.
if all(results):
_provisioned_users[user_id] = True
async def cleanup_managed_credentials(
user_id: str,
store: IntegrationCredentialsStore,
) -> None:
"""Revoke all external managed resources for a user being deleted."""
all_creds = await store.get_all_creds(user_id)
managed = [c for c in all_creds if c.is_managed]
for cred in managed:
provider = _PROVIDERS.get(cred.provider)
if not provider:
logger.warning(
"No managed provider registered for %s — skipping cleanup",
cred.provider,
)
continue
try:
await provider.deprovision(user_id, cred)
logger.info(
"Deprovisioned managed credential for provider=%s user=%s",
cred.provider,
user_id,
)
except Exception:
logger.error(
"Failed to deprovision %s for user %s",
cred.provider,
user_id,
exc_info=True,
)

View File

@@ -0,0 +1,17 @@
"""Managed credential providers.
Call :func:`register_all` at application startup (e.g. in ``rest_api.py``)
to populate the provider registry before any requests are processed.
"""
from backend.integrations.managed_credentials import (
get_managed_provider,
register_managed_provider,
)
from backend.integrations.managed_providers.agentmail import AgentMailManagedProvider
def register_all() -> None:
"""Register every built-in managed credential provider (idempotent)."""
if get_managed_provider(AgentMailManagedProvider.provider_name) is None:
register_managed_provider(AgentMailManagedProvider())

View File

@@ -0,0 +1,90 @@
"""AgentMail managed credential provider.
Uses the org-level AgentMail API key to create a per-user pod and a
pod-scoped API key. The pod key is stored as an ``is_managed``
credential so it appears automatically in block credential dropdowns.
"""
from __future__ import annotations
import logging
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.managed_credentials import ManagedCredentialProvider
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
settings = Settings()
class AgentMailManagedProvider(ManagedCredentialProvider):
provider_name = "agent_mail"
async def is_available(self) -> bool:
return bool(settings.secrets.agentmail_api_key)
async def provision(self, user_id: str) -> Credentials:
from agentmail import AsyncAgentMail
client = AsyncAgentMail(api_key=settings.secrets.agentmail_api_key)
# client_id makes pod creation idempotent — if a pod already exists
# for this user_id the SDK returns the existing pod.
pod = await client.pods.create(client_id=user_id, name=f"{user_id}-pod")
# NOTE: api_keys.create() is NOT idempotent. If the caller retries
# after a partial failure (pod created, key created, but store write
# failed), a second key will be created and the first becomes orphaned
# on AgentMail's side. The double-check pattern in _ensure_one
# (has_managed_credential under lock) prevents this in normal flow;
# only a crash between key creation and store write can cause it.
api_key_obj = await client.pods.api_keys.create(
pod_id=pod.pod_id, name=f"{user_id}-agpt-managed"
)
return APIKeyCredentials(
provider=self.provider_name,
title="AgentMail (managed by AutoGPT)",
api_key=SecretStr(api_key_obj.api_key),
expires_at=None,
is_managed=True,
metadata={"pod_id": pod.pod_id},
)
async def deprovision(self, user_id: str, credential: Credentials) -> None:
from agentmail import AsyncAgentMail
pod_id = credential.metadata.get("pod_id")
if not pod_id:
logger.warning(
"Managed credential for user %s has no pod_id in metadata — "
"skipping AgentMail cleanup",
user_id,
)
return
client = AsyncAgentMail(api_key=settings.secrets.agentmail_api_key)
try:
# Verify the pod actually belongs to this user before deleting,
# as a safety measure against cross-user deletion via the
# org-level API key.
pod = await client.pods.get(pod_id=pod_id)
if getattr(pod, "client_id", None) and pod.client_id != user_id:
logger.error(
"Pod %s client_id=%s does not match user %s"
"refusing to delete",
pod_id,
pod.client_id,
user_id,
)
return
await client.pods.delete(pod_id=pod_id)
except Exception:
logger.warning(
"Failed to delete AgentMail pod %s for user %s",
pod_id,
user_id,
exc_info=True,
)

View File

@@ -708,6 +708,8 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
description="The LaunchDarkly SDK key for feature flag management",
)
agentmail_api_key: str = Field(default="", description="AgentMail API Key")
ayrshare_api_key: str = Field(default="", description="Ayrshare API Key")
ayrshare_jwt_key: str = Field(default="", description="Ayrshare private Key")

View File

@@ -198,12 +198,14 @@ export default function UserIntegrationsPage() {
</small>
</TableCell>
<TableCell className="w-0 whitespace-nowrap">
<Button
variant="destructive"
onClick={() => removeCredentials(cred.provider, cred.id)}
>
<Trash2Icon className="mr-1.5 size-4" /> Delete
</Button>
{!cred.is_managed && (
<Button
variant="destructive"
onClick={() => removeCredentials(cred.provider, cred.id)}
>
<Trash2Icon className="mr-1.5 size-4" /> Delete
</Button>
)}
</TableCell>
</TableRow>
))}

View File

@@ -7046,6 +7046,16 @@
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Title"
},
"is_managed": {
"type": "boolean",
"title": "Is Managed",
"default": false
},
"metadata": {
"additionalProperties": true,
"type": "object",
"title": "Metadata"
},
"type": {
"type": "string",
"const": "api_key",
@@ -8661,6 +8671,11 @@
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Host",
"description": "Host pattern for host-scoped or MCP server URL for MCP credentials"
},
"is_managed": {
"type": "boolean",
"title": "Is Managed",
"default": false
}
},
"type": "object",
@@ -9892,6 +9907,16 @@
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Title"
},
"is_managed": {
"type": "boolean",
"title": "Is Managed",
"default": false
},
"metadata": {
"additionalProperties": true,
"type": "object",
"title": "Metadata"
},
"type": {
"type": "string",
"const": "host_scoped",
@@ -10985,6 +11010,16 @@
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Title"
},
"is_managed": {
"type": "boolean",
"title": "Is Managed",
"default": false
},
"metadata": {
"additionalProperties": true,
"type": "object",
"title": "Metadata"
},
"type": {
"type": "string",
"const": "oauth2",
@@ -11020,11 +11055,6 @@
"items": { "type": "string" },
"type": "array",
"title": "Scopes"
},
"metadata": {
"additionalProperties": true,
"type": "object",
"title": "Metadata"
}
},
"type": "object",
@@ -14685,6 +14715,16 @@
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Title"
},
"is_managed": {
"type": "boolean",
"title": "Is Managed",
"default": false
},
"metadata": {
"additionalProperties": true,
"type": "object",
"title": "Metadata"
},
"type": {
"type": "string",
"const": "user_password",

View File

@@ -614,6 +614,7 @@ export type CredentialsMetaResponse = {
username?: string;
host?: string;
is_system?: boolean;
is_managed?: boolean;
};
/* Mirror of backend/api/features/integrations/router.py:CredentialsDeletionResponse */