mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
Compare commits
19 Commits
harness
...
ntindle/sy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dcb4d9d09 | ||
|
|
076b878233 | ||
|
|
b7228b93fe | ||
|
|
6ba7ecd41f | ||
|
|
6ca0ad70a6 | ||
|
|
8c2da07317 | ||
|
|
07f5913715 | ||
|
|
85548a49b6 | ||
|
|
66c2260256 | ||
|
|
640953f7a0 | ||
|
|
b9df7544b7 | ||
|
|
07b2ce2816 | ||
|
|
bc5a3d28fb | ||
|
|
5485a9666b | ||
|
|
3ed3ad9d62 | ||
|
|
60cdcbaa67 | ||
|
|
b663945c66 | ||
|
|
2f9f74fce9 | ||
|
|
c559431131 |
1
.claude/scheduled_tasks.lock
Normal file
1
.claude/scheduled_tasks.lock
Normal file
@@ -0,0 +1 @@
|
||||
{"sessionId":"3b78edab-00e7-4b48-8496-58c7b070d0ea","pid":25870,"acquiredAt":1776291768448}
|
||||
@@ -157,6 +157,7 @@ POSTMARK_WEBHOOK_TOKEN=
|
||||
|
||||
# Error Tracking
|
||||
SENTRY_DSN=
|
||||
ALLQUIET_WEBHOOK_URL=
|
||||
|
||||
# Feature Flags
|
||||
LAUNCH_DARKLY_SDK_KEY=
|
||||
|
||||
@@ -456,11 +456,25 @@ def handle_insufficient_funds_notif(
|
||||
f"[View User Details]({base_url}/admin/spending?search={user_email})"
|
||||
)
|
||||
|
||||
get_notification_manager_client().discord_system_alert(
|
||||
alert_message, DiscordChannel.PRODUCT
|
||||
correlation_id = f"insufficient_funds_{user_id}"
|
||||
|
||||
get_notification_manager_client().system_alert(
|
||||
content=alert_message,
|
||||
channel=DiscordChannel.PRODUCT,
|
||||
correlation_id=correlation_id,
|
||||
severity="critical",
|
||||
status="open",
|
||||
extra_attributes={
|
||||
"user_id": user_id,
|
||||
"user_email": user_email or "unknown",
|
||||
"balance": f"${e.balance / 100:.2f}",
|
||||
"attempted_cost": f"${abs(e.amount) / 100:.2f}",
|
||||
"shortfall": f"${abs(shortfall) / 100:.2f}",
|
||||
"agent_name": metadata.name if metadata else "Unknown",
|
||||
},
|
||||
)
|
||||
except Exception as alert_error:
|
||||
logger.error(f"Failed to send insufficient funds Discord alert: {alert_error}")
|
||||
logger.error(f"Failed to send insufficient funds alert: {alert_error}")
|
||||
|
||||
|
||||
def handle_low_balance(
|
||||
@@ -502,8 +516,21 @@ def handle_low_balance(
|
||||
f"Transaction cost: ${transaction_cost / 100:.2f}\n"
|
||||
f"[View User Details]({base_url}/admin/spending?search={user_email})"
|
||||
)
|
||||
get_notification_manager_client().discord_system_alert(
|
||||
alert_message, DiscordChannel.PRODUCT
|
||||
correlation_id = f"low_balance_{user_id}"
|
||||
|
||||
get_notification_manager_client().system_alert(
|
||||
content=alert_message,
|
||||
channel=DiscordChannel.PRODUCT,
|
||||
correlation_id=correlation_id,
|
||||
severity="warning",
|
||||
status="open",
|
||||
extra_attributes={
|
||||
"user_id": user_id,
|
||||
"user_email": user_email or "unknown",
|
||||
"current_balance": f"${current_balance / 100:.2f}",
|
||||
"transaction_cost": f"${transaction_cost / 100:.2f}",
|
||||
"threshold": f"${LOW_BALANCE_THRESHOLD / 100:.2f}",
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to send low balance Discord alert: {e}")
|
||||
logger.warning(f"Failed to send low balance alert: {e}")
|
||||
|
||||
@@ -84,12 +84,14 @@ async def test_handle_insufficient_funds_sends_discord_alert_first_time(
|
||||
assert call_args[0][0] == expected_key
|
||||
assert call_args[1]["nx"] is True
|
||||
|
||||
# Verify Discord alert was sent
|
||||
mock_client.discord_system_alert.assert_called_once()
|
||||
discord_message = mock_client.discord_system_alert.call_args[0][0]
|
||||
# Verify system alert was sent (includes Discord and AllQuiet)
|
||||
mock_client.system_alert.assert_called_once()
|
||||
alert_call = mock_client.system_alert.call_args
|
||||
discord_message = alert_call[1]["content"]
|
||||
assert "Insufficient Funds Alert" in discord_message
|
||||
assert "test@example.com" in discord_message
|
||||
assert "Test Agent" in discord_message
|
||||
assert alert_call[1]["correlation_id"] == f"insufficient_funds_{user_id}"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -142,8 +144,8 @@ async def test_handle_insufficient_funds_skips_duplicate_notifications(
|
||||
# Verify email notification was NOT queued (deduplication worked)
|
||||
mock_queue_notif.assert_not_called()
|
||||
|
||||
# Verify Discord alert was NOT sent (deduplication worked)
|
||||
mock_client.discord_system_alert.assert_not_called()
|
||||
# Verify system alert was NOT sent (deduplication worked)
|
||||
mock_client.system_alert.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -202,8 +204,8 @@ async def test_handle_insufficient_funds_different_agents_get_separate_alerts(
|
||||
e=error,
|
||||
)
|
||||
|
||||
# Verify Discord alerts were sent for both agents
|
||||
assert mock_client.discord_system_alert.call_count == 2
|
||||
# Verify system alerts were sent for both agents
|
||||
assert mock_client.system_alert.call_count == 2
|
||||
|
||||
# Verify Redis was called with different keys
|
||||
assert mock_redis_client.set.call_count == 2
|
||||
@@ -352,8 +354,8 @@ async def test_handle_insufficient_funds_continues_on_redis_error(
|
||||
# Verify email notification was still queued despite Redis error
|
||||
mock_queue_notif.assert_called_once()
|
||||
|
||||
# Verify Discord alert was still sent despite Redis error
|
||||
mock_client.discord_system_alert.assert_called_once()
|
||||
# Verify system alert was still sent despite Redis error
|
||||
mock_client.system_alert.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
|
||||
@@ -53,13 +53,16 @@ async def test_handle_low_balance_threshold_crossing(server: SpinTestServer):
|
||||
assert isinstance(notification_call.data, LowBalanceData)
|
||||
assert notification_call.data.current_balance == current_balance
|
||||
|
||||
# Verify Discord alert was sent
|
||||
mock_client.discord_system_alert.assert_called_once()
|
||||
discord_message = mock_client.discord_system_alert.call_args[0][0]
|
||||
# Verify system alert was sent (includes Discord and AllQuiet)
|
||||
mock_client.system_alert.assert_called_once()
|
||||
alert_call = mock_client.system_alert.call_args
|
||||
discord_message = alert_call[1]["content"]
|
||||
assert "Low Balance Alert" in discord_message
|
||||
assert "test@example.com" in discord_message
|
||||
assert "$4.00" in discord_message
|
||||
assert "$6.00" in discord_message
|
||||
# Verify AllQuiet correlation ID is set
|
||||
assert alert_call[1]["correlation_id"] == f"low_balance_{user_id}"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -101,7 +104,7 @@ async def test_handle_low_balance_no_notification_when_not_crossing(
|
||||
|
||||
# Verify no notification was sent
|
||||
mock_queue_notif.assert_not_called()
|
||||
mock_client.discord_system_alert.assert_not_called()
|
||||
mock_client.system_alert.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -143,4 +146,4 @@ async def test_handle_low_balance_no_duplicate_when_already_below(
|
||||
|
||||
# Verify no notification was sent (user was already below threshold)
|
||||
mock_queue_notif.assert_not_called()
|
||||
mock_client.discord_system_alert.assert_not_called()
|
||||
mock_client.system_alert.assert_not_called()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Block error rate monitoring module."""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
@@ -12,7 +13,7 @@ from backend.util.clients import (
|
||||
get_database_manager_client,
|
||||
get_notification_manager_client,
|
||||
)
|
||||
from backend.util.metrics import sentry_capture_error
|
||||
from backend.util.metrics import DiscordChannel, sentry_capture_error
|
||||
from backend.util.settings import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -75,7 +76,31 @@ class BlockErrorMonitor:
|
||||
|
||||
if critical_alerts:
|
||||
msg = "Block Error Rate Alert:\n\n" + "\n\n".join(critical_alerts)
|
||||
self.notification_client.discord_system_alert(msg)
|
||||
|
||||
blocks_with_errors = sorted(
|
||||
stats.block_id
|
||||
for name, stats in block_stats.items()
|
||||
if stats.total_executions >= 10
|
||||
and stats.error_rate >= threshold * 100
|
||||
)
|
||||
blocks_hash = hashlib.md5(
|
||||
",".join(blocks_with_errors).encode()
|
||||
).hexdigest()[:8]
|
||||
correlation_id = f"block_errors_{blocks_hash}_{end_time.date()}"
|
||||
|
||||
self.notification_client.system_alert(
|
||||
content=msg,
|
||||
channel=DiscordChannel.PLATFORM,
|
||||
correlation_id=correlation_id,
|
||||
severity="warning",
|
||||
status="open",
|
||||
extra_attributes={
|
||||
"blocks_affected": str(len(critical_alerts)),
|
||||
"date": end_time.date().isoformat(),
|
||||
"threshold": f"{threshold * 100}%",
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Sent block error rate alert for {len(critical_alerts)} blocks"
|
||||
)
|
||||
@@ -87,7 +112,22 @@ class BlockErrorMonitor:
|
||||
block_stats, start_time, end_time
|
||||
)
|
||||
if top_blocks_msg:
|
||||
self.notification_client.discord_system_alert(top_blocks_msg)
|
||||
# Daily summary gets a date-based correlation ID
|
||||
correlation_id = f"block_error_daily_summary_{end_time.date()}"
|
||||
|
||||
self.notification_client.system_alert(
|
||||
content=top_blocks_msg,
|
||||
channel=DiscordChannel.PLATFORM,
|
||||
correlation_id=correlation_id,
|
||||
severity="minor",
|
||||
status="open",
|
||||
extra_attributes={
|
||||
"type": "daily_summary",
|
||||
"date": end_time.date().isoformat(),
|
||||
"top_blocks_count": str(self.include_top_blocks),
|
||||
},
|
||||
)
|
||||
|
||||
logger.info("Sent top blocks summary")
|
||||
return "Sent top blocks summary"
|
||||
|
||||
@@ -100,7 +140,22 @@ class BlockErrorMonitor:
|
||||
error = Exception(f"Error checking block error rates: {e}")
|
||||
msg = str(error)
|
||||
sentry_capture_error(error)
|
||||
self.notification_client.discord_system_alert(msg)
|
||||
|
||||
# Send error alert with generic correlation ID
|
||||
correlation_id = "block_error_monitoring_failure"
|
||||
|
||||
self.notification_client.system_alert(
|
||||
content=msg,
|
||||
channel=DiscordChannel.PLATFORM,
|
||||
correlation_id=correlation_id,
|
||||
severity="critical",
|
||||
status="open",
|
||||
extra_attributes={
|
||||
"error_type": type(e).__name__,
|
||||
"error_message": str(e)[:200],
|
||||
},
|
||||
)
|
||||
|
||||
return msg
|
||||
|
||||
def _get_block_stats_from_db(
|
||||
|
||||
@@ -8,7 +8,7 @@ from backend.util.clients import (
|
||||
get_database_manager_client,
|
||||
get_notification_manager_client,
|
||||
)
|
||||
from backend.util.metrics import sentry_capture_error
|
||||
from backend.util.metrics import DiscordChannel, sentry_capture_error
|
||||
from backend.util.settings import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -102,7 +102,37 @@ class LateExecutionMonitor:
|
||||
msg = str(error)
|
||||
|
||||
sentry_capture_error(error)
|
||||
self.notification_client.discord_system_alert(msg)
|
||||
|
||||
late_execution_types = []
|
||||
if queued_late_executions:
|
||||
late_execution_types.append("queued")
|
||||
if running_late_executions:
|
||||
late_execution_types.append("running")
|
||||
|
||||
correlation_id = (
|
||||
"late_execution_"
|
||||
f"{'_'.join(late_execution_types)}_"
|
||||
f"{self.config.execution_late_notification_threshold_secs}s"
|
||||
)
|
||||
|
||||
# Send both Discord and AllQuiet alerts
|
||||
self.notification_client.system_alert(
|
||||
content=msg,
|
||||
channel=DiscordChannel.PLATFORM,
|
||||
correlation_id=correlation_id,
|
||||
severity="critical",
|
||||
status="open",
|
||||
extra_attributes={
|
||||
"total_late_executions": str(num_total_late),
|
||||
"queued_executions": str(num_queued),
|
||||
"running_executions": str(num_running),
|
||||
"affected_users": str(num_users),
|
||||
"threshold_seconds": str(
|
||||
self.config.execution_late_notification_threshold_secs
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
from datetime import datetime, timezone
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from backend.data.execution import ExecutionStatus
|
||||
from backend.monitoring.late_execution_monitor import LateExecutionMonitor
|
||||
from backend.util.metrics import DiscordChannel
|
||||
|
||||
|
||||
def test_late_execution_alert_uses_stable_correlation_id():
    """Check the channel and correlation ID used for a late-execution alert.

    The mocked database client returns one queued and one running execution,
    so the expected correlation ID names both kinds followed by the monitor's
    configured late-notification threshold.
    """
    # Lightweight stand-ins for graph executions: one queued, one running.
    waiting = SimpleNamespace(
        id="exec-1",
        graph_id="graph-1",
        graph_version=1,
        user_id="user-1",
        status=ExecutionStatus.QUEUED,
        started_at=None,
    )
    in_progress = SimpleNamespace(
        id="exec-2",
        graph_id="graph-2",
        graph_version=1,
        user_id="user-2",
        status=ExecutionStatus.RUNNING,
        started_at=datetime.now(timezone.utc),
    )

    db_patch = patch(
        "backend.monitoring.late_execution_monitor.get_database_manager_client"
    )
    notifier_patch = patch(
        "backend.monitoring.late_execution_monitor.get_notification_manager_client"
    )
    sentry_patch = patch(
        "backend.monitoring.late_execution_monitor.sentry_capture_error"
    )

    with db_patch as get_db_client, notifier_patch as get_notifier, sentry_patch:
        db_client = MagicMock()
        # First call returns the queued execution, second call the running one.
        db_client.get_graph_executions.side_effect = [[waiting], [in_progress]]
        get_db_client.return_value = db_client

        notifier = MagicMock()
        get_notifier.return_value = notifier

        monitor = LateExecutionMonitor()
        monitor.check_late_executions()

    notifier.system_alert.assert_called_once()
    sent_kwargs = notifier.system_alert.call_args.kwargs
    assert sent_kwargs["channel"] == (DiscordChannel.PLATFORM)

    expected_id = (
        f"late_execution_queued_running_{monitor.config.execution_late_notification_threshold_secs}s"
    )
    assert sent_kwargs["correlation_id"] == expected_id
|
||||
@@ -1,7 +1,8 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Awaitable, Callable
|
||||
from typing import Awaitable, Callable, Literal
|
||||
|
||||
import aio_pika
|
||||
from prisma.enums import NotificationType
|
||||
@@ -32,8 +33,14 @@ from backend.data.user import (
|
||||
)
|
||||
from backend.notifications.email import EmailSender
|
||||
from backend.util.clients import get_database_manager_async_client
|
||||
from backend.util.feature_flag import Flag, is_feature_enabled
|
||||
from backend.util.logging import TruncatedLogger
|
||||
from backend.util.metrics import DiscordChannel, discord_send_alert
|
||||
from backend.util.metrics import (
|
||||
AllQuietAlert,
|
||||
DiscordChannel,
|
||||
discord_send_alert,
|
||||
send_allquiet_alert,
|
||||
)
|
||||
from backend.util.retry import continuous_retry
|
||||
from backend.util.service import (
|
||||
AppService,
|
||||
@@ -48,6 +55,26 @@ logger = TruncatedLogger(logging.getLogger(__name__), "[NotificationManager]")
|
||||
settings = Settings()
|
||||
|
||||
|
||||
_EMOJI_PATTERN = re.compile(
|
||||
"["
|
||||
"\U0001f300-\U0001f9ff"
|
||||
"\U00002702-\U000027b0"
|
||||
"\U0000fe00-\U0000fe0f"
|
||||
"\U0000200d"
|
||||
"]+",
|
||||
flags=re.UNICODE,
|
||||
)
|
||||
|
||||
|
||||
def _extract_clean_title(content: str, max_length: int = 100) -> str:
|
||||
"""Extract the first line and strip Discord markdown / emoji."""
|
||||
lines = content.split("\n")
|
||||
title = lines[0] if lines else content[:max_length]
|
||||
title = title.replace("**", "")
|
||||
title = _EMOJI_PATTERN.sub("", title).strip()
|
||||
return title[:max_length]
|
||||
|
||||
|
||||
NOTIFICATION_EXCHANGE = Exchange(name="notifications", type=ExchangeType.TOPIC)
|
||||
DEAD_LETTER_EXCHANGE = Exchange(name="dead_letter", type=ExchangeType.TOPIC)
|
||||
EXCHANGES = [NOTIFICATION_EXCHANGE, DEAD_LETTER_EXCHANGE]
|
||||
@@ -420,6 +447,49 @@ class NotificationManager(AppService):
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to send Discord system alert: {e}")
|
||||
|
||||
@expose
async def allquiet_system_alert(self, alert: AllQuietAlert):
    """Hand an already-built AllQuiet alert to ``send_allquiet_alert``."""
    await send_allquiet_alert(alert)
|
||||
|
||||
@expose
async def system_alert(
    self,
    content: str,
    channel: DiscordChannel = DiscordChannel.PLATFORM,
    correlation_id: str | None = None,
    severity: Literal["warning", "critical", "minor"] = "warning",
    status: Literal["resolved", "open"] = "open",
    extra_attributes: dict[str, str] | None = None,
):
    """Send a system alert to Discord and, when applicable, to AllQuiet.

    Discord is always attempted first. AllQuiet is attempted only when
    ``correlation_id`` is truthy and the ``Flag.ALLQUIET_ALERTS`` feature flag
    evaluates as enabled. The AllQuiet title is derived from ``content`` via
    ``_extract_clean_title``; the description is ``content`` unchanged.

    Raises:
        Exception: The error raised by the Discord send, re-raised only after
            the AllQuiet step has had its chance to run. An error raised by
            the AllQuiet send itself is logged and not re-raised.
    """
    pending_discord_failure: Exception | None = None
    try:
        await discord_send_alert(content, channel)
    except Exception as e:
        pending_discord_failure = e
        logger.error(f"Failed to send Discord alert: {e}")

    # The flag lookup is short-circuited away when no correlation ID is given.
    wants_allquiet = bool(correlation_id) and await is_feature_enabled(
        Flag.ALLQUIET_ALERTS, "system", default=True
    )
    if wants_allquiet:
        # Built outside the try below, so a construction error still propagates.
        payload = AllQuietAlert(
            severity=severity,
            status=status,
            title=_extract_clean_title(content),
            description=content,
            correlation_id=correlation_id,
            channel=channel.value,
            extra_attributes=extra_attributes or {},
        )
        try:
            await send_allquiet_alert(payload)
        except Exception as e:
            logger.error(f"Failed to send AllQuiet alert: {e}")

    if pending_discord_failure:
        raise pending_discord_failure
|
||||
|
||||
async def _queue_scheduled_notification(self, event: SummaryParamsEventModel):
|
||||
"""Queue a scheduled notification - exposed method for other services to call"""
|
||||
try:
|
||||
@@ -1117,3 +1187,5 @@ class NotificationManagerClient(AppServiceClient):
|
||||
)
|
||||
queue_weekly_summary = endpoint_to_sync(NotificationManager.queue_weekly_summary)
|
||||
discord_system_alert = endpoint_to_sync(NotificationManager.discord_system_alert)
|
||||
allquiet_system_alert = endpoint_to_sync(NotificationManager.allquiet_system_alert)
|
||||
system_alert = endpoint_to_sync(NotificationManager.system_alert)
|
||||
|
||||
@@ -7,7 +7,33 @@ import pytest
|
||||
from prisma.enums import NotificationType
|
||||
|
||||
from backend.data.notifications import AgentRunData, NotificationEventModel
|
||||
from backend.notifications.notifications import NotificationManager
|
||||
from backend.notifications.notifications import (
|
||||
NotificationManager,
|
||||
_extract_clean_title,
|
||||
)
|
||||
from backend.util.metrics import DiscordChannel
|
||||
|
||||
|
||||
class TestExtractCleanTitle:
    """Unit tests for ``_extract_clean_title``."""

    def test_strips_markdown_bold(self):
        """Bold markers are removed and only the first line is used."""
        title = _extract_clean_title("**Alert Title**\nBody")
        assert title == "Alert Title"

    def test_strips_emoji(self):
        """A leading emoji and the space after it are removed."""
        title = _extract_clean_title("🚨 Alert Title")
        assert title == "Alert Title"

    def test_strips_mixed_emoji_and_markdown(self):
        """Emoji and bold markers on the same line are both removed."""
        title = _extract_clean_title("❌ **Insufficient Funds Alert**\nUser: x")
        assert title == "Insufficient Funds Alert"

    def test_truncates_at_max_length(self):
        """The result is cut to ``max_length`` characters."""
        oversized = "A" * 200
        title = _extract_clean_title(oversized, max_length=50)
        assert len(title) == 50

    def test_uses_first_line(self):
        """Only the first of several lines becomes the title."""
        title = _extract_clean_title("First\nSecond\nThird")
        assert title == "First"

    def test_empty_string(self):
        """Empty input produces an empty title."""
        title = _extract_clean_title("")
        assert title == ""
|
||||
|
||||
|
||||
class TestNotificationErrorHandling:
|
||||
@@ -166,6 +192,107 @@ class TestNotificationErrorHandling:
|
||||
|
||||
# No further processing should occur after 406
|
||||
|
||||
@pytest.mark.asyncio
async def test_system_alert_sends_discord_and_allquiet_with_correlation_id(
    self, notification_manager
):
    """With a correlation ID and the flag enabled, both senders are awaited."""
    message = "🚨 **Alert Title**\nDetails here"

    discord_patch = patch(
        "backend.notifications.notifications.discord_send_alert",
        new_callable=AsyncMock,
    )
    allquiet_patch = patch(
        "backend.notifications.notifications.send_allquiet_alert",
        new_callable=AsyncMock,
    )
    flag_patch = patch(
        "backend.notifications.notifications.is_feature_enabled",
        new_callable=AsyncMock,
        return_value=True,
    )

    with discord_patch as discord_sender, allquiet_patch as allquiet_sender, flag_patch:
        await notification_manager.system_alert(
            content=message,
            channel=DiscordChannel.PRODUCT,
            correlation_id="alert-123",
            severity="critical",
            extra_attributes={"key": "value"},
        )

    # Discord receives the raw content and the requested channel.
    discord_sender.assert_awaited_once_with(message, DiscordChannel.PRODUCT)

    # AllQuiet receives a single alert object carrying the cleaned title.
    allquiet_sender.assert_awaited_once()
    sent_alert = allquiet_sender.await_args_list[0].args[0]
    assert sent_alert.title == "Alert Title"
    assert sent_alert.description == message
    assert sent_alert.correlation_id == "alert-123"
    assert sent_alert.severity == "critical"
    assert sent_alert.channel == DiscordChannel.PRODUCT.value
    assert sent_alert.extra_attributes == {"key": "value"}
|
||||
|
||||
@pytest.mark.asyncio
async def test_system_alert_sends_allquiet_even_if_discord_fails(
    self, notification_manager
):
    """A Discord failure is raised to the caller, yet AllQuiet is still awaited."""
    failing_discord_patch = patch(
        "backend.notifications.notifications.discord_send_alert",
        new_callable=AsyncMock,
        side_effect=Exception("Discord down"),
    )
    allquiet_patch = patch(
        "backend.notifications.notifications.send_allquiet_alert",
        new_callable=AsyncMock,
    )
    flag_patch = patch(
        "backend.notifications.notifications.is_feature_enabled",
        new_callable=AsyncMock,
        return_value=True,
    )

    with failing_discord_patch, allquiet_patch as allquiet_sender, flag_patch:
        with pytest.raises(Exception, match="Discord down"):
            await notification_manager.system_alert(
                content="🚨 **Alert**\nDetails",
                correlation_id="fail-test",
                severity="critical",
            )

    allquiet_sender.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
async def test_system_alert_skips_allquiet_when_flag_disabled(
    self, notification_manager
):
    """With the feature flag reporting disabled, AllQuiet is never awaited."""
    discord_patch = patch(
        "backend.notifications.notifications.discord_send_alert",
        new_callable=AsyncMock,
    )
    allquiet_patch = patch(
        "backend.notifications.notifications.send_allquiet_alert",
        new_callable=AsyncMock,
    )
    disabled_flag_patch = patch(
        "backend.notifications.notifications.is_feature_enabled",
        new_callable=AsyncMock,
        return_value=False,
    )

    with discord_patch, allquiet_patch as allquiet_sender, disabled_flag_patch:
        await notification_manager.system_alert(
            content="🚨 **Alert**\nDetails",
            correlation_id="should-be-skipped",
            severity="critical",
        )

    allquiet_sender.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
async def test_system_alert_skips_allquiet_without_correlation_id(
    self, notification_manager
):
    """Without a correlation ID only Discord is awaited, on the default channel."""
    discord_patch = patch(
        "backend.notifications.notifications.discord_send_alert",
        new_callable=AsyncMock,
    )
    allquiet_patch = patch(
        "backend.notifications.notifications.send_allquiet_alert",
        new_callable=AsyncMock,
    )

    with discord_patch as discord_sender, allquiet_patch as allquiet_sender:
        await notification_manager.system_alert(content="Alert only")

    discord_sender.assert_awaited_once_with("Alert only", DiscordChannel.PLATFORM)
    allquiet_sender.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_422_permanently_removes_malformed_notification(
|
||||
self, notification_manager, sample_batch_event
|
||||
|
||||
@@ -47,6 +47,7 @@ class Flag(str, Enum):
|
||||
STRIPE_PRICE_PRO = "stripe-price-id-pro"
|
||||
STRIPE_PRICE_BUSINESS = "stripe-price-id-business"
|
||||
GRAPHITI_MEMORY = "graphiti-memory"
|
||||
ALLQUIET_ALERTS = "allquiet-alerts"
|
||||
|
||||
|
||||
def is_configured() -> bool:
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import logging
|
||||
from enum import Enum
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import SecretStr
|
||||
from pydantic import BaseModel, Field, SecretStr
|
||||
from sentry_sdk._init_implementation import init as _sentry_init
|
||||
from sentry_sdk.api import capture_exception as _sentry_capture_exception
|
||||
from sentry_sdk.api import flush as _sentry_flush
|
||||
@@ -171,6 +172,38 @@ def sentry_capture_error(error: BaseException):
|
||||
_sentry_flush()
|
||||
|
||||
|
||||
def _default_environment() -> str:
    """Build the environment label from the configured app-env and behave-as values."""
    label = (
        f"app:{settings.config.app_env.value}-behave:{settings.config.behave_as.value}"
    )
    return label
|
||||
|
||||
|
||||
class AllQuietAlert(BaseModel):
    """Payload model for an alert posted to the AllQuiet webhook."""

    # Required classification of the alert.
    severity: Literal["warning", "critical", "minor"]
    status: Literal["resolved", "open"]

    # Optional descriptive fields; each defaults to None when omitted.
    title: str | None = None
    description: str | None = None
    correlation_id: str | None = None
    channel: str | None = None

    # Free-form string attributes; a fresh dict is created per instance.
    extra_attributes: dict[str, str] = Field(default_factory=dict)

    # Filled from _default_environment() when not supplied explicitly.
    environment: str = Field(default_factory=_default_environment)
|
||||
|
||||
|
||||
async def send_allquiet_alert(alert: AllQuietAlert):
    """POST *alert* to the configured AllQuiet webhook on a best-effort basis.

    When no webhook URL is configured, a warning is logged and nothing is
    sent. Any exception raised while building or posting the request is
    logged with its traceback and suppressed, so this coroutine does not
    raise for delivery failures.
    """
    webhook = settings.secrets.allquiet_webhook_url

    if webhook:
        # Imported only once a webhook is known to be configured.
        from backend.util.request import Requests

        try:
            await Requests().post(webhook, json=alert.model_dump())
        except Exception:
            logging.exception("Failed to POST AllQuiet alert")
    else:
        logging.warning("AllQuiet webhook URL not configured")
|
||||
|
||||
|
||||
async def discord_send_alert(
|
||||
content: str, channel: DiscordChannel = DiscordChannel.PLATFORM
|
||||
):
|
||||
|
||||
@@ -4,6 +4,7 @@ import os
|
||||
import threading
|
||||
import time
|
||||
from functools import wraps
|
||||
from typing import Literal
|
||||
from uuid import uuid4
|
||||
|
||||
from tenacity import (
|
||||
@@ -43,7 +44,14 @@ def should_send_alert(func_name: str, exception: Exception, context: str = "") -
|
||||
|
||||
|
||||
def send_rate_limited_discord_alert(
|
||||
func_name: str, exception: Exception, context: str, alert_msg: str, channel=None
|
||||
func_name: str,
|
||||
exception: Exception,
|
||||
context: str,
|
||||
alert_msg: str,
|
||||
channel=None,
|
||||
correlation_id: str | None = None,
|
||||
severity: Literal["warning", "critical", "minor"] = "critical",
|
||||
extra_attributes: dict | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Send a Discord alert with rate limiting.
|
||||
@@ -58,8 +66,13 @@ def send_rate_limited_discord_alert(
|
||||
from backend.util.metrics import DiscordChannel
|
||||
|
||||
notification_client = get_notification_manager_client()
|
||||
notification_client.discord_system_alert(
|
||||
alert_msg, channel or DiscordChannel.PLATFORM
|
||||
notification_client.system_alert(
|
||||
content=alert_msg,
|
||||
channel=channel or DiscordChannel.PLATFORM,
|
||||
correlation_id=correlation_id,
|
||||
severity=severity,
|
||||
status="open",
|
||||
extra_attributes=extra_attributes or {},
|
||||
)
|
||||
return True
|
||||
|
||||
@@ -74,14 +87,30 @@ def _send_critical_retry_alert(
|
||||
"""Send alert when a function is approaching the retry failure threshold."""
|
||||
|
||||
prefix = f"{context}: " if context else ""
|
||||
error_type = type(exception).__name__
|
||||
|
||||
# Create correlation ID based on context, function name, and error type
|
||||
correlation_id = f"retry_failure_{context}_{func_name}_{error_type}".replace(
|
||||
" ", "_"
|
||||
).replace(":", "")
|
||||
|
||||
if send_rate_limited_discord_alert(
|
||||
func_name,
|
||||
exception,
|
||||
context,
|
||||
f"🚨 CRITICAL: Operation Approaching Failure Threshold: {prefix}'{func_name}'\n\n"
|
||||
f"Current attempt: {attempt_number}/{EXCESSIVE_RETRY_THRESHOLD}\n"
|
||||
f"Error: {type(exception).__name__}: {exception}\n\n"
|
||||
f"Error: {error_type}: {exception}\n\n"
|
||||
f"This operation is about to fail permanently. Investigate immediately.",
|
||||
correlation_id=correlation_id,
|
||||
severity="critical",
|
||||
extra_attributes={
|
||||
"function_name": func_name,
|
||||
"attempt_number": str(attempt_number),
|
||||
"max_attempts": str(EXCESSIVE_RETRY_THRESHOLD),
|
||||
"error_type": error_type,
|
||||
"context": context or "none",
|
||||
},
|
||||
):
|
||||
logger.critical(
|
||||
f"CRITICAL ALERT SENT: Operation {func_name} at attempt {attempt_number}"
|
||||
@@ -186,6 +215,14 @@ def conn_retry(
|
||||
pass
|
||||
else:
|
||||
if attempt_number == EXCESSIVE_RETRY_THRESHOLD:
|
||||
error_type = type(exception).__name__
|
||||
# Create correlation ID for infrastructure issues
|
||||
correlation_id = (
|
||||
f"infrastructure_{resource_name}_{action_name}_{func_name}".replace(
|
||||
" ", "_"
|
||||
)
|
||||
)
|
||||
|
||||
if send_rate_limited_discord_alert(
|
||||
func_name,
|
||||
exception,
|
||||
@@ -195,8 +232,18 @@ def conn_retry(
|
||||
f"Action: {action_name}\n"
|
||||
f"Function: {func_name}\n"
|
||||
f"Current attempt: {attempt_number}/{max_retry + 1}\n"
|
||||
f"Error: {type(exception).__name__}: {str(exception)[:200]}{'...' if len(str(exception)) > 200 else ''}\n\n"
|
||||
f"Error: {error_type}: {str(exception)[:200]}{'...' if len(str(exception)) > 200 else ''}\n\n"
|
||||
f"Infrastructure component is approaching failure threshold. Investigate immediately.",
|
||||
correlation_id=correlation_id,
|
||||
severity="critical",
|
||||
extra_attributes={
|
||||
"resource_name": resource_name,
|
||||
"action_name": action_name,
|
||||
"function_name": func_name,
|
||||
"attempt_number": str(attempt_number),
|
||||
"max_attempts": str(max_retry + 1),
|
||||
"error_type": error_type,
|
||||
},
|
||||
):
|
||||
logger.critical(
|
||||
f"INFRASTRUCTURE ALERT SENT: {resource_name} at {attempt_number} attempts"
|
||||
|
||||
@@ -166,16 +166,16 @@ class TestRetryRateLimiting:
|
||||
|
||||
# First alert should be sent
|
||||
_send_critical_retry_alert("spend_credits", 50, exc, "Service communication")
|
||||
assert mock_client.discord_system_alert.call_count == 1
|
||||
assert mock_client.system_alert.call_count == 1
|
||||
|
||||
# Second identical alert should be rate limited (not sent)
|
||||
_send_critical_retry_alert("spend_credits", 50, exc, "Service communication")
|
||||
assert mock_client.discord_system_alert.call_count == 1 # Still 1, not 2
|
||||
assert mock_client.system_alert.call_count == 1 # Still 1, not 2
|
||||
|
||||
# Different error should be allowed
|
||||
exc2 = ValueError("different API error")
|
||||
_send_critical_retry_alert("spend_credits", 50, exc2, "Service communication")
|
||||
assert mock_client.discord_system_alert.call_count == 2
|
||||
assert mock_client.system_alert.call_count == 2
|
||||
|
||||
@patch("backend.util.clients.get_notification_manager_client")
|
||||
def test_send_critical_retry_alert_handles_notification_failure(
|
||||
@@ -183,7 +183,7 @@ class TestRetryRateLimiting:
|
||||
):
|
||||
"""Test that notification failures don't break the rate limiter."""
|
||||
mock_client = Mock()
|
||||
mock_client.discord_system_alert.side_effect = Exception("Notification failed")
|
||||
mock_client.system_alert.side_effect = Exception("Notification failed")
|
||||
mock_get_client.return_value = mock_client
|
||||
|
||||
exc = ValueError("test error")
|
||||
@@ -221,7 +221,7 @@ class TestRetryRateLimiting:
|
||||
_send_critical_retry_alert(
|
||||
"_call_method_sync", 50, exc, "Service communication"
|
||||
)
|
||||
assert mock_client.discord_system_alert.call_count == 1
|
||||
assert mock_client.system_alert.call_count == 1
|
||||
|
||||
# Next 950 failures should not send alerts (rate limited)
|
||||
for _ in range(950):
|
||||
@@ -230,7 +230,7 @@ class TestRetryRateLimiting:
|
||||
)
|
||||
|
||||
# Still only 1 alert sent total
|
||||
assert mock_client.discord_system_alert.call_count == 1
|
||||
assert mock_client.system_alert.call_count == 1
|
||||
|
||||
@patch("backend.util.clients.get_notification_manager_client")
|
||||
def test_retry_decorator_with_excessive_failures(self, mock_get_client):
|
||||
@@ -248,4 +248,4 @@ class TestRetryRateLimiting:
|
||||
always_failing_function()
|
||||
|
||||
# Should have sent exactly one alert at the threshold
|
||||
assert mock_client.discord_system_alert.call_count == 1
|
||||
assert mock_client.system_alert.call_count == 1
|
||||
|
||||
@@ -708,6 +708,7 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
|
||||
description="The LaunchDarkly SDK key for feature flag management",
|
||||
)
|
||||
|
||||
allquiet_webhook_url: str = Field(default="", description="AllQuiet webhook URL")
|
||||
agentmail_api_key: str = Field(default="", description="AgentMail API Key")
|
||||
|
||||
ayrshare_api_key: str = Field(default="", description="Ayrshare API Key")
|
||||
|
||||
Reference in New Issue
Block a user