Moved file_validator to use safeuploads

This commit is contained in:
João Vitória Silva
2025-10-30 13:41:06 +00:00
parent 3809918938
commit 6337a8cfa0
17 changed files with 25 additions and 3533 deletions

View File

@@ -1,92 +0,0 @@
"""
File Security Module
A comprehensive file security system for validating uploads and preventing attacks.
"""
# Core classes and configurations
from .config import SecurityLimits, FileSecurityConfig
from .exceptions import (
ConfigValidationError,
FileSecurityConfigurationError,
ErrorCode,
FileSecurityError,
FileValidationError,
FilenameSecurityError,
UnicodeSecurityError,
ExtensionSecurityError,
WindowsReservedNameError,
FileSizeError,
MimeTypeError,
FileSignatureError,
CompressionSecurityError,
ZipBombError,
ZipContentError,
FileProcessingError,
)
from .enums import (
DangerousExtensionCategory,
CompoundExtensionCategory,
UnicodeAttackCategory,
SuspiciousFilePattern,
ZipThreatCategory,
)
# Main validator
from .file_validator import FileValidator
# Specialized validators
from .validators import (
BaseValidator,
UnicodeSecurityValidator,
ExtensionSecurityValidator,
WindowsSecurityValidator,
CompressionSecurityValidator,
)
# Inspectors
from .inspectors import ZipContentInspector
# Validate the security configuration once, at import time, so that
# configuration problems are logged during application startup. The call
# passes strict=False, which makes validate_and_report log its findings
# without raising.
FileSecurityConfig.validate_and_report(strict=False)

# Names re-exported as the public API of this package.
__all__ = [
    # Core configuration
    "SecurityLimits",
    "FileSecurityConfig",
    # Exceptions
    "ConfigValidationError",
    "FileSecurityConfigurationError",
    "ErrorCode",
    "FileSecurityError",
    "FileValidationError",
    "FilenameSecurityError",
    "UnicodeSecurityError",
    "ExtensionSecurityError",
    "WindowsReservedNameError",
    "FileSizeError",
    "MimeTypeError",
    "FileSignatureError",
    "CompressionSecurityError",
    "ZipBombError",
    "ZipContentError",
    "FileProcessingError",
    # Enums
    "DangerousExtensionCategory",
    "CompoundExtensionCategory",
    "UnicodeAttackCategory",
    "SuspiciousFilePattern",
    "ZipThreatCategory",
    # Main validator
    "FileValidator",
    # Specialized validators
    "BaseValidator",
    "UnicodeSecurityValidator",
    "ExtensionSecurityValidator",
    "WindowsSecurityValidator",
    "CompressionSecurityValidator",
    # Inspectors
    "ZipContentInspector",
]

View File

@@ -1,842 +0,0 @@
"""
File security configuration module.
"""
from dataclasses import dataclass
import logging
from .enums import (
DangerousExtensionCategory,
CompoundExtensionCategory,
UnicodeAttackCategory,
)
from .exceptions import ConfigValidationError, FileSecurityConfigurationError
logger = logging.getLogger(__name__)
@dataclass
class SecurityLimits:
    """
    Numeric limits and feature switches applied to uploaded files.

    Sizes are expressed in bytes and the timeout in seconds.

    Attributes:
        max_image_size: Maximum size in bytes for image files.
        max_zip_size: Maximum size in bytes for ZIP archives.
        max_compression_ratio: Maximum expansion ratio for ZIP files.
        max_uncompressed_size: Maximum cumulative size of ZIP contents.
        max_individual_file_size: Maximum size of single file in ZIP.
        max_zip_entries: Maximum number of file entries in ZIP.
        zip_analysis_timeout: Maximum seconds for ZIP analysis.
        max_zip_depth: Maximum directory nesting depth in ZIP.
        max_filename_length: Maximum length for filenames in ZIP.
        max_path_length: Maximum length for full paths in ZIP.
        allow_nested_archives: Whether nested archives are permitted.
        allow_symlinks: Whether symbolic links are permitted.
        allow_absolute_paths: Whether absolute paths are permitted.
        scan_zip_content: Whether deep content inspection is enabled.
    """

    # --- Upload size ceilings (bytes); 2**20 is one MiB, 2**30 one GiB ---
    max_image_size: int = 20 * 2**20  # 20MB for images
    max_zip_size: int = 500 * 2**20  # 500MB for ZIP files

    # --- ZIP compression safeguards ---
    max_compression_ratio: int = 100  # allowed expansion ratio (100:1)
    max_uncompressed_size: int = 2**30  # 1GB of total uncompressed data
    max_individual_file_size: int = 500 * 2**20  # 500MB per archive member
    max_zip_entries: int = 10_000  # number of entries in one archive
    zip_analysis_timeout: float = 5.0  # seconds allowed for ZIP analysis

    # --- ZIP content inspection ---
    max_zip_depth: int = 10  # directory nesting depth
    max_filename_length: int = 255  # length of a single file name
    max_path_length: int = 1024  # length of a full member path
    allow_nested_archives: bool = False  # archives inside the archive
    allow_symlinks: bool = False  # symbolic links inside the archive
    allow_absolute_paths: bool = False  # absolute member paths
    scan_zip_content: bool = True  # deep content inspection switch
