fix(backend/executor): Fix node execution status and output persistence ordering (#10541)

The node execution status can be done before the output persistence,
making the output be persisted when the node execution status is already
completed.

### Changes 🏗️

* Re-order the node execution status & output persistence logic.
* Make agent.py avoid yielding the same node_exec_id twice (that can be
caused by the above issue).

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Existing CI
This commit is contained in:
Zamil Majdy
2025-08-04 19:17:30 +04:00
committed by GitHub
parent b85e8204df
commit 6f8d0bfdf2
2 changed files with 32 additions and 43 deletions

View File

@@ -121,6 +121,7 @@ class AgentExecutorBlock(Block):
log_id = f"Graph #{graph_id}-V{graph_version}, exec-id: {graph_exec_id}"
logger.info(f"Starting execution of {log_id}")
yielded_node_exec_ids = set()
async for event in event_bus.listen(
user_id=user_id,
@@ -152,6 +153,14 @@ class AgentExecutorBlock(Block):
f"Execution {log_id} produced input {event.input_data} output {event.output_data}"
)
if event.node_exec_id in yielded_node_exec_ids:
logger.warning(
f"{log_id} received duplicate event for node execution {event.node_exec_id}"
)
continue
else:
yielded_node_exec_ids.add(event.node_exec_id)
if not event.block_id:
logger.warning(f"{log_id} received event without block_id {event}")
continue

View File

@@ -486,6 +486,16 @@ class Executor:
execution_stats=stats,
nodes_input_masks=nodes_input_masks,
):
await db_client.upsert_execution_output(
node_exec_id=node_exec.node_exec_id,
output_name=output_name,
output_data=output_data,
)
if exec_update := await db_client.get_node_execution(
node_exec.node_exec_id
):
await send_async_execution_update(exec_update)
node_exec_progress.add_output(
ExecutionOutputEntry(
node=node,
@@ -974,6 +984,7 @@ class Executor:
clean_exec_files(graph_exec_id)
@classmethod
@async_error_logged(swallow=True)
async def _process_node_output(
cls,
output: ExecutionOutputEntry,
@@ -995,49 +1006,18 @@ class Executor:
"""
db_client = get_db_async_client()
try:
name, data = output.data
await db_client.upsert_execution_output(
node_exec_id=output.node_exec_id,
output_name=name,
output_data=data,
)
if exec_update := await db_client.get_node_execution(output.node_exec_id):
await send_async_execution_update(exec_update)
log_metadata.debug(f"Enqueue nodes for {node_id}: {output}")
for next_execution in await _enqueue_next_nodes(
db_client=db_client,
node=output.node,
output=output.data,
user_id=graph_exec.user_id,
graph_exec_id=graph_exec.graph_exec_id,
graph_id=graph_exec.graph_id,
log_metadata=log_metadata,
nodes_input_masks=nodes_input_masks,
):
execution_queue.add(next_execution)
except asyncio.CancelledError as e:
log_metadata.warning(
f"Node execution {output.node_exec_id} was cancelled: {e}"
)
await async_update_node_execution_status(
db_client=db_client,
exec_id=output.node_exec_id,
status=ExecutionStatus.TERMINATED,
)
except Exception as e:
log_metadata.exception(f"Failed to process node output: {e}")
await db_client.upsert_execution_output(
node_exec_id=output.node_exec_id,
output_name="error",
output_data=str(e),
)
await async_update_node_execution_status(
db_client=db_client,
exec_id=output.node_exec_id,
status=ExecutionStatus.FAILED,
)
log_metadata.debug(f"Enqueue nodes for {node_id}: {output}")
for next_execution in await _enqueue_next_nodes(
db_client=db_client,
node=output.node,
output=output.data,
user_id=graph_exec.user_id,
graph_exec_id=graph_exec.graph_exec_id,
graph_id=graph_exec.graph_id,
log_metadata=log_metadata,
nodes_input_masks=nodes_input_masks,
):
execution_queue.add(next_execution)
@classmethod
def _handle_agent_run_notif(