Compare commits

...

8 Commits

Author SHA1 Message Date
openhands
9c061d178b refactor(auth): simplify OSS authorization to bare minimum
OSS module (openhands/server/auth/):
- Permission enum only
- Single no-op require_permission() that returns user_id
- No roles, no ROLE_PERMISSIONS, no helper methods

Enterprise module (enterprise/server/auth/authorization.py):
- Imports Permission from OSS module
- Defines RoleName enum and ROLE_PERMISSIONS locally
- Implements all authorization logic: require_permission(),
  require_org_role(), has_permission(), get_role_permissions(), etc.

Co-authored-by: openhands <openhands@all-hands.dev>
2026-02-17 01:15:08 +00:00
openhands
cc7e002ffe feat(auth): add OSS no-op authorization with SAAS override
- Create openhands/server/auth/ module with no-op authorization
  - Permission and RoleName enums defined in OSS module
  - ROLE_PERMISSIONS mapping defined in OSS module
  - No-op require_permission() and require_org_role() dependencies
  - All authorization checks pass in OSS mode

- Update enterprise authorization to import from OSS module
  - Import Permission, RoleName, ROLE_PERMISSIONS from OSS
  - Import get_role_permissions() helper from OSS
  - Override require_permission() and require_org_role() with real checks
  - Enterprise performs actual permission validation in SAAS mode

This allows the same authorization decorators to be used in both modes:
- OSS mode: All checks pass (no-op)
- SAAS mode: Real permission checks are enforced

Co-authored-by: openhands <openhands@all-hands.dev>
2026-02-16 23:44:39 +00:00
openhands
d058b64597 feat(auth): implement permission-based authorization
- Add Permission enum with 15 granular permissions
- Add RoleName enum for owner, admin, member roles
- Add ROLE_PERMISSIONS mapping with correct permission sets per role
- Add require_permission() FastAPI dependency for permission-based access control
- Add helper functions: get_role_permissions(), has_permission()

Permission mapping:
- Member: manage_secrets, manage_mcp, manage_integrations,
  manage_application_settings, manage_api_keys, view_llm_settings
- Admin: All member permissions + edit_llm_settings, view_billing,
  add_credits, invite_user_to_organization, change_user_role:member,
  change_user_role:admin
- Owner: All admin permissions + change_user_role:owner,
  change_organization_name, delete_organization

Co-authored-by: openhands <openhands@all-hands.dev>
2026-02-16 23:09:16 +00:00
Chuck Butkus
954c922a98 Merge branch 'main' into feature/role-based-authorization 2026-02-16 14:23:31 -05:00
openhands
c9cf142949 refactor: use Role class for role hierarchy in authorization
- Remove hardcoded OrgRole enum and ROLE_HIERARCHY dictionary
- Import Role class from storage.role module
- Update get_user_org_role() to return Role object instead of string
- Update has_required_role() to compare roles using rank field
- Lower rank = higher position in hierarchy (e.g., rank 1 > rank 2 > rank 3)
- Update require_org_role() to accept role name string and fetch Role from database
- Add error handling for missing role in database

Co-authored-by: openhands <openhands@all-hands.dev>
2026-02-04 04:18:15 +00:00
Chuck Butkus
7e0b6a39c1 Merge branch 'main' into feature/role-based-authorization 2026-02-03 22:43:19 -05:00
Chuck Butkus
268fa43ba9 Lint fix 2026-02-03 02:05:40 -05:00
openhands
ff14104db4 feat: Add role-based authorization for org-scoped endpoints
Implement role-based authorization using FastAPI dependencies that check
user roles (owner, admin, user) within organizations.

Changes:
- Add enterprise/server/auth/authorization.py with:
  - OrgRole enum for role types
  - Role hierarchy (owner > admin > user)
  - get_user_org_role() to retrieve user's role in an org
  - has_required_role() to check role hierarchy
  - require_org_role() dependency factory
  - Convenience dependencies: require_org_user, require_org_admin, require_org_owner

- Update enterprise/server/routes/orgs.py:
  - GET /{org_id}: require_org_user (any member can view)
  - PATCH /{org_id}: require_org_admin (admin or owner can update)
  - DELETE /{org_id}: require_org_owner (only owner can delete)

