mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-24 06:28:04 -05:00
Compare commits
5 Commits
feat/sensi
...
fix/reduce
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0da7a54952 | ||
|
|
ba0aa83de3 | ||
|
|
4c9333fc37 | ||
|
|
03c805ac5a | ||
|
|
24d91a6e3e |
@@ -23,7 +23,6 @@ class PendingHumanReviewModel(BaseModel):
|
||||
id: Unique identifier for the review record
|
||||
user_id: ID of the user who must perform the review
|
||||
node_exec_id: ID of the node execution that created this review
|
||||
node_id: ID of the node definition (for grouping reviews from same node)
|
||||
graph_exec_id: ID of the graph execution containing the node
|
||||
graph_id: ID of the graph template being executed
|
||||
graph_version: Version number of the graph template
|
||||
@@ -38,10 +37,6 @@ class PendingHumanReviewModel(BaseModel):
|
||||
"""
|
||||
|
||||
node_exec_id: str = Field(description="Node execution ID (primary key)")
|
||||
node_id: str = Field(
|
||||
description="Node definition ID (for grouping)",
|
||||
default="", # Temporary default for test compatibility
|
||||
)
|
||||
user_id: str = Field(description="User ID associated with the review")
|
||||
graph_exec_id: str = Field(description="Graph execution ID")
|
||||
graph_id: str = Field(description="Graph ID")
|
||||
@@ -71,9 +66,7 @@ class PendingHumanReviewModel(BaseModel):
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_db(
|
||||
cls, review: "PendingHumanReview", node_id: str
|
||||
) -> "PendingHumanReviewModel":
|
||||
def from_db(cls, review: "PendingHumanReview") -> "PendingHumanReviewModel":
|
||||
"""
|
||||
Convert a database model to a response model.
|
||||
|
||||
@@ -81,14 +74,9 @@ class PendingHumanReviewModel(BaseModel):
|
||||
payload, instructions, and editable flag.
|
||||
|
||||
Handles invalid data gracefully by using safe defaults.
|
||||
|
||||
Args:
|
||||
review: Database review object
|
||||
node_id: Node definition ID (fetched from NodeExecution)
|
||||
"""
|
||||
return cls(
|
||||
node_exec_id=review.nodeExecId,
|
||||
node_id=node_id,
|
||||
user_id=review.userId,
|
||||
graph_exec_id=review.graphExecId,
|
||||
graph_id=review.graphId,
|
||||
@@ -119,13 +107,6 @@ class ReviewItem(BaseModel):
|
||||
reviewed_data: SafeJsonData | None = Field(
|
||||
None, description="Optional edited data (ignored if approved=False)"
|
||||
)
|
||||
auto_approve_future: bool = Field(
|
||||
default=False,
|
||||
description=(
|
||||
"If true and this review is approved, future executions of this same "
|
||||
"block (node) will be automatically approved. This only affects approved reviews."
|
||||
),
|
||||
)
|
||||
|
||||
@field_validator("reviewed_data")
|
||||
@classmethod
|
||||
@@ -193,9 +174,6 @@ class ReviewRequest(BaseModel):
|
||||
This request must include ALL pending reviews for a graph execution.
|
||||
Each review will be either approved (with optional data modifications)
|
||||
or rejected (data ignored). The execution will resume only after ALL reviews are processed.
|
||||
|
||||
Each review item can individually specify whether to auto-approve future executions
|
||||
of the same block via the `auto_approve_future` field on ReviewItem.
|
||||
"""
|
||||
|
||||
reviews: List[ReviewItem] = Field(
|
||||
|
||||
@@ -8,12 +8,6 @@ from prisma.enums import ReviewStatus
|
||||
from pytest_snapshot.plugin import Snapshot
|
||||
|
||||
from backend.api.rest_api import handle_internal_http_error
|
||||
from backend.data.execution import (
|
||||
ExecutionContext,
|
||||
ExecutionStatus,
|
||||
NodeExecutionResult,
|
||||
)
|
||||
from backend.data.graph import GraphSettings
|
||||
|
||||
from .model import PendingHumanReviewModel
|
||||
from .routes import router
|
||||
@@ -21,24 +15,20 @@ from .routes import router
|
||||
# Using a fixed timestamp for reproducible tests
|
||||
FIXED_NOW = datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
|
||||
|
||||
app = fastapi.FastAPI()
|
||||
app.include_router(router, prefix="/api/review")
|
||||
app.add_exception_handler(ValueError, handle_internal_http_error(400))
|
||||
|
||||
@pytest.fixture
|
||||
def app():
|
||||
"""Create FastAPI app for testing"""
|
||||
test_app = fastapi.FastAPI()
|
||||
test_app.include_router(router, prefix="/api/review")
|
||||
test_app.add_exception_handler(ValueError, handle_internal_http_error(400))
|
||||
return test_app
|
||||
client = fastapi.testclient.TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app, mock_jwt_user):
|
||||
"""Create test client with auth overrides"""
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_app_auth(mock_jwt_user):
|
||||
"""Setup auth overrides for all tests in this module"""
|
||||
from autogpt_libs.auth.jwt_utils import get_jwt_payload
|
||||
|
||||
app.dependency_overrides[get_jwt_payload] = mock_jwt_user["get_jwt_payload"]
|
||||
with fastapi.testclient.TestClient(app) as test_client:
|
||||
yield test_client
|
||||
yield
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
@@ -47,7 +37,6 @@ def sample_pending_review(test_user_id: str) -> PendingHumanReviewModel:
|
||||
"""Create a sample pending review for testing"""
|
||||
return PendingHumanReviewModel(
|
||||
node_exec_id="test_node_123",
|
||||
node_id="test_node_def_456",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
@@ -66,7 +55,6 @@ def sample_pending_review(test_user_id: str) -> PendingHumanReviewModel:
|
||||
|
||||
|
||||
def test_get_pending_reviews_empty(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
snapshot: Snapshot,
|
||||
test_user_id: str,
|
||||
@@ -85,7 +73,6 @@ def test_get_pending_reviews_empty(
|
||||
|
||||
|
||||
def test_get_pending_reviews_with_data(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
snapshot: Snapshot,
|
||||
@@ -108,7 +95,6 @@ def test_get_pending_reviews_with_data(
|
||||
|
||||
|
||||
def test_get_pending_reviews_for_execution_success(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
snapshot: Snapshot,
|
||||
@@ -137,7 +123,6 @@ def test_get_pending_reviews_for_execution_success(
|
||||
|
||||
|
||||
def test_get_pending_reviews_for_execution_not_available(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
) -> None:
|
||||
"""Test access denied when user doesn't own the execution"""
|
||||
@@ -153,7 +138,6 @@ def test_get_pending_reviews_for_execution_not_available(
|
||||
|
||||
|
||||
def test_process_review_action_approve_success(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
@@ -161,12 +145,6 @@ def test_process_review_action_approve_success(
|
||||
"""Test successful review approval"""
|
||||
# Mock the route functions
|
||||
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = sample_pending_review
|
||||
|
||||
mock_get_reviews_for_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
|
||||
)
|
||||
@@ -195,14 +173,6 @@ def test_process_review_action_approve_success(
|
||||
)
|
||||
mock_process_all_reviews.return_value = {"test_node_123": approved_review}
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
mock_has_pending = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
|
||||
)
|
||||
@@ -232,7 +202,6 @@ def test_process_review_action_approve_success(
|
||||
|
||||
|
||||
def test_process_review_action_reject_success(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
@@ -240,20 +209,6 @@ def test_process_review_action_reject_success(
|
||||
"""Test successful review rejection"""
|
||||
# Mock the route functions
|
||||
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = sample_pending_review
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
mock_get_reviews_for_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
|
||||
)
|
||||
@@ -307,7 +262,6 @@ def test_process_review_action_reject_success(
|
||||
|
||||
|
||||
def test_process_review_action_mixed_success(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
@@ -334,12 +288,6 @@ def test_process_review_action_mixed_success(
|
||||
|
||||
# Mock the route functions
|
||||
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = sample_pending_review
|
||||
|
||||
mock_get_reviews_for_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
|
||||
)
|
||||
@@ -389,14 +337,6 @@ def test_process_review_action_mixed_success(
|
||||
"test_node_456": rejected_review,
|
||||
}
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
mock_has_pending = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
|
||||
)
|
||||
@@ -429,7 +369,6 @@ def test_process_review_action_mixed_success(
|
||||
|
||||
|
||||
def test_process_review_action_empty_request(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
@@ -447,45 +386,10 @@ def test_process_review_action_empty_request(
|
||||
|
||||
|
||||
def test_process_review_action_review_not_found(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test error when review is not found"""
|
||||
# Create a review with the nonexistent_node ID so the route can find the graph_exec_id
|
||||
nonexistent_review = PendingHumanReviewModel(
|
||||
node_exec_id="nonexistent_node",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
payload={"data": "test"},
|
||||
instructions="Review",
|
||||
editable=True,
|
||||
status=ReviewStatus.WAITING,
|
||||
review_message=None,
|
||||
was_edited=None,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=None,
|
||||
reviewed_at=None,
|
||||
)
|
||||
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = nonexistent_review
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
# Mock the functions that extract graph execution ID from the request
|
||||
mock_get_reviews_for_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
|
||||
@@ -518,26 +422,11 @@ def test_process_review_action_review_not_found(
|
||||
|
||||
|
||||
def test_process_review_action_partial_failure(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test handling of partial failures in review processing"""
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = sample_pending_review
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
# Mock the route functions
|
||||
mock_get_reviews_for_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
|
||||
@@ -567,50 +456,16 @@ def test_process_review_action_partial_failure(
|
||||
|
||||
|
||||
def test_process_review_action_invalid_node_exec_id(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test failure when trying to process review with invalid node execution ID"""
|
||||
# Create a review with the invalid-node-format ID so the route can find the graph_exec_id
|
||||
invalid_review = PendingHumanReviewModel(
|
||||
node_exec_id="invalid-node-format",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
payload={"data": "test"},
|
||||
instructions="Review",
|
||||
editable=True,
|
||||
status=ReviewStatus.WAITING,
|
||||
review_message=None,
|
||||
was_edited=None,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=None,
|
||||
reviewed_at=None,
|
||||
)
|
||||
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = invalid_review
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
# Mock the route functions
|
||||
mock_get_reviews_for_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_for_execution"
|
||||
)
|
||||
mock_get_reviews_for_execution.return_value = [invalid_review]
|
||||
mock_get_reviews_for_execution.return_value = [sample_pending_review]
|
||||
|
||||
# Mock validation failure - this should return 400, not 500
|
||||
mock_process_all_reviews = mocker.patch(
|
||||
@@ -635,595 +490,3 @@ def test_process_review_action_invalid_node_exec_id(
|
||||
# Should be a 400 Bad Request, not 500 Internal Server Error
|
||||
assert response.status_code == 400
|
||||
assert "Invalid node execution ID format" in response.json()["detail"]
|
||||
|
||||
|
||||
def test_process_review_action_auto_approve_creates_auto_approval_records(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test that auto_approve_future_actions flag creates auto-approval records"""
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = sample_pending_review
|
||||
|
||||
# Mock process_all_reviews
|
||||
mock_process_all_reviews = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
|
||||
)
|
||||
approved_review = PendingHumanReviewModel(
|
||||
node_exec_id="test_node_123",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
payload={"data": "test payload"},
|
||||
instructions="Please review",
|
||||
editable=True,
|
||||
status=ReviewStatus.APPROVED,
|
||||
review_message="Approved",
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=FIXED_NOW,
|
||||
reviewed_at=FIXED_NOW,
|
||||
)
|
||||
mock_process_all_reviews.return_value = {"test_node_123": approved_review}
|
||||
|
||||
# Mock get_node_execution to return node_id
|
||||
mock_get_node_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_node_execution"
|
||||
)
|
||||
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
|
||||
mock_node_exec.node_id = "test_node_def_456"
|
||||
mock_get_node_execution.return_value = mock_node_exec
|
||||
|
||||
# Mock create_auto_approval_record
|
||||
mock_create_auto_approval = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.create_auto_approval_record"
|
||||
)
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
# Mock has_pending_reviews_for_graph_exec
|
||||
mock_has_pending = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
|
||||
)
|
||||
mock_has_pending.return_value = False
|
||||
|
||||
# Mock get_graph_settings to return custom settings
|
||||
mock_get_settings = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_settings"
|
||||
)
|
||||
mock_get_settings.return_value = GraphSettings(
|
||||
human_in_the_loop_safe_mode=True,
|
||||
sensitive_action_safe_mode=True,
|
||||
)
|
||||
|
||||
# Mock get_user_by_id to prevent database access
|
||||
mock_get_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_user_by_id"
|
||||
)
|
||||
mock_user = mocker.Mock()
|
||||
mock_user.timezone = "UTC"
|
||||
mock_get_user.return_value = mock_user
|
||||
|
||||
# Mock add_graph_execution
|
||||
mock_add_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.add_graph_execution"
|
||||
)
|
||||
|
||||
request_data = {
|
||||
"reviews": [
|
||||
{
|
||||
"node_exec_id": "test_node_123",
|
||||
"approved": True,
|
||||
"message": "Approved",
|
||||
"auto_approve_future": True,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
response = client.post("/api/review/action", json=request_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify process_all_reviews_for_execution was called (without auto_approve param)
|
||||
mock_process_all_reviews.assert_called_once()
|
||||
|
||||
# Verify create_auto_approval_record was called for the approved review
|
||||
mock_create_auto_approval.assert_called_once_with(
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
node_id="test_node_def_456",
|
||||
payload={"data": "test payload"},
|
||||
)
|
||||
|
||||
# Verify get_graph_settings was called with correct parameters
|
||||
mock_get_settings.assert_called_once_with(
|
||||
user_id=test_user_id, graph_id="test_graph_789"
|
||||
)
|
||||
|
||||
# Verify add_graph_execution was called with proper ExecutionContext
|
||||
mock_add_execution.assert_called_once()
|
||||
call_kwargs = mock_add_execution.call_args.kwargs
|
||||
execution_context = call_kwargs["execution_context"]
|
||||
|
||||
assert isinstance(execution_context, ExecutionContext)
|
||||
assert execution_context.human_in_the_loop_safe_mode is True
|
||||
assert execution_context.sensitive_action_safe_mode is True
|
||||
|
||||
|
||||
def test_process_review_action_without_auto_approve_still_loads_settings(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test that execution context is created with settings even without auto-approve"""
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = sample_pending_review
|
||||
|
||||
# Mock process_all_reviews
|
||||
mock_process_all_reviews = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
|
||||
)
|
||||
approved_review = PendingHumanReviewModel(
|
||||
node_exec_id="test_node_123",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
payload={"data": "test payload"},
|
||||
instructions="Please review",
|
||||
editable=True,
|
||||
status=ReviewStatus.APPROVED,
|
||||
review_message="Approved",
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=FIXED_NOW,
|
||||
reviewed_at=FIXED_NOW,
|
||||
)
|
||||
mock_process_all_reviews.return_value = {"test_node_123": approved_review}
|
||||
|
||||
# Mock create_auto_approval_record - should NOT be called when auto_approve is False
|
||||
mock_create_auto_approval = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.create_auto_approval_record"
|
||||
)
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
# Mock has_pending_reviews_for_graph_exec
|
||||
mock_has_pending = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
|
||||
)
|
||||
mock_has_pending.return_value = False
|
||||
|
||||
# Mock get_graph_settings with sensitive_action_safe_mode enabled
|
||||
mock_get_settings = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_settings"
|
||||
)
|
||||
mock_get_settings.return_value = GraphSettings(
|
||||
human_in_the_loop_safe_mode=False,
|
||||
sensitive_action_safe_mode=True,
|
||||
)
|
||||
|
||||
# Mock get_user_by_id to prevent database access
|
||||
mock_get_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_user_by_id"
|
||||
)
|
||||
mock_user = mocker.Mock()
|
||||
mock_user.timezone = "UTC"
|
||||
mock_get_user.return_value = mock_user
|
||||
|
||||
# Mock add_graph_execution
|
||||
mock_add_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.add_graph_execution"
|
||||
)
|
||||
|
||||
# Request WITHOUT auto_approve_future (defaults to False)
|
||||
request_data = {
|
||||
"reviews": [
|
||||
{
|
||||
"node_exec_id": "test_node_123",
|
||||
"approved": True,
|
||||
"message": "Approved",
|
||||
# auto_approve_future defaults to False
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
response = client.post("/api/review/action", json=request_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify process_all_reviews_for_execution was called
|
||||
mock_process_all_reviews.assert_called_once()
|
||||
|
||||
# Verify create_auto_approval_record was NOT called (auto_approve_future=False)
|
||||
mock_create_auto_approval.assert_not_called()
|
||||
|
||||
# Verify settings were loaded
|
||||
mock_get_settings.assert_called_once()
|
||||
|
||||
# Verify ExecutionContext has proper settings
|
||||
mock_add_execution.assert_called_once()
|
||||
call_kwargs = mock_add_execution.call_args.kwargs
|
||||
execution_context = call_kwargs["execution_context"]
|
||||
|
||||
assert isinstance(execution_context, ExecutionContext)
|
||||
assert execution_context.human_in_the_loop_safe_mode is False
|
||||
assert execution_context.sensitive_action_safe_mode is True
|
||||
|
||||
|
||||
def test_process_review_action_auto_approve_only_applies_to_approved_reviews(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test that auto_approve record is created only for approved reviews"""
|
||||
# Create two reviews - one approved, one rejected
|
||||
approved_review = PendingHumanReviewModel(
|
||||
node_exec_id="node_exec_approved",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
payload={"data": "approved"},
|
||||
instructions="Review",
|
||||
editable=True,
|
||||
status=ReviewStatus.APPROVED,
|
||||
review_message=None,
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=FIXED_NOW,
|
||||
reviewed_at=FIXED_NOW,
|
||||
)
|
||||
rejected_review = PendingHumanReviewModel(
|
||||
node_exec_id="node_exec_rejected",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
payload={"data": "rejected"},
|
||||
instructions="Review",
|
||||
editable=True,
|
||||
status=ReviewStatus.REJECTED,
|
||||
review_message="Rejected",
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=FIXED_NOW,
|
||||
reviewed_at=FIXED_NOW,
|
||||
)
|
||||
|
||||
# Mock get_pending_review_by_node_exec_id (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = approved_review
|
||||
|
||||
# Mock process_all_reviews
|
||||
mock_process_all_reviews = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
|
||||
)
|
||||
mock_process_all_reviews.return_value = {
|
||||
"node_exec_approved": approved_review,
|
||||
"node_exec_rejected": rejected_review,
|
||||
}
|
||||
|
||||
# Mock get_node_execution to return node_id (only called for approved review)
|
||||
mock_get_node_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_node_execution"
|
||||
)
|
||||
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
|
||||
mock_node_exec.node_id = "test_node_def_approved"
|
||||
mock_get_node_execution.return_value = mock_node_exec
|
||||
|
||||
# Mock create_auto_approval_record
|
||||
mock_create_auto_approval = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.create_auto_approval_record"
|
||||
)
|
||||
|
||||
# Mock get_graph_execution_meta to return execution in REVIEW status
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
# Mock has_pending_reviews_for_graph_exec
|
||||
mock_has_pending = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
|
||||
)
|
||||
mock_has_pending.return_value = False
|
||||
|
||||
# Mock get_graph_settings
|
||||
mock_get_settings = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_settings"
|
||||
)
|
||||
mock_get_settings.return_value = GraphSettings()
|
||||
|
||||
# Mock get_user_by_id to prevent database access
|
||||
mock_get_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_user_by_id"
|
||||
)
|
||||
mock_user = mocker.Mock()
|
||||
mock_user.timezone = "UTC"
|
||||
mock_get_user.return_value = mock_user
|
||||
|
||||
# Mock add_graph_execution
|
||||
mock_add_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.add_graph_execution"
|
||||
)
|
||||
|
||||
request_data = {
|
||||
"reviews": [
|
||||
{
|
||||
"node_exec_id": "node_exec_approved",
|
||||
"approved": True,
|
||||
"auto_approve_future": True,
|
||||
},
|
||||
{
|
||||
"node_exec_id": "node_exec_rejected",
|
||||
"approved": False,
|
||||
"auto_approve_future": True, # Should be ignored since rejected
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
response = client.post("/api/review/action", json=request_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify process_all_reviews_for_execution was called
|
||||
mock_process_all_reviews.assert_called_once()
|
||||
|
||||
# Verify create_auto_approval_record was called ONLY for the approved review
|
||||
# (not for the rejected one)
|
||||
mock_create_auto_approval.assert_called_once_with(
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec_456",
|
||||
graph_id="test_graph_789",
|
||||
graph_version=1,
|
||||
node_id="test_node_def_approved",
|
||||
payload={"data": "approved"},
|
||||
)
|
||||
|
||||
# Verify get_node_execution was called only for approved review
|
||||
mock_get_node_execution.assert_called_once_with("node_exec_approved")
|
||||
|
||||
# Verify ExecutionContext was created (auto-approval is now DB-based)
|
||||
call_kwargs = mock_add_execution.call_args.kwargs
|
||||
execution_context = call_kwargs["execution_context"]
|
||||
assert isinstance(execution_context, ExecutionContext)
|
||||
|
||||
|
||||
def test_process_review_action_per_review_auto_approve_granularity(
|
||||
client: fastapi.testclient.TestClient,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
sample_pending_review: PendingHumanReviewModel,
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test that auto-approval can be set per-review (granular control)"""
|
||||
# Mock get_pending_review_by_node_exec_id - return different reviews based on node_exec_id
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_review_by_node_exec_id"
|
||||
)
|
||||
|
||||
# Create a mapping of node_exec_id to review
|
||||
review_map = {
|
||||
"node_1_auto": PendingHumanReviewModel(
|
||||
node_exec_id="node_1_auto",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec",
|
||||
graph_id="test_graph",
|
||||
graph_version=1,
|
||||
payload={"data": "node1"},
|
||||
instructions="Review 1",
|
||||
editable=True,
|
||||
status=ReviewStatus.WAITING,
|
||||
review_message=None,
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
),
|
||||
"node_2_manual": PendingHumanReviewModel(
|
||||
node_exec_id="node_2_manual",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec",
|
||||
graph_id="test_graph",
|
||||
graph_version=1,
|
||||
payload={"data": "node2"},
|
||||
instructions="Review 2",
|
||||
editable=True,
|
||||
status=ReviewStatus.WAITING,
|
||||
review_message=None,
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
),
|
||||
"node_3_auto": PendingHumanReviewModel(
|
||||
node_exec_id="node_3_auto",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec",
|
||||
graph_id="test_graph",
|
||||
graph_version=1,
|
||||
payload={"data": "node3"},
|
||||
instructions="Review 3",
|
||||
editable=True,
|
||||
status=ReviewStatus.WAITING,
|
||||
review_message=None,
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
),
|
||||
}
|
||||
|
||||
# Use side_effect to return different reviews based on node_exec_id parameter
|
||||
def mock_get_review_by_id(node_exec_id: str, _user_id: str):
|
||||
return review_map.get(node_exec_id)
|
||||
|
||||
mock_get_reviews_for_user.side_effect = mock_get_review_by_id
|
||||
|
||||
# Mock process_all_reviews - return 3 approved reviews
|
||||
mock_process_all_reviews = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.process_all_reviews_for_execution"
|
||||
)
|
||||
mock_process_all_reviews.return_value = {
|
||||
"node_1_auto": PendingHumanReviewModel(
|
||||
node_exec_id="node_1_auto",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec",
|
||||
graph_id="test_graph",
|
||||
graph_version=1,
|
||||
payload={"data": "node1"},
|
||||
instructions="Review 1",
|
||||
editable=True,
|
||||
status=ReviewStatus.APPROVED,
|
||||
review_message=None,
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=FIXED_NOW,
|
||||
reviewed_at=FIXED_NOW,
|
||||
),
|
||||
"node_2_manual": PendingHumanReviewModel(
|
||||
node_exec_id="node_2_manual",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec",
|
||||
graph_id="test_graph",
|
||||
graph_version=1,
|
||||
payload={"data": "node2"},
|
||||
instructions="Review 2",
|
||||
editable=True,
|
||||
status=ReviewStatus.APPROVED,
|
||||
review_message=None,
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=FIXED_NOW,
|
||||
reviewed_at=FIXED_NOW,
|
||||
),
|
||||
"node_3_auto": PendingHumanReviewModel(
|
||||
node_exec_id="node_3_auto",
|
||||
user_id=test_user_id,
|
||||
graph_exec_id="test_graph_exec",
|
||||
graph_id="test_graph",
|
||||
graph_version=1,
|
||||
payload={"data": "node3"},
|
||||
instructions="Review 3",
|
||||
editable=True,
|
||||
status=ReviewStatus.APPROVED,
|
||||
review_message=None,
|
||||
was_edited=False,
|
||||
processed=False,
|
||||
created_at=FIXED_NOW,
|
||||
updated_at=FIXED_NOW,
|
||||
reviewed_at=FIXED_NOW,
|
||||
),
|
||||
}
|
||||
|
||||
# Mock get_node_execution
|
||||
mock_get_node_execution = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_node_execution"
|
||||
)
|
||||
|
||||
def mock_get_node(node_exec_id: str):
|
||||
mock_node = mocker.Mock(spec=NodeExecutionResult)
|
||||
mock_node.node_id = f"node_def_{node_exec_id}"
|
||||
return mock_node
|
||||
|
||||
mock_get_node_execution.side_effect = mock_get_node
|
||||
|
||||
# Mock create_auto_approval_record
|
||||
mock_create_auto_approval = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.create_auto_approval_record"
|
||||
)
|
||||
|
||||
# Mock get_graph_execution_meta
|
||||
mock_get_graph_exec = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_execution_meta"
|
||||
)
|
||||
mock_graph_exec_meta = mocker.Mock()
|
||||
mock_graph_exec_meta.status = ExecutionStatus.REVIEW
|
||||
mock_get_graph_exec.return_value = mock_graph_exec_meta
|
||||
|
||||
# Mock has_pending_reviews_for_graph_exec
|
||||
mock_has_pending = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.has_pending_reviews_for_graph_exec"
|
||||
)
|
||||
mock_has_pending.return_value = False
|
||||
|
||||
# Mock settings and execution
|
||||
mock_get_settings = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_graph_settings"
|
||||
)
|
||||
mock_get_settings.return_value = GraphSettings(
|
||||
human_in_the_loop_safe_mode=False, sensitive_action_safe_mode=False
|
||||
)
|
||||
|
||||
mocker.patch("backend.api.features.executions.review.routes.add_graph_execution")
|
||||
mocker.patch("backend.api.features.executions.review.routes.get_user_by_id")
|
||||
|
||||
# Request with granular auto-approval:
|
||||
# - node_1_auto: auto_approve_future=True
|
||||
# - node_2_manual: auto_approve_future=False (explicit)
|
||||
# - node_3_auto: auto_approve_future=True
|
||||
request_data = {
|
||||
"reviews": [
|
||||
{
|
||||
"node_exec_id": "node_1_auto",
|
||||
"approved": True,
|
||||
"auto_approve_future": True,
|
||||
},
|
||||
{
|
||||
"node_exec_id": "node_2_manual",
|
||||
"approved": True,
|
||||
"auto_approve_future": False, # Don't auto-approve this one
|
||||
},
|
||||
{
|
||||
"node_exec_id": "node_3_auto",
|
||||
"approved": True,
|
||||
"auto_approve_future": True,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
response = client.post("/api/review/action", json=request_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify create_auto_approval_record was called ONLY for reviews with auto_approve_future=True
|
||||
assert mock_create_auto_approval.call_count == 2
|
||||
|
||||
# Check that it was called for node_1 and node_3, but NOT node_2
|
||||
call_args_list = [call.kwargs for call in mock_create_auto_approval.call_args_list]
|
||||
node_ids_with_auto_approval = [args["node_id"] for args in call_args_list]
|
||||
|
||||
assert "node_def_node_1_auto" in node_ids_with_auto_approval
|
||||
assert "node_def_node_3_auto" in node_ids_with_auto_approval
|
||||
assert "node_def_node_2_manual" not in node_ids_with_auto_approval
|
||||
|
||||
@@ -5,23 +5,13 @@ import autogpt_libs.auth as autogpt_auth_lib
|
||||
from fastapi import APIRouter, HTTPException, Query, Security, status
|
||||
from prisma.enums import ReviewStatus
|
||||
|
||||
from backend.data.execution import (
|
||||
ExecutionContext,
|
||||
ExecutionStatus,
|
||||
get_graph_execution_meta,
|
||||
get_node_execution,
|
||||
)
|
||||
from backend.data.graph import get_graph_settings
|
||||
from backend.data.execution import get_graph_execution_meta
|
||||
from backend.data.human_review import (
|
||||
create_auto_approval_record,
|
||||
get_pending_review_by_node_exec_id,
|
||||
get_pending_reviews_for_execution,
|
||||
get_pending_reviews_for_user,
|
||||
has_pending_reviews_for_graph_exec,
|
||||
process_all_reviews_for_execution,
|
||||
)
|
||||
from backend.data.model import USER_TIMEZONE_NOT_SET
|
||||
from backend.data.user import get_user_by_id
|
||||
from backend.executor.utils import add_graph_execution
|
||||
|
||||
from .model import PendingHumanReviewModel, ReviewRequest, ReviewResponse
|
||||
@@ -137,80 +127,17 @@ async def process_review_action(
|
||||
detail="At least one review must be provided",
|
||||
)
|
||||
|
||||
# Get graph execution ID by looking up all requested reviews
|
||||
# Use direct lookup to avoid pagination issues (can't miss reviews beyond first page)
|
||||
# Also validate that all reviews belong to the same execution
|
||||
matching_review = None
|
||||
graph_exec_ids: set[str] = set()
|
||||
|
||||
for node_exec_id in all_request_node_ids:
|
||||
review = await get_pending_review_by_node_exec_id(node_exec_id, user_id)
|
||||
if not review:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"No pending review found for node execution {node_exec_id}",
|
||||
)
|
||||
if matching_review is None:
|
||||
matching_review = review
|
||||
graph_exec_ids.add(review.graph_exec_id)
|
||||
|
||||
# Ensure all reviews belong to the same execution
|
||||
if len(graph_exec_ids) > 1:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail="All reviews in a single request must belong to the same execution.",
|
||||
)
|
||||
|
||||
# Safety check (matching_review should never be None here due to validation above)
|
||||
if matching_review is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Internal error: No matching review found despite validation",
|
||||
)
|
||||
|
||||
graph_exec_id = matching_review.graph_exec_id
|
||||
|
||||
# Validate execution status before processing reviews
|
||||
graph_exec_meta = await get_graph_execution_meta(
|
||||
user_id=user_id, execution_id=graph_exec_id
|
||||
)
|
||||
|
||||
if not graph_exec_meta:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Graph execution #{graph_exec_id} not found",
|
||||
)
|
||||
|
||||
# Only allow processing reviews if execution is paused for review
|
||||
# or incomplete (partial execution with some reviews already processed)
|
||||
if graph_exec_meta.status not in (
|
||||
ExecutionStatus.REVIEW,
|
||||
ExecutionStatus.INCOMPLETE,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}. "
|
||||
f"Reviews can only be processed when execution is paused (REVIEW status). "
|
||||
f"Current status: {graph_exec_meta.status}",
|
||||
)
|
||||
|
||||
# Build review decisions map and track which reviews requested auto-approval
|
||||
# Auto-approved reviews use original data (no modifications allowed)
|
||||
# Build review decisions map
|
||||
review_decisions = {}
|
||||
auto_approve_requests = {} # Map node_exec_id -> auto_approve_future flag
|
||||
|
||||
for review in request.reviews:
|
||||
review_status = (
|
||||
ReviewStatus.APPROVED if review.approved else ReviewStatus.REJECTED
|
||||
)
|
||||
# If this review requested auto-approval, don't allow data modifications
|
||||
reviewed_data = None if review.auto_approve_future else review.reviewed_data
|
||||
review_decisions[review.node_exec_id] = (
|
||||
review_status,
|
||||
reviewed_data,
|
||||
review.reviewed_data,
|
||||
review.message,
|
||||
)
|
||||
auto_approve_requests[review.node_exec_id] = review.auto_approve_future
|
||||
|
||||
# Process all reviews
|
||||
updated_reviews = await process_all_reviews_for_execution(
|
||||
@@ -218,32 +145,6 @@ async def process_review_action(
|
||||
review_decisions=review_decisions,
|
||||
)
|
||||
|
||||
# Create auto-approval records for approved reviews that requested it
|
||||
# Note: Processing sequentially to avoid event loop issues in tests
|
||||
for node_exec_id, review_result in updated_reviews.items():
|
||||
# Only create auto-approval if:
|
||||
# 1. This review was approved
|
||||
# 2. The review requested auto-approval
|
||||
if review_result.status == ReviewStatus.APPROVED and auto_approve_requests.get(
|
||||
node_exec_id, False
|
||||
):
|
||||
try:
|
||||
node_exec = await get_node_execution(node_exec_id)
|
||||
if node_exec:
|
||||
await create_auto_approval_record(
|
||||
user_id=user_id,
|
||||
graph_exec_id=review_result.graph_exec_id,
|
||||
graph_id=review_result.graph_id,
|
||||
graph_version=review_result.graph_version,
|
||||
node_id=node_exec.node_id,
|
||||
payload=review_result.payload,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to create auto-approval record for {node_exec_id}",
|
||||
exc_info=e,
|
||||
)
|
||||
|
||||
# Count results
|
||||
approved_count = sum(
|
||||
1
|
||||
@@ -256,37 +157,22 @@ async def process_review_action(
|
||||
if review.status == ReviewStatus.REJECTED
|
||||
)
|
||||
|
||||
# Resume execution only if ALL pending reviews for this execution have been processed
|
||||
# Resume execution if we processed some reviews
|
||||
if updated_reviews:
|
||||
# Get graph execution ID from any processed review
|
||||
first_review = next(iter(updated_reviews.values()))
|
||||
graph_exec_id = first_review.graph_exec_id
|
||||
|
||||
# Check if any pending reviews remain for this execution
|
||||
still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
|
||||
|
||||
if not still_has_pending:
|
||||
# Get the graph_id from any processed review
|
||||
first_review = next(iter(updated_reviews.values()))
|
||||
|
||||
# Resume execution
|
||||
try:
|
||||
# Fetch user and settings to build complete execution context
|
||||
user = await get_user_by_id(user_id)
|
||||
settings = await get_graph_settings(
|
||||
user_id=user_id, graph_id=first_review.graph_id
|
||||
)
|
||||
|
||||
# Preserve user's timezone preference when resuming execution
|
||||
user_timezone = (
|
||||
user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
|
||||
)
|
||||
|
||||
execution_context = ExecutionContext(
|
||||
human_in_the_loop_safe_mode=settings.human_in_the_loop_safe_mode,
|
||||
sensitive_action_safe_mode=settings.sensitive_action_safe_mode,
|
||||
user_timezone=user_timezone,
|
||||
)
|
||||
|
||||
await add_graph_execution(
|
||||
graph_id=first_review.graph_id,
|
||||
user_id=user_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
execution_context=execution_context,
|
||||
)
|
||||
logger.info(f"Resumed execution {graph_exec_id}")
|
||||
except Exception as e:
|
||||
|
||||
@@ -1552,7 +1552,7 @@ async def review_store_submission(
|
||||
|
||||
# Generate embedding for approved listing (blocking - admin operation)
|
||||
# Inside transaction: if embedding fails, entire transaction rolls back
|
||||
embedding_success = await ensure_embedding(
|
||||
await ensure_embedding(
|
||||
version_id=store_listing_version_id,
|
||||
name=store_listing_version.name,
|
||||
description=store_listing_version.description,
|
||||
@@ -1560,12 +1560,6 @@ async def review_store_submission(
|
||||
categories=store_listing_version.categories or [],
|
||||
tx=tx,
|
||||
)
|
||||
if not embedding_success:
|
||||
raise ValueError(
|
||||
f"Failed to generate embedding for listing {store_listing_version_id}. "
|
||||
"This is likely due to OpenAI API being unavailable. "
|
||||
"Please try again later or contact support if the issue persists."
|
||||
)
|
||||
|
||||
await prisma.models.StoreListing.prisma(tx).update(
|
||||
where={"id": store_listing_version.StoreListing.id},
|
||||
|
||||
@@ -6,7 +6,6 @@ Handles generation and storage of OpenAI embeddings for all content types
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import contextvars
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
@@ -22,11 +21,6 @@ from backend.util.json import dumps
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Context variable to track errors logged in the current task/operation
|
||||
# This prevents spamming the same error multiple times when processing batches
|
||||
_logged_errors: contextvars.ContextVar[set[str]] = contextvars.ContextVar(
|
||||
"_logged_errors"
|
||||
)
|
||||
|
||||
# OpenAI embedding model configuration
|
||||
EMBEDDING_MODEL = "text-embedding-3-small"
|
||||
@@ -37,42 +31,6 @@ EMBEDDING_DIM = 1536
|
||||
EMBEDDING_MAX_TOKENS = 8191
|
||||
|
||||
|
||||
def log_once_per_task(error_key: str, log_fn, message: str, **kwargs) -> bool:
|
||||
"""
|
||||
Log an error/warning only once per task/operation to avoid log spam.
|
||||
|
||||
Uses contextvars to track what has been logged in the current async context.
|
||||
Useful when processing batches where the same error might occur for many items.
|
||||
|
||||
Args:
|
||||
error_key: Unique identifier for this error type
|
||||
log_fn: Logger function to call (e.g., logger.error, logger.warning)
|
||||
message: Message to log
|
||||
**kwargs: Additional arguments to pass to log_fn
|
||||
|
||||
Returns:
|
||||
True if the message was logged, False if it was suppressed (already logged)
|
||||
|
||||
Example:
|
||||
log_once_per_task("missing_api_key", logger.error, "API key not set")
|
||||
"""
|
||||
# Get current logged errors, or create a new set if this is the first call in this context
|
||||
logged = _logged_errors.get(None)
|
||||
if logged is None:
|
||||
logged = set()
|
||||
_logged_errors.set(logged)
|
||||
|
||||
if error_key in logged:
|
||||
return False
|
||||
|
||||
# Log the message with a note that it will only appear once
|
||||
log_fn(f"{message} (This message will only be shown once per task.)", **kwargs)
|
||||
|
||||
# Mark as logged
|
||||
logged.add(error_key)
|
||||
return True
|
||||
|
||||
|
||||
def build_searchable_text(
|
||||
name: str,
|
||||
description: str,
|
||||
@@ -105,53 +63,42 @@ def build_searchable_text(
|
||||
return " ".join(parts)
|
||||
|
||||
|
||||
async def generate_embedding(text: str) -> list[float] | None:
|
||||
async def generate_embedding(text: str) -> list[float]:
|
||||
"""
|
||||
Generate embedding for text using OpenAI API.
|
||||
|
||||
Returns None if embedding generation fails.
|
||||
Fail-fast: no retries to maintain consistency with approval flow.
|
||||
Raises exceptions on failure - caller should handle.
|
||||
"""
|
||||
try:
|
||||
client = get_openai_client()
|
||||
if not client:
|
||||
log_once_per_task(
|
||||
"openai_api_key_missing",
|
||||
logger.error,
|
||||
"openai_internal_api_key not set, cannot generate embeddings",
|
||||
)
|
||||
return None
|
||||
client = get_openai_client()
|
||||
if not client:
|
||||
raise RuntimeError("openai_internal_api_key not set, cannot generate embedding")
|
||||
|
||||
# Truncate text to token limit using tiktoken
|
||||
# Character-based truncation is insufficient because token ratios vary by content type
|
||||
enc = encoding_for_model(EMBEDDING_MODEL)
|
||||
tokens = enc.encode(text)
|
||||
if len(tokens) > EMBEDDING_MAX_TOKENS:
|
||||
tokens = tokens[:EMBEDDING_MAX_TOKENS]
|
||||
truncated_text = enc.decode(tokens)
|
||||
logger.info(
|
||||
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
|
||||
)
|
||||
else:
|
||||
truncated_text = text
|
||||
|
||||
start_time = time.time()
|
||||
response = await client.embeddings.create(
|
||||
model=EMBEDDING_MODEL,
|
||||
input=truncated_text,
|
||||
)
|
||||
latency_ms = (time.time() - start_time) * 1000
|
||||
|
||||
embedding = response.data[0].embedding
|
||||
# Truncate text to token limit using tiktoken
|
||||
# Character-based truncation is insufficient because token ratios vary by content type
|
||||
enc = encoding_for_model(EMBEDDING_MODEL)
|
||||
tokens = enc.encode(text)
|
||||
if len(tokens) > EMBEDDING_MAX_TOKENS:
|
||||
tokens = tokens[:EMBEDDING_MAX_TOKENS]
|
||||
truncated_text = enc.decode(tokens)
|
||||
logger.info(
|
||||
f"Generated embedding: {len(embedding)} dims, "
|
||||
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
|
||||
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
|
||||
)
|
||||
return embedding
|
||||
else:
|
||||
truncated_text = text
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate embedding: {e}")
|
||||
return None
|
||||
start_time = time.time()
|
||||
response = await client.embeddings.create(
|
||||
model=EMBEDDING_MODEL,
|
||||
input=truncated_text,
|
||||
)
|
||||
latency_ms = (time.time() - start_time) * 1000
|
||||
|
||||
embedding = response.data[0].embedding
|
||||
logger.info(
|
||||
f"Generated embedding: {len(embedding)} dims, "
|
||||
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
|
||||
)
|
||||
return embedding
|
||||
|
||||
|
||||
async def store_embedding(
|
||||
@@ -190,48 +137,45 @@ async def store_content_embedding(
|
||||
|
||||
New function for unified content embedding storage.
|
||||
Uses raw SQL since Prisma doesn't natively support pgvector.
|
||||
|
||||
Raises exceptions on failure - caller should handle.
|
||||
"""
|
||||
try:
|
||||
client = tx if tx else prisma.get_client()
|
||||
client = tx if tx else prisma.get_client()
|
||||
|
||||
# Convert embedding to PostgreSQL vector format
|
||||
embedding_str = embedding_to_vector_string(embedding)
|
||||
metadata_json = dumps(metadata or {})
|
||||
# Convert embedding to PostgreSQL vector format
|
||||
embedding_str = embedding_to_vector_string(embedding)
|
||||
metadata_json = dumps(metadata or {})
|
||||
|
||||
# Upsert the embedding
|
||||
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
|
||||
# Use unqualified ::vector - pgvector is in search_path on all environments
|
||||
await execute_raw_with_schema(
|
||||
"""
|
||||
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
|
||||
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
|
||||
)
|
||||
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
|
||||
ON CONFLICT ("contentType", "contentId", "userId")
|
||||
DO UPDATE SET
|
||||
"embedding" = $4::vector,
|
||||
"searchableText" = $5,
|
||||
"metadata" = $6::jsonb,
|
||||
"updatedAt" = NOW()
|
||||
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
|
||||
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
|
||||
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
|
||||
""",
|
||||
content_type,
|
||||
content_id,
|
||||
user_id,
|
||||
embedding_str,
|
||||
searchable_text,
|
||||
metadata_json,
|
||||
client=client,
|
||||
# Upsert the embedding
|
||||
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
|
||||
# Use unqualified ::vector - pgvector is in search_path on all environments
|
||||
await execute_raw_with_schema(
|
||||
"""
|
||||
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
|
||||
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
|
||||
)
|
||||
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
|
||||
ON CONFLICT ("contentType", "contentId", "userId")
|
||||
DO UPDATE SET
|
||||
"embedding" = $4::vector,
|
||||
"searchableText" = $5,
|
||||
"metadata" = $6::jsonb,
|
||||
"updatedAt" = NOW()
|
||||
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
|
||||
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
|
||||
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
|
||||
""",
|
||||
content_type,
|
||||
content_id,
|
||||
user_id,
|
||||
embedding_str,
|
||||
searchable_text,
|
||||
metadata_json,
|
||||
client=client,
|
||||
)
|
||||
|
||||
logger.info(f"Stored embedding for {content_type}:{content_id}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store embedding for {content_type}:{content_id}: {e}")
|
||||
return False
|
||||
logger.info(f"Stored embedding for {content_type}:{content_id}")
|
||||
return True
|
||||
|
||||
|
||||
async def get_embedding(version_id: str) -> dict[str, Any] | None:
|
||||
@@ -263,34 +207,31 @@ async def get_content_embedding(
|
||||
|
||||
New function for unified content embedding retrieval.
|
||||
Returns dict with contentType, contentId, embedding, timestamps or None if not found.
|
||||
|
||||
Raises exceptions on failure - caller should handle.
|
||||
"""
|
||||
try:
|
||||
result = await query_raw_with_schema(
|
||||
"""
|
||||
SELECT
|
||||
"contentType",
|
||||
"contentId",
|
||||
"userId",
|
||||
"embedding"::text as "embedding",
|
||||
"searchableText",
|
||||
"metadata",
|
||||
"createdAt",
|
||||
"updatedAt"
|
||||
FROM {schema_prefix}"UnifiedContentEmbedding"
|
||||
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
|
||||
""",
|
||||
content_type,
|
||||
content_id,
|
||||
user_id,
|
||||
)
|
||||
result = await query_raw_with_schema(
|
||||
"""
|
||||
SELECT
|
||||
"contentType",
|
||||
"contentId",
|
||||
"userId",
|
||||
"embedding"::text as "embedding",
|
||||
"searchableText",
|
||||
"metadata",
|
||||
"createdAt",
|
||||
"updatedAt"
|
||||
FROM {schema_prefix}"UnifiedContentEmbedding"
|
||||
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
|
||||
""",
|
||||
content_type,
|
||||
content_id,
|
||||
user_id,
|
||||
)
|
||||
|
||||
if result and len(result) > 0:
|
||||
return result[0]
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get embedding for {content_type}:{content_id}: {e}")
|
||||
return None
|
||||
if result and len(result) > 0:
|
||||
return result[0]
|
||||
return None
|
||||
|
||||
|
||||
async def ensure_embedding(
|
||||
@@ -318,51 +259,38 @@ async def ensure_embedding(
|
||||
tx: Optional transaction client
|
||||
|
||||
Returns:
|
||||
True if embedding exists/was created, False on failure
|
||||
True if embedding exists/was created
|
||||
|
||||
Raises exceptions on failure - caller should handle.
|
||||
"""
|
||||
try:
|
||||
# Check if embedding already exists
|
||||
if not force:
|
||||
existing = await get_embedding(version_id)
|
||||
if existing and existing.get("embedding"):
|
||||
logger.debug(f"Embedding for version {version_id} already exists")
|
||||
return True
|
||||
# Check if embedding already exists
|
||||
if not force:
|
||||
existing = await get_embedding(version_id)
|
||||
if existing and existing.get("embedding"):
|
||||
logger.debug(f"Embedding for version {version_id} already exists")
|
||||
return True
|
||||
|
||||
# Build searchable text for embedding
|
||||
searchable_text = build_searchable_text(
|
||||
name, description, sub_heading, categories
|
||||
)
|
||||
# Build searchable text for embedding
|
||||
searchable_text = build_searchable_text(name, description, sub_heading, categories)
|
||||
|
||||
# Generate new embedding
|
||||
embedding = await generate_embedding(searchable_text)
|
||||
if embedding is None:
|
||||
log_once_per_task(
|
||||
"embedding_generation_failed",
|
||||
logger.warning,
|
||||
"Could not generate embeddings (missing API key or service unavailable). "
|
||||
"Embedding generation is disabled for this task.",
|
||||
)
|
||||
return False
|
||||
# Generate new embedding
|
||||
embedding = await generate_embedding(searchable_text)
|
||||
|
||||
# Store the embedding with metadata using new function
|
||||
metadata = {
|
||||
"name": name,
|
||||
"subHeading": sub_heading,
|
||||
"categories": categories,
|
||||
}
|
||||
return await store_content_embedding(
|
||||
content_type=ContentType.STORE_AGENT,
|
||||
content_id=version_id,
|
||||
embedding=embedding,
|
||||
searchable_text=searchable_text,
|
||||
metadata=metadata,
|
||||
user_id=None, # Store agents are public
|
||||
tx=tx,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to ensure embedding for version {version_id}: {e}")
|
||||
return False
|
||||
# Store the embedding with metadata using new function
|
||||
metadata = {
|
||||
"name": name,
|
||||
"subHeading": sub_heading,
|
||||
"categories": categories,
|
||||
}
|
||||
return await store_content_embedding(
|
||||
content_type=ContentType.STORE_AGENT,
|
||||
content_id=version_id,
|
||||
embedding=embedding,
|
||||
searchable_text=searchable_text,
|
||||
metadata=metadata,
|
||||
user_id=None, # Store agents are public
|
||||
tx=tx,
|
||||
)
|
||||
|
||||
|
||||
async def delete_embedding(version_id: str) -> bool:
|
||||
@@ -572,6 +500,24 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
|
||||
success = sum(1 for result in results if result is True)
|
||||
failed = len(results) - success
|
||||
|
||||
# Aggregate unique errors to avoid Sentry spam
|
||||
if failed > 0:
|
||||
# Group errors by type and message
|
||||
error_summary: dict[str, int] = {}
|
||||
for result in results:
|
||||
if isinstance(result, Exception):
|
||||
error_key = f"{type(result).__name__}: {str(result)}"
|
||||
error_summary[error_key] = error_summary.get(error_key, 0) + 1
|
||||
|
||||
# Log aggregated error summary
|
||||
error_details = ", ".join(
|
||||
f"{error} ({count}x)" for error, count in error_summary.items()
|
||||
)
|
||||
logger.error(
|
||||
f"{content_type.value}: {failed}/{len(results)} embeddings failed. "
|
||||
f"Errors: {error_details}"
|
||||
)
|
||||
|
||||
results_by_type[content_type.value] = {
|
||||
"processed": len(missing_items),
|
||||
"success": success,
|
||||
@@ -608,11 +554,12 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
async def embed_query(query: str) -> list[float] | None:
|
||||
async def embed_query(query: str) -> list[float]:
|
||||
"""
|
||||
Generate embedding for a search query.
|
||||
|
||||
Same as generate_embedding but with clearer intent.
|
||||
Raises exceptions on failure - caller should handle.
|
||||
"""
|
||||
return await generate_embedding(query)
|
||||
|
||||
@@ -645,43 +592,30 @@ async def ensure_content_embedding(
|
||||
tx: Optional transaction client
|
||||
|
||||
Returns:
|
||||
True if embedding exists/was created, False on failure
|
||||
True if embedding exists/was created
|
||||
|
||||
Raises exceptions on failure - caller should handle.
|
||||
"""
|
||||
try:
|
||||
# Check if embedding already exists
|
||||
if not force:
|
||||
existing = await get_content_embedding(content_type, content_id, user_id)
|
||||
if existing and existing.get("embedding"):
|
||||
logger.debug(
|
||||
f"Embedding for {content_type}:{content_id} already exists"
|
||||
)
|
||||
return True
|
||||
# Check if embedding already exists
|
||||
if not force:
|
||||
existing = await get_content_embedding(content_type, content_id, user_id)
|
||||
if existing and existing.get("embedding"):
|
||||
logger.debug(f"Embedding for {content_type}:{content_id} already exists")
|
||||
return True
|
||||
|
||||
# Generate new embedding
|
||||
embedding = await generate_embedding(searchable_text)
|
||||
if embedding is None:
|
||||
log_once_per_task(
|
||||
"embedding_generation_failed",
|
||||
logger.warning,
|
||||
"Could not generate embeddings (missing API key or service unavailable). "
|
||||
"Embedding generation is disabled for this task.",
|
||||
)
|
||||
return False
|
||||
# Generate new embedding
|
||||
embedding = await generate_embedding(searchable_text)
|
||||
|
||||
# Store the embedding
|
||||
return await store_content_embedding(
|
||||
content_type=content_type,
|
||||
content_id=content_id,
|
||||
embedding=embedding,
|
||||
searchable_text=searchable_text,
|
||||
metadata=metadata or {},
|
||||
user_id=user_id,
|
||||
tx=tx,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to ensure embedding for {content_type}:{content_id}: {e}")
|
||||
return False
|
||||
# Store the embedding
|
||||
return await store_content_embedding(
|
||||
content_type=content_type,
|
||||
content_id=content_id,
|
||||
embedding=embedding,
|
||||
searchable_text=searchable_text,
|
||||
metadata=metadata or {},
|
||||
user_id=user_id,
|
||||
tx=tx,
|
||||
)
|
||||
|
||||
|
||||
async def cleanup_orphaned_embeddings() -> dict[str, Any]:
|
||||
@@ -908,9 +842,8 @@ async def semantic_search(
|
||||
limit = 100
|
||||
|
||||
# Generate query embedding
|
||||
query_embedding = await embed_query(query)
|
||||
|
||||
if query_embedding is not None:
|
||||
try:
|
||||
query_embedding = await embed_query(query)
|
||||
# Semantic search with embeddings
|
||||
embedding_str = embedding_to_vector_string(query_embedding)
|
||||
|
||||
@@ -961,24 +894,21 @@ async def semantic_search(
|
||||
"""
|
||||
)
|
||||
|
||||
try:
|
||||
results = await query_raw_with_schema(sql, *params)
|
||||
return [
|
||||
{
|
||||
"content_id": row["content_id"],
|
||||
"content_type": row["content_type"],
|
||||
"searchable_text": row["searchable_text"],
|
||||
"metadata": row["metadata"],
|
||||
"similarity": float(row["similarity"]),
|
||||
}
|
||||
for row in results
|
||||
]
|
||||
except Exception as e:
|
||||
logger.error(f"Semantic search failed: {e}")
|
||||
# Fall through to lexical search below
|
||||
results = await query_raw_with_schema(sql, *params)
|
||||
return [
|
||||
{
|
||||
"content_id": row["content_id"],
|
||||
"content_type": row["content_type"],
|
||||
"searchable_text": row["searchable_text"],
|
||||
"metadata": row["metadata"],
|
||||
"similarity": float(row["similarity"]),
|
||||
}
|
||||
for row in results
|
||||
]
|
||||
except Exception as e:
|
||||
logger.warning(f"Semantic search failed, falling back to lexical search: {e}")
|
||||
|
||||
# Fallback to lexical search if embeddings unavailable
|
||||
logger.warning("Falling back to lexical search (embeddings unavailable)")
|
||||
|
||||
params_lexical: list[Any] = [limit]
|
||||
user_filter = ""
|
||||
|
||||
@@ -298,17 +298,16 @@ async def test_schema_handling_error_cases():
|
||||
mock_client.execute_raw.side_effect = Exception("Database error")
|
||||
mock_get_client.return_value = mock_client
|
||||
|
||||
result = await embeddings.store_content_embedding(
|
||||
content_type=ContentType.STORE_AGENT,
|
||||
content_id="test-id",
|
||||
embedding=[0.1] * EMBEDDING_DIM,
|
||||
searchable_text="test",
|
||||
metadata=None,
|
||||
user_id=None,
|
||||
)
|
||||
|
||||
# Should return False on error, not raise
|
||||
assert result is False
|
||||
# Should raise exception on error
|
||||
with pytest.raises(Exception, match="Database error"):
|
||||
await embeddings.store_content_embedding(
|
||||
content_type=ContentType.STORE_AGENT,
|
||||
content_id="test-id",
|
||||
embedding=[0.1] * EMBEDDING_DIM,
|
||||
searchable_text="test",
|
||||
metadata=None,
|
||||
user_id=None,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -80,9 +80,8 @@ async def test_generate_embedding_no_api_key():
|
||||
) as mock_get_client:
|
||||
mock_get_client.return_value = None
|
||||
|
||||
result = await embeddings.generate_embedding("test text")
|
||||
|
||||
assert result is None
|
||||
with pytest.raises(RuntimeError, match="openai_internal_api_key not set"):
|
||||
await embeddings.generate_embedding("test text")
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -97,9 +96,8 @@ async def test_generate_embedding_api_error():
|
||||
) as mock_get_client:
|
||||
mock_get_client.return_value = mock_client
|
||||
|
||||
result = await embeddings.generate_embedding("test text")
|
||||
|
||||
assert result is None
|
||||
with pytest.raises(Exception, match="API Error"):
|
||||
await embeddings.generate_embedding("test text")
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -173,11 +171,10 @@ async def test_store_embedding_database_error(mocker):
|
||||
|
||||
embedding = [0.1, 0.2, 0.3]
|
||||
|
||||
result = await embeddings.store_embedding(
|
||||
version_id="test-version-id", embedding=embedding, tx=mock_client
|
||||
)
|
||||
|
||||
assert result is False
|
||||
with pytest.raises(Exception, match="Database error"):
|
||||
await embeddings.store_embedding(
|
||||
version_id="test-version-id", embedding=embedding, tx=mock_client
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -277,17 +274,16 @@ async def test_ensure_embedding_create_new(mock_get, mock_store, mock_generate):
|
||||
async def test_ensure_embedding_generation_fails(mock_get, mock_generate):
|
||||
"""Test ensure_embedding when generation fails."""
|
||||
mock_get.return_value = None
|
||||
mock_generate.return_value = None
|
||||
mock_generate.side_effect = Exception("Generation failed")
|
||||
|
||||
result = await embeddings.ensure_embedding(
|
||||
version_id="test-id",
|
||||
name="Test",
|
||||
description="Test description",
|
||||
sub_heading="Test heading",
|
||||
categories=["test"],
|
||||
)
|
||||
|
||||
assert result is False
|
||||
with pytest.raises(Exception, match="Generation failed"):
|
||||
await embeddings.ensure_embedding(
|
||||
version_id="test-id",
|
||||
name="Test",
|
||||
description="Test description",
|
||||
sub_heading="Test heading",
|
||||
categories=["test"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
|
||||
@@ -186,13 +186,12 @@ async def unified_hybrid_search(
|
||||
|
||||
offset = (page - 1) * page_size
|
||||
|
||||
# Generate query embedding
|
||||
query_embedding = await embed_query(query)
|
||||
|
||||
# Graceful degradation if embedding unavailable
|
||||
if query_embedding is None or not query_embedding:
|
||||
# Generate query embedding with graceful degradation
|
||||
try:
|
||||
query_embedding = await embed_query(query)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to generate query embedding - falling back to lexical-only search. "
|
||||
f"Failed to generate query embedding - falling back to lexical-only search: {e}. "
|
||||
"Check that openai_internal_api_key is configured and OpenAI API is accessible."
|
||||
)
|
||||
query_embedding = [0.0] * EMBEDDING_DIM
|
||||
@@ -464,13 +463,12 @@ async def hybrid_search(
|
||||
|
||||
offset = (page - 1) * page_size
|
||||
|
||||
# Generate query embedding
|
||||
query_embedding = await embed_query(query)
|
||||
|
||||
# Graceful degradation
|
||||
if query_embedding is None or not query_embedding:
|
||||
# Generate query embedding with graceful degradation
|
||||
try:
|
||||
query_embedding = await embed_query(query)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to generate query embedding - falling back to lexical-only search."
|
||||
f"Failed to generate query embedding - falling back to lexical-only search: {e}"
|
||||
)
|
||||
query_embedding = [0.0] * EMBEDDING_DIM
|
||||
total_non_semantic = (
|
||||
|
||||
@@ -172,8 +172,8 @@ async def test_hybrid_search_without_embeddings():
|
||||
with patch(
|
||||
"backend.api.features.store.hybrid_search.query_raw_with_schema"
|
||||
) as mock_query:
|
||||
# Simulate embedding failure
|
||||
mock_embed.return_value = None
|
||||
# Simulate embedding failure by raising exception
|
||||
mock_embed.side_effect = Exception("Embedding generation failed")
|
||||
mock_query.return_value = mock_results
|
||||
|
||||
# Should NOT raise - graceful degradation
|
||||
@@ -613,7 +613,9 @@ async def test_unified_hybrid_search_graceful_degradation():
|
||||
"backend.api.features.store.hybrid_search.embed_query"
|
||||
) as mock_embed:
|
||||
mock_query.return_value = mock_results
|
||||
mock_embed.return_value = None # Embedding failure
|
||||
mock_embed.side_effect = Exception(
|
||||
"Embedding generation failed"
|
||||
) # Embedding failure
|
||||
|
||||
# Should NOT raise - graceful degradation
|
||||
results, total = await unified_hybrid_search(
|
||||
|
||||
@@ -116,7 +116,6 @@ class PrintToConsoleBlock(Block):
|
||||
input_schema=PrintToConsoleBlock.Input,
|
||||
output_schema=PrintToConsoleBlock.Output,
|
||||
test_input={"text": "Hello, World!"},
|
||||
is_sensitive_action=True,
|
||||
test_output=[
|
||||
("output", "Hello, World!"),
|
||||
("status", "printed"),
|
||||
|
||||
@@ -9,7 +9,7 @@ from typing import Any, Optional
|
||||
from prisma.enums import ReviewStatus
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.data.execution import ExecutionStatus
|
||||
from backend.data.execution import ExecutionContext, ExecutionStatus
|
||||
from backend.data.human_review import ReviewResult
|
||||
from backend.executor.manager import async_update_node_execution_status
|
||||
from backend.util.clients import get_database_manager_async_client
|
||||
@@ -28,11 +28,6 @@ class ReviewDecision(BaseModel):
|
||||
class HITLReviewHelper:
|
||||
"""Helper class for Human-In-The-Loop review operations."""
|
||||
|
||||
@staticmethod
|
||||
async def check_approval(**kwargs) -> Optional[ReviewResult]:
|
||||
"""Check if there's an existing approval for this node execution."""
|
||||
return await get_database_manager_async_client().check_approval(**kwargs)
|
||||
|
||||
@staticmethod
|
||||
async def get_or_create_human_review(**kwargs) -> Optional[ReviewResult]:
|
||||
"""Create or retrieve a human review from the database."""
|
||||
@@ -60,11 +55,11 @@ class HITLReviewHelper:
|
||||
async def _handle_review_request(
|
||||
input_data: Any,
|
||||
user_id: str,
|
||||
node_id: str,
|
||||
node_exec_id: str,
|
||||
graph_exec_id: str,
|
||||
graph_id: str,
|
||||
graph_version: int,
|
||||
execution_context: ExecutionContext,
|
||||
block_name: str = "Block",
|
||||
editable: bool = False,
|
||||
) -> Optional[ReviewResult]:
|
||||
@@ -74,11 +69,11 @@ class HITLReviewHelper:
|
||||
Args:
|
||||
input_data: The input data to be reviewed
|
||||
user_id: ID of the user requesting the review
|
||||
node_id: ID of the node in the graph definition
|
||||
node_exec_id: ID of the node execution
|
||||
graph_exec_id: ID of the graph execution
|
||||
graph_id: ID of the graph
|
||||
graph_version: Version of the graph
|
||||
execution_context: Current execution context
|
||||
block_name: Name of the block requesting review
|
||||
editable: Whether the reviewer can edit the data
|
||||
|
||||
@@ -88,41 +83,15 @@ class HITLReviewHelper:
|
||||
Raises:
|
||||
Exception: If review creation or status update fails
|
||||
"""
|
||||
# Note: Safe mode checks (human_in_the_loop_safe_mode, sensitive_action_safe_mode)
|
||||
# are handled by the caller:
|
||||
# - HITL blocks check human_in_the_loop_safe_mode in their run() method
|
||||
# - Sensitive action blocks check sensitive_action_safe_mode in is_block_exec_need_review()
|
||||
# This function only handles checking for existing approvals.
|
||||
|
||||
# Check if this node has already been approved (normal or auto-approval)
|
||||
if approval_result := await HITLReviewHelper.check_approval(
|
||||
node_exec_id=node_exec_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
node_id=node_id,
|
||||
user_id=user_id,
|
||||
input_data=input_data,
|
||||
):
|
||||
# Skip review if safe mode is disabled - return auto-approved result
|
||||
if not execution_context.human_in_the_loop_safe_mode:
|
||||
logger.info(
|
||||
f"Block {block_name} skipping review for node {node_exec_id} - "
|
||||
f"found existing approval"
|
||||
)
|
||||
# Return a new ReviewResult with the current node_exec_id but approved status
|
||||
# For auto-approvals, always use current input_data
|
||||
# For normal approvals, use approval_result.data unless it's None
|
||||
is_auto_approval = approval_result.node_exec_id != node_exec_id
|
||||
approved_data = (
|
||||
input_data
|
||||
if is_auto_approval
|
||||
else (
|
||||
approval_result.data
|
||||
if approval_result.data is not None
|
||||
else input_data
|
||||
)
|
||||
f"Block {block_name} skipping review for node {node_exec_id} - safe mode disabled"
|
||||
)
|
||||
return ReviewResult(
|
||||
data=approved_data,
|
||||
data=input_data,
|
||||
status=ReviewStatus.APPROVED,
|
||||
message=approval_result.message,
|
||||
message="Auto-approved (safe mode disabled)",
|
||||
processed=True,
|
||||
node_exec_id=node_exec_id,
|
||||
)
|
||||
@@ -160,11 +129,11 @@ class HITLReviewHelper:
|
||||
async def handle_review_decision(
|
||||
input_data: Any,
|
||||
user_id: str,
|
||||
node_id: str,
|
||||
node_exec_id: str,
|
||||
graph_exec_id: str,
|
||||
graph_id: str,
|
||||
graph_version: int,
|
||||
execution_context: ExecutionContext,
|
||||
block_name: str = "Block",
|
||||
editable: bool = False,
|
||||
) -> Optional[ReviewDecision]:
|
||||
@@ -174,11 +143,11 @@ class HITLReviewHelper:
|
||||
Args:
|
||||
input_data: The input data to be reviewed
|
||||
user_id: ID of the user requesting the review
|
||||
node_id: ID of the node in the graph definition
|
||||
node_exec_id: ID of the node execution
|
||||
graph_exec_id: ID of the graph execution
|
||||
graph_id: ID of the graph
|
||||
graph_version: Version of the graph
|
||||
execution_context: Current execution context
|
||||
block_name: Name of the block requesting review
|
||||
editable: Whether the reviewer can edit the data
|
||||
|
||||
@@ -189,11 +158,11 @@ class HITLReviewHelper:
|
||||
review_result = await HITLReviewHelper._handle_review_request(
|
||||
input_data=input_data,
|
||||
user_id=user_id,
|
||||
node_id=node_id,
|
||||
node_exec_id=node_exec_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
graph_id=graph_id,
|
||||
graph_version=graph_version,
|
||||
execution_context=execution_context,
|
||||
block_name=block_name,
|
||||
editable=editable,
|
||||
)
|
||||
|
||||
@@ -97,7 +97,6 @@ class HumanInTheLoopBlock(Block):
|
||||
input_data: Input,
|
||||
*,
|
||||
user_id: str,
|
||||
node_id: str,
|
||||
node_exec_id: str,
|
||||
graph_exec_id: str,
|
||||
graph_id: str,
|
||||
@@ -116,11 +115,11 @@ class HumanInTheLoopBlock(Block):
|
||||
decision = await self.handle_review_decision(
|
||||
input_data=input_data.data,
|
||||
user_id=user_id,
|
||||
node_id=node_id,
|
||||
node_exec_id=node_exec_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
graph_id=graph_id,
|
||||
graph_version=graph_version,
|
||||
execution_context=execution_context,
|
||||
block_name=self.name,
|
||||
editable=input_data.editable,
|
||||
)
|
||||
|
||||
@@ -441,7 +441,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
|
||||
static_output: bool = False,
|
||||
block_type: BlockType = BlockType.STANDARD,
|
||||
webhook_config: Optional[BlockWebhookConfig | BlockManualWebhookConfig] = None,
|
||||
is_sensitive_action: bool = False,
|
||||
):
|
||||
"""
|
||||
Initialize the block with the given schema.
|
||||
@@ -474,8 +473,8 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
|
||||
self.static_output = static_output
|
||||
self.block_type = block_type
|
||||
self.webhook_config = webhook_config
|
||||
self.is_sensitive_action = is_sensitive_action
|
||||
self.execution_stats: NodeExecutionStats = NodeExecutionStats()
|
||||
self.is_sensitive_action: bool = False
|
||||
|
||||
if self.webhook_config:
|
||||
if isinstance(self.webhook_config, BlockWebhookConfig):
|
||||
@@ -623,7 +622,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
|
||||
input_data: BlockInput,
|
||||
*,
|
||||
user_id: str,
|
||||
node_id: str,
|
||||
node_exec_id: str,
|
||||
graph_exec_id: str,
|
||||
graph_id: str,
|
||||
@@ -650,11 +648,11 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
|
||||
decision = await HITLReviewHelper.handle_review_decision(
|
||||
input_data=input_data,
|
||||
user_id=user_id,
|
||||
node_id=node_id,
|
||||
node_exec_id=node_exec_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
graph_id=graph_id,
|
||||
graph_version=graph_version,
|
||||
execution_context=execution_context,
|
||||
block_name=self.name,
|
||||
editable=True,
|
||||
)
|
||||
|
||||
@@ -6,7 +6,7 @@ Handles all database operations for pending human reviews.
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import Optional
|
||||
|
||||
from prisma.enums import ReviewStatus
|
||||
from prisma.models import PendingHumanReview
|
||||
@@ -17,12 +17,8 @@ from backend.api.features.executions.review.model import (
|
||||
PendingHumanReviewModel,
|
||||
SafeJsonData,
|
||||
)
|
||||
from backend.data.execution import get_graph_execution_meta
|
||||
from backend.util.json import SafeJson
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -36,125 +32,6 @@ class ReviewResult(BaseModel):
|
||||
node_exec_id: str
|
||||
|
||||
|
||||
def get_auto_approve_key(graph_exec_id: str, node_id: str) -> str:
|
||||
"""Generate the special nodeExecId key for auto-approval records."""
|
||||
return f"auto_approve_{graph_exec_id}_{node_id}"
|
||||
|
||||
|
||||
async def check_approval(
|
||||
node_exec_id: str,
|
||||
graph_exec_id: str,
|
||||
node_id: str,
|
||||
user_id: str,
|
||||
input_data: SafeJsonData | None = None,
|
||||
) -> Optional[ReviewResult]:
|
||||
"""
|
||||
Check if there's an existing approval for this node execution.
|
||||
|
||||
Checks both:
|
||||
1. Normal approval by node_exec_id (previous run of the same node execution)
|
||||
2. Auto-approval by special key pattern "auto_approve_{graph_exec_id}_{node_id}"
|
||||
|
||||
Args:
|
||||
node_exec_id: ID of the node execution
|
||||
graph_exec_id: ID of the graph execution
|
||||
node_id: ID of the node definition (not execution)
|
||||
user_id: ID of the user (for data isolation)
|
||||
input_data: Current input data (used for auto-approvals to avoid stale data)
|
||||
|
||||
Returns:
|
||||
ReviewResult if approval found (either normal or auto), None otherwise
|
||||
"""
|
||||
auto_approve_key = get_auto_approve_key(graph_exec_id, node_id)
|
||||
|
||||
# Check for either normal approval or auto-approval in a single query
|
||||
existing_review = await PendingHumanReview.prisma().find_first(
|
||||
where={
|
||||
"OR": [
|
||||
{"nodeExecId": node_exec_id},
|
||||
{"nodeExecId": auto_approve_key},
|
||||
],
|
||||
"status": ReviewStatus.APPROVED,
|
||||
"userId": user_id,
|
||||
},
|
||||
)
|
||||
|
||||
if existing_review:
|
||||
is_auto_approval = existing_review.nodeExecId == auto_approve_key
|
||||
logger.info(
|
||||
f"Found {'auto-' if is_auto_approval else ''}approval for node {node_id} "
|
||||
f"(exec: {node_exec_id}) in execution {graph_exec_id}"
|
||||
)
|
||||
# For auto-approvals, use current input_data to avoid replaying stale payload
|
||||
# For normal approvals, use the stored payload (which may have been edited)
|
||||
return ReviewResult(
|
||||
data=(
|
||||
input_data
|
||||
if is_auto_approval and input_data is not None
|
||||
else existing_review.payload
|
||||
),
|
||||
status=ReviewStatus.APPROVED,
|
||||
message=(
|
||||
"Auto-approved (user approved all future actions for this node)"
|
||||
if is_auto_approval
|
||||
else existing_review.reviewMessage or ""
|
||||
),
|
||||
processed=True,
|
||||
node_exec_id=existing_review.nodeExecId,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def create_auto_approval_record(
|
||||
user_id: str,
|
||||
graph_exec_id: str,
|
||||
graph_id: str,
|
||||
graph_version: int,
|
||||
node_id: str,
|
||||
payload: SafeJsonData,
|
||||
) -> None:
|
||||
"""
|
||||
Create an auto-approval record for a node in this execution.
|
||||
|
||||
This is stored as a PendingHumanReview with a special nodeExecId pattern
|
||||
and status=APPROVED, so future executions of the same node can skip review.
|
||||
|
||||
Raises:
|
||||
ValueError: If the graph execution doesn't belong to the user
|
||||
"""
|
||||
# Validate that the graph execution belongs to this user (defense in depth)
|
||||
graph_exec = await get_graph_execution_meta(
|
||||
user_id=user_id, execution_id=graph_exec_id
|
||||
)
|
||||
if not graph_exec:
|
||||
raise ValueError(
|
||||
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
|
||||
)
|
||||
|
||||
auto_approve_key = get_auto_approve_key(graph_exec_id, node_id)
|
||||
|
||||
await PendingHumanReview.prisma().upsert(
|
||||
where={"nodeExecId": auto_approve_key},
|
||||
data={
|
||||
"create": {
|
||||
"nodeExecId": auto_approve_key,
|
||||
"userId": user_id,
|
||||
"graphExecId": graph_exec_id,
|
||||
"graphId": graph_id,
|
||||
"graphVersion": graph_version,
|
||||
"payload": SafeJson(payload),
|
||||
"instructions": "Auto-approval record",
|
||||
"editable": False,
|
||||
"status": ReviewStatus.APPROVED,
|
||||
"processed": True,
|
||||
"reviewedAt": datetime.now(timezone.utc),
|
||||
},
|
||||
"update": {}, # Already exists, no update needed
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
async def get_or_create_human_review(
|
||||
user_id: str,
|
||||
node_exec_id: str,
|
||||
@@ -231,38 +108,6 @@ async def get_or_create_human_review(
|
||||
)
|
||||
|
||||
|
||||
async def get_pending_review_by_node_exec_id(
|
||||
node_exec_id: str, user_id: str
|
||||
) -> Optional["PendingHumanReviewModel"]:
|
||||
"""
|
||||
Get a pending review by its node execution ID.
|
||||
|
||||
Args:
|
||||
node_exec_id: The node execution ID to look up
|
||||
user_id: User ID for authorization (only returns if review belongs to this user)
|
||||
|
||||
Returns:
|
||||
The pending review if found and belongs to user, None otherwise
|
||||
"""
|
||||
review = await PendingHumanReview.prisma().find_first(
|
||||
where={
|
||||
"nodeExecId": node_exec_id,
|
||||
"userId": user_id,
|
||||
"status": ReviewStatus.WAITING,
|
||||
}
|
||||
)
|
||||
|
||||
if not review:
|
||||
return None
|
||||
|
||||
# Local import to avoid event loop conflicts in tests
|
||||
from backend.data.execution import get_node_execution
|
||||
|
||||
node_exec = await get_node_execution(review.nodeExecId)
|
||||
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
||||
return PendingHumanReviewModel.from_db(review, node_id=node_id)
|
||||
|
||||
|
||||
async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
|
||||
"""
|
||||
Check if a graph execution has any pending reviews.
|
||||
@@ -292,11 +137,8 @@ async def get_pending_reviews_for_user(
|
||||
page_size: Number of reviews per page
|
||||
|
||||
Returns:
|
||||
List of pending review models with node_id included
|
||||
List of pending review models
|
||||
"""
|
||||
# Local import to avoid event loop conflicts in tests
|
||||
from backend.data.execution import get_node_execution
|
||||
|
||||
# Calculate offset for pagination
|
||||
offset = (page - 1) * page_size
|
||||
|
||||
@@ -307,14 +149,7 @@ async def get_pending_reviews_for_user(
|
||||
take=page_size,
|
||||
)
|
||||
|
||||
# Fetch node_id for each review from NodeExecution
|
||||
result = []
|
||||
for review in reviews:
|
||||
node_exec = await get_node_execution(review.nodeExecId)
|
||||
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
||||
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
|
||||
|
||||
return result
|
||||
return [PendingHumanReviewModel.from_db(review) for review in reviews]
|
||||
|
||||
|
||||
async def get_pending_reviews_for_execution(
|
||||
@@ -328,11 +163,8 @@ async def get_pending_reviews_for_execution(
|
||||
user_id: User ID for security validation
|
||||
|
||||
Returns:
|
||||
List of pending review models with node_id included
|
||||
List of pending review models
|
||||
"""
|
||||
# Local import to avoid event loop conflicts in tests
|
||||
from backend.data.execution import get_node_execution
|
||||
|
||||
reviews = await PendingHumanReview.prisma().find_many(
|
||||
where={
|
||||
"userId": user_id,
|
||||
@@ -342,14 +174,7 @@ async def get_pending_reviews_for_execution(
|
||||
order={"createdAt": "asc"},
|
||||
)
|
||||
|
||||
# Fetch node_id for each review from NodeExecution
|
||||
result = []
|
||||
for review in reviews:
|
||||
node_exec = await get_node_execution(review.nodeExecId)
|
||||
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
||||
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
|
||||
|
||||
return result
|
||||
return [PendingHumanReviewModel.from_db(review) for review in reviews]
|
||||
|
||||
|
||||
async def process_all_reviews_for_execution(
|
||||
@@ -419,19 +244,11 @@ async def process_all_reviews_for_execution(
|
||||
# Note: Execution resumption is now handled at the API layer after ALL reviews
|
||||
# for an execution are processed (both approved and rejected)
|
||||
|
||||
# Fetch node_id for each review and return as dict for easy access
|
||||
# Local import to avoid event loop conflicts in tests
|
||||
from backend.data.execution import get_node_execution
|
||||
|
||||
result = {}
|
||||
for review in updated_reviews:
|
||||
node_exec = await get_node_execution(review.nodeExecId)
|
||||
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
||||
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
|
||||
review, node_id=node_id
|
||||
)
|
||||
|
||||
return result
|
||||
# Return as dict for easy access
|
||||
return {
|
||||
review.nodeExecId: PendingHumanReviewModel.from_db(review)
|
||||
for review in updated_reviews
|
||||
}
|
||||
|
||||
|
||||
async def update_review_processed_status(node_exec_id: str, processed: bool) -> None:
|
||||
@@ -439,44 +256,3 @@ async def update_review_processed_status(node_exec_id: str, processed: bool) ->
|
||||
await PendingHumanReview.prisma().update(
|
||||
where={"nodeExecId": node_exec_id}, data={"processed": processed}
|
||||
)
|
||||
|
||||
|
||||
async def cancel_pending_reviews_for_execution(graph_exec_id: str, user_id: str) -> int:
|
||||
"""
|
||||
Cancel all pending reviews for a graph execution (e.g., when execution is stopped).
|
||||
|
||||
Marks all WAITING reviews as REJECTED with a message indicating the execution was stopped.
|
||||
|
||||
Args:
|
||||
graph_exec_id: The graph execution ID
|
||||
user_id: User ID who owns the execution (for security validation)
|
||||
|
||||
Returns:
|
||||
Number of reviews cancelled
|
||||
|
||||
Raises:
|
||||
ValueError: If the graph execution doesn't belong to the user
|
||||
"""
|
||||
# Validate user ownership before cancelling reviews
|
||||
graph_exec = await get_graph_execution_meta(
|
||||
user_id=user_id, execution_id=graph_exec_id
|
||||
)
|
||||
if not graph_exec:
|
||||
raise ValueError(
|
||||
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
|
||||
)
|
||||
|
||||
result = await PendingHumanReview.prisma().update_many(
|
||||
where={
|
||||
"graphExecId": graph_exec_id,
|
||||
"userId": user_id,
|
||||
"status": ReviewStatus.WAITING,
|
||||
},
|
||||
data={
|
||||
"status": ReviewStatus.REJECTED,
|
||||
"reviewMessage": "Execution was stopped by user",
|
||||
"processed": True,
|
||||
"reviewedAt": datetime.now(timezone.utc),
|
||||
},
|
||||
)
|
||||
return result
|
||||
|
||||
@@ -46,8 +46,8 @@ async def test_get_or_create_human_review_new(
|
||||
sample_db_review.status = ReviewStatus.WAITING
|
||||
sample_db_review.processed = False
|
||||
|
||||
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
|
||||
mock_prisma.return_value.upsert = AsyncMock(return_value=sample_db_review)
|
||||
mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
|
||||
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
|
||||
|
||||
result = await get_or_create_human_review(
|
||||
user_id="test-user-123",
|
||||
@@ -75,8 +75,8 @@ async def test_get_or_create_human_review_approved(
|
||||
sample_db_review.processed = False
|
||||
sample_db_review.reviewMessage = "Looks good"
|
||||
|
||||
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
|
||||
mock_prisma.return_value.upsert = AsyncMock(return_value=sample_db_review)
|
||||
mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
|
||||
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
|
||||
|
||||
result = await get_or_create_human_review(
|
||||
user_id="test-user-123",
|
||||
@@ -131,19 +131,10 @@ async def test_get_pending_reviews_for_user(
|
||||
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
|
||||
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
|
||||
|
||||
# Mock get_node_execution to return node with node_id (async function)
|
||||
mock_node_exec = Mock()
|
||||
mock_node_exec.node_id = "test_node_def_789"
|
||||
mocker.patch(
|
||||
"backend.data.execution.get_node_execution",
|
||||
new=AsyncMock(return_value=mock_node_exec),
|
||||
)
|
||||
|
||||
result = await get_pending_reviews_for_user("test_user", page=2, page_size=10)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].node_exec_id == "test_node_123"
|
||||
assert result[0].node_id == "test_node_def_789"
|
||||
|
||||
# Verify pagination parameters
|
||||
call_args = mock_find_many.return_value.find_many.call_args
|
||||
@@ -160,21 +151,12 @@ async def test_get_pending_reviews_for_execution(
|
||||
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
|
||||
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
|
||||
|
||||
# Mock get_node_execution to return node with node_id (async function)
|
||||
mock_node_exec = Mock()
|
||||
mock_node_exec.node_id = "test_node_def_789"
|
||||
mocker.patch(
|
||||
"backend.data.execution.get_node_execution",
|
||||
new=AsyncMock(return_value=mock_node_exec),
|
||||
)
|
||||
|
||||
result = await get_pending_reviews_for_execution(
|
||||
"test_graph_exec_456", "test-user-123"
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].graph_exec_id == "test_graph_exec_456"
|
||||
assert result[0].node_id == "test_node_def_789"
|
||||
|
||||
# Verify it filters by execution and user
|
||||
call_args = mock_find_many.return_value.find_many.call_args
|
||||
@@ -219,14 +201,6 @@ async def test_process_all_reviews_for_execution_success(
|
||||
new=AsyncMock(return_value=[updated_review]),
|
||||
)
|
||||
|
||||
# Mock get_node_execution to return node with node_id (async function)
|
||||
mock_node_exec = Mock()
|
||||
mock_node_exec.node_id = "test_node_def_789"
|
||||
mocker.patch(
|
||||
"backend.data.execution.get_node_execution",
|
||||
new=AsyncMock(return_value=mock_node_exec),
|
||||
)
|
||||
|
||||
result = await process_all_reviews_for_execution(
|
||||
user_id="test-user-123",
|
||||
review_decisions={
|
||||
@@ -237,7 +211,6 @@ async def test_process_all_reviews_for_execution_success(
|
||||
assert len(result) == 1
|
||||
assert "test_node_123" in result
|
||||
assert result["test_node_123"].status == ReviewStatus.APPROVED
|
||||
assert result["test_node_123"].node_id == "test_node_def_789"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -356,14 +329,6 @@ async def test_process_all_reviews_mixed_approval_rejection(
|
||||
new=AsyncMock(return_value=[approved_review, rejected_review]),
|
||||
)
|
||||
|
||||
# Mock get_node_execution to return node with node_id (async function)
|
||||
mock_node_exec = Mock()
|
||||
mock_node_exec.node_id = "test_node_def_789"
|
||||
mocker.patch(
|
||||
"backend.data.execution.get_node_execution",
|
||||
new=AsyncMock(return_value=mock_node_exec),
|
||||
)
|
||||
|
||||
result = await process_all_reviews_for_execution(
|
||||
user_id="test-user-123",
|
||||
review_decisions={
|
||||
@@ -375,5 +340,3 @@ async def test_process_all_reviews_mixed_approval_rejection(
|
||||
assert len(result) == 2
|
||||
assert "test_node_123" in result
|
||||
assert "test_node_456" in result
|
||||
assert result["test_node_123"].node_id == "test_node_def_789"
|
||||
assert result["test_node_456"].node_id == "test_node_def_789"
|
||||
|
||||
@@ -50,8 +50,6 @@ from backend.data.graph import (
|
||||
validate_graph_execution_permissions,
|
||||
)
|
||||
from backend.data.human_review import (
|
||||
cancel_pending_reviews_for_execution,
|
||||
check_approval,
|
||||
get_or_create_human_review,
|
||||
has_pending_reviews_for_graph_exec,
|
||||
update_review_processed_status,
|
||||
@@ -192,8 +190,6 @@ class DatabaseManager(AppService):
|
||||
get_user_notification_preference = _(get_user_notification_preference)
|
||||
|
||||
# Human In The Loop
|
||||
cancel_pending_reviews_for_execution = _(cancel_pending_reviews_for_execution)
|
||||
check_approval = _(check_approval)
|
||||
get_or_create_human_review = _(get_or_create_human_review)
|
||||
has_pending_reviews_for_graph_exec = _(has_pending_reviews_for_graph_exec)
|
||||
update_review_processed_status = _(update_review_processed_status)
|
||||
@@ -317,8 +313,6 @@ class DatabaseManagerAsyncClient(AppServiceClient):
|
||||
set_execution_kv_data = d.set_execution_kv_data
|
||||
|
||||
# Human In The Loop
|
||||
cancel_pending_reviews_for_execution = d.cancel_pending_reviews_for_execution
|
||||
check_approval = d.check_approval
|
||||
get_or_create_human_review = d.get_or_create_human_review
|
||||
update_review_processed_status = d.update_review_processed_status
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ from pydantic import BaseModel, JsonValue, ValidationError
|
||||
|
||||
from backend.data import execution as execution_db
|
||||
from backend.data import graph as graph_db
|
||||
from backend.data import human_review as human_review_db
|
||||
from backend.data import onboarding as onboarding_db
|
||||
from backend.data import user as user_db
|
||||
from backend.data.block import (
|
||||
@@ -750,27 +749,9 @@ async def stop_graph_execution(
|
||||
if graph_exec.status in [
|
||||
ExecutionStatus.QUEUED,
|
||||
ExecutionStatus.INCOMPLETE,
|
||||
ExecutionStatus.REVIEW,
|
||||
]:
|
||||
# If the graph is queued/incomplete/paused for review, terminate immediately
|
||||
# No need to wait for executor since it's not actively running
|
||||
|
||||
# If graph is in REVIEW status, clean up pending reviews before terminating
|
||||
if graph_exec.status == ExecutionStatus.REVIEW:
|
||||
# Use human_review_db if Prisma connected, else database manager
|
||||
review_db = (
|
||||
human_review_db
|
||||
if prisma.is_connected()
|
||||
else get_database_manager_async_client()
|
||||
)
|
||||
# Mark all pending reviews as rejected/cancelled
|
||||
cancelled_count = await review_db.cancel_pending_reviews_for_execution(
|
||||
graph_exec_id, user_id
|
||||
)
|
||||
logger.info(
|
||||
f"Cancelled {cancelled_count} pending review(s) for stopped execution {graph_exec_id}"
|
||||
)
|
||||
|
||||
# If the graph is still on the queue, we can prevent them from being executed
|
||||
# by setting the status to TERMINATED.
|
||||
graph_exec.status = ExecutionStatus.TERMINATED
|
||||
|
||||
await asyncio.gather(
|
||||
@@ -906,28 +887,9 @@ async def add_graph_execution(
|
||||
nodes_to_skip=nodes_to_skip,
|
||||
execution_context=execution_context,
|
||||
)
|
||||
logger.info(f"Queueing execution {graph_exec.id}")
|
||||
|
||||
# Update execution status to QUEUED BEFORE publishing to prevent race condition
|
||||
# where two concurrent requests could both publish the same execution
|
||||
updated_exec = await edb.update_graph_execution_stats(
|
||||
graph_exec_id=graph_exec.id,
|
||||
status=ExecutionStatus.QUEUED,
|
||||
)
|
||||
|
||||
# Verify the status update succeeded (prevents duplicate queueing in race conditions)
|
||||
# If another request already updated the status, this execution will not be QUEUED
|
||||
if not updated_exec or updated_exec.status != ExecutionStatus.QUEUED:
|
||||
logger.warning(
|
||||
f"Skipping queue publish for execution {graph_exec.id} - "
|
||||
f"status update failed or execution already queued by another request"
|
||||
)
|
||||
return graph_exec
|
||||
|
||||
graph_exec.status = ExecutionStatus.QUEUED
|
||||
logger.info(f"Publishing execution {graph_exec.id} to execution queue")
|
||||
|
||||
# Publish to execution queue for executor to pick up
|
||||
# This happens AFTER status update to ensure only one request publishes
|
||||
exec_queue = await get_async_execution_queue()
|
||||
await exec_queue.publish_message(
|
||||
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
|
||||
@@ -935,6 +897,13 @@ async def add_graph_execution(
|
||||
exchange=GRAPH_EXECUTION_EXCHANGE,
|
||||
)
|
||||
logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue")
|
||||
|
||||
# Update execution status to QUEUED
|
||||
graph_exec.status = ExecutionStatus.QUEUED
|
||||
await edb.update_graph_execution_stats(
|
||||
graph_exec_id=graph_exec.id,
|
||||
status=graph_exec.status,
|
||||
)
|
||||
except BaseException as e:
|
||||
err = str(e) or type(e).__name__
|
||||
if not graph_exec:
|
||||
|
||||
@@ -4,7 +4,6 @@ import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from backend.data.dynamic_fields import merge_execution_input, parse_execution_output
|
||||
from backend.data.execution import ExecutionStatus
|
||||
from backend.util.mock import MockObject
|
||||
|
||||
|
||||
@@ -347,7 +346,6 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
|
||||
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
|
||||
mock_graph_exec.id = "execution-id-123"
|
||||
mock_graph_exec.node_executions = [] # Add this to avoid AttributeError
|
||||
mock_graph_exec.status = ExecutionStatus.QUEUED # Required for race condition check
|
||||
mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock()
|
||||
|
||||
# Mock the queue and event bus
|
||||
@@ -613,7 +611,6 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
|
||||
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
|
||||
mock_graph_exec.id = "execution-id-123"
|
||||
mock_graph_exec.node_executions = []
|
||||
mock_graph_exec.status = ExecutionStatus.QUEUED # Required for race condition check
|
||||
|
||||
# Track what's passed to to_graph_execution_entry
|
||||
captured_kwargs = {}
|
||||
@@ -673,232 +670,3 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
|
||||
# Verify nodes_to_skip was passed to to_graph_execution_entry
|
||||
assert "nodes_to_skip" in captured_kwargs
|
||||
assert captured_kwargs["nodes_to_skip"] == nodes_to_skip
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_graph_execution_in_review_status_cancels_pending_reviews(
|
||||
mocker: MockerFixture,
|
||||
):
|
||||
"""Test that stopping an execution in REVIEW status cancels pending reviews."""
|
||||
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
|
||||
from backend.executor.utils import stop_graph_execution
|
||||
|
||||
user_id = "test-user"
|
||||
graph_exec_id = "test-exec-123"
|
||||
|
||||
# Mock graph execution in REVIEW status
|
||||
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionMeta)
|
||||
mock_graph_exec.id = graph_exec_id
|
||||
mock_graph_exec.status = ExecutionStatus.REVIEW
|
||||
|
||||
# Mock dependencies
|
||||
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
|
||||
mock_queue_client = mocker.AsyncMock()
|
||||
mock_get_queue.return_value = mock_queue_client
|
||||
|
||||
mock_prisma = mocker.patch("backend.executor.utils.prisma")
|
||||
mock_prisma.is_connected.return_value = True
|
||||
|
||||
mock_human_review_db = mocker.patch("backend.executor.utils.human_review_db")
|
||||
mock_human_review_db.cancel_pending_reviews_for_execution = mocker.AsyncMock(
|
||||
return_value=2 # 2 reviews cancelled
|
||||
)
|
||||
|
||||
mock_execution_db = mocker.patch("backend.executor.utils.execution_db")
|
||||
mock_execution_db.get_graph_execution_meta = mocker.AsyncMock(
|
||||
return_value=mock_graph_exec
|
||||
)
|
||||
mock_execution_db.update_graph_execution_stats = mocker.AsyncMock()
|
||||
|
||||
mock_get_event_bus = mocker.patch(
|
||||
"backend.executor.utils.get_async_execution_event_bus"
|
||||
)
|
||||
mock_event_bus = mocker.MagicMock()
|
||||
mock_event_bus.publish = mocker.AsyncMock()
|
||||
mock_get_event_bus.return_value = mock_event_bus
|
||||
|
||||
mock_get_child_executions = mocker.patch(
|
||||
"backend.executor.utils._get_child_executions"
|
||||
)
|
||||
mock_get_child_executions.return_value = [] # No children
|
||||
|
||||
# Call stop_graph_execution with timeout to allow status check
|
||||
await stop_graph_execution(
|
||||
user_id=user_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
wait_timeout=1.0, # Wait to allow status check
|
||||
cascade=True,
|
||||
)
|
||||
|
||||
# Verify pending reviews were cancelled
|
||||
mock_human_review_db.cancel_pending_reviews_for_execution.assert_called_once_with(
|
||||
graph_exec_id, user_id
|
||||
)
|
||||
|
||||
# Verify execution status was updated to TERMINATED
|
||||
mock_execution_db.update_graph_execution_stats.assert_called_once()
|
||||
call_kwargs = mock_execution_db.update_graph_execution_stats.call_args[1]
|
||||
assert call_kwargs["graph_exec_id"] == graph_exec_id
|
||||
assert call_kwargs["status"] == ExecutionStatus.TERMINATED
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_graph_execution_with_database_manager_when_prisma_disconnected(
|
||||
mocker: MockerFixture,
|
||||
):
|
||||
"""Test that stop uses database manager when Prisma is not connected."""
|
||||
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
|
||||
from backend.executor.utils import stop_graph_execution
|
||||
|
||||
user_id = "test-user"
|
||||
graph_exec_id = "test-exec-456"
|
||||
|
||||
# Mock graph execution in REVIEW status
|
||||
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionMeta)
|
||||
mock_graph_exec.id = graph_exec_id
|
||||
mock_graph_exec.status = ExecutionStatus.REVIEW
|
||||
|
||||
# Mock dependencies
|
||||
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
|
||||
mock_queue_client = mocker.AsyncMock()
|
||||
mock_get_queue.return_value = mock_queue_client
|
||||
|
||||
# Prisma is NOT connected
|
||||
mock_prisma = mocker.patch("backend.executor.utils.prisma")
|
||||
mock_prisma.is_connected.return_value = False
|
||||
|
||||
# Mock database manager client
|
||||
mock_get_db_manager = mocker.patch(
|
||||
"backend.executor.utils.get_database_manager_async_client"
|
||||
)
|
||||
mock_db_manager = mocker.AsyncMock()
|
||||
mock_db_manager.get_graph_execution_meta = mocker.AsyncMock(
|
||||
return_value=mock_graph_exec
|
||||
)
|
||||
mock_db_manager.cancel_pending_reviews_for_execution = mocker.AsyncMock(
|
||||
return_value=3 # 3 reviews cancelled
|
||||
)
|
||||
mock_db_manager.update_graph_execution_stats = mocker.AsyncMock()
|
||||
mock_get_db_manager.return_value = mock_db_manager
|
||||
|
||||
mock_get_event_bus = mocker.patch(
|
||||
"backend.executor.utils.get_async_execution_event_bus"
|
||||
)
|
||||
mock_event_bus = mocker.MagicMock()
|
||||
mock_event_bus.publish = mocker.AsyncMock()
|
||||
mock_get_event_bus.return_value = mock_event_bus
|
||||
|
||||
mock_get_child_executions = mocker.patch(
|
||||
"backend.executor.utils._get_child_executions"
|
||||
)
|
||||
mock_get_child_executions.return_value = [] # No children
|
||||
|
||||
# Call stop_graph_execution with timeout
|
||||
await stop_graph_execution(
|
||||
user_id=user_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
wait_timeout=1.0,
|
||||
cascade=True,
|
||||
)
|
||||
|
||||
# Verify database manager was used for cancel_pending_reviews
|
||||
mock_db_manager.cancel_pending_reviews_for_execution.assert_called_once_with(
|
||||
graph_exec_id, user_id
|
||||
)
|
||||
|
||||
# Verify execution status was updated via database manager
|
||||
mock_db_manager.update_graph_execution_stats.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_graph_execution_cascades_to_child_with_reviews(
|
||||
mocker: MockerFixture,
|
||||
):
|
||||
"""Test that stopping parent execution cascades to children and cancels their reviews."""
|
||||
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
|
||||
from backend.executor.utils import stop_graph_execution
|
||||
|
||||
user_id = "test-user"
|
||||
parent_exec_id = "parent-exec"
|
||||
child_exec_id = "child-exec"
|
||||
|
||||
# Mock parent execution in RUNNING status
|
||||
mock_parent_exec = mocker.MagicMock(spec=GraphExecutionMeta)
|
||||
mock_parent_exec.id = parent_exec_id
|
||||
mock_parent_exec.status = ExecutionStatus.RUNNING
|
||||
|
||||
# Mock child execution in REVIEW status
|
||||
mock_child_exec = mocker.MagicMock(spec=GraphExecutionMeta)
|
||||
mock_child_exec.id = child_exec_id
|
||||
mock_child_exec.status = ExecutionStatus.REVIEW
|
||||
|
||||
# Mock dependencies
|
||||
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
|
||||
mock_queue_client = mocker.AsyncMock()
|
||||
mock_get_queue.return_value = mock_queue_client
|
||||
|
||||
mock_prisma = mocker.patch("backend.executor.utils.prisma")
|
||||
mock_prisma.is_connected.return_value = True
|
||||
|
||||
mock_human_review_db = mocker.patch("backend.executor.utils.human_review_db")
|
||||
mock_human_review_db.cancel_pending_reviews_for_execution = mocker.AsyncMock(
|
||||
return_value=1 # 1 child review cancelled
|
||||
)
|
||||
|
||||
# Mock execution_db to return different status based on which execution is queried
|
||||
mock_execution_db = mocker.patch("backend.executor.utils.execution_db")
|
||||
|
||||
# Track call count to simulate status transition
|
||||
call_count = {"count": 0}
|
||||
|
||||
async def get_exec_meta_side_effect(execution_id, user_id):
|
||||
call_count["count"] += 1
|
||||
if execution_id == parent_exec_id:
|
||||
# After a few calls (child processing happens), transition parent to TERMINATED
|
||||
# This simulates the executor service processing the stop request
|
||||
if call_count["count"] > 3:
|
||||
mock_parent_exec.status = ExecutionStatus.TERMINATED
|
||||
return mock_parent_exec
|
||||
elif execution_id == child_exec_id:
|
||||
return mock_child_exec
|
||||
return None
|
||||
|
||||
mock_execution_db.get_graph_execution_meta = mocker.AsyncMock(
|
||||
side_effect=get_exec_meta_side_effect
|
||||
)
|
||||
mock_execution_db.update_graph_execution_stats = mocker.AsyncMock()
|
||||
|
||||
mock_get_event_bus = mocker.patch(
|
||||
"backend.executor.utils.get_async_execution_event_bus"
|
||||
)
|
||||
mock_event_bus = mocker.MagicMock()
|
||||
mock_event_bus.publish = mocker.AsyncMock()
|
||||
mock_get_event_bus.return_value = mock_event_bus
|
||||
|
||||
# Mock _get_child_executions to return the child
|
||||
mock_get_child_executions = mocker.patch(
|
||||
"backend.executor.utils._get_child_executions"
|
||||
)
|
||||
|
||||
def get_children_side_effect(parent_id):
|
||||
if parent_id == parent_exec_id:
|
||||
return [mock_child_exec]
|
||||
return []
|
||||
|
||||
mock_get_child_executions.side_effect = get_children_side_effect
|
||||
|
||||
# Call stop_graph_execution on parent with cascade=True
|
||||
await stop_graph_execution(
|
||||
user_id=user_id,
|
||||
graph_exec_id=parent_exec_id,
|
||||
wait_timeout=1.0,
|
||||
cascade=True,
|
||||
)
|
||||
|
||||
# Verify child reviews were cancelled
|
||||
mock_human_review_db.cancel_pending_reviews_for_execution.assert_called_once_with(
|
||||
child_exec_id, user_id
|
||||
)
|
||||
|
||||
# Verify both parent and child status updates
|
||||
assert mock_execution_db.update_graph_execution_stats.call_count >= 1
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import time
|
||||
@@ -59,11 +58,6 @@ class SpinTestServer:
|
||||
self.db_api.__exit__(exc_type, exc_val, exc_tb)
|
||||
self.notif_manager.__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
# Give services time to fully shut down
|
||||
# This prevents event loop issues where services haven't fully cleaned up
|
||||
# before the next test starts
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
def setup_dependency_overrides(self):
|
||||
# Override get_user_id for testing
|
||||
self.agent_server.set_test_dependency_overrides(
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
-- Remove NodeExecution foreign key from PendingHumanReview
|
||||
-- The nodeExecId column remains as the primary key, but we remove the FK constraint
|
||||
-- to AgentNodeExecution since PendingHumanReview records can persist after node
|
||||
-- execution records are deleted.
|
||||
|
||||
-- Drop foreign key constraint that linked PendingHumanReview.nodeExecId to AgentNodeExecution.id
|
||||
ALTER TABLE "PendingHumanReview" DROP CONSTRAINT IF EXISTS "PendingHumanReview_nodeExecId_fkey";
|
||||
@@ -517,6 +517,8 @@ model AgentNodeExecution {
|
||||
|
||||
stats Json?
|
||||
|
||||
PendingHumanReview PendingHumanReview?
|
||||
|
||||
@@index([agentGraphExecutionId, agentNodeId, executionStatus])
|
||||
@@index([agentNodeId, executionStatus])
|
||||
@@index([addedTime, queuedTime])
|
||||
@@ -565,7 +567,6 @@ enum ReviewStatus {
|
||||
}
|
||||
|
||||
// Pending human reviews for Human-in-the-loop blocks
|
||||
// Also stores auto-approval records with special nodeExecId patterns (e.g., "auto_approve_{graph_exec_id}_{node_id}")
|
||||
model PendingHumanReview {
|
||||
nodeExecId String @id
|
||||
userId String
|
||||
@@ -584,6 +585,7 @@ model PendingHumanReview {
|
||||
reviewedAt DateTime?
|
||||
|
||||
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
|
||||
NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade)
|
||||
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
|
||||
|
||||
@@unique([nodeExecId]) // One pending review per node execution
|
||||
|
||||
@@ -86,6 +86,7 @@ export function FloatingSafeModeToggle({
|
||||
const {
|
||||
currentHITLSafeMode,
|
||||
showHITLToggle,
|
||||
isHITLStateUndetermined,
|
||||
handleHITLToggle,
|
||||
currentSensitiveActionSafeMode,
|
||||
showSensitiveActionToggle,
|
||||
@@ -98,9 +99,16 @@ export function FloatingSafeModeToggle({
|
||||
return null;
|
||||
}
|
||||
|
||||
const showHITL = showHITLToggle && !isHITLStateUndetermined;
|
||||
const showSensitive = showSensitiveActionToggle;
|
||||
|
||||
if (!showHITL && !showSensitive) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return (
|
||||
<div className={cn("fixed z-50 flex flex-col gap-2", className)}>
|
||||
{showHITLToggle && (
|
||||
{showHITL && (
|
||||
<SafeModeButton
|
||||
isEnabled={currentHITLSafeMode}
|
||||
label="Human in the loop block approval"
|
||||
@@ -111,7 +119,7 @@ export function FloatingSafeModeToggle({
|
||||
fullWidth={fullWidth}
|
||||
/>
|
||||
)}
|
||||
{showSensitiveActionToggle && (
|
||||
{showSensitive && (
|
||||
<SafeModeButton
|
||||
isEnabled={currentSensitiveActionSafeMode}
|
||||
label="Sensitive actions blocks approval"
|
||||
|
||||
@@ -14,10 +14,6 @@ import {
|
||||
import { Dialog } from "@/components/molecules/Dialog/Dialog";
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import { ScheduleAgentModal } from "../ScheduleAgentModal/ScheduleAgentModal";
|
||||
import {
|
||||
AIAgentSafetyPopup,
|
||||
useAIAgentSafetyPopup,
|
||||
} from "./components/AIAgentSafetyPopup/AIAgentSafetyPopup";
|
||||
import { ModalHeader } from "./components/ModalHeader/ModalHeader";
|
||||
import { ModalRunSection } from "./components/ModalRunSection/ModalRunSection";
|
||||
import { RunActions } from "./components/RunActions/RunActions";
|
||||
@@ -87,18 +83,8 @@ export function RunAgentModal({
|
||||
|
||||
const [isScheduleModalOpen, setIsScheduleModalOpen] = useState(false);
|
||||
const [hasOverflow, setHasOverflow] = useState(false);
|
||||
const [isSafetyPopupOpen, setIsSafetyPopupOpen] = useState(false);
|
||||
const [pendingRunAction, setPendingRunAction] = useState<(() => void) | null>(
|
||||
null,
|
||||
);
|
||||
const contentRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
const { shouldShowPopup, dismissPopup } = useAIAgentSafetyPopup(
|
||||
agent.id,
|
||||
agent.has_sensitive_action,
|
||||
agent.has_human_in_the_loop,
|
||||
);
|
||||
|
||||
const hasAnySetupFields =
|
||||
Object.keys(agentInputFields || {}).length > 0 ||
|
||||
Object.keys(agentCredentialsInputFields || {}).length > 0;
|
||||
@@ -179,24 +165,6 @@ export function RunAgentModal({
|
||||
onScheduleCreated?.(schedule);
|
||||
}
|
||||
|
||||
function handleRunWithSafetyCheck() {
|
||||
if (shouldShowPopup) {
|
||||
setPendingRunAction(() => handleRun);
|
||||
setIsSafetyPopupOpen(true);
|
||||
} else {
|
||||
handleRun();
|
||||
}
|
||||
}
|
||||
|
||||
function handleSafetyPopupAcknowledge() {
|
||||
setIsSafetyPopupOpen(false);
|
||||
dismissPopup();
|
||||
if (pendingRunAction) {
|
||||
pendingRunAction();
|
||||
setPendingRunAction(null);
|
||||
}
|
||||
}
|
||||
|
||||
return (
|
||||
<>
|
||||
<Dialog
|
||||
@@ -280,7 +248,7 @@ export function RunAgentModal({
|
||||
)}
|
||||
<RunActions
|
||||
defaultRunType={defaultRunType}
|
||||
onRun={handleRunWithSafetyCheck}
|
||||
onRun={handleRun}
|
||||
isExecuting={isExecuting}
|
||||
isSettingUpTrigger={isSettingUpTrigger}
|
||||
isRunReady={allRequiredInputsAreSet}
|
||||
@@ -298,12 +266,6 @@ export function RunAgentModal({
|
||||
</div>
|
||||
</Dialog.Content>
|
||||
</Dialog>
|
||||
|
||||
<AIAgentSafetyPopup
|
||||
agentId={agent.id}
|
||||
isOpen={isSafetyPopupOpen}
|
||||
onAcknowledge={handleSafetyPopupAcknowledge}
|
||||
/>
|
||||
</>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,108 +0,0 @@
|
||||
"use client";
|
||||
|
||||
import { Button } from "@/components/atoms/Button/Button";
|
||||
import { Text } from "@/components/atoms/Text/Text";
|
||||
import { Dialog } from "@/components/molecules/Dialog/Dialog";
|
||||
import { Key, storage } from "@/services/storage/local-storage";
|
||||
import { ShieldCheckIcon } from "@phosphor-icons/react";
|
||||
import { useCallback, useEffect, useState } from "react";
|
||||
|
||||
interface Props {
|
||||
agentId: string;
|
||||
onAcknowledge: () => void;
|
||||
isOpen: boolean;
|
||||
}
|
||||
|
||||
export function AIAgentSafetyPopup({ agentId, onAcknowledge, isOpen }: Props) {
|
||||
function handleAcknowledge() {
|
||||
// Add this agent to the list of agents for which popup has been shown
|
||||
const seenAgentsJson = storage.get(Key.AI_AGENT_SAFETY_POPUP_SHOWN);
|
||||
const seenAgents: string[] = seenAgentsJson
|
||||
? JSON.parse(seenAgentsJson)
|
||||
: [];
|
||||
|
||||
if (!seenAgents.includes(agentId)) {
|
||||
seenAgents.push(agentId);
|
||||
storage.set(Key.AI_AGENT_SAFETY_POPUP_SHOWN, JSON.stringify(seenAgents));
|
||||
}
|
||||
|
||||
onAcknowledge();
|
||||
}
|
||||
|
||||
if (!isOpen) return null;
|
||||
|
||||
return (
|
||||
<Dialog
|
||||
controlled={{ isOpen, set: () => {} }}
|
||||
styling={{ maxWidth: "480px" }}
|
||||
>
|
||||
<Dialog.Content>
|
||||
<div className="flex flex-col items-center p-6 text-center">
|
||||
<div className="mb-6 flex h-16 w-16 items-center justify-center rounded-full bg-blue-50">
|
||||
<ShieldCheckIcon
|
||||
weight="fill"
|
||||
size={32}
|
||||
className="text-blue-600"
|
||||
/>
|
||||
</div>
|
||||
|
||||
<Text variant="h3" className="mb-4">
|
||||
Safety Checks Enabled
|
||||
</Text>
|
||||
|
||||
<Text variant="body" className="mb-2 text-zinc-700">
|
||||
AI-generated agents may take actions that affect your data or
|
||||
external systems.
|
||||
</Text>
|
||||
|
||||
<Text variant="body" className="mb-8 text-zinc-700">
|
||||
AutoGPT includes safety checks so you'll always have the
|
||||
opportunity to review and approve sensitive actions before they
|
||||
happen.
|
||||
</Text>
|
||||
|
||||
<Button
|
||||
variant="primary"
|
||||
size="large"
|
||||
className="w-full"
|
||||
onClick={handleAcknowledge}
|
||||
>
|
||||
Got it
|
||||
</Button>
|
||||
</div>
|
||||
</Dialog.Content>
|
||||
</Dialog>
|
||||
);
|
||||
}
|
||||
|
||||
export function useAIAgentSafetyPopup(
|
||||
agentId: string,
|
||||
hasSensitiveAction: boolean,
|
||||
hasHumanInTheLoop: boolean,
|
||||
) {
|
||||
const [shouldShowPopup, setShouldShowPopup] = useState(false);
|
||||
const [hasChecked, setHasChecked] = useState(false);
|
||||
|
||||
useEffect(() => {
|
||||
if (hasChecked) return;
|
||||
|
||||
const seenAgentsJson = storage.get(Key.AI_AGENT_SAFETY_POPUP_SHOWN);
|
||||
const seenAgents: string[] = seenAgentsJson
|
||||
? JSON.parse(seenAgentsJson)
|
||||
: [];
|
||||
const hasSeenPopupForThisAgent = seenAgents.includes(agentId);
|
||||
const isRelevantAgent = hasSensitiveAction || hasHumanInTheLoop;
|
||||
|
||||
setShouldShowPopup(!hasSeenPopupForThisAgent && isRelevantAgent);
|
||||
setHasChecked(true);
|
||||
}, [agentId, hasSensitiveAction, hasHumanInTheLoop, hasChecked]);
|
||||
|
||||
const dismissPopup = useCallback(() => {
|
||||
setShouldShowPopup(false);
|
||||
}, []);
|
||||
|
||||
return {
|
||||
shouldShowPopup,
|
||||
dismissPopup,
|
||||
};
|
||||
}
|
||||
@@ -69,6 +69,7 @@ export function SafeModeToggle({ graph, className }: Props) {
|
||||
const {
|
||||
currentHITLSafeMode,
|
||||
showHITLToggle,
|
||||
isHITLStateUndetermined,
|
||||
handleHITLToggle,
|
||||
currentSensitiveActionSafeMode,
|
||||
showSensitiveActionToggle,
|
||||
@@ -77,13 +78,20 @@ export function SafeModeToggle({ graph, className }: Props) {
|
||||
shouldShowToggle,
|
||||
} = useAgentSafeMode(graph);
|
||||
|
||||
if (!shouldShowToggle) {
|
||||
if (!shouldShowToggle || isHITLStateUndetermined) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const showHITL = showHITLToggle && !isHITLStateUndetermined;
|
||||
const showSensitive = showSensitiveActionToggle;
|
||||
|
||||
if (!showHITL && !showSensitive) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return (
|
||||
<div className={cn("flex gap-1", className)}>
|
||||
{showHITLToggle && (
|
||||
{showHITL && (
|
||||
<SafeModeIconButton
|
||||
isEnabled={currentHITLSafeMode}
|
||||
label="Human-in-the-loop"
|
||||
@@ -93,7 +101,7 @@ export function SafeModeToggle({ graph, className }: Props) {
|
||||
isPending={isPending}
|
||||
/>
|
||||
)}
|
||||
{showSensitiveActionToggle && (
|
||||
{showSensitive && (
|
||||
<SafeModeIconButton
|
||||
isEnabled={currentSensitiveActionSafeMode}
|
||||
label="Sensitive actions"
|
||||
|
||||
@@ -8809,12 +8809,6 @@
|
||||
"title": "Node Exec Id",
|
||||
"description": "Node execution ID (primary key)"
|
||||
},
|
||||
"node_id": {
|
||||
"type": "string",
|
||||
"title": "Node Id",
|
||||
"description": "Node definition ID (for grouping)",
|
||||
"default": ""
|
||||
},
|
||||
"user_id": {
|
||||
"type": "string",
|
||||
"title": "User Id",
|
||||
@@ -8914,7 +8908,7 @@
|
||||
"created_at"
|
||||
],
|
||||
"title": "PendingHumanReviewModel",
|
||||
"description": "Response model for pending human review data.\n\nRepresents a human review request that is awaiting user action.\nContains all necessary information for a user to review and approve\nor reject data from a Human-in-the-Loop block execution.\n\nAttributes:\n id: Unique identifier for the review record\n user_id: ID of the user who must perform the review\n node_exec_id: ID of the node execution that created this review\n node_id: ID of the node definition (for grouping reviews from same node)\n graph_exec_id: ID of the graph execution containing the node\n graph_id: ID of the graph template being executed\n graph_version: Version number of the graph template\n payload: The actual data payload awaiting review\n instructions: Instructions or message for the reviewer\n editable: Whether the reviewer can edit the data\n status: Current review status (WAITING, APPROVED, or REJECTED)\n review_message: Optional message from the reviewer\n created_at: Timestamp when review was created\n updated_at: Timestamp when review was last modified\n reviewed_at: Timestamp when review was completed (if applicable)"
|
||||
"description": "Response model for pending human review data.\n\nRepresents a human review request that is awaiting user action.\nContains all necessary information for a user to review and approve\nor reject data from a Human-in-the-Loop block execution.\n\nAttributes:\n id: Unique identifier for the review record\n user_id: ID of the user who must perform the review\n node_exec_id: ID of the node execution that created this review\n graph_exec_id: ID of the graph execution containing the node\n graph_id: ID of the graph template being executed\n graph_version: Version number of the graph template\n payload: The actual data payload awaiting review\n instructions: Instructions or message for the reviewer\n editable: Whether the reviewer can edit the data\n status: Current review status (WAITING, APPROVED, or REJECTED)\n review_message: Optional message from the reviewer\n created_at: Timestamp when review was created\n updated_at: Timestamp when review was last modified\n reviewed_at: Timestamp when review was completed (if applicable)"
|
||||
},
|
||||
"PostmarkBounceEnum": {
|
||||
"type": "integer",
|
||||
@@ -9417,12 +9411,6 @@
|
||||
],
|
||||
"title": "Reviewed Data",
|
||||
"description": "Optional edited data (ignored if approved=False)"
|
||||
},
|
||||
"auto_approve_future": {
|
||||
"type": "boolean",
|
||||
"title": "Auto Approve Future",
|
||||
"description": "If true and this review is approved, future executions of this same block (node) will be automatically approved. This only affects approved reviews.",
|
||||
"default": false
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
@@ -9442,7 +9430,7 @@
|
||||
"type": "object",
|
||||
"required": ["reviews"],
|
||||
"title": "ReviewRequest",
|
||||
"description": "Request model for processing ALL pending reviews for an execution.\n\nThis request must include ALL pending reviews for a graph execution.\nEach review will be either approved (with optional data modifications)\nor rejected (data ignored). The execution will resume only after ALL reviews are processed.\n\nEach review item can individually specify whether to auto-approve future executions\nof the same block via the `auto_approve_future` field on ReviewItem."
|
||||
"description": "Request model for processing ALL pending reviews for an execution.\n\nThis request must include ALL pending reviews for a graph execution.\nEach review will be either approved (with optional data modifications)\nor rejected (data ignored). The execution will resume only after ALL reviews are processed."
|
||||
},
|
||||
"ReviewResponse": {
|
||||
"properties": {
|
||||
|
||||
@@ -31,29 +31,6 @@ export function FloatingReviewsPanel({
|
||||
query: {
|
||||
enabled: !!(graphId && executionId),
|
||||
select: okData,
|
||||
// Poll while execution is in progress to detect status changes
|
||||
refetchInterval: (q) => {
|
||||
// Note: refetchInterval callback receives raw data before select transform
|
||||
const rawData = q.state.data as
|
||||
| { status: number; data?: { status?: string } }
|
||||
| undefined;
|
||||
if (rawData?.status !== 200) return false;
|
||||
|
||||
const status = rawData?.data?.status;
|
||||
if (!status) return false;
|
||||
|
||||
// Poll every 2 seconds while running or in review
|
||||
if (
|
||||
status === AgentExecutionStatus.RUNNING ||
|
||||
status === AgentExecutionStatus.QUEUED ||
|
||||
status === AgentExecutionStatus.INCOMPLETE ||
|
||||
status === AgentExecutionStatus.REVIEW
|
||||
) {
|
||||
return 2000;
|
||||
}
|
||||
return false;
|
||||
},
|
||||
refetchIntervalInBackground: true,
|
||||
},
|
||||
},
|
||||
);
|
||||
@@ -63,47 +40,28 @@ export function FloatingReviewsPanel({
|
||||
useShallow((state) => state.graphExecutionStatus),
|
||||
);
|
||||
|
||||
// Determine if we should poll for pending reviews
|
||||
const isInReviewStatus =
|
||||
executionDetails?.status === AgentExecutionStatus.REVIEW ||
|
||||
graphExecutionStatus === AgentExecutionStatus.REVIEW;
|
||||
|
||||
const { pendingReviews, isLoading, refetch } = usePendingReviewsForExecution(
|
||||
executionId || "",
|
||||
{
|
||||
enabled: !!executionId,
|
||||
// Poll every 2 seconds when in REVIEW status to catch new reviews
|
||||
refetchInterval: isInReviewStatus ? 2000 : false,
|
||||
},
|
||||
);
|
||||
|
||||
// Refetch pending reviews when execution status changes
|
||||
useEffect(() => {
|
||||
if (executionId && executionDetails?.status) {
|
||||
if (executionId) {
|
||||
refetch();
|
||||
}
|
||||
}, [executionDetails?.status, executionId, refetch]);
|
||||
|
||||
// Hide panel if:
|
||||
// 1. No execution ID
|
||||
// 2. No pending reviews and not in REVIEW status
|
||||
// 3. Execution is RUNNING or QUEUED (hasn't paused for review yet)
|
||||
if (!executionId) {
|
||||
return null;
|
||||
}
|
||||
// Refetch when graph execution status changes to REVIEW
|
||||
useEffect(() => {
|
||||
if (graphExecutionStatus === AgentExecutionStatus.REVIEW && executionId) {
|
||||
refetch();
|
||||
}
|
||||
}, [graphExecutionStatus, executionId, refetch]);
|
||||
|
||||
if (
|
||||
!isLoading &&
|
||||
pendingReviews.length === 0 &&
|
||||
executionDetails?.status !== AgentExecutionStatus.REVIEW
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Don't show panel while execution is still running/queued (not paused for review)
|
||||
if (
|
||||
executionDetails?.status === AgentExecutionStatus.RUNNING ||
|
||||
executionDetails?.status === AgentExecutionStatus.QUEUED
|
||||
!executionId ||
|
||||
(!isLoading &&
|
||||
pendingReviews.length === 0 &&
|
||||
executionDetails?.status !== AgentExecutionStatus.REVIEW)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import { PendingHumanReviewModel } from "@/app/api/__generated__/models/pendingHumanReviewModel";
|
||||
import { Text } from "@/components/atoms/Text/Text";
|
||||
import { Button } from "@/components/atoms/Button/Button";
|
||||
import { Input } from "@/components/atoms/Input/Input";
|
||||
import { Switch } from "@/components/atoms/Switch/Switch";
|
||||
import { useEffect, useState } from "react";
|
||||
import { TrashIcon, EyeSlashIcon } from "@phosphor-icons/react";
|
||||
import { useState } from "react";
|
||||
|
||||
interface StructuredReviewPayload {
|
||||
data: unknown;
|
||||
@@ -38,40 +40,37 @@ function extractReviewData(payload: unknown): {
|
||||
interface PendingReviewCardProps {
|
||||
review: PendingHumanReviewModel;
|
||||
onReviewDataChange: (nodeExecId: string, data: string) => void;
|
||||
autoApproveFuture?: boolean;
|
||||
onAutoApproveFutureChange?: (nodeExecId: string, enabled: boolean) => void;
|
||||
externalDataValue?: string;
|
||||
reviewMessage?: string;
|
||||
onReviewMessageChange?: (nodeExecId: string, message: string) => void;
|
||||
isDisabled?: boolean;
|
||||
onToggleDisabled?: (nodeExecId: string) => void;
|
||||
}
|
||||
|
||||
export function PendingReviewCard({
|
||||
review,
|
||||
onReviewDataChange,
|
||||
autoApproveFuture = false,
|
||||
onAutoApproveFutureChange,
|
||||
externalDataValue,
|
||||
reviewMessage = "",
|
||||
onReviewMessageChange,
|
||||
isDisabled = false,
|
||||
onToggleDisabled,
|
||||
}: PendingReviewCardProps) {
|
||||
const extractedData = extractReviewData(review.payload);
|
||||
const isDataEditable = review.editable;
|
||||
const instructions = extractedData.instructions || review.instructions;
|
||||
const [currentData, setCurrentData] = useState(extractedData.data);
|
||||
|
||||
// Sync with external data value when auto-approve is toggled
|
||||
useEffect(() => {
|
||||
if (externalDataValue !== undefined) {
|
||||
try {
|
||||
const parsedData = JSON.parse(externalDataValue);
|
||||
setCurrentData(parsedData);
|
||||
} catch {
|
||||
// If parsing fails, keep current data
|
||||
}
|
||||
}
|
||||
}, [externalDataValue]);
|
||||
|
||||
const handleDataChange = (newValue: unknown) => {
|
||||
setCurrentData(newValue);
|
||||
onReviewDataChange(review.node_exec_id, JSON.stringify(newValue, null, 2));
|
||||
};
|
||||
|
||||
const handleMessageChange = (newMessage: string) => {
|
||||
onReviewMessageChange?.(review.node_exec_id, newMessage);
|
||||
};
|
||||
|
||||
// Show simplified view when no toggle functionality is provided (Screenshot 1 mode)
|
||||
const showSimplified = !onToggleDisabled;
|
||||
|
||||
const renderDataInput = () => {
|
||||
const data = currentData;
|
||||
|
||||
@@ -148,13 +147,35 @@ export function PendingReviewCard({
|
||||
// Use the existing HITL review interface
|
||||
return (
|
||||
<div className="space-y-4">
|
||||
{!showSimplified && (
|
||||
<div className="flex items-start justify-between">
|
||||
<div className="flex-1">
|
||||
{isDisabled && (
|
||||
<Text variant="small" className="text-muted-foreground">
|
||||
This item will be rejected
|
||||
</Text>
|
||||
)}
|
||||
</div>
|
||||
<Button
|
||||
onClick={() => onToggleDisabled!(review.node_exec_id)}
|
||||
variant={isDisabled ? "primary" : "secondary"}
|
||||
size="small"
|
||||
leftIcon={
|
||||
isDisabled ? <EyeSlashIcon size={14} /> : <TrashIcon size={14} />
|
||||
}
|
||||
>
|
||||
{isDisabled ? "Include" : "Exclude"}
|
||||
</Button>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Show instructions as field label */}
|
||||
{instructions && (
|
||||
<div className="space-y-3">
|
||||
<Text variant="body" className="font-semibold text-gray-900">
|
||||
{getFieldLabel(instructions)}
|
||||
</Text>
|
||||
{isDataEditable && !autoApproveFuture ? (
|
||||
{isDataEditable && !isDisabled ? (
|
||||
renderDataInput()
|
||||
) : (
|
||||
<div className="rounded-lg border border-gray-200 bg-white p-3">
|
||||
@@ -177,7 +198,7 @@ export function PendingReviewCard({
|
||||
</span>
|
||||
)}
|
||||
</Text>
|
||||
{isDataEditable && !autoApproveFuture ? (
|
||||
{isDataEditable && !isDisabled ? (
|
||||
renderDataInput()
|
||||
) : (
|
||||
<div className="rounded-lg border border-gray-200 bg-white p-3">
|
||||
@@ -189,26 +210,22 @@ export function PendingReviewCard({
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Auto-approve toggle for this review */}
|
||||
{onAutoApproveFutureChange && (
|
||||
<div className="space-y-2 pt-2">
|
||||
<div className="flex items-center gap-3">
|
||||
<Switch
|
||||
checked={autoApproveFuture}
|
||||
onCheckedChange={(enabled: boolean) =>
|
||||
onAutoApproveFutureChange(review.node_exec_id, enabled)
|
||||
}
|
||||
/>
|
||||
<Text variant="small" className="text-gray-700">
|
||||
Auto-approve future executions of this block
|
||||
</Text>
|
||||
</div>
|
||||
{autoApproveFuture && (
|
||||
<Text variant="small" className="pl-11 text-gray-500">
|
||||
Original data will be used for this and all future reviews from
|
||||
this block.
|
||||
</Text>
|
||||
)}
|
||||
{!showSimplified && isDisabled && (
|
||||
<div>
|
||||
<Text variant="body" className="mb-2 font-semibold">
|
||||
Rejection Reason (Optional):
|
||||
</Text>
|
||||
<Input
|
||||
id="rejection-reason"
|
||||
label="Rejection Reason"
|
||||
hideLabel
|
||||
size="small"
|
||||
type="textarea"
|
||||
rows={3}
|
||||
value={reviewMessage}
|
||||
onChange={(e) => handleMessageChange(e.target.value)}
|
||||
placeholder="Add any notes about why you're rejecting this..."
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
|
||||
@@ -32,15 +32,14 @@ export function PendingReviewsList({
|
||||
},
|
||||
);
|
||||
|
||||
const [reviewMessageMap, setReviewMessageMap] = useState<
|
||||
Record<string, string>
|
||||
>({});
|
||||
|
||||
const [pendingAction, setPendingAction] = useState<
|
||||
"approve" | "reject" | null
|
||||
>(null);
|
||||
|
||||
// Track per-review auto-approval state
|
||||
const [autoApproveFutureMap, setAutoApproveFutureMap] = useState<
|
||||
Record<string, boolean>
|
||||
>({});
|
||||
|
||||
const { toast } = useToast();
|
||||
|
||||
const reviewActionMutation = usePostV2ProcessReviewAction({
|
||||
@@ -89,23 +88,8 @@ export function PendingReviewsList({
|
||||
setReviewDataMap((prev) => ({ ...prev, [nodeExecId]: data }));
|
||||
}
|
||||
|
||||
// Handle per-review auto-approval toggle
|
||||
function handleAutoApproveFutureToggle(nodeExecId: string, enabled: boolean) {
|
||||
setAutoApproveFutureMap((prev) => ({
|
||||
...prev,
|
||||
[nodeExecId]: enabled,
|
||||
}));
|
||||
|
||||
if (enabled) {
|
||||
// Reset this review's data to original value
|
||||
const review = reviews.find((r) => r.node_exec_id === nodeExecId);
|
||||
if (review) {
|
||||
setReviewDataMap((prev) => ({
|
||||
...prev,
|
||||
[nodeExecId]: JSON.stringify(review.payload, null, 2),
|
||||
}));
|
||||
}
|
||||
}
|
||||
function handleReviewMessageChange(nodeExecId: string, message: string) {
|
||||
setReviewMessageMap((prev) => ({ ...prev, [nodeExecId]: message }));
|
||||
}
|
||||
|
||||
function processReviews(approved: boolean) {
|
||||
@@ -123,39 +107,30 @@ export function PendingReviewsList({
|
||||
|
||||
for (const review of reviews) {
|
||||
const reviewData = reviewDataMap[review.node_exec_id];
|
||||
const autoApproveThisReview = autoApproveFutureMap[review.node_exec_id];
|
||||
const reviewMessage = reviewMessageMap[review.node_exec_id];
|
||||
|
||||
// When auto-approving future actions for this review, send undefined (use original data)
|
||||
// Otherwise, parse and send the edited data if available
|
||||
let parsedData: any = undefined;
|
||||
let parsedData: any = review.payload; // Default to original payload
|
||||
|
||||
if (!autoApproveThisReview) {
|
||||
// For regular approve/reject, use edited data if available
|
||||
if (review.editable && reviewData) {
|
||||
try {
|
||||
parsedData = JSON.parse(reviewData);
|
||||
} catch (error) {
|
||||
toast({
|
||||
title: "Invalid JSON",
|
||||
description: `Please fix the JSON format in review for node ${review.node_exec_id}: ${error instanceof Error ? error.message : "Invalid syntax"}`,
|
||||
variant: "destructive",
|
||||
});
|
||||
setPendingAction(null);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// No edits, use original payload
|
||||
parsedData = review.payload;
|
||||
// Parse edited data if available and editable
|
||||
if (review.editable && reviewData) {
|
||||
try {
|
||||
parsedData = JSON.parse(reviewData);
|
||||
} catch (error) {
|
||||
toast({
|
||||
title: "Invalid JSON",
|
||||
description: `Please fix the JSON format in review for node ${review.node_exec_id}: ${error instanceof Error ? error.message : "Invalid syntax"}`,
|
||||
variant: "destructive",
|
||||
});
|
||||
setPendingAction(null);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// When autoApproveThisReview is true, parsedData stays undefined
|
||||
// Backend will use the original payload stored in the database
|
||||
|
||||
reviewItems.push({
|
||||
node_exec_id: review.node_exec_id,
|
||||
approved,
|
||||
reviewed_data: parsedData,
|
||||
auto_approve_future: autoApproveThisReview && approved,
|
||||
message: reviewMessage || undefined,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -207,20 +182,21 @@ export function PendingReviewsList({
|
||||
<div className="space-y-7">
|
||||
{reviews.map((review) => (
|
||||
<PendingReviewCard
|
||||
key={`${review.node_exec_id}`}
|
||||
key={review.node_exec_id}
|
||||
review={review}
|
||||
onReviewDataChange={handleReviewDataChange}
|
||||
autoApproveFuture={
|
||||
autoApproveFutureMap[review.node_exec_id] || false
|
||||
}
|
||||
onAutoApproveFutureChange={handleAutoApproveFutureToggle}
|
||||
externalDataValue={reviewDataMap[review.node_exec_id]}
|
||||
onReviewMessageChange={handleReviewMessageChange}
|
||||
reviewMessage={reviewMessageMap[review.node_exec_id] || ""}
|
||||
/>
|
||||
))}
|
||||
</div>
|
||||
|
||||
<div className="space-y-4">
|
||||
<div className="flex flex-wrap gap-2">
|
||||
<div className="space-y-7">
|
||||
<Text variant="body" className="text-textGrey">
|
||||
Note: Changes you make here apply only to this task
|
||||
</Text>
|
||||
|
||||
<div className="flex gap-2">
|
||||
<Button
|
||||
onClick={() => processReviews(true)}
|
||||
disabled={reviewActionMutation.isPending || reviews.length === 0}
|
||||
@@ -244,11 +220,6 @@ export function PendingReviewsList({
|
||||
Reject
|
||||
</Button>
|
||||
</div>
|
||||
|
||||
<Text variant="small" className="text-textGrey">
|
||||
You can turn auto-approval on or off anytime in this agent's
|
||||
settings.
|
||||
</Text>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
|
||||
@@ -15,22 +15,8 @@ export function usePendingReviews() {
|
||||
};
|
||||
}
|
||||
|
||||
interface UsePendingReviewsForExecutionOptions {
|
||||
enabled?: boolean;
|
||||
refetchInterval?: number | false;
|
||||
}
|
||||
|
||||
export function usePendingReviewsForExecution(
|
||||
graphExecId: string,
|
||||
options?: UsePendingReviewsForExecutionOptions,
|
||||
) {
|
||||
const query = useGetV2GetPendingReviewsForExecution(graphExecId, {
|
||||
query: {
|
||||
enabled: options?.enabled ?? !!graphExecId,
|
||||
refetchInterval: options?.refetchInterval,
|
||||
refetchIntervalInBackground: !!options?.refetchInterval,
|
||||
},
|
||||
});
|
||||
export function usePendingReviewsForExecution(graphExecId: string) {
|
||||
const query = useGetV2GetPendingReviewsForExecution(graphExecId);
|
||||
|
||||
return {
|
||||
pendingReviews: okData(query.data) || [],
|
||||
|
||||
@@ -10,7 +10,6 @@ export enum Key {
|
||||
LIBRARY_AGENTS_CACHE = "library-agents-cache",
|
||||
CHAT_SESSION_ID = "chat_session_id",
|
||||
COOKIE_CONSENT = "autogpt_cookie_consent",
|
||||
AI_AGENT_SAFETY_POPUP_SHOWN = "ai-agent-safety-popup-shown",
|
||||
}
|
||||
|
||||
function get(key: Key) {
|
||||
|
||||
Reference in New Issue
Block a user