This commit is contained in:
Swifty
2025-12-20 19:16:22 +01:00
parent 974c14a7b9
commit 87e3d7eaad
8 changed files with 434 additions and 40 deletions

View File

@@ -0,0 +1,286 @@
"""
Verification script to check scheduler data integrity after native auth migration.
This script verifies that all scheduled jobs reference valid users in the platform.User table.
It can also clean up orphaned schedules (schedules for users that no longer exist).
Usage:
cd backend
poetry run python scripts/verify_scheduler_data.py [options]
Options:
--dry-run Preview what would be cleaned up without making changes
--cleanup Actually remove orphaned schedules
--database-url <url> Database URL (overrides DATABASE_URL env var)
Examples:
# Check for orphaned schedules (read-only)
poetry run python scripts/verify_scheduler_data.py
# Preview cleanup
poetry run python scripts/verify_scheduler_data.py --dry-run
# Actually clean up orphaned schedules
poetry run python scripts/verify_scheduler_data.py --cleanup
Prerequisites:
- Database must be accessible
- Scheduler service must be running (for cleanup operations)
"""
import argparse
import asyncio
import logging
import os
import pickle
import sys
from datetime import datetime
from urllib.parse import parse_qs, urlparse, urlunparse, urlencode
from prisma import Prisma
from sqlalchemy import create_engine, text, MetaData
# Configure timestamped INFO-level output so script progress is visible
# when run from the command line; all functions below log through `logger`.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def _extract_schema_from_url(database_url: str) -> tuple[str, str]:
"""Extract schema from DATABASE_URL and return (schema, clean_url)."""
parsed_url = urlparse(database_url)
query_params = parse_qs(parsed_url.query)
schema_list = query_params.pop("schema", None)
schema = schema_list[0] if schema_list else "public"
new_query = urlencode(query_params, doseq=True)
new_parsed_url = parsed_url._replace(query=new_query)
database_url_clean = str(urlunparse(new_parsed_url))
return schema, database_url_clean
async def get_all_user_ids(db: Prisma) -> set[str]:
    """Return the set of every user ID present in the platform.User table."""
    records = await db.user.find_many(select={"id": True})
    return {record.id for record in records}
def get_scheduler_jobs(db_url: str, schema: str) -> list[dict]:
    """Fetch all graph-execution jobs stored in the apscheduler_jobs table.

    Args:
        db_url: SQLAlchemy-compatible database URL (schema param removed).
        schema: Postgres schema that contains the apscheduler_jobs table.

    Returns:
        A list of dicts carrying the job id plus user/graph/cron/agent fields
        extracted from each job's pickled state. Jobs without a ``user_id``
        kwarg (non graph-execution jobs) are skipped, as are jobs whose
        state cannot be unpickled.
    """
    engine = create_engine(db_url)
    jobs: list[dict] = []
    try:
        with engine.connect() as conn:
            # Bail out gracefully if the scheduler never created its table.
            result = conn.execute(
                text(
                    """
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables
                        WHERE table_schema = :schema
                        AND table_name = 'apscheduler_jobs'
                    )
                    """
                ),
                {"schema": schema},
            )
            if not result.scalar():
                logger.warning(
                    f"Table {schema}.apscheduler_jobs does not exist. "
                    "Scheduler may not have been initialized yet."
                )
                return []

            # Schema names cannot be bound parameters; quote the identifier
            # rather than interpolating it bare into the statement.
            result = conn.execute(
                text(f'SELECT id, job_state FROM "{schema}"."apscheduler_jobs"')
            )
            for row in result:
                job_id = row[0]
                job_state = row[1]
                try:
                    # APScheduler stores job state as pickled data.
                    # NOTE(security): pickle.loads executes arbitrary code if
                    # job_state were attacker-controlled; acceptable here only
                    # because this table is written solely by our scheduler.
                    job_data = pickle.loads(job_state)
                    kwargs = job_data.get("kwargs", {})
                    # Only graph-execution jobs carry a user_id kwarg.
                    if "user_id" in kwargs:
                        jobs.append(
                            {
                                "id": job_id,
                                "user_id": kwargs.get("user_id"),
                                "graph_id": kwargs.get("graph_id"),
                                "graph_version": kwargs.get("graph_version"),
                                "cron": kwargs.get("cron"),
                                "agent_name": kwargs.get("agent_name"),
                            }
                        )
                except Exception as e:
                    logger.warning(f"Failed to parse job {job_id}: {e}")
    finally:
        # One-shot script: release the connection pool explicitly.
        engine.dispose()
    return jobs