class FileSecurityConfig:
    """
    Centralizes file upload security settings and validation.

    Every setting is a class attribute and every method is a class or static
    method, so the configuration can be read and validated without creating
    an instance.

    Attributes:
        limits: Security limits configuration instance.
        ALLOWED_IMAGE_MIMES: Permitted MIME types for images.
        ALLOWED_ZIP_MIMES: Permitted MIME types for ZIP files.
        ALLOWED_IMAGE_EXTENSIONS: Permitted image file extensions.
        ALLOWED_ZIP_EXTENSIONS: Permitted ZIP file extensions.
        BLOCKED_EXTENSIONS: Dangerous file extensions to block.
        COMPOUND_BLOCKED_EXTENSIONS: Multi-part extensions to block.
        DANGEROUS_UNICODE_CHARS: Unicode characters for filename attacks.
        WINDOWS_RESERVED_NAMES: Platform-specific reserved filenames.
    """

    # Security limits configuration (one instance shared through the class)
    limits = SecurityLimits()

    # Allowed MIME types for images
    ALLOWED_IMAGE_MIMES: set[str] = {"image/jpeg", "image/jpg", "image/png"}

    # Allowed MIME types for ZIP files
    ALLOWED_ZIP_MIMES: set[str] = {
        "application/zip",
        "application/x-zip-compressed",
        "multipart/x-zip",
    }

    # Allowed file extensions
    ALLOWED_IMAGE_EXTENSIONS: set[str] = {".jpg", ".jpeg", ".png"}
    ALLOWED_ZIP_EXTENSIONS: set[str] = {".zip"}

    # NOTE: the three generators below are called while the class body is
    # still executing. Calling a staticmethod object directly like this is
    # only supported on Python 3.10 and newer.
    @staticmethod
    def _generate_blocked_extensions() -> set[str]:
        """
        Aggregate all dangerous extension categories.

        Returns:
            Combined set of blocked file extensions.
        """
        return set().union(
            *(category.value for category in DangerousExtensionCategory)
        )

    @staticmethod
    def _generate_compound_blocked_extensions() -> set[str]:
        """
        Aggregate all compound extension categories.

        Returns:
            Combined set of blocked compound file extensions.
        """
        return set().union(
            *(category.value for category in CompoundExtensionCategory)
        )

    @staticmethod
    def _generate_dangerous_unicode_chars() -> set[int]:
        """
        Aggregate all dangerous Unicode code points.

        Returns:
            Combined set of dangerous Unicode code points.
        """
        return set().union(
            *(category.value for category in UnicodeAttackCategory)
        )

    # Dangerous file extensions to explicitly block (generated from enums)
    BLOCKED_EXTENSIONS: set[str] = _generate_blocked_extensions()

    # Compound dangerous file extensions (multi-part extensions)
    # These are checked as complete strings, not individual parts
    COMPOUND_BLOCKED_EXTENSIONS: set[str] = _generate_compound_blocked_extensions()

    # Dangerous Unicode characters that can be used for filename attacks
    # These characters can disguise file extensions or cause rendering issues
    DANGEROUS_UNICODE_CHARS: set[int] = _generate_dangerous_unicode_chars()

    # Windows reserved names that cannot be used as filenames
    # These names are reserved by Windows regardless of extension
    WINDOWS_RESERVED_NAMES: set[str] = {
        "con", "prn", "aux", "nul",
        "com1", "com2", "com3", "com4", "com5",
        "com6", "com7", "com8", "com9",
        "lpt1", "lpt2", "lpt3", "lpt4", "lpt5",
        "lpt6", "lpt7", "lpt8", "lpt9",
    }

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Validate the configuration whenever a subclass is defined.

        Validation runs in non-strict mode. Any exception it raises is
        logged as a warning instead of propagating, so defining a subclass
        never fails because of validation.

        Args:
            **kwargs: Subclass initialization arguments.
        """
        super().__init_subclass__(**kwargs)
        # Perform validation with warnings allowed (non-strict mode)
        try:
            cls.validate_and_report(strict=False)
        except Exception as err:
            logger.warning("Configuration validation failed: %s", err)

    @classmethod
    def get_extensions_by_category(
        cls, category: DangerousExtensionCategory
    ) -> set[str]:
        """
        Return extensions for a dangerous extension category.

        Args:
            category: The dangerous extension category.

        Returns:
            Copy of extensions in the specified category.
        """
        return set(category.value)

    @classmethod
    def get_compound_extensions_by_category(
        cls, category: CompoundExtensionCategory
    ) -> set[str]:
        """
        Return compound extensions for a category.

        Args:
            category: The compound extension category.

        Returns:
            Copy of compound extensions in the specified category.
        """
        return set(category.value)

    @classmethod
    def get_unicode_chars_by_category(
        cls, category: UnicodeAttackCategory
    ) -> set[int]:
        """
        Return Unicode code points for an attack category.

        Args:
            category: The Unicode attack category.

        Returns:
            Copy of code points in the specified category.
        """
        return set(category.value)

    @classmethod
    def is_extension_in_category(
        cls, extension: str, category: DangerousExtensionCategory
    ) -> bool:
        """
        Check if extension belongs to a dangerous category.

        The extension is lowercased before the lookup.

        Args:
            extension: File extension to evaluate.
            category: Category to check against.

        Returns:
            True if extension is in the category, False otherwise.
        """
        return extension.lower() in category.value

    @classmethod
    def get_extension_category(
        cls, extension: str
    ) -> DangerousExtensionCategory | None:
        """
        Return the dangerous extension category for an extension.

        The extension is lowercased before the lookup, and categories are
        searched in enum definition order.

        Args:
            extension: The file extension to evaluate.

        Returns:
            First matching category if dangerous, None otherwise.
        """
        extension_lower = extension.lower()
        return next(
            (
                category
                for category in DangerousExtensionCategory
                if extension_lower in category.value
            ),
            None,
        )

    @classmethod
    def validate_configuration(
        cls, strict: bool = True
    ) -> list[ConfigValidationError]:
        """
        Run all configuration validation routines.

        Args:
            strict: Reserved for future behavior adjustments; currently
                unused.

        Returns:
            List of detected validation issues, in the order the individual
            validators run.
        """
        validators = (
            cls._validate_file_size_limits,
            cls._validate_mime_configurations,
            cls._validate_extension_configurations,
            cls._validate_compression_settings,
            cls._validate_enum_consistency,
            cls._validate_cross_dependencies,
        )
        errors: list[ConfigValidationError] = []
        for run_validator in validators:
            errors.extend(run_validator())
        return errors

    @classmethod
    def _validate_file_size_limits(cls) -> list[ConfigValidationError]:
        """
        Validate configured file size limits.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []
        limits = cls.limits
        megabyte = 1024 * 1024

        # Check image size limits
        if limits.max_image_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_size_limit",
                    message="max_image_size must be greater than 0",
                    severity="error",
                    component="file_sizes",
                    recommendation="Set max_image_size to a positive value (e.g., 20MB)",
                )
            )
        if limits.max_image_size > 100 * megabyte:  # 100MB
            errors.append(
                ConfigValidationError(
                    error_type="excessive_size_limit",
                    message=f"max_image_size ({limits.max_image_size // megabyte}MB) is very large",
                    severity="warning",
                    component="file_sizes",
                    recommendation="Consider reducing image size limit to prevent resource exhaustion",
                )
            )

        # Check ZIP size limits
        if limits.max_zip_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_size_limit",
                    message="max_zip_size must be greater than 0",
                    severity="error",
                    component="file_sizes",
                    recommendation="Set max_zip_size to a positive value (e.g., 500MB)",
                )
            )
        if limits.max_zip_size > 2 * 1024 * megabyte:  # 2GB
            errors.append(
                ConfigValidationError(
                    error_type="excessive_size_limit",
                    message=f"max_zip_size ({limits.max_zip_size // megabyte}MB) is very large",
                    severity="warning",
                    component="file_sizes",
                    recommendation="Consider reducing ZIP size limit to prevent resource exhaustion",
                )
            )

        # Validate size relationship
        if limits.max_zip_size <= limits.max_image_size:
            errors.append(
                ConfigValidationError(
                    error_type="inconsistent_size_limits",
                    message="max_zip_size should typically be larger than max_image_size",
                    severity="warning",
                    component="file_sizes",
                    recommendation="ZIP files usually contain multiple files and should have higher limits",
                )
            )
        return errors

    @classmethod
    def _validate_mime_configurations(cls) -> list[ConfigValidationError]:
        """
        Validate MIME type configurations.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check image MIME types
        if not cls.ALLOWED_IMAGE_MIMES:
            errors.append(
                ConfigValidationError(
                    error_type="empty_mime_set",
                    message="ALLOWED_IMAGE_MIMES cannot be empty",
                    severity="error",
                    component="mime_types",
                    recommendation="Add at least one allowed image MIME type",
                )
            )

        # Validate image MIME type format
        for mime_type in cls.ALLOWED_IMAGE_MIMES:
            if not mime_type.startswith("image/"):
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_image_mime",
                        message=f"Image MIME type '{mime_type}' should start with 'image/'",
                        severity="warning",
                        component="mime_types",
                        recommendation="Use standard image MIME types like 'image/jpeg', 'image/png'",
                    )
                )

        # Check ZIP MIME types
        if not cls.ALLOWED_ZIP_MIMES:
            errors.append(
                ConfigValidationError(
                    error_type="empty_mime_set",
                    message="ALLOWED_ZIP_MIMES cannot be empty",
                    severity="error",
                    component="mime_types",
                    recommendation="Add at least one allowed ZIP MIME type",
                )
            )

        # Check for duplicate MIME types in a single pass: anything seen a
        # second time across the two collections is a duplicate.
        seen: set[str] = set()
        duplicates: set[str] = set()
        for mime_type in (*cls.ALLOWED_IMAGE_MIMES, *cls.ALLOWED_ZIP_MIMES):
            if mime_type in seen:
                duplicates.add(mime_type)
            else:
                seen.add(mime_type)
        if duplicates:
            errors.append(
                ConfigValidationError(
                    error_type="duplicate_mime_types",
                    message=f"Duplicate MIME types found: {duplicates}",
                    severity="warning",
                    component="mime_types",
                    recommendation="Remove duplicate MIME types to avoid confusion",
                )
            )
        return errors

    @classmethod
    def _validate_extension_configurations(cls) -> list[ConfigValidationError]:
        """
        Validate file extension configurations.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check that each allow-list is non-empty and well formed
        for ext_set_name, ext_set in (
            ("ALLOWED_IMAGE_EXTENSIONS", cls.ALLOWED_IMAGE_EXTENSIONS),
            ("ALLOWED_ZIP_EXTENSIONS", cls.ALLOWED_ZIP_EXTENSIONS),
        ):
            if not ext_set:
                errors.append(
                    ConfigValidationError(
                        error_type="empty_extension_set",
                        message=f"{ext_set_name} cannot be empty",
                        severity="error",
                        component="extensions",
                        recommendation=f"Add at least one extension to {ext_set_name}",
                    )
                )
            for ext in ext_set:
                if not ext.startswith("."):
                    errors.append(
                        ConfigValidationError(
                            error_type="invalid_extension_format",
                            message=f"Extension '{ext}' in {ext_set_name} should start with '.'",
                            severity="error",
                            component="extensions",
                            recommendation="Use format '.ext' for file extensions",
                        )
                    )

        # Check blocked extensions
        if not cls.BLOCKED_EXTENSIONS:
            errors.append(
                ConfigValidationError(
                    error_type="empty_blocked_extensions",
                    message="BLOCKED_EXTENSIONS is empty - security risk",
                    severity="error",
                    component="extensions",
                    recommendation="Ensure dangerous extensions are properly blocked",
                )
            )

        # Check for overlap between allowed and blocked extensions
        for label, allowed in (
            ("Image", cls.ALLOWED_IMAGE_EXTENSIONS),
            ("ZIP", cls.ALLOWED_ZIP_EXTENSIONS),
        ):
            conflict = allowed.intersection(cls.BLOCKED_EXTENSIONS)
            if conflict:
                errors.append(
                    ConfigValidationError(
                        error_type="extension_conflict",
                        message=f"{label} extensions {conflict} are both allowed and blocked",
                        severity="error",
                        component="extensions",
                        recommendation="Remove conflicts between allowed and blocked extensions",
                    )
                )

        # Check compound extension consistency
        compound_overlap = cls.BLOCKED_EXTENSIONS.intersection(
            cls.COMPOUND_BLOCKED_EXTENSIONS
        )
        if compound_overlap:
            errors.append(
                ConfigValidationError(
                    error_type="compound_extension_overlap",
                    message=f"Extensions {compound_overlap} appear in both blocked and compound blocked lists",
                    severity="warning",
                    component="extensions",
                    recommendation="Compound extensions should only be in COMPOUND_BLOCKED_EXTENSIONS",
                )
            )
        return errors

    @classmethod
    def _validate_compression_settings(cls) -> list[ConfigValidationError]:
        """
        Validate compression-related limits.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []
        limits = cls.limits
        megabyte = 1024 * 1024

        # Validate compression ratio
        if limits.max_compression_ratio <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_compression_ratio",
                    message="max_compression_ratio must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable compression ratio limit (e.g., 100:1)",
                )
            )
        if limits.max_compression_ratio < 10:
            errors.append(
                ConfigValidationError(
                    error_type="too_strict_compression",
                    message=f"max_compression_ratio ({limits.max_compression_ratio}) is very strict",
                    severity="warning",
                    component="compression",
                    recommendation="Consider allowing higher compression ratios for legitimate files",
                )
            )
        if limits.max_compression_ratio > 1000:
            errors.append(
                ConfigValidationError(
                    error_type="too_permissive_compression",
                    message=f"max_compression_ratio ({limits.max_compression_ratio}) may allow zip bombs",
                    severity="warning",
                    component="compression",
                    recommendation="Reduce compression ratio limit to prevent zip bomb attacks",
                )
            )

        # Validate uncompressed size limit
        if limits.max_uncompressed_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_uncompressed_size",
                    message="max_uncompressed_size must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable uncompressed size limit",
                )
            )

        # Validate individual file size limit
        if limits.max_individual_file_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_individual_file_size",
                    message="max_individual_file_size must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable individual file size limit",
                )
            )

        # Check individual file size doesn't exceed total uncompressed size
        if limits.max_individual_file_size > limits.max_uncompressed_size:
            errors.append(
                ConfigValidationError(
                    error_type="inconsistent_size_limits",
                    message=f"max_individual_file_size ({limits.max_individual_file_size // megabyte}MB) "
                    f"exceeds max_uncompressed_size ({limits.max_uncompressed_size // megabyte}MB)",
                    severity="warning",
                    component="compression",
                    recommendation="Individual file size limit should not exceed total uncompressed size limit",
                )
            )

        # Validate ZIP entry limits
        if limits.max_zip_entries <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_zip_entries",
                    message="max_zip_entries must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable limit for ZIP file entries",
                )
            )
        if limits.max_zip_entries > 100000:
            errors.append(
                ConfigValidationError(
                    error_type="excessive_zip_entries",
                    message=f"max_zip_entries ({limits.max_zip_entries}) is very high",
                    severity="warning",
                    component="compression",
                    recommendation="High entry limits may impact performance",
                )
            )

        # Validate timeout settings
        if limits.zip_analysis_timeout <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_timeout",
                    message="zip_analysis_timeout must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable timeout for ZIP analysis",
                )
            )
        if limits.zip_analysis_timeout > 30:
            errors.append(
                ConfigValidationError(
                    error_type="excessive_timeout",
                    message=f"zip_analysis_timeout ({limits.zip_analysis_timeout}s) is very long",
                    severity="warning",
                    component="compression",
                    recommendation="Long timeouts may impact user experience",
                )
            )
        return errors

    @classmethod
    def _validate_enum_consistency(cls) -> list[ConfigValidationError]:
        """
        Validate enum categories for emptiness and overlaps.

        Each unordered pair of overlapping dangerous-extension categories is
        reported once.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check for empty enum categories. Each row is
        # (enum class, label used in the message, noun used in the advice).
        for enum_cls, label, noun in (
            (DangerousExtensionCategory, "Extension", "extensions"),
            (CompoundExtensionCategory, "Compound extension", "extensions"),
            (UnicodeAttackCategory, "Unicode attack", "Unicode characters"),
        ):
            for category in enum_cls:
                if not category.value:
                    errors.append(
                        ConfigValidationError(
                            error_type="empty_enum_category",
                            message=f"{label} category {category.name} is empty",
                            severity="warning",
                            component="enums",
                            recommendation=f"Add {noun} to {category.name} or remove unused category",
                        )
                    )

        # Check for overlapping extensions between categories. Pairing each
        # category only with those after it visits every pair exactly once,
        # so an overlap is not reported again with the names swapped.
        categories = list(DangerousExtensionCategory)
        for position, first in enumerate(categories):
            for second in categories[position + 1:]:
                overlap = first.value.intersection(second.value)
                if overlap:
                    errors.append(
                        ConfigValidationError(
                            error_type="category_overlap",
                            message=f"Categories {first.name} and {second.name} share extensions: {overlap}",
                            severity="info",
                            component="enums",
                            recommendation="Consider if extensions should belong to multiple categories",
                        )
                    )
        return errors

    @classmethod
    def _validate_cross_dependencies(cls) -> list[ConfigValidationError]:
        """
        Validate cross-field configuration constraints.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check Windows reserved names format
        for name in cls.WINDOWS_RESERVED_NAMES:
            if not name.islower():
                errors.append(
                    ConfigValidationError(
                        error_type="case_sensitive_reserved_name",
                        message=f"Windows reserved name '{name}' should be lowercase",
                        severity="warning",
                        component="reserved_names",
                        recommendation="Use lowercase for consistent case-insensitive matching",
                    )
                )

        # Validate Unicode character ranges
        for char_code in cls.DANGEROUS_UNICODE_CHARS:
            if not isinstance(char_code, int):
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_unicode_char",
                        message=f"Unicode character code {char_code} is not an integer",
                        severity="error",
                        component="unicode",
                        recommendation="Use integer Unicode code points",
                    )
                )
            elif not 0 <= char_code <= 0x10FFFF:
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_unicode_range",
                        message=f"Unicode character code {char_code} is outside valid range",
                        severity="error",
                        component="unicode",
                        recommendation="Use valid Unicode code points (0-0x10FFFF)",
                    )
                )
        return errors

    @classmethod
    def validate_and_report(cls, strict: bool = True) -> None:
        """
        Validate configuration and log outcomes.

        Issues are logged grouped by severity: errors first, then warnings,
        then informational notes.

        Args:
            strict: If True, raise when errors are found; if there are no
                errors, raise when warnings are found. Informational notes
                never raise. If False, only log.

        Raises:
            FileSecurityConfigurationError: If strict and issues found. It
                carries the errors when any exist, otherwise the warnings.
        """
        issues = cls.validate_configuration(strict=strict)
        if not issues:
            logger.info("File security configuration validation passed")
            return

        # Separate issues by severity
        error_list = [issue for issue in issues if issue.severity == "error"]
        warning_list = [issue for issue in issues if issue.severity == "warning"]
        info_list = [issue for issue in issues if issue.severity == "info"]

        # Log validation results, one record per issue
        for group, log, template in (
            (error_list, logger.error, "Configuration error in %s: %s. %s"),
            (warning_list, logger.warning, "Configuration warning in %s: %s. %s"),
            (info_list, logger.info, "Configuration info in %s: %s. %s"),
        ):
            for issue in group:
                log(
                    template,
                    issue.component,
                    issue.message,
                    issue.recommendation,
                )

        if not strict:
            return
        # Errors take precedence: when present they alone are raised.
        if error_list:
            raise FileSecurityConfigurationError(error_list)
        if warning_list:
            raise FileSecurityConfigurationError(warning_list)

View File

@@ -1,455 +0,0 @@
"""Enumeration classes for categorizing security threats and patterns."""
from enum import Enum
class DangerousExtensionCategory(Enum):
    """
    File extension categories considered potentially dangerous for uploads.

    Each member's value is a set of extension strings that start with a dot.

    Attributes:
        WINDOWS_EXECUTABLES: Traditional Windows executable formats.
        SCRIPT_FILES: Script files that can execute code.
        WEB_SCRIPTS: Web server and dynamic content scripts.
        UNIX_EXECUTABLES: Unix/Linux executables and shell scripts.
        MACOS_EXECUTABLES: macOS specific executables and applications.
        JAVA_EXECUTABLES: Java related executables and bytecode.
        MOBILE_APPS: Mobile application packages.
        BROWSER_EXTENSIONS: Browser extensions and web applications.
        PACKAGE_FORMATS: Modern package managers and distribution formats.
        ARCHIVE_FORMATS: Archive formats that can contain executables.
        VIRTUALIZATION_FORMATS: Virtualization and container formats.
        OFFICE_MACROS: Office documents with macro capabilities.
        SYSTEM_FILES: System shortcuts and configuration files.
        SYSTEM_DRIVERS: System drivers and low-level components.
        WINDOWS_THEMES: Windows theme and customization files.
        HELP_FILES: Help and documentation files that can execute code.
    """

    # Traditional Windows executables
    WINDOWS_EXECUTABLES = {
        ".exe", ".bat", ".cmd", ".com",
        ".pif", ".scr", ".msi", ".dll",
    }

    # Script files that can execute code
    SCRIPT_FILES = {
        ".vbs", ".js", ".jse", ".wsf", ".wsh", ".hta",
        ".ps1", ".psm1", ".ps1xml", ".psc1", ".psd1", ".pssc",
        ".cdxml", ".xaml",
    }

    # Web server and dynamic content scripts
    WEB_SCRIPTS = {
        ".jsp", ".php", ".php3", ".php4", ".php5", ".phtml",
        ".asp", ".aspx", ".cer", ".cgi",
        ".pl", ".py", ".rb", ".go", ".lua",
    }

    # Unix/Linux executables and shell scripts
    UNIX_EXECUTABLES = {
        ".sh", ".bash", ".zsh", ".fish", ".csh", ".ksh", ".tcsh",
        ".run", ".bin", ".out", ".elf", ".so", ".a",
    }

    # macOS specific executables and applications
    MACOS_EXECUTABLES = {
        ".app", ".dmg", ".pkg", ".mpkg",
        ".command", ".tool", ".workflow", ".action",
        ".dylib", ".bundle", ".framework",
    }

    # Java related executables and bytecode
    JAVA_EXECUTABLES = {".jar", ".war", ".ear", ".jnlp", ".class"}

    # Mobile application packages
    MOBILE_APPS = {".apk", ".aab", ".ipa", ".appx", ".msix", ".xap"}

    # Browser extensions and web applications
    BROWSER_EXTENSIONS = {
        ".crx", ".xpi", ".safariextz", ".oex", ".nex", ".gadget",
    }

    # Modern package managers and distribution formats
    PACKAGE_FORMATS = {
        ".deb", ".rpm", ".snap", ".flatpak", ".appimage",
        ".vsix", ".nupkg", ".gem", ".whl", ".egg",
    }

    # Archive formats that can contain executables
    ARCHIVE_FORMATS = {
        ".7z", ".rar", ".cab", ".ace",
        ".arj", ".lzh", ".lha", ".zoo",
    }

    # Virtualization and container formats
    VIRTUALIZATION_FORMATS = {
        ".ova", ".ovf", ".vmdk", ".vdi",
        ".vhd", ".vhdx", ".qcow2", ".docker",
    }

    # Office documents with macro capabilities
    OFFICE_MACROS = {
        ".docm", ".dotm", ".xlsm", ".xltm", ".xlam",
        ".pptm", ".potm", ".ppam", ".sldm",
    }

    # System shortcuts and configuration files
    SYSTEM_FILES = {
        ".url", ".website", ".webloc", ".desktop", ".lnk",
        ".application", ".manifest", ".deploy", ".msu",
        ".patch", ".diff", ".reg", ".inf",
    }

    # System drivers and low-level components
    SYSTEM_DRIVERS = {".sys", ".drv", ".ocx", ".cpl"}

    # Windows theme and customization files
    WINDOWS_THEMES = {
        ".theme", ".themepack", ".scf", ".shs", ".shb",
        ".sct", ".ws", ".job", ".msc",
    }

    # Help and documentation files that can execute code
    HELP_FILES = {".chm", ".hlp"}
class CompoundExtensionCategory(Enum):
    """
    Categorized compound file extensions that combine multiple suffixes.

    Each member's value is a set of extension strings.

    Attributes:
        COMPRESSED_ARCHIVES: Multi-part archive formats.
        JAVASCRIPT_VARIANTS: Specialized JavaScript files.
        WEB_CONTENT: Minified static web assets.
    """

    # Compressed archive formats
    # NOTE(review): ".tar.Z" is the only entry containing an uppercase
    # letter; if callers lowercase filenames before matching it would never
    # match - confirm against the code that consumes this set.
    COMPRESSED_ARCHIVES = {
        ".tar.xz", ".tar.gz", ".tar.bz2", ".tar.lz",
        ".tar.lzma", ".tar.Z", ".tgz", ".tbz2",
    }

    # JavaScript related compound extensions
    JAVASCRIPT_VARIANTS = {
        ".user.js",
        ".backup.js",
        ".min.js",
        ".worker.js",
    }

    # Web content compound extensions
    WEB_CONTENT = {
        ".min.css",
        ".min.html",
    }
class UnicodeAttackCategory(Enum):
    """
    Categorized Unicode code points used in obfuscation attacks.

    Each member's value is a set of integer code points.

    Attributes:
        DIRECTIONAL_OVERRIDES: Right-to-left and directional controls.
        ZERO_WIDTH_CHARACTERS: Zero-width and invisible characters.
        LANGUAGE_MARKS: Language and format specific characters.
        CONFUSING_PUNCTUATION: Punctuation that can disguise extensions.
    """

    # Right-to-Left and directional override characters
    DIRECTIONAL_OVERRIDES = {
        0x202A,  # LEFT-TO-RIGHT EMBEDDING
        0x202B,  # RIGHT-TO-LEFT EMBEDDING
        0x202C,  # POP DIRECTIONAL FORMATTING
        0x202D,  # LEFT-TO-RIGHT OVERRIDE
        0x202E,  # RIGHT-TO-LEFT OVERRIDE
        0x2066,  # LEFT-TO-RIGHT ISOLATE
        0x2067,  # RIGHT-TO-LEFT ISOLATE
        0x2068,  # FIRST STRONG ISOLATE
        0x2069,  # POP DIRECTIONAL ISOLATE
    }

    # Zero-width and invisible characters
    ZERO_WIDTH_CHARACTERS = {
        0x034F,  # COMBINING GRAPHEME JOINER
        0x200B,  # ZERO WIDTH SPACE
        0x200C,  # ZERO WIDTH NON-JOINER
        0x200D,  # ZERO WIDTH JOINER
        0x2060,  # WORD JOINER
        0xFEFF,  # ZERO WIDTH NO-BREAK SPACE (BOM)
    }

    # Language and format specific characters
    LANGUAGE_MARKS = {
        0x061C,  # ARABIC LETTER MARK
        0x180E,  # MONGOLIAN VOWEL SEPARATOR
    }

    # Confusing punctuation that can disguise extensions
    CONFUSING_PUNCTUATION = {
        0x2024,  # ONE DOT LEADER
        0x2025,  # TWO DOT LEADER
        0x2026,  # HORIZONTAL ELLIPSIS
        0xFF0E,  # FULLWIDTH FULL STOP
    }
class SuspiciousFilePattern(Enum):
    """
    Categorized patterns used to flag potentially malicious uploads.

    Member values are sets of strings, except EXECUTABLE_SIGNATURES, whose
    value is a set of byte strings.

    Attributes:
        DIRECTORY_TRAVERSAL: Directory traversal attack patterns.
        SUSPICIOUS_NAMES: Suspicious filename patterns.
        EXECUTABLE_SIGNATURES: Dangerous file content signatures.
        SUSPICIOUS_PATHS: Suspicious path components.
    """

    # Directory traversal attack patterns
    DIRECTORY_TRAVERSAL = {
        # Plain forms, with forward slashes and with backslashes
        "../", "..\\",
        ".../", "...\\",
        "....//", "....\\\\",
        # URL encoded forms of "../" and of ".." followed by a backslash
        "%2e%2e%2f", "%2e%2e%5c",
        # Double URL encoded forms
        "%252e%252e%252f", "%252e%252e%255c",
    }

    # Suspicious filename patterns
    SUSPICIOUS_NAMES = {
        # Windows system files that shouldn't be in user uploads
        "autorun.inf", "desktop.ini", "thumbs.db", ".ds_store",
        # Common malware names
        "install.exe", "setup.exe", "update.exe", "patch.exe",
        "crack.exe", "keygen.exe", "loader.exe", "activator.exe",
        # Hidden or system-like files
        ".htaccess", ".htpasswd", "web.config", "robots.txt",
    }

    # Dangerous file content signatures (magic bytes)
    EXECUTABLE_SIGNATURES = {
        # Windows PE executables
        b"MZ", b"PE\x00\x00",
        # ELF executables (Linux)
        b"\x7fELF",
        # Mach-O executables (macOS)
        b"\xfe\xed\xfa\xce", b"\xfe\xed\xfa\xcf",
        b"\xce\xfa\xed\xfe", b"\xcf\xfa\xed\xfe",
        # Java class files
        b"\xca\xfe\xba\xbe",
        # Windows shortcuts (.lnk)
        b"L\x00\x00\x00",
    }

    # Suspicious path components
    SUSPICIOUS_PATHS = {
        # Windows system directories
        "windows/", "system32/", "syswow64/", "programfiles/",
        # Unix system directories
        "/bin/", "/sbin/", "/usr/bin/", "/usr/sbin/", "/etc/",
        # Web server directories
        "cgi-bin/", "htdocs/", "www/", "wwwroot/",
        # Development/build directories
        ".git/", ".svn/", "node_modules/", "__pycache__/",
    }
class ZipThreatCategory(Enum):
    """
    File-extension groups that are treated as threats inside a ZIP archive.

    Every member's value is a plain ``set`` of lower-case extensions, each
    including the leading dot.

    Attributes:
        NESTED_ARCHIVES: Extensions of archive and compression formats.
        EXECUTABLE_FILES: Extensions of executables and installers.
        SCRIPT_FILES: Extensions of script and server-side code files.
        SYSTEM_FILES: Extensions of libraries, drivers and config files.
    """

    NESTED_ARCHIVES = {
        # Single-suffix archive / compression formats
        ".zip",
        ".rar",
        ".7z",
        ".tar",
        ".gz",
        ".bz2",
        ".xz",
        # Compressed tarballs, long and short spellings
        ".tar.gz",
        ".tgz",
        ".tar.bz2",
        ".tbz2",
        ".tar.xz",
    }

    EXECUTABLE_FILES = {
        # Windows executables and launchers
        ".exe",
        ".com",
        ".bat",
        ".cmd",
        ".scr",
        ".pif",
        ".msi",
        # Generic / Unix / macOS binaries and packages
        ".bin",
        ".run",
        ".app",
        ".deb",
        ".rpm",
    }

    SCRIPT_FILES = {
        # Desktop and shell scripting
        ".js",
        ".vbs",
        ".ps1",
        ".sh",
        ".bash",
        # General-purpose interpreted languages
        ".py",
        ".pl",
        ".rb",
        ".lua",
        # Server-side web code
        ".php",
        ".asp",
        ".jsp",
    }

    SYSTEM_FILES = {
        # Shared libraries and drivers
        ".dll",
        ".so",
        ".dylib",
        ".sys",
        ".drv",
        # Setup, registry and configuration files
        ".inf",
        ".reg",
        ".cfg",
        ".conf",
        ".ini",
    }

View File

@@ -1,468 +0,0 @@
"""File security exception classes and error codes."""
from dataclasses import dataclass
# ============================================================================
# Configuration Validation
# ============================================================================
@dataclass
class ConfigValidationError:
    """
    One finding produced while validating the security configuration.

    This is a plain record, not an exception; instances are aggregated by
    FileSecurityConfigurationError.

    Attributes:
        error_type: Short identifier for the kind of finding.
        message: Human-readable description of the finding.
        severity: Severity label: 'error', 'warning' or 'info'.
        component: Name of the configuration component concerned.
        recommendation: Optional hint on how to resolve the finding;
            empty string when none is given.
    """

    error_type: str
    message: str
    severity: str  # expected to be 'error', 'warning' or 'info'
    component: str
    recommendation: str = ""
class FileSecurityConfigurationError(Exception):
    """
    Raised when configuration validation fails; carries every finding.

    The exception text is the fixed prefix "Configuration validation
    failed: " followed by one "SEVERITY: message" item per finding,
    joined with "; ".

    Args:
        errors: List of ConfigValidationError instances.

    Attributes:
        errors: The list passed to the constructor, stored as-is.
    """

    def __init__(self, errors: list[ConfigValidationError]):
        summary = "; ".join(
            f"{item.severity.upper()}: {item.message}" for item in errors
        )
        super().__init__(f"Configuration validation failed: {summary}")
        self.errors = errors
# ============================================================================
# Error Codes
# ============================================================================
class ErrorCode:
    """
    Machine-readable error codes for file validation failures.

    A plain namespace of string constants (not an Enum). Each constant's
    value is identical to its own name, so a code can be compared or
    serialized as an ordinary string.

    Attributes:
        Error codes are class-level string constants for various
        validation failure types, grouped by validation stage below.
    """

    # Filename validation errors
    FILENAME_EMPTY = "FILENAME_EMPTY"
    FILENAME_INVALID = "FILENAME_INVALID"
    FILENAME_TOO_LONG = "FILENAME_TOO_LONG"

    # Unicode security errors
    UNICODE_SECURITY = "UNICODE_SECURITY"
    UNICODE_DANGEROUS_CHARS = "UNICODE_DANGEROUS_CHARS"
    UNICODE_NORMALIZATION_ERROR = "UNICODE_NORMALIZATION_ERROR"

    # Extension validation errors
    EXTENSION_BLOCKED = "EXTENSION_BLOCKED"
    EXTENSION_NOT_ALLOWED = "EXTENSION_NOT_ALLOWED"
    COMPOUND_EXTENSION_BLOCKED = "COMPOUND_EXTENSION_BLOCKED"
    EXTENSION_MISSING = "EXTENSION_MISSING"

    # Windows security errors
    WINDOWS_RESERVED_NAME = "WINDOWS_RESERVED_NAME"

    # File size errors
    FILE_TOO_LARGE = "FILE_TOO_LARGE"
    FILE_EMPTY = "FILE_EMPTY"
    FILE_SIZE_UNKNOWN = "FILE_SIZE_UNKNOWN"

    # MIME type errors
    MIME_TYPE_INVALID = "MIME_TYPE_INVALID"
    MIME_TYPE_MISMATCH = "MIME_TYPE_MISMATCH"
    MIME_DETECTION_FAILED = "MIME_DETECTION_FAILED"

    # File signature errors
    FILE_SIGNATURE_INVALID = "FILE_SIGNATURE_INVALID"
    FILE_SIGNATURE_MISSING = "FILE_SIGNATURE_MISSING"
    FILE_SIGNATURE_MISMATCH = "FILE_SIGNATURE_MISMATCH"

    # Compression and ZIP errors
    ZIP_BOMB_DETECTED = "ZIP_BOMB_DETECTED"
    ZIP_CONTENT_THREAT = "ZIP_CONTENT_THREAT"
    COMPRESSION_RATIO_EXCEEDED = "COMPRESSION_RATIO_EXCEEDED"
    ZIP_TOO_MANY_ENTRIES = "ZIP_TOO_MANY_ENTRIES"
    ZIP_INVALID_STRUCTURE = "ZIP_INVALID_STRUCTURE"
    ZIP_CORRUPT = "ZIP_CORRUPT"
    ZIP_TOO_LARGE = "ZIP_TOO_LARGE"
    ZIP_NESTED_ARCHIVE = "ZIP_NESTED_ARCHIVE"
    ZIP_DIRECTORY_TRAVERSAL = "ZIP_DIRECTORY_TRAVERSAL"
    ZIP_SYMLINK_DETECTED = "ZIP_SYMLINK_DETECTED"
    ZIP_ABSOLUTE_PATH = "ZIP_ABSOLUTE_PATH"
    ZIP_ANALYSIS_TIMEOUT = "ZIP_ANALYSIS_TIMEOUT"

    # Processing errors
    PROCESSING_ERROR = "PROCESSING_ERROR"
    IO_ERROR = "IO_ERROR"
    MEMORY_ERROR = "MEMORY_ERROR"
# ============================================================================
# Base Exceptions
# ============================================================================
class FileSecurityError(Exception):
"""
Base exception for all file security validation failures.
Args:
message: Human-readable error description.
error_code: Optional machine-readable error code.
Attributes:
message: Human-readable error message.
error_code: Machine-readable error code from ErrorCode.
"""
def __init__(self, message: str, error_code: str | None = None):
self.message = message
self.error_code = error_code
super().__init__(message)
# ============================================================================
# File Validation Exceptions
# ============================================================================
class FileValidationError(FileSecurityError):
    """
    A specific uploaded file was rejected by validation.

    Args:
        message: Human-readable error description.
        filename: Optional name of the file that failed validation.
        error_code: Optional machine-readable error code.

    Attributes:
        filename: Name of the rejected file, or None when not supplied.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        error_code: str | None = None,
    ):
        super().__init__(message, error_code=error_code)
        self.filename = filename
