From 9753ad3a487b864074c6e1d0e7ab079381353f60 Mon Sep 17 00:00:00 2001 From: Tim O'Farrell Date: Mon, 15 Dec 2025 10:47:21 -0700 Subject: [PATCH] Removed Legacy Conversation Manager (#12053) --- .../server/legacy_conversation_manager.py | 331 ------------ .../unit/test_legacy_conversation_manager.py | 485 ------------------ 2 files changed, 816 deletions(-) delete mode 100644 enterprise/server/legacy_conversation_manager.py delete mode 100644 enterprise/tests/unit/test_legacy_conversation_manager.py diff --git a/enterprise/server/legacy_conversation_manager.py b/enterprise/server/legacy_conversation_manager.py deleted file mode 100644 index 5c82b5b420..0000000000 --- a/enterprise/server/legacy_conversation_manager.py +++ /dev/null @@ -1,331 +0,0 @@ -from __future__ import annotations - -import time -from dataclasses import dataclass, field - -import socketio -from server.clustered_conversation_manager import ClusteredConversationManager -from server.saas_nested_conversation_manager import SaasNestedConversationManager - -from openhands.core.config import LLMConfig, OpenHandsConfig -from openhands.events.action import MessageAction -from openhands.server.config.server_config import ServerConfig -from openhands.server.conversation_manager.conversation_manager import ( - ConversationManager, -) -from openhands.server.data_models.agent_loop_info import AgentLoopInfo -from openhands.server.monitoring import MonitoringListener -from openhands.server.session.conversation import ServerConversation -from openhands.storage.data_models.settings import Settings -from openhands.storage.files import FileStore -from openhands.utils.async_utils import wait_all - -_LEGACY_ENTRY_TIMEOUT_SECONDS = 3600 - - -@dataclass -class LegacyCacheEntry: - """Cache entry for legacy mode status.""" - - is_legacy: bool - timestamp: float - - -@dataclass -class LegacyConversationManager(ConversationManager): - """ - Conversation manager for use while migrating - since existing conversations are not nested! - Separate class from SaasNestedConversationManager so it can be easliy removed in a few weeks. - (As of 2025-07-23) - """ - - sio: socketio.AsyncServer - config: OpenHandsConfig - server_config: ServerConfig - file_store: FileStore - conversation_manager: SaasNestedConversationManager - legacy_conversation_manager: ClusteredConversationManager - _legacy_cache: dict[str, LegacyCacheEntry] = field(default_factory=dict) - - async def __aenter__(self): - await wait_all( - [ - self.conversation_manager.__aenter__(), - self.legacy_conversation_manager.__aenter__(), - ] - ) - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await wait_all( - [ - self.conversation_manager.__aexit__(exc_type, exc_value, traceback), - self.legacy_conversation_manager.__aexit__( - exc_type, exc_value, traceback - ), - ] - ) - - async def request_llm_completion( - self, - sid: str, - service_id: str, - llm_config: LLMConfig, - messages: list[dict[str, str]], - ) -> str: - session = self.get_agent_session(sid) - llm_registry = session.llm_registry - return llm_registry.request_extraneous_completion( - service_id, llm_config, messages - ) - - async def attach_to_conversation( - self, sid: str, user_id: str | None = None - ) -> ServerConversation | None: - if await self.should_start_in_legacy_mode(sid): - return await self.legacy_conversation_manager.attach_to_conversation( - sid, user_id - ) - return await self.conversation_manager.attach_to_conversation(sid, user_id) - - async def detach_from_conversation(self, conversation: ServerConversation): - if await self.should_start_in_legacy_mode(conversation.sid): - return await self.legacy_conversation_manager.detach_from_conversation( - conversation - ) - return await self.conversation_manager.detach_from_conversation(conversation) - - async def join_conversation( - self, - sid: str, - connection_id: str, - settings: Settings, - user_id: str | None, - ) -> AgentLoopInfo: - if await self.should_start_in_legacy_mode(sid): - return await self.legacy_conversation_manager.join_conversation( - sid, connection_id, settings, user_id - ) - return await self.conversation_manager.join_conversation( - sid, connection_id, settings, user_id - ) - - def get_agent_session(self, sid: str): - session = self.legacy_conversation_manager.get_agent_session(sid) - if session is None: - session = self.conversation_manager.get_agent_session(sid) - return session - - async def get_running_agent_loops( - self, user_id: str | None = None, filter_to_sids: set[str] | None = None - ) -> set[str]: - if filter_to_sids and len(filter_to_sids) == 1: - sid = next(iter(filter_to_sids)) - if await self.should_start_in_legacy_mode(sid): - return await self.legacy_conversation_manager.get_running_agent_loops( - user_id, filter_to_sids - ) - return await self.conversation_manager.get_running_agent_loops( - user_id, filter_to_sids - ) - - # Get all running agent loops from both managers - agent_loops, legacy_agent_loops = await wait_all( - [ - self.conversation_manager.get_running_agent_loops( - user_id, filter_to_sids - ), - self.legacy_conversation_manager.get_running_agent_loops( - user_id, filter_to_sids - ), - ] - ) - - # Combine the results - result = set() - for sid in legacy_agent_loops: - if await self.should_start_in_legacy_mode(sid): - result.add(sid) - - for sid in agent_loops: - if not await self.should_start_in_legacy_mode(sid): - result.add(sid) - - return result - - async def is_agent_loop_running(self, sid: str) -> bool: - return bool(await self.get_running_agent_loops(filter_to_sids={sid})) - - async def get_connections( - self, user_id: str | None = None, filter_to_sids: set[str] | None = None - ) -> dict[str, str]: - if filter_to_sids and len(filter_to_sids) == 1: - sid = next(iter(filter_to_sids)) - if await self.should_start_in_legacy_mode(sid): - return await self.legacy_conversation_manager.get_connections( - user_id, filter_to_sids - ) - return await self.conversation_manager.get_connections( - user_id, filter_to_sids - ) - agent_loops, legacy_agent_loops = await wait_all( - [ - self.conversation_manager.get_connections(user_id, filter_to_sids), - self.legacy_conversation_manager.get_connections( - user_id, filter_to_sids - ), - ] - ) - legacy_agent_loops.update(agent_loops) - return legacy_agent_loops - - async def maybe_start_agent_loop( - self, - sid: str, - settings: Settings, - user_id: str, # type: ignore[override] - initial_user_msg: MessageAction | None = None, - replay_json: str | None = None, - ) -> AgentLoopInfo: - if await self.should_start_in_legacy_mode(sid): - return await self.legacy_conversation_manager.maybe_start_agent_loop( - sid, settings, user_id, initial_user_msg, replay_json - ) - return await self.conversation_manager.maybe_start_agent_loop( - sid, settings, user_id, initial_user_msg, replay_json - ) - - async def send_to_event_stream(self, connection_id: str, data: dict): - return await self.legacy_conversation_manager.send_to_event_stream( - connection_id, data - ) - - async def send_event_to_conversation(self, sid: str, data: dict): - if await self.should_start_in_legacy_mode(sid): - await self.legacy_conversation_manager.send_event_to_conversation(sid, data) - await self.conversation_manager.send_event_to_conversation(sid, data) - - async def disconnect_from_session(self, connection_id: str): - return await self.legacy_conversation_manager.disconnect_from_session( - connection_id - ) - - async def close_session(self, sid: str): - if await self.should_start_in_legacy_mode(sid): - await self.legacy_conversation_manager.close_session(sid) - await self.conversation_manager.close_session(sid) - - async def get_agent_loop_info( - self, user_id: str | None = None, filter_to_sids: set[str] | None = None - ) -> list[AgentLoopInfo]: - if filter_to_sids and len(filter_to_sids) == 1: - sid = next(iter(filter_to_sids)) - if await self.should_start_in_legacy_mode(sid): - return await self.legacy_conversation_manager.get_agent_loop_info( - user_id, filter_to_sids - ) - return await self.conversation_manager.get_agent_loop_info( - user_id, filter_to_sids - ) - agent_loops, legacy_agent_loops = await wait_all( - [ - self.conversation_manager.get_agent_loop_info(user_id, filter_to_sids), - self.legacy_conversation_manager.get_agent_loop_info( - user_id, filter_to_sids - ), - ] - ) - - # Combine results - result = [] - legacy_sids = set() - - # Add legacy agent loops - for agent_loop in legacy_agent_loops: - if await self.should_start_in_legacy_mode(agent_loop.conversation_id): - result.append(agent_loop) - legacy_sids.add(agent_loop.conversation_id) - - # Add non-legacy agent loops - for agent_loop in agent_loops: - if ( - agent_loop.conversation_id not in legacy_sids - and not await self.should_start_in_legacy_mode( - agent_loop.conversation_id - ) - ): - result.append(agent_loop) - - return result - - def _cleanup_expired_cache_entries(self): - """Remove expired entries from the local cache.""" - current_time = time.time() - expired_keys = [ - key - for key, entry in self._legacy_cache.items() - if current_time - entry.timestamp > _LEGACY_ENTRY_TIMEOUT_SECONDS - ] - for key in expired_keys: - del self._legacy_cache[key] - - async def should_start_in_legacy_mode(self, conversation_id: str) -> bool: - """ - Check if a conversation should run in legacy mode by directly checking the runtime. - The /list method does not include stopped conversations even though the PVC for these - may not yet have been deleted, so we need to check /sessions/{session_id} directly. - """ - # Clean up expired entries periodically - self._cleanup_expired_cache_entries() - - # First check the local cache - if conversation_id in self._legacy_cache: - cached_entry = self._legacy_cache[conversation_id] - # Check if the cached value is still valid - if time.time() - cached_entry.timestamp <= _LEGACY_ENTRY_TIMEOUT_SECONDS: - return cached_entry.is_legacy - - # If not in cache or expired, check the runtime directly - runtime = await self.conversation_manager._get_runtime(conversation_id) - is_legacy = self.is_legacy_runtime(runtime) - - # Cache the result with current timestamp - self._legacy_cache[conversation_id] = LegacyCacheEntry(is_legacy, time.time()) - - return is_legacy - - def is_legacy_runtime(self, runtime: dict | None) -> bool: - """ - Determine if a runtime is a legacy runtime based on its command. - - Args: - runtime: The runtime dictionary or None if not found - - Returns: - bool: True if this is a legacy runtime, False otherwise - """ - if runtime is None: - return False - return 'openhands.server' not in runtime['command'] - - @classmethod - def get_instance( - cls, - sio: socketio.AsyncServer, - config: OpenHandsConfig, - file_store: FileStore, - server_config: ServerConfig, - monitoring_listener: MonitoringListener, - ) -> ConversationManager: - return LegacyConversationManager( - sio=sio, - config=config, - server_config=server_config, - file_store=file_store, - conversation_manager=SaasNestedConversationManager.get_instance( - sio, config, file_store, server_config, monitoring_listener - ), - legacy_conversation_manager=ClusteredConversationManager.get_instance( - sio, config, file_store, server_config, monitoring_listener - ), - ) diff --git a/enterprise/tests/unit/test_legacy_conversation_manager.py b/enterprise/tests/unit/test_legacy_conversation_manager.py deleted file mode 100644 index 55b424dabc..0000000000 --- a/enterprise/tests/unit/test_legacy_conversation_manager.py +++ /dev/null @@ -1,485 +0,0 @@ -import time -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest -from server.legacy_conversation_manager import ( - _LEGACY_ENTRY_TIMEOUT_SECONDS, - LegacyCacheEntry, - LegacyConversationManager, -) - -from openhands.core.config.openhands_config import OpenHandsConfig -from openhands.server.config.server_config import ServerConfig -from openhands.server.monitoring import MonitoringListener -from openhands.storage.memory import InMemoryFileStore - - -@pytest.fixture -def mock_sio(): - """Create a mock SocketIO server.""" - return MagicMock() - - -@pytest.fixture -def mock_config(): - """Create a mock OpenHands config.""" - return MagicMock(spec=OpenHandsConfig) - - -@pytest.fixture -def mock_server_config(): - """Create a mock server config.""" - return MagicMock(spec=ServerConfig) - - -@pytest.fixture -def mock_file_store(): - """Create a mock file store.""" - return MagicMock(spec=InMemoryFileStore) - - -@pytest.fixture -def mock_monitoring_listener(): - """Create a mock monitoring listener.""" - return MagicMock(spec=MonitoringListener) - - -@pytest.fixture -def mock_conversation_manager(): - """Create a mock SaasNestedConversationManager.""" - mock_cm = MagicMock() - mock_cm._get_runtime = AsyncMock() - return mock_cm - - -@pytest.fixture -def mock_legacy_conversation_manager(): - """Create a mock ClusteredConversationManager.""" - return MagicMock() - - -@pytest.fixture -def legacy_manager( - mock_sio, - mock_config, - mock_server_config, - mock_file_store, - mock_conversation_manager, - mock_legacy_conversation_manager, -): - """Create a LegacyConversationManager instance for testing.""" - return LegacyConversationManager( - sio=mock_sio, - config=mock_config, - server_config=mock_server_config, - file_store=mock_file_store, - conversation_manager=mock_conversation_manager, - legacy_conversation_manager=mock_legacy_conversation_manager, - ) - - -class TestLegacyCacheEntry: - """Test the LegacyCacheEntry dataclass.""" - - def test_cache_entry_creation(self): - """Test creating a cache entry.""" - timestamp = time.time() - entry = LegacyCacheEntry(is_legacy=True, timestamp=timestamp) - - assert entry.is_legacy is True - assert entry.timestamp == timestamp - - def test_cache_entry_false(self): - """Test creating a cache entry with False value.""" - timestamp = time.time() - entry = LegacyCacheEntry(is_legacy=False, timestamp=timestamp) - - assert entry.is_legacy is False - assert entry.timestamp == timestamp - - -class TestLegacyConversationManagerCacheCleanup: - """Test cache cleanup functionality.""" - - def test_cleanup_expired_cache_entries_removes_expired(self, legacy_manager): - """Test that expired entries are removed from cache.""" - current_time = time.time() - expired_time = current_time - _LEGACY_ENTRY_TIMEOUT_SECONDS - 1 - valid_time = current_time - 100 # Well within timeout - - # Add both expired and valid entries - legacy_manager._legacy_cache = { - 'expired_conversation': LegacyCacheEntry(True, expired_time), - 'valid_conversation': LegacyCacheEntry(False, valid_time), - 'another_expired': LegacyCacheEntry(True, expired_time - 100), - } - - legacy_manager._cleanup_expired_cache_entries() - - # Only valid entry should remain - assert len(legacy_manager._legacy_cache) == 1 - assert 'valid_conversation' in legacy_manager._legacy_cache - assert 'expired_conversation' not in legacy_manager._legacy_cache - assert 'another_expired' not in legacy_manager._legacy_cache - - def test_cleanup_expired_cache_entries_keeps_valid(self, legacy_manager): - """Test that valid entries are kept during cleanup.""" - current_time = time.time() - valid_time = current_time - 100 # Well within timeout - - legacy_manager._legacy_cache = { - 'valid_conversation_1': LegacyCacheEntry(True, valid_time), - 'valid_conversation_2': LegacyCacheEntry(False, valid_time - 50), - } - - legacy_manager._cleanup_expired_cache_entries() - - # Both entries should remain - assert len(legacy_manager._legacy_cache) == 2 - assert 'valid_conversation_1' in legacy_manager._legacy_cache - assert 'valid_conversation_2' in legacy_manager._legacy_cache - - def test_cleanup_expired_cache_entries_empty_cache(self, legacy_manager): - """Test cleanup with empty cache.""" - legacy_manager._legacy_cache = {} - - legacy_manager._cleanup_expired_cache_entries() - - assert len(legacy_manager._legacy_cache) == 0 - - -class TestIsLegacyRuntime: - """Test the is_legacy_runtime method.""" - - def test_is_legacy_runtime_none(self, legacy_manager): - """Test with None runtime.""" - result = legacy_manager.is_legacy_runtime(None) - assert result is False - - def test_is_legacy_runtime_legacy_command(self, legacy_manager): - """Test with legacy runtime command.""" - runtime = {'command': 'some_old_legacy_command'} - result = legacy_manager.is_legacy_runtime(runtime) - assert result is True - - def test_is_legacy_runtime_new_command(self, legacy_manager): - """Test with new runtime command containing openhands.server.""" - runtime = {'command': 'python -m openhands.server.listen'} - result = legacy_manager.is_legacy_runtime(runtime) - assert result is False - - def test_is_legacy_runtime_partial_match(self, legacy_manager): - """Test with command that partially matches but is still legacy.""" - runtime = {'command': 'openhands.client.start'} - result = legacy_manager.is_legacy_runtime(runtime) - assert result is True - - def test_is_legacy_runtime_empty_command(self, legacy_manager): - """Test with empty command.""" - runtime = {'command': ''} - result = legacy_manager.is_legacy_runtime(runtime) - assert result is True - - def test_is_legacy_runtime_missing_command_key(self, legacy_manager): - """Test with runtime missing command key.""" - runtime = {'other_key': 'value'} - # This should raise a KeyError - with pytest.raises(KeyError): - legacy_manager.is_legacy_runtime(runtime) - - -class TestShouldStartInLegacyMode: - """Test the should_start_in_legacy_mode method.""" - - @pytest.mark.asyncio - async def test_cache_hit_valid_entry_legacy(self, legacy_manager): - """Test cache hit with valid legacy entry.""" - conversation_id = 'test_conversation' - current_time = time.time() - - # Add valid cache entry - legacy_manager._legacy_cache[conversation_id] = LegacyCacheEntry( - True, current_time - 100 - ) - - result = await legacy_manager.should_start_in_legacy_mode(conversation_id) - - assert result is True - # Should not call _get_runtime since we hit cache - legacy_manager.conversation_manager._get_runtime.assert_not_called() - - @pytest.mark.asyncio - async def test_cache_hit_valid_entry_non_legacy(self, legacy_manager): - """Test cache hit with valid non-legacy entry.""" - conversation_id = 'test_conversation' - current_time = time.time() - - # Add valid cache entry - legacy_manager._legacy_cache[conversation_id] = LegacyCacheEntry( - False, current_time - 100 - ) - - result = await legacy_manager.should_start_in_legacy_mode(conversation_id) - - assert result is False - # Should not call _get_runtime since we hit cache - legacy_manager.conversation_manager._get_runtime.assert_not_called() - - @pytest.mark.asyncio - async def test_cache_miss_legacy_runtime(self, legacy_manager): - """Test cache miss with legacy runtime.""" - conversation_id = 'test_conversation' - runtime = {'command': 'old_command'} - - legacy_manager.conversation_manager._get_runtime.return_value = runtime - - result = await legacy_manager.should_start_in_legacy_mode(conversation_id) - - assert result is True - # Should call _get_runtime - legacy_manager.conversation_manager._get_runtime.assert_called_once_with( - conversation_id - ) - # Should cache the result - assert conversation_id in legacy_manager._legacy_cache - assert legacy_manager._legacy_cache[conversation_id].is_legacy is True - - @pytest.mark.asyncio - async def test_cache_miss_non_legacy_runtime(self, legacy_manager): - """Test cache miss with non-legacy runtime.""" - conversation_id = 'test_conversation' - runtime = {'command': 'python -m openhands.server.listen'} - - legacy_manager.conversation_manager._get_runtime.return_value = runtime - - result = await legacy_manager.should_start_in_legacy_mode(conversation_id) - - assert result is False - # Should call _get_runtime - legacy_manager.conversation_manager._get_runtime.assert_called_once_with( - conversation_id - ) - # Should cache the result - assert conversation_id in legacy_manager._legacy_cache - assert legacy_manager._legacy_cache[conversation_id].is_legacy is False - - @pytest.mark.asyncio - async def test_cache_expired_entry(self, legacy_manager): - """Test with expired cache entry.""" - conversation_id = 'test_conversation' - expired_time = time.time() - _LEGACY_ENTRY_TIMEOUT_SECONDS - 1 - runtime = {'command': 'python -m openhands.server.listen'} - - # Add expired cache entry - legacy_manager._legacy_cache[conversation_id] = LegacyCacheEntry( - True, - expired_time, # This should be considered expired - ) - - legacy_manager.conversation_manager._get_runtime.return_value = runtime - - result = await legacy_manager.should_start_in_legacy_mode(conversation_id) - - assert result is False # Runtime indicates non-legacy - # Should call _get_runtime since cache is expired - legacy_manager.conversation_manager._get_runtime.assert_called_once_with( - conversation_id - ) - # Should update cache with new result - assert legacy_manager._legacy_cache[conversation_id].is_legacy is False - - @pytest.mark.asyncio - async def test_cache_exactly_at_timeout(self, legacy_manager): - """Test with cache entry exactly at timeout boundary.""" - conversation_id = 'test_conversation' - timeout_time = time.time() - _LEGACY_ENTRY_TIMEOUT_SECONDS - runtime = {'command': 'python -m openhands.server.listen'} - - # Add cache entry exactly at timeout - legacy_manager._legacy_cache[conversation_id] = LegacyCacheEntry( - True, timeout_time - ) - - legacy_manager.conversation_manager._get_runtime.return_value = runtime - - result = await legacy_manager.should_start_in_legacy_mode(conversation_id) - - # Should treat as expired and fetch from runtime - assert result is False - legacy_manager.conversation_manager._get_runtime.assert_called_once_with( - conversation_id - ) - - @pytest.mark.asyncio - async def test_runtime_returns_none(self, legacy_manager): - """Test when runtime returns None.""" - conversation_id = 'test_conversation' - - legacy_manager.conversation_manager._get_runtime.return_value = None - - result = await legacy_manager.should_start_in_legacy_mode(conversation_id) - - assert result is False - # Should cache the result - assert conversation_id in legacy_manager._legacy_cache - assert legacy_manager._legacy_cache[conversation_id].is_legacy is False - - @pytest.mark.asyncio - async def test_cleanup_called_on_each_invocation(self, legacy_manager): - """Test that cleanup is called on each invocation.""" - conversation_id = 'test_conversation' - runtime = {'command': 'test'} - - legacy_manager.conversation_manager._get_runtime.return_value = runtime - - # Mock the cleanup method to verify it's called - with patch.object( - legacy_manager, '_cleanup_expired_cache_entries' - ) as mock_cleanup: - await legacy_manager.should_start_in_legacy_mode(conversation_id) - mock_cleanup.assert_called_once() - - @pytest.mark.asyncio - async def test_multiple_conversations_cached_independently(self, legacy_manager): - """Test that multiple conversations are cached independently.""" - conv1 = 'conversation_1' - conv2 = 'conversation_2' - - runtime1 = {'command': 'old_command'} # Legacy - runtime2 = {'command': 'python -m openhands.server.listen'} # Non-legacy - - # Mock to return different runtimes based on conversation_id - def mock_get_runtime(conversation_id): - if conversation_id == conv1: - return runtime1 - return runtime2 - - legacy_manager.conversation_manager._get_runtime.side_effect = mock_get_runtime - - result1 = await legacy_manager.should_start_in_legacy_mode(conv1) - result2 = await legacy_manager.should_start_in_legacy_mode(conv2) - - assert result1 is True - assert result2 is False - - # Both should be cached - assert conv1 in legacy_manager._legacy_cache - assert conv2 in legacy_manager._legacy_cache - assert legacy_manager._legacy_cache[conv1].is_legacy is True - assert legacy_manager._legacy_cache[conv2].is_legacy is False - - @pytest.mark.asyncio - async def test_cache_timestamp_updated_on_refresh(self, legacy_manager): - """Test that cache timestamp is updated when entry is refreshed.""" - conversation_id = 'test_conversation' - old_time = time.time() - _LEGACY_ENTRY_TIMEOUT_SECONDS - 1 - runtime = {'command': 'test'} - - # Add expired entry - legacy_manager._legacy_cache[conversation_id] = LegacyCacheEntry(True, old_time) - legacy_manager.conversation_manager._get_runtime.return_value = runtime - - # Record time before call - before_call = time.time() - await legacy_manager.should_start_in_legacy_mode(conversation_id) - after_call = time.time() - - # Timestamp should be updated - cached_entry = legacy_manager._legacy_cache[conversation_id] - assert cached_entry.timestamp >= before_call - assert cached_entry.timestamp <= after_call - - -class TestLegacyConversationManagerIntegration: - """Integration tests for LegacyConversationManager.""" - - @pytest.mark.asyncio - async def test_get_instance_creates_proper_manager( - self, - mock_sio, - mock_config, - mock_file_store, - mock_server_config, - mock_monitoring_listener, - ): - """Test that get_instance creates a properly configured manager.""" - with patch( - 'server.legacy_conversation_manager.SaasNestedConversationManager' - ) as mock_saas, patch( - 'server.legacy_conversation_manager.ClusteredConversationManager' - ) as mock_clustered: - mock_saas.get_instance.return_value = MagicMock() - mock_clustered.get_instance.return_value = MagicMock() - - manager = LegacyConversationManager.get_instance( - mock_sio, - mock_config, - mock_file_store, - mock_server_config, - mock_monitoring_listener, - ) - - assert isinstance(manager, LegacyConversationManager) - assert manager.sio == mock_sio - assert manager.config == mock_config - assert manager.file_store == mock_file_store - assert manager.server_config == mock_server_config - - # Verify that both nested managers are created - mock_saas.get_instance.assert_called_once() - mock_clustered.get_instance.assert_called_once() - - def test_legacy_cache_initialized_empty(self, legacy_manager): - """Test that legacy cache is initialized as empty dict.""" - assert isinstance(legacy_manager._legacy_cache, dict) - assert len(legacy_manager._legacy_cache) == 0 - - -class TestEdgeCases: - """Test edge cases and error scenarios.""" - - @pytest.mark.asyncio - async def test_get_runtime_raises_exception(self, legacy_manager): - """Test behavior when _get_runtime raises an exception.""" - conversation_id = 'test_conversation' - - legacy_manager.conversation_manager._get_runtime.side_effect = Exception( - 'Runtime error' - ) - - # Should propagate the exception - with pytest.raises(Exception, match='Runtime error'): - await legacy_manager.should_start_in_legacy_mode(conversation_id) - - @pytest.mark.asyncio - async def test_very_large_cache(self, legacy_manager): - """Test behavior with a large number of cache entries.""" - current_time = time.time() - - # Add many cache entries - for i in range(1000): - legacy_manager._legacy_cache[f'conversation_{i}'] = LegacyCacheEntry( - i % 2 == 0, current_time - i - ) - - # This should work without issues - await legacy_manager.should_start_in_legacy_mode('new_conversation') - - # Should have added one more entry - assert len(legacy_manager._legacy_cache) == 1001 - - def test_cleanup_with_concurrent_modifications(self, legacy_manager): - """Test cleanup behavior when cache is modified during cleanup.""" - current_time = time.time() - expired_time = current_time - _LEGACY_ENTRY_TIMEOUT_SECONDS - 1 - - # Add expired entries - legacy_manager._legacy_cache = { - f'conversation_{i}': LegacyCacheEntry(True, expired_time) for i in range(10) - } - - # This should work without raising exceptions - legacy_manager._cleanup_expired_cache_entries() - - # All entries should be removed - assert len(legacy_manager._legacy_cache) == 0