Updates based on comments

This commit is contained in:
Bentlybro
2025-04-02 13:44:43 +01:00
parent fd4a61f415
commit cb9bedcb67
8 changed files with 87 additions and 66 deletions

View File

@@ -212,6 +212,9 @@ class HMACValidator:
return True
def get_dependency(self):
"""
Returns a callable dependency that FastAPI will recognize as a security scheme
"""
async def validate_signature(request: Request, signature: str = Security(self.header)) -> bool:
return await self(request, signature)

View File

@@ -56,6 +56,7 @@ async def get_user_by_id(user_id: str) -> User:
async def get_user_info_by_id(user_id: str) -> dict:
# TODO: Change return type to a Pydantic model instead of a dict
try:
user = await User.prisma().find_unique(where={"id": user_id})
if not user:

View File

@@ -1081,14 +1081,13 @@ class ExecutionManager(AppService):
)
# Right after creating the graph execution, we need to check if the content is safe
if settings.config.behave_as != BehaveAs.LOCAL:
moderate_graph_content(
graph=graph,
graph_id=graph.id,
graph_exec_id=graph_exec.id,
nodes_input=nodes_input,
user_id=user_id
)
moderate_graph_content(
graph=graph,
graph_id=graph.id,
graph_exec_id=graph_exec.id,
nodes_input=nodes_input,
user_id=user_id
)
self.queue.add(graph_exec_entry)

View File

