Replace `moderate_graph_content with moderate_block_content` and fix api calls to iffy

This commit is contained in:
Bentlybro
2025-06-03 19:07:48 +01:00
parent 0cd8ae4ec4
commit 718bb085ad
5 changed files with 77 additions and 89 deletions

View File

@@ -116,6 +116,7 @@ class DatabaseManager(AppService):
update_user_metadata = _(update_user_metadata)
get_user_integrations = _(get_user_integrations)
update_user_integrations = _(update_user_integrations)
get_user_info_by_id = _(get_user_info_by_id)
# User Comms - async
get_active_user_ids_in_timerange = _(get_active_user_ids_in_timerange)
@@ -173,6 +174,7 @@ class DatabaseManagerClient(AppServiceClient):
update_user_metadata = _(d.update_user_metadata)
get_user_integrations = _(d.get_user_integrations)
update_user_integrations = _(d.update_user_integrations)
get_user_info_by_id = _(d.get_user_info_by_id)
# User Comms - async
get_active_user_ids_in_timerange = _(d.get_active_user_ids_in_timerange)

View File

@@ -29,6 +29,7 @@ from backend.data.notifications import (
from backend.data.rabbitmq import SyncRabbitMQ
from backend.executor.utils import create_execution_queue_config
from backend.notifications.notifications import queue_notification
from backend.server.v2.iffy.block_moderation import moderate_block_content
from backend.util.exceptions import InsufficientBalanceError
if TYPE_CHECKING:
@@ -501,6 +502,15 @@ class Executor:
block_name="-",
)
moderate_block_content(
graph_id=node_exec.graph_id,
graph_exec_id=node_exec.graph_exec_id,
node_id=node_exec.node_id,
block_id=node_exec.block_id,
input_data=node_exec.inputs,
user_id=node_exec.user_id,
)
execution_stats = NodeExecutionStats()
timing_info, _ = cls._on_node_execution(
q, node_exec, log_metadata, execution_stats, node_credentials_input_map

View File

@@ -0,0 +1,63 @@
import logging
from typing import List, Tuple
from backend.data.block import BlockInput, BlockType, get_block
from backend.data.graph import GraphModel
from backend.server.v2.iffy.models import BlockContentForModeration
from backend.server.v2.iffy.service import IffyService
from backend.util.settings import BehaveAs, Settings
logger = logging.getLogger(__name__)
settings = Settings()
def moderate_block_content(
graph_id: str,
graph_exec_id: str,
node_id: str,
block_id: str,
input_data: BlockInput,
user_id: str,
) -> None:
"""
Moderate the content of a single block before execution.
Args:
graph_id: The ID of the graph
graph_exec_id: The ID of the graph execution
node_id: The ID of the node being executed
block_id: The ID of the block being executed
input_data: Input data for the block
user_id: The ID of the user running the block
"""
if settings.config.behave_as == BehaveAs.LOCAL:
return
try:
block = get_block(block_id)
if not block or block.block_type == BlockType.NOTE:
return
block_content = BlockContentForModeration(
graph_id=graph_id,
graph_exec_id=graph_exec_id,
node_id=node_id,
block_id=block.id,
block_name=block.name,
block_type=block.block_type.value,
input_data=input_data,
)
# Send to Iffy for moderation
result = IffyService.moderate_content(user_id, block_content)
# CRITICAL: Ensure we never proceed if moderation fails
if not result.is_safe:
logger.error(
f"Content moderation failed for {block.name}: {result.reason}"
)
raise ValueError(f"Content moderation failed for {block.name}: {result.reason}")
except Exception as e:
logger.error(f"Error during content moderation: {str(e)}")
raise ValueError("Content moderation system error")

View File

@@ -1,87 +0,0 @@
import logging
from typing import List, Tuple
from backend.data.block import BlockInput, BlockType, get_block
from backend.data.graph import GraphModel
from backend.server.v2.iffy.models import BlockContentForModeration
from backend.server.v2.iffy.service import IffyService
from backend.util.settings import BehaveAs, Settings
logger = logging.getLogger(__name__)
settings = Settings()
def moderate_graph_content(
graph: GraphModel,
graph_id: str,
graph_exec_id: str,
nodes_input: List[Tuple[str, BlockInput]],
user_id: str,
) -> None:
"""
Moderate the content of a graph before execution.
Args:
graph: The graph model to moderate
graph_id: The ID of the graph
graph_exec_id: The ID of the graph execution
nodes_input: Input data for starting nodes
user_id: The ID of the user running the graph
"""
if settings.config.behave_as == BehaveAs.LOCAL:
return
try:
for node in graph.nodes:
block = get_block(node.block_id)
if not block or block.block_type == BlockType.NOTE:
continue
# For starting nodes, use their input data
if node.id in dict(nodes_input):
input_data = dict(nodes_input)[node.id]
else:
# For non-starting nodes, collect their default inputs and static values
input_data = node.input_default.copy()
# Add any static input from connected nodes
for link in node.input_links:
if not link.is_static:
continue
source_node = next(
(n for n in graph.nodes if n.id == link.source_id), None
)
if not source_node:
continue
source_block = get_block(source_node.block_id)
if not source_block:
continue
input_data[link.sink_name] = source_node.input_default.get(
link.source_name
)
block_content = BlockContentForModeration(
graph_id=graph_id,
graph_exec_id=graph_exec_id,
node_id=node.id,
block_id=block.id,
block_name=block.name,
block_type=block.block_type.value,
input_data=input_data,
)
# Send to Iffy for moderation
result = IffyService.moderate_content(user_id, block_content)
# CRITICAL: Ensure we never proceed if moderation fails
if not result.is_safe:
logger.error(
f"Content moderation failed for {block.name}: {result.reason}"
)
raise ValueError(f"Content moderation failed for {block.name}")
except Exception as e:
logger.error(f"Error during content moderation: {str(e)}")
raise ValueError("Content moderation system error")

View File

@@ -16,9 +16,9 @@ settings = Settings()
@thread_cached
def get_db():
from backend.executor.database import DatabaseManager
from backend.executor.database import DatabaseManagerClient
return get_service_client(DatabaseManager)
return get_service_client(DatabaseManagerClient)
class IffyService: