From cee7929b026f4ff51587bd68c8015ad09402d7db Mon Sep 17 00:00:00 2001
From: Nicholas Tindle
Date: Mon, 10 Feb 2025 15:54:02 -0600
Subject: [PATCH] feat(backend): setup first notification service

---
 .../backend/backend/executor/manager.py      |  39 +++-
 .../backend/notifications/notifications.py   | 214 ++++++++++++++++++
 2 files changed, 251 insertions(+), 2 deletions(-)
 create mode 100644 autogpt_platform/backend/backend/notifications/notifications.py

diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py
index 1d965e0121..0d3604143c 100644
--- a/autogpt_platform/backend/backend/executor/manager.py
+++ b/autogpt_platform/backend/backend/executor/manager.py
@@ -12,8 +12,15 @@ from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar, cast
 
 from redis.lock import Lock as RedisLock
 
+from backend.data.notifications import (
+    AgentRunData,
+    NotificationEventDTO,
+    NotificationType,
+)
+
 if TYPE_CHECKING:
     from backend.executor import DatabaseManager
+    from backend.notifications.notifications import NotificationManager
 
 from autogpt_libs.utils.cache import thread_cached
 
@@ -108,6 +115,7 @@ ExecutionStream = Generator[NodeExecutionEntry, None, None]
 def execute_node(
     db_client: "DatabaseManager",
     creds_manager: IntegrationCredentialsManager,
+    notification_service: "NotificationManager",
     data: NodeExecutionEntry,
     execution_stats: dict[str, Any] | None = None,
 ) -> ExecutionStream:
@@ -196,7 +204,7 @@ def execute_node(
         # Charge the user for the execution before running the block.
         # TODO: We assume the block is executed within 0 seconds.
         # This is fine because for now, there is no block that is charged by time.
-        db_client.spend_credits(data, input_size + output_size, 0)
+        cost = db_client.spend_credits(data, input_size + output_size, 0)
 
         for output_name, output_data in node_block.execute(
             input_data, **extra_exec_kwargs
@@ -216,7 +224,22 @@ def execute_node(
             ):
                 yield execution
 
+        # Update the execution status and queue a run notification
         update_execution(ExecutionStatus.COMPLETED)
+        event = NotificationEventDTO(
+            user_id=user_id,
+            type=NotificationType.AGENT_RUN,
+            data=AgentRunData(
+                agent_name=node_block.name,
+                credits_used=cost,
+                execution_time=0,
+                graph_id=graph_id,
+                node_count=1,
+            ).model_dump(),
+        )
+
+        logger.info(f"Sending notification for {event}")
+        notification_service.queue_notification(event)
 
     except Exception as e:
         error_msg = str(e)
@@ -480,6 +503,7 @@ class Executor:
         cls.pid = os.getpid()
         cls.db_client = get_db_client()
         cls.creds_manager = IntegrationCredentialsManager()
+        cls.notification_service = get_notification_service()
 
         # Set up shutdown handlers
         cls.shutdown_lock = threading.Lock()
@@ -554,7 +578,11 @@ class Executor:
         try:
             log_metadata.info(f"Start node execution {node_exec.node_exec_id}")
             for execution in execute_node(
-                cls.db_client, cls.creds_manager, node_exec, stats
+                db_client=cls.db_client,
+                creds_manager=cls.creds_manager,
+                notification_service=cls.notification_service,
+                data=node_exec,
+                execution_stats=stats,
             ):
                 q.add(execution)
             log_metadata.info(f"Finished node execution {node_exec.node_exec_id}")
@@ -966,6 +994,13 @@ def get_db_client() -> "DatabaseManager":
     return get_service_client(DatabaseManager)
 
 
+@thread_cached
+def get_notification_service() -> "NotificationManager":
+    from backend.notifications import NotificationManager
+
+    return get_service_client(NotificationManager)
+
+
 @contextmanager
 def synchronized(key: str, timeout: int = 60):
     lock: RedisLock = redis.get_redis().lock(f"lock:{key}", timeout=timeout)
diff --git a/autogpt_platform/backend/backend/notifications/notifications.py b/autogpt_platform/backend/backend/notifications/notifications.py
new file mode 100644
index 0000000000..4c97d951f2
--- /dev/null
+++ b/autogpt_platform/backend/backend/notifications/notifications.py
@@ -0,0 +1,214 @@
+import logging
+import time
+from typing import TYPE_CHECKING
+
+from aio_pika.exceptions import QueueEmpty
+from autogpt_libs.utils.cache import thread_cached
+
+from backend.data.notifications import (
+    BatchingStrategy,
+    NotificationEventDTO,
+    NotificationEventModel,
+    NotificationResult,
+    get_data_type,
+)
+from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
+from backend.executor.database import DatabaseManager
+from backend.util.service import AppService, expose, get_service_client
+from backend.util.settings import Settings
+
+if TYPE_CHECKING:
+    from backend.executor import DatabaseManager
+
+logger = logging.getLogger(__name__)
+settings = Settings()
+
+
+def create_notification_config() -> RabbitMQConfig:
+    """Create RabbitMQ configuration for notifications"""
+    notification_exchange = Exchange(name="notifications", type=ExchangeType.TOPIC)
+
+    summary_exchange = Exchange(name="summaries", type=ExchangeType.TOPIC)
+
+    dead_letter_exchange = Exchange(name="dead_letter", type=ExchangeType.DIRECT)
+    delay_exchange = Exchange(name="delay", type=ExchangeType.DIRECT)
+
+    queues = [
+        # Main notification queues
+        Queue(
+            name="immediate_notifications",
+            exchange=notification_exchange,
+            routing_key="notification.immediate.#",
+            arguments={
+                "x-dead-letter-exchange": dead_letter_exchange.name,
+                "x-dead-letter-routing-key": "failed.immediate",
+            },
+        ),
+        Queue(
+            name="backoff_notifications",
+            exchange=notification_exchange,
+            routing_key="notification.backoff.#",
+            arguments={
+                "x-dead-letter-exchange": dead_letter_exchange.name,
+                "x-dead-letter-routing-key": "failed.backoff",
+            },
+        ),
+        # Summary queues
+        Queue(
+            name="daily_summary_trigger",
+            exchange=summary_exchange,
+            routing_key="summary.daily",
+            arguments={"x-message-ttl": 86400000},  # 24 hours
+        ),
+        Queue(
+            name="weekly_summary_trigger",
+            exchange=summary_exchange,
+            routing_key="summary.weekly",
+            arguments={"x-message-ttl": 604800000},  # 7 days
+        ),
+        Queue(
+            name="monthly_summary_trigger",
+            exchange=summary_exchange,
+            routing_key="summary.monthly",
+            arguments={"x-message-ttl": 2592000000},  # 30 days
+        ),
+        # Failed notifications queue
+        Queue(
+            name="failed_notifications",
+            exchange=dead_letter_exchange,
+            routing_key="failed.#",
+        ),
+    ]
+
+    return RabbitMQConfig(
+        exchanges=[
+            notification_exchange,
+            # batch_exchange,
+            summary_exchange,
+            dead_letter_exchange,
+            delay_exchange,
+        ],
+        queues=queues,
+    )
+
+
+class NotificationManager(AppService):
+    """Service for handling notifications with batching support"""
+
+    def __init__(self):
+        super().__init__()
+        self.use_db = True
+        self.use_async = False  # sync service loop; async RabbitMQ calls go through run_and_wait
+        self.use_rabbitmq = create_notification_config()
+        self.running = True
+
+    @classmethod
+    def get_port(cls) -> int:
+        return settings.config.notification_service_port
+
+    def get_routing_key(self, event: NotificationEventModel) -> str:
+        """Get the appropriate routing key for an event"""
+        if event.strategy == BatchingStrategy.IMMEDIATE:
+            return f"notification.immediate.{event.type.value}"
+        elif event.strategy == BatchingStrategy.BACKOFF:
+            return f"notification.backoff.{event.type.value}"
+        return f"notification.{event.type.value}"
+
+    @expose
+    def queue_notification(self, event: NotificationEventDTO) -> NotificationResult:
+        """Queue a notification - exposed method for other services to call"""
+        try:
+            logger.info(f"Received request to queue {event=}")
+            # Workaround for not being able to serialize generics over the expose bus
+            parsed_event = NotificationEventModel[
+                get_data_type(event.type)
+            ].model_validate(event.model_dump())
+            routing_key = self.get_routing_key(parsed_event)
+            message = parsed_event.model_dump_json()
+
+            logger.info(f"Queueing notification {message=}")
+
+            exchange = "notifications"
+
+            # Publish to RabbitMQ
+            self.run_and_wait(
+                self.rabbit.publish_message(
+                    routing_key=routing_key,
+                    message=message,
+                    exchange=next(
+                        ex for ex in self.rabbit_config.exchanges if ex.name == exchange
+                    ),
+                )
+            )
+
+            return NotificationResult(
+                success=True,
+                message=f"Notification queued with routing key: {routing_key}",
+            )
+
+        except Exception as e:
+            logger.error(f"Error queueing notification: {e}")
+            return NotificationResult(success=False, message=str(e))
+
+    async def _process_immediate(self, message: str) -> bool:
+        """Process a single notification immediately"""
+        try:
+            event = NotificationEventDTO.model_validate_json(message)
+            parsed_event = NotificationEventModel[
+                get_data_type(event.type)
+            ].model_validate_json(message)
+            # Implementation of actual notification sending would go here
+            # self.email_sender.send_templated(event.type, event.user_id, parsed_event)
+            logger.info(f"Processing notification: {parsed_event}")
+            return True
+        except Exception as e:
+            logger.error(f"Error processing notification: {e}")
+            return False
+
+    def run_service(self):
+        logger.info(f"[{self.service_name}] Started notification service")
+
+        # Set up queue consumers
+        channel = self.run_and_wait(self.rabbit.get_channel())
+
+        immediate_queue = self.run_and_wait(
+            channel.get_queue("immediate_notifications")
+        )
+
+        while self.running:
+            try:
+                # Process immediate notifications
+                try:
+                    message = self.run_and_wait(immediate_queue.get())
+
+                    if message:
+                        success = self.run_and_wait(
+                            self._process_immediate(message.body.decode())
+                        )
+                        if success:
+                            self.run_and_wait(message.ack())
+                        else:
+                            self.run_and_wait(message.reject(requeue=True))
+                except QueueEmpty:
+                    logger.debug("Immediate queue empty")
+
+                time.sleep(0.1)
+
+            except QueueEmpty as e:
+                logger.debug(f"Queue empty: {e}")
+            except Exception as e:
+                logger.error(f"Error in notification service loop: {e}")
+
+    def cleanup(self):
+        """Cleanup service resources"""
+        self.running = False
+        super().cleanup()
+
+# ------- UTILITIES ------- #
+
+@thread_cached
+def get_db_client() -> "DatabaseManager":
+    from backend.executor import DatabaseManager
+
+    return get_service_client(DatabaseManager)
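
Usage note (not part of the patch): a minimal sketch of how another backend service could queue a notification through the exposed queue_notification RPC, mirroring what execute_node does in manager.py above. All class and field names come from this patch; it assumes the notification service is running and reachable via get_service_client, as in get_notification_service(), and the user_id, graph_id, agent_name, and credit values are placeholders.

    # Sketch only: names taken from this patch; concrete values are placeholders.
    from backend.data.notifications import AgentRunData, NotificationEventDTO, NotificationType
    from backend.notifications.notifications import NotificationManager
    from backend.util.service import get_service_client

    # RPC client to the running NotificationManager service (same pattern as the executor uses).
    notification_client = get_service_client(NotificationManager)

    event = NotificationEventDTO(
        user_id="user-123",  # placeholder
        type=NotificationType.AGENT_RUN,
        data=AgentRunData(
            agent_name="example-agent",  # placeholder
            credits_used=5,  # placeholder
            execution_time=0,
            graph_id="graph-456",  # placeholder
            node_count=1,
        ).model_dump(),
    )

    result = notification_client.queue_notification(event)
    print(result.success, result.message)  # NotificationResult fields as used in the patch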