Compare commits


5 Commits

Author SHA1 Message Date
Zamil Majdy
0da7a54952 test(backend): update tests to expect exceptions instead of None/False
- Update generate_embedding tests to expect RuntimeError/Exception instead of None
- Update store_embedding tests to expect Exception instead of False
- Update ensure_embedding tests to use side_effect=Exception instead of return_value=None
- Update hybrid search tests to mock embed_query with side_effect=Exception
- All tests now verify exception-based error handling instead of return value checking
2026-01-23 23:55:22 -06:00
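A minimal sketch of the test pattern this commit describes, assuming pytest with pytest-asyncio; `ensure_embedding` here is a stand-in for the project's helper, not its actual signature. The mocked dependency is given `side_effect=Exception(...)` instead of `return_value=None`, and the test asserts that the exception surfaces rather than checking for None/False.

from unittest.mock import AsyncMock

import pytest


async def ensure_embedding(generate):
    # Stand-in for the real helper: it simply awaits its dependency,
    # so any exception the dependency raises propagates to the caller.
    return await generate()


@pytest.mark.asyncio
async def test_failure_propagates():
    # side_effect=Exception replaces return_value=None: the mocked dependency raises,
    # and the test asserts the exception surfaces instead of inspecting a return value.
    failing = AsyncMock(side_effect=Exception("Generation failed"))
    with pytest.raises(Exception, match="Generation failed"):
        await ensure_embedding(failing)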
Zamil Majdy
ba0aa83de3 fix(backend): remove unnecessary exception re-raising in store listing approval
- Let embedding exceptions bubble up naturally
- Only catch exceptions when graceful fallback is needed (search functions)
- Store listing approval should fail with actual error, not wrapped ValueError
2026-01-23 23:36:24 -06:00
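An illustrative before/after for the change this commit describes (function names are hypothetical stand-ins, not the project's API): the approval path previously converted an embedding failure into a generic ValueError, and now lets the original exception propagate so the real cause reaches the operator.

async def approve_listing_old(version_id: str, ensure_embedding) -> None:
    # Before (illustrative): a boolean failure is converted into a generic ValueError,
    # hiding the underlying cause (e.g. missing API key vs. OpenAI outage).
    ok = await ensure_embedding(version_id)
    if not ok:
        raise ValueError(f"Failed to generate embedding for listing {version_id}")


async def approve_listing_new(version_id: str, ensure_embedding) -> None:
    # After (illustrative): ensure_embedding raises on failure, so the original
    # exception type and message reach the caller without re-wrapping.
    await ensure_embedding(version_id)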
Zamil Majdy
4c9333fc37 fix(backend): update callers to catch exceptions instead of checking None/False
- Update store listing approval to catch embedding errors with try/except
- Update semantic_search to catch embedding generation failures
- Update hybrid search (2 places) to catch and gracefully degrade to lexical search
- Remove None return type from embed_query (now raises exceptions)
- All error handling moved to caller level for consistency
2026-01-23 23:31:52 -06:00
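A runnable sketch of the caller-level handling this commit describes, with stand-in functions rather than the project's real search code: only the search path catches the embedding failure and degrades to lexical search, while other callers let the exception propagate.

import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def embed_query(query: str) -> list[float]:
    # Stand-in that fails the way the real helper might when no API key is configured.
    raise RuntimeError("openai_internal_api_key not set, cannot generate embedding")


async def lexical_search(query: str) -> list[str]:
    return [f"lexical match for {query!r}"]


async def hybrid_search(query: str) -> list[str]:
    # Caller-level handling: only the search path catches the failure and degrades;
    # non-search callers (e.g. store listing approval) let the exception propagate.
    try:
        embedding = await embed_query(query)
    except Exception as e:
        logger.warning(f"Query embedding failed, falling back to lexical-only search: {e}")
        return await lexical_search(query)
    return [f"semantic match using a {len(embedding)}-dim embedding"]


if __name__ == "__main__":
    print(asyncio.run(hybrid_search("data pipeline agent")))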
Zamil Majdy
03c805ac5a refactor(backend): remove error swallowing in embedding functions
- Remove try/except blocks from individual embedding functions (generate_embedding, store_content_embedding, ensure_content_embedding, get_content_embedding, ensure_embedding)
- Let exceptions propagate to batch operation level
- Update batch error aggregation to show all unique error types with counts instead of just first error
- Prevents Sentry spam while providing better error diagnostics
2026-01-23 23:20:49 -06:00
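A small self-contained sketch of the batch error aggregation this commit describes (names are illustrative): exceptions collected from a batch are grouped by type and message, so each unique error is reported once with a count instead of only the first error.

import logging

logger = logging.getLogger(__name__)


def summarize_batch_errors(results: list[object]) -> tuple[int, str]:
    """Group exceptions by 'Type: message' and format each unique error with a count."""
    errors = [r for r in results if isinstance(r, Exception)]
    summary: dict[str, int] = {}
    for exc in errors:
        key = f"{type(exc).__name__}: {exc}"
        summary[key] = summary.get(key, 0) + 1
    details = ", ".join(f"{err} ({count}x)" for err, count in summary.items())
    return len(errors), details


# Two unique errors across three failures collapse into a single aggregated log line.
results = [True, ConnectionError("db down"), ConnectionError("db down"), ValueError("bad vector")]
failed, details = summarize_batch_errors(results)
if failed:
    logger.error(f"{failed}/{len(results)} embeddings failed. Errors: {details}")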
Zamil Majdy
24d91a6e3e fix(backend): log only first error in embedding batch to prevent Sentry spam
Addresses AUTOGPT-SERVER-7D2

When batch operations like embedding backfill fail for multiple items with
the same error (e.g., database unavailable, vector type missing), log only
the first exception instead of spamming Sentry with identical alerts.

The batch already uses asyncio.gather(return_exceptions=True), so we just
check for failures and log the first exception with context about how many
items failed.

Before: 100 items fail → 100 identical Sentry alerts
After: 100 items fail → 1 Sentry alert with context

Impact: Reduces Sentry noise while preserving diagnostic information
2026-01-23 23:14:30 -06:00
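A runnable sketch of the pattern this commit describes, with placeholder task names (later refined into the per-error-type summary of commit 03c805ac5a): asyncio.gather(return_exceptions=True) collects per-item failures, and only the first exception is logged, together with a failure count, instead of one alert per item.

import asyncio
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)


async def backfill_item(i: int) -> bool:
    # Placeholder for the real per-item embedding backfill.
    raise ConnectionError("database unavailable")


async def backfill(items: list[int]) -> None:
    results = await asyncio.gather(*(backfill_item(i) for i in items), return_exceptions=True)
    failures = [r for r in results if isinstance(r, Exception)]
    if failures:
        # 100 failures -> 1 alert with context, instead of 100 identical alerts.
        logger.error(
            f"Embedding backfill: {len(failures)}/{len(results)} items failed",
            exc_info=failures[0],
        )


if __name__ == "__main__":
    asyncio.run(backfill(list(range(100))))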
31 changed files with 398 additions and 2139 deletions

View File

@@ -23,7 +23,6 @@ class PendingHumanReviewModel(BaseModel):
id: Unique identifier for the review record
user_id: ID of the user who must perform the review
node_exec_id: ID of the node execution that created this review
node_id: ID of the node definition (for grouping reviews from same node)
graph_exec_id: ID of the graph execution containing the node
graph_id: ID of the graph template being executed
graph_version: Version number of the graph template
@@ -38,10 +37,6 @@ class PendingHumanReviewModel(BaseModel):
"""
node_exec_id: str = Field(description="Node execution ID (primary key)")
node_id: str = Field(
description="Node definition ID (for grouping)",
default="", # Temporary default for test compatibility
)
user_id: str = Field(description="User ID associated with the review")
graph_exec_id: str = Field(description="Graph execution ID")
graph_id: str = Field(description="Graph ID")
@@ -71,9 +66,7 @@ class PendingHumanReviewModel(BaseModel):
)
@classmethod
def from_db(
cls, review: "PendingHumanReview", node_id: str
) -> "PendingHumanReviewModel":
def from_db(cls, review: "PendingHumanReview") -> "PendingHumanReviewModel":
"""
Convert a database model to a response model.
@@ -81,14 +74,9 @@ class PendingHumanReviewModel(BaseModel):
payload, instructions, and editable flag.
Handles invalid data gracefully by using safe defaults.
Args:
review: Database review object
node_id: Node definition ID (fetched from NodeExecution)
"""
return cls(
node_exec_id=review.nodeExecId,
node_id=node_id,
user_id=review.userId,
graph_exec_id=review.graphExecId,
graph_id=review.graphId,
@@ -119,13 +107,6 @@ class ReviewItem(BaseModel):
reviewed_data: SafeJsonData | None = Field(
None, description="Optional edited data (ignored if approved=False)"
)
auto_approve_future: bool = Field(
default=False,
description=(
"If true and this review is approved, future executions of this same "
"block (node) will be automatically approved. This only affects approved reviews."
),
)
@field_validator("reviewed_data")
@classmethod
@@ -193,9 +174,6 @@ class ReviewRequest(BaseModel):
This request must include ALL pending reviews for a graph execution.
Each review will be either approved (with optional data modifications)
or rejected (data ignored). The execution will resume only after ALL reviews are processed.
Each review item can individually specify whether to auto-approve future executions
of the same block via the `auto_approve_future` field on ReviewItem.
"""
reviews: List[ReviewItem] = Field(

View File

@@ -8,12 +8,6 @@ from prisma.enums import ReviewStatus
from pytest_snapshot.plugin import Snapshot
from backend.api.rest_api import handle_internal_http_error
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
NodeExecutionResult,
)
from backend.data.graph import GraphSettings
from .model import PendingHumanReviewModel
from .routes import router
@@ -21,24 +15,20 @@ from .routes import router
# Using a fixed timestamp for reproducible tests
FIXED_NOW = datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
app = fastapi.FastAPI()
app.include_router(router, prefix="/api/review")
app.add_exception_handler(ValueError, handle_internal_http_error(400))
@pytest.fixture
def app():
"""Create FastAPI app for testing"""
test_app = fastapi.FastAPI()
test_app.include_router(router, prefix="/api/review")
test_app.add_exception_handler(ValueError, handle_internal_http_error(400))
return test_app
client = fastapi.testclient.TestClient(app)
@pytest.fixture
def client(app, mock_jwt_user):
"""Create test client with auth overrides"""
@pytest.fixture(autouse=True)
def setup_app_auth(mock_jwt_user):
"""Setup auth overrides for all tests in this module"""
from autogpt_libs.auth.jwt_utils import get_jwt_payload
app.dependency_overrides[get_jwt_payload] = mock_jwt_user["get_jwt_payload"]
with fastapi.testclient.TestClient(app) as test_client:
yield test_client
yield
app.dependency_overrides.clear()
@@ -47,7 +37,6 @@ def sample_pending_review(test_user_id: str) -> PendingHumanReviewModel:
"""Create a sample pending review for testing"""
return PendingHumanReviewModel(
node_exec_id="test_node_123",
node_id="test_node_def_456",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
@@ -66,7 +55,6 @@ def sample_pending_review(test_user_id: str) -> PendingHumanReviewModel:
def test_get_pending_reviews_empty(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
snapshot: Snapshot,
test_user_id: str,
@@ -85,7 +73,6 @@ def test_get_pending_reviews_empty(
def test_get_pending_reviews_with_data(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
snapshot: Snapshot,
@@ -108,7 +95,6 @@ def test_get_pending_reviews_with_data(
def test_get_pending_reviews_for_execution_success(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
snapshot: Snapshot,
@@ -137,7 +123,6 @@ def test_get_pending_reviews_for_execution_success(
def test_get_pending_reviews_for_execution_not_available(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
) -> None:
"""Test access denied when user doesn't own the execution"""
@@ -153,7 +138,6 @@ def test_get_pending_reviews_for_execution_not_available(
def test_process_review_action_approve_success(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
@@ -161,12 +145,6 @@ def test_process_review_action_approve_success(
"""Test successful review approval"""
# Mock the route functions
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = sample_pending_review
mock_get_reviews_for_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
)
@@ -195,14 +173,6 @@ def test_process_review_action_approve_success(
)
mock_process_all_reviews.return_value = {"test_node_123": approved_review}
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
mock_has_pending = mocker.patch(
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
)
@@ -232,7 +202,6 @@ def test_process_review_action_approve_success(
def test_process_review_action_reject_success(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
@@ -240,20 +209,6 @@ def test_process_review_action_reject_success(
"""Test successful review rejection"""
# Mock the route functions
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = sample_pending_review
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
mock_get_reviews_for_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
)
@@ -307,7 +262,6 @@ def test_process_review_action_reject_success(
def test_process_review_action_mixed_success(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
@@ -334,12 +288,6 @@ def test_process_review_action_mixed_success(
# Mock the route functions
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = sample_pending_review
mock_get_reviews_for_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
)
@@ -389,14 +337,6 @@ def test_process_review_action_mixed_success(
"test_node_456": rejected_review,
}
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
mock_has_pending = mocker.patch(
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
)
@@ -429,7 +369,6 @@ def test_process_review_action_mixed_success(
def test_process_review_action_empty_request(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
test_user_id: str,
) -> None:
@@ -447,45 +386,10 @@ def test_process_review_action_empty_request(
def test_process_review_action_review_not_found(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test error when review is not found"""
# Create a review with the nonexistent_node ID so the route can find the graph_exec_id
nonexistent_review = PendingHumanReviewModel(
node_exec_id="nonexistent_node",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
payload={"data": "test"},
instructions="Review",
editable=True,
status=ReviewStatus.WAITING,
review_message=None,
was_edited=None,
processed=False,
created_at=FIXED_NOW,
updated_at=None,
reviewed_at=None,
)
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = nonexistent_review
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
# Mock the functions that extract graph execution ID from the request
mock_get_reviews_for_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
@@ -518,26 +422,11 @@ def test_process_review_action_review_not_found(
def test_process_review_action_partial_failure(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test handling of partial failures in review processing"""
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = sample_pending_review
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
# Mock the route functions
mock_get_reviews_for_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
@@ -567,50 +456,16 @@ def test_process_review_action_partial_failure(
def test_process_review_action_invalid_node_exec_id(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test failure when trying to process review with invalid node execution ID"""
# Create a review with the invalid-node-format ID so the route can find the graph_exec_id
invalid_review = PendingHumanReviewModel(
node_exec_id="invalid-node-format",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
payload={"data": "test"},
instructions="Review",
editable=True,
status=ReviewStatus.WAITING,
review_message=None,
was_edited=None,
processed=False,
created_at=FIXED_NOW,
updated_at=None,
reviewed_at=None,
)
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = invalid_review
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
# Mock the route functions
mock_get_reviews_for_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
)
mock_get_reviews_for_execution.return_value = [invalid_review]
mock_get_reviews_for_execution.return_value = [sample_pending_review]
# Mock validation failure - this should return 400, not 500
mock_process_all_reviews = mocker.patch(
@@ -635,595 +490,3 @@ def test_process_review_action_invalid_node_exec_id(
# Should be a 400 Bad Request, not 500 Internal Server Error
assert response.status_code == 400
assert "Invalid node execution ID format" in response.json()["detail"]
def test_process_review_action_auto_approve_creates_auto_approval_records(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test that auto_approve_future_actions flag creates auto-approval records"""
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = sample_pending_review
# Mock process_all_reviews
mock_process_all_reviews = mocker.patch(
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
)
approved_review = PendingHumanReviewModel(
node_exec_id="test_node_123",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
payload={"data": "test payload"},
instructions="Please review",
editable=True,
status=ReviewStatus.APPROVED,
review_message="Approved",
was_edited=False,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
)
mock_process_all_reviews.return_value = {"test_node_123": approved_review}
# Mock get_node_execution to return node_id
mock_get_node_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_node_execution"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_id = "test_node_def_456"
mock_get_node_execution.return_value = mock_node_exec
# Mock create_auto_approval_record
mock_create_auto_approval = mocker.patch(
"backend.api.features.executions.review.routes.create_auto_approval_record"
)
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
# Mock has_pending_reviews_for_graph_exec
mock_has_pending = mocker.patch(
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
)
mock_has_pending.return_value = False
# Mock get_graph_settings to return custom settings
mock_get_settings = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_settings"
)
mock_get_settings.return_value = GraphSettings(
human_in_the_loop_safe_mode=True,
sensitive_action_safe_mode=True,
)
# Mock get_user_by_id to prevent database access
mock_get_user = mocker.patch(
"backend.api.features.executions.review.routes.get_user_by_id"
)
mock_user = mocker.Mock()
mock_user.timezone = "UTC"
mock_get_user.return_value = mock_user
# Mock add_graph_execution
mock_add_execution = mocker.patch(
"backend.api.features.executions.review.routes.add_graph_execution"
)
request_data = {
"reviews": [
{
"node_exec_id": "test_node_123",
"approved": True,
"message": "Approved",
"auto_approve_future": True,
}
],
}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 200
# Verify process_all_reviews_for_execution was called (without auto_approve param)
mock_process_all_reviews.assert_called_once()
# Verify create_auto_approval_record was called for the approved review
mock_create_auto_approval.assert_called_once_with(
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
node_id="test_node_def_456",
payload={"data": "test payload"},
)
# Verify get_graph_settings was called with correct parameters
mock_get_settings.assert_called_once_with(
user_id=test_user_id, graph_id="test_graph_789"
)
# Verify add_graph_execution was called with proper ExecutionContext
mock_add_execution.assert_called_once()
call_kwargs = mock_add_execution.call_args.kwargs
execution_context = call_kwargs["execution_context"]
assert isinstance(execution_context, ExecutionContext)
assert execution_context.human_in_the_loop_safe_mode is True
assert execution_context.sensitive_action_safe_mode is True
def test_process_review_action_without_auto_approve_still_loads_settings(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test that execution context is created with settings even without auto-approve"""
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = sample_pending_review
# Mock process_all_reviews
mock_process_all_reviews = mocker.patch(
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
)
approved_review = PendingHumanReviewModel(
node_exec_id="test_node_123",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
payload={"data": "test payload"},
instructions="Please review",
editable=True,
status=ReviewStatus.APPROVED,
review_message="Approved",
was_edited=False,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
)
mock_process_all_reviews.return_value = {"test_node_123": approved_review}
# Mock create_auto_approval_record - should NOT be called when auto_approve is False
mock_create_auto_approval = mocker.patch(
"backend.api.features.executions.review.routes.create_auto_approval_record"
)
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
# Mock has_pending_reviews_for_graph_exec
mock_has_pending = mocker.patch(
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
)
mock_has_pending.return_value = False
# Mock get_graph_settings with sensitive_action_safe_mode enabled
mock_get_settings = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_settings"
)
mock_get_settings.return_value = GraphSettings(
human_in_the_loop_safe_mode=False,
sensitive_action_safe_mode=True,
)
# Mock get_user_by_id to prevent database access
mock_get_user = mocker.patch(
"backend.api.features.executions.review.routes.get_user_by_id"
)
mock_user = mocker.Mock()
mock_user.timezone = "UTC"
mock_get_user.return_value = mock_user
# Mock add_graph_execution
mock_add_execution = mocker.patch(
"backend.api.features.executions.review.routes.add_graph_execution"
)
# Request WITHOUT auto_approve_future (defaults to False)
request_data = {
"reviews": [
{
"node_exec_id": "test_node_123",
"approved": True,
"message": "Approved",
# auto_approve_future defaults to False
}
],
}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 200
# Verify process_all_reviews_for_execution was called
mock_process_all_reviews.assert_called_once()
# Verify create_auto_approval_record was NOT called (auto_approve_future=False)
mock_create_auto_approval.assert_not_called()
# Verify settings were loaded
mock_get_settings.assert_called_once()
# Verify ExecutionContext has proper settings
mock_add_execution.assert_called_once()
call_kwargs = mock_add_execution.call_args.kwargs
execution_context = call_kwargs["execution_context"]
assert isinstance(execution_context, ExecutionContext)
assert execution_context.human_in_the_loop_safe_mode is False
assert execution_context.sensitive_action_safe_mode is True
def test_process_review_action_auto_approve_only_applies_to_approved_reviews(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
test_user_id: str,
) -> None:
"""Test that auto_approve record is created only for approved reviews"""
# Create two reviews - one approved, one rejected
approved_review = PendingHumanReviewModel(
node_exec_id="node_exec_approved",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
payload={"data": "approved"},
instructions="Review",
editable=True,
status=ReviewStatus.APPROVED,
review_message=None,
was_edited=False,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
)
rejected_review = PendingHumanReviewModel(
node_exec_id="node_exec_rejected",
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
payload={"data": "rejected"},
instructions="Review",
editable=True,
status=ReviewStatus.REJECTED,
review_message="Rejected",
was_edited=False,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
)
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
mock_get_reviews_for_user.return_value = approved_review
# Mock process_all_reviews
mock_process_all_reviews = mocker.patch(
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
)
mock_process_all_reviews.return_value = {
"node_exec_approved": approved_review,
"node_exec_rejected": rejected_review,
}
# Mock get_node_execution to return node_id (only called for approved review)
mock_get_node_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_node_execution"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_id = "test_node_def_approved"
mock_get_node_execution.return_value = mock_node_exec
# Mock create_auto_approval_record
mock_create_auto_approval = mocker.patch(
"backend.api.features.executions.review.routes.create_auto_approval_record"
)
# Mock get_graph_execution_meta to return execution in REVIEW status
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
# Mock has_pending_reviews_for_graph_exec
mock_has_pending = mocker.patch(
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
)
mock_has_pending.return_value = False
# Mock get_graph_settings
mock_get_settings = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_settings"
)
mock_get_settings.return_value = GraphSettings()
# Mock get_user_by_id to prevent database access
mock_get_user = mocker.patch(
"backend.api.features.executions.review.routes.get_user_by_id"
)
mock_user = mocker.Mock()
mock_user.timezone = "UTC"
mock_get_user.return_value = mock_user
# Mock add_graph_execution
mock_add_execution = mocker.patch(
"backend.api.features.executions.review.routes.add_graph_execution"
)
request_data = {
"reviews": [
{
"node_exec_id": "node_exec_approved",
"approved": True,
"auto_approve_future": True,
},
{
"node_exec_id": "node_exec_rejected",
"approved": False,
"auto_approve_future": True, # Should be ignored since rejected
},
],
}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 200
# Verify process_all_reviews_for_execution was called
mock_process_all_reviews.assert_called_once()
# Verify create_auto_approval_record was called ONLY for the approved review
# (not for the rejected one)
mock_create_auto_approval.assert_called_once_with(
user_id=test_user_id,
graph_exec_id="test_graph_exec_456",
graph_id="test_graph_789",
graph_version=1,
node_id="test_node_def_approved",
payload={"data": "approved"},
)
# Verify get_node_execution was called only for approved review
mock_get_node_execution.assert_called_once_with("node_exec_approved")
# Verify ExecutionContext was created (auto-approval is now DB-based)
call_kwargs = mock_add_execution.call_args.kwargs
execution_context = call_kwargs["execution_context"]
assert isinstance(execution_context, ExecutionContext)
def test_process_review_action_per_review_auto_approve_granularity(
client: fastapi.testclient.TestClient,
mocker: pytest_mock.MockerFixture,
sample_pending_review: PendingHumanReviewModel,
test_user_id: str,
) -> None:
"""Test that auto-approval can be set per-review (granular control)"""
# Mock get_pending_review_by_node_exec_id - return different reviews based on node_exec_id
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
)
# Create a mapping of node_exec_id to review
review_map = {
"node_1_auto": PendingHumanReviewModel(
node_exec_id="node_1_auto",
user_id=test_user_id,
graph_exec_id="test_graph_exec",
graph_id="test_graph",
graph_version=1,
payload={"data": "node1"},
instructions="Review 1",
editable=True,
status=ReviewStatus.WAITING,
review_message=None,
was_edited=False,
processed=False,
created_at=FIXED_NOW,
),
"node_2_manual": PendingHumanReviewModel(
node_exec_id="node_2_manual",
user_id=test_user_id,
graph_exec_id="test_graph_exec",
graph_id="test_graph",
graph_version=1,
payload={"data": "node2"},
instructions="Review 2",
editable=True,
status=ReviewStatus.WAITING,
review_message=None,
was_edited=False,
processed=False,
created_at=FIXED_NOW,
),
"node_3_auto": PendingHumanReviewModel(
node_exec_id="node_3_auto",
user_id=test_user_id,
graph_exec_id="test_graph_exec",
graph_id="test_graph",
graph_version=1,
payload={"data": "node3"},
instructions="Review 3",
editable=True,
status=ReviewStatus.WAITING,
review_message=None,
was_edited=False,
processed=False,
created_at=FIXED_NOW,
),
}
# Use side_effect to return different reviews based on node_exec_id parameter
def mock_get_review_by_id(node_exec_id: str, _user_id: str):
return review_map.get(node_exec_id)
mock_get_reviews_for_user.side_effect = mock_get_review_by_id
# Mock process_all_reviews - return 3 approved reviews
mock_process_all_reviews = mocker.patch(
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
)
mock_process_all_reviews.return_value = {
"node_1_auto": PendingHumanReviewModel(
node_exec_id="node_1_auto",
user_id=test_user_id,
graph_exec_id="test_graph_exec",
graph_id="test_graph",
graph_version=1,
payload={"data": "node1"},
instructions="Review 1",
editable=True,
status=ReviewStatus.APPROVED,
review_message=None,
was_edited=False,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
),
"node_2_manual": PendingHumanReviewModel(
node_exec_id="node_2_manual",
user_id=test_user_id,
graph_exec_id="test_graph_exec",
graph_id="test_graph",
graph_version=1,
payload={"data": "node2"},
instructions="Review 2",
editable=True,
status=ReviewStatus.APPROVED,
review_message=None,
was_edited=False,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
),
"node_3_auto": PendingHumanReviewModel(
node_exec_id="node_3_auto",
user_id=test_user_id,
graph_exec_id="test_graph_exec",
graph_id="test_graph",
graph_version=1,
payload={"data": "node3"},
instructions="Review 3",
editable=True,
status=ReviewStatus.APPROVED,
review_message=None,
was_edited=False,
processed=False,
created_at=FIXED_NOW,
updated_at=FIXED_NOW,
reviewed_at=FIXED_NOW,
),
}
# Mock get_node_execution
mock_get_node_execution = mocker.patch(
"backend.api.features.executions.review.routes.get_node_execution"
)
def mock_get_node(node_exec_id: str):
mock_node = mocker.Mock(spec=NodeExecutionResult)
mock_node.node_id = f"node_def_{node_exec_id}"
return mock_node
mock_get_node_execution.side_effect = mock_get_node
# Mock create_auto_approval_record
mock_create_auto_approval = mocker.patch(
"backend.api.features.executions.review.routes.create_auto_approval_record"
)
# Mock get_graph_execution_meta
mock_get_graph_exec = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_execution_meta"
)
mock_graph_exec_meta = mocker.Mock()
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
mock_get_graph_exec.return_value = mock_graph_exec_meta
# Mock has_pending_reviews_for_graph_exec
mock_has_pending = mocker.patch(
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
)
mock_has_pending.return_value = False
# Mock settings and execution
mock_get_settings = mocker.patch(
"backend.api.features.executions.review.routes.get_graph_settings"
)
mock_get_settings.return_value = GraphSettings(
human_in_the_loop_safe_mode=False, sensitive_action_safe_mode=False
)
mocker.patch("backend.api.features.executions.review.routes.add_graph_execution")
mocker.patch("backend.api.features.executions.review.routes.get_user_by_id")
# Request with granular auto-approval:
# - node_1_auto: auto_approve_future=True
# - node_2_manual: auto_approve_future=False (explicit)
# - node_3_auto: auto_approve_future=True
request_data = {
"reviews": [
{
"node_exec_id": "node_1_auto",
"approved": True,
"auto_approve_future": True,
},
{
"node_exec_id": "node_2_manual",
"approved": True,
"auto_approve_future": False, # Don't auto-approve this one
},
{
"node_exec_id": "node_3_auto",
"approved": True,
"auto_approve_future": True,
},
],
}
response = client.post("/api/review/action", json=request_data)
assert response.status_code == 200
# Verify create_auto_approval_record was called ONLY for reviews with auto_approve_future=True
assert mock_create_auto_approval.call_count == 2
# Check that it was called for node_1 and node_3, but NOT node_2
call_args_list = [call.kwargs for call in mock_create_auto_approval.call_args_list]
node_ids_with_auto_approval = [args["node_id"] for args in call_args_list]
assert "node_def_node_1_auto" in node_ids_with_auto_approval
assert "node_def_node_3_auto" in node_ids_with_auto_approval
assert "node_def_node_2_manual" not in node_ids_with_auto_approval

View File

@@ -5,23 +5,13 @@ import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, HTTPException, Query, Security, status
from prisma.enums import ReviewStatus
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
get_graph_execution_meta,
get_node_execution,
)
from backend.data.graph import get_graph_settings
from backend.data.execution import get_graph_execution_meta
from backend.data.human_review import (
create_auto_approval_record,
get_pending_review_by_node_exec_id,
get_pending_reviews_for_execution,
get_pending_reviews_for_user,
has_pending_reviews_for_graph_exec,
process_all_reviews_for_execution,
)
from backend.data.model import USER_TIMEZONE_NOT_SET
from backend.data.user import get_user_by_id
from backend.executor.utils import add_graph_execution
from .model import PendingHumanReviewModel, ReviewRequest, ReviewResponse
@@ -137,80 +127,17 @@ async def process_review_action(
detail="At least one review must be provided",
)
# Get graph execution ID by looking up all requested reviews
# Use direct lookup to avoid pagination issues (can't miss reviews beyond first page)
# Also validate that all reviews belong to the same execution
matching_review = None
graph_exec_ids: set[str] = set()
for node_exec_id in all_request_node_ids:
review = await get_pending_review_by_node_exec_id(node_exec_id, user_id)
if not review:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No pending review found for node execution {node_exec_id}",
)
if matching_review is None:
matching_review = review
graph_exec_ids.add(review.graph_exec_id)
# Ensure all reviews belong to the same execution
if len(graph_exec_ids) > 1:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="All reviews in a single request must belong to the same execution.",
)
# Safety check (matching_review should never be None here due to validation above)
if matching_review is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal error: No matching review found despite validation",
)
graph_exec_id = matching_review.graph_exec_id
# Validate execution status before processing reviews
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
# Only allow processing reviews if execution is paused for review
# or incomplete (partial execution with some reviews already processed)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}. "
f"Reviews can only be processed when execution is paused (REVIEW status). "
f"Current status: {graph_exec_meta.status}",
)
# Build review decisions map and track which reviews requested auto-approval
# Auto-approved reviews use original data (no modifications allowed)
# Build review decisions map
review_decisions = {}
auto_approve_requests = {} # Map node_exec_id -> auto_approve_future flag
for review in request.reviews:
review_status = (
ReviewStatus.APPROVED if review.approved else ReviewStatus.REJECTED
)
# If this review requested auto-approval, don't allow data modifications
reviewed_data = None if review.auto_approve_future else review.reviewed_data
review_decisions[review.node_exec_id] = (
review_status,
reviewed_data,
review.reviewed_data,
review.message,
)
auto_approve_requests[review.node_exec_id] = review.auto_approve_future
# Process all reviews
updated_reviews = await process_all_reviews_for_execution(
@@ -218,32 +145,6 @@ async def process_review_action(
review_decisions=review_decisions,
)
# Create auto-approval records for approved reviews that requested it
# Note: Processing sequentially to avoid event loop issues in tests
for node_exec_id, review_result in updated_reviews.items():
# Only create auto-approval if:
# 1. This review was approved
# 2. The review requested auto-approval
if review_result.status == ReviewStatus.APPROVED and auto_approve_requests.get(
node_exec_id, False
):
try:
node_exec = await get_node_execution(node_exec_id)
if node_exec:
await create_auto_approval_record(
user_id=user_id,
graph_exec_id=review_result.graph_exec_id,
graph_id=review_result.graph_id,
graph_version=review_result.graph_version,
node_id=node_exec.node_id,
payload=review_result.payload,
)
except Exception as e:
logger.error(
f"Failed to create auto-approval record for {node_exec_id}",
exc_info=e,
)
# Count results
approved_count = sum(
1
@@ -256,37 +157,22 @@ async def process_review_action(
if review.status == ReviewStatus.REJECTED
)
# Resume execution only if ALL pending reviews for this execution have been processed
# Resume execution if we processed some reviews
if updated_reviews:
# Get graph execution ID from any processed review
first_review = next(iter(updated_reviews.values()))
graph_exec_id = first_review.graph_exec_id
# Check if any pending reviews remain for this execution
still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
if not still_has_pending:
# Get the graph_id from any processed review
first_review = next(iter(updated_reviews.values()))
# Resume execution
try:
# Fetch user and settings to build complete execution context
user = await get_user_by_id(user_id)
settings = await get_graph_settings(
user_id=user_id, graph_id=first_review.graph_id
)
# Preserve user's timezone preference when resuming execution
user_timezone = (
user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
)
execution_context = ExecutionContext(
human_in_the_loop_safe_mode=settings.human_in_the_loop_safe_mode,
sensitive_action_safe_mode=settings.sensitive_action_safe_mode,
user_timezone=user_timezone,
)
await add_graph_execution(
graph_id=first_review.graph_id,
user_id=user_id,
graph_exec_id=graph_exec_id,
execution_context=execution_context,
)
logger.info(f"Resumed execution {graph_exec_id}")
except Exception as e:

View File

@@ -1552,7 +1552,7 @@ async def review_store_submission(
# Generate embedding for approved listing (blocking - admin operation)
# Inside transaction: if embedding fails, entire transaction rolls back
embedding_success = await ensure_embedding(
await ensure_embedding(
version_id=store_listing_version_id,
name=store_listing_version.name,
description=store_listing_version.description,
@@ -1560,12 +1560,6 @@ async def review_store_submission(
categories=store_listing_version.categories or [],
tx=tx,
)
if not embedding_success:
raise ValueError(
f"Failed to generate embedding for listing {store_listing_version_id}. "
"This is likely due to OpenAI API being unavailable. "
"Please try again later or contact support if the issue persists."
)
await prisma.models.StoreListing.prisma(tx).update(
where={"id": store_listing_version.StoreListing.id},

View File

@@ -6,7 +6,6 @@ Handles generation and storage of OpenAI embeddings for all content types
"""
import asyncio
import contextvars
import logging
import time
from typing import Any
@@ -22,11 +21,6 @@ from backend.util.json import dumps
logger = logging.getLogger(__name__)
# Context variable to track errors logged in the current task/operation
# This prevents spamming the same error multiple times when processing batches
_logged_errors: contextvars.ContextVar[set[str]] = contextvars.ContextVar(
"_logged_errors"
)
# OpenAI embedding model configuration
EMBEDDING_MODEL = "text-embedding-3-small"
@@ -37,42 +31,6 @@ EMBEDDING_DIM = 1536
EMBEDDING_MAX_TOKENS = 8191
def log_once_per_task(error_key: str, log_fn, message: str, **kwargs) -> bool:
"""
Log an error/warning only once per task/operation to avoid log spam.
Uses contextvars to track what has been logged in the current async context.
Useful when processing batches where the same error might occur for many items.
Args:
error_key: Unique identifier for this error type
log_fn: Logger function to call (e.g., logger.error, logger.warning)
message: Message to log
**kwargs: Additional arguments to pass to log_fn
Returns:
True if the message was logged, False if it was suppressed (already logged)
Example:
log_once_per_task("missing_api_key", logger.error, "API key not set")
"""
# Get current logged errors, or create a new set if this is the first call in this context
logged = _logged_errors.get(None)
if logged is None:
logged = set()
_logged_errors.set(logged)
if error_key in logged:
return False
# Log the message with a note that it will only appear once
log_fn(f"{message} (This message will only be shown once per task.)", **kwargs)
# Mark as logged
logged.add(error_key)
return True
def build_searchable_text(
name: str,
description: str,
@@ -105,53 +63,42 @@ def build_searchable_text(
return " ".join(parts)
async def generate_embedding(text: str) -> list[float] | None:
async def generate_embedding(text: str) -> list[float]:
"""
Generate embedding for text using OpenAI API.
Returns None if embedding generation fails.
Fail-fast: no retries to maintain consistency with approval flow.
Raises exceptions on failure - caller should handle.
"""
try:
client = get_openai_client()
if not client:
log_once_per_task(
"openai_api_key_missing",
logger.error,
"openai_internal_api_key not set, cannot generate embeddings",
)
return None
client = get_openai_client()
if not client:
raise RuntimeError("openai_internal_api_key not set, cannot generate embedding")
# Truncate text to token limit using tiktoken
# Character-based truncation is insufficient because token ratios vary by content type
enc = encoding_for_model(EMBEDDING_MODEL)
tokens = enc.encode(text)
if len(tokens) > EMBEDDING_MAX_TOKENS:
tokens = tokens[:EMBEDDING_MAX_TOKENS]
truncated_text = enc.decode(tokens)
logger.info(
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
)
else:
truncated_text = text
start_time = time.time()
response = await client.embeddings.create(
model=EMBEDDING_MODEL,
input=truncated_text,
)
latency_ms = (time.time() - start_time) * 1000
embedding = response.data[0].embedding
# Truncate text to token limit using tiktoken
# Character-based truncation is insufficient because token ratios vary by content type
enc = encoding_for_model(EMBEDDING_MODEL)
tokens = enc.encode(text)
if len(tokens) > EMBEDDING_MAX_TOKENS:
tokens = tokens[:EMBEDDING_MAX_TOKENS]
truncated_text = enc.decode(tokens)
logger.info(
f"Generated embedding: {len(embedding)} dims, "
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
)
return embedding
else:
truncated_text = text
except Exception as e:
logger.error(f"Failed to generate embedding: {e}")
return None
start_time = time.time()
response = await client.embeddings.create(
model=EMBEDDING_MODEL,
input=truncated_text,
)
latency_ms = (time.time() - start_time) * 1000
embedding = response.data[0].embedding
logger.info(
f"Generated embedding: {len(embedding)} dims, "
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
)
return embedding
async def store_embedding(
@@ -190,48 +137,45 @@ async def store_content_embedding(
New function for unified content embedding storage.
Uses raw SQL since Prisma doesn't natively support pgvector.
Raises exceptions on failure - caller should handle.
"""
try:
client = tx if tx else prisma.get_client()
client = tx if tx else prisma.get_client()
# Convert embedding to PostgreSQL vector format
embedding_str = embedding_to_vector_string(embedding)
metadata_json = dumps(metadata or {})
# Convert embedding to PostgreSQL vector format
embedding_str = embedding_to_vector_string(embedding)
metadata_json = dumps(metadata or {})
# Upsert the embedding
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
# Use unqualified ::vector - pgvector is in search_path on all environments
await execute_raw_with_schema(
"""
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
)
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
ON CONFLICT ("contentType", "contentId", "userId")
DO UPDATE SET
"embedding" = $4::vector,
"searchableText" = $5,
"metadata" = $6::jsonb,
"updatedAt" = NOW()
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
""",
content_type,
content_id,
user_id,
embedding_str,
searchable_text,
metadata_json,
client=client,
# Upsert the embedding
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
# Use unqualified ::vector - pgvector is in search_path on all environments
await execute_raw_with_schema(
"""
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
)
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
ON CONFLICT ("contentType", "contentId", "userId")
DO UPDATE SET
"embedding" = $4::vector,
"searchableText" = $5,
"metadata" = $6::jsonb,
"updatedAt" = NOW()
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
""",
content_type,
content_id,
user_id,
embedding_str,
searchable_text,
metadata_json,
client=client,
)
logger.info(f"Stored embedding for {content_type}:{content_id}")
return True
except Exception as e:
logger.error(f"Failed to store embedding for {content_type}:{content_id}: {e}")
return False
logger.info(f"Stored embedding for {content_type}:{content_id}")
return True
async def get_embedding(version_id: str) -> dict[str, Any] | None:
@@ -263,34 +207,31 @@ async def get_content_embedding(
New function for unified content embedding retrieval.
Returns dict with contentType, contentId, embedding, timestamps or None if not found.
Raises exceptions on failure - caller should handle.
"""
try:
result = await query_raw_with_schema(
"""
SELECT
"contentType",
"contentId",
"userId",
"embedding"::text as "embedding",
"searchableText",
"metadata",
"createdAt",
"updatedAt"
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
""",
content_type,
content_id,
user_id,
)
result = await query_raw_with_schema(
"""
SELECT
"contentType",
"contentId",
"userId",
"embedding"::text as "embedding",
"searchableText",
"metadata",
"createdAt",
"updatedAt"
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
""",
content_type,
content_id,
user_id,
)
if result and len(result) > 0:
return result[0]
return None
except Exception as e:
logger.error(f"Failed to get embedding for {content_type}:{content_id}: {e}")
return None
if result and len(result) > 0:
return result[0]
return None
async def ensure_embedding(
@@ -318,51 +259,38 @@ async def ensure_embedding(
tx: Optional transaction client
Returns:
True if embedding exists/was created, False on failure
True if embedding exists/was created
Raises exceptions on failure - caller should handle.
"""
try:
# Check if embedding already exists
if not force:
existing = await get_embedding(version_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for version {version_id} already exists")
return True
# Check if embedding already exists
if not force:
existing = await get_embedding(version_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for version {version_id} already exists")
return True
# Build searchable text for embedding
searchable_text = build_searchable_text(
name, description, sub_heading, categories
)
# Build searchable text for embedding
searchable_text = build_searchable_text(name, description, sub_heading, categories)
# Generate new embedding
embedding = await generate_embedding(searchable_text)
if embedding is None:
log_once_per_task(
"embedding_generation_failed",
logger.warning,
"Could not generate embeddings (missing API key or service unavailable). "
"Embedding generation is disabled for this task.",
)
return False
# Generate new embedding
embedding = await generate_embedding(searchable_text)
# Store the embedding with metadata using new function
metadata = {
"name": name,
"subHeading": sub_heading,
"categories": categories,
}
return await store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=version_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata,
user_id=None, # Store agents are public
tx=tx,
)
except Exception as e:
logger.error(f"Failed to ensure embedding for version {version_id}: {e}")
return False
# Store the embedding with metadata using new function
metadata = {
"name": name,
"subHeading": sub_heading,
"categories": categories,
}
return await store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=version_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata,
user_id=None, # Store agents are public
tx=tx,
)
async def delete_embedding(version_id: str) -> bool:
@@ -572,6 +500,24 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
success = sum(1 for result in results if result is True)
failed = len(results) - success
# Aggregate unique errors to avoid Sentry spam
if failed > 0:
# Group errors by type and message
error_summary: dict[str, int] = {}
for result in results:
if isinstance(result, Exception):
error_key = f"{type(result).__name__}: {str(result)}"
error_summary[error_key] = error_summary.get(error_key, 0) + 1
# Log aggregated error summary
error_details = ", ".join(
f"{error} ({count}x)" for error, count in error_summary.items()
)
logger.error(
f"{content_type.value}: {failed}/{len(results)} embeddings failed. "
f"Errors: {error_details}"
)
results_by_type[content_type.value] = {
"processed": len(missing_items),
"success": success,
@@ -608,11 +554,12 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
}
async def embed_query(query: str) -> list[float] | None:
async def embed_query(query: str) -> list[float]:
"""
Generate embedding for a search query.
Same as generate_embedding but with clearer intent.
Raises exceptions on failure - caller should handle.
"""
return await generate_embedding(query)
@@ -645,43 +592,30 @@ async def ensure_content_embedding(
tx: Optional transaction client
Returns:
True if embedding exists/was created, False on failure
True if embedding exists/was created
Raises exceptions on failure - caller should handle.
"""
try:
# Check if embedding already exists
if not force:
existing = await get_content_embedding(content_type, content_id, user_id)
if existing and existing.get("embedding"):
logger.debug(
f"Embedding for {content_type}:{content_id} already exists"
)
return True
# Check if embedding already exists
if not force:
existing = await get_content_embedding(content_type, content_id, user_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for {content_type}:{content_id} already exists")
return True
# Generate new embedding
embedding = await generate_embedding(searchable_text)
if embedding is None:
log_once_per_task(
"embedding_generation_failed",
logger.warning,
"Could not generate embeddings (missing API key or service unavailable). "
"Embedding generation is disabled for this task.",
)
return False
# Generate new embedding
embedding = await generate_embedding(searchable_text)
# Store the embedding
return await store_content_embedding(
content_type=content_type,
content_id=content_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata or {},
user_id=user_id,
tx=tx,
)
except Exception as e:
logger.error(f"Failed to ensure embedding for {content_type}:{content_id}: {e}")
return False
# Store the embedding
return await store_content_embedding(
content_type=content_type,
content_id=content_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata or {},
user_id=user_id,
tx=tx,
)
async def cleanup_orphaned_embeddings() -> dict[str, Any]:
@@ -908,9 +842,8 @@ async def semantic_search(
limit = 100
# Generate query embedding
query_embedding = await embed_query(query)
if query_embedding is not None:
try:
query_embedding = await embed_query(query)
# Semantic search with embeddings
embedding_str = embedding_to_vector_string(query_embedding)
@@ -961,24 +894,21 @@ async def semantic_search(
"""
)
try:
results = await query_raw_with_schema(sql, *params)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": float(row["similarity"]),
}
for row in results
]
except Exception as e:
logger.error(f"Semantic search failed: {e}")
# Fall through to lexical search below
results = await query_raw_with_schema(sql, *params)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": float(row["similarity"]),
}
for row in results
]
except Exception as e:
logger.warning(f"Semantic search failed, falling back to lexical search: {e}")
# Fallback to lexical search if embeddings unavailable
logger.warning("Falling back to lexical search (embeddings unavailable)")
params_lexical: list[Any] = [limit]
user_filter = ""

View File

@@ -298,17 +298,16 @@ async def test_schema_handling_error_cases():
mock_client.execute_raw.side_effect = Exception("Database error")
mock_get_client.return_value = mock_client
result = await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
searchable_text="test",
metadata=None,
user_id=None,
)
# Should return False on error, not raise
assert result is False
# Should raise exception on error
with pytest.raises(Exception, match="Database error"):
await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
searchable_text="test",
metadata=None,
user_id=None,
)
if __name__ == "__main__":

View File

@@ -80,9 +80,8 @@ async def test_generate_embedding_no_api_key():
) as mock_get_client:
mock_get_client.return_value = None
result = await embeddings.generate_embedding("test text")
assert result is None
with pytest.raises(RuntimeError, match="openai_internal_api_key not set"):
await embeddings.generate_embedding("test text")
@pytest.mark.asyncio(loop_scope="session")
@@ -97,9 +96,8 @@ async def test_generate_embedding_api_error():
) as mock_get_client:
mock_get_client.return_value = mock_client
result = await embeddings.generate_embedding("test text")
assert result is None
with pytest.raises(Exception, match="API Error"):
await embeddings.generate_embedding("test text")
@pytest.mark.asyncio(loop_scope="session")
@@ -173,11 +171,10 @@ async def test_store_embedding_database_error(mocker):
embedding = [0.1, 0.2, 0.3]
result = await embeddings.store_embedding(
version_id="test-version-id", embedding=embedding, tx=mock_client
)
assert result is False
with pytest.raises(Exception, match="Database error"):
await embeddings.store_embedding(
version_id="test-version-id", embedding=embedding, tx=mock_client
)
@pytest.mark.asyncio(loop_scope="session")
@@ -277,17 +274,16 @@ async def test_ensure_embedding_create_new(mock_get, mock_store, mock_generate):
async def test_ensure_embedding_generation_fails(mock_get, mock_generate):
"""Test ensure_embedding when generation fails."""
mock_get.return_value = None
mock_generate.return_value = None
mock_generate.side_effect = Exception("Generation failed")
result = await embeddings.ensure_embedding(
version_id="test-id",
name="Test",
description="Test description",
sub_heading="Test heading",
categories=["test"],
)
assert result is False
with pytest.raises(Exception, match="Generation failed"):
await embeddings.ensure_embedding(
version_id="test-id",
name="Test",
description="Test description",
sub_heading="Test heading",
categories=["test"],
)
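A self-contained sketch of the testing pattern used above: stub the async dependency with side_effect and assert the exception via pytest.raises instead of checking for a None or False return. The helper below is a stand-in, not the project's ensure_embedding.

    import pytest
    from unittest.mock import AsyncMock

    async def ensure_embedding_like(generate_fn) -> bool:
        # Stand-in helper with no internal try/except: failures propagate.
        await generate_fn("some searchable text")
        return True

    @pytest.mark.asyncio
    async def test_generation_error_propagates():
        generate_fn = AsyncMock(side_effect=Exception("Generation failed"))
        with pytest.raises(Exception, match="Generation failed"):
            await ensure_embedding_like(generate_fn)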
@pytest.mark.asyncio(loop_scope="session")

View File

@@ -186,13 +186,12 @@ async def unified_hybrid_search(
offset = (page - 1) * page_size
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation if embedding unavailable
if query_embedding is None or not query_embedding:
# Generate query embedding with graceful degradation
try:
query_embedding = await embed_query(query)
except Exception as e:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search. "
f"Failed to generate query embedding - falling back to lexical-only search: {e}. "
"Check that openai_internal_api_key is configured and OpenAI API is accessible."
)
query_embedding = [0.0] * EMBEDDING_DIM
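A small self-contained illustration of why the all-zeros fallback vector is harmless under a dot-product-style combined score. The real ranking happens in SQL and may use a different distance; the weights and dimension below are assumptions for illustration only.

    EMBEDDING_DIM = 1536  # assumed dimension; the project defines its own constant

    def combined_score(query_vec, doc_vec, lexical_score, w_sem=0.5, w_lex=0.5):
        semantic = sum(q * d for q, d in zip(query_vec, doc_vec))
        return w_sem * semantic + w_lex * lexical_score

    zero_query = [0.0] * EMBEDDING_DIM
    doc_vec = [0.1] * EMBEDDING_DIM

    # With a zero query vector the semantic term is 0, so ordering is driven
    # entirely by the lexical component.
    assert combined_score(zero_query, doc_vec, lexical_score=0.8) == 0.4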
@@ -464,13 +463,12 @@ async def hybrid_search(
offset = (page - 1) * page_size
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation
if query_embedding is None or not query_embedding:
# Generate query embedding with graceful degradation
try:
query_embedding = await embed_query(query)
except Exception as e:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search."
f"Failed to generate query embedding - falling back to lexical-only search: {e}"
)
query_embedding = [0.0] * EMBEDDING_DIM
total_non_semantic = (

View File

@@ -172,8 +172,8 @@ async def test_hybrid_search_without_embeddings():
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
# Simulate embedding failure
mock_embed.return_value = None
# Simulate embedding failure by raising exception
mock_embed.side_effect = Exception("Embedding generation failed")
mock_query.return_value = mock_results
# Should NOT raise - graceful degradation
@@ -613,7 +613,9 @@ async def test_unified_hybrid_search_graceful_degradation():
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = mock_results
mock_embed.return_value = None # Embedding failure
mock_embed.side_effect = Exception(
"Embedding generation failed"
) # Embedding failure
# Should NOT raise - graceful degradation
results, total = await unified_hybrid_search(

View File

@@ -116,7 +116,6 @@ class PrintToConsoleBlock(Block):
input_schema=PrintToConsoleBlock.Input,
output_schema=PrintToConsoleBlock.Output,
test_input={"text": "Hello, World!"},
is_sensitive_action=True,
test_output=[
("output", "Hello, World!"),
("status", "printed"),

View File

@@ -9,7 +9,7 @@ from typing import Any, Optional
from prisma.enums import ReviewStatus
from pydantic import BaseModel
from backend.data.execution import ExecutionStatus
from backend.data.execution import ExecutionContext, ExecutionStatus
from backend.data.human_review import ReviewResult
from backend.executor.manager import async_update_node_execution_status
from backend.util.clients import get_database_manager_async_client
@@ -28,11 +28,6 @@ class ReviewDecision(BaseModel):
class HITLReviewHelper:
"""Helper class for Human-In-The-Loop review operations."""
@staticmethod
async def check_approval(**kwargs) -> Optional[ReviewResult]:
"""Check if there's an existing approval for this node execution."""
return await get_database_manager_async_client().check_approval(**kwargs)
@staticmethod
async def get_or_create_human_review(**kwargs) -> Optional[ReviewResult]:
"""Create or retrieve a human review from the database."""
@@ -60,11 +55,11 @@ class HITLReviewHelper:
async def _handle_review_request(
input_data: Any,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
block_name: str = "Block",
editable: bool = False,
) -> Optional[ReviewResult]:
@@ -74,11 +69,11 @@ class HITLReviewHelper:
Args:
input_data: The input data to be reviewed
user_id: ID of the user requesting the review
node_id: ID of the node in the graph definition
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
graph_id: ID of the graph
graph_version: Version of the graph
execution_context: Current execution context
block_name: Name of the block requesting review
editable: Whether the reviewer can edit the data
@@ -88,41 +83,15 @@ class HITLReviewHelper:
Raises:
Exception: If review creation or status update fails
"""
# Note: Safe mode checks (human_in_the_loop_safe_mode, sensitive_action_safe_mode)
# are handled by the caller:
# - HITL blocks check human_in_the_loop_safe_mode in their run() method
# - Sensitive action blocks check sensitive_action_safe_mode in is_block_exec_need_review()
# This function only handles checking for existing approvals.
# Check if this node has already been approved (normal or auto-approval)
if approval_result := await HITLReviewHelper.check_approval(
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
node_id=node_id,
user_id=user_id,
input_data=input_data,
):
# Skip review if safe mode is disabled - return auto-approved result
if not execution_context.human_in_the_loop_safe_mode:
logger.info(
f"Block {block_name} skipping review for node {node_exec_id} - "
f"found existing approval"
)
# Return a new ReviewResult with the current node_exec_id but approved status
# For auto-approvals, always use current input_data
# For normal approvals, use approval_result.data unless it's None
is_auto_approval = approval_result.node_exec_id != node_exec_id
approved_data = (
input_data
if is_auto_approval
else (
approval_result.data
if approval_result.data is not None
else input_data
)
f"Block {block_name} skipping review for node {node_exec_id} - safe mode disabled"
)
return ReviewResult(
data=approved_data,
data=input_data,
status=ReviewStatus.APPROVED,
message=approval_result.message,
message="Auto-approved (safe mode disabled)",
processed=True,
node_exec_id=node_exec_id,
)
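A self-contained sketch of the early return shown above: with HITL safe mode disabled in the execution context, the request is auto-approved with the current input data instead of creating a pending review. The classes below are stand-ins, not the project's ExecutionContext, ReviewResult, or HITLReviewHelper.

    from dataclasses import dataclass
    from typing import Any, Optional

    @dataclass
    class Ctx:  # stand-in for ExecutionContext
        human_in_the_loop_safe_mode: bool

    @dataclass
    class Result:  # stand-in for ReviewResult
        data: Any
        status: str
        message: str
        processed: bool
        node_exec_id: str

    async def handle_review_request(input_data: Any, node_exec_id: str, ctx: Ctx) -> Optional[Result]:
        if not ctx.human_in_the_loop_safe_mode:
            # Safe mode off: skip review entirely and hand the current
            # input data straight back to the block.
            return Result(
                data=input_data,
                status="APPROVED",
                message="Auto-approved (safe mode disabled)",
                processed=True,
                node_exec_id=node_exec_id,
            )
        # Safe mode on: the real helper creates or fetches a pending review here.
        return None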
@@ -160,11 +129,11 @@ class HITLReviewHelper:
async def handle_review_decision(
input_data: Any,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
block_name: str = "Block",
editable: bool = False,
) -> Optional[ReviewDecision]:
@@ -174,11 +143,11 @@ class HITLReviewHelper:
Args:
input_data: The input data to be reviewed
user_id: ID of the user requesting the review
node_id: ID of the node in the graph definition
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
graph_id: ID of the graph
graph_version: Version of the graph
execution_context: Current execution context
block_name: Name of the block requesting review
editable: Whether the reviewer can edit the data
@@ -189,11 +158,11 @@ class HITLReviewHelper:
review_result = await HITLReviewHelper._handle_review_request(
input_data=input_data,
user_id=user_id,
node_id=node_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_context=execution_context,
block_name=block_name,
editable=editable,
)

View File

@@ -97,7 +97,6 @@ class HumanInTheLoopBlock(Block):
input_data: Input,
*,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
@@ -116,11 +115,11 @@ class HumanInTheLoopBlock(Block):
decision = await self.handle_review_decision(
input_data=input_data.data,
user_id=user_id,
node_id=node_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_context=execution_context,
block_name=self.name,
editable=input_data.editable,
)

View File

@@ -441,7 +441,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
static_output: bool = False,
block_type: BlockType = BlockType.STANDARD,
webhook_config: Optional[BlockWebhookConfig | BlockManualWebhookConfig] = None,
is_sensitive_action: bool = False,
):
"""
Initialize the block with the given schema.
@@ -474,8 +473,8 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
self.static_output = static_output
self.block_type = block_type
self.webhook_config = webhook_config
self.is_sensitive_action = is_sensitive_action
self.execution_stats: NodeExecutionStats = NodeExecutionStats()
self.is_sensitive_action: bool = False
if self.webhook_config:
if isinstance(self.webhook_config, BlockWebhookConfig):
@@ -623,7 +622,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
input_data: BlockInput,
*,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
@@ -650,11 +648,11 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
decision = await HITLReviewHelper.handle_review_decision(
input_data=input_data,
user_id=user_id,
node_id=node_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_context=execution_context,
block_name=self.name,
editable=True,
)

View File

@@ -6,7 +6,7 @@ Handles all database operations for pending human reviews.
import asyncio
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from typing import Optional
from prisma.enums import ReviewStatus
from prisma.models import PendingHumanReview
@@ -17,12 +17,8 @@ from backend.api.features.executions.review.model import (
PendingHumanReviewModel,
SafeJsonData,
)
from backend.data.execution import get_graph_execution_meta
from backend.util.json import SafeJson
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
@@ -36,125 +32,6 @@ class ReviewResult(BaseModel):
node_exec_id: str
def get_auto_approve_key(graph_exec_id: str, node_id: str) -> str:
"""Generate the special nodeExecId key for auto-approval records."""
return f"auto_approve_{graph_exec_id}_{node_id}"
async def check_approval(
node_exec_id: str,
graph_exec_id: str,
node_id: str,
user_id: str,
input_data: SafeJsonData | None = None,
) -> Optional[ReviewResult]:
"""
Check if there's an existing approval for this node execution.
Checks both:
1. Normal approval by node_exec_id (previous run of the same node execution)
2. Auto-approval by special key pattern "auto_approve_{graph_exec_id}_{node_id}"
Args:
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
node_id: ID of the node definition (not execution)
user_id: ID of the user (for data isolation)
input_data: Current input data (used for auto-approvals to avoid stale data)
Returns:
ReviewResult if approval found (either normal or auto), None otherwise
"""
auto_approve_key = get_auto_approve_key(graph_exec_id, node_id)
# Check for either normal approval or auto-approval in a single query
existing_review = await PendingHumanReview.prisma().find_first(
where={
"OR": [
{"nodeExecId": node_exec_id},
{"nodeExecId": auto_approve_key},
],
"status": ReviewStatus.APPROVED,
"userId": user_id,
},
)
if existing_review:
is_auto_approval = existing_review.nodeExecId == auto_approve_key
logger.info(
f"Found {'auto-' if is_auto_approval else ''}approval for node {node_id} "
f"(exec: {node_exec_id}) in execution {graph_exec_id}"
)
# For auto-approvals, use current input_data to avoid replaying stale payload
# For normal approvals, use the stored payload (which may have been edited)
return ReviewResult(
data=(
input_data
if is_auto_approval and input_data is not None
else existing_review.payload
),
status=ReviewStatus.APPROVED,
message=(
"Auto-approved (user approved all future actions for this node)"
if is_auto_approval
else existing_review.reviewMessage or ""
),
processed=True,
node_exec_id=existing_review.nodeExecId,
)
return None
async def create_auto_approval_record(
user_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
node_id: str,
payload: SafeJsonData,
) -> None:
"""
Create an auto-approval record for a node in this execution.
This is stored as a PendingHumanReview with a special nodeExecId pattern
and status=APPROVED, so future executions of the same node can skip review.
Raises:
ValueError: If the graph execution doesn't belong to the user
"""
# Validate that the graph execution belongs to this user (defense in depth)
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise ValueError(
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
)
auto_approve_key = get_auto_approve_key(graph_exec_id, node_id)
await PendingHumanReview.prisma().upsert(
where={"nodeExecId": auto_approve_key},
data={
"create": {
"nodeExecId": auto_approve_key,
"userId": user_id,
"graphExecId": graph_exec_id,
"graphId": graph_id,
"graphVersion": graph_version,
"payload": SafeJson(payload),
"instructions": "Auto-approval record",
"editable": False,
"status": ReviewStatus.APPROVED,
"processed": True,
"reviewedAt": datetime.now(timezone.utc),
},
"update": {}, # Already exists, no update needed
},
)
async def get_or_create_human_review(
user_id: str,
node_exec_id: str,
@@ -231,38 +108,6 @@ async def get_or_create_human_review(
)
async def get_pending_review_by_node_exec_id(
node_exec_id: str, user_id: str
) -> Optional["PendingHumanReviewModel"]:
"""
Get a pending review by its node execution ID.
Args:
node_exec_id: The node execution ID to look up
user_id: User ID for authorization (only returns if review belongs to this user)
Returns:
The pending review if found and belongs to user, None otherwise
"""
review = await PendingHumanReview.prisma().find_first(
where={
"nodeExecId": node_exec_id,
"userId": user_id,
"status": ReviewStatus.WAITING,
}
)
if not review:
return None
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
return PendingHumanReviewModel.from_db(review, node_id=node_id)
async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
"""
Check if a graph execution has any pending reviews.
@@ -292,11 +137,8 @@ async def get_pending_reviews_for_user(
page_size: Number of reviews per page
Returns:
List of pending review models with node_id included
List of pending review models
"""
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
# Calculate offset for pagination
offset = (page - 1) * page_size
@@ -307,14 +149,7 @@ async def get_pending_reviews_for_user(
take=page_size,
)
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
return [PendingHumanReviewModel.from_db(review) for review in reviews]
async def get_pending_reviews_for_execution(
@@ -328,11 +163,8 @@ async def get_pending_reviews_for_execution(
user_id: User ID for security validation
Returns:
List of pending review models with node_id included
List of pending review models
"""
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
reviews = await PendingHumanReview.prisma().find_many(
where={
"userId": user_id,
@@ -342,14 +174,7 @@ async def get_pending_reviews_for_execution(
order={"createdAt": "asc"},
)
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
return [PendingHumanReviewModel.from_db(review) for review in reviews]
async def process_all_reviews_for_execution(
@@ -419,19 +244,11 @@ async def process_all_reviews_for_execution(
# Note: Execution resumption is now handled at the API layer after ALL reviews
# for an execution are processed (both approved and rejected)
# Fetch node_id for each review and return as dict for easy access
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
result = {}
for review in updated_reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
return result
# Return as dict for easy access
return {
review.nodeExecId: PendingHumanReviewModel.from_db(review)
for review in updated_reviews
}
async def update_review_processed_status(node_exec_id: str, processed: bool) -> None:
@@ -439,44 +256,3 @@ async def update_review_processed_status(node_exec_id: str, processed: bool) ->
await PendingHumanReview.prisma().update(
where={"nodeExecId": node_exec_id}, data={"processed": processed}
)
async def cancel_pending_reviews_for_execution(graph_exec_id: str, user_id: str) -> int:
"""
Cancel all pending reviews for a graph execution (e.g., when execution is stopped).
Marks all WAITING reviews as REJECTED with a message indicating the execution was stopped.
Args:
graph_exec_id: The graph execution ID
user_id: User ID who owns the execution (for security validation)
Returns:
Number of reviews cancelled
Raises:
ValueError: If the graph execution doesn't belong to the user
"""
# Validate user ownership before cancelling reviews
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise ValueError(
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
)
result = await PendingHumanReview.prisma().update_many(
where={
"graphExecId": graph_exec_id,
"userId": user_id,
"status": ReviewStatus.WAITING,
},
data={
"status": ReviewStatus.REJECTED,
"reviewMessage": "Execution was stopped by user",
"processed": True,
"reviewedAt": datetime.now(timezone.utc),
},
)
return result

View File

@@ -46,8 +46,8 @@ async def test_get_or_create_human_review_new(
sample_db_review.status = ReviewStatus.WAITING
sample_db_review.processed = False
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_prisma.return_value.upsert = AsyncMock(return_value=sample_db_review)
mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
result = await get_or_create_human_review(
user_id="test-user-123",
@@ -75,8 +75,8 @@ async def test_get_or_create_human_review_approved(
sample_db_review.processed = False
sample_db_review.reviewMessage = "Looks good"
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_prisma.return_value.upsert = AsyncMock(return_value=sample_db_review)
mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
result = await get_or_create_human_review(
user_id="test-user-123",
@@ -131,19 +131,10 @@ async def test_get_pending_reviews_for_user(
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await get_pending_reviews_for_user("test_user", page=2, page_size=10)
assert len(result) == 1
assert result[0].node_exec_id == "test_node_123"
assert result[0].node_id == "test_node_def_789"
# Verify pagination parameters
call_args = mock_find_many.return_value.find_many.call_args
@@ -160,21 +151,12 @@ async def test_get_pending_reviews_for_execution(
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await get_pending_reviews_for_execution(
"test_graph_exec_456", "test-user-123"
)
assert len(result) == 1
assert result[0].graph_exec_id == "test_graph_exec_456"
assert result[0].node_id == "test_node_def_789"
# Verify it filters by execution and user
call_args = mock_find_many.return_value.find_many.call_args
@@ -219,14 +201,6 @@ async def test_process_all_reviews_for_execution_success(
new=AsyncMock(return_value=[updated_review]),
)
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await process_all_reviews_for_execution(
user_id="test-user-123",
review_decisions={
@@ -237,7 +211,6 @@ async def test_process_all_reviews_for_execution_success(
assert len(result) == 1
assert "test_node_123" in result
assert result["test_node_123"].status == ReviewStatus.APPROVED
assert result["test_node_123"].node_id == "test_node_def_789"
@pytest.mark.asyncio
@@ -356,14 +329,6 @@ async def test_process_all_reviews_mixed_approval_rejection(
new=AsyncMock(return_value=[approved_review, rejected_review]),
)
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await process_all_reviews_for_execution(
user_id="test-user-123",
review_decisions={
@@ -375,5 +340,3 @@ async def test_process_all_reviews_mixed_approval_rejection(
assert len(result) == 2
assert "test_node_123" in result
assert "test_node_456" in result
assert result["test_node_123"].node_id == "test_node_def_789"
assert result["test_node_456"].node_id == "test_node_def_789"

View File

@@ -50,8 +50,6 @@ from backend.data.graph import (
validate_graph_execution_permissions,
)
from backend.data.human_review import (
cancel_pending_reviews_for_execution,
check_approval,
get_or_create_human_review,
has_pending_reviews_for_graph_exec,
update_review_processed_status,
@@ -192,8 +190,6 @@ class DatabaseManager(AppService):
get_user_notification_preference = _(get_user_notification_preference)
# Human In The Loop
cancel_pending_reviews_for_execution = _(cancel_pending_reviews_for_execution)
check_approval = _(check_approval)
get_or_create_human_review = _(get_or_create_human_review)
has_pending_reviews_for_graph_exec = _(has_pending_reviews_for_graph_exec)
update_review_processed_status = _(update_review_processed_status)
@@ -317,8 +313,6 @@ class DatabaseManagerAsyncClient(AppServiceClient):
set_execution_kv_data = d.set_execution_kv_data
# Human In The Loop
cancel_pending_reviews_for_execution = d.cancel_pending_reviews_for_execution
check_approval = d.check_approval
get_or_create_human_review = d.get_or_create_human_review
update_review_processed_status = d.update_review_processed_status

View File

@@ -10,7 +10,6 @@ from pydantic import BaseModel, JsonValue, ValidationError
from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data import human_review as human_review_db
from backend.data import onboarding as onboarding_db
from backend.data import user as user_db
from backend.data.block import (
@@ -750,27 +749,9 @@ async def stop_graph_execution(
if graph_exec.status in [
ExecutionStatus.QUEUED,
ExecutionStatus.INCOMPLETE,
ExecutionStatus.REVIEW,
]:
# If the graph is queued/incomplete/paused for review, terminate immediately
# No need to wait for executor since it's not actively running
# If graph is in REVIEW status, clean up pending reviews before terminating
if graph_exec.status == ExecutionStatus.REVIEW:
# Use human_review_db if Prisma connected, else database manager
review_db = (
human_review_db
if prisma.is_connected()
else get_database_manager_async_client()
)
# Mark all pending reviews as rejected/cancelled
cancelled_count = await review_db.cancel_pending_reviews_for_execution(
graph_exec_id, user_id
)
logger.info(
f"Cancelled {cancelled_count} pending review(s) for stopped execution {graph_exec_id}"
)
# If the graph is still on the queue, we can prevent them from being executed
# by setting the status to TERMINATED.
graph_exec.status = ExecutionStatus.TERMINATED
await asyncio.gather(
@@ -906,28 +887,9 @@ async def add_graph_execution(
nodes_to_skip=nodes_to_skip,
execution_context=execution_context,
)
logger.info(f"Queueing execution {graph_exec.id}")
# Update execution status to QUEUED BEFORE publishing to prevent race condition
# where two concurrent requests could both publish the same execution
updated_exec = await edb.update_graph_execution_stats(
graph_exec_id=graph_exec.id,
status=ExecutionStatus.QUEUED,
)
# Verify the status update succeeded (prevents duplicate queueing in race conditions)
# If another request already updated the status, this execution will not be QUEUED
if not updated_exec or updated_exec.status != ExecutionStatus.QUEUED:
logger.warning(
f"Skipping queue publish for execution {graph_exec.id} - "
f"status update failed or execution already queued by another request"
)
return graph_exec
graph_exec.status = ExecutionStatus.QUEUED
logger.info(f"Publishing execution {graph_exec.id} to execution queue")
# Publish to execution queue for executor to pick up
# This happens AFTER status update to ensure only one request publishes
exec_queue = await get_async_execution_queue()
await exec_queue.publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
@@ -935,6 +897,13 @@ async def add_graph_execution(
exchange=GRAPH_EXECUTION_EXCHANGE,
)
logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue")
# Update execution status to QUEUED
graph_exec.status = ExecutionStatus.QUEUED
await edb.update_graph_execution_stats(
graph_exec_id=graph_exec.id,
status=graph_exec.status,
)
except BaseException as e:
err = str(e) or type(e).__name__
if not graph_exec:

View File

@@ -4,7 +4,6 @@ import pytest
from pytest_mock import MockerFixture
from backend.data.dynamic_fields import merge_execution_input, parse_execution_output
from backend.data.execution import ExecutionStatus
from backend.util.mock import MockObject
@@ -347,7 +346,6 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec.id = "execution-id-123"
mock_graph_exec.node_executions = [] # Add this to avoid AttributeError
mock_graph_exec.status = ExecutionStatus.QUEUED # Required for race condition check
mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock()
# Mock the queue and event bus
@@ -613,7 +611,6 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec.id = "execution-id-123"
mock_graph_exec.node_executions = []
mock_graph_exec.status = ExecutionStatus.QUEUED # Required for race condition check
# Track what's passed to to_graph_execution_entry
captured_kwargs = {}
@@ -673,232 +670,3 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
# Verify nodes_to_skip was passed to to_graph_execution_entry
assert "nodes_to_skip" in captured_kwargs
assert captured_kwargs["nodes_to_skip"] == nodes_to_skip
@pytest.mark.asyncio
async def test_stop_graph_execution_in_review_status_cancels_pending_reviews(
mocker: MockerFixture,
):
"""Test that stopping an execution in REVIEW status cancels pending reviews."""
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
from backend.executor.utils import stop_graph_execution
user_id = "test-user"
graph_exec_id = "test-exec-123"
# Mock graph execution in REVIEW status
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_graph_exec.id = graph_exec_id
mock_graph_exec.status = ExecutionStatus.REVIEW
# Mock dependencies
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_queue_client = mocker.AsyncMock()
mock_get_queue.return_value = mock_queue_client
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = True
mock_human_review_db = mocker.patch("backend.executor.utils.human_review_db")
mock_human_review_db.cancel_pending_reviews_for_execution = mocker.AsyncMock(
return_value=2 # 2 reviews cancelled
)
mock_execution_db = mocker.patch("backend.executor.utils.execution_db")
mock_execution_db.get_graph_execution_meta = mocker.AsyncMock(
return_value=mock_graph_exec
)
mock_execution_db.update_graph_execution_stats = mocker.AsyncMock()
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
mock_get_event_bus.return_value = mock_event_bus
mock_get_child_executions = mocker.patch(
"backend.executor.utils._get_child_executions"
)
mock_get_child_executions.return_value = [] # No children
# Call stop_graph_execution with timeout to allow status check
await stop_graph_execution(
user_id=user_id,
graph_exec_id=graph_exec_id,
wait_timeout=1.0, # Wait to allow status check
cascade=True,
)
# Verify pending reviews were cancelled
mock_human_review_db.cancel_pending_reviews_for_execution.assert_called_once_with(
graph_exec_id, user_id
)
# Verify execution status was updated to TERMINATED
mock_execution_db.update_graph_execution_stats.assert_called_once()
call_kwargs = mock_execution_db.update_graph_execution_stats.call_args[1]
assert call_kwargs["graph_exec_id"] == graph_exec_id
assert call_kwargs["status"] == ExecutionStatus.TERMINATED
@pytest.mark.asyncio
async def test_stop_graph_execution_with_database_manager_when_prisma_disconnected(
mocker: MockerFixture,
):
"""Test that stop uses database manager when Prisma is not connected."""
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
from backend.executor.utils import stop_graph_execution
user_id = "test-user"
graph_exec_id = "test-exec-456"
# Mock graph execution in REVIEW status
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_graph_exec.id = graph_exec_id
mock_graph_exec.status = ExecutionStatus.REVIEW
# Mock dependencies
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_queue_client = mocker.AsyncMock()
mock_get_queue.return_value = mock_queue_client
# Prisma is NOT connected
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = False
# Mock database manager client
mock_get_db_manager = mocker.patch(
"backend.executor.utils.get_database_manager_async_client"
)
mock_db_manager = mocker.AsyncMock()
mock_db_manager.get_graph_execution_meta = mocker.AsyncMock(
return_value=mock_graph_exec
)
mock_db_manager.cancel_pending_reviews_for_execution = mocker.AsyncMock(
return_value=3 # 3 reviews cancelled
)
mock_db_manager.update_graph_execution_stats = mocker.AsyncMock()
mock_get_db_manager.return_value = mock_db_manager
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
mock_get_event_bus.return_value = mock_event_bus
mock_get_child_executions = mocker.patch(
"backend.executor.utils._get_child_executions"
)
mock_get_child_executions.return_value = [] # No children
# Call stop_graph_execution with timeout
await stop_graph_execution(
user_id=user_id,
graph_exec_id=graph_exec_id,
wait_timeout=1.0,
cascade=True,
)
# Verify database manager was used for cancel_pending_reviews
mock_db_manager.cancel_pending_reviews_for_execution.assert_called_once_with(
graph_exec_id, user_id
)
# Verify execution status was updated via database manager
mock_db_manager.update_graph_execution_stats.assert_called_once()
@pytest.mark.asyncio
async def test_stop_graph_execution_cascades_to_child_with_reviews(
mocker: MockerFixture,
):
"""Test that stopping parent execution cascades to children and cancels their reviews."""
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
from backend.executor.utils import stop_graph_execution
user_id = "test-user"
parent_exec_id = "parent-exec"
child_exec_id = "child-exec"
# Mock parent execution in RUNNING status
mock_parent_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_parent_exec.id = parent_exec_id
mock_parent_exec.status = ExecutionStatus.RUNNING
# Mock child execution in REVIEW status
mock_child_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_child_exec.id = child_exec_id
mock_child_exec.status = ExecutionStatus.REVIEW
# Mock dependencies
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_queue_client = mocker.AsyncMock()
mock_get_queue.return_value = mock_queue_client
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = True
mock_human_review_db = mocker.patch("backend.executor.utils.human_review_db")
mock_human_review_db.cancel_pending_reviews_for_execution = mocker.AsyncMock(
return_value=1 # 1 child review cancelled
)
# Mock execution_db to return different status based on which execution is queried
mock_execution_db = mocker.patch("backend.executor.utils.execution_db")
# Track call count to simulate status transition
call_count = {"count": 0}
async def get_exec_meta_side_effect(execution_id, user_id):
call_count["count"] += 1
if execution_id == parent_exec_id:
# After a few calls (child processing happens), transition parent to TERMINATED
# This simulates the executor service processing the stop request
if call_count["count"] > 3:
mock_parent_exec.status = ExecutionStatus.TERMINATED
return mock_parent_exec
elif execution_id == child_exec_id:
return mock_child_exec
return None
mock_execution_db.get_graph_execution_meta = mocker.AsyncMock(
side_effect=get_exec_meta_side_effect
)
mock_execution_db.update_graph_execution_stats = mocker.AsyncMock()
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
mock_get_event_bus.return_value = mock_event_bus
# Mock _get_child_executions to return the child
mock_get_child_executions = mocker.patch(
"backend.executor.utils._get_child_executions"
)
def get_children_side_effect(parent_id):
if parent_id == parent_exec_id:
return [mock_child_exec]
return []
mock_get_child_executions.side_effect = get_children_side_effect
# Call stop_graph_execution on parent with cascade=True
await stop_graph_execution(
user_id=user_id,
graph_exec_id=parent_exec_id,
wait_timeout=1.0,
cascade=True,
)
# Verify child reviews were cancelled
mock_human_review_db.cancel_pending_reviews_for_execution.assert_called_once_with(
child_exec_id, user_id
)
# Verify both parent and child status updates
assert mock_execution_db.update_graph_execution_stats.call_count >= 1

View File

@@ -1,4 +1,3 @@
import asyncio
import inspect
import logging
import time
@@ -59,11 +58,6 @@ class SpinTestServer:
self.db_api.__exit__(exc_type, exc_val, exc_tb)
self.notif_manager.__exit__(exc_type, exc_val, exc_tb)
# Give services time to fully shut down
# This prevents event loop issues where services haven't fully cleaned up
# before the next test starts
await asyncio.sleep(0.5)
def setup_dependency_overrides(self):
# Override get_user_id for testing
self.agent_server.set_test_dependency_overrides(

View File

@@ -1,7 +0,0 @@
-- Remove NodeExecution foreign key from PendingHumanReview
-- The nodeExecId column remains as the primary key, but we remove the FK constraint
-- to AgentNodeExecution since PendingHumanReview records can persist after node
-- execution records are deleted.
-- Drop foreign key constraint that linked PendingHumanReview.nodeExecId to AgentNodeExecution.id
ALTER TABLE "PendingHumanReview" DROP CONSTRAINT IF EXISTS "PendingHumanReview_nodeExecId_fkey";

View File

@@ -517,6 +517,8 @@ model AgentNodeExecution {
stats Json?
PendingHumanReview PendingHumanReview?
@@index([agentGraphExecutionId, agentNodeId, executionStatus])
@@index([agentNodeId, executionStatus])
@@index([addedTime, queuedTime])
@@ -565,7 +567,6 @@ enum ReviewStatus {
}
// Pending human reviews for Human-in-the-loop blocks
// Also stores auto-approval records with special nodeExecId patterns (e.g., "auto_approve_{graph_exec_id}_{node_id}")
model PendingHumanReview {
nodeExecId String @id
userId String
@@ -584,6 +585,7 @@ model PendingHumanReview {
reviewedAt DateTime?
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade)
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
@@unique([nodeExecId]) // One pending review per node execution

View File

@@ -86,6 +86,7 @@ export function FloatingSafeModeToggle({
const {
currentHITLSafeMode,
showHITLToggle,
isHITLStateUndetermined,
handleHITLToggle,
currentSensitiveActionSafeMode,
showSensitiveActionToggle,
@@ -98,9 +99,16 @@ export function FloatingSafeModeToggle({
return null;
}
const showHITL = showHITLToggle && !isHITLStateUndetermined;
const showSensitive = showSensitiveActionToggle;
if (!showHITL && !showSensitive) {
return null;
}
return (
<div className={cn("fixed z-50 flex flex-col gap-2", className)}>
{showHITLToggle && (
{showHITL && (
<SafeModeButton
isEnabled={currentHITLSafeMode}
label="Human in the loop block approval"
@@ -111,7 +119,7 @@ export function FloatingSafeModeToggle({
fullWidth={fullWidth}
/>
)}
{showSensitiveActionToggle && (
{showSensitive && (
<SafeModeButton
isEnabled={currentSensitiveActionSafeMode}
label="Sensitive actions blocks approval"

View File

@@ -14,10 +14,6 @@ import {
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { useEffect, useRef, useState } from "react";
import { ScheduleAgentModal } from "../ScheduleAgentModal/ScheduleAgentModal";
import {
AIAgentSafetyPopup,
useAIAgentSafetyPopup,
} from "./components/AIAgentSafetyPopup/AIAgentSafetyPopup";
import { ModalHeader } from "./components/ModalHeader/ModalHeader";
import { ModalRunSection } from "./components/ModalRunSection/ModalRunSection";
import { RunActions } from "./components/RunActions/RunActions";
@@ -87,18 +83,8 @@ export function RunAgentModal({
const [isScheduleModalOpen, setIsScheduleModalOpen] = useState(false);
const [hasOverflow, setHasOverflow] = useState(false);
const [isSafetyPopupOpen, setIsSafetyPopupOpen] = useState(false);
const [pendingRunAction, setPendingRunAction] = useState<(() => void) | null>(
null,
);
const contentRef = useRef<HTMLDivElement>(null);
const { shouldShowPopup, dismissPopup } = useAIAgentSafetyPopup(
agent.id,
agent.has_sensitive_action,
agent.has_human_in_the_loop,
);
const hasAnySetupFields =
Object.keys(agentInputFields || {}).length > 0 ||
Object.keys(agentCredentialsInputFields || {}).length > 0;
@@ -179,24 +165,6 @@ export function RunAgentModal({
onScheduleCreated?.(schedule);
}
function handleRunWithSafetyCheck() {
if (shouldShowPopup) {
setPendingRunAction(() => handleRun);
setIsSafetyPopupOpen(true);
} else {
handleRun();
}
}
function handleSafetyPopupAcknowledge() {
setIsSafetyPopupOpen(false);
dismissPopup();
if (pendingRunAction) {
pendingRunAction();
setPendingRunAction(null);
}
}
return (
<>
<Dialog
@@ -280,7 +248,7 @@ export function RunAgentModal({
)}
<RunActions
defaultRunType={defaultRunType}
onRun={handleRunWithSafetyCheck}
onRun={handleRun}
isExecuting={isExecuting}
isSettingUpTrigger={isSettingUpTrigger}
isRunReady={allRequiredInputsAreSet}
@@ -298,12 +266,6 @@ export function RunAgentModal({
</div>
</Dialog.Content>
</Dialog>
<AIAgentSafetyPopup
agentId={agent.id}
isOpen={isSafetyPopupOpen}
onAcknowledge={handleSafetyPopupAcknowledge}
/>
</>
);
}

View File

@@ -1,108 +0,0 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { Key, storage } from "@/services/storage/local-storage";
import { ShieldCheckIcon } from "@phosphor-icons/react";
import { useCallback, useEffect, useState } from "react";
interface Props {
agentId: string;
onAcknowledge: () => void;
isOpen: boolean;
}
export function AIAgentSafetyPopup({ agentId, onAcknowledge, isOpen }: Props) {
function handleAcknowledge() {
// Add this agent to the list of agents for which popup has been shown
const seenAgentsJson = storage.get(Key.AI_AGENT_SAFETY_POPUP_SHOWN);
const seenAgents: string[] = seenAgentsJson
? JSON.parse(seenAgentsJson)
: [];
if (!seenAgents.includes(agentId)) {
seenAgents.push(agentId);
storage.set(Key.AI_AGENT_SAFETY_POPUP_SHOWN, JSON.stringify(seenAgents));
}
onAcknowledge();
}
if (!isOpen) return null;
return (
<Dialog
controlled={{ isOpen, set: () => {} }}
styling={{ maxWidth: "480px" }}
>
<Dialog.Content>
<div className="flex flex-col items-center p-6 text-center">
<div className="mb-6 flex h-16 w-16 items-center justify-center rounded-full bg-blue-50">
<ShieldCheckIcon
weight="fill"
size={32}
className="text-blue-600"
/>
</div>
<Text variant="h3" className="mb-4">
Safety Checks Enabled
</Text>
<Text variant="body" className="mb-2 text-zinc-700">
AI-generated agents may take actions that affect your data or
external systems.
</Text>
<Text variant="body" className="mb-8 text-zinc-700">
AutoGPT includes safety checks so you&apos;ll always have the
opportunity to review and approve sensitive actions before they
happen.
</Text>
<Button
variant="primary"
size="large"
className="w-full"
onClick={handleAcknowledge}
>
Got it
</Button>
</div>
</Dialog.Content>
</Dialog>
);
}
export function useAIAgentSafetyPopup(
agentId: string,
hasSensitiveAction: boolean,
hasHumanInTheLoop: boolean,
) {
const [shouldShowPopup, setShouldShowPopup] = useState(false);
const [hasChecked, setHasChecked] = useState(false);
useEffect(() => {
if (hasChecked) return;
const seenAgentsJson = storage.get(Key.AI_AGENT_SAFETY_POPUP_SHOWN);
const seenAgents: string[] = seenAgentsJson
? JSON.parse(seenAgentsJson)
: [];
const hasSeenPopupForThisAgent = seenAgents.includes(agentId);
const isRelevantAgent = hasSensitiveAction || hasHumanInTheLoop;
setShouldShowPopup(!hasSeenPopupForThisAgent && isRelevantAgent);
setHasChecked(true);
}, [agentId, hasSensitiveAction, hasHumanInTheLoop, hasChecked]);
const dismissPopup = useCallback(() => {
setShouldShowPopup(false);
}, []);
return {
shouldShowPopup,
dismissPopup,
};
}

View File

@@ -69,6 +69,7 @@ export function SafeModeToggle({ graph, className }: Props) {
const {
currentHITLSafeMode,
showHITLToggle,
isHITLStateUndetermined,
handleHITLToggle,
currentSensitiveActionSafeMode,
showSensitiveActionToggle,
@@ -77,13 +78,20 @@ export function SafeModeToggle({ graph, className }: Props) {
shouldShowToggle,
} = useAgentSafeMode(graph);
if (!shouldShowToggle) {
if (!shouldShowToggle || isHITLStateUndetermined) {
return null;
}
const showHITL = showHITLToggle && !isHITLStateUndetermined;
const showSensitive = showSensitiveActionToggle;
if (!showHITL && !showSensitive) {
return null;
}
return (
<div className={cn("flex gap-1", className)}>
{showHITLToggle && (
{showHITL && (
<SafeModeIconButton
isEnabled={currentHITLSafeMode}
label="Human-in-the-loop"
@@ -93,7 +101,7 @@ export function SafeModeToggle({ graph, className }: Props) {
isPending={isPending}
/>
)}
{showSensitiveActionToggle && (
{showSensitive && (
<SafeModeIconButton
isEnabled={currentSensitiveActionSafeMode}
label="Sensitive actions"

View File

@@ -8809,12 +8809,6 @@
"title": "Node Exec Id",
"description": "Node execution ID (primary key)"
},
"node_id": {
"type": "string",
"title": "Node Id",
"description": "Node definition ID (for grouping)",
"default": ""
},
"user_id": {
"type": "string",
"title": "User Id",
@@ -8914,7 +8908,7 @@
"created_at"
],
"title": "PendingHumanReviewModel",
"description": "Response model for pending human review data.\n\nRepresents a human review request that is awaiting user action.\nContains all necessary information for a user to review and approve\nor reject data from a Human-in-the-Loop block execution.\n\nAttributes:\n id: Unique identifier for the review record\n user_id: ID of the user who must perform the review\n node_exec_id: ID of the node execution that created this review\n node_id: ID of the node definition (for grouping reviews from same node)\n graph_exec_id: ID of the graph execution containing the node\n graph_id: ID of the graph template being executed\n graph_version: Version number of the graph template\n payload: The actual data payload awaiting review\n instructions: Instructions or message for the reviewer\n editable: Whether the reviewer can edit the data\n status: Current review status (WAITING, APPROVED, or REJECTED)\n review_message: Optional message from the reviewer\n created_at: Timestamp when review was created\n updated_at: Timestamp when review was last modified\n reviewed_at: Timestamp when review was completed (if applicable)"
"description": "Response model for pending human review data.\n\nRepresents a human review request that is awaiting user action.\nContains all necessary information for a user to review and approve\nor reject data from a Human-in-the-Loop block execution.\n\nAttributes:\n id: Unique identifier for the review record\n user_id: ID of the user who must perform the review\n node_exec_id: ID of the node execution that created this review\n graph_exec_id: ID of the graph execution containing the node\n graph_id: ID of the graph template being executed\n graph_version: Version number of the graph template\n payload: The actual data payload awaiting review\n instructions: Instructions or message for the reviewer\n editable: Whether the reviewer can edit the data\n status: Current review status (WAITING, APPROVED, or REJECTED)\n review_message: Optional message from the reviewer\n created_at: Timestamp when review was created\n updated_at: Timestamp when review was last modified\n reviewed_at: Timestamp when review was completed (if applicable)"
},
"PostmarkBounceEnum": {
"type": "integer",
@@ -9417,12 +9411,6 @@
],
"title": "Reviewed Data",
"description": "Optional edited data (ignored if approved=False)"
},
"auto_approve_future": {
"type": "boolean",
"title": "Auto Approve Future",
"description": "If true and this review is approved, future executions of this same block (node) will be automatically approved. This only affects approved reviews.",
"default": false
}
},
"type": "object",
@@ -9442,7 +9430,7 @@
"type": "object",
"required": ["reviews"],
"title": "ReviewRequest",
"description": "Request model for processing ALL pending reviews for an execution.\n\nThis request must include ALL pending reviews for a graph execution.\nEach review will be either approved (with optional data modifications)\nor rejected (data ignored). The execution will resume only after ALL reviews are processed.\n\nEach review item can individually specify whether to auto-approve future executions\nof the same block via the `auto_approve_future` field on ReviewItem."
"description": "Request model for processing ALL pending reviews for an execution.\n\nThis request must include ALL pending reviews for a graph execution.\nEach review will be either approved (with optional data modifications)\nor rejected (data ignored). The execution will resume only after ALL reviews are processed."
},
"ReviewResponse": {
"properties": {

View File
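The ReviewRequest and ReviewItem schemas above imply a request body along these lines. A hedged sketch: the IDs and values are illustrative, and only fields referenced elsewhere in this diff are shown.

    review_request = {
        "reviews": [
            {
                "node_exec_id": "node-exec-123",            # illustrative ID
                "approved": True,
                "reviewed_data": {"text": "edited value"},  # optional; ignored when approved is False
                "message": "Looks good",                    # optional reviewer note
            },
        ],
    }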

@@ -31,29 +31,6 @@ export function FloatingReviewsPanel({
query: {
enabled: !!(graphId && executionId),
select: okData,
// Poll while execution is in progress to detect status changes
refetchInterval: (q) => {
// Note: refetchInterval callback receives raw data before select transform
const rawData = q.state.data as
| { status: number; data?: { status?: string } }
| undefined;
if (rawData?.status !== 200) return false;
const status = rawData?.data?.status;
if (!status) return false;
// Poll every 2 seconds while running or in review
if (
status === AgentExecutionStatus.RUNNING ||
status === AgentExecutionStatus.QUEUED ||
status === AgentExecutionStatus.INCOMPLETE ||
status === AgentExecutionStatus.REVIEW
) {
return 2000;
}
return false;
},
refetchIntervalInBackground: true,
},
},
);
@@ -63,47 +40,28 @@ export function FloatingReviewsPanel({
useShallow((state) => state.graphExecutionStatus),
);
// Determine if we should poll for pending reviews
const isInReviewStatus =
executionDetails?.status === AgentExecutionStatus.REVIEW ||
graphExecutionStatus === AgentExecutionStatus.REVIEW;
const { pendingReviews, isLoading, refetch } = usePendingReviewsForExecution(
executionId || "",
{
enabled: !!executionId,
// Poll every 2 seconds when in REVIEW status to catch new reviews
refetchInterval: isInReviewStatus ? 2000 : false,
},
);
// Refetch pending reviews when execution status changes
useEffect(() => {
if (executionId && executionDetails?.status) {
if (executionId) {
refetch();
}
}, [executionDetails?.status, executionId, refetch]);
// Hide panel if:
// 1. No execution ID
// 2. No pending reviews and not in REVIEW status
// 3. Execution is RUNNING or QUEUED (hasn't paused for review yet)
if (!executionId) {
return null;
}
// Refetch when graph execution status changes to REVIEW
useEffect(() => {
if (graphExecutionStatus === AgentExecutionStatus.REVIEW && executionId) {
refetch();
}
}, [graphExecutionStatus, executionId, refetch]);
if (
!isLoading &&
pendingReviews.length === 0 &&
executionDetails?.status !== AgentExecutionStatus.REVIEW
) {
return null;
}
// Don't show panel while execution is still running/queued (not paused for review)
if (
executionDetails?.status === AgentExecutionStatus.RUNNING ||
executionDetails?.status === AgentExecutionStatus.QUEUED
!executionId ||
(!isLoading &&
pendingReviews.length === 0 &&
executionDetails?.status !== AgentExecutionStatus.REVIEW)
) {
return null;
}

View File

@@ -1,8 +1,10 @@
import { PendingHumanReviewModel } from "@/app/api/__generated__/models/pendingHumanReviewModel";
import { Text } from "@/components/atoms/Text/Text";
import { Button } from "@/components/atoms/Button/Button";
import { Input } from "@/components/atoms/Input/Input";
import { Switch } from "@/components/atoms/Switch/Switch";
import { useEffect, useState } from "react";
import { TrashIcon, EyeSlashIcon } from "@phosphor-icons/react";
import { useState } from "react";
interface StructuredReviewPayload {
data: unknown;
@@ -38,40 +40,37 @@ function extractReviewData(payload: unknown): {
interface PendingReviewCardProps {
review: PendingHumanReviewModel;
onReviewDataChange: (nodeExecId: string, data: string) => void;
autoApproveFuture?: boolean;
onAutoApproveFutureChange?: (nodeExecId: string, enabled: boolean) => void;
externalDataValue?: string;
reviewMessage?: string;
onReviewMessageChange?: (nodeExecId: string, message: string) => void;
isDisabled?: boolean;
onToggleDisabled?: (nodeExecId: string) => void;
}
export function PendingReviewCard({
review,
onReviewDataChange,
autoApproveFuture = false,
onAutoApproveFutureChange,
externalDataValue,
reviewMessage = "",
onReviewMessageChange,
isDisabled = false,
onToggleDisabled,
}: PendingReviewCardProps) {
const extractedData = extractReviewData(review.payload);
const isDataEditable = review.editable;
const instructions = extractedData.instructions || review.instructions;
const [currentData, setCurrentData] = useState(extractedData.data);
// Sync with external data value when auto-approve is toggled
useEffect(() => {
if (externalDataValue !== undefined) {
try {
const parsedData = JSON.parse(externalDataValue);
setCurrentData(parsedData);
} catch {
// If parsing fails, keep current data
}
}
}, [externalDataValue]);
const handleDataChange = (newValue: unknown) => {
setCurrentData(newValue);
onReviewDataChange(review.node_exec_id, JSON.stringify(newValue, null, 2));
};
const handleMessageChange = (newMessage: string) => {
onReviewMessageChange?.(review.node_exec_id, newMessage);
};
// Show simplified view when no toggle functionality is provided (Screenshot 1 mode)
const showSimplified = !onToggleDisabled;
const renderDataInput = () => {
const data = currentData;
@@ -148,13 +147,35 @@ export function PendingReviewCard({
// Use the existing HITL review interface
return (
<div className="space-y-4">
{!showSimplified && (
<div className="flex items-start justify-between">
<div className="flex-1">
{isDisabled && (
<Text variant="small" className="text-muted-foreground">
This item will be rejected
</Text>
)}
</div>
<Button
onClick={() => onToggleDisabled!(review.node_exec_id)}
variant={isDisabled ? "primary" : "secondary"}
size="small"
leftIcon={
isDisabled ? <EyeSlashIcon size={14} /> : <TrashIcon size={14} />
}
>
{isDisabled ? "Include" : "Exclude"}
</Button>
</div>
)}
{/* Show instructions as field label */}
{instructions && (
<div className="space-y-3">
<Text variant="body" className="font-semibold text-gray-900">
{getFieldLabel(instructions)}
</Text>
{isDataEditable && !autoApproveFuture ? (
{isDataEditable && !isDisabled ? (
renderDataInput()
) : (
<div className="rounded-lg border border-gray-200 bg-white p-3">
@@ -177,7 +198,7 @@ export function PendingReviewCard({
</span>
)}
</Text>
{isDataEditable && !autoApproveFuture ? (
{isDataEditable && !isDisabled ? (
renderDataInput()
) : (
<div className="rounded-lg border border-gray-200 bg-white p-3">
@@ -189,26 +210,22 @@ export function PendingReviewCard({
</div>
)}
{/* Auto-approve toggle for this review */}
{onAutoApproveFutureChange && (
<div className="space-y-2 pt-2">
<div className="flex items-center gap-3">
<Switch
checked={autoApproveFuture}
onCheckedChange={(enabled: boolean) =>
onAutoApproveFutureChange(review.node_exec_id, enabled)
}
/>
<Text variant="small" className="text-gray-700">
Auto-approve future executions of this block
</Text>
</div>
{autoApproveFuture && (
<Text variant="small" className="pl-11 text-gray-500">
Original data will be used for this and all future reviews from
this block.
</Text>
)}
{!showSimplified && isDisabled && (
<div>
<Text variant="body" className="mb-2 font-semibold">
Rejection Reason (Optional):
</Text>
<Input
id="rejection-reason"
label="Rejection Reason"
hideLabel
size="small"
type="textarea"
rows={3}
value={reviewMessage}
onChange={(e) => handleMessageChange(e.target.value)}
placeholder="Add any notes about why you're rejecting this..."
/>
</div>
)}
</div>

View File

@@ -32,15 +32,14 @@ export function PendingReviewsList({
},
);
const [reviewMessageMap, setReviewMessageMap] = useState<
Record<string, string>
>({});
const [pendingAction, setPendingAction] = useState<
"approve" | "reject" | null
>(null);
// Track per-review auto-approval state
const [autoApproveFutureMap, setAutoApproveFutureMap] = useState<
Record<string, boolean>
>({});
const { toast } = useToast();
const reviewActionMutation = usePostV2ProcessReviewAction({
@@ -89,23 +88,8 @@ export function PendingReviewsList({
setReviewDataMap((prev) => ({ ...prev, [nodeExecId]: data }));
}
// Handle per-review auto-approval toggle
function handleAutoApproveFutureToggle(nodeExecId: string, enabled: boolean) {
setAutoApproveFutureMap((prev) => ({
...prev,
[nodeExecId]: enabled,
}));
if (enabled) {
// Reset this review's data to original value
const review = reviews.find((r) => r.node_exec_id === nodeExecId);
if (review) {
setReviewDataMap((prev) => ({
...prev,
[nodeExecId]: JSON.stringify(review.payload, null, 2),
}));
}
}
function handleReviewMessageChange(nodeExecId: string, message: string) {
setReviewMessageMap((prev) => ({ ...prev, [nodeExecId]: message }));
}
function processReviews(approved: boolean) {
@@ -123,39 +107,30 @@ export function PendingReviewsList({
for (const review of reviews) {
const reviewData = reviewDataMap[review.node_exec_id];
const autoApproveThisReview = autoApproveFutureMap[review.node_exec_id];
const reviewMessage = reviewMessageMap[review.node_exec_id];
// When auto-approving future actions for this review, send undefined (use original data)
// Otherwise, parse and send the edited data if available
let parsedData: any = undefined;
let parsedData: any = review.payload; // Default to original payload
if (!autoApproveThisReview) {
// For regular approve/reject, use edited data if available
if (review.editable && reviewData) {
try {
parsedData = JSON.parse(reviewData);
} catch (error) {
toast({
title: "Invalid JSON",
description: `Please fix the JSON format in review for node ${review.node_exec_id}: ${error instanceof Error ? error.message : "Invalid syntax"}`,
variant: "destructive",
});
setPendingAction(null);
return;
}
} else {
// No edits, use original payload
parsedData = review.payload;
// Parse edited data if available and editable
if (review.editable && reviewData) {
try {
parsedData = JSON.parse(reviewData);
} catch (error) {
toast({
title: "Invalid JSON",
description: `Please fix the JSON format in review for node ${review.node_exec_id}: ${error instanceof Error ? error.message : "Invalid syntax"}`,
variant: "destructive",
});
setPendingAction(null);
return;
}
}
// When autoApproveThisReview is true, parsedData stays undefined
// Backend will use the original payload stored in the database
reviewItems.push({
node_exec_id: review.node_exec_id,
approved,
reviewed_data: parsedData,
auto_approve_future: autoApproveThisReview && approved,
message: reviewMessage || undefined,
});
}
@@ -207,20 +182,21 @@ export function PendingReviewsList({
<div className="space-y-7">
{reviews.map((review) => (
<PendingReviewCard
key={`${review.node_exec_id}`}
key={review.node_exec_id}
review={review}
onReviewDataChange={handleReviewDataChange}
autoApproveFuture={
autoApproveFutureMap[review.node_exec_id] || false
}
onAutoApproveFutureChange={handleAutoApproveFutureToggle}
externalDataValue={reviewDataMap[review.node_exec_id]}
onReviewMessageChange={handleReviewMessageChange}
reviewMessage={reviewMessageMap[review.node_exec_id] || ""}
/>
))}
</div>
<div className="space-y-4">
<div className="flex flex-wrap gap-2">
<div className="space-y-7">
<Text variant="body" className="text-textGrey">
Note: Changes you make here apply only to this task
</Text>
<div className="flex gap-2">
<Button
onClick={() => processReviews(true)}
disabled={reviewActionMutation.isPending || reviews.length === 0}
@@ -244,11 +220,6 @@ export function PendingReviewsList({
Reject
</Button>
</div>
<Text variant="small" className="text-textGrey">
You can turn auto-approval on or off anytime in this agent&apos;s
settings.
</Text>
</div>
</div>
);
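
A minimal sketch of the item shape processReviews builds per review, assuming the generated usePostV2ProcessReviewAction client accepts it as-is; the field names mirror the reviewItems.push(...) call above, while the type name and optional markers are illustrative.

type ReviewActionItem = {
  node_exec_id: string;
  approved: boolean;
  reviewed_data: unknown; // parsed edited JSON when editable, otherwise the original review.payload
  message?: string; // only meaningful when the reviewer typed a rejection note
};
// reviewItems: ReviewActionItem[] is then handed to reviewActionMutation; the exact request
// wrapper expected by the generated client is not shown in this diff.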

View File

@@ -15,22 +15,8 @@ export function usePendingReviews() {
};
}
interface UsePendingReviewsForExecutionOptions {
enabled?: boolean;
refetchInterval?: number | false;
}
export function usePendingReviewsForExecution(
graphExecId: string,
options?: UsePendingReviewsForExecutionOptions,
) {
const query = useGetV2GetPendingReviewsForExecution(graphExecId, {
query: {
enabled: options?.enabled ?? !!graphExecId,
refetchInterval: options?.refetchInterval,
refetchIntervalInBackground: !!options?.refetchInterval,
},
});
export function usePendingReviewsForExecution(graphExecId: string) {
const query = useGetV2GetPendingReviewsForExecution(graphExecId);
return {
pendingReviews: okData(query.data) || [],
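
A usage sketch under the simplified signature: callers now pass only the execution id, and the removed options (enabled, refetchInterval) presumably fall back to the query-client defaults. Variable names below are illustrative.

// Inside any component that already knows the execution id (illustrative):
const { pendingReviews } = usePendingReviewsForExecution(graphExecId);
// pendingReviews falls back to [] until data arrives, per the okData(query.data) || [] above.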

View File

@@ -10,7 +10,6 @@ export enum Key {
LIBRARY_AGENTS_CACHE = "library-agents-cache",
CHAT_SESSION_ID = "chat_session_id",
COOKIE_CONSENT = "autogpt_cookie_consent",
AI_AGENT_SAFETY_POPUP_SHOWN = "ai-agent-safety-popup-shown",
}
function get(key: Key) {