feat(backend): schema updates, migration, queries for Email Notification Service (#9445)

<!-- Clearly explain the need for these changes: -->

The email service has requirements to
- Email users when some activity has happened on their account on some
scheduled basis -> We need a way to get active users and the executions
that happened while they were active
- Allow users to configure what emails they get -> Need a user
preference
- Get User email by Id so that we can email them -> Pretty
self-explanatory

We need to add a few new backend queries + db models for the
notification to start handling these details. This is the first set of
those changes based on experience building the app service

### Changes 🏗️

<!-- Concisely describe all of the changes made in this pull request:
-->
- Adds a new DB Model, `UserNotificationPreferences,` with related
migration
- Adds a new DB Model `NotificationEvent` with related migration to
track what notifications we've sent and how many and such (how much we
add here is open to change depending on what limits on data we want)
- Adds a new DB Model `UserNotificationBatch` with related migration to
handle batching of like models
- Adds queries to get users and executions by `datetime` ranges as `ISO`
strings
- Adds new queries to the `DatabaseManager` and exposes them to the
other `AppService`s
- Exposes all new queries plus an existing one `get_user_by_id`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
- [x] I extracted these changes from a working implementation of the
service, and tested they don't bring down the service by being uncalled
by running the standard agent tests we do on release

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
This commit is contained in:
Nicholas Tindle
2025-02-12 16:54:16 -06:00
committed by GitHub
parent 016ec0ff6b
commit 7e04fbd25f
8 changed files with 644 additions and 19 deletions

View File

@@ -15,6 +15,7 @@ from pydantic import BaseModel
from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.server.v2.store.exceptions import DatabaseError
from backend.util import json, mock
from backend.util.settings import Config
@@ -364,6 +365,31 @@ async def get_execution_results(graph_exec_id: str) -> list[ExecutionResult]:
return res
async def get_executions_in_timerange(
user_id: str, start_time: str, end_time: str
) -> list[ExecutionResult]:
try:
executions = await AgentGraphExecution.prisma().find_many(
where={
"AND": [
{
"startedAt": {
"gte": datetime.fromisoformat(start_time),
"lte": datetime.fromisoformat(end_time),
}
},
{"userId": user_id},
]
},
include=GRAPH_EXECUTION_INCLUDE,
)
return [ExecutionResult.from_graph(execution) for execution in executions]
except Exception as e:
raise DatabaseError(
f"Failed to get executions in timerange {start_time} to {end_time} for user {user_id}: {e}"
) from e
LIST_SPLIT = "_$_"
DICT_SPLIT = "_#_"
OBJC_SPLIT = "_@_"

View File

