perf(backend/db): Optimize StoreAgent and Creator views with database indexes and materialized views (#10084)

### Summary
Optimizes the platform's store and creator functionality by adding
targeted database indexes and materialized views, reducing query
execution time.

### Changes 🏗️

**Database Performance Optimizations:**
- Added strategic database indexes for `StoreListing`,
`StoreListingVersion`, `StoreListingReview`, `AgentGraphExecution`, and
`Profile` tables
- Implemented materialized views (`mv_agent_run_counts`,
`mv_review_stats`) to cache expensive aggregation queries
- Rewrote the `StoreAgent` and `Creator` views to use the materialized
views and more efficient query patterns
- Added automated refresh function with 15-minute scheduling for
materialized views (when pg_cron extension is available)

**Key Performance Improvements:**
- Filtered indexes on approved store listings to speed up marketplace
queries
- GIN index on categories for faster category-based searches
- Composite indexes for common query patterns (e.g., listing + version
lookups)
- Pre-computed agent run counts and review statistics to eliminate
expensive aggregations
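
Two representative statements from the included migration, showing the partial-index and GIN-index patterns:

```sql
-- Partial index: only non-deleted listings with an approved version are indexed
CREATE INDEX IF NOT EXISTS "idx_store_listing_approved"
ON "StoreListing"("owningUserId")
WHERE "isDeleted" = false AND "hasApprovedVersion" = true;

-- GIN index for category array searches, restricted to approved versions
CREATE INDEX IF NOT EXISTS "idx_slv_categories_gin"
ON "StoreListingVersion" USING GIN ("categories")
WHERE "submissionStatus" = 'APPROVED';
```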

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified migration runs successfully without errors
  - [x] Confirmed materialized views are created and populated correctly
  - [x] Tested StoreAgent and Creator view queries return expected results
  - [x] Validated automatic refresh function works properly
  - [x] Confirmed rollback migration successfully removes all changes

#### For configuration changes:
- [x] `.env.example` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

**Note:** No configuration changes were required as this is purely a
database schema optimization.
Author: Swifty
Committed: 2025-07-10 16:57:55 +02:00 by GitHub
Parent: 243400e128
Commit: 7688a9701e
13 changed files with 1906 additions and 81 deletions

View File

@@ -0,0 +1,150 @@
# Test Data Scripts
This directory contains scripts for creating and updating test data in the AutoGPT Platform database, specifically designed to test the materialized views for the store functionality.
## Scripts
### test_data_creator.py
Creates a comprehensive set of test data including:
- Users with profiles
- Agent graphs, nodes, and executions
- Store listings with multiple versions
- Reviews and ratings
- Library agents
- Integration webhooks
- Onboarding data
- Credit transactions
**Image/Video Domains Used:**
- Images: `picsum.photos` (for all image URLs)
- Videos: `youtube.com` (for store listing videos)
### test_data_updater.py
Updates existing test data to simulate real-world changes:
- Adds new agent graph executions
- Creates new store listing reviews
- Updates store listing versions
- Adds credit transactions
- Refreshes materialized views
### check_db.py
Tests and verifies materialized views functionality:
- Checks pg_cron job status (for automatic refresh)
- Displays current materialized view counts
- Adds test data (executions and reviews)
- Creates store listings if none exist
- Manually refreshes materialized views
- Compares before/after counts to verify updates
- Provides a summary of test results
## Materialized Views
The scripts test three key database views:
1. **mv_agent_run_counts**: Tracks execution counts by agent
2. **mv_review_stats**: Tracks review statistics (count, average rating) by store listing
3. **StoreAgent**: A view that combines store listing data with execution counts and ratings for display
The materialized views (mv_agent_run_counts and mv_review_stats) are automatically refreshed every 15 minutes via pg_cron, or can be manually refreshed using the `refresh_store_materialized_views()` function.
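To spot-check the cached aggregates directly (the same queries `test_data_updater.py` runs during its verification step):
```sql
SELECT * FROM mv_agent_run_counts ORDER BY run_count DESC LIMIT 5;
SELECT * FROM mv_review_stats ORDER BY avg_rating DESC NULLS LAST LIMIT 5;
```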
## Usage
### Prerequisites
1. Ensure the database is running:
```bash
docker compose up -d
# or for test database:
docker compose -f docker-compose.test.yaml --env-file ../.env up -d
```
2. Run database migrations:
```bash
poetry run prisma migrate deploy
```
### Running the Scripts
#### Option 1: Use the helper script (from backend directory)
```bash
poetry run python run_test_data.py
```
#### Option 2: Run individually
```bash
# From backend/test directory:
# Create initial test data
poetry run python test_data_creator.py
# Update data to test materialized view changes
poetry run python test_data_updater.py
# From backend directory:
# Test materialized views functionality
poetry run python check_db.py
# Check store data status
poetry run python check_store_data.py
```
#### Option 3: Use the shell script (from backend directory)
```bash
./run_test_data_scripts.sh
```
### Manual Materialized View Refresh
To manually refresh the materialized views:
```sql
SELECT refresh_store_materialized_views();
```
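The function tries `REFRESH MATERIALIZED VIEW CONCURRENTLY` first and falls back to a blocking refresh on failure. Refreshing the views individually is equivalent (CONCURRENTLY relies on the unique indexes the migration creates):
```sql
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_agent_run_counts;
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_review_stats;
```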
## Configuration
The scripts use the database configuration from your `.env` file:
- `DATABASE_URL`: PostgreSQL connection string
- Database should have the platform schema
## Data Generation Limits
Configured in `test_data_creator.py`:
- 100 users
- 100 agent blocks
- 1-5 graphs per user
- 2-5 nodes per graph
- 1-5 presets per user
- 1-10 library agents per user
- 1-20 executions per graph
- 1-5 reviews per store listing version
## Notes
- All image URLs use `picsum.photos` for consistency with Next.js image configuration
- The scripts create realistic relationships between entities
- Materialized views are refreshed at the end of each script
- Data is designed to test both happy paths and edge cases
## Troubleshooting
### Reviews and StoreAgent view showing 0
If `check_db.py` shows that reviews remain at 0 and StoreAgent view shows 0 store agents:
1. **No store listings exist**: The script will automatically create test store listings if none exist
2. **No approved versions**: Store listings need approved versions to appear in the StoreAgent view
3. **Check with `check_store_data.py`**: This script provides detailed information about:
- Total store listings
- Store listing versions by status
- Existing reviews
- StoreAgent view contents
- Agent graph executions
### pg_cron not installed
The warning "pg_cron extension is not installed" is normal in local development environments. The materialized views can still be refreshed manually using the `refresh_store_materialized_views()` function, which all scripts do automatically.
### Common Issues
- **Type errors with None values**: Fixed in the latest version of check_db.py by using `or 0` for nullable numeric fields
- **Missing relations**: Ensure you're using the correct field names (e.g., `StoreListing` not `storeListing` in includes)
- **Column name mismatches**: The database uses camelCase for column names (e.g., `agentGraphId` not `agent_graph_id`)
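For example, camelCase columns must be double-quoted in raw SQL, because PostgreSQL folds unquoted identifiers to lowercase:
```sql
SELECT "agentGraphId", COUNT(*) FROM "AgentGraphExecution" GROUP BY "agentGraphId";
```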

View File

@@ -0,0 +1,359 @@
import asyncio
import random
from datetime import datetime
from faker import Faker
from prisma import Prisma
faker = Faker()
async def check_cron_job(db):
"""Check if the pg_cron job for refreshing materialized views exists."""
print("\n1. Checking pg_cron job...")
print("-" * 40)
try:
# Check if pg_cron extension exists
extension_check = await db.query_raw(
"SELECT COUNT(*) as count FROM pg_extension WHERE extname = 'pg_cron'"
)
if extension_check[0]["count"] == 0:
print("⚠️ pg_cron extension is not installed")
return False
# Check if the refresh job exists
job_check = await db.query_raw(
"""
SELECT jobname, schedule, command
FROM cron.job
WHERE jobname = 'refresh-store-views'
"""
)
if job_check:
job = job_check[0]
print("✅ pg_cron job found:")
print(f" Name: {job['jobname']}")
print(f" Schedule: {job['schedule']} (every 15 minutes)")
print(f" Command: {job['command']}")
return True
else:
print("⚠️ pg_cron job 'refresh-store-views' not found")
return False
except Exception as e:
print(f"❌ Error checking pg_cron: {e}")
return False
async def get_materialized_view_counts(db):
"""Get current counts from materialized views."""
print("\n2. Getting current materialized view data...")
print("-" * 40)
# Get counts from mv_agent_run_counts
agent_runs = await db.query_raw(
"""
SELECT COUNT(*) as total_agents,
SUM(run_count) as total_runs,
MAX(run_count) as max_runs,
MIN(run_count) as min_runs
FROM mv_agent_run_counts
"""
)
# Get counts from mv_review_stats
review_stats = await db.query_raw(
"""
SELECT COUNT(*) as total_listings,
SUM(review_count) as total_reviews,
AVG(avg_rating) as overall_avg_rating
FROM mv_review_stats
"""
)
# Get sample data from StoreAgent view
store_agents = await db.query_raw(
"""
SELECT COUNT(*) as total_store_agents,
AVG(runs) as avg_runs,
AVG(rating) as avg_rating
FROM "StoreAgent"
"""
)
agent_run_data = agent_runs[0] if agent_runs else {}
review_data = review_stats[0] if review_stats else {}
store_data = store_agents[0] if store_agents else {}
print("📊 mv_agent_run_counts:")
print(f" Total agents: {agent_run_data.get('total_agents', 0)}")
print(f" Total runs: {agent_run_data.get('total_runs', 0)}")
print(f" Max runs per agent: {agent_run_data.get('max_runs', 0)}")
print(f" Min runs per agent: {agent_run_data.get('min_runs', 0)}")
print("\n📊 mv_review_stats:")
print(f" Total listings: {review_data.get('total_listings', 0)}")
print(f" Total reviews: {review_data.get('total_reviews', 0)}")
print(f" Overall avg rating: {review_data.get('overall_avg_rating') or 0:.2f}")
print("\n📊 StoreAgent view:")
print(f" Total store agents: {store_data.get('total_store_agents', 0)}")
print(f" Average runs: {store_data.get('avg_runs') or 0:.2f}")
print(f" Average rating: {store_data.get('avg_rating') or 0:.2f}")
return {
"agent_runs": agent_run_data,
"reviews": review_data,
"store_agents": store_data,
}
async def add_test_data(db):
"""Add some test data to verify materialized view updates."""
print("\n3. Adding test data...")
print("-" * 40)
# Get some existing data
users = await db.user.find_many(take=5)
graphs = await db.agentgraph.find_many(take=5)
if not users or not graphs:
print("❌ No existing users or graphs found. Run test_data_creator.py first.")
return False
# Add new executions
print("Adding new agent graph executions...")
new_executions = 0
for graph in graphs:
for _ in range(random.randint(2, 5)):
await db.agentgraphexecution.create(
data={
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"userId": random.choice(users).id,
"executionStatus": "COMPLETED",
"startedAt": datetime.now(),
}
)
new_executions += 1
print(f"✅ Added {new_executions} new executions")
# Check if we need to create store listings first
store_versions = await db.storelistingversion.find_many(
where={"submissionStatus": "APPROVED"}, take=5
)
if not store_versions:
print("\nNo approved store listings found. Creating test store listings...")
# Create store listings for existing agent graphs
for i, graph in enumerate(graphs[:3]): # Create up to 3 store listings
# Create a store listing
listing = await db.storelisting.create(
data={
"slug": f"test-agent-{graph.id[:8]}",
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"hasApprovedVersion": True,
"owningUserId": graph.userId,
}
)
# Create an approved version
version = await db.storelistingversion.create(
data={
"storeListingId": listing.id,
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"name": f"Test Agent {i+1}",
"subHeading": faker.catch_phrase(),
"description": faker.paragraph(nb_sentences=5),
"imageUrls": [faker.image_url()],
"categories": ["productivity", "automation"],
"submissionStatus": "APPROVED",
"submittedAt": datetime.now(),
}
)
# Update listing with active version
await db.storelisting.update(
where={"id": listing.id}, data={"activeVersionId": version.id}
)
print("✅ Created test store listings")
# Re-fetch approved versions
store_versions = await db.storelistingversion.find_many(
where={"submissionStatus": "APPROVED"}, take=5
)
# Add new reviews
print("\nAdding new store listing reviews...")
new_reviews = 0
for version in store_versions:
# Find users who haven't reviewed this version
existing_reviews = await db.storelistingreview.find_many(
where={"storeListingVersionId": version.id}
)
reviewed_user_ids = {r.reviewByUserId for r in existing_reviews}
available_users = [u for u in users if u.id not in reviewed_user_ids]
if available_users:
user = random.choice(available_users)
await db.storelistingreview.create(
data={
"storeListingVersionId": version.id,
"reviewByUserId": user.id,
"score": random.randint(3, 5),
"comments": faker.text(max_nb_chars=100),
}
)
new_reviews += 1
print(f"✅ Added {new_reviews} new reviews")
return True
async def refresh_materialized_views(db):
"""Manually refresh the materialized views."""
print("\n4. Manually refreshing materialized views...")
print("-" * 40)
try:
await db.execute_raw("SELECT refresh_store_materialized_views();")
print("✅ Materialized views refreshed successfully")
return True
except Exception as e:
print(f"❌ Error refreshing views: {e}")
return False
async def compare_counts(before, after):
"""Compare counts before and after refresh."""
print("\n5. Comparing counts before and after refresh...")
print("-" * 40)
# Compare agent runs
print("🔍 Agent run changes:")
before_runs = before["agent_runs"].get("total_runs") or 0
after_runs = after["agent_runs"].get("total_runs") or 0
print(
f" Total runs: {before_runs}{after_runs} " f"(+{after_runs - before_runs})"
)
# Compare reviews
print("\n🔍 Review changes:")
before_reviews = before["reviews"].get("total_reviews") or 0
after_reviews = after["reviews"].get("total_reviews") or 0
print(
f" Total reviews: {before_reviews}{after_reviews} "
f"(+{after_reviews - before_reviews})"
)
# Compare store agents
print("\n🔍 StoreAgent view changes:")
before_avg_runs = before["store_agents"].get("avg_runs", 0) or 0
after_avg_runs = after["store_agents"].get("avg_runs", 0) or 0
print(
f" Average runs: {before_avg_runs:.2f}{after_avg_runs:.2f} "
f"(+{after_avg_runs - before_avg_runs:.2f})"
)
# Verify changes occurred
runs_changed = (after["agent_runs"].get("total_runs") or 0) > (
before["agent_runs"].get("total_runs") or 0
)
reviews_changed = (after["reviews"].get("total_reviews") or 0) > (
before["reviews"].get("total_reviews") or 0
)
if runs_changed and reviews_changed:
print("\n✅ Materialized views are updating correctly!")
return True
else:
print("\n⚠️ Some materialized views may not have updated:")
if not runs_changed:
print(" - Agent run counts did not increase")
if not reviews_changed:
print(" - Review counts did not increase")
return False
async def main():
db = Prisma()
await db.connect()
print("=" * 60)
print("Materialized Views Test")
print("=" * 60)
try:
# Check if data exists
user_count = await db.user.count()
if user_count == 0:
print("❌ No data in database. Please run test_data_creator.py first.")
await db.disconnect()
return
# 1. Check cron job
cron_exists = await check_cron_job(db)
# 2. Get initial counts
counts_before = await get_materialized_view_counts(db)
# 3. Add test data
data_added = await add_test_data(db)
refresh_success = False
if data_added:
# Wait a moment for data to be committed
print("\nWaiting for data to be committed...")
await asyncio.sleep(2)
# 4. Manually refresh views
refresh_success = await refresh_materialized_views(db)
if refresh_success:
# 5. Get counts after refresh
counts_after = await get_materialized_view_counts(db)
# 6. Compare results
await compare_counts(counts_before, counts_after)
# Summary
print("\n" + "=" * 60)
print("Test Summary")
print("=" * 60)
print(f"✓ pg_cron job exists: {'Yes' if cron_exists else 'No'}")
print(f"✓ Test data added: {'Yes' if data_added else 'No'}")
print(f"✓ Manual refresh worked: {'Yes' if refresh_success else 'No'}")
print(
f"✓ Views updated correctly: {'Yes' if data_added and refresh_success else 'Cannot verify'}"
)
if cron_exists:
print(
"\n💡 The materialized views will also refresh automatically every 15 minutes via pg_cron."
)
else:
print(
"\n⚠️ Automatic refresh is not configured. Views must be refreshed manually."
)
except Exception as e:
print(f"\n❌ Test failed with error: {e}")
import traceback
traceback.print_exc()
await db.disconnect()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""Check store-related data in the database."""
import asyncio
from prisma import Prisma
async def check_store_data(db):
"""Check what store data exists in the database."""
print("============================================================")
print("Store Data Check")
print("============================================================")
# Check store listings
print("\n1. Store Listings:")
print("-" * 40)
listings = await db.storelisting.find_many()
print(f"Total store listings: {len(listings)}")
if listings:
for listing in listings[:5]:
print(f"\nListing ID: {listing.id}")
print(f" Name: {listing.name}")
print(f" Status: {listing.status}")
print(f" Slug: {listing.slug}")
# Check store listing versions
print("\n\n2. Store Listing Versions:")
print("-" * 40)
versions = await db.storelistingversion.find_many(include={"StoreListing": True})
print(f"Total store listing versions: {len(versions)}")
# Group by submission status
status_counts = {}
for version in versions:
status = version.submissionStatus
status_counts[status] = status_counts.get(status, 0) + 1
print("\nVersions by status:")
for status, count in status_counts.items():
print(f" {status}: {count}")
# Show approved versions
approved_versions = [v for v in versions if v.submissionStatus == "APPROVED"]
print(f"\nApproved versions: {len(approved_versions)}")
if approved_versions:
for version in approved_versions[:5]:
print(f"\n Version ID: {version.id}")
print(f" Listing: {version.StoreListing.name}")
print(f" Version: {version.version}")
# Check store listing reviews
print("\n\n3. Store Listing Reviews:")
print("-" * 40)
reviews = await db.storelistingreview.find_many(
include={"StoreListingVersion": {"include": {"StoreListing": True}}}
)
print(f"Total reviews: {len(reviews)}")
if reviews:
# Calculate average rating
total_score = sum(r.score for r in reviews)
avg_score = total_score / len(reviews) if reviews else 0
print(f"Average rating: {avg_score:.2f}")
# Show sample reviews
print("\nSample reviews:")
for review in reviews[:3]:
print(f"\n Review for: {review.StoreListingVersion.StoreListing.name}")
print(f" Score: {review.score}")
print(f" Comments: {review.comments[:100]}...")
# Check StoreAgent view data
print("\n\n4. StoreAgent View Data:")
print("-" * 40)
# Query the StoreAgent view
query = """
SELECT
sa.listing_id,
sa.slug,
sa.agent_name,
sa.description,
sa.featured,
sa.runs,
sa.rating,
sa.creator_username,
sa.categories,
sa.updated_at
FROM "StoreAgent" sa
LIMIT 10;
"""
store_agents = await db.query_raw(query)
print(f"Total store agents in view: {len(store_agents)}")
if store_agents:
for agent in store_agents[:5]:
print(f"\nStore Agent: {agent['agent_name']}")
print(f" Slug: {agent['slug']}")
print(f" Runs: {agent['runs']}")
print(f" Rating: {agent['rating']}")
print(f" Creator: {agent['creator_username']}")
# Check the underlying data that should populate StoreAgent
print("\n\n5. Data that should populate StoreAgent view:")
print("-" * 40)
# Check for any APPROVED store listing versions
query = """
SELECT COUNT(*) as count
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
"""
result = await db.query_raw(query)
approved_count = result[0]["count"] if result else 0
print(f"Approved store listing versions: {approved_count}")
# Check for store listings with hasApprovedVersion = true
query = """
SELECT COUNT(*) as count
FROM "StoreListing"
WHERE "hasApprovedVersion" = true AND "isDeleted" = false
"""
result = await db.query_raw(query)
has_approved_count = result[0]["count"] if result else 0
print(f"Store listings with approved versions: {has_approved_count}")
# Check agent graph executions
query = """
SELECT COUNT(DISTINCT "agentGraphId") as unique_agents,
COUNT(*) as total_executions
FROM "AgentGraphExecution"
"""
result = await db.query_raw(query)
if result:
print("\nAgent Graph Executions:")
print(f" Unique agents with executions: {result[0]['unique_agents']}")
print(f" Total executions: {result[0]['total_executions']}")
async def main():
"""Main function."""
db = Prisma()
await db.connect()
try:
await check_store_data(db)
finally:
await db.disconnect()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
Clean the test database by removing all data while preserving the schema.
Usage:
poetry run python clean_test_db.py [--yes]
Options:
--yes Skip confirmation prompt
"""
import asyncio
import sys
from prisma import Prisma
async def main():
db = Prisma()
await db.connect()
print("=" * 60)
print("Cleaning Test Database")
print("=" * 60)
print()
# Get initial counts
user_count = await db.user.count()
agent_count = await db.agentgraph.count()
print(f"Current data: {user_count} users, {agent_count} agent graphs")
if user_count == 0 and agent_count == 0:
print("Database is already clean!")
await db.disconnect()
return
# Check for --yes flag
skip_confirm = "--yes" in sys.argv
if not skip_confirm:
response = input("\nDo you want to clean all data? (yes/no): ")
if response.lower() != "yes":
print("Aborted.")
await db.disconnect()
return
print("\nCleaning database...")
# Delete in reverse order of dependencies
tables = [
("UserNotificationBatch", db.usernotificationbatch),
("NotificationEvent", db.notificationevent),
("CreditRefundRequest", db.creditrefundrequest),
("StoreListingReview", db.storelistingreview),
("StoreListingVersion", db.storelistingversion),
("StoreListing", db.storelisting),
("AgentNodeExecutionInputOutput", db.agentnodeexecutioninputoutput),
("AgentNodeExecution", db.agentnodeexecution),
("AgentGraphExecution", db.agentgraphexecution),
("AgentNodeLink", db.agentnodelink),
("LibraryAgent", db.libraryagent),
("AgentPreset", db.agentpreset),
("IntegrationWebhook", db.integrationwebhook),
("AgentNode", db.agentnode),
("AgentGraph", db.agentgraph),
("AgentBlock", db.agentblock),
("APIKey", db.apikey),
("CreditTransaction", db.credittransaction),
("AnalyticsMetrics", db.analyticsmetrics),
("AnalyticsDetails", db.analyticsdetails),
("Profile", db.profile),
("UserOnboarding", db.useronboarding),
("User", db.user),
]
for table_name, table in tables:
try:
count = await table.count()
if count > 0:
await table.delete_many()
print(f"✓ Deleted {count} records from {table_name}")
except Exception as e:
print(f"⚠ Error cleaning {table_name}: {e}")
# Refresh materialized views (they should be empty now)
try:
await db.execute_raw("SELECT refresh_store_materialized_views();")
print("\n✓ Refreshed materialized views")
except Exception as e:
print(f"\n⚠ Could not refresh materialized views: {e}")
await db.disconnect()
print("\n" + "=" * 60)
print("Database cleaned successfully!")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,37 +1,60 @@
networks:
app-network:
name: app-network
shared-network:
name: shared-network
volumes:
supabase-config:
x-agpt-services:
&agpt-services
networks:
- app-network
- shared-network
x-supabase-services:
&supabase-services
networks:
- app-network
- shared-network
volumes:
clamav-data:
services:
postgres-test:
image: ankane/pgvector:latest
environment:
- POSTGRES_USER=${DB_USER:-postgres}
- POSTGRES_PASSWORD=${DB_PASS:-postgres}
- POSTGRES_DB=${DB_NAME:-postgres}
- POSTGRES_PORT=${DB_PORT:-5432}
healthcheck:
test: pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB
interval: 10s
timeout: 5s
retries: 5
db:
<<: *supabase-services
extends:
file: ../db/docker/docker-compose.yml
service: db
ports:
- "${DB_PORT:-5432}:5432"
networks:
- app-network-test
redis-test:
- ${POSTGRES_PORT}:5432 # We don't use Supavisor locally, so we expose the db directly.
vector:
<<: *supabase-services
extends:
file: ../db/docker/docker-compose.yml
service: vector
redis:
<<: *agpt-services
image: redis:latest
command: redis-server --requirepass password
ports:
- "6379:6379"
networks:
- app-network-test
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
rabbitmq-test:
rabbitmq:
<<: *agpt-services
image: rabbitmq:management
container_name: rabbitmq-test
container_name: rabbitmq
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
@@ -40,7 +63,7 @@ services:
start_period: 10s
environment:
- RABBITMQ_DEFAULT_USER=rabbitmq_user_default
- RABBITMQ_DEFAULT_PASS=k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7 # CHANGE THIS TO A RANDOM PASSWORD IN PRODUCTION -- everywhere lol
- RABBITMQ_DEFAULT_PASS=k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7
ports:
- "5672:5672"
- "15672:15672"

View File

@@ -0,0 +1,254 @@
-- This migration creates materialized views for performance optimization
--
-- IMPORTANT: For production environments, pg_cron is REQUIRED for automatic refresh
-- Prerequisites for production:
-- 1. pg_cron extension must be installed: CREATE EXTENSION pg_cron;
-- 2. pg_cron must be configured in postgresql.conf:
-- shared_preload_libraries = 'pg_cron'
-- cron.database_name = 'your_database_name'
--
-- For development environments without pg_cron:
-- The migration will succeed but you must manually refresh views with:
-- SELECT refresh_store_materialized_views();
-- Check if pg_cron extension is installed and set a flag
DO $$
DECLARE
has_pg_cron BOOLEAN;
BEGIN
SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') INTO has_pg_cron;
IF NOT has_pg_cron THEN
RAISE WARNING 'pg_cron extension is not installed!';
RAISE WARNING 'Materialized views will be created but WILL NOT refresh automatically.';
RAISE WARNING 'For production use, install pg_cron with: CREATE EXTENSION pg_cron;';
RAISE WARNING 'For development, manually refresh with: SELECT refresh_store_materialized_views();';
-- For production deployments, uncomment the following line to make pg_cron mandatory:
-- RAISE EXCEPTION 'pg_cron is required for production deployments';
END IF;
-- Store the flag for later use in the migration
PERFORM set_config('migration.has_pg_cron', has_pg_cron::text, false);
END
$$;
-- CreateIndex
-- Optimized: Only include owningUserId in index columns since isDeleted and hasApprovedVersion are in WHERE clause
CREATE INDEX IF NOT EXISTS "idx_store_listing_approved" ON "StoreListing"("owningUserId") WHERE "isDeleted" = false AND "hasApprovedVersion" = true;
-- CreateIndex
-- Optimized: Only include storeListingId since submissionStatus is in WHERE clause
CREATE INDEX IF NOT EXISTS "idx_store_listing_version_status" ON "StoreListingVersion"("storeListingId") WHERE "submissionStatus" = 'APPROVED';
-- CreateIndex
CREATE INDEX IF NOT EXISTS "idx_slv_categories_gin" ON "StoreListingVersion" USING GIN ("categories") WHERE "submissionStatus" = 'APPROVED';
-- CreateIndex
CREATE INDEX IF NOT EXISTS "idx_slv_agent" ON "StoreListingVersion"("agentGraphId", "agentGraphVersion") WHERE "submissionStatus" = 'APPROVED';
-- CreateIndex
CREATE INDEX IF NOT EXISTS "idx_store_listing_review_version" ON "StoreListingReview"("storeListingVersionId");
-- CreateIndex
CREATE INDEX IF NOT EXISTS "idx_agent_graph_execution_agent" ON "AgentGraphExecution"("agentGraphId");
-- CreateIndex
CREATE INDEX IF NOT EXISTS "idx_profile_user" ON "Profile"("userId");
-- Additional performance indexes
CREATE INDEX IF NOT EXISTS "idx_store_listing_version_approved_listing" ON "StoreListingVersion"("storeListingId", "version") WHERE "submissionStatus" = 'APPROVED';
-- Create materialized view for agent run counts
CREATE MATERIALIZED VIEW IF NOT EXISTS "mv_agent_run_counts" AS
SELECT
"agentGraphId",
COUNT(*) AS run_count
FROM "AgentGraphExecution"
GROUP BY "agentGraphId";
-- CreateIndex
CREATE UNIQUE INDEX IF NOT EXISTS "idx_mv_agent_run_counts" ON "mv_agent_run_counts"("agentGraphId");
-- Create materialized view for review statistics
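-- LEFT JOIN keeps approved listings with no reviews yet (review_count = 0, avg_rating NULL; the views COALESCE these to 0)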
CREATE MATERIALIZED VIEW IF NOT EXISTS "mv_review_stats" AS
SELECT
sl.id AS "storeListingId",
COUNT(sr.id) AS review_count,
AVG(sr.score::numeric) AS avg_rating
FROM "StoreListing" sl
JOIN "StoreListingVersion" slv ON slv."storeListingId" = sl.id
LEFT JOIN "StoreListingReview" sr ON sr."storeListingVersionId" = slv.id
WHERE sl."isDeleted" = false
AND slv."submissionStatus" = 'APPROVED'
GROUP BY sl.id;
-- CreateIndex
CREATE UNIQUE INDEX IF NOT EXISTS "idx_mv_review_stats" ON "mv_review_stats"("storeListingId");
-- DropForeignKey (if any exist on the views)
-- None needed as views don't have foreign keys
-- DropView
DROP VIEW IF EXISTS "Creator";
-- DropView
DROP VIEW IF EXISTS "StoreAgent";
-- CreateView
CREATE OR REPLACE VIEW "StoreAgent" AS
WITH agent_versions AS (
SELECT
"storeListingId",
array_agg(DISTINCT version::text ORDER BY version::text) AS versions
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
GROUP BY "storeListingId"
)
SELECT
sl.id AS listing_id,
slv.id AS "storeListingVersionId",
slv."createdAt" AS updated_at,
sl.slug,
COALESCE(slv.name, '') AS agent_name,
slv."videoUrl" AS agent_video,
COALESCE(slv."imageUrls", ARRAY[]::text[]) AS agent_image,
slv."isFeatured" AS featured,
p.username AS creator_username,
p."avatarUrl" AS creator_avatar,
slv."subHeading" AS sub_heading,
slv.description,
slv.categories,
COALESCE(ar.run_count, 0::bigint) AS runs,
COALESCE(rs.avg_rating, 0.0)::double precision AS rating,
COALESCE(av.versions, ARRAY[slv.version::text]) AS versions
FROM "StoreListing" sl
INNER JOIN "StoreListingVersion" slv
ON slv."storeListingId" = sl.id
AND slv."submissionStatus" = 'APPROVED'
JOIN "AgentGraph" a
ON slv."agentGraphId" = a.id
AND slv."agentGraphVersion" = a.version
LEFT JOIN "Profile" p
ON sl."owningUserId" = p."userId"
LEFT JOIN "mv_review_stats" rs
ON sl.id = rs."storeListingId"
LEFT JOIN "mv_agent_run_counts" ar
ON a.id = ar."agentGraphId"
LEFT JOIN agent_versions av
ON sl.id = av."storeListingId"
WHERE sl."isDeleted" = false
AND sl."hasApprovedVersion" = true;
-- CreateView
CREATE OR REPLACE VIEW "Creator" AS
WITH creator_listings AS (
SELECT
sl."owningUserId",
sl.id AS listing_id,
slv."agentGraphId",
slv.categories,
sr.score,
ar.run_count
FROM "StoreListing" sl
INNER JOIN "StoreListingVersion" slv
ON slv."storeListingId" = sl.id
AND slv."submissionStatus" = 'APPROVED'
LEFT JOIN "StoreListingReview" sr
ON sr."storeListingVersionId" = slv.id
LEFT JOIN "mv_agent_run_counts" ar
ON ar."agentGraphId" = slv."agentGraphId"
WHERE sl."isDeleted" = false
AND sl."hasApprovedVersion" = true
),
creator_stats AS (
SELECT
cl."owningUserId",
COUNT(DISTINCT cl.listing_id) AS num_agents,
AVG(COALESCE(cl.score, 0)::numeric) AS agent_rating,
SUM(DISTINCT COALESCE(cl.run_count, 0)) AS agent_runs,
array_agg(DISTINCT cat ORDER BY cat) FILTER (WHERE cat IS NOT NULL) AS all_categories
FROM creator_listings cl
LEFT JOIN LATERAL unnest(COALESCE(cl.categories, ARRAY[]::text[])) AS cat ON true
GROUP BY cl."owningUserId"
)
SELECT
p.username,
p.name,
p."avatarUrl" AS avatar_url,
p.description,
cs.all_categories AS top_categories,
p.links,
p."isFeatured" AS is_featured,
COALESCE(cs.num_agents, 0::bigint) AS num_agents,
COALESCE(cs.agent_rating, 0.0) AS agent_rating,
COALESCE(cs.agent_runs, 0::numeric) AS agent_runs
FROM "Profile" p
LEFT JOIN creator_stats cs ON cs."owningUserId" = p."userId";
-- Create refresh function that works with the current schema
CREATE OR REPLACE FUNCTION refresh_store_materialized_views()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
current_schema_name text;
BEGIN
-- Get the current schema
current_schema_name := current_schema();
-- Use CONCURRENTLY for better performance during refresh
EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %I."mv_agent_run_counts"', current_schema_name);
EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %I."mv_review_stats"', current_schema_name);
RAISE NOTICE 'Materialized views refreshed in schema % at %', current_schema_name, NOW();
EXCEPTION
WHEN OTHERS THEN
-- Fallback to non-concurrent refresh if concurrent fails
EXECUTE format('REFRESH MATERIALIZED VIEW %I."mv_agent_run_counts"', current_schema_name);
EXECUTE format('REFRESH MATERIALIZED VIEW %I."mv_review_stats"', current_schema_name);
RAISE NOTICE 'Materialized views refreshed (non-concurrent) in schema % at % due to: %', current_schema_name, NOW(), SQLERRM;
END;
$$;
-- Initial refresh of materialized views
SELECT refresh_store_materialized_views();
-- Schedule automatic refresh every 15 minutes (only if pg_cron is available)
DO $$
DECLARE
has_pg_cron BOOLEAN;
current_schema_name text;
job_name text;
BEGIN
-- Get the flag we set earlier
has_pg_cron := current_setting('migration.has_pg_cron', true)::boolean;
-- Get current schema name
current_schema_name := current_schema();
-- Create a unique job name for this schema
job_name := format('refresh-store-views-%s', current_schema_name);
IF has_pg_cron THEN
-- Try to unschedule existing job (ignore errors if it doesn't exist)
BEGIN
PERFORM cron.unschedule(job_name);
EXCEPTION WHEN OTHERS THEN
-- Job doesn't exist, that's fine
NULL;
END;
-- Schedule the refresh job with schema-specific command
PERFORM cron.schedule(
job_name,
'*/15 * * * *',
format('SELECT %I.refresh_store_materialized_views();', current_schema_name)
);
RAISE NOTICE 'Scheduled automatic refresh of materialized views every 15 minutes for schema %', current_schema_name;
ELSE
RAISE WARNING '⚠️ Automatic refresh NOT configured - pg_cron is not available';
RAISE WARNING '⚠️ You must manually refresh views with: SELECT refresh_store_materialized_views();';
RAISE WARNING '⚠️ Or install pg_cron for automatic refresh in production';
END IF;
END;
$$;

View File

@@ -0,0 +1,155 @@
-- Unschedule cron job (if it exists)
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
-- The up migration schedules a schema-suffixed job name, so unschedule that one
PERFORM cron.unschedule(format('refresh-store-views-%s', current_schema()));
RAISE NOTICE 'Unscheduled automatic refresh of materialized views';
END IF;
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE 'Could not unschedule cron job (may not exist): %', SQLERRM;
END;
$$;
-- DropView
DROP VIEW IF EXISTS "Creator";
-- DropView
DROP VIEW IF EXISTS "StoreAgent";
-- CreateView (restore original StoreAgent)
CREATE VIEW "StoreAgent" AS
WITH reviewstats AS (
SELECT sl_1.id AS "storeListingId",
count(sr.id) AS review_count,
avg(sr.score::numeric) AS avg_rating
FROM "StoreListing" sl_1
JOIN "StoreListingVersion" slv_1
ON slv_1."storeListingId" = sl_1.id
JOIN "StoreListingReview" sr
ON sr."storeListingVersionId" = slv_1.id
WHERE sl_1."isDeleted" = false
GROUP BY sl_1.id
), agentruns AS (
SELECT "AgentGraphExecution"."agentGraphId",
count(*) AS run_count
FROM "AgentGraphExecution"
GROUP BY "AgentGraphExecution"."agentGraphId"
)
SELECT sl.id AS listing_id,
slv.id AS "storeListingVersionId",
slv."createdAt" AS updated_at,
sl.slug,
COALESCE(slv.name, '') AS agent_name,
slv."videoUrl" AS agent_video,
COALESCE(slv."imageUrls", ARRAY[]::text[]) AS agent_image,
slv."isFeatured" AS featured,
p.username AS creator_username,
p."avatarUrl" AS creator_avatar,
slv."subHeading" AS sub_heading,
slv.description,
slv.categories,
COALESCE(ar.run_count, 0::bigint) AS runs,
COALESCE(rs.avg_rating, 0.0)::double precision AS rating,
array_agg(DISTINCT slv.version::text) AS versions
FROM "StoreListing" sl
JOIN "StoreListingVersion" slv
ON slv."storeListingId" = sl.id
JOIN "AgentGraph" a
ON slv."agentGraphId" = a.id
AND slv."agentGraphVersion" = a.version
LEFT JOIN "Profile" p
ON sl."owningUserId" = p."userId"
LEFT JOIN reviewstats rs
ON sl.id = rs."storeListingId"
LEFT JOIN agentruns ar
ON a.id = ar."agentGraphId"
WHERE sl."isDeleted" = false
AND sl."hasApprovedVersion" = true
AND slv."submissionStatus" = 'APPROVED'
GROUP BY sl.id, slv.id, sl.slug, slv."createdAt", slv.name, slv."videoUrl",
slv."imageUrls", slv."isFeatured", p.username, p."avatarUrl",
slv."subHeading", slv.description, slv.categories, ar.run_count,
rs.avg_rating;
-- CreateView (restore original Creator)
CREATE VIEW "Creator" AS
WITH agentstats AS (
SELECT p_1.username,
count(DISTINCT sl.id) AS num_agents,
avg(COALESCE(sr.score, 0)::numeric) AS agent_rating,
sum(COALESCE(age.run_count, 0::bigint)) AS agent_runs
FROM "Profile" p_1
LEFT JOIN "StoreListing" sl
ON sl."owningUserId" = p_1."userId"
LEFT JOIN "StoreListingVersion" slv
ON slv."storeListingId" = sl.id
LEFT JOIN "StoreListingReview" sr
ON sr."storeListingVersionId" = slv.id
LEFT JOIN (
SELECT "AgentGraphExecution"."agentGraphId",
count(*) AS run_count
FROM "AgentGraphExecution"
GROUP BY "AgentGraphExecution"."agentGraphId"
) age ON age."agentGraphId" = slv."agentGraphId"
WHERE sl."isDeleted" = false
AND sl."hasApprovedVersion" = true
AND slv."submissionStatus" = 'APPROVED'
GROUP BY p_1.username
)
SELECT p.username,
p.name,
p."avatarUrl" AS avatar_url,
p.description,
array_agg(DISTINCT cats.c) FILTER (WHERE cats.c IS NOT NULL) AS top_categories,
p.links,
p."isFeatured" AS is_featured,
COALESCE(ast.num_agents, 0::bigint) AS num_agents,
COALESCE(ast.agent_rating, 0.0) AS agent_rating,
COALESCE(ast.agent_runs, 0::numeric) AS agent_runs
FROM "Profile" p
LEFT JOIN agentstats ast
ON ast.username = p.username
LEFT JOIN LATERAL (
SELECT unnest(slv.categories) AS c
FROM "StoreListing" sl
JOIN "StoreListingVersion" slv
ON slv."storeListingId" = sl.id
WHERE sl."owningUserId" = p."userId"
AND sl."isDeleted" = false
AND sl."hasApprovedVersion" = true
AND slv."submissionStatus" = 'APPROVED'
) cats ON true
GROUP BY p.username, p.name, p."avatarUrl", p.description, p.links,
p."isFeatured", ast.num_agents, ast.agent_rating, ast.agent_runs;
-- Drop function
DROP FUNCTION IF EXISTS refresh_store_materialized_views();
-- Drop materialized views
DROP MATERIALIZED VIEW IF EXISTS "mv_review_stats";
DROP MATERIALIZED VIEW IF EXISTS "mv_agent_run_counts";
-- DropIndex
DROP INDEX IF EXISTS "idx_profile_user";
-- DropIndex
DROP INDEX IF EXISTS "idx_agent_graph_execution_agent";
-- DropIndex
DROP INDEX IF EXISTS "idx_store_listing_review_version";
-- DropIndex
DROP INDEX IF EXISTS "idx_slv_agent";
-- DropIndex
DROP INDEX IF EXISTS "idx_slv_categories_gin";
-- DropIndex
DROP INDEX IF EXISTS "idx_store_listing_version_status";
-- DropIndex
DROP INDEX IF EXISTS "idx_store_listing_approved";
-- DropIndex
DROP INDEX IF EXISTS "idx_store_listing_version_approved_listing";

View File

@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""
Run test data creation and update scripts in sequence.
Usage:
poetry run python run_test_data.py
"""
import asyncio
import subprocess
import sys
from pathlib import Path
def run_command(cmd: list[str], cwd: Path | None = None) -> bool:
"""Run a command and return True if successful."""
try:
result = subprocess.run(
cmd, check=True, capture_output=True, text=True, cwd=cwd
)
if result.stdout:
print(result.stdout)
return True
except subprocess.CalledProcessError as e:
print(f"Error running command: {' '.join(cmd)}")
print(f"Error: {e.stderr}")
return False
async def main():
"""Main function to run test data scripts."""
print("=" * 60)
print("Running Test Data Scripts for AutoGPT Platform")
print("=" * 60)
print()
# Get the backend directory
backend_dir = Path(__file__).parent
test_dir = backend_dir / "test"
# Check if we're in the right directory
if not (backend_dir / "pyproject.toml").exists():
print("ERROR: This script must be run from the backend directory")
sys.exit(1)
print("1. Checking database connection...")
print("-" * 40)
# Import here to ensure proper environment setup
try:
from prisma import Prisma
db = Prisma()
await db.connect()
print("✓ Database connection successful")
await db.disconnect()
except Exception as e:
print(f"✗ Database connection failed: {e}")
print("\nPlease ensure:")
print("1. The database services are running (docker compose up -d)")
print("2. The DATABASE_URL in .env is correct")
print("3. Migrations have been run (poetry run prisma migrate deploy)")
sys.exit(1)
print()
print("2. Running test data creator...")
print("-" * 40)
# Run test_data_creator.py
if run_command(["poetry", "run", "python", "test_data_creator.py"], cwd=test_dir):
print()
print("✅ Test data created successfully!")
print()
print("3. Running test data updater...")
print("-" * 40)
# Run test_data_updater.py
if run_command(
["poetry", "run", "python", "test_data_updater.py"], cwd=test_dir
):
print()
print("✅ Test data updated successfully!")
else:
print()
print("❌ Test data updater failed!")
sys.exit(1)
else:
print()
print("❌ Test data creator failed!")
sys.exit(1)
print()
print("=" * 60)
print("Test data setup completed successfully!")
print("=" * 60)
print()
print("The materialized views have been populated with test data:")
print("- mv_agent_run_counts: Agent execution statistics")
print("- mv_review_stats: Store listing review statistics")
print()
print("You can now:")
print("1. Run tests: poetry run test")
print("2. Start the backend: poetry run serve")
print("3. View data in the database")
print()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -13,8 +13,10 @@ def wait_for_postgres(max_retries=5, delay=5):
"compose",
"-f",
"docker-compose.test.yaml",
"--env-file",
"../.env",
"exec",
"postgres-test",
"db",
"pg_isready",
"-U",
"postgres",
@@ -51,6 +53,8 @@ def test():
"compose",
"-f",
"docker-compose.test.yaml",
"--env-file",
"../.env",
"up",
"-d",
]
@@ -74,11 +78,20 @@ def test():
# to their development database, running tests would wipe their local data!
test_env = os.environ.copy()
# Use environment variables if set, otherwise use defaults that match docker-compose.test.yaml
db_user = os.getenv("DB_USER", "postgres")
db_pass = os.getenv("DB_PASS", "postgres")
db_name = os.getenv("DB_NAME", "postgres")
db_port = os.getenv("DB_PORT", "5432")
# Load database configuration from .env file
dotenv_path = os.path.join(os.path.dirname(__file__), "../.env")
if os.path.exists(dotenv_path):
with open(dotenv_path) as f:
for line in f:
if line.strip() and not line.startswith("#"):
key, value = line.strip().split("=", 1)
os.environ[key] = value
# Get database config from environment (now populated from .env)
db_user = os.getenv("POSTGRES_USER", "postgres")
db_pass = os.getenv("POSTGRES_PASSWORD", "postgres")
db_name = os.getenv("POSTGRES_DB", "postgres")
db_port = os.getenv("POSTGRES_PORT", "5432")
# Construct the test database URL - this ensures we're always pointing to the test container
test_env["DATABASE_URL"] = (

View File

@@ -599,7 +599,23 @@ view Creator {
agent_runs Int
is_featured Boolean
// Index or unique are not applied to views
// Note: Prisma doesn't support indexes on views, but the following indexes exist in the database:
//
// Optimized indexes (partial indexes to reduce size and improve performance):
// - idx_profile_user on Profile(userId)
// - idx_store_listing_approved on StoreListing(owningUserId) WHERE isDeleted = false AND hasApprovedVersion = true
// - idx_store_listing_version_status on StoreListingVersion(storeListingId) WHERE submissionStatus = 'APPROVED'
// - idx_slv_categories_gin - GIN index on StoreListingVersion(categories) WHERE submissionStatus = 'APPROVED'
// - idx_slv_agent on StoreListingVersion(agentGraphId, agentGraphVersion) WHERE submissionStatus = 'APPROVED'
// - idx_store_listing_review_version on StoreListingReview(storeListingVersionId)
// - idx_store_listing_version_approved_listing on StoreListingVersion(storeListingId, version) WHERE submissionStatus = 'APPROVED'
// - idx_agent_graph_execution_agent on AgentGraphExecution(agentGraphId)
//
// Materialized views used (refreshed every 15 minutes via pg_cron):
// - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId
// - mv_review_stats - Pre-aggregated review statistics (count, avg rating) by storeListingId
//
// Query strategy: Uses CTEs to efficiently aggregate creator statistics leveraging materialized views
}
view StoreAgent {
@@ -622,7 +638,30 @@ view StoreAgent {
rating Float
versions String[]
// Index or unique are not applied to views
// Note: Prisma doesn't support indexes on views, but the following indexes exist in the database:
//
// Optimized indexes (partial indexes to reduce size and improve performance):
// - idx_store_listing_approved on StoreListing(owningUserId) WHERE isDeleted = false AND hasApprovedVersion = true
// - idx_store_listing_version_status on StoreListingVersion(storeListingId) WHERE submissionStatus = 'APPROVED'
// - idx_slv_categories_gin - GIN index on StoreListingVersion(categories) WHERE submissionStatus = 'APPROVED' for array searches
// - idx_slv_agent on StoreListingVersion(agentGraphId, agentGraphVersion) WHERE submissionStatus = 'APPROVED'
// - idx_store_listing_review_version on StoreListingReview(storeListingVersionId)
// - idx_store_listing_version_approved_listing on StoreListingVersion(storeListingId, version) WHERE submissionStatus = 'APPROVED'
// - idx_agent_graph_execution_agent on AgentGraphExecution(agentGraphId)
// - idx_profile_user on Profile(userId)
//
// Additional indexes from earlier migrations:
// - StoreListing_agentId_owningUserId_idx
// - StoreListing_isDeleted_isApproved_idx (replaced by idx_store_listing_approved)
// - StoreListing_isDeleted_idx
// - StoreListing_agentId_key (unique on agentGraphId)
// - StoreListingVersion_agentId_agentVersion_isDeleted_idx
//
// Materialized views used (refreshed every 15 minutes via pg_cron):
// - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId
// - mv_review_stats - Pre-aggregated review statistics (count, avg rating) by storeListingId
//
// Query strategy: Uses CTE for version aggregation and joins with materialized views for performance
}
view StoreSubmission {
@@ -649,6 +688,33 @@ view StoreSubmission {
// Index or unique are not applied to views
}
// Note: This is actually a MATERIALIZED VIEW in the database
// Refreshed automatically every 15 minutes via pg_cron (with fallback to manual refresh)
view mv_agent_run_counts {
agentGraphId String @unique
run_count Int
// Pre-aggregated count of AgentGraphExecution records by agentGraphId
// Used by StoreAgent and Creator views for performance optimization
// Unique index created automatically on agentGraphId for fast lookups
// Refresh uses CONCURRENTLY to avoid blocking reads
}
// Note: This is actually a MATERIALIZED VIEW in the database
// Refreshed automatically every 15 minutes via pg_cron (with fallback to manual refresh)
view mv_review_stats {
storeListingId String @unique
review_count Int
avg_rating Float
// Pre-aggregated review statistics from StoreListingReview
// Includes count of reviews and average rating per StoreListing
// Only includes approved versions (submissionStatus = 'APPROVED') and non-deleted listings
// Used by StoreAgent view for performance optimization
// Unique index created automatically on storeListingId for fast lookups
// Refresh uses CONCURRENTLY to avoid blocking reads
}
model StoreListing {
id String @id @default(uuid())
createdAt DateTime @default(now())

View File

@@ -1,3 +1,21 @@
"""
Test Data Creator for AutoGPT Platform
This script creates test data for the AutoGPT platform database.
Image/Video URL Domains Used:
- Images: picsum.photos (for all image URLs - avatars, store listing images, etc.)
- Videos: youtube.com (for store listing video URLs)
Add these domains to your Next.js config:
```javascript
// next.config.js
images: {
domains: ['picsum.photos'],
}
```
"""
import asyncio
import random
from datetime import datetime
@@ -14,6 +32,7 @@ from prisma.types import (
AnalyticsMetricsCreateInput,
APIKeyCreateInput,
CreditTransactionCreateInput,
IntegrationWebhookCreateInput,
ProfileCreateInput,
StoreListingReviewCreateInput,
UserCreateInput,
@@ -53,10 +72,26 @@ MAX_REVIEWS_PER_VERSION = 5 # Total reviews depends on number of versions creat
def get_image():
url = faker.image_url()
while "placekitten.com" in url:
url = faker.image_url()
return url
"""Generate a consistent image URL using picsum.photos service."""
width = random.choice([200, 300, 400, 500, 600, 800])
height = random.choice([200, 300, 400, 500, 600, 800])
# Use a random seed to get different images
seed = random.randint(1, 1000)
return f"https://picsum.photos/seed/{seed}/{width}/{height}"
def get_video_url():
"""Generate a consistent video URL using a placeholder service."""
# Using YouTube as a consistent source for video URLs
video_ids = [
"dQw4w9WgXcQ", # Example video IDs
"9bZkp7q19f0",
"kJQP7kiw5Fk",
"RgKAFK5djSk",
"L_jWHffIx5E",
]
video_id = random.choice(video_ids)
return f"https://www.youtube.com/watch?v={video_id}"
async def main():
@@ -147,12 +182,27 @@ async def main():
)
agent_presets.append(preset)
# Insert UserAgents
user_agents = []
print(f"Inserting {NUM_USERS * MAX_AGENTS_PER_USER} user agents")
# Insert Profiles first (before LibraryAgents)
profiles = []
print(f"Inserting {NUM_USERS} profiles")
for user in users:
profile = await db.profile.create(
data=ProfileCreateInput(
userId=user.id,
name=user.name or faker.name(),
username=faker.unique.user_name(),
description=faker.text(),
links=[faker.url() for _ in range(3)],
avatarUrl=get_image(),
)
)
profiles.append(profile)
# Insert LibraryAgents
library_agents = []
print("Inserting library agents")
for user in users:
num_agents = random.randint(MIN_AGENTS_PER_USER, MAX_AGENTS_PER_USER)
# Get a shuffled list of graphs to ensure uniqueness per user
available_graphs = agent_graphs.copy()
random.shuffle(available_graphs)
@@ -162,18 +212,27 @@ async def main():
for i in range(num_agents):
graph = available_graphs[i] # Use unique graph for each library agent
user_agent = await db.libraryagent.create(
# Get creator profile for this graph's owner
creator_profile = next(
(p for p in profiles if p.userId == graph.userId), None
)
library_agent = await db.libraryagent.create(
data={
"userId": user.id,
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"creatorId": creator_profile.id if creator_profile else None,
"imageUrl": get_image() if random.random() < 0.5 else None,
"useGraphIsActiveVersion": random.choice([True, False]),
"isFavorite": random.choice([True, False]),
"isCreatedByUser": random.choice([True, False]),
"isArchived": random.choice([True, False]),
"isDeleted": random.choice([True, False]),
}
)
user_agents.append(user_agent)
library_agents.append(library_agent)
# Insert AgentGraphExecutions
agent_graph_executions = []
@@ -325,25 +384,9 @@ async def main():
)
)
# Insert Profiles
profiles = []
print(f"Inserting {NUM_USERS} profiles")
for user in users:
profile = await db.profile.create(
data=ProfileCreateInput(
userId=user.id,
name=user.name or faker.name(),
username=faker.unique.user_name(),
description=faker.text(),
links=[faker.url() for _ in range(3)],
avatarUrl=get_image(),
)
)
profiles.append(profile)
# Insert StoreListings
store_listings = []
print(f"Inserting {NUM_USERS} store listings")
print("Inserting store listings")
for graph in agent_graphs:
user = random.choice(users)
slug = faker.slug()
@@ -360,7 +403,7 @@ async def main():
# Insert StoreListingVersions
store_listing_versions = []
print(f"Inserting {NUM_USERS} store listing versions")
print("Inserting store listing versions")
for listing in store_listings:
graph = [g for g in agent_graphs if g.id == listing.agentGraphId][0]
version = await db.storelistingversion.create(
@@ -369,7 +412,7 @@ async def main():
"agentGraphVersion": graph.version,
"name": graph.name or faker.sentence(nb_words=3),
"subHeading": faker.sentence(),
"videoUrl": faker.url(),
"videoUrl": get_video_url() if random.random() < 0.3 else None,
"imageUrls": [get_image() for _ in range(3)],
"description": faker.text(),
"categories": [faker.word() for _ in range(3)],
@@ -388,7 +431,7 @@ async def main():
store_listing_versions.append(version)
# Insert StoreListingReviews
print(f"Inserting {NUM_USERS * MAX_REVIEWS_PER_VERSION} store listing reviews")
print("Inserting store listing reviews")
for version in store_listing_versions:
# Create a copy of users list and shuffle it to avoid duplicates
available_reviewers = users.copy()
@@ -411,26 +454,92 @@ async def main():
)
)
# Update StoreListingVersions with submission status (StoreListingSubmissions table no longer exists)
print(f"Updating {NUM_USERS} store listing versions with submission status")
for version in store_listing_versions:
reviewer = random.choice(users)
status: prisma.enums.SubmissionStatus = random.choice(
[
prisma.enums.SubmissionStatus.PENDING,
prisma.enums.SubmissionStatus.APPROVED,
prisma.enums.SubmissionStatus.REJECTED,
]
)
await db.storelistingversion.update(
where={"id": version.id},
data={
"submissionStatus": status,
"Reviewer": {"connect": {"id": reviewer.id}},
"reviewComments": faker.text(),
"reviewedAt": datetime.now(),
},
)
# Insert UserOnboarding for some users
print("Inserting user onboarding data")
for user in random.sample(
users, k=int(NUM_USERS * 0.7)
): # 70% of users have onboarding data
completed_steps = []
possible_steps = list(prisma.enums.OnboardingStep)
# Randomly complete some steps
if random.random() < 0.8:
num_steps = random.randint(1, len(possible_steps))
completed_steps = random.sample(possible_steps, k=num_steps)
try:
await db.useronboarding.create(
data={
"userId": user.id,
"completedSteps": completed_steps,
"notificationDot": random.choice([True, False]),
"notified": (
random.sample(completed_steps, k=min(3, len(completed_steps)))
if completed_steps
else []
),
"rewardedFor": (
random.sample(completed_steps, k=min(2, len(completed_steps)))
if completed_steps
else []
),
"usageReason": (
random.choice(["personal", "business", "research", "learning"])
if random.random() < 0.7
else None
),
"integrations": random.sample(
["github", "google", "discord", "slack"], k=random.randint(0, 2)
),
"otherIntegrations": (
faker.word() if random.random() < 0.2 else None
),
"selectedStoreListingVersionId": (
random.choice(store_listing_versions).id
if store_listing_versions and random.random() < 0.5
else None
),
"agentInput": (
Json({"test": "data"}) if random.random() < 0.3 else None
),
"onboardingAgentExecutionId": (
random.choice(agent_graph_executions).id
if agent_graph_executions and random.random() < 0.3
else None
),
"agentRuns": random.randint(0, 10),
}
)
except Exception as e:
print(f"Error creating onboarding for user {user.id}: {e}")
# Try simpler version
await db.useronboarding.create(
data={
"userId": user.id,
}
)
# Insert IntegrationWebhooks for some users
print("Inserting integration webhooks")
for user in random.sample(
users, k=int(NUM_USERS * 0.3)
): # 30% of users have webhooks
for _ in range(random.randint(1, 3)):
await db.integrationwebhook.create(
data=IntegrationWebhookCreateInput(
userId=user.id,
provider=random.choice(["github", "slack", "discord"]),
credentialsId=str(faker.uuid4()),
webhookType=random.choice(["repo", "channel", "server"]),
resource=faker.slug(),
events=[
random.choice(["created", "updated", "deleted"])
for _ in range(random.randint(1, 3))
],
config=prisma.Json({"url": faker.url()}),
secret=str(faker.sha256()),
providerWebhookId=str(faker.uuid4()),
)
)
# Insert APIKeys
print(f"Inserting {NUM_USERS} api keys")
@@ -451,7 +560,12 @@ async def main():
)
)
# Refresh materialized views
print("Refreshing materialized views...")
await db.execute_raw("SELECT refresh_store_materialized_views();")
await db.disconnect()
print("Test data creation completed successfully!")
if __name__ == "__main__":

View File

@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
Test Data Updater for Store Materialized Views
This script updates existing test data to trigger changes in the materialized views:
- mv_agent_run_counts: Updated by creating new AgentGraphExecution records
- mv_review_stats: Updated by creating new StoreListingReview records
Run this after test_data_creator.py to test that materialized views update correctly.
"""
import asyncio
import random
from datetime import datetime, timedelta
import prisma.enums
from faker import Faker
from prisma import Json, Prisma
faker = Faker()
async def main():
db = Prisma()
await db.connect()
print("Starting test data updates for materialized views...")
print("=" * 60)
# Get existing data
users = await db.user.find_many(take=50)
agent_graphs = await db.agentgraph.find_many(where={"isActive": True}, take=50)
store_listings = await db.storelisting.find_many(
where={"hasApprovedVersion": True}, include={"Versions": True}, take=30
)
agent_nodes = await db.agentnode.find_many(take=100)
if not all([users, agent_graphs, store_listings]):
print(
"ERROR: Not enough test data found. Please run test_data_creator.py first."
)
await db.disconnect()
return
print(
f"Found {len(users)} users, {len(agent_graphs)} graphs, {len(store_listings)} store listings"
)
print()
# 1. Add new AgentGraphExecutions to update mv_agent_run_counts
print("1. Adding new agent graph executions...")
print("-" * 40)
new_executions_count = 0
execution_data = []
for graph in random.sample(agent_graphs, min(20, len(agent_graphs))):
# Add 5-15 new executions per selected graph
num_new_executions = random.randint(5, 15)
for _ in range(num_new_executions):
user = random.choice(users)
execution_data.append(
{
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"userId": user.id,
"executionStatus": random.choice(
[
prisma.enums.AgentExecutionStatus.COMPLETED,
prisma.enums.AgentExecutionStatus.FAILED,
prisma.enums.AgentExecutionStatus.RUNNING,
]
),
"startedAt": faker.date_time_between(
start_date="-7d", end_date="now"
),
"stats": Json(
{
"duration": random.randint(100, 5000),
"blocks_executed": random.randint(1, 10),
}
),
}
)
new_executions_count += 1
# Batch create executions
await db.agentgraphexecution.create_many(data=execution_data)
print(f"✓ Created {new_executions_count} new executions")
# Get the created executions for node executions
recent_executions = await db.agentgraphexecution.find_many(
take=new_executions_count, order={"createdAt": "desc"}
)
# 2. Add corresponding AgentNodeExecutions
print("\n2. Adding agent node executions...")
print("-" * 40)
node_execution_data = []
for execution in recent_executions:
# Get nodes for this graph
graph_nodes = [
n for n in agent_nodes if n.agentGraphId == execution.agentGraphId
]
if graph_nodes:
for node in random.sample(graph_nodes, min(3, len(graph_nodes))):
node_execution_data.append(
{
"agentGraphExecutionId": execution.id,
"agentNodeId": node.id,
"executionStatus": execution.executionStatus,
"addedTime": datetime.now(),
"startedTime": datetime.now()
- timedelta(minutes=random.randint(1, 10)),
"endedTime": (
datetime.now()
if execution.executionStatus
== prisma.enums.AgentExecutionStatus.COMPLETED
else None
),
}
)
await db.agentnodeexecution.create_many(data=node_execution_data)
print(f"✓ Created {len(node_execution_data)} node executions")
# 3. Add new StoreListingReviews to update mv_review_stats
print("\n3. Adding new store listing reviews...")
print("-" * 40)
new_reviews_count = 0
for listing in store_listings:
if not listing.Versions:
continue
# Get approved versions
approved_versions = [
v
for v in listing.Versions
if v.submissionStatus == prisma.enums.SubmissionStatus.APPROVED
]
if not approved_versions:
continue
# Pick a version to add reviews to
version = random.choice(approved_versions)
# Get existing reviews for this version to avoid duplicates
existing_reviews = await db.storelistingreview.find_many(
where={"storeListingVersionId": version.id}
)
existing_reviewer_ids = {r.reviewByUserId for r in existing_reviews}
# Find users who haven't reviewed this version yet
available_reviewers = [u for u in users if u.id not in existing_reviewer_ids]
if available_reviewers:
# Add 2-5 new reviews
num_new_reviews = min(random.randint(2, 5), len(available_reviewers))
selected_reviewers = random.sample(available_reviewers, num_new_reviews)
for reviewer in selected_reviewers:
# Bias towards positive reviews (4-5 stars)
score = random.choices([1, 2, 3, 4, 5], weights=[5, 10, 20, 40, 25])[0]
await db.storelistingreview.create(
data={
"storeListingVersionId": version.id,
"reviewByUserId": reviewer.id,
"score": score,
"comments": (
faker.text(max_nb_chars=200)
if random.random() < 0.7
else None
),
}
)
new_reviews_count += 1
print(f"✓ Created {new_reviews_count} new reviews")
# 4. Update some store listing versions (change categories, featured status)
print("\n4. Updating store listing versions...")
print("-" * 40)
updates_count = 0
for listing in random.sample(store_listings, min(10, len(store_listings))):
if listing.Versions:
version = random.choice(listing.Versions)
if version.submissionStatus == prisma.enums.SubmissionStatus.APPROVED:
# Toggle featured status or update categories
new_categories = random.sample(
[
"productivity",
"ai",
"automation",
"data",
"social",
"marketing",
"development",
"analytics",
],
k=random.randint(2, 4),
)
await db.storelistingversion.update(
where={"id": version.id},
data={
"isFeatured": (
not version.isFeatured
if random.random() < 0.3
else version.isFeatured
),
"categories": new_categories,
"updatedAt": datetime.now(),
},
)
updates_count += 1
print(f"✓ Updated {updates_count} store listing versions")
# 5. Create some new credit transactions
print("\n5. Adding credit transactions...")
print("-" * 40)
transaction_count = 0
for user in random.sample(users, min(30, len(users))):
# Add 1-3 transactions per user
for _ in range(random.randint(1, 3)):
transaction_type = random.choice(
[
prisma.enums.CreditTransactionType.USAGE,
prisma.enums.CreditTransactionType.TOP_UP,
prisma.enums.CreditTransactionType.GRANT,
]
)
amount = (
random.randint(10, 500)
if transaction_type == prisma.enums.CreditTransactionType.TOP_UP
else -random.randint(1, 50)
)
await db.credittransaction.create(
data={
"userId": user.id,
"amount": amount,
"type": transaction_type,
"metadata": Json(
{
"source": "test_updater",
"timestamp": datetime.now().isoformat(),
}
),
}
)
transaction_count += 1
print(f"✓ Created {transaction_count} credit transactions")
# 6. Refresh materialized views
print("\n6. Refreshing materialized views...")
print("-" * 40)
try:
await db.execute_raw("SELECT refresh_store_materialized_views();")
print("✓ Materialized views refreshed successfully")
except Exception as e:
print(f"⚠ Warning: Could not refresh materialized views: {e}")
print(
" You may need to refresh them manually with: SELECT refresh_store_materialized_views();"
)
# 7. Verify the updates
print("\n7. Verifying updates...")
print("-" * 40)
# Check agent run counts
run_counts = await db.query_raw(
"SELECT COUNT(*) as view_count FROM mv_agent_run_counts"
)
print(f"✓ mv_agent_run_counts has {run_counts[0]['view_count']} entries")
# Check review stats
review_stats = await db.query_raw(
"SELECT COUNT(*) as view_count FROM mv_review_stats"
)
print(f"✓ mv_review_stats has {review_stats[0]['view_count']} entries")
# Sample some data from the views
print("\nSample data from materialized views:")
sample_runs = await db.query_raw(
"SELECT * FROM mv_agent_run_counts ORDER BY run_count DESC LIMIT 5"
)
print("\nTop 5 agents by run count:")
for row in sample_runs:
print(f" - Agent {row['agentGraphId'][:8]}...: {row['run_count']} runs")
sample_reviews = await db.query_raw(
"SELECT * FROM mv_review_stats ORDER BY avg_rating DESC NULLS LAST LIMIT 5"
)
print("\nTop 5 store listings by rating:")
for row in sample_reviews:
avg_rating = row["avg_rating"] if row["avg_rating"] is not None else 0.0
print(
f" - Listing {row['storeListingId'][:8]}...: {avg_rating:.2f} ⭐ ({row['review_count']} reviews)"
)
await db.disconnect()
print("\n" + "=" * 60)
print("Test data update completed successfully!")
print("The materialized views should now reflect the updated data.")
print(
"\nTo manually refresh views, run: SELECT refresh_store_materialized_views();"
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -11,8 +11,6 @@ const nextConfig = {
"ideogram.ai", // for generated images
"picsum.photos", // for placeholder images
"dummyimage.com", // for placeholder images
"placekitten.com", // for placeholder images
],
},
output: "standalone",