diff --git a/autogpt_platform/backend/.env.example b/autogpt_platform/backend/.env.example index 18343d7725..395248f7ed 100644 --- a/autogpt_platform/backend/.env.example +++ b/autogpt_platform/backend/.env.example @@ -55,9 +55,9 @@ RABBITMQ_DEFAULT_PASS=k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7 ## GCS bucket is required for marketplace and library functionality MEDIA_GCS_BUCKET_NAME= -## For local development, you may need to set FRONTEND_BASE_URL for the OAuth flow +## For local development, you may need to set NEXT_PUBLIC_FRONTEND_BASE_URL for the OAuth flow ## for integrations to work. Defaults to the value of PLATFORM_BASE_URL if not set. -# FRONTEND_BASE_URL=http://localhost:3000 +# NEXT_PUBLIC_FRONTEND_BASE_URL=http://localhost:3000 ## PLATFORM_BASE_URL must be set to a *publicly accessible* URL pointing to your backend ## to use the platform's webhook-related functionality. diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index 2a31fef256..090d120580 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -588,12 +588,10 @@ async def update_graph_execution_start_time( async def update_graph_execution_stats( graph_exec_id: str, - status: ExecutionStatus, + status: ExecutionStatus | None = None, stats: GraphExecutionStats | None = None, ) -> GraphExecution | None: - update_data: AgentGraphExecutionUpdateManyMutationInput = { - "executionStatus": status - } + update_data: AgentGraphExecutionUpdateManyMutationInput = {} if stats: stats_dict = stats.model_dump() @@ -601,6 +599,9 @@ async def update_graph_execution_stats( stats_dict["error"] = str(stats_dict["error"]) update_data["stats"] = Json(stats_dict) + if status: + update_data["executionStatus"] = status + updated_count = await AgentGraphExecution.prisma().update_many( where={ "id": graph_exec_id, diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index b787ccf890..db0d2db37f 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -421,7 +421,7 @@ class Executor: """ @classmethod - @async_error_logged + @async_error_logged(swallow=True) async def on_node_execution( cls, node_exec: NodeExecutionEntry, @@ -529,7 +529,7 @@ class Executor: logger.info(f"[GraphExecutor] {cls.pid} started") @classmethod - @error_logged + @error_logged(swallow=False) def on_graph_execution( cls, graph_exec: GraphExecutionEntry, cancel: threading.Event ): @@ -581,6 +581,15 @@ class Executor: exec_stats.cputime += timing_info.cpu_time exec_stats.error = str(error) if error else exec_stats.error + if status not in { + ExecutionStatus.COMPLETED, + ExecutionStatus.TERMINATED, + ExecutionStatus.FAILED, + }: + raise RuntimeError( + f"Graph Execution #{graph_exec.graph_exec_id} ended with unexpected status {status}" + ) + if graph_exec_result := db_client.update_graph_execution_stats( graph_exec_id=graph_exec.graph_exec_id, status=status, @@ -684,7 +693,6 @@ class Executor: if _graph_exec := db_client.update_graph_execution_stats( graph_exec_id=graph_exec.graph_exec_id, - status=execution_status, stats=execution_stats, ): send_execution_update(_graph_exec) diff --git a/autogpt_platform/backend/backend/util/decorator.py b/autogpt_platform/backend/backend/util/decorator.py index 84f128333f..a0b0ce91ba 100644 --- a/autogpt_platform/backend/backend/util/decorator.py +++ b/autogpt_platform/backend/backend/util/decorator.py @@ -2,7 +2,17 @@ import functools import logging import os import time -from typing import Any, Awaitable, Callable, Coroutine, ParamSpec, Tuple, TypeVar +from typing import ( + Any, + Awaitable, + Callable, + Coroutine, + Literal, + ParamSpec, + Tuple, + TypeVar, + overload, +) from pydantic import BaseModel @@ -72,37 +82,115 @@ def async_time_measured( return async_wrapper -def error_logged(func: Callable[P, T]) -> Callable[P, T | None]: +@overload +def error_logged( + *, swallow: Literal[True] +) -> Callable[[Callable[P, T]], Callable[P, T | None]]: ... + + +@overload +def error_logged( + *, swallow: Literal[False] +) -> Callable[[Callable[P, T]], Callable[P, T]]: ... + + +@overload +def error_logged() -> Callable[[Callable[P, T]], Callable[P, T | None]]: ... + + +def error_logged( + *, swallow: bool = True +) -> ( + Callable[[Callable[P, T]], Callable[P, T | None]] + | Callable[[Callable[P, T]], Callable[P, T]] +): """ - Decorator to suppress and log any exceptions raised by a function. + Decorator to log any exceptions raised by a function, with optional suppression. + + Args: + swallow: Whether to suppress the exception (True) or re-raise it (False). Default is True. + + Usage: + @error_logged() # Default behavior (swallow errors) + @error_logged(swallow=False) # Log and re-raise errors """ - @functools.wraps(func) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: - try: - return func(*args, **kwargs) - except Exception as e: - logger.exception( - f"Error when calling function {func.__name__} with arguments {args} {kwargs}: {e}" - ) + def decorator(f: Callable[P, T]) -> Callable[P, T | None]: + @functools.wraps(f) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: + try: + return f(*args, **kwargs) + except Exception as e: + logger.exception( + f"Error when calling function {f.__name__} with arguments {args} {kwargs}: {e}" + ) + if not swallow: + raise + return None - return wrapper + return wrapper + + return decorator +@overload def async_error_logged( - func: Callable[P, Coroutine[Any, Any, T]], -) -> Callable[P, Coroutine[Any, Any, T | None]]: + *, swallow: Literal[True] +) -> Callable[ + [Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T | None]] +]: ... + + +@overload +def async_error_logged( + *, swallow: Literal[False] +) -> Callable[ + [Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]] +]: ... + + +@overload +def async_error_logged() -> Callable[ + [Callable[P, Coroutine[Any, Any, T]]], + Callable[P, Coroutine[Any, Any, T | None]], +]: ... + + +def async_error_logged(*, swallow: bool = True) -> ( + Callable[ + [Callable[P, Coroutine[Any, Any, T]]], + Callable[P, Coroutine[Any, Any, T | None]], + ] + | Callable[ + [Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]] + ] +): """ - Decorator to suppress and log any exceptions raised by an async function. + Decorator to log any exceptions raised by an async function, with optional suppression. + + Args: + swallow: Whether to suppress the exception (True) or re-raise it (False). Default is True. + + Usage: + @async_error_logged() # Default behavior (swallow errors) + @async_error_logged(swallow=False) # Log and re-raise errors """ - @functools.wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: - try: - return await func(*args, **kwargs) - except Exception as e: - logger.exception( - f"Error when calling async function {func.__name__} with arguments {args} {kwargs}: {e}" - ) + def decorator( + f: Callable[P, Coroutine[Any, Any, T]] + ) -> Callable[P, Coroutine[Any, Any, T | None]]: + @functools.wraps(f) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T | None: + try: + return await f(*args, **kwargs) + except Exception as e: + logger.exception( + f"Error when calling async function {f.__name__} with arguments {args} {kwargs}: {e}" + ) + if not swallow: + raise + return None - return wrapper + return wrapper + + return decorator diff --git a/autogpt_platform/backend/backend/util/decorator_test.py b/autogpt_platform/backend/backend/util/decorator_test.py index de38b747d8..cdc1cc5b75 100644 --- a/autogpt_platform/backend/backend/util/decorator_test.py +++ b/autogpt_platform/backend/backend/util/decorator_test.py @@ -1,6 +1,8 @@ import time -from backend.util.decorator import error_logged, time_measured +import pytest + +from backend.util.decorator import async_error_logged, error_logged, time_measured @time_measured @@ -9,18 +11,64 @@ def example_function(a: int, b: int, c: int) -> int: return a + b + c -@error_logged -def example_function_with_error(a: int, b: int, c: int) -> int: - raise ValueError("This is a test error") +@error_logged(swallow=True) +def example_function_with_error_swallowed(a: int, b: int, c: int) -> int: + raise ValueError("This error should be swallowed") + + +@error_logged(swallow=False) +def example_function_with_error_not_swallowed(a: int, b: int, c: int) -> int: + raise ValueError("This error should NOT be swallowed") + + +@async_error_logged(swallow=True) +async def async_function_with_error_swallowed() -> int: + raise ValueError("This async error should be swallowed") + + +@async_error_logged(swallow=False) +async def async_function_with_error_not_swallowed() -> int: + raise ValueError("This async error should NOT be swallowed") def test_timer_decorator(): + """Test that the time_measured decorator correctly measures execution time.""" info, res = example_function(1, 2, 3) assert info.cpu_time >= 0 assert info.wall_time >= 0.4 assert res == 6 -def test_error_decorator(): - res = example_function_with_error(1, 2, 3) +def test_error_decorator_swallow_true(): + """Test that error_logged(swallow=True) logs and swallows errors.""" + res = example_function_with_error_swallowed(1, 2, 3) assert res is None + + +def test_error_decorator_swallow_false(): + """Test that error_logged(swallow=False) logs errors but re-raises them.""" + with pytest.raises(ValueError, match="This error should NOT be swallowed"): + example_function_with_error_not_swallowed(1, 2, 3) + + +def test_async_error_decorator_swallow_true(): + """Test that async_error_logged(swallow=True) logs and swallows errors.""" + import asyncio + + async def run_test(): + res = await async_function_with_error_swallowed() + return res + + res = asyncio.run(run_test()) + assert res is None + + +def test_async_error_decorator_swallow_false(): + """Test that async_error_logged(swallow=False) logs errors but re-raises them.""" + import asyncio + + async def run_test(): + await async_function_with_error_not_swallowed() + + with pytest.raises(ValueError, match="This async error should NOT be swallowed"): + asyncio.run(run_test()) diff --git a/autogpt_platform/backend/backend/util/service_test.py b/autogpt_platform/backend/backend/util/service_test.py index 49f7d6f66a..3d8eb29419 100644 --- a/autogpt_platform/backend/backend/util/service_test.py +++ b/autogpt_platform/backend/backend/util/service_test.py @@ -51,7 +51,7 @@ class ServiceTestClient(AppServiceClient): subtract_async = endpoint_to_async(ServiceTest.subtract) -@pytest.mark.asyncio(loop_scope="session") +@pytest.mark.asyncio async def test_service_creation(server): with ServiceTest(): client = get_service_client(ServiceTestClient) diff --git a/autogpt_platform/backend/backend/util/settings.py b/autogpt_platform/backend/backend/util/settings.py index 542c010bca..feef2d3c97 100644 --- a/autogpt_platform/backend/backend/util/settings.py +++ b/autogpt_platform/backend/backend/util/settings.py @@ -254,6 +254,14 @@ class Config(UpdateTrackingModel["Config"], BaseSettings): default=True, description="Whether virus scanning is enabled or not", ) + clamav_max_concurrency: int = Field( + default=10, + description="The maximum number of concurrent scans to perform", + ) + clamav_mark_failed_scans_as_clean: bool = Field( + default=False, + description="Whether to mark failed scans as clean or not", + ) @field_validator("platform_base_url", "frontend_base_url") @classmethod diff --git a/autogpt_platform/backend/backend/util/virus_scanner.py b/autogpt_platform/backend/backend/util/virus_scanner.py index eb096b605b..4494c94790 100644 --- a/autogpt_platform/backend/backend/util/virus_scanner.py +++ b/autogpt_platform/backend/backend/util/virus_scanner.py @@ -1,9 +1,10 @@ import asyncio +import io import logging import time from typing import Optional, Tuple -import pyclamd +import aioclamd from pydantic import BaseModel from pydantic_settings import BaseSettings @@ -21,37 +22,47 @@ class VirusScanResult(BaseModel): class VirusScannerSettings(BaseSettings): + # Tunables for the scanner layer (NOT the ClamAV daemon). clamav_service_host: str = "localhost" clamav_service_port: int = 3310 clamav_service_timeout: int = 60 clamav_service_enabled: bool = True - max_scan_size: int = 100 * 1024 * 1024 # 100 MB - chunk_size: int = 25 * 1024 * 1024 # 25 MB (safe for 50MB stream limit) - min_chunk_size: int = 128 * 1024 # 128 KB minimum - max_retries: int = 8 # halve chunk ≤ max_retries times + # If the service is disabled, all files are considered clean. + mark_failed_scans_as_clean: bool = False + # Client-side protective limits + max_scan_size: int = 2 * 1024 * 1024 * 1024 # 2 GB guard-rail in memory + min_chunk_size: int = 128 * 1024 # 128 KB hard floor + max_retries: int = 8 # halve ≤ max_retries times + # Concurrency throttle toward the ClamAV daemon. Do *NOT* simply turn this + # up to the number of CPU cores; keep it ≤ (MaxThreads / pods) – 1. + max_concurrency: int = 5 class VirusScannerService: - """ - Thin async wrapper around ClamAV. Creates a fresh `ClamdNetworkSocket` - per chunk (the class is *not* thread-safe) and falls back to smaller - chunks when the daemon rejects the stream size. + """Fully-async ClamAV wrapper using **aioclamd**. + + • Reuses a single `ClamdAsyncClient` connection (aioclamd keeps the socket open). + • Throttles concurrent `INSTREAM` calls with an `asyncio.Semaphore` so we don't exhaust daemon worker threads or file descriptors. + • Falls back to progressively smaller chunk sizes when the daemon rejects a stream as too large. """ def __init__(self, settings: VirusScannerSettings) -> None: self.settings = settings - - def _new_client(self) -> pyclamd.ClamdNetworkSocket: - return pyclamd.ClamdNetworkSocket( - host=self.settings.clamav_service_host, - port=self.settings.clamav_service_port, - timeout=self.settings.clamav_service_timeout, + self._client = aioclamd.ClamdAsyncClient( + host=settings.clamav_service_host, + port=settings.clamav_service_port, + timeout=settings.clamav_service_timeout, ) + self._sem = asyncio.Semaphore(settings.max_concurrency) + + # ------------------------------------------------------------------ # + # Helpers + # ------------------------------------------------------------------ # @staticmethod def _parse_raw(raw: Optional[dict]) -> Tuple[bool, Optional[str]]: """ - Convert pyclamd output to (infected?, threat_name). + Convert aioclamd output to (infected?, threat_name). Returns (False, None) for clean. """ if not raw: @@ -59,24 +70,22 @@ class VirusScannerService: status, threat = next(iter(raw.values())) return status == "FOUND", threat - async def _scan_chunk(self, chunk: bytes) -> Tuple[bool, Optional[str]]: - loop = asyncio.get_running_loop() - client = self._new_client() - try: - raw = await loop.run_in_executor(None, client.scan_stream, chunk) - return self._parse_raw(raw) - - # ClamAV aborts the socket when >StreamMaxLength → BrokenPipe/Reset. - except (BrokenPipeError, ConnectionResetError) as exc: - raise RuntimeError("size-limit") from exc - except Exception as exc: - if "INSTREAM size limit exceeded" in str(exc): + async def _instream(self, chunk: bytes) -> Tuple[bool, Optional[str]]: + """Scan **one** chunk with concurrency control.""" + async with self._sem: + try: + raw = await self._client.instream(io.BytesIO(chunk)) + return self._parse_raw(raw) + except (BrokenPipeError, ConnectionResetError) as exc: raise RuntimeError("size-limit") from exc - raise + except Exception as exc: + if "INSTREAM size limit exceeded" in str(exc): + raise RuntimeError("size-limit") from exc + raise - # --------------------------------------------------------------------- # + # ------------------------------------------------------------------ # # Public API - # --------------------------------------------------------------------- # + # ------------------------------------------------------------------ # async def scan_file( self, content: bytes, *, filename: str = "unknown" @@ -84,81 +93,74 @@ class VirusScannerService: """ Scan `content`. Returns a result object or raises on infrastructure failure (unreachable daemon, etc.). + The algorithm always tries whole-file first. If the daemon refuses + on size grounds, it falls back to chunked parallel scanning. """ if not self.settings.clamav_service_enabled: - logger.warning("Virus scanning disabled – accepting %s", filename) + logger.warning(f"Virus scanning disabled – accepting {filename}") return VirusScanResult( is_clean=True, scan_time_ms=0, file_size=len(content) ) - if len(content) > self.settings.max_scan_size: logger.warning( - f"File {filename} ({len(content)} bytes) exceeds max scan size ({self.settings.max_scan_size}), skipping virus scan" + f"File {filename} ({len(content)} bytes) exceeds client max scan size ({self.settings.max_scan_size}); Stopping virus scan" ) return VirusScanResult( - is_clean=True, # Assume clean for oversized files + is_clean=self.settings.mark_failed_scans_as_clean, file_size=len(content), scan_time_ms=0, - threat_name=None, ) - loop = asyncio.get_running_loop() - if not await loop.run_in_executor(None, self._new_client().ping): + # Ensure daemon is reachable (small RTT check) + if not await self._client.ping(): raise RuntimeError("ClamAV service is unreachable") start = time.monotonic() - chunk_size = self.settings.chunk_size - - for retry in range(self.settings.max_retries + 1): + chunk_size = len(content) # Start with full content length + for retry in range(self.settings.max_retries): + # For small files, don't check min_chunk_size limit + if chunk_size < self.settings.min_chunk_size and chunk_size < len(content): + break + logger.debug( + f"Scanning {filename} with chunk size: {chunk_size // 1_048_576} MB (retry {retry + 1}/{self.settings.max_retries})" + ) try: - logger.debug( - f"Scanning {filename} with chunk size: {chunk_size // 1_048_576}MB" - ) - - # Scan all chunks with current chunk size - for offset in range(0, len(content), chunk_size): - chunk_data = content[offset : offset + chunk_size] - infected, threat = await self._scan_chunk(chunk_data) + tasks = [ + asyncio.create_task(self._instream(content[o : o + chunk_size])) + for o in range(0, len(content), chunk_size) + ] + for coro in asyncio.as_completed(tasks): + infected, threat = await coro if infected: + for t in tasks: + if not t.done(): + t.cancel() return VirusScanResult( is_clean=False, threat_name=threat, file_size=len(content), scan_time_ms=int((time.monotonic() - start) * 1000), ) - # All chunks clean return VirusScanResult( is_clean=True, file_size=len(content), scan_time_ms=int((time.monotonic() - start) * 1000), ) - except RuntimeError as exc: - if ( - str(exc) == "size-limit" - and chunk_size > self.settings.min_chunk_size - ): + if str(exc) == "size-limit": chunk_size //= 2 - logger.info( - f"Chunk size too large for {filename}, reducing to {chunk_size // 1_048_576}MB (retry {retry + 1}/{self.settings.max_retries + 1})" - ) continue - else: - # Either not a size-limit error, or we've hit minimum chunk size - logger.error(f"Cannot scan {filename}: {exc}") - raise - - # If we can't scan even with minimum chunk size, log warning and allow file + logger.error(f"Cannot scan {filename}: {exc}") + raise + # Phase 3 – give up but warn logger.warning( - f"Unable to virus scan {filename} ({len(content)} bytes) - chunk size limits exceeded. " - f"Allowing file but recommend manual review." + f"Unable to virus scan {filename} ({len(content)} bytes) even with minimum chunk size ({self.settings.min_chunk_size} bytes). Recommend manual review." ) return VirusScanResult( - is_clean=True, # Allow file when scanning impossible + is_clean=self.settings.mark_failed_scans_as_clean, file_size=len(content), scan_time_ms=int((time.monotonic() - start) * 1000), - threat_name=None, ) @@ -172,6 +174,8 @@ def get_virus_scanner() -> VirusScannerService: clamav_service_host=settings.config.clamav_service_host, clamav_service_port=settings.config.clamav_service_port, clamav_service_enabled=settings.config.clamav_service_enabled, + max_concurrency=settings.config.clamav_max_concurrency, + mark_failed_scans_as_clean=settings.config.clamav_mark_failed_scans_as_clean, ) _scanner = VirusScannerService(_settings) return _scanner diff --git a/autogpt_platform/backend/backend/util/virus_scanner_test.py b/autogpt_platform/backend/backend/util/virus_scanner_test.py index 561d60e569..81b5ad3342 100644 --- a/autogpt_platform/backend/backend/util/virus_scanner_test.py +++ b/autogpt_platform/backend/backend/util/virus_scanner_test.py @@ -1,5 +1,4 @@ import asyncio -import time from unittest.mock import AsyncMock, Mock, patch import pytest @@ -22,6 +21,7 @@ class TestVirusScannerService: clamav_service_port=3310, clamav_service_enabled=True, max_scan_size=10 * 1024 * 1024, # 10MB for testing + mark_failed_scans_as_clean=False, # For testing, failed scans should be clean ) @pytest.fixture @@ -54,25 +54,51 @@ class TestVirusScannerService: # Create content larger than max_scan_size large_content = b"x" * (scanner.settings.max_scan_size + 1) - # Large files are allowed but marked as clean with a warning + # Large files behavior depends on mark_failed_scans_as_clean setting result = await scanner.scan_file(large_content, filename="large_file.txt") - assert result.is_clean is True + assert result.is_clean == scanner.settings.mark_failed_scans_as_clean assert result.file_size == len(large_content) assert result.scan_time_ms == 0 + @pytest.mark.asyncio + async def test_scan_file_too_large_both_configurations(self): + """Test large file handling with both mark_failed_scans_as_clean configurations""" + large_content = b"x" * (10 * 1024 * 1024 + 1) # Larger than 10MB + + # Test with mark_failed_scans_as_clean=True + settings_clean = VirusScannerSettings( + max_scan_size=10 * 1024 * 1024, mark_failed_scans_as_clean=True + ) + scanner_clean = VirusScannerService(settings_clean) + result_clean = await scanner_clean.scan_file( + large_content, filename="large_file.txt" + ) + assert result_clean.is_clean is True + + # Test with mark_failed_scans_as_clean=False + settings_dirty = VirusScannerSettings( + max_scan_size=10 * 1024 * 1024, mark_failed_scans_as_clean=False + ) + scanner_dirty = VirusScannerService(settings_dirty) + result_dirty = await scanner_dirty.scan_file( + large_content, filename="large_file.txt" + ) + assert result_dirty.is_clean is False + # Note: ping method was removed from current implementation @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_clean_file(self, mock_clamav_class, scanner): - def mock_scan_stream(_): - time.sleep(0.001) # Small delay to ensure timing > 0 + async def test_scan_clean_file(self, scanner): + async def mock_instream(_): + await asyncio.sleep(0.001) # Small delay to ensure timing > 0 return None # No virus detected mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream = mock_scan_stream - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=mock_instream) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"clean file content" result = await scanner.scan_file(content, filename="clean.txt") @@ -83,16 +109,17 @@ class TestVirusScannerService: assert result.scan_time_ms > 0 @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_infected_file(self, mock_clamav_class, scanner): - def mock_scan_stream(_): - time.sleep(0.001) # Small delay to ensure timing > 0 + async def test_scan_infected_file(self, scanner): + async def mock_instream(_): + await asyncio.sleep(0.001) # Small delay to ensure timing > 0 return {"stream": ("FOUND", "Win.Test.EICAR_HDB-1")} mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream = mock_scan_stream - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=mock_instream) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"infected file content" result = await scanner.scan_file(content, filename="infected.txt") @@ -103,11 +130,12 @@ class TestVirusScannerService: assert result.scan_time_ms > 0 @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_clamav_unavailable_fail_safe(self, mock_clamav_class, scanner): + async def test_scan_clamav_unavailable_fail_safe(self, scanner): mock_client = Mock() - mock_client.ping.return_value = False - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=False) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"test content" @@ -115,12 +143,13 @@ class TestVirusScannerService: await scanner.scan_file(content, filename="test.txt") @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_scan_error_fail_safe(self, mock_clamav_class, scanner): + async def test_scan_error_fail_safe(self, scanner): mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream.side_effect = Exception("Scanning error") - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=Exception("Scanning error")) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content = b"test content" @@ -150,16 +179,17 @@ class TestVirusScannerService: assert result.file_size == 1024 @pytest.mark.asyncio - @patch("pyclamd.ClamdNetworkSocket") - async def test_concurrent_scans(self, mock_clamav_class, scanner): - def mock_scan_stream(_): - time.sleep(0.001) # Small delay to ensure timing > 0 + async def test_concurrent_scans(self, scanner): + async def mock_instream(_): + await asyncio.sleep(0.001) # Small delay to ensure timing > 0 return None mock_client = Mock() - mock_client.ping.return_value = True - mock_client.scan_stream = mock_scan_stream - mock_clamav_class.return_value = mock_client + mock_client.ping = AsyncMock(return_value=True) + mock_client.instream = AsyncMock(side_effect=mock_instream) + + # Replace the client instance that was created in the constructor + scanner._client = mock_client content1 = b"file1 content" content2 = b"file2 content" diff --git a/autogpt_platform/backend/poetry.lock b/autogpt_platform/backend/poetry.lock index 078dd4c526..b3312f7c39 100644 --- a/autogpt_platform/backend/poetry.lock +++ b/autogpt_platform/backend/poetry.lock @@ -17,6 +17,18 @@ aiormq = ">=6.8,<6.9" exceptiongroup = ">=1,<2" yarl = "*" +[[package]] +name = "aioclamd" +version = "1.0.0" +description = "Asynchronous client for virus scanning with ClamAV" +optional = false +python-versions = ">=3.7,<4.0" +groups = ["main"] +files = [ + {file = "aioclamd-1.0.0-py3-none-any.whl", hash = "sha256:4727da3953a4b38be4c2de1acb6b3bb3c94c1c171dcac780b80234ee6253f3d9"}, + {file = "aioclamd-1.0.0.tar.gz", hash = "sha256:7b14e94e3a2285cc89e2f4d434e2a01f322d3cb95476ce2dda015a7980876047"}, +] + [[package]] name = "aiodns" version = "3.4.0" @@ -3856,17 +3868,6 @@ cffi = ">=1.5.0" [package.extras] idna = ["idna (>=2.1)"] -[[package]] -name = "pyclamd" -version = "0.4.0" -description = "pyClamd is a python interface to Clamd (Clamav daemon)." -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "pyClamd-0.4.0.tar.gz", hash = "sha256:ddd588577e5db123760b6ddaac46b5c4b1d9044a00b5d9422de59f83a55c20fe"}, -] - [[package]] name = "pycodestyle" version = "2.13.0" @@ -5017,6 +5018,27 @@ statsig = ["statsig (>=0.55.3)"] tornado = ["tornado (>=6)"] unleash = ["UnleashClient (>=6.0.1)"] +[[package]] +name = "setuptools" +version = "80.9.0" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922"}, + {file = "setuptools-80.9.0.tar.gz", hash = "sha256:f36b47402ecde768dbfafc46e8e4207b4360c654f1f3bb84475f0a28628fb19c"}, +] + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""] +core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"] + [[package]] name = "sgmllib3k" version = "1.0.0" @@ -6380,4 +6402,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "bd117a21d817a2a735ed923c383713dd08469938ef5f7d07c4222da1acca2b5c" +content-hash = "b5c1201f27ee8d05d5d8c89702123df4293f124301d1aef7451591a351872260" diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml index 5980bf3b6b..30429152cb 100644 --- a/autogpt_platform/backend/pyproject.toml +++ b/autogpt_platform/backend/pyproject.toml @@ -68,8 +68,9 @@ zerobouncesdk = "^1.1.1" # NOTE: please insert new dependencies in their alphabetical location pytest-snapshot = "^0.9.0" aiofiles = "^24.1.0" -pyclamd = "^0.4.0" tiktoken = "^0.9.0" +aioclamd = "^1.0.0" +setuptools = "^80.9.0" [tool.poetry.group.dev.dependencies] aiohappyeyeballs = "^2.6.1" diff --git a/autogpt_platform/docker-compose.platform.yml b/autogpt_platform/docker-compose.platform.yml index cc5e8c3d6a..ca9a483b40 100644 --- a/autogpt_platform/docker-compose.platform.yml +++ b/autogpt_platform/docker-compose.platform.yml @@ -93,7 +93,7 @@ services: - SCHEDULER_HOST=scheduler_server - EXECUTIONMANAGER_HOST=executor - NOTIFICATIONMANAGER_HOST=rest_server - - FRONTEND_BASE_URL=http://localhost:3000 + - NEXT_PUBLIC_FRONTEND_BASE_URL=http://localhost:3000 - BACKEND_CORS_ALLOW_ORIGINS=["http://localhost:3000"] - ENCRYPTION_KEY=dvziYgz0KSK8FENhju0ZYi8-fRTfAdlz6YLhdB_jhNw= # DO NOT USE IN PRODUCTION!! - UNSUBSCRIBE_SECRET_KEY=HlP8ivStJjmbf6NKi78m_3FnOogut0t5ckzjsIqeaio= # DO NOT USE IN PRODUCTION!! diff --git a/autogpt_platform/frontend/.env.example b/autogpt_platform/frontend/.env.example index 0660979013..287efe5f22 100644 --- a/autogpt_platform/frontend/.env.example +++ b/autogpt_platform/frontend/.env.example @@ -1,4 +1,4 @@ -FRONTEND_BASE_URL=http://localhost:3000 +NEXT_PUBLIC_FRONTEND_BASE_URL=http://localhost:3000 NEXT_PUBLIC_AUTH_CALLBACK_URL=http://localhost:8006/auth/callback NEXT_PUBLIC_AGPT_SERVER_URL=http://localhost:8006/api @@ -23,7 +23,7 @@ NEXT_PUBLIC_SUPABASE_ANON_KEY=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAic ## OAuth Callback URL ## This should be {domain}/auth/callback ## Only used if you're using Supabase and OAuth -AUTH_CALLBACK_URL="${FRONTEND_BASE_URL}/auth/callback" +AUTH_CALLBACK_URL="${NEXT_PUBLIC_FRONTEND_BASE_URL}/auth/callback" GA_MEASUREMENT_ID=G-FH2XK2W4GN # When running locally, set NEXT_PUBLIC_BEHAVE_AS=CLOUD to use the a locally hosted marketplace (as is typical in development, and the cloud deployment), otherwise set it to LOCAL to have the marketplace open in a new tab diff --git a/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts b/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts index 0ee4e4e7cc..327b4f86de 100644 --- a/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts +++ b/autogpt_platform/frontend/src/app/(platform)/reset-password/actions.ts @@ -10,7 +10,8 @@ export async function sendResetEmail(email: string, turnstileToken: string) { {}, async () => { const supabase = await getServerSupabase(); - const origin = process.env.FRONTEND_BASE_URL || "http://localhost:3000"; + const origin = + process.env.NEXT_PUBLIC_FRONTEND_BASE_URL || "http://localhost:3000"; if (!supabase) { redirect("/error"); diff --git a/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts b/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts index f0fdbc308c..53b899dfd5 100644 --- a/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts +++ b/autogpt_platform/frontend/src/app/api/mutators/custom-mutator.ts @@ -1,4 +1,4 @@ -const BASE_URL = "/api/proxy"; // Sending request via nextjs Server +const BASE_URL = `${process.env.NEXT_PUBLIC_FRONTEND_BASE_URL}/api/proxy`; // Sending request via nextjs Server const getBody = (c: Response | Request): Promise => { const contentType = c.headers.get("content-type"); diff --git a/autogpt_platform/frontend/src/components/node-input-components.tsx b/autogpt_platform/frontend/src/components/node-input-components.tsx index b1dabfb99c..5a8c197434 100644 --- a/autogpt_platform/frontend/src/components/node-input-components.tsx +++ b/autogpt_platform/frontend/src/components/node-input-components.tsx @@ -1219,7 +1219,9 @@ const NodeBooleanInput: FC<{ className?: string; displayName: string; }> = ({ selfKey, schema, value, error, handleInputChange, className }) => { - value ||= schema.default ?? false; + if (value == null) { + value = schema.default ?? false; + } return (