fix(backend): address review feedback on orchestrator billing

- Extract post-execution billing into _handle_post_execution_billing()
- Deduplicate IBE notification into _try_send_insufficient_funds_notif()
- Combine _charge_usage + _handle_low_balance into single thread dispatch
- Sanitize error messages to LLM (no internal details leaked)
- Default _is_error to True (fail-closed) for tool responses
- Add IBE propagation contract to OrchestratorBlock class docstring
- Reduce per-site IBE comments to one-liners referencing class docstring
- Fix _resolve_block_cost return type annotation (Block | None)
- Move test imports to module level, fix test_default_block_returns_zero
- Add tests for non-IBE billing failure and _charge_usage(count=0)
- Fix Black formatting (CI lint blocker)
This commit is contained in:
majdyz
2026-04-13 06:44:20 +00:00
parent e901b64bed
commit 5ff46ff207
3 changed files with 278 additions and 225 deletions

View File

@@ -365,9 +365,15 @@ def _disambiguate_tool_names(tools: list[dict[str, Any]]) -> None:
class OrchestratorBlock(Block):
"""
A block that uses a language model to orchestrate tool calls, supporting both
single-shot and iterative agent mode execution.
"""A block that uses a language model to orchestrate tool calls.
Supports both single-shot and iterative agent mode execution.
**InsufficientBalanceError propagation contract**: ``InsufficientBalanceError``
(IBE) must always re-raise through every ``except`` block in this class.
Swallowing IBE would let the agent loop continue with unpaid work. Every
exception handler that catches ``Exception`` includes an explicit IBE
re-raise carve-out for this reason.
"""
def extra_credit_charges(self, execution_stats: NodeExecutionStats) -> int:
@@ -1182,8 +1188,12 @@ class OrchestratorBlock(Block):
node_exec_entry,
)
except InsufficientBalanceError:
# IBE must propagate — see OrchestratorBlock class docstring.
raise
except Exception:
# Non-billing charge failures (DB outage, network, etc.)
# are logged with full traceback but surfaced to the LLM
# as a generic error to avoid leaking infrastructure details.
logger.exception(
"Unexpected error charging for tool node %s",
sink_node_id,
@@ -1210,17 +1220,15 @@ class OrchestratorBlock(Block):
return resp
except InsufficientBalanceError:
# Don't downgrade billing failures into tool errors — let the
# orchestrator's outer error handling stop the run cleanly,
# mirroring the behaviour of the main execution queue. Also
# prevents leaking exact balance amounts to the LLM context.
# IBE must propagate — see class docstring.
raise
except Exception as e:
logger.warning("Tool execution with manager failed: %s", e)
# Return error response
logger.warning("Tool execution with manager failed: %s", e, exc_info=True)
# Return a generic error to the LLM — internal exception messages
# may contain server paths, DB details, or infrastructure info.
resp = _create_tool_response(
tool_call.id,
f"Tool execution failed: {e}",
"Tool execution failed due to an internal error",
responses_api=responses_api,
)
resp["_is_error"] = True
@@ -1324,7 +1332,7 @@ class OrchestratorBlock(Block):
content = str(raw_content)
else:
content = "Tool executed successfully"
tool_failed = result.get("_is_error", False)
tool_failed = result.get("_is_error", True)
return ToolCallResult(
tool_call_id=tool_call.id,
tool_name=tool_call.name,
@@ -1332,11 +1340,7 @@ class OrchestratorBlock(Block):
is_error=tool_failed,
)
except InsufficientBalanceError:
# Billing failures must stop the agent loop cleanly — do NOT
# downgrade them into a tool error that gets fed back to the
# LLM. Re-raise so the orchestrator's outer error handling
# halts the run (mirrors main execution queue behaviour) and
# avoids leaking exact balance amounts into LLM context.
# IBE must propagate — see class docstring.
raise
except Exception as e:
logger.error("Tool execution failed: %s", e)
@@ -1458,11 +1462,7 @@ class OrchestratorBlock(Block):
},
)
except InsufficientBalanceError:
# Billing failures must propagate out of the block so the
# executor's billing-leak handling fires (error recording on
# execution_stats, user notification, structured logging).
# Do NOT downgrade to a user-visible "error" output — that
# would swallow the failure and leak the exact balance amount.
# IBE must propagate — see class docstring.
raise
except Exception as e:
# Catch all OTHER errors (validation, network, API) so that
@@ -1545,17 +1545,13 @@ class OrchestratorBlock(Block):
text = content
else:
text = json.dumps(content)
tool_failed = result.get("_is_error", False)
tool_failed = result.get("_is_error", True)
return {
"content": [{"type": "text", "text": text}],
"isError": tool_failed,
}
except InsufficientBalanceError:
# Same carve-out as _agent_mode_tool_executor:
# billing failures must propagate to stop the run
# rather than be fed back to the LLM as a tool
# error (which would leak balance amounts and let
# the loop continue consuming unbillable work).
# IBE must propagate — see class docstring.
raise
except Exception as e:
logger.error("SDK tool execution failed: %s", e)
@@ -1836,12 +1832,8 @@ class OrchestratorBlock(Block):
except (asyncio.CancelledError, StopAsyncIteration):
pass
except InsufficientBalanceError:
# Billing failures must propagate so the executor's
# billing-leak handling fires (error recording, user
# notification, structured logging). Mirrors the carve-out
# in _execute_tools_agent_mode — do not downgrade to a
# user-visible error output. The `finally` block below
# still runs and records partial token usage.
# IBE must propagate — see class docstring. The `finally`
# block below still runs and records partial token usage.
raise
except Exception as e:
# Surface OTHER SDK errors as user-visible output instead

View File

@@ -11,63 +11,50 @@ from unittest.mock import AsyncMock, MagicMock
import pytest
from backend.blocks.orchestrator import OrchestratorBlock
from backend.blocks._base import Block
from backend.blocks.orchestrator import ExecutionParams, OrchestratorBlock
from backend.data.execution import ExecutionContext, ExecutionStatus
from backend.data.model import NodeExecutionStats
from backend.executor import manager
from backend.util.exceptions import InsufficientBalanceError
# ── extra_credit_charges hook ────────────────────────────────────────
class _NoOpBlock(Block):
    """Minimal concrete Block subclass that does not override extra_credit_charges.

    Exists so tests can exercise the Block base-class default of
    ``extra_credit_charges`` (expected to return 0) on a concrete instance.
    """

    def __init__(self):
        # Fixed id/description — this block only needs to be instantiable
        # for the test; it is never executed as part of a real graph.
        super().__init__(id="noop-block", description="No-op test block")

    def run(self, input_data, **kwargs):  # type: ignore[override]
        # Minimal generator body to satisfy the Block run interface.
        yield "out", {}
class TestExtraCreditCharges:
"""OrchestratorBlock opts into per-LLM-call billing via extra_credit_charges."""
def test_orchestrator_returns_nonzero_for_multiple_calls(self):
from backend.data.model import NodeExecutionStats
block = OrchestratorBlock()
stats = NodeExecutionStats(llm_call_count=3)
assert block.extra_credit_charges(stats) == 2
def test_orchestrator_returns_zero_for_single_call(self):
from backend.data.model import NodeExecutionStats
block = OrchestratorBlock()
stats = NodeExecutionStats(llm_call_count=1)
assert block.extra_credit_charges(stats) == 0
def test_orchestrator_returns_zero_for_zero_calls(self):
from backend.data.model import NodeExecutionStats
block = OrchestratorBlock()
stats = NodeExecutionStats(llm_call_count=0)
assert block.extra_credit_charges(stats) == 0
def test_default_block_returns_zero(self):
from backend.data.model import NodeExecutionStats
# Use a concrete block (not the abstract Block base) to verify the
# default implementation returns 0.
block = OrchestratorBlock()
stats = NodeExecutionStats(llm_call_count=0)
# When llm_call_count=0, extra_credit_charges should clamp to 0.
"""A block that does not override extra_credit_charges returns 0."""
block = _NoOpBlock()
stats = NodeExecutionStats(llm_call_count=10)
assert block.extra_credit_charges(stats) == 0
# Also verify via Block.extra_credit_charges directly (method-level
# check) by calling the unbound method on an OrchestratorBlock
# instance with the base implementation patched out.
from unittest.mock import patch
from backend.blocks._base import Block
with patch.object(
OrchestratorBlock,
"extra_credit_charges",
Block.extra_credit_charges,
):
base_block = OrchestratorBlock()
assert (
base_block.extra_credit_charges(NodeExecutionStats(llm_call_count=10))
== 0
)
# ── charge_extra_iterations math ───────────────────────────────────
@@ -91,9 +78,11 @@ def patched_processor(monkeypatch):
Returns the processor and a list of credit amounts spent so tests can
assert on what was charged.
"""
from backend.executor import manager
Note: ``ExecutionProcessor.__new__()`` bypasses ``__init__`` — if
``__init__`` gains required state in the future this fixture will need
updating.
"""
spent: list[int] = []
class FakeDb:
@@ -156,7 +145,6 @@ class TestChargeExtraIterations:
@pytest.mark.asyncio
async def test_capped_at_max(self, monkeypatch, fake_node_exec):
"""Runaway llm_call_count is capped at _MAX_EXTRA_ITERATIONS."""
from backend.executor import manager
spent: list[int] = []
@@ -187,7 +175,6 @@ class TestChargeExtraIterations:
@pytest.mark.asyncio
async def test_zero_base_cost_skips_charge(self, monkeypatch, fake_node_exec):
from backend.executor import manager
spent: list[int] = []
@@ -215,7 +202,6 @@ class TestChargeExtraIterations:
@pytest.mark.asyncio
async def test_block_not_found_skips_charge(self, monkeypatch, fake_node_exec):
from backend.executor import manager
spent: list[int] = []
@@ -243,8 +229,6 @@ class TestChargeExtraIterations:
self, monkeypatch, fake_node_exec
):
"""Out-of-credits errors must propagate, not be silently swallowed."""
from backend.executor import manager
from backend.util.exceptions import InsufficientBalanceError
class FakeDb:
def spend_credits(self, *, user_id, cost, metadata):
@@ -280,7 +264,6 @@ class TestChargeNodeUsage:
self, monkeypatch, fake_node_exec
):
"""Nested tool charges should NOT inflate the per-execution counter."""
from backend.executor import manager
captured: dict = {}
@@ -313,7 +296,6 @@ class TestChargeNodeUsage:
self, monkeypatch, fake_node_exec
):
"""charge_node_usage should call _handle_low_balance when total_cost > 0."""
from backend.executor import manager
low_balance_calls: list[dict] = []
@@ -353,7 +335,6 @@ class TestChargeNodeUsage:
self, monkeypatch, fake_node_exec
):
"""charge_node_usage should NOT call _handle_low_balance when cost is 0."""
from backend.executor import manager
low_balance_calls: list = []
@@ -418,9 +399,6 @@ def gated_processor(monkeypatch):
llm_call_count, dry_run) and observe whether charge_extra_iterations
was called.
"""
from backend.data.execution import ExecutionStatus
from backend.data.model import NodeExecutionStats
from backend.executor import manager
calls: dict[str, list] = {
"charge_extra_iterations": [],
@@ -492,9 +470,7 @@ def gated_processor(monkeypatch):
fake_charge_extra,
)
def fake_low_balance(
self, db_client, user_id, current_balance, transaction_cost
):
def fake_low_balance(self, db_client, user_id, current_balance, transaction_cost):
calls["handle_low_balance"].append(
{
"user_id": user_id,
@@ -528,7 +504,6 @@ async def test_on_node_execution_charges_extra_iterations_when_gate_passes(
gated_processor,
):
"""COMPLETED + extra_credit_charges > 0 + not dry_run → charged."""
from backend.data.execution import ExecutionStatus
proc, calls, inner, fake_db, _ = gated_processor
inner["status"] = ExecutionStatus.COMPLETED
@@ -555,7 +530,6 @@ async def test_on_node_execution_charges_extra_iterations_when_gate_passes(
@pytest.mark.asyncio
async def test_on_node_execution_skips_when_status_not_completed(gated_processor):
from backend.data.execution import ExecutionStatus
proc, calls, inner, fake_db, _ = gated_processor
inner["status"] = ExecutionStatus.FAILED
@@ -579,7 +553,6 @@ async def test_on_node_execution_skips_when_status_not_completed(gated_processor
@pytest.mark.asyncio
async def test_on_node_execution_skips_when_extra_charges_zero(gated_processor):
from backend.data.execution import ExecutionStatus
proc, calls, inner, fake_db, _ = gated_processor
inner["status"] = ExecutionStatus.COMPLETED
@@ -604,7 +577,6 @@ async def test_on_node_execution_skips_when_extra_charges_zero(gated_processor):
@pytest.mark.asyncio
async def test_on_node_execution_skips_when_dry_run(gated_processor):
from backend.data.execution import ExecutionStatus
proc, calls, inner, fake_db, _ = gated_processor
inner["status"] = ExecutionStatus.COMPLETED
@@ -640,9 +612,6 @@ async def test_on_node_execution_insufficient_balance_records_error_and_notifies
- _handle_insufficient_funds_notif is called so the user is notified
- the structured ERROR log is the alerting hook
"""
from backend.data.execution import ExecutionStatus
from backend.executor import manager
from backend.util.exceptions import InsufficientBalanceError
proc, calls, inner, fake_db, _ = gated_processor
inner["status"] = ExecutionStatus.COMPLETED
@@ -698,13 +667,9 @@ async def _run_tool_exec_with_stats(
Used to prove the dry_run and error guards around charge_node_usage
behave as documented, and that InsufficientBalanceError propagates.
"""
import threading
from collections import defaultdict
from unittest.mock import AsyncMock, MagicMock, patch
from backend.blocks.orchestrator import ExecutionParams, OrchestratorBlock
from backend.data.execution import ExecutionContext
block = OrchestratorBlock()
# Mocked async DB client used inside orchestrator.
@@ -828,10 +793,6 @@ async def test_tool_execution_insufficient_balance_propagates():
If this leaked into a ToolCallResult the LLM loop would keep running
with 'tool failed' errors and the user would get unpaid work.
"""
from unittest.mock import AsyncMock
from backend.util.exceptions import InsufficientBalanceError
raising_charge = AsyncMock(
side_effect=InsufficientBalanceError(
user_id="u", message="nope", balance=0, amount=10
@@ -862,9 +823,6 @@ async def test_on_node_execution_failed_ibe_sends_notification(
execution_stats.error=IBE. on_node_execution's post-execution block then
sends the user notification so they understand why the run stopped.
"""
from backend.data.execution import ExecutionStatus
from backend.executor import manager
from backend.util.exceptions import InsufficientBalanceError
proc, calls, inner, fake_db, NodeExecutionStats = gated_processor
ibe = InsufficientBalanceError(
@@ -914,3 +872,102 @@ async def test_on_node_execution_failed_ibe_sends_notification(
assert calls["handle_insufficient_funds_notif"][0]["user_id"] == "u"
# charge_extra_iterations must NOT be called — status is FAILED.
assert calls["charge_extra_iterations"] == []
# ── Billing leak: non-IBE exception during extra-iteration charging ──
@pytest.mark.asyncio
async def test_on_node_execution_non_ibe_billing_failure_keeps_completed(
    monkeypatch,
    gated_processor,
):
    """When charge_extra_iterations raises a non-IBE exception (e.g. DB outage):

    - execution_stats.error stays None (node ran to completion)
    - status stays COMPLETED (work already done)
    - the billing_leak error is logged but does not corrupt execution_stats
    """
    proc, calls, inner, fake_db, _ = gated_processor
    # COMPLETED + llm_call_count > 1 + extra_charges > 0 arms the
    # post-execution billing gate so charge_extra_iterations is reached.
    inner["status"] = ExecutionStatus.COMPLETED
    inner["llm_call_count"] = 4
    fake_db.get_node = AsyncMock(return_value=_FakeNode(extra_charges=3))

    # Simulate an infrastructure failure (not a balance problem) during the
    # post-execution extra-iteration charge.
    async def raise_conn_error(self, node_exec, extra_iterations):
        raise ConnectionError("DB connection lost")

    monkeypatch.setattr(
        manager.ExecutionProcessor, "charge_extra_iterations", raise_conn_error
    )

    # Minimal (graph_stats, lock) pair as on_node_execution expects.
    stats_pair = (
        MagicMock(
            node_count=0,
            nodes_cputime=0,
            nodes_walltime=0,
            cost=0,
            node_error_count=0,
        ),
        threading.Lock(),
    )

    result_stats = await proc.on_node_execution(
        node_exec=_make_node_exec(dry_run=False),
        node_exec_progress=MagicMock(),
        nodes_input_masks=None,
        graph_stats_pair=stats_pair,
    )

    # error stays None — node completed, only billing failed.
    assert result_stats.error is None
    # No notification was sent (only IBE triggers notification).
    assert len(calls["handle_insufficient_funds_notif"]) == 0
# ── _charge_usage with execution_count=0 ──
class TestChargeUsageZeroExecutionCount:
    """Verify _charge_usage(node_exec, 0) does not invoke execution_usage_cost."""

    def test_execution_count_zero_skips_execution_tier(self, monkeypatch):
        """_charge_usage with execution_count=0 must not call execution_usage_cost."""
        # Records any call into the execution-tier pricing function; the
        # final assertion requires this to stay empty.
        execution_tier_called = []

        def fake_execution_usage_cost(count):
            execution_tier_called.append(count)
            return (100, count)

        # Credit amounts passed to spend_credits, for asserting what was charged.
        spent: list[int] = []

        class FakeDb:
            def spend_credits(self, *, user_id, cost, metadata):
                spent.append(cost)
                return 500  # remaining balance reported after the charge

        fake_block = MagicMock()
        fake_block.name = "FakeBlock"

        monkeypatch.setattr(manager, "get_db_client", lambda: FakeDb())
        monkeypatch.setattr(manager, "get_block", lambda block_id: fake_block)
        # Fixed block-usage cost of 10 credits and no per-input breakdown.
        monkeypatch.setattr(
            manager,
            "block_usage_cost",
            lambda block, input_data, **_kw: (10, {}),
        )
        monkeypatch.setattr(manager, "execution_usage_cost", fake_execution_usage_cost)

        # __new__ bypasses __init__ — same technique as the patched_processor
        # fixture; only the method under test is exercised.
        proc = manager.ExecutionProcessor.__new__(manager.ExecutionProcessor)

        # Minimal node-execution entry with just the fields _charge_usage reads.
        ne = MagicMock()
        ne.user_id = "u"
        ne.graph_exec_id = "ge"
        ne.graph_id = "g"
        ne.node_exec_id = "ne"
        ne.node_id = "n"
        ne.block_id = "b"
        ne.inputs = {}

        total_cost, remaining = proc._charge_usage(ne, 0)

        assert total_cost == 10  # block cost only
        assert remaining == 500
        assert spent == [10]
        # execution_usage_cost must NOT have been called
        assert execution_tier_called == []

View File

@@ -19,7 +19,7 @@ from sentry_sdk.api import flush as _sentry_flush
from sentry_sdk.api import get_current_scope as _sentry_get_current_scope
from backend.blocks import get_block
from backend.blocks._base import BlockSchema
from backend.blocks._base import Block, BlockSchema
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.io import AgentOutputBlock
from backend.blocks.mcp.block import MCPToolBlock
@@ -681,100 +681,9 @@ class ExecutionProcessor:
execution_stats.walltime = timing_info.wall_time
execution_stats.cputime = timing_info.cpu_time
# Charge extra iterations for blocks that opt into per-LLM-call
# billing (e.g. OrchestratorBlock in agent mode). The first call
# is already covered by _charge_usage(); each additional LLM call
# costs another base_cost. Skipped for dry runs and failed runs.
#
# InsufficientBalanceError here is a post-hoc billing leak — the
# work is already done but the user can no longer pay. We:
# 1. log at ERROR with structured fields so alerting can catch it
# 2. fire _handle_insufficient_funds_notif so the user is
# notified (mirrors the main queue path at ~line 1254)
# The run itself is kept COMPLETED (the block's outputs are
# already committed) — matching the documented "billing leak"
# contract rather than retroactively failing a successful run.
extra_iterations = (
node.block.extra_credit_charges(execution_stats)
if status == ExecutionStatus.COMPLETED
and not node_exec.execution_context.dry_run
else 0
await self._handle_post_execution_billing(
node, node_exec, execution_stats, status, log_metadata
)
if extra_iterations > 0:
try:
extra_cost, remaining_balance = await self.charge_extra_iterations(
node_exec,
extra_iterations,
)
if extra_cost > 0:
execution_stats.extra_cost += extra_cost
# Mirror the low-balance notification path in
# ``charge_node_usage`` so users are alerted when
# their balance crosses the threshold from these
# extra-iteration charges (sentry HIGH/MEDIUM bug
# PRRT_kwDOJKSTjM56Mibw).
await asyncio.to_thread(
self._handle_low_balance,
get_db_client(),
node_exec.user_id,
remaining_balance,
extra_cost,
)
except InsufficientBalanceError as e:
log_metadata.error(
"billing_leak: insufficient balance after "
f"{node.block.name} completed {extra_iterations} "
f"extra iterations",
extra={
"billing_leak": True,
"user_id": node_exec.user_id,
"graph_id": node_exec.graph_id,
"block_id": node_exec.block_id,
"extra_iterations": extra_iterations,
"error": str(e),
},
)
# NOTE: Do NOT set execution_stats.error here. The node ran
# to completion — only the post-hoc charge failed. Setting
# .error would (a) flip node_error_count below, creating an
# "errored COMPLETED node" inconsistency in the metrics, and
# (b) leak balance amounts into the persisted node_stats.
# The structured ERROR log above is the alerting hook;
# node_stats stays clean.
# Notify the user they're out of credits. Runs through
# Redis dedup (per user+graph) so repeat runs don't spam.
try:
await asyncio.to_thread(
self._handle_insufficient_funds_notif,
get_db_client(),
node_exec.user_id,
node_exec.graph_id,
e,
)
except Exception as notif_error: # pragma: no cover
log_metadata.warning(
f"Failed to send insufficient funds notification: {notif_error}"
)
except Exception as e:
# Unexpected billing failure (DB outage, network, etc.).
# Log at ERROR with structured fields and the same
# `billing_leak: True` marker so monitoring catches it
# alongside InsufficientBalanceError. The run is kept
# COMPLETED because the work is already done.
log_metadata.error(
f"billing_leak: failed to charge extra iterations "
f"for {node.block.name}",
extra={
"billing_leak": True,
"user_id": node_exec.user_id,
"graph_id": node_exec.graph_id,
"block_id": node_exec.block_id,
"extra_iterations": extra_iterations,
"error_type": type(e).__name__,
"error": str(e),
},
exc_info=True,
)
graph_stats, graph_stats_lock = graph_stats_pair
with graph_stats_lock:
@@ -811,31 +720,121 @@ class ExecutionProcessor:
db_client=db_client,
)
# If the node failed because a nested tool charge raised
# InsufficientBalanceError (orchestrator agent mode), the main
# queue's _charge_usage IBE notification path was bypassed (the
# initial charge succeeded — only the nested tool charge failed).
# Send the user notification here so they understand why their
# agent run stopped. Failures are logged but not raised so the
# node-execution path stays clean.
# If the node failed because a nested tool charge raised IBE,
# send the user notification so they understand why the run stopped.
if status == ExecutionStatus.FAILED and isinstance(
execution_stats.error, InsufficientBalanceError
):
try:
await asyncio.to_thread(
self._handle_insufficient_funds_notif,
get_db_client(),
node_exec.user_id,
node_exec.graph_id,
execution_stats.error,
)
except Exception as notif_error: # pragma: no cover
log_metadata.warning(
f"Failed to send insufficient funds notification: {notif_error}"
)
await self._try_send_insufficient_funds_notif(
node_exec.user_id,
node_exec.graph_id,
execution_stats.error,
log_metadata,
)
return execution_stats
    async def _try_send_insufficient_funds_notif(
        self,
        user_id: str,
        graph_id: str,
        error: InsufficientBalanceError,
        log_metadata: LogMetadata,
    ) -> None:
        """Send an insufficient-funds notification, swallowing failures.

        Args:
            user_id: Owner of the execution that ran out of credits.
            graph_id: Graph whose run triggered the notification.
            error: The InsufficientBalanceError being reported to the user.
            log_metadata: Structured logger used to record delivery failures.
        """
        try:
            # _handle_insufficient_funds_notif is dispatched via
            # asyncio.to_thread — presumably blocking (DB client) work that
            # must not run on the event loop. TODO confirm.
            await asyncio.to_thread(
                self._handle_insufficient_funds_notif,
                get_db_client(),
                user_id,
                graph_id,
                error,
            )
        except Exception as notif_error:  # pragma: no cover
            # Notification delivery is best-effort: a failure here must not
            # mask or replace the original billing error.
            log_metadata.warning(
                f"Failed to send insufficient funds notification: {notif_error}"
            )
    async def _handle_post_execution_billing(
        self,
        node: Node,
        node_exec: NodeExecutionEntry,
        execution_stats: NodeExecutionStats,
        status: ExecutionStatus,
        log_metadata: LogMetadata,
    ) -> None:
        """Charge extra iterations for blocks that opt into per-LLM-call billing.

        The first LLM call is already covered by ``_charge_usage()``; each
        additional call costs another ``base_cost``. Skipped for dry runs and
        failed runs.

        InsufficientBalanceError here is a post-hoc billing leak: the work is
        already done but the user can no longer pay. The run stays COMPLETED and
        the error is logged with ``billing_leak: True`` for alerting.

        Args:
            node: The node whose block may opt into extra charges.
            node_exec: Execution entry identifying user/graph/block for billing.
            execution_stats: Mutated in place — ``extra_cost`` is incremented
                when the extra-iteration charge succeeds.
            status: Final node status; only COMPLETED, non-dry-run executions
                are charged.
            log_metadata: Structured logger used for billing_leak alerts.
        """
        # Ask the block how many extra iterations to bill; forced to 0 for
        # failed runs and dry runs.
        extra_iterations = (
            node.block.extra_credit_charges(execution_stats)
            if status == ExecutionStatus.COMPLETED
            and not node_exec.execution_context.dry_run
            else 0
        )
        if extra_iterations <= 0:
            return
        try:
            extra_cost, remaining_balance = await self.charge_extra_iterations(
                node_exec,
                extra_iterations,
            )
            if extra_cost > 0:
                execution_stats.extra_cost += extra_cost
                # Low-balance check runs only when something was actually
                # charged; dispatched via asyncio.to_thread to keep the
                # event loop free.
                await asyncio.to_thread(
                    self._handle_low_balance,
                    get_db_client(),
                    node_exec.user_id,
                    remaining_balance,
                    extra_cost,
                )
        except InsufficientBalanceError as e:
            # Structured ERROR log with the billing_leak marker is the
            # alerting hook for post-hoc charge failures.
            log_metadata.error(
                "billing_leak: insufficient balance after "
                f"{node.block.name} completed {extra_iterations} "
                f"extra iterations",
                extra={
                    "billing_leak": True,
                    "user_id": node_exec.user_id,
                    "graph_id": node_exec.graph_id,
                    "block_id": node_exec.block_id,
                    "extra_iterations": extra_iterations,
                    "error": str(e),
                },
            )
            # Do NOT set execution_stats.error — the node ran to completion,
            # only the post-hoc charge failed. See class-level billing-leak
            # contract documentation.
            await self._try_send_insufficient_funds_notif(
                node_exec.user_id,
                node_exec.graph_id,
                e,
                log_metadata,
            )
        except Exception as e:
            # Non-IBE billing failure (e.g. DB outage): logged with the same
            # billing_leak marker so monitoring catches it alongside IBE; the
            # run is kept COMPLETED because the work is already done.
            log_metadata.error(
                f"billing_leak: failed to charge extra iterations "
                f"for {node.block.name}",
                extra={
                    "billing_leak": True,
                    "user_id": node_exec.user_id,
                    "graph_id": node_exec.graph_id,
                    "block_id": node_exec.block_id,
                    "extra_iterations": extra_iterations,
                    "error_type": type(e).__name__,
                    "error": str(e),
                },
                exc_info=True,
            )
@async_time_measured
async def _on_node_execution(
self,
@@ -1065,7 +1064,7 @@ class ExecutionProcessor:
def _resolve_block_cost(
self,
node_exec: NodeExecutionEntry,
) -> tuple[Any, int, dict]:
) -> tuple[Block | None, int, dict]:
"""Look up the block and compute its base usage cost for an exec.
Shared by :meth:`_charge_usage` and :meth:`charge_extra_iterations`
@@ -1211,6 +1210,22 @@ class ExecutionProcessor:
self._charge_extra_iterations_sync, node_exec, capped
)
    def _charge_and_check_balance(
        self,
        node_exec: NodeExecutionEntry,
    ) -> tuple[int, int]:
        """Charge usage and check low balance in a single thread-pool worker.

        Combines ``_charge_usage`` and ``_handle_low_balance`` to avoid
        dispatching two thread-pool calls per tool execution.

        Args:
            node_exec: Execution entry identifying the user and node to charge.

        Returns:
            ``(total_cost, remaining_balance)`` as produced by ``_charge_usage``.
        """
        # execution_count=0: nested tool charges must not advance the
        # per-execution cost tier (see charge_node_usage's docstring).
        total_cost, remaining = self._charge_usage(node_exec, 0)
        if total_cost > 0:
            # Only run the low-balance check when something was charged.
            self._handle_low_balance(
                get_db_client(), node_exec.user_id, remaining, total_cost
            )
        return total_cost, remaining
async def charge_node_usage(
self,
node_exec: NodeExecutionEntry,
@@ -1229,18 +1244,7 @@ class ExecutionProcessor:
sub-steps of a single block run from the user's perspective and
should not push them into higher per-execution cost tiers.
"""
total_cost, remaining = await asyncio.to_thread(
self._charge_usage, node_exec, 0
)
if total_cost > 0:
await asyncio.to_thread(
self._handle_low_balance,
get_db_client(),
node_exec.user_id,
remaining,
total_cost,
)
return total_cost, remaining
return await asyncio.to_thread(self._charge_and_check_balance, node_exec)
@time_measured
def _on_graph_execution(