mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(backend/notifications): handle processing notifiactions one at a time so we dont resend everything in a batch
This commit is contained in:
@@ -235,6 +235,7 @@ class BaseEventModel(BaseModel):
|
||||
|
||||
|
||||
class NotificationEventModel(BaseEventModel, Generic[NotificationDataType_co]):
|
||||
id: Optional[str] = None # None when creating, populated when reading from DB
|
||||
data: NotificationDataType_co
|
||||
|
||||
@property
|
||||
@@ -378,6 +379,7 @@ class NotificationPreference(BaseModel):
|
||||
|
||||
|
||||
class UserNotificationEventDTO(BaseModel):
|
||||
id: str # Added to track notifications for removal
|
||||
type: NotificationType
|
||||
data: dict
|
||||
created_at: datetime
|
||||
@@ -386,6 +388,7 @@ class UserNotificationEventDTO(BaseModel):
|
||||
@staticmethod
|
||||
def from_db(model: NotificationEvent) -> "UserNotificationEventDTO":
|
||||
return UserNotificationEventDTO(
|
||||
id=model.id,
|
||||
type=model.type,
|
||||
data=dict(model.data),
|
||||
created_at=model.createdAt,
|
||||
@@ -541,6 +544,58 @@ async def empty_user_notification_batch(
|
||||
) from e
|
||||
|
||||
|
||||
async def remove_notifications_from_batch(
|
||||
user_id: str, notification_type: NotificationType, notification_ids: list[str]
|
||||
) -> None:
|
||||
"""Remove specific notifications from a user's batch by their IDs.
|
||||
|
||||
This is used after successful sending to remove only the
|
||||
sent notifications, preventing duplicates on retry.
|
||||
"""
|
||||
if not notification_ids:
|
||||
return
|
||||
|
||||
try:
|
||||
async with transaction() as tx:
|
||||
# Delete the specific notification events
|
||||
deleted_count = await tx.notificationevent.delete_many(
|
||||
where={
|
||||
"id": {"in": notification_ids},
|
||||
"UserNotificationBatch": {
|
||||
"is": {"userId": user_id, "type": notification_type}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Removed {deleted_count} notifications from batch for user {user_id}"
|
||||
)
|
||||
|
||||
# Check if batch is now empty and delete it if so
|
||||
remaining = await tx.notificationevent.count(
|
||||
where={
|
||||
"UserNotificationBatch": {
|
||||
"is": {"userId": user_id, "type": notification_type}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if remaining == 0:
|
||||
await tx.usernotificationbatch.delete_many(
|
||||
where=UserNotificationBatchWhereInput(
|
||||
userId=user_id,
|
||||
type=notification_type,
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
f"Deleted empty batch for user {user_id} and type {notification_type}"
|
||||
)
|
||||
except Exception as e:
|
||||
raise DatabaseError(
|
||||
f"Failed to remove notifications from batch for user {user_id} and type {notification_type}: {e}"
|
||||
) from e
|
||||
|
||||
|
||||
async def get_user_notification_batch(
|
||||
user_id: str,
|
||||
notification_type: NotificationType,
|
||||
|
||||
@@ -33,6 +33,7 @@ from backend.data.notifications import (
|
||||
get_all_batches_by_type,
|
||||
get_user_notification_batch,
|
||||
get_user_notification_oldest_message_in_batch,
|
||||
remove_notifications_from_batch,
|
||||
)
|
||||
from backend.data.user import (
|
||||
get_active_user_ids_in_timerange,
|
||||
@@ -151,6 +152,7 @@ class DatabaseManager(AppService):
|
||||
create_or_add_to_user_notification_batch
|
||||
)
|
||||
empty_user_notification_batch = _(empty_user_notification_batch)
|
||||
remove_notifications_from_batch = _(remove_notifications_from_batch)
|
||||
get_all_batches_by_type = _(get_all_batches_by_type)
|
||||
get_user_notification_batch = _(get_user_notification_batch)
|
||||
get_user_notification_oldest_message_in_batch = _(
|
||||
@@ -245,6 +247,7 @@ class DatabaseManagerAsyncClient(AppServiceClient):
|
||||
d.create_or_add_to_user_notification_batch
|
||||
)
|
||||
empty_user_notification_batch = d.empty_user_notification_batch
|
||||
remove_notifications_from_batch = d.remove_notifications_from_batch
|
||||
get_all_batches_by_type = d.get_all_batches_by_type
|
||||
get_user_notification_batch = d.get_user_notification_batch
|
||||
get_user_notification_oldest_message_in_batch = (
|
||||
|
||||
@@ -657,6 +657,7 @@ class NotificationManager(AppService):
|
||||
get_notif_data_type(db_event.type)
|
||||
].model_validate(
|
||||
{
|
||||
"id": db_event.id, # Include ID from database
|
||||
"user_id": event.user_id,
|
||||
"type": db_event.type,
|
||||
"data": db_event.data,
|
||||
@@ -679,6 +680,9 @@ class NotificationManager(AppService):
|
||||
chunk_sent = False
|
||||
for attempt_size in [chunk_size, 50, 25, 10, 5, 1]:
|
||||
chunk = batch_messages[i : i + attempt_size]
|
||||
chunk_ids = [
|
||||
msg.id for msg in chunk if msg.id
|
||||
] # Extract IDs for removal
|
||||
|
||||
try:
|
||||
# Try to render the email to check its size
|
||||
@@ -705,6 +709,21 @@ class NotificationManager(AppService):
|
||||
user_unsub_link=unsub_link,
|
||||
)
|
||||
|
||||
# Remove successfully sent notifications immediately
|
||||
if chunk_ids:
|
||||
try:
|
||||
await get_database_manager_async_client().remove_notifications_from_batch(
|
||||
event.user_id, event.type, chunk_ids
|
||||
)
|
||||
logger.info(
|
||||
f"Removed {len(chunk_ids)} sent notifications from batch"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to remove sent notifications: {e}"
|
||||
)
|
||||
# Continue anyway - better to risk duplicates than lose emails
|
||||
|
||||
# Track successful sends
|
||||
successfully_sent_count += len(chunk)
|
||||
|
||||
@@ -722,6 +741,21 @@ class NotificationManager(AppService):
|
||||
i += len(chunk)
|
||||
chunk_sent = True
|
||||
break
|
||||
else:
|
||||
# Message is too large even after size reduction
|
||||
if attempt_size == 1:
|
||||
logger.error(
|
||||
f"Failed to send notification at index {i}: "
|
||||
f"Single notification exceeds email size limit "
|
||||
f"({len(test_message):,} chars > {MAX_EMAIL_SIZE:,} chars). "
|
||||
f"Skipping this notification."
|
||||
)
|
||||
failed_indices.append(i)
|
||||
i += 1
|
||||
chunk_sent = True
|
||||
break
|
||||
# Try smaller chunk size
|
||||
continue
|
||||
except Exception as e:
|
||||
# Check if it's a Postmark API error
|
||||
if attempt_size == 1:
|
||||
@@ -791,18 +825,22 @@ class NotificationManager(AppService):
|
||||
failed_indices.append(i)
|
||||
i += 1
|
||||
|
||||
# Only empty the batch if ALL notifications were sent successfully
|
||||
if successfully_sent_count == len(batch_messages):
|
||||
logger.info(
|
||||
f"Successfully sent all {successfully_sent_count} notifications, clearing batch"
|
||||
)
|
||||
await get_database_manager_async_client().empty_user_notification_batch(
|
||||
# Check what remains in the batch (notifications are removed as sent)
|
||||
remaining_batch = (
|
||||
await get_database_manager_async_client().get_user_notification_batch(
|
||||
event.user_id, event.type
|
||||
)
|
||||
)
|
||||
|
||||
if not remaining_batch or not remaining_batch.notifications:
|
||||
logger.info(
|
||||
f"All {successfully_sent_count} notifications sent and removed from batch"
|
||||
)
|
||||
else:
|
||||
remaining_count = len(remaining_batch.notifications)
|
||||
logger.warning(
|
||||
f"Only sent {successfully_sent_count} of {len(batch_messages)} notifications. "
|
||||
f"Failed indices: {failed_indices}. Batch will be retained for retry."
|
||||
f"Sent {successfully_sent_count} notifications. "
|
||||
f"{remaining_count} remain in batch for retry due to errors."
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user