# ============================================================================
# Filename Security Exceptions
# ============================================================================
class FilenameSecurityError(FileValidationError):
    """
    Filename failed security checks.

    Adds no behaviour of its own; the constructor is inherited unchanged
    from FileValidationError.
    """
class UnicodeSecurityError(FilenameSecurityError):
    """
    Filename contains dangerous Unicode characters.

    The error code is always ErrorCode.UNICODE_DANGEROUS_CHARS.

    Args:
        message: Human-readable error description.
        filename: Optional filename containing dangerous Unicode.
        dangerous_chars: Optional list of (char, code_point, position)
            tuples, one per dangerous character found.

    Attributes:
        dangerous_chars: The supplied list, or an empty list when the
            argument was omitted or empty.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        dangerous_chars: list[tuple[str, int, int]] | None = None,
    ):
        super().__init__(message, filename, ErrorCode.UNICODE_DANGEROUS_CHARS)
        self.dangerous_chars = dangerous_chars if dangerous_chars else []
class ExtensionSecurityError(FilenameSecurityError):
    """
    File extension was rejected.

    Args:
        message: Human-readable error description.
        filename: Optional filename with the rejected extension.
        extension: Optional extension that triggered the rejection.
        error_code: Optional error code; ErrorCode.EXTENSION_BLOCKED is
            used when omitted or empty.

    Attributes:
        extension: The extension that triggered the rejection.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        extension: str | None = None,
        error_code: str | None = None,
    ):
        effective_code = error_code if error_code else ErrorCode.EXTENSION_BLOCKED
        super().__init__(message, filename, effective_code)
        self.extension = extension
