fix(backend): migrate all query_raw calls to query_raw_with_schema for proper schema handling (#11462)

## Summary

Complete migration of all non-test `query_raw` calls to use
`query_raw_with_schema` for proper PostgreSQL schema context handling.
This resolves the marketplace API failures where queries were looking
for unqualified table names.

## Root Cause

Prisma's `query_raw()` doesn't respect the `schema` parameter in
`DATABASE_URL` (`?schema=platform`) for raw SQL queries, causing queries
to fail when looking for unqualified table names in multi-schema
environments.

## Changes Made

### Files Updated
-  **backend/server/v2/store/db.py**: Already updated in previous
commit
-  **backend/server/v2/builder/db.py**: Updated `get_suggested_blocks`
query at line 343
-  **backend/check_store_data.py**: Updated all 4 `query_raw` calls to
use schema-aware queries
-  **backend/check_db.py**: Updated all `query_raw` calls (import
already existed)

### Technical Implementation
- Add import: `from backend.data.db import query_raw_with_schema`
- Replace `prisma.get_client().query_raw()` with
`query_raw_with_schema()`
- Add `{schema_prefix}` placeholder to table references in SQL queries
- Fix f-string template conflicts by using double braces
`{{schema_prefix}}`

### Query Examples

**Before:**
```sql
FROM "StoreAgent"
FROM "AgentNodeExecution" execution
```

**After:**
```sql  
FROM {schema_prefix}"StoreAgent"
FROM {schema_prefix}"AgentNodeExecution" execution
```

## Impact

-  All raw SQL queries now properly respect platform schema context
-  Fixes "relation does not exist" errors in multi-schema environments
-  Maintains backward compatibility with public schema deployments
-  Code formatting passes with `poetry run format`

## Testing

- All `query_raw` usages in non-test code successfully migrated
- `query_raw_with_schema` automatically handles schema prefix injection
- Existing query logic unchanged, only schema awareness added

## Before/After

**Before:** GET /api/store/agents → "relation 'StoreAgent' does not
exist"
**After:** GET /api/store/agents →  Returns store agents correctly

Resolves the marketplace API failures and ensures consistent schema
handling across all raw SQL operations.

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Zamil Majdy
2025-11-27 11:04:20 +07:00
committed by GitHub
parent 02f8a69c6a
commit ff5dd7a5b4
4 changed files with 30 additions and 29 deletions

View File

@@ -5,6 +5,8 @@ from datetime import datetime
from faker import Faker
from prisma import Prisma
from backend.data.db import query_raw_with_schema
faker = Faker()
@@ -15,9 +17,9 @@ async def check_cron_job(db):
try:
# Check if pg_cron extension exists
extension_check = await db.query_raw("CREATE EXTENSION pg_cron;")
extension_check = await query_raw_with_schema("CREATE EXTENSION pg_cron;")
print(extension_check)
extension_check = await db.query_raw(
extension_check = await query_raw_with_schema(
"SELECT COUNT(*) as count FROM pg_extension WHERE extname = 'pg_cron'"
)
if extension_check[0]["count"] == 0:
@@ -25,7 +27,7 @@ async def check_cron_job(db):
return False
# Check if the refresh job exists
job_check = await db.query_raw(
job_check = await query_raw_with_schema(
"""
SELECT jobname, schedule, command
FROM cron.job
@@ -55,33 +57,33 @@ async def get_materialized_view_counts(db):
print("-" * 40)
# Get counts from mv_agent_run_counts
agent_runs = await db.query_raw(
agent_runs = await query_raw_with_schema(
"""
SELECT COUNT(*) as total_agents,
SUM(run_count) as total_runs,
MAX(run_count) as max_runs,
MIN(run_count) as min_runs
FROM mv_agent_run_counts
FROM {schema_prefix}mv_agent_run_counts
"""
)
# Get counts from mv_review_stats
review_stats = await db.query_raw(
review_stats = await query_raw_with_schema(
"""
SELECT COUNT(*) as total_listings,
SUM(review_count) as total_reviews,
AVG(avg_rating) as overall_avg_rating
FROM mv_review_stats
FROM {schema_prefix}mv_review_stats
"""
)
# Get sample data from StoreAgent view
store_agents = await db.query_raw(
store_agents = await query_raw_with_schema(
"""
SELECT COUNT(*) as total_store_agents,
AVG(runs) as avg_runs,
AVG(rating) as avg_rating
FROM "StoreAgent"
FROM {schema_prefix}"StoreAgent"
"""
)

View File

@@ -5,6 +5,8 @@ import asyncio
from prisma import Prisma
from backend.data.db import query_raw_with_schema
async def check_store_data(db):
"""Check what store data exists in the database."""
@@ -89,11 +91,11 @@ async def check_store_data(db):
sa.creator_username,
sa.categories,
sa.updated_at
FROM "StoreAgent" sa
FROM {schema_prefix}"StoreAgent" sa
LIMIT 10;
"""
store_agents = await db.query_raw(query)
store_agents = await query_raw_with_schema(query)
print(f"Total store agents in view: {len(store_agents)}")
if store_agents:
@@ -111,22 +113,22 @@ async def check_store_data(db):
# Check for any APPROVED store listing versions
query = """
SELECT COUNT(*) as count
FROM "StoreListingVersion"
FROM {schema_prefix}"StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
"""
result = await db.query_raw(query)
result = await query_raw_with_schema(query)
approved_count = result[0]["count"] if result else 0
print(f"Approved store listing versions: {approved_count}")
# Check for store listings with hasApprovedVersion = true
query = """
SELECT COUNT(*) as count
FROM "StoreListing"
FROM {schema_prefix}"StoreListing"
WHERE "hasApprovedVersion" = true AND "isDeleted" = false
"""
result = await db.query_raw(query)
result = await query_raw_with_schema(query)
has_approved_count = result[0]["count"] if result else 0
print(f"Store listings with approved versions: {has_approved_count}")
@@ -134,10 +136,10 @@ async def check_store_data(db):
query = """
SELECT COUNT(DISTINCT "agentGraphId") as unique_agents,
COUNT(*) as total_executions
FROM "AgentGraphExecution"
FROM {schema_prefix}"AgentGraphExecution"
"""
result = await db.query_raw(query)
result = await query_raw_with_schema(query)
if result:
print("\nAgent Graph Executions:")
print(f" Unique agents with executions: {result[0]['unique_agents']}")

View File

@@ -7,6 +7,7 @@ import backend.data.block
from backend.blocks import load_all_blocks
from backend.blocks.llm import LlmModel
from backend.data.block import AnyBlockSchema, BlockCategory, BlockInfo, BlockSchema
from backend.data.db import query_raw_with_schema
from backend.integrations.providers import ProviderName
from backend.server.v2.builder.model import (
BlockCategoryResponse,
@@ -340,13 +341,13 @@ async def get_suggested_blocks(count: int = 5) -> list[BlockInfo]:
# Calculate the cutoff timestamp
timestamp_threshold = datetime.now(timezone.utc) - timedelta(days=30)
results = await prisma.get_client().query_raw(
results = await query_raw_with_schema(
"""
SELECT
agent_node."agentBlockId" AS block_id,
COUNT(execution.id) AS execution_count
FROM "AgentNodeExecution" execution
JOIN "AgentNode" agent_node ON execution."agentNodeId" = agent_node.id
FROM {schema_prefix}"AgentNodeExecution" execution
JOIN {schema_prefix}"AgentNode" agent_node ON execution."agentNodeId" = agent_node.id
WHERE execution."endedTime" >= $1::timestamp
GROUP BY agent_node."agentBlockId"
ORDER BY execution_count DESC;

View File

@@ -12,7 +12,7 @@ import prisma.types
import backend.server.v2.store.exceptions
import backend.server.v2.store.model
from backend.data.db import transaction
from backend.data.db import query_raw_with_schema, transaction
from backend.data.graph import (
GraphMeta,
GraphModel,
@@ -120,7 +120,7 @@ async def get_store_agents(
is_available,
updated_at,
ts_rank_cd(search, query) AS rank
FROM "StoreAgent",
FROM {{schema_prefix}}"StoreAgent",
plainto_tsquery('english', $1) AS query
WHERE {sql_where_clause}
AND search @@ query
@@ -131,22 +131,18 @@ async def get_store_agents(
# Count query for pagination - only uses search term parameter
count_query = f"""
SELECT COUNT(*) as count
FROM "StoreAgent",
FROM {{schema_prefix}}"StoreAgent",
plainto_tsquery('english', $1) AS query
WHERE {sql_where_clause}
AND search @@ query
"""
# Execute both queries with parameters
agents = await prisma.client.get_client().query_raw(
typing.cast(typing.LiteralString, sql_query), *params
)
agents = await query_raw_with_schema(sql_query, *params)
# For count, use params without pagination (last 2 params)
count_params = params[:-2]
count_result = await prisma.client.get_client().query_raw(
typing.cast(typing.LiteralString, count_query), *count_params
)
count_result = await query_raw_with_schema(count_query, *count_params)
total = count_result[0]["count"] if count_result else 0
total_pages = (total + page_size - 1) // page_size