Mirror of https://github.com/Significant-Gravitas/AutoGPT.git (synced 2026-01-19 20:18:22 -05:00)

Compare commits: 20 commits on branch `fix/undefi...` by zamilmajdy
| Author | SHA1 | Date |
|---|---|---|
| zamilmajdy | 4cba1c0cb8 | |
| zamilmajdy | 8df4625fd6 | |
| zamilmajdy | 56d85dc770 | |
| zamilmajdy | 67478985a4 | |
| zamilmajdy | 07bcfcd477 | |
| zamilmajdy | 8026f4f13a | |
| zamilmajdy | b55185a2b8 | |
| zamilmajdy | 1635a49a88 | |
| zamilmajdy | 131d9d2e84 | |
| zamilmajdy | 44409c035f | |
| zamilmajdy | 4e4a047a40 | |
| zamilmajdy | 5a9235bcf9 | |
| zamilmajdy | b89af139c1 | |
| zamilmajdy | 4be9399134 | |
| zamilmajdy | ee5a95d5f6 | |
| zamilmajdy | 0705c6ae6d | |
| zamilmajdy | a067e6f6a1 | |
| zamilmajdy | 8cba27c7c4 | |
| zamilmajdy | 999cb2636e | |
| zamilmajdy | b0f6b93d86 | |
The changes start in the execution data layer (the module imported later in this diff as `backend.data.execution`). The `typing` import is expanded and a `TYPE_CHECKING` guard is introduced:

```diff
@@ -1,7 +1,16 @@
 from collections import defaultdict
 from datetime import datetime, timezone
 from multiprocessing import Manager
-from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    AsyncGenerator,
+    Generator,
+    Generic,
+    Optional,
+    Type,
+    TypeVar,
+)
 
 from prisma import Json
 from prisma.enums import AgentExecutionStatus
@@ -21,6 +30,9 @@ from backend.server.v2.store.exceptions import DatabaseError
 from backend.util import mock, type
 from backend.util.settings import Config
 
+if TYPE_CHECKING:
+    pass
+
 
 class GraphExecutionEntry(BaseModel):
     user_id: str
```
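The added `if TYPE_CHECKING:` block is empty for now (`pass`), presumably a placeholder for type-checker-only imports. As a reminder of the idiom, with a hypothetical import:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Resolved only by static type checkers, so it cannot create a
    # runtime import cycle. This module path is a hypothetical example.
    from backend.data.graph import Node

def count_inputs(node: "Node") -> int:
    # String annotation: no runtime import of Node is needed.
    return len(node.input_links)
```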
`upsert_execution_input` now returns the full `ExecutionResult` instead of just the execution id:

```diff
@@ -197,7 +209,7 @@ async def upsert_execution_input(
     input_name: str,
     input_data: Any,
     node_exec_id: str | None = None,
-) -> tuple[str, BlockInput]:
+) -> tuple[ExecutionResult, BlockInput]:
     """
     Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Input.
     If there is no AgentNodeExecution that has no `input_name` as input, create new one.
@@ -234,7 +246,7 @@ async def upsert_execution_input(
                 "referencedByInputExecId": existing_execution.id,
             }
         )
-        return existing_execution.id, {
+        return ExecutionResult.from_db(existing_execution), {
             **{
                 input_data.name: type.convert(input_data.data, Type[Any])
                 for input_data in existing_execution.Input or []
@@ -251,7 +263,7 @@ async def upsert_execution_input(
                 "Input": {"create": {"name": input_name, "data": json_input_data}},
             }
         )
-        return result.id, {input_name: input_data}
+        return ExecutionResult.from_db(result), {input_name: input_data}
 
     else:
         raise ValueError(
```
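Callers previously received the execution id directly; they now read it off the returned `ExecutionResult`. A minimal caller-side sketch (argument values are made up; the keyword arguments and the `node_exec_id` attribute come from this diff):

```python
# Hypothetical call site illustrating the new return contract.
exec_result, node_input = await upsert_execution_input(
    node_id="node-123",          # made-up id
    graph_exec_id="graph-456",   # made-up id
    input_name="value",
    input_data=42,
)
# The function used to return the id itself; now it is an attribute:
node_exec_id = exec_result.node_exec_id
```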
`get_latest_execution` is replaced by `get_output_from_links`, which resolves the latest completed outputs for a whole set of links in one query. Note the sort order flips to ascending `queuedTime`, so later executions overwrite earlier ones in the loop and the most recently queued output wins:

```diff
@@ -556,22 +568,40 @@ def merge_execution_input(data: BlockInput) -> BlockInput:
     return data
 
 
-async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult | None:
-    execution = await AgentNodeExecution.prisma().find_first(
+async def get_output_from_links(
+    links: dict[str, tuple[str, str]], graph_eid: str
+) -> BlockInput:
+    """
+    Get the latest completed node execution output from the graph links.
+
+    Args:
+        links: dict[node_id, (source_name, sink_name)] of the links to get the output from.
+        graph_eid: the id of the graph execution to get the output from.
+
+    Returns:
+        BlockInput: a dict of the latest output from the links.
+    """
+    executions = await AgentNodeExecution.prisma().find_many(
         where={
-            "agentNodeId": node_id,
+            "agentNodeId": {"in": list(links.keys())},
             "agentGraphExecutionId": graph_eid,
             "executionStatus": {"not": ExecutionStatus.INCOMPLETE},  # type: ignore
         },
         order=[
-            {"queuedTime": "desc"},
+            {"queuedTime": "asc"},
             {"addedTime": "desc"},
         ],
         include=EXECUTION_RESULT_INCLUDE,
     )
-    if not execution:
-        return None
-    return ExecutionResult.from_db(execution)
+
+    latest_output = {}
+    for e in executions:
+        execution = ExecutionResult.from_db(e)
+        source_name, sink_name = links[execution.node_id]
+        if value := execution.output_data.get(source_name):
+            latest_output[sink_name] = value[-1]
+
+    return latest_output
 
 
 async def get_incomplete_executions(
```
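A hedged usage sketch of the new function, with a made-up link map in the `dict[node_id, (source_name, sink_name)]` shape the docstring describes:

```python
# Each upstream node id maps to (output pin on that node, input pin on the consumer).
links = {
    "node-abc": ("result", "value"),   # hypothetical ids and pin names
    "node-def": ("output", "items"),
}

# Returns a BlockInput dict keyed by sink name, holding each link's latest output.
latest = await get_output_from_links(links, graph_eid="graph-exec-123")
```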
The `DatabaseManager` service swaps the exposed function accordingly:

```diff
@@ -5,7 +5,7 @@ from backend.data.execution import (
     create_graph_execution,
     get_execution_results,
     get_incomplete_executions,
-    get_latest_execution,
+    get_output_from_links,
     update_execution_status,
     update_execution_status_batch,
     update_graph_execution_start_time,
@@ -69,7 +69,7 @@ class DatabaseManager(AppService):
     create_graph_execution = exposed_run_and_wait(create_graph_execution)
     get_execution_results = exposed_run_and_wait(get_execution_results)
     get_incomplete_executions = exposed_run_and_wait(get_incomplete_executions)
-    get_latest_execution = exposed_run_and_wait(get_latest_execution)
+    get_output_from_links = exposed_run_and_wait(get_output_from_links)
     update_execution_status = exposed_run_and_wait(update_execution_status)
     update_execution_status_batch = exposed_run_and_wait(update_execution_status_batch)
     update_graph_execution_start_time = exposed_run_and_wait(
```
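For context, `exposed_run_and_wait` evidently wraps an async data-layer function so service clients can invoke it and wait for the result. A minimal sketch of that general pattern, assuming nothing about AutoGPT's actual `AppService` internals:

```python
import asyncio
from typing import Any, Awaitable, Callable

def exposed_run_and_wait(
    coro_fn: Callable[..., Awaitable[Any]],
) -> Callable[..., Any]:
    """Sketch: expose an async function as a blocking call."""
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Drive the coroutine to completion and return its result.
        return asyncio.run(coro_fn(*args, **kwargs))
    return wrapper
```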
In the executor's `_enqueue_next_nodes`, a new `validate_next_exec` helper wraps `validate_exec` so that a validation error records the error output, marks the node execution FAILED, and broadcasts the update:

```diff
@@ -295,6 +295,19 @@ def _enqueue_next_nodes(
             data=data,
         )
 
+    def validate_next_exec(
+        next_node_exec_id: str, next_node: Node, next_node_input: BlockInput
+    ) -> tuple[BlockInput | None, str]:
+        try:
+            return validate_exec(next_node, next_node_input)
+        except Exception as e:
+            db_client.upsert_execution_output(next_node_exec_id, "error", str(e))
+            execution = db_client.update_execution_status(
+                next_node_exec_id, ExecutionStatus.FAILED
+            )
+            db_client.send_execution_update(execution)
+            return None, str(e)
+
     def register_next_executions(node_link: Link) -> list[NodeExecutionEntry]:
         enqueued_executions = []
         next_output_name = node_link.source_name
```
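Restating the helper's contract as a hypothetical call (names reused from the diff): it returns the validated input on success, and `(None, error_message)` on failure, after the failure has been persisted and broadcast:

```python
idata, msg = validate_next_exec(next_node_exec_id, next_node, next_node_input)
if idata is None:
    # Validation raised; the execution was already marked FAILED above.
    log_metadata.info(f"Validation failed: {msg}")
```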
Static input pins are now back-filled from the outputs of their source nodes (via `get_output_from_links`) rather than from this node's own latest execution input, and the caller publishes an execution update for the upserted input:

```diff
@@ -312,29 +325,33 @@ def _enqueue_next_nodes(
         # Or the same input to be consumed multiple times.
         with synchronized(f"upsert_input-{next_node_id}-{graph_exec_id}"):
             # Add output data to the earliest incomplete execution, or create a new one.
-            next_node_exec_id, next_node_input = db_client.upsert_execution_input(
+            next_node_exec, next_node_input = db_client.upsert_execution_input(
                 node_id=next_node_id,
                 graph_exec_id=graph_exec_id,
                 input_name=next_input_name,
                 input_data=next_data,
             )
+            next_node_exec_id = next_node_exec.node_exec_id
+            db_client.send_execution_update(next_node_exec)
 
             # Complete missing static input pins data using the last execution input.
-            static_link_names = {
-                link.sink_name
+            static_links = {
+                link.source_id: (link.source_name, link.sink_name)
                 for link in next_node.input_links
-                if link.is_static and link.sink_name not in next_node_input
+                if link.is_static
             }
-            if static_link_names and (
-                latest_execution := db_client.get_latest_execution(
-                    next_node_id, graph_exec_id
-                )
-            ):
-                for name in static_link_names:
-                    next_node_input[name] = latest_execution.input_data.get(name)
+            static_output = (
+                db_client.get_output_from_links(static_links, graph_exec_id)
+                if static_links
+                else {}
+            )
+            for name, value in static_output.items():
+                next_node_input[name] = next_node_input.get(name, value)
 
             # Validate the input data for the next node.
-            next_node_input, validation_msg = validate_exec(next_node, next_node_input)
+            next_node_input, validation_msg = validate_next_exec(
+                next_node_exec_id, next_node, next_node_input
+            )
             suffix = f"{next_output_name}>{next_input_name}~{next_node_exec_id}:{validation_msg}"
 
             # Incomplete input data, skip queueing the execution.
```
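The `dict.get` default-fill means a static output never overwrites a value the node already received in this run. A self-contained illustration with made-up values:

```python
next_node_input = {"prompt": "fresh value from this run"}
static_output = {"prompt": "stale static value", "config": {"retries": 3}}

for name, value in static_output.items():
    # Keep the existing value if the pin is already filled; otherwise use the static one.
    next_node_input[name] = next_node_input.get(name, value)

assert next_node_input == {
    "prompt": "fresh value from this run",
    "config": {"retries": 3},
}
```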
The same `static_output` map is reused when topping up incomplete executions:

```diff
@@ -365,15 +382,10 @@ def _enqueue_next_nodes(
             idata = iexec.input_data
             ineid = iexec.node_exec_id
 
-            static_link_names = {
-                link.sink_name
-                for link in next_node.input_links
-                if link.is_static and link.sink_name not in idata
-            }
-            for input_name in static_link_names:
-                idata[input_name] = next_node_input[input_name]
+            for input_name, input_value in static_output.items():
+                idata[input_name] = idata.get(input_name, input_value)
 
-            idata, msg = validate_exec(next_node, idata)
+            idata, msg = validate_next_exec(next_node_exec_id, next_node, idata)
             suffix = f"{next_output_name}>{next_input_name}~{ineid}:{msg}"
             if not idata:
                 log_metadata.info(f"Enqueueing static-link skipped: {suffix}")
```
Finally, the affected test now tolerates duplicate executions caused by concurrency:

```diff
@@ -206,7 +206,9 @@ async def test_input_pin_always_waited(server: SpinTestServer):
     graph_exec = await server.agent_server.test_get_graph_run_results(
         test_graph.id, graph_exec_id, test_user.id
     )
-    assert len(graph_exec.node_executions) == 3
+    assert (
+        len(graph_exec.node_executions) >= 3
+    )  # Concurrency can cause duplicate executions.
     # FindInDictionaryBlock should wait for the input pin to be provided,
     # Hence executing extraction of "key" from {"key1": "value1", "key2": "value2"}
     assert graph_exec.node_executions[2].status == execution.ExecutionStatus.COMPLETED
```