Merge remote-tracking branch 'origin/codex/platform-cost-tracking' into combined-preview-test

This commit is contained in:
Zamil Majdy
2026-04-02 18:32:06 +02:00
26 changed files with 173 additions and 34 deletions

View File

@@ -18,6 +18,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -358,6 +359,7 @@ class AIShortformVideoCreatorBlock(Block):
execution_context=execution_context,
return_format="for_block_output",
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "video_url", stored_url
@@ -565,6 +567,7 @@ class AIAdMakerVideoCreatorBlock(Block):
execution_context=execution_context,
return_format="for_block_output",
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "video_url", stored_url
@@ -760,4 +763,5 @@ class AIScreenshotToVideoAdBlock(Block):
execution_context=execution_context,
return_format="for_block_output",
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "video_url", stored_url

View File

@@ -17,7 +17,7 @@ from backend.blocks.apollo.models import (
PrimaryPhone,
SearchOrganizationsRequest,
)
from backend.data.model import CredentialsField, SchemaField
from backend.data.model import CredentialsField, NodeExecutionStats, SchemaField
class SearchOrganizationsBlock(Block):
@@ -218,6 +218,7 @@ To find IDs, identify the values for organization_id when you call this endpoint
) -> BlockOutput:
query = SearchOrganizationsRequest(**input_data.model_dump())
organizations = await self.search_organizations(query, credentials)
self.merge_stats(NodeExecutionStats(output_size=len(organizations)))
for organization in organizations:
yield "organization", organization
yield "organizations", organizations

View File

@@ -21,7 +21,7 @@ from backend.blocks.apollo.models import (
SearchPeopleRequest,
SenorityLevels,
)
from backend.data.model import CredentialsField, SchemaField
from backend.data.model import CredentialsField, NodeExecutionStats, SchemaField
class SearchPeopleBlock(Block):
@@ -366,4 +366,5 @@ class SearchPeopleBlock(Block):
*(enrich_or_fallback(person) for person in people)
)
self.merge_stats(NodeExecutionStats(output_size=len(people)))
yield "people", people

View File

@@ -13,7 +13,7 @@ from backend.blocks.apollo._auth import (
ApolloCredentialsInput,
)
from backend.blocks.apollo.models import Contact, EnrichPersonRequest
from backend.data.model import CredentialsField, SchemaField
from backend.data.model import CredentialsField, NodeExecutionStats, SchemaField
class GetPersonDetailBlock(Block):
@@ -141,4 +141,5 @@ class GetPersonDetailBlock(Block):
**kwargs,
) -> BlockOutput:
query = EnrichPersonRequest(**input_data.model_dump())
self.merge_stats(NodeExecutionStats(output_size=1))
yield "contact", await self.enrich_person(query, credentials)

View File

@@ -17,6 +17,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -342,6 +343,7 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
# Determine result object shape & filter out empty formats
main_result, results = self.process_execution_results(results)
self.merge_stats(NodeExecutionStats(output_size=1))
if main_result:
yield "main_result", main_result
yield "results", results
@@ -467,6 +469,7 @@ class InstantiateCodeSandboxBlock(Block, BaseE2BExecutorMixin):
setup_commands=input_data.setup_commands,
timeout=input_data.timeout,
)
self.merge_stats(NodeExecutionStats(output_size=1))
if sandbox_id:
yield "sandbox_id", sandbox_id
else:
@@ -577,6 +580,7 @@ class ExecuteCodeStepBlock(Block, BaseE2BExecutorMixin):
# Determine result object shape & filter out empty formats
main_result, results = self.process_execution_results(results)
self.merge_stats(NodeExecutionStats(output_size=1))
if main_result:
yield "main_result", main_result
yield "results", results

View File

