fix(rnd): Avoid expensive call of get_service_client on block execution (#7349)

This commit is contained in:
Zamil Majdy
2024-07-09 15:07:57 +04:00
committed by GitHub
parent 57cc8b69e9
commit ff71b0beb7

View File

@@ -1,8 +1,10 @@
import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Coroutine, Generator, TypeVar
from typing import Any, Coroutine, Generator, TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
from autogpt_server.server.server import AgentServer
from autogpt_server.data import db
from autogpt_server.data.block import Block, get_block
from autogpt_server.data.execution import (
@@ -33,7 +35,9 @@ ExecutionStream = Generator[Execution, None, None]
def execute_node(
loop: asyncio.AbstractEventLoop, data: Execution
loop: asyncio.AbstractEventLoop,
agent_server_client: "AgentServer",
data: Execution
) -> ExecutionStream:
"""
Execute a node in the graph. This will trigger a block execution on a node,
@@ -41,13 +45,12 @@ def execute_node(
Args:
loop: The event loop to run the async functions.
agent_server_client: The client to send execution updates to the server.
data: The execution data for executing the current node.
Returns:
The subsequent node to be enqueued, or None if there is no subsequent node.
"""
from autogpt_server.server.server import AgentServer
agent_server_client = get_service_client(AgentServer)
graph_exec_id = data.graph_exec_id
node_exec_id = data.node_exec_id
exec_data = data.data
@@ -62,7 +65,7 @@ def execute_node(
if not node:
logger.error(f"Node {node_id} not found.")
return
node_block = get_block(node.block_id) # type: ignore
if not node_block:
logger.error(f"Block {node.block_id} not found.")
@@ -76,8 +79,8 @@ def execute_node(
# TODO: Remove need for multiple database lookups
execution_result = wait(get_execution_result(
graph_exec_id, node_exec_id
))
graph_exec_id, node_exec_id
))
agent_server_client.send_execution_update(execution_result.model_dump()) # type: ignore
try:
@@ -108,8 +111,8 @@ def execute_node(
# TODO: Remove need for multiple database lookups
execution_result = wait(get_execution_result(
graph_exec_id, node_exec_id
))
graph_exec_id, node_exec_id
))
agent_server_client.send_execution_update(execution_result.model_dump()) # type: ignore
raise e
@@ -178,14 +181,14 @@ def validate_exec(node: Node, data: dict[str, Any]) -> tuple[bool, str]:
A tuple of a boolean indicating if the data is valid, and a message if not.
Return the executed block name if the data is valid.
"""
node_block: Block | None = get_block(node.block_id) # type: ignore
node_block: Block | None = get_block(node.block_id) # type: ignore
if not node_block:
return False, f"Block for {node.block_id} not found."
error_message = f"Input data missing for {node_block.name}:"
input_fields_from_schema = node_block.input_schema.get_required_fields() # type: ignore
if not input_fields_from_schema.issubset(data): # type: ignore
input_fields_from_schema = node_block.input_schema.get_required_fields() # type: ignore
if not input_fields_from_schema.issubset(data): # type: ignore
return False, f"{error_message} {input_fields_from_schema - set(data)}"
input_fields_from_nodes = {link.sink_name for link in node.input_links}
@@ -200,25 +203,26 @@ def validate_exec(node: Node, data: dict[str, Any]) -> tuple[bool, str]:
return True, node_block.name
def get_agent_server_client() -> "AgentServer":
from autogpt_server.server.server import AgentServer
return get_service_client(AgentServer)
class Executor:
loop: asyncio.AbstractEventLoop
@property
def agent_server_client(self) -> Any:
from autogpt_server.server.server import AgentServer
return get_service_client(AgentServer) # type: ignore
@classmethod
def on_executor_start(cls):
cls.loop = asyncio.new_event_loop()
cls.loop.run_until_complete(db.connect())
cls.agent_server_client = get_agent_server_client()
@classmethod
def on_start_execution(cls, q: ExecutionQueue, data: Execution) -> bool:
prefix = get_log_prefix(data.graph_exec_id, data.node_exec_id)
try:
logger.warning(f"{prefix} Start execution")
for execution in execute_node(cls.loop, data): # type: ignore
for execution in execute_node(cls.loop, cls.agent_server_client, data):
q.add(execution)
return True
except Exception as e:
@@ -245,9 +249,8 @@ class ExecutionManager(AppService):
)
@property
def agent_server_client(self) -> Any:
from autogpt_server.server.server import AgentServer
return get_service_client(AgentServer) # type: ignore
def agent_server_client(self) -> "AgentServer":
return get_agent_server_client()
@expose
def add_execution(self, graph_id: str, data: dict[str, Any]) -> dict[Any, Any]:
@@ -289,7 +292,8 @@ class ExecutionManager(AppService):
try:
self.agent_server_client.send_execution_update(execution_result.model_dump()) # type: ignore
except Exception as e:
raise(Exception(f"Error sending execution of type: {type(execution_result)} update: {e} "))
msg = f"Error sending execution of type {type(execution_result)}: {e}"
raise Exception(msg)
executions.append(
{