add fastapi auth

This commit is contained in:
Swifty
2025-12-20 22:51:40 +01:00
parent 217e3718d7
commit eda0f16cb9
12 changed files with 3452 additions and 0 deletions

View File

@@ -0,0 +1,58 @@
"""
Native authentication module for AutoGPT Platform.
This module provides authentication functionality that replaces Supabase Auth,
including:
- Password hashing with Argon2id
- JWT token generation and validation
- Magic links for email verification and password reset
- Email service for auth-related emails
- User migration from Supabase
Usage:
from backend.data.auth.password import hash_password, verify_password
from backend.data.auth.tokens import create_access_token, create_token_pair
from backend.data.auth.magic_links import create_password_reset_link
from backend.data.auth.email_service import get_auth_email_service
"""
from backend.data.auth.email_service import AuthEmailService, get_auth_email_service
from backend.data.auth.magic_links import (
MagicLinkPurpose,
create_email_verification_link,
create_password_reset_link,
verify_email_token,
verify_password_reset_token,
)
from backend.data.auth.password import hash_password, needs_rehash, verify_password
from backend.data.auth.tokens import (
TokenPair,
create_access_token,
create_token_pair,
decode_access_token,
revoke_all_user_refresh_tokens,
validate_refresh_token,
)
__all__ = [
# Password
"hash_password",
"verify_password",
"needs_rehash",
# Tokens
"TokenPair",
"create_access_token",
"create_token_pair",
"decode_access_token",
"validate_refresh_token",
"revoke_all_user_refresh_tokens",
# Magic Links
"MagicLinkPurpose",
"create_email_verification_link",
"create_password_reset_link",
"verify_email_token",
"verify_password_reset_token",
# Email Service
"AuthEmailService",
"get_auth_email_service",
]

View File

