untangle some more

This commit is contained in:
Reinier van der Leer
2026-02-12 11:23:42 +01:00
parent 23175708e6
commit eef892893c
13 changed files with 63 additions and 75 deletions

View File

@@ -16,7 +16,7 @@ from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data import user as user_db
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.model import BlockInput, CompletedBlockOutput
from backend.data.block import BlockInput, CompletedBlockOutput
from backend.executor.utils import add_graph_execution
from backend.util.settings import Settings

View File

@@ -16,7 +16,7 @@ from backend.data.db import transaction
from backend.data.execution import get_graph_execution
from backend.data.graph import GraphSettings
from backend.data.includes import AGENT_PRESET_INCLUDE, library_agent_include
from backend.data.model import BlockInput, CredentialsMetaInput
from backend.data.model import CredentialsMetaInput, GraphInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.webhooks.graph_lifecycle_hooks import (
on_graph_activate,
@@ -1129,7 +1129,7 @@ async def create_preset_from_graph_execution(
async def update_preset(
user_id: str,
preset_id: str,
inputs: Optional[BlockInput] = None,
inputs: Optional[GraphInput] = None,
credentials: Optional[dict[str, CredentialsMetaInput]] = None,
name: Optional[str] = None,
description: Optional[str] = None,

View File

@@ -8,8 +8,8 @@ import pydantic
from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo
from backend.data.model import (
BlockInput,
CredentialsMetaInput,
GraphInput,
is_credentials_field_name,
)
from backend.util.json import loads as json_loads
@@ -326,7 +326,7 @@ class LibraryAgentPresetCreatable(pydantic.BaseModel):
graph_id: str
graph_version: int
inputs: BlockInput
inputs: GraphInput
credentials: dict[str, CredentialsMetaInput]
name: str
@@ -355,7 +355,7 @@ class LibraryAgentPresetUpdatable(pydantic.BaseModel):
Request model used when updating a preset for a library agent.
"""
inputs: Optional[BlockInput] = None
inputs: Optional[GraphInput] = None
credentials: Optional[dict[str, CredentialsMetaInput]] = None
name: Optional[str] = None
@@ -398,7 +398,7 @@ class LibraryAgentPreset(LibraryAgentPresetCreatable):
"Webhook must be included in AgentPreset query when webhookId is set"
)
input_data: BlockInput = {}
input_data: GraphInput = {}
input_credentials: dict[str, CredentialsMetaInput] = {}
for preset_input in preset.InputPresets:

View File

@@ -44,6 +44,7 @@ from backend.blocks import get_block, get_blocks
from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data.auth import api_key as api_key_db
from backend.data.block import BlockInput, CompletedBlockOutput
from backend.data.credit import (
AutoTopUpConfig,
RefundRequest,
@@ -54,12 +55,7 @@ from backend.data.credit import (
set_auto_top_up,
)
from backend.data.graph import GraphSettings
from backend.data.model import (
BlockInput,
CompletedBlockOutput,
CredentialsMetaInput,
UserOnboarding,
)
from backend.data.model import CredentialsMetaInput, UserOnboarding
from backend.data.notifications import NotificationPreference, NotificationPreferenceDTO
from backend.data.onboarding import (
FrontendOnboardingStep,

View File

@@ -20,10 +20,8 @@ import jsonref
import jsonschema
from pydantic import BaseModel
from backend.data.block import BlockInput, BlockOutput, BlockOutputEntry
from backend.data.model import (
BlockInput,
BlockOutput,
BlockOutputEntry,
Credentials,
CredentialsFieldInfo,
CredentialsMetaInput,

View File

@@ -9,9 +9,7 @@ from typing import Any, Optional
from prisma.enums import ReviewStatus
from pydantic import BaseModel
from backend.data.execution import ExecutionStatus
from backend.data.human_review import ReviewResult
from backend.executor.manager import async_update_node_execution_status
from backend.util.clients import get_database_manager_async_client
logger = logging.getLogger(__name__)
@@ -43,6 +41,8 @@ class HITLReviewHelper:
@staticmethod
async def update_node_execution_status(**kwargs) -> None:
"""Update the execution status of a node."""
from backend.executor.manager import async_update_node_execution_status
await async_update_node_execution_status(
db_client=get_database_manager_async_client(), **kwargs
)
@@ -88,12 +88,13 @@ class HITLReviewHelper:
Raises:
Exception: If review creation or status update fails
"""
from backend.data.execution import ExecutionStatus
# Note: Safe mode checks (human_in_the_loop_safe_mode, sensitive_action_safe_mode)
# are handled by the caller:
# - HITL blocks check human_in_the_loop_safe_mode in their run() method
# - Sensitive action blocks check sensitive_action_safe_mode in is_block_exec_need_review()
# This function only handles checking for existing approvals.
# Check if this node has already been approved (normal or auto-approval)
if approval_result := await HITLReviewHelper.check_approval(
node_exec_id=node_exec_id,

View File

@@ -1,23 +1,32 @@
import logging
from typing import TYPE_CHECKING, Any, AsyncGenerator
from prisma.models import AgentBlock
from prisma.types import AgentBlockCreateInput
from backend.blocks import get_blocks
from backend.blocks._base import Block
from backend.util import json
if TYPE_CHECKING:
from backend.blocks._base import AnyBlockSchema
logger = logging.getLogger(__name__)
BlockInput = dict[str, Any] # Input: 1 input pin <- 1 data.
BlockOutputEntry = tuple[str, Any] # Output data should be a tuple of (name, value).
BlockOutput = AsyncGenerator[BlockOutputEntry, None] # Output: 1 output pin -> N data.
CompletedBlockOutput = dict[str, list[Any]] # Completed stream, collected as a dict.
async def initialize_blocks() -> None:
from backend.blocks import get_blocks
from backend.sdk.cost_integration import sync_all_provider_costs
from backend.util.retry import func_retry
sync_all_provider_costs()
@func_retry
async def sync_block_to_db(block: Block) -> None:
async def sync_block_to_db(block: "AnyBlockSchema") -> None:
existing_block = await AgentBlock.prisma().find_first(
where={"OR": [{"id": block.id}, {"name": block.name}]}
)

View File

@@ -4,7 +4,6 @@ from collections import defaultdict
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import (
TYPE_CHECKING,
Annotated,
Any,
AsyncGenerator,
@@ -41,7 +40,6 @@ from pydantic.fields import Field
from backend.blocks import get_block, get_io_block_ids, get_webhook_block_ids
from backend.blocks._base import BlockType
from backend.data.model import BlockInput, CompletedBlockOutput
from backend.util import type as type_utils
from backend.util.exceptions import DatabaseError
from backend.util.json import SafeJson
@@ -50,6 +48,7 @@ from backend.util.retry import func_retry
from backend.util.settings import Config
from backend.util.truncate import truncate
from .block import BlockInput, CompletedBlockOutput
from .db import BaseDbModel, query_raw_with_schema
from .event_bus import AsyncRedisEventBus, RedisEventBus
from .includes import (
@@ -58,10 +57,12 @@ from .includes import (
GRAPH_EXECUTION_INCLUDE_WITH_NODES,
graph_execution_include,
)
from .model import CredentialsMetaInput, GraphExecutionStats, NodeExecutionStats
if TYPE_CHECKING:
pass
from .model import (
CredentialsMetaInput,
GraphExecutionStats,
GraphInput,
NodeExecutionStats,
)
T = TypeVar("T")
@@ -162,7 +163,7 @@ class GraphExecutionMeta(BaseDbModel):
user_id: str
graph_id: str
graph_version: int
inputs: Optional[BlockInput] # no default -> required in the OpenAPI spec
inputs: Optional[GraphInput] # no default -> required in the OpenAPI spec
credential_inputs: Optional[dict[str, CredentialsMetaInput]]
nodes_input_masks: Optional[dict[str, BlockInput]]
preset_id: Optional[str]
@@ -267,7 +268,7 @@ class GraphExecutionMeta(BaseDbModel):
user_id=_graph_exec.userId,
graph_id=_graph_exec.agentGraphId,
graph_version=_graph_exec.agentGraphVersion,
inputs=cast(BlockInput | None, _graph_exec.inputs),
inputs=cast(GraphInput | None, _graph_exec.inputs),
credential_inputs=(
{
name: CredentialsMetaInput.model_validate(cmi)
@@ -309,7 +310,7 @@ class GraphExecutionMeta(BaseDbModel):
class GraphExecution(GraphExecutionMeta):
inputs: BlockInput # type: ignore - incompatible override is intentional
inputs: GraphInput # type: ignore - incompatible override is intentional
outputs: CompletedBlockOutput
@staticmethod
@@ -442,7 +443,7 @@ class NodeExecutionResult(BaseModel):
for name, messages in stats.cleared_inputs.items():
input_data[name] = messages[-1] if messages else ""
elif _node_exec.executionData:
input_data = type_utils.convert(_node_exec.executionData, dict[str, Any])
input_data = type_utils.convert(_node_exec.executionData, BlockInput)
else:
input_data: BlockInput = defaultdict()
for data in _node_exec.Input or []:
@@ -862,7 +863,7 @@ async def upsert_execution_output(
async def get_execution_outputs_by_node_exec_id(
node_exec_id: str,
) -> dict[str, Any]:
) -> CompletedBlockOutput:
"""
Get all execution outputs for a specific node execution ID.
@@ -1493,7 +1494,7 @@ async def get_graph_execution_by_share_token(
# The executionData contains the structured input with 'name' and 'value' fields
if hasattr(node_exec, "executionData") and node_exec.executionData:
exec_data = type_utils.convert(
node_exec.executionData, dict[str, Any]
node_exec.executionData, BlockInput
)
if "name" in exec_data:
name = exec_data["name"]

View File

@@ -28,23 +28,19 @@ from backend.blocks._base import Block, BlockType, EmptySchema
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.io import AgentInputBlock, AgentOutputBlock
from backend.blocks.llm import LlmModel
from backend.data.db import prisma as db
from backend.data.dynamic_fields import is_tool_pin, sanitize_pin_name
from backend.data.includes import MAX_GRAPH_VERSIONS_FETCH
from backend.data.model import (
BlockInput,
CredentialsFieldInfo,
CredentialsMetaInput,
is_credentials_field_name,
)
from backend.integrations.providers import ProviderName
from backend.util import type as type_utils
from backend.util.exceptions import GraphNotAccessibleError, GraphNotInLibraryError
from backend.util.json import SafeJson
from backend.util.models import Pagination
from .db import BaseDbModel, query_raw_with_schema, transaction
from .includes import AGENT_GRAPH_INCLUDE, AGENT_NODE_INCLUDE
from .block import BlockInput
from .db import BaseDbModel
from .db import prisma as db
from .db import query_raw_with_schema, transaction
from .dynamic_fields import is_tool_pin, sanitize_pin_name
from .includes import AGENT_GRAPH_INCLUDE, AGENT_NODE_INCLUDE, MAX_GRAPH_VERSIONS_FETCH
from .model import CredentialsFieldInfo, CredentialsMetaInput, is_credentials_field_name
if TYPE_CHECKING:
from backend.blocks._base import AnyBlockSchema
@@ -147,7 +143,7 @@ class NodeModel(Node):
obj = NodeModel(
id=node.id,
block_id=node.agentBlockId,
input_default=type_utils.convert(node.constantInput, dict[str, Any]),
input_default=type_utils.convert(node.constantInput, BlockInput),
metadata=type_utils.convert(node.metadata, dict[str, Any]),
graph_id=node.agentGraphId,
graph_version=node.agentGraphVersion,
@@ -189,8 +185,8 @@ class NodeModel(Node):
@staticmethod
def _filter_secrets_from_node_input(
input_data: dict[str, Any], schema: dict[str, Any] | None
) -> dict[str, Any]:
input_data: BlockInput, schema: dict[str, Any] | None
) -> BlockInput:
sensitive_keys = ["credentials", "api_key", "password", "token", "secret"]
field_schemas = schema.get("properties", {}) if schema else {}
result = {}

View File

@@ -10,7 +10,6 @@ from typing import (
TYPE_CHECKING,
Annotated,
Any,
AsyncGenerator,
Callable,
ClassVar,
Generic,
@@ -169,10 +168,7 @@ T = TypeVar("T")
logger = logging.getLogger(__name__)
BlockInput = dict[str, Any] # Input: 1 input pin <- 1 data.
BlockOutputEntry = tuple[str, Any] # Output data should be a tuple of (name, value).
BlockOutput = AsyncGenerator[BlockOutputEntry, None] # Output: 1 output pin -> N data.
CompletedBlockOutput = dict[str, list[Any]] # Completed stream, collected as a dict.
GraphInput = dict[str, Any]
class BlockSecret:

View File

@@ -21,6 +21,7 @@ from backend.blocks._base import BlockSchema
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.io import AgentOutputBlock
from backend.data import redis_client as redis
from backend.data.block import BlockInput, BlockOutput, BlockOutputEntry
from backend.data.credit import UsageTransactionMetadata
from backend.data.dynamic_fields import parse_execution_output
from backend.data.execution import (
@@ -34,13 +35,7 @@ from backend.data.execution import (
NodesInputMasks,
)
from backend.data.graph import Link, Node
from backend.data.model import (
BlockInput,
BlockOutput,
BlockOutputEntry,
GraphExecutionStats,
NodeExecutionStats,
)
from backend.data.model import GraphExecutionStats, NodeExecutionStats
from backend.data.notifications import (
AgentRunData,
LowBalanceData,

View File

@@ -25,7 +25,7 @@ from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import MetaData, create_engine
from backend.data.execution import GraphExecutionWithNodes
from backend.data.model import BlockInput, CredentialsMetaInput
from backend.data.model import CredentialsMetaInput, GraphInput
from backend.executor import utils as execution_utils
from backend.monitoring import (
NotificationJobArgs,
@@ -386,7 +386,7 @@ class GraphExecutionJobArgs(BaseModel):
graph_version: int
agent_name: str | None = None
cron: str
input_data: BlockInput
input_data: GraphInput
input_credentials: dict[str, CredentialsMetaInput] = Field(default_factory=dict)
@@ -648,7 +648,7 @@ class Scheduler(AppService):
graph_id: str,
graph_version: int,
cron: str,
input_data: BlockInput,
input_data: GraphInput,
input_credentials: dict[str, CredentialsMetaInput],
name: Optional[str] = None,
user_timezone: str | None = None,

View File

@@ -15,10 +15,11 @@ from backend.data import graph as graph_db
from backend.data import human_review as human_review_db
from backend.data import onboarding as onboarding_db
from backend.data import user as user_db
from backend.data.block_cost_config import BLOCK_COSTS
from backend.data.db import prisma
# Import dynamic field utilities from centralized location
from backend.data.block import BlockInput, BlockOutputEntry
from backend.data.block_cost_config import BLOCK_COSTS
from backend.data.db import prisma
from backend.data.dynamic_fields import merge_execution_input
from backend.data.execution import (
ExecutionContext,
@@ -29,12 +30,7 @@ from backend.data.execution import (
NodesInputMasks,
)
from backend.data.graph import GraphModel, Node
from backend.data.model import (
USER_TIMEZONE_NOT_SET,
BlockInput,
BlockOutputEntry,
CredentialsMetaInput,
)
from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput, GraphInput
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.util.clients import (
get_async_execution_event_bus,
@@ -425,7 +421,7 @@ async def validate_graph_with_credentials(
async def _construct_starting_node_execution_input(
graph: GraphModel,
user_id: str,
graph_inputs: BlockInput,
graph_inputs: GraphInput,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> tuple[list[tuple[str, BlockInput]], set[str]]:
"""
@@ -437,7 +433,7 @@ async def _construct_starting_node_execution_input(
Args:
graph (GraphModel): The graph model to execute.
user_id (str): The ID of the user executing the graph.
data (BlockInput): The input data for the graph execution.
data (GraphInput): The input data for the graph execution.
node_credentials_map: `dict[node_id, dict[input_name, CredentialsMetaInput]]`
Returns:
@@ -495,7 +491,7 @@ async def _construct_starting_node_execution_input(
async def validate_and_construct_node_execution_input(
graph_id: str,
user_id: str,
graph_inputs: BlockInput,
graph_inputs: GraphInput,
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
@@ -795,7 +791,7 @@ async def stop_graph_execution(
async def add_graph_execution(
graph_id: str,
user_id: str,
inputs: Optional[BlockInput] = None,
inputs: Optional[GraphInput] = None,
preset_id: Optional[str] = None,
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,