Merge branch 'ntindle/secrt-1088-add-db-models-for-the-notification-service' into ntindle/secrt-1077-add-email-service-smaller

This commit is contained in:
Nicholas Tindle
2025-02-11 20:30:48 -06:00
committed by GitHub
3 changed files with 168 additions and 125 deletions

View File

@@ -15,6 +15,7 @@ from pydantic import BaseModel
from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.server.v2.store.exceptions import DatabaseError
from backend.util import json, mock
from backend.util.settings import Config
@@ -367,21 +368,26 @@ async def get_execution_results(graph_exec_id: str) -> list[ExecutionResult]:
async def get_executions_in_timerange(
user_id: str, start_time: str, end_time: str
) -> list[ExecutionResult]:
executions = await AgentGraphExecution.prisma().find_many(
where={
"AND": [
{
"startedAt": {
"gte": datetime.fromisoformat(start_time),
"lte": datetime.fromisoformat(end_time),
}
},
{"userId": user_id},
]
},
include=GRAPH_EXECUTION_INCLUDE,
)
return [ExecutionResult.from_graph(execution) for execution in executions]
try:
executions = await AgentGraphExecution.prisma().find_many(
where={
"AND": [
{
"startedAt": {
"gte": datetime.fromisoformat(start_time),
"lte": datetime.fromisoformat(end_time),
}
},
{"userId": user_id},
]
},
include=GRAPH_EXECUTION_INCLUDE,
)
return [ExecutionResult.from_graph(execution) for execution in executions]
except Exception as e:
raise DatabaseError(
f"Failed to get executions in timerange {start_time} to {end_time} for user {user_id}: {e}"
) from e
LIST_SPLIT = "_$_"

View File

@@ -11,6 +11,8 @@ from prisma.types import UserNotificationBatchWhereInput
# from backend.notifications.models import NotificationEvent
from pydantic import BaseModel, EmailStr, Field, field_validator
from backend.server.v2.store.exceptions import DatabaseError
from .db import transaction
logger = logging.getLogger(__name__)
@@ -228,107 +230,131 @@ async def create_or_add_to_user_notification_batch(
notification_type: NotificationType,
data: str, # type: 'NotificationEventModel'
) -> dict:
logger.info(
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {data}"
)
try:
logger.info(
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {data}"
)
notification_data = NotificationEventModel[
get_data_type(notification_type)
].model_validate_json(data)
notification_data = NotificationEventModel[
get_data_type(notification_type)
].model_validate_json(data)
# Serialize the data
json_data: Json = Json(notification_data.data.model_dump_json())
# Serialize the data
json_data: Json = Json(notification_data.data.model_dump_json())
# First try to find existing batch
existing_batch = await UserNotificationBatch.prisma().find_unique(
where={
"userId_type": {
"userId": user_id,
"type": notification_type,
}
},
include={"notifications": True},
)
if not existing_batch:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
data={
"type": notification_type,
"data": json_data,
}
)
# Create new batch
resp = await tx.usernotificationbatch.create(
data={
# First try to find existing batch
existing_batch = await UserNotificationBatch.prisma().find_unique(
where={
"userId_type": {
"userId": user_id,
"type": notification_type,
"notifications": {"connect": [{"id": notification_event.id}]},
},
include={"notifications": True},
)
return resp.model_dump()
else:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
data={
"type": notification_type,
"data": json_data,
"UserNotificationBatch": {"connect": {"id": existing_batch.id}},
}
)
# Add to existing batch
resp = await tx.usernotificationbatch.update(
where={"id": existing_batch.id},
data={"notifications": {"connect": [{"id": notification_event.id}]}},
include={"notifications": True},
)
if not resp:
raise Exception("Failed to add to existing batch")
return resp.model_dump()
},
include={"notifications": True},
)
if not existing_batch:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
data={
"type": notification_type,
"data": json_data,
}
)
# Create new batch
resp = await tx.usernotificationbatch.create(
data={
"userId": user_id,
"type": notification_type,
"notifications": {"connect": [{"id": notification_event.id}]},
},
include={"notifications": True},
)
return resp.model_dump()
else:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
data={
"type": notification_type,
"data": json_data,
"UserNotificationBatch": {"connect": {"id": existing_batch.id}},
}
)
# Add to existing batch
resp = await tx.usernotificationbatch.update(
where={"id": existing_batch.id},
data={
"notifications": {"connect": [{"id": notification_event.id}]}
},
include={"notifications": True},
)
if not resp:
raise DatabaseError(
f"Failed to add notification event {notification_event.id} to existing batch {existing_batch.id}"
)
return resp.model_dump()
except Exception as e:
raise DatabaseError(
f"Failed to create or add to notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_user_notification_last_message_in_batch(
user_id: str,
notification_type: NotificationType,
) -> NotificationEvent | None:
batch = await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
order={"createdAt": "desc"},
)
if not batch:
return None
if not batch.notifications:
return None
return batch.notifications[-1]
try:
batch = await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
order={"createdAt": "desc"},
)
if not batch:
return None
if not batch.notifications:
return None
return batch.notifications[-1]
except Exception as e:
raise DatabaseError(
f"Failed to get user notification last message in batch for user {user_id} and type {notification_type}: {e}"
) from e
async def empty_user_notification_batch(
user_id: str, notification_type: NotificationType
) -> None:
async with transaction() as tx:
await tx.notificationevent.delete_many(
where={
"UserNotificationBatch": {
"is": {"userId": user_id, "type": notification_type}
try:
async with transaction() as tx:
await tx.notificationevent.delete_many(
where={
"UserNotificationBatch": {
"is": {"userId": user_id, "type": notification_type}
}
}
}
)
await tx.usernotificationbatch.delete_many(
where=UserNotificationBatchWhereInput(
userId=user_id,
type=notification_type,
)
)
await tx.usernotificationbatch.delete_many(
where=UserNotificationBatchWhereInput(
userId=user_id,
type=notification_type,
)
)
except Exception as e:
raise DatabaseError(
f"Failed to empty user notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_user_notification_batch(
user_id: str,
notification_type: NotificationType,
) -> UserNotificationBatch | None:
return await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
include={"notifications": True},
)
try:
return await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
include={"notifications": True},
)
except Exception as e:
raise DatabaseError(
f"Failed to get user notification batch for user {user_id} and type {notification_type}: {e}"
) from e

