Merge branch 'dev' into swiftyos/sse-long-running-tasks

This commit is contained in:
Swifty
2026-01-30 12:01:45 +01:00
33 changed files with 742 additions and 417 deletions

View File

@@ -0,0 +1,170 @@
# CLAUDE.md - Backend
This file provides guidance to Claude Code when working with the backend.
## Essential Commands
To run something with Python package dependencies you MUST use `poetry run ...`.
```bash
# Install dependencies
poetry install
# Run database migrations
poetry run prisma migrate dev
# Start all services (database, redis, rabbitmq, clamav)
docker compose up -d
# Run the backend as a whole
poetry run app
# Run tests
poetry run test
# Run specific test
poetry run pytest path/to/test_file.py::test_function_name
# Run block tests (tests that validate all blocks work correctly)
poetry run pytest backend/blocks/test/test_block.py -xvs
# Run tests for a specific block (e.g., GetCurrentTimeBlock)
poetry run pytest 'backend/blocks/test/test_block.py::test_available_blocks[GetCurrentTimeBlock]' -xvs
# Lint and format
# prefer format if you want to just "fix" it and only get the errors that can't be autofixed
poetry run format # Black + isort
poetry run lint # ruff
```
More details can be found in @TESTING.md
### Creating/Updating Snapshots
When you first write a test or when the expected output changes:
```bash
poetry run pytest path/to/test.py --snapshot-update
```
⚠️ **Important**: Always review snapshot changes before committing! Use `git diff` to verify the changes are expected.
## Architecture
- **API Layer**: FastAPI with REST and WebSocket endpoints
- **Database**: PostgreSQL with Prisma ORM, includes pgvector for embeddings
- **Queue System**: RabbitMQ for async task processing
- **Execution Engine**: Separate executor service processes agent workflows
- **Authentication**: JWT-based with Supabase integration
- **Security**: Cache protection middleware prevents sensitive data caching in browsers/proxies
## Testing Approach
- Uses pytest with snapshot testing for API responses
- Test files are colocated with source files (`*_test.py`)
## Database Schema
Key models (defined in `schema.prisma`):
- `User`: Authentication and profile data
- `AgentGraph`: Workflow definitions with version control
- `AgentGraphExecution`: Execution history and results
- `AgentNode`: Individual nodes in a workflow
- `StoreListing`: Marketplace listings for sharing agents
## Environment Configuration
- **Backend**: `.env.default` (defaults) → `.env` (user overrides)
## Common Development Tasks
### Adding a new block
Follow the comprehensive [Block SDK Guide](@../../docs/content/platform/block-sdk-guide.md) which covers:
- Provider configuration with `ProviderBuilder`
- Block schema definition
- Authentication (API keys, OAuth, webhooks)
- Testing and validation
- File organization
Quick steps:
1. Create new file in `backend/blocks/`
2. Configure provider using `ProviderBuilder` in `_config.py`
3. Inherit from `Block` base class
4. Define input/output schemas using `BlockSchema`
5. Implement async `run` method
6. Generate unique block ID using `uuid.uuid4()`
7. Test with `poetry run pytest backend/blocks/test/test_block.py`
Note: when making many new blocks analyze the interfaces for each of these blocks and picture if they would go well together in a graph-based editor or would they struggle to connect productively?
ex: do the inputs and outputs tie well together?
If you get any pushback or hit complex block conditions check the new_blocks guide in the docs.
#### Handling files in blocks with `store_media_file()`
When blocks need to work with files (images, videos, documents), use `store_media_file()` from `backend.util.file`. The `return_format` parameter determines what you get back:
| Format | Use When | Returns |
|--------|----------|---------|
| `"for_local_processing"` | Processing with local tools (ffmpeg, MoviePy, PIL) | Local file path (e.g., `"image.png"`) |
| `"for_external_api"` | Sending content to external APIs (Replicate, OpenAI) | Data URI (e.g., `"data:image/png;base64,..."`) |
| `"for_block_output"` | Returning output from your block | Smart: `workspace://` in CoPilot, data URI in graphs |
**Examples:**
```python
# INPUT: Need to process file locally with ffmpeg
local_path = await store_media_file(
file=input_data.video,
execution_context=execution_context,
return_format="for_local_processing",
)
# local_path = "video.mp4" - use with Path/ffmpeg/etc
# INPUT: Need to send to external API like Replicate
image_b64 = await store_media_file(
file=input_data.image,
execution_context=execution_context,
return_format="for_external_api",
)
# image_b64 = "..." - send to API
# OUTPUT: Returning result from block
result_url = await store_media_file(
file=generated_image_url,
execution_context=execution_context,
return_format="for_block_output",
)
yield "image_url", result_url
# In CoPilot: result_url = "workspace://abc123"
# In graphs: result_url = "data:image/png;base64,..."
```
**Key points:**
- `for_block_output` is the ONLY format that auto-adapts to execution context
- Always use `for_block_output` for block outputs unless you have a specific reason not to
- Never hardcode workspace checks - let `for_block_output` handle it
### Modifying the API
1. Update route in `backend/api/features/`
2. Add/update Pydantic models in same directory
3. Write tests alongside the route file
4. Run `poetry run test` to verify
## Security Implementation
### Cache Protection Middleware
- Located in `backend/api/middleware/security.py`
- Default behavior: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
- Uses an allow list approach - only explicitly permitted paths can be cached
- Cacheable paths include: static assets (`static/*`, `_next/static/*`), health checks, public store pages, documentation
- Prevents sensitive data (auth tokens, API keys, user data) from being cached by browsers/proxies
- To allow caching for a new endpoint, add it to `CACHEABLE_PATHS` in the middleware
- Applied to both main API server and external API applications

