Refactor file security config and enums for clarity

Refactored file_security/config.py and enums.py to improve documentation, type annotations, and code clarity. Expanded exception imports and public API in __init__.py, removed deprecated utils.py, and updated configuration validation to use standard logging. Enhanced enum docstrings and structure for better maintainability.
This commit is contained in:
João Vitória Silva
2025-10-27 18:50:53 +00:00
parent cf5ee1edd2
commit e26da00608
16 changed files with 2448 additions and 1212 deletions

View File

@@ -6,13 +6,30 @@ A comprehensive file security system for validating uploads and preventing attac
# Core classes and configurations
from .config import SecurityLimits, FileSecurityConfig
from .exceptions import ConfigValidationError, FileSecurityConfigurationError
from .exceptions import (
ConfigValidationError,
FileSecurityConfigurationError,
ErrorCode,
FileSecurityError,
FileValidationError,
FilenameSecurityError,
UnicodeSecurityError,
ExtensionSecurityError,
WindowsReservedNameError,
FileSizeError,
MimeTypeError,
FileSignatureError,
CompressionSecurityError,
ZipBombError,
ZipContentError,
FileProcessingError,
)
from .enums import (
DangerousExtensionCategory,
CompoundExtensionCategory,
UnicodeAttackCategory,
SuspiciousFilePattern,
ZipThreatCategory
ZipThreatCategory,
)
# Main validator
@@ -24,59 +41,52 @@ from .validators import (
UnicodeSecurityValidator,
ExtensionSecurityValidator,
WindowsSecurityValidator,
CompressionSecurityValidator
CompressionSecurityValidator,
)
# Inspectors
from .inspectors import ZipContentInspector
# Utility functions (for backward compatibility)
from .utils import (
validate_profile_image_upload,
validate_profile_data_upload,
get_secure_filename,
validate_configuration,
file_validator
)
# Perform configuration validation when module is imported
# This ensures configuration issues are caught early during application startup
validate_configuration(strict=False)
FileSecurityConfig.validate_and_report(strict=False)
# Export all public APIs for backward compatibility
# Export all public APIs
__all__ = [
# Core configuration
"SecurityLimits",
"FileSecurityConfig",
# Exceptions
"ConfigValidationError",
"ConfigValidationError",
"FileSecurityConfigurationError",
"ErrorCode",
"FileSecurityError",
"FileValidationError",
"FilenameSecurityError",
"UnicodeSecurityError",
"ExtensionSecurityError",
"WindowsReservedNameError",
"FileSizeError",
"MimeTypeError",
"FileSignatureError",
"CompressionSecurityError",
"ZipBombError",
"ZipContentError",
"FileProcessingError",
# Enums
"DangerousExtensionCategory",
"CompoundExtensionCategory",
"CompoundExtensionCategory",
"UnicodeAttackCategory",
"SuspiciousFilePattern",
"ZipThreatCategory",
# Main validator
"FileValidator",
# Specialized validators
"BaseValidator",
"UnicodeSecurityValidator",
"ExtensionSecurityValidator",
"ExtensionSecurityValidator",
"WindowsSecurityValidator",
"CompressionSecurityValidator",
# Inspectors
"ZipContentInspector",
# Utility functions (maintain original API)
"validate_profile_image_upload",
"validate_profile_data_upload",
"get_secure_filename",
"validate_configuration",
"file_validator"
]
]

File diff suppressed because it is too large Load Diff

View File

@@ -1,123 +1,256 @@
"""
File Security Enums Module
"""Enumeration classes for categorizing security threats and patterns."""
Contains all enumeration classes for categorizing security threats and patterns.
"""
from enum import Enum
class DangerousExtensionCategory(Enum):
"""
File extension categories considered potentially dangerous for uploads.
Attributes:
WINDOWS_EXECUTABLES: Traditional Windows executable formats.
SCRIPT_FILES: Script files that can execute code.
WEB_SCRIPTS: Web server and dynamic content scripts.
UNIX_EXECUTABLES: Unix/Linux executables and shell scripts.
MACOS_EXECUTABLES: macOS specific executables and applications.
JAVA_EXECUTABLES: Java related executables and bytecode.
MOBILE_APPS: Mobile application packages.
BROWSER_EXTENSIONS: Browser extensions and web applications.
PACKAGE_FORMATS: Modern package managers and distribution formats.
ARCHIVE_FORMATS: Archive formats that can contain executables.
VIRTUALIZATION_FORMATS: Virtualization and container formats.
OFFICE_MACROS: Office documents with macro capabilities.
SYSTEM_FILES: System shortcuts and configuration files.
SYSTEM_DRIVERS: System drivers and low-level components.
WINDOWS_THEMES: Windows theme and customization files.
HELP_FILES: Help and documentation files that can execute code.
"""
# Traditional Windows executables
WINDOWS_EXECUTABLES = {
".exe", ".bat", ".cmd", ".com", ".pif", ".scr", ".msi", ".dll"
".exe",
".bat",
".cmd",
".com",
".pif",
".scr",
".msi",
".dll",
}
# Script files that can execute code
SCRIPT_FILES = {
".vbs", ".js", ".jse", ".wsf", ".wsh", ".hta", ".ps1", ".psm1",
".ps1xml", ".psc1", ".psd1", ".pssc", ".cdxml", ".xaml"
".vbs",
".js",
".jse",
".wsf",
".wsh",
".hta",
".ps1",
".psm1",
".ps1xml",
".psc1",
".psd1",
".pssc",
".cdxml",
".xaml",
}
# Web server and dynamic content scripts
WEB_SCRIPTS = {
".jsp", ".php", ".php3", ".php4", ".php5", ".phtml", ".asp",
".aspx", ".cer", ".cgi", ".pl", ".py", ".rb", ".go", ".lua"
".jsp",
".php",
".php3",
".php4",
".php5",
".phtml",
".asp",
".aspx",
".cer",
".cgi",
".pl",
".py",
".rb",
".go",
".lua",
}
# Unix/Linux executables and shell scripts
UNIX_EXECUTABLES = {
".sh", ".bash", ".zsh", ".fish", ".csh", ".ksh", ".tcsh",
".run", ".bin", ".out", ".elf", ".so", ".a"
".sh",
".bash",
".zsh",
".fish",
".csh",
".ksh",
".tcsh",
".run",
".bin",
".out",
".elf",
".so",
".a",
}
# macOS specific executables and applications
MACOS_EXECUTABLES = {
".app", ".dmg", ".pkg", ".mpkg", ".command", ".tool",
".workflow", ".action", ".dylib", ".bundle", ".framework"
".app",
".dmg",
".pkg",
".mpkg",
".command",
".tool",
".workflow",
".action",
".dylib",
".bundle",
".framework",
}
# Java related executables and bytecode
JAVA_EXECUTABLES = {
".jar", ".war", ".ear", ".jnlp", ".class"
}
JAVA_EXECUTABLES = {".jar", ".war", ".ear", ".jnlp", ".class"}
# Mobile application packages
MOBILE_APPS = {
".apk", ".aab", ".ipa", ".appx", ".msix", ".xap"
}
MOBILE_APPS = {".apk", ".aab", ".ipa", ".appx", ".msix", ".xap"}
# Browser extensions and web applications
BROWSER_EXTENSIONS = {
".crx", ".xpi", ".safariextz", ".oex", ".nex", ".gadget"
".crx",
".xpi",
".safariextz",
".oex",
".nex",
".gadget",
}
# Modern package managers and distribution formats
PACKAGE_FORMATS = {
".deb", ".rpm", ".snap", ".flatpak", ".appimage", ".vsix",
".nupkg", ".gem", ".whl", ".egg"
".deb",
".rpm",
".snap",
".flatpak",
".appimage",
".vsix",
".nupkg",
".gem",
".whl",
".egg",
}
# Archive formats that can contain executables
ARCHIVE_FORMATS = {
".7z", ".rar", ".cab", ".ace", ".arj", ".lzh", ".lha", ".zoo"
".7z",
".rar",
".cab",
".ace",
".arj",
".lzh",
".lha",
".zoo",
}
# Virtualization and container formats
VIRTUALIZATION_FORMATS = {
".ova", ".ovf", ".vmdk", ".vdi", ".vhd", ".vhdx", ".qcow2", ".docker"
".ova",
".ovf",
".vmdk",
".vdi",
".vhd",
".vhdx",
".qcow2",
".docker",
}
# Office documents with macro capabilities
OFFICE_MACROS = {
".docm", ".dotm", ".xlsm", ".xltm", ".xlam", ".pptm",
".potm", ".ppam", ".sldm"
".docm",
".dotm",
".xlsm",
".xltm",
".xlam",
".pptm",
".potm",
".ppam",
".sldm",
}
# System shortcuts and configuration files
SYSTEM_FILES = {
".url", ".website", ".webloc", ".desktop", ".lnk", ".application",
".manifest", ".deploy", ".msu", ".patch", ".diff", ".reg", ".inf"
".url",
".website",
".webloc",
".desktop",
".lnk",
".application",
".manifest",
".deploy",
".msu",
".patch",
".diff",
".reg",
".inf",
}
# System drivers and low-level components
SYSTEM_DRIVERS = {
".sys", ".drv", ".ocx", ".cpl"
}
SYSTEM_DRIVERS = {".sys", ".drv", ".ocx", ".cpl"}
# Windows theme and customization files
WINDOWS_THEMES = {
".theme", ".themepack", ".scf", ".shs", ".shb", ".sct",
".ws", ".job", ".msc"
".theme",
".themepack",
".scf",
".shs",
".shb",
".sct",
".ws",
".job",
".msc",
}
# Help and documentation files that can execute code
HELP_FILES = {
".chm", ".hlp"
}
HELP_FILES = {".chm", ".hlp"}
class CompoundExtensionCategory(Enum):
"""
Categorized compound file extensions that combine multiple suffixes.
Attributes:
COMPRESSED_ARCHIVES: Multi-part archive formats.
JAVASCRIPT_VARIANTS: Specialized JavaScript files.
WEB_CONTENT: Minified static web assets.
"""
# Compressed archive formats
COMPRESSED_ARCHIVES = {
".tar.xz", ".tar.gz", ".tar.bz2", ".tar.lz", ".tar.lzma",
".tar.Z", ".tgz", ".tbz2"
".tar.xz",
".tar.gz",
".tar.bz2",
".tar.lz",
".tar.lzma",
".tar.Z",
".tgz",
".tbz2",
}
# JavaScript related compound extensions
JAVASCRIPT_VARIANTS = {
".user.js", ".backup.js", ".min.js", ".worker.js"
}
JAVASCRIPT_VARIANTS = {".user.js", ".backup.js", ".min.js", ".worker.js"}
# Web content compound extensions
WEB_CONTENT = {
".min.css", ".min.html"
}
WEB_CONTENT = {".min.css", ".min.html"}
class UnicodeAttackCategory(Enum):
"""
Categorized Unicode code points used in obfuscation attacks.
Attributes:
DIRECTIONAL_OVERRIDES: Right-to-left and directional controls.
ZERO_WIDTH_CHARACTERS: Zero-width and invisible characters.
LANGUAGE_MARKS: Language and format specific characters.
CONFUSING_PUNCTUATION: Punctuation that can disguise extensions.
"""
# Right-to-Left and directional override characters
DIRECTIONAL_OVERRIDES = {
0x202E, # U+202E RIGHT-TO-LEFT OVERRIDE
@@ -130,7 +263,7 @@ class UnicodeAttackCategory(Enum):
0x2068, # U+2068 FIRST STRONG ISOLATE
0x2069, # U+2069 POP DIRECTIONAL ISOLATE
}
# Zero-width and invisible characters
ZERO_WIDTH_CHARACTERS = {
0x200B, # U+200B ZERO WIDTH SPACE
@@ -140,13 +273,13 @@ class UnicodeAttackCategory(Enum):
0xFEFF, # U+FEFF ZERO WIDTH NO-BREAK SPACE (BOM)
0x034F, # U+034F COMBINING GRAPHEME JOINER
}
# Language and format specific characters
LANGUAGE_MARKS = {
0x061C, # U+061C ARABIC LETTER MARK
0x180E, # U+180E MONGOLIAN VOWEL SEPARATOR
}
# Confusing punctuation that can disguise extensions
CONFUSING_PUNCTUATION = {
0x2024, # U+2024 ONE DOT LEADER
@@ -157,76 +290,166 @@ class UnicodeAttackCategory(Enum):
class SuspiciousFilePattern(Enum):
"""
Categorized patterns used to flag potentially malicious uploads.
Attributes:
DIRECTORY_TRAVERSAL: Directory traversal attack patterns.
SUSPICIOUS_NAMES: Suspicious filename patterns.
EXECUTABLE_SIGNATURES: Dangerous file content signatures.
SUSPICIOUS_PATHS: Suspicious path components.
"""
# Directory traversal attack patterns
DIRECTORY_TRAVERSAL = {
"../", "..\\", ".../", "...\\",
"....//", "....\\\\",
"%2e%2e%2f", "%2e%2e%5c", # URL encoded ../ and ..\
"%252e%252e%252f", "%252e%252e%255c" # Double URL encoded
"../",
"..\\",
".../",
"...\\",
"....//",
"....\\\\",
"%2e%2e%2f",
"%2e%2e%5c", # URL encoded ../ and ..\
"%252e%252e%252f",
"%252e%252e%255c", # Double URL encoded
}
# Suspicious filename patterns
SUSPICIOUS_NAMES = {
# Windows system files that shouldn't be in user uploads
"autorun.inf", "desktop.ini", "thumbs.db", ".ds_store",
"autorun.inf",
"desktop.ini",
"thumbs.db",
".ds_store",
# Common malware names
"install.exe", "setup.exe", "update.exe", "patch.exe",
"crack.exe", "keygen.exe", "loader.exe", "activator.exe",
"install.exe",
"setup.exe",
"update.exe",
"patch.exe",
"crack.exe",
"keygen.exe",
"loader.exe",
"activator.exe",
# Hidden or system-like files
".htaccess", ".htpasswd", "web.config", "robots.txt"
".htaccess",
".htpasswd",
"web.config",
"robots.txt",
}
# Dangerous file content signatures (magic bytes)
EXECUTABLE_SIGNATURES = {
# Windows PE executables
b"MZ", b"PE\x00\x00",
b"MZ",
b"PE\x00\x00",
# ELF executables (Linux)
b"\x7fELF",
# Mach-O executables (macOS)
b"\xfe\xed\xfa\xce", b"\xfe\xed\xfa\xcf",
b"\xce\xfa\xed\xfe", b"\xcf\xfa\xed\xfe",
b"\xfe\xed\xfa\xce",
b"\xfe\xed\xfa\xcf",
b"\xce\xfa\xed\xfe",
b"\xcf\xfa\xed\xfe",
# Java class files
b"\xca\xfe\xba\xbe",
# Windows shortcuts (.lnk)
b"L\x00\x00\x00"
b"L\x00\x00\x00",
}
# Suspicious path components
SUSPICIOUS_PATHS = {
# Windows system directories
"windows/", "system32/", "syswow64/", "programfiles/",
"windows/",
"system32/",
"syswow64/",
"programfiles/",
# Unix system directories
"/bin/", "/sbin/", "/usr/bin/", "/usr/sbin/", "/etc/",
"/bin/",
"/sbin/",
"/usr/bin/",
"/usr/sbin/",
"/etc/",
# Web server directories
"cgi-bin/", "htdocs/", "www/", "wwwroot/",
"cgi-bin/",
"htdocs/",
"www/",
"wwwroot/",
# Development/build directories
".git/", ".svn/", "node_modules/", "__pycache__/"
".git/",
".svn/",
"node_modules/",
"__pycache__/",
}
class ZipThreatCategory(Enum):
"""
Categories of potentially harmful contents within ZIP archives.
Attributes:
NESTED_ARCHIVES: Archive format threats.
EXECUTABLE_FILES: Executable content threats.
SCRIPT_FILES: Script and code threats.
SYSTEM_FILES: System and configuration threats.
"""
# Archive format threats
NESTED_ARCHIVES = {
".zip", ".rar", ".7z", ".tar", ".gz", ".bz2", ".xz",
".tar.gz", ".tar.bz2", ".tar.xz", ".tgz", ".tbz2"
".zip",
".rar",
".7z",
".tar",
".gz",
".bz2",
".xz",
".tar.gz",
".tar.bz2",
".tar.xz",
".tgz",
".tbz2",
}
# Executable content threats
EXECUTABLE_FILES = {
".exe", ".com", ".bat", ".cmd", ".scr", ".pif",
".bin", ".run", ".app", ".deb", ".rpm", ".msi"
".exe",
".com",
".bat",
".cmd",
".scr",
".pif",
".bin",
".run",
".app",
".deb",
".rpm",
".msi",
}
# Script and code threats
SCRIPT_FILES = {
".js", ".vbs", ".ps1", ".sh", ".bash", ".py", ".php",
".pl", ".rb", ".lua", ".asp", ".jsp"
".js",
".vbs",
".ps1",
".sh",
".bash",
".py",
".php",
".pl",
".rb",
".lua",
".asp",
".jsp",
}
# System and configuration threats
SYSTEM_FILES = {
".dll", ".so", ".dylib", ".sys", ".drv", ".inf",
".reg", ".cfg", ".conf", ".ini"
}
".dll",
".so",
".dylib",
".sys",
".drv",
".inf",
".reg",
".cfg",
".conf",
".ini",
}

