From aef6f57cfd5feec005c51710161d0ee04f8721f2 Mon Sep 17 00:00:00 2001 From: Otto Date: Tue, 3 Feb 2026 09:54:49 +0000 Subject: [PATCH] fix(scheduler): route db calls through DatabaseManager (#11941) ## Summary Routes `increment_onboarding_runs` and `cleanup_expired_oauth_tokens` through the DatabaseManager RPC client instead of calling Prisma directly. ## Problem The Scheduler service never connects its Prisma client. While `add_graph_execution()` in `utils.py` has a fallback that routes through DatabaseManager when Prisma isn't connected, subsequent calls in the scheduler were hitting Prisma directly: - `increment_onboarding_runs()` after successful graph execution - `cleanup_expired_oauth_tokens()` in the scheduled job These threw `ClientNotConnectedError`, caught by generic exception handlers but spamming Sentry (~696K events since December per the original analysis in #11926). ## Solution Follow the same pattern as `utils.py`: 1. Add `cleanup_expired_oauth_tokens` to `DatabaseManager` and `DatabaseManagerAsyncClient` 2. Update scheduler to use `get_database_manager_async_client()` for both calls ## Changes - **database.py**: Import and expose `cleanup_expired_oauth_tokens` in both manager classes - **scheduler.py**: Use `db.increment_onboarding_runs()` and `db.cleanup_expired_oauth_tokens()` via the async client ## Impact - Eliminates Sentry error spam from scheduler - Onboarding run counters now actually increment for scheduled executions - OAuth token cleanup now actually runs ## Testing Deploy to staging with scheduled graphs and verify: 1. No more `ClientNotConnectedError` in scheduler logs 2. `UserOnboarding.agentRuns` increments on scheduled runs 3. Expired OAuth tokens get cleaned up Refs: #11926 (original fix that was closed) --- .../backend/backend/executor/database.py | 7 +++++++ .../backend/backend/executor/scheduler.py | 18 +++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py index ae7474fc1d..d44439d51c 100644 --- a/autogpt_platform/backend/backend/executor/database.py +++ b/autogpt_platform/backend/backend/executor/database.py @@ -17,6 +17,7 @@ from backend.data.analytics import ( get_accuracy_trends_and_alerts, get_marketplace_graphs_for_monitoring, ) +from backend.data.auth.oauth import cleanup_expired_oauth_tokens from backend.data.credit import UsageTransactionMetadata, get_user_credit_model from backend.data.execution import ( create_graph_execution, @@ -219,6 +220,9 @@ class DatabaseManager(AppService): # Onboarding increment_onboarding_runs = _(increment_onboarding_runs) + # OAuth + cleanup_expired_oauth_tokens = _(cleanup_expired_oauth_tokens) + # Store get_store_agents = _(get_store_agents) get_store_agent_details = _(get_store_agent_details) @@ -349,6 +353,9 @@ class DatabaseManagerAsyncClient(AppServiceClient): # Onboarding increment_onboarding_runs = d.increment_onboarding_runs + # OAuth + cleanup_expired_oauth_tokens = d.cleanup_expired_oauth_tokens + # Store get_store_agents = d.get_store_agents get_store_agent_details = d.get_store_agent_details diff --git a/autogpt_platform/backend/backend/executor/scheduler.py b/autogpt_platform/backend/backend/executor/scheduler.py index 44b77fc018..cbdc441718 100644 --- a/autogpt_platform/backend/backend/executor/scheduler.py +++ b/autogpt_platform/backend/backend/executor/scheduler.py @@ -24,11 +24,9 @@ from dotenv import load_dotenv from pydantic import BaseModel, Field, ValidationError from sqlalchemy import MetaData, create_engine -from backend.data.auth.oauth import cleanup_expired_oauth_tokens from backend.data.block import BlockInput from backend.data.execution import GraphExecutionWithNodes from backend.data.model import CredentialsMetaInput -from backend.data.onboarding import increment_onboarding_runs from backend.executor import utils as execution_utils from backend.monitoring import ( NotificationJobArgs, @@ -38,7 +36,11 @@ from backend.monitoring import ( report_execution_accuracy_alerts, report_late_executions, ) -from backend.util.clients import get_database_manager_client, get_scheduler_client +from backend.util.clients import ( + get_database_manager_async_client, + get_database_manager_client, + get_scheduler_client, +) from backend.util.cloud_storage import cleanup_expired_files_async from backend.util.exceptions import ( GraphNotFoundError, @@ -148,6 +150,7 @@ def execute_graph(**kwargs): async def _execute_graph(**kwargs): args = GraphExecutionJobArgs(**kwargs) start_time = asyncio.get_event_loop().time() + db = get_database_manager_async_client() try: logger.info(f"Executing recurring job for graph #{args.graph_id}") graph_exec: GraphExecutionWithNodes = await execution_utils.add_graph_execution( @@ -157,7 +160,7 @@ async def _execute_graph(**kwargs): inputs=args.input_data, graph_credentials_inputs=args.input_credentials, ) - await increment_onboarding_runs(args.user_id) + await db.increment_onboarding_runs(args.user_id) elapsed = asyncio.get_event_loop().time() - start_time logger.info( f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} " @@ -246,8 +249,13 @@ def cleanup_expired_files(): def cleanup_oauth_tokens(): """Clean up expired OAuth tokens from the database.""" + # Wait for completion - run_async(cleanup_expired_oauth_tokens()) + async def _cleanup(): + db = get_database_manager_async_client() + return await db.cleanup_expired_oauth_tokens() + + run_async(_cleanup()) def execution_accuracy_alerts():