feat(hitl): Add backend validation to prevent review processing during RUNNING/QUEUED status

Defense in depth: validate execution status before processing reviews.

Before:
- Reviews could be processed regardless of execution status
- Could cause race conditions and deadlocks
- User confusion when reviews processed but execution still running

After:
- Reject review processing with 409 Conflict if status is not REVIEW/INCOMPLETE
- Only allow processing when execution is actually paused for review
- Clear error message explaining why the request was rejected

Benefits:
1. **Prevention over cure**: Stop invalid requests before processing
2. **Clear semantics**: Reviews can only be processed when execution paused
3. **Better UX**: User gets immediate feedback if they try to approve too early
4. **Simpler resume logic**: No need for complex status checks since we validate upfront

Changes:
- Fetch graph execution metadata early in the endpoint
- Validate status is REVIEW or INCOMPLETE before processing
- Removed redundant status checks in resume logic (already validated)
- Simplified resume flow: just check if pending reviews remain
- Fixed comment: 'all pending reviews' not 'some reviews'
This commit is contained in:
Zamil Majdy
2026-01-22 18:22:21 -05:00
parent e4c3f9995b
commit 71a6969bbd

View File

@@ -135,6 +135,45 @@ async def process_review_action(
detail="At least one review must be provided",
)
# Get graph execution ID from pending reviews to validate execution status
all_pending = await get_pending_reviews_for_user(user_id)
matching_review = next(
(r for r in all_pending if r.node_exec_id in all_request_node_ids),
None,
)
if not matching_review:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No pending reviews found for the requested node executions",
)
graph_exec_id = matching_review.graph_exec_id
# Validate execution status before processing reviews
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
# Only allow processing reviews if execution is paused for review
# or incomplete (partial execution with some reviews already processed)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}. "
f"Reviews can only be processed when execution is paused (REVIEW status). "
f"Current status: {graph_exec_meta.status}",
)
# Build review decisions map
# Auto-approved reviews use original data (no modifications allowed)
review_decisions = {}
@@ -196,53 +235,13 @@ async def process_review_action(
if review.status == ReviewStatus.REJECTED
)
# Resume execution if we processed some reviews
# Resume execution only if ALL pending reviews for this execution have been processed
if updated_reviews:
# Get graph execution ID from any processed review
first_review = next(iter(updated_reviews.values()))
graph_exec_id = first_review.graph_exec_id
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
logger.error(
f"Graph execution {graph_exec_id} not found after processing reviews"
)
return ReviewResponse(
approved_count=approved_count,
rejected_count=rejected_count,
failed_count=0,
error=None,
)
still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
if not still_has_pending:
# Don't resume if execution is already completed or failed
if graph_exec_meta.status in (
ExecutionStatus.COMPLETED,
ExecutionStatus.FAILED,
):
logger.info(
f"Skipping resume for execution {graph_exec_id} - "
f"status is {graph_exec_meta.status} (already finished)"
)
return ReviewResponse(
approved_count=approved_count,
rejected_count=rejected_count,
failed_count=0,
error=None,
)
# Resume execution regardless of status to prevent deadlock
# (reviews are already processed, user has nothing left to do)
if graph_exec_meta.status != ExecutionStatus.REVIEW:
logger.warning(
f"Resuming execution {graph_exec_id} despite unexpected status "
f"{graph_exec_meta.status} (expected REVIEW) to prevent deadlock"
)
# Get the graph_id from any processed review
first_review = next(iter(updated_reviews.values()))
try:
settings = await get_graph_settings(