fix(backend): Handle add execution API request failure (#9838)

There are cases where the publishing agent execution is failing, making
the agent execution appear to be stuck in a queue, but the execution has
never been in a queue in the first place.

### Changes 🏗️

On publishing failure, we set the graph & starting node execution status
to FAILED and let the UI bubble up the error so the user can try again.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Normal add execution flow
This commit is contained in:
Zamil Majdy
2025-04-18 20:35:43 +02:00
committed by GitHub
parent 055a231aed
commit c783f64b33
2 changed files with 61 additions and 27 deletions

View File

@@ -87,7 +87,7 @@ class LogMetadata:
"node_id": node_id,
"block_name": block_name,
}
self.prefix = f"[ExecutionManager|uid:{user_id}|gid:{graph_id}|nid:{node_id}]|geid:{graph_eid}|nid:{node_eid}|{block_name}]"
self.prefix = f"[ExecutionManager|uid:{user_id}|gid:{graph_id}|nid:{node_id}]|geid:{graph_eid}|neid:{node_eid}|{block_name}]"
def info(self, msg: str, **extra):
msg = self._wrap(msg, **extra)

View File

@@ -16,9 +16,13 @@ from backend.data.block_cost_config import BLOCK_COSTS
from backend.data.cost import BlockCostType
from backend.data.execution import (
AsyncRedisExecutionEventBus,
ExecutionStatus,
GraphExecutionStats,
GraphExecutionWithNodes,
RedisExecutionEventBus,
create_graph_execution,
update_graph_execution_stats,
update_node_execution_status_batch,
)
from backend.data.graph import GraphModel, Node, get_graph
from backend.data.model import CredentialsMetaInput
@@ -611,9 +615,6 @@ async def add_graph_execution_async(
Raises:
ValueError: If the graph is not found or if there are validation errors.
""" # noqa
execution_event_bus = get_async_execution_event_bus()
graph_execution_queue = await get_async_execution_queue()
graph: GraphModel | None = await get_graph(
graph_id=graph_id, user_id=user_id, version=graph_version
)
@@ -638,18 +639,34 @@ async def add_graph_execution_async(
),
preset_id=preset_id,
)
await execution_event_bus.publish(graph_exec)
try:
queue = await get_async_execution_queue()
graph_exec_entry = graph_exec.to_graph_execution_entry()
if node_credentials_input_map:
graph_exec_entry.node_credentials_input_map = node_credentials_input_map
await queue.publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
message=graph_exec_entry.model_dump_json(),
exchange=GRAPH_EXECUTION_EXCHANGE,
)
graph_exec_entry = graph_exec.to_graph_execution_entry()
if node_credentials_input_map:
graph_exec_entry.node_credentials_input_map = node_credentials_input_map
await graph_execution_queue.publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
message=graph_exec_entry.model_dump_json(),
exchange=GRAPH_EXECUTION_EXCHANGE,
)
bus = get_async_execution_event_bus()
await bus.publish(graph_exec)
return graph_exec
return graph_exec
except Exception as e:
logger.error(f"Unable to publish graph #{graph_id} exec #{graph_exec.id}: {e}")
await update_node_execution_status_batch(
[node_exec.node_exec_id for node_exec in graph_exec.node_executions],
ExecutionStatus.FAILED,
)
await update_graph_execution_stats(
graph_exec_id=graph_exec.id,
status=ExecutionStatus.FAILED,
stats=GraphExecutionStats(error=str(e)),
)
raise
def add_graph_execution(
@@ -675,8 +692,9 @@ def add_graph_execution(
GraphExecutionEntry: The entry for the graph execution.
Raises:
ValueError: If the graph is not found or if there are validation errors.
""" # noqa
graph: GraphModel | None = get_db_client().get_graph(
"""
db = get_db_client()
graph: GraphModel | None = db.get_graph(
graph_id=graph_id, user_id=user_id, version=graph_version
)
if not graph:
@@ -688,7 +706,7 @@ def add_graph_execution(
else None
)
graph_exec = get_db_client().create_graph_execution(
graph_exec = db.create_graph_execution(
user_id=user_id,
graph_id=graph_id,
graph_version=graph.version,
@@ -700,15 +718,31 @@ def add_graph_execution(
),
preset_id=preset_id,
)
get_execution_event_bus().publish(graph_exec)
try:
queue = get_execution_queue()
graph_exec_entry = graph_exec.to_graph_execution_entry()
if node_credentials_input_map:
graph_exec_entry.node_credentials_input_map = node_credentials_input_map
queue.publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
message=graph_exec_entry.model_dump_json(),
exchange=GRAPH_EXECUTION_EXCHANGE,
)
graph_exec_entry = graph_exec.to_graph_execution_entry()
if node_credentials_input_map:
graph_exec_entry.node_credentials_input_map = node_credentials_input_map
get_execution_queue().publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
message=graph_exec_entry.model_dump_json(),
exchange=GRAPH_EXECUTION_EXCHANGE,
)
bus = get_execution_event_bus()
bus.publish(graph_exec)
return graph_exec
return graph_exec
except Exception as e:
logger.error(f"Unable to publish graph #{graph_id} exec #{graph_exec.id}: {e}")
db.update_node_execution_status_batch(
[node_exec.node_exec_id for node_exec in graph_exec.node_executions],
ExecutionStatus.FAILED,
)
db.update_graph_execution_stats(
graph_exec_id=graph_exec.id,
status=ExecutionStatus.FAILED,
stats=GraphExecutionStats(error=str(e)),
)
raise