feat(backend): add human-in-the-loop review for blocks requiring approval

## Summary
- Add `requires_human_review` flag to Block class to enable automatic HITL reviews
- Blocks marked with `requires_human_review = True` pause execution in safe mode until human approval
- Refactor HumanInTheLoopBlock to use shared review helpers for consistency

## Changes

### New HITL System for Blocks
- **Block.requires_human_review**: Flag to mark blocks requiring human approval
- **HITLReviewHelper**: Shared helper class for review operations across different block types
- **Automatic integration**: Blocks pause execution and create review entries when needed

### Core Functionality
- **Review workflow**: Create review → Pause execution → Human approval/rejection → Continue/Stop
- **Safe mode integration**: Only activates when `execution_context.safe_mode = True`
- **Error handling**: Rejected reviews throw descriptive errors to stop execution

### API Improvements
- **ReviewDecision**: Clean return type with `should_proceed`, `message`, and `review_result`
- **Single entry point**: `handle_review_decision()` provides unified API
- **Type safety**: Proper typing throughout with circular import resolution

### Graph Detection
- **has_human_in_the_loop**: Updated to detect both HITL blocks and blocks with `requires_human_review = True`
- **UI awareness**: Frontend can properly indicate graphs requiring human intervention

## Usage

```python
class MyBlock(Block):
    def __init__(self):
        super().__init__(...)
        self.requires_human_review = True  # Enable automatic HITL
```

When safe mode is enabled, blocks with `requires_human_review = True` will:
1. Pause execution automatically
2. Create a review entry in the database
3. Wait for human approval through the review UI
4. Continue execution if approved, or stop with error if rejected

## Testing
- All existing block tests pass
- Circular import issues resolved
- Clean integration with existing HITL infrastructure
This commit is contained in:
Zamil Majdy
2026-01-08 18:04:12 -06:00
parent 36fb1ea004
commit 4e230dc03c
5 changed files with 292 additions and 60 deletions

View File

@@ -0,0 +1,187 @@
"""
Shared helpers for Human-In-The-Loop (HITL) review functionality.
Used by both the dedicated HumanInTheLoopBlock and blocks that require human review.
"""
import logging
from typing import Any, NamedTuple, Optional
from prisma.enums import ReviewStatus
from backend.data.execution import ExecutionContext, ExecutionStatus
from backend.data.human_review import ReviewResult
from backend.executor.manager import async_update_node_execution_status
from backend.util.clients import get_database_manager_async_client
logger = logging.getLogger(__name__)
class ReviewDecision(NamedTuple):
"""Result of a review decision."""
should_proceed: bool
message: str
review_result: Optional[ReviewResult]
class HITLReviewHelper:
"""Helper class for Human-In-The-Loop review operations."""
@staticmethod
async def get_or_create_human_review(**kwargs) -> Optional[ReviewResult]:
"""Create or retrieve a human review from the database."""
return await get_database_manager_async_client().get_or_create_human_review(
**kwargs
)
@staticmethod
async def update_node_execution_status(**kwargs) -> None:
"""Update the execution status of a node."""
await async_update_node_execution_status(
db_client=get_database_manager_async_client(), **kwargs
)
@staticmethod
async def update_review_processed_status(
node_exec_id: str, processed: bool
) -> None:
"""Update the processed status of a review."""
return await get_database_manager_async_client().update_review_processed_status(
node_exec_id, processed
)
@staticmethod
async def _handle_review_request(
input_data: Any,
user_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
block_name: str = "Block",
editable: bool = False,
) -> Optional[ReviewResult]:
"""
Handle a review request for a block that requires human review.
Args:
input_data: The input data to be reviewed
user_id: ID of the user requesting the review
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
graph_id: ID of the graph
graph_version: Version of the graph
execution_context: Current execution context
block_name: Name of the block requesting review
editable: Whether the reviewer can edit the data
Returns:
ReviewResult if review is complete, None if waiting for human input
Raises:
Exception: If review creation or status update fails
"""
# Skip review if safe mode is disabled
if not execution_context.safe_mode:
logger.info(
f"Block {block_name} skipping review for node {node_exec_id} - safe mode disabled"
)
return None
try:
result = await HITLReviewHelper.get_or_create_human_review(
user_id=user_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
input_data=input_data,
message=f"Review required for {block_name} execution",
editable=editable,
)
except Exception as e:
logger.error(
f"Error creating review for {block_name} node {node_exec_id}: {str(e)}"
)
raise
if result is None:
logger.info(
f"Block {block_name} pausing execution for node {node_exec_id} - awaiting human review"
)
try:
await HITLReviewHelper.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
return None # Signal that execution should pause
except Exception as e:
logger.error(
f"Failed to update node status for block {block_name} {node_exec_id}: {str(e)}"
)
raise
# Mark review as processed if not already done
if not result.processed:
await HITLReviewHelper.update_review_processed_status(
node_exec_id=node_exec_id, processed=True
)
return result
@staticmethod
async def handle_review_decision(
input_data: Any,
user_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
block_name: str = "Block",
editable: bool = False,
) -> Optional[ReviewDecision]:
"""
Handle a review request and return the decision in a single call.
Args:
input_data: The input data to be reviewed
user_id: ID of the user requesting the review
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
graph_id: ID of the graph
graph_version: Version of the graph
execution_context: Current execution context
block_name: Name of the block requesting review
editable: Whether the reviewer can edit the data
Returns:
ReviewDecision if review is complete (approved/rejected),
None if execution should pause (awaiting review)
"""
review_result = await HITLReviewHelper._handle_review_request(
input_data=input_data,
user_id=user_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_context=execution_context,
block_name=block_name,
editable=editable,
)
if review_result is None:
# Still awaiting review - return None to pause execution
return None
should_proceed = review_result.status == ReviewStatus.APPROVED
if should_proceed:
message = review_result.message or "Execution approved by reviewer"
else:
message = review_result.message or "Execution rejected by reviewer"
return ReviewDecision(
should_proceed=should_proceed, message=message, review_result=review_result
)

