Files
AutoGPT/rnd/autogpt_server/test/executor/test_manager.py
Zamil Majdy 2bc22c5450 feat(rnd): Add support for dynamic input as list for AgentServer Block (#7268)
On AgentServer, to create a Block like StringFormatterBlock or LlmCallBlock, we need some way to dynamically link input pins and aggregate them into a single list input. This gives the user a better experience when constructing an input and linking it from the outputs of other nodes. The scope of this change is to add that support in the least intrusive way.

Proposal
To differentiate the input list name from its individual entries, we use the _$_<index> suffix. For example:
For the input items: list[int], you can set a pin items with values like [1,2,3,4]. But you can also add input pins like items_$_0 or items_$_4 with values 1 or 2, which will be appended to the items input in alphabetical order of the pin names.
The execution engine guarantees that execution waits until all the input pin values have been produced, so an input pin with a list input will produce a fixed-size list.
2024-06-27 21:51:34 +07:00

118 lines
3.9 KiB
Python

import asyncio
import time

import pytest

from autogpt_server.data import block, db, execution, graph
from autogpt_server.executor import ExecutionManager
from autogpt_server.server import AgentServer
from autogpt_server.util.service import PyroNameServer


async def create_test_graph() -> graph.Graph:
    """
    Build the four-node graph used by the execution test and register it.

    Topology:

        ParrotBlock --+
                      +--> TextFormatterBlock --> PrintingBlock
        ParrotBlock --+

    Each ParrotBlock output is linked to one dynamic list pin of the
    formatter (``texts_$_1`` and ``texts_$_2``); a third entry
    (``texts_$_3``) is supplied as an input default.

    Returns:
        The locally constructed graph object (not the value returned by
        ``graph.create_graph``).
    """
    first_parrot = graph.Node(block_id=block.ParrotBlock.id)
    second_parrot = graph.Node(block_id=block.ParrotBlock.id)
    formatter = graph.Node(
        block_id=block.TextFormatterBlock.id,
        input_default={
            "format": "{texts[0]},{texts[1]},{texts[2]}",
            "texts_$_3": "!!!",
        },
    )
    printer = graph.Node(block_id=block.PrintingBlock.id)

    # Wire the nodes together: both parrots feed the formatter, which
    # feeds the printer.
    first_parrot.connect(formatter, "output", "texts_$_1")
    second_parrot.connect(formatter, "output", "texts_$_2")
    formatter.connect(printer, "combined_text", "text")

    test_graph = graph.Graph(
        name="TestGraph",
        description="Test graph",
        nodes=[first_parrot, second_parrot, formatter, printer],
    )

    await block.initialize_blocks()
    created = await graph.create_graph(test_graph)

    # The created graph must mirror what was submitted.
    assert created.name == test_graph.name
    assert created.description == test_graph.description
    assert len(created.nodes) == len(test_graph.nodes)

    return test_graph


async def execute_graph(test_manager: ExecutionManager, test_graph: graph.Graph):
    """
    Run ``test_graph`` through an ``AgentServer`` and verify every node execution.

    Args:
        test_manager: Execution manager whose queue is polled to detect that
            no work is pending.
        test_graph: The graph to execute; its nodes are expected in the order
            ParrotBlock, ParrotBlock, TextFormatterBlock, PrintingBlock.

    Raises:
        AssertionError: If the run does not finish within the polling window
            or any recorded execution differs from the expected values.
    """
    # --- Test adding new executions --- #
    text = "Hello, World!"
    input_data = {"input": text}
    agent_server = AgentServer()
    response = await agent_server.execute_graph(test_graph.id, input_data)
    executions = response["executions"]
    graph_exec_id = response["id"]
    assert len(executions) == 2

    async def is_execution_completed():
        execs = await agent_server.get_executions(test_graph.id, graph_exec_id)
        return test_manager.queue.empty() and len(execs) == 4

    # Wait (up to ~10 seconds) for the executions to complete. Use a
    # non-blocking sleep so the event loop stays responsive while polling.
    for _ in range(10):
        if await is_execution_completed():
            break
        await asyncio.sleep(1)

    # Execution queue should be empty
    assert await is_execution_completed()
    executions = await agent_server.get_executions(test_graph.id, graph_exec_id)

    # Executing ParrotBlock1
    node_exec = executions[0]
    assert node_exec.status == execution.ExecutionStatus.COMPLETED
    assert node_exec.graph_exec_id == graph_exec_id
    assert node_exec.output_data == {"output": ["Hello, World!"]}
    assert node_exec.input_data == {"input": text}
    assert node_exec.node_id == test_graph.nodes[0].id

    # Executing ParrotBlock2
    node_exec = executions[1]
    assert node_exec.status == execution.ExecutionStatus.COMPLETED
    assert node_exec.graph_exec_id == graph_exec_id
    assert node_exec.output_data == {"output": ["Hello, World!"]}
    assert node_exec.input_data == {"input": text}
    assert node_exec.node_id == test_graph.nodes[1].id

    # Executing TextFormatterBlock
    node_exec = executions[2]
    assert node_exec.status == execution.ExecutionStatus.COMPLETED
    assert node_exec.graph_exec_id == graph_exec_id
    assert node_exec.output_data == {
        "combined_text": ["Hello, World!,Hello, World!,!!!"]
    }
    assert node_exec.input_data == {
        "texts_$_1": "Hello, World!",
        "texts_$_2": "Hello, World!",
    }
    assert node_exec.node_id == test_graph.nodes[2].id

    # Executing PrintingBlock
    node_exec = executions[3]
    assert node_exec.status == execution.ExecutionStatus.COMPLETED
    assert node_exec.graph_exec_id == graph_exec_id
    assert node_exec.output_data == {"status": ["printed"]}
    assert node_exec.input_data == {"text": "Hello, World!,Hello, World!,!!!"}
    assert node_exec.node_id == test_graph.nodes[3].id


@pytest.mark.asyncio(scope="session")
async def test_agent_execution():
    """End-to-end test: start the services, build the graph, and execute it."""
    # The name server is entered first, then a manager constructed with 1.
    with PyroNameServer(), ExecutionManager(1) as test_manager:
        await db.connect()
        built_graph = await create_test_graph()
        await execute_graph(test_manager, built_graph)