mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-26 23:48:11 -05:00
Compare commits
3 Commits
dev
...
fix/review
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b08f1d7a0 | ||
|
|
fe69677758 | ||
|
|
d9864687fd |
@@ -164,9 +164,9 @@ async def test_process_review_action_approve_success(
|
|||||||
"""Test successful review approval"""
|
"""Test successful review approval"""
|
||||||
# Mock the route functions
|
# Mock the route functions
|
||||||
|
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||||
|
|
||||||
@@ -244,9 +244,9 @@ async def test_process_review_action_reject_success(
|
|||||||
"""Test successful review rejection"""
|
"""Test successful review rejection"""
|
||||||
# Mock the route functions
|
# Mock the route functions
|
||||||
|
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||||
|
|
||||||
@@ -339,9 +339,9 @@ async def test_process_review_action_mixed_success(
|
|||||||
|
|
||||||
# Mock the route functions
|
# Mock the route functions
|
||||||
|
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
mock_get_reviews_for_user.return_value = {
|
mock_get_reviews_for_user.return_value = {
|
||||||
"test_node_123": sample_pending_review,
|
"test_node_123": sample_pending_review,
|
||||||
@@ -463,9 +463,9 @@ async def test_process_review_action_review_not_found(
|
|||||||
test_user_id: str,
|
test_user_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test error when review is not found"""
|
"""Test error when review is not found"""
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
# Return empty dict to simulate review not found
|
# Return empty dict to simulate review not found
|
||||||
mock_get_reviews_for_user.return_value = {}
|
mock_get_reviews_for_user.return_value = {}
|
||||||
@@ -506,7 +506,7 @@ async def test_process_review_action_review_not_found(
|
|||||||
response = await client.post("/api/review/action", json=request_data)
|
response = await client.post("/api/review/action", json=request_data)
|
||||||
|
|
||||||
assert response.status_code == 404
|
assert response.status_code == 404
|
||||||
assert "No pending review found" in response.json()["detail"]
|
assert "Review(s) not found" in response.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio(loop_scope="session")
|
@pytest.mark.asyncio(loop_scope="session")
|
||||||
@@ -517,9 +517,9 @@ async def test_process_review_action_partial_failure(
|
|||||||
test_user_id: str,
|
test_user_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test handling of partial failures in review processing"""
|
"""Test handling of partial failures in review processing"""
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||||
|
|
||||||
@@ -567,9 +567,9 @@ async def test_process_review_action_invalid_node_exec_id(
|
|||||||
test_user_id: str,
|
test_user_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test failure when trying to process review with invalid node execution ID"""
|
"""Test failure when trying to process review with invalid node execution ID"""
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
# Return empty dict to simulate review not found
|
# Return empty dict to simulate review not found
|
||||||
mock_get_reviews_for_user.return_value = {}
|
mock_get_reviews_for_user.return_value = {}
|
||||||
@@ -596,7 +596,7 @@ async def test_process_review_action_invalid_node_exec_id(
|
|||||||
|
|
||||||
# Returns 404 when review is not found
|
# Returns 404 when review is not found
|
||||||
assert response.status_code == 404
|
assert response.status_code == 404
|
||||||
assert "No pending review found" in response.json()["detail"]
|
assert "Review(s) not found" in response.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio(loop_scope="session")
|
@pytest.mark.asyncio(loop_scope="session")
|
||||||
@@ -607,9 +607,9 @@ async def test_process_review_action_auto_approve_creates_auto_approval_records(
|
|||||||
test_user_id: str,
|
test_user_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test that auto_approve_future_actions flag creates auto-approval records"""
|
"""Test that auto_approve_future_actions flag creates auto-approval records"""
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||||
|
|
||||||
@@ -737,9 +737,9 @@ async def test_process_review_action_without_auto_approve_still_loads_settings(
|
|||||||
test_user_id: str,
|
test_user_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test that execution context is created with settings even without auto-approve"""
|
"""Test that execution context is created with settings even without auto-approve"""
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||||
|
|
||||||
@@ -885,9 +885,9 @@ async def test_process_review_action_auto_approve_only_applies_to_approved_revie
|
|||||||
reviewed_at=FIXED_NOW,
|
reviewed_at=FIXED_NOW,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
# Need to return both reviews in WAITING state (before processing)
|
# Need to return both reviews in WAITING state (before processing)
|
||||||
approved_review_waiting = PendingHumanReviewModel(
|
approved_review_waiting = PendingHumanReviewModel(
|
||||||
@@ -1031,9 +1031,9 @@ async def test_process_review_action_per_review_auto_approve_granularity(
|
|||||||
test_user_id: str,
|
test_user_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test that auto-approval can be set per-review (granular control)"""
|
"""Test that auto-approval can be set per-review (granular control)"""
|
||||||
# Mock get_pending_reviews_by_node_exec_ids - return different reviews based on node_exec_id
|
# Mock get_reviews_by_node_exec_ids - return different reviews based on node_exec_id
|
||||||
mock_get_reviews_for_user = mocker.patch(
|
mock_get_reviews_for_user = mocker.patch(
|
||||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create a mapping of node_exec_id to review
|
# Create a mapping of node_exec_id to review
|
||||||
|
|||||||
@@ -14,9 +14,9 @@ from backend.data.execution import (
|
|||||||
from backend.data.graph import get_graph_settings
|
from backend.data.graph import get_graph_settings
|
||||||
from backend.data.human_review import (
|
from backend.data.human_review import (
|
||||||
create_auto_approval_record,
|
create_auto_approval_record,
|
||||||
get_pending_reviews_by_node_exec_ids,
|
|
||||||
get_pending_reviews_for_execution,
|
get_pending_reviews_for_execution,
|
||||||
get_pending_reviews_for_user,
|
get_pending_reviews_for_user,
|
||||||
|
get_reviews_by_node_exec_ids,
|
||||||
has_pending_reviews_for_graph_exec,
|
has_pending_reviews_for_graph_exec,
|
||||||
process_all_reviews_for_execution,
|
process_all_reviews_for_execution,
|
||||||
)
|
)
|
||||||
@@ -137,17 +137,17 @@ async def process_review_action(
|
|||||||
detail="At least one review must be provided",
|
detail="At least one review must be provided",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Batch fetch all requested reviews
|
# Batch fetch all requested reviews (regardless of status for idempotent handling)
|
||||||
reviews_map = await get_pending_reviews_by_node_exec_ids(
|
reviews_map = await get_reviews_by_node_exec_ids(
|
||||||
list(all_request_node_ids), user_id
|
list(all_request_node_ids), user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
# Validate all reviews were found
|
# Validate all reviews were found (must exist, any status is OK for now)
|
||||||
missing_ids = all_request_node_ids - set(reviews_map.keys())
|
missing_ids = all_request_node_ids - set(reviews_map.keys())
|
||||||
if missing_ids:
|
if missing_ids:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
detail=f"No pending review found for node execution(s): {', '.join(missing_ids)}",
|
detail=f"Review(s) not found: {', '.join(missing_ids)}",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Validate all reviews belong to the same execution
|
# Validate all reviews belong to the same execution
|
||||||
|
|||||||
@@ -263,11 +263,14 @@ async def get_pending_review_by_node_exec_id(
|
|||||||
return PendingHumanReviewModel.from_db(review, node_id=node_id)
|
return PendingHumanReviewModel.from_db(review, node_id=node_id)
|
||||||
|
|
||||||
|
|
||||||
async def get_pending_reviews_by_node_exec_ids(
|
async def get_reviews_by_node_exec_ids(
|
||||||
node_exec_ids: list[str], user_id: str
|
node_exec_ids: list[str], user_id: str
|
||||||
) -> dict[str, "PendingHumanReviewModel"]:
|
) -> dict[str, "PendingHumanReviewModel"]:
|
||||||
"""
|
"""
|
||||||
Get multiple pending reviews by their node execution IDs in a single batch query.
|
Get multiple reviews by their node execution IDs regardless of status.
|
||||||
|
|
||||||
|
Unlike get_pending_reviews_by_node_exec_ids, this returns reviews in any status
|
||||||
|
(WAITING, APPROVED, REJECTED). Used for validation in idempotent operations.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
node_exec_ids: List of node execution IDs to look up
|
node_exec_ids: List of node execution IDs to look up
|
||||||
@@ -283,7 +286,6 @@ async def get_pending_reviews_by_node_exec_ids(
|
|||||||
where={
|
where={
|
||||||
"nodeExecId": {"in": node_exec_ids},
|
"nodeExecId": {"in": node_exec_ids},
|
||||||
"userId": user_id,
|
"userId": user_id,
|
||||||
"status": ReviewStatus.WAITING,
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -407,38 +409,68 @@ async def process_all_reviews_for_execution(
|
|||||||
) -> dict[str, PendingHumanReviewModel]:
|
) -> dict[str, PendingHumanReviewModel]:
|
||||||
"""Process all pending reviews for an execution with approve/reject decisions.
|
"""Process all pending reviews for an execution with approve/reject decisions.
|
||||||
|
|
||||||
|
Handles race conditions gracefully: if a review was already processed with the
|
||||||
|
same decision by a concurrent request, it's treated as success rather than error.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
user_id: User ID for ownership validation
|
user_id: User ID for ownership validation
|
||||||
review_decisions: Map of node_exec_id -> (status, reviewed_data, message)
|
review_decisions: Map of node_exec_id -> (status, reviewed_data, message)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dict of node_exec_id -> updated review model
|
Dict of node_exec_id -> updated review model (includes already-processed reviews)
|
||||||
"""
|
"""
|
||||||
if not review_decisions:
|
if not review_decisions:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
node_exec_ids = list(review_decisions.keys())
|
node_exec_ids = list(review_decisions.keys())
|
||||||
|
|
||||||
# Get all reviews for validation
|
# Get all reviews (both WAITING and already processed) for the user
|
||||||
reviews = await PendingHumanReview.prisma().find_many(
|
all_reviews = await PendingHumanReview.prisma().find_many(
|
||||||
where={
|
where={
|
||||||
"nodeExecId": {"in": node_exec_ids},
|
"nodeExecId": {"in": node_exec_ids},
|
||||||
"userId": user_id,
|
"userId": user_id,
|
||||||
"status": ReviewStatus.WAITING,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Validate all reviews can be processed
|
# Separate into pending and already-processed reviews
|
||||||
if len(reviews) != len(node_exec_ids):
|
reviews_to_process = []
|
||||||
missing_ids = set(node_exec_ids) - {review.nodeExecId for review in reviews}
|
already_processed = []
|
||||||
|
for review in all_reviews:
|
||||||
|
if review.status == ReviewStatus.WAITING:
|
||||||
|
reviews_to_process.append(review)
|
||||||
|
else:
|
||||||
|
already_processed.append(review)
|
||||||
|
|
||||||
|
# Check for truly missing reviews (not found at all)
|
||||||
|
found_ids = {review.nodeExecId for review in all_reviews}
|
||||||
|
missing_ids = set(node_exec_ids) - found_ids
|
||||||
|
if missing_ids:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Reviews not found, access denied, or not in WAITING status: {', '.join(missing_ids)}"
|
f"Reviews not found or access denied: {', '.join(missing_ids)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create parallel update tasks
|
# Validate already-processed reviews have compatible status (same decision)
|
||||||
|
# This handles race conditions where another request processed the same reviews
|
||||||
|
for review in already_processed:
|
||||||
|
requested_status = review_decisions[review.nodeExecId][0]
|
||||||
|
if review.status != requested_status:
|
||||||
|
raise ValueError(
|
||||||
|
f"Review {review.nodeExecId} was already processed with status "
|
||||||
|
f"{review.status}, cannot change to {requested_status}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Log if we're handling a race condition (some reviews already processed)
|
||||||
|
if already_processed:
|
||||||
|
already_processed_ids = [r.nodeExecId for r in already_processed]
|
||||||
|
logger.info(
|
||||||
|
f"Race condition handled: {len(already_processed)} review(s) already "
|
||||||
|
f"processed by concurrent request: {already_processed_ids}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create parallel update tasks for reviews that still need processing
|
||||||
update_tasks = []
|
update_tasks = []
|
||||||
|
|
||||||
for review in reviews:
|
for review in reviews_to_process:
|
||||||
new_status, reviewed_data, message = review_decisions[review.nodeExecId]
|
new_status, reviewed_data, message = review_decisions[review.nodeExecId]
|
||||||
has_data_changes = reviewed_data is not None and reviewed_data != review.payload
|
has_data_changes = reviewed_data is not None and reviewed_data != review.payload
|
||||||
|
|
||||||
@@ -463,7 +495,7 @@ async def process_all_reviews_for_execution(
|
|||||||
update_tasks.append(task)
|
update_tasks.append(task)
|
||||||
|
|
||||||
# Execute all updates in parallel and get updated reviews
|
# Execute all updates in parallel and get updated reviews
|
||||||
updated_reviews = await asyncio.gather(*update_tasks)
|
updated_reviews = await asyncio.gather(*update_tasks) if update_tasks else []
|
||||||
|
|
||||||
# Note: Execution resumption is now handled at the API layer after ALL reviews
|
# Note: Execution resumption is now handled at the API layer after ALL reviews
|
||||||
# for an execution are processed (both approved and rejected)
|
# for an execution are processed (both approved and rejected)
|
||||||
@@ -472,8 +504,11 @@ async def process_all_reviews_for_execution(
|
|||||||
# Local import to avoid event loop conflicts in tests
|
# Local import to avoid event loop conflicts in tests
|
||||||
from backend.data.execution import get_node_execution
|
from backend.data.execution import get_node_execution
|
||||||
|
|
||||||
|
# Combine updated reviews with already-processed ones (for idempotent response)
|
||||||
|
all_result_reviews = list(updated_reviews) + already_processed
|
||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
for review in updated_reviews:
|
for review in all_result_reviews:
|
||||||
node_exec = await get_node_execution(review.nodeExecId)
|
node_exec = await get_node_execution(review.nodeExecId)
|
||||||
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
||||||
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
|
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
|
||||||
|
|||||||
Reference in New Issue
Block a user