class WindowsReservedNameError(FilenameSecurityError):
    """
    Filename uses a Windows reserved device name.

    The error code is always ErrorCode.WINDOWS_RESERVED_NAME.

    Args:
        message: Human-readable error description.
        filename: Optional filename using a reserved name.
        reserved_name: Optional reserved name that was detected.

    Attributes:
        reserved_name: The reserved name that was detected.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        reserved_name: str | None = None,
    ):
        super().__init__(message, filename, ErrorCode.WINDOWS_RESERVED_NAME)
        self.reserved_name = reserved_name
# ============================================================================
# File Content Exceptions
# ============================================================================
class FileSizeError(FileValidationError):
    """
    File size is outside the accepted range (too large or empty).

    Args:
        message: Human-readable error description.
        filename: Optional filename that violated the size limits.
        size: Optional actual file size in bytes.
        max_size: Optional maximum allowed size in bytes.
        error_code: Optional explicit error code. When omitted, the code
            is ErrorCode.FILE_EMPTY if ``size`` is 0 and
            ErrorCode.FILE_TOO_LARGE otherwise.

    Attributes:
        size: The actual file size in bytes.
        max_size: The maximum allowed size in bytes.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        size: int | None = None,
        max_size: int | None = None,
        error_code: str | None = None,
    ):
        self.size = size
        self.max_size = max_size
        if error_code is None:
            # This exception is also raised for empty uploads (size=0);
            # tagging those FILE_TOO_LARGE would mislead API consumers.
            error_code = (
                ErrorCode.FILE_EMPTY if size == 0 else ErrorCode.FILE_TOO_LARGE
            )
        super().__init__(message, filename=filename, error_code=error_code)
class MimeTypeError(FileValidationError):
    """
    Detected MIME type is not acceptable for the upload.

    Args:
        message: Human-readable error description.
        filename: Optional filename with the MIME type issue.
        detected_mime: Optional detected MIME type string.
        allowed_mimes: Optional list of allowed MIME types.
        error_code: Optional error code; ErrorCode.MIME_TYPE_INVALID is
            used when omitted or empty.

    Attributes:
        detected_mime: The detected MIME type string.
        allowed_mimes: The supplied list, or an empty list when the
            argument was omitted or empty.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        detected_mime: str | None = None,
        allowed_mimes: list[str] | None = None,
        error_code: str | None = None,
    ):
        effective_code = error_code if error_code else ErrorCode.MIME_TYPE_INVALID
        super().__init__(message, filename, effective_code)
        self.detected_mime = detected_mime
        self.allowed_mimes = allowed_mimes if allowed_mimes else []
class FileSignatureError(FileValidationError):
    """
    File header bytes do not match the expected format.

    The error code is always ErrorCode.FILE_SIGNATURE_MISMATCH.

    Args:
        message: Human-readable error description.
        filename: Optional filename with the signature issue.
        expected_type: Optional file type the content was checked against.

    Attributes:
        expected_type: The file type the content was checked against.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        expected_type: str | None = None,
    ):
        super().__init__(message, filename, ErrorCode.FILE_SIGNATURE_MISMATCH)
        self.expected_type = expected_type
# ============================================================================
# Compression and ZIP Exceptions
# ============================================================================
class CompressionSecurityError(FileValidationError):
    """
    Compressed file security check failed.

    Base class of ZipBombError and ZipContentError. The constructor is
    inherited unchanged from FileValidationError.

    Args:
        message: Human-readable error description.
        filename: Optional filename of compressed file.
        error_code: Optional error code. No compression-specific default
            is applied: when omitted it stays None.

    Attributes:
        None beyond inherited FileValidationError attributes.
    """

    pass
class ZipBombError(CompressionSecurityError):
    """
    ZIP archive exceeds compression ratio or uncompressed size limits.

    The error code is always ErrorCode.ZIP_BOMB_DETECTED.

    Args:
        message: Human-readable error description.
        filename: Optional filename of the offending archive.
        compression_ratio: Optional compression ratio that was measured.
        uncompressed_size: Optional total uncompressed size in bytes.
        max_ratio: Optional maximum allowed compression ratio.
        max_size: Optional maximum allowed uncompressed size in bytes.

    Attributes:
        compression_ratio: Compression ratio that was measured.
        uncompressed_size: Total uncompressed size in bytes.
        max_ratio: Maximum allowed compression ratio.
        max_size: Maximum allowed uncompressed size in bytes.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        compression_ratio: float | None = None,
        uncompressed_size: int | None = None,
        max_ratio: float | None = None,
        max_size: int | None = None,
    ):
        super().__init__(message, filename, ErrorCode.ZIP_BOMB_DETECTED)
        # Measured values
        self.compression_ratio = compression_ratio
        self.uncompressed_size = uncompressed_size
        # Configured limits they were compared against
        self.max_ratio = max_ratio
        self.max_size = max_size
class ZipContentError(CompressionSecurityError):
    """
    ZIP archive contains dangerous content or structure.

    Args:
        message: Human-readable error description.
        filename: Optional filename of the problematic archive.
        threats: Optional list of detected threat descriptions.
        error_code: Optional error code; ErrorCode.ZIP_CONTENT_THREAT is
            used when omitted or empty.

    Attributes:
        threats: The supplied list, or an empty list when the argument
            was omitted or empty.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        threats: list[str] | None = None,
        error_code: str | None = None,
    ):
        effective_code = error_code if error_code else ErrorCode.ZIP_CONTENT_THREAT
        super().__init__(message, filename, effective_code)
        self.threats = threats if threats else []
class FileProcessingError(FileSecurityError):
    """
    Validation could not complete because of an unexpected failure.

    Derives from FileSecurityError directly, not from
    FileValidationError, so it carries no filename. The error code is
    always ErrorCode.PROCESSING_ERROR.

    Args:
        message: Human-readable error description.
        original_error: Optional exception that triggered this error.

    Attributes:
        original_error: The exception that triggered this error, or None.
    """

    def __init__(self, message: str, original_error: Exception | None = None):
        super().__init__(message, ErrorCode.PROCESSING_ERROR)
        self.original_error = original_error

