## Summary

Routes `increment_onboarding_runs` and `cleanup_expired_oauth_tokens` through the DatabaseManager RPC client instead of calling Prisma directly.

## Problem

The Scheduler service never connects its Prisma client. While `add_graph_execution()` in `utils.py` has a fallback that routes through DatabaseManager when Prisma isn't connected, subsequent calls in the scheduler were still hitting Prisma directly:

- `increment_onboarding_runs()` after a successful graph execution
- `cleanup_expired_oauth_tokens()` in the scheduled cleanup job

These calls threw `ClientNotConnectedError`; the errors were caught by generic exception handlers but still spammed Sentry (~696K events since December, per the original analysis in #11926).

## Solution

Follow the same pattern as `utils.py` (see the sketch after this description):

1. Add `cleanup_expired_oauth_tokens` to `DatabaseManager` and `DatabaseManagerAsyncClient`
2. Update the scheduler to use `get_database_manager_async_client()` for both calls

## Changes

- **database.py**: Import and expose `cleanup_expired_oauth_tokens` in both manager classes
- **scheduler.py**: Use `db.increment_onboarding_runs()` and `db.cleanup_expired_oauth_tokens()` via the async client

## Impact

- Eliminates Sentry error spam from the scheduler
- Onboarding run counters now actually increment for scheduled executions
- OAuth token cleanup now actually runs

## Testing

Deploy to staging with scheduled graphs and verify:

1. No more `ClientNotConnectedError` in scheduler logs
2. `UserOnboarding.agentRuns` increments on scheduled runs
3. Expired OAuth tokens get cleaned up

Refs: #11926 (original fix that was closed)
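Below is a minimal sketch of the scheduler-side call pattern this PR adopts, mirroring `_execute_graph()` and `cleanup_oauth_tokens()` in the file below. The wrapper function names here are illustrative only, and the `database.py` half (exposing `cleanup_expired_oauth_tokens` on `DatabaseManager`/`DatabaseManagerAsyncClient`) is assumed to follow the service's existing `@expose` convention rather than shown verbatim.

```python
# Illustrative sketch only - the wrapper names are hypothetical; the calls mirror
# the scheduler.py code below and assume cleanup_expired_oauth_tokens is exposed
# on DatabaseManager/DatabaseManagerAsyncClient as described above.
from backend.util.clients import get_database_manager_async_client


async def record_scheduled_run(user_id: str) -> None:
    # Route through the DatabaseManager RPC client: the Scheduler process never
    # connects its own Prisma client, so calling Prisma directly raises
    # ClientNotConnectedError.
    db = get_database_manager_async_client()
    await db.increment_onboarding_runs(user_id)


async def run_oauth_token_cleanup() -> None:
    # Same pattern for the scheduled OAuth token cleanup job.
    db = get_database_manager_async_client()
    await db.cleanup_expired_oauth_tokens()
```

This keeps all database access in the scheduler going over RPC, matching the existing fallback behavior of `add_graph_execution()` in `utils.py`.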
791 lines · 28 KiB · Python
import asyncio
import logging
import os
import threading
import time
import uuid
from enum import Enum
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from apscheduler.events import (
    EVENT_JOB_ERROR,
    EVENT_JOB_EXECUTED,
    EVENT_JOB_MAX_INSTANCES,
    EVENT_JOB_MISSED,
)
from apscheduler.job import Job as JobObj
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.util import ZoneInfo
from dotenv import load_dotenv
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import MetaData, create_engine

from backend.data.block import BlockInput
from backend.data.execution import GraphExecutionWithNodes
from backend.data.model import CredentialsMetaInput
from backend.executor import utils as execution_utils
from backend.monitoring import (
    NotificationJobArgs,
    process_existing_batches,
    process_weekly_summary,
    report_block_error_rates,
    report_execution_accuracy_alerts,
    report_late_executions,
)
from backend.util.clients import (
    get_database_manager_async_client,
    get_database_manager_client,
    get_scheduler_client,
)
from backend.util.cloud_storage import cleanup_expired_files_async
from backend.util.exceptions import (
    GraphNotFoundError,
    GraphNotInLibraryError,
    GraphValidationError,
    NotAuthorizedError,
    NotFoundError,
)
from backend.util.logging import PrefixFilter
from backend.util.retry import func_retry
from backend.util.service import (
    AppService,
    AppServiceClient,
    UnhealthyServiceError,
    endpoint_to_async,
    expose,
)
from backend.util.settings import Config


def _extract_schema_from_url(database_url) -> tuple[str, str]:
    """
    Extracts the schema from the DATABASE_URL and returns the schema and cleaned URL.
    """
    parsed_url = urlparse(database_url)
    query_params = parse_qs(parsed_url.query)

    # Extract the 'schema' parameter
    schema_list = query_params.pop("schema", None)
    schema = schema_list[0] if schema_list else "public"

    # Reconstruct the query string without the 'schema' parameter
    new_query = urlencode(query_params, doseq=True)
    new_parsed_url = parsed_url._replace(query=new_query)
    database_url_clean = str(urlunparse(new_parsed_url))

    return schema, database_url_clean


logger = logging.getLogger(__name__)
logger.addFilter(PrefixFilter("[Scheduler]"))
apscheduler_logger = logger.getChild("apscheduler")
apscheduler_logger.addFilter(PrefixFilter("[Scheduler] [APScheduler]"))

config = Config()

# Timeout constants
SCHEDULER_OPERATION_TIMEOUT_SECONDS = 300  # 5 minutes for scheduler operations


def job_listener(event):
    """Logs job execution outcomes for better monitoring."""
    if event.exception:
        logger.error(
            f"Job {event.job_id} failed: {type(event.exception).__name__}: {event.exception}"
        )
    else:
        logger.info(f"Job {event.job_id} completed successfully.")


def job_missed_listener(event):
    """Logs when jobs are missed due to scheduling issues."""
    logger.warning(
        f"Job {event.job_id} was missed at scheduled time {event.scheduled_run_time}. "
        f"This can happen if the scheduler is overloaded or if previous executions are still running."
    )


def job_max_instances_listener(event):
    """Logs when jobs hit max instances limit."""
    logger.warning(
        f"Job {event.job_id} execution was SKIPPED - max instances limit reached. "
        f"Previous execution(s) are still running. "
        f"Consider increasing max_instances or check why previous executions are taking too long."
    )


_event_loop: asyncio.AbstractEventLoop | None = None
_event_loop_thread: threading.Thread | None = None


@func_retry
def get_event_loop():
    """Get the shared event loop."""
    if _event_loop is None:
        raise RuntimeError("Event loop not initialized. Scheduler not started.")
    return _event_loop


def run_async(coro, timeout: float = SCHEDULER_OPERATION_TIMEOUT_SECONDS):
    """Run a coroutine in the shared event loop and wait for completion."""
    loop = get_event_loop()
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except Exception as e:
        logger.error(f"Async operation failed: {type(e).__name__}: {e}")
        raise


def execute_graph(**kwargs):
    """Execute graph in the shared event loop and wait for completion."""
    # Wait for completion to ensure job doesn't exit prematurely
    run_async(_execute_graph(**kwargs))


async def _execute_graph(**kwargs):
    args = GraphExecutionJobArgs(**kwargs)
    start_time = asyncio.get_event_loop().time()
    db = get_database_manager_async_client()
    try:
        logger.info(f"Executing recurring job for graph #{args.graph_id}")
        graph_exec: GraphExecutionWithNodes = await execution_utils.add_graph_execution(
            user_id=args.user_id,
            graph_id=args.graph_id,
            graph_version=args.graph_version,
            inputs=args.input_data,
            graph_credentials_inputs=args.input_credentials,
        )
        await db.increment_onboarding_runs(args.user_id)
        elapsed = asyncio.get_event_loop().time() - start_time
        logger.info(
            f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} "
            f"(took {elapsed:.2f}s to create and publish)"
        )
        if elapsed > 10:
            logger.warning(
                f"Graph execution {graph_exec.id} took {elapsed:.2f}s to create/publish - "
                f"this is unusually slow and may indicate resource contention"
            )
    except GraphNotFoundError as e:
        await _handle_graph_not_available(e, args, start_time)
    except GraphNotInLibraryError as e:
        await _handle_graph_not_available(e, args, start_time)
    except GraphValidationError:
        await _handle_graph_validation_error(args)
    except Exception as e:
        elapsed = asyncio.get_event_loop().time() - start_time
        logger.error(
            f"Error executing graph {args.graph_id} after {elapsed:.2f}s: "
            f"{type(e).__name__}: {e}"
        )


async def _handle_graph_validation_error(args: "GraphExecutionJobArgs") -> None:
    logger.error(
        f"Scheduled Graph {args.graph_id} failed validation. Unscheduling graph"
    )
    if args.schedule_id:
        scheduler_client = get_scheduler_client()
        await scheduler_client.delete_schedule(
            schedule_id=args.schedule_id,
            user_id=args.user_id,
        )
    else:
        logger.error(
            f"Unable to unschedule graph: {args.graph_id} as this is an old job with no associated schedule_id please remove manually"
        )


async def _handle_graph_not_available(
    e: Exception, args: "GraphExecutionJobArgs", start_time: float
) -> None:
    elapsed = asyncio.get_event_loop().time() - start_time
    logger.warning(
        f"Scheduled execution blocked for deleted/archived graph {args.graph_id} "
        f"(user {args.user_id}) after {elapsed:.2f}s: {e}"
    )
    # Clean up orphaned schedules for this graph
    await _cleanup_orphaned_schedules_for_graph(args.graph_id, args.user_id)


async def _cleanup_orphaned_schedules_for_graph(graph_id: str, user_id: str) -> None:
    """
    Clean up orphaned schedules for a specific graph when execution fails with GraphNotAccessibleError.
    This happens when an agent is pulled from the Marketplace or deleted
    but schedules still exist.
    """
    # Use scheduler client to access the scheduler service
    scheduler_client = get_scheduler_client()

    # Find all schedules for this graph and user
    schedules = await scheduler_client.get_execution_schedules(
        graph_id=graph_id, user_id=user_id
    )

    for schedule in schedules:
        try:
            await scheduler_client.delete_schedule(
                schedule_id=schedule.id, user_id=user_id
            )
            logger.info(
                f"Cleaned up orphaned schedule {schedule.id} for deleted/archived graph {graph_id}"
            )
        except Exception:
            logger.exception(
                f"Failed to delete orphaned schedule {schedule.id} for graph {graph_id}"
            )


def cleanup_expired_files():
    """Clean up expired files from cloud storage."""
    # Wait for completion
    run_async(cleanup_expired_files_async())


def cleanup_oauth_tokens():
    """Clean up expired OAuth tokens from the database."""

    # Wait for completion
    async def _cleanup():
        db = get_database_manager_async_client()
        return await db.cleanup_expired_oauth_tokens()

    run_async(_cleanup())


def execution_accuracy_alerts():
    """Check execution accuracy and send alerts if drops are detected."""
    return report_execution_accuracy_alerts()


def ensure_embeddings_coverage():
    """
    Ensure all content types (store agents, blocks, docs) have embeddings for search.

    Processes ALL missing embeddings in batches of 10 per content type until 100% coverage.
    Missing embeddings = content invisible in hybrid search.

    Schedule: Runs every 6 hours (balanced between coverage and API costs).
    - Catches new content added between scheduled runs
    - Batch size 10 per content type: gradual processing to avoid rate limits
    - Manual trigger available via execute_ensure_embeddings_coverage endpoint
    """
    db_client = get_database_manager_client()
    stats = db_client.get_embedding_stats()

    # Check for error from get_embedding_stats() first
    if "error" in stats:
        logger.error(
            f"Failed to get embedding stats: {stats['error']} - skipping backfill"
        )
        return {
            "backfill": {"processed": 0, "success": 0, "failed": 0},
            "cleanup": {"deleted": 0},
            "error": stats["error"],
        }

    # Extract totals from new stats structure
    totals = stats.get("totals", {})
    without_embeddings = totals.get("without_embeddings", 0)
    coverage_percent = totals.get("coverage_percent", 0)

    total_processed = 0
    total_success = 0
    total_failed = 0

    if without_embeddings == 0:
        logger.info("All content has embeddings, skipping backfill")
    else:
        # Log per-content-type stats for visibility
        by_type = stats.get("by_type", {})
        for content_type, type_stats in by_type.items():
            if type_stats.get("without_embeddings", 0) > 0:
                logger.info(
                    f"{content_type}: {type_stats['without_embeddings']} items without embeddings "
                    f"({type_stats['coverage_percent']}% coverage)"
                )

        logger.info(
            f"Total: {without_embeddings} items without embeddings "
            f"({coverage_percent}% coverage) - processing all"
        )

        # Process in batches until no more missing embeddings
        while True:
            result = db_client.backfill_missing_embeddings(batch_size=100)

            total_processed += result["processed"]
            total_success += result["success"]
            total_failed += result["failed"]

            if result["processed"] == 0:
                # No more missing embeddings
                break

            if result["success"] == 0 and result["processed"] > 0:
                # All attempts in this batch failed - stop to avoid infinite loop
                logger.error(
                    f"All {result['processed']} embedding attempts failed - stopping backfill"
                )
                break

            # Small delay between batches to avoid rate limits
            time.sleep(1)

        logger.info(
            f"Embedding backfill completed: {total_success}/{total_processed} succeeded, "
            f"{total_failed} failed"
        )

    # Clean up orphaned embeddings for blocks and docs
    logger.info("Running cleanup for orphaned embeddings (blocks/docs)...")
    cleanup_result = db_client.cleanup_orphaned_embeddings()
    cleanup_totals = cleanup_result.get("totals", {})
    cleanup_deleted = cleanup_totals.get("deleted", 0)

    if cleanup_deleted > 0:
        logger.info(f"Cleanup completed: deleted {cleanup_deleted} orphaned embeddings")
        by_type = cleanup_result.get("by_type", {})
        for content_type, type_result in by_type.items():
            if type_result.get("deleted", 0) > 0:
                logger.info(
                    f"{content_type}: deleted {type_result['deleted']} orphaned embeddings"
                )
    else:
        logger.info("Cleanup completed: no orphaned embeddings found")

    return {
        "backfill": {
            "processed": total_processed,
            "success": total_success,
            "failed": total_failed,
        },
        "cleanup": {
            "deleted": cleanup_deleted,
        },
    }


# Monitoring functions are now imported from monitoring module


class Jobstores(Enum):
    EXECUTION = "execution"
    BATCHED_NOTIFICATIONS = "batched_notifications"
    WEEKLY_NOTIFICATIONS = "weekly_notifications"


class GraphExecutionJobArgs(BaseModel):
    schedule_id: str | None = None
    user_id: str
    graph_id: str
    graph_version: int
    agent_name: str | None = None
    cron: str
    input_data: BlockInput
    input_credentials: dict[str, CredentialsMetaInput] = Field(default_factory=dict)


class GraphExecutionJobInfo(GraphExecutionJobArgs):
    id: str
    name: str
    next_run_time: str
    timezone: str = Field(default="UTC", description="Timezone used for scheduling")

    @staticmethod
    def from_db(
        job_args: GraphExecutionJobArgs, job_obj: JobObj
    ) -> "GraphExecutionJobInfo":
        # Extract timezone from the trigger if it's a CronTrigger
        timezone_str = "UTC"
        if hasattr(job_obj.trigger, "timezone"):
            timezone_str = str(job_obj.trigger.timezone)

        return GraphExecutionJobInfo(
            id=job_obj.id,
            name=job_obj.name,
            next_run_time=job_obj.next_run_time.isoformat(),
            timezone=timezone_str,
            **job_args.model_dump(),
        )


class NotificationJobInfo(NotificationJobArgs):
    id: str
    name: str
    next_run_time: str

    @staticmethod
    def from_db(
        job_args: NotificationJobArgs, job_obj: JobObj
    ) -> "NotificationJobInfo":
        return NotificationJobInfo(
            id=job_obj.id,
            name=job_obj.name,
            next_run_time=job_obj.next_run_time.isoformat(),
            **job_args.model_dump(),
        )


class Scheduler(AppService):
    scheduler: BackgroundScheduler

    def __init__(self, register_system_tasks: bool = True):
        self.register_system_tasks = register_system_tasks

    @classmethod
    def get_port(cls) -> int:
        return config.execution_scheduler_port

    @classmethod
    def db_pool_size(cls) -> int:
        return config.scheduler_db_pool_size

    async def health_check(self) -> str:
        # Thread-safe health check with proper initialization handling
        if not hasattr(self, "scheduler"):
            raise UnhealthyServiceError("Scheduler is still initializing")

        # Check if we're in the middle of cleanup
        if self._shutting_down:
            return await super().health_check()

        # Normal operation - check if scheduler is running
        if not self.scheduler.running:
            raise UnhealthyServiceError("Scheduler is not running")

        return await super().health_check()

    def run_service(self):
        load_dotenv()

        # Initialize the event loop for async jobs
        global _event_loop
        _event_loop = asyncio.new_event_loop()

        # Use daemon thread since it should die with the main service
        global _event_loop_thread
        _event_loop_thread = threading.Thread(
            target=_event_loop.run_forever, daemon=True, name="SchedulerEventLoop"
        )
        _event_loop_thread.start()

        db_schema, db_url = _extract_schema_from_url(os.getenv("DIRECT_URL"))
        # Configure executors to limit concurrency without skipping jobs
        from apscheduler.executors.pool import ThreadPoolExecutor

        self.scheduler = BackgroundScheduler(
            executors={
                "default": ThreadPoolExecutor(
                    max_workers=self.db_pool_size()
                ),  # Match DB pool size to prevent resource contention
            },
            job_defaults={
                "coalesce": True,  # Skip redundant missed jobs - just run the latest
                "max_instances": 1000,  # Effectively unlimited - never drop executions
                "misfire_grace_time": None,  # No time limit for missed jobs
            },
            jobstores={
                Jobstores.EXECUTION.value: SQLAlchemyJobStore(
                    engine=create_engine(
                        url=db_url,
                        pool_size=self.db_pool_size(),
                        max_overflow=0,
                    ),
                    metadata=MetaData(schema=db_schema),
                    # this one is pre-existing so it keeps the
                    # default table name.
                    tablename="apscheduler_jobs",
                ),
                Jobstores.BATCHED_NOTIFICATIONS.value: SQLAlchemyJobStore(
                    engine=create_engine(
                        url=db_url,
                        pool_size=self.db_pool_size(),
                        max_overflow=0,
                    ),
                    metadata=MetaData(schema=db_schema),
                    tablename="apscheduler_jobs_batched_notifications",
                ),
                # These don't really need persistence
                Jobstores.WEEKLY_NOTIFICATIONS.value: MemoryJobStore(),
            },
            logger=apscheduler_logger,
            timezone=ZoneInfo("UTC"),
        )

        if self.register_system_tasks:
            # Notification PROCESS WEEKLY SUMMARY
            # Runs every Monday at 9 AM UTC
            self.scheduler.add_job(
                process_weekly_summary,
                CronTrigger.from_crontab("0 9 * * 1"),
                id="process_weekly_summary",
                kwargs={},
                replace_existing=True,
                jobstore=Jobstores.WEEKLY_NOTIFICATIONS.value,
            )

            # Notification PROCESS EXISTING BATCHES
            # self.scheduler.add_job(
            #     process_existing_batches,
            #     id="process_existing_batches",
            #     CronTrigger.from_crontab("0 12 * * 5"),
            #     replace_existing=True,
            #     jobstore=Jobstores.BATCHED_NOTIFICATIONS.value,
            # )

            # Notification LATE EXECUTIONS ALERT
            self.scheduler.add_job(
                report_late_executions,
                id="report_late_executions",
                trigger="interval",
                replace_existing=True,
                seconds=config.execution_late_notification_threshold_secs,
                jobstore=Jobstores.EXECUTION.value,
            )

            # Block Error Rate Monitoring
            self.scheduler.add_job(
                report_block_error_rates,
                id="report_block_error_rates",
                trigger="interval",
                replace_existing=True,
                seconds=config.block_error_rate_check_interval_secs,
                jobstore=Jobstores.EXECUTION.value,
            )

            # Cloud Storage Cleanup - configurable interval
            self.scheduler.add_job(
                cleanup_expired_files,
                id="cleanup_expired_files",
                trigger="interval",
                replace_existing=True,
                seconds=config.cloud_storage_cleanup_interval_hours
                * 3600,  # Convert hours to seconds
                jobstore=Jobstores.EXECUTION.value,
            )

            # OAuth Token Cleanup - configurable interval
            self.scheduler.add_job(
                cleanup_oauth_tokens,
                id="cleanup_oauth_tokens",
                trigger="interval",
                replace_existing=True,
                seconds=config.oauth_token_cleanup_interval_hours
                * 3600,  # Convert hours to seconds
                jobstore=Jobstores.EXECUTION.value,
            )

            # Execution Accuracy Monitoring - configurable interval
            self.scheduler.add_job(
                execution_accuracy_alerts,
                id="report_execution_accuracy_alerts",
                trigger="interval",
                replace_existing=True,
                seconds=config.execution_accuracy_check_interval_hours
                * 3600,  # Convert hours to seconds
                jobstore=Jobstores.EXECUTION.value,
            )

            # Embedding Coverage - Every 6 hours
            # Ensures all approved agents have embeddings for hybrid search
            # Critical: missing embeddings = agents invisible in search
            self.scheduler.add_job(
                ensure_embeddings_coverage,
                id="ensure_embeddings_coverage",
                trigger="interval",
                hours=6,
                replace_existing=True,
                max_instances=1,  # Prevent overlapping runs
                jobstore=Jobstores.EXECUTION.value,
            )

        self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        self.scheduler.add_listener(job_missed_listener, EVENT_JOB_MISSED)
        self.scheduler.add_listener(job_max_instances_listener, EVENT_JOB_MAX_INSTANCES)
        self.scheduler.start()

        # Run embedding backfill immediately on startup
        # This ensures blocks/docs are searchable right away, not after 6 hours
        # Safe to run on multiple pods - uses upserts and checks for existing embeddings
        if self.register_system_tasks:
            logger.info("Running embedding backfill on startup...")
            try:
                result = ensure_embeddings_coverage()
                logger.info(f"Startup embedding backfill complete: {result}")
            except Exception as e:
                logger.error(f"Startup embedding backfill failed: {e}")
                # Don't fail startup - the scheduled job will retry later

        # Keep the service running since BackgroundScheduler doesn't block
        super().run_service()

    def cleanup(self):
        if self.scheduler:
            logger.info("⏳ Shutting down scheduler...")
            self.scheduler.shutdown(wait=True)

        global _event_loop
        if _event_loop:
            logger.info("⏳ Closing event loop...")
            _event_loop.call_soon_threadsafe(_event_loop.stop)

        global _event_loop_thread
        if _event_loop_thread:
            logger.info("⏳ Waiting for event loop thread to finish...")
            _event_loop_thread.join(timeout=SCHEDULER_OPERATION_TIMEOUT_SECONDS)

        super().cleanup()

    @expose
    def add_graph_execution_schedule(
        self,
        user_id: str,
        graph_id: str,
        graph_version: int,
        cron: str,
        input_data: BlockInput,
        input_credentials: dict[str, CredentialsMetaInput],
        name: Optional[str] = None,
        user_timezone: str | None = None,
    ) -> GraphExecutionJobInfo:
        # Validate the graph before scheduling to prevent runtime failures
        # We don't need the return value, just want the validation to run
        run_async(
            execution_utils.validate_and_construct_node_execution_input(
                graph_id=graph_id,
                user_id=user_id,
                graph_inputs=input_data,
                graph_version=graph_version,
                graph_credentials_inputs=input_credentials,
            )
        )

        # Use provided timezone or default to UTC
        # Note: Timezone should be passed from the client to avoid database lookups
        if not user_timezone:
            user_timezone = "UTC"
            logger.warning(
                f"No timezone provided for user {user_id}, using UTC for scheduling. "
                f"Client should pass user's timezone for correct scheduling."
            )

        logger.info(
            f"Scheduling job for user {user_id} with timezone {user_timezone} (cron: {cron})"
        )
        schedule_id = str(uuid.uuid4())

        job_args = GraphExecutionJobArgs(
            schedule_id=schedule_id,
            user_id=user_id,
            graph_id=graph_id,
            graph_version=graph_version,
            agent_name=name,
            cron=cron,
            input_data=input_data,
            input_credentials=input_credentials,
        )
        job = self.scheduler.add_job(
            execute_graph,
            kwargs=job_args.model_dump(),
            name=name,
            trigger=CronTrigger.from_crontab(cron, timezone=user_timezone),
            jobstore=Jobstores.EXECUTION.value,
            replace_existing=True,
            id=schedule_id,
        )
        logger.info(
            f"Added job {job.id} with cron schedule '{cron}' in timezone {user_timezone}, input data: {input_data}"
        )
        return GraphExecutionJobInfo.from_db(job_args, job)

    @expose
    def delete_graph_execution_schedule(
        self, schedule_id: str, user_id: str
    ) -> GraphExecutionJobInfo:
        job = self.scheduler.get_job(schedule_id, jobstore=Jobstores.EXECUTION.value)
        if not job:
            raise NotFoundError(f"Job #{schedule_id} not found.")

        job_args = GraphExecutionJobArgs(**job.kwargs)
        if job_args.user_id != user_id:
            raise NotAuthorizedError("User ID does not match the job's user ID")

        logger.info(f"Deleting job {schedule_id}")
        job.remove()

        return GraphExecutionJobInfo.from_db(job_args, job)

    @expose
    def get_graph_execution_schedules(
        self, graph_id: str | None = None, user_id: str | None = None
    ) -> list[GraphExecutionJobInfo]:
        jobs: list[JobObj] = self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value)
        schedules = []
        for job in jobs:
            logger.debug(
                f"Found job {job.id} with cron schedule {job.trigger} and args {job.kwargs}"
            )
            try:
                job_args = GraphExecutionJobArgs.model_validate(job.kwargs)
            except ValidationError:
                continue
            if (
                job.next_run_time is not None
                and (graph_id is None or job_args.graph_id == graph_id)
                and (user_id is None or job_args.user_id == user_id)
            ):
                schedules.append(GraphExecutionJobInfo.from_db(job_args, job))
        return schedules

    @expose
    def execute_process_existing_batches(self, kwargs: dict):
        process_existing_batches(**kwargs)

    @expose
    def execute_process_weekly_summary(self):
        process_weekly_summary()

    @expose
    def execute_report_late_executions(self):
        return report_late_executions()

    @expose
    def execute_report_block_error_rates(self):
        return report_block_error_rates()

    @expose
    def execute_cleanup_expired_files(self):
        """Manually trigger cleanup of expired cloud storage files."""
        return cleanup_expired_files()

    @expose
    def execute_cleanup_oauth_tokens(self):
        """Manually trigger cleanup of expired OAuth tokens."""
        return cleanup_oauth_tokens()

    @expose
    def execute_report_execution_accuracy_alerts(self):
        """Manually trigger execution accuracy alert checking."""
        return execution_accuracy_alerts()

    @expose
    def execute_ensure_embeddings_coverage(self):
        """Manually trigger embedding backfill for approved store agents."""
        return ensure_embeddings_coverage()


class SchedulerClient(AppServiceClient):
    @classmethod
    def get_service_type(cls):
        return Scheduler

    add_execution_schedule = endpoint_to_async(Scheduler.add_graph_execution_schedule)
    delete_schedule = endpoint_to_async(Scheduler.delete_graph_execution_schedule)
    get_execution_schedules = endpoint_to_async(Scheduler.get_graph_execution_schedules)