diff --git a/autogpt_platform/backend/backend/api/features/executions/review/routes.py b/autogpt_platform/backend/backend/api/features/executions/review/routes.py index a10071e9cb..539c7fd87b 100644 --- a/autogpt_platform/backend/backend/api/features/executions/review/routes.py +++ b/autogpt_platform/backend/backend/api/features/executions/review/routes.py @@ -14,9 +14,9 @@ from backend.data.execution import ( from backend.data.graph import get_graph_settings from backend.data.human_review import ( create_auto_approval_record, - get_pending_reviews_by_node_exec_ids, get_pending_reviews_for_execution, get_pending_reviews_for_user, + get_reviews_by_node_exec_ids, has_pending_reviews_for_graph_exec, process_all_reviews_for_execution, ) @@ -137,17 +137,17 @@ async def process_review_action( detail="At least one review must be provided", ) - # Batch fetch all requested reviews - reviews_map = await get_pending_reviews_by_node_exec_ids( + # Batch fetch all requested reviews (regardless of status for idempotent handling) + reviews_map = await get_reviews_by_node_exec_ids( list(all_request_node_ids), user_id ) - # Validate all reviews were found + # Validate all reviews were found (must exist, any status is OK for now) missing_ids = all_request_node_ids - set(reviews_map.keys()) if missing_ids: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail=f"No pending review found for node execution(s): {', '.join(missing_ids)}", + detail=f"Review(s) not found: {', '.join(missing_ids)}", ) # Validate all reviews belong to the same execution diff --git a/autogpt_platform/backend/backend/data/human_review.py b/autogpt_platform/backend/backend/data/human_review.py index c70eaa7b64..c008cad57d 100644 --- a/autogpt_platform/backend/backend/data/human_review.py +++ b/autogpt_platform/backend/backend/data/human_review.py @@ -312,6 +312,57 @@ async def get_pending_reviews_by_node_exec_ids( return result +async def get_reviews_by_node_exec_ids( + node_exec_ids: list[str], user_id: str +) -> dict[str, "PendingHumanReviewModel"]: + """ + Get multiple reviews by their node execution IDs regardless of status. + + Unlike get_pending_reviews_by_node_exec_ids, this returns reviews in any status + (WAITING, APPROVED, REJECTED). Used for validation in idempotent operations. + + Args: + node_exec_ids: List of node execution IDs to look up + user_id: User ID for authorization (only returns reviews belonging to this user) + + Returns: + Dictionary mapping node_exec_id -> PendingHumanReviewModel for found reviews + """ + if not node_exec_ids: + return {} + + reviews = await PendingHumanReview.prisma().find_many( + where={ + "nodeExecId": {"in": node_exec_ids}, + "userId": user_id, + } + ) + + if not reviews: + return {} + + # Batch fetch all node executions to avoid N+1 queries + node_exec_ids_to_fetch = [review.nodeExecId for review in reviews] + node_execs = await AgentNodeExecution.prisma().find_many( + where={"id": {"in": node_exec_ids_to_fetch}}, + include={"Node": True}, + ) + + # Create mapping from node_exec_id to node_id + node_exec_id_to_node_id = { + node_exec.id: node_exec.agentNodeId for node_exec in node_execs + } + + result = {} + for review in reviews: + node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId) + result[review.nodeExecId] = PendingHumanReviewModel.from_db( + review, node_id=node_id + ) + + return result + + async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool: """ Check if a graph execution has any pending reviews. @@ -407,38 +458,68 @@ async def process_all_reviews_for_execution( ) -> dict[str, PendingHumanReviewModel]: """Process all pending reviews for an execution with approve/reject decisions. + Handles race conditions gracefully: if a review was already processed with the + same decision by a concurrent request, it's treated as success rather than error. + Args: user_id: User ID for ownership validation review_decisions: Map of node_exec_id -> (status, reviewed_data, message) Returns: - Dict of node_exec_id -> updated review model + Dict of node_exec_id -> updated review model (includes already-processed reviews) """ if not review_decisions: return {} node_exec_ids = list(review_decisions.keys()) - # Get all reviews for validation - reviews = await PendingHumanReview.prisma().find_many( + # Get all reviews (both WAITING and already processed) for the user + all_reviews = await PendingHumanReview.prisma().find_many( where={ "nodeExecId": {"in": node_exec_ids}, "userId": user_id, - "status": ReviewStatus.WAITING, }, ) - # Validate all reviews can be processed - if len(reviews) != len(node_exec_ids): - missing_ids = set(node_exec_ids) - {review.nodeExecId for review in reviews} + # Separate into pending and already-processed reviews + reviews_to_process = [] + already_processed = [] + for review in all_reviews: + if review.status == ReviewStatus.WAITING: + reviews_to_process.append(review) + else: + already_processed.append(review) + + # Check for truly missing reviews (not found at all) + found_ids = {review.nodeExecId for review in all_reviews} + missing_ids = set(node_exec_ids) - found_ids + if missing_ids: raise ValueError( - f"Reviews not found, access denied, or not in WAITING status: {', '.join(missing_ids)}" + f"Reviews not found or access denied: {', '.join(missing_ids)}" ) - # Create parallel update tasks + # Validate already-processed reviews have compatible status (same decision) + # This handles race conditions where another request processed the same reviews + for review in already_processed: + requested_status = review_decisions[review.nodeExecId][0] + if review.status != requested_status: + raise ValueError( + f"Review {review.nodeExecId} was already processed with status " + f"{review.status}, cannot change to {requested_status}" + ) + + # Log if we're handling a race condition (some reviews already processed) + if already_processed: + already_processed_ids = [r.nodeExecId for r in already_processed] + logger.info( + f"Race condition handled: {len(already_processed)} review(s) already " + f"processed by concurrent request: {already_processed_ids}" + ) + + # Create parallel update tasks for reviews that still need processing update_tasks = [] - for review in reviews: + for review in reviews_to_process: new_status, reviewed_data, message = review_decisions[review.nodeExecId] has_data_changes = reviewed_data is not None and reviewed_data != review.payload @@ -463,7 +544,7 @@ async def process_all_reviews_for_execution( update_tasks.append(task) # Execute all updates in parallel and get updated reviews - updated_reviews = await asyncio.gather(*update_tasks) + updated_reviews = await asyncio.gather(*update_tasks) if update_tasks else [] # Note: Execution resumption is now handled at the API layer after ALL reviews # for an execution are processed (both approved and rejected) @@ -472,8 +553,11 @@ async def process_all_reviews_for_execution( # Local import to avoid event loop conflicts in tests from backend.data.execution import get_node_execution + # Combine updated reviews with already-processed ones (for idempotent response) + all_result_reviews = list(updated_reviews) + already_processed + result = {} - for review in updated_reviews: + for review in all_result_reviews: node_exec = await get_node_execution(review.nodeExecId) node_id = node_exec.node_id if node_exec else review.nodeExecId result[review.nodeExecId] = PendingHumanReviewModel.from_db(