mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-26 15:38:14 -05:00
fix(backend): handle race condition in review processing gracefully
When multiple concurrent requests try to process the same reviews (e.g., double-click, multiple tabs), the second request would fail with "Reviews not found, access denied, or not in WAITING status". Now handles this gracefully: - Already-processed reviews with the same decision are treated as success - Only raises error if review doesn't exist or has conflicting status - Logs when race condition is detected for debugging Fixes AUTOGPT-SERVER-7HE
This commit is contained in:
@@ -14,9 +14,9 @@ from backend.data.execution import (
|
||||
from backend.data.graph import get_graph_settings
|
||||
from backend.data.human_review import (
|
||||
create_auto_approval_record,
|
||||
get_pending_reviews_by_node_exec_ids,
|
||||
get_pending_reviews_for_execution,
|
||||
get_pending_reviews_for_user,
|
||||
get_reviews_by_node_exec_ids,
|
||||
has_pending_reviews_for_graph_exec,
|
||||
process_all_reviews_for_execution,
|
||||
)
|
||||
@@ -137,17 +137,17 @@ async def process_review_action(
|
||||
detail="At least one review must be provided",
|
||||
)
|
||||
|
||||
# Batch fetch all requested reviews
|
||||
reviews_map = await get_pending_reviews_by_node_exec_ids(
|
||||
# Batch fetch all requested reviews (regardless of status for idempotent handling)
|
||||
reviews_map = await get_reviews_by_node_exec_ids(
|
||||
list(all_request_node_ids), user_id
|
||||
)
|
||||
|
||||
# Validate all reviews were found
|
||||
# Validate all reviews were found (must exist, any status is OK for now)
|
||||
missing_ids = all_request_node_ids - set(reviews_map.keys())
|
||||
if missing_ids:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"No pending review found for node execution(s): {', '.join(missing_ids)}",
|
||||
detail=f"Review(s) not found: {', '.join(missing_ids)}",
|
||||
)
|
||||
|
||||
# Validate all reviews belong to the same execution
|
||||
|
||||
@@ -312,6 +312,57 @@ async def get_pending_reviews_by_node_exec_ids(
|
||||
return result
|
||||
|
||||
|
||||
async def get_reviews_by_node_exec_ids(
|
||||
node_exec_ids: list[str], user_id: str
|
||||
) -> dict[str, "PendingHumanReviewModel"]:
|
||||
"""
|
||||
Get multiple reviews by their node execution IDs regardless of status.
|
||||
|
||||
Unlike get_pending_reviews_by_node_exec_ids, this returns reviews in any status
|
||||
(WAITING, APPROVED, REJECTED). Used for validation in idempotent operations.
|
||||
|
||||
Args:
|
||||
node_exec_ids: List of node execution IDs to look up
|
||||
user_id: User ID for authorization (only returns reviews belonging to this user)
|
||||
|
||||
Returns:
|
||||
Dictionary mapping node_exec_id -> PendingHumanReviewModel for found reviews
|
||||
"""
|
||||
if not node_exec_ids:
|
||||
return {}
|
||||
|
||||
reviews = await PendingHumanReview.prisma().find_many(
|
||||
where={
|
||||
"nodeExecId": {"in": node_exec_ids},
|
||||
"userId": user_id,
|
||||
}
|
||||
)
|
||||
|
||||
if not reviews:
|
||||
return {}
|
||||
|
||||
# Batch fetch all node executions to avoid N+1 queries
|
||||
node_exec_ids_to_fetch = [review.nodeExecId for review in reviews]
|
||||
node_execs = await AgentNodeExecution.prisma().find_many(
|
||||
where={"id": {"in": node_exec_ids_to_fetch}},
|
||||
include={"Node": True},
|
||||
)
|
||||
|
||||
# Create mapping from node_exec_id to node_id
|
||||
node_exec_id_to_node_id = {
|
||||
node_exec.id: node_exec.agentNodeId for node_exec in node_execs
|
||||
}
|
||||
|
||||
result = {}
|
||||
for review in reviews:
|
||||
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
|
||||
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
|
||||
review, node_id=node_id
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
|
||||
"""
|
||||
Check if a graph execution has any pending reviews.
|
||||
@@ -407,38 +458,68 @@ async def process_all_reviews_for_execution(
|
||||
) -> dict[str, PendingHumanReviewModel]:
|
||||
"""Process all pending reviews for an execution with approve/reject decisions.
|
||||
|
||||
Handles race conditions gracefully: if a review was already processed with the
|
||||
same decision by a concurrent request, it's treated as success rather than error.
|
||||
|
||||
Args:
|
||||
user_id: User ID for ownership validation
|
||||
review_decisions: Map of node_exec_id -> (status, reviewed_data, message)
|
||||
|
||||
Returns:
|
||||
Dict of node_exec_id -> updated review model
|
||||
Dict of node_exec_id -> updated review model (includes already-processed reviews)
|
||||
"""
|
||||
if not review_decisions:
|
||||
return {}
|
||||
|
||||
node_exec_ids = list(review_decisions.keys())
|
||||
|
||||
# Get all reviews for validation
|
||||
reviews = await PendingHumanReview.prisma().find_many(
|
||||
# Get all reviews (both WAITING and already processed) for the user
|
||||
all_reviews = await PendingHumanReview.prisma().find_many(
|
||||
where={
|
||||
"nodeExecId": {"in": node_exec_ids},
|
||||
"userId": user_id,
|
||||
"status": ReviewStatus.WAITING,
|
||||
},
|
||||
)
|
||||
|
||||
# Validate all reviews can be processed
|
||||
if len(reviews) != len(node_exec_ids):
|
||||
missing_ids = set(node_exec_ids) - {review.nodeExecId for review in reviews}
|
||||
# Separate into pending and already-processed reviews
|
||||
reviews_to_process = []
|
||||
already_processed = []
|
||||
for review in all_reviews:
|
||||
if review.status == ReviewStatus.WAITING:
|
||||
reviews_to_process.append(review)
|
||||
else:
|
||||
already_processed.append(review)
|
||||
|
||||
# Check for truly missing reviews (not found at all)
|
||||
found_ids = {review.nodeExecId for review in all_reviews}
|
||||
missing_ids = set(node_exec_ids) - found_ids
|
||||
if missing_ids:
|
||||
raise ValueError(
|
||||
f"Reviews not found, access denied, or not in WAITING status: {', '.join(missing_ids)}"
|
||||
f"Reviews not found or access denied: {', '.join(missing_ids)}"
|
||||
)
|
||||
|
||||
# Create parallel update tasks
|
||||
# Validate already-processed reviews have compatible status (same decision)
|
||||
# This handles race conditions where another request processed the same reviews
|
||||
for review in already_processed:
|
||||
requested_status = review_decisions[review.nodeExecId][0]
|
||||
if review.status != requested_status:
|
||||
raise ValueError(
|
||||
f"Review {review.nodeExecId} was already processed with status "
|
||||
f"{review.status}, cannot change to {requested_status}"
|
||||
)
|
||||
|
||||
# Log if we're handling a race condition (some reviews already processed)
|
||||
if already_processed:
|
||||
already_processed_ids = [r.nodeExecId for r in already_processed]
|
||||
logger.info(
|
||||
f"Race condition handled: {len(already_processed)} review(s) already "
|
||||
f"processed by concurrent request: {already_processed_ids}"
|
||||
)
|
||||
|
||||
# Create parallel update tasks for reviews that still need processing
|
||||
update_tasks = []
|
||||
|
||||
for review in reviews:
|
||||
for review in reviews_to_process:
|
||||
new_status, reviewed_data, message = review_decisions[review.nodeExecId]
|
||||
has_data_changes = reviewed_data is not None and reviewed_data != review.payload
|
||||
|
||||
@@ -463,7 +544,7 @@ async def process_all_reviews_for_execution(
|
||||
update_tasks.append(task)
|
||||
|
||||
# Execute all updates in parallel and get updated reviews
|
||||
updated_reviews = await asyncio.gather(*update_tasks)
|
||||
updated_reviews = await asyncio.gather(*update_tasks) if update_tasks else []
|
||||
|
||||
# Note: Execution resumption is now handled at the API layer after ALL reviews
|
||||
# for an execution are processed (both approved and rejected)
|
||||
@@ -472,8 +553,11 @@ async def process_all_reviews_for_execution(
|
||||
# Local import to avoid event loop conflicts in tests
|
||||
from backend.data.execution import get_node_execution
|
||||
|
||||
# Combine updated reviews with already-processed ones (for idempotent response)
|
||||
all_result_reviews = list(updated_reviews) + already_processed
|
||||
|
||||
result = {}
|
||||
for review in updated_reviews:
|
||||
for review in all_result_reviews:
|
||||
node_exec = await get_node_execution(review.nodeExecId)
|
||||
node_id = node_exec.node_id if node_exec else review.nodeExecId
|
||||
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
|
||||
|
||||
Reference in New Issue
Block a user