View File

@@ -18,29 +18,34 @@ logger = logging.getLogger(__name__)
async def get_or_create_user(user_data: dict) -> User:
user_id = user_data.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="User ID not found in token")
try:
user_id = user_data.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="User ID not found in token")
user_email = user_data.get("email")
if not user_email:
raise HTTPException(status_code=401, detail="Email not found in token")
user_email = user_data.get("email")
if not user_email:
raise HTTPException(status_code=401, detail="Email not found in token")
user = await prisma.user.find_unique(where={"id": user_id})
if not user:
user = await prisma.user.create(
data={
"id": user_id,
"email": user_email,
"name": user_data.get("user_metadata", {}).get("name"),
"UserNotificationPreference": {"create": {"userId": user_id}},
}
user = await prisma.user.find_unique(
where={"id": user_id}, include={"UserNotificationPreference": True}
)
if not user.UserNotificationPreference:
user.UserNotificationPreference = (
await prisma.usernotificationpreference.create(data={"userId": user_id})
)
return User.model_validate(user)
if not user:
user = await prisma.user.create(
data={
"id": user_id,
"email": user_email,
"name": user_data.get("user_metadata", {}).get("name"),
"UserNotificationPreference": {"create": {"userId": user_id}},
}
)
if not user.UserNotificationPreference:
user.UserNotificationPreference = (
await prisma.usernotificationpreference.create(data={"userId": user_id})
)
return User.model_validate(user)
except Exception as e:
raise DatabaseError(f"Failed to get or create user {user_data}: {e}") from e
async def get_user_by_id(user_id: str) -> User:
@@ -142,19 +147,25 @@ async def migrate_and_encrypt_user_integrations():
async def get_active_user_ids_in_timerange(start_time: str, end_time: str) -> list[str]:
users = await User.prisma().find_many(
where={
"AgentGraphExecutions": {
"some": {
"createdAt": {
"gte": datetime.fromisoformat(start_time),
"lte": datetime.fromisoformat(end_time),
try:
users = await User.prisma().find_many(
where={
"AgentGraphExecutions": {
"some": {
"createdAt": {
"gte": datetime.fromisoformat(start_time),
"lte": datetime.fromisoformat(end_time),
}
}
}
}
},
)
return [user.id for user in users]
},
)
return [user.id for user in users]
except Exception as e:
raise DatabaseError(
f"Failed to get active user ids in timerange {start_time} to {end_time}: {e}"
) from e
async def get_active_users_ids() -> list[str]: