feat(library, executor): Make "Run Again" work with credentials (#10821)

- Resolves [OPEN-2549: Make "Run again" work with credentials in
`AgentRunDetailsView`](https://linear.app/autogpt/issue/OPEN-2549/make-run-again-work-with-credentials-in-agentrundetailsview)
- Resolves #10237

### Changes 🏗️

- feat(frontend/library): Make "Run Again" button work for runs with
credentials
- feat(backend/executor): Store passed-in credentials on
`GraphExecution`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Go to `/library/agents/[id]` for an agent with credentials inputs
  - Run the agent manually
    - [x] -> runs successfully
- [x] -> "Run again" shows among the action buttons on the newly created
run
  - Click "Run again"
    - [x] -> runs successfully
This commit is contained in:
Reinier van der Leer
2025-09-02 19:34:56 +01:00
committed by GitHub
parent f669db4a10
commit e16e69ca55
14 changed files with 403 additions and 105 deletions

View File

@@ -1,8 +1,6 @@
import logging
from typing import Any, Optional
from pydantic import JsonValue
from backend.data.block import (
Block,
BlockCategory,
@@ -12,7 +10,7 @@ from backend.data.block import (
BlockType,
get_block,
)
from backend.data.execution import ExecutionStatus
from backend.data.execution import ExecutionStatus, NodesInputMasks
from backend.data.model import NodeExecutionStats, SchemaField
from backend.util.json import validate_with_jsonschema
from backend.util.retry import func_retry
@@ -33,7 +31,7 @@ class AgentExecutorBlock(Block):
input_schema: dict = SchemaField(description="Input schema for the graph")
output_schema: dict = SchemaField(description="Output schema for the graph")
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = SchemaField(
nodes_input_masks: Optional[NodesInputMasks] = SchemaField(
default=None, hidden=True
)

View File

@@ -8,6 +8,7 @@ from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Generic,
Optional,
@@ -44,9 +45,10 @@ if TYPE_CHECKING:
app_config = Config()
BlockData = tuple[str, Any] # Input & Output data should be a tuple of (name, data).
BlockInput = dict[str, Any] # Input: 1 input pin consumes 1 data.
BlockOutput = AsyncGen[BlockData, None] # Output: 1 output pin produces n data.
BlockOutputEntry = tuple[str, Any] # Output data should be a tuple of (name, value).
BlockOutput = AsyncGen[BlockOutputEntry, None] # Output: 1 output pin produces n data.
BlockTestOutput = BlockOutputEntry | tuple[str, Callable[[Any], bool]]
CompletedBlockOutput = dict[str, list[Any]] # Completed stream, collected as a dict.
@@ -306,7 +308,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
input_schema: Type[BlockSchemaInputType] = EmptySchema,
output_schema: Type[BlockSchemaOutputType] = EmptySchema,
test_input: BlockInput | list[BlockInput] | None = None,
test_output: BlockData | list[BlockData] | None = None,
test_output: BlockTestOutput | list[BlockTestOutput] | None = None,
test_mock: dict[str, Any] | None = None,
test_credentials: Optional[Credentials | dict[str, Credentials]] = None,
disabled: bool = False,

View File

@@ -11,11 +11,14 @@ from typing import (
Generator,
Generic,
Literal,
Mapping,
Optional,
TypeVar,
cast,
overload,
)
from prisma import Json
from prisma.enums import AgentExecutionStatus
from prisma.models import (
AgentGraphExecution,
@@ -24,7 +27,6 @@ from prisma.models import (
AgentNodeExecutionKeyValueData,
)
from prisma.types import (
AgentGraphExecutionCreateInput,
AgentGraphExecutionUpdateManyMutationInput,
AgentGraphExecutionWhereInput,
AgentNodeExecutionCreateInput,
@@ -60,7 +62,7 @@ from .includes import (
GRAPH_EXECUTION_INCLUDE_WITH_NODES,
graph_execution_include,
)
from .model import GraphExecutionStats, NodeExecutionStats
from .model import CredentialsMetaInput, GraphExecutionStats, NodeExecutionStats
T = TypeVar("T")
@@ -87,6 +89,8 @@ class BlockErrorStats(BaseModel):
ExecutionStatus = AgentExecutionStatus
NodeInputMask = Mapping[str, JsonValue]
NodesInputMasks = Mapping[str, NodeInputMask]
class GraphExecutionMeta(BaseDbModel):
@@ -94,7 +98,10 @@ class GraphExecutionMeta(BaseDbModel):
user_id: str
graph_id: str
graph_version: int
preset_id: Optional[str] = None
inputs: Optional[BlockInput] # no default -> required in the OpenAPI spec
credential_inputs: Optional[dict[str, CredentialsMetaInput]]
nodes_input_masks: Optional[dict[str, BlockInput]]
preset_id: Optional[str]
status: ExecutionStatus
started_at: datetime
ended_at: datetime
@@ -179,6 +186,18 @@ class GraphExecutionMeta(BaseDbModel):
user_id=_graph_exec.userId,
graph_id=_graph_exec.agentGraphId,
graph_version=_graph_exec.agentGraphVersion,
inputs=cast(BlockInput | None, _graph_exec.inputs),
credential_inputs=(
{
name: CredentialsMetaInput.model_validate(cmi)
for name, cmi in cast(dict, _graph_exec.credentialInputs).items()
}
if _graph_exec.credentialInputs
else None
),
nodes_input_masks=cast(
dict[str, BlockInput] | None, _graph_exec.nodesInputMasks
),
preset_id=_graph_exec.agentPresetId,
status=ExecutionStatus(_graph_exec.executionStatus),
started_at=start_time,
@@ -206,7 +225,7 @@ class GraphExecutionMeta(BaseDbModel):
class GraphExecution(GraphExecutionMeta):
inputs: BlockInput
inputs: BlockInput # type: ignore - incompatible override is intentional
outputs: CompletedBlockOutput
@staticmethod
@@ -226,15 +245,18 @@ class GraphExecution(GraphExecutionMeta):
)
inputs = {
**{
# inputs from Agent Input Blocks
exec.input_data["name"]: exec.input_data.get("value")
for exec in complete_node_executions
if (
(block := get_block(exec.block_id))
and block.block_type == BlockType.INPUT
)
},
**(
graph_exec.inputs
or {
# fallback: extract inputs from Agent Input Blocks
exec.input_data["name"]: exec.input_data.get("value")
for exec in complete_node_executions
if (
(block := get_block(exec.block_id))
and block.block_type == BlockType.INPUT
)
}
),
**{
# input from webhook-triggered block
"payload": exec.input_data["payload"]
@@ -252,14 +274,13 @@ class GraphExecution(GraphExecutionMeta):
if (
block := get_block(exec.block_id)
) and block.block_type == BlockType.OUTPUT:
outputs[exec.input_data["name"]].append(
exec.input_data.get("value", None)
)
outputs[exec.input_data["name"]].append(exec.input_data.get("value"))
return GraphExecution(
**{
field_name: getattr(graph_exec, field_name)
for field_name in GraphExecutionMeta.model_fields
if field_name != "inputs"
},
inputs=inputs,
outputs=outputs,
@@ -292,13 +313,17 @@ class GraphExecutionWithNodes(GraphExecution):
node_executions=node_executions,
)
def to_graph_execution_entry(self, user_context: "UserContext"):
def to_graph_execution_entry(
self,
user_context: "UserContext",
compiled_nodes_input_masks: Optional[NodesInputMasks] = None,
):
return GraphExecutionEntry(
user_id=self.user_id,
graph_id=self.graph_id,
graph_version=self.graph_version or 0,
graph_exec_id=self.id,
nodes_input_masks={}, # FIXME: store credentials on AgentGraphExecution
nodes_input_masks=compiled_nodes_input_masks,
user_context=user_context,
)
@@ -335,7 +360,7 @@ class NodeExecutionResult(BaseModel):
else:
input_data: BlockInput = defaultdict()
for data in _node_exec.Input or []:
input_data[data.name] = type_utils.convert(data.data, type[Any])
input_data[data.name] = type_utils.convert(data.data, JsonValue)
output_data: CompletedBlockOutput = defaultdict(list)
@@ -344,7 +369,7 @@ class NodeExecutionResult(BaseModel):
output_data[name].extend(messages)
else:
for data in _node_exec.Output or []:
output_data[data.name].append(type_utils.convert(data.data, type[Any]))
output_data[data.name].append(type_utils.convert(data.data, JsonValue))
graph_execution: AgentGraphExecution | None = _node_exec.GraphExecution
if graph_execution:
@@ -539,9 +564,12 @@ async def get_graph_execution(
async def create_graph_execution(
graph_id: str,
graph_version: int,
starting_nodes_input: list[tuple[str, BlockInput]],
starting_nodes_input: list[tuple[str, BlockInput]], # list[(node_id, BlockInput)]
inputs: Mapping[str, JsonValue],
user_id: str,
preset_id: str | None = None,
preset_id: Optional[str] = None,
credential_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> GraphExecutionWithNodes:
"""
Create a new AgentGraphExecution record.
@@ -549,11 +577,18 @@ async def create_graph_execution(
The id of the AgentGraphExecution and the list of ExecutionResult for each node.
"""
result = await AgentGraphExecution.prisma().create(
data=AgentGraphExecutionCreateInput(
agentGraphId=graph_id,
agentGraphVersion=graph_version,
executionStatus=ExecutionStatus.QUEUED,
NodeExecutions={
data={
"agentGraphId": graph_id,
"agentGraphVersion": graph_version,
"executionStatus": ExecutionStatus.QUEUED,
"inputs": SafeJson(inputs),
"credentialInputs": (
SafeJson(credential_inputs) if credential_inputs else Json({})
),
"nodesInputMasks": (
SafeJson(nodes_input_masks) if nodes_input_masks else Json({})
),
"NodeExecutions": {
"create": [
AgentNodeExecutionCreateInput(
agentNodeId=node_id,
@@ -569,9 +604,9 @@ async def create_graph_execution(
for node_id, node_input in starting_nodes_input
]
},
userId=user_id,
agentPresetId=preset_id,
),
"userId": user_id,
"agentPresetId": preset_id,
},
include=GRAPH_EXECUTION_INCLUDE_WITH_NODES,
)
@@ -582,7 +617,7 @@ async def upsert_execution_input(
node_id: str,
graph_exec_id: str,
input_name: str,
input_data: Any,
input_data: JsonValue,
node_exec_id: str | None = None,
) -> tuple[str, BlockInput]:
"""
@@ -631,7 +666,7 @@ async def upsert_execution_input(
)
return existing_execution.id, {
**{
input_data.name: type_utils.convert(input_data.data, type[Any])
input_data.name: type_utils.convert(input_data.data, JsonValue)
for input_data in existing_execution.Input or []
},
input_name: input_data,
@@ -888,7 +923,7 @@ class GraphExecutionEntry(BaseModel):
graph_exec_id: str
graph_id: str
graph_version: int
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None
nodes_input_masks: Optional[NodesInputMasks] = None
user_context: UserContext

View File

@@ -12,7 +12,7 @@ from prisma.types import (
AgentNodeLinkCreateInput,
StoreListingVersionWhereInput,
)
from pydantic import BaseModel, Field, JsonValue, create_model
from pydantic import BaseModel, Field, create_model
from pydantic.fields import computed_field
from backend.blocks.agent import AgentExecutorBlock
@@ -34,6 +34,7 @@ from .db import BaseDbModel, query_raw_with_schema, transaction
from .includes import AGENT_GRAPH_INCLUDE, AGENT_NODE_INCLUDE
if TYPE_CHECKING:
from .execution import NodesInputMasks
from .integrations import Webhook
logger = logging.getLogger(__name__)
@@ -451,7 +452,7 @@ class GraphModel(Graph):
def validate_graph(
self,
for_run: bool = False,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional["NodesInputMasks"] = None,
):
"""
Validate graph structure and raise `ValueError` on issues.
@@ -465,7 +466,7 @@ class GraphModel(Graph):
def _validate_graph(
graph: BaseGraph,
for_run: bool = False,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional["NodesInputMasks"] = None,
) -> None:
errors = GraphModel._validate_graph_get_errors(
graph, for_run, nodes_input_masks
@@ -479,7 +480,7 @@ class GraphModel(Graph):
def validate_graph_get_errors(
self,
for_run: bool = False,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional["NodesInputMasks"] = None,
) -> dict[str, dict[str, str]]:
"""
Validate graph and return structured errors per node.
@@ -501,7 +502,7 @@ class GraphModel(Graph):
def _validate_graph_get_errors(
graph: BaseGraph,
for_run: bool = False,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional["NodesInputMasks"] = None,
) -> dict[str, dict[str, str]]:
"""
Validate graph and return structured errors per node.

View File

@@ -10,7 +10,6 @@ from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties
from pydantic import JsonValue
from redis.asyncio.lock import Lock as RedisLock
from backend.blocks.io import AgentOutputBlock
@@ -38,9 +37,9 @@ from prometheus_client import Gauge, start_http_server
from backend.blocks.agent import AgentExecutorBlock
from backend.data import redis_client as redis
from backend.data.block import (
BlockData,
BlockInput,
BlockOutput,
BlockOutputEntry,
BlockSchema,
get_block,
)
@@ -52,6 +51,7 @@ from backend.data.execution import (
GraphExecutionEntry,
NodeExecutionEntry,
NodeExecutionResult,
NodesInputMasks,
UserContext,
)
from backend.data.graph import Link, Node
@@ -131,7 +131,7 @@ async def execute_node(
creds_manager: IntegrationCredentialsManager,
data: NodeExecutionEntry,
execution_stats: NodeExecutionStats | None = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> BlockOutput:
"""
Execute a node in the graph. This will trigger a block execution on a node,
@@ -237,12 +237,12 @@ async def execute_node(
async def _enqueue_next_nodes(
db_client: "DatabaseManagerAsyncClient",
node: Node,
output: BlockData,
output: BlockOutputEntry,
user_id: str,
graph_exec_id: str,
graph_id: str,
log_metadata: LogMetadata,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]],
nodes_input_masks: Optional[NodesInputMasks],
user_context: UserContext,
) -> list[NodeExecutionEntry]:
async def add_enqueued_execution(
@@ -419,7 +419,7 @@ class ExecutionProcessor:
self,
node_exec: NodeExecutionEntry,
node_exec_progress: NodeExecutionProgress,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]],
nodes_input_masks: Optional[NodesInputMasks],
graph_stats_pair: tuple[GraphExecutionStats, threading.Lock],
) -> NodeExecutionStats:
log_metadata = LogMetadata(
@@ -487,7 +487,7 @@ class ExecutionProcessor:
stats: NodeExecutionStats,
db_client: "DatabaseManagerAsyncClient",
log_metadata: LogMetadata,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> ExecutionStatus:
status = ExecutionStatus.RUNNING
@@ -1053,7 +1053,7 @@ class ExecutionProcessor:
node_id: str,
graph_exec: GraphExecutionEntry,
log_metadata: LogMetadata,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]],
nodes_input_masks: Optional[NodesInputMasks],
execution_queue: ExecutionQueue[NodeExecutionEntry],
) -> None:
"""Process a node's output, update its status, and enqueue next nodes.

View File

@@ -4,13 +4,13 @@ import threading
import time
from collections import defaultdict
from concurrent.futures import Future
from typing import Any, Optional
from typing import Any, Mapping, Optional, cast
from pydantic import BaseModel, JsonValue, ValidationError
from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data.block import Block, BlockData, BlockInput, BlockType, get_block
from backend.data.block import Block, BlockInput, BlockOutputEntry, BlockType, get_block
from backend.data.block_cost_config import BLOCK_COSTS
from backend.data.cost import BlockCostType
from backend.data.db import prisma
@@ -18,6 +18,7 @@ from backend.data.execution import (
ExecutionStatus,
GraphExecutionStats,
GraphExecutionWithNodes,
NodesInputMasks,
UserContext,
)
from backend.data.graph import GraphModel, Node
@@ -239,7 +240,7 @@ def _tokenise(path: str) -> list[tuple[str, str]] | None:
# --------------------------------------------------------------------------- #
def parse_execution_output(output: BlockData, name: str) -> Any | None:
def parse_execution_output(output: BlockOutputEntry, name: str) -> JsonValue | None:
"""
Retrieve a nested value out of `output` using the flattened *name*.
@@ -263,7 +264,7 @@ def parse_execution_output(output: BlockData, name: str) -> Any | None:
if tokens is None:
return None
cur: Any = data
cur: JsonValue = data
for delim, ident in tokens:
if delim == LIST_SPLIT:
# list[index]
@@ -428,7 +429,7 @@ def validate_exec(
async def _validate_node_input_credentials(
graph: GraphModel,
user_id: str,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> dict[str, dict[str, str]]:
"""
Checks all credentials for all nodes of the graph and returns structured errors.
@@ -508,8 +509,8 @@ async def _validate_node_input_credentials(
def make_node_credentials_input_map(
graph: GraphModel,
graph_credentials_input: dict[str, CredentialsMetaInput],
) -> dict[str, dict[str, JsonValue]]:
graph_credentials_input: Mapping[str, CredentialsMetaInput],
) -> NodesInputMasks:
"""
Maps credentials for an execution to the correct nodes.
@@ -544,8 +545,8 @@ def make_node_credentials_input_map(
async def validate_graph_with_credentials(
graph: GraphModel,
user_id: str,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> dict[str, dict[str, str]]:
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> Mapping[str, Mapping[str, str]]:
"""
Validate graph including credentials and return structured errors per node.
@@ -575,7 +576,7 @@ async def _construct_starting_node_execution_input(
graph: GraphModel,
user_id: str,
graph_inputs: BlockInput,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> list[tuple[str, BlockInput]]:
"""
Validates and prepares the input data for executing a graph.
@@ -616,7 +617,7 @@ async def _construct_starting_node_execution_input(
# Extract request input data, and assign it to the input pin.
if block.block_type == BlockType.INPUT:
input_name = node.input_default.get("name")
input_name = cast(str | None, node.input_default.get("name"))
if input_name and input_name in graph_inputs:
input_data = {"value": graph_inputs[input_name]}
@@ -643,9 +644,9 @@ async def validate_and_construct_node_execution_input(
user_id: str,
graph_inputs: BlockInput,
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[dict[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> tuple[GraphModel, list[tuple[str, BlockInput]], dict[str, dict[str, JsonValue]]]:
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> tuple[GraphModel, list[tuple[str, BlockInput]], NodesInputMasks]:
"""
Public wrapper that handles graph fetching, credential mapping, and validation+construction.
This centralizes the logic used by both scheduler validation and actual execution.
@@ -659,7 +660,9 @@ async def validate_and_construct_node_execution_input(
nodes_input_masks: Node inputs to use.
Returns:
tuple[GraphModel, list[tuple[str, BlockInput]]]: Graph model and list of tuples for node execution input.
GraphModel: Full graph object for the given `graph_id`.
list[tuple[node_id, BlockInput]]: Starting node IDs with corresponding inputs.
dict[str, BlockInput]: Node input masks including all passed-in credentials.
Raises:
NotFoundError: If the graph is not found.
@@ -700,11 +703,11 @@ async def validate_and_construct_node_execution_input(
def _merge_nodes_input_masks(
overrides_map_1: dict[str, dict[str, JsonValue]],
overrides_map_2: dict[str, dict[str, JsonValue]],
) -> dict[str, dict[str, JsonValue]]:
overrides_map_1: NodesInputMasks,
overrides_map_2: NodesInputMasks,
) -> NodesInputMasks:
"""Perform a per-node merge of input overrides"""
result = overrides_map_1.copy()
result = dict(overrides_map_1).copy()
for node_id, overrides2 in overrides_map_2.items():
if node_id in result:
result[node_id] = {**result[node_id], **overrides2}
@@ -854,8 +857,8 @@ async def add_graph_execution(
inputs: Optional[BlockInput] = None,
preset_id: Optional[str] = None,
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[dict[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> GraphExecutionWithNodes:
"""
Adds a graph execution to the queue and returns the execution entry.
@@ -879,7 +882,7 @@ async def add_graph_execution(
else:
edb = get_database_manager_async_client()
graph, starting_nodes_input, nodes_input_masks = (
graph, starting_nodes_input, compiled_nodes_input_masks = (
await validate_and_construct_node_execution_input(
graph_id=graph_id,
user_id=user_id,
@@ -892,10 +895,15 @@ async def add_graph_execution(
graph_exec = None
try:
# Sanity check: running add_graph_execution with the properties of
# the graph_exec created here should create the same execution again.
graph_exec = await edb.create_graph_execution(
user_id=user_id,
graph_id=graph_id,
graph_version=graph.version,
inputs=inputs or {},
credential_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
starting_nodes_input=starting_nodes_input,
preset_id=preset_id,
)
@@ -904,9 +912,9 @@ async def add_graph_execution(
user_context = await get_user_context(user_id)
queue = await get_async_execution_queue()
graph_exec_entry = graph_exec.to_graph_execution_entry(user_context)
if nodes_input_masks:
graph_exec_entry.nodes_input_masks = nodes_input_masks
graph_exec_entry = graph_exec.to_graph_execution_entry(
user_context, compiled_nodes_input_masks
)
logger.info(
f"Created graph execution #{graph_exec.id} for graph "
@@ -952,7 +960,7 @@ async def add_graph_execution(
class ExecutionOutputEntry(BaseModel):
node: Node
node_exec_id: str
data: BlockData
data: BlockOutputEntry
class NodeExecutionProgress:

View File

@@ -1,6 +1,7 @@
from typing import cast
import pytest
from pytest_mock import MockerFixture
from backend.executor.utils import merge_execution_input, parse_execution_output
from backend.util.mock import MockObject
@@ -276,3 +277,142 @@ def test_merge_execution_input():
result = merge_execution_input(data)
assert "mixed" in result
assert result["mixed"].attr[0]["key"] == "value3"
@pytest.mark.asyncio
async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
"""
Verify that calling the function with its own output creates the same execution again.
"""
from backend.data.execution import GraphExecutionWithNodes
from backend.data.model import CredentialsMetaInput
from backend.executor.utils import add_graph_execution
from backend.integrations.providers import ProviderName
# Mock data
graph_id = "test-graph-id"
user_id = "test-user-id"
inputs = {"test_input": "test_value"}
preset_id = "test-preset-id"
graph_version = 1
graph_credentials_inputs = {
"cred_key": CredentialsMetaInput(
id="cred-id", provider=ProviderName("test_provider"), type="oauth2"
)
}
nodes_input_masks = {"node1": {"input1": "masked_value"}}
# Mock the graph object returned by validate_and_construct_node_execution_input
mock_graph = mocker.MagicMock()
mock_graph.version = graph_version
# Mock the starting nodes input and compiled nodes input masks
starting_nodes_input = [
("node1", {"input1": "value1"}),
("node2", {"input1": "value2"}),
]
compiled_nodes_input_masks = {"node1": {"input1": "compiled_mask"}}
# Mock the graph execution object
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec.id = "execution-id-123"
mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock()
# Mock user context
mock_user_context = {"user_id": user_id, "context": "test_context"}
# Mock the queue and event bus
mock_queue = mocker.AsyncMock()
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
# Setup mocks
mock_validate = mocker.patch(
"backend.executor.utils.validate_and_construct_node_execution_input"
)
mock_edb = mocker.patch("backend.executor.utils.execution_db")
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_get_user_context = mocker.patch("backend.executor.utils.get_user_context")
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
# Setup mock returns
mock_validate.return_value = (
mock_graph,
starting_nodes_input,
compiled_nodes_input_masks,
)
mock_prisma.is_connected.return_value = True
mock_edb.create_graph_execution = mocker.AsyncMock(return_value=mock_graph_exec)
mock_get_user_context.return_value = mock_user_context
mock_get_queue.return_value = mock_queue
mock_get_event_bus.return_value = mock_event_bus
# Call the function - first execution
result1 = await add_graph_execution(
graph_id=graph_id,
user_id=user_id,
inputs=inputs,
preset_id=preset_id,
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
)
# Store the parameters used in the first call to create_graph_execution
first_call_kwargs = mock_edb.create_graph_execution.call_args[1]
# Verify the create_graph_execution was called with correct parameters
mock_edb.create_graph_execution.assert_called_once_with(
user_id=user_id,
graph_id=graph_id,
graph_version=mock_graph.version,
inputs=inputs,
credential_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
starting_nodes_input=starting_nodes_input,
preset_id=preset_id,
)
# Set up the graph execution mock to have properties we can extract
mock_graph_exec.graph_id = graph_id
mock_graph_exec.user_id = user_id
mock_graph_exec.graph_version = graph_version
mock_graph_exec.inputs = inputs
mock_graph_exec.credential_inputs = graph_credentials_inputs
mock_graph_exec.nodes_input_masks = nodes_input_masks
mock_graph_exec.preset_id = preset_id
# Create a second mock execution for the sanity check
mock_graph_exec_2 = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec_2.id = "execution-id-456"
mock_graph_exec_2.to_graph_execution_entry.return_value = mocker.MagicMock()
# Reset mocks and set up for second call
mock_edb.create_graph_execution.reset_mock()
mock_edb.create_graph_execution.return_value = mock_graph_exec_2
mock_validate.reset_mock()
# Sanity check: call add_graph_execution with properties from first result
# This should create the same execution parameters
result2 = await add_graph_execution(
graph_id=mock_graph_exec.graph_id,
user_id=mock_graph_exec.user_id,
inputs=mock_graph_exec.inputs,
preset_id=mock_graph_exec.preset_id,
graph_version=mock_graph_exec.graph_version,
graph_credentials_inputs=mock_graph_exec.credential_inputs,
nodes_input_masks=mock_graph_exec.nodes_input_masks,
)
# Verify that create_graph_execution was called with identical parameters
second_call_kwargs = mock_edb.create_graph_execution.call_args[1]
# The sanity check: both calls should use identical parameters
assert first_call_kwargs == second_call_kwargs
# Both executions should succeed (though they create different objects)
assert result1 == mock_graph_exec
assert result2 == mock_graph_exec_2

View File

@@ -88,6 +88,7 @@ async def test_send_graph_execution_result(
user_id="user-1",
graph_id="test_graph",
graph_version=1,
preset_id=None,
status=ExecutionStatus.COMPLETED,
started_at=datetime.now(tz=timezone.utc),
ended_at=datetime.now(tz=timezone.utc),
@@ -101,6 +102,8 @@ async def test_send_graph_execution_result(
"input_1": "some input value :)",
"input_2": "some *other* input value",
},
credential_inputs=None,
nodes_input_masks=None,
outputs={
"the_output": ["some output value"],
"other_output": ["sike there was another output"],

View File

@@ -1,3 +1,6 @@
from typing import Mapping
class MissingConfigError(Exception):
"""The attempted operation requires configuration which is not available"""
@@ -69,7 +72,7 @@ class GraphValidationError(ValueError):
"""Structured validation error for graph validation failures"""
def __init__(
self, message: str, node_errors: dict[str, dict[str, str]] | None = None
self, message: str, node_errors: Mapping[str, Mapping[str, str]] | None = None
):
super().__init__(message)
self.message = message

View File

@@ -0,0 +1,5 @@
-- Add 'credentialInputs', 'inputs', and 'nodesInputMasks' columns to the AgentGraphExecution table
ALTER TABLE "AgentGraphExecution"
ADD COLUMN "credentialInputs" JSONB,
ADD COLUMN "inputs" JSONB,
ADD COLUMN "nodesInputMasks" JSONB;

View File

@@ -36,7 +36,7 @@ model User {
notifyOnAgentApproved Boolean @default(true)
notifyOnAgentRejected Boolean @default(true)
timezone String @default("not-set")
timezone String @default("not-set")
// Relations
@@ -354,15 +354,20 @@ model AgentGraphExecution {
agentGraphVersion Int @default(1)
AgentGraph AgentGraph @relation(fields: [agentGraphId, agentGraphVersion], references: [id, version], onDelete: Cascade)
agentPresetId String?
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
inputs Json?
credentialInputs Json?
nodesInputMasks Json?
NodeExecutions AgentNodeExecution[]
// Link to User model -- Executed by this user
userId String
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
stats Json?
agentPresetId String?
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
stats Json?
@@index([agentGraphId, agentGraphVersion])
@@index([userId])

View File

@@ -1,5 +1,4 @@
"use client";
import { isEmpty } from "lodash";
import moment from "moment";
import React, { useCallback, useMemo } from "react";
@@ -92,7 +91,7 @@ export function AgentRunDetailsView({
}
>
| undefined = useMemo(() => {
if (!("inputs" in run)) return undefined;
if (!run.inputs) return undefined;
// TODO: show (link to) preset - https://github.com/Significant-Gravitas/AutoGPT/issues/9168
// Add type info from agent input schema
@@ -110,14 +109,16 @@ export function AgentRunDetailsView({
const runAgain = useCallback(
() =>
agentRunInputs &&
run.inputs &&
graph.credentials_input_schema.required.every(
(k) => k in (run.credential_inputs ?? {}),
) &&
api
.executeGraph(
graph.id,
graph.version,
Object.fromEntries(
Object.entries(agentRunInputs).map(([k, v]) => [k, v.value]),
),
run.inputs,
run.credential_inputs ?? undefined,
)
.then(({ graph_exec_id }) => onRun(graph_exec_id))
.catch(toastOnFail("execute agent")),
@@ -177,7 +178,9 @@ export function AgentRunDetailsView({
: []),
...(["success", "failed", "stopped"].includes(runStatus) &&
!graph.has_external_trigger &&
isEmpty(graph.credentials_input_schema.required) // TODO: enable re-run with credentials - https://linear.app/autogpt/issue/SECRT-1243
graph.credentials_input_schema.required.every(
(k) => k in (run.credential_inputs ?? {}),
)
? [
{
label: (

View File

@@ -5199,6 +5199,36 @@
"user_id": { "type": "string", "title": "User Id" },
"graph_id": { "type": "string", "title": "Graph Id" },
"graph_version": { "type": "integer", "title": "Graph Version" },
"inputs": {
"additionalProperties": true,
"type": "object",
"title": "Inputs"
},
"credential_inputs": {
"anyOf": [
{
"additionalProperties": {
"$ref": "#/components/schemas/CredentialsMetaInput"
},
"type": "object"
},
{ "type": "null" }
],
"title": "Credential Inputs"
},
"nodes_input_masks": {
"anyOf": [
{
"additionalProperties": {
"additionalProperties": true,
"type": "object"
},
"type": "object"
},
{ "type": "null" }
],
"title": "Nodes Input Masks"
},
"preset_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Preset Id"
@@ -5220,11 +5250,6 @@
{ "type": "null" }
]
},
"inputs": {
"additionalProperties": true,
"type": "object",
"title": "Inputs"
},
"outputs": {
"additionalProperties": { "items": {}, "type": "array" },
"type": "object",
@@ -5237,11 +5262,14 @@
"user_id",
"graph_id",
"graph_version",
"inputs",
"credential_inputs",
"nodes_input_masks",
"preset_id",
"status",
"started_at",
"ended_at",
"stats",
"inputs",
"outputs"
],
"title": "GraphExecution"
@@ -5287,6 +5315,38 @@
"user_id": { "type": "string", "title": "User Id" },
"graph_id": { "type": "string", "title": "Graph Id" },
"graph_version": { "type": "integer", "title": "Graph Version" },
"inputs": {
"anyOf": [
{ "additionalProperties": true, "type": "object" },
{ "type": "null" }
],
"title": "Inputs"
},
"credential_inputs": {
"anyOf": [
{
"additionalProperties": {
"$ref": "#/components/schemas/CredentialsMetaInput"
},
"type": "object"
},
{ "type": "null" }
],
"title": "Credential Inputs"
},
"nodes_input_masks": {
"anyOf": [
{
"additionalProperties": {
"additionalProperties": true,
"type": "object"
},
"type": "object"
},
{ "type": "null" }
],
"title": "Nodes Input Masks"
},
"preset_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Preset Id"
@@ -5315,6 +5375,10 @@
"user_id",
"graph_id",
"graph_version",
"inputs",
"credential_inputs",
"nodes_input_masks",
"preset_id",
"status",
"started_at",
"ended_at",
@@ -5328,6 +5392,36 @@
"user_id": { "type": "string", "title": "User Id" },
"graph_id": { "type": "string", "title": "Graph Id" },
"graph_version": { "type": "integer", "title": "Graph Version" },
"inputs": {
"additionalProperties": true,
"type": "object",
"title": "Inputs"
},
"credential_inputs": {
"anyOf": [
{
"additionalProperties": {
"$ref": "#/components/schemas/CredentialsMetaInput"
},
"type": "object"
},
{ "type": "null" }
],
"title": "Credential Inputs"
},
"nodes_input_masks": {
"anyOf": [
{
"additionalProperties": {
"additionalProperties": true,
"type": "object"
},
"type": "object"
},
{ "type": "null" }
],
"title": "Nodes Input Masks"
},
"preset_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Preset Id"
@@ -5349,11 +5443,6 @@
{ "type": "null" }
]
},
"inputs": {
"additionalProperties": true,
"type": "object",
"title": "Inputs"
},
"outputs": {
"additionalProperties": { "items": {}, "type": "array" },
"type": "object",
@@ -5371,11 +5460,14 @@
"user_id",
"graph_id",
"graph_version",
"inputs",
"credential_inputs",
"nodes_input_masks",
"preset_id",
"status",
"started_at",
"ended_at",
"stats",
"inputs",
"outputs",
"node_executions"
],

View File

@@ -250,6 +250,9 @@ export type GraphExecutionMeta = {
user_id: UserID;
graph_id: GraphID;
graph_version: number;
inputs: Record<string, any> | null;
credential_inputs: Record<string, CredentialsMetaInput> | null;
nodes_input_masks: Record<string, Record<string, any>> | null;
preset_id: LibraryAgentPresetID | null;
status:
| "QUEUED"
@@ -276,7 +279,7 @@ export type GraphExecutionMeta = {
export type GraphExecutionID = Brand<string, "GraphExecutionID">;
/* Mirror of backend/data/execution.py:GraphExecution */
export type GraphExecution = GraphExecutionMeta & {
export type GraphExecution = Omit<GraphExecutionMeta, "inputs"> & {
inputs: Record<string, any>;
outputs: Record<string, Array<any>>;
node_executions?: NodeExecutionResult[];
@@ -534,7 +537,7 @@ export type CredentialsDeleteNeedConfirmationResponse = {
export type CredentialsMetaInput = {
id: string;
type: CredentialsType;
title?: string;
title?: string | null;
provider: string;
};