mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-26 23:48:11 -05:00
Compare commits
3 Commits
feat/agent
...
fix/review
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b08f1d7a0 | ||
|
|
fe69677758 | ||
|
|
d9864687fd |
@@ -164,9 +164,9 @@ async def test_process_review_action_approve_success(
|
||||
"""Test successful review approval"""
|
||||
# Mock the route functions
|
||||
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||
|
||||
@@ -244,9 +244,9 @@ async def test_process_review_action_reject_success(
|
||||
"""Test successful review rejection"""
|
||||
# Mock the route functions
|
||||
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||
|
||||
@@ -339,9 +339,9 @@ async def test_process_review_action_mixed_success(
|
||||
|
||||
# Mock the route functions
|
||||
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = {
|
||||
"test_node_123": sample_pending_review,
|
||||
@@ -463,9 +463,9 @@ async def test_process_review_action_review_not_found(
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test error when review is not found"""
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
# Return empty dict to simulate review not found
|
||||
mock_get_reviews_for_user.return_value = {}
|
||||
@@ -506,7 +506,7 @@ async def test_process_review_action_review_not_found(
|
||||
response = await client.post("/api/review/action", json=request_data)
|
||||
|
||||
assert response.status_code == 404
|
||||
assert "No pending review found" in response.json()["detail"]
|
||||
assert "Review(s) not found" in response.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -517,9 +517,9 @@ async def test_process_review_action_partial_failure(
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test handling of partial failures in review processing"""
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||
|
||||
@@ -567,9 +567,9 @@ async def test_process_review_action_invalid_node_exec_id(
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test failure when trying to process review with invalid node execution ID"""
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
# Return empty dict to simulate review not found
|
||||
mock_get_reviews_for_user.return_value = {}
|
||||
@@ -596,7 +596,7 @@ async def test_process_review_action_invalid_node_exec_id(
|
||||
|
||||
# Returns 404 when review is not found
|
||||
assert response.status_code == 404
|
||||
assert "No pending review found" in response.json()["detail"]
|
||||
assert "Review(s) not found" in response.json()["detail"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
@@ -607,9 +607,9 @@ async def test_process_review_action_auto_approve_creates_auto_approval_records(
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test that auto_approve_future_actions flag creates auto-approval records"""
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||
|
||||
@@ -737,9 +737,9 @@ async def test_process_review_action_without_auto_approve_still_loads_settings(
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test that execution context is created with settings even without auto-approve"""
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
|
||||
|
||||
@@ -885,9 +885,9 @@ async def test_process_review_action_auto_approve_only_applies_to_approved_revie
|
||||
reviewed_at=FIXED_NOW,
|
||||
)
|
||||
|
||||
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
# Need to return both reviews in WAITING state (before processing)
|
||||
approved_review_waiting = PendingHumanReviewModel(
|
||||
@@ -1031,9 +1031,9 @@ async def test_process_review_action_per_review_auto_approve_granularity(
|
||||
test_user_id: str,
|
||||
) -> None:
|
||||
"""Test that auto-approval can be set per-review (granular control)"""
|
||||
# Mock get_pending_reviews_by_node_exec_ids - return different reviews based on node_exec_id
|
||||
# Mock get_reviews_by_node_exec_ids - return different reviews based on node_exec_id
|
||||
mock_get_reviews_for_user = mocker.patch(
|
||||
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
|
||||
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
|
||||
)
|
||||
|
||||
# Create a mapping of node_exec_id to review
|
||||
|
||||
@@ -14,9 +14,9 @@ from backend.data.execution import (
|
||||
from backend.data.graph import get_graph_settings
|
||||
from backend.data.human_review import (
|
||||
create_auto_approval_record,
|
||||
get_pending_reviews_by_node_exec_ids,
|
||||
get_pending_reviews_for_execution,
|
||||
get_pending_reviews_for_user,
|
||||
get_reviews_by_node_exec_ids,
|
||||
has_pending_reviews_for_graph_exec,
|
||||
process_all_reviews_for_execution,
|
||||
)
|
||||
@@ -137,17 +137,17 @@ async def process_review_action(
|
||||
detail="At least one review must be provided",
|
||||
)
|
||||
|
||||
# Batch fetch all requested reviews
|
||||
reviews_map = await get_pending_reviews_by_node_exec_ids(
|
||||
# Batch fetch all requested reviews (regardless of status for idempotent handling)
|
||||
reviews_map = await get_reviews_by_node_exec_ids(
|
||||
list(all_request_node_ids), user_id
|
||||
)
|
||||
|
||||
# Validate all reviews were found
|
||||
# Validate all reviews were found (must exist, any status is OK for now)
|
||||
missing_ids = all_request_node_ids - set(reviews_map.keys())
|
||||
if missing_ids:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"No pending review found for node execution(s): {', '.join(missing_ids)}",
|
||||
detail=f"Review(s) not found: {', '.join(missing_ids)}",
|
||||
)
|
||||
|
||||
# Validate all reviews belong to the same execution
|
||||
|
||||
@@ -263,11 +263,14 @@ async def get_pending_review_by_node_exec_id(
|
||||
return PendingHumanReviewModel.from_db(review, node_id=node_id)
|
||||
|
||||
|
||||
async def get_pending_reviews_by_node_exec_ids(
|
||||
async def get_reviews_by_node_exec_ids(
|
||||
node_exec_ids: list[str], user_id: str
|
||||
) -> dict[str, "PendingHumanReviewModel"]:
|
||||
"""
|
||||
Get multiple pending reviews by their node execution IDs in a single batch query.
|
||||
Get multiple reviews by their node execution IDs regardless of status.
|
||||
|
||||
Unlike get_pending_reviews_by_node_exec_ids, this returns reviews in any status
|
||||
(WAITING, APPROVED, REJECTED). Used for validation in idempotent operations.
|
||||
|
||||
Args:
|
||||
node_exec_ids: List of node execution IDs to look up
|
||||
@@ -283,7 +286,6 @@ async def get_pending_reviews_by_node_exec_ids(
|
||||
where={
|
||||
"nodeExecId": {"in": node_exec_ids},
|
||||
"userId": user_id,
|
||||
"status": ReviewStatus.WAITING,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -407,38 +409,68 @@ async def process_all_reviews_for_execution(
|
||||
) -> dict[str, PendingHumanReviewModel]:
|
||||
"""Process all pending reviews for an execution with approve/reject decisions.
|
||||
|
||||
Handles race conditions gracefully: if a review was already processed with the
|
||||
same decision by a concurrent request, it's treated as success rather than error.
|
||||
|
||||
Args:
|
||||
user_id: User ID for ownership validation
|
||||
review_decisions: Map of node_exec_id -> (status, reviewed_data, message)
|
||||
|
||||
Returns:
|
||||
Dict of node_exec_id -> updated review model
|
||||
Dict of node_exec_id -> updated review model (includes already-processed reviews)
|
||||
"""
|
||||
if not review_decisions:
|
||||
return {}
|
||||
|
||||
node_exec_ids = list(review_decisions.keys())
|
||||
|
||||
# Get all reviews for validation
|
||||
reviews = await PendingHumanReview.prisma().find_many(
|
||||
# Get all reviews (both WAITING and already processed) for the user
|
||||
all_reviews = await PendingHumanReview.prisma().find_many(
|
||||
where={
|
||||
"nodeExecId": {"in": node_exec_ids},
|
||||
"userId": user_id,
|
||||
"status": ReviewStatus.WAITING,
|
||||
},
|
||||
)
|
||||
|
||||
# Validate all reviews can be processed
|
||||
if len(reviews) != len(node_exec_ids):
|
||||
missing_ids = set(node_exec_ids) - {review.nodeExecId for review in reviews}
|
||||
# Separate into pending and already-processed reviews
|
||||
reviews_to_process = []
|
||||
already_processed = []
|
||||
for review in all_reviews:
|
||||
if review.status == ReviewStatus.WAITING:
|
||||
reviews_to_process.append(review)
|
||||
else:
|
||||
already_processed.append(review)
|
||||
|
||||
# Check for truly missing reviews (not found at all)
|
||||
found_ids = {review.nodeExecId for review in all_reviews}
|
||||
missing_ids = set(node_exec_ids) - found_ids
|
||||
if missing_ids:
|
||||
raise ValueError(
|
||||
f"Reviews not found, access denied, or not in WAITING status: {', '.join(missing_ids)}"
|
||||
f"Reviews not found or access denied: {', '.join(missing_ids)}"
|
||||
)
|
||||
|
||||
# Create parallel update tasks
|
||||
# Validate already-processed reviews have compatible status (same decision)
|
||||
# This handles race conditions where another request processed the same reviews
|
||||
for review in already_processed:
|
||||
requested_status = review_decisions[review.nodeExecId][0]
|
||||
if review.status != requested_status:
|
||||
raise ValueError(
|
||||
f"Review {review.nodeExecId} was already processed with status "
|
||||
f"{review.status}, cannot change to {requested_status}"
|
||||
)
|
||||
|
||||
# Log if we're handling a race condition (some reviews already processed)
|
||||
if already_processed:
|
||||
already_processed_ids = [r.nodeExecId for r in already_processed]
|
||||
logger.info(
|
||||
f"Race condition handled: {len(already_processed)} review(s) already "
|
||||
f"processed by concurrent request: {already_processed_ids}"
|
||||
)
|
||||
|
||||
# Create parallel update tasks for reviews that still need processing
|
||||
update_tasks = []
|
||||
|
||||
for review in reviews:
|
||||
for review in reviews_to_process:
|
||||
new_status, reviewed_data, message = review_decisions[review.nodeExecId]
|
||||
has_data_changes = reviewed_data is not None and reviewed_data != review.payload
|
||||
|
||||
@@ -463,7 +495,7 @@ async def process_all_reviews_for_execution(
|
||||
update_tasks.append(task)
|
||||
|
||||
# Execute all updates in parallel and get updated reviews
|
||||
updated_reviews = await asyncio.gather(*update_tasks)
|
||||
updated_reviews = await asyncio.gather(*update_tasks) if update_tasks else []
|
||||
|
||||
# Note: Execution resumption is now handled at the API layer after ALL reviews
|
||||
# for an execution are processed (both approved and rejected)
|
||||
@@ -472,8 +504,11 @@ async def process_all_reviews_for_execution(
|
||||
# Local import to avoid event loop conflicts in tests
|
||||
from backend.data.execution import get_node_execution
|
||||
|
||||
# Combine updated reviews with already-processed ones (for idempotent response)
|
||||
all_result_reviews = list(updated_reviews) + already_processed
|
||||
|
||||
result = {}
|
||||
for review in updated_reviews:
|
||||
for review in all_result_reviews:
|
||||
node_exec = await get_node_execution(review.nodeExecId)
|
||||
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
||||
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
|
||||
|
||||
Reference in New Issue
Block a user