fix(orchestrator): resolve None future on swallowed errors; add missing tests

- Move tool_node_stats None guard before node_exec_future.set_result so
  that when on_node_execution returns None (swallowed by @async_error_logged),
  the future carries set_exception(RuntimeError) rather than set_result(None),
  giving the tracking system an accurate error state
- Remove redundant `tool_node_stats is not None` check that was dead code
  after the early-return guard was added
- Add explanatory comment in _charge_extra_iterations_sync docstring explaining
  why the block lookup is intentionally repeated rather than cached from
  _charge_usage (two separate thread-pool workers, no shared mutable state)
- Add assertion to test_on_node_execution_charges_extra_iterations_when_gate_passes
  verifying _handle_low_balance is called when extra_cost > 0
- Add test_on_node_execution_failed_ibe_sends_notification covering the
  FAILED + InsufficientBalanceError path in on_node_execution (lines 822-836)
  that was previously untested
This commit is contained in:
majdyz
2026-04-13 04:03:08 +00:00
parent b62288655f
commit 626fe17aac
3 changed files with 102 additions and 13 deletions

View File

@@ -1128,6 +1128,12 @@ class OrchestratorBlock(Block):
# Execute the node directly since we're in the Orchestrator context.
# Wrap in try/except so the future is always resolved, even on
# error — an unresolved Future would block anything awaiting it.
#
# on_node_execution is decorated with @async_error_logged(swallow=True),
# which catches BaseException and returns None rather than raising.
# Treat a None return as a failure: set_exception so the future
# carries an error state rather than a None result, and return an
# error response so the LLM knows the tool failed.
try:
tool_node_stats = await execution_processor.on_node_execution(
node_exec=node_exec_entry,
@@ -1135,20 +1141,24 @@ class OrchestratorBlock(Block):
nodes_input_masks=None,
graph_stats_pair=graph_stats_pair,
)
if tool_node_stats is None:
nil_err = RuntimeError(
f"on_node_execution returned None for node {sink_node_id} "
"(error was swallowed by @async_error_logged)"
)
node_exec_future.set_exception(nil_err)
resp = _create_tool_response(
tool_call.id,
"Tool execution returned no result",
responses_api=responses_api,
)
resp["_is_error"] = True
return resp
node_exec_future.set_result(tool_node_stats)
except Exception as exec_err:
node_exec_future.set_exception(exec_err)
raise
if tool_node_stats is None:
resp = _create_tool_response(
tool_call.id,
"Tool execution returned no result",
responses_api=responses_api,
)
resp["_is_error"] = True
return resp
# Charge user credits AFTER successful tool execution. Tools
# spawned by the orchestrator bypass the main execution queue
# (where _charge_usage is called), so we must charge here to
@@ -1165,7 +1175,6 @@ class OrchestratorBlock(Block):
# by the generic tool-error handler below.
if (
not execution_params.execution_context.dry_run
and tool_node_stats is not None
and tool_node_stats.error is None
):
try:

View File

@@ -540,6 +540,9 @@ async def test_on_node_execution_charges_extra_iterations_when_gate_passes(
graph_stats_pair=stats_pair,
)
assert calls["charge_extra_iterations"] == [2]
# _handle_low_balance must be called with the remaining balance returned by
# charge_extra_iterations (500) so users are alerted when balance drops low.
assert len(calls["handle_low_balance"]) == 1
@pytest.mark.asyncio
@@ -832,3 +835,74 @@ async def test_tool_execution_insufficient_balance_propagates():
charge_node_usage_mock=raising_charge,
)
assert isinstance(raised, InsufficientBalanceError)
# ── on_node_execution FAILED + InsufficientBalanceError notification ──
@pytest.mark.asyncio
async def test_on_node_execution_failed_ibe_sends_notification(
monkeypatch,
gated_processor,
):
"""When status == FAILED and execution_stats.error is InsufficientBalanceError,
_handle_insufficient_funds_notif must be called.
This path fires when a nested tool charge inside the orchestrator raises
InsufficientBalanceError, which propagates out of the block's run() generator
and is caught by _on_node_execution's broad except, setting status=FAILED and
execution_stats.error=IBE. on_node_execution's post-execution block then
sends the user notification so they understand why the run stopped.
"""
from backend.data.execution import ExecutionStatus
from backend.executor import manager
from backend.util.exceptions import InsufficientBalanceError
proc, calls, inner, fake_db, NodeExecutionStats = gated_processor
ibe = InsufficientBalanceError(
user_id="u",
message="Insufficient balance",
balance=0,
amount=30,
)
# Simulate _on_node_execution returning FAILED with IBE in stats.error.
async def fake_inner_failed(
self,
*,
node,
node_exec,
node_exec_progress,
stats,
db_client,
log_metadata,
nodes_input_masks=None,
nodes_to_skip=None,
):
stats.error = ibe
return MagicMock(wall_time=0.1, cpu_time=0.1), ExecutionStatus.FAILED
monkeypatch.setattr(
manager.ExecutionProcessor,
"_on_node_execution",
fake_inner_failed,
)
fake_db.get_node = AsyncMock(return_value=_FakeNode(extra_charges=0))
stats_pair = (
MagicMock(
node_count=0, nodes_cputime=0, nodes_walltime=0, cost=0, node_error_count=0
),
threading.Lock(),
)
await proc.on_node_execution(
node_exec=_make_node_exec(dry_run=False),
node_exec_progress=MagicMock(),
nodes_input_masks=None,
graph_stats_pair=stats_pair,
)
# The notification must have fired so the user knows why their run stopped.
assert len(calls["handle_insufficient_funds_notif"]) == 1
assert calls["handle_insufficient_funds_notif"][0]["user_id"] == "u"
# charge_extra_iterations must NOT be called — status is FAILED.
assert calls["charge_extra_iterations"] == []

View File

@@ -753,8 +753,7 @@ class ExecutionProcessor:
)
except Exception as notif_error: # pragma: no cover
log_metadata.warning(
f"Failed to send insufficient funds notification: "
f"{notif_error}"
f"Failed to send insufficient funds notification: {notif_error}"
)
except Exception as e:
# Unexpected billing failure (DB outage, network, etc.).
@@ -832,7 +831,7 @@ class ExecutionProcessor:
)
except Exception as notif_error: # pragma: no cover
log_metadata.warning(
f"Failed to send insufficient funds notification: " f"{notif_error}"
f"Failed to send insufficient funds notification: {notif_error}"
)
return execution_stats
@@ -1153,6 +1152,13 @@ class ExecutionProcessor:
Called only from :meth:`charge_extra_iterations`. Do not call
directly from async code.
Note: ``_resolve_block_cost`` is called again here (rather than
reusing the result from ``_charge_usage`` at the start of execution)
because the two calls happen in separate thread-pool workers and
sharing mutable state across workers would require locks. The block
config is immutable during a run, so the repeated lookup is safe and
produces the same cost; the only overhead is an extra registry lookup.
"""
db_client = get_db_client()
block, cost, matching_filter = self._resolve_block_cost(node_exec)