- Add comprehensive unit tests for authorization module

Co-authored-by: openhands <openhands@all-hands.dev>
2026-02-03 06:59:35 +00:00
5 changed files with 899 additions and 12 deletions

View File

@@ -0,0 +1,320 @@
"""
Permission-based authorization dependencies for API endpoints (SAAS mode).
This module provides FastAPI dependencies for checking user permissions
within organizations. It uses a permission-based authorization model where
roles (owner, admin, member) are mapped to specific permissions.
This is the SAAS/enterprise implementation that performs real authorization
checks against the database.
Usage:
from server.auth.authorization import (
Permission,
require_permission,
require_org_role,
require_org_user,
require_org_admin,
require_org_owner,
)
@router.get('/{org_id}/settings')
async def get_settings(
org_id: UUID,
user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)),
):
# Only users with VIEW_LLM_SETTINGS permission can access
...
"""
from enum import Enum
from uuid import UUID
from fastapi import Depends, HTTPException, status
from storage.org_member_store import OrgMemberStore
from storage.role import Role
from storage.role_store import RoleStore
from openhands.core.logger import openhands_logger as logger
from openhands.server.auth import Permission
from openhands.server.user_auth import get_user_id
class RoleName(str, Enum):
"""Role names used in the system."""
OWNER = 'owner'
ADMIN = 'admin'
MEMBER = 'member'
# Permission mappings for each role
ROLE_PERMISSIONS: dict[RoleName, frozenset[Permission]] = {
RoleName.OWNER: frozenset(
[
# Settings (Full access)
Permission.MANAGE_SECRETS,
Permission.MANAGE_MCP,
Permission.MANAGE_INTEGRATIONS,
Permission.MANAGE_APPLICATION_SETTINGS,
Permission.MANAGE_API_KEYS,
Permission.VIEW_LLM_SETTINGS,
Permission.EDIT_LLM_SETTINGS,
Permission.VIEW_BILLING,
Permission.ADD_CREDITS,
# Organization Members
Permission.INVITE_USER_TO_ORGANIZATION,
Permission.CHANGE_USER_ROLE_MEMBER,
Permission.CHANGE_USER_ROLE_ADMIN,
Permission.CHANGE_USER_ROLE_OWNER,
# Organization Management (Owner only)
Permission.CHANGE_ORGANIZATION_NAME,
Permission.DELETE_ORGANIZATION,
]
),
RoleName.ADMIN: frozenset(
[
# Settings (Full access)
Permission.MANAGE_SECRETS,
Permission.MANAGE_MCP,
Permission.MANAGE_INTEGRATIONS,
Permission.MANAGE_APPLICATION_SETTINGS,
Permission.MANAGE_API_KEYS,
Permission.VIEW_LLM_SETTINGS,
Permission.EDIT_LLM_SETTINGS,
Permission.VIEW_BILLING,
Permission.ADD_CREDITS,
# Organization Members
Permission.INVITE_USER_TO_ORGANIZATION,
Permission.CHANGE_USER_ROLE_MEMBER,
Permission.CHANGE_USER_ROLE_ADMIN,
]
),
RoleName.MEMBER: frozenset(
[
# Settings (Full access)
Permission.MANAGE_SECRETS,
Permission.MANAGE_MCP,
Permission.MANAGE_INTEGRATIONS,
Permission.MANAGE_APPLICATION_SETTINGS,
Permission.MANAGE_API_KEYS,
# LLM Settings (View only)
Permission.VIEW_LLM_SETTINGS,
]
),
}
def get_role_permissions(role_name: str) -> frozenset[Permission]:
"""Get the permissions for a role."""
try:
role_enum = RoleName(role_name)
return ROLE_PERMISSIONS.get(role_enum, frozenset())
except ValueError:
return frozenset()
def get_user_org_role(user_id: str, org_id: UUID) -> Role | None:
"""
Get the user's role in an organization.
Args:
user_id: User ID (string that will be converted to UUID)
org_id: Organization ID
Returns:
Role object if user is a member, None otherwise
"""
from uuid import UUID as parse_uuid
org_member = OrgMemberStore.get_org_member(org_id, parse_uuid(user_id))
if not org_member:
return None
return RoleStore.get_role_by_id(org_member.role_id)
def has_permission(user_role: Role, permission: Permission) -> bool:
"""
Check if a role has a specific permission.
Args:
user_role: User's Role object
permission: Permission to check
Returns:
True if the role has the permission
"""
permissions = get_role_permissions(user_role.name)
return permission in permissions
def has_required_role(user_role: Role, required_role: Role) -> bool:
"""
Check if user's role meets or exceeds the required role.
Uses role hierarchy based on rank where lower rank = higher position
(e.g., rank 1 owner > rank 2 admin > rank 3 user).
Args:
user_role: User's actual Role object
required_role: Minimum required Role object
Returns:
True if user has sufficient permissions
"""
return user_role.rank <= required_role.rank
def require_permission(permission: Permission):
"""
Factory function that creates a dependency to require a specific permission.
This creates a FastAPI dependency that:
1. Extracts org_id from the path parameter
2. Gets the authenticated user_id
3. Checks if the user has the required permission in the organization
4. Returns the user_id if authorized, raises HTTPException otherwise
Usage:
@router.get('/{org_id}/settings')
async def get_settings(
org_id: UUID,
user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)),
):
...
Args:
permission: The permission required to access the endpoint
Returns:
Dependency function that validates permission and returns user_id
"""
async def permission_checker(
org_id: UUID,
user_id: str | None = Depends(get_user_id),
) -> str:
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='User not authenticated',
)
user_role = get_user_org_role(user_id, org_id)
if not user_role:
logger.warning(
'User not a member of organization',
extra={'user_id': user_id, 'org_id': str(org_id)},
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='User is not a member of this organization',
)
if not has_permission(user_role, permission):
logger.warning(
'Insufficient permissions',
extra={
'user_id': user_id,
'org_id': str(org_id),
'user_role': user_role.name,
'required_permission': permission.value,
},
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f'Requires {permission.value} permission',
)
return user_id
return permission_checker
def require_org_role(required_role_name: str):
"""
Factory function that creates a dependency to require a minimum org role.
This creates a FastAPI dependency that:
1. Extracts org_id from the path parameter
2. Gets the authenticated user_id
3. Checks if the user has the required role in the organization
4. Returns the user_id if authorized, raises HTTPException otherwise
Role hierarchy is based on rank from the Role class, where
lower rank = higher position (e.g., rank 1 > rank 2 > rank 3).
Usage:
@router.get('/{org_id}/resource')
async def get_resource(
org_id: UUID,
user_id: str = Depends(require_org_role('user')),
):
...
Args:
required_role_name: Name of the minimum required role to access the endpoint
Returns:
Dependency function that validates role and returns user_id
"""
async def role_checker(
org_id: UUID,
user_id: str | None = Depends(get_user_id),
) -> str:
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='User not authenticated',
)
user_role = get_user_org_role(user_id, org_id)
if not user_role:
logger.warning(
'User not a member of organization',
extra={'user_id': user_id, 'org_id': str(org_id)},
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='User is not a member of this organization',
)
required_role = RoleStore.get_role_by_name(required_role_name)
if not required_role:
logger.error(
'Required role not found in database',
extra={'required_role': required_role_name},
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Role configuration error',
)
if not has_required_role(user_role, required_role):
logger.warning(
'Insufficient role permissions',
extra={
'user_id': user_id,
'org_id': str(org_id),
'user_role': user_role.name,
'required_role': required_role_name,
},
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f'Requires {required_role_name} role or higher',
)
return user_id
return role_checker
# Convenience dependencies for common role checks
require_org_user = require_org_role('user')
require_org_admin = require_org_role('admin')
require_org_owner = require_org_role('owner')

