mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Merge 'dev' into 'feat/agent-notifications'
This commit is contained in:
@@ -199,6 +199,10 @@ ZEROBOUNCE_API_KEY=
|
||||
|
||||
## ===== OPTIONAL API KEYS END ===== ##
|
||||
|
||||
# Block Error Rate Monitoring
|
||||
BLOCK_ERROR_RATE_THRESHOLD=0.5
|
||||
BLOCK_ERROR_RATE_CHECK_INTERVAL_SECS=86400
|
||||
|
||||
# Logging Configuration
|
||||
LOG_LEVEL=INFO
|
||||
ENABLE_CLOUD_LOGGING=false
|
||||
|
||||
@@ -93,6 +93,28 @@ async def locked_transaction(key: str):
|
||||
yield tx
|
||||
|
||||
|
||||
def get_database_schema() -> str:
|
||||
"""Extract database schema from DATABASE_URL."""
|
||||
parsed_url = urlparse(DATABASE_URL)
|
||||
query_params = dict(parse_qsl(parsed_url.query))
|
||||
return query_params.get("schema", "public")
|
||||
|
||||
|
||||
async def query_raw_with_schema(query_template: str, *args) -> list[dict]:
|
||||
"""Execute raw SQL query with proper schema handling."""
|
||||
schema = get_database_schema()
|
||||
schema_prefix = f"{schema}." if schema != "public" else ""
|
||||
formatted_query = query_template.format(schema_prefix=schema_prefix)
|
||||
|
||||
import prisma as prisma_module
|
||||
|
||||
result = await prisma_module.get_client().query_raw(
|
||||
formatted_query, *args # type: ignore
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class BaseDbModel(BaseModel):
|
||||
id: str = Field(default_factory=lambda: str(uuid4()))
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ from .block import (
|
||||
get_io_block_ids,
|
||||
get_webhook_block_ids,
|
||||
)
|
||||
from .db import BaseDbModel
|
||||
from .db import BaseDbModel, query_raw_with_schema
|
||||
from .event_bus import AsyncRedisEventBus, RedisEventBus
|
||||
from .includes import (
|
||||
EXECUTION_RESULT_INCLUDE,
|
||||
@@ -68,6 +68,21 @@ config = Config()
|
||||
# -------------------------- Models -------------------------- #
|
||||
|
||||
|
||||
class BlockErrorStats(BaseModel):
|
||||
"""Typed data structure for block error statistics."""
|
||||
|
||||
block_id: str
|
||||
total_executions: int
|
||||
failed_executions: int
|
||||
|
||||
@property
|
||||
def error_rate(self) -> float:
|
||||
"""Calculate error rate as a percentage."""
|
||||
if self.total_executions == 0:
|
||||
return 0.0
|
||||
return (self.failed_executions / self.total_executions) * 100
|
||||
|
||||
|
||||
ExecutionStatus = AgentExecutionStatus
|
||||
|
||||
|
||||
@@ -732,15 +747,18 @@ async def get_node_execution(node_exec_id: str) -> NodeExecutionResult | None:
|
||||
|
||||
|
||||
async def get_node_executions(
|
||||
graph_exec_id: str,
|
||||
graph_exec_id: str | None = None,
|
||||
node_id: str | None = None,
|
||||
block_ids: list[str] | None = None,
|
||||
statuses: list[ExecutionStatus] | None = None,
|
||||
limit: int | None = None,
|
||||
created_time_gte: datetime | None = None,
|
||||
created_time_lte: datetime | None = None,
|
||||
include_exec_data: bool = True,
|
||||
) -> list[NodeExecutionResult]:
|
||||
where_clause: AgentNodeExecutionWhereInput = {
|
||||
"agentGraphExecutionId": graph_exec_id,
|
||||
}
|
||||
where_clause: AgentNodeExecutionWhereInput = {}
|
||||
if graph_exec_id:
|
||||
where_clause["agentGraphExecutionId"] = graph_exec_id
|
||||
if node_id:
|
||||
where_clause["agentNodeId"] = node_id
|
||||
if block_ids:
|
||||
@@ -748,9 +766,19 @@ async def get_node_executions(
|
||||
if statuses:
|
||||
where_clause["OR"] = [{"executionStatus": status} for status in statuses]
|
||||
|
||||
if created_time_gte or created_time_lte:
|
||||
where_clause["addedTime"] = {
|
||||
"gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),
|
||||
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
|
||||
}
|
||||
|
||||
executions = await AgentNodeExecution.prisma().find_many(
|
||||
where=where_clause,
|
||||
include=EXECUTION_RESULT_INCLUDE,
|
||||
include=(
|
||||
EXECUTION_RESULT_INCLUDE
|
||||
if include_exec_data
|
||||
else {"Node": True, "GraphExecution": True}
|
||||
),
|
||||
order=EXECUTION_RESULT_ORDER,
|
||||
take=limit,
|
||||
)
|
||||
@@ -963,3 +991,33 @@ async def set_execution_kv_data(
|
||||
},
|
||||
)
|
||||
return type_utils.convert(resp.data, type[Any]) if resp and resp.data else None
|
||||
|
||||
|
||||
async def get_block_error_stats(
|
||||
start_time: datetime, end_time: datetime
|
||||
) -> list[BlockErrorStats]:
|
||||
"""Get block execution stats using efficient SQL aggregation."""
|
||||
|
||||
query_template = """
|
||||
SELECT
|
||||
n."agentBlockId" as block_id,
|
||||
COUNT(*) as total_executions,
|
||||
SUM(CASE WHEN ne."executionStatus" = 'FAILED' THEN 1 ELSE 0 END) as failed_executions
|
||||
FROM {schema_prefix}"AgentNodeExecution" ne
|
||||
JOIN {schema_prefix}"AgentNode" n ON ne."agentNodeId" = n.id
|
||||
WHERE ne."addedTime" >= $1::timestamp AND ne."addedTime" <= $2::timestamp
|
||||
GROUP BY n."agentBlockId"
|
||||
HAVING COUNT(*) >= 10
|
||||
"""
|
||||
|
||||
result = await query_raw_with_schema(query_template, start_time, end_time)
|
||||
|
||||
# Convert to typed data structures
|
||||
return [
|
||||
BlockErrorStats(
|
||||
block_id=row["block_id"],
|
||||
total_executions=int(row["total_executions"]),
|
||||
failed_executions=int(row["failed_executions"]),
|
||||
)
|
||||
for row in result
|
||||
]
|
||||
|
||||
@@ -3,7 +3,6 @@ import uuid
|
||||
from collections import defaultdict
|
||||
from typing import TYPE_CHECKING, Any, Literal, Optional, cast
|
||||
|
||||
import prisma
|
||||
from prisma import Json
|
||||
from prisma.enums import SubmissionStatus
|
||||
from prisma.models import AgentGraph, AgentNode, AgentNodeLink, StoreListingVersion
|
||||
@@ -31,7 +30,7 @@ from backend.integrations.providers import ProviderName
|
||||
from backend.util import type as type_utils
|
||||
|
||||
from .block import Block, BlockInput, BlockSchema, BlockType, get_block, get_blocks
|
||||
from .db import BaseDbModel, transaction
|
||||
from .db import BaseDbModel, query_raw_with_schema, transaction
|
||||
from .includes import AGENT_GRAPH_INCLUDE, AGENT_NODE_INCLUDE
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -1040,13 +1039,13 @@ async def fix_llm_provider_credentials():
|
||||
|
||||
broken_nodes = []
|
||||
try:
|
||||
broken_nodes = await prisma.get_client().query_raw(
|
||||
broken_nodes = await query_raw_with_schema(
|
||||
"""
|
||||
SELECT graph."userId" user_id,
|
||||
node.id node_id,
|
||||
node."constantInput" node_preset_input
|
||||
FROM platform."AgentNode" node
|
||||
LEFT JOIN platform."AgentGraph" graph
|
||||
FROM {schema_prefix}"AgentNode" node
|
||||
LEFT JOIN {schema_prefix}"AgentGraph" graph
|
||||
ON node."agentGraphId" = graph.id
|
||||
WHERE node."constantInput"::jsonb->'credentials'->>'provider' = 'llm'
|
||||
ORDER BY graph."userId";
|
||||
|
||||
@@ -5,6 +5,7 @@ from backend.data import db
|
||||
from backend.data.credit import UsageTransactionMetadata, get_user_credit_model
|
||||
from backend.data.execution import (
|
||||
create_graph_execution,
|
||||
get_block_error_stats,
|
||||
get_execution_kv_data,
|
||||
get_graph_execution,
|
||||
get_graph_execution_meta,
|
||||
@@ -105,6 +106,7 @@ class DatabaseManager(AppService):
|
||||
upsert_execution_output = _(upsert_execution_output)
|
||||
get_execution_kv_data = _(get_execution_kv_data)
|
||||
set_execution_kv_data = _(set_execution_kv_data)
|
||||
get_block_error_stats = _(get_block_error_stats)
|
||||
|
||||
# Graphs
|
||||
get_node = _(get_node)
|
||||
@@ -199,6 +201,9 @@ class DatabaseManagerClient(AppServiceClient):
|
||||
d.get_user_notification_oldest_message_in_batch
|
||||
)
|
||||
|
||||
# Block error monitoring
|
||||
get_block_error_stats = _(d.get_block_error_stats)
|
||||
|
||||
|
||||
class DatabaseManagerAsyncClient(AppServiceClient):
|
||||
d = DatabaseManager
|
||||
@@ -226,3 +231,4 @@ class DatabaseManagerAsyncClient(AppServiceClient):
|
||||
update_user_integrations = d.update_user_integrations
|
||||
get_execution_kv_data = d.get_execution_kv_data
|
||||
set_execution_kv_data = d.set_execution_kv_data
|
||||
get_block_error_stats = d.get_block_error_stats
|
||||
|
||||
@@ -877,6 +877,7 @@ class Executor:
|
||||
ExecutionStatus.QUEUED,
|
||||
ExecutionStatus.RUNNING,
|
||||
],
|
||||
include_exec_data=False,
|
||||
)
|
||||
db_client.update_node_execution_status_batch(
|
||||
[node_exec.node_exec_id for node_exec in inflight_executions],
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||
@@ -14,25 +13,23 @@ from apscheduler.schedulers.blocking import BlockingScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from autogpt_libs.utils.cache import thread_cached
|
||||
from dotenv import load_dotenv
|
||||
from prisma.enums import NotificationType
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
from sqlalchemy import MetaData, create_engine
|
||||
|
||||
from backend.data.block import BlockInput
|
||||
from backend.data.execution import ExecutionStatus
|
||||
from backend.data.execution import GraphExecutionWithNodes
|
||||
from backend.data.model import CredentialsMetaInput
|
||||
from backend.executor import utils as execution_utils
|
||||
from backend.notifications.notifications import NotificationManagerClient
|
||||
from backend.monitoring import (
|
||||
NotificationJobArgs,
|
||||
process_existing_batches,
|
||||
process_weekly_summary,
|
||||
report_block_error_rates,
|
||||
report_late_executions,
|
||||
)
|
||||
from backend.util.exceptions import NotAuthorizedError, NotFoundError
|
||||
from backend.util.logging import PrefixFilter
|
||||
from backend.util.metrics import sentry_capture_error
|
||||
from backend.util.service import (
|
||||
AppService,
|
||||
AppServiceClient,
|
||||
endpoint_to_async,
|
||||
expose,
|
||||
get_service_client,
|
||||
)
|
||||
from backend.util.service import AppService, AppServiceClient, endpoint_to_async, expose
|
||||
from backend.util.settings import Config
|
||||
|
||||
|
||||
@@ -71,11 +68,6 @@ def job_listener(event):
|
||||
logger.info(f"Job {event.job_id} completed successfully.")
|
||||
|
||||
|
||||
@thread_cached
|
||||
def get_notification_client():
|
||||
return get_service_client(NotificationManagerClient)
|
||||
|
||||
|
||||
@thread_cached
|
||||
def get_event_loop():
|
||||
return asyncio.new_event_loop()
|
||||
@@ -89,7 +81,7 @@ async def _execute_graph(**kwargs):
|
||||
args = GraphExecutionJobArgs(**kwargs)
|
||||
try:
|
||||
logger.info(f"Executing recurring job for graph #{args.graph_id}")
|
||||
await execution_utils.add_graph_execution(
|
||||
graph_exec: GraphExecutionWithNodes = await execution_utils.add_graph_execution(
|
||||
user_id=args.user_id,
|
||||
graph_id=args.graph_id,
|
||||
graph_version=args.graph_version,
|
||||
@@ -97,65 +89,14 @@ async def _execute_graph(**kwargs):
|
||||
graph_credentials_inputs=args.input_credentials,
|
||||
use_db_query=False,
|
||||
)
|
||||
logger.info(
|
||||
f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing graph {args.graph_id}: {e}")
|
||||
|
||||
|
||||
class LateExecutionException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def report_late_executions() -> str:
|
||||
late_executions = execution_utils.get_db_client().get_graph_executions(
|
||||
statuses=[ExecutionStatus.QUEUED],
|
||||
created_time_gte=datetime.now(timezone.utc)
|
||||
- timedelta(seconds=config.execution_late_notification_checkrange_secs),
|
||||
created_time_lte=datetime.now(timezone.utc)
|
||||
- timedelta(seconds=config.execution_late_notification_threshold_secs),
|
||||
limit=1000,
|
||||
)
|
||||
|
||||
if not late_executions:
|
||||
return "No late executions detected."
|
||||
|
||||
num_late_executions = len(late_executions)
|
||||
num_users = len(set([r.user_id for r in late_executions]))
|
||||
|
||||
late_execution_details = [
|
||||
f"* `Execution ID: {exec.id}, Graph ID: {exec.graph_id}v{exec.graph_version}, User ID: {exec.user_id}, Created At: {exec.started_at.isoformat()}`"
|
||||
for exec in late_executions
|
||||
]
|
||||
|
||||
error = LateExecutionException(
|
||||
f"Late executions detected: {num_late_executions} late executions from {num_users} users "
|
||||
f"in the last {config.execution_late_notification_checkrange_secs} seconds. "
|
||||
f"Graph has been queued for more than {config.execution_late_notification_threshold_secs} seconds. "
|
||||
"Please check the executor status. Details:\n"
|
||||
+ "\n".join(late_execution_details)
|
||||
)
|
||||
msg = str(error)
|
||||
sentry_capture_error(error)
|
||||
get_notification_client().discord_system_alert(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def process_existing_batches(**kwargs):
|
||||
args = NotificationJobArgs(**kwargs)
|
||||
try:
|
||||
logger.info(
|
||||
f"Processing existing batches for notification type {args.notification_types}"
|
||||
)
|
||||
get_notification_client().process_existing_batches(args.notification_types)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing existing batches: {e}")
|
||||
|
||||
|
||||
def process_weekly_summary(**kwargs):
|
||||
try:
|
||||
logger.info("Processing weekly summary")
|
||||
get_notification_client().queue_weekly_summary()
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing weekly summary: {e}")
|
||||
# Monitoring functions are now imported from monitoring module
|
||||
|
||||
|
||||
class Jobstores(Enum):
|
||||
@@ -190,11 +131,6 @@ class GraphExecutionJobInfo(GraphExecutionJobArgs):
|
||||
)
|
||||
|
||||
|
||||
class NotificationJobArgs(BaseModel):
|
||||
notification_types: list[NotificationType]
|
||||
cron: str
|
||||
|
||||
|
||||
class NotificationJobInfo(NotificationJobArgs):
|
||||
id: str
|
||||
name: str
|
||||
@@ -287,6 +223,16 @@ class Scheduler(AppService):
|
||||
jobstore=Jobstores.EXECUTION.value,
|
||||
)
|
||||
|
||||
# Block Error Rate Monitoring
|
||||
self.scheduler.add_job(
|
||||
report_block_error_rates,
|
||||
id="report_block_error_rates",
|
||||
trigger="interval",
|
||||
replace_existing=True,
|
||||
seconds=config.block_error_rate_check_interval_secs,
|
||||
jobstore=Jobstores.EXECUTION.value,
|
||||
)
|
||||
|
||||
self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
|
||||
self.scheduler.start()
|
||||
|
||||
@@ -379,6 +325,10 @@ class Scheduler(AppService):
|
||||
def execute_report_late_executions(self):
|
||||
return report_late_executions()
|
||||
|
||||
@expose
|
||||
def execute_report_block_error_rates(self):
|
||||
return report_block_error_rates()
|
||||
|
||||
|
||||
class SchedulerClient(AppServiceClient):
|
||||
@classmethod
|
||||
|
||||
@@ -731,6 +731,7 @@ async def stop_graph_execution(
|
||||
node_execs = await db.get_node_executions(
|
||||
graph_exec_id=graph_exec_id,
|
||||
statuses=[ExecutionStatus.QUEUED, ExecutionStatus.INCOMPLETE],
|
||||
include_exec_data=False,
|
||||
)
|
||||
await db.update_node_execution_status_batch(
|
||||
[node_exec.node_exec_id for node_exec in node_execs],
|
||||
|
||||
24
autogpt_platform/backend/backend/monitoring/__init__.py
Normal file
24
autogpt_platform/backend/backend/monitoring/__init__.py
Normal file
@@ -0,0 +1,24 @@
|
||||
"""Monitoring module for platform health and alerting."""
|
||||
|
||||
from .block_error_monitor import BlockErrorMonitor, report_block_error_rates
|
||||
from .late_execution_monitor import (
|
||||
LateExecutionException,
|
||||
LateExecutionMonitor,
|
||||
report_late_executions,
|
||||
)
|
||||
from .notification_monitor import (
|
||||
NotificationJobArgs,
|
||||
process_existing_batches,
|
||||
process_weekly_summary,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"BlockErrorMonitor",
|
||||
"LateExecutionMonitor",
|
||||
"LateExecutionException",
|
||||
"NotificationJobArgs",
|
||||
"report_block_error_rates",
|
||||
"report_late_executions",
|
||||
"process_existing_batches",
|
||||
"process_weekly_summary",
|
||||
]
|
||||
@@ -0,0 +1,291 @@
|
||||
"""Block error rate monitoring module."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.data.block import get_block
|
||||
from backend.data.execution import ExecutionStatus, NodeExecutionResult
|
||||
from backend.executor import utils as execution_utils
|
||||
from backend.notifications.notifications import NotificationManagerClient
|
||||
from backend.util.metrics import sentry_capture_error
|
||||
from backend.util.service import get_service_client
|
||||
from backend.util.settings import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
config = Config()
|
||||
|
||||
|
||||
class BlockStatsWithSamples(BaseModel):
|
||||
"""Enhanced block stats with error samples."""
|
||||
|
||||
block_id: str
|
||||
block_name: str
|
||||
total_executions: int
|
||||
failed_executions: int
|
||||
error_samples: list[str] = []
|
||||
|
||||
@property
|
||||
def error_rate(self) -> float:
|
||||
"""Calculate error rate as a percentage."""
|
||||
if self.total_executions == 0:
|
||||
return 0.0
|
||||
return (self.failed_executions / self.total_executions) * 100
|
||||
|
||||
|
||||
class BlockErrorMonitor:
|
||||
"""Monitor block error rates and send alerts when thresholds are exceeded."""
|
||||
|
||||
def __init__(self, include_top_blocks: int | None = None):
|
||||
self.config = config
|
||||
self.notification_client = get_service_client(NotificationManagerClient)
|
||||
self.include_top_blocks = (
|
||||
include_top_blocks
|
||||
if include_top_blocks is not None
|
||||
else config.block_error_include_top_blocks
|
||||
)
|
||||
|
||||
def check_block_error_rates(self) -> str:
|
||||
"""Check block error rates and send Discord alerts if thresholds are exceeded."""
|
||||
try:
|
||||
logger.info("Checking block error rates")
|
||||
|
||||
# Get executions from the last 24 hours
|
||||
end_time = datetime.now(timezone.utc)
|
||||
start_time = end_time - timedelta(hours=24)
|
||||
|
||||
# Use SQL aggregation to efficiently count totals and failures by block
|
||||
block_stats = self._get_block_stats_from_db(start_time, end_time)
|
||||
|
||||
# For blocks with high error rates, fetch error samples
|
||||
threshold = self.config.block_error_rate_threshold
|
||||
for block_name, stats in block_stats.items():
|
||||
if stats.total_executions >= 10 and stats.error_rate >= threshold * 100:
|
||||
# Only fetch error samples for blocks that exceed threshold
|
||||
error_samples = self._get_error_samples_for_block(
|
||||
stats.block_id, start_time, end_time, limit=3
|
||||
)
|
||||
stats.error_samples = error_samples
|
||||
|
||||
# Check thresholds and send alerts
|
||||
critical_alerts = self._generate_critical_alerts(block_stats, threshold)
|
||||
|
||||
if critical_alerts:
|
||||
msg = "Block Error Rate Alert:\n\n" + "\n\n".join(critical_alerts)
|
||||
self.notification_client.discord_system_alert(msg)
|
||||
logger.info(
|
||||
f"Sent block error rate alert for {len(critical_alerts)} blocks"
|
||||
)
|
||||
return f"Alert sent for {len(critical_alerts)} blocks with high error rates"
|
||||
|
||||
# If no critical alerts, check if we should show top blocks
|
||||
if self.include_top_blocks > 0:
|
||||
top_blocks_msg = self._generate_top_blocks_alert(
|
||||
block_stats, start_time, end_time
|
||||
)
|
||||
if top_blocks_msg:
|
||||
self.notification_client.discord_system_alert(top_blocks_msg)
|
||||
logger.info("Sent top blocks summary")
|
||||
return "Sent top blocks summary"
|
||||
|
||||
logger.info("No blocks exceeded error rate threshold")
|
||||
return "No errors reported for today"
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error checking block error rates: {e}")
|
||||
|
||||
error = Exception(f"Error checking block error rates: {e}")
|
||||
msg = str(error)
|
||||
sentry_capture_error(error)
|
||||
self.notification_client.discord_system_alert(msg)
|
||||
return msg
|
||||
|
||||
def _get_block_stats_from_db(
|
||||
self, start_time: datetime, end_time: datetime
|
||||
) -> dict[str, BlockStatsWithSamples]:
|
||||
"""Get block execution stats using efficient SQL aggregation."""
|
||||
|
||||
result = execution_utils.get_db_client().get_block_error_stats(
|
||||
start_time, end_time
|
||||
)
|
||||
|
||||
block_stats = {}
|
||||
for stats in result:
|
||||
block_name = b.name if (b := get_block(stats.block_id)) else "Unknown"
|
||||
|
||||
block_stats[block_name] = BlockStatsWithSamples(
|
||||
block_id=stats.block_id,
|
||||
block_name=block_name,
|
||||
total_executions=stats.total_executions,
|
||||
failed_executions=stats.failed_executions,
|
||||
error_samples=[],
|
||||
)
|
||||
|
||||
return block_stats
|
||||
|
||||
def _generate_critical_alerts(
|
||||
self, block_stats: dict[str, BlockStatsWithSamples], threshold: float
|
||||
) -> list[str]:
|
||||
"""Generate alerts for blocks that exceed the error rate threshold."""
|
||||
alerts = []
|
||||
|
||||
for block_name, stats in block_stats.items():
|
||||
if stats.total_executions >= 10 and stats.error_rate >= threshold * 100:
|
||||
error_groups = self._group_similar_errors(stats.error_samples)
|
||||
|
||||
alert_msg = (
|
||||
f"🚨 Block '{block_name}' has {stats.error_rate:.1f}% error rate "
|
||||
f"({stats.failed_executions}/{stats.total_executions}) in the last 24 hours"
|
||||
)
|
||||
|
||||
if error_groups:
|
||||
alert_msg += "\n\n📊 Error Types:"
|
||||
for error_pattern, count in error_groups.items():
|
||||
alert_msg += f"\n• {error_pattern} ({count}x)"
|
||||
|
||||
alerts.append(alert_msg)
|
||||
|
||||
return alerts
|
||||
|
||||
def _generate_top_blocks_alert(
|
||||
self,
|
||||
block_stats: dict[str, BlockStatsWithSamples],
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
) -> str | None:
|
||||
"""Generate top blocks summary when no critical alerts exist."""
|
||||
top_error_blocks = sorted(
|
||||
[
|
||||
(name, stats)
|
||||
for name, stats in block_stats.items()
|
||||
if stats.total_executions >= 10 and stats.failed_executions > 0
|
||||
],
|
||||
key=lambda x: x[1].failed_executions,
|
||||
reverse=True,
|
||||
)[: self.include_top_blocks]
|
||||
|
||||
if not top_error_blocks:
|
||||
return "✅ No errors reported for today - all blocks are running smoothly!"
|
||||
|
||||
# Get error samples for top blocks
|
||||
for block_name, stats in top_error_blocks:
|
||||
if not stats.error_samples:
|
||||
stats.error_samples = self._get_error_samples_for_block(
|
||||
stats.block_id, start_time, end_time, limit=2
|
||||
)
|
||||
|
||||
count_text = (
|
||||
f"top {self.include_top_blocks}" if self.include_top_blocks > 1 else "top"
|
||||
)
|
||||
alert_msg = f"📊 Daily Error Summary - {count_text} blocks with most errors:"
|
||||
for block_name, stats in top_error_blocks:
|
||||
alert_msg += f"\n• {block_name}: {stats.failed_executions} errors ({stats.error_rate:.1f}% of {stats.total_executions})"
|
||||
|
||||
if stats.error_samples:
|
||||
error_groups = self._group_similar_errors(stats.error_samples)
|
||||
if error_groups:
|
||||
# Show most common error
|
||||
most_common_error = next(iter(error_groups.items()))
|
||||
alert_msg += f"\n └ Most common: {most_common_error[0]}"
|
||||
|
||||
return alert_msg
|
||||
|
||||
def _get_error_samples_for_block(
|
||||
self, block_id: str, start_time: datetime, end_time: datetime, limit: int = 3
|
||||
) -> list[str]:
|
||||
"""Get error samples for a specific block - just a few recent ones."""
|
||||
# Only fetch a small number of recent failed executions for this specific block
|
||||
executions = execution_utils.get_db_client().get_node_executions(
|
||||
block_ids=[block_id],
|
||||
statuses=[ExecutionStatus.FAILED],
|
||||
created_time_gte=start_time,
|
||||
created_time_lte=end_time,
|
||||
limit=limit, # Just get the limit we need
|
||||
)
|
||||
|
||||
error_samples = []
|
||||
for execution in executions:
|
||||
if error_message := self._extract_error_message(execution):
|
||||
masked_error = self._mask_sensitive_data(error_message)
|
||||
error_samples.append(masked_error)
|
||||
|
||||
if len(error_samples) >= limit: # Stop once we have enough samples
|
||||
break
|
||||
|
||||
return error_samples
|
||||
|
||||
def _extract_error_message(self, execution: NodeExecutionResult) -> str | None:
|
||||
"""Extract error message from execution output."""
|
||||
try:
|
||||
if execution.output_data and (
|
||||
error_msg := execution.output_data.get("error")
|
||||
):
|
||||
return str(error_msg[0])
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _mask_sensitive_data(self, error_message):
|
||||
"""Mask sensitive data in error messages to enable grouping."""
|
||||
if not error_message:
|
||||
return ""
|
||||
|
||||
# Convert to string if not already
|
||||
error_str = str(error_message)
|
||||
|
||||
# Mask numbers (replace with X)
|
||||
error_str = re.sub(r"\d+", "X", error_str)
|
||||
|
||||
# Mask all caps words (likely constants/IDs)
|
||||
error_str = re.sub(r"\b[A-Z_]{3,}\b", "MASKED", error_str)
|
||||
|
||||
# Mask words with underscores (likely internal variables)
|
||||
error_str = re.sub(r"\b\w*_\w*\b", "MASKED", error_str)
|
||||
|
||||
# Mask UUIDs and long alphanumeric strings
|
||||
error_str = re.sub(
|
||||
r"\b[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\b",
|
||||
"UUID",
|
||||
error_str,
|
||||
)
|
||||
error_str = re.sub(r"\b[a-f0-9]{20,}\b", "HASH", error_str)
|
||||
|
||||
# Mask file paths
|
||||
error_str = re.sub(r"(/[^/\s]+)+", "/MASKED/path", error_str)
|
||||
|
||||
# Mask URLs
|
||||
error_str = re.sub(r"https?://[^\s]+", "URL", error_str)
|
||||
|
||||
# Mask email addresses
|
||||
error_str = re.sub(
|
||||
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "EMAIL", error_str
|
||||
)
|
||||
|
||||
# Truncate if too long
|
||||
if len(error_str) > 100:
|
||||
error_str = error_str[:97] + "..."
|
||||
|
||||
return error_str.strip()
|
||||
|
||||
def _group_similar_errors(self, error_samples):
|
||||
"""Group similar error messages and return counts."""
|
||||
if not error_samples:
|
||||
return {}
|
||||
|
||||
error_groups = {}
|
||||
for error in error_samples:
|
||||
if error in error_groups:
|
||||
error_groups[error] += 1
|
||||
else:
|
||||
error_groups[error] = 1
|
||||
|
||||
# Sort by frequency, most common first
|
||||
return dict(sorted(error_groups.items(), key=lambda x: x[1], reverse=True))
|
||||
|
||||
|
||||
def report_block_error_rates(include_top_blocks: int | None = None):
|
||||
"""Check block error rates and send Discord alerts if thresholds are exceeded."""
|
||||
monitor = BlockErrorMonitor(include_top_blocks=include_top_blocks)
|
||||
return monitor.check_block_error_rates()
|
||||
@@ -0,0 +1,71 @@
|
||||
"""Late execution monitoring module."""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from backend.data.execution import ExecutionStatus
|
||||
from backend.executor import utils as execution_utils
|
||||
from backend.notifications.notifications import NotificationManagerClient
|
||||
from backend.util.metrics import sentry_capture_error
|
||||
from backend.util.service import get_service_client
|
||||
from backend.util.settings import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
config = Config()
|
||||
|
||||
|
||||
class LateExecutionException(Exception):
|
||||
"""Exception raised when late executions are detected."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class LateExecutionMonitor:
|
||||
"""Monitor late executions and send alerts when thresholds are exceeded."""
|
||||
|
||||
def __init__(self):
|
||||
self.config = config
|
||||
self.notification_client = get_service_client(NotificationManagerClient)
|
||||
|
||||
def check_late_executions(self) -> str:
|
||||
"""Check for late executions and send alerts if found."""
|
||||
late_executions = execution_utils.get_db_client().get_graph_executions(
|
||||
statuses=[ExecutionStatus.QUEUED],
|
||||
created_time_gte=datetime.now(timezone.utc)
|
||||
- timedelta(
|
||||
seconds=self.config.execution_late_notification_checkrange_secs
|
||||
),
|
||||
created_time_lte=datetime.now(timezone.utc)
|
||||
- timedelta(seconds=self.config.execution_late_notification_threshold_secs),
|
||||
limit=1000,
|
||||
)
|
||||
|
||||
if not late_executions:
|
||||
return "No late executions detected."
|
||||
|
||||
num_late_executions = len(late_executions)
|
||||
num_users = len(set([r.user_id for r in late_executions]))
|
||||
|
||||
late_execution_details = [
|
||||
f"* `Execution ID: {exec.id}, Graph ID: {exec.graph_id}v{exec.graph_version}, User ID: {exec.user_id}, Created At: {exec.started_at.isoformat()}`"
|
||||
for exec in late_executions
|
||||
]
|
||||
|
||||
error = LateExecutionException(
|
||||
f"Late executions detected: {num_late_executions} late executions from {num_users} users "
|
||||
f"in the last {self.config.execution_late_notification_checkrange_secs} seconds. "
|
||||
f"Graph has been queued for more than {self.config.execution_late_notification_threshold_secs} seconds. "
|
||||
"Please check the executor status. Details:\n"
|
||||
+ "\n".join(late_execution_details)
|
||||
)
|
||||
msg = str(error)
|
||||
|
||||
sentry_capture_error(error)
|
||||
self.notification_client.discord_system_alert(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def report_late_executions() -> str:
|
||||
"""Check for late executions and send Discord alerts if found."""
|
||||
monitor = LateExecutionMonitor()
|
||||
return monitor.check_late_executions()
|
||||
@@ -0,0 +1,39 @@
|
||||
"""Notification processing monitoring module."""
|
||||
|
||||
import logging
|
||||
|
||||
from prisma.enums import NotificationType
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.notifications.notifications import NotificationManagerClient
|
||||
from backend.util.service import get_service_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationJobArgs(BaseModel):
|
||||
notification_types: list[NotificationType]
|
||||
cron: str
|
||||
|
||||
|
||||
def process_existing_batches(**kwargs):
|
||||
"""Process existing notification batches."""
|
||||
args = NotificationJobArgs(**kwargs)
|
||||
try:
|
||||
logging.info(
|
||||
f"Processing existing batches for notification type {args.notification_types}"
|
||||
)
|
||||
get_service_client(NotificationManagerClient).process_existing_batches(
|
||||
args.notification_types
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error processing existing batches: {e}")
|
||||
|
||||
|
||||
def process_weekly_summary(**kwargs):
|
||||
"""Process weekly summary notifications."""
|
||||
try:
|
||||
logging.info("Processing weekly summary")
|
||||
get_service_client(NotificationManagerClient).queue_weekly_summary()
|
||||
except Exception as e:
|
||||
logger.exception(f"Error processing weekly summary: {e}")
|
||||
@@ -124,6 +124,19 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
description="Time in seconds for how far back to check for the late executions.",
|
||||
)
|
||||
|
||||
block_error_rate_threshold: float = Field(
|
||||
default=0.5,
|
||||
description="Error rate threshold (0.0-1.0) for triggering block error alerts.",
|
||||
)
|
||||
block_error_rate_check_interval_secs: int = Field(
|
||||
default=24 * 60 * 60, # 24 hours
|
||||
description="Interval in seconds between block error rate checks.",
|
||||
)
|
||||
block_error_include_top_blocks: int = Field(
|
||||
default=3,
|
||||
description="Number of top blocks with most errors to show when no blocks exceed threshold (0 to disable).",
|
||||
)
|
||||
|
||||
model_config = SettingsConfigDict(
|
||||
env_file=".env",
|
||||
extra="allow",
|
||||
|
||||
Reference in New Issue
Block a user