From 626fe17aac5cfc6b6b986dce52880c93b9f51fd0 Mon Sep 17 00:00:00 2001 From: majdyz Date: Mon, 13 Apr 2026 04:03:08 +0000 Subject: [PATCH] fix(orchestrator): resolve None future on swallowed errors; add missing tests - Move tool_node_stats None guard before node_exec_future.set_result so that when on_node_execution returns None (swallowed by @async_error_logged), the future carries set_exception(RuntimeError) rather than set_result(None), giving the tracking system an accurate error state - Remove redundant `tool_node_stats is not None` check that was dead code after the early-return guard was added - Add explanatory comment in _charge_extra_iterations_sync docstring explaining why the block lookup is intentionally repeated rather than cached from _charge_usage (two separate thread-pool workers, no shared mutable state) - Add assertion to test_on_node_execution_charges_extra_iterations_when_gate_passes verifying _handle_low_balance is called when extra_cost > 0 - Add test_on_node_execution_failed_ibe_sends_notification covering the FAILED + InsufficientBalanceError path in on_node_execution (lines 822-836) that was previously untested --- .../backend/backend/blocks/orchestrator.py | 29 +++++--- .../test_orchestrator_per_iteration_cost.py | 74 +++++++++++++++++++ .../backend/backend/executor/manager.py | 12 ++- 3 files changed, 102 insertions(+), 13 deletions(-) diff --git a/autogpt_platform/backend/backend/blocks/orchestrator.py b/autogpt_platform/backend/backend/blocks/orchestrator.py index 5262f8eb1d..1c40a7df96 100644 --- a/autogpt_platform/backend/backend/blocks/orchestrator.py +++ b/autogpt_platform/backend/backend/blocks/orchestrator.py @@ -1128,6 +1128,12 @@ class OrchestratorBlock(Block): # Execute the node directly since we're in the Orchestrator context. # Wrap in try/except so the future is always resolved, even on # error — an unresolved Future would block anything awaiting it. + # + # on_node_execution is decorated with @async_error_logged(swallow=True), + # which catches BaseException and returns None rather than raising. + # Treat a None return as a failure: set_exception so the future + # carries an error state rather than a None result, and return an + # error response so the LLM knows the tool failed. try: tool_node_stats = await execution_processor.on_node_execution( node_exec=node_exec_entry, @@ -1135,20 +1141,24 @@ class OrchestratorBlock(Block): nodes_input_masks=None, graph_stats_pair=graph_stats_pair, ) + if tool_node_stats is None: + nil_err = RuntimeError( + f"on_node_execution returned None for node {sink_node_id} " + "(error was swallowed by @async_error_logged)" + ) + node_exec_future.set_exception(nil_err) + resp = _create_tool_response( + tool_call.id, + "Tool execution returned no result", + responses_api=responses_api, + ) + resp["_is_error"] = True + return resp node_exec_future.set_result(tool_node_stats) except Exception as exec_err: node_exec_future.set_exception(exec_err) raise - if tool_node_stats is None: - resp = _create_tool_response( - tool_call.id, - "Tool execution returned no result", - responses_api=responses_api, - ) - resp["_is_error"] = True - return resp - # Charge user credits AFTER successful tool execution. Tools # spawned by the orchestrator bypass the main execution queue # (where _charge_usage is called), so we must charge here to @@ -1165,7 +1175,6 @@ class OrchestratorBlock(Block): # by the generic tool-error handler below. if ( not execution_params.execution_context.dry_run - and tool_node_stats is not None and tool_node_stats.error is None ): try: diff --git a/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py b/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py index c703e32c71..611ab54fac 100644 --- a/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py +++ b/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py @@ -540,6 +540,9 @@ async def test_on_node_execution_charges_extra_iterations_when_gate_passes( graph_stats_pair=stats_pair, ) assert calls["charge_extra_iterations"] == [2] + # _handle_low_balance must be called with the remaining balance returned by + # charge_extra_iterations (500) so users are alerted when balance drops low. + assert len(calls["handle_low_balance"]) == 1 @pytest.mark.asyncio @@ -832,3 +835,74 @@ async def test_tool_execution_insufficient_balance_propagates(): charge_node_usage_mock=raising_charge, ) assert isinstance(raised, InsufficientBalanceError) + + +# ── on_node_execution FAILED + InsufficientBalanceError notification ── + + +@pytest.mark.asyncio +async def test_on_node_execution_failed_ibe_sends_notification( + monkeypatch, + gated_processor, +): + """When status == FAILED and execution_stats.error is InsufficientBalanceError, + _handle_insufficient_funds_notif must be called. + + This path fires when a nested tool charge inside the orchestrator raises + InsufficientBalanceError, which propagates out of the block's run() generator + and is caught by _on_node_execution's broad except, setting status=FAILED and + execution_stats.error=IBE. on_node_execution's post-execution block then + sends the user notification so they understand why the run stopped. + """ + from backend.data.execution import ExecutionStatus + from backend.executor import manager + from backend.util.exceptions import InsufficientBalanceError + + proc, calls, inner, fake_db, NodeExecutionStats = gated_processor + ibe = InsufficientBalanceError( + user_id="u", + message="Insufficient balance", + balance=0, + amount=30, + ) + + # Simulate _on_node_execution returning FAILED with IBE in stats.error. + async def fake_inner_failed( + self, + *, + node, + node_exec, + node_exec_progress, + stats, + db_client, + log_metadata, + nodes_input_masks=None, + nodes_to_skip=None, + ): + stats.error = ibe + return MagicMock(wall_time=0.1, cpu_time=0.1), ExecutionStatus.FAILED + + monkeypatch.setattr( + manager.ExecutionProcessor, + "_on_node_execution", + fake_inner_failed, + ) + fake_db.get_node = AsyncMock(return_value=_FakeNode(extra_charges=0)) + + stats_pair = ( + MagicMock( + node_count=0, nodes_cputime=0, nodes_walltime=0, cost=0, node_error_count=0 + ), + threading.Lock(), + ) + await proc.on_node_execution( + node_exec=_make_node_exec(dry_run=False), + node_exec_progress=MagicMock(), + nodes_input_masks=None, + graph_stats_pair=stats_pair, + ) + # The notification must have fired so the user knows why their run stopped. + assert len(calls["handle_insufficient_funds_notif"]) == 1 + assert calls["handle_insufficient_funds_notif"][0]["user_id"] == "u" + # charge_extra_iterations must NOT be called — status is FAILED. + assert calls["charge_extra_iterations"] == [] diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index ca66fad63a..68f0ab7fb4 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -753,8 +753,7 @@ class ExecutionProcessor: ) except Exception as notif_error: # pragma: no cover log_metadata.warning( - f"Failed to send insufficient funds notification: " - f"{notif_error}" + f"Failed to send insufficient funds notification: {notif_error}" ) except Exception as e: # Unexpected billing failure (DB outage, network, etc.). @@ -832,7 +831,7 @@ class ExecutionProcessor: ) except Exception as notif_error: # pragma: no cover log_metadata.warning( - f"Failed to send insufficient funds notification: " f"{notif_error}" + f"Failed to send insufficient funds notification: {notif_error}" ) return execution_stats @@ -1153,6 +1152,13 @@ class ExecutionProcessor: Called only from :meth:`charge_extra_iterations`. Do not call directly from async code. + + Note: ``_resolve_block_cost`` is called again here (rather than + reusing the result from ``_charge_usage`` at the start of execution) + because the two calls happen in separate thread-pool workers and + sharing mutable state across workers would require locks. The block + config is immutable during a run, so the repeated lookup is safe and + produces the same cost; the only overhead is an extra registry lookup. """ db_client = get_db_client() block, cost, matching_filter = self._resolve_block_cost(node_exec)