View File

@@ -1,525 +0,0 @@
"""Main file validator coordinating all security validations."""
import logging
import os
import time
import mimetypes
import magic
# Optional FastAPI integration - fallback to protocol if not available
try:
from fastapi import UploadFile
except ImportError:
from .protocols import UploadFileProtocol as UploadFile
from .config import FileSecurityConfig
from .validators import (
UnicodeSecurityValidator,
ExtensionSecurityValidator,
WindowsSecurityValidator,
CompressionSecurityValidator,
)
from .inspectors import ZipContentInspector
from .exceptions import (
ErrorCode,
FileValidationError,
FilenameSecurityError,
ExtensionSecurityError,
FileSizeError,
MimeTypeError,
FileSignatureError,
FileProcessingError,
)
logger = logging.getLogger(__name__)
class FileValidator:
"""
Coordinated security validation for uploaded files.
Attributes:
config: Active security configuration.
unicode_validator: Validator for Unicode-related checks.
extension_validator: Validator for file extension rules.
windows_validator: Validator enforcing Windows-specific constraints.
compression_validator: Validator handling compressed file limits.
zip_inspector: Inspector for ZIP archive contents.
magic_mime: MIME type detector based on python-magic.
magic_available: Whether python-magic was successfully initialized.
"""
def __init__(self, config: FileSecurityConfig | None = None):
    """
    Initialize file validator with configuration and detection utilities.

    Args:
        config: Optional configuration object defining file security
            rules. Defaults to new FileSecurityConfig instance.

    Attributes:
        config: Active security configuration.
        unicode_validator: Validator for Unicode-related checks.
        extension_validator: Validator for file extension rules.
        windows_validator: Validator enforcing Windows constraints.
        compression_validator: Validator for compressed file limits.
        zip_inspector: Inspector for ZIP archive contents.
        magic_mime: MIME type detector based on python-magic, or None
            when python-magic could not be initialized.
        magic_available: Whether python-magic initialized successfully.
    """
    self.config = config or FileSecurityConfig()

    # Specialized validators all share the same configuration object.
    self.unicode_validator = UnicodeSecurityValidator(self.config)
    self.extension_validator = ExtensionSecurityValidator(self.config)
    self.windows_validator = WindowsSecurityValidator(self.config)
    self.compression_validator = CompressionSecurityValidator(self.config)
    self.zip_inspector = ZipContentInspector(self.config)

    # Content-based MIME detection is optional: if python-magic cannot be
    # initialized the validator keeps working with filename-based guesses.
    try:
        self.magic_mime = magic.Magic(mime=True)
        self.magic_available = True
        logger.debug("File content detection (python-magic) initialized")
    except Exception as err:
        # Always define the attribute so that reading it never raises
        # AttributeError on an instance without python-magic.
        self.magic_mime = None
        self.magic_available = False
        logger.warning(
            "python-magic not available for content detection: %s",
            err,
        )
def _detect_mime_type(self, file_content: bytes, filename: str) -> str:
"""
Determine MIME type for file content.
Args:
file_content: Raw bytes of the file to inspect.
filename: Original filename for fallback MIME detection.
Returns:
Detected MIME type or "application/octet-stream" if detection
fails.
"""
detected_mime = None
# Content-based detection using python-magic (most reliable)
if self.magic_available:
try:
detected_mime = self.magic_mime.from_buffer(file_content)
except Exception as err:
logger.warning("Magic MIME detection failed: %s", err)
# Fallback to filename-based detection
if not detected_mime:
logger.info("Fallback to filename-based MIME detection")
detected_mime, _ = mimetypes.guess_type(filename)
return detected_mime or "application/octet-stream"
def _validate_file_signature(self, file_content: bytes, expected_type: str) -> None:
"""
Verify file content begins with known signature for expected type.
Args:
file_content: Raw bytes of the uploaded file.
expected_type: Logical file category ("image" or "zip").
Raises:
FileSignatureError: File header doesn't match expected type
signatures.
"""
if len(file_content) < 4:
raise FileSignatureError(
f"File too small to verify {expected_type} signature",
expected_type=expected_type,
)
# Common file signatures
signatures = {
"image": [
b"\xff\xd8\xff", # JPEG
b"\xff\xd8\xff\xe1", # JPEG EXIF (additional JPEG variant)
b"\x89PNG\r\n\x1a\n", # PNG
],
"zip": [
b"PK\x03\x04", # ZIP file
b"PK\x05\x06", # Empty ZIP
b"PK\x07\x08", # ZIP with spanning
],
}
expected_signatures = signatures.get(expected_type, [])
for signature in expected_signatures:
if file_content.startswith(signature):
return # Signature matched
# No matching signature found
raise FileSignatureError(
f"File content does not match expected {expected_type} format",
expected_type=expected_type,
)
def _sanitize_filename(self, filename: str) -> str:
    """
    Sanitize user-provided filename to prevent security risks.

    Steps, in order: Unicode security validation, removal of path
    components, removal of control characters, replacement of
    path/shell-significant characters with "_", Windows reserved-name
    check, extension check, truncation of the name part to 100
    characters, substitution of a generated name when the name part is
    empty, and a final reserved-name check.

    Args:
        filename: Original filename supplied by the user.

    Returns:
        Sanitized filename safe for storage and processing.

    Raises:
        UnicodeSecurityError: Filename contains dangerous Unicode
            characters or fails normalization checks.
        WindowsReservedNameError: Filename uses Windows reserved
            device names.
        ExtensionSecurityError: Filename contains blocked or
            dangerous file extensions.
        ValueError: Filename is empty string.
    """
    if not filename:
        raise ValueError("Filename cannot be empty")

    # Remember the caller-supplied value: ``filename`` is rebound several
    # times below and the final debug log must show the real original.
    original_filename = filename

    # Unicode security validation (must be first): blocks Unicode-based
    # attacks before any other processing looks at the name.
    filename = self.unicode_validator.validate_unicode_security(filename)

    # Remove path components to prevent directory traversal
    filename = os.path.basename(filename)

    # Remove null bytes and control characters
    filename = "".join(
        char for char in filename if ord(char) >= 32 and char != "\x7f"
    )

    # Replace characters usable for path traversal or command injection
    # in a single pass.
    filename = filename.translate(
        str.maketrans(dict.fromkeys('<>:"/\\|?*\x00', "_"))
    )

    # Reject Windows reserved names before deriving anything from the name
    self.windows_validator.validate_windows_reserved_names(filename)

    # Compound/double extensions and all dangerous extensions
    self.extension_validator.validate_extensions(filename)

    # Limit filename length (preserve extension)
    name_part, ext_part = os.path.splitext(filename)
    if len(name_part) > 100:
        name_part = name_part[:100]
        filename = name_part + ext_part

    # Never return a bare extension or a blank name
    if not name_part.strip():
        filename = f"file_{int(time.time())}{ext_part}"

    # Truncation or regeneration must not have produced a reserved name
    self.windows_validator.validate_windows_reserved_names(filename)

    logger.debug(
        "Filename sanitized: original='%s' -> sanitized='%s'",
        os.path.basename(original_filename),
        filename,
    )
    return filename
def _validate_filename(self, file: UploadFile) -> None:
    """
    Check the upload's filename and replace it with its sanitized form.

    On success ``file.filename`` holds the sanitized name.

    Args:
        file: Uploaded file whose filename should be validated and
            sanitized in place.

    Raises:
        FilenameSecurityError: Filename is missing, or is blank after
            sanitization. Other FileValidationError subclasses raised
            during sanitization propagate unchanged.
        FileProcessingError: Any other exception occurred during
            sanitization.
    """
    if not file.filename:
        raise FilenameSecurityError(
            "Filename is required",
            error_code=ErrorCode.FILENAME_EMPTY,
        )

    try:
        cleaned_name = self._sanitize_filename(file.filename)

        # The upload object carries the sanitized name from here on.
        file.filename = cleaned_name

        # Guard against a sanitizer result that is empty or whitespace.
        if not (cleaned_name and cleaned_name.strip()):
            raise FilenameSecurityError(
                "Invalid filename after sanitization",
                filename=file.filename,
                error_code=ErrorCode.FILENAME_INVALID,
            )
    except FileValidationError:
        # Validation failures are already in their final form.
        raise
    except Exception as err:
        logger.exception("Unexpected error during filename validation: %s", err)
        raise FileProcessingError(
            "Filename validation failed due to internal error",
            original_error=err,
        ) from err
def _validate_file_extension(
    self, file: UploadFile, allowed_extensions: set[str]
) -> None:
    """
    Validate extension of uploaded file against allowed and blocked lists.

    The extension is taken from the lower-cased filename. It must be in
    ``allowed_extensions`` and must not be in the configuration's
    BLOCKED_EXTENSIONS.

    Args:
        file: File whose extension will be validated.
        allowed_extensions: Set of allowed file extensions.

    Raises:
        FilenameSecurityError: Filename is missing.
        ExtensionSecurityError: Extension is not allowed
            (EXTENSION_NOT_ALLOWED) or is blocked (EXTENSION_BLOCKED).
    """
    if not file.filename:
        raise FilenameSecurityError(
            "Filename is required for extension validation",
            error_code=ErrorCode.FILENAME_EMPTY,
        )

    _, ext = os.path.splitext(file.filename.lower())

    if ext not in allowed_extensions:
        # Sort so the message is stable: set iteration order varies
        # between runs.
        allowed_list = ", ".join(sorted(allowed_extensions))
        raise ExtensionSecurityError(
            f"Invalid file extension. Allowed: {allowed_list}",
            filename=file.filename,
            extension=ext,
            error_code=ErrorCode.EXTENSION_NOT_ALLOWED,
        )

    # Defence in depth: an extension that is both allowed and globally
    # blocked is still rejected.
    if ext in self.config.BLOCKED_EXTENSIONS:
        raise ExtensionSecurityError(
            f"File extension {ext} is blocked for security reasons",
            filename=file.filename,
            extension=ext,
            error_code=ErrorCode.EXTENSION_BLOCKED,
        )
async def _validate_file_size(
    self, file: UploadFile, max_file_size: int
) -> tuple[bytes, int]:
    """
    Validate uploaded file size by sampling content and determining total bytes.

    The size is taken from ``file.size`` when that attribute exists and
    is non-zero; otherwise the file is read from the start in chunks and
    the bytes are counted. The file position is reset to 0 before
    returning.

    Args:
        file: Uploaded file supporting asynchronous read and seek.
        max_file_size: Maximum allowed file size in bytes.

    Returns:
        Tuple containing first 8 KB of file content and detected file
        size in bytes.

    Raises:
        FileSizeError: File size exceeds maximum or file is empty.
    """
    # Sample the first 8 KB for later content analysis, then rewind.
    file_content = await file.read(8192)
    await file.seek(0)

    declared_size = getattr(file, "size", None)
    if declared_size:
        file_size = declared_size
    else:
        # Count from position 0 (we just rewound), so the sample must not
        # be added on top. Counting chunk by chunk also avoids holding
        # the whole upload in memory just to measure it.
        file_size = 0
        while chunk := await file.read(64 * 1024):
            file_size += len(chunk)
        await file.seek(0)

    filename = getattr(file, "filename", None)

    if file_size > max_file_size:
        raise FileSizeError(
            f"File too large. File size: {file_size // (1024*1024)}MB, maximum: {max_file_size // (1024*1024)}MB",
            filename=filename,
            size=file_size,
            max_size=max_file_size,
        )

    if file_size == 0:
        raise FileSizeError(
            "Empty file not allowed",
            filename=filename,
            size=0,
            max_size=max_file_size,
        )

    return file_content, file_size
async def validate_image_file(self, file: UploadFile) -> None:
    """
    Validate uploaded image by checking filename, extension, size, MIME type, and signature.

    ``file.filename`` is replaced by its sanitized form as a side effect
    of filename validation.

    Args:
        file: Uploaded file to validate.

    Raises:
        FilenameSecurityError: Filename is empty, invalid, or fails
            security checks.
        ExtensionSecurityError: File extension is not allowed or is
            blocked.
        FileSizeError: File size exceeds maximum or file is empty.
        MimeTypeError: MIME type is not in allowed image types.
        FileSignatureError: File signature doesn't match expected image
            format.
        FileProcessingError: Unexpected error during validation.
    """
    try:
        # Each step raises on failure.
        self._validate_filename(file)
        self._validate_file_extension(file, self.config.ALLOWED_IMAGE_EXTENSIONS)
        file_content, file_size = await self._validate_file_size(
            file, self.config.limits.max_image_size
        )

        # Detect MIME type from the sampled content
        filename = file.filename or "unknown"
        detected_mime = self._detect_mime_type(file_content, filename)

        if detected_mime not in self.config.ALLOWED_IMAGE_MIMES:
            # Sorted so the message does not depend on collection order.
            allowed_list = ", ".join(sorted(self.config.ALLOWED_IMAGE_MIMES))
            raise MimeTypeError(
                f"Invalid file type. Detected: {detected_mime}. Allowed: {allowed_list}",
                filename=filename,
                detected_mime=detected_mime,
                allowed_mimes=list(self.config.ALLOWED_IMAGE_MIMES),
            )

        self._validate_file_signature(file_content, "image")

        logger.debug(
            "Image file validation passed: %s (%s, %s bytes)",
            filename,
            detected_mime,
            file_size,
        )
    except (FileValidationError, FileProcessingError):
        # Both are already in their final form. A FileProcessingError
        # raised by a helper has been logged there; re-wrapping it would
        # log it twice and bury the original cause one level deeper.
        raise
    except Exception as err:
        logger.exception("Error during image file validation: %s", err)
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=err,
        ) from err
async def validate_zip_file(self, file: UploadFile) -> None:
    """
    Validate uploaded ZIP archive against service configuration.

    ``file.filename`` is replaced by its sanitized form as a side effect
    of filename validation. The whole archive is read into memory for
    the compression-ratio check and, when enabled, content inspection;
    the file position is reset to 0 afterwards.

    Args:
        file: Incoming ZIP file-like object to validate.

    Raises:
        FilenameSecurityError: Filename is empty, invalid, or fails
            security checks.
        ExtensionSecurityError: File extension is not allowed or is
            blocked.
        FileSizeError: File size exceeds maximum or file is empty.
        MimeTypeError: MIME type is not in allowed ZIP types.
        FileSignatureError: File signature doesn't match expected ZIP
            format.
        CompressionSecurityError: ZIP compression validation failed
            (zip bomb detected).
        FileProcessingError: Unexpected error during validation, or a
            processing error reported by one of the validation steps.
    """
    try:
        # Each step raises on failure.
        self._validate_filename(file)
        self._validate_file_extension(file, self.config.ALLOWED_ZIP_EXTENSIONS)
        file_content, file_size = await self._validate_file_size(
            file, self.config.limits.max_zip_size
        )

        # Detect MIME type using first 8KB
        filename = file.filename or "unknown"
        detected_mime = self._detect_mime_type(file_content, filename)

        # Signature first (most reliable check); re-raise with the
        # filename attached and a ZIP-specific message.
        try:
            self._validate_file_signature(file_content, "zip")
        except FileSignatureError as err:
            raise FileSignatureError(
                "File content does not match ZIP format",
                filename=filename,
                expected_type="zip",
            ) from err

        # Some ZIP files are sniffed as application/octet-stream; that is
        # tolerated because the signature check above already passed.
        if detected_mime not in self.config.ALLOWED_ZIP_MIMES:
            if detected_mime == "application/octet-stream":
                logger.debug(
                    "ZIP file detected as application/octet-stream, but signature is valid: %s",
                    filename,
                )
            else:
                raise MimeTypeError(
                    f"Invalid file type. Detected: {detected_mime}. Expected ZIP file.",
                    filename=filename,
                    detected_mime=detected_mime,
                    allowed_mimes=list(self.config.ALLOWED_ZIP_MIMES),
                )

        # Ratio and content checks need the complete archive.
        await file.seek(0)
        full_file_content = await file.read()
        file_size = len(full_file_content)

        # Reset file position for any subsequent operations
        await file.seek(0)

        # Validate ZIP compression ratio to detect zip bombs
        self.compression_validator.validate_zip_compression_ratio(
            full_file_content, file_size
        )

        # Perform ZIP content inspection if enabled
        if self.config.limits.scan_zip_content:
            self.zip_inspector.inspect_zip_content(full_file_content)

        logger.debug(
            "ZIP file validation passed: %s (%s, %s bytes)",
            filename,
            detected_mime,
            file_size,
        )
    except (FileValidationError, FileProcessingError):
        # Both are already in their final form. In particular the ZIP
        # inspector reports corrupt archives as FileProcessingError with
        # a specific message; re-wrapping would replace it with the
        # generic one below and log the failure twice.
        raise
    except Exception as err:
        logger.exception("Error during ZIP file validation: %s", err)
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=err,
        ) from err

View File

@@ -1,10 +0,0 @@
"""
File content inspection modules for security validation.
This package provides inspectors that analyze the internal structure
and contents of uploaded files to detect potential security threats.
"""
from .zip_inspector import ZipContentInspector
__all__ = ["ZipContentInspector"]

View File

@@ -1,411 +0,0 @@
"""ZIP content inspector for security threat detection."""
from __future__ import annotations
import io
import os
import time
import zipfile
from typing import TYPE_CHECKING
import logging
from ..enums import SuspiciousFilePattern, ZipThreatCategory
from ..exceptions import ZipContentError, FileProcessingError, ErrorCode
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)
class ZipContentInspector:
"""
Inspects ZIP archive contents for security threats.
Attributes:
config: File security configuration.
"""
def __init__(self, config: FileSecurityConfig):
    """
    Initialize ZIP inspector with configuration.

    The configuration is stored as-is; the inspection methods read their
    thresholds and feature switches from ``config.limits``.

    Args:
        config: File security configuration.
    """
    self.config = config
def inspect_zip_content(self, file_content: bytes) -> None:
    """
    Inspect ZIP archive for potential security threats.

    Every entry is checked individually, then the archive structure as
    a whole. All findings are collected and reported together in one
    ZipContentError. Elapsed time is checked before each entry against
    ``limits.zip_analysis_timeout`` (seconds).

    Args:
        file_content: Raw bytes of ZIP archive.

    Raises:
        ZipContentError: If security threats are detected in ZIP
            content such as directory traversal, symlinks, nested
            archives, or suspicious patterns, or if the analysis
            exceeds the configured timeout (ZIP_ANALYSIS_TIMEOUT).
        FileProcessingError: If ZIP structure is invalid or
            unexpected error occurs during inspection.
    """
    try:
        zip_bytes = io.BytesIO(file_content)
        threats_found = []
        timeout = self.config.limits.zip_analysis_timeout

        # Monotonic clock: elapsed-time measurement must not be affected
        # by wall-clock adjustments (NTP, manual changes).
        start_time = time.monotonic()

        with zipfile.ZipFile(zip_bytes, "r") as zip_file:
            zip_entries = zip_file.infolist()

            for entry in zip_entries:
                # Abort when the time budget is used up.
                if time.monotonic() - start_time > timeout:
                    logger.error(
                        "ZIP content inspection timeout",
                        extra={
                            "error_type": "zip_analysis_timeout",
                            "timeout": timeout,
                        },
                    )
                    raise ZipContentError(
                        message=f"ZIP content inspection timeout after {timeout}s",
                        threats=["Analysis timeout - potential zip bomb"],
                        error_code=ErrorCode.ZIP_ANALYSIS_TIMEOUT,
                    )

                # Inspect individual entry
                threats_found.extend(self._inspect_zip_entry(entry, zip_file))

            # Check for ZIP structure threats
            threats_found.extend(self._inspect_zip_structure(zip_entries))

        if threats_found:
            logger.warning(
                "ZIP content threats detected",
                extra={
                    "error_type": "zip_content_threat",
                    "threats": threats_found,
                    "threat_count": len(threats_found),
                },
            )
            raise ZipContentError(
                message=f"ZIP content threats detected: {'; '.join(threats_found)}",
                threats=threats_found,
            )

        logger.debug(
            "ZIP content inspection passed: %s entries analyzed",
            len(zip_entries),
        )
    except ZipContentError:
        # Re-raise our own exceptions
        raise
    except zipfile.BadZipFile as err:
        logger.error("Invalid or corrupted ZIP file structure", exc_info=True)
        raise FileProcessingError(
            message="Invalid or corrupted ZIP file structure",
            original_error=err,
        ) from err
    except Exception as err:
        logger.error(
            "Unexpected error during ZIP content inspection",
            exc_info=True,
        )
        raise FileProcessingError(
            message=f"ZIP content inspection failed: {str(err)}",
            original_error=err,
        ) from err
def _inspect_zip_entry(
self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
) -> list[str]:
"""
Inspect single ZIP entry for security threats.
Args:
entry: ZIP entry metadata.
zip_file: Parent ZIP archive.
Returns:
List of threat descriptions.
"""
threats = []
filename = entry.filename
# 1. Check for directory traversal attacks
if self._has_directory_traversal(filename):
threats.append(f"Directory traversal attack in '{filename}'")
# 2. Check for absolute paths
if not self.config.limits.allow_absolute_paths and self._has_absolute_path(
filename
):
threats.append(f"Absolute path detected in '{filename}'")
# 3. Check for symbolic links
if not self.config.limits.allow_symlinks and self._is_symlink(entry):
threats.append(f"Symbolic link detected: '{filename}'")
# 4. Check filename length limits
if len(os.path.basename(filename)) > self.config.limits.max_filename_length:
threats.append(
f"Filename too long: '{filename}' ({len(os.path.basename(filename))} chars)"
)
# 5. Check path length limits
if len(filename) > self.config.limits.max_path_length:
threats.append(f"Path too long: '{filename}' ({len(filename)} chars)")
# 6. Check for suspicious filename patterns
suspicious_patterns = self._check_suspicious_patterns(filename)
threats.extend(suspicious_patterns)
# 7. Check for nested archives
if not self.config.limits.allow_nested_archives and self._is_nested_archive(
filename
):
threats.append(f"Nested archive detected: '{filename}'")
# 8. Check file content if enabled and entry is small enough
if (
self.config.limits.scan_zip_content
and not entry.is_dir()
and entry.file_size < 1024 * 1024
): # 1MB limit for content scan
content_threats = self._inspect_entry_content(entry, zip_file)
threats.extend(content_threats)
return threats
def _inspect_zip_structure(self, entries: list[zipfile.ZipInfo]) -> list[str]:
"""
Inspect ZIP structure for anomalies.
Args:
entries: All ZIP entries to analyze.
Returns:
List of structural threat descriptions.
"""
threats = []
# Check directory depth
max_depth = 0
for entry in entries:
depth = entry.filename.count("/") + entry.filename.count("\\")
max_depth = max(max_depth, depth)
if max_depth > self.config.limits.max_zip_depth:
threats.append(
f"Excessive directory depth: {max_depth} (max: {self.config.limits.max_zip_depth})"
)
# Check for suspicious file distribution
file_types = {}
for entry in entries:
if not entry.is_dir():
ext = os.path.splitext(entry.filename)[1].lower()
file_types[ext] = file_types.get(ext, 0) + 1
# Check for excessive number of same-type files (potential spam/bomb)
for ext, count in file_types.items():
if count > 1000: # More than 1000 files of same type
threats.append(f"Excessive number of {ext} files: {count}")
return threats
def _has_directory_traversal(self, filename: str) -> bool:
    """
    Report whether a stored path tries to escape its directory.

    Args:
        filename: Path of the ZIP member as stored in the archive.

    Returns:
        True when a configured traversal pattern occurs in the name or
        the normalized path still contains a ".." step.
    """
    lowered = filename.lower()
    traversal_markers = SuspiciousFilePattern.DIRECTORY_TRAVERSAL.value
    if any(marker.lower() in lowered for marker in traversal_markers):
        return True

    # Second pass on the collapsed path, for anything the patterns missed.
    collapsed = os.path.normpath(filename)
    return (
        collapsed.startswith("..")
        or "/.." in collapsed
        or "\\.." in collapsed
    )
def _has_absolute_path(self, filename: str) -> bool:
"""
Check if filename is an absolute path.
Args:
filename: Path to check.
Returns:
True if absolute path detected.
"""
return (
filename.startswith("/") # Unix absolute path
or filename.startswith("\\") # Windows UNC path
or (len(filename) > 1 and filename[1] == ":") # Windows drive path
)
def _is_symlink(self, entry: zipfile.ZipInfo) -> bool:
"""
Check if entry is a symbolic link.
Args:
entry: ZIP entry to check.
Returns:
True if entry is a symlink.
"""
# Check if entry has symlink attributes
return (entry.external_attr >> 16) & 0o120000 == 0o120000
def _check_suspicious_patterns(self, filename: str) -> list[str]:
    """
    Match a stored path against the configured suspicious names/paths.

    At most one warning is produced per category: one when the base name
    equals a suspicious name, and one for the first suspicious path
    fragment found anywhere in the path. Matching is case-insensitive.

    Args:
        filename: Path of the ZIP member as stored in the archive.

    Returns:
        Zero, one or two warning strings.
    """
    lowered = filename.lower()
    leaf = os.path.basename(lowered)
    warnings = []

    # Exact (case-insensitive) match on the final path component.
    known_names = SuspiciousFilePattern.SUSPICIOUS_NAMES.value
    if any(leaf == name.lower() for name in known_names):
        warnings.append(f"Suspicious filename pattern: '{filename}'")

    # First path fragment that occurs anywhere in the full path.
    known_fragments = SuspiciousFilePattern.SUSPICIOUS_PATHS.value
    first_hit = next(
        (fragment for fragment in known_fragments if fragment.lower() in lowered),
        None,
    )
    if first_hit is not None:
        warnings.append(
            f"Suspicious path component: '{filename}' contains '{first_hit}'"
        )

    return warnings
