Compare commits

...

20 Commits

Author SHA1 Message Date
Zamil Majdy
4cba1c0cb8 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve 2025-03-25 09:03:17 +07:00
Zamil Majdy
8df4625fd6 Merge branch 'dev' into zamilmajdy/fix-static-output-resolve 2025-03-12 11:59:09 +07:00
Zamil Majdy
56d85dc770 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve 2025-03-12 07:29:25 +07:00
Zamil Majdy
67478985a4 revert 2025-03-07 16:55:36 +07:00
Zamil Majdy
07bcfcd477 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve 2025-03-07 16:55:13 +07:00
Zamil Majdy
8026f4f13a Revert 2025-03-07 16:40:58 +07:00
Zamil Majdy
b55185a2b8 Revert 2025-03-07 15:57:37 +07:00
Zamil Majdy
1635a49a88 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve 2025-03-07 15:52:38 +07:00
Zamil Majdy
131d9d2e84 Revert 2025-03-07 15:51:42 +07:00
Zamil Majdy
44409c035f Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve 2025-03-06 23:31:10 +07:00
Zamil Majdy
4e4a047a40 fix(backend): Fix static link resolving behaviour on concurrent output 2025-03-06 23:30:03 +07:00
Zamil Majdy
5a9235bcf9 fix(blocks): Avoid appending None last_tool_output on SmartDecisionMakerBlock 2025-03-06 21:26:50 +07:00
Zamil Majdy
b89af139c1 Add warning 2025-03-06 15:59:24 +07:00
Zamil Majdy
4be9399134 Merge remote-tracking branch 'origin/zamilmajdy/improve-sdm-add-anthropic' into zamilmajdy/improve-sdm-add-anthropic 2025-03-06 15:50:23 +07:00
Zamil Majdy
ee5a95d5f6 feat(backend): Improve SmartDecisionMaker Agent-loop capability & add Anthropics support 2025-03-06 15:50:11 +07:00
Zamil Majdy
0705c6ae6d Merge branch 'dev' into zamilmajdy/improve-sdm-add-anthropic 2025-03-06 15:43:45 +07:00
Zamil Majdy
a067e6f6a1 Merge remote-tracking branch 'origin/zamilmajdy/improve-sdm-add-anthropic' into zamilmajdy/improve-sdm-add-anthropic 2025-03-06 15:43:16 +07:00
Zamil Majdy
8cba27c7c4 feat(backend): Improve SmartDecisionMaker Agent-loop capability & add Anthropics support 2025-03-06 15:42:54 +07:00
Zamil Majdy
999cb2636e Merge branch 'dev' into zamilmajdy/improve-sdm-add-anthropic 2025-03-06 13:20:06 +07:00
Zamil Majdy
b0f6b93d86 feat(backend): Improve SmartDecisionMaker Agent-loop capability & add Anthropics support 2025-03-06 13:16:21 +07:00
4 changed files with 78 additions and 34 deletions

View File

