feat(backend): Make execution limit per user per graph and reduce to 25 (#11169)

## Summary
- Changed max_concurrent_graph_executions_per_user from 50 to 25
concurrent executions
- Updated the limit to be per user per graph instead of globally per
user
- Users can now run different graphs concurrently without being limited
by executions of other graphs
- Enhanced database query to filter by both user_id and graph_id

## Changes Made
- **Settings**: Reduced default limit from 50 to 25 and updated
description to clarify per-graph scope
- **Database Layer**: Modified `get_graph_executions_count` to accept
optional `graph_id` parameter
- **Executor Manager**: Updated rate limiting logic to check
per-user-per-graph instead of per-user globally
- **Logging**: Enhanced warning messages to include graph_id context

## Test plan
- [ ] Verify that users can run up to 25 concurrent executions of the
same graph
- [ ] Verify that users can run different graphs concurrently without
interference
- [ ] Test rate limiting behavior when limit is exceeded for a specific
graph
- [ ] Confirm logging shows correct graph_id context in rate limit
messages

## Impact
This change improves the user experience by allowing concurrent
execution of different graphs while still preventing resource exhaustion
from running too many instances of the same graph.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Zamil Majdy
2025-10-15 07:02:55 +07:00
committed by GitHub
parent 7b8499ec69
commit 934cb3a9c7
3 changed files with 16 additions and 7 deletions

View File

@@ -479,16 +479,18 @@ async def get_graph_executions(
async def get_graph_executions_count(
user_id: str,
user_id: Optional[str] = None,
graph_id: Optional[str] = None,
statuses: Optional[list[ExecutionStatus]] = None,
created_time_gte: Optional[datetime] = None,
created_time_lte: Optional[datetime] = None,
) -> int:
"""
Get count of graph executions for a user with optional filters.
Get count of graph executions with optional filters.
Args:
user_id: The user ID to filter by
user_id: Optional user ID to filter by
graph_id: Optional graph ID to filter by
statuses: Optional list of execution statuses to filter by
created_time_gte: Optional minimum creation time
created_time_lte: Optional maximum creation time
@@ -498,9 +500,14 @@ async def get_graph_executions_count(
"""
where_filter: AgentGraphExecutionWhereInput = {
"isDeleted": False,
"userId": user_id,
}
if user_id:
where_filter["userId"] = user_id
if graph_id:
where_filter["agentGraphId"] = graph_id
if created_time_gte or created_time_lte:
where_filter["createdAt"] = {
"gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),

View File

@@ -1467,6 +1467,7 @@ class ExecutionManager(AppProcess):
graph_exec_id = graph_exec_entry.graph_exec_id
user_id = graph_exec_entry.user_id
graph_id = graph_exec_entry.graph_id
logger.info(
f"[{self.service_name}] Received RUN for graph_exec_id={graph_exec_id}, user_id={user_id}"
)
@@ -1476,6 +1477,7 @@ class ExecutionManager(AppProcess):
# Only check executions from the last 24 hours for performance
current_running_count = get_db_client().get_graph_executions_count(
user_id=user_id,
graph_id=graph_id,
statuses=[ExecutionStatus.RUNNING],
created_time_gte=datetime.now(timezone.utc) - timedelta(hours=24),
)
@@ -1485,7 +1487,7 @@ class ExecutionManager(AppProcess):
>= settings.config.max_concurrent_graph_executions_per_user
):
logger.warning(
f"[{self.service_name}] Rate limit exceeded for user {user_id}: "
f"[{self.service_name}] Rate limit exceeded for user {user_id} on graph {graph_id}: "
f"{current_running_count}/{settings.config.max_concurrent_graph_executions_per_user} running executions"
)
_ack_message(reject=True, requeue=True)

View File

@@ -149,10 +149,10 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Time in seconds for how far back to check for the late executions.",
)
max_concurrent_graph_executions_per_user: int = Field(
default=50,
default=25,
ge=1,
le=1000,
description="Maximum number of concurrent graph executions allowed per user.",
description="Maximum number of concurrent graph executions allowed per user per graph.",
)
block_error_rate_threshold: float = Field(