mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
## Summary

- **OrchestratorBlock & AgentExecutorBlock** now execute for real in dry-run mode, so the orchestrator can make LLM calls and agent executors can spawn child graphs. Their downstream tool blocks and child-graph blocks are still simulated via `simulate_block()`. Credential fields from node defaults are restored, since `validate_exec()` wipes them in dry-run mode. Agent-mode iterations are capped at 1 in dry-run.
- **All other blocks** (including MCPToolBlock) are simulated via a single generic `simulate_block()` path. The LLM prompt is grounded by `inspect.getsource(block.run)`, giving the simulator access to the exact implementation of each block's `run()` method. This produces realistic mock responses for any block type without needing block-specific simulation logic.
- Updated the agent generation guide to document special-block dry-run behavior.
- Minor frontend fixes: exported `formatCents` from `RateLimitResetDialog` for reuse in `UsagePanelContent`, and used `useRef` for stable callback references in `useResetRateLimit` to avoid stale closures.
- 74 tests (21 existing dry-run tests + 53 new simulator tests covering prompt building, passthrough logic, and special-block dry-run).

## Design

The simulator (`backend/executor/simulator.py`) uses a two-tier approach:

1. **Passthrough blocks** (OrchestratorBlock, AgentExecutorBlock): `prepare_dry_run()` returns modified `input_data` so these blocks execute for real in `manager.py`. OrchestratorBlock gets `max_iterations=1` (agent mode) or `0` (traditional mode). AgentExecutorBlock spawns real child graph executions whose blocks inherit `dry_run=True`.
2. **All other blocks**: `simulate_block()` builds an LLM prompt containing:
   - Block name and description
   - Input/output schemas (JSON Schema)
   - The block's `run()` source code via `inspect.getsource(block.run)`
   - The actual input values (with credentials stripped and long values truncated)

   The LLM then role-plays the block's execution, producing realistic outputs grounded in the actual implementation.

Special handling for input/output blocks: `AgentInputBlock` and `AgentOutputBlock` are pure passthrough (no LLM call needed).

## Test plan

- [x] All 74 tests pass (`pytest backend/copilot/tools/test_dry_run.py backend/executor/simulator_test.py`)
- [x] Pre-commit hooks pass (ruff, isort, black, pyright, frontend typecheck)
- [x] CI: all checks green
- [x] E2E: dry-run execution completes with `is_dry_run=true`, cost=0, no errors
- [x] E2E: normal (non-dry-run) execution unchanged
- [x] E2E: Create agent with OrchestratorBlock + tool blocks, run with `dry_run=True`, verify orchestrator makes real LLM calls while tool blocks are simulated
- [x] E2E: AgentExecutorBlock spawns child graph in dry-run, child blocks are LLM-simulated
- [x] E2E: Builder simulate button works end-to-end with special blocks

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
232 lines
8.1 KiB
Python
232 lines
8.1 KiB
Python
import logging
|
|
from typing import TYPE_CHECKING, Any, Optional
|
|
|
|
from backend.blocks._base import (
|
|
Block,
|
|
BlockCategory,
|
|
BlockInput,
|
|
BlockOutput,
|
|
BlockSchema,
|
|
BlockSchemaInput,
|
|
BlockType,
|
|
)
|
|
from backend.data.execution import ExecutionContext, ExecutionStatus, NodesInputMasks
|
|
from backend.data.model import NodeExecutionStats, SchemaField
|
|
from backend.util.json import validate_with_jsonschema
|
|
from backend.util.retry import func_retry
|
|
|
|
if TYPE_CHECKING:
    # Imported only for static type checking: LogMetadata is referenced as the
    # string annotation "LogMetadata" in method signatures below. At runtime the
    # methods import backend.executor.utils locally instead (presumably to avoid
    # an import cycle between blocks and the executor — confirm).
    from backend.executor.utils import LogMetadata

