mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 12b81eb75b | |||
| 4e99cb9a4c | |||
| b629483a26 | |||
| 68a3854fa8 | |||
| fe2716ff48 | |||
| 6f83a215a8 |
@@ -8,11 +8,7 @@ from openhands.events.event_filter import EventFilter
|
||||
from openhands.events.event_store_abc import EventStoreABC
|
||||
from openhands.events.serialization.event import event_from_dict
|
||||
from openhands.storage.files import FileStore
|
||||
from openhands.storage.locations import (
|
||||
get_conversation_dir,
|
||||
get_conversation_event_filename,
|
||||
get_conversation_events_dir,
|
||||
)
|
||||
from openhands.storage.paths import ConversationPaths
|
||||
from openhands.utils.shutdown_listener import should_continue
|
||||
|
||||
|
||||
@@ -44,12 +40,20 @@ _DUMMY_PAGE = _CachePage(None, 1, -1)
|
||||
class EventStore(EventStoreABC):
|
||||
"""A stored list of events backing a conversation"""
|
||||
|
||||
sid: str
|
||||
paths: ConversationPaths
|
||||
file_store: FileStore
|
||||
user_id: str | None
|
||||
cache_size: int = 25
|
||||
_cur_id: int | None = None # Private field to cache the calculated value
|
||||
|
||||
# Back-compat for code that reads these attributes directly
|
||||
@property
|
||||
def sid(self) -> str: # type: ignore[override]
|
||||
return self.paths.sid
|
||||
|
||||
@property
|
||||
def user_id(self) -> str | None: # type: ignore[override]
|
||||
return self.paths.user_id
|
||||
|
||||
@property
|
||||
def cur_id(self) -> int:
|
||||
"""Lazy calculated property for the current event ID."""
|
||||
@@ -66,10 +70,12 @@ class EventStore(EventStoreABC):
|
||||
"""Calculate the current event ID based on file system content."""
|
||||
events = []
|
||||
try:
|
||||
events_dir = get_conversation_events_dir(self.sid, self.user_id)
|
||||
events_dir = self.paths.events_dir()
|
||||
events = self.file_store.list(events_dir)
|
||||
except FileNotFoundError:
|
||||
logger.debug(f'No events found for session {self.sid} at {events_dir}')
|
||||
logger.debug(
|
||||
f'No events found for session {self.paths.sid} at {events_dir}'
|
||||
)
|
||||
|
||||
if not events:
|
||||
return 0
|
||||
@@ -136,7 +142,7 @@ class EventStore(EventStoreABC):
|
||||
return
|
||||
|
||||
def get_event(self, id: int) -> Event:
|
||||
filename = self._get_filename_for_id(id, self.user_id)
|
||||
filename = self.paths.event_filename(id)
|
||||
content = self.file_store.read(filename)
|
||||
data = json.loads(content)
|
||||
return event_from_dict(data)
|
||||
@@ -153,10 +159,10 @@ class EventStore(EventStoreABC):
|
||||
yield event
|
||||
|
||||
def _get_filename_for_id(self, id: int, user_id: str | None) -> str:
|
||||
return get_conversation_event_filename(self.sid, id, user_id)
|
||||
return self.paths.event_filename(id)
|
||||
|
||||
def _get_filename_for_cache(self, start: int, end: int) -> str:
|
||||
return f'{get_conversation_dir(self.sid, self.user_id)}event_cache/{start}-{end}.json'
|
||||
return self.paths.event_cache_filename(start, end)
|
||||
|
||||
def _load_cache_page(self, start: int, end: int) -> _CachePage:
|
||||
"""Read a page from the cache. Reading individual events is slow when there are a lot of them, so we use pages."""
|
||||
|
||||
@@ -13,9 +13,7 @@ from openhands.events.event_store import EventStore
|
||||
from openhands.events.serialization.event import event_from_dict, event_to_dict
|
||||
from openhands.io import json
|
||||
from openhands.storage import FileStore
|
||||
from openhands.storage.locations import (
|
||||
get_conversation_dir,
|
||||
)
|
||||
from openhands.storage.paths import ConversationPaths
|
||||
from openhands.utils.async_utils import call_sync_from_async
|
||||
from openhands.utils.shutdown_listener import should_continue
|
||||
|
||||
@@ -34,7 +32,8 @@ async def session_exists(
|
||||
sid: str, file_store: FileStore, user_id: str | None = None
|
||||
) -> bool:
|
||||
try:
|
||||
await call_sync_from_async(file_store.list, get_conversation_dir(sid, user_id))
|
||||
paths = ConversationPaths(sid=sid, user_id=user_id)
|
||||
await call_sync_from_async(file_store.list, paths.conversation_dir())
|
||||
return True
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
@@ -54,7 +53,7 @@ class EventStream(EventStore):
|
||||
_write_page_cache: list[dict]
|
||||
|
||||
def __init__(self, sid: str, file_store: FileStore, user_id: str | None = None):
|
||||
super().__init__(sid, file_store, user_id)
|
||||
super().__init__(ConversationPaths(sid=sid, user_id=user_id), file_store)
|
||||
self._stop_flag = threading.Event()
|
||||
self._queue: queue.Queue[Event] = queue.Queue()
|
||||
self._thread_pools = {}
|
||||
|
||||
@@ -30,6 +30,7 @@ from openhands.server.shared import (
|
||||
from openhands.storage.conversation.conversation_validator import (
|
||||
create_conversation_validator,
|
||||
)
|
||||
from openhands.storage.paths import ConversationPaths
|
||||
|
||||
|
||||
@sio.event
|
||||
@@ -77,7 +78,8 @@ async def connect(connection_id: str, environ: dict) -> None:
|
||||
|
||||
try:
|
||||
event_store = EventStore(
|
||||
conversation_id, conversation_manager.file_store, user_id
|
||||
ConversationPaths(conversation_id, user_id),
|
||||
conversation_manager.file_store,
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
logger.error(
|
||||
|
||||
@@ -15,6 +15,7 @@ from openhands.server.shared import conversation_manager, file_store
|
||||
from openhands.server.user_auth import get_user_id
|
||||
from openhands.server.utils import get_conversation, get_conversation_metadata
|
||||
from openhands.storage.data_models.conversation_metadata import ConversationMetadata
|
||||
from openhands.storage.paths import ConversationPaths
|
||||
|
||||
app = APIRouter(
|
||||
prefix='/api/conversations/{conversation_id}', dependencies=get_dependencies()
|
||||
@@ -140,9 +141,8 @@ async def search_events(
|
||||
|
||||
# Create an event store to access the events directly
|
||||
event_store = EventStore(
|
||||
sid=conversation_id,
|
||||
ConversationPaths(conversation_id, user_id),
|
||||
file_store=file_store,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
# Get matching events from the store
|
||||
|
||||
@@ -72,6 +72,7 @@ from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
from openhands.storage.data_models.settings import Settings
|
||||
from openhands.storage.data_models.user_secrets import UserSecrets
|
||||
from openhands.storage.locations import get_experiment_config_filename
|
||||
from openhands.storage.paths import ConversationPaths
|
||||
from openhands.storage.settings.settings_store import SettingsStore
|
||||
from openhands.utils.async_utils import wait_all
|
||||
from openhands.utils.conversation_summary import get_default_conversation_title
|
||||
@@ -378,7 +379,7 @@ async def get_prompt(
|
||||
):
|
||||
# get event store for the conversation
|
||||
event_store = EventStore(
|
||||
sid=conversation_id, file_store=file_store, user_id=metadata.user_id
|
||||
ConversationPaths(conversation_id, metadata.user_id), file_store
|
||||
)
|
||||
|
||||
# retrieve the relevant events
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from openhands.storage.locations import (
|
||||
get_conversation_agent_state_filename,
|
||||
get_conversation_dir,
|
||||
get_conversation_event_filename,
|
||||
get_conversation_events_dir,
|
||||
get_conversation_init_data_filename,
|
||||
get_conversation_llm_registry_filename,
|
||||
get_conversation_metadata_filename,
|
||||
get_conversation_stats_filename,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ConversationPaths:
|
||||
sid: str
|
||||
user_id: str | None = None
|
||||
|
||||
def conversation_dir(self) -> str:
|
||||
return get_conversation_dir(self.sid, self.user_id)
|
||||
|
||||
def events_dir(self) -> str:
|
||||
return get_conversation_events_dir(self.sid, self.user_id)
|
||||
|
||||
def event_filename(self, id: int) -> str:
|
||||
return get_conversation_event_filename(self.sid, id, self.user_id)
|
||||
|
||||
def metadata_filename(self) -> str:
|
||||
return get_conversation_metadata_filename(self.sid, self.user_id)
|
||||
|
||||
def init_data_filename(self) -> str:
|
||||
return get_conversation_init_data_filename(self.sid, self.user_id)
|
||||
|
||||
def agent_state_filename(self) -> str:
|
||||
return get_conversation_agent_state_filename(self.sid, self.user_id)
|
||||
|
||||
def llm_registry_filename(self) -> str:
|
||||
return get_conversation_llm_registry_filename(self.sid, self.user_id)
|
||||
|
||||
def stats_filename(self) -> str:
|
||||
return get_conversation_stats_filename(self.sid, self.user_id)
|
||||
|
||||
def event_cache_filename(self, start: int, end: int) -> str:
|
||||
return f'{self.conversation_dir()}event_cache/{start}-{end}.json'
|
||||
@@ -10,6 +10,7 @@ from openhands.events.event_store import EventStore
|
||||
from openhands.llm.llm_registry import LLMRegistry
|
||||
from openhands.storage.data_models.settings import Settings
|
||||
from openhands.storage.files import FileStore
|
||||
from openhands.storage.paths import ConversationPaths
|
||||
|
||||
|
||||
async def generate_conversation_title(
|
||||
@@ -95,7 +96,9 @@ async def auto_generate_title(
|
||||
"""
|
||||
try:
|
||||
# Create an event store for the conversation
|
||||
event_store = EventStore(conversation_id, file_store, user_id)
|
||||
event_store = EventStore(
|
||||
ConversationPaths(conversation_id, user_id), file_store
|
||||
)
|
||||
|
||||
# Find the first user message
|
||||
first_user_message = None
|
||||
|
||||
@@ -16,6 +16,7 @@ from openhands.server.conversation_manager.standalone_conversation_manager impor
|
||||
from openhands.server.monitoring import MonitoringListener
|
||||
from openhands.storage.data_models.settings import Settings
|
||||
from openhands.storage.memory import InMemoryFileStore
|
||||
from openhands.storage.paths import ConversationPaths
|
||||
from openhands.utils.conversation_summary import auto_generate_title
|
||||
|
||||
|
||||
@@ -67,10 +68,13 @@ async def test_auto_generate_title_with_llm():
|
||||
# Verify the result
|
||||
assert title == 'Python Data Analysis Script'
|
||||
|
||||
# Verify EventStore was created with the correct parameters
|
||||
mock_event_store_cls.assert_called_once_with(
|
||||
conversation_id, file_store, user_id
|
||||
)
|
||||
# Verify EventStore was created with the correct parameters (paths + file_store)
|
||||
mock_event_store_cls.assert_called_once()
|
||||
call_args, _ = mock_event_store_cls.call_args
|
||||
assert isinstance(call_args[0], ConversationPaths)
|
||||
assert call_args[0].sid == conversation_id
|
||||
assert call_args[0].user_id == user_id
|
||||
assert call_args[1] is file_store
|
||||
|
||||
# Verify LLM registry was called with appropriate parameters
|
||||
llm_registry.request_extraneous_completion.assert_called_once()
|
||||
@@ -122,10 +126,13 @@ async def test_auto_generate_title_fallback():
|
||||
assert title == 'This is a very long message th...'
|
||||
assert len(title) <= 35
|
||||
|
||||
# Verify EventStore was created with the correct parameters
|
||||
mock_event_store_cls.assert_called_once_with(
|
||||
conversation_id, file_store, user_id
|
||||
)
|
||||
# Verify EventStore was created with the correct parameters (paths + file_store)
|
||||
mock_event_store_cls.assert_called_once()
|
||||
call_args, _ = mock_event_store_cls.call_args
|
||||
assert isinstance(call_args[0], ConversationPaths)
|
||||
assert call_args[0].sid == conversation_id
|
||||
assert call_args[0].user_id == user_id
|
||||
assert call_args[1] is file_store
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -163,10 +170,13 @@ async def test_auto_generate_title_no_messages():
|
||||
# Verify the result is empty
|
||||
assert title == ''
|
||||
|
||||
# Verify EventStore was created with the correct parameters
|
||||
mock_event_store_cls.assert_called_once_with(
|
||||
conversation_id, file_store, user_id
|
||||
)
|
||||
# Verify EventStore was created with the correct parameters (paths + file_store)
|
||||
mock_event_store_cls.assert_called_once()
|
||||
call_args, _ = mock_event_store_cls.call_args
|
||||
assert isinstance(call_args[0], ConversationPaths)
|
||||
assert call_args[0].sid == conversation_id
|
||||
assert call_args[0].user_id == user_id
|
||||
assert call_args[1] is file_store
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
Reference in New Issue
Block a user