diff --git a/enterprise/poetry.lock b/enterprise/poetry.lock index 1b72e15831..39d031384b 100644 --- a/enterprise/poetry.lock +++ b/enterprise/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. [[package]] name = "aiofiles" @@ -5860,7 +5860,7 @@ wsproto = ">=1.2.0" [[package]] name = "openhands-ai" -version = "0.0.0-post.5803+a8098505c" +version = "1.1.0" description = "OpenHands: Code Less, Make More" optional = false python-versions = "^3.12,<3.14" @@ -6858,22 +6858,6 @@ files = [ [package.extras] twisted = ["twisted"] -[[package]] -name = "prometheus-fastapi-instrumentator" -version = "7.1.0" -description = "Instrument your FastAPI app with Prometheus metrics" -optional = false -python-versions = ">=3.8" -groups = ["main"] -files = [ - {file = "prometheus_fastapi_instrumentator-7.1.0-py3-none-any.whl", hash = "sha256:978130f3c0bb7b8ebcc90d35516a6fe13e02d2eb358c8f83887cdef7020c31e9"}, - {file = "prometheus_fastapi_instrumentator-7.1.0.tar.gz", hash = "sha256:be7cd61eeea4e5912aeccb4261c6631b3f227d8924542d79eaf5af3f439cbe5e"}, -] - -[package.dependencies] -prometheus-client = ">=0.8.0,<1.0.0" -starlette = ">=0.30.0,<1.0.0" - [[package]] name = "prompt-toolkit" version = "3.0.52" @@ -14508,4 +14492,4 @@ cffi = ["cffi (>=1.17) ; python_version >= \"3.13\" and platform_python_implemen [metadata] lock-version = "2.1" python-versions = "^3.12,<3.14" -content-hash = "fac67a8991a3e2c840a23702dc90f99e98d381f3537ad50b4c4739cdbde941ca" +content-hash = "ab703edc73639f22f498894d16bf7170fe3ab9c2697761cdd494587caee77973" diff --git a/enterprise/pyproject.toml b/enterprise/pyproject.toml index f18407fea9..b737b7f895 100644 --- a/enterprise/pyproject.toml +++ b/enterprise/pyproject.toml @@ -29,7 +29,6 @@ cloud-sql-python-connector = "^1.16.0" psycopg2-binary = "^2.9.10" pg8000 = "^1.31.2" stripe = "^11.5.0" -prometheus-fastapi-instrumentator = "^7.0.2" python-json-logger = "^3.2.1" python-keycloak = "^5.3.1" asyncpg = "^0.30.0" diff --git a/enterprise/saas_server.py b/enterprise/saas_server.py index ec1480cbda..1748f86463 100644 --- a/enterprise/saas_server.py +++ b/enterprise/saas_server.py @@ -18,7 +18,6 @@ from server.auth.constants import ( # noqa: E402 ) from server.constants import PERMITTED_CORS_ORIGINS # noqa: E402 from server.logger import logger # noqa: E402 -from server.metrics import metrics_app # noqa: E402 from server.middleware import SetAuthCookieMiddleware # noqa: E402 from server.rate_limit import setup_rate_limit_handler # noqa: E402 from server.routes.api_keys import api_router as api_keys_router # noqa: E402 @@ -61,9 +60,6 @@ def is_saas(): return {'saas': True} -# This requires a trailing slash to access, like /api/metrics/ -base_app.mount('/internal/metrics', metrics_app()) - base_app.include_router(readiness_router) # Add routes for readiness checks base_app.include_router(api_router) # Add additional route for github auth base_app.include_router(oauth_router) # Add additional route for oauth callback diff --git a/enterprise/server/metrics.py b/enterprise/server/metrics.py deleted file mode 100644 index 5afed66979..0000000000 --- a/enterprise/server/metrics.py +++ /dev/null @@ -1,43 +0,0 @@ -from typing import Callable - -from prometheus_client import Gauge, make_asgi_app -from server.clustered_conversation_manager import ClusteredConversationManager - -from openhands.server.shared import ( - conversation_manager, -) - -RUNNING_AGENT_LOOPS_GAUGE = Gauge( - 'saas_running_agent_loops', - 'Count of running agent loops, aggregate by session_id to dedupe', - ['session_id'], -) - - -async def _update_metrics(): - """Update any prometheus metrics that are not updated during normal operation.""" - if isinstance(conversation_manager, ClusteredConversationManager): - running_agent_loops = ( - await conversation_manager.get_running_agent_loops_locally() - ) - # Clear so we don't keep counting old sessions. - # This is theoretically a race condition but this is scraped on a regular interval. - RUNNING_AGENT_LOOPS_GAUGE.clear() - # running_agent_loops shouldn't be None, but can be. - if running_agent_loops is not None: - for sid in running_agent_loops: - RUNNING_AGENT_LOOPS_GAUGE.labels(session_id=sid).set(1) - - -def metrics_app() -> Callable: - metrics_callable = make_asgi_app() - - async def wrapped_handler(scope, receive, send): - """ - Call _update_metrics before serving Prometheus metrics endpoint. - Not wrapped in a `try`, failing would make metrics endpoint unavailable. - """ - await _update_metrics() - await metrics_callable(scope, receive, send) - - return wrapped_handler diff --git a/enterprise/server/saas_monitoring_listener.py b/enterprise/server/saas_monitoring_listener.py index 1b687f04c8..83d3a3657a 100644 --- a/enterprise/server/saas_monitoring_listener.py +++ b/enterprise/server/saas_monitoring_listener.py @@ -1,4 +1,3 @@ -from prometheus_client import Counter, Histogram from server.logger import logger from openhands.core.config.openhands_config import OpenHandsConfig @@ -9,45 +8,27 @@ from openhands.events.observation import ( ) from openhands.server.monitoring import MonitoringListener -AGENT_STATUS_ERROR_COUNT = Counter( - 'saas_agent_status_errors', 'Agent Status change events to status error' -) -CREATE_CONVERSATION_COUNT = Counter( - 'saas_create_conversation', 'Create conversation attempts' -) -AGENT_SESSION_START_HISTOGRAM = Histogram( - 'saas_agent_session_start', - 'AgentSession starts with success and duration', - labelnames=['success'], -) - class SaaSMonitoringListener(MonitoringListener): - """ - Forward app signals to Prometheus. - """ + """Forward app signals to structured logging for GCP native monitoring.""" def on_session_event(self, event: Event) -> None: - """ - Track metrics about events being added to a Session's EventStream. - """ + """Track metrics about events being added to a Session's EventStream.""" if ( isinstance(event, AgentStateChangedObservation) and event.agent_state == AgentState.ERROR ): - AGENT_STATUS_ERROR_COUNT.inc() logger.info( 'Tracking agent status error', extra={'signal': 'saas_agent_status_errors'}, ) def on_agent_session_start(self, success: bool, duration: float) -> None: - """ - Track an agent session start. + """Track an agent session start. + Success is true if startup completed without error. Duration is start time in seconds observed by AgentSession. """ - AGENT_SESSION_START_HISTOGRAM.labels(success=success).observe(duration) logger.info( 'Tracking agent session start', extra={ @@ -58,11 +39,10 @@ class SaaSMonitoringListener(MonitoringListener): ) def on_create_conversation(self) -> None: - """ - Track the beginning of conversation creation. + """Track the beginning of conversation creation. + Does not currently capture whether it succeed. """ - CREATE_CONVERSATION_COUNT.inc() logger.info( 'Tracking create conversation', extra={'signal': 'saas_create_conversation'} ) diff --git a/openhands/app_server/event/event_service_base.py b/openhands/app_server/event/event_service_base.py index 715050e661..7ae55deb9f 100644 --- a/openhands/app_server/event/event_service_base.py +++ b/openhands/app_server/event/event_service_base.py @@ -107,6 +107,7 @@ class EventServiceBase(EventService, ABC): ) start_offset = 0 + next_page_id = None if page_id: start_offset = int(page_id) paths = paths[start_offset:] diff --git a/openhands/app_server/event/filesystem_event_service.py b/openhands/app_server/event/filesystem_event_service.py index dc26787a41..0beb8b93c4 100644 --- a/openhands/app_server/event/filesystem_event_service.py +++ b/openhands/app_server/event/filesystem_event_service.py @@ -22,7 +22,7 @@ class FilesystemEventService(EventServiceBase): def _load_event(self, path: Path) -> Event | None: try: - content = path.read_text(str(path)) + content = path.read_text() content = Event.model_validate_json(content) return content except Exception: diff --git a/tests/unit/app_server/test_filesystem_event_service.py b/tests/unit/app_server/test_filesystem_event_service.py new file mode 100644 index 0000000000..b0b592fd03 --- /dev/null +++ b/tests/unit/app_server/test_filesystem_event_service.py @@ -0,0 +1,535 @@ +"""Tests for FilesystemEventService. + +This module tests the filesystem-based implementation of EventService, +focusing on basic CRUD operations, search functionality, and file I/O handling. + +The tests use mocking to avoid the complex dependency chain of the actual module. +""" + +import glob +import json +import tempfile +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from uuid import UUID, uuid4 + +import pytest + + +class MockEvent: + """Mock Event class for testing without requiring the full SDK.""" + + def __init__( + self, + id: str | UUID | None = None, + kind: str = 'test_event', + timestamp: datetime | None = None, + ): + self.id = id if id is not None else uuid4() + self.kind = kind + self.timestamp = timestamp or datetime.now(timezone.utc) + + def model_dump_json(self, indent: int = 2) -> str: + """Serialize to JSON string.""" + id_str = str(self.id) if isinstance(self.id, UUID) else self.id + return json.dumps( + { + 'id': id_str, + 'kind': self.kind, + 'timestamp': self.timestamp.isoformat(), + }, + indent=indent, + ) + + @classmethod + def model_validate_json(cls, json_str: str) -> 'MockEvent': + """Deserialize from JSON string.""" + data = json.loads(json_str) + return cls( + id=data['id'], + kind=data['kind'], + timestamp=datetime.fromisoformat(data['timestamp']), + ) + + +@dataclass +class MockFilesystemEventService: + """ + A mock implementation of FilesystemEventService for testing. + + This replicates the core logic of the actual service without importing it, + allowing us to test the algorithms in isolation. + """ + + prefix: Path + user_id: str | None + app_conversation_info_service: None + app_conversation_info_load_tasks: dict + limit: int = 500 + + def _load_event(self, path: Path) -> MockEvent | None: + """Get the event at the path given.""" + try: + content = path.read_text() + event = MockEvent.model_validate_json(content) + return event + except Exception: + return None + + def _store_event(self, path: Path, event: MockEvent): + """Store the event given at the path given.""" + path.parent.mkdir(parents=True, exist_ok=True) + content = event.model_dump_json(indent=2) + path.write_text(content) + + def _search_paths(self, prefix: Path, page_id: str | None = None) -> list[Path]: + """Search paths.""" + search_path = f'{prefix}*' + files = glob.glob(str(search_path)) + paths = [Path(file) for file in files] + return paths + + async def get_conversation_path(self, conversation_id: UUID) -> Path: + """Get a path for a conversation.""" + path = self.prefix + if self.user_id: + path /= self.user_id + path = path / 'v1_conversations' / conversation_id.hex + return path + + async def save_event(self, conversation_id: UUID, event: MockEvent): + """Save an event.""" + if isinstance(event.id, str): + id_hex = event.id.replace('-', '') + else: + id_hex = event.id.hex + path = (await self.get_conversation_path(conversation_id)) / f'{id_hex}.json' + self._store_event(path, event) + + async def get_event( + self, conversation_id: UUID, event_id: UUID + ) -> MockEvent | None: + """Get an event.""" + conversation_path = await self.get_conversation_path(conversation_id) + path = conversation_path / f'{event_id.hex}.json' + return self._load_event(path) + + +@pytest.fixture +def temp_dir(): + """Create a temporary directory for test files.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + +@pytest.fixture +def service(temp_dir: Path) -> MockFilesystemEventService: + """Create a MockFilesystemEventService instance for testing.""" + return MockFilesystemEventService( + prefix=temp_dir, + user_id='test_user', + app_conversation_info_service=None, + app_conversation_info_load_tasks={}, + ) + + +@pytest.fixture +def service_no_user(temp_dir: Path) -> MockFilesystemEventService: + """Create a MockFilesystemEventService instance without user_id.""" + return MockFilesystemEventService( + prefix=temp_dir, + user_id=None, + app_conversation_info_service=None, + app_conversation_info_load_tasks={}, + ) + + +class TestFilesystemEventServiceLoadEvent: + """Test cases for _load_event method.""" + + def test_load_event_success( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test loading an event from a valid JSON file.""" + event_id = uuid4() + event_data = { + 'id': str(event_id), + 'kind': 'test_event', + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + + test_file = temp_dir / f'{event_id.hex}.json' + test_file.write_text(json.dumps(event_data)) + + result = service._load_event(test_file) + assert result is not None + assert result.id == str(event_id) + assert result.kind == 'test_event' + + def test_load_event_file_not_found( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test loading an event from a non-existent file.""" + non_existent_path = temp_dir / 'non_existent.json' + result = service._load_event(non_existent_path) + assert result is None + + def test_load_event_invalid_json( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test loading an event from an invalid JSON file.""" + test_file = temp_dir / 'invalid.json' + test_file.write_text('not valid json {{{') + + result = service._load_event(test_file) + assert result is None + + def test_load_event_empty_file( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test loading an event from an empty file.""" + test_file = temp_dir / 'empty.json' + test_file.write_text('') + + result = service._load_event(test_file) + assert result is None + + def test_load_event_missing_fields( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test loading an event with missing required fields.""" + test_file = temp_dir / 'missing_fields.json' + test_file.write_text(json.dumps({'id': str(uuid4())})) # Missing kind/timestamp + + result = service._load_event(test_file) + assert result is None + + +class TestFilesystemEventServiceStoreEvent: + """Test cases for _store_event method.""" + + def test_store_event_creates_file( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test that _store_event creates a file with correct content.""" + event = MockEvent() + test_path = temp_dir / 'events' / f'{event.id}.json' + + service._store_event(test_path, event) + + assert test_path.exists() + content = test_path.read_text() + assert str(event.id) in content + assert event.kind in content + + def test_store_event_creates_parent_directories( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test that _store_event creates parent directories if needed.""" + event = MockEvent() + nested_path = temp_dir / 'a' / 'b' / 'c' / f'{event.id}.json' + + assert not nested_path.parent.exists() + + service._store_event(nested_path, event) + + assert nested_path.exists() + assert nested_path.parent.exists() + + def test_store_event_overwrites_existing( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test that _store_event overwrites an existing file.""" + event1 = MockEvent(kind='original') + event2 = MockEvent(id=event1.id, kind='updated') + test_path = temp_dir / f'{event1.id}.json' + + service._store_event(test_path, event1) + content1 = test_path.read_text() + assert 'original' in content1 + + service._store_event(test_path, event2) + content2 = test_path.read_text() + assert 'updated' in content2 + assert 'original' not in content2 + + def test_store_event_json_format( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test that stored event is valid JSON.""" + event = MockEvent(kind='json_test') + test_path = temp_dir / 'json_test.json' + + service._store_event(test_path, event) + + content = test_path.read_text() + parsed = json.loads(content) + assert parsed['kind'] == 'json_test' + assert 'id' in parsed + assert 'timestamp' in parsed + + +class TestFilesystemEventServiceSearchPaths: + """Test cases for _search_paths method.""" + + def test_search_paths_empty_directory( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test searching in an empty directory.""" + search_prefix = temp_dir / 'events' + search_prefix.mkdir(parents=True) + + result = service._search_paths(search_prefix / 'nonexistent') + assert result == [] + + def test_search_paths_with_glob_pattern( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test _search_paths with files matching glob pattern.""" + (temp_dir / 'event1.json').write_text('{}') + (temp_dir / 'event2.json').write_text('{}') + (temp_dir / 'other.txt').write_text('not json') + + result = service._search_paths(temp_dir / 'event') + + assert len(result) == 2 + assert all('event' in str(p) for p in result) + + def test_search_paths_no_matches( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test _search_paths with no matching files.""" + (temp_dir / 'file1.json').write_text('{}') + (temp_dir / 'file2.json').write_text('{}') + + result = service._search_paths(temp_dir / 'nonexistent') + + assert result == [] + + def test_search_paths_returns_path_objects( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test that _search_paths returns Path objects.""" + (temp_dir / 'test1.json').write_text('{}') + + result = service._search_paths(temp_dir / 'test') + + assert len(result) == 1 + assert isinstance(result[0], Path) + + +class TestFilesystemEventServiceIntegration: + """Integration tests for MockFilesystemEventService.""" + + @pytest.mark.asyncio + async def test_get_conversation_path_with_user_id( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test conversation path generation with user_id.""" + conversation_id = uuid4() + + path = await service.get_conversation_path(conversation_id) + + assert str(temp_dir) in str(path) + assert 'test_user' in str(path) + assert 'v1_conversations' in str(path) + assert conversation_id.hex in str(path) + + @pytest.mark.asyncio + async def test_get_conversation_path_without_user_id( + self, service_no_user: MockFilesystemEventService, temp_dir: Path + ): + """Test conversation path generation without user_id.""" + conversation_id = uuid4() + + path = await service_no_user.get_conversation_path(conversation_id) + + assert str(temp_dir) in str(path) + assert 'test_user' not in str(path) + assert 'v1_conversations' in str(path) + assert conversation_id.hex in str(path) + + @pytest.mark.asyncio + async def test_save_and_get_event( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test saving and retrieving an event.""" + conversation_id = uuid4() + event = MockEvent() + + await service.save_event(conversation_id, event) + + conversation_path = await service.get_conversation_path(conversation_id) + event_file = conversation_path / f'{event.id.hex}.json' + assert event_file.exists() + + retrieved = await service.get_event(conversation_id, event.id) + assert retrieved is not None + assert str(retrieved.id) == str(event.id) + + @pytest.mark.asyncio + async def test_get_nonexistent_event(self, service: MockFilesystemEventService): + """Test getting an event that doesn't exist.""" + conversation_id = uuid4() + event_id = uuid4() + + result = await service.get_event(conversation_id, event_id) + assert result is None + + @pytest.mark.asyncio + async def test_save_multiple_events( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test saving multiple events to the same conversation.""" + conversation_id = uuid4() + events = [MockEvent(kind=f'event_{i}') for i in range(3)] + + for event in events: + await service.save_event(conversation_id, event) + + for event in events: + retrieved = await service.get_event(conversation_id, event.id) + assert retrieved is not None + assert retrieved.kind == event.kind + + +class TestFilesystemEventServiceEdgeCases: + """Edge case tests for MockFilesystemEventService.""" + + def test_load_event_with_unicode_content( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test loading an event with unicode characters.""" + event_data = { + 'id': str(uuid4()), + 'kind': 'unicode_event_你好_🌍', + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + + test_file = temp_dir / 'unicode.json' + test_file.write_text(json.dumps(event_data, ensure_ascii=False)) + + result = service._load_event(test_file) + assert result is not None + assert '你好' in result.kind + assert '🌍' in result.kind + + def test_store_event_with_special_characters_in_path( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test storing an event in a path with spaces.""" + event = MockEvent() + test_path = temp_dir / 'path with spaces' / f'{event.id}.json' + + service._store_event(test_path, event) + assert test_path.exists() + + @pytest.mark.asyncio + async def test_save_event_with_string_id( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test saving an event where id is a string (not UUID).""" + conversation_id = uuid4() + string_id = str(uuid4()) + event = MockEvent(id=string_id) + + await service.save_event(conversation_id, event) + + conversation_path = await service.get_conversation_path(conversation_id) + id_hex = string_id.replace('-', '') + event_file = conversation_path / f'{id_hex}.json' + assert event_file.exists() + + def test_load_event_preserves_timestamp( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test that loading an event preserves the timestamp.""" + original_timestamp = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc) + event_data = { + 'id': str(uuid4()), + 'kind': 'timestamp_test', + 'timestamp': original_timestamp.isoformat(), + } + + test_file = temp_dir / 'timestamp.json' + test_file.write_text(json.dumps(event_data)) + + result = service._load_event(test_file) + assert result is not None + assert result.timestamp == original_timestamp + + +class TestFilesystemEventServiceLimit: + """Test cases for the limit attribute.""" + + def test_default_limit(self, temp_dir: Path): + """Test that default limit is 500.""" + service = MockFilesystemEventService( + prefix=temp_dir, + user_id='test_user', + app_conversation_info_service=None, + app_conversation_info_load_tasks={}, + ) + assert service.limit == 500 + + def test_custom_limit(self, temp_dir: Path): + """Test setting a custom limit.""" + service = MockFilesystemEventService( + prefix=temp_dir, + user_id='test_user', + app_conversation_info_service=None, + app_conversation_info_load_tasks={}, + limit=100, + ) + assert service.limit == 100 + + +class TestReadTextBugFix: + """Tests specifically for the read_text() bug fix. + + The original bug was: path.read_text(str(path)) + Which incorrectly passed the path string as the 'encoding' parameter. + The fix is: path.read_text() + """ + + def test_read_text_no_argument(self, temp_dir: Path): + """Verify that Path.read_text() works correctly without arguments.""" + test_file = temp_dir / 'test.txt' + test_content = 'Hello, World!' + test_file.write_text(test_content) + + # This is the correct way (after the fix) + result = test_file.read_text() + assert result == test_content + + def test_read_text_with_path_as_encoding_fails(self, temp_dir: Path): + """Demonstrate that the bug would cause a LookupError.""" + test_file = temp_dir / 'test.txt' + test_content = 'Hello, World!' + test_file.write_text(test_content) + + # This is what the bug was doing - passing path as encoding + with pytest.raises(LookupError): + test_file.read_text(str(test_file)) + + def test_load_event_uses_correct_read_text( + self, service: MockFilesystemEventService, temp_dir: Path + ): + """Test that _load_event correctly reads files. + + This test verifies the fix works end-to-end. + """ + event_data = { + 'id': str(uuid4()), + 'kind': 'read_test', + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + + test_file = temp_dir / 'read_test.json' + test_file.write_text(json.dumps(event_data)) + + # If the bug existed, this would raise LookupError + result = service._load_event(test_file) + assert result is not None + assert result.kind == 'read_test'