Compare commits

...

20 Commits

Author  SHA1  Message  Date

Zamil Majdy  4cba1c0cb8  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve  2025-03-25 09:03:17 +07:00
Zamil Majdy  8df4625fd6  Merge branch 'dev' into zamilmajdy/fix-static-output-resolve  2025-03-12 11:59:09 +07:00
Zamil Majdy  56d85dc770  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve  2025-03-12 07:29:25 +07:00
Zamil Majdy  67478985a4  revert  2025-03-07 16:55:36 +07:00
Zamil Majdy  07bcfcd477  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve  2025-03-07 16:55:13 +07:00
Zamil Majdy  8026f4f13a  Revert  2025-03-07 16:40:58 +07:00
Zamil Majdy  b55185a2b8  Revert  2025-03-07 15:57:37 +07:00
Zamil Majdy  1635a49a88  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve  2025-03-07 15:52:38 +07:00
Zamil Majdy  131d9d2e84  Revert  2025-03-07 15:51:42 +07:00
Zamil Majdy  44409c035f  Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve  2025-03-06 23:31:10 +07:00
Zamil Majdy  4e4a047a40  fix(backend): Fix static link resolving behaviour on concurrent output  2025-03-06 23:30:03 +07:00
Zamil Majdy  5a9235bcf9  fix(blocks): Avoid appending None last_tool_output on SmartDecisionMakerBlock  2025-03-06 21:26:50 +07:00
Zamil Majdy  b89af139c1  Add warning  2025-03-06 15:59:24 +07:00
Zamil Majdy  4be9399134  Merge remote-tracking branch 'origin/zamilmajdy/improve-sdm-add-anthropic' into zamilmajdy/improve-sdm-add-anthropic  2025-03-06 15:50:23 +07:00
Zamil Majdy  ee5a95d5f6  feat(backend): Improve SmartDecisionMaker Agent-loop capability & add Anthropics support  2025-03-06 15:50:11 +07:00
Zamil Majdy  0705c6ae6d  Merge branch 'dev' into zamilmajdy/improve-sdm-add-anthropic  2025-03-06 15:43:45 +07:00
Zamil Majdy  a067e6f6a1  Merge remote-tracking branch 'origin/zamilmajdy/improve-sdm-add-anthropic' into zamilmajdy/improve-sdm-add-anthropic  2025-03-06 15:43:16 +07:00
Zamil Majdy  8cba27c7c4  feat(backend): Improve SmartDecisionMaker Agent-loop capability & add Anthropics support  2025-03-06 15:42:54 +07:00
Zamil Majdy  999cb2636e  Merge branch 'dev' into zamilmajdy/improve-sdm-add-anthropic  2025-03-06 13:20:06 +07:00
Zamil Majdy  b0f6b93d86  feat(backend): Improve SmartDecisionMaker Agent-loop capability & add Anthropics support  2025-03-06 13:16:21 +07:00
4 changed files with 78 additions and 34 deletions

View File

@@ -1,7 +1,16 @@
 from collections import defaultdict
 from datetime import datetime, timezone
 from multiprocessing import Manager
-from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    AsyncGenerator,
+    Generator,
+    Generic,
+    Optional,
+    Type,
+    TypeVar,
+)
 
 from prisma import Json
 from prisma.enums import AgentExecutionStatus
@@ -21,6 +30,9 @@ from backend.server.v2.store.exceptions import DatabaseError
 from backend.util import mock, type
 from backend.util.settings import Config
 
+if TYPE_CHECKING:
+    pass
+
 
 class GraphExecutionEntry(BaseModel):
     user_id: str
@@ -197,7 +209,7 @@ async def upsert_execution_input(
     input_name: str,
     input_data: Any,
     node_exec_id: str | None = None,
-) -> tuple[str, BlockInput]:
+) -> tuple[ExecutionResult, BlockInput]:
     """
     Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Input.
     If there is no AgentNodeExecution that has no `input_name` as input, create new one.
@@ -234,7 +246,7 @@ async def upsert_execution_input(
                 "referencedByInputExecId": existing_execution.id,
             }
         )
-        return existing_execution.id, {
+        return ExecutionResult.from_db(existing_execution), {
             **{
                 input_data.name: type.convert(input_data.data, Type[Any])
                 for input_data in existing_execution.Input or []
@@ -251,7 +263,7 @@ async def upsert_execution_input(
                 "Input": {"create": {"name": input_name, "data": json_input_data}},
             }
         )
-        return result.id, {input_name: input_data}
+        return ExecutionResult.from_db(result), {input_name: input_data}
 
     else:
         raise ValueError(
@@ -556,22 +568,40 @@ def merge_execution_input(data: BlockInput) -> BlockInput:
     return data
 
 
-async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult | None:
-    execution = await AgentNodeExecution.prisma().find_first(
+async def get_output_from_links(
+    links: dict[str, tuple[str, str]], graph_eid: str
+) -> BlockInput:
+    """
+    Get the latest completed node execution output from the graph links.
+
+    Args:
+        links: dict[node_id, (source_name, sink_name)] of the links to get the output from.
+        graph_eid: the id of the graph execution to get the output from.
+
+    Returns:
+        BlockInput: a dict of the latest output from the links.
+    """
+    executions = await AgentNodeExecution.prisma().find_many(
         where={
-            "agentNodeId": node_id,
+            "agentNodeId": {"in": list(links.keys())},
             "agentGraphExecutionId": graph_eid,
             "executionStatus": {"not": ExecutionStatus.INCOMPLETE},  # type: ignore
         },
         order=[
-            {"queuedTime": "desc"},
+            {"queuedTime": "asc"},
             {"addedTime": "desc"},
         ],
         include=EXECUTION_RESULT_INCLUDE,
     )
-    if not execution:
-        return None
-    return ExecutionResult.from_db(execution)
+
+    latest_output = {}
+    for e in executions:
+        execution = ExecutionResult.from_db(e)
+        source_name, sink_name = links[execution.node_id]
+        if value := execution.output_data.get(source_name):
+            latest_output[sink_name] = value[-1]
+    return latest_output
 
 
 async def get_incomplete_executions(
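The key behavioural change in `get_output_from_links` is the selection logic: executions for all static-link source nodes are fetched in one query, ordered by `queuedTime` ascending, so later executions overwrite earlier ones in `latest_output` and the final dict holds the newest value per sink pin. A minimal sketch of that selection in plain Python, with hypothetical in-memory records standing in for the Prisma rows:

# Sketch: last-write-wins selection, assuming executions arrive ordered
# by queuedTime ascending, as in the find_many() call above.
links = {
    "node-a": ("result", "value"),  # node_id -> (source_name, sink_name)
}
executions = [  # hypothetical rows, oldest first
    {"node_id": "node-a", "output_data": {"result": ["v1"]}},
    {"node_id": "node-a", "output_data": {"result": ["v2", "v3"]}},
]

latest_output = {}
for e in executions:
    source_name, sink_name = links[e["node_id"]]
    if value := e["output_data"].get(source_name):
        # output_data maps a pin name to the list of values it emitted;
        # value[-1] is the most recent. Later executions overwrite
        # earlier ones, so the newest execution wins.
        latest_output[sink_name] = value[-1]

assert latest_output == {"value": "v3"}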

View File

@@ -5,7 +5,7 @@ from backend.data.execution import (
     create_graph_execution,
     get_execution_results,
     get_incomplete_executions,
-    get_latest_execution,
+    get_output_from_links,
     update_execution_status,
     update_execution_status_batch,
     update_graph_execution_start_time,
@@ -69,7 +69,7 @@ class DatabaseManager(AppService):
     create_graph_execution = exposed_run_and_wait(create_graph_execution)
     get_execution_results = exposed_run_and_wait(get_execution_results)
     get_incomplete_executions = exposed_run_and_wait(get_incomplete_executions)
-    get_latest_execution = exposed_run_and_wait(get_latest_execution)
+    get_output_from_links = exposed_run_and_wait(get_output_from_links)
     update_execution_status = exposed_run_and_wait(update_execution_status)
     update_execution_status_batch = exposed_run_and_wait(update_execution_status_batch)
     update_graph_execution_start_time = exposed_run_and_wait(
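This file only swaps which coroutine `exposed_run_and_wait` wraps. As a rough mental model of the pattern (an illustrative sketch, not the actual AppService implementation), the wrapper exposes an async DB function across the service boundary and blocks the caller until the result comes back, which is why the executor can call `db_client.get_output_from_links(...)` synchronously:

import asyncio
from typing import Any, Callable, Coroutine

def run_and_wait(coro_fn: Callable[..., Coroutine[Any, Any, Any]]):
    # Illustrative stand-in for exposed_run_and_wait: drive the coroutine
    # to completion and hand the result back synchronously.
    def wrapper(*args, **kwargs):
        return asyncio.run(coro_fn(*args, **kwargs))
    return wrapper

async def get_output_from_links(links: dict, graph_eid: str) -> dict:
    return {}  # hypothetical stub standing in for the real DB query

get_output_from_links_sync = run_and_wait(get_output_from_links)
print(get_output_from_links_sync({}, "graph-exec-id"))  # -> {}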

View File

@@ -295,6 +295,19 @@ def _enqueue_next_nodes(
             data=data,
         )
 
+    def validate_next_exec(
+        next_node_exec_id: str, next_node: Node, next_node_input: BlockInput
+    ) -> tuple[BlockInput | None, str]:
+        try:
+            return validate_exec(next_node, next_node_input)
+        except Exception as e:
+            db_client.upsert_execution_output(next_node_exec_id, "error", str(e))
+            execution = db_client.update_execution_status(
+                next_node_exec_id, ExecutionStatus.FAILED
+            )
+            db_client.send_execution_update(execution)
+            return None, str(e)
+
     def register_next_executions(node_link: Link) -> list[NodeExecutionEntry]:
         enqueued_executions = []
         next_output_name = node_link.source_name
@@ -312,29 +325,33 @@ def _enqueue_next_nodes(
         # Or the same input to be consumed multiple times.
         with synchronized(f"upsert_input-{next_node_id}-{graph_exec_id}"):
             # Add output data to the earliest incomplete execution, or create a new one.
-            next_node_exec_id, next_node_input = db_client.upsert_execution_input(
+            next_node_exec, next_node_input = db_client.upsert_execution_input(
                 node_id=next_node_id,
                 graph_exec_id=graph_exec_id,
                 input_name=next_input_name,
                 input_data=next_data,
             )
+            next_node_exec_id = next_node_exec.node_exec_id
+            db_client.send_execution_update(next_node_exec)
 
             # Complete missing static input pins data using the last execution input.
-            static_link_names = {
-                link.sink_name
+            static_links = {
+                link.source_id: (link.source_name, link.sink_name)
                 for link in next_node.input_links
-                if link.is_static and link.sink_name not in next_node_input
+                if link.is_static
             }
-            if static_link_names and (
-                latest_execution := db_client.get_latest_execution(
-                    next_node_id, graph_exec_id
-                )
-            ):
-                for name in static_link_names:
-                    next_node_input[name] = latest_execution.input_data.get(name)
+            static_output = (
+                db_client.get_output_from_links(static_links, graph_exec_id)
+                if static_links
+                else {}
+            )
+            for name, value in static_output.items():
+                next_node_input[name] = next_node_input.get(name, value)
 
             # Validate the input data for the next node.
-            next_node_input, validation_msg = validate_exec(next_node, next_node_input)
+            next_node_input, validation_msg = validate_next_exec(
+                next_node_exec_id, next_node, next_node_input
+            )
             suffix = f"{next_output_name}>{next_input_name}~{next_node_exec_id}:{validation_msg}"
 
             # Incomplete input data, skip queueing the execution.
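Note the merge semantics in the new code: `next_node_input.get(name, value)` keeps any value the node has already received on a pin and only falls back to the static output, so static links backfill gaps rather than overwrite fresh dynamic inputs. A tiny sketch of that behaviour (hypothetical pin names for illustration):

next_node_input = {"prompt": "dynamic value"}          # pin already provided
static_output = {"prompt": "stale", "model": "gpt-4"}  # latest static-link outputs

for name, value in static_output.items():
    # Existing entries win; only missing pins are backfilled.
    next_node_input[name] = next_node_input.get(name, value)

assert next_node_input == {"prompt": "dynamic value", "model": "gpt-4"}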
@@ -365,15 +382,10 @@ def _enqueue_next_nodes(
                 idata = iexec.input_data
                 ineid = iexec.node_exec_id
 
-                static_link_names = {
-                    link.sink_name
-                    for link in next_node.input_links
-                    if link.is_static and link.sink_name not in idata
-                }
-                for input_name in static_link_names:
-                    idata[input_name] = next_node_input[input_name]
+                for input_name, input_value in static_output.items():
+                    idata[input_name] = idata.get(input_name, input_value)
 
-                idata, msg = validate_exec(next_node, idata)
+                idata, msg = validate_next_exec(next_node_exec_id, next_node, idata)
                 suffix = f"{next_output_name}>{next_input_name}~{ineid}:{msg}"
 
                 if not idata:
                     log_metadata.info(f"Enqueueing static-link skipped: {suffix}")

View File

@@ -206,7 +206,9 @@ async def test_input_pin_always_waited(server: SpinTestServer):
     graph_exec = await server.agent_server.test_get_graph_run_results(
         test_graph.id, graph_exec_id, test_user.id
     )
-    assert len(graph_exec.node_executions) == 3
+    assert (
+        len(graph_exec.node_executions) >= 3
+    )  # Concurrency can cause duplicate executions.
 
     # FindInDictionaryBlock should wait for the input pin to be provided,
     # Hence executing extraction of "key" from {"key1": "value1", "key2": "value2"}
     assert graph_exec.node_executions[2].status == execution.ExecutionStatus.COMPLETED