Compare commits

...

42 Commits

Author SHA1 Message Date
Reinier van der Leer
1dfc840907 add pyro (de)serializer for datetime 2025-02-12 17:11:43 +01:00
Nicholas Tindle
aa21bf7b2d fix(backend): format 2025-02-11 21:36:22 -06:00
Nicholas Tindle
ddd2b9de15 fix(backend): the tests did dumb stuff like reusing ids 2025-02-11 21:09:51 -06:00
Nicholas Tindle
85f8f4136f fix(backend): relock 2025-02-11 20:33:49 -06:00
Nicholas Tindle
1cd90ef113 Merge branch 'dev' into ntindle/secrt-1088-add-db-models-for-the-notification-service 2025-02-11 20:32:47 -06:00
Nicholas Tindle
c69df5cb79 ref(backend): update from pr changes 2025-02-11 20:30:33 -06:00
Nicholas Tindle
957ebe697f refactor(backend): raise errors from db queries 2025-02-11 15:08:33 -06:00
Nicholas Tindle
11cda46724 ref(backend): pr changes 2025-02-11 15:01:15 -06:00
Nicholas Tindle
bf49a0a08a Update autogpt_platform/backend/backend/data/notifications.py 2025-02-11 14:26:33 -06:00
Nicholas Tindle
e2441c5220 Apply suggestions from code review
Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2025-02-11 14:23:00 -06:00
Nicholas Tindle
63e3582114 Merge branch 'ntindle/secrt-1088-add-db-models-for-the-notification-service' of https://github.com/Significant-Gravitas/AutoGPT into ntindle/secrt-1088-add-db-models-for-the-notification-service 2025-02-11 14:20:11 -06:00
Nicholas Tindle
0e99bdc742 ref(backend): smash migrations and apply changes to api 2025-02-11 14:20:04 -06:00
Nicholas Tindle
91df11b44c ref(backend): use lowercase types
Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2025-02-11 14:19:33 -06:00
Nicholas Tindle
c71bb17658 fix(backend): relock 2025-02-10 15:31:11 -06:00
Nicholas Tindle
4481827480 Merge branch 'dev' into ntindle/secrt-1088-add-db-models-for-the-notification-service 2025-02-10 15:28:12 -06:00
Nicholas Tindle
bada17d6d3 feat(backend): pull up changes from downstream 2025-02-10 15:25:13 -06:00
Nicholas Tindle
26ce2811e3 Merge branch 'dev' into ntindle/secrt-1088-add-db-models-for-the-notification-service 2025-02-10 08:21:32 -06:00
Nicholas Tindle
6ed6fa1033 fix(backend): broken db query oops 2025-02-10 06:35:34 -06:00
Nicholas Tindle
64e6294abf fix(backend): linting 2025-02-10 06:21:14 -06:00
Nicholas Tindle
eeec3c3424 fix(backend): add the package deps 2025-02-10 06:13:21 -06:00
Nicholas Tindle
1b28a04072 feat(backend): add more migrations 2025-02-10 06:03:49 -06:00
Nicholas Tindle
7dbb1a2d30 feat(backend): bring downstream changes up 2025-02-10 05:59:41 -06:00
Nicholas Tindle
ea07f03638 Merge branch 'dev' into ntindle/secrt-1088-add-db-models-for-the-notification-service 2025-02-10 02:51:47 -06:00
Nicholas Tindle
74b1ea05ec feat(backend): update models a bit more 2025-02-07 20:47:40 -06:00
Nicholas Tindle
815935f2f5 feat(backend): expose added queries 2025-02-07 16:13:22 -06:00
Nicholas Tindle
7ff6f48873 feat(backend): executions in time range query 2025-02-07 16:13:12 -06:00
Nicholas Tindle
c23f180be7 feat(backend): notification queries + active user counter 2025-02-07 16:13:00 -06:00
Nicholas Tindle
a9a268e9bc feat(backend): updated models for notification types + preferences 2025-02-07 16:12:11 -06:00
Nicholas Tindle
2cb81575b0 feat(db): schema updates + migration 2025-02-07 16:11:43 -06:00
Nicholas Tindle
66fee3a2e9 fix(lint): fix linting and minor issues 2025-02-07 14:57:47 -06:00
Nicholas Tindle
e80d4ab9b1 Merge branch 'dev' into ntindle/secrt-1087-attach-rabbit-mq-to-the-services-processes-similar-to-how 2025-02-07 11:24:55 -06:00
Nicholas Tindle
ee1df0fefc feat(backend): add async checker + classifiy rabbitmq 2025-02-07 11:19:29 -06:00
Nicholas Tindle
d864e9cbd6 fix(backend): rebuild rabbitmq infra 2025-02-07 11:19:05 -06:00
Nicholas Tindle
fab0aba21f Merge branch 'ntindle/secrt-1086-deploy-rabbitmq-as-part-of-our-docker-compose' into ntindle/secrt-1087-attach-rabbit-mq-to-the-services-processes-similar-to-how 2025-02-07 09:40:34 -06:00
Nicholas Tindle
c5eab7c417 Merge branch 'dev' into ntindle/secrt-1086-deploy-rabbitmq-as-part-of-our-docker-compose 2025-02-07 09:34:15 -06:00
Nicholas Tindle
81bad75b2c fix: add healthcheck 2025-02-07 09:12:52 -06:00
Nicholas Tindle
325cd0aa46 feat(backend): base level attachment to rabbitmq for services 2025-02-06 16:13:57 -06:00
Nicholas Tindle
d1d09df24b Merge branch 'ntindle/secrt-1086-deploy-rabbitmq-as-part-of-our-docker-compose' into ntindle/secrt-1087-attach-rabbit-mq-to-the-services-processes-similar-to-how 2025-02-06 16:08:24 -06:00
Nicholas Tindle
dfe2ec25d4 fix(infra): missed dependencies + docs 2025-02-06 16:07:20 -06:00
Nicholas Tindle
40b4d2fa43 feat(backend): add dependencies 2025-02-06 13:52:19 -06:00
Nicholas Tindle
e2c9ce0beb fix(infra): missed some 2025-02-06 13:08:08 -06:00
Nicholas Tindle
cae84e5ec8 feat(infra): add rabbitmq to dockercompose 2025-02-06 13:07:36 -06:00
9 changed files with 754 additions and 34 deletions