View File

@@ -2,6 +2,11 @@ from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from server.auth.authorization import (
require_org_admin,
require_org_owner,
require_org_user,
)
from server.email_validation import get_admin_user_id
from server.routes.org_models import (
CannotModifySelfError,
@@ -189,23 +194,26 @@ async def create_org(
@org_router.get('/{org_id}', response_model=OrgResponse, status_code=status.HTTP_200_OK)
async def get_org(
org_id: UUID,
user_id: str = Depends(get_user_id),
user_id: str = Depends(require_org_user),
) -> OrgResponse:
"""Get organization details by ID.
This endpoint allows authenticated users who are members of an organization
to retrieve its details. Only members of the organization can access this endpoint.
Requires user, admin, or owner role.
Args:
org_id: Organization ID (UUID)
user_id: Authenticated user ID (injected by dependency)
user_id: Authenticated user ID (injected by dependency, requires org membership)
Returns:
OrgResponse: The organization details
Raises:
HTTPException: 401 if user is not authenticated
HTTPException: 403 if user is not a member of the organization
HTTPException: 422 if org_id is not a valid UUID (handled by FastAPI)
HTTPException: 404 if organization not found or user is not a member
HTTPException: 404 if organization not found
HTTPException: 500 if retrieval fails
"""
logger.info(
@@ -305,23 +313,25 @@ async def get_me(
@org_router.delete('/{org_id}', status_code=status.HTTP_200_OK)
async def delete_org(
org_id: UUID,
user_id: str = Depends(get_user_id),
user_id: str = Depends(require_org_owner),
) -> dict:
"""Delete an organization.
This endpoint allows authenticated organization owners to delete their organization.
All associated data including organization members, conversations, billing data,
and external LiteLLM team resources will be permanently removed.
Requires owner role.
Args:
org_id: Organization ID to delete
user_id: Authenticated user ID (injected by dependency)
user_id: Authenticated user ID (injected by dependency, requires owner role)
Returns:
dict: Confirmation message with deleted organization details
Raises:
HTTPException: 403 if user is not the organization owner
HTTPException: 401 if user is not authenticated
HTTPException: 403 if user is not an owner of the organization
HTTPException: 404 if organization not found
HTTPException: 500 if deletion fails
"""
@@ -414,24 +424,24 @@ async def delete_org(
async def update_org(
org_id: UUID,
update_data: OrgUpdate,
user_id: str = Depends(get_user_id),
user_id: str = Depends(require_org_admin),
) -> OrgResponse:
"""Update an existing organization.
This endpoint allows authenticated users to update organization settings.
LLM-related settings require admin or owner role in the organization.
This endpoint allows authenticated admins and owners to update organization settings.
Requires admin or owner role in the organization.
Args:
org_id: Organization ID to update (UUID validated by FastAPI)
update_data: Organization update data
user_id: Authenticated user ID (injected by dependency)
user_id: Authenticated user ID (injected by dependency, requires admin role)
Returns:
OrgResponse: The updated organization details
Raises:
HTTPException: 400 if org_id is invalid UUID format (handled by FastAPI)
HTTPException: 403 if user lacks permission for LLM settings
HTTPException: 401 if user is not authenticated
HTTPException: 403 if user is not an admin or owner of the organization
HTTPException: 404 if organization not found
HTTPException: 422 if validation errors occur (handled by FastAPI)
HTTPException: 500 if update fails

View File

@@ -0,0 +1,491 @@
"""
Unit tests for role-based authorization (authorization.py).
Tests the FastAPI dependencies that validate user roles within organizations.
"""
from unittest.mock import MagicMock, patch
from uuid import uuid4
import pytest
from fastapi import HTTPException
from server.auth.authorization import (
ROLE_HIERARCHY,
OrgRole,
get_user_org_role,
has_required_role,
require_org_admin,
require_org_owner,
require_org_role,
require_org_user,
)
# =============================================================================
# Tests for OrgRole enum
# =============================================================================
class TestOrgRole:
"""Tests for OrgRole enum."""
def test_org_role_values(self):
"""
GIVEN: OrgRole enum
WHEN: Accessing role values
THEN: All expected roles exist with correct string values
"""
assert OrgRole.OWNER.value == 'owner'
assert OrgRole.ADMIN.value == 'admin'
assert OrgRole.USER.value == 'user'
def test_org_role_from_string(self):
"""
GIVEN: Valid role string
WHEN: Creating OrgRole from string
THEN: Correct enum value is returned
"""
assert OrgRole('owner') == OrgRole.OWNER
assert OrgRole('admin') == OrgRole.ADMIN
assert OrgRole('user') == OrgRole.USER
def test_org_role_invalid_string(self):
"""
GIVEN: Invalid role string
WHEN: Creating OrgRole from string
THEN: ValueError is raised
"""
with pytest.raises(ValueError):
OrgRole('invalid_role')
# =============================================================================
# Tests for role hierarchy
# =============================================================================
class TestRoleHierarchy:
"""Tests for role hierarchy constants."""
def test_owner_highest_rank(self):
"""
GIVEN: Role hierarchy
WHEN: Comparing role ranks
THEN: Owner has highest rank
"""
assert ROLE_HIERARCHY[OrgRole.OWNER] > ROLE_HIERARCHY[OrgRole.ADMIN]
assert ROLE_HIERARCHY[OrgRole.OWNER] > ROLE_HIERARCHY[OrgRole.USER]
def test_admin_middle_rank(self):
"""
GIVEN: Role hierarchy
WHEN: Comparing role ranks
THEN: Admin is between owner and user
"""
assert ROLE_HIERARCHY[OrgRole.ADMIN] > ROLE_HIERARCHY[OrgRole.USER]
assert ROLE_HIERARCHY[OrgRole.ADMIN] < ROLE_HIERARCHY[OrgRole.OWNER]
def test_user_lowest_rank(self):
"""
GIVEN: Role hierarchy
WHEN: Comparing role ranks
THEN: User has lowest rank
"""
assert ROLE_HIERARCHY[OrgRole.USER] < ROLE_HIERARCHY[OrgRole.ADMIN]
assert ROLE_HIERARCHY[OrgRole.USER] < ROLE_HIERARCHY[OrgRole.OWNER]
# =============================================================================
# Tests for has_required_role function
# =============================================================================
class TestHasRequiredRole:
"""Tests for has_required_role function."""
def test_owner_has_owner_role(self):
"""
GIVEN: User with owner role
WHEN: Checking for owner requirement
THEN: Returns True
"""
assert has_required_role('owner', OrgRole.OWNER) is True
def test_owner_has_admin_role(self):
"""
GIVEN: User with owner role
WHEN: Checking for admin requirement
THEN: Returns True (owner > admin)
"""
assert has_required_role('owner', OrgRole.ADMIN) is True
def test_owner_has_user_role(self):
"""
GIVEN: User with owner role
WHEN: Checking for user requirement
THEN: Returns True (owner > user)
"""
assert has_required_role('owner', OrgRole.USER) is True
def test_admin_has_admin_role(self):
"""
GIVEN: User with admin role
WHEN: Checking for admin requirement
THEN: Returns True
"""
assert has_required_role('admin', OrgRole.ADMIN) is True
def test_admin_has_user_role(self):
"""
GIVEN: User with admin role
WHEN: Checking for user requirement
THEN: Returns True (admin > user)
"""
assert has_required_role('admin', OrgRole.USER) is True
def test_admin_lacks_owner_role(self):
"""
GIVEN: User with admin role
WHEN: Checking for owner requirement
THEN: Returns False (admin < owner)
"""
assert has_required_role('admin', OrgRole.OWNER) is False
def test_user_has_user_role(self):
"""
GIVEN: User with user role
WHEN: Checking for user requirement
THEN: Returns True
"""
assert has_required_role('user', OrgRole.USER) is True
def test_user_lacks_admin_role(self):
"""
GIVEN: User with user role
WHEN: Checking for admin requirement
THEN: Returns False (user < admin)
"""
assert has_required_role('user', OrgRole.ADMIN) is False
def test_user_lacks_owner_role(self):
"""
GIVEN: User with user role
WHEN: Checking for owner requirement
THEN: Returns False (user < owner)
"""
assert has_required_role('user', OrgRole.OWNER) is False
def test_invalid_role_returns_false(self):
"""
GIVEN: Invalid role string
WHEN: Checking for any requirement
THEN: Returns False
"""
assert has_required_role('invalid_role', OrgRole.USER) is False
assert has_required_role('invalid_role', OrgRole.ADMIN) is False
assert has_required_role('invalid_role', OrgRole.OWNER) is False
# =============================================================================
# Tests for get_user_org_role function
# =============================================================================
class TestGetUserOrgRole:
"""Tests for get_user_org_role function."""
def test_returns_role_when_member_exists(self):
"""
GIVEN: User is a member of organization with role
WHEN: get_user_org_role is called
THEN: Role name is returned
"""
user_id = str(uuid4())
org_id = uuid4()
mock_org_member = MagicMock()
mock_org_member.role_id = 1
mock_role = MagicMock()
mock_role.name = 'admin'
with (
patch(
'server.auth.authorization.OrgMemberStore.get_org_member',
return_value=mock_org_member,
),
patch(
'server.auth.authorization.RoleStore.get_role_by_id',
return_value=mock_role,
),
):
result = get_user_org_role(user_id, org_id)
assert result == 'admin'
def test_returns_none_when_not_member(self):
"""
GIVEN: User is not a member of organization
WHEN: get_user_org_role is called
THEN: None is returned
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.OrgMemberStore.get_org_member',
return_value=None,
):
result = get_user_org_role(user_id, org_id)
assert result is None
def test_returns_none_when_role_not_found(self):
"""
GIVEN: User is member but role not found
WHEN: get_user_org_role is called
THEN: None is returned
"""
user_id = str(uuid4())
org_id = uuid4()
mock_org_member = MagicMock()
mock_org_member.role_id = 999 # Non-existent role
with (
patch(
'server.auth.authorization.OrgMemberStore.get_org_member',
return_value=mock_org_member,
),
patch(
'server.auth.authorization.RoleStore.get_role_by_id',
return_value=None,
),
):
result = get_user_org_role(user_id, org_id)
assert result is None
# =============================================================================
# Tests for require_org_role dependency
# =============================================================================
class TestRequireOrgRole:
"""Tests for require_org_role dependency factory."""
@pytest.mark.asyncio
async def test_returns_user_id_when_authorized(self):
"""
GIVEN: User with sufficient role
WHEN: Role checker is called
THEN: User ID is returned
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='admin',
):
role_checker = require_org_role(OrgRole.USER)
result = await role_checker(org_id=org_id, user_id=user_id)
assert result == user_id
@pytest.mark.asyncio
async def test_raises_401_when_not_authenticated(self):
"""
GIVEN: No user ID (not authenticated)
WHEN: Role checker is called
THEN: 401 Unauthorized is raised
"""
org_id = uuid4()
role_checker = require_org_role(OrgRole.USER)
with pytest.raises(HTTPException) as exc_info:
await role_checker(org_id=org_id, user_id=None)
assert exc_info.value.status_code == 401
assert 'not authenticated' in exc_info.value.detail.lower()
@pytest.mark.asyncio
async def test_raises_403_when_not_member(self):
"""
GIVEN: User is not a member of organization
WHEN: Role checker is called
THEN: 403 Forbidden is raised
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value=None,
):
role_checker = require_org_role(OrgRole.USER)
with pytest.raises(HTTPException) as exc_info:
await role_checker(org_id=org_id, user_id=user_id)
assert exc_info.value.status_code == 403
assert 'not a member' in exc_info.value.detail.lower()
@pytest.mark.asyncio
async def test_raises_403_when_insufficient_role(self):
"""
GIVEN: User with insufficient role
WHEN: Role checker is called
THEN: 403 Forbidden is raised
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='user',
):
role_checker = require_org_role(OrgRole.ADMIN)
with pytest.raises(HTTPException) as exc_info:
await role_checker(org_id=org_id, user_id=user_id)
assert exc_info.value.status_code == 403
assert 'admin' in exc_info.value.detail.lower()
@pytest.mark.asyncio
async def test_owner_satisfies_admin_requirement(self):
"""
GIVEN: User with owner role
WHEN: Admin role is required
THEN: User ID is returned (owner > admin)
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='owner',
):
role_checker = require_org_role(OrgRole.ADMIN)
result = await role_checker(org_id=org_id, user_id=user_id)
assert result == user_id
@pytest.mark.asyncio
async def test_logs_warning_on_insufficient_role(self):
"""
GIVEN: User with insufficient role
WHEN: Role checker is called
THEN: Warning is logged with details
"""
user_id = str(uuid4())
org_id = uuid4()
with (
patch(
'server.auth.authorization.get_user_org_role',
return_value='user',
),
patch('server.auth.authorization.logger') as mock_logger,
):
role_checker = require_org_role(OrgRole.OWNER)
with pytest.raises(HTTPException):
await role_checker(org_id=org_id, user_id=user_id)
mock_logger.warning.assert_called()
call_args = mock_logger.warning.call_args
assert call_args[1]['extra']['user_id'] == user_id
assert call_args[1]['extra']['user_role'] == 'user'
assert call_args[1]['extra']['required_role'] == 'owner'
# =============================================================================
# Tests for convenience dependencies
# =============================================================================
class TestConvenienceDependencies:
"""Tests for pre-configured convenience dependencies."""
@pytest.mark.asyncio
async def test_require_org_user_allows_user(self):
"""
GIVEN: User with user role
WHEN: require_org_user is used
THEN: User ID is returned
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='user',
):
result = await require_org_user(org_id=org_id, user_id=user_id)
assert result == user_id
@pytest.mark.asyncio
async def test_require_org_admin_allows_admin(self):
"""
GIVEN: User with admin role
WHEN: require_org_admin is used
THEN: User ID is returned
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='admin',
):
result = await require_org_admin(org_id=org_id, user_id=user_id)
assert result == user_id
@pytest.mark.asyncio
async def test_require_org_admin_rejects_user(self):
"""
GIVEN: User with user role
WHEN: require_org_admin is used
THEN: 403 Forbidden is raised
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='user',
):
with pytest.raises(HTTPException) as exc_info:
await require_org_admin(org_id=org_id, user_id=user_id)
assert exc_info.value.status_code == 403
@pytest.mark.asyncio
async def test_require_org_owner_allows_owner(self):
"""
GIVEN: User with owner role
WHEN: require_org_owner is used
THEN: User ID is returned
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='owner',
):
result = await require_org_owner(org_id=org_id, user_id=user_id)
assert result == user_id
@pytest.mark.asyncio
async def test_require_org_owner_rejects_admin(self):
"""
GIVEN: User with admin role
WHEN: require_org_owner is used
THEN: 403 Forbidden is raised
"""
user_id = str(uuid4())
org_id = uuid4()
with patch(
'server.auth.authorization.get_user_org_role',
return_value='admin',
):
with pytest.raises(HTTPException) as exc_info:
await require_org_owner(org_id=org_id, user_id=user_id)
assert exc_info.value.status_code == 403

