refactor: Update tests to call actual _process_batch implementation

- Refactored all tests to call actual _process_batch method with mocked dependencies
- Fixed AsyncMock import and usage for async functions
- Removed duplicate error handling logic from tests
- Tests now verify real implementation behavior
- Added proper mocking for database operations and internal methods

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
This commit is contained in:
claude[bot]
2025-10-07 21:39:56 +00:00
parent 78af66cafb
commit f93128b312

View File

@@ -1,7 +1,7 @@
"""Tests for notification error handling in NotificationManager."""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from prisma.enums import NotificationType
@@ -10,15 +10,6 @@ from backend.data.notifications import AgentRunData, NotificationEventModel
from backend.notifications.notifications import NotificationManager
class MockPostmarkError(Exception):
"""Mock exception to simulate Postmark API errors."""
def __init__(self, message, error_code=None, status_code=None):
super().__init__(message)
self.error_code = error_code
self.status_code = status_code
class TestNotificationErrorHandling:
"""Test cases for notification error handling in NotificationManager."""
@@ -28,411 +19,420 @@ class TestNotificationErrorHandling:
with patch("backend.notifications.notifications.AppService.__init__"):
manager = NotificationManager()
manager.email_sender = MagicMock()
# Mock the _get_template method used by _process_batch
template_mock = Mock()
template_mock.base_template = "base"
template_mock.subject_template = "subject"
template_mock.body_template = "body"
manager.email_sender._get_template = Mock(return_value=template_mock)
# Mock the formatter
manager.email_sender.formatter = Mock()
manager.email_sender.formatter.format_email = Mock(
return_value=("subject", "body content")
)
manager.email_sender.formatter.env = Mock()
manager.email_sender.formatter.env.globals = {"base_url": "http://example.com"}
return manager
@pytest.fixture
def sample_notifications(self):
"""Create sample notification data for testing."""
return [
NotificationEventModel(
type=NotificationType.AGENT_RUN,
user_id="user_1",
created_at=datetime.now(timezone.utc),
data=AgentRunData(
agent_name="Test Agent 1",
credits_used=10.0,
execution_time=5.0,
node_count=3,
graph_id="graph_1",
outputs=[],
),
def sample_batch_event(self):
"""Create a sample batch event for testing."""
return NotificationEventModel(
type=NotificationType.AGENT_RUN,
user_id="user_1",
created_at=datetime.now(timezone.utc),
data=AgentRunData(
agent_name="Test Agent",
credits_used=10.0,
execution_time=5.0,
node_count=3,
graph_id="graph_1",
outputs=[],
),
NotificationEventModel(
type=NotificationType.AGENT_RUN,
user_id="user_2",
created_at=datetime.now(timezone.utc),
data=AgentRunData(
agent_name="Test Agent 2",
credits_used=20.0,
execution_time=10.0,
node_count=5,
graph_id="graph_2",
outputs=[],
),
),
NotificationEventModel(
type=NotificationType.AGENT_RUN,
user_id="user_3",
created_at=datetime.now(timezone.utc),
data=AgentRunData(
agent_name="Test Agent 3",
credits_used=30.0,
execution_time=15.0,
node_count=7,
graph_id="graph_3",
outputs=[],
),
),
]
)
@pytest.fixture
def sample_batch_notifications(self):
"""Create sample batch notifications for testing."""
notifications = []
for i in range(3):
notification = Mock()
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
return notifications
@pytest.mark.asyncio
async def test_postmark_406_inactive_recipient_error_handling(
self, notification_manager, sample_notifications
self, notification_manager, sample_batch_event, sample_batch_notifications
):
"""Test handling of Postmark 406 error for inactive recipients and email deactivation."""
with patch("backend.notifications.notifications.logger") as mock_logger:
# Create an exception that simulates a 406 error
error = Exception("Recipient marked as inactive (406)")
with patch("backend.notifications.notifications.logger") as mock_logger, \
patch("backend.notifications.notifications.set_user_email_verification",
new_callable=AsyncMock) as mock_set_verification, \
patch("backend.notifications.notifications.get_database_manager_async_client") as mock_db_client, \
patch("backend.notifications.notifications.generate_unsubscribe_link") as mock_unsub_link:
# Mock the email sender to raise the error
notification_manager.email_sender.send_templated.side_effect = error
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
return_value=Mock(notifications=sample_batch_notifications)
)
mock_db.empty_user_notification_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Process notifications
failed_indices = []
recipient_email = "test@example.com"
user_id = "user_1"
# Mock _should_email_user_based_on_preference
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Mock set_user_email_verification function
with patch(
"backend.notifications.notifications.set_user_email_verification"
) as mock_set_verification:
mock_set_verification.return_value = None
# Configure email sender to raise 406 error for single notifications
def send_side_effect(*args, **kwargs):
# Check if data is a list (batch) or single notification
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
raise Exception("Recipient marked as inactive (406)")
raise Exception("Recipient marked as inactive (406)")
# Simulate the error handling logic from the actual code
i = 0
while i < len(sample_notifications):
try:
notification_manager.email_sender.send_templated(
notification=NotificationType.AGENT_RUN,
user_email=recipient_email,
data=sample_notifications[i],
user_unsub_link="http://example.com/unsub",
)
i += 1
except Exception as e:
# This is the error handling logic we're testing
error_message = str(e).lower()
notification_manager.email_sender.send_templated.side_effect = send_side_effect
if "406" in error_message or "inactive" in error_message:
mock_logger.warning(
f"Failed to send notification at index {i}: "
f"Recipient marked as inactive by Postmark. "
f"Error: {e}. Skipping this notification."
)
# Simulate the email deactivation
try:
await mock_set_verification(user_id, False)
mock_logger.info(
f"Set email verification to false for user {user_id} "
f"after receiving 406 inactive recipient error"
)
except Exception as deactivation_error:
mock_logger.error(
f"Failed to deactivate email for user {user_id}: "
f"{deactivation_error}"
)
# Create message
message = sample_batch_event.model_dump_json()
failed_indices.append(i)
i += 1
# Act: call actual implementation
result = await notification_manager._process_batch(message)
# Verify the warning was logged for all notifications
assert mock_logger.warning.call_count == len(sample_notifications)
assert len(failed_indices) == len(sample_notifications)
# Assert: processing returns success (True = handled, won't retry)
assert result is True
# Verify the warning message contains the expected information
warning_call = mock_logger.warning.call_args[0][0]
assert "inactive" in warning_call.lower()
assert "406" in warning_call
# Email verification should be set to false due to 406
assert mock_set_verification.called
mock_set_verification.assert_any_call("user_1", False)
# Verify that set_user_email_verification was called
assert mock_set_verification.called
mock_set_verification.assert_called_with(user_id, False)
# Warning logs should be emitted for inactive recipient
warning_calls = [call[0][0] for call in mock_logger.warning.call_args_list]
assert any("inactive" in call.lower() for call in warning_calls)
assert any("406" in call for call in warning_calls)
def test_postmark_422_malformed_data_error_handling(
self, notification_manager, sample_notifications
@pytest.mark.asyncio
async def test_postmark_422_malformed_data_error_handling(
self, notification_manager, sample_batch_event, sample_batch_notifications
):
"""Test handling of Postmark 422 error for malformed data."""
with patch("backend.notifications.notifications.logger") as mock_logger:
# Create an exception that simulates a 422 error
error = Exception("Unprocessable entity (422): Malformed email data")
with patch("backend.notifications.notifications.logger") as mock_logger, \
patch("backend.notifications.notifications.get_database_manager_async_client") as mock_db_client, \
patch("backend.notifications.notifications.generate_unsubscribe_link") as mock_unsub_link:
# Mock the email sender to raise the error
notification_manager.email_sender.send_templated.side_effect = error
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
return_value=Mock(notifications=sample_batch_notifications)
)
mock_db.empty_user_notification_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Process notifications
failed_indices = []
recipient_email = "test@example.com"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Simulate the error handling logic
i = 0
while i < len(sample_notifications):
try:
notification_manager.email_sender.send_templated(
notification=NotificationType.AGENT_RUN,
user_email=recipient_email,
data=sample_notifications[i],
user_unsub_link="http://example.com/unsub",
)
i += 1
except Exception as e:
error_message = str(e).lower()
# Configure email sender to raise 422 error
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
raise Exception("Unprocessable entity (422): Malformed email data")
raise Exception("Unprocessable entity (422): Malformed email data")
if "422" in error_message or "unprocessable" in error_message:
mock_logger.error(
f"Failed to send notification at index {i}: "
f"Malformed notification data rejected by Postmark. "
f"Error: {e}. Skipping this notification."
)
failed_indices.append(i)
notification_manager.email_sender.send_templated.side_effect = send_side_effect
i += 1
# Create message
message = sample_batch_event.model_dump_json()
# Verify the error was logged for all notifications
assert mock_logger.error.call_count == len(sample_notifications)
assert len(failed_indices) == len(sample_notifications)
# Act
result = await notification_manager._process_batch(message)
# Verify the error message contains the expected information
error_call = mock_logger.error.call_args[0][0]
assert "malformed" in error_call.lower()
# Assert
assert result is True
def test_value_error_too_large_notification(
self, notification_manager, sample_notifications
# Error logs should be emitted for malformed data
error_calls = [call[0][0] for call in mock_logger.error.call_args_list]
assert any("malformed" in call.lower() for call in error_calls)
assert any("422" in call for call in error_calls)
@pytest.mark.asyncio
async def test_postmark_oversized_notification_error_handling(
self, notification_manager, sample_batch_event, sample_batch_notifications
):
"""Test handling of ValueError for oversized notifications."""
with patch("backend.notifications.notifications.logger") as mock_logger:
# Create a ValueError for too large email
error = ValueError(
"Email body too large: 6000000 characters (limit: 5242880)"
with patch("backend.notifications.notifications.logger") as mock_logger, \
patch("backend.notifications.notifications.get_database_manager_async_client") as mock_db_client, \
patch("backend.notifications.notifications.generate_unsubscribe_link") as mock_unsub_link:
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
return_value=Mock(notifications=sample_batch_notifications)
)
mock_db.empty_user_notification_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Mock the email sender to raise the error
notification_manager.email_sender.send_templated.side_effect = error
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Process notifications
failed_indices = []
recipient_email = "test@example.com"
# Configure email sender to raise ValueError for oversized email
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
raise ValueError("Email body too large: 6000000 characters (limit: 5242880)")
raise ValueError("Email body too large: 6000000 characters (limit: 5242880)")
# Simulate the error handling logic
i = 0
while i < len(sample_notifications):
try:
notification_manager.email_sender.send_templated(
notification=NotificationType.AGENT_RUN,
user_email=recipient_email,
data=sample_notifications[i],
user_unsub_link="http://example.com/unsub",
)
i += 1
except ValueError as e:
if "too large" in str(e).lower():
mock_logger.error(
f"Failed to send notification at index {i}: "
f"Notification size exceeds email limit. "
f"Error: {e}. Skipping this notification."
)
else:
mock_logger.error(
f"Failed to send notification at index {i}: "
f"ValueError: {e}. Skipping this notification."
)
notification_manager.email_sender.send_templated.side_effect = send_side_effect
failed_indices.append(i)
i += 1
# Create message
message = sample_batch_event.model_dump_json()
# Verify the error was logged for all notifications
assert mock_logger.error.call_count == len(sample_notifications)
assert len(failed_indices) == len(sample_notifications)
# Act
result = await notification_manager._process_batch(message)
# Verify the error message contains size information
error_call = mock_logger.error.call_args[0][0]
assert "size exceeds" in error_call.lower()
# Assert
assert result is True
def test_generic_error_handling(self, notification_manager, sample_notifications):
"""Test handling of generic errors."""
with patch("backend.notifications.notifications.logger") as mock_logger:
# Create a generic exception
error = Exception("Some other API error")
# Error logs should be emitted for oversized notification
error_calls = [call[0][0] for call in mock_logger.error.call_args_list]
assert any("size exceeds" in call.lower() or "too large" in call.lower()
for call in error_calls)
# Mock the email sender to raise the error
notification_manager.email_sender.send_templated.side_effect = error
@pytest.mark.asyncio
async def test_postmark_generic_api_error_handling(
self, notification_manager, sample_batch_event, sample_batch_notifications
):
"""Test handling of generic API errors."""
with patch("backend.notifications.notifications.logger") as mock_logger, \
patch("backend.notifications.notifications.get_database_manager_async_client") as mock_db_client, \
patch("backend.notifications.notifications.generate_unsubscribe_link") as mock_unsub_link:
# Process notifications
failed_indices = []
recipient_email = "test@example.com"
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
return_value=Mock(notifications=sample_batch_notifications)
)
mock_db.empty_user_notification_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Simulate the error handling logic
i = 0
while i < len(sample_notifications):
try:
notification_manager.email_sender.send_templated(
notification=NotificationType.AGENT_RUN,
user_email=recipient_email,
data=sample_notifications[i],
user_unsub_link="http://example.com/unsub",
)
i += 1
except Exception as e:
error_message = str(e).lower()
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Check for specific errors first
if "406" in error_message or "inactive" in error_message:
mock_logger.warning(
f"Failed to send notification at index {i}: "
f"Recipient marked as inactive by Postmark. "
f"Error: {e}. Skipping this notification."
)
elif "422" in error_message or "unprocessable" in error_message:
mock_logger.error(
f"Failed to send notification at index {i}: "
f"Malformed notification data rejected by Postmark. "
f"Error: {e}. Skipping this notification."
)
else:
# Generic error
mock_logger.error(
f"Failed to send notification at index {i}: "
f"Email API error ({type(e).__name__}): {e}. "
f"Skipping this notification."
)
# Configure email sender to raise generic error
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
if isinstance(data, list) and len(data) == 1:
raise Exception("Some other API error")
raise Exception("Some other API error")
failed_indices.append(i)
i += 1
notification_manager.email_sender.send_templated.side_effect = send_side_effect
# Verify the generic error was logged
assert mock_logger.error.call_count == len(sample_notifications)
assert len(failed_indices) == len(sample_notifications)
# Create message
message = sample_batch_event.model_dump_json()
# Verify the error message is generic
error_call = mock_logger.error.call_args[0][0]
assert "email api error" in error_call.lower()
# Act
result = await notification_manager._process_batch(message)
def test_mixed_error_scenarios(self, notification_manager, sample_notifications):
# Assert
assert result is True
# Generic error logs should be emitted
error_calls = [call[0][0] for call in mock_logger.error.call_args_list]
assert any("email api error" in call.lower() or "skipping" in call.lower()
for call in error_calls)
@pytest.mark.asyncio
async def test_postmark_batch_processing_with_mixed_errors(
self, notification_manager, sample_batch_event
):
"""Test handling of mixed error scenarios in a batch."""
with patch("backend.notifications.notifications.logger") as mock_logger:
# Create different errors for different notifications
errors = [
Exception("Recipient marked as inactive (406)"),
Exception("Unprocessable entity (422)"),
ValueError("Email body too large"),
]
with patch("backend.notifications.notifications.logger") as mock_logger, \
patch("backend.notifications.notifications.set_user_email_verification",
new_callable=AsyncMock) as mock_set_verification, \
patch("backend.notifications.notifications.get_database_manager_async_client") as mock_db_client, \
patch("backend.notifications.notifications.generate_unsubscribe_link") as mock_unsub_link:
# Mock the email sender to raise different errors
notification_manager.email_sender.send_templated.side_effect = errors
# Create larger batch with mixed notifications
notifications = []
for i in range(5):
notification = Mock()
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
# Process notifications
failed_indices = []
recipient_email = "test@example.com"
# Simulate the error handling logic
i = 0
while i < len(sample_notifications):
try:
notification_manager.email_sender.send_templated(
notification=NotificationType.AGENT_RUN,
user_email=recipient_email,
data=sample_notifications[i],
user_unsub_link="http://example.com/unsub",
)
i += 1
except (ValueError, Exception) as e:
error_message = str(e).lower()
if "406" in error_message or "inactive" in error_message:
mock_logger.warning(
f"Failed to send notification at index {i}: "
f"Recipient marked as inactive by Postmark. "
f"Error: {e}. Skipping this notification."
)
elif "422" in error_message or "unprocessable" in error_message:
mock_logger.error(
f"Failed to send notification at index {i}: "
f"Malformed notification data rejected by Postmark. "
f"Error: {e}. Skipping this notification."
)
elif isinstance(e, ValueError) and "too large" in error_message:
mock_logger.error(
f"Failed to send notification at index {i}: "
f"Notification size exceeds email limit. "
f"Error: {e}. Skipping this notification."
)
failed_indices.append(i)
i += 1
# Verify all errors were handled
assert len(failed_indices) == len(sample_notifications)
# One warning (406) and two errors (422 and ValueError)
assert mock_logger.warning.call_count == 1
assert mock_logger.error.call_count == 2
def test_batch_continues_after_individual_failures(self, notification_manager):
"""Test that the batch continues processing after individual notification failures."""
with patch("backend.notifications.notifications.logger") as mock_logger:
# Create a larger batch of notifications
large_batch = [
NotificationEventModel(
type=NotificationType.AGENT_RUN,
user_id=f"user_{i}",
created_at=datetime.now(timezone.utc),
data=AgentRunData(
agent_name=f"Test Agent {i}",
credits_used=10.0 * i,
execution_time=5.0 * i,
node_count=3 + i,
graph_id=f"graph_{i}",
outputs=[],
),
)
for i in range(10)
]
# Mock the email sender to fail on specific indices (2, 5, 8)
call_count = [0] # Use list to make it mutable in the closure
def side_effect_function(*_, **__):
# Count which call this is
call_number = call_count[0]
call_count[0] += 1
if call_number in [2, 5, 8]:
raise Exception("Recipient marked as inactive (406)")
return None
notification_manager.email_sender.send_templated.side_effect = (
side_effect_function
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
return_value=Mock(notifications=notifications)
)
mock_db.empty_user_notification_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Process notifications
failed_indices = []
successful_indices = []
recipient_email = "test@example.com"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Simulate the error handling logic
i = 0
while i < len(large_batch):
try:
notification_manager.email_sender.send_templated(
notification=NotificationType.AGENT_RUN,
user_email=recipient_email,
data=large_batch[i],
user_unsub_link="http://example.com/unsub",
)
successful_indices.append(i)
i += 1
except Exception as e:
error_message = str(e).lower()
if "406" in error_message or "inactive" in error_message:
mock_logger.warning(
f"Failed to send notification at index {i}: "
f"Recipient marked as inactive by Postmark. "
f"Error: {e}. Skipping this notification."
)
# Track call count to return different errors
call_count = [0]
failed_indices.append(i)
i += 1
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
# Only fail for single notifications
if isinstance(data, list) and len(data) == 1:
current_call = call_count[0]
call_count[0] += 1
# Verify the batch continued processing
assert len(failed_indices) == 3 # Should have 3 failures
assert len(successful_indices) == 7 # Should have 7 successes
assert failed_indices == [2, 5, 8] # Specific indices that failed
assert mock_logger.warning.call_count == 3
if current_call == 0:
raise Exception("Recipient marked as inactive (406)")
elif current_call == 1:
raise Exception("Unprocessable entity (422)")
elif current_call == 2:
raise ValueError("Email body too large")
elif current_call == 3:
raise Exception("Generic API error")
else:
# Let the last one succeed
return None
# For batch attempts, fail to force single processing
raise Exception("Force single processing")
notification_manager.email_sender.send_templated.side_effect = send_side_effect
# Create message
message = sample_batch_event.model_dump_json()
# Act
result = await notification_manager._process_batch(message)
# Assert
assert result is True
# Verify mixed error handling
assert mock_logger.warning.called # For 406
assert mock_logger.error.called # For 422, ValueError, generic
assert mock_set_verification.called # For 406 error
@pytest.mark.asyncio
async def test_batch_continues_after_individual_failures(
self, notification_manager, sample_batch_event
):
"""Test that the batch continues processing after individual notification failures."""
with patch("backend.notifications.notifications.logger") as mock_logger, \
patch("backend.notifications.notifications.get_database_manager_async_client") as mock_db_client, \
patch("backend.notifications.notifications.generate_unsubscribe_link") as mock_unsub_link:
# Create larger batch
notifications = []
for i in range(10):
notification = Mock()
notification.type = NotificationType.AGENT_RUN
notification.data = {
"agent_name": f"Test Agent {i}",
"credits_used": 10.0 * (i + 1),
"execution_time": 5.0 * (i + 1),
"node_count": 3 + i,
"graph_id": f"graph_{i}",
"outputs": [],
}
notification.created_at = datetime.now(timezone.utc)
notifications.append(notification)
# Setup mocks
mock_db = mock_db_client.return_value
mock_db.get_user_email_by_id = AsyncMock(return_value="test@example.com")
mock_db.get_user_notification_batch = AsyncMock(
return_value=Mock(notifications=notifications)
)
mock_db.empty_user_notification_batch = AsyncMock()
mock_unsub_link.return_value = "http://example.com/unsub"
# Mock internal methods
notification_manager._should_email_user_based_on_preference = AsyncMock(
return_value=True
)
notification_manager._should_batch = AsyncMock(return_value=True)
notification_manager._parse_message = Mock(return_value=sample_batch_event)
# Track calls
call_count = [0]
successful_calls = []
failed_calls = []
def send_side_effect(*args, **kwargs):
data = kwargs.get("data", [])
# Only process single notifications
if isinstance(data, list) and len(data) == 1:
current_call = call_count[0]
call_count[0] += 1
# Fail on indices 2, 5, 8
if current_call in [2, 5, 8]:
failed_calls.append(current_call)
raise Exception("Recipient marked as inactive (406)")
else:
successful_calls.append(current_call)
return None
# Force single processing
raise Exception("Force single processing")
notification_manager.email_sender.send_templated.side_effect = send_side_effect
# Create message
message = sample_batch_event.model_dump_json()
# Act
result = await notification_manager._process_batch(message)
# Assert
assert result is True
# Verify batch processing continued despite failures
assert len(failed_calls) == 3 # Should have 3 failures
assert len(successful_calls) == 7 # Should have 7 successes
assert failed_calls == [2, 5, 8] # Specific indices that failed
# Verify warnings were logged for failures
assert mock_logger.warning.call_count >= 3
# Batch should not be emptied since not all were successful
assert not mock_db.empty_user_notification_batch.called