View File

@@ -1,14 +1,26 @@
"""
File Security Exceptions Module
"""File security exception classes and error codes."""
Contains all exception classes used by the file security system.
"""
from typing import List
from dataclasses import dataclass
# ============================================================================
# Configuration Validation
# ============================================================================
@dataclass
class ConfigValidationError:
"""
Configuration validation issue with severity and recommendation.
Attributes:
error_type: Type of the validation error.
message: Human-readable error message.
severity: Error severity level ('error', 'warning', 'info').
component: Component that failed validation.
recommendation: Optional recommendation to fix the issue.
"""
error_type: str
message: str
severity: str # 'error', 'warning', 'info'
@@ -17,8 +29,440 @@ class ConfigValidationError:
class FileSecurityConfigurationError(Exception):
def __init__(self, errors: List[ConfigValidationError]):
"""
Configuration validation failed with aggregated errors.
Args:
errors: List of ConfigValidationError instances.
Attributes:
errors: List of validation errors that caused failure.
"""
def __init__(self, errors: list[ConfigValidationError]):
self.errors = errors
error_messages = [f"{error.severity.upper()}: {error.message}" for error in errors]
super().__init__(f"Configuration validation failed: {'; '.join(error_messages)}")
error_messages = [
f"{error.severity.upper()}: {error.message}" for error in errors
]
super().__init__(
f"Configuration validation failed: {'; '.join(error_messages)}"
)
# ============================================================================
# Error Codes
# ============================================================================
class ErrorCode:
    """
    Machine-readable error codes for file validation failures.

    Every code is a class-level string constant whose value is identical
    to its attribute name, so codes can be logged, serialized, and
    compared as plain strings.
    """

    # Filename validation errors
    FILENAME_EMPTY = "FILENAME_EMPTY"
    FILENAME_INVALID = "FILENAME_INVALID"
    FILENAME_TOO_LONG = "FILENAME_TOO_LONG"

    # Unicode security errors
    UNICODE_SECURITY = "UNICODE_SECURITY"
    UNICODE_DANGEROUS_CHARS = "UNICODE_DANGEROUS_CHARS"
    UNICODE_NORMALIZATION_ERROR = "UNICODE_NORMALIZATION_ERROR"

    # Extension validation errors
    EXTENSION_BLOCKED = "EXTENSION_BLOCKED"
    EXTENSION_NOT_ALLOWED = "EXTENSION_NOT_ALLOWED"
    COMPOUND_EXTENSION_BLOCKED = "COMPOUND_EXTENSION_BLOCKED"
    EXTENSION_MISSING = "EXTENSION_MISSING"

    # Windows security errors
    WINDOWS_RESERVED_NAME = "WINDOWS_RESERVED_NAME"

    # File size errors
    FILE_TOO_LARGE = "FILE_TOO_LARGE"
    FILE_EMPTY = "FILE_EMPTY"
    FILE_SIZE_UNKNOWN = "FILE_SIZE_UNKNOWN"

    # MIME type errors
    MIME_TYPE_INVALID = "MIME_TYPE_INVALID"
    MIME_TYPE_MISMATCH = "MIME_TYPE_MISMATCH"
    MIME_DETECTION_FAILED = "MIME_DETECTION_FAILED"

    # File signature errors
    FILE_SIGNATURE_INVALID = "FILE_SIGNATURE_INVALID"
    FILE_SIGNATURE_MISSING = "FILE_SIGNATURE_MISSING"
    FILE_SIGNATURE_MISMATCH = "FILE_SIGNATURE_MISMATCH"

    # Compression and ZIP errors
    ZIP_BOMB_DETECTED = "ZIP_BOMB_DETECTED"
    ZIP_CONTENT_THREAT = "ZIP_CONTENT_THREAT"
    COMPRESSION_RATIO_EXCEEDED = "COMPRESSION_RATIO_EXCEEDED"
    ZIP_TOO_MANY_ENTRIES = "ZIP_TOO_MANY_ENTRIES"
    ZIP_INVALID_STRUCTURE = "ZIP_INVALID_STRUCTURE"
    ZIP_CORRUPT = "ZIP_CORRUPT"
    ZIP_TOO_LARGE = "ZIP_TOO_LARGE"
    ZIP_NESTED_ARCHIVE = "ZIP_NESTED_ARCHIVE"
    ZIP_DIRECTORY_TRAVERSAL = "ZIP_DIRECTORY_TRAVERSAL"
    ZIP_SYMLINK_DETECTED = "ZIP_SYMLINK_DETECTED"
    ZIP_ABSOLUTE_PATH = "ZIP_ABSOLUTE_PATH"
    ZIP_ANALYSIS_TIMEOUT = "ZIP_ANALYSIS_TIMEOUT"

    # Processing errors
    PROCESSING_ERROR = "PROCESSING_ERROR"
    IO_ERROR = "IO_ERROR"
    MEMORY_ERROR = "MEMORY_ERROR"