View File

@@ -15,6 +15,7 @@ from pydantic import BaseModel
from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.server.v2.store.exceptions import DatabaseError
from backend.util import json, mock
from backend.util.settings import Config
@@ -364,6 +365,31 @@ async def get_execution_results(graph_exec_id: str) -> list[ExecutionResult]:
return res
async def get_executions_in_timerange(
    user_id: str, start_time: datetime, end_time: datetime
) -> list[ExecutionResult]:
    """Fetch all of a user's graph executions whose `startedAt` falls within
    [start_time, end_time] (both ends inclusive).

    Raises:
        DatabaseError: wrapping any underlying query failure.
    """
    try:
        rows = await AgentGraphExecution.prisma().find_many(
            where={
                "AND": [
                    {"startedAt": {"gte": start_time, "lte": end_time}},
                    {"userId": user_id},
                ]
            },
            include=GRAPH_EXECUTION_INCLUDE,
        )
        return [ExecutionResult.from_graph(row) for row in rows]
    except Exception as e:
        raise DatabaseError(
            f"Failed to get executions in timerange {start_time} to {end_time} for user {user_id}: {e}"
        ) from e
LIST_SPLIT = "_$_"
DICT_SPLIT = "_#_"
OBJC_SPLIT = "_@_"

View File

@@ -0,0 +1,360 @@
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Annotated, Generic, Optional, TypeVar, Union
from prisma import Json
from prisma.enums import NotificationType
from prisma.models import NotificationEvent, UserNotificationBatch
from prisma.types import UserNotificationBatchWhereInput
# from backend.notifications.models import NotificationEvent
from pydantic import BaseModel, EmailStr, Field, field_validator
from backend.server.v2.store.exceptions import DatabaseError
from .db import transaction
logger = logging.getLogger(__name__)
T_co = TypeVar("T_co", bound="BaseNotificationData", covariant=True)
class BatchingStrategy(Enum):
    """How a notification is grouped before delivery."""

    # Send right away (errors, critical notifications)
    IMMEDIATE = "immediate"
    # Batch for up to an hour (usage reports)
    HOURLY = "hourly"
    # Daily digest (summary notifications)
    DAILY = "daily"
    # Exponential backoff between sends
    BACKOFF = "backoff"
class BaseNotificationData(BaseModel):
    """Marker base class for all typed notification payloads."""

    pass


class AgentRunData(BaseNotificationData):
    """Payload for an agent-run notification."""

    agent_name: str
    credits_used: float
    # remaining_balance: float
    execution_time: float
    graph_id: str
    node_count: int = Field(..., description="Number of nodes executed")


class ZeroBalanceData(BaseNotificationData):
    """Payload sent when a user's credit balance reaches zero."""

    last_transaction: float
    last_transaction_time: datetime
    top_up_link: str


class LowBalanceData(BaseNotificationData):
    """Payload sent when a user's balance drops below a threshold."""

    current_balance: float
    threshold_amount: float
    top_up_link: str
    recent_usage: float = Field(..., description="Usage in the last 24 hours")


class BlockExecutionFailedData(BaseNotificationData):
    """Payload describing a single failed block execution."""

    block_name: str
    block_id: str
    error_message: str
    graph_id: str
    node_id: str
    execution_id: str


class ContinuousAgentErrorData(BaseNotificationData):
    """Payload for an agent that keeps failing across retries."""

    agent_name: str
    error_message: str
    graph_id: str
    execution_id: str
    start_time: datetime
    error_time: datetime
    attempts: int = Field(..., description="Number of retry attempts made")
class BaseSummaryData(BaseNotificationData):
    """Aggregate usage statistics shared by the daily/weekly/monthly summaries."""

    total_credits_used: float
    total_executions: int
    most_used_agent: str
    total_execution_time: float
    successful_runs: int
    failed_runs: int
    average_execution_time: float
    # Per-category cost totals; presumably keyed by agent or block name — TODO confirm with producer.
    cost_breakdown: dict[str, float]


class DailySummaryData(BaseSummaryData):
    """Summary covering a single day."""

    date: datetime


