refactor(backend): move dry-run credential logic from manager.py to simulator.py

- OrchestratorBlock now uses platform simulation model + OpenRouter key
  instead of user's model/credentials during dry-run
- Credential restore + fallback-to-simulation logic moved into
  prepare_dry_run() and get_dry_run_credentials() in simulator.py
- manager.py reduced by ~30 lines of business logic
- Falls back to LLM simulation if platform OpenRouter key unavailable
This commit is contained in:
Zamil Majdy
2026-04-02 07:10:28 +02:00
parent 89c7f34d26
commit 9f2257daaa
3 changed files with 123 additions and 58 deletions

View File

@@ -81,7 +81,7 @@ from backend.util.settings import Settings
from .activity_status_generator import generate_activity_status_for_execution
from .automod.manager import automod_manager
from .cluster_lock import ClusterLock
from .simulator import prepare_dry_run, simulate_block
from .simulator import get_dry_run_credentials, prepare_dry_run, simulate_block
from .utils import (
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
@@ -279,43 +279,20 @@ async def execute_node(
"nodes_to_skip": nodes_to_skip or set(),
}
# For special blocks in dry-run (OrchestratorBlock, AgentExecutorBlock),
# prepare_dry_run returns a (possibly modified) copy of input_data so the
# block executes for real. For all other blocks it returns None -> use
# LLM simulator.
# For special blocks in dry-run, prepare_dry_run returns a (possibly
# modified) copy of input_data so the block executes for real. For all
# other blocks it returns None -> use LLM simulator.
# OrchestratorBlock uses the platform's simulation model + OpenRouter key
# so no user credentials are needed.
_dry_run_input: dict[str, Any] | None = None
if execution_context.dry_run:
_dry_run_input = prepare_dry_run(node_block, input_data)
if _dry_run_input is not None:
pre_dry_run_input = input_data # Save in case we need to fall back
input_data = _dry_run_input
# Restore credential fields from node defaults so the block can
# acquire credentials during dry-run.
for field_name in cast(
type[BlockSchema], node_block.input_schema
).get_credentials_fields():
default_value = node.input_default.get(field_name)
if default_value is not None and not input_data.get(field_name):
input_data[field_name] = default_value
# If any required credentials fields are still missing after restoring
# from node defaults, fall back to LLM simulation instead of attempting
# real execution that would fail with "credentials is a required property".
creds_fields = cast(
type[BlockSchema], node_block.input_schema
).get_credentials_fields()
if creds_fields:
missing_creds = any(
not input_data.get(f)
or (isinstance(input_data.get(f), dict) and not input_data[f].get("id"))
for f in creds_fields
)
if missing_creds:
log_metadata.info(
"Dry-run: credentials not configured, falling back to simulation"
)
_dry_run_input = None
input_data = pre_dry_run_input
# Check for dry-run platform credentials (OrchestratorBlock uses the
# platform's OpenRouter key instead of user credentials).
_dry_run_creds = get_dry_run_credentials(input_data) if _dry_run_input else None
# Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
# changes during execution. ⚠️ This means a set of credentials can only be used by
@@ -326,6 +303,12 @@ async def execute_node(
# Handle regular credentials fields
for field_name, input_type in input_model.get_credentials_fields().items():
# Dry-run platform credentials bypass the credential store
if _dry_run_creds is not None:
input_data[field_name] = None
extra_exec_kwargs[field_name] = _dry_run_creds
continue
field_value = input_data.get(field_name)
if not field_value or (
isinstance(field_value, dict) and not field_value.get("id")

View File

@@ -6,10 +6,12 @@ role-plays the block's execution using an LLM. For most blocks no real
API calls or side effects occur.
Special cases (no LLM simulation needed):
- OrchestratorBlock executes for real with the user's own model/credentials
(iterations capped to 1).
- OrchestratorBlock executes for real with the platform's simulation model
(iterations capped to 1). Uses the platform OpenRouter key so no user
credentials are required. Falls back to LLM simulation if the platform
key is unavailable.
- AgentExecutorBlock executes for real so it can spawn child graph executions
(whose blocks are then simulated).
(whose blocks are then simulated). No credentials needed.
- AgentInputBlock (and all subclasses) and AgentOutputBlock are pure
passthrough -- they forward their input values directly.
- MCPToolBlock is simulated via the generic LLM prompt (with run() source code).
@@ -283,41 +285,89 @@ Available output pins: {json.dumps(required_output_properties)}
# ---------------------------------------------------------------------------
def _get_platform_openrouter_key() -> str | None:
"""Return the platform's OpenRouter API key, or None if unavailable."""
try:
from backend.util.settings import Settings # noqa: PLC0415
key = Settings().secrets.open_router_api_key
return key if key else None
except Exception:
return None
def prepare_dry_run(block: Any, input_data: dict[str, Any]) -> dict[str, Any] | None:
"""Prepare *input_data* for a dry-run execution of *block*.
Returns a **modified copy** of *input_data* for blocks that should execute
for real with cheap settings (e.g. OrchestratorBlock), or ``None`` when the
block should be LLM-simulated instead.
for real with cheap settings, or ``None`` when the block should be
LLM-simulated instead.
AgentExecutorBlock also executes for real so it can spawn a child graph
execution. The child graph's blocks will be simulated because the
execution context inherits ``dry_run=True`` (see ``add_graph_execution``
in utils.py).
- **OrchestratorBlock** executes for real with the platform's simulation
model (iterations capped to 1). Uses the platform OpenRouter key so no
user credentials are needed. Falls back to LLM simulation if the
platform key is unavailable.
- **AgentExecutorBlock** executes for real so it can spawn a child graph
execution. The child graph inherits ``dry_run=True`` and its blocks
are simulated. No credentials are needed.
"""
if isinstance(block, OrchestratorBlock):
# Preserve the user's configured mode: 0 means traditional (single
# LLM call, no agent loop). Only override to 1 when the user chose
# agent mode (non-zero) so the dry run still exercises the loop once
# without running away.
# Keep the user's own model and credentials -- swapping to a different
# model can cause credential mismatches (e.g. Anthropic key vs OpenAI
# model).
or_key = _get_platform_openrouter_key()
if not or_key:
logger.info(
"Dry-run: no platform OpenRouter key, "
"falling back to LLM simulation for OrchestratorBlock"
)
return None
original = input_data.get("agent_mode_max_iterations", 0)
max_iters = 1 if original != 0 else 0
sim_model = _simulator_model()
return {
**input_data,
"agent_mode_max_iterations": max_iters,
"model": sim_model,
# Signal to manager.py that credentials should be skipped —
# the platform key is injected via _dry_run_credentials().
"credentials": None,
"_dry_run_api_key": or_key,
}
if isinstance(block, AgentExecutorBlock):
# AgentExecutorBlock spawns a child graph execution. No input
# modifications are needed -- the child graph inherits dry_run=True
# from the execution context and its blocks will be simulated.
# Return a copy so the caller knows this is a passthrough block.
return {**input_data}
return None
def get_dry_run_credentials(
input_data: dict[str, Any],
) -> Any | None:
"""Build an ``APIKeyCredentials`` for dry-run OrchestratorBlock execution.
Returns credentials using the platform's OpenRouter key (injected by
``prepare_dry_run``), or ``None`` if not a dry-run override.
"""
api_key = input_data.pop("_dry_run_api_key", None)
if not api_key:
return None
try:
from backend.blocks.llm import APIKeyCredentials # noqa: PLC0415
from backend.integrations.providers import ProviderName # noqa: PLC0415
return APIKeyCredentials(
id="dry-run-platform",
provider=ProviderName.OPEN_ROUTER,
api_key=api_key,
title="Dry-run simulation",
expires_at=None,
)
except Exception:
logger.warning("Failed to create dry-run credentials", exc_info=True)
return None
async def simulate_block(
block: Any,
input_data: dict[str, Any],

View File

@@ -139,25 +139,57 @@ class TestBuildSimulationPrompt:
class TestPrepareDryRun:
def test_orchestrator_block_caps_iterations(self) -> None:
def test_orchestrator_uses_simulation_model(self) -> None:
"""OrchestratorBlock should use the simulation model and cap iterations."""
from unittest.mock import patch
from backend.blocks.orchestrator import OrchestratorBlock
block = OrchestratorBlock()
result = prepare_dry_run(
block, {"agent_mode_max_iterations": 10, "other": "val"}
)
with patch(
"backend.executor.simulator._get_platform_openrouter_key",
return_value="sk-or-test-key",
):
result = prepare_dry_run(
block,
{"agent_mode_max_iterations": 10, "model": "gpt-4o", "other": "val"},
)
assert result is not None
assert result["agent_mode_max_iterations"] == 1
assert result["other"] == "val"
assert result["model"] != "gpt-4o" # overridden to simulation model
assert result["credentials"] is None
assert result["_dry_run_api_key"] == "sk-or-test-key"
def test_orchestrator_zero_stays_zero(self) -> None:
from unittest.mock import patch
def test_orchestrator_block_zero_stays_zero(self) -> None:
from backend.blocks.orchestrator import OrchestratorBlock
block = OrchestratorBlock()
result = prepare_dry_run(block, {"agent_mode_max_iterations": 0})
with patch(
"backend.executor.simulator._get_platform_openrouter_key",
return_value="sk-or-test-key",
):
result = prepare_dry_run(block, {"agent_mode_max_iterations": 0})
assert result is not None
assert result["agent_mode_max_iterations"] == 0
def test_orchestrator_falls_back_without_key(self) -> None:
"""Without platform OpenRouter key, OrchestratorBlock falls back
to LLM simulation (returns None)."""
from unittest.mock import patch
from backend.blocks.orchestrator import OrchestratorBlock
block = OrchestratorBlock()
with patch(
"backend.executor.simulator._get_platform_openrouter_key",
return_value=None,
):
result = prepare_dry_run(block, {"agent_mode_max_iterations": 5})
assert result is None
def test_agent_executor_block_passthrough(self) -> None:
from backend.blocks.agent import AgentExecutorBlock