def _is_nested_archive(self, filename: str) -> bool:
    """
    Decide from the extension whether a member is itself an archive.

    Only the last extension is considered, compared case-insensitively
    against the values of ``ZipThreatCategory.NESTED_ARCHIVES``.

    Args:
        filename: Path of the ZIP member as stored in the archive.

    Returns:
        True when the extension is listed as a nested-archive type.
    """
    _, suffix = os.path.splitext(filename)
    return suffix.lower() in ZipThreatCategory.NESTED_ARCHIVES.value
def _inspect_entry_content(
    self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
) -> list[str]:
    """
    Sniff the first bytes of a ZIP member for executable or script content.

    This is best-effort: any failure while reading or checking the member
    is logged as a warning and the findings gathered so far are returned.

    Args:
        entry: ZIP member whose content is sampled.
        zip_file: Open archive the member belongs to.

    Returns:
        Threat descriptions derived from the sampled content.
    """
    findings = []
    try:
        # Only the leading 512 bytes are read.
        with zip_file.open(entry, "r") as handle:
            head = handle.read(512)

        # Known executable magic numbers at the very start of the data.
        magic_numbers = SuspiciousFilePattern.EXECUTABLE_SIGNATURES.value
        if any(head.startswith(magic) for magic in magic_numbers):
            findings.append(f"Executable content detected in '{entry.filename}'")

        # Textual markers that suggest a script.
        if self._contains_script_patterns(head, entry.filename):
            findings.append(f"Script content detected in '{entry.filename}'")
    except Exception as err:
        logger.warning(
            "Could not inspect content of '%s': %s",
            entry.filename,
            err,
        )
    return findings
def _contains_script_patterns(self, content: bytes, filename: str) -> bool:
"""
Check content for malicious script patterns.
Args:
content: Raw bytes to inspect.
filename: Filename for context.
Returns:
True if script patterns found.
"""
try:
# Try to decode as text
text_content = content.decode("utf-8", errors="ignore").lower()
# Check for common script patterns
script_patterns = [
"#!/bin/",
"#!/usr/bin/",
"powershell",
"cmd.exe",
"eval(",
"exec(",
"system(",
"shell_exec(",
"<script",
"<?php",
"<%",
"import os",
"import subprocess",
]
for pattern in script_patterns:
if pattern in text_content:
return True
except Exception:
# If we can't decode as text, it's probably binary
pass
return False

View File

@@ -1,51 +0,0 @@
"""
Framework-agnostic protocols for file upload handling.
This module defines protocols that allow safeuploads to work with any
web framework's file upload implementation without depending on specific
framework packages.
"""
from typing import Protocol, runtime_checkable
@runtime_checkable
class UploadFileProtocol(Protocol):
"""
Protocol for file upload objects from any web framework.
This protocol defines the minimal interface required for file
validation. Any object with these attributes and methods can be
validated, regardless of the web framework being used.
Attributes:
filename: Original filename from the client.
size: Size of the uploaded file in bytes.
"""
filename: str | None
size: int | None
async def read(self, size: int = -1) -> bytes:
"""
Read bytes from the uploaded file.
Args:
size: Number of bytes to read. -1 reads entire file.
Returns:
Bytes read from the file.
"""
...
async def seek(self, offset: int) -> int:
"""
Move file pointer to specified position.
Args:
offset: Position to move to in bytes.
Returns:
New position in the file.
"""
...

View File

@@ -1,22 +0,0 @@
"""
Security validation modules for uploaded files.
This package provides validators that check filenames and file
properties for potential security threats including Unicode attacks,
invalid extensions, Windows-specific vulnerabilities, and compression
bombs.
"""
from .base import BaseValidator
from .unicode_validator import UnicodeSecurityValidator
from .extension_validator import ExtensionSecurityValidator
from .windows_validator import WindowsSecurityValidator
from .compression_validator import CompressionSecurityValidator
__all__ = [
"BaseValidator",
"UnicodeSecurityValidator",
"ExtensionSecurityValidator",
"WindowsSecurityValidator",
"CompressionSecurityValidator",
]

View File

@@ -1,43 +0,0 @@
"""
Base validator interface for file security checks.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from ..config import FileSecurityConfig
class BaseValidator(ABC):
    """
    Common parent of all file security validators.

    Subclasses receive the shared configuration through the constructor
    and must implement ``validate``.

    Attributes:
        config: File security configuration used by the validator.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Store the configuration on the instance.

        Args:
            config: File security settings the validator should apply.
        """
        self.config = config

    @abstractmethod
    def validate(self, *args, **kwargs) -> Any:
        """
        Run the validator; concrete subclasses define the behaviour.

        Args:
            *args: Positional arguments understood by the subclass.
            **kwargs: Keyword arguments understood by the subclass.

        Returns:
            Whatever the concrete validator defines as its result.
        """

View File

@@ -1,286 +0,0 @@
"""
Validates ZIP compression ratios and detects zip bombs.
"""
from __future__ import annotations
import io
import time
import zipfile
import logging
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import (
ZipBombError,
CompressionSecurityError,
FileProcessingError,
ErrorCode,
)
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)
class CompressionSecurityValidator(BaseValidator):
    """
    Validates ZIP uploads against zip bombs and compression attacks.

    Only the archive's directory metadata (declared sizes and names) is
    examined; member data is never decompressed by this validator.

    Attributes:
        config: Security configuration for validation limits.
    """

    # File-name suffixes that mark a member as an archive inside the archive.
    _NESTED_ARCHIVE_EXTENSIONS = (".zip", ".rar", ".7z", ".tar", ".gz", ".bz2")

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize the compression validator.

        Args:
            config: Security configuration with compression limits.
        """
        super().__init__(config)

    def validate_zip_compression_ratio(
        self, file_content: bytes, compressed_size: int
    ) -> None:
        """
        Validate ZIP archive against security limits.

        Checks, in order: number of entries, per-entry compression ratio,
        per-entry uncompressed size, total uncompressed size, overall
        compression ratio, and presence of nested archives.

        Args:
            file_content: Raw bytes of the ZIP archive.
            compressed_size: Size of the compressed archive in bytes.

        Raises:
            ZipBombError: If compression ratio exceeds maximum allowed,
                total uncompressed size is too large, the analysis times
                out, or processing runs out of memory.
            CompressionSecurityError: If ZIP structure is invalid, too
                many entries, nested archives detected, or individual
                file too large.
            FileProcessingError: If any other unexpected error occurs
                during validation.
        """
        try:
            limits = self.config.limits

            # Wrap the raw bytes so zipfile can read them like a file.
            zip_bytes = io.BytesIO(file_content)

            # Running metrics for the analysis.
            total_uncompressed_size = 0
            total_compressed_size = compressed_size
            file_count = 0
            nested_archives = []
            max_compression_ratio = 0
            overall_compression_ratio = 0  # Stays 0 when compressed_size is 0

            # Monotonic clock: elapsed time must not be affected by
            # wall-clock adjustments while the archive is analysed.
            start_time = time.monotonic()

            with zipfile.ZipFile(zip_bytes, "r") as zip_file:
                # Check for excessive number of files
                zip_entries = zip_file.infolist()
                file_count = len(zip_entries)
                if file_count > limits.max_zip_entries:
                    logger.warning(
                        "ZIP contains too many files",
                        extra={
                            "error_type": "zip_too_many_entries",
                            "file_count": file_count,
                            "max_entries": limits.max_zip_entries,
                        },
                    )
                    raise CompressionSecurityError(
                        message=f"ZIP contains too many files: {file_count}. "
                        f"Maximum allowed: {limits.max_zip_entries}",
                        error_code=ErrorCode.ZIP_TOO_MANY_ENTRIES,
                    )

                # Analyze each entry in the ZIP
                for entry in zip_entries:
                    # Abort when the analysis itself takes too long.
                    if time.monotonic() - start_time > limits.zip_analysis_timeout:
                        logger.error(
                            "ZIP analysis timeout",
                            extra={
                                "error_type": "zip_analysis_timeout",
                                "timeout": limits.zip_analysis_timeout,
                            },
                        )
                        raise ZipBombError(
                            message=f"ZIP analysis timeout after {limits.zip_analysis_timeout}s - potential zip bomb",
                            compression_ratio=0,
                        )

                    # Skip directories
                    if entry.is_dir():
                        continue

                    # Sizes as declared in the archive directory.
                    uncompressed_size = entry.file_size
                    compressed_size_entry = entry.compress_size
                    total_uncompressed_size += uncompressed_size

                    # Per-entry ratio; skipped for a zero compressed size
                    # to avoid division by zero.
                    if compressed_size_entry > 0:
                        compression_ratio = uncompressed_size / compressed_size_entry
                        max_compression_ratio = max(
                            max_compression_ratio, compression_ratio
                        )
                        if compression_ratio > limits.max_compression_ratio:
                            logger.error(
                                "Excessive compression ratio detected",
                                extra={
                                    "error_type": "compression_ratio_exceeded",
                                    "file_name": entry.filename,
                                    "compression_ratio": compression_ratio,
                                    "max_ratio": limits.max_compression_ratio,
                                },
                            )
                            raise ZipBombError(
                                message=f"Excessive compression ratio detected: {compression_ratio:.1f}:1 for '{entry.filename}'. "
                                f"Maximum allowed: {limits.max_compression_ratio}:1",
                                compression_ratio=compression_ratio,
                            )

                    # Remember nested archives; they are rejected after
                    # the size and ratio checks have run.
                    if entry.filename.lower().endswith(
                        self._NESTED_ARCHIVE_EXTENSIONS
                    ):
                        nested_archives.append(entry.filename)

                    # Check for excessively large individual files
                    if uncompressed_size > limits.max_individual_file_size:
                        logger.warning(
                            "Individual file too large",
                            extra={
                                "error_type": "file_too_large",
                                "file_name": entry.filename,
                                "size_mb": uncompressed_size // (1024 * 1024),
                                "max_size_mb": limits.max_individual_file_size
                                // (1024 * 1024),
                            },
                        )
                        raise CompressionSecurityError(
                            message=f"Individual file too large: '{entry.filename}' would expand to {uncompressed_size // (1024*1024)}MB. "
                            f"Maximum allowed: {limits.max_individual_file_size // (1024*1024)}MB",
                            error_code=ErrorCode.FILE_TOO_LARGE,
                        )

                # Check total uncompressed size
                if total_uncompressed_size > limits.max_uncompressed_size:
                    logger.warning(
                        "Total uncompressed size too large",
                        extra={
                            "error_type": "zip_too_large",
                            "total_size_mb": total_uncompressed_size // (1024 * 1024),
                            "max_size_mb": limits.max_uncompressed_size
                            // (1024 * 1024),
                        },
                    )
                    raise ZipBombError(
                        message=f"Total uncompressed size too large: {total_uncompressed_size // (1024*1024)}MB. "
                        f"Maximum allowed: {limits.max_uncompressed_size // (1024*1024)}MB",
                        compression_ratio=0,
                        uncompressed_size=total_uncompressed_size,
                        max_size=limits.max_uncompressed_size,
                    )

                # Check overall compression ratio
                if total_compressed_size > 0:
                    overall_compression_ratio = (
                        total_uncompressed_size / total_compressed_size
                    )
                    if overall_compression_ratio > limits.max_compression_ratio:
                        logger.error(
                            "Overall compression ratio too high",
                            extra={
                                "error_type": "compression_ratio_exceeded",
                                "overall_ratio": overall_compression_ratio,
                                "max_ratio": limits.max_compression_ratio,
                            },
                        )
                        raise ZipBombError(
                            message=f"Overall compression ratio too high: {overall_compression_ratio:.1f}:1. "
                            f"Maximum allowed: {limits.max_compression_ratio}:1",
                            compression_ratio=overall_compression_ratio,
                            max_ratio=limits.max_compression_ratio,
                        )

                # Reject nested archives (potential security risk)
                if nested_archives:
                    logger.warning(
                        "Nested archives detected",
                        extra={
                            "error_type": "zip_nested_archive",
                            "nested_archives": nested_archives,
                        },
                    )
                    raise CompressionSecurityError(
                        message=f"Nested archives are not allowed: {', '.join(nested_archives)}",
                        error_code=ErrorCode.ZIP_NESTED_ARCHIVE,
                    )

                # Log analysis results
                logger.debug(
                    "ZIP analysis: %s files, %sMB uncompressed, max ratio: %.1f:1, overall ratio: %.1f:1",
                    file_count,
                    total_uncompressed_size // (1024 * 1024),
                    max_compression_ratio,
                    overall_compression_ratio,
                )
        except zipfile.BadZipFile as err:
            logger.error("Invalid or corrupted ZIP file", exc_info=True)
            raise CompressionSecurityError(
                message="Invalid or corrupted ZIP file",
                error_code=ErrorCode.ZIP_CORRUPT,
            ) from err
        except zipfile.LargeZipFile as err:
            logger.error("ZIP file too large to process", exc_info=True)
            raise CompressionSecurityError(
                message="ZIP file too large to process safely",
                error_code=ErrorCode.ZIP_TOO_LARGE,
            ) from err
        except MemoryError as err:
            logger.error("ZIP requires excessive memory", exc_info=True)
            raise ZipBombError(
                message="ZIP file requires too much memory to process - potential zip bomb",
                compression_ratio=0,
            ) from err
        except (ZipBombError, CompressionSecurityError):
            # Our own security errors pass through unchanged.
            raise
        except Exception as err:
            logger.error(
                "Unexpected error during ZIP compression validation",
                exc_info=True,
            )
            # Attach the cause to the error object as well, the same way
            # the ZIP content inspector reports unexpected failures.
            raise FileProcessingError(
                message=f"ZIP validation failed: {str(err)}",
                original_error=err,
            ) from err

    def validate(self, file_content: bytes, compressed_size: int) -> None:
        """
        Validate the compression ratio of a ZIP file.

        Thin entry point that delegates to
        ``validate_zip_compression_ratio``.

        Args:
            file_content: Raw bytes of the uploaded file.
            compressed_size: Size of the compressed archive in bytes.

        Raises:
            ZipBombError: If compression ratio exceeds maximum allowed.
            CompressionSecurityError: If ZIP structure is invalid.
            FileProcessingError: If unexpected error occurs.
        """
        return self.validate_zip_compression_ratio(file_content, compressed_size)