# ============================================================================
# Base Exceptions
# ============================================================================
class FileSecurityError(Exception):
"""
Base exception for all file security validation failures.
Args:
message: Human-readable error description.
error_code: Optional machine-readable error code.
Attributes:
message: Human-readable error message.
error_code: Machine-readable error code from ErrorCode.
"""
def __init__(self, message: str, error_code: str | None = None):
self.message = message
self.error_code = error_code
super().__init__(message)
# ============================================================================
# File Validation Exceptions
# ============================================================================
class FileValidationError(FileSecurityError):
    """
    File validation failed.

    Args:
        message: Human-readable error description.
        filename: Optional name of the file that failed validation.
        error_code: Optional machine-readable error code.

    Attributes:
        filename: Name of the file that failed validation.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        error_code: str | None = None,
    ):
        self.filename = filename
        super().__init__(message, error_code)
# ============================================================================
# Filename Security Exceptions
# ============================================================================
class FilenameSecurityError(FileValidationError):
    """Filename failed security checks."""
class UnicodeSecurityError(FilenameSecurityError):
    """
    Dangerous Unicode characters detected in filename.

    The error code is always ErrorCode.UNICODE_DANGEROUS_CHARS.

    Args:
        message: Human-readable error description.
        filename: Optional filename containing dangerous Unicode.
        dangerous_chars: Optional list of (char, code_point, position)
            tuples for each dangerous character found.

    Attributes:
        dangerous_chars: List of dangerous character tuples (empty list
            when none were supplied).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        dangerous_chars: list[tuple[str, int, int]] | None = None,
    ):
        self.dangerous_chars = dangerous_chars or []
        super().__init__(
            message,
            filename=filename,
            error_code=ErrorCode.UNICODE_DANGEROUS_CHARS,
        )
class ExtensionSecurityError(FilenameSecurityError):
    """
    Dangerous file extension detected.

    Args:
        message: Human-readable error description.
        filename: Optional filename with dangerous extension.
        extension: Optional specific extension that was blocked.
        error_code: Optional error code (defaults to
            EXTENSION_BLOCKED).

    Attributes:
        extension: The specific extension that was blocked.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        extension: str | None = None,
        error_code: str | None = None,
    ):
        self.extension = extension
        super().__init__(
            message,
            filename=filename,
            # Falls back to the generic "blocked" code when none is given.
            error_code=error_code or ErrorCode.EXTENSION_BLOCKED,
        )
class WindowsReservedNameError(FilenameSecurityError):
    """
    Windows reserved device name used.

    The error code is always ErrorCode.WINDOWS_RESERVED_NAME.

    Args:
        message: Human-readable error description.
        filename: Optional filename using a reserved name.
        reserved_name: Optional specific reserved name detected.

    Attributes:
        reserved_name: The specific reserved name that was detected.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        reserved_name: str | None = None,
    ):
        self.reserved_name = reserved_name
        super().__init__(
            message,
            filename=filename,
            error_code=ErrorCode.WINDOWS_RESERVED_NAME,
        )
# ============================================================================
# File Content Exceptions
# ============================================================================
class FileSizeError(FileValidationError):
    """
    File size violates configured limits.

    Args:
        message: Human-readable error description.
        filename: Optional filename that violated the size limits.
        size: Optional actual file size in bytes.
        max_size: Optional maximum allowed size in bytes.
        error_code: Optional error code (defaults to FILE_TOO_LARGE).
            Pass ErrorCode.FILE_EMPTY or ErrorCode.FILE_SIZE_UNKNOWN
            to report the other size-related failures.

    Attributes:
        size: The actual file size in bytes.
        max_size: The maximum allowed size in bytes.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        size: int | None = None,
        max_size: int | None = None,
        error_code: str | None = None,
    ):
        self.size = size
        self.max_size = max_size
        super().__init__(
            message,
            filename=filename,
            # Existing callers pass no code and keep getting FILE_TOO_LARGE.
            error_code=error_code or ErrorCode.FILE_TOO_LARGE,
        )
class MimeTypeError(FileValidationError):
    """
    File MIME type not allowed or mismatches extension.

    Args:
        message: Human-readable error description.
        filename: Optional filename with MIME type issue.
        detected_mime: Optional detected MIME type string.
        allowed_mimes: Optional list of allowed MIME types.
        error_code: Optional error code (defaults to
            MIME_TYPE_INVALID).

    Attributes:
        detected_mime: The detected MIME type string.
        allowed_mimes: List of allowed MIME types (empty list when none
            were supplied).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        detected_mime: str | None = None,
        allowed_mimes: list[str] | None = None,
        error_code: str | None = None,
    ):
        self.detected_mime = detected_mime
        self.allowed_mimes = allowed_mimes or []
        super().__init__(
            message,
            filename=filename,
            error_code=error_code or ErrorCode.MIME_TYPE_INVALID,
        )
class FileSignatureError(FileValidationError):
    """
    File header signature invalid or mismatched.

    Args:
        message: Human-readable error description.
        filename: Optional filename with signature issue.
        expected_type: Optional expected file type based on extension.
        error_code: Optional error code (defaults to
            FILE_SIGNATURE_MISMATCH). Pass
            ErrorCode.FILE_SIGNATURE_INVALID or
            ErrorCode.FILE_SIGNATURE_MISSING for the other
            signature failures.

    Attributes:
        expected_type: The expected file type based on extension.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        expected_type: str | None = None,
        error_code: str | None = None,
    ):
        self.expected_type = expected_type
        super().__init__(
            message,
            filename=filename,
            # Existing callers pass no code and keep getting the MISMATCH code.
            error_code=error_code or ErrorCode.FILE_SIGNATURE_MISMATCH,
        )
# ============================================================================
# Compression and ZIP Exceptions
# ============================================================================
class CompressionSecurityError(FileValidationError):
    """
    Compressed file security check failed.

    This class adds no constructor of its own; it is the common base for
    compression-related errors and uses the FileValidationError
    signature unchanged.

    Args:
        message: Human-readable error description.
        filename: Optional filename of compressed file.
        error_code: Optional error code. No default code is applied
            here, so it stays None unless the caller supplies one.
    """
class ZipBombError(CompressionSecurityError):
    """
    Zip archive exceeds compression ratio or uncompressed size limits.

    The error code is always ErrorCode.ZIP_BOMB_DETECTED.

    Args:
        message: Human-readable error description.
        filename: Optional filename of zip bomb.
        compression_ratio: Optional actual compression ratio detected.
        uncompressed_size: Optional total uncompressed size in bytes.
        max_ratio: Optional maximum allowed compression ratio.
        max_size: Optional maximum allowed uncompressed size in bytes.

    Attributes:
        compression_ratio: Actual compression ratio detected.
        uncompressed_size: Total uncompressed size in bytes.
        max_ratio: Maximum allowed compression ratio.
        max_size: Maximum allowed uncompressed size in bytes.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        compression_ratio: float | None = None,
        uncompressed_size: int | None = None,
        max_ratio: float | None = None,
        max_size: int | None = None,
    ):
        self.compression_ratio = compression_ratio
        self.uncompressed_size = uncompressed_size
        self.max_ratio = max_ratio
        self.max_size = max_size
        super().__init__(
            message,
            filename=filename,
            error_code=ErrorCode.ZIP_BOMB_DETECTED,
        )