@@ -0,0 +1,271 @@
"""
Email service for authentication flows.
Uses Postmark to send transactional emails for:
- Email verification
- Password reset
- Account security notifications
"""
import logging
import pathlib
from typing import Optional
from jinja2 import Template
from postmarker.core import PostmarkClient
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
settings = Settings()
# Template directory
TEMPLATE_DIR = pathlib.Path(__file__).parent / "templates"
class AuthEmailService:
"""Email service for authentication-related emails."""
def __init__(self):
if settings.secrets.postmark_server_api_token:
self.postmark = PostmarkClient(
server_token=settings.secrets.postmark_server_api_token
)
self.enabled = True
else:
logger.warning(
"Postmark server API token not found, auth emails disabled"
)
self.postmark = None
self.enabled = False
self.sender_email = settings.config.postmark_sender_email
self.frontend_url = (
settings.config.frontend_base_url or settings.config.platform_base_url
)
def _send_email(
self,
to_email: str,
subject: str,
html_body: str,
) -> bool:
"""
Send an email via Postmark.
Returns True if sent successfully, False otherwise.
"""
if not self.enabled or not self.postmark:
logger.warning(f"Email not sent (disabled): {subject} to {to_email}")
return False
try:
self.postmark.emails.send(
From=self.sender_email,
To=to_email,
Subject=subject,
HtmlBody=html_body,
)
logger.info(f"Auth email sent: {subject} to {to_email}")
return True
except Exception as e:
logger.error(f"Failed to send auth email: {e}")
return False
def send_verification_email(self, email: str, token: str) -> bool:
"""
Send email verification link.
Args:
email: Recipient email address
token: Verification token
Returns:
True if sent successfully
"""
verify_url = f"{self.frontend_url}/auth/verify-email?token={token}"
subject = "Verify your email address"
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.button {{ display: inline-block; padding: 12px 24px; background-color: #5046e5; color: white; text-decoration: none; border-radius: 6px; font-weight: 500; }}
.footer {{ margin-top: 30px; font-size: 12px; color: #666; }}
</style>
</head>
<body>
<div class="container">
<h2>Verify your email address</h2>
<p>Thanks for signing up! Please verify your email address by clicking the button below:</p>
<p style="margin: 30px 0;">
<a href="{verify_url}" class="button">Verify Email</a>
</p>
<p>Or copy and paste this link into your browser:</p>
<p style="word-break: break-all; color: #666;">{verify_url}</p>
<p>This link will expire in 24 hours.</p>
<div class="footer">
<p>If you didn't create an account, you can safely ignore this email.</p>
</div>
</div>
</body>
</html>
"""
return self._send_email(email, subject, html_body)
def send_password_reset_email(self, email: str, token: str) -> bool:
"""
Send password reset link.
Args:
email: Recipient email address
token: Password reset token
Returns:
True if sent successfully
"""
reset_url = f"{self.frontend_url}/reset-password?token={token}"
subject = "Reset your password"
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.button {{ display: inline-block; padding: 12px 24px; background-color: #5046e5; color: white; text-decoration: none; border-radius: 6px; font-weight: 500; }}
.warning {{ background-color: #fef3c7; border: 1px solid #f59e0b; padding: 12px; border-radius: 6px; margin: 20px 0; }}
.footer {{ margin-top: 30px; font-size: 12px; color: #666; }}
</style>
</head>
<body>
<div class="container">
<h2>Reset your password</h2>
<p>We received a request to reset your password. Click the button below to choose a new password:</p>
<p style="margin: 30px 0;">
<a href="{reset_url}" class="button">Reset Password</a>
</p>
<p>Or copy and paste this link into your browser:</p>
<p style="word-break: break-all; color: #666;">{reset_url}</p>
<div class="warning">
<strong>This link will expire in 15 minutes.</strong>
</div>
<div class="footer">
<p>If you didn't request a password reset, you can safely ignore this email. Your password will remain unchanged.</p>
</div>
</div>
</body>
</html>
"""
return self._send_email(email, subject, html_body)
def send_password_changed_notification(self, email: str) -> bool:
"""
Send notification that password was changed.
Args:
email: Recipient email address
Returns:
True if sent successfully
"""
subject = "Your password was changed"
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.warning {{ background-color: #fee2e2; border: 1px solid #ef4444; padding: 12px; border-radius: 6px; margin: 20px 0; }}
.footer {{ margin-top: 30px; font-size: 12px; color: #666; }}
</style>
</head>
<body>
<div class="container">
<h2>Password Changed</h2>
<p>Your password was successfully changed.</p>
<div class="warning">
<strong>If you didn't make this change</strong>, please contact support immediately and reset your password.
</div>
<div class="footer">
<p>This is an automated security notification.</p>
</div>
</div>
</body>
</html>
"""
return self._send_email(email, subject, html_body)
def send_migrated_user_password_reset(self, email: str, token: str) -> bool:
"""
Send password reset email for users migrated from Supabase.
Args:
email: Recipient email address
token: Password reset token
Returns:
True if sent successfully
"""
reset_url = f"{self.frontend_url}/reset-password?token={token}"
subject = "Action Required: Set your password"
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.button {{ display: inline-block; padding: 12px 24px; background-color: #5046e5; color: white; text-decoration: none; border-radius: 6px; font-weight: 500; }}
.info {{ background-color: #dbeafe; border: 1px solid #3b82f6; padding: 12px; border-radius: 6px; margin: 20px 0; }}
.footer {{ margin-top: 30px; font-size: 12px; color: #666; }}
</style>
</head>
<body>
<div class="container">
<h2>Set Your Password</h2>
<div class="info">
<strong>We've upgraded our authentication system!</strong>
<p style="margin: 8px 0 0 0;">For enhanced security, please set a new password to continue using your account.</p>
</div>
<p>Click the button below to set your password:</p>
<p style="margin: 30px 0;">
<a href="{reset_url}" class="button">Set Password</a>
</p>
<p>Or copy and paste this link into your browser:</p>
<p style="word-break: break-all; color: #666;">{reset_url}</p>
<p>This link will expire in 24 hours.</p>
<div class="footer">
<p>If you signed up with Google, no action is needed - simply continue signing in with Google.</p>
</div>
</div>
</body>
</html>
"""
return self._send_email(email, subject, html_body)
# Singleton instance
_email_service: Optional[AuthEmailService] = None
def get_auth_email_service() -> AuthEmailService:
"""Get the singleton auth email service instance."""
global _email_service
if _email_service is None:
_email_service = AuthEmailService()
return _email_service

View File

@@ -0,0 +1,253 @@
"""
Magic link service for email verification and password reset.
Magic links are single-use, time-limited tokens sent via email that allow
users to verify their email address or reset their password without entering
the old password.
"""
import hashlib
import logging
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional
from prisma.models import UserAuthMagicLink
from pydantic import BaseModel
logger = logging.getLogger(__name__)
# Magic link TTLs
EMAIL_VERIFICATION_TTL = timedelta(hours=24)
PASSWORD_RESET_TTL = timedelta(minutes=15)
# Token prefix for identification
MAGIC_LINK_PREFIX = "agpt_ml_"
class MagicLinkPurpose(str, Enum):
"""Purpose of the magic link."""
EMAIL_VERIFICATION = "email_verification"
PASSWORD_RESET = "password_reset"
class MagicLinkInfo(BaseModel):
"""Information about a valid magic link."""
email: str
purpose: MagicLinkPurpose
user_id: Optional[str] = None # Set for password reset, not for signup verification
def generate_magic_link_token() -> str:
"""
Generate a cryptographically secure magic link token.
Returns:
A prefixed random token string.
"""
random_bytes = secrets.token_urlsafe(32)
return f"{MAGIC_LINK_PREFIX}{random_bytes}"
def hash_magic_link_token(token: str) -> str:
"""
Hash a magic link token for storage.
Uses SHA256 for deterministic lookup.
Args:
token: The plaintext magic link token.
Returns:
The SHA256 hex digest.
"""
return hashlib.sha256(token.encode()).hexdigest()
async def create_magic_link(
email: str,
purpose: MagicLinkPurpose,
user_id: Optional[str] = None,
) -> str:
"""
Create a magic link token and store it in the database.
Args:
email: The email address associated with the link.
purpose: The purpose of the magic link.
user_id: Optional user ID (for password reset).
Returns:
The plaintext magic link token.
"""
token = generate_magic_link_token()
token_hash = hash_magic_link_token(token)
# Determine TTL based on purpose
if purpose == MagicLinkPurpose.PASSWORD_RESET:
ttl = PASSWORD_RESET_TTL
else:
ttl = EMAIL_VERIFICATION_TTL
expires_at = datetime.now(timezone.utc) + ttl
# Invalidate any existing magic links for this email and purpose
await UserAuthMagicLink.prisma().update_many(
where={
"email": email,
"purpose": purpose.value,
"usedAt": None,
},
data={"usedAt": datetime.now(timezone.utc)}, # Mark as used to invalidate
)
# Create new magic link
await UserAuthMagicLink.prisma().create(
data={
"id": str(uuid.uuid4()),
"tokenHash": token_hash,
"email": email,
"purpose": purpose.value,
"userId": user_id,
"expiresAt": expires_at,
}
)
return token
async def validate_magic_link(
token: str,
expected_purpose: Optional[MagicLinkPurpose] = None,
) -> Optional[MagicLinkInfo]:
"""
Validate a magic link token without consuming it.
Args:
token: The plaintext magic link token.
expected_purpose: Optional expected purpose to validate against.
Returns:
MagicLinkInfo if valid, None otherwise.
"""
token_hash = hash_magic_link_token(token)
where_clause: dict = {
"tokenHash": token_hash,
"usedAt": None,
"expiresAt": {"gt": datetime.now(timezone.utc)},
}
if expected_purpose:
where_clause["purpose"] = expected_purpose.value
db_link = await UserAuthMagicLink.prisma().find_first(where=where_clause)
if not db_link:
return None
return MagicLinkInfo(
email=db_link.email,
purpose=MagicLinkPurpose(db_link.purpose),
user_id=db_link.userId,
)
async def consume_magic_link(
token: str,
expected_purpose: Optional[MagicLinkPurpose] = None,
) -> Optional[MagicLinkInfo]:
"""
Validate and consume a magic link token (single-use).
Args:
token: The plaintext magic link token.
expected_purpose: Optional expected purpose to validate against.
Returns:
MagicLinkInfo if valid and successfully consumed, None otherwise.
"""
# First validate
link_info = await validate_magic_link(token, expected_purpose)
if not link_info:
return None
# Then consume (mark as used)
token_hash = hash_magic_link_token(token)
result = await UserAuthMagicLink.prisma().update_many(
where={
"tokenHash": token_hash,
"usedAt": None,
},
data={"usedAt": datetime.now(timezone.utc)},
)
if result == 0:
# Race condition - link was consumed by another request
logger.warning("Magic link was already consumed (race condition)")
return None
return link_info
async def create_email_verification_link(email: str) -> str:
"""
Create an email verification magic link.
Args:
email: The email address to verify.
Returns:
The plaintext magic link token.
"""
return await create_magic_link(email, MagicLinkPurpose.EMAIL_VERIFICATION)
async def create_password_reset_link(email: str, user_id: str) -> str:
"""
Create a password reset magic link.
Args:
email: The user's email address.
user_id: The user's ID.
Returns:
The plaintext magic link token.
"""
return await create_magic_link(
email, MagicLinkPurpose.PASSWORD_RESET, user_id=user_id
)
async def verify_email_token(token: str) -> Optional[str]:
"""
Verify an email verification token and consume it.
Args:
token: The magic link token.
Returns:
The email address if valid, None otherwise.
"""
link_info = await consume_magic_link(token, MagicLinkPurpose.EMAIL_VERIFICATION)
return link_info.email if link_info else None
async def verify_password_reset_token(token: str) -> Optional[tuple[str, str]]:
"""
Verify a password reset token and consume it.
Args:
token: The magic link token.
Returns:
Tuple of (user_id, email) if valid, None otherwise.
"""
link_info = await consume_magic_link(token, MagicLinkPurpose.PASSWORD_RESET)
if not link_info or not link_info.user_id:
return None
return link_info.user_id, link_info.email

View File

@@ -0,0 +1,441 @@
"""
Migration script for moving users from Supabase Auth to native FastAPI auth.
This script handles:
1. Marking existing users as migrated from Supabase
2. Sending password reset emails to migrated users
3. Tracking migration progress
4. Generating reports
Usage:
# Dry run - see what would happen
python -m backend.data.auth.migration --dry-run
# Mark users as migrated (no emails)
python -m backend.data.auth.migration --mark-migrated
# Send password reset emails to migrated users
python -m backend.data.auth.migration --send-emails --batch-size 100
# Full migration (mark + send emails)
python -m backend.data.auth.migration --full-migration --batch-size 100
"""
import argparse
import asyncio
import csv
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from prisma.models import User
from backend.data.auth.email_service import get_auth_email_service
from backend.data.auth.magic_links import create_password_reset_link
from backend.data.db import connect, disconnect
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
class MigrationStats:
"""Track migration statistics."""
def __init__(self):
self.total_users = 0
self.already_migrated = 0
self.marked_migrated = 0
self.emails_sent = 0
self.emails_failed = 0
self.oauth_users_skipped = 0
self.errors = []
def __str__(self):
return f"""
Migration Statistics:
---------------------
Total users processed: {self.total_users}
Already migrated: {self.already_migrated}
Newly marked as migrated: {self.marked_migrated}
Password reset emails sent: {self.emails_sent}
Email failures: {self.emails_failed}
OAuth users skipped: {self.oauth_users_skipped}
Errors: {len(self.errors)}
"""
async def get_users_to_migrate(
batch_size: int = 100,
offset: int = 0,
) -> list[User]:
"""
Get users that need to be migrated.
Returns users where:
- authProvider is "supabase" or NULL
- migratedFromSupabase is False or NULL
- passwordHash is NULL (they haven't set a native password)
"""
users = await User.prisma().find_many(
where={
"OR": [
{"authProvider": "supabase"},
{"authProvider": None},
],
"migratedFromSupabase": False,
"passwordHash": None,
},
take=batch_size,
skip=offset,
order={"createdAt": "asc"},
)
return users
async def get_migrated_users_needing_email(
batch_size: int = 100,
offset: int = 0,
) -> list[User]:
"""
Get migrated users who haven't set their password yet.
These users need a password reset email.
"""
users = await User.prisma().find_many(
where={
"migratedFromSupabase": True,
"passwordHash": None,
"authProvider": {"not": "google"}, # Skip OAuth users
},
take=batch_size,
skip=offset,
order={"createdAt": "asc"},
)
return users
async def mark_user_as_migrated(user: User, dry_run: bool = False) -> bool:
"""
Mark a user as migrated from Supabase.
Sets migratedFromSupabase=True and authProvider="supabase".
"""
if dry_run:
logger.info(f"[DRY RUN] Would mark user {user.id} ({user.email}) as migrated")
return True
try:
await User.prisma().update(
where={"id": user.id},
data={
"migratedFromSupabase": True,
"authProvider": "supabase",
},
)
logger.info(f"Marked user {user.id} ({user.email}) as migrated")
return True
except Exception as e:
logger.error(f"Failed to mark user {user.id} as migrated: {e}")
return False
async def send_migration_email(
user: User,
email_service,
dry_run: bool = False,
) -> bool:
"""
Send a password reset email to a migrated user.
"""
if dry_run:
logger.info(f"[DRY RUN] Would send migration email to {user.email}")
return True
try:
token = await create_password_reset_link(user.email, user.id)
success = email_service.send_migrated_user_password_reset(user.email, token)
if success:
logger.info(f"Sent migration email to {user.email}")
else:
logger.warning(f"Failed to send migration email to {user.email}")
return success
except Exception as e:
logger.error(f"Error sending migration email to {user.email}: {e}")
return False
async def run_migration(
mark_migrated: bool = False,
send_emails: bool = False,
batch_size: int = 100,
dry_run: bool = False,
email_delay: float = 0.5, # Delay between emails to avoid rate limiting
) -> MigrationStats:
"""
Run the migration process.
Args:
mark_migrated: Mark users as migrated from Supabase
send_emails: Send password reset emails to migrated users
batch_size: Number of users to process at a time
dry_run: If True, don't make any changes
email_delay: Seconds to wait between sending emails
Returns:
MigrationStats with results
"""
stats = MigrationStats()
email_service = get_auth_email_service() if send_emails else None
# Phase 1: Mark users as migrated
if mark_migrated:
logger.info("Phase 1: Marking users as migrated...")
offset = 0
while True:
users = await get_users_to_migrate(batch_size, offset)
if not users:
break
for user in users:
stats.total_users += 1
# Skip OAuth users
if user.authProvider == "google":
stats.oauth_users_skipped += 1
continue
success = await mark_user_as_migrated(user, dry_run)
if success:
stats.marked_migrated += 1
else:
stats.errors.append(f"Failed to mark {user.email}")
offset += batch_size
logger.info(f"Processed {offset} users...")
# Phase 2: Send password reset emails
if send_emails:
logger.info("Phase 2: Sending password reset emails...")
offset = 0
while True:
users = await get_migrated_users_needing_email(batch_size, offset)
if not users:
break
for user in users:
stats.total_users += 1
success = await send_migration_email(user, email_service, dry_run)
if success:
stats.emails_sent += 1
else:
stats.emails_failed += 1
stats.errors.append(f"Failed to email {user.email}")
# Rate limiting
if not dry_run and email_delay > 0:
await asyncio.sleep(email_delay)
offset += batch_size
logger.info(f"Processed {offset} users for email...")
return stats
async def generate_migration_report(output_path: Optional[str] = None) -> str:
"""
Generate a CSV report of all users and their migration status.
"""
if output_path is None:
output_path = f"migration_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
users = await User.prisma().find_many(
order={"createdAt": "asc"},
)
with open(output_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow([
"user_id",
"email",
"auth_provider",
"migrated_from_supabase",
"has_password",
"email_verified",
"created_at",
"needs_action",
])
for user in users:
needs_action = (
user.migratedFromSupabase
and user.passwordHash is None
and user.authProvider != "google"
)
writer.writerow([
user.id,
user.email,
user.authProvider or "unknown",
user.migratedFromSupabase,
user.passwordHash is not None,
user.emailVerified,
user.createdAt.isoformat() if user.createdAt else "",
"YES" if needs_action else "NO",
])
logger.info(f"Report saved to {output_path}")
return output_path
async def count_migration_status():
"""
Get counts of users in different migration states.
"""
total = await User.prisma().count()
already_native = await User.prisma().count(
where={"authProvider": "password", "passwordHash": {"not": None}}
)
oauth_users = await User.prisma().count(
where={"authProvider": "google"}
)
migrated_pending = await User.prisma().count(
where={
"migratedFromSupabase": True,
"passwordHash": None,
"authProvider": {"not": "google"},
}
)
not_migrated = await User.prisma().count(
where={
"migratedFromSupabase": False,
"authProvider": {"in": ["supabase", None]},
}
)
return {
"total": total,
"already_native": already_native,
"oauth_users": oauth_users,
"migrated_pending_password": migrated_pending,
"not_yet_migrated": not_migrated,
}
async def main():
parser = argparse.ArgumentParser(
description="Migrate users from Supabase Auth to native FastAPI auth"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Don't make any changes, just show what would happen",
)
parser.add_argument(
"--mark-migrated",
action="store_true",
help="Mark existing Supabase users as migrated",
)
parser.add_argument(
"--send-emails",
action="store_true",
help="Send password reset emails to migrated users",
)
parser.add_argument(
"--full-migration",
action="store_true",
help="Run full migration (mark + send emails)",
)
parser.add_argument(
"--batch-size",
type=int,
default=100,
help="Number of users to process at a time (default: 100)",
)
parser.add_argument(
"--email-delay",
type=float,
default=0.5,
help="Seconds to wait between emails (default: 0.5)",
)
parser.add_argument(
"--report",
action="store_true",
help="Generate a CSV report of migration status",
)
parser.add_argument(
"--status",
action="store_true",
help="Show current migration status counts",
)
args = parser.parse_args()
# Connect to database
await connect()
try:
if args.status:
counts = await count_migration_status()
print("\nMigration Status:")
print("-" * 40)
print(f"Total users: {counts['total']}")
print(f"Already using native auth: {counts['already_native']}")
print(f"OAuth users (Google): {counts['oauth_users']}")
print(f"Migrated, pending password: {counts['migrated_pending_password']}")
print(f"Not yet migrated: {counts['not_yet_migrated']}")
return
if args.report:
await generate_migration_report()
return
if args.full_migration:
args.mark_migrated = True
args.send_emails = True
if not args.mark_migrated and not args.send_emails:
parser.print_help()
print("\nError: Must specify --mark-migrated, --send-emails, --full-migration, --report, or --status")
return
if args.dry_run:
logger.info("=" * 50)
logger.info("DRY RUN MODE - No changes will be made")
logger.info("=" * 50)
stats = await run_migration(
mark_migrated=args.mark_migrated,
send_emails=args.send_emails,
batch_size=args.batch_size,
dry_run=args.dry_run,
email_delay=args.email_delay,
)
print(stats)
if stats.errors:
print("\nErrors encountered:")
for error in stats.errors[:10]: # Show first 10 errors
print(f" - {error}")
if len(stats.errors) > 10:
print(f" ... and {len(stats.errors) - 10} more")
finally:
await disconnect()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,70 @@
"""
Password hashing service using Argon2id.
OWASP 2024 recommended configuration:
- time_cost: 2 iterations
- memory_cost: 19456 KiB (19 MiB)
- parallelism: 1
"""
import logging
from argon2 import PasswordHasher
from argon2.exceptions import InvalidHashError, VerifyMismatchError
from argon2.profiles import RFC_9106_LOW_MEMORY
logger = logging.getLogger(__name__)
# Use RFC 9106 low-memory profile (OWASP recommended)
# time_cost=2, memory_cost=19456, parallelism=1
_hasher = PasswordHasher.from_parameters(RFC_9106_LOW_MEMORY)
def hash_password(password: str) -> str:
"""
Hash a password using Argon2id.
Args:
password: The plaintext password to hash.
Returns:
The hashed password string (includes algorithm params and salt).
"""
return _hasher.hash(password)
def verify_password(password_hash: str, password: str) -> bool:
"""
Verify a password against a hash.
Args:
password_hash: The stored password hash.
password: The plaintext password to verify.
Returns:
True if the password matches, False otherwise.
"""
try:
_hasher.verify(password_hash, password)
return True
except VerifyMismatchError:
return False
except InvalidHashError:
logger.warning("Invalid password hash format encountered")
return False
def needs_rehash(password_hash: str) -> bool:
"""
Check if a password hash needs to be rehashed.
This returns True if the hash was created with different parameters
than the current configuration, allowing for transparent upgrades.
Args:
password_hash: The stored password hash.
Returns:
True if the hash should be rehashed, False otherwise.
"""
return _hasher.check_needs_rehash(password_hash)

View File

@@ -0,0 +1,270 @@
"""
JWT token generation and validation for user authentication.
This module generates tokens compatible with Supabase JWT format to ensure
a smooth migration without requiring frontend changes.
"""
import hashlib
import logging
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
import jwt
from prisma.models import UserAuthRefreshToken
from pydantic import BaseModel
from autogpt_libs.auth.config import get_settings
logger = logging.getLogger(__name__)
# Token TTLs
ACCESS_TOKEN_TTL = timedelta(hours=1)
REFRESH_TOKEN_TTL = timedelta(days=30)
# Refresh token prefix for identification
REFRESH_TOKEN_PREFIX = "agpt_rt_"
class TokenPair(BaseModel):
"""Access and refresh token pair."""
access_token: str
refresh_token: str
expires_in: int # seconds until access token expires
token_type: str = "bearer"
class JWTPayload(BaseModel):
"""JWT payload structure matching Supabase format."""
sub: str # user ID
email: str
phone: str = ""
role: str = "authenticated"
aud: str = "authenticated"
iat: int # issued at (unix timestamp)
exp: int # expiration (unix timestamp)
def create_access_token(
user_id: str,
email: str,
role: str = "authenticated",
phone: str = "",
) -> str:
"""
Create a JWT access token.
The token format matches Supabase JWT structure so existing backend
validation code continues to work without modification.
Args:
user_id: The user's UUID.
email: The user's email address.
role: The user's role (default: "authenticated").
phone: The user's phone number (optional).
Returns:
The encoded JWT token string.
"""
settings = get_settings()
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"email": email,
"phone": phone,
"role": role,
"aud": "authenticated",
"iat": int(now.timestamp()),
"exp": int((now + ACCESS_TOKEN_TTL).timestamp()),
}
return jwt.encode(payload, settings.JWT_VERIFY_KEY, algorithm=settings.JWT_ALGORITHM)
def decode_access_token(token: str) -> Optional[JWTPayload]:
"""
Decode and validate a JWT access token.
Args:
token: The JWT token string.
Returns:
The decoded payload if valid, None otherwise.
"""
settings = get_settings()
try:
payload = jwt.decode(
token,
settings.JWT_VERIFY_KEY,
algorithms=[settings.JWT_ALGORITHM],
audience="authenticated",
)
return JWTPayload(**payload)
except jwt.ExpiredSignatureError:
logger.debug("Token has expired")
return None
except jwt.InvalidTokenError as e:
logger.debug(f"Invalid token: {e}")
return None
def generate_refresh_token() -> str:
"""
Generate a cryptographically secure refresh token.
Returns:
A prefixed random token string.
"""
random_bytes = secrets.token_urlsafe(32)
return f"{REFRESH_TOKEN_PREFIX}{random_bytes}"
def hash_refresh_token(token: str) -> str:
"""
Hash a refresh token for storage.
Uses SHA256 for deterministic lookup (unlike passwords which use Argon2).
Args:
token: The plaintext refresh token.
Returns:
The SHA256 hex digest.
"""
return hashlib.sha256(token.encode()).hexdigest()
async def create_refresh_token_db(
user_id: str,
token: Optional[str] = None,
) -> tuple[str, datetime]:
"""
Create a refresh token and store it in the database.
Args:
user_id: The user's UUID.
token: Optional pre-generated token (used in OAuth flow).
Returns:
Tuple of (plaintext token, expiration datetime).
"""
if token is None:
token = generate_refresh_token()
token_hash = hash_refresh_token(token)
expires_at = datetime.now(timezone.utc) + REFRESH_TOKEN_TTL
await UserAuthRefreshToken.prisma().create(
data={
"id": str(uuid.uuid4()),
"tokenHash": token_hash,
"userId": user_id,
"expiresAt": expires_at,
}
)
return token, expires_at
async def validate_refresh_token(token: str) -> Optional[str]:
"""
Validate a refresh token and return the associated user ID.
Args:
token: The plaintext refresh token.
Returns:
The user ID if valid, None otherwise.
"""
token_hash = hash_refresh_token(token)
db_token = await UserAuthRefreshToken.prisma().find_first(
where={
"tokenHash": token_hash,
"revokedAt": None,
"expiresAt": {"gt": datetime.now(timezone.utc)},
}
)
if not db_token:
return None
return db_token.userId
async def revoke_refresh_token(token: str) -> bool:
"""
Revoke a refresh token.
Args:
token: The plaintext refresh token.
Returns:
True if a token was revoked, False otherwise.
"""
token_hash = hash_refresh_token(token)
result = await UserAuthRefreshToken.prisma().update_many(
where={
"tokenHash": token_hash,
"revokedAt": None,
},
data={"revokedAt": datetime.now(timezone.utc)},
)
return result > 0
async def revoke_all_user_refresh_tokens(user_id: str) -> int:
"""
Revoke all refresh tokens for a user.
Used for global logout or security events.
Args:
user_id: The user's UUID.
Returns:
Number of tokens revoked.
"""
result = await UserAuthRefreshToken.prisma().update_many(
where={
"userId": user_id,
"revokedAt": None,
},
data={"revokedAt": datetime.now(timezone.utc)},
)
return result
async def create_token_pair(
user_id: str,
email: str,
role: str = "authenticated",
) -> TokenPair:
"""
Create a complete token pair (access + refresh).
Args:
user_id: The user's UUID.
email: The user's email.
role: The user's role.
Returns:
TokenPair with access_token, refresh_token, and metadata.
"""
access_token = create_access_token(user_id, email, role)
refresh_token, _ = await create_refresh_token_db(user_id)
return TokenPair(
access_token=access_token,
refresh_token=refresh_token,
expires_in=int(ACCESS_TOKEN_TTL.total_seconds()),
)

View File

@@ -23,6 +23,7 @@ import backend.data.user
import backend.integrations.webhooks.utils
import backend.server.routers.oauth
import backend.server.routers.postmark.postmark
import backend.server.routers.user_auth
import backend.server.routers.v1
import backend.server.v2.admin.credit_admin_routes
import backend.server.v2.admin.execution_analytics_routes
@@ -303,6 +304,11 @@ app.include_router(
tags=["oauth"],
prefix="/api/oauth",
)
app.include_router(
backend.server.routers.user_auth.router,
tags=["user-auth"],
prefix="/api",
)
app.mount("/external-api", external_app)

View File

@@ -0,0 +1,910 @@
"""
User authentication router for native FastAPI auth.
This router provides endpoints that are compatible with the Supabase Auth API
structure, allowing the frontend to migrate without code changes.
Endpoints:
- POST /auth/signup - Register a new user
- POST /auth/login - Login with email/password
- POST /auth/logout - Logout (clear session)
- POST /auth/refresh - Refresh access token
- GET /auth/me - Get current user
- POST /auth/password/reset - Request password reset email
- POST /auth/password/set - Set new password from reset link
- GET /auth/verify-email - Verify email from magic link
- GET /auth/oauth/google/authorize - Get Google OAuth URL
- GET /auth/oauth/google/callback - Handle Google OAuth callback
Admin Endpoints:
- GET /auth/admin/users - List users (admin only)
- GET /auth/admin/users/{user_id} - Get user details (admin only)
- POST /auth/admin/users/{user_id}/impersonate - Get impersonation token (admin only)
"""
import logging
import os
import secrets
import uuid
from datetime import datetime, timezone
from typing import List, Optional
from urllib.parse import urlencode
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from pydantic import BaseModel, EmailStr
from prisma.models import User
from backend.data.auth.email_service import get_auth_email_service
from backend.data.auth.magic_links import (
create_email_verification_link,
create_password_reset_link,
verify_email_token,
verify_password_reset_token,
)
from backend.data.auth.password import hash_password, needs_rehash, verify_password
from backend.data.auth.tokens import (
ACCESS_TOKEN_TTL,
REFRESH_TOKEN_TTL,
create_access_token,
create_refresh_token_db,
decode_access_token,
revoke_all_user_refresh_tokens,
revoke_refresh_token,
validate_refresh_token,
)
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
settings = Settings()
router = APIRouter(prefix="/auth", tags=["user-auth"])
# Cookie configuration
ACCESS_TOKEN_COOKIE = "access_token"
REFRESH_TOKEN_COOKIE = "refresh_token"
OAUTH_STATE_COOKIE = "oauth_state"
# Header for admin impersonation (matches existing autogpt_libs pattern)
IMPERSONATION_HEADER = "X-Act-As-User-Id"
# ============================================================================
# Admin Role Detection
# ============================================================================
def _get_admin_domains() -> set[str]:
"""Get set of email domains that grant admin role."""
domains_str = settings.config.admin_email_domains
if not domains_str:
return set()
return {d.strip().lower() for d in domains_str.split(",") if d.strip()}
def _get_admin_emails() -> set[str]:
"""Get set of specific email addresses that grant admin role."""
emails_str = settings.config.admin_emails
if not emails_str:
return set()
return {e.strip().lower() for e in emails_str.split(",") if e.strip()}
def get_user_role(email: str) -> str:
"""
Determine user role based on email.
Returns "admin" if:
- Email domain is in admin_email_domains list
- Email is in admin_emails list
Otherwise returns "authenticated".
"""
email_lower = email.lower()
domain = email_lower.split("@")[-1] if "@" in email_lower else ""
# Check specific emails first
if email_lower in _get_admin_emails():
return "admin"
# Check domains
if domain in _get_admin_domains():
return "admin"
return "authenticated"
# ============================================================================
# Request/Response Models
# ============================================================================
class SignupRequest(BaseModel):
email: EmailStr
password: str
class LoginRequest(BaseModel):
email: EmailStr
password: str
class PasswordResetRequest(BaseModel):
email: EmailStr
class PasswordSetRequest(BaseModel):
token: str
password: str
class UserResponse(BaseModel):
id: str
email: str
email_verified: bool
name: Optional[str] = None
created_at: datetime
role: Optional[str] = None
@staticmethod
def from_db(user: User, include_role: bool = False) -> "UserResponse":
return UserResponse(
id=user.id,
email=user.email,
email_verified=user.emailVerified,
name=user.name,
created_at=user.createdAt,
role=get_user_role(user.email) if include_role else None,
)
class AuthResponse(BaseModel):
"""Response matching Supabase auth response structure."""
user: UserResponse
access_token: str
refresh_token: str
expires_in: int
token_type: str = "bearer"
class MessageResponse(BaseModel):
message: str
class AdminUserListResponse(BaseModel):
users: List[UserResponse]
total: int
page: int
page_size: int
class ImpersonationResponse(BaseModel):
access_token: str
impersonated_user: UserResponse
expires_in: int
# ============================================================================
# Cookie Helpers
# ============================================================================
def _is_production() -> bool:
return os.getenv("APP_ENV", "local").lower() in ("production", "prod")
def _set_auth_cookies(response: Response, access_token: str, refresh_token: str):
"""Set authentication cookies on the response."""
secure = _is_production()
# Access token: accessible to JavaScript for API calls
response.set_cookie(
key=ACCESS_TOKEN_COOKIE,
value=access_token,
httponly=False, # JS needs access for Authorization header
secure=secure,
samesite="lax",
max_age=int(ACCESS_TOKEN_TTL.total_seconds()),
path="/",
)
# Refresh token: httpOnly, restricted path
response.set_cookie(
key=REFRESH_TOKEN_COOKIE,
value=refresh_token,
httponly=True, # Not accessible to JavaScript
secure=secure,
samesite="strict",
max_age=int(REFRESH_TOKEN_TTL.total_seconds()),
path="/api/auth/refresh", # Only sent to refresh endpoint
)
def _clear_auth_cookies(response: Response):
"""Clear authentication cookies."""
response.delete_cookie(key=ACCESS_TOKEN_COOKIE, path="/")
response.delete_cookie(key=REFRESH_TOKEN_COOKIE, path="/api/auth/refresh")
def _get_access_token(request: Request) -> Optional[str]:
"""Get access token from cookie or Authorization header."""
# Try cookie first
token = request.cookies.get(ACCESS_TOKEN_COOKIE)
if token:
return token
# Try Authorization header
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
return auth_header[7:]
return None
# ============================================================================
# Auth Dependencies
# ============================================================================
async def get_current_user_from_token(request: Request) -> Optional[User]:
"""Get the current user from the access token."""
access_token = _get_access_token(request)
if not access_token:
return None
payload = decode_access_token(access_token)
if not payload:
return None
return await User.prisma().find_unique(where={"id": payload.sub})
async def require_auth(request: Request) -> User:
"""Require authentication - returns user or raises 401."""
user = await get_current_user_from_token(request)
if not user:
raise HTTPException(status_code=401, detail="Not authenticated")
return user
async def require_admin(request: Request) -> User:
"""Require admin authentication - returns user or raises 401/403."""
user = await require_auth(request)
role = get_user_role(user.email)
if role != "admin":
raise HTTPException(status_code=403, detail="Admin access required")
return user
# ============================================================================
# Authentication Endpoints
# ============================================================================
@router.post("/signup", response_model=MessageResponse)
async def signup(data: SignupRequest):
"""
Register a new user.
Returns a message prompting the user to verify their email.
No automatic login until email is verified.
"""
# Check if email already exists
existing = await User.prisma().find_unique(where={"email": data.email})
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
# Validate password strength
if len(data.password) < 8:
raise HTTPException(
status_code=400, detail="Password must be at least 8 characters"
)
# Create user with hashed password
password_hash = hash_password(data.password)
user = await User.prisma().create(
data={
"id": str(uuid.uuid4()),
"email": data.email,
"passwordHash": password_hash,
"authProvider": "password",
"emailVerified": False,
}
)
# Create verification link and send email
token = await create_email_verification_link(data.email)
email_service = get_auth_email_service()
email_sent = email_service.send_verification_email(data.email, token)
if not email_sent:
logger.warning(f"Failed to send verification email to {data.email}")
# Still log the token for development
logger.info(f"Verification token for {data.email}: {token}")
return MessageResponse(
message="Please check your email to verify your account"
)
@router.post("/login", response_model=AuthResponse)
async def login(data: LoginRequest, response: Response):
"""
Login with email and password.
Sets httpOnly cookies for session management.
"""
user = await User.prisma().find_unique(where={"email": data.email})
if not user:
raise HTTPException(status_code=401, detail="Invalid email or password")
# Check if this is a migrated user without password
if user.passwordHash is None:
if user.migratedFromSupabase:
# Send password reset email for migrated user
token = await create_password_reset_link(data.email, user.id)
email_service = get_auth_email_service()
email_service.send_migrated_user_password_reset(data.email, token)
raise HTTPException(
status_code=400,
detail="Please check your email to set your password",
)
else:
# OAuth user trying to login with password
raise HTTPException(
status_code=400,
detail=f"This account uses {user.authProvider} login",
)
# Verify password
if not verify_password(user.passwordHash, data.password):
raise HTTPException(status_code=401, detail="Invalid email or password")
# Check if email is verified
if not user.emailVerified:
raise HTTPException(
status_code=400, detail="Please verify your email before logging in"
)
# Rehash password if needed (transparent security upgrade)
if needs_rehash(user.passwordHash):
new_hash = hash_password(data.password)
await User.prisma().update(
where={"id": user.id}, data={"passwordHash": new_hash}
)
# Create tokens
role = get_user_role(user.email)
access_token = create_access_token(user.id, user.email, role)
refresh_token, _ = await create_refresh_token_db(user.id)
# Set cookies
_set_auth_cookies(response, access_token, refresh_token)
return AuthResponse(
user=UserResponse.from_db(user),
access_token=access_token,
refresh_token=refresh_token,
expires_in=int(ACCESS_TOKEN_TTL.total_seconds()),
)
@router.post("/logout", response_model=MessageResponse)
async def logout(request: Request, response: Response, scope: str = Query("local")):
"""
Logout the current user.
Args:
scope: "local" to clear current session, "global" to revoke all sessions.
"""
# Get refresh token to revoke
refresh_token = request.cookies.get(REFRESH_TOKEN_COOKIE)
if scope == "global":
# Get user from access token
access_token = _get_access_token(request)
if access_token:
payload = decode_access_token(access_token)
if payload:
await revoke_all_user_refresh_tokens(payload.sub)
elif refresh_token:
await revoke_refresh_token(refresh_token)
# Clear cookies
_clear_auth_cookies(response)
return MessageResponse(message="Logged out successfully")
@router.post("/refresh", response_model=AuthResponse)
async def refresh(request: Request, response: Response):
"""
Refresh the access token using the refresh token.
"""
refresh_token = request.cookies.get(REFRESH_TOKEN_COOKIE)
if not refresh_token:
raise HTTPException(status_code=401, detail="No refresh token")
# Validate refresh token
user_id = await validate_refresh_token(refresh_token)
if not user_id:
_clear_auth_cookies(response)
raise HTTPException(status_code=401, detail="Invalid or expired refresh token")
# Get user
user = await User.prisma().find_unique(where={"id": user_id})
if not user:
_clear_auth_cookies(response)
raise HTTPException(status_code=401, detail="User not found")
# Revoke old refresh token
await revoke_refresh_token(refresh_token)
# Create new tokens
role = get_user_role(user.email)
new_access_token = create_access_token(user.id, user.email, role)
new_refresh_token, _ = await create_refresh_token_db(user.id)
# Set new cookies
_set_auth_cookies(response, new_access_token, new_refresh_token)
return AuthResponse(
user=UserResponse.from_db(user),
access_token=new_access_token,
refresh_token=new_refresh_token,
expires_in=int(ACCESS_TOKEN_TTL.total_seconds()),
)
@router.get("/me", response_model=UserResponse)
async def get_current_user(request: Request):
"""
Get the currently authenticated user.
Supports admin impersonation via X-Act-As-User-Id header.
"""
access_token = _get_access_token(request)
if not access_token:
raise HTTPException(status_code=401, detail="Not authenticated")
payload = decode_access_token(access_token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid or expired token")
# Check for impersonation header
impersonate_user_id = request.headers.get(IMPERSONATION_HEADER, "").strip()
if impersonate_user_id:
# Verify caller is admin
if payload.role != "admin":
raise HTTPException(
status_code=403, detail="Only admins can impersonate users"
)
# Log impersonation for audit
logger.info(
f"Admin impersonation: {payload.sub} ({payload.email}) "
f"viewing as user {impersonate_user_id}"
)
user = await User.prisma().find_unique(where={"id": impersonate_user_id})
if not user:
raise HTTPException(status_code=404, detail="Impersonated user not found")
else:
user = await User.prisma().find_unique(where={"id": payload.sub})
if not user:
raise HTTPException(status_code=401, detail="User not found")
return UserResponse.from_db(user, include_role=True)
# ============================================================================
# Password Reset Endpoints
# ============================================================================
@router.post("/password/reset", response_model=MessageResponse)
async def request_password_reset(data: PasswordResetRequest):
"""
Request a password reset email.
"""
user = await User.prisma().find_unique(where={"email": data.email})
# Always return success to prevent email enumeration
if not user:
return MessageResponse(message="If the email exists, a reset link has been sent")
# Don't allow password reset for OAuth-only users
if user.authProvider not in ("password", "supabase"):
return MessageResponse(message="If the email exists, a reset link has been sent")
# Create reset link and send email
token = await create_password_reset_link(data.email, user.id)
email_service = get_auth_email_service()
email_service.send_password_reset_email(data.email, token)
return MessageResponse(message="If the email exists, a reset link has been sent")
@router.post("/password/set", response_model=MessageResponse)
async def set_password(data: PasswordSetRequest, response: Response):
"""
Set a new password using a reset token.
"""
# Validate token
result = await verify_password_reset_token(data.token)
if not result:
raise HTTPException(status_code=400, detail="Invalid or expired reset token")
user_id, email = result
# Validate password strength
if len(data.password) < 8:
raise HTTPException(
status_code=400, detail="Password must be at least 8 characters"
)
# Update password and verify email (if not already)
password_hash = hash_password(data.password)
await User.prisma().update(
where={"id": user_id},
data={
"passwordHash": password_hash,
"emailVerified": True,
"emailVerifiedAt": datetime.now(timezone.utc),
"authProvider": "password",
"migratedFromSupabase": False, # Clear migration flag
},
)
# Send notification that password was changed
email_service = get_auth_email_service()
email_service.send_password_changed_notification(email)
# Revoke all existing sessions for security
await revoke_all_user_refresh_tokens(user_id)
# Clear any existing cookies
_clear_auth_cookies(response)
return MessageResponse(message="Password updated successfully. Please log in.")
# ============================================================================
# Email Verification
# ============================================================================
@router.get("/verify-email", response_model=MessageResponse)
async def verify_email(token: str = Query(...)):
"""
Verify email address from magic link.
"""
email = await verify_email_token(token)
if not email:
raise HTTPException(status_code=400, detail="Invalid or expired verification link")
# Update user as verified
user = await User.prisma().find_unique(where={"email": email})
if user:
await User.prisma().update(
where={"id": user.id},
data={
"emailVerified": True,
"emailVerifiedAt": datetime.now(timezone.utc),
},
)
return MessageResponse(message="Email verified successfully. You can now log in.")
# ============================================================================
# Google OAuth
# ============================================================================
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo"
def _get_google_config():
"""Get Google OAuth configuration from environment."""
client_id = os.getenv("GOOGLE_CLIENT_ID")
client_secret = os.getenv("GOOGLE_CLIENT_SECRET")
redirect_uri = os.getenv("GOOGLE_REDIRECT_URI", "")
if not client_id or not client_secret:
raise HTTPException(
status_code=503, detail="Google OAuth not configured"
)
return client_id, client_secret, redirect_uri
@router.get("/oauth/google/authorize")
async def google_authorize(
response: Response,
redirect_to: str = Query("/marketplace", description="URL to redirect after auth"),
):
"""
Initiate Google OAuth flow.
Returns the authorization URL to redirect the user to.
"""
client_id, _, redirect_uri = _get_google_config()
# Generate state for CSRF protection
state = secrets.token_urlsafe(32)
# Store state and redirect_to in cookie
secure = _is_production()
response.set_cookie(
key=OAUTH_STATE_COOKIE,
value=f"{state}|{redirect_to}",
httponly=True,
secure=secure,
samesite="lax",
max_age=600, # 10 minutes
)
# Build authorization URL
params = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"scope": "openid email profile",
"state": state,
"access_type": "offline",
"prompt": "consent",
}
auth_url = f"{GOOGLE_AUTH_URL}?{urlencode(params)}"
return {"url": auth_url}
@router.get("/oauth/google/callback")
async def google_callback(
request: Request,
response: Response,
code: str = Query(...),
state: str = Query(...),
):
"""
Handle Google OAuth callback.
Exchanges the authorization code for tokens and creates/updates the user.
"""
client_id, client_secret, redirect_uri = _get_google_config()
# Verify state
stored_state_cookie = request.cookies.get(OAUTH_STATE_COOKIE)
if not stored_state_cookie:
raise HTTPException(status_code=400, detail="Missing OAuth state")
stored_state, redirect_to = stored_state_cookie.split("|", 1)
if state != stored_state:
raise HTTPException(status_code=400, detail="Invalid OAuth state")
# Clear state cookie
response.delete_cookie(key=OAUTH_STATE_COOKIE)
# Exchange code for tokens
async with httpx.AsyncClient() as client:
token_response = await client.post(
GOOGLE_TOKEN_URL,
data={
"client_id": client_id,
"client_secret": client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
},
)
if token_response.status_code != 200:
logger.error(f"Google token exchange failed: {token_response.text}")
raise HTTPException(status_code=400, detail="Failed to exchange code")
tokens = token_response.json()
google_access_token = tokens.get("access_token")
# Get user info
userinfo_response = await client.get(
GOOGLE_USERINFO_URL,
headers={"Authorization": f"Bearer {google_access_token}"},
)
if userinfo_response.status_code != 200:
raise HTTPException(status_code=400, detail="Failed to get user info")
userinfo = userinfo_response.json()
email = userinfo.get("email")
if not email:
raise HTTPException(status_code=400, detail="Email not provided by Google")
# Get or create user
user = await User.prisma().find_unique(where={"email": email})
if user:
# Update existing user if needed
if user.authProvider == "supabase":
await User.prisma().update(
where={"id": user.id},
data={"authProvider": "google"},
)
else:
# Create new user
user = await User.prisma().create(
data={
"id": str(uuid.uuid4()),
"email": email,
"name": userinfo.get("name"),
"emailVerified": True, # Google verifies emails
"emailVerifiedAt": datetime.now(timezone.utc),
"authProvider": "google",
}
)
# Create tokens
role = get_user_role(email)
access_token = create_access_token(user.id, user.email, role)
refresh_token, _ = await create_refresh_token_db(user.id)
# Set cookies
_set_auth_cookies(response, access_token, refresh_token)
# Redirect to frontend
frontend_url = os.getenv("FRONTEND_BASE_URL", "http://localhost:3000")
from fastapi.responses import RedirectResponse
return RedirectResponse(url=f"{frontend_url}{redirect_to}")
# ============================================================================
# Admin Routes
# ============================================================================
@router.get("/admin/users", response_model=AdminUserListResponse)
async def list_users(
request: Request,
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
search: Optional[str] = Query(None, description="Search by email"),
admin_user: User = Depends(require_admin),
):
"""
List all users (admin only).
"""
skip = (page - 1) * page_size
where_clause = {}
if search:
where_clause["email"] = {"contains": search, "mode": "insensitive"}
users = await User.prisma().find_many(
where=where_clause,
skip=skip,
take=page_size,
order={"createdAt": "desc"},
)
total = await User.prisma().count(where=where_clause)
return AdminUserListResponse(
users=[UserResponse.from_db(u, include_role=True) for u in users],
total=total,
page=page,
page_size=page_size,
)
@router.get("/admin/users/{user_id}", response_model=UserResponse)
async def get_user_by_id(
user_id: str,
admin_user: User = Depends(require_admin),
):
"""
Get a specific user by ID (admin only).
"""
user = await User.prisma().find_unique(where={"id": user_id})
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse.from_db(user, include_role=True)
@router.post("/admin/users/{user_id}/impersonate", response_model=ImpersonationResponse)
async def impersonate_user(
request: Request,
user_id: str,
admin_user: User = Depends(require_admin),
):
"""
Get an access token to impersonate a user (admin only).
This token can be used with the Authorization header to act as the user.
All actions are logged for audit purposes.
"""
target_user = await User.prisma().find_unique(where={"id": user_id})
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
# Log the impersonation
logger.warning(
f"ADMIN IMPERSONATION: Admin {admin_user.id} ({admin_user.email}) "
f"generated impersonation token for user {target_user.id} ({target_user.email})"
)
# Create an access token for the target user (but with original role for safety)
# The impersonation is tracked via the audit log
role = get_user_role(target_user.email)
access_token = create_access_token(target_user.id, target_user.email, role)
return ImpersonationResponse(
access_token=access_token,
impersonated_user=UserResponse.from_db(target_user, include_role=True),
expires_in=int(ACCESS_TOKEN_TTL.total_seconds()),
)
@router.post("/admin/users/{user_id}/force-password-reset", response_model=MessageResponse)
async def force_password_reset(
user_id: str,
admin_user: User = Depends(require_admin),
):
"""
Force send a password reset email to a user (admin only).
Useful for helping users who are locked out.
"""
user = await User.prisma().find_unique(where={"id": user_id})
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Create and send password reset
token = await create_password_reset_link(user.email, user.id)
email_service = get_auth_email_service()
email_sent = email_service.send_password_reset_email(user.email, token)
# Log the action
logger.info(
f"Admin {admin_user.id} ({admin_user.email}) "
f"triggered password reset for user {user.id} ({user.email})"
)
if email_sent:
return MessageResponse(message=f"Password reset email sent to {user.email}")
else:
return MessageResponse(message="Email service unavailable, reset link logged")
@router.post("/admin/users/{user_id}/revoke-sessions", response_model=MessageResponse)
async def revoke_user_sessions(
user_id: str,
admin_user: User = Depends(require_admin),
):
"""
Revoke all sessions for a user (admin only).
Useful for security incidents.
"""
user = await User.prisma().find_unique(where={"id": user_id})
if not user:
raise HTTPException(status_code=404, detail="User not found")
count = await revoke_all_user_refresh_tokens(user_id)
# Log the action
logger.warning(
f"Admin {admin_user.id} ({admin_user.email}) "
f"revoked all sessions for user {user.id} ({user.email}). "
f"Revoked {count} refresh tokens."
)
return MessageResponse(message=f"Revoked {count} sessions for user {user.email}")

View File

@@ -308,6 +308,16 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="The email address to use for sending emails",
)
# Admin configuration for native auth
admin_email_domains: str = Field(
default="agpt.co",
description="Comma-separated list of email domains that grant admin role (e.g., 'agpt.co,autogpt.com')",
)
admin_emails: str = Field(
default="",
description="Comma-separated list of specific email addresses that grant admin role",
)
use_agent_image_generation_v2: bool = Field(
default=True,
description="Whether to use the new agent image generation service",

View File

@@ -25,6 +25,12 @@ model User {
stripeCustomerId String?
topUpConfig Json?
// Native auth fields (for migration from Supabase)
passwordHash String? // NULL for OAuth users or migrated users awaiting password reset
authProvider String @default("supabase") // "password", "google", "supabase" (legacy)
migratedFromSupabase Boolean @default(false)
emailVerifiedAt DateTime?
maxEmailsPerDay Int @default(3)
notifyOnAgentRun Boolean @default(true)
notifyOnZeroBalance Boolean @default(true)
@@ -67,6 +73,10 @@ model User {
OAuthAuthorizationCodes OAuthAuthorizationCode[]
OAuthAccessTokens OAuthAccessToken[]
OAuthRefreshTokens OAuthRefreshToken[]
// Native auth relations
UserAuthRefreshTokens UserAuthRefreshToken[]
UserAuthMagicLinks UserAuthMagicLink[]
}
enum OnboardingStep {
@@ -1092,3 +1102,47 @@ model OAuthRefreshToken {
@@index([userId, applicationId])
@@index([expiresAt]) // For cleanup
}
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
////////////// NATIVE AUTH TABLES /////////////////////
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
// Refresh tokens for native authentication (30 days TTL)
model UserAuthRefreshToken {
id String @id @default(uuid())
tokenHash String @unique // SHA256 hash of plaintext token
createdAt DateTime @default(now())
expiresAt DateTime // Now + 30 days
userId String
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
revokedAt DateTime? // Set when token is revoked
@@index([tokenHash]) // For token lookup
@@index([userId])
@@index([expiresAt]) // For cleanup
}
// Magic links for email verification and password reset
model UserAuthMagicLink {
id String @id @default(uuid())
tokenHash String @unique // SHA256 hash of plaintext token
createdAt DateTime @default(now())
expiresAt DateTime
email String
purpose String // "email_verification" or "password_reset"
// For password reset, track which user this is for
userId String?
User User? @relation(fields: [userId], references: [id], onDelete: Cascade)
usedAt DateTime? // Set when link is consumed (single-use)
@@index([tokenHash]) // For token lookup
@@index([email, purpose])
@@index([expiresAt]) // For cleanup
}

View File

@@ -0,0 +1,798 @@
# Migrating from Supabase Auth to Native FastAPI Auth
This guide covers the complete migration from Supabase Auth to native FastAPI authentication.
## Overview
The migration replaces Supabase Auth with a native FastAPI implementation while:
- Maintaining the same JWT format so existing sessions remain valid
- Keeping the frontend interface identical (no component changes needed)
- Supporting both password and Google OAuth authentication
- Providing admin impersonation and user management
## Prerequisites
- Access to the production database
- Postmark API credentials configured
- Google OAuth credentials (if using Google sign-in)
- Ability to deploy backend and frontend changes
---
## Phase 1: Backend Setup
### 1.1 Install Dependencies
```bash
cd autogpt_platform/backend
poetry add argon2-cffi
```
### 1.2 Run Database Migration
Create and apply the Prisma migration:
```bash
cd autogpt_platform/backend
poetry run prisma migrate dev --name add_native_auth
```
This adds:
- `passwordHash`, `authProvider`, `migratedFromSupabase`, `emailVerifiedAt` fields to `User`
- `UserAuthRefreshToken` table for session management
- `UserAuthMagicLink` table for email verification and password reset
### 1.3 Configure Environment Variables
Add to your `.env` file:
```bash
# Admin Configuration
ADMIN_EMAIL_DOMAINS=agpt.co,autogpt.com
ADMIN_EMAILS=specific-admin@example.com
# Google OAuth (if using)
GOOGLE_CLIENT_ID=your-google-client-id
GOOGLE_CLIENT_SECRET=your-google-client-secret
GOOGLE_REDIRECT_URI=https://your-domain.com/api/auth/oauth/google/callback
# Frontend URL for redirects
FRONTEND_BASE_URL=https://your-domain.com
# Postmark (should already be configured)
POSTMARK_SERVER_API_TOKEN=your-postmark-token
POSTMARK_SENDER_EMAIL=noreply@your-domain.com
```
### 1.4 Deploy Backend
Deploy the backend with the new auth endpoints. The new endpoints are:
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/auth/signup` | POST | Register new user |
| `/api/auth/login` | POST | Login with email/password |
| `/api/auth/logout` | POST | Logout |
| `/api/auth/refresh` | POST | Refresh access token |
| `/api/auth/me` | GET | Get current user |
| `/api/auth/password/reset` | POST | Request password reset |
| `/api/auth/password/set` | POST | Set new password |
| `/api/auth/verify-email` | GET | Verify email from link |
| `/api/auth/oauth/google/authorize` | GET | Start Google OAuth |
| `/api/auth/oauth/google/callback` | GET | Google OAuth callback |
---
## Phase 2: User Migration
### 2.1 Check Migration Status
```bash
cd autogpt_platform/backend
poetry run python -m backend.data.auth.migration --status
```
This shows:
```
Migration Status:
----------------------------------------
Total users: 10000
Already using native auth: 0
OAuth users (Google): 1500
Migrated, pending password: 0
Not yet migrated: 8500
```
### 2.2 Generate Pre-Migration Report
```bash
poetry run python -m backend.data.auth.migration --report
```
This creates a CSV file with all users and their current status.
### 2.3 Dry Run
Test the migration without making changes:
```bash
poetry run python -m backend.data.auth.migration --dry-run --full-migration
```
### 2.4 Run Migration (Mark Users)
Mark all existing Supabase users as migrated:
```bash
poetry run python -m backend.data.auth.migration --mark-migrated --batch-size 500
```
### 2.5 Send Password Reset Emails
Send emails to users who need to set their password:
```bash
# Start with a small batch to verify emails work
poetry run python -m backend.data.auth.migration --send-emails --batch-size 10
# Then send to everyone
poetry run python -m backend.data.auth.migration --send-emails --batch-size 100 --email-delay 0.5
```
**Note:** OAuth users (Google) are automatically skipped - they continue using Google sign-in.
---
## Phase 3: Frontend Migration
The frontend uses a Supabase client abstraction layer. We need to replace the internals while keeping the interface identical.
### 3.1 Understanding the Architecture
The frontend has these Supabase-related files:
```
src/lib/supabase/
├── actions.ts # Server actions (validateSession, logout, etc.)
├── middleware.ts # Next.js middleware for session validation
├── helpers.ts # Utility functions
├── server/
│ └── getServerSupabase.ts # Server-side Supabase client
└── hooks/
├── helpers.ts # Client-side helpers
├── useSupabase.ts # Main auth hook
└── useSupabaseStore.ts # Zustand store for auth state
```
### 3.2 Option A: Gradual Migration (Recommended)
Keep Supabase running during migration and gradually switch endpoints.
#### Step 1: Create Native Auth Client
The native auth client is already created at `src/lib/auth/native-auth.ts`. It provides:
```typescript
// Client-side functions
getAccessToken() // Get token from cookie
isAuthenticated() // Check if user is authenticated
getCurrentUserFromToken() // Parse user from JWT
// Server-side functions (for server actions)
serverLogin(email, password)
serverSignup(email, password)
serverLogout(scope)
serverRefreshToken()
serverGetCurrentUser()
serverRequestPasswordReset(email)
serverSetPassword(token, password)
serverGetGoogleAuthUrl(redirectTo)
```
#### Step 2: Update Login Action
Edit `src/app/(platform)/login/actions.ts`:
```typescript
"use server";
import { serverLogin } from "@/lib/auth/native-auth";
import { loginFormSchema } from "@/types/auth";
import * as Sentry from "@sentry/nextjs";
import BackendAPI from "@/lib/autogpt-server-api";
import { shouldShowOnboarding } from "../../api/helpers";
export async function login(email: string, password: string) {
try {
const parsed = loginFormSchema.safeParse({ email, password });
if (!parsed.success) {
return {
success: false,
error: "Invalid email or password",
};
}
const result = await serverLogin(parsed.data.email, parsed.data.password);
if (!result.success) {
return {
success: false,
error: result.error || "Login failed",
};
}
// Create user in backend if needed
const api = new BackendAPI();
await api.createUser();
const onboarding = await shouldShowOnboarding();
return {
success: true,
onboarding,
};
} catch (err) {
Sentry.captureException(err);
return {
success: false,
error: "Failed to login. Please try again.",
};
}
}
```
#### Step 3: Update Signup Action
Edit `src/app/(platform)/signup/actions.ts`:
```typescript
"use server";
import { serverSignup } from "@/lib/auth/native-auth";
import { signupFormSchema } from "@/types/auth";
import * as Sentry from "@sentry/nextjs";
export async function signup(
email: string,
password: string,
confirmPassword: string,
agreeToTerms: boolean,
) {
try {
const parsed = signupFormSchema.safeParse({
email,
password,
confirmPassword,
agreeToTerms,
});
if (!parsed.success) {
return {
success: false,
error: "Invalid signup payload",
};
}
const result = await serverSignup(parsed.data.email, parsed.data.password);
if (!result.success) {
if (result.error === "Email already registered") {
return { success: false, error: "user_already_exists" };
}
return {
success: false,
error: result.error || "Signup failed",
};
}
// User needs to verify email before logging in
return {
success: true,
message: result.message,
requiresVerification: true,
};
} catch (err) {
Sentry.captureException(err);
return {
success: false,
error: "Failed to sign up. Please try again.",
};
}
}
```
#### Step 4: Update Server Actions
Edit `src/lib/supabase/actions.ts`:
```typescript
"use server";
import * as Sentry from "@sentry/nextjs";
import { revalidatePath } from "next/cache";
import { cookies } from "next/headers";
import { getRedirectPath } from "./helpers";
import {
serverGetCurrentUser,
serverLogout as nativeLogout,
serverRefreshToken,
} from "@/lib/auth/native-auth";
// User type compatible with existing code
interface User {
id: string;
email: string;
role?: string;
}
export interface SessionValidationResult {
user: User | null;
isValid: boolean;
redirectPath?: string;
}
export async function validateSession(
currentPath: string,
): Promise<SessionValidationResult> {
return await Sentry.withServerActionInstrumentation(
"validateSession",
{},
async () => {
try {
const { user, error } = await serverGetCurrentUser();
if (error || !user) {
const redirectPath = getRedirectPath(currentPath);
return {
user: null,
isValid: false,
redirectPath: redirectPath || undefined,
};
}
return {
user: {
id: user.id,
email: user.email,
role: user.role,
},
isValid: true,
};
} catch (error) {
console.error("Session validation error:", error);
const redirectPath = getRedirectPath(currentPath);
return {
user: null,
isValid: false,
redirectPath: redirectPath || undefined,
};
}
},
);
}
export async function getCurrentUser(): Promise<{
user: User | null;
error?: string;
}> {
return await Sentry.withServerActionInstrumentation(
"getCurrentUser",
{},
async () => {
try {
const { user, error } = await serverGetCurrentUser();
if (error) {
return { user: null, error };
}
if (!user) {
return { user: null };
}
return {
user: {
id: user.id,
email: user.email,
role: user.role,
}
};
} catch (error) {
console.error("Get current user error:", error);
return {
user: null,
error: error instanceof Error ? error.message : "Unknown error",
};
}
},
);
}
export async function getWebSocketToken(): Promise<{
token: string | null;
error?: string;
}> {
return await Sentry.withServerActionInstrumentation(
"getWebSocketToken",
{},
async () => {
try {
// Get access token from cookie
const cookieStore = await cookies();
const token = cookieStore.get("access_token")?.value;
return { token: token || null };
} catch (error) {
console.error("Get WebSocket token error:", error);
return {
token: null,
error: error instanceof Error ? error.message : "Unknown error",
};
}
},
);
}
export type ServerLogoutOptions = {
globalLogout?: boolean;
};
export async function serverLogout(options: ServerLogoutOptions = {}) {
return await Sentry.withServerActionInstrumentation(
"serverLogout",
{},
async () => {
try {
const scope = options.globalLogout ? "global" : "local";
const result = await nativeLogout(scope);
revalidatePath("/");
if (!result.success) {
console.error("Error logging out:", result.error);
return { success: false, error: result.error };
}
revalidatePath("/", "layout");
return { success: true };
} catch (error) {
console.error("Logout error:", error);
return {
success: false,
error: error instanceof Error ? error.message : "Unknown error",
};
}
},
);
}
export async function refreshSession() {
return await Sentry.withServerActionInstrumentation(
"refreshSession",
{},
async () => {
try {
const result = await serverRefreshToken();
if (!result.success || !result.user) {
return {
user: null,
error: result.error,
};
}
revalidatePath("/", "layout");
return {
user: {
id: result.user.id,
email: result.user.email,
role: result.user.role,
}
};
} catch (error) {
console.error("Refresh session error:", error);
return {
user: null,
error: error instanceof Error ? error.message : "Unknown error",
};
}
},
);
}
```
#### Step 5: Update Middleware
Edit `src/lib/supabase/middleware.ts`:
```typescript
import { NextResponse, type NextRequest } from "next/server";
import { isAdminPage, isProtectedPage } from "./helpers";
export async function updateSession(request: NextRequest) {
let response = NextResponse.next({ request });
const accessToken = request.cookies.get("access_token")?.value;
// Parse JWT to get user info (without verification - backend will verify)
let user = null;
let userRole = null;
if (accessToken) {
try {
const payload = JSON.parse(
Buffer.from(accessToken.split(".")[1], "base64").toString()
);
// Check if token is expired
if (payload.exp && Date.now() / 1000 < payload.exp) {
user = { id: payload.sub, email: payload.email };
userRole = payload.role;
}
} catch (e) {
// Invalid token format
console.error("Failed to parse access token:", e);
}
}
const url = request.nextUrl.clone();
const pathname = request.nextUrl.pathname;
// AUTH REDIRECTS
// 1. Check if user is not authenticated but trying to access protected content
if (!user) {
const attemptingProtectedPage = isProtectedPage(pathname);
const attemptingAdminPage = isAdminPage(pathname);
if (attemptingProtectedPage || attemptingAdminPage) {
const currentDest = url.pathname + url.search;
url.pathname = "/login";
url.search = `?next=${encodeURIComponent(currentDest)}`;
return NextResponse.redirect(url);
}
}
// 2. Check if user is authenticated but lacks admin role when accessing admin pages
if (user && userRole !== "admin" && isAdminPage(pathname)) {
url.pathname = "/marketplace";
return NextResponse.redirect(url);
}
return response;
}
```
#### Step 6: Update OAuth Callback
Edit `src/app/(platform)/auth/callback/route.ts`:
```typescript
import BackendAPI from "@/lib/autogpt-server-api";
import { NextResponse } from "next/server";
import { revalidatePath } from "next/cache";
import { shouldShowOnboarding } from "@/app/api/helpers";
// This route now just handles the redirect after OAuth
// The actual OAuth callback is handled by the backend at /api/auth/oauth/google/callback
export async function GET(request: Request) {
const { searchParams, origin } = new URL(request.url);
// Check if user is now authenticated (cookie should be set by backend)
const cookies = request.headers.get("cookie") || "";
const hasAccessToken = cookies.includes("access_token=");
if (!hasAccessToken) {
return NextResponse.redirect(`${origin}/auth/auth-code-error`);
}
let next = "/marketplace";
try {
const api = new BackendAPI();
await api.createUser();
if (await shouldShowOnboarding()) {
next = "/onboarding";
revalidatePath("/onboarding", "layout");
} else {
revalidatePath("/", "layout");
}
} catch (createUserError) {
console.error("Error creating user:", createUserError);
return NextResponse.redirect(`${origin}/error?message=user-creation-failed`);
}
// Get redirect destination from 'next' query parameter
next = searchParams.get("next") || next;
const forwardedHost = request.headers.get("x-forwarded-host");
const isLocalEnv = process.env.NODE_ENV === "development";
if (isLocalEnv) {
return NextResponse.redirect(`${origin}${next}`);
} else if (forwardedHost) {
return NextResponse.redirect(`https://${forwardedHost}${next}`);
} else {
return NextResponse.redirect(`${origin}${next}`);
}
}
```
#### Step 7: Update Google OAuth Provider Route
Edit `src/app/api/auth/provider/route.ts`:
```typescript
import { NextResponse } from "next/server";
import { serverGetGoogleAuthUrl } from "@/lib/auth/native-auth";
export async function POST(request: Request) {
try {
const body = await request.json();
const { provider, redirectTo } = body;
if (provider !== "google") {
return NextResponse.json(
{ error: "Unsupported provider" },
{ status: 400 }
);
}
const result = await serverGetGoogleAuthUrl(redirectTo || "/marketplace");
if (result.error) {
return NextResponse.json(
{ error: result.error },
{ status: 500 }
);
}
return NextResponse.json({ url: result.url });
} catch (error) {
console.error("OAuth provider error:", error);
return NextResponse.json(
{ error: "Failed to initialize OAuth" },
{ status: 500 }
);
}
}
```
### 3.3 Option B: Big Bang Migration
Replace all Supabase references at once. Higher risk but faster.
1. Apply all the changes from Option A simultaneously
2. Remove `@supabase/ssr` and `@supabase/supabase-js` dependencies
3. Delete old Supabase configuration
---
## Phase 4: Cutover
### 4.1 Pre-Cutover Checklist
- [ ] Backend deployed with new auth endpoints
- [ ] Database migration applied
- [ ] Environment variables configured
- [ ] Postmark email templates verified
- [ ] Google OAuth redirect URIs updated
- [ ] Frontend changes tested in staging
### 4.2 Cutover Steps
1. **Deploy frontend changes**
2. **Verify login/signup works**
3. **Verify Google OAuth works**
4. **Verify password reset works**
5. **Run user migration script** (if not already done)
### 4.3 Rollback Plan
If issues occur:
1. Revert frontend to use Supabase client
2. Supabase Auth remains functional (keep it running for 30 days)
3. Users can still login via Supabase during rollback
---
## Phase 5: Cleanup (After 30 Days)
Once migration is stable:
1. **Remove Supabase dependencies from frontend**
```bash
cd autogpt_platform/frontend
pnpm remove @supabase/ssr @supabase/supabase-js
```
2. **Remove Supabase environment variables**
- `NEXT_PUBLIC_SUPABASE_URL`
- `NEXT_PUBLIC_SUPABASE_ANON_KEY`
- `SUPABASE_URL`
- `SUPABASE_JWT_SECRET` (keep if using same key)
3. **Delete old Supabase files**
- `src/lib/supabase/server/getServerSupabase.ts`
- Any remaining Supabase-specific code
4. **Cancel Supabase subscription** (if applicable)
---
## Troubleshooting
### Users Can't Login
1. Check if user is marked as migrated: `migratedFromSupabase = true`
2. Check if password reset email was sent
3. Verify Postmark is configured correctly
### OAuth Not Working
1. Verify Google OAuth credentials in environment
2. Check redirect URI matches exactly
3. Look for errors in backend logs
### Token Issues
1. Ensure `JWT_VERIFY_KEY` matches the old `SUPABASE_JWT_SECRET`
2. Check token expiration
3. Verify audience claim is "authenticated"
### Admin Access Issues
1. Verify email is in `ADMIN_EMAIL_DOMAINS` or `ADMIN_EMAILS`
2. Check JWT role claim is "admin"
3. User may need to re-login to get new token with updated role
---
## API Reference
### Authentication Endpoints
| Endpoint | Method | Auth | Description |
|----------|--------|------|-------------|
| `/api/auth/signup` | POST | - | Register new user |
| `/api/auth/login` | POST | - | Login, returns tokens |
| `/api/auth/logout` | POST | Token | Logout |
| `/api/auth/refresh` | POST | Cookie | Refresh access token |
| `/api/auth/me` | GET | Token | Get current user |
| `/api/auth/password/reset` | POST | - | Request reset email |
| `/api/auth/password/set` | POST | - | Set new password |
| `/api/auth/verify-email` | GET | - | Verify email |
| `/api/auth/oauth/google/authorize` | GET | - | Get Google OAuth URL |
| `/api/auth/oauth/google/callback` | GET | - | OAuth callback |
### Admin Endpoints
| Endpoint | Method | Auth | Description |
|----------|--------|------|-------------|
| `/api/auth/admin/users` | GET | Admin | List users |
| `/api/auth/admin/users/{id}` | GET | Admin | Get user details |
| `/api/auth/admin/users/{id}/impersonate` | POST | Admin | Get impersonation token |
| `/api/auth/admin/users/{id}/force-password-reset` | POST | Admin | Force password reset |
| `/api/auth/admin/users/{id}/revoke-sessions` | POST | Admin | Revoke all sessions |
### Cookie Structure
| Cookie | HttpOnly | Path | Purpose |
|--------|----------|------|---------|
| `access_token` | No | `/` | JWT for API auth |
| `refresh_token` | Yes | `/api/auth/refresh` | Session refresh |
### JWT Claims
```json
{
"sub": "user-uuid",
"email": "user@example.com",
"role": "authenticated",
"aud": "authenticated",
"iat": 1234567890,
"exp": 1234571490
}
```

View File

@@ -0,0 +1,311 @@
/**
* Native authentication client for FastAPI backend.
*
* This module provides authentication functions that communicate with the
* FastAPI backend instead of Supabase, while maintaining interface compatibility.
*/
import { environment } from "@/services/environment";
// User type compatible with Supabase User for minimal frontend changes
export interface User {
id: string;
email: string;
email_verified: boolean;
name?: string;
created_at: string;
role?: string;
}
export interface AuthResponse {
user: User;
access_token: string;
refresh_token: string;
expires_in: number;
token_type: string;
}
export interface MessageResponse {
message: string;
}
// Cookie names (must match backend)
const ACCESS_TOKEN_COOKIE = "access_token";
function getApiUrl(): string {
return environment.getAGPTServerBaseUrl();
}
/**
* Parse JWT token without verification (for client-side use only).
* The backend always verifies tokens - this is just for UI purposes.
*/
export function parseJwt(token: string): Record<string, unknown> | null {
try {
const base64Url = token.split(".")[1];
const base64 = base64Url.replace(/-/g, "+").replace(/_/g, "/");
const jsonPayload = decodeURIComponent(
atob(base64)
.split("")
.map((c) => "%" + ("00" + c.charCodeAt(0).toString(16)).slice(-2))
.join(""),
);
return JSON.parse(jsonPayload);
} catch {
return null;
}
}
/**
* Get access token from cookie.
*/
export function getAccessToken(): string | null {
if (typeof document === "undefined") return null;
const cookies = document.cookie.split(";");
for (const cookie of cookies) {
const [name, value] = cookie.trim().split("=");
if (name === ACCESS_TOKEN_COOKIE) {
return value;
}
}
return null;
}
/**
* Check if user is authenticated based on cookie presence and token validity.
*/
export function isAuthenticated(): boolean {
const token = getAccessToken();
if (!token) return false;
const payload = parseJwt(token);
if (!payload) return false;
// Check expiration
const exp = payload.exp as number;
if (exp && Date.now() / 1000 > exp) return false;
return true;
}
/**
* Get current user from access token (client-side only).
* For server-side, use the server action.
*/
export function getCurrentUserFromToken(): User | null {
const token = getAccessToken();
if (!token) return null;
const payload = parseJwt(token);
if (!payload) return null;
// Check expiration
const exp = payload.exp as number;
if (exp && Date.now() / 1000 > exp) return null;
return {
id: payload.sub as string,
email: payload.email as string,
email_verified: true, // If token is valid, email is verified
role: payload.role as string,
created_at: new Date().toISOString(), // Not available in token
};
}
// ============================================================================
// Server-side API calls (for use in server actions)
// ============================================================================
export async function serverLogin(
email: string,
password: string,
): Promise<{ success: boolean; error?: string; user?: User }> {
try {
const response = await fetch(`${getApiUrl()}/api/auth/login`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ email, password }),
credentials: "include",
});
if (!response.ok) {
const error = await response.json();
return { success: false, error: error.detail || "Login failed" };
}
const data: AuthResponse = await response.json();
return { success: true, user: data.user };
} catch (error) {
console.error("Login error:", error);
return { success: false, error: "Network error" };
}
}
export async function serverSignup(
email: string,
password: string,
): Promise<{ success: boolean; error?: string; message?: string }> {
try {
const response = await fetch(`${getApiUrl()}/api/auth/signup`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ email, password }),
credentials: "include",
});
if (!response.ok) {
const error = await response.json();
return { success: false, error: error.detail || "Signup failed" };
}
const data: MessageResponse = await response.json();
return { success: true, message: data.message };
} catch (error) {
console.error("Signup error:", error);
return { success: false, error: "Network error" };
}
}
export async function serverLogout(
scope: "local" | "global" = "local",
): Promise<{ success: boolean; error?: string }> {
try {
const response = await fetch(`${getApiUrl()}/api/auth/logout?scope=${scope}`, {
method: "POST",
credentials: "include",
});
if (!response.ok) {
const error = await response.json();
return { success: false, error: error.detail || "Logout failed" };
}
return { success: true };
} catch (error) {
console.error("Logout error:", error);
return { success: false, error: "Network error" };
}
}
export async function serverRefreshToken(): Promise<{
success: boolean;
error?: string;
user?: User;
}> {
try {
const response = await fetch(`${getApiUrl()}/api/auth/refresh`, {
method: "POST",
credentials: "include",
});
if (!response.ok) {
const error = await response.json();
return { success: false, error: error.detail || "Refresh failed" };
}
const data: AuthResponse = await response.json();
return { success: true, user: data.user };
} catch (error) {
console.error("Refresh error:", error);
return { success: false, error: "Network error" };
}
}
export async function serverGetCurrentUser(): Promise<{
user: User | null;
error?: string;
}> {
try {
const response = await fetch(`${getApiUrl()}/api/auth/me`, {
method: "GET",
credentials: "include",
});
if (!response.ok) {
if (response.status === 401) {
return { user: null };
}
const error = await response.json();
return { user: null, error: error.detail || "Failed to get user" };
}
const user: User = await response.json();
return { user };
} catch (error) {
console.error("Get current user error:", error);
return { user: null, error: "Network error" };
}
}
export async function serverRequestPasswordReset(
email: string,
): Promise<{ success: boolean; error?: string; message?: string }> {
try {
const response = await fetch(`${getApiUrl()}/api/auth/password/reset`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ email }),
});
if (!response.ok) {
const error = await response.json();
return { success: false, error: error.detail || "Reset request failed" };
}
const data: MessageResponse = await response.json();
return { success: true, message: data.message };
} catch (error) {
console.error("Password reset request error:", error);
return { success: false, error: "Network error" };
}
}
export async function serverSetPassword(
token: string,
password: string,
): Promise<{ success: boolean; error?: string; message?: string }> {
try {
const response = await fetch(`${getApiUrl()}/api/auth/password/set`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ token, password }),
});
if (!response.ok) {
const error = await response.json();
return { success: false, error: error.detail || "Password set failed" };
}
const data: MessageResponse = await response.json();
return { success: true, message: data.message };
} catch (error) {
console.error("Set password error:", error);
return { success: false, error: "Network error" };
}
}
export async function serverGetGoogleAuthUrl(
redirectTo: string = "/marketplace",
): Promise<{ url?: string; error?: string }> {
try {
const response = await fetch(
`${getApiUrl()}/api/auth/oauth/google/authorize?redirect_to=${encodeURIComponent(redirectTo)}`,
{
method: "GET",
credentials: "include",
},
);
if (!response.ok) {
const error = await response.json();
return { error: error.detail || "Failed to get OAuth URL" };
}
const data = await response.json();
return { url: data.url };
} catch (error) {
console.error("Get Google auth URL error:", error);
return { error: "Network error" };
}
}