View File

@@ -0,0 +1,16 @@
"""
Authorization module for OpenHands.
In OSS mode, authorization is a no-op - all checks pass.
In SAAS mode, the enterprise implementation performs real authorization checks.
"""
from openhands.server.auth.authorization import (
Permission,
require_permission,
)
__all__ = [
'Permission',
'require_permission',
]

View File

@@ -0,0 +1,50 @@
"""
Permission-based authorization for API endpoints (OSS/OpenHands mode).
In OSS mode, authorization is a no-op - all checks pass.
For SAAS mode with real authorization checks, see the enterprise implementation.
"""
from enum import Enum
from uuid import UUID
from fastapi import Depends
from openhands.server.user_auth import get_user_id
class Permission(str, Enum):
"""Permissions that can be checked in authorization."""
MANAGE_SECRETS = 'manage_secrets'
MANAGE_MCP = 'manage_mcp'
MANAGE_INTEGRATIONS = 'manage_integrations'
MANAGE_APPLICATION_SETTINGS = 'manage_application_settings'
MANAGE_API_KEYS = 'manage_api_keys'
VIEW_LLM_SETTINGS = 'view_llm_settings'
EDIT_LLM_SETTINGS = 'edit_llm_settings'
VIEW_BILLING = 'view_billing'
ADD_CREDITS = 'add_credits'
INVITE_USER_TO_ORGANIZATION = 'invite_user_to_organization'
CHANGE_USER_ROLE_MEMBER = 'change_user_role:member'
CHANGE_USER_ROLE_ADMIN = 'change_user_role:admin'
CHANGE_USER_ROLE_OWNER = 'change_user_role:owner'
CHANGE_ORGANIZATION_NAME = 'change_organization_name'
DELETE_ORGANIZATION = 'delete_organization'
def require_permission(permission: Permission):
"""
No-op authorization dependency for OSS mode.
Returns the user_id without performing any permission checks.
In SAAS mode, the enterprise implementation overrides this.
"""
async def permission_checker(
org_id: UUID,
user_id: str | None = Depends(get_user_id),
) -> str | None:
return user_id
return permission_checker