View File

@@ -138,7 +138,7 @@ If the test doesn't need the `user_id` specifically, mocking is not necessary as
#### Using Global Auth Fixtures
Two global auth fixtures are provided by `backend/server/conftest.py`:
Two global auth fixtures are provided by `backend/api/conftest.py`:
- `mock_jwt_user` - Regular user with `test_user_id` ("test-user-id")
- `mock_jwt_admin` - Admin user with `admin_user_id` ("admin-user-id")

View File

@@ -17,7 +17,7 @@ router = fastapi.APIRouter(
)
# Taken from backend/server/v2/store/db.py
# Taken from backend/api/features/store/db.py
def sanitize_query(query: str | None) -> str | None:
if query is None:
return query

View File

@@ -9,6 +9,7 @@ from .core import (
json_to_graph,
save_agent_to_library,
)
from .errors import get_user_message_for_error
from .service import health_check as check_external_service_health
from .service import is_external_service_configured
@@ -25,4 +26,6 @@ __all__ = [
# Service
"is_external_service_configured",
"check_external_service_health",
# Error handling
"get_user_message_for_error",
]

View File

@@ -70,7 +70,7 @@ async def generate_agent(
task_id: Task ID for async processing (enables RabbitMQ callback)
Returns:
Agent JSON dict, {"status": "accepted"} for async, or None on error
Agent JSON dict, {"status": "accepted"} for async, error dict {"type": "error", ...}, or None on error
Raises:
AgentGeneratorNotConfiguredError: If the external service is not configured.
@@ -84,7 +84,10 @@ async def generate_agent(
return result
if result:
# Ensure required fields
# Check if it's an error response - pass through as-is
if isinstance(result, dict) and result.get("type") == "error":
return result
# Ensure required fields for successful agent generation
if "id" not in result:
result["id"] = str(uuid.uuid4())
if "version" not in result:
@@ -283,7 +286,8 @@ async def generate_agent_patch(
task_id: Task ID for async processing (enables RabbitMQ callback)
Returns:
Updated agent JSON, clarifying questions dict, {"status": "accepted"} for async, or None on error
Updated agent JSON, clarifying questions dict {"type": "clarifying_questions", ...},
{"status": "accepted"} for async, error dict {"type": "error", ...}, or None on error
Raises:
AgentGeneratorNotConfiguredError: If the external service is not configured.

View File

@@ -0,0 +1,43 @@
"""Error handling utilities for agent generator."""
def get_user_message_for_error(
error_type: str,
operation: str = "process the request",
llm_parse_message: str | None = None,
validation_message: str | None = None,
) -> str:
"""Get a user-friendly error message based on error type.
This function maps internal error types to user-friendly messages,
providing a consistent experience across different agent operations.
Args:
error_type: The error type from the external service
(e.g., "llm_parse_error", "timeout", "rate_limit")
operation: Description of what operation failed, used in the default
message (e.g., "analyze the goal", "generate the agent")
llm_parse_message: Custom message for llm_parse_error type
validation_message: Custom message for validation_error type
Returns:
User-friendly error message suitable for display to the user
"""
if error_type == "llm_parse_error":
return (
llm_parse_message
or "The AI had trouble processing this request. Please try again."
)
elif error_type == "validation_error":
return (
validation_message
or "The request failed validation. Please try rephrasing."
)
elif error_type == "patch_error":
return "Failed to apply the changes. Please try a different approach."
elif error_type in ("timeout", "llm_timeout"):
return "The request took too long. Please try again."
elif error_type in ("rate_limit", "llm_rate_limit"):
return "The service is currently busy. Please try again in a moment."
else:
return f"Failed to {operation}. Please try again."

View File

@@ -14,6 +14,70 @@ from backend.util.settings import Settings
logger = logging.getLogger(__name__)
def _create_error_response(
error_message: str,
error_type: str = "unknown",
details: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Create a standardized error response dict.
Args:
error_message: Human-readable error message
error_type: Machine-readable error type
details: Optional additional error details
Returns:
Error dict with type="error" and error details
"""
response: dict[str, Any] = {
"type": "error",
"error": error_message,
"error_type": error_type,
}
if details:
response["details"] = details
return response
def _classify_http_error(e: httpx.HTTPStatusError) -> tuple[str, str]:
"""Classify an HTTP error into error_type and message.
Args:
e: The HTTP status error
Returns:
Tuple of (error_type, error_message)
"""
status = e.response.status_code
if status == 429:
return "rate_limit", f"Agent Generator rate limited: {e}"
elif status == 503:
return "service_unavailable", f"Agent Generator unavailable: {e}"
elif status == 504 or status == 408:
return "timeout", f"Agent Generator timed out: {e}"
else:
return "http_error", f"HTTP error calling Agent Generator: {e}"
def _classify_request_error(e: httpx.RequestError) -> tuple[str, str]:
"""Classify a request error into error_type and message.
Args:
e: The request error
Returns:
Tuple of (error_type, error_message)
"""
error_str = str(e).lower()
if "timeout" in error_str or "timed out" in error_str:
return "timeout", f"Agent Generator request timed out: {e}"
elif "connect" in error_str:
return "connection_error", f"Could not connect to Agent Generator: {e}"
else:
return "request_error", f"Request error calling Agent Generator: {e}"
_client: httpx.AsyncClient | None = None
_settings: Settings | None = None
@@ -67,7 +131,8 @@ async def decompose_goal_external(
- {"type": "instructions", "steps": [...]}
- {"type": "unachievable_goal", ...}
- {"type": "vague_goal", ...}
Or None on error
- {"type": "error", "error": "...", "error_type": "..."} on error
Or None on unexpected error
"""
client = _get_client()
@@ -83,8 +148,13 @@ async def decompose_goal_external(
data = response.json()
if not data.get("success"):
logger.error(f"External service returned error: {data.get('error')}")
return None
error_msg = data.get("error", "Unknown error from Agent Generator")
error_type = data.get("error_type", "unknown")
logger.error(
f"Agent Generator decomposition failed: {error_msg} "
f"(type: {error_type})"
)
return _create_error_response(error_msg, error_type)
# Map the response to the expected format
response_type = data.get("type")
@@ -106,21 +176,33 @@ async def decompose_goal_external(
"type": "vague_goal",
"suggested_goal": data.get("suggested_goal"),
}
elif response_type == "error":
# Pass through error from the service
return _create_error_response(
data.get("error", "Unknown error"),
data.get("error_type", "unknown"),
)
else:
logger.error(
f"Unknown response type from external service: {response_type}"
)
return None
return _create_error_response(
f"Unknown response type from Agent Generator: {response_type}",
"invalid_response",
)
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling external agent generator: {e}")
return None
error_type, error_msg = _classify_http_error(e)
logger.error(error_msg)
return _create_error_response(error_msg, error_type)
except httpx.RequestError as e:
logger.error(f"Request error calling external agent generator: {e}")
return None
error_type, error_msg = _classify_request_error(e)
logger.error(error_msg)
return _create_error_response(error_msg, error_type)
except Exception as e:
logger.error(f"Unexpected error calling external agent generator: {e}")
return None
error_msg = f"Unexpected error calling Agent Generator: {e}"
logger.error(error_msg)
return _create_error_response(error_msg, "unexpected_error")
async def generate_agent_external(
@@ -136,7 +218,7 @@ async def generate_agent_external(
task_id: Task ID for async processing (enables RabbitMQ callback)
Returns:
Agent JSON dict, or {"status": "accepted"} for async, or None on error
Agent JSON dict, {"status": "accepted"} for async, or error dict {"type": "error", ...} on error
"""
client = _get_client()
@@ -165,20 +247,28 @@ async def generate_agent_external(
data = response.json()
if not data.get("success"):
logger.error(f"External service returned error: {data.get('error')}")
return None
error_msg = data.get("error", "Unknown error from Agent Generator")
error_type = data.get("error_type", "unknown")
logger.error(
f"Agent Generator generation failed: {error_msg} "
f"(type: {error_type})"
)
return _create_error_response(error_msg, error_type)
return data.get("agent_json")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling external agent generator: {e}")
return None
error_type, error_msg = _classify_http_error(e)
logger.error(error_msg)
return _create_error_response(error_msg, error_type)
except httpx.RequestError as e:
logger.error(f"Request error calling external agent generator: {e}")
return None
error_type, error_msg = _classify_request_error(e)
logger.error(error_msg)
return _create_error_response(error_msg, error_type)
except Exception as e:
logger.error(f"Unexpected error calling external agent generator: {e}")
return None
error_msg = f"Unexpected error calling Agent Generator: {e}"
logger.error(error_msg)
return _create_error_response(error_msg, "unexpected_error")
async def generate_agent_patch_external(
@@ -196,7 +286,7 @@ async def generate_agent_patch_external(
task_id: Task ID for async processing (enables RabbitMQ callback)
Returns:
Updated agent JSON, clarifying questions dict, {"status": "accepted"} for async, or None on error
Updated agent JSON, clarifying questions dict, {"status": "accepted"} for async, or error dict on error
"""
client = _get_client()
@@ -228,8 +318,13 @@ async def generate_agent_patch_external(
data = response.json()
if not data.get("success"):
logger.error(f"External service returned error: {data.get('error')}")
return None
error_msg = data.get("error", "Unknown error from Agent Generator")
error_type = data.get("error_type", "unknown")
logger.error(
f"Agent Generator patch generation failed: {error_msg} "
f"(type: {error_type})"
)
return _create_error_response(error_msg, error_type)
# Check if it's clarifying questions
if data.get("type") == "clarifying_questions":
@@ -238,18 +333,28 @@ async def generate_agent_patch_external(
"questions": data.get("questions", []),
}
# Check if it's an error passed through
if data.get("type") == "error":
return _create_error_response(
data.get("error", "Unknown error"),
data.get("error_type", "unknown"),
)
# Otherwise return the updated agent JSON
return data.get("agent_json")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling external agent generator: {e}")
return None
error_type, error_msg = _classify_http_error(e)
logger.error(error_msg)
return _create_error_response(error_msg, error_type)
except httpx.RequestError as e:
logger.error(f"Request error calling external agent generator: {e}")
return None
error_type, error_msg = _classify_request_error(e)
logger.error(error_msg)
return _create_error_response(error_msg, error_type)
except Exception as e:
logger.error(f"Unexpected error calling external agent generator: {e}")
return None
error_msg = f"Unexpected error calling Agent Generator: {e}"
logger.error(error_msg)
return _create_error_response(error_msg, "unexpected_error")
async def get_blocks_external() -> list[dict[str, Any]] | None:

View File

@@ -9,6 +9,7 @@ from .agent_generator import (
AgentGeneratorNotConfiguredError,
decompose_goal,
generate_agent,
get_user_message_for_error,
save_agent_to_library,
)
from .base import BaseTool
@@ -122,11 +123,29 @@ class CreateAgentTool(BaseTool):
if decomposition_result is None:
return ErrorResponse(
message="Failed to analyze the goal. The agent generation service may be unavailable or timed out. Please try again.",
message="Failed to analyze the goal. The agent generation service may be unavailable. Please try again.",
error="decomposition_failed",
details={"description": description[:100]},
session_id=session_id,
)
# Check if the result is an error from the external service
if decomposition_result.get("type") == "error":
error_msg = decomposition_result.get("error", "Unknown error")
error_type = decomposition_result.get("error_type", "unknown")
user_message = get_user_message_for_error(
error_type,
operation="analyze the goal",
llm_parse_message="The AI had trouble understanding this request. Please try rephrasing your goal.",
)
return ErrorResponse(
message=user_message,
error=f"decomposition_failed:{error_type}",
details={
"description": description[:100]
}, # Include context for debugging
"description": description[:100],
"service_error": error_msg,
"error_type": error_type,
},
session_id=session_id,
)
@@ -195,11 +214,30 @@ class CreateAgentTool(BaseTool):
if agent_json is None:
return ErrorResponse(
message="Failed to generate the agent. The agent generation service may be unavailable or timed out. Please try again.",
message="Failed to generate the agent. The agent generation service may be unavailable. Please try again.",
error="generation_failed",
details={"description": description[:100]},
session_id=session_id,
)
# Check if the result is an error from the external service
if isinstance(agent_json, dict) and agent_json.get("type") == "error":
error_msg = agent_json.get("error", "Unknown error")
error_type = agent_json.get("error_type", "unknown")
user_message = get_user_message_for_error(
error_type,
operation="generate the agent",
llm_parse_message="The AI had trouble generating the agent. Please try again or simplify your goal.",
validation_message="The generated agent failed validation. Please try rephrasing your goal.",
)
return ErrorResponse(
message=user_message,
error=f"generation_failed:{error_type}",
details={
"description": description[:100]
}, # Include context for debugging
"description": description[:100],
"service_error": error_msg,
"error_type": error_type,
},
session_id=session_id,
)

View File

@@ -9,6 +9,7 @@ from .agent_generator import (
AgentGeneratorNotConfiguredError,
generate_agent_patch,
get_agent_as_json,
get_user_message_for_error,
save_agent_to_library,
)
from .base import BaseTool
@@ -175,6 +176,28 @@ class EditAgentTool(BaseTool):
session_id=session_id,
)
# Check if the result is an error from the external service
if isinstance(result, dict) and result.get("type") == "error":
error_msg = result.get("error", "Unknown error")
error_type = result.get("error_type", "unknown")
user_message = get_user_message_for_error(
error_type,
operation="generate the changes",
llm_parse_message="The AI had trouble generating the changes. Please try again or simplify your request.",
validation_message="The generated changes failed validation. Please try rephrasing your request.",
)
return ErrorResponse(
message=user_message,
error=f"update_generation_failed:{error_type}",
details={
"agent_id": agent_id,
"changes": changes[:100],
"service_error": error_msg,
"error_type": error_type,
},
session_id=session_id,
)
# Check if LLM returned clarifying questions
if result.get("type") == "clarifying_questions":
questions = result.get("questions", [])

View File

@@ -115,7 +115,6 @@ class LlmModel(str, Enum, metaclass=LlmModelMeta):
CLAUDE_4_5_OPUS = "claude-opus-4-5-20251101"
CLAUDE_4_5_SONNET = "claude-sonnet-4-5-20250929"
CLAUDE_4_5_HAIKU = "claude-haiku-4-5-20251001"
CLAUDE_3_7_SONNET = "claude-3-7-sonnet-20250219"
CLAUDE_3_HAIKU = "claude-3-haiku-20240307"
# AI/ML API models
AIML_API_QWEN2_5_72B = "Qwen/Qwen2.5-72B-Instruct-Turbo"
@@ -280,9 +279,6 @@ MODEL_METADATA = {
LlmModel.CLAUDE_4_5_HAIKU: ModelMetadata(
"anthropic", 200000, 64000, "Claude Haiku 4.5", "Anthropic", "Anthropic", 2
), # claude-haiku-4-5-20251001
LlmModel.CLAUDE_3_7_SONNET: ModelMetadata(
"anthropic", 200000, 64000, "Claude 3.7 Sonnet", "Anthropic", "Anthropic", 2
), # claude-3-7-sonnet-20250219
LlmModel.CLAUDE_3_HAIKU: ModelMetadata(
"anthropic", 200000, 4096, "Claude 3 Haiku", "Anthropic", "Anthropic", 1
), # claude-3-haiku-20240307

View File

@@ -83,7 +83,7 @@ class StagehandRecommendedLlmModel(str, Enum):
GPT41_MINI = "gpt-4.1-mini-2025-04-14"
# Anthropic
CLAUDE_3_7_SONNET = "claude-3-7-sonnet-20250219"
CLAUDE_4_5_SONNET = "claude-sonnet-4-5-20250929"
@property
def provider_name(self) -> str:
@@ -137,7 +137,7 @@ class StagehandObserveBlock(Block):
model: StagehandRecommendedLlmModel = SchemaField(
title="LLM Model",
description="LLM to use for Stagehand (provider is inferred)",
default=StagehandRecommendedLlmModel.CLAUDE_3_7_SONNET,
default=StagehandRecommendedLlmModel.CLAUDE_4_5_SONNET,
advanced=False,
)
model_credentials: AICredentials = AICredentialsField()
@@ -230,7 +230,7 @@ class StagehandActBlock(Block):
model: StagehandRecommendedLlmModel = SchemaField(
title="LLM Model",
description="LLM to use for Stagehand (provider is inferred)",
default=StagehandRecommendedLlmModel.CLAUDE_3_7_SONNET,
default=StagehandRecommendedLlmModel.CLAUDE_4_5_SONNET,
advanced=False,
)
model_credentials: AICredentials = AICredentialsField()
@@ -330,7 +330,7 @@ class StagehandExtractBlock(Block):
model: StagehandRecommendedLlmModel = SchemaField(
title="LLM Model",
description="LLM to use for Stagehand (provider is inferred)",
default=StagehandRecommendedLlmModel.CLAUDE_3_7_SONNET,
default=StagehandRecommendedLlmModel.CLAUDE_4_5_SONNET,
advanced=False,
)
model_credentials: AICredentials = AICredentialsField()

View File

@@ -81,7 +81,6 @@ MODEL_COST: dict[LlmModel, int] = {
LlmModel.CLAUDE_4_5_HAIKU: 4,
LlmModel.CLAUDE_4_5_OPUS: 14,
LlmModel.CLAUDE_4_5_SONNET: 9,
LlmModel.CLAUDE_3_7_SONNET: 5,
LlmModel.CLAUDE_3_HAIKU: 1,
LlmModel.AIML_API_QWEN2_5_72B: 1,
LlmModel.AIML_API_LLAMA3_1_70B: 1,

View File

@@ -666,10 +666,16 @@ class CredentialsFieldInfo(BaseModel, Generic[CP, CT]):
if not (self.discriminator and self.discriminator_mapping):
return self
try:
provider = self.discriminator_mapping[discriminator_value]
except KeyError:
raise ValueError(
f"Model '{discriminator_value}' is not supported. "
"It may have been deprecated. Please update your agent configuration."
)
return CredentialsFieldInfo(
credentials_provider=frozenset(
[self.discriminator_mapping[discriminator_value]]
),
credentials_provider=frozenset([provider]),
credentials_types=self.supported_types,
credentials_scopes=self.required_scopes,
discriminator=self.discriminator,

View File

@@ -0,0 +1,22 @@
-- Migrate Claude 3.7 Sonnet to Claude 4.5 Sonnet
-- This updates all AgentNode blocks that use the deprecated Claude 3.7 Sonnet model
-- Anthropic is retiring claude-3-7-sonnet-20250219 on February 19, 2026
-- Update AgentNode constant inputs
UPDATE "AgentNode"
SET "constantInput" = JSONB_SET(
"constantInput"::jsonb,
'{model}',
'"claude-sonnet-4-5-20250929"'::jsonb
)
WHERE "constantInput"::jsonb->>'model' = 'claude-3-7-sonnet-20250219';
-- Update AgentPreset input overrides (stored in AgentNodeExecutionInputOutput)
UPDATE "AgentNodeExecutionInputOutput"
SET "data" = JSONB_SET(
"data"::jsonb,
'{model}',
'"claude-sonnet-4-5-20250929"'::jsonb
)
WHERE "agentPresetId" IS NOT NULL
AND "data"::jsonb->>'model' = 'claude-3-7-sonnet-20250219';

View File

@@ -151,15 +151,20 @@ class TestDecomposeGoalExternal:
@pytest.mark.asyncio
async def test_decompose_goal_handles_http_error(self):
"""Test decomposition handles HTTP errors gracefully."""
mock_response = MagicMock()
mock_response.status_code = 500
mock_client = AsyncMock()
mock_client.post.side_effect = httpx.HTTPStatusError(
"Server error", request=MagicMock(), response=MagicMock()
"Server error", request=MagicMock(), response=mock_response
)
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build a chatbot")
assert result is None
assert result is not None
assert result.get("type") == "error"
assert result.get("error_type") == "http_error"
assert "Server error" in result.get("error", "")
@pytest.mark.asyncio
async def test_decompose_goal_handles_request_error(self):
@@ -170,7 +175,10 @@ class TestDecomposeGoalExternal:
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build a chatbot")
assert result is None
assert result is not None
assert result.get("type") == "error"
assert result.get("error_type") == "connection_error"
assert "Connection failed" in result.get("error", "")
@pytest.mark.asyncio
async def test_decompose_goal_handles_service_error(self):
@@ -179,6 +187,7 @@ class TestDecomposeGoalExternal:
mock_response.json.return_value = {
"success": False,
"error": "Internal error",
"error_type": "internal_error",
}
mock_response.raise_for_status = MagicMock()
@@ -188,7 +197,10 @@ class TestDecomposeGoalExternal:
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build a chatbot")
assert result is None
assert result is not None
assert result.get("type") == "error"
assert result.get("error") == "Internal error"
assert result.get("error_type") == "internal_error"
class TestGenerateAgentExternal:
@@ -236,7 +248,10 @@ class TestGenerateAgentExternal:
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.generate_agent_external({"steps": []})
assert result is None
assert result is not None
assert result.get("type") == "error"
assert result.get("error_type") == "connection_error"
assert "Connection failed" in result.get("error", "")
class TestGenerateAgentPatchExternal:

View File

@@ -43,19 +43,24 @@ faker = Faker()
# Constants for data generation limits (reduced for E2E tests)
NUM_USERS = 15
NUM_AGENT_BLOCKS = 30
MIN_GRAPHS_PER_USER = 15
MAX_GRAPHS_PER_USER = 15
MIN_GRAPHS_PER_USER = 25
MAX_GRAPHS_PER_USER = 25
MIN_NODES_PER_GRAPH = 3
MAX_NODES_PER_GRAPH = 6
MIN_PRESETS_PER_USER = 2
MAX_PRESETS_PER_USER = 3
MIN_AGENTS_PER_USER = 15
MAX_AGENTS_PER_USER = 15
MIN_AGENTS_PER_USER = 25
MAX_AGENTS_PER_USER = 25
MIN_EXECUTIONS_PER_GRAPH = 2
MAX_EXECUTIONS_PER_GRAPH = 8
MIN_REVIEWS_PER_VERSION = 2
MAX_REVIEWS_PER_VERSION = 5
# Guaranteed minimums for marketplace tests (deterministic)
GUARANTEED_FEATURED_AGENTS = 8
GUARANTEED_FEATURED_CREATORS = 5
GUARANTEED_TOP_AGENTS = 10
def get_image():
"""Generate a consistent image URL using picsum.photos service."""
@@ -385,7 +390,7 @@ class TestDataCreator:
library_agents = []
for user in self.users:
num_agents = 10 # Create exactly 10 agents per user
num_agents = random.randint(MIN_AGENTS_PER_USER, MAX_AGENTS_PER_USER)
# Get available graphs for this user
user_graphs = [
@@ -507,14 +512,17 @@ class TestDataCreator:
existing_profiles, min(num_creators, len(existing_profiles))
)
# Mark about 50% of creators as featured (more for testing)
num_featured = max(2, int(num_creators * 0.5))
# Guarantee at least GUARANTEED_FEATURED_CREATORS featured creators
num_featured = max(GUARANTEED_FEATURED_CREATORS, int(num_creators * 0.5))
num_featured = min(
num_featured, len(selected_profiles)
) # Don't exceed available profiles
featured_profile_ids = set(
random.sample([p.id for p in selected_profiles], num_featured)
)
print(
f"🎯 Creating {num_featured} featured creators (min: {GUARANTEED_FEATURED_CREATORS})"
)
for profile in selected_profiles:
try:
@@ -545,21 +553,25 @@ class TestDataCreator:
return profiles
async def create_test_store_submissions(self) -> List[Dict[str, Any]]:
"""Create test store submissions using the API function."""
"""Create test store submissions using the API function.
DETERMINISTIC: Guarantees minimum featured agents for E2E tests.
"""
print("Creating test store submissions...")
submissions = []
approved_submissions = []
featured_count = 0
submission_counter = 0
# Create a special test submission for test123@gmail.com
# Create a special test submission for test123@gmail.com (ALWAYS approved + featured)
test_user = next(
(user for user in self.users if user["email"] == "test123@gmail.com"), None
)
if test_user:
# Special test data for consistent testing
if test_user and self.agent_graphs:
test_submission_data = {
"user_id": test_user["id"],
"agent_id": self.agent_graphs[0]["id"], # Use first available graph
"agent_id": self.agent_graphs[0]["id"],
"agent_version": 1,
"slug": "test-agent-submission",
"name": "Test Agent Submission",
@@ -580,37 +592,24 @@ class TestDataCreator:
submissions.append(test_submission.model_dump())
print("✅ Created special test store submission for test123@gmail.com")
# Randomly approve, reject, or leave pending the test submission
# ALWAYS approve and feature the test submission
if test_submission.store_listing_version_id:
random_value = random.random()
if random_value < 0.4: # 40% chance to approve
approved_submission = await review_store_submission(
store_listing_version_id=test_submission.store_listing_version_id,
is_approved=True,
external_comments="Test submission approved",
internal_comments="Auto-approved test submission",
reviewer_id=test_user["id"],
)
approved_submissions.append(approved_submission.model_dump())
print("✅ Approved test store submission")
approved_submission = await review_store_submission(
store_listing_version_id=test_submission.store_listing_version_id,
is_approved=True,
external_comments="Test submission approved",
internal_comments="Auto-approved test submission",
reviewer_id=test_user["id"],
)
approved_submissions.append(approved_submission.model_dump())
print("✅ Approved test store submission")
# Mark approved submission as featured
await prisma.storelistingversion.update(
where={"id": test_submission.store_listing_version_id},
data={"isFeatured": True},
)
print("🌟 Marked test agent as FEATURED")
elif random_value < 0.7: # 30% chance to reject (40% to 70%)
await review_store_submission(
store_listing_version_id=test_submission.store_listing_version_id,
is_approved=False,
external_comments="Test submission rejected - needs improvements",
internal_comments="Auto-rejected test submission for E2E testing",
reviewer_id=test_user["id"],
)
print("❌ Rejected test store submission")
else: # 30% chance to leave pending (70% to 100%)
print("⏳ Left test submission pending for review")
await prisma.storelistingversion.update(
where={"id": test_submission.store_listing_version_id},
data={"isFeatured": True},
)
featured_count += 1
print("🌟 Marked test agent as FEATURED")
except Exception as e:
print(f"Error creating test store submission: {e}")
@@ -620,7 +619,6 @@ class TestDataCreator:
# Create regular submissions for all users
for user in self.users:
# Get available graphs for this specific user
user_graphs = [
g for g in self.agent_graphs if g.get("userId") == user["id"]
]
@@ -631,18 +629,17 @@ class TestDataCreator:
)
continue
# Create exactly 4 store submissions per user
for submission_index in range(4):
graph = random.choice(user_graphs)
submission_counter += 1
try:
print(
f"Creating store submission for user {user['id']} with graph {graph['id']} (owner: {graph.get('userId')})"
f"Creating store submission for user {user['id']} with graph {graph['id']}"
)
# Use the API function to create store submission with correct parameters
submission = await create_store_submission(
user_id=user["id"], # Must match graph's userId
user_id=user["id"],
agent_id=graph["id"],
agent_version=graph.get("version", 1),
slug=faker.slug(),
@@ -651,22 +648,24 @@ class TestDataCreator:
video_url=get_video_url() if random.random() < 0.3 else None,
image_urls=[get_image() for _ in range(3)],
description=faker.text(),
categories=[
get_category()
], # Single category from predefined list
categories=[get_category()],
changes_summary="Initial E2E test submission",
)
submissions.append(submission.model_dump())
print(f"✅ Created store submission: {submission.name}")
# Randomly approve, reject, or leave pending the submission
if submission.store_listing_version_id:
random_value = random.random()
if random_value < 0.4: # 40% chance to approve
try:
# Pick a random user as the reviewer (admin)
reviewer_id = random.choice(self.users)["id"]
# DETERMINISTIC: First N submissions are always approved
# First GUARANTEED_FEATURED_AGENTS of those are always featured
should_approve = (
submission_counter <= GUARANTEED_TOP_AGENTS
or random.random() < 0.4
)
should_feature = featured_count < GUARANTEED_FEATURED_AGENTS
if should_approve:
try:
reviewer_id = random.choice(self.users)["id"]
approved_submission = await review_store_submission(
store_listing_version_id=submission.store_listing_version_id,
is_approved=True,
@@ -681,16 +680,7 @@ class TestDataCreator:
f"✅ Approved store submission: {submission.name}"
)
# Mark some agents as featured during creation (30% chance)
# More likely for creators and first submissions
is_creator = user["id"] in [
p.get("userId") for p in self.profiles
]
feature_chance = (
0.5 if is_creator else 0.2
) # 50% for creators, 20% for others
if random.random() < feature_chance:
if should_feature:
try:
await prisma.storelistingversion.update(
where={
@@ -698,8 +688,25 @@ class TestDataCreator:
},
data={"isFeatured": True},
)
featured_count += 1
print(
f"🌟 Marked agent as FEATURED: {submission.name}"
f"🌟 Marked agent as FEATURED ({featured_count}/{GUARANTEED_FEATURED_AGENTS}): {submission.name}"
)
except Exception as e:
print(
f"Warning: Could not mark submission as featured: {e}"
)
elif random.random() < 0.2:
try:
await prisma.storelistingversion.update(
where={
"id": submission.store_listing_version_id
},
data={"isFeatured": True},
)
featured_count += 1
print(
f"🌟 Marked agent as FEATURED (bonus): {submission.name}"
)
except Exception as e:
print(
@@ -710,11 +717,9 @@ class TestDataCreator:
print(
f"Warning: Could not approve submission {submission.name}: {e}"
)
elif random_value < 0.7: # 30% chance to reject (40% to 70%)
elif random.random() < 0.5:
try:
# Pick a random user as the reviewer (admin)
reviewer_id = random.choice(self.users)["id"]
await review_store_submission(
store_listing_version_id=submission.store_listing_version_id,
is_approved=False,
@@ -729,7 +734,7 @@ class TestDataCreator:
print(
f"Warning: Could not reject submission {submission.name}: {e}"
)
else: # 30% chance to leave pending (70% to 100%)
else:
print(
f"⏳ Left submission pending for review: {submission.name}"
)
@@ -743,9 +748,13 @@ class TestDataCreator:
traceback.print_exc()
continue
print("\n📊 Store Submissions Summary:")
print(f" Created: {len(submissions)}")
print(f" Approved: {len(approved_submissions)}")
print(
f"Created {len(submissions)} store submissions, approved {len(approved_submissions)}"
f" Featured: {featured_count} (guaranteed min: {GUARANTEED_FEATURED_AGENTS})"
)
self.store_submissions = submissions
return submissions
@@ -825,12 +834,15 @@ class TestDataCreator:
print(f"✅ Agent blocks available: {len(self.agent_blocks)}")
print(f"✅ Agent graphs created: {len(self.agent_graphs)}")
print(f"✅ Library agents created: {len(self.library_agents)}")
print(f"✅ Creator profiles updated: {len(self.profiles)} (some featured)")
print(
f"✅ Store submissions created: {len(self.store_submissions)} (some marked as featured during creation)"
)
print(f"✅ Creator profiles updated: {len(self.profiles)}")
print(f"✅ Store submissions created: {len(self.store_submissions)}")
print(f"✅ API keys created: {len(self.api_keys)}")
print(f"✅ Presets created: {len(self.presets)}")
print("\n🎯 Deterministic Guarantees:")
print(f" • Featured agents: >= {GUARANTEED_FEATURED_AGENTS}")
print(f" • Featured creators: >= {GUARANTEED_FEATURED_CREATORS}")
print(f" • Top agents (approved): >= {GUARANTEED_TOP_AGENTS}")
print(f" • Library agents per user: >= {MIN_AGENTS_PER_USER}")
print("\n🚀 Your E2E test database is ready to use!")