mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
Add workspace abstraction (#2982)
* Add workspace abstraction * Remove old workspace implementation * Extract path resolution to a helper function * Add api key requirements to new tests
This commit is contained in:
@@ -9,6 +9,7 @@ from autogpt.logs import logger, print_assistant_thoughts
|
||||
from autogpt.speech import say_text
|
||||
from autogpt.spinner import Spinner
|
||||
from autogpt.utils import clean_input
|
||||
from autogpt.workspace import Workspace
|
||||
|
||||
|
||||
class Agent:
|
||||
@@ -50,7 +51,9 @@ class Agent:
|
||||
config,
|
||||
system_prompt,
|
||||
triggering_prompt,
|
||||
workspace_directory,
|
||||
):
|
||||
cfg = Config()
|
||||
self.ai_name = ai_name
|
||||
self.memory = memory
|
||||
self.full_message_history = full_message_history
|
||||
@@ -59,6 +62,7 @@ class Agent:
|
||||
self.config = config
|
||||
self.system_prompt = system_prompt
|
||||
self.triggering_prompt = triggering_prompt
|
||||
self.workspace = Workspace(workspace_directory, cfg.restrict_to_workspace)
|
||||
|
||||
def start_interaction_loop(self):
|
||||
# Interaction Loop
|
||||
@@ -107,6 +111,8 @@ class Agent:
|
||||
command_name, arguments = get_command(assistant_reply_json)
|
||||
if cfg.speak_mode:
|
||||
say_text(f"I want to execute {command_name}")
|
||||
arguments = self._resolve_pathlike_command_args(arguments)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error: \n", str(e))
|
||||
|
||||
@@ -226,3 +232,14 @@ class Agent:
|
||||
logger.typewriter_log(
|
||||
"SYSTEM: ", Fore.YELLOW, "Unable to execute command"
|
||||
)
|
||||
|
||||
def _resolve_pathlike_command_args(self, command_args):
|
||||
if "directory" in command_args and command_args["directory"] in {"", "/"}:
|
||||
command_args["directory"] = str(self.workspace.root)
|
||||
else:
|
||||
for pathlike in ["filename", "directory", "clone_path"]:
|
||||
if pathlike in command_args:
|
||||
command_args[pathlike] = str(
|
||||
self.workspace.get_path(command_args[pathlike])
|
||||
)
|
||||
return command_args
|
||||
|
||||
@@ -47,6 +47,14 @@ import click
|
||||
is_flag=True,
|
||||
help="Specifies whether to suppress the output of latest news on startup.",
|
||||
)
|
||||
@click.option(
|
||||
# TODO: this is a hidden option for now, necessary for integration testing.
|
||||
# We should make this public once we're ready to roll out agent specific workspaces.
|
||||
"--workspace-directory",
|
||||
"-w",
|
||||
type=click.Path(),
|
||||
hidden=True,
|
||||
)
|
||||
@click.pass_context
|
||||
def main(
|
||||
ctx: click.Context,
|
||||
@@ -62,6 +70,7 @@ def main(
|
||||
browser_name: str,
|
||||
allow_downloads: bool,
|
||||
skip_news: bool,
|
||||
workspace_directory: str,
|
||||
) -> None:
|
||||
"""
|
||||
Welcome to AutoGPT an experimental open-source application showcasing the capabilities of the GPT-4 pushing the boundaries of AI.
|
||||
@@ -71,6 +80,7 @@ def main(
|
||||
# Put imports inside function to avoid importing everything when starting the CLI
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from colorama import Fore
|
||||
|
||||
@@ -83,6 +93,7 @@ def main(
|
||||
from autogpt.plugins import scan_plugins
|
||||
from autogpt.prompts.prompt import construct_main_ai_config
|
||||
from autogpt.utils import get_current_git_branch, get_latest_bulletin
|
||||
from autogpt.workspace import Workspace
|
||||
|
||||
if ctx.invoked_subcommand is None:
|
||||
cfg = Config()
|
||||
@@ -103,7 +114,6 @@ def main(
|
||||
skip_news,
|
||||
)
|
||||
logger.set_level(logging.DEBUG if cfg.debug_mode else logging.INFO)
|
||||
ai_name = ""
|
||||
if not cfg.skip_news:
|
||||
motd = get_latest_bulletin()
|
||||
if motd:
|
||||
@@ -126,7 +136,6 @@ def main(
|
||||
"Please consider upgrading to Python 3.10 or higher.",
|
||||
)
|
||||
|
||||
cfg = Config()
|
||||
cfg.set_plugins(scan_plugins(cfg, cfg.debug_mode))
|
||||
# Create a CommandRegistry instance and scan default folder
|
||||
command_registry = CommandRegistry()
|
||||
@@ -142,6 +151,7 @@ def main(
|
||||
command_registry.import_commands("autogpt.commands.web_selenium")
|
||||
command_registry.import_commands("autogpt.commands.write_tests")
|
||||
command_registry.import_commands("autogpt.app")
|
||||
|
||||
ai_name = ""
|
||||
ai_config = construct_main_ai_config()
|
||||
ai_config.command_registry = command_registry
|
||||
@@ -164,6 +174,27 @@ def main(
|
||||
system_prompt = ai_config.construct_full_prompt()
|
||||
if cfg.debug_mode:
|
||||
logger.typewriter_log("Prompt:", Fore.GREEN, system_prompt)
|
||||
|
||||
# TODO: have this directory live outside the repository (e.g. in a user's
|
||||
# home directory) and have it come in as a command line argument or part of
|
||||
# the env file.
|
||||
if workspace_directory is None:
|
||||
workspace_directory = Path(__file__).parent / "auto_gpt_workspace"
|
||||
else:
|
||||
workspace_directory = Path(workspace_directory)
|
||||
# TODO: pass in the ai_settings file and the env file and have them cloned into
|
||||
# the workspace directory so we can bind them to the agent.
|
||||
workspace_directory = Workspace.make_workspace(workspace_directory)
|
||||
cfg.workspace_path = str(workspace_directory)
|
||||
|
||||
# HACK: doing this here to collect some globals that depend on the workspace.
|
||||
file_logger_path = workspace_directory / "file_logger.txt"
|
||||
if not file_logger_path.exists():
|
||||
with file_logger_path.open(mode="w", encoding="utf-8") as f:
|
||||
f.write("File Operation Logger ")
|
||||
|
||||
cfg.file_logger_path = str(file_logger_path)
|
||||
|
||||
agent = Agent(
|
||||
ai_name=ai_name,
|
||||
memory=memory,
|
||||
@@ -173,6 +204,7 @@ def main(
|
||||
config=ai_config,
|
||||
system_prompt=system_prompt,
|
||||
triggering_prompt=triggering_prompt,
|
||||
workspace_directory=workspace_directory,
|
||||
)
|
||||
agent.start_interaction_loop()
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import requests
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.workspace import path_in_workspace
|
||||
|
||||
CFG = Config()
|
||||
|
||||
@@ -22,13 +21,12 @@ def read_audio_from_file(filename: str) -> str:
|
||||
Convert audio to text.
|
||||
|
||||
Args:
|
||||
audio_path (str): The path to the audio file
|
||||
filename (str): The path to the audio file
|
||||
|
||||
Returns:
|
||||
str: The text from the audio
|
||||
"""
|
||||
audio_path = path_in_workspace(filename)
|
||||
with open(audio_path, "rb") as audio_file:
|
||||
with open(filename, "rb") as audio_file:
|
||||
audio = audio_file.read()
|
||||
return read_audio(audio)
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import functools
|
||||
import importlib
|
||||
import inspect
|
||||
from typing import Any, Callable, Optional
|
||||
@@ -142,12 +143,14 @@ def command(
|
||||
disabled_reason=disabled_reason,
|
||||
)
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs) -> Any:
|
||||
return func(*args, **kwargs)
|
||||
|
||||
wrapper.command = cmd
|
||||
|
||||
setattr(wrapper, AUTO_GPT_COMMAND_IDENTIFIER, True)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -7,7 +7,6 @@ from docker.errors import ImageNotFound
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.workspace import WORKSPACE_PATH, path_in_workspace
|
||||
|
||||
CFG = Config()
|
||||
|
||||
@@ -22,20 +21,17 @@ def execute_python_file(filename: str) -> str:
|
||||
Returns:
|
||||
str: The output of the file
|
||||
"""
|
||||
file = filename
|
||||
print(f"Executing file '{file}' in workspace '{WORKSPACE_PATH}'")
|
||||
print(f"Executing file '{filename}'")
|
||||
|
||||
if not file.endswith(".py"):
|
||||
if not filename.endswith(".py"):
|
||||
return "Error: Invalid file type. Only .py files are allowed."
|
||||
|
||||
file_path = path_in_workspace(file)
|
||||
|
||||
if not os.path.isfile(file_path):
|
||||
return f"Error: File '{file}' does not exist."
|
||||
if not os.path.isfile(filename):
|
||||
return f"Error: File '{filename}' does not exist."
|
||||
|
||||
if we_are_running_in_a_docker_container():
|
||||
result = subprocess.run(
|
||||
f"python {file_path}", capture_output=True, encoding="utf8", shell=True
|
||||
f"python {filename}", capture_output=True, encoding="utf8", shell=True
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return result.stdout
|
||||
@@ -67,9 +63,9 @@ def execute_python_file(filename: str) -> str:
|
||||
|
||||
container = client.containers.run(
|
||||
image_name,
|
||||
f"python {file}",
|
||||
f"python {filename}",
|
||||
volumes={
|
||||
os.path.abspath(WORKSPACE_PATH): {
|
||||
CFG.workspace_path: {
|
||||
"bind": "/workspace",
|
||||
"mode": "ro",
|
||||
}
|
||||
@@ -126,8 +122,8 @@ def execute_shell(command_line: str) -> str:
|
||||
)
|
||||
current_dir = os.getcwd()
|
||||
# Change dir into workspace if necessary
|
||||
if str(WORKSPACE_PATH) not in current_dir:
|
||||
os.chdir(WORKSPACE_PATH)
|
||||
if CFG.workspace_path not in current_dir:
|
||||
os.chdir(CFG.workspace_path)
|
||||
|
||||
print(f"Executing command '{command_line}' in working directory '{os.getcwd()}'")
|
||||
|
||||
@@ -160,8 +156,8 @@ def execute_shell_popen(command_line) -> str:
|
||||
"""
|
||||
current_dir = os.getcwd()
|
||||
# Change dir into workspace if necessary
|
||||
if str(WORKSPACE_PATH) not in current_dir:
|
||||
os.chdir(WORKSPACE_PATH)
|
||||
if CFG.workspace_path not in current_dir:
|
||||
os.chdir(CFG.workspace_path)
|
||||
|
||||
print(f"Executing command '{command_line}' in working directory '{os.getcwd()}'")
|
||||
|
||||
|
||||
@@ -13,11 +13,8 @@ from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.spinner import Spinner
|
||||
from autogpt.utils import readable_file_size
|
||||
from autogpt.workspace import WORKSPACE_PATH, path_in_workspace
|
||||
|
||||
CFG = Config()
|
||||
LOG_FILE = "file_logger.txt"
|
||||
LOG_FILE_PATH = WORKSPACE_PATH / LOG_FILE
|
||||
|
||||
|
||||
def check_duplicate_operation(operation: str, filename: str) -> bool:
|
||||
@@ -30,7 +27,7 @@ def check_duplicate_operation(operation: str, filename: str) -> bool:
|
||||
Returns:
|
||||
bool: True if the operation has already been performed on the file
|
||||
"""
|
||||
log_content = read_file(LOG_FILE)
|
||||
log_content = read_file(CFG.file_logger_path)
|
||||
log_entry = f"{operation}: {filename}\n"
|
||||
return log_entry in log_content
|
||||
|
||||
@@ -43,12 +40,7 @@ def log_operation(operation: str, filename: str) -> None:
|
||||
filename (str): The name of the file the operation was performed on
|
||||
"""
|
||||
log_entry = f"{operation}: {filename}\n"
|
||||
|
||||
# Create the log file if it doesn't exist
|
||||
if not os.path.exists(LOG_FILE_PATH):
|
||||
with open(LOG_FILE_PATH, "w", encoding="utf-8") as f:
|
||||
f.write("File Operation Logger ")
|
||||
append_to_file(str(LOG_FILE_PATH), log_entry, should_log=False)
|
||||
append_to_file(CFG.file_logger_path, log_entry, should_log=False)
|
||||
|
||||
|
||||
def split_file(
|
||||
@@ -93,9 +85,8 @@ def read_file(filename: str) -> str:
|
||||
Returns:
|
||||
str: The contents of the file
|
||||
"""
|
||||
filepath = path_in_workspace(filename)
|
||||
try:
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
return content
|
||||
except Exception as e:
|
||||
@@ -150,11 +141,10 @@ def write_to_file(filename: str, text: str) -> str:
|
||||
if check_duplicate_operation("write", filename):
|
||||
return "Error: File has already been updated."
|
||||
try:
|
||||
filepath = path_in_workspace(filename)
|
||||
directory = os.path.dirname(filepath)
|
||||
directory = os.path.dirname(filename)
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
with open(filename, "w", encoding="utf-8") as f:
|
||||
f.write(text)
|
||||
log_operation("write", filename)
|
||||
return "File written to successfully."
|
||||
@@ -177,8 +167,7 @@ def append_to_file(filename: str, text: str, should_log: bool = True) -> str:
|
||||
str: A message indicating success or failure
|
||||
"""
|
||||
try:
|
||||
filepath = path_in_workspace(filename)
|
||||
with open(filepath, "a") as f:
|
||||
with open(filename, "a") as f:
|
||||
f.write(text)
|
||||
|
||||
if should_log:
|
||||
@@ -202,8 +191,7 @@ def delete_file(filename: str) -> str:
|
||||
if check_duplicate_operation("delete", filename):
|
||||
return "Error: File has already been deleted."
|
||||
try:
|
||||
filepath = path_in_workspace(filename)
|
||||
os.remove(filepath)
|
||||
os.remove(filename)
|
||||
log_operation("delete", filename)
|
||||
return "File deleted successfully."
|
||||
except Exception as e:
|
||||
@@ -222,16 +210,13 @@ def search_files(directory: str) -> list[str]:
|
||||
"""
|
||||
found_files = []
|
||||
|
||||
if directory in {"", "/"}:
|
||||
search_directory = WORKSPACE_PATH
|
||||
else:
|
||||
search_directory = path_in_workspace(directory)
|
||||
|
||||
for root, _, files in os.walk(search_directory):
|
||||
for root, _, files in os.walk(directory):
|
||||
for file in files:
|
||||
if file.startswith("."):
|
||||
continue
|
||||
relative_path = os.path.relpath(os.path.join(root, file), WORKSPACE_PATH)
|
||||
relative_path = os.path.relpath(
|
||||
os.path.join(root, file), CFG.workspace_path
|
||||
)
|
||||
found_files.append(relative_path)
|
||||
|
||||
return found_files
|
||||
@@ -250,7 +235,6 @@ def download_file(url, filename):
|
||||
url (str): URL of the file to download
|
||||
filename (str): Filename to save the file as
|
||||
"""
|
||||
safe_filename = path_in_workspace(filename)
|
||||
try:
|
||||
message = f"{Fore.YELLOW}Downloading file from {Back.LIGHTBLUE_EX}{url}{Back.RESET}{Fore.RESET}"
|
||||
with Spinner(message) as spinner:
|
||||
@@ -268,7 +252,7 @@ def download_file(url, filename):
|
||||
total_size = int(r.headers.get("Content-Length", 0))
|
||||
downloaded_size = 0
|
||||
|
||||
with open(safe_filename, "wb") as f:
|
||||
with open(filename, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
downloaded_size += len(chunk)
|
||||
|
||||
@@ -3,7 +3,6 @@ from git.repo import Repo
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.workspace import path_in_workspace
|
||||
|
||||
CFG = Config()
|
||||
|
||||
@@ -27,9 +26,8 @@ def clone_repository(repository_url: str, clone_path: str) -> str:
|
||||
"""
|
||||
split_url = repository_url.split("//")
|
||||
auth_repo_url = f"//{CFG.github_username}:{CFG.github_api_key}@".join(split_url)
|
||||
safe_clone_path = path_in_workspace(clone_path)
|
||||
try:
|
||||
Repo.clone_from(auth_repo_url, safe_clone_path)
|
||||
return f"""Cloned {repository_url} to {safe_clone_path}"""
|
||||
Repo.clone_from(auth_repo_url, clone_path)
|
||||
return f"""Cloned {repository_url} to {clone_path}"""
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
@@ -9,7 +9,6 @@ from PIL import Image
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.workspace import path_in_workspace
|
||||
|
||||
CFG = Config()
|
||||
|
||||
@@ -25,7 +24,7 @@ def generate_image(prompt: str, size: int = 256) -> str:
|
||||
Returns:
|
||||
str: The filename of the image
|
||||
"""
|
||||
filename = f"{str(uuid.uuid4())}.jpg"
|
||||
filename = f"{CFG.workspace_path}/{str(uuid.uuid4())}.jpg"
|
||||
|
||||
# DALL-E
|
||||
if CFG.image_provider == "dalle":
|
||||
@@ -72,7 +71,7 @@ def generate_image_with_hf(prompt: str, filename: str) -> str:
|
||||
image = Image.open(io.BytesIO(response.content))
|
||||
print(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
image.save(path_in_workspace(filename))
|
||||
image.save(filename)
|
||||
|
||||
return f"Saved to disk:{filename}"
|
||||
|
||||
@@ -109,7 +108,7 @@ def generate_image_with_dalle(prompt: str, filename: str, size: int) -> str:
|
||||
|
||||
image_data = b64decode(response["data"][0]["b64_json"])
|
||||
|
||||
with open(path_in_workspace(filename), mode="wb") as png:
|
||||
with open(filename, mode="wb") as png:
|
||||
png.write(image_data)
|
||||
|
||||
return f"Saved to disk:{filename}"
|
||||
@@ -160,6 +159,6 @@ def generate_image_with_sd_webui(
|
||||
response = response.json()
|
||||
b64 = b64decode(response["images"][0].split(",", 1)[0])
|
||||
image = Image.open(io.BytesIO(b64))
|
||||
image.save(path_in_workspace(filename))
|
||||
image.save(filename)
|
||||
|
||||
return f"Saved to disk:{filename}"
|
||||
|
||||
@@ -20,6 +20,9 @@ class Config(metaclass=Singleton):
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the Config class"""
|
||||
self.workspace_path = None
|
||||
self.file_logger_path = None
|
||||
|
||||
self.debug_mode = False
|
||||
self.continuous_mode = False
|
||||
self.continuous_limit = 0
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from autogpt.config import Config
|
||||
|
||||
CFG = Config()
|
||||
|
||||
# Set a dedicated folder for file I/O
|
||||
WORKSPACE_PATH = Path(os.getcwd()) / "auto_gpt_workspace"
|
||||
|
||||
# Create the directory if it doesn't exist
|
||||
if not os.path.exists(WORKSPACE_PATH):
|
||||
os.makedirs(WORKSPACE_PATH)
|
||||
|
||||
|
||||
def path_in_workspace(relative_path: str | Path) -> Path:
|
||||
"""Get full path for item in workspace
|
||||
|
||||
Parameters:
|
||||
relative_path (str | Path): Path to translate into the workspace
|
||||
|
||||
Returns:
|
||||
Path: Absolute path for the given path in the workspace
|
||||
"""
|
||||
return safe_path_join(WORKSPACE_PATH, relative_path)
|
||||
|
||||
|
||||
def safe_path_join(base: Path, *paths: str | Path) -> Path:
|
||||
"""Join one or more path components, asserting the resulting path is within the workspace.
|
||||
|
||||
Args:
|
||||
base (Path): The base path
|
||||
*paths (str): The paths to join to the base path
|
||||
|
||||
Returns:
|
||||
Path: The joined path
|
||||
"""
|
||||
base = base.resolve()
|
||||
joined_path = base.joinpath(*paths).resolve()
|
||||
|
||||
if CFG.restrict_to_workspace and not joined_path.is_relative_to(base):
|
||||
raise ValueError(
|
||||
f"Attempted to access path '{joined_path}' outside of workspace '{base}'."
|
||||
)
|
||||
|
||||
return joined_path
|
||||
5
autogpt/workspace/__init__.py
Normal file
5
autogpt/workspace/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from autogpt.workspace.workspace import Workspace
|
||||
|
||||
__all__ = [
|
||||
"Workspace",
|
||||
]
|
||||
120
autogpt/workspace/workspace.py
Normal file
120
autogpt/workspace/workspace.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""
|
||||
=========
|
||||
Workspace
|
||||
=========
|
||||
|
||||
The workspace is a directory containing configuration and working files for an AutoGPT
|
||||
agent.
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Workspace:
|
||||
"""A class that represents a workspace for an AutoGPT agent."""
|
||||
|
||||
def __init__(self, workspace_root: str | Path, restrict_to_workspace: bool):
|
||||
self._root = self._sanitize_path(workspace_root)
|
||||
self._restrict_to_workspace = restrict_to_workspace
|
||||
|
||||
@property
|
||||
def root(self) -> Path:
|
||||
"""The root directory of the workspace."""
|
||||
return self._root
|
||||
|
||||
@property
|
||||
def restrict_to_workspace(self):
|
||||
"""Whether to restrict generated paths to the workspace."""
|
||||
return self._restrict_to_workspace
|
||||
|
||||
@classmethod
|
||||
def make_workspace(cls, workspace_directory: str | Path, *args, **kwargs) -> Path:
|
||||
"""Create a workspace directory and return the path to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
workspace_directory
|
||||
The path to the workspace directory.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path
|
||||
The path to the workspace directory.
|
||||
|
||||
"""
|
||||
# TODO: have this make the env file and ai settings file in the directory.
|
||||
workspace_directory = cls._sanitize_path(workspace_directory)
|
||||
workspace_directory.mkdir(exist_ok=True, parents=True)
|
||||
return workspace_directory
|
||||
|
||||
def get_path(self, relative_path: str | Path) -> Path:
|
||||
"""Get the full path for an item in the workspace.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
relative_path
|
||||
The relative path to resolve in the workspace.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path
|
||||
The resolved path relative to the workspace.
|
||||
|
||||
"""
|
||||
return self._sanitize_path(
|
||||
relative_path,
|
||||
root=self.root,
|
||||
restrict_to_root=self.restrict_to_workspace,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_path(
|
||||
relative_path: str | Path,
|
||||
root: str | Path = None,
|
||||
restrict_to_root: bool = True,
|
||||
) -> Path:
|
||||
"""Resolve the relative path within the given root if possible.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
relative_path
|
||||
The relative path to resolve.
|
||||
root
|
||||
The root path to resolve the relative path within.
|
||||
restrict_to_root
|
||||
Whether to restrict the path to the root.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path
|
||||
The resolved path.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the path is absolute and a root is provided.
|
||||
ValueError
|
||||
If the path is outside the root and the root is restricted.
|
||||
|
||||
"""
|
||||
|
||||
if root is None:
|
||||
return Path(relative_path).resolve()
|
||||
|
||||
root, relative_path = Path(root), Path(relative_path)
|
||||
|
||||
if relative_path.is_absolute():
|
||||
raise ValueError(
|
||||
f"Attempted to access absolute path '{relative_path}' in workspace '{root}'."
|
||||
)
|
||||
|
||||
full_path = root.joinpath(relative_path).resolve()
|
||||
|
||||
if restrict_to_root and not full_path.is_relative_to(root):
|
||||
raise ValueError(
|
||||
f"Attempted to access path '{full_path}' outside of workspace '{root}'."
|
||||
)
|
||||
|
||||
return full_path
|
||||
Reference in New Issue
Block a user