Refactor event store cleanup (#8505)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
tofarr
2025-05-14 16:20:43 -06:00
committed by GitHub
parent f046efd53d
commit 7e88d4195f
4 changed files with 227 additions and 136 deletions

View File

@@ -0,0 +1,98 @@
import json
from dataclasses import dataclass
from openhands.events.event import Event
from openhands.events.serialization.event import event_to_dict
@dataclass
class EventFilter:
"""A filter for Event objects in the event stream.
EventFilter provides a flexible way to filter events based on various criteria
such as event type, source, date range, and content. It can be used to include
or exclude events from search results based on the specified criteria.
Attributes:
exclude_hidden: Whether to exclude events marked as hidden. Defaults to False.
query: Text string to search for in event content. Case-insensitive. Defaults to None.
include_types: Tuple of Event types to include. Only events of these types will pass the filter.
Defaults to None (include all types).
exclude_types: Tuple of Event types to exclude. Events of these types will be filtered out.
Defaults to None (exclude no types).
source: Filter by event source (e.g., 'agent', 'user', 'environment'). Defaults to None.
start_date: ISO format date string. Only events after this date will pass the filter.
Defaults to None.
end_date: ISO format date string. Only events before this date will pass the filter.
Defaults to None.
"""
exclude_hidden: bool = False
query: str | None = None
include_types: tuple[type[Event], ...] | None = None
exclude_types: tuple[type[Event], ...] | None = None
source: str | None = None
start_date: str | None = None
end_date: str | None = None
def include(self, event: Event) -> bool:
"""Determine if an event should be included based on the filter criteria.
This method checks if the given event matches all the filter criteria.
If any criterion fails, the event is excluded.
Args:
event: The Event object to check against the filter criteria.
Returns:
bool: True if the event passes all filter criteria and should be included,
False otherwise.
"""
if self.include_types and not isinstance(event, self.include_types):
return False
if self.exclude_types is not None and isinstance(event, self.exclude_types):
return False
if self.source:
if event.source is None or event.source.value != self.source:
return False
if (
self.start_date
and event.timestamp is not None
and event.timestamp < self.start_date
):
return False
if (
self.end_date
and event.timestamp is not None
and event.timestamp > self.end_date
):
return False
if self.exclude_hidden and getattr(event, 'hidden', False):
return False
# Text search in event content if query provided
if self.query:
event_dict = event_to_dict(event)
event_str = json.dumps(event_dict).lower()
if self.query.lower() not in event_str:
return False
return True
def exclude(self, event: Event) -> bool:
"""Determine if an event should be excluded based on the filter criteria.
This is the inverse of the include method.
Args:
event: The Event object to check against the filter criteria.
Returns:
bool: True if the event should be excluded, False if it should be included.
"""
return not self.include(event)

View File

@@ -4,7 +4,9 @@ from typing import Iterable
from openhands.core.logger import openhands_logger as logger
from openhands.events.event import Event, EventSource
from openhands.events.serialization.event import event_from_dict, event_to_dict
from openhands.events.event_filter import EventFilter
from openhands.events.event_store_abc import EventStoreABC
from openhands.events.serialization.event import event_from_dict
from openhands.storage.files import FileStore
from openhands.storage.locations import (
get_conversation_dir,
@@ -39,7 +41,7 @@ _DUMMY_PAGE = _CachePage(None, 1, -1)
@dataclass
class EventStore:
class EventStore(EventStoreABC):
"""
A stored list of events backing a conversation
"""
@@ -60,15 +62,6 @@ class EventStore:
except FileNotFoundError:
logger.debug(f'No events found for session {self.sid} at {events_dir}')
if self.user_id:
# During transition to new location, try old location if user_id is set
# TODO: remove this code after 5/1/2025
try:
events_dir = get_conversation_events_dir(self.sid)
events += self.file_store.list(events_dir)
except FileNotFoundError:
logger.debug(f'No events found for session {self.sid} at {events_dir}')
if not events:
self.cur_id = 0
return
@@ -79,13 +72,12 @@ class EventStore:
if id >= self.cur_id:
self.cur_id = id + 1
def get_events(
def search_events(
self,
start_id: int = 0,
end_id: int | None = None,
reverse: bool = False,
filter_out_type: tuple[type[Event], ...] | None = None,
filter_hidden: bool = False,
filter: EventFilter | None = None,
) -> Iterable[Event]:
"""
Retrieve events from the event stream, optionally filtering out events of a given type
@@ -95,20 +87,12 @@ class EventStore:
start_id: The ID of the first event to retrieve. Defaults to 0.
end_id: The ID of the last event to retrieve. Defaults to the last event in the stream.
reverse: Whether to retrieve events in reverse order. Defaults to False.
filter_out_type: A tuple of event types to filter out. Typically used to filter out backend events from the agent.
filter_hidden: If True, filters out events with the 'hidden' attribute set to True.
filter: EventFilter to use
Yields:
Events from the stream that match the criteria.
"""
def should_filter(event: Event) -> bool:
if filter_hidden and hasattr(event, 'hidden') and event.hidden:
return True
if filter_out_type is not None and isinstance(event, filter_out_type):
return True
return False
if end_id is None:
end_id = self.cur_id
else:
@@ -134,24 +118,15 @@ class EventStore:
event = self.get_event(index)
except FileNotFoundError:
event = None
if event and not should_filter(event):
yield event
if event:
if not filter or filter.include(event):
yield event
def get_event(self, id: int) -> Event:
filename = self._get_filename_for_id(id, self.user_id)
try:
content = self.file_store.read(filename)
data = json.loads(content)
return event_from_dict(data)
except FileNotFoundError:
logger.debug(f'File {filename} not found')
# TODO remove this block after 5/1/2025
if self.user_id:
filename = self._get_filename_for_id(id, None)
content = self.file_store.read(filename)
data = json.loads(content)
return event_from_dict(data)
raise
content = self.file_store.read(filename)
data = json.loads(content)
return event_from_dict(data)
def get_latest_event(self) -> Event:
return self.get_event(self.cur_id - 1)
@@ -164,98 +139,6 @@ class EventStore:
if event.source == source:
yield event
def _should_filter_event(
self,
event: Event,
query: str | None = None,
event_types: tuple[type[Event], ...] | None = None,
source: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
) -> bool:
"""Check if an event should be filtered out based on the given criteria.
Args:
event: The event to check
query: Text to search for in event content
event_type: Filter by event type classes (e.g., (FileReadAction, ) ).
source: Filter by event source
start_date: Filter events after this date (ISO format)
end_date: Filter events before this date (ISO format)
Returns:
bool: True if the event should be filtered out, False if it matches all criteria
"""
if event_types and not isinstance(event, event_types):
return True
if source:
if event.source is None or event.source.value != source:
return True
if start_date and event.timestamp is not None and event.timestamp < start_date:
return True
if end_date and event.timestamp is not None and event.timestamp > end_date:
return True
# Text search in event content if query provided
if query:
event_dict = event_to_dict(event)
event_str = json.dumps(event_dict).lower()
if query.lower() not in event_str:
return True
return False
def get_matching_events(
self,
query: str | None = None,
event_types: tuple[type[Event], ...] | None = None,
source: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
start_id: int = 0,
limit: int = 100,
reverse: bool = False,
) -> list[Event]:
"""Get matching events from the event stream based on filters.
Args:
query: Text to search for in event content
event_types: Filter by event type classes (e.g., (FileReadAction, ) ).
source: Filter by event source
start_date: Filter events after this date (ISO format)
end_date: Filter events before this date (ISO format)
start_id: Starting ID in the event stream. Defaults to 0
limit: Maximum number of events to return. Must be between 1 and 100. Defaults to 100
reverse: Whether to retrieve events in reverse order. Defaults to False.
Returns:
list: List of matching events (as dicts)
Raises:
ValueError: If limit is less than 1 or greater than 100
"""
if limit < 1 or limit > 100:
raise ValueError('Limit must be between 1 and 100')
matching_events: list = []
for event in self.get_events(start_id=start_id, reverse=reverse):
if self._should_filter_event(
event, query, event_types, source, start_date, end_date
):
continue
matching_events.append(event)
# Stop if we have enough events
if len(matching_events) >= limit:
break
return matching_events
def _get_filename_for_id(self, id: int, user_id: str | None) -> str:
return get_conversation_event_filename(self.sid, id, user_id)

View File

@@ -0,0 +1,113 @@
from abc import abstractmethod
from itertools import islice
from typing import Iterable
from deprecated import deprecated # type: ignore
from openhands.events.event import Event, EventSource
from openhands.events.event_filter import EventFilter
class EventStoreABC:
"""
A stored list of events backing a conversation
"""
sid: str
user_id: str | None
@abstractmethod
def search_events(
self,
start_id: int = 0,
end_id: int | None = None,
reverse: bool = False,
filter: EventFilter | None = None,
) -> Iterable[Event]:
"""
Retrieve events from the event stream, optionally excluding events using a filter
Args:
start_id: The ID of the first event to retrieve. Defaults to 0.
end_id: The ID of the last event to retrieve. Defaults to the last event in the stream.
reverse: Whether to retrieve events in reverse order. Defaults to False.
filter: An optional event filter
Yields:
Events from the stream that match the criteria.
"""
@deprecated('Use search_events instead')
def get_events(
self,
start_id: int = 0,
end_id: int | None = None,
reverse: bool = False,
filter_out_type: tuple[type[Event], ...] | None = None,
filter_hidden: bool = False,
) -> Iterable[Event]:
yield from self.search_events(
start_id,
end_id,
reverse,
EventFilter(exclude_types=filter_out_type, exclude_hidden=filter_hidden),
)
@abstractmethod
def get_event(self, id: int) -> Event:
"""Retrieve a single event from the event stream. Raise a FileNotFoundError if there was no such event"""
@abstractmethod
def get_latest_event(self) -> Event:
"""Get the latest event from the event stream"""
@abstractmethod
def get_latest_event_id(self) -> int:
"""Get the id of the latest event from the event stream"""
@deprecated('use search_events instead')
def filtered_events_by_source(self, source: EventSource) -> Iterable[Event]:
yield from self.search_events(filter=EventFilter(source=source))
@deprecated('use search_events instead')
def get_matching_events(
self,
query: str | None = None,
event_types: tuple[type[Event], ...] | None = None,
source: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
start_id: int = 0,
limit: int = 100,
reverse: bool = False,
) -> list[Event]:
"""Get matching events from the event stream based on filters.
Args:
query: Text to search for in event content
event_types: Filter by event type classes (e.g., (FileReadAction, ) ).
source: Filter by event source
start_date: Filter events after this date (ISO format)
end_date: Filter events before this date (ISO format)
start_id: Starting ID in the event stream. Defaults to 0
limit: Maximum number of events to return. Must be between 1 and 100. Defaults to 100
reverse: Whether to retrieve events in reverse order. Defaults to False.
Returns:
list: List of matching events (as dicts)
"""
if limit < 1 or limit > 100:
raise ValueError('Limit must be between 1 and 100')
events = self.search_events(
start_id=start_id,
reverse=reverse,
filter=EventFilter(
query=query,
include_types=event_types,
source=source,
start_date=start_date,
end_date=end_date,
),
)
return list(islice(events, limit))

View File

@@ -19,6 +19,7 @@ from openhands.events.action.files import (
)
from openhands.events.action.message import MessageAction
from openhands.events.event import FileEditSource, FileReadSource
from openhands.events.event_filter import EventFilter
from openhands.events.observation import NullObservation
from openhands.events.observation.files import (
FileEditObservation,
@@ -177,12 +178,8 @@ def test_get_matching_events_source_filter(temp_dir: str):
)
# Verify that source comparison works correctly
assert event_stream._should_filter_event(
event, source='agent'
) # Should filter out None source events
assert not event_stream._should_filter_event(
event, source=None
) # Should not filter out when source filter is None
assert EventFilter(source='agent').exclude(event)
assert EventFilter(source=None).include(event)
# Filter by AGENT source again
events = event_stream.get_matching_events(source='agent')