From 7b951c977e512b39831888375db80bf97fa3bef3 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Tue, 2 Dec 2025 16:55:55 +0700 Subject: [PATCH] feat(platform): implement graph-level Safe Mode toggle for HITL blocks (#11455) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary This PR implements a graph-level Safe Mode toggle system for Human-in-the-Loop (HITL) blocks. When Safe Mode is ON (default), HITL blocks require manual review before proceeding. When OFF, they execute automatically. ## ๐Ÿ”ง Backend Changes - **Database**: Added `metadata` JSON column to `AgentGraph` table with migration - **API**: Updated `execute_graph` endpoint to accept `safe_mode` parameter - **Execution**: Enhanced execution context to use graph metadata as default with API override capability - **Auto-detection**: Automatically populate `has_human_in_the_loop` for graphs containing HITL blocks - **Block Detection**: HITL block ID: `8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d` ## ๐ŸŽจ Frontend Changes - **Component**: New `FloatingSafeModeToggle` with dual variants: - **White variant**: For library pages, integrates with action buttons - **Black variant**: For builders, floating positioned - **Integration**: Added toggles to both new/legacy builders and library pages - **API Integration**: Direct graph metadata updates via `usePutV1UpdateGraphVersion` - **Query Management**: React Query cache invalidation for consistent UI updates - **Conditional Display**: Toggle only appears when graph contains HITL blocks ## ๐Ÿ›  Technical Implementation - **Safe Mode ON** (default): HITL blocks require manual review before proceeding - **Safe Mode OFF**: HITL blocks execute automatically without intervention - **Priority**: Backend API `safe_mode` parameter takes precedence over graph metadata - **Detection**: Auto-populates `has_human_in_the_loop` metadata field - **Positioning**: Proper z-index and responsive positioning for floating elements ## ๐Ÿšง Known Issues (Work in Progress) ### High Priority - [ ] **Toggle state persistence**: Always shows "ON" regardless of actual state - query invalidation issue - [ ] **LibraryAgent metadata**: Missing metadata field causing TypeScript errors - [ ] **Tooltip z-index**: Still covered by some UI elements despite high z-index ### Medium Priority - [ ] **HITL detection**: Logic needs improvement for reliable block detection - [ ] **Error handling**: Removing HITL blocks from graph causes save errors - [ ] **TypeScript**: Fix type mismatches between GraphModel and LibraryAgent ### Low Priority - [ ] **Frontend API**: Add `safe_mode` parameter to execution calls once OpenAPI is regenerated - [ ] **Performance**: Consider debouncing rapid toggle clicks ## ๐Ÿงช Test Plan - [ ] Verify toggle appears only when graph has HITL blocks - [ ] Test toggle persistence across page refreshes - [ ] Confirm API calls update graph metadata correctly - [ ] Validate execution behavior respects safe mode setting - [ ] Check styling consistency across builder and library contexts ## ๐Ÿ”— Related - Addresses requirements for graph-level HITL configuration - Builds on existing FloatingReviewsPanel infrastructure - Integrates with existing graph metadata system ๐Ÿค– Generated with [Claude Code](https://claude.ai/code) --- .../backend/backend/blocks/agent.py | 9 +- .../backend/blocks/human_in_the_loop.py | 55 +++-- .../backend/backend/blocks/time_blocks.py | 23 +- .../backend/backend/data/block.py | 10 + .../backend/backend/data/credit_test.py | 6 +- .../backend/backend/data/execution.py | 33 +-- .../backend/backend/data/graph.py | 35 +++ .../backend/backend/data/human_review.py | 56 +---- .../backend/backend/data/human_review_test.py | 70 ++---- .../backend/backend/data/integrations.py | 14 +- .../backend/backend/data/model.py | 5 +- .../backend/backend/executor/database.py | 3 + .../backend/backend/executor/manager.py | 63 +++--- .../backend/backend/executor/utils.py | 76 +++---- .../backend/backend/executor/utils_test.py | 15 +- .../backend/backend/server/routers/v1.py | 58 ++++- .../executions/review/review_routes_test.py | 195 ++++++++++------- .../backend/backend/server/v2/library/db.py | 78 ++++++- .../backend/server/v2/library/db_test.py | 44 +++- .../backend/server/v2/library/model.py | 21 +- .../server/v2/library/routes/agents.py | 1 + .../backend/backend/util/cache.py | 7 +- autogpt_platform/backend/backend/util/test.py | 4 +- .../backend/backend/util/timezone_utils.py | 4 +- .../migration.sql | 2 + autogpt_platform/backend/schema.prisma | 48 ++-- .../backend/snapshots/grph_single | 1 + autogpt_platform/backend/snapshots/grphs_all | 1 + .../backend/snapshots/lib_agts_search | 6 + .../backend/test_requeue_integration.py | 3 +- .../build/components/FlowEditor/Flow/Flow.tsx | 29 ++- .../components/legacy-builder/Flow/Flow.tsx | 9 + .../RunDetailHeader/RunDetailHeader.tsx | 6 + .../RunDetailHeader/useRunDetailHeader.ts | 4 +- .../frontend/src/app/api/openapi.json | 82 +++++++ .../FloatingSafeModeToggle.tsx | 206 ++++++++++++++++++ .../FloatingReviewsPanel.tsx | 24 +- 37 files changed, 916 insertions(+), 390 deletions(-) create mode 100644 autogpt_platform/backend/migrations/20251128112407_add_library_agent_settings/migration.sql create mode 100644 autogpt_platform/frontend/src/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle.tsx diff --git a/autogpt_platform/backend/backend/blocks/agent.py b/autogpt_platform/backend/backend/blocks/agent.py index 1abb5ade53..0efc0a3369 100644 --- a/autogpt_platform/backend/backend/blocks/agent.py +++ b/autogpt_platform/backend/backend/blocks/agent.py @@ -11,7 +11,7 @@ from backend.data.block import ( BlockType, get_block, ) -from backend.data.execution import ExecutionStatus, NodesInputMasks +from backend.data.execution import ExecutionContext, ExecutionStatus, NodesInputMasks from backend.data.model import NodeExecutionStats, SchemaField from backend.util.json import validate_with_jsonschema from backend.util.retry import func_retry @@ -72,9 +72,9 @@ class AgentExecutorBlock(Block): input_data: Input, *, graph_exec_id: str, + execution_context: ExecutionContext, **kwargs, ) -> BlockOutput: - from backend.executor import utils as execution_utils graph_exec = await execution_utils.add_graph_execution( @@ -83,8 +83,9 @@ class AgentExecutorBlock(Block): user_id=input_data.user_id, inputs=input_data.inputs, nodes_input_masks=input_data.nodes_input_masks, - parent_graph_exec_id=graph_exec_id, - is_sub_graph=True, # AgentExecutorBlock executions are always sub-graphs + execution_context=execution_context.model_copy( + update={"parent_execution_id": graph_exec_id}, + ), ) logger = execution_utils.LogMetadata( diff --git a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py index 1dd5dbac9d..42c98b5146 100644 --- a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py +++ b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py @@ -9,8 +9,9 @@ from backend.data.block import ( BlockOutput, BlockSchemaInput, BlockSchemaOutput, + BlockType, ) -from backend.data.execution import ExecutionStatus +from backend.data.execution import ExecutionContext, ExecutionStatus from backend.data.human_review import ReviewResult from backend.data.model import SchemaField from backend.executor.manager import async_update_node_execution_status @@ -61,15 +62,15 @@ class HumanInTheLoopBlock(Block): categories={BlockCategory.BASIC}, input_schema=HumanInTheLoopBlock.Input, output_schema=HumanInTheLoopBlock.Output, + block_type=BlockType.HUMAN_IN_THE_LOOP, test_input={ "data": {"name": "John Doe", "age": 30}, "name": "User profile data", "editable": True, }, test_output=[ - ("reviewed_data", {"name": "John Doe", "age": 30}), ("status", "approved"), - ("review_message", ""), + ("reviewed_data", {"name": "John Doe", "age": 30}), ], test_mock={ "get_or_create_human_review": lambda *_args, **_kwargs: ReviewResult( @@ -80,9 +81,25 @@ class HumanInTheLoopBlock(Block): node_exec_id="test-node-exec-id", ), "update_node_execution_status": lambda *_args, **_kwargs: None, + "update_review_processed_status": lambda *_args, **_kwargs: None, }, ) + async def get_or_create_human_review(self, **kwargs): + return await get_database_manager_async_client().get_or_create_human_review( + **kwargs + ) + + async def update_node_execution_status(self, **kwargs): + return await async_update_node_execution_status( + db_client=get_database_manager_async_client(), **kwargs + ) + + async def update_review_processed_status(self, node_exec_id: str, processed: bool): + return await get_database_manager_async_client().update_review_processed_status( + node_exec_id, processed + ) + async def run( self, input_data: Input, @@ -92,20 +109,20 @@ class HumanInTheLoopBlock(Block): graph_exec_id: str, graph_id: str, graph_version: int, + execution_context: ExecutionContext, **kwargs, ) -> BlockOutput: - """ - Execute the Human In The Loop block. + if not execution_context.safe_mode: + logger.info( + f"HITL block skipping review for node {node_exec_id} - safe mode disabled" + ) + yield "status", "approved" + yield "reviewed_data", input_data.data + yield "review_message", "Auto-approved (safe mode disabled)" + return - This method uses one function to handle the complete workflow - checking existing reviews - and creating pending ones as needed. - """ try: - logger.debug(f"HITL block executing for node {node_exec_id}") - - # Use the data layer to handle the complete workflow - db_client = get_database_manager_async_client() - result = await db_client.get_or_create_human_review( + result = await self.get_or_create_human_review( user_id=user_id, node_exec_id=node_exec_id, graph_exec_id=graph_exec_id, @@ -119,21 +136,15 @@ class HumanInTheLoopBlock(Block): logger.error(f"Error in HITL block for node {node_exec_id}: {str(e)}") raise - # Check if we're waiting for human input if result is None: logger.info( f"HITL block pausing execution for node {node_exec_id} - awaiting human review" ) try: - # Set node status to REVIEW so execution manager can't mark it as COMPLETED - # The VALID_STATUS_TRANSITIONS will then prevent any unwanted status changes - # Use the proper wrapper function to ensure websocket events are published - await async_update_node_execution_status( - db_client=db_client, + await self.update_node_execution_status( exec_id=node_exec_id, status=ExecutionStatus.REVIEW, ) - # Execution pauses here until API routes process the review return except Exception as e: logger.error( @@ -141,10 +152,8 @@ class HumanInTheLoopBlock(Block): ) raise - # Review is complete (approved or rejected) - check if unprocessed if not result.processed: - # Mark as processed before yielding - await db_client.update_review_processed_status( + await self.update_review_processed_status( node_exec_id=node_exec_id, processed=True ) diff --git a/autogpt_platform/backend/backend/blocks/time_blocks.py b/autogpt_platform/backend/backend/blocks/time_blocks.py index 611dfa8281..3a1f4c678e 100644 --- a/autogpt_platform/backend/backend/blocks/time_blocks.py +++ b/autogpt_platform/backend/backend/blocks/time_blocks.py @@ -14,7 +14,7 @@ from backend.data.block import ( BlockSchemaInput, BlockSchemaOutput, ) -from backend.data.execution import UserContext +from backend.data.execution import ExecutionContext from backend.data.model import SchemaField # Shared timezone literal type for all time/date blocks @@ -188,10 +188,9 @@ class GetCurrentTimeBlock(Block): ) async def run( - self, input_data: Input, *, user_context: UserContext, **kwargs + self, input_data: Input, *, execution_context: ExecutionContext, **kwargs ) -> BlockOutput: - # Extract timezone from user_context (always present) - effective_timezone = user_context.timezone + effective_timezone = execution_context.user_timezone # Get the appropriate timezone tz = _get_timezone(input_data.format_type, effective_timezone) @@ -298,10 +297,10 @@ class GetCurrentDateBlock(Block): ], ) - async def run(self, input_data: Input, **kwargs) -> BlockOutput: - # Extract timezone from user_context (required keyword argument) - user_context: UserContext = kwargs["user_context"] - effective_timezone = user_context.timezone + async def run( + self, input_data: Input, *, execution_context: ExecutionContext, **kwargs + ) -> BlockOutput: + effective_timezone = execution_context.user_timezone try: offset = int(input_data.offset) @@ -404,10 +403,10 @@ class GetCurrentDateAndTimeBlock(Block): ], ) - async def run(self, input_data: Input, **kwargs) -> BlockOutput: - # Extract timezone from user_context (required keyword argument) - user_context: UserContext = kwargs["user_context"] - effective_timezone = user_context.timezone + async def run( + self, input_data: Input, *, execution_context: ExecutionContext, **kwargs + ) -> BlockOutput: + effective_timezone = execution_context.user_timezone # Get the appropriate timezone tz = _get_timezone(input_data.format_type, effective_timezone) diff --git a/autogpt_platform/backend/backend/data/block.py b/autogpt_platform/backend/backend/data/block.py index 2f05f444ae..762e9b37ef 100644 --- a/autogpt_platform/backend/backend/data/block.py +++ b/autogpt_platform/backend/backend/data/block.py @@ -71,6 +71,7 @@ class BlockType(Enum): AGENT = "Agent" AI = "AI" AYRSHARE = "Ayrshare" + HUMAN_IN_THE_LOOP = "Human In The Loop" class BlockCategory(Enum): @@ -796,3 +797,12 @@ def get_io_block_ids() -> Sequence[str]: for id, B in get_blocks().items() if B().block_type in (BlockType.INPUT, BlockType.OUTPUT) ] + + +@cached(ttl_seconds=3600) +def get_human_in_the_loop_block_ids() -> Sequence[str]: + return [ + id + for id, B in get_blocks().items() + if B().block_type == BlockType.HUMAN_IN_THE_LOOP + ] diff --git a/autogpt_platform/backend/backend/data/credit_test.py b/autogpt_platform/backend/backend/data/credit_test.py index 6f604975cf..391a373b86 100644 --- a/autogpt_platform/backend/backend/data/credit_test.py +++ b/autogpt_platform/backend/backend/data/credit_test.py @@ -7,7 +7,7 @@ from prisma.models import CreditTransaction, UserBalance from backend.blocks.llm import AITextGeneratorBlock from backend.data.block import get_block from backend.data.credit import BetaUserCredit, UsageTransactionMetadata -from backend.data.execution import NodeExecutionEntry, UserContext +from backend.data.execution import ExecutionContext, NodeExecutionEntry from backend.data.user import DEFAULT_USER_ID from backend.executor.utils import block_usage_cost from backend.integrations.credentials_store import openai_credentials @@ -86,7 +86,7 @@ async def test_block_credit_usage(server: SpinTestServer): "type": openai_credentials.type, }, }, - user_context=UserContext(timezone="UTC"), + execution_context=ExecutionContext(user_timezone="UTC"), ), ) assert spending_amount_1 > 0 @@ -101,7 +101,7 @@ async def test_block_credit_usage(server: SpinTestServer): node_exec_id="test_node_exec", block_id=AITextGeneratorBlock().id, inputs={"model": "gpt-4-turbo", "api_key": "owned_api_key"}, - user_context=UserContext(timezone="UTC"), + execution_context=ExecutionContext(user_timezone="UTC"), ), ) assert spending_amount_2 == 0 diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index b78633cf58..c7c54ef268 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -71,6 +71,18 @@ logger = logging.getLogger(__name__) config = Config() +class ExecutionContext(BaseModel): + """ + Unified context that carries execution-level data throughout the entire execution flow. + This includes information needed by blocks, sub-graphs, and execution management. + """ + + safe_mode: bool = True + user_timezone: str = "UTC" + root_execution_id: Optional[str] = None + parent_execution_id: Optional[str] = None + + # -------------------------- Models -------------------------- # @@ -365,9 +377,8 @@ class GraphExecutionWithNodes(GraphExecution): def to_graph_execution_entry( self, - user_context: "UserContext", + execution_context: ExecutionContext, compiled_nodes_input_masks: Optional[NodesInputMasks] = None, - parent_graph_exec_id: Optional[str] = None, ): return GraphExecutionEntry( user_id=self.user_id, @@ -375,8 +386,7 @@ class GraphExecutionWithNodes(GraphExecution): graph_version=self.graph_version or 0, graph_exec_id=self.id, nodes_input_masks=compiled_nodes_input_masks, - user_context=user_context, - parent_graph_exec_id=parent_graph_exec_id, + execution_context=execution_context, ) @@ -449,7 +459,7 @@ class NodeExecutionResult(BaseModel): ) def to_node_execution_entry( - self, user_context: "UserContext" + self, execution_context: ExecutionContext ) -> "NodeExecutionEntry": return NodeExecutionEntry( user_id=self.user_id, @@ -460,7 +470,7 @@ class NodeExecutionResult(BaseModel): node_id=self.node_id, block_id=self.block_id, inputs=self.input_data, - user_context=user_context, + execution_context=execution_context, ) @@ -1099,20 +1109,13 @@ async def get_latest_node_execution( # ----------------- Execution Infrastructure ----------------- # -class UserContext(BaseModel): - """Generic user context for graph execution containing user-specific settings.""" - - timezone: str - - class GraphExecutionEntry(BaseModel): user_id: str graph_exec_id: str graph_id: str graph_version: int nodes_input_masks: Optional[NodesInputMasks] = None - user_context: UserContext - parent_graph_exec_id: Optional[str] = None + execution_context: ExecutionContext class NodeExecutionEntry(BaseModel): @@ -1124,7 +1127,7 @@ class NodeExecutionEntry(BaseModel): node_id: str block_id: str inputs: BlockInput - user_context: UserContext + execution_context: ExecutionContext class ExecutionQueue(Generic[T]): diff --git a/autogpt_platform/backend/backend/data/graph.py b/autogpt_platform/backend/backend/data/graph.py index 53993fd7e1..0757a86f4a 100644 --- a/autogpt_platform/backend/backend/data/graph.py +++ b/autogpt_platform/backend/backend/data/graph.py @@ -61,6 +61,10 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +class GraphSettings(BaseModel): + human_in_the_loop_safe_mode: bool | None = None + + class Link(BaseDbModel): source_id: str sink_id: str @@ -225,6 +229,15 @@ class BaseGraph(BaseDbModel): def has_external_trigger(self) -> bool: return self.webhook_input_node is not None + @computed_field + @property + def has_human_in_the_loop(self) -> bool: + return any( + node.block_id + for node in self.nodes + if node.block.block_type == BlockType.HUMAN_IN_THE_LOOP + ) + @property def webhook_input_node(self) -> Node | None: return next( @@ -1105,6 +1118,28 @@ async def delete_graph(graph_id: str, user_id: str) -> int: return entries_count +async def get_graph_settings(user_id: str, graph_id: str) -> GraphSettings: + lib = await LibraryAgent.prisma().find_first( + where={ + "userId": user_id, + "agentGraphId": graph_id, + "isDeleted": False, + "isArchived": False, + }, + order={"agentGraphVersion": "desc"}, + ) + if not lib or not lib.settings: + return GraphSettings() + + try: + return GraphSettings.model_validate(lib.settings) + except Exception: + logger.warning( + f"Malformed settings for LibraryAgent user={user_id} graph={graph_id}" + ) + return GraphSettings() + + async def validate_graph_execution_permissions( user_id: str, graph_id: str, graph_version: int, is_sub_graph: bool = False ) -> None: diff --git a/autogpt_platform/backend/backend/data/human_review.py b/autogpt_platform/backend/backend/data/human_review.py index 2b0b2dbfb7..df0b4b21e8 100644 --- a/autogpt_platform/backend/backend/data/human_review.py +++ b/autogpt_platform/backend/backend/data/human_review.py @@ -32,32 +32,6 @@ class ReviewResult(BaseModel): node_exec_id: str -async def get_pending_review_by_node_exec_id( - node_exec_id: str, user_id: str -) -> Optional["PendingHumanReviewModel"]: - """ - Get a pending review by node execution ID with user ownership validation. - - Args: - node_exec_id: The node execution ID to check - user_id: The user ID to validate ownership - - Returns: - The existing review if found and owned by the user, None otherwise - """ - review = await PendingHumanReview.prisma().find_first( - where={ - "nodeExecId": node_exec_id, - "userId": user_id, - } - ) - - if review: - return PendingHumanReviewModel.from_db(review) - - return None - - async def get_or_create_human_review( user_id: str, node_exec_id: str, @@ -121,27 +95,17 @@ async def get_or_create_human_review( if review.processed: return None - if review.status == ReviewStatus.APPROVED: - # Return the approved review result - return ReviewResult( - data=review.payload, - status=ReviewStatus.APPROVED, - message=review.reviewMessage or "", - processed=review.processed, - node_exec_id=review.nodeExecId, - ) - elif review.status == ReviewStatus.REJECTED: - # Return the rejected review result - return ReviewResult( - data=None, - status=ReviewStatus.REJECTED, - message=review.reviewMessage or "", - processed=review.processed, - node_exec_id=review.nodeExecId, - ) - else: - # Review is pending - return None to continue waiting + # If pending, return None to continue waiting, otherwise return the review result + if review.status == ReviewStatus.WAITING: return None + else: + return ReviewResult( + data=review.payload if review.status == ReviewStatus.APPROVED else None, + status=review.status, + message=review.reviewMessage or "", + processed=review.processed, + node_exec_id=review.nodeExecId, + ) async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool: diff --git a/autogpt_platform/backend/backend/data/human_review_test.py b/autogpt_platform/backend/backend/data/human_review_test.py index 02cd676524..44b34ae9f3 100644 --- a/autogpt_platform/backend/backend/data/human_review_test.py +++ b/autogpt_platform/backend/backend/data/human_review_test.py @@ -7,7 +7,6 @@ from prisma.enums import ReviewStatus from backend.data.human_review import ( get_or_create_human_review, - get_pending_review_by_node_exec_id, get_pending_reviews_for_execution, get_pending_reviews_for_user, has_pending_reviews_for_graph_exec, @@ -24,7 +23,7 @@ def sample_db_review(): """Create a sample database review object""" mock_review = Mock() mock_review.nodeExecId = "test_node_123" - mock_review.userId = "test_user" + mock_review.userId = "test-user-123" mock_review.graphExecId = "test_graph_exec_456" mock_review.graphId = "test_graph_789" mock_review.graphVersion = 1 @@ -41,40 +40,6 @@ def sample_db_review(): return mock_review -@pytest.mark.asyncio -async def test_get_pending_review_by_node_exec_id_found( - mocker: pytest_mock.MockFixture, - sample_db_review, -): - """Test finding an existing pending review""" - mock_find_first = mocker.patch( - "backend.data.human_review.PendingHumanReview.prisma" - ) - mock_find_first.return_value.find_first = AsyncMock(return_value=sample_db_review) - - result = await get_pending_review_by_node_exec_id("test_node_123", "test_user") - - assert result is not None - assert result.node_exec_id == "test_node_123" - assert result.user_id == "test_user" - assert result.status == ReviewStatus.WAITING - - -@pytest.mark.asyncio -async def test_get_pending_review_by_node_exec_id_not_found( - mocker: pytest_mock.MockFixture, -): - """Test when review is not found""" - mock_find_first = mocker.patch( - "backend.data.human_review.PendingHumanReview.prisma" - ) - mock_find_first.return_value.find_first = AsyncMock(return_value=None) - - result = await get_pending_review_by_node_exec_id("nonexistent", "test_user") - - assert result is None - - @pytest.mark.asyncio async def test_get_or_create_human_review_new( mocker: pytest_mock.MockFixture, @@ -89,7 +54,7 @@ async def test_get_or_create_human_review_new( mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review) result = await get_or_create_human_review( - user_id="test_user", + user_id="test-user-123", node_exec_id="test_node_123", graph_exec_id="test_graph_exec_456", graph_id="test_graph_789", @@ -118,7 +83,7 @@ async def test_get_or_create_human_review_approved( mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review) result = await get_or_create_human_review( - user_id="test_user", + user_id="test-user-123", node_exec_id="test_node_123", graph_exec_id="test_graph_exec_456", graph_id="test_graph_789", @@ -190,7 +155,9 @@ async def test_get_pending_reviews_for_execution( mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma") mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review]) - result = await get_pending_reviews_for_execution("test_graph_exec_456", "test_user") + result = await get_pending_reviews_for_execution( + "test_graph_exec_456", "test-user-123" + ) assert len(result) == 1 assert result[0].graph_exec_id == "test_graph_exec_456" @@ -198,7 +165,7 @@ async def test_get_pending_reviews_for_execution( # Verify it filters by execution and user call_args = mock_find_many.return_value.find_many.call_args where_clause = call_args.kwargs["where"] - assert where_clause["userId"] == "test_user" + assert where_clause["userId"] == "test-user-123" assert where_clause["graphExecId"] == "test_graph_exec_456" assert where_clause["status"] == ReviewStatus.WAITING @@ -210,13 +177,13 @@ async def test_process_all_reviews_for_execution_success( ): """Test successful processing of reviews for an execution""" # Mock finding reviews - mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma") - mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review]) + mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma") + mock_prisma.return_value.find_many = AsyncMock(return_value=[sample_db_review]) # Mock updating reviews updated_review = Mock() updated_review.nodeExecId = "test_node_123" - updated_review.userId = "test_user" + updated_review.userId = "test-user-123" updated_review.graphExecId = "test_graph_exec_456" updated_review.graphId = "test_graph_789" updated_review.graphVersion = 1 @@ -230,8 +197,7 @@ async def test_process_all_reviews_for_execution_success( updated_review.createdAt = datetime.datetime.now(datetime.timezone.utc) updated_review.updatedAt = datetime.datetime.now(datetime.timezone.utc) updated_review.reviewedAt = datetime.datetime.now(datetime.timezone.utc) - mock_update = mocker.patch("backend.data.human_review.PendingHumanReview.prisma") - mock_update.return_value.update = AsyncMock(return_value=updated_review) + mock_prisma.return_value.update = AsyncMock(return_value=updated_review) # Mock gather to simulate parallel updates mocker.patch( @@ -240,7 +206,7 @@ async def test_process_all_reviews_for_execution_success( ) result = await process_all_reviews_for_execution( - user_id="test_user", + user_id="test-user-123", review_decisions={ "test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved") }, @@ -264,7 +230,7 @@ async def test_process_all_reviews_for_execution_validation_errors( with pytest.raises(ValueError, match="Reviews not found"): await process_all_reviews_for_execution( - user_id="test_user", + user_id="test-user-123", review_decisions={ "nonexistent_node": (ReviewStatus.APPROVED, {"data": "test"}, "message") }, @@ -286,7 +252,7 @@ async def test_process_all_reviews_edit_permission_error( with pytest.raises(ValueError, match="not editable"): await process_all_reviews_for_execution( - user_id="test_user", + user_id="test-user-123", review_decisions={ "test_node_123": ( ReviewStatus.APPROVED, @@ -306,7 +272,7 @@ async def test_process_all_reviews_mixed_approval_rejection( # Create second review for rejection second_review = Mock() second_review.nodeExecId = "test_node_456" - second_review.userId = "test_user" + second_review.userId = "test-user-123" second_review.graphExecId = "test_graph_exec_456" second_review.graphId = "test_graph_789" second_review.graphVersion = 1 @@ -330,7 +296,7 @@ async def test_process_all_reviews_mixed_approval_rejection( # Mock updating reviews approved_review = Mock() approved_review.nodeExecId = "test_node_123" - approved_review.userId = "test_user" + approved_review.userId = "test-user-123" approved_review.graphExecId = "test_graph_exec_456" approved_review.graphId = "test_graph_789" approved_review.graphVersion = 1 @@ -347,7 +313,7 @@ async def test_process_all_reviews_mixed_approval_rejection( rejected_review = Mock() rejected_review.nodeExecId = "test_node_456" - rejected_review.userId = "test_user" + rejected_review.userId = "test-user-123" rejected_review.graphExecId = "test_graph_exec_456" rejected_review.graphId = "test_graph_789" rejected_review.graphVersion = 1 @@ -368,7 +334,7 @@ async def test_process_all_reviews_mixed_approval_rejection( ) result = await process_all_reviews_for_execution( - user_id="test_user", + user_id="test-user-123", review_decisions={ "test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved"), "test_node_456": (ReviewStatus.REJECTED, None, "Rejected"), diff --git a/autogpt_platform/backend/backend/data/integrations.py b/autogpt_platform/backend/backend/data/integrations.py index 68bdbe085f..0f328e81b7 100644 --- a/autogpt_platform/backend/backend/data/integrations.py +++ b/autogpt_platform/backend/backend/data/integrations.py @@ -1,5 +1,5 @@ import logging -from typing import AsyncGenerator, Literal, Optional, overload +from typing import TYPE_CHECKING, AsyncGenerator, Literal, Optional, overload from prisma.models import AgentNode, AgentPreset, IntegrationWebhook from prisma.types import ( @@ -19,10 +19,12 @@ from backend.integrations.creds_manager import IntegrationCredentialsManager from backend.integrations.providers import ProviderName from backend.integrations.webhooks import get_webhook_manager from backend.integrations.webhooks.utils import webhook_ingress_url -from backend.server.v2.library.model import LibraryAgentPreset from backend.util.exceptions import NotFoundError from backend.util.json import SafeJson +if TYPE_CHECKING: + from backend.server.v2.library.model import LibraryAgentPreset + from .db import BaseDbModel from .graph import NodeModel @@ -64,7 +66,7 @@ class Webhook(BaseDbModel): class WebhookWithRelations(Webhook): triggered_nodes: list[NodeModel] - triggered_presets: list[LibraryAgentPreset] + triggered_presets: list["LibraryAgentPreset"] @staticmethod def from_db(webhook: IntegrationWebhook): @@ -73,6 +75,12 @@ class WebhookWithRelations(Webhook): "AgentNodes and AgentPresets must be included in " "IntegrationWebhook query with relations" ) + # LibraryAgentPreset import is moved to TYPE_CHECKING to avoid circular import: + # integrations.py โ†’ library/model.py โ†’ integrations.py (for Webhook) + # Runtime import is used in WebhookWithRelations.from_db() method instead + # Import at runtime to avoid circular dependency + from backend.server.v2.library.model import LibraryAgentPreset + return WebhookWithRelations( **Webhook.from_db(webhook).model_dump(), triggered_nodes=[NodeModel.from_db(node) for node in webhook.AgentNodes], diff --git a/autogpt_platform/backend/backend/data/model.py b/autogpt_platform/backend/backend/data/model.py index 1c32a9f444..ca4d330301 100644 --- a/autogpt_platform/backend/backend/data/model.py +++ b/autogpt_platform/backend/backend/data/model.py @@ -46,6 +46,7 @@ from backend.util.settings import Secrets # Type alias for any provider name (including custom ones) AnyProviderName = str # Will be validated as ProviderName at runtime +USER_TIMEZONE_NOT_SET = "not-set" class User(BaseModel): @@ -98,7 +99,7 @@ class User(BaseModel): # User timezone for scheduling and time display timezone: str = Field( - default="not-set", + default=USER_TIMEZONE_NOT_SET, description="User timezone (IANA timezone identifier or 'not-set')", ) @@ -155,7 +156,7 @@ class User(BaseModel): notify_on_daily_summary=prisma_user.notifyOnDailySummary or True, notify_on_weekly_summary=prisma_user.notifyOnWeeklySummary or True, notify_on_monthly_summary=prisma_user.notifyOnMonthlySummary or True, - timezone=prisma_user.timezone or "not-set", + timezone=prisma_user.timezone or USER_TIMEZONE_NOT_SET, ) diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py index 27e8b01043..c2cb0dd2e7 100644 --- a/autogpt_platform/backend/backend/executor/database.py +++ b/autogpt_platform/backend/backend/executor/database.py @@ -28,6 +28,7 @@ from backend.data.graph import ( get_connected_output_nodes, get_graph, get_graph_metadata, + get_graph_settings, get_node, validate_graph_execution_permissions, ) @@ -150,6 +151,7 @@ class DatabaseManager(AppService): get_graph = _(get_graph) get_connected_output_nodes = _(get_connected_output_nodes) get_graph_metadata = _(get_graph_metadata) + get_graph_settings = _(get_graph_settings) # Credits spend_credits = _(_spend_credits, name="spend_credits") @@ -254,6 +256,7 @@ class DatabaseManagerAsyncClient(AppServiceClient): get_latest_node_execution = d.get_latest_node_execution get_graph = d.get_graph get_graph_metadata = d.get_graph_metadata + get_graph_settings = d.get_graph_settings get_graph_execution_meta = d.get_graph_execution_meta get_node = d.get_node get_node_execution = d.get_node_execution diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index 06ad06e6dc..bfec94176d 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -29,6 +29,7 @@ from backend.data.block import ( from backend.data.credit import UsageTransactionMetadata from backend.data.dynamic_fields import parse_execution_output from backend.data.execution import ( + ExecutionContext, ExecutionQueue, ExecutionStatus, GraphExecution, @@ -36,7 +37,6 @@ from backend.data.execution import ( NodeExecutionEntry, NodeExecutionResult, NodesInputMasks, - UserContext, ) from backend.data.graph import Link, Node from backend.data.model import GraphExecutionStats, NodeExecutionStats @@ -168,6 +168,7 @@ async def execute_node( node_exec_id = data.node_exec_id node_id = data.node_id node_block = node.block + execution_context = data.execution_context log_metadata = LogMetadata( logger=_logger, @@ -210,11 +211,9 @@ async def execute_node( "graph_exec_id": graph_exec_id, "node_exec_id": node_exec_id, "user_id": user_id, + "execution_context": execution_context, } - # Add user context from NodeExecutionEntry - extra_exec_kwargs["user_context"] = data.user_context - # Last-minute fetch credentials + acquire a system-wide read-write lock to prevent # changes during execution. โš ๏ธ This means a set of credentials can only be used by # one (running) block at a time; simultaneous execution of blocks using same @@ -243,8 +242,8 @@ async def execute_node( scope.set_tag("node_id", node_id) scope.set_tag("block_name", node_block.name) scope.set_tag("block_id", node_block.id) - for k, v in (data.user_context or UserContext(timezone="UTC")).model_dump().items(): - scope.set_tag(f"user_context.{k}", v) + for k, v in execution_context.model_dump().items(): + scope.set_tag(f"execution_context.{k}", v) try: async for output_name, output_data in node_block.execute( @@ -289,7 +288,7 @@ async def _enqueue_next_nodes( graph_version: int, log_metadata: LogMetadata, nodes_input_masks: Optional[NodesInputMasks], - user_context: UserContext, + execution_context: ExecutionContext, ) -> list[NodeExecutionEntry]: async def add_enqueued_execution( node_exec_id: str, node_id: str, block_id: str, data: BlockInput @@ -309,7 +308,7 @@ async def _enqueue_next_nodes( node_id=node_id, block_id=block_id, inputs=data, - user_context=user_context, + execution_context=execution_context, ) async def register_next_executions(node_link: Link) -> list[NodeExecutionEntry]: @@ -861,7 +860,9 @@ class ExecutionProcessor: ExecutionStatus.REVIEW, ], ): - node_entry = node_exec.to_node_execution_entry(graph_exec.user_context) + node_entry = node_exec.to_node_execution_entry( + graph_exec.execution_context + ) execution_queue.add(node_entry) # ------------------------------------------------------------ @@ -1165,7 +1166,7 @@ class ExecutionProcessor: graph_version=graph_exec.graph_version, log_metadata=log_metadata, nodes_input_masks=nodes_input_masks, - user_context=graph_exec.user_context, + execution_context=graph_exec.execution_context, ): execution_queue.add(next_execution) @@ -1555,36 +1556,32 @@ class ExecutionManager(AppProcess): graph_exec_id = graph_exec_entry.graph_exec_id user_id = graph_exec_entry.user_id graph_id = graph_exec_entry.graph_id - parent_graph_exec_id = graph_exec_entry.parent_graph_exec_id + root_exec_id = graph_exec_entry.execution_context.root_execution_id + parent_exec_id = graph_exec_entry.execution_context.parent_execution_id logger.info( f"[{self.service_name}] Received RUN for graph_exec_id={graph_exec_id}, user_id={user_id}, executor_id={self.executor_id}" - + (f", parent={parent_graph_exec_id}" if parent_graph_exec_id else "") + + (f", root={root_exec_id}" if root_exec_id else "") + + (f", parent={parent_exec_id}" if parent_exec_id else "") ) - # Check if parent execution is already terminated (prevents orphaned child executions) - if parent_graph_exec_id: - try: - parent_exec = get_db_client().get_graph_execution_meta( - execution_id=parent_graph_exec_id, - user_id=user_id, + # Check if root execution is already terminated (prevents orphaned child executions) + if root_exec_id and root_exec_id != graph_exec_id: + parent_exec = get_db_client().get_graph_execution_meta( + execution_id=root_exec_id, + user_id=user_id, + ) + if parent_exec and parent_exec.status == ExecutionStatus.TERMINATED: + logger.info( + f"[{self.service_name}] Skipping execution {graph_exec_id} - parent {root_exec_id} is TERMINATED" ) - if parent_exec and parent_exec.status == ExecutionStatus.TERMINATED: - logger.info( - f"[{self.service_name}] Skipping execution {graph_exec_id} - parent {parent_graph_exec_id} is TERMINATED" - ) - # Mark this child as terminated since parent was stopped - get_db_client().update_graph_execution_stats( - graph_exec_id=graph_exec_id, - status=ExecutionStatus.TERMINATED, - ) - _ack_message(reject=False, requeue=False) - return - except Exception as e: - logger.warning( - f"[{self.service_name}] Could not check parent status for {graph_exec_id}: {e}" + # Mark this child as terminated since parent was stopped + get_db_client().update_graph_execution_stats( + graph_exec_id=graph_exec_id, + status=ExecutionStatus.TERMINATED, ) - # Continue execution if parent check fails (don't block on errors) + _ack_message(reject=False, requeue=False) + return # Check user rate limit before processing try: diff --git a/autogpt_platform/backend/backend/executor/utils.py b/autogpt_platform/backend/backend/executor/utils.py index f8c6da8546..bcd3dcf3b6 100644 --- a/autogpt_platform/backend/backend/executor/utils.py +++ b/autogpt_platform/backend/backend/executor/utils.py @@ -10,6 +10,7 @@ from pydantic import BaseModel, JsonValue, ValidationError from backend.data import execution as execution_db from backend.data import graph as graph_db +from backend.data import user as user_db from backend.data.block import ( Block, BlockCostType, @@ -24,19 +25,17 @@ from backend.data.db import prisma # Import dynamic field utilities from centralized location from backend.data.dynamic_fields import merge_execution_input from backend.data.execution import ( + ExecutionContext, ExecutionStatus, GraphExecutionMeta, GraphExecutionStats, GraphExecutionWithNodes, NodesInputMasks, - UserContext, get_graph_execution, ) from backend.data.graph import GraphModel, Node -from backend.data.model import CredentialsMetaInput +from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig -from backend.data.user import get_user_by_id -from backend.util.cache import cached from backend.util.clients import ( get_async_execution_event_bus, get_async_execution_queue, @@ -52,32 +51,6 @@ from backend.util.logging import TruncatedLogger, is_structured_logging_enabled from backend.util.settings import Config from backend.util.type import convert - -@cached(maxsize=1000, ttl_seconds=3600) -async def get_user_context(user_id: str) -> UserContext: - """ - Get UserContext for a user, always returns a valid context with timezone. - Defaults to UTC if user has no timezone set. - """ - user_context = UserContext(timezone="UTC") # Default to UTC - try: - if prisma.is_connected(): - user = await get_user_by_id(user_id) - else: - user = await get_database_manager_async_client().get_user_by_id(user_id) - - if user and user.timezone and user.timezone != "not-set": - user_context.timezone = user.timezone - logger.debug(f"Retrieved user context: timezone={user.timezone}") - else: - logger.debug("User has no timezone set, using UTC") - except Exception as e: - logger.warning(f"Could not fetch user timezone: {e}") - # Continue with UTC as default - - return user_context - - config = Config() logger = TruncatedLogger(logging.getLogger(__name__), prefix="[GraphExecutorUtil]") @@ -495,7 +468,6 @@ async def validate_and_construct_node_execution_input( graph_version: The version of the graph to use. graph_credentials_inputs: Credentials inputs to use. nodes_input_masks: Node inputs to use. - is_sub_graph: Whether this is a sub-graph execution. Returns: GraphModel: Full graph object for the given `graph_id`. @@ -763,8 +735,7 @@ async def add_graph_execution( graph_version: Optional[int] = None, graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None, nodes_input_masks: Optional[NodesInputMasks] = None, - parent_graph_exec_id: Optional[str] = None, - is_sub_graph: bool = False, + execution_context: Optional[ExecutionContext] = None, graph_exec_id: Optional[str] = None, ) -> GraphExecutionWithNodes: """ @@ -780,7 +751,6 @@ async def add_graph_execution( Keys should map to the keys generated by `GraphModel.aggregate_credentials_inputs`. nodes_input_masks: Node inputs to use in the execution. parent_graph_exec_id: The ID of the parent graph execution (for nested executions). - is_sub_graph: Whether this is a sub-graph execution. graph_exec_id: If provided, resume this existing execution instead of creating a new one. Returns: GraphExecutionEntry: The entry for the graph execution. @@ -790,8 +760,10 @@ async def add_graph_execution( """ if prisma.is_connected(): edb = execution_db + udb = user_db + gdb = graph_db else: - edb = get_database_manager_async_client() + edb = udb = gdb = get_database_manager_async_client() # Get or create the graph execution if graph_exec_id: @@ -810,6 +782,10 @@ async def add_graph_execution( logger.info(f"Resuming graph execution #{graph_exec.id} for graph #{graph_id}") else: + parent_exec_id = ( + execution_context.parent_execution_id if execution_context else None + ) + # Create new execution graph, starting_nodes_input, compiled_nodes_input_masks = ( await validate_and_construct_node_execution_input( @@ -819,7 +795,7 @@ async def add_graph_execution( graph_version=graph_version, graph_credentials_inputs=graph_credentials_inputs, nodes_input_masks=nodes_input_masks, - is_sub_graph=is_sub_graph, + is_sub_graph=parent_exec_id is not None, ) ) @@ -832,7 +808,7 @@ async def add_graph_execution( nodes_input_masks=nodes_input_masks, starting_nodes_input=starting_nodes_input, preset_id=preset_id, - parent_graph_exec_id=parent_graph_exec_id, + parent_graph_exec_id=parent_exec_id, ) logger.info( @@ -840,14 +816,28 @@ async def add_graph_execution( f"#{graph_id} with {len(starting_nodes_input)} starting nodes" ) - # Common path: publish to queue and update status - try: - graph_exec_entry = graph_exec.to_graph_execution_entry( - user_context=await get_user_context(user_id), - compiled_nodes_input_masks=compiled_nodes_input_masks, - parent_graph_exec_id=parent_graph_exec_id, + # Generate execution context if it's not provided + if execution_context is None: + user = await udb.get_user_by_id(user_id) + settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id) + + execution_context = ExecutionContext( + safe_mode=( + settings.human_in_the_loop_safe_mode + if settings.human_in_the_loop_safe_mode is not None + else True + ), + user_timezone=( + user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC" + ), + root_execution_id=graph_exec.id, ) + try: + graph_exec_entry = graph_exec.to_graph_execution_entry( + compiled_nodes_input_masks=compiled_nodes_input_masks, + execution_context=execution_context, + ) logger.info(f"Publishing execution {graph_exec.id} to execution queue") exec_queue = await get_async_execution_queue() diff --git a/autogpt_platform/backend/backend/executor/utils_test.py b/autogpt_platform/backend/backend/executor/utils_test.py index b418a069aa..8854214e14 100644 --- a/autogpt_platform/backend/backend/executor/utils_test.py +++ b/autogpt_platform/backend/backend/executor/utils_test.py @@ -348,9 +348,6 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture): mock_graph_exec.node_executions = [] # Add this to avoid AttributeError mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock() - # Mock user context - mock_user_context = {"user_id": user_id, "context": "test_context"} - # Mock the queue and event bus mock_queue = mocker.AsyncMock() mock_event_bus = mocker.MagicMock() @@ -362,7 +359,8 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture): ) mock_edb = mocker.patch("backend.executor.utils.execution_db") mock_prisma = mocker.patch("backend.executor.utils.prisma") - mock_get_user_context = mocker.patch("backend.executor.utils.get_user_context") + mock_udb = mocker.patch("backend.executor.utils.user_db") + mock_gdb = mocker.patch("backend.executor.utils.graph_db") mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue") mock_get_event_bus = mocker.patch( "backend.executor.utils.get_async_execution_event_bus" @@ -380,7 +378,14 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture): return_value=mock_graph_exec ) mock_edb.update_node_execution_status_batch = mocker.AsyncMock() - mock_get_user_context.return_value = mock_user_context + # Mock user and settings data + mock_user = mocker.MagicMock() + mock_user.timezone = "UTC" + mock_settings = mocker.MagicMock() + mock_settings.human_in_the_loop_safe_mode = True + + mock_udb.get_user_by_id = mocker.AsyncMock(return_value=mock_user) + mock_gdb.get_graph_settings = mocker.AsyncMock(return_value=mock_settings) mock_get_queue.return_value = mock_queue mock_get_event_bus.return_value = mock_event_bus diff --git a/autogpt_platform/backend/backend/server/routers/v1.py b/autogpt_platform/backend/backend/server/routers/v1.py index e38dd77ac5..7ad812af8a 100644 --- a/autogpt_platform/backend/backend/server/routers/v1.py +++ b/autogpt_platform/backend/backend/server/routers/v1.py @@ -44,7 +44,7 @@ from backend.data.credit import ( get_user_credit_model, set_auto_top_up, ) -from backend.data.execution import UserContext +from backend.data.graph import GraphSettings from backend.data.model import CredentialsMetaInput from backend.data.notifications import NotificationPreference, NotificationPreferenceDTO from backend.data.onboarding import ( @@ -387,19 +387,15 @@ async def execute_graph_block( if not obj: raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.") - # Get user context for block execution user = await get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found.") - user_context = UserContext(timezone=user.timezone) - start_time = time.time() try: output = defaultdict(list) async for name, data in obj.execute( data, - user_context=user_context, user_id=user_id, # Note: graph_exec_id and graph_id are not available for direct block execution ): @@ -842,9 +838,18 @@ async def update_graph( if new_graph_version.is_active: # Keep the library agent up to date with the new active version - await library_db.update_agent_version_in_library( + library = await library_db.update_agent_version_in_library( user_id, graph.id, graph.version ) + if ( + new_graph_version.has_human_in_the_loop + and library.settings.human_in_the_loop_safe_mode is None + ): + await library_db.update_library_agent_settings( + user_id=user_id, + agent_id=library.id, + settings=GraphSettings(human_in_the_loop_safe_mode=True), + ) # Handle activation of the new graph first to ensure continuity new_graph_version = await on_graph_activate(new_graph_version, user_id=user_id) @@ -901,15 +906,54 @@ async def set_graph_active_version( ) # Keep the library agent up to date with the new active version - await library_db.update_agent_version_in_library( + library = await library_db.update_agent_version_in_library( user_id, new_active_graph.id, new_active_graph.version ) + if ( + new_active_graph.has_human_in_the_loop + and library.settings.human_in_the_loop_safe_mode is None + ): + await library_db.update_library_agent_settings( + user_id=user_id, + agent_id=library.id, + settings=GraphSettings(human_in_the_loop_safe_mode=True), + ) if current_active_graph and current_active_graph.version != new_active_version: # Handle deactivation of the previously active version await on_graph_deactivate(current_active_graph, user_id=user_id) +@v1_router.patch( + path="/graphs/{graph_id}/settings", + summary="Update graph settings", + tags=["graphs"], + dependencies=[Security(requires_user)], +) +async def update_graph_settings( + graph_id: str, + settings: GraphSettings, + user_id: Annotated[str, Security(get_user_id)], +) -> GraphSettings: + """Update graph settings for the user's library agent.""" + # Get the library agent for this graph + library_agent = await library_db.get_library_agent_by_graph_id( + graph_id=graph_id, user_id=user_id + ) + if not library_agent: + raise HTTPException(404, f"Graph #{graph_id} not found in user's library") + + # Update the library agent settings + updated_agent = await library_db.update_library_agent_settings( + user_id=user_id, + agent_id=library_agent.id, + settings=settings, + ) + + # Return the updated settings + return GraphSettings.model_validate(updated_agent.settings) + + @v1_router.post( path="/graphs/{graph_id}/execute/{graph_version}", summary="Execute graph agent", diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py b/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py index da4636655b..0e282f3629 100644 --- a/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py +++ b/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py @@ -7,6 +7,7 @@ import pytest_mock from prisma.enums import ReviewStatus from pytest_snapshot.plugin import Snapshot +from backend.server.rest_api import handle_internal_http_error from backend.server.v2.executions.review.model import PendingHumanReviewModel from backend.server.v2.executions.review.routes import router @@ -15,6 +16,7 @@ FIXED_NOW = datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) app = fastapi.FastAPI() app.include_router(router, prefix="/api/review") +app.add_exception_handler(ValueError, handle_internal_http_error(400)) client = fastapi.testclient.TestClient(app) @@ -34,11 +36,11 @@ def setup_app_auth(mock_jwt_user): @pytest.fixture -def sample_pending_review() -> PendingHumanReviewModel: +def sample_pending_review(test_user_id: str) -> PendingHumanReviewModel: """Create a sample pending review for testing""" return PendingHumanReviewModel( node_exec_id="test_node_123", - user_id="test_user", + user_id=test_user_id, graph_exec_id="test_graph_exec_456", graph_id="test_graph_789", graph_version=1, @@ -58,6 +60,7 @@ def sample_pending_review() -> PendingHumanReviewModel: def test_get_pending_reviews_empty( mocker: pytest_mock.MockFixture, snapshot: Snapshot, + test_user_id: str, ) -> None: """Test getting pending reviews when none exist""" mock_get_reviews = mocker.patch( @@ -69,13 +72,14 @@ def test_get_pending_reviews_empty( assert response.status_code == 200 assert response.json() == [] - mock_get_reviews.assert_called_once_with("test_user", 1, 25) + mock_get_reviews.assert_called_once_with(test_user_id, 1, 25) def test_get_pending_reviews_with_data( mocker: pytest_mock.MockFixture, sample_pending_review: PendingHumanReviewModel, snapshot: Snapshot, + test_user_id: str, ) -> None: """Test getting pending reviews with data""" mock_get_reviews = mocker.patch( @@ -90,13 +94,14 @@ def test_get_pending_reviews_with_data( assert len(data) == 1 assert data[0]["node_exec_id"] == "test_node_123" assert data[0]["status"] == "WAITING" - mock_get_reviews.assert_called_once_with("test_user", 2, 10) + mock_get_reviews.assert_called_once_with(test_user_id, 2, 10) def test_get_pending_reviews_for_execution_success( mocker: pytest_mock.MockFixture, sample_pending_review: PendingHumanReviewModel, snapshot: Snapshot, + test_user_id: str, ) -> None: """Test getting pending reviews for specific execution""" mock_get_graph_execution = mocker.patch( @@ -104,7 +109,7 @@ def test_get_pending_reviews_for_execution_success( ) mock_get_graph_execution.return_value = { "id": "test_graph_exec_456", - "user_id": "test_user", + "user_id": test_user_id, } mock_get_reviews = mocker.patch( @@ -122,6 +127,7 @@ def test_get_pending_reviews_for_execution_success( def test_get_pending_reviews_for_execution_access_denied( mocker: pytest_mock.MockFixture, + test_user_id: str, ) -> None: """Test access denied when user doesn't own the execution""" mock_get_graph_execution = mocker.patch( @@ -138,13 +144,10 @@ def test_get_pending_reviews_for_execution_access_denied( def test_process_review_action_approve_success( mocker: pytest_mock.MockFixture, sample_pending_review: PendingHumanReviewModel, + test_user_id: str, ) -> None: """Test successful review approval""" - # Mock the validation functions - mock_get_pending_review = mocker.patch( - "backend.data.human_review.get_pending_review_by_node_exec_id" - ) - mock_get_pending_review.return_value = sample_pending_review + # Mock the route functions mock_get_reviews_for_execution = mocker.patch( "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution" @@ -154,24 +157,42 @@ def test_process_review_action_approve_success( mock_process_all_reviews = mocker.patch( "backend.server.v2.executions.review.routes.process_all_reviews_for_execution" ) - mock_process_all_reviews.return_value = {"test_node_123": sample_pending_review} + # Create approved review for return + approved_review = PendingHumanReviewModel( + node_exec_id="test_node_123", + user_id=test_user_id, + graph_exec_id="test_graph_exec_456", + graph_id="test_graph_789", + graph_version=1, + payload={"data": "modified payload", "value": 50}, + instructions="Please review this data", + editable=True, + status=ReviewStatus.APPROVED, + review_message="Looks good", + was_edited=True, + processed=False, + created_at=FIXED_NOW, + updated_at=FIXED_NOW, + reviewed_at=FIXED_NOW, + ) + mock_process_all_reviews.return_value = {"test_node_123": approved_review} mock_has_pending = mocker.patch( - "backend.data.human_review.has_pending_reviews_for_graph_exec" + "backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec" ) mock_has_pending.return_value = False - mocker.patch("backend.executor.utils.add_graph_execution") + mocker.patch("backend.server.v2.executions.review.routes.add_graph_execution") request_data = { - "approved_reviews": [ + "reviews": [ { "node_exec_id": "test_node_123", + "approved": True, "message": "Looks good", "reviewed_data": {"data": "modified payload", "value": 50}, } - ], - "rejected_review_ids": [], + ] } response = client.post("/api/review/action", json=request_data) @@ -187,13 +208,10 @@ def test_process_review_action_approve_success( def test_process_review_action_reject_success( mocker: pytest_mock.MockFixture, sample_pending_review: PendingHumanReviewModel, + test_user_id: str, ) -> None: """Test successful review rejection""" - # Mock the validation functions - mock_get_pending_review = mocker.patch( - "backend.data.human_review.get_pending_review_by_node_exec_id" - ) - mock_get_pending_review.return_value = sample_pending_review + # Mock the route functions mock_get_reviews_for_execution = mocker.patch( "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution" @@ -205,7 +223,7 @@ def test_process_review_action_reject_success( ) rejected_review = PendingHumanReviewModel( node_exec_id="test_node_123", - user_id="test_user", + user_id=test_user_id, graph_exec_id="test_graph_exec_456", graph_id="test_graph_789", graph_version=1, @@ -223,11 +241,19 @@ def test_process_review_action_reject_success( mock_process_all_reviews.return_value = {"test_node_123": rejected_review} mock_has_pending = mocker.patch( - "backend.data.human_review.has_pending_reviews_for_graph_exec" + "backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec" ) mock_has_pending.return_value = False - request_data = {"approved_reviews": [], "rejected_review_ids": ["test_node_123"]} + request_data = { + "reviews": [ + { + "node_exec_id": "test_node_123", + "approved": False, + "message": None, + } + ] + } response = client.post("/api/review/action", json=request_data) @@ -242,12 +268,13 @@ def test_process_review_action_reject_success( def test_process_review_action_mixed_success( mocker: pytest_mock.MockFixture, sample_pending_review: PendingHumanReviewModel, + test_user_id: str, ) -> None: """Test mixed approve/reject operations""" # Create a second review second_review = PendingHumanReviewModel( node_exec_id="test_node_456", - user_id="test_user", + user_id=test_user_id, graph_exec_id="test_graph_exec_456", graph_id="test_graph_789", graph_version=1, @@ -263,13 +290,7 @@ def test_process_review_action_mixed_success( reviewed_at=None, ) - # Mock the validation functions - mock_get_pending_review = mocker.patch( - "backend.data.human_review.get_pending_review_by_node_exec_id" - ) - mock_get_pending_review.side_effect = lambda node_id, user_id: ( - sample_pending_review if node_id == "test_node_123" else second_review - ) + # Mock the route functions mock_get_reviews_for_execution = mocker.patch( "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution" @@ -282,7 +303,7 @@ def test_process_review_action_mixed_success( # Create approved version of first review approved_review = PendingHumanReviewModel( node_exec_id="test_node_123", - user_id="test_user", + user_id=test_user_id, graph_exec_id="test_graph_exec_456", graph_id="test_graph_789", graph_version=1, @@ -300,7 +321,7 @@ def test_process_review_action_mixed_success( # Create rejected version of second review rejected_review = PendingHumanReviewModel( node_exec_id="test_node_456", - user_id="test_user", + user_id=test_user_id, graph_exec_id="test_graph_exec_456", graph_id="test_graph_789", graph_version=1, @@ -321,19 +342,24 @@ def test_process_review_action_mixed_success( } mock_has_pending = mocker.patch( - "backend.data.human_review.has_pending_reviews_for_graph_exec" + "backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec" ) mock_has_pending.return_value = False request_data = { - "approved_reviews": [ + "reviews": [ { "node_exec_id": "test_node_123", + "approved": True, "message": "Approved", "reviewed_data": {"data": "modified"}, - } - ], - "rejected_review_ids": ["test_node_456"], + }, + { + "node_exec_id": "test_node_456", + "approved": False, + "message": None, + }, + ] } response = client.post("/api/review/action", json=request_data) @@ -348,52 +374,64 @@ def test_process_review_action_mixed_success( def test_process_review_action_empty_request( mocker: pytest_mock.MockFixture, + test_user_id: str, ) -> None: """Test error when no reviews provided""" - request_data = {"approved_reviews": [], "rejected_review_ids": []} + request_data = {"reviews": []} response = client.post("/api/review/action", json=request_data) - assert response.status_code == 400 - assert "At least one review must be provided" in response.json()["detail"] + assert response.status_code == 422 + response_data = response.json() + # Pydantic validation error format + assert isinstance(response_data["detail"], list) + assert len(response_data["detail"]) > 0 + assert "At least one review must be provided" in response_data["detail"][0]["msg"] def test_process_review_action_review_not_found( mocker: pytest_mock.MockFixture, + test_user_id: str, ) -> None: """Test error when review is not found""" - mock_get_pending_review = mocker.patch( - "backend.data.human_review.get_pending_review_by_node_exec_id" + # Mock the functions that extract graph execution ID from the request + mock_get_reviews_for_execution = mocker.patch( + "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution" + ) + mock_get_reviews_for_execution.return_value = [] # No reviews found + + # Mock process_all_reviews to simulate not finding reviews + mock_process_all_reviews = mocker.patch( + "backend.server.v2.executions.review.routes.process_all_reviews_for_execution" + ) + # This should raise a ValueError with "Reviews not found" message based on the data/human_review.py logic + mock_process_all_reviews.side_effect = ValueError( + "Reviews not found or access denied for IDs: nonexistent_node" ) - mock_get_pending_review.return_value = None request_data = { - "approved_reviews": [ + "reviews": [ { "node_exec_id": "nonexistent_node", + "approved": True, "message": "Test", } - ], - "rejected_review_ids": [], + ] } response = client.post("/api/review/action", json=request_data) - assert response.status_code == 403 - assert "not found or access denied" in response.json()["detail"] + assert response.status_code == 400 + assert "Reviews not found" in response.json()["detail"] def test_process_review_action_partial_failure( mocker: pytest_mock.MockFixture, sample_pending_review: PendingHumanReviewModel, + test_user_id: str, ) -> None: """Test handling of partial failures in review processing""" - # Mock successful validation - mock_get_pending_review = mocker.patch( - "backend.data.human_review.get_pending_review_by_node_exec_id" - ) - mock_get_pending_review.return_value = sample_pending_review - + # Mock the route functions mock_get_reviews_for_execution = mocker.patch( "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution" ) @@ -406,58 +444,53 @@ def test_process_review_action_partial_failure( mock_process_all_reviews.side_effect = ValueError("Some reviews failed validation") request_data = { - "approved_reviews": [ + "reviews": [ { "node_exec_id": "test_node_123", + "approved": True, "message": "Test", } - ], - "rejected_review_ids": [], + ] } response = client.post("/api/review/action", json=request_data) - assert response.status_code == 200 - data = response.json() - assert data["approved_count"] == 0 - assert data["rejected_count"] == 0 - assert data["failed_count"] == 1 - assert "Failed to process reviews" in data["error"] + assert response.status_code == 400 + assert "Some reviews failed validation" in response.json()["detail"] -def test_process_review_action_complete_failure( +def test_process_review_action_invalid_node_exec_id( mocker: pytest_mock.MockFixture, sample_pending_review: PendingHumanReviewModel, + test_user_id: str, ) -> None: - """Test complete failure scenario""" - # Mock successful validation - mock_get_pending_review = mocker.patch( - "backend.data.human_review.get_pending_review_by_node_exec_id" - ) - mock_get_pending_review.return_value = sample_pending_review - + """Test failure when trying to process review with invalid node execution ID""" + # Mock the route functions mock_get_reviews_for_execution = mocker.patch( "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution" ) mock_get_reviews_for_execution.return_value = [sample_pending_review] - # Mock complete failure in processing + # Mock validation failure - this should return 400, not 500 mock_process_all_reviews = mocker.patch( "backend.server.v2.executions.review.routes.process_all_reviews_for_execution" ) - mock_process_all_reviews.side_effect = Exception("Database error") + mock_process_all_reviews.side_effect = ValueError( + "Invalid node execution ID format" + ) request_data = { - "approved_reviews": [ + "reviews": [ { - "node_exec_id": "test_node_123", + "node_exec_id": "invalid-node-format", + "approved": True, "message": "Test", } - ], - "rejected_review_ids": [], + ] } response = client.post("/api/review/action", json=request_data) - assert response.status_code == 500 - assert "error" in response.json()["detail"].lower() + # Should be a 400 Bad Request, not 500 Internal Server Error + assert response.status_code == 400 + assert "Invalid node execution ID format" in response.json()["detail"] diff --git a/autogpt_platform/backend/backend/server/v2/library/db.py b/autogpt_platform/backend/backend/server/v2/library/db.py index a95b840d90..aae6e4cc65 100644 --- a/autogpt_platform/backend/backend/server/v2/library/db.py +++ b/autogpt_platform/backend/backend/server/v2/library/db.py @@ -17,6 +17,7 @@ import backend.server.v2.store.media as store_media from backend.data.block import BlockInput from backend.data.db import transaction from backend.data.execution import get_graph_execution +from backend.data.graph import GraphSettings from backend.data.includes import AGENT_PRESET_INCLUDE, library_agent_include from backend.data.model import CredentialsMetaInput from backend.integrations.creds_manager import IntegrationCredentialsManager @@ -400,6 +401,24 @@ async def add_generated_agent_image( ) +def _initialize_graph_settings(graph: graph_db.GraphModel) -> GraphSettings: + """ + Initialize GraphSettings based on graph content. + + Args: + graph: The graph to analyze + + Returns: + GraphSettings with appropriate human_in_the_loop_safe_mode value + """ + if graph.has_human_in_the_loop: + # Graph has HITL blocks - set safe mode to True by default + return GraphSettings(human_in_the_loop_safe_mode=True) + else: + # Graph has no HITL blocks - keep None + return GraphSettings(human_in_the_loop_safe_mode=None) + + async def create_library_agent( graph: graph_db.GraphModel, user_id: str, @@ -445,6 +464,9 @@ async def create_library_agent( } } }, + settings=SafeJson( + _initialize_graph_settings(graph_entry).model_dump() + ), ), include=library_agent_include( user_id, include_nodes=False, include_executions=False @@ -465,7 +487,7 @@ async def update_agent_version_in_library( user_id: str, agent_graph_id: str, agent_graph_version: int, -) -> None: +) -> library_model.LibraryAgent: """ Updates the agent version in the library if useGraphIsActiveVersion is True. @@ -489,7 +511,7 @@ async def update_agent_version_in_library( "useGraphIsActiveVersion": True, }, ) - await prisma.models.LibraryAgent.prisma().update( + lib = await prisma.models.LibraryAgent.prisma().update( where={"id": library_agent.id}, data={ "AgentGraph": { @@ -502,6 +524,10 @@ async def update_agent_version_in_library( }, }, ) + if lib is None: + raise NotFoundError(f"Library agent {library_agent.id} not found") + + return library_model.LibraryAgent.from_db(lib) except prisma.errors.PrismaError as e: logger.error(f"Database error updating agent version in library: {e}") raise DatabaseError("Failed to update agent version in library") from e @@ -514,6 +540,7 @@ async def update_library_agent( is_favorite: Optional[bool] = None, is_archived: Optional[bool] = None, is_deleted: Optional[Literal[False]] = None, + settings: Optional[GraphSettings] = None, ) -> library_model.LibraryAgent: """ Updates the specified LibraryAgent record. @@ -524,6 +551,7 @@ async def update_library_agent( auto_update_version: Whether the agent should auto-update to active version. is_favorite: Whether this agent is marked as a favorite. is_archived: Whether this agent is archived. + settings: User-specific settings for this library agent. Returns: The updated LibraryAgent. @@ -535,7 +563,7 @@ async def update_library_agent( logger.debug( f"Updating library agent {library_agent_id} for user {user_id} with " f"auto_update_version={auto_update_version}, is_favorite={is_favorite}, " - f"is_archived={is_archived}" + f"is_archived={is_archived}, settings={settings}" ) update_fields: prisma.types.LibraryAgentUpdateManyMutationInput = {} if auto_update_version is not None: @@ -550,6 +578,8 @@ async def update_library_agent( "Use delete_library_agent() to (soft-)delete library agents" ) update_fields["isDeleted"] = is_deleted + if settings is not None: + update_fields["settings"] = SafeJson(settings.model_dump()) if not update_fields: raise ValueError("No values were passed to update") @@ -570,6 +600,33 @@ async def update_library_agent( raise DatabaseError("Failed to update library agent") from e +async def update_library_agent_settings( + user_id: str, + agent_id: str, + settings: GraphSettings, +) -> library_model.LibraryAgent: + """ + Updates the settings for a specific LibraryAgent. + + Args: + user_id: The owner of the LibraryAgent. + agent_id: The ID of the LibraryAgent to update. + settings: New GraphSettings to apply. + + Returns: + The updated LibraryAgent. + + Raises: + NotFoundError: If the specified LibraryAgent does not exist. + DatabaseError: If there's an error in the update operation. + """ + return await update_library_agent( + library_agent_id=agent_id, + user_id=user_id, + settings=settings, + ) + + async def delete_library_agent( library_agent_id: str, user_id: str, soft_delete: bool = True ) -> None: @@ -706,6 +763,18 @@ async def add_store_agent_to_library( graph = store_listing_version.AgentGraph + # Convert to GraphModel to check for HITL blocks + graph_model = await graph_db.get_graph( + graph_id=graph.id, + version=graph.version, + user_id=user_id, + include_subgraphs=False, + ) + if not graph_model: + raise store_exceptions.AgentNotFoundError( + f"Graph #{graph.id} v{graph.version} not found or accessible" + ) + # Check if user already has this agent existing_library_agent = await prisma.models.LibraryAgent.prisma().find_unique( where={ @@ -740,6 +809,9 @@ async def add_store_agent_to_library( } }, "isCreatedByUser": False, + "settings": SafeJson( + _initialize_graph_settings(graph_model).model_dump() + ), }, include=library_agent_include( user_id, include_nodes=False, include_executions=False diff --git a/autogpt_platform/backend/backend/server/v2/library/db_test.py b/autogpt_platform/backend/backend/server/v2/library/db_test.py index 2d42d26cfa..cb0095fb39 100644 --- a/autogpt_platform/backend/backend/server/v2/library/db_test.py +++ b/autogpt_platform/backend/backend/server/v2/library/db_test.py @@ -32,6 +32,7 @@ async def test_get_library_agents(mocker): id="ua1", userId="test-user", agentGraphId="agent2", + settings="{}", # type: ignore agentGraphVersion=1, isCreatedByUser=False, isDeleted=False, @@ -123,6 +124,7 @@ async def test_add_agent_to_library(mocker): id="ua1", userId="test-user", agentGraphId=mock_store_listing_data.agentGraphId, + settings="{}", # type: ignore agentGraphVersion=1, isCreatedByUser=False, isDeleted=False, @@ -148,6 +150,14 @@ async def test_add_agent_to_library(mocker): return_value=mock_library_agent_data ) + # Mock graph_db.get_graph function that's called to check for HITL blocks + mock_graph_db = mocker.patch("backend.server.v2.library.db.graph_db") + mock_graph_model = mocker.Mock() + mock_graph_model.nodes = ( + [] + ) # Empty list so _has_human_in_the_loop_blocks returns False + mock_graph_db.get_graph = mocker.AsyncMock(return_value=mock_graph_model) + # Mock the model conversion mock_from_db = mocker.patch("backend.server.v2.library.model.LibraryAgent.from_db") mock_from_db.return_value = mocker.Mock() @@ -169,17 +179,29 @@ async def test_add_agent_to_library(mocker): }, include={"AgentGraph": True}, ) - mock_library_agent.return_value.create.assert_called_once_with( - data={ - "User": {"connect": {"id": "test-user"}}, - "AgentGraph": { - "connect": {"graphVersionId": {"id": "agent1", "version": 1}} - }, - "isCreatedByUser": False, - }, - include=library_agent_include( - "test-user", include_nodes=False, include_executions=False - ), + # Check that create was called with the expected data including settings + create_call_args = mock_library_agent.return_value.create.call_args + assert create_call_args is not None + + # Verify the main structure + expected_data = { + "User": {"connect": {"id": "test-user"}}, + "AgentGraph": {"connect": {"graphVersionId": {"id": "agent1", "version": 1}}}, + "isCreatedByUser": False, + } + + actual_data = create_call_args[1]["data"] + # Check that all expected fields are present + for key, value in expected_data.items(): + assert actual_data[key] == value + + # Check that settings field is present and is a SafeJson object + assert "settings" in actual_data + assert hasattr(actual_data["settings"], "__class__") # Should be a SafeJson object + + # Check include parameter + assert create_call_args[1]["include"] == library_agent_include( + "test-user", include_nodes=False, include_executions=False ) diff --git a/autogpt_platform/backend/backend/server/v2/library/model.py b/autogpt_platform/backend/backend/server/v2/library/model.py index d6b5ba90de..ab4bec586e 100644 --- a/autogpt_platform/backend/backend/server/v2/library/model.py +++ b/autogpt_platform/backend/backend/server/v2/library/model.py @@ -6,8 +6,8 @@ import prisma.enums import prisma.models import pydantic -import backend.data.block as block_model -import backend.data.graph as graph_model +from backend.data.block import BlockInput +from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo from backend.data.model import CredentialsMetaInput, is_credentials_field_name from backend.util.models import Pagination @@ -72,7 +72,7 @@ class LibraryAgent(pydantic.BaseModel): has_external_trigger: bool = pydantic.Field( description="Whether the agent has an external trigger (e.g. webhook) node" ) - trigger_setup_info: Optional[graph_model.GraphTriggerInfo] = None + trigger_setup_info: Optional[GraphTriggerInfo] = None # Indicates whether there's a new output (based on recent runs) new_output: bool @@ -89,6 +89,9 @@ class LibraryAgent(pydantic.BaseModel): # Recommended schedule cron (from marketplace agents) recommended_schedule_cron: str | None = None + # User-specific settings for this library agent + settings: GraphSettings = pydantic.Field(default_factory=GraphSettings) + # Marketplace listing information if the agent has been published marketplace_listing: Optional["MarketplaceListing"] = None @@ -106,7 +109,7 @@ class LibraryAgent(pydantic.BaseModel): if not agent.AgentGraph: raise ValueError("Associated Agent record is required.") - graph = graph_model.GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs) + graph = GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs) created_at = agent.createdAt @@ -181,6 +184,7 @@ class LibraryAgent(pydantic.BaseModel): is_latest_version=is_latest_version, is_favorite=agent.isFavorite, recommended_schedule_cron=agent.AgentGraph.recommendedScheduleCron, + settings=GraphSettings.model_validate(agent.settings), marketplace_listing=marketplace_listing_data, ) @@ -249,7 +253,7 @@ class LibraryAgentPresetCreatable(pydantic.BaseModel): graph_id: str graph_version: int - inputs: block_model.BlockInput + inputs: BlockInput credentials: dict[str, CredentialsMetaInput] name: str @@ -278,7 +282,7 @@ class LibraryAgentPresetUpdatable(pydantic.BaseModel): Request model used when updating a preset for a library agent. """ - inputs: Optional[block_model.BlockInput] = None + inputs: Optional[BlockInput] = None credentials: Optional[dict[str, CredentialsMetaInput]] = None name: Optional[str] = None @@ -321,7 +325,7 @@ class LibraryAgentPreset(LibraryAgentPresetCreatable): "Webhook must be included in AgentPreset query when webhookId is set" ) - input_data: block_model.BlockInput = {} + input_data: BlockInput = {} input_credentials: dict[str, CredentialsMetaInput] = {} for preset_input in preset.InputPresets: @@ -387,3 +391,6 @@ class LibraryAgentUpdateRequest(pydantic.BaseModel): is_archived: Optional[bool] = pydantic.Field( default=None, description="Archive the agent" ) + settings: Optional[GraphSettings] = pydantic.Field( + default=None, description="User-specific settings for this library agent" + ) diff --git a/autogpt_platform/backend/backend/server/v2/library/routes/agents.py b/autogpt_platform/backend/backend/server/v2/library/routes/agents.py index eeea9d8fb6..1235ca07d0 100644 --- a/autogpt_platform/backend/backend/server/v2/library/routes/agents.py +++ b/autogpt_platform/backend/backend/server/v2/library/routes/agents.py @@ -276,6 +276,7 @@ async def update_library_agent( auto_update_version=payload.auto_update_version, is_favorite=payload.is_favorite, is_archived=payload.is_archived, + settings=payload.settings, ) except NotFoundError as e: raise HTTPException( diff --git a/autogpt_platform/backend/backend/util/cache.py b/autogpt_platform/backend/backend/util/cache.py index c718d4ef90..757ba45b42 100644 --- a/autogpt_platform/backend/backend/util/cache.py +++ b/autogpt_platform/backend/backend/util/cache.py @@ -27,6 +27,7 @@ from backend.util.settings import Settings P = ParamSpec("P") R = TypeVar("R") R_co = TypeVar("R_co", covariant=True) +T = TypeVar("T") logger = logging.getLogger(__name__) settings = Settings() @@ -143,7 +144,7 @@ def cached( ttl_seconds: int, shared_cache: bool = False, refresh_ttl_on_get: bool = False, -) -> Callable[[Callable], CachedFunction]: +) -> Callable[[Callable[P, R]], CachedFunction[P, R]]: """ Thundering herd safe cache decorator for both sync and async functions. @@ -169,7 +170,7 @@ def cached( return {"result": param} """ - def decorator(target_func): + def decorator(target_func: Callable[P, R]) -> CachedFunction[P, R]: cache_storage: dict[tuple, CachedValue] = {} _event_loop_locks: dict[Any, asyncio.Lock] = {} @@ -386,7 +387,7 @@ def cached( setattr(wrapper, "cache_info", cache_info) setattr(wrapper, "cache_delete", cache_delete) - return cast(CachedFunction, wrapper) + return cast(CachedFunction[P, R], wrapper) return decorator diff --git a/autogpt_platform/backend/backend/util/test.py b/autogpt_platform/backend/backend/util/test.py index 0a2015254b..dda62e7f9f 100644 --- a/autogpt_platform/backend/backend/util/test.py +++ b/autogpt_platform/backend/backend/util/test.py @@ -9,9 +9,9 @@ from autogpt_libs.auth import get_user_id from backend.data import db from backend.data.block import Block, BlockSchema, initialize_blocks from backend.data.execution import ( + ExecutionContext, ExecutionStatus, NodeExecutionResult, - UserContext, get_graph_execution, ) from backend.data.model import _BaseCredentials @@ -141,7 +141,7 @@ async def execute_block_test(block: Block): "node_exec_id": str(uuid.uuid4()), "user_id": str(uuid.uuid4()), "graph_version": 1, # Default version for tests - "user_context": UserContext(timezone="UTC"), # Default for tests + "execution_context": ExecutionContext(), } input_model = cast(type[BlockSchema], block.input_schema) credentials_input_fields = input_model.get_credentials_fields() diff --git a/autogpt_platform/backend/backend/util/timezone_utils.py b/autogpt_platform/backend/backend/util/timezone_utils.py index 6a6c438085..76614a8357 100644 --- a/autogpt_platform/backend/backend/util/timezone_utils.py +++ b/autogpt_platform/backend/backend/util/timezone_utils.py @@ -10,6 +10,8 @@ from zoneinfo import ZoneInfo from croniter import croniter +from backend.data.model import USER_TIMEZONE_NOT_SET + logger = logging.getLogger(__name__) @@ -138,7 +140,7 @@ def get_user_timezone_or_utc(user_timezone: Optional[str]) -> str: Returns: Valid timezone string (user's preference or UTC fallback) """ - if not user_timezone or user_timezone == "not-set": + if not user_timezone or user_timezone == USER_TIMEZONE_NOT_SET: return "UTC" if validate_timezone(user_timezone): diff --git a/autogpt_platform/backend/migrations/20251128112407_add_library_agent_settings/migration.sql b/autogpt_platform/backend/migrations/20251128112407_add_library_agent_settings/migration.sql new file mode 100644 index 0000000000..a9cf141ce2 --- /dev/null +++ b/autogpt_platform/backend/migrations/20251128112407_add_library_agent_settings/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "LibraryAgent" ADD COLUMN "settings" JSONB NOT NULL DEFAULT '{}'; diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma index 7ebfa54905..ad1f484b68 100644 --- a/autogpt_platform/backend/schema.prisma +++ b/autogpt_platform/backend/schema.prisma @@ -266,6 +266,8 @@ model LibraryAgent { isArchived Boolean @default(false) isDeleted Boolean @default(false) + settings Json @default("{}") + @@unique([userId, agentGraphId, agentGraphVersion]) @@index([agentGraphId, agentGraphVersion]) @@index([creatorId]) @@ -478,26 +480,26 @@ enum ReviewStatus { // Pending human reviews for Human-in-the-loop blocks model PendingHumanReview { - nodeExecId String @id - userId String - graphExecId String - graphId String - graphVersion Int - payload Json // The actual payload data to be reviewed - instructions String? // Instructions/message for the reviewer - editable Boolean @default(true) // Whether the reviewer can edit the data - status ReviewStatus @default(WAITING) - reviewMessage String? // Optional message from the reviewer - wasEdited Boolean? // Whether the data was modified during review - processed Boolean @default(false) // Whether the review result has been processed by the execution engine - createdAt DateTime @default(now()) - updatedAt DateTime? @updatedAt - reviewedAt DateTime? - - User User @relation(fields: [userId], references: [id], onDelete: Cascade) - NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade) - GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade) - + nodeExecId String @id + userId String + graphExecId String + graphId String + graphVersion Int + payload Json // The actual payload data to be reviewed + instructions String? // Instructions/message for the reviewer + editable Boolean @default(true) // Whether the reviewer can edit the data + status ReviewStatus @default(WAITING) + reviewMessage String? // Optional message from the reviewer + wasEdited Boolean? // Whether the data was modified during review + processed Boolean @default(false) // Whether the review result has been processed by the execution engine + createdAt DateTime @default(now()) + updatedAt DateTime? @updatedAt + reviewedAt DateTime? + + User User @relation(fields: [userId], references: [id], onDelete: Cascade) + NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade) + GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade) + @@unique([nodeExecId]) // One pending review per node execution @@index([userId, status]) @@index([graphExecId, status]) @@ -704,7 +706,7 @@ view StoreAgent { agent_video String? agent_image String[] - featured Boolean @default(false) + featured Boolean @default(false) creator_username String? creator_avatar String? sub_heading String @@ -714,8 +716,8 @@ view StoreAgent { runs Int rating Float versions String[] - is_available Boolean @default(true) - useForOnboarding Boolean @default(false) + is_available Boolean @default(true) + useForOnboarding Boolean @default(false) // Materialized views used (refreshed every 15 minutes via pg_cron): // - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId diff --git a/autogpt_platform/backend/snapshots/grph_single b/autogpt_platform/backend/snapshots/grph_single index d9207eb205..7ba26f6171 100644 --- a/autogpt_platform/backend/snapshots/grph_single +++ b/autogpt_platform/backend/snapshots/grph_single @@ -9,6 +9,7 @@ "forked_from_id": null, "forked_from_version": null, "has_external_trigger": false, + "has_human_in_the_loop": false, "id": "graph-123", "input_schema": { "properties": {}, diff --git a/autogpt_platform/backend/snapshots/grphs_all b/autogpt_platform/backend/snapshots/grphs_all index 42f4174d7b..d54df2bc18 100644 --- a/autogpt_platform/backend/snapshots/grphs_all +++ b/autogpt_platform/backend/snapshots/grphs_all @@ -9,6 +9,7 @@ "forked_from_id": null, "forked_from_version": null, "has_external_trigger": false, + "has_human_in_the_loop": false, "id": "graph-123", "input_schema": { "properties": {}, diff --git a/autogpt_platform/backend/snapshots/lib_agts_search b/autogpt_platform/backend/snapshots/lib_agts_search index 649c82975e..d1feb7d16d 100644 --- a/autogpt_platform/backend/snapshots/lib_agts_search +++ b/autogpt_platform/backend/snapshots/lib_agts_search @@ -32,6 +32,9 @@ "is_latest_version": true, "is_favorite": false, "recommended_schedule_cron": null, + "settings": { + "human_in_the_loop_safe_mode": null + }, "marketplace_listing": null }, { @@ -66,6 +69,9 @@ "is_latest_version": true, "is_favorite": false, "recommended_schedule_cron": null, + "settings": { + "human_in_the_loop_safe_mode": null + }, "marketplace_listing": null } ], diff --git a/autogpt_platform/backend/test_requeue_integration.py b/autogpt_platform/backend/test_requeue_integration.py index 95deb1f183..da1e00e357 100644 --- a/autogpt_platform/backend/test_requeue_integration.py +++ b/autogpt_platform/backend/test_requeue_integration.py @@ -59,10 +59,9 @@ class QueueOrderTester: "graph_exec_id": f"exec-{message_id}", "graph_id": f"graph-{message_id}", "user_id": user_id, - "user_context": {"timezone": "UTC"}, + "execution_context": {"timezone": "UTC"}, "nodes_input_masks": {}, "starting_nodes_input": [], - "parent_graph_exec_id": None, } ) diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx index d4c97353b7..1fb10f72a1 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx @@ -16,14 +16,29 @@ import { useCopyPaste } from "./useCopyPaste"; import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel"; import { parseAsString, useQueryStates } from "nuqs"; import { CustomControls } from "./components/CustomControl"; +import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle"; +import { useGetV1GetSpecificGraph } from "@/app/api/__generated__/endpoints/graphs/graphs"; +import { GraphModel } from "@/app/api/__generated__/models/graphModel"; +import { okData } from "@/app/api/helpers"; import { TriggerAgentBanner } from "./components/TriggerAgentBanner"; export const Flow = () => { - const [{ flowExecutionID }] = useQueryStates({ + const [{ flowID, flowExecutionID }] = useQueryStates({ flowID: parseAsString, flowExecutionID: parseAsString, }); + const { data: graph } = useGetV1GetSpecificGraph( + flowID ?? "", + {}, + { + query: { + select: okData, + enabled: !!flowID, + }, + }, + ); + const nodes = useNodeStore(useShallow((state) => state.nodes)); const onNodesChange = useNodeStore( useShallow((state) => state.onNodesChange), @@ -83,9 +98,19 @@ export const Flow = () => { {hasWebhookNodes ? : } {} {isGraphRunning && } + {graph && ( + + )} - + ); }; diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx index 3e733eab96..f80a480542 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx @@ -65,6 +65,7 @@ import NewControlPanel from "@/app/(platform)/build/components/NewControlPanel/N import { Flag, useGetFlag } from "@/services/feature-flags/use-get-flag"; import { BuildActionBar } from "../BuildActionBar"; import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel"; +import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle"; // This is for the history, this is the minimum distance a block must move before it is logged // It helps to prevent spamming the history with small movements especially when pressing on a input in a block @@ -927,6 +928,13 @@ const FlowEditor: React.FC<{ > + {savedAgent && ( + + )} {isNewBlockEnabled ? ( diff --git a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx index af19d94cd5..7f9c5065d1 100644 --- a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx @@ -14,6 +14,7 @@ import moment from "moment"; import { AgentActionsDropdown } from "../AgentActionsDropdown"; import { RunStatusBadge } from "../SelectedRunView/components/RunStatusBadge"; import { ShareRunButton } from "../ShareRunButton/ShareRunButton"; +import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle"; import { useRunDetailHeader } from "./useRunDetailHeader"; type Props = { @@ -79,6 +80,11 @@ export function RunDetailHeader({ shareToken={run.share_token} /> )} + {!isRunning ? ( + + +
+
+ Safe Mode: {currentSafeMode! ? "ON" : "OFF"} +
+
+ {currentSafeMode! + ? "HITL blocks require manual review" + : "HITL blocks proceed automatically"} +
+
+
+ + + ); +} diff --git a/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx b/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx index feb4da96fe..183fd8599e 100644 --- a/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx +++ b/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx @@ -5,36 +5,50 @@ import { Button } from "@/components/atoms/Button/Button"; import { ClockIcon, XIcon } from "@phosphor-icons/react"; import { cn } from "@/lib/utils"; import { Text } from "@/components/atoms/Text/Text"; +import { useGetV1GetExecutionDetails } from "@/app/api/__generated__/endpoints/graphs/graphs"; import { AgentExecutionStatus } from "@/app/api/__generated__/models/agentExecutionStatus"; -import { useGraphStore } from "@/app/(platform)/build/stores/graphStore"; interface FloatingReviewsPanelProps { executionId?: string; + graphId?: string; className?: string; } export function FloatingReviewsPanel({ executionId, + graphId, className, }: FloatingReviewsPanelProps) { const [isOpen, setIsOpen] = useState(false); - const executionStatus = useGraphStore((state) => state.graphExecutionStatus); + const { data: executionDetails } = useGetV1GetExecutionDetails( + graphId || "", + executionId || "", + { + query: { + enabled: !!(graphId && executionId), + }, + }, + ); + + const executionStatus = + executionDetails?.status === 200 ? executionDetails.data.status : undefined; const { pendingReviews, isLoading, refetch } = usePendingReviewsForExecution( executionId || "", ); useEffect(() => { - if (executionStatus === AgentExecutionStatus.REVIEW && executionId) { + if (executionId) { refetch(); } }, [executionStatus, executionId, refetch]); if ( !executionId || - (!isLoading && pendingReviews.length === 0) || - executionStatus !== AgentExecutionStatus.REVIEW + (!isLoading && + pendingReviews.length === 0 && + executionStatus !== AgentExecutionStatus.REVIEW) ) { return null; }