mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-03 19:35:15 -05:00
fix(scheduler): route db calls through DatabaseManager (#11941)
## Summary Routes `increment_onboarding_runs` and `cleanup_expired_oauth_tokens` through the DatabaseManager RPC client instead of calling Prisma directly. ## Problem The Scheduler service never connects its Prisma client. While `add_graph_execution()` in `utils.py` has a fallback that routes through DatabaseManager when Prisma isn't connected, subsequent calls in the scheduler were hitting Prisma directly: - `increment_onboarding_runs()` after successful graph execution - `cleanup_expired_oauth_tokens()` in the scheduled job These threw `ClientNotConnectedError`, caught by generic exception handlers but spamming Sentry (~696K events since December per the original analysis in #11926). ## Solution Follow the same pattern as `utils.py`: 1. Add `cleanup_expired_oauth_tokens` to `DatabaseManager` and `DatabaseManagerAsyncClient` 2. Update scheduler to use `get_database_manager_async_client()` for both calls ## Changes - **database.py**: Import and expose `cleanup_expired_oauth_tokens` in both manager classes - **scheduler.py**: Use `db.increment_onboarding_runs()` and `db.cleanup_expired_oauth_tokens()` via the async client ## Impact - Eliminates Sentry error spam from scheduler - Onboarding run counters now actually increment for scheduled executions - OAuth token cleanup now actually runs ## Testing Deploy to staging with scheduled graphs and verify: 1. No more `ClientNotConnectedError` in scheduler logs 2. `UserOnboarding.agentRuns` increments on scheduled runs 3. Expired OAuth tokens get cleaned up Refs: #11926 (original fix that was closed)
This commit is contained in:
@@ -17,6 +17,7 @@ from backend.data.analytics import (
|
||||
get_accuracy_trends_and_alerts,
|
||||
get_marketplace_graphs_for_monitoring,
|
||||
)
|
||||
from backend.data.auth.oauth import cleanup_expired_oauth_tokens
|
||||
from backend.data.credit import UsageTransactionMetadata, get_user_credit_model
|
||||
from backend.data.execution import (
|
||||
create_graph_execution,
|
||||
@@ -219,6 +220,9 @@ class DatabaseManager(AppService):
|
||||
# Onboarding
|
||||
increment_onboarding_runs = _(increment_onboarding_runs)
|
||||
|
||||
# OAuth
|
||||
cleanup_expired_oauth_tokens = _(cleanup_expired_oauth_tokens)
|
||||
|
||||
# Store
|
||||
get_store_agents = _(get_store_agents)
|
||||
get_store_agent_details = _(get_store_agent_details)
|
||||
@@ -349,6 +353,9 @@ class DatabaseManagerAsyncClient(AppServiceClient):
|
||||
# Onboarding
|
||||
increment_onboarding_runs = d.increment_onboarding_runs
|
||||
|
||||
# OAuth
|
||||
cleanup_expired_oauth_tokens = d.cleanup_expired_oauth_tokens
|
||||
|
||||
# Store
|
||||
get_store_agents = d.get_store_agents
|
||||
get_store_agent_details = d.get_store_agent_details
|
||||
|
||||
@@ -24,11 +24,9 @@ from dotenv import load_dotenv
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
from sqlalchemy import MetaData, create_engine
|
||||
|
||||
from backend.data.auth.oauth import cleanup_expired_oauth_tokens
|
||||
from backend.data.block import BlockInput
|
||||
from backend.data.execution import GraphExecutionWithNodes
|
||||
from backend.data.model import CredentialsMetaInput
|
||||
from backend.data.onboarding import increment_onboarding_runs
|
||||
from backend.executor import utils as execution_utils
|
||||
from backend.monitoring import (
|
||||
NotificationJobArgs,
|
||||
@@ -38,7 +36,11 @@ from backend.monitoring import (
|
||||
report_execution_accuracy_alerts,
|
||||
report_late_executions,
|
||||
)
|
||||
from backend.util.clients import get_database_manager_client, get_scheduler_client
|
||||
from backend.util.clients import (
|
||||
get_database_manager_async_client,
|
||||
get_database_manager_client,
|
||||
get_scheduler_client,
|
||||
)
|
||||
from backend.util.cloud_storage import cleanup_expired_files_async
|
||||
from backend.util.exceptions import (
|
||||
GraphNotFoundError,
|
||||
@@ -148,6 +150,7 @@ def execute_graph(**kwargs):
|
||||
async def _execute_graph(**kwargs):
|
||||
args = GraphExecutionJobArgs(**kwargs)
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
db = get_database_manager_async_client()
|
||||
try:
|
||||
logger.info(f"Executing recurring job for graph #{args.graph_id}")
|
||||
graph_exec: GraphExecutionWithNodes = await execution_utils.add_graph_execution(
|
||||
@@ -157,7 +160,7 @@ async def _execute_graph(**kwargs):
|
||||
inputs=args.input_data,
|
||||
graph_credentials_inputs=args.input_credentials,
|
||||
)
|
||||
await increment_onboarding_runs(args.user_id)
|
||||
await db.increment_onboarding_runs(args.user_id)
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
logger.info(
|
||||
f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} "
|
||||
@@ -246,8 +249,13 @@ def cleanup_expired_files():
|
||||
|
||||
def cleanup_oauth_tokens():
|
||||
"""Clean up expired OAuth tokens from the database."""
|
||||
|
||||
# Wait for completion
|
||||
run_async(cleanup_expired_oauth_tokens())
|
||||
async def _cleanup():
|
||||
db = get_database_manager_async_client()
|
||||
return await db.cleanup_expired_oauth_tokens()
|
||||
|
||||
run_async(_cleanup())
|
||||
|
||||
|
||||
def execution_accuracy_alerts():
|
||||
|
||||
Reference in New Issue
Block a user