From e16e69ca55875df20e4e43a09b75c318bd243b8a Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Tue, 2 Sep 2025 19:34:56 +0100 Subject: [PATCH] feat(library, executor): Make "Run Again" work with credentials (#10821) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Resolves [OPEN-2549: Make "Run again" work with credentials in `AgentRunDetailsView`](https://linear.app/autogpt/issue/OPEN-2549/make-run-again-work-with-credentials-in-agentrundetailsview) - Resolves #10237 ### Changes 🏗️ - feat(frontend/library): Make "Run Again" button work for runs with credentials - feat(backend/executor): Store passed-in credentials on `GraphExecution` ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - Go to `/library/agents/[id]` for an agent with credentials inputs - Run the agent manually - [x] -> runs successfully - [x] -> "Run again" shows among the action buttons on the newly created run - Click "Run again" - [x] -> runs successfully --- .../backend/backend/blocks/agent.py | 6 +- .../backend/backend/data/block.py | 8 +- .../backend/backend/data/execution.py | 101 ++++++++----- .../backend/backend/data/graph.py | 11 +- .../backend/backend/executor/manager.py | 16 +- .../backend/backend/executor/utils.py | 60 ++++---- .../executor/{util_test.py => utils_test.py} | 140 ++++++++++++++++++ .../backend/server/conn_manager_test.py | 3 + .../backend/backend/util/exceptions.py | 5 +- .../migration.sql | 5 + autogpt_platform/backend/schema.prisma | 13 +- .../components/agent-run-details-view.tsx | 17 ++- .../frontend/src/app/api/openapi.json | 116 +++++++++++++-- .../src/lib/autogpt-server-api/types.ts | 7 +- 14 files changed, 403 insertions(+), 105 deletions(-) rename autogpt_platform/backend/backend/executor/{util_test.py => utils_test.py} (63%) create mode 100644 autogpt_platform/backend/migrations/20250902171554_add_credentials_to_graph_execution/migration.sql diff --git a/autogpt_platform/backend/backend/blocks/agent.py b/autogpt_platform/backend/backend/blocks/agent.py index ac3eedb12b..68fef45ada 100644 --- a/autogpt_platform/backend/backend/blocks/agent.py +++ b/autogpt_platform/backend/backend/blocks/agent.py @@ -1,8 +1,6 @@ import logging from typing import Any, Optional -from pydantic import JsonValue - from backend.data.block import ( Block, BlockCategory, @@ -12,7 +10,7 @@ from backend.data.block import ( BlockType, get_block, ) -from backend.data.execution import ExecutionStatus +from backend.data.execution import ExecutionStatus, NodesInputMasks from backend.data.model import NodeExecutionStats, SchemaField from backend.util.json import validate_with_jsonschema from backend.util.retry import func_retry @@ -33,7 +31,7 @@ class AgentExecutorBlock(Block): input_schema: dict = SchemaField(description="Input schema for the graph") output_schema: dict = SchemaField(description="Output schema for the graph") - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = SchemaField( + nodes_input_masks: Optional[NodesInputMasks] = SchemaField( default=None, hidden=True ) diff --git a/autogpt_platform/backend/backend/data/block.py b/autogpt_platform/backend/backend/data/block.py index 0b47ae9362..7dd5560251 100644 --- a/autogpt_platform/backend/backend/data/block.py +++ b/autogpt_platform/backend/backend/data/block.py @@ -8,6 +8,7 @@ from enum import Enum from typing import ( TYPE_CHECKING, Any, + Callable, ClassVar, Generic, Optional, @@ -44,9 +45,10 @@ if TYPE_CHECKING: app_config = Config() -BlockData = tuple[str, Any] # Input & Output data should be a tuple of (name, data). BlockInput = dict[str, Any] # Input: 1 input pin consumes 1 data. -BlockOutput = AsyncGen[BlockData, None] # Output: 1 output pin produces n data. +BlockOutputEntry = tuple[str, Any] # Output data should be a tuple of (name, value). +BlockOutput = AsyncGen[BlockOutputEntry, None] # Output: 1 output pin produces n data. +BlockTestOutput = BlockOutputEntry | tuple[str, Callable[[Any], bool]] CompletedBlockOutput = dict[str, list[Any]] # Completed stream, collected as a dict. @@ -306,7 +308,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]): input_schema: Type[BlockSchemaInputType] = EmptySchema, output_schema: Type[BlockSchemaOutputType] = EmptySchema, test_input: BlockInput | list[BlockInput] | None = None, - test_output: BlockData | list[BlockData] | None = None, + test_output: BlockTestOutput | list[BlockTestOutput] | None = None, test_mock: dict[str, Any] | None = None, test_credentials: Optional[Credentials | dict[str, Credentials]] = None, disabled: bool = False, diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index 2d9fc9c65b..cc9c797b22 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -11,11 +11,14 @@ from typing import ( Generator, Generic, Literal, + Mapping, Optional, TypeVar, + cast, overload, ) +from prisma import Json from prisma.enums import AgentExecutionStatus from prisma.models import ( AgentGraphExecution, @@ -24,7 +27,6 @@ from prisma.models import ( AgentNodeExecutionKeyValueData, ) from prisma.types import ( - AgentGraphExecutionCreateInput, AgentGraphExecutionUpdateManyMutationInput, AgentGraphExecutionWhereInput, AgentNodeExecutionCreateInput, @@ -60,7 +62,7 @@ from .includes import ( GRAPH_EXECUTION_INCLUDE_WITH_NODES, graph_execution_include, ) -from .model import GraphExecutionStats, NodeExecutionStats +from .model import CredentialsMetaInput, GraphExecutionStats, NodeExecutionStats T = TypeVar("T") @@ -87,6 +89,8 @@ class BlockErrorStats(BaseModel): ExecutionStatus = AgentExecutionStatus +NodeInputMask = Mapping[str, JsonValue] +NodesInputMasks = Mapping[str, NodeInputMask] class GraphExecutionMeta(BaseDbModel): @@ -94,7 +98,10 @@ class GraphExecutionMeta(BaseDbModel): user_id: str graph_id: str graph_version: int - preset_id: Optional[str] = None + inputs: Optional[BlockInput] # no default -> required in the OpenAPI spec + credential_inputs: Optional[dict[str, CredentialsMetaInput]] + nodes_input_masks: Optional[dict[str, BlockInput]] + preset_id: Optional[str] status: ExecutionStatus started_at: datetime ended_at: datetime @@ -179,6 +186,18 @@ class GraphExecutionMeta(BaseDbModel): user_id=_graph_exec.userId, graph_id=_graph_exec.agentGraphId, graph_version=_graph_exec.agentGraphVersion, + inputs=cast(BlockInput | None, _graph_exec.inputs), + credential_inputs=( + { + name: CredentialsMetaInput.model_validate(cmi) + for name, cmi in cast(dict, _graph_exec.credentialInputs).items() + } + if _graph_exec.credentialInputs + else None + ), + nodes_input_masks=cast( + dict[str, BlockInput] | None, _graph_exec.nodesInputMasks + ), preset_id=_graph_exec.agentPresetId, status=ExecutionStatus(_graph_exec.executionStatus), started_at=start_time, @@ -206,7 +225,7 @@ class GraphExecutionMeta(BaseDbModel): class GraphExecution(GraphExecutionMeta): - inputs: BlockInput + inputs: BlockInput # type: ignore - incompatible override is intentional outputs: CompletedBlockOutput @staticmethod @@ -226,15 +245,18 @@ class GraphExecution(GraphExecutionMeta): ) inputs = { - **{ - # inputs from Agent Input Blocks - exec.input_data["name"]: exec.input_data.get("value") - for exec in complete_node_executions - if ( - (block := get_block(exec.block_id)) - and block.block_type == BlockType.INPUT - ) - }, + **( + graph_exec.inputs + or { + # fallback: extract inputs from Agent Input Blocks + exec.input_data["name"]: exec.input_data.get("value") + for exec in complete_node_executions + if ( + (block := get_block(exec.block_id)) + and block.block_type == BlockType.INPUT + ) + } + ), **{ # input from webhook-triggered block "payload": exec.input_data["payload"] @@ -252,14 +274,13 @@ class GraphExecution(GraphExecutionMeta): if ( block := get_block(exec.block_id) ) and block.block_type == BlockType.OUTPUT: - outputs[exec.input_data["name"]].append( - exec.input_data.get("value", None) - ) + outputs[exec.input_data["name"]].append(exec.input_data.get("value")) return GraphExecution( **{ field_name: getattr(graph_exec, field_name) for field_name in GraphExecutionMeta.model_fields + if field_name != "inputs" }, inputs=inputs, outputs=outputs, @@ -292,13 +313,17 @@ class GraphExecutionWithNodes(GraphExecution): node_executions=node_executions, ) - def to_graph_execution_entry(self, user_context: "UserContext"): + def to_graph_execution_entry( + self, + user_context: "UserContext", + compiled_nodes_input_masks: Optional[NodesInputMasks] = None, + ): return GraphExecutionEntry( user_id=self.user_id, graph_id=self.graph_id, graph_version=self.graph_version or 0, graph_exec_id=self.id, - nodes_input_masks={}, # FIXME: store credentials on AgentGraphExecution + nodes_input_masks=compiled_nodes_input_masks, user_context=user_context, ) @@ -335,7 +360,7 @@ class NodeExecutionResult(BaseModel): else: input_data: BlockInput = defaultdict() for data in _node_exec.Input or []: - input_data[data.name] = type_utils.convert(data.data, type[Any]) + input_data[data.name] = type_utils.convert(data.data, JsonValue) output_data: CompletedBlockOutput = defaultdict(list) @@ -344,7 +369,7 @@ class NodeExecutionResult(BaseModel): output_data[name].extend(messages) else: for data in _node_exec.Output or []: - output_data[data.name].append(type_utils.convert(data.data, type[Any])) + output_data[data.name].append(type_utils.convert(data.data, JsonValue)) graph_execution: AgentGraphExecution | None = _node_exec.GraphExecution if graph_execution: @@ -539,9 +564,12 @@ async def get_graph_execution( async def create_graph_execution( graph_id: str, graph_version: int, - starting_nodes_input: list[tuple[str, BlockInput]], + starting_nodes_input: list[tuple[str, BlockInput]], # list[(node_id, BlockInput)] + inputs: Mapping[str, JsonValue], user_id: str, - preset_id: str | None = None, + preset_id: Optional[str] = None, + credential_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None, + nodes_input_masks: Optional[NodesInputMasks] = None, ) -> GraphExecutionWithNodes: """ Create a new AgentGraphExecution record. @@ -549,11 +577,18 @@ async def create_graph_execution( The id of the AgentGraphExecution and the list of ExecutionResult for each node. """ result = await AgentGraphExecution.prisma().create( - data=AgentGraphExecutionCreateInput( - agentGraphId=graph_id, - agentGraphVersion=graph_version, - executionStatus=ExecutionStatus.QUEUED, - NodeExecutions={ + data={ + "agentGraphId": graph_id, + "agentGraphVersion": graph_version, + "executionStatus": ExecutionStatus.QUEUED, + "inputs": SafeJson(inputs), + "credentialInputs": ( + SafeJson(credential_inputs) if credential_inputs else Json({}) + ), + "nodesInputMasks": ( + SafeJson(nodes_input_masks) if nodes_input_masks else Json({}) + ), + "NodeExecutions": { "create": [ AgentNodeExecutionCreateInput( agentNodeId=node_id, @@ -569,9 +604,9 @@ async def create_graph_execution( for node_id, node_input in starting_nodes_input ] }, - userId=user_id, - agentPresetId=preset_id, - ), + "userId": user_id, + "agentPresetId": preset_id, + }, include=GRAPH_EXECUTION_INCLUDE_WITH_NODES, ) @@ -582,7 +617,7 @@ async def upsert_execution_input( node_id: str, graph_exec_id: str, input_name: str, - input_data: Any, + input_data: JsonValue, node_exec_id: str | None = None, ) -> tuple[str, BlockInput]: """ @@ -631,7 +666,7 @@ async def upsert_execution_input( ) return existing_execution.id, { **{ - input_data.name: type_utils.convert(input_data.data, type[Any]) + input_data.name: type_utils.convert(input_data.data, JsonValue) for input_data in existing_execution.Input or [] }, input_name: input_data, @@ -888,7 +923,7 @@ class GraphExecutionEntry(BaseModel): graph_exec_id: str graph_id: str graph_version: int - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None + nodes_input_masks: Optional[NodesInputMasks] = None user_context: UserContext diff --git a/autogpt_platform/backend/backend/data/graph.py b/autogpt_platform/backend/backend/data/graph.py index a53d745fb2..bdf43f539c 100644 --- a/autogpt_platform/backend/backend/data/graph.py +++ b/autogpt_platform/backend/backend/data/graph.py @@ -12,7 +12,7 @@ from prisma.types import ( AgentNodeLinkCreateInput, StoreListingVersionWhereInput, ) -from pydantic import BaseModel, Field, JsonValue, create_model +from pydantic import BaseModel, Field, create_model from pydantic.fields import computed_field from backend.blocks.agent import AgentExecutorBlock @@ -34,6 +34,7 @@ from .db import BaseDbModel, query_raw_with_schema, transaction from .includes import AGENT_GRAPH_INCLUDE, AGENT_NODE_INCLUDE if TYPE_CHECKING: + from .execution import NodesInputMasks from .integrations import Webhook logger = logging.getLogger(__name__) @@ -451,7 +452,7 @@ class GraphModel(Graph): def validate_graph( self, for_run: bool = False, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional["NodesInputMasks"] = None, ): """ Validate graph structure and raise `ValueError` on issues. @@ -465,7 +466,7 @@ class GraphModel(Graph): def _validate_graph( graph: BaseGraph, for_run: bool = False, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional["NodesInputMasks"] = None, ) -> None: errors = GraphModel._validate_graph_get_errors( graph, for_run, nodes_input_masks @@ -479,7 +480,7 @@ class GraphModel(Graph): def validate_graph_get_errors( self, for_run: bool = False, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional["NodesInputMasks"] = None, ) -> dict[str, dict[str, str]]: """ Validate graph and return structured errors per node. @@ -501,7 +502,7 @@ class GraphModel(Graph): def _validate_graph_get_errors( graph: BaseGraph, for_run: bool = False, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional["NodesInputMasks"] = None, ) -> dict[str, dict[str, str]]: """ Validate graph and return structured errors per node. diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index 5e38c285d9..38377c1853 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -10,7 +10,6 @@ from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast from pika.adapters.blocking_connection import BlockingChannel from pika.spec import Basic, BasicProperties -from pydantic import JsonValue from redis.asyncio.lock import Lock as RedisLock from backend.blocks.io import AgentOutputBlock @@ -38,9 +37,9 @@ from prometheus_client import Gauge, start_http_server from backend.blocks.agent import AgentExecutorBlock from backend.data import redis_client as redis from backend.data.block import ( - BlockData, BlockInput, BlockOutput, + BlockOutputEntry, BlockSchema, get_block, ) @@ -52,6 +51,7 @@ from backend.data.execution import ( GraphExecutionEntry, NodeExecutionEntry, NodeExecutionResult, + NodesInputMasks, UserContext, ) from backend.data.graph import Link, Node @@ -131,7 +131,7 @@ async def execute_node( creds_manager: IntegrationCredentialsManager, data: NodeExecutionEntry, execution_stats: NodeExecutionStats | None = None, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional[NodesInputMasks] = None, ) -> BlockOutput: """ Execute a node in the graph. This will trigger a block execution on a node, @@ -237,12 +237,12 @@ async def execute_node( async def _enqueue_next_nodes( db_client: "DatabaseManagerAsyncClient", node: Node, - output: BlockData, + output: BlockOutputEntry, user_id: str, graph_exec_id: str, graph_id: str, log_metadata: LogMetadata, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]], + nodes_input_masks: Optional[NodesInputMasks], user_context: UserContext, ) -> list[NodeExecutionEntry]: async def add_enqueued_execution( @@ -419,7 +419,7 @@ class ExecutionProcessor: self, node_exec: NodeExecutionEntry, node_exec_progress: NodeExecutionProgress, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]], + nodes_input_masks: Optional[NodesInputMasks], graph_stats_pair: tuple[GraphExecutionStats, threading.Lock], ) -> NodeExecutionStats: log_metadata = LogMetadata( @@ -487,7 +487,7 @@ class ExecutionProcessor: stats: NodeExecutionStats, db_client: "DatabaseManagerAsyncClient", log_metadata: LogMetadata, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional[NodesInputMasks] = None, ) -> ExecutionStatus: status = ExecutionStatus.RUNNING @@ -1053,7 +1053,7 @@ class ExecutionProcessor: node_id: str, graph_exec: GraphExecutionEntry, log_metadata: LogMetadata, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]], + nodes_input_masks: Optional[NodesInputMasks], execution_queue: ExecutionQueue[NodeExecutionEntry], ) -> None: """Process a node's output, update its status, and enqueue next nodes. diff --git a/autogpt_platform/backend/backend/executor/utils.py b/autogpt_platform/backend/backend/executor/utils.py index dd4b94a60c..85fe8cc278 100644 --- a/autogpt_platform/backend/backend/executor/utils.py +++ b/autogpt_platform/backend/backend/executor/utils.py @@ -4,13 +4,13 @@ import threading import time from collections import defaultdict from concurrent.futures import Future -from typing import Any, Optional +from typing import Any, Mapping, Optional, cast from pydantic import BaseModel, JsonValue, ValidationError from backend.data import execution as execution_db from backend.data import graph as graph_db -from backend.data.block import Block, BlockData, BlockInput, BlockType, get_block +from backend.data.block import Block, BlockInput, BlockOutputEntry, BlockType, get_block from backend.data.block_cost_config import BLOCK_COSTS from backend.data.cost import BlockCostType from backend.data.db import prisma @@ -18,6 +18,7 @@ from backend.data.execution import ( ExecutionStatus, GraphExecutionStats, GraphExecutionWithNodes, + NodesInputMasks, UserContext, ) from backend.data.graph import GraphModel, Node @@ -239,7 +240,7 @@ def _tokenise(path: str) -> list[tuple[str, str]] | None: # --------------------------------------------------------------------------- # -def parse_execution_output(output: BlockData, name: str) -> Any | None: +def parse_execution_output(output: BlockOutputEntry, name: str) -> JsonValue | None: """ Retrieve a nested value out of `output` using the flattened *name*. @@ -263,7 +264,7 @@ def parse_execution_output(output: BlockData, name: str) -> Any | None: if tokens is None: return None - cur: Any = data + cur: JsonValue = data for delim, ident in tokens: if delim == LIST_SPLIT: # list[index] @@ -428,7 +429,7 @@ def validate_exec( async def _validate_node_input_credentials( graph: GraphModel, user_id: str, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional[NodesInputMasks] = None, ) -> dict[str, dict[str, str]]: """ Checks all credentials for all nodes of the graph and returns structured errors. @@ -508,8 +509,8 @@ async def _validate_node_input_credentials( def make_node_credentials_input_map( graph: GraphModel, - graph_credentials_input: dict[str, CredentialsMetaInput], -) -> dict[str, dict[str, JsonValue]]: + graph_credentials_input: Mapping[str, CredentialsMetaInput], +) -> NodesInputMasks: """ Maps credentials for an execution to the correct nodes. @@ -544,8 +545,8 @@ def make_node_credentials_input_map( async def validate_graph_with_credentials( graph: GraphModel, user_id: str, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, -) -> dict[str, dict[str, str]]: + nodes_input_masks: Optional[NodesInputMasks] = None, +) -> Mapping[str, Mapping[str, str]]: """ Validate graph including credentials and return structured errors per node. @@ -575,7 +576,7 @@ async def _construct_starting_node_execution_input( graph: GraphModel, user_id: str, graph_inputs: BlockInput, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + nodes_input_masks: Optional[NodesInputMasks] = None, ) -> list[tuple[str, BlockInput]]: """ Validates and prepares the input data for executing a graph. @@ -616,7 +617,7 @@ async def _construct_starting_node_execution_input( # Extract request input data, and assign it to the input pin. if block.block_type == BlockType.INPUT: - input_name = node.input_default.get("name") + input_name = cast(str | None, node.input_default.get("name")) if input_name and input_name in graph_inputs: input_data = {"value": graph_inputs[input_name]} @@ -643,9 +644,9 @@ async def validate_and_construct_node_execution_input( user_id: str, graph_inputs: BlockInput, graph_version: Optional[int] = None, - graph_credentials_inputs: Optional[dict[str, CredentialsMetaInput]] = None, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, -) -> tuple[GraphModel, list[tuple[str, BlockInput]], dict[str, dict[str, JsonValue]]]: + graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None, + nodes_input_masks: Optional[NodesInputMasks] = None, +) -> tuple[GraphModel, list[tuple[str, BlockInput]], NodesInputMasks]: """ Public wrapper that handles graph fetching, credential mapping, and validation+construction. This centralizes the logic used by both scheduler validation and actual execution. @@ -659,7 +660,9 @@ async def validate_and_construct_node_execution_input( nodes_input_masks: Node inputs to use. Returns: - tuple[GraphModel, list[tuple[str, BlockInput]]]: Graph model and list of tuples for node execution input. + GraphModel: Full graph object for the given `graph_id`. + list[tuple[node_id, BlockInput]]: Starting node IDs with corresponding inputs. + dict[str, BlockInput]: Node input masks including all passed-in credentials. Raises: NotFoundError: If the graph is not found. @@ -700,11 +703,11 @@ async def validate_and_construct_node_execution_input( def _merge_nodes_input_masks( - overrides_map_1: dict[str, dict[str, JsonValue]], - overrides_map_2: dict[str, dict[str, JsonValue]], -) -> dict[str, dict[str, JsonValue]]: + overrides_map_1: NodesInputMasks, + overrides_map_2: NodesInputMasks, +) -> NodesInputMasks: """Perform a per-node merge of input overrides""" - result = overrides_map_1.copy() + result = dict(overrides_map_1).copy() for node_id, overrides2 in overrides_map_2.items(): if node_id in result: result[node_id] = {**result[node_id], **overrides2} @@ -854,8 +857,8 @@ async def add_graph_execution( inputs: Optional[BlockInput] = None, preset_id: Optional[str] = None, graph_version: Optional[int] = None, - graph_credentials_inputs: Optional[dict[str, CredentialsMetaInput]] = None, - nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None, + graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None, + nodes_input_masks: Optional[NodesInputMasks] = None, ) -> GraphExecutionWithNodes: """ Adds a graph execution to the queue and returns the execution entry. @@ -879,7 +882,7 @@ async def add_graph_execution( else: edb = get_database_manager_async_client() - graph, starting_nodes_input, nodes_input_masks = ( + graph, starting_nodes_input, compiled_nodes_input_masks = ( await validate_and_construct_node_execution_input( graph_id=graph_id, user_id=user_id, @@ -892,10 +895,15 @@ async def add_graph_execution( graph_exec = None try: + # Sanity check: running add_graph_execution with the properties of + # the graph_exec created here should create the same execution again. graph_exec = await edb.create_graph_execution( user_id=user_id, graph_id=graph_id, graph_version=graph.version, + inputs=inputs or {}, + credential_inputs=graph_credentials_inputs, + nodes_input_masks=nodes_input_masks, starting_nodes_input=starting_nodes_input, preset_id=preset_id, ) @@ -904,9 +912,9 @@ async def add_graph_execution( user_context = await get_user_context(user_id) queue = await get_async_execution_queue() - graph_exec_entry = graph_exec.to_graph_execution_entry(user_context) - if nodes_input_masks: - graph_exec_entry.nodes_input_masks = nodes_input_masks + graph_exec_entry = graph_exec.to_graph_execution_entry( + user_context, compiled_nodes_input_masks + ) logger.info( f"Created graph execution #{graph_exec.id} for graph " @@ -952,7 +960,7 @@ async def add_graph_execution( class ExecutionOutputEntry(BaseModel): node: Node node_exec_id: str - data: BlockData + data: BlockOutputEntry class NodeExecutionProgress: diff --git a/autogpt_platform/backend/backend/executor/util_test.py b/autogpt_platform/backend/backend/executor/utils_test.py similarity index 63% rename from autogpt_platform/backend/backend/executor/util_test.py rename to autogpt_platform/backend/backend/executor/utils_test.py index 197daa2239..c36208c045 100644 --- a/autogpt_platform/backend/backend/executor/util_test.py +++ b/autogpt_platform/backend/backend/executor/utils_test.py @@ -1,6 +1,7 @@ from typing import cast import pytest +from pytest_mock import MockerFixture from backend.executor.utils import merge_execution_input, parse_execution_output from backend.util.mock import MockObject @@ -276,3 +277,142 @@ def test_merge_execution_input(): result = merge_execution_input(data) assert "mixed" in result assert result["mixed"].attr[0]["key"] == "value3" + + +@pytest.mark.asyncio +async def test_add_graph_execution_is_repeatable(mocker: MockerFixture): + """ + Verify that calling the function with its own output creates the same execution again. + """ + from backend.data.execution import GraphExecutionWithNodes + from backend.data.model import CredentialsMetaInput + from backend.executor.utils import add_graph_execution + from backend.integrations.providers import ProviderName + + # Mock data + graph_id = "test-graph-id" + user_id = "test-user-id" + inputs = {"test_input": "test_value"} + preset_id = "test-preset-id" + graph_version = 1 + graph_credentials_inputs = { + "cred_key": CredentialsMetaInput( + id="cred-id", provider=ProviderName("test_provider"), type="oauth2" + ) + } + nodes_input_masks = {"node1": {"input1": "masked_value"}} + + # Mock the graph object returned by validate_and_construct_node_execution_input + mock_graph = mocker.MagicMock() + mock_graph.version = graph_version + + # Mock the starting nodes input and compiled nodes input masks + starting_nodes_input = [ + ("node1", {"input1": "value1"}), + ("node2", {"input1": "value2"}), + ] + compiled_nodes_input_masks = {"node1": {"input1": "compiled_mask"}} + + # Mock the graph execution object + mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes) + mock_graph_exec.id = "execution-id-123" + mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock() + + # Mock user context + mock_user_context = {"user_id": user_id, "context": "test_context"} + + # Mock the queue and event bus + mock_queue = mocker.AsyncMock() + mock_event_bus = mocker.MagicMock() + mock_event_bus.publish = mocker.AsyncMock() + + # Setup mocks + mock_validate = mocker.patch( + "backend.executor.utils.validate_and_construct_node_execution_input" + ) + mock_edb = mocker.patch("backend.executor.utils.execution_db") + mock_prisma = mocker.patch("backend.executor.utils.prisma") + mock_get_user_context = mocker.patch("backend.executor.utils.get_user_context") + mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue") + mock_get_event_bus = mocker.patch( + "backend.executor.utils.get_async_execution_event_bus" + ) + + # Setup mock returns + mock_validate.return_value = ( + mock_graph, + starting_nodes_input, + compiled_nodes_input_masks, + ) + mock_prisma.is_connected.return_value = True + mock_edb.create_graph_execution = mocker.AsyncMock(return_value=mock_graph_exec) + mock_get_user_context.return_value = mock_user_context + mock_get_queue.return_value = mock_queue + mock_get_event_bus.return_value = mock_event_bus + + # Call the function - first execution + result1 = await add_graph_execution( + graph_id=graph_id, + user_id=user_id, + inputs=inputs, + preset_id=preset_id, + graph_version=graph_version, + graph_credentials_inputs=graph_credentials_inputs, + nodes_input_masks=nodes_input_masks, + ) + + # Store the parameters used in the first call to create_graph_execution + first_call_kwargs = mock_edb.create_graph_execution.call_args[1] + + # Verify the create_graph_execution was called with correct parameters + mock_edb.create_graph_execution.assert_called_once_with( + user_id=user_id, + graph_id=graph_id, + graph_version=mock_graph.version, + inputs=inputs, + credential_inputs=graph_credentials_inputs, + nodes_input_masks=nodes_input_masks, + starting_nodes_input=starting_nodes_input, + preset_id=preset_id, + ) + + # Set up the graph execution mock to have properties we can extract + mock_graph_exec.graph_id = graph_id + mock_graph_exec.user_id = user_id + mock_graph_exec.graph_version = graph_version + mock_graph_exec.inputs = inputs + mock_graph_exec.credential_inputs = graph_credentials_inputs + mock_graph_exec.nodes_input_masks = nodes_input_masks + mock_graph_exec.preset_id = preset_id + + # Create a second mock execution for the sanity check + mock_graph_exec_2 = mocker.MagicMock(spec=GraphExecutionWithNodes) + mock_graph_exec_2.id = "execution-id-456" + mock_graph_exec_2.to_graph_execution_entry.return_value = mocker.MagicMock() + + # Reset mocks and set up for second call + mock_edb.create_graph_execution.reset_mock() + mock_edb.create_graph_execution.return_value = mock_graph_exec_2 + mock_validate.reset_mock() + + # Sanity check: call add_graph_execution with properties from first result + # This should create the same execution parameters + result2 = await add_graph_execution( + graph_id=mock_graph_exec.graph_id, + user_id=mock_graph_exec.user_id, + inputs=mock_graph_exec.inputs, + preset_id=mock_graph_exec.preset_id, + graph_version=mock_graph_exec.graph_version, + graph_credentials_inputs=mock_graph_exec.credential_inputs, + nodes_input_masks=mock_graph_exec.nodes_input_masks, + ) + + # Verify that create_graph_execution was called with identical parameters + second_call_kwargs = mock_edb.create_graph_execution.call_args[1] + + # The sanity check: both calls should use identical parameters + assert first_call_kwargs == second_call_kwargs + + # Both executions should succeed (though they create different objects) + assert result1 == mock_graph_exec + assert result2 == mock_graph_exec_2 diff --git a/autogpt_platform/backend/backend/server/conn_manager_test.py b/autogpt_platform/backend/backend/server/conn_manager_test.py index 69d984c7f2..401a9eaf81 100644 --- a/autogpt_platform/backend/backend/server/conn_manager_test.py +++ b/autogpt_platform/backend/backend/server/conn_manager_test.py @@ -88,6 +88,7 @@ async def test_send_graph_execution_result( user_id="user-1", graph_id="test_graph", graph_version=1, + preset_id=None, status=ExecutionStatus.COMPLETED, started_at=datetime.now(tz=timezone.utc), ended_at=datetime.now(tz=timezone.utc), @@ -101,6 +102,8 @@ async def test_send_graph_execution_result( "input_1": "some input value :)", "input_2": "some *other* input value", }, + credential_inputs=None, + nodes_input_masks=None, outputs={ "the_output": ["some output value"], "other_output": ["sike there was another output"], diff --git a/autogpt_platform/backend/backend/util/exceptions.py b/autogpt_platform/backend/backend/util/exceptions.py index 3a1efa0e5d..def98f544b 100644 --- a/autogpt_platform/backend/backend/util/exceptions.py +++ b/autogpt_platform/backend/backend/util/exceptions.py @@ -1,3 +1,6 @@ +from typing import Mapping + + class MissingConfigError(Exception): """The attempted operation requires configuration which is not available""" @@ -69,7 +72,7 @@ class GraphValidationError(ValueError): """Structured validation error for graph validation failures""" def __init__( - self, message: str, node_errors: dict[str, dict[str, str]] | None = None + self, message: str, node_errors: Mapping[str, Mapping[str, str]] | None = None ): super().__init__(message) self.message = message diff --git a/autogpt_platform/backend/migrations/20250902171554_add_credentials_to_graph_execution/migration.sql b/autogpt_platform/backend/migrations/20250902171554_add_credentials_to_graph_execution/migration.sql new file mode 100644 index 0000000000..1619ee84dd --- /dev/null +++ b/autogpt_platform/backend/migrations/20250902171554_add_credentials_to_graph_execution/migration.sql @@ -0,0 +1,5 @@ +-- Add 'credentialInputs', 'inputs', and 'nodesInputMasks' columns to the AgentGraphExecution table +ALTER TABLE "AgentGraphExecution" + ADD COLUMN "credentialInputs" JSONB, + ADD COLUMN "inputs" JSONB, + ADD COLUMN "nodesInputMasks" JSONB; diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma index 13c3d2c529..ae54808bec 100644 --- a/autogpt_platform/backend/schema.prisma +++ b/autogpt_platform/backend/schema.prisma @@ -36,7 +36,7 @@ model User { notifyOnAgentApproved Boolean @default(true) notifyOnAgentRejected Boolean @default(true) - timezone String @default("not-set") + timezone String @default("not-set") // Relations @@ -354,15 +354,20 @@ model AgentGraphExecution { agentGraphVersion Int @default(1) AgentGraph AgentGraph @relation(fields: [agentGraphId, agentGraphVersion], references: [id, version], onDelete: Cascade) + agentPresetId String? + AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id]) + + inputs Json? + credentialInputs Json? + nodesInputMasks Json? + NodeExecutions AgentNodeExecution[] // Link to User model -- Executed by this user userId String User User @relation(fields: [userId], references: [id], onDelete: Cascade) - stats Json? - agentPresetId String? - AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id]) + stats Json? @@index([agentGraphId, agentGraphVersion]) @@index([userId]) diff --git a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-details-view.tsx b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-details-view.tsx index 78e384d211..b68ec3a9ab 100644 --- a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-details-view.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-details-view.tsx @@ -1,5 +1,4 @@ "use client"; -import { isEmpty } from "lodash"; import moment from "moment"; import React, { useCallback, useMemo } from "react"; @@ -92,7 +91,7 @@ export function AgentRunDetailsView({ } > | undefined = useMemo(() => { - if (!("inputs" in run)) return undefined; + if (!run.inputs) return undefined; // TODO: show (link to) preset - https://github.com/Significant-Gravitas/AutoGPT/issues/9168 // Add type info from agent input schema @@ -110,14 +109,16 @@ export function AgentRunDetailsView({ const runAgain = useCallback( () => - agentRunInputs && + run.inputs && + graph.credentials_input_schema.required.every( + (k) => k in (run.credential_inputs ?? {}), + ) && api .executeGraph( graph.id, graph.version, - Object.fromEntries( - Object.entries(agentRunInputs).map(([k, v]) => [k, v.value]), - ), + run.inputs, + run.credential_inputs ?? undefined, ) .then(({ graph_exec_id }) => onRun(graph_exec_id)) .catch(toastOnFail("execute agent")), @@ -177,7 +178,9 @@ export function AgentRunDetailsView({ : []), ...(["success", "failed", "stopped"].includes(runStatus) && !graph.has_external_trigger && - isEmpty(graph.credentials_input_schema.required) // TODO: enable re-run with credentials - https://linear.app/autogpt/issue/SECRT-1243 + graph.credentials_input_schema.required.every( + (k) => k in (run.credential_inputs ?? {}), + ) ? [ { label: ( diff --git a/autogpt_platform/frontend/src/app/api/openapi.json b/autogpt_platform/frontend/src/app/api/openapi.json index 72f9d0848c..7f53237bb2 100644 --- a/autogpt_platform/frontend/src/app/api/openapi.json +++ b/autogpt_platform/frontend/src/app/api/openapi.json @@ -5199,6 +5199,36 @@ "user_id": { "type": "string", "title": "User Id" }, "graph_id": { "type": "string", "title": "Graph Id" }, "graph_version": { "type": "integer", "title": "Graph Version" }, + "inputs": { + "additionalProperties": true, + "type": "object", + "title": "Inputs" + }, + "credential_inputs": { + "anyOf": [ + { + "additionalProperties": { + "$ref": "#/components/schemas/CredentialsMetaInput" + }, + "type": "object" + }, + { "type": "null" } + ], + "title": "Credential Inputs" + }, + "nodes_input_masks": { + "anyOf": [ + { + "additionalProperties": { + "additionalProperties": true, + "type": "object" + }, + "type": "object" + }, + { "type": "null" } + ], + "title": "Nodes Input Masks" + }, "preset_id": { "anyOf": [{ "type": "string" }, { "type": "null" }], "title": "Preset Id" @@ -5220,11 +5250,6 @@ { "type": "null" } ] }, - "inputs": { - "additionalProperties": true, - "type": "object", - "title": "Inputs" - }, "outputs": { "additionalProperties": { "items": {}, "type": "array" }, "type": "object", @@ -5237,11 +5262,14 @@ "user_id", "graph_id", "graph_version", + "inputs", + "credential_inputs", + "nodes_input_masks", + "preset_id", "status", "started_at", "ended_at", "stats", - "inputs", "outputs" ], "title": "GraphExecution" @@ -5287,6 +5315,38 @@ "user_id": { "type": "string", "title": "User Id" }, "graph_id": { "type": "string", "title": "Graph Id" }, "graph_version": { "type": "integer", "title": "Graph Version" }, + "inputs": { + "anyOf": [ + { "additionalProperties": true, "type": "object" }, + { "type": "null" } + ], + "title": "Inputs" + }, + "credential_inputs": { + "anyOf": [ + { + "additionalProperties": { + "$ref": "#/components/schemas/CredentialsMetaInput" + }, + "type": "object" + }, + { "type": "null" } + ], + "title": "Credential Inputs" + }, + "nodes_input_masks": { + "anyOf": [ + { + "additionalProperties": { + "additionalProperties": true, + "type": "object" + }, + "type": "object" + }, + { "type": "null" } + ], + "title": "Nodes Input Masks" + }, "preset_id": { "anyOf": [{ "type": "string" }, { "type": "null" }], "title": "Preset Id" @@ -5315,6 +5375,10 @@ "user_id", "graph_id", "graph_version", + "inputs", + "credential_inputs", + "nodes_input_masks", + "preset_id", "status", "started_at", "ended_at", @@ -5328,6 +5392,36 @@ "user_id": { "type": "string", "title": "User Id" }, "graph_id": { "type": "string", "title": "Graph Id" }, "graph_version": { "type": "integer", "title": "Graph Version" }, + "inputs": { + "additionalProperties": true, + "type": "object", + "title": "Inputs" + }, + "credential_inputs": { + "anyOf": [ + { + "additionalProperties": { + "$ref": "#/components/schemas/CredentialsMetaInput" + }, + "type": "object" + }, + { "type": "null" } + ], + "title": "Credential Inputs" + }, + "nodes_input_masks": { + "anyOf": [ + { + "additionalProperties": { + "additionalProperties": true, + "type": "object" + }, + "type": "object" + }, + { "type": "null" } + ], + "title": "Nodes Input Masks" + }, "preset_id": { "anyOf": [{ "type": "string" }, { "type": "null" }], "title": "Preset Id" @@ -5349,11 +5443,6 @@ { "type": "null" } ] }, - "inputs": { - "additionalProperties": true, - "type": "object", - "title": "Inputs" - }, "outputs": { "additionalProperties": { "items": {}, "type": "array" }, "type": "object", @@ -5371,11 +5460,14 @@ "user_id", "graph_id", "graph_version", + "inputs", + "credential_inputs", + "nodes_input_masks", + "preset_id", "status", "started_at", "ended_at", "stats", - "inputs", "outputs", "node_executions" ], diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts index b83cb96bae..dbe2cdb1fd 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts @@ -250,6 +250,9 @@ export type GraphExecutionMeta = { user_id: UserID; graph_id: GraphID; graph_version: number; + inputs: Record | null; + credential_inputs: Record | null; + nodes_input_masks: Record> | null; preset_id: LibraryAgentPresetID | null; status: | "QUEUED" @@ -276,7 +279,7 @@ export type GraphExecutionMeta = { export type GraphExecutionID = Brand; /* Mirror of backend/data/execution.py:GraphExecution */ -export type GraphExecution = GraphExecutionMeta & { +export type GraphExecution = Omit & { inputs: Record; outputs: Record>; node_executions?: NodeExecutionResult[]; @@ -534,7 +537,7 @@ export type CredentialsDeleteNeedConfirmationResponse = { export type CredentialsMetaInput = { id: string; type: CredentialsType; - title?: string; + title?: string | null; provider: string; };