fix(backend/executor): Improve graph execution permission check (#11323)

- Resolves #11316
- Durable fix to replace #11318

### Changes 🏗️

- Expand graph execution permissions check
  - Don't require library membership for execution as sub-graph

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Can run sub-agent with non-latest graph version
- [x] Can run sub-agent that is available in Marketplace but not added
to Library
This commit is contained in:
Reinier van der Leer
2025-11-05 18:13:41 +01:00
committed by GitHub
parent 979826f559
commit d68dceb9c1
7 changed files with 104 additions and 62 deletions

View File

@@ -84,6 +84,7 @@ class AgentExecutorBlock(Block):
inputs=input_data.inputs,
nodes_input_masks=input_data.nodes_input_masks,
parent_graph_exec_id=graph_exec_id,
is_sub_graph=True, # AgentExecutorBlock executions are always sub-graphs
)
logger = execution_utils.LogMetadata(

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
import uuid
from collections import defaultdict
@@ -17,8 +18,6 @@ from prisma.types import (
AgentGraphWhereInput,
AgentNodeCreateInput,
AgentNodeLinkCreateInput,
LibraryAgentWhereInput,
StoreListingVersionWhereInput,
)
from pydantic import BaseModel, Field, create_model
from pydantic.fields import computed_field
@@ -37,7 +36,7 @@ from backend.data.model import (
)
from backend.integrations.providers import ProviderName
from backend.util import type as type_utils
from backend.util.exceptions import GraphNotInLibraryError
from backend.util.exceptions import GraphNotAccessibleError, GraphNotInLibraryError
from backend.util.json import SafeJson
from backend.util.models import Pagination
@@ -897,9 +896,11 @@ async def get_graph_metadata(graph_id: str, version: int | None = None) -> Graph
async def get_graph(
graph_id: str,
version: int | None = None,
*,
user_id: str | None = None,
for_export: bool = False,
include_subgraphs: bool = False,
skip_access_check: bool = False,
) -> GraphModel | None:
"""
Retrieves a graph from the DB.
@@ -922,19 +923,9 @@ async def get_graph(
if graph is None:
return None
if graph.userId != user_id:
store_listing_filter: StoreListingVersionWhereInput = {
"agentGraphId": graph_id,
"isDeleted": False,
"submissionStatus": SubmissionStatus.APPROVED,
}
if version is not None:
store_listing_filter["agentGraphVersion"] = version
if not skip_access_check and graph.userId != user_id:
# For access, the graph must be owned by the user or listed in the store
if not await StoreListingVersion.prisma().find_first(
where=store_listing_filter, order={"agentGraphVersion": "desc"}
):
if not await is_graph_published_in_marketplace(graph_id, graph.version):
return None
if include_subgraphs or for_export:
@@ -978,13 +969,8 @@ async def get_graph_as_admin(
# For access, the graph must be owned by the user or listed in the store
if graph is None or (
graph.userId != user_id
and not (
await StoreListingVersion.prisma().find_first(
where={
"agentGraphId": graph_id,
"agentGraphVersion": version or graph.version,
}
)
and not await is_graph_published_in_marketplace(
graph_id, version or graph.version
)
):
return None
@@ -1112,7 +1098,7 @@ async def delete_graph(graph_id: str, user_id: str) -> int:
async def validate_graph_execution_permissions(
graph_id: str, user_id: str, graph_version: Optional[int] = None
user_id: str, graph_id: str, graph_version: int, is_sub_graph: bool = False
) -> None:
"""
Validate that a user has permission to execute a specific graph.
@@ -1120,47 +1106,88 @@ async def validate_graph_execution_permissions(
This function performs comprehensive authorization checks and raises specific
exceptions for different types of failures to enable appropriate error handling.
## Logic
A user can execute a graph if any of these is true:
1. They own the graph and some version of it is still listed in their library
2. The graph is published in the marketplace and listed in their library
3. The graph is published in the marketplace and is being executed as a sub-agent
Args:
graph_id: The ID of the graph to check
user_id: The ID of the user
graph_version: Optional specific version to check. If None (recommended),
performs version-agnostic check allowing execution of any
version as long as the graph is in the user's library.
This is important for sub-graphs that may reference older
versions no longer in the library.
graph_version: The version of the graph to check
is_sub_graph: Whether this is being executed as a sub-graph.
If `True`, the graph isn't required to be in the user's Library.
Raises:
GraphNotInLibraryError: If the graph is not in the user's library (deleted/archived)
GraphNotAccessibleError: If the graph is not accessible to the user.
GraphNotInLibraryError: If the graph is not in the user's library (deleted/archived).
NotAuthorizedError: If the user lacks execution permissions for other reasons
"""
graph, library_agent = await asyncio.gather(
AgentGraph.prisma().find_unique(
where={"graphVersionId": {"id": graph_id, "version": graph_version}}
),
LibraryAgent.prisma().find_first(
where={
"userId": user_id,
"agentGraphId": graph_id,
"isDeleted": False,
"isArchived": False,
}
),
)
# Step 1: Check library membership (raises specific GraphNotInLibraryError)
where_clause: LibraryAgentWhereInput = {
"userId": user_id,
"agentGraphId": graph_id,
"isDeleted": False,
"isArchived": False,
}
# Step 1: Check if user owns this graph
user_owns_graph = graph and graph.userId == user_id
if graph_version is not None:
where_clause["agentGraphVersion"] = graph_version
# Step 2: Check if agent is in the library *and not deleted*
user_has_in_library = library_agent is not None
count = await LibraryAgent.prisma().count(where=where_clause)
if count == 0:
raise GraphNotInLibraryError(
f"Graph #{graph_id} is not accessible in your library"
# Step 3: Apply permission logic
if not (
user_owns_graph
or await is_graph_published_in_marketplace(graph_id, graph_version)
):
raise GraphNotAccessibleError(
f"You do not have access to graph #{graph_id} v{graph_version}: "
"it is not owned by you and not available in the Marketplace"
)
elif not (user_has_in_library or is_sub_graph):
raise GraphNotInLibraryError(f"Graph #{graph_id} is not in your library")
# Step 2: Check execution-specific permissions (raises generic NotAuthorizedError)
# Additional authorization checks beyond library membership:
# Step 6: Check execution-specific permissions (raises generic NotAuthorizedError)
# Additional authorization checks beyond the above:
# 1. Check if user has execution credits (future)
# 2. Check if graph is suspended/disabled (future)
# 3. Check rate limiting rules (future)
# 4. Check organization-level permissions (future)
# For now, library membership is sufficient for execution permission
# Future enhancements can add more granular permission checks here
# When adding new checks, raise NotAuthorizedError for non-library issues
# For now, the above check logic is sufficient for execution permission.
# Future enhancements can add more granular permission checks here.
# When adding new checks, raise NotAuthorizedError for non-library issues.
async def is_graph_published_in_marketplace(graph_id: str, graph_version: int) -> bool:
"""
Check if a graph is published in the marketplace.
Params:
graph_id: The ID of the graph to check
graph_version: The version of the graph to check
Returns:
True if the graph is published and approved in the marketplace, False otherwise
"""
marketplace_listing = await StoreListingVersion.prisma().find_first(
where={
"agentGraphId": graph_id,
"agentGraphVersion": graph_version,
"submissionStatus": SubmissionStatus.APPROVED,
"isDeleted": False,
}
)
return marketplace_listing is not None
async def create_graph(graph: Graph, user_id: str) -> GraphModel:
@@ -1177,7 +1204,7 @@ async def fork_graph(graph_id: str, graph_version: int, user_id: str) -> GraphMo
"""
Forks a graph by copying it and all its nodes and links to a new graph.
"""
graph = await get_graph(graph_id, graph_version, user_id, True)
graph = await get_graph(graph_id, graph_version, user_id=user_id, for_export=True)
if not graph:
raise ValueError(f"Graph {graph_id} v{graph_version} not found")

View File

@@ -178,8 +178,9 @@ async def _execute_graph(**kwargs):
async def _cleanup_orphaned_schedules_for_graph(graph_id: str, user_id: str) -> None:
"""
Clean up orphaned schedules for a specific graph when execution fails with GraphNotInLibraryError.
This happens when an agent is deleted but schedules still exist.
Clean up orphaned schedules for a specific graph when execution fails with GraphNotAccessibleError.
This happens when an agent is pulled from the Marketplace or deleted
but schedules still exist.
"""
# Use scheduler client to access the scheduler service
scheduler_client = get_scheduler_client()

View File

@@ -477,6 +477,7 @@ async def validate_and_construct_node_execution_input(
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
is_sub_graph: bool = False,
) -> tuple[GraphModel, list[tuple[str, BlockInput]], NodesInputMasks]:
"""
Public wrapper that handles graph fetching, credential mapping, and validation+construction.
@@ -489,6 +490,7 @@ async def validate_and_construct_node_execution_input(
graph_version: The version of the graph to use.
graph_credentials_inputs: Credentials inputs to use.
nodes_input_masks: Node inputs to use.
is_sub_graph: Whether this is a sub-graph execution.
Returns:
GraphModel: Full graph object for the given `graph_id`.
@@ -510,19 +512,20 @@ async def validate_and_construct_node_execution_input(
user_id=user_id,
version=graph_version,
include_subgraphs=True,
# Execution/access permission is checked by validate_graph_execution_permissions
skip_access_check=True,
)
if not graph:
raise NotFoundError(f"Graph #{graph_id} not found.")
# Validate that the user has permission to execute this graph
# This checks both library membership and execution permissions,
# raising specific exceptions for appropriate error handling
# Note: Version-agnostic check to allow execution of graphs that reference
# older versions of sub-graphs that may no longer be in the library
# raising specific exceptions for appropriate error handling.
await gdb.validate_graph_execution_permissions(
graph_id=graph_id,
user_id=user_id,
# graph_version omitted for version-agnostic permission check
graph_id=graph.id,
graph_version=graph.version,
is_sub_graph=is_sub_graph,
)
nodes_input_masks = _merge_nodes_input_masks(
@@ -756,6 +759,7 @@ async def add_graph_execution(
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
parent_graph_exec_id: Optional[str] = None,
is_sub_graph: bool = False,
) -> GraphExecutionWithNodes:
"""
Adds a graph execution to the queue and returns the execution entry.
@@ -770,6 +774,7 @@ async def add_graph_execution(
Keys should map to the keys generated by `GraphModel.aggregate_credentials_inputs`.
nodes_input_masks: Node inputs to use in the execution.
parent_graph_exec_id: The ID of the parent graph execution (for nested executions).
is_sub_graph: Whether this is a sub-graph execution.
Returns:
GraphExecutionEntry: The entry for the graph execution.
Raises:
@@ -788,6 +793,7 @@ async def add_graph_execution(
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
is_sub_graph=is_sub_graph,
)
)
graph_exec = None

View File

@@ -470,7 +470,9 @@ async def _execute_webhook_preset_trigger(
logger.debug(f"Preset #{preset.id} is inactive")
return
graph = await get_graph(preset.graph_id, preset.graph_version, webhook.user_id)
graph = await get_graph(
preset.graph_id, preset.graph_version, user_id=webhook.user_id
)
if not graph:
logger.error(
f"User #{webhook.user_id} has preset #{preset.id} for graph "
@@ -562,8 +564,9 @@ async def _cleanup_orphaned_webhook_for_graph(
graph_id: str, user_id: str, webhook_id: str
) -> None:
"""
Clean up orphaned webhook connections for a specific graph when execution fails with GraphNotInLibraryError.
This happens when an agent is deleted but webhook triggers still exist.
Clean up orphaned webhook connections for a specific graph when execution fails with GraphNotAccessibleError.
This happens when an agent is pulled from the Marketplace or deleted
but webhook triggers still exist.
"""
try:
webhook = await get_webhook(webhook_id, include_relations=True)

View File

@@ -17,10 +17,12 @@ class NotAuthorizedError(ValueError):
"""The user is not authorized to perform the requested operation"""
class GraphNotInLibraryError(NotAuthorizedError):
"""Raised when attempting to execute a graph that is not in the user's library (deleted/archived)."""
class GraphNotAccessibleError(NotAuthorizedError):
"""Raised when attempting to execute a graph that is not accessible to the user."""
pass
class GraphNotInLibraryError(GraphNotAccessibleError):
"""Raised when attempting to execute a graph that is not / no longer in the user's library."""
class InsufficientBalanceError(ValueError):

View File

@@ -402,7 +402,9 @@ class TestDataCreator:
from backend.data.graph import get_graph
graph = await get_graph(
graph_data["id"], graph_data.get("version", 1), user["id"]
graph_data["id"],
graph_data.get("version", 1),
user_id=user["id"],
)
if graph:
# Use the API function to create library agent