async def verify_scheduler_data(
    db: Prisma, db_url: str, schema: str
) -> tuple[list[dict], list[dict]]:
    """
    Verify scheduler data integrity.

    Returns:
        Tuple of (valid_jobs, orphaned_jobs)
    """
    logger.info("Fetching all users from platform.User...")
    known_users = await get_all_user_ids(db)
    logger.info(f"Found {len(known_users)} users in platform.User")

    logger.info("Fetching scheduled jobs from apscheduler_jobs...")
    all_jobs = get_scheduler_jobs(db_url, schema)
    logger.info(f"Found {len(all_jobs)} scheduled graph execution jobs")

    # Partition jobs by whether their owning user still exists.
    valid_jobs = [job for job in all_jobs if job["user_id"] in known_users]
    orphaned_jobs = [job for job in all_jobs if job["user_id"] not in known_users]
    return valid_jobs, orphaned_jobs
async def cleanup_orphaned_schedules(orphaned_jobs: list[dict], db_url: str, schema: str):
    """Delete the given orphaned schedules from the apscheduler_jobs table.

    Args:
        orphaned_jobs: Job dicts (as returned by get_scheduler_jobs) whose
            user_id no longer exists in platform.User.
        db_url: SQLAlchemy-compatible database URL.
        schema: Postgres schema containing the apscheduler_jobs table.

    Deletions are attempted per-job so a single failure does not abort the
    rest; everything that succeeded is committed in one transaction.
    """
    if not orphaned_jobs:
        logger.info("No orphaned schedules to clean up")
        return
    engine = create_engine(db_url)
    try:
        with engine.connect() as conn:
            for job in orphaned_jobs:
                try:
                    # Schema names cannot be bound parameters; quote the
                    # identifier instead of interpolating it bare.
                    conn.execute(
                        text(
                            f'DELETE FROM "{schema}"."apscheduler_jobs" '
                            "WHERE id = :job_id"
                        ),
                        {"job_id": job["id"]},
                    )
                    logger.info(
                        f"Deleted orphaned schedule {job['id']} "
                        f"(user: {job['user_id']}, graph: {job['graph_id']})"
                    )
                except Exception as e:
                    logger.error(f"Failed to delete schedule {job['id']}: {e}")
            conn.commit()
    finally:
        # One-shot script: release the connection pool explicitly.
        engine.dispose()
    logger.info(f"Cleaned up {len(orphaned_jobs)} orphaned schedules")
async def main(dry_run: bool = False, cleanup: bool = False):
    """Run the verification.

    Args:
        dry_run: When True, only report what would be removed.
        cleanup: When True (and dry_run is False), delete orphaned schedules.
    """
    logger.info("=" * 60)
    logger.info("Scheduler Data Verification Script")
    # dry_run takes precedence over cleanup when both flags are supplied.
    if dry_run:
        logger.info(">>> DRY RUN MODE - No changes will be made <<<")
    elif cleanup:
        logger.info(">>> CLEANUP MODE - Orphaned schedules will be removed <<<")
    else:
        logger.info(">>> VERIFY MODE - Read-only check <<<")
    logger.info("=" * 60)
    logger.info(f"Started at: {datetime.now().isoformat()}")
    # Get database URL; DIRECT_URL (unpooled connection) wins when both are set.
    db_url = os.getenv("DIRECT_URL") or os.getenv("DATABASE_URL")
    if not db_url:
        logger.error("DATABASE_URL or DIRECT_URL environment variable not set")
        sys.exit(1)
    # The Prisma-style ?schema= parameter is split off because SQLAlchemy
    # does not understand it.
    schema, clean_db_url = _extract_schema_from_url(db_url)
    logger.info(f"Using schema: {schema}")
    db = Prisma()
    await db.connect()
    try:
        valid_jobs, orphaned_jobs = await verify_scheduler_data(db, clean_db_url, schema)
        # Report results
        logger.info("\n--- Verification Results ---")
        logger.info(f"Valid scheduled jobs: {len(valid_jobs)}")
        logger.info(f"Orphaned scheduled jobs: {len(orphaned_jobs)}")
        if orphaned_jobs:
            logger.warning("\n--- Orphaned Schedules (users not in platform.User) ---")
            for job in orphaned_jobs:
                logger.warning(
                    f" Schedule ID: {job['id']}\n"
                    f" User ID: {job['user_id']}\n"
                    f" Graph ID: {job['graph_id']}\n"
                    f" Cron: {job['cron']}\n"
                    f" Agent: {job['agent_name'] or 'N/A'}"
                )
            # Only act when --cleanup was requested without --dry-run.
            if cleanup and not dry_run:
                logger.info("\n--- Cleaning up orphaned schedules ---")
                await cleanup_orphaned_schedules(orphaned_jobs, clean_db_url, schema)
            elif dry_run:
                logger.info(
                    f"\n[DRY RUN] Would delete {len(orphaned_jobs)} orphaned schedules"
                )
            else:
                logger.info(
                    "\nTo clean up orphaned schedules, run with --cleanup flag"
                )
        else:
            logger.info("\n✅ All scheduled jobs reference valid users!")
        # Summary
        logger.info("\n" + "=" * 60)
        if orphaned_jobs and cleanup and not dry_run:
            logger.info("Cleanup completed successfully!")
        else:
            logger.info("Verification completed!")
        logger.info("=" * 60)
    except Exception as e:
        logger.error(f"Verification failed: {e}")
        raise
    finally:
        # Always release the Prisma connection, even on failure.
        await db.disconnect()
