feat(backend): move failed messages into a dead letter queue (#9501)

<!-- Clearly explain the need for these changes: -->
When we fail to process something, we don't want to keep retrying
forever. We should store those and process them later

### Changes 🏗️

<!-- Concisely describe all of the changes made in this pull request:
-->
- Fix the type of the failed exchange from Direct to Topic to allow
filtering based on name (allows us later to do more advanced handling of
queue types)
- abstract processing the messages in a queue a bit to reduce repeated
code
- abstract how we check if a user wants a notification so that its a bit
easier to process
- Handle errors better
- Abstract model parsing

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

---------

Co-authored-by: Bently <tomnoon9@gmail.com>
This commit is contained in:
Nicholas Tindle
2025-02-20 04:09:51 -06:00
committed by GitHub
parent 63005631f0
commit 4ae016606b

View File

@@ -1,9 +1,12 @@
import logging
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Callable, Coroutine
import aio_pika
from aio_pika.exceptions import QueueEmpty
from autogpt_libs.utils.cache import thread_cached
from prisma.enums import NotificationType
from pydantic import BaseModel
from backend.data.notifications import (
BatchingStrategy,
@@ -25,14 +28,16 @@ logger = logging.getLogger(__name__)
settings = Settings()
class NotificationEvent(BaseModel):
event: NotificationEventDTO
model: NotificationEventModel
def create_notification_config() -> RabbitMQConfig:
"""Create RabbitMQ configuration for notifications"""
notification_exchange = Exchange(name="notifications", type=ExchangeType.TOPIC)
summary_exchange = Exchange(name="summaries", type=ExchangeType.TOPIC)
dead_letter_exchange = Exchange(name="dead_letter", type=ExchangeType.DIRECT)
delay_exchange = Exchange(name="delay", type=ExchangeType.DIRECT)
dead_letter_exchange = Exchange(name="dead_letter", type=ExchangeType.TOPIC)
queues = [
# Main notification queues
@@ -45,34 +50,6 @@ def create_notification_config() -> RabbitMQConfig:
"x-dead-letter-routing-key": "failed.immediate",
},
),
Queue(
name="backoff_notifications",
exchange=notification_exchange,
routing_key="notification.backoff.#",
arguments={
"x-dead-letter-exchange": dead_letter_exchange.name,
"x-dead-letter-routing-key": "failed.backoff",
},
),
# Summary queues
Queue(
name="daily_summary_trigger",
exchange=summary_exchange,
routing_key="summary.daily",
arguments={"x-message-ttl": 86400000}, # 24 hours
),
Queue(
name="weekly_summary_trigger",
exchange=summary_exchange,
routing_key="summary.weekly",
arguments={"x-message-ttl": 604800000}, # 7 days
),
Queue(
name="monthly_summary_trigger",
exchange=summary_exchange,
routing_key="summary.monthly",
arguments={"x-message-ttl": 2592000000}, # 30 days
),
# Failed notifications queue
Queue(
name="failed_notifications",
@@ -84,10 +61,7 @@ def create_notification_config() -> RabbitMQConfig:
return RabbitMQConfig(
exchanges=[
notification_exchange,
# batch_exchange,
summary_exchange,
dead_letter_exchange,
delay_exchange,
],
queues=queues,
)
@@ -151,18 +125,37 @@ class NotificationManager(AppService):
logger.error(f"Error queueing notification: {e}")
return NotificationResult(success=False, message=str(e))
async def _should_email_user_based_on_preference(
self, user_id: str, event_type: NotificationType
) -> bool:
return (
get_db_client()
.get_user_notification_preference(user_id)
.preferences[event_type]
)
def _parse_message(self, message: str) -> NotificationEvent | None:
try:
event = NotificationEventDTO.model_validate_json(message)
model = NotificationEventModel[
get_data_type(event.type)
].model_validate_json(message)
return NotificationEvent(event=event, model=model)
except Exception as e:
logger.error(f"Error parsing message due to non matching schema {e}")
return None
async def _process_immediate(self, message: str) -> bool:
"""Process a single notification immediately, returning whether to put into the failed queue"""
try:
event = NotificationEventDTO.model_validate_json(message)
parsed_event = NotificationEventModel[
get_data_type(event.type)
].model_validate_json(message)
parsed = self._parse_message(message)
if not parsed:
return False
event = parsed.event
model = parsed.model
user_email = get_db_client().get_user_email_by_id(event.user_id)
should_send = (
get_db_client()
.get_user_notification_preference(event.user_id)
.preferences[event.type]
should_send = await self._should_email_user_based_on_preference(
event.user_id, event.type
)
if not user_email:
logger.error(f"User email not found for user {event.user_id}")
@@ -172,13 +165,42 @@ class NotificationManager(AppService):
f"User {event.user_id} does not want to receive {event.type} notifications"
)
return True
self.email_sender.send_templated(event.type, user_email, parsed_event)
logger.info(f"Processing notification: {parsed_event}")
self.email_sender.send_templated(event.type, user_email, model)
logger.info(f"Processing notification: {model}")
return True
except Exception as e:
logger.error(f"Error processing notification: {e}")
return False
def _run_queue(
self,
queue: aio_pika.abc.AbstractQueue,
process_func: Callable[[str], Coroutine[Any, Any, bool]],
error_queue_name: str,
):
message: aio_pika.abc.AbstractMessage | None = None
try:
# This parameter "no_ack" is named like shit, think of it as "auto_ack"
message = self.run_and_wait(queue.get(timeout=1.0, no_ack=False))
result = self.run_and_wait(process_func(message.body.decode()))
if result:
self.run_and_wait(message.ack())
else:
self.run_and_wait(message.reject(requeue=False))
except QueueEmpty:
logger.debug(f"Queue {error_queue_name} empty")
except Exception as e:
if message:
logger.error(
f"Error in notification service loop, message rejected {e}"
)
self.run_and_wait(message.reject(requeue=False))
else:
logger.error(
f"Error in notification service loop, message unable to be rejected, and will have to be manually removed to free space in the queue: {e}"
)
def run_service(self):
logger.info(f"[{self.service_name}] Started notification service")
@@ -191,20 +213,11 @@ class NotificationManager(AppService):
while self.running:
try:
# Process immediate notifications
try:
message = self.run_and_wait(immediate_queue.get())
if message:
success = self.run_and_wait(
self._process_immediate(message.body.decode())
)
if success:
self.run_and_wait(message.ack())
else:
self.run_and_wait(message.reject(requeue=True))
except QueueEmpty:
logger.debug("Immediate queue empty")
self._run_queue(
queue=immediate_queue,
process_func=self._process_immediate,
error_queue_name="immediate_notifications",
)
time.sleep(0.1)