AutoGPT/rnd/autogpt_server/test/executor/test_manager.py

Commit d2a5bb286f by Zamil Majdy (2024-07-26): feat(rnd): Fix concurrency issue on Agent Server & Apply max-1-execution-per-node constraint in graph execution (#7551)
### Background

When multiple executors run the same node within the same graph execution, two node executions can read the same input queue and consume the same value, so data that should be consumed exactly once is consumed by two executions. The root cause is the lack of locking and concurrency support for parallel execution within a single graph.
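
A minimal sketch of this race, with hypothetical names (the real server uses its own queue and lock primitives): consuming an input is a read followed by a removal, and without a lock two executors can both read the same value before either removes it.

```python
import threading

# Hypothetical stand-ins for the server's per-node input queue and the
# distributed lock API that this change introduces.
pending_inputs: list[str] = ["Hello, World!"]
consume_lock = threading.Lock()


def consume_next(executor: str) -> str | None:
    # Holding the lock makes "check, read, remove" one atomic step, so each
    # queued input is consumed by exactly one execution.
    with consume_lock:
        if not pending_inputs:
            return None
        value = pending_inputs.pop(0)
    print(f"{executor} consumed {value!r}")
    return value


if __name__ == "__main__":
    workers = [
        threading.Thread(target=consume_next, args=(f"executor-{i}",))
        for i in range(2)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # only one executor prints; the other sees an empty queue
```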

Node concurrency also degrades UX in the current frontend implementation: when two nodes execute in parallel, the UI does not display their parallel execution updates separately; instead, the updates override each other. Until execution observability is improved in the builder UI, this capability will be limited.

### Changes 🏗️

The scope of this change is to solve the issue by:
* Decoupling graph execution and node execution; each has its own configured process pool.
* Ensuring there is at most one execution per node in a graph (parallel executions of different nodes are still allowed); a sketch of this constraint follows the list.
* Fixing the concurrency issue by adding a distributed lock API on agent_server.
* A few cleanups:
    - Added more logging with geid & neid prefixes on graph/node executions.
    - Moved the execution status update to agent-server for a single source of status updates (required by conn-manager/web-socket).
    - Configured node parallelism to 10 and graph parallelism to 10 by default, so in the very rare worst case there can be 100 node executions running concurrently.
    - Re-used server resources for each integration test run.
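
A hypothetical sketch of the max-1-execution-per-node constraint (not the server's actual API): one lock per (graph_exec_id, node_id) key, so the same node never runs twice concurrently within one graph execution, while executions of different nodes can still proceed in parallel.

```python
import threading
from typing import Callable

# Registry of per-node locks, keyed by (graph_exec_id, node_id).
_registry_lock = threading.Lock()
_node_locks: dict[tuple[str, str], threading.Lock] = {}


def _lock_for(graph_exec_id: str, node_id: str) -> threading.Lock:
    # The registry lock makes lock creation atomic across executors.
    with _registry_lock:
        return _node_locks.setdefault((graph_exec_id, node_id), threading.Lock())


def run_node_exclusively(
    graph_exec_id: str, node_id: str, execute: Callable[[], None]
) -> None:
    with _lock_for(graph_exec_id, node_id):
        # A second execution request for the same node blocks here until the
        # first completes, instead of running in parallel.
        execute()
```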

import pytest

from autogpt_server.blocks.basic import ObjectLookupBlock, ValueBlock
from autogpt_server.data import execution, graph
from autogpt_server.executor import ExecutionManager
from autogpt_server.server import AgentServer
from autogpt_server.usecases.sample import create_test_graph
from autogpt_server.util.test import wait_execution


async def execute_graph(
    test_manager: ExecutionManager,
    test_graph: graph.Graph,
    input_data: dict[str, str],
    num_execs: int = 4,
) -> str:
    # --- Test adding new executions --- #
    agent_server = AgentServer()
    response = await agent_server.execute_graph(test_graph.id, input_data)
    graph_exec_id = response["id"]

    # Execution queue should be empty
    assert await wait_execution(test_manager, test_graph.id, graph_exec_id, num_execs)
    return graph_exec_id


async def assert_sample_graph_executions(test_graph: graph.Graph, graph_exec_id: str):
    text = "Hello, World!"
    agent_server = AgentServer()
    executions = await agent_server.get_run_execution_results(
        test_graph.id, graph_exec_id
    )

    # Executing ConstantBlock1
    exec = executions[0]
    assert exec.status == execution.ExecutionStatus.COMPLETED
    assert exec.graph_exec_id == graph_exec_id
    assert exec.output_data == {"output": ["Hello, World!"]}
    assert exec.input_data == {"input": text}
    assert exec.node_id in [test_graph.nodes[0].id, test_graph.nodes[1].id]

    # Executing ConstantBlock2
    exec = executions[1]
    assert exec.status == execution.ExecutionStatus.COMPLETED
    assert exec.graph_exec_id == graph_exec_id
    assert exec.output_data == {"output": ["Hello, World!"]}
    assert exec.input_data == {"input": text}
    assert exec.node_id in [test_graph.nodes[0].id, test_graph.nodes[1].id]

    # Executing TextFormatterBlock
    exec = executions[2]
    assert exec.status == execution.ExecutionStatus.COMPLETED
    assert exec.graph_exec_id == graph_exec_id
    assert exec.output_data == {"output": ["Hello, World!,Hello, World!,!!!"]}
    assert exec.input_data == {
        "texts_$_1": "Hello, World!",
        "texts_$_2": "Hello, World!",
    }
    assert exec.node_id == test_graph.nodes[2].id

    # Executing PrintingBlock
    exec = executions[3]
    assert exec.status == execution.ExecutionStatus.COMPLETED
    assert exec.graph_exec_id == graph_exec_id
    assert exec.output_data == {"status": ["printed"]}
    assert exec.input_data == {"text": "Hello, World!,Hello, World!,!!!"}
    assert exec.node_id == test_graph.nodes[3].id


@pytest.mark.asyncio(scope="session")
async def test_agent_execution(server):
    test_graph = create_test_graph()
    await graph.create_graph(test_graph)
    data = {"input": "Hello, World!"}
    graph_exec_id = await execute_graph(server.exec_manager, test_graph, data, 4)
    await assert_sample_graph_executions(test_graph, graph_exec_id)


@pytest.mark.asyncio(scope="session")
async def test_input_pin_always_waited(server):
    """
    This test asserts that the execution of a node always waits for its input
    pin to be provided; even when a default value is defined on that pin, the
    default has to be ignored.

    Test scenario:
    ValueBlock1
        \\ input
         >------- ObjectLookupBlock | input_default: key: "", input: {}
        // key
    ValueBlock2
    """
    nodes = [
        graph.Node(
            block_id=ValueBlock().id,
            input_default={"input": {"key1": "value1", "key2": "value2"}},
        ),
        graph.Node(
            block_id=ValueBlock().id,
            input_default={"input": "key2"},
        ),
        graph.Node(
            block_id=ObjectLookupBlock().id,
            input_default={"key": "", "input": {}},
        ),
    ]
    links = [
        graph.Link(
            source_id=nodes[0].id,
            sink_id=nodes[2].id,
            source_name="output",
            sink_name="input",
        ),
        graph.Link(
            source_id=nodes[1].id,
            sink_id=nodes[2].id,
            source_name="output",
            sink_name="key",
        ),
    ]
    test_graph = graph.Graph(
        name="TestGraph",
        description="Test graph",
        nodes=nodes,
        links=links,
    )
    test_graph = await graph.create_graph(test_graph)
    graph_exec_id = await execute_graph(server.exec_manager, test_graph, {}, 3)

    agent_server = AgentServer()
    executions = await agent_server.get_run_execution_results(
        test_graph.id, graph_exec_id
    )
    assert len(executions) == 3

    # ObjectLookupBlock should wait for the input pin to be provided,
    # hence executing the extraction of "key" from {"key1": "value1", "key2": "value2"}.
    assert executions[2].status == execution.ExecutionStatus.COMPLETED
    assert executions[2].output_data == {"output": ["value2"]}