Merge branch 'dev' into seer/validate-agent-graph-schema

This commit is contained in:
Swifty
2025-10-15 11:01:33 +02:00
committed by GitHub
10 changed files with 279 additions and 9 deletions

View File

@@ -0,0 +1,226 @@
# flake8: noqa: E501
import logging
from enum import Enum
from typing import Any, Literal
import openai
from pydantic import SecretStr
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
from backend.util.logging import TruncatedLogger
logger = TruncatedLogger(logging.getLogger(__name__), "[Perplexity-Block]")
class PerplexityModel(str, Enum):
"""Perplexity sonar models available via OpenRouter"""
SONAR = "perplexity/sonar"
SONAR_PRO = "perplexity/sonar-pro"
SONAR_DEEP_RESEARCH = "perplexity/sonar-deep-research"
PerplexityCredentials = CredentialsMetaInput[
Literal[ProviderName.OPEN_ROUTER], Literal["api_key"]
]
TEST_CREDENTIALS = APIKeyCredentials(
id="test-perplexity-creds",
provider="open_router",
api_key=SecretStr("mock-openrouter-api-key"),
title="Mock OpenRouter API key",
expires_at=None,
)
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}
def PerplexityCredentialsField() -> PerplexityCredentials:
return CredentialsField(
description="OpenRouter API key for accessing Perplexity models.",
)
class PerplexityBlock(Block):
class Input(BlockSchema):
prompt: str = SchemaField(
description="The query to send to the Perplexity model.",
placeholder="Enter your query here...",
)
model: PerplexityModel = SchemaField(
title="Perplexity Model",
default=PerplexityModel.SONAR,
description="The Perplexity sonar model to use.",
advanced=False,
)
credentials: PerplexityCredentials = PerplexityCredentialsField()
system_prompt: str = SchemaField(
title="System Prompt",
default="",
description="Optional system prompt to provide context to the model.",
advanced=True,
)
max_tokens: int | None = SchemaField(
advanced=True,
default=None,
description="The maximum number of tokens to generate.",
)
class Output(BlockSchema):
response: str = SchemaField(
description="The response from the Perplexity model."
)
annotations: list[dict[str, Any]] = SchemaField(
description="List of URL citations and annotations from the response."
)
error: str = SchemaField(description="Error message if the API call failed.")
def __init__(self):
super().__init__(
id="c8a5f2e9-8b3d-4a7e-9f6c-1d5e3c9b7a4f",
description="Query Perplexity's sonar models with real-time web search capabilities and receive annotated responses with source citations.",
categories={BlockCategory.AI, BlockCategory.SEARCH},
input_schema=PerplexityBlock.Input,
output_schema=PerplexityBlock.Output,
test_input={
"prompt": "What is the weather today?",
"model": PerplexityModel.SONAR,
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("response", "The weather varies by location..."),
("annotations", list),
],
test_mock={
"call_perplexity": lambda *args, **kwargs: {
"response": "The weather varies by location...",
"annotations": [
{
"type": "url_citation",
"url_citation": {
"title": "weather.com",
"url": "https://weather.com",
},
}
],
}
},
)
self.execution_stats = NodeExecutionStats()
async def call_perplexity(
self,
credentials: APIKeyCredentials,
model: PerplexityModel,
prompt: str,
system_prompt: str = "",
max_tokens: int | None = None,
) -> dict[str, Any]:
"""Call Perplexity via OpenRouter and extract annotations."""
client = openai.AsyncOpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=credentials.api_key.get_secret_value(),
)
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
try:
response = await client.chat.completions.create(
extra_headers={
"HTTP-Referer": "https://agpt.co",
"X-Title": "AutoGPT",
},
model=model.value,
messages=messages,
max_tokens=max_tokens,
)
if not response.choices:
raise ValueError("No response from Perplexity via OpenRouter.")
# Extract the response content
response_content = response.choices[0].message.content or ""
# Extract annotations if present in the message
annotations = []
if hasattr(response.choices[0].message, "annotations"):
# If annotations are directly available
annotations = response.choices[0].message.annotations
else:
# Check if there's a raw response with annotations
raw = getattr(response.choices[0].message, "_raw_response", None)
if isinstance(raw, dict) and "annotations" in raw:
annotations = raw["annotations"]
if not annotations and hasattr(response, "model_extra"):
# Check model_extra for annotations
model_extra = response.model_extra
if isinstance(model_extra, dict):
# Check in choices
if "choices" in model_extra and len(model_extra["choices"]) > 0:
choice = model_extra["choices"][0]
if "message" in choice and "annotations" in choice["message"]:
annotations = choice["message"]["annotations"]
# Also check the raw response object for annotations
if not annotations:
raw = getattr(response, "_raw_response", None)
if isinstance(raw, dict):
# Check various possible locations for annotations
if "annotations" in raw:
annotations = raw["annotations"]
elif "choices" in raw and len(raw["choices"]) > 0:
choice = raw["choices"][0]
if "message" in choice and "annotations" in choice["message"]:
annotations = choice["message"]["annotations"]
# Update execution stats
if response.usage:
self.execution_stats.input_token_count = response.usage.prompt_tokens
self.execution_stats.output_token_count = (
response.usage.completion_tokens
)
return {"response": response_content, "annotations": annotations or []}
except Exception as e:
logger.error(f"Error calling Perplexity: {e}")
raise
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
logger.debug(f"Running Perplexity block with model: {input_data.model}")
try:
result = await self.call_perplexity(
credentials=credentials,
model=input_data.model,
prompt=input_data.prompt,
system_prompt=input_data.system_prompt,
max_tokens=input_data.max_tokens,
)
yield "response", result["response"]
yield "annotations", result["annotations"]
except Exception as e:
error_msg = f"Error calling Perplexity: {str(e)}"
logger.error(error_msg)
yield "error", error_msg

