Update models.py

This commit is contained in:
Nicholas Tindle
2025-02-07 20:49:07 -06:00
parent 19e8a6cff1
commit 7eea9c358e

View File

@@ -1,57 +1,9 @@
from datetime import datetime
from enum import Enum
from typing import Annotated, Generic, Literal, Optional, TypeVar, Union, overload
from pydantic import BaseModel, Field
class BatchingStrategy(str, Enum):
IMMEDIATE = "immediate" # Send right away (errors, critical notifications)
HOURLY = "hourly" # Batch for up to an hour (usage reports)
DAILY = "daily" # Daily digest (summary notifications)
BACKOFF = "backoff" # Backoff strategy (exponential backoff)
class NotificationType(str, Enum):
AGENT_RUN = "agent_run"
ZERO_BALANCE = "zero_balance"
LOW_BALANCE = "low_balance"
BLOCK_EXECUTION_FAILED = "block_execution_failed"
CONTINUOUS_AGENT_ERROR = "continuous_agent_error"
DAILY_SUMMARY = "daily_summary"
WEEKLY_SUMMARY = "weekly_summary"
MONTHLY_SUMMARY = "monthly_summary"
@property
def strategy(self) -> BatchingStrategy:
BATCHING_RULES = {
# These are batched by the notification service
NotificationType.AGENT_RUN: BatchingStrategy.IMMEDIATE,
# These are batched by the notification service, but with a backoff strategy
NotificationType.ZERO_BALANCE: BatchingStrategy.BACKOFF,
NotificationType.LOW_BALANCE: BatchingStrategy.BACKOFF,
NotificationType.BLOCK_EXECUTION_FAILED: BatchingStrategy.BACKOFF,
NotificationType.CONTINUOUS_AGENT_ERROR: BatchingStrategy.BACKOFF,
# These aren't batched by the notification service, so we send them right away
NotificationType.DAILY_SUMMARY: BatchingStrategy.IMMEDIATE,
NotificationType.WEEKLY_SUMMARY: BatchingStrategy.IMMEDIATE,
NotificationType.MONTHLY_SUMMARY: BatchingStrategy.IMMEDIATE,
}
return BATCHING_RULES.get(self, BatchingStrategy.HOURLY)
@property
def template(self) -> str:
"""Returns template name for this notification type"""
return {
NotificationType.AGENT_RUN: "agent_run.html",
NotificationType.ZERO_BALANCE: "zero_balance.html",
NotificationType.LOW_BALANCE: "low_balance.html",
NotificationType.BLOCK_EXECUTION_FAILED: "block_failed.html",
NotificationType.CONTINUOUS_AGENT_ERROR: "agent_error.html",
NotificationType.DAILY_SUMMARY: "daily_summary.html",
NotificationType.WEEKLY_SUMMARY: "weekly_summary.html",
NotificationType.MONTHLY_SUMMARY: "monthly_summary.html",
}[self]
from backend.data.model import BatchingStrategy, NotificationType
T_co = TypeVar("T_co", bound="BaseNotificationData", covariant=True)