fix(backend): propagate dry-run mode to special blocks with LLM-powered simulation (#12575)

## Summary
- **OrchestratorBlock & AgentExecutorBlock** now execute for real in
dry-run mode so the orchestrator can make LLM calls and agent executors
can spawn child graphs. Their downstream tool blocks and child-graph
blocks are still simulated via `simulate_block()`. Credential fields
from node defaults are restored since `validate_exec()` wipes them in
dry-run mode. Agent-mode iterations capped at 1 in dry-run.
- **All other blocks** (including MCPToolBlock) are simulated via a
single generic `simulate_block()` path. The LLM prompt is grounded by
`inspect.getsource(block.run)`, giving the simulator access to the exact
implementation of each block's `run()` method. This produces realistic
mock responses for any block type without needing block-specific
simulation logic.
- Updated agent generation guide to document special block dry-run
behavior.
- Minor frontend fixes: exported `formatCents` from
`RateLimitResetDialog` for reuse in `UsagePanelContent`, used `useRef`
for stable callback references in `useResetRateLimit` to avoid stale
closures.
- 74 tests (21 existing dry-run + 53 new simulator tests covering prompt
building, passthrough logic, and special block dry-run).

## Design

The simulator (`backend/executor/simulator.py`) uses a two-tier
approach:

1. **Passthrough blocks** (OrchestratorBlock, AgentExecutorBlock):
`prepare_dry_run()` returns modified input_data so these blocks execute
for real in `manager.py`. OrchestratorBlock gets `max_iterations=1`
(agent mode) or 0 (traditional mode). AgentExecutorBlock spawns real
child graph executions whose blocks inherit `dry_run=True`.

2. **All other blocks**: `simulate_block()` builds an LLM prompt
containing:
   - Block name and description
   - Input/output schemas (JSON Schema)
   - The block's `run()` source code via `inspect.getsource(block.run)`
- The actual input values (with credentials stripped and long values
truncated)

The LLM then role-plays the block's execution, producing realistic
outputs grounded in the actual implementation.
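The grounding idea can be sketched as follows; this is a simplified stand-in for `build_simulation_prompt()` (the real prompt also includes the JSON Schemas, truncation, and credential stripping), with the `OSError` fallback shown because `inspect.getsource` fails for blocks without an importable source file:

```python
import inspect
import json
from typing import Any


def build_prompt_sketch(block: Any, input_data: dict) -> tuple[str, str]:
    """Minimal sketch: ground the simulator in the block's actual run()."""
    try:
        # The exact implementation the LLM will role-play.
        run_source = inspect.getsource(block.run)
    except (OSError, TypeError):
        run_source = "(source unavailable)"
    system_prompt = (
        f'You are simulating the execution of a block called "{block.name}".\n'
        f"## Description\n{block.description}\n"
        f"## run() implementation\n{run_source}\n"
        "Respond with a single JSON object of output pin values."
    )
    user_prompt = f"## Current Inputs\n{json.dumps(input_data, indent=2)}"
    return system_prompt, user_prompt
```

Because the prompt carries the real `run()` body, the simulator tracks implementation changes automatically; no per-block simulation code has to be maintained.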

Special handling for input/output blocks: `AgentInputBlock` and
`AgentOutputBlock` are pure passthrough (no LLM call needed).
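The passthrough behavior, condensed from the tests in this PR into a hypothetical helper (the real logic lives inside `simulate_block()`; the pin names and the value-to-name fallback match the test expectations):

```python
from typing import Any, Iterator


def simulate_io_passthrough(
    block_name: str, input_data: dict
) -> Iterator[tuple[str, Any]]:
    # Input blocks forward their value on the "result" pin; a None value
    # falls back to the input's name. Output blocks forward value/name.
    if block_name.endswith("InputBlock"):
        value = input_data.get("value")
        yield ("result", value if value is not None else input_data.get("name"))
    elif block_name.endswith("OutputBlock"):
        yield ("output", input_data.get("value"))
        if "name" in input_data:
            yield ("name", input_data["name"])
```

Skipping the LLM here matters: every graph has at least one input and one output block, so pure passthrough removes two LLM round-trips from even the smallest dry run.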

## Test plan
- [x] All 74 tests pass (`pytest backend/copilot/tools/test_dry_run.py
backend/executor/simulator_test.py`)
- [x] Pre-commit hooks pass (ruff, isort, black, pyright, frontend
typecheck)
- [x] CI: all checks green
- [x] E2E: dry-run execution completes with `is_dry_run=true`, cost=0,
no errors
- [x] E2E: normal (non-dry-run) execution unchanged
- [x] E2E: Create agent with OrchestratorBlock + tool blocks, run with
`dry_run=True`, verify orchestrator makes real LLM calls while tool
blocks are simulated
- [x] E2E: AgentExecutorBlock spawns child graph in dry-run, child
blocks are LLM-simulated
- [x] E2E: Builder simulate button works end-to-end with special blocks

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Zamil Majdy
Date: 2026-04-02 19:09:55 +02:00 (committed by GitHub)
Parent: f115607779
Commit: f1ac05b2e0
20 changed files with 1216 additions and 171 deletions


@@ -550,6 +550,8 @@ async def reset_copilot_usage(
try:
# Verify the user is actually at or over their daily limit.
# (rate_limit_reset_cost intentionally omitted — this object is only
# used for limit checks, not returned to the client.)
usage_status = await get_usage_status(
user_id=user_id,
daily_token_limit=daily_limit,


@@ -481,6 +481,11 @@ async def create_library_agent(
sensitive_action_safe_mode=sensitive_action_safe_mode,
).model_dump()
),
**(
{"Folder": {"connect": {"id": folder_id}}}
if folder_id and graph_entry is graph
else {}
),
},
},
include=library_agent_include(


@@ -698,13 +698,30 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
if should_pause:
return
# Validate the input data (original or reviewer-modified) once
if error := self.input_schema.validate_data(input_data):
raise BlockInputError(
message=f"Unable to execute block with invalid input data: {error}",
block_name=self.name,
block_id=self.id,
)
# Validate the input data (original or reviewer-modified) once.
# In dry-run mode, credential fields may contain sentinel None values
# that would fail JSON schema required checks. We still validate the
# non-credential fields so blocks that execute for real during dry-run
# (e.g. AgentExecutorBlock) get proper input validation.
is_dry_run = getattr(kwargs.get("execution_context"), "dry_run", False)
if is_dry_run:
cred_field_names = set(self.input_schema.get_credentials_fields().keys())
non_cred_data = {
k: v for k, v in input_data.items() if k not in cred_field_names
}
if error := self.input_schema.validate_data(non_cred_data):
raise BlockInputError(
message=f"Unable to execute block with invalid input data: {error}",
block_name=self.name,
block_id=self.id,
)
else:
if error := self.input_schema.validate_data(input_data):
raise BlockInputError(
message=f"Unable to execute block with invalid input data: {error}",
block_name=self.name,
block_id=self.id,
)
# Use the validated input data
async for output_name, output_data in self.run(


@@ -49,11 +49,17 @@ class AgentExecutorBlock(Block):
@classmethod
def get_missing_input(cls, data: BlockInput) -> set[str]:
required_fields = cls.get_input_schema(data).get("required", [])
return set(required_fields) - set(data)
# Check against the nested `inputs` dict, not the top-level node
# data — required fields like "topic" live inside data["inputs"],
# not at data["topic"].
provided = data.get("inputs", {})
return set(required_fields) - set(provided)
@classmethod
def get_mismatch_error(cls, data: BlockInput) -> str | None:
return validate_with_jsonschema(cls.get_input_schema(data), data)
return validate_with_jsonschema(
cls.get_input_schema(data), data.get("inputs", {})
)
class Output(BlockSchema):
# Use BlockSchema to avoid automatic error field that could clash with graph outputs
@@ -88,6 +94,7 @@ class AgentExecutorBlock(Block):
execution_context=execution_context.model_copy(
update={"parent_execution_id": graph_exec_id},
),
dry_run=execution_context.dry_run,
)
logger = execution_utils.LogMetadata(
@@ -149,14 +156,19 @@ class AgentExecutorBlock(Block):
ExecutionStatus.TERMINATED,
ExecutionStatus.FAILED,
]:
logger.debug(
f"Execution {log_id} received event {event.event_type} with status {event.status}"
logger.info(
f"Execution {log_id} skipping event {event.event_type} status={event.status} "
f"node={getattr(event, 'node_exec_id', '?')}"
)
continue
if event.event_type == ExecutionEventType.GRAPH_EXEC_UPDATE:
# If the graph execution is COMPLETED, TERMINATED, or FAILED,
# we can stop listening for further events.
logger.info(
f"Execution {log_id} graph completed with status {event.status}, "
f"yielded {len(yielded_node_exec_ids)} outputs"
)
self.merge_stats(
NodeExecutionStats(
extra_cost=event.stats.cost if event.stats else 0,


@@ -89,6 +89,12 @@ class MCPToolBlock(Block):
default={},
hidden=True,
)
tool_description: str = SchemaField(
description="Description of the selected MCP tool. "
"Populated automatically when a tool is selected.",
default="",
hidden=True,
)
tool_arguments: dict[str, Any] = SchemaField(
description="Arguments to pass to the selected MCP tool. "


@@ -20,6 +20,10 @@ class ChatConfig(BaseSettings):
default="openai/gpt-4o-mini",
description="Model to use for generating session titles (should be fast/cheap)",
)
simulation_model: str = Field(
default="google/gemini-2.5-flash",
description="Model for dry-run block simulation (should be fast/cheap with good JSON output)",
)
api_key: str | None = Field(default=None, description="OpenAI API key")
base_url: str | None = Field(
default=OPENROUTER_BASE_URL,


@@ -161,8 +161,9 @@ async def reset_daily_usage(user_id: str, daily_token_limit: int = 0) -> bool:
daily_token_limit: The configured daily token limit. When positive,
the weekly counter is reduced by this amount.
Fails open: returns False if Redis is unavailable (consistent with
the fail-open design of this module).
Returns False if Redis is unavailable so the caller can handle
compensation (fail-closed for billed operations, unlike the read-only
rate-limit checks which fail-open).
"""
now = datetime.now(UTC)
try:


@@ -70,6 +70,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", _make_config(daily_token_limit=0)),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(0, 12_500_000)),
),
):
with pytest.raises(HTTPException) as exc_info:
await reset_copilot_usage(user_id="user-1")
@@ -83,6 +87,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", cfg),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=0)),
patch(f"{_MODULE}.acquire_reset_lock", AsyncMock(return_value=True)),
patch(f"{_MODULE}.release_reset_lock", AsyncMock()) as mock_release,
@@ -112,6 +120,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", cfg),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=0)),
patch(f"{_MODULE}.acquire_reset_lock", AsyncMock(return_value=True)),
patch(f"{_MODULE}.release_reset_lock", AsyncMock()) as mock_release,
@@ -141,6 +153,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", cfg),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=0)),
patch(f"{_MODULE}.acquire_reset_lock", AsyncMock(return_value=True)),
patch(f"{_MODULE}.release_reset_lock", AsyncMock()),
@@ -171,6 +187,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", cfg),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=3)),
):
with pytest.raises(HTTPException) as exc_info:
@@ -208,6 +228,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", cfg),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=0)),
patch(f"{_MODULE}.acquire_reset_lock", AsyncMock(return_value=True)),
patch(f"{_MODULE}.release_reset_lock", AsyncMock()) as mock_release,
@@ -228,6 +252,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", _make_config()),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=None)),
):
with pytest.raises(HTTPException) as exc_info:
@@ -245,6 +273,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", cfg),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=0)),
patch(f"{_MODULE}.acquire_reset_lock", AsyncMock(return_value=True)),
patch(f"{_MODULE}.release_reset_lock", AsyncMock()),
@@ -275,6 +307,10 @@ class TestResetCopilotUsage:
with (
patch(f"{_MODULE}.config", cfg),
patch(f"{_MODULE}.settings", _mock_settings()),
patch(
f"{_MODULE}.get_global_rate_limits",
AsyncMock(return_value=(2_500_000, 12_500_000)),
),
patch(f"{_MODULE}.get_daily_reset_count", AsyncMock(return_value=0)),
patch(f"{_MODULE}.acquire_reset_lock", AsyncMock(return_value=True)),
patch(f"{_MODULE}.release_reset_lock", AsyncMock()),


@@ -253,6 +253,17 @@ real API calls, credentials, or credits:
3. **Iterate**: If the dry run reveals wiring issues or missing inputs, fix
the agent JSON and re-save before suggesting a real execution.
**Special block behaviour in dry-run mode:**
- **OrchestratorBlock** and **AgentExecutorBlock** execute for real so the
orchestrator can make LLM calls and agent executors can spawn child graphs.
Their downstream tool blocks and child-graph blocks are still simulated.
Note: real LLM inference calls are made (consuming API quota), even though
platform credits are not charged. Agent-mode iterations are capped at 1 in
dry-run to keep it fast.
- **MCPToolBlock** is simulated using the selected tool's name and JSON Schema
so the LLM can produce a realistic mock response without connecting to the
MCP server.
### Example: Simple AI Text Processor
A minimal agent with input, processing, and output:


@@ -10,7 +10,11 @@ import backend.copilot.tools.run_block as run_block_module
from backend.copilot.tools.helpers import execute_block
from backend.copilot.tools.models import BlockOutputResponse, ErrorResponse
from backend.copilot.tools.run_block import RunBlockTool
from backend.executor.simulator import build_simulation_prompt, simulate_block
from backend.executor.simulator import (
build_simulation_prompt,
prepare_dry_run,
simulate_block,
)
# ---------------------------------------------------------------------------
# Helpers
@@ -75,7 +79,8 @@ def make_openai_response(
async def test_simulate_block_basic():
"""simulate_block returns correct (output_name, output_data) tuples.
Empty "error" pins are dropped at source — only non-empty errors are yielded.
Empty error pins should be omitted (not yielded) — only pins with
meaningful values are forwarded.
"""
mock_block = make_mock_block()
mock_client = AsyncMock()
@@ -91,7 +96,7 @@ async def test_simulate_block_basic():
outputs.append((name, data))
assert ("result", "simulated output") in outputs
# Empty error pin is dropped at the simulator level
# Empty error pin should NOT be yielded; the simulator omits empty values
assert ("error", "") not in outputs
@@ -147,7 +152,7 @@ async def test_simulate_block_all_retries_exhausted():
@pytest.mark.asyncio
async def test_simulate_block_missing_output_pins():
"""LLM response missing some output pins; verify non-error pins filled with None."""
"""LLM response missing some output pins; they are omitted (not yielded)."""
mock_block = make_mock_block(
output_props={
"result": {"type": "string"},
@@ -169,30 +174,9 @@ async def test_simulate_block_missing_output_pins():
outputs[name] = data
assert outputs["result"] == "hello"
assert outputs["count"] is None # missing pin filled with None
assert "error" not in outputs # missing error pin is omitted entirely
@pytest.mark.asyncio
async def test_simulate_block_keeps_nonempty_error():
"""simulate_block keeps non-empty error pins (simulated logical errors)."""
mock_block = make_mock_block()
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(
return_value=make_openai_response(
'{"result": "", "error": "API rate limit exceeded"}'
)
)
with patch(
"backend.executor.simulator.get_openai_client", return_value=mock_client
):
outputs = []
async for name, data in simulate_block(mock_block, {"query": "test"}):
outputs.append((name, data))
assert ("result", "") in outputs
assert ("error", "API rate limit exceeded") in outputs
# Missing pins are omitted — only pins with meaningful values are yielded
assert "count" not in outputs
assert "error" not in outputs
@pytest.mark.asyncio
@@ -228,17 +212,19 @@ async def test_simulate_block_truncates_long_inputs():
assert len(parsed["text"]) < 25000
def test_build_simulation_prompt_excludes_error_from_must_include():
"""The 'MUST include' prompt line should NOT list 'error' — the prompt
already instructs the LLM to OMIT error unless simulating a logical error.
Including it in 'MUST include' would be contradictory."""
def test_build_simulation_prompt_lists_available_output_pins():
"""The prompt should list available output pins (excluding error) so the LLM
knows which keys it MUST include. Error is excluded because the prompt
tells the LLM to omit it unless simulating a logical failure."""
block = make_mock_block() # default output_props has "result" and "error"
system_prompt, _ = build_simulation_prompt(block, {"query": "test"})
must_include_line = [
line for line in system_prompt.splitlines() if "MUST include" in line
available_line = [
line for line in system_prompt.splitlines() if "Available output pins" in line
][0]
assert '"result"' in must_include_line
assert '"error"' not in must_include_line
assert '"result"' in available_line
# "error" is intentionally excluded from the required output pins list
# since the prompt instructs the LLM to omit it unless simulating errors
assert '"error"' not in available_line
# ---------------------------------------------------------------------------
@@ -493,3 +479,146 @@ async def test_execute_block_dry_run_simulator_error_returns_error_response():
assert isinstance(response, ErrorResponse)
assert "[SIMULATOR ERROR" in response.message
# ---------------------------------------------------------------------------
# prepare_dry_run tests
# ---------------------------------------------------------------------------
def test_prepare_dry_run_orchestrator_block():
"""prepare_dry_run caps iterations and overrides model to simulation model."""
from backend.blocks.orchestrator import OrchestratorBlock
block = OrchestratorBlock()
input_data = {"prompt": "hello", "model": "gpt-4o", "agent_mode_max_iterations": 10}
with patch(
"backend.executor.simulator._get_platform_openrouter_key",
return_value="sk-or-test-key",
):
result = prepare_dry_run(block, input_data)
assert result is not None
# Model is overridden to the simulation model (not the user's model).
assert result["model"] != "gpt-4o"
assert result["agent_mode_max_iterations"] == 1
assert result["_dry_run_api_key"] == "sk-or-test-key"
# Original input_data should not be mutated.
assert input_data["model"] == "gpt-4o"
def test_prepare_dry_run_agent_executor_block():
"""prepare_dry_run returns a copy of input_data for AgentExecutorBlock.
AgentExecutorBlock must execute for real during dry-run so it can spawn
a child graph execution (whose blocks are then simulated). Its Output
schema has no properties, so LLM simulation would yield zero outputs.
"""
from backend.blocks.agent import AgentExecutorBlock
block = AgentExecutorBlock()
input_data = {
"user_id": "u1",
"graph_id": "g1",
"graph_version": 1,
"inputs": {"text": "hello"},
"input_schema": {},
"output_schema": {},
}
result = prepare_dry_run(block, input_data)
assert result is not None
# Input data is returned as-is (no model swap needed).
assert result["user_id"] == "u1"
assert result["graph_id"] == "g1"
# Original input_data should not be mutated.
assert result is not input_data
def test_prepare_dry_run_regular_block_returns_none():
"""prepare_dry_run returns None for a regular block (use simulator)."""
mock_block = make_mock_block()
assert prepare_dry_run(mock_block, {"query": "test"}) is None
# ---------------------------------------------------------------------------
# Input/output block passthrough tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_simulate_agent_input_block_passthrough():
"""AgentInputBlock should pass through the value directly, no LLM call."""
from backend.blocks.io import AgentInputBlock
block = AgentInputBlock()
outputs = []
async for name, data in simulate_block(
block, {"value": "hello world", "name": "q"}
):
outputs.append((name, data))
assert outputs == [("result", "hello world")]
@pytest.mark.asyncio
async def test_simulate_agent_dropdown_input_block_passthrough():
"""AgentDropdownInputBlock (subclass of AgentInputBlock) should pass through."""
from backend.blocks.io import AgentDropdownInputBlock
block = AgentDropdownInputBlock()
outputs = []
async for name, data in simulate_block(
block,
{
"value": "Option B",
"name": "sev",
"options": ["Option A", "Option B"],
},
):
outputs.append((name, data))
assert outputs == [("result", "Option B")]
@pytest.mark.asyncio
async def test_simulate_agent_input_block_none_value_falls_back_to_name():
"""AgentInputBlock with value=None falls back to the input name."""
from backend.blocks.io import AgentInputBlock
block = AgentInputBlock()
outputs = []
async for name, data in simulate_block(block, {"value": None, "name": "q"}):
outputs.append((name, data))
# When value is None, the simulator falls back to the "name" field
assert outputs == [("result", "q")]
@pytest.mark.asyncio
async def test_simulate_agent_output_block_passthrough():
"""AgentOutputBlock should pass through value as output."""
from backend.blocks.io import AgentOutputBlock
block = AgentOutputBlock()
outputs = []
async for name, data in simulate_block(
block, {"value": "result text", "name": "out1"}
):
outputs.append((name, data))
assert ("output", "result text") in outputs
assert ("name", "out1") in outputs
@pytest.mark.asyncio
async def test_simulate_agent_output_block_no_name():
"""AgentOutputBlock without name in input should still yield output."""
from backend.blocks.io import AgentOutputBlock
block = AgentOutputBlock()
outputs = []
async for name, data in simulate_block(block, {"value": 42}):
outputs.append((name, data))
assert outputs == [("output", 42)]


@@ -81,7 +81,7 @@ from backend.util.settings import Settings
from .activity_status_generator import generate_activity_status_for_execution
from .automod.manager import automod_manager
from .cluster_lock import ClusterLock
from .simulator import simulate_block
from .simulator import get_dry_run_credentials, prepare_dry_run, simulate_block
from .utils import (
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
@@ -279,6 +279,21 @@ async def execute_node(
"nodes_to_skip": nodes_to_skip or set(),
}
# For special blocks in dry-run, prepare_dry_run returns a (possibly
# modified) copy of input_data so the block executes for real. For all
# other blocks it returns None -> use LLM simulator.
# OrchestratorBlock uses the platform's simulation model + OpenRouter key
# so no user credentials are needed.
_dry_run_input: dict[str, Any] | None = None
if execution_context.dry_run:
_dry_run_input = prepare_dry_run(node_block, input_data)
if _dry_run_input is not None:
input_data = _dry_run_input
# Check for dry-run platform credentials (OrchestratorBlock uses the
# platform's OpenRouter key instead of user credentials).
_dry_run_creds = get_dry_run_credentials(input_data) if _dry_run_input else None
# Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
@@ -288,6 +303,12 @@ async def execute_node(
# Handle regular credentials fields
for field_name, input_type in input_model.get_credentials_fields().items():
# Dry-run platform credentials bypass the credential store
if _dry_run_creds is not None:
input_data[field_name] = None
extra_exec_kwargs[field_name] = _dry_run_creds
continue
field_value = input_data.get(field_name)
if not field_value or (
isinstance(field_value, dict) and not field_value.get("id")
@@ -375,7 +396,7 @@ async def execute_node(
scope.set_tag(f"execution_context.{k}", v)
try:
if execution_context.dry_run:
if execution_context.dry_run and _dry_run_input is None:
block_iter = simulate_block(node_block, input_data)
else:
block_iter = node_block.execute(input_data, **extra_exec_kwargs)


@@ -2,34 +2,66 @@
LLM-powered block simulator for dry-run execution.
When dry_run=True, instead of calling the real block, this module
role-plays the block's execution using an LLM. No real API calls,
no side effects. The LLM is grounded by:
role-plays the block's execution using an LLM. For most blocks no real
API calls or side effects occur.
Special cases (no LLM simulation needed):
- OrchestratorBlock executes for real with the platform's simulation model
(iterations capped to 1). Uses the platform OpenRouter key so no user
credentials are required. Falls back to LLM simulation if the platform
key is unavailable.
- AgentExecutorBlock executes for real so it can spawn child graph executions
(whose blocks are then simulated). No credentials needed.
- AgentInputBlock (and all subclasses) and AgentOutputBlock are pure
passthrough -- they forward their input values directly.
- MCPToolBlock is simulated via the generic LLM prompt (with run() source code).
OrchestratorBlock and AgentExecutorBlock are handled in manager.py via
``prepare_dry_run``.
The LLM simulation is grounded by:
- Block name and description
- Input/output schemas (from block.input_schema.jsonschema() / output_schema.jsonschema())
- The block's run() source code (via inspect.getsource)
- The actual input values
Inspired by https://github.com/Significant-Gravitas/agent-simulator
"""
import inspect
import json
import logging
import os
from collections.abc import AsyncGenerator
from typing import Any
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.io import AgentInputBlock, AgentOutputBlock
from backend.blocks.orchestrator import OrchestratorBlock
from backend.util.clients import get_openai_client
logger = logging.getLogger(__name__)
# Use the same fast/cheap model the copilot uses for non-primary tasks.
# Overridable via ChatConfig.title_model if ChatConfig is available.
def _simulator_model() -> str:
try:
from backend.copilot.config import ChatConfig # noqa: PLC0415
# Default simulator model — Gemini 2.5 Flash via OpenRouter (fast, cheap, good at
# JSON generation). Configurable via SIMULATION_MODEL env var or
# ChatConfig.simulation_model.
_DEFAULT_SIMULATOR_MODEL = "google/gemini-2.5-flash"
model = ChatConfig().title_model
except Exception:
model = "openai/gpt-4o-mini"
def _simulator_model() -> str:
# 1. Environment variable override (highest priority).
env_model = os.environ.get("SIMULATION_MODEL")
if env_model:
model = env_model
else:
# 2. ChatConfig.simulation_model (falls back to default).
try:
from backend.copilot.config import ChatConfig # noqa: PLC0415
model = ChatConfig().simulation_model or _DEFAULT_SIMULATOR_MODEL
except Exception:
model = _DEFAULT_SIMULATOR_MODEL
# get_openai_client() may return a direct OpenAI client (not OpenRouter).
# Direct OpenAI expects bare model names ("gpt-4o-mini"), not the
@@ -39,10 +71,6 @@ def _simulator_model() -> str:
from backend.util.settings import Settings # noqa: PLC0415
secrets = Settings().secrets
# get_openai_client() uses the direct OpenAI client whenever
# openai_internal_api_key is set, regardless of open_router_api_key.
# Strip the provider prefix (e.g. "openai/gpt-4o-mini" → "gpt-4o-mini")
# so the model name is valid for the direct OpenAI API.
if secrets.openai_internal_api_key and "/" in model:
model = model.split("/", 1)[1]
except Exception:
@@ -54,6 +82,7 @@ def _simulator_model() -> str:
_TEMPERATURE = 0.2
_MAX_JSON_RETRIES = 5
_MAX_INPUT_VALUE_CHARS = 20000
_COMMON_CRED_KEYS = frozenset({"credentials", "api_key", "token", "secret"})
def _truncate_value(value: Any) -> Any:
@@ -88,73 +117,31 @@ def _describe_schema_pins(schema: dict[str, Any]) -> str:
return "\n".join(lines) if lines else "(no output pins defined)"
def build_simulation_prompt(block: Any, input_data: dict[str, Any]) -> tuple[str, str]:
"""Build (system_prompt, user_prompt) for block simulation."""
input_schema = block.input_schema.jsonschema()
output_schema = block.output_schema.jsonschema()
input_pins = _describe_schema_pins(input_schema)
output_pins = _describe_schema_pins(output_schema)
output_properties = list(output_schema.get("properties", {}).keys())
# Build a separate list for the "MUST include" instruction that excludes
# "error" — the prompt already tells the LLM to OMIT the error pin unless
# simulating a logical error. Including it in "MUST include" is contradictory.
required_output_properties = [k for k in output_properties if k != "error"]
block_name = getattr(block, "name", type(block).__name__)
block_description = getattr(block, "description", "No description available.")
system_prompt = f"""You are simulating the execution of a software block called "{block_name}".
## Block Description
{block_description}
## Input Schema
{input_pins}
## Output Schema (what you must return)
{output_pins}
Your task: given the current inputs, produce realistic simulated outputs for this block.
Rules:
- Respond with a single JSON object whose keys are EXACTLY the output pin names listed above.
- Assume all credentials and authentication are present and valid. Never simulate authentication failures.
- Make the simulated outputs realistic and consistent with the inputs.
- If there is an "error" pin, OMIT it entirely unless you are simulating a logical error. Only include the "error" pin when there is a genuine error message to report.
- Do not include any extra keys beyond the output pins.
Output pin names you MUST include: {json.dumps(required_output_properties)}
"""
safe_inputs = _truncate_input_values(input_data)
user_prompt = f"## Current Inputs\n{json.dumps(safe_inputs, indent=2)}"
return system_prompt, user_prompt
# ---------------------------------------------------------------------------
# Shared LLM call helper
# ---------------------------------------------------------------------------
async def simulate_block(
block: Any,
input_data: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
"""Simulate block execution using an LLM.
async def _call_llm_for_simulation(
system_prompt: str,
user_prompt: str,
*,
label: str = "simulate",
) -> dict[str, Any]:
"""Send a simulation prompt to the LLM and return the parsed JSON dict.
Yields (output_name, output_data) tuples matching the Block.execute() interface.
On unrecoverable failure, yields a single ("error", "[SIMULATOR ERROR ...") tuple.
Handles client acquisition, retries on invalid JSON, and logging.
Raises:
RuntimeError: If no LLM client is available.
ValueError: If all retry attempts are exhausted.
"""
client = get_openai_client()
if client is None:
yield (
"error",
raise RuntimeError(
"[SIMULATOR ERROR — NOT A BLOCK FAILURE] No LLM client available "
"(missing OpenAI/OpenRouter API key).",
"(missing OpenAI/OpenRouter API key)."
)
return
output_schema = block.output_schema.jsonschema()
output_properties: dict[str, Any] = output_schema.get("properties", {})
system_prompt, user_prompt = build_simulation_prompt(block, input_data)
model = _simulator_model()
last_error: Exception | None = None
@@ -176,60 +163,366 @@ async def simulate_block(
if not isinstance(parsed, dict):
raise ValueError(f"LLM returned non-object JSON: {raw[:200]}")
# Fill missing output pins with defaults.
# Skip empty "error" pins — an empty string means "no error" and
# would only confuse downstream consumers (LLM, frontend).
result: dict[str, Any] = {}
for pin_name in output_properties:
if pin_name in parsed:
value = parsed[pin_name]
# Drop empty/blank error pins: they carry no information.
# Uses strip() intentionally so whitespace-only strings
# (e.g. " ", "\n") are also treated as empty.
if (
pin_name == "error"
and isinstance(value, str)
and not value.strip()
):
continue
result[pin_name] = value
elif pin_name != "error":
# Only fill non-error missing pins with None
result[pin_name] = None
logger.debug(
"simulate_block: block=%s attempt=%d tokens=%s/%s",
getattr(block, "name", "?"),
"simulate(%s): attempt=%d tokens=%s/%s",
label,
attempt + 1,
getattr(getattr(response, "usage", None), "prompt_tokens", "?"),
getattr(getattr(response, "usage", None), "completion_tokens", "?"),
)
for pin_name, pin_value in result.items():
yield pin_name, pin_value
return
return parsed
except (json.JSONDecodeError, ValueError) as e:
last_error = e
logger.warning(
"simulate(%s): JSON parse error on attempt %d/%d: %s",
label,
attempt + 1,
_MAX_JSON_RETRIES,
e,
)
except Exception as e:
last_error = e
logger.error("simulate(%s): LLM call failed: %s", label, e, exc_info=True)
break
msg = (
f"[SIMULATOR ERROR — NOT A BLOCK FAILURE] Failed after {_MAX_JSON_RETRIES} "
f"attempts: {last_error}"
)
logger.error(
"simulate(%s): all retries exhausted; last_error=%s", label, last_error
)
raise ValueError(msg)
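The parse-validate-retry loop above can be sketched in isolation. This is a minimal, self-contained illustration, not the module's API: `call_with_json_retries` is a hypothetical name, and `_MAX_JSON_RETRIES = 3` is an assumed value for the constant defined earlier in `simulator.py`.

```python
import json
from typing import Any, Callable

_MAX_JSON_RETRIES = 3  # assumed value; the real constant lives in simulator.py

def call_with_json_retries(call_llm: Callable[[], str]) -> dict[str, Any]:
    """Retry until the LLM returns a JSON object (sketch of the loop above)."""
    last_error: Exception | None = None
    for _attempt in range(_MAX_JSON_RETRIES):
        try:
            raw = call_llm()
            parsed = json.loads(raw)
            # A JSON array or scalar is valid JSON but not a pin mapping.
            if not isinstance(parsed, dict):
                raise ValueError(f"LLM returned non-object JSON: {raw[:200]}")
            return parsed
        except (json.JSONDecodeError, ValueError) as e:
            last_error = e
    raise ValueError(
        f"[SIMULATOR ERROR — NOT A BLOCK FAILURE] Failed after "
        f"{_MAX_JSON_RETRIES} attempts: {last_error}"
    )

# Stubbed LLM: two bad responses, then a valid JSON object.
responses = iter(["not json", "[1, 2]", '{"result": "ok"}'])
result = call_with_json_retries(lambda: next(responses))
```

Note that a non-object JSON response counts against the same retry budget as a parse failure, which matches the `(json.JSONDecodeError, ValueError)` handler above.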
# ---------------------------------------------------------------------------
# Prompt builders
# ---------------------------------------------------------------------------
def build_simulation_prompt(block: Any, input_data: dict[str, Any]) -> tuple[str, str]:
"""Build (system_prompt, user_prompt) for block simulation."""
input_schema = block.input_schema.jsonschema()
output_schema = block.output_schema.jsonschema()
input_pins = _describe_schema_pins(input_schema)
output_pins = _describe_schema_pins(output_schema)
output_properties = list(output_schema.get("properties", {}).keys())
# Build a separate list for the "MUST include" instruction that excludes
# "error" — the prompt already tells the LLM to OMIT the error pin unless
# simulating a logical error. Including it in "MUST include" is contradictory.
required_output_properties = [k for k in output_properties if k != "error"]
block_name = getattr(block, "name", type(block).__name__)
block_description = getattr(block, "description", "No description available.")
# Include the block's run() source code so the LLM knows exactly how
# inputs are transformed to outputs. Truncate to avoid blowing up the
# prompt for very large blocks.
try:
run_source = inspect.getsource(block.run)
if len(run_source) > _MAX_INPUT_VALUE_CHARS:
run_source = run_source[:_MAX_INPUT_VALUE_CHARS] + "\n# ... [TRUNCATED]"
except (TypeError, OSError):
run_source = ""
implementation_section = ""
if run_source:
implementation_section = (
"\n## Block Implementation (run function source code)\n"
"```python\n"
f"{run_source}\n"
"```\n"
)
system_prompt = f"""You are simulating the execution of a software block called "{block_name}".
## Block Description
{block_description}
## Input Schema
{input_pins}
## Output Schema (what you must return)
{output_pins}
{implementation_section}
Your task: given the current inputs, produce realistic simulated outputs for this block.
{"Study the block's run() source code above to understand exactly how inputs are transformed to outputs." if run_source else "Use the block description and schemas to infer realistic outputs."}
Rules:
- Respond with a single JSON object.
- Only include output pins that have meaningful values. Omit pins with no relevant output.
- Assume all credentials and API keys are present and valid. Do not simulate auth failures.
- Generate REALISTIC, useful outputs: real-looking URLs, plausible text, valid data structures.
- Never return empty strings, null, or "N/A" for pins that should have content.
- You MAY simulate logical errors (e.g., invalid input format, unsupported operation) when the inputs warrant it — use the "error" pin for these. But do NOT simulate auth/credential errors.
- Do not include extra keys beyond the defined output pins.
Available output pins: {json.dumps(required_output_properties)}
"""
# Strip credentials from input so the LLM doesn't see null/empty creds
# and incorrectly simulate auth failures. Use the block's schema to
# detect credential fields when available, falling back to common names.
try:
cred_fields = set(block.input_schema.get_credentials_fields())
except (AttributeError, TypeError):
cred_fields = set()
exclude_keys = cred_fields | _COMMON_CRED_KEYS
safe_inputs = {
k: v
for k, v in _truncate_input_values(input_data).items()
if k not in exclude_keys
}
user_prompt = f"## Current Inputs\n{json.dumps(safe_inputs, indent=2)}"
return system_prompt, user_prompt
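The input-sanitising step can be illustrated with a small sketch. The constant values here are assumptions for the example (the real `_COMMON_CRED_KEYS` and `_MAX_INPUT_VALUE_CHARS` are defined elsewhere in the module), and `sanitize_inputs` is a hypothetical helper combining the stripping and truncation shown above:

```python
from typing import Any

# Assumed stand-ins for the module's real constants.
_COMMON_CRED_KEYS = {"credentials", "api_key", "token", "secret"}
_MAX_INPUT_VALUE_CHARS = 20_000

def sanitize_inputs(input_data: dict[str, Any]) -> dict[str, Any]:
    """Drop credential-like keys and truncate oversized string values
    before the inputs are serialised into the user prompt."""
    safe: dict[str, Any] = {}
    for key, value in input_data.items():
        if key in _COMMON_CRED_KEYS:
            continue  # secrets must never reach the LLM
        if isinstance(value, str) and len(value) > _MAX_INPUT_VALUE_CHARS:
            value = value[:_MAX_INPUT_VALUE_CHARS] + "... [TRUNCATED]"
        safe[key] = value
    return safe
```

Stripping (rather than blanking) the credential keys is what prevents the LLM from seeing a null credential and inventing an auth failure.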
# ---------------------------------------------------------------------------
# Public simulation functions
# ---------------------------------------------------------------------------
def _get_platform_openrouter_key() -> str | None:
"""Return the platform's OpenRouter API key, or None if unavailable."""
try:
from backend.util.settings import Settings # noqa: PLC0415
key = Settings().secrets.open_router_api_key
return key if key else None
except Exception:
return None
def prepare_dry_run(block: Any, input_data: dict[str, Any]) -> dict[str, Any] | None:
"""Prepare *input_data* for a dry-run execution of *block*.
Returns a **modified copy** of *input_data* for blocks that should execute
for real with cheap settings, or ``None`` when the block should be
LLM-simulated instead.
- **OrchestratorBlock** executes for real with the platform's simulation
model (iterations capped to 1). Uses the platform OpenRouter key so no
user credentials are needed. Falls back to LLM simulation if the
platform key is unavailable.
- **AgentExecutorBlock** executes for real so it can spawn a child graph
execution. The child graph inherits ``dry_run=True`` and its blocks
are simulated. No credentials are needed.
"""
if isinstance(block, OrchestratorBlock):
or_key = _get_platform_openrouter_key()
if not or_key:
logger.info(
"Dry-run: no platform OpenRouter key, "
"falling back to LLM simulation for OrchestratorBlock"
)
return None
original = input_data.get("agent_mode_max_iterations", 0)
max_iters = 1 if original != 0 else 0
sim_model = _simulator_model()
# Keep the original credentials dict in input_data so the block's
# JSON schema validation passes (validate_data strips None values,
# making the field absent and failing the "required" check).
# The actual credentials are injected via extra_exec_kwargs in
# manager.py using _dry_run_api_key.
return {
**input_data,
"agent_mode_max_iterations": max_iters,
"model": sim_model,
"_dry_run_api_key": or_key,
}
if isinstance(block, AgentExecutorBlock):
return {**input_data}
return None
def get_dry_run_credentials(
input_data: dict[str, Any],
) -> Any | None:
"""Build an ``APIKeyCredentials`` for dry-run OrchestratorBlock execution.
Returns credentials using the platform's OpenRouter key (injected by
``prepare_dry_run``), or ``None`` if not a dry-run override.
"""
api_key = input_data.pop("_dry_run_api_key", None)
if not api_key:
return None
try:
from backend.blocks.llm import APIKeyCredentials # noqa: PLC0415
from backend.integrations.providers import ProviderName # noqa: PLC0415
return APIKeyCredentials(
id="dry-run-platform",
provider=ProviderName.OPEN_ROUTER,
api_key=api_key,
title="Dry-run simulation",
expires_at=None,
)
except Exception:
logger.warning("Failed to create dry-run credentials", exc_info=True)
return None
def _default_for_input_result(result_schema: dict[str, Any], name: str | None) -> Any:
"""Return a type-appropriate sample value for an AgentInputBlock's result pin.
Typed subclasses (AgentNumberInputBlock, AgentDateInputBlock, etc.)
declare a specific type/format on their ``result`` output. When dry-run
has no user-supplied value, this generates a fallback that matches the
expected type so downstream validation doesn't fail with a plain string.
"""
pin_type = result_schema.get("type", "string")
fmt = result_schema.get("format")
if pin_type == "integer":
return 0
if pin_type == "number":
return 0.0
if pin_type == "boolean":
return False
if pin_type == "array":
return []
if pin_type == "object":
return {}
if fmt == "date":
from datetime import date as _date # noqa: PLC0415
return _date.today().isoformat()
if fmt == "time":
return "00:00:00"
# Default: use the block's name as a sample string.
return name or "sample input"
async def simulate_block(
block: Any,
input_data: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
"""Simulate block execution using an LLM.
All block types (including MCPToolBlock) use the same generic LLM prompt
which includes the block's run() source code for accurate simulation.
Note: callers should check ``prepare_dry_run(block, input_data)`` first.
OrchestratorBlock and AgentExecutorBlock execute for real in dry-run mode
(see manager.py).
Yields (output_name, output_data) tuples matching the Block.execute() interface.
On unrecoverable failure, yields a single ("error", "[SIMULATOR ERROR ...") tuple.
"""
# Input/output blocks are pure passthrough -- they just forward their
# input values. No LLM simulation needed.
if isinstance(block, AgentInputBlock):
value = input_data.get("value")
if value is None:
# Dry-run with no user input: use first dropdown option or name,
# then coerce to a type-appropriate fallback so typed subclasses
# (e.g. AgentNumberInputBlock → int, AgentDateInputBlock → date)
# don't fail validation with a plain string.
placeholder = input_data.get("options") or input_data.get(
"placeholder_values"
)
if placeholder and isinstance(placeholder, list) and placeholder:
value = placeholder[0]
else:
result_schema = (
block.output_schema.jsonschema()
.get("properties", {})
.get("result", {})
)
value = _default_for_input_result(
result_schema, input_data.get("name", "sample input")
)
yield "result", value
return
if isinstance(block, AgentOutputBlock):
# Mirror AgentOutputBlock.run(): if a format string is provided,
# apply Jinja2 formatting and yield only "output"; otherwise yield
# both "output" (raw value) and "name".
fmt = input_data.get("format", "")
value = input_data.get("value")
name = input_data.get("name", "")
if fmt:
try:
from backend.util.text import TextFormatter # noqa: PLC0415
escape_html = input_data.get("escape_html", False)
formatter = TextFormatter(autoescape=escape_html)
formatted = await formatter.format_string(fmt, {name: value})
yield "output", formatted
except Exception as e:
yield "output", f"Error: {e}, {value}"
else:
yield "output", value
if name:
yield "name", name
return
output_schema = block.output_schema.jsonschema()
output_properties: dict[str, Any] = output_schema.get("properties", {})
system_prompt, user_prompt = build_simulation_prompt(block, input_data)
label = getattr(block, "name", "?")
try:
parsed = await _call_llm_for_simulation(system_prompt, user_prompt, label=label)
# Track which pins were yielded so we can fill in missing required
# ones afterwards — downstream nodes connected to unyielded pins
# would otherwise stall in INCOMPLETE state.
yielded_pins: set[str] = set()
# Yield pins present in the LLM response with meaningful values.
# We skip None and empty strings but preserve valid falsy values
# like False, 0, and [].
for pin_name in output_properties:
if pin_name not in parsed:
continue
value = parsed[pin_name]
if value is None or value == "":
continue
yield pin_name, value
yielded_pins.add(pin_name)
# For any required output pins the LLM omitted (excluding "error"),
# yield a type-appropriate default so downstream nodes still fire.
required_pins = set(output_schema.get("required", []))
for pin_name in required_pins - yielded_pins - {"error"}:
pin_schema = output_properties.get(pin_name, {})
default = _default_for_schema(pin_schema)
logger.debug(
"simulate(%s): filling missing required pin %r with default %r",
label,
pin_name,
default,
)
yield pin_name, default
except (RuntimeError, ValueError) as e:
yield "error", str(e)
def _default_for_schema(pin_schema: dict[str, Any]) -> Any:
"""Return a sensible default value for a JSON schema type."""
pin_type = pin_schema.get("type", "string")
if pin_type == "string":
return ""
if pin_type == "integer":
return 0
if pin_type == "number":
return 0.0
if pin_type == "boolean":
return False
if pin_type == "array":
return []
if pin_type == "object":
return {}
return ""
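The pin-handling pass in `simulate_block` (keep meaningful LLM values, drop `None` and empty strings while preserving falsy `False`/`0`/`[]`, then backfill required pins) can be condensed into a pure function. This is a sketch with assumed names, not the production code path:

```python
from typing import Any

def default_for_type(pin_schema: dict[str, Any]) -> Any:
    # Same type-to-default mapping as _default_for_schema above.
    return {"string": "", "integer": 0, "number": 0.0, "boolean": False,
            "array": [], "object": {}}.get(pin_schema.get("type", "string"), "")

def resolve_pins(parsed: dict[str, Any], output_schema: dict[str, Any]) -> dict[str, Any]:
    """Keep meaningful LLM values, then backfill required pins the LLM
    omitted, excluding the "error" pin (sketch of simulate_block's logic)."""
    props = output_schema.get("properties", {})
    result = {
        name: parsed[name]
        for name in props
        # `!= ""` keeps valid falsy values: False, 0, and [] all pass.
        if name in parsed and parsed[name] is not None and parsed[name] != ""
    }
    for name in set(output_schema.get("required", [])) - set(result) - {"error"}:
        result[name] = default_for_type(props.get(name, {}))
    return result
```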

View File

@@ -0,0 +1,475 @@
"""Tests for the LLM-powered block simulator (dry-run execution).
Covers:
- Prompt building (credential stripping, realistic-output instructions)
- Input/output block passthrough
- prepare_dry_run routing
- simulate_block output-pin filling
"""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from backend.executor.simulator import (
_truncate_input_values,
_truncate_value,
build_simulation_prompt,
prepare_dry_run,
simulate_block,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_block(
*,
name: str = "TestBlock",
description: str = "A test block.",
input_schema: dict[str, Any] | None = None,
output_schema: dict[str, Any] | None = None,
) -> Any:
"""Create a minimal mock block for testing."""
block = MagicMock()
block.name = name
block.description = description
block.input_schema.jsonschema.return_value = input_schema or {
"properties": {"query": {"type": "string"}},
"required": ["query"],
}
block.output_schema.jsonschema.return_value = output_schema or {
"properties": {
"result": {"type": "string"},
"error": {"type": "string"},
},
"required": ["result"],
}
return block
# ---------------------------------------------------------------------------
# Truncation
# ---------------------------------------------------------------------------
class TestTruncation:
def test_short_string_unchanged(self) -> None:
assert _truncate_value("hello") == "hello"
def test_long_string_truncated(self) -> None:
long_str = "x" * 30000
result = _truncate_value(long_str)
assert result.endswith("... [TRUNCATED]")
assert len(result) < 25000
def test_nested_dict_truncation(self) -> None:
data = {"key": "y" * 30000}
result = _truncate_input_values(data)
assert result["key"].endswith("... [TRUNCATED]")
# ---------------------------------------------------------------------------
# Prompt building
# ---------------------------------------------------------------------------
class TestBuildSimulationPrompt:
def test_system_prompt_contains_block_name(self) -> None:
block = _make_block(name="WebSearchBlock")
system, _user = build_simulation_prompt(block, {"query": "test"})
assert "WebSearchBlock" in system
def test_system_prompt_contains_realistic_instruction(self) -> None:
block = _make_block()
system, _ = build_simulation_prompt(block, {})
assert "REALISTIC" in system
assert "Never return empty strings" in system
def test_system_prompt_contains_no_auth_failure_instruction(self) -> None:
block = _make_block()
system, _ = build_simulation_prompt(block, {})
assert "Do not simulate auth failures" in system
def test_credentials_stripped_from_user_prompt(self) -> None:
block = _make_block()
_, user = build_simulation_prompt(
block,
{
"query": "test",
"credentials": {"api_key": "sk-secret"},
"api_key": "sk-secret",
"token": "tok-secret",
"secret": "shh",
"normal_field": "visible",
},
)
assert "sk-secret" not in user
assert "tok-secret" not in user
assert "shh" not in user
assert "visible" in user
def test_error_pin_always_empty_instruction(self) -> None:
block = _make_block()
system, _ = build_simulation_prompt(block, {})
assert "error" in system.lower()
assert "empty string" in system.lower()
def test_output_pin_names_in_prompt(self) -> None:
block = _make_block(
output_schema={
"properties": {
"url": {"type": "string"},
"status_code": {"type": "integer"},
},
}
)
system, _ = build_simulation_prompt(block, {})
assert "url" in system
assert "status_code" in system
# ---------------------------------------------------------------------------
# prepare_dry_run routing
# ---------------------------------------------------------------------------
class TestPrepareDryRun:
def test_orchestrator_uses_simulation_model(self) -> None:
"""OrchestratorBlock should use the simulation model and cap iterations."""
from unittest.mock import patch
from backend.blocks.orchestrator import OrchestratorBlock
block = OrchestratorBlock()
with patch(
"backend.executor.simulator._get_platform_openrouter_key",
return_value="sk-or-test-key",
):
result = prepare_dry_run(
block,
{"agent_mode_max_iterations": 10, "model": "gpt-4o", "other": "val"},
)
assert result is not None
assert result["agent_mode_max_iterations"] == 1
assert result["other"] == "val"
assert result["model"] != "gpt-4o" # overridden to simulation model
# credentials left as-is so block schema validation passes —
# actual creds injected via extra_exec_kwargs in manager.py
assert "credentials" not in result
assert result["_dry_run_api_key"] == "sk-or-test-key"
def test_orchestrator_zero_stays_zero(self) -> None:
from unittest.mock import patch
from backend.blocks.orchestrator import OrchestratorBlock
block = OrchestratorBlock()
with patch(
"backend.executor.simulator._get_platform_openrouter_key",
return_value="sk-or-test-key",
):
result = prepare_dry_run(block, {"agent_mode_max_iterations": 0})
assert result is not None
assert result["agent_mode_max_iterations"] == 0
def test_orchestrator_falls_back_without_key(self) -> None:
"""Without platform OpenRouter key, OrchestratorBlock falls back
to LLM simulation (returns None)."""
from unittest.mock import patch
from backend.blocks.orchestrator import OrchestratorBlock
block = OrchestratorBlock()
with patch(
"backend.executor.simulator._get_platform_openrouter_key",
return_value=None,
):
result = prepare_dry_run(block, {"agent_mode_max_iterations": 5})
assert result is None
def test_agent_executor_block_passthrough(self) -> None:
from backend.blocks.agent import AgentExecutorBlock
block = AgentExecutorBlock()
result = prepare_dry_run(block, {"graph_id": "abc"})
assert result is not None
assert result["graph_id"] == "abc"
def test_agent_executor_block_returns_identical_copy(self) -> None:
"""AgentExecutorBlock must execute for real during dry-run so it can
spawn a child graph execution. ``prepare_dry_run`` returns a shallow
copy of input_data with no modifications -- every key/value must be
identical, but the returned dict must be a *different* object so
callers can mutate it without affecting the original."""
from backend.blocks.agent import AgentExecutorBlock
block = AgentExecutorBlock()
input_data = {
"user_id": "user-42",
"graph_id": "graph-99",
"graph_version": 3,
"inputs": {"text": "hello"},
"input_schema": {"props": "a"},
"output_schema": {"props": "b"},
}
result = prepare_dry_run(block, input_data)
assert result is not None
# Must be a different object (copy, not alias)
assert result is not input_data
# Every key/value must be identical -- no modifications
assert result == input_data
# Mutating the copy must not affect the original
result["extra"] = "added"
assert "extra" not in input_data
def test_regular_block_returns_none(self) -> None:
block = _make_block()
result = prepare_dry_run(block, {"query": "test"})
assert result is None
# ---------------------------------------------------------------------------
# simulate_block input/output passthrough
# ---------------------------------------------------------------------------
class TestSimulateBlockPassthrough:
@pytest.mark.asyncio
async def test_input_block_passthrough_with_value(self) -> None:
from backend.blocks.io import AgentInputBlock
block = AgentInputBlock()
outputs = []
async for name, data in simulate_block(block, {"value": "hello world"}):
outputs.append((name, data))
assert outputs == [("result", "hello world")]
@pytest.mark.asyncio
async def test_input_block_passthrough_without_value_uses_name(self) -> None:
from backend.blocks.io import AgentInputBlock
block = AgentInputBlock()
outputs = []
async for name, data in simulate_block(block, {"name": "user_query"}):
outputs.append((name, data))
assert outputs == [("result", "user_query")]
@pytest.mark.asyncio
async def test_input_block_passthrough_uses_placeholder(self) -> None:
from backend.blocks.io import AgentInputBlock
block = AgentInputBlock()
outputs = []
async for name, data in simulate_block(
block, {"options": ["option1", "option2"]}
):
outputs.append((name, data))
assert outputs == [("result", "option1")]
@pytest.mark.asyncio
async def test_output_block_passthrough_no_format(self) -> None:
from backend.blocks.io import AgentOutputBlock
block = AgentOutputBlock()
outputs = []
async for name, data in simulate_block(
block, {"value": "result data", "name": "output_name"}
):
outputs.append((name, data))
assert ("output", "result data") in outputs
assert ("name", "output_name") in outputs
@pytest.mark.asyncio
async def test_output_block_with_format_applies_jinja2(self) -> None:
"""When a format string is provided, AgentOutputBlock simulation should
apply Jinja2 formatting and yield only 'output' (no 'name' pin)."""
from backend.blocks.io import AgentOutputBlock
block = AgentOutputBlock()
outputs = []
async for name, data in simulate_block(
block,
{
"value": "Hello, World!",
"name": "output_1",
"format": "{{ output_1 }}!!",
},
):
outputs.append((name, data))
assert len(outputs) == 1
assert outputs[0] == ("output", "Hello, World!!!")
@pytest.mark.asyncio
async def test_output_block_with_format_no_name_pin(self) -> None:
"""When format is provided, the 'name' pin must NOT be yielded."""
from backend.blocks.io import AgentOutputBlock
block = AgentOutputBlock()
output_names = []
async for name, data in simulate_block(
block,
{
"value": "42",
"name": "output_2",
"format": "{{ output_2 }}",
},
):
output_names.append(name)
assert "name" not in output_names
@pytest.mark.asyncio
async def test_input_block_no_value_no_name_empty_options(self) -> None:
"""AgentInputBlock with value=None, name=None, and empty
options list must not crash.
When the ``name`` key is present but explicitly ``None``,
``dict.get("name", "sample input")`` returns ``None`` (the key
exists), so the fallback sentinel is *not* used. The test verifies
the code does not raise and yields a single result."""
from backend.blocks.io import AgentInputBlock
block = AgentInputBlock()
outputs = []
async for name, data in simulate_block(
block, {"value": None, "name": None, "options": []}
):
outputs.append((name, data))
# Does not crash; yields exactly one output
assert len(outputs) == 1
assert outputs[0][0] == "result"
@pytest.mark.asyncio
async def test_input_block_missing_all_fields_uses_sentinel(self) -> None:
"""AgentInputBlock with no value, name, or placeholders at all should
fall back to the ``"sample input"`` sentinel."""
from backend.blocks.io import AgentInputBlock
block = AgentInputBlock()
outputs = []
async for name, data in simulate_block(block, {}):
outputs.append((name, data))
assert outputs == [("result", "sample input")]
@pytest.mark.asyncio
async def test_generic_block_zero_outputs_handled(self) -> None:
"""When the LLM returns a valid JSON object but none of the output pins
have meaningful values, ``simulate_block`` should still yield defaults
for required output pins so downstream nodes don't stall."""
block = _make_block()
with patch(
"backend.executor.simulator._call_llm_for_simulation",
new_callable=AsyncMock,
# All output pin values are None or empty -- nothing to yield
return_value={"result": None, "error": ""},
):
outputs = []
async for name, data in simulate_block(block, {"query": "test"}):
outputs.append((name, data))
# "result" is required, so a default empty string is yielded
assert outputs == [("result", "")]
@pytest.mark.asyncio
async def test_generic_block_calls_llm(self) -> None:
"""Generic blocks should call _call_llm_for_simulation."""
block = _make_block()
with patch(
"backend.executor.simulator._call_llm_for_simulation",
new_callable=AsyncMock,
return_value={"result": "simulated result", "error": ""},
) as mock_llm:
outputs = []
async for name, data in simulate_block(block, {"query": "test"}):
outputs.append((name, data))
mock_llm.assert_called_once()
assert ("result", "simulated result") in outputs
# Empty error pin is omitted — not yielded
assert ("error", "") not in outputs
@pytest.mark.asyncio
async def test_generic_block_omits_missing_pins(self) -> None:
"""Missing output pins are omitted (not yielded)."""
block = _make_block()
with patch(
"backend.executor.simulator._call_llm_for_simulation",
new_callable=AsyncMock,
return_value={"result": "data"}, # missing "error" pin
):
outputs: dict[str, Any] = {}
async for name, data in simulate_block(block, {"query": "test"}):
outputs[name] = data
assert outputs["result"] == "data"
# Missing pins are omitted — only meaningful values are yielded
assert "error" not in outputs
@pytest.mark.asyncio
async def test_generic_block_preserves_falsy_values(self) -> None:
"""Valid falsy values like False, 0, and [] must be yielded, not dropped."""
block = _make_block(
output_schema={
"properties": {
"flag": {"type": "boolean"},
"count": {"type": "integer"},
"items": {"type": "array"},
},
"required": ["flag", "count", "items"],
}
)
with patch(
"backend.executor.simulator._call_llm_for_simulation",
new_callable=AsyncMock,
return_value={"flag": False, "count": 0, "items": []},
):
outputs: dict[str, Any] = {}
async for name, data in simulate_block(block, {"query": "test"}):
outputs[name] = data
assert outputs["flag"] is False
assert outputs["count"] == 0
assert outputs["items"] == []
@pytest.mark.asyncio
async def test_llm_failure_yields_error(self) -> None:
"""When LLM fails, should yield an error tuple."""
block = _make_block()
with patch(
"backend.executor.simulator._call_llm_for_simulation",
new_callable=AsyncMock,
side_effect=RuntimeError("No client"),
):
outputs = []
async for name, data in simulate_block(block, {"query": "test"}):
outputs.append((name, data))
assert len(outputs) == 1
assert outputs[0][0] == "error"
assert "No client" in outputs[0][1]

View File

@@ -923,6 +923,11 @@ async def add_graph_execution(
execution_context.parent_execution_id if execution_context else None
)
# When execution_context is provided (e.g. from AgentExecutorBlock),
# inherit dry_run so child-graph validation skips credential checks.
if execution_context and execution_context.dry_run:
dry_run = True
# Create new execution
graph, starting_nodes_input, compiled_nodes_input_masks, nodes_to_skip = (
await validate_and_construct_node_execution_input(

View File

@@ -56,6 +56,10 @@ export const useFlow = () => {
flowExecutionID: parseAsString,
});
const isGraphRunning = useGraphStore(
useShallow((state) => state.isGraphRunning),
);
const { data: executionDetails } = useGetV1GetExecutionDetails(
flowID || "",
flowExecutionID || "",
@@ -63,6 +67,11 @@ export const useFlow = () => {
query: {
select: (res) => res.data as GetV1GetExecutionDetails200,
enabled: !!flowID && !!flowExecutionID,
// Poll while the graph is running to catch updates that arrive before
// the WebSocket subscription is established (race condition on fast
// executions like dry-runs). Stops once the execution reaches a
// terminal state and isGraphRunning becomes false.
refetchInterval: isGraphRunning ? 1000 : false,
},
},
);

View File

@@ -92,7 +92,11 @@ export const CustomNode: React.FC<NodeProps<CustomNode>> = React.memo(
const hasOutputError =
typeof outputData === "object" &&
outputData !== null &&
"error" in outputData &&
Array.isArray(outputData.error) &&
outputData.error.some(
(v: unknown) => v !== "" && v !== null && v !== undefined,
);
const hasErrors = hasConfigErrors || hasOutputError;

View File

@@ -4,7 +4,7 @@ import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { useRouter } from "next/navigation";
import { useEffect, useRef } from "react";
import { useResetRateLimit } from "../../hooks/useResetRateLimit";
interface Props {
@@ -18,7 +18,7 @@ interface Props {
onCreditChange?: () => void;
}
export function formatCents(cents: number): string {
return `$${(cents / 100).toFixed(2)}`;
}
@@ -38,11 +38,16 @@ export function RateLimitResetDialog({
});
const router = useRouter();
// Stable ref for the callback so the effect only re-fires when
// `isOpen` changes, not when the function reference changes.
const onCreditChangeRef = useRef(onCreditChange);
onCreditChangeRef.current = onCreditChange;
// Refresh the credit balance each time the dialog opens so we never
// block a valid reset due to a stale client-side balance.
useEffect(() => {
if (isOpen) onCreditChangeRef.current?.();
}, [isOpen]);
// Whether to hide the reset button entirely
const cannotReset = isWeeklyExhausted || hasInsufficientCredits;

View File

@@ -1,6 +1,7 @@
import type { CoPilotUsageStatus } from "@/app/api/__generated__/models/coPilotUsageStatus";
import { Button } from "@/components/atoms/Button/Button";
import Link from "next/link";
import { formatCents } from "../RateLimitResetDialog/RateLimitResetDialog";
import { useResetRateLimit } from "../../hooks/useResetRateLimit";
export function formatResetTime(
@@ -91,7 +92,7 @@ function ResetButton({
>
{isPending
? "Resetting..."
: `Reset daily limit for ${formatCents(cost)}`}
</Button>
);
}

View File

@@ -5,11 +5,20 @@ import {
import { toast } from "@/components/molecules/Toast/use-toast";
import { ApiError } from "@/lib/autogpt-server-api";
import { useQueryClient } from "@tanstack/react-query";
import { useRef } from "react";
export function useResetRateLimit(options?: {
onSuccess?: () => void;
onCreditChange?: () => void;
}) {
// Use refs so mutation callbacks always see the latest options,
// avoiding stale-closure issues when the caller re-renders with
// different callback references.
const onSuccessRef = useRef(options?.onSuccess);
onSuccessRef.current = options?.onSuccess;
const onCreditChangeRef = useRef(options?.onCreditChange);
onCreditChangeRef.current = options?.onCreditChange;
const queryClient = useQueryClient();
const { mutate: resetUsage, isPending } = usePostV2ResetCopilotUsage({
mutation: {
@@ -20,13 +29,13 @@ export function useResetRateLimit(options?: {
await queryClient.invalidateQueries({
queryKey: getGetV2GetCopilotUsageQueryKey(),
});
onCreditChangeRef.current?.();
toast({
title: "Rate limit reset",
description:
"Your daily usage limit has been reset. You can continue working.",
});
onSuccessRef.current?.();
},
onError: (error: unknown) => {
const message =

View File

@@ -58,7 +58,7 @@ Tool and block identifiers provided in `tools` and `blocks` are validated at run
| system_context | Optional additional context prepended to the prompt. Use this to constrain autopilot behavior, provide domain context, or set output format requirements. | str | No |
| session_id | Session ID to continue an existing autopilot conversation. Leave empty to start a new session. Use the session_id output from a previous run to continue. | str | No |
| max_recursion_depth | Maximum nesting depth when the autopilot calls this block recursively (sub-agent pattern). Prevents infinite loops. | int | No |
| tools | Tool names to filter. Works with tools_exclude to form an allow-list or deny-list. Leave empty to apply no tool filter. | List["add_understanding" \| "ask_question" \| "bash_exec" \| "browser_act" \| "browser_navigate" \| "browser_screenshot" \| "connect_integration" \| "continue_run_block" \| "create_agent" \| "create_feature_request" \| "create_folder" \| "customize_agent" \| "delete_folder" \| "delete_workspace_file" \| "edit_agent" \| "find_agent" \| "find_block" \| "find_library_agent" \| "fix_agent_graph" \| "get_agent_building_guide" \| "get_doc_page" \| "get_mcp_guide" \| "list_folders" \| "list_workspace_files" \| "move_agents_to_folder" \| "move_folder" \| "read_workspace_file" \| "run_agent" \| "run_block" \| "run_mcp_tool" \| "search_docs" \| "search_feature_requests" \| "update_folder" \| "validate_agent_graph" \| "view_agent_output" \| "web_fetch" \| "write_workspace_file" \| "Edit" \| "Glob" \| "Grep" \| "Read" \| "Task" \| "TodoWrite" \| "WebSearch" \| "Write"] | No |
| tools_exclude | Controls how the 'tools' list is interpreted. True (default): 'tools' is a deny-list — listed tools are blocked, all others are allowed. An empty 'tools' list means allow everything. False: 'tools' is an allow-list — only listed tools are permitted. | bool | No |
| blocks | Block identifiers to filter when the copilot uses run_block. Each entry can be: a block name (e.g. 'HTTP Request'), a full block UUID, or the first 8 hex characters of the UUID (e.g. 'c069dc6b'). Works with blocks_exclude. Leave empty to apply no block filter. | List[str] | No |
| blocks_exclude | Controls how the 'blocks' list is interpreted. True (default): 'blocks' is a deny-list — listed blocks are blocked, all others are allowed. An empty 'blocks' list means allow everything. False: 'blocks' is an allow-list — only listed blocks are permitted. | bool | No |