feat(backend): setup first notification service

This commit is contained in:
Nicholas Tindle
2025-02-10 15:54:02 -06:00
parent c259e5b145
commit cee7929b02
2 changed files with 251 additions and 2 deletions

View File

@@ -12,8 +12,15 @@ from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar, cast
from redis.lock import Lock as RedisLock
from backend.data.notifications import (
AgentRunData,
NotificationEventDTO,
NotificationType,
)
if TYPE_CHECKING:
from backend.executor import DatabaseManager
from backend.notifications.notifications import NotificationManager
from autogpt_libs.utils.cache import thread_cached
@@ -108,6 +115,7 @@ ExecutionStream = Generator[NodeExecutionEntry, None, None]
def execute_node(
db_client: "DatabaseManager",
creds_manager: IntegrationCredentialsManager,
notification_service: "NotificationManager",
data: NodeExecutionEntry,
execution_stats: dict[str, Any] | None = None,
) -> ExecutionStream:
@@ -196,7 +204,7 @@ def execute_node(
# Charge the user for the execution before running the block.
# TODO: We assume the block is executed within 0 seconds.
# This is fine because for now, there is no block that is charged by time.
db_client.spend_credits(data, input_size + output_size, 0)
cost = db_client.spend_credits(data, input_size + output_size, 0)
for output_name, output_data in node_block.execute(
input_data, **extra_exec_kwargs
@@ -216,7 +224,22 @@ def execute_node(
):
yield execution
# Update execution status and spend credits
update_execution(ExecutionStatus.COMPLETED)
event = NotificationEventDTO(
user_id=user_id,
type=NotificationType.AGENT_RUN,
data=AgentRunData(
agent_name=node_block.name,
credits_used=cost,
execution_time=0,
graph_id=graph_id,
node_count=1,
).model_dump(),
)
logger.info(f"Sending notification for {event}")
notification_service.queue_notification(event)
except Exception as e:
error_msg = str(e)
@@ -480,6 +503,7 @@ class Executor:
cls.pid = os.getpid()
cls.db_client = get_db_client()
cls.creds_manager = IntegrationCredentialsManager()
cls.notification_service = get_notification_service()
# Set up shutdown handlers
cls.shutdown_lock = threading.Lock()
@@ -554,7 +578,11 @@ class Executor:
try:
log_metadata.info(f"Start node execution {node_exec.node_exec_id}")
for execution in execute_node(
cls.db_client, cls.creds_manager, node_exec, stats
db_client=cls.db_client,
creds_manager=cls.creds_manager,
notification_service=cls.notification_service,
data=node_exec,
execution_stats=stats,
):
q.add(execution)
log_metadata.info(f"Finished node execution {node_exec.node_exec_id}")
@@ -966,6 +994,13 @@ def get_db_client() -> "DatabaseManager":
return get_service_client(DatabaseManager)
@thread_cached
def get_notification_service() -> "NotificationManager":
from backend.notifications import NotificationManager
return get_service_client(NotificationManager)
@contextmanager
def synchronized(key: str, timeout: int = 60):
lock: RedisLock = redis.get_redis().lock(f"lock:{key}", timeout=timeout)

View File

@@ -0,0 +1,214 @@
import logging
import time
from typing import TYPE_CHECKING
from aio_pika.exceptions import QueueEmpty
from autogpt_libs.utils.cache import thread_cached
from backend.data.notifications import (
BatchingStrategy,
NotificationEventDTO,
NotificationEventModel,
NotificationResult,
get_data_type,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.executor.database import DatabaseManager
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Settings
if TYPE_CHECKING:
from backend.executor import DatabaseManager
logger = logging.getLogger(__name__)
settings = Settings()
def create_notification_config() -> RabbitMQConfig:
"""Create RabbitMQ configuration for notifications"""
notification_exchange = Exchange(name="notifications", type=ExchangeType.TOPIC)
summary_exchange = Exchange(name="summaries", type=ExchangeType.TOPIC)
dead_letter_exchange = Exchange(name="dead_letter", type=ExchangeType.DIRECT)
delay_exchange = Exchange(name="delay", type=ExchangeType.DIRECT)
queues = [
# Main notification queues
Queue(
name="immediate_notifications",
exchange=notification_exchange,
routing_key="notification.immediate.#",
arguments={
"x-dead-letter-exchange": dead_letter_exchange.name,
"x-dead-letter-routing-key": "failed.immediate",
},
),
Queue(
name="backoff_notifications",
exchange=notification_exchange,
routing_key="notification.backoff.#",
arguments={
"x-dead-letter-exchange": dead_letter_exchange.name,
"x-dead-letter-routing-key": "failed.backoff",
},
),
# Summary queues
Queue(
name="daily_summary_trigger",
exchange=summary_exchange,
routing_key="summary.daily",
arguments={"x-message-ttl": 86400000}, # 24 hours
),
Queue(
name="weekly_summary_trigger",
exchange=summary_exchange,
routing_key="summary.weekly",
arguments={"x-message-ttl": 604800000}, # 7 days
),
Queue(
name="monthly_summary_trigger",
exchange=summary_exchange,
routing_key="summary.monthly",
arguments={"x-message-ttl": 2592000000}, # 30 days
),
# Failed notifications queue
Queue(
name="failed_notifications",
exchange=dead_letter_exchange,
routing_key="failed.#",
),
]
return RabbitMQConfig(
exchanges=[
notification_exchange,
# batch_exchange,
summary_exchange,
dead_letter_exchange,
delay_exchange,
],
queues=queues,
)
class NotificationManager(AppService):
"""Service for handling notifications with batching support"""
def __init__(self):
super().__init__()
self.use_db = True
self.use_async = False # Use async RabbitMQ client
self.use_rabbitmq = create_notification_config()
self.running = True
@classmethod
def get_port(cls) -> int:
return settings.config.notification_service_port
def get_routing_key(self, event: NotificationEventModel) -> str:
"""Get the appropriate routing key for an event"""
if event.strategy == BatchingStrategy.IMMEDIATE:
return f"notification.immediate.{event.type.value}"
elif event.strategy == BatchingStrategy.BACKOFF:
return f"notification.backoff.{event.type.value}"
return f"notification.{event.type.value}"
@expose
def queue_notification(self, event: NotificationEventDTO) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
try:
logger.info(f"Recieved Request to queue {event=}")
# Workaround for not being able to seralize generics over the expose bus
parsed_event = NotificationEventModel[
get_data_type(event.type)
].model_validate(event.model_dump())
routing_key = self.get_routing_key(parsed_event)
message = parsed_event.model_dump_json()
logger.info(f"Recieved Request to queue {message=}")
exchange = "notifications"
# Publish to RabbitMQ
self.run_and_wait(
self.rabbit.publish_message(
routing_key=routing_key,
message=message,
exchange=next(
ex for ex in self.rabbit_config.exchanges if ex.name == exchange
),
)
)
return NotificationResult(
success=True,
message=(f"Notification queued with routing key: {routing_key}"),
)
except Exception as e:
logger.error(f"Error queueing notification: {e}")
return NotificationResult(success=False, message=str(e))
async def _process_immediate(self, message: str) -> bool:
"""Process a single notification immediately"""
try:
event = NotificationEventDTO.model_validate_json(message)
parsed_event = NotificationEventModel[
get_data_type(event.type)
].model_validate_json(message)
# Implementation of actual notification sending would go here
# self.email_sender.send_templated(event.type, event.user_id, parsed_event)
logger.info(f"Processing notification: {parsed_event}")
return True
except Exception as e:
logger.error(f"Error processing notification: {e}")
return False
def run_service(self):
logger.info(f"[{self.service_name}] Started notification service")
# Set up queue consumers
channel = self.run_and_wait(self.rabbit.get_channel())
immediate_queue = self.run_and_wait(
channel.get_queue("immediate_notifications")
)
while self.running:
try:
# Process immediate notifications
try:
message = self.run_and_wait(immediate_queue.get())
if message:
success = self.run_and_wait(
self._process_immediate(message.body.decode())
)
if success:
self.run_and_wait(message.ack())
else:
self.run_and_wait(message.reject(requeue=True))
except QueueEmpty:
logger.debug("Immediate queue empty")
time.sleep(0.1)
except QueueEmpty as e:
logger.debug(f"Queue empty: {e}")
except Exception as e:
logger.error(f"Error in notification service loop: {e}")
def cleanup(self):
"""Cleanup service resources"""
self.running = False
super().cleanup()
# ------- UTILITIES ------- #
@thread_cached
def get_db_client() -> "DatabaseManager":
from backend.executor import DatabaseManager
return get_service_client(DatabaseManager)