View File

@@ -3,6 +3,7 @@ from typing import Any
from prisma.enums import ReviewStatus
from backend.blocks.helpers.review import HITLReviewHelper
from backend.data.block import (
Block,
BlockCategory,
@@ -11,11 +12,9 @@ from backend.data.block import (
BlockSchemaOutput,
BlockType,
)
from backend.data.execution import ExecutionContext, ExecutionStatus
from backend.data.execution import ExecutionContext
from backend.data.human_review import ReviewResult
from backend.data.model import SchemaField
from backend.executor.manager import async_update_node_execution_status
from backend.util.clients import get_database_manager_async_client
logger = logging.getLogger(__name__)
@@ -72,32 +71,26 @@ class HumanInTheLoopBlock(Block):
("approved_data", {"name": "John Doe", "age": 30}),
],
test_mock={
"get_or_create_human_review": lambda *_args, **_kwargs: ReviewResult(
data={"name": "John Doe", "age": 30},
status=ReviewStatus.APPROVED,
message="",
processed=False,
node_exec_id="test-node-exec-id",
),
"update_node_execution_status": lambda *_args, **_kwargs: None,
"update_review_processed_status": lambda *_args, **_kwargs: None,
"handle_review_decision": lambda **kwargs: type(
"ReviewDecision",
(),
{
"should_proceed": True,
"message": "Test approval message",
"review_result": ReviewResult(
data={"name": "John Doe", "age": 30},
status=ReviewStatus.APPROVED,
message="",
processed=False,
node_exec_id="test-node-exec-id",
),
},
)(),
},
)
async def get_or_create_human_review(self, **kwargs):
return await get_database_manager_async_client().get_or_create_human_review(
**kwargs
)
async def update_node_execution_status(self, **kwargs):
return await async_update_node_execution_status(
db_client=get_database_manager_async_client(), **kwargs
)
async def update_review_processed_status(self, node_exec_id: str, processed: bool):
return await get_database_manager_async_client().update_review_processed_status(
node_exec_id, processed
)
async def handle_review_decision(self, **kwargs):
return await HITLReviewHelper.handle_review_decision(**kwargs)
async def run(
self,
@@ -109,7 +102,7 @@ class HumanInTheLoopBlock(Block):
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
**kwargs,
**_kwargs,
) -> BlockOutput:
if not execution_context.safe_mode:
logger.info(
@@ -120,47 +113,37 @@ class HumanInTheLoopBlock(Block):
return
try:
result = await self.get_or_create_human_review(
decision = await self.handle_review_decision(
input_data=input_data.data,
user_id=user_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
input_data=input_data.data,
message=input_data.name,
execution_context=execution_context,
block_name=self.name,
editable=input_data.editable,
)
except Exception as e:
logger.error(f"Error in HITL block for node {node_exec_id}: {str(e)}")
raise
if result is None:
logger.info(
f"HITL block pausing execution for node {node_exec_id} - awaiting human review"
)
try:
await self.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
return
except Exception as e:
logger.error(
f"Failed to update node status for HITL block {node_exec_id}: {str(e)}"
)
raise
if decision is None:
# Execution is paused, the helper already handled status updates
return
if not result.processed:
await self.update_review_processed_status(
node_exec_id=node_exec_id, processed=True
)
if result.status == ReviewStatus.APPROVED:
yield "approved_data", result.data
if result.message:
yield "review_message", result.message
elif result.status == ReviewStatus.REJECTED:
yield "rejected_data", result.data
if result.message:
yield "review_message", result.message
# Process the review result and yield appropriate outputs
if (
decision.review_result
and decision.review_result.status == ReviewStatus.APPROVED
):
yield "approved_data", decision.review_result.data
if decision.message:
yield "review_message", decision.message
elif (
decision.review_result
and decision.review_result.status == ReviewStatus.REJECTED
):
yield "rejected_data", decision.review_result.data
if decision.message:
yield "review_message", decision.message

View File

@@ -50,6 +50,8 @@ from .model import (
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from backend.data.execution import ExecutionContext
from .graph import Link
app_config = Config()
@@ -472,6 +474,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
self.block_type = block_type
self.webhook_config = webhook_config
self.execution_stats: NodeExecutionStats = NodeExecutionStats()
self.requires_human_review: bool = False
if self.webhook_config:
if isinstance(self.webhook_config, BlockWebhookConfig):
@@ -614,6 +617,56 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
block_id=self.id,
) from ex
async def is_block_exec_need_review(
self,
input_data: BlockInput,
*,
user_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: "ExecutionContext",
**kwargs,
) -> bool:
"""
Check if this block execution needs human review and handle the review process.
Returns:
True if execution should be paused for review, False if execution can proceed
"""
if not self.requires_human_review:
return False
from backend.blocks.helpers.review import HITLReviewHelper
# Handle the review request and get decision
decision = await HITLReviewHelper.handle_review_decision(
input_data=input_data,
user_id=user_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_context=execution_context,
block_name=self.name,
editable=True,
)
if decision is None:
# We're awaiting review - pause execution
return True
if not decision.should_proceed:
# Review was rejected, raise an error to stop execution
raise BlockExecutionError(
message=f"Block execution rejected by reviewer: {decision.message}",
block_name=self.name,
block_id=self.id,
)
return False
async def _execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
if error := self.input_schema.validate_data(input_data):
raise BlockInputError(
@@ -622,6 +675,9 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
block_id=self.id,
)
if await self.is_block_exec_need_review(input_data, **kwargs):
return
async for output_name, output_data in self.run(
self.input_schema(**{k: v for k, v in input_data.items() if v is not None}),
**kwargs,

View File

@@ -235,7 +235,10 @@ class BaseGraph(BaseDbModel):
return any(
node.block_id
for node in self.nodes
if node.block.block_type == BlockType.HUMAN_IN_THE_LOOP
if (
node.block.block_type == BlockType.HUMAN_IN_THE_LOOP
or node.block.requires_human_review
)
)
@property

View File

@@ -134,6 +134,9 @@ ignore_patterns = []
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
# Disable syrupy plugin to avoid conflict with pytest-snapshot
# Both provide --snapshot-update argument causing ArgumentError
addopts = "-p no:syrupy"
filterwarnings = [
"ignore:'audioop' is deprecated:DeprecationWarning:discord.player",
"ignore:invalid escape sequence:DeprecationWarning:tweepy.api",