Compare commits

...

5 Commits

Author SHA1 Message Date
Graham Neubig
e94ca25ace Merge branch 'main' into fix/eventstore-cache-gcp-errors 2025-11-04 15:15:44 -05:00
openhands
a934ff2660 Apply pre-commit formatting fixes
- Fix line length in EventStream constructor
- Remove unused json import from test file
- Convert all string literals to single quotes for consistency
- Fix other minor formatting issues

Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-23 15:33:52 +00:00
openhands
043280bf4e Fix linting issues in EventStore and test files
- Fix docstring formatting and missing argument descriptions
- Apply ruff formatting to test file
- Ensure all code passes pre-commit hooks

Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-23 15:26:56 +00:00
openhands
15422ca650 Add comprehensive tests for EventStore cache control functionality
- Tests verify use_cache flag behavior for both EventStore and EventStream
- Tests confirm EventStore defaults to use_cache=False (no cache reading)
- Tests confirm EventStream defaults to use_cache=True (cache enabled)
- Tests verify cache disabled returns dummy page to avoid storage requests
- Tests verify backwards compatibility with existing code
- Tests confirm no GCP storage requests made for cache files when disabled

Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-23 15:04:06 +00:00
openhands
9d7009dab4 Fix GCP storage errors by disabling cache reading in EventStore
EventStore was attempting to read cache files that it cannot create,
causing 'No such object' errors in GCP storage. This fix adds a
use_cache flag to control cache behavior:

- EventStore: use_cache=False (default) - skips cache reading entirely
- EventStream: use_cache=True - maintains cache functionality

This eliminates unnecessary storage requests while preserving the
fallback mechanism to read individual event files.

Fixes GCP storage retrieval errors that started occurring after
commit 9b0a5da83 switched remember-prompt endpoint to use EventStore.

Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-23 14:58:28 +00:00
3 changed files with 255 additions and 4 deletions

View File

@@ -42,12 +42,13 @@ _DUMMY_PAGE = _CachePage(None, 1, -1)
@dataclass
class EventStore(EventStoreABC):
"""A stored list of events backing a conversation"""
"""A stored list of events backing a conversation."""
sid: str
file_store: FileStore
user_id: str | None
cache_size: int = 25
use_cache: bool = False # EventStore doesn't write cache files, so disable cache reading by default
_cur_id: int | None = None # Private field to cache the calculated value
@property
@@ -90,14 +91,14 @@ class EventStore(EventStoreABC):
filter: EventFilter | None = None,
limit: int | None = None,
) -> Iterable[Event]:
"""Retrieve events from the event stream, optionally filtering out events of a given type
and events marked as hidden.
"""Retrieve events from the event stream, optionally filtering out events of a given type and events marked as hidden.
Args:
start_id: The ID of the first event to retrieve. Defaults to 0.
end_id: The ID of the last event to retrieve. Defaults to the last event in the stream.
reverse: Whether to retrieve events in reverse order. Defaults to False.
filter: EventFilter to use
limit: Maximum number of events to retrieve. Defaults to None (no limit).
Yields:
Events from the stream that match the criteria.
@@ -170,6 +171,9 @@ class EventStore(EventStoreABC):
return page
def _load_cache_page_for_index(self, index: int) -> _CachePage:
if not self.use_cache:
# Return a dummy page with no events, forcing fallback to individual event reading
return _DUMMY_PAGE
offset = index % self.cache_size
index -= offset
return self._load_cache_page(index, index + self.cache_size)

View File

@@ -54,7 +54,9 @@ class EventStream(EventStore):
_write_page_cache: list[dict]
def __init__(self, sid: str, file_store: FileStore, user_id: str | None = None):
super().__init__(sid, file_store, user_id)
super().__init__(
sid, file_store, user_id, use_cache=True
) # EventStream can write cache files
self._stop_flag = threading.Event()
self._queue: queue.Queue[Event] = queue.Queue()
self._thread_pools = {}

View File