class WeeklySummaryData(BaseSummaryData):
    """Summary covering one ISO week."""

    start_date: datetime
    end_date: datetime
    week_number: int
    year: int


class MonthlySummaryData(BaseSummaryData):
    """Summary covering one calendar month."""

    month: int
    year: int
# Union of payload types a notification event may carry.
# NOTE(review): Field(discriminator="type") requires every union member to
# declare a Literal `type` field, which none of these models visibly do —
# confirm pydantic accepts this at import time. Also DailySummaryData and
# WeeklySummaryData are absent from the union; verify that is intentional.
NotificationData = Annotated[
    Union[
        AgentRunData,
        ZeroBalanceData,
        LowBalanceData,
        BlockExecutionFailedData,
        ContinuousAgentErrorData,
        MonthlySummaryData,
    ],
    Field(discriminator="type"),
]
class NotificationEventDTO(BaseModel):
    """Wire-format notification event: the payload is kept as a raw dict."""

    user_id: str
    type: NotificationType
    data: dict
    created_at: datetime = Field(default_factory=datetime.now)


class NotificationEventModel(BaseModel, Generic[T_co]):
    """Typed notification event whose payload is a concrete
    BaseNotificationData subclass selected by `type`."""

    user_id: str
    type: NotificationType
    data: T_co
    created_at: datetime = Field(default_factory=datetime.now)

    @field_validator("type", mode="before")
    def uppercase_type(cls, v):
        # Prisma enum members are uppercase; tolerate lowercase strings from callers.
        return v.upper() if isinstance(v, str) else v

    @property
    def strategy(self) -> BatchingStrategy:
        # Batching policy is looked up from the per-type override table.
        return NotificationTypeOverride(self.type).strategy

    @property
    def template(self) -> str:
        # Email template file associated with this notification type.
        return NotificationTypeOverride(self.type).template
def get_data_type(
    notification_type: NotificationType,
) -> type[BaseNotificationData]:
    """Map a NotificationType to the pydantic payload class used to parse its data.

    Raises KeyError for a type without a registered payload class.
    """
    mapping: dict[NotificationType, type[BaseNotificationData]] = {
        NotificationType.AGENT_RUN: AgentRunData,
        NotificationType.ZERO_BALANCE: ZeroBalanceData,
        NotificationType.LOW_BALANCE: LowBalanceData,
        NotificationType.BLOCK_EXECUTION_FAILED: BlockExecutionFailedData,
        NotificationType.CONTINUOUS_AGENT_ERROR: ContinuousAgentErrorData,
        NotificationType.DAILY_SUMMARY: DailySummaryData,
        NotificationType.WEEKLY_SUMMARY: WeeklySummaryData,
        NotificationType.MONTHLY_SUMMARY: MonthlySummaryData,
    }
    return mapping[notification_type]
class NotificationBatch(BaseModel):
    """An in-memory batch of pending notification events for one user."""

    user_id: str
    events: list[NotificationEvent]
    strategy: BatchingStrategy
    # Bug fix: `last_update: datetime = datetime.now()` is evaluated once at
    # import time, so every instance shared the same stale timestamp.
    # default_factory evaluates per instance instead.
    last_update: datetime = Field(default_factory=datetime.now)
class NotificationResult(BaseModel):
    """Outcome of an attempted notification delivery."""

    success: bool
    # Optional human-readable detail, e.g. error text on failure.
    message: Optional[str] = None
class NotificationTypeOverride:
    """Per-notification-type delivery policy: batching strategy and email template."""

    # Batching strategy per type; anything unlisted falls back to HOURLY.
    _STRATEGIES = {
        NotificationType.AGENT_RUN: BatchingStrategy.IMMEDIATE,
        # Balance/error notifications back off so repeated failures don't spam the user.
        NotificationType.ZERO_BALANCE: BatchingStrategy.BACKOFF,
        NotificationType.LOW_BALANCE: BatchingStrategy.BACKOFF,
        NotificationType.BLOCK_EXECUTION_FAILED: BatchingStrategy.BACKOFF,
        NotificationType.CONTINUOUS_AGENT_ERROR: BatchingStrategy.BACKOFF,
        # Summaries are pre-aggregated elsewhere, so they are sent right away.
        NotificationType.DAILY_SUMMARY: BatchingStrategy.IMMEDIATE,
        NotificationType.WEEKLY_SUMMARY: BatchingStrategy.IMMEDIATE,
        NotificationType.MONTHLY_SUMMARY: BatchingStrategy.IMMEDIATE,
    }

    # Email template file per type; no fallback — unknown types raise KeyError.
    _TEMPLATES = {
        NotificationType.AGENT_RUN: "agent_run.html",
        NotificationType.ZERO_BALANCE: "zero_balance.html",
        NotificationType.LOW_BALANCE: "low_balance.html",
        NotificationType.BLOCK_EXECUTION_FAILED: "block_failed.html",
        NotificationType.CONTINUOUS_AGENT_ERROR: "agent_error.html",
        NotificationType.DAILY_SUMMARY: "daily_summary.html",
        NotificationType.WEEKLY_SUMMARY: "weekly_summary.html",
        NotificationType.MONTHLY_SUMMARY: "monthly_summary.html",
    }

    def __init__(self, notification_type: NotificationType):
        self.notification_type = notification_type

    @property
    def strategy(self) -> BatchingStrategy:
        return self._STRATEGIES.get(self.notification_type, BatchingStrategy.HOURLY)

    @property
    def template(self) -> str:
        """Returns template name for this notification type"""
        return self._TEMPLATES[self.notification_type]
