feat(rnd): Fix concurrency issue on Agent Server & Apply max-1-execution-per-node constraint in graph execution (#7551)

### Background

When multiple executors execute the same node within the same graph execution, two node executions can read from the same input queue and receive the same value, so data that is supposed to be consumed once gets consumed twice. The root cause is the lack of locking and concurrency support for parallel execution within a single graph.
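
For illustration, here is a minimal, self-contained sketch of this check-then-act race. The in-memory structures and names below are hypothetical stand-ins, not the project's code or schema:

```python
import threading

# Hypothetical stand-in for a node's execution-input rows; illustrative only.
node_inputs: dict[str, dict[str, str]] = {"exec-1": {"a": "1"}}
enqueued: list[dict[str, str]] = []

def upsert_and_maybe_enqueue(input_name: str, value: str) -> None:
    # Check-then-act without a lock: both workers can observe the same
    # incomplete input set, both complete it, and both enqueue the node,
    # so the input that should be consumed once is consumed twice.
    current = dict(node_inputs["exec-1"])   # read
    current[input_name] = value             # modify
    node_inputs["exec-1"] = current         # write back (lost update)
    if {"a", "b"} <= current.keys():
        enqueued.append(current)            # node execution is enqueued

workers = [
    threading.Thread(target=upsert_and_maybe_enqueue, args=("b", "2"))
    for _ in range(2)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(len(enqueued))  # can print 2: the same node enqueued twice
```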

Node concurrency also causes poor UX in the current frontend implementation: when two nodes execute in parallel, the UI does not display their parallel execution updates; instead, the updates override each other. Until execution observability is improved in the builder UI, this capability will be limited.

### Changes 🏗️

This change solves the issue by:
* Decoupling graph execution and node execution, each with its own configured process pool (see the sketch after this list).
* Ensuring there is at most one execution per node at a time within a graph (parallel executions on different nodes are still allowed).
* Fixing the concurrency issue by adding a distributed-lock API on agent_server.
* A few cleanups:
    - Added more logging with graph-eid & node-eid prefixes on graph/node executions.
    - Moved execution status updates to agent-server, making it the single source of status updates (required by conn-manager/web-socket).
    - Configured node parallelism to 10 and graph parallelism to 10 by default, so in the very rare worst case there can be 10 × 10 = 100 concurrent node executions.
    - Re-used server resources for each integration test run.
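
To make the pool layout concrete, here is a minimal sketch of the decoupled two-level process pools. Only the 10/10 defaults come from this change; all names here are illustrative:

```python
from concurrent.futures import ProcessPoolExecutor

NUM_GRAPH_WORKERS = 10  # parallel graph executions (default in this change)
NUM_NODE_WORKERS = 10   # parallel node executions within one graph

node_pool: ProcessPoolExecutor | None = None

def on_graph_worker_start() -> None:
    # Runs once in each graph-worker process: every graph worker owns a
    # private node pool, so the two levels are decoupled and the worst
    # case is 10 * 10 = 100 concurrent node executions service-wide.
    global node_pool
    node_pool = ProcessPoolExecutor(max_workers=NUM_NODE_WORKERS)

def run_node(node_id: str) -> str:
    return f"executed {node_id}"

def run_graph(node_ids: list[str]) -> list[str]:
    assert node_pool is not None
    futures = [node_pool.submit(run_node, n) for n in node_ids]
    return [f.result() for f in futures]

if __name__ == "__main__":
    with ProcessPoolExecutor(
        max_workers=NUM_GRAPH_WORKERS, initializer=on_graph_worker_start
    ) as graph_pool:
        print(graph_pool.submit(run_graph, ["node-a", "node-b"]).result())
```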
Author: Zamil Majdy
Date: 2024-07-26 14:08:03 +04:00
Committed by: GitHub
Parent: 36b9a0a930
Commit: d2a5bb286f
14 changed files with 287 additions and 120 deletions


@@ -28,14 +28,13 @@ def run_processes(processes: list[AppProcess], **kwargs):
def main(**kwargs):
settings = get_config_and_secrets()
set_start_method("spawn", force=True)
freeze_support()
run_processes(
[
PyroNameServer(),
ExecutionManager(pool_size=settings.config.num_workers),
ExecutionManager(),
ExecutionScheduler(),
AgentServer(),
],


@@ -2,7 +2,7 @@ from collections import defaultdict
from datetime import datetime, timezone
from enum import Enum
from multiprocessing import Manager
from typing import Any
from typing import Any, Generic, TypeVar
from prisma.models import (
AgentGraphExecution,
@@ -16,6 +16,11 @@ from autogpt_server.data.block import BlockData, BlockInput, CompletedBlockOutpu
from autogpt_server.util import json
class GraphExecution(BaseModel):
graph_exec_id: str
start_node_execs: list["NodeExecution"]
class NodeExecution(BaseModel):
graph_exec_id: str
node_exec_id: str
@@ -31,7 +36,10 @@ class ExecutionStatus(str, Enum):
FAILED = "FAILED"
class ExecutionQueue:
T = TypeVar("T")
class ExecutionQueue(Generic[T]):
"""
Queue for managing the execution of agents.
This will be shared between different processes
@@ -40,11 +48,11 @@ class ExecutionQueue:
def __init__(self):
self.queue = Manager().Queue()
def add(self, execution: NodeExecution) -> NodeExecution:
def add(self, execution: T) -> T:
self.queue.put(execution)
return execution
def get(self) -> NodeExecution:
def get(self) -> T:
return self.queue.get()
def empty(self) -> bool:
@@ -146,14 +154,15 @@ async def upsert_execution_input(
node_id: str,
graph_exec_id: str,
input_name: str,
data: Any,
) -> str:
input_data: Any,
) -> tuple[str, BlockInput]:
"""
Insert an AgentNodeExecutionInputOutput record as one of the AgentNodeExecution's inputs.
If no existing AgentNodeExecution is still missing `input_name` as an input, create a new one.
Returns:
The id of the created or existing AgentNodeExecution.
* The id of the created or existing AgentNodeExecution.
* Dict of node input data, key is the input name, value is the input data.
"""
existing_execution = await AgentNodeExecution.prisma().find_first(
where={ # type: ignore
@@ -162,18 +171,25 @@ async def upsert_execution_input(
"Input": {"every": {"name": {"not": input_name}}},
},
order={"addedTime": "asc"},
include={"Input": True},
)
json_data = json.dumps(data)
json_input_data = json.dumps(input_data)
if existing_execution:
await AgentNodeExecutionInputOutput.prisma().create(
data={
"name": input_name,
"data": json_data,
"data": json_input_data,
"referencedByInputExecId": existing_execution.id,
}
)
return existing_execution.id
return existing_execution.id, {
**{
input_data.name: json.loads(input_data.data)
for input_data in existing_execution.Input or []
},
input_name: input_data,
}
else:
result = await AgentNodeExecution.prisma().create(
@@ -181,10 +197,10 @@ async def upsert_execution_input(
"agentNodeId": node_id,
"agentGraphExecutionId": graph_exec_id,
"executionStatus": ExecutionStatus.INCOMPLETE,
"Input": {"create": {"name": input_name, "data": json_data}},
"Input": {"create": {"name": input_name, "data": json_input_data}},
}
)
return result.id
return result.id, {input_name: input_data}
async def upsert_execution_output(
@@ -245,26 +261,6 @@ async def get_execution_results(graph_exec_id: str) -> list[ExecutionResult]:
return res
async def get_node_execution_input(node_exec_id: str) -> BlockInput:
"""
Get execution node input data from the previous node execution result.
Returns:
dictionary of input data, key is the input name, value is the input data.
"""
execution = await AgentNodeExecution.prisma().find_unique_or_raise(
where={"id": node_exec_id},
include=EXECUTION_RESULT_INCLUDE, # type: ignore
)
if not execution.AgentNode:
raise ValueError(f"Node {execution.agentNodeId} not found.")
return {
input_data.name: json.loads(input_data.data)
for input_data in execution.Input or []
}
LIST_SPLIT = "_$_"
DICT_SPLIT = "_#_"
OBJC_SPLIT = "_@_"


@@ -1,6 +1,7 @@
import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import Future, ProcessPoolExecutor, TimeoutError
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Coroutine, Generator, TypeVar
if TYPE_CHECKING:
@@ -8,33 +9,34 @@ if TYPE_CHECKING:
from autogpt_server.data import db
from autogpt_server.data.block import Block, BlockData, BlockInput, get_block
from autogpt_server.data.execution import ExecutionQueue, ExecutionStatus
from autogpt_server.data.execution import NodeExecution as Execution
from autogpt_server.data.execution import (
ExecutionQueue,
ExecutionStatus,
GraphExecution,
NodeExecution,
create_graph_execution,
get_node_execution_input,
merge_execution_input,
parse_execution_output,
update_execution_status,
upsert_execution_input,
upsert_execution_output,
)
from autogpt_server.data.graph import Graph, Link, Node, get_graph, get_node
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Config
logger = logging.getLogger(__name__)
def get_log_prefix(graph_eid: str, node_eid: str, block_name: str = "-"):
return f"[ExecutionManager] [graph-{graph_eid}|node-{node_eid}|{block_name}]"
return f"[ExecutionManager][graph-eid-{graph_eid}|node-eid-{node_eid}|{block_name}]"
T = TypeVar("T")
ExecutionStream = Generator[Execution, None, None]
ExecutionStream = Generator[NodeExecution, None, None]
def execute_node(
loop: asyncio.AbstractEventLoop, api_client: "AgentServer", data: Execution
loop: asyncio.AbstractEventLoop, api_client: "AgentServer", data: NodeExecution
) -> ExecutionStream:
"""
Execute a node in the graph. This will trigger a block execution on a node,
@@ -58,9 +60,7 @@ def execute_node(
return loop.run_until_complete(f)
def update_execution(status: ExecutionStatus):
api_client.send_execution_update(
wait(update_execution_status(node_exec_id, status)).model_dump()
)
api_client.update_execution_status(node_exec_id, status)
node = wait(get_node(node_id))
if not node:
@@ -89,7 +89,7 @@ def execute_node(
wait(upsert_execution_output(node_exec_id, output_name, output_data))
update_execution(ExecutionStatus.COMPLETED)
for execution in enqueue_next_nodes(
for execution in _enqueue_next_nodes(
api_client=api_client,
loop=loop,
node=node,
@@ -107,23 +107,30 @@ def execute_node(
raise e
def enqueue_next_nodes(
@contextmanager
def synchronized(api_client: "AgentServer", key: Any):
api_client.acquire_lock(key)
try:
yield
finally:
api_client.release_lock(key)
def _enqueue_next_nodes(
api_client: "AgentServer",
loop: asyncio.AbstractEventLoop,
node: Node,
output: BlockData,
graph_exec_id: str,
prefix: str,
) -> list[Execution]:
) -> list[NodeExecution]:
def wait(f: Coroutine[T, Any, T]) -> T:
return loop.run_until_complete(f)
def execution_update(node_exec_id: str, status: ExecutionStatus):
api_client.send_execution_update(
wait(update_execution_status(node_exec_id, status)).model_dump()
)
api_client.update_execution_status(node_exec_id, status)
def update_execution_result(node_link: Link) -> Execution | None:
def register_next_execution(node_link: Link) -> NodeExecution | None:
next_output_name = node_link.source_name
next_input_name = node_link.sink_name
next_node_id = node_link.sink_id
@@ -137,18 +144,21 @@ def enqueue_next_nodes(
logger.error(f"{prefix} Error, next node {next_node_id} not found.")
return
next_node_exec_id = wait(
upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
data=next_data,
# Upserting an execution input involves reading the node's existing input pins
# and then either updating the existing execution input or creating a new one.
# While reading, we must prevent any other process from adding input to the same node.
with synchronized(api_client, ("upsert_input", next_node_id, graph_exec_id)):
next_node_exec_id, next_node_input = wait(
upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
input_data=next_data,
)
)
)
next_node_input = wait(get_node_execution_input(next_node_exec_id))
next_node_input, validation_msg = validate_exec(next_node, next_node_input)
suffix = f"{next_output_name}~{next_input_name}#{next_node_id}:{validation_msg}"
suffix = f"{next_output_name}>{next_input_name}~{next_node_id}:{validation_msg}"
if not next_node_input:
logger.warning(f"{prefix} Skipped queueing {suffix}")
@@ -157,7 +167,7 @@ def enqueue_next_nodes(
# Input is complete, enqueue the execution.
logger.warning(f"{prefix} Enqueued {suffix}")
execution_update(next_node_exec_id, ExecutionStatus.QUEUED)
return Execution(
return NodeExecution(
graph_exec_id=graph_exec_id,
node_exec_id=next_node_exec_id,
node_id=next_node.id,
@@ -167,7 +177,7 @@ def enqueue_next_nodes(
return [
execution
for link in node.output_links
if (execution := update_execution_result(link))
if (execution := register_next_execution(link))
]
@@ -228,44 +238,118 @@ def get_agent_server_client() -> "AgentServer":
class Executor:
loop: asyncio.AbstractEventLoop
"""
This class contains event handlers for the process pool executor events.
The main events are:
on_node_executor_start: Initialize the process that executes the node.
on_node_execution: Execution logic for a node.
on_graph_executor_start: Initialize the process that executes the graph.
on_graph_execution: Execution logic for a graph.
The execution flow:
1. Graph execution request is added to the queue.
2. Graph executor loop picks the request from the queue.
3. Graph executor loop submits the graph execution request to the executor pool.
[on_graph_execution]
4. Graph executor initialize the node execution queue.
5. Graph executor adds the starting nodes to the node execution queue.
6. Graph executor waits for all nodes to be executed.
[on_node_execution]
7. Node executor picks the node execution request from the queue.
8. Node executor executes the node.
9. Node executor enqueues the next executed nodes to the node execution queue.
"""
@classmethod
def on_executor_start(cls):
def on_node_executor_start(cls):
cls.loop = asyncio.new_event_loop()
cls.loop.run_until_complete(db.connect())
cls.agent_server_client = get_agent_server_client()
@classmethod
def on_start_execution(cls, q: ExecutionQueue, data: Execution) -> bool:
def on_node_execution(cls, q: ExecutionQueue[NodeExecution], data: NodeExecution):
prefix = get_log_prefix(data.graph_exec_id, data.node_exec_id)
try:
logger.warning(f"{prefix} Start execution")
logger.warning(f"{prefix} Start node execution")
for execution in execute_node(cls.loop, cls.agent_server_client, data):
q.add(execution)
return True
logger.warning(f"{prefix} Finished node execution")
except Exception as e:
logger.exception(f"{prefix} Error: {e}")
return False
logger.exception(f"{prefix} Failed node execution: {e}")
@classmethod
def on_graph_executor_start(cls):
cls.pool_size = Config().num_node_workers
cls.executor = ProcessPoolExecutor(
max_workers=cls.pool_size,
initializer=cls.on_node_executor_start,
)
logger.warning(f"Graph executor started with max-{cls.pool_size} node workers.")
@classmethod
def on_graph_execution(cls, graph_data: GraphExecution):
prefix = get_log_prefix(graph_data.graph_exec_id, "*")
logger.warning(f"{prefix} Start graph execution")
try:
queue = ExecutionQueue[NodeExecution]()
for node_exec in graph_data.start_node_execs:
queue.add(node_exec)
futures: dict[str, Future] = {}
while not queue.empty():
execution = queue.get()
# Avoid parallel execution of the same node.
fut = futures.get(execution.node_id)
if fut and not fut.done():
cls.wait_future(fut)
logger.warning(f"{prefix} Re-enqueueing {execution.node_id}")
queue.add(execution)
continue
futures[execution.node_id] = cls.executor.submit(
cls.on_node_execution, queue, execution
)
# Avoid terminating graph execution when some nodes are still running.
while queue.empty() and futures:
for node_id, future in list(futures.items()):
if future.done():
del futures[node_id]
elif queue.empty():
cls.wait_future(future)
logger.warning(f"{prefix} Finished graph execution")
except Exception as e:
logger.exception(f"{prefix} Failed graph execution: {e}")
@classmethod
def wait_future(cls, future: Future):
try:
future.result(timeout=3)
except TimeoutError:
# Avoid being blocked by a long-running node by not waiting for its completion.
pass
class ExecutionManager(AppService):
def __init__(self, pool_size: int):
self.pool_size = pool_size
self.queue = ExecutionQueue()
def __init__(self):
self.pool_size = Config().num_graph_workers
self.queue = ExecutionQueue[GraphExecution]()
def run_service(self):
with ProcessPoolExecutor(
max_workers=self.pool_size,
initializer=Executor.on_executor_start,
initializer=Executor.on_graph_executor_start,
) as executor:
logger.warning(f"Execution manager started with {self.pool_size} workers.")
logger.warning(
f"Execution manager started with max-{self.pool_size} graph workers."
)
while True:
executor.submit(
Executor.on_start_execution,
self.queue,
self.queue.get(),
)
executor.submit(Executor.on_graph_execution, self.queue.get())
@property
def agent_server_client(self) -> "AgentServer":
@@ -292,32 +376,25 @@ class ExecutionManager(AppService):
nodes_input=nodes_input,
)
)
executions: list[BlockInput] = []
starting_node_execs = []
for node_exec in node_execs:
self.add_node_execution(
Execution(
starting_node_execs.append(
NodeExecution(
graph_exec_id=node_exec.graph_exec_id,
node_exec_id=node_exec.node_exec_id,
node_id=node_exec.node_id,
data=node_exec.input_data,
)
)
executions.append(
{
"id": node_exec.node_exec_id,
"node_id": node_exec.node_id,
}
self.agent_server_client.update_execution_status(
node_exec.node_exec_id, ExecutionStatus.QUEUED
)
return {
"id": graph_exec_id,
"executions": executions,
}
def add_node_execution(self, execution: Execution) -> Execution:
res = self.run_and_wait(
update_execution_status(execution.node_exec_id, ExecutionStatus.QUEUED)
graph_exec = GraphExecution(
graph_exec_id=graph_exec_id,
start_node_execs=starting_node_execs,
)
self.agent_server_client.send_execution_update(res.model_dump())
return self.queue.add(execution)
self.queue.add(graph_exec)
return {"id": graph_exec_id}


@@ -18,9 +18,16 @@ from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
import autogpt_server.server.ws_api
from autogpt_server.data import block, db, execution
from autogpt_server.data import block, db
from autogpt_server.data import graph as graph_db
from autogpt_server.data.block import BlockInput, CompletedBlockOutput
from autogpt_server.data.execution import (
ExecutionResult,
ExecutionStatus,
get_execution_results,
list_executions,
update_execution_status,
)
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server.conn_manager import ConnectionManager
from autogpt_server.server.model import (
@@ -30,17 +37,19 @@ from autogpt_server.server.model import (
WsMessage,
)
from autogpt_server.util.data import get_frontend_path
from autogpt_server.util.lock import KeyedMutex
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Settings
class AgentServer(AppService):
event_queue: asyncio.Queue[execution.ExecutionResult] = asyncio.Queue()
event_queue: asyncio.Queue[ExecutionResult] = asyncio.Queue()
manager = ConnectionManager()
mutex = KeyedMutex()
async def event_broadcaster(self):
while True:
event: execution.ExecutionResult = await self.event_queue.get()
event: ExecutionResult = await self.event_queue.get()
await self.manager.send_execution_result(event)
@asynccontextmanager
@@ -552,17 +561,17 @@ class AgentServer(AppService):
status_code=404, detail=f"Agent #{graph_id}{rev} not found."
)
return await execution.list_executions(graph_id, graph_version)
return await list_executions(graph_id, graph_version)
@classmethod
async def get_run_execution_results(
cls, graph_id: str, run_id: str
) -> list[execution.ExecutionResult]:
) -> list[ExecutionResult]:
graph = await graph_db.get_graph(graph_id)
if not graph:
raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
return await execution.get_execution_results(run_id)
return await get_execution_results(run_id)
async def create_schedule(
self, graph_id: str, cron: str, input_data: dict[Any, Any]
@@ -590,15 +599,33 @@ class AgentServer(AppService):
return execution_scheduler.get_execution_schedules(graph_id) # type: ignore
@expose
def send_execution_update(self, execution_result_dict: dict[Any, Any]):
execution_result = execution.ExecutionResult(**execution_result_dict)
self.run_and_wait(self.event_queue.put(execution_result))
def update_execution_status(self, exec_id: str, status: ExecutionStatus):
exec_result = self.run_and_wait(update_execution_status(exec_id, status))
self.run_and_wait(self.event_queue.put(exec_result))
@expose
def acquire_lock(self, key: Any):
self.mutex.lock(key)
@expose
def release_lock(self, key: Any):
self.mutex.unlock(key)
@classmethod
def update_configuration(
cls,
updated_settings: Annotated[
Dict[str, Any], Body(examples=[{"config": {"num_workers": 10}}])
Dict[str, Any],
Body(
examples=[
{
"config": {
"num_graph_workers": 10,
"num_node_workers": 10,
}
}
]
),
],
):
settings = Settings()


@@ -0,0 +1,32 @@
from threading import Lock
from typing import Any
from expiringdict import ExpiringDict
class KeyedMutex:
"""
This class provides a mutex that can be locked and unlocked by a specific key.
It uses an ExpiringDict to automatically clear a mutex that has not been unlocked
within a specified timeout, preventing memory leaks.
"""
def __init__(self):
self.locks: dict[Any, Lock] = ExpiringDict(max_len=6000, max_age_seconds=60)
self.locks_lock = Lock()
def lock(self, key: Any):
with self.locks_lock:
if key not in self.locks:
self.locks[key] = (lock := Lock())
else:
lock = self.locks[key]
lock.acquire()
def unlock(self, key: Any):
with self.locks_lock:
if key in self.locks:
lock = self.locks.pop(key)
else:
return
lock.release()
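
For context, a hypothetical usage of KeyedMutex mirroring how the executor guards input upserts in this diff (the key tuple follows the executor's `("upsert_input", node_id, graph_exec_id)`; the guarded body is elided):

```python
from threading import Thread

# Assuming KeyedMutex lives at autogpt_server.util.lock,
# as imported by the server in this diff.
from autogpt_server.util.lock import KeyedMutex

mutex = KeyedMutex()

def guarded_upsert(node_id: str, graph_exec_id: str) -> None:
    key = ("upsert_input", node_id, graph_exec_id)
    mutex.lock(key)
    try:
        ...  # read existing input pins, then update or create
    finally:
        mutex.unlock(key)

workers = [
    Thread(target=guarded_upsert, args=("node-1", "exec-1")) for _ in range(2)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
```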


@@ -41,8 +41,17 @@ class UpdateTrackingModel(BaseModel, Generic[T]):
class Config(UpdateTrackingModel["Config"], BaseSettings):
"""Config for the server."""
num_workers: int = Field(
default=9, ge=1, le=100, description="Number of workers to use for execution."
num_graph_workers: int = Field(
default=1,
ge=1,
le=100,
description="Maximum number of workers to use for graph execution.",
)
num_node_workers: int = Field(
default=1,
ge=1,
le=100,
description="Maximum number of workers to use for node execution within a single graph.",
)
# Add more configuration fields as needed


@@ -13,7 +13,7 @@ log = print
class SpinTestServer:
def __init__(self):
self.name_server = PyroNameServer()
self.exec_manager = ExecutionManager(1)
self.exec_manager = ExecutionManager()
self.agent_server = AgentServer()
self.scheduler = ExecutionScheduler()


@@ -1,3 +1,4 @@
{
"num_workers": 5
}
"num_graph_workers": 10,
"num_node_workers": 10
}


@@ -0,0 +1,8 @@
/*
Warnings:
- A unique constraint covering the columns `[referencedByInputExecId,referencedByOutputExecId,name]` on the table `AgentNodeExecutionInputOutput` will be added. If there are existing duplicate values, this will fail.
*/
-- CreateIndex
CREATE UNIQUE INDEX "AgentNodeExecutionInputOutput_referencedByInputExecId_referencedByOutputExecId_name_key" ON "AgentNodeExecutionInputOutput"("referencedByInputExecId", "referencedByOutputExecId", "name");


@@ -1,3 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (i.e. Git)
provider = "sqlite"
provider = "sqlite"


@@ -25,7 +25,7 @@ requests = "*"
sentry-sdk = "^1.40.4"
[package.extras]
benchmark = ["agbenchmark @ file:///workspaces/AutoGPT/benchmark"]
benchmark = ["agbenchmark"]
[package.source]
type = "directory"
@@ -329,7 +329,7 @@ watchdog = "4.0.0"
webdriver-manager = "^4.0.1"
[package.extras]
benchmark = ["agbenchmark @ file:///workspaces/AutoGPT/benchmark"]
benchmark = ["agbenchmark"]
[package.source]
type = "directory"
@@ -1179,6 +1179,20 @@ files = [
[package.extras]
test = ["pytest (>=6)"]
[[package]]
name = "expiringdict"
version = "1.2.2"
description = "Dictionary with auto-expiring values for caching purposes"
optional = false
python-versions = "*"
files = [
{file = "expiringdict-1.2.2-py3-none-any.whl", hash = "sha256:09a5d20bc361163e6432a874edd3179676e935eb81b925eccef48d409a8a45e8"},
{file = "expiringdict-1.2.2.tar.gz", hash = "sha256:300fb92a7e98f15b05cf9a856c1415b3bc4f2e132be07daa326da6414c23ee09"},
]
[package.extras]
tests = ["coverage", "coveralls", "dill", "mock", "nose"]
[[package]]
name = "fastapi"
version = "0.109.2"
@@ -6348,4 +6362,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools",
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "422edccf59dc5cdcc1cb0f6991228b2f58ef807a934d1aeda9643fa0cad27780"
content-hash = "9013ff78344cb68878809bd7220453879c32c9291e39d99321dbcc9a7359855c"


@@ -38,6 +38,7 @@ youtube-transcript-api = "^0.6.2"
ollama = "^0.3.0"
feedparser = "^6.0.11"
python-dotenv = "^1.0.1"
expiringdict = "^1.2.2"
[tool.poetry.group.dev.dependencies]
cx-freeze = { git = "https://github.com/ntindle/cx_Freeze.git", rev = "main", develop = true }


@@ -127,6 +127,9 @@ model AgentNodeExecutionInputOutput {
ReferencedByInputExec AgentNodeExecution? @relation("AgentNodeExecutionInput", fields: [referencedByInputExecId], references: [id])
referencedByOutputExecId String?
ReferencedByOutputExec AgentNodeExecution? @relation("AgentNodeExecutionOutput", fields: [referencedByOutputExecId], references: [id])
// Input and Output pin names are unique for each AgentNodeExecution.
@@unique([referencedByInputExecId, referencedByOutputExecId, name])
}
// This model describes the recurring execution schedule of an Agent.


@@ -37,7 +37,7 @@ async def assert_sample_graph_executions(test_graph: graph.Graph, graph_exec_id:
assert exec.graph_exec_id == graph_exec_id
assert exec.output_data == {"output": ["Hello, World!"]}
assert exec.input_data == {"input": text}
assert exec.node_id == test_graph.nodes[0].id
assert exec.node_id in [test_graph.nodes[0].id, test_graph.nodes[1].id]
# Executing ConstantBlock2
exec = executions[1]
@@ -45,7 +45,7 @@ async def assert_sample_graph_executions(test_graph: graph.Graph, graph_exec_id:
assert exec.graph_exec_id == graph_exec_id
assert exec.output_data == {"output": ["Hello, World!"]}
assert exec.input_data == {"input": text}
assert exec.node_id == test_graph.nodes[1].id
assert exec.node_id in [test_graph.nodes[0].id, test_graph.nodes[1].id]
# Executing TextFormatterBlock
exec = executions[2]