def parse_args():
    """Build the CLI parser and parse the command line into a namespace."""
    parser = argparse.ArgumentParser(
        description="Verify scheduler data integrity after native auth migration"
    )
    # Flags are declared as data and registered in order, which keeps the
    # --help output identical to a sequence of add_argument calls.
    flag_table = [
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Preview what would be cleaned up without making changes",
            },
        ),
        (
            "--cleanup",
            {
                "action": "store_true",
                "help": "Actually remove orphaned schedules",
            },
        ),
        (
            "--database-url",
            {
                "type": str,
                "help": "Database URL (overrides DATABASE_URL env var)",
            },
        ),
    ]
    for flag_name, options in flag_table:
        parser.add_argument(flag_name, **options)
    return parser.parse_args()
if __name__ == "__main__":
    args = parse_args()
    # Override DATABASE_URL if provided via command line.
    # Both variables are overridden because main() prefers DIRECT_URL
    # over DATABASE_URL when resolving the connection string.
    if args.database_url:
        os.environ["DATABASE_URL"] = args.database_url
        os.environ["DIRECT_URL"] = args.database_url
    asyncio.run(main(dry_run=args.dry_run, cleanup=args.cleanup))

View File

@@ -1,7 +1,7 @@
[package]
name = "db-migrate"
version = "0.1.0"
edition = "2021"
edition = "2024"
description = "Database migration tool for AutoGPT Platform - Supabase to GCP Cloud SQL"
[dependencies]

View File

@@ -1,5 +1,5 @@
# Build stage
FROM rust:1.75-slim as builder
FROM rust:1.92-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y pkg-config libssl-dev && \

View File

@@ -50,7 +50,7 @@ NAMESPACE="${NAMESPACE:-dev-agpt}"
echo "=== Building db-migrate ==="
cd "$SCRIPT_DIR"
docker build -t "$IMAGE" .
docker build --platform linux/amd64 -t "$IMAGE" .
echo ""
echo "=== Pushing to GCR ==="

View File