class NotificationPreference(BaseModel):
    """A user's email notification settings and daily send-quota state."""

    user_id: str
    email: EmailStr
    preferences: dict[NotificationType, bool] = Field(
        default_factory=dict, description="Which notifications the user wants"
    )
    daily_limit: int = 10  # Max emails per day
    emails_sent_today: int = 0  # Counter toward daily_limit
    # When emails_sent_today was last reset — presumably rolled over daily by the caller; TODO confirm.
    last_reset_date: datetime = Field(default_factory=datetime.now)
def get_batch_delay(notification_type: NotificationType) -> timedelta:
    """Return how long events of this type may sit in a batch before sending.

    Bug fix: the original dict lookup raised KeyError for any type without an
    entry (the DAILY/WEEKLY/MONTHLY summary types). Those types use the
    IMMEDIATE strategy, so fall back to a zero delay instead of crashing.
    """
    delays = {
        NotificationType.AGENT_RUN: timedelta(seconds=1),
        NotificationType.ZERO_BALANCE: timedelta(minutes=60),
        NotificationType.LOW_BALANCE: timedelta(minutes=60),
        NotificationType.BLOCK_EXECUTION_FAILED: timedelta(minutes=60),
        NotificationType.CONTINUOUS_AGENT_ERROR: timedelta(minutes=60),
    }
    return delays.get(notification_type, timedelta(0))
async def create_or_add_to_user_notification_batch(
    user_id: str,
    notification_type: NotificationType,
    data: str,  # type: 'NotificationEventModel'
) -> dict:
    """Append a notification event to the user's batch for this type,
    creating the batch first if it does not exist yet.

    Args:
        data: JSON-serialized NotificationEventModel for this type.

    Returns:
        The resulting batch (with its notifications) as a plain dict.

    Raises:
        DatabaseError: wrapping any validation or query failure.
    """
    try:
        logger.info(
            f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {data}"
        )
        # Parse the incoming JSON into the typed event model for this type.
        parsed = NotificationEventModel[
            get_data_type(notification_type)
        ].model_validate_json(data)
        # Re-serialize just the payload for storage.
        json_data: Json = Json(parsed.data.model_dump_json())

        # Look for the user's existing batch of this type.
        existing_batch = await UserNotificationBatch.prisma().find_unique(
            where={
                "userId_type": {
                    "userId": user_id,
                    "type": notification_type,
                }
            },
            include={"notifications": True},
        )

        if existing_batch is None:
            # No batch yet: create the event and a new batch connected to it.
            async with transaction() as tx:
                notification_event = await tx.notificationevent.create(
                    data={
                        "type": notification_type,
                        "data": json_data,
                    }
                )
                created = await tx.usernotificationbatch.create(
                    data={
                        "userId": user_id,
                        "type": notification_type,
                        "notifications": {"connect": [{"id": notification_event.id}]},
                    },
                    include={"notifications": True},
                )
                return created.model_dump()

        # Batch exists: create the event attached to it, then connect it on
        # the batch side as well.
        async with transaction() as tx:
            notification_event = await tx.notificationevent.create(
                data={
                    "type": notification_type,
                    "data": json_data,
                    "UserNotificationBatch": {"connect": {"id": existing_batch.id}},
                }
            )
            updated = await tx.usernotificationbatch.update(
                where={"id": existing_batch.id},
                data={"notifications": {"connect": [{"id": notification_event.id}]}},
                include={"notifications": True},
            )
            if not updated:
                # Will be re-wrapped by the outer except, preserving the cause chain.
                raise DatabaseError(
                    f"Failed to add notification event {notification_event.id} to existing batch {existing_batch.id}"
                )
            return updated.model_dump()
    except Exception as e:
        raise DatabaseError(
            f"Failed to create or add to notification batch for user {user_id} and type {notification_type}: {e}"
        ) from e
async def get_user_notification_last_message_in_batch(
    user_id: str,
    notification_type: NotificationType,
) -> NotificationEvent | None:
    """Return the most recently added event in the user's batch for this type.

    Returns None when the user has no batch of this type or the batch is empty.

    Raises:
        DatabaseError: wrapping any underlying query failure.
    """
    try:
        batch = await UserNotificationBatch.prisma().find_first(
            where={"userId": user_id, "type": notification_type},
            order={"createdAt": "desc"},
            # Bug fix: the relation must be loaded explicitly; without this
            # include, `batch.notifications` is never populated and the
            # function always returned None.
            include={"notifications": True},
        )
        if not batch:
            return None
        if not batch.notifications:
            return None
        return batch.notifications[-1]
    except Exception as e:
        raise DatabaseError(
            f"Failed to get user notification last message in batch for user {user_id} and type {notification_type}: {e}"
        ) from e
