feat(backend): batching operations for queues + agent run become batched (#9586)

<!-- Clearly explain the need for these changes: -->
We don't want to spam the user with similar notification types so we
want to group them up over a timespan and handle that as a group of
notifications.
### Changes 🏗️
- Adds a batch queue
- Moves the ExecutionScheduleur to a generic Scheduler
- Makes the Agent run a batch operation
- Fixes various bugs in how we originally made the batch db models and
queries
<!-- Concisely describe all of the changes made in this pull request:
-->

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Run 10 agents back to back
  - [x] Notice how many emails you get
- [x] Wait a bit and you should after an hour (change the cron rule to
speed up testing this) you'll get an email and see all the batches in
your db are empty

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
Nicholas Tindle
2025-03-10 13:10:57 -05:00
committed by GitHub
parent b2e94611e8
commit 08cd935a47
11 changed files with 553 additions and 143 deletions

View File

@@ -32,7 +32,7 @@ def main(**kwargs):
Run all the processes required for the AutoGPT-server (REST and WebSocket APIs).
"""
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.executor import DatabaseManager, ExecutionManager, Scheduler
from backend.notifications import NotificationManager
from backend.server.rest_api import AgentServer
from backend.server.ws_api import WebsocketServer
@@ -40,7 +40,7 @@ def main(**kwargs):
run_processes(
DatabaseManager(),
ExecutionManager(),
ExecutionScheduler(),
Scheduler(),
NotificationManager(),
WebsocketServer(),
AgentServer(),

View File

@@ -23,8 +23,8 @@ T_co = TypeVar("T_co", bound="BaseNotificationData", covariant=True)
class QueueType(Enum):
IMMEDIATE = "immediate" # Send right away (errors, critical notifications)
HOURLY = "hourly" # Batch for up to an hour (usage reports)
DAILY = "daily" # Daily digest (summary notifications)
BATCH = "batch" # Batch for up to an hour (usage reports)
SUMMARY = "summary" # Daily digest (summary notifications)
BACKOFF = "backoff" # Backoff strategy (exponential backoff)
ADMIN = "admin" # Admin notifications (errors, critical notifications)
@@ -196,15 +196,15 @@ class NotificationTypeOverride:
def strategy(self) -> QueueType:
BATCHING_RULES = {
# These are batched by the notification service
NotificationType.AGENT_RUN: QueueType.IMMEDIATE,
NotificationType.AGENT_RUN: QueueType.BATCH,
# These are batched by the notification service, but with a backoff strategy
NotificationType.ZERO_BALANCE: QueueType.BACKOFF,
NotificationType.LOW_BALANCE: QueueType.IMMEDIATE,
NotificationType.BLOCK_EXECUTION_FAILED: QueueType.BACKOFF,
NotificationType.CONTINUOUS_AGENT_ERROR: QueueType.BACKOFF,
NotificationType.DAILY_SUMMARY: QueueType.DAILY,
NotificationType.WEEKLY_SUMMARY: QueueType.DAILY,
NotificationType.MONTHLY_SUMMARY: QueueType.DAILY,
NotificationType.DAILY_SUMMARY: QueueType.SUMMARY,
NotificationType.WEEKLY_SUMMARY: QueueType.SUMMARY,
NotificationType.MONTHLY_SUMMARY: QueueType.SUMMARY,
NotificationType.REFUND_REQUEST: QueueType.ADMIN,
NotificationType.REFUND_PROCESSED: QueueType.ADMIN,
}
@@ -263,7 +263,7 @@ class NotificationPreference(BaseModel):
def get_batch_delay(notification_type: NotificationType) -> timedelta:
return {
NotificationType.AGENT_RUN: timedelta(seconds=1),
NotificationType.AGENT_RUN: timedelta(minutes=1),
NotificationType.ZERO_BALANCE: timedelta(minutes=60),
NotificationType.LOW_BALANCE: timedelta(minutes=60),
NotificationType.BLOCK_EXECUTION_FAILED: timedelta(minutes=60),
@@ -274,19 +274,15 @@ def get_batch_delay(notification_type: NotificationType) -> timedelta:
async def create_or_add_to_user_notification_batch(
user_id: str,
notification_type: NotificationType,
data: str, # type: 'NotificationEventModel'
) -> dict:
notification_data: NotificationEventModel,
) -> UserNotificationBatch:
try:
logger.info(
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {data}"
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {notification_data}"
)
notification_data = NotificationEventModel[
get_data_type(notification_type)
].model_validate_json(data)
# Serialize the data
json_data: Json = Json(notification_data.data.model_dump_json())
json_data: Json = Json(notification_data.data.model_dump())
# First try to find existing batch
existing_batch = await UserNotificationBatch.prisma().find_unique(
@@ -317,7 +313,7 @@ async def create_or_add_to_user_notification_batch(
},
include={"notifications": True},
)
return resp.model_dump()
return resp
else:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
@@ -339,27 +335,28 @@ async def create_or_add_to_user_notification_batch(
raise DatabaseError(
f"Failed to add notification event {notification_event.id} to existing batch {existing_batch.id}"
)
return resp.model_dump()
return resp
except Exception as e:
raise DatabaseError(
f"Failed to create or add to notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_user_notification_last_message_in_batch(
async def get_user_notification_oldest_message_in_batch(
user_id: str,
notification_type: NotificationType,
) -> NotificationEvent | None:
try:
batch = await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
order={"createdAt": "desc"},
include={"notifications": True},
)
if not batch:
return None
if not batch.notifications:
return None
return batch.notifications[-1]
sorted_notifications = sorted(batch.notifications, key=lambda x: x.createdAt)
return sorted_notifications[0]
except Exception as e:
raise DatabaseError(
f"Failed to get user notification last message in batch for user {user_id} and type {notification_type}: {e}"
@@ -404,3 +401,22 @@ async def get_user_notification_batch(
raise DatabaseError(
f"Failed to get user notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_all_batches_by_type(
notification_type: NotificationType,
) -> list[UserNotificationBatch]:
try:
return await UserNotificationBatch.prisma().find_many(
where={
"type": notification_type,
"notifications": {
"some": {} # Only return batches with at least one notification
},
},
include={"notifications": True},
)
except Exception as e:
raise DatabaseError(
f"Failed to get all batches by type {notification_type}: {e}"
) from e

View File

@@ -1,9 +1,9 @@
from .database import DatabaseManager
from .manager import ExecutionManager
from .scheduler import ExecutionScheduler
from .scheduler import Scheduler
__all__ = [
"DatabaseManager",
"ExecutionManager",
"ExecutionScheduler",
"Scheduler",
]

View File

@@ -1,5 +1,6 @@
import logging
import os
from enum import Enum
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
@@ -9,11 +10,13 @@ from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from autogpt_libs.utils.cache import thread_cached
from dotenv import load_dotenv
from prisma.enums import NotificationType
from pydantic import BaseModel
from sqlalchemy import MetaData, create_engine
from backend.data.block import BlockInput
from backend.executor.manager import ExecutionManager
from backend.notifications.notifications import NotificationManager
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Config
@@ -42,7 +45,7 @@ config = Config()
def log(msg, **kwargs):
logger.info("[ExecutionScheduler] " + msg, **kwargs)
logger.info("[Scheduler] " + msg, **kwargs)
def job_listener(event):
@@ -58,8 +61,15 @@ def get_execution_client() -> ExecutionManager:
return get_service_client(ExecutionManager)
@thread_cached
def get_notification_client():
from backend.notifications import NotificationManager
return get_service_client(NotificationManager)
def execute_graph(**kwargs):
args = JobArgs(**kwargs)
args = ExecutionJobArgs(**kwargs)
try:
log(f"Executing recurring job for graph #{args.graph_id}")
get_execution_client().add_execution(
@@ -72,7 +82,23 @@ def execute_graph(**kwargs):
logger.exception(f"Error executing graph {args.graph_id}: {e}")
class JobArgs(BaseModel):
def process_existing_batches(**kwargs):
args = NotificationJobArgs(**kwargs)
try:
log(
f"Processing existing batches for notification type {args.notification_types}"
)
get_notification_client().process_existing_batches(args.notification_types)
except Exception as e:
logger.exception(f"Error processing existing batches: {e}")
class Jobstores(Enum):
EXECUTION = "execution"
BATCHED_NOTIFICATIONS = "batched_notifications"
class ExecutionJobArgs(BaseModel):
graph_id: str
input_data: BlockInput
user_id: str
@@ -80,14 +106,14 @@ class JobArgs(BaseModel):
cron: str
class JobInfo(JobArgs):
class ExecutionJobInfo(ExecutionJobArgs):
id: str
name: str
next_run_time: str
@staticmethod
def from_db(job_args: JobArgs, job_obj: JobObj) -> "JobInfo":
return JobInfo(
def from_db(job_args: ExecutionJobArgs, job_obj: JobObj) -> "ExecutionJobInfo":
return ExecutionJobInfo(
id=job_obj.id,
name=job_obj.name,
next_run_time=job_obj.next_run_time.isoformat(),
@@ -95,7 +121,29 @@ class JobInfo(JobArgs):
)
class ExecutionScheduler(AppService):
class NotificationJobArgs(BaseModel):
notification_types: list[NotificationType]
cron: str
class NotificationJobInfo(NotificationJobArgs):
id: str
name: str
next_run_time: str
@staticmethod
def from_db(
job_args: NotificationJobArgs, job_obj: JobObj
) -> "NotificationJobInfo":
return NotificationJobInfo(
id=job_obj.id,
name=job_obj.name,
next_run_time=job_obj.next_run_time.isoformat(),
**job_args.model_dump(),
)
class Scheduler(AppService):
scheduler: BlockingScheduler
@classmethod
@@ -111,19 +159,36 @@ class ExecutionScheduler(AppService):
def execution_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)
@property
@thread_cached
def notification_client(self) -> NotificationManager:
return get_service_client(NotificationManager)
def run_service(self):
load_dotenv()
db_schema, db_url = _extract_schema_from_url(os.getenv("DATABASE_URL"))
self.scheduler = BlockingScheduler(
jobstores={
"default": SQLAlchemyJobStore(
Jobstores.EXECUTION.value: SQLAlchemyJobStore(
engine=create_engine(
url=db_url,
pool_size=self.db_pool_size(),
max_overflow=0,
),
metadata=MetaData(schema=db_schema),
)
# this one is pre-existing so it keeps the
# default table name.
tablename="apscheduler_jobs",
),
Jobstores.BATCHED_NOTIFICATIONS.value: SQLAlchemyJobStore(
engine=create_engine(
url=db_url,
pool_size=self.db_pool_size(),
max_overflow=0,
),
metadata=MetaData(schema=db_schema),
tablename="apscheduler_jobs_batched_notifications",
),
}
)
self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
@@ -137,8 +202,8 @@ class ExecutionScheduler(AppService):
cron: str,
input_data: BlockInput,
user_id: str,
) -> JobInfo:
job_args = JobArgs(
) -> ExecutionJobInfo:
job_args = ExecutionJobArgs(
graph_id=graph_id,
input_data=input_data,
user_id=user_id,
@@ -150,37 +215,59 @@ class ExecutionScheduler(AppService):
CronTrigger.from_crontab(cron),
kwargs=job_args.model_dump(),
replace_existing=True,
jobstore=Jobstores.EXECUTION.value,
)
log(f"Added job {job.id} with cron schedule '{cron}' input data: {input_data}")
return JobInfo.from_db(job_args, job)
return ExecutionJobInfo.from_db(job_args, job)
@expose
def delete_schedule(self, schedule_id: str, user_id: str) -> JobInfo:
job = self.scheduler.get_job(schedule_id)
def delete_schedule(self, schedule_id: str, user_id: str) -> ExecutionJobInfo:
job = self.scheduler.get_job(schedule_id, jobstore=Jobstores.EXECUTION.value)
if not job:
log(f"Job {schedule_id} not found.")
raise ValueError(f"Job #{schedule_id} not found.")
job_args = JobArgs(**job.kwargs)
job_args = ExecutionJobArgs(**job.kwargs)
if job_args.user_id != user_id:
raise ValueError("User ID does not match the job's user ID.")
log(f"Deleting job {schedule_id}")
job.remove()
return JobInfo.from_db(job_args, job)
return ExecutionJobInfo.from_db(job_args, job)
@expose
def get_execution_schedules(
self, graph_id: str | None = None, user_id: str | None = None
) -> list[JobInfo]:
) -> list[ExecutionJobInfo]:
schedules = []
for job in self.scheduler.get_jobs():
job_args = JobArgs(**job.kwargs)
for job in self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value):
job_args = ExecutionJobArgs(**job.kwargs)
if (
job.next_run_time is not None
and (graph_id is None or job_args.graph_id == graph_id)
and (user_id is None or job_args.user_id == user_id)
):
schedules.append(JobInfo.from_db(job_args, job))
schedules.append(ExecutionJobInfo.from_db(job_args, job))
return schedules
@expose
def add_batched_notification_schedule(
self,
notification_types: list[NotificationType],
data: dict,
cron: str,
) -> NotificationJobInfo:
job_args = NotificationJobArgs(
notification_types=notification_types,
cron=cron,
)
job = self.scheduler.add_job(
process_existing_batches,
CronTrigger.from_crontab(cron),
kwargs=job_args.model_dump(),
replace_existing=True,
jobstore=Jobstores.BATCHED_NOTIFICATIONS.value,
)
log(f"Added job {job.id} with cron schedule '{cron}' input data: {data}")
return NotificationJobInfo.from_db(job_args, job)

View File

@@ -60,15 +60,21 @@ class EmailSender:
base_url = (
settings.config.frontend_base_url or settings.config.platform_base_url
)
# Handle the case when data is a list
template_data = data
if isinstance(data, list):
# Create a dictionary with a 'notifications' key containing the list
template_data = {"notifications": data}
try:
subject, full_message = self.formatter.format_email(
base_template=template.base_template,
subject_template=template.subject_template,
content_template=template.body_template,
data=data,
data=template_data,
unsubscribe_link=f"{base_url}/profile/settings",
)
except Exception as e:
logger.error(f"Error formatting full message: {e}")
raise e

View File

@@ -1,9 +1,11 @@
import logging
import time
from datetime import datetime, timezone
from typing import Callable
import aio_pika
from aio_pika.exceptions import QueueEmpty
from autogpt_libs.utils.cache import thread_cached
from prisma.enums import NotificationType
from pydantic import BaseModel
@@ -12,7 +14,13 @@ from backend.data.notifications import (
NotificationEventModel,
NotificationResult,
QueueType,
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_all_batches_by_type,
get_batch_delay,
get_data_type,
get_user_notification_batch,
get_user_notification_oldest_message_in_batch,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.user import (
@@ -22,7 +30,7 @@ from backend.data.user import (
get_user_notification_preference,
)
from backend.notifications.email import EmailSender
from backend.util.service import AppService, expose
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
@@ -60,6 +68,16 @@ def create_notification_config() -> RabbitMQConfig:
"x-dead-letter-routing-key": "failed.admin",
},
),
# Batch Queue
Queue(
name="batch_notifications",
exchange=notification_exchange,
routing_key="notification.batch.#",
arguments={
"x-dead-letter-exchange": dead_letter_exchange.name,
"x-dead-letter-routing-key": "failed.batch",
},
),
# Failed notifications queue
Queue(
name="failed_notifications",
@@ -77,6 +95,13 @@ def create_notification_config() -> RabbitMQConfig:
)
@thread_cached
def get_scheduler():
from backend.executor import Scheduler
return get_service_client(Scheduler)
class NotificationManager(AppService):
"""Service for handling notifications with batching support"""
@@ -99,12 +124,134 @@ class NotificationManager(AppService):
return f"notification.backoff.{event.type.value}"
elif event.strategy == QueueType.ADMIN:
return f"notification.admin.{event.type.value}"
elif event.strategy == QueueType.HOURLY:
return f"notification.hourly.{event.type.value}"
elif event.strategy == QueueType.DAILY:
return f"notification.daily.{event.type.value}"
elif event.strategy == QueueType.BATCH:
return f"notification.batch.{event.type.value}"
elif event.strategy == QueueType.SUMMARY:
return f"notification.summary.{event.type.value}"
return f"notification.{event.type.value}"
@expose
def process_existing_batches(self, notification_types: list[NotificationType]):
"""Process existing batches for specified notification types"""
try:
processed_count = 0
current_time = datetime.now(tz=timezone.utc)
for notification_type in notification_types:
# Get all batches for this notification type
batches = self.run_and_wait(get_all_batches_by_type(notification_type))
for batch in batches:
# Check if batch has aged out
oldest_message = self.run_and_wait(
get_user_notification_oldest_message_in_batch(
batch.userId, notification_type
)
)
if not oldest_message:
# this should never happen
logger.error(
f"Batch for user {batch.userId} and type {notification_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
)
continue
max_delay = get_batch_delay(notification_type)
# If batch has aged out, process it
if oldest_message.createdAt + max_delay < current_time:
recipient_email = self.run_and_wait(
get_user_email_by_id(batch.userId)
)
if not recipient_email:
logger.error(
f"User email not found for user {batch.userId}"
)
continue
should_send = self._should_email_user_based_on_preference(
batch.userId, notification_type
)
if not should_send:
logger.debug(
f"User {batch.userId} does not want to receive {notification_type} notifications"
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
)
continue
batch_data = self.run_and_wait(
get_user_notification_batch(batch.userId, notification_type)
)
if not batch_data or not batch_data.notifications:
logger.error(
f"Batch data not found for user {batch.userId}"
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
)
continue
unsub_link = generate_unsubscribe_link(batch.userId)
events = [
NotificationEventModel[
get_data_type(db_event.type)
].model_validate(
{
"user_id": batch.userId,
"type": db_event.type,
"data": db_event.data,
"created_at": db_event.createdAt,
}
)
for db_event in batch_data.notifications
]
logger.info(f"{events=}")
self.email_sender.send_templated(
notification=notification_type,
user_email=recipient_email,
data=events,
user_unsub_link=unsub_link,
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
)
processed_count += 1
logger.info(f"Processed {processed_count} aged batches")
return {
"success": True,
"processed_count": processed_count,
"notification_types": [nt.value for nt in notification_types],
"timestamp": current_time.isoformat(),
}
except Exception as e:
logger.exception(f"Error processing batches: {e}")
return {
"success": False,
"error": str(e),
"notification_types": [nt.value for nt in notification_types],
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
}
@expose
def queue_notification(self, event: NotificationEventDTO) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
@@ -152,6 +299,32 @@ class NotificationManager(AppService):
# only if both are true, should we email this person
return validated_email and preference
async def _should_batch(
self, user_id: str, event_type: NotificationType, event: NotificationEventModel
) -> bool:
await create_or_add_to_user_notification_batch(user_id, event_type, event)
oldest_message = await get_user_notification_oldest_message_in_batch(
user_id, event_type
)
if not oldest_message:
logger.error(
f"Batch for user {user_id} and type {event_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
)
return False
oldest_age = oldest_message.createdAt
max_delay = get_batch_delay(event_type)
if oldest_age + max_delay < datetime.now(tz=timezone.utc):
logger.info(f"Batch for user {user_id} and type {event_type} is old enough")
return True
logger.info(
f"Batch for user {user_id} and type {event_type} is not old enough: {oldest_age + max_delay} < {datetime.now(tz=timezone.utc)} max_delay={max_delay}"
)
return False
def _parse_message(self, message: str) -> NotificationEvent | None:
try:
event = NotificationEventDTO.model_validate_json(message)
@@ -216,6 +389,70 @@ class NotificationManager(AppService):
logger.exception(f"Error processing notification for immediate queue: {e}")
return False
def _process_batch(self, message: str) -> bool:
"""Process a single notification with a batching strategy, returning whether to put into the failed queue"""
try:
parsed = self._parse_message(message)
if not parsed:
return False
event = parsed.event
model = parsed.model
logger.info(f"Processing batch notification: {model}")
recipient_email = self.run_and_wait(get_user_email_by_id(event.user_id))
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
should_send = self._should_email_user_based_on_preference(
event.user_id, event.type
)
if not should_send:
logger.info(
f"User {event.user_id} does not want to receive {event.type} notifications"
)
return True
should_send = self.run_and_wait(
self._should_batch(event.user_id, event.type, model)
)
if not should_send:
logger.info("Batch not old enough to send")
return False
batch = self.run_and_wait(
get_user_notification_batch(event.user_id, event.type)
)
if not batch or not batch.notifications:
logger.error(f"Batch not found for user {event.user_id}")
return False
unsub_link = generate_unsubscribe_link(event.user_id)
batch_messages = [
NotificationEventModel[get_data_type(db_event.type)].model_validate(
{
"user_id": event.user_id,
"type": db_event.type,
"data": db_event.data,
"created_at": db_event.createdAt,
}
)
for db_event in batch.notifications
]
self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=batch_messages,
user_unsub_link=unsub_link,
)
# only empty the batch if we sent the email successfully
self.run_and_wait(empty_user_notification_batch(event.user_id, event.type))
return True
except Exception as e:
logger.exception(f"Error processing notification for batch queue: {e}")
return False
def _run_queue(
self,
queue: aio_pika.abc.AbstractQueue,
@@ -248,12 +485,25 @@ class NotificationManager(AppService):
def run_service(self):
logger.info(f"[{self.service_name}] Started notification service")
# Set up scheduler for batch processing of all notification types
# this can be changed later to spawn differnt cleanups on different schedules
try:
get_scheduler().add_batched_notification_schedule(
notification_types=list(NotificationType),
data={},
cron="0 * * * *",
)
logger.info("Scheduled notification cleanup")
except Exception as e:
logger.error(f"Error scheduling notification cleanup: {e}")
# Set up queue consumers
channel = self.run_and_wait(self.rabbit.get_channel())
immediate_queue = self.run_and_wait(
channel.get_queue("immediate_notifications")
)
batch_queue = self.run_and_wait(channel.get_queue("batch_notifications"))
admin_queue = self.run_and_wait(channel.get_queue("admin_notifications"))
@@ -269,6 +519,11 @@ class NotificationManager(AppService):
process_func=self._process_admin_message,
error_queue_name="admin_notifications",
)
self._run_queue(
queue=batch_queue,
process_func=self._process_batch,
error_queue_name="batch_notifications",
)
time.sleep(0.1)

View File

@@ -1,5 +1,6 @@
{# Agent Run #}
{# Template variables:
notification.data: the stuff below but a list of them
data.agent_name: the name of the agent
data.credits_used: the number of credits used by the agent
data.node_count: the number of nodes the agent ran on
@@ -7,90 +8,135 @@ data.execution_time: the time it took to run the agent
data.graph_id: the id of the graph the agent ran on
data.outputs: the list of outputs of the agent
#}
<p style="
font-family: 'Poppins', sans-serif;
color: #070629;
font-size: 16px;
line-height: 165%;
margin-top: 0;
margin-bottom: 10px;
">
Your agent, <strong>{{ data.agent_name }}</strong>, has completed its run!
</p>
<p style="
font-family: 'Poppins', sans-serif;
color: #070629;
font-size: 16px;
line-height: 165%;
margin-top: 0;
margin-bottom: 20px;
padding-left: 20px;
">
<p style="margin-bottom: 10px;"><strong>Time Taken:</strong> {{ data.execution_time | int }} seconds</p>
<p style="margin-bottom: 10px;"><strong>Nodes Used:</strong> {{ data.node_count }}</p>
<p style="margin-bottom: 10px;"><strong>Cost:</strong> ${{ "{:.2f}".format((data.credits_used|float)/100) }}</p>
</p>
{% if data.outputs and data.outputs|length > 0 %}
<div style="
margin-left: 15px;
margin-bottom: 20px;
">
<p style="
font-family: 'Poppins', sans-serif;
color: #070629;
font-weight: 600;
font-size: 16px;
margin-bottom: 10px;
">
Results:
</p>
{% for output in data.outputs %}
<div style="
margin-left: 15px;
margin-bottom: 15px;
">
<p style="
font-family: 'Poppins', sans-serif;
color: #5D23BB;
font-weight: 500;
font-size: 16px;
margin-top: 0;
margin-bottom: 8px;
">
{{ output.name }}
{% if notifications is defined %}
{# BATCH MODE #}
<div style="font-family: 'Poppins', sans-serif; color: #070629;">
<h2 style="color: #5D23BB; margin-bottom: 15px;">Agent Run Summary</h2>
<p style="font-size: 16px; line-height: 165%; margin-top: 0; margin-bottom: 15px;">
<strong>{{ notifications|length }}</strong> agent runs have completed!
</p>
{% for key, value in output.items() %}
{% if key != 'name' %}
<div style="
margin-left: 15px;
background-color: #f5f5ff;
padding: 8px 12px;
border-radius: 4px;
font-family: 'Roboto Mono', monospace;
white-space: pre-wrap;
word-break: break-word;
overflow-wrap: break-word;
max-width: 100%;
overflow-x: auto;
margin-top: 5px;
margin-bottom: 10px;
line-height: 1.4;
">
{% if value is iterable and value is not string %}
{% if value|length == 1 %}
{{ value[0] }}
{% else %}
[{% for item in value %}{{ item }}{% if not loop.last %}, {% endif %}{% endfor %}]
{% endif %}
{% else %}
{{ value }}
{% endif %}
</div>
{# Calculate summary stats #}
{% set total_time = 0 %}
{% set total_nodes = 0 %}
{% set total_credits = 0 %}
{% set agent_names = [] %}
{% for notification in notifications %}
{% set total_time = total_time + notification.data.execution_time %}
{% set total_nodes = total_nodes + notification.data.node_count %}
{% set total_credits = total_credits + notification.data.credits_used %}
{% if notification.data.agent_name not in agent_names %}
{% set agent_names = agent_names + [notification.data.agent_name] %}
{% endif %}
{% endfor %}
<div style="background-color: #f8f7ff; border-radius: 8px; padding: 15px; margin-bottom: 25px;">
<h3 style="margin-top: 0; margin-bottom: 10px; color: #5D23BB;">Summary</h3>
<p style="margin: 5px 0;"><strong>Agents:</strong> {{ agent_names|join(", ") }}</p>
<p style="margin: 5px 0;"><strong>Total Time:</strong> {{ total_time | int }} seconds</p>
<p style="margin: 5px 0;"><strong>Total Nodes:</strong> {{ total_nodes }}</p>
<p style="margin: 5px 0;"><strong>Total Cost:</strong> ${{ "{:.2f}".format((total_credits|float)/100) }}</p>
</div>
<h3 style="margin-top: 25px; margin-bottom: 15px; color: #5D23BB;">Individual Runs</h3>
{% for notification in notifications %}
<div style="margin-bottom: 30px; border-left: 3px solid #5D23BB; padding-left: 15px;">
<p style="font-size: 16px; font-weight: 600; margin-top: 0; margin-bottom: 10px;">
Agent: <strong>{{ notification.data.agent_name }}</strong>
</p>
<div style="margin-left: 10px;">
<p style="margin: 5px 0;"><strong>Time:</strong> {{ notification.data.execution_time | int }} seconds</p>
<p style="margin: 5px 0;"><strong>Nodes:</strong> {{ notification.data.node_count }}</p>
<p style="margin: 5px 0;"><strong>Cost:</strong> ${{ "{:.2f}".format((notification.data.credits_used|float)/100) }}</p>
</div>
{% if notification.data.outputs and notification.data.outputs|length > 0 %}
<div style="margin-left: 10px; margin-top: 15px;">
<p style="font-weight: 600; margin-bottom: 10px;">Results:</p>
{% for output in notification.data.outputs %}
<div style="margin-left: 10px; margin-bottom: 12px;">
<p style="color: #5D23BB; font-weight: 500; margin-top: 0; margin-bottom: 5px;">
{{ output.name }}
</p>
{% for key, value in output.items() %}
{% if key != 'name' %}
<div style="margin-left: 10px; background-color: #f5f5ff; padding: 8px 12px; border-radius: 4px;
font-family: 'Roboto Mono', monospace; white-space: pre-wrap; word-break: break-word;
overflow-wrap: break-word; max-width: 100%; overflow-x: auto; margin-top: 3px;
margin-bottom: 8px; line-height: 1.4;">
{% if value is iterable and value is not string %}
{% if value|length == 1 %}
{{ value[0] }}
{% else %}
[{% for item in value %}{{ item }}{% if not loop.last %}, {% endif %}{% endfor %}]
{% endif %}
{% else %}
{{ value }}
{% endif %}
</div>
{% endif %}
{% endfor %}
</div>
{% endfor %}
</div>
{% endif %}
</div>
{% endfor %}
</div>
{% endfor %}
</div>
{% else %}
{# SINGLE NOTIFICATION MODE - Original template #}
<p style="font-family: 'Poppins', sans-serif; color: #070629; font-size: 16px; line-height: 165%;
margin-top: 0; margin-bottom: 10px;">
Your agent, <strong>{{ data.agent_name }}</strong>, has completed its run!
</p>
<p style="font-family: 'Poppins', sans-serif; color: #070629; font-size: 16px; line-height: 165%;
margin-top: 0; margin-bottom: 20px; padding-left: 20px;">
<p style="margin-bottom: 10px;"><strong>Time Taken:</strong> {{ data.execution_time | int }} seconds</p>
<p style="margin-bottom: 10px;"><strong>Nodes Used:</strong> {{ data.node_count }}</p>
<p style="margin-bottom: 10px;"><strong>Cost:</strong> ${{ "{:.2f}".format((data.credits_used|float)/100) }}</p>
</p>
{% if data.outputs and data.outputs|length > 0 %}
<div style="margin-left: 15px; margin-bottom: 20px;">
<p style="font-family: 'Poppins', sans-serif; color: #070629; font-weight: 600;
font-size: 16px; margin-bottom: 10px;">
Results:
</p>
{% for output in data.outputs %}
<div style="margin-left: 15px; margin-bottom: 15px;">
<p style="font-family: 'Poppins', sans-serif; color: #5D23BB; font-weight: 500;
font-size: 16px; margin-top: 0; margin-bottom: 8px;">
{{ output.name }}
</p>
{% for key, value in output.items() %}
{% if key != 'name' %}
<div style="margin-left: 15px; background-color: #f5f5ff; padding: 8px 12px; border-radius: 4px;
font-family: 'Roboto Mono', monospace; white-space: pre-wrap; word-break: break-word;
overflow-wrap: break-word; max-width: 100%; overflow-x: auto; margin-top: 5px;
margin-bottom: 10px; line-height: 1.4;">
{% if value is iterable and value is not string %}
{% if value|length == 1 %}
{{ value[0] }}
{% else %}
[{% for item in value %}{{ item }}{% if not loop.last %}, {% endif %}{% endfor %}]
{% endif %}
{% else %}
{{ value }}
{% endif %}
</div>
{% endif %}
{% endfor %}
</div>
{% endfor %}
</div>
{% endif %}
{% endif %}

View File

@@ -1,5 +1,5 @@
from backend.app import run_processes
from backend.executor import DatabaseManager, ExecutionScheduler
from backend.executor import DatabaseManager, Scheduler
from backend.notifications.notifications import NotificationManager
from backend.server.rest_api import AgentServer
@@ -11,7 +11,7 @@ def main():
run_processes(
NotificationManager(),
DatabaseManager(),
ExecutionScheduler(),
Scheduler(),
AgentServer(),
)

View File

@@ -55,7 +55,7 @@ from backend.data.user import (
update_user_email,
update_user_notification_preference,
)
from backend.executor import ExecutionManager, ExecutionScheduler, scheduler
from backend.executor import ExecutionManager, Scheduler, scheduler
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.webhooks.graph_lifecycle_hooks import (
on_graph_activate,
@@ -84,8 +84,8 @@ def execution_manager_client() -> ExecutionManager:
@thread_cached
def execution_scheduler_client() -> ExecutionScheduler:
return get_service_client(ExecutionScheduler)
def execution_scheduler_client() -> Scheduler:
return get_service_client(Scheduler)
settings = Settings()
@@ -701,7 +701,7 @@ class ScheduleCreationRequest(pydantic.BaseModel):
async def create_schedule(
user_id: Annotated[str, Depends(get_user_id)],
schedule: ScheduleCreationRequest,
) -> scheduler.JobInfo:
) -> scheduler.ExecutionJobInfo:
graph = await graph_db.get_graph(
schedule.graph_id, schedule.graph_version, user_id=user_id
)
@@ -743,7 +743,7 @@ def delete_schedule(
def get_execution_schedules(
user_id: Annotated[str, Depends(get_user_id)],
graph_id: str | None = None,
) -> list[scheduler.JobInfo]:
) -> list[scheduler.ExecutionJobInfo]:
return execution_scheduler_client().get_execution_schedules(
user_id=user_id,
graph_id=graph_id,

View File

@@ -8,7 +8,7 @@ from backend.data.block import Block, BlockSchema, initialize_blocks
from backend.data.execution import ExecutionResult, ExecutionStatus
from backend.data.model import _BaseCredentials
from backend.data.user import create_default_user
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.executor import DatabaseManager, ExecutionManager, Scheduler
from backend.notifications.notifications import NotificationManager
from backend.server.rest_api import AgentServer
from backend.server.utils import get_user_id
@@ -21,7 +21,7 @@ class SpinTestServer:
self.db_api = DatabaseManager()
self.exec_manager = ExecutionManager()
self.agent_server = AgentServer()
self.scheduler = ExecutionScheduler()
self.scheduler = Scheduler()
self.notif_manager = NotificationManager()
@staticmethod

View File

@@ -1,7 +1,7 @@
import pytest
from backend.data import db
from backend.executor import ExecutionScheduler
from backend.executor import Scheduler
from backend.server.model import CreateGraph
from backend.usecases.sample import create_test_graph, create_test_user
from backend.util.service import get_service_client
@@ -17,7 +17,7 @@ async def test_agent_schedule(server: SpinTestServer):
user_id=test_user.id,
)
scheduler = get_service_client(ExecutionScheduler)
scheduler = get_service_client(Scheduler)
schedules = scheduler.get_execution_schedules(test_graph.id, test_user.id)
assert len(schedules) == 0