feat(platform): Add Graph Execution error data & status (#8250)

This commit is contained in:
Zamil Majdy
2024-10-04 13:15:54 +04:00
committed by GitHub
parent d3266d003d
commit 1e3b1dad06
2 changed files with 40 additions and 13 deletions

View File

@@ -268,10 +268,29 @@ async def update_graph_execution_start_time(graph_exec_id: str):
)
async def update_graph_execution_stats(
    graph_exec_id: str,
    error: Exception | None,
    wall_time: float,
    cpu_time: float,
    node_count: int,
) -> None:
    """Persist the final status and run statistics of a graph execution.

    The execution is marked ``ExecutionStatus.FAILED`` when an error is
    given and ``ExecutionStatus.COMPLETED`` otherwise. The statistics are
    stored as a JSON object under the ``stats`` field.

    Args:
        graph_exec_id: ID of the graph execution record to update.
        error: The exception that ended the execution, or ``None`` if the
            execution finished without error.
        wall_time: Wall-clock time reported for the execution.
        cpu_time: CPU time reported for the execution.
        node_count: Number of node executions reported for the execution.
    """
    failed = error is not None
    status = ExecutionStatus.FAILED if failed else ExecutionStatus.COMPLETED

    # Keep this a plain dict. Wrapping the literal in parentheses with a
    # trailing comma turns it into a one-element tuple, which json.dumps
    # serializes as a JSON array ("[{...}]") rather than an object.
    stats = {
        "walltime": wall_time,
        "cputime": cpu_time,
        "nodecount": node_count,
        "error": str(error) if failed else None,
    }

    await AgentGraphExecution.prisma().update(
        where={"id": graph_exec_id},
        data={
            "executionStatus": status,
            "stats": json.dumps(stats),
        },
    )

View File

@@ -561,18 +561,17 @@ class Executor:
node_eid="*",
block_name="-",
)
timing_info, node_count = cls._on_graph_execution(
timing_info, (node_count, error) = cls._on_graph_execution(
graph_exec, cancel, log_metadata
)
cls.loop.run_until_complete(
update_graph_execution_stats(
graph_exec.graph_exec_id,
{
"walltime": timing_info.wall_time,
"cputime": timing_info.cpu_time,
"nodecount": node_count,
},
graph_exec_id=graph_exec.graph_exec_id,
error=error,
wall_time=timing_info.wall_time,
cpu_time=timing_info.cpu_time,
node_count=node_count,
)
)
@@ -583,9 +582,15 @@ class Executor:
graph_exec: GraphExecution,
cancel: threading.Event,
log_metadata: LogMetadata,
) -> int:
) -> tuple[int, Exception | None]:
"""
Returns:
The number of node executions completed.
The error that occurred during the execution.
"""
log_metadata.info(f"Start graph execution {graph_exec.graph_exec_id}")
n_node_executions = 0
error = None
finished = False
def cancel_handler():
@@ -619,7 +624,8 @@ class Executor:
while not queue.empty():
if cancel.is_set():
return n_node_executions
error = RuntimeError("Execution is cancelled")
return n_node_executions, error
exec_data = queue.get()
@@ -653,7 +659,8 @@ class Executor:
)
for node_id, execution in list(running_executions.items()):
if cancel.is_set():
return n_node_executions
error = RuntimeError("Execution is cancelled")
return n_node_executions, error
if not queue.empty():
break # yield to parent loop to execute new queue items
@@ -666,12 +673,13 @@ class Executor:
log_metadata.exception(
f"Failed graph execution {graph_exec.graph_exec_id}: {e}"
)
error = e
finally:
if not cancel.is_set():
finished = True
cancel.set()
cancel_thread.join()
return n_node_executions
return n_node_executions, error
class ExecutionManager(AppService):