feat(backend): Add ClamAV support for anti-virus scan on file upload to the platform (#10232)

An anti-virus file scan step is added to each file upload step on the
platform before the file is sent to cloud storage or local machine
storage.

### Changes 🏗️

* Added ClamAV service
* Added AV file scan on each upload step
* Added tests & documentation
* Make the step mandatory even on local development

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Tried using FileUploadBlock & AgentFileInputBlock
This commit is contained in:
Zamil Majdy
2025-06-25 08:29:04 -07:00
committed by GitHub
parent 203cb1c88c
commit 68749a28d4
12 changed files with 574 additions and 4 deletions

View File

@@ -50,6 +50,23 @@ jobs:
env:
RABBITMQ_DEFAULT_USER: ${{ env.RABBITMQ_DEFAULT_USER }}
RABBITMQ_DEFAULT_PASS: ${{ env.RABBITMQ_DEFAULT_PASS }}
clamav:
image: clamav/clamav-debian:latest
ports:
- 3310:3310
env:
CLAMAV_NO_FRESHCLAMD: false
CLAMD_CONF_StreamMaxLength: 50M
CLAMD_CONF_MaxFileSize: 100M
CLAMD_CONF_MaxScanSize: 100M
CLAMD_CONF_MaxThreads: 4
CLAMD_CONF_ReadTimeout: 300
options: >-
--health-cmd "clamdscan --version || exit 1"
--health-interval 30s
--health-timeout 10s
--health-retries 5
--health-start-period 180s
steps:
- name: Checkout repository
@@ -131,6 +148,35 @@ jobs:
# outputs:
# DB_URL, API_URL, GRAPHQL_URL, ANON_KEY, SERVICE_ROLE_KEY, JWT_SECRET
- name: Wait for ClamAV to be ready
run: |
echo "Waiting for ClamAV daemon to start..."
max_attempts=60
attempt=0
until nc -z localhost 3310 || [ $attempt -eq $max_attempts ]; do
echo "ClamAV is unavailable - sleeping (attempt $((attempt+1))/$max_attempts)"
sleep 5
attempt=$((attempt+1))
done
if [ $attempt -eq $max_attempts ]; then
echo "ClamAV failed to start after $((max_attempts*5)) seconds"
echo "Checking ClamAV service logs..."
docker logs $(docker ps -q --filter "ancestor=clamav/clamav-debian:latest") 2>&1 | tail -50 || echo "No ClamAV container found"
exit 1
fi
echo "ClamAV is ready!"
# Verify ClamAV is responsive
echo "Testing ClamAV connection..."
timeout 10 bash -c 'echo "PING" | nc localhost 3310' || {
echo "ClamAV is not responding to PING"
docker logs $(docker ps -q --filter "ancestor=clamav/clamav-debian:latest") 2>&1 | tail -50 || echo "No ClamAV container found"
exit 1
}
- name: Run Database Migrations
run: poetry run prisma migrate dev --name updates
env:

View File

@@ -19,7 +19,7 @@ cd backend && poetry install
# Run database migrations
poetry run prisma migrate dev
# Start all services (database, redis, rabbitmq)
# Start all services (database, redis, rabbitmq, clamav)
docker compose up -d
# Run the backend server
@@ -92,6 +92,7 @@ npm run type-check
2. **Blocks**: Reusable components in `/backend/blocks/` that perform specific tasks
3. **Integrations**: OAuth and API connections stored per user
4. **Store**: Marketplace for sharing agent templates
5. **Virus Scanning**: ClamAV integration for file upload security
### Testing Approach
- Backend uses pytest with snapshot testing for API responses

View File

@@ -34,6 +34,20 @@ class StorageUploadError(MediaUploadError):
pass
class VirusDetectedError(MediaUploadError):
"""Raised when a virus is detected in uploaded file"""
def __init__(self, threat_name: str, message: str | None = None):
self.threat_name = threat_name
super().__init__(message or f"Virus detected: {threat_name}")
class VirusScanError(MediaUploadError):
"""Raised when virus scanning fails"""
pass
class StoreError(Exception):
"""Base exception for store-related errors"""

View File

@@ -8,6 +8,7 @@ from google.cloud import storage
import backend.server.v2.store.exceptions
from backend.util.exceptions import MissingConfigError
from backend.util.settings import Settings
from backend.util.virus_scanner import scan_content_safe
logger = logging.getLogger(__name__)
@@ -67,7 +68,7 @@ async def upload_media(
# Validate file signature/magic bytes
if file.content_type in ALLOWED_IMAGE_TYPES:
# Check image file signatures
if content.startswith(b"\xFF\xD8\xFF"): # JPEG
if content.startswith(b"\xff\xd8\xff"): # JPEG
if file.content_type != "image/jpeg":
raise backend.server.v2.store.exceptions.InvalidFileTypeError(
"File signature does not match content type"
@@ -175,6 +176,7 @@ async def upload_media(
blob.content_type = content_type
file_bytes = await file.read()
await scan_content_safe(file_bytes, filename=unique_filename)
blob.upload_from_string(file_bytes, content_type=content_type)
public_url = blob.public_url

View File

@@ -12,6 +12,7 @@ from autogpt_libs.auth.depends import auth_middleware, get_user_id
import backend.data.block
import backend.data.graph
import backend.server.v2.store.db
import backend.server.v2.store.exceptions
import backend.server.v2.store.image_gen
import backend.server.v2.store.media
import backend.server.v2.store.model
@@ -589,6 +590,25 @@ async def upload_submission_media(
user_id=user_id, file=file
)
return media_url
except backend.server.v2.store.exceptions.VirusDetectedError as e:
logger.warning(f"Virus detected in uploaded file: {e.threat_name}")
return fastapi.responses.JSONResponse(
status_code=400,
content={
"detail": f"File rejected due to virus detection: {e.threat_name}",
"error_type": "virus_detected",
"threat_name": e.threat_name,
},
)
except backend.server.v2.store.exceptions.VirusScanError as e:
logger.error(f"Virus scanning failed: {str(e)}")
return fastapi.responses.JSONResponse(
status_code=503,
content={
"detail": "Virus scanning service unavailable. Please try again later.",
"error_type": "virus_scan_failed",
},
)
except Exception:
logger.exception("Exception occurred whilst uploading submission media")
return fastapi.responses.JSONResponse(

View File

@@ -9,6 +9,7 @@ from urllib.parse import urlparse
from backend.util.request import Requests
from backend.util.type import MediaFileType
from backend.util.virus_scanner import scan_content_safe
TEMP_DIR = Path(tempfile.gettempdir()).resolve()
@@ -105,7 +106,11 @@ async def store_media_file(
extension = _extension_from_mime(mime_type)
filename = f"{uuid.uuid4()}{extension}"
target_path = _ensure_inside_base(base_path / filename, base_path)
target_path.write_bytes(base64.b64decode(b64_content))
content = base64.b64decode(b64_content)
# Virus scan the base64 content before writing
await scan_content_safe(content, filename=filename)
target_path.write_bytes(content)
elif file.startswith(("http://", "https://")):
# URL
@@ -115,6 +120,9 @@ async def store_media_file(
# Download and save
resp = await Requests().get(file)
# Virus scan the downloaded content before writing
await scan_content_safe(resp.content, filename=filename)
target_path.write_bytes(resp.content)
else:

View File

@@ -238,6 +238,23 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="The Discord channel for the platform",
)
clamav_service_host: str = Field(
default="localhost",
description="The host for the ClamAV daemon",
)
clamav_service_port: int = Field(
default=3310,
description="The port for the ClamAV daemon",
)
clamav_service_timeout: int = Field(
default=60,
description="The timeout in seconds for the ClamAV daemon",
)
clamav_service_enabled: bool = Field(
default=True,
description="Whether virus scanning is enabled or not",
)
@field_validator("platform_base_url", "frontend_base_url")
@classmethod
def validate_platform_base_url(cls, v: str, info: ValidationInfo) -> str:

View File

@@ -0,0 +1,205 @@
import asyncio
import logging
import time
from typing import Optional, Tuple
import pyclamd
from pydantic import BaseModel
from pydantic_settings import BaseSettings
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
settings = Settings()
class VirusScanResult(BaseModel):
is_clean: bool
scan_time_ms: int
file_size: int
threat_name: Optional[str] = None
class VirusScannerSettings(BaseSettings):
clamav_service_host: str = "localhost"
clamav_service_port: int = 3310
clamav_service_timeout: int = 60
clamav_service_enabled: bool = True
max_scan_size: int = 100 * 1024 * 1024 # 100 MB
chunk_size: int = 25 * 1024 * 1024 # 25 MB (safe for 50MB stream limit)
min_chunk_size: int = 128 * 1024 # 128 KB minimum
max_retries: int = 8 # halve chunk ≤ max_retries times
class VirusScannerService:
"""
Thin async wrapper around ClamAV. Creates a fresh `ClamdNetworkSocket`
per chunk (the class is *not* thread-safe) and falls back to smaller
chunks when the daemon rejects the stream size.
"""
def __init__(self, settings: VirusScannerSettings) -> None:
self.settings = settings
def _new_client(self) -> pyclamd.ClamdNetworkSocket:
return pyclamd.ClamdNetworkSocket(
host=self.settings.clamav_service_host,
port=self.settings.clamav_service_port,
timeout=self.settings.clamav_service_timeout,
)
@staticmethod
def _parse_raw(raw: Optional[dict]) -> Tuple[bool, Optional[str]]:
"""
Convert pyclamd output to (infected?, threat_name).
Returns (False, None) for clean.
"""
if not raw:
return False, None
status, threat = next(iter(raw.values()))
return status == "FOUND", threat
async def _scan_chunk(self, chunk: bytes) -> Tuple[bool, Optional[str]]:
loop = asyncio.get_running_loop()
client = self._new_client()
try:
raw = await loop.run_in_executor(None, client.scan_stream, chunk)
return self._parse_raw(raw)
# ClamAV aborts the socket when >StreamMaxLength → BrokenPipe/Reset.
except (BrokenPipeError, ConnectionResetError) as exc:
raise RuntimeError("size-limit") from exc
except Exception as exc:
if "INSTREAM size limit exceeded" in str(exc):
raise RuntimeError("size-limit") from exc
raise
# --------------------------------------------------------------------- #
# Public API
# --------------------------------------------------------------------- #
async def scan_file(
self, content: bytes, *, filename: str = "unknown"
) -> VirusScanResult:
"""
Scan `content`. Returns a result object or raises on infrastructure
failure (unreachable daemon, etc.).
"""
if not self.settings.clamav_service_enabled:
logger.warning("Virus scanning disabled accepting %s", filename)
return VirusScanResult(
is_clean=True, scan_time_ms=0, file_size=len(content)
)
if len(content) > self.settings.max_scan_size:
logger.warning(
f"File {filename} ({len(content)} bytes) exceeds max scan size ({self.settings.max_scan_size}), skipping virus scan"
)
return VirusScanResult(
is_clean=True, # Assume clean for oversized files
file_size=len(content),
scan_time_ms=0,
threat_name=None,
)
loop = asyncio.get_running_loop()
if not await loop.run_in_executor(None, self._new_client().ping):
raise RuntimeError("ClamAV service is unreachable")
start = time.monotonic()
chunk_size = self.settings.chunk_size
for retry in range(self.settings.max_retries + 1):
try:
logger.debug(
f"Scanning {filename} with chunk size: {chunk_size // 1_048_576}MB"
)
# Scan all chunks with current chunk size
for offset in range(0, len(content), chunk_size):
chunk_data = content[offset : offset + chunk_size]
infected, threat = await self._scan_chunk(chunk_data)
if infected:
return VirusScanResult(
is_clean=False,
threat_name=threat,
file_size=len(content),
scan_time_ms=int((time.monotonic() - start) * 1000),
)
# All chunks clean
return VirusScanResult(
is_clean=True,
file_size=len(content),
scan_time_ms=int((time.monotonic() - start) * 1000),
)
except RuntimeError as exc:
if (
str(exc) == "size-limit"
and chunk_size > self.settings.min_chunk_size
):
chunk_size //= 2
logger.info(
f"Chunk size too large for {filename}, reducing to {chunk_size // 1_048_576}MB (retry {retry + 1}/{self.settings.max_retries + 1})"
)
continue
else:
# Either not a size-limit error, or we've hit minimum chunk size
logger.error(f"Cannot scan {filename}: {exc}")
raise
# If we can't scan even with minimum chunk size, log warning and allow file
logger.warning(
f"Unable to virus scan {filename} ({len(content)} bytes) - chunk size limits exceeded. "
f"Allowing file but recommend manual review."
)
return VirusScanResult(
is_clean=True, # Allow file when scanning impossible
file_size=len(content),
scan_time_ms=int((time.monotonic() - start) * 1000),
threat_name=None,
)
_scanner: Optional[VirusScannerService] = None
def get_virus_scanner() -> VirusScannerService:
global _scanner
if _scanner is None:
_settings = VirusScannerSettings(
clamav_service_host=settings.config.clamav_service_host,
clamav_service_port=settings.config.clamav_service_port,
clamav_service_enabled=settings.config.clamav_service_enabled,
)
_scanner = VirusScannerService(_settings)
return _scanner
async def scan_content_safe(content: bytes, *, filename: str = "unknown") -> None:
"""
Helper function to scan content and raise appropriate exceptions.
Raises:
VirusDetectedError: If virus is found
VirusScanError: If scanning fails
"""
from backend.server.v2.store.exceptions import VirusDetectedError, VirusScanError
try:
result = await get_virus_scanner().scan_file(content, filename=filename)
if not result.is_clean:
threat_name = result.threat_name or "Unknown threat"
logger.warning(f"Virus detected in file {filename}: {threat_name}")
raise VirusDetectedError(
threat_name, f"File rejected due to virus detection: {threat_name}"
)
logger.info(f"File {filename} passed virus scan in {result.scan_time_ms}ms")
except VirusDetectedError:
raise
except Exception as e:
logger.error(f"Virus scanning failed for {filename}: {str(e)}")
raise VirusScanError(f"Virus scanning failed: {str(e)}") from e

View File

@@ -3856,6 +3856,17 @@ cffi = ">=1.5.0"
[package.extras]
idna = ["idna (>=2.1)"]
[[package]]
name = "pyclamd"
version = "0.4.0"
description = "pyClamd is a python interface to Clamd (Clamav daemon)."
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "pyClamd-0.4.0.tar.gz", hash = "sha256:ddd588577e5db123760b6ddaac46b5c4b1d9044a00b5d9422de59f83a55c20fe"},
]
[[package]]
name = "pycodestyle"
version = "2.13.0"
@@ -6369,4 +6380,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "6c93e51cf22c06548aa6d0e23ca8ceb4450f5e02d4142715e941aabc1a2cbd6a"
content-hash = "35f6516ea0e72a0b4381842f4a6ad6d01ed263e01baabb09e554f9a63ca8b175"

View File

@@ -68,6 +68,7 @@ zerobouncesdk = "^1.1.1"
# NOTE: please insert new dependencies in their alphabetical location
pytest-snapshot = "^0.9.0"
aiofiles = "^24.1.0"
pyclamd = "^0.4.0"
[tool.poetry.group.dev.dependencies]
aiohappyeyeballs = "^2.6.1"

View File

@@ -0,0 +1,223 @@
import asyncio
import time
from unittest.mock import AsyncMock, Mock, patch
import pytest
from backend.server.v2.store.exceptions import VirusDetectedError, VirusScanError
from backend.util.virus_scanner import (
VirusScannerService,
VirusScannerSettings,
VirusScanResult,
get_virus_scanner,
scan_content_safe,
)
class TestVirusScannerService:
@pytest.fixture
def scanner_settings(self):
return VirusScannerSettings(
clamav_service_host="localhost",
clamav_service_port=3310,
clamav_service_enabled=True,
max_scan_size=10 * 1024 * 1024, # 10MB for testing
)
@pytest.fixture
def scanner(self, scanner_settings):
return VirusScannerService(scanner_settings)
@pytest.fixture
def disabled_scanner(self):
settings = VirusScannerSettings(clamav_service_enabled=False)
return VirusScannerService(settings)
def test_scanner_initialization(self, scanner_settings):
scanner = VirusScannerService(scanner_settings)
assert scanner.settings.clamav_service_host == "localhost"
assert scanner.settings.clamav_service_port == 3310
assert scanner.settings.clamav_service_enabled is True
@pytest.mark.asyncio
async def test_scan_disabled_returns_clean(self, disabled_scanner):
content = b"test file content"
result = await disabled_scanner.scan_file(content, filename="test.txt")
assert result.is_clean is True
assert result.threat_name is None
assert result.file_size == len(content)
assert result.scan_time_ms == 0
@pytest.mark.asyncio
async def test_scan_file_too_large(self, scanner):
# Create content larger than max_scan_size
large_content = b"x" * (scanner.settings.max_scan_size + 1)
# Large files are allowed but marked as clean with a warning
result = await scanner.scan_file(large_content, filename="large_file.txt")
assert result.is_clean is True
assert result.file_size == len(large_content)
assert result.scan_time_ms == 0
# Note: ping method was removed from current implementation
@pytest.mark.asyncio
@patch("pyclamd.ClamdNetworkSocket")
async def test_scan_clean_file(self, mock_clamav_class, scanner):
def mock_scan_stream(_):
time.sleep(0.001) # Small delay to ensure timing > 0
return None # No virus detected
mock_client = Mock()
mock_client.ping.return_value = True
mock_client.scan_stream = mock_scan_stream
mock_clamav_class.return_value = mock_client
content = b"clean file content"
result = await scanner.scan_file(content, filename="clean.txt")
assert result.is_clean is True
assert result.threat_name is None
assert result.file_size == len(content)
assert result.scan_time_ms > 0
@pytest.mark.asyncio
@patch("pyclamd.ClamdNetworkSocket")
async def test_scan_infected_file(self, mock_clamav_class, scanner):
def mock_scan_stream(_):
time.sleep(0.001) # Small delay to ensure timing > 0
return {"stream": ("FOUND", "Win.Test.EICAR_HDB-1")}
mock_client = Mock()
mock_client.ping.return_value = True
mock_client.scan_stream = mock_scan_stream
mock_clamav_class.return_value = mock_client
content = b"infected file content"
result = await scanner.scan_file(content, filename="infected.txt")
assert result.is_clean is False
assert result.threat_name == "Win.Test.EICAR_HDB-1"
assert result.file_size == len(content)
assert result.scan_time_ms > 0
@pytest.mark.asyncio
@patch("pyclamd.ClamdNetworkSocket")
async def test_scan_clamav_unavailable_fail_safe(self, mock_clamav_class, scanner):
mock_client = Mock()
mock_client.ping.return_value = False
mock_clamav_class.return_value = mock_client
content = b"test content"
with pytest.raises(RuntimeError, match="ClamAV service is unreachable"):
await scanner.scan_file(content, filename="test.txt")
@pytest.mark.asyncio
@patch("pyclamd.ClamdNetworkSocket")
async def test_scan_error_fail_safe(self, mock_clamav_class, scanner):
mock_client = Mock()
mock_client.ping.return_value = True
mock_client.scan_stream.side_effect = Exception("Scanning error")
mock_clamav_class.return_value = mock_client
content = b"test content"
with pytest.raises(Exception, match="Scanning error"):
await scanner.scan_file(content, filename="test.txt")
# Note: scan_file_method and scan_upload_file tests removed as these APIs don't exist in current implementation
def test_get_virus_scanner_singleton(self):
scanner1 = get_virus_scanner()
scanner2 = get_virus_scanner()
# Should return the same instance
assert scanner1 is scanner2
# Note: client_reuse test removed as _get_client method doesn't exist in current implementation
def test_scan_result_model(self):
# Test VirusScanResult model
result = VirusScanResult(
is_clean=False, threat_name="Test.Virus", scan_time_ms=150, file_size=1024
)
assert result.is_clean is False
assert result.threat_name == "Test.Virus"
assert result.scan_time_ms == 150
assert result.file_size == 1024
@pytest.mark.asyncio
@patch("pyclamd.ClamdNetworkSocket")
async def test_concurrent_scans(self, mock_clamav_class, scanner):
def mock_scan_stream(_):
time.sleep(0.001) # Small delay to ensure timing > 0
return None
mock_client = Mock()
mock_client.ping.return_value = True
mock_client.scan_stream = mock_scan_stream
mock_clamav_class.return_value = mock_client
content1 = b"file1 content"
content2 = b"file2 content"
# Run concurrent scans
results = await asyncio.gather(
scanner.scan_file(content1, filename="file1.txt"),
scanner.scan_file(content2, filename="file2.txt"),
)
assert len(results) == 2
assert all(result.is_clean for result in results)
assert results[0].file_size == len(content1)
assert results[1].file_size == len(content2)
assert all(result.scan_time_ms > 0 for result in results)
class TestHelperFunctions:
"""Test the helper functions scan_content_safe"""
@pytest.mark.asyncio
async def test_scan_content_safe_clean(self):
"""Test scan_content_safe with clean content"""
with patch("backend.util.virus_scanner.get_virus_scanner") as mock_get_scanner:
mock_scanner = Mock()
mock_scanner.scan_file = AsyncMock()
mock_scanner.scan_file.return_value = Mock(
is_clean=True, threat_name=None, scan_time_ms=50, file_size=100
)
mock_get_scanner.return_value = mock_scanner
# Should not raise any exception
await scan_content_safe(b"clean content", filename="test.txt")
@pytest.mark.asyncio
async def test_scan_content_safe_infected(self):
"""Test scan_content_safe with infected content"""
with patch("backend.util.virus_scanner.get_virus_scanner") as mock_get_scanner:
mock_scanner = Mock()
mock_scanner.scan_file = AsyncMock()
mock_scanner.scan_file.return_value = Mock(
is_clean=False, threat_name="Test.Virus", scan_time_ms=50, file_size=100
)
mock_get_scanner.return_value = mock_scanner
with pytest.raises(VirusDetectedError) as exc_info:
await scan_content_safe(b"infected content", filename="virus.txt")
assert exc_info.value.threat_name == "Test.Virus"
@pytest.mark.asyncio
async def test_scan_content_safe_scan_error(self):
"""Test scan_content_safe when scanning fails"""
with patch("backend.util.virus_scanner.get_virus_scanner") as mock_get_scanner:
mock_scanner = Mock()
mock_scanner.scan_file = AsyncMock()
mock_scanner.scan_file.side_effect = Exception("Scan failed")
mock_get_scanner.return_value = mock_scanner
with pytest.raises(VirusScanError, match="Virus scanning failed"):
await scan_content_safe(b"test content", filename="test.txt")

View File

@@ -6,6 +6,7 @@ networks:
volumes:
supabase-config:
clamav-data:
x-agpt-services:
&agpt-services
@@ -63,6 +64,26 @@ services:
file: ./docker-compose.platform.yml
service: scheduler_server
clamav:
<<: *agpt-services
image: clamav/clamav-debian:latest
ports:
- "3310:3310"
volumes:
- clamav-data:/var/lib/clamav
environment:
- CLAMAV_NO_FRESHCLAMD=false
- CLAMD_CONF_StreamMaxLength=50M
- CLAMD_CONF_MaxFileSize=100M
- CLAMD_CONF_MaxScanSize=100M
- CLAMD_CONF_MaxThreads=12
- CLAMD_CONF_ReadTimeout=300
healthcheck:
test: ["CMD-SHELL", "clamdscan --version || exit 1"]
interval: 30s
timeout: 10s
retries: 3
# frontend:
# <<: *agpt-services
# extends:
@@ -162,3 +183,4 @@ services:
- vector
- redis
- rabbitmq
- clamav