@@ -0,0 +1,360 @@
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Annotated, Generic, Optional, TypeVar, Union
from prisma import Json
from prisma.enums import NotificationType
from prisma.models import NotificationEvent, UserNotificationBatch
from prisma.types import UserNotificationBatchWhereInput
# from backend.notifications.models import NotificationEvent
from pydantic import BaseModel, EmailStr, Field, field_validator
from backend.server.v2.store.exceptions import DatabaseError
from .db import transaction
logger = logging.getLogger(__name__)
T_co = TypeVar("T_co", bound="BaseNotificationData", covariant=True)
class BatchingStrategy(Enum):
IMMEDIATE = "immediate" # Send right away (errors, critical notifications)
HOURLY = "hourly" # Batch for up to an hour (usage reports)
DAILY = "daily" # Daily digest (summary notifications)
BACKOFF = "backoff" # Backoff strategy (exponential backoff)
class BaseNotificationData(BaseModel):
pass
class AgentRunData(BaseNotificationData):
agent_name: str
credits_used: float
# remaining_balance: float
execution_time: float
graph_id: str
node_count: int = Field(..., description="Number of nodes executed")
class ZeroBalanceData(BaseNotificationData):
last_transaction: float
last_transaction_time: datetime
top_up_link: str
class LowBalanceData(BaseNotificationData):
current_balance: float
threshold_amount: float
top_up_link: str
recent_usage: float = Field(..., description="Usage in the last 24 hours")
class BlockExecutionFailedData(BaseNotificationData):
block_name: str
block_id: str
error_message: str
graph_id: str
node_id: str
execution_id: str
class ContinuousAgentErrorData(BaseNotificationData):
agent_name: str
error_message: str
graph_id: str
execution_id: str
start_time: datetime
error_time: datetime
attempts: int = Field(..., description="Number of retry attempts made")
class BaseSummaryData(BaseNotificationData):
total_credits_used: float
total_executions: int
most_used_agent: str
total_execution_time: float
successful_runs: int
failed_runs: int
average_execution_time: float
cost_breakdown: dict[str, float]
class DailySummaryData(BaseSummaryData):
date: datetime
class WeeklySummaryData(BaseSummaryData):
start_date: datetime
end_date: datetime
week_number: int
year: int
class MonthlySummaryData(BaseSummaryData):
month: int
year: int
NotificationData = Annotated[
Union[
AgentRunData,
ZeroBalanceData,
LowBalanceData,
BlockExecutionFailedData,
ContinuousAgentErrorData,
MonthlySummaryData,
],
Field(discriminator="type"),
]
class NotificationEventDTO(BaseModel):
user_id: str
type: NotificationType
data: dict
created_at: datetime = Field(default_factory=datetime.now)
class NotificationEventModel(BaseModel, Generic[T_co]):
user_id: str
type: NotificationType
data: T_co
created_at: datetime = Field(default_factory=datetime.now)
@property
def strategy(self) -> BatchingStrategy:
return NotificationTypeOverride(self.type).strategy
@field_validator("type", mode="before")
def uppercase_type(cls, v):
if isinstance(v, str):
return v.upper()
return v
@property
def template(self) -> str:
return NotificationTypeOverride(self.type).template
def get_data_type(
notification_type: NotificationType,
) -> type[BaseNotificationData]:
return {
NotificationType.AGENT_RUN: AgentRunData,
NotificationType.ZERO_BALANCE: ZeroBalanceData,
NotificationType.LOW_BALANCE: LowBalanceData,
NotificationType.BLOCK_EXECUTION_FAILED: BlockExecutionFailedData,
NotificationType.CONTINUOUS_AGENT_ERROR: ContinuousAgentErrorData,
NotificationType.DAILY_SUMMARY: DailySummaryData,
NotificationType.WEEKLY_SUMMARY: WeeklySummaryData,
NotificationType.MONTHLY_SUMMARY: MonthlySummaryData,
}[notification_type]
class NotificationBatch(BaseModel):
user_id: str
events: list[NotificationEvent]
strategy: BatchingStrategy
last_update: datetime = datetime.now()
class NotificationResult(BaseModel):
success: bool
message: Optional[str] = None
class NotificationTypeOverride:
def __init__(self, notification_type: NotificationType):
self.notification_type = notification_type
@property
def strategy(self) -> BatchingStrategy:
BATCHING_RULES = {
# These are batched by the notification service
NotificationType.AGENT_RUN: BatchingStrategy.IMMEDIATE,
# These are batched by the notification service, but with a backoff strategy
NotificationType.ZERO_BALANCE: BatchingStrategy.BACKOFF,
NotificationType.LOW_BALANCE: BatchingStrategy.BACKOFF,
NotificationType.BLOCK_EXECUTION_FAILED: BatchingStrategy.BACKOFF,
NotificationType.CONTINUOUS_AGENT_ERROR: BatchingStrategy.BACKOFF,
# These aren't batched by the notification service, so we send them right away
NotificationType.DAILY_SUMMARY: BatchingStrategy.IMMEDIATE,
NotificationType.WEEKLY_SUMMARY: BatchingStrategy.IMMEDIATE,
NotificationType.MONTHLY_SUMMARY: BatchingStrategy.IMMEDIATE,
}
return BATCHING_RULES.get(self.notification_type, BatchingStrategy.HOURLY)
@property
def template(self) -> str:
"""Returns template name for this notification type"""
return {
NotificationType.AGENT_RUN: "agent_run.html",
NotificationType.ZERO_BALANCE: "zero_balance.html",
NotificationType.LOW_BALANCE: "low_balance.html",
NotificationType.BLOCK_EXECUTION_FAILED: "block_failed.html",
NotificationType.CONTINUOUS_AGENT_ERROR: "agent_error.html",
NotificationType.DAILY_SUMMARY: "daily_summary.html",
NotificationType.WEEKLY_SUMMARY: "weekly_summary.html",
NotificationType.MONTHLY_SUMMARY: "monthly_summary.html",
}[self.notification_type]
class NotificationPreference(BaseModel):
user_id: str
email: EmailStr
preferences: dict[NotificationType, bool] = Field(
default_factory=dict, description="Which notifications the user wants"
)
daily_limit: int = 10 # Max emails per day
emails_sent_today: int = 0
last_reset_date: datetime = Field(default_factory=datetime.now)
def get_batch_delay(notification_type: NotificationType) -> timedelta:
return {
NotificationType.AGENT_RUN: timedelta(seconds=1),
NotificationType.ZERO_BALANCE: timedelta(minutes=60),
NotificationType.LOW_BALANCE: timedelta(minutes=60),
NotificationType.BLOCK_EXECUTION_FAILED: timedelta(minutes=60),
NotificationType.CONTINUOUS_AGENT_ERROR: timedelta(minutes=60),
}[notification_type]
async def create_or_add_to_user_notification_batch(
user_id: str,
notification_type: NotificationType,
data: str, # type: 'NotificationEventModel'
) -> dict:
try:
logger.info(
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {data}"
)
notification_data = NotificationEventModel[
get_data_type(notification_type)
].model_validate_json(data)
# Serialize the data
json_data: Json = Json(notification_data.data.model_dump_json())
# First try to find existing batch
existing_batch = await UserNotificationBatch.prisma().find_unique(
where={
"userId_type": {
"userId": user_id,
"type": notification_type,
}
},
include={"notifications": True},
)
if not existing_batch:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
data={
"type": notification_type,
"data": json_data,
}
)
# Create new batch
resp = await tx.usernotificationbatch.create(
data={
"userId": user_id,
"type": notification_type,
"notifications": {"connect": [{"id": notification_event.id}]},
},
include={"notifications": True},
)
return resp.model_dump()
else:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
data={
"type": notification_type,
"data": json_data,
"UserNotificationBatch": {"connect": {"id": existing_batch.id}},
}
)
# Add to existing batch
resp = await tx.usernotificationbatch.update(
where={"id": existing_batch.id},
data={
"notifications": {"connect": [{"id": notification_event.id}]}
},
include={"notifications": True},
)
if not resp:
raise DatabaseError(
f"Failed to add notification event {notification_event.id} to existing batch {existing_batch.id}"
)
return resp.model_dump()
except Exception as e:
raise DatabaseError(
f"Failed to create or add to notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_user_notification_last_message_in_batch(
user_id: str,
notification_type: NotificationType,
) -> NotificationEvent | None:
try:
batch = await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
order={"createdAt": "desc"},
)
if not batch:
return None
if not batch.notifications:
return None
return batch.notifications[-1]
except Exception as e:
raise DatabaseError(
f"Failed to get user notification last message in batch for user {user_id} and type {notification_type}: {e}"
) from e
async def empty_user_notification_batch(
user_id: str, notification_type: NotificationType
) -> None:
try:
async with transaction() as tx:
await tx.notificationevent.delete_many(
where={
"UserNotificationBatch": {
"is": {"userId": user_id, "type": notification_type}
}
}
)
await tx.usernotificationbatch.delete_many(
where=UserNotificationBatchWhereInput(
userId=user_id,
type=notification_type,
)
)
except Exception as e:
raise DatabaseError(
f"Failed to empty user notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_user_notification_batch(
user_id: str,
notification_type: NotificationType,
) -> UserNotificationBatch | None:
try:
return await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
include={"notifications": True},
)
except Exception as e:
raise DatabaseError(
f"Failed to get user notification batch for user {user_id} and type {notification_type}: {e}"
) from e

