Remove forge and autogpt

This commit is contained in:
SwiftyOS
2024-09-10 14:10:59 +02:00
parent ebd2ecd84c
commit fa16c207e0
15 changed files with 766 additions and 200 deletions

View File

@@ -1,6 +1,6 @@
import pytest
from .depends import verify_user, requires_admin_user, requires_user
from .depends import requires_admin_user, requires_user, verify_user
def test_verify_user_no_payload():

View File

@@ -0,0 +1,9 @@
from .config import configure_logging
from .filters import BelowLevelFilter
from .formatters import FancyConsoleFormatter
__all__ = [
"configure_logging",
"BelowLevelFilter",
"FancyConsoleFormatter",
]

View File

@@ -0,0 +1,192 @@
"""Logging module for Auto-GPT."""
from __future__ import annotations
import enum
import logging
import os
import sys
from pathlib import Path
from typing import Optional
from .crazyconf import SystemConfiguration, UserConfigurable
from .filters import BelowLevelFilter
from .formatters import AGPTFormatter, StructuredLoggingFormatter
LOG_DIR = Path(__file__).parent.parent.parent / "logs"
LOG_FILE = "activity.log"
DEBUG_LOG_FILE = "debug.log"
ERROR_LOG_FILE = "error.log"
SIMPLE_LOG_FORMAT = "%(asctime)s %(levelname)s %(title)s%(message)s"
DEBUG_LOG_FORMAT = (
"%(asctime)s %(levelname)s %(filename)s:%(lineno)d" " %(title)s%(message)s"
)
SPEECH_OUTPUT_LOGGER = "VOICE"
USER_FRIENDLY_OUTPUT_LOGGER = "USER_FRIENDLY_OUTPUT"
class LogFormatName(str, enum.Enum):
SIMPLE = "simple"
DEBUG = "debug"
STRUCTURED = "structured_google_cloud"
TEXT_LOG_FORMAT_MAP = {
LogFormatName.DEBUG: DEBUG_LOG_FORMAT,
LogFormatName.SIMPLE: SIMPLE_LOG_FORMAT,
}
class LoggingConfig(SystemConfiguration):
level: int = UserConfigurable(
default=logging.INFO,
from_env=lambda: logging.getLevelName(os.getenv("LOG_LEVEL", "INFO")),
)
# Console output
log_format: LogFormatName = UserConfigurable(
default=LogFormatName.SIMPLE, from_env="LOG_FORMAT"
)
plain_console_output: bool = UserConfigurable(
default=False,
from_env=lambda: os.getenv("PLAIN_OUTPUT", "False") == "True",
)
# File output
log_dir: Path = LOG_DIR
log_file_format: Optional[LogFormatName] = UserConfigurable(
default=LogFormatName.SIMPLE,
from_env=lambda: os.getenv( # type: ignore
"LOG_FILE_FORMAT", os.getenv("LOG_FORMAT", "simple")
),
)
def configure_logging(
debug: bool = False,
level: Optional[int | str] = None,
log_dir: Optional[Path] = None,
log_format: Optional[LogFormatName | str] = None,
log_file_format: Optional[LogFormatName | str] = None,
plain_console_output: Optional[bool] = None,
config: Optional[LoggingConfig] = None,
) -> None:
"""Configure the native logging module, based on the environment config and any
specified overrides.
Arguments override values specified in the environment.
Overrides are also applied to `config`, if passed.
Should be usable as `configure_logging(**config.logging.dict())`, where
`config.logging` is a `LoggingConfig` object.
"""
if debug and level:
raise ValueError("Only one of either 'debug' and 'level' arguments may be set")
# Parse arguments
if isinstance(level, str):
if type(_level := logging.getLevelName(level.upper())) is int:
level = _level
else:
raise ValueError(f"Unknown log level '{level}'")
if isinstance(log_format, str):
if log_format in LogFormatName._value2member_map_:
log_format = LogFormatName(log_format)
elif not isinstance(log_format, LogFormatName):
raise ValueError(f"Unknown log format '{log_format}'")
if isinstance(log_file_format, str):
if log_file_format in LogFormatName._value2member_map_:
log_file_format = LogFormatName(log_file_format)
elif not isinstance(log_file_format, LogFormatName):
raise ValueError(f"Unknown log format '{log_format}'")
config = config or LoggingConfig.from_env()
# Aggregate env config + arguments
config.level = logging.DEBUG if debug else level or config.level
config.log_dir = log_dir or config.log_dir
config.log_format = log_format or (
LogFormatName.DEBUG if debug else config.log_format
)
config.log_file_format = log_file_format or log_format or config.log_file_format
config.plain_console_output = (
plain_console_output
if plain_console_output is not None
else config.plain_console_output
)
# Structured logging is used for cloud environments,
# where logging to a file makes no sense.
if config.log_format == LogFormatName.STRUCTURED:
config.plain_console_output = True
config.log_file_format = None
# create log directory if it doesn't exist
if not config.log_dir.exists():
config.log_dir.mkdir()
log_handlers: list[logging.Handler] = []
if config.log_format in (LogFormatName.DEBUG, LogFormatName.SIMPLE):
console_format_template = TEXT_LOG_FORMAT_MAP[config.log_format]
console_formatter = AGPTFormatter(console_format_template)
else:
console_formatter = StructuredLoggingFormatter()
console_format_template = SIMPLE_LOG_FORMAT
# Console output handlers
stdout = logging.StreamHandler(stream=sys.stdout)
stdout.setLevel(config.level)
stdout.addFilter(BelowLevelFilter(logging.WARNING))
stdout.setFormatter(console_formatter)
stderr = logging.StreamHandler()
stderr.setLevel(logging.WARNING)
stderr.setFormatter(console_formatter)
log_handlers += [stdout, stderr]
# File output handlers
if config.log_file_format is not None:
if config.level < logging.ERROR:
file_output_format_template = TEXT_LOG_FORMAT_MAP[config.log_file_format]
file_output_formatter = AGPTFormatter(
file_output_format_template, no_color=True
)
# INFO log file handler
activity_log_handler = logging.FileHandler(
config.log_dir / LOG_FILE, "a", "utf-8"
)
activity_log_handler.setLevel(config.level)
activity_log_handler.setFormatter(file_output_formatter)
log_handlers += [activity_log_handler]
# ERROR log file handler
error_log_handler = logging.FileHandler(
config.log_dir / ERROR_LOG_FILE, "a", "utf-8"
)
error_log_handler.setLevel(logging.ERROR)
error_log_handler.setFormatter(AGPTFormatter(DEBUG_LOG_FORMAT, no_color=True))
log_handlers += [error_log_handler]
# Configure the root logger
logging.basicConfig(
format=console_format_template,
level=config.level,
handlers=log_handlers,
)
# Speech output
speech_output_logger = logging.getLogger(SPEECH_OUTPUT_LOGGER)
speech_output_logger.setLevel(logging.INFO)
speech_output_logger.propagate = False
# JSON logger with better formatting
json_logger = logging.getLogger("JSON_LOGGER")
json_logger.setLevel(logging.DEBUG)
json_logger.propagate = False
# Disable debug logging from OpenAI library
openai_logger = logging.getLogger("openai")
openai_logger.setLevel(logging.WARNING)

View File

@@ -0,0 +1,370 @@
import os
import typing
from typing import Any, Callable, Generic, Optional, Type, TypeVar, get_args
from pydantic import BaseModel, ConfigDict, Field, ValidationError
from pydantic._internal._model_construction import (
ModelMetaclass, # HACK shouldn't be used
)
from pydantic.fields import FieldInfo
from pydantic_core import PydanticUndefined, PydanticUndefinedType
T = TypeVar("T")
M = TypeVar("M", bound=BaseModel)
def UserConfigurable(
default: T | PydanticUndefinedType = PydanticUndefined,
*args,
default_factory: Optional[Callable[[], T]] = None,
from_env: Optional[str | Callable[[], T | None]] = None,
description: str = "",
exclude: bool = False,
**kwargs,
) -> T:
# TODO: use this to auto-generate docs for the application configuration
field_info: FieldInfo = Field(
default,
*args,
default_factory=default_factory,
description=description,
exclude=exclude,
**kwargs,
)
field_info.metadata.append(("user_configurable", True))
field_info.metadata.append(("from_env", from_env))
return field_info # type: ignore
def _get_field_metadata(field: FieldInfo, key: str, default: Any = None) -> Any:
for item in field.metadata:
if isinstance(item, tuple) and item[0] == key:
return item[1]
if isinstance(item, str) and item == key:
return True
return default
class SystemConfiguration(BaseModel):
def get_user_config(self) -> dict[str, Any]:
return _recurse_user_config_values(self)
@classmethod
def from_env(cls):
"""
Initializes the config object from environment variables.
Environment variables are mapped to UserConfigurable fields using the from_env
attribute that can be passed to UserConfigurable.
"""
def infer_field_value(field: FieldInfo):
default_value = (
field.default
if field.default not in (None, PydanticUndefined)
else (
field.default_factory()
if field.default_factory
else PydanticUndefined
)
)
if from_env := _get_field_metadata(field, "from_env"):
val_from_env = (
os.getenv(from_env) if type(from_env) is str else from_env()
)
if val_from_env is not None:
return val_from_env
return default_value
return _recursive_init_model(cls, infer_field_value)
model_config = ConfigDict(
extra="forbid", use_enum_values=True, validate_assignment=True
)
SC = TypeVar("SC", bound=SystemConfiguration)
class SystemSettings(BaseModel):
"""A base class for all system settings."""
name: str
description: str
model_config = ConfigDict(
extra="forbid", use_enum_values=True, validate_assignment=True
)
S = TypeVar("S", bound=SystemSettings)
class Configurable(Generic[S]):
"""A base class for all configurable objects."""
prefix: str = ""
default_settings: typing.ClassVar[S] # type: ignore
@classmethod
def get_user_config(cls) -> dict[str, Any]:
return _recurse_user_config_values(cls.default_settings)
@classmethod
def build_agent_configuration(cls, overrides: dict = {}) -> S:
"""Process the configuration for this object."""
base_config = _update_user_config_from_env(cls.default_settings)
final_configuration = deep_update(base_config, overrides)
return cls.default_settings.__class__.model_validate(final_configuration)
def _update_user_config_from_env(instance: BaseModel) -> dict[str, Any]:
"""
Update config fields of a Pydantic model instance from environment variables.
Precedence:
1. Non-default value already on the instance
2. Value returned by `from_env()`
3. Default value for the field
Params:
instance: The Pydantic model instance.
Returns:
The user config fields of the instance.
"""
def infer_field_value(field: FieldInfo, value):
default_value = (
field.default
if field.default not in (None, PydanticUndefined)
else (field.default_factory() if field.default_factory else None)
)
if value == default_value and (
from_env := _get_field_metadata(field, "from_env")
):
val_from_env = os.getenv(from_env) if type(from_env) is str else from_env()
if val_from_env is not None:
return val_from_env
return value
def init_sub_config(model: Type[SC]) -> SC | None:
try:
return model.model_validate(model.from_env(), strict=True)
except ValidationError as e:
# Gracefully handle missing fields
if all(e["type"] == "missing" for e in e.errors()):
return None
raise
return _recurse_user_config_fields(instance, infer_field_value, init_sub_config)
def _recursive_init_model(
model: Type[M],
infer_field_value: Callable[[FieldInfo], Any],
) -> M:
"""
Recursively initialize the user configuration fields of a Pydantic model.
Parameters:
model: The Pydantic model type.
infer_field_value: A callback function to infer the value of each field.
Parameters:
ModelField: The Pydantic ModelField object describing the field.
Returns:
BaseModel: An instance of the model with the initialized configuration.
"""
user_config_fields = {}
for name, field in model.model_fields.items():
if _get_field_metadata(field, "user_configurable"):
user_config_fields[name] = infer_field_value(field)
elif isinstance(field.annotation, ModelMetaclass) and issubclass(
field.annotation, SystemConfiguration
):
try:
user_config_fields[name] = _recursive_init_model(
model=field.annotation,
infer_field_value=infer_field_value,
)
except ValidationError as e:
# Gracefully handle missing fields
if all(e["type"] == "missing" for e in e.errors()):
user_config_fields[name] = None
raise
user_config_fields = remove_none_items(user_config_fields)
return model.model_validate(user_config_fields)
def _recurse_user_config_fields(
model: BaseModel,
infer_field_value: Callable[[FieldInfo, Any], Any],
init_sub_config: Optional[
Callable[[Type[SystemConfiguration]], SystemConfiguration | None]
] = None,
) -> dict[str, Any]:
"""
Recursively process the user configuration fields of a Pydantic model instance.
Params:
model: The Pydantic model to iterate over.
infer_field_value: A callback function to process each field.
Params:
ModelField: The Pydantic ModelField object describing the field.
Any: The current value of the field.
init_sub_config: An optional callback function to initialize a sub-config.
Params:
Type[SystemConfiguration]: The type of the sub-config to initialize.
Returns:
dict[str, Any]: The processed user configuration fields of the instance.
"""
user_config_fields = {}
for name, field in model.model_fields.items():
value = getattr(model, name)
# Handle individual field
if _get_field_metadata(field, "user_configurable"):
user_config_fields[name] = infer_field_value(field, value)
# Recurse into nested config object
elif isinstance(value, SystemConfiguration):
user_config_fields[name] = _recurse_user_config_fields(
model=value,
infer_field_value=infer_field_value,
init_sub_config=init_sub_config,
)
# Recurse into optional nested config object
elif value is None and init_sub_config:
field_type = get_args(field.annotation)[0] # Optional[T] -> T
if type(field_type) is ModelMetaclass and issubclass(
field_type, SystemConfiguration
):
sub_config = init_sub_config(field_type)
if sub_config:
user_config_fields[name] = _recurse_user_config_fields(
model=sub_config,
infer_field_value=infer_field_value,
init_sub_config=init_sub_config,
)
elif isinstance(value, list) and all(
isinstance(i, SystemConfiguration) for i in value
):
user_config_fields[name] = [
_recurse_user_config_fields(i, infer_field_value, init_sub_config)
for i in value
]
elif isinstance(value, dict) and all(
isinstance(i, SystemConfiguration) for i in value.values()
):
user_config_fields[name] = {
k: _recurse_user_config_fields(v, infer_field_value, init_sub_config)
for k, v in value.items()
}
return user_config_fields
def _recurse_user_config_values(
instance: BaseModel,
get_field_value: Callable[[FieldInfo, T], T] = lambda _, v: v,
) -> dict[str, Any]:
"""
This function recursively traverses the user configuration values in a Pydantic
model instance.
Params:
instance: A Pydantic model instance.
get_field_value: A callback function to process each field. Parameters:
ModelField: The Pydantic ModelField object that describes the field.
Any: The current value of the field.
Returns:
A dictionary containing the processed user configuration fields of the instance.
"""
user_config_values = {}
for name, value in instance.__dict__.items():
field = instance.model_fields[name]
if _get_field_metadata(field, "user_configurable"):
user_config_values[name] = get_field_value(field, value)
elif isinstance(value, SystemConfiguration):
user_config_values[name] = _recurse_user_config_values(
instance=value, get_field_value=get_field_value
)
elif isinstance(value, list) and all(
isinstance(i, SystemConfiguration) for i in value
):
user_config_values[name] = [
_recurse_user_config_values(i, get_field_value) for i in value
]
elif isinstance(value, dict) and all(
isinstance(i, SystemConfiguration) for i in value.values()
):
user_config_values[name] = {
k: _recurse_user_config_values(v, get_field_value)
for k, v in value.items()
}
return user_config_values
def _get_non_default_user_config_values(instance: BaseModel) -> dict[str, Any]:
"""
Get the non-default user config fields of a Pydantic model instance.
Params:
instance: The Pydantic model instance.
Returns:
dict[str, Any]: The non-default user config values on the instance.
"""
def get_field_value(field: FieldInfo, value):
default = field.default_factory() if field.default_factory else field.default
if value != default:
return value
return remove_none_items(_recurse_user_config_values(instance, get_field_value))
def deep_update(original_dict: dict, update_dict: dict) -> dict:
"""
Recursively update a dictionary.
Params:
original_dict (dict): The dictionary to be updated.
update_dict (dict): The dictionary to update with.
Returns:
dict: The updated dictionary.
"""
for key, value in update_dict.items():
if (
key in original_dict
and isinstance(original_dict[key], dict)
and isinstance(value, dict)
):
original_dict[key] = deep_update(original_dict[key], value)
else:
original_dict[key] = value
return original_dict
def remove_none_items(d):
if isinstance(d, dict):
return {
k: remove_none_items(v)
for k, v in d.items()
if v not in (None, PydanticUndefined)
}
return d

View File

@@ -0,0 +1,12 @@
import logging
class BelowLevelFilter(logging.Filter):
"""Filter for logging levels below a certain threshold."""
def __init__(self, below_level: int):
super().__init__()
self.below_level = below_level
def filter(self, record: logging.LogRecord):
return record.levelno < self.below_level

View File

@@ -0,0 +1,95 @@
import logging
from colorama import Fore, Style
from google.cloud.logging_v2.handlers import CloudLoggingFilter, StructuredLogHandler
from .utils import remove_color_codes
class FancyConsoleFormatter(logging.Formatter):
"""
A custom logging formatter designed for console output.
This formatter enhances the standard logging output with color coding. The color
coding is based on the level of the log message, making it easier to distinguish
between different types of messages in the console output.
The color for each level is defined in the LEVEL_COLOR_MAP class attribute.
"""
# level -> (level & text color, title color)
LEVEL_COLOR_MAP = {
logging.DEBUG: Fore.LIGHTBLACK_EX,
logging.INFO: Fore.BLUE,
logging.WARNING: Fore.YELLOW,
logging.ERROR: Fore.RED,
logging.CRITICAL: Fore.RED + Style.BRIGHT,
}
def format(self, record: logging.LogRecord) -> str:
# Make sure `msg` is a string
if not hasattr(record, "msg"):
record.msg = ""
elif type(record.msg) is not str:
record.msg = str(record.msg)
# Determine default color based on error level
level_color = ""
if record.levelno in self.LEVEL_COLOR_MAP:
level_color = self.LEVEL_COLOR_MAP[record.levelno]
record.levelname = f"{level_color}{record.levelname}{Style.RESET_ALL}"
# Determine color for message
color = getattr(record, "color", level_color)
color_is_specified = hasattr(record, "color")
# Don't color INFO messages unless the color is explicitly specified.
if color and (record.levelno != logging.INFO or color_is_specified):
record.msg = f"{color}{record.msg}{Style.RESET_ALL}"
return super().format(record)
class AGPTFormatter(FancyConsoleFormatter):
def __init__(self, *args, no_color: bool = False, **kwargs):
super().__init__(*args, **kwargs)
self.no_color = no_color
def format(self, record: logging.LogRecord) -> str:
# Make sure `msg` is a string
if not hasattr(record, "msg"):
record.msg = ""
elif type(record.msg) is not str:
record.msg = str(record.msg)
# Strip color from the message to prevent color spoofing
if record.msg and not getattr(record, "preserve_color", False):
record.msg = remove_color_codes(record.msg)
# Determine color for title
title = getattr(record, "title", "")
title_color = getattr(record, "title_color", "") or self.LEVEL_COLOR_MAP.get(
record.levelno, ""
)
if title and title_color:
title = f"{title_color + Style.BRIGHT}{title}{Style.RESET_ALL}"
# Make sure record.title is set, and padded with a space if not empty
record.title = f"{title} " if title else ""
if self.no_color:
return remove_color_codes(super().format(record))
else:
return super().format(record)
class StructuredLoggingFormatter(StructuredLogHandler, logging.Formatter):
def __init__(self):
# Set up CloudLoggingFilter to add diagnostic info to the log records
self.cloud_logging_filter = CloudLoggingFilter()
# Init StructuredLogHandler
super().__init__()
def format(self, record: logging.LogRecord) -> str:
self.cloud_logging_filter.filter(record)
return super().format(record)

View File

@@ -0,0 +1,14 @@
from __future__ import annotations
import json
import logging
class JsonFileHandler(logging.FileHandler):
def format(self, record: logging.LogRecord) -> str:
record.json_data = json.loads(record.getMessage())
return json.dumps(getattr(record, "json_data"), ensure_ascii=False, indent=4)
def emit(self, record: logging.LogRecord) -> None:
with open(self.baseFilename, "w", encoding="utf-8") as f:
f.write(self.format(record))

View File

@@ -0,0 +1,36 @@
import pytest
from .utils import remove_color_codes
@pytest.mark.parametrize(
"raw_text, clean_text",
[
(
"COMMAND = \x1b[36mbrowse_website\x1b[0m "
"ARGUMENTS = \x1b[36m{'url': 'https://www.google.com',"
" 'question': 'What is the capital of France?'}\x1b[0m",
"COMMAND = browse_website "
"ARGUMENTS = {'url': 'https://www.google.com',"
" 'question': 'What is the capital of France?'}",
),
(
"{'Schaue dir meine Projekte auf github () an, als auch meine Webseiten': "
"'https://github.com/Significant-Gravitas/AutoGPT,"
" https://discord.gg/autogpt und https://twitter.com/Auto_GPT'}",
"{'Schaue dir meine Projekte auf github () an, als auch meine Webseiten': "
"'https://github.com/Significant-Gravitas/AutoGPT,"
" https://discord.gg/autogpt und https://twitter.com/Auto_GPT'}",
),
("", ""),
("hello", "hello"),
("hello\x1B[31m world", "hello world"),
("\x1B[36mHello,\x1B[32m World!", "Hello, World!"),
(
"\x1B[1m\x1B[31mError:\x1B[0m\x1B[31m file not found",
"Error: file not found",
),
],
)
def test_remove_color_codes(raw_text, clean_text):
assert remove_color_codes(raw_text) == clean_text

View File

@@ -0,0 +1,33 @@
import logging
import re
from typing import Any
from colorama import Fore
def remove_color_codes(s: str) -> str:
return re.sub(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])", "", s)
def fmt_kwargs(kwargs: dict) -> str:
return ", ".join(f"{n}={repr(v)}" for n, v in kwargs.items())
def print_attribute(
title: str, value: Any, title_color: str = Fore.GREEN, value_color: str = ""
) -> None:
logger = logging.getLogger()
logger.info(
str(value),
extra={
"title": f"{title.rstrip(':')}:",
"title_color": title_color,
"color": value_color,
},
)
def speak(message: str, level: int = logging.INFO) -> None:
from .config import SPEECH_OUTPUT_LOGGER
logging.getLogger(SPEECH_OUTPUT_LOGGER).log(level, message)

View File

@@ -12,6 +12,8 @@ pydantic = "^2.8.2"
pyjwt = "^2.8.0"
python-dotenv = "^1.0.1"
supabase = "^2.7.2"
colorama = "^0.4.6"
google-cloud-logging = "^3.8.0"
[build-system]

View File

@@ -23,8 +23,6 @@ ENV POETRY_VERSION=1.8.3 \
PATH="$POETRY_HOME/bin:$PATH"
RUN pip3 install poetry
COPY autogpt /app/autogpt
COPY forge /app/forge
COPY rnd/autogpt_libs /app/rnd/autogpt_libs
WORKDIR /app/rnd/autogpt_server

