feat(platform): implement graph-level Safe Mode toggle for HITL blocks (#11455)

## Summary

This PR implements a graph-level Safe Mode toggle system for
Human-in-the-Loop (HITL) blocks. When Safe Mode is ON (default), HITL
blocks require manual review before proceeding. When OFF, they execute
automatically.

## 🔧 Backend Changes

- **Database**: Added `metadata` JSON column to `AgentGraph` table with
migration
- **API**: Updated `execute_graph` endpoint to accept `safe_mode`
parameter
- **Execution**: Enhanced execution context to use graph metadata as
default with API override capability
- **Auto-detection**: Automatically populate `has_human_in_the_loop` for
graphs containing HITL blocks
- **Block Detection**: HITL block ID:
`8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d`

## 🎨 Frontend Changes

- **Component**: New `FloatingSafeModeToggle` with dual variants:
  - **White variant**: For library pages, integrates with action buttons
  - **Black variant**: For builders, floating positioned  
- **Integration**: Added toggles to both new/legacy builders and library
pages
- **API Integration**: Direct graph metadata updates via
`usePutV1UpdateGraphVersion`
- **Query Management**: React Query cache invalidation for consistent UI
updates
- **Conditional Display**: Toggle only appears when graph contains HITL
blocks

## 🛠 Technical Implementation

- **Safe Mode ON** (default): HITL blocks require manual review before
proceeding
- **Safe Mode OFF**: HITL blocks execute automatically without
intervention
- **Priority**: Backend API `safe_mode` parameter takes precedence over
graph metadata
- **Detection**: Auto-populates `has_human_in_the_loop` metadata field
- **Positioning**: Proper z-index and responsive positioning for
floating elements

## 🚧 Known Issues (Work in Progress)

### High Priority
- [ ] **Toggle state persistence**: toggle always shows "ON" regardless of
the actual stored state — likely a React Query cache-invalidation issue
- [ ] **LibraryAgent metadata**: Missing metadata field causing
TypeScript errors
- [ ] **Tooltip z-index**: Still covered by some UI elements despite
high z-index

### Medium Priority  
- [ ] **HITL detection**: Logic needs improvement for reliable block
detection
- [ ] **Error handling**: Removing HITL blocks from a graph causes save
errors
- [ ] **TypeScript**: Fix type mismatches between GraphModel and
LibraryAgent

### Low Priority
- [ ] **Frontend API**: Add `safe_mode` parameter to execution calls
once OpenAPI is regenerated
- [ ] **Performance**: Consider debouncing rapid toggle clicks

## 🧪 Test Plan

- [ ] Verify toggle appears only when graph has HITL blocks
- [ ] Test toggle persistence across page refreshes  
- [ ] Confirm API calls update graph metadata correctly
- [ ] Validate execution behavior respects safe mode setting
- [ ] Check styling consistency across builder and library contexts

## 🔗 Related

- Addresses requirements for graph-level HITL configuration
- Builds on existing FloatingReviewsPanel infrastructure
- Integrates with existing graph metadata system

🤖 Generated with [Claude Code](https://claude.ai/code)
This commit is contained in:
Zamil Majdy
2025-12-02 16:55:55 +07:00
committed by GitHub
parent 3b2a67a711
commit 7b951c977e
37 changed files with 916 additions and 390 deletions

View File

@@ -11,7 +11,7 @@ from backend.data.block import (
BlockType,
get_block,
)
from backend.data.execution import ExecutionStatus, NodesInputMasks
from backend.data.execution import ExecutionContext, ExecutionStatus, NodesInputMasks
from backend.data.model import NodeExecutionStats, SchemaField
from backend.util.json import validate_with_jsonschema
from backend.util.retry import func_retry
@@ -72,9 +72,9 @@ class AgentExecutorBlock(Block):
input_data: Input,
*,
graph_exec_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
from backend.executor import utils as execution_utils
graph_exec = await execution_utils.add_graph_execution(
@@ -83,8 +83,9 @@ class AgentExecutorBlock(Block):
user_id=input_data.user_id,
inputs=input_data.inputs,
nodes_input_masks=input_data.nodes_input_masks,
parent_graph_exec_id=graph_exec_id,
is_sub_graph=True, # AgentExecutorBlock executions are always sub-graphs
execution_context=execution_context.model_copy(
update={"parent_execution_id": graph_exec_id},
),
)
logger = execution_utils.LogMetadata(

View File

@@ -9,8 +9,9 @@ from backend.data.block import (
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
BlockType,
)
from backend.data.execution import ExecutionStatus
from backend.data.execution import ExecutionContext, ExecutionStatus
from backend.data.human_review import ReviewResult
from backend.data.model import SchemaField
from backend.executor.manager import async_update_node_execution_status
@@ -61,15 +62,15 @@ class HumanInTheLoopBlock(Block):
categories={BlockCategory.BASIC},
input_schema=HumanInTheLoopBlock.Input,
output_schema=HumanInTheLoopBlock.Output,
block_type=BlockType.HUMAN_IN_THE_LOOP,
test_input={
"data": {"name": "John Doe", "age": 30},
"name": "User profile data",
"editable": True,
},
test_output=[
("reviewed_data", {"name": "John Doe", "age": 30}),
("status", "approved"),
("review_message", ""),
("reviewed_data", {"name": "John Doe", "age": 30}),
],
test_mock={
"get_or_create_human_review": lambda *_args, **_kwargs: ReviewResult(
@@ -80,9 +81,25 @@ class HumanInTheLoopBlock(Block):
node_exec_id="test-node-exec-id",
),
"update_node_execution_status": lambda *_args, **_kwargs: None,
"update_review_processed_status": lambda *_args, **_kwargs: None,
},
)
async def get_or_create_human_review(self, **kwargs):
    # Thin indirection over the DB client so tests can replace this call
    # (see test_mock: "get_or_create_human_review" above).
    return await get_database_manager_async_client().get_or_create_human_review(
        **kwargs
    )
async def update_node_execution_status(self, **kwargs):
    # Wrapper that injects the async DB client into the shared status-update
    # helper; mockable in tests (see test_mock above).
    return await async_update_node_execution_status(
        db_client=get_database_manager_async_client(), **kwargs
    )
async def update_review_processed_status(self, node_exec_id: str, processed: bool):
    # Thin indirection over the DB client so tests can replace this call
    # (see test_mock: "update_review_processed_status" above).
    return await get_database_manager_async_client().update_review_processed_status(
        node_exec_id, processed
    )
async def run(
self,
input_data: Input,
@@ -92,20 +109,20 @@ class HumanInTheLoopBlock(Block):
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
"""
Execute the Human In The Loop block.
if not execution_context.safe_mode:
logger.info(
f"HITL block skipping review for node {node_exec_id} - safe mode disabled"
)
yield "status", "approved"
yield "reviewed_data", input_data.data
yield "review_message", "Auto-approved (safe mode disabled)"
return
This method uses one function to handle the complete workflow - checking existing reviews
and creating pending ones as needed.
"""
try:
logger.debug(f"HITL block executing for node {node_exec_id}")
# Use the data layer to handle the complete workflow
db_client = get_database_manager_async_client()
result = await db_client.get_or_create_human_review(
result = await self.get_or_create_human_review(
user_id=user_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
@@ -119,21 +136,15 @@ class HumanInTheLoopBlock(Block):
logger.error(f"Error in HITL block for node {node_exec_id}: {str(e)}")
raise
# Check if we're waiting for human input
if result is None:
logger.info(
f"HITL block pausing execution for node {node_exec_id} - awaiting human review"
)
try:
# Set node status to REVIEW so execution manager can't mark it as COMPLETED
# The VALID_STATUS_TRANSITIONS will then prevent any unwanted status changes
# Use the proper wrapper function to ensure websocket events are published
await async_update_node_execution_status(
db_client=db_client,
await self.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
# Execution pauses here until API routes process the review
return
except Exception as e:
logger.error(
@@ -141,10 +152,8 @@ class HumanInTheLoopBlock(Block):
)
raise
# Review is complete (approved or rejected) - check if unprocessed
if not result.processed:
# Mark as processed before yielding
await db_client.update_review_processed_status(
await self.update_review_processed_status(
node_exec_id=node_exec_id, processed=True
)

View File

@@ -14,7 +14,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import UserContext
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
# Shared timezone literal type for all time/date blocks
@@ -188,10 +188,9 @@ class GetCurrentTimeBlock(Block):
)
async def run(
self, input_data: Input, *, user_context: UserContext, **kwargs
self, input_data: Input, *, execution_context: ExecutionContext, **kwargs
) -> BlockOutput:
# Extract timezone from user_context (always present)
effective_timezone = user_context.timezone
effective_timezone = execution_context.user_timezone
# Get the appropriate timezone
tz = _get_timezone(input_data.format_type, effective_timezone)
@@ -298,10 +297,10 @@ class GetCurrentDateBlock(Block):
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
# Extract timezone from user_context (required keyword argument)
user_context: UserContext = kwargs["user_context"]
effective_timezone = user_context.timezone
async def run(
self, input_data: Input, *, execution_context: ExecutionContext, **kwargs
) -> BlockOutput:
effective_timezone = execution_context.user_timezone
try:
offset = int(input_data.offset)
@@ -404,10 +403,10 @@ class GetCurrentDateAndTimeBlock(Block):
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
# Extract timezone from user_context (required keyword argument)
user_context: UserContext = kwargs["user_context"]
effective_timezone = user_context.timezone
async def run(
self, input_data: Input, *, execution_context: ExecutionContext, **kwargs
) -> BlockOutput:
effective_timezone = execution_context.user_timezone
# Get the appropriate timezone
tz = _get_timezone(input_data.format_type, effective_timezone)

View File

@@ -71,6 +71,7 @@ class BlockType(Enum):
AGENT = "Agent"
AI = "AI"
AYRSHARE = "Ayrshare"
HUMAN_IN_THE_LOOP = "Human In The Loop"
class BlockCategory(Enum):
@@ -796,3 +797,12 @@ def get_io_block_ids() -> Sequence[str]:
for id, B in get_blocks().items()
if B().block_type in (BlockType.INPUT, BlockType.OUTPUT)
]
@cached(ttl_seconds=3600)
def get_human_in_the_loop_block_ids() -> Sequence[str]:
    """Return the IDs of every registered Human-in-the-Loop block.

    Result is cached for one hour since the block registry is static at runtime.
    """
    return [
        block_id
        for block_id, block_cls in get_blocks().items()
        if block_cls().block_type == BlockType.HUMAN_IN_THE_LOOP
    ]

View File

@@ -7,7 +7,7 @@ from prisma.models import CreditTransaction, UserBalance
from backend.blocks.llm import AITextGeneratorBlock
from backend.data.block import get_block
from backend.data.credit import BetaUserCredit, UsageTransactionMetadata
from backend.data.execution import NodeExecutionEntry, UserContext
from backend.data.execution import ExecutionContext, NodeExecutionEntry
from backend.data.user import DEFAULT_USER_ID
from backend.executor.utils import block_usage_cost
from backend.integrations.credentials_store import openai_credentials
@@ -86,7 +86,7 @@ async def test_block_credit_usage(server: SpinTestServer):
"type": openai_credentials.type,
},
},
user_context=UserContext(timezone="UTC"),
execution_context=ExecutionContext(user_timezone="UTC"),
),
)
assert spending_amount_1 > 0
@@ -101,7 +101,7 @@ async def test_block_credit_usage(server: SpinTestServer):
node_exec_id="test_node_exec",
block_id=AITextGeneratorBlock().id,
inputs={"model": "gpt-4-turbo", "api_key": "owned_api_key"},
user_context=UserContext(timezone="UTC"),
execution_context=ExecutionContext(user_timezone="UTC"),
),
)
assert spending_amount_2 == 0

