mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-09 22:35:54 -05:00
## Summary Enable LaunchDarkly feature flags to use rich user context and metadata for advanced targeting, including user segments, account age, email domains, and custom attributes. This unlocks LaunchDarkly's powerful targeting capabilities beyond simple user ID checks. ## Problem LaunchDarkly feature flags were only receiving basic user IDs, preventing the use of: - **Segment-based targeting** (e.g., "employees", "beta users", "new accounts") - **Contextual rules** (e.g., account age, email domain, custom metadata) - **Advanced LaunchDarkly features** like percentage rollouts by user attributes This limited feature flag flexibility and required manual user ID management for targeting. ## Solution ### 🎯 **LaunchDarkly Context Enhancement** - **Rich user context**: Send user metadata, segments, account age, email domain to LaunchDarkly - **Automatic segmentation**: Users automatically categorized as "employee", "new_user", "established_user" etc. - **Custom metadata support**: Any user metadata becomes available for LaunchDarkly targeting - **24-hour caching**: Efficient user context retrieval with TTL cache to reduce database calls ### 📊 **User Context Data** ```python # Before: Only user ID context = Context.builder("user-123").build() # After: Full context with targeting data context = { "email": "user@agpt.co", "created_at": "2023-01-15T10:00:00Z", "segments": ["employee", "established_user"], "email_domain": "agpt.co", "account_age_days": 365, "custom_role": "admin" } ``` ### 🏗️ **Required Infrastructure Changes** To support proper LaunchDarkly serialization, we needed to implement clean application models: #### **Application-Layer User Model** - Created snake_case User model (`created_at`, `email_verified`) for proper JSON serialization - LaunchDarkly expects consistent field naming - camelCase Prisma objects caused validation errors - Added `User.from_db()` converter to safely transform database objects #### **HTTP Client Reliability** - Fixed HTTP 4xx retry issue that was causing unnecessary load - Added layer validation to prevent database objects leaking to external services #### **Type Safety** - Eliminated `Any` types and defensive coding patterns - Proper typing enables better IDE support and catches errors early ## Technical Implementation ### **Core LaunchDarkly Enhancement** ```python # autogpt_libs/feature_flag/client.py @async_ttl_cache(maxsize=1000, ttl_seconds=86400) # 24h cache async def _fetch_user_context_data(user_id: str) -> dict[str, Any]: user = await get_user_by_id(user_id) return _build_launchdarkly_context(user) def _build_launchdarkly_context(user: User) -> dict[str, Any]: return { "email": user.email, "created_at": user.created_at.isoformat(), # snake_case for serialization "segments": determine_user_segments(user), "account_age_days": calculate_account_age(user), # ... more context data } ``` ### **User Segmentation Logic** - **Role-based**: `admin`, `user`, `system` segments - **Domain-based**: `employee` for @agpt.co emails - **Account age**: `new_user` (<7 days), `recent_user` (7-30 days), `established_user` (>30 days) - **Custom metadata**: Any user metadata becomes available for targeting ### **Infrastructure Updates** - `backend/data/model.py`: Application User model with proper serialization - `backend/util/service.py`: HTTP client improvements and layer validation - Multiple files: Migration to use application models for consistency ## LaunchDarkly Usage Examples With this enhancement, you can now create LaunchDarkly rules like: ```yaml # Target employees only - variation: true targets: - values: ["employee"] contextKind: "user" attribute: "segments" # Target new users for gradual rollout - variation: true rollout: variations: - variation: true weight: 25000 # 25% of new users contextKind: "user" bucketBy: "segments" filters: - attribute: "segments" op: "contains" values: ["new_user"] ``` ## Performance & Caching - **24-hour TTL cache**: Dramatically reduces database calls for user context - **Graceful fallbacks**: Simple user ID context if database unavailable - **Efficient caching**: 1000 entry LRU cache with automatic TTL expiration ## Testing - [x] LaunchDarkly context includes all expected user attributes - [x] Segmentation logic correctly categorizes users - [x] 24-hour cache reduces database load - [x] Fallback to simple context works when database unavailable - [x] All existing feature flag functionality preserved - [x] HTTP retry improvements work correctly ## Breaking Changes ✅ **No external API changes** - all existing feature flag usage continues to work ⚠️ **Internal changes only**: - `get_user_by_id()` returns application User model instead of Prisma model - Test utilities need to import User from `backend.data.model` ## Impact 🎯 **Product Impact**: - **Advanced targeting**: Product teams can now use sophisticated LaunchDarkly rules - **Better user experience**: Gradual rollouts, A/B testing, and segment-based features - **Operational efficiency**: Reduced need for manual user ID management 🚀 **Performance Impact**: - **Reduced database load**: 24-hour caching minimizes repeated user context queries - **Improved reliability**: Fixed HTTP retry inefficiencies - **Better monitoring**: Cleaner logs without 4xx retry noise --- **Primary goal**: Enable rich LaunchDarkly targeting with user context and segments **Infrastructure changes**: Required for proper serialization and reliability 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
788 lines
31 KiB
Python
788 lines
31 KiB
Python
"""
|
|
E2E Test Data Creator for AutoGPT Platform
|
|
|
|
This script creates test data for E2E tests by using API functions instead of direct Prisma calls.
|
|
This approach ensures compatibility with future model changes by using the API layer.
|
|
|
|
Image/Video URL Domains Used:
|
|
- Images: picsum.photos (for all image URLs - avatars, store listing images, etc.)
|
|
- Videos: youtube.com (for store listing video URLs)
|
|
|
|
Add these domains to your Next.js config:
|
|
```javascript
|
|
// next.config.js
|
|
images: {
|
|
domains: ['picsum.photos'],
|
|
}
|
|
```
|
|
"""
|
|
|
|
import asyncio
|
|
import random
|
|
from typing import Any, Dict, List
|
|
|
|
from faker import Faker
|
|
|
|
from backend.data.api_key import generate_api_key
|
|
from backend.data.credit import get_user_credit_model
|
|
from backend.data.db import prisma
|
|
from backend.data.graph import Graph, Link, Node, create_graph
|
|
|
|
# Import API functions from the backend
|
|
from backend.data.user import get_or_create_user
|
|
from backend.server.v2.library.db import create_library_agent, create_preset
|
|
from backend.server.v2.library.model import LibraryAgentPresetCreatable
|
|
from backend.server.v2.store.db import create_store_submission, review_store_submission
|
|
from backend.util.clients import get_supabase
|
|
|
|
faker = Faker()
|
|
|
|
|
|
# Constants for data generation limits (reduced for E2E tests)
|
|
NUM_USERS = 10
|
|
NUM_AGENT_BLOCKS = 20
|
|
MIN_GRAPHS_PER_USER = 10
|
|
MAX_GRAPHS_PER_USER = 10
|
|
MIN_NODES_PER_GRAPH = 2
|
|
MAX_NODES_PER_GRAPH = 4
|
|
MIN_PRESETS_PER_USER = 1
|
|
MAX_PRESETS_PER_USER = 2
|
|
MIN_AGENTS_PER_USER = 10
|
|
MAX_AGENTS_PER_USER = 10
|
|
MIN_EXECUTIONS_PER_GRAPH = 1
|
|
MAX_EXECUTIONS_PER_GRAPH = 5
|
|
MIN_REVIEWS_PER_VERSION = 1
|
|
MAX_REVIEWS_PER_VERSION = 3
|
|
|
|
|
|
def get_image():
|
|
"""Generate a consistent image URL using picsum.photos service."""
|
|
width = random.choice([200, 300, 400, 500, 600, 800])
|
|
height = random.choice([200, 300, 400, 500, 600, 800])
|
|
seed = random.randint(1, 1000)
|
|
return f"https://picsum.photos/seed/{seed}/{width}/{height}"
|
|
|
|
|
|
def get_video_url():
|
|
"""Generate a consistent video URL using YouTube."""
|
|
video_ids = [
|
|
"dQw4w9WgXcQ",
|
|
"9bZkp7q19f0",
|
|
"kJQP7kiw5Fk",
|
|
"RgKAFK5djSk",
|
|
"L_jWHffIx5E",
|
|
]
|
|
video_id = random.choice(video_ids)
|
|
return f"https://www.youtube.com/watch?v={video_id}"
|
|
|
|
|
|
class TestDataCreator:
|
|
"""Creates test data using API functions for E2E tests."""
|
|
|
|
def __init__(self):
|
|
self.users: List[Dict[str, Any]] = []
|
|
self.agent_blocks: List[Dict[str, Any]] = []
|
|
self.agent_graphs: List[Dict[str, Any]] = []
|
|
self.library_agents: List[Dict[str, Any]] = []
|
|
self.store_submissions: List[Dict[str, Any]] = []
|
|
self.api_keys: List[Dict[str, Any]] = []
|
|
self.presets: List[Dict[str, Any]] = []
|
|
self.profiles: List[Dict[str, Any]] = []
|
|
|
|
async def create_test_users(self) -> List[Dict[str, Any]]:
|
|
"""Create test users using Supabase client."""
|
|
print(f"Creating {NUM_USERS} test users...")
|
|
|
|
supabase = get_supabase()
|
|
users = []
|
|
|
|
for i in range(NUM_USERS):
|
|
try:
|
|
# Generate test user data
|
|
if i == 0:
|
|
# First user should have test123@gmail.com email for testing
|
|
email = "test123@gmail.com"
|
|
else:
|
|
email = faker.unique.email()
|
|
password = "testpassword123" # Standard test password
|
|
user_id = f"test-user-{i}-{faker.uuid4()}"
|
|
|
|
# Create user in Supabase Auth (if needed)
|
|
try:
|
|
auth_response = supabase.auth.admin.create_user(
|
|
{"email": email, "password": password, "email_confirm": True}
|
|
)
|
|
if auth_response.user:
|
|
user_id = auth_response.user.id
|
|
except Exception as supabase_error:
|
|
print(
|
|
f"Supabase user creation failed for {email}, using fallback: {supabase_error}"
|
|
)
|
|
# Fall back to direct database creation
|
|
|
|
# Create mock user data similar to what auth middleware would provide
|
|
user_data = {
|
|
"sub": user_id,
|
|
"email": email,
|
|
}
|
|
|
|
# Use the API function to create user in local database
|
|
user = await get_or_create_user(user_data)
|
|
users.append(user.model_dump())
|
|
|
|
except Exception as e:
|
|
print(f"Error creating user {i}: {e}")
|
|
continue
|
|
|
|
self.users = users
|
|
return users
|
|
|
|
async def get_available_blocks(self) -> List[Dict[str, Any]]:
|
|
"""Get available agent blocks from database."""
|
|
print("Getting available agent blocks...")
|
|
|
|
# Get blocks from database instead of the registry
|
|
db_blocks = await prisma.agentblock.find_many()
|
|
if not db_blocks:
|
|
print("No blocks found in database, creating some basic blocks...")
|
|
# Create some basic blocks if none exist
|
|
from backend.blocks.io import AgentInputBlock, AgentOutputBlock
|
|
from backend.blocks.maths import CalculatorBlock
|
|
from backend.blocks.time_blocks import GetCurrentTimeBlock
|
|
|
|
blocks_to_create = [
|
|
AgentInputBlock(),
|
|
AgentOutputBlock(),
|
|
CalculatorBlock(),
|
|
GetCurrentTimeBlock(),
|
|
]
|
|
|
|
for block in blocks_to_create:
|
|
try:
|
|
await prisma.agentblock.create(
|
|
data={
|
|
"id": block.id,
|
|
"name": block.name,
|
|
"inputSchema": "{}",
|
|
"outputSchema": "{}",
|
|
}
|
|
)
|
|
except Exception as e:
|
|
print(f"Error creating block {block.name}: {e}")
|
|
|
|
# Get blocks again after creation
|
|
db_blocks = await prisma.agentblock.find_many()
|
|
|
|
self.agent_blocks = [
|
|
{"id": block.id, "name": block.name} for block in db_blocks
|
|
]
|
|
print(f"Found {len(self.agent_blocks)} blocks in database")
|
|
return self.agent_blocks
|
|
|
|
async def create_test_graphs(self) -> List[Dict[str, Any]]:
|
|
"""Create test graphs using the API function."""
|
|
print("Creating test graphs...")
|
|
|
|
graphs = []
|
|
for user in self.users:
|
|
num_graphs = random.randint(MIN_GRAPHS_PER_USER, MAX_GRAPHS_PER_USER)
|
|
|
|
for graph_num in range(num_graphs):
|
|
# Create a simple graph with nodes and links
|
|
graph_id = str(faker.uuid4())
|
|
nodes = []
|
|
links = []
|
|
|
|
# Determine if this should be a DummyInput graph (first 3-4 graphs per user)
|
|
is_dummy_input = graph_num < 4
|
|
|
|
# Create nodes based on graph type
|
|
if is_dummy_input:
|
|
# For dummy input graphs: only GetCurrentTimeBlock
|
|
node_id = str(faker.uuid4())
|
|
block = next(
|
|
b
|
|
for b in self.agent_blocks
|
|
if b["name"] == "GetCurrentTimeBlock"
|
|
)
|
|
input_default = {"trigger": "start", "format": "%H:%M:%S"}
|
|
|
|
node = Node(
|
|
id=node_id,
|
|
block_id=block["id"],
|
|
input_default=input_default,
|
|
metadata={"position": {"x": 0, "y": 0}},
|
|
)
|
|
nodes.append(node)
|
|
else:
|
|
# For regular graphs: Create calculator agent pattern with 4 nodes
|
|
# Node 1: AgentInputBlock for 'a'
|
|
input_a_id = str(faker.uuid4())
|
|
input_a_block = next(
|
|
b for b in self.agent_blocks if b["name"] == "AgentInputBlock"
|
|
)
|
|
input_a_node = Node(
|
|
id=input_a_id,
|
|
block_id=input_a_block["id"],
|
|
input_default={
|
|
"name": "a",
|
|
"title": None,
|
|
"value": "",
|
|
"advanced": False,
|
|
"description": None,
|
|
"placeholder_values": [],
|
|
},
|
|
metadata={"position": {"x": -1012, "y": 674}},
|
|
)
|
|
nodes.append(input_a_node)
|
|
|
|
# Node 2: AgentInputBlock for 'b'
|
|
input_b_id = str(faker.uuid4())
|
|
input_b_block = next(
|
|
b for b in self.agent_blocks if b["name"] == "AgentInputBlock"
|
|
)
|
|
input_b_node = Node(
|
|
id=input_b_id,
|
|
block_id=input_b_block["id"],
|
|
input_default={
|
|
"name": "b",
|
|
"title": None,
|
|
"value": "",
|
|
"advanced": False,
|
|
"description": None,
|
|
"placeholder_values": [],
|
|
},
|
|
metadata={"position": {"x": -1117, "y": 78}},
|
|
)
|
|
nodes.append(input_b_node)
|
|
|
|
# Node 3: CalculatorBlock
|
|
calc_id = str(faker.uuid4())
|
|
calc_block = next(
|
|
b for b in self.agent_blocks if b["name"] == "CalculatorBlock"
|
|
)
|
|
calc_node = Node(
|
|
id=calc_id,
|
|
block_id=calc_block["id"],
|
|
input_default={"operation": "Add", "round_result": False},
|
|
metadata={"position": {"x": -435, "y": 363}},
|
|
)
|
|
nodes.append(calc_node)
|
|
|
|
# Node 4: AgentOutputBlock
|
|
output_id = str(faker.uuid4())
|
|
output_block = next(
|
|
b for b in self.agent_blocks if b["name"] == "AgentOutputBlock"
|
|
)
|
|
output_node = Node(
|
|
id=output_id,
|
|
block_id=output_block["id"],
|
|
input_default={
|
|
"name": "result",
|
|
"title": None,
|
|
"value": "",
|
|
"format": "",
|
|
"advanced": False,
|
|
"description": None,
|
|
},
|
|
metadata={"position": {"x": 402, "y": 0}},
|
|
)
|
|
nodes.append(output_node)
|
|
|
|
# Create links between nodes (only for non-dummy graphs with multiple nodes)
|
|
if len(nodes) >= 4:
|
|
# Use the actual node IDs from the created nodes instead of our variables
|
|
actual_input_a_id = nodes[0].id # First node (input_a)
|
|
actual_input_b_id = nodes[1].id # Second node (input_b)
|
|
actual_calc_id = nodes[2].id # Third node (calculator)
|
|
actual_output_id = nodes[3].id # Fourth node (output)
|
|
|
|
# Link input_a to calculator.a
|
|
link1 = Link(
|
|
source_id=actual_input_a_id,
|
|
sink_id=actual_calc_id,
|
|
source_name="result",
|
|
sink_name="a",
|
|
is_static=True,
|
|
)
|
|
links.append(link1)
|
|
|
|
# Link input_b to calculator.b
|
|
link2 = Link(
|
|
source_id=actual_input_b_id,
|
|
sink_id=actual_calc_id,
|
|
source_name="result",
|
|
sink_name="b",
|
|
is_static=True,
|
|
)
|
|
links.append(link2)
|
|
|
|
# Link calculator.result to output.value
|
|
link3 = Link(
|
|
source_id=actual_calc_id,
|
|
sink_id=actual_output_id,
|
|
source_name="result",
|
|
sink_name="value",
|
|
is_static=False,
|
|
)
|
|
links.append(link3)
|
|
|
|
# Create graph object with DummyInput in name if it's a dummy input graph
|
|
graph_name = faker.sentence(nb_words=3)
|
|
if is_dummy_input:
|
|
graph_name = f"DummyInput {graph_name}"
|
|
|
|
graph_name = f"{graph_name} Agents"
|
|
|
|
graph = Graph(
|
|
id=graph_id,
|
|
name=graph_name,
|
|
description=faker.text(max_nb_chars=200),
|
|
nodes=nodes,
|
|
links=links,
|
|
is_active=True,
|
|
)
|
|
|
|
try:
|
|
# Use the API function to create graph
|
|
created_graph = await create_graph(graph, user["id"])
|
|
graph_dict = created_graph.model_dump()
|
|
# Ensure userId is included for store submissions
|
|
graph_dict["userId"] = user["id"]
|
|
graphs.append(graph_dict)
|
|
print(
|
|
f"✅ Created graph for user {user['id']}: {graph_dict['name']}"
|
|
)
|
|
except Exception as e:
|
|
print(f"Error creating graph: {e}")
|
|
continue
|
|
|
|
self.agent_graphs = graphs
|
|
return graphs
|
|
|
|
async def create_test_library_agents(self) -> List[Dict[str, Any]]:
|
|
"""Create test library agents using the API function."""
|
|
print("Creating test library agents...")
|
|
|
|
library_agents = []
|
|
for user in self.users:
|
|
num_agents = 10 # Create exactly 10 agents per user
|
|
|
|
# Get available graphs for this user
|
|
user_graphs = [
|
|
g for g in self.agent_graphs if g.get("userId") == user["id"]
|
|
]
|
|
if not user_graphs:
|
|
continue
|
|
|
|
# Shuffle and take unique graphs to avoid duplicates
|
|
random.shuffle(user_graphs)
|
|
selected_graphs = user_graphs[: min(num_agents, len(user_graphs))]
|
|
|
|
for graph_data in selected_graphs:
|
|
try:
|
|
# Get the graph model from the database
|
|
from backend.data.graph import get_graph
|
|
|
|
graph = await get_graph(
|
|
graph_data["id"], graph_data.get("version", 1), user["id"]
|
|
)
|
|
if graph:
|
|
# Use the API function to create library agent
|
|
library_agents.extend(
|
|
v.model_dump()
|
|
for v in await create_library_agent(graph, user["id"])
|
|
)
|
|
except Exception as e:
|
|
print(f"Error creating library agent: {e}")
|
|
continue
|
|
|
|
self.library_agents = library_agents
|
|
return library_agents
|
|
|
|
async def create_test_presets(self) -> List[Dict[str, Any]]:
|
|
"""Create test presets using the API function."""
|
|
print("Creating test presets...")
|
|
|
|
presets = []
|
|
for user in self.users:
|
|
num_presets = random.randint(MIN_PRESETS_PER_USER, MAX_PRESETS_PER_USER)
|
|
|
|
# Get available graphs for this user
|
|
user_graphs = [
|
|
g for g in self.agent_graphs if g.get("userId") == user["id"]
|
|
]
|
|
if not user_graphs:
|
|
continue
|
|
|
|
for _ in range(min(num_presets, len(user_graphs))):
|
|
graph = random.choice(user_graphs)
|
|
|
|
preset_data = LibraryAgentPresetCreatable(
|
|
name=faker.sentence(nb_words=3),
|
|
description=faker.text(max_nb_chars=200),
|
|
graph_id=graph["id"], # Fixed field name
|
|
graph_version=graph.get("version", 1), # Fixed field name
|
|
inputs={}, # Required field - empty inputs for test data
|
|
credentials={}, # Required field - empty credentials for test data
|
|
is_active=True,
|
|
)
|
|
|
|
try:
|
|
# Use the API function to create preset
|
|
preset = await create_preset(user["id"], preset_data)
|
|
presets.append(preset.model_dump())
|
|
except Exception as e:
|
|
print(f"Error creating preset: {e}")
|
|
continue
|
|
|
|
self.presets = presets
|
|
return presets
|
|
|
|
async def create_test_api_keys(self) -> List[Dict[str, Any]]:
|
|
"""Create test API keys using the API function."""
|
|
print("Creating test API keys...")
|
|
|
|
api_keys = []
|
|
for user in self.users:
|
|
from backend.data.api_key import APIKeyPermission
|
|
|
|
try:
|
|
# Use the API function to create API key
|
|
api_key, _ = await generate_api_key(
|
|
name=faker.word(),
|
|
user_id=user["id"],
|
|
permissions=[
|
|
APIKeyPermission.EXECUTE_GRAPH,
|
|
APIKeyPermission.READ_GRAPH,
|
|
],
|
|
description=faker.text(),
|
|
)
|
|
api_keys.append(api_key.model_dump())
|
|
except Exception as e:
|
|
print(f"Error creating API key for user {user['id']}: {e}")
|
|
continue
|
|
|
|
self.api_keys = api_keys
|
|
return api_keys
|
|
|
|
async def update_test_profiles(self) -> List[Dict[str, Any]]:
|
|
"""Update existing user profiles to make some into featured creators."""
|
|
print("Updating user profiles to create featured creators...")
|
|
|
|
# Get all existing profiles (auto-created when users were created)
|
|
existing_profiles = await prisma.profile.find_many(
|
|
where={"userId": {"in": [user["id"] for user in self.users]}}
|
|
)
|
|
|
|
if not existing_profiles:
|
|
print("No existing profiles found. Profiles may not be auto-created.")
|
|
return []
|
|
|
|
profiles = []
|
|
# Select about 70% of users to become creators (update their profiles)
|
|
num_creators = max(1, int(len(existing_profiles) * 0.7))
|
|
selected_profiles = random.sample(
|
|
existing_profiles, min(num_creators, len(existing_profiles))
|
|
)
|
|
|
|
# Mark about 50% of creators as featured (more for testing)
|
|
num_featured = max(2, int(num_creators * 0.5))
|
|
num_featured = min(
|
|
num_featured, len(selected_profiles)
|
|
) # Don't exceed available profiles
|
|
featured_profile_ids = set(
|
|
random.sample([p.id for p in selected_profiles], num_featured)
|
|
)
|
|
|
|
for profile in selected_profiles:
|
|
try:
|
|
is_featured = profile.id in featured_profile_ids
|
|
|
|
# Update the profile with creator data
|
|
updated_profile = await prisma.profile.update(
|
|
where={"id": profile.id},
|
|
data={
|
|
"name": faker.name(),
|
|
"username": faker.user_name()
|
|
+ str(random.randint(100, 999)), # Ensure uniqueness
|
|
"description": faker.text(max_nb_chars=200),
|
|
"links": [faker.url() for _ in range(random.randint(1, 3))],
|
|
"avatarUrl": get_image(),
|
|
"isFeatured": is_featured,
|
|
},
|
|
)
|
|
|
|
if updated_profile:
|
|
profiles.append(updated_profile.model_dump())
|
|
|
|
except Exception as e:
|
|
print(f"Error updating profile {profile.id}: {e}")
|
|
continue
|
|
|
|
self.profiles = profiles
|
|
return profiles
|
|
|
|
async def create_test_store_submissions(self) -> List[Dict[str, Any]]:
|
|
"""Create test store submissions using the API function."""
|
|
print("Creating test store submissions...")
|
|
|
|
submissions = []
|
|
approved_submissions = []
|
|
|
|
# Create a special test submission for test123@gmail.com
|
|
test_user = next(
|
|
(user for user in self.users if user["email"] == "test123@gmail.com"), None
|
|
)
|
|
if test_user:
|
|
# Special test data for consistent testing
|
|
test_submission_data = {
|
|
"user_id": test_user["id"],
|
|
"agent_id": self.agent_graphs[0]["id"], # Use first available graph
|
|
"agent_version": 1,
|
|
"slug": "test-agent-submission",
|
|
"name": "Test Agent Submission",
|
|
"sub_heading": "A test agent for frontend testing",
|
|
"video_url": "https://www.youtube.com/watch?v=test123",
|
|
"image_urls": [
|
|
"https://picsum.photos/200/300",
|
|
"https://picsum.photos/200/301",
|
|
"https://picsum.photos/200/302",
|
|
],
|
|
"description": "This is a test agent submission specifically created for frontend testing purposes.",
|
|
"categories": ["test", "demo", "frontend"],
|
|
"changes_summary": "Initial test submission",
|
|
}
|
|
|
|
try:
|
|
test_submission = await create_store_submission(**test_submission_data)
|
|
submissions.append(test_submission.model_dump())
|
|
print("✅ Created special test store submission for test123@gmail.com")
|
|
|
|
# Auto-approve the test submission
|
|
if test_submission.store_listing_version_id:
|
|
approved_submission = await review_store_submission(
|
|
store_listing_version_id=test_submission.store_listing_version_id,
|
|
is_approved=True,
|
|
external_comments="Test submission approved",
|
|
internal_comments="Auto-approved test submission",
|
|
reviewer_id=test_user["id"],
|
|
)
|
|
approved_submissions.append(approved_submission.model_dump())
|
|
print("✅ Approved test store submission")
|
|
|
|
# Mark test submission as featured
|
|
await prisma.storelistingversion.update(
|
|
where={"id": test_submission.store_listing_version_id},
|
|
data={"isFeatured": True},
|
|
)
|
|
print("🌟 Marked test agent as FEATURED")
|
|
|
|
except Exception as e:
|
|
print(f"Error creating test store submission: {e}")
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
|
|
# Create regular submissions for all users
|
|
for user in self.users:
|
|
# Get available graphs for this specific user
|
|
user_graphs = [
|
|
g for g in self.agent_graphs if g.get("userId") == user["id"]
|
|
]
|
|
print(f"User {user['id']} has {len(user_graphs)} graphs")
|
|
if not user_graphs:
|
|
print(
|
|
f"No graphs found for user {user['id']}, skipping store submissions"
|
|
)
|
|
continue
|
|
|
|
# Create exactly 4 store submissions per user
|
|
for submission_index in range(4):
|
|
graph = random.choice(user_graphs)
|
|
|
|
try:
|
|
print(
|
|
f"Creating store submission for user {user['id']} with graph {graph['id']} (owner: {graph.get('userId')})"
|
|
)
|
|
|
|
# Use the API function to create store submission with correct parameters
|
|
submission = await create_store_submission(
|
|
user_id=user["id"], # Must match graph's userId
|
|
agent_id=graph["id"],
|
|
agent_version=graph.get("version", 1),
|
|
slug=faker.slug(),
|
|
name=graph.get("name", faker.sentence(nb_words=3)),
|
|
sub_heading=faker.sentence(),
|
|
video_url=get_video_url() if random.random() < 0.3 else None,
|
|
image_urls=[get_image() for _ in range(3)],
|
|
description=faker.text(),
|
|
categories=[faker.word() for _ in range(3)],
|
|
changes_summary="Initial E2E test submission",
|
|
)
|
|
submissions.append(submission.model_dump())
|
|
print(f"✅ Created store submission: {submission.name}")
|
|
|
|
# Approve the submission so it appears in the store
|
|
if submission.store_listing_version_id:
|
|
try:
|
|
# Pick a random user as the reviewer (admin)
|
|
reviewer_id = random.choice(self.users)["id"]
|
|
|
|
approved_submission = await review_store_submission(
|
|
store_listing_version_id=submission.store_listing_version_id,
|
|
is_approved=True,
|
|
external_comments="Auto-approved for E2E testing",
|
|
internal_comments="Automatically approved by E2E test data script",
|
|
reviewer_id=reviewer_id,
|
|
)
|
|
approved_submissions.append(
|
|
approved_submission.model_dump()
|
|
)
|
|
print(f"✅ Approved store submission: {submission.name}")
|
|
|
|
# Mark some agents as featured during creation (30% chance)
|
|
# More likely for creators and first submissions
|
|
is_creator = user["id"] in [
|
|
p.get("userId") for p in self.profiles
|
|
]
|
|
feature_chance = (
|
|
0.5 if is_creator else 0.2
|
|
) # 50% for creators, 20% for others
|
|
|
|
if random.random() < feature_chance:
|
|
try:
|
|
await prisma.storelistingversion.update(
|
|
where={
|
|
"id": submission.store_listing_version_id
|
|
},
|
|
data={"isFeatured": True},
|
|
)
|
|
print(
|
|
f"🌟 Marked agent as FEATURED: {submission.name}"
|
|
)
|
|
except Exception as e:
|
|
print(
|
|
f"Warning: Could not mark submission as featured: {e}"
|
|
)
|
|
|
|
except Exception as e:
|
|
print(
|
|
f"Warning: Could not approve submission {submission.name}: {e}"
|
|
)
|
|
|
|
except Exception as e:
|
|
print(
|
|
f"Error creating store submission for user {user['id']} graph {graph['id']}: {e}"
|
|
)
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
continue
|
|
|
|
print(
|
|
f"Created {len(submissions)} store submissions, approved {len(approved_submissions)}"
|
|
)
|
|
self.store_submissions = submissions
|
|
return submissions
|
|
|
|
async def add_user_credits(self):
|
|
"""Add credits to users."""
|
|
print("Adding credits to users...")
|
|
|
|
credit_model = get_user_credit_model()
|
|
|
|
for user in self.users:
|
|
try:
|
|
# Skip credits for disabled credit model to avoid errors
|
|
if (
|
|
hasattr(credit_model, "__class__")
|
|
and "Disabled" in credit_model.__class__.__name__
|
|
):
|
|
print(f"Skipping credits for user {user['id']} - credits disabled")
|
|
continue
|
|
|
|
# Add random credits to each user
|
|
credit_amount = random.randint(100, 1000)
|
|
|
|
await credit_model.top_up_credits(
|
|
user_id=user["id"], amount=credit_amount
|
|
)
|
|
print(f"Added {credit_amount} credits to user {user['id']}")
|
|
except Exception:
|
|
print(
|
|
f"Skipping credits for user {user['id']}: credits may be disabled"
|
|
)
|
|
continue
|
|
|
|
async def create_all_test_data(self):
|
|
"""Create all test data."""
|
|
print("Starting E2E test data creation...")
|
|
|
|
# Create users first
|
|
await self.create_test_users()
|
|
|
|
# Get available blocks
|
|
await self.get_available_blocks()
|
|
|
|
# Create graphs
|
|
await self.create_test_graphs()
|
|
|
|
# Create library agents
|
|
await self.create_test_library_agents()
|
|
|
|
# Create presets
|
|
await self.create_test_presets()
|
|
|
|
# Create API keys
|
|
await self.create_test_api_keys()
|
|
|
|
# Update user profiles to create featured creators
|
|
await self.update_test_profiles()
|
|
|
|
# Create store submissions
|
|
await self.create_test_store_submissions()
|
|
|
|
# Add user credits
|
|
await self.add_user_credits()
|
|
|
|
# Refresh materialized views
|
|
print("Refreshing materialized views...")
|
|
try:
|
|
await prisma.execute_raw("SELECT refresh_store_materialized_views();")
|
|
except Exception as e:
|
|
print(f"Error refreshing materialized views: {e}")
|
|
|
|
print("E2E test data creation completed successfully!")
|
|
|
|
# Print summary
|
|
print("\n🎉 E2E Test Data Creation Summary:")
|
|
print(f"✅ Users created: {len(self.users)}")
|
|
print(f"✅ Agent blocks available: {len(self.agent_blocks)}")
|
|
print(f"✅ Agent graphs created: {len(self.agent_graphs)}")
|
|
print(f"✅ Library agents created: {len(self.library_agents)}")
|
|
print(f"✅ Creator profiles updated: {len(self.profiles)} (some featured)")
|
|
print(
|
|
f"✅ Store submissions created: {len(self.store_submissions)} (some marked as featured during creation)"
|
|
)
|
|
print(f"✅ API keys created: {len(self.api_keys)}")
|
|
print(f"✅ Presets created: {len(self.presets)}")
|
|
print("\n🚀 Your E2E test database is ready to use!")
|
|
|
|
|
|
async def main():
|
|
"""Main function to run the test data creation."""
|
|
# Connect to database
|
|
await prisma.connect()
|
|
|
|
try:
|
|
creator = TestDataCreator()
|
|
await creator.create_all_test_data()
|
|
finally:
|
|
# Disconnect from database
|
|
await prisma.disconnect()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|