mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
I'm getting circular import issues because there is a lot of cross-importing between `backend.data`, `backend.blocks`, and other modules. This change reduces block-related cross-imports and thus the risk of breakage caused by circular imports.

### Changes 🏗️

- Strip down `backend.data.block`
- Move `Block` base class and related class/enum defs to `backend.blocks._base`
- Move `is_block_auth_configured` to `backend.blocks._utils`
- Move `get_blocks()`, `get_io_block_ids()` etc. to `backend.blocks` (`__init__.py`)
- Update imports everywhere
- Remove unused and poorly typed `Block.create()`
- Change usages from `block_cls.create()` to `block_cls()`
- Improve typing of `load_all_blocks` and `get_blocks`
- Move cross-import of `backend.api.features.library.model` from `backend/data/__init__.py` to `backend/data/integrations.py`
- Remove deprecated attribute `NodeModel.webhook`
- Re-generate OpenAPI spec and fix frontend usage
- Eliminate module-level `backend.blocks` import from `blocks/agent.py`
- Eliminate module-level `backend.data.execution` and `backend.executor.manager` imports from `blocks/helpers/review.py`
- Replace `BlockInput` with `GraphInput` for graph inputs

### Checklist 📋

#### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI static type-checking + tests should be sufficient for this.
160 lines
5.1 KiB
Python
160 lines
5.1 KiB
Python
import importlib
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Sequence, Type, TypeVar
|
|
|
|
from backend.blocks._base import AnyBlockSchema, BlockType
|
|
from backend.util.cache import cached
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
@cached(ttl_seconds=3600)
def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
    """Import every block module in this package and return the usable block classes.

    The function works in three phases:

    1. Collect the ``*.py`` files below this package's directory, skipping
       ``__init__.py``, files whose name starts with ``test_`` and (unless
       ``Config().enable_example_blocks`` is truthy) anything under the
       top-level ``examples`` directory.
    2. Validate each resulting dotted module name and import it relative to
       this package, so that the ``Block`` subclasses it defines exist.
    3. Instantiate and validate every ``Block`` subclass whose name does not
       end in ``Base``, keyed by the instance's ``id``.

    Decorated with ``cached(ttl_seconds=3600)``; presumably repeated calls are
    served from that cache for up to an hour (see ``backend.util.cache``).

    Returns:
        Mapping of block ID to block class, restricted to the classes for
        which ``is_block_auth_configured`` returns a truthy value.

    Raises:
        ValueError: If a module name, class name, block ID or schema field
            fails one of the checks below.
    """
    from backend.blocks._base import Block
    from backend.util.settings import Config

    # Check if example blocks should be loaded from settings
    load_examples = Config().enable_example_blocks

    # Phase 1: gather the dotted module paths of everything under backend.blocks
    package_dir = Path(__file__).parent
    modules = []
    for py_file in package_dir.rglob("*.py"):
        is_loadable = (
            py_file.is_file()
            and py_file.name != "__init__.py"
            and not py_file.name.startswith("test_")
        )
        if not is_loadable:
            continue

        # Skip examples directory if not enabled
        relative_path = py_file.relative_to(package_dir)
        if not load_examples and relative_path.parts[0] == "examples":
            continue

        # Drop the ".py" suffix and turn path separators into dots.
        dotted_name = str(relative_path)[:-3].replace(os.path.sep, ".")
        modules.append(dotted_name)

    # Phase 2: validate each name, then import it relative to this package
    for module in modules:
        if re.match("^[a-z0-9_.]+$", module) is None:
            raise ValueError(
                f"Block module {module} error: module name must be lowercase, "
                "and contain only alphanumeric characters and underscores."
            )

        importlib.import_module(f".{module}", package=__name__)

    # Phase 3: load all Block instances from the available modules
    available_blocks: dict[str, type["AnyBlockSchema"]] = {}
    for block_cls in _all_subclasses(Block):
        class_name = block_cls.__name__

        # Classes named "...Base" are treated as abstract helpers and skipped.
        if class_name.endswith("Base"):
            continue

        if not class_name.endswith("Block"):
            raise ValueError(
                f"Block class {class_name} does not end with 'Block'. "
                "If you are creating an abstract class, "
                "please name the class with 'Base' at the end"
            )

        block = block_cls()  # pyright: ignore[reportAbstractUsage]

        # Only the type and length of the ID are checked here (36 characters).
        id_looks_valid = isinstance(block.id, str) and len(block.id) == 36
        if not id_looks_valid:
            raise ValueError(
                f"Block ID {block.name} error: {block.id} is not a valid UUID"
            )

        if block.id in available_blocks:
            raise ValueError(
                f"Block ID {block.name} error: {block.id} is already in use"
            )

        input_fields = block.input_schema.model_fields
        output_fields = block.output_schema.model_fields

        # Make sure `error` field is a string in the output schema
        if "error" in output_fields:
            if output_fields["error"].annotation is not str:
                raise ValueError(
                    f"{block.name} `error` field in output_schema must be a string"
                )

        # Ensure all fields in input_schema and output_schema are annotated SchemaFields
        all_fields = list(input_fields.items()) + list(output_fields.items())
        for field_name, field in all_fields:
            if field.annotation is None:
                raise ValueError(
                    f"{block.name} has a field {field_name} that is not annotated"
                )
            if field.json_schema_extra is None:
                raise ValueError(
                    f"{block.name} has a field {field_name} not defined as SchemaField"
                )

        # Boolean inputs must carry an explicit default.
        for input_field in input_fields.values():
            if input_field.annotation is bool and input_field.default not in (
                True,
                False,
            ):
                raise ValueError(
                    f"{block.name} has a boolean field with no default value"
                )

        available_blocks[block.id] = block_cls

    # Filter out blocks with incomplete auth configs, e.g. missing OAuth server secrets
    from ._utils import is_block_auth_configured

    return {
        block_id: block_cls
        for block_id, block_cls in available_blocks.items()
        if is_block_auth_configured(block_cls)
    }
