From 1fc984f7fdac99c84fc89f87b8f7e0621572d84c Mon Sep 17 00:00:00 2001
From: Reinier van der Leer
Date: Thu, 3 Apr 2025 20:58:21 +0200
Subject: [PATCH] feat(platform/library): Add real-time "Steps" count to agent run view (#9740)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Resolves #9731

### Changes 🏗️

- feat: Add "Steps" count, showing `stats.node_exec_count`, to the agent run view
  - Add `GraphExecutionMeta.stats.node_exec_count` attribute
- feat(backend/executor): Send a graph execution update after *every* node execution (instead of only I/O node executions)
  - Update graph execution stats after every node execution
- refactor: Move `GraphExecutionMeta` stats into a `stats` sub-object (`cost`, `duration`, `total_run_time` -> `stats.cost`, `stats.duration`, `stats.node_exec_time`; model sketched below)

### Checklist 📋

#### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - View an agent run with 1+ steps on `/library/agents/[id]`
    - [x] "Info" section layout doesn't break
    - [x] Number of steps is shown
  - Initiate a new agent run
    - [x] "Steps" increments in real time during execution
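### Model sketch (for reviewers)

A minimal, self-contained sketch of the reshaped model, condensed from the `execution.py` diff below. `BaseDbModel` and the remaining fields are omitted here; the `Stats` field names and descriptions are the real ones:

```python
from typing import Optional

from pydantic import BaseModel, Field


class GraphExecutionMeta(BaseModel):  # the real class extends BaseDbModel
    user_id: str
    graph_id: str
    graph_version: int

    class Stats(BaseModel):
        cost: int = Field(..., description="Execution cost (cents)")
        duration: float = Field(..., description="Seconds from start to end of run")
        node_exec_time: float = Field(..., description="Seconds of total node runtime")
        node_exec_count: int = Field(..., description="Number of node executions")

    # stays None until the executor has recorded stats for the run
    stats: Optional[Stats] = None


# Consumers read run.stats.node_exec_count ("Steps") behind a None-check,
# instead of the old top-level run.cost / run.duration / run.total_run_time.
```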
---
 .../backend/backend/data/execution.py         | 51 ++++++-----
 .../backend/backend/data/includes.py          |  6 +-
 .../backend/backend/executor/manager.py       | 87 ++++++++-----------
 .../backend/test/server/test_con_manager.py   |  9 +-
 .../agents/agent-run-details-view.tsx         | 15 ++--
 .../src/components/monitor/FlowRunInfo.tsx    | 12 +--
 .../src/components/monitor/FlowRunsList.tsx   |  6 +-
 .../src/components/monitor/FlowRunsStatus.tsx |  6 +-
 .../components/monitor/FlowRunsTimeline.tsx   | 19 ++--
 .../src/lib/autogpt-server-api/types.ts       | 16 ++--
 10 files changed, 123 insertions(+), 104 deletions(-)

diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py
index 9058f1df8b..776277e396 100644
--- a/autogpt_platform/backend/backend/data/execution.py
+++ b/autogpt_platform/backend/backend/data/execution.py
@@ -59,23 +59,27 @@ ExecutionStatus = AgentExecutionStatus

 class GraphExecutionMeta(BaseDbModel):
     user_id: str
-    started_at: datetime
-    ended_at: datetime
-    cost: Optional[int] = Field(..., description="Execution cost in credits")
-    duration: float = Field(..., description="Seconds from start to end of run")
-    total_run_time: float = Field(..., description="Seconds of node runtime")
-    status: ExecutionStatus
     graph_id: str
     graph_version: int
     preset_id: Optional[str] = None
+    status: ExecutionStatus
+    started_at: datetime
+    ended_at: datetime
+
+    class Stats(BaseModel):
+        cost: int = Field(..., description="Execution cost (cents)")
+        duration: float = Field(..., description="Seconds from start to end of run")
+        node_exec_time: float = Field(..., description="Seconds of total node runtime")
+        node_exec_count: int = Field(..., description="Number of node executions")
+
+    stats: Stats | None

     @staticmethod
     def from_db(_graph_exec: AgentGraphExecution):
         now = datetime.now(timezone.utc)
+        # TODO: make started_at and ended_at optional
         start_time = _graph_exec.startedAt or _graph_exec.createdAt
         end_time = _graph_exec.updatedAt or now
-        duration = (end_time - start_time).total_seconds()
-        total_run_time = duration

         try:
             stats = GraphExecutionStats.model_validate(_graph_exec.stats)
@@ -87,21 +91,25 @@ class GraphExecutionMeta(BaseDbModel):
             )
             stats = None

-        duration = stats.walltime if stats else duration
-        total_run_time = stats.nodes_walltime if stats else total_run_time
-
         return GraphExecutionMeta(
             id=_graph_exec.id,
             user_id=_graph_exec.userId,
-            started_at=start_time,
-            ended_at=end_time,
-            cost=stats.cost if stats else None,
-            duration=duration,
-            total_run_time=total_run_time,
-            status=ExecutionStatus(_graph_exec.executionStatus),
             graph_id=_graph_exec.agentGraphId,
             graph_version=_graph_exec.agentGraphVersion,
             preset_id=_graph_exec.agentPresetId,
+            status=ExecutionStatus(_graph_exec.executionStatus),
+            started_at=start_time,
+            ended_at=end_time,
+            stats=(
+                GraphExecutionMeta.Stats(
+                    cost=stats.cost,
+                    duration=stats.walltime,
+                    node_exec_time=stats.nodes_walltime,
+                    node_exec_count=stats.node_count,
+                )
+                if stats
+                else None
+            ),
         )

@@ -116,10 +124,11 @@ class GraphExecution(GraphExecutionMeta):

         graph_exec = GraphExecutionMeta.from_db(_graph_exec)

-        node_executions = sorted(
+        complete_node_executions = sorted(
             [
                 NodeExecutionResult.from_db(ne, _graph_exec.userId)
                 for ne in _graph_exec.AgentNodeExecutions
+                if ne.executionStatus != ExecutionStatus.INCOMPLETE
             ],
             key=lambda ne: (ne.queue_time is None, ne.queue_time or ne.add_time),
         )
@@ -128,7 +137,7 @@ class GraphExecution(GraphExecutionMeta):
             **{
                 # inputs from Agent Input Blocks
                 exec.input_data["name"]: exec.input_data.get("value")
-                for exec in node_executions
+                for exec in complete_node_executions
                 if (
                     (block := get_block(exec.block_id))
                     and block.block_type == BlockType.INPUT
@@ -137,7 +146,7 @@ class GraphExecution(GraphExecutionMeta):
             **{
                 # input from webhook-triggered block
                 "payload": exec.input_data["payload"]
-                for exec in node_executions
+                for exec in complete_node_executions
                 if (
                     (block := get_block(exec.block_id))
                     and block.block_type
@@ -147,7 +156,7 @@ class GraphExecution(GraphExecutionMeta):
             },
         }

         outputs: CompletedBlockOutput = defaultdict(list)
-        for exec in node_executions:
+        for exec in complete_node_executions:
             if (
                 block := get_block(exec.block_id)
             ) and block.block_type == BlockType.OUTPUT:
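[Review note, not part of the patch] `from_db` above degrades gracefully when the stored stats blob is missing or malformed: validation failure yields `stats=None` instead of failing the whole query, which is why every consumer has to None-check. A condensed sketch of that fallback; the `GraphExecutionStats` stand-in here is simplified, with field names matching the diff:

```python
from pydantic import BaseModel, ValidationError


class GraphExecutionStats(BaseModel):  # simplified stand-in for the real model
    cost: int = 0
    walltime: float = 0.0
    nodes_walltime: float = 0.0
    node_count: int = 0


def parse_stats(raw: object) -> GraphExecutionStats | None:
    # mirrors the try/except in GraphExecutionMeta.from_db: a bad stats
    # blob must not break loading the graph execution itself
    try:
        return GraphExecutionStats.model_validate(raw)
    except ValidationError:
        return None


assert parse_stats({"cost": 3, "walltime": 1.2}) is not None
assert parse_stats("not a stats dict") is None
```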
diff --git a/autogpt_platform/backend/backend/data/includes.py b/autogpt_platform/backend/backend/data/includes.py
index cad2a9ea54..57dcc9aec9 100644
--- a/autogpt_platform/backend/backend/data/includes.py
+++ b/autogpt_platform/backend/backend/data/includes.py
@@ -1,4 +1,5 @@
-import prisma
+import prisma.enums
+import prisma.types

 from backend.blocks.io import IO_BLOCK_IDs

@@ -46,6 +47,9 @@ GRAPH_EXECUTION_INCLUDE: prisma.types.AgentGraphExecutionInclude = {
             "AgentNode": {
                 "AgentBlock": {"id": {"in": IO_BLOCK_IDs}},  # type: ignore
             },
+            "NOT": {
+                "executionStatus": prisma.enums.AgentExecutionStatus.INCOMPLETE,
+            },
         },
     }
 }
diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py
index 25e1ac5628..243052c73e 100644
--- a/autogpt_platform/backend/backend/executor/manager.py
+++ b/autogpt_platform/backend/backend/executor/manager.py
@@ -160,12 +160,8 @@ def execute_node(
     node_block = node.block

     def push_output(output_name: str, output_data: Any) -> None:
-        _push_node_execution_output(
-            db_client=db_client,
-            user_id=user_id,
-            graph_exec_id=graph_exec_id,
+        db_client.upsert_execution_output(
             node_exec_id=node_exec_id,
-            block_id=node_block.id,
             output_name=output_name,
             output_data=output_data,
         )
@@ -277,35 +273,6 @@ def execute_node(
             execution_stats.output_size = output_size


-def _push_node_execution_output(
-    db_client: "DatabaseManager",
-    user_id: str,
-    graph_exec_id: str,
-    node_exec_id: str,
-    block_id: str,
-    output_name: str,
-    output_data: Any,
-):
-    from backend.blocks.io import IO_BLOCK_IDs
-
-    db_client.upsert_execution_output(
-        node_exec_id=node_exec_id,
-        output_name=output_name,
-        output_data=output_data,
-    )
-
-    # Automatically push execution updates for all agent I/O
-    if block_id in IO_BLOCK_IDs:
-        graph_exec = db_client.get_graph_execution(
-            user_id=user_id, execution_id=graph_exec_id
-        )
-        if not graph_exec:
-            raise ValueError(
-                f"Graph execution #{graph_exec_id} for user #{user_id} not found"
-            )
-        db_client.send_execution_update(graph_exec)
-
-
 def _enqueue_next_nodes(
     db_client: "DatabaseManager",
     node: Node,
@@ -745,15 +712,19 @@ class Executor:
             Exception | None: The error that occurred during the execution, if any.
         """
         log_metadata.info(f"Start graph execution {graph_exec.graph_exec_id}")
-        exec_stats = GraphExecutionStats()
+        execution_stats = GraphExecutionStats()
+        execution_status = ExecutionStatus.RUNNING
        error = None
        finished = False

        def cancel_handler():
+            nonlocal execution_status
+
             while not cancel.is_set():
                 cancel.wait(1)
             if finished:
                 return
+            execution_status = ExecutionStatus.TERMINATED
             cls.executor.terminate()
             log_metadata.info(f"Terminated graph execution {graph_exec.graph_exec_id}")
             cls._init_node_executor_pool()
@@ -776,18 +747,34 @@ class Executor:
                 if not isinstance(result, NodeExecutionStats):
                     return

-                nonlocal exec_stats
-                exec_stats.node_count += 1
-                exec_stats.nodes_cputime += result.cputime
-                exec_stats.nodes_walltime += result.walltime
+                nonlocal execution_stats
+                execution_stats.node_count += 1
+                execution_stats.nodes_cputime += result.cputime
+                execution_stats.nodes_walltime += result.walltime
                 if (err := result.error) and isinstance(err, Exception):
-                    exec_stats.node_error_count += 1
+                    execution_stats.node_error_count += 1
+
+                if _graph_exec := cls.db_client.update_graph_execution_stats(
+                    graph_exec_id=exec_data.graph_exec_id,
+                    status=execution_status,
+                    stats=execution_stats,
+                ):
+                    cls.db_client.send_execution_update(_graph_exec)
+                else:
+                    logger.error(
+                        "Callback for "
+                        f"finished node execution #{exec_data.node_exec_id} "
+                        "could not update execution stats "
+                        f"for graph execution #{exec_data.graph_exec_id}; "
+                        f"triggered while graph exec status = {execution_status}"
+                    )

             return callback

         while not queue.empty():
             if cancel.is_set():
-                return exec_stats, ExecutionStatus.TERMINATED, error
+                execution_status = ExecutionStatus.TERMINATED
+                return execution_stats, execution_status, error

             exec_data = queue.get()
@@ -809,29 +796,26 @@ class Executor:
                 exec_cost_counter = cls._charge_usage(
                     node_exec=exec_data,
                     execution_count=exec_cost_counter + 1,
-                    execution_stats=exec_stats,
+                    execution_stats=execution_stats,
                 )
             except InsufficientBalanceError as error:
                 node_exec_id = exec_data.node_exec_id
-                _push_node_execution_output(
-                    db_client=cls.db_client,
-                    user_id=graph_exec.user_id,
-                    graph_exec_id=graph_exec.graph_exec_id,
+                cls.db_client.upsert_execution_output(
                     node_exec_id=node_exec_id,
-                    block_id=exec_data.block_id,
                     output_name="error",
                     output_data=str(error),
                 )

+                execution_status = ExecutionStatus.FAILED
                 exec_update = cls.db_client.update_node_execution_status(
-                    node_exec_id, ExecutionStatus.FAILED
+                    node_exec_id, execution_status
                 )
                 cls.db_client.send_execution_update(exec_update)
                 cls._handle_low_balance_notif(
                     graph_exec.user_id,
                     graph_exec.graph_id,
-                    exec_stats,
+                    execution_stats,
                     error,
                 )
                 raise
@@ -849,7 +833,8 @@ class Executor:
                 )
                 for node_id, execution in list(running_executions.items()):
                     if cancel.is_set():
-                        return exec_stats, ExecutionStatus.TERMINATED, error
+                        execution_status = ExecutionStatus.TERMINATED
+                        return execution_stats, execution_status, error

                     if not queue.empty():
                         break  # yield to parent loop to execute new queue items
@@ -876,7 +861,7 @@ class Executor:
             cancel_thread.join()
             clean_exec_files(graph_exec.graph_exec_id)

-        return exec_stats, execution_status, error
+        return execution_stats, execution_status, error

     @classmethod
     def _handle_agent_run_notif(
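[Review note, not part of the patch] The behavioral core of the `manager.py` change: the per-node completion callback now persists the running stats and broadcasts a graph execution update on *every* node execution, which is what makes the "Steps" counter tick in real time. A stripped-down sketch with simplified signatures; `db_client` stands in for the real `DatabaseManager` client used in the diff:

```python
from dataclasses import dataclass


@dataclass
class Stats:
    node_count: int = 0  # surfaced in the UI as "Steps"
    nodes_walltime: float = 0.0


def make_node_done_callback(db_client, graph_exec_id: str, stats: Stats, get_status):
    """Build the callback fired when a single node execution finishes."""

    def callback(result) -> None:
        # Accumulate per-node stats first...
        stats.node_count += 1
        stats.nodes_walltime += result.walltime
        # ...then persist them and push the update to websocket subscribers.
        # Before this PR, updates were only pushed for I/O-block executions.
        if graph_exec := db_client.update_graph_execution_stats(
            graph_exec_id=graph_exec_id, status=get_status(), stats=stats
        ):
            db_client.send_execution_update(graph_exec)

    return callback
```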
diff --git a/autogpt_platform/backend/test/server/test_con_manager.py b/autogpt_platform/backend/test/server/test_con_manager.py
index 5fef82d9fb..69d984c7f2 100644
--- a/autogpt_platform/backend/test/server/test_con_manager.py
+++ b/autogpt_platform/backend/test/server/test_con_manager.py
@@ -89,11 +89,14 @@ async def test_send_graph_execution_result(
         graph_id="test_graph",
         graph_version=1,
         status=ExecutionStatus.COMPLETED,
-        cost=0,
-        duration=1.2,
-        total_run_time=0.5,
         started_at=datetime.now(tz=timezone.utc),
         ended_at=datetime.now(tz=timezone.utc),
+        stats=GraphExecutionEvent.Stats(
+            cost=0,
+            duration=1.2,
+            node_exec_time=0.5,
+            node_exec_count=2,
+        ),
         inputs={
             "input_1": "some input value :)",
             "input_2": "some *other* input value",
diff --git a/autogpt_platform/frontend/src/components/agents/agent-run-details-view.tsx b/autogpt_platform/frontend/src/components/agents/agent-run-details-view.tsx
index 42d2c8eecb..14c82352c9 100644
--- a/autogpt_platform/frontend/src/components/agents/agent-run-details-view.tsx
+++ b/autogpt_platform/frontend/src/components/agents/agent-run-details-view.tsx
@@ -55,11 +55,16 @@ export default function AgentRunDetailsView({
         label: "Started",
         value: `${moment(run.started_at).fromNow()}, ${moment(run.started_at).format("HH:mm")}`,
       },
-      {
-        label: "Duration",
-        value: moment.duration(run.duration, "seconds").humanize(),
-      },
-      ...(run.cost ? [{ label: "Cost", value: `${run.cost} credits` }] : []),
+      ...(run.stats
+        ? [
+            {
+              label: "Duration",
+              value: moment.duration(run.stats.duration, "seconds").humanize(),
+            },
+            { label: "Steps", value: run.stats.node_exec_count },
+            { label: "Cost", value: `${run.stats.cost} credits` },
+          ]
+        : []),
     ];
   }, [run, runStatus]);
diff --git a/autogpt_platform/frontend/src/components/monitor/FlowRunInfo.tsx b/autogpt_platform/frontend/src/components/monitor/FlowRunInfo.tsx
index 252d3240d6..079a8d0dde 100644
--- a/autogpt_platform/frontend/src/components/monitor/FlowRunInfo.tsx
+++ b/autogpt_platform/frontend/src/components/monitor/FlowRunInfo.tsx
@@ -115,11 +115,13 @@ export const FlowRunInfo: React.FC<
           <strong>Finished:</strong>{" "}
           {moment(execution.ended_at).format("YYYY-MM-DD HH:mm:ss")}
         </p>
-        <p>
-          <strong>Duration (run time):</strong>{" "}
-          {execution.duration.toFixed(1)} (
-          {execution.total_run_time.toFixed(1)}) seconds
-        </p>
+        {execution.stats && (
+          <p>
+            <strong>Duration (run time):</strong>{" "}
+            {execution.stats.duration.toFixed(1)} (
+            {execution.stats.node_exec_time.toFixed(1)}) seconds
+          </p>
+        )}
diff --git a/autogpt_platform/frontend/src/components/monitor/FlowRunsList.tsx b/autogpt_platform/frontend/src/components/monitor/FlowRunsList.tsx
--- a/autogpt_platform/frontend/src/components/monitor/FlowRunsList.tsx
+++ b/autogpt_platform/frontend/src/components/monitor/FlowRunsList.tsx
@@ ... @@
-              <TableCell>{formatDuration(execution.duration)}</TableCell>
+              <TableCell>
+                {execution.stats
+                  ? formatDuration(execution.stats.duration)
+                  : ""}
+              </TableCell>
             ))}
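[Review note, not part of the patch] The monitor components below all aggregate through the now-optional `stats`, with runs that have no recorded stats contributing 0 (`run.stats?.node_exec_time ?? 0`). The equivalent fallback logic, sketched in Python for brevity:

```python
def total_run_time(runs) -> float:
    # a run without recorded stats counts as 0, same as `?? 0` in the TSX below
    return sum(run.stats.node_exec_time if run.stats else 0.0 for run in runs)
```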
diff --git a/autogpt_platform/frontend/src/components/monitor/FlowRunsStatus.tsx b/autogpt_platform/frontend/src/components/monitor/FlowRunsStatus.tsx
index 8f03820f5c..67134389fc 100644
--- a/autogpt_platform/frontend/src/components/monitor/FlowRunsStatus.tsx
+++ b/autogpt_platform/frontend/src/components/monitor/FlowRunsStatus.tsx
@@ -105,16 +105,16 @@ export const FlowRunsStatus: React.FC<{
         <p>
           <strong>Total run time:</strong>{" "}
           {filteredFlowRuns.reduce(
-            (total, run) => total + run.total_run_time,
+            (total, run) => total + (run.stats?.node_exec_time ?? 0),
             0,
           )}{" "}
           seconds
         </p>
-        {filteredFlowRuns.some((r) => r.cost) && (
+        {filteredFlowRuns.some((r) => r.stats) && (
           <p>
             <strong>Total cost:</strong>{" "}
             {filteredFlowRuns.reduce(
-              (total, run) => total + (run.cost ?? 0),
+              (total, run) => total + (run.stats?.cost ?? 0),
               0,
             )}{" "}
             seconds
diff --git a/autogpt_platform/frontend/src/components/monitor/FlowRunsTimeline.tsx b/autogpt_platform/frontend/src/components/monitor/FlowRunsTimeline.tsx
index 410bc36c6c..bf333a2207 100644
--- a/autogpt_platform/frontend/src/components/monitor/FlowRunsTimeline.tsx
+++ b/autogpt_platform/frontend/src/components/monitor/FlowRunsTimeline.tsx
@@ -81,11 +81,13 @@ export const FlowRunsTimeline = ({
           <strong>Started:</strong>{" "}
           {moment(data.started_at).format("YYYY-MM-DD HH:mm:ss")}
         </p>
-        <p>
-          <strong>Duration / run time:</strong>{" "}
-          {formatDuration(data.duration)} /{" "}
-          {formatDuration(data.total_run_time)}
-        </p>
+        {data.stats && (
+          <p>
+            <strong>Duration / run time:</strong>{" "}
+            {formatDuration(data.stats.duration)} /{" "}
+            {formatDuration(data.stats.node_exec_time)}
+          </p>
+        )}
       );
     }
@@ -99,8 +101,9 @@ export const FlowRunsTimeline = ({
             .filter((e) => e.graph_id == flow.agent_id)
             .map((e) => ({
               ...e,
-              time: e.started_at.getTime() + e.total_run_time * 1000,
-              _duration: e.total_run_time,
+              time:
+                e.started_at.getTime() + (e.stats?.node_exec_time ?? 0) * 1000,
+              _duration: e.stats?.node_exec_time ?? 0,
             }))}
           name={flow.name}
           fill={`hsl(${(hashString(flow.id) * 137.5) % 360}, 70%, 50%)`}
@@ -120,7 +123,7 @@ export const FlowRunsTimeline = ({
             {
               ...execution,
               time: execution.ended_at.getTime(),
-              _duration: execution.total_run_time,
+              _duration: execution.stats?.node_exec_time ?? 0,
             },
           ]}
           stroke={`hsl(${(hashString(execution.graph_id) * 137.5) % 360}, 70%, 50%)`}
diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts
index c84990a2c6..badd357db7 100644
--- a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts
+++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts
@@ -249,15 +249,19 @@ export type LinkCreatable = Omit<Link, "id"> & {
 /* Mirror of backend/data/execution.py:GraphExecutionMeta */
 export type GraphExecutionMeta = {
   id: GraphExecutionID;
-  started_at: Date;
-  ended_at: Date;
-  cost?: number;
-  duration: number;
-  total_run_time: number;
-  status: "QUEUED" | "RUNNING" | "COMPLETED" | "TERMINATED" | "FAILED";
+  user_id: UserID;
   graph_id: GraphID;
   graph_version: number;
   preset_id?: string;
+  status: "QUEUED" | "RUNNING" | "COMPLETED" | "TERMINATED" | "FAILED";
+  started_at: Date;
+  ended_at: Date;
+  stats?: {
+    cost: number;
+    duration: number;
+    node_exec_time: number;
+    node_exec_count: number;
+  };
 };

 export type GraphExecutionID = Brand<string, "GraphExecutionID">;
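[Review note, not part of the patch] With the `types.ts` mirror updated above, a `GraphExecutionEvent` pushed over the websocket now carries the optional `stats` object. Roughly the payload shape, using the values from the updated `test_con_manager.py`; the `id` value is a made-up placeholder:

```python
execution_event = {
    "id": "graph-exec-id",  # placeholder, not a real ID
    "graph_id": "test_graph",
    "graph_version": 1,
    "status": "COMPLETED",
    # absent/None until the executor writes stats; the frontend must guard
    "stats": {
        "cost": 0,
        "duration": 1.2,
        "node_exec_time": 0.5,
        "node_exec_count": 2,  # rendered as "Steps" in the agent run view
    },
}
```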