@@ -1,7 +1,16 @@
from collections import defaultdict
from datetime import datetime, timezone
from multiprocessing import Manager
from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Generator,
Generic,
Optional,
Type,
TypeVar,
)
from prisma import Json
from prisma.enums import AgentExecutionStatus
@@ -21,6 +30,9 @@ from backend.server.v2.store.exceptions import DatabaseError
from backend.util import mock, type
from backend.util.settings import Config
if TYPE_CHECKING:
pass
class GraphExecutionEntry(BaseModel):
user_id: str
@@ -197,7 +209,7 @@ async def upsert_execution_input(
input_name: str,
input_data: Any,
node_exec_id: str | None = None,
) -> tuple[str, BlockInput]:
) -> tuple[ExecutionResult, BlockInput]:
"""
Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Input.
If there is no AgentNodeExecution that has no `input_name` as input, create new one.
@@ -234,7 +246,7 @@ async def upsert_execution_input(
"referencedByInputExecId": existing_execution.id,
}
)
return existing_execution.id, {
return ExecutionResult.from_db(existing_execution), {
**{
input_data.name: type.convert(input_data.data, Type[Any])
for input_data in existing_execution.Input or []
@@ -251,7 +263,7 @@ async def upsert_execution_input(
"Input": {"create": {"name": input_name, "data": json_input_data}},
}
)
return result.id, {input_name: input_data}
return ExecutionResult.from_db(result), {input_name: input_data}
else:
raise ValueError(
@@ -556,22 +568,40 @@ def merge_execution_input(data: BlockInput) -> BlockInput:
return data
async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult | None:
execution = await AgentNodeExecution.prisma().find_first(
async def get_output_from_links(
links: dict[str, tuple[str, str]], graph_eid: str
) -> BlockInput:
"""
Get the latest completed node execution output from the graph links.
Args:
links: dict[node_id, (source_name, sink_name)] of the links to get the output from.
graph_eid: the id of the graph execution to get the output from.
Returns:
BlockInput: a dict of the latest output from the links.
"""
executions = await AgentNodeExecution.prisma().find_many(
where={
"agentNodeId": node_id,
"agentNodeId": {"in": list(links.keys())},
"agentGraphExecutionId": graph_eid,
"executionStatus": {"not": ExecutionStatus.INCOMPLETE}, # type: ignore
},
order=[
{"queuedTime": "desc"},
{"queuedTime": "asc"},
{"addedTime": "desc"},
],
include=EXECUTION_RESULT_INCLUDE,
)
if not execution:
return None
return ExecutionResult.from_db(execution)
latest_output = {}
for e in executions:
execution = ExecutionResult.from_db(e)
source_name, sink_name = links[execution.node_id]
if value := execution.output_data.get(source_name):
latest_output[sink_name] = value[-1]
return latest_output
async def get_incomplete_executions(

View File

@@ -5,7 +5,7 @@ from backend.data.execution import (
create_graph_execution,
get_execution_results,
get_incomplete_executions,
get_latest_execution,
get_output_from_links,
update_execution_status,
update_execution_status_batch,
update_graph_execution_start_time,
@@ -69,7 +69,7 @@ class DatabaseManager(AppService):
create_graph_execution = exposed_run_and_wait(create_graph_execution)
get_execution_results = exposed_run_and_wait(get_execution_results)
get_incomplete_executions = exposed_run_and_wait(get_incomplete_executions)
get_latest_execution = exposed_run_and_wait(get_latest_execution)
get_output_from_links = exposed_run_and_wait(get_output_from_links)
update_execution_status = exposed_run_and_wait(update_execution_status)
update_execution_status_batch = exposed_run_and_wait(update_execution_status_batch)
update_graph_execution_start_time = exposed_run_and_wait(

View File

@@ -295,6 +295,19 @@ def _enqueue_next_nodes(
data=data,
)
def validate_next_exec(
next_node_exec_id: str, next_node: Node, next_node_input: BlockInput
) -> tuple[BlockInput | None, str]:
try:
return validate_exec(next_node, next_node_input)
except Exception as e:
db_client.upsert_execution_output(next_node_exec_id, "error", str(e))
execution = db_client.update_execution_status(
next_node_exec_id, ExecutionStatus.FAILED
)
db_client.send_execution_update(execution)
return None, str(e)
def register_next_executions(node_link: Link) -> list[NodeExecutionEntry]:
enqueued_executions = []
next_output_name = node_link.source_name
@@ -312,29 +325,33 @@ def _enqueue_next_nodes(
# Or the same input to be consumed multiple times.
with synchronized(f"upsert_input-{next_node_id}-{graph_exec_id}"):
# Add output data to the earliest incomplete execution, or create a new one.
next_node_exec_id, next_node_input = db_client.upsert_execution_input(
next_node_exec, next_node_input = db_client.upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
input_data=next_data,
)
next_node_exec_id = next_node_exec.node_exec_id
db_client.send_execution_update(next_node_exec)
# Complete missing static input pins data using the last execution input.
static_link_names = {
link.sink_name
static_links = {
link.source_id: (link.source_name, link.sink_name)
for link in next_node.input_links
if link.is_static and link.sink_name not in next_node_input
if link.is_static
}
if static_link_names and (
latest_execution := db_client.get_latest_execution(
next_node_id, graph_exec_id
)
):
for name in static_link_names:
next_node_input[name] = latest_execution.input_data.get(name)
static_output = (
db_client.get_output_from_links(static_links, graph_exec_id)
if static_links
else {}
)
for name, value in static_output.items():
next_node_input[name] = next_node_input.get(name, value)
# Validate the input data for the next node.
next_node_input, validation_msg = validate_exec(next_node, next_node_input)
next_node_input, validation_msg = validate_next_exec(
next_node_exec_id, next_node, next_node_input
)
suffix = f"{next_output_name}>{next_input_name}~{next_node_exec_id}:{validation_msg}"
# Incomplete input data, skip queueing the execution.
@@ -365,15 +382,10 @@ def _enqueue_next_nodes(
idata = iexec.input_data
ineid = iexec.node_exec_id
static_link_names = {
link.sink_name
for link in next_node.input_links
if link.is_static and link.sink_name not in idata
}
for input_name in static_link_names:
idata[input_name] = next_node_input[input_name]
for input_name, input_value in static_output.items():
idata[input_name] = idata.get(input_name, input_value)
idata, msg = validate_exec(next_node, idata)
idata, msg = validate_next_exec(next_node_exec_id, next_node, idata)
suffix = f"{next_output_name}>{next_input_name}~{ineid}:{msg}"
if not idata:
log_metadata.info(f"Enqueueing static-link skipped: {suffix}")

View File

@@ -206,7 +206,9 @@ async def test_input_pin_always_waited(server: SpinTestServer):
graph_exec = await server.agent_server.test_get_graph_run_results(
test_graph.id, graph_exec_id, test_user.id
)
assert len(graph_exec.node_executions) == 3
assert (
len(graph_exec.node_executions) >= 3
) # Concurrency can cause duplicate executions.
# FindInDictionaryBlock should wait for the input pin to be provided,
# Hence executing extraction of "key" from {"key1": "value1", "key2": "value2"}
assert graph_exec.node_executions[2].status == execution.ExecutionStatus.COMPLETED