Some more updates based on comments + work on removing async

This commit is contained in:
Bentlybro
2025-03-26 18:26:43 +00:00
parent 5ad9485a77
commit 982b8b6945
6 changed files with 105 additions and 100 deletions

View File

@@ -57,12 +57,14 @@ async def get_user_by_id(user_id: str) -> User:
async def get_user_info_by_id(user_id: str) -> dict:
try:
user = await prisma.user.find_unique(where={"id": user_id})
user = await User.prisma().find_unique(where={"id": user_id})
if not user:
raise ValueError(f"User with ID {user_id} not found")
return {
"id": getattr(user, "id", None),
"name": getattr(user, "name", None),
"username": getattr(user, "username", None),
"email": getattr(user, "email", None),
"id": user.id,
"name": user.name,
"email": user.email,
}
except Exception as e:
raise DatabaseError(f"Failed to get user info for user {user_id}: {e}") from e

View File

@@ -22,6 +22,7 @@ from backend.data.notifications import (
NotificationType,
)
from backend.util.exceptions import InsufficientBalanceError
from backend.server.v2.iffy.graph_moderation import moderate_graph_content
if TYPE_CHECKING:
from backend.executor import DatabaseManager
@@ -68,7 +69,6 @@ from backend.util.service import (
)
from backend.util.settings import Settings, BehaveAs
from backend.util.type import convert
from backend.server.v2.iffy.service import IffyService
logger = logging.getLogger(__name__)
settings = Settings()
@@ -957,19 +957,6 @@ class ExecutionManager(AppService):
user_id: str,
graph_version: Optional[int] = None,
preset_id: str | None = None,
) -> GraphExecutionEntry:
"""Add a graph execution to the queue"""
return self.run_and_wait(
self._add_execution_async(graph_id, data, user_id, graph_version, preset_id)
)
async def _add_execution_async(
self,
graph_id: str,
data: BlockInput,
user_id: str,
graph_version: Optional[int] = None,
preset_id: str | None = None,
) -> GraphExecutionEntry:
graph: GraphModel | None = self.db_client.get_graph(
graph_id=graph_id, user_id=user_id, version=graph_version
@@ -1028,12 +1015,14 @@ class ExecutionManager(AppService):
# Right after creating the graph execution, we need to check if the content is safe
if settings.config.behave_as != BehaveAs.LOCAL:
await self._moderate_graph_content(
self.run_and_wait(
moderate_graph_content(
graph=graph,
graph_id=graph_id,
graph_exec_id=graph_exec_id,
nodes_input=nodes_input,
user_id=user_id
)
)
starting_node_execs = []
@@ -1145,74 +1134,6 @@ class ExecutionManager(AppService):
"type/provider mismatch"
)
async def _moderate_graph_content(
self,
graph: GraphModel,
graph_id: str,
graph_exec_id: str,
nodes_input: list[tuple[str, BlockInput]],
user_id: str
) -> None:
"""
Moderate the content of a graph before execution.
Args:
graph: The graph model to moderate
graph_id: The ID of the graph
graph_exec_id: The ID of the graph execution
nodes_input: Input data for starting nodes
user_id: The ID of the user running the graph
Raises:
ValueError: If content moderation fails
"""
try:
for node in graph.nodes:
block = get_block(node.block_id)
if not block or block.block_type == BlockType.NOTE:
continue
# For starting nodes, use their input data
if node.id in dict(nodes_input):
input_data = dict(nodes_input)[node.id]
else:
# For non-starting nodes, collect their default inputs and static values
input_data = node.input_default.copy()
# Add any static input from connected nodes
for link in node.input_links:
if link.is_static:
source_node = next((n for n in graph.nodes if n.id == link.source_id), None)
if source_node:
source_block = get_block(source_node.block_id)
if source_block:
input_data[link.sink_name] = source_node.input_default.get(link.source_name)
block_content = {
"graph_id": graph_id,
"graph_exec_id": graph_exec_id,
"node_id": node.id,
"block_id": block.id,
"block_name": block.name,
"block_type": block.block_type.value,
"input_data": input_data
}
# Send to Iffy for moderation using the service directly
result = await IffyService._moderate_content(user_id, block_content)
# CRITICAL: Ensure we never proceed if moderation fails
if not result.is_safe:
logger.error(f"Content moderation failed for {block.name}: {result.reason}")
raise ValueError(f"Content moderation failed for {block.name}")
except ValueError as ve:
logger.error(f"Moderation error: {str(ve)}")
raise
except Exception as e:
logger.error(f"Error during content moderation: {str(e)}")
raise ValueError(f"Content moderation system error")
# ------- UTILITIES ------- #

View File

@@ -0,0 +1,77 @@
import logging
from typing import List, Tuple
from backend.data.block import BlockInput, BlockType, get_block
from backend.data.graph import GraphModel
from backend.server.v2.iffy.service import IffyService
from backend.util.settings import Settings, BehaveAs
logger = logging.getLogger(__name__)
settings = Settings()
async def moderate_graph_content(
graph: GraphModel,
graph_id: str,
graph_exec_id: str,
nodes_input: List[Tuple[str, BlockInput]],
user_id: str
) -> None:
"""
Moderate the content of a graph before execution.
Args:
graph: The graph model to moderate
graph_id: The ID of the graph
graph_exec_id: The ID of the graph execution
nodes_input: Input data for starting nodes
user_id: The ID of the user running the graph
"""
if settings.config.behave_as == BehaveAs.LOCAL:
return
try:
for node in graph.nodes:
block = get_block(node.block_id)
if not block or block.block_type == BlockType.NOTE:
continue
# For starting nodes, use their input data
if node.id in dict(nodes_input):
input_data = dict(nodes_input)[node.id]
else:
# For non-starting nodes, collect their default inputs and static values
input_data = node.input_default.copy()
# Add any static input from connected nodes
for link in node.input_links:
if link.is_static:
source_node = next((n for n in graph.nodes if n.id == link.source_id), None)
if source_node:
source_block = get_block(source_node.block_id)
if source_block:
input_data[link.sink_name] = source_node.input_default.get(link.source_name)
block_content = {
"graph_id": graph_id,
"graph_exec_id": graph_exec_id,
"node_id": node.id,
"block_id": block.id,
"block_name": block.name,
"block_type": block.block_type.value,
"input_data": input_data
}
# Send to Iffy for moderation
result = await IffyService.moderate_content(user_id, block_content)
# CRITICAL: Ensure we never proceed if moderation fails
if not result.is_safe:
logger.error(f"Content moderation failed for {block.name}: {result.reason}")
raise ValueError(f"Content moderation failed for {block.name}")
except ValueError as e:
logger.error(f"Moderation error: {str(e)}")
raise
except Exception as e:
logger.error(f"Error during content moderation: {str(e)}")
raise ValueError(f"Content moderation system error")

View File

@@ -2,12 +2,12 @@ import hmac
import hashlib
import logging
from typing import Dict, Any
from fastapi import APIRouter, Request, Response, HTTPException, Header, Depends
from fastapi import APIRouter, Request, Response, HTTPException, Depends
from backend.util.settings import Settings
from backend.util.service import get_service_client
from backend.executor import ExecutionManager
from .models import EventType, IffyWebhookEvent
from autogpt_libs.auth.middleware import HMACValidator
from autogpt_libs.utils.cache import thread_cached
logger = logging.getLogger(__name__)
settings = Settings()
@@ -20,6 +20,11 @@ iffy_signature_validator = HMACValidator(
error_message="Invalid Iffy signature"
)
@thread_cached
def get_execution_manager():
from backend.executor import ExecutionManager
return get_service_client(ExecutionManager)
# This handles the webhook events from iffy like stopping an execution if a flagged block is detected.
async def handle_record_event(event_type: EventType, metadata: Dict[str, Any]) -> Response:
"""Handle record-related webhook events
@@ -34,7 +39,7 @@ async def handle_record_event(event_type: EventType, metadata: Dict[str, Any]) -
f'Content flagged for node "{node_id}" ("{block_name}") '
f'in execution "{graph_exec_id}"'
)
execution_manager = get_service_client(ExecutionManager)
execution_manager = get_execution_manager()
try:
execution_manager.cancel_execution(graph_exec_id)
logger.info(f'Successfully stopped execution "{graph_exec_id}" due to flagged content')

View File

@@ -7,10 +7,12 @@ from backend.util.settings import Settings, BehaveAs
from backend.util.openrouter import open_router_moderate_content
from backend.util.service import get_service_client
from .models import UserData, IffyPayload, ModerationResult
from autogpt_libs.utils.cache import thread_cached
logger = logging.getLogger(__name__)
settings = Settings()
@thread_cached
def get_db():
from backend.executor.database import DatabaseManager
return get_service_client(DatabaseManager)
@@ -26,7 +28,6 @@ class IffyService:
"clientId": user_id,
"email": None,
"name": None,
"username": None,
}
try:
@@ -35,7 +36,6 @@ class IffyService:
user_data.update({
"id": user["id"],
"name": user["name"],
"username": user["username"],
"email": user["email"],
})
except Exception as e:
@@ -44,7 +44,7 @@ class IffyService:
return user_data
@staticmethod
async def _moderate_content(user_id: str, block_content: Dict[str, Any]) -> ModerationResult:
async def moderate_content(user_id: str, block_content: Dict[str, Any]) -> ModerationResult:
"""
Send block content to Iffy for content moderation.
Only used in cloud mode - local mode skips moderation entirely.
@@ -67,7 +67,7 @@ class IffyService:
# Validate Iffy API URL and key at the start
if not IFFY_API_URL or not IFFY_API_KEY:
logger.warning("Iffy API URL or key not configured, falling back to OpenRouter moderation")
is_safe, reason = await open_router_moderate_content(json.dumps(block_content.get('input_data', {}), indent=2))
is_safe, reason = open_router_moderate_content(json.dumps(block_content.get('input_data', {}), indent=2))
if not is_safe:
logger.error(f"OpenRouter moderation failed after Iffy configuration issue: {reason}")
return ModerationResult(is_safe=is_safe, reason=f"Iffy not configured. OpenRouter result: {reason}")
@@ -76,7 +76,7 @@ class IffyService:
# Validate URL format
if not IFFY_API_URL.startswith(('http://', 'https://')):
logger.error(f"Invalid Iffy API URL format: {IFFY_API_URL}")
is_safe, reason = await open_router_moderate_content(json.dumps(block_content.get('input_data', {}), indent=2))
is_safe, reason = open_router_moderate_content(json.dumps(block_content.get('input_data', {}), indent=2))
return ModerationResult(is_safe=is_safe, reason="Invalid Iffy API URL format")
headers = {
@@ -125,7 +125,7 @@ class IffyService:
if response.status != 200:
logger.info(f"Iffy moderation failed, falling back to OpenRouter. Status: {response.status}, Response: {response_text}")
# Fall back to OpenRouter moderation
is_safe, reason = await open_router_moderate_content(input_data)
is_safe, reason = open_router_moderate_content(input_data)
if is_safe:
logger.info(f"OpenRouter moderation passed. Block: {name}")
else:
@@ -139,7 +139,7 @@ class IffyService:
logger.error(f"Error in primary moderation service: {str(e)}", exc_info=True)
try:
# Last attempt with OpenRouter
is_safe, reason = await open_router_moderate_content(json.dumps(block_content.get('input_data', {}), indent=2))
is_safe, reason = open_router_moderate_content(json.dumps(block_content.get('input_data', {}), indent=2))
if is_safe:
logger.info(f"OpenRouter moderation passed after Iffy failure. Block: {name}")
else:
@@ -148,4 +148,4 @@ class IffyService:
except Exception as e2:
reason = f"Both moderation services failed. Error: {str(e2)}"
logger.error(f"{reason}. Block: {name}", exc_info=True)
return ModerationResult(is_safe=False, reason=reason)
return ModerationResult(is_safe=False, reason=reason)

View File

@@ -15,7 +15,7 @@ Content to moderate:MODERATION_PROMPT
Respond with only one word from the above choices."""
async def open_router_moderate_content(content: str, user_id: str | None = None) -> Tuple[bool, str]:
def open_router_moderate_content(content: str, user_id: str | None = None) -> Tuple[bool, str]:
"""
Use OpenRouter's API to moderate content using an LLM.
Uses OpenRouter's auto-routing to select the best available model.