@@ -4,6 +4,7 @@ from typing import List, Tuple
from backend.data.block import BlockInput, BlockType, get_block
from backend.data.graph import GraphModel
from backend.server.v2.iffy.service import IffyService
from backend.server.v2.iffy.models import BlockContentForModeration
from backend.util.settings import Settings, BehaveAs
logger = logging.getLogger(__name__)
@@ -43,22 +44,28 @@ def moderate_graph_content(
input_data = node.input_default.copy()
# Add any static input from connected nodes
for link in node.input_links:
if link.is_static:
source_node = next((n for n in graph.nodes if n.id == link.source_id), None)
if source_node:
source_block = get_block(source_node.block_id)
if source_block:
input_data[link.sink_name] = source_node.input_default.get(link.source_name)
if not link.is_static:
continue
source_node = next((n for n in graph.nodes if n.id == link.source_id), None)
if not source_node:
continue
source_block = get_block(source_node.block_id)
if not source_block:
continue
input_data[link.sink_name] = source_node.input_default.get(link.source_name)
block_content = {
"graph_id": graph_id,
"graph_exec_id": graph_exec_id,
"node_id": node.id,
"block_id": block.id,
"block_name": block.name,
"block_type": block.block_type.value,
"input_data": input_data
}
block_content = BlockContentForModeration(
graph_id=graph_id,
graph_exec_id=graph_exec_id,
node_id=node.id,
block_id=block.id,
block_name=block.name,
block_type=block.block_type.value,
input_data=input_data
)
# Send to Iffy for moderation
result = IffyService.moderate_content(user_id, block_content)
@@ -68,10 +75,6 @@ def moderate_graph_content(
logger.error(f"Content moderation failed for {block.name}: {result.reason}")
raise ValueError(f"Content moderation failed for {block.name}")
except ValueError as e:
logger.error(f"Moderation error: {str(e)}")
raise
except Exception as e:
logger.error(f"Error during content moderation: {str(e)}")
raise ValueError(f"Content moderation system error")
raise ValueError(f"Content moderation system error")

View File

@@ -33,4 +33,13 @@ class IffyPayload(BaseModel):
class ModerationResult(BaseModel):
is_safe: bool
reason: str
reason: str
class BlockContentForModeration(BaseModel):
graph_id: str
graph_exec_id: str
node_id: str
block_id: str
block_name: str
block_type: str
input_data: Dict[str, Any]

View File

@@ -1,11 +1,9 @@
import hmac
import hashlib
import logging
from typing import Dict, Any
from fastapi import APIRouter, Request, Response, HTTPException, Depends
from backend.util.settings import Settings
from backend.util.service import get_service_client
from .models import EventType, IffyWebhookEvent
from .models import EventType, IffyWebhookEvent, BlockContentForModeration, UserData
from autogpt_libs.auth.middleware import HMACValidator
from autogpt_libs.utils.cache import thread_cached
@@ -26,45 +24,41 @@ def get_execution_manager():
return get_service_client(ExecutionManager)
# This handles the webhook events from iffy like stopping an execution if a flagged block is detected.
async def handle_record_event(event_type: EventType, metadata: Dict[str, Any]) -> Response:
async def handle_record_event(event_type: EventType, block_content: BlockContentForModeration) -> Response:
"""Handle record-related webhook events
If any blocks are flagged, we stop the execution and log the event."""
graph_exec_id = metadata.get("graphExecutionId")
node_id = metadata.get("nodeId")
block_name = metadata.get("blockName", "Unknown Block")
if event_type == EventType.RECORD_FLAGGED:
logger.warning(
f'Content flagged for node "{node_id}" ("{block_name}") '
f'in execution "{graph_exec_id}"'
f'Content flagged for node "{block_content.node_id}" ("{block_content.block_name}") '
f'in execution "{block_content.graph_exec_id}"'
)
execution_manager = get_execution_manager()
try:
execution_manager.cancel_execution(graph_exec_id)
logger.info(f'Successfully stopped execution "{graph_exec_id}" due to flagged content')
execution_manager.cancel_execution(block_content.graph_exec_id)
logger.info(f'Successfully stopped execution "{block_content.graph_exec_id}" due to flagged content')
except Exception as e:
if "not active/running" not in str(e):
logger.error(f"Error cancelling execution processes: {str(e)}")
raise
logger.info(f'Execution "{graph_exec_id}" was already completed/cancelled')
logger.info(f'Execution "{block_content.graph_exec_id}" was already completed/cancelled')
return Response(status_code=200)
elif event_type in (EventType.RECORD_COMPLIANT, EventType.RECORD_UNFLAGGED):
logger.info(
f'Content cleared for node "{node_id}" ("{block_name}") '
f'in execution "{graph_exec_id}"'
f'Content cleared for node "{block_content.node_id}" ("{block_content.block_name}") '
f'in execution "{block_content.graph_exec_id}"'
)
return Response(status_code=200)
async def handle_user_event(event_type: EventType, payload: Dict[str, Any]) -> Response:
async def handle_user_event(event_type: EventType, user_payload: UserData) -> Response:
"""Handle user-related webhook events
For now we are just logging these events from iffy
and replying with a 200 status code to keep iffy happy and to prevent it from retrying the request."""
user_id = payload.get("clientId")
user_id = user_payload.clientId
if not user_id:
logger.error("Received user event without user ID")
raise HTTPException(
@@ -72,8 +66,8 @@ async def handle_user_event(event_type: EventType, payload: Dict[str, Any]) -> R
detail="Missing required field 'clientId' in user event payload"
)
status_updated_at = payload.get("statusUpdatedAt")
status_updated_via = payload.get("statusUpdatedVia")
status_updated_at = user_payload.get("statusUpdatedAt", "unknown time")
status_updated_via = user_payload.get("statusUpdatedVia", "unknown method")
event_messages = {
EventType.USER_SUSPENDED: f'User "{user_id}" has been SUSPENDED via {status_updated_via} at {status_updated_at}',
@@ -105,13 +99,25 @@ async def handle_iffy_webhook(
try:
if event_data.event.startswith("record."):
return await handle_record_event(event_data.event, event_data.payload.get("metadata", {}))
metadata = event_data.payload.get("metadata", {})
block_content = BlockContentForModeration(
graph_id=metadata.get("graphId", ""),
graph_exec_id=metadata.get("graphExecutionId", ""),
node_id=metadata.get("nodeId", ""),
block_id=metadata.get("blockId", ""),
block_name=metadata.get("blockName", "Unknown Block"),
block_type=metadata.get("blockType", ""),
input_data=metadata.get("inputData", {})
)
return await handle_record_event(event_data.event, block_content)
elif event_data.event.startswith("user."):
return await handle_user_event(event_data.event, event_data.payload)
# Create UserData from payload
user_data = UserData(**event_data.payload)
return await handle_user_event(event_data.event, user_data)
else:
logger.info(f"Received unhandled Iffy event: {event_data.event}")
return Response(status_code=200)
except Exception as e:
if "not active/running" in str(e):
return Response(status_code=200)
raise HTTPException(status_code=200, detail=str(e))
raise HTTPException(status_code=200, detail=str(e))

View File

@@ -6,7 +6,7 @@ from typing import Dict, Any
from backend.util.settings import Settings, BehaveAs
from backend.util.openrouter import open_router_moderate_content
from backend.util.service import get_service_client
from .models import UserData, IffyPayload, ModerationResult
from .models import UserData, IffyPayload, ModerationResult, BlockContentForModeration
from autogpt_libs.utils.cache import thread_cached
logger = logging.getLogger(__name__)
@@ -44,14 +44,14 @@ class IffyService:
return user_data
@staticmethod
def moderate_content(user_id: str, block_content: Dict[str, Any]) -> ModerationResult:
def moderate_content(user_id: str, block_content: BlockContentForModeration) -> ModerationResult:
"""
Send block content to Iffy for content moderation.
Only used in cloud mode - local mode skips moderation entirely.
Args:
user_id: The ID of the user executing the block
block_content: The content of the block to be moderated
block_content: The content of the block to be moderated (BlockContentForModeration model)
Returns:
ModerationResult: Result of the moderation check
@@ -67,7 +67,7 @@ class IffyService:
# Validate Iffy API URL and key at the start
if not IFFY_API_URL or not IFFY_API_KEY:
logger.warning("Iffy API URL or key not configured, falling back to OpenRouter moderation")
input_data = json.dumps(block_content.get('input_data', {}), indent=2)
input_data = json.dumps(block_content.input_data, indent=2)
is_safe, reason = open_router_moderate_content(input_data)
return ModerationResult(is_safe=is_safe, reason=f"Iffy not configured. OpenRouter result: {reason}")
@@ -75,7 +75,7 @@ class IffyService:
# Validate URL format
if not IFFY_API_URL.startswith(('http://', 'https://')):
logger.error(f"Invalid Iffy API URL format: {IFFY_API_URL}")
input_data = json.dumps(block_content.get('input_data', {}), indent=2)
input_data = json.dumps(block_content.input_data, indent=2)
is_safe, reason = open_router_moderate_content(input_data)
return ModerationResult(is_safe=is_safe, reason="Invalid Iffy API URL format")
@@ -84,20 +84,20 @@ class IffyService:
"Content-Type": "application/json"
}
input_data = json.dumps(block_content.get('input_data', {}), indent=2)
input_data = json.dumps(block_content.input_data, indent=2)
user_data = IffyService.get_user_data(user_id)
# Prepare the metadata
metadata = {
"graphId": str(block_content.get('graph_id', '')),
"graphExecutionId": str(block_content['graph_exec_id']),
"nodeId": str(block_content['node_id']),
"blockId": str(block_content['block_id']),
"blockName": str(block_content['block_name']),
"graphId": str(block_content.graph_id),
"graphExecutionId": str(block_content.graph_exec_id),
"nodeId": str(block_content.node_id),
"blockId": str(block_content.block_id),
"blockName": str(block_content.block_name),
}
name = f"{block_content['block_name']}-{block_content['block_id']}"
graph_execution_id = f"{block_content['graph_exec_id']}-{block_content['node_id']}"
name = f"{block_content.block_name}-{block_content.block_id}"
graph_execution_id = f"{block_content.graph_exec_id}-{block_content.node_id}"
# Create the payload
payload = IffyPayload(
@@ -133,7 +133,7 @@ class IffyService:
except Exception as e:
logger.error(f"Error in primary moderation service: {str(e)}", exc_info=True)
try:
input_data = json.dumps(block_content.get('input_data', {}), indent=2)
input_data = json.dumps(block_content.input_data, indent=2)
is_safe, reason = open_router_moderate_content(input_data)
if is_safe:
logger.info(f"OpenRouter moderation passed after Iffy failure. Block: {name}")

View File

@@ -99,4 +99,4 @@ def open_router_moderate_content(content: str, user_id: str | None = None) -> Tu
except Exception as e:
logger.error(f"Error in OpenRouter moderation: {str(e)}", exc_info=True)
return False, f"Moderation error: {str(e)}"
return False, f"Moderation error: {str(e)}"