Compare commits

..

2 Commits

Author                SHA1        Message                                                          Date
Reinier van der Leer  0676ecbe53  remove now-redundant calls to increment_onboarding_runs         2026-01-14 16:27:12 +01:00
Reinier van der Leer  a649d73060  make all DB calls in add_graph_execution go through DB manager  2026-01-14 16:14:04 +01:00
7 changed files with 36 additions and 20 deletions

View File

@@ -35,11 +35,7 @@ from backend.data.model import (
     OAuth2Credentials,
     UserIntegrations,
 )
-from backend.data.onboarding import (
-    OnboardingStep,
-    complete_onboarding_step,
-    increment_runs,
-)
+from backend.data.onboarding import OnboardingStep, complete_onboarding_step
 from backend.data.user import get_user_integrations
 from backend.executor.utils import add_graph_execution
 from backend.integrations.ayrshare import AyrshareClient, SocialPlatform
@@ -378,7 +374,6 @@ async def webhook_ingress_generic(
         return
     await complete_onboarding_step(user_id, OnboardingStep.TRIGGER_WEBHOOK)
-    await increment_runs(user_id)
     # Execute all triggers concurrently for better performance
     tasks = []

View File

@@ -8,7 +8,6 @@ from backend.data.execution import GraphExecutionMeta
 from backend.data.graph import get_graph
 from backend.data.integrations import get_webhook
 from backend.data.model import CredentialsMetaInput
-from backend.data.onboarding import increment_runs
 from backend.executor.utils import add_graph_execution, make_node_credentials_input_map
 from backend.integrations.creds_manager import IntegrationCredentialsManager
 from backend.integrations.webhooks import get_webhook_manager
@@ -403,8 +402,6 @@ async def execute_preset(
     merged_node_input = preset.inputs | inputs
     merged_credential_inputs = preset.credentials | credential_inputs
-    await increment_runs(user_id)
     return await add_graph_execution(
         user_id=user_id,
         graph_id=preset.graph_id,

View File

@@ -64,7 +64,6 @@ from backend.data.onboarding import (
     complete_re_run_agent,
     get_recommended_agents,
     get_user_onboarding,
-    increment_runs,
     onboarding_enabled,
     reset_user_onboarding,
     update_user_onboarding,
@@ -975,7 +974,6 @@ async def execute_graph(
     # Record successful graph execution
     record_graph_execution(graph_id=graph_id, status="success", user_id=user_id)
     record_graph_operation(operation="execute", status="success")
-    await increment_runs(user_id)
     await complete_re_run_agent(user_id, graph_id)
     if source == "library":
         await complete_onboarding_step(

View File

@@ -334,7 +334,7 @@ async def _get_user_timezone(user_id: str) -> str:
     return get_user_timezone_or_utc(user.timezone if user else None)
-async def increment_runs(user_id: str):
+async def increment_onboarding_runs(user_id: str):
     """
     Increment a user's run counters and trigger any onboarding milestones.
     """

View File

@@ -20,6 +20,7 @@ from backend.data.execution import (
     get_execution_kv_data,
     get_execution_outputs_by_node_exec_id,
     get_frequently_executed_graphs,
+    get_graph_execution,
     get_graph_execution_meta,
     get_graph_executions,
     get_graph_executions_count,
@@ -57,6 +58,7 @@ from backend.data.notifications import (
     get_user_notification_oldest_message_in_batch,
     remove_notifications_from_batch,
 )
+from backend.data.onboarding import increment_onboarding_runs
 from backend.data.user import (
     get_active_user_ids_in_timerange,
     get_user_by_id,
@@ -140,6 +142,7 @@ class DatabaseManager(AppService):
     get_child_graph_executions = _(get_child_graph_executions)
     get_graph_executions = _(get_graph_executions)
     get_graph_executions_count = _(get_graph_executions_count)
+    get_graph_execution = _(get_graph_execution)
     get_graph_execution_meta = _(get_graph_execution_meta)
     create_graph_execution = _(create_graph_execution)
     get_node_execution = _(get_node_execution)
@@ -204,6 +207,9 @@ class DatabaseManager(AppService):
     add_store_agent_to_library = _(add_store_agent_to_library)
     validate_graph_execution_permissions = _(validate_graph_execution_permissions)
+
+    # Onboarding
+    increment_onboarding_runs = _(increment_onboarding_runs)
     # Store
     get_store_agents = _(get_store_agents)
     get_store_agent_details = _(get_store_agent_details)
@@ -274,6 +280,7 @@ class DatabaseManagerAsyncClient(AppServiceClient):
     get_graph = d.get_graph
     get_graph_metadata = d.get_graph_metadata
     get_graph_settings = d.get_graph_settings
+    get_graph_execution = d.get_graph_execution
     get_graph_execution_meta = d.get_graph_execution_meta
     get_node = d.get_node
     get_node_execution = d.get_node_execution
@@ -318,6 +325,9 @@ class DatabaseManagerAsyncClient(AppServiceClient):
     add_store_agent_to_library = d.add_store_agent_to_library
     validate_graph_execution_permissions = d.validate_graph_execution_permissions
+
+    # Onboarding
+    increment_onboarding_runs = d.increment_onboarding_runs
     # Store
     get_store_agents = d.get_store_agents
     get_store_agent_details = d.get_store_agent_details
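
With get_graph_execution and increment_onboarding_runs now registered on both DatabaseManager and DatabaseManagerAsyncClient, code running outside the database service reaches them through the manager client instead of importing backend.data modules directly. A minimal sketch of that calling pattern, assuming the get_database_manager_async_client() accessor used in backend/executor/utils.py below; the helper function itself is illustrative and not part of this diff:

async def record_onboarding_run(user_id: str, graph_exec_id: str) -> None:
    # Illustrative helper, not part of this change set: all DB access goes
    # through the DatabaseManagerAsyncClient rather than backend.data.* modules.
    db = get_database_manager_async_client()  # assumed accessor, as used in executor utils
    graph_exec = await db.get_graph_execution(
        user_id=user_id,
        execution_id=graph_exec_id,
        include_node_executions=True,
    )
    if graph_exec:
        await db.increment_onboarding_runs(user_id)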

View File

@@ -27,7 +27,6 @@ from backend.data.auth.oauth import cleanup_expired_oauth_tokens
 from backend.data.block import BlockInput
 from backend.data.execution import GraphExecutionWithNodes
 from backend.data.model import CredentialsMetaInput
-from backend.data.onboarding import increment_runs
 from backend.executor import utils as execution_utils
 from backend.monitoring import (
     NotificationJobArgs,
@@ -156,7 +155,6 @@ async def _execute_graph(**kwargs):
         inputs=args.input_data,
         graph_credentials_inputs=args.input_credentials,
     )
-    await increment_runs(args.user_id)
     elapsed = asyncio.get_event_loop().time() - start_time
     logger.info(
         f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} "

View File

@@ -10,6 +10,7 @@ from pydantic import BaseModel, JsonValue, ValidationError
 from backend.data import execution as execution_db
 from backend.data import graph as graph_db
+from backend.data import onboarding as onboarding_db
 from backend.data import user as user_db
 from backend.data.block import (
     Block,
@@ -31,7 +32,6 @@ from backend.data.execution import (
     GraphExecutionStats,
     GraphExecutionWithNodes,
     NodesInputMasks,
-    get_graph_execution,
 )
 from backend.data.graph import GraphModel, Node
 from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput
@@ -809,13 +809,14 @@ async def add_graph_execution(
         edb = execution_db
         udb = user_db
         gdb = graph_db
+        odb = onboarding_db
     else:
-        edb = udb = gdb = get_database_manager_async_client()
+        edb = udb = gdb = odb = get_database_manager_async_client()
     # Get or create the graph execution
     if graph_exec_id:
         # Resume existing execution
-        graph_exec = await get_graph_execution(
+        graph_exec = await edb.get_graph_execution(
             user_id=user_id,
             execution_id=graph_exec_id,
             include_node_executions=True,
@@ -891,6 +892,7 @@ async def add_graph_execution(
         )
         logger.info(f"Publishing execution {graph_exec.id} to execution queue")
         # Publish to execution queue for executor to pick up
         exec_queue = await get_async_execution_queue()
         await exec_queue.publish_message(
             routing_key=GRAPH_EXECUTION_ROUTING_KEY,
@@ -899,14 +901,12 @@ async def add_graph_execution(
         )
         logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue")
         # Update execution status to QUEUED
         graph_exec.status = ExecutionStatus.QUEUED
         await edb.update_graph_execution_stats(
             graph_exec_id=graph_exec.id,
             status=graph_exec.status,
         )
-        await get_async_execution_event_bus().publish(graph_exec)
-        return graph_exec
     except BaseException as e:
         err = str(e) or type(e).__name__
         if not graph_exec:
@@ -927,6 +927,24 @@ async def add_graph_execution(
         )
         raise
+
+    try:
+        await get_async_execution_event_bus().publish(graph_exec)
+        logger.info(f"Published update for execution #{graph_exec.id} to event bus")
+    except Exception as e:
+        logger.error(
+            f"Failed to publish execution event for graph exec #{graph_exec.id}: {e}"
+        )
+
+    try:
+        await odb.increment_onboarding_runs(user_id)
+        logger.info(
+            f"Incremented user #{user_id} onboarding runs for exec #{graph_exec.id}"
+        )
+    except Exception as e:
+        logger.error(f"Failed to increment onboarding runs for user #{user_id}: {e}")
+
+    return graph_exec
# ============ Execution Output Helpers ============ #
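
Taken together with the call-site removals in the earlier files, these hunks move run counting into add_graph_execution itself: every entry point that starts an execution (webhook ingress, presets, the library execute_graph route, the scheduler) now has its onboarding counter incremented once, as a non-fatal side effect after the execution is queued and its status update is published. A hedged sketch of a caller after these commits; the kwarg names mirror the scheduler hunk above, but the real call sites pass more arguments:

async def start_preset_run(user_id: str, preset):
    # Illustrative caller, not part of this change set: no separate
    # increment_runs / increment_onboarding_runs call remains here, because
    # add_graph_execution bumps the onboarding counter itself and only logs
    # (does not raise) if that bookkeeping fails.
    return await add_graph_execution(
        user_id=user_id,
        graph_id=preset.graph_id,
        inputs=preset.inputs,
        graph_credentials_inputs=preset.credentials,
    )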