View File

@@ -1,37 +1,45 @@
import logging
from datetime import datetime, timedelta
from typing import Optional, cast
from autogpt_libs.auth.models import DEFAULT_USER_ID
from fastapi import HTTPException
from prisma import Json
from prisma.enums import NotificationType
from prisma.models import User
from backend.data.db import prisma
from backend.data.model import UserIntegrations, UserMetadata, UserMetadataRaw
from backend.data.notifications import NotificationPreference
from backend.server.v2.store.exceptions import DatabaseError
from backend.util.encryption import JSONCryptor
logger = logging.getLogger(__name__)
async def get_or_create_user(user_data: dict) -> User:
user_id = user_data.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="User ID not found in token")
try:
user_id = user_data.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="User ID not found in token")
user_email = user_data.get("email")
if not user_email:
raise HTTPException(status_code=401, detail="Email not found in token")
user_email = user_data.get("email")
if not user_email:
raise HTTPException(status_code=401, detail="Email not found in token")
user = await prisma.user.find_unique(where={"id": user_id})
if not user:
user = await prisma.user.create(
data={
"id": user_id,
"email": user_email,
"name": user_data.get("user_metadata", {}).get("name"),
}
)
return User.model_validate(user)
user = await prisma.user.find_unique(where={"id": user_id})
if not user:
user = await prisma.user.create(
data={
"id": user_id,
"email": user_email,
"name": user_data.get("user_metadata", {}).get("name"),
}
)
return User.model_validate(user)
except Exception as e:
raise DatabaseError(f"Failed to get or create user {user_data}: {e}") from e
async def get_user_by_id(user_id: str) -> User:
@@ -130,3 +138,70 @@ async def migrate_and_encrypt_user_integrations():
where={"id": user.id},
data={"metadata": Json(raw_metadata)},
)
async def get_active_user_ids_in_timerange(start_time: str, end_time: str) -> list[str]:
try:
users = await User.prisma().find_many(
where={
"AgentGraphExecutions": {
"some": {
"createdAt": {
"gte": datetime.fromisoformat(start_time),
"lte": datetime.fromisoformat(end_time),
}
}
}
},
)
return [user.id for user in users]
except Exception as e:
raise DatabaseError(
f"Failed to get active user ids in timerange {start_time} to {end_time}: {e}"
) from e
async def get_active_users_ids() -> list[str]:
user_ids = await get_active_user_ids_in_timerange(
(datetime.now() - timedelta(days=30)).isoformat(),
datetime.now().isoformat(),
)
return user_ids
async def get_user_notification_preference(user_id: str) -> NotificationPreference:
try:
user = await User.prisma().find_unique_or_raise(
where={"id": user_id},
)
# enable notifications by default if user has no notification preference (shouldn't ever happen though)
preferences: dict[NotificationType, bool] = {
NotificationType.AGENT_RUN: user.notifyOnAgentRun or True,
NotificationType.ZERO_BALANCE: user.notifyOnZeroBalance or True,
NotificationType.LOW_BALANCE: user.notifyOnLowBalance or True,
NotificationType.BLOCK_EXECUTION_FAILED: user.notifyOnBlockExecutionFailed
or True,
NotificationType.CONTINUOUS_AGENT_ERROR: user.notifyOnContinuousAgentError
or True,
NotificationType.DAILY_SUMMARY: user.notifyOnDailySummary or True,
NotificationType.WEEKLY_SUMMARY: user.notifyOnWeeklySummary or True,
NotificationType.MONTHLY_SUMMARY: user.notifyOnMonthlySummary or True,
}
daily_limit = user.maxEmailsPerDay or 3
notification_preference = NotificationPreference(
user_id=user.id,
email=user.email,
preferences=preferences,
daily_limit=daily_limit,
# TODO with other changes later, for now we just will email them
emails_sent_today=0,
last_reset_date=datetime.now(),
)
return NotificationPreference.model_validate(notification_preference)
except Exception as e:
raise DatabaseError(
f"Failed to upsert user notification preference for user {user_id}: {e}"
) from e

View File

@@ -8,6 +8,7 @@ from backend.data.execution import (
RedisExecutionEventBus,
create_graph_execution,
get_execution_results,
get_executions_in_timerange,
get_incomplete_executions,
get_latest_execution,
update_execution_status,
@@ -17,9 +18,19 @@ from backend.data.execution import (
upsert_execution_output,
)
from backend.data.graph import get_graph, get_node
from backend.data.notifications import (
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_user_notification_batch,
get_user_notification_last_message_in_batch,
)
from backend.data.user import (
get_active_user_ids_in_timerange,
get_active_users_ids,
get_user_by_id,
get_user_integrations,
get_user_metadata,
get_user_notification_preference,
update_user_integrations,
update_user_metadata,
)
@@ -72,6 +83,7 @@ class DatabaseManager(AppService):
update_node_execution_stats = exposed_run_and_wait(update_node_execution_stats)
upsert_execution_input = exposed_run_and_wait(upsert_execution_input)
upsert_execution_output = exposed_run_and_wait(upsert_execution_output)
get_executions_in_timerange = exposed_run_and_wait(get_executions_in_timerange)
# Graphs
get_node = exposed_run_and_wait(get_node)
@@ -84,8 +96,26 @@ class DatabaseManager(AppService):
exposed_run_and_wait(user_credit_model.spend_credits),
)
# User + User Metadata + User Integrations
# User + User Metadata + User Integrations + User Notification Preferences
get_user_metadata = exposed_run_and_wait(get_user_metadata)
update_user_metadata = exposed_run_and_wait(update_user_metadata)
get_user_integrations = exposed_run_and_wait(get_user_integrations)
update_user_integrations = exposed_run_and_wait(update_user_integrations)
get_active_user_ids_in_timerange = exposed_run_and_wait(
get_active_user_ids_in_timerange
)
get_user_by_id = exposed_run_and_wait(get_user_by_id)
get_user_notification_preference = exposed_run_and_wait(
get_user_notification_preference
)
get_active_users_ids = exposed_run_and_wait(get_active_users_ids)
# Notifications
create_or_add_to_user_notification_batch = exposed_run_and_wait(
create_or_add_to_user_notification_batch
)
get_user_notification_last_message_in_batch = exposed_run_and_wait(
get_user_notification_last_message_in_batch
)
empty_user_notification_batch = exposed_run_and_wait(empty_user_notification_batch)
get_user_notification_batch = exposed_run_and_wait(get_user_notification_batch)

