feat(backend): optimize processing of queues in notif service (#10513)

The notification service was running an inefficient polling loop that
constantly
checked each queue sequentially with 1-second timeouts, even when queues
were
  empty. This caused:
  - High CPU usage from continuous polling
- Sequential processing that blocked queues from being processed in
parallel
- Unnecessary delays from timeout-based polling instead of event-driven
  consumption
- Poor throughput (500-2,000 messages/second) compared to potential
(8,000-12,000
  messages/second)

  ## Changes 🏗️

- Replaced polling-based _run_queue() with event-driven _consume_queue()
using
  async iterators
- Implemented concurrent queue consumption using asyncio.gather()
instead of
  sequential processing
  - Added QoS settings (prefetch_count=10) to control memory usage
- Improved error handling with message.process() context manager for
automatic
  ack/nack
  - Added graceful shutdown that properly cancels all consumer tasks
  - Removed unused QueueEmpty import

  ## Checklist 📋

 ### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [ ] I have tested my changes according to the test plan:
    - [ ] Deploy to test environment and monitor CPU usage
- [ ] Verify all queue types (immediate, admin, batch, summary) process
messages
  correctly
    - [ ] Test graceful shutdown with messages in flight
    - [ ] Monitor that database management service remains stable
    - [ ] Check logs for proper consumer startup messages
    - [ ] Verify messages are properly acked/nacked on success/failure

---------

Co-authored-by: Claude <claude@users.noreply.github.com>
Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
Nicholas Tindle
2025-07-31 12:17:05 -05:00
committed by GitHub
parent 686d811062
commit d323dc2821
2 changed files with 165 additions and 49 deletions

View File

@@ -82,6 +82,19 @@ class EmailSender:
logger.error(f"Error formatting full message: {e}")
raise e
# Check email size (Postmark limit is 5MB = 5,242,880 characters)
email_size = len(full_message)
if email_size > 5_000_000: # Leave some buffer
logger.warning(
f"Email size ({email_size} chars) exceeds safe limit. "
f"This should have been chunked before calling send_templated."
)
raise ValueError(
f"Email body too large: {email_size} characters (limit: 5,242,880)"
)
logger.debug(f"Sending email with size: {email_size} characters")
self._send_email(
user_email=user_email,
user_unsubscribe_link=user_unsub_link,

View File

@@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone
from typing import Callable
import aio_pika
from aio_pika.exceptions import QueueEmpty
from autogpt_libs.utils.cache import thread_cached
from prisma.enums import NotificationType
@@ -634,14 +633,93 @@ class NotificationManager(AppService):
for db_event in batch.notifications
]
self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=batch_messages,
user_unsub_link=unsub_link,
)
# only empty the batch if we sent the email successfully
get_db().empty_user_notification_batch(event.user_id, event.type)
# Split batch into chunks to avoid exceeding email size limits
# Start with a reasonable chunk size and adjust dynamically
MAX_EMAIL_SIZE = 4_500_000 # 4.5MB to leave buffer under 5MB limit
chunk_size = 100 # Initial chunk size
successfully_sent_count = 0
failed_indices = []
i = 0
while i < len(batch_messages):
# Try progressively smaller chunks if needed
chunk_sent = False
for attempt_size in [chunk_size, 50, 25, 10, 5, 1]:
chunk = batch_messages[i : i + attempt_size]
try:
# Try to render the email to check its size
template = self.email_sender._get_template(event.type)
_, test_message = self.email_sender.formatter.format_email(
base_template=template.base_template,
subject_template=template.subject_template,
content_template=template.body_template,
data={"notifications": chunk},
unsubscribe_link=f"{self.email_sender.formatter.env.globals.get('base_url', '')}/profile/settings",
)
if len(test_message) < MAX_EMAIL_SIZE:
# Size is acceptable, send the email
logger.info(
f"Sending email with {len(chunk)} notifications "
f"(size: {len(test_message):,} chars)"
)
self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=chunk,
user_unsub_link=unsub_link,
)
# Track successful sends
successfully_sent_count += len(chunk)
# Update chunk_size for next iteration based on success
if (
attempt_size == chunk_size
and len(test_message) < MAX_EMAIL_SIZE * 0.7
):
# If we're well under limit, try larger chunks next time
chunk_size = min(chunk_size + 10, 100)
elif len(test_message) > MAX_EMAIL_SIZE * 0.9:
# If we're close to limit, use smaller chunks
chunk_size = max(attempt_size - 10, 1)
i += len(chunk)
chunk_sent = True
break
except Exception as e:
if attempt_size == 1:
# Even single notification is too large
logger.error(
f"Single notification too large to send: {e}. "
f"Skipping notification at index {i}"
)
failed_indices.append(i)
i += 1
chunk_sent = True
break
# Try smaller chunk
continue
if not chunk_sent:
# Should not reach here due to single notification handling
logger.error(f"Failed to send notifications starting at index {i}")
failed_indices.append(i)
i += 1
# Only empty the batch if ALL notifications were sent successfully
if successfully_sent_count == len(batch_messages):
logger.info(
f"Successfully sent all {successfully_sent_count} notifications, clearing batch"
)
get_db().empty_user_notification_batch(event.user_id, event.type)
else:
logger.warning(
f"Only sent {successfully_sent_count} of {len(batch_messages)} notifications. "
f"Failed indices: {failed_indices}. Batch will be retained for retry."
)
return True
except Exception as e:
logger.exception(f"Error processing notification for batch queue: {e}")
@@ -694,36 +772,42 @@ class NotificationManager(AppService):
logger.exception(f"Error processing notification for summary queue: {e}")
return False
async def _run_queue(
async def _consume_queue(
self,
queue: aio_pika.abc.AbstractQueue,
process_func: Callable[[str], bool],
error_queue_name: str,
queue_name: str,
):
message: aio_pika.abc.AbstractMessage | None = None
try:
# This parameter "no_ack" is named like shit, think of it as "auto_ack"
message = await queue.get(timeout=1.0, no_ack=False)
result = process_func(message.body.decode())
if result:
await message.ack()
else:
await message.reject(requeue=False)
"""Continuously consume messages from a queue using async iteration"""
logger.info(f"Starting consumer for queue: {queue_name}")
except QueueEmpty:
logger.debug(f"Queue {error_queue_name} empty")
except TimeoutError:
logger.debug(f"Queue {error_queue_name} timed out")
try:
async with queue.iterator() as queue_iter:
async for message in queue_iter:
if not self.running:
break
try:
async with message.process():
result = process_func(message.body.decode())
if not result:
# Message will be rejected when exiting context without exception
raise aio_pika.exceptions.MessageProcessError(
"Processing failed"
)
except aio_pika.exceptions.MessageProcessError:
# Let message.process() handle the rejection
pass
except Exception as e:
logger.error(f"Error processing message in {queue_name}: {e}")
# Let message.process() handle the rejection
raise
except asyncio.CancelledError:
logger.info(f"Consumer for {queue_name} cancelled")
raise
except Exception as e:
if message:
logger.error(
f"Error in notification service loop, message rejected {e}"
)
await message.reject(requeue=False)
else:
logger.exception(
f"Error in notification service loop, message unable to be rejected, and will have to be manually removed to free space in the queue: {e=}"
)
logger.exception(f"Fatal error in consumer for {queue_name}: {e}")
raise
@continuous_retry()
def run_service(self):
@@ -736,41 +820,60 @@ class NotificationManager(AppService):
logger.info(f"[{self.service_name}] Started notification service")
# Set up queue consumers
# Set up queue consumers with QoS settings
channel = await self.rabbit.get_channel()
# Set prefetch to prevent overwhelming the service
await channel.set_qos(prefetch_count=10)
immediate_queue = await channel.get_queue("immediate_notifications")
batch_queue = await channel.get_queue("batch_notifications")
admin_queue = await channel.get_queue("admin_notifications")
summary_queue = await channel.get_queue("summary_notifications")
while self.running:
try:
await self._run_queue(
# Create consumer tasks for each queue - running in parallel
consumer_tasks = [
asyncio.create_task(
self._consume_queue(
queue=immediate_queue,
process_func=self._process_immediate,
error_queue_name="immediate_notifications",
queue_name="immediate_notifications",
)
await self._run_queue(
),
asyncio.create_task(
self._consume_queue(
queue=admin_queue,
process_func=self._process_admin_message,
error_queue_name="admin_notifications",
queue_name="admin_notifications",
)
await self._run_queue(
),
asyncio.create_task(
self._consume_queue(
queue=batch_queue,
process_func=self._process_batch,
error_queue_name="batch_notifications",
queue_name="batch_notifications",
)
await self._run_queue(
),
asyncio.create_task(
self._consume_queue(
queue=summary_queue,
process_func=self._process_summary,
error_queue_name="summary_notifications",
queue_name="summary_notifications",
)
await asyncio.sleep(0.1)
except QueueEmpty as e:
logger.debug(f"Queue empty: {e}")
),
]
try:
# Run all consumers concurrently
await asyncio.gather(*consumer_tasks)
except asyncio.CancelledError:
logger.info("Service shutdown requested")
# Cancel all consumer tasks
for task in consumer_tasks:
task.cancel()
# Wait for all tasks to complete cancellation
await asyncio.gather(*consumer_tasks, return_exceptions=True)
raise
def cleanup(self):
"""Cleanup service resources"""