@@ -15,7 +15,12 @@ from backend.blocks._base import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import APIKeyCredentials, CredentialsField, SchemaField
from backend.data.model import (
APIKeyCredentials,
CredentialsField,
NodeExecutionStats,
SchemaField,
)
from backend.util.type import MediaFileType
from ._api import (
@@ -195,6 +200,7 @@ class GetLinkedinProfileBlock(Block):
include_social_media=input_data.include_social_media,
include_extra=input_data.include_extra,
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "profile", profile
except Exception as e:
logger.error(f"Error fetching LinkedIn profile: {str(e)}")
@@ -341,6 +347,7 @@ class LinkedinPersonLookupBlock(Block):
include_similarity_checks=input_data.include_similarity_checks,
enrich_profile=input_data.enrich_profile,
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "lookup_result", lookup_result
except Exception as e:
logger.error(f"Error looking up LinkedIn profile: {str(e)}")
@@ -443,6 +450,7 @@ class LinkedinRoleLookupBlock(Block):
company_name=input_data.company_name,
enrich_profile=input_data.enrich_profile,
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "role_lookup_result", role_lookup_result
except Exception as e:
logger.error(f"Error looking up role in company: {str(e)}")
@@ -523,6 +531,7 @@ class GetLinkedinProfilePictureBlock(Block):
credentials=credentials,
linkedin_profile_url=input_data.linkedin_profile_url,
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "profile_picture_url", profile_picture
except Exception as e:
logger.error(f"Error getting profile picture: {str(e)}")

View File

@@ -18,7 +18,7 @@ from backend.blocks.fal._auth import (
FalCredentialsInput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
from backend.data.model import NodeExecutionStats, SchemaField
from backend.util.file import store_media_file
from backend.util.request import ClientResponseError, Requests
from backend.util.type import MediaFileType
@@ -230,6 +230,7 @@ class AIVideoGeneratorBlock(Block):
execution_context=execution_context,
return_format="for_block_output",
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "video_url", stored_url
except Exception as e:
error_message = str(e)

View File

@@ -14,6 +14,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -117,6 +118,7 @@ class GoogleMapsSearchBlock(Block):
input_data.radius,
input_data.max_results,
)
self.merge_stats(NodeExecutionStats(output_size=len(places)))
for place in places:
yield "place", place

View File

@@ -14,6 +14,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -227,6 +228,7 @@ class IdeogramModelBlock(Block):
image_url=result,
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "result", result
async def run_model(

View File

@@ -10,7 +10,7 @@ from backend.blocks.jina._auth import (
JinaCredentialsField,
JinaCredentialsInput,
)
from backend.data.model import SchemaField
from backend.data.model import NodeExecutionStats, SchemaField
from backend.util.request import Requests
@@ -45,5 +45,13 @@ class JinaEmbeddingBlock(Block):
}
data = {"input": input_data.texts, "model": input_data.model}
response = await Requests().post(url, headers=headers, json=data)
embeddings = [e["embedding"] for e in response.json()["data"]]
resp_json = response.json()
embeddings = [e["embedding"] for e in resp_json["data"]]
usage = resp_json.get("usage", {})
if usage.get("total_tokens"):
self.merge_stats(
NodeExecutionStats(
input_token_count=usage.get("total_tokens", 0),
)
)
yield "embeddings", embeddings

View File

@@ -8,6 +8,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -153,6 +154,7 @@ class AddMemoryBlock(Block, Mem0Base):
messages,
**params,
)
self.merge_stats(NodeExecutionStats(output_size=1))
results = result.get("results", [])
yield "results", results
@@ -255,6 +257,7 @@ class SearchMemoryBlock(Block, Mem0Base):
result: list[dict[str, Any]] = client.search(
input_data.query, version="v2", filters=filters
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "memories", result
except Exception as e:
@@ -340,6 +343,7 @@ class GetAllMemoriesBlock(Block, Mem0Base):
filters=filters,
version="v2",
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "memories", memories
@@ -434,6 +438,7 @@ class GetLatestMemoryBlock(Block, Mem0Base):
filters=filters,
version="v2",
)
self.merge_stats(NodeExecutionStats(output_size=1))
if memories:
# Return the latest memory (first in the list as they're sorted by recency)

