diff --git a/autogpt_platform/backend/backend/api/features/store/media.py b/autogpt_platform/backend/backend/api/features/store/media.py
index cfdc71567a..d741dd4966 100644
--- a/autogpt_platform/backend/backend/api/features/store/media.py
+++ b/autogpt_platform/backend/backend/api/features/store/media.py
@@ -71,46 +71,41 @@ async def upload_media(
         logger.error(f"Error reading file content: {str(e)}")
         raise store_exceptions.FileReadError("Failed to read file content") from e
 
-    # Validate file signature/magic bytes
-    if file.content_type in ALLOWED_IMAGE_TYPES:
-        # Check image file signatures
-        if content.startswith(b"\xff\xd8\xff"):  # JPEG
-            if file.content_type != "image/jpeg":
-                raise store_exceptions.InvalidFileTypeError(
-                    "File signature does not match content type"
-                )
-        elif content.startswith(b"\x89PNG\r\n\x1a\n"):  # PNG
-            if file.content_type != "image/png":
-                raise store_exceptions.InvalidFileTypeError(
-                    "File signature does not match content type"
-                )
-        elif content.startswith(b"GIF87a") or content.startswith(b"GIF89a"):  # GIF
-            if file.content_type != "image/gif":
-                raise store_exceptions.InvalidFileTypeError(
-                    "File signature does not match content type"
-                )
-        elif content.startswith(b"RIFF") and content[8:12] == b"WEBP":  # WebP
-            if file.content_type != "image/webp":
-                raise store_exceptions.InvalidFileTypeError(
-                    "File signature does not match content type"
-                )
-        else:
-            raise store_exceptions.InvalidFileTypeError("Invalid image file signature")
+    # Detect actual content type from file signature/magic bytes
+    # Trust the file signature over the declared content-type header
+    detected_content_type: str | None = None
 
-    elif file.content_type in ALLOWED_VIDEO_TYPES:
-        # Check video file signatures
-        if content.startswith(b"\x00\x00\x00") and (content[4:8] == b"ftyp"):  # MP4
-            if file.content_type != "video/mp4":
-                raise store_exceptions.InvalidFileTypeError(
-                    "File signature does not match content type"
-                )
-        elif content.startswith(b"\x1a\x45\xdf\xa3"):  # WebM
-            if file.content_type != "video/webm":
-                raise store_exceptions.InvalidFileTypeError(
-                    "File signature does not match content type"
-                )
-        else:
-            raise store_exceptions.InvalidFileTypeError("Invalid video file signature")
+    # Check image file signatures
+    if content.startswith(b"\xff\xd8\xff"):  # JPEG
+        detected_content_type = "image/jpeg"
+    elif content.startswith(b"\x89PNG\r\n\x1a\n"):  # PNG
+        detected_content_type = "image/png"
+    elif content.startswith(b"GIF87a") or content.startswith(b"GIF89a"):  # GIF
+        detected_content_type = "image/gif"
+    elif content.startswith(b"RIFF") and len(content) >= 12 and content[8:12] == b"WEBP":  # WebP
+        detected_content_type = "image/webp"
+    # Check video file signatures
+    elif content.startswith(b"\x00\x00\x00") and len(content) >= 8 and content[4:8] == b"ftyp":  # MP4
+        detected_content_type = "video/mp4"
+    elif content.startswith(b"\x1a\x45\xdf\xa3"):  # WebM
+        detected_content_type = "video/webm"
+
+    # If we detected a valid type, use it; otherwise reject the file
+    if detected_content_type is None:
+        raise store_exceptions.InvalidFileTypeError(
+            "Could not detect a valid image or video file signature. "
+            "Supported formats: JPEG, PNG, GIF, WebP, MP4, WebM"
+        )
+
+    # Log if we're auto-correcting a mismatched content-type
+    if file.content_type != detected_content_type:
+        logger.info(
+            f"Auto-correcting content-type from '{file.content_type}' to "
+            f"'{detected_content_type}' based on file signature"
+        )
+
+    # Use the detected content type going forward
+    content_type = detected_content_type
 
     settings = Settings()
 
@@ -122,19 +117,7 @@ async def upload_media(
         )
 
     try:
-        # Validate file type
-        content_type = file.content_type
-        if content_type is None:
-            content_type = "image/jpeg"
-
-        if (
-            content_type not in ALLOWED_IMAGE_TYPES
-            and content_type not in ALLOWED_VIDEO_TYPES
-        ):
-            logger.warning(f"Invalid file type attempted: {content_type}")
-            raise store_exceptions.InvalidFileTypeError(
-                f"File type not supported. Must be jpeg, png, gif, webp, mp4 or webm. Content type: {content_type}"
-            )
+        # content_type is already validated from file signature detection above
 
         # Validate file size
         file_size = 0
diff --git a/autogpt_platform/backend/backend/api/features/store/media_test.py b/autogpt_platform/backend/backend/api/features/store/media_test.py
index 7f3899c8a5..d765e77a52 100644
--- a/autogpt_platform/backend/backend/api/features/store/media_test.py
+++ b/autogpt_platform/backend/backend/api/features/store/media_test.py
@@ -191,23 +191,35 @@ async def test_upload_media_webm_success(mock_settings, mock_storage_client):
     assert result.endswith(".webm")
 
 
-async def test_upload_media_mismatched_signature(mock_settings, mock_storage_client):
+async def test_upload_media_mismatched_signature_auto_corrects(
+    mock_settings, mock_storage_client
+):
+    """Test that mismatched content-type is auto-corrected based on file signature."""
     test_file = fastapi.UploadFile(
         filename="test.jpeg",
         file=io.BytesIO(b"\x89PNG\r\n\x1a\n"),  # PNG signature with JPEG content type
         headers=starlette.datastructures.Headers({"content-type": "image/jpeg"}),
     )
 
-    with pytest.raises(store_exceptions.InvalidFileTypeError):
-        await store_media.upload_media("test-user", test_file)
+    # Should auto-correct to PNG and succeed
+    result = await store_media.upload_media("test-user", test_file)
+    assert result.startswith(
+        "https://storage.googleapis.com/test-bucket/users/test-user/images/"
+    )
+    # File should be stored as PNG based on actual content
+    mock_storage_client.upload.assert_called_once()
 
 
 async def test_upload_media_invalid_signature(mock_settings, mock_storage_client):
+    """Test that files with unrecognized signatures are rejected."""
     test_file = fastapi.UploadFile(
         filename="test.jpeg",
         file=io.BytesIO(b"invalid signature"),
         headers=starlette.datastructures.Headers({"content-type": "image/jpeg"}),
     )
 
-    with pytest.raises(store_exceptions.InvalidFileTypeError):
+    with pytest.raises(store_exceptions.InvalidFileTypeError) as exc_info:
         await store_media.upload_media("test-user", test_file)
+    assert "Could not detect a valid image or video file signature" in str(
+        exc_info.value
+    )