feat: support security analyzer settings for v1 conversations (#12008)

This commit is contained in:
Hiep Le
2025-12-12 21:49:15 +07:00
committed by GitHub
parent 5a21c59a3c
commit c6a8fc379b
4 changed files with 417 additions and 14 deletions

View File

@@ -1,11 +1,13 @@
"""Unit tests for git functionality in AppConversationServiceBase.
"""Unit tests for git and security functionality in AppConversationServiceBase.
This module tests the git-related functionality, specifically the clone_or_init_git_repo method
and the recent bug fixes for git checkout operations.
"""
import subprocess
from types import MethodType
from unittest.mock import AsyncMock, MagicMock, Mock, patch
from uuid import uuid4
import pytest
@@ -434,13 +436,298 @@ def test_create_condenser_plan_agent_with_custom_max_size(mock_condenser_class):
mock_llm.model_copy.assert_called_once()
# =============================================================================
# Tests for security analyzer helpers
# =============================================================================
@pytest.mark.parametrize('value', [None, '', 'none', 'NoNe'])
def test_create_security_analyzer_returns_none_for_empty_values(value):
"""_create_security_analyzer_from_string returns None for empty/none values."""
# Arrange
service, _ = _create_service_with_mock_user_context(
MockUserInfo(), bind_methods=('_create_security_analyzer_from_string',)
)
# Act
result = service._create_security_analyzer_from_string(value)
# Assert
assert result is None
def test_create_security_analyzer_returns_llm_analyzer():
"""_create_security_analyzer_from_string returns LLMSecurityAnalyzer for llm string."""
# Arrange
security_analyzer_str = 'llm'
service, _ = _create_service_with_mock_user_context(
MockUserInfo(), bind_methods=('_create_security_analyzer_from_string',)
)
# Act
result = service._create_security_analyzer_from_string(security_analyzer_str)
# Assert
from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer
assert isinstance(result, LLMSecurityAnalyzer)
def test_create_security_analyzer_logs_warning_for_unknown_value():
"""_create_security_analyzer_from_string logs warning and returns None for unknown."""
# Arrange
unknown_value = 'custom'
service, _ = _create_service_with_mock_user_context(
MockUserInfo(), bind_methods=('_create_security_analyzer_from_string',)
)
# Act
with patch(
'openhands.app_server.app_conversation.app_conversation_service_base._logger'
) as mock_logger:
result = service._create_security_analyzer_from_string(unknown_value)
# Assert
assert result is None
mock_logger.warning.assert_called_once()
def test_select_confirmation_policy_when_disabled_returns_never_confirm():
"""_select_confirmation_policy returns NeverConfirm when confirmation_mode is False."""
# Arrange
confirmation_mode = False
security_analyzer = 'llm'
service, _ = _create_service_with_mock_user_context(
MockUserInfo(), bind_methods=('_select_confirmation_policy',)
)
# Act
policy = service._select_confirmation_policy(confirmation_mode, security_analyzer)
# Assert
from openhands.sdk.security.confirmation_policy import NeverConfirm
assert isinstance(policy, NeverConfirm)
def test_select_confirmation_policy_llm_returns_confirm_risky():
"""_select_confirmation_policy uses ConfirmRisky when analyzer is llm."""
# Arrange
confirmation_mode = True
security_analyzer = 'llm'
service, _ = _create_service_with_mock_user_context(
MockUserInfo(), bind_methods=('_select_confirmation_policy',)
)
# Act
policy = service._select_confirmation_policy(confirmation_mode, security_analyzer)
# Assert
from openhands.sdk.security.confirmation_policy import ConfirmRisky
assert isinstance(policy, ConfirmRisky)
@pytest.mark.parametrize('security_analyzer', [None, '', 'none', 'custom'])
def test_select_confirmation_policy_non_llm_returns_always_confirm(
security_analyzer,
):
"""_select_confirmation_policy falls back to AlwaysConfirm for non-llm values."""
# Arrange
confirmation_mode = True
service, _ = _create_service_with_mock_user_context(
MockUserInfo(), bind_methods=('_select_confirmation_policy',)
)
# Act
policy = service._select_confirmation_policy(confirmation_mode, security_analyzer)
# Assert
from openhands.sdk.security.confirmation_policy import AlwaysConfirm
assert isinstance(policy, AlwaysConfirm)
@pytest.mark.asyncio
async def test_set_security_analyzer_skips_when_no_session_key():
"""_set_security_analyzer_from_settings exits early without session_api_key."""
# Arrange
agent_server_url = 'https://agent.example.com'
conversation_id = uuid4()
httpx_client = AsyncMock()
service, _ = _create_service_with_mock_user_context(
MockUserInfo(),
bind_methods=(
'_create_security_analyzer_from_string',
'_set_security_analyzer_from_settings',
),
)
with patch.object(service, '_create_security_analyzer_from_string') as mock_create:
# Act
await service._set_security_analyzer_from_settings(
agent_server_url=agent_server_url,
session_api_key=None,
conversation_id=conversation_id,
security_analyzer_str='llm',
httpx_client=httpx_client,
)
# Assert
mock_create.assert_not_called()
httpx_client.post.assert_not_called()
@pytest.mark.asyncio
async def test_set_security_analyzer_skips_when_analyzer_none():
"""_set_security_analyzer_from_settings skips API call when analyzer resolves to None."""
# Arrange
agent_server_url = 'https://agent.example.com'
session_api_key = 'session-key'
conversation_id = uuid4()
httpx_client = AsyncMock()
service, _ = _create_service_with_mock_user_context(
MockUserInfo(),
bind_methods=(
'_create_security_analyzer_from_string',
'_set_security_analyzer_from_settings',
),
)
with patch.object(
service, '_create_security_analyzer_from_string', return_value=None
) as mock_create:
# Act
await service._set_security_analyzer_from_settings(
agent_server_url=agent_server_url,
session_api_key=session_api_key,
conversation_id=conversation_id,
security_analyzer_str='none',
httpx_client=httpx_client,
)
# Assert
mock_create.assert_called_once_with('none')
httpx_client.post.assert_not_called()
class DummyAnalyzer:
"""Simple analyzer stub for testing model_dump contract."""
def __init__(self, payload: dict):
self._payload = payload
def model_dump(self) -> dict:
return self._payload
@pytest.mark.asyncio
async def test_set_security_analyzer_successfully_calls_agent_server():
"""_set_security_analyzer_from_settings posts analyzer payload when available."""
# Arrange
agent_server_url = 'https://agent.example.com'
session_api_key = 'session-key'
conversation_id = uuid4()
analyzer_payload = {'type': 'llm'}
httpx_client = AsyncMock()
http_response = MagicMock()
http_response.raise_for_status = MagicMock()
httpx_client.post.return_value = http_response
service, _ = _create_service_with_mock_user_context(
MockUserInfo(),
bind_methods=(
'_create_security_analyzer_from_string',
'_set_security_analyzer_from_settings',
),
)
analyzer = DummyAnalyzer(analyzer_payload)
with (
patch.object(
service,
'_create_security_analyzer_from_string',
return_value=analyzer,
) as mock_create,
patch(
'openhands.app_server.app_conversation.app_conversation_service_base._logger'
) as mock_logger,
):
# Act
await service._set_security_analyzer_from_settings(
agent_server_url=agent_server_url,
session_api_key=session_api_key,
conversation_id=conversation_id,
security_analyzer_str='llm',
httpx_client=httpx_client,
)
# Assert
mock_create.assert_called_once_with('llm')
httpx_client.post.assert_awaited_once_with(
f'{agent_server_url}/api/conversations/{conversation_id}/security_analyzer',
json={'security_analyzer': analyzer_payload},
headers={'X-Session-API-Key': session_api_key},
timeout=30.0,
)
http_response.raise_for_status.assert_called_once()
mock_logger.info.assert_called()
@pytest.mark.asyncio
async def test_set_security_analyzer_logs_warning_on_failure():
"""_set_security_analyzer_from_settings warns but does not raise on errors."""
# Arrange
agent_server_url = 'https://agent.example.com'
session_api_key = 'session-key'
conversation_id = uuid4()
analyzer_payload = {'type': 'llm'}
httpx_client = AsyncMock()
httpx_client.post.side_effect = RuntimeError('network down')
service, _ = _create_service_with_mock_user_context(
MockUserInfo(),
bind_methods=(
'_create_security_analyzer_from_string',
'_set_security_analyzer_from_settings',
),
)
analyzer = DummyAnalyzer(analyzer_payload)
with (
patch.object(
service,
'_create_security_analyzer_from_string',
return_value=analyzer,
) as mock_create,
patch(
'openhands.app_server.app_conversation.app_conversation_service_base._logger'
) as mock_logger,
):
# Act
await service._set_security_analyzer_from_settings(
agent_server_url=agent_server_url,
session_api_key=session_api_key,
conversation_id=conversation_id,
security_analyzer_str='llm',
httpx_client=httpx_client,
)
# Assert
mock_create.assert_called_once_with('llm')
httpx_client.post.assert_awaited_once()
mock_logger.warning.assert_called()
# =============================================================================
# Tests for _configure_git_user_settings
# =============================================================================
def _create_service_with_mock_user_context(user_info: MockUserInfo) -> tuple:
"""Create a mock service with the actual _configure_git_user_settings method.
def _create_service_with_mock_user_context(
user_info: MockUserInfo, bind_methods: tuple[str, ...] | None = None
) -> tuple:
"""Create a mock service with selected real methods bound for testing.
Uses MagicMock for the service but binds the real method for testing.
@@ -452,13 +739,16 @@ def _create_service_with_mock_user_context(user_info: MockUserInfo) -> tuple:
# Create a simple mock service and set required attribute
service = MagicMock()
service.user_context = mock_user_context
methods_to_bind = ['_configure_git_user_settings']
if bind_methods:
methods_to_bind.extend(bind_methods)
# Remove potential duplicates while keeping order
methods_to_bind = list(dict.fromkeys(methods_to_bind))
# Bind the actual method from the real class to test real implementation
service._configure_git_user_settings = (
lambda workspace: AppConversationServiceBase._configure_git_user_settings(
service, workspace
)
)
# Bind actual methods from the real class to test implementations directly
for method_name in methods_to_bind:
real_method = getattr(AppConversationServiceBase, method_name)
setattr(service, method_name, MethodType(real_method, service))
return service, mock_user_context

View File

@@ -153,6 +153,7 @@ class TestExperimentManagerIntegration:
llm_api_key=None,
confirmation_mode=False,
condenser_max_size=None,
security_analyzer=None,
)
async def get_secrets(self):