async def empty_user_notification_batch(
    user_id: str, notification_type: NotificationType
) -> None:
    """Delete all events in the user's batch for this type, then the batch itself.

    Both deletes run in one transaction so the batch is never left half-emptied.

    Raises:
        DatabaseError: wrapping any underlying query failure.
    """
    try:
        async with transaction() as tx:
            # Delete the child events first so the batch row can go cleanly.
            await tx.notificationevent.delete_many(
                where={
                    "UserNotificationBatch": {
                        "is": {"userId": user_id, "type": notification_type}
                    }
                }
            )
            await tx.usernotificationbatch.delete_many(
                where=UserNotificationBatchWhereInput(
                    userId=user_id,
                    type=notification_type,
                )
            )
    except Exception as e:
        raise DatabaseError(
            f"Failed to empty user notification batch for user {user_id} and type {notification_type}: {e}"
        ) from e
async def get_user_notification_batch(
    user_id: str,
    notification_type: NotificationType,
) -> UserNotificationBatch | None:
    """Fetch the user's batch for this type with its notifications loaded, or None.

    Raises:
        DatabaseError: wrapping any underlying query failure.
    """
    try:
        return await UserNotificationBatch.prisma().find_first(
            where={"userId": user_id, "type": notification_type},
            include={"notifications": True},
        )
    except Exception as e:
        raise DatabaseError(
            f"Failed to get user notification batch for user {user_id} and type {notification_type}: {e}"
        ) from e

View File

@@ -1,37 +1,49 @@
import logging
from datetime import datetime, timedelta
from typing import Optional, cast
from autogpt_libs.auth.models import DEFAULT_USER_ID
from fastapi import HTTPException
from prisma import Json
from prisma.enums import NotificationType
from prisma.models import User
from backend.data.db import prisma
from backend.data.model import UserIntegrations, UserMetadata, UserMetadataRaw
from backend.data.notifications import NotificationPreference
from backend.server.v2.store.exceptions import DatabaseError
from backend.util.encryption import JSONCryptor
logger = logging.getLogger(__name__)
async def get_or_create_user(user_data: dict) -> User:
user_id = user_data.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="User ID not found in token")
try:
user_id = user_data.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="User ID not found in token")
user_email = user_data.get("email")
if not user_email:
raise HTTPException(status_code=401, detail="Email not found in token")
user_email = user_data.get("email")
if not user_email:
raise HTTPException(status_code=401, detail="Email not found in token")
user = await prisma.user.find_unique(where={"id": user_id})
if not user:
user = await prisma.user.create(
data={
"id": user_id,
"email": user_email,
"name": user_data.get("user_metadata", {}).get("name"),
}
)
return User.model_validate(user)
user = await prisma.user.find_unique(where={"id": user_id})
if not user:
user = await prisma.user.create(
data={
"id": user_id,
"email": user_email,
"name": user_data.get("user_metadata", {}).get("name"),
"UserNotificationPreference": {"create": {"userId": user_id}},
}
)
if not user.userNotificationPreferenceId:
user.UserNotificationPreference = (
await prisma.usernotificationpreference.create(data={"userId": user_id})
)
return User.model_validate(user)
except Exception as e:
raise DatabaseError(f"Failed to get or create user {user_data}: {e}") from e
async def get_user_by_id(user_id: str) -> User:
@@ -130,3 +142,109 @@ async def migrate_and_encrypt_user_integrations():
where={"id": user.id},
data={"metadata": Json(raw_metadata)},
)
async def get_active_user_ids_in_timerange(
    start_time: datetime, end_time: datetime
) -> list[str]:
    """IDs of users with at least one graph execution created within
    [start_time, end_time] (inclusive).

    Raises:
        DatabaseError: wrapping any underlying query failure.
    """
    try:
        active_users = await User.prisma().find_many(
            where={
                "AgentGraphExecutions": {
                    "some": {"createdAt": {"gte": start_time, "lte": end_time}}
                }
            },
        )
        return [u.id for u in active_users]
    except Exception as e:
        raise DatabaseError(
            f"Failed to get active user ids in timerange {start_time} to {end_time}: {e}"
        ) from e
async def get_active_users_ids() -> list[str]:
    """IDs of users active (any graph execution) in the last 30 days.

    Captures `now` once so both window endpoints derive from the same instant;
    the original called datetime.now() twice, letting the endpoints drift.
    """
    now = datetime.now()
    return await get_active_user_ids_in_timerange(now - timedelta(days=30), now)
# Maps each notification type to the corresponding boolean flag on the
# UserNotificationPreference record.
_NOTIFY_FLAG_FIELDS: dict[NotificationType, str] = {
    NotificationType.AGENT_RUN: "notifyOnAgentRun",
    NotificationType.ZERO_BALANCE: "notifyOnZeroBalance",
    NotificationType.LOW_BALANCE: "notifyOnLowBalance",
    NotificationType.BLOCK_EXECUTION_FAILED: "notifyOnBlockExecutionFailed",
    NotificationType.CONTINUOUS_AGENT_ERROR: "notifyOnContinuousAgentError",
    NotificationType.DAILY_SUMMARY: "notifyOnDailySummary",
    NotificationType.WEEKLY_SUMMARY: "notifyOnWeeklySummary",
    NotificationType.MONTHLY_SUMMARY: "notifyOnMonthlySummary",
}


async def get_user_notification_preference(user_id: str) -> NotificationPreference:
    """Load a user's notification preferences.

    Raises:
        DatabaseError: if the user does not exist or the query fails.
    """
    try:
        user = await User.prisma().find_unique_or_raise(
            where={"id": user_id},
            include={
                "UserNotificationPreference": True,
            },
        )
        pref = user.UserNotificationPreference
        # Enable notifications by default if the user has no notification
        # preference record (shouldn't ever happen though).
        preferences: dict[NotificationType, bool] = {
            ntype: getattr(pref, field) if pref else True
            for ntype, field in _NOTIFY_FLAG_FIELDS.items()
        }
        daily_limit = pref.maxEmailsPerDay if pref else 3
        notification_preference = NotificationPreference(
            user_id=user.id,
            email=user.email,
            preferences=preferences,
            daily_limit=daily_limit,
            # TODO with other changes later, for now we just will email them
            emails_sent_today=0,
            last_reset_date=datetime.now(),
        )
        return NotificationPreference.model_validate(notification_preference)
    except Exception as e:
        # Bug fix: the message previously said "upsert", but this function only reads.
        raise DatabaseError(
            f"Failed to get user notification preference for user {user_id}: {e}"
        ) from e

View File

@@ -8,6 +8,7 @@ from backend.data.execution import (
RedisExecutionEventBus,
create_graph_execution,
get_execution_results,
get_executions_in_timerange,
get_incomplete_executions,
get_latest_execution,
update_execution_status,
@@ -17,9 +18,19 @@ from backend.data.execution import (
upsert_execution_output,
)
from backend.data.graph import get_graph, get_node
from backend.data.notifications import (
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_user_notification_batch,
get_user_notification_last_message_in_batch,
)
from backend.data.user import (
get_active_user_ids_in_timerange,
get_active_users_ids,
get_user_by_id,
get_user_integrations,
get_user_metadata,
get_user_notification_preference,
update_user_integrations,
update_user_metadata,
)
@@ -72,6 +83,7 @@ class DatabaseManager(AppService):
update_node_execution_stats = exposed_run_and_wait(update_node_execution_stats)
upsert_execution_input = exposed_run_and_wait(upsert_execution_input)
upsert_execution_output = exposed_run_and_wait(upsert_execution_output)
get_executions_in_timerange = exposed_run_and_wait(get_executions_in_timerange)
# Graphs
get_node = exposed_run_and_wait(get_node)
@@ -84,8 +96,26 @@ class DatabaseManager(AppService):
exposed_run_and_wait(user_credit_model.spend_credits),
)
# User + User Metadata + User Integrations
# User + User Metadata + User Integrations + User Notification Preferences
get_user_metadata = exposed_run_and_wait(get_user_metadata)
update_user_metadata = exposed_run_and_wait(update_user_metadata)
get_user_integrations = exposed_run_and_wait(get_user_integrations)
update_user_integrations = exposed_run_and_wait(update_user_integrations)
get_active_user_ids_in_timerange = exposed_run_and_wait(
get_active_user_ids_in_timerange
)
get_user_by_id = exposed_run_and_wait(get_user_by_id)
get_user_notification_preference = exposed_run_and_wait(
get_user_notification_preference
)
get_active_users_ids = exposed_run_and_wait(get_active_users_ids)
# Notifications
create_or_add_to_user_notification_batch = exposed_run_and_wait(
create_or_add_to_user_notification_batch
)
get_user_notification_last_message_in_batch = exposed_run_and_wait(
get_user_notification_last_message_in_batch
)
empty_user_notification_batch = exposed_run_and_wait(empty_user_notification_batch)
get_user_notification_batch = exposed_run_and_wait(get_user_notification_batch)

View File

