diff --git a/openhands/events/event_filter.py b/openhands/events/event_filter.py new file mode 100644 index 0000000000..cfd41b568c --- /dev/null +++ b/openhands/events/event_filter.py @@ -0,0 +1,98 @@ +import json +from dataclasses import dataclass + +from openhands.events.event import Event +from openhands.events.serialization.event import event_to_dict + + +@dataclass +class EventFilter: + """A filter for Event objects in the event stream. + + EventFilter provides a flexible way to filter events based on various criteria + such as event type, source, date range, and content. It can be used to include + or exclude events from search results based on the specified criteria. + + Attributes: + exclude_hidden: Whether to exclude events marked as hidden. Defaults to False. + query: Text to search for in the JSON-serialized event. Case-insensitive. Defaults to None. + include_types: Tuple of Event types to include. Only events of these types will pass the filter. + Defaults to None (include all types). + exclude_types: Tuple of Event types to exclude. Events of these types will be filtered out. + Defaults to None (exclude no types). + source: Filter by event source (e.g., 'agent', 'user', 'environment'). Defaults to None. + start_date: ISO format date string. Events whose timestamp is earlier than this are excluded; events without a timestamp are not date-filtered. + Defaults to None. + end_date: ISO format date string. Events whose timestamp is later than this are excluded; events without a timestamp are not date-filtered. + Defaults to None. + """ + + exclude_hidden: bool = False + query: str | None = None + include_types: tuple[type[Event], ...] | None = None + exclude_types: tuple[type[Event], ...] | None = None + source: str | None = None + start_date: str | None = None + end_date: str | None = None + + def include(self, event: Event) -> bool: + """Determine if an event should be included based on the filter criteria. + + This method checks if the given event matches all the filter criteria. + If any criterion fails, the event is excluded. 
+ + Args: + event: The Event object to check against the filter criteria. + + Returns: + bool: True if the event passes all filter criteria and should be included, + False otherwise. + """ + if self.include_types and not isinstance(event, self.include_types): + return False + + if self.exclude_types is not None and isinstance(event, self.exclude_types): + return False + + if self.source: + if event.source is None or event.source.value != self.source: + return False + + if ( + self.start_date + and event.timestamp is not None + and event.timestamp < self.start_date + ): + return False + + if ( + self.end_date + and event.timestamp is not None + and event.timestamp > self.end_date + ): + return False + + if self.exclude_hidden and getattr(event, 'hidden', False): + return False + + # Text search in event content if query provided + if self.query: + event_dict = event_to_dict(event) + event_str = json.dumps(event_dict).lower() + if self.query.lower() not in event_str: + return False + + return True + + def exclude(self, event: Event) -> bool: + """Determine if an event should be excluded based on the filter criteria. + + This is the inverse of the include method. + + Args: + event: The Event object to check against the filter criteria. + + Returns: + bool: True if the event should be excluded, False if it should be included. 
+ """ + return not self.include(event) diff --git a/openhands/events/event_store.py b/openhands/events/event_store.py index 4b89ef8d67..fe094d66a8 100644 --- a/openhands/events/event_store.py +++ b/openhands/events/event_store.py @@ -4,7 +4,9 @@ from typing import Iterable from openhands.core.logger import openhands_logger as logger from openhands.events.event import Event, EventSource -from openhands.events.serialization.event import event_from_dict, event_to_dict +from openhands.events.event_filter import EventFilter +from openhands.events.event_store_abc import EventStoreABC +from openhands.events.serialization.event import event_from_dict from openhands.storage.files import FileStore from openhands.storage.locations import ( get_conversation_dir, @@ -39,7 +41,7 @@ _DUMMY_PAGE = _CachePage(None, 1, -1) @dataclass -class EventStore: +class EventStore(EventStoreABC): """ A stored list of events backing a conversation """ @@ -60,15 +62,6 @@ class EventStore: except FileNotFoundError: logger.debug(f'No events found for session {self.sid} at {events_dir}') - if self.user_id: - # During transition to new location, try old location if user_id is set - # TODO: remove this code after 5/1/2025 - try: - events_dir = get_conversation_events_dir(self.sid) - events += self.file_store.list(events_dir) - except FileNotFoundError: - logger.debug(f'No events found for session {self.sid} at {events_dir}') - if not events: self.cur_id = 0 return @@ -79,13 +72,12 @@ class EventStore: if id >= self.cur_id: self.cur_id = id + 1 - def get_events( + def search_events( self, start_id: int = 0, end_id: int | None = None, reverse: bool = False, - filter_out_type: tuple[type[Event], ...] | None = None, - filter_hidden: bool = False, + filter: EventFilter | None = None, ) -> Iterable[Event]: """ Retrieve events from the event stream, optionally filtering out events of a given type @@ -95,20 +87,12 @@ class EventStore: start_id: The ID of the first event to retrieve. Defaults to 0. 
end_id: The ID of the last event to retrieve. Defaults to the last event in the stream. reverse: Whether to retrieve events in reverse order. Defaults to False. - filter_out_type: A tuple of event types to filter out. Typically used to filter out backend events from the agent. - filter_hidden: If True, filters out events with the 'hidden' attribute set to True. + filter: Optional EventFilter; when given, only events it includes are yielded. Yields: Events from the stream that match the criteria. """ - def should_filter(event: Event) -> bool: - if filter_hidden and hasattr(event, 'hidden') and event.hidden: - return True - if filter_out_type is not None and isinstance(event, filter_out_type): - return True - return False - if end_id is None: end_id = self.cur_id else: @@ -134,24 +118,15 @@ class EventStore: event = self.get_event(index) except FileNotFoundError: event = None - if event and not should_filter(event): - yield event + if event: + if not filter or filter.include(event): + yield event def get_event(self, id: int) -> Event: filename = self._get_filename_for_id(id, self.user_id) - try: - content = self.file_store.read(filename) - data = json.loads(content) - return event_from_dict(data) - except FileNotFoundError: - logger.debug(f'File {filename} not found') - # TODO remove this block after 5/1/2025 - if self.user_id: - filename = self._get_filename_for_id(id, None) - content = self.file_store.read(filename) - data = json.loads(content) - return event_from_dict(data) - raise + content = self.file_store.read(filename) + data = json.loads(content) + return event_from_dict(data) def get_latest_event(self) -> Event: return self.get_event(self.cur_id - 1) @@ -164,98 +139,6 @@ class EventStore: if event.source == source: yield event - def _should_filter_event( - self, - event: Event, - query: str | None = None, - event_types: tuple[type[Event], ...] 
| None = None, - source: str | None = None, - start_date: str | None = None, - end_date: str | None = None, - ) -> bool: - """Check if an event should be filtered out based on the given criteria. - - Args: - event: The event to check - query: Text to search for in event content - event_type: Filter by event type classes (e.g., (FileReadAction, ) ). - source: Filter by event source - start_date: Filter events after this date (ISO format) - end_date: Filter events before this date (ISO format) - - Returns: - bool: True if the event should be filtered out, False if it matches all criteria - """ - if event_types and not isinstance(event, event_types): - return True - - if source: - if event.source is None or event.source.value != source: - return True - - if start_date and event.timestamp is not None and event.timestamp < start_date: - return True - - if end_date and event.timestamp is not None and event.timestamp > end_date: - return True - - # Text search in event content if query provided - if query: - event_dict = event_to_dict(event) - event_str = json.dumps(event_dict).lower() - if query.lower() not in event_str: - return True - - return False - - def get_matching_events( - self, - query: str | None = None, - event_types: tuple[type[Event], ...] | None = None, - source: str | None = None, - start_date: str | None = None, - end_date: str | None = None, - start_id: int = 0, - limit: int = 100, - reverse: bool = False, - ) -> list[Event]: - """Get matching events from the event stream based on filters. - - Args: - query: Text to search for in event content - event_types: Filter by event type classes (e.g., (FileReadAction, ) ). - source: Filter by event source - start_date: Filter events after this date (ISO format) - end_date: Filter events before this date (ISO format) - start_id: Starting ID in the event stream. Defaults to 0 - limit: Maximum number of events to return. Must be between 1 and 100. 
Defaults to 100 - reverse: Whether to retrieve events in reverse order. Defaults to False. - - Returns: - list: List of matching events (as dicts) - - Raises: - ValueError: If limit is less than 1 or greater than 100 - """ - if limit < 1 or limit > 100: - raise ValueError('Limit must be between 1 and 100') - - matching_events: list = [] - - for event in self.get_events(start_id=start_id, reverse=reverse): - if self._should_filter_event( - event, query, event_types, source, start_date, end_date - ): - continue - - matching_events.append(event) - - # Stop if we have enough events - if len(matching_events) >= limit: - break - - return matching_events - def _get_filename_for_id(self, id: int, user_id: str | None) -> str: return get_conversation_event_filename(self.sid, id, user_id) diff --git a/openhands/events/event_store_abc.py b/openhands/events/event_store_abc.py new file mode 100644 index 0000000000..52e523e482 --- /dev/null +++ b/openhands/events/event_store_abc.py @@ -0,0 +1,113 @@ +from abc import abstractmethod +from itertools import islice +from typing import Iterable + +from deprecated import deprecated # type: ignore + +from openhands.events.event import Event, EventSource +from openhands.events.event_filter import EventFilter + + +class EventStoreABC: + """ + A stored list of events backing a conversation + """ + + sid: str + user_id: str | None + + @abstractmethod + def search_events( + self, + start_id: int = 0, + end_id: int | None = None, + reverse: bool = False, + filter: EventFilter | None = None, + ) -> Iterable[Event]: + """ + Retrieve events from the event stream, optionally excluding events using a filter + + Args: + start_id: The ID of the first event to retrieve. Defaults to 0. + end_id: The ID of the last event to retrieve. Defaults to the last event in the stream. + reverse: Whether to retrieve events in reverse order. Defaults to False. + filter: An optional event filter + + Yields: + Events from the stream that match the criteria. 
+ """ + + @deprecated('Use search_events instead') + def get_events( + self, + start_id: int = 0, + end_id: int | None = None, + reverse: bool = False, + filter_out_type: tuple[type[Event], ...] | None = None, + filter_hidden: bool = False, + ) -> Iterable[Event]: + yield from self.search_events( + start_id, + end_id, + reverse, + EventFilter(exclude_types=filter_out_type, exclude_hidden=filter_hidden), + ) + + @abstractmethod + def get_event(self, id: int) -> Event: + """Retrieve a single event from the event stream. Raise a FileNotFoundError if there was no such event""" + + @abstractmethod + def get_latest_event(self) -> Event: + """Get the latest event from the event stream""" + + @abstractmethod + def get_latest_event_id(self) -> int: + """Get the id of the latest event from the event stream""" + + @deprecated('use search_events instead') + def filtered_events_by_source(self, source: EventSource) -> Iterable[Event]: + yield from self.search_events(filter=EventFilter(source=source)) + + @deprecated('use search_events instead') + def get_matching_events( + self, + query: str | None = None, + event_types: tuple[type[Event], ...] | None = None, + source: str | None = None, + start_date: str | None = None, + end_date: str | None = None, + start_id: int = 0, + limit: int = 100, + reverse: bool = False, + ) -> list[Event]: + """Get matching events from the event stream based on filters. + + Args: + query: Text to search for in event content + event_types: Filter by event type classes (e.g., (FileReadAction, ) ). + source: Filter by event source + start_date: Filter events after this date (ISO format) + end_date: Filter events before this date (ISO format) + start_id: Starting ID in the event stream. Defaults to 0 + limit: Maximum number of events to return. Must be between 1 and 100. Defaults to 100 + reverse: Whether to retrieve events in reverse order. Defaults to False. 
+ + Returns: + list: List of matching Event objects (at most `limit`) + """ + if limit < 1 or limit > 100: + raise ValueError('Limit must be between 1 and 100') + + events = self.search_events( + start_id=start_id, + reverse=reverse, + filter=EventFilter( + query=query, + include_types=event_types, + source=source, + start_date=start_date, + end_date=end_date, + ), + ) + return list(islice(events, limit)) diff --git a/tests/unit/test_event_stream.py b/tests/unit/test_event_stream.py index 9a3a5db72e..d5cf244678 100644 --- a/tests/unit/test_event_stream.py +++ b/tests/unit/test_event_stream.py @@ -19,6 +19,7 @@ from openhands.events.action.files import ( ) from openhands.events.action.message import MessageAction from openhands.events.event import FileEditSource, FileReadSource +from openhands.events.event_filter import EventFilter from openhands.events.observation import NullObservation from openhands.events.observation.files import ( FileEditObservation, @@ -177,12 +178,8 @@ def test_get_matching_events_source_filter(temp_dir: str): ) # Verify that source comparison works correctly - assert event_stream._should_filter_event( - event, source='agent' - ) # Should filter out None source events - assert not event_stream._should_filter_event( - event, source=None - ) # Should not filter out when source filter is None + assert EventFilter(source='agent').exclude(event) + assert EventFilter(source=None).include(event) # Filter by AGENT source again events = event_stream.get_matching_events(source='agent')