feat(backend): baseline notifications service

This commit is contained in:
Nicholas Tindle
2025-02-03 16:19:36 -06:00
parent 6ed6fa1033
commit bef5a83bef
6 changed files with 269 additions and 1 deletions

View File

@@ -0,0 +1,5 @@
from .notifications import NotificationManager
__all__ = [
"NotificationManager",
]

View File

@@ -0,0 +1,3 @@
class AsyncEmailSender:
def send_email(self, user_id: str, subject: str, body: str):
pass

View File

@@ -0,0 +1,79 @@
from datetime import datetime
from enum import Enum
from typing import Optional
from pydantic import BaseModel, EmailStr
class BatchingStrategy(str, Enum):
IMMEDIATE = "immediate" # Send right away (errors, critical notifications)
HOURLY = "hourly" # Batch for up to an hour (usage reports)
DAILY = "daily" # Daily digest (summary notifications)
class NotificationType(str, Enum):
AGENT_RUN = "agent_run" # BatchingStrategy.HOURLY
ZERO_BALANCE = "zero_balance" # BatchingStrategy.IMMEDIATE
LOW_BALANCE = "low_balance" # BatchingStrategy.IMMEDIATE
BLOCK_EXECUTION_FAILED = "block_execution_failed" # BatchingStrategy.IMMEDIATE
CONTINUOUS_AGENT_ERROR = "continuous_agent_error" # BatchingStrategy.IMMEDIATE
@property
def strategy(self) -> BatchingStrategy:
BATCHING_RULES = {
NotificationType.AGENT_RUN: BatchingStrategy.HOURLY,
NotificationType.ZERO_BALANCE: BatchingStrategy.IMMEDIATE,
NotificationType.LOW_BALANCE: BatchingStrategy.IMMEDIATE,
NotificationType.BLOCK_EXECUTION_FAILED: BatchingStrategy.IMMEDIATE,
NotificationType.CONTINUOUS_AGENT_ERROR: BatchingStrategy.IMMEDIATE,
}
return BATCHING_RULES.get(self, BatchingStrategy.HOURLY)
@property
def template(self) -> str:
"""Returns template name for this notification type"""
return {
NotificationType.AGENT_RUN: "agent_run.html",
NotificationType.ZERO_BALANCE: "zero_balance.html",
NotificationType.LOW_BALANCE: "low_balance.html",
NotificationType.BLOCK_EXECUTION_FAILED: "block_failed.html",
NotificationType.CONTINUOUS_AGENT_ERROR: "agent_error.html",
}[self]
class NotificationEvent(BaseModel):
user_id: str
type: NotificationType
data: dict
created_at: datetime = datetime.now()
@property
def strategy(self) -> BatchingStrategy:
return self.type.strategy
@property
def template(self) -> str:
return self.type.template
class NotificationBatch(BaseModel):
user_id: str
events: list[NotificationEvent]
strategy: BatchingStrategy
last_update: datetime = datetime.now()
class NotificationResult(BaseModel):
success: bool
message: Optional[str] = None
class NotificationPreference(BaseModel):
"""User's notification preferences"""
user_id: str
email: EmailStr
preferences: dict[NotificationType, bool] = {} # Which notifications they want
daily_limit: int = 10 # Max emails per day
emails_sent_today: int = 0
last_reset_date: datetime = datetime.now()

View File

@@ -0,0 +1,177 @@
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from autogpt_libs.utils.cache import thread_cached
from backend.notifications.models import (
BatchingStrategy,
NotificationBatch,
NotificationEvent,
NotificationResult,
)
if TYPE_CHECKING:
from backend.executor import DatabaseManager
from backend.util.service import AppService, expose, get_service_client
from backend.data.redis import get_redis, get_redis_async
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
settings = Settings()
class NotificationManager(AppService):
"""Service for handling notifications with batching support"""
def __init__(self):
super().__init__()
self.use_redis = True
self.batch_key_prefix = "notification_batch:"
self.running = True
@classmethod
def get_port(cls) -> int:
return settings.config.notification_service_port
@expose
def queue_notification(self, event: NotificationEvent) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
try:
if event.strategy == BatchingStrategy.IMMEDIATE:
success = self.run_and_wait(self._process_immediate(event))
return NotificationResult(
success=success,
message=(
"Immediate notification processed"
if success
else "Failed to send immediate notification"
),
)
success = self.run_and_wait(self._add_to_batch(event))
return NotificationResult(
success=success,
message=(
"Notification queued for batch processing"
if success
else "Failed to queue notification"
),
)
except Exception as e:
logger.error(f"Error queueing notification: {e}")
return NotificationResult(success=False, message=str(e))
async def _add_to_batch(self, event: NotificationEvent) -> bool:
"""Add an event to its appropriate batch"""
redis = await get_redis_async()
batch_key = f"{self.batch_key_prefix}{event.user_id}:{event.strategy}"
try:
current_batch = await redis.get(batch_key)
if current_batch:
batch = NotificationBatch.parse_raw(current_batch)
batch.events.append(event)
batch.last_update = datetime.now()
else:
batch = NotificationBatch(
user_id=event.user_id, events=[event], strategy=event.strategy
)
pipeline = redis.pipeline()
await pipeline.set(
batch_key, batch.json(), ex=self._get_batch_expiry(batch.strategy)
)
await pipeline.execute()
# Notify batch processor
await redis.publish("notification_triggers", batch_key)
return True
except Exception as e:
logger.error(f"Error adding to batch: {e}")
return False
async def _process_immediate(self, event: NotificationEvent) -> bool:
"""Process an immediate notification"""
try:
# Implementation of actual email sending would go here
# For now, just log it
logger.info(f"Sending immediate notification: {event}")
return True
except Exception as e:
logger.error(f"Error processing immediate notification: {e}")
return False
def _get_batch_expiry(self, strategy: BatchingStrategy) -> int:
return {
BatchingStrategy.HOURLY: 3600,
BatchingStrategy.DAILY: 86400,
BatchingStrategy.IMMEDIATE: 300,
}.get(strategy, 3600)
def run_service(self):
"""Main service loop - handles batch processing"""
redis_conn = get_redis()
pubsub = redis_conn.pubsub()
pubsub.subscribe("notification_triggers")
logger.info(f"[{self.service_name}] Started notification service")
while self.running:
try:
message = pubsub.get_message(ignore_subscribe_messages=True)
if message and message["type"] == "message":
batch_key = message["data"].decode()
self.run_and_wait(self._process_batch(batch_key))
except Exception as e:
logger.error(f"Error in notification service loop: {e}")
async def _process_batch(self, batch_key: str):
"""Process a batch of notifications"""
redis = await get_redis_async()
try:
batch_data = await redis.get(batch_key)
if not batch_data:
return
batch = NotificationBatch.parse_raw(batch_data)
if not self._should_process_batch(batch):
return
# Implementation of batch email sending would go here
logger.info(f"Processing batch: {batch}")
await redis.delete(batch_key)
except Exception as e:
logger.error(f"Error processing batch {batch_key}: {e}")
def _should_process_batch(self, batch: NotificationBatch) -> bool:
age = datetime.now() - batch.last_update
return (
len(batch.events) >= 10
or (batch.strategy == BatchingStrategy.HOURLY and age >= timedelta(hours=1))
or (batch.strategy == BatchingStrategy.DAILY and age >= timedelta(days=1))
)
def cleanup(self):
"""Cleanup service resources"""
self.running = False
super().cleanup()
# ------- UTILITIES ------- #
# @thread_cached
# def get_notification_service() -> "NotificationService":
# from backend.notifications import NotificationService
# return get_service_client(NotificationService)
@thread_cached
def get_db_client() -> "DatabaseManager":
from backend.executor import DatabaseManager
return get_service_client(DatabaseManager)

View File

@@ -140,6 +140,11 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="The port for agent server API to run on",
)
notification_service_port: int = Field(
default=8007,
description="The port for notification service daemon to run on",
)
platform_base_url: str = Field(
default="",
description="Must be set so the application knows where it's hosted at. "

View File

@@ -3157,7 +3157,6 @@ files = [
{file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:bb89f0a835bcfc1d42ccd5f41f04870c1b936d8507c6df12b7737febc40f0909"},
{file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f0c2d907a1e102526dd2986df638343388b94c33860ff3bbe1384130828714b1"},
{file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f8157bed2f51db683f31306aa497311b560f2265998122abe1dce6428bd86567"},
{file = "psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:27422aa5f11fbcd9b18da48373eb67081243662f9b46e6fd07c3eb46e4535142"},
{file = "psycopg2_binary-2.9.10-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:eb09aa7f9cecb45027683bb55aebaaf45a0df8bf6de68801a6afdc7947bb09d4"},
{file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b73d6d7f0ccdad7bc43e6d34273f70d587ef62f824d7261c4ae9b8b1b6af90e8"},
{file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce5ab4bf46a211a8e924d307c1b1fcda82368586a19d0a24f8ae166f5c784864"},