@@ -6,6 +6,7 @@ import threading
import time
import typing
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from types import NoneType, UnionType
from typing import (
@@ -307,3 +308,16 @@ def _pydantic_models_from_type_annotation(annotation) -> Iterator[type[BaseModel
yield annotype
elif annotype not in builtin_types and not issubclass(annotype, Enum):
raise TypeError(f"Unsupported type encountered: {annotype}")
# Register (de)serializers for datetime objects.
# Pyro cannot serialize datetime natively, so round-trip it through its ISO-8601
# string form; isoformat()/fromisoformat() preserve microsecond precision and
# any tzinfo offset.
pyro.register_class_to_dict(
    datetime,
    lambda dt: {
        # "__class__" is the key Pyro uses to pick the dict-to-class hook below.
        "__class__": datetime.__qualname__,
        "iso": cast(datetime, dt).isoformat(),
    },
)
# NOTE(review): the second lambda parameter shadows the builtin `dict`;
# harmless here, but worth renaming.
pyro.register_dict_to_class(
    datetime.__qualname__, lambda _, dict: datetime.fromisoformat(dict["iso"])
)

View File

@@ -0,0 +1,71 @@
/*
Warnings:
- A unique constraint covering the columns `[userNotificationPreferenceId]` on the table `User` will be added. If there are existing duplicate values, this will fail.
*/
-- Migration: notification service schema — enum, event/batch tables,
-- per-user preference table, and the User -> preference link.

-- CreateEnum
-- All notification categories the platform can send.
CREATE TYPE "NotificationType" AS ENUM ('AGENT_RUN', 'ZERO_BALANCE', 'LOW_BALANCE', 'BLOCK_EXECUTION_FAILED', 'CONTINUOUS_AGENT_ERROR', 'DAILY_SUMMARY', 'WEEKLY_SUMMARY', 'MONTHLY_SUMMARY');

-- AlterTable
-- Nullable so existing User rows remain valid; populated lazily by the app.
ALTER TABLE "User" ADD COLUMN "userNotificationPreferenceId" TEXT;

-- CreateTable
-- A single queued notification; batch link is nullable until it is batched.
CREATE TABLE "NotificationEvent" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userNotificationBatchId" TEXT,
"type" "NotificationType" NOT NULL,
"data" JSONB NOT NULL,
CONSTRAINT "NotificationEvent_pkey" PRIMARY KEY ("id")
);

-- CreateTable
-- One batch per (user, type); enforced by the unique index below.
CREATE TABLE "UserNotificationBatch" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userId" TEXT NOT NULL,
"type" "NotificationType" NOT NULL,
CONSTRAINT "UserNotificationBatch_pkey" PRIMARY KEY ("id")
);

-- CreateTable
-- Per-user opt-in flags (all default true) and a daily email cap.
CREATE TABLE "UserNotificationPreference" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userId" TEXT NOT NULL,
"maxEmailsPerDay" INTEGER NOT NULL DEFAULT 3,
"notifyOnAgentRun" BOOLEAN NOT NULL DEFAULT true,
"notifyOnZeroBalance" BOOLEAN NOT NULL DEFAULT true,
"notifyOnLowBalance" BOOLEAN NOT NULL DEFAULT true,
"notifyOnBlockExecutionFailed" BOOLEAN NOT NULL DEFAULT true,
"notifyOnContinuousAgentError" BOOLEAN NOT NULL DEFAULT true,
"notifyOnDailySummary" BOOLEAN NOT NULL DEFAULT true,
"notifyOnWeeklySummary" BOOLEAN NOT NULL DEFAULT true,
"notifyOnMonthlySummary" BOOLEAN NOT NULL DEFAULT true,
CONSTRAINT "UserNotificationPreference_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE UNIQUE INDEX "UserNotificationBatch_userId_type_key" ON "UserNotificationBatch"("userId", "type");
-- CreateIndex
CREATE UNIQUE INDEX "UserNotificationPreference_userId_key" ON "UserNotificationPreference"("userId");
-- CreateIndex
CREATE UNIQUE INDEX "User_userNotificationPreferenceId_key" ON "User"("userNotificationPreferenceId");

-- AddForeignKey
-- Deleting a preference record cascades to clear the User link.
ALTER TABLE "User" ADD CONSTRAINT "User_userNotificationPreferenceId_fkey" FOREIGN KEY ("userNotificationPreferenceId") REFERENCES "UserNotificationPreference"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
-- Events survive batch deletion (SET NULL); see app-level cleanup for full purges.
ALTER TABLE "NotificationEvent" ADD CONSTRAINT "NotificationEvent_userNotificationBatchId_fkey" FOREIGN KEY ("userNotificationBatchId") REFERENCES "UserNotificationBatch"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
-- Deleting a user removes their batches.
ALTER TABLE "UserNotificationBatch" ADD CONSTRAINT "UserNotificationBatch_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -740,6 +740,27 @@ files = [
{file = "distro-1.9.0.tar.gz", hash = "sha256:2fa77c6fd8940f116ee1d6b94a2f90b13b5ea8d019b98bc8bafdcabcdd9bdbed"},
]
[[package]]
name = "dnspython"
version = "2.7.0"
description = "DNS toolkit"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "dnspython-2.7.0-py3-none-any.whl", hash = "sha256:b4c34b7d10b51bcc3a5071e7b8dee77939f1e878477eeecc965e9835f63c6c86"},
{file = "dnspython-2.7.0.tar.gz", hash = "sha256:ce9c432eda0dc91cf618a5cedf1a4e142651196bbcd2c80e89ed5a907e5cfaf1"},
]
[package.extras]
dev = ["black (>=23.1.0)", "coverage (>=7.0)", "flake8 (>=7)", "hypercorn (>=0.16.0)", "mypy (>=1.8)", "pylint (>=3)", "pytest (>=7.4)", "pytest-cov (>=4.1.0)", "quart-trio (>=0.11.0)", "sphinx (>=7.2.0)", "sphinx-rtd-theme (>=2.0.0)", "twine (>=4.0.0)", "wheel (>=0.42.0)"]
dnssec = ["cryptography (>=43)"]
doh = ["h2 (>=4.1.0)", "httpcore (>=1.0.0)", "httpx (>=0.26.0)"]
doq = ["aioquic (>=1.0.0)"]
idna = ["idna (>=3.7)"]
trio = ["trio (>=0.23)"]
wmi = ["wmi (>=1.5.1)"]
[[package]]
name = "e2b"
version = "1.0.5"
@@ -778,6 +799,22 @@ attrs = ">=21.3.0"
e2b = ">=1.0.4,<2.0.0"
httpx = ">=0.20.0,<1.0.0"
[[package]]
name = "email-validator"
version = "2.2.0"
description = "A robust email address syntax and deliverability validation library."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "email_validator-2.2.0-py3-none-any.whl", hash = "sha256:561977c2d73ce3611850a06fa56b414621e0c8faa9d66f2611407d87465da631"},
{file = "email_validator-2.2.0.tar.gz", hash = "sha256:cb690f344c617a714f22e66ae771445a1ceb46821152df8e165c5f9a364582b7"},
]
[package.dependencies]
dnspython = ">=2.0.0"
idna = ">=2.0.0"
[[package]]
name = "exceptiongroup"
version = "1.2.2"
@@ -3226,6 +3263,7 @@ files = [
[package.dependencies]
annotated-types = ">=0.6.0"
email-validator = {version = ">=2.0.0", optional = true, markers = "extra == \"email\""}
pydantic-core = "2.27.2"
typing-extensions = ">=4.12.2"
@@ -5084,4 +5122,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "a3af2c13c3fce626006c6469263e44cc7f1b8c26e3d0a6beabd1b33caac35128"
content-hash = "4052d96f95ad3dbf8bef4d651168f6df1ef21c506f152ddca119ad8f23caf159"

View File

@@ -40,7 +40,7 @@ praw = "~7.8.1"
prisma = "^0.15.0"
psutil = "^6.1.0"
psycopg2-binary = "^2.9.10"
pydantic = "^2.9.2"
pydantic = {extras = ["email"], version = "^2.10.6"}
pydantic-settings = "^2.3.4"
pyro5 = "^5.15"
pytest = "^8.2.1"

View File

@@ -13,17 +13,19 @@ generator client {
// User model to mirror Auth provider users
model User {
id String @id // This should match the Supabase user ID
email String @unique
name String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
metadata Json @default("{}")
integrations String @default("")
stripeCustomerId String?
topUpConfig Json?
id String @id // This should match the Supabase user ID
email String @unique
name String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
metadata Json @default("{}")
integrations String @default("")
stripeCustomerId String?
topUpConfig Json?
userNotificationPreferenceId String? @unique
// Relations
AgentGraphs AgentGraph[]
AgentGraphExecutions AgentGraphExecution[]
AnalyticsDetails AnalyticsDetails[]
@@ -33,12 +35,14 @@ model User {
AgentPreset AgentPreset[]
UserAgent UserAgent[]
Profile Profile[]
StoreListing StoreListing[]
StoreListingReview StoreListingReview[]
StoreListingSubmission StoreListingSubmission[]
APIKeys APIKey[]
IntegrationWebhooks IntegrationWebhook[]
Profile Profile[]
StoreListing StoreListing[]
StoreListingReview StoreListingReview[]
StoreListingSubmission StoreListingSubmission[]
APIKeys APIKey[]
IntegrationWebhooks IntegrationWebhook[]
UserNotificationBatch UserNotificationBatch[]
UserNotificationPreference UserNotificationPreference? @relation(fields: [userNotificationPreferenceId], references: [id], onDelete: Cascade)
@@index([id])
@@index([email])
@@ -110,6 +114,65 @@ model AgentPreset {
@@index([userId])
}
// Categories of notifications the platform can send to a user.
enum NotificationType {
  AGENT_RUN
  ZERO_BALANCE
  LOW_BALANCE
  BLOCK_EXECUTION_FAILED
  CONTINUOUS_AGENT_ERROR
  DAILY_SUMMARY
  WEEKLY_SUMMARY
  MONTHLY_SUMMARY
}

// A single queued notification; optionally attached to a user's batch.
model NotificationEvent {
  id                      String                 @id @default(uuid())
  createdAt               DateTime               @default(now())
  updatedAt               DateTime               @default(now()) @updatedAt
  UserNotificationBatch   UserNotificationBatch? @relation(fields: [userNotificationBatchId], references: [id])
  userNotificationBatchId String?
  type                    NotificationType
  data                    Json
}

// Groups pending NotificationEvents of one type for one user.
model UserNotificationBatch {
  id            String              @id @default(uuid())
  createdAt     DateTime            @default(now())
  updatedAt     DateTime            @default(now()) @updatedAt
  userId        String
  user          User                @relation(fields: [userId], references: [id], onDelete: Cascade)
  type          NotificationType
  notifications NotificationEvent[]

  // Each user can only have one batch of a notification type at a time
  @@unique([userId, type])
}

// Per-user notification opt-ins (all default on) plus a daily email cap.
model UserNotificationPreference {
  id                           String   @id @default(uuid())
  createdAt                    DateTime @default(now())
  updatedAt                    DateTime @default(now()) @updatedAt
  userId                       String   @unique // one preference record per user
  User                         User?
  maxEmailsPerDay              Int      @default(3)
  notifyOnAgentRun             Boolean  @default(true)
  notifyOnZeroBalance          Boolean  @default(true)
  notifyOnLowBalance           Boolean  @default(true)
  notifyOnBlockExecutionFailed Boolean  @default(true)
  notifyOnContinuousAgentError Boolean  @default(true)
  notifyOnDailySummary         Boolean  @default(true)
  notifyOnWeeklySummary        Boolean  @default(true)
  notifyOnMonthlySummary       Boolean  @default(true)
}
// For the library page
// It is a user controlled list of agents, that they will see in there library
model UserAgent {