feat(backend): Fix failed RPC on Notification Service (#9630)

Although returning a Prisma object on an RPC is a bad practice, we have
instances where we do so and the type contains a `prisma.Json` field.
This Json field can't be seamlessly serialized and then converted back
into the Prisma object.

### Changes 🏗️

Replacing prisma object as return type on notification service with a
plain pydantic object as DTO.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Calling notification APIs through the RPC client.

(cherry picked from commit b9f31a9c44)
This commit is contained in:
Zamil Majdy
2025-03-14 08:54:18 +07:00
parent 90f9e4e94a
commit b0fed43971
3 changed files with 71 additions and 27 deletions

View File

@@ -341,6 +341,43 @@ class NotificationPreference(BaseModel):
)
class UserNotificationEventDTO(BaseModel):
type: NotificationType
data: dict
created_at: datetime
updated_at: datetime
@staticmethod
def from_db(model: NotificationEvent) -> "UserNotificationEventDTO":
return UserNotificationEventDTO(
type=model.type,
data=dict(model.data),
created_at=model.createdAt,
updated_at=model.updatedAt,
)
class UserNotificationBatchDTO(BaseModel):
user_id: str
type: NotificationType
notifications: list[UserNotificationEventDTO]
created_at: datetime
updated_at: datetime
@staticmethod
def from_db(model: UserNotificationBatch) -> "UserNotificationBatchDTO":
return UserNotificationBatchDTO(
user_id=model.userId,
type=model.type,
notifications=[
UserNotificationEventDTO.from_db(notification)
for notification in model.notifications or []
],
created_at=model.createdAt,
updated_at=model.updatedAt,
)
def get_batch_delay(notification_type: NotificationType) -> timedelta:
return {
NotificationType.AGENT_RUN: timedelta(minutes=1),
@@ -355,7 +392,7 @@ async def create_or_add_to_user_notification_batch(
user_id: str,
notification_type: NotificationType,
notification_data: NotificationEventModel,
) -> UserNotificationBatch:
) -> UserNotificationBatchDTO:
try:
logger.info(
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {notification_data}"
@@ -393,7 +430,7 @@ async def create_or_add_to_user_notification_batch(
},
include={"notifications": True},
)
return resp
return UserNotificationBatchDTO.from_db(resp)
else:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
@@ -415,7 +452,7 @@ async def create_or_add_to_user_notification_batch(
raise DatabaseError(
f"Failed to add notification event {notification_event.id} to existing batch {existing_batch.id}"
)
return resp
return UserNotificationBatchDTO.from_db(resp)
except Exception as e:
raise DatabaseError(
f"Failed to create or add to notification batch for user {user_id} and type {notification_type}: {e}"
@@ -425,7 +462,7 @@ async def create_or_add_to_user_notification_batch(
async def get_user_notification_oldest_message_in_batch(
user_id: str,
notification_type: NotificationType,
) -> NotificationEvent | None:
) -> UserNotificationEventDTO | None:
try:
batch = await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
@@ -436,7 +473,12 @@ async def get_user_notification_oldest_message_in_batch(
if not batch.notifications:
return None
sorted_notifications = sorted(batch.notifications, key=lambda x: x.createdAt)
return sorted_notifications[0]
return (
UserNotificationEventDTO.from_db(sorted_notifications[0])
if sorted_notifications
else None
)
except Exception as e:
raise DatabaseError(
f"Failed to get user notification last message in batch for user {user_id} and type {notification_type}: {e}"
@@ -471,12 +513,13 @@ async def empty_user_notification_batch(
async def get_user_notification_batch(
user_id: str,
notification_type: NotificationType,
) -> UserNotificationBatch | None:
) -> UserNotificationBatchDTO | None:
try:
return await UserNotificationBatch.prisma().find_first(
batch = await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
include={"notifications": True},
)
return UserNotificationBatchDTO.from_db(batch) if batch else None
except Exception as e:
raise DatabaseError(
f"Failed to get user notification batch for user {user_id} and type {notification_type}: {e}"
@@ -485,9 +528,9 @@ async def get_user_notification_batch(
async def get_all_batches_by_type(
notification_type: NotificationType,
) -> list[UserNotificationBatch]:
) -> list[UserNotificationBatchDTO]:
try:
return await UserNotificationBatch.prisma().find_many(
batches = await UserNotificationBatch.prisma().find_many(
where={
"type": notification_type,
"notifications": {
@@ -496,6 +539,7 @@ async def get_all_batches_by_type(
},
include={"notifications": True},
)
return [UserNotificationBatchDTO.from_db(batch) for batch in batches]
except Exception as e:
raise DatabaseError(
f"Failed to get all batches by type {notification_type}: {e}"

View File

@@ -193,68 +193,68 @@ class NotificationManager(AppService):
# Check if batch has aged out
oldest_message = (
get_db().get_user_notification_oldest_message_in_batch(
batch.userId, notification_type
batch.user_id, notification_type
)
)
if not oldest_message:
# this should never happen
logger.error(
f"Batch for user {batch.userId} and type {notification_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
f"Batch for user {batch.user_id} and type {notification_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
)
continue
max_delay = get_batch_delay(notification_type)
# If batch has aged out, process it
if oldest_message.createdAt + max_delay < current_time:
recipient_email = get_db().get_user_email_by_id(batch.userId)
if oldest_message.created_at + max_delay < current_time:
recipient_email = get_db().get_user_email_by_id(batch.user_id)
if not recipient_email:
logger.error(
f"User email not found for user {batch.userId}"
f"User email not found for user {batch.user_id}"
)
continue
should_send = self._should_email_user_based_on_preference(
batch.userId, notification_type
batch.user_id, notification_type
)
if not should_send:
logger.debug(
f"User {batch.userId} does not want to receive {notification_type} notifications"
f"User {batch.user_id} does not want to receive {notification_type} notifications"
)
# Clear the batch
get_db().empty_user_notification_batch(
batch.userId, notification_type
batch.user_id, notification_type
)
continue
batch_data = get_db().get_user_notification_batch(
batch.userId, notification_type
batch.user_id, notification_type
)
if not batch_data or not batch_data.notifications:
logger.error(
f"Batch data not found for user {batch.userId}"
f"Batch data not found for user {batch.user_id}"
)
# Clear the batch
get_db().empty_user_notification_batch(
batch.userId, notification_type
batch.user_id, notification_type
)
continue
unsub_link = generate_unsubscribe_link(batch.userId)
unsub_link = generate_unsubscribe_link(batch.user_id)
events = [
NotificationEventModel[
get_notif_data_type(db_event.type)
].model_validate(
{
"user_id": batch.userId,
"user_id": batch.user_id,
"type": db_event.type,
"data": db_event.data,
"created_at": db_event.createdAt,
"created_at": db_event.created_at,
}
)
for db_event in batch_data.notifications
@@ -270,7 +270,7 @@ class NotificationManager(AppService):
# Clear the batch
get_db().empty_user_notification_batch(
batch.userId, notification_type
batch.user_id, notification_type
)
processed_count += 1
@@ -465,7 +465,7 @@ class NotificationManager(AppService):
f"Batch for user {user_id} and type {event_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
)
return False
oldest_age = oldest_message.createdAt
oldest_age = oldest_message.created_at
max_delay = get_batch_delay(event_type)
@@ -584,7 +584,7 @@ class NotificationManager(AppService):
"user_id": event.user_id,
"type": db_event.type,
"data": db_event.data,
"created_at": db_event.createdAt,
"created_at": db_event.created_at,
}
)
for db_event in batch.notifications

View File

@@ -455,7 +455,7 @@ def fastapi_get_service_client(
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error in {method_name}: {e.response.text}")
error = RemoteCallError.model_validate(e.response.json(), strict=False)
error = RemoteCallError.model_validate(e.response.json())
# DEBUG HELP: if you made a custom exception, make sure you override self.args to be how to make your exception
raise EXCEPTION_MAPPING.get(error.type, Exception)(
*(error.args or [str(e)])