feat(backend/AM): Integrate AutoMod content moderation (#10539)

Copy of [feat(backend/AM): Integrate AutoMod content moderation - By
Bentlybro - PR
#10490](https://github.com/Significant-Gravitas/AutoGPT/pull/10490), because
I messed up the original PR 🤦

Adds AutoMod input and output moderation to the execution flow.
Introduces a new AutoMod manager and models, updates settings for
moderation configuration, and modifies execution result handling to
support moderation-cleared data. Moderation failures now clear sensitive
data and mark executions as failed.

<img width="921" height="816" alt="image"
src="https://github.com/user-attachments/assets/65c0fee8-d652-42bc-9553-ff507bc067c5"
/>


### Changes 🏗️

I have made some small changes to
``autogpt_platform\backend\backend\executor\manager.py`` to send the
needed info to the AutoMod system, which collects the data, combines it, and
makes the API call to AM; based on its reply, it lets the execution run or not.

I also had to make small changes to
``autogpt_platform\backend\backend\data\execution.py`` to add checks
that allow me to clear the content from the blocks if it was flagged

I am working on finalizing the AM repo then that will be public

To note: we will want to set this up behind a LaunchDarkly flag first for
testing with the team before we roll it out any further.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
- [x] Set up and run the platform with ``automod_enabled`` set to False
and verify it works normally
- [x] Set up and run the platform with ``automod_enabled`` set to True,
set the AM URL and API key, and verify it runs safe blocks normally
- [x] Test AM with content that would trigger it to flag and watch it
stop and clear all the blocks outputs

Message @Bentlybro for the URL and an API key to AM for local testing!

## Changes made to settings.py

I have added a few new options to settings.py for the AutoMod configuration:

```
    # AutoMod configuration
    automod_enabled: bool = Field(
        default=False,
        description="Whether AutoMod content moderation is enabled",
    )
    automod_api_url: str = Field(
        default="",
        description="AutoMod API base URL - Make sure it ends in /api",
    )
    automod_timeout: int = Field(
        default=30,
        description="Timeout in seconds for AutoMod API requests",
    )
    automod_retry_attempts: int = Field(
        default=3,
        description="Number of retry attempts for AutoMod API requests",
    )
    automod_retry_delay: float = Field(
        default=1.0,
        description="Delay between retries for AutoMod API requests in seconds",
    )
    automod_fail_open: bool = Field(
        default=False,
        description="If True, allow execution to continue if AutoMod fails",
    )
    automod_moderate_inputs: bool = Field(
        default=True,
        description="Whether to moderate block inputs",
    )
    automod_moderate_outputs: bool = Field(
        default=True,
        description="Whether to moderate block outputs",
    )
```
and
```
automod_api_key: str = Field(default="", description="AutoMod API key")
```

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
Bently
2025-08-11 10:39:28 +01:00
committed by GitHub
parent d4b5508ed1
commit 28d85ad61c
10 changed files with 531 additions and 10 deletions

View File

@@ -33,7 +33,7 @@ from prisma.types import (
AgentNodeExecutionUpdateInput,
AgentNodeExecutionWhereInput,
)
from pydantic import BaseModel, ConfigDict, JsonValue
from pydantic import BaseModel, ConfigDict, JsonValue, ValidationError
from pydantic.fields import Field
from backend.server.v2.store.exceptions import DatabaseError
@@ -59,7 +59,7 @@ from .includes import (
GRAPH_EXECUTION_INCLUDE_WITH_NODES,
graph_execution_include,
)
from .model import GraphExecutionStats
from .model import GraphExecutionStats, NodeExecutionStats
T = TypeVar("T")
@@ -318,18 +318,30 @@ class NodeExecutionResult(BaseModel):
@staticmethod
def from_db(_node_exec: AgentNodeExecution, user_id: Optional[str] = None):
if _node_exec.executionData:
# Execution that has been queued for execution will persist its data.
try:
stats = NodeExecutionStats.model_validate(_node_exec.stats or {})
except (ValueError, ValidationError):
stats = NodeExecutionStats()
if stats.cleared_inputs:
input_data: BlockInput = defaultdict()
for name, messages in stats.cleared_inputs.items():
input_data[name] = messages[-1] if messages else ""
elif _node_exec.executionData:
input_data = type_utils.convert(_node_exec.executionData, dict[str, Any])
else:
# For incomplete execution, executionData will not be yet available.
input_data: BlockInput = defaultdict()
for data in _node_exec.Input or []:
input_data[data.name] = type_utils.convert(data.data, type[Any])
output_data: CompletedBlockOutput = defaultdict(list)
for data in _node_exec.Output or []:
output_data[data.name].append(type_utils.convert(data.data, type[Any]))
if stats.cleared_outputs:
for name, messages in stats.cleared_outputs.items():
output_data[name].extend(messages)
else:
for data in _node_exec.Output or []:
output_data[data.name].append(type_utils.convert(data.data, type[Any]))
graph_execution: AgentGraphExecution | None = _node_exec.GraphExecution
if graph_execution:

View File

@@ -655,6 +655,9 @@ class NodeExecutionStats(BaseModel):
output_token_count: int = 0
extra_cost: int = 0
extra_steps: int = 0
# Moderation fields
cleared_inputs: Optional[dict[str, list[str]]] = None
cleared_outputs: Optional[dict[str, list[str]]] = None
def __iadd__(self, other: "NodeExecutionStats") -> "NodeExecutionStats":
"""Mutate this instance by adding another NodeExecutionStats."""

View File

@@ -27,7 +27,7 @@ from backend.executor.activity_status_generator import (
)
from backend.executor.utils import LogMetadata
from backend.notifications.notifications import queue_notification
from backend.util.exceptions import InsufficientBalanceError
from backend.util.exceptions import InsufficientBalanceError, ModerationError
if TYPE_CHECKING:
from backend.executor import DatabaseManagerClient, DatabaseManagerAsyncClient
@@ -67,6 +67,7 @@ from backend.executor.utils import (
validate_exec,
)
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.AutoMod.manager import automod_manager
from backend.util import json
from backend.util.clients import (
get_async_execution_event_bus,
@@ -759,6 +760,22 @@ class ExecutionProcessor:
amount=1,
)
# Input moderation
try:
if moderation_error := asyncio.run_coroutine_threadsafe(
automod_manager.moderate_graph_execution_inputs(
db_client=get_db_async_client(),
graph_exec=graph_exec,
),
self.node_evaluation_loop,
).result(timeout=30.0):
raise moderation_error
except asyncio.TimeoutError:
log_metadata.warning(
f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution"
)
# Continue execution without moderation
# ------------------------------------------------------------
# Prepopulate queue ---------------------------------------
# ------------------------------------------------------------
@@ -897,6 +914,25 @@ class ExecutionProcessor:
time.sleep(0.1)
# loop done --------------------------------------------------
# Output moderation
try:
if moderation_error := asyncio.run_coroutine_threadsafe(
automod_manager.moderate_graph_execution_outputs(
db_client=get_db_async_client(),
graph_exec_id=graph_exec.graph_exec_id,
user_id=graph_exec.user_id,
graph_id=graph_exec.graph_id,
),
self.node_evaluation_loop,
).result(timeout=30.0):
raise moderation_error
except asyncio.TimeoutError:
log_metadata.warning(
f"Output moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution"
)
# Continue execution without moderation
# Determine final execution status based on whether there was an error or termination
if cancel.is_set():
execution_status = ExecutionStatus.TERMINATED
@@ -917,11 +953,12 @@ class ExecutionProcessor:
else Exception(f"{e.__class__.__name__}: {e}")
)
known_errors = (InsufficientBalanceError,)
known_errors = (InsufficientBalanceError, ModerationError)
if isinstance(error, known_errors):
execution_stats.error = str(error)
return ExecutionStatus.FAILED
execution_status = ExecutionStatus.FAILED
log_metadata.exception(
f"Failed graph execution {graph_exec.graph_exec_id}: {error}"
)

View File

@@ -0,0 +1 @@
# AutoMod integration for content moderation

View File

@@ -0,0 +1,353 @@
import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING:
from backend.executor import DatabaseManagerAsyncClient
from autogpt_libs.feature_flag.client import is_feature_enabled
from pydantic import ValidationError
from backend.data.execution import ExecutionStatus
from backend.server.v2.AutoMod.models import (
AutoModRequest,
AutoModResponse,
ModerationConfig,
)
from backend.util.exceptions import ModerationError
from backend.util.request import Requests
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
class AutoModManager:
    """Client-side manager for the external AutoMod content-moderation service.

    Gathers the text content of a graph execution's inputs or outputs, sends
    it to the AutoMod HTTP API, and — when content is rejected — marks the
    affected node executions as failed (clearing their data for the frontend)
    and returns a ModerationError for the caller to raise.

    Errors are returned rather than raised so the executor can decide how to
    fail the execution; timeouts deliberately bypass moderation (fail-open on
    timeout), while other API failures honor ``config.fail_open``.
    """

    def __init__(self):
        # Configuration is snapshotted once at construction time from Settings.
        self.config = self._load_config()

    def _load_config(self) -> ModerationConfig:
        """Load AutoMod configuration from settings.

        NOTE(review): ModerationConfig's ``moderate_inputs`` / ``moderate_outputs``
        fields are not populated here and fall back to their defaults — confirm
        whether the ``automod_moderate_inputs``/``automod_moderate_outputs``
        settings were intended to be wired through.
        """
        settings = Settings()
        return ModerationConfig(
            enabled=settings.config.automod_enabled,
            api_url=settings.config.automod_api_url,
            api_key=settings.secrets.automod_api_key,
            timeout=settings.config.automod_timeout,
            retry_attempts=settings.config.automod_retry_attempts,
            retry_delay=settings.config.automod_retry_delay,
            fail_open=settings.config.automod_fail_open,
        )

    async def moderate_graph_execution_inputs(
        self, db_client: "DatabaseManagerAsyncClient", graph_exec, timeout: int = 10
    ) -> Exception | None:
        """
        Complete input moderation flow for graph execution.

        Collects every node's ``input_default`` values plus any per-node input
        masks from ``graph_exec.nodes_input_masks``, joins them into a single
        text payload, and submits it for moderation.

        NOTE(review): the ``timeout`` parameter is never read in this method —
        the caller applies its own timeout via ``Future.result(timeout=...)``;
        confirm whether it should be removed or forwarded to the API call.

        Returns:
            None on success (approved, disabled, nothing to moderate, or
            timeout bypass); a ModerationError when content is rejected or the
            moderation call itself fails.
        """
        if not self.config.enabled:
            return None

        # Check if AutoMod feature is enabled for this user (LaunchDarkly-style
        # per-user flag; defaults to off so rollout is opt-in).
        if not is_feature_enabled("AutoMod", graph_exec.user_id, default=False):
            logger.debug(f"AutoMod feature not enabled for user {graph_exec.user_id}")
            return None

        # Get graph model and collect all inputs
        graph_model = await db_client.get_graph(
            graph_exec.graph_id,
            user_id=graph_exec.user_id,
            version=graph_exec.graph_version,
        )
        if not graph_model or not graph_model.nodes:
            return None

        all_inputs = []
        for node in graph_model.nodes:
            if node.input_default:
                # Falsy values ("", 0, None, ...) are skipped entirely.
                all_inputs.extend(str(v) for v in node.input_default.values() if v)
            if (masks := graph_exec.nodes_input_masks) and (mask := masks.get(node.id)):
                all_inputs.extend(str(v) for v in mask.values() if v)

        if not all_inputs:
            return None

        # Combine all content and moderate directly as one API call.
        content = " ".join(all_inputs)

        # Run moderation
        # NOTE(review): warning level looks intentional for rollout visibility —
        # consider downgrading to info/debug once AutoMod is stable.
        logger.warning(
            f"Moderating inputs for graph execution {graph_exec.graph_exec_id}"
        )
        try:
            moderation_passed = await self._moderate_content(
                content,
                {
                    "user_id": graph_exec.user_id,
                    "graph_id": graph_exec.graph_id,
                    "graph_exec_id": graph_exec.graph_exec_id,
                    "moderation_type": "execution_input",
                },
            )

            if not moderation_passed:
                logger.warning(
                    f"Moderation failed for graph execution {graph_exec.graph_exec_id}"
                )
                # Update node statuses for frontend display before raising error
                await self._update_failed_nodes_for_moderation(
                    db_client, graph_exec.graph_exec_id, "input"
                )
                return ModerationError(
                    message="Execution failed due to input content moderation",
                    user_id=graph_exec.user_id,
                    graph_exec_id=graph_exec.graph_exec_id,
                    moderation_type="input",
                )

            return None
        except asyncio.TimeoutError:
            logger.warning(
                f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation"
            )
            return None  # Bypass moderation on timeout
        except Exception as e:
            # Any other failure in the moderation flow fails the execution
            # (note: this path ignores config.fail_open, which is only applied
            # inside _moderate_content).
            logger.warning(f"Input moderation execution failed: {e}")
            return ModerationError(
                message="Execution failed due to input content moderation error",
                user_id=graph_exec.user_id,
                graph_exec_id=graph_exec.graph_exec_id,
                moderation_type="input",
            )

    async def moderate_graph_execution_outputs(
        self,
        db_client: "DatabaseManagerAsyncClient",
        graph_exec_id: str,
        user_id: str,
        graph_id: str,
        timeout: int = 10,
    ) -> Exception | None:
        """
        Complete output moderation flow for graph execution.

        Collects the output data of every COMPLETED node execution, joins it
        into a single text payload, and submits it for moderation.

        NOTE(review): ``timeout`` is unused here as well — see
        moderate_graph_execution_inputs.

        Returns:
            None on success (approved, disabled, nothing to moderate, or
            timeout bypass); a ModerationError when content is rejected or the
            moderation call itself fails.
        """
        if not self.config.enabled:
            return None

        # Check if AutoMod feature is enabled for this user
        if not is_feature_enabled("AutoMod", user_id, default=False):
            logger.debug(f"AutoMod feature not enabled for user {user_id}")
            return None

        # Get completed executions and collect outputs
        completed_executions = await db_client.get_node_executions(
            graph_exec_id, statuses=[ExecutionStatus.COMPLETED], include_exec_data=True
        )
        if not completed_executions:
            return None

        all_outputs = []
        for exec_entry in completed_executions:
            if exec_entry.output_data:
                all_outputs.extend(str(v) for v in exec_entry.output_data.values() if v)

        if not all_outputs:
            return None

        # Combine all content and moderate directly as one API call.
        content = " ".join(all_outputs)

        # Run moderation
        logger.warning(f"Moderating outputs for graph execution {graph_exec_id}")
        try:
            moderation_passed = await self._moderate_content(
                content,
                {
                    "user_id": user_id,
                    "graph_id": graph_id,
                    "graph_exec_id": graph_exec_id,
                    "moderation_type": "execution_output",
                },
            )

            if not moderation_passed:
                logger.warning(f"Moderation failed for graph execution {graph_exec_id}")
                # Update node statuses for frontend display before raising error
                await self._update_failed_nodes_for_moderation(
                    db_client, graph_exec_id, "output"
                )
                return ModerationError(
                    message="Execution failed due to output content moderation",
                    user_id=user_id,
                    graph_exec_id=graph_exec_id,
                    moderation_type="output",
                )

            return None
        except asyncio.TimeoutError:
            logger.warning(
                f"Output moderation timed out for graph execution {graph_exec_id}, bypassing moderation"
            )
            return None  # Bypass moderation on timeout
        except Exception as e:
            logger.warning(f"Output moderation execution failed: {e}")
            return ModerationError(
                message="Execution failed due to output content moderation error",
                user_id=user_id,
                graph_exec_id=graph_exec_id,
                moderation_type="output",
            )

    async def _update_failed_nodes_for_moderation(
        self,
        db_client: "DatabaseManagerAsyncClient",
        graph_exec_id: str,
        moderation_type: Literal["input", "output"],
    ):
        """Update node execution statuses for frontend display when moderation fails.

        Marks the affected node executions FAILED and records placeholder
        "cleared" input/output values in their stats so the frontend shows
        the moderation message instead of the original (flagged) content.
        """
        # Import here to avoid circular imports
        from backend.executor.manager import send_async_execution_update

        if moderation_type == "input":
            # For input moderation, mark queued/running/incomplete nodes as failed
            target_statuses = [
                ExecutionStatus.QUEUED,
                ExecutionStatus.RUNNING,
                ExecutionStatus.INCOMPLETE,
            ]
        else:
            # For output moderation, mark completed nodes as failed
            target_statuses = [ExecutionStatus.COMPLETED]

        # Get the executions that need to be updated
        executions_to_update = await db_client.get_node_executions(
            graph_exec_id, statuses=target_statuses, include_exec_data=True
        )
        if not executions_to_update:
            return

        # Prepare database update tasks
        exec_updates = []
        for exec_entry in executions_to_update:
            # Collect all input and output names to clear
            cleared_inputs = {}
            cleared_outputs = {}

            if exec_entry.input_data:
                for name in exec_entry.input_data.keys():
                    cleared_inputs[name] = ["Failed due to content moderation"]

            if exec_entry.output_data:
                for name in exec_entry.output_data.keys():
                    cleared_outputs[name] = ["Failed due to content moderation"]

            # Add update task to list
            exec_updates.append(
                db_client.update_node_execution_status(
                    exec_entry.node_exec_id,
                    status=ExecutionStatus.FAILED,
                    stats={
                        "error": "Failed due to content moderation",
                        "cleared_inputs": cleared_inputs,
                        "cleared_outputs": cleared_outputs,
                    },
                )
            )

        # Execute all database updates in parallel
        updated_execs = await asyncio.gather(*exec_updates)

        # Send all websocket updates in parallel
        await asyncio.gather(
            *[
                send_async_execution_update(updated_exec)
                for updated_exec in updated_execs
            ]
        )

    async def _moderate_content(self, content: str, metadata: dict[str, Any]) -> bool:
        """Moderate content using AutoMod API.

        Returns:
            True: Content approved (or, on non-timeout API errors, whatever
                  ``config.fail_open`` dictates)
            False: Content rejected by moderation

        Raises:
            asyncio.TimeoutError: When moderation times out (should be bypassed
                by the caller)
        """
        try:
            request_data = AutoModRequest(
                type="text",
                content=content,
                metadata=metadata,
            )

            response = await self._make_request(request_data)

            # Only an explicit success + "approved" status counts as a pass;
            # "flagged"/"pending"/"rejected" all fall through to rejection.
            if response.success and response.status == "approved":
                logger.debug(
                    f"Content approved for {metadata.get('graph_exec_id', 'unknown')}"
                )
                return True
            else:
                reasons = [r.reason for r in response.moderation_results if r.reason]
                error_msg = f"Content rejected by AutoMod: {'; '.join(reasons)}"
                logger.warning(f"Content rejected: {error_msg}")
                return False

        except asyncio.TimeoutError:
            # Re-raise timeout to be handled by calling methods
            logger.warning(
                f"AutoMod API timeout for {metadata.get('graph_exec_id', 'unknown')}"
            )
            raise
        except Exception as e:
            # Non-timeout API failure: fail open or closed per configuration.
            logger.error(f"AutoMod moderation error: {e}")
            return self.config.fail_open

    async def _make_request(self, request_data: AutoModRequest) -> AutoModResponse:
        """Make HTTP request to AutoMod API using the standard request utility.

        Raises:
            asyncio.TimeoutError: on request timeout (including aiohttp-style
                timeouts detected by message inspection)
            Exception: on invalid responses or any other request failure
        """
        url = f"{self.config.api_url}/moderate"
        headers = {
            "Content-Type": "application/json",
            "X-API-Key": self.config.api_key,
        }

        # Create requests instance with timeout and retry configuration
        requests = Requests(
            extra_headers=headers,
            retry_max_wait=float(self.config.timeout),
        )

        try:
            response = await requests.post(
                url, json=request_data.model_dump(), timeout=self.config.timeout
            )
            response_data = response.json()
            return AutoModResponse.model_validate(response_data)
        except asyncio.TimeoutError:
            # Re-raise timeout error to be caught by _moderate_content
            raise
        except (json.JSONDecodeError, ValidationError) as e:
            raise Exception(f"Invalid response from AutoMod API: {e}")
        except Exception as e:
            # Check if this is an aiohttp timeout that we should convert.
            # NOTE(review): string matching on the message is fragile — catching
            # the concrete aiohttp timeout class would be more robust.
            if "timeout" in str(e).lower():
                raise asyncio.TimeoutError(f"AutoMod API request timed out: {e}")
            raise Exception(f"AutoMod API request failed: {e}")


# Global instance shared across the process; configuration is loaded once here
# at import time, so settings changes require a restart to take effect.
automod_manager = AutoModManager()

View File

@@ -0,0 +1,57 @@
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class AutoModRequest(BaseModel):
    """Request model for the AutoMod ``/moderate`` API endpoint."""

    type: str = Field(..., description="Content type - 'text', 'image', 'video'")
    content: str = Field(..., description="The content to moderate")
    metadata: Optional[Dict[str, Any]] = Field(
        default=None, description="Additional context about the content"
    )


class ModerationResult(BaseModel):
    """Individual moderation result within an AutoModResponse."""

    decision: str = Field(
        ..., description="Moderation decision: 'approved', 'rejected', 'flagged'"
    )
    reason: Optional[str] = Field(default=None, description="Reason for the decision")


class AutoModResponse(BaseModel):
    """Response model for the AutoMod API.

    Only ``success=True`` together with ``status='approved'`` is treated as a
    pass by the caller; any other combination is a rejection.
    """

    success: bool = Field(..., description="Whether the request was successful")
    status: str = Field(
        ..., description="Overall status: 'approved', 'rejected', 'flagged', 'pending'"
    )
    moderation_results: List[ModerationResult] = Field(
        default_factory=list, description="List of moderation results"
    )


class ModerationConfig(BaseModel):
    """Configuration for AutoMod integration.

    NOTE(review): ``moderate_inputs``/``moderate_outputs`` are declared here
    but not populated from settings by AutoModManager._load_config — confirm
    whether they are meant to be wired through.
    """

    enabled: bool = Field(default=True, description="Whether moderation is enabled")
    api_url: str = Field(default="", description="AutoMod API base URL")
    api_key: str = Field(..., description="AutoMod API key")
    timeout: int = Field(default=30, description="Request timeout in seconds")
    retry_attempts: int = Field(default=3, description="Number of retry attempts")
    retry_delay: float = Field(
        default=1.0, description="Delay between retries in seconds"
    )
    fail_open: bool = Field(
        default=False,
        description="If True, allow execution to continue if moderation fails",
    )
    moderate_inputs: bool = Field(
        default=True, description="Whether to moderate block inputs"
    )
    moderate_outputs: bool = Field(
        default=True, description="Whether to moderate block outputs"
    )

View File

@@ -33,6 +33,33 @@ class InsufficientBalanceError(ValueError):
return self.message
class ModerationError(ValueError):
    """Content moderation failure during execution.

    Carries enough context (user, execution, and which side of the execution
    was moderated) for handlers to log and report the failure.
    """

    user_id: str
    message: str
    graph_exec_id: str
    moderation_type: str

    def __init__(
        self,
        message: str,
        user_id: str,
        graph_exec_id: str,
        moderation_type: str = "content",
    ):
        # Hand the full tuple to ValueError so ``args`` carries every field
        # (useful for pickling/re-raising across process boundaries).
        super().__init__(message, user_id, graph_exec_id, moderation_type)
        self.message = message
        self.user_id = user_id
        self.graph_exec_id = graph_exec_id
        self.moderation_type = moderation_type

    def __str__(self) -> str:
        """Used to display the error message in the frontend, because we str() the error when sending the execution update"""
        # Only the human-readable message — not the default args repr.
        return self.message
class GraphValidationError(ValueError):
"""Structured validation error for graph validation failures"""

View File

@@ -295,6 +295,32 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Maximum file size in MB for file uploads (1-1024 MB)",
)
# AutoMod configuration
automod_enabled: bool = Field(
default=False,
description="Whether AutoMod content moderation is enabled",
)
automod_api_url: str = Field(
default="",
description="AutoMod API base URL - Make sure it ends in /api",
)
automod_timeout: int = Field(
default=30,
description="Timeout in seconds for AutoMod API requests",
)
automod_retry_attempts: int = Field(
default=3,
description="Number of retry attempts for AutoMod API requests",
)
automod_retry_delay: float = Field(
default=1.0,
description="Delay between retries for AutoMod API requests in seconds",
)
automod_fail_open: bool = Field(
default=False,
description="If True, allow execution to continue if AutoMod fails",
)
@field_validator("platform_base_url", "frontend_base_url")
@classmethod
def validate_platform_base_url(cls, v: str, info: ValidationInfo) -> str:
@@ -495,6 +521,10 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
apollo_api_key: str = Field(default="", description="Apollo API Key")
smartlead_api_key: str = Field(default="", description="SmartLead API Key")
zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")
# AutoMod API credentials
automod_api_key: str = Field(default="", description="AutoMod API key")
ayrshare_api_key: str = Field(default="", description="Ayrshare API Key")
ayrshare_jwt_key: str = Field(default="", description="Ayrshare private Key")
# Add more secret fields as needed

View File

@@ -6737,4 +6737,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "05e2b99bd6dc5a74a89df0e1e504853b66bd519062159b0de5fbedf6a1f4d986"
content-hash = "795414d7ce8f288ea6c65893268b5c29a7c9a60ad75cde28ac7bcdb65f426dfe"

View File

@@ -10,6 +10,7 @@ packages = [{ include = "backend", format = "sdist" }]
[tool.poetry.dependencies]
python = ">=3.10,<3.13"
aio-pika = "^9.5.5"
aiohttp = "^3.10.0"
aiodns = "^3.5.0"
anthropic = "^0.59.0"
apscheduler = "^3.11.0"