From 4f6e66447f063a2d06d054c3ce16173ed0b5bba6 Mon Sep 17 00:00:00 2001 From: Abhimanyu Yadav <122007096+Abhi1992002@users.noreply.github.com> Date: Mon, 30 Jun 2025 19:49:36 +0530 Subject: [PATCH 1/5] fix(frontend): fix custom mutator of orval (#10269) This pull request includes updates to the environment configuration and API mutator logic in the `autogpt_platform/frontend` directory. The changes aim to improve flexibility by introducing dynamic base URLs through environment variables. Environment configuration updates: * [`autogpt_platform/frontend/.env.example`](diffhunk://#diff-72012a00359825421736dc064be74187011cb5b0462bea1ed3a3c5ca80bb3117R2): Added `NEXT_PUBLIC_FRONTEND_BASE_URL` to define the base URL for the frontend dynamically. API mutator logic updates: * [`autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts`](diffhunk://#diff-28c5af33c7bd0ecddc1793aa6a27bfd5b4f979b62c29990538aceea3320d8be9L1-R1): Updated `BASE_URL` to use the `NEXT_PUBLIC_FRONTEND_BASE_URL` environment variable, enabling dynamic configuration of the API proxy URL. ### Checklist - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] Tested manually and everything is working perfectly --- autogpt_platform/frontend/.env.example | 1 + .../frontend/src/app/api/mutators/custom-mutator.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/autogpt_platform/frontend/.env.example b/autogpt_platform/frontend/.env.example index 0660979013..038c343f98 100644 --- a/autogpt_platform/frontend/.env.example +++ b/autogpt_platform/frontend/.env.example @@ -1,4 +1,5 @@ FRONTEND_BASE_URL=http://localhost:3000 +NEXT_PUBLIC_FRONTEND_BASE_URL=http://localhost:3000 NEXT_PUBLIC_AUTH_CALLBACK_URL=http://localhost:8006/auth/callback NEXT_PUBLIC_AGPT_SERVER_URL=http://localhost:8006/api diff --git a/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts b/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts index f0fdbc308c..53b899dfd5 100644 --- a/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts +++ b/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts @@ -1,4 +1,4 @@ -const BASE_URL = "/api/proxy"; // Sending request via nextjs Server +const BASE_URL = `${process.env.NEXT_PUBLIC_FRONTEND_BASE_URL}/api/proxy`; // Sending request via nextjs Server const getBody = (c: Response | Request): Promise => { const contentType = c.headers.get("content-type"); From b32ac898db9c1348aab3b737db309e79cebdebae Mon Sep 17 00:00:00 2001 From: Ubbe Date: Mon, 30 Jun 2025 20:42:31 +0400 Subject: [PATCH 2/5] fix(frontend): migrate to `NEXT_PUBLIC_FRONTEND_BASE_URL` (#10270) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Changes 🏗️ We need to `FRONTEND_BASE_URL` to → `NEXT_PUBLIC_FRONTEND_BASE_URL` given is needed on the new API client on the Front-end to make requests. The `NEXT_PUBLIC` prefix is important so that it is available on the client. ## Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] Run the app locally - [x] The library and other pages work --- autogpt_platform/backend/.env.example | 4 ++-- autogpt_platform/docker-compose.platform.yml | 2 +- autogpt_platform/frontend/.env.example | 3 +-- .../frontend/src/app/(platform)/reset-password/actions.ts | 3 ++- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/autogpt_platform/backend/.env.example b/autogpt_platform/backend/.env.example index 18343d7725..395248f7ed 100644 --- a/autogpt_platform/backend/.env.example +++ b/autogpt_platform/backend/.env.example @@ -55,9 +55,9 @@ RABBITMQ_DEFAULT_PASS=k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7 ## GCS bucket is required for marketplace and library functionality MEDIA_GCS_BUCKET_NAME= -## For local development, you may need to set FRONTEND_BASE_URL for the OAuth flow +## For local development, you may need to set NEXT_PUBLIC_FRONTEND_BASE_URL for the OAuth flow ## for integrations to work. Defaults to the value of PLATFORM_BASE_URL if not set. -# FRONTEND_BASE_URL=http://localhost:3000 +# NEXT_PUBLIC_FRONTEND_BASE_URL=http://localhost:3000 ## PLATFORM_BASE_URL must be set to a *publicly accessible* URL pointing to your backend ## to use the platform's webhook-related functionality. diff --git a/autogpt_platform/docker-compose.platform.yml b/autogpt_platform/docker-compose.platform.yml index cc5e8c3d6a..ca9a483b40 100644 --- a/autogpt_platform/docker-compose.platform.yml +++ b/autogpt_platform/docker-compose.platform.yml @@ -93,7 +93,7 @@ services: - SCHEDULER_HOST=scheduler_server - EXECUTIONMANAGER_HOST=executor - NOTIFICATIONMANAGER_HOST=rest_server - - FRONTEND_BASE_URL=http://localhost:3000 + - NEXT_PUBLIC_FRONTEND_BASE_URL=http://localhost:3000 - BACKEND_CORS_ALLOW_ORIGINS=["http://localhost:3000"] - ENCRYPTION_KEY=dvziYgz0KSK8FENhju0ZYi8-fRTfAdlz6YLhdB_jhNw= # DO NOT USE IN PRODUCTION!! - UNSUBSCRIBE_SECRET_KEY=HlP8ivStJjmbf6NKi78m_3FnOogut0t5ckzjsIqeaio= # DO NOT USE IN PRODUCTION!! diff --git a/autogpt_platform/frontend/.env.example b/autogpt_platform/frontend/.env.example index 038c343f98..287efe5f22 100644 --- a/autogpt_platform/frontend/.env.example +++ b/autogpt_platform/frontend/.env.example @@ -1,4 +1,3 @@ -FRONTEND_BASE_URL=http://localhost:3000 NEXT_PUBLIC_FRONTEND_BASE_URL=http://localhost:3000 NEXT_PUBLIC_AUTH_CALLBACK_URL=http://localhost:8006/auth/callback @@ -24,7 +23,7 @@ NEXT_PUBLIC_SUPABASE_ANON_KEY=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAic ## OAuth Callback URL ## This should be {domain}/auth/callback ## Only used if you're using Supabase and OAuth -AUTH_CALLBACK_URL="${FRONTEND_BASE_URL}/auth/callback" +AUTH_CALLBACK_URL="${NEXT_PUBLIC_FRONTEND_BASE_URL}/auth/callback" GA_MEASUREMENT_ID=G-FH2XK2W4GN # When running locally, set NEXT_PUBLIC_BEHAVE_AS=CLOUD to use the a locally hosted marketplace (as is typical in development, and the cloud deployment), otherwise set it to LOCAL to have the marketplace open in a new tab diff --git a/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts b/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts index 0ee4e4e7cc..327b4f86de 100644 --- a/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts +++ b/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts @@ -10,7 +10,8 @@ export async function sendResetEmail(email: string, turnstileToken: string) { {}, async () => { const supabase = await getServerSupabase(); - const origin = process.env.FRONTEND_BASE_URL || "http://localhost:3000"; + const origin = + process.env.NEXT_PUBLIC_FRONTEND_BASE_URL || "http://localhost:3000"; if (!supabase) { redirect("/error"); From 89a5ba69e57bf9561036cddb1f0f812afb46ae66 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Mon, 30 Jun 2025 14:06:29 -0700 Subject: [PATCH 3/5] fix(blocks): Fix boolean/toggle block input with false/disabled value --- .../frontend/src/components/node-input-components.tsx | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/autogpt_platform/frontend/src/components/node-input-components.tsx b/autogpt_platform/frontend/src/components/node-input-components.tsx index b1dabfb99c..5a8c197434 100644 --- a/autogpt_platform/frontend/src/components/node-input-components.tsx +++ b/autogpt_platform/frontend/src/components/node-input-components.tsx @@ -1219,7 +1219,9 @@ const NodeBooleanInput: FC<{ className?: string; displayName: string; }> = ({ selfKey, schema, value, error, handleInputChange, className }) => { - value ||= schema.default ?? false; + if (value == null) { + value = schema.default ?? false; + } return (
Date: Mon, 30 Jun 2025 14:09:30 -0700 Subject: [PATCH 4/5] fix(backend): Convert pyclamd to aioclamd for anti-virus scan concurrency improvement (#10258) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently, we are using PyClamd to run a file anti-virus scan for all the files uploaded into the platform. We split the file into small chunks and serially check the chunks for the virus scan. The socket is not thread-safe, and we need to create multiple sockets across many threads to leverage concurrency. To make this step concurrent and keep it fully async, we need to migrate PyClamd to aioclamd. ### Changes 🏗️ Convert pyclamd to aioclamd, leverage chunk parallelism scan with a semaphore limiting the concurrency limit. #### Side Note Shout-out to @tedyu for raising this improvement idea. ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] Execute file upload into the platform --- .../backend/backend/util/settings.py | 8 + .../backend/backend/util/virus_scanner.py | 140 +++++++++--------- .../backend/util/virus_scanner_test.py | 96 +++++++----- autogpt_platform/backend/poetry.lock | 46 ++++-- autogpt_platform/backend/pyproject.toml | 3 +- 5 files changed, 179 insertions(+), 114 deletions(-) diff --git a/autogpt_platform/backend/backend/util/settings.py b/autogpt_platform/backend/backend/util/settings.py index 542c010bca..feef2d3c97 100644 --- a/autogpt_platform/backend/backend/util/settings.py +++ b/autogpt_platform/backend/backend/util/settings.py @@ -254,6 +254,14 @@ class Config(UpdateTrackingModel["Config"], BaseSettings): default=True, description="Whether virus scanning is enabled or not", ) + clamav_max_concurrency: int = Field( + default=10, + description="The maximum number of concurrent scans to perform", + ) + clamav_mark_failed_scans_as_clean: bool = Field( + default=False, + description="Whether to mark failed scans as clean or not", + ) @field_validator("platform_base_url", "frontend_base_url") @classmethod diff --git a/autogpt_platform/backend/backend/util/virus_scanner.py b/autogpt_platform/backend/backend/util/virus_scanner.py index eb096b605b..4494c94790 100644 --- a/autogpt_platform/backend/backend/util/virus_scanner.py +++ b/autogpt_platform/backend/backend/util/virus_scanner.py @@ -1,9 +1,10 @@ import asyncio +import io import logging import time from typing import Optional, Tuple -import pyclamd +import aioclamd from pydantic import BaseModel from pydantic_settings import BaseSettings @@ -21,37 +22,47 @@ class VirusScanResult(BaseModel): class VirusScannerSettings(BaseSettings): + # Tunables for the scanner layer (NOT the ClamAV daemon). clamav_service_host: str = "localhost" clamav_service_port: int = 3310 clamav_service_timeout: int = 60 clamav_service_enabled: bool = True - max_scan_size: int = 100 * 1024 * 1024 # 100 MB - chunk_size: int = 25 * 1024 * 1024 # 25 MB (safe for 50MB stream limit) - min_chunk_size: int = 128 * 1024 # 128 KB minimum - max_retries: int = 8 # halve chunk ≤ max_retries times + # If the service is disabled, all files are considered clean. + mark_failed_scans_as_clean: bool = False + # Client-side protective limits + max_scan_size: int = 2 * 1024 * 1024 * 1024 # 2 GB guard-rail in memory + min_chunk_size: int = 128 * 1024 # 128 KB hard floor + max_retries: int = 8 # halve ≤ max_retries times + # Concurrency throttle toward the ClamAV daemon. Do *NOT* simply turn this + # up to the number of CPU cores; keep it ≤ (MaxThreads / pods) – 1. + max_concurrency: int = 5 class VirusScannerService: - """ - Thin async wrapper around ClamAV. Creates a fresh `ClamdNetworkSocket` - per chunk (the class is *not* thread-safe) and falls back to smaller - chunks when the daemon rejects the stream size. + """Fully-async ClamAV wrapper using **aioclamd**. + + • Reuses a single `ClamdAsyncClient` connection (aioclamd keeps the socket open). + • Throttles concurrent `INSTREAM` calls with an `asyncio.Semaphore` so we don't exhaust daemon worker threads or file descriptors. + • Falls back to progressively smaller chunk sizes when the daemon rejects a stream as too large. """ def __init__(self, settings: VirusScannerSettings) -> None: self.settings = settings - - def _new_client(self) -> pyclamd.ClamdNetworkSocket: - return pyclamd.ClamdNetworkSocket( - host=self.settings.clamav_service_host, - port=self.settings.clamav_service_port, - timeout=self.settings.clamav_service_timeout, + self._client = aioclamd.ClamdAsyncClient( + host=settings.clamav_service_host, + port=settings.clamav_service_port, + timeout=settings.clamav_service_timeout, ) + self._sem = asyncio.Semaphore(settings.max_concurrency) + + # ------------------------------------------------------------------ # + # Helpers + # ------------------------------------------------------------------ # @staticmethod def _parse_raw(raw: Optional[dict]) -> Tuple[bool, Optional[str]]: """ - Convert pyclamd output to (infected?, threat_name). + Convert aioclamd output to (infected?, threat_name). Returns (False, None) for clean. """ if not raw: @@ -59,24 +70,22 @@ class VirusScannerService: status, threat = next(iter(raw.values())) return status == "FOUND", threat - async def _scan_chunk(self, chunk: bytes) -> Tuple[bool, Optional[str]]: - loop = asyncio.get_running_loop() - client = self._new_client() - try: - raw = await loop.run_in_executor(None, client.scan_stream, chunk) - return self._parse_raw(raw) - - # ClamAV aborts the socket when >StreamMaxLength → BrokenPipe/Reset. - except (BrokenPipeError, ConnectionResetError) as exc: - raise RuntimeError("size-limit") from exc - except Exception as exc: - if "INSTREAM size limit exceeded" in str(exc): + async def _instream(self, chunk: bytes) -> Tuple[bool, Optional[str]]: + """Scan **one** chunk with concurrency control.""" + async with self._sem: + try: + raw = await self._client.instream(io.BytesIO(chunk)) + return self._parse_raw(raw) + except (BrokenPipeError, ConnectionResetError) as exc: raise RuntimeError("size-limit") from exc - raise + except Exception as exc: + if "INSTREAM size limit exceeded" in str(exc): + raise RuntimeError("size-limit") from exc + raise - # --------------------------------------------------------------------- # + # ------------------------------------------------------------------ # # Public API - # --------------------------------------------------------------------- # + # ------------------------------------------------------------------ # async def scan_file( self, content: bytes, *, filename: str = "unknown" @@ -84,81 +93,74 @@ class VirusScannerService: """ Scan `content`. Returns a result object or raises on infrastructure failure (unreachable daemon, etc.). + The algorithm always tries whole-file first. If the daemon refuses + on size grounds, it falls back to chunked parallel scanning. """ if not self.settings.clamav_service_enabled: - logger.warning("Virus scanning disabled – accepting %s", filename) + logger.warning(f"Virus scanning disabled – accepting {filename}") return VirusScanResult( is_clean=True, scan_time_ms=0, file_size=len(content) ) - if len(content) > self.settings.max_scan_size: logger.warning( - f"File {filename} ({len(content)} bytes) exceeds max scan size ({self.settings.max_scan_size}), skipping virus scan" + f"File {filename} ({len(content)} bytes) exceeds client max scan size ({self.settings.max_scan_size}); Stopping virus scan" ) return VirusScanResult( - is_clean=True, # Assume clean for oversized files + is_clean=self.settings.mark_failed_scans_as_clean, file_size=len(content), scan_time_ms=0, - threat_name=None, ) - loop = asyncio.get_running_loop() - if not await loop.run_in_executor(None, self._new_client().ping): + # Ensure daemon is reachable (small RTT check) + if not await self._client.ping(): raise RuntimeError("ClamAV service is unreachable") start = time.monotonic() - chunk_size = self.settings.chunk_size - - for retry in range(self.settings.max_retries + 1): + chunk_size = len(content) # Start with full content length + for retry in range(self.settings.max_retries): + # For small files, don't check min_chunk_size limit + if chunk_size < self.settings.min_chunk_size and chunk_size < len(content): + break + logger.debug( + f"Scanning {filename} with chunk size: {chunk_size // 1_048_576} MB (retry {retry + 1}/{self.settings.max_retries})" + ) try: - logger.debug( - f"Scanning {filename} with chunk size: {chunk_size // 1_048_576}MB" - ) - - # Scan all chunks with current chunk size - for offset in range(0, len(content), chunk_size): - chunk_data = content[offset : offset + chunk_size] - infected, threat = await self._scan_chunk(chunk_data) + tasks = [ + asyncio.create_task(self._instream(content[o : o + chunk_size])) + for o in range(0, len(content), chunk_size) + ] + for coro in asyncio.as_completed(tasks): + infected, threat = await coro if infected: + for t in tasks: + if not t.done(): + t.cancel() return VirusScanResult( is_clean=False, threat_name=threat, file_size=len(content), scan_time_ms=int((time.monotonic() - start) * 1000), ) - # All chunks clean return VirusScanResult( is_clean=True, file_size=len(content), scan_time_ms=int((time.monotonic() - start) * 1000), ) - except RuntimeError as exc: - if ( - str(exc) == "size-limit" - and chunk_size > self.settings.min_chunk_size - ): + if str(exc) == "size-limit": chunk_size //= 2 - logger.info( - f"Chunk size too large for {filename}, reducing to {chunk_size // 1_048_576}MB (retry {retry + 1}/{self.settings.max_retries + 1})" - ) continue - else: - # Either not a size-limit error, or we've hit minimum chunk size - logger.error(f"Cannot scan {filename}: {exc}") - raise - - # If we can't scan even with minimum chunk size, log warning and allow file + logger.error(f"Cannot scan {filename}: {exc}") + raise + # Phase 3 – give up but warn logger.warning( - f"Unable to virus scan {filename} ({len(content)} bytes) - chunk size limits exceeded. " - f"Allowing file but recommend manual review." + f"Unable to virus scan {filename} ({len(content)} bytes) even with minimum chunk size ({self.settings.min_chunk_size} bytes). Recommend manual review." ) return VirusScanResult( - is_clean=True, # Allow file when scanning impossible + is_clean=self.settings.mark_failed_scans_as_clean, file_size=len(content), scan_time_ms=int((time.monotonic() - start) * 1000), - threat_name=None, ) @@ -172,6 +174,8 @@ def get_virus_scanner() -> VirusScannerService: clamav_service_host=settings.config.clamav_service_host, clamav_service_port=settings.config.clamav_service_port, clamav_service_enabled=settings.config.clamav_service_enabled, + max_concurrency=settings.config.clamav_max_concurrency, + mark_failed_scans_as_clean=settings.config.clamav_mark_failed_scans_as_clean, ) _scanner = VirusScannerService(_settings) return _scanner diff --git a/autogpt_platform/backend/backend/util/virus_scanner_test.py b/autogpt_platform/backend/backend/util/virus_scanner_test.py index 561d60e569..81b5ad3342 100644 --- a/autogpt_platform/backend/backend/util/virus_scanner_test.py +++ b/autogpt_platform/backend/backend/util/virus_scanner_test.py @@ -1,5 +1,4 @@ import asyncio -import time from unittest.mock import AsyncMock, Mock, patch import pytest @@ -22,6 +21,7 @@ class TestVirusScannerService: clamav_service_port=3310, clamav_service_enabled=True, max_scan_size=10 * 1024 * 1024, # 10MB for testing + mark_failed_scans_as_clean=False, # For testing, failed scans should be clean ) @pytest.fixture @@ -54,25 +54,51 @@ class TestVirusScannerService: # Create content larger than max_scan_size large_content = b"x" * (scanner.settings.max_scan_size + 1) - # Large files are allowed but marked as clean with a warning + # Large files behavior depends on mark_failed_scans_as_clean setting result = await scanner.scan_file(large_content, filename="large_file.txt") - assert result.is_clean is True + assert result.is_clean == scanner.settings.mark_failed_scans_as_clean assert result.file_size == len(large_content) assert result.scan_time_ms == 0 + @pytest.mark.asyncio + async def test_scan_file_too_large_both_configurations(self): + """Test large file handling with both mark_failed_scans_as_clean configurations""" + large_content = b"x" * (10 * 1024 * 1024 + 1) # Larger than 10MB + + # Test with mark_failed_scans_as_clean=True + settings_clean = VirusScannerSettings( + max_scan_size=10 * 1024 * 1024, mark_failed_scans_as_clean=True + ) + scanner_clean = VirusScannerService(settings_clean) + result_clean = await scanner_clean.scan_file( + large_content, filename="large_file.txt" + ) + assert result_clean.is_clean is True + + # Test with mark_failed_scans_as_clean=False + settings_dirty = VirusScannerSettings( + max_scan_size=10 * 1024 * 1024, mark_failed_scans_as_clean=False + ) + scanner_dirty = VirusScannerService(settings_dirty) + result_dirty = await scanner_dirty.scan_file( + large_content, filename="large_file.txt" + ) + assert result_dirty.is_clean is False + # Note: ping method was removed from current implementation @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_clean_file(self, mock_clamav_class, scanner): - def mock_scan_stream(_): - time.sleep(0.001) # Small delay to ensure timing > 0 + async def test_scan_clean_file(self, scanner): + async def mock_instream(_): + await asyncio.sleep(0.001) # Small delay to ensure timing > 0 return None # No virus detected mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream = mock_scan_stream - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=mock_instream) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"clean file content" result = await scanner.scan_file(content, filename="clean.txt") @@ -83,16 +109,17 @@ class TestVirusScannerService: assert result.scan_time_ms > 0 @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_infected_file(self, mock_clamav_class, scanner): - def mock_scan_stream(_): - time.sleep(0.001) # Small delay to ensure timing > 0 + async def test_scan_infected_file(self, scanner): + async def mock_instream(_): + await asyncio.sleep(0.001) # Small delay to ensure timing > 0 return {"stream": ("FOUND", "Win.Test.EICAR_HDB-1")} mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream = mock_scan_stream - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=mock_instream) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"infected file content" result = await scanner.scan_file(content, filename="infected.txt") @@ -103,11 +130,12 @@ class TestVirusScannerService: assert result.scan_time_ms > 0 @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_clamav_unavailable_fail_safe(self, mock_clamav_class, scanner): + async def test_scan_clamav_unavailable_fail_safe(self, scanner): mock_client = Mock() - mock_client.ping.return_value = False - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=False) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"test content" @@ -115,12 +143,13 @@ class TestVirusScannerService: await scanner.scan_file(content, filename="test.txt") @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_error_fail_safe(self, mock_clamav_class, scanner): + async def test_scan_error_fail_safe(self, scanner): mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream.side_effect = Exception("Scanning error") - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=Exception("Scanning error")) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"test content" @@ -150,16 +179,17 @@ class TestVirusScannerService: assert result.file_size == 1024 @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_concurrent_scans(self, mock_clamav_class, scanner): - def mock_scan_stream(_): - time.sleep(0.001) # Small delay to ensure timing > 0 + async def test_concurrent_scans(self, scanner): + async def mock_instream(_): + await asyncio.sleep(0.001) # Small delay to ensure timing > 0 return None mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream = mock_scan_stream - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=mock_instream) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content1 = b"file1 content" content2 = b"file2 content" diff --git a/autogpt_platform/backend/poetry.lock b/autogpt_platform/backend/poetry.lock index 078dd4c526..b3312f7c39 100644 --- a/autogpt_platform/backend/poetry.lock +++ b/autogpt_platform/backend/poetry.lock @@ -17,6 +17,18 @@ aiormq = ">=6.8,<6.9" exceptiongroup = ">=1,<2" yarl = "*" +[[package]] +name = "aioclamd" +version = "1.0.0" +description = "Asynchronous client for virus scanning with ClamAV" +optional = false +python-versions = ">=3.7,<4.0" +groups = ["main"] +files = [ + {file = "aioclamd-1.0.0-py3-none-any.whl", hash = "sha256:4727da3953a4b38be4c2de1acb6b3bb3c94c1c171dcac780b80234ee6253f3d9"}, + {file = "aioclamd-1.0.0.tar.gz", hash = "sha256:7b14e94e3a2285cc89e2f4d434e2a01f322d3cb95476ce2dda015a7980876047"}, +] + [[package]] name = "aiodns" version = "3.4.0" @@ -3856,17 +3868,6 @@ cffi = ">=1.5.0" [package.extras] idna = ["idna (>=2.1)"] -[[package]] -name = "pyclamd" -version = "0.4.0" -description = "pyClamd is a python interface to Clamd (Clamav daemon)." -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "pyClamd-0.4.0.tar.gz", hash = "sha256:ddd588577e5db123760b6ddaac46b5c4b1d9044a00b5d9422de59f83a55c20fe"}, -] - [[package]] name = "pycodestyle" version = "2.13.0" @@ -5017,6 +5018,27 @@ statsig = ["statsig (>=0.55.3)"] tornado = ["tornado (>=6)"] unleash = ["UnleashClient (>=6.0.1)"] +[[package]] +name = "setuptools" +version = "80.9.0" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922"}, + {file = "setuptools-80.9.0.tar.gz", hash = "sha256:f36b47402ecde768dbfafc46e8e4207b4360c654f1f3bb84475f0a28628fb19c"}, +] + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""] +core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"] + [[package]] name = "sgmllib3k" version = "1.0.0" @@ -6380,4 +6402,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "bd117a21d817a2a735ed923c383713dd08469938ef5f7d07c4222da1acca2b5c" +content-hash = "b5c1201f27ee8d05d5d8c89702123df4293f124301d1aef7451591a351872260" diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml index 5980bf3b6b..30429152cb 100644 --- a/autogpt_platform/backend/pyproject.toml +++ b/autogpt_platform/backend/pyproject.toml @@ -68,8 +68,9 @@ zerobouncesdk = "^1.1.1" # NOTE: please insert new dependencies in their alphabetical location pytest-snapshot = "^0.9.0" aiofiles = "^24.1.0" -pyclamd = "^0.4.0" tiktoken = "^0.9.0" +aioclamd = "^1.0.0" +setuptools = "^80.9.0" [tool.poetry.group.dev.dependencies] aiohappyeyeballs = "^2.6.1" From 198b3d9f4563bb17733d42d100f056ae338dbd38 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Mon, 30 Jun 2025 14:09:53 -0700 Subject: [PATCH 5/5] fix(backend): Avoid swallowing exception on graph execution failure (#10260) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Graph execution that fails due to interruption or unknown error should be enqueued back to the queue. However, swallowing the error ends up not marking the execution as a failure. ### Changes 🏗️ * Avoid keep updating the graph execution status on each node execution step. * Added a guard rail to avoid completing graph execution on non-completed execution status. * Avoid acknowledging messages from the queue if the graph execution is not yet completed. ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - [x] Run graph execution, kill the process, re-run the process --------- Co-authored-by: Swifty --- .../backend/backend/data/execution.py | 9 +- .../backend/backend/executor/manager.py | 14 +- .../backend/backend/util/decorator.py | 136 ++++++++++++++---- .../backend/backend/util/decorator_test.py | 60 +++++++- .../backend/backend/util/service_test.py | 2 +- 5 files changed, 183 insertions(+), 38 deletions(-) diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index 2a31fef256..090d120580 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -588,12 +588,10 @@ async def update_graph_execution_start_time( async def update_graph_execution_stats( graph_exec_id: str, - status: ExecutionStatus, + status: ExecutionStatus | None = None, stats: GraphExecutionStats | None = None, ) -> GraphExecution | None: - update_data: AgentGraphExecutionUpdateManyMutationInput = { - "executionStatus": status - } + update_data: AgentGraphExecutionUpdateManyMutationInput = {} if stats: stats_dict = stats.model_dump() @@ -601,6 +599,9 @@ async def update_graph_execution_stats( stats_dict["error"] = str(stats_dict["error"]) update_data["stats"] = Json(stats_dict) + if status: + update_data["executionStatus"] = status + updated_count = await AgentGraphExecution.prisma().update_many( where={ "id": graph_exec_id, diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index b787ccf890..db0d2db37f 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -421,7 +421,7 @@ class Executor: """ @classmethod - @async_error_logged + @async_error_logged(swallow=True) async def on_node_execution( cls, node_exec: NodeExecutionEntry, @@ -529,7 +529,7 @@ class Executor: logger.info(f"[GraphExecutor] {cls.pid} started") @classmethod - @error_logged + @error_logged(swallow=False) def on_graph_execution( cls, graph_exec: GraphExecutionEntry, cancel: threading.Event ): @@ -581,6 +581,15 @@ class Executor: exec_stats.cputime += timing_info.cpu_time exec_stats.error = str(error) if error else exec_stats.error + if status not in { + ExecutionStatus.COMPLETED, + ExecutionStatus.TERMINATED, + ExecutionStatus.FAILED, + }: + raise RuntimeError( + f"Graph Execution #{graph_exec.graph_exec_id} ended with unexpected status {status}" + ) + if graph_exec_result := db_client.update_graph_execution_stats( graph_exec_id=graph_exec.graph_exec_id, status=status, @@ -684,7 +693,6 @@ class Executor: if _graph_exec := db_client.update_graph_execution_stats( graph_exec_id=graph_exec.graph_exec_id, - status=execution_status, stats=execution_stats, ): send_execution_update(_graph_exec) diff --git a/autogpt_platform/backend/backend/util/decorator.py b/autogpt_platform/backend/backend/util/decorator.py index 84f128333f..a0b0ce91ba 100644 --- a/autogpt_platform/backend/backend/util/decorator.py +++ b/autogpt_platform/backend/backend/util/decorator.py @@ -2,7 +2,17 @@ import functools import logging import os import time -from typing import Any, Awaitable, Callable, Coroutine, ParamSpec, Tuple, TypeVar +from typing import ( + Any, + Awaitable, + Callable, + Coroutine, + Literal, + ParamSpec, + Tuple, + TypeVar, + overload, +) from pydantic import BaseModel @@ -72,37 +82,115 @@ def async_time_measured( return async_wrapper -def error_logged(func: Callable[P, T]) -> Callable[P, T | None]: +@overload +def error_logged( + *, swallow: Literal[True] +) -> Callable[[Callable[P, T]], Callable[P, T | None]]: ... + + +@overload +def error_logged( + *, swallow: Literal[False] +) -> Callable[[Callable[P, T]], Callable[P, T]]: ... + + +@overload +def error_logged() -> Callable[[Callable[P, T]], Callable[P, T | None]]: ... + + +def error_logged( + *, swallow: bool = True +) -> ( + Callable[[Callable[P, T]], Callable[P, T | None]] + | Callable[[Callable[P, T]], Callable[P, T]] +): """ - Decorator to suppress and log any exceptions raised by a function. + Decorator to log any exceptions raised by a function, with optional suppression. + + Args: + swallow: Whether to suppress the exception (True) or re-raise it (False). Default is True. + + Usage: + @error_logged() # Default behavior (swallow errors) + @error_logged(swallow=False) # Log and re-raise errors """ - @functools.wraps(func) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: - try: - return func(*args, **kwargs) - except Exception as e: - logger.exception( - f"Error when calling function {func.__name__} with arguments {args} {kwargs}: {e}" - ) + def decorator(f: Callable[P, T]) -> Callable[P, T | None]: + @functools.wraps(f) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: + try: + return f(*args, **kwargs) + except Exception as e: + logger.exception( + f"Error when calling function {f.__name__} with arguments {args} {kwargs}: {e}" + ) + if not swallow: + raise + return None - return wrapper + return wrapper + + return decorator +@overload def async_error_logged( - func: Callable[P, Coroutine[Any, Any, T]], -) -> Callable[P, Coroutine[Any, Any, T | None]]: + *, swallow: Literal[True] +) -> Callable[ + [Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T | None]] +]: ... + + +@overload +def async_error_logged( + *, swallow: Literal[False] +) -> Callable[ + [Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]] +]: ... + + +@overload +def async_error_logged() -> Callable[ + [Callable[P, Coroutine[Any, Any, T]]], + Callable[P, Coroutine[Any, Any, T | None]], +]: ... + + +def async_error_logged(*, swallow: bool = True) -> ( + Callable[ + [Callable[P, Coroutine[Any, Any, T]]], + Callable[P, Coroutine[Any, Any, T | None]], + ] + | Callable[ + [Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]] + ] +): """ - Decorator to suppress and log any exceptions raised by an async function. + Decorator to log any exceptions raised by an async function, with optional suppression. + + Args: + swallow: Whether to suppress the exception (True) or re-raise it (False). Default is True. + + Usage: + @async_error_logged() # Default behavior (swallow errors) + @async_error_logged(swallow=False) # Log and re-raise errors """ - @functools.wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: - try: - return await func(*args, **kwargs) - except Exception as e: - logger.exception( - f"Error when calling async function {func.__name__} with arguments {args} {kwargs}: {e}" - ) + def decorator( + f: Callable[P, Coroutine[Any, Any, T]] + ) -> Callable[P, Coroutine[Any, Any, T | None]]: + @functools.wraps(f) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: + try: + return await f(*args, **kwargs) + except Exception as e: + logger.exception( + f"Error when calling async function {f.__name__} with arguments {args} {kwargs}: {e}" + ) + if not swallow: + raise + return None - return wrapper + return wrapper + + return decorator diff --git a/autogpt_platform/backend/backend/util/decorator_test.py b/autogpt_platform/backend/backend/util/decorator_test.py index de38b747d8..cdc1cc5b75 100644 --- a/autogpt_platform/backend/backend/util/decorator_test.py +++ b/autogpt_platform/backend/backend/util/decorator_test.py @@ -1,6 +1,8 @@ import time -from backend.util.decorator import error_logged, time_measured +import pytest + +from backend.util.decorator import async_error_logged, error_logged, time_measured @time_measured @@ -9,18 +11,64 @@ def example_function(a: int, b: int, c: int) -> int: return a + b + c -@error_logged -def example_function_with_error(a: int, b: int, c: int) -> int: - raise ValueError("This is a test error") +@error_logged(swallow=True) +def example_function_with_error_swallowed(a: int, b: int, c: int) -> int: + raise ValueError("This error should be swallowed") + + +@error_logged(swallow=False) +def example_function_with_error_not_swallowed(a: int, b: int, c: int) -> int: + raise ValueError("This error should NOT be swallowed") + + +@async_error_logged(swallow=True) +async def async_function_with_error_swallowed() -> int: + raise ValueError("This async error should be swallowed") + + +@async_error_logged(swallow=False) +async def async_function_with_error_not_swallowed() -> int: + raise ValueError("This async error should NOT be swallowed") def test_timer_decorator(): + """Test that the time_measured decorator correctly measures execution time.""" info, res = example_function(1, 2, 3) assert info.cpu_time >= 0 assert info.wall_time >= 0.4 assert res == 6 -def test_error_decorator(): - res = example_function_with_error(1, 2, 3) +def test_error_decorator_swallow_true(): + """Test that error_logged(swallow=True) logs and swallows errors.""" + res = example_function_with_error_swallowed(1, 2, 3) assert res is None + + +def test_error_decorator_swallow_false(): + """Test that error_logged(swallow=False) logs errors but re-raises them.""" + with pytest.raises(ValueError, match="This error should NOT be swallowed"): + example_function_with_error_not_swallowed(1, 2, 3) + + +def test_async_error_decorator_swallow_true(): + """Test that async_error_logged(swallow=True) logs and swallows errors.""" + import asyncio + + async def run_test(): + res = await async_function_with_error_swallowed() + return res + + res = asyncio.run(run_test()) + assert res is None + + +def test_async_error_decorator_swallow_false(): + """Test that async_error_logged(swallow=False) logs errors but re-raises them.""" + import asyncio + + async def run_test(): + await async_function_with_error_not_swallowed() + + with pytest.raises(ValueError, match="This async error should NOT be swallowed"): + asyncio.run(run_test()) diff --git a/autogpt_platform/backend/backend/util/service_test.py b/autogpt_platform/backend/backend/util/service_test.py index 49f7d6f66a..3d8eb29419 100644 --- a/autogpt_platform/backend/backend/util/service_test.py +++ b/autogpt_platform/backend/backend/util/service_test.py @@ -51,7 +51,7 @@ class ServiceTestClient(AppServiceClient): subtract_async = endpoint_to_async(ServiceTest.subtract) -@pytest.mark.asyncio(loop_scope="session") +@pytest.mark.asyncio async def test_service_creation(server): with ServiceTest(): client = get_service_client(ServiceTestClient)