View File

@@ -10,7 +10,7 @@ from backend.blocks.nvidia._auth import (
NvidiaCredentialsField,
NvidiaCredentialsInput,
)
from backend.data.model import SchemaField
from backend.data.model import NodeExecutionStats, SchemaField
from backend.util.request import Requests
from backend.util.type import MediaFileType
@@ -69,6 +69,7 @@ class NvidiaDeepfakeDetectBlock(Block):
data = response.json()
result = data.get("data", [{}])[0]
self.merge_stats(NodeExecutionStats(output_size=1))
# Get deepfake probability from first bounding box if any
deepfake_prob = 0.0

View File

@@ -17,7 +17,12 @@ from backend.blocks.replicate._auth import (
ReplicateCredentialsInput,
)
from backend.blocks.replicate._helper import ReplicateOutputs, extract_result
from backend.data.model import APIKeyCredentials, CredentialsField, SchemaField
from backend.data.model import (
APIKeyCredentials,
CredentialsField,
NodeExecutionStats,
SchemaField,
)
from backend.util.exceptions import BlockExecutionError, BlockInputError
logger = logging.getLogger(__name__)
@@ -108,6 +113,7 @@ class ReplicateModelBlock(Block):
result = await self.run_model(
model_ref, input_data.model_inputs, credentials.api_key
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "result", result
yield "status", "succeeded"
yield "model_name", input_data.model_name

View File

@@ -16,6 +16,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -185,6 +186,7 @@ class ScreenshotWebPageBlock(Block):
block_chats=input_data.block_chats,
cache=input_data.cache,
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "image", screenshot_data["image"]
except Exception as e:
yield "error", str(e)

View File

@@ -15,6 +15,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -146,6 +147,7 @@ class GetWeatherInformationBlock(Block, GetRequest):
weather_data = await self.get_request(url, json=True)
if "main" in weather_data and "weather" in weather_data:
self.merge_stats(NodeExecutionStats(output_size=1))
yield "temperature", str(weather_data["main"]["temp"])
yield "humidity", str(weather_data["main"]["humidity"])
yield "condition", weather_data["weather"][0]["description"]

View File

@@ -23,7 +23,7 @@ from backend.blocks.smartlead.models import (
SaveSequencesResponse,
Sequence,
)
from backend.data.model import CredentialsField, SchemaField
from backend.data.model import CredentialsField, NodeExecutionStats, SchemaField
class CreateCampaignBlock(Block):
@@ -100,6 +100,7 @@ class CreateCampaignBlock(Block):
**kwargs,
) -> BlockOutput:
response = await self.create_campaign(input_data.name, credentials)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "id", response.id
yield "name", response.name
@@ -226,6 +227,7 @@ class AddLeadToCampaignBlock(Block):
response = await self.add_leads_to_campaign(
input_data.campaign_id, input_data.lead_list, credentials
)
self.merge_stats(NodeExecutionStats(output_size=len(input_data.lead_list)))
yield "campaign_id", input_data.campaign_id
yield "upload_count", response.upload_count
@@ -321,6 +323,7 @@ class SaveCampaignSequencesBlock(Block):
response = await self.save_campaign_sequences(
input_data.campaign_id, input_data.sequences, credentials
)
self.merge_stats(NodeExecutionStats(output_size=1))
if response.data:
yield "data", response.data

View File

@@ -15,6 +15,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -181,6 +182,7 @@ class CreateTalkingAvatarVideoBlock(Block):
execution_context=execution_context,
return_format="for_block_output",
)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "video_url", stored_url
return
elif status_response["status"] == "error":

View File

@@ -13,6 +13,7 @@ from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
)
from backend.integrations.providers import ProviderName
@@ -104,4 +105,5 @@ class UnrealTextToSpeechBlock(Block):
input_data.text,
input_data.voice_id,
)
self.merge_stats(NodeExecutionStats(output_size=len(input_data.text)))
yield "mp3_url", api_response["OutputUri"]

