fix(backend): address PR review — email masking, semaphore, openrouter cost style

- Mask user emails in admin API responses (dashboard + logs) to reduce
  PII exposure in proxy/CDN logs; _mask_email() shows first 2 chars only
- Add _log_semaphore(50) in platform_cost.py to bound concurrent DB inserts
  and provide back-pressure under high load
- Refactor extract_openrouter_cost() to use try/except AttributeError
  instead of getattr/hasattr, and log a WARNING when _response is missing
  so SDK changes are detectable
- Add comment to usePlatformCostContent.ts explaining why server actions
  are used instead of React Query (server-side withRoleAccess constraint)
This commit is contained in:
Zamil Majdy
2026-04-07 16:38:07 +07:00
parent c9461836c6
commit a616e5a060
3 changed files with 44 additions and 14 deletions

View File

@@ -777,13 +777,19 @@ def extract_openrouter_cost(response) -> float | None:
OpenRouter returns the per-request USD cost in a response header. The
OpenAI SDK exposes the raw httpx response via an undocumented `_response`
attribute. If the SDK ever drops or renames that attribute, we silently
degrade to no cost tracking rather than raising.
attribute. We use try/except AttributeError so that if the SDK ever drops
or renames that attribute, the warning is visible in logs rather than
silently degrading to no cost tracking.
"""
try:
raw_resp = getattr(response, "_response", None)
if raw_resp is None or not hasattr(raw_resp, "headers"):
return None
raw_resp = response._response # type: ignore[attr-defined]
except AttributeError:
logger.warning(
"OpenAI SDK response missing _response attribute"
" — OpenRouter cost tracking unavailable"
)
return None
try:
cost_header = raw_resp.headers.get("x-total-cost")
if not cost_header:
return None

View File

@@ -86,10 +86,16 @@ async def log_platform_cost(entry: PlatformCostEntry) -> None:
)
# Bound the number of concurrent cost-log DB inserts to prevent unbounded
# task/connection growth under sustained load or DB slowness.
_log_semaphore = asyncio.Semaphore(50)
async def log_platform_cost_safe(entry: PlatformCostEntry) -> None:
"""Fire-and-forget wrapper that never raises."""
try:
await log_platform_cost(entry)
async with _log_semaphore:
await log_platform_cost(entry)
except Exception:
logger.exception(
"Failed to log platform cost for user=%s provider=%s block=%s",
@@ -101,12 +107,8 @@ async def log_platform_cost_safe(entry: PlatformCostEntry) -> None:
# Hold strong references to in-flight log tasks to prevent GC.
# Tasks remove themselves on completion via add_done_callback.
#
# NOTE: this set is intentionally unbounded. Under sustained high load or DB
# slowness the set could grow without limit. Adding a bounded asyncio.Semaphore
# would provide back-pressure but is deferred until we observe memory pressure
# in production. The set is small in practice because log inserts are fast
# (sub-millisecond on a healthy DB).
# Concurrent DB inserts are bounded by _log_semaphore (50) to provide
# back-pressure under high load or DB slowness.
_pending_log_tasks: set["asyncio.Task[None]"] = set()
@@ -141,6 +143,23 @@ def _json_or_none(data: dict[str, Any] | None) -> str | None:
return json.dumps(data)
def _mask_email(email: str | None) -> str | None:
"""Mask an email address to reduce PII exposure in admin API responses.
Turns 'user@example.com' into 'us***@example.com'.
Handles short local parts gracefully (e.g. 'a@b.com''a***@b.com').
"""
if not email:
return email
at = email.find("@")
if at < 0:
return "***"
local = email[:at]
domain = email[at:]
visible = local[:2] if len(local) >= 2 else local[:1]
return f"{visible}***{domain}"
class ProviderCostSummary(BaseModel):
provider: str
tracking_type: str | None = None
@@ -304,7 +323,7 @@ async def get_platform_cost_dashboard(
by_user=[
UserCostSummary(
user_id=r.get("user_id"),
email=r.get("email"),
email=_mask_email(r.get("email")),
total_cost_microdollars=r["total_cost"],
total_input_tokens=r["total_input_tokens"],
total_output_tokens=r["total_output_tokens"],
@@ -378,7 +397,7 @@ async def get_platform_cost_logs(
id=r["id"],
created_at=r["created_at"],
user_id=r.get("user_id"),
email=r.get("email"),
email=_mask_email(r.get("email")),
graph_exec_id=r.get("graph_exec_id"),
node_exec_id=r.get("node_exec_id"),
block_name=r["block_name"],

View File

@@ -49,6 +49,11 @@ export function usePlatformCostContent(searchParams: InitialSearchParams) {
const [userInput, setUserInput] = useState(userFilter);
useEffect(() => {
// Fetching is triggered only on URL param changes (user-driven navigation),
// so rapid re-fetches are naturally debounced by the URL update cycle.
// React Query is not used here because this component calls 'use server'
// actions that run server-side (withRoleAccess wrapping); React Query hooks
// from Orval are browser-only and cannot enforce server-side role checks.
async function load() {
setLoading(true);
setError(null);