mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
feat(rnd): Add freezed/static input link feature for retaining input data to be re-used in multiple node executions (#7631)
### Background
Input from the input pin is consumed only once. While this is required in most of the use cases, there are some cases where the input can only be produced once, and that input needs to be re-used just like an input default value, that is passively providing input data, without triggering any execution. The scope of this change is providing that functionality in the link level, this property will be called **`static link`** in this system.
### Changes 🏗️
Provides a static link feature with the following behaviours:
* A link can be marked `static` to become a static link.
* Once a node produces an output it will persist the output data and propagate the output to the other nodes through the link, for a static link, instead of making the data queued in the input pin, it will override the default value.
* Any input executions still waiting for the input will be backfilled using this output produced by the static link.
* And any upcoming executions that will use the input will always reuse the output produced by the static link.
See the added test to see the expected usage.
This commit is contained in:
@@ -75,9 +75,14 @@ class ExecutionResult(BaseModel):
|
||||
|
||||
@staticmethod
|
||||
def from_db(execution: AgentNodeExecution):
|
||||
input_data: BlockInput = defaultdict()
|
||||
for data in execution.Input or []:
|
||||
input_data[data.name] = json.loads(data.data)
|
||||
if execution.executionData:
|
||||
# Execution that has been queued for execution will persist its data.
|
||||
input_data = json.loads(execution.executionData)
|
||||
else:
|
||||
# For incomplete execution, executionData will not be yet available.
|
||||
input_data: BlockInput = defaultdict()
|
||||
for data in execution.Input or []:
|
||||
input_data[data.name] = json.loads(data.data)
|
||||
|
||||
output_data: CompletedBlockOutput = defaultdict(list)
|
||||
for data in execution.Output or []:
|
||||
@@ -155,19 +160,29 @@ async def upsert_execution_input(
|
||||
graph_exec_id: str,
|
||||
input_name: str,
|
||||
input_data: Any,
|
||||
node_exec_id: str | None = None,
|
||||
) -> tuple[str, BlockInput]:
|
||||
"""
|
||||
Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Input.
|
||||
If there is no AgentNodeExecution that has no `input_name` as input, create new one.
|
||||
|
||||
Args:
|
||||
node_id: The id of the AgentNode.
|
||||
graph_exec_id: The id of the AgentGraphExecution.
|
||||
input_name: The name of the input data.
|
||||
input_data: The input data to be inserted.
|
||||
node_exec_id: [Optional] The id of the AgentNodeExecution that has no `input_name` as input. If not provided, it will find the eligible incomplete AgentNodeExecution or create a new one.
|
||||
|
||||
Returns:
|
||||
* The id of the created or existing AgentNodeExecution.
|
||||
* Dict of node input data, key is the input name, value is the input data.
|
||||
"""
|
||||
existing_execution = await AgentNodeExecution.prisma().find_first(
|
||||
where={ # type: ignore
|
||||
**({"id": node_exec_id} if node_exec_id else {}),
|
||||
"agentNodeId": node_id,
|
||||
"agentGraphExecutionId": graph_exec_id,
|
||||
"executionStatus": ExecutionStatus.INCOMPLETE,
|
||||
"Input": {"every": {"name": {"not": input_name}}},
|
||||
},
|
||||
order={"addedTime": "asc"},
|
||||
@@ -191,7 +206,7 @@ async def upsert_execution_input(
|
||||
input_name: input_data,
|
||||
}
|
||||
|
||||
else:
|
||||
elif not node_exec_id:
|
||||
result = await AgentNodeExecution.prisma().create(
|
||||
data={
|
||||
"agentNodeId": node_id,
|
||||
@@ -202,6 +217,11 @@ async def upsert_execution_input(
|
||||
)
|
||||
return result.id, {input_name: input_data}
|
||||
|
||||
else:
|
||||
raise ValueError(
|
||||
f"NodeExecution {node_exec_id} not found or already has input {input_name}."
|
||||
)
|
||||
|
||||
|
||||
async def upsert_execution_output(
|
||||
node_exec_id: str,
|
||||
@@ -221,8 +241,11 @@ async def upsert_execution_output(
|
||||
|
||||
|
||||
async def update_execution_status(
|
||||
node_exec_id: str, status: ExecutionStatus
|
||||
node_exec_id: str, status: ExecutionStatus, execution_data: BlockInput | None = None
|
||||
) -> ExecutionResult:
|
||||
if status == ExecutionStatus.QUEUED and execution_data is None:
|
||||
raise ValueError("Execution data must be provided when queuing an execution.")
|
||||
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
data = {
|
||||
**({"executionStatus": status}),
|
||||
@@ -230,6 +253,7 @@ async def update_execution_status(
|
||||
**({"startedTime": now} if status == ExecutionStatus.RUNNING else {}),
|
||||
**({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
|
||||
**({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
|
||||
**({"executionData": json.dumps(execution_data)} if execution_data else {}),
|
||||
}
|
||||
|
||||
res = await AgentNodeExecution.prisma().update(
|
||||
@@ -255,7 +279,10 @@ async def get_execution_results(graph_exec_id: str) -> list[ExecutionResult]:
|
||||
executions = await AgentNodeExecution.prisma().find_many(
|
||||
where={"agentGraphExecutionId": graph_exec_id},
|
||||
include=EXECUTION_RESULT_INCLUDE, # type: ignore
|
||||
order={"addedTime": "asc"},
|
||||
order=[
|
||||
{"queuedTime": "asc"},
|
||||
{"addedTime": "asc"}, # Fallback: Incomplete execs has no queuedTime.
|
||||
],
|
||||
)
|
||||
res = [ExecutionResult.from_db(execution) for execution in executions]
|
||||
return res
|
||||
@@ -337,3 +364,33 @@ def merge_execution_input(data: BlockInput) -> BlockInput:
|
||||
setattr(data[name], index, value)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult | None:
|
||||
execution = await AgentNodeExecution.prisma().find_first(
|
||||
where={ # type: ignore
|
||||
"agentNodeId": node_id,
|
||||
"agentGraphExecutionId": graph_eid,
|
||||
"executionStatus": {"not": ExecutionStatus.INCOMPLETE},
|
||||
"executionData": {"not": None},
|
||||
},
|
||||
order={"queuedTime": "desc"},
|
||||
include=EXECUTION_RESULT_INCLUDE, # type: ignore
|
||||
)
|
||||
if not execution:
|
||||
return None
|
||||
return ExecutionResult.from_db(execution)
|
||||
|
||||
|
||||
async def get_incomplete_executions(
|
||||
node_id: str, graph_eid: str
|
||||
) -> list[ExecutionResult]:
|
||||
executions = await AgentNodeExecution.prisma().find_many(
|
||||
where={ # type: ignore
|
||||
"agentNodeId": node_id,
|
||||
"agentGraphExecutionId": graph_eid,
|
||||
"executionStatus": ExecutionStatus.INCOMPLETE,
|
||||
},
|
||||
include=EXECUTION_RESULT_INCLUDE, # type: ignore
|
||||
)
|
||||
return [ExecutionResult.from_db(execution) for execution in executions]
|
||||
|
||||
@@ -17,6 +17,7 @@ class Link(BaseDbModel):
|
||||
sink_id: str
|
||||
source_name: str
|
||||
sink_name: str
|
||||
is_static: bool = False
|
||||
|
||||
@staticmethod
|
||||
def from_db(link: AgentNodeLink):
|
||||
@@ -26,6 +27,7 @@ class Link(BaseDbModel):
|
||||
source_id=link.agentNodeSourceId,
|
||||
sink_name=link.sinkName,
|
||||
sink_id=link.agentNodeSinkId,
|
||||
is_static=link.isStatic,
|
||||
)
|
||||
|
||||
def __hash__(self):
|
||||
@@ -224,7 +226,6 @@ async def create_graph(graph: Graph) -> Graph:
|
||||
}
|
||||
)
|
||||
|
||||
# TODO: replace bulk creation using create_many
|
||||
await asyncio.gather(
|
||||
*[
|
||||
AgentNode.prisma().create(
|
||||
@@ -250,6 +251,7 @@ async def create_graph(graph: Graph) -> Graph:
|
||||
"sinkName": link.sink_name,
|
||||
"agentNodeSourceId": link.source_id,
|
||||
"agentNodeSinkId": link.sink_id,
|
||||
"isStatic": link.is_static,
|
||||
}
|
||||
)
|
||||
for link in graph.links
|
||||
|
||||
@@ -15,6 +15,8 @@ from autogpt_server.data.execution import (
|
||||
GraphExecution,
|
||||
NodeExecution,
|
||||
create_graph_execution,
|
||||
get_incomplete_executions,
|
||||
get_latest_execution,
|
||||
merge_execution_input,
|
||||
parse_execution_output,
|
||||
update_execution_status,
|
||||
@@ -129,23 +131,34 @@ def _enqueue_next_nodes(
|
||||
def wait(f: Coroutine[T, Any, T]) -> T:
|
||||
return loop.run_until_complete(f)
|
||||
|
||||
def execution_update(node_exec_id: str, status: ExecutionStatus):
|
||||
exec_update = wait(update_execution_status(node_exec_id, status))
|
||||
def add_enqueued_execution(
|
||||
node_exec_id: str, node_id: str, data: BlockInput
|
||||
) -> NodeExecution:
|
||||
exec_update = wait(
|
||||
update_execution_status(node_exec_id, ExecutionStatus.QUEUED, data)
|
||||
)
|
||||
api_client.send_execution_update(exec_update.model_dump())
|
||||
return NodeExecution(
|
||||
graph_exec_id=graph_exec_id,
|
||||
node_exec_id=node_exec_id,
|
||||
node_id=node_id,
|
||||
data=data,
|
||||
)
|
||||
|
||||
def register_next_execution(node_link: Link) -> NodeExecution | None:
|
||||
def register_next_executions(node_link: Link) -> list[NodeExecution]:
|
||||
enqueued_executions = []
|
||||
next_output_name = node_link.source_name
|
||||
next_input_name = node_link.sink_name
|
||||
next_node_id = node_link.sink_id
|
||||
|
||||
next_data = parse_execution_output(output, next_output_name)
|
||||
if next_data is None:
|
||||
return
|
||||
return enqueued_executions
|
||||
|
||||
next_node = wait(get_node(next_node_id))
|
||||
if not next_node:
|
||||
logger.error(f"{prefix} Error, next node {next_node_id} not found.")
|
||||
return
|
||||
return enqueued_executions
|
||||
|
||||
# Upserting execution input includes reading the existing input pins in the node
|
||||
# which then either updating the existing execution input or creating a new one.
|
||||
@@ -160,27 +173,67 @@ def _enqueue_next_nodes(
|
||||
)
|
||||
)
|
||||
|
||||
# Complete missing static input pins data using the last execution input.
|
||||
static_link_names = {
|
||||
link.sink_name
|
||||
for link in next_node.input_links
|
||||
if link.is_static and link.sink_name not in next_node_input
|
||||
}
|
||||
if static_link_names and (
|
||||
latest_execution := wait(get_latest_execution(next_node_id, graph_exec_id))
|
||||
):
|
||||
for name in static_link_names:
|
||||
next_node_input[name] = latest_execution.input_data.get(name)
|
||||
|
||||
next_node_input, validation_msg = validate_exec(next_node, next_node_input)
|
||||
suffix = f"{next_output_name}>{next_input_name}~{next_node_id}:{validation_msg}"
|
||||
suffix = (
|
||||
f"{next_output_name}>{next_input_name}~{next_node_exec_id}:{validation_msg}"
|
||||
)
|
||||
|
||||
if not next_node_input:
|
||||
logger.warning(f"{prefix} Skipped queueing {suffix}")
|
||||
return
|
||||
return enqueued_executions
|
||||
|
||||
# Input is complete, enqueue the execution.
|
||||
logger.warning(f"{prefix} Enqueued {suffix}")
|
||||
execution_update(next_node_exec_id, ExecutionStatus.QUEUED)
|
||||
return NodeExecution(
|
||||
graph_exec_id=graph_exec_id,
|
||||
node_exec_id=next_node_exec_id,
|
||||
node_id=next_node.id,
|
||||
data=next_node_input,
|
||||
enqueued_executions.append(
|
||||
add_enqueued_execution(next_node_exec_id, next_node_id, next_node_input)
|
||||
)
|
||||
|
||||
if not node_link.is_static:
|
||||
return enqueued_executions
|
||||
|
||||
# If link is static, there could be some incomplete executions waiting for it.
|
||||
# Load and complete the input missing input data, and try to re-enqueue them.
|
||||
# While reading, we should avoid any other process to re-enqueue the same node.
|
||||
with synchronized(api_client, ("upsert_input", next_node_id, graph_exec_id)):
|
||||
for iexec in wait(get_incomplete_executions(next_node_id, graph_exec_id)):
|
||||
idata = iexec.input_data
|
||||
ineid = iexec.node_exec_id
|
||||
|
||||
static_link_names = {
|
||||
link.sink_name
|
||||
for link in next_node.input_links
|
||||
if link.is_static and link.sink_name not in idata
|
||||
}
|
||||
for input_name in static_link_names:
|
||||
idata[input_name] = next_node_input[input_name]
|
||||
|
||||
idata, msg = validate_exec(next_node, idata)
|
||||
suffix = f"{next_output_name}>{next_input_name}~{ineid}:{msg}"
|
||||
if not idata:
|
||||
logger.warning(f"{prefix} Re-enqueueing skipped: {suffix}")
|
||||
continue
|
||||
logger.warning(f"{prefix} Re-enqueued {suffix}")
|
||||
enqueued_executions.append(
|
||||
add_enqueued_execution(iexec.node_exec_id, next_node_id, idata)
|
||||
)
|
||||
return enqueued_executions
|
||||
|
||||
return [
|
||||
execution
|
||||
for link in node.output_links
|
||||
if (execution := register_next_execution(link))
|
||||
for execution in register_next_executions(link)
|
||||
]
|
||||
|
||||
|
||||
@@ -391,7 +444,9 @@ class ExecutionManager(AppService):
|
||||
)
|
||||
)
|
||||
exec_update = self.run_and_wait(
|
||||
update_execution_status(node_exec.node_exec_id, ExecutionStatus.QUEUED)
|
||||
update_execution_status(
|
||||
node_exec.node_exec_id, ExecutionStatus.QUEUED, node_exec.input_data
|
||||
)
|
||||
)
|
||||
self.agent_server_client.send_execution_update(exec_update.model_dump())
|
||||
|
||||
|
||||
@@ -469,9 +469,6 @@ class AgentServer(AppService):
|
||||
status_code=400, detail="Either graph or template_id must be provided."
|
||||
)
|
||||
|
||||
# TODO: replace uuid generation here to DB generated uuids.
|
||||
graph.id = str(uuid.uuid4())
|
||||
|
||||
graph.is_template = is_template
|
||||
graph.is_active = not is_template
|
||||
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE "AgentNodeExecution" ADD COLUMN "executionData" TEXT;
|
||||
|
||||
-- RedefineTables
|
||||
PRAGMA foreign_keys=OFF;
|
||||
CREATE TABLE "new_AgentNodeLink" (
|
||||
"id" TEXT NOT NULL PRIMARY KEY,
|
||||
"agentNodeSourceId" TEXT NOT NULL,
|
||||
"sourceName" TEXT NOT NULL,
|
||||
"agentNodeSinkId" TEXT NOT NULL,
|
||||
"sinkName" TEXT NOT NULL,
|
||||
"isStatic" BOOLEAN NOT NULL DEFAULT false,
|
||||
CONSTRAINT "AgentNodeLink_agentNodeSourceId_fkey" FOREIGN KEY ("agentNodeSourceId") REFERENCES "AgentNode" ("id") ON DELETE RESTRICT ON UPDATE CASCADE,
|
||||
CONSTRAINT "AgentNodeLink_agentNodeSinkId_fkey" FOREIGN KEY ("agentNodeSinkId") REFERENCES "AgentNode" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
|
||||
);
|
||||
INSERT INTO "new_AgentNodeLink" ("agentNodeSinkId", "agentNodeSourceId", "id", "sinkName", "sourceName") SELECT "agentNodeSinkId", "agentNodeSourceId", "id", "sinkName", "sourceName" FROM "AgentNodeLink";
|
||||
DROP TABLE "AgentNodeLink";
|
||||
ALTER TABLE "new_AgentNodeLink" RENAME TO "AgentNodeLink";
|
||||
PRAGMA foreign_key_check;
|
||||
PRAGMA foreign_keys=ON;
|
||||
@@ -0,0 +1,5 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE "AgentNodeExecution" ADD COLUMN "executionData" TEXT;
|
||||
|
||||
-- AlterTable
|
||||
ALTER TABLE "AgentNodeLink" ADD COLUMN "isStatic" BOOLEAN NOT NULL DEFAULT false;
|
||||
@@ -65,6 +65,9 @@ model AgentNodeLink {
|
||||
agentNodeSinkId String
|
||||
AgentNodeSink AgentNode @relation("AgentNodeSink", fields: [agentNodeSinkId], references: [id])
|
||||
sinkName String
|
||||
|
||||
// Default: the data coming from the source can only be consumed by the sink once, Static: input data will be reused.
|
||||
isStatic Boolean @default(false)
|
||||
}
|
||||
|
||||
// This model describes a component that will be executed by the AgentNode.
|
||||
@@ -108,6 +111,8 @@ model AgentNodeExecution {
|
||||
// sqlite does not support enum
|
||||
// enum Status { INCOMPLETE, QUEUED, RUNNING, SUCCESS, FAILED }
|
||||
executionStatus String
|
||||
// Final JSON serialized input data for the node execution.
|
||||
executionData String?
|
||||
addedTime DateTime @default(now())
|
||||
queuedTime DateTime?
|
||||
startedTime DateTime?
|
||||
@@ -148,4 +153,4 @@ model AgentGraphExecutionSchedule {
|
||||
lastUpdated DateTime @updatedAt
|
||||
|
||||
@@index([isEnabled])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,9 @@ model AgentNodeLink {
|
||||
agentNodeSinkId String
|
||||
AgentNodeSink AgentNode @relation("AgentNodeSink", fields: [agentNodeSinkId], references: [id])
|
||||
sinkName String
|
||||
|
||||
// Default: the data coming from the source can only be consumed by the sink once, Static: input data will be reused.
|
||||
isStatic Boolean @default(false)
|
||||
}
|
||||
|
||||
// This model describes a component that will be executed by the AgentNode.
|
||||
@@ -108,6 +111,8 @@ model AgentNodeExecution {
|
||||
// sqlite does not support enum
|
||||
// enum Status { INCOMPLETE, QUEUED, RUNNING, SUCCESS, FAILED }
|
||||
executionStatus String
|
||||
// Final JSON serialized input data for the node execution.
|
||||
executionData String?
|
||||
addedTime DateTime @default(now())
|
||||
queuedTime DateTime?
|
||||
startedTime DateTime?
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import pytest
|
||||
|
||||
from autogpt_server.blocks.basic import ObjectLookupBlock, ValueBlock
|
||||
from autogpt_server.blocks.if_block import ComparisonOperator, ConditionBlock
|
||||
from autogpt_server.blocks.maths import MathsBlock, Operation
|
||||
from autogpt_server.data import execution, graph
|
||||
from autogpt_server.executor import ExecutionManager
|
||||
from autogpt_server.server import AgentServer
|
||||
@@ -9,13 +11,13 @@ from autogpt_server.util.test import wait_execution
|
||||
|
||||
|
||||
async def execute_graph(
|
||||
agent_server: AgentServer,
|
||||
test_manager: ExecutionManager,
|
||||
test_graph: graph.Graph,
|
||||
input_data: dict[str, str],
|
||||
input_data: dict,
|
||||
num_execs: int = 4,
|
||||
) -> str:
|
||||
# --- Test adding new executions --- #
|
||||
agent_server = AgentServer()
|
||||
response = await agent_server.execute_graph(test_graph.id, input_data)
|
||||
graph_exec_id = response["id"]
|
||||
|
||||
@@ -24,9 +26,10 @@ async def execute_graph(
|
||||
return graph_exec_id
|
||||
|
||||
|
||||
async def assert_sample_graph_executions(test_graph: graph.Graph, graph_exec_id: str):
|
||||
async def assert_sample_graph_executions(
|
||||
agent_server: AgentServer, test_graph: graph.Graph, graph_exec_id: str
|
||||
):
|
||||
text = "Hello, World!"
|
||||
agent_server = AgentServer()
|
||||
executions = await agent_server.get_run_execution_results(
|
||||
test_graph.id, graph_exec_id
|
||||
)
|
||||
@@ -53,8 +56,11 @@ async def assert_sample_graph_executions(test_graph: graph.Graph, graph_exec_id:
|
||||
assert exec.graph_exec_id == graph_exec_id
|
||||
assert exec.output_data == {"output": ["Hello, World!,Hello, World!,!!!"]}
|
||||
assert exec.input_data == {
|
||||
"format": "{texts[0]},{texts[1]},{texts[2]}",
|
||||
"texts": ["Hello, World!", "Hello, World!", "!!!"],
|
||||
"texts_$_1": "Hello, World!",
|
||||
"texts_$_2": "Hello, World!",
|
||||
"texts_$_3": "!!!",
|
||||
}
|
||||
assert exec.node_id == test_graph.nodes[2].id
|
||||
|
||||
@@ -72,8 +78,10 @@ async def test_agent_execution(server):
|
||||
test_graph = create_test_graph()
|
||||
await graph.create_graph(test_graph)
|
||||
data = {"input": "Hello, World!"}
|
||||
graph_exec_id = await execute_graph(server.exec_manager, test_graph, data, 4)
|
||||
await assert_sample_graph_executions(test_graph, graph_exec_id)
|
||||
graph_exec_id = await execute_graph(
|
||||
server.agent_server, server.exec_manager, test_graph, data, 4
|
||||
)
|
||||
await assert_sample_graph_executions(server.agent_server, test_graph, graph_exec_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(scope="session")
|
||||
@@ -125,10 +133,11 @@ async def test_input_pin_always_waited(server):
|
||||
)
|
||||
|
||||
test_graph = await graph.create_graph(test_graph)
|
||||
graph_exec_id = await execute_graph(server.exec_manager, test_graph, {}, 3)
|
||||
graph_exec_id = await execute_graph(
|
||||
server.agent_server, server.exec_manager, test_graph, {}, 3
|
||||
)
|
||||
|
||||
agent_server = AgentServer()
|
||||
executions = await agent_server.get_run_execution_results(
|
||||
executions = await server.agent_server.get_run_execution_results(
|
||||
test_graph.id, graph_exec_id
|
||||
)
|
||||
assert len(executions) == 3
|
||||
@@ -136,3 +145,83 @@ async def test_input_pin_always_waited(server):
|
||||
# Hence executing extraction of "key" from {"key1": "value1", "key2": "value2"}
|
||||
assert executions[2].status == execution.ExecutionStatus.COMPLETED
|
||||
assert executions[2].output_data == {"output": ["value2"]}
|
||||
|
||||
|
||||
@pytest.mark.asyncio(scope="session")
|
||||
async def test_static_input_link_on_graph(server):
|
||||
"""
|
||||
This test is asserting the behaviour of static input link, e.g: reusable input link.
|
||||
|
||||
Test scenario:
|
||||
*ValueBlock1*===a=========\\
|
||||
*ValueBlock2*===a=====\\ ||
|
||||
*ValueBlock3*===a===*MathBlock*====b / static====*ValueBlock5*
|
||||
*ValueBlock4*=========================================//
|
||||
|
||||
In this test, there will be three input waiting in the MathBlock input pin `a`.
|
||||
And later, another output is produced on input pin `b`, which is a static link,
|
||||
this input will complete the input of those three incomplete executions.
|
||||
"""
|
||||
nodes = [
|
||||
graph.Node(block_id=ValueBlock().id, input_default={"input": 4}), # a
|
||||
graph.Node(block_id=ValueBlock().id, input_default={"input": 4}), # a
|
||||
graph.Node(block_id=ValueBlock().id, input_default={"input": 4}), # a
|
||||
graph.Node(block_id=ValueBlock().id, input_default={"input": 5}), # b
|
||||
graph.Node(block_id=ValueBlock().id),
|
||||
graph.Node(
|
||||
block_id=MathsBlock().id,
|
||||
input_default={"operation": Operation.ADD.value},
|
||||
),
|
||||
]
|
||||
links = [
|
||||
graph.Link(
|
||||
source_id=nodes[0].id,
|
||||
sink_id=nodes[5].id,
|
||||
source_name="output",
|
||||
sink_name="a",
|
||||
),
|
||||
graph.Link(
|
||||
source_id=nodes[1].id,
|
||||
sink_id=nodes[5].id,
|
||||
source_name="output",
|
||||
sink_name="a",
|
||||
),
|
||||
graph.Link(
|
||||
source_id=nodes[2].id,
|
||||
sink_id=nodes[5].id,
|
||||
source_name="output",
|
||||
sink_name="a",
|
||||
),
|
||||
graph.Link(
|
||||
source_id=nodes[3].id,
|
||||
sink_id=nodes[4].id,
|
||||
source_name="output",
|
||||
sink_name="input",
|
||||
),
|
||||
graph.Link(
|
||||
source_id=nodes[4].id,
|
||||
sink_id=nodes[5].id,
|
||||
source_name="output",
|
||||
sink_name="b",
|
||||
is_static=True, # This is the static link to test.
|
||||
),
|
||||
]
|
||||
test_graph = graph.Graph(
|
||||
name="TestGraph",
|
||||
description="Test graph",
|
||||
nodes=nodes,
|
||||
links=links,
|
||||
)
|
||||
|
||||
test_graph = await graph.create_graph(test_graph)
|
||||
graph_exec_id = await execute_graph(
|
||||
server.agent_server, server.exec_manager, test_graph, {}, 8
|
||||
)
|
||||
executions = await server.agent_server.get_run_execution_results(
|
||||
test_graph.id, graph_exec_id
|
||||
)
|
||||
assert len(executions) == 8
|
||||
# The last 3 executions will be a+b=4+5=9
|
||||
for exec_data in executions[-3:]:
|
||||
assert exec_data.status == execution.ExecutionStatus.COMPLETED
|
||||
assert exec_data.output_data == {"result": [9]}
|
||||
|
||||
Reference in New Issue
Block a user