class ZipContentError(CompressionSecurityError):
    """
    Zip archive contains dangerous content or structure.

    Args:
        message: Human-readable error description.
        filename: Optional filename of problematic archive.
        threats: Optional list of detected threat descriptions.
        error_code: Optional error code (defaults to
            ZIP_CONTENT_THREAT).

    Attributes:
        threats: List of detected threat descriptions (empty list when
            none were supplied).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        threats: list[str] | None = None,
        error_code: str | None = None,
    ):
        self.threats = threats or []
        super().__init__(
            message,
            filename=filename,
            error_code=error_code or ErrorCode.ZIP_CONTENT_THREAT,
        )
class FileProcessingError(FileSecurityError):
    """
    Unexpected processing error during file validation.

    Args:
        message: Human-readable error description.
        original_error: Optional original exception that was caught.
        error_code: Optional error code (defaults to
            PROCESSING_ERROR). Pass ErrorCode.IO_ERROR or
            ErrorCode.MEMORY_ERROR to classify the failure more
            precisely.

    Attributes:
        original_error: The original exception that was caught.
    """

    def __init__(
        self,
        message: str,
        original_error: Exception | None = None,
        error_code: str | None = None,
    ):
        self.original_error = original_error
        super().__init__(
            message,
            # Existing callers pass no code and keep getting PROCESSING_ERROR.
            error_code=error_code or ErrorCode.PROCESSING_ERROR,
        )

View File

@@ -1,18 +1,18 @@
"""
File Validator Module
Main validator class that coordinates all file security validations.
"""
"""Main file validator coordinating all security validations."""
import logging
import os
import time
import mimetypes
from typing import Set, Tuple
import magic
from fastapi import UploadFile
import core.logger as core_logger
# Optional FastAPI integration - fallback to protocol if not available
try:
from fastapi import UploadFile
except ImportError:
from .protocols import UploadFileProtocol as UploadFile
from .config import FileSecurityConfig
from .validators import (
UnicodeSecurityValidator,
@@ -21,30 +21,53 @@ from .validators import (
CompressionSecurityValidator,
)
from .inspectors import ZipContentInspector
from .exceptions import (
ErrorCode,
FileValidationError,
FilenameSecurityError,
ExtensionSecurityError,
FileSizeError,
MimeTypeError,
FileSignatureError,
FileProcessingError,
)
logger = logging.getLogger(__name__)
class FileValidator:
"""
Coordinated security validation for uploaded files.
Attributes:
config: Active security configuration.
unicode_validator: Validator for Unicode-related checks.
extension_validator: Validator for file extension rules.
windows_validator: Validator enforcing Windows-specific constraints.
compression_validator: Validator handling compressed file limits.
zip_inspector: Inspector for ZIP archive contents.
magic_mime: MIME type detector based on python-magic.
magic_available: Whether python-magic was successfully initialized.
"""
def __init__(self, config: FileSecurityConfig | None = None):
"""
Initialize the FileValidator with configuration and specialized validators.
Initialize file validator with configuration and detection utilities.
Args:
config (FileSecurityConfig | None, optional): Configuration object for file security settings.
If None, a default FileSecurityConfig instance will be created. Defaults to None.
config: Optional configuration object defining file security
rules. Defaults to new FileSecurityConfig instance.
Attributes:
config (FileSecurityConfig): The file security configuration to use.
unicode_validator (UnicodeSecurityValidator): Validator for Unicode security checks.
extension_validator (ExtensionSecurityValidator): Validator for file extension checks.
windows_validator (WindowsSecurityValidator): Validator for Windows-specific security checks.
compression_validator (CompressionSecurityValidator): Validator for compression-related checks.
zip_inspector (ZipContentInspector): Inspector for ZIP file contents.
magic_mime (magic.Magic | None): Magic object for MIME type detection if available.
magic_available (bool): Flag indicating whether python-magic is available for use.
Raises:
Exception: Logs a warning if python-magic initialization fails, but does not raise.
config: Active security configuration.
unicode_validator: Validator for Unicode-related checks.
extension_validator: Validator for file extension rules.
windows_validator: Validator enforcing Windows constraints.
compression_validator: Validator for compressed file limits.
zip_inspector: Inspector for ZIP archive contents.
magic_mime: MIME type detector based on python-magic.
magic_available: Whether python-magic initialized successfully.
"""
self.config = config or FileSecurityConfig()
@@ -59,38 +82,25 @@ class FileValidator:
try:
self.magic_mime = magic.Magic(mime=True)
self.magic_available = True
core_logger.print_to_log(
"File content detection (python-magic) initialized", "debug"
)
logger.debug("File content detection (python-magic) initialized")
except Exception as err:
self.magic_available = False
core_logger.print_to_log(
f"Warning: python-magic not available for content detection: {err}",
"warning",
logger.warning(
"python-magic not available for content detection: %s",
err,
)
def _detect_mime_type(self, file_content: bytes, filename: str) -> str:
"""
Detect the MIME type of a file from its content and filename.
This method attempts to determine the MIME type using multiple strategies:
1. Content-based detection using python-magic library (most reliable)
2. Filename extension-based detection as a fallback
3. Default to "application/octet-stream" if detection fails
Determine MIME type for file content.
Args:
file_content (bytes): The raw binary content of the file to analyze.
filename (str): The name of the file, used for extension-based detection.
file_content: Raw bytes of the file to inspect.
filename: Original filename for fallback MIME detection.
Returns:
str: The detected MIME type (e.g., "image/jpeg", "application/pdf") or
"application/octet-stream" if detection fails.
Note:
- Content-based detection requires the python-magic library to be available.
- If magic detection fails, the method logs a warning and falls back to
filename-based detection.
- Filename-based detection is less reliable as it only considers the extension.
Detected MIME type or "application/octet-stream" if detection
fails.
"""
detected_mime = None
@@ -99,46 +109,32 @@ class FileValidator:
try:
detected_mime = self.magic_mime.from_buffer(file_content)
except Exception as err:
core_logger.print_to_log(
f"Magic MIME detection failed: {err}", "warning"
)
logger.warning("Magic MIME detection failed: %s", err)
# Fallback to filename-based detection
if not detected_mime:
core_logger.print_to_log(
"Fallback to filename-based MIME detection", "info"
)
logger.info("Fallback to filename-based MIME detection")
detected_mime, _ = mimetypes.guess_type(filename)
return detected_mime or "application/octet-stream"
def _validate_file_signature(self, file_content: bytes, expected_type: str) -> bool:
def _validate_file_signature(self, file_content: bytes, expected_type: str) -> None:
"""
Validate a file's content by checking its magic number (file signature).
This method examines the first few bytes of a file to determine if they match
known file signatures for the expected file type. This is a more reliable method
of file type validation than relying solely on file extensions.
Verify file content begins with known signature for expected type.
Args:
file_content (bytes): The raw bytes content of the file to validate.
expected_type (str): The expected file type category. Currently supports:
- "image": Validates JPEG and PNG image formats
- "zip": Validates ZIP archive formats
file_content: Raw bytes of the uploaded file.
expected_type: Logical file category ("image" or "zip").
Returns:
bool: True if the file signature matches one of the expected signatures
for the given type, False otherwise. Also returns False if the
file content is too short (less than 4 bytes).
Note:
This method checks against a predefined set of file signatures:
- JPEG images: Multiple variants including standard and EXIF
- PNG images: Standard PNG signature
- ZIP archives: Multiple ZIP format variants including empty and spanning archives
Raises:
FileSignatureError: File header doesn't match expected type
signatures.
"""
if len(file_content) < 4:
return False
raise FileSignatureError(
f"File too small to verify {expected_type} signature",
expected_type=expected_type,
)
# Common file signatures
signatures = {
@@ -158,47 +154,39 @@ class FileValidator:
for signature in expected_signatures:
if file_content.startswith(signature):
return True
return # Signature matched
return False
# No matching signature found
raise FileSignatureError(
f"File content does not match expected {expected_type} format",
expected_type=expected_type,
)
def _sanitize_filename(self, filename: str) -> str:
"""
Sanitize a filename to ensure it is safe for filesystem operations.
This method performs comprehensive filename sanitization including:
- Unicode security validation to prevent homograph and other Unicode-based attacks
- Path traversal prevention by removing path components
- Removal of null bytes and control characters
- Replacement of dangerous characters with underscores
- Validation against Windows reserved names (e.g., CON, PRN, AUX)
- Extension security validation to prevent compound/double extension attacks
- Filename length limitation while preserving extensions
- Ensures resulting filename is not empty or extension-only
Sanitize user-provided filename to prevent security risks.
Args:
filename (str): The original filename to sanitize.
filename: Original filename supplied by the user.
Returns:
str: A sanitized filename that is safe for filesystem operations.
Sanitized filename safe for storage and processing.
Raises:
ValueError: If the filename is empty, contains dangerous Unicode sequences,
is a Windows reserved name, or has dangerous extensions.
Note:
The order of validation steps is intentional and critical for security.
Unicode validation must occur first to prevent bypassing other checks.
UnicodeSecurityError: Filename contains dangerous Unicode
characters or fails normalization checks.
WindowsReservedNameError: Filename uses Windows reserved
device names.
ExtensionSecurityError: Filename contains blocked or
dangerous file extensions.
ValueError: Filename is empty string.
"""
if not filename:
raise ValueError("Filename cannot be empty")
# Unicode security validation (must be first)
# This detects and blocks Unicode-based attacks before any other processing
try:
filename = self.unicode_validator.validate_unicode_security(filename)
except ValueError as err:
raise err
filename = self.unicode_validator.validate_unicode_security(filename)
# Remove path components to prevent directory traversal
filename = os.path.basename(filename)
@@ -234,45 +222,34 @@ class FileValidator:
# Final check: ensure the sanitized filename doesn't become a reserved name
self.windows_validator.validate_windows_reserved_names(filename)
core_logger.print_to_log(
f"Filename sanitized: original='{os.path.basename(filename if filename else 'None')}' -> sanitized='{filename}'",
"debug",
logger.debug(
"Filename sanitized: original='%s' -> sanitized='%s'",
os.path.basename(filename if filename else "None"),
filename,
)
return filename
def _validate_filename(self, file: UploadFile) -> Tuple[bool, str] | None:
def _validate_filename(self, file: UploadFile) -> None:
"""
Validates and sanitizes the filename of an uploaded file.
This method performs comprehensive filename validation including checking for
presence, sanitizing potentially dangerous characters or patterns, and verifying
the filename remains valid after sanitization. The original file object is
updated with the sanitized filename if validation succeeds.
Validate filename of uploaded file and sanitize it in place.
Args:
file (UploadFile): The uploaded file object whose filename needs validation.
The filename attribute will be modified in-place if valid.
Returns:
Tuple[bool, str] | None: A tuple containing:
- bool: True if validation succeeds, False otherwise
- str: An error message describing why validation failed, or empty if successful
Returns None implicitly if no validation issues occur (though explicit return
values are preferred in all code paths).
file: Uploaded file whose filename should be validated and
sanitized.
Raises:
ValueError: When dangerous file extensions are detected during sanitization.
Exception: For unexpected errors during the validation process.
Note:
This method modifies the file.filename attribute in-place when sanitization
is successful. All validation failures are logged and returned as descriptive
error messages rather than raising exceptions to the caller.
FilenameSecurityError: Filename is empty, invalid, or fails
sanitization.
FileProcessingError: Unexpected error during filename
validation.
"""
# Check filename
if not file.filename:
return False, "Filename is required"
raise FilenameSecurityError(
"Filename is required",
error_code=ErrorCode.FILENAME_EMPTY,
)
# Sanitize the filename to prevent security issues
try:
@@ -283,83 +260,76 @@ class FileValidator:
# Additional validation after sanitization
if not sanitized_filename or sanitized_filename.strip() == "":
return False, "Invalid filename after sanitization"
except ValueError as err:
# Dangerous extension detected - reject the file
return False, str(err)
raise FilenameSecurityError(
"Invalid filename after sanitization",
filename=file.filename,
error_code=ErrorCode.FILENAME_INVALID,
)
except FileValidationError:
# Let FileValidationError and subclasses propagate
raise
except Exception as err:
core_logger.print_to_log(
f"Unexpected error during filename validation: {str(err)}", "error"
)
return False, "Filename validation failed due to internal error"
logger.exception("Unexpected error during filename validation: %s", err)
raise FileProcessingError(
"Filename validation failed due to internal error",
original_error=err,
) from err
def _validate_file_extension(
self, file: UploadFile, allowed_extensions: Set[str]
) -> Tuple[bool, str] | None:
self, file: UploadFile, allowed_extensions: set[str]
) -> None:
"""
Validate the file extension against allowed and blocked extensions.
This method checks if the uploaded file has a valid extension by verifying it against
a set of allowed extensions and ensuring it's not in the blocked extensions list.
Validate extension of uploaded file against allowed and blocked lists.
Args:
file (UploadFile): The uploaded file object to validate
allowed_extensions (Set[str]): A set of allowed file extensions (e.g., {'.jpg', '.png'})
Returns:
Tuple[bool, str] | None: A tuple containing:
- bool: False if validation fails
- str: Error message describing the validation failure
Returns None implicitly if validation passes
file: File whose extension will be validated.
allowed_extensions: Set of allowed file extensions.
Raises:
None
Note:
- File extensions are compared in lowercase for case-insensitive matching
- The method first checks if the extension is in the allowed list
- Then verifies the extension is not in the globally blocked extensions list
FilenameSecurityError: Filename is missing.
ExtensionSecurityError: Extension is not allowed or is blocked.
"""
# Check file extension
if not file.filename:
return False, "Filename is required for extension validation"
raise FilenameSecurityError(
"Filename is required for extension validation",
error_code=ErrorCode.FILENAME_EMPTY,
)
_, ext = os.path.splitext(file.filename.lower())
if ext not in allowed_extensions:
return (
False,
raise ExtensionSecurityError(
f"Invalid file extension. Allowed: {', '.join(allowed_extensions)}",
filename=file.filename,
extension=ext,
error_code=ErrorCode.EXTENSION_NOT_ALLOWED,
)
# Check for blocked extensions
if ext in self.config.BLOCKED_EXTENSIONS:
return False, f"File extension {ext} is blocked for security reasons"
raise ExtensionSecurityError(
f"File extension {ext} is blocked for security reasons",
filename=file.filename,
extension=ext,
error_code=ErrorCode.EXTENSION_BLOCKED,
)
async def _validate_file_size(
self, file: UploadFile, max_file_size: int
) -> Tuple[bytes | None, int | None, bool, str]:
) -> tuple[bytes, int]:
"""
Validates the size of an uploaded file against a maximum allowed size.
This method reads the file content to determine its actual size and compares it
against the specified maximum file size. It handles files both with and without
size metadata.
Validate uploaded file size by sampling content and determining total bytes.
Args:
file (UploadFile): The file to validate, typically from a FastAPI upload.
max_file_size (int): Maximum allowed file size in bytes.
file: Uploaded file supporting asynchronous read and seek.
max_file_size: Maximum allowed file size in bytes.
Returns:
Tuple[bytes | None, int | None, bool, str]: A tuple containing:
- bytes | None: The first 8KB of file content if validation passes, None otherwise.
- int | None: The total file size in bytes if validation passes, None otherwise.
- bool: True if validation passes, False otherwise.
- str: A message describing the validation result. Returns "Passed" on success,
or an error message indicating why validation failed.
Tuple containing first 8 KB of file content and detected file
size in bytes.
Note:
The file pointer is reset to the beginning after size determination.
This method checks for both oversized files and empty files.
Raises:
FileSizeError: File size exceeds maximum or file is empty.
"""
# Read first chunk for content analysis
file_content = await file.read(8192) # Read first 8KB
@@ -378,161 +348,145 @@ class FileValidator:
await file.seek(0)
if file_size > max_file_size:
return (
None,
None,
False,
raise FileSizeError(
f"File too large. File size: {file_size // (1024*1024)}MB, maximum: {max_file_size // (1024*1024)}MB",
size=file_size,
max_size=max_file_size,
)
if file_size == 0:
return None, None, False, "Empty file not allowed"
raise FileSizeError(
"Empty file not allowed",
size=0,
max_size=max_file_size,
)
return file_content, file_size, True, "Passed"
return file_content, file_size
async def validate_image_file(self, file: UploadFile) -> None:
    """
    Validate uploaded image by checking filename, extension, size, MIME type, and signature.

    Checks run in that order and the first failure raises. Returning
    without an exception means every check passed.

    Args:
        file: Uploaded file to validate.

    Raises:
        FilenameSecurityError: Filename is empty, invalid, or fails
            security checks.
        ExtensionSecurityError: File extension is not allowed or is
            blocked.
        FileSizeError: File size exceeds maximum or file is empty.
        MimeTypeError: MIME type is not in allowed image types.
        FileSignatureError: File signature doesn't match expected image
            format.
        FileProcessingError: Unexpected error during validation.
    """
    try:
        # Each helper raises on failure, so no result needs checking here.
        self._validate_filename(file)
        self._validate_file_extension(file, self.config.ALLOWED_IMAGE_EXTENSIONS)

        # Returns the leading content sample and the total size on success.
        file_content, file_size = await self._validate_file_size(
            file, self.config.limits.max_image_size
        )

        filename = file.filename or "unknown"
        detected_mime = self._detect_mime_type(file_content, filename)

        if detected_mime not in self.config.ALLOWED_IMAGE_MIMES:
            raise MimeTypeError(
                f"Invalid file type. Detected: {detected_mime}. Allowed: {', '.join(self.config.ALLOWED_IMAGE_MIMES)}",
                filename=filename,
                detected_mime=detected_mime,
                allowed_mimes=list(self.config.ALLOWED_IMAGE_MIMES),
            )

        self._validate_file_signature(file_content, "image")

        logger.debug(
            "Image file validation passed: %s (%s, %s bytes)",
            filename,
            detected_mime,
            file_size,
        )
    except FileValidationError:
        # Validation failures propagate unchanged to the caller.
        raise
    except Exception as err:
        # Anything else is wrapped so callers see one processing-error type.
        logger.exception("Error during image file validation: %s", err)
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=err,
        ) from err
async def validate_zip_file(self, file: UploadFile) -> Tuple[bool, str]:
async def validate_zip_file(self, file: UploadFile) -> None:
"""
Validates an uploaded ZIP file through multiple security checks.
This method performs comprehensive validation of ZIP files including:
- Filename validation for dangerous patterns
- File extension verification
- File size limits enforcement
- MIME type detection and validation
- ZIP file signature verification
- Compression ratio analysis (zip bomb detection)
- ZIP content inspection (if enabled)
Validate uploaded ZIP archive against service configuration.
Args:
file (UploadFile): The uploaded file to validate. Must be a ZIP file.
Returns:
Tuple[bool, str]: A tuple containing:
- bool: True if validation passed, False otherwise
- str: Success message or detailed error message explaining the validation failure
file: Incoming ZIP file-like object to validate.
Raises:
ValueError: When a dangerous file extension is detected during validation
Exception: For any unexpected errors during the validation process
Notes:
- The method allows application/octet-stream MIME type if the ZIP signature is valid
- Full file content is read for compression ratio and content inspection
- File position is reset to beginning after validation for subsequent operations
FilenameSecurityError: Filename is empty, invalid, or fails
security checks.
ExtensionSecurityError: File extension is not allowed or is
blocked.
FileSizeError: File size exceeds maximum or file is empty.
MimeTypeError: MIME type is not in allowed ZIP types.
FileSignatureError: File signature doesn't match expected ZIP
format.
CompressionSecurityError: ZIP compression validation failed
(zip bomb detected).
FileProcessingError: Unexpected error during validation.
"""
try:
# Validate filename
filename_validation = self._validate_filename(file)
if filename_validation is not None:
return filename_validation
# Validate filename (raises exceptions on failure)
self._validate_filename(file)
# Validate file extension
extension_validation = self._validate_file_extension(
file, self.config.ALLOWED_ZIP_EXTENSIONS
)
if extension_validation is not None:
return extension_validation
# Validate file extension (raises exceptions on failure)
self._validate_file_extension(file, self.config.ALLOWED_ZIP_EXTENSIONS)
# Validate file size
size_validation = await self._validate_file_size(
# Validate file size (raises exceptions on failure, returns content and size on success)
file_content, file_size = await self._validate_file_size(
file, self.config.limits.max_zip_size
)
if size_validation[0] is None:
return size_validation[2], size_validation[3]
# Detect MIME type using first 8KB
filename = file.filename or "unknown"
detected_mime = self._detect_mime_type(size_validation[0], filename)
detected_mime = self._detect_mime_type(file_content, filename)
# Validate ZIP file signature first (most reliable check)
has_zip_signature = self._validate_file_signature(size_validation[0], "zip")
if not has_zip_signature:
return False, "File content does not match ZIP format"
# This will raise FileSignatureError if signature doesn't match
try:
self._validate_file_signature(file_content, "zip")
except FileSignatureError as err:
# Re-raise with more specific message
raise FileSignatureError(
"File content does not match ZIP format",
filename=filename,
expected_type="zip",
) from err
# Check MIME type, but allow application/octet-stream if signature is valid
# Some ZIP files are detected as octet-stream, but signature check ensures it's really a ZIP
if detected_mime not in self.config.ALLOWED_ZIP_MIMES:
if detected_mime == "application/octet-stream" and has_zip_signature:
if detected_mime == "application/octet-stream":
# Valid ZIP file, just detected as generic binary
core_logger.print_to_log(
f"ZIP file detected as application/octet-stream, but signature is valid: {filename}",
"debug",
logger.debug(
"ZIP file detected as application/octet-stream, but signature is valid: %s",
filename,
)
else:
return (
False,
raise MimeTypeError(
f"Invalid file type. Detected: {detected_mime}. Expected ZIP file.",
filename=filename,
detected_mime=detected_mime,
allowed_mimes=list(self.config.ALLOWED_ZIP_MIMES),
)
# For ZIP validation (compression ratio and content inspection), we need the full file
@@ -546,39 +500,26 @@ class FileValidator:
# Validate ZIP compression ratio to detect zip bombs
if file_size is not None:
compression_validation = (
self.compression_validator.validate_zip_compression_ratio(
full_file_content, file_size
)
self.compression_validator.validate_zip_compression_ratio(
full_file_content, file_size
)
if not compression_validation[0]:
return (
False,
f"ZIP compression validation failed: {compression_validation[1]}",
)
# Perform ZIP content inspection if enabled
if self.config.limits.scan_zip_content:
content_inspection = self.zip_inspector.inspect_zip_content(
full_file_content
)
if not content_inspection[0]:
return (
False,
f"ZIP content inspection failed: {content_inspection[1]}",
)
self.zip_inspector.inspect_zip_content(full_file_content)
core_logger.print_to_log(
f"ZIP file validation passed: {filename} ({detected_mime}, {file_size} bytes)",
"debug",
logger.debug(
"ZIP file validation passed: %s (%s, %s bytes)",
filename,
detected_mime,
file_size,
)
return True, "Validation successful"
except ValueError as err:
# Dangerous extension detected - reject the file
return False, str(err)
except FileValidationError:
# Let FileValidationError and subclasses propagate
raise
except Exception as err:
core_logger.print_to_log(
f"Error during ZIP file validation: {err}", "error", exc=err
)
return False, "File validation failed due to internal error"
logger.exception("Error during ZIP file validation: %s", err)
raise FileProcessingError(
"File validation failed due to internal error",
original_error=err,
) from err

View File

@@ -1,6 +1,10 @@
# Inspectors package
"""
File content inspection modules for security validation.
This package provides inspectors that analyze the internal structure
and contents of uploaded files to detect potential security threats.
"""
from .zip_inspector import ZipContentInspector
# Public API of the inspectors package.
__all__ = ["ZipContentInspector"]

View File

@@ -1,277 +1,411 @@
"""
ZIP Content Inspector Module
"""ZIP content inspector for security threat detection."""
from __future__ import annotations
Handles deep inspection of ZIP file contents for security threats.
"""
import io
import os
import time
import zipfile
from typing import List, Tuple, TYPE_CHECKING
from typing import TYPE_CHECKING
import core.logger as core_logger
import logging
from ..enums import SuspiciousFilePattern, ZipThreatCategory
from ..exceptions import ZipContentError, FileProcessingError, ErrorCode
if TYPE_CHECKING:
from ..config import FileSecurityConfig
class ZipContentInspector:
logger = logging.getLogger(__name__)
def __init__(self, config: "FileSecurityConfig"):
class ZipContentInspector:
"""
Inspects ZIP archive contents for security threats.
Attributes:
config: File security configuration.
"""
def __init__(self, config: FileSecurityConfig):
"""
Initialize ZIP inspector with configuration.
Args:
config: File security configuration.
"""
self.config = config
def inspect_zip_content(self, file_content: bytes) -> None:
    """
    Inspect ZIP archive for potential security threats.

    Every entry is passed to ``_inspect_zip_entry`` and the entry list to
    ``_inspect_zip_structure``. All findings are collected and reported
    together in one exception. Returning without an exception means no
    threats were found.

    Args:
        file_content: Raw bytes of ZIP archive.

    Raises:
        ZipContentError: If any threat is reported by the per-entry or
            structural checks, or if the analysis exceeds
            ``config.limits.zip_analysis_timeout`` seconds.
        FileProcessingError: If ZIP structure is invalid or
            unexpected error occurs during inspection.
    """
    try:
        zip_bytes = io.BytesIO(file_content)
        threats_found: list[str] = []
        timeout = self.config.limits.zip_analysis_timeout

        # Monotonic clock: elapsed time must not be skewed by wall-clock
        # adjustments while the archive is being analysed.
        start_time = time.monotonic()

        with zipfile.ZipFile(zip_bytes, "r") as zip_file:
            zip_entries = zip_file.infolist()

            for entry in zip_entries:
                # The budget is checked before each entry, so a single slow
                # entry can still overrun it.
                if time.monotonic() - start_time > timeout:
                    logger.error(
                        "ZIP content inspection timeout",
                        extra={
                            "error_type": "zip_analysis_timeout",
                            "timeout": timeout,
                        },
                    )
                    raise ZipContentError(
                        message=f"ZIP content inspection timeout after {timeout}s",
                        threats=["Analysis timeout - potential zip bomb"],
                        error_code=ErrorCode.ZIP_ANALYSIS_TIMEOUT,
                    )

                threats_found.extend(self._inspect_zip_entry(entry, zip_file))

            # Structural checks look at the entry list as a whole.
            threats_found.extend(self._inspect_zip_structure(zip_entries))

        if threats_found:
            logger.warning(
                "ZIP content threats detected",
                extra={
                    "error_type": "zip_content_threat",
                    "threats": threats_found,
                    "threat_count": len(threats_found),
                },
            )
            raise ZipContentError(
                message=f"ZIP content threats detected: {'; '.join(threats_found)}",
                threats=threats_found,
            )

        logger.debug(
            "ZIP content inspection passed: %s entries analyzed",
            len(zip_entries),
        )
    except ZipContentError:
        # Our own findings must not be rewrapped by the generic handler.
        raise
    except zipfile.BadZipFile as err:
        logger.error("Invalid or corrupted ZIP file structure", exc_info=True)
        raise FileProcessingError(
            message="Invalid or corrupted ZIP file structure",
            original_error=err,
        ) from err
    except Exception as err:
        logger.error(
            "Unexpected error during ZIP content inspection",
            exc_info=True,
        )
        raise FileProcessingError(
            message=f"ZIP content inspection failed: {str(err)}",
            original_error=err,
        ) from err
def _inspect_zip_entry(
    self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
) -> list[str]:
    """
    Inspect single ZIP entry for security threats.

    All checks run on every entry, so one entry can produce several
    findings. Which checks apply is controlled by ``self.config.limits``.

    Args:
        entry: ZIP entry metadata.
        zip_file: Parent ZIP archive.

    Returns:
        List of threat descriptions. Empty when nothing was found.
    """
    limits = self.config.limits
    threats: list[str] = []
    filename = entry.filename

    # 1. Directory traversal attacks
    if self._has_directory_traversal(filename):
        threats.append(f"Directory traversal attack in '{filename}'")

    # 2. Absolute paths
    if not limits.allow_absolute_paths and self._has_absolute_path(filename):
        threats.append(f"Absolute path detected in '{filename}'")

    # 3. Symbolic links
    if not limits.allow_symlinks and self._is_symlink(entry):
        threats.append(f"Symbolic link detected: '{filename}'")

    # 4. Filename length (last path component only)
    basename_length = len(os.path.basename(filename))
    if basename_length > limits.max_filename_length:
        threats.append(f"Filename too long: '{filename}' ({basename_length} chars)")

    # 5. Full path length
    if len(filename) > limits.max_path_length:
        threats.append(f"Path too long: '{filename}' ({len(filename)} chars)")

    # 6. Suspicious filename patterns
    threats.extend(self._check_suspicious_patterns(filename))

    # 7. Nested archives
    if not limits.allow_nested_archives and self._is_nested_archive(filename):
        threats.append(f"Nested archive detected: '{filename}'")

    # 8. Content scan, only for non-directory entries whose declared size
    #    is under 1 MiB.
    if (
        limits.scan_zip_content
        and not entry.is_dir()
        and entry.file_size < 1024 * 1024
    ):
        threats.extend(self._inspect_entry_content(entry, zip_file))

    return threats
def _inspect_zip_structure(self, entries: List[zipfile.ZipInfo]) -> List[str]:
def _inspect_zip_structure(self, entries: list[zipfile.ZipInfo]) -> list[str]:
"""
Inspect overall ZIP structure for threats.
Inspect ZIP structure for anomalies.
Args:
entries: List of ZIP entries
entries: All ZIP entries to analyze.
Returns:
List[str]: List of structural threats found
List of structural threat descriptions.
"""
threats = []
# Check directory depth
max_depth = 0
for entry in entries:
depth = entry.filename.count('/') + entry.filename.count('\\')
depth = entry.filename.count("/") + entry.filename.count("\\")
max_depth = max(max_depth, depth)
if max_depth > self.config.limits.max_zip_depth:
threats.append(f"Excessive directory depth: {max_depth} (max: {self.config.limits.max_zip_depth})")
threats.append(
f"Excessive directory depth: {max_depth} (max: {self.config.limits.max_zip_depth})"
)
# Check for suspicious file distribution
file_types = {}
for entry in entries:
if not entry.is_dir():
ext = os.path.splitext(entry.filename)[1].lower()
file_types[ext] = file_types.get(ext, 0) + 1
# Check for excessive number of same-type files (potential spam/bomb)
for ext, count in file_types.items():
if count > 1000: # More than 1000 files of same type
threats.append(f"Excessive number of {ext} files: {count}")
return threats
def _has_directory_traversal(self, filename: str) -> bool:
    """
    Check for directory traversal indicators.

    The name is first matched, case-insensitively, against the substrings
    in ``SuspiciousFilePattern.DIRECTORY_TRAVERSAL``. It is then normalized
    with ``os.path.normpath`` and checked for a remaining ``..`` component.
    The ``startswith("..")`` test also matches names that merely begin
    with two dots.

    Args:
        filename: Filename to check.

    Returns:
        True if traversal detected.
    """
    filename_lower = filename.lower()
    if any(
        pattern.lower() in filename_lower
        for pattern in SuspiciousFilePattern.DIRECTORY_TRAVERSAL.value
    ):
        return True

    # Catch ".." components that only surface after normalization.
    normalized = os.path.normpath(filename)
    return normalized.startswith("..") or "/.." in normalized or "\\.." in normalized
def _has_absolute_path(self, filename: str) -> bool:
"""Check if filename is an absolute path."""
"""
Check if filename is an absolute path.
Args:
filename: Path to check.
Returns:
True if absolute path detected.
"""
return (
filename.startswith('/') or # Unix absolute path
filename.startswith('\\') or # Windows UNC path
(len(filename) > 1 and filename[1] == ':') # Windows drive path
filename.startswith("/") # Unix absolute path
or filename.startswith("\\") # Windows UNC path
or (len(filename) > 1 and filename[1] == ":") # Windows drive path
)
def _is_symlink(self, entry: zipfile.ZipInfo) -> bool:
"""Check if ZIP entry is a symbolic link."""
"""
Check if entry is a symbolic link.
Args:
entry: ZIP entry to check.
Returns:
True if entry is a symlink.
"""
# Check if entry has symlink attributes
return (entry.external_attr >> 16) & 0o120000 == 0o120000
def _check_suspicious_patterns(self, filename: str) -> list[str]:
    """
    Check filename for suspicious patterns.

    Two independent, case-insensitive checks are made, and each adds at
    most one warning: the base name must not equal any entry of
    ``SuspiciousFilePattern.SUSPICIOUS_NAMES``, and the full name must not
    contain any entry of ``SuspiciousFilePattern.SUSPICIOUS_PATHS``.

    Args:
        filename: Filename to check.

    Returns:
        List of pattern warnings. Empty when nothing matched.
    """
    threats: list[str] = []
    filename_lower = filename.lower()
    basename = os.path.basename(filename_lower)

    # Exact match on the last path component.
    if any(
        basename == pattern.lower()
        for pattern in SuspiciousFilePattern.SUSPICIOUS_NAMES.value
    ):
        threats.append(f"Suspicious filename pattern: '{filename}'")

    # Substring match anywhere in the path; only the first hit is reported.
    for pattern in SuspiciousFilePattern.SUSPICIOUS_PATHS.value:
        if pattern.lower() in filename_lower:
            threats.append(
                f"Suspicious path component: '{filename}' contains '{pattern}'"
            )
            break

    return threats
def _is_nested_archive(self, filename: str) -> bool:
    """
    Check if filename represents a nested archive.

    Only the final extension is examined, lowercased, and looked up in
    ``ZipThreatCategory.NESTED_ARCHIVES``. The entry content is not read.

    Args:
        filename: Filename to check.

    Returns:
        True if nested archive detected.
    """
    ext = os.path.splitext(filename)[1].lower()
    return ext in ZipThreatCategory.NESTED_ARCHIVES.value
def _inspect_entry_content(
    self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
) -> list[str]:
    """
    Inspect ZIP entry content for malicious signatures.

    Args:
        entry: ZIP entry to inspect.
        zip_file: Parent ZIP archive.

    Returns:
        List of content threat descriptions. Empty when nothing was
        found or when the entry could not be read.
    """
    threats = []
    try:
        # Only the first 512 bytes are sampled for the checks below.
        with zip_file.open(entry, "r") as file:
            content_sample = file.read(512)

        # Check for executable signatures at the start of the sample.
        for signature in SuspiciousFilePattern.EXECUTABLE_SIGNATURES.value:
            if content_sample.startswith(signature):
                threats.append(
                    f"Executable content detected in '{entry.filename}'"
                )
                break

        # Check for script content patterns.
        if self._contains_script_patterns(content_sample, entry.filename):
            threats.append(f"Script content detected in '{entry.filename}'")
    except Exception as err:
        # Best-effort inspection: a failure to read one entry is logged
        # and does not abort the caller.
        logger.warning(
            "Could not inspect content of '%s': %s",
            entry.filename,
            err,
        )
    return threats
def _contains_script_patterns(self, content: bytes, filename: str) -> bool:
"""Check if content contains script patterns."""
"""
Check content for malicious script patterns.
Args:
content: Raw bytes to inspect.
filename: Filename for context.
Returns:
True if script patterns found.
"""
try:
# Try to decode as text
text_content = content.decode('utf-8', errors='ignore').lower()
text_content = content.decode("utf-8", errors="ignore").lower()
# Check for common script patterns
script_patterns = [
'#!/bin/', '#!/usr/bin/', 'powershell', 'cmd.exe',
'eval(', 'exec(', 'system(', 'shell_exec(',
'<script', '<?php', '<%', 'import os', 'import subprocess'
"#!/bin/",
"#!/usr/bin/",
"powershell",
"cmd.exe",
"eval(",
"exec(",
"system(",
"shell_exec(",
"<script",
"<?php",
"<%",
"import os",
"import subprocess",
]
for pattern in script_patterns:
if pattern in text_content:
return True
except Exception:
# If we can't decode as text, it's probably binary
pass
return False
return False

View File

@@ -0,0 +1,51 @@
"""
Framework-agnostic protocols for file upload handling.
This module defines protocols that allow safeuploads to work with any
web framework's file upload implementation without depending on specific
framework packages.
"""
from typing import Protocol, runtime_checkable
@runtime_checkable
class UploadFileProtocol(Protocol):
"""
Protocol for file upload objects from any web framework.
This protocol defines the minimal interface required for file
validation. Any object with these attributes and methods can be
validated, regardless of the web framework being used.
Attributes:
filename: Original filename from the client.
size: Size of the uploaded file in bytes.
"""
filename: str | None
size: int | None
async def read(self, size: int = -1) -> bytes:
"""
Read bytes from the uploaded file.
Args:
size: Number of bytes to read. -1 reads entire file.
Returns:
Bytes read from the file.
"""
...
async def seek(self, offset: int) -> int:
"""
Move file pointer to specified position.
Args:
offset: Position to move to in bytes.
Returns:
New position in the file.
"""
...

View File

@@ -1,149 +0,0 @@
"""
File Security Utilities Module
Contains utility functions for file security operations.
"""
from fastapi import HTTPException, status, UploadFile
import core.logger as core_logger
from .file_validator import FileValidator
from .config import FileSecurityConfig
# Global validator instance
file_validator = FileValidator()
async def validate_profile_image_upload(file: UploadFile) -> None:
    """
    Validate a profile image upload, rejecting it with HTTP 400 on failure.

    Delegates to the module-level ``file_validator.validate_image_file``
    and expects an ``(is_valid, error_message)`` pair back.

    Args:
        file: The uploaded file to validate.

    Raises:
        HTTPException: 400 Bad Request carrying the validator's error
            message when the image is rejected.
    """
    outcome = await file_validator.validate_image_file(file)
    is_valid, error_message = outcome
    if is_valid:
        return

    # Log the rejection before surfacing it to the client.
    core_logger.print_to_log(
        f"Profile image upload validation failed: {error_message}", "warning"
    )
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=f"Invalid image file: {error_message}",
    )
async def validate_profile_data_upload(file: UploadFile) -> None:
    """
    Validate a profile data (ZIP) upload, rejecting it with HTTP 400 on failure.

    Delegates to the module-level ``file_validator.validate_zip_file``
    and expects an ``(is_valid, error_message)`` pair back.

    Args:
        file: The uploaded file to validate.

    Raises:
        HTTPException: 400 Bad Request carrying the validator's error
            message when the ZIP is rejected.
    """
    outcome = await file_validator.validate_zip_file(file)
    is_valid, error_message = outcome
    if is_valid:
        return

    # Log the rejection before surfacing it to the client.
    core_logger.print_to_log(
        f"Profile data upload validation failed: {error_message}", "warning"
    )
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=f"Invalid ZIP file: {error_message}",
    )
def get_secure_filename(original_filename: str) -> str:
    """
    Sanitize a filename via the module-level validator.

    Thin wrapper around ``file_validator._sanitize_filename``.

    Args:
        original_filename: The original filename to be sanitized.

    Returns:
        The value returned by the validator's sanitizer.

    Raises:
        ValueError: Propagated unchanged from the sanitizer.
        HTTPException: 500 Internal Server Error for any other failure
            during sanitization (the original error is chained).
    """
    try:
        return file_validator._sanitize_filename(original_filename)
    except ValueError:
        # Bare raise propagates the sanitizer's error with its original
        # traceback instead of re-raising a bound name.
        raise
    except Exception as err:
        core_logger.print_to_log(
            f"Error during filename sanitization: {err}", "error", exc=err
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal Server Error during filename sanitization",
        ) from err
def validate_configuration(strict: bool = False) -> None:
    """
    Run file security configuration validation and log the outcome.

    Calls ``FileSecurityConfig.validate_and_report`` and never
    propagates an exception: any failure is logged as a warning.

    Args:
        strict: Forwarded to ``validate_and_report`` as ``strict``.
            Defaults to False.

    Note:
        - Success is logged at 'info' level.
        - Any raised exception is logged at 'warning' level.
    """
    try:
        FileSecurityConfig.validate_and_report(strict=strict)
        success_note = "File security configuration validation completed successfully"
        core_logger.print_to_log(success_note, "info")
    except Exception as validation_error:
        # Deliberately broad: validation problems are reported, not raised.
        failure_note = f"File security configuration validation encountered issues: {validation_error}"
        core_logger.print_to_log(failure_note, "warning")

View File

@@ -1,4 +1,12 @@
# Validators package
"""
Security validation modules for uploaded files.
This package provides validators that check filenames and file
properties for potential security threats including Unicode attacks,
invalid extensions, Windows-specific vulnerabilities, and compression
bombs.
"""
from .base import BaseValidator
from .unicode_validator import UnicodeSecurityValidator
from .extension_validator import ExtensionSecurityValidator
@@ -7,8 +15,8 @@ from .compression_validator import CompressionSecurityValidator
__all__ = [
"BaseValidator",
"UnicodeSecurityValidator",
"UnicodeSecurityValidator",
"ExtensionSecurityValidator",
"WindowsSecurityValidator",
"CompressionSecurityValidator"
]
"CompressionSecurityValidator",
]

View File

@@ -1,8 +1,9 @@
"""
Base Validator Module
Base validator interface for file security checks.
"""
from __future__ import annotations
Contains base classes and interfaces for file security validators.
"""
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any
@@ -11,12 +12,32 @@ if TYPE_CHECKING:
class BaseValidator(ABC):
    """
    Abstract base class for file security validators.

    Attributes:
        config: File security configuration parameters.
    """

    def __init__(self, config: "FileSecurityConfig"):
        """
        Initialize validator with configuration.

        Args:
            config: File security settings to apply.
        """
        self.config = config

    @abstractmethod
    def validate(self, *args, **kwargs) -> Any:
        """
        Validate data using subclass-specific logic.

        Args:
            *args: Positional arguments for concrete validator.
            **kwargs: Keyword arguments for concrete validator.

        Returns:
            Validated result defined by subclass.
        """