@@ -39,11 +39,12 @@ pub async fn migrate_auth(source: &Database, dest: &Database) -> Result<()> {
info!("Found {} users with auth data to migrate", auth_user_count);
// Create temp table in destination
// Note: platform.User.id is TEXT, not UUID, so we use TEXT here for compatibility
info!("Creating temp table for auth data...");
dest.batch_execute(
r#"
CREATE TEMP TABLE IF NOT EXISTS temp_auth_users (
id UUID,
id TEXT,
encrypted_password TEXT,
email_verified BOOLEAN,
google_id TEXT
@@ -55,7 +56,7 @@ pub async fn migrate_auth(source: &Database, dest: &Database) -> Result<()> {
// Extract and insert auth data in batches
info!("Extracting auth data from source...");
let batch_size = 1000;
let batch_size = 1000i64;
let mut offset = 0i64;
let mut total_migrated = 0i64;

View File

@@ -71,7 +71,7 @@ impl Database {
self.client
.batch_execute(sql)
.await
.context("Batch execute failed")
.with_context(|| format!("Batch execute failed:\n{}", sql.chars().take(500).collect::<String>()))
}
/// Get all table names in the schema

View File

@@ -151,6 +151,16 @@ async fn main() -> Result<()> {
info!("\n=== Step 1: Creating Schema ===");
migrate::migrate_schema(&source, &dest).await?;
// Step 1.5: Verify all quick tables exist in destination
info!("\n=== Step 1.5: Verifying Tables Exist ===");
for table in &quick_tables {
let exists = dest.table_exists(table).await?;
if !exists {
anyhow::bail!("Table {} was not created in destination! Check schema migration errors.", table);
}
info!(" ✓ {} exists", table);
}
// Step 2: Migrate user-related tables
info!("\n=== Step 2: Migrating User Tables ===");
for table in &quick_tables {

View File

@@ -3,6 +3,39 @@ use anyhow::{Context, Result};
use indicatif::{ProgressBar, ProgressStyle};
use tracing::{info, warn};
/// Look up a SQL default expression for a `(table, column)` pair whose source
/// value is NULL but whose destination column is NOT NULL.
///
/// Returns `Some(default_sql)` when a Prisma-style `@default` exists for the
/// column, and `None` when the caller should emit a plain NULL.
fn get_null_default(table: &str, column: &str) -> Option<&'static str> {
    let default_sql = match table {
        // User table - all Prisma @default values
        "User" => match column {
            "createdAt" | "updatedAt" => "NOW()",
            "metadata" => "'{}'::jsonb",
            "integrations" => "''",
            "emailVerified" => "false",
            "role" => "'authenticated'",
            "maxEmailsPerDay" => "3",
            "timezone" => "'not-set'",
            // Every notification preference defaults to enabled.
            "notifyOnAgentRun"
            | "notifyOnZeroBalance"
            | "notifyOnLowBalance"
            | "notifyOnBlockExecutionFailed"
            | "notifyOnContinuousAgentError"
            | "notifyOnDailySummary"
            | "notifyOnWeeklySummary"
            | "notifyOnMonthlySummary"
            | "notifyOnAgentApproved"
            | "notifyOnAgentRejected" => "true",
            _ => return None,
        },
        // UserOnboarding defaults
        "UserOnboarding" => match column {
            "createdAt" | "updatedAt" => "NOW()",
            _ => return None,
        },
        // UserBalance defaults
        "UserBalance" => match column {
            "balance" => "0",
            "updatedAt" => "NOW()",
            _ => return None,
        },
        _ => return None,
    };
    Some(default_sql)
}
/// Tables to skip during initial migration (large execution history)
const LARGE_TABLES: &[&str] = &[
"AgentGraphExecution",
@@ -27,31 +60,75 @@ pub async fn migrate_schema(source: &Database, dest: &Database) -> Result<()> {
))
.await?;
// Create enum types first (before tables that reference them)
info!("Creating enum types...");
let enums = source
.query(
r#"
SELECT
t.typname,
string_agg(e.enumlabel, ',' ORDER BY e.enumsortorder) as labels
FROM pg_type t
JOIN pg_namespace n ON n.oid = t.typnamespace
JOIN pg_enum e ON e.enumtypid = t.oid
WHERE n.nspname = $1
GROUP BY t.typname
"#,
&[&source.schema()],
)
.await?;
for row in &enums {
let type_name: String = row.get(0);
let labels: String = row.get(1);
let label_list: Vec<&str> = labels.split(',').collect();
let quoted_labels = label_list
.iter()
.map(|l| format!("'{}'", l))
.collect::<Vec<_>>()
.join(", ");
let create_enum = format!(
"CREATE TYPE {}.\"{}\" AS ENUM ({})",
source.schema(),
type_name,
quoted_labels
);
if let Err(e) = dest.batch_execute(&create_enum).await {
warn!("Failed to create enum {}: {:?}", type_name, e);
} else {
info!(" Created enum: {}", type_name);
}
}
// Get and apply table definitions
for table in &tables {
info!("Creating table: {}", table);
// Use pg_attribute and format_type() for proper type names (handles arrays, enums, etc.)
let rows = source
.query(
r#"
SELECT
'CREATE TABLE IF NOT EXISTS ' || $1 || '."' || $2 || '" (' ||
'CREATE TABLE IF NOT EXISTS ' || $1 || '."' || c.relname || '" (' ||
string_agg(
'"' || column_name || '" ' ||
data_type ||
CASE
WHEN character_maximum_length IS NOT NULL
THEN '(' || character_maximum_length || ')'
ELSE ''
END ||
CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END ||
CASE WHEN column_default IS NOT NULL THEN ' DEFAULT ' || column_default ELSE '' END,
'"' || a.attname || '" ' ||
format_type(a.atttypid, a.atttypmod) ||
CASE WHEN a.attnotnull THEN ' NOT NULL' ELSE '' END ||
CASE WHEN d.adrelid IS NOT NULL THEN ' DEFAULT ' || pg_get_expr(d.adbin, d.adrelid) ELSE '' END,
', '
ORDER BY ordinal_position
ORDER BY a.attnum
) || ')'
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
GROUP BY table_schema, table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_attribute a ON a.attrelid = c.oid
LEFT JOIN pg_attrdef d ON d.adrelid = c.oid AND d.adnum = a.attnum
WHERE n.nspname = $1
AND c.relname = $2
AND a.attnum > 0
AND NOT a.attisdropped
GROUP BY c.relname
"#,
&[&source.schema(), table],
)
@@ -60,7 +137,7 @@ pub async fn migrate_schema(source: &Database, dest: &Database) -> Result<()> {
if let Some(row) = rows.first() {
let create_sql: String = row.get(0);
if let Err(e) = dest.batch_execute(&create_sql).await {
warn!("Failed to create table {}: {} (may already exist)", table, e);
warn!("Failed to create table {}: {:?}", table, e);
}
}
}
@@ -250,25 +327,35 @@ pub async fn migrate_table(source: &Database, dest: &Database, table: &str) -> R
// This is simplified - full implementation would handle all types
let values: Vec<String> = (0..column_names.len())
.map(|i| {
let col_name = &column_names[i];
// Try to get as different types and format appropriately
if let Ok(v) = row.try_get::<_, Option<String>>(i) {
let is_null = if let Ok(v) = row.try_get::<_, Option<String>>(i) {
match v {
Some(s) => format!("'{}'", s.replace('\'', "''")),
None => "NULL".to_string(),
Some(s) => return format!("'{}'", s.replace('\'', "''")),
None => true,
}
} else if let Ok(v) = row.try_get::<_, Option<i64>>(i) {
match v {
Some(n) => n.to_string(),
None => "NULL".to_string(),
Some(n) => return n.to_string(),
None => true,
}
} else if let Ok(v) = row.try_get::<_, Option<bool>>(i) {
match v {
Some(b) => b.to_string(),
None => "NULL".to_string(),
Some(b) => return b.to_string(),
None => true,
}
} else {
"NULL".to_string()
true
};
// If NULL, check if we have a default for this column
if is_null {
if let Some(default) = get_null_default(table, col_name) {
return default.to_string();
}
}
"NULL".to_string()
})
.collect();
@@ -281,7 +368,7 @@ pub async fn migrate_table(source: &Database, dest: &Database, table: &str) -> R
);
if let Err(e) = dest.batch_execute(&insert).await {
warn!("Failed to insert row: {}", e);
warn!("Failed to insert row: {:?}", e);
}
}
@@ -375,29 +462,39 @@ pub async fn migrate_single_user(source: &Database, dest: &Database, user_id: &s
for row in &data_rows {
let values: Vec<String> = (0..column_names.len())
.map(|i| {
if let Ok(v) = row.try_get::<_, Option<String>>(i) {
let col_name = &column_names[i];
let is_null = if let Ok(v) = row.try_get::<_, Option<String>>(i) {
match v {
Some(s) => format!("'{}'", s.replace('\'', "''")),
None => "NULL".to_string(),
Some(s) => return format!("'{}'", s.replace('\'', "''")),
None => true,
}
} else if let Ok(v) = row.try_get::<_, Option<i64>>(i) {
match v {
Some(n) => n.to_string(),
None => "NULL".to_string(),
Some(n) => return n.to_string(),
None => true,
}
} else if let Ok(v) = row.try_get::<_, Option<bool>>(i) {
match v {
Some(b) => b.to_string(),
None => "NULL".to_string(),
Some(b) => return b.to_string(),
None => true,
}
} else if let Ok(v) = row.try_get::<_, Option<uuid::Uuid>>(i) {
match v {
Some(u) => format!("'{}'", u),
None => "NULL".to_string(),
Some(u) => return format!("'{}'", u),
None => true,
}
} else {
"NULL".to_string()
true
};
// If NULL, check if we have a default for this column
if is_null {
if let Some(default) = get_null_default(table, col_name) {
return default.to_string();
}
}
"NULL".to_string()
})
.collect();