feat: allow manual reinstallation for gitlab resolver (#12184)

This commit is contained in:
Hiep Le
2026-01-05 12:05:20 +07:00
committed by GitHub
parent 5bd8695ab8
commit 6f86e589c8
23 changed files with 3725 additions and 157 deletions

View File

@@ -80,22 +80,52 @@ class SaaSGitLabService(GitLabService):
logger.warning('external_auth_token and user_id not set!')
return gitlab_token
async def get_owned_groups(self) -> list[dict]:
async def get_owned_groups(self, min_access_level: int = 40) -> list[dict]:
"""
Get all groups for which the current user is the owner.
Get all top-level groups where the current user has admin access.
This method supports pagination and fetches all groups where the user has
at least the specified access level.
Args:
min_access_level: Minimum access level required (default: 40 for Maintainer or Owner)
- 40: Maintainer or Owner
- 50: Owner only
Returns:
list[dict]: A list of groups owned by the current user.
list[dict]: A list of groups where user has the specified access level or higher.
"""
url = f'{self.BASE_URL}/groups'
params = {'owned': 'true', 'per_page': 100, 'top_level_only': 'true'}
groups_with_admin_access = []
page = 1
per_page = 100
try:
response, headers = await self._make_request(url, params)
return response
except Exception:
logger.warning('Error fetching owned groups', exc_info=True)
return []
while True:
try:
url = f'{self.BASE_URL}/groups'
params = {
'page': str(page),
'per_page': str(per_page),
'min_access_level': min_access_level,
'top_level_only': 'true',
}
response, headers = await self._make_request(url, params)
if not response:
break
groups_with_admin_access.extend(response)
page += 1
# Check if we've reached the last page
link_header = headers.get('Link', '')
if 'rel="next"' not in link_header:
break
except Exception:
logger.warning(f'Error fetching groups on page {page}', exc_info=True)
break
return groups_with_admin_access
async def add_owned_projects_and_groups_to_db(self, owned_personal_projects):
"""
@@ -527,3 +557,55 @@ class SaaSGitLabService(GitLabService):
await self._make_request(url=url, params=params, method=RequestMethod.POST)
except Exception as e:
logger.exception(f'[GitLab]: Reply to MR failed {e}')
async def get_user_resources_with_admin_access(
self,
) -> tuple[list[dict], list[dict]]:
"""
Get all projects and groups where the current user has admin access (maintainer or owner).
Returns:
tuple[list[dict], list[dict]]: A tuple containing:
- list of projects where user has admin access
- list of groups where user has admin access
"""
projects_with_admin_access = []
groups_with_admin_access = []
# Fetch all projects the user is a member of
page = 1
per_page = 100
while True:
try:
url = f'{self.BASE_URL}/projects'
params = {
'page': str(page),
'per_page': str(per_page),
'membership': 1,
'min_access_level': 40, # Maintainer or Owner
}
response, headers = await self._make_request(url, params)
if not response:
break
projects_with_admin_access.extend(response)
page += 1
# Check if we've reached the last page
link_header = headers.get('Link', '')
if 'rel="next"' not in link_header:
break
except Exception:
logger.warning(f'Error fetching projects on page {page}', exc_info=True)
break
# Fetch all groups where user is owner or maintainer
groups_with_admin_access = await self.get_owned_groups(min_access_level=40)
logger.info(
f'Found {len(projects_with_admin_access)} projects and {len(groups_with_admin_access)} groups with admin access'
)
return projects_with_admin_access, groups_with_admin_access

View File

@@ -0,0 +1,199 @@
"""Shared utilities for GitLab webhook installation.
This module contains reusable functions and classes for installing GitLab webhooks
that can be used by both the cron job and API routes.
"""
from typing import cast
from uuid import uuid4
from integrations.types import GitLabResourceType
from integrations.utils import GITLAB_WEBHOOK_URL
from storage.gitlab_webhook import GitlabWebhook, WebhookStatus
from storage.gitlab_webhook_store import GitlabWebhookStore
from openhands.core.logger import openhands_logger as logger
from openhands.integrations.service_types import GitService
# Webhook configuration constants
WEBHOOK_NAME = 'OpenHands Resolver'
SCOPES: list[str] = [
'note_events',
'merge_requests_events',
'confidential_issues_events',
'issues_events',
'confidential_note_events',
'job_events',
'pipeline_events',
]
class BreakLoopException(Exception):
"""Exception raised when webhook installation conditions are not met or rate limited."""
pass
async def verify_webhook_conditions(
gitlab_service: type[GitService],
resource_type: GitLabResourceType,
resource_id: str,
webhook_store: GitlabWebhookStore,
webhook: GitlabWebhook,
) -> None:
"""
Verify all conditions are met for webhook installation.
Raises BreakLoopException if any condition fails or rate limited.
Args:
gitlab_service: GitLab service instance
resource_type: Type of resource (PROJECT or GROUP)
resource_id: ID of the resource
webhook_store: Webhook store instance
webhook: Webhook object to verify
"""
from integrations.gitlab.gitlab_service import SaaSGitLabService
gitlab_service = cast(type[SaaSGitLabService], gitlab_service)
# Check if resource exists
does_resource_exist, status = await gitlab_service.check_resource_exists(
resource_type, resource_id
)
logger.info(
'Does resource exists',
extra={
'does_resource_exist': does_resource_exist,
'status': status,
'resource_id': resource_id,
'resource_type': resource_type,
},
)
if status == WebhookStatus.RATE_LIMITED:
raise BreakLoopException()
if not does_resource_exist and status != WebhookStatus.RATE_LIMITED:
await webhook_store.delete_webhook(webhook)
raise BreakLoopException()
# Check if user has admin access
(
is_user_admin_of_resource,
status,
) = await gitlab_service.check_user_has_admin_access_to_resource(
resource_type, resource_id
)
logger.info(
'Is user admin',
extra={
'is_user_admin': is_user_admin_of_resource,
'status': status,
'resource_id': resource_id,
'resource_type': resource_type,
},
)
if status == WebhookStatus.RATE_LIMITED:
raise BreakLoopException()
if not is_user_admin_of_resource:
await webhook_store.delete_webhook(webhook)
raise BreakLoopException()
# Check if webhook already exists
(
does_webhook_exist_on_resource,
status,
) = await gitlab_service.check_webhook_exists_on_resource(
resource_type, resource_id, GITLAB_WEBHOOK_URL
)
logger.info(
'Does webhook already exist',
extra={
'does_webhook_exist_on_resource': does_webhook_exist_on_resource,
'status': status,
'resource_id': resource_id,
'resource_type': resource_type,
},
)
if status == WebhookStatus.RATE_LIMITED:
raise BreakLoopException()
if does_webhook_exist_on_resource != webhook.webhook_exists:
await webhook_store.update_webhook(
webhook, {'webhook_exists': does_webhook_exist_on_resource}
)
if does_webhook_exist_on_resource:
raise BreakLoopException()
async def install_webhook_on_resource(
gitlab_service: type[GitService],
resource_type: GitLabResourceType,
resource_id: str,
webhook_store: GitlabWebhookStore,
webhook: GitlabWebhook,
) -> tuple[str | None, WebhookStatus | None]:
"""
Install webhook on a GitLab resource.
Args:
gitlab_service: GitLab service instance
resource_type: Type of resource (PROJECT or GROUP)
resource_id: ID of the resource
webhook_store: Webhook store instance
webhook: Webhook object to install
Returns:
Tuple of (webhook_id, status)
"""
from integrations.gitlab.gitlab_service import SaaSGitLabService
gitlab_service = cast(type[SaaSGitLabService], gitlab_service)
webhook_secret = f'{webhook.user_id}-{str(uuid4())}'
webhook_uuid = f'{str(uuid4())}'
webhook_id, status = await gitlab_service.install_webhook(
resource_type=resource_type,
resource_id=resource_id,
webhook_name=WEBHOOK_NAME,
webhook_url=GITLAB_WEBHOOK_URL,
webhook_secret=webhook_secret,
webhook_uuid=webhook_uuid,
scopes=SCOPES,
)
logger.info(
'Creating new webhook',
extra={
'webhook_id': webhook_id,
'status': status,
'resource_id': resource_id,
'resource_type': resource_type,
},
)
if status == WebhookStatus.RATE_LIMITED:
raise BreakLoopException()
if webhook_id:
await webhook_store.update_webhook(
webhook=webhook,
update_fields={
'webhook_secret': webhook_secret,
'webhook_exists': True, # webhook was created
'webhook_url': GITLAB_WEBHOOK_URL,
'scopes': SCOPES,
'webhook_uuid': webhook_uuid, # required to identify which webhook installation is sending payload
},
)
logger.info(
f'Installed webhook for {webhook.user_id} on {resource_type}:{resource_id}'
)
return webhook_id, status

View File

@@ -1,15 +1,28 @@
import asyncio
import hashlib
import json
from fastapi import APIRouter, Header, HTTPException, Request
from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from fastapi.responses import JSONResponse
from integrations.gitlab.gitlab_manager import GitlabManager
from integrations.gitlab.gitlab_service import SaaSGitLabService
from integrations.gitlab.webhook_installation import (
BreakLoopException,
install_webhook_on_resource,
verify_webhook_conditions,
)
from integrations.models import Message, SourceType
from integrations.types import GitLabResourceType
from integrations.utils import GITLAB_WEBHOOK_URL
from pydantic import BaseModel
from server.auth.token_manager import TokenManager
from storage.gitlab_webhook import GitlabWebhook
from storage.gitlab_webhook_store import GitlabWebhookStore
from openhands.core.logger import openhands_logger as logger
from openhands.integrations.gitlab.gitlab_service import GitLabServiceImpl
from openhands.server.shared import sio
from openhands.server.user_auth import get_user_id
gitlab_integration_router = APIRouter(prefix='/integration')
webhook_store = GitlabWebhookStore()
@@ -18,6 +31,37 @@ token_manager = TokenManager()
gitlab_manager = GitlabManager(token_manager)
# Request/Response models
class ResourceIdentifier(BaseModel):
type: GitLabResourceType
id: str
class ReinstallWebhookRequest(BaseModel):
resource: ResourceIdentifier
class ResourceWithWebhookStatus(BaseModel):
id: str
name: str
full_path: str
type: str
webhook_installed: bool
webhook_uuid: str | None
last_synced: str | None
class GitLabResourcesResponse(BaseModel):
resources: list[ResourceWithWebhookStatus]
class ResourceInstallationResult(BaseModel):
resource_id: str
resource_type: str
success: bool
error: str | None
async def verify_gitlab_signature(
header_webhook_secret: str, webhook_uuid: str, user_id: str
):
@@ -83,3 +127,260 @@ async def gitlab_events(
except Exception as e:
logger.exception(f'Error processing GitLab event: {e}')
return JSONResponse(status_code=400, content={'error': 'Invalid payload.'})
@gitlab_integration_router.get('/gitlab/resources')
async def get_gitlab_resources(
user_id: str = Depends(get_user_id),
) -> GitLabResourcesResponse:
"""Get all GitLab projects and groups where the user has admin access.
Returns a list of resources with their webhook installation status.
"""
try:
# Get GitLab service for the user
gitlab_service = GitLabServiceImpl(external_auth_id=user_id)
if not isinstance(gitlab_service, SaaSGitLabService):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Only SaaS GitLab service is supported',
)
# Fetch projects and groups with admin access
projects, groups = await gitlab_service.get_user_resources_with_admin_access()
# Filter out projects that belong to a group (nested projects)
# We only want top-level personal projects since group webhooks cover nested projects
filtered_projects = [
project
for project in projects
if project.get('namespace', {}).get('kind') != 'group'
]
# Extract IDs for bulk fetching
project_ids = [str(project['id']) for project in filtered_projects]
group_ids = [str(group['id']) for group in groups]
# Bulk fetch webhook records from database (organization-wide)
(
project_webhook_map,
group_webhook_map,
) = await webhook_store.get_webhooks_by_resources(project_ids, group_ids)
# Parallelize GitLab API calls to check webhook status for all resources
async def check_project_webhook(project):
project_id = str(project['id'])
webhook_exists, _ = await gitlab_service.check_webhook_exists_on_resource(
GitLabResourceType.PROJECT, project_id, GITLAB_WEBHOOK_URL
)
return project_id, webhook_exists
async def check_group_webhook(group):
group_id = str(group['id'])
webhook_exists, _ = await gitlab_service.check_webhook_exists_on_resource(
GitLabResourceType.GROUP, group_id, GITLAB_WEBHOOK_URL
)
return group_id, webhook_exists
# Gather all API calls in parallel
project_checks = [
check_project_webhook(project) for project in filtered_projects
]
group_checks = [check_group_webhook(group) for group in groups]
# Execute all checks concurrently
all_results = await asyncio.gather(*(project_checks + group_checks))
# Split results back into projects and groups
num_projects = len(filtered_projects)
project_results = all_results[:num_projects]
group_results = all_results[num_projects:]
# Build response
resources = []
# Add projects with their webhook status
for project, (project_id, webhook_exists) in zip(
filtered_projects, project_results
):
webhook = project_webhook_map.get(project_id)
resources.append(
ResourceWithWebhookStatus(
id=project_id,
name=project.get('name', ''),
full_path=project.get('path_with_namespace', ''),
type='project',
webhook_installed=webhook_exists,
webhook_uuid=webhook.webhook_uuid if webhook else None,
last_synced=(
webhook.last_synced.isoformat()
if webhook and webhook.last_synced
else None
),
)
)
# Add groups with their webhook status
for group, (group_id, webhook_exists) in zip(groups, group_results):
webhook = group_webhook_map.get(group_id)
resources.append(
ResourceWithWebhookStatus(
id=group_id,
name=group.get('name', ''),
full_path=group.get('full_path', ''),
type='group',
webhook_installed=webhook_exists,
webhook_uuid=webhook.webhook_uuid if webhook else None,
last_synced=(
webhook.last_synced.isoformat()
if webhook and webhook.last_synced
else None
),
)
)
logger.info(
'Retrieved GitLab resources',
extra={
'user_id': user_id,
'project_count': len(projects),
'group_count': len(groups),
},
)
return GitLabResourcesResponse(resources=resources)
except HTTPException:
raise
except Exception as e:
logger.exception(f'Error retrieving GitLab resources: {e}')
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Failed to retrieve GitLab resources',
)
@gitlab_integration_router.post('/gitlab/reinstall-webhook')
async def reinstall_gitlab_webhook(
body: ReinstallWebhookRequest,
user_id: str = Depends(get_user_id),
) -> ResourceInstallationResult:
"""Reinstall GitLab webhook for a specific resource immediately.
This endpoint validates permissions, resets webhook status in the database,
and immediately installs the webhook on the specified resource.
"""
try:
# Get GitLab service for the user
gitlab_service = GitLabServiceImpl(external_auth_id=user_id)
if not isinstance(gitlab_service, SaaSGitLabService):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Only SaaS GitLab service is supported',
)
resource_id = body.resource.id
resource_type = body.resource.type
# Check if user has admin access to this resource
(
has_admin_access,
check_status,
) = await gitlab_service.check_user_has_admin_access_to_resource(
resource_type, resource_id
)
if not has_admin_access:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='User does not have admin access to this resource',
)
# Reset webhook in database (organization-wide, not user-specific)
# This allows any admin user to reinstall webhooks
await webhook_store.reset_webhook_for_reinstallation_by_resource(
resource_type, resource_id, user_id
)
# Get or create webhook record (without user_id filter)
webhook = await webhook_store.get_webhook_by_resource_only(
resource_type, resource_id
)
if not webhook:
# Create new webhook record
webhook = GitlabWebhook(
user_id=user_id, # Track who created it
project_id=resource_id
if resource_type == GitLabResourceType.PROJECT
else None,
group_id=resource_id
if resource_type == GitLabResourceType.GROUP
else None,
webhook_exists=False,
)
await webhook_store.store_webhooks([webhook])
# Fetch it again to get the ID (without user_id filter)
webhook = await webhook_store.get_webhook_by_resource_only(
resource_type, resource_id
)
# Verify conditions and install webhook
try:
await verify_webhook_conditions(
gitlab_service=gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=webhook_store,
webhook=webhook,
)
# Install the webhook
webhook_id, install_status = await install_webhook_on_resource(
gitlab_service=gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=webhook_store,
webhook=webhook,
)
if webhook_id:
logger.info(
'GitLab webhook reinstalled successfully',
extra={
'user_id': user_id,
'resource_type': resource_type.value,
'resource_id': resource_id,
},
)
return ResourceInstallationResult(
resource_id=resource_id,
resource_type=resource_type.value,
success=True,
error=None,
)
else:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Failed to install webhook',
)
except BreakLoopException:
# Conditions not met or webhook already exists
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Webhook installation conditions not met or webhook already exists',
)
except HTTPException:
raise
except Exception as e:
logger.exception(f'Error reinstalling GitLab webhook: {e}')
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Failed to reinstall webhook',
)

View File

@@ -220,6 +220,127 @@ class GitlabWebhookStore:
return webhooks[0].webhook_secret
return None
async def get_webhook_by_resource_only(
self, resource_type: GitLabResourceType, resource_id: str
) -> GitlabWebhook | None:
"""Get a webhook by resource without filtering by user_id.
This allows any admin user in the organization to manage webhooks,
not just the original installer.
Args:
resource_type: The type of resource (PROJECT or GROUP)
resource_id: The ID of the resource
Returns:
GitlabWebhook object if found, None otherwise
"""
async with self.a_session_maker() as session:
if resource_type == GitLabResourceType.PROJECT:
query = select(GitlabWebhook).where(
GitlabWebhook.project_id == resource_id
)
else: # GROUP
query = select(GitlabWebhook).where(
GitlabWebhook.group_id == resource_id
)
result = await session.execute(query)
webhook = result.scalars().first()
return webhook
async def get_webhooks_by_resources(
self, project_ids: list[str], group_ids: list[str]
) -> tuple[dict[str, GitlabWebhook], dict[str, GitlabWebhook]]:
"""Bulk fetch webhooks for multiple resources.
This is more efficient than fetching one at a time in a loop.
Args:
project_ids: List of project IDs to fetch
group_ids: List of group IDs to fetch
Returns:
Tuple of (project_webhook_map, group_webhook_map)
"""
async with self.a_session_maker() as session:
project_webhook_map = {}
group_webhook_map = {}
# Fetch all project webhooks in one query
if project_ids:
project_query = select(GitlabWebhook).where(
GitlabWebhook.project_id.in_(project_ids)
)
result = await session.execute(project_query)
project_webhooks = result.scalars().all()
project_webhook_map = {wh.project_id: wh for wh in project_webhooks}
# Fetch all group webhooks in one query
if group_ids:
group_query = select(GitlabWebhook).where(
GitlabWebhook.group_id.in_(group_ids)
)
result = await session.execute(group_query)
group_webhooks = result.scalars().all()
group_webhook_map = {wh.group_id: wh for wh in group_webhooks}
return project_webhook_map, group_webhook_map
async def reset_webhook_for_reinstallation_by_resource(
self, resource_type: GitLabResourceType, resource_id: str, updating_user_id: str
) -> bool:
"""Reset webhook for reinstallation without filtering by user_id.
This allows any admin user to reset webhooks, and updates the user_id
to track who last modified it.
Args:
resource_type: The type of resource (PROJECT or GROUP)
resource_id: The ID of the resource
updating_user_id: The user ID performing the update (for audit purposes)
Returns:
True if webhook was reset, False if not found
"""
async with self.a_session_maker() as session:
async with session.begin():
if resource_type == GitLabResourceType.PROJECT:
update_statement = (
update(GitlabWebhook)
.where(GitlabWebhook.project_id == resource_id)
.values(
webhook_exists=False,
webhook_uuid=None,
user_id=updating_user_id, # Update to track who modified it
)
)
else: # GROUP
update_statement = (
update(GitlabWebhook)
.where(GitlabWebhook.group_id == resource_id)
.values(
webhook_exists=False,
webhook_uuid=None,
user_id=updating_user_id, # Update to track who modified it
)
)
result = await session.execute(update_statement)
rows_updated = result.rowcount
logger.info(
'Reset webhook for reinstallation (organization-wide)',
extra={
'updating_user_id': updating_user_id,
'resource_type': resource_type.value,
'resource_id': resource_id,
'rows_updated': rows_updated,
},
)
return rows_updated > 0
@classmethod
async def get_instance(cls) -> GitlabWebhookStore:
"""Get an instance of the GitlabWebhookStore.

View File

@@ -1,7 +1,11 @@
import asyncio
from typing import cast
from uuid import uuid4
from integrations.gitlab.webhook_installation import (
BreakLoopException,
install_webhook_on_resource,
verify_webhook_conditions,
)
from integrations.types import GitLabResourceType
from integrations.utils import GITLAB_WEBHOOK_URL
from sqlalchemy import text
@@ -14,20 +18,6 @@ from openhands.integrations.gitlab.gitlab_service import GitLabServiceImpl
from openhands.integrations.service_types import GitService
CHUNK_SIZE = 100
WEBHOOK_NAME = 'OpenHands Resolver'
SCOPES: list[str] = [
'note_events',
'merge_requests_events',
'confidential_issues_events',
'issues_events',
'confidential_note_events',
'job_events',
'pipeline_events',
]
class BreakLoopException(Exception):
pass
class VerifyWebhookStatus:
@@ -43,77 +33,6 @@ class VerifyWebhookStatus:
if status == WebhookStatus.RATE_LIMITED:
raise BreakLoopException()
async def check_if_resource_exists(
self,
gitlab_service: type[GitService],
resource_type: GitLabResourceType,
resource_id: str,
webhook_store: GitlabWebhookStore,
webhook: GitlabWebhook,
):
"""
Check if the GitLab resource still exists
"""
from integrations.gitlab.gitlab_service import SaaSGitLabService
gitlab_service = cast(type[SaaSGitLabService], gitlab_service)
does_resource_exist, status = await gitlab_service.check_resource_exists(
resource_type, resource_id
)
logger.info(
'Does resource exists',
extra={
'does_resource_exist': does_resource_exist,
'status': status,
'resource_id': resource_id,
'resource_type': resource_type,
},
)
self.determine_if_rate_limited(status)
if not does_resource_exist and status != WebhookStatus.RATE_LIMITED:
await webhook_store.delete_webhook(webhook)
raise BreakLoopException()
async def check_if_user_has_admin_acccess_to_resource(
self,
gitlab_service: type[GitService],
resource_type: GitLabResourceType,
resource_id: str,
webhook_store: GitlabWebhookStore,
webhook: GitlabWebhook,
):
"""
Check is user still has permission to resource
"""
from integrations.gitlab.gitlab_service import SaaSGitLabService
gitlab_service = cast(type[SaaSGitLabService], gitlab_service)
(
is_user_admin_of_resource,
status,
) = await gitlab_service.check_user_has_admin_access_to_resource(
resource_type, resource_id
)
logger.info(
'Is user admin',
extra={
'is_user_admin': is_user_admin_of_resource,
'status': status,
'resource_id': resource_id,
'resource_type': resource_type,
},
)
self.determine_if_rate_limited(status)
if not is_user_admin_of_resource:
await webhook_store.delete_webhook(webhook)
raise BreakLoopException()
async def check_if_webhook_already_exists_on_resource(
self,
gitlab_service: type[GitService],
@@ -162,23 +81,8 @@ class VerifyWebhookStatus:
webhook_store: GitlabWebhookStore,
webhook: GitlabWebhook,
):
await self.check_if_resource_exists(
gitlab_service=gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=webhook_store,
webhook=webhook,
)
await self.check_if_user_has_admin_acccess_to_resource(
gitlab_service=gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=webhook_store,
webhook=webhook,
)
await self.check_if_webhook_already_exists_on_resource(
# Use the standalone function
await verify_webhook_conditions(
gitlab_service=gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
@@ -197,51 +101,15 @@ class VerifyWebhookStatus:
"""
Install webhook on resource
"""
from integrations.gitlab.gitlab_service import SaaSGitLabService
gitlab_service = cast(type[SaaSGitLabService], gitlab_service)
webhook_secret = f'{webhook.user_id}-{str(uuid4())}'
webhook_uuid = f'{str(uuid4())}'
webhook_id, status = await gitlab_service.install_webhook(
# Use the standalone function
await install_webhook_on_resource(
gitlab_service=gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_name=WEBHOOK_NAME,
webhook_url=GITLAB_WEBHOOK_URL,
webhook_secret=webhook_secret,
webhook_uuid=webhook_uuid,
scopes=SCOPES,
webhook_store=webhook_store,
webhook=webhook,
)
logger.info(
'Creating new webhook',
extra={
'webhook_id': webhook_id,
'status': status,
'resource_id': resource_id,
'resource_type': resource_type,
},
)
self.determine_if_rate_limited(status)
if webhook_id:
await webhook_store.update_webhook(
webhook=webhook,
update_fields={
'webhook_secret': webhook_secret,
'webhook_exists': True, # webhook was created
'webhook_url': GITLAB_WEBHOOK_URL,
'scopes': SCOPES,
'webhook_uuid': webhook_uuid, # required to identify which webhook installation is sending payload
},
)
logger.info(
f'Installed webhook for {webhook.user_id} on {resource_type}:{resource_id}'
)
async def install_webhooks(self):
"""
Periodically check the conditions for installing a webhook on resource as valid

View File

@@ -0,0 +1,204 @@
"""Unit tests for SaaSGitLabService."""
from unittest.mock import patch
import pytest
from integrations.gitlab.gitlab_service import SaaSGitLabService
@pytest.fixture
def gitlab_service():
"""Create a SaaSGitLabService instance for testing."""
return SaaSGitLabService(external_auth_id='test_user_id')
class TestGetUserResourcesWithAdminAccess:
"""Test cases for get_user_resources_with_admin_access method."""
@pytest.mark.asyncio
async def test_get_resources_single_page_projects_and_groups(self, gitlab_service):
"""Test fetching resources when all data fits in a single page."""
# Arrange
mock_projects = [
{'id': 1, 'name': 'Project 1'},
{'id': 2, 'name': 'Project 2'},
]
mock_groups = [
{'id': 10, 'name': 'Group 1'},
]
with patch.object(gitlab_service, '_make_request') as mock_request:
# First call for projects, second for groups
mock_request.side_effect = [
(mock_projects, {'Link': ''}), # No next page
(mock_groups, {'Link': ''}), # No next page
]
# Act
(
projects,
groups,
) = await gitlab_service.get_user_resources_with_admin_access()
# Assert
assert len(projects) == 2
assert len(groups) == 1
assert projects[0]['id'] == 1
assert projects[1]['id'] == 2
assert groups[0]['id'] == 10
assert mock_request.call_count == 2
@pytest.mark.asyncio
async def test_get_resources_multiple_pages_projects(self, gitlab_service):
"""Test fetching projects across multiple pages."""
# Arrange
page1_projects = [{'id': i, 'name': f'Project {i}'} for i in range(1, 101)]
page2_projects = [{'id': i, 'name': f'Project {i}'} for i in range(101, 151)]
with patch.object(gitlab_service, '_make_request') as mock_request:
mock_request.side_effect = [
(page1_projects, {'Link': '<url>; rel="next"'}), # Has next page
(page2_projects, {'Link': ''}), # Last page
([], {'Link': ''}), # Groups (empty)
]
# Act
(
projects,
groups,
) = await gitlab_service.get_user_resources_with_admin_access()
# Assert
assert len(projects) == 150
assert len(groups) == 0
assert mock_request.call_count == 3
@pytest.mark.asyncio
async def test_get_resources_multiple_pages_groups(self, gitlab_service):
"""Test fetching groups across multiple pages."""
# Arrange
page1_groups = [{'id': i, 'name': f'Group {i}'} for i in range(1, 101)]
page2_groups = [{'id': i, 'name': f'Group {i}'} for i in range(101, 151)]
with patch.object(gitlab_service, '_make_request') as mock_request:
mock_request.side_effect = [
([], {'Link': ''}), # Projects (empty)
(page1_groups, {'Link': '<url>; rel="next"'}), # Has next page
(page2_groups, {'Link': ''}), # Last page
]
# Act
(
projects,
groups,
) = await gitlab_service.get_user_resources_with_admin_access()
# Assert
assert len(projects) == 0
assert len(groups) == 150
assert mock_request.call_count == 3
@pytest.mark.asyncio
async def test_get_resources_empty_response(self, gitlab_service):
"""Test when user has no projects or groups with admin access."""
# Arrange
with patch.object(gitlab_service, '_make_request') as mock_request:
mock_request.side_effect = [
([], {'Link': ''}), # No projects
([], {'Link': ''}), # No groups
]
# Act
(
projects,
groups,
) = await gitlab_service.get_user_resources_with_admin_access()
# Assert
assert len(projects) == 0
assert len(groups) == 0
assert mock_request.call_count == 2
@pytest.mark.asyncio
async def test_get_resources_uses_correct_params_for_projects(self, gitlab_service):
"""Test that projects API is called with correct parameters."""
# Arrange
with patch.object(gitlab_service, '_make_request') as mock_request:
mock_request.side_effect = [
([], {'Link': ''}), # Projects
([], {'Link': ''}), # Groups
]
# Act
await gitlab_service.get_user_resources_with_admin_access()
# Assert
# Check first call (projects)
first_call = mock_request.call_args_list[0]
assert 'projects' in first_call[0][0]
assert first_call[0][1]['membership'] == 1
assert first_call[0][1]['min_access_level'] == 40
assert first_call[0][1]['per_page'] == '100'
@pytest.mark.asyncio
async def test_get_resources_uses_correct_params_for_groups(self, gitlab_service):
"""Test that groups API is called with correct parameters."""
# Arrange
with patch.object(gitlab_service, '_make_request') as mock_request:
mock_request.side_effect = [
([], {'Link': ''}), # Projects
([], {'Link': ''}), # Groups
]
# Act
await gitlab_service.get_user_resources_with_admin_access()
# Assert
# Check second call (groups)
second_call = mock_request.call_args_list[1]
assert 'groups' in second_call[0][0]
assert second_call[0][1]['min_access_level'] == 40
assert second_call[0][1]['top_level_only'] == 'true'
assert second_call[0][1]['per_page'] == '100'
@pytest.mark.asyncio
async def test_get_resources_handles_api_error_gracefully(self, gitlab_service):
"""Test that API errors are handled gracefully and don't crash."""
# Arrange
with patch.object(gitlab_service, '_make_request') as mock_request:
# First call succeeds, second call fails
mock_request.side_effect = [
([{'id': 1, 'name': 'Project 1'}], {'Link': ''}),
Exception('API Error'),
]
# Act
(
projects,
groups,
) = await gitlab_service.get_user_resources_with_admin_access()
# Assert
# Should return what was fetched before the error
assert len(projects) == 1
assert len(groups) == 0
@pytest.mark.asyncio
async def test_get_resources_stops_on_empty_response(self, gitlab_service):
"""Test that pagination stops when API returns empty response."""
# Arrange
with patch.object(gitlab_service, '_make_request') as mock_request:
mock_request.side_effect = [
(None, {'Link': ''}), # Empty response stops pagination
([], {'Link': ''}), # Groups
]
# Act
(
projects,
groups,
) = await gitlab_service.get_user_resources_with_admin_access()
# Assert
assert len(projects) == 0
assert mock_request.call_count == 2 # Should not continue pagination

View File

@@ -0,0 +1,502 @@
"""Unit tests for GitLab integration routes."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import HTTPException, status
from integrations.gitlab.gitlab_service import SaaSGitLabService
from integrations.gitlab.webhook_installation import BreakLoopException
from integrations.types import GitLabResourceType
from server.routes.integration.gitlab import (
ReinstallWebhookRequest,
ResourceIdentifier,
get_gitlab_resources,
reinstall_gitlab_webhook,
)
from storage.gitlab_webhook import GitlabWebhook
@pytest.fixture
def mock_gitlab_service():
"""Create a mock SaaSGitLabService instance."""
service = MagicMock(spec=SaaSGitLabService)
service.get_user_resources_with_admin_access = AsyncMock(
return_value=(
[
{
'id': 1,
'name': 'Test Project',
'path_with_namespace': 'user/test-project',
'namespace': {'kind': 'user'},
},
{
'id': 2,
'name': 'Group Project',
'path_with_namespace': 'group/group-project',
'namespace': {'kind': 'group'},
},
],
[
{
'id': 10,
'name': 'Test Group',
'full_path': 'test-group',
},
],
)
)
service.check_webhook_exists_on_resource = AsyncMock(return_value=(True, None))
service.check_user_has_admin_access_to_resource = AsyncMock(
return_value=(True, None)
)
return service
@pytest.fixture
def mock_webhook():
"""Create a mock webhook object."""
webhook = MagicMock(spec=GitlabWebhook)
webhook.webhook_uuid = 'test-uuid'
webhook.last_synced = None
return webhook
class TestGetGitLabResources:
"""Test cases for get_gitlab_resources endpoint."""
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_get_resources_success(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_gitlab_service,
):
"""Test successfully retrieving GitLab resources with webhook status."""
# Arrange
user_id = 'test_user_id'
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.get_webhooks_by_resources = AsyncMock(
return_value=({}, {}) # Empty maps for simplicity
)
# Act
response = await get_gitlab_resources(user_id=user_id)
# Assert
assert len(response.resources) == 2 # 1 project (filtered) + 1 group
assert response.resources[0].type == 'project'
assert response.resources[0].id == '1'
assert response.resources[0].name == 'Test Project'
assert response.resources[1].type == 'group'
assert response.resources[1].id == '10'
mock_gitlab_service.get_user_resources_with_admin_access.assert_called_once()
mock_webhook_store.get_webhooks_by_resources.assert_called_once()
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_get_resources_filters_nested_projects(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_gitlab_service,
):
"""Test that projects nested under groups are filtered out."""
# Arrange
user_id = 'test_user_id'
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.get_webhooks_by_resources = AsyncMock(return_value=({}, {}))
# Act
response = await get_gitlab_resources(user_id=user_id)
# Assert
# Should only include the user project, not the group project
project_resources = [r for r in response.resources if r.type == 'project']
assert len(project_resources) == 1
assert project_resources[0].id == '1'
assert project_resources[0].name == 'Test Project'
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_get_resources_includes_webhook_metadata(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_gitlab_service,
mock_webhook,
):
"""Test that webhook metadata is included in the response."""
# Arrange
user_id = 'test_user_id'
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.get_webhooks_by_resources = AsyncMock(
return_value=({'1': mock_webhook}, {'10': mock_webhook})
)
# Act
response = await get_gitlab_resources(user_id=user_id)
# Assert
assert response.resources[0].webhook_uuid == 'test-uuid'
assert response.resources[1].webhook_uuid == 'test-uuid'
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
async def test_get_resources_non_saas_service(
self, mock_gitlab_service_impl, mock_gitlab_service
):
"""Test that non-SaaS GitLab service raises an error."""
# Arrange
user_id = 'test_user_id'
non_saas_service = AsyncMock()
mock_gitlab_service_impl.return_value = non_saas_service
# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await get_gitlab_resources(user_id=user_id)
assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST
assert 'Only SaaS GitLab service is supported' in exc_info.value.detail
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_get_resources_parallel_api_calls(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_gitlab_service,
):
"""Test that webhook status checks are made in parallel."""
# Arrange
user_id = 'test_user_id'
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.get_webhooks_by_resources = AsyncMock(return_value=({}, {}))
call_count = 0
async def track_calls(*args, **kwargs):
nonlocal call_count
call_count += 1
return (True, None)
mock_gitlab_service.check_webhook_exists_on_resource = AsyncMock(
side_effect=track_calls
)
# Act
await get_gitlab_resources(user_id=user_id)
# Assert
# Should be called for each resource (1 project + 1 group)
assert call_count == 2
class TestReinstallGitLabWebhook:
"""Test cases for reinstall_gitlab_webhook endpoint."""
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.install_webhook_on_resource')
@patch('server.routes.integration.gitlab.verify_webhook_conditions')
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_reinstall_webhook_success_existing_webhook(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_verify_conditions,
mock_install_webhook,
mock_gitlab_service,
mock_webhook,
):
"""Test successful webhook reinstallation when webhook record exists."""
# Arrange
user_id = 'test_user_id'
resource_id = 'project-123'
resource_type = GitLabResourceType.PROJECT
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.reset_webhook_for_reinstallation_by_resource = AsyncMock(
return_value=True
)
mock_webhook_store.get_webhook_by_resource_only = AsyncMock(
return_value=mock_webhook
)
mock_verify_conditions.return_value = None
mock_install_webhook.return_value = ('webhook-id-123', None)
body = ReinstallWebhookRequest(
resource=ResourceIdentifier(type=resource_type, id=resource_id)
)
# Act
result = await reinstall_gitlab_webhook(body=body, user_id=user_id)
# Assert
assert result.success is True
assert result.resource_id == resource_id
assert result.resource_type == resource_type.value
assert result.error is None
mock_gitlab_service.check_user_has_admin_access_to_resource.assert_called_once_with(
resource_type, resource_id
)
mock_webhook_store.reset_webhook_for_reinstallation_by_resource.assert_called_once_with(
resource_type, resource_id, user_id
)
mock_verify_conditions.assert_called_once()
mock_install_webhook.assert_called_once()
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.install_webhook_on_resource')
@patch('server.routes.integration.gitlab.verify_webhook_conditions')
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_reinstall_webhook_success_new_webhook_record(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_verify_conditions,
mock_install_webhook,
mock_gitlab_service,
):
"""Test successful webhook reinstallation when webhook record doesn't exist."""
# Arrange
user_id = 'test_user_id'
resource_id = 'project-456'
resource_type = GitLabResourceType.PROJECT
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.reset_webhook_for_reinstallation_by_resource = (
AsyncMock(return_value=False) # No existing webhook to reset
)
mock_webhook_store.get_webhook_by_resource_only = AsyncMock(
side_effect=[
None,
MagicMock(),
] # First call returns None, second returns new webhook
)
mock_webhook_store.store_webhooks = AsyncMock()
mock_verify_conditions.return_value = None
mock_install_webhook.return_value = ('webhook-id-456', None)
body = ReinstallWebhookRequest(
resource=ResourceIdentifier(type=resource_type, id=resource_id)
)
# Act
result = await reinstall_gitlab_webhook(body=body, user_id=user_id)
# Assert
assert result.success is True
mock_webhook_store.store_webhooks.assert_called_once()
# Should fetch webhook twice: once to check, once after creating
assert mock_webhook_store.get_webhook_by_resource_only.call_count == 2
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.isinstance')
async def test_reinstall_webhook_no_admin_access(
self, mock_isinstance, mock_gitlab_service_impl, mock_gitlab_service
):
"""Test reinstallation when user doesn't have admin access."""
# Arrange
user_id = 'test_user_id'
resource_id = 'project-789'
resource_type = GitLabResourceType.PROJECT
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_gitlab_service.check_user_has_admin_access_to_resource = AsyncMock(
return_value=(False, None)
)
body = ReinstallWebhookRequest(
resource=ResourceIdentifier(type=resource_type, id=resource_id)
)
# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await reinstall_gitlab_webhook(body=body, user_id=user_id)
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
assert 'does not have admin access' in exc_info.value.detail
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
async def test_reinstall_webhook_non_saas_service(self, mock_gitlab_service_impl):
"""Test reinstallation with non-SaaS GitLab service."""
# Arrange
user_id = 'test_user_id'
resource_id = 'project-999'
resource_type = GitLabResourceType.PROJECT
non_saas_service = AsyncMock()
mock_gitlab_service_impl.return_value = non_saas_service
body = ReinstallWebhookRequest(
resource=ResourceIdentifier(type=resource_type, id=resource_id)
)
# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await reinstall_gitlab_webhook(body=body, user_id=user_id)
assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST
assert 'Only SaaS GitLab service is supported' in exc_info.value.detail
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.install_webhook_on_resource')
@patch('server.routes.integration.gitlab.verify_webhook_conditions')
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_reinstall_webhook_conditions_not_met(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_verify_conditions,
mock_install_webhook,
mock_gitlab_service,
mock_webhook,
):
"""Test reinstallation when webhook conditions are not met."""
# Arrange
user_id = 'test_user_id'
resource_id = 'project-111'
resource_type = GitLabResourceType.PROJECT
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.reset_webhook_for_reinstallation_by_resource = AsyncMock(
return_value=True
)
mock_webhook_store.get_webhook_by_resource_only = AsyncMock(
return_value=mock_webhook
)
mock_verify_conditions.side_effect = BreakLoopException()
body = ReinstallWebhookRequest(
resource=ResourceIdentifier(type=resource_type, id=resource_id)
)
# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await reinstall_gitlab_webhook(body=body, user_id=user_id)
assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST
assert 'conditions not met' in exc_info.value.detail.lower()
mock_install_webhook.assert_not_called()
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.install_webhook_on_resource')
@patch('server.routes.integration.gitlab.verify_webhook_conditions')
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_reinstall_webhook_installation_fails(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_verify_conditions,
mock_install_webhook,
mock_gitlab_service,
mock_webhook,
):
"""Test reinstallation when webhook installation fails."""
# Arrange
user_id = 'test_user_id'
resource_id = 'project-222'
resource_type = GitLabResourceType.PROJECT
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.reset_webhook_for_reinstallation_by_resource = AsyncMock(
return_value=True
)
mock_webhook_store.get_webhook_by_resource_only = AsyncMock(
return_value=mock_webhook
)
mock_verify_conditions.return_value = None
mock_install_webhook.return_value = (None, None) # Installation failed
body = ReinstallWebhookRequest(
resource=ResourceIdentifier(type=resource_type, id=resource_id)
)
# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await reinstall_gitlab_webhook(body=body, user_id=user_id)
assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert 'Failed to install webhook' in exc_info.value.detail
@pytest.mark.asyncio
@patch('server.routes.integration.gitlab.install_webhook_on_resource')
@patch('server.routes.integration.gitlab.verify_webhook_conditions')
@patch('server.routes.integration.gitlab.GitLabServiceImpl')
@patch('server.routes.integration.gitlab.webhook_store')
@patch('server.routes.integration.gitlab.isinstance')
async def test_reinstall_webhook_group_resource(
self,
mock_isinstance,
mock_webhook_store,
mock_gitlab_service_impl,
mock_verify_conditions,
mock_install_webhook,
mock_gitlab_service,
mock_webhook,
):
"""Test reinstallation for a group resource."""
# Arrange
user_id = 'test_user_id'
resource_id = 'group-333'
resource_type = GitLabResourceType.GROUP
mock_gitlab_service_impl.return_value = mock_gitlab_service
mock_isinstance.return_value = True
mock_webhook_store.reset_webhook_for_reinstallation_by_resource = AsyncMock(
return_value=True
)
mock_webhook_store.get_webhook_by_resource_only = AsyncMock(
return_value=mock_webhook
)
mock_verify_conditions.return_value = None
mock_install_webhook.return_value = ('webhook-id-group', None)
body = ReinstallWebhookRequest(
resource=ResourceIdentifier(type=resource_type, id=resource_id)
)
# Act
result = await reinstall_gitlab_webhook(body=body, user_id=user_id)
# Assert
assert result.success is True
assert result.resource_id == resource_id
assert result.resource_type == resource_type.value
mock_webhook_store.reset_webhook_for_reinstallation_by_resource.assert_called_once_with(
resource_type, resource_id, user_id
)

View File

@@ -0,0 +1,388 @@
"""Unit tests for GitlabWebhookStore."""
import pytest
from integrations.types import GitLabResourceType
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool
from storage.base import Base
from storage.gitlab_webhook import GitlabWebhook
from storage.gitlab_webhook_store import GitlabWebhookStore
@pytest.fixture
async def async_engine():
"""Create an async SQLite engine for testing."""
engine = create_async_engine(
'sqlite+aiosqlite:///:memory:',
poolclass=StaticPool,
connect_args={'check_same_thread': False},
echo=False,
)
# Create all tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest.fixture
async def async_session_maker(async_engine):
"""Create an async session maker for testing."""
return async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
@pytest.fixture
async def webhook_store(async_session_maker):
"""Create a GitlabWebhookStore instance for testing."""
return GitlabWebhookStore(a_session_maker=async_session_maker)
@pytest.fixture
async def sample_webhooks(async_session_maker):
"""Create sample webhook records for testing."""
async with async_session_maker() as session:
# Create webhooks for user_1
webhook1 = GitlabWebhook(
project_id='project-1',
group_id=None,
user_id='user_1',
webhook_exists=True,
webhook_url='https://example.com/webhook',
webhook_secret='secret-1',
webhook_uuid='uuid-1',
)
webhook2 = GitlabWebhook(
project_id='project-2',
group_id=None,
user_id='user_1',
webhook_exists=True,
webhook_url='https://example.com/webhook',
webhook_secret='secret-2',
webhook_uuid='uuid-2',
)
webhook3 = GitlabWebhook(
project_id=None,
group_id='group-1',
user_id='user_1',
webhook_exists=False, # Already marked for reinstallation
webhook_url='https://example.com/webhook',
webhook_secret='secret-3',
webhook_uuid='uuid-3',
)
# Create webhook for user_2
webhook4 = GitlabWebhook(
project_id='project-3',
group_id=None,
user_id='user_2',
webhook_exists=True,
webhook_url='https://example.com/webhook',
webhook_secret='secret-4',
webhook_uuid='uuid-4',
)
session.add_all([webhook1, webhook2, webhook3, webhook4])
await session.commit()
# Refresh to get IDs (outside of begin() context)
await session.refresh(webhook1)
await session.refresh(webhook2)
await session.refresh(webhook3)
await session.refresh(webhook4)
return [webhook1, webhook2, webhook3, webhook4]
class TestGetWebhookByResourceOnly:
"""Test cases for get_webhook_by_resource_only method."""
@pytest.mark.asyncio
async def test_get_project_webhook_by_resource_only(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test getting a project webhook by resource ID without user_id filter."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-1'
# Act
webhook = await webhook_store.get_webhook_by_resource_only(
resource_type, resource_id
)
# Assert
assert webhook is not None
assert webhook.project_id == resource_id
assert webhook.user_id == 'user_1'
@pytest.mark.asyncio
async def test_get_group_webhook_by_resource_only(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test getting a group webhook by resource ID without user_id filter."""
# Arrange
resource_type = GitLabResourceType.GROUP
resource_id = 'group-1'
# Act
webhook = await webhook_store.get_webhook_by_resource_only(
resource_type, resource_id
)
# Assert
assert webhook is not None
assert webhook.group_id == resource_id
assert webhook.user_id == 'user_1'
@pytest.mark.asyncio
async def test_get_webhook_by_resource_only_not_found(
self, webhook_store, async_session_maker
):
"""Test getting a webhook that doesn't exist."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'non-existent-project'
# Act
webhook = await webhook_store.get_webhook_by_resource_only(
resource_type, resource_id
)
# Assert
assert webhook is None
@pytest.mark.asyncio
async def test_get_webhook_by_resource_only_organization_wide(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test that webhook lookup works regardless of which user originally created it."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-3' # Created by user_2
# Act
webhook = await webhook_store.get_webhook_by_resource_only(
resource_type, resource_id
)
# Assert
assert webhook is not None
assert webhook.project_id == resource_id
# Should find webhook even though it was created by a different user
assert webhook.user_id == 'user_2'
class TestResetWebhookForReinstallationByResource:
"""Test cases for reset_webhook_for_reinstallation_by_resource method."""
@pytest.mark.asyncio
async def test_reset_project_webhook_by_resource(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test resetting a project webhook by resource without user_id filter."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-1'
updating_user_id = 'user_2' # Different user can reset it
# Act
result = await webhook_store.reset_webhook_for_reinstallation_by_resource(
resource_type, resource_id, updating_user_id
)
# Assert
assert result is True
# Verify webhook was reset
async with async_session_maker() as session:
result_query = await session.execute(
select(GitlabWebhook).where(GitlabWebhook.project_id == resource_id)
)
webhook = result_query.scalars().first()
assert webhook.webhook_exists is False
assert webhook.webhook_uuid is None
assert (
webhook.user_id == updating_user_id
) # Updated to track who modified it
@pytest.mark.asyncio
async def test_reset_group_webhook_by_resource(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test resetting a group webhook by resource without user_id filter."""
# Arrange
resource_type = GitLabResourceType.GROUP
resource_id = 'group-1'
updating_user_id = 'user_2'
# Act
result = await webhook_store.reset_webhook_for_reinstallation_by_resource(
resource_type, resource_id, updating_user_id
)
# Assert
assert result is True
# Verify webhook was reset
async with async_session_maker() as session:
result_query = await session.execute(
select(GitlabWebhook).where(GitlabWebhook.group_id == resource_id)
)
webhook = result_query.scalars().first()
assert webhook.webhook_exists is False
assert webhook.webhook_uuid is None
assert webhook.user_id == updating_user_id
@pytest.mark.asyncio
async def test_reset_webhook_by_resource_not_found(
self, webhook_store, async_session_maker
):
"""Test resetting a webhook that doesn't exist."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'non-existent-project'
updating_user_id = 'user_1'
# Act
result = await webhook_store.reset_webhook_for_reinstallation_by_resource(
resource_type, resource_id, updating_user_id
)
# Assert
assert result is False
@pytest.mark.asyncio
async def test_reset_webhook_by_resource_organization_wide(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test that any user can reset a webhook regardless of original creator."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-3' # Created by user_2
updating_user_id = 'user_1' # Different user resetting it
# Act
result = await webhook_store.reset_webhook_for_reinstallation_by_resource(
resource_type, resource_id, updating_user_id
)
# Assert
assert result is True
# Verify webhook was reset and user_id updated
async with async_session_maker() as session:
result_query = await session.execute(
select(GitlabWebhook).where(GitlabWebhook.project_id == resource_id)
)
webhook = result_query.scalars().first()
assert webhook.webhook_exists is False
assert webhook.user_id == updating_user_id
class TestGetWebhooksByResources:
"""Test cases for get_webhooks_by_resources method."""
@pytest.mark.asyncio
async def test_get_webhooks_by_resources_projects_only(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test bulk fetching webhooks for multiple projects."""
# Arrange
project_ids = ['project-1', 'project-2', 'project-3']
group_ids: list[str] = []
# Act
project_map, group_map = await webhook_store.get_webhooks_by_resources(
project_ids, group_ids
)
# Assert
assert len(project_map) == 3
assert 'project-1' in project_map
assert 'project-2' in project_map
assert 'project-3' in project_map
assert len(group_map) == 0
@pytest.mark.asyncio
async def test_get_webhooks_by_resources_groups_only(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test bulk fetching webhooks for multiple groups."""
# Arrange
project_ids: list[str] = []
group_ids = ['group-1']
# Act
project_map, group_map = await webhook_store.get_webhooks_by_resources(
project_ids, group_ids
)
# Assert
assert len(project_map) == 0
assert len(group_map) == 1
assert 'group-1' in group_map
@pytest.mark.asyncio
async def test_get_webhooks_by_resources_mixed(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test bulk fetching webhooks for both projects and groups."""
# Arrange
project_ids = ['project-1', 'project-2']
group_ids = ['group-1']
# Act
project_map, group_map = await webhook_store.get_webhooks_by_resources(
project_ids, group_ids
)
# Assert
assert len(project_map) == 2
assert len(group_map) == 1
assert 'project-1' in project_map
assert 'project-2' in project_map
assert 'group-1' in group_map
@pytest.mark.asyncio
async def test_get_webhooks_by_resources_empty_lists(
self, webhook_store, async_session_maker
):
"""Test bulk fetching with empty ID lists."""
# Arrange
project_ids: list[str] = []
group_ids: list[str] = []
# Act
project_map, group_map = await webhook_store.get_webhooks_by_resources(
project_ids, group_ids
)
# Assert
assert len(project_map) == 0
assert len(group_map) == 0
@pytest.mark.asyncio
async def test_get_webhooks_by_resources_partial_matches(
self, webhook_store, async_session_maker, sample_webhooks
):
"""Test bulk fetching when some IDs don't exist."""
# Arrange
project_ids = ['project-1', 'non-existent-project']
group_ids = ['group-1', 'non-existent-group']
# Act
project_map, group_map = await webhook_store.get_webhooks_by_resources(
project_ids, group_ids
)
# Assert
assert len(project_map) == 1
assert 'project-1' in project_map
assert 'non-existent-project' not in project_map
assert len(group_map) == 1
assert 'group-1' in group_map
assert 'non-existent-group' not in group_map

View File

@@ -0,0 +1,438 @@
"""Unit tests for install_gitlab_webhooks module."""
from unittest.mock import AsyncMock, MagicMock
import pytest
from integrations.gitlab.webhook_installation import (
BreakLoopException,
install_webhook_on_resource,
verify_webhook_conditions,
)
from integrations.types import GitLabResourceType
from integrations.utils import GITLAB_WEBHOOK_URL
from storage.gitlab_webhook import GitlabWebhook, WebhookStatus
@pytest.fixture
def mock_gitlab_service():
"""Create a mock GitLab service."""
service = MagicMock()
service.check_resource_exists = AsyncMock(return_value=(True, None))
service.check_user_has_admin_access_to_resource = AsyncMock(
return_value=(True, None)
)
service.check_webhook_exists_on_resource = AsyncMock(return_value=(False, None))
service.install_webhook = AsyncMock(return_value=('webhook-id-123', None))
return service
@pytest.fixture
def mock_webhook_store():
"""Create a mock webhook store."""
store = MagicMock()
store.delete_webhook = AsyncMock()
store.update_webhook = AsyncMock()
return store
@pytest.fixture
def sample_webhook():
"""Create a sample webhook object."""
webhook = MagicMock(spec=GitlabWebhook)
webhook.user_id = 'test_user_id'
webhook.webhook_exists = False
webhook.webhook_uuid = None
return webhook
class TestVerifyWebhookConditions:
"""Test cases for verify_webhook_conditions function."""
@pytest.mark.asyncio
async def test_verify_conditions_all_pass(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when all conditions are met for webhook installation."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
# Act
# Should not raise any exception
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert
mock_gitlab_service.check_resource_exists.assert_called_once_with(
resource_type, resource_id
)
mock_gitlab_service.check_user_has_admin_access_to_resource.assert_called_once_with(
resource_type, resource_id
)
mock_gitlab_service.check_webhook_exists_on_resource.assert_called_once_with(
resource_type, resource_id, GITLAB_WEBHOOK_URL
)
mock_webhook_store.delete_webhook.assert_not_called()
@pytest.mark.asyncio
async def test_verify_conditions_resource_does_not_exist(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when resource does not exist."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-999'
mock_gitlab_service.check_resource_exists = AsyncMock(
return_value=(False, None)
)
# Act & Assert
with pytest.raises(BreakLoopException):
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert webhook is deleted
mock_webhook_store.delete_webhook.assert_called_once_with(sample_webhook)
@pytest.mark.asyncio
async def test_verify_conditions_rate_limited_on_resource_check(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when rate limited during resource existence check."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
mock_gitlab_service.check_resource_exists = AsyncMock(
return_value=(False, WebhookStatus.RATE_LIMITED)
)
# Act & Assert
with pytest.raises(BreakLoopException):
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Should not delete webhook on rate limit
mock_webhook_store.delete_webhook.assert_not_called()
@pytest.mark.asyncio
async def test_verify_conditions_user_no_admin_access(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when user does not have admin access."""
# Arrange
resource_type = GitLabResourceType.GROUP
resource_id = 'group-456'
mock_gitlab_service.check_user_has_admin_access_to_resource = AsyncMock(
return_value=(False, None)
)
# Act & Assert
with pytest.raises(BreakLoopException):
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert webhook is deleted
mock_webhook_store.delete_webhook.assert_called_once_with(sample_webhook)
@pytest.mark.asyncio
async def test_verify_conditions_rate_limited_on_admin_check(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when rate limited during admin access check."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
mock_gitlab_service.check_user_has_admin_access_to_resource = AsyncMock(
return_value=(False, WebhookStatus.RATE_LIMITED)
)
# Act & Assert
with pytest.raises(BreakLoopException):
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Should not delete webhook on rate limit
mock_webhook_store.delete_webhook.assert_not_called()
@pytest.mark.asyncio
async def test_verify_conditions_webhook_already_exists(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when webhook already exists on resource."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
mock_gitlab_service.check_webhook_exists_on_resource = AsyncMock(
return_value=(True, None)
)
# Act & Assert
with pytest.raises(BreakLoopException):
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
@pytest.mark.asyncio
async def test_verify_conditions_rate_limited_on_webhook_check(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when rate limited during webhook existence check."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
mock_gitlab_service.check_webhook_exists_on_resource = AsyncMock(
return_value=(False, WebhookStatus.RATE_LIMITED)
)
# Act & Assert
with pytest.raises(BreakLoopException):
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
@pytest.mark.asyncio
async def test_verify_conditions_updates_webhook_status_mismatch(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test that webhook status is updated when database and API don't match."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
sample_webhook.webhook_exists = True # DB says exists
mock_gitlab_service.check_webhook_exists_on_resource = AsyncMock(
return_value=(False, None) # API says doesn't exist
)
# Act
# Should not raise BreakLoopException when webhook doesn't exist (allows installation)
await verify_webhook_conditions(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert webhook status was updated to match API
mock_webhook_store.update_webhook.assert_called_once_with(
sample_webhook, {'webhook_exists': False}
)
class TestInstallWebhookOnResource:
"""Test cases for install_webhook_on_resource function."""
@pytest.mark.asyncio
async def test_install_webhook_success(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test successful webhook installation."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
# Act
webhook_id, status = await install_webhook_on_resource(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert
assert webhook_id == 'webhook-id-123'
assert status is None
mock_gitlab_service.install_webhook.assert_called_once()
mock_webhook_store.update_webhook.assert_called_once()
# Verify update_webhook was called with correct fields (using keyword arguments)
call_args = mock_webhook_store.update_webhook.call_args
assert call_args[1]['webhook'] == sample_webhook
update_fields = call_args[1]['update_fields']
assert update_fields['webhook_exists'] is True
assert update_fields['webhook_url'] == GITLAB_WEBHOOK_URL
assert 'webhook_secret' in update_fields
assert 'webhook_uuid' in update_fields
assert 'scopes' in update_fields
@pytest.mark.asyncio
async def test_install_webhook_group_resource(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test webhook installation for a group resource."""
# Arrange
resource_type = GitLabResourceType.GROUP
resource_id = 'group-456'
# Act
webhook_id, status = await install_webhook_on_resource(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert
assert webhook_id == 'webhook-id-123'
# Verify install_webhook was called with GROUP type
call_args = mock_gitlab_service.install_webhook.call_args
assert call_args[1]['resource_type'] == resource_type
assert call_args[1]['resource_id'] == resource_id
@pytest.mark.asyncio
async def test_install_webhook_rate_limited(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when installation is rate limited."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
mock_gitlab_service.install_webhook = AsyncMock(
return_value=(None, WebhookStatus.RATE_LIMITED)
)
# Act & Assert
with pytest.raises(BreakLoopException):
await install_webhook_on_resource(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Should not update webhook on rate limit
mock_webhook_store.update_webhook.assert_not_called()
@pytest.mark.asyncio
async def test_install_webhook_installation_fails(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test when webhook installation fails."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
mock_gitlab_service.install_webhook = AsyncMock(return_value=(None, None))
# Act
webhook_id, status = await install_webhook_on_resource(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert
assert webhook_id is None
assert status is None
# Should not update webhook when installation fails
mock_webhook_store.update_webhook.assert_not_called()
@pytest.mark.asyncio
async def test_install_webhook_generates_unique_secrets(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test that unique webhook secrets and UUIDs are generated."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
# Act - First call
webhook_id1, _ = await install_webhook_on_resource(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Capture first call's values before resetting
call1_secret = mock_webhook_store.update_webhook.call_args_list[0][1][
'update_fields'
]['webhook_secret']
call1_uuid = mock_webhook_store.update_webhook.call_args_list[0][1][
'update_fields'
]['webhook_uuid']
# Reset mocks and call again
mock_gitlab_service.install_webhook.reset_mock()
mock_webhook_store.update_webhook.reset_mock()
# Act - Second call
webhook_id2, _ = await install_webhook_on_resource(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Capture second call's values
call2_secret = mock_webhook_store.update_webhook.call_args_list[0][1][
'update_fields'
]['webhook_secret']
call2_uuid = mock_webhook_store.update_webhook.call_args_list[0][1][
'update_fields'
]['webhook_uuid']
# Assert - Secrets and UUIDs should be different
assert call1_secret != call2_secret
assert call1_uuid != call2_uuid
@pytest.mark.asyncio
async def test_install_webhook_uses_correct_webhook_name_and_url(
self, mock_gitlab_service, mock_webhook_store, sample_webhook
):
"""Test that correct webhook name and URL are used."""
# Arrange
resource_type = GitLabResourceType.PROJECT
resource_id = 'project-123'
# Act
await install_webhook_on_resource(
gitlab_service=mock_gitlab_service,
resource_type=resource_type,
resource_id=resource_id,
webhook_store=mock_webhook_store,
webhook=sample_webhook,
)
# Assert
call_args = mock_gitlab_service.install_webhook.call_args
assert call_args[1]['webhook_name'] == 'OpenHands Resolver'
assert call_args[1]['webhook_url'] == GITLAB_WEBHOOK_URL