View File

@@ -479,16 +479,18 @@ async def get_graph_executions(
async def get_graph_executions_count(
user_id: str,
user_id: Optional[str] = None,
graph_id: Optional[str] = None,
statuses: Optional[list[ExecutionStatus]] = None,
created_time_gte: Optional[datetime] = None,
created_time_lte: Optional[datetime] = None,
) -> int:
"""
Get count of graph executions for a user with optional filters.
Get count of graph executions with optional filters.
Args:
user_id: The user ID to filter by
user_id: Optional user ID to filter by
graph_id: Optional graph ID to filter by
statuses: Optional list of execution statuses to filter by
created_time_gte: Optional minimum creation time
created_time_lte: Optional maximum creation time
@@ -498,9 +500,14 @@ async def get_graph_executions_count(
"""
where_filter: AgentGraphExecutionWhereInput = {
"isDeleted": False,
"userId": user_id,
}
if user_id:
where_filter["userId"] = user_id
if graph_id:
where_filter["agentGraphId"] = graph_id
if created_time_gte or created_time_lte:
where_filter["createdAt"] = {
"gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),

View File

@@ -1467,6 +1467,7 @@ class ExecutionManager(AppProcess):
graph_exec_id = graph_exec_entry.graph_exec_id
user_id = graph_exec_entry.user_id
graph_id = graph_exec_entry.graph_id
logger.info(
f"[{self.service_name}] Received RUN for graph_exec_id={graph_exec_id}, user_id={user_id}"
)
@@ -1476,6 +1477,7 @@ class ExecutionManager(AppProcess):
# Only check executions from the last 24 hours for performance
current_running_count = get_db_client().get_graph_executions_count(
user_id=user_id,
graph_id=graph_id,
statuses=[ExecutionStatus.RUNNING],
created_time_gte=datetime.now(timezone.utc) - timedelta(hours=24),
)
@@ -1485,7 +1487,7 @@ class ExecutionManager(AppProcess):
>= settings.config.max_concurrent_graph_executions_per_user
):
logger.warning(
f"[{self.service_name}] Rate limit exceeded for user {user_id}: "
f"[{self.service_name}] Rate limit exceeded for user {user_id} on graph {graph_id}: "
f"{current_running_count}/{settings.config.max_concurrent_graph_executions_per_user} running executions"
)
_ack_message(reject=True, requeue=True)

View File

@@ -742,7 +742,21 @@ async def create_store_submission(
store_listing_version_id=store_listing_version_id,
changes_summary=changes_summary,
)
except prisma.errors.UniqueViolationError as exc:
# Attempt to check if the error was due to the slug field being unique
error_str = str(exc)
if "slug" in error_str.lower():
logger.debug(
f"Slug '{slug}' is already in use by another agent (agent_id: {agent_id}) for user {user_id}"
)
raise backend.server.v2.store.exceptions.SlugAlreadyInUseError(
f"The URL slug '{slug}' is already in use by another one of your agents. Please choose a different slug."
) from exc
else:
# Reraise as a generic database error for other unique violations
raise backend.server.v2.store.exceptions.DatabaseError(
f"Unique constraint violated (not slug): {error_str}"
) from exc
except (
backend.server.v2.store.exceptions.AgentNotFoundError,
backend.server.v2.store.exceptions.ListingExistsError,

View File

@@ -279,7 +279,6 @@ async def test_create_store_submission(mocker):
# Verify mocks called correctly
mock_agent_graph.return_value.find_first.assert_called_once()
mock_store_listing.return_value.find_first.assert_called_once()
mock_store_listing.return_value.create.assert_called_once()

View File

@@ -106,3 +106,9 @@ class UnauthorizedError(StoreError):
"""Raised when a user is not authorized to perform an action"""
pass
class SlugAlreadyInUseError(StoreError):
"""Raised when a slug is already in use by another agent owned by the user"""
pass

View File

@@ -547,6 +547,12 @@ async def create_submission(
)
return result
except backend.server.v2.store.exceptions.SlugAlreadyInUseError as e:
logger.warning("Slug already in use: %s", str(e))
return fastapi.responses.JSONResponse(
status_code=409,
content={"detail": str(e)},
)
except Exception:
logger.exception("Exception occurred whilst creating store submission")
return fastapi.responses.JSONResponse(

View File

@@ -149,10 +149,10 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Time in seconds for how far back to check for the late executions.",
)
max_concurrent_graph_executions_per_user: int = Field(
default=50,
default=25,
ge=1,
le=1000,
description="Maximum number of concurrent graph executions allowed per user.",
description="Maximum number of concurrent graph executions allowed per user per graph.",
)
block_error_rate_threshold: float = Field(

View File

@@ -2,6 +2,7 @@ import { withSentryConfig } from "@sentry/nextjs";
/** @type {import('next').NextConfig} */
const nextConfig = {
productionBrowserSourceMaps: true,
images: {
domains: [
"images.unsplash.com",
@@ -74,6 +75,14 @@ export default isDevelopmentBuild
// since the source is public anyway :)
hideSourceMaps: false,
// This helps Sentry with sourcemaps... https://docs.sentry.io/platforms/javascript/guides/nextjs/sourcemaps/
sourcemaps: {
disable: false, // Source maps are enabled by default
assets: ["**/*.js", "**/*.js.map"], // Specify which files to upload
ignore: ["**/node_modules/**"], // Files to exclude
deleteSourcemapsAfterUpload: true, // Security: delete after upload
},
// Automatically tree-shake Sentry logger statements to reduce bundle size
disableLogger: true,

View File

@@ -116,6 +116,7 @@ export function useAgentInfoStep({
toast({
title: "Submit Agent Error",
description:
(error instanceof Error ? error.message : undefined) ||
"An error occurred while submitting the agent. Please try again.",
duration: 3000,
variant: "destructive",