## Summary

Reduces Sentry error noise by ~90% by filtering out expected/transient errors and downgrading inappropriate error-level logs to warnings. Most of the top Sentry issues are not actual bugs but expected conditions (user errors, transient infra, business logic) that were incorrectly logged at ERROR level, causing them to be captured as Sentry events.

## Changes

### 1. Sentry `before_send` filter (`metrics.py`)

Added a `before_send` hook to filter known expected errors before they reach Sentry (a minimal sketch of the hook follows this Changes list):

- **AMQP/RabbitMQ connection errors** — transient during deploys/restarts
- **User credential errors** — invalid API keys, missing auth headers (user error, not platform bug)
- **Insufficient balance** — expected business logic
- **Blocked IP access** — security check working as intended
- **Discord bot token errors** — misconfiguration, not a runtime error
- **Google metadata DNS errors** — expected in non-GCP environments
- **Inactive email recipients** — expected for bounced addresses
- **Unclosed client sessions/connectors** — resource cleanup noise

### 2. Connection retry log levels (`retry.py`)

- `conn_retry` final failure: `error` → `warning` (these are infra retries, not bugs)
- `conn_retry` wrapper final failure: `error` → `warning`
- Discord alert send failure: `error` → `warning`

### 3. Block execution Sentry capture (`manager.py`)

- Skip `sentry_sdk.capture_exception()` for `ValueError` subclasses (BlockExecutionError, BlockInputError, InsufficientBalanceError, etc.) — these are user-caused errors, not platform bugs
- Downgrade executor shutdown/disconnect errors to warning

### 4. Scheduler log levels (`scheduler.py`)

- Graph validation failure: `error` → `warning` (expected for old/invalid graphs)
- Unable to unschedule graph: `error` → `warning`
- Job listener failure: `error` → `warning`
- Async operation failure: `error` → `warning`

### 5. Discord system alert (`notifications.py`)

- Wrapped the `discord_system_alert` endpoint in try/except to prevent unhandled exceptions (fixes AUTOGPT-SERVER-743, AUTOGPT-SERVER-7MW); a sketch of this pattern follows the test plan below

### 6. Notification system log levels (`notifications.py`)

- All batch processing errors: `error` → `warning`
- User email not found: `error` → `warning`
- Notification parsing errors: `error` → `warning`
- Email sending failures: `error` → `warning`
- Summary data gathering failure: `error` → `warning`
- Cleaned up unprofessional error messages

### 7. Cloud storage cleanup (`cloud_storage.py`)

- Cleanup error: `error` → `warning`
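How the §1 hook works: Sentry calls `before_send(event, hint)` for every candidate event; returning `None` drops the event, returning it unchanged keeps it. A minimal sketch, assuming illustrative exception names and message fragments (the shipped lists and module layout live in `metrics.py`):

```python
import sentry_sdk

# Illustrative filters only; the real lists live in metrics.py.
EXPECTED_EXCEPTION_NAMES = {"AMQPConnectionError", "ConnectionClosedByBroker"}
EXPECTED_MESSAGE_FRAGMENTS = ("Incorrect API key", "Insufficient balance")


def before_send(event, hint):
    """Return None to drop expected/transient errors before they reach Sentry."""
    exc_info = hint.get("exc_info")
    if exc_info:
        exc_type, exc_value, _tb = exc_info
        if exc_type.__name__ in EXPECTED_EXCEPTION_NAMES:
            return None  # transient infra noise, not a platform bug
        if any(frag in str(exc_value) for frag in EXPECTED_MESSAGE_FRAGMENTS):
            return None  # user error / expected business logic
    return event  # everything else is still captured


sentry_sdk.init(dsn="<dsn>", before_send=before_send)
```

Filtering at `before_send` suppresses only the Sentry event; the underlying log line still reaches application logs, which is why it pairs well with the level downgrades in sections 2-4.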
## Sentry Issues Addressed

### AMQP/RabbitMQ (~3.4M events total)

- [AUTOGPT-SERVER-3H2](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-3H2) — AMQPConnector ConnectionRefusedError (1.2M events)
- [AUTOGPT-SERVER-3H3](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-3H3) — AMQPConnectionWorkflowFailed (770K events)
- [AUTOGPT-SERVER-3H4](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-3H4) — AMQP connection workflow failed (770K events)
- [AUTOGPT-SERVER-3H5](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-3H5) — AMQPConnectionWorkflow reporting failure (770K events)
- [AUTOGPT-SERVER-3H7](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-3H7) — Socket failed to connect (514K events)
- [AUTOGPT-SERVER-3H8](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-3H8) — TCP Connection attempt failed (514K events)
- [AUTOGPT-SERVER-3H6](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-3H6) — AMQPConnectionError (93K events)
- [AUTOGPT-SERVER-7SX](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-7SX) — Error creating transport (69K events)
- [AUTOGPT-SERVER-1TN](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-1TN) — ChannelInvalidStateError (39K events)
- [AUTOGPT-SERVER-6JC](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-6JC) — ConnectionClosedByBroker (2K events)
- [AUTOGPT-SERVER-6RJ/6RK/6RN/6RQ/6RP/6RR](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-6RJ) — Various connection failures (~15K events)
- [AUTOGPT-SERVER-4A5/6RM/7XN](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-4A5) — Connection close/transport errors (~540 events)

### User Credential Errors (~15K events)

- [AUTOGPT-SERVER-6S5](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-6S5) — Incorrect OpenAI API key (9.2K events)
- [AUTOGPT-SERVER-7W4](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-7W4) — Incorrect API key in AIConditionBlock (3.4K events)
- [AUTOGPT-SERVER-83Y](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-83Y) — AI condition invalid key (2.3K events)
- [AUTOGPT-SERVER-7ZP](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-7ZP) — Perplexity missing auth header (451 events)
- [AUTOGPT-SERVER-7XK/7XM](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-7XK) — Anthropic invalid key (125 events)
- [AUTOGPT-SERVER-82C](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-82C) — Missing auth header (27 events)
- [AUTOGPT-SERVER-721](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-721) — Ideogram invalid token (165 events)

### Business Logic / Validation (~120K events)

- [AUTOGPT-SERVER-7YQ](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-7YQ) — Disabled block used in graph (56K events)
- [AUTOGPT-SERVER-6W3](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-6W3) — Graph failed validation (46K events)
- [AUTOGPT-SERVER-6W2](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-6W2) — Unable to unschedule graph (46K events)
- [AUTOGPT-SERVER-83X](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-83X) — Blocked IP access (15K events)
- [AUTOGPT-SERVER-6K9](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-6K9) — Insufficient balance (4K events)

### Discord Alert Failures (~24K events)

- [AUTOGPT-SERVER-743](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-743) — Discord improper token (22K events)
- [AUTOGPT-SERVER-7MW](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-7MW) — Discord 403 Missing Access (1.5K events)

### Notification System (~16K events)

- [AUTOGPT-SERVER-550](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-550) — Notification batch create error (8.3K events)
- [AUTOGPT-SERVER-58H](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-58H) — ValidationError for NotificationEventModel (3K events)
- [AUTOGPT-SERVER-5C6](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-5C6) — Get notification batch error (2.1K events)
- [AUTOGPT-SERVER-4BT](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-4BT) — Notification batch create error (1.8K events)
- [AUTOGPT-SERVER-5E4](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-5E4) — NotificationPreference validation (1.4K events)
- [AUTOGPT-SERVER-508](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-508) — Inactive email recipients (702 events)

### Infrastructure / Transient (~20K events)

- [AUTOGPT-SERVER-6WJ](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-6WJ) — Unclosed client session (13K events)
- [AUTOGPT-SERVER-745](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-745) — Unclosed connector (5.8K events)
- [AUTOGPT-SERVER-4V1](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-4V1) — Google metadata DNS error (2.2K events)
- [AUTOGPT-SERVER-80J](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-80J) — CloudStorage DNS error (35 events)

### Executor Shutdown

- [AUTOGPT-SERVER-55J](https://significant-gravitas.sentry.io/issues/AUTOGPT-SERVER-55J) — Error disconnecting run client (118 events)

## Test plan

- [x] All pre-commit hooks pass (Ruff, isort, Black, Pyright typecheck)
- [x] All changed modules import successfully
- [ ] Deploy to staging and verify Sentry event volume drops significantly
- [ ] Verify legitimate errors still appear in Sentry
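The §5 fix is the standard defensive pattern for alerting paths: catch everything, log at warning, never re-raise. A minimal sketch, with a hypothetical `_send_discord_alert` helper standing in for the real Discord client call in `notifications.py`:

```python
import logging

logger = logging.getLogger(__name__)


def _send_discord_alert(content: str) -> None:
    # Hypothetical stand-in for the real Discord client call.
    raise ConnectionError("improper token")


def discord_system_alert(content: str) -> None:
    """Send a system alert; a failed alert must never raise out of the endpoint."""
    try:
        _send_discord_alert(content)
    except Exception as e:
        # Warning, not error: bad tokens and missing channel access are
        # misconfiguration noise, not platform bugs worth a Sentry event.
        logger.warning(f"Discord system alert failed: {type(e).__name__}: {e}")


discord_system_alert("executor restarted")  # logs a warning instead of raising
```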
import asyncio
import logging
import os
import threading
import time
import uuid
from enum import Enum
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from apscheduler.events import (
    EVENT_JOB_ERROR,
    EVENT_JOB_EXECUTED,
    EVENT_JOB_MAX_INSTANCES,
    EVENT_JOB_MISSED,
)
from apscheduler.job import Job as JobObj
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.util import ZoneInfo
from dotenv import load_dotenv
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import MetaData, create_engine

from backend.copilot.optimize_blocks import optimize_block_descriptions
from backend.data.execution import GraphExecutionWithNodes
from backend.data.model import CredentialsMetaInput, GraphInput
from backend.executor import utils as execution_utils
from backend.monitoring import (
    NotificationJobArgs,
    process_existing_batches,
    process_weekly_summary,
    report_block_error_rates,
    report_execution_accuracy_alerts,
    report_late_executions,
)
from backend.util.clients import (
    get_database_manager_async_client,
    get_database_manager_client,
    get_scheduler_client,
)
from backend.util.cloud_storage import cleanup_expired_files_async
from backend.util.exceptions import (
    GraphNotFoundError,
    GraphNotInLibraryError,
    GraphValidationError,
    NotAuthorizedError,
    NotFoundError,
)
from backend.util.logging import PrefixFilter
from backend.util.retry import func_retry
from backend.util.service import (
    AppService,
    AppServiceClient,
    UnhealthyServiceError,
    endpoint_to_async,
    expose,
)
from backend.util.settings import Config


def _extract_schema_from_url(database_url) -> tuple[str, str]:
    """
    Extracts the schema from the DATABASE_URL and returns the schema and cleaned URL.
    """
    parsed_url = urlparse(database_url)
    query_params = parse_qs(parsed_url.query)

    # Extract the 'schema' parameter
    schema_list = query_params.pop("schema", None)
    schema = schema_list[0] if schema_list else "public"

    # Reconstruct the query string without the 'schema' parameter
    new_query = urlencode(query_params, doseq=True)
    new_parsed_url = parsed_url._replace(query=new_query)
    database_url_clean = str(urlunparse(new_parsed_url))

    return schema, database_url_clean


logger = logging.getLogger(__name__)
logger.addFilter(PrefixFilter("[Scheduler]"))
apscheduler_logger = logger.getChild("apscheduler")
apscheduler_logger.addFilter(PrefixFilter("[Scheduler] [APScheduler]"))

config = Config()

# Timeout constants
SCHEDULER_OPERATION_TIMEOUT_SECONDS = 300  # 5 minutes for scheduler operations


def job_listener(event):
    """Logs job execution outcomes for better monitoring."""
    if event.exception:
        logger.warning(
            f"Job {event.job_id} failed: {type(event.exception).__name__}: {event.exception}"
        )
    else:
        logger.info(f"Job {event.job_id} completed successfully.")


def job_missed_listener(event):
    """Logs when jobs are missed due to scheduling issues."""
    logger.warning(
        f"Job {event.job_id} was missed at scheduled time {event.scheduled_run_time}. "
        f"This can happen if the scheduler is overloaded or if previous executions are still running."
    )


def job_max_instances_listener(event):
    """Logs when jobs hit the max instances limit."""
    logger.warning(
        f"Job {event.job_id} execution was SKIPPED - max instances limit reached. "
        f"Previous execution(s) are still running. "
        f"Consider increasing max_instances or check why previous executions are taking too long."
    )


_event_loop: asyncio.AbstractEventLoop | None = None
_event_loop_thread: threading.Thread | None = None


@func_retry
def get_event_loop():
    """Get the shared event loop."""
    if _event_loop is None:
        raise RuntimeError("Event loop not initialized. Scheduler not started.")
    return _event_loop


def run_async(coro, timeout: float = SCHEDULER_OPERATION_TIMEOUT_SECONDS):
    """Run a coroutine in the shared event loop and wait for completion."""
    loop = get_event_loop()
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except Exception as e:
        logger.warning(f"Async operation failed: {type(e).__name__}: {e}")
        raise


def execute_graph(**kwargs):
    """Execute graph in the shared event loop and wait for completion."""
    # Wait for completion to ensure job doesn't exit prematurely
    run_async(_execute_graph(**kwargs))


async def _execute_graph(**kwargs):
    args = GraphExecutionJobArgs(**kwargs)
    start_time = asyncio.get_event_loop().time()
    db = get_database_manager_async_client()
    try:
        logger.info(f"Executing recurring job for graph #{args.graph_id}")
        graph_exec: GraphExecutionWithNodes = await execution_utils.add_graph_execution(
            user_id=args.user_id,
            graph_id=args.graph_id,
            graph_version=args.graph_version,
            inputs=args.input_data,
            graph_credentials_inputs=args.input_credentials,
        )
        await db.increment_onboarding_runs(args.user_id)
        elapsed = asyncio.get_event_loop().time() - start_time
        logger.info(
            f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} "
            f"(took {elapsed:.2f}s to create and publish)"
        )
        if elapsed > 10:
            logger.warning(
                f"Graph execution {graph_exec.id} took {elapsed:.2f}s to create/publish - "
                f"this is unusually slow and may indicate resource contention"
            )
    except GraphNotFoundError as e:
        await _handle_graph_not_available(e, args, start_time)
    except GraphNotInLibraryError as e:
        await _handle_graph_not_available(e, args, start_time)
    except GraphValidationError:
        await _handle_graph_validation_error(args)
    except Exception as e:
        elapsed = asyncio.get_event_loop().time() - start_time
        logger.error(
            f"Error executing graph {args.graph_id} after {elapsed:.2f}s: "
            f"{type(e).__name__}: {e}"
        )


async def _handle_graph_validation_error(args: "GraphExecutionJobArgs") -> None:
    logger.warning(
        f"Scheduled Graph {args.graph_id} failed validation. Unscheduling graph."
    )
    if args.schedule_id:
        scheduler_client = get_scheduler_client()
        await scheduler_client.delete_schedule(
            schedule_id=args.schedule_id,
            user_id=args.user_id,
        )
    else:
        logger.warning(
            f"Unable to unschedule graph {args.graph_id}: this is an old job "
            f"with no associated schedule_id; please remove it manually"
        )


async def _handle_graph_not_available(
    e: Exception, args: "GraphExecutionJobArgs", start_time: float
) -> None:
    elapsed = asyncio.get_event_loop().time() - start_time
    logger.warning(
        f"Scheduled execution blocked for deleted/archived graph {args.graph_id} "
        f"(user {args.user_id}) after {elapsed:.2f}s: {e}"
    )
    # Clean up orphaned schedules for this graph
    await _cleanup_orphaned_schedules_for_graph(args.graph_id, args.user_id)


async def _cleanup_orphaned_schedules_for_graph(graph_id: str, user_id: str) -> None:
    """
    Clean up orphaned schedules for a specific graph when execution fails because
    the graph is no longer accessible (GraphNotFoundError / GraphNotInLibraryError).
    This happens when an agent is pulled from the Marketplace or deleted
    but schedules still exist.
    """
    # Use scheduler client to access the scheduler service
    scheduler_client = get_scheduler_client()

    # Find all schedules for this graph and user
    schedules = await scheduler_client.get_execution_schedules(
        graph_id=graph_id, user_id=user_id
    )

    for schedule in schedules:
        try:
            await scheduler_client.delete_schedule(
                schedule_id=schedule.id, user_id=user_id
            )
            logger.info(
                f"Cleaned up orphaned schedule {schedule.id} for deleted/archived graph {graph_id}"
            )
        except Exception:
            logger.exception(
                f"Failed to delete orphaned schedule {schedule.id} for graph {graph_id}"
            )


def cleanup_expired_files():
    """Clean up expired files from cloud storage."""
    # Wait for completion
    run_async(cleanup_expired_files_async())


def cleanup_oauth_tokens():
    """Clean up expired OAuth tokens from the database."""

    # Wait for completion
    async def _cleanup():
        db = get_database_manager_async_client()
        return await db.cleanup_expired_oauth_tokens()

    run_async(_cleanup())


def execution_accuracy_alerts():
    """Check execution accuracy and send alerts if drops are detected."""
    return report_execution_accuracy_alerts()


def ensure_embeddings_coverage():
    """
    Ensure all content types (store agents, blocks, docs) have embeddings for search.

    Processes ALL missing embeddings in batches of 100 until 100% coverage.
    Missing embeddings = content invisible in hybrid search.

    Schedule: Runs every 6 hours (balanced between coverage and API costs).
    - Catches new content added between scheduled runs
    - Batch size 100: gradual processing to avoid rate limits
    - Manual trigger available via execute_ensure_embeddings_coverage endpoint
    """
    db_client = get_database_manager_client()
    stats = db_client.get_embedding_stats()

    # Check for error from get_embedding_stats() first
    if "error" in stats:
        logger.error(
            f"Failed to get embedding stats: {stats['error']} - skipping backfill"
        )
        return {
            "backfill": {"processed": 0, "success": 0, "failed": 0},
            "cleanup": {"deleted": 0},
            "error": stats["error"],
        }

    # Extract totals from new stats structure
    totals = stats.get("totals", {})
    without_embeddings = totals.get("without_embeddings", 0)
    coverage_percent = totals.get("coverage_percent", 0)

    total_processed = 0
    total_success = 0
    total_failed = 0

    if without_embeddings == 0:
        logger.info("All content has embeddings, skipping backfill")
    else:
        # Log per-content-type stats for visibility
        by_type = stats.get("by_type", {})
        for content_type, type_stats in by_type.items():
            if type_stats.get("without_embeddings", 0) > 0:
                logger.info(
                    f"{content_type}: {type_stats['without_embeddings']} items without embeddings "
                    f"({type_stats['coverage_percent']}% coverage)"
                )

        logger.info(
            f"Total: {without_embeddings} items without embeddings "
            f"({coverage_percent}% coverage) - processing all"
        )

        # Process in batches until no more missing embeddings
        while True:
            result = db_client.backfill_missing_embeddings(batch_size=100)

            total_processed += result["processed"]
            total_success += result["success"]
            total_failed += result["failed"]

            if result["processed"] == 0:
                # No more missing embeddings
                break

            if result["success"] == 0 and result["processed"] > 0:
                # All attempts in this batch failed - stop to avoid infinite loop
                logger.error(
                    f"All {result['processed']} embedding attempts failed - stopping backfill"
                )
                break

            # Small delay between batches to avoid rate limits
            time.sleep(1)

        logger.info(
            f"Embedding backfill completed: {total_success}/{total_processed} succeeded, "
            f"{total_failed} failed"
        )

    # Clean up orphaned embeddings for blocks and docs
    logger.info("Running cleanup for orphaned embeddings (blocks/docs)...")
    cleanup_result = db_client.cleanup_orphaned_embeddings()
    cleanup_totals = cleanup_result.get("totals", {})
    cleanup_deleted = cleanup_totals.get("deleted", 0)

    if cleanup_deleted > 0:
        logger.info(f"Cleanup completed: deleted {cleanup_deleted} orphaned embeddings")
        by_type = cleanup_result.get("by_type", {})
        for content_type, type_result in by_type.items():
            if type_result.get("deleted", 0) > 0:
                logger.info(
                    f"{content_type}: deleted {type_result['deleted']} orphaned embeddings"
                )
    else:
        logger.info("Cleanup completed: no orphaned embeddings found")

    return {
        "backfill": {
            "processed": total_processed,
            "success": total_success,
            "failed": total_failed,
        },
        "cleanup": {
            "deleted": cleanup_deleted,
        },
    }


# Monitoring functions are now imported from monitoring module


class Jobstores(Enum):
    EXECUTION = "execution"
    BATCHED_NOTIFICATIONS = "batched_notifications"
    WEEKLY_NOTIFICATIONS = "weekly_notifications"


class GraphExecutionJobArgs(BaseModel):
    schedule_id: str | None = None
    user_id: str
    graph_id: str
    graph_version: int
    agent_name: str | None = None
    cron: str
    input_data: GraphInput
    input_credentials: dict[str, CredentialsMetaInput] = Field(default_factory=dict)


class GraphExecutionJobInfo(GraphExecutionJobArgs):
    id: str
    name: str
    next_run_time: str
    timezone: str = Field(default="UTC", description="Timezone used for scheduling")

    @staticmethod
    def from_db(
        job_args: GraphExecutionJobArgs, job_obj: JobObj
    ) -> "GraphExecutionJobInfo":
        # Extract timezone from the trigger if it's a CronTrigger
        timezone_str = "UTC"
        if hasattr(job_obj.trigger, "timezone"):
            timezone_str = str(job_obj.trigger.timezone)

        return GraphExecutionJobInfo(
            id=job_obj.id,
            name=job_obj.name,
            next_run_time=job_obj.next_run_time.isoformat(),
            timezone=timezone_str,
            **job_args.model_dump(),
        )


class NotificationJobInfo(NotificationJobArgs):
    id: str
    name: str
    next_run_time: str

    @staticmethod
    def from_db(
        job_args: NotificationJobArgs, job_obj: JobObj
    ) -> "NotificationJobInfo":
        return NotificationJobInfo(
            id=job_obj.id,
            name=job_obj.name,
            next_run_time=job_obj.next_run_time.isoformat(),
            **job_args.model_dump(),
        )


class Scheduler(AppService):
    scheduler: BackgroundScheduler

    def __init__(self, register_system_tasks: bool = True):
        self.register_system_tasks = register_system_tasks

    @classmethod
    def get_port(cls) -> int:
        return config.execution_scheduler_port

    @classmethod
    def db_pool_size(cls) -> int:
        return config.scheduler_db_pool_size

    async def health_check(self) -> str:
        # Thread-safe health check with proper initialization handling
        if not hasattr(self, "scheduler"):
            raise UnhealthyServiceError("Scheduler is still initializing")

        # Check if we're in the middle of cleanup
        if self._shutting_down:
            return await super().health_check()

        # Normal operation - check if scheduler is running
        if not self.scheduler.running:
            raise UnhealthyServiceError("Scheduler is not running")

        return await super().health_check()

    def run_service(self):
        load_dotenv()

        # Initialize the event loop for async jobs
        global _event_loop
        _event_loop = asyncio.new_event_loop()

        # Use daemon thread since it should die with the main service
        global _event_loop_thread
        _event_loop_thread = threading.Thread(
            target=_event_loop.run_forever, daemon=True, name="SchedulerEventLoop"
        )
        _event_loop_thread.start()

        db_schema, db_url = _extract_schema_from_url(os.getenv("DIRECT_URL"))
        # Configure executors to limit concurrency without skipping jobs
        from apscheduler.executors.pool import ThreadPoolExecutor

        self.scheduler = BackgroundScheduler(
            executors={
                "default": ThreadPoolExecutor(
                    max_workers=self.db_pool_size()
                ),  # Match DB pool size to prevent resource contention
            },
            job_defaults={
                "coalesce": True,  # Skip redundant missed jobs - just run the latest
                "max_instances": 1000,  # Effectively unlimited - never drop executions
                "misfire_grace_time": None,  # No time limit for missed jobs
            },
            jobstores={
                Jobstores.EXECUTION.value: SQLAlchemyJobStore(
                    engine=create_engine(
                        url=db_url,
                        pool_size=self.db_pool_size(),
                        max_overflow=0,
                    ),
                    metadata=MetaData(schema=db_schema),
                    # This one is pre-existing, so it keeps the
                    # default table name.
                    tablename="apscheduler_jobs",
                ),
                Jobstores.BATCHED_NOTIFICATIONS.value: SQLAlchemyJobStore(
                    engine=create_engine(
                        url=db_url,
                        pool_size=self.db_pool_size(),
                        max_overflow=0,
                    ),
                    metadata=MetaData(schema=db_schema),
                    tablename="apscheduler_jobs_batched_notifications",
                ),
                # These don't really need persistence
                Jobstores.WEEKLY_NOTIFICATIONS.value: MemoryJobStore(),
            },
            logger=apscheduler_logger,
            timezone=ZoneInfo("UTC"),
        )

        if self.register_system_tasks:
            # Notification PROCESS WEEKLY SUMMARY
            # Runs every Monday at 9 AM UTC
            self.scheduler.add_job(
                process_weekly_summary,
                CronTrigger.from_crontab("0 9 * * 1"),
                id="process_weekly_summary",
                kwargs={},
                replace_existing=True,
                jobstore=Jobstores.WEEKLY_NOTIFICATIONS.value,
            )

            # Notification PROCESS EXISTING BATCHES
            # self.scheduler.add_job(
            #     process_existing_batches,
            #     id="process_existing_batches",
            #     CronTrigger.from_crontab("0 12 * * 5"),
            #     replace_existing=True,
            #     jobstore=Jobstores.BATCHED_NOTIFICATIONS.value,
            # )

            # Notification LATE EXECUTIONS ALERT
            self.scheduler.add_job(
                report_late_executions,
                id="report_late_executions",
                trigger="interval",
                replace_existing=True,
                seconds=config.execution_late_notification_threshold_secs,
                jobstore=Jobstores.EXECUTION.value,
            )

            # Block Error Rate Monitoring
            self.scheduler.add_job(
                report_block_error_rates,
                id="report_block_error_rates",
                trigger="interval",
                replace_existing=True,
                seconds=config.block_error_rate_check_interval_secs,
                jobstore=Jobstores.EXECUTION.value,
            )

            # Cloud Storage Cleanup - configurable interval
            self.scheduler.add_job(
                cleanup_expired_files,
                id="cleanup_expired_files",
                trigger="interval",
                replace_existing=True,
                seconds=config.cloud_storage_cleanup_interval_hours
                * 3600,  # Convert hours to seconds
                jobstore=Jobstores.EXECUTION.value,
            )

            # OAuth Token Cleanup - configurable interval
            self.scheduler.add_job(
                cleanup_oauth_tokens,
                id="cleanup_oauth_tokens",
                trigger="interval",
                replace_existing=True,
                seconds=config.oauth_token_cleanup_interval_hours
                * 3600,  # Convert hours to seconds
                jobstore=Jobstores.EXECUTION.value,
            )

            # Execution Accuracy Monitoring - configurable interval
            self.scheduler.add_job(
                execution_accuracy_alerts,
                id="report_execution_accuracy_alerts",
                trigger="interval",
                replace_existing=True,
                seconds=config.execution_accuracy_check_interval_hours
                * 3600,  # Convert hours to seconds
                jobstore=Jobstores.EXECUTION.value,
            )

            # Embedding Coverage - Every 6 hours
            # Ensures all approved agents have embeddings for hybrid search
            # Critical: missing embeddings = agents invisible in search
            self.scheduler.add_job(
                ensure_embeddings_coverage,
                id="ensure_embeddings_coverage",
                trigger="interval",
                hours=6,
                replace_existing=True,
                max_instances=1,  # Prevent overlapping runs
                jobstore=Jobstores.EXECUTION.value,
            )

            # Block Description Optimization - Every 24 hours
            # Generates concise LLM-optimized block descriptions for
            # agent generation. Only processes blocks missing descriptions.
            self.scheduler.add_job(
                optimize_block_descriptions,
                id="optimize_block_descriptions",
                trigger="interval",
                hours=24,
                replace_existing=True,
                max_instances=1,
                jobstore=Jobstores.EXECUTION.value,
            )

        self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        self.scheduler.add_listener(job_missed_listener, EVENT_JOB_MISSED)
        self.scheduler.add_listener(job_max_instances_listener, EVENT_JOB_MAX_INSTANCES)
        self.scheduler.start()

        # Run embedding backfill immediately on startup
        # This ensures blocks/docs are searchable right away, not after 6 hours
        # Safe to run on multiple pods - uses upserts and checks for existing embeddings
        if self.register_system_tasks:
            logger.info("Running embedding backfill on startup...")
            try:
                result = ensure_embeddings_coverage()
                logger.info(f"Startup embedding backfill complete: {result}")
            except Exception as e:
                logger.error(f"Startup embedding backfill failed: {e}")
                # Don't fail startup - the scheduled job will retry later

        # Keep the service running since BackgroundScheduler doesn't block
        super().run_service()

    def cleanup(self):
        if self.scheduler:
            logger.info("⏳ Shutting down scheduler...")
            self.scheduler.shutdown(wait=True)

        global _event_loop
        if _event_loop:
            logger.info("⏳ Closing event loop...")
            _event_loop.call_soon_threadsafe(_event_loop.stop)

        global _event_loop_thread
        if _event_loop_thread:
            logger.info("⏳ Waiting for event loop thread to finish...")
            _event_loop_thread.join(timeout=SCHEDULER_OPERATION_TIMEOUT_SECONDS)

        super().cleanup()

    @expose
    def add_graph_execution_schedule(
        self,
        user_id: str,
        graph_id: str,
        graph_version: int,
        cron: str,
        input_data: GraphInput,
        input_credentials: dict[str, CredentialsMetaInput],
        name: Optional[str] = None,
        user_timezone: str | None = None,
    ) -> GraphExecutionJobInfo:
        # Validate the graph before scheduling to prevent runtime failures
        # We don't need the return value, just want the validation to run
        run_async(
            execution_utils.validate_and_construct_node_execution_input(
                graph_id=graph_id,
                user_id=user_id,
                graph_inputs=input_data,
                graph_version=graph_version,
                graph_credentials_inputs=input_credentials,
            )
        )

        # Use provided timezone or default to UTC
        # Note: Timezone should be passed from the client to avoid database lookups
        if not user_timezone:
            user_timezone = "UTC"
            logger.warning(
                f"No timezone provided for user {user_id}, using UTC for scheduling. "
                f"Client should pass user's timezone for correct scheduling."
            )

        logger.info(
            f"Scheduling job for user {user_id} with timezone {user_timezone} (cron: {cron})"
        )
        schedule_id = str(uuid.uuid4())

        job_args = GraphExecutionJobArgs(
            schedule_id=schedule_id,
            user_id=user_id,
            graph_id=graph_id,
            graph_version=graph_version,
            agent_name=name,
            cron=cron,
            input_data=input_data,
            input_credentials=input_credentials,
        )
        job = self.scheduler.add_job(
            execute_graph,
            kwargs=job_args.model_dump(),
            name=name,
            trigger=CronTrigger.from_crontab(cron, timezone=user_timezone),
            jobstore=Jobstores.EXECUTION.value,
            replace_existing=True,
            id=schedule_id,
        )
        logger.info(
            f"Added job {job.id} with cron schedule '{cron}' in timezone {user_timezone}, input data: {input_data}"
        )
        return GraphExecutionJobInfo.from_db(job_args, job)

    @expose
    def delete_graph_execution_schedule(
        self, schedule_id: str, user_id: str
    ) -> GraphExecutionJobInfo:
        job = self.scheduler.get_job(schedule_id, jobstore=Jobstores.EXECUTION.value)
        if not job:
            raise NotFoundError(f"Job #{schedule_id} not found.")

        job_args = GraphExecutionJobArgs(**job.kwargs)
        if job_args.user_id != user_id:
            raise NotAuthorizedError("User ID does not match the job's user ID")

        logger.info(f"Deleting job {schedule_id}")
        job.remove()

        return GraphExecutionJobInfo.from_db(job_args, job)

    @expose
    def get_graph_execution_schedules(
        self, graph_id: str | None = None, user_id: str | None = None
    ) -> list[GraphExecutionJobInfo]:
        jobs: list[JobObj] = self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value)
        schedules = []
        for job in jobs:
            logger.debug(
                f"Found job {job.id} with cron schedule {job.trigger} and args {job.kwargs}"
            )
            try:
                job_args = GraphExecutionJobArgs.model_validate(job.kwargs)
            except ValidationError:
                continue
            if (
                job.next_run_time is not None
                and (graph_id is None or job_args.graph_id == graph_id)
                and (user_id is None or job_args.user_id == user_id)
            ):
                schedules.append(GraphExecutionJobInfo.from_db(job_args, job))
        return schedules

    @expose
    def execute_process_existing_batches(self, kwargs: dict):
        process_existing_batches(**kwargs)

    @expose
    def execute_process_weekly_summary(self):
        process_weekly_summary()

    @expose
    def execute_report_late_executions(self):
        return report_late_executions()

    @expose
    def execute_report_block_error_rates(self):
        return report_block_error_rates()

    @expose
    def execute_cleanup_expired_files(self):
        """Manually trigger cleanup of expired cloud storage files."""
        return cleanup_expired_files()

    @expose
    def execute_cleanup_oauth_tokens(self):
        """Manually trigger cleanup of expired OAuth tokens."""
        return cleanup_oauth_tokens()

    @expose
    def execute_report_execution_accuracy_alerts(self):
        """Manually trigger execution accuracy alert checking."""
        return execution_accuracy_alerts()

    @expose
    def execute_ensure_embeddings_coverage(self):
        """Manually trigger embedding backfill for approved store agents."""
        return ensure_embeddings_coverage()


class SchedulerClient(AppServiceClient):
    @classmethod
    def get_service_type(cls):
        return Scheduler

    add_execution_schedule = endpoint_to_async(Scheduler.add_graph_execution_schedule)
    delete_schedule = endpoint_to_async(Scheduler.delete_graph_execution_schedule)
    get_execution_schedules = endpoint_to_async(Scheduler.get_graph_execution_schedules)