feat(backend): Notification Integration for credits system (#9488)

<!-- Clearly explain the need for these changes: -->

### Changes 🏗️

Add email notifications on refund events.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>
This commit is contained in:
Zamil Majdy
2025-02-20 19:19:58 +07:00
committed by GitHub
parent 4ae016606b
commit a692eedb1c
10 changed files with 258 additions and 78 deletions

View File

@@ -1,11 +1,17 @@
import asyncio
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime, timezone
import stripe
from autogpt_libs.utils.cache import thread_cached
from prisma import Json
from prisma.enums import CreditRefundRequestStatus, CreditTransactionType
from prisma.enums import (
CreditRefundRequestStatus,
CreditTransactionType,
NotificationType,
)
from prisma.errors import UniqueViolationError
from prisma.models import CreditRefundRequest, CreditTransaction, User
from prisma.types import CreditTransactionCreateInput, CreditTransactionWhereInput
@@ -23,7 +29,10 @@ from backend.data.model import (
TransactionHistory,
UserTransaction,
)
from backend.data.notifications import NotificationEventDTO, RefundRequestData
from backend.data.user import get_user_by_id
from backend.notifications import NotificationManager
from backend.util.service import get_service_client
from backend.util.settings import Settings
settings = Settings()
@@ -338,6 +347,26 @@ class UsageTransactionMetadata(BaseModel):
class UserCredit(UserCreditBase):
@thread_cached
def notification_client(self) -> NotificationManager:
return get_service_client(NotificationManager)
async def _send_refund_notification(
self,
notification_request: RefundRequestData,
notification_type: NotificationType,
):
await asyncio.to_thread(
lambda: self.notification_client().queue_notification(
NotificationEventDTO(
recipient_email=settings.config.refund_notification_email,
user_id=notification_request.user_id,
type=notification_type,
data=notification_request.model_dump(),
)
)
)
def _block_usage_cost(
self,
block: Block,
@@ -457,10 +486,11 @@ class UserCredit(UserCreditBase):
)
balance = await self.get_credits(user_id)
amount = transaction.amount
refund_key = f"{transaction.createdAt.strftime('%Y-%W')}-{user_id}"
refund_key_format = settings.config.refund_request_time_key_format
refund_key = f"{transaction.createdAt.strftime(refund_key_format)}-{user_id}"
try:
await CreditRefundRequest.prisma().create(
refund_request = await CreditRefundRequest.prisma().create(
data={
"id": refund_key,
"transactionKey": transaction_key,
@@ -477,7 +507,20 @@ class UserCredit(UserCreditBase):
)
if amount - balance > settings.config.refund_credit_tolerance_threshold:
# TODO: add a notification for the platform administrator.
user_data = await get_user_by_id(user_id)
await self._send_refund_notification(
RefundRequestData(
user_id=user_id,
user_name=user_data.name or "AutoGPT Platform User",
user_email=user_data.email,
transaction_id=transaction_key,
refund_request_id=refund_request.id,
reason=refund_request.reason,
amount=amount,
balance=balance,
),
NotificationType.REFUND_REQUEST,
)
return 0 # Register the refund request for manual approval.
# Auto refund the top-up.
@@ -509,7 +552,7 @@ class UserCredit(UserCreditBase):
f"Invalid amount to deduct ${request.amount/100} from ${transaction.amount/100} top-up"
)
await self._add_transaction(
balance, _ = await self._add_transaction(
user_id=transaction.userId,
amount=-request.amount,
transaction_type=CreditTransactionType.REFUND,
@@ -531,6 +574,21 @@ class UserCredit(UserCreditBase):
},
)
user_data = await get_user_by_id(transaction.userId)
await self._send_refund_notification(
RefundRequestData(
user_id=user_data.id,
user_name=user_data.name or "AutoGPT Platform User",
user_email=user_data.email,
transaction_id=transaction.transactionKey,
refund_request_id=request.id,
reason=str(request.reason or "-"),
amount=transaction.amount,
balance=balance,
),
NotificationType.REFUND_PROCESSED,
)
async def handle_dispute(self, dispute: stripe.Dispute):
transaction = await CreditTransaction.prisma().find_first_or_raise(
where={

View File

@@ -100,6 +100,17 @@ class MonthlySummaryData(BaseSummaryData):
year: int
class RefundRequestData(BaseNotificationData):
user_id: str
user_name: str
user_email: str
transaction_id: str
refund_request_id: str
reason: str
amount: float
balance: int
NotificationData = Annotated[
Union[
AgentRunData,
@@ -118,6 +129,8 @@ class NotificationEventDTO(BaseModel):
type: NotificationType
data: dict
created_at: datetime = Field(default_factory=datetime.now)
recipient_email: Optional[str] = None
retry_count: int = 0
class NotificationEventModel(BaseModel, Generic[T_co]):
@@ -153,6 +166,8 @@ def get_data_type(
NotificationType.DAILY_SUMMARY: DailySummaryData,
NotificationType.WEEKLY_SUMMARY: WeeklySummaryData,
NotificationType.MONTHLY_SUMMARY: MonthlySummaryData,
NotificationType.REFUND_REQUEST: RefundRequestData,
NotificationType.REFUND_PROCESSED: RefundRequestData,
}[notification_type]
@@ -186,6 +201,8 @@ class NotificationTypeOverride:
NotificationType.DAILY_SUMMARY: BatchingStrategy.IMMEDIATE,
NotificationType.WEEKLY_SUMMARY: BatchingStrategy.IMMEDIATE,
NotificationType.MONTHLY_SUMMARY: BatchingStrategy.IMMEDIATE,
NotificationType.REFUND_REQUEST: BatchingStrategy.IMMEDIATE,
NotificationType.REFUND_PROCESSED: BatchingStrategy.IMMEDIATE,
}
return BATCHING_RULES.get(self.notification_type, BatchingStrategy.HOURLY)
@@ -201,6 +218,8 @@ class NotificationTypeOverride:
NotificationType.DAILY_SUMMARY: "daily_summary.html",
NotificationType.WEEKLY_SUMMARY: "weekly_summary.html",
NotificationType.MONTHLY_SUMMARY: "monthly_summary.html",
NotificationType.REFUND_REQUEST: "refund_request.html",
NotificationType.REFUND_PROCESSED: "refund_processed.html",
}[self.notification_type]
@property
@@ -214,6 +233,8 @@ class NotificationTypeOverride:
NotificationType.DAILY_SUMMARY: "Here's your daily summary!",
NotificationType.WEEKLY_SUMMARY: "Look at all the cool stuff you did last week!",
NotificationType.MONTHLY_SUMMARY: "We did a lot this month!",
NotificationType.REFUND_REQUEST: "[ACTION REQUIRED] You got a ${{data.amount / 100}} refund request from {{data.user_name}}",
NotificationType.REFUND_PROCESSED: "Refund for ${{data.amount / 100}} to {{data.user_name}} has been processed",
}[self.notification_type]

View File

@@ -50,10 +50,10 @@ async def get_user_by_id(user_id: str) -> User:
return User.model_validate(user)
async def get_user_email_by_id(user_id: str) -> str:
async def get_user_email_by_id(user_id: str) -> Optional[str]:
try:
user = await prisma.user.find_unique_or_raise(where={"id": user_id})
return user.email
user = await prisma.user.find_unique(where={"id": user_id})
return user.email if user else None
except Exception as e:
raise DatabaseError(f"Failed to get user email for user {user_id}: {e}") from e

View File

@@ -8,7 +8,6 @@ from backend.data.execution import (
RedisExecutionEventBus,
create_graph_execution,
get_execution_results,
get_executions_in_timerange,
get_incomplete_executions,
get_latest_execution,
update_execution_status,
@@ -18,20 +17,9 @@ from backend.data.execution import (
upsert_execution_output,
)
from backend.data.graph import get_graph, get_node
from backend.data.notifications import (
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_user_notification_batch,
get_user_notification_last_message_in_batch,
)
from backend.data.user import (
get_active_user_ids_in_timerange,
get_active_users_ids,
get_user_by_id,
get_user_email_by_id,
get_user_integrations,
get_user_metadata,
get_user_notification_preference,
update_user_integrations,
update_user_metadata,
)
@@ -84,7 +72,6 @@ class DatabaseManager(AppService):
update_node_execution_stats = exposed_run_and_wait(update_node_execution_stats)
upsert_execution_input = exposed_run_and_wait(upsert_execution_input)
upsert_execution_output = exposed_run_and_wait(upsert_execution_output)
get_executions_in_timerange = exposed_run_and_wait(get_executions_in_timerange)
# Graphs
get_node = exposed_run_and_wait(get_node)
@@ -97,27 +84,8 @@ class DatabaseManager(AppService):
exposed_run_and_wait(user_credit_model.spend_credits),
)
# User + User Metadata + User Integrations + User Notification Preferences
# User + User Metadata + User Integrations
get_user_metadata = exposed_run_and_wait(get_user_metadata)
update_user_metadata = exposed_run_and_wait(update_user_metadata)
get_user_integrations = exposed_run_and_wait(get_user_integrations)
update_user_integrations = exposed_run_and_wait(update_user_integrations)
get_active_user_ids_in_timerange = exposed_run_and_wait(
get_active_user_ids_in_timerange
)
get_user_by_id = exposed_run_and_wait(get_user_by_id)
get_user_email_by_id = exposed_run_and_wait(get_user_email_by_id)
get_user_notification_preference = exposed_run_and_wait(
get_user_notification_preference
)
get_active_users_ids = exposed_run_and_wait(get_active_users_ids)
# Notifications
create_or_add_to_user_notification_batch = exposed_run_and_wait(
create_or_add_to_user_notification_batch
)
get_user_notification_last_message_in_batch = exposed_run_and_wait(
get_user_notification_last_message_in_batch
)
empty_user_notification_batch = exposed_run_and_wait(empty_user_notification_batch)
get_user_notification_batch = exposed_run_and_wait(get_user_notification_batch)

View File

@@ -1,10 +1,9 @@
import logging
import time
from typing import TYPE_CHECKING, Any, Callable, Coroutine
from typing import Callable
import aio_pika
from aio_pika.exceptions import QueueEmpty
from autogpt_libs.utils.cache import thread_cached
from prisma.enums import NotificationType
from pydantic import BaseModel
@@ -16,14 +15,11 @@ from backend.data.notifications import (
get_data_type,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.executor.database import DatabaseManager
from backend.data.user import get_user_email_by_id, get_user_notification_preference
from backend.notifications.email import EmailSender
from backend.util.service import AppService, expose, get_service_client
from backend.util.service import AppService, expose
from backend.util.settings import Settings
if TYPE_CHECKING:
from backend.executor import DatabaseManager
logger = logging.getLogger(__name__)
settings = Settings()
@@ -93,15 +89,15 @@ class NotificationManager(AppService):
def queue_notification(self, event: NotificationEventDTO) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
try:
logger.info(f"Recieved Request to queue {event=}")
# Workaround for not being able to seralize generics over the expose bus
logger.info(f"Received Request to queue {event=}")
# Workaround for not being able to serialize generics over the expose bus
parsed_event = NotificationEventModel[
get_data_type(event.type)
].model_validate(event.model_dump())
routing_key = self.get_routing_key(parsed_event)
message = parsed_event.model_dump_json()
logger.info(f"Recieved Request to queue {message=}")
logger.info(f"Received Request to queue {message=}")
exchange = "notifications"
@@ -118,21 +114,19 @@ class NotificationManager(AppService):
return NotificationResult(
success=True,
message=(f"Notification queued with routing key: {routing_key}"),
message=f"Notification queued with routing key: {routing_key}",
)
except Exception as e:
logger.error(f"Error queueing notification: {e}")
logger.exception(f"Error queueing notification: {e}")
return NotificationResult(success=False, message=str(e))
async def _should_email_user_based_on_preference(
def _should_email_user_based_on_preference(
self, user_id: str, event_type: NotificationType
) -> bool:
return (
get_db_client()
.get_user_notification_preference(user_id)
.preferences[event_type]
)
return self.run_and_wait(
get_user_notification_preference(user_id)
).preferences.get(event_type, True)
def _parse_message(self, message: str) -> NotificationEvent | None:
try:
@@ -145,7 +139,7 @@ class NotificationManager(AppService):
logger.error(f"Error parsing message due to non matching schema {e}")
return None
async def _process_immediate(self, message: str) -> bool:
def _process_immediate(self, message: str) -> bool:
"""Process a single notification immediately, returning whether to put into the failed queue"""
try:
parsed = self._parse_message(message)
@@ -153,36 +147,42 @@ class NotificationManager(AppService):
return False
event = parsed.event
model = parsed.model
user_email = get_db_client().get_user_email_by_id(event.user_id)
should_send = await self._should_email_user_based_on_preference(
event.user_id, event.type
)
if not user_email:
if event.recipient_email:
recipient_email = event.recipient_email
else:
recipient_email = self.run_and_wait(get_user_email_by_id(event.user_id))
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
should_send = self._should_email_user_based_on_preference(
event.user_id, event.type
)
if not should_send:
logger.debug(
f"User {event.user_id} does not want to receive {event.type} notifications"
)
return True
self.email_sender.send_templated(event.type, user_email, model)
self.email_sender.send_templated(event.type, recipient_email, model)
logger.info(f"Processing notification: {model}")
return True
except Exception as e:
logger.error(f"Error processing notification: {e}")
logger.exception(f"Error processing notification: {e}")
return False
def _run_queue(
self,
queue: aio_pika.abc.AbstractQueue,
process_func: Callable[[str], Coroutine[Any, Any, bool]],
process_func: Callable[[str], bool],
error_queue_name: str,
):
message: aio_pika.abc.AbstractMessage | None = None
try:
# This parameter "no_ack" is named like shit, think of it as "auto_ack"
message = self.run_and_wait(queue.get(timeout=1.0, no_ack=False))
result = self.run_and_wait(process_func(message.body.decode()))
result = process_func(message.body.decode())
if result:
self.run_and_wait(message.ack())
else:
@@ -230,12 +230,3 @@ class NotificationManager(AppService):
"""Cleanup service resources"""
self.running = False
super().cleanup()
# ------- UTILITIES ------- #
@thread_cached
def get_db_client() -> "DatabaseManager":
from backend.executor import DatabaseManager
return get_service_client(DatabaseManager)

View File

@@ -0,0 +1,51 @@
{# Refund Processed Notification Email Template #}
{#
Template variables:
data.user_id: the ID of the user
data.user_name: the user's name
data.user_email: the user's email address
data.transaction_id: the transaction ID for the refund request
data.refund_request_id: the refund request ID
data.reason: the reason for the refund request
data.amount: the refund amount in cents (divide by 100 for dollars)
data.balance: the user's latest balance in cents (after the refund deduction)
Subject: Refund for ${{ data.amount / 100 }} to {{ data.user_name }} has been processed
#}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Refund Processed Notification</title>
</head>
<body style="font-family: 'Poppins', sans-serif; color: #070629; font-size: 16px; line-height: 1.65; margin: 0; padding: 20px;">
<p style="margin-bottom: 10px;">Hello Administrator,</p>
<p style="margin-bottom: 10px;">
This is to notify you that the refund for <strong>${{ data.amount / 100 }}</strong> to <strong>{{ data.user_name }}</strong> has been processed successfully.
</p>
<h2 style="margin-bottom: 10px;">Refund Details</h2>
<ul style="margin-bottom: 10px;">
<li><strong>User ID:</strong> {{ data.user_id }}</li>
<li><strong>User Name:</strong> {{ data.user_name }}</li>
<li><strong>User Email:</strong> {{ data.user_email }}</li>
<li><strong>Transaction ID:</strong> {{ data.transaction_id }}</li>
<li><strong>Refund Request ID:</strong> {{ data.refund_request_id }}</li>
<li><strong>Refund Amount:</strong> ${{ data.amount / 100 }}</li>
<li><strong>Reason for Refund:</strong> {{ data.reason }}</li>
<li><strong>Latest User Balance:</strong> ${{ data.balance / 100 }}</li>
</ul>
<p style="margin-bottom: 10px;">
The user's balance has been updated accordingly after the deduction.
</p>
<p style="margin-bottom: 10px;">
Please contact the support team if you have any questions or need further assistance regarding this refund.
</p>
<p style="margin-bottom: 0;">Best regards,<br>Your Notification System</p>
</body>
</html>

View File

@@ -0,0 +1,72 @@
{# Refund Request Email Template #}
{#
Template variables:
data.user_id: the ID of the user
data.user_name: the user's name
data.user_email: the user's email address
data.transaction_id: the transaction ID for the refund request
data.refund_request_id: the refund request ID
data.reason: the reason for the refund request
data.amount: the refund amount in cents (divide by 100 for dollars)
data.balance: the user's balance in cents (divide by 100 for dollars)
Subject: [ACTION REQUIRED] You got a ${{ data.amount / 100 }} refund request from {{ data.user_name }}
#}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Refund Request Approval Needed</title>
</head>
<body style="font-family: 'Poppins', sans-serif; color: #070629; font-size: 16px; line-height: 1.65; margin: 0; padding: 20px;">
<p style="margin-bottom: 10px;">Hello Administrator,</p>
<p style="margin-bottom: 10px;">
A refund request has been submitted by a user and requires your approval.
</p>
<h2 style="margin-bottom: 10px;">Refund Request Details</h2>
<ul style="margin-bottom: 10px;">
<li><strong>User ID:</strong> {{ data.user_id }}</li>
<li><strong>User Name:</strong> {{ data.user_name }}</li>
<li><strong>User Email:</strong> {{ data.user_email }}</li>
<li><strong>Transaction ID:</strong> {{ data.transaction_id }}</li>
<li><strong>Refund Request ID:</strong> {{ data.refund_request_id }}</li>
<li><strong>Refund Amount:</strong> ${{ data.amount / 100 }}</li>
<li><strong>User Balance:</strong> ${{ data.balance / 100 }}</li>
<li><strong>Reason for Refund:</strong> {{ data.reason }}</li>
</ul>
<p style="margin-bottom: 10px;">
To approve this refund, please click on the following Stripe link:
https://dashboard.stripe.com/test/payments/{{data.transaction_id}}
<br/>
And then click on the "Refund" button.
</p>
<p style="margin-bottom: 10px;">
To reject this refund, please follow these steps:
</p>
<ol style="margin-bottom: 10px;">
<li>
Visit the Supabase Dashboard:
https://supabase.com/dashboard/project/bgwpwdsxblryihinutbx/editor
</li>
<li>
Navigate to the <strong>RefundRequest</strong> table.
</li>
<li>
Filter the <code>transactionKey</code> column with the Transaction ID: <strong>{{ data.transaction_id }}</strong>.
</li>
<li>
Update the <code>status</code> field to <strong>REJECTED</strong> and enter the rejection reason in the <code>result</code> column.
</li>
</ol>
<p style="margin-bottom: 10px;">
Please take the necessary action at your earliest convenience.
</p>
<p style="margin-bottom: 10px;">Thank you for your prompt attention.</p>
<p style="margin-bottom: 0;">Best regards,<br>Your Notification System</p>
</body>
</html>

View File

@@ -97,6 +97,14 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
default=500,
description="Maximum number of credits above the balance to be auto-approved.",
)
refund_notification_email: str = Field(
default="refund@agpt.co",
description="Email address to send refund notifications to.",
)
refund_request_time_key_format: str = Field(
default="%Y-%W", # This will allow for weekly refunds per user.
description="Time key format for refund requests.",
)
model_config = SettingsConfigDict(
env_file=".env",

View File

@@ -0,0 +1,9 @@
-- AlterEnum
-- This migration adds more than one value to an enum.
-- With PostgreSQL versions 11 and earlier, this is not possible
-- in a single migration. This can be worked around by creating
-- multiple migrations, each migration adding only one value to
-- the enum.
ALTER TYPE "NotificationType" ADD VALUE 'REFUND_REQUEST';
ALTER TYPE "NotificationType" ADD VALUE 'REFUND_PROCESSED';

View File

@@ -133,6 +133,8 @@ enum NotificationType {
DAILY_SUMMARY
WEEKLY_SUMMARY
MONTHLY_SUMMARY
REFUND_REQUEST
REFUND_PROCESSED
}
model NotificationEvent {