mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
feat(backend): basic email servicing
This commit is contained in:
@@ -1,10 +1,69 @@
|
||||
import logging
|
||||
import pathlib
|
||||
|
||||
from backend.util.text import TextFormatter
|
||||
from postmarker.core import PostmarkClient
|
||||
from postmarker.models.emails import EmailManager
|
||||
from prisma.enums import NotificationType
|
||||
|
||||
from backend.data.notifications import (
|
||||
NotificationEventModel,
|
||||
NotificationTypeOverride,
|
||||
T_co,
|
||||
)
|
||||
from backend.util.settings import Settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
settings = Settings()
|
||||
|
||||
|
||||
class AsyncEmailSender:
|
||||
def send_email(self, user_id: str, subject: str, body: str):
|
||||
# The following is a workaround to get the type checker to recognize the EmailManager type
|
||||
# This is a temporary solution and should be removed once the Postmark library is updated
|
||||
# to support type annotations.
|
||||
class TypedPostmarkClient(PostmarkClient):
|
||||
emails: EmailManager
|
||||
|
||||
|
||||
class EmailSender:
|
||||
def __init__(self):
|
||||
self.postmark = TypedPostmarkClient(
|
||||
server_token=settings.secrets.postmark_server_api_token
|
||||
)
|
||||
self.formatter = TextFormatter()
|
||||
|
||||
def send_templated(
|
||||
self,
|
||||
notification: NotificationType,
|
||||
user_id: str,
|
||||
data: NotificationEventModel[T_co] | list[NotificationEventModel[T_co]],
|
||||
):
|
||||
body = self._get_template(notification)
|
||||
# use the jinja2 library to render the template
|
||||
body = self.formatter.format_string(body, data)
|
||||
logger.info(
|
||||
f"Sending email to {user_id} with subject {"subject"} and body {body}"
|
||||
)
|
||||
self._send_email(user_id, "subject", body)
|
||||
|
||||
def _get_template(self, notification: NotificationType):
|
||||
# convert the notification type to a notification type override
|
||||
notification_type_override = NotificationTypeOverride(notification)
|
||||
# find the template in templates/name.html (the .template returns with the .html)
|
||||
template_path = f"templates/{notification_type_override.template}.jinja2"
|
||||
logger.info(
|
||||
f"Template full path: {pathlib.Path(__file__).parent / template_path}"
|
||||
)
|
||||
with open(pathlib.Path(__file__).parent / template_path, "r") as file:
|
||||
template = file.read()
|
||||
return template
|
||||
|
||||
def _send_email(self, user_id: str, subject: str, body: str):
|
||||
logger.info(
|
||||
f"Sending email to {user_id} with subject {subject} and body {body}"
|
||||
)
|
||||
self.postmark.emails.send(
|
||||
From="nicholas.tindle@agpt.co",
|
||||
To="nicholas.tindle@agpt.co",
|
||||
Subject=subject,
|
||||
HtmlBody=body,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user