feat(backend): baseline summary processing (#9596)

<!-- Clearly explain the need for these changes: -->
We want to be able to process summary emails on a scheduled basis.
This adds the baseline for that.

### Changes 🏗️
- Adds new tooling to the Scheduler to handle the in-memory schedule for
the weekly processing
- Adds new exposed methods to the notification manager to handle the
different data models for scheduled emails
- adds new models to the notification data models to handle the
different requirements for scheduled emails, closely paralleling the
existing notification ones
- Adds new email template

Note: After testing, email sending was disabled until the template and
data filling are done later down the line. We don't want to email people
random stuff, ya know?

<!-- Concisely describe all of the changes made in this pull request:
-->

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Test sending an email on the scheduled basis 
- [x] Make sure you get the email, ignoring the fact that all the data
isn't real inside it

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
Nicholas Tindle
2025-03-12 00:19:23 -05:00
committed by GitHub
parent af65058bb7
commit 942ac0bae4
5 changed files with 403 additions and 33 deletions

View File

@@ -1,5 +1,5 @@
import logging
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Any, Generic, Optional, TypeVar, Union
@@ -18,7 +18,12 @@ from .db import transaction
logger = logging.getLogger(__name__)
T_co = TypeVar("T_co", bound="BaseNotificationData", covariant=True)
NotificationDataType_co = TypeVar(
"NotificationDataType_co", bound="BaseNotificationData", covariant=True
)
SummaryParamsType_co = TypeVar(
"SummaryParamsType_co", bound="BaseSummaryParams", covariant=True
)
class QueueType(Enum):
@@ -47,6 +52,13 @@ class ZeroBalanceData(BaseNotificationData):
last_transaction_time: datetime
top_up_link: str
@field_validator("last_transaction_time")
@classmethod
def validate_timezone(cls, value: datetime):
if value.tzinfo is None:
raise ValueError("datetime must have timezone information")
return value
class LowBalanceData(BaseNotificationData):
agent_name: str = Field(..., description="Name of the agent")
@@ -75,6 +87,13 @@ class ContinuousAgentErrorData(BaseNotificationData):
error_time: datetime
attempts: int = Field(..., description="Number of retry attempts made")
@field_validator("start_time", "error_time")
@classmethod
def validate_timezone(cls, value: datetime):
if value.tzinfo is None:
raise ValueError("datetime must have timezone information")
return value
class BaseSummaryData(BaseNotificationData):
total_credits_used: float
@@ -87,18 +106,53 @@ class BaseSummaryData(BaseNotificationData):
cost_breakdown: dict[str, float]
class BaseSummaryParams(BaseModel):
    """Common base for the parameter models of scheduled summary notifications.

    Declares no fields itself; it is the bound of ``SummaryParamsType_co`` and
    the parent of ``DailySummaryParams`` and ``WeeklySummaryParams``.
    """
class DailySummaryParams(BaseSummaryParams):
    """Parameters for a daily summary: a single timezone-aware ``date``."""

    # Naive datetimes are rejected by the validator below.
    date: datetime

    @field_validator("date")
    @classmethod
    def validate_timezone(cls, value: datetime) -> datetime:
        """Return ``value`` unchanged, raising ``ValueError`` if it has no tzinfo."""
        if value.tzinfo is None:
            raise ValueError("datetime must have timezone information")
        return value
class WeeklySummaryParams(BaseSummaryParams):
    """Parameters for a weekly summary: a timezone-aware start/end window."""

    # Both bounds must carry tzinfo; naive values are rejected below.
    start_date: datetime
    end_date: datetime

    @field_validator("start_date", "end_date")
    @classmethod
    def validate_timezone(cls, value: datetime) -> datetime:
        """Return ``value`` unchanged, raising ``ValueError`` if it has no tzinfo."""
        if value.tzinfo is None:
            raise ValueError("datetime must have timezone information")
        return value
class DailySummaryData(BaseSummaryData):
    """Summary payload for a single day, on top of the ``BaseSummaryData`` fields."""

    # Naive datetimes are rejected by the validator below.
    date: datetime

    @field_validator("date")
    @classmethod
    def validate_timezone(cls, value: datetime) -> datetime:
        """Return ``value`` unchanged, raising ``ValueError`` if it has no tzinfo."""
        if value.tzinfo is None:
            raise ValueError("datetime must have timezone information")
        return value
class WeeklySummaryData(BaseSummaryData):
    """Summary payload for a week, on top of the ``BaseSummaryData`` fields."""

    # Both bounds must carry tzinfo; naive values are rejected below.
    start_date: datetime
    end_date: datetime
    # Required (no defaults): every constructor call must supply both.
    week_number: int
    year: int

    @field_validator("start_date", "end_date")
    @classmethod
    def validate_timezone(cls, value: datetime) -> datetime:
        """Return ``value`` unchanged, raising ``ValueError`` if it has no tzinfo."""
        if value.tzinfo is None:
            raise ValueError("datetime must have timezone information")
        return value
class MonthlySummaryData(BaseSummaryData):
class MonthlySummaryData(BaseNotificationData):
month: int
year: int
@@ -125,6 +179,7 @@ NotificationData = Annotated[
WeeklySummaryData,
DailySummaryData,
RefundRequestData,
BaseSummaryData,
],
Field(discriminator="type"),
]
@@ -134,15 +189,22 @@ class NotificationEventDTO(BaseModel):
user_id: str
type: NotificationType
data: dict
created_at: datetime = Field(default_factory=datetime.now)
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
retry_count: int = 0
class NotificationEventModel(BaseModel, Generic[T_co]):
class SummaryParamsEventDTO(BaseModel):
user_id: str
type: NotificationType
data: T_co
created_at: datetime = Field(default_factory=datetime.now)
data: dict
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
class NotificationEventModel(BaseModel, Generic[NotificationDataType_co]):
user_id: str
type: NotificationType
data: NotificationDataType_co
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
@property
def strategy(self) -> QueueType:
@@ -159,7 +221,14 @@ class NotificationEventModel(BaseModel, Generic[T_co]):
return NotificationTypeOverride(self.type).template
def get_data_type(
class SummaryParamsEventModel(BaseModel, Generic[SummaryParamsType_co]):
    """Typed envelope for a scheduled-summary request.

    Same fields as ``SummaryParamsEventDTO``, except ``data`` is a concrete
    ``BaseSummaryParams`` subclass instead of a plain ``dict``.
    """

    user_id: str
    type: NotificationType
    data: SummaryParamsType_co
    # Defaults to the current UTC time (timezone-aware) at instantiation.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )
def get_notif_data_type(
notification_type: NotificationType,
) -> type[BaseNotificationData]:
return {
@@ -176,11 +245,20 @@ def get_data_type(
}[notification_type]
def get_summary_params_type(
    notification_type: NotificationType,
) -> type[BaseSummaryParams]:
    """Return the params model class for a summary notification type.

    Raises:
        KeyError: if ``notification_type`` is neither ``DAILY_SUMMARY`` nor
            ``WEEKLY_SUMMARY``.
    """
    params_by_type: dict[NotificationType, type[BaseSummaryParams]] = {
        NotificationType.DAILY_SUMMARY: DailySummaryParams,
        NotificationType.WEEKLY_SUMMARY: WeeklySummaryParams,
    }
    return params_by_type[notification_type]
class NotificationBatch(BaseModel):
user_id: str
events: list[NotificationEvent]
strategy: QueueType
last_update: datetime = datetime.now()
last_update: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
class NotificationResult(BaseModel):
@@ -258,7 +336,9 @@ class NotificationPreference(BaseModel):
)
daily_limit: int = 10 # Max emails per day
emails_sent_today: int = 0
last_reset_date: datetime = Field(default_factory=datetime.now)
last_reset_date: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc)
)
def get_batch_delay(notification_type: NotificationType) -> timedelta:

View File

@@ -5,6 +5,7 @@ from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.job import Job as JobObj
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
@@ -93,9 +94,18 @@ def process_existing_batches(**kwargs):
logger.exception(f"Error processing existing batches: {e}")
def process_weekly_summary(**kwargs):
    """Scheduler job callback: ask the notification client to queue weekly summaries.

    ``kwargs`` is accepted but not used. Any exception is logged with its
    traceback and swallowed, so nothing propagates out of this callback.
    """
    try:
        log("Processing weekly summary")
        client = get_notification_client()
        client.queue_weekly_summary()
    except Exception as exc:
        logger.exception(f"Error processing weekly summary: {exc}")
class Jobstores(Enum):
    """Names of the APScheduler job stores used by the scheduler service."""

    EXECUTION = "execution"
    BATCHED_NOTIFICATIONS = "batched_notifications"
    # Configured with an in-memory job store in Scheduler (not persisted).
    WEEKLY_NOTIFICATIONS = "weekly_notifications"
class ExecutionJobArgs(BaseModel):
@@ -189,6 +199,8 @@ class Scheduler(AppService):
metadata=MetaData(schema=db_schema),
tablename="apscheduler_jobs_batched_notifications",
),
# These don't really need persistence
Jobstores.WEEKLY_NOTIFICATIONS.value: MemoryJobStore(),
}
)
self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
@@ -242,6 +254,9 @@ class Scheduler(AppService):
) -> list[ExecutionJobInfo]:
schedules = []
for job in self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value):
logger.info(
f"Found job {job.id} with cron schedule {job.trigger} and args {job.kwargs}"
)
job_args = ExecutionJobArgs(**job.kwargs)
if (
job.next_run_time is not None
@@ -271,3 +286,21 @@ class Scheduler(AppService):
)
log(f"Added job {job.id} with cron schedule '{cron}' input data: {data}")
return NotificationJobInfo.from_db(job_args, job)
@expose
def add_weekly_notification_schedule(self, cron: str) -> NotificationJobInfo:
    """Schedule ``process_weekly_summary`` on the weekly-notifications job store.

    Args:
        cron: Crontab expression parsed with ``CronTrigger.from_crontab``.

    Returns:
        Job info built from the created job and a ``NotificationJobArgs``
        carrying ``cron`` and the ``WEEKLY_SUMMARY`` notification type.
    """
    trigger = CronTrigger.from_crontab(cron)
    # NOTE(review): no explicit job id is passed, so `replace_existing=True`
    # presumably never matches an earlier job — confirm whether repeated calls
    # can register duplicates.
    job = self.scheduler.add_job(
        process_weekly_summary,
        trigger,
        kwargs={},
        replace_existing=True,
        jobstore=Jobstores.WEEKLY_NOTIFICATIONS.value,
    )
    log(f"Added job {job.id} with cron schedule '{cron}'")

    job_args = NotificationJobArgs(
        cron=cron, notification_types=[NotificationType.WEEKLY_SUMMARY]
    )
    return NotificationJobInfo.from_db(job_args, job)

