Compare commits

...

3 Commits

7 changed files with 369 additions and 5 deletions

View File

@@ -0,0 +1,167 @@
from __future__ import annotations

import os
import time
from typing import Any, Dict, Optional

import posthog

from openhands.core.logger import openhands_logger as logger
from openhands.server.config.server_config import ServerConfig
from openhands.server.shared import SettingsStoreImpl, config
class UserAnalytics:
    """
    Handles user analytics tracking using PostHog.

    Respects user opt-in settings for analytics: an event is only sent when the
    user's stored settings have ``user_consents_to_analytics`` set to ``True``.

    Consent lookups are cached per user, but only for
    ``CONSENT_CACHE_TTL_SECONDS``. After that the settings are read again, so a
    user who withdraws consent stops being tracked without a process restart.
    Code that changes the setting can call ``invalidate_consent`` to make the
    change effective immediately.
    """

    # How long (in seconds) a cached consent decision is trusted before the
    # user's settings are re-read.
    CONSENT_CACHE_TTL_SECONDS: float = 300.0

    # Process-wide singleton created lazily by ``get_instance``.
    _instance: Optional[UserAnalytics] = None

    # Maps user_id -> (has_consented, time.monotonic() value at which the entry
    # expires). Defined on the class, so it is shared by every instance.
    _user_consent_cache: Dict[str, tuple[bool, float]] = {}

    def __init__(self, server_config: ServerConfig):
        """
        Initialize the UserAnalytics class.

        Args:
            server_config: The server configuration containing PostHog client key
        """
        self.posthog_client_key = server_config.posthog_client_key
        self.initialized = False
        self._initialize_posthog()

    def _initialize_posthog(self) -> None:
        """
        Initialize the PostHog client.

        Sets the module-level ``posthog.api_key`` and ``posthog.host``. The host
        is read from the ``POSTHOG_HOST`` environment variable and defaults to
        ``https://app.posthog.com``. Without a client key, analytics stay
        disabled (``self.initialized`` remains ``False``).
        """
        if not self.posthog_client_key:
            logger.warning('PostHog client key not found, analytics will be disabled')
            return

        try:
            # Initialize PostHog with the client key
            posthog_host = os.environ.get('POSTHOG_HOST', 'https://app.posthog.com')
            posthog.api_key = self.posthog_client_key
            posthog.host = posthog_host
            self.initialized = True
            logger.info('PostHog analytics initialized successfully')
        except Exception as e:
            logger.error(f'Failed to initialize PostHog: {e}')
            self.initialized = False

    def invalidate_consent(self, user_id: str | None = None) -> None:
        """
        Drop cached consent decisions so the next check re-reads the settings.

        Args:
            user_id: The user whose cached decision should be dropped. When
                omitted, the whole cache is cleared.
        """
        if user_id is None:
            self._user_consent_cache.clear()
        else:
            self._user_consent_cache.pop(user_id, None)

    async def has_user_consented(self, user_id: str) -> bool:
        """
        Check if the user has consented to analytics.

        Args:
            user_id: The ID of the user

        Returns:
            True if the user has consented, False otherwise. Also False when no
            user ID is given or when the settings cannot be loaded.
        """
        if not user_id:
            return False

        # Serve from the cache only while the entry is still fresh.
        cached = self._user_consent_cache.get(user_id)
        if cached is not None:
            cached_consent, expires_at = cached
            if time.monotonic() < expires_at:
                return cached_consent
            # Stale entry: forget it so a failed reload below cannot leave an
            # outdated "consented" decision behind.
            self._user_consent_cache.pop(user_id, None)

        try:
            # Load user settings to check consent
            settings_store = await SettingsStoreImpl.get_instance(config, user_id)
            settings = await settings_store.load()

            # Only an explicit True counts as consent
            has_consented = (
                settings is not None and settings.user_consents_to_analytics is True
            )

            # Cache the result together with its expiry time
            self._user_consent_cache[user_id] = (
                has_consented,
                time.monotonic() + self.CONSENT_CACHE_TTL_SECONDS,
            )
            return has_consented
        except Exception as e:
            # Fail closed: if consent cannot be confirmed, do not track.
            logger.error(f'Error checking user consent for analytics: {e}')
            return False

    async def track_event(
        self, user_id: str, event_name: str, properties: Dict[str, Any] | None = None
    ) -> bool:
        """
        Track an event in PostHog if the user has opted into analytics.

        Args:
            user_id: The ID of the user
            event_name: The name of the event to track
            properties: Additional properties to include with the event

        Returns:
            True if the event was tracked, False otherwise
        """
        if not self.initialized:
            return False

        if not user_id:
            logger.debug(f'Not tracking event {event_name}: No user ID provided')
            return False

        # Check if the user has consented to analytics
        has_consented = await self.has_user_consented(user_id)
        if not has_consented:
            logger.debug(
                f'Not tracking event {event_name}: User has not consented to analytics'
            )
            return False

        try:
            # Send the event to PostHog
            posthog.capture(
                distinct_id=user_id, event=event_name, properties=properties or {}
            )
            logger.debug(f'Tracked event {event_name} for user {user_id}')
            return True
        except Exception as e:
            logger.error(f'Failed to track event {event_name}: {e}')
            return False

    async def track_conversation_created(
        self,
        user_id: str,
        conversation_id: str,
        has_initial_message: bool,
        has_repository: bool,
        has_images: bool,
    ) -> bool:
        """
        Track when a user creates a new conversation.

        Sends a ``conversation_created`` event through ``track_event``.

        Args:
            user_id: The ID of the user
            conversation_id: The ID of the conversation
            has_initial_message: Whether the conversation was created with an initial message
            has_repository: Whether the conversation was created with a repository
            has_images: Whether the conversation was created with images

        Returns:
            True if the event was tracked, False otherwise
        """
        properties = {
            'conversation_id': conversation_id,
            'has_initial_message': has_initial_message,
            'has_repository': has_repository,
            'has_images': has_images,
        }
        return await self.track_event(user_id, 'conversation_created', properties)

    @classmethod
    def get_instance(cls, server_config: ServerConfig) -> UserAnalytics:
        """
        Get or create the singleton instance of UserAnalytics.

        The configuration is only used the first time; later calls return the
        already-created instance regardless of the argument.

        Args:
            server_config: The server configuration

        Returns:
            The UserAnalytics instance
        """
        if cls._instance is None:
            cls._instance = cls(server_config)
        return cls._instance

