diff --git a/autogpt_platform/backend/backend/api/features/integrations/router.py b/autogpt_platform/backend/backend/api/features/integrations/router.py index f5dd8c092b..36585b14b5 100644 --- a/autogpt_platform/backend/backend/api/features/integrations/router.py +++ b/autogpt_platform/backend/backend/api/features/integrations/router.py @@ -35,11 +35,7 @@ from backend.data.model import ( OAuth2Credentials, UserIntegrations, ) -from backend.data.onboarding import ( - OnboardingStep, - complete_onboarding_step, - increment_runs, -) +from backend.data.onboarding import OnboardingStep, complete_onboarding_step from backend.data.user import get_user_integrations from backend.executor.utils import add_graph_execution from backend.integrations.ayrshare import AyrshareClient, SocialPlatform @@ -378,7 +374,6 @@ async def webhook_ingress_generic( return await complete_onboarding_step(user_id, OnboardingStep.TRIGGER_WEBHOOK) - await increment_runs(user_id) # Execute all triggers concurrently for better performance tasks = [] diff --git a/autogpt_platform/backend/backend/api/features/library/routes/presets.py b/autogpt_platform/backend/backend/api/features/library/routes/presets.py index cd4c04e0f2..98f2cb5f15 100644 --- a/autogpt_platform/backend/backend/api/features/library/routes/presets.py +++ b/autogpt_platform/backend/backend/api/features/library/routes/presets.py @@ -8,7 +8,6 @@ from backend.data.execution import GraphExecutionMeta from backend.data.graph import get_graph from backend.data.integrations import get_webhook from backend.data.model import CredentialsMetaInput -from backend.data.onboarding import increment_runs from backend.executor.utils import add_graph_execution, make_node_credentials_input_map from backend.integrations.creds_manager import IntegrationCredentialsManager from backend.integrations.webhooks import get_webhook_manager @@ -403,8 +402,6 @@ async def execute_preset( merged_node_input = preset.inputs | inputs merged_credential_inputs = preset.credentials | credential_inputs - await increment_runs(user_id) - return await add_graph_execution( user_id=user_id, graph_id=preset.graph_id, diff --git a/autogpt_platform/backend/backend/api/features/v1.py b/autogpt_platform/backend/backend/api/features/v1.py index 9b05b4755f..661e8ff7f2 100644 --- a/autogpt_platform/backend/backend/api/features/v1.py +++ b/autogpt_platform/backend/backend/api/features/v1.py @@ -64,7 +64,6 @@ from backend.data.onboarding import ( complete_re_run_agent, get_recommended_agents, get_user_onboarding, - increment_runs, onboarding_enabled, reset_user_onboarding, update_user_onboarding, @@ -975,7 +974,6 @@ async def execute_graph( # Record successful graph execution record_graph_execution(graph_id=graph_id, status="success", user_id=user_id) record_graph_operation(operation="execute", status="success") - await increment_runs(user_id) await complete_re_run_agent(user_id, graph_id) if source == "library": await complete_onboarding_step( diff --git a/autogpt_platform/backend/backend/data/onboarding.py b/autogpt_platform/backend/backend/data/onboarding.py index cc63b89afd..6a842d1022 100644 --- a/autogpt_platform/backend/backend/data/onboarding.py +++ b/autogpt_platform/backend/backend/data/onboarding.py @@ -334,7 +334,7 @@ async def _get_user_timezone(user_id: str) -> str: return get_user_timezone_or_utc(user.timezone if user else None) -async def increment_runs(user_id: str): +async def increment_onboarding_runs(user_id: str): """ Increment a user's run counters and trigger any onboarding milestones. """ diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py index af68bf526d..9848948bff 100644 --- a/autogpt_platform/backend/backend/executor/database.py +++ b/autogpt_platform/backend/backend/executor/database.py @@ -20,6 +20,7 @@ from backend.data.execution import ( get_execution_kv_data, get_execution_outputs_by_node_exec_id, get_frequently_executed_graphs, + get_graph_execution, get_graph_execution_meta, get_graph_executions, get_graph_executions_count, @@ -57,6 +58,7 @@ from backend.data.notifications import ( get_user_notification_oldest_message_in_batch, remove_notifications_from_batch, ) +from backend.data.onboarding import increment_onboarding_runs from backend.data.user import ( get_active_user_ids_in_timerange, get_user_by_id, @@ -140,6 +142,7 @@ class DatabaseManager(AppService): get_child_graph_executions = _(get_child_graph_executions) get_graph_executions = _(get_graph_executions) get_graph_executions_count = _(get_graph_executions_count) + get_graph_execution = _(get_graph_execution) get_graph_execution_meta = _(get_graph_execution_meta) create_graph_execution = _(create_graph_execution) get_node_execution = _(get_node_execution) @@ -204,6 +207,9 @@ class DatabaseManager(AppService): add_store_agent_to_library = _(add_store_agent_to_library) validate_graph_execution_permissions = _(validate_graph_execution_permissions) + # Onboarding + increment_onboarding_runs = _(increment_onboarding_runs) + # Store get_store_agents = _(get_store_agents) get_store_agent_details = _(get_store_agent_details) @@ -274,6 +280,7 @@ class DatabaseManagerAsyncClient(AppServiceClient): get_graph = d.get_graph get_graph_metadata = d.get_graph_metadata get_graph_settings = d.get_graph_settings + get_graph_execution = d.get_graph_execution get_graph_execution_meta = d.get_graph_execution_meta get_node = d.get_node get_node_execution = d.get_node_execution @@ -318,6 +325,9 @@ class DatabaseManagerAsyncClient(AppServiceClient): add_store_agent_to_library = d.add_store_agent_to_library validate_graph_execution_permissions = d.validate_graph_execution_permissions + # Onboarding + increment_onboarding_runs = d.increment_onboarding_runs + # Store get_store_agents = d.get_store_agents get_store_agent_details = d.get_store_agent_details diff --git a/autogpt_platform/backend/backend/executor/scheduler.py b/autogpt_platform/backend/backend/executor/scheduler.py index 06c50bf82e..963c901fd6 100644 --- a/autogpt_platform/backend/backend/executor/scheduler.py +++ b/autogpt_platform/backend/backend/executor/scheduler.py @@ -27,7 +27,6 @@ from backend.data.auth.oauth import cleanup_expired_oauth_tokens from backend.data.block import BlockInput from backend.data.execution import GraphExecutionWithNodes from backend.data.model import CredentialsMetaInput -from backend.data.onboarding import increment_runs from backend.executor import utils as execution_utils from backend.monitoring import ( NotificationJobArgs, @@ -156,7 +155,6 @@ async def _execute_graph(**kwargs): inputs=args.input_data, graph_credentials_inputs=args.input_credentials, ) - await increment_runs(args.user_id) elapsed = asyncio.get_event_loop().time() - start_time logger.info( f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} " diff --git a/autogpt_platform/backend/backend/executor/utils.py b/autogpt_platform/backend/backend/executor/utils.py index 1fb2b9404f..25f0389e99 100644 --- a/autogpt_platform/backend/backend/executor/utils.py +++ b/autogpt_platform/backend/backend/executor/utils.py @@ -10,6 +10,7 @@ from pydantic import BaseModel, JsonValue, ValidationError from backend.data import execution as execution_db from backend.data import graph as graph_db +from backend.data import onboarding as onboarding_db from backend.data import user as user_db from backend.data.block import ( Block, @@ -31,7 +32,6 @@ from backend.data.execution import ( GraphExecutionStats, GraphExecutionWithNodes, NodesInputMasks, - get_graph_execution, ) from backend.data.graph import GraphModel, Node from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput @@ -809,13 +809,14 @@ async def add_graph_execution( edb = execution_db udb = user_db gdb = graph_db + odb = onboarding_db else: - edb = udb = gdb = get_database_manager_async_client() + edb = udb = gdb = odb = get_database_manager_async_client() # Get or create the graph execution if graph_exec_id: # Resume existing execution - graph_exec = await get_graph_execution( + graph_exec = await edb.get_graph_execution( user_id=user_id, execution_id=graph_exec_id, include_node_executions=True, @@ -891,6 +892,7 @@ async def add_graph_execution( ) logger.info(f"Publishing execution {graph_exec.id} to execution queue") + # Publish to execution queue for executor to pick up exec_queue = await get_async_execution_queue() await exec_queue.publish_message( routing_key=GRAPH_EXECUTION_ROUTING_KEY, @@ -899,14 +901,12 @@ async def add_graph_execution( ) logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue") + # Update execution status to QUEUED graph_exec.status = ExecutionStatus.QUEUED await edb.update_graph_execution_stats( graph_exec_id=graph_exec.id, status=graph_exec.status, ) - await get_async_execution_event_bus().publish(graph_exec) - - return graph_exec except BaseException as e: err = str(e) or type(e).__name__ if not graph_exec: @@ -927,6 +927,24 @@ async def add_graph_execution( ) raise + try: + await get_async_execution_event_bus().publish(graph_exec) + logger.info(f"Published update for execution #{graph_exec.id} to event bus") + except Exception as e: + logger.error( + f"Failed to publish execution event for graph exec #{graph_exec.id}: {e}" + ) + + try: + await odb.increment_onboarding_runs(user_id) + logger.info( + f"Incremented user #{user_id} onboarding runs for exec #{graph_exec.id}" + ) + except Exception as e: + logger.error(f"Failed to increment onboarding runs for user #{user_id}: {e}") + + return graph_exec + # ============ Execution Output Helpers ============ #