feat(platform/library): Add real-time "Steps" count to agent run view (#9740)

- Resolves #9731

### Changes 🏗️

- feat: Add "Steps" showing `node_execution_count` to agent run view
  - Add `GraphExecutionMeta.stats.node_exec_count` attribute

- feat(backend/executor): Send graph execution update after *every* node
execution (instead of only I/O node executions)
  - Update graph execution stats after every node execution

- refactor: Move `GraphExecutionMeta` stats into a sub-object
(`cost`, `duration`, `total_run_time` -> `stats.cost`, `stats.duration`,
`stats.node_exec_time`); see the sketch after this list
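
The reshaped metadata roughly takes this form (a minimal sketch assembled from the diff below; the standalone class names here are illustrative, the actual change nests a `Stats` model inside `GraphExecutionMeta` in `backend/data/execution.py`):

```python
from typing import Optional

from pydantic import BaseModel, Field


class ExecutionStatsSketch(BaseModel):
    # Mirrors the new GraphExecutionMeta.Stats sub-object
    cost: int = Field(..., description="Execution cost (cents)")
    duration: float = Field(..., description="Seconds from start to end of run")
    node_exec_time: float = Field(..., description="Seconds of total node runtime")
    node_exec_count: int = Field(..., description="Number of node executions")


class GraphExecutionMetaSketch(BaseModel):
    # ...user_id, graph_id, graph_version, status, started_at, ended_at omitted...
    # None when the execution has no recorded stats (yet)
    stats: Optional[ExecutionStatsSketch] = None
```

The frontend `GraphExecutionMeta` type mirrors this shape, so the "Steps" entry can read `run.stats.node_exec_count` and update in real time as the executor now pushes a graph execution update after every node execution.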

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - View an agent run with 1+ steps on `/library/agents/[id]`
    - [x] "Info" section layout doesn't break
    - [x] Number of steps is shown
  - Initiate a new agent run
    - [x] "Steps" increments in real time during execution
Authored by Reinier van der Leer on 2025-04-03 20:58:21 +02:00, committed by GitHub
parent d0d610720c, commit 1fc984f7fd
10 changed files with 123 additions and 104 deletions

View File

@@ -59,23 +59,27 @@ ExecutionStatus = AgentExecutionStatus
class GraphExecutionMeta(BaseDbModel):
user_id: str
started_at: datetime
ended_at: datetime
cost: Optional[int] = Field(..., description="Execution cost in credits")
duration: float = Field(..., description="Seconds from start to end of run")
total_run_time: float = Field(..., description="Seconds of node runtime")
status: ExecutionStatus
graph_id: str
graph_version: int
preset_id: Optional[str] = None
status: ExecutionStatus
started_at: datetime
ended_at: datetime
class Stats(BaseModel):
cost: int = Field(..., description="Execution cost (cents)")
duration: float = Field(..., description="Seconds from start to end of run")
node_exec_time: float = Field(..., description="Seconds of total node runtime")
node_exec_count: int = Field(..., description="Number of node executions")
stats: Stats | None
@staticmethod
def from_db(_graph_exec: AgentGraphExecution):
now = datetime.now(timezone.utc)
# TODO: make started_at and ended_at optional
start_time = _graph_exec.startedAt or _graph_exec.createdAt
end_time = _graph_exec.updatedAt or now
duration = (end_time - start_time).total_seconds()
total_run_time = duration
try:
stats = GraphExecutionStats.model_validate(_graph_exec.stats)
@@ -87,21 +91,25 @@ class GraphExecutionMeta(BaseDbModel):
)
stats = None
duration = stats.walltime if stats else duration
total_run_time = stats.nodes_walltime if stats else total_run_time
return GraphExecutionMeta(
id=_graph_exec.id,
user_id=_graph_exec.userId,
started_at=start_time,
ended_at=end_time,
cost=stats.cost if stats else None,
duration=duration,
total_run_time=total_run_time,
status=ExecutionStatus(_graph_exec.executionStatus),
graph_id=_graph_exec.agentGraphId,
graph_version=_graph_exec.agentGraphVersion,
preset_id=_graph_exec.agentPresetId,
status=ExecutionStatus(_graph_exec.executionStatus),
started_at=start_time,
ended_at=end_time,
stats=(
GraphExecutionMeta.Stats(
cost=stats.cost,
duration=stats.walltime,
node_exec_time=stats.nodes_walltime,
node_exec_count=stats.node_count,
)
if stats
else None
),
)
@@ -116,10 +124,11 @@ class GraphExecution(GraphExecutionMeta):
graph_exec = GraphExecutionMeta.from_db(_graph_exec)
node_executions = sorted(
complete_node_executions = sorted(
[
NodeExecutionResult.from_db(ne, _graph_exec.userId)
for ne in _graph_exec.AgentNodeExecutions
if ne.executionStatus != ExecutionStatus.INCOMPLETE
],
key=lambda ne: (ne.queue_time is None, ne.queue_time or ne.add_time),
)
@@ -128,7 +137,7 @@ class GraphExecution(GraphExecutionMeta):
**{
# inputs from Agent Input Blocks
exec.input_data["name"]: exec.input_data.get("value")
for exec in node_executions
for exec in complete_node_executions
if (
(block := get_block(exec.block_id))
and block.block_type == BlockType.INPUT
@@ -137,7 +146,7 @@ class GraphExecution(GraphExecutionMeta):
**{
# input from webhook-triggered block
"payload": exec.input_data["payload"]
for exec in node_executions
for exec in complete_node_executions
if (
(block := get_block(exec.block_id))
and block.block_type
@@ -147,7 +156,7 @@ class GraphExecution(GraphExecutionMeta):
}
outputs: CompletedBlockOutput = defaultdict(list)
for exec in node_executions:
for exec in complete_node_executions:
if (
block := get_block(exec.block_id)
) and block.block_type == BlockType.OUTPUT:

View File

@@ -1,4 +1,5 @@
import prisma
import prisma.enums
import prisma.types
from backend.blocks.io import IO_BLOCK_IDs
@@ -46,6 +47,9 @@ GRAPH_EXECUTION_INCLUDE: prisma.types.AgentGraphExecutionInclude = {
"AgentNode": {
"AgentBlock": {"id": {"in": IO_BLOCK_IDs}}, # type: ignore
},
"NOT": {
"executionStatus": prisma.enums.AgentExecutionStatus.INCOMPLETE,
},
},
}
}

View File

@@ -160,12 +160,8 @@ def execute_node(
node_block = node.block
def push_output(output_name: str, output_data: Any) -> None:
_push_node_execution_output(
db_client=db_client,
user_id=user_id,
graph_exec_id=graph_exec_id,
db_client.upsert_execution_output(
node_exec_id=node_exec_id,
block_id=node_block.id,
output_name=output_name,
output_data=output_data,
)
@@ -277,35 +273,6 @@ def execute_node(
execution_stats.output_size = output_size
def _push_node_execution_output(
db_client: "DatabaseManager",
user_id: str,
graph_exec_id: str,
node_exec_id: str,
block_id: str,
output_name: str,
output_data: Any,
):
from backend.blocks.io import IO_BLOCK_IDs
db_client.upsert_execution_output(
node_exec_id=node_exec_id,
output_name=output_name,
output_data=output_data,
)
# Automatically push execution updates for all agent I/O
if block_id in IO_BLOCK_IDs:
graph_exec = db_client.get_graph_execution(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise ValueError(
f"Graph execution #{graph_exec_id} for user #{user_id} not found"
)
db_client.send_execution_update(graph_exec)
def _enqueue_next_nodes(
db_client: "DatabaseManager",
node: Node,
@@ -745,15 +712,19 @@ class Executor:
Exception | None: The error that occurred during the execution, if any.
"""
log_metadata.info(f"Start graph execution {graph_exec.graph_exec_id}")
exec_stats = GraphExecutionStats()
execution_stats = GraphExecutionStats()
execution_status = ExecutionStatus.RUNNING
error = None
finished = False
def cancel_handler():
nonlocal execution_status
while not cancel.is_set():
cancel.wait(1)
if finished:
return
execution_status = ExecutionStatus.TERMINATED
cls.executor.terminate()
log_metadata.info(f"Terminated graph execution {graph_exec.graph_exec_id}")
cls._init_node_executor_pool()
@@ -776,18 +747,34 @@ class Executor:
if not isinstance(result, NodeExecutionStats):
return
nonlocal exec_stats
exec_stats.node_count += 1
exec_stats.nodes_cputime += result.cputime
exec_stats.nodes_walltime += result.walltime
nonlocal execution_stats
execution_stats.node_count += 1
execution_stats.nodes_cputime += result.cputime
execution_stats.nodes_walltime += result.walltime
if (err := result.error) and isinstance(err, Exception):
exec_stats.node_error_count += 1
execution_stats.node_error_count += 1
if _graph_exec := cls.db_client.update_graph_execution_stats(
graph_exec_id=exec_data.graph_exec_id,
status=execution_status,
stats=execution_stats,
):
cls.db_client.send_execution_update(_graph_exec)
else:
logger.error(
"Callback for "
f"finished node execution #{exec_data.node_exec_id} "
"could not update execution stats "
f"for graph execution #{exec_data.graph_exec_id}; "
f"triggered while graph exec status = {execution_status}"
)
return callback
while not queue.empty():
if cancel.is_set():
return exec_stats, ExecutionStatus.TERMINATED, error
execution_status = ExecutionStatus.TERMINATED
return execution_stats, execution_status, error
exec_data = queue.get()
@@ -809,29 +796,26 @@ class Executor:
exec_cost_counter = cls._charge_usage(
node_exec=exec_data,
execution_count=exec_cost_counter + 1,
execution_stats=exec_stats,
execution_stats=execution_stats,
)
except InsufficientBalanceError as error:
node_exec_id = exec_data.node_exec_id
_push_node_execution_output(
db_client=cls.db_client,
user_id=graph_exec.user_id,
graph_exec_id=graph_exec.graph_exec_id,
cls.db_client.upsert_execution_output(
node_exec_id=node_exec_id,
block_id=exec_data.block_id,
output_name="error",
output_data=str(error),
)
execution_status = ExecutionStatus.FAILED
exec_update = cls.db_client.update_node_execution_status(
node_exec_id, ExecutionStatus.FAILED
node_exec_id, execution_status
)
cls.db_client.send_execution_update(exec_update)
cls._handle_low_balance_notif(
graph_exec.user_id,
graph_exec.graph_id,
exec_stats,
execution_stats,
error,
)
raise
@@ -849,7 +833,8 @@ class Executor:
)
for node_id, execution in list(running_executions.items()):
if cancel.is_set():
return exec_stats, ExecutionStatus.TERMINATED, error
execution_status = ExecutionStatus.TERMINATED
return execution_stats, execution_status, error
if not queue.empty():
break # yield to parent loop to execute new queue items
@@ -876,7 +861,7 @@ class Executor:
cancel_thread.join()
clean_exec_files(graph_exec.graph_exec_id)
return exec_stats, execution_status, error
return execution_stats, execution_status, error
@classmethod
def _handle_agent_run_notif(

View File

@@ -89,11 +89,14 @@ async def test_send_graph_execution_result(
graph_id="test_graph",
graph_version=1,
status=ExecutionStatus.COMPLETED,
cost=0,
duration=1.2,
total_run_time=0.5,
started_at=datetime.now(tz=timezone.utc),
ended_at=datetime.now(tz=timezone.utc),
stats=GraphExecutionEvent.Stats(
cost=0,
duration=1.2,
node_exec_time=0.5,
node_exec_count=2,
),
inputs={
"input_1": "some input value :)",
"input_2": "some *other* input value",

View File

@@ -55,11 +55,16 @@ export default function AgentRunDetailsView({
label: "Started",
value: `${moment(run.started_at).fromNow()}, ${moment(run.started_at).format("HH:mm")}`,
},
...(run.stats
? [
{
label: "Duration",
value: moment.duration(run.duration, "seconds").humanize(),
value: moment.duration(run.stats.duration, "seconds").humanize(),
},
...(run.cost ? [{ label: "Cost", value: `${run.cost} credits` }] : []),
{ label: "Steps", value: run.stats.node_exec_count },
{ label: "Cost", value: `${run.stats.cost} credits` },
]
: []),
];
}, [run, runStatus]);

View File

@@ -115,11 +115,13 @@ export const FlowRunInfo: React.FC<
<strong>Finished:</strong>{" "}
{moment(execution.ended_at).format("YYYY-MM-DD HH:mm:ss")}
</p>
{execution.stats && (
<p>
<strong>Duration (run time):</strong>{" "}
{execution.duration.toFixed(1)} (
{execution.total_run_time.toFixed(1)}) seconds
{execution.stats.duration.toFixed(1)} (
{execution.stats.node_exec_time.toFixed(1)}) seconds
</p>
)}
</CardContent>
</Card>
<RunnerOutputUI

View File

@@ -62,7 +62,11 @@ export const FlowRunsList: React.FC<{
className="w-full justify-center"
/>
</TableCell>
<TableCell>{formatDuration(execution.duration)}</TableCell>
<TableCell>
{execution.stats
? formatDuration(execution.stats.duration)
: ""}
</TableCell>
</TableRow>
))}
</TableBody>

View File

@@ -105,16 +105,16 @@ export const FlowRunsStatus: React.FC<{
<p>
<strong>Total run time:</strong>{" "}
{filteredFlowRuns.reduce(
(total, run) => total + run.total_run_time,
(total, run) => total + (run.stats?.node_exec_time ?? 0),
0,
)}{" "}
seconds
</p>
{filteredFlowRuns.some((r) => r.cost) && (
{filteredFlowRuns.some((r) => r.stats) && (
<p>
<strong>Total cost:</strong>{" "}
{filteredFlowRuns.reduce(
(total, run) => total + (run.cost ?? 0),
(total, run) => total + (run.stats?.cost ?? 0),
0,
)}{" "}
seconds

View File

@@ -81,11 +81,13 @@ export const FlowRunsTimeline = ({
<strong>Started:</strong>{" "}
{moment(data.started_at).format("YYYY-MM-DD HH:mm:ss")}
</p>
{data.stats && (
<p>
<strong>Duration / run time:</strong>{" "}
{formatDuration(data.duration)} /{" "}
{formatDuration(data.total_run_time)}
{formatDuration(data.stats.duration)} /{" "}
{formatDuration(data.stats.node_exec_time)}
</p>
)}
</Card>
);
}
@@ -99,8 +101,9 @@ export const FlowRunsTimeline = ({
.filter((e) => e.graph_id == flow.agent_id)
.map((e) => ({
...e,
time: e.started_at.getTime() + e.total_run_time * 1000,
_duration: e.total_run_time,
time:
e.started_at.getTime() + (e.stats?.node_exec_time ?? 0) * 1000,
_duration: e.stats?.node_exec_time ?? 0,
}))}
name={flow.name}
fill={`hsl(${(hashString(flow.id) * 137.5) % 360}, 70%, 50%)`}
@@ -120,7 +123,7 @@ export const FlowRunsTimeline = ({
{
...execution,
time: execution.ended_at.getTime(),
_duration: execution.total_run_time,
_duration: execution.stats?.node_exec_time ?? 0,
},
]}
stroke={`hsl(${(hashString(execution.graph_id) * 137.5) % 360}, 70%, 50%)`}

View File

@@ -249,15 +249,19 @@ export type LinkCreatable = Omit<Link, "id" | "is_static"> & {
/* Mirror of backend/data/execution.py:GraphExecutionMeta */
export type GraphExecutionMeta = {
id: GraphExecutionID;
started_at: Date;
ended_at: Date;
cost?: number;
duration: number;
total_run_time: number;
status: "QUEUED" | "RUNNING" | "COMPLETED" | "TERMINATED" | "FAILED";
user_id: UserID;
graph_id: GraphID;
graph_version: number;
preset_id?: string;
status: "QUEUED" | "RUNNING" | "COMPLETED" | "TERMINATED" | "FAILED";
started_at: Date;
ended_at: Date;
stats?: {
cost: number;
duration: number;
node_exec_time: number;
node_exec_count: number;
};
};
export type GraphExecutionID = Brand<string, "GraphExecutionID">;