AutoGPT/autogpt_platform/backend/backend/api/features/library/model.py
Reinier van der Leer 113e87a23c refactor(backend): Reduce circular imports (#12068)
I'm getting circular import issues because there is a lot of
cross-importing between `backend.data`, `backend.blocks`, and other
modules. This change reduces block-related cross-imports and thus the
risk of breakage from circular imports. (A sketch of the deferred-import
pattern involved is included after the change list below.)

### Changes 🏗️

- Strip down `backend.data.block`
- Move `Block` base class and related class/enum defs to `backend.blocks._base`
  - Move `is_block_auth_configured` to `backend.blocks._utils`
- Move `get_blocks()`, `get_io_block_ids()` etc. to `backend.blocks` (`__init__.py`)
  - Update imports everywhere
- Remove unused and poorly typed `Block.create()`
  - Change usages from `block_cls.create()` to `block_cls()`
- Improve typing of `load_all_blocks` and `get_blocks`
- Move cross-import of `backend.api.features.library.model` from `backend/data/__init__.py` to `backend/data/integrations.py`
- Remove deprecated attribute `NodeModel.webhook`
  - Re-generate OpenAPI spec and fix frontend usage
- Eliminate module-level `backend.blocks` import from `blocks/agent.py`
- Eliminate module-level `backend.data.execution` and `backend.executor.manager` imports from `blocks/helpers/review.py`
- Replace `BlockInput` with `GraphInput` for graph inputs
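
Several of these changes (the eliminated module-level imports, the `TYPE_CHECKING` guard in the file below) follow the standard deferred-import pattern: import the cross-module type only for type checking, and re-import it locally at call time where the real object is needed. Here is a minimal sketch of that pattern with hypothetical modules `a.py` and `b.py` — not code from this PR:

```python
# --- b.py (hypothetical) ---------------------------------------------------
# Imports a.py at module load time, so a.py must not import b.py eagerly.
from a import Preset


class Webhook:
    def __init__(self, url: str):
        self.url = url

    def describe(self, preset: Preset) -> str:
        return f"{preset.name} -> {self.url}"


# --- a.py (hypothetical) ---------------------------------------------------
# Avoids a module-level import of b.py, which would complete the cycle a -> b -> a.
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Seen only by static type checkers; never executed at runtime.
    from b import Webhook


class Preset:
    def __init__(self, name: str):
        self.name = name
        self.webhook: Optional["Webhook"] = None

    def attach_webhook(self, url: str) -> None:
        # Deferred import: by the time this runs, both modules are fully loaded.
        from b import Webhook

        self.webhook = Webhook(url)
```

The same shape appears in `model.py` below: `Webhook` is imported under `if TYPE_CHECKING:` at the top of the file and imported locally inside `LibraryAgentPreset.from_db()`.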

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI static type-checking + tests should be sufficient for this
2026-02-12 12:07:49 +00:00

473 lines
15 KiB
Python

import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional

import prisma.enums
import prisma.models
import pydantic

from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo
from backend.data.model import (
    CredentialsMetaInput,
    GraphInput,
    is_credentials_field_name,
)
from backend.util.json import loads as json_loads
from backend.util.models import Pagination

if TYPE_CHECKING:
    from backend.data.integrations import Webhook


class LibraryAgentStatus(str, Enum):
    COMPLETED = "COMPLETED"
    HEALTHY = "HEALTHY"
    WAITING = "WAITING"
    ERROR = "ERROR"


class MarketplaceListingCreator(pydantic.BaseModel):
    """Creator information for a marketplace listing."""

    name: str
    id: str
    slug: str


class MarketplaceListing(pydantic.BaseModel):
    """Marketplace listing information for a library agent."""

    id: str
    name: str
    slug: str
    creator: MarketplaceListingCreator


class RecentExecution(pydantic.BaseModel):
    """Summary of a recent execution for quality assessment.

    Used by the LLM to understand the agent's recent performance with specific examples
    rather than just aggregate statistics.
    """

    status: str
    correctness_score: float | None = None
    activity_summary: str | None = None


def _parse_settings(settings: dict | str | None) -> GraphSettings:
    """Parse settings from database, handling both dict and string formats."""
    if settings is None:
        return GraphSettings()
    try:
        if isinstance(settings, str):
            settings = json_loads(settings)
        return GraphSettings.model_validate(settings)
    except Exception:
        return GraphSettings()


class LibraryAgent(pydantic.BaseModel):
    """
    Represents an agent in the library, including metadata for display and
    user interaction within the system.
    """

    id: str
    graph_id: str
    graph_version: int
    owner_user_id: str
    image_url: str | None
    creator_name: str
    creator_image_url: str
    status: LibraryAgentStatus
    created_at: datetime.datetime
    updated_at: datetime.datetime
    name: str
    description: str
    instructions: str | None = None
    input_schema: dict[str, Any]
    output_schema: dict[str, Any]
    credentials_input_schema: dict[str, Any] | None = pydantic.Field(
        description="Input schema for credentials required by the agent",
    )
    has_external_trigger: bool = pydantic.Field(
        description="Whether the agent has an external trigger (e.g. webhook) node"
    )
    has_human_in_the_loop: bool = pydantic.Field(
        description="Whether the agent has human-in-the-loop blocks"
    )
    has_sensitive_action: bool = pydantic.Field(
        description="Whether the agent has sensitive action blocks"
    )
    trigger_setup_info: Optional[GraphTriggerInfo] = None
    new_output: bool
    execution_count: int = 0
    success_rate: float | None = None
    avg_correctness_score: float | None = None
    recent_executions: list[RecentExecution] = pydantic.Field(
        default_factory=list,
        description="List of recent executions with status, score, and summary",
    )
    can_access_graph: bool
    is_latest_version: bool
    is_favorite: bool
    recommended_schedule_cron: str | None = None
    settings: GraphSettings = pydantic.Field(default_factory=GraphSettings)
    marketplace_listing: Optional["MarketplaceListing"] = None

    @staticmethod
    def from_db(
        agent: prisma.models.LibraryAgent,
        sub_graphs: Optional[list[prisma.models.AgentGraph]] = None,
        store_listing: Optional[prisma.models.StoreListing] = None,
        profile: Optional[prisma.models.Profile] = None,
    ) -> "LibraryAgent":
        """
        Factory method that constructs a LibraryAgent from a Prisma LibraryAgent
        model instance.
        """
        if not agent.AgentGraph:
            raise ValueError("Associated Agent record is required.")

        graph = GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs)

        created_at = agent.createdAt
        agent_updated_at = agent.AgentGraph.updatedAt
        lib_agent_updated_at = agent.updatedAt
        updated_at = (
            max(agent_updated_at, lib_agent_updated_at)
            if agent_updated_at
            else lib_agent_updated_at
        )

        creator_name = "Unknown"
        creator_image_url = ""
        if agent.Creator:
            creator_name = agent.Creator.name or "Unknown"
            creator_image_url = agent.Creator.avatarUrl or ""

        week_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
            days=7
        )
        executions = agent.AgentGraph.Executions or []
        status_result = _calculate_agent_status(executions, week_ago)
        status = status_result.status
        new_output = status_result.new_output

        execution_count = len(executions)
        success_rate: float | None = None
        avg_correctness_score: float | None = None
        if execution_count > 0:
            success_count = sum(
                1
                for e in executions
                if e.executionStatus == prisma.enums.AgentExecutionStatus.COMPLETED
            )
            success_rate = (success_count / execution_count) * 100
            correctness_scores = []
            for e in executions:
                if e.stats and isinstance(e.stats, dict):
                    score = e.stats.get("correctness_score")
                    if score is not None and isinstance(score, (int, float)):
                        correctness_scores.append(float(score))
            if correctness_scores:
                avg_correctness_score = sum(correctness_scores) / len(
                    correctness_scores
                )

        recent_executions: list[RecentExecution] = []
        for e in executions:
            exec_score: float | None = None
            exec_summary: str | None = None
            if e.stats and isinstance(e.stats, dict):
                score = e.stats.get("correctness_score")
                if score is not None and isinstance(score, (int, float)):
                    exec_score = float(score)
                summary = e.stats.get("activity_status")
                if summary is not None and isinstance(summary, str):
                    exec_summary = summary
            exec_status = (
                e.executionStatus.value
                if hasattr(e.executionStatus, "value")
                else str(e.executionStatus)
            )
            recent_executions.append(
                RecentExecution(
                    status=exec_status,
                    correctness_score=exec_score,
                    activity_summary=exec_summary,
                )
            )

        can_access_graph = agent.AgentGraph.userId == agent.userId
        is_latest_version = True

        marketplace_listing_data = None
        if store_listing and store_listing.ActiveVersion and profile:
            creator_data = MarketplaceListingCreator(
                name=profile.name,
                id=profile.id,
                slug=profile.username,
            )
            marketplace_listing_data = MarketplaceListing(
                id=store_listing.id,
                name=store_listing.ActiveVersion.name,
                slug=store_listing.slug,
                creator=creator_data,
            )

        return LibraryAgent(
            id=agent.id,
            graph_id=agent.agentGraphId,
            graph_version=agent.agentGraphVersion,
            owner_user_id=agent.userId,
            image_url=agent.imageUrl,
            creator_name=creator_name,
            creator_image_url=creator_image_url,
            status=status,
            created_at=created_at,
            updated_at=updated_at,
            name=graph.name,
            description=graph.description,
            instructions=graph.instructions,
            input_schema=graph.input_schema,
            output_schema=graph.output_schema,
            credentials_input_schema=(
                graph.credentials_input_schema if sub_graphs is not None else None
            ),
            has_external_trigger=graph.has_external_trigger,
            has_human_in_the_loop=graph.has_human_in_the_loop,
            has_sensitive_action=graph.has_sensitive_action,
            trigger_setup_info=graph.trigger_setup_info,
            new_output=new_output,
            execution_count=execution_count,
            success_rate=success_rate,
            avg_correctness_score=avg_correctness_score,
            recent_executions=recent_executions,
            can_access_graph=can_access_graph,
            is_latest_version=is_latest_version,
            is_favorite=agent.isFavorite,
            recommended_schedule_cron=agent.AgentGraph.recommendedScheduleCron,
            settings=_parse_settings(agent.settings),
            marketplace_listing=marketplace_listing_data,
        )


class AgentStatusResult(pydantic.BaseModel):
    status: LibraryAgentStatus
    new_output: bool


def _calculate_agent_status(
    executions: list[prisma.models.AgentGraphExecution],
    recent_threshold: datetime.datetime,
) -> AgentStatusResult:
    """
    Helper function to determine the overall agent status and whether there
    is new output (i.e., completed runs within the recent threshold).

    :param executions: A list of AgentGraphExecution objects.
    :param recent_threshold: A datetime; any execution after this indicates new output.
    :return: An AgentStatusResult with the aggregate status and the new-output flag.
    """
    if not executions:
        return AgentStatusResult(status=LibraryAgentStatus.COMPLETED, new_output=False)

    status_counts = {status: 0 for status in prisma.enums.AgentExecutionStatus}
    new_output = False
    for execution in executions:
        if execution.createdAt >= recent_threshold:
            if execution.executionStatus == prisma.enums.AgentExecutionStatus.COMPLETED:
                new_output = True
        # Status counts cover all executions, not just the recent ones.
        status_counts[execution.executionStatus] += 1

    if status_counts[prisma.enums.AgentExecutionStatus.FAILED] > 0:
        return AgentStatusResult(status=LibraryAgentStatus.ERROR, new_output=new_output)
    elif status_counts[prisma.enums.AgentExecutionStatus.QUEUED] > 0:
        return AgentStatusResult(
            status=LibraryAgentStatus.WAITING, new_output=new_output
        )
    elif status_counts[prisma.enums.AgentExecutionStatus.RUNNING] > 0:
        return AgentStatusResult(
            status=LibraryAgentStatus.HEALTHY, new_output=new_output
        )
    else:
        return AgentStatusResult(
            status=LibraryAgentStatus.COMPLETED, new_output=new_output
        )


class LibraryAgentResponse(pydantic.BaseModel):
    """Response schema for a list of library agents and pagination info."""

    agents: list[LibraryAgent]
    pagination: Pagination


class LibraryAgentPresetCreatable(pydantic.BaseModel):
    """
    Request model used when creating a new preset for a library agent.
    """

    graph_id: str
    graph_version: int
    inputs: GraphInput
    credentials: dict[str, CredentialsMetaInput]
    name: str
    description: str
    is_active: bool = True
    webhook_id: Optional[str] = None


class LibraryAgentPresetCreatableFromGraphExecution(pydantic.BaseModel):
    """
    Request model used when creating a new preset for a library agent
    from an existing graph execution.
    """

    graph_execution_id: str
    name: str
    description: str
    is_active: bool = True


class LibraryAgentPresetUpdatable(pydantic.BaseModel):
    """
    Request model used when updating a preset for a library agent.
    """

    inputs: Optional[GraphInput] = None
    credentials: Optional[dict[str, CredentialsMetaInput]] = None
    name: Optional[str] = None
    description: Optional[str] = None
    is_active: Optional[bool] = None


class TriggeredPresetSetupRequest(pydantic.BaseModel):
    name: str
    description: str = ""
    graph_id: str
    graph_version: int
    trigger_config: dict[str, Any]
    agent_credentials: dict[str, CredentialsMetaInput] = pydantic.Field(
        default_factory=dict
    )


class LibraryAgentPreset(LibraryAgentPresetCreatable):
    """Represents a preset configuration for a library agent."""

    id: str
    user_id: str
    created_at: datetime.datetime
    updated_at: datetime.datetime
    webhook: "Webhook | None"

    @classmethod
    def from_db(cls, preset: prisma.models.AgentPreset) -> "LibraryAgentPreset":
        # Imported locally (rather than at module level) to avoid a circular import
        # with backend.data.integrations.
        from backend.data.integrations import Webhook

        if preset.InputPresets is None:
            raise ValueError("InputPresets must be included in AgentPreset query")
        if preset.webhookId and preset.Webhook is None:
            raise ValueError(
                "Webhook must be included in AgentPreset query when webhookId is set"
            )

        input_data: GraphInput = {}
        input_credentials: dict[str, CredentialsMetaInput] = {}
        for preset_input in preset.InputPresets:
            if not is_credentials_field_name(preset_input.name):
                input_data[preset_input.name] = preset_input.data
            else:
                input_credentials[preset_input.name] = (
                    CredentialsMetaInput.model_validate(preset_input.data)
                )

        return cls(
            id=preset.id,
            user_id=preset.userId,
            created_at=preset.createdAt,
            updated_at=preset.updatedAt,
            graph_id=preset.agentGraphId,
            graph_version=preset.agentGraphVersion,
            name=preset.name,
            description=preset.description,
            is_active=preset.isActive,
            inputs=input_data,
            credentials=input_credentials,
            webhook_id=preset.webhookId,
            webhook=Webhook.from_db(preset.Webhook) if preset.Webhook else None,
        )


class LibraryAgentPresetResponse(pydantic.BaseModel):
    """Response schema for a list of agent presets and pagination info."""

    presets: list[LibraryAgentPreset]
    pagination: Pagination


class LibraryAgentFilter(str, Enum):
    """Possible filters for searching library agents."""

    IS_FAVOURITE = "isFavourite"
    IS_CREATED_BY_USER = "isCreatedByUser"


class LibraryAgentSort(str, Enum):
    """Possible sort options for sorting library agents."""

    CREATED_AT = "createdAt"
    UPDATED_AT = "updatedAt"


class LibraryAgentUpdateRequest(pydantic.BaseModel):
    """
    Schema for updating a library agent via PUT.
    Includes flags for auto-updating version, marking as favorite,
    archiving, or deleting.
    """

    auto_update_version: Optional[bool] = pydantic.Field(
        default=None, description="Auto-update the agent version"
    )
    graph_version: Optional[int] = pydantic.Field(
        default=None, description="Specific graph version to update to"
    )
    is_favorite: Optional[bool] = pydantic.Field(
        default=None, description="Mark the agent as a favorite"
    )
    is_archived: Optional[bool] = pydantic.Field(
        default=None, description="Archive the agent"
    )
    settings: Optional[GraphSettings] = pydantic.Field(
        default=None, description="User-specific settings for this library agent"
    )