feat(backend): add personal org bootstrap migration (PR2)

Idempotent data migration that creates a personal Organization for every
existing user, migrates billing state, and assigns all tenant-bound
resources to the user's default workspace.

Migration steps (all idempotent):
- Create Organization per user (slug from Profile.username → User.name → email)
- Slug collision resolution with deterministic numeric suffixes
- Create OrgMember (owner), OrgWorkspace (default), OrgWorkspaceMember
- Create OrganizationProfile from user's Profile data
- Create OrganizationSeatAssignment (FREE seat)
- Copy UserBalance → OrgBalance
- Copy CreditTransaction → OrgCreditTransaction
- Set organizationId + orgWorkspaceId on: AgentGraph, AgentGraphExecution,
  ChatSession, AgentPreset, LibraryAgent, LibraryFolder, IntegrationWebhook,
  APIKey, BuilderSearchHistory, PendingHumanReview, StoreListingVersion
- Set owningOrgId on StoreListing
- Create OrganizationAlias for published agents with divergent slugs

Wired into rest_api.py lifespan to run on server startup.

23 tests covering: slug sanitization, collision resolution, org creation
with/without profiles, balance migration, resource assignment, orchestration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-04-01 08:27:59 +02:00
parent 51ab80425d
commit 277362b5cc
3 changed files with 809 additions and 0 deletions

View File

@@ -38,6 +38,7 @@ import backend.api.features.workspace.routes as workspace_routes
import backend.data.block
import backend.data.db
import backend.data.graph
import backend.data.org_migration
import backend.data.user
import backend.integrations.webhooks.utils
import backend.util.service
@@ -124,6 +125,7 @@ async def lifespan_context(app: fastapi.FastAPI):
await backend.data.graph.fix_llm_provider_credentials()
await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
await backend.integrations.webhooks.utils.migrate_legacy_triggered_graphs()
await backend.data.org_migration.run_migration()
with launch_darkly_context():
yield

View File

@@ -0,0 +1,473 @@
"""
Data migration: Bootstrap personal organizations for existing users.
Creates one Organization per user, with owner membership, default workspace,
org profile, seat assignment, and org balance. Assigns all tenant-bound
resources to the user's default workspace. Idempotent — safe to run repeatedly.
Run automatically during server startup via rest_api.py lifespan.
"""
import logging
import re
import time
from typing import LiteralString
from backend.data.db import prisma
logger = logging.getLogger(__name__)
def _sanitize_slug(raw: str) -> str:
"""Convert a string to a URL-safe slug: lowercase, alphanumeric + hyphens."""
slug = re.sub(r"[^a-z0-9-]", "-", raw.lower().strip())
slug = re.sub(r"-+", "-", slug).strip("-")
return slug or "user"
async def _resolve_unique_slug(desired: str) -> str:
"""Return *desired* if no Organization uses it yet, else append a numeric suffix."""
existing = await prisma.organization.find_unique(where={"slug": desired})
if existing is None:
# Also check aliases
alias = await prisma.organizationalias.find_unique(where={"aliasSlug": desired})
if alias is None:
return desired
# Collision — find the next available numeric suffix
for i in range(1, 10_000):
candidate = f"{desired}-{i}"
org = await prisma.organization.find_unique(where={"slug": candidate})
alias = await prisma.organizationalias.find_unique(
where={"aliasSlug": candidate}
)
if org is None and alias is None:
return candidate
raise RuntimeError(
f"Could not resolve a unique slug for '{desired}' after 10000 attempts"
)
async def create_orgs_for_existing_users() -> int:
"""Create a personal Organization for every user that lacks one.
Returns the number of orgs created.
"""
# Find users who are NOT yet an owner of any personal org
users_without_org = await prisma.query_raw(
"""
SELECT u."id", u."email", u."name", u."stripeCustomerId", u."topUpConfig",
p."username" AS profile_username, p."name" AS profile_name,
p."description" AS profile_description,
p."avatarUrl" AS profile_avatar_url,
p."links" AS profile_links
FROM "User" u
LEFT JOIN "Profile" p ON p."userId" = u."id"
WHERE NOT EXISTS (
SELECT 1 FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId"
WHERE om."userId" = u."id" AND om."isOwner" = true AND o."isPersonal" = true
)
""",
)
if not users_without_org:
logger.info("Org migration: all users already have personal orgs")
return 0
logger.info(
f"Org migration: creating personal orgs for {len(users_without_org)} users"
)
created = 0
for row in users_without_org:
user_id: str = row["id"]
email: str = row["email"]
profile_username: str | None = row.get("profile_username")
profile_name: str | None = row.get("profile_name")
user_name: str | None = row.get("name")
# Determine slug: Profile.username → sanitized User.name → email local part → user-{id[:8]}
if profile_username:
desired_slug = _sanitize_slug(profile_username)
elif user_name:
desired_slug = _sanitize_slug(user_name)
else:
local_part = email.split("@")[0] if email else ""
desired_slug = (
_sanitize_slug(local_part) if local_part else f"user-{user_id[:8]}"
)
slug = await _resolve_unique_slug(desired_slug)
display_name = profile_name or user_name or email.split("@")[0]
# Create Organization
org = await prisma.organization.create(
data={
"name": display_name,
"slug": slug,
"isPersonal": True,
"stripeCustomerId": row.get("stripeCustomerId"),
"topUpConfig": row.get("topUpConfig"),
"bootstrapUserId": user_id,
"settings": "{}",
}
)
# Create OrgMember (owner)
await prisma.orgmember.create(
data={
"orgId": org.id,
"userId": user_id,
"isOwner": True,
"isAdmin": True,
"status": "ACTIVE",
}
)
# Create default OrgWorkspace
workspace = await prisma.orgworkspace.create(
data={
"name": "Default",
"orgId": org.id,
"isDefault": True,
"joinPolicy": "OPEN",
"createdByUserId": user_id,
}
)
# Create OrgWorkspaceMember
await prisma.orgworkspacemember.create(
data={
"workspaceId": workspace.id,
"userId": user_id,
"isAdmin": True,
"status": "ACTIVE",
}
)
# Create OrganizationProfile (from user's Profile if exists)
await prisma.organizationprofile.create(
data={
"organizationId": org.id,
"username": slug,
"displayName": display_name,
"avatarUrl": row.get("profile_avatar_url"),
"bio": row.get("profile_description"),
"socialLinks": row.get("profile_links"),
}
)
# Create seat assignment (FREE seat for personal org)
await prisma.organizationseatassignment.create(
data={
"organizationId": org.id,
"userId": user_id,
"seatType": "FREE",
"status": "ACTIVE",
"assignedByUserId": user_id,
}
)
# Log if slug diverged from desired
if slug != desired_slug:
logger.info(
f"Org migration: slug collision for user {user_id}"
f"desired '{desired_slug}', assigned '{slug}'"
)
# Create alias for the original desired slug if it was taken by another org
# (only if the desired slug belongs to a different org)
existing_org = await prisma.organization.find_unique(
where={"slug": desired_slug}
)
if existing_org and existing_org.id != org.id:
await prisma.organizationalias.create(
data={
"organizationId": org.id,
"aliasSlug": slug,
"aliasType": "MIGRATION",
"createdByUserId": user_id,
"isRemovable": False,
}
)
created += 1
logger.info(f"Org migration: created {created} personal orgs")
return created
async def migrate_org_balances() -> int:
"""Copy UserBalance rows into OrgBalance for personal orgs that lack one.
Returns the number of balances migrated.
"""
result = await prisma.execute_raw(
"""
INSERT INTO "OrgBalance" ("orgId", "balance", "updatedAt")
SELECT o."id", ub."balance", ub."updatedAt"
FROM "UserBalance" ub
JOIN "OrgMember" om ON om."userId" = ub."userId" AND om."isOwner" = true
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
WHERE NOT EXISTS (
SELECT 1 FROM "OrgBalance" ob WHERE ob."orgId" = o."id"
)
"""
)
logger.info(f"Org migration: migrated {result} org balances")
return result
async def migrate_credit_transactions() -> int:
"""Copy CreditTransaction rows into OrgCreditTransaction for personal orgs.
Only copies transactions that haven't been migrated yet (by checking for
matching transactionKey + orgId).
Returns the number of transactions migrated.
"""
result = await prisma.execute_raw(
"""
INSERT INTO "OrgCreditTransaction"
("transactionKey", "createdAt", "orgId", "initiatedByUserId",
"amount", "type", "runningBalance", "isActive", "metadata")
SELECT
ct."transactionKey", ct."createdAt", o."id", ct."userId",
ct."amount", ct."type", ct."runningBalance", ct."isActive", ct."metadata"
FROM "CreditTransaction" ct
JOIN "OrgMember" om ON om."userId" = ct."userId" AND om."isOwner" = true
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
WHERE NOT EXISTS (
SELECT 1 FROM "OrgCreditTransaction" oct
WHERE oct."transactionKey" = ct."transactionKey" AND oct."orgId" = o."id"
)
"""
)
logger.info(f"Org migration: migrated {result} credit transactions")
return result
async def _assign_workspace_tenancy(table_sql: "LiteralString") -> int:
"""Assign organizationId + orgWorkspaceId on a single table's unassigned rows."""
return await prisma.execute_raw(table_sql)
async def assign_resources_to_workspaces() -> dict[str, int]:
"""Set organizationId and orgWorkspaceId on all tenant-bound rows that lack them.
Uses the user's personal org and its default workspace.
Returns a dict of table_name -> rows_updated.
"""
results: dict[str, int] = {}
# --- Tables needing both organizationId + orgWorkspaceId ---
results["AgentGraph"] = await _assign_workspace_tenancy(
"""
UPDATE "AgentGraph" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["AgentGraphExecution"] = await _assign_workspace_tenancy(
"""
UPDATE "AgentGraphExecution" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["ChatSession"] = await _assign_workspace_tenancy(
"""
UPDATE "ChatSession" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["AgentPreset"] = await _assign_workspace_tenancy(
"""
UPDATE "AgentPreset" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["LibraryAgent"] = await _assign_workspace_tenancy(
"""
UPDATE "LibraryAgent" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["LibraryFolder"] = await _assign_workspace_tenancy(
"""
UPDATE "LibraryFolder" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["IntegrationWebhook"] = await _assign_workspace_tenancy(
"""
UPDATE "IntegrationWebhook" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["APIKey"] = await _assign_workspace_tenancy(
"""
UPDATE "APIKey" t
SET "organizationId" = o."id", "orgWorkspaceId" = w."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
JOIN "OrgWorkspace" w ON w."orgId" = o."id" AND w."isDefault" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
# --- Tables needing only organizationId ---
results["BuilderSearchHistory"] = await prisma.execute_raw(
"""
UPDATE "BuilderSearchHistory" t
SET "organizationId" = o."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["PendingHumanReview"] = await prisma.execute_raw(
"""
UPDATE "PendingHumanReview" t
SET "organizationId" = o."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
WHERE t."userId" = om."userId" AND om."isOwner" = true AND t."organizationId" IS NULL
"""
)
results["StoreListingVersion"] = await prisma.execute_raw(
"""
UPDATE "StoreListingVersion" slv
SET "organizationId" = o."id"
FROM "StoreListingVersion" v
JOIN "StoreListing" sl ON sl."id" = v."storeListingId"
JOIN "OrgMember" om ON om."userId" = sl."owningUserId" AND om."isOwner" = true
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
WHERE slv."id" = v."id" AND slv."organizationId" IS NULL
"""
)
for table_name, count in results.items():
if count > 0:
logger.info(f"Org migration: assigned {count} {table_name} rows")
return results
async def migrate_store_listings() -> int:
"""Set owningOrgId on StoreListings that lack it.
Returns the number of listings migrated.
"""
result = await prisma.execute_raw(
"""
UPDATE "StoreListing" sl
SET "owningOrgId" = o."id"
FROM "OrgMember" om
JOIN "Organization" o ON o."id" = om."orgId" AND o."isPersonal" = true
WHERE sl."owningUserId" = om."userId"
AND om."isOwner" = true
AND sl."owningOrgId" IS NULL
"""
)
if result > 0:
logger.info(f"Org migration: assigned {result} store listings to orgs")
return result
async def create_store_listing_aliases() -> int:
"""Create OrganizationAlias entries for published store listings.
This ensures that marketplace URLs using the org slug continue to work.
Only creates aliases for listings whose org slug matches the user's Profile
username (which it should for personal orgs created from Profile.username).
Returns the number of aliases created.
"""
result = await prisma.execute_raw(
"""
INSERT INTO "OrganizationAlias"
("id", "organizationId", "aliasSlug", "aliasType", "createdByUserId", "isRemovable")
SELECT
gen_random_uuid(),
o."id",
p."username",
'MIGRATION',
o."bootstrapUserId",
false
FROM "StoreListing" sl
JOIN "Organization" o ON o."id" = sl."owningOrgId"
JOIN "Profile" p ON p."userId" = sl."owningUserId"
WHERE sl."owningOrgId" IS NOT NULL
AND sl."hasApprovedVersion" = true
AND o."slug" != p."username"
AND NOT EXISTS (
SELECT 1 FROM "OrganizationAlias" oa
WHERE oa."aliasSlug" = p."username"
)
"""
)
if result > 0:
logger.info(f"Org migration: created {result} store listing aliases")
return result
async def run_migration() -> None:
"""Orchestrate the full org bootstrap migration. Idempotent."""
start = time.monotonic()
logger.info("Org migration: starting personal org bootstrap")
orgs_created = await create_orgs_for_existing_users()
await migrate_org_balances()
await migrate_credit_transactions()
resource_counts = await assign_resources_to_workspaces()
await migrate_store_listings()
await create_store_listing_aliases()
total_resources = sum(resource_counts.values())
elapsed = time.monotonic() - start
logger.info(
f"Org migration: complete in {elapsed:.2f}s — "
f"{orgs_created} orgs created, {total_resources} resources assigned"
)

View File

@@ -0,0 +1,334 @@
"""Tests for the personal org bootstrap migration.
Tests the migration logic including slug resolution, idempotency,
and correct data mapping. Uses mocks for Prisma DB calls since the
test infrastructure does not provide a live database connection.
"""
from unittest.mock import AsyncMock, MagicMock
import pytest
from backend.data.org_migration import (
_resolve_unique_slug,
_sanitize_slug,
assign_resources_to_workspaces,
create_orgs_for_existing_users,
migrate_credit_transactions,
migrate_org_balances,
migrate_store_listings,
run_migration,
)
@pytest.fixture(autouse=True)
def mock_prisma(mocker):
"""Replace the prisma client in org_migration with a full mock."""
mock = MagicMock()
# Default: all find_unique calls return None (no collisions)
mock.organization.find_unique = AsyncMock(return_value=None)
mock.organizationalias.find_unique = AsyncMock(return_value=None)
mock.organization.create = AsyncMock(return_value=MagicMock(id="org-1"))
mock.orgmember.create = AsyncMock()
mock.orgworkspace.create = AsyncMock(return_value=MagicMock(id="ws-1"))
mock.orgworkspacemember.create = AsyncMock()
mock.organizationprofile.create = AsyncMock()
mock.organizationseatassignment.create = AsyncMock()
mock.query_raw = AsyncMock(return_value=[])
mock.execute_raw = AsyncMock(return_value=0)
mocker.patch("backend.data.org_migration.prisma", mock)
return mock
# ---------------------------------------------------------------------------
# _sanitize_slug
# ---------------------------------------------------------------------------
class TestSanitizeSlug:
def test_lowercase_and_hyphens(self):
assert _sanitize_slug("Hello World") == "hello-world"
def test_strips_special_chars(self):
assert _sanitize_slug("user@name!#$%") == "user-name"
def test_collapses_multiple_hyphens(self):
assert _sanitize_slug("a---b") == "a-b"
def test_strips_leading_trailing_hyphens(self):
assert _sanitize_slug("-hello-") == "hello"
def test_empty_string_returns_user(self):
assert _sanitize_slug("") == "user"
def test_only_special_chars_returns_user(self):
assert _sanitize_slug("@#$%") == "user"
def test_numeric_slug(self):
assert _sanitize_slug("12345") == "12345"
def test_preserves_hyphens(self):
assert _sanitize_slug("my-cool-agent") == "my-cool-agent"
def test_unicode_stripped(self):
assert _sanitize_slug("caf\u00e9-latt\u00e9") == "caf-latt"
def test_whitespace_only(self):
assert _sanitize_slug(" ") == "user"
# ---------------------------------------------------------------------------
# _resolve_unique_slug
# ---------------------------------------------------------------------------
class TestResolveUniqueSlug:
@pytest.mark.asyncio
async def test_slug_available_returns_as_is(self, mock_prisma):
result = await _resolve_unique_slug("my-org")
assert result == "my-org"
@pytest.mark.asyncio
async def test_slug_taken_by_org_gets_suffix(self, mock_prisma):
async def org_find(where):
slug = where.get("slug", "")
if slug == "taken":
return MagicMock(id="existing-org")
return None
mock_prisma.organization.find_unique = AsyncMock(side_effect=org_find)
result = await _resolve_unique_slug("taken")
assert result == "taken-1"
@pytest.mark.asyncio
async def test_slug_taken_by_alias_gets_suffix(self, mock_prisma):
async def alias_find(where):
slug = where.get("aliasSlug", "")
if slug == "aliased":
return MagicMock()
return None
mock_prisma.organizationalias.find_unique = AsyncMock(side_effect=alias_find)
result = await _resolve_unique_slug("aliased")
assert result == "aliased-1"
@pytest.mark.asyncio
async def test_multiple_collisions_increments(self, mock_prisma):
async def org_find(where):
slug = where.get("slug", "")
if slug in ("x", "x-1", "x-2"):
return MagicMock(id="existing")
return None
mock_prisma.organization.find_unique = AsyncMock(side_effect=org_find)
result = await _resolve_unique_slug("x")
assert result == "x-3"
# ---------------------------------------------------------------------------
# create_orgs_for_existing_users
# ---------------------------------------------------------------------------
class TestCreateOrgsForExistingUsers:
@pytest.mark.asyncio
async def test_no_users_without_org_is_noop(self, mock_prisma):
result = await create_orgs_for_existing_users()
assert result == 0
@pytest.mark.asyncio
async def test_user_with_profile_gets_profile_username_slug(self, mock_prisma):
mock_prisma.query_raw = AsyncMock(
return_value=[
{
"id": "user-1",
"email": "alice@example.com",
"name": "Alice",
"stripeCustomerId": "cus_123",
"topUpConfig": None,
"profile_username": "alice",
"profile_name": "Alice Smith",
"profile_description": "A developer",
"profile_avatar_url": "https://example.com/avatar.png",
"profile_links": ["https://github.com/alice"],
},
]
)
result = await create_orgs_for_existing_users()
assert result == 1
# Verify org was created with profile-derived slug
mock_prisma.organization.create.assert_called_once()
create_data = mock_prisma.organization.create.call_args[1]["data"]
assert create_data["slug"] == "alice"
assert create_data["name"] == "Alice Smith"
assert create_data["isPersonal"] is True
assert create_data["stripeCustomerId"] == "cus_123"
assert create_data["bootstrapUserId"] == "user-1"
# Verify workspace created
mock_prisma.orgworkspace.create.assert_called_once()
ws_data = mock_prisma.orgworkspace.create.call_args[1]["data"]
assert ws_data["name"] == "Default"
assert ws_data["isDefault"] is True
assert ws_data["joinPolicy"] == "OPEN"
@pytest.mark.asyncio
async def test_user_without_profile_uses_email_slug(self, mock_prisma):
mock_prisma.query_raw = AsyncMock(
return_value=[
{
"id": "user-2",
"email": "bob@company.org",
"name": None,
"stripeCustomerId": None,
"topUpConfig": None,
"profile_username": None,
"profile_name": None,
"profile_description": None,
"profile_avatar_url": None,
"profile_links": None,
},
]
)
result = await create_orgs_for_existing_users()
assert result == 1
create_data = mock_prisma.organization.create.call_args[1]["data"]
assert create_data["slug"] == "bob"
assert create_data["name"] == "bob"
# ---------------------------------------------------------------------------
# migrate_org_balances
# ---------------------------------------------------------------------------
class TestMigrateOrgBalances:
@pytest.mark.asyncio
async def test_returns_count(self, mock_prisma):
mock_prisma.execute_raw = AsyncMock(return_value=5)
result = await migrate_org_balances()
assert result == 5
# ---------------------------------------------------------------------------
# migrate_credit_transactions
# ---------------------------------------------------------------------------
class TestMigrateCreditTransactions:
@pytest.mark.asyncio
async def test_returns_count(self, mock_prisma):
mock_prisma.execute_raw = AsyncMock(return_value=42)
result = await migrate_credit_transactions()
assert result == 42
# ---------------------------------------------------------------------------
# assign_resources_to_workspaces
# ---------------------------------------------------------------------------
class TestAssignResources:
@pytest.mark.asyncio
async def test_updates_all_tables(self, mock_prisma, mocker):
mocker.patch(
"backend.data.org_migration._assign_workspace_tenancy",
new_callable=AsyncMock,
return_value=10,
)
mock_prisma.execute_raw = AsyncMock(return_value=10)
result = await assign_resources_to_workspaces()
# 8 tables with workspace + 3 tables org-only = 11 entries
assert len(result) == 11
assert result["AgentGraph"] == 10
assert result["ChatSession"] == 10
assert result["BuilderSearchHistory"] == 10
assert result["PendingHumanReview"] == 10
assert result["StoreListingVersion"] == 10
@pytest.mark.asyncio
async def test_zero_updates_still_returns(self, mock_prisma, mocker):
mocker.patch(
"backend.data.org_migration._assign_workspace_tenancy",
new_callable=AsyncMock,
return_value=0,
)
mock_prisma.execute_raw = AsyncMock(return_value=0)
result = await assign_resources_to_workspaces()
assert all(v == 0 for v in result.values())
# ---------------------------------------------------------------------------
# migrate_store_listings
# ---------------------------------------------------------------------------
class TestMigrateStoreListings:
@pytest.mark.asyncio
async def test_returns_count(self, mock_prisma):
mock_prisma.execute_raw = AsyncMock(return_value=3)
result = await migrate_store_listings()
assert result == 3
# ---------------------------------------------------------------------------
# run_migration (orchestrator)
# ---------------------------------------------------------------------------
class TestRunMigration:
@pytest.mark.asyncio
async def test_calls_all_steps_in_order(self, mocker):
calls: list[str] = []
mocker.patch(
"backend.data.org_migration.create_orgs_for_existing_users",
new_callable=lambda: lambda: _track(calls, "create_orgs", 1),
)
mocker.patch(
"backend.data.org_migration.migrate_org_balances",
new_callable=lambda: lambda: _track(calls, "balances", 0),
)
mocker.patch(
"backend.data.org_migration.migrate_credit_transactions",
new_callable=lambda: lambda: _track(calls, "credits", 0),
)
mocker.patch(
"backend.data.org_migration.assign_resources_to_workspaces",
new_callable=lambda: lambda: _track(
calls, "assign_resources", {"AgentGraph": 5}
),
)
mocker.patch(
"backend.data.org_migration.migrate_store_listings",
new_callable=lambda: lambda: _track(calls, "store_listings", 0),
)
mocker.patch(
"backend.data.org_migration.create_store_listing_aliases",
new_callable=lambda: lambda: _track(calls, "aliases", 0),
)
await run_migration()
assert calls == [
"create_orgs",
"balances",
"credits",
"assign_resources",
"store_listings",
"aliases",
]
async def _track(calls: list[str], name: str, result):
calls.append(name)
return result