View File

@@ -1,97 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import ExtensionSecurityError, ErrorCode
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)
class ExtensionSecurityValidator(BaseValidator):
    """
    Rejects filenames that carry a blocked extension.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Set up the validator with the shared configuration.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)

    def validate_extensions(self, filename: str) -> None:
        """
        Check a filename against compound and single blocked extensions.

        Compound extensions are matched against the end of the
        lower-cased name. After that, every dot-separated segment
        following the first one is checked on its own, so a blocked
        extension hidden in the middle of the name is caught too.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If a blocked compound or single
                extension is present in the filename.
        """
        lowered = filename.lower()

        # Compound extensions first (e.g., .tar.xz, .user.js).
        for compound_ext in self.config.COMPOUND_BLOCKED_EXTENSIONS:
            if not lowered.endswith(compound_ext):
                continue
            logger.warning(
                "Dangerous compound extension detected",
                extra={
                    "error_type": "compound_extension_blocked",
                    "file_name": filename,
                    "extension": compound_ext,
                },
            )
            raise ExtensionSecurityError(
                message=f"Dangerous compound file extension '{compound_ext}' detected in filename. "
                f"Upload rejected for security.",
                filename=filename,
                extension=compound_ext,
                error_code=ErrorCode.COMPOUND_EXTENSION_BLOCKED,
            )

        # Then every extension-like segment; the first segment is the stem.
        for segment in filename.split(".")[1:]:
            ext = f".{segment.lower()}"
            if ext not in self.config.BLOCKED_EXTENSIONS:
                continue
            logger.warning(
                "Dangerous extension detected",
                extra={
                    "error_type": "extension_blocked",
                    "file_name": filename,
                    "extension": ext,
                },
            )
            raise ExtensionSecurityError(
                message=f"Dangerous file extension '{ext}' detected in filename. "
                f"Upload rejected for security.",
                filename=filename,
                extension=ext,
                error_code=ErrorCode.EXTENSION_BLOCKED,
            )

    def validate(self, filename: str) -> None:
        """
        Validate a filename; delegates to ``validate_extensions``.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If the filename extension is not
                permitted.
        """
        return self.validate_extensions(filename)

View File

@@ -1,132 +0,0 @@
"""Unicode Security Validator Module."""
from __future__ import annotations
import unicodedata
import logging
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import UnicodeSecurityError
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)
class UnicodeSecurityValidator(BaseValidator):
    """
    Validates filenames for Unicode security threats.

    Attributes:
        config: Runtime configuration for file security rules.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize the Unicode validator.

        Args:
            config: Runtime configuration that controls file security rules.
        """
        super().__init__(config)

    def validate_unicode_security(self, filename: str) -> str:
        """
        Validate filename for unsafe Unicode characters.

        The name is rejected when it contains any code point listed in
        ``config.DANGEROUS_UNICODE_CHARS``. Otherwise it is NFC-normalized
        and the normalized form is checked once more.

        Args:
            filename: The filename to validate and normalize.

        Returns:
            The NFC-normalized filename; an empty or None filename is
            returned unchanged.

        Raises:
            UnicodeSecurityError: If dangerous Unicode characters are
                detected in the filename or result from normalization.
        """
        if not filename:
            return filename

        dangerous_codes = self.config.DANGEROUS_UNICODE_CHARS

        # Collect (character, code point, index) for every dangerous character.
        dangerous_chars_found = [
            (char, ord(char), index)
            for index, char in enumerate(filename)
            if ord(char) in dangerous_codes
        ]

        # If dangerous characters found, reject the file entirely
        if dangerous_chars_found:
            char_details = []
            for char, code, pos in dangerous_chars_found:
                char_name = unicodedata.name(char, f"U+{code:04X}")
                char_details.append(
                    f"'{char}' (U+{code:04X}: {char_name}) at position {pos}"
                )
            logger.warning(
                "Dangerous Unicode characters detected",
                extra={
                    "error_type": "unicode_security",
                    "file_name": filename,
                    "char_codes": [code for _, code, _ in dangerous_chars_found],
                    "positions": [pos for _, _, pos in dangerous_chars_found],
                },
            )
            raise UnicodeSecurityError(
                message=f"Dangerous Unicode characters detected in filename: {', '.join(char_details)}. "
                f"These characters can be used to disguise file extensions or create security vulnerabilities.",
                filename=filename,
                dangerous_chars=dangerous_chars_found,
            )

        # NFC gives one canonical representation for text that can be
        # encoded in several equivalent ways.
        normalized_filename = unicodedata.normalize("NFC", filename)

        if normalized_filename != filename:
            logger.info(
                "Unicode normalization applied: '%s' -> '%s'",
                filename,
                normalized_filename,
            )

        # Re-check the normalized form in case normalization introduced a
        # dangerous character. The reported position is the character's
        # real index in the normalized name.
        for position, char in enumerate(normalized_filename):
            char_code = ord(char)
            if char_code in dangerous_codes:
                char_name = unicodedata.name(char, f"U+{char_code:04X}")
                logger.error(
                    "Unicode normalization resulted in dangerous character",
                    extra={
                        "error_type": "unicode_normalization_error",
                        "file_name": filename,
                        "normalized_filename": normalized_filename,
                        "char_code": char_code,
                    },
                )
                raise UnicodeSecurityError(
                    message=f"Unicode normalization resulted in dangerous character: "
                    f"'{char}' (U+{char_code:04X}: {char_name})",
                    filename=filename,
                    dangerous_chars=[(char, char_code, position)],
                )

        return normalized_filename

    def validate(self, filename: str) -> str:
        """
        Validate a filename for Unicode security issues.

        Delegates to ``validate_unicode_security``.

        Args:
            filename: The name of the file to assess.

        Returns:
            The validated and normalized filename.

        Raises:
            UnicodeSecurityError: If dangerous Unicode characters are found.
        """
        return self.validate_unicode_security(filename)

View File

@@ -1,95 +0,0 @@
"""Windows security validator for filename validation."""
from __future__ import annotations
import os
import logging
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import WindowsReservedNameError
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)
class WindowsSecurityValidator(BaseValidator):
    """
    Rejects filenames that collide with Windows reserved device names.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Set up the validator with the shared configuration.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)

    def validate_windows_reserved_names(self, filename: str) -> None:
        """
        Check a filename against the configured Windows reserved names.

        Extensions are peeled off one at a time so compound names are
        covered: for "CON.tar.gz" both "con.tar" and "con" are tested.
        Each candidate is lower-cased, stripped of surrounding whitespace
        and then of leading and trailing dots before the lookup.

        Args:
            filename: The filename to validate.

        Raises:
            WindowsReservedNameError: If any candidate derived from the
                filename is a Windows reserved device name.
        """
        reserved = self.config.WINDOWS_RESERVED_NAMES
        remaining = filename

        while remaining:
            stem, suffix = os.path.splitext(remaining)

            # Whitespace first, then dots (".CON", "con.", "con..").
            candidate = stem.lower().strip().strip(".")

            if candidate in reserved:
                shouted = candidate.upper()
                logger.warning(
                    "Windows reserved name detected",
                    extra={
                        "error_type": "windows_reserved_name",
                        "file_name": filename,
                        "reserved_name": shouted,
                    },
                )
                raise WindowsReservedNameError(
                    message=f"Filename '{filename}' uses Windows reserved name '{shouted}'. "
                    f"Reserved names: {', '.join(sorted(reserved)).upper()}",
                    filename=filename,
                    reserved_name=shouted,
                )

            # Nothing left to peel off: stop.
            if not suffix:
                break
            remaining = stem

    def validate(self, filename: str) -> None:
        """
        Validate a filename; delegates to ``validate_windows_reserved_names``.

        Args:
            filename: The filename to validate.

        Raises:
            WindowsReservedNameError: If the filename matches a Windows
                reserved device name.
        """
        return self.validate_windows_reserved_names(filename)

View File

@@ -4,6 +4,9 @@ from fastapi import APIRouter, Depends, HTTPException, status, UploadFile
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from safeuploads import FileValidator
from safeuploads.exceptions import FileValidationError
import users.user.schema as users_schema
import users.user.crud as users_crud
import users.user.utils as users_utils
@@ -31,9 +34,6 @@ import auth.password_hasher as auth_password_hasher
import core.database as core_database
import core.logger as core_logger
from core.file_security.file_validator import FileValidator
from core.file_security.exceptions import FileValidationError
import websocket.schema as websocket_schema
# Define the API router

22
backend/poetry.lock generated
View File

@@ -3341,6 +3341,26 @@ files = [
[package.dependencies]
requests = ">=2.0.1,<3.0.0"
[[package]]
name = "safeuploads"
version = "0.1.0"
description = "A comprehensive file security system for validating uploads and preventing attacks"
optional = false
python-versions = ">=3.13"
groups = ["main"]
files = [
{file = "safeuploads-0.1.0-py3-none-any.whl", hash = "sha256:c1a64b1e3def7c5b84ba6f0ad38dc8d19969438d054d866ba636aa9d6e02441b"},
{file = "safeuploads-0.1.0.tar.gz", hash = "sha256:84d56245af9c24ee1b9d380e56cf05d7f88c30b70bbcae2d893de8e3e87c8540"},
]
[package.dependencies]
fastapi = {version = ">=0.110,<1.0", optional = true, markers = "extra == \"fastapi\""}
python-magic = ">=0.4.27,<0.5.0"
[package.extras]
dev = ["pytest (>=8.0,<9.0)", "pytest-asyncio (>=0.23,<1.0)", "pytest-cov (>=4.1,<5.0)"]
fastapi = ["fastapi (>=0.110,<1.0)"]
[[package]]
name = "secretstorage"
version = "3.4.0"
@@ -4147,4 +4167,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = "^3.13"
content-hash = "9dad0ddc36e53c8b342f20e163f69a8be07446148a65d6c2b7f61302872ced1e"
content-hash = "7a3b048b9d49f791e7e7e6666f2b5d0ea79168fa0d731bf7eb8affbaafc58ac1"

View File

@@ -47,6 +47,7 @@ authlib = "^1.3.2"
httpx = "^0.28.1"
itsdangerous = "^2.2.0"
slowapi = "^0.1.9"
safeuploads = {extras = ["fastapi"], version = "^0.1.0"}
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.4"