feat(backend): Introduce late execution check scheduled job (#9914)

Introduce a late execution check scheduled job. The late threshold
duration is configurable.
This initial version only reports the error to Sentry.

### Changes 🏗️

* Added late execution check scheduled job
* Moved the registration of the weekly notification processing job out of the API
call; the scheduler service now invokes it directly.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Manual firing of scheduled job through an exposed API
This commit is contained in:
Zamil Majdy
2025-05-07 12:00:37 +07:00
committed by GitHub
parent 519ad94ec9
commit ac8ef9bdb2
9 changed files with 112 additions and 84 deletions

View File

@@ -284,8 +284,12 @@ class NodeExecutionResult(BaseModel):
async def get_graph_executions(
graph_id: Optional[str] = None,
user_id: Optional[str] = None,
graph_id: str | None = None,
user_id: str | None = None,
statuses: list[ExecutionStatus] | None = None,
created_time_gte: datetime | None = None,
created_time_lte: datetime | None = None,
limit: int | None = None,
) -> list[GraphExecutionMeta]:
where_filter: AgentGraphExecutionWhereInput = {
"isDeleted": False,
@@ -294,10 +298,18 @@ async def get_graph_executions(
where_filter["userId"] = user_id
if graph_id:
where_filter["agentGraphId"] = graph_id
if created_time_gte or created_time_lte:
where_filter["createdAt"] = {
"gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}
if statuses:
where_filter["OR"] = [{"executionStatus": status} for status in statuses]
executions = await AgentGraphExecution.prisma().find_many(
where=where_filter,
order={"createdAt": "desc"},
take=limit,
)
return [GraphExecutionMeta.from_db(execution) for execution in executions]
@@ -641,28 +653,6 @@ async def get_node_executions(
return res
async def get_graph_executions_in_timerange(
    user_id: str, start_time: str, end_time: str
) -> list[GraphExecution]:
    """Return all non-deleted graph executions for *user_id* whose start time
    falls within [start_time, end_time].

    Both bounds are ISO-8601 timestamp strings; a DatabaseError is raised
    (chained to the original exception) on any failure, including unparsable
    timestamps.
    """
    try:
        started_window = {
            "gte": datetime.fromisoformat(start_time),
            "lte": datetime.fromisoformat(end_time),
        }
        rows = await AgentGraphExecution.prisma().find_many(
            where={
                "startedAt": started_window,
                "userId": user_id,
                "isDeleted": False,
            },
            include=GRAPH_EXECUTION_INCLUDE,
        )
        return [GraphExecution.from_db(row) for row in rows]
    except Exception as e:
        raise DatabaseError(
            f"Failed to get executions in timerange {start_time} to {end_time} for user {user_id}: {e}"
        ) from e
async def get_latest_node_execution(
node_id: str, graph_eid: str
) -> NodeExecutionResult | None:

View File

@@ -7,6 +7,7 @@ from backend.data.execution import (
create_graph_execution,
get_graph_execution,
get_graph_execution_meta,
get_graph_executions,
get_latest_node_execution,
get_node_executions,
update_graph_execution_start_time,
@@ -86,6 +87,7 @@ class DatabaseManager(AppService):
# Executions
get_graph_execution = _(get_graph_execution)
get_graph_executions = _(get_graph_executions)
get_graph_execution_meta = _(get_graph_execution_meta)
create_graph_execution = _(create_graph_execution)
get_node_executions = _(get_node_executions)
@@ -142,6 +144,7 @@ class DatabaseManagerClient(AppServiceClient):
# Executions
get_graph_execution = _(d.get_graph_execution)
get_graph_executions = _(d.get_graph_executions)
get_graph_execution_meta = _(d.get_graph_execution_meta)
create_graph_execution = _(d.create_graph_execution)
get_node_executions = _(d.get_node_executions)

View File

@@ -1,5 +1,6 @@
import logging
import os
from datetime import datetime, timedelta, timezone
from enum import Enum
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
@@ -16,8 +17,10 @@ from pydantic import BaseModel
from sqlalchemy import MetaData, create_engine
from backend.data.block import BlockInput
from backend.data.execution import ExecutionStatus
from backend.executor import utils as execution_utils
from backend.notifications.notifications import NotificationManagerClient
from backend.util.metrics import sentry_alert
from backend.util.service import (
AppService,
AppServiceClient,
@@ -82,6 +85,36 @@ def execute_graph(**kwargs):
logger.exception(f"Error executing graph {args.graph_id}: {e}")
class LateExecutionException(Exception):
    """Raised (and reported to Sentry) when graph executions have been stuck in
    the QUEUED state longer than the configured late-notification threshold."""

    pass
def report_late_executions() -> str:
    """Detect graph executions stuck in QUEUED and report them to Sentry.

    Looks back ``execution_late_notification_checkrange_secs`` seconds and
    flags executions that have been QUEUED for longer than
    ``execution_late_notification_threshold_secs`` seconds.

    Returns:
        A human-readable summary string — either "No late executions
        detected." or the alert message that was logged and sent to Sentry.
    """
    # Capture a single timestamp so both window bounds are derived from the
    # same instant (calling datetime.now() twice could skew the window).
    now = datetime.now(timezone.utc)
    late_executions = execution_utils.get_db_client().get_graph_executions(
        statuses=[ExecutionStatus.QUEUED],
        created_time_gte=now
        - timedelta(seconds=config.execution_late_notification_checkrange_secs),
        created_time_lte=now
        - timedelta(seconds=config.execution_late_notification_threshold_secs),
        limit=1000,  # cap the scan; 1000 late executions is already a clear alert
    )
    if not late_executions:
        return "No late executions detected."

    num_late_executions = len(late_executions)
    # Distinct affected users — set comprehension instead of set([...]).
    num_users = len({r.user_id for r in late_executions})
    error = LateExecutionException(
        f"Late executions detected: {num_late_executions} late executions from {num_users} users "
        f"in the last {config.execution_late_notification_checkrange_secs} seconds. "
        f"Graph has been queued for more than {config.execution_late_notification_threshold_secs} seconds. "
        "Please check the executor status."
    )
    logger.error(error)
    sentry_alert(error)
    return str(error)
def process_existing_batches(**kwargs):
args = NotificationJobArgs(**kwargs)
try:
@@ -155,6 +188,9 @@ class NotificationJobInfo(NotificationJobArgs):
class Scheduler(AppService):
scheduler: BlockingScheduler
def __init__(self, register_system_tasks: bool = True):
self.register_system_tasks = register_system_tasks
@classmethod
def get_port(cls) -> int:
return config.execution_scheduler_port
@@ -192,6 +228,37 @@ class Scheduler(AppService):
Jobstores.WEEKLY_NOTIFICATIONS.value: MemoryJobStore(),
}
)
if self.register_system_tasks:
# Notification PROCESS WEEKLY SUMMARY
self.scheduler.add_job(
process_weekly_summary,
CronTrigger.from_crontab("0 * * * *"),
id="process_weekly_summary",
kwargs={},
replace_existing=True,
jobstore=Jobstores.WEEKLY_NOTIFICATIONS.value,
)
# Notification PROCESS EXISTING BATCHES
# self.scheduler.add_job(
# process_existing_batches,
# id="process_existing_batches",
# CronTrigger.from_crontab("0 12 * * 5"),
# replace_existing=True,
# jobstore=Jobstores.BATCHED_NOTIFICATIONS.value,
# )
# Notification LATE EXECUTIONS ALERT
self.scheduler.add_job(
report_late_executions,
id="report_late_executions",
trigger="interval",
replace_existing=True,
seconds=config.execution_late_notification_threshold_secs,
jobstore=Jobstores.EXECUTION.value,
)
self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
self.scheduler.start()
@@ -262,43 +329,16 @@ class Scheduler(AppService):
return schedules
@expose
def add_batched_notification_schedule(
self,
notification_types: list[NotificationType],
data: dict,
cron: str,
) -> NotificationJobInfo:
job_args = NotificationJobArgs(
notification_types=notification_types,
cron=cron,
)
job = self.scheduler.add_job(
process_existing_batches,
CronTrigger.from_crontab(cron),
kwargs=job_args.model_dump(),
replace_existing=True,
jobstore=Jobstores.BATCHED_NOTIFICATIONS.value,
)
log(f"Added job {job.id} with cron schedule '{cron}' input data: {data}")
return NotificationJobInfo.from_db(job_args, job)
def execute_process_existing_batches(self, kwargs: dict):
process_existing_batches(**kwargs)
@expose
def add_weekly_notification_schedule(self, cron: str) -> NotificationJobInfo:
def execute_process_weekly_summary(self):
process_weekly_summary()
job = self.scheduler.add_job(
process_weekly_summary,
CronTrigger.from_crontab(cron),
kwargs={},
replace_existing=True,
jobstore=Jobstores.WEEKLY_NOTIFICATIONS.value,
)
log(f"Added job {job.id} with cron schedule '{cron}'")
return NotificationJobInfo.from_db(
NotificationJobArgs(
cron=cron, notification_types=[NotificationType.WEEKLY_SUMMARY]
),
job,
)
@expose
def execute_report_late_executions(self):
return report_late_executions()
class SchedulerClient(AppServiceClient):
@@ -309,5 +349,3 @@ class SchedulerClient(AppServiceClient):
add_execution_schedule = endpoint_to_async(Scheduler.add_execution_schedule)
delete_schedule = endpoint_to_async(Scheduler.delete_schedule)
get_execution_schedules = endpoint_to_async(Scheduler.get_execution_schedules)
add_batched_notification_schedule = Scheduler.add_batched_notification_schedule
add_weekly_notification_schedule = Scheduler.add_weekly_notification_schedule

View File

@@ -112,13 +112,6 @@ def create_notification_config() -> RabbitMQConfig:
)
@thread_cached
def get_scheduler():
from backend.executor.scheduler import SchedulerClient
return get_service_client(SchedulerClient)
@thread_cached
def get_db():
from backend.executor.database import DatabaseManagerClient
@@ -715,22 +708,6 @@ class NotificationManager(AppService):
logger.info(f"[{self.service_name}] Started notification service")
# Set up scheduler for batch processing of all notification types
# this can be changed later to spawn different cleanups on different schedules
try:
get_scheduler().add_batched_notification_schedule(
notification_types=list(NotificationType),
data={},
cron="0 * * * *",
)
# get_scheduler().add_weekly_notification_schedule(
# # weekly on Friday at 12pm
# cron="0 12 * * 5",
# )
logger.info("Scheduled notification cleanup")
except Exception as e:
logger.error(f"Error scheduling notification cleanup: {e}")
# Set up queue consumers
channel = self.run_and_wait(self.rabbit.get_channel())

View File

@@ -22,3 +22,8 @@ def sentry_init():
),
],
)
def sentry_alert(error: Exception) -> None:
    """Capture *error* as a Sentry event and flush the transport.

    The explicit flush blocks until pending events are sent, so the report is
    delivered even from short-lived contexts (e.g. scheduled jobs) that may
    not outlive the SDK's background worker.
    """
    sentry_sdk.capture_exception(error)
    sentry_sdk.flush()

View File

@@ -121,6 +121,14 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
default=60 * 60 * 24,
description="Time in seconds after which the execution counter is reset.",
)
execution_late_notification_threshold_secs: int = Field(
default=5 * 60,
description="Time in seconds after which the execution stuck on QUEUED status is considered late.",
)
execution_late_notification_checkrange_secs: int = Field(
default=60 * 60,
description="Time in seconds for how far back to check for the late executions.",
)
model_config = SettingsConfigDict(
env_file=".env",

View File

@@ -25,7 +25,7 @@ class SpinTestServer:
self.db_api = DatabaseManager()
self.exec_manager = ExecutionManager()
self.agent_server = AgentServer()
self.scheduler = Scheduler()
self.scheduler = Scheduler(register_system_tasks=False)
self.notif_manager = NotificationManager()
@staticmethod

View File

@@ -0,0 +1,5 @@
-- CreateIndex
-- Supports range filtering/ordering on creation time (e.g. the late-execution
-- check's createdAt gte/lte window query).
CREATE INDEX "AgentGraphExecution_createdAt_idx" ON "AgentGraphExecution"("createdAt");
-- CreateIndex
-- Matching time-based index for node-level executions.
CREATE INDEX "AgentNodeExecution_addedTime_idx" ON "AgentNodeExecution"("addedTime");

View File

@@ -352,6 +352,7 @@ model AgentGraphExecution {
@@index([agentGraphId, agentGraphVersion])
@@index([userId])
@@index([createdAt])
}
// This model describes the execution of an AgentNode.
@@ -378,6 +379,7 @@ model AgentNodeExecution {
@@index([agentGraphExecutionId])
@@index([agentNodeId])
@@index([addedTime])
}
// This model describes the output of an AgentNodeExecution.