diff --git a/autogpt_platform/backend/backend/util/file.py b/autogpt_platform/backend/backend/util/file.py index baa9225629..0dfdb5bd29 100644 --- a/autogpt_platform/backend/backend/util/file.py +++ b/autogpt_platform/backend/backend/util/file.py @@ -313,6 +313,14 @@ async def store_media_file( if not target_path.is_file(): raise ValueError(f"Local file does not exist: {target_path}") + # Virus scan the local file before any further processing + local_content = target_path.read_bytes() + if len(local_content) > MAX_FILE_SIZE_BYTES: + raise ValueError( + f"File too large: {len(local_content)} bytes > {MAX_FILE_SIZE_BYTES} bytes" + ) + await scan_content_safe(local_content, filename=sanitized_file) + # Return based on requested format if return_format == "for_local_processing": # Use when processing files locally with tools like ffmpeg, MoviePy, PIL diff --git a/autogpt_platform/backend/backend/util/file_test.py b/autogpt_platform/backend/backend/util/file_test.py index 9fe672d155..87c53e4305 100644 --- a/autogpt_platform/backend/backend/util/file_test.py +++ b/autogpt_platform/backend/backend/util/file_test.py @@ -247,3 +247,100 @@ class TestFileCloudIntegration: execution_context=make_test_context(graph_exec_id=graph_exec_id), return_format="for_local_processing", ) + + @pytest.mark.asyncio + async def test_store_media_file_local_path_scanned(self): + """Test that local file paths are scanned for viruses.""" + graph_exec_id = "test-exec-123" + local_file = "test_video.mp4" + file_content = b"fake video content" + + with patch( + "backend.util.file.get_cloud_storage_handler" + ) as mock_handler_getter, patch( + "backend.util.file.scan_content_safe" + ) as mock_scan, patch( + "backend.util.file.Path" + ) as mock_path_class: + + # Mock cloud storage handler - not a cloud path + mock_handler = MagicMock() + mock_handler.is_cloud_path.return_value = False + mock_handler_getter.return_value = mock_handler + + # Mock virus scanner + mock_scan.return_value = None + + # Mock file system operations + mock_base_path = MagicMock() + mock_target_path = MagicMock() + mock_resolved_path = MagicMock() + + mock_path_class.return_value = mock_base_path + mock_base_path.mkdir = MagicMock() + mock_base_path.__truediv__ = MagicMock(return_value=mock_target_path) + mock_target_path.resolve.return_value = mock_resolved_path + mock_resolved_path.is_relative_to.return_value = True + mock_resolved_path.is_file.return_value = True + mock_resolved_path.read_bytes.return_value = file_content + mock_resolved_path.relative_to.return_value = Path(local_file) + mock_resolved_path.name = local_file + + result = await store_media_file( + file=MediaFileType(local_file), + execution_context=make_test_context(graph_exec_id=graph_exec_id), + return_format="for_local_processing", + ) + + # Verify virus scan was called for local file + mock_scan.assert_called_once_with(file_content, filename=local_file) + + # Result should be the relative path + assert str(result) == local_file + + @pytest.mark.asyncio + async def test_store_media_file_local_path_virus_detected(self): + """Test that infected local files raise VirusDetectedError.""" + from backend.api.features.store.exceptions import VirusDetectedError + + graph_exec_id = "test-exec-123" + local_file = "infected.exe" + file_content = b"malicious content" + + with patch( + "backend.util.file.get_cloud_storage_handler" + ) as mock_handler_getter, patch( + "backend.util.file.scan_content_safe" + ) as mock_scan, patch( + "backend.util.file.Path" + ) as mock_path_class: + + # Mock cloud storage handler - not a cloud path + mock_handler = MagicMock() + mock_handler.is_cloud_path.return_value = False + mock_handler_getter.return_value = mock_handler + + # Mock virus scanner to detect virus + mock_scan.side_effect = VirusDetectedError( + "EICAR-Test-File", "File rejected due to virus detection" + ) + + # Mock file system operations + mock_base_path = MagicMock() + mock_target_path = MagicMock() + mock_resolved_path = MagicMock() + + mock_path_class.return_value = mock_base_path + mock_base_path.mkdir = MagicMock() + mock_base_path.__truediv__ = MagicMock(return_value=mock_target_path) + mock_target_path.resolve.return_value = mock_resolved_path + mock_resolved_path.is_relative_to.return_value = True + mock_resolved_path.is_file.return_value = True + mock_resolved_path.read_bytes.return_value = file_content + + with pytest.raises(VirusDetectedError): + await store_media_file( + file=MediaFileType(local_file), + execution_context=make_test_context(graph_exec_id=graph_exec_id), + return_format="for_local_processing", + )