feat(platform): Add AllQuiet alert integration alongside Discord alerts

- Added system_alert method to NotificationManager that sends both Discord and AllQuiet alerts
- Implemented correlation IDs for all system alerts to prevent duplicate incidents:
  - Late executions: Based on threshold, count, and affected users
  - Block errors: Based on affected blocks and date
  - Balance alerts: Based on user ID
  - Retry failures: Based on function, context, and error type
- Updated all alert locations to use NotificationManager.system_alert() method
- Added AllQuiet webhook URL configuration in settings
- Maintained backward compatibility with existing Discord alerts

AllQuiet alerts are only sent when correlation_id is provided, ensuring
controlled rollout. Severity levels (critical/warning/minor) and extra
attributes provide better incident management and debugging context.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2025-10-21 15:31:18 -05:00
parent eba67e0a4b
commit c559431131
7 changed files with 218 additions and 18 deletions

View File

@@ -1217,8 +1217,23 @@ class ExecutionProcessor:
f"[View User Details]({base_url}/admin/spending?search={user_email})"
)
get_notification_manager_client().discord_system_alert(
alert_message, DiscordChannel.PRODUCT
# Send both Discord and AllQuiet alerts
correlation_id = f"insufficient_funds_{user_id}"
get_notification_manager_client().system_alert(
content=alert_message,
channel=DiscordChannel.PRODUCT,
correlation_id=correlation_id,
severity='critical',
status='open',
extra_attributes={
'user_id': user_id,
'user_email': user_email or 'unknown',
'balance': f"${e.balance / 100:.2f}",
'attempted_cost': f"${abs(e.amount) / 100:.2f}",
'shortfall': f"${abs(shortfall) / 100:.2f}",
'agent_name': metadata.name if metadata else 'Unknown',
}
)
except Exception as alert_error:
logger.error(
@@ -1265,8 +1280,22 @@ class ExecutionProcessor:
f"Transaction cost: ${transaction_cost / 100:.2f}\n"
f"[View User Details]({base_url}/admin/spending?search={user_email})"
)
get_notification_manager_client().discord_system_alert(
alert_message, DiscordChannel.PRODUCT
# Send both Discord and AllQuiet alerts
correlation_id = f"low_balance_{user_id}"
get_notification_manager_client().system_alert(
content=alert_message,
channel=DiscordChannel.PRODUCT,
correlation_id=correlation_id,
severity='warning',
status='open',
extra_attributes={
'user_id': user_id,
'user_email': user_email or 'unknown',
'current_balance': f"${current_balance / 100:.2f}",
'transaction_cost': f"${transaction_cost / 100:.2f}",
'threshold': f"${LOW_BALANCE_THRESHOLD / 100:.2f}",
}
)
except Exception as e:
logger.error(f"Failed to send low balance Discord alert: {e}")

View File

@@ -12,7 +12,7 @@ from backend.util.clients import (
get_database_manager_client,
get_notification_manager_client,
)
from backend.util.metrics import sentry_capture_error
from backend.util.metrics import sentry_capture_error, DiscordChannel
from backend.util.settings import Config
logger = logging.getLogger(__name__)
@@ -75,7 +75,28 @@ class BlockErrorMonitor:
if critical_alerts:
msg = "Block Error Rate Alert:\n\n" + "\n\n".join(critical_alerts)
self.notification_client.discord_system_alert(msg)
# Send alert with correlation ID for block errors
# We'll create a simple hash of the block IDs that have errors
blocks_with_errors = [
stats.block_id for name, stats in block_stats.items()
if stats.total_executions >= 10 and stats.error_rate >= threshold * 100
]
correlation_id = f"block_errors_{len(blocks_with_errors)}_blocks_{end_time.date()}"
self.notification_client.system_alert(
content=msg,
channel=DiscordChannel.PLATFORM,
correlation_id=correlation_id,
severity='warning',
status='open',
extra_attributes={
'blocks_affected': str(len(critical_alerts)),
'date': end_time.date().isoformat(),
'threshold': f"{threshold * 100}%",
}
)
logger.info(
f"Sent block error rate alert for {len(critical_alerts)} blocks"
)
@@ -87,7 +108,22 @@ class BlockErrorMonitor:
block_stats, start_time, end_time
)
if top_blocks_msg:
self.notification_client.discord_system_alert(top_blocks_msg)
# Daily summary gets a date-based correlation ID
correlation_id = f"block_error_daily_summary_{end_time.date()}"
self.notification_client.system_alert(
content=top_blocks_msg,
channel=DiscordChannel.PLATFORM,
correlation_id=correlation_id,
severity='minor',
status='open',
extra_attributes={
'type': 'daily_summary',
'date': end_time.date().isoformat(),
'top_blocks_count': str(self.include_top_blocks),
}
)
logger.info("Sent top blocks summary")
return "Sent top blocks summary"
@@ -100,7 +136,22 @@ class BlockErrorMonitor:
error = Exception(f"Error checking block error rates: {e}")
msg = str(error)
sentry_capture_error(error)
self.notification_client.discord_system_alert(msg)
# Send error alert with generic correlation ID
correlation_id = "block_error_monitoring_failure"
self.notification_client.system_alert(
content=msg,
channel=DiscordChannel.PLATFORM,
correlation_id=correlation_id,
severity='critical',
status='open',
extra_attributes={
'error_type': type(e).__name__,
'error_message': str(e)[:200],
}
)
return msg
def _get_block_stats_from_db(

View File

@@ -8,7 +8,7 @@ from backend.util.clients import (
get_database_manager_client,
get_notification_manager_client,
)
from backend.util.metrics import sentry_capture_error
from backend.util.metrics import DiscordChannel, sentry_capture_error
from backend.util.settings import Config
logger = logging.getLogger(__name__)
@@ -100,7 +100,28 @@ class LateExecutionMonitor:
msg = str(error)
sentry_capture_error(error)
self.notification_client.discord_system_alert(msg)
# Generate correlation ID based on the threshold and number of late executions
correlation_id = f"late_execution_{self.config.execution_late_notification_threshold_secs}s_{num_total_late}_execs_{num_users}_users"
# Send both Discord and AllQuiet alerts
self.notification_client.system_alert(
content=msg,
channel=DiscordChannel.PLATFORM,
correlation_id=correlation_id,
severity="critical",
status="open",
extra_attributes={
"total_late_executions": str(num_total_late),
"queued_executions": str(num_queued),
"running_executions": str(num_running),
"affected_users": str(num_users),
"threshold_seconds": str(
self.config.execution_late_notification_threshold_secs
),
},
)
return msg

View File

@@ -1,7 +1,7 @@
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable
from typing import Awaitable, Callable, Literal
import aio_pika
from prisma.enums import NotificationType
@@ -33,7 +33,7 @@ from backend.data.user import (
from backend.notifications.email import EmailSender
from backend.util.clients import get_database_manager_async_client
from backend.util.logging import TruncatedLogger
from backend.util.metrics import DiscordChannel, discord_send_alert
from backend.util.metrics import AllQuietAlert, DiscordChannel, discord_send_alert, send_allquiet_alert
from backend.util.retry import continuous_retry
from backend.util.service import (
AppService,
@@ -417,6 +417,45 @@ class NotificationManager(AppService):
):
await discord_send_alert(content, channel)
@expose
async def allquiet_system_alert(self, alert: AllQuietAlert):
await send_allquiet_alert(alert)
@expose
async def system_alert(
self,
content: str,
channel: DiscordChannel = DiscordChannel.PLATFORM,
correlation_id: str | None = None,
severity: Literal['warning', 'critical', 'minor'] = 'warning',
status: Literal['resolved', 'open'] = 'open',
extra_attributes: dict[str, str] | None = None,
):
"""Send both Discord and AllQuiet alerts for system events."""
# Send Discord alert
await discord_send_alert(content, channel)
# Send AllQuiet alert if correlation_id is provided
if correlation_id:
# Extract title from content (first line or first sentence)
lines = content.split('\n')
title = lines[0] if lines else content[:100]
# Remove Discord formatting from title
title = title.replace('**', '').replace('🚨', '').replace('⚠️', '').replace('', '').replace('', '').replace('📊', '').strip()
alert = AllQuietAlert(
severity=severity,
status=status,
title=title[:100], # Limit title length
description=content,
correlation_id=correlation_id,
extra_attributes=extra_attributes or {},
)
try:
await send_allquiet_alert(alert)
except Exception as e:
logger.error(f"Failed to send AllQuiet alert: {e}")
async def _queue_scheduled_notification(self, event: SummaryParamsEventModel):
"""Queue a scheduled notification - exposed method for other services to call"""
try:
@@ -1101,3 +1140,5 @@ class NotificationManagerClient(AppServiceClient):
)
queue_weekly_summary = endpoint_to_sync(NotificationManager.queue_weekly_summary)
discord_system_alert = endpoint_to_sync(NotificationManager.discord_system_alert)
allquiet_system_alert = endpoint_to_sync(NotificationManager.allquiet_system_alert)
system_alert = endpoint_to_sync(NotificationManager.system_alert)

View File

@@ -1,8 +1,9 @@
import logging
from enum import Enum
from typing import Literal
import sentry_sdk
from pydantic import SecretStr
from pydantic import BaseModel, Field, SecretStr
from sentry_sdk.integrations.anthropic import AnthropicIntegration
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from sentry_sdk.integrations.launchdarkly import LaunchDarklyIntegration
@@ -46,6 +47,27 @@ def sentry_capture_error(error: BaseException):
sentry_sdk.flush()
class AllQuietAlert(BaseModel):
severity: Literal['warning'] | Literal['critical'] | Literal['minor']
status: Literal['resolved'] | Literal['open']
title: str | None = None
description: str | None = None
correlation_id: str | None = None
extra_attributes: dict[str, str] = Field(default_factory=dict)
environment: str = f"app:{settings.config.app_env.value}-behave:{settings.config.behave_as.value}"
async def send_allquiet_alert(alert: AllQuietAlert):
hook_url = settings.secrets.allquiet_webhook_url
if not hook_url:
logging.warning("AllQuiet webhook URL not configured")
return
import httpx
async with httpx.AsyncClient() as client:
response = await client.post(hook_url, json=alert.model_dump())
response.raise_for_status()
async def discord_send_alert(
content: str, channel: DiscordChannel = DiscordChannel.PLATFORM
):

View File

@@ -43,7 +43,8 @@ def should_send_alert(func_name: str, exception: Exception, context: str = "") -
def send_rate_limited_discord_alert(
func_name: str, exception: Exception, context: str, alert_msg: str, channel=None
func_name: str, exception: Exception, context: str, alert_msg: str, channel=None,
correlation_id: str | None = None, severity: str = 'critical', extra_attributes: dict | None = None
) -> bool:
"""
Send a Discord alert with rate limiting.
@@ -58,8 +59,13 @@ def send_rate_limited_discord_alert(
from backend.util.metrics import DiscordChannel
notification_client = get_notification_manager_client()
notification_client.discord_system_alert(
alert_msg, channel or DiscordChannel.PLATFORM
notification_client.system_alert(
content=alert_msg,
channel=channel or DiscordChannel.PLATFORM,
correlation_id=correlation_id,
severity=severity,
status='open',
extra_attributes=extra_attributes or {}
)
return True
@@ -74,14 +80,28 @@ def _send_critical_retry_alert(
"""Send alert when a function is approaching the retry failure threshold."""
prefix = f"{context}: " if context else ""
error_type = type(exception).__name__
# Create correlation ID based on context, function name, and error type
correlation_id = f"retry_failure_{context}_{func_name}_{error_type}".replace(" ", "_").replace(":", "")
if send_rate_limited_discord_alert(
func_name,
exception,
context,
f"🚨 CRITICAL: Operation Approaching Failure Threshold: {prefix}'{func_name}'\n\n"
f"Current attempt: {attempt_number}/{EXCESSIVE_RETRY_THRESHOLD}\n"
f"Error: {type(exception).__name__}: {exception}\n\n"
f"Error: {error_type}: {exception}\n\n"
f"This operation is about to fail permanently. Investigate immediately.",
correlation_id=correlation_id,
severity='critical',
extra_attributes={
'function_name': func_name,
'attempt_number': str(attempt_number),
'max_attempts': str(EXCESSIVE_RETRY_THRESHOLD),
'error_type': error_type,
'context': context or 'none',
}
):
logger.critical(
f"CRITICAL ALERT SENT: Operation {func_name} at attempt {attempt_number}"
@@ -185,6 +205,10 @@ def conn_retry(
logger.error(f"{prefix} {action_name} failed after retries: {exception}")
else:
if attempt_number == EXCESSIVE_RETRY_THRESHOLD:
error_type = type(exception).__name__
# Create correlation ID for infrastructure issues
correlation_id = f"infrastructure_{resource_name}_{action_name}_{func_name}".replace(" ", "_")
if send_rate_limited_discord_alert(
func_name,
exception,
@@ -194,8 +218,18 @@ def conn_retry(
f"Action: {action_name}\n"
f"Function: {func_name}\n"
f"Current attempt: {attempt_number}/{max_retry + 1}\n"
f"Error: {type(exception).__name__}: {str(exception)[:200]}{'...' if len(str(exception)) > 200 else ''}\n\n"
f"Error: {error_type}: {str(exception)[:200]}{'...' if len(str(exception)) > 200 else ''}\n\n"
f"Infrastructure component is approaching failure threshold. Investigate immediately.",
correlation_id=correlation_id,
severity='critical',
extra_attributes={
'resource_name': resource_name,
'action_name': action_name,
'function_name': func_name,
'attempt_number': str(attempt_number),
'max_attempts': str(max_retry + 1),
'error_type': error_type,
}
):
logger.critical(
f"INFRASTRUCTURE ALERT SENT: {resource_name} at {attempt_number} attempts"

View File

@@ -602,6 +602,8 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
description="The LaunchDarkly SDK key for feature flag management",
)
allquiet_webhook_url: str = Field(default="", description="AllQuiet webhook URL")
ayrshare_api_key: str = Field(default="", description="Ayrshare API Key")
ayrshare_jwt_key: str = Field(default="", description="Ayrshare private Key")
# Add more secret fields as needed