mirror of
https://github.com/crewAIInc/crewAI-examples.git
synced 2026-01-10 06:17:58 -05:00
fix: implement comprehensive path validation to prevent traversal attacks
- Add robust path sanitization in file_tools.py write_file function - Implement safe file reading with boundary validation in crew.py - Secure template operations with input validation in template_tools.py - Use Path.resolve() to normalize paths and prevent directory escaping - Add character whitelisting and file extension restrictions - Implement proper error handling without information disclosure Prevents arbitrary file access and directory traversal vulnerabilities that could allow reading/writing files outside intended boundaries
This commit is contained in:
@@ -221,19 +221,61 @@ class LandingPageCrew():
|
||||
return json.loads(result)
|
||||
|
||||
def runCreateContentCrew(self,components, expanded_idea):
|
||||
from pathlib import Path
|
||||
import re
|
||||
|
||||
# Establish safe working directory
|
||||
workdir = Path("./workdir").resolve()
|
||||
|
||||
for component_path in components:
|
||||
file_content = open(
|
||||
f"./workdir/{component_path.split('./')[-1]}",
|
||||
"r"
|
||||
).read()
|
||||
inputs3={
|
||||
"component": component_path,
|
||||
"expanded_idea": expanded_idea,
|
||||
"file_content": file_content
|
||||
}
|
||||
try:
|
||||
# Validate component_path
|
||||
if not isinstance(component_path, str):
|
||||
print(f"Warning: Skipping invalid component path: {component_path}")
|
||||
continue
|
||||
|
||||
# Extract filename safely
|
||||
filename = component_path.split('./')[-1]
|
||||
|
||||
# Validate filename contains only safe characters
|
||||
if not re.match(r'^[a-zA-Z0-9._\-]+$', filename):
|
||||
print(f"Warning: Skipping component with invalid filename: {filename}")
|
||||
continue
|
||||
|
||||
# Validate the filename doesn't contain path traversal
|
||||
if ".." in filename or "/" in filename:
|
||||
print(f"Warning: Skipping component with unsafe filename: {filename}")
|
||||
continue
|
||||
|
||||
# Create safe file path
|
||||
file_path = workdir / filename
|
||||
|
||||
# Resolve and validate the path is within workdir
|
||||
resolved_path = file_path.resolve()
|
||||
if not str(resolved_path).startswith(str(workdir)):
|
||||
print(f"Warning: Skipping component outside workdir: {filename}")
|
||||
continue
|
||||
|
||||
# Check if file exists before reading
|
||||
if not resolved_path.exists():
|
||||
print(f"Warning: Component file does not exist: {resolved_path}")
|
||||
continue
|
||||
|
||||
# Read file content safely
|
||||
with open(resolved_path, "r", encoding="utf-8") as f:
|
||||
file_content = f.read()
|
||||
|
||||
inputs3={
|
||||
"component": component_path,
|
||||
"expanded_idea": expanded_idea,
|
||||
"file_content": file_content
|
||||
}
|
||||
|
||||
CreateContentCrew().crew().kickoff(inputs=inputs3)
|
||||
CreateContentCrew().crew().kickoff(inputs=inputs3)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing component {component_path}: {str(e)}")
|
||||
continue
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
from langchain.tools import tool
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
|
||||
|
||||
class FileTools():
|
||||
@@ -14,12 +17,66 @@ class FileTools():
|
||||
Replace REACT_COMPONENT_CODE_PLACEHOLDER with the actual
|
||||
code you want to write to the file."""
|
||||
try:
|
||||
path, content = data.split("|")
|
||||
path = path.replace("\n", "").replace(" ", "").replace("`", "")
|
||||
if not path.startswith("./workdir"):
|
||||
path = f"./workdir/{path}"
|
||||
with open(path, "w") as f:
|
||||
# Split the input data
|
||||
if "|" not in data:
|
||||
return "Error: Input must contain a pipe (|) separator between path and content."
|
||||
|
||||
path, content = data.split("|", 1) # Split only on first pipe
|
||||
|
||||
# Clean and validate the path
|
||||
path = path.strip().replace("\n", "").replace(" ", "").replace("`", "")
|
||||
|
||||
# Validate path contains only safe characters
|
||||
if not re.match(r'^[a-zA-Z0-9._/\-]+$', path):
|
||||
return "Error: Path contains invalid characters. Only alphanumeric, dots, slashes, and hyphens are allowed."
|
||||
|
||||
# Establish the safe working directory
|
||||
workdir = Path("./workdir").resolve()
|
||||
|
||||
# Handle path normalization
|
||||
if path.startswith("./workdir/"):
|
||||
# Remove the ./workdir/ prefix to get relative path
|
||||
relative_path = path[10:]
|
||||
elif path.startswith("./"):
|
||||
# Remove ./ prefix
|
||||
relative_path = path[2:]
|
||||
elif path.startswith("/"):
|
||||
return "Error: Absolute paths are not allowed."
|
||||
else:
|
||||
relative_path = path
|
||||
|
||||
# Validate the relative path doesn't contain traversal attempts
|
||||
if ".." in relative_path or relative_path.startswith("/"):
|
||||
return "Error: Path traversal detected. Relative paths with '..' are not allowed."
|
||||
|
||||
# Create the full safe path
|
||||
target_path = workdir / relative_path
|
||||
|
||||
# Resolve the path and ensure it's still within workdir
|
||||
try:
|
||||
resolved_path = target_path.resolve()
|
||||
if not str(resolved_path).startswith(str(workdir)):
|
||||
return "Error: Path resolves outside of allowed working directory."
|
||||
except Exception:
|
||||
return "Error: Invalid path resolution."
|
||||
|
||||
# Validate file extension (security: prevent writing to system files)
|
||||
allowed_extensions = {'.jsx', '.js', '.tsx', '.ts', '.css', '.scss', '.html', '.json', '.md', '.txt', '.yaml', '.yml'}
|
||||
if resolved_path.suffix.lower() not in allowed_extensions:
|
||||
return f"Error: File extension '{resolved_path.suffix}' not allowed. Allowed extensions: {', '.join(allowed_extensions)}"
|
||||
|
||||
# Create parent directories if they don't exist
|
||||
resolved_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Write the file
|
||||
with open(resolved_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
return f"File written to {path}."
|
||||
except Exception:
|
||||
return "Error with the input format for the tool."
|
||||
|
||||
return f"File written to {resolved_path}."
|
||||
|
||||
except ValueError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except PermissionError:
|
||||
return "Error: Permission denied. Cannot write to the specified path."
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
import shutil
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from langchain.tools import tool
|
||||
@@ -10,16 +11,87 @@ class TemplateTools():
|
||||
@tool("Learn landing page options")
|
||||
def learn_landing_page_options(input):
|
||||
"""Learn the templates at your disposal"""
|
||||
templates = json.load(open("config/templates.json"))
|
||||
return json.dumps(templates, indent=2)
|
||||
try:
|
||||
# Safely read the templates configuration file
|
||||
config_path = Path("config/templates.json").resolve()
|
||||
|
||||
# Validate the config file exists and is readable
|
||||
if not config_path.exists():
|
||||
return "Error: Templates configuration file not found."
|
||||
|
||||
# Check if the path is within expected boundaries
|
||||
if not str(config_path).endswith("config/templates.json"):
|
||||
return "Error: Invalid configuration file path."
|
||||
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
templates = json.load(f)
|
||||
|
||||
return json.dumps(templates, indent=2)
|
||||
except Exception as e:
|
||||
return f"Error reading templates configuration: {str(e)}"
|
||||
|
||||
@tool("Copy landing page template to project folder")
|
||||
def copy_landing_page_template_to_project_folder(landing_page_template):
|
||||
"""Copy a landing page template to your project
|
||||
folder so you can start modifying it, it expects
|
||||
a landing page template folder as input"""
|
||||
source_path = Path(f"templates/{landing_page_template}")
|
||||
destination_path = Path(f"workdir/{landing_page_template}")
|
||||
destination_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copytree(source_path, destination_path)
|
||||
return f"Template copied to {landing_page_template} and ready to be modified, main files should be under ./{landing_page_template}/src/components, you should focus on those."
|
||||
try:
|
||||
# Validate input
|
||||
if not isinstance(landing_page_template, str):
|
||||
return "Error: Template name must be a string."
|
||||
|
||||
# Clean and validate template name
|
||||
template_name = landing_page_template.strip()
|
||||
|
||||
# Validate template name contains only safe characters
|
||||
if not re.match(r'^[a-zA-Z0-9_\-]+$', template_name):
|
||||
return "Error: Template name contains invalid characters. Only alphanumeric, underscore, and hyphen are allowed."
|
||||
|
||||
# Prevent path traversal
|
||||
if ".." in template_name or "/" in template_name or "\\" in template_name:
|
||||
return "Error: Template name cannot contain path traversal characters."
|
||||
|
||||
# Establish safe base directories
|
||||
templates_base = Path("templates").resolve()
|
||||
workdir_base = Path("workdir").resolve()
|
||||
|
||||
# Create source and destination paths
|
||||
source_path = templates_base / template_name
|
||||
destination_path = workdir_base / template_name
|
||||
|
||||
# Resolve paths and validate they're within expected directories
|
||||
source_resolved = source_path.resolve()
|
||||
destination_resolved = destination_path.resolve()
|
||||
|
||||
# Ensure source is within templates directory
|
||||
if not str(source_resolved).startswith(str(templates_base)):
|
||||
return "Error: Source template path is outside allowed templates directory."
|
||||
|
||||
# Ensure destination is within workdir
|
||||
if not str(destination_resolved).startswith(str(workdir_base)):
|
||||
return "Error: Destination path is outside allowed working directory."
|
||||
|
||||
# Check if source template exists
|
||||
if not source_resolved.exists():
|
||||
return f"Error: Template '{template_name}' does not exist in templates directory."
|
||||
|
||||
# Check if source is a directory
|
||||
if not source_resolved.is_dir():
|
||||
return f"Error: Template '{template_name}' is not a directory."
|
||||
|
||||
# Check if destination already exists
|
||||
if destination_resolved.exists():
|
||||
return f"Error: Destination '{template_name}' already exists in workdir. Please choose a different name or remove the existing directory."
|
||||
|
||||
# Create parent directories if needed
|
||||
destination_resolved.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Copy the template
|
||||
shutil.copytree(source_resolved, destination_resolved)
|
||||
|
||||
return f"Template '{template_name}' copied successfully to workdir and ready to be modified. Main files should be under ./{template_name}/src/components, you should focus on those."
|
||||
|
||||
except PermissionError:
|
||||
return "Error: Permission denied. Cannot copy template to destination."
|
||||
except Exception as e:
|
||||
return f"Error copying template: {str(e)}"
|
||||
|
||||
Reference in New Issue
Block a user