remove verify_signature and add HMACValidator

This commit is contained in:
Bentlybro
2025-03-25 13:44:41 +00:00
parent 870e6a901a
commit 22413a3cc7
2 changed files with 92 additions and 17 deletions

View File

@@ -1,6 +1,8 @@
import inspect
import logging
from typing import Any, Callable, Optional
import hmac
import hashlib
from fastapi import HTTPException, Request, Security
from fastapi.security import APIKeyHeader, HTTPBearer
@@ -133,3 +135,85 @@ class APIKeyValidator:
# This helps FastAPI recognize it as a security dependency
validate_api_key.__name__ = f"validate_{self.security_scheme.model.name}"
return validate_api_key
class HMACValidator:
"""
Configurable HMAC-based validator for FastAPI applications.
This class is useful for validating signed requests such as webhooks,
where the signature is computed using HMAC SHA256 and sent in a request header.
It compares the provided signature to a computed one using a shared secret and the raw request body.
Examples:
Basic usage:
```python
validator = HMACValidator(
header_name="X-Signature",
secret="your-shared-secret"
)
@app.post("/webhook", dependencies=[Depends(validator.get_dependency())])
async def webhook_handler():
return {"status": "ok"}
```
Custom integration:
```python
validator = HMACValidator(
header_name="X-Custom-Signature",
secret=secrets.webhook_secret
)
@router.post("/custom-endpoint")
async def handler(
_ = Depends(validator.get_dependency())
):
...
```
Args:
header_name (str): The name of the request header containing the HMAC signature.
secret (str): The shared secret used to compute the HMAC hash.
error_status (int): HTTP status code to return when validation fails.
error_message (str): Error message to return when validation fails.
"""
def __init__(
self,
header_name: str,
secret: str,
error_status: int = HTTP_401_UNAUTHORIZED,
error_message: str = "Invalid HMAC signature"
):
self.secret = secret
self.header = APIKeyHeader(name=header_name)
self.error_status = error_status
self.error_message = error_message
async def __call__(
self,
request: Request,
signature: str = Security(APIKeyHeader(name="X-Signature"))
) -> bool:
body = await request.body()
computed_signature = hmac.new(
self.secret.encode(),
body,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(computed_signature, signature):
raise HTTPException(
status_code=self.error_status,
detail=self.error_message
)
return True
def get_dependency(self):
async def validate_signature(request: Request, signature: str = Security(self.header)) -> bool:
return await self(request, signature)
validate_signature.__name__ = f"validate_{self.header.model.name}"
return validate_signature

View File

@@ -2,26 +2,23 @@ import hmac
import hashlib
import logging
from typing import Dict, Any
from fastapi import APIRouter, Request, Response, HTTPException, Header
from fastapi import APIRouter, Request, Response, HTTPException, Header, Depends
from backend.util.settings import Settings
from backend.util.service import get_service_client
from backend.executor import ExecutionManager
from .models import EventType, IffyWebhookEvent
from autogpt_libs.auth.middleware import HMACValidator
logger = logging.getLogger(__name__)
settings = Settings()
iffy_router = APIRouter()
async def verify_signature(body: bytes, signature: str) -> bool:
"""Verify the Iffy webhook signature using HMAC SHA256"""
WEBHOOK_SECRET = settings.secrets.iffy_webhook_secret
computed_hash = hmac.new(
WEBHOOK_SECRET.encode(),
body,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed_hash, signature)
iffy_signature_validator = HMACValidator(
header_name="X-Signature",
secret=settings.secrets.iffy_webhook_secret,
error_message="Invalid Iffy signature"
)
# This handles the webhook events from iffy like stopping an execution if a flagged block is detected.
async def handle_record_event(event_type: EventType, metadata: Dict[str, Any]) -> Response:
@@ -92,15 +89,9 @@ async def handle_user_event(event_type: EventType, payload: Dict[str, Any]) -> R
@iffy_router.post("/webhook")
async def handle_iffy_webhook(
request: Request,
x_signature: str = Header(..., alias="X-Signature")
_ = Depends(iffy_signature_validator.get_dependency())
) -> Response:
"""Handle incoming webhook events from Iffy"""
body = await request.body()
if not await verify_signature(body, x_signature):
logger.error("Invalid Iffy webhook signature")
raise HTTPException(status_code=401, detail="Invalid signature")
try:
event_data = IffyWebhookEvent.model_validate_json(body)
except Exception as e: