From dcd6aa912b992d7c024ea1b668d3249b201b0c2c Mon Sep 17 00:00:00 2001 From: James Collins Date: Sun, 23 Apr 2023 12:36:04 -0700 Subject: [PATCH] Add workspace abstraction (#2982) * Add workspace abstraction * Remove old workspace implementation * Extract path resolution to a helper function * Add api key requirements to new tests --- autogpt/agent/agent.py | 17 +++ autogpt/cli.py | 36 +++++- autogpt/commands/audio_text.py | 6 +- autogpt/commands/command.py | 3 + autogpt/commands/execute_code.py | 26 ++-- autogpt/commands/file_operations.py | 40 ++---- autogpt/commands/git_operations.py | 6 +- autogpt/commands/image_gen.py | 9 +- autogpt/config/config.py | 3 + autogpt/workspace.py | 48 ------- autogpt/workspace/__init__.py | 5 + autogpt/workspace/workspace.py | 120 ++++++++++++++++++ tests/conftest.py | 16 +++ .../goal_oriented/test_write_file.py | 30 ++--- tests/test_image_gen.py | 30 +++-- tests/test_workspace.py | 86 +++++++++++++ tests/unit/test_file_operations.py | 90 +++++-------- tests/unit/test_setup.py | 2 + tests/utils.py | 2 + 19 files changed, 379 insertions(+), 196 deletions(-) delete mode 100644 autogpt/workspace.py create mode 100644 autogpt/workspace/__init__.py create mode 100644 autogpt/workspace/workspace.py create mode 100644 tests/test_workspace.py diff --git a/autogpt/agent/agent.py b/autogpt/agent/agent.py index 189338f51b..8c2bfb743f 100644 --- a/autogpt/agent/agent.py +++ b/autogpt/agent/agent.py @@ -9,6 +9,7 @@ from autogpt.logs import logger, print_assistant_thoughts from autogpt.speech import say_text from autogpt.spinner import Spinner from autogpt.utils import clean_input +from autogpt.workspace import Workspace class Agent: @@ -50,7 +51,9 @@ class Agent: config, system_prompt, triggering_prompt, + workspace_directory, ): + cfg = Config() self.ai_name = ai_name self.memory = memory self.full_message_history = full_message_history @@ -59,6 +62,7 @@ class Agent: self.config = config self.system_prompt = system_prompt self.triggering_prompt = triggering_prompt + self.workspace = Workspace(workspace_directory, cfg.restrict_to_workspace) def start_interaction_loop(self): # Interaction Loop @@ -107,6 +111,8 @@ class Agent: command_name, arguments = get_command(assistant_reply_json) if cfg.speak_mode: say_text(f"I want to execute {command_name}") + arguments = self._resolve_pathlike_command_args(arguments) + except Exception as e: logger.error("Error: \n", str(e)) @@ -226,3 +232,14 @@ class Agent: logger.typewriter_log( "SYSTEM: ", Fore.YELLOW, "Unable to execute command" ) + + def _resolve_pathlike_command_args(self, command_args): + if "directory" in command_args and command_args["directory"] in {"", "/"}: + command_args["directory"] = str(self.workspace.root) + else: + for pathlike in ["filename", "directory", "clone_path"]: + if pathlike in command_args: + command_args[pathlike] = str( + self.workspace.get_path(command_args[pathlike]) + ) + return command_args diff --git a/autogpt/cli.py b/autogpt/cli.py index 51a946a73e..75908a1e2b 100644 --- a/autogpt/cli.py +++ b/autogpt/cli.py @@ -47,6 +47,14 @@ import click is_flag=True, help="Specifies whether to suppress the output of latest news on startup.", ) +@click.option( + # TODO: this is a hidden option for now, necessary for integration testing. + # We should make this public once we're ready to roll out agent specific workspaces. + "--workspace-directory", + "-w", + type=click.Path(), + hidden=True, +) @click.pass_context def main( ctx: click.Context, @@ -62,6 +70,7 @@ def main( browser_name: str, allow_downloads: bool, skip_news: bool, + workspace_directory: str, ) -> None: """ Welcome to AutoGPT an experimental open-source application showcasing the capabilities of the GPT-4 pushing the boundaries of AI. @@ -71,6 +80,7 @@ def main( # Put imports inside function to avoid importing everything when starting the CLI import logging import sys + from pathlib import Path from colorama import Fore @@ -83,6 +93,7 @@ def main( from autogpt.plugins import scan_plugins from autogpt.prompts.prompt import construct_main_ai_config from autogpt.utils import get_current_git_branch, get_latest_bulletin + from autogpt.workspace import Workspace if ctx.invoked_subcommand is None: cfg = Config() @@ -103,7 +114,6 @@ def main( skip_news, ) logger.set_level(logging.DEBUG if cfg.debug_mode else logging.INFO) - ai_name = "" if not cfg.skip_news: motd = get_latest_bulletin() if motd: @@ -126,7 +136,6 @@ def main( "Please consider upgrading to Python 3.10 or higher.", ) - cfg = Config() cfg.set_plugins(scan_plugins(cfg, cfg.debug_mode)) # Create a CommandRegistry instance and scan default folder command_registry = CommandRegistry() @@ -142,6 +151,7 @@ def main( command_registry.import_commands("autogpt.commands.web_selenium") command_registry.import_commands("autogpt.commands.write_tests") command_registry.import_commands("autogpt.app") + ai_name = "" ai_config = construct_main_ai_config() ai_config.command_registry = command_registry @@ -164,6 +174,27 @@ def main( system_prompt = ai_config.construct_full_prompt() if cfg.debug_mode: logger.typewriter_log("Prompt:", Fore.GREEN, system_prompt) + + # TODO: have this directory live outside the repository (e.g. in a user's + # home directory) and have it come in as a command line argument or part of + # the env file. + if workspace_directory is None: + workspace_directory = Path(__file__).parent / "auto_gpt_workspace" + else: + workspace_directory = Path(workspace_directory) + # TODO: pass in the ai_settings file and the env file and have them cloned into + # the workspace directory so we can bind them to the agent. + workspace_directory = Workspace.make_workspace(workspace_directory) + cfg.workspace_path = str(workspace_directory) + + # HACK: doing this here to collect some globals that depend on the workspace. + file_logger_path = workspace_directory / "file_logger.txt" + if not file_logger_path.exists(): + with file_logger_path.open(mode="w", encoding="utf-8") as f: + f.write("File Operation Logger ") + + cfg.file_logger_path = str(file_logger_path) + agent = Agent( ai_name=ai_name, memory=memory, @@ -173,6 +204,7 @@ def main( config=ai_config, system_prompt=system_prompt, triggering_prompt=triggering_prompt, + workspace_directory=workspace_directory, ) agent.start_interaction_loop() diff --git a/autogpt/commands/audio_text.py b/autogpt/commands/audio_text.py index b409fefdce..0a8640cf57 100644 --- a/autogpt/commands/audio_text.py +++ b/autogpt/commands/audio_text.py @@ -5,7 +5,6 @@ import requests from autogpt.commands.command import command from autogpt.config import Config -from autogpt.workspace import path_in_workspace CFG = Config() @@ -22,13 +21,12 @@ def read_audio_from_file(filename: str) -> str: Convert audio to text. Args: - audio_path (str): The path to the audio file + filename (str): The path to the audio file Returns: str: The text from the audio """ - audio_path = path_in_workspace(filename) - with open(audio_path, "rb") as audio_file: + with open(filename, "rb") as audio_file: audio = audio_file.read() return read_audio(audio) diff --git a/autogpt/commands/command.py b/autogpt/commands/command.py index e97af008a4..22ebace5a6 100644 --- a/autogpt/commands/command.py +++ b/autogpt/commands/command.py @@ -1,3 +1,4 @@ +import functools import importlib import inspect from typing import Any, Callable, Optional @@ -142,12 +143,14 @@ def command( disabled_reason=disabled_reason, ) + @functools.wraps(func) def wrapper(*args, **kwargs) -> Any: return func(*args, **kwargs) wrapper.command = cmd setattr(wrapper, AUTO_GPT_COMMAND_IDENTIFIER, True) + return wrapper return decorator diff --git a/autogpt/commands/execute_code.py b/autogpt/commands/execute_code.py index ff35d4286f..71c1bd2e97 100644 --- a/autogpt/commands/execute_code.py +++ b/autogpt/commands/execute_code.py @@ -7,7 +7,6 @@ from docker.errors import ImageNotFound from autogpt.commands.command import command from autogpt.config import Config -from autogpt.workspace import WORKSPACE_PATH, path_in_workspace CFG = Config() @@ -22,20 +21,17 @@ def execute_python_file(filename: str) -> str: Returns: str: The output of the file """ - file = filename - print(f"Executing file '{file}' in workspace '{WORKSPACE_PATH}'") + print(f"Executing file '{filename}'") - if not file.endswith(".py"): + if not filename.endswith(".py"): return "Error: Invalid file type. Only .py files are allowed." - file_path = path_in_workspace(file) - - if not os.path.isfile(file_path): - return f"Error: File '{file}' does not exist." + if not os.path.isfile(filename): + return f"Error: File '{filename}' does not exist." if we_are_running_in_a_docker_container(): result = subprocess.run( - f"python {file_path}", capture_output=True, encoding="utf8", shell=True + f"python {filename}", capture_output=True, encoding="utf8", shell=True ) if result.returncode == 0: return result.stdout @@ -67,9 +63,9 @@ def execute_python_file(filename: str) -> str: container = client.containers.run( image_name, - f"python {file}", + f"python {filename}", volumes={ - os.path.abspath(WORKSPACE_PATH): { + CFG.workspace_path: { "bind": "/workspace", "mode": "ro", } @@ -126,8 +122,8 @@ def execute_shell(command_line: str) -> str: ) current_dir = os.getcwd() # Change dir into workspace if necessary - if str(WORKSPACE_PATH) not in current_dir: - os.chdir(WORKSPACE_PATH) + if CFG.workspace_path not in current_dir: + os.chdir(CFG.workspace_path) print(f"Executing command '{command_line}' in working directory '{os.getcwd()}'") @@ -160,8 +156,8 @@ def execute_shell_popen(command_line) -> str: """ current_dir = os.getcwd() # Change dir into workspace if necessary - if str(WORKSPACE_PATH) not in current_dir: - os.chdir(WORKSPACE_PATH) + if CFG.workspace_path not in current_dir: + os.chdir(CFG.workspace_path) print(f"Executing command '{command_line}' in working directory '{os.getcwd()}'") diff --git a/autogpt/commands/file_operations.py b/autogpt/commands/file_operations.py index cb2d80e157..9999fccf1e 100644 --- a/autogpt/commands/file_operations.py +++ b/autogpt/commands/file_operations.py @@ -13,11 +13,8 @@ from autogpt.commands.command import command from autogpt.config import Config from autogpt.spinner import Spinner from autogpt.utils import readable_file_size -from autogpt.workspace import WORKSPACE_PATH, path_in_workspace CFG = Config() -LOG_FILE = "file_logger.txt" -LOG_FILE_PATH = WORKSPACE_PATH / LOG_FILE def check_duplicate_operation(operation: str, filename: str) -> bool: @@ -30,7 +27,7 @@ def check_duplicate_operation(operation: str, filename: str) -> bool: Returns: bool: True if the operation has already been performed on the file """ - log_content = read_file(LOG_FILE) + log_content = read_file(CFG.file_logger_path) log_entry = f"{operation}: {filename}\n" return log_entry in log_content @@ -43,12 +40,7 @@ def log_operation(operation: str, filename: str) -> None: filename (str): The name of the file the operation was performed on """ log_entry = f"{operation}: {filename}\n" - - # Create the log file if it doesn't exist - if not os.path.exists(LOG_FILE_PATH): - with open(LOG_FILE_PATH, "w", encoding="utf-8") as f: - f.write("File Operation Logger ") - append_to_file(str(LOG_FILE_PATH), log_entry, should_log=False) + append_to_file(CFG.file_logger_path, log_entry, should_log=False) def split_file( @@ -93,9 +85,8 @@ def read_file(filename: str) -> str: Returns: str: The contents of the file """ - filepath = path_in_workspace(filename) try: - with open(filepath, "r", encoding="utf-8") as f: + with open(filename, "r", encoding="utf-8") as f: content = f.read() return content except Exception as e: @@ -150,11 +141,10 @@ def write_to_file(filename: str, text: str) -> str: if check_duplicate_operation("write", filename): return "Error: File has already been updated." try: - filepath = path_in_workspace(filename) - directory = os.path.dirname(filepath) + directory = os.path.dirname(filename) if not os.path.exists(directory): os.makedirs(directory) - with open(filepath, "w", encoding="utf-8") as f: + with open(filename, "w", encoding="utf-8") as f: f.write(text) log_operation("write", filename) return "File written to successfully." @@ -177,8 +167,7 @@ def append_to_file(filename: str, text: str, should_log: bool = True) -> str: str: A message indicating success or failure """ try: - filepath = path_in_workspace(filename) - with open(filepath, "a") as f: + with open(filename, "a") as f: f.write(text) if should_log: @@ -202,8 +191,7 @@ def delete_file(filename: str) -> str: if check_duplicate_operation("delete", filename): return "Error: File has already been deleted." try: - filepath = path_in_workspace(filename) - os.remove(filepath) + os.remove(filename) log_operation("delete", filename) return "File deleted successfully." except Exception as e: @@ -222,16 +210,13 @@ def search_files(directory: str) -> list[str]: """ found_files = [] - if directory in {"", "/"}: - search_directory = WORKSPACE_PATH - else: - search_directory = path_in_workspace(directory) - - for root, _, files in os.walk(search_directory): + for root, _, files in os.walk(directory): for file in files: if file.startswith("."): continue - relative_path = os.path.relpath(os.path.join(root, file), WORKSPACE_PATH) + relative_path = os.path.relpath( + os.path.join(root, file), CFG.workspace_path + ) found_files.append(relative_path) return found_files @@ -250,7 +235,6 @@ def download_file(url, filename): url (str): URL of the file to download filename (str): Filename to save the file as """ - safe_filename = path_in_workspace(filename) try: message = f"{Fore.YELLOW}Downloading file from {Back.LIGHTBLUE_EX}{url}{Back.RESET}{Fore.RESET}" with Spinner(message) as spinner: @@ -268,7 +252,7 @@ def download_file(url, filename): total_size = int(r.headers.get("Content-Length", 0)) downloaded_size = 0 - with open(safe_filename, "wb") as f: + with open(filename, "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) downloaded_size += len(chunk) diff --git a/autogpt/commands/git_operations.py b/autogpt/commands/git_operations.py index 1a6beb95de..c373b8c0a8 100644 --- a/autogpt/commands/git_operations.py +++ b/autogpt/commands/git_operations.py @@ -3,7 +3,6 @@ from git.repo import Repo from autogpt.commands.command import command from autogpt.config import Config -from autogpt.workspace import path_in_workspace CFG = Config() @@ -27,9 +26,8 @@ def clone_repository(repository_url: str, clone_path: str) -> str: """ split_url = repository_url.split("//") auth_repo_url = f"//{CFG.github_username}:{CFG.github_api_key}@".join(split_url) - safe_clone_path = path_in_workspace(clone_path) try: - Repo.clone_from(auth_repo_url, safe_clone_path) - return f"""Cloned {repository_url} to {safe_clone_path}""" + Repo.clone_from(auth_repo_url, clone_path) + return f"""Cloned {repository_url} to {clone_path}""" except Exception as e: return f"Error: {str(e)}" diff --git a/autogpt/commands/image_gen.py b/autogpt/commands/image_gen.py index 962c41fb60..834432c591 100644 --- a/autogpt/commands/image_gen.py +++ b/autogpt/commands/image_gen.py @@ -9,7 +9,6 @@ from PIL import Image from autogpt.commands.command import command from autogpt.config import Config -from autogpt.workspace import path_in_workspace CFG = Config() @@ -25,7 +24,7 @@ def generate_image(prompt: str, size: int = 256) -> str: Returns: str: The filename of the image """ - filename = f"{str(uuid.uuid4())}.jpg" + filename = f"{CFG.workspace_path}/{str(uuid.uuid4())}.jpg" # DALL-E if CFG.image_provider == "dalle": @@ -72,7 +71,7 @@ def generate_image_with_hf(prompt: str, filename: str) -> str: image = Image.open(io.BytesIO(response.content)) print(f"Image Generated for prompt:{prompt}") - image.save(path_in_workspace(filename)) + image.save(filename) return f"Saved to disk:{filename}" @@ -109,7 +108,7 @@ def generate_image_with_dalle(prompt: str, filename: str, size: int) -> str: image_data = b64decode(response["data"][0]["b64_json"]) - with open(path_in_workspace(filename), mode="wb") as png: + with open(filename, mode="wb") as png: png.write(image_data) return f"Saved to disk:{filename}" @@ -160,6 +159,6 @@ def generate_image_with_sd_webui( response = response.json() b64 = b64decode(response["images"][0].split(",", 1)[0]) image = Image.open(io.BytesIO(b64)) - image.save(path_in_workspace(filename)) + image.save(filename) return f"Saved to disk:{filename}" diff --git a/autogpt/config/config.py b/autogpt/config/config.py index b8981beefb..7fa849e557 100644 --- a/autogpt/config/config.py +++ b/autogpt/config/config.py @@ -20,6 +20,9 @@ class Config(metaclass=Singleton): def __init__(self) -> None: """Initialize the Config class""" + self.workspace_path = None + self.file_logger_path = None + self.debug_mode = False self.continuous_mode = False self.continuous_limit = 0 diff --git a/autogpt/workspace.py b/autogpt/workspace.py deleted file mode 100644 index 724f8443e4..0000000000 --- a/autogpt/workspace.py +++ /dev/null @@ -1,48 +0,0 @@ -from __future__ import annotations - -import os -from pathlib import Path - -from autogpt.config import Config - -CFG = Config() - -# Set a dedicated folder for file I/O -WORKSPACE_PATH = Path(os.getcwd()) / "auto_gpt_workspace" - -# Create the directory if it doesn't exist -if not os.path.exists(WORKSPACE_PATH): - os.makedirs(WORKSPACE_PATH) - - -def path_in_workspace(relative_path: str | Path) -> Path: - """Get full path for item in workspace - - Parameters: - relative_path (str | Path): Path to translate into the workspace - - Returns: - Path: Absolute path for the given path in the workspace - """ - return safe_path_join(WORKSPACE_PATH, relative_path) - - -def safe_path_join(base: Path, *paths: str | Path) -> Path: - """Join one or more path components, asserting the resulting path is within the workspace. - - Args: - base (Path): The base path - *paths (str): The paths to join to the base path - - Returns: - Path: The joined path - """ - base = base.resolve() - joined_path = base.joinpath(*paths).resolve() - - if CFG.restrict_to_workspace and not joined_path.is_relative_to(base): - raise ValueError( - f"Attempted to access path '{joined_path}' outside of workspace '{base}'." - ) - - return joined_path diff --git a/autogpt/workspace/__init__.py b/autogpt/workspace/__init__.py new file mode 100644 index 0000000000..b348144b7e --- /dev/null +++ b/autogpt/workspace/__init__.py @@ -0,0 +1,5 @@ +from autogpt.workspace.workspace import Workspace + +__all__ = [ + "Workspace", +] diff --git a/autogpt/workspace/workspace.py b/autogpt/workspace/workspace.py new file mode 100644 index 0000000000..b06fa9eb0f --- /dev/null +++ b/autogpt/workspace/workspace.py @@ -0,0 +1,120 @@ +""" +========= +Workspace +========= + +The workspace is a directory containing configuration and working files for an AutoGPT +agent. + +""" +from __future__ import annotations + +from pathlib import Path + + +class Workspace: + """A class that represents a workspace for an AutoGPT agent.""" + + def __init__(self, workspace_root: str | Path, restrict_to_workspace: bool): + self._root = self._sanitize_path(workspace_root) + self._restrict_to_workspace = restrict_to_workspace + + @property + def root(self) -> Path: + """The root directory of the workspace.""" + return self._root + + @property + def restrict_to_workspace(self): + """Whether to restrict generated paths to the workspace.""" + return self._restrict_to_workspace + + @classmethod + def make_workspace(cls, workspace_directory: str | Path, *args, **kwargs) -> Path: + """Create a workspace directory and return the path to it. + + Parameters + ---------- + workspace_directory + The path to the workspace directory. + + Returns + ------- + Path + The path to the workspace directory. + + """ + # TODO: have this make the env file and ai settings file in the directory. + workspace_directory = cls._sanitize_path(workspace_directory) + workspace_directory.mkdir(exist_ok=True, parents=True) + return workspace_directory + + def get_path(self, relative_path: str | Path) -> Path: + """Get the full path for an item in the workspace. + + Parameters + ---------- + relative_path + The relative path to resolve in the workspace. + + Returns + ------- + Path + The resolved path relative to the workspace. + + """ + return self._sanitize_path( + relative_path, + root=self.root, + restrict_to_root=self.restrict_to_workspace, + ) + + @staticmethod + def _sanitize_path( + relative_path: str | Path, + root: str | Path = None, + restrict_to_root: bool = True, + ) -> Path: + """Resolve the relative path within the given root if possible. + + Parameters + ---------- + relative_path + The relative path to resolve. + root + The root path to resolve the relative path within. + restrict_to_root + Whether to restrict the path to the root. + + Returns + ------- + Path + The resolved path. + + Raises + ------ + ValueError + If the path is absolute and a root is provided. + ValueError + If the path is outside the root and the root is restricted. + + """ + + if root is None: + return Path(relative_path).resolve() + + root, relative_path = Path(root), Path(relative_path) + + if relative_path.is_absolute(): + raise ValueError( + f"Attempted to access absolute path '{relative_path}' in workspace '{root}'." + ) + + full_path = root.joinpath(relative_path).resolve() + + if restrict_to_root and not full_path.is_relative_to(root): + raise ValueError( + f"Attempted to access path '{full_path}' outside of workspace '{root}'." + ) + + return full_path diff --git a/tests/conftest.py b/tests/conftest.py index bf6bd6c59f..99ccb940d6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,19 @@ +from pathlib import Path + +import pytest from dotenv import load_dotenv +from autogpt.workspace import Workspace + load_dotenv() + + +@pytest.fixture() +def workspace_root(tmp_path) -> Path: + return tmp_path / "home/users/monty/auto_gpt_workspace" + + +@pytest.fixture() +def workspace(workspace_root: Path) -> Workspace: + workspace_root = Workspace.make_workspace(workspace_root) + return Workspace(workspace_root, restrict_to_workspace=True) diff --git a/tests/integration/goal_oriented/test_write_file.py b/tests/integration/goal_oriented/test_write_file.py index d995c7a39a..053d6e08c2 100644 --- a/tests/integration/goal_oriented/test_write_file.py +++ b/tests/integration/goal_oriented/test_write_file.py @@ -6,12 +6,9 @@ import vcr from autogpt.agent import Agent from autogpt.commands.command import CommandRegistry -from autogpt.commands.file_operations import LOG_FILE, delete_file, read_file -from autogpt.config import AIConfig, Config, check_openai_api_key +from autogpt.commands.file_operations import delete_file, read_file +from autogpt.config import AIConfig, Config from autogpt.memory import get_memory - -# from autogpt.prompt import Prompt -from autogpt.workspace import WORKSPACE_PATH from tests.integration.goal_oriented.vcr_helper import before_record_request from tests.utils import requires_api_key @@ -28,19 +25,12 @@ CFG = Config() @requires_api_key("OPENAI_API_KEY") -def test_write_file() -> None: - # if file exist - file_name = "hello_world.txt" +def test_write_file(workspace) -> None: + CFG.workspace_path = workspace.root + CFG.file_logger_path = os.path.join(workspace.root, "file_logger.txt") - file_path_to_write_into = f"{WORKSPACE_PATH}/{file_name}" - if os.path.exists(file_path_to_write_into): - os.remove(file_path_to_write_into) - file_logger_path = f"{WORKSPACE_PATH}/{LOG_FILE}" - if os.path.exists(file_logger_path): - os.remove(file_logger_path) - - delete_file(file_name) - agent = create_writer_agent() + file_name = str(workspace.get_path("hello_world.txt")) + agent = create_writer_agent(workspace) try: with my_vcr.use_cassette( "write_file.vcr.yml", @@ -58,14 +48,11 @@ def test_write_file() -> None: assert False, "The process took longer than 45 seconds to complete." # catch system exit exceptions except SystemExit: # the agent returns an exception when it shuts down - content = "" content = read_file(file_name) - os.remove(file_path_to_write_into) - assert content == "Hello World", f"Expected 'Hello World', got {content}" -def create_writer_agent(): +def create_writer_agent(workspace): command_registry = CommandRegistry() command_registry.import_commands("autogpt.commands.file_operations") command_registry.import_commands("autogpt.app") @@ -96,6 +83,7 @@ def create_writer_agent(): next_action_count=0, system_prompt=system_prompt, triggering_prompt=triggering_prompt, + workspace_directory=workspace.root, ) CFG.set_continuous_mode(True) CFG.set_memory_backend("no_memory") diff --git a/tests/test_image_gen.py b/tests/test_image_gen.py index 58b8337f42..d29d5aa7ae 100644 --- a/tests/test_image_gen.py +++ b/tests/test_image_gen.py @@ -1,39 +1,46 @@ import hashlib import os +import shutil import unittest +from pathlib import Path from PIL import Image from autogpt.commands.image_gen import generate_image, generate_image_with_sd_webui from autogpt.config import Config -from autogpt.workspace import path_in_workspace +from autogpt.workspace import Workspace from tests.utils import requires_api_key def lst(txt): - return txt.split(":")[1].strip() + return Path(txt.split(":")[1].strip()) -@unittest.skipIf(os.getenv("CI"), "Skipping image generation tests") +@unittest.skip("Skipping image generation tests") class TestImageGen(unittest.TestCase): def setUp(self): self.config = Config() + workspace_path = os.path.join(os.path.dirname(__file__), "workspace") + self.workspace_path = Workspace.make_workspace(workspace_path) + self.config.workspace_path = workspace_path + self.workspace = Workspace(workspace_path, restrict_to_workspace=True) + + def tearDown(self) -> None: + shutil.rmtree(self.workspace_path) @requires_api_key("OPENAI_API_KEY") def test_dalle(self): self.config.image_provider = "dalle" # Test using size 256 - result = lst(generate_image("astronaut riding a horse", 256)) - image_path = path_in_workspace(result) + image_path = lst(generate_image("astronaut riding a horse", 256)) self.assertTrue(image_path.exists()) with Image.open(image_path) as img: self.assertEqual(img.size, (256, 256)) image_path.unlink() # Test using size 512 - result = lst(generate_image("astronaut riding a horse", 512)) - image_path = path_in_workspace(result) + image_path = lst(generate_image("astronaut riding a horse", 512)) with Image.open(image_path) as img: self.assertEqual(img.size, (512, 512)) image_path.unlink() @@ -44,8 +51,7 @@ class TestImageGen(unittest.TestCase): # Test usin SD 1.4 model and size 512 self.config.huggingface_image_model = "CompVis/stable-diffusion-v1-4" - result = lst(generate_image("astronaut riding a horse", 512)) - image_path = path_in_workspace(result) + image_path = lst(generate_image("astronaut riding a horse", 512)) self.assertTrue(image_path.exists()) with Image.open(image_path) as img: self.assertEqual(img.size, (512, 512)) @@ -53,8 +59,7 @@ class TestImageGen(unittest.TestCase): # Test using SD 2.1 768 model and size 768 self.config.huggingface_image_model = "stabilityai/stable-diffusion-2-1" - result = lst(generate_image("astronaut riding a horse", 768)) - image_path = path_in_workspace(result) + image_path = lst(generate_image("astronaut riding a horse", 768)) with Image.open(image_path) as img: self.assertEqual(img.size, (768, 768)) image_path.unlink() @@ -64,8 +69,7 @@ class TestImageGen(unittest.TestCase): return # Test using size 128 - result = lst(generate_image_with_sd_webui("astronaut riding a horse", 128)) - image_path = path_in_workspace(result) + image_path = lst(generate_image_with_sd_webui("astronaut riding a horse", 128)) self.assertTrue(image_path.exists()) with Image.open(image_path) as img: self.assertEqual(img.size, (128, 128)) diff --git a/tests/test_workspace.py b/tests/test_workspace.py new file mode 100644 index 0000000000..33c096d316 --- /dev/null +++ b/tests/test_workspace.py @@ -0,0 +1,86 @@ +from pathlib import Path + +import pytest + +from autogpt.workspace import Workspace + +_WORKSPACE_ROOT = Path("home/users/monty/auto_gpt_workspace") + +_ACCESSIBLE_PATHS = [ + Path("."), + Path("test_file.txt"), + Path("test_folder"), + Path("test_folder/test_file.txt"), + Path("test_folder/.."), + Path("test_folder/../test_file.txt"), + Path("test_folder/../test_folder"), + Path("test_folder/../test_folder/test_file.txt"), +] + +_INACCESSIBLE_PATHS = [ + # Takes us out of the workspace + Path(".."), + Path("../test_file.txt"), + Path("../not_auto_gpt_workspace"), + Path("../not_auto_gpt_workspace/test_file.txt"), + Path("test_folder/../.."), + Path("test_folder/../../test_file.txt"), + Path("test_folder/../../not_auto_gpt_workspace"), + Path("test_folder/../../not_auto_gpt_workspace/test_file.txt"), + # Contains null bytes + Path("\x00"), + Path("\x00test_file.txt"), + Path("test_folder/\x00"), + Path("test_folder/\x00test_file.txt"), + # Absolute paths + Path("/"), + Path("/test_file.txt"), + Path("/home"), +] + + +@pytest.fixture() +def workspace_root(tmp_path): + return tmp_path / _WORKSPACE_ROOT + + +@pytest.fixture(params=_ACCESSIBLE_PATHS) +def accessible_path(request): + return request.param + + +@pytest.fixture(params=_INACCESSIBLE_PATHS) +def inaccessible_path(request): + return request.param + + +def test_sanitize_path_accessible(accessible_path, workspace_root): + full_path = Workspace._sanitize_path( + accessible_path, + root=workspace_root, + restrict_to_root=True, + ) + assert full_path.is_absolute() + assert full_path.is_relative_to(workspace_root) + + +def test_sanitize_path_inaccessible(inaccessible_path, workspace_root): + with pytest.raises(ValueError): + Workspace._sanitize_path( + inaccessible_path, + root=workspace_root, + restrict_to_root=True, + ) + + +def test_get_path_accessible(accessible_path, workspace_root): + workspace = Workspace(workspace_root, True) + full_path = workspace.get_path(accessible_path) + assert full_path.is_absolute() + assert full_path.is_relative_to(workspace_root) + + +def test_get_path_inaccessible(inaccessible_path, workspace_root): + workspace = Workspace(workspace_root, True) + with pytest.raises(ValueError): + workspace.get_path(inaccessible_path) diff --git a/tests/unit/test_file_operations.py b/tests/unit/test_file_operations.py index dfcde571e4..35f9b91a21 100644 --- a/tests/unit/test_file_operations.py +++ b/tests/unit/test_file_operations.py @@ -4,7 +4,6 @@ import unittest from pathlib import Path from autogpt.commands.file_operations import ( - LOG_FILE_PATH, append_to_file, check_duplicate_operation, delete_file, @@ -15,7 +14,7 @@ from autogpt.commands.file_operations import ( write_to_file, ) from autogpt.config import Config -from autogpt.workspace import path_in_workspace +from autogpt.workspace import Workspace class TestFileOperations(unittest.TestCase): @@ -24,24 +23,24 @@ class TestFileOperations(unittest.TestCase): """ def setUp(self): - self.test_file = "test_file.txt" + self.config = Config() + workspace_path = os.path.join(os.path.dirname(__file__), "workspace") + self.workspace_path = Workspace.make_workspace(workspace_path) + self.config.workspace_path = workspace_path + self.config.file_logger_path = os.path.join(workspace_path, "file_logger.txt") + self.workspace = Workspace(workspace_path, restrict_to_workspace=True) + + self.test_file = str(self.workspace.get_path("test_file.txt")) self.test_file2 = "test_file2.txt" - self.test_directory = "test_directory" + self.test_directory = str(self.workspace.get_path("test_directory")) self.file_content = "This is a test file.\n" self.file_logger_logs = "file_logger.txt" - with open(path_in_workspace(self.test_file), "w") as f: + with open(self.test_file, "w") as f: f.write(self.file_content) - if os.path.exists(LOG_FILE_PATH): - os.remove(LOG_FILE_PATH) - - def tearDown(self): - if os.path.exists(path_in_workspace(self.test_file)): - os.remove(path_in_workspace(self.test_file)) - - if os.path.exists(self.test_directory): - shutil.rmtree(self.test_directory) + def tearDown(self) -> None: + shutil.rmtree(self.workspace_path) def test_check_duplicate_operation(self): log_operation("write", self.test_file) @@ -53,9 +52,9 @@ class TestFileOperations(unittest.TestCase): os.remove(self.file_logger_logs) log_operation("log_test", self.test_file) - with open(LOG_FILE_PATH, "r") as f: + with open(self.config.file_logger_path, "r") as f: content = f.read() - self.assertIn("log_test: test_file.txt", content) + self.assertIn(f"log_test: {self.test_file}", content) # Test splitting a file into chunks def test_split_file(self): @@ -71,80 +70,59 @@ class TestFileOperations(unittest.TestCase): def test_write_to_file(self): new_content = "This is new content.\n" write_to_file(self.test_file, new_content) - with open(path_in_workspace(self.test_file), "r") as f: + with open(self.test_file, "r") as f: content = f.read() self.assertEqual(content, new_content) def test_append_to_file(self): - with open(path_in_workspace(self.test_file), "r") as f: + with open(self.test_file, "r") as f: content_before = f.read() append_text = "This is appended text.\n" append_to_file(self.test_file, append_text) - with open(path_in_workspace(self.test_file), "r") as f: + with open(self.test_file, "r") as f: content = f.read() self.assertEqual(content, content_before + append_text) def test_delete_file(self): delete_file(self.test_file) - self.assertFalse(os.path.exists(path_in_workspace(self.test_file))) + self.assertFalse(os.path.exists(self.test_file)) def test_search_files(self): # Case 1: Create files A and B, search for A, and ensure we don't return A and B - file_a = "file_a.txt" - file_b = "file_b.txt" + file_a = self.workspace.get_path("file_a.txt") + file_b = self.workspace.get_path("file_b.txt") - with open(path_in_workspace(file_a), "w") as f: + with open(file_a, "w") as f: f.write("This is file A.") - with open(path_in_workspace(file_b), "w") as f: + with open(file_b, "w") as f: f.write("This is file B.") # Create a subdirectory and place a copy of file_a in it - if not os.path.exists(path_in_workspace(self.test_directory)): - os.makedirs(path_in_workspace(self.test_directory)) + if not os.path.exists(self.test_directory): + os.makedirs(self.test_directory) - with open( - path_in_workspace(os.path.join(self.test_directory, file_a)), "w" - ) as f: + with open(os.path.join(self.test_directory, file_a.name), "w") as f: f.write("This is file A in the subdirectory.") - files = search_files(path_in_workspace("")) - self.assertIn(file_a, files) - self.assertIn(file_b, files) - self.assertIn(os.path.join(self.test_directory, file_a), files) + files = search_files(str(self.workspace.root)) + self.assertIn(file_a.name, files) + self.assertIn(file_b.name, files) + self.assertIn(f"{Path(self.test_directory).name}/{file_a.name}", files) # Clean up - os.remove(path_in_workspace(file_a)) - os.remove(path_in_workspace(file_b)) - os.remove(path_in_workspace(os.path.join(self.test_directory, file_a))) - os.rmdir(path_in_workspace(self.test_directory)) + os.remove(file_a) + os.remove(file_b) + os.remove(os.path.join(self.test_directory, file_a.name)) + os.rmdir(self.test_directory) # Case 2: Search for a file that does not exist and make sure we don't throw non_existent_file = "non_existent_file.txt" files = search_files("") self.assertNotIn(non_existent_file, files) - # Test to ensure we cannot read files out of workspace - def test_restrict_workspace(self): - CFG = Config() - with open(self.test_file2, "w+") as f: - f.write("test text") - - CFG.restrict_to_workspace = True - - # Get the absolute path of self.test_file2 - test_file2_abs_path = os.path.abspath(self.test_file2) - - with self.assertRaises(ValueError): - read_file(test_file2_abs_path) - - CFG.restrict_to_workspace = False - read_file(test_file2_abs_path) - - os.remove(test_file2_abs_path) - if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_setup.py b/tests/unit/test_setup.py index daf8524acc..c922ff9c67 100644 --- a/tests/unit/test_setup.py +++ b/tests/unit/test_setup.py @@ -35,6 +35,7 @@ class TestAutoGPT(unittest.TestCase): self.assertGreaterEqual(len(ai_config.ai_goals), 1) self.assertLessEqual(len(ai_config.ai_goals), 5) + @requires_api_key("OPENAI_API_KEY") def test_generate_aiconfig_automatic_fallback(self): user_inputs = [ "T&GF£OIBECC()!*", @@ -52,6 +53,7 @@ class TestAutoGPT(unittest.TestCase): self.assertEqual(ai_config.ai_role, "an AI designed to browse bake a cake.") self.assertEqual(ai_config.ai_goals, ["Purchase ingredients", "Bake a cake"]) + @requires_api_key("OPENAI_API_KEY") def test_prompt_user_manual_mode(self): user_inputs = [ "--manual", diff --git a/tests/utils.py b/tests/utils.py index f042974c48..8c72f1fac6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,3 +1,4 @@ +import functools import os import pytest @@ -5,6 +6,7 @@ import pytest def requires_api_key(env_var): def decorator(func): + @functools.wraps(func) def wrapper(*args, **kwargs): if not os.environ.get(env_var): pytest.skip(