View File

@@ -71,6 +71,18 @@ logger = logging.getLogger(__name__)
config = Config()
class ExecutionContext(BaseModel):
    """
    Unified context that carries execution-level data throughout the entire execution flow.
    This includes information needed by blocks, sub-graphs, and execution management.
    """

    # When True (default), Human-in-the-Loop blocks pause for manual review;
    # when False they auto-approve and yield the input data unchanged.
    safe_mode: bool = True
    # Timezone applied by time/date blocks when formatting "current time" output.
    # Presumably an IANA identifier resolved from the user's profile — defaults to UTC.
    user_timezone: str = "UTC"
    # ID of the top-most graph execution in a sub-graph chain; the execution
    # manager checks this to skip children whose root was TERMINATED.
    root_execution_id: Optional[str] = None
    # ID of the direct parent graph execution; set by AgentExecutorBlock when
    # spawning sub-graph executions.
    parent_execution_id: Optional[str] = None
# -------------------------- Models -------------------------- #
@@ -365,9 +377,8 @@ class GraphExecutionWithNodes(GraphExecution):
def to_graph_execution_entry(
self,
user_context: "UserContext",
execution_context: ExecutionContext,
compiled_nodes_input_masks: Optional[NodesInputMasks] = None,
parent_graph_exec_id: Optional[str] = None,
):
return GraphExecutionEntry(
user_id=self.user_id,
@@ -375,8 +386,7 @@ class GraphExecutionWithNodes(GraphExecution):
graph_version=self.graph_version or 0,
graph_exec_id=self.id,
nodes_input_masks=compiled_nodes_input_masks,
user_context=user_context,
parent_graph_exec_id=parent_graph_exec_id,
execution_context=execution_context,
)
@@ -449,7 +459,7 @@ class NodeExecutionResult(BaseModel):
)
def to_node_execution_entry(
self, user_context: "UserContext"
self, execution_context: ExecutionContext
) -> "NodeExecutionEntry":
return NodeExecutionEntry(
user_id=self.user_id,
@@ -460,7 +470,7 @@ class NodeExecutionResult(BaseModel):
node_id=self.node_id,
block_id=self.block_id,
inputs=self.input_data,
user_context=user_context,
execution_context=execution_context,
)
@@ -1099,20 +1109,13 @@ async def get_latest_node_execution(
# ----------------- Execution Infrastructure ----------------- #
class UserContext(BaseModel):
"""Generic user context for graph execution containing user-specific settings."""
timezone: str
class GraphExecutionEntry(BaseModel):
user_id: str
graph_exec_id: str
graph_id: str
graph_version: int
nodes_input_masks: Optional[NodesInputMasks] = None
user_context: UserContext
parent_graph_exec_id: Optional[str] = None
execution_context: ExecutionContext
class NodeExecutionEntry(BaseModel):
@@ -1124,7 +1127,7 @@ class NodeExecutionEntry(BaseModel):
node_id: str
block_id: str
inputs: BlockInput
user_context: UserContext
execution_context: ExecutionContext
class ExecutionQueue(Generic[T]):

View File

@@ -61,6 +61,10 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class GraphSettings(BaseModel):
    """Per-user, per-graph settings persisted on the LibraryAgent record."""

    # Graph-level Safe Mode toggle for Human-in-the-Loop blocks.
    # None means "not set"; callers fall back to the default behavior.
    human_in_the_loop_safe_mode: bool | None = None
class Link(BaseDbModel):
source_id: str
sink_id: str
@@ -225,6 +229,15 @@ class BaseGraph(BaseDbModel):
def has_external_trigger(self) -> bool:
return self.webhook_input_node is not None
@computed_field
@property
def has_human_in_the_loop(self) -> bool:
    """Whether this graph contains at least one Human-in-the-Loop block.

    Tests the block type directly instead of the original
    ``any(node.block_id for ... if <cond>)`` form, which depended on the
    truthiness of ``block_id``: a matching node with a falsy ID (e.g. an
    empty string) would have made the whole check wrongly return False.
    """
    return any(
        node.block.block_type == BlockType.HUMAN_IN_THE_LOOP
        for node in self.nodes
    )
@property
def webhook_input_node(self) -> Node | None:
return next(
@@ -1105,6 +1118,28 @@ async def delete_graph(graph_id: str, user_id: str) -> int:
return entries_count
async def get_graph_settings(user_id: str, graph_id: str) -> GraphSettings:
    """Load the user's settings for a graph from their newest LibraryAgent entry.

    Returns default ``GraphSettings()`` when no active (non-deleted,
    non-archived) library entry exists, when the entry stores no settings,
    or when the stored payload fails model validation.
    """
    library_entry = await LibraryAgent.prisma().find_first(
        where={
            "userId": user_id,
            "agentGraphId": graph_id,
            "isDeleted": False,
            "isArchived": False,
        },
        order={"agentGraphVersion": "desc"},
    )
    if not library_entry or not library_entry.settings:
        return GraphSettings()

    try:
        return GraphSettings.model_validate(library_entry.settings)
    except Exception:
        # Corrupt/outdated payloads degrade to defaults rather than failing.
        logger.warning(
            f"Malformed settings for LibraryAgent user={user_id} graph={graph_id}"
        )
        return GraphSettings()
async def validate_graph_execution_permissions(
user_id: str, graph_id: str, graph_version: int, is_sub_graph: bool = False
) -> None:

View File

@@ -32,32 +32,6 @@ class ReviewResult(BaseModel):
node_exec_id: str
async def get_pending_review_by_node_exec_id(
    node_exec_id: str, user_id: str
) -> Optional["PendingHumanReviewModel"]:
    """
    Get a pending review by node execution ID with user ownership validation.

    Args:
        node_exec_id: The node execution ID to check
        user_id: The user ID to validate ownership

    Returns:
        The existing review if found and owned by the user, None otherwise
    """
    db_record = await PendingHumanReview.prisma().find_first(
        where={
            "nodeExecId": node_exec_id,
            "userId": user_id,
        }
    )
    # Map the raw DB row into the domain model; no row -> None.
    return PendingHumanReviewModel.from_db(db_record) if db_record else None
async def get_or_create_human_review(
user_id: str,
node_exec_id: str,
@@ -121,27 +95,17 @@ async def get_or_create_human_review(
if review.processed:
return None
if review.status == ReviewStatus.APPROVED:
# Return the approved review result
return ReviewResult(
data=review.payload,
status=ReviewStatus.APPROVED,
message=review.reviewMessage or "",
processed=review.processed,
node_exec_id=review.nodeExecId,
)
elif review.status == ReviewStatus.REJECTED:
# Return the rejected review result
return ReviewResult(
data=None,
status=ReviewStatus.REJECTED,
message=review.reviewMessage or "",
processed=review.processed,
node_exec_id=review.nodeExecId,
)
else:
# Review is pending - return None to continue waiting
# If pending, return None to continue waiting, otherwise return the review result
if review.status == ReviewStatus.WAITING:
return None
else:
return ReviewResult(
data=review.payload if review.status == ReviewStatus.APPROVED else None,
status=review.status,
message=review.reviewMessage or "",
processed=review.processed,
node_exec_id=review.nodeExecId,
)
async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:

View File

@@ -7,7 +7,6 @@ from prisma.enums import ReviewStatus
from backend.data.human_review import (
get_or_create_human_review,
get_pending_review_by_node_exec_id,
get_pending_reviews_for_execution,
get_pending_reviews_for_user,
has_pending_reviews_for_graph_exec,
@@ -24,7 +23,7 @@ def sample_db_review():
"""Create a sample database review object"""
mock_review = Mock()
mock_review.nodeExecId = "test_node_123"
mock_review.userId = "test_user"
mock_review.userId = "test-user-123"
mock_review.graphExecId = "test_graph_exec_456"
mock_review.graphId = "test_graph_789"
mock_review.graphVersion = 1
@@ -41,40 +40,6 @@ def sample_db_review():
return mock_review
@pytest.mark.asyncio
async def test_get_pending_review_by_node_exec_id_found(
    mocker: pytest_mock.MockFixture,
    sample_db_review,
):
    """Test finding an existing pending review"""
    # Patch the Prisma accessor so no real database is touched; the mocked
    # find_first returns the sample row fixture.
    mock_find_first = mocker.patch(
        "backend.data.human_review.PendingHumanReview.prisma"
    )
    mock_find_first.return_value.find_first = AsyncMock(return_value=sample_db_review)

    result = await get_pending_review_by_node_exec_id("test_node_123", "test_user")

    # The DB row should be mapped into the domain model with fields intact.
    assert result is not None
    assert result.node_exec_id == "test_node_123"
    assert result.user_id == "test_user"
    assert result.status == ReviewStatus.WAITING
@pytest.mark.asyncio
async def test_get_pending_review_by_node_exec_id_not_found(
    mocker: pytest_mock.MockFixture,
):
    """Test when review is not found"""
    # Patch the Prisma accessor to simulate an empty result set.
    mock_find_first = mocker.patch(
        "backend.data.human_review.PendingHumanReview.prisma"
    )
    mock_find_first.return_value.find_first = AsyncMock(return_value=None)

    result = await get_pending_review_by_node_exec_id("nonexistent", "test_user")

    # No matching row -> helper returns None rather than raising.
    assert result is None
@pytest.mark.asyncio
async def test_get_or_create_human_review_new(
mocker: pytest_mock.MockFixture,
@@ -89,7 +54,7 @@ async def test_get_or_create_human_review_new(
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
result = await get_or_create_human_review(
user_id="test_user",
user_id="test-user-123",
node_exec_id="test_node_123",
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
@@ -118,7 +83,7 @@ async def test_get_or_create_human_review_approved(
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
result = await get_or_create_human_review(
user_id="test_user",
user_id="test-user-123",
node_exec_id="test_node_123",
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
@@ -190,7 +155,9 @@ async def test_get_pending_reviews_for_execution(
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
result = await get_pending_reviews_for_execution("test_graph_exec_456", "test_user")
result = await get_pending_reviews_for_execution(
"test_graph_exec_456", "test-user-123"
)
assert len(result) == 1
assert result[0].graph_exec_id == "test_graph_exec_456"
@@ -198,7 +165,7 @@ async def test_get_pending_reviews_for_execution(
# Verify it filters by execution and user
call_args = mock_find_many.return_value.find_many.call_args
where_clause = call_args.kwargs["where"]
assert where_clause["userId"] == "test_user"
assert where_clause["userId"] == "test-user-123"
assert where_clause["graphExecId"] == "test_graph_exec_456"
assert where_clause["status"] == ReviewStatus.WAITING
@@ -210,13 +177,13 @@ async def test_process_all_reviews_for_execution_success(
):
"""Test successful processing of reviews for an execution"""
# Mock finding reviews
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_prisma.return_value.find_many = AsyncMock(return_value=[sample_db_review])
# Mock updating reviews
updated_review = Mock()
updated_review.nodeExecId = "test_node_123"
updated_review.userId = "test_user"
updated_review.userId = "test-user-123"
updated_review.graphExecId = "test_graph_exec_456"
updated_review.graphId = "test_graph_789"
updated_review.graphVersion = 1
@@ -230,8 +197,7 @@ async def test_process_all_reviews_for_execution_success(
updated_review.createdAt = datetime.datetime.now(datetime.timezone.utc)
updated_review.updatedAt = datetime.datetime.now(datetime.timezone.utc)
updated_review.reviewedAt = datetime.datetime.now(datetime.timezone.utc)
mock_update = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_update.return_value.update = AsyncMock(return_value=updated_review)
mock_prisma.return_value.update = AsyncMock(return_value=updated_review)
# Mock gather to simulate parallel updates
mocker.patch(
@@ -240,7 +206,7 @@ async def test_process_all_reviews_for_execution_success(
)
result = await process_all_reviews_for_execution(
user_id="test_user",
user_id="test-user-123",
review_decisions={
"test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved")
},
@@ -264,7 +230,7 @@ async def test_process_all_reviews_for_execution_validation_errors(
with pytest.raises(ValueError, match="Reviews not found"):
await process_all_reviews_for_execution(
user_id="test_user",
user_id="test-user-123",
review_decisions={
"nonexistent_node": (ReviewStatus.APPROVED, {"data": "test"}, "message")
},
@@ -286,7 +252,7 @@ async def test_process_all_reviews_edit_permission_error(
with pytest.raises(ValueError, match="not editable"):
await process_all_reviews_for_execution(
user_id="test_user",
user_id="test-user-123",
review_decisions={
"test_node_123": (
ReviewStatus.APPROVED,
@@ -306,7 +272,7 @@ async def test_process_all_reviews_mixed_approval_rejection(
# Create second review for rejection
second_review = Mock()
second_review.nodeExecId = "test_node_456"
second_review.userId = "test_user"
second_review.userId = "test-user-123"
second_review.graphExecId = "test_graph_exec_456"
second_review.graphId = "test_graph_789"
second_review.graphVersion = 1
@@ -330,7 +296,7 @@ async def test_process_all_reviews_mixed_approval_rejection(
# Mock updating reviews
approved_review = Mock()
approved_review.nodeExecId = "test_node_123"
approved_review.userId = "test_user"
approved_review.userId = "test-user-123"
approved_review.graphExecId = "test_graph_exec_456"
approved_review.graphId = "test_graph_789"
approved_review.graphVersion = 1
@@ -347,7 +313,7 @@ async def test_process_all_reviews_mixed_approval_rejection(
rejected_review = Mock()
rejected_review.nodeExecId = "test_node_456"
rejected_review.userId = "test_user"
rejected_review.userId = "test-user-123"
rejected_review.graphExecId = "test_graph_exec_456"
rejected_review.graphId = "test_graph_789"
rejected_review.graphVersion = 1
@@ -368,7 +334,7 @@ async def test_process_all_reviews_mixed_approval_rejection(
)
result = await process_all_reviews_for_execution(
user_id="test_user",
user_id="test-user-123",
review_decisions={
"test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved"),
"test_node_456": (ReviewStatus.REJECTED, None, "Rejected"),

View File

@@ -1,5 +1,5 @@
import logging
from typing import AsyncGenerator, Literal, Optional, overload
from typing import TYPE_CHECKING, AsyncGenerator, Literal, Optional, overload
from prisma.models import AgentNode, AgentPreset, IntegrationWebhook
from prisma.types import (
@@ -19,10 +19,12 @@ from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks import get_webhook_manager
from backend.integrations.webhooks.utils import webhook_ingress_url
from backend.server.v2.library.model import LibraryAgentPreset
from backend.util.exceptions import NotFoundError
from backend.util.json import SafeJson
if TYPE_CHECKING:
from backend.server.v2.library.model import LibraryAgentPreset
from .db import BaseDbModel
from .graph import NodeModel
@@ -64,7 +66,7 @@ class Webhook(BaseDbModel):
class WebhookWithRelations(Webhook):
triggered_nodes: list[NodeModel]
triggered_presets: list[LibraryAgentPreset]
triggered_presets: list["LibraryAgentPreset"]
@staticmethod
def from_db(webhook: IntegrationWebhook):
@@ -73,6 +75,12 @@ class WebhookWithRelations(Webhook):
"AgentNodes and AgentPresets must be included in "
"IntegrationWebhook query with relations"
)
# LibraryAgentPreset import is moved to TYPE_CHECKING to avoid circular import:
# integrations.py → library/model.py → integrations.py (for Webhook)
# Runtime import is used in WebhookWithRelations.from_db() method instead
# Import at runtime to avoid circular dependency
from backend.server.v2.library.model import LibraryAgentPreset
return WebhookWithRelations(
**Webhook.from_db(webhook).model_dump(),
triggered_nodes=[NodeModel.from_db(node) for node in webhook.AgentNodes],

View File

@@ -46,6 +46,7 @@ from backend.util.settings import Secrets
# Type alias for any provider name (including custom ones)
AnyProviderName = str # Will be validated as ProviderName at runtime
USER_TIMEZONE_NOT_SET = "not-set"
class User(BaseModel):
@@ -98,7 +99,7 @@ class User(BaseModel):
# User timezone for scheduling and time display
timezone: str = Field(
default="not-set",
default=USER_TIMEZONE_NOT_SET,
description="User timezone (IANA timezone identifier or 'not-set')",
)
@@ -155,7 +156,7 @@ class User(BaseModel):
notify_on_daily_summary=prisma_user.notifyOnDailySummary or True,
notify_on_weekly_summary=prisma_user.notifyOnWeeklySummary or True,
notify_on_monthly_summary=prisma_user.notifyOnMonthlySummary or True,
timezone=prisma_user.timezone or "not-set",
timezone=prisma_user.timezone or USER_TIMEZONE_NOT_SET,
)

View File

@@ -28,6 +28,7 @@ from backend.data.graph import (
get_connected_output_nodes,
get_graph,
get_graph_metadata,
get_graph_settings,
get_node,
validate_graph_execution_permissions,
)
@@ -150,6 +151,7 @@ class DatabaseManager(AppService):
get_graph = _(get_graph)
get_connected_output_nodes = _(get_connected_output_nodes)
get_graph_metadata = _(get_graph_metadata)
get_graph_settings = _(get_graph_settings)
# Credits
spend_credits = _(_spend_credits, name="spend_credits")
@@ -254,6 +256,7 @@ class DatabaseManagerAsyncClient(AppServiceClient):
get_latest_node_execution = d.get_latest_node_execution
get_graph = d.get_graph
get_graph_metadata = d.get_graph_metadata
get_graph_settings = d.get_graph_settings
get_graph_execution_meta = d.get_graph_execution_meta
get_node = d.get_node
get_node_execution = d.get_node_execution

View File

@@ -29,6 +29,7 @@ from backend.data.block import (
from backend.data.credit import UsageTransactionMetadata
from backend.data.dynamic_fields import parse_execution_output
from backend.data.execution import (
ExecutionContext,
ExecutionQueue,
ExecutionStatus,
GraphExecution,
@@ -36,7 +37,6 @@ from backend.data.execution import (
NodeExecutionEntry,
NodeExecutionResult,
NodesInputMasks,
UserContext,
)
from backend.data.graph import Link, Node
from backend.data.model import GraphExecutionStats, NodeExecutionStats
@@ -168,6 +168,7 @@ async def execute_node(
node_exec_id = data.node_exec_id
node_id = data.node_id
node_block = node.block
execution_context = data.execution_context
log_metadata = LogMetadata(
logger=_logger,
@@ -210,11 +211,9 @@ async def execute_node(
"graph_exec_id": graph_exec_id,
"node_exec_id": node_exec_id,
"user_id": user_id,
"execution_context": execution_context,
}
# Add user context from NodeExecutionEntry
extra_exec_kwargs["user_context"] = data.user_context
# Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
@@ -243,8 +242,8 @@ async def execute_node(
scope.set_tag("node_id", node_id)
scope.set_tag("block_name", node_block.name)
scope.set_tag("block_id", node_block.id)
for k, v in (data.user_context or UserContext(timezone="UTC")).model_dump().items():
scope.set_tag(f"user_context.{k}", v)
for k, v in execution_context.model_dump().items():
scope.set_tag(f"execution_context.{k}", v)
try:
async for output_name, output_data in node_block.execute(
@@ -289,7 +288,7 @@ async def _enqueue_next_nodes(
graph_version: int,
log_metadata: LogMetadata,
nodes_input_masks: Optional[NodesInputMasks],
user_context: UserContext,
execution_context: ExecutionContext,
) -> list[NodeExecutionEntry]:
async def add_enqueued_execution(
node_exec_id: str, node_id: str, block_id: str, data: BlockInput
@@ -309,7 +308,7 @@ async def _enqueue_next_nodes(
node_id=node_id,
block_id=block_id,
inputs=data,
user_context=user_context,
execution_context=execution_context,
)
async def register_next_executions(node_link: Link) -> list[NodeExecutionEntry]:
@@ -861,7 +860,9 @@ class ExecutionProcessor:
ExecutionStatus.REVIEW,
],
):
node_entry = node_exec.to_node_execution_entry(graph_exec.user_context)
node_entry = node_exec.to_node_execution_entry(
graph_exec.execution_context
)
execution_queue.add(node_entry)
# ------------------------------------------------------------
@@ -1165,7 +1166,7 @@ class ExecutionProcessor:
graph_version=graph_exec.graph_version,
log_metadata=log_metadata,
nodes_input_masks=nodes_input_masks,
user_context=graph_exec.user_context,
execution_context=graph_exec.execution_context,
):
execution_queue.add(next_execution)
@@ -1555,36 +1556,32 @@ class ExecutionManager(AppProcess):
graph_exec_id = graph_exec_entry.graph_exec_id
user_id = graph_exec_entry.user_id
graph_id = graph_exec_entry.graph_id
parent_graph_exec_id = graph_exec_entry.parent_graph_exec_id
root_exec_id = graph_exec_entry.execution_context.root_execution_id
parent_exec_id = graph_exec_entry.execution_context.parent_execution_id
logger.info(
f"[{self.service_name}] Received RUN for graph_exec_id={graph_exec_id}, user_id={user_id}, executor_id={self.executor_id}"
+ (f", parent={parent_graph_exec_id}" if parent_graph_exec_id else "")
+ (f", root={root_exec_id}" if root_exec_id else "")
+ (f", parent={parent_exec_id}" if parent_exec_id else "")
)
# Check if parent execution is already terminated (prevents orphaned child executions)
if parent_graph_exec_id:
try:
parent_exec = get_db_client().get_graph_execution_meta(
execution_id=parent_graph_exec_id,
user_id=user_id,
# Check if root execution is already terminated (prevents orphaned child executions)
if root_exec_id and root_exec_id != graph_exec_id:
parent_exec = get_db_client().get_graph_execution_meta(
execution_id=root_exec_id,
user_id=user_id,
)
if parent_exec and parent_exec.status == ExecutionStatus.TERMINATED:
logger.info(
f"[{self.service_name}] Skipping execution {graph_exec_id} - parent {root_exec_id} is TERMINATED"
)
if parent_exec and parent_exec.status == ExecutionStatus.TERMINATED:
logger.info(
f"[{self.service_name}] Skipping execution {graph_exec_id} - parent {parent_graph_exec_id} is TERMINATED"
)
# Mark this child as terminated since parent was stopped
get_db_client().update_graph_execution_stats(
graph_exec_id=graph_exec_id,
status=ExecutionStatus.TERMINATED,
)
_ack_message(reject=False, requeue=False)
return
except Exception as e:
logger.warning(
f"[{self.service_name}] Could not check parent status for {graph_exec_id}: {e}"
# Mark this child as terminated since parent was stopped
get_db_client().update_graph_execution_stats(
graph_exec_id=graph_exec_id,
status=ExecutionStatus.TERMINATED,
)
# Continue execution if parent check fails (don't block on errors)
_ack_message(reject=False, requeue=False)
return
# Check user rate limit before processing
try:

View File

@@ -10,6 +10,7 @@ from pydantic import BaseModel, JsonValue, ValidationError
from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data import user as user_db
from backend.data.block import (
Block,
BlockCostType,
@@ -24,19 +25,17 @@ from backend.data.db import prisma
# Import dynamic field utilities from centralized location
from backend.data.dynamic_fields import merge_execution_input
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
GraphExecutionMeta,
GraphExecutionStats,
GraphExecutionWithNodes,
NodesInputMasks,
UserContext,
get_graph_execution,
)
from backend.data.graph import GraphModel, Node
from backend.data.model import CredentialsMetaInput
from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.user import get_user_by_id
from backend.util.cache import cached
from backend.util.clients import (
get_async_execution_event_bus,
get_async_execution_queue,
@@ -52,32 +51,6 @@ from backend.util.logging import TruncatedLogger, is_structured_logging_enabled
from backend.util.settings import Config
from backend.util.type import convert
@cached(maxsize=1000, ttl_seconds=3600)
async def get_user_context(user_id: str) -> UserContext:
    """
    Build the UserContext for a user, always yielding a usable timezone.

    Falls back to UTC when the user record is missing, has no timezone set,
    or the lookup fails for any reason (best-effort, never raises).
    """
    context = UserContext(timezone="UTC")  # UTC unless proven otherwise
    try:
        # Prefer a direct DB read when connected; otherwise go through the
        # database manager service client.
        if prisma.is_connected():
            user = await get_user_by_id(user_id)
        else:
            user = await get_database_manager_async_client().get_user_by_id(user_id)

        # NOTE(review): "not-set" looks like the sentinel also imported as
        # USER_TIMEZONE_NOT_SET elsewhere — confirm they match.
        has_real_tz = bool(user and user.timezone and user.timezone != "not-set")
        if has_real_tz:
            context.timezone = user.timezone
            logger.debug(f"Retrieved user context: timezone={user.timezone}")
        else:
            logger.debug("User has no timezone set, using UTC")
    except Exception as e:
        # Keep the UTC default rather than failing the caller.
        logger.warning(f"Could not fetch user timezone: {e}")
    return context
config = Config()
logger = TruncatedLogger(logging.getLogger(__name__), prefix="[GraphExecutorUtil]")
@@ -495,7 +468,6 @@ async def validate_and_construct_node_execution_input(
graph_version: The version of the graph to use.
graph_credentials_inputs: Credentials inputs to use.
nodes_input_masks: Node inputs to use.
is_sub_graph: Whether this is a sub-graph execution.
Returns:
GraphModel: Full graph object for the given `graph_id`.
@@ -763,8 +735,7 @@ async def add_graph_execution(
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
parent_graph_exec_id: Optional[str] = None,
is_sub_graph: bool = False,
execution_context: Optional[ExecutionContext] = None,
graph_exec_id: Optional[str] = None,
) -> GraphExecutionWithNodes:
"""
@@ -780,7 +751,6 @@ async def add_graph_execution(
Keys should map to the keys generated by `GraphModel.aggregate_credentials_inputs`.
nodes_input_masks: Node inputs to use in the execution.
parent_graph_exec_id: The ID of the parent graph execution (for nested executions).
is_sub_graph: Whether this is a sub-graph execution.
graph_exec_id: If provided, resume this existing execution instead of creating a new one.
Returns:
GraphExecutionEntry: The entry for the graph execution.
@@ -790,8 +760,10 @@ async def add_graph_execution(
"""
if prisma.is_connected():
edb = execution_db
udb = user_db
gdb = graph_db
else:
edb = get_database_manager_async_client()
edb = udb = gdb = get_database_manager_async_client()
# Get or create the graph execution
if graph_exec_id:
@@ -810,6 +782,10 @@ async def add_graph_execution(
logger.info(f"Resuming graph execution #{graph_exec.id} for graph #{graph_id}")
else:
parent_exec_id = (
execution_context.parent_execution_id if execution_context else None
)
# Create new execution
graph, starting_nodes_input, compiled_nodes_input_masks = (
await validate_and_construct_node_execution_input(
@@ -819,7 +795,7 @@ async def add_graph_execution(
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
is_sub_graph=is_sub_graph,
is_sub_graph=parent_exec_id is not None,
)
)
@@ -832,7 +808,7 @@ async def add_graph_execution(
nodes_input_masks=nodes_input_masks,
starting_nodes_input=starting_nodes_input,
preset_id=preset_id,
parent_graph_exec_id=parent_graph_exec_id,
parent_graph_exec_id=parent_exec_id,
)
logger.info(
@@ -840,14 +816,28 @@ async def add_graph_execution(
f"#{graph_id} with {len(starting_nodes_input)} starting nodes"
)
# Common path: publish to queue and update status
try:
graph_exec_entry = graph_exec.to_graph_execution_entry(
user_context=await get_user_context(user_id),
compiled_nodes_input_masks=compiled_nodes_input_masks,
parent_graph_exec_id=parent_graph_exec_id,
# Generate execution context if it's not provided
if execution_context is None:
user = await udb.get_user_by_id(user_id)
settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id)
execution_context = ExecutionContext(
safe_mode=(
settings.human_in_the_loop_safe_mode
if settings.human_in_the_loop_safe_mode is not None
else True
),
user_timezone=(
user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
),
root_execution_id=graph_exec.id,
)
try:
graph_exec_entry = graph_exec.to_graph_execution_entry(
compiled_nodes_input_masks=compiled_nodes_input_masks,
execution_context=execution_context,
)
logger.info(f"Publishing execution {graph_exec.id} to execution queue")
exec_queue = await get_async_execution_queue()

View File

@@ -348,9 +348,6 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
mock_graph_exec.node_executions = [] # Add this to avoid AttributeError
mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock()
# Mock user context
mock_user_context = {"user_id": user_id, "context": "test_context"}
# Mock the queue and event bus
mock_queue = mocker.AsyncMock()
mock_event_bus = mocker.MagicMock()
@@ -362,7 +359,8 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
)
mock_edb = mocker.patch("backend.executor.utils.execution_db")
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_get_user_context = mocker.patch("backend.executor.utils.get_user_context")
mock_udb = mocker.patch("backend.executor.utils.user_db")
mock_gdb = mocker.patch("backend.executor.utils.graph_db")
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
@@ -380,7 +378,14 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
return_value=mock_graph_exec
)
mock_edb.update_node_execution_status_batch = mocker.AsyncMock()
mock_get_user_context.return_value = mock_user_context
# Mock user and settings data
mock_user = mocker.MagicMock()
mock_user.timezone = "UTC"
mock_settings = mocker.MagicMock()
mock_settings.human_in_the_loop_safe_mode = True
mock_udb.get_user_by_id = mocker.AsyncMock(return_value=mock_user)
mock_gdb.get_graph_settings = mocker.AsyncMock(return_value=mock_settings)
mock_get_queue.return_value = mock_queue
mock_get_event_bus.return_value = mock_event_bus

View File

@@ -44,7 +44,7 @@ from backend.data.credit import (
get_user_credit_model,
set_auto_top_up,
)
from backend.data.execution import UserContext
from backend.data.graph import GraphSettings
from backend.data.model import CredentialsMetaInput
from backend.data.notifications import NotificationPreference, NotificationPreferenceDTO
from backend.data.onboarding import (
@@ -387,19 +387,15 @@ async def execute_graph_block(
if not obj:
raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.")
# Get user context for block execution
user = await get_user_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found.")
user_context = UserContext(timezone=user.timezone)
start_time = time.time()
try:
output = defaultdict(list)
async for name, data in obj.execute(
data,
user_context=user_context,
user_id=user_id,
# Note: graph_exec_id and graph_id are not available for direct block execution
):
@@ -842,9 +838,18 @@ async def update_graph(
if new_graph_version.is_active:
# Keep the library agent up to date with the new active version
await library_db.update_agent_version_in_library(
library = await library_db.update_agent_version_in_library(
user_id, graph.id, graph.version
)
if (
new_graph_version.has_human_in_the_loop
and library.settings.human_in_the_loop_safe_mode is None
):
await library_db.update_library_agent_settings(
user_id=user_id,
agent_id=library.id,
settings=GraphSettings(human_in_the_loop_safe_mode=True),
)
# Handle activation of the new graph first to ensure continuity
new_graph_version = await on_graph_activate(new_graph_version, user_id=user_id)
@@ -901,15 +906,54 @@ async def set_graph_active_version(
)
# Keep the library agent up to date with the new active version
await library_db.update_agent_version_in_library(
library = await library_db.update_agent_version_in_library(
user_id, new_active_graph.id, new_active_graph.version
)
if (
new_active_graph.has_human_in_the_loop
and library.settings.human_in_the_loop_safe_mode is None
):
await library_db.update_library_agent_settings(
user_id=user_id,
agent_id=library.id,
settings=GraphSettings(human_in_the_loop_safe_mode=True),
)
if current_active_graph and current_active_graph.version != new_active_version:
# Handle deactivation of the previously active version
await on_graph_deactivate(current_active_graph, user_id=user_id)
@v1_router.patch(
    path="/graphs/{graph_id}/settings",
    summary="Update graph settings",
    tags=["graphs"],
    dependencies=[Security(requires_user)],
)
async def update_graph_settings(
    graph_id: str,
    settings: GraphSettings,
    user_id: Annotated[str, Security(get_user_id)],
) -> GraphSettings:
    """Update graph settings for the user's library agent."""
    # Settings are stored on the LibraryAgent record, so resolve it first.
    library_agent = await library_db.get_library_agent_by_graph_id(
        graph_id=graph_id, user_id=user_id
    )
    if library_agent is None:
        raise HTTPException(404, f"Graph #{graph_id} not found in user's library")

    # Persist the new settings and echo back what was actually stored.
    updated_agent = await library_db.update_library_agent_settings(
        user_id=user_id,
        agent_id=library_agent.id,
        settings=settings,
    )
    return GraphSettings.model_validate(updated_agent.settings)
@v1_router.post(
path="/graphs/{graph_id}/execute/{graph_version}",
summary="Execute graph agent",

View File

@@ -7,6 +7,7 @@ import pytest_mock
from prisma.enums import ReviewStatus
from pytest_snapshot.plugin import Snapshot
from backend.server.rest_api import handle_internal_http_error
from backend.server.v2.executions.review.model import PendingHumanReviewModel
from backend.server.v2.executions.review.routes import router
@@ -15,6 +16,7 @@ FIXED_NOW = datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
app = fastapi.FastAPI()
app.include_router(router, prefix="/api/review")
app.add_exception_handler(ValueError, handle_internal_http_error(400))
client = fastapi.testclient.TestClient(app)
@@ -34,11 +36,11 @@ def setup_app_auth(mock_jwt_user):
@pytest.fixture
def sample_pending_review() -> PendingHumanReviewModel:
def sample_pending_review(test_user_id: str) -> PendingHumanReviewModel:
"""Create a sample pending review for testing"""
return PendingHumanReviewModel(
node_exec_id="test_node_123",
user_id="test_user",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
@@ -58,6 +60,7 @@ def sample_pending_review() -> PendingHumanReviewModel:
def test_get_pending_reviews_empty(
mocker: pytest_mock.MockFixture,
snapshot: Snapshot,
test_user_id: str,
) -> None:
"""Test getting pending reviews when none exist"""
mock_get_reviews = mocker.patch(
@@ -69,13 +72,14 @@ def test_get_pending_reviews_empty(
assert response.status_code == 200
assert response.json() == []
mock_get_reviews.assert_called_once_with("test_user", 1, 25)
mock_get_reviews.assert_called_once_with(test_user_id, 1, 25)
def test_get_pending_reviews_with_data(
mocker: pytest_mock.MockFixture,
sample_pending_review: PendingHumanReviewModel,
snapshot: Snapshot,
test_user_id: str,
) -> None:
"""Test getting pending reviews with data"""
mock_get_reviews = mocker.patch(
@@ -90,13 +94,14 @@ def test_get_pending_reviews_with_data(
assert len(data) == 1
assert data[0]["node_exec_id"] == "test_node_123"
assert data[0]["status"] == "WAITING"
mock_get_reviews.assert_called_once_with("test_user", 2, 10)
mock_get_reviews.assert_called_once_with(test_user_id, 2, 10)
def test_get_pending_reviews_for_execution_success(
mocker: pytest_mock.MockFixture,
sample_pending_review: PendingHumanReviewModel,
snapshot: Snapshot,
test_user_id: str,
) -> None:
"""Test getting pending reviews for specific execution"""
mock_get_graph_execution = mocker.patch(
@@ -104,7 +109,7 @@ def test_get_pending_reviews_for_execution_success(
)
mock_get_graph_execution.return_value = {
"id": "test_graph_exec_456",
"user_id": "test_user",
"user_id": test_user_id,
}
mock_get_reviews = mocker.patch(
@@ -122,6 +127,7 @@ def test_get_pending_reviews_for_execution_success(
def test_get_pending_reviews_for_execution_access_denied(
mocker: pytest_mock.MockFixture,
test_user_id: str,
) -> None:
"""Test access denied when user doesn't own the execution"""
mock_get_graph_execution = mocker.patch(
@@ -138,13 +144,10 @@ def test_get_pending_reviews_for_execution_access_denied(
def test_process_review_action_approve_success(
mocker: pytest_mock.MockFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test successful review approval"""
# Mock the validation functions
mock_get_pending_review = mocker.patch(
"backend.data.human_review.get_pending_review_by_node_exec_id"
)
mock_get_pending_review.return_value = sample_pending_review
# Mock the route functions
mock_get_reviews_for_execution = mocker.patch(
"backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
@@ -154,24 +157,42 @@ def test_process_review_action_approve_success(
mock_process_all_reviews = mocker.patch(
"backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
)
mock_process_all_reviews.return_value = {"test_node_123": sample_pending_review}
# Create approved review for return
approved_review = PendingHumanReviewModel(
node_exec_id="test_node_123",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
payload={"data": "modified payload", "value": 50},
instructions="Please review this data",
editable=True,
status=ReviewStatus.APPROVED,
review_message="Looks good",
was_edited=True,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
)
mock_process_all_reviews.return_value = {"test_node_123": approved_review}
mock_has_pending = mocker.patch(
"backend.data.human_review.has_pending_reviews_for_graph_exec"
"backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec"
)
mock_has_pending.return_value = False
mocker.patch("backend.executor.utils.add_graph_execution")
mocker.patch("backend.server.v2.executions.review.routes.add_graph_execution")
request_data = {
"approved_reviews": [
"reviews": [
{
"node_exec_id": "test_node_123",
"approved": True,
"message": "Looks good",
"reviewed_data": {"data": "modified payload", "value": 50},
}
],
"rejected_review_ids": [],
]
}
response = client.post("/api/review/action", json=request_data)
@@ -187,13 +208,10 @@ def test_process_review_action_approve_success(
def test_process_review_action_reject_success(
mocker: pytest_mock.MockFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test successful review rejection"""
# Mock the validation functions
mock_get_pending_review = mocker.patch(
"backend.data.human_review.get_pending_review_by_node_exec_id"
)
mock_get_pending_review.return_value = sample_pending_review
# Mock the route functions
mock_get_reviews_for_execution = mocker.patch(
"backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
@@ -205,7 +223,7 @@ def test_process_review_action_reject_success(
)
rejected_review = PendingHumanReviewModel(
node_exec_id="test_node_123",
user_id="test_user",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
@@ -223,11 +241,19 @@ def test_process_review_action_reject_success(
mock_process_all_reviews.return_value = {"test_node_123": rejected_review}
mock_has_pending = mocker.patch(
"backend.data.human_review.has_pending_reviews_for_graph_exec"
"backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec"
)
mock_has_pending.return_value = False
request_data = {"approved_reviews": [], "rejected_review_ids": ["test_node_123"]}
request_data = {
"reviews": [
{
"node_exec_id": "test_node_123",
"approved": False,
"message": None,
}
]
}
response = client.post("/api/review/action", json=request_data)
@@ -242,12 +268,13 @@ def test_process_review_action_reject_success(
def test_process_review_action_mixed_success(
mocker: pytest_mock.MockFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test mixed approve/reject operations"""
# Create a second review
second_review = PendingHumanReviewModel(
node_exec_id="test_node_456",
user_id="test_user",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
@@ -263,13 +290,7 @@ def test_process_review_action_mixed_success(
reviewed_at=None,
)
# Mock the validation functions
mock_get_pending_review = mocker.patch(
"backend.data.human_review.get_pending_review_by_node_exec_id"
)
mock_get_pending_review.side_effect = lambda node_id, user_id: (
sample_pending_review if node_id == "test_node_123" else second_review
)
# Mock the route functions
mock_get_reviews_for_execution = mocker.patch(
"backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
@@ -282,7 +303,7 @@ def test_process_review_action_mixed_success(
# Create approved version of first review
approved_review = PendingHumanReviewModel(
node_exec_id="test_node_123",
user_id="test_user",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
@@ -300,7 +321,7 @@ def test_process_review_action_mixed_success(
# Create rejected version of second review
rejected_review = PendingHumanReviewModel(
node_exec_id="test_node_456",
user_id="test_user",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
@@ -321,19 +342,24 @@ def test_process_review_action_mixed_success(
}
mock_has_pending = mocker.patch(
"backend.data.human_review.has_pending_reviews_for_graph_exec"
"backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec"
)
mock_has_pending.return_value = False
request_data = {
"approved_reviews": [
"reviews": [
{
"node_exec_id": "test_node_123",
"approved": True,
"message": "Approved",
"reviewed_data": {"data": "modified"},
}
],
"rejected_review_ids": ["test_node_456"],
},
{
"node_exec_id": "test_node_456",
"approved": False,
"message": None,
},
]
}
response = client.post("/api/review/action", json=request_data)
@@ -348,52 +374,64 @@ def test_process_review_action_mixed_success(
def test_process_review_action_empty_request(
mocker: pytest_mock.MockFixture,
test_user_id: str,
) -> None:
"""Test error when no reviews provided"""
request_data = {"approved_reviews": [], "rejected_review_ids": []}
request_data = {"reviews": []}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 400
assert "At least one review must be provided" in response.json()["detail"]
assert response.status_code == 422
response_data = response.json()
# Pydantic validation error format
assert isinstance(response_data["detail"], list)
assert len(response_data["detail"]) > 0
assert "At least one review must be provided" in response_data["detail"][0]["msg"]
def test_process_review_action_review_not_found(
mocker: pytest_mock.MockFixture,
test_user_id: str,
) -> None:
"""Test error when review is not found"""
mock_get_pending_review = mocker.patch(
"backend.data.human_review.get_pending_review_by_node_exec_id"
# Mock the functions that extract graph execution ID from the request
mock_get_reviews_for_execution = mocker.patch(
"backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
)
mock_get_reviews_for_execution.return_value = [] # No reviews found
# Mock process_all_reviews to simulate not finding reviews
mock_process_all_reviews = mocker.patch(
"backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
)
# This should raise a ValueError with "Reviews not found" message based on the data/human_review.py logic
mock_process_all_reviews.side_effect = ValueError(
"Reviews not found or access denied for IDs: nonexistent_node"
)
mock_get_pending_review.return_value = None
request_data = {
"approved_reviews": [
"reviews": [
{
"node_exec_id": "nonexistent_node",
"approved": True,
"message": "Test",
}
],
"rejected_review_ids": [],
]
}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 403
assert "not found or access denied" in response.json()["detail"]
assert response.status_code == 400
assert "Reviews not found" in response.json()["detail"]
def test_process_review_action_partial_failure(
mocker: pytest_mock.MockFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test handling of partial failures in review processing"""
# Mock successful validation
mock_get_pending_review = mocker.patch(
"backend.data.human_review.get_pending_review_by_node_exec_id"
)
mock_get_pending_review.return_value = sample_pending_review
# Mock the route functions
mock_get_reviews_for_execution = mocker.patch(
"backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
)
@@ -406,58 +444,53 @@ def test_process_review_action_partial_failure(
mock_process_all_reviews.side_effect = ValueError("Some reviews failed validation")
request_data = {
"approved_reviews": [
"reviews": [
{
"node_exec_id": "test_node_123",
"approved": True,
"message": "Test",
}
],
"rejected_review_ids": [],
]
}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["approved_count"] == 0
assert data["rejected_count"] == 0
assert data["failed_count"] == 1
assert "Failed to process reviews" in data["error"]
assert response.status_code == 400
assert "Some reviews failed validation" in response.json()["detail"]
def test_process_review_action_complete_failure(
def test_process_review_action_invalid_node_exec_id(
mocker: pytest_mock.MockFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test complete failure scenario"""
# Mock successful validation
mock_get_pending_review = mocker.patch(
"backend.data.human_review.get_pending_review_by_node_exec_id"
)
mock_get_pending_review.return_value = sample_pending_review
"""Test failure when trying to process review with invalid node execution ID"""
# Mock the route functions
mock_get_reviews_for_execution = mocker.patch(
"backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
)
mock_get_reviews_for_execution.return_value = [sample_pending_review]
# Mock complete failure in processing
# Mock validation failure - this should return 400, not 500
mock_process_all_reviews = mocker.patch(
"backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
)
mock_process_all_reviews.side_effect = Exception("Database error")
mock_process_all_reviews.side_effect = ValueError(
"Invalid node execution ID format"
)
request_data = {
"approved_reviews": [
"reviews": [
{
"node_exec_id": "test_node_123",
"node_exec_id": "invalid-node-format",
"approved": True,
"message": "Test",
}
],
"rejected_review_ids": [],
]
}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 500
assert "error" in response.json()["detail"].lower()
# Should be a 400 Bad Request, not 500 Internal Server Error
assert response.status_code == 400
assert "Invalid node execution ID format" in response.json()["detail"]

View File

@@ -17,6 +17,7 @@ import backend.server.v2.store.media as store_media
from backend.data.block import BlockInput
from backend.data.db import transaction
from backend.data.execution import get_graph_execution
from backend.data.graph import GraphSettings
from backend.data.includes import AGENT_PRESET_INCLUDE, library_agent_include
from backend.data.model import CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
@@ -400,6 +401,24 @@ async def add_generated_agent_image(
)
def _initialize_graph_settings(graph: graph_db.GraphModel) -> GraphSettings:
    """
    Derive the default GraphSettings for a graph.

    Args:
        graph: The graph to analyze.

    Returns:
        GraphSettings with human_in_the_loop_safe_mode=True when the graph
        contains HITL blocks, and None (unset) otherwise.
    """
    # Safe mode defaults to ON only for graphs that actually contain
    # human-in-the-loop blocks; all other graphs leave the setting unset.
    safe_mode = True if graph.has_human_in_the_loop else None
    return GraphSettings(human_in_the_loop_safe_mode=safe_mode)
async def create_library_agent(
graph: graph_db.GraphModel,
user_id: str,
@@ -445,6 +464,9 @@ async def create_library_agent(
}
}
},
settings=SafeJson(
_initialize_graph_settings(graph_entry).model_dump()
),
),
include=library_agent_include(
user_id, include_nodes=False, include_executions=False
@@ -465,7 +487,7 @@ async def update_agent_version_in_library(
user_id: str,
agent_graph_id: str,
agent_graph_version: int,
) -> None:
) -> library_model.LibraryAgent:
"""
Updates the agent version in the library if useGraphIsActiveVersion is True.
@@ -489,7 +511,7 @@ async def update_agent_version_in_library(
"useGraphIsActiveVersion": True,
},
)
await prisma.models.LibraryAgent.prisma().update(
lib = await prisma.models.LibraryAgent.prisma().update(
where={"id": library_agent.id},
data={
"AgentGraph": {
@@ -502,6 +524,10 @@ async def update_agent_version_in_library(
},
},
)
if lib is None:
raise NotFoundError(f"Library agent {library_agent.id} not found")
return library_model.LibraryAgent.from_db(lib)
except prisma.errors.PrismaError as e:
logger.error(f"Database error updating agent version in library: {e}")
raise DatabaseError("Failed to update agent version in library") from e
@@ -514,6 +540,7 @@ async def update_library_agent(
is_favorite: Optional[bool] = None,
is_archived: Optional[bool] = None,
is_deleted: Optional[Literal[False]] = None,
settings: Optional[GraphSettings] = None,
) -> library_model.LibraryAgent:
"""
Updates the specified LibraryAgent record.
@@ -524,6 +551,7 @@ async def update_library_agent(
auto_update_version: Whether the agent should auto-update to active version.
is_favorite: Whether this agent is marked as a favorite.
is_archived: Whether this agent is archived.
settings: User-specific settings for this library agent.
Returns:
The updated LibraryAgent.
@@ -535,7 +563,7 @@ async def update_library_agent(
logger.debug(
f"Updating library agent {library_agent_id} for user {user_id} with "
f"auto_update_version={auto_update_version}, is_favorite={is_favorite}, "
f"is_archived={is_archived}"
f"is_archived={is_archived}, settings={settings}"
)
update_fields: prisma.types.LibraryAgentUpdateManyMutationInput = {}
if auto_update_version is not None:
@@ -550,6 +578,8 @@ async def update_library_agent(
"Use delete_library_agent() to (soft-)delete library agents"
)
update_fields["isDeleted"] = is_deleted
if settings is not None:
update_fields["settings"] = SafeJson(settings.model_dump())
if not update_fields:
raise ValueError("No values were passed to update")
@@ -570,6 +600,33 @@ async def update_library_agent(
raise DatabaseError("Failed to update library agent") from e
async def update_library_agent_settings(
    user_id: str,
    agent_id: str,
    settings: GraphSettings,
) -> library_model.LibraryAgent:
    """
    Apply new GraphSettings to a user's LibraryAgent.

    Thin convenience wrapper around `update_library_agent` that only touches
    the `settings` field.

    Args:
        user_id: The owner of the LibraryAgent.
        agent_id: The ID of the LibraryAgent to update.
        settings: New GraphSettings to apply.

    Returns:
        The updated LibraryAgent.

    Raises:
        NotFoundError: If the specified LibraryAgent does not exist.
        DatabaseError: If there's an error in the update operation.
    """
    return await update_library_agent(
        user_id=user_id,
        library_agent_id=agent_id,
        settings=settings,
    )
async def delete_library_agent(
library_agent_id: str, user_id: str, soft_delete: bool = True
) -> None:
@@ -706,6 +763,18 @@ async def add_store_agent_to_library(
graph = store_listing_version.AgentGraph
# Convert to GraphModel to check for HITL blocks
graph_model = await graph_db.get_graph(
graph_id=graph.id,
version=graph.version,
user_id=user_id,
include_subgraphs=False,
)
if not graph_model:
raise store_exceptions.AgentNotFoundError(
f"Graph #{graph.id} v{graph.version} not found or accessible"
)
# Check if user already has this agent
existing_library_agent = await prisma.models.LibraryAgent.prisma().find_unique(
where={
@@ -740,6 +809,9 @@ async def add_store_agent_to_library(
}
},
"isCreatedByUser": False,
"settings": SafeJson(
_initialize_graph_settings(graph_model).model_dump()
),
},
include=library_agent_include(
user_id, include_nodes=False, include_executions=False

View File

@@ -32,6 +32,7 @@ async def test_get_library_agents(mocker):
id="ua1",
userId="test-user",
agentGraphId="agent2",
settings="{}", # type: ignore
agentGraphVersion=1,
isCreatedByUser=False,
isDeleted=False,
@@ -123,6 +124,7 @@ async def test_add_agent_to_library(mocker):
id="ua1",
userId="test-user",
agentGraphId=mock_store_listing_data.agentGraphId,
settings="{}", # type: ignore
agentGraphVersion=1,
isCreatedByUser=False,
isDeleted=False,
@@ -148,6 +150,14 @@ async def test_add_agent_to_library(mocker):
return_value=mock_library_agent_data
)
# Mock graph_db.get_graph function that's called to check for HITL blocks
mock_graph_db = mocker.patch("backend.server.v2.library.db.graph_db")
mock_graph_model = mocker.Mock()
mock_graph_model.nodes = (
[]
) # Empty list so _has_human_in_the_loop_blocks returns False
mock_graph_db.get_graph = mocker.AsyncMock(return_value=mock_graph_model)
# Mock the model conversion
mock_from_db = mocker.patch("backend.server.v2.library.model.LibraryAgent.from_db")
mock_from_db.return_value = mocker.Mock()
@@ -169,17 +179,29 @@ async def test_add_agent_to_library(mocker):
},
include={"AgentGraph": True},
)
mock_library_agent.return_value.create.assert_called_once_with(
data={
"User": {"connect": {"id": "test-user"}},
"AgentGraph": {
"connect": {"graphVersionId": {"id": "agent1", "version": 1}}
},
"isCreatedByUser": False,
},
include=library_agent_include(
"test-user", include_nodes=False, include_executions=False
),
# Check that create was called with the expected data including settings
create_call_args = mock_library_agent.return_value.create.call_args
assert create_call_args is not None
# Verify the main structure
expected_data = {
"User": {"connect": {"id": "test-user"}},
"AgentGraph": {"connect": {"graphVersionId": {"id": "agent1", "version": 1}}},
"isCreatedByUser": False,
}
actual_data = create_call_args[1]["data"]
# Check that all expected fields are present
for key, value in expected_data.items():
assert actual_data[key] == value
# Check that settings field is present and is a SafeJson object
assert "settings" in actual_data
assert hasattr(actual_data["settings"], "__class__") # Should be a SafeJson object
# Check include parameter
assert create_call_args[1]["include"] == library_agent_include(
"test-user", include_nodes=False, include_executions=False
)

View File

@@ -6,8 +6,8 @@ import prisma.enums
import prisma.models
import pydantic
import backend.data.block as block_model
import backend.data.graph as graph_model
from backend.data.block import BlockInput
from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo
from backend.data.model import CredentialsMetaInput, is_credentials_field_name
from backend.util.models import Pagination
@@ -72,7 +72,7 @@ class LibraryAgent(pydantic.BaseModel):
has_external_trigger: bool = pydantic.Field(
description="Whether the agent has an external trigger (e.g. webhook) node"
)
trigger_setup_info: Optional[graph_model.GraphTriggerInfo] = None
trigger_setup_info: Optional[GraphTriggerInfo] = None
# Indicates whether there's a new output (based on recent runs)
new_output: bool
@@ -89,6 +89,9 @@ class LibraryAgent(pydantic.BaseModel):
# Recommended schedule cron (from marketplace agents)
recommended_schedule_cron: str | None = None
# User-specific settings for this library agent
settings: GraphSettings = pydantic.Field(default_factory=GraphSettings)
# Marketplace listing information if the agent has been published
marketplace_listing: Optional["MarketplaceListing"] = None
@@ -106,7 +109,7 @@ class LibraryAgent(pydantic.BaseModel):
if not agent.AgentGraph:
raise ValueError("Associated Agent record is required.")
graph = graph_model.GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs)
graph = GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs)
created_at = agent.createdAt
@@ -181,6 +184,7 @@ class LibraryAgent(pydantic.BaseModel):
is_latest_version=is_latest_version,
is_favorite=agent.isFavorite,
recommended_schedule_cron=agent.AgentGraph.recommendedScheduleCron,
settings=GraphSettings.model_validate(agent.settings),
marketplace_listing=marketplace_listing_data,
)
@@ -249,7 +253,7 @@ class LibraryAgentPresetCreatable(pydantic.BaseModel):
graph_id: str
graph_version: int
inputs: block_model.BlockInput
inputs: BlockInput
credentials: dict[str, CredentialsMetaInput]
name: str
@@ -278,7 +282,7 @@ class LibraryAgentPresetUpdatable(pydantic.BaseModel):
Request model used when updating a preset for a library agent.
"""
inputs: Optional[block_model.BlockInput] = None
inputs: Optional[BlockInput] = None
credentials: Optional[dict[str, CredentialsMetaInput]] = None
name: Optional[str] = None
@@ -321,7 +325,7 @@ class LibraryAgentPreset(LibraryAgentPresetCreatable):
"Webhook must be included in AgentPreset query when webhookId is set"
)
input_data: block_model.BlockInput = {}
input_data: BlockInput = {}
input_credentials: dict[str, CredentialsMetaInput] = {}
for preset_input in preset.InputPresets:
@@ -387,3 +391,6 @@ class LibraryAgentUpdateRequest(pydantic.BaseModel):
is_archived: Optional[bool] = pydantic.Field(
default=None, description="Archive the agent"
)
settings: Optional[GraphSettings] = pydantic.Field(
default=None, description="User-specific settings for this library agent"
)

View File

@@ -276,6 +276,7 @@ async def update_library_agent(
auto_update_version=payload.auto_update_version,
is_favorite=payload.is_favorite,
is_archived=payload.is_archived,
settings=payload.settings,
)
except NotFoundError as e:
raise HTTPException(

View File

@@ -27,6 +27,7 @@ from backend.util.settings import Settings
P = ParamSpec("P")
R = TypeVar("R")
R_co = TypeVar("R_co", covariant=True)
T = TypeVar("T")
logger = logging.getLogger(__name__)
settings = Settings()
@@ -143,7 +144,7 @@ def cached(
ttl_seconds: int,
shared_cache: bool = False,
refresh_ttl_on_get: bool = False,
) -> Callable[[Callable], CachedFunction]:
) -> Callable[[Callable[P, R]], CachedFunction[P, R]]:
"""
Thundering herd safe cache decorator for both sync and async functions.
@@ -169,7 +170,7 @@ def cached(
return {"result": param}
"""
def decorator(target_func):
def decorator(target_func: Callable[P, R]) -> CachedFunction[P, R]:
cache_storage: dict[tuple, CachedValue] = {}
_event_loop_locks: dict[Any, asyncio.Lock] = {}
@@ -386,7 +387,7 @@ def cached(
setattr(wrapper, "cache_info", cache_info)
setattr(wrapper, "cache_delete", cache_delete)
return cast(CachedFunction, wrapper)
return cast(CachedFunction[P, R], wrapper)
return decorator

View File

@@ -9,9 +9,9 @@ from autogpt_libs.auth import get_user_id
from backend.data import db
from backend.data.block import Block, BlockSchema, initialize_blocks
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
NodeExecutionResult,
UserContext,
get_graph_execution,
)
from backend.data.model import _BaseCredentials
@@ -141,7 +141,7 @@ async def execute_block_test(block: Block):
"node_exec_id": str(uuid.uuid4()),
"user_id": str(uuid.uuid4()),
"graph_version": 1, # Default version for tests
"user_context": UserContext(timezone="UTC"), # Default for tests
"execution_context": ExecutionContext(),
}
input_model = cast(type[BlockSchema], block.input_schema)
credentials_input_fields = input_model.get_credentials_fields()

View File

@@ -10,6 +10,8 @@ from zoneinfo import ZoneInfo
from croniter import croniter
from backend.data.model import USER_TIMEZONE_NOT_SET
logger = logging.getLogger(__name__)
@@ -138,7 +140,7 @@ def get_user_timezone_or_utc(user_timezone: Optional[str]) -> str:
Returns:
Valid timezone string (user's preference or UTC fallback)
"""
if not user_timezone or user_timezone == "not-set":
if not user_timezone or user_timezone == USER_TIMEZONE_NOT_SET:
return "UTC"
if validate_timezone(user_timezone):

View File

@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "LibraryAgent" ADD COLUMN "settings" JSONB NOT NULL DEFAULT '{}';

View File

@@ -266,6 +266,8 @@ model LibraryAgent {
isArchived Boolean @default(false)
isDeleted Boolean @default(false)
settings Json @default("{}")
@@unique([userId, agentGraphId, agentGraphVersion])
@@index([agentGraphId, agentGraphVersion])
@@index([creatorId])
@@ -478,26 +480,26 @@ enum ReviewStatus {
// Pending human reviews for Human-in-the-loop blocks
model PendingHumanReview {
nodeExecId String @id
userId String
graphExecId String
graphId String
graphVersion Int
payload Json // The actual payload data to be reviewed
instructions String? // Instructions/message for the reviewer
editable Boolean @default(true) // Whether the reviewer can edit the data
status ReviewStatus @default(WAITING)
reviewMessage String? // Optional message from the reviewer
wasEdited Boolean? // Whether the data was modified during review
processed Boolean @default(false) // Whether the review result has been processed by the execution engine
createdAt DateTime @default(now())
updatedAt DateTime? @updatedAt
reviewedAt DateTime?
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade)
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
nodeExecId String @id
userId String
graphExecId String
graphId String
graphVersion Int
payload Json // The actual payload data to be reviewed
instructions String? // Instructions/message for the reviewer
editable Boolean @default(true) // Whether the reviewer can edit the data
status ReviewStatus @default(WAITING)
reviewMessage String? // Optional message from the reviewer
wasEdited Boolean? // Whether the data was modified during review
processed Boolean @default(false) // Whether the review result has been processed by the execution engine
createdAt DateTime @default(now())
updatedAt DateTime? @updatedAt
reviewedAt DateTime?
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade)
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
@@unique([nodeExecId]) // One pending review per node execution
@@index([userId, status])
@@index([graphExecId, status])
@@ -704,7 +706,7 @@ view StoreAgent {
agent_video String?
agent_image String[]
featured Boolean @default(false)
featured Boolean @default(false)
creator_username String?
creator_avatar String?
sub_heading String
@@ -714,8 +716,8 @@ view StoreAgent {
runs Int
rating Float
versions String[]
is_available Boolean @default(true)
useForOnboarding Boolean @default(false)
is_available Boolean @default(true)
useForOnboarding Boolean @default(false)
// Materialized views used (refreshed every 15 minutes via pg_cron):
// - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId

View File

@@ -9,6 +9,7 @@
"forked_from_id": null,
"forked_from_version": null,
"has_external_trigger": false,
"has_human_in_the_loop": false,
"id": "graph-123",
"input_schema": {
"properties": {},

View File

@@ -9,6 +9,7 @@
"forked_from_id": null,
"forked_from_version": null,
"has_external_trigger": false,
"has_human_in_the_loop": false,
"id": "graph-123",
"input_schema": {
"properties": {},

View File

@@ -32,6 +32,9 @@
"is_latest_version": true,
"is_favorite": false,
"recommended_schedule_cron": null,
"settings": {
"human_in_the_loop_safe_mode": null
},
"marketplace_listing": null
},
{
@@ -66,6 +69,9 @@
"is_latest_version": true,
"is_favorite": false,
"recommended_schedule_cron": null,
"settings": {
"human_in_the_loop_safe_mode": null
},
"marketplace_listing": null
}
],

View File

@@ -59,10 +59,9 @@ class QueueOrderTester:
"graph_exec_id": f"exec-{message_id}",
"graph_id": f"graph-{message_id}",
"user_id": user_id,
"user_context": {"timezone": "UTC"},
"execution_context": {"timezone": "UTC"},
"nodes_input_masks": {},
"starting_nodes_input": [],
"parent_graph_exec_id": None,
}
)

View File

@@ -16,14 +16,29 @@ import { useCopyPaste } from "./useCopyPaste";
import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel";
import { parseAsString, useQueryStates } from "nuqs";
import { CustomControls } from "./components/CustomControl";
import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle";
import { useGetV1GetSpecificGraph } from "@/app/api/__generated__/endpoints/graphs/graphs";
import { GraphModel } from "@/app/api/__generated__/models/graphModel";
import { okData } from "@/app/api/helpers";
import { TriggerAgentBanner } from "./components/TriggerAgentBanner";
export const Flow = () => {
const [{ flowExecutionID }] = useQueryStates({
const [{ flowID, flowExecutionID }] = useQueryStates({
flowID: parseAsString,
flowExecutionID: parseAsString,
});
const { data: graph } = useGetV1GetSpecificGraph(
flowID ?? "",
{},
{
query: {
select: okData<GraphModel>,
enabled: !!flowID,
},
},
);
const nodes = useNodeStore(useShallow((state) => state.nodes));
const onNodesChange = useNodeStore(
useShallow((state) => state.onNodesChange),
@@ -83,9 +98,19 @@ export const Flow = () => {
{hasWebhookNodes ? <TriggerAgentBanner /> : <BuilderActions />}
{<GraphLoadingBox flowContentLoading={isFlowContentLoading} />}
{isGraphRunning && <RunningBackground />}
{graph && (
<FloatingSafeModeToggle
graph={graph}
className="right-4 top-32 p-2"
variant="black"
/>
)}
</ReactFlow>
</div>
<FloatingReviewsPanel executionId={flowExecutionID || undefined} />
<FloatingReviewsPanel
executionId={flowExecutionID || undefined}
graphId={flowID || undefined}
/>
</div>
);
};

View File

@@ -65,6 +65,7 @@ import NewControlPanel from "@/app/(platform)/build/components/NewControlPanel/N
import { Flag, useGetFlag } from "@/services/feature-flags/use-get-flag";
import { BuildActionBar } from "../BuildActionBar";
import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel";
import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle";
// This is for the history, this is the minimum distance a block must move before it is logged
// It helps to prevent spamming the history with small movements especially when pressing on a input in a block
@@ -927,6 +928,13 @@ const FlowEditor: React.FC<{
>
<Controls />
<Background className="dark:bg-slate-800" />
{savedAgent && (
<FloatingSafeModeToggle
graph={savedAgent}
className="right-4 top-32 p-2"
variant="black"
/>
)}
{isNewBlockEnabled ? (
<NewControlPanel
flowExecutionID={flowExecutionID}
@@ -1027,6 +1035,7 @@ const FlowEditor: React.FC<{
)}
<FloatingReviewsPanel
executionId={flowExecutionID || undefined}
graphId={flowID || undefined}
className="fixed bottom-24 right-4"
/>
<Suspense fallback={null}>

View File

@@ -14,6 +14,7 @@ import moment from "moment";
import { AgentActionsDropdown } from "../AgentActionsDropdown";
import { RunStatusBadge } from "../SelectedRunView/components/RunStatusBadge";
import { ShareRunButton } from "../ShareRunButton/ShareRunButton";
import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle";
import { useRunDetailHeader } from "./useRunDetailHeader";
type Props = {
@@ -79,6 +80,11 @@ export function RunDetailHeader({
shareToken={run.share_token}
/>
)}
<FloatingSafeModeToggle
graph={agent}
variant="white"
fullWidth={false}
/>
{!isRunning ? (
<Button
variant="secondary"

View File

@@ -5,9 +5,9 @@ import { useQueryClient } from "@tanstack/react-query";
import {
usePostV1StopGraphExecution,
getGetV1ListGraphExecutionsInfiniteQueryOptions,
useDeleteV1DeleteGraphExecution,
usePostV1ExecuteGraphAgent,
} from "@/app/api/__generated__/endpoints/graphs/graphs";
import { useDeleteV1DeleteGraphExecution } from "@/app/api/__generated__/endpoints/graphs/graphs";
import { usePostV1ExecuteGraphAgent } from "@/app/api/__generated__/endpoints/graphs/graphs";
import type { GraphExecution } from "@/app/api/__generated__/models/graphExecution";
import { useState } from "react";

View File

@@ -1702,6 +1702,52 @@
}
}
},
"/api/graphs/{graph_id}/settings": {
"patch": {
"tags": ["v1", "graphs"],
"summary": "Update graph settings",
"description": "Update graph settings for the user's library agent.",
"operationId": "patchV1Update graph settings",
"security": [{ "HTTPBearerJWT": [] }],
"parameters": [
{
"name": "graph_id",
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Graph Id" }
}
],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/GraphSettings" }
}
}
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/GraphSettings" }
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
}
}
}
},
"/api/graphs/{graph_id}/execute/{graph_version}": {
"post": {
"tags": ["v1", "graphs"],
@@ -5469,6 +5515,11 @@
"title": "Has External Trigger",
"readOnly": true
},
"has_human_in_the_loop": {
"type": "boolean",
"title": "Has Human In The Loop",
"readOnly": true
},
"trigger_setup_info": {
"anyOf": [
{ "$ref": "#/components/schemas/GraphTriggerInfo" },
@@ -5484,6 +5535,7 @@
"input_schema",
"output_schema",
"has_external_trigger",
"has_human_in_the_loop",
"trigger_setup_info"
],
"title": "BaseGraph"
@@ -6568,6 +6620,11 @@
"title": "Has External Trigger",
"readOnly": true
},
"has_human_in_the_loop": {
"type": "boolean",
"title": "Has Human In The Loop",
"readOnly": true
},
"trigger_setup_info": {
"anyOf": [
{ "$ref": "#/components/schemas/GraphTriggerInfo" },
@@ -6590,6 +6647,7 @@
"input_schema",
"output_schema",
"has_external_trigger",
"has_human_in_the_loop",
"trigger_setup_info",
"credentials_input_schema"
],
@@ -6663,6 +6721,11 @@
"title": "Has External Trigger",
"readOnly": true
},
"has_human_in_the_loop": {
"type": "boolean",
"title": "Has Human In The Loop",
"readOnly": true
},
"trigger_setup_info": {
"anyOf": [
{ "$ref": "#/components/schemas/GraphTriggerInfo" },
@@ -6686,11 +6749,22 @@
"input_schema",
"output_schema",
"has_external_trigger",
"has_human_in_the_loop",
"trigger_setup_info",
"credentials_input_schema"
],
"title": "GraphModel"
},
"GraphSettings": {
"properties": {
"human_in_the_loop_safe_mode": {
"anyOf": [{ "type": "boolean" }, { "type": "null" }],
"title": "Human In The Loop Safe Mode"
}
},
"type": "object",
"title": "GraphSettings"
},
"GraphTriggerInfo": {
"properties": {
"provider": {
@@ -6862,6 +6936,7 @@
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Recommended Schedule Cron"
},
"settings": { "$ref": "#/components/schemas/GraphSettings" },
"marketplace_listing": {
"anyOf": [
{ "$ref": "#/components/schemas/MarketplaceListing" },
@@ -7109,6 +7184,13 @@
"anyOf": [{ "type": "boolean" }, { "type": "null" }],
"title": "Is Archived",
"description": "Archive the agent"
},
"settings": {
"anyOf": [
{ "$ref": "#/components/schemas/GraphSettings" },
{ "type": "null" }
],
"description": "User-specific settings for this library agent"
}
},
"type": "object",

View File

@@ -0,0 +1,206 @@
import { useCallback, useState, useEffect } from "react";
import { ShieldIcon, ShieldCheckIcon } from "@phosphor-icons/react";
import { Button } from "@/components/atoms/Button/Button";
import {
Tooltip,
TooltipContent,
TooltipTrigger,
} from "@/components/atoms/Tooltip/BaseTooltip";
import { usePatchV1UpdateGraphSettings } from "@/app/api/__generated__/endpoints/graphs/graphs";
import {
getGetV2GetLibraryAgentQueryOptions,
useGetV2GetLibraryAgentByGraphId,
} from "@/app/api/__generated__/endpoints/library/library";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { cn } from "@/lib/utils";
import { GraphModel } from "@/app/api/__generated__/models/graphModel";
import { LibraryAgent } from "@/app/api/__generated__/models/libraryAgent";
import { useQueryClient } from "@tanstack/react-query";
import { Graph } from "@/lib/autogpt-server-api/types";
/** Resolve the underlying graph ID for any of the supported graph-like shapes. */
function getGraphId(graph: GraphModel | LibraryAgent | Graph): string {
  // LibraryAgent exposes the graph's ID as `graph_id`; plain graphs use `id`.
  const rawId = "graph_id" in graph ? graph.graph_id : graph.id;
  return rawId ? rawId.toString() : "";
}
/**
 * Whether the given graph contains Human-in-the-loop (HITL) blocks.
 *
 * GraphModel carries an explicit `has_human_in_the_loop` flag (populated by
 * the backend). LibraryAgent has no such flag, so the presence of a concrete
 * `human_in_the_loop_safe_mode` setting is used as a proxy.
 */
function hasHITLBlocks(graph: GraphModel | LibraryAgent | Graph): boolean {
  if ("has_human_in_the_loop" in graph) {
    return !!graph.has_human_in_the_loop;
  }
  if (isLibraryAgent(graph)) {
    // `!= null` treats both null AND undefined as "not set". The previous
    // `!== null` let a missing field (undefined) count as true, making the
    // toggle appear for agents with no safe-mode setting at all.
    return graph.settings?.human_in_the_loop_safe_mode != null;
  }
  return false;
}
/**
 * Type guard: a graph-like object is a LibraryAgent when it exposes both a
 * `graph_id` and a `settings` field.
 */
function isLibraryAgent(
  graph: GraphModel | LibraryAgent | Graph,
): graph is LibraryAgent {
  const hasGraphId = "graph_id" in graph;
  const hasSettings = "settings" in graph;
  return hasGraphId && hasSettings;
}
/** Props for the FloatingSafeModeToggle component. */
interface FloatingSafeModeToggleProps {
  /** Graph-like object; determines the graph ID and where settings are read from. */
  graph: GraphModel | LibraryAgent | Graph;
  /** Extra classes, e.g. positioning for the floating "black" variant. */
  className?: string;
  /** Stretch the button to the container's full width. */
  fullWidth?: boolean;
  /** "black": floating builder style; "white": inline library/action-bar style. */
  variant?: "white" | "black";
}
/**
 * Floating toggle that switches "Safe Mode" for a graph's Human-in-the-loop
 * (HITL) blocks on or off.
 *
 * Rendered only when the graph is known to contain HITL blocks and the
 * current safe-mode state could be determined. "black" variant floats over
 * builder canvases; "white" variant renders inline on library/run pages.
 */
export function FloatingSafeModeToggle({
  graph,
  className,
  fullWidth = false,
  variant = "white",
}: FloatingSafeModeToggleProps) {
  const { toast } = useToast();
  const queryClient = useQueryClient();
  const graphId = getGraphId(graph);
  const isAgent = isLibraryAgent(graph);
  const shouldShowToggle = hasHITLBlocks(graph);
  const { mutateAsync: updateGraphSettings, isPending } =
    usePatchV1UpdateGraphSettings();
  // For a plain graph (not a LibraryAgent) the user-specific settings live on
  // the library-agent record, so fetch it by graph ID — but only when the
  // toggle would actually be shown.
  const { data: libraryAgent, isLoading } = useGetV2GetLibraryAgentByGraphId(
    graphId,
    {},
    { query: { enabled: !isAgent && shouldShowToggle } },
  );
  // Local mirror of the backend safe-mode value for the non-agent case;
  // null means "not yet known".
  const [localSafeMode, setLocalSafeMode] = useState<boolean | null>(null);
  // Sync the local mirror whenever the library-agent query resolves.
  useEffect(() => {
    if (!isAgent && libraryAgent?.status === 200) {
      const backendValue =
        libraryAgent.data?.settings?.human_in_the_loop_safe_mode;
      if (backendValue !== undefined) {
        setLocalSafeMode(backendValue);
      }
    }
  }, [isAgent, libraryAgent]);
  // LibraryAgent carries its settings inline; otherwise use the mirrored
  // value fetched above.
  const currentSafeMode = isAgent
    ? graph.settings?.human_in_the_loop_safe_mode
    : localSafeMode;
  // Hide the toggle until the actual state is known, to avoid flashing a
  // wrong default while the query is in flight.
  const isStateUndetermined = isAgent
    ? graph.settings?.human_in_the_loop_safe_mode == null
    : isLoading || localSafeMode === null;
  // Flip safe mode on the backend, then refresh cached data that depends on it.
  const handleToggle = useCallback(async () => {
    const newSafeMode = !currentSafeMode;
    try {
      await updateGraphSettings({
        graphId,
        data: { human_in_the_loop_safe_mode: newSafeMode },
      });
      if (!isAgent) {
        setLocalSafeMode(newSafeMode);
      }
      if (isAgent) {
        // Refresh the cached library agent so its inline settings reflect
        // the new value.
        queryClient.invalidateQueries({
          queryKey: getGetV2GetLibraryAgentQueryOptions(graph.id.toString())
            .queryKey,
        });
      }
      queryClient.invalidateQueries({
        queryKey: ["v1", "graphs", graphId, "executions"],
      });
      queryClient.invalidateQueries({ queryKey: ["v2", "executions"] });
      toast({
        title: `Safe mode ${newSafeMode ? "enabled" : "disabled"}`,
        description: newSafeMode
          ? "Human-in-the-loop blocks will require manual review"
          : "Human-in-the-loop blocks will proceed automatically",
      });
    } catch (error) {
      // Settings are stored on the library agent, so a 404 usually means the
      // graph was never saved to the user's library.
      const isNotFoundError =
        error instanceof Error &&
        (error.message.includes("404") || error.message.includes("not found"));
      if (!isAgent && isNotFoundError) {
        toast({
          title: "Safe mode not available",
          description:
            "To configure safe mode, please save this graph to your library first.",
          variant: "destructive",
        });
      } else {
        toast({
          title: "Failed to update safe mode",
          description:
            error instanceof Error
              ? error.message
              : "An unexpected error occurred.",
          variant: "destructive",
        });
      }
    }
  }, [
    currentSafeMode,
    graphId,
    isAgent,
    graph.id,
    updateGraphSettings,
    queryClient,
    toast,
  ]);
  if (!shouldShowToggle || isStateUndetermined) {
    return null;
  }
  return (
    <div className={cn(variant === "black" ? "fixed z-50" : "", className)}>
      <Tooltip delayDuration={100}>
        <TooltipTrigger asChild>
          <Button
            variant="secondary"
            size="small"
            onClick={handleToggle}
            disabled={isPending}
            loading={isPending}
            className={cn(
              fullWidth ? "w-full" : "",
              variant === "black"
                ? "bg-gray-800 text-white hover:bg-gray-700"
                : "",
            )}
          >
            {currentSafeMode! ? (
              <>
                <ShieldCheckIcon className="h-4 w-4" />
                Safe Mode: ON
              </>
            ) : (
              <>
                <ShieldIcon className="h-4 w-4" />
                Safe Mode: OFF
              </>
            )}
          </Button>
        </TooltipTrigger>
        <TooltipContent>
          <div className="text-center">
            <div className="font-medium">
              Safe Mode: {currentSafeMode! ? "ON" : "OFF"}
            </div>
            <div className="mt-1 text-xs text-muted-foreground">
              {currentSafeMode!
                ? "HITL blocks require manual review"
                : "HITL blocks proceed automatically"}
            </div>
          </div>
        </TooltipContent>
      </Tooltip>
    </div>
  );
}

View File

@@ -5,36 +5,50 @@ import { Button } from "@/components/atoms/Button/Button";
import { ClockIcon, XIcon } from "@phosphor-icons/react";
import { cn } from "@/lib/utils";
import { Text } from "@/components/atoms/Text/Text";
import { useGetV1GetExecutionDetails } from "@/app/api/__generated__/endpoints/graphs/graphs";
import { AgentExecutionStatus } from "@/app/api/__generated__/models/agentExecutionStatus";
import { useGraphStore } from "@/app/(platform)/build/stores/graphStore";
interface FloatingReviewsPanelProps {
executionId?: string;
graphId?: string;
className?: string;
}
export function FloatingReviewsPanel({
executionId,
graphId,
className,
}: FloatingReviewsPanelProps) {
const [isOpen, setIsOpen] = useState(false);
const executionStatus = useGraphStore((state) => state.graphExecutionStatus);
const { data: executionDetails } = useGetV1GetExecutionDetails(
graphId || "",
executionId || "",
{
query: {
enabled: !!(graphId && executionId),
},
},
);
const executionStatus =
executionDetails?.status === 200 ? executionDetails.data.status : undefined;
const { pendingReviews, isLoading, refetch } = usePendingReviewsForExecution(
executionId || "",
);
useEffect(() => {
if (executionStatus === AgentExecutionStatus.REVIEW && executionId) {
if (executionId) {
refetch();
}
}, [executionStatus, executionId, refetch]);
if (
!executionId ||
(!isLoading && pendingReviews.length === 0) ||
executionStatus !== AgentExecutionStatus.REVIEW
(!isLoading &&
pendingReviews.length === 0 &&
executionStatus !== AgentExecutionStatus.REVIEW)
) {
return null;
}