Compare commits

...

3 Commits

Author SHA1 Message Date
Zamil Majdy
7b08f1d7a0 test(backend): update error message assertion in review tests 2026-01-26 11:24:04 -06:00
Zamil Majdy
fe69677758 refactor(backend): remove unused get_pending_reviews_by_node_exec_ids function
The function was replaced by get_reviews_by_node_exec_ids which fetches
reviews regardless of status. Updated tests to mock the new function.
2026-01-26 10:37:45 -06:00
Zamil Majdy
d9864687fd fix(backend): handle race condition in review processing gracefully
When multiple concurrent requests try to process the same reviews
(e.g., double-click, multiple tabs), the second request would fail
with "Reviews not found, access denied, or not in WAITING status".

Now handles this gracefully:
- Already-processed reviews with the same decision are treated as success
- Only raises error if review doesn't exist or has conflicting status
- Logs when race condition is detected for debugging

Fixes AUTOGPT-SERVER-7HE
2026-01-25 21:11:26 -06:00
3 changed files with 77 additions and 42 deletions

View File

@@ -164,9 +164,9 @@ async def test_process_review_action_approve_success(
"""Test successful review approval""" """Test successful review approval"""
# Mock the route functions # Mock the route functions
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review} mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -244,9 +244,9 @@ async def test_process_review_action_reject_success(
"""Test successful review rejection""" """Test successful review rejection"""
# Mock the route functions # Mock the route functions
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review} mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -339,9 +339,9 @@ async def test_process_review_action_mixed_success(
# Mock the route functions # Mock the route functions
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
mock_get_reviews_for_user.return_value = { mock_get_reviews_for_user.return_value = {
"test_node_123": sample_pending_review, "test_node_123": sample_pending_review,
@@ -463,9 +463,9 @@ async def test_process_review_action_review_not_found(
test_user_id: str, test_user_id: str,
) -> None: ) -> None:
"""Test error when review is not found""" """Test error when review is not found"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
# Return empty dict to simulate review not found # Return empty dict to simulate review not found
mock_get_reviews_for_user.return_value = {} mock_get_reviews_for_user.return_value = {}
@@ -506,7 +506,7 @@ async def test_process_review_action_review_not_found(
response = await client.post("/api/review/action", json=request_data) response = await client.post("/api/review/action", json=request_data)
assert response.status_code == 404 assert response.status_code == 404
assert "No pending review found" in response.json()["detail"] assert "Review(s) not found" in response.json()["detail"]
@pytest.mark.asyncio(loop_scope="session") @pytest.mark.asyncio(loop_scope="session")
@@ -517,9 +517,9 @@ async def test_process_review_action_partial_failure(
test_user_id: str, test_user_id: str,
) -> None: ) -> None:
"""Test handling of partial failures in review processing""" """Test handling of partial failures in review processing"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review} mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -567,9 +567,9 @@ async def test_process_review_action_invalid_node_exec_id(
test_user_id: str, test_user_id: str,
) -> None: ) -> None:
"""Test failure when trying to process review with invalid node execution ID""" """Test failure when trying to process review with invalid node execution ID"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
# Return empty dict to simulate review not found # Return empty dict to simulate review not found
mock_get_reviews_for_user.return_value = {} mock_get_reviews_for_user.return_value = {}
@@ -596,7 +596,7 @@ async def test_process_review_action_invalid_node_exec_id(
# Returns 404 when review is not found # Returns 404 when review is not found
assert response.status_code == 404 assert response.status_code == 404
assert "No pending review found" in response.json()["detail"] assert "Review(s) not found" in response.json()["detail"]
@pytest.mark.asyncio(loop_scope="session") @pytest.mark.asyncio(loop_scope="session")
@@ -607,9 +607,9 @@ async def test_process_review_action_auto_approve_creates_auto_approval_records(
test_user_id: str, test_user_id: str,
) -> None: ) -> None:
"""Test that auto_approve_future_actions flag creates auto-approval records""" """Test that auto_approve_future_actions flag creates auto-approval records"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review} mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -737,9 +737,9 @@ async def test_process_review_action_without_auto_approve_still_loads_settings(
test_user_id: str, test_user_id: str,
) -> None: ) -> None:
"""Test that execution context is created with settings even without auto-approve""" """Test that execution context is created with settings even without auto-approve"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review} mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -885,9 +885,9 @@ async def test_process_review_action_auto_approve_only_applies_to_approved_revie
reviewed_at=FIXED_NOW, reviewed_at=FIXED_NOW,
) )
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id) # Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
# Need to return both reviews in WAITING state (before processing) # Need to return both reviews in WAITING state (before processing)
approved_review_waiting = PendingHumanReviewModel( approved_review_waiting = PendingHumanReviewModel(
@@ -1031,9 +1031,9 @@ async def test_process_review_action_per_review_auto_approve_granularity(
test_user_id: str, test_user_id: str,
) -> None: ) -> None:
"""Test that auto-approval can be set per-review (granular control)""" """Test that auto-approval can be set per-review (granular control)"""
# Mock get_pending_reviews_by_node_exec_ids - return different reviews based on node_exec_id # Mock get_reviews_by_node_exec_ids - return different reviews based on node_exec_id
mock_get_reviews_for_user = mocker.patch( mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids" "backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
) )
# Create a mapping of node_exec_id to review # Create a mapping of node_exec_id to review

View File

@@ -14,9 +14,9 @@ from backend.data.execution import (
from backend.data.graph import get_graph_settings from backend.data.graph import get_graph_settings
from backend.data.human_review import ( from backend.data.human_review import (
create_auto_approval_record, create_auto_approval_record,
get_pending_reviews_by_node_exec_ids,
get_pending_reviews_for_execution, get_pending_reviews_for_execution,
get_pending_reviews_for_user, get_pending_reviews_for_user,
get_reviews_by_node_exec_ids,
has_pending_reviews_for_graph_exec, has_pending_reviews_for_graph_exec,
process_all_reviews_for_execution, process_all_reviews_for_execution,
) )
@@ -137,17 +137,17 @@ async def process_review_action(
detail="At least one review must be provided", detail="At least one review must be provided",
) )
# Batch fetch all requested reviews # Batch fetch all requested reviews (regardless of status for idempotent handling)
reviews_map = await get_pending_reviews_by_node_exec_ids( reviews_map = await get_reviews_by_node_exec_ids(
list(all_request_node_ids), user_id list(all_request_node_ids), user_id
) )
# Validate all reviews were found # Validate all reviews were found (must exist, any status is OK for now)
missing_ids = all_request_node_ids - set(reviews_map.keys()) missing_ids = all_request_node_ids - set(reviews_map.keys())
if missing_ids: if missing_ids:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, status_code=status.HTTP_404_NOT_FOUND,
detail=f"No pending review found for node execution(s): {', '.join(missing_ids)}", detail=f"Review(s) not found: {', '.join(missing_ids)}",
) )
# Validate all reviews belong to the same execution # Validate all reviews belong to the same execution

View File

@@ -263,11 +263,14 @@ async def get_pending_review_by_node_exec_id(
return PendingHumanReviewModel.from_db(review, node_id=node_id) return PendingHumanReviewModel.from_db(review, node_id=node_id)
async def get_pending_reviews_by_node_exec_ids( async def get_reviews_by_node_exec_ids(
node_exec_ids: list[str], user_id: str node_exec_ids: list[str], user_id: str
) -> dict[str, "PendingHumanReviewModel"]: ) -> dict[str, "PendingHumanReviewModel"]:
""" """
Get multiple pending reviews by their node execution IDs in a single batch query. Get multiple reviews by their node execution IDs regardless of status.
Unlike get_pending_reviews_by_node_exec_ids, this returns reviews in any status
(WAITING, APPROVED, REJECTED). Used for validation in idempotent operations.
Args: Args:
node_exec_ids: List of node execution IDs to look up node_exec_ids: List of node execution IDs to look up
@@ -283,7 +286,6 @@ async def get_pending_reviews_by_node_exec_ids(
where={ where={
"nodeExecId": {"in": node_exec_ids}, "nodeExecId": {"in": node_exec_ids},
"userId": user_id, "userId": user_id,
"status": ReviewStatus.WAITING,
} }
) )
@@ -407,38 +409,68 @@ async def process_all_reviews_for_execution(
) -> dict[str, PendingHumanReviewModel]: ) -> dict[str, PendingHumanReviewModel]:
"""Process all pending reviews for an execution with approve/reject decisions. """Process all pending reviews for an execution with approve/reject decisions.
Handles race conditions gracefully: if a review was already processed with the
same decision by a concurrent request, it's treated as success rather than error.
Args: Args:
user_id: User ID for ownership validation user_id: User ID for ownership validation
review_decisions: Map of node_exec_id -> (status, reviewed_data, message) review_decisions: Map of node_exec_id -> (status, reviewed_data, message)
Returns: Returns:
Dict of node_exec_id -> updated review model Dict of node_exec_id -> updated review model (includes already-processed reviews)
""" """
if not review_decisions: if not review_decisions:
return {} return {}
node_exec_ids = list(review_decisions.keys()) node_exec_ids = list(review_decisions.keys())
# Get all reviews for validation # Get all reviews (both WAITING and already processed) for the user
reviews = await PendingHumanReview.prisma().find_many( all_reviews = await PendingHumanReview.prisma().find_many(
where={ where={
"nodeExecId": {"in": node_exec_ids}, "nodeExecId": {"in": node_exec_ids},
"userId": user_id, "userId": user_id,
"status": ReviewStatus.WAITING,
}, },
) )
# Validate all reviews can be processed # Separate into pending and already-processed reviews
if len(reviews) != len(node_exec_ids): reviews_to_process = []
missing_ids = set(node_exec_ids) - {review.nodeExecId for review in reviews} already_processed = []
for review in all_reviews:
if review.status == ReviewStatus.WAITING:
reviews_to_process.append(review)
else:
already_processed.append(review)
# Check for truly missing reviews (not found at all)
found_ids = {review.nodeExecId for review in all_reviews}
missing_ids = set(node_exec_ids) - found_ids
if missing_ids:
raise ValueError( raise ValueError(
f"Reviews not found, access denied, or not in WAITING status: {', '.join(missing_ids)}" f"Reviews not found or access denied: {', '.join(missing_ids)}"
) )
# Create parallel update tasks # Validate already-processed reviews have compatible status (same decision)
# This handles race conditions where another request processed the same reviews
for review in already_processed:
requested_status = review_decisions[review.nodeExecId][0]
if review.status != requested_status:
raise ValueError(
f"Review {review.nodeExecId} was already processed with status "
f"{review.status}, cannot change to {requested_status}"
)
# Log if we're handling a race condition (some reviews already processed)
if already_processed:
already_processed_ids = [r.nodeExecId for r in already_processed]
logger.info(
f"Race condition handled: {len(already_processed)} review(s) already "
f"processed by concurrent request: {already_processed_ids}"
)
# Create parallel update tasks for reviews that still need processing
update_tasks = [] update_tasks = []
for review in reviews: for review in reviews_to_process:
new_status, reviewed_data, message = review_decisions[review.nodeExecId] new_status, reviewed_data, message = review_decisions[review.nodeExecId]
has_data_changes = reviewed_data is not None and reviewed_data != review.payload has_data_changes = reviewed_data is not None and reviewed_data != review.payload
@@ -463,7 +495,7 @@ async def process_all_reviews_for_execution(
update_tasks.append(task) update_tasks.append(task)
# Execute all updates in parallel and get updated reviews # Execute all updates in parallel and get updated reviews
updated_reviews = await asyncio.gather(*update_tasks) updated_reviews = await asyncio.gather(*update_tasks) if update_tasks else []
# Note: Execution resumption is now handled at the API layer after ALL reviews # Note: Execution resumption is now handled at the API layer after ALL reviews
# for an execution are processed (both approved and rejected) # for an execution are processed (both approved and rejected)
@@ -472,8 +504,11 @@ async def process_all_reviews_for_execution(
# Local import to avoid event loop conflicts in tests # Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution from backend.data.execution import get_node_execution
# Combine updated reviews with already-processed ones (for idempotent response)
all_result_reviews = list(updated_reviews) + already_processed
result = {} result = {}
for review in updated_reviews: for review in all_result_reviews:
node_exec = await get_node_execution(review.nodeExecId) node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId node_id = node_exec.node_id if node_exec else review.nodeExecId
result[review.nodeExecId] = PendingHumanReviewModel.from_db( result[review.nodeExecId] = PendingHumanReviewModel.from_db(