fix(backend): Improve Postmark error handling and logging for notification delivery (#11052)

<!-- Clearly explain the need for these changes: -->
Fixes
[AUTOGPT-SERVER-5K6](https://sentry.io/organizations/significant-gravitas/issues/6887660207/).
The issue was that: Batch sending fails due to malformed data (422) and
inactive recipients (406); the 406 error is misclassified as a size
limit failure.

- Implements more robust error handling for Postmark API failures during
notification sending.
- Specifically handles inactive recipients (HTTP 406), malformed data
(HTTP 422), and oversized notifications.
- Adds detailed logging for each error case, including the notification
index and error message.
- Skips individual notifications that fail due to these errors,
preventing the entire batch from failing.
- Improves error handling for ValueErrors during send_templated calls,
specifically addressing oversized notifications.


This fix was generated by Seer in Sentry, triggered by Nicholas Tindle.
👁️ Run ID: 1675950

Not quite right? [Click here to continue debugging with
Seer.](https://sentry.io/organizations/significant-gravitas/issues/6887660207/?seerDrawer=true)

### Changes 🏗️

<!-- Concisely describe all of the changes made in this pull request:
-->
- Implements more robust error handling for Postmark API failures during
notification sending.
- Specifically handles inactive recipients (HTTP 406), malformed data
(HTTP 422), and oversized notifications.
- Adds detailed logging for each error case, including the notification
index and error message.
- Skips individual notifications that fail due to these errors,
preventing the entire batch from failing.
- Improves error handling for ValueErrors during send_templated calls,
specifically addressing oversized notifications.
- Also disables this in prod to prevent scaling issues until we work out
some of the more critical issues

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
- [x] Test sending notifications with invalid email addresses to ensure
406 errors are handled correctly.
- [x] Test sending notifications with malformed data to ensure 422
errors are handled correctly.
- [x] Test sending oversized notifications to ensure they are skipped
and logged correctly.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- New Features
  - None

- Bug Fixes
- Individual email failures no longer abort a batch; processing
continues after per-recipient errors.
- Specific handling for inactive recipients and malformed messages to
prevent repeated delivery attempts.

- Chores
  - Improved error logging and diagnostics for email delivery scenarios.

- Tests
- Added tests covering email-sending error cases, user-deactivation on
inactive addresses, and batch-continuation behavior.

- Documentation
  - None
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: seer-by-sentry[bot] <157164994+seer-by-sentry[bot]@users.noreply.github.com>
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
This commit is contained in:
seer-by-sentry[bot]
2025-10-13 02:16:48 -05:00
committed by GitHub
parent a49c957467
commit 20acd8b51d
6 changed files with 949 additions and 68 deletions

View File

@@ -235,6 +235,7 @@ class BaseEventModel(BaseModel):
class NotificationEventModel(BaseEventModel, Generic[NotificationDataType_co]):
id: Optional[str] = None # None when creating, populated when reading from DB
data: NotificationDataType_co
@property
@@ -378,6 +379,7 @@ class NotificationPreference(BaseModel):
class UserNotificationEventDTO(BaseModel):
id: str # Added to track notifications for removal
type: NotificationType
data: dict
created_at: datetime
@@ -386,6 +388,7 @@ class UserNotificationEventDTO(BaseModel):
@staticmethod
def from_db(model: NotificationEvent) -> "UserNotificationEventDTO":
return UserNotificationEventDTO(
id=model.id,
type=model.type,
data=dict(model.data),
created_at=model.createdAt,
@@ -541,6 +544,79 @@ async def empty_user_notification_batch(
) from e
async def clear_all_user_notification_batches(user_id: str) -> None:
"""Clear ALL notification batches for a user across all types.
Used when user's email is bounced/inactive and we should stop
trying to send them ANY emails.
"""
try:
# Delete all notification events for this user
await NotificationEvent.prisma().delete_many(
where={"UserNotificationBatch": {"is": {"userId": user_id}}}
)
# Delete all batches for this user
await UserNotificationBatch.prisma().delete_many(where={"userId": user_id})
logger.info(f"Cleared all notification batches for user {user_id}")
except Exception as e:
raise DatabaseError(
f"Failed to clear all notification batches for user {user_id}: {e}"
) from e
async def remove_notifications_from_batch(
user_id: str, notification_type: NotificationType, notification_ids: list[str]
) -> None:
"""Remove specific notifications from a user's batch by their IDs.
This is used after successful sending to remove only the
sent notifications, preventing duplicates on retry.
"""
if not notification_ids:
return
try:
# Delete the specific notification events
deleted_count = await NotificationEvent.prisma().delete_many(
where={
"id": {"in": notification_ids},
"UserNotificationBatch": {
"is": {"userId": user_id, "type": notification_type}
},
}
)
logger.info(
f"Removed {deleted_count} notifications from batch for user {user_id}"
)
# Check if batch is now empty and delete it if so
remaining = await NotificationEvent.prisma().count(
where={
"UserNotificationBatch": {
"is": {"userId": user_id, "type": notification_type}
}
}
)
if remaining == 0:
await UserNotificationBatch.prisma().delete_many(
where=UserNotificationBatchWhereInput(
userId=user_id,
type=notification_type,
)
)
logger.info(
f"Deleted empty batch for user {user_id} and type {notification_type}"
)
except Exception as e:
raise DatabaseError(
f"Failed to remove notifications from batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_user_notification_batch(
user_id: str,
notification_type: NotificationType,

View File

@@ -354,6 +354,36 @@ async def set_user_email_verification(user_id: str, verified: bool) -> None:
) from e
async def disable_all_user_notifications(user_id: str) -> None:
"""Disable all notification preferences for a user.
Used when user's email bounces/is inactive to prevent any future notifications.
"""
try:
await PrismaUser.prisma().update(
where={"id": user_id},
data={
"notifyOnAgentRun": False,
"notifyOnZeroBalance": False,
"notifyOnLowBalance": False,
"notifyOnBlockExecutionFailed": False,
"notifyOnContinuousAgentError": False,
"notifyOnDailySummary": False,
"notifyOnWeeklySummary": False,
"notifyOnMonthlySummary": False,
"notifyOnAgentApproved": False,
"notifyOnAgentRejected": False,
},
)
# Invalidate cache for this user
get_user_by_id.cache_delete(user_id)
logger.info(f"Disabled all notification preferences for user {user_id}")
except Exception as e:
raise DatabaseError(
f"Failed to disable notifications for user {user_id}: {e}"
) from e
async def get_user_email_verification(user_id: str) -> bool:
"""Get the email verification status for a user."""
try:

View File

@@ -29,11 +29,13 @@ from backend.data.graph import (
get_node,
)
from backend.data.notifications import (
clear_all_user_notification_batches,
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_all_batches_by_type,
get_user_notification_batch,
get_user_notification_oldest_message_in_batch,
remove_notifications_from_batch,
)
from backend.data.user import (
get_active_user_ids_in_timerange,
@@ -148,10 +150,12 @@ class DatabaseManager(AppService):
get_user_notification_preference = _(get_user_notification_preference)
# Notifications - async
clear_all_user_notification_batches = _(clear_all_user_notification_batches)
create_or_add_to_user_notification_batch = _(
create_or_add_to_user_notification_batch
)
empty_user_notification_batch = _(empty_user_notification_batch)
remove_notifications_from_batch = _(remove_notifications_from_batch)
get_all_batches_by_type = _(get_all_batches_by_type)
get_user_notification_batch = _(get_user_notification_batch)
get_user_notification_oldest_message_in_batch = _(
@@ -243,10 +247,12 @@ class DatabaseManagerAsyncClient(AppServiceClient):
get_user_notification_preference = d.get_user_notification_preference
# Notifications
clear_all_user_notification_batches = d.clear_all_user_notification_batches
create_or_add_to_user_notification_batch = (
d.create_or_add_to_user_notification_batch
)
empty_user_notification_batch = d.empty_user_notification_batch
remove_notifications_from_batch = d.remove_notifications_from_batch
get_all_batches_by_type = d.get_all_batches_by_type
get_user_notification_batch = d.get_user_notification_batch
get_user_notification_oldest_message_in_batch = (

View File

@@ -25,7 +25,11 @@ from backend.data.notifications import (
get_summary_params_type,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.user import generate_unsubscribe_link
from backend.data.user import (
disable_all_user_notifications,
generate_unsubscribe_link,
set_user_email_verification,
)
from backend.notifications.email import EmailSender
from backend.util.clients import get_database_manager_async_client
from backend.util.logging import TruncatedLogger
@@ -38,7 +42,7 @@ from backend.util.service import (
endpoint_to_sync,
expose,
)
from backend.util.settings import Settings
from backend.util.settings import AppEnvironment, Settings
logger = TruncatedLogger(logging.getLogger(__name__), "[NotificationManager]")
settings = Settings()
@@ -124,6 +128,12 @@ def get_routing_key(event_type: NotificationType) -> str:
def queue_notification(event: NotificationEventModel) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
# Disable in production
if settings.config.app_env == AppEnvironment.PRODUCTION:
return NotificationResult(
success=True,
message="Queueing notifications is disabled in production",
)
try:
logger.debug(f"Received Request to queue {event=}")
@@ -151,6 +161,12 @@ def queue_notification(event: NotificationEventModel) -> NotificationResult:
async def queue_notification_async(event: NotificationEventModel) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
# Disable in production
if settings.config.app_env == AppEnvironment.PRODUCTION:
return NotificationResult(
success=True,
message="Queueing notifications is disabled in production",
)
try:
logger.debug(f"Received Request to queue {event=}")
@@ -213,6 +229,9 @@ class NotificationManager(AppService):
@expose
async def queue_weekly_summary(self):
# disable in prod
if settings.config.app_env == AppEnvironment.PRODUCTION:
return
# Use the existing event loop instead of creating a new one with asyncio.run()
asyncio.create_task(self._queue_weekly_summary())
@@ -226,7 +245,9 @@ class NotificationManager(AppService):
logger.info(
f"Querying for active users between {start_time} and {current_time}"
)
users = await get_database_manager_async_client().get_active_user_ids_in_timerange(
users = await get_database_manager_async_client(
should_retry=False
).get_active_user_ids_in_timerange(
end_time=current_time.isoformat(),
start_time=start_time.isoformat(),
)
@@ -253,6 +274,9 @@ class NotificationManager(AppService):
async def process_existing_batches(
self, notification_types: list[NotificationType]
):
# disable in prod
if settings.config.app_env == AppEnvironment.PRODUCTION:
return
# Use the existing event loop instead of creating a new process
asyncio.create_task(self._process_existing_batches(notification_types))
@@ -266,15 +290,15 @@ class NotificationManager(AppService):
for notification_type in notification_types:
# Get all batches for this notification type
batches = (
await get_database_manager_async_client().get_all_batches_by_type(
notification_type
)
)
batches = await get_database_manager_async_client(
should_retry=False
).get_all_batches_by_type(notification_type)
for batch in batches:
# Check if batch has aged out
oldest_message = await get_database_manager_async_client().get_user_notification_oldest_message_in_batch(
oldest_message = await get_database_manager_async_client(
should_retry=False
).get_user_notification_oldest_message_in_batch(
batch.user_id, notification_type
)
@@ -289,9 +313,9 @@ class NotificationManager(AppService):
# If batch has aged out, process it
if oldest_message.created_at + max_delay < current_time:
recipient_email = await get_database_manager_async_client().get_user_email_by_id(
batch.user_id
)
recipient_email = await get_database_manager_async_client(
should_retry=False
).get_user_email_by_id(batch.user_id)
if not recipient_email:
logger.error(
@@ -308,21 +332,25 @@ class NotificationManager(AppService):
f"User {batch.user_id} does not want to receive {notification_type} notifications"
)
# Clear the batch
await get_database_manager_async_client().empty_user_notification_batch(
await get_database_manager_async_client(
should_retry=False
).empty_user_notification_batch(
batch.user_id, notification_type
)
continue
batch_data = await get_database_manager_async_client().get_user_notification_batch(
batch.user_id, notification_type
)
batch_data = await get_database_manager_async_client(
should_retry=False
).get_user_notification_batch(batch.user_id, notification_type)
if not batch_data or not batch_data.notifications:
logger.error(
f"Batch data not found for user {batch.user_id}"
)
# Clear the batch
await get_database_manager_async_client().empty_user_notification_batch(
await get_database_manager_async_client(
should_retry=False
).empty_user_notification_batch(
batch.user_id, notification_type
)
continue
@@ -358,7 +386,9 @@ class NotificationManager(AppService):
)
# Clear the batch
await get_database_manager_async_client().empty_user_notification_batch(
await get_database_manager_async_client(
should_retry=False
).empty_user_notification_batch(
batch.user_id, notification_type
)
@@ -413,15 +443,13 @@ class NotificationManager(AppService):
self, user_id: str, event_type: NotificationType
) -> bool:
"""Check if a user wants to receive a notification based on their preferences and email verification status"""
validated_email = (
await get_database_manager_async_client().get_user_email_verification(
user_id
)
)
validated_email = await get_database_manager_async_client(
should_retry=False
).get_user_email_verification(user_id)
preference = (
await get_database_manager_async_client().get_user_notification_preference(
user_id
)
await get_database_manager_async_client(
should_retry=False
).get_user_notification_preference(user_id)
).preferences.get(event_type, True)
# only if both are true, should we email this person
return validated_email and preference
@@ -437,7 +465,9 @@ class NotificationManager(AppService):
try:
# Get summary data from the database
summary_data = await get_database_manager_async_client().get_user_execution_summary_data(
summary_data = await get_database_manager_async_client(
should_retry=False
).get_user_execution_summary_data(
user_id=user_id,
start_time=params.start_date,
end_time=params.end_date,
@@ -524,13 +554,13 @@ class NotificationManager(AppService):
self, user_id: str, event_type: NotificationType, event: NotificationEventModel
) -> bool:
await get_database_manager_async_client().create_or_add_to_user_notification_batch(
user_id, event_type, event
)
await get_database_manager_async_client(
should_retry=False
).create_or_add_to_user_notification_batch(user_id, event_type, event)
oldest_message = await get_database_manager_async_client().get_user_notification_oldest_message_in_batch(
user_id, event_type
)
oldest_message = await get_database_manager_async_client(
should_retry=False
).get_user_notification_oldest_message_in_batch(user_id, event_type)
if not oldest_message:
logger.error(
f"Batch for user {user_id} and type {event_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
@@ -580,11 +610,9 @@ class NotificationManager(AppService):
return False
logger.debug(f"Processing immediate notification: {event}")
recipient_email = (
await get_database_manager_async_client().get_user_email_by_id(
event.user_id
)
)
recipient_email = await get_database_manager_async_client(
should_retry=False
).get_user_email_by_id(event.user_id)
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
@@ -619,11 +647,9 @@ class NotificationManager(AppService):
return False
logger.info(f"Processing batch notification: {event}")
recipient_email = (
await get_database_manager_async_client().get_user_email_by_id(
event.user_id
)
)
recipient_email = await get_database_manager_async_client(
should_retry=False
).get_user_email_by_id(event.user_id)
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
@@ -642,11 +668,9 @@ class NotificationManager(AppService):
if not should_send:
logger.info("Batch not old enough to send")
return False
batch = (
await get_database_manager_async_client().get_user_notification_batch(
event.user_id, event.type
)
)
batch = await get_database_manager_async_client(
should_retry=False
).get_user_notification_batch(event.user_id, event.type)
if not batch or not batch.notifications:
logger.error(f"Batch not found for user {event.user_id}")
return False
@@ -657,6 +681,7 @@ class NotificationManager(AppService):
get_notif_data_type(db_event.type)
].model_validate(
{
"id": db_event.id, # Include ID from database
"user_id": event.user_id,
"type": db_event.type,
"data": db_event.data,
@@ -679,6 +704,9 @@ class NotificationManager(AppService):
chunk_sent = False
for attempt_size in [chunk_size, 50, 25, 10, 5, 1]:
chunk = batch_messages[i : i + attempt_size]
chunk_ids = [
msg.id for msg in chunk if msg.id
] # Extract IDs for removal
try:
# Try to render the email to check its size
@@ -705,6 +733,23 @@ class NotificationManager(AppService):
user_unsub_link=unsub_link,
)
# Remove successfully sent notifications immediately
if chunk_ids:
try:
await get_database_manager_async_client(
should_retry=False
).remove_notifications_from_batch(
event.user_id, event.type, chunk_ids
)
logger.info(
f"Removed {len(chunk_ids)} sent notifications from batch"
)
except Exception as e:
logger.error(
f"Failed to remove sent notifications: {e}"
)
# Continue anyway - better to risk duplicates than lose emails
# Track successful sends
successfully_sent_count += len(chunk)
@@ -722,13 +767,137 @@ class NotificationManager(AppService):
i += len(chunk)
chunk_sent = True
break
else:
# Message is too large even after size reduction
if attempt_size == 1:
logger.error(
f"Failed to send notification at index {i}: "
f"Single notification exceeds email size limit "
f"({len(test_message):,} chars > {MAX_EMAIL_SIZE:,} chars). "
f"Removing permanently from batch - will not retry."
)
# Remove the oversized notification permanently - it will NEVER fit
if chunk_ids:
try:
await get_database_manager_async_client(
should_retry=False
).remove_notifications_from_batch(
event.user_id, event.type, chunk_ids
)
logger.info(
f"Removed oversized notification {chunk_ids[0]} from batch permanently"
)
except Exception as e:
logger.error(
f"Failed to remove oversized notification: {e}"
)
failed_indices.append(i)
i += 1
chunk_sent = True
break
# Try smaller chunk size
continue
except Exception as e:
# Check if it's a Postmark API error
if attempt_size == 1:
# Even single notification is too large
logger.error(
f"Single notification too large to send: {e}. "
f"Skipping notification at index {i}"
)
# Single notification failed - determine the actual cause
error_message = str(e).lower()
error_type = type(e).__name__
# Check for HTTP 406 - Inactive recipient (common in Postmark errors)
if "406" in error_message or "inactive" in error_message:
logger.warning(
f"Failed to send notification at index {i}: "
f"Recipient marked as inactive by Postmark. "
f"Error: {e}. Disabling ALL notifications for this user."
)
# 1. Mark email as unverified
try:
await set_user_email_verification(
event.user_id, False
)
logger.info(
f"Set email verification to false for user {event.user_id}"
)
except Exception as deactivation_error:
logger.error(
f"Failed to deactivate email for user {event.user_id}: "
f"{deactivation_error}"
)
# 2. Disable all notification preferences
try:
await disable_all_user_notifications(event.user_id)
logger.info(
f"Disabled all notification preferences for user {event.user_id}"
)
except Exception as disable_error:
logger.error(
f"Failed to disable notification preferences: {disable_error}"
)
# 3. Clear ALL notification batches for this user
try:
await get_database_manager_async_client(
should_retry=False
).clear_all_user_notification_batches(event.user_id)
logger.info(
f"Cleared ALL notification batches for user {event.user_id}"
)
except Exception as remove_error:
logger.error(
f"Failed to clear batches for inactive recipient: {remove_error}"
)
# Stop processing - we've nuked everything for this user
return True
# Check for HTTP 422 - Malformed data
elif (
"422" in error_message
or "unprocessable" in error_message
):
logger.error(
f"Failed to send notification at index {i}: "
f"Malformed notification data rejected by Postmark. "
f"Error: {e}. Removing from batch permanently."
)
# Remove from batch - 422 means bad data that won't fix itself
if chunk_ids:
try:
await get_database_manager_async_client(
should_retry=False
).remove_notifications_from_batch(
event.user_id, event.type, chunk_ids
)
logger.info(
"Removed malformed notification from batch permanently"
)
except Exception as remove_error:
logger.error(
f"Failed to remove malformed notification: {remove_error}"
)
# Check if it's a ValueError for size limit
elif (
isinstance(e, ValueError)
and "too large" in error_message
):
logger.error(
f"Failed to send notification at index {i}: "
f"Notification size exceeds email limit. "
f"Error: {e}. Skipping this notification."
)
# Other API errors
else:
logger.error(
f"Failed to send notification at index {i}: "
f"Email API error ({error_type}): {e}. "
f"Skipping this notification."
)
failed_indices.append(i)
i += 1
chunk_sent = True
@@ -742,18 +911,20 @@ class NotificationManager(AppService):
failed_indices.append(i)
i += 1
# Only empty the batch if ALL notifications were sent successfully
if successfully_sent_count == len(batch_messages):
# Check what remains in the batch (notifications are removed as sent)
remaining_batch = await get_database_manager_async_client(
should_retry=False
).get_user_notification_batch(event.user_id, event.type)
if not remaining_batch or not remaining_batch.notifications:
logger.info(
f"Successfully sent all {successfully_sent_count} notifications, clearing batch"
)
await get_database_manager_async_client().empty_user_notification_batch(
event.user_id, event.type
f"All {successfully_sent_count} notifications sent and removed from batch"
)
else:
remaining_count = len(remaining_batch.notifications)
logger.warning(
f"Only sent {successfully_sent_count} of {len(batch_messages)} notifications. "
f"Failed indices: {failed_indices}. Batch will be retained for retry."
f"Sent {successfully_sent_count} notifications. "
f"{remaining_count} remain in batch for retry due to errors."
)
return True
except Exception as e:
@@ -771,11 +942,9 @@ class NotificationManager(AppService):
logger.info(f"Processing summary notification: {model}")
recipient_email = (
await get_database_manager_async_client().get_user_email_by_id(
event.user_id
)
)
recipient_email = await get_database_manager_async_client(
should_retry=False
).get_user_email_by_id(event.user_id)
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False

View File

@@ -0,0 +1,598 @@
"""Tests for notification error handling in NotificationManager."""
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from prisma.enums import NotificationType
from backend.data.notifications import AgentRunData, NotificationEventModel
from backend.notifications.notifications import NotificationManager
class TestNotificationErrorHandling:
"""Test cases for notification error handling in NotificationManager."""
@pytest.fixture
def notification_manager(self):
"""Create a NotificationManager instance for testing."""
with patch("backend.notifications.notifications.AppService.__init__"):
manager = NotificationManager()
manager.email_sender = MagicMock()
# Mock the _get_template method used by _process_batch
template_mock = Mock()
template_mock.base_template = "base"
template_mock.subject_template = "subject"
template_mock.body_template = "body"
manager.email_sender._get_template = Mock(return_value=template_mock)
# Mock the formatter
manager.email_sender.formatter = Mock()
manager.email_sender.formatter.format_email = Mock(
return_value=("subject", "body content")
)
manager.email_sender.formatter.env = Mock()
manager.email_sender.formatter.env.globals = {
"base_url": "http://example.com"
}
return manager
@pytest.fixture
def sample_batch_event(self):
"""Create a sample batch event for testing."""
return NotificationEventModel(
type=NotificationType.AGENT_RUN,
user_id="user_1",
created_at=datetime.now(timezone.utc),
data=AgentRunData(
agent_name="Test Agent",
credits_used=10.0,
execution_time=5.0,
node_count=3,
graph_id="graph_1",
outputs=[],
),
)
@pytest.fixture
def sample_batch_notifications(self):
"""Create sample batch notifications for testing."""
notifications = []
for i in range(3):
notification = Mock()
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
return notifications
@pytest.mark.asyncio
async def test_406_stops_all_processing_for_user(
self, notification_manager, sample_batch_event
):
"""Test that 406 inactive recipient error stops ALL processing for that user."""
with patch("backend.notifications.notifications.logger"), patch(
"backend.notifications.notifications.set_user_email_verification",
new_callable=AsyncMock,
) as mock_set_verification, patch(
"backend.notifications.notifications.disable_all_user_notifications",
new_callable=AsyncMock,
) as mock_disable_all, patch(
"backend.notifications.notifications.get_database_manager_async_client"
) as mock_db_client, patch(
"backend.notifications.notifications.generate_unsubscribe_link"
) as mock_unsub_link:
# Create batch of 5 notifications
notifications = []
for i in range(5):
notification = Mock()
notification.id = f"notif_{i}"
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
return_value=Mock(notifications=notifications)
)
mock_db.clear_all_user_notification_batches = AsyncMock()
mock_db.remove_notifications_from_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Track calls
call_count = [0]
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
current_call = call_count[0]
call_count[0] += 1
# First two succeed, third hits 406
if current_call < 2:
return None
else:
raise Exception("Recipient marked as inactive (406)")
# Force single processing
raise Exception("Force single processing")
notification_manager.email_sender.send_templated.side_effect = (
send_side_effect
)
# Act
result = await notification_manager._process_batch(
sample_batch_event.model_dump_json()
)
# Assert
assert result is True
# Only 3 calls should have been made (2 successful, 1 failed with 406)
assert call_count[0] == 3
# User should be deactivated
mock_set_verification.assert_called_once_with("user_1", False)
mock_disable_all.assert_called_once_with("user_1")
mock_db.clear_all_user_notification_batches.assert_called_once_with(
"user_1"
)
# No further processing should occur after 406
@pytest.mark.asyncio
async def test_422_permanently_removes_malformed_notification(
self, notification_manager, sample_batch_event
):
"""Test that 422 error permanently removes the malformed notification from batch and continues with others."""
with patch("backend.notifications.notifications.logger") as mock_logger, patch(
"backend.notifications.notifications.get_database_manager_async_client"
) as mock_db_client, patch(
"backend.notifications.notifications.generate_unsubscribe_link"
) as mock_unsub_link:
# Create batch of 5 notifications
notifications = []
for i in range(5):
notification = Mock()
notification.id = f"notif_{i}"
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
side_effect=[
Mock(notifications=notifications),
Mock(notifications=[]), # Empty after processing
]
)
mock_db.remove_notifications_from_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Track calls
call_count = [0]
successful_indices = []
removed_notification_ids = []
# Capture what gets removed
def remove_side_effect(user_id, notif_type, notif_ids):
removed_notification_ids.extend(notif_ids)
return None
mock_db.remove_notifications_from_batch.side_effect = remove_side_effect
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
current_call = call_count[0]
call_count[0] += 1
# Index 2 has malformed data (422)
if current_call == 2:
raise Exception(
"Unprocessable entity (422): Malformed email data"
)
else:
successful_indices.append(current_call)
return None
# Force single processing
raise Exception("Force single processing")
notification_manager.email_sender.send_templated.side_effect = (
send_side_effect
)
# Act
result = await notification_manager._process_batch(
sample_batch_event.model_dump_json()
)
# Assert
assert result is True
assert call_count[0] == 5 # All 5 attempted
assert len(successful_indices) == 4 # 4 succeeded (all except index 2)
assert 2 not in successful_indices # Index 2 failed
# Verify 422 error was logged
error_calls = [call[0][0] for call in mock_logger.error.call_args_list]
assert any(
"422" in call or "malformed" in call.lower() for call in error_calls
)
# Verify all notifications were removed (4 successful + 1 malformed)
assert mock_db.remove_notifications_from_batch.call_count == 5
assert (
"notif_2" in removed_notification_ids
) # Malformed one was removed permanently
@pytest.mark.asyncio
async def test_oversized_notification_permanently_removed(
self, notification_manager, sample_batch_event
):
"""Test that oversized notifications are permanently removed from batch but others continue."""
with patch("backend.notifications.notifications.logger") as mock_logger, patch(
"backend.notifications.notifications.get_database_manager_async_client"
) as mock_db_client, patch(
"backend.notifications.notifications.generate_unsubscribe_link"
) as mock_unsub_link:
# Create batch of 5 notifications
notifications = []
for i in range(5):
notification = Mock()
notification.id = f"notif_{i}"
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
side_effect=[
Mock(notifications=notifications),
Mock(notifications=[]), # Empty after processing
]
)
mock_db.remove_notifications_from_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Override formatter to simulate oversized on index 3
# original_format = notification_manager.email_sender.formatter.format_email
def format_side_effect(*args, **kwargs):
# Check if we're formatting index 3
data = kwargs.get("data", {}).get("notifications", [])
if data and len(data) == 1:
# Check notification content to identify index 3
if any(
"Test Agent 3" in str(n.data)
for n in data
if hasattr(n, "data")
):
# Return oversized message for index 3
return ("subject", "x" * 5_000_000) # Over 4.5MB limit
return ("subject", "normal sized content")
notification_manager.email_sender.formatter.format_email = Mock(
side_effect=format_side_effect
)
# Track calls
successful_indices = []
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
# Track which notification was sent based on content
for i, notif in enumerate(notifications):
if any(
f"Test Agent {i}" in str(n.data)
for n in data
if hasattr(n, "data")
):
successful_indices.append(i)
return None
return None
# Force single processing
raise Exception("Force single processing")
notification_manager.email_sender.send_templated.side_effect = (
send_side_effect
)
# Act
result = await notification_manager._process_batch(
sample_batch_event.model_dump_json()
)
# Assert
assert result is True
assert (
len(successful_indices) == 4
) # Only 4 sent (index 3 skipped due to size)
assert 3 not in successful_indices # Index 3 was not sent
# Verify oversized error was logged
error_calls = [call[0][0] for call in mock_logger.error.call_args_list]
assert any(
"exceeds email size limit" in call or "oversized" in call.lower()
for call in error_calls
)
@pytest.mark.asyncio
async def test_generic_api_error_keeps_notification_for_retry(
self, notification_manager, sample_batch_event
):
"""Test that generic API errors keep notifications in batch for retry while others continue."""
with patch("backend.notifications.notifications.logger") as mock_logger, patch(
"backend.notifications.notifications.get_database_manager_async_client"
) as mock_db_client, patch(
"backend.notifications.notifications.generate_unsubscribe_link"
) as mock_unsub_link:
# Create batch of 5 notifications
notifications = []
for i in range(5):
notification = Mock()
notification.id = f"notif_{i}"
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
# Notification that failed with generic error
failed_notifications = [notifications[1]] # Only index 1 remains for retry
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
side_effect=[
Mock(notifications=notifications),
Mock(
notifications=failed_notifications
), # Failed ones remain for retry
]
)
mock_db.remove_notifications_from_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Track calls
successful_indices = []
failed_indices = []
removed_notification_ids = []
# Capture what gets removed
def remove_side_effect(user_id, notif_type, notif_ids):
removed_notification_ids.extend(notif_ids)
return None
mock_db.remove_notifications_from_batch.side_effect = remove_side_effect
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
# Track which notification based on content
for i, notif in enumerate(notifications):
if any(
f"Test Agent {i}" in str(n.data)
for n in data
if hasattr(n, "data")
):
# Index 1 has generic API error
if i == 1:
failed_indices.append(i)
raise Exception("Network timeout - temporary failure")
else:
successful_indices.append(i)
return None
return None
# Force single processing
raise Exception("Force single processing")
notification_manager.email_sender.send_templated.side_effect = (
send_side_effect
)
# Act
result = await notification_manager._process_batch(
sample_batch_event.model_dump_json()
)
# Assert
assert result is True
assert len(successful_indices) == 4 # 4 succeeded (0, 2, 3, 4)
assert len(failed_indices) == 1 # 1 failed
assert 1 in failed_indices # Index 1 failed
# Verify generic error was logged
error_calls = [call[0][0] for call in mock_logger.error.call_args_list]
assert any(
"api error" in call.lower() or "skipping" in call.lower()
for call in error_calls
)
# Only successful ones should be removed from batch (failed one stays for retry)
assert mock_db.remove_notifications_from_batch.call_count == 4
assert (
"notif_1" not in removed_notification_ids
) # Failed one NOT removed (stays for retry)
assert "notif_0" in removed_notification_ids # Successful one removed
assert "notif_2" in removed_notification_ids # Successful one removed
assert "notif_3" in removed_notification_ids # Successful one removed
assert "notif_4" in removed_notification_ids # Successful one removed
@pytest.mark.asyncio
async def test_batch_all_notifications_sent_successfully(
self, notification_manager, sample_batch_event
):
"""Test successful batch processing where all notifications are sent without errors."""
with patch("backend.notifications.notifications.logger") as mock_logger, patch(
"backend.notifications.notifications.get_database_manager_async_client"
) as mock_db_client, patch(
"backend.notifications.notifications.generate_unsubscribe_link"
) as mock_unsub_link:
# Create batch of 5 notifications
notifications = []
for i in range(5):
notification = Mock()
notification.id = f"notif_{i}"
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
side_effect=[
Mock(notifications=notifications),
Mock(notifications=[]), # Empty after all sent successfully
]
)
mock_db.remove_notifications_from_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Track successful sends
successful_indices = []
removed_notification_ids = []
# Capture what gets removed
def remove_side_effect(user_id, notif_type, notif_ids):
removed_notification_ids.extend(notif_ids)
return None
mock_db.remove_notifications_from_batch.side_effect = remove_side_effect
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
# Track which notification was sent
for i, notif in enumerate(notifications):
if any(
f"Test Agent {i}" in str(n.data)
for n in data
if hasattr(n, "data")
):
successful_indices.append(i)
return None
return None # Success
# Force single processing
raise Exception("Force single processing")
notification_manager.email_sender.send_templated.side_effect = (
send_side_effect
)
# Act
result = await notification_manager._process_batch(
sample_batch_event.model_dump_json()
)
# Assert
assert result is True
# All 5 notifications should be sent successfully
assert len(successful_indices) == 5
assert successful_indices == [0, 1, 2, 3, 4]
# All notifications should be removed from batch
assert mock_db.remove_notifications_from_batch.call_count == 5
assert len(removed_notification_ids) == 5
for i in range(5):
assert f"notif_{i}" in removed_notification_ids
# No errors should be logged
assert mock_logger.error.call_count == 0
# Info message about successful sends should be logged
info_calls = [call[0][0] for call in mock_logger.info.call_args_list]
assert any("sent and removed" in call.lower() for call in info_calls)

View File

@@ -34,12 +34,14 @@ def get_database_manager_client() -> "DatabaseManagerClient":
@thread_cached
def get_database_manager_async_client() -> "DatabaseManagerAsyncClient":
def get_database_manager_async_client(
should_retry: bool = True,
) -> "DatabaseManagerAsyncClient":
"""Get a thread-cached DatabaseManagerAsyncClient with request retry enabled."""
from backend.executor import DatabaseManagerAsyncClient
from backend.util.service import get_service_client
return get_service_client(DatabaseManagerAsyncClient, request_retry=True)
return get_service_client(DatabaseManagerAsyncClient, request_retry=should_retry)
@thread_cached