View File

@@ -10,6 +10,7 @@ from fastapi import (
import openhands.agenthub # noqa F401 (we import this to get the agents registered)
from openhands import __version__
from openhands.server.middleware import MonitoringMiddleware
from openhands.server.routes.conversation import app as conversation_api_router
from openhands.server.routes.feedback import app as feedback_api_router
from openhands.server.routes.files import app as files_api_router
@@ -52,3 +53,6 @@ app.include_router(manage_conversation_api_router)
app.include_router(settings_router)
app.include_router(git_api_router)
app.include_router(trajectory_router)
# Install MonitoringMiddleware so each request's state carries the shared
# monitoring listener.
app.add_middleware(MonitoringMiddleware)

View File

@@ -207,3 +207,18 @@ class ProviderTokenMiddleware(SessionMiddlewareInterface):
request.state.provider_tokens = None
return await call_next(request)
class MonitoringMiddleware(BaseHTTPMiddleware):
    """
    Middleware that exposes the shared monitoring listener on every request.
    """

    async def dispatch(self, request: Request, call_next: Callable):
        # Publish the listener on request.state before the request is handled
        listener = shared.monitoring_listener
        request.state.monitoring_listener = listener

        # Hand the request on and pass the response back unchanged
        return await call_next(request)

View File

@@ -1,5 +1,8 @@
from openhands.core.config.app_config import AppConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events.event import Event
from openhands.server.analytics import UserAnalytics
from openhands.server.config.server_config import load_server_config
class MonitoringListener:
@@ -9,6 +12,11 @@ class MonitoringListener:
Implementations should be non-disruptive, do not raise or block to perform I/O.
"""
def __init__(self):
    """Load the server config and obtain the UserAnalytics singleton built from it."""
    server_config = load_server_config()
    self.server_config = server_config
    self.user_analytics = UserAnalytics.get_instance(server_config)
def on_session_event(self, event: Event) -> None:
"""
Track metrics about events being added to a Session's EventStream.
@@ -23,12 +31,41 @@ class MonitoringListener:
"""
pass
def on_create_conversation(self) -> None:
async def on_create_conversation(
self,
user_id: str = None,
conversation_id: str = None,
has_initial_message: bool = False,
has_repository: bool = False,
has_images: bool = False,
) -> None:
"""
Track the beginning of conversation creation.
Does not currently capture whether it succeed.
Args:
user_id: The ID of the user
conversation_id: The ID of the conversation
has_initial_message: Whether the conversation was created with an initial message
has_repository: Whether the conversation was created with a repository
has_images: Whether the conversation was created with images
"""
pass
# If no user_id or conversation_id is provided, just log the event without tracking
if not user_id or not conversation_id:
logger.debug("Conversation creation started")
return
try:
# The UserAnalytics class will check for user consent internally
await self.user_analytics.track_conversation_created(
user_id,
conversation_id,
has_initial_message,
has_repository,
has_images,
)
except Exception as e:
# Don't let analytics failures affect the application
logger.error(f'Error tracking conversation creation: {e}')
@classmethod
def get_instance(

View File

@@ -112,7 +112,9 @@ async def _create_new_conversation(
title=conversation_title,
user_id=user_id,
github_user_id=None,
selected_repository=selected_repository.full_name if selected_repository else selected_repository,
selected_repository=selected_repository.full_name
if selected_repository
else selected_repository,
selected_branch=selected_branch,
)
)
@@ -160,9 +162,10 @@ async def new_conversation(request: Request, data: InitSessionRequest):
replay_json = data.replay_json
try:
user_id = get_user_id(request)
# Create conversation with initial message
conversation_id = await _create_new_conversation(
get_user_id(request),
user_id,
provider_tokens,
selected_repository,
selected_branch,
@@ -171,6 +174,20 @@ async def new_conversation(request: Request, data: InitSessionRequest):
replay_json,
)
# Track conversation creation in analytics
try:
# The UserAnalytics class will check for user consent internally
await request.state.monitoring_listener.on_create_conversation(
user_id=user_id,
conversation_id=conversation_id,
has_initial_message=bool(initial_user_msg),
has_repository=bool(selected_repository),
has_images=bool(image_urls),
)
except Exception as e:
# Don't let analytics failures affect the application
logger.error(f'Error tracking conversation creation analytics: {e}')
return JSONResponse(
content={'status': 'ok', 'conversation_id': conversation_id}
)

View File

@@ -48,6 +48,7 @@ google-cloud-aiplatform = "*"
anthropic = {extras = ["vertex"], version = "*"}
tree-sitter = "^0.24.0"
bashlex = "^0.18"
posthog = "^3.5.0"
pyjwt = "^2.9.0"
dirhash = "*"
python-frontmatter = "^1.1.0"

View File

@@ -0,0 +1,123 @@
import asyncio
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
from openhands.server.analytics import UserAnalytics
from openhands.server.config.server_config import ServerConfig
class TestUserAnalytics(unittest.TestCase):
    """
    Unit tests for UserAnalytics.

    PostHog and the settings store are patched out in every test, so nothing
    is sent over the network. Async methods are driven with ``asyncio.run``
    because ``unittest.TestCase`` does not await coroutine test methods.
    """

    def setUp(self):
        self.mock_server_config = MagicMock(spec=ServerConfig)
        self.mock_server_config.posthog_client_key = 'test_key'
        # UserAnalytics keeps its singleton and consent cache on the class;
        # reset both before and after each test so tests cannot affect each other.
        self._reset_shared_state()
        self.addCleanup(self._reset_shared_state)

    @staticmethod
    def _reset_shared_state():
        """Clear the class-level singleton and consent cache of UserAnalytics."""
        UserAnalytics._instance = None
        UserAnalytics._user_consent_cache.clear()

    @patch('openhands.server.analytics.posthog')
    def test_initialization(self, mock_posthog):
        analytics = UserAnalytics(self.mock_server_config)

        self.assertTrue(analytics.initialized)
        # Assert (rather than assign) that the key was handed to PostHog
        self.assertEqual(mock_posthog.api_key, 'test_key')

    @patch('openhands.server.analytics.posthog')
    @patch(
        'openhands.server.analytics.UserAnalytics.has_user_consented',
        new_callable=AsyncMock,
    )
    def test_track_event(self, mock_has_consented, mock_posthog):
        # Setup
        analytics = UserAnalytics(self.mock_server_config)
        mock_has_consented.return_value = True

        # Execute
        result = asyncio.run(
            analytics.track_event('user123', 'test_event', {'prop': 'value'})
        )

        # Assert
        self.assertTrue(result)
        mock_posthog.capture.assert_called_once_with(
            distinct_id='user123', event='test_event', properties={'prop': 'value'}
        )

    @patch('openhands.server.analytics.posthog')
    def test_track_event_no_user_id(self, mock_posthog):
        # Setup
        analytics = UserAnalytics(self.mock_server_config)

        # Execute
        result = asyncio.run(analytics.track_event('', 'test_event'))

        # Assert
        self.assertFalse(result)
        mock_posthog.capture.assert_not_called()

    @patch('openhands.server.analytics.posthog')
    @patch(
        'openhands.server.analytics.UserAnalytics.has_user_consented',
        new_callable=AsyncMock,
    )
    def test_track_event_no_consent(self, mock_has_consented, mock_posthog):
        # Setup
        analytics = UserAnalytics(self.mock_server_config)
        mock_has_consented.return_value = False

        # Execute
        result = asyncio.run(analytics.track_event('user123', 'test_event'))

        # Assert
        self.assertFalse(result)
        mock_posthog.capture.assert_not_called()

    @patch('openhands.server.analytics.posthog')
    @patch(
        'openhands.server.analytics.UserAnalytics.track_event',
        new_callable=AsyncMock,
    )
    def test_track_conversation_created(self, mock_track_event, mock_posthog):
        # Setup
        analytics = UserAnalytics(self.mock_server_config)
        mock_track_event.return_value = True

        # Execute
        result = asyncio.run(
            analytics.track_conversation_created(
                'user123', 'conv456', True, False, True
            )
        )

        # Assert
        self.assertTrue(result)
        mock_track_event.assert_called_once_with(
            'user123',
            'conversation_created',
            {
                'conversation_id': 'conv456',
                'has_initial_message': True,
                'has_repository': False,
                'has_images': True,
            },
        )

    @patch('openhands.server.analytics.posthog')
    def test_singleton_pattern(self, mock_posthog):
        analytics1 = UserAnalytics.get_instance(self.mock_server_config)
        analytics2 = UserAnalytics.get_instance(self.mock_server_config)

        self.assertIs(analytics1, analytics2)

    @patch('openhands.server.analytics.posthog')
    @patch('openhands.server.analytics.SettingsStoreImpl')
    def test_has_user_consented(self, mock_settings_store, mock_posthog):
        # Setup
        analytics = UserAnalytics(self.mock_server_config)
        mock_settings = MagicMock()
        mock_settings.user_consents_to_analytics = True
        mock_store_instance = AsyncMock()
        mock_store_instance.load.return_value = mock_settings
        # get_instance is awaited by the code under test, so it must be async
        mock_settings_store.get_instance = AsyncMock(return_value=mock_store_instance)

        # Execute
        result = asyncio.run(analytics.has_user_consented('user123'))

        # Assert
        self.assertTrue(result)
        mock_settings_store.get_instance.assert_awaited_once()
        mock_store_instance.load.assert_awaited_once()


# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()