@@ -0,0 +1,245 @@
"""Unit tests for EventStore cache functionality.
These tests focus on the cache control behavior introduced to fix GCP storage errors
where EventStore was attempting to read cache files it cannot create.
"""
import tempfile
from unittest.mock import MagicMock, patch
import pytest
from openhands.events.action import NullAction
from openhands.events.event import EventSource
from openhands.events.event_store import EventStore
from openhands.events.observation import NullObservation
from openhands.events.stream import EventStream
from openhands.storage.local import LocalFileStore
@pytest.fixture
def temp_dir():
"""Create a temporary directory for testing."""
with tempfile.TemporaryDirectory() as temp_dir:
yield temp_dir
@pytest.fixture
def file_store(temp_dir):
"""Create a LocalFileStore for testing."""
return LocalFileStore(temp_dir)
class TestEventStoreCache:
"""Tests for EventStore cache control functionality."""
def test_event_store_cache_disabled_by_default(self, file_store):
"""Test that EventStore has cache disabled by default."""
event_store = EventStore(
sid='test-conversation', file_store=file_store, user_id='test-user'
)
assert event_store.use_cache is False, (
'EventStore should have use_cache=False by default'
)
def test_event_store_cache_can_be_enabled(self, file_store):
"""Test that EventStore cache can be explicitly enabled."""
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=True,
)
assert event_store.use_cache is True, (
'EventStore should respect explicit use_cache=True'
)
def test_event_stream_cache_enabled_by_default(self, file_store):
"""Test that EventStream has cache enabled by default."""
event_stream = EventStream(
sid='test-conversation', file_store=file_store, user_id='test-user'
)
assert event_stream.use_cache is True, (
'EventStream should have use_cache=True by default'
)
def test_cache_disabled_returns_dummy_page(self, file_store):
"""Test that when cache is disabled, _load_cache_page_for_index returns dummy page."""
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=False,
)
# Load a cache page for any index
cache_page = event_store._load_cache_page_for_index(150)
# Should return the dummy page
assert cache_page.events is None, 'Should return dummy page with None events'
assert cache_page.start == 1, 'Dummy page should have start=1'
assert cache_page.end == -1, 'Dummy page should have end=-1'
def test_cache_enabled_attempts_to_load_cache(self, file_store):
"""Test that when cache is enabled, _load_cache_page_for_index attempts to load cache."""
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=True,
)
# Mock the _load_cache_page method to verify it's called
with patch.object(event_store, '_load_cache_page') as mock_load_cache:
mock_load_cache.return_value = MagicMock(events=None, start=150, end=175)
# Load a cache page for index 150
event_store._load_cache_page_for_index(150)
# Should have called _load_cache_page
mock_load_cache.assert_called_once_with(150, 175)
def test_search_events_works_without_cache(self, file_store):
"""Test that search_events works correctly when cache is disabled."""
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=False,
)
# Search for events should work without errors
events = list(event_store.search_events())
# Should return empty list since no events exist
assert events == [], 'Should return empty list when no events exist'
def test_search_events_with_missing_cache_files(self, file_store):
"""Test that search_events handles missing cache files gracefully when cache is disabled."""
# Create an EventStore with cache disabled
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=False,
)
# Mock file_store.read to simulate GCP "No such object" error
original_read = file_store.read
def mock_read(path):
if 'event_cache' in path:
# Simulate GCP storage error
raise Exception(
'No such object: prod-openhands-sessions/users/.../event_cache/150-175.json'
)
return original_read(path)
file_store.read = mock_read
# Search for events in a specific range
events = list(event_store.search_events(start_id=150, end_id=175))
# Should work without errors (returns empty since no individual event files exist)
assert events == [], 'Should handle missing cache files gracefully'
def test_get_event_works_without_cache(self, file_store):
"""Test that get_event works correctly when cache is disabled."""
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=False,
)
# Try to get a specific event that doesn't exist
# This should handle the FileNotFoundError gracefully
try:
event = event_store.get_event(0)
# If no exception, should return None
assert event is None, "Should return None when event doesn't exist"
except FileNotFoundError:
# This is expected behavior when the event file doesn't exist
pass
def test_cache_integration_with_event_stream(self, file_store):
"""Test that EventStream can write cache files and EventStore can read them when enabled."""
# Create an EventStream (cache enabled by default)
event_stream = EventStream(
sid='test-conversation', file_store=file_store, user_id='test-user'
)
# Add some events to create cache files
for i in range(10):
event_stream.add_event(NullObservation(f'test{i}'), EventSource.AGENT)
# Close the stream to ensure cache files are written
event_stream.close()
# Create an EventStore with cache enabled
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=True,
)
# Should be able to read events using cache
events = list(event_store.search_events())
# Should find the events
assert len(events) > 0, 'Should find events when cache is enabled'
def test_cache_disabled_avoids_storage_requests(self, file_store):
"""Test that when cache is disabled, no storage requests are made for cache files."""
event_store = EventStore(
sid='test-conversation',
file_store=file_store,
user_id='test-user',
use_cache=False,
)
# Mock file_store.read to track calls
original_read = file_store.read
read_calls = []
def mock_read(path):
read_calls.append(path)
return original_read(path)
file_store.read = mock_read
# Search for events
list(event_store.search_events(start_id=150, end_id=175))
# Should not have made any calls to read cache files
cache_calls = [call for call in read_calls if 'event_cache' in call]
assert len(cache_calls) == 0, (
'Should not make storage requests for cache files when cache is disabled'
)
def test_backwards_compatibility(self, file_store):
"""Test that existing code continues to work with the new cache control."""
# Create EventStore without specifying use_cache (should default to False)
event_store = EventStore(
sid='test-conversation', file_store=file_store, user_id='test-user'
)
# Should work without errors
events = list(event_store.search_events())
assert events == [], 'Should work with default cache settings'
# Create EventStream without specifying use_cache (should default to True)
event_stream = EventStream(
sid='test-conversation-2', file_store=file_store, user_id='test-user'
)
# Should work without errors
event_stream.add_event(NullAction(), EventSource.AGENT)
events = list(event_stream.get_events())
assert len(events) == 1, 'EventStream should work with default cache settings'
event_stream.close()