mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Fix mypy type errors in token_manager.py and auth_token_store.py (#13179)
Co-authored-by: openhands <openhands@all-hands.dev> Co-authored-by: OpenHands Bot <contact@all-hands.dev>
This commit is contained in:
@@ -191,12 +191,11 @@ class SaaSGitLabService(GitLabService):
|
||||
user_info = await self.token_manager.get_user_info(
|
||||
self.external_auth_token.get_secret_value()
|
||||
)
|
||||
keycloak_user_id = user_info.get('sub')
|
||||
if keycloak_user_id:
|
||||
self.external_auth_id = keycloak_user_id
|
||||
logger.info(
|
||||
f'Determined external_auth_id from Keycloak token: {self.external_auth_id}'
|
||||
)
|
||||
keycloak_user_id = user_info.sub
|
||||
self.external_auth_id = keycloak_user_id
|
||||
logger.info(
|
||||
f'Determined external_auth_id from Keycloak token: {self.external_auth_id}'
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
'Cannot store repository data: external_auth_id is not set and could not be determined from token',
|
||||
|
||||
@@ -16,6 +16,7 @@ from keycloak.exceptions import (
|
||||
KeycloakError,
|
||||
KeycloakPostError,
|
||||
)
|
||||
from pydantic import BaseModel
|
||||
from server.auth.auth_error import ExpiredError
|
||||
from server.auth.constants import (
|
||||
BITBUCKET_APP_CLIENT_ID,
|
||||
@@ -49,6 +50,30 @@ from openhands.integrations.service_types import ProviderType
|
||||
from openhands.server.types import SessionExpiredError
|
||||
from openhands.utils.http_session import httpx_verify_option
|
||||
|
||||
|
||||
class KeycloakUserInfo(BaseModel):
|
||||
"""Pydantic model for Keycloak UserInfo endpoint response.
|
||||
|
||||
Based on OIDC standard claims. 'sub' is always required per OIDC spec.
|
||||
Additional fields from Keycloak are captured via model_config extra='allow'.
|
||||
"""
|
||||
|
||||
model_config = {'extra': 'allow'}
|
||||
|
||||
sub: str
|
||||
name: str | None = None
|
||||
given_name: str | None = None
|
||||
family_name: str | None = None
|
||||
preferred_username: str | None = None
|
||||
email: str | None = None
|
||||
email_verified: bool | None = None
|
||||
picture: str | None = None
|
||||
attributes: dict[str, list[str]] | None = None
|
||||
identity_provider: str | None = None
|
||||
company: str | None = None
|
||||
roles: list[str] | None = None
|
||||
|
||||
|
||||
# HTTP timeout for external IDP calls (in seconds)
|
||||
# This prevents indefinite blocking if an IDP is slow or unresponsive
|
||||
IDP_HTTP_TIMEOUT = 15.0
|
||||
@@ -141,22 +166,22 @@ class TokenManager:
|
||||
new_keycloak_tokens['refresh_token'],
|
||||
)
|
||||
|
||||
# UserInfo from Keycloak return a dictionary with the following format:
|
||||
# {
|
||||
# 'sub': '248289761001',
|
||||
# 'name': 'Jane Doe',
|
||||
# 'given_name': 'Jane',
|
||||
# 'family_name': 'Doe',
|
||||
# 'preferred_username': 'j.doe',
|
||||
# 'email': 'janedoe@example.com',
|
||||
# 'picture': 'http://example.com/janedoe/me.jpg'
|
||||
# 'github_id': '354322532'
|
||||
# }
|
||||
async def get_user_info(self, access_token: str) -> dict:
|
||||
if not access_token:
|
||||
return {}
|
||||
async def get_user_info(self, access_token: str) -> KeycloakUserInfo:
|
||||
"""Get user info from Keycloak userinfo endpoint.
|
||||
|
||||
Args:
|
||||
access_token: A valid Keycloak access token
|
||||
|
||||
Returns:
|
||||
KeycloakUserInfo with user claims. 'sub' is always present per OIDC spec.
|
||||
|
||||
Raises:
|
||||
KeycloakAuthenticationError: If the token is invalid
|
||||
ValidationError: If the response is missing the required 'sub' field
|
||||
"""
|
||||
user_info = await get_keycloak_openid(self.external).a_userinfo(access_token)
|
||||
return user_info
|
||||
# Pydantic validation will raise ValidationError if 'sub' is missing
|
||||
return KeycloakUserInfo.model_validate(user_info)
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(2),
|
||||
@@ -270,8 +295,8 @@ class TokenManager:
|
||||
) -> str:
|
||||
# Get user info to determine user_id and idp
|
||||
user_info = await self.get_user_info(access_token=access_token)
|
||||
user_id = user_info.get('sub')
|
||||
username = user_info.get('preferred_username')
|
||||
user_id = user_info.sub
|
||||
username = user_info.preferred_username
|
||||
logger.info(f'Getting token for user {username} and IDP {idp}')
|
||||
token_store = await AuthTokenStore.get_instance(
|
||||
keycloak_user_id=user_id, idp=idp
|
||||
|
||||
@@ -189,34 +189,35 @@ async def keycloak_callback(
|
||||
|
||||
user_info = await token_manager.get_user_info(keycloak_access_token)
|
||||
logger.debug(f'user_info: {user_info}')
|
||||
if ROLE_CHECK_ENABLED and 'roles' not in user_info:
|
||||
if ROLE_CHECK_ENABLED and user_info.roles is None:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
content={'error': 'Missing required role'},
|
||||
)
|
||||
|
||||
if 'sub' not in user_info or 'preferred_username' not in user_info:
|
||||
if user_info.preferred_username is None:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
content={'error': 'Missing user ID or username in response'},
|
||||
)
|
||||
|
||||
email = user_info.get('email')
|
||||
user_id = user_info['sub']
|
||||
email = user_info.email
|
||||
user_id = user_info.sub
|
||||
user_info_dict = user_info.model_dump(exclude_none=True)
|
||||
user = await UserStore.get_user_by_id_async(user_id)
|
||||
if not user:
|
||||
user = await UserStore.create_user(user_id, user_info)
|
||||
user = await UserStore.create_user(user_id, user_info_dict)
|
||||
else:
|
||||
# Existing user — gradually backfill contact_name if it still has a username-style value
|
||||
await UserStore.backfill_contact_name(user_id, user_info)
|
||||
await UserStore.backfill_user_email(user_id, user_info)
|
||||
await UserStore.backfill_contact_name(user_id, user_info_dict)
|
||||
await UserStore.backfill_user_email(user_id, user_info_dict)
|
||||
|
||||
if not user:
|
||||
logger.error(f'Failed to authenticate user {user_info["preferred_username"]}')
|
||||
logger.error(f'Failed to authenticate user {user_info.preferred_username}')
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
content={
|
||||
'error': f'Failed to authenticate user {user_info["preferred_username"]}'
|
||||
'error': f'Failed to authenticate user {user_info.preferred_username}'
|
||||
},
|
||||
)
|
||||
|
||||
@@ -323,7 +324,7 @@ async def keycloak_callback(
|
||||
)
|
||||
|
||||
# Check email verification status
|
||||
email_verified = user_info.get('email_verified', False)
|
||||
email_verified = user_info.email_verified or False
|
||||
if not email_verified:
|
||||
# Send verification email
|
||||
# Import locally to avoid circular import with email.py
|
||||
@@ -341,7 +342,7 @@ async def keycloak_callback(
|
||||
|
||||
# default to github IDP for now.
|
||||
# TODO: remove default once Keycloak is updated universally with the new attribute.
|
||||
idp: str = user_info.get('identity_provider', ProviderType.GITHUB.value)
|
||||
idp: str = user_info.identity_provider or ProviderType.GITHUB.value
|
||||
logger.info(f'Full IDP is {idp}')
|
||||
idp_type = 'oidc'
|
||||
if ':' in idp:
|
||||
@@ -352,7 +353,7 @@ async def keycloak_callback(
|
||||
ProviderType(idp), user_id, keycloak_access_token
|
||||
)
|
||||
|
||||
username = user_info['preferred_username']
|
||||
username = user_info.preferred_username
|
||||
if user_verifier.is_active() and not user_verifier.is_user_allowed(username):
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
@@ -360,7 +361,7 @@ async def keycloak_callback(
|
||||
)
|
||||
|
||||
valid_offline_token = (
|
||||
await token_manager.validate_offline_token(user_id=user_info['sub'])
|
||||
await token_manager.validate_offline_token(user_id=user_info.sub)
|
||||
if idp_type != 'saml'
|
||||
else True
|
||||
)
|
||||
@@ -541,14 +542,10 @@ async def keycloak_offline_callback(code: str, state: str, request: Request):
|
||||
|
||||
user_info = await token_manager.get_user_info(keycloak_access_token)
|
||||
logger.debug(f'user_info: {user_info}')
|
||||
if 'sub' not in user_info:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
content={'error': 'Missing Keycloak ID in response'},
|
||||
)
|
||||
# sub is a required field in KeycloakUserInfo, validation happens in get_user_info
|
||||
|
||||
await token_manager.store_offline_token(
|
||||
user_id=user_info['sub'], offline_token=keycloak_refresh_token
|
||||
user_id=user_info.sub, offline_token=keycloak_refresh_token
|
||||
)
|
||||
|
||||
redirect_url, _, _ = _extract_oauth_state(state)
|
||||
|
||||
@@ -93,6 +93,16 @@ async def _process_batch_operations_background(
|
||||
)
|
||||
continue # Skip this operation but continue with others
|
||||
|
||||
if user_id is None:
|
||||
logger.error(
|
||||
'user_id_not_set_in_batch_webhook',
|
||||
extra={
|
||||
'conversation_id': conversation_id,
|
||||
'path': batch_op.path,
|
||||
},
|
||||
)
|
||||
continue
|
||||
|
||||
if subpath == 'agent_state.pkl':
|
||||
update_agent_state(user_id, conversation_id, batch_op.get_content())
|
||||
continue
|
||||
|
||||
@@ -196,7 +196,7 @@ async def keycloak_callback(
|
||||
)
|
||||
|
||||
user_info = await token_manager.get_user_info(keycloak_access_token)
|
||||
keycloak_user_id = user_info['sub']
|
||||
keycloak_user_id = user_info.sub
|
||||
user = await UserStore.get_user_by_id_async(keycloak_user_id)
|
||||
if not user:
|
||||
return _html_response(
|
||||
@@ -208,7 +208,7 @@ async def keycloak_callback(
|
||||
# These tokens are offline access tokens - store them!
|
||||
await token_manager.store_offline_token(keycloak_user_id, keycloak_refresh_token)
|
||||
|
||||
idp: str = user_info.get('identity_provider', ProviderType.GITHUB)
|
||||
idp: str = user_info.identity_provider or ProviderType.GITHUB.value
|
||||
idp_type = 'oidc'
|
||||
if ':' in idp:
|
||||
idp, idp_type = idp.rsplit(':', 1)
|
||||
|
||||
@@ -111,30 +111,26 @@ async def saas_get_user(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
user_info = await token_manager.get_user_info(access_token.get_secret_value())
|
||||
if not user_info:
|
||||
return JSONResponse(
|
||||
content='Failed to retrieve user_info.',
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
# Prefer email from DB; fall back to Keycloak if not yet persisted
|
||||
email = user_info.get('email') if user_info else None
|
||||
sub = user_info.get('sub') if user_info else ''
|
||||
email = user_info.email
|
||||
sub = user_info.sub
|
||||
if sub:
|
||||
db_user = await UserStore.get_user_by_id_async(sub)
|
||||
if db_user and db_user.email is not None:
|
||||
email = db_user.email
|
||||
|
||||
user_info_dict = user_info.model_dump(exclude_none=True)
|
||||
retval = await _check_idp(
|
||||
access_token=access_token,
|
||||
default_value=User(
|
||||
id=sub,
|
||||
login=(user_info.get('preferred_username') if user_info else '') or '',
|
||||
login=user_info.preferred_username or '',
|
||||
avatar_url='',
|
||||
email=email,
|
||||
name=resolve_display_name(user_info) if user_info else None,
|
||||
company=user_info.get('company') if user_info else None,
|
||||
name=resolve_display_name(user_info_dict),
|
||||
company=user_info.company,
|
||||
),
|
||||
user_info=user_info,
|
||||
user_info=user_info_dict,
|
||||
)
|
||||
if retval is not None:
|
||||
return retval
|
||||
@@ -364,16 +360,11 @@ async def _check_idp(
|
||||
content='User is not authenticated.',
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
user_info = (
|
||||
user_info
|
||||
if user_info
|
||||
else await token_manager.get_user_info(access_token.get_secret_value())
|
||||
)
|
||||
if not user_info:
|
||||
return JSONResponse(
|
||||
content='Failed to retrieve user_info.',
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
if user_info is None:
|
||||
user_info_model = await token_manager.get_user_info(
|
||||
access_token.get_secret_value()
|
||||
)
|
||||
user_info = user_info_model.model_dump(exclude_none=True)
|
||||
idp: str | None = user_info.get('identity_provider')
|
||||
if not idp:
|
||||
return JSONResponse(
|
||||
|
||||
@@ -102,7 +102,7 @@ class AuthTokenStore:
|
||||
async def load_tokens(
|
||||
self,
|
||||
check_expiration_and_refresh: Callable[
|
||||
[ProviderType, str, int, int], Awaitable[Dict[str, str | int]]
|
||||
[ProviderType, str, int, int], Awaitable[Dict[str, str | int] | None]
|
||||
]
|
||||
| None = None,
|
||||
) -> Dict[str, str | int] | None:
|
||||
|
||||
@@ -136,15 +136,9 @@ class SaasConversationValidator(ConversationValidator):
|
||||
raise ConnectionRefusedError('SESSION$TIMEOUT_MESSAGE')
|
||||
if access_token is None:
|
||||
raise AuthError('no_access_token')
|
||||
user_info_dict = await token_manager.get_user_info(
|
||||
access_token.get_secret_value()
|
||||
)
|
||||
if not user_info_dict or 'sub' not in user_info_dict:
|
||||
logger.info(
|
||||
f'Invalid user_info {user_info_dict} for access token {access_token}'
|
||||
)
|
||||
raise RuntimeError('Invalid user_info')
|
||||
user_id = user_info_dict['sub']
|
||||
user_info = await token_manager.get_user_info(access_token.get_secret_value())
|
||||
# sub is a required field in KeycloakUserInfo, validation happens in get_user_info
|
||||
user_id = user_info.sub
|
||||
|
||||
logger.info(f'User {user_id} is connecting to conversation {conversation_id}')
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ from datetime import datetime
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
from server.auth.token_manager import KeycloakUserInfo
|
||||
from server.constants import ORG_SETTINGS_VERSION
|
||||
from server.verified_models.verified_model_service import (
|
||||
StoredVerifiedModel, # noqa: F401
|
||||
@@ -36,6 +37,26 @@ from storage.stripe_customer import StripeCustomer
|
||||
from storage.user import User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def create_keycloak_user_info():
|
||||
"""Fixture that returns a factory function to create KeycloakUserInfo models.
|
||||
|
||||
Usage:
|
||||
def test_example(create_keycloak_user_info):
|
||||
user_info = create_keycloak_user_info(sub='user123', email='test@example.com')
|
||||
"""
|
||||
|
||||
def _create(**kwargs) -> KeycloakUserInfo:
|
||||
defaults = {
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
}
|
||||
defaults.update(kwargs)
|
||||
return KeycloakUserInfo(**defaults)
|
||||
|
||||
return _create
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def db_path(tmp_path):
|
||||
"""Create a unique temp file path for each test."""
|
||||
|
||||
@@ -105,15 +105,20 @@ async def test_keycloak_callback_token_retrieval_failure(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_missing_user_info(mock_request):
|
||||
"""Test keycloak_callback when user info is missing required fields."""
|
||||
async def test_keycloak_callback_missing_user_info(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when user info is missing preferred_username."""
|
||||
with patch('server.routes.auth.token_manager') as mock_token_manager:
|
||||
mock_token_manager.get_keycloak_tokens = AsyncMock(
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
# Return KeycloakUserInfo with sub but without preferred_username
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={'some_field': 'value'}
|
||||
) # Missing 'sub' and 'preferred_username'
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id', preferred_username=None
|
||||
)
|
||||
)
|
||||
|
||||
result = await keycloak_callback(
|
||||
code='test_code', state='test_state', request=mock_request
|
||||
@@ -126,7 +131,9 @@ async def test_keycloak_callback_missing_user_info(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_user_not_allowed(mock_request):
|
||||
async def test_keycloak_callback_user_not_allowed(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when user is not allowed by verifier."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -137,12 +144,12 @@ async def test_keycloak_callback_user_not_allowed(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
|
||||
@@ -172,7 +179,9 @@ async def test_keycloak_callback_user_not_allowed(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_success_with_valid_offline_token(mock_request):
|
||||
async def test_keycloak_callback_success_with_valid_offline_token(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test successful keycloak_callback with valid offline token."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -198,12 +207,12 @@ async def test_keycloak_callback_success_with_valid_offline_token(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -234,7 +243,9 @@ async def test_keycloak_callback_success_with_valid_offline_token(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_email_not_verified(mock_request):
|
||||
async def test_keycloak_callback_email_not_verified(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when email is not verified."""
|
||||
# Arrange
|
||||
mock_verify_email = AsyncMock()
|
||||
@@ -248,12 +259,12 @@ async def test_keycloak_callback_email_not_verified(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': False,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
identity_provider='github',
|
||||
email_verified=False,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_verifier.is_active.return_value = False
|
||||
@@ -283,7 +294,9 @@ async def test_keycloak_callback_email_not_verified(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_email_not_verified_missing_field(mock_request):
|
||||
async def test_keycloak_callback_email_not_verified_missing_field(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when email_verified field is missing (defaults to False)."""
|
||||
# Arrange
|
||||
mock_verify_email = AsyncMock()
|
||||
@@ -297,12 +310,12 @@ async def test_keycloak_callback_email_not_verified_missing_field(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'identity_provider': 'github',
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
identity_provider='github',
|
||||
# email_verified field is missing
|
||||
}
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_verifier.is_active.return_value = False
|
||||
@@ -332,7 +345,9 @@ async def test_keycloak_callback_email_not_verified_missing_field(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_success_without_offline_token(mock_request):
|
||||
async def test_keycloak_callback_success_without_offline_token(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test successful keycloak_callback without valid offline token."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -363,12 +378,12 @@ async def test_keycloak_callback_success_without_offline_token(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
# Set validate_offline_token to return False to test the "without offline token" scenario
|
||||
@@ -448,33 +463,42 @@ async def test_keycloak_offline_callback_token_retrieval_failure(mock_request):
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_offline_callback_missing_user_info(mock_request):
|
||||
"""Test keycloak_offline_callback when user info is missing required fields."""
|
||||
from pydantic import ValidationError
|
||||
|
||||
with patch('server.routes.auth.token_manager') as mock_token_manager:
|
||||
mock_token_manager.get_keycloak_tokens = AsyncMock(
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
# With Pydantic model, missing 'sub' raises ValidationError during get_user_info
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={'some_field': 'value'}
|
||||
) # Missing 'sub'
|
||||
|
||||
result = await keycloak_offline_callback(
|
||||
'test_code', 'test_state', mock_request
|
||||
side_effect=ValidationError.from_exception_data(
|
||||
'KeycloakUserInfo',
|
||||
[
|
||||
{
|
||||
'type': 'missing',
|
||||
'loc': ('sub',),
|
||||
'input': {'some_field': 'value'},
|
||||
}
|
||||
],
|
||||
)
|
||||
)
|
||||
|
||||
assert isinstance(result, JSONResponse)
|
||||
assert result.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert 'error' in result.body.decode()
|
||||
assert 'Missing Keycloak ID' in result.body.decode()
|
||||
# The endpoint should propagate the error (or handle it gracefully)
|
||||
with pytest.raises(ValidationError):
|
||||
await keycloak_offline_callback('test_code', 'test_state', mock_request)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_offline_callback_success(mock_request):
|
||||
async def test_keycloak_offline_callback_success(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test successful keycloak_offline_callback."""
|
||||
with patch('server.routes.auth.token_manager') as mock_token_manager:
|
||||
mock_token_manager.get_keycloak_tokens = AsyncMock(
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={'sub': 'test_user_id'}
|
||||
return_value=create_keycloak_user_info(sub='test_user_id')
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.store_offline_token = AsyncMock()
|
||||
@@ -565,7 +589,9 @@ async def test_logout_without_refresh_token():
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_blocked_email_domain(mock_request):
|
||||
async def test_keycloak_callback_blocked_email_domain(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when email domain is blocked."""
|
||||
# Arrange
|
||||
with (
|
||||
@@ -577,12 +603,12 @@ async def test_keycloak_callback_blocked_email_domain(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@colsch.us',
|
||||
'identity_provider': 'github',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@colsch.us',
|
||||
identity_provider='github',
|
||||
)
|
||||
)
|
||||
mock_token_manager.disable_keycloak_user = AsyncMock()
|
||||
|
||||
@@ -615,7 +641,9 @@ async def test_keycloak_callback_blocked_email_domain(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_allowed_email_domain(mock_request):
|
||||
async def test_keycloak_callback_allowed_email_domain(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when email domain is not blocked."""
|
||||
# Arrange
|
||||
with (
|
||||
@@ -639,13 +667,13 @@ async def test_keycloak_callback_allowed_email_domain(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -680,7 +708,9 @@ async def test_keycloak_callback_allowed_email_domain(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_domain_blocking_inactive(mock_request):
|
||||
async def test_keycloak_callback_domain_blocking_inactive(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when email domain is not blocked."""
|
||||
# Arrange
|
||||
with (
|
||||
@@ -704,13 +734,13 @@ async def test_keycloak_callback_domain_blocking_inactive(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@colsch.us',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@colsch.us',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -743,7 +773,7 @@ async def test_keycloak_callback_domain_blocking_inactive(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_missing_email(mock_request):
|
||||
async def test_keycloak_callback_missing_email(mock_request, create_keycloak_user_info):
|
||||
"""Test keycloak_callback when user info does not contain email."""
|
||||
# Arrange
|
||||
with (
|
||||
@@ -767,13 +797,13 @@ async def test_keycloak_callback_missing_email(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
# No email field
|
||||
}
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -805,7 +835,9 @@ async def test_keycloak_callback_missing_email(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_duplicate_email_detected(mock_request):
|
||||
async def test_keycloak_callback_duplicate_email_detected(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when duplicate email is detected."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -816,12 +848,12 @@ async def test_keycloak_callback_duplicate_email_detected(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'joe+test@example.com',
|
||||
'identity_provider': 'github',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='joe+test@example.com',
|
||||
identity_provider='github',
|
||||
)
|
||||
)
|
||||
mock_token_manager.check_duplicate_base_email = AsyncMock(return_value=True)
|
||||
mock_token_manager.delete_keycloak_user = AsyncMock(return_value=True)
|
||||
@@ -851,7 +883,9 @@ async def test_keycloak_callback_duplicate_email_detected(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_duplicate_email_deletion_fails(mock_request):
|
||||
async def test_keycloak_callback_duplicate_email_deletion_fails(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when duplicate is detected but deletion fails."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -862,12 +896,12 @@ async def test_keycloak_callback_duplicate_email_deletion_fails(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'joe+test@example.com',
|
||||
'identity_provider': 'github',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='joe+test@example.com',
|
||||
identity_provider='github',
|
||||
)
|
||||
)
|
||||
mock_token_manager.check_duplicate_base_email = AsyncMock(return_value=True)
|
||||
mock_token_manager.delete_keycloak_user = AsyncMock(return_value=False)
|
||||
@@ -894,7 +928,9 @@ async def test_keycloak_callback_duplicate_email_deletion_fails(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_duplicate_check_exception(mock_request):
|
||||
async def test_keycloak_callback_duplicate_check_exception(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when duplicate check raises exception."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -916,13 +952,13 @@ async def test_keycloak_callback_duplicate_check_exception(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'joe+test@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='joe+test@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.check_duplicate_base_email = AsyncMock(
|
||||
side_effect=Exception('Check failed')
|
||||
@@ -955,7 +991,9 @@ async def test_keycloak_callback_duplicate_check_exception(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_no_duplicate_email(mock_request):
|
||||
async def test_keycloak_callback_no_duplicate_email(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when no duplicate email is found."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -977,13 +1015,13 @@ async def test_keycloak_callback_no_duplicate_email(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'joe+test@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='joe+test@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.check_duplicate_base_email = AsyncMock(return_value=False)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
@@ -1018,7 +1056,9 @@ async def test_keycloak_callback_no_duplicate_email(mock_request):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_no_email_in_user_info(mock_request):
|
||||
async def test_keycloak_callback_no_email_in_user_info(
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test keycloak_callback when email is not in user_info."""
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -1040,13 +1080,13 @@ async def test_keycloak_callback_no_email_in_user_info(mock_request):
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
# No email field
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1154,7 +1194,7 @@ class TestKeycloakCallbackRecaptcha:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_verify_recaptcha_and_allow_login_when_score_is_high(
|
||||
self, mock_request
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that login proceeds when reCAPTCHA score is high."""
|
||||
# Arrange
|
||||
@@ -1195,13 +1235,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1240,7 +1280,9 @@ class TestKeycloakCallbackRecaptcha:
|
||||
mock_recaptcha_service.create_assessment.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_block_login_when_recaptcha_score_is_low(self, mock_request):
|
||||
async def test_should_block_login_when_recaptcha_score_is_low(
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that login is blocked and redirected when reCAPTCHA score is low."""
|
||||
# Arrange
|
||||
state_data = {
|
||||
@@ -1266,11 +1308,11 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
)
|
||||
)
|
||||
mock_token_manager.check_duplicate_base_email = AsyncMock(
|
||||
return_value=False
|
||||
@@ -1303,7 +1345,9 @@ class TestKeycloakCallbackRecaptcha:
|
||||
assert 'recaptcha_blocked=true' in result.headers['location']
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_extract_ip_from_x_forwarded_for_header(self, mock_request):
|
||||
async def test_should_extract_ip_from_x_forwarded_for_header(
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that IP is extracted from X-Forwarded-For header when present."""
|
||||
# Arrange
|
||||
state_data = {
|
||||
@@ -1345,13 +1389,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1390,7 +1434,7 @@ class TestKeycloakCallbackRecaptcha:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_use_client_host_when_x_forwarded_for_missing(
|
||||
self, mock_request
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that client.host is used when X-Forwarded-For is missing."""
|
||||
# Arrange
|
||||
@@ -1434,13 +1478,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1478,7 +1522,9 @@ class TestKeycloakCallbackRecaptcha:
|
||||
assert call_args[1]['user_ip'] == '192.168.1.2'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_use_unknown_ip_when_client_is_none(self, mock_request):
|
||||
async def test_should_use_unknown_ip_when_client_is_none(
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that 'unknown' IP is used when client is None."""
|
||||
# Arrange
|
||||
state_data = {
|
||||
@@ -1520,13 +1566,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1565,7 +1611,7 @@ class TestKeycloakCallbackRecaptcha:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_include_email_in_assessment_when_available(
|
||||
self, mock_request
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that email is included in assessment when available."""
|
||||
# Arrange
|
||||
@@ -1605,13 +1651,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1650,7 +1696,7 @@ class TestKeycloakCallbackRecaptcha:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_skip_recaptcha_when_site_key_not_configured(
|
||||
self, mock_request
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that reCAPTCHA is skipped when RECAPTCHA_SITE_KEY is not configured."""
|
||||
# Arrange
|
||||
@@ -1687,13 +1733,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1725,7 +1771,9 @@ class TestKeycloakCallbackRecaptcha:
|
||||
mock_recaptcha_service.create_assessment.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_skip_recaptcha_when_token_is_missing(self, mock_request):
|
||||
async def test_should_skip_recaptcha_when_token_is_missing(
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that reCAPTCHA is skipped when token is missing from state."""
|
||||
# Arrange
|
||||
state = 'https://example.com' # Old format without token
|
||||
@@ -1755,13 +1803,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1792,7 +1840,7 @@ class TestKeycloakCallbackRecaptcha:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_fail_open_when_recaptcha_service_throws_exception(
|
||||
self, mock_request
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that login proceeds (fail open) when reCAPTCHA service throws exception."""
|
||||
# Arrange
|
||||
@@ -1829,13 +1877,13 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
'identity_provider': 'github',
|
||||
'email_verified': True,
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
identity_provider='github',
|
||||
email_verified=True,
|
||||
)
|
||||
)
|
||||
mock_token_manager.store_idp_tokens = AsyncMock()
|
||||
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
|
||||
@@ -1878,7 +1926,9 @@ class TestKeycloakCallbackRecaptcha:
|
||||
assert len(recaptcha_error_calls) > 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_log_warning_when_recaptcha_blocks_user(self, mock_request):
|
||||
async def test_should_log_warning_when_recaptcha_blocks_user(
|
||||
self, mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""Test that warning is logged when reCAPTCHA blocks user."""
|
||||
# Arrange
|
||||
state_data = {
|
||||
@@ -1906,11 +1956,11 @@ class TestKeycloakCallbackRecaptcha:
|
||||
return_value=('test_access_token', 'test_refresh_token')
|
||||
)
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'email': 'user@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
email='user@example.com',
|
||||
)
|
||||
)
|
||||
mock_token_manager.check_duplicate_base_email = AsyncMock(
|
||||
return_value=False
|
||||
@@ -1947,16 +1997,16 @@ class TestKeycloakCallbackRecaptcha:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keycloak_callback_calls_backfill_user_email_for_existing_user(
|
||||
mock_request,
|
||||
mock_request, create_keycloak_user_info
|
||||
):
|
||||
"""When an existing user logs in, backfill_user_email should be called."""
|
||||
user_info = {
|
||||
'sub': 'test_user_id',
|
||||
'preferred_username': 'test_user',
|
||||
'identity_provider': 'github',
|
||||
'email': 'test@example.com',
|
||||
'email_verified': True,
|
||||
}
|
||||
user_info = create_keycloak_user_info(
|
||||
sub='test_user_id',
|
||||
preferred_username='test_user',
|
||||
identity_provider='github',
|
||||
email='test@example.com',
|
||||
email_verified=True,
|
||||
)
|
||||
|
||||
with (
|
||||
patch('server.routes.auth.token_manager') as mock_token_manager,
|
||||
@@ -1993,9 +2043,9 @@ async def test_keycloak_callback_calls_backfill_user_email_for_existing_user(
|
||||
assert isinstance(result, RedirectResponse)
|
||||
assert result.status_code == 302
|
||||
|
||||
# backfill_user_email should have been called with the user_id and user_info
|
||||
# backfill_user_email should have been called with the user_id and user_info dict
|
||||
mock_user_store.backfill_user_email.assert_called_once_with(
|
||||
'test_user_id', user_info
|
||||
'test_user_id', user_info.model_dump(exclude_none=True)
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -147,6 +147,8 @@ async def test_verify_keycloak_token_refresh(token_manager):
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_info(token_manager):
|
||||
"""Test getting user info from a Keycloak token."""
|
||||
from server.auth.token_manager import KeycloakUserInfo
|
||||
|
||||
mock_user_info = {
|
||||
'sub': 'test_user_id',
|
||||
'name': 'Test User',
|
||||
@@ -158,7 +160,11 @@ async def test_get_user_info(token_manager):
|
||||
|
||||
user_info = await token_manager.get_user_info('test_access_token')
|
||||
|
||||
assert user_info == mock_user_info
|
||||
# Now returns KeycloakUserInfo Pydantic model instead of dict
|
||||
assert isinstance(user_info, KeycloakUserInfo)
|
||||
assert user_info.sub == 'test_user_id'
|
||||
assert user_info.name == 'Test User'
|
||||
assert user_info.email == 'test@example.com'
|
||||
mock_keycloak.return_value.a_userinfo.assert_called_once_with(
|
||||
'test_access_token'
|
||||
)
|
||||
@@ -167,9 +173,17 @@ async def test_get_user_info(token_manager):
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_info_empty_token(token_manager):
|
||||
"""Test handling of empty token when getting user info."""
|
||||
user_info = await token_manager.get_user_info('')
|
||||
from keycloak.exceptions import KeycloakAuthenticationError
|
||||
|
||||
assert user_info == {}
|
||||
with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak:
|
||||
mock_keycloak.return_value.a_userinfo = AsyncMock(
|
||||
side_effect=KeycloakAuthenticationError('Invalid token')
|
||||
)
|
||||
|
||||
with pytest.raises(KeycloakAuthenticationError):
|
||||
await token_manager.get_user_info('')
|
||||
|
||||
mock_keycloak.return_value.a_userinfo.assert_called_once_with('')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -203,12 +217,12 @@ async def test_store_idp_tokens(token_manager):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_idp_token(token_manager):
|
||||
async def test_get_idp_token(token_manager, create_keycloak_user_info):
|
||||
"""Test getting an identity provider token."""
|
||||
with (
|
||||
patch(
|
||||
'server.auth.token_manager.TokenManager.get_user_info',
|
||||
AsyncMock(return_value={'sub': 'test_user_id'}),
|
||||
AsyncMock(return_value=create_keycloak_user_info(sub='test_user_id')),
|
||||
),
|
||||
patch('server.auth.token_manager.AuthTokenStore') as mock_token_store_cls,
|
||||
):
|
||||
|
||||
@@ -46,18 +46,18 @@ def mock_user_store():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_includes_name_from_name_claim(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When Keycloak provides a 'name' claim, the fallback User should include it."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'name': 'Jane Doe',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'jane@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
name='Jane Doe',
|
||||
preferred_username='j.doe',
|
||||
email='jane@example.com',
|
||||
)
|
||||
)
|
||||
|
||||
result = await saas_get_user(
|
||||
@@ -73,19 +73,19 @@ async def test_fallback_user_includes_name_from_name_claim(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_combines_given_and_family_name(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When 'name' is absent, combine given_name + family_name."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'given_name': 'Jane',
|
||||
'family_name': 'Doe',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'jane@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
given_name='Jane',
|
||||
family_name='Doe',
|
||||
preferred_username='j.doe',
|
||||
email='jane@example.com',
|
||||
)
|
||||
)
|
||||
|
||||
result = await saas_get_user(
|
||||
@@ -100,17 +100,17 @@ async def test_fallback_user_combines_given_and_family_name(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_name_is_none_when_no_name_claims(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When no name claims exist, name should be None."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'jane@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
preferred_username='j.doe',
|
||||
email='jane@example.com',
|
||||
)
|
||||
)
|
||||
|
||||
result = await saas_get_user(
|
||||
@@ -125,19 +125,19 @@ async def test_fallback_user_name_is_none_when_no_name_claims(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_includes_company_claim(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When Keycloak provides a 'company' claim, include it in the User."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'name': 'Jane Doe',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'jane@example.com',
|
||||
'company': 'Acme Corp',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
name='Jane Doe',
|
||||
preferred_username='j.doe',
|
||||
email='jane@example.com',
|
||||
company='Acme Corp',
|
||||
)
|
||||
)
|
||||
|
||||
result = await saas_get_user(
|
||||
@@ -152,18 +152,18 @@ async def test_fallback_user_includes_company_claim(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_company_is_none_when_absent(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When 'company' is not in Keycloak claims, company should be None."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'name': 'Jane Doe',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'jane@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
name='Jane Doe',
|
||||
preferred_username='j.doe',
|
||||
email='jane@example.com',
|
||||
)
|
||||
)
|
||||
|
||||
result = await saas_get_user(
|
||||
@@ -178,17 +178,17 @@ async def test_fallback_user_company_is_none_when_absent(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_email_from_db_when_available(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When User.email is stored in DB, use it instead of Keycloak's live email."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'keycloak@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
preferred_username='j.doe',
|
||||
email='keycloak@example.com',
|
||||
)
|
||||
)
|
||||
|
||||
mock_db_user = MagicMock()
|
||||
@@ -207,17 +207,17 @@ async def test_fallback_user_email_from_db_when_available(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_email_falls_back_to_keycloak_when_db_null(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When User.email is NULL in DB, fall back to Keycloak's email."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'keycloak@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
preferred_username='j.doe',
|
||||
email='keycloak@example.com',
|
||||
)
|
||||
)
|
||||
|
||||
mock_db_user = MagicMock()
|
||||
@@ -236,17 +236,17 @@ async def test_fallback_user_email_falls_back_to_keycloak_when_db_null(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_user_email_falls_back_to_keycloak_when_no_db_user(
|
||||
mock_token_manager, mock_check_idp, mock_user_store
|
||||
mock_token_manager, mock_check_idp, mock_user_store, create_keycloak_user_info
|
||||
):
|
||||
"""When DB user doesn't exist, fall back to Keycloak's email."""
|
||||
from server.routes.user import saas_get_user
|
||||
|
||||
mock_token_manager.get_user_info = AsyncMock(
|
||||
return_value={
|
||||
'sub': '248289761001',
|
||||
'preferred_username': 'j.doe',
|
||||
'email': 'keycloak@example.com',
|
||||
}
|
||||
return_value=create_keycloak_user_info(
|
||||
sub='248289761001',
|
||||
preferred_username='j.doe',
|
||||
email='keycloak@example.com',
|
||||
)
|
||||
)
|
||||
|
||||
# mock_user_store already returns None by default
|
||||
|
||||
Reference in New Issue
Block a user