View File

@@ -25,8 +25,6 @@ ENV POETRY_VERSION=1.8.3 \
PATH="$POETRY_HOME/bin:$PATH"
RUN pip3 install poetry
COPY autogpt /app/autogpt
COPY forge /app/forge
COPY rnd/autogpt_libs /app/rnd/autogpt_libs
WORKDIR /app/rnd/autogpt_server
@@ -38,7 +36,6 @@ COPY rnd/autogpt_server/schema.prisma ./
RUN poetry run prisma generate
COPY rnd/autogpt_server /app/rnd/autogpt_server
FROM server_base as server
FROM server_base as server

View File

@@ -1,190 +0,0 @@
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Iterator
from autogpt.agents.agent import Agent, AgentSettings
from autogpt.app.config import ConfigBuilder
from forge.agent.components import AgentComponent
from forge.agent.protocols import CommandProvider
from forge.command import command
from forge.command.command import Command
from forge.file_storage import FileStorageBackendName, get_storage
from forge.file_storage.base import FileStorage
from forge.llm.providers import MultiProvider
from forge.llm.providers.openai import OpenAICredentials, OpenAIProvider
from forge.llm.providers.schema import ModelProviderName
from forge.models.json_schema import JSONSchema
from pydantic import Field, SecretStr
from autogpt_server.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from autogpt_server.data.model import BlockSecret, SchemaField, SecretField
if TYPE_CHECKING:
from autogpt.app.config import AppConfig
logger = logging.getLogger(__name__)
class BlockAgentSettings(AgentSettings):
enabled_components: list[str] = Field(default_factory=list)
class OutputComponent(CommandProvider):
def get_commands(self) -> Iterator[Command]:
yield self.output
@command(
parameters={
"output": JSONSchema(
type=JSONSchema.Type.STRING,
description="Output data to be returned.",
required=True,
),
},
)
def output(self, output: str) -> str:
"""Use this to output the result."""
return output
class BlockAgent(Agent):
def __init__(
self,
settings: BlockAgentSettings,
llm_provider: MultiProvider,
file_storage: FileStorage,
app_config: AppConfig,
):
super().__init__(settings, llm_provider, file_storage, app_config)
self.output = OutputComponent()
# Disable components
for attr_name in list(self.__dict__.keys()):
attr_value = getattr(self, attr_name)
if not isinstance(attr_value, AgentComponent):
continue
component_name = type(attr_value).__name__
if (
component_name != "SystemComponent"
and component_name not in settings.enabled_components
):
delattr(self, attr_name)
class AutoGPTAgentBlock(Block):
class Input(BlockSchema):
task: str = SchemaField(
description="Task description for the agent.",
placeholder="Calculate and use Output command",
)
input: str = SchemaField(
description="Input data for the task",
placeholder="8 + 5",
)
openai_api_key: BlockSecret = SecretField(
key="openai_api_key", description="OpenAI API key"
)
enabled_components: list[str] = Field(
default_factory=lambda: [OutputComponent.__name__],
description="List of [AgentComponents](https://docs.agpt.co/forge/components/built-in-components/) enabled for the agent.",
)
disabled_commands: list[str] = Field(
default_factory=list,
description="List of commands from enabled components to disable.",
)
fast_mode: bool = Field(
False,
description="If true uses fast llm, otherwise uses smart and slow llm.",
)
class Output(BlockSchema):
result: str
def __init__(self):
super().__init__(
id="d2e2ecd2-9ae6-422d-8dfe-ceca500ce6a6",
description="AutoGPT agent, it utilizes a Large Language Model and enabled components/tools to perform a task.",
categories={BlockCategory.AI},
input_schema=AutoGPTAgentBlock.Input,
output_schema=AutoGPTAgentBlock.Output,
test_input={
"task": "Make calculations and use output command to output the result",
"input": "5 + 3",
"openai_api_key": "openai_api_key",
"enabled_components": [OutputComponent.__name__],
"disabled_commands": ["finish"],
"fast_mode": True,
},
test_output=[
("result", "8"),
],
test_mock={
"get_provider": lambda _: MultiProvider(),
"get_result": lambda _: "8",
},
)
@staticmethod
def get_provider(openai_api_key: str) -> MultiProvider:
# LLM provider
settings = OpenAIProvider.default_settings.model_copy()
settings.credentials = OpenAICredentials(api_key=SecretStr(openai_api_key))
openai_provider = OpenAIProvider(settings=settings)
multi_provider = MultiProvider()
# HACK: Add OpenAI provider to the multi provider with api key
multi_provider._provider_instances[ModelProviderName.OPENAI] = openai_provider
return multi_provider
@staticmethod
def get_result(agent: BlockAgent) -> str:
error: Exception | None = None
for tries in range(3):
try:
proposal = asyncio.run(agent.propose_action())
result = asyncio.run(agent.execute(proposal))
return str(result)
except Exception as e:
error = e
raise error or Exception("Failed to get result")
def run(self, input_data: Input) -> BlockOutput:
# Set up configuration
config = ConfigBuilder.build_config_from_env()
# Disable commands
config.disabled_commands.extend(input_data.disabled_commands)
# Storage
local = config.file_storage_backend == FileStorageBackendName.LOCAL
restrict_to_root = not local or config.restrict_to_workspace
file_storage = get_storage(
config.file_storage_backend,
root_path=Path("data"),
restrict_to_root=restrict_to_root,
)
file_storage.initialize()
# State
state = BlockAgentSettings(
agent_id="TemporaryAgentID",
name="WrappedAgent",
description="Wrapped agent for the Agent Server.",
task=f"Your task: {input_data.task}\n" f"Input data: {input_data.input}",
enabled_components=input_data.enabled_components,
)
# Switch big brain mode
state.config.big_brain = not input_data.fast_mode
provider = self.get_provider(input_data.openai_api_key.get_secret_value())
agent = BlockAgent(state, provider, file_storage, config)
result = self.get_result(agent)
yield "result", result

View File

@@ -1,12 +1,12 @@
import os
from forge.logging.config import LogFormatName
from autogpt_libs.logging.config import LogFormatName
def configure_logging():
import logging
from forge.logging import configure_logging
from autogpt_libs.logging import configure_logging
if os.getenv("APP_ENV") != "cloud":
configure_logging()

View File

@@ -10,11 +10,9 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.10"
agpt = { path = "../../autogpt", develop = true }
aio-pika = "^9.4.3"
anthropic = "^0.25.1"
apscheduler = "^3.10.4"
autogpt-forge = { path = "../../forge", develop = true }
autogpt-libs = { path = "../autogpt_libs" }
click = "^8.1.7"
croniter = "^2.0.5"