View File

@@ -7,9 +7,9 @@ from prisma.enums import NotificationType
from pydantic import BaseModel
from backend.data.notifications import (
NotificationDataType_co,
NotificationEventModel,
NotificationTypeOverride,
T_co,
)
from backend.util.settings import Settings
from backend.util.text import TextFormatter
@@ -48,7 +48,10 @@ class EmailSender:
self,
notification: NotificationType,
user_email: str,
data: NotificationEventModel[T_co] | list[NotificationEventModel[T_co]],
data: (
NotificationEventModel[NotificationDataType_co]
| list[NotificationEventModel[NotificationDataType_co]]
),
user_unsub_link: str | None = None,
):
"""Send an email to a user using a template pulled from the notification type"""

View File

@@ -1,6 +1,6 @@
import logging
import time
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import Callable
import aio_pika
@@ -10,21 +10,32 @@ from prisma.enums import NotificationType
from pydantic import BaseModel
from backend.data.notifications import (
BaseSummaryData,
BaseSummaryParams,
DailySummaryData,
DailySummaryParams,
NotificationEventDTO,
NotificationEventModel,
NotificationResult,
NotificationTypeOverride,
QueueType,
SummaryParamsEventDTO,
SummaryParamsEventModel,
WeeklySummaryData,
WeeklySummaryParams,
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_all_batches_by_type,
get_batch_delay,
get_data_type,
get_notif_data_type,
get_summary_params_type,
get_user_notification_batch,
get_user_notification_oldest_message_in_batch,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.user import (
generate_unsubscribe_link,
get_active_user_ids_in_timerange,
get_user_email_by_id,
get_user_email_verification,
get_user_notification_preference,
@@ -68,6 +79,17 @@ def create_notification_config() -> RabbitMQConfig:
"x-dead-letter-routing-key": "failed.admin",
},
),
# Summary notification queues
Queue(
name="summary_notifications",
exchange=notification_exchange,
routing_key="notification.summary.#",
arguments={
"x-dead-letter-exchange": dead_letter_exchange.name,
"x-dead-letter-routing-key": "failed.summary.weekly",
"x-dead-letter-routing-key": "failed.summary",
},
),
# Batch Queue
Queue(
name="batch_notifications",
@@ -116,19 +138,53 @@ class NotificationManager(AppService):
def get_port(cls) -> int:
return settings.config.notification_service_port
def get_routing_key(self, event_type: NotificationType) -> str:
    """Get the appropriate routing key for an event type.

    The queue strategy is looked up via ``NotificationTypeOverride``. Known
    strategies yield ``notification.<strategy>.<type>``; any other strategy
    falls back to ``notification.<type>``.
    """
    strategy = NotificationTypeOverride(event_type).strategy
    segment_by_strategy = {
        QueueType.IMMEDIATE: "immediate",
        QueueType.BACKOFF: "backoff",
        QueueType.ADMIN: "admin",
        QueueType.BATCH: "batch",
        QueueType.SUMMARY: "summary",
    }
    segment = segment_by_strategy.get(strategy)
    if segment is None:
        return f"notification.{event_type.value}"
    return f"notification.{segment}.{event_type.value}"
@expose
def queue_weekly_summary(self):
    """Queue a weekly-summary request for each user active in the last 7 days.

    The window ends at the current UTC time. Active user ids come from
    ``get_active_user_ids_in_timerange``; one ``SummaryParamsEventDTO`` per
    user is handed to ``_queue_scheduled_notification``. Any exception is
    logged and swallowed.
    """
    try:
        logger.info("Processing weekly summary queuing operation")
        queued = 0
        window_end = datetime.now(tz=timezone.utc)
        window_start = window_end - timedelta(days=7)

        active_user_ids = self.run_and_wait(
            get_active_user_ids_in_timerange(
                end_time=window_end.isoformat(),
                start_time=window_start.isoformat(),
            )
        )

        for queued, user_id in enumerate(active_user_ids, start=1):
            weekly_params = WeeklySummaryParams(
                start_date=window_start,
                end_date=window_end,
            )
            request = SummaryParamsEventDTO(
                user_id=user_id,
                type=NotificationType.WEEKLY_SUMMARY,
                data=weekly_params.model_dump(),
            )
            self._queue_scheduled_notification(request)

        logger.info(f"Processed {queued} weekly summaries into queue")
    except Exception as exc:
        logger.exception(f"Error processing weekly summary: {exc}")
@expose
def process_existing_batches(self, notification_types: list[NotificationType]):
@@ -206,7 +262,7 @@ class NotificationManager(AppService):
events = [
NotificationEventModel[
get_data_type(db_event.type)
get_notif_data_type(db_event.type)
].model_validate(
{
"user_id": batch.userId,
@@ -259,9 +315,9 @@ class NotificationManager(AppService):
logger.info(f"Received Request to queue {event=}")
# Workaround for not being able to serialize generics over the expose bus
parsed_event = NotificationEventModel[
get_data_type(event.type)
get_notif_data_type(event.type)
].model_validate(event.model_dump())
routing_key = self.get_routing_key(parsed_event)
routing_key = self.get_routing_key(parsed_event.type)
message = parsed_event.model_dump_json()
logger.info(f"Received Request to queue {message=}")
@@ -288,6 +344,36 @@ class NotificationManager(AppService):
logger.exception(f"Error queueing notification: {e}")
return NotificationResult(success=False, message=str(e))
def _queue_scheduled_notification(self, event: SummaryParamsEventDTO):
    """Validate a scheduled-summary request and publish it to RabbitMQ.

    The DTO is re-validated into the ``SummaryParamsEventModel`` matching
    ``event.type``, serialized to JSON and published on the "notifications"
    exchange under the routing key for that type. Any exception is logged and
    swallowed; nothing is returned.
    """
    try:
        logger.info(f"Received Request to queue scheduled notification {event=}")

        typed_model = SummaryParamsEventModel[get_summary_params_type(event.type)]
        parsed_event = typed_model.model_validate(event.model_dump())
        routing_key = self.get_routing_key(event.type)
        message = parsed_event.model_dump_json()

        logger.info(f"Received Request to queue {message=}")

        exchange = "notifications"
        # Resolve the configured exchange object by name before publishing.
        target_exchange = next(
            ex for ex in self.rabbit_config.exchanges if ex.name == exchange
        )

        # Publish to RabbitMQ
        self.run_and_wait(
            self.rabbit.publish_message(
                routing_key=routing_key,
                message=message,
                exchange=target_exchange,
            )
        )

    except Exception as exc:
        logger.exception(f"Error queueing notification: {exc}")
def _should_email_user_based_on_preference(
self, user_id: str, event_type: NotificationType
) -> bool:
@@ -299,6 +385,86 @@ class NotificationManager(AppService):
# only if both are true, should we email this person
return validated_email and preference
async def _gather_summary_data(
    self, user_id: str, event_type: NotificationType, params: BaseSummaryParams
) -> BaseSummaryData:
    """Gather the data used to build a daily or weekly summary notification.

    All statistics are currently hard-coded placeholders; the commented-out
    calls below sketch the queries intended to replace them.

    Args:
        user_id: User the summary is for (currently only logged).
        event_type: ``DAILY_SUMMARY`` or ``WEEKLY_SUMMARY``.
        params: ``DailySummaryParams`` or ``WeeklySummaryParams`` matching
            ``event_type``.

    Returns:
        ``DailySummaryData`` or ``WeeklySummaryData``.

    Raises:
        ValueError: if ``event_type`` and ``params`` are not a supported,
            matching pair.
    """
    logger.info(
        f"Gathering summary data for {user_id} and {event_type} wiht {params=}"
    )

    # TODO: replace the placeholder values below with real queries:
    # total_credits_used = self.run_and_wait(
    #     get_total_credits_used(user_id, start_time, end_time)
    # )
    # total_executions = self.run_and_wait(
    #     get_total_executions(user_id, start_time, end_time)
    # )
    # most_used_agent = self.run_and_wait(
    #     get_most_used_agent(user_id, start_time, end_time)
    # )
    # execution_times = self.run_and_wait(
    #     get_execution_time(user_id, start_time, end_time)
    # )
    # runs = self.run_and_wait(
    #     get_runs(user_id, start_time, end_time)
    # )
    # cost_breakdown = self.run_and_wait(
    #     get_cost_breakdown(user_id, start_time, end_time)
    # )
    total_credits_used = 3.0
    total_executions = 2
    most_used_agent = {"name": "Some"}
    execution_times = [1, 2, 3]
    runs = [{"status": "COMPLETED"}, {"status": "FAILED"}]
    cost_breakdown = {
        "agent1": 1.0,
        "agent2": 2.0,
    }

    successful_runs = sum(1 for run in runs if run["status"] == "COMPLETED")
    failed_runs = len(runs) - successful_runs
    total_execution_time = sum(execution_times)
    # Guard against an empty list so the average never divides by zero.
    average_execution_time = (
        total_execution_time / len(execution_times) if execution_times else 0
    )

    if event_type == NotificationType.DAILY_SUMMARY and isinstance(
        params, DailySummaryParams
    ):
        return DailySummaryData(
            total_credits_used=total_credits_used,
            total_executions=total_executions,
            most_used_agent=most_used_agent["name"],
            total_execution_time=total_execution_time,
            successful_runs=successful_runs,
            failed_runs=failed_runs,
            average_execution_time=average_execution_time,
            cost_breakdown=cost_breakdown,
            date=params.date,
        )

    if event_type == NotificationType.WEEKLY_SUMMARY and isinstance(
        params, WeeklySummaryParams
    ):
        # WeeklySummaryData requires week_number and year; omitting them makes
        # model validation fail. They are derived from the ISO calendar of the
        # window start so week and year stay consistent with each other.
        # NOTE(review): confirm start_date (not end_date) is the intended anchor.
        iso_year, iso_week, _ = params.start_date.isocalendar()
        return WeeklySummaryData(
            total_credits_used=total_credits_used,
            total_executions=total_executions,
            most_used_agent=most_used_agent["name"],
            total_execution_time=total_execution_time,
            successful_runs=successful_runs,
            failed_runs=failed_runs,
            average_execution_time=average_execution_time,
            cost_breakdown=cost_breakdown,
            start_date=params.start_date,
            end_date=params.end_date,
            week_number=iso_week,
            year=iso_year,
        )

    raise ValueError("Invalid event type or params")
async def _should_batch(
self, user_id: str, event_type: NotificationType, event: NotificationEventModel
) -> bool:
@@ -329,7 +495,7 @@ class NotificationManager(AppService):
try:
event = NotificationEventDTO.model_validate_json(message)
model = NotificationEventModel[
get_data_type(event.type)
get_notif_data_type(event.type)
].model_validate_json(message)
return NotificationEvent(event=event, model=model)
except Exception as e:
@@ -429,7 +595,9 @@ class NotificationManager(AppService):
unsub_link = generate_unsubscribe_link(event.user_id)
batch_messages = [
NotificationEventModel[get_data_type(db_event.type)].model_validate(
NotificationEventModel[
get_notif_data_type(db_event.type)
].model_validate(
{
"user_id": event.user_id,
"type": db_event.type,
@@ -453,6 +621,53 @@ class NotificationManager(AppService):
logger.exception(f"Error processing notification for batch queue: {e}")
return False
def _process_summary(self, message: str) -> bool:
    """Handle one message from the summary queue and email the resulting summary.

    Args:
        message: JSON payload, parsed as a ``SummaryParamsEventDTO`` and as
            the typed ``SummaryParamsEventModel`` for its notification type.

    Returns:
        ``True`` when the email was sent, or when
        ``_should_email_user_based_on_preference`` says the user should not
        be emailed. ``False`` when the user has no email address or any step
        raised. How the caller acts on this flag is not visible from here.
    """
    try:
        logger.info(f"Processing summary notification: {message}")

        event = SummaryParamsEventDTO.model_validate_json(message)
        typed_model = SummaryParamsEventModel[get_summary_params_type(event.type)]
        typed_event = typed_model.model_validate_json(message)

        logger.info(f"Processing summary notification: {typed_event}")

        recipient_email = self.run_and_wait(get_user_email_by_id(event.user_id))
        if not recipient_email:
            logger.error(f"User email not found for user {event.user_id}")
            return False

        if not self._should_email_user_based_on_preference(
            event.user_id, event.type
        ):
            logger.info(
                f"User {event.user_id} does not want to receive {event.type} notifications"
            )
            return True

        summary_data = self.run_and_wait(
            self._gather_summary_data(event.user_id, event.type, typed_event.data)
        )
        unsub_link = generate_unsubscribe_link(event.user_id)

        self.email_sender.send_templated(
            notification=event.type,
            user_email=recipient_email,
            data=NotificationEventModel(
                user_id=event.user_id,
                type=event.type,
                data=summary_data,
            ),
            user_unsub_link=unsub_link,
        )
        return True
    except Exception as exc:
        logger.exception(f"Error processing notification for summary queue: {exc}")
        return False
def _run_queue(
self,
queue: aio_pika.abc.AbstractQueue,
@@ -493,6 +708,10 @@ class NotificationManager(AppService):
data={},
cron="0 * * * *",
)
# get_scheduler().add_weekly_notification_schedule(
# # weekly on Friday at 12pm
# cron="0 12 * * 5",
# )
logger.info("Scheduled notification cleanup")
except Exception as e:
logger.error(f"Error scheduling notification cleanup: {e}")
@@ -507,6 +726,8 @@ class NotificationManager(AppService):
admin_queue = self.run_and_wait(channel.get_queue("admin_notifications"))
summary_queue = self.run_and_wait(channel.get_queue("summary_notifications"))
while self.running:
try:
self._run_queue(
@@ -525,6 +746,12 @@ class NotificationManager(AppService):
error_queue_name="batch_notifications",
)
self._run_queue(
queue=summary_queue,
process_func=self._process_summary,
error_queue_name="summary_notifications",
)
time.sleep(0.1)
except QueueEmpty as e:

View File

@@ -0,0 +1,27 @@
{# Weekly Summary #}
{# Template variables:
data: the stuff below
data.start_date: the start date of the summary
data.end_date: the end date of the summary
data.total_credits_used: the total credits used during the summary
data.total_executions: the total number of executions during the summary
data.most_used_agent: the most used agent's name during the summary
data.total_execution_time: the total execution time during the summary
data.successful_runs: the total number of successful runs during the summary
data.failed_runs: the total number of failed runs during the summary
data.average_execution_time: the average execution time during the summary
data.cost_breakdown: the cost breakdown during the summary
#}
<h1>Weekly Summary</h1>
<p>Start Date: {{ data.start_date }}</p>
<p>End Date: {{ data.end_date }}</p>
<p>Total Credits Used: {{ data.total_credits_used }}</p>
<p>Total Executions: {{ data.total_executions }}</p>
<p>Most Used Agent: {{ data.most_used_agent }}</p>
<p>Total Execution Time: {{ data.total_execution_time }}</p>
<p>Successful Runs: {{ data.successful_runs }}</p>
<p>Failed Runs: {{ data.failed_runs }}</p>
<p>Average Execution Time: {{ data.average_execution_time }}</p>
<p>Cost Breakdown: {{ data.cost_breakdown }}</p>