View File

@@ -1,37 +1,65 @@
"""
Compression Security Validator Module
Validates ZIP compression ratios and detects zip bombs.
"""
from __future__ import annotations
Handles validation of ZIP compression ratios and zip bomb detection.
"""
import io
import time
import zipfile
from typing import Tuple, TYPE_CHECKING
import logging
import core.logger as core_logger
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import (
ZipBombError,
CompressionSecurityError,
FileProcessingError,
ErrorCode,
)
if TYPE_CHECKING:
from ..config import FileSecurityConfig
class CompressionSecurityValidator(BaseValidator):
logger = logging.getLogger(__name__)
def __init__(self, config: "FileSecurityConfig"):
class CompressionSecurityValidator(BaseValidator):
"""
Validates ZIP uploads against zip bombs and compression attacks.
Attributes:
config: Security configuration for validation limits.
"""
def __init__(self, config: FileSecurityConfig):
"""
Initialize the compression validator.
Args:
config: Security configuration with compression limits.
"""
super().__init__(config)
def validate_zip_compression_ratio(
self, file_content: bytes, compressed_size: int
) -> Tuple[bool, str]:
) -> None:
"""
Validate ZIP compression ratio to detect zip bombs.
Validate ZIP archive against security limits.
Args:
file_content: The ZIP file content as bytes
compressed_size: The compressed file size
Returns:
Tuple[bool, str]: (is_valid, error_message)
file_content: Raw bytes of the ZIP archive.
compressed_size: Size of the compressed archive in bytes.
Raises:
ZipBombError: If compression ratio exceeds maximum allowed
or total uncompressed size is too large.
CompressionSecurityError: If ZIP structure is invalid, too
many entries, nested archives detected, or individual
file too large.
FileProcessingError: If unexpected error occurs during
validation such as memory errors or I/O errors.
"""
try:
# Create a BytesIO object from file content for zipfile analysis
@@ -54,18 +82,37 @@ class CompressionSecurityValidator(BaseValidator):
file_count = len(zip_entries)
if file_count > self.config.limits.max_zip_entries:
return (
False,
f"ZIP contains too many files: {file_count}. Maximum allowed: {self.config.limits.max_zip_entries}",
logger.warning(
"ZIP contains too many files",
extra={
"error_type": "zip_too_many_entries",
"file_count": file_count,
"max_entries": self.config.limits.max_zip_entries,
},
)
raise CompressionSecurityError(
message=f"ZIP contains too many files: {file_count}. "
f"Maximum allowed: {self.config.limits.max_zip_entries}",
error_code=ErrorCode.ZIP_TOO_MANY_ENTRIES,
)
# Analyze each entry in the ZIP
for entry in zip_entries:
# Check for timeout
if time.time() - start_time > self.config.limits.zip_analysis_timeout:
return (
False,
f"ZIP analysis timeout after {self.config.limits.zip_analysis_timeout}s - potential zip bomb",
if (
time.time() - start_time
> self.config.limits.zip_analysis_timeout
):
logger.error(
"ZIP analysis timeout",
extra={
"error_type": "zip_analysis_timeout",
"timeout": self.config.limits.zip_analysis_timeout,
},
)
raise ZipBombError(
message=f"ZIP analysis timeout after {self.config.limits.zip_analysis_timeout}s - potential zip bomb",
compression_ratio=0,
)
# Skip directories
@@ -85,9 +132,19 @@ class CompressionSecurityValidator(BaseValidator):
)
if compression_ratio > self.config.limits.max_compression_ratio:
return (
False,
f"Excessive compression ratio detected: {compression_ratio:.1f}:1 for '{entry.filename}'. Maximum allowed: {self.config.limits.max_compression_ratio}:1",
logger.error(
"Excessive compression ratio detected",
extra={
"error_type": "compression_ratio_exceeded",
"file_name": entry.filename,
"compression_ratio": compression_ratio,
"max_ratio": self.config.limits.max_compression_ratio,
},
)
raise ZipBombError(
message=f"Excessive compression ratio detected: {compression_ratio:.1f}:1 for '{entry.filename}'. "
f"Maximum allowed: {self.config.limits.max_compression_ratio}:1",
compression_ratio=compression_ratio,
)
# Check for nested archive files
@@ -101,17 +158,39 @@ class CompressionSecurityValidator(BaseValidator):
# Check for excessively large individual files
# Use the configurable max_individual_file_size limit
if uncompressed_size > self.config.limits.max_individual_file_size:
return (
False,
f"Individual file too large: '{entry.filename}' would expand to {uncompressed_size // (1024*1024)}MB. "
logger.warning(
"Individual file too large",
extra={
"error_type": "file_too_large",
"file_name": entry.filename,
"size_mb": uncompressed_size // (1024 * 1024),
"max_size_mb": self.config.limits.max_individual_file_size
// (1024 * 1024),
},
)
raise CompressionSecurityError(
message=f"Individual file too large: '{entry.filename}' would expand to {uncompressed_size // (1024*1024)}MB. "
f"Maximum allowed: {self.config.limits.max_individual_file_size // (1024*1024)}MB",
error_code=ErrorCode.FILE_TOO_LARGE,
)
# Check total uncompressed size
if total_uncompressed_size > self.config.limits.max_uncompressed_size:
return (
False,
f"Total uncompressed size too large: {total_uncompressed_size // (1024*1024)}MB. Maximum allowed: {self.config.limits.max_uncompressed_size // (1024*1024)}MB",
logger.warning(
"Total uncompressed size too large",
extra={
"error_type": "zip_too_large",
"total_size_mb": total_uncompressed_size // (1024 * 1024),
"max_size_mb": self.config.limits.max_uncompressed_size
// (1024 * 1024),
},
)
raise ZipBombError(
message=f"Total uncompressed size too large: {total_uncompressed_size // (1024*1024)}MB. "
f"Maximum allowed: {self.config.limits.max_uncompressed_size // (1024*1024)}MB",
compression_ratio=0,
uncompressed_size=total_uncompressed_size,
max_size=self.config.limits.max_uncompressed_size,
)
# Check overall compression ratio
@@ -119,44 +198,89 @@ class CompressionSecurityValidator(BaseValidator):
overall_compression_ratio = (
total_uncompressed_size / total_compressed_size
)
if overall_compression_ratio > self.config.limits.max_compression_ratio:
return (
False,
f"Overall compression ratio too high: {overall_compression_ratio:.1f}:1. Maximum allowed: {self.config.limits.max_compression_ratio}:1",
if (
overall_compression_ratio
> self.config.limits.max_compression_ratio
):
logger.error(
"Overall compression ratio too high",
extra={
"error_type": "compression_ratio_exceeded",
"overall_ratio": overall_compression_ratio,
"max_ratio": self.config.limits.max_compression_ratio,
},
)
raise ZipBombError(
message=f"Overall compression ratio too high: {overall_compression_ratio:.1f}:1. "
f"Maximum allowed: {self.config.limits.max_compression_ratio}:1",
compression_ratio=overall_compression_ratio,
max_ratio=self.config.limits.max_compression_ratio,
)
# Reject nested archives (potential security risk)
if nested_archives:
core_logger.print_to_log(
"Detected nested archives in ZIP file. Upload rejected for security.",
"warning",
logger.warning(
"Nested archives detected",
extra={
"error_type": "zip_nested_archive",
"nested_archives": nested_archives,
},
)
raise CompressionSecurityError(
message=f"Nested archives are not allowed: {', '.join(nested_archives)}",
error_code=ErrorCode.ZIP_NESTED_ARCHIVE,
)
return (False, "Nested archives are not allowed")
# Log analysis results
core_logger.print_to_log(
f"ZIP analysis: {file_count} files, {total_uncompressed_size // (1024*1024)}MB uncompressed, "
f"max ratio: {max_compression_ratio:.1f}:1, overall ratio: {overall_compression_ratio:.1f}:1",
"debug",
logger.debug(
"ZIP analysis: %s files, %sMB uncompressed, max ratio: %.1f:1, overall ratio: %.1f:1",
file_count,
total_uncompressed_size // (1024 * 1024),
max_compression_ratio,
overall_compression_ratio,
)
return True, "ZIP compression validation passed"
except zipfile.BadZipFile:
return False, "Invalid or corrupted ZIP file"
except zipfile.LargeZipFile:
return False, "ZIP file too large to process safely"
except MemoryError:
return (
False,
"ZIP file requires too much memory to process - potential zip bomb",
)
except zipfile.BadZipFile as err:
logger.error("Invalid or corrupted ZIP file", exc_info=True)
raise CompressionSecurityError(
message="Invalid or corrupted ZIP file",
error_code=ErrorCode.ZIP_CORRUPT,
) from err
except zipfile.LargeZipFile as err:
logger.error("ZIP file too large to process", exc_info=True)
raise CompressionSecurityError(
message="ZIP file too large to process safely",
error_code=ErrorCode.ZIP_TOO_LARGE,
) from err
except MemoryError as err:
logger.error("ZIP requires excessive memory", exc_info=True)
raise ZipBombError(
message="ZIP file requires too much memory to process - potential zip bomb",
compression_ratio=0,
) from err
except (ZipBombError, CompressionSecurityError):
# Re-raise our own exceptions
raise
except Exception as err:
core_logger.print_to_log(
f"Error during ZIP compression validation: {err}", "warning", exc=err
logger.error(
"Unexpected error during ZIP compression validation",
exc_info=True,
)
return False, f"ZIP validation failed: {str(err)}"
def validate(self, file_content: bytes, compressed_size: int) -> Tuple[bool, str]:
"""Compatibility method for base class interface."""
return self.validate_zip_compression_ratio(file_content, compressed_size)
raise FileProcessingError(
message=f"ZIP validation failed: {str(err)}",
) from err
def validate(self, file_content: bytes, compressed_size: int) -> None:
"""
Validate the compression ratio of a ZIP file.
Args:
file_content: Raw bytes of the uploaded file.
compressed_size: Size of the file after compression in bytes.
Raises:
ZipBombError: If compression ratio exceeds maximum allowed.
CompressionSecurityError: If ZIP structure is invalid.
FileProcessingError: If unexpected error occurs.
"""
return self.validate_zip_compression_ratio(file_content, compressed_size)

View File

@@ -1,48 +1,97 @@
"""
Extension Security Validator Module
from __future__ import annotations
Handles validation of file extensions for security threats.
"""
import logging
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import ExtensionSecurityError, ErrorCode
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)


class ExtensionSecurityValidator(BaseValidator):
    """
    Validates filenames against configured forbidden extensions.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: "FileSecurityConfig"):
        """
        Initialize the validator.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)

    def validate_extensions(self, filename: str) -> None:
        """
        Validate filename against blocked extensions.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If blocked compound or single
                extension detected in filename.
        """
        # Check for compound dangerous extensions first (e.g., .tar.xz, .user.js)
        filename_lower = filename.lower()
        for compound_ext in self.config.COMPOUND_BLOCKED_EXTENSIONS:
            if filename_lower.endswith(compound_ext):
                logger.warning(
                    "Dangerous compound extension detected",
                    extra={
                        "error_type": "compound_extension_blocked",
                        "file_name": filename,
                        "extension": compound_ext,
                    },
                )
                raise ExtensionSecurityError(
                    message=f"Dangerous compound file extension '{compound_ext}' detected in filename. "
                    f"Upload rejected for security.",
                    filename=filename,
                    extension=compound_ext,
                    error_code=ErrorCode.COMPOUND_EXTENSION_BLOCKED,
                )

        # Check ALL extensions in the filename, not just the last one:
        # every dot-separated segment after the first is treated as one.
        for part in filename.split(".")[1:]:
            ext = f".{part.lower()}"
            if ext in self.config.BLOCKED_EXTENSIONS:
                logger.warning(
                    "Dangerous extension detected",
                    extra={
                        "error_type": "extension_blocked",
                        "file_name": filename,
                        "extension": ext,
                    },
                )
                raise ExtensionSecurityError(
                    message=f"Dangerous file extension '{ext}' detected in filename. "
                    f"Upload rejected for security.",
                    filename=filename,
                    extension=ext,
                    error_code=ErrorCode.EXTENSION_BLOCKED,
                )

    def validate(self, filename: str) -> None:
        """
        Validate the given filename.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If filename extension is not
                permitted.
        """
        return self.validate_extensions(filename)

View File

@@ -1,35 +1,51 @@
"""
Unicode Security Validator Module
"""Unicode Security Validator Module."""
from __future__ import annotations
Handles validation of Unicode-based attacks in filenames.
"""
import unicodedata
from typing import TYPE_CHECKING
import logging
import core.logger as core_logger
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import UnicodeSecurityError
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)
class UnicodeSecurityValidator(BaseValidator):
def __init__(self, config: "FileSecurityConfig"):
"""
Validates filenames for Unicode security threats.
Attributes:
config: Runtime configuration for file security rules.
"""
def __init__(self, config: FileSecurityConfig):
"""
Initialize the Unicode validator.
Args:
config: Runtime configuration that controls file security rules.
"""
super().__init__(config)
def validate_unicode_security(self, filename: str) -> str:
"""
Validate and normalize Unicode characters in filenames.
Validate filename for unsafe Unicode characters.
Args:
filename: The filename to validate and normalize
filename: The filename to validate and normalize.
Returns:
str: The normalized filename
The NFC-normalized filename.
Raises:
ValueError: If dangerous Unicode characters are detected
UnicodeSecurityError: If dangerous Unicode characters are
detected in the filename or result from normalization.
"""
if not filename:
return filename
@@ -50,9 +66,20 @@ class UnicodeSecurityValidator(BaseValidator):
f"'{char}' (U+{code:04X}: {char_name}) at position {pos}"
)
raise ValueError(
f"Dangerous Unicode characters detected in filename: {', '.join(char_details)}. "
f"These characters can be used to disguise file extensions or create security vulnerabilities."
logger.warning(
"Dangerous Unicode characters detected",
extra={
"error_type": "unicode_security",
"file_name": filename,
"char_codes": [code for _, code, _ in dangerous_chars_found],
"positions": [pos for _, _, pos in dangerous_chars_found],
},
)
raise UnicodeSecurityError(
message=f"Dangerous Unicode characters detected in filename: {', '.join(char_details)}. "
f"These characters can be used to disguise file extensions or create security vulnerabilities.",
filename=filename,
dangerous_chars=dangerous_chars_found,
)
# Normalize Unicode to prevent normalization attacks
@@ -62,9 +89,10 @@ class UnicodeSecurityValidator(BaseValidator):
# Check if normalization changed the filename significantly
if normalized_filename != filename:
core_logger.print_to_log(
f"Unicode normalization applied: '{filename}' -> '{normalized_filename}'",
"info",
logger.info(
"Unicode normalization applied: '%s' -> '%s'",
filename,
normalized_filename,
)
# Additional check: ensure normalized filename doesn't contain dangerous chars
@@ -72,13 +100,33 @@ class UnicodeSecurityValidator(BaseValidator):
for char in normalized_filename:
char_code = ord(char)
if char_code in self.config.DANGEROUS_UNICODE_CHARS:
raise ValueError(
f"Unicode normalization resulted in dangerous character: "
f"'{char}' (U+{char_code:04X}: {unicodedata.name(char, f'U+{char_code:04X}')})"
char_name = unicodedata.name(char, f"U+{char_code:04X}")
logger.error(
"Unicode normalization resulted in dangerous character",
extra={
"error_type": "unicode_normalization_error",
"file_name": filename,
"normalized_filename": normalized_filename,
"char_code": char_code,
},
)
raise UnicodeSecurityError(
message=f"Unicode normalization resulted in dangerous character: "
f"'{char}' (U+{char_code:04X}: {char_name})",
filename=filename,
dangerous_chars=[(char, char_code, 0)],
)
return normalized_filename
def validate(self, filename: str) -> str:
"""Compatibility method for base class interface."""
return self.validate_unicode_security(filename)
"""
Validate a filename for Unicode security issues.
Args:
filename: The name of the file to assess.
Returns:
The validated and normalized filename.
"""
return self.validate_unicode_security(filename)

View File

@@ -1,44 +1,95 @@
"""
Windows Security Validator Module
"""Windows security validator for filename validation."""
from __future__ import annotations
Handles validation of Windows-specific security threats.
"""
import os
import logging
from typing import TYPE_CHECKING
from .base import BaseValidator
from ..exceptions import WindowsReservedNameError
if TYPE_CHECKING:
from ..config import FileSecurityConfig
logger = logging.getLogger(__name__)


class WindowsSecurityValidator(BaseValidator):
    """
    Validator for Windows reserved device names.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: "FileSecurityConfig"):
        """
        Initialize the validator.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)

    def validate_windows_reserved_names(self, filename: str) -> None:
        """
        Validate filename against Windows reserved device names.

        Args:
            filename: The filename to validate.

        Raises:
            WindowsReservedNameError: If filename matches a Windows
                reserved device name.
        """
        # Check iteratively by removing extensions to handle compound extensions
        # e.g., "CON.tar.gz" -> check "con.tar" and "con"
        current_name = filename
        while current_name:
            # Get basename without extension
            name_without_ext, ext = os.path.splitext(current_name)

            # Normalize: lowercase, strip whitespace
            name_to_check = name_without_ext.lower().strip()
            # Remove leading dots to handle hidden files like ".CON.jpg"
            name_to_check = name_to_check.lstrip(".")
            # Remove trailing dots to handle cases like "con." or "con.."
            name_to_check = name_to_check.rstrip(".")

            if name_to_check in self.config.WINDOWS_RESERVED_NAMES:
                logger.warning(
                    "Windows reserved name detected",
                    extra={
                        "error_type": "windows_reserved_name",
                        "file_name": filename,
                        "reserved_name": name_to_check.upper(),
                    },
                )
                raise WindowsReservedNameError(
                    message=f"Filename '{filename}' uses Windows reserved name '{name_to_check.upper()}'. "
                    f"Reserved names: {', '.join(sorted(self.config.WINDOWS_RESERVED_NAMES)).upper()}",
                    filename=filename,
                    reserved_name=name_to_check.upper(),
                )

            # If no extension was removed, we're done
            if not ext or name_without_ext == current_name:
                break
            current_name = name_without_ext

    def validate(self, filename: str) -> None:
        """
        Validate filename against Windows reserved naming rules.

        Args:
            filename: The filename to validate.

        Raises:
            WindowsReservedNameError: If filename matches a Windows
                reserved device name.
        """
        return self.validate_windows_reserved_names(filename)

View File

@@ -24,13 +24,18 @@ import session.crud as session_crud
import core.database as core_database
import core.logger as core_logger
import core.file_security.utils as core_file_security_utils
from core.file_security.file_validator import FileValidator
from core.file_security.exceptions import FileValidationError
import websocket.schema as websocket_schema
# Define the API router
router = APIRouter()
# Initialize the file validator
file_validator = FileValidator()
@router.get("", response_model=users_schema.UserMe)
async def read_users_me(
@@ -167,7 +172,12 @@ async def upload_profile_image(
HTTPException: If the upload validation fails or save operation fails.
"""
# Comprehensive security validation
await core_file_security_utils.validate_profile_image_upload(file)
try:
await file_validator.validate_image_file(file)
except FileValidationError as err:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=str(err)
) from err
# If validation passes, proceed with saving
return await users_utils.save_user_image(token_user_id, file, db)
@@ -485,7 +495,12 @@ async def import_profile_data(
```
"""
# Comprehensive security validation
await core_file_security_utils.validate_profile_data_upload(file)
try:
await file_validator.validate_zip_file(file)
except FileValidationError as err:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=str(err)
) from err
try:
# Read the ZIP file data