diff --git a/autogpt_platform/backend/backend/blocks/orchestrator.py b/autogpt_platform/backend/backend/blocks/orchestrator.py index 1c40a7df96..34b4929879 100644 --- a/autogpt_platform/backend/backend/blocks/orchestrator.py +++ b/autogpt_platform/backend/backend/blocks/orchestrator.py @@ -365,9 +365,15 @@ def _disambiguate_tool_names(tools: list[dict[str, Any]]) -> None: class OrchestratorBlock(Block): - """ - A block that uses a language model to orchestrate tool calls, supporting both - single-shot and iterative agent mode execution. + """A block that uses a language model to orchestrate tool calls. + + Supports both single-shot and iterative agent mode execution. + + **InsufficientBalanceError propagation contract**: ``InsufficientBalanceError`` + (IBE) must always re-raise through every ``except`` block in this class. + Swallowing IBE would let the agent loop continue with unpaid work. Every + exception handler that catches ``Exception`` includes an explicit IBE + re-raise carve-out for this reason. """ def extra_credit_charges(self, execution_stats: NodeExecutionStats) -> int: @@ -1182,8 +1188,12 @@ class OrchestratorBlock(Block): node_exec_entry, ) except InsufficientBalanceError: + # IBE must propagate — see OrchestratorBlock class docstring. raise except Exception: + # Non-billing charge failures (DB outage, network, etc.) + # are logged with full traceback but surfaced to the LLM + # as a generic error to avoid leaking infrastructure details. logger.exception( "Unexpected error charging for tool node %s", sink_node_id, @@ -1210,17 +1220,15 @@ class OrchestratorBlock(Block): return resp except InsufficientBalanceError: - # Don't downgrade billing failures into tool errors — let the - # orchestrator's outer error handling stop the run cleanly, - # mirroring the behaviour of the main execution queue. Also - # prevents leaking exact balance amounts to the LLM context. + # IBE must propagate — see class docstring. 
raise
        except Exception as e:
-            logger.warning("Tool execution with manager failed: %s", e)
-            # Return error response
+            logger.warning("Tool execution with manager failed: %s", e, exc_info=True)
+            # Return a generic error to the LLM — internal exception messages
+            # may contain server paths, DB details, or infrastructure info.
             resp = _create_tool_response(
                 tool_call.id,
-                f"Tool execution failed: {e}",
+                "Tool execution failed due to an internal error",
                 responses_api=responses_api,
             )
             resp["_is_error"] = True
@@ -1324,7 +1332,7 @@ class OrchestratorBlock(Block):
                     content = str(raw_content)
                 else:
                     content = "Tool executed successfully"
                 tool_failed = result.get("_is_error", False)
                 return ToolCallResult(
                     tool_call_id=tool_call.id,
                     tool_name=tool_call.name,
@@ -1332,11 +1340,7 @@ class OrchestratorBlock(Block):
                     is_error=tool_failed,
                 )
             except InsufficientBalanceError:
-                # Billing failures must stop the agent loop cleanly — do NOT
-                # downgrade them into a tool error that gets fed back to the
-                # LLM. Re-raise so the orchestrator's outer error handling
-                # halts the run (mirrors main execution queue behaviour) and
-                # avoids leaking exact balance amounts into LLM context.
+                # IBE must propagate — see class docstring.
                 raise
             except Exception as e:
                 logger.error("Tool execution failed: %s", e)
@@ -1458,11 +1462,7 @@ class OrchestratorBlock(Block):
                 },
             )
         except InsufficientBalanceError:
-            # Billing failures must propagate out of the block so the
-            # executor's billing-leak handling fires (error recording on
-            # execution_stats, user notification, structured logging).
-            # Do NOT downgrade to a user-visible "error" output — that
-            # would swallow the failure and leak the exact balance amount.
+            # IBE must propagate — see class docstring.
raise
        except Exception as e:
            # Catch all OTHER errors (validation, network, API) so that
@@ -1545,17 +1545,13 @@ class OrchestratorBlock(Block):
                         text = content
                     else:
                         text = json.dumps(content)
                 tool_failed = result.get("_is_error", False)
                 return {
                     "content": [{"type": "text", "text": text}],
                     "isError": tool_failed,
                 }
             except InsufficientBalanceError:
-                # Same carve-out as _agent_mode_tool_executor:
-                # billing failures must propagate to stop the run
-                # rather than be fed back to the LLM as a tool
-                # error (which would leak balance amounts and let
-                # the loop continue consuming unbillable work).
+                # IBE must propagate — see class docstring.
                 raise
             except Exception as e:
                 logger.error("SDK tool execution failed: %s", e)
@@ -1836,12 +1832,8 @@ class OrchestratorBlock(Block):
             except (asyncio.CancelledError, StopAsyncIteration):
                 pass
             except InsufficientBalanceError:
-                # Billing failures must propagate so the executor's
-                # billing-leak handling fires (error recording, user
-                # notification, structured logging). Mirrors the carve-out
-                # in _execute_tools_agent_mode — do not downgrade to a
-                # user-visible error output. The `finally` block below
-                # still runs and records partial token usage.
+                # IBE must propagate — see class docstring. The `finally`
+                # block below still runs and records partial token usage.
raise
            except Exception as e:
                # Surface OTHER SDK errors as user-visible output instead
diff --git a/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py b/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py
index f166093d4c..3dc9e9b9ae 100644
--- a/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py
+++ b/autogpt_platform/backend/backend/blocks/test/test_orchestrator_per_iteration_cost.py
@@ -11,63 +11,51 @@
+import threading
 from unittest.mock import AsyncMock, MagicMock
 
 import pytest
 
-from backend.blocks.orchestrator import OrchestratorBlock
+from backend.blocks._base import Block
+from backend.blocks.orchestrator import ExecutionParams, OrchestratorBlock
+from backend.data.execution import ExecutionContext, ExecutionStatus
+from backend.data.model import NodeExecutionStats
+from backend.executor import manager
+from backend.util.exceptions import InsufficientBalanceError
 
 # ── extra_credit_charges hook ────────────────────────────────────────
 
 
+class _NoOpBlock(Block):
+    """Minimal concrete Block subclass that does not override extra_credit_charges."""
+
+    def __init__(self):
+        super().__init__(id="noop-block", description="No-op test block")
+
+    def run(self, input_data, **kwargs):  # type: ignore[override]
+        yield "out", {}
+
+
 class TestExtraCreditCharges:
     """OrchestratorBlock opts into per-LLM-call billing via extra_credit_charges."""
 
     def test_orchestrator_returns_nonzero_for_multiple_calls(self):
-        from backend.data.model import NodeExecutionStats
-
         block = OrchestratorBlock()
         stats = NodeExecutionStats(llm_call_count=3)
         assert block.extra_credit_charges(stats) == 2
 
     def test_orchestrator_returns_zero_for_single_call(self):
-        from backend.data.model import NodeExecutionStats
-
         block = OrchestratorBlock()
         stats = NodeExecutionStats(llm_call_count=1)
         assert block.extra_credit_charges(stats) == 0
 
     def test_orchestrator_returns_zero_for_zero_calls(self):
-        from backend.data.model import 
NodeExecutionStats - block = OrchestratorBlock() stats = NodeExecutionStats(llm_call_count=0) assert block.extra_credit_charges(stats) == 0 def test_default_block_returns_zero(self): - from backend.data.model import NodeExecutionStats - - # Use a concrete block (not the abstract Block base) to verify the - # default implementation returns 0. - block = OrchestratorBlock() - stats = NodeExecutionStats(llm_call_count=0) - # When llm_call_count=0, extra_credit_charges should clamp to 0. + """A block that does not override extra_credit_charges returns 0.""" + block = _NoOpBlock() + stats = NodeExecutionStats(llm_call_count=10) assert block.extra_credit_charges(stats) == 0 - # Also verify via Block.extra_credit_charges directly (method-level - # check) by calling the unbound method on an OrchestratorBlock - # instance with the base implementation patched out. - from unittest.mock import patch - - from backend.blocks._base import Block - - with patch.object( - OrchestratorBlock, - "extra_credit_charges", - Block.extra_credit_charges, - ): - base_block = OrchestratorBlock() - assert ( - base_block.extra_credit_charges(NodeExecutionStats(llm_call_count=10)) - == 0 - ) - # ── charge_extra_iterations math ─────────────────────────────────── @@ -91,9 +78,11 @@ def patched_processor(monkeypatch): Returns the processor and a list of credit amounts spent so tests can assert on what was charged. - """ - from backend.executor import manager + Note: ``ExecutionProcessor.__new__()`` bypasses ``__init__`` — if + ``__init__`` gains required state in the future this fixture will need + updating. 
+ """ spent: list[int] = [] class FakeDb: @@ -156,7 +145,6 @@ class TestChargeExtraIterations: @pytest.mark.asyncio async def test_capped_at_max(self, monkeypatch, fake_node_exec): """Runaway llm_call_count is capped at _MAX_EXTRA_ITERATIONS.""" - from backend.executor import manager spent: list[int] = [] @@ -187,7 +175,6 @@ class TestChargeExtraIterations: @pytest.mark.asyncio async def test_zero_base_cost_skips_charge(self, monkeypatch, fake_node_exec): - from backend.executor import manager spent: list[int] = [] @@ -215,7 +202,6 @@ class TestChargeExtraIterations: @pytest.mark.asyncio async def test_block_not_found_skips_charge(self, monkeypatch, fake_node_exec): - from backend.executor import manager spent: list[int] = [] @@ -243,8 +229,6 @@ class TestChargeExtraIterations: self, monkeypatch, fake_node_exec ): """Out-of-credits errors must propagate, not be silently swallowed.""" - from backend.executor import manager - from backend.util.exceptions import InsufficientBalanceError class FakeDb: def spend_credits(self, *, user_id, cost, metadata): @@ -280,7 +264,6 @@ class TestChargeNodeUsage: self, monkeypatch, fake_node_exec ): """Nested tool charges should NOT inflate the per-execution counter.""" - from backend.executor import manager captured: dict = {} @@ -313,7 +296,6 @@ class TestChargeNodeUsage: self, monkeypatch, fake_node_exec ): """charge_node_usage should call _handle_low_balance when total_cost > 0.""" - from backend.executor import manager low_balance_calls: list[dict] = [] @@ -353,7 +335,6 @@ class TestChargeNodeUsage: self, monkeypatch, fake_node_exec ): """charge_node_usage should NOT call _handle_low_balance when cost is 0.""" - from backend.executor import manager low_balance_calls: list = [] @@ -418,9 +399,6 @@ def gated_processor(monkeypatch): llm_call_count, dry_run) and observe whether charge_extra_iterations was called. 
""" - from backend.data.execution import ExecutionStatus - from backend.data.model import NodeExecutionStats - from backend.executor import manager calls: dict[str, list] = { "charge_extra_iterations": [], @@ -492,9 +470,7 @@ def gated_processor(monkeypatch): fake_charge_extra, ) - def fake_low_balance( - self, db_client, user_id, current_balance, transaction_cost - ): + def fake_low_balance(self, db_client, user_id, current_balance, transaction_cost): calls["handle_low_balance"].append( { "user_id": user_id, @@ -528,7 +504,6 @@ async def test_on_node_execution_charges_extra_iterations_when_gate_passes( gated_processor, ): """COMPLETED + extra_credit_charges > 0 + not dry_run → charged.""" - from backend.data.execution import ExecutionStatus proc, calls, inner, fake_db, _ = gated_processor inner["status"] = ExecutionStatus.COMPLETED @@ -555,7 +530,6 @@ async def test_on_node_execution_charges_extra_iterations_when_gate_passes( @pytest.mark.asyncio async def test_on_node_execution_skips_when_status_not_completed(gated_processor): - from backend.data.execution import ExecutionStatus proc, calls, inner, fake_db, _ = gated_processor inner["status"] = ExecutionStatus.FAILED @@ -579,7 +553,6 @@ async def test_on_node_execution_skips_when_status_not_completed(gated_processor @pytest.mark.asyncio async def test_on_node_execution_skips_when_extra_charges_zero(gated_processor): - from backend.data.execution import ExecutionStatus proc, calls, inner, fake_db, _ = gated_processor inner["status"] = ExecutionStatus.COMPLETED @@ -604,7 +577,6 @@ async def test_on_node_execution_skips_when_extra_charges_zero(gated_processor): @pytest.mark.asyncio async def test_on_node_execution_skips_when_dry_run(gated_processor): - from backend.data.execution import ExecutionStatus proc, calls, inner, fake_db, _ = gated_processor inner["status"] = ExecutionStatus.COMPLETED @@ -640,9 +612,6 @@ async def test_on_node_execution_insufficient_balance_records_error_and_notifies - 
_handle_insufficient_funds_notif is called so the user is notified - the structured ERROR log is the alerting hook """ - from backend.data.execution import ExecutionStatus - from backend.executor import manager - from backend.util.exceptions import InsufficientBalanceError proc, calls, inner, fake_db, _ = gated_processor inner["status"] = ExecutionStatus.COMPLETED @@ -698,13 +667,9 @@ async def _run_tool_exec_with_stats( Used to prove the dry_run and error guards around charge_node_usage behave as documented, and that InsufficientBalanceError propagates. """ - import threading from collections import defaultdict from unittest.mock import AsyncMock, MagicMock, patch - from backend.blocks.orchestrator import ExecutionParams, OrchestratorBlock - from backend.data.execution import ExecutionContext - block = OrchestratorBlock() # Mocked async DB client used inside orchestrator. @@ -828,10 +793,6 @@ async def test_tool_execution_insufficient_balance_propagates(): If this leaked into a ToolCallResult the LLM loop would keep running with 'tool failed' errors and the user would get unpaid work. """ - from unittest.mock import AsyncMock - - from backend.util.exceptions import InsufficientBalanceError - raising_charge = AsyncMock( side_effect=InsufficientBalanceError( user_id="u", message="nope", balance=0, amount=10 @@ -862,9 +823,6 @@ async def test_on_node_execution_failed_ibe_sends_notification( execution_stats.error=IBE. on_node_execution's post-execution block then sends the user notification so they understand why the run stopped. 
""" - from backend.data.execution import ExecutionStatus - from backend.executor import manager - from backend.util.exceptions import InsufficientBalanceError proc, calls, inner, fake_db, NodeExecutionStats = gated_processor ibe = InsufficientBalanceError( @@ -914,3 +872,102 @@ async def test_on_node_execution_failed_ibe_sends_notification( assert calls["handle_insufficient_funds_notif"][0]["user_id"] == "u" # charge_extra_iterations must NOT be called — status is FAILED. assert calls["charge_extra_iterations"] == [] + + +# ── Billing leak: non-IBE exception during extra-iteration charging ── + + +@pytest.mark.asyncio +async def test_on_node_execution_non_ibe_billing_failure_keeps_completed( + monkeypatch, + gated_processor, +): + """When charge_extra_iterations raises a non-IBE exception (e.g. DB outage): + + - execution_stats.error stays None (node ran to completion) + - status stays COMPLETED (work already done) + - the billing_leak error is logged but does not corrupt execution_stats + """ + proc, calls, inner, fake_db, _ = gated_processor + inner["status"] = ExecutionStatus.COMPLETED + inner["llm_call_count"] = 4 + fake_db.get_node = AsyncMock(return_value=_FakeNode(extra_charges=3)) + + async def raise_conn_error(self, node_exec, extra_iterations): + raise ConnectionError("DB connection lost") + + monkeypatch.setattr( + manager.ExecutionProcessor, "charge_extra_iterations", raise_conn_error + ) + + stats_pair = ( + MagicMock( + node_count=0, + nodes_cputime=0, + nodes_walltime=0, + cost=0, + node_error_count=0, + ), + threading.Lock(), + ) + result_stats = await proc.on_node_execution( + node_exec=_make_node_exec(dry_run=False), + node_exec_progress=MagicMock(), + nodes_input_masks=None, + graph_stats_pair=stats_pair, + ) + # error stays None — node completed, only billing failed. + assert result_stats.error is None + # No notification was sent (only IBE triggers notification). 
+ assert len(calls["handle_insufficient_funds_notif"]) == 0 + + +# ── _charge_usage with execution_count=0 ── + + +class TestChargeUsageZeroExecutionCount: + """Verify _charge_usage(node_exec, 0) does not invoke execution_usage_cost.""" + + def test_execution_count_zero_skips_execution_tier(self, monkeypatch): + """_charge_usage with execution_count=0 must not call execution_usage_cost.""" + execution_tier_called = [] + + def fake_execution_usage_cost(count): + execution_tier_called.append(count) + return (100, count) + + spent: list[int] = [] + + class FakeDb: + def spend_credits(self, *, user_id, cost, metadata): + spent.append(cost) + return 500 + + fake_block = MagicMock() + fake_block.name = "FakeBlock" + + monkeypatch.setattr(manager, "get_db_client", lambda: FakeDb()) + monkeypatch.setattr(manager, "get_block", lambda block_id: fake_block) + monkeypatch.setattr( + manager, + "block_usage_cost", + lambda block, input_data, **_kw: (10, {}), + ) + monkeypatch.setattr(manager, "execution_usage_cost", fake_execution_usage_cost) + + proc = manager.ExecutionProcessor.__new__(manager.ExecutionProcessor) + ne = MagicMock() + ne.user_id = "u" + ne.graph_exec_id = "ge" + ne.graph_id = "g" + ne.node_exec_id = "ne" + ne.node_id = "n" + ne.block_id = "b" + ne.inputs = {} + + total_cost, remaining = proc._charge_usage(ne, 0) + assert total_cost == 10 # block cost only + assert remaining == 500 + assert spent == [10] + # execution_usage_cost must NOT have been called + assert execution_tier_called == [] diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index 68f0ab7fb4..6e4b3e2aad 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -19,7 +19,7 @@ from sentry_sdk.api import flush as _sentry_flush from sentry_sdk.api import get_current_scope as _sentry_get_current_scope from backend.blocks import get_block -from 
backend.blocks._base import BlockSchema +from backend.blocks._base import Block, BlockSchema from backend.blocks.agent import AgentExecutorBlock from backend.blocks.io import AgentOutputBlock from backend.blocks.mcp.block import MCPToolBlock @@ -681,100 +681,9 @@ class ExecutionProcessor: execution_stats.walltime = timing_info.wall_time execution_stats.cputime = timing_info.cpu_time - # Charge extra iterations for blocks that opt into per-LLM-call - # billing (e.g. OrchestratorBlock in agent mode). The first call - # is already covered by _charge_usage(); each additional LLM call - # costs another base_cost. Skipped for dry runs and failed runs. - # - # InsufficientBalanceError here is a post-hoc billing leak — the - # work is already done but the user can no longer pay. We: - # 1. log at ERROR with structured fields so alerting can catch it - # 2. fire _handle_insufficient_funds_notif so the user is - # notified (mirrors the main queue path at ~line 1254) - # The run itself is kept COMPLETED (the block's outputs are - # already committed) — matching the documented "billing leak" - # contract rather than retroactively failing a successful run. - extra_iterations = ( - node.block.extra_credit_charges(execution_stats) - if status == ExecutionStatus.COMPLETED - and not node_exec.execution_context.dry_run - else 0 + await self._handle_post_execution_billing( + node, node_exec, execution_stats, status, log_metadata ) - if extra_iterations > 0: - try: - extra_cost, remaining_balance = await self.charge_extra_iterations( - node_exec, - extra_iterations, - ) - if extra_cost > 0: - execution_stats.extra_cost += extra_cost - # Mirror the low-balance notification path in - # ``charge_node_usage`` so users are alerted when - # their balance crosses the threshold from these - # extra-iteration charges (sentry HIGH/MEDIUM bug - # PRRT_kwDOJKSTjM56Mibw). 
- await asyncio.to_thread( - self._handle_low_balance, - get_db_client(), - node_exec.user_id, - remaining_balance, - extra_cost, - ) - except InsufficientBalanceError as e: - log_metadata.error( - "billing_leak: insufficient balance after " - f"{node.block.name} completed {extra_iterations} " - f"extra iterations", - extra={ - "billing_leak": True, - "user_id": node_exec.user_id, - "graph_id": node_exec.graph_id, - "block_id": node_exec.block_id, - "extra_iterations": extra_iterations, - "error": str(e), - }, - ) - # NOTE: Do NOT set execution_stats.error here. The node ran - # to completion — only the post-hoc charge failed. Setting - # .error would (a) flip node_error_count below, creating an - # "errored COMPLETED node" inconsistency in the metrics, and - # (b) leak balance amounts into the persisted node_stats. - # The structured ERROR log above is the alerting hook; - # node_stats stays clean. - # Notify the user they're out of credits. Runs through - # Redis dedup (per user+graph) so repeat runs don't spam. - try: - await asyncio.to_thread( - self._handle_insufficient_funds_notif, - get_db_client(), - node_exec.user_id, - node_exec.graph_id, - e, - ) - except Exception as notif_error: # pragma: no cover - log_metadata.warning( - f"Failed to send insufficient funds notification: {notif_error}" - ) - except Exception as e: - # Unexpected billing failure (DB outage, network, etc.). - # Log at ERROR with structured fields and the same - # `billing_leak: True` marker so monitoring catches it - # alongside InsufficientBalanceError. The run is kept - # COMPLETED because the work is already done. 
- log_metadata.error( - f"billing_leak: failed to charge extra iterations " - f"for {node.block.name}", - extra={ - "billing_leak": True, - "user_id": node_exec.user_id, - "graph_id": node_exec.graph_id, - "block_id": node_exec.block_id, - "extra_iterations": extra_iterations, - "error_type": type(e).__name__, - "error": str(e), - }, - exc_info=True, - ) graph_stats, graph_stats_lock = graph_stats_pair with graph_stats_lock: @@ -811,31 +720,121 @@ class ExecutionProcessor: db_client=db_client, ) - # If the node failed because a nested tool charge raised - # InsufficientBalanceError (orchestrator agent mode), the main - # queue's _charge_usage IBE notification path was bypassed (the - # initial charge succeeded — only the nested tool charge failed). - # Send the user notification here so they understand why their - # agent run stopped. Failures are logged but not raised so the - # node-execution path stays clean. + # If the node failed because a nested tool charge raised IBE, + # send the user notification so they understand why the run stopped. 
if status == ExecutionStatus.FAILED and isinstance( execution_stats.error, InsufficientBalanceError ): - try: - await asyncio.to_thread( - self._handle_insufficient_funds_notif, - get_db_client(), - node_exec.user_id, - node_exec.graph_id, - execution_stats.error, - ) - except Exception as notif_error: # pragma: no cover - log_metadata.warning( - f"Failed to send insufficient funds notification: {notif_error}" - ) + await self._try_send_insufficient_funds_notif( + node_exec.user_id, + node_exec.graph_id, + execution_stats.error, + log_metadata, + ) return execution_stats + async def _try_send_insufficient_funds_notif( + self, + user_id: str, + graph_id: str, + error: InsufficientBalanceError, + log_metadata: LogMetadata, + ) -> None: + """Send an insufficient-funds notification, swallowing failures.""" + try: + await asyncio.to_thread( + self._handle_insufficient_funds_notif, + get_db_client(), + user_id, + graph_id, + error, + ) + except Exception as notif_error: # pragma: no cover + log_metadata.warning( + f"Failed to send insufficient funds notification: {notif_error}" + ) + + async def _handle_post_execution_billing( + self, + node: Node, + node_exec: NodeExecutionEntry, + execution_stats: NodeExecutionStats, + status: ExecutionStatus, + log_metadata: LogMetadata, + ) -> None: + """Charge extra iterations for blocks that opt into per-LLM-call billing. + + The first LLM call is already covered by ``_charge_usage()``; each + additional call costs another ``base_cost``. Skipped for dry runs and + failed runs. + + InsufficientBalanceError here is a post-hoc billing leak: the work is + already done but the user can no longer pay. The run stays COMPLETED and + the error is logged with ``billing_leak: True`` for alerting. 
+ """ + extra_iterations = ( + node.block.extra_credit_charges(execution_stats) + if status == ExecutionStatus.COMPLETED + and not node_exec.execution_context.dry_run + else 0 + ) + if extra_iterations <= 0: + return + + try: + extra_cost, remaining_balance = await self.charge_extra_iterations( + node_exec, + extra_iterations, + ) + if extra_cost > 0: + execution_stats.extra_cost += extra_cost + await asyncio.to_thread( + self._handle_low_balance, + get_db_client(), + node_exec.user_id, + remaining_balance, + extra_cost, + ) + except InsufficientBalanceError as e: + log_metadata.error( + "billing_leak: insufficient balance after " + f"{node.block.name} completed {extra_iterations} " + f"extra iterations", + extra={ + "billing_leak": True, + "user_id": node_exec.user_id, + "graph_id": node_exec.graph_id, + "block_id": node_exec.block_id, + "extra_iterations": extra_iterations, + "error": str(e), + }, + ) + # Do NOT set execution_stats.error — the node ran to completion, + # only the post-hoc charge failed. See class-level billing-leak + # contract documentation. + await self._try_send_insufficient_funds_notif( + node_exec.user_id, + node_exec.graph_id, + e, + log_metadata, + ) + except Exception as e: + log_metadata.error( + f"billing_leak: failed to charge extra iterations " + f"for {node.block.name}", + extra={ + "billing_leak": True, + "user_id": node_exec.user_id, + "graph_id": node_exec.graph_id, + "block_id": node_exec.block_id, + "extra_iterations": extra_iterations, + "error_type": type(e).__name__, + "error": str(e), + }, + exc_info=True, + ) + @async_time_measured async def _on_node_execution( self, @@ -1065,7 +1064,7 @@ class ExecutionProcessor: def _resolve_block_cost( self, node_exec: NodeExecutionEntry, - ) -> tuple[Any, int, dict]: + ) -> tuple[Block | None, int, dict]: """Look up the block and compute its base usage cost for an exec. 
Shared by :meth:`_charge_usage` and :meth:`charge_extra_iterations` @@ -1211,6 +1210,22 @@ class ExecutionProcessor: self._charge_extra_iterations_sync, node_exec, capped ) + def _charge_and_check_balance( + self, + node_exec: NodeExecutionEntry, + ) -> tuple[int, int]: + """Charge usage and check low balance in a single thread-pool worker. + + Combines ``_charge_usage`` and ``_handle_low_balance`` to avoid + dispatching two thread-pool calls per tool execution. + """ + total_cost, remaining = self._charge_usage(node_exec, 0) + if total_cost > 0: + self._handle_low_balance( + get_db_client(), node_exec.user_id, remaining, total_cost + ) + return total_cost, remaining + async def charge_node_usage( self, node_exec: NodeExecutionEntry, @@ -1229,18 +1244,7 @@ class ExecutionProcessor: sub-steps of a single block run from the user's perspective and should not push them into higher per-execution cost tiers. """ - total_cost, remaining = await asyncio.to_thread( - self._charge_usage, node_exec, 0 - ) - if total_cost > 0: - await asyncio.to_thread( - self._handle_low_balance, - get_db_client(), - node_exec.user_id, - remaining, - total_cost, - ) - return total_cost, remaining + return await asyncio.to_thread(self._charge_and_check_balance, node_exec) @time_measured def _on_graph_execution(