mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-05 20:35:10 -05:00
feat(backend): Add ClamAV scanning for local file paths in store_media_file
Files processed via local paths in store_media_file() were not being scanned through ClamAV, unlike URLs, data URIs, cloud paths, and workspace references which already had scanning. This gap affected video processing blocks (LoopVideoBlock, AddAudioToVideoBlock, etc.) that write output to temp directories then pass filenames to store_media_file(). Changes: - Add virus scanning to the local file path branch in store_media_file() - Add file size limit check consistent with other input types - Add unit tests for local file scanning behavior Closes SECRT-1904
This commit is contained in:
@@ -313,6 +313,14 @@ async def store_media_file(
|
||||
if not target_path.is_file():
|
||||
raise ValueError(f"Local file does not exist: {target_path}")
|
||||
|
||||
# Virus scan the local file before any further processing
|
||||
local_content = target_path.read_bytes()
|
||||
if len(local_content) > MAX_FILE_SIZE_BYTES:
|
||||
raise ValueError(
|
||||
f"File too large: {len(local_content)} bytes > {MAX_FILE_SIZE_BYTES} bytes"
|
||||
)
|
||||
await scan_content_safe(local_content, filename=sanitized_file)
|
||||
|
||||
# Return based on requested format
|
||||
if return_format == "for_local_processing":
|
||||
# Use when processing files locally with tools like ffmpeg, MoviePy, PIL
|
||||
|
||||
@@ -247,3 +247,100 @@ class TestFileCloudIntegration:
|
||||
execution_context=make_test_context(graph_exec_id=graph_exec_id),
|
||||
return_format="for_local_processing",
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_store_media_file_local_path_scanned(self):
|
||||
"""Test that local file paths are scanned for viruses."""
|
||||
graph_exec_id = "test-exec-123"
|
||||
local_file = "test_video.mp4"
|
||||
file_content = b"fake video content"
|
||||
|
||||
with patch(
|
||||
"backend.util.file.get_cloud_storage_handler"
|
||||
) as mock_handler_getter, patch(
|
||||
"backend.util.file.scan_content_safe"
|
||||
) as mock_scan, patch(
|
||||
"backend.util.file.Path"
|
||||
) as mock_path_class:
|
||||
|
||||
# Mock cloud storage handler - not a cloud path
|
||||
mock_handler = MagicMock()
|
||||
mock_handler.is_cloud_path.return_value = False
|
||||
mock_handler_getter.return_value = mock_handler
|
||||
|
||||
# Mock virus scanner
|
||||
mock_scan.return_value = None
|
||||
|
||||
# Mock file system operations
|
||||
mock_base_path = MagicMock()
|
||||
mock_target_path = MagicMock()
|
||||
mock_resolved_path = MagicMock()
|
||||
|
||||
mock_path_class.return_value = mock_base_path
|
||||
mock_base_path.mkdir = MagicMock()
|
||||
mock_base_path.__truediv__ = MagicMock(return_value=mock_target_path)
|
||||
mock_target_path.resolve.return_value = mock_resolved_path
|
||||
mock_resolved_path.is_relative_to.return_value = True
|
||||
mock_resolved_path.is_file.return_value = True
|
||||
mock_resolved_path.read_bytes.return_value = file_content
|
||||
mock_resolved_path.relative_to.return_value = Path(local_file)
|
||||
mock_resolved_path.name = local_file
|
||||
|
||||
result = await store_media_file(
|
||||
file=MediaFileType(local_file),
|
||||
execution_context=make_test_context(graph_exec_id=graph_exec_id),
|
||||
return_format="for_local_processing",
|
||||
)
|
||||
|
||||
# Verify virus scan was called for local file
|
||||
mock_scan.assert_called_once_with(file_content, filename=local_file)
|
||||
|
||||
# Result should be the relative path
|
||||
assert str(result) == local_file
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_store_media_file_local_path_virus_detected(self):
|
||||
"""Test that infected local files raise VirusDetectedError."""
|
||||
from backend.api.features.store.exceptions import VirusDetectedError
|
||||
|
||||
graph_exec_id = "test-exec-123"
|
||||
local_file = "infected.exe"
|
||||
file_content = b"malicious content"
|
||||
|
||||
with patch(
|
||||
"backend.util.file.get_cloud_storage_handler"
|
||||
) as mock_handler_getter, patch(
|
||||
"backend.util.file.scan_content_safe"
|
||||
) as mock_scan, patch(
|
||||
"backend.util.file.Path"
|
||||
) as mock_path_class:
|
||||
|
||||
# Mock cloud storage handler - not a cloud path
|
||||
mock_handler = MagicMock()
|
||||
mock_handler.is_cloud_path.return_value = False
|
||||
mock_handler_getter.return_value = mock_handler
|
||||
|
||||
# Mock virus scanner to detect virus
|
||||
mock_scan.side_effect = VirusDetectedError(
|
||||
"EICAR-Test-File", "File rejected due to virus detection"
|
||||
)
|
||||
|
||||
# Mock file system operations
|
||||
mock_base_path = MagicMock()
|
||||
mock_target_path = MagicMock()
|
||||
mock_resolved_path = MagicMock()
|
||||
|
||||
mock_path_class.return_value = mock_base_path
|
||||
mock_base_path.mkdir = MagicMock()
|
||||
mock_base_path.__truediv__ = MagicMock(return_value=mock_target_path)
|
||||
mock_target_path.resolve.return_value = mock_resolved_path
|
||||
mock_resolved_path.is_relative_to.return_value = True
|
||||
mock_resolved_path.is_file.return_value = True
|
||||
mock_resolved_path.read_bytes.return_value = file_content
|
||||
|
||||
with pytest.raises(VirusDetectedError):
|
||||
await store_media_file(
|
||||
file=MediaFileType(local_file),
|
||||
execution_context=make_test_context(graph_exec_id=graph_exec_id),
|
||||
return_format="for_local_processing",
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user