diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index 3a2fc38ac9..6b2aa5c60c 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -33,7 +33,7 @@ from prisma.types import ( AgentNodeExecutionUpdateInput, AgentNodeExecutionWhereInput, ) -from pydantic import BaseModel, ConfigDict, JsonValue +from pydantic import BaseModel, ConfigDict, JsonValue, ValidationError from pydantic.fields import Field from backend.server.v2.store.exceptions import DatabaseError @@ -59,7 +59,7 @@ from .includes import ( GRAPH_EXECUTION_INCLUDE_WITH_NODES, graph_execution_include, ) -from .model import GraphExecutionStats +from .model import GraphExecutionStats, NodeExecutionStats T = TypeVar("T") @@ -318,18 +318,30 @@ class NodeExecutionResult(BaseModel): @staticmethod def from_db(_node_exec: AgentNodeExecution, user_id: Optional[str] = None): - if _node_exec.executionData: - # Execution that has been queued for execution will persist its data. + try: + stats = NodeExecutionStats.model_validate(_node_exec.stats or {}) + except (ValueError, ValidationError): + stats = NodeExecutionStats() + + if stats.cleared_inputs: + input_data: BlockInput = defaultdict() + for name, messages in stats.cleared_inputs.items(): + input_data[name] = messages[-1] if messages else "" + elif _node_exec.executionData: input_data = type_utils.convert(_node_exec.executionData, dict[str, Any]) else: - # For incomplete execution, executionData will not be yet available. input_data: BlockInput = defaultdict() for data in _node_exec.Input or []: input_data[data.name] = type_utils.convert(data.data, type[Any]) output_data: CompletedBlockOutput = defaultdict(list) - for data in _node_exec.Output or []: - output_data[data.name].append(type_utils.convert(data.data, type[Any])) + + if stats.cleared_outputs: + for name, messages in stats.cleared_outputs.items(): + output_data[name].extend(messages) + else: + for data in _node_exec.Output or []: + output_data[data.name].append(type_utils.convert(data.data, type[Any])) graph_execution: AgentGraphExecution | None = _node_exec.GraphExecution if graph_execution: diff --git a/autogpt_platform/backend/backend/data/model.py b/autogpt_platform/backend/backend/data/model.py index f7a81735a4..66854dc73e 100644 --- a/autogpt_platform/backend/backend/data/model.py +++ b/autogpt_platform/backend/backend/data/model.py @@ -655,6 +655,9 @@ class NodeExecutionStats(BaseModel): output_token_count: int = 0 extra_cost: int = 0 extra_steps: int = 0 + # Moderation fields + cleared_inputs: Optional[dict[str, list[str]]] = None + cleared_outputs: Optional[dict[str, list[str]]] = None def __iadd__(self, other: "NodeExecutionStats") -> "NodeExecutionStats": """Mutate this instance by adding another NodeExecutionStats.""" diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index d65056dda9..c5c45c2345 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -27,7 +27,7 @@ from backend.executor.activity_status_generator import ( ) from backend.executor.utils import LogMetadata from backend.notifications.notifications import queue_notification -from backend.util.exceptions import InsufficientBalanceError +from backend.util.exceptions import InsufficientBalanceError, ModerationError if TYPE_CHECKING: from backend.executor import DatabaseManagerClient, DatabaseManagerAsyncClient @@ -67,6 +67,7 @@ from backend.executor.utils import ( validate_exec, ) from backend.integrations.creds_manager import IntegrationCredentialsManager +from backend.server.v2.AutoMod.manager import automod_manager from backend.util import json from backend.util.clients import ( get_async_execution_event_bus, @@ -759,6 +760,22 @@ class ExecutionProcessor: amount=1, ) + # Input moderation + try: + if moderation_error := asyncio.run_coroutine_threadsafe( + automod_manager.moderate_graph_execution_inputs( + db_client=get_db_async_client(), + graph_exec=graph_exec, + ), + self.node_evaluation_loop, + ).result(timeout=30.0): + raise moderation_error + except asyncio.TimeoutError: + log_metadata.warning( + f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution" + ) + # Continue execution without moderation + # ------------------------------------------------------------ # Pre‑populate queue --------------------------------------- # ------------------------------------------------------------ @@ -897,6 +914,25 @@ class ExecutionProcessor: time.sleep(0.1) # loop done -------------------------------------------------- + + # Output moderation + try: + if moderation_error := asyncio.run_coroutine_threadsafe( + automod_manager.moderate_graph_execution_outputs( + db_client=get_db_async_client(), + graph_exec_id=graph_exec.graph_exec_id, + user_id=graph_exec.user_id, + graph_id=graph_exec.graph_id, + ), + self.node_evaluation_loop, + ).result(timeout=30.0): + raise moderation_error + except asyncio.TimeoutError: + log_metadata.warning( + f"Output moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution" + ) + # Continue execution without moderation + # Determine final execution status based on whether there was an error or termination if cancel.is_set(): execution_status = ExecutionStatus.TERMINATED @@ -917,11 +953,12 @@ class ExecutionProcessor: else Exception(f"{e.__class__.__name__}: {e}") ) - known_errors = (InsufficientBalanceError,) + known_errors = (InsufficientBalanceError, ModerationError) if isinstance(error, known_errors): execution_stats.error = str(error) return ExecutionStatus.FAILED + execution_status = ExecutionStatus.FAILED log_metadata.exception( f"Failed graph execution {graph_exec.graph_exec_id}: {error}" ) diff --git a/autogpt_platform/backend/backend/server/v2/AutoMod/__init__.py b/autogpt_platform/backend/backend/server/v2/AutoMod/__init__.py new file mode 100644 index 0000000000..c721d22efa --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/AutoMod/__init__.py @@ -0,0 +1 @@ +# AutoMod integration for content moderation diff --git a/autogpt_platform/backend/backend/server/v2/AutoMod/manager.py b/autogpt_platform/backend/backend/server/v2/AutoMod/manager.py new file mode 100644 index 0000000000..fa84d041cd --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/AutoMod/manager.py @@ -0,0 +1,353 @@ +import asyncio +import json +import logging +from typing import TYPE_CHECKING, Any, Literal + +if TYPE_CHECKING: + from backend.executor import DatabaseManagerAsyncClient + +from autogpt_libs.feature_flag.client import is_feature_enabled +from pydantic import ValidationError + +from backend.data.execution import ExecutionStatus +from backend.server.v2.AutoMod.models import ( + AutoModRequest, + AutoModResponse, + ModerationConfig, +) +from backend.util.exceptions import ModerationError +from backend.util.request import Requests +from backend.util.settings import Settings + +logger = logging.getLogger(__name__) + + +class AutoModManager: + + def __init__(self): + self.config = self._load_config() + + def _load_config(self) -> ModerationConfig: + """Load AutoMod configuration from settings""" + settings = Settings() + return ModerationConfig( + enabled=settings.config.automod_enabled, + api_url=settings.config.automod_api_url, + api_key=settings.secrets.automod_api_key, + timeout=settings.config.automod_timeout, + retry_attempts=settings.config.automod_retry_attempts, + retry_delay=settings.config.automod_retry_delay, + fail_open=settings.config.automod_fail_open, + ) + + async def moderate_graph_execution_inputs( + self, db_client: "DatabaseManagerAsyncClient", graph_exec, timeout: int = 10 + ) -> Exception | None: + """ + Complete input moderation flow for graph execution + Returns: error_if_failed (None means success) + """ + if not self.config.enabled: + return None + + # Check if AutoMod feature is enabled for this user + if not is_feature_enabled("AutoMod", graph_exec.user_id, default=False): + logger.debug(f"AutoMod feature not enabled for user {graph_exec.user_id}") + return None + + # Get graph model and collect all inputs + graph_model = await db_client.get_graph( + graph_exec.graph_id, + user_id=graph_exec.user_id, + version=graph_exec.graph_version, + ) + + if not graph_model or not graph_model.nodes: + return None + + all_inputs = [] + for node in graph_model.nodes: + if node.input_default: + all_inputs.extend(str(v) for v in node.input_default.values() if v) + if (masks := graph_exec.nodes_input_masks) and (mask := masks.get(node.id)): + all_inputs.extend(str(v) for v in mask.values() if v) + + if not all_inputs: + return None + + # Combine all content and moderate directly + content = " ".join(all_inputs) + + # Run moderation + logger.warning( + f"Moderating inputs for graph execution {graph_exec.graph_exec_id}" + ) + try: + moderation_passed = await self._moderate_content( + content, + { + "user_id": graph_exec.user_id, + "graph_id": graph_exec.graph_id, + "graph_exec_id": graph_exec.graph_exec_id, + "moderation_type": "execution_input", + }, + ) + + if not moderation_passed: + logger.warning( + f"Moderation failed for graph execution {graph_exec.graph_exec_id}" + ) + # Update node statuses for frontend display before raising error + await self._update_failed_nodes_for_moderation( + db_client, graph_exec.graph_exec_id, "input" + ) + + return ModerationError( + message="Execution failed due to input content moderation", + user_id=graph_exec.user_id, + graph_exec_id=graph_exec.graph_exec_id, + moderation_type="input", + ) + + return None + + except asyncio.TimeoutError: + logger.warning( + f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation" + ) + return None # Bypass moderation on timeout + except Exception as e: + logger.warning(f"Input moderation execution failed: {e}") + return ModerationError( + message="Execution failed due to input content moderation error", + user_id=graph_exec.user_id, + graph_exec_id=graph_exec.graph_exec_id, + moderation_type="input", + ) + + async def moderate_graph_execution_outputs( + self, + db_client: "DatabaseManagerAsyncClient", + graph_exec_id: str, + user_id: str, + graph_id: str, + timeout: int = 10, + ) -> Exception | None: + """ + Complete output moderation flow for graph execution + Returns: error_if_failed (None means success) + """ + if not self.config.enabled: + return None + + # Check if AutoMod feature is enabled for this user + if not is_feature_enabled("AutoMod", user_id, default=False): + logger.debug(f"AutoMod feature not enabled for user {user_id}") + return None + + # Get completed executions and collect outputs + completed_executions = await db_client.get_node_executions( + graph_exec_id, statuses=[ExecutionStatus.COMPLETED], include_exec_data=True + ) + + if not completed_executions: + return None + + all_outputs = [] + for exec_entry in completed_executions: + if exec_entry.output_data: + all_outputs.extend(str(v) for v in exec_entry.output_data.values() if v) + + if not all_outputs: + return None + + # Combine all content and moderate directly + content = " ".join(all_outputs) + + # Run moderation + logger.warning(f"Moderating outputs for graph execution {graph_exec_id}") + try: + moderation_passed = await self._moderate_content( + content, + { + "user_id": user_id, + "graph_id": graph_id, + "graph_exec_id": graph_exec_id, + "moderation_type": "execution_output", + }, + ) + + if not moderation_passed: + logger.warning(f"Moderation failed for graph execution {graph_exec_id}") + # Update node statuses for frontend display before raising error + await self._update_failed_nodes_for_moderation( + db_client, graph_exec_id, "output" + ) + + return ModerationError( + message="Execution failed due to output content moderation", + user_id=user_id, + graph_exec_id=graph_exec_id, + moderation_type="output", + ) + + return None + + except asyncio.TimeoutError: + logger.warning( + f"Output moderation timed out for graph execution {graph_exec_id}, bypassing moderation" + ) + return None # Bypass moderation on timeout + except Exception as e: + logger.warning(f"Output moderation execution failed: {e}") + return ModerationError( + message="Execution failed due to output content moderation error", + user_id=user_id, + graph_exec_id=graph_exec_id, + moderation_type="output", + ) + + async def _update_failed_nodes_for_moderation( + self, + db_client: "DatabaseManagerAsyncClient", + graph_exec_id: str, + moderation_type: Literal["input", "output"], + ): + """Update node execution statuses for frontend display when moderation fails""" + # Import here to avoid circular imports + from backend.executor.manager import send_async_execution_update + + if moderation_type == "input": + # For input moderation, mark queued/running/incomplete nodes as failed + target_statuses = [ + ExecutionStatus.QUEUED, + ExecutionStatus.RUNNING, + ExecutionStatus.INCOMPLETE, + ] + else: + # For output moderation, mark completed nodes as failed + target_statuses = [ExecutionStatus.COMPLETED] + + # Get the executions that need to be updated + executions_to_update = await db_client.get_node_executions( + graph_exec_id, statuses=target_statuses, include_exec_data=True + ) + + if not executions_to_update: + return + + # Prepare database update tasks + exec_updates = [] + for exec_entry in executions_to_update: + # Collect all input and output names to clear + cleared_inputs = {} + cleared_outputs = {} + + if exec_entry.input_data: + for name in exec_entry.input_data.keys(): + cleared_inputs[name] = ["Failed due to content moderation"] + + if exec_entry.output_data: + for name in exec_entry.output_data.keys(): + cleared_outputs[name] = ["Failed due to content moderation"] + + # Add update task to list + exec_updates.append( + db_client.update_node_execution_status( + exec_entry.node_exec_id, + status=ExecutionStatus.FAILED, + stats={ + "error": "Failed due to content moderation", + "cleared_inputs": cleared_inputs, + "cleared_outputs": cleared_outputs, + }, + ) + ) + + # Execute all database updates in parallel + updated_execs = await asyncio.gather(*exec_updates) + + # Send all websocket updates in parallel + await asyncio.gather( + *[ + send_async_execution_update(updated_exec) + for updated_exec in updated_execs + ] + ) + + async def _moderate_content(self, content: str, metadata: dict[str, Any]) -> bool: + """Moderate content using AutoMod API + + Returns: + True: Content approved or timeout occurred + False: Content rejected by moderation + + Raises: + asyncio.TimeoutError: When moderation times out (should be bypassed) + """ + try: + request_data = AutoModRequest( + type="text", + content=content, + metadata=metadata, + ) + + response = await self._make_request(request_data) + + if response.success and response.status == "approved": + logger.debug( + f"Content approved for {metadata.get('graph_exec_id', 'unknown')}" + ) + return True + else: + reasons = [r.reason for r in response.moderation_results if r.reason] + error_msg = f"Content rejected by AutoMod: {'; '.join(reasons)}" + logger.warning(f"Content rejected: {error_msg}") + return False + + except asyncio.TimeoutError: + # Re-raise timeout to be handled by calling methods + logger.warning( + f"AutoMod API timeout for {metadata.get('graph_exec_id', 'unknown')}" + ) + raise + except Exception as e: + logger.error(f"AutoMod moderation error: {e}") + return self.config.fail_open + + async def _make_request(self, request_data: AutoModRequest) -> AutoModResponse: + """Make HTTP request to AutoMod API using the standard request utility""" + url = f"{self.config.api_url}/moderate" + headers = { + "Content-Type": "application/json", + "X-API-Key": self.config.api_key, + } + + # Create requests instance with timeout and retry configuration + requests = Requests( + extra_headers=headers, + retry_max_wait=float(self.config.timeout), + ) + + try: + response = await requests.post( + url, json=request_data.model_dump(), timeout=self.config.timeout + ) + + response_data = response.json() + return AutoModResponse.model_validate(response_data) + + except asyncio.TimeoutError: + # Re-raise timeout error to be caught by _moderate_content + raise + except (json.JSONDecodeError, ValidationError) as e: + raise Exception(f"Invalid response from AutoMod API: {e}") + except Exception as e: + # Check if this is an aiohttp timeout that we should convert + if "timeout" in str(e).lower(): + raise asyncio.TimeoutError(f"AutoMod API request timed out: {e}") + raise Exception(f"AutoMod API request failed: {e}") + + +# Global instance +automod_manager = AutoModManager() diff --git a/autogpt_platform/backend/backend/server/v2/AutoMod/models.py b/autogpt_platform/backend/backend/server/v2/AutoMod/models.py new file mode 100644 index 0000000000..4ccb2c1bd9 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/AutoMod/models.py @@ -0,0 +1,57 @@ +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +class AutoModRequest(BaseModel): + """Request model for AutoMod API""" + + type: str = Field(..., description="Content type - 'text', 'image', 'video'") + content: str = Field(..., description="The content to moderate") + metadata: Optional[Dict[str, Any]] = Field( + default=None, description="Additional context about the content" + ) + + +class ModerationResult(BaseModel): + """Individual moderation result""" + + decision: str = Field( + ..., description="Moderation decision: 'approved', 'rejected', 'flagged'" + ) + reason: Optional[str] = Field(default=None, description="Reason for the decision") + + +class AutoModResponse(BaseModel): + """Response model for AutoMod API""" + + success: bool = Field(..., description="Whether the request was successful") + status: str = Field( + ..., description="Overall status: 'approved', 'rejected', 'flagged', 'pending'" + ) + moderation_results: List[ModerationResult] = Field( + default_factory=list, description="List of moderation results" + ) + + +class ModerationConfig(BaseModel): + """Configuration for AutoMod integration""" + + enabled: bool = Field(default=True, description="Whether moderation is enabled") + api_url: str = Field(default="", description="AutoMod API base URL") + api_key: str = Field(..., description="AutoMod API key") + timeout: int = Field(default=30, description="Request timeout in seconds") + retry_attempts: int = Field(default=3, description="Number of retry attempts") + retry_delay: float = Field( + default=1.0, description="Delay between retries in seconds" + ) + fail_open: bool = Field( + default=False, + description="If True, allow execution to continue if moderation fails", + ) + moderate_inputs: bool = Field( + default=True, description="Whether to moderate block inputs" + ) + moderate_outputs: bool = Field( + default=True, description="Whether to moderate block outputs" + ) diff --git a/autogpt_platform/backend/backend/util/exceptions.py b/autogpt_platform/backend/backend/util/exceptions.py index 03766eebe3..b0ffbfe428 100644 --- a/autogpt_platform/backend/backend/util/exceptions.py +++ b/autogpt_platform/backend/backend/util/exceptions.py @@ -33,6 +33,33 @@ class InsufficientBalanceError(ValueError): return self.message +class ModerationError(ValueError): + """Content moderation failure during execution""" + + user_id: str + message: str + graph_exec_id: str + moderation_type: str + + def __init__( + self, + message: str, + user_id: str, + graph_exec_id: str, + moderation_type: str = "content", + ): + super().__init__(message) + self.args = (message, user_id, graph_exec_id, moderation_type) + self.message = message + self.user_id = user_id + self.graph_exec_id = graph_exec_id + self.moderation_type = moderation_type + + def __str__(self): + """Used to display the error message in the frontend, because we str() the error when sending the execution update""" + return self.message + + class GraphValidationError(ValueError): """Structured validation error for graph validation failures""" diff --git a/autogpt_platform/backend/backend/util/settings.py b/autogpt_platform/backend/backend/util/settings.py index c17f8fba42..f5b872c352 100644 --- a/autogpt_platform/backend/backend/util/settings.py +++ b/autogpt_platform/backend/backend/util/settings.py @@ -295,6 +295,32 @@ class Config(UpdateTrackingModel["Config"], BaseSettings): description="Maximum file size in MB for file uploads (1-1024 MB)", ) + # AutoMod configuration + automod_enabled: bool = Field( + default=False, + description="Whether AutoMod content moderation is enabled", + ) + automod_api_url: str = Field( + default="", + description="AutoMod API base URL - Make sure it ends in /api", + ) + automod_timeout: int = Field( + default=30, + description="Timeout in seconds for AutoMod API requests", + ) + automod_retry_attempts: int = Field( + default=3, + description="Number of retry attempts for AutoMod API requests", + ) + automod_retry_delay: float = Field( + default=1.0, + description="Delay between retries for AutoMod API requests in seconds", + ) + automod_fail_open: bool = Field( + default=False, + description="If True, allow execution to continue if AutoMod fails", + ) + @field_validator("platform_base_url", "frontend_base_url") @classmethod def validate_platform_base_url(cls, v: str, info: ValidationInfo) -> str: @@ -495,6 +521,10 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings): apollo_api_key: str = Field(default="", description="Apollo API Key") smartlead_api_key: str = Field(default="", description="SmartLead API Key") zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key") + + # AutoMod API credentials + automod_api_key: str = Field(default="", description="AutoMod API key") + ayrshare_api_key: str = Field(default="", description="Ayrshare API Key") ayrshare_jwt_key: str = Field(default="", description="Ayrshare private Key") # Add more secret fields as needed diff --git a/autogpt_platform/backend/poetry.lock b/autogpt_platform/backend/poetry.lock index 64f43a20db..7cb4904930 100644 --- a/autogpt_platform/backend/poetry.lock +++ b/autogpt_platform/backend/poetry.lock @@ -6737,4 +6737,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "05e2b99bd6dc5a74a89df0e1e504853b66bd519062159b0de5fbedf6a1f4d986" +content-hash = "795414d7ce8f288ea6c65893268b5c29a7c9a60ad75cde28ac7bcdb65f426dfe" diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml index c63e13bc29..3ff03a3d69 100644 --- a/autogpt_platform/backend/pyproject.toml +++ b/autogpt_platform/backend/pyproject.toml @@ -10,6 +10,7 @@ packages = [{ include = "backend", format = "sdist" }] [tool.poetry.dependencies] python = ">=3.10,<3.13" aio-pika = "^9.5.5" +aiohttp = "^3.10.0" aiodns = "^3.5.0" anthropic = "^0.59.0" apscheduler = "^3.11.0"