feat: baseline backend for optional

This commit is contained in:
Nicholas Tindle
2025-09-25 16:24:19 -05:00
parent 0b267f573e
commit 03863219a3
6 changed files with 173 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ from backend.data.block import (
BlockType,
)
from backend.data.model import NodeExecutionStats, SchemaField
from backend.data.optional_block import get_optional_config
from backend.util import json
from backend.util.clients import get_database_manager_async_client
@@ -428,6 +429,17 @@ class SmartDecisionMakerBlock(Block):
for sink_node, links in grouped_tool_links.values():
if not sink_node:
raise ValueError(f"Sink node not found: {links[0].sink_id}")
# todo: use the renamed value of metadata when available
# Check if this node is marked as optional and should be skipped
optional_config = get_optional_config(sink_node.metadata)
if optional_config and optional_config.enabled:
# For now, we'll filter out nodes that are marked as optional
# In future, we can add more sophisticated checks here
# based on the optional conditions (credentials, flags, etc.)
# TODO: Add runtime checks for whether node should be skipped
continue
if sink_node.block_id == AgentExecutorBlock().id:
return_tool_functions.append(

View File

@@ -115,6 +115,10 @@ VALID_STATUS_TRANSITIONS = {
ExecutionStatus.QUEUED,
ExecutionStatus.RUNNING,
],
ExecutionStatus.SKIPPED: [
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
],
}

View File

@@ -0,0 +1,59 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class ConditionOperator(str, Enum):
AND = "and"
OR = "or"
class OptionalBlockConditions(BaseModel):
"""Conditions that determine when a block should be skipped"""
on_missing_credentials: bool = Field(
default=False,
description="Skip block if any required credentials are missing",
)
input_flag: Optional[str] = Field(
default=None,
description="Name of boolean agent input field that controls skip behavior",
)
kv_flag: Optional[str] = Field(
default=None,
description="Key-value store flag name that controls skip behavior",
)
operator: ConditionOperator = Field(
default=ConditionOperator.OR,
description="Logical operator for combining conditions (AND/OR)",
)
class OptionalBlockConfig(BaseModel):
"""Configuration for making a block optional/skippable"""
enabled: bool = Field(
default=False,
description="Whether this block can be optionally skipped",
)
conditions: OptionalBlockConditions = Field(
default_factory=OptionalBlockConditions,
description="Conditions that trigger skipping",
)
skip_message: Optional[str] = Field(
default=None,
description="Custom message to log when block is skipped",
)
def get_optional_config(node_metadata: dict) -> Optional[OptionalBlockConfig]:
"""Extract optional block configuration from node metadata"""
if "optional" not in node_metadata:
return None
optional_data = node_metadata.get("optional", {})
if not optional_data:
return None
return OptionalBlockConfig(**optional_data)

View File

@@ -55,6 +55,7 @@ from backend.data.execution import (
UserContext,
)
from backend.data.graph import Link, Node
from backend.data.optional_block import get_optional_config
from backend.executor.utils import (
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
@@ -126,6 +127,73 @@ def execute_graph(
T = TypeVar("T")
async def should_skip_node(
node: Node,
creds_manager: IntegrationCredentialsManager,
user_id: str,
user_context: UserContext,
input_data: BlockInput,
) -> tuple[bool, str]:
"""
Check if a node should be skipped based on optional configuration.
Returns:
Tuple of (should_skip, skip_reason)
"""
optional_config = get_optional_config(node.metadata)
if not optional_config or not optional_config.enabled:
return False, ""
conditions = optional_config.conditions
skip_reasons = []
conditions_met = []
# Check credential availability
if conditions.on_missing_credentials:
node_block = node.block
input_model = cast(type[BlockSchema], node_block.input_schema)
for field_name, input_type in input_model.get_credentials_fields().items():
if field_name in input_data:
credentials_meta = input_type(**input_data[field_name])
# Check if credentials exist without acquiring lock
if not await creds_manager.exists(user_id, credentials_meta.id):
skip_reasons.append(f"Missing credentials: {field_name}")
conditions_met.append(True)
break
else:
# All credentials exist
if conditions.on_missing_credentials:
conditions_met.append(False)
# Check input flag
if conditions.input_flag and conditions.input_flag in input_data:
flag_value = input_data.get(conditions.input_flag, False)
if flag_value is True: # Skip if flag is True
skip_reasons.append(f"Input flag '{conditions.input_flag}' is true")
conditions_met.append(True)
else:
conditions_met.append(False)
# Check key-value flag
if conditions.kv_flag:
# TODO: Implement key-value store check once available
# For now, we'll skip this condition
pass
# Apply logical operator
if not conditions_met:
return False, ""
if conditions.operator == "and":
should_skip = all(conditions_met)
else: # OR
should_skip = any(conditions_met)
skip_message = optional_config.skip_message or "; ".join(skip_reasons)
return should_skip, skip_message
async def execute_node(
node: Node,
creds_manager: IntegrationCredentialsManager,
@@ -511,6 +579,25 @@ class ExecutionProcessor:
)
try:
# Check if node should be skipped
should_skip, skip_reason = await should_skip_node(
node=node,
creds_manager=self.creds_manager,
user_id=node_exec.user_id,
user_context=node_exec.user_context,
input_data=node_exec.inputs,
)
if should_skip:
log_metadata.info(f"Skipping node execution {node_exec.node_exec_id}: {skip_reason}")
await async_update_node_execution_status(
db_client=db_client,
exec_id=node_exec.node_exec_id,
status=ExecutionStatus.SKIPPED,
stats={"skip_reason": skip_reason},
)
return ExecutionStatus.SKIPPED
log_metadata.info(f"Start node execution {node_exec.node_exec_id}")
await async_update_node_execution_status(
db_client=db_client,

View File

@@ -0,0 +1,10 @@
-- Migration: Add SKIPPED status to AgentExecutionStatus enum
-- This migration adds support for conditional/optional block execution
-- Add SKIPPED value to the AgentExecutionStatus enum
ALTER TYPE "AgentExecutionStatus" ADD VALUE 'SKIPPED';
-- Note: This migration is irreversible in PostgreSQL.
-- Enum values cannot be removed once added.
-- To run this migration, execute:
-- cd autogpt_platform/backend && poetry run prisma migrate dev --name add-skipped-execution-status

View File

@@ -339,6 +339,7 @@ enum AgentExecutionStatus {
COMPLETED
TERMINATED
FAILED
SKIPPED
}
// This model describes the execution of an AgentGraph.