refactor: update allowed_paths default behavior in environment tools

Modified the default value of allowed_paths in BaseEnvironmentTool and EnvironmentTools to default to the current directory (".") instead of an empty list. This change enhances security by ensuring that operations are restricted to the current directory unless explicitly specified otherwise. Additionally, updated related tests to reflect this new default behavior and improved type hints for better clarity.
This commit is contained in:
lorenzejay
2026-01-08 16:33:13 -08:00
parent 4364503615
commit 9d33706fd5
5 changed files with 88 additions and 50 deletions

View File

@@ -22,8 +22,8 @@ class BaseEnvironmentTool(BaseTool):
"""
allowed_paths: list[str] = Field(
default_factory=list,
description="Restrict operations to these paths. Empty list allows all.",
default_factory=lambda: ["."],
description="Restrict operations to these paths. Defaults to current directory.",
)
def _validate_path(self, path: str) -> tuple[bool, Path | str]:

View File

@@ -48,11 +48,12 @@ class EnvironmentTools:
Args:
allowed_paths: List of paths to restrict operations to.
Empty or None allows access to all paths.
Defaults to current directory ["."] if None.
Pass empty list [] to allow all paths (not recommended).
include_grep: Whether to include GrepTool (requires grep installed).
include_search: Whether to include FileSearchTool.
"""
self.allowed_paths = allowed_paths or []
self.allowed_paths = allowed_paths if allowed_paths is not None else ["."]
self.include_grep = include_grep
self.include_search = include_search

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
@@ -67,6 +69,7 @@ Examples:
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
file_path = result
# Check file exists and is a file

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
@@ -68,6 +70,7 @@ Examples:
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
dir_path = result
# Check directory exists

View File

