feat(backend): use "better" batching logic

This commit is contained in:
Nicholas Tindle
2025-02-10 05:43:34 -06:00
parent b6ecf9076a
commit c2f9e243fc

View File

@@ -18,6 +18,7 @@ from backend.executor.database import DatabaseManager
from backend.notifications.summary import SummaryManager
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Settings
from aio_pika.exceptions import QueueEmpty
if TYPE_CHECKING:
from backend.executor import DatabaseManager
@@ -265,13 +266,46 @@ class NotificationManager(AppService):
logger.error(f"Error processing notification: {e}")
return False
async def _process_batch(self, messages: list[str]) -> bool:
"""Process a batch of notifications"""
try:
events = [NotificationEvent.parse_raw(msg) for msg in messages]
# Implementation of batch notification sending would go here
logger.info(f"Processing batch of {len(events)} notifications")
def should_send(self, batch: UserNotificationBatch) -> bool:
"""Determine if a batch should be sent"""
if not batch.notifications:
return False
# if any notifications are older than the batch delay, send them
if any(
notification.created_at < datetime.now() - get_batch_delay(batch.type)
for notification in batch.notifications
if isinstance(notification, NotificationEventModel)
):
logger.info(f"Sending batch of {len(batch.notifications)} notifications")
return True
return False
async def _process_batch_message(self, message: str) -> bool:
"""Process a batch notification & return status from processing"""
try:
event = NotificationEventModel.model_validate_json(message)
# Implementation of batch notification sending would go here
# Add to database
db = get_db_client()
logger.info(f"Processing batch ingestion of {event}")
logger.info(f"type of event: {type(event)}")
batch = db.create_or_add_to_user_notification_batch(
event.user_id, event.type, event
)
if not batch.notifications:
logger.info(f"No notifications to send for batch of {event.user_id}")
return True
if self.should_send(batch):
logger.info(
f"Processing batch of {len(batch.notifications)} notifications"
)
db.empty_user_notification_batch(event.user_id, batch.type)
# send_email_with_template(event.user_id, event.type, event.data)
else:
logger.info(f"Holding on to batch for {event.user_id}")
# queue a message to check again in x time if we haven't gotten any new to send it
return True
except Exception as e:
logger.error(f"Error processing batch: {e}")
return False
@@ -308,62 +342,71 @@ class NotificationManager(AppService):
while self.running:
try:
# Process immediate notifications
message = self.run_and_wait(immediate_queue.get())
if message:
success = self.run_and_wait(
self._process_notification(message.body.decode())
)
if success:
self.run_and_wait(message.ack())
else:
self.run_and_wait(message.reject(requeue=False))
try:
message = self.run_and_wait(immediate_queue.get())
# Process backoff notifications similarly
message = self.run_and_wait(backoff_queue.get())
if message:
success = self.run_and_wait(
self._process_notification(message.body.decode())
)
if success:
self.run_and_wait(message.ack())
else:
# If failed, will go to DLQ with delay
self.run_and_wait(message.reject(requeue=False))
# Process batch queues
for queue, batch_size in [
(hourly_queue, 50), # Process up to 50 messages per batch
(daily_queue, 100), # Process up to 100 messages per batch
]:
messages = []
while len(messages) < batch_size:
message = self.run_and_wait(queue.get())
if not message:
break
messages.append(message)
if messages:
if message:
success = self.run_and_wait(
self._process_batch([msg.body.decode() for msg in messages])
self._process_notification(message.body.decode())
)
if success:
for msg in messages:
self.run_and_wait(msg.ack())
self.run_and_wait(message.ack())
else:
for msg in messages:
self.run_and_wait(msg.reject(requeue=True))
self.run_and_wait(message.reject(requeue=False))
except QueueEmpty:
logger.info("Immediate queue empty")
# Process backoff notifications similarly
try:
message = self.run_and_wait(backoff_queue.get())
if message:
success = self.run_and_wait(
self._process_notification(message.body.decode())
)
if success:
self.run_and_wait(message.ack())
else:
# If failed, will go to DLQ with delay
self.run_and_wait(message.reject(requeue=False))
except QueueEmpty:
logger.info("Backoff queue empty")
# Add to plan db/process batch and delay or send
for queue in [
hourly_queue,
daily_queue,
]:
try:
message = self.run_and_wait(queue.get())
if message:
success = self.run_and_wait(
self._process_batch_message(message.body.decode())
)
if success:
self.run_and_wait(message.ack())
else:
self.run_and_wait(message.reject(requeue=True))
except QueueEmpty:
logger.info(f"Queue empty: {queue}")
# Process summary triggers
for queue in summary_queues:
message = self.run_and_wait(queue.get())
if message:
self.run_and_wait(
self._process_summary_trigger(message.body.decode())
)
self.run_and_wait(message.ack())
try:
message = self.run_and_wait(queue.get())
if message:
self.run_and_wait(
self._process_summary_trigger(message.body.decode())
)
self.run_and_wait(message.ack())
except QueueEmpty:
logger.info(f"Queue empty: {queue}")
time.sleep(0.1)
except QueueEmpty as e:
logger.info(f"Queue empty: {e}")
except Exception as e:
logger.error(f"Error in notification service loop: {e}")