mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
8 Commits
add-lamina
...
feature/ro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9c061d178b | ||
|
|
cc7e002ffe | ||
|
|
d058b64597 | ||
|
|
954c922a98 | ||
|
|
c9cf142949 | ||
|
|
7e0b6a39c1 | ||
|
|
268fa43ba9 | ||
|
|
ff14104db4 |
320
enterprise/server/auth/authorization.py
Normal file
320
enterprise/server/auth/authorization.py
Normal file
@@ -0,0 +1,320 @@
|
||||
"""
|
||||
Permission-based authorization dependencies for API endpoints (SAAS mode).
|
||||
|
||||
This module provides FastAPI dependencies for checking user permissions
|
||||
within organizations. It uses a permission-based authorization model where
|
||||
roles (owner, admin, member) are mapped to specific permissions.
|
||||
|
||||
This is the SAAS/enterprise implementation that performs real authorization
|
||||
checks against the database.
|
||||
|
||||
Usage:
|
||||
from server.auth.authorization import (
|
||||
Permission,
|
||||
require_permission,
|
||||
require_org_role,
|
||||
require_org_user,
|
||||
require_org_admin,
|
||||
require_org_owner,
|
||||
)
|
||||
|
||||
@router.get('/{org_id}/settings')
|
||||
async def get_settings(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)),
|
||||
):
|
||||
# Only users with VIEW_LLM_SETTINGS permission can access
|
||||
...
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from storage.org_member_store import OrgMemberStore
|
||||
from storage.role import Role
|
||||
from storage.role_store import RoleStore
|
||||
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.server.auth import Permission
|
||||
from openhands.server.user_auth import get_user_id
|
||||
|
||||
|
||||
class RoleName(str, Enum):
|
||||
"""Role names used in the system."""
|
||||
|
||||
OWNER = 'owner'
|
||||
ADMIN = 'admin'
|
||||
MEMBER = 'member'
|
||||
|
||||
|
||||
# Permission mappings for each role
|
||||
ROLE_PERMISSIONS: dict[RoleName, frozenset[Permission]] = {
|
||||
RoleName.OWNER: frozenset(
|
||||
[
|
||||
# Settings (Full access)
|
||||
Permission.MANAGE_SECRETS,
|
||||
Permission.MANAGE_MCP,
|
||||
Permission.MANAGE_INTEGRATIONS,
|
||||
Permission.MANAGE_APPLICATION_SETTINGS,
|
||||
Permission.MANAGE_API_KEYS,
|
||||
Permission.VIEW_LLM_SETTINGS,
|
||||
Permission.EDIT_LLM_SETTINGS,
|
||||
Permission.VIEW_BILLING,
|
||||
Permission.ADD_CREDITS,
|
||||
# Organization Members
|
||||
Permission.INVITE_USER_TO_ORGANIZATION,
|
||||
Permission.CHANGE_USER_ROLE_MEMBER,
|
||||
Permission.CHANGE_USER_ROLE_ADMIN,
|
||||
Permission.CHANGE_USER_ROLE_OWNER,
|
||||
# Organization Management (Owner only)
|
||||
Permission.CHANGE_ORGANIZATION_NAME,
|
||||
Permission.DELETE_ORGANIZATION,
|
||||
]
|
||||
),
|
||||
RoleName.ADMIN: frozenset(
|
||||
[
|
||||
# Settings (Full access)
|
||||
Permission.MANAGE_SECRETS,
|
||||
Permission.MANAGE_MCP,
|
||||
Permission.MANAGE_INTEGRATIONS,
|
||||
Permission.MANAGE_APPLICATION_SETTINGS,
|
||||
Permission.MANAGE_API_KEYS,
|
||||
Permission.VIEW_LLM_SETTINGS,
|
||||
Permission.EDIT_LLM_SETTINGS,
|
||||
Permission.VIEW_BILLING,
|
||||
Permission.ADD_CREDITS,
|
||||
# Organization Members
|
||||
Permission.INVITE_USER_TO_ORGANIZATION,
|
||||
Permission.CHANGE_USER_ROLE_MEMBER,
|
||||
Permission.CHANGE_USER_ROLE_ADMIN,
|
||||
]
|
||||
),
|
||||
RoleName.MEMBER: frozenset(
|
||||
[
|
||||
# Settings (Full access)
|
||||
Permission.MANAGE_SECRETS,
|
||||
Permission.MANAGE_MCP,
|
||||
Permission.MANAGE_INTEGRATIONS,
|
||||
Permission.MANAGE_APPLICATION_SETTINGS,
|
||||
Permission.MANAGE_API_KEYS,
|
||||
# LLM Settings (View only)
|
||||
Permission.VIEW_LLM_SETTINGS,
|
||||
]
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def get_role_permissions(role_name: str) -> frozenset[Permission]:
|
||||
"""Get the permissions for a role."""
|
||||
try:
|
||||
role_enum = RoleName(role_name)
|
||||
return ROLE_PERMISSIONS.get(role_enum, frozenset())
|
||||
except ValueError:
|
||||
return frozenset()
|
||||
|
||||
|
||||
def get_user_org_role(user_id: str, org_id: UUID) -> Role | None:
|
||||
"""
|
||||
Get the user's role in an organization.
|
||||
|
||||
Args:
|
||||
user_id: User ID (string that will be converted to UUID)
|
||||
org_id: Organization ID
|
||||
|
||||
Returns:
|
||||
Role object if user is a member, None otherwise
|
||||
"""
|
||||
from uuid import UUID as parse_uuid
|
||||
|
||||
org_member = OrgMemberStore.get_org_member(org_id, parse_uuid(user_id))
|
||||
if not org_member:
|
||||
return None
|
||||
|
||||
return RoleStore.get_role_by_id(org_member.role_id)
|
||||
|
||||
|
||||
def has_permission(user_role: Role, permission: Permission) -> bool:
|
||||
"""
|
||||
Check if a role has a specific permission.
|
||||
|
||||
Args:
|
||||
user_role: User's Role object
|
||||
permission: Permission to check
|
||||
|
||||
Returns:
|
||||
True if the role has the permission
|
||||
"""
|
||||
permissions = get_role_permissions(user_role.name)
|
||||
return permission in permissions
|
||||
|
||||
|
||||
def has_required_role(user_role: Role, required_role: Role) -> bool:
|
||||
"""
|
||||
Check if user's role meets or exceeds the required role.
|
||||
|
||||
Uses role hierarchy based on rank where lower rank = higher position
|
||||
(e.g., rank 1 owner > rank 2 admin > rank 3 user).
|
||||
|
||||
Args:
|
||||
user_role: User's actual Role object
|
||||
required_role: Minimum required Role object
|
||||
|
||||
Returns:
|
||||
True if user has sufficient permissions
|
||||
"""
|
||||
return user_role.rank <= required_role.rank
|
||||
|
||||
|
||||
def require_permission(permission: Permission):
|
||||
"""
|
||||
Factory function that creates a dependency to require a specific permission.
|
||||
|
||||
This creates a FastAPI dependency that:
|
||||
1. Extracts org_id from the path parameter
|
||||
2. Gets the authenticated user_id
|
||||
3. Checks if the user has the required permission in the organization
|
||||
4. Returns the user_id if authorized, raises HTTPException otherwise
|
||||
|
||||
Usage:
|
||||
@router.get('/{org_id}/settings')
|
||||
async def get_settings(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)),
|
||||
):
|
||||
...
|
||||
|
||||
Args:
|
||||
permission: The permission required to access the endpoint
|
||||
|
||||
Returns:
|
||||
Dependency function that validates permission and returns user_id
|
||||
"""
|
||||
|
||||
async def permission_checker(
|
||||
org_id: UUID,
|
||||
user_id: str | None = Depends(get_user_id),
|
||||
) -> str:
|
||||
if not user_id:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail='User not authenticated',
|
||||
)
|
||||
|
||||
user_role = get_user_org_role(user_id, org_id)
|
||||
|
||||
if not user_role:
|
||||
logger.warning(
|
||||
'User not a member of organization',
|
||||
extra={'user_id': user_id, 'org_id': str(org_id)},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail='User is not a member of this organization',
|
||||
)
|
||||
|
||||
if not has_permission(user_role, permission):
|
||||
logger.warning(
|
||||
'Insufficient permissions',
|
||||
extra={
|
||||
'user_id': user_id,
|
||||
'org_id': str(org_id),
|
||||
'user_role': user_role.name,
|
||||
'required_permission': permission.value,
|
||||
},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f'Requires {permission.value} permission',
|
||||
)
|
||||
|
||||
return user_id
|
||||
|
||||
return permission_checker
|
||||
|
||||
|
||||
def require_org_role(required_role_name: str):
|
||||
"""
|
||||
Factory function that creates a dependency to require a minimum org role.
|
||||
|
||||
This creates a FastAPI dependency that:
|
||||
1. Extracts org_id from the path parameter
|
||||
2. Gets the authenticated user_id
|
||||
3. Checks if the user has the required role in the organization
|
||||
4. Returns the user_id if authorized, raises HTTPException otherwise
|
||||
|
||||
Role hierarchy is based on rank from the Role class, where
|
||||
lower rank = higher position (e.g., rank 1 > rank 2 > rank 3).
|
||||
|
||||
Usage:
|
||||
@router.get('/{org_id}/resource')
|
||||
async def get_resource(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(require_org_role('user')),
|
||||
):
|
||||
...
|
||||
|
||||
Args:
|
||||
required_role_name: Name of the minimum required role to access the endpoint
|
||||
|
||||
Returns:
|
||||
Dependency function that validates role and returns user_id
|
||||
"""
|
||||
|
||||
async def role_checker(
|
||||
org_id: UUID,
|
||||
user_id: str | None = Depends(get_user_id),
|
||||
) -> str:
|
||||
if not user_id:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail='User not authenticated',
|
||||
)
|
||||
|
||||
user_role = get_user_org_role(user_id, org_id)
|
||||
|
||||
if not user_role:
|
||||
logger.warning(
|
||||
'User not a member of organization',
|
||||
extra={'user_id': user_id, 'org_id': str(org_id)},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail='User is not a member of this organization',
|
||||
)
|
||||
|
||||
required_role = RoleStore.get_role_by_name(required_role_name)
|
||||
if not required_role:
|
||||
logger.error(
|
||||
'Required role not found in database',
|
||||
extra={'required_role': required_role_name},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail='Role configuration error',
|
||||
)
|
||||
|
||||
if not has_required_role(user_role, required_role):
|
||||
logger.warning(
|
||||
'Insufficient role permissions',
|
||||
extra={
|
||||
'user_id': user_id,
|
||||
'org_id': str(org_id),
|
||||
'user_role': user_role.name,
|
||||
'required_role': required_role_name,
|
||||
},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f'Requires {required_role_name} role or higher',
|
||||
)
|
||||
|
||||
return user_id
|
||||
|
||||
return role_checker
|
||||
|
||||
|
||||
# Convenience dependencies for common role checks
|
||||
require_org_user = require_org_role('user')
|
||||
require_org_admin = require_org_role('admin')
|
||||
require_org_owner = require_org_role('owner')
|
||||
@@ -2,6 +2,11 @@ from typing import Annotated
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
from server.auth.authorization import (
|
||||
require_org_admin,
|
||||
require_org_owner,
|
||||
require_org_user,
|
||||
)
|
||||
from server.email_validation import get_admin_user_id
|
||||
from server.routes.org_models import (
|
||||
CannotModifySelfError,
|
||||
@@ -189,23 +194,26 @@ async def create_org(
|
||||
@org_router.get('/{org_id}', response_model=OrgResponse, status_code=status.HTTP_200_OK)
|
||||
async def get_org(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(get_user_id),
|
||||
user_id: str = Depends(require_org_user),
|
||||
) -> OrgResponse:
|
||||
"""Get organization details by ID.
|
||||
|
||||
This endpoint allows authenticated users who are members of an organization
|
||||
to retrieve its details. Only members of the organization can access this endpoint.
|
||||
Requires user, admin, or owner role.
|
||||
|
||||
Args:
|
||||
org_id: Organization ID (UUID)
|
||||
user_id: Authenticated user ID (injected by dependency)
|
||||
user_id: Authenticated user ID (injected by dependency, requires org membership)
|
||||
|
||||
Returns:
|
||||
OrgResponse: The organization details
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if user is not authenticated
|
||||
HTTPException: 403 if user is not a member of the organization
|
||||
HTTPException: 422 if org_id is not a valid UUID (handled by FastAPI)
|
||||
HTTPException: 404 if organization not found or user is not a member
|
||||
HTTPException: 404 if organization not found
|
||||
HTTPException: 500 if retrieval fails
|
||||
"""
|
||||
logger.info(
|
||||
@@ -305,23 +313,25 @@ async def get_me(
|
||||
@org_router.delete('/{org_id}', status_code=status.HTTP_200_OK)
|
||||
async def delete_org(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(get_user_id),
|
||||
user_id: str = Depends(require_org_owner),
|
||||
) -> dict:
|
||||
"""Delete an organization.
|
||||
|
||||
This endpoint allows authenticated organization owners to delete their organization.
|
||||
All associated data including organization members, conversations, billing data,
|
||||
and external LiteLLM team resources will be permanently removed.
|
||||
Requires owner role.
|
||||
|
||||
Args:
|
||||
org_id: Organization ID to delete
|
||||
user_id: Authenticated user ID (injected by dependency)
|
||||
user_id: Authenticated user ID (injected by dependency, requires owner role)
|
||||
|
||||
Returns:
|
||||
dict: Confirmation message with deleted organization details
|
||||
|
||||
Raises:
|
||||
HTTPException: 403 if user is not the organization owner
|
||||
HTTPException: 401 if user is not authenticated
|
||||
HTTPException: 403 if user is not an owner of the organization
|
||||
HTTPException: 404 if organization not found
|
||||
HTTPException: 500 if deletion fails
|
||||
"""
|
||||
@@ -414,24 +424,24 @@ async def delete_org(
|
||||
async def update_org(
|
||||
org_id: UUID,
|
||||
update_data: OrgUpdate,
|
||||
user_id: str = Depends(get_user_id),
|
||||
user_id: str = Depends(require_org_admin),
|
||||
) -> OrgResponse:
|
||||
"""Update an existing organization.
|
||||
|
||||
This endpoint allows authenticated users to update organization settings.
|
||||
LLM-related settings require admin or owner role in the organization.
|
||||
This endpoint allows authenticated admins and owners to update organization settings.
|
||||
Requires admin or owner role in the organization.
|
||||
|
||||
Args:
|
||||
org_id: Organization ID to update (UUID validated by FastAPI)
|
||||
update_data: Organization update data
|
||||
user_id: Authenticated user ID (injected by dependency)
|
||||
user_id: Authenticated user ID (injected by dependency, requires admin role)
|
||||
|
||||
Returns:
|
||||
OrgResponse: The updated organization details
|
||||
|
||||
Raises:
|
||||
HTTPException: 400 if org_id is invalid UUID format (handled by FastAPI)
|
||||
HTTPException: 403 if user lacks permission for LLM settings
|
||||
HTTPException: 401 if user is not authenticated
|
||||
HTTPException: 403 if user is not an admin or owner of the organization
|
||||
HTTPException: 404 if organization not found
|
||||
HTTPException: 422 if validation errors occur (handled by FastAPI)
|
||||
HTTPException: 500 if update fails
|
||||
|
||||
491
enterprise/tests/unit/test_authorization.py
Normal file
491
enterprise/tests/unit/test_authorization.py
Normal file
@@ -0,0 +1,491 @@
|
||||
"""
|
||||
Unit tests for role-based authorization (authorization.py).
|
||||
|
||||
Tests the FastAPI dependencies that validate user roles within organizations.
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
from server.auth.authorization import (
|
||||
ROLE_HIERARCHY,
|
||||
OrgRole,
|
||||
get_user_org_role,
|
||||
has_required_role,
|
||||
require_org_admin,
|
||||
require_org_owner,
|
||||
require_org_role,
|
||||
require_org_user,
|
||||
)
|
||||
|
||||
# =============================================================================
|
||||
# Tests for OrgRole enum
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestOrgRole:
|
||||
"""Tests for OrgRole enum."""
|
||||
|
||||
def test_org_role_values(self):
|
||||
"""
|
||||
GIVEN: OrgRole enum
|
||||
WHEN: Accessing role values
|
||||
THEN: All expected roles exist with correct string values
|
||||
"""
|
||||
assert OrgRole.OWNER.value == 'owner'
|
||||
assert OrgRole.ADMIN.value == 'admin'
|
||||
assert OrgRole.USER.value == 'user'
|
||||
|
||||
def test_org_role_from_string(self):
|
||||
"""
|
||||
GIVEN: Valid role string
|
||||
WHEN: Creating OrgRole from string
|
||||
THEN: Correct enum value is returned
|
||||
"""
|
||||
assert OrgRole('owner') == OrgRole.OWNER
|
||||
assert OrgRole('admin') == OrgRole.ADMIN
|
||||
assert OrgRole('user') == OrgRole.USER
|
||||
|
||||
def test_org_role_invalid_string(self):
|
||||
"""
|
||||
GIVEN: Invalid role string
|
||||
WHEN: Creating OrgRole from string
|
||||
THEN: ValueError is raised
|
||||
"""
|
||||
with pytest.raises(ValueError):
|
||||
OrgRole('invalid_role')
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for role hierarchy
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestRoleHierarchy:
|
||||
"""Tests for role hierarchy constants."""
|
||||
|
||||
def test_owner_highest_rank(self):
|
||||
"""
|
||||
GIVEN: Role hierarchy
|
||||
WHEN: Comparing role ranks
|
||||
THEN: Owner has highest rank
|
||||
"""
|
||||
assert ROLE_HIERARCHY[OrgRole.OWNER] > ROLE_HIERARCHY[OrgRole.ADMIN]
|
||||
assert ROLE_HIERARCHY[OrgRole.OWNER] > ROLE_HIERARCHY[OrgRole.USER]
|
||||
|
||||
def test_admin_middle_rank(self):
|
||||
"""
|
||||
GIVEN: Role hierarchy
|
||||
WHEN: Comparing role ranks
|
||||
THEN: Admin is between owner and user
|
||||
"""
|
||||
assert ROLE_HIERARCHY[OrgRole.ADMIN] > ROLE_HIERARCHY[OrgRole.USER]
|
||||
assert ROLE_HIERARCHY[OrgRole.ADMIN] < ROLE_HIERARCHY[OrgRole.OWNER]
|
||||
|
||||
def test_user_lowest_rank(self):
|
||||
"""
|
||||
GIVEN: Role hierarchy
|
||||
WHEN: Comparing role ranks
|
||||
THEN: User has lowest rank
|
||||
"""
|
||||
assert ROLE_HIERARCHY[OrgRole.USER] < ROLE_HIERARCHY[OrgRole.ADMIN]
|
||||
assert ROLE_HIERARCHY[OrgRole.USER] < ROLE_HIERARCHY[OrgRole.OWNER]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for has_required_role function
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestHasRequiredRole:
|
||||
"""Tests for has_required_role function."""
|
||||
|
||||
def test_owner_has_owner_role(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: Checking for owner requirement
|
||||
THEN: Returns True
|
||||
"""
|
||||
assert has_required_role('owner', OrgRole.OWNER) is True
|
||||
|
||||
def test_owner_has_admin_role(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: Checking for admin requirement
|
||||
THEN: Returns True (owner > admin)
|
||||
"""
|
||||
assert has_required_role('owner', OrgRole.ADMIN) is True
|
||||
|
||||
def test_owner_has_user_role(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: Checking for user requirement
|
||||
THEN: Returns True (owner > user)
|
||||
"""
|
||||
assert has_required_role('owner', OrgRole.USER) is True
|
||||
|
||||
def test_admin_has_admin_role(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: Checking for admin requirement
|
||||
THEN: Returns True
|
||||
"""
|
||||
assert has_required_role('admin', OrgRole.ADMIN) is True
|
||||
|
||||
def test_admin_has_user_role(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: Checking for user requirement
|
||||
THEN: Returns True (admin > user)
|
||||
"""
|
||||
assert has_required_role('admin', OrgRole.USER) is True
|
||||
|
||||
def test_admin_lacks_owner_role(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: Checking for owner requirement
|
||||
THEN: Returns False (admin < owner)
|
||||
"""
|
||||
assert has_required_role('admin', OrgRole.OWNER) is False
|
||||
|
||||
def test_user_has_user_role(self):
|
||||
"""
|
||||
GIVEN: User with user role
|
||||
WHEN: Checking for user requirement
|
||||
THEN: Returns True
|
||||
"""
|
||||
assert has_required_role('user', OrgRole.USER) is True
|
||||
|
||||
def test_user_lacks_admin_role(self):
|
||||
"""
|
||||
GIVEN: User with user role
|
||||
WHEN: Checking for admin requirement
|
||||
THEN: Returns False (user < admin)
|
||||
"""
|
||||
assert has_required_role('user', OrgRole.ADMIN) is False
|
||||
|
||||
def test_user_lacks_owner_role(self):
|
||||
"""
|
||||
GIVEN: User with user role
|
||||
WHEN: Checking for owner requirement
|
||||
THEN: Returns False (user < owner)
|
||||
"""
|
||||
assert has_required_role('user', OrgRole.OWNER) is False
|
||||
|
||||
def test_invalid_role_returns_false(self):
|
||||
"""
|
||||
GIVEN: Invalid role string
|
||||
WHEN: Checking for any requirement
|
||||
THEN: Returns False
|
||||
"""
|
||||
assert has_required_role('invalid_role', OrgRole.USER) is False
|
||||
assert has_required_role('invalid_role', OrgRole.ADMIN) is False
|
||||
assert has_required_role('invalid_role', OrgRole.OWNER) is False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for get_user_org_role function
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestGetUserOrgRole:
|
||||
"""Tests for get_user_org_role function."""
|
||||
|
||||
def test_returns_role_when_member_exists(self):
|
||||
"""
|
||||
GIVEN: User is a member of organization with role
|
||||
WHEN: get_user_org_role is called
|
||||
THEN: Role name is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_org_member = MagicMock()
|
||||
mock_org_member.role_id = 1
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with (
|
||||
patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member',
|
||||
return_value=mock_org_member,
|
||||
),
|
||||
patch(
|
||||
'server.auth.authorization.RoleStore.get_role_by_id',
|
||||
return_value=mock_role,
|
||||
),
|
||||
):
|
||||
result = get_user_org_role(user_id, org_id)
|
||||
assert result == 'admin'
|
||||
|
||||
def test_returns_none_when_not_member(self):
|
||||
"""
|
||||
GIVEN: User is not a member of organization
|
||||
WHEN: get_user_org_role is called
|
||||
THEN: None is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member',
|
||||
return_value=None,
|
||||
):
|
||||
result = get_user_org_role(user_id, org_id)
|
||||
assert result is None
|
||||
|
||||
def test_returns_none_when_role_not_found(self):
|
||||
"""
|
||||
GIVEN: User is member but role not found
|
||||
WHEN: get_user_org_role is called
|
||||
THEN: None is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_org_member = MagicMock()
|
||||
mock_org_member.role_id = 999 # Non-existent role
|
||||
|
||||
with (
|
||||
patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member',
|
||||
return_value=mock_org_member,
|
||||
),
|
||||
patch(
|
||||
'server.auth.authorization.RoleStore.get_role_by_id',
|
||||
return_value=None,
|
||||
),
|
||||
):
|
||||
result = get_user_org_role(user_id, org_id)
|
||||
assert result is None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for require_org_role dependency
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestRequireOrgRole:
|
||||
"""Tests for require_org_role dependency factory."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_user_id_when_authorized(self):
|
||||
"""
|
||||
GIVEN: User with sufficient role
|
||||
WHEN: Role checker is called
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='admin',
|
||||
):
|
||||
role_checker = require_org_role(OrgRole.USER)
|
||||
result = await role_checker(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_401_when_not_authenticated(self):
|
||||
"""
|
||||
GIVEN: No user ID (not authenticated)
|
||||
WHEN: Role checker is called
|
||||
THEN: 401 Unauthorized is raised
|
||||
"""
|
||||
org_id = uuid4()
|
||||
|
||||
role_checker = require_org_role(OrgRole.USER)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await role_checker(org_id=org_id, user_id=None)
|
||||
|
||||
assert exc_info.value.status_code == 401
|
||||
assert 'not authenticated' in exc_info.value.detail.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_403_when_not_member(self):
|
||||
"""
|
||||
GIVEN: User is not a member of organization
|
||||
WHEN: Role checker is called
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value=None,
|
||||
):
|
||||
role_checker = require_org_role(OrgRole.USER)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await role_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert 'not a member' in exc_info.value.detail.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_403_when_insufficient_role(self):
|
||||
"""
|
||||
GIVEN: User with insufficient role
|
||||
WHEN: Role checker is called
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='user',
|
||||
):
|
||||
role_checker = require_org_role(OrgRole.ADMIN)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await role_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert 'admin' in exc_info.value.detail.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_owner_satisfies_admin_requirement(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: Admin role is required
|
||||
THEN: User ID is returned (owner > admin)
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='owner',
|
||||
):
|
||||
role_checker = require_org_role(OrgRole.ADMIN)
|
||||
result = await role_checker(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_logs_warning_on_insufficient_role(self):
|
||||
"""
|
||||
GIVEN: User with insufficient role
|
||||
WHEN: Role checker is called
|
||||
THEN: Warning is logged with details
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with (
|
||||
patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='user',
|
||||
),
|
||||
patch('server.auth.authorization.logger') as mock_logger,
|
||||
):
|
||||
role_checker = require_org_role(OrgRole.OWNER)
|
||||
with pytest.raises(HTTPException):
|
||||
await role_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
mock_logger.warning.assert_called()
|
||||
call_args = mock_logger.warning.call_args
|
||||
assert call_args[1]['extra']['user_id'] == user_id
|
||||
assert call_args[1]['extra']['user_role'] == 'user'
|
||||
assert call_args[1]['extra']['required_role'] == 'owner'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for convenience dependencies
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestConvenienceDependencies:
|
||||
"""Tests for pre-configured convenience dependencies."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_org_user_allows_user(self):
|
||||
"""
|
||||
GIVEN: User with user role
|
||||
WHEN: require_org_user is used
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='user',
|
||||
):
|
||||
result = await require_org_user(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_org_admin_allows_admin(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: require_org_admin is used
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='admin',
|
||||
):
|
||||
result = await require_org_admin(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_org_admin_rejects_user(self):
|
||||
"""
|
||||
GIVEN: User with user role
|
||||
WHEN: require_org_admin is used
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='user',
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await require_org_admin(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_org_owner_allows_owner(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: require_org_owner is used
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='owner',
|
||||
):
|
||||
result = await require_org_owner(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_org_owner_rejects_admin(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: require_org_owner is used
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role',
|
||||
return_value='admin',
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await require_org_owner(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
16
openhands/server/auth/__init__.py
Normal file
16
openhands/server/auth/__init__.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""
|
||||
Authorization module for OpenHands.
|
||||
|
||||
In OSS mode, authorization is a no-op - all checks pass.
|
||||
In SAAS mode, the enterprise implementation performs real authorization checks.
|
||||
"""
|
||||
|
||||
from openhands.server.auth.authorization import (
|
||||
Permission,
|
||||
require_permission,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
'Permission',
|
||||
'require_permission',
|
||||
]
|
||||
50
openhands/server/auth/authorization.py
Normal file
50
openhands/server/auth/authorization.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""
|
||||
Permission-based authorization for API endpoints (OSS/OpenHands mode).
|
||||
|
||||
In OSS mode, authorization is a no-op - all checks pass.
|
||||
For SAAS mode with real authorization checks, see the enterprise implementation.
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import Depends
|
||||
|
||||
from openhands.server.user_auth import get_user_id
|
||||
|
||||
|
||||
class Permission(str, Enum):
|
||||
"""Permissions that can be checked in authorization."""
|
||||
|
||||
MANAGE_SECRETS = 'manage_secrets'
|
||||
MANAGE_MCP = 'manage_mcp'
|
||||
MANAGE_INTEGRATIONS = 'manage_integrations'
|
||||
MANAGE_APPLICATION_SETTINGS = 'manage_application_settings'
|
||||
MANAGE_API_KEYS = 'manage_api_keys'
|
||||
VIEW_LLM_SETTINGS = 'view_llm_settings'
|
||||
EDIT_LLM_SETTINGS = 'edit_llm_settings'
|
||||
VIEW_BILLING = 'view_billing'
|
||||
ADD_CREDITS = 'add_credits'
|
||||
INVITE_USER_TO_ORGANIZATION = 'invite_user_to_organization'
|
||||
CHANGE_USER_ROLE_MEMBER = 'change_user_role:member'
|
||||
CHANGE_USER_ROLE_ADMIN = 'change_user_role:admin'
|
||||
CHANGE_USER_ROLE_OWNER = 'change_user_role:owner'
|
||||
CHANGE_ORGANIZATION_NAME = 'change_organization_name'
|
||||
DELETE_ORGANIZATION = 'delete_organization'
|
||||
|
||||
|
||||
def require_permission(permission: Permission):
|
||||
"""
|
||||
No-op authorization dependency for OSS mode.
|
||||
|
||||
Returns the user_id without performing any permission checks.
|
||||
In SAAS mode, the enterprise implementation overrides this.
|
||||
"""
|
||||
|
||||
async def permission_checker(
|
||||
org_id: UUID,
|
||||
user_id: str | None = Depends(get_user_id),
|
||||
) -> str | None:
|
||||
return user_id
|
||||
|
||||
return permission_checker
|
||||
Reference in New Issue
Block a user