|
|
|
|
|
|
def _all_subclasses(cls: type[T]) -> list[type[T]]:
|
|
subclasses = cls.__subclasses__()
|
|
for subclass in subclasses:
|
|
subclasses += _all_subclasses(subclass)
|
|
return subclasses
|
|
|
|
|
|
# ============== Block access helper functions ============== #
|
|
|
|
|
|
def get_blocks() -> dict[str, Type["AnyBlockSchema"]]:
    """Return the block registry produced by ``load_all_blocks``.

    Returns:
        Mapping of block ID to block class.
    """
    block_registry = load_all_blocks()
    return block_registry
|
|
|
|
|
|
# Note on the return type annotation: https://github.com/microsoft/pyright/issues/10281
def get_block(block_id: str) -> "AnyBlockSchema | None":
    """Create an instance of the block registered under ``block_id``.

    Args:
        block_id: Key to look up in the mapping returned by ``get_blocks``.

    Returns:
        A new instance of the matching block class, or ``None`` when the
        lookup yields nothing.
    """
    block_cls = get_blocks().get(block_id)
    if not block_cls:
        return None
    return block_cls()
|
|
|
|
|
|
@cached(ttl_seconds=3600)
def get_webhook_block_ids() -> Sequence[str]:
    """List the IDs of blocks whose type is ``WEBHOOK`` or ``WEBHOOK_MANUAL``.

    Every class returned by ``get_blocks`` is instantiated in order to read
    its ``block_type``.

    Returns:
        The matching block IDs, in the iteration order of ``get_blocks()``.
    """
    webhook_ids: list[str] = []
    for block_id, block_cls in get_blocks().items():
        if block_cls().block_type in (BlockType.WEBHOOK, BlockType.WEBHOOK_MANUAL):
            webhook_ids.append(block_id)
    return webhook_ids
|
|
|
|
|
|
@cached(ttl_seconds=3600)
def get_io_block_ids() -> Sequence[str]:
    """List the IDs of blocks whose type is ``INPUT`` or ``OUTPUT``.

    Every class returned by ``get_blocks`` is instantiated in order to read
    its ``block_type``.

    Returns:
        The matching block IDs, in the iteration order of ``get_blocks()``.
    """
    io_ids: list[str] = []
    for block_id, block_cls in get_blocks().items():
        if block_cls().block_type in (BlockType.INPUT, BlockType.OUTPUT):
            io_ids.append(block_id)
    return io_ids
|
|
|
|
|
|
@cached(ttl_seconds=3600)
def get_human_in_the_loop_block_ids() -> Sequence[str]:
    """List the IDs of blocks whose type is ``HUMAN_IN_THE_LOOP``.

    Every class returned by ``get_blocks`` is instantiated in order to read
    its ``block_type``.

    Returns:
        The matching block IDs, in the iteration order of ``get_blocks()``.
    """
    review_ids: list[str] = []
    for block_id, block_cls in get_blocks().items():
        if block_cls().block_type == BlockType.HUMAN_IN_THE_LOOP:
            review_ids.append(block_id)
    return review_ids
|