fix(backend): Move Notification service to DB manager (#9626)

DatabaseManager is already provisioned in RestApiService, and
NotificationService lives within the same instance as the Rest Server.

### Changes 🏗️

Moving the DB calls of NotificationService to DatabaseManager.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>

  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>

(cherry picked from commit f4d4bb83b0)
This commit is contained in:
Zamil Majdy
2025-03-13 10:14:46 +07:00
parent c1e329497c
commit 90f9e4e94a
4 changed files with 87 additions and 70 deletions

View File

@@ -14,7 +14,7 @@ from prisma.models import (
AgentNodeLink,
StoreListingVersion,
)
from prisma.types import AgentGraphWhereInput
from prisma.types import AgentGraphExecutionWhereInput, AgentGraphWhereInput
from pydantic.fields import Field, computed_field
from backend.blocks.agent import AgentExecutorBlock
@@ -597,18 +597,20 @@ async def get_graphs(
return graph_models
# TODO: move execution stuff to .execution
async def get_graphs_executions(user_id: str) -> list[GraphExecutionMeta]:
executions = await AgentGraphExecution.prisma().find_many(
where={"isDeleted": False, "userId": user_id},
order={"createdAt": "desc"},
)
return [GraphExecutionMeta.from_db(execution) for execution in executions]
async def get_graph_executions(
graph_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> list[GraphExecutionMeta]:
where_filter: AgentGraphExecutionWhereInput = {
"isDeleted": False,
}
if user_id:
where_filter["userId"] = user_id
if graph_id:
where_filter["agentGraphId"] = graph_id
async def get_graph_executions(graph_id: str, user_id: str) -> list[GraphExecutionMeta]:
executions = await AgentGraphExecution.prisma().find_many(
where={"agentGraphId": graph_id, "isDeleted": False, "userId": user_id},
where=where_filter,
order={"createdAt": "desc"},
)
return [GraphExecutionMeta.from_db(execution) for execution in executions]

View File

@@ -20,9 +20,20 @@ from backend.data.graph import (
get_graph_metadata,
get_node,
)
from backend.data.notifications import (
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_all_batches_by_type,
get_user_notification_batch,
get_user_notification_oldest_message_in_batch,
)
from backend.data.user import (
get_active_user_ids_in_timerange,
get_user_email_by_id,
get_user_email_verification,
get_user_integrations,
get_user_metadata,
get_user_notification_preference,
update_user_integrations,
update_user_metadata,
)
@@ -80,3 +91,24 @@ class DatabaseManager(AppService):
update_user_metadata = exposed_run_and_wait(update_user_metadata)
get_user_integrations = exposed_run_and_wait(get_user_integrations)
update_user_integrations = exposed_run_and_wait(update_user_integrations)
# User Comms - async
get_active_user_ids_in_timerange = exposed_run_and_wait(
get_active_user_ids_in_timerange
)
get_user_email_by_id = exposed_run_and_wait(get_user_email_by_id)
get_user_email_verification = exposed_run_and_wait(get_user_email_verification)
get_user_notification_preference = exposed_run_and_wait(
get_user_notification_preference
)
# Notifications - async
create_or_add_to_user_notification_batch = exposed_run_and_wait(
create_or_add_to_user_notification_batch
)
empty_user_notification_batch = exposed_run_and_wait(empty_user_notification_batch)
get_all_batches_by_type = exposed_run_and_wait(get_all_batches_by_type)
get_user_notification_batch = exposed_run_and_wait(get_user_notification_batch)
get_user_notification_oldest_message_in_batch = exposed_run_and_wait(
get_user_notification_oldest_message_in_batch
)

View File

@@ -23,23 +23,12 @@ from backend.data.notifications import (
SummaryParamsEventModel,
WeeklySummaryData,
WeeklySummaryParams,
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_all_batches_by_type,
get_batch_delay,
get_notif_data_type,
get_summary_params_type,
get_user_notification_batch,
get_user_notification_oldest_message_in_batch,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.user import (
generate_unsubscribe_link,
get_active_user_ids_in_timerange,
get_user_email_by_id,
get_user_email_verification,
get_user_notification_preference,
)
from backend.data.user import generate_unsubscribe_link
from backend.notifications.email import EmailSender
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Settings
@@ -123,12 +112,18 @@ def get_scheduler():
return get_service_client(Scheduler)
@thread_cached
def get_db():
from backend.executor.database import DatabaseManager
return get_service_client(DatabaseManager)
class NotificationManager(AppService):
"""Service for handling notifications with batching support"""
def __init__(self):
super().__init__()
self.use_db = True
self.rabbitmq_config = create_notification_config()
self.running = True
self.email_sender = EmailSender()
@@ -160,11 +155,9 @@ class NotificationManager(AppService):
processed_count = 0
current_time = datetime.now(tz=timezone.utc)
start_time = current_time - timedelta(days=7)
users = self.run_and_wait(
get_active_user_ids_in_timerange(
end_time=current_time.isoformat(),
start_time=start_time.isoformat(),
)
users = get_db().get_active_user_ids_in_timerange(
end_time=current_time.isoformat(),
start_time=start_time.isoformat(),
)
for user in users:
@@ -194,12 +187,12 @@ class NotificationManager(AppService):
for notification_type in notification_types:
# Get all batches for this notification type
batches = self.run_and_wait(get_all_batches_by_type(notification_type))
batches = get_db().get_all_batches_by_type(notification_type)
for batch in batches:
# Check if batch has aged out
oldest_message = self.run_and_wait(
get_user_notification_oldest_message_in_batch(
oldest_message = (
get_db().get_user_notification_oldest_message_in_batch(
batch.userId, notification_type
)
)
@@ -215,9 +208,7 @@ class NotificationManager(AppService):
# If batch has aged out, process it
if oldest_message.createdAt + max_delay < current_time:
recipient_email = self.run_and_wait(
get_user_email_by_id(batch.userId)
)
recipient_email = get_db().get_user_email_by_id(batch.userId)
if not recipient_email:
logger.error(
@@ -234,15 +225,13 @@ class NotificationManager(AppService):
f"User {batch.userId} does not want to receive {notification_type} notifications"
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
get_db().empty_user_notification_batch(
batch.userId, notification_type
)
continue
batch_data = self.run_and_wait(
get_user_notification_batch(batch.userId, notification_type)
batch_data = get_db().get_user_notification_batch(
batch.userId, notification_type
)
if not batch_data or not batch_data.notifications:
@@ -250,10 +239,8 @@ class NotificationManager(AppService):
f"Batch data not found for user {batch.userId}"
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
get_db().empty_user_notification_batch(
batch.userId, notification_type
)
continue
@@ -282,10 +269,8 @@ class NotificationManager(AppService):
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
get_db().empty_user_notification_batch(
batch.userId, notification_type
)
processed_count += 1
@@ -377,14 +362,16 @@ class NotificationManager(AppService):
self, user_id: str, event_type: NotificationType
) -> bool:
"""Check if a user wants to receive a notification based on their preferences and email verification status"""
validated_email = self.run_and_wait(get_user_email_verification(user_id))
preference = self.run_and_wait(
get_user_notification_preference(user_id)
).preferences.get(event_type, True)
validated_email = get_db().get_user_email_verification(user_id)
preference = (
get_db()
.get_user_notification_preference(user_id)
.preferences.get(event_type, True)
)
# only if both are true, should we email this person
return validated_email and preference
async def _gather_summary_data(
def _gather_summary_data(
self, user_id: str, event_type: NotificationType, params: BaseSummaryParams
) -> BaseSummaryData:
"""Gathers the data to build a summary notification"""
@@ -464,13 +451,13 @@ class NotificationManager(AppService):
else:
raise ValueError("Invalid event type or params")
async def _should_batch(
def _should_batch(
self, user_id: str, event_type: NotificationType, event: NotificationEventModel
) -> bool:
await create_or_add_to_user_notification_batch(user_id, event_type, event)
get_db().create_or_add_to_user_notification_batch(user_id, event_type, event)
oldest_message = await get_user_notification_oldest_message_in_batch(
oldest_message = get_db().get_user_notification_oldest_message_in_batch(
user_id, event_type
)
if not oldest_message:
@@ -527,7 +514,7 @@ class NotificationManager(AppService):
model = parsed.model
logger.debug(f"Processing immediate notification: {model}")
recipient_email = self.run_and_wait(get_user_email_by_id(event.user_id))
recipient_email = get_db().get_user_email_by_id(event.user_id)
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
@@ -564,7 +551,7 @@ class NotificationManager(AppService):
model = parsed.model
logger.info(f"Processing batch notification: {model}")
recipient_email = self.run_and_wait(get_user_email_by_id(event.user_id))
recipient_email = get_db().get_user_email_by_id(event.user_id)
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
@@ -578,16 +565,12 @@ class NotificationManager(AppService):
)
return True
should_send = self.run_and_wait(
self._should_batch(event.user_id, event.type, model)
)
should_send = self._should_batch(event.user_id, event.type, model)
if not should_send:
logger.info("Batch not old enough to send")
return False
batch = self.run_and_wait(
get_user_notification_batch(event.user_id, event.type)
)
batch = get_db().get_user_notification_batch(event.user_id, event.type)
if not batch or not batch.notifications:
logger.error(f"Batch not found for user {event.user_id}")
return False
@@ -614,7 +597,7 @@ class NotificationManager(AppService):
user_unsub_link=unsub_link,
)
# only empty the batch if we sent the email successfully
self.run_and_wait(empty_user_notification_batch(event.user_id, event.type))
get_db().empty_user_notification_batch(event.user_id, event.type)
return True
except Exception as e:
logger.exception(f"Error processing notification for batch queue: {e}")
@@ -631,7 +614,7 @@ class NotificationManager(AppService):
logger.info(f"Processing summary notification: {model}")
recipient_email = self.run_and_wait(get_user_email_by_id(event.user_id))
recipient_email = get_db().get_user_email_by_id(event.user_id)
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
@@ -644,8 +627,8 @@ class NotificationManager(AppService):
)
return True
summary_data = self.run_and_wait(
self._gather_summary_data(event.user_id, event.type, model.data)
summary_data = self._gather_summary_data(
event.user_id, event.type, model.data
)
unsub_link = generate_unsubscribe_link(event.user_id)

View File

@@ -630,7 +630,7 @@ async def stop_graph_run(
async def get_graphs_executions(
user_id: Annotated[str, Depends(get_user_id)],
) -> list[graph_db.GraphExecutionMeta]:
return await graph_db.get_graphs_executions(user_id=user_id)
return await graph_db.get_graph_executions(user_id=user_id)
@v1_router.get(