ref(backend): move notification models around

This commit is contained in:
Nicholas Tindle
2025-02-10 05:41:35 -06:00
parent 8350552709
commit 2240a7807d
4 changed files with 17 additions and 134 deletions

View File

@@ -12,7 +12,11 @@ from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar, cast
from redis.lock import Lock as RedisLock
from backend.notifications.models import AgentRunData, create_notification
from backend.data.notifications import (
AgentRunData,
NotificationEventModel,
NotificationType,
)
if TYPE_CHECKING:
from backend.executor import DatabaseManager
@@ -223,7 +227,7 @@ def execute_node(
# Update execution status and spend credits
update_execution(ExecutionStatus.COMPLETED)
notification_service.queue_notification(
create_notification(
NotificationEventModel(
user_id=user_id,
type=NotificationType.AGENT_RUN,
data=AgentRunData(

View File

@@ -1,120 +0,0 @@
from datetime import datetime
from typing import Annotated, Generic, Optional, TypeVar, Union
from pydantic import BaseModel, Field
from backend.data.model import BatchingStrategy, NotificationType
T_co = TypeVar("T_co", bound="BaseNotificationData", covariant=True)
class BaseNotificationData(BaseModel):
pass
class AgentRunData(BaseNotificationData):
agent_name: str
credits_used: float
# remaining_balance: float
execution_time: float
graph_id: str
node_count: int = Field(..., description="Number of nodes executed")
class ZeroBalanceData(BaseNotificationData):
last_transaction: float
last_transaction_time: datetime
top_up_link: str
class LowBalanceData(BaseNotificationData):
current_balance: float
threshold_amount: float
top_up_link: str
recent_usage: float = Field(..., description="Usage in the last 24 hours")
class BlockExecutionFailedData(BaseNotificationData):
block_name: str
block_id: str
error_message: str
graph_id: str
node_id: str
execution_id: str
class ContinuousAgentErrorData(BaseNotificationData):
agent_name: str
error_message: str
graph_id: str
execution_id: str
start_time: datetime
error_time: datetime
attempts: int = Field(..., description="Number of retry attempts made")
class BaseSummaryData(BaseNotificationData):
total_credits_used: float
total_executions: int
most_used_agent: str
total_execution_time: float
successful_runs: int
failed_runs: int
average_execution_time: float
cost_breakdown: dict[str, float]
class DailySummaryData(BaseSummaryData):
date: datetime
class WeeklySummaryData(BaseSummaryData):
start_date: datetime
end_date: datetime
week_number: int
year: int
class MonthlySummaryData(BaseSummaryData):
month: int
year: int
NotificationData = Annotated[
Union[
AgentRunData,
ZeroBalanceData,
LowBalanceData,
BlockExecutionFailedData,
ContinuousAgentErrorData,
MonthlySummaryData,
],
Field(discriminator="type"),
]
class NotificationEvent(BaseModel, Generic[T_co]):
user_id: str
type: NotificationType
data: T_co
created_at: datetime = Field(default_factory=datetime.now)
@property
def strategy(self) -> BatchingStrategy:
return self.type.strategy
@property
def template(self) -> str:
return self.type.template
class NotificationBatch(BaseModel):
user_id: str
events: list[NotificationEvent]
strategy: BatchingStrategy
last_update: datetime = datetime.now()
class NotificationResult(BaseModel):
success: bool
message: Optional[str] = None

View File

@@ -6,13 +6,12 @@ from typing import TYPE_CHECKING, Optional
from autogpt_libs.utils.cache import thread_cached
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.executor.database import DatabaseManager
from backend.notifications.models import (
from backend.data.notifications import (
BatchingStrategy,
NotificationEvent,
NotificationResult,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.executor.database import DatabaseManager
from backend.notifications.summary import SummaryManager
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Settings
@@ -110,7 +109,7 @@ class NotificationManager(AppService):
def get_port(cls) -> int:
return settings.config.notification_service_port
def get_routing_key(self, event: NotificationEvent) -> str:
def get_routing_key(self, event: NotificationEventModel) -> str:
"""Get the appropriate routing key for an event"""
if event.strategy == BatchingStrategy.IMMEDIATE:
return f"notification.immediate.{event.type.value}"
@@ -122,7 +121,7 @@ class NotificationManager(AppService):
return f"batch.daily.{event.type.value}"
@expose
def queue_notification(self, event: NotificationEvent) -> NotificationResult:
def queue_notification(self, event: NotificationEventModel) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
try:
routing_key = self.get_routing_key(event)
@@ -255,7 +254,7 @@ class NotificationManager(AppService):
async def _process_notification(self, message: str) -> bool:
"""Process a single notification"""
try:
event = NotificationEvent.parse_raw(message)
event = NotificationEventModel.parse_raw(message)
# Implementation of actual notification sending would go here
logger.info(f"Processing notification: {event}")
return True

View File

@@ -9,10 +9,10 @@ from autogpt_libs.utils.cache import thread_cached
if TYPE_CHECKING:
from backend.executor.database import DatabaseManager
from backend.notifications.models import (
from backend.data.notifications import (
DailySummaryData,
MonthlySummaryData,
NotificationEvent,
NotificationEventModel,
NotificationType,
WeeklySummaryData,
)
@@ -120,7 +120,7 @@ class SummaryManager:
if summary_type == "daily":
data = DailySummaryData(date=start_time, **stats)
type_ = NotificationType.DAILY_SUMMARY
notification = NotificationEvent(
notification = NotificationEventModel(
user_id=user_id,
type=type_,
data=data,
@@ -134,7 +134,7 @@ class SummaryManager:
**stats,
)
type_ = NotificationType.WEEKLY_SUMMARY
notification = NotificationEvent(
notification = NotificationEventModel(
user_id=user_id,
type=type_,
data=data,
@@ -144,7 +144,7 @@ class SummaryManager:
month=start_time.month, year=start_time.year, **stats
)
type_ = NotificationType.MONTHLY_SUMMARY
notification = NotificationEvent(
notification = NotificationEventModel(
user_id=user_id,
type=type_,
data=data,