@@ -1,12 +1,16 @@
"""Tests for experimental environment tools."""
from __future__ import annotations
import os
import tempfile
from collections.abc import Generator
from pathlib import Path
import pytest
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
@@ -21,7 +25,7 @@ from crewai.experimental.environment_tools import (
@pytest.fixture
def temp_dir():
def temp_dir() -> Generator[str, None, None]:
"""Create a temporary directory with test files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files
@@ -41,7 +45,7 @@ def temp_dir():
@pytest.fixture
def restricted_temp_dir():
def restricted_temp_dir() -> Generator[tuple[str, str], None, None]:
"""Create two directories - one allowed, one not."""
with tempfile.TemporaryDirectory() as allowed_dir:
with tempfile.TemporaryDirectory() as forbidden_dir:
@@ -60,15 +64,21 @@ def restricted_temp_dir():
class TestBaseEnvironmentTool:
"""Tests for BaseEnvironmentTool path validation."""
def test_validate_path_no_restrictions(self, temp_dir):
"""With no allowed_paths, all paths should be allowed."""
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default allowed_paths should be current directory for security."""
tool = FileReadTool()
assert tool.allowed_paths == ["."]
def test_validate_path_explicit_no_restrictions(self, temp_dir: str) -> None:
"""With explicit empty allowed_paths, all paths should be allowed."""
tool = FileReadTool(allowed_paths=[])
valid, result = tool._validate_path(temp_dir)
assert valid is True
assert isinstance(result, Path)
def test_validate_path_within_allowed(self, temp_dir):
def test_validate_path_within_allowed(self, temp_dir: str) -> None:
"""Paths within allowed_paths should be valid."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
@@ -78,7 +88,7 @@ class TestBaseEnvironmentTool:
assert valid is True
assert isinstance(result, Path)
def test_validate_path_outside_allowed(self, restricted_temp_dir):
def test_validate_path_outside_allowed(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Paths outside allowed_paths should be rejected."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
@@ -87,9 +97,10 @@ class TestBaseEnvironmentTool:
valid, result = tool._validate_path(forbidden_file)
assert valid is False
assert isinstance(result, str)
assert "outside allowed paths" in result
def test_format_size(self):
def test_format_size(self) -> None:
"""Test human-readable size formatting."""
tool = FileReadTool()
@@ -108,9 +119,9 @@ class TestBaseEnvironmentTool:
class TestFileReadTool:
"""Tests for FileReadTool."""
def test_read_entire_file(self, temp_dir):
def test_read_entire_file(self, temp_dir: str) -> None:
"""Should read entire file contents."""
tool = FileReadTool()
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file)
@@ -120,9 +131,9 @@ class TestFileReadTool:
assert "Line 5" in result
assert "File:" in result # Metadata header
def test_read_with_line_range(self, temp_dir):
def test_read_with_line_range(self, temp_dir: str) -> None:
"""Should read specific line range."""
tool = FileReadTool()
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file, start_line=2, line_count=2)
@@ -132,16 +143,16 @@ class TestFileReadTool:
# Should not include lines outside range
assert "Line 1" not in result.split("=" * 60)[-1] # Check content after header
def test_read_file_not_found(self, temp_dir):
def test_read_file_not_found(self, temp_dir: str) -> None:
"""Should return error for missing file."""
tool = FileReadTool()
tool = FileReadTool(allowed_paths=[temp_dir])
missing_file = os.path.join(temp_dir, "nonexistent.txt")
result = tool._run(path=missing_file)
assert "Error: File not found" in result
def test_read_file_path_restricted(self, restricted_temp_dir):
def test_read_file_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
@@ -161,9 +172,9 @@ class TestFileReadTool:
class TestListDirTool:
"""Tests for ListDirTool."""
def test_list_directory(self, temp_dir):
def test_list_directory(self, temp_dir: str) -> None:
"""Should list directory contents."""
tool = ListDirTool()
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir)
@@ -172,33 +183,33 @@ class TestListDirTool:
assert "subdir" in result
assert "Total:" in result
def test_list_with_pattern(self, temp_dir):
def test_list_with_pattern(self, temp_dir: str) -> None:
"""Should filter by pattern."""
tool = ListDirTool()
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, pattern="*.py")
assert "example.py" in result
assert "test.txt" not in result
def test_list_recursive(self, temp_dir):
def test_list_recursive(self, temp_dir: str) -> None:
"""Should list recursively when enabled."""
tool = ListDirTool()
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, recursive=True)
assert "nested.txt" in result
assert "another.py" in result
def test_list_nonexistent_directory(self, temp_dir):
def test_list_nonexistent_directory(self, temp_dir: str) -> None:
"""Should return error for missing directory."""
tool = ListDirTool()
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=os.path.join(temp_dir, "nonexistent"))
assert "Error: Directory not found" in result
def test_list_path_restricted(self, restricted_temp_dir):
def test_list_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = ListDirTool(allowed_paths=[allowed_dir])
@@ -217,9 +228,9 @@ class TestListDirTool:
class TestGrepTool:
"""Tests for GrepTool."""
def test_grep_finds_pattern(self, temp_dir):
def test_grep_finds_pattern(self, temp_dir: str) -> None:
"""Should find matching patterns."""
tool = GrepTool()
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="Line 2", path=test_file)
@@ -227,33 +238,33 @@ class TestGrepTool:
assert "Line 2" in result
assert "matches" in result.lower() or "found" in result.lower()
def test_grep_no_matches(self, temp_dir):
def test_grep_no_matches(self, temp_dir: str) -> None:
"""Should report when no matches found."""
tool = GrepTool()
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="nonexistent pattern xyz", path=test_file)
assert "No matches found" in result
def test_grep_recursive(self, temp_dir):
def test_grep_recursive(self, temp_dir: str) -> None:
"""Should search recursively in directories."""
tool = GrepTool()
tool = GrepTool(allowed_paths=[temp_dir])
result = tool._run(pattern="Nested", path=temp_dir, recursive=True)
assert "Nested" in result
def test_grep_case_insensitive(self, temp_dir):
def test_grep_case_insensitive(self, temp_dir: str) -> None:
"""Should support case-insensitive search."""
tool = GrepTool()
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="LINE", path=test_file, ignore_case=True)
assert "Line" in result or "matches" in result.lower()
def test_grep_path_restricted(self, restricted_temp_dir):
def test_grep_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = GrepTool(allowed_paths=[allowed_dir])
@@ -272,26 +283,26 @@ class TestGrepTool:
class TestFileSearchTool:
"""Tests for FileSearchTool."""
def test_find_files_by_pattern(self, temp_dir):
def test_find_files_by_pattern(self, temp_dir: str) -> None:
"""Should find files matching pattern."""
tool = FileSearchTool()
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.py", path=temp_dir)
assert "example.py" in result
assert "another.py" in result
def test_find_no_matches(self, temp_dir):
def test_find_no_matches(self, temp_dir: str) -> None:
"""Should report when no files match."""
tool = FileSearchTool()
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.xyz", path=temp_dir)
assert "No" in result and "found" in result
def test_find_files_only(self, temp_dir):
def test_find_files_only(self, temp_dir: str) -> None:
"""Should filter to files only."""
tool = FileSearchTool()
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="file")
@@ -300,15 +311,15 @@ class TestFileSearchTool:
# Directories should have trailing slash in output
# Check that subdir is not listed as a file
def test_find_dirs_only(self, temp_dir):
def test_find_dirs_only(self, temp_dir: str) -> None:
"""Should filter to directories only."""
tool = FileSearchTool()
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="dir")
assert "subdir" in result
def test_find_path_restricted(self, restricted_temp_dir):
def test_find_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileSearchTool(allowed_paths=[allowed_dir])
@@ -327,7 +338,26 @@ class TestFileSearchTool:
class TestEnvironmentTools:
"""Tests for EnvironmentTools manager class."""
def test_returns_all_tools_by_default(self):
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default should restrict to current directory for security."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
# All tools should default to current directory
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == ["."]
def test_explicit_empty_allowed_paths_allows_all(self) -> None:
"""Passing empty list should allow all paths."""
env_tools = EnvironmentTools(allowed_paths=[])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == []
def test_returns_all_tools_by_default(self) -> None:
"""Should return all four tools by default."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
@@ -340,7 +370,7 @@ class TestEnvironmentTools:
assert "grep_search" in tool_names
assert "find_files" in tool_names
def test_exclude_grep(self):
def test_exclude_grep(self) -> None:
"""Should exclude grep tool when disabled."""
env_tools = EnvironmentTools(include_grep=False)
tools = env_tools.tools()
@@ -349,7 +379,7 @@ class TestEnvironmentTools:
tool_names = [t.name for t in tools]
assert "grep_search" not in tool_names
def test_exclude_search(self):
def test_exclude_search(self) -> None:
"""Should exclude search tool when disabled."""
env_tools = EnvironmentTools(include_search=False)
tools = env_tools.tools()
@@ -358,15 +388,16 @@ class TestEnvironmentTools:
tool_names = [t.name for t in tools]
assert "find_files" not in tool_names
def test_allowed_paths_propagated(self, temp_dir):
def test_allowed_paths_propagated(self, temp_dir: str) -> None:
"""Should propagate allowed_paths to all tools."""
env_tools = EnvironmentTools(allowed_paths=[temp_dir])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == [temp_dir]
def test_tools_are_base_tool_instances(self):
def test_tools_are_base_tool_instances(self) -> None:
"""All returned tools should be BaseTool instances."""
from crewai.tools.base_tool import BaseTool