View File

@@ -0,0 +1,45 @@
-- CreateEnum
CREATE TYPE "NotificationType" AS ENUM ('AGENT_RUN', 'ZERO_BALANCE', 'LOW_BALANCE', 'BLOCK_EXECUTION_FAILED', 'CONTINUOUS_AGENT_ERROR', 'DAILY_SUMMARY', 'WEEKLY_SUMMARY', 'MONTHLY_SUMMARY');
-- AlterTable
ALTER TABLE "User" ADD COLUMN "maxEmailsPerDay" INTEGER NOT NULL DEFAULT 3,
ADD COLUMN "notifyOnAgentRun" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnBlockExecutionFailed" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnContinuousAgentError" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnDailySummary" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnLowBalance" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnMonthlySummary" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnWeeklySummary" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnZeroBalance" BOOLEAN NOT NULL DEFAULT true;
-- CreateTable
CREATE TABLE "NotificationEvent" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userNotificationBatchId" TEXT,
"type" "NotificationType" NOT NULL,
"data" JSONB NOT NULL,
CONSTRAINT "NotificationEvent_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "UserNotificationBatch" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userId" TEXT NOT NULL,
"type" "NotificationType" NOT NULL,
CONSTRAINT "UserNotificationBatch_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "UserNotificationBatch_userId_type_key" ON "UserNotificationBatch"("userId", "type");
-- AddForeignKey
ALTER TABLE "NotificationEvent" ADD CONSTRAINT "NotificationEvent_userNotificationBatchId_fkey" FOREIGN KEY ("userNotificationBatchId") REFERENCES "UserNotificationBatch"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "UserNotificationBatch" ADD CONSTRAINT "UserNotificationBatch_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -740,6 +740,27 @@ files = [
{file = "distro-1.9.0.tar.gz", hash = "sha256:2fa77c6fd8940f116ee1d6b94a2f90b13b5ea8d019b98bc8bafdcabcdd9bdbed"},
]
[[package]]
name = "dnspython"
version = "2.7.0"
description = "DNS toolkit"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "dnspython-2.7.0-py3-none-any.whl", hash = "sha256:b4c34b7d10b51bcc3a5071e7b8dee77939f1e878477eeecc965e9835f63c6c86"},
{file = "dnspython-2.7.0.tar.gz", hash = "sha256:ce9c432eda0dc91cf618a5cedf1a4e142651196bbcd2c80e89ed5a907e5cfaf1"},
]
[package.extras]
dev = ["black (>=23.1.0)", "coverage (>=7.0)", "flake8 (>=7)", "hypercorn (>=0.16.0)", "mypy (>=1.8)", "pylint (>=3)", "pytest (>=7.4)", "pytest-cov (>=4.1.0)", "quart-trio (>=0.11.0)", "sphinx (>=7.2.0)", "sphinx-rtd-theme (>=2.0.0)", "twine (>=4.0.0)", "wheel (>=0.42.0)"]
dnssec = ["cryptography (>=43)"]
doh = ["h2 (>=4.1.0)", "httpcore (>=1.0.0)", "httpx (>=0.26.0)"]
doq = ["aioquic (>=1.0.0)"]
idna = ["idna (>=3.7)"]
trio = ["trio (>=0.23)"]
wmi = ["wmi (>=1.5.1)"]
[[package]]
name = "e2b"
version = "1.0.5"
@@ -778,6 +799,22 @@ attrs = ">=21.3.0"
e2b = ">=1.0.4,<2.0.0"
httpx = ">=0.20.0,<1.0.0"
[[package]]
name = "email-validator"
version = "2.2.0"
description = "A robust email address syntax and deliverability validation library."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "email_validator-2.2.0-py3-none-any.whl", hash = "sha256:561977c2d73ce3611850a06fa56b414621e0c8faa9d66f2611407d87465da631"},
{file = "email_validator-2.2.0.tar.gz", hash = "sha256:cb690f344c617a714f22e66ae771445a1ceb46821152df8e165c5f9a364582b7"},
]
[package.dependencies]
dnspython = ">=2.0.0"
idna = ">=2.0.0"
[[package]]
name = "exceptiongroup"
version = "1.2.2"
@@ -3226,6 +3263,7 @@ files = [
[package.dependencies]
annotated-types = ">=0.6.0"
email-validator = {version = ">=2.0.0", optional = true, markers = "extra == \"email\""}
pydantic-core = "2.27.2"
typing-extensions = ">=4.12.2"
@@ -5084,4 +5122,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "a3af2c13c3fce626006c6469263e44cc7f1b8c26e3d0a6beabd1b33caac35128"
content-hash = "4052d96f95ad3dbf8bef4d651168f6df1ef21c506f152ddca119ad8f23caf159"

View File

@@ -40,7 +40,7 @@ praw = "~7.8.1"
prisma = "^0.15.0"
psutil = "^6.1.0"
psycopg2-binary = "^2.9.10"
pydantic = "^2.9.2"
pydantic = {extras = ["email"], version = "^2.10.6"}
pydantic-settings = "^2.3.4"
pyro5 = "^5.15"
pytest = "^8.2.1"

View File

@@ -23,7 +23,18 @@ model User {
stripeCustomerId String?
topUpConfig Json?
maxEmailsPerDay Int @default(3)
notifyOnAgentRun Boolean @default(true)
notifyOnZeroBalance Boolean @default(true)
notifyOnLowBalance Boolean @default(true)
notifyOnBlockExecutionFailed Boolean @default(true)
notifyOnContinuousAgentError Boolean @default(true)
notifyOnDailySummary Boolean @default(true)
notifyOnWeeklySummary Boolean @default(true)
notifyOnMonthlySummary Boolean @default(true)
// Relations
AgentGraphs AgentGraph[]
AgentGraphExecutions AgentGraphExecution[]
AnalyticsDetails AnalyticsDetails[]
@@ -39,6 +50,7 @@ model User {
StoreListingSubmission StoreListingSubmission[]
APIKeys APIKey[]
IntegrationWebhooks IntegrationWebhook[]
UserNotificationBatch UserNotificationBatch[]
@@index([id])
@@index([email])
@@ -110,6 +122,45 @@ model AgentPreset {
@@index([userId])
}
enum NotificationType {
AGENT_RUN
ZERO_BALANCE
LOW_BALANCE
BLOCK_EXECUTION_FAILED
CONTINUOUS_AGENT_ERROR
DAILY_SUMMARY
WEEKLY_SUMMARY
MONTHLY_SUMMARY
}
model NotificationEvent {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
UserNotificationBatch UserNotificationBatch? @relation(fields: [userNotificationBatchId], references: [id])
userNotificationBatchId String?
type NotificationType
data Json
}
model UserNotificationBatch {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
type NotificationType
notifications NotificationEvent[]
// Each user can only have one batch of a notification type at a time
@@unique([userId, type])
}
// For the library page
// It is a user controlled list of agents, that they will see in there library
model UserAgent {