View File

@@ -19,6 +19,7 @@ from backend.blocks._base import (
from backend.data.model import (
CredentialsField,
CredentialsMetaInput,
NodeExecutionStats,
SchemaField,
UserPasswordCredentials,
)
@@ -170,6 +171,7 @@ class TranscribeYoutubeVideoBlock(Block):
transcript = self.get_transcript(video_id, credentials)
transcript_text = self.format_transcript(transcript=transcript)
self.merge_stats(NodeExecutionStats(output_size=1))
# Only yield after all operations succeed
yield "video_id", video_id
yield "transcript", transcript_text

View File

@@ -21,7 +21,7 @@ from backend.blocks.zerobounce._auth import (
ZeroBounceCredentials,
ZeroBounceCredentialsInput,
)
from backend.data.model import CredentialsField, SchemaField
from backend.data.model import CredentialsField, NodeExecutionStats, SchemaField
class Response(BaseModel):
@@ -177,5 +177,6 @@ class ValidateEmailsBlock(Block):
)
response_model = Response(**response.__dict__)
self.merge_stats(NodeExecutionStats(output_size=1))
yield "response", response_model

View File

@@ -4,12 +4,15 @@ Both the baseline (OpenRouter) and SDK (Anthropic) service layers need to:
1. Append a ``Usage`` record to the session.
2. Log the turn's token counts.
3. Record weighted usage in Redis for rate-limiting.
4. Write a PlatformCostLog entry for admin cost tracking.
This module extracts that common logic so both paths stay in sync.
"""
import logging
from backend.data.platform_cost import PlatformCostEntry, log_platform_cost_safe
from .model import ChatSession, Usage
from .rate_limit import record_token_usage
@@ -95,4 +98,47 @@ async def persist_and_record_usage(
except Exception as usage_err:
logger.warning(f"{log_prefix} Failed to record token usage: {usage_err}")
# Log to PlatformCostLog for admin cost dashboard
if user_id and total_tokens > 0:
cost_float = None
if cost_usd is not None:
try:
cost_float = float(cost_usd)
except (ValueError, TypeError):
pass
cost_microdollars = (
int(cost_float * 1_000_000) if cost_float is not None else None
)
session_id = session.session_id if session else None
if cost_float is not None:
tracking_type = "cost_usd"
tracking_amount = cost_float
else:
tracking_type = "tokens"
tracking_amount = total_tokens
await log_platform_cost_safe(
PlatformCostEntry(
user_id=user_id,
graph_exec_id=session_id,
block_id="copilot",
block_name=f"copilot:{log_prefix.strip(' []')}".rstrip(":"),
provider="open_router",
credential_id="copilot_system",
cost_microdollars=cost_microdollars,
input_tokens=prompt_tokens,
output_tokens=completion_tokens,
model=None,
metadata={
"tracking_type": tracking_type,
"tracking_amount": tracking_amount,
"cache_read_tokens": cache_read_tokens,
"cache_creation_tokens": cache_creation_tokens,
"source": "copilot",
},
)
)
return total_tokens

View File

@@ -852,6 +852,9 @@ class NodeExecutionStats(BaseModel):
current_stats = self.model_dump()
for key, value in stats_dict.items():
if value is None:
# Never overwrite an existing value with None
continue
if key not in current_stats:
# Field doesn't exist yet, just set it
setattr(self, key, value)

View File

@@ -2063,6 +2063,49 @@ class ExecutionManager(AppProcess):
# ------- UTILITIES ------- #
def _resolve_tracking(
provider: str,
block_name: str,
stats: NodeExecutionStats,
input_data: dict[str, Any],
) -> tuple[str, float]:
"""Return (tracking_type, tracking_amount) based on provider billing model."""
# 1. Provider returned actual USD cost (OpenRouter, Exa)
if stats.provider_cost is not None:
return "cost_usd", stats.provider_cost
# 2. LLM providers: track by tokens
if stats.input_token_count or stats.output_token_count:
return "tokens", float(
(stats.input_token_count or 0) + (stats.output_token_count or 0)
)
# 3. Provider-specific billing models
# TTS: billed per character of input text
if provider == "unreal_speech":
text = input_data.get("text", "")
return "characters", float(len(text)) if isinstance(text, str) else 0.0
# D-ID + ElevenLabs voice: billed per character of script
if provider in ("d_id", "elevenlabs"):
text = input_data.get("script_input", "") or input_data.get("text", "")
return "characters", float(len(text)) if isinstance(text, str) else 0.0
# E2B: billed per second of sandbox time
if provider == "e2b":
return "sandbox_seconds", round(stats.walltime, 3) if stats.walltime else 0.0
# Video/image gen: walltime includes queue + generation + polling
if provider in ("fal", "revid", "replicate"):
return "walltime_seconds", round(stats.walltime, 3) if stats.walltime else 0.0
# Per-request: Google Maps, Ideogram, Nvidia, Apollo, etc.
# All billed per API call - count 1 per block execution.
# output_size captured separately for volume estimation.
return "per_run", 1.0
async def _log_system_credential_cost(
node_exec: NodeExecutionEntry,
block: "Block",
@@ -2100,25 +2143,13 @@ async def _log_system_credential_cost(
if stats.provider_cost is not None:
cost_microdollars = int(stats.provider_cost * 1_000_000)
# Determine tracking type and amount:
# - cost_usd: provider returned actual dollar cost
# - tokens: LLM provider with token counts
# - duration_seconds: billed by execution time (E2B, video gen)
# - per_run: flat per-request billing
if stats.provider_cost is not None:
tracking_type = "cost_usd"
tracking_amount = stats.provider_cost
elif stats.input_token_count or stats.output_token_count:
tracking_type = "tokens"
tracking_amount = (stats.input_token_count or 0) + (
stats.output_token_count or 0
)
elif stats.walltime and stats.walltime > 0:
tracking_type = "duration_seconds"
tracking_amount = round(stats.walltime, 3)
else:
tracking_type = "per_run"
tracking_amount = 1
provider_name = cred_data.get("provider", "unknown")
tracking_type, tracking_amount = _resolve_tracking(
provider=provider_name,
block_name=block.name,
stats=stats,
input_data=input_data,
)
meta: dict[str, Any] = {
"tracking_type": tracking_type,
@@ -2138,7 +2169,7 @@ async def _log_system_credential_cost(
node_id=node_exec.node_id,
block_id=node_exec.block_id,
block_name=block.name,
provider=cred_data.get("provider", "unknown"),
provider=provider_name,
credential_id=cred_id,
cost_microdollars=cost_microdollars,
input_tokens=stats.input_token_count or None,

View File

@@ -11,7 +11,7 @@ CREATE TABLE "PlatformCostLog" (
"blockName" TEXT NOT NULL,
"provider" TEXT NOT NULL,
"credentialId" TEXT NOT NULL,
"costMicrodollars" INTEGER,
"costMicrodollars" BIGINT,
"inputTokens" INTEGER,
"outputTokens" INTEGER,
"dataSize" INTEGER,

View File

@@ -838,7 +838,7 @@ model PlatformCostLog {
credentialId String
// Cost in microdollars (1 USD = 1,000,000). Null if unknown.
costMicrodollars Int?
costMicrodollars BigInt?
inputTokens Int?
outputTokens Int?

View File

@@ -369,7 +369,7 @@ function LogsTable({
{new Date(log.created_at).toLocaleString()}
</td>
<td className="px-3 py-2 text-xs">
{log.email || log.user_id.slice(0, 8)}
{log.email || log.user_id?.slice(0, 8) || "Deleted user"}
</td>
<td className="px-3 py-2 text-xs font-medium">
{log.block_name}