# Module-level logger; AgentExecutorBlock.run wraps it in a LogMetadata adapter
# that tags messages with the child graph execution's identifiers.
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AgentExecutorBlock(Block):
    # Runs another saved agent (graph) as a child graph execution and relays the
    # values produced by the child's OUTPUT-type nodes as this block's outputs.
    # The child receives a copy of the parent's ExecutionContext (with
    # parent_execution_id set to the parent graph execution) and inherits the
    # parent's dry_run flag.

    class Input(BlockSchemaInput):
        user_id: str = SchemaField(description="User ID")
        graph_id: str = SchemaField(description="Graph ID")
        graph_version: int = SchemaField(description="Graph Version")
        agent_name: Optional[str] = SchemaField(
            default=None, description="Name to display in the Builder UI"
        )

        inputs: BlockInput = SchemaField(description="Input data for the graph")
        input_schema: dict = SchemaField(description="Input schema for the graph")
        output_schema: dict = SchemaField(description="Output schema for the graph")

        nodes_input_masks: Optional[NodesInputMasks] = SchemaField(
            default=None, hidden=True
        )

        @classmethod
        def get_input_schema(cls, data: BlockInput) -> dict[str, Any]:
            """Return the child graph's input JSON schema stored in the node data.

            The schema is dynamic (taken from ``data["input_schema"]``) rather
            than derived from this pydantic class. A missing *or* ``None`` value
            is treated as an empty schema, so callers can always call ``.get``
            on the result.
            """
            return data.get("input_schema") or {}

        @classmethod
        def get_input_defaults(cls, data: BlockInput) -> BlockInput:
            """Return the values to feed the child graph (``data["inputs"]``).

            A missing or ``None`` value yields an empty dict.
            """
            return data.get("inputs") or {}

        @classmethod
        def get_missing_input(cls, data: BlockInput) -> set[str]:
            """Return the child schema's required field names absent from ``inputs``."""
            required_fields = cls.get_input_schema(data).get("required", [])
            # Check against the nested `inputs` dict, not the top-level node
            # data — required fields like "topic" live inside data["inputs"],
            # not at data["topic"]. `or {}` guards against an explicit None.
            provided = data.get("inputs") or {}
            return set(required_fields) - set(provided)

        @classmethod
        def get_mismatch_error(cls, data: BlockInput) -> str | None:
            """Validate ``data["inputs"]`` against the child graph's input schema.

            Returns whatever ``validate_with_jsonschema`` returns: an error
            message string, or ``None`` when the inputs conform.
            """
            return validate_with_jsonschema(
                cls.get_input_schema(data), data.get("inputs") or {}
            )

    class Output(BlockSchema):
        # Use BlockSchema to avoid automatic error field that could clash with graph outputs
        pass

    def __init__(self):
        super().__init__(
            id="e189baac-8c20-45a1-94a7-55177ea42565",
            description="Executes an existing agent inside your agent",
            input_schema=AgentExecutorBlock.Input,
            output_schema=AgentExecutorBlock.Output,
            block_type=BlockType.AGENT,
            categories={BlockCategory.AGENT},
        )

    async def run(
        self,
        input_data: Input,
        *,
        graph_exec_id: str,
        execution_context: ExecutionContext,
        **kwargs,
    ) -> BlockOutput:
        """Start the child graph execution and stream its outputs.

        Args:
            input_data: Target graph identity (``graph_id``/``graph_version``),
                the owning ``user_id``, the child's ``inputs`` and optional
                ``nodes_input_masks``.
            graph_exec_id: ID of the *parent* graph execution; stored as the
                child context's ``parent_execution_id``.
            execution_context: Parent execution context. It is copied for the
                child, and its ``dry_run`` flag is forwarded.

        Yields:
            ``(output_name, value)`` pairs emitted by the child's output nodes.

        Raises:
            Re-raises any exception raised while streaming (including
            cancellation), after first requesting that the child stop.
        """
        from backend.executor import utils as execution_utils

        graph_exec = await execution_utils.add_graph_execution(
            graph_id=input_data.graph_id,
            graph_version=input_data.graph_version,
            user_id=input_data.user_id,
            inputs=input_data.inputs,
            nodes_input_masks=input_data.nodes_input_masks,
            execution_context=execution_context.model_copy(
                update={"parent_execution_id": graph_exec_id},
            ),
            dry_run=execution_context.dry_run,
        )

        logger = execution_utils.LogMetadata(
            logger=_logger,
            user_id=input_data.user_id,
            graph_eid=graph_exec.id,
            graph_id=input_data.graph_id,
            node_eid="*",
            node_id="*",
            block_name=self.name,
        )

        try:
            async for name, data in self._run(
                graph_id=input_data.graph_id,
                graph_version=input_data.graph_version,
                graph_exec_id=graph_exec.id,
                user_id=input_data.user_id,
                logger=logger,
            ):
                yield name, data
        except BaseException as e:
            # BaseException on purpose: cancellation (CancelledError) and an
            # early close of this generator (GeneratorExit) also trigger _stop
            # on the child execution before the exception propagates.
            await self._stop(
                graph_exec_id=graph_exec.id,
                user_id=input_data.user_id,
                logger=logger,
            )
            logger.warning(
                f"Execution of graph {input_data.graph_id}v{input_data.graph_version} failed: {e.__class__.__name__} {str(e)}; execution is stopped."
            )
            raise

    async def _run(
        self,
        graph_id: str,
        graph_version: int,
        graph_exec_id: str,
        user_id: str,
        logger: "LogMetadata",
    ) -> BlockOutput:
        """Listen to the child execution's events and yield its outputs.

        Only events whose status is COMPLETED, TERMINATED or FAILED are
        processed. A graph-level update with such a status ends the stream,
        after merging the child's cost and node-execution count into this
        block's stats. Node events are de-duplicated by ``node_exec_id``; for
        nodes whose block type is OUTPUT, every value in
        ``output_data["output"]`` is yielded under the node's ``name`` input.
        """
        from backend.blocks import get_block
        from backend.data.execution import ExecutionEventType
        from backend.executor import utils as execution_utils

        event_bus = execution_utils.get_async_execution_event_bus()

        log_id = f"Graph #{graph_id}-V{graph_version}, exec-id: {graph_exec_id}"
        logger.info(f"Starting execution of {log_id}")
        # Node executions already processed (of any block type); used only to
        # drop duplicate events.
        seen_node_exec_ids = set()
        # Number of (name, value) pairs actually yielded to the caller — kept
        # separately because seen_node_exec_ids also counts non-output nodes.
        yielded_output_count = 0

        async for event in event_bus.listen(
            user_id=user_id,
            graph_id=graph_id,
            graph_exec_id=graph_exec_id,
        ):
            if event.status not in [
                ExecutionStatus.COMPLETED,
                ExecutionStatus.TERMINATED,
                ExecutionStatus.FAILED,
            ]:
                logger.info(
                    f"Execution {log_id} skipping event {event.event_type} status={event.status} "
                    f"node={getattr(event, 'node_exec_id', '?')}"
                )
                continue

            if event.event_type == ExecutionEventType.GRAPH_EXEC_UPDATE:
                # If the graph execution is COMPLETED, TERMINATED, or FAILED,
                # we can stop listening for further events.
                logger.info(
                    f"Execution {log_id} graph completed with status {event.status}, "
                    f"yielded {yielded_output_count} outputs"
                )
                self.merge_stats(
                    NodeExecutionStats(
                        extra_cost=event.stats.cost if event.stats else 0,
                        extra_steps=event.stats.node_exec_count if event.stats else 0,
                    )
                )
                break

            logger.debug(
                f"Execution {log_id} produced input {event.input_data} output {event.output_data}"
            )

            if event.node_exec_id in seen_node_exec_ids:
                logger.warning(
                    f"{log_id} received duplicate event for node execution {event.node_exec_id}"
                )
                continue
            seen_node_exec_ids.add(event.node_exec_id)

            if not event.block_id:
                logger.warning(f"{log_id} received event without block_id {event}")
                continue

            # Only OUTPUT-type blocks contribute to this block's outputs.
            block = get_block(event.block_id)
            if not block or block.block_type != BlockType.OUTPUT:
                continue

            output_name = event.input_data.get("name")
            if not output_name:
                logger.warning(f"{log_id} produced an output with no name {event}")
                continue

            for output_data in event.output_data.get("output", []):
                logger.debug(
                    f"Execution {log_id} produced {output_name}: {output_data}"
                )
                yield output_name, output_data
                yielded_output_count += 1

    @func_retry
    async def _stop(
        self,
        graph_exec_id: str,
        user_id: str,
        logger: "LogMetadata",
    ) -> None:
        """Request that the child graph execution stop, waiting for it to do so.

        Passes ``wait_timeout=3600`` (presumably seconds — confirm against
        ``stop_graph_execution``). A ``TimeoutError`` is logged rather than
        raised; any other exception propagates to ``func_retry``, whose retry
        policy is defined in ``backend.util.retry``.
        """
        from backend.executor import utils as execution_utils

        log_id = f"Graph exec-id: {graph_exec_id}"
        logger.info(f"Stopping execution of {log_id}")

        try:
            await execution_utils.stop_graph_execution(
                graph_exec_id=graph_exec_id,
                user_id=user_id,
                wait_timeout=3600,
            )
            logger.info(f"Execution {log_id} stopped successfully.")
        except TimeoutError as e:
            logger.error(f"Execution {log_id} stop timed out: {e}")
|