fix(backend/external-api): Improve security & reliability of API key storage (#10796)

Our API key generation, storage, and verification system has a couple of
issues that need to be ironed out before full-scale deployment.

### Changes 🏗️

- Move from unsalted SHA256 to salted Scrypt hashing for API keys
- Avoid false-negative API key validation due to prefix collision
- Refactor API key management code for clarity
- [refactor(backend): Clean up API key DB & API code
(#10797)](https://github.com/Significant-Gravitas/AutoGPT/pull/10797)
  - Rename models and properties in `backend.data.api_key` for clarity
- Eliminate redundant/custom/boilerplate error handling/wrapping in API
key endpoint call stack
- Remove redundant/inaccurate `response_model` declarations from API key
endpoints

Dependencies for `autogpt_libs`:
- Add `cryptography` as a dependency
- Add `pyright` as a dev dependency

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Performing these actions through the UI (still) works:
    - [x] Creating an API key
    - [x] Listing owned API keys
    - [x] Deleting an owned API key
  - [x] Newly created API key can be used in Swagger UI
  - [x] Existing API key can be used in Swagger UI
  - [x] Existing API key is re-encrypted with salt on use
This commit is contained in:
Reinier van der Leer
2025-09-10 11:34:49 +02:00
committed by GitHub
parent 986245ec43
commit 2ffd249aac
19 changed files with 484 additions and 474 deletions

View File

@@ -1,35 +0,0 @@
import hashlib
import secrets
from typing import NamedTuple
class APIKeyContainer(NamedTuple):
"""Container for API key parts."""
raw: str
prefix: str
postfix: str
hash: str
class APIKeyManager:
PREFIX: str = "agpt_"
PREFIX_LENGTH: int = 8
POSTFIX_LENGTH: int = 8
def generate_api_key(self) -> APIKeyContainer:
"""Generate a new API key with all its parts."""
raw_key = f"{self.PREFIX}{secrets.token_urlsafe(32)}"
return APIKeyContainer(
raw=raw_key,
prefix=raw_key[: self.PREFIX_LENGTH],
postfix=raw_key[-self.POSTFIX_LENGTH :],
hash=hashlib.sha256(raw_key.encode()).hexdigest(),
)
def verify_api_key(self, provided_key: str, stored_hash: str) -> bool:
"""Verify if a provided API key matches the stored hash."""
if not provided_key.startswith(self.PREFIX):
return False
provided_hash = hashlib.sha256(provided_key.encode()).hexdigest()
return secrets.compare_digest(provided_hash, stored_hash)

View File

@@ -0,0 +1,78 @@
import hashlib
import secrets
from typing import NamedTuple
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
class APIKeyContainer(NamedTuple):
"""Container for API key parts."""
key: str
head: str
tail: str
hash: str
salt: str
class APIKeySmith:
PREFIX: str = "agpt_"
HEAD_LENGTH: int = 8
TAIL_LENGTH: int = 8
def generate_key(self) -> APIKeyContainer:
"""Generate a new API key with secure hashing."""
raw_key = f"{self.PREFIX}{secrets.token_urlsafe(32)}"
hash, salt = self.hash_key(raw_key)
return APIKeyContainer(
key=raw_key,
head=raw_key[: self.HEAD_LENGTH],
tail=raw_key[-self.TAIL_LENGTH :],
hash=hash,
salt=salt,
)
def verify_key(
self, provided_key: str, known_hash: str, known_salt: str | None = None
) -> bool:
"""
Verify an API key against a known hash (+ salt).
Supports verifying both legacy SHA256 and secure Scrypt hashes.
"""
if not provided_key.startswith(self.PREFIX):
return False
# Handle legacy SHA256 hashes (migration support)
if known_salt is None:
legacy_hash = hashlib.sha256(provided_key.encode()).hexdigest()
return secrets.compare_digest(legacy_hash, known_hash)
try:
salt_bytes = bytes.fromhex(known_salt)
provided_hash = self._hash_key_with_salt(provided_key, salt_bytes)
return secrets.compare_digest(provided_hash, known_hash)
except (ValueError, TypeError):
return False
def hash_key(self, raw_key: str) -> tuple[str, str]:
"""Migrate a legacy hash to secure hash format."""
salt = self._generate_salt()
hash = self._hash_key_with_salt(raw_key, salt)
return hash, salt.hex()
def _generate_salt(self) -> bytes:
"""Generate a random salt for hashing."""
return secrets.token_bytes(32)
def _hash_key_with_salt(self, raw_key: str, salt: bytes) -> str:
"""Hash API key using Scrypt with salt."""
kdf = Scrypt(
length=32,
salt=salt,
n=2**14, # CPU/memory cost parameter
r=8, # Block size parameter
p=1, # Parallelization parameter
)
key_hash = kdf.derive(raw_key.encode())
return key_hash.hex()

View File

@@ -0,0 +1,79 @@
import hashlib
from autogpt_libs.api_key.keysmith import APIKeySmith
def test_generate_api_key():
keysmith = APIKeySmith()
key = keysmith.generate_key()
assert key.key.startswith(keysmith.PREFIX)
assert key.head == key.key[: keysmith.HEAD_LENGTH]
assert key.tail == key.key[-keysmith.TAIL_LENGTH :]
assert len(key.hash) == 64 # 32 bytes hex encoded
assert len(key.salt) == 64 # 32 bytes hex encoded
def test_verify_new_secure_key():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Test correct key validates
assert keysmith.verify_key(key.key, key.hash, key.salt) is True
# Test wrong key fails
wrong_key = f"{keysmith.PREFIX}wrongkey123"
assert keysmith.verify_key(wrong_key, key.hash, key.salt) is False
def test_verify_legacy_key():
keysmith = APIKeySmith()
legacy_key = f"{keysmith.PREFIX}legacykey123"
legacy_hash = hashlib.sha256(legacy_key.encode()).hexdigest()
# Test legacy key validates without salt
assert keysmith.verify_key(legacy_key, legacy_hash) is True
# Test wrong legacy key fails
wrong_key = f"{keysmith.PREFIX}wronglegacy"
assert keysmith.verify_key(wrong_key, legacy_hash) is False
def test_rehash_existing_key():
keysmith = APIKeySmith()
legacy_key = f"{keysmith.PREFIX}migratekey123"
# Migrate the legacy key
new_hash, new_salt = keysmith.hash_key(legacy_key)
# Verify migrated key works
assert keysmith.verify_key(legacy_key, new_hash, new_salt) is True
# Verify different key fails with migrated hash
wrong_key = f"{keysmith.PREFIX}wrongkey"
assert keysmith.verify_key(wrong_key, new_hash, new_salt) is False
def test_invalid_key_prefix():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Test key without proper prefix fails
invalid_key = "invalid_prefix_key"
assert keysmith.verify_key(invalid_key, key.hash, key.salt) is False
def test_secure_hash_requires_salt():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Secure hash without salt should fail
assert keysmith.verify_key(key.key, key.hash) is False
def test_invalid_salt_format():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Invalid salt format should fail gracefully
assert keysmith.verify_key(key.key, key.hash, "invalid_hex") is False

View File

@@ -1002,6 +1002,18 @@ dynamodb = ["boto3 (>=1.9.71)"]
redis = ["redis (>=2.10.5)"]
test-filesource = ["pyyaml (>=5.3.1)", "watchdog (>=3.0.0)"]
[[package]]
name = "nodeenv"
version = "1.9.1"
description = "Node.js virtual environment builder"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
groups = ["dev"]
files = [
{file = "nodeenv-1.9.1-py2.py3-none-any.whl", hash = "sha256:ba11c9782d29c27c70ffbdda2d7415098754709be8a7056d79a737cd901155c9"},
{file = "nodeenv-1.9.1.tar.gz", hash = "sha256:6ec12890a2dab7946721edbfbcd91f3319c6ccc9aec47be7c7e6b7011ee6645f"},
]
[[package]]
name = "opentelemetry-api"
version = "1.35.0"
@@ -1347,6 +1359,27 @@ files = [
{file = "pyrfc3339-2.0.1.tar.gz", hash = "sha256:e47843379ea35c1296c3b6c67a948a1a490ae0584edfcbdea0eaffb5dd29960b"},
]
[[package]]
name = "pyright"
version = "1.1.404"
description = "Command line wrapper for pyright"
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "pyright-1.1.404-py3-none-any.whl", hash = "sha256:c7b7ff1fdb7219c643079e4c3e7d4125f0dafcc19d253b47e898d130ea426419"},
{file = "pyright-1.1.404.tar.gz", hash = "sha256:455e881a558ca6be9ecca0b30ce08aa78343ecc031d37a198ffa9a7a1abeb63e"},
]
[package.dependencies]
nodeenv = ">=1.6.0"
typing-extensions = ">=4.1"
[package.extras]
all = ["nodejs-wheel-binaries", "twine (>=3.4.1)"]
dev = ["twine (>=3.4.1)"]
nodejs = ["nodejs-wheel-binaries"]
[[package]]
name = "pytest"
version = "8.4.1"
@@ -1740,7 +1773,6 @@ files = [
{file = "typing_extensions-4.14.1-py3-none-any.whl", hash = "sha256:d1e1e3b58374dc93031d6eda2420a48ea44a36c2b4766a4fdeb3710755731d76"},
{file = "typing_extensions-4.14.1.tar.gz", hash = "sha256:38b39f4aeeab64884ce9f74c94263ef78f3c22467c8724005483154c26648d36"},
]
markers = {dev = "python_version < \"3.11\""}
[[package]]
name = "typing-inspection"
@@ -1897,4 +1929,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<4.0"
content-hash = "d841f62f95180f6ad63ce82ed8e62aa201b9bf89242cc9299ae0f26ff1f72136"
content-hash = "0c40b63c3c921846cf05ccfb4e685d4959854b29c2c302245f9832e20aac6954"

View File

@@ -9,6 +9,7 @@ packages = [{ include = "autogpt_libs" }]
[tool.poetry.dependencies]
python = ">=3.10,<4.0"
colorama = "^0.4.6"
cryptography = "^45.0"
expiringdict = "^1.2.2"
fastapi = "^0.116.1"
google-cloud-logging = "^3.12.1"
@@ -21,11 +22,12 @@ supabase = "^2.16.0"
uvicorn = "^0.35.0"
[tool.poetry.group.dev.dependencies]
ruff = "^0.12.11"
pyright = "^1.1.404"
pytest = "^8.4.1"
pytest-asyncio = "^1.1.0"
pytest-mock = "^3.14.1"
pytest-cov = "^6.2.1"
ruff = "^0.12.11"
[build-system]
requires = ["poetry-core"]