fix(backend): propagate dry-run mode to special blocks (Orchestrator, AgentExecutor, MCP)

Previously dry-run mode simulated ALL blocks via LLM, but this didn't work
well for OrchestratorBlock, AgentExecutorBlock, and MCPToolBlock:

- OrchestratorBlock & AgentExecutorBlock now execute for real in dry-run
  mode so the orchestrator can make LLM calls and agent executors can
  spawn child graphs. Their downstream tool blocks and child-graph blocks
  are still simulated. Credential fields from node defaults are restored
  since validate_exec wipes them in dry-run mode.

- MCPToolBlock gets a specialised simulate_mcp_block() that builds an
  LLM prompt grounded in the selected tool's name and JSON Schema,
  producing more realistic mock responses than the generic simulator.
This commit is contained in:
Zamil Majdy
2026-03-26 16:15:26 +07:00
parent 24d0c35ed3
commit 4c85f2399a
4 changed files with 272 additions and 4 deletions

View File

@@ -245,6 +245,14 @@ real API calls, credentials, or credits:
3. **Iterate**: If the dry run reveals wiring issues or missing inputs, fix
the agent JSON and re-save before suggesting a real execution.
**Special block behaviour in dry-run mode:**
- **OrchestratorBlock** and **AgentExecutorBlock** execute for real so the
orchestrator can make LLM calls and agent executors can spawn child graphs.
Their downstream tool blocks and child-graph blocks are still simulated.
- **MCPToolBlock** is simulated using the selected tool's name and JSON Schema
so the LLM can produce a realistic mock response without connecting to the
MCP server.
### Example: Simple AI Text Processor
A minimal agent with input, processing, and output:

View File

@@ -10,7 +10,12 @@ import backend.copilot.tools.run_block as run_block_module
from backend.copilot.tools.helpers import execute_block
from backend.copilot.tools.models import BlockOutputResponse, ErrorResponse
from backend.copilot.tools.run_block import RunBlockTool
from backend.executor.simulator import build_simulation_prompt, simulate_block
from backend.executor.simulator import (
_build_mcp_simulation_prompt,
build_simulation_prompt,
simulate_block,
simulate_mcp_block,
)
# ---------------------------------------------------------------------------
# Helpers
@@ -493,3 +498,113 @@ async def test_execute_block_dry_run_simulator_error_returns_error_response():
assert isinstance(response, ErrorResponse)
assert "[SIMULATOR ERROR" in response.message
# ---------------------------------------------------------------------------
# simulate_mcp_block tests
# ---------------------------------------------------------------------------
def test_build_mcp_simulation_prompt_contains_tool_info():
    """The MCP simulation prompt embeds tool name, server, schema, and args."""
    payload = {
        "server_url": "https://mcp.example.com/mcp",
        "selected_tool": "get_weather",
        "tool_input_schema": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
        "tool_arguments": {"city": "London"},
    }

    sys_msg, user_msg = _build_mcp_simulation_prompt(payload)

    # The system prompt grounds the LLM in the tool identity and its schema.
    for fragment in ("get_weather", "mcp.example.com", '"city"'):
        assert fragment in sys_msg
    # The user prompt carries the concrete call arguments.
    assert "London" in user_msg
def test_build_mcp_simulation_prompt_handles_empty_schema():
    """An absent/empty tool_input_schema is rendered as a placeholder."""
    sys_msg, _user_msg = _build_mcp_simulation_prompt(
        {"selected_tool": "my_tool", "tool_input_schema": {}, "tool_arguments": {}}
    )

    assert "my_tool" in sys_msg
    # Empty schema collapses to the "(none)" marker instead of "{}".
    assert "(none)" in sys_msg
@pytest.mark.asyncio
async def test_simulate_mcp_block_basic():
    """simulate_mcp_block yields a ("result", ...) and an ("error", ...) pair."""
    client = AsyncMock()
    client.chat.completions.create = AsyncMock(
        return_value=make_openai_response(
            '{"result": {"temperature": 22, "condition": "sunny"}, "error": ""}'
        )
    )
    payload = {
        "server_url": "https://mcp.example.com/mcp",
        "selected_tool": "get_weather",
        "tool_input_schema": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
        "tool_arguments": {"city": "London"},
    }

    with patch(
        "backend.executor.simulator.get_openai_client", return_value=client
    ):
        emitted = [pair async for pair in simulate_mcp_block(None, payload)]

    assert len(emitted) == 2
    by_name = dict(emitted)
    assert by_name["result"]["temperature"] == 22
    assert by_name["error"] == ""
@pytest.mark.asyncio
async def test_simulate_mcp_block_no_client():
    """Without an OpenAI client the simulator yields a single SIMULATOR ERROR."""
    with patch("backend.executor.simulator.get_openai_client", return_value=None):
        emitted = [pair async for pair in simulate_mcp_block(None, {})]

    assert len(emitted) == 1
    name, message = emitted[0]
    assert name == "error"
    assert "[SIMULATOR ERROR" in message
@pytest.mark.asyncio
async def test_simulate_mcp_block_retries_on_bad_json():
    """An invalid-JSON reply triggers one retry before the call succeeds."""
    client = AsyncMock()
    client.chat.completions.create = AsyncMock(
        side_effect=[
            make_openai_response("not json"),
            make_openai_response('{"result": "ok", "error": ""}'),
        ]
    )

    with patch(
        "backend.executor.simulator.get_openai_client", return_value=client
    ):
        emitted = [
            pair async for pair in simulate_mcp_block(None, {"selected_tool": "test"})
        ]

    # One failed parse plus one successful attempt.
    assert client.chat.completions.create.call_count == 2
    assert ("result", "ok") in emitted

View File

@@ -21,6 +21,7 @@ from backend.blocks._base import BlockSchema
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.io import AgentOutputBlock
from backend.blocks.mcp.block import MCPToolBlock
from backend.blocks.orchestrator import OrchestratorBlock
from backend.data import redis_client as redis
from backend.data.block import BlockInput, BlockOutput, BlockOutputEntry
from backend.data.credit import UsageTransactionMetadata
@@ -81,7 +82,7 @@ from backend.util.settings import Settings
from .activity_status_generator import generate_activity_status_for_execution
from .automod.manager import automod_manager
from .cluster_lock import ClusterLock
from .simulator import simulate_block
from .simulator import simulate_block, simulate_mcp_block
from .utils import (
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
@@ -279,6 +280,23 @@ async def execute_node(
"nodes_to_skip": nodes_to_skip or set(),
}
# OrchestratorBlock and AgentExecutorBlock execute for real even in dry-run
# mode so that the orchestrator can make LLM calls and the agent executor can
# create child graph executions (whose blocks are then simulated).
# validate_exec() in dry-run mode wipes missing credential fields to None,
# which would prevent credential acquisition. Restore them from node defaults
# so the block can run.
_dry_run_passthrough = execution_context.dry_run and isinstance(
node_block, (OrchestratorBlock, AgentExecutorBlock)
)
if _dry_run_passthrough:
for field_name in cast(
type[BlockSchema], node_block.input_schema
).get_credentials_fields():
default_value = node.input_default.get(field_name)
if default_value is not None and not input_data.get(field_name):
input_data[field_name] = default_value
# Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
@@ -375,8 +393,12 @@ async def execute_node(
scope.set_tag(f"execution_context.{k}", v)
try:
if execution_context.dry_run:
block_iter = simulate_block(node_block, input_data)
if execution_context.dry_run and not _dry_run_passthrough:
# MCPToolBlock gets a specialised simulation that uses its tool schema.
if isinstance(node_block, MCPToolBlock):
block_iter = simulate_mcp_block(node_block, input_data)
else:
block_iter = simulate_block(node_block, input_data)
else:
block_iter = node_block.execute(input_data, **extra_exec_kwargs)

View File

@@ -133,6 +133,129 @@ Output pin names you MUST include: {json.dumps(required_output_properties)}
return system_prompt, user_prompt
def _build_mcp_simulation_prompt(
    input_data: dict[str, Any],
) -> tuple[str, str]:
    """Build the ``(system_prompt, user_prompt)`` pair for MCP tool simulation.

    The prompt is grounded in the selected tool's name, its JSON Schema, and
    the supplied arguments so the LLM can fabricate a realistic response for
    that specific tool.
    """
    name = input_data.get("selected_tool", "unknown_tool")
    schema = input_data.get("tool_input_schema", {})
    arguments = input_data.get("tool_arguments", {})
    server = input_data.get("server_url", "")

    # An empty/missing schema is shown as a placeholder rather than "{}".
    rendered_schema = json.dumps(schema, indent=2) if schema else "(none)"

    system_prompt = f"""You are simulating the execution of an MCP (Model Context Protocol) tool.

## Tool Details
- Tool name: {name}
- MCP server: {server}

## Tool Input Schema
{rendered_schema}

Your task: given the tool arguments below, produce a realistic simulated output
for this MCP tool call.

Rules:
- Respond with a single JSON object with exactly two keys: "result" and "error".
- "result" should contain realistic output data that the tool would return.
- "error" should be "" (empty string) unless you are simulating a logical error.
- Assume all credentials and authentication are present and valid. Never simulate authentication failures.
- Base your response on what a tool named "{name}" with the given schema would realistically return.
"""

    # Arguments are truncated before inclusion to keep the prompt bounded.
    user_prompt = "## Tool Arguments\n" + json.dumps(
        _truncate_input_values(arguments), indent=2
    )
    return system_prompt, user_prompt
async def simulate_mcp_block(
    _block: Any,
    input_data: dict[str, Any],
) -> AsyncIterator[tuple[str, Any]]:
    """Simulate an MCP tool call with an LLM instead of a real MCP server.

    Unlike the generic ``simulate_block``, the prompt here is grounded in the
    selected tool's name and JSON Schema, which yields mock responses that are
    plausible for that specific tool.

    Yields ``(output_name, output_data)`` tuples matching the Block.execute()
    interface.
    """
    client = get_openai_client()
    if client is None:
        yield (
            "error",
            "[SIMULATOR ERROR — NOT A BLOCK FAILURE] No LLM client available "
            "(missing OpenAI/OpenRouter API key).",
        )
        return

    system_prompt, user_prompt = _build_mcp_simulation_prompt(input_data)
    model = _simulator_model()
    failure: Exception | None = None

    # Retry only on malformed JSON; any other error aborts immediately.
    for attempt in range(1, _MAX_JSON_RETRIES + 1):
        try:
            response = await client.chat.completions.create(
                model=model,
                temperature=_TEMPERATURE,
                response_format={"type": "json_object"},
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
            )
            if not response.choices:
                raise ValueError("LLM returned empty choices array")
            raw = response.choices[0].message.content or ""
            parsed = json.loads(raw)
            if not isinstance(parsed, dict):
                raise ValueError(f"LLM returned non-object JSON: {raw[:200]}")

            usage = getattr(response, "usage", None)
            logger.debug(
                "simulate_mcp_block: tool=%s attempt=%d tokens=%s/%s",
                input_data.get("selected_tool", "?"),
                attempt,
                getattr(usage, "prompt_tokens", "?"),
                getattr(usage, "completion_tokens", "?"),
            )
            # Mirror the Block.execute() contract: one result pin, one error pin.
            yield "result", parsed.get("result", None)
            yield "error", parsed.get("error", "")
            return
        except (json.JSONDecodeError, ValueError) as exc:
            failure = exc
            logger.warning(
                "simulate_mcp_block: JSON parse error on attempt %d/%d: %s",
                attempt,
                _MAX_JSON_RETRIES,
                exc,
            )
        except Exception as exc:
            failure = exc
            logger.error("simulate_mcp_block: LLM call failed: %s", exc, exc_info=True)
            break

    logger.error(
        "simulate_mcp_block: all %d retries exhausted for tool=%s; last_error=%s",
        _MAX_JSON_RETRIES,
        input_data.get("selected_tool", "?"),
        failure,
    )
    yield (
        "error",
        f"[SIMULATOR ERROR — NOT A BLOCK FAILURE] Failed after {_MAX_JSON_RETRIES} "
        f"attempts: {failure}",
    )
async def simulate_block(
block: Any,
input_data: dict[str, Any],