Compare commits

...

1 Commits

Author SHA1 Message Date
openhands
f4fcd51b3a Implement concurrent event fetching with batch size 2025-03-20 20:16:46 +00:00

View File

@@ -20,6 +20,9 @@ from openhands.storage.locations import (
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.shutdown_listener import should_continue
# Number of events to fetch concurrently in each batch
EVENT_BATCH_SIZE = 20
class EventStreamSubscriber(str, Enum):
AGENT_CONTROLLER = 'agent_controller'
@@ -215,30 +218,80 @@ class EventStream:
return True
return False
# Helper function to fetch a single event and handle exceptions
def fetch_single_event(event_id: int) -> tuple[int, Event | None]:
try:
event = self.get_event(event_id)
return (event_id, event)
except FileNotFoundError:
logger.debug(f'No event found for ID {event_id}')
return (event_id, None)
if reverse:
if end_id is None:
end_id = self._cur_id - 1
event_id = end_id
while event_id >= start_id:
try:
event = self.get_event(event_id)
if not should_filter(event):
current_id = end_id
while current_id >= start_id:
# Determine batch size for this iteration
batch_end = current_id
batch_start = max(start_id, current_id - EVENT_BATCH_SIZE + 1)
# Create list of IDs to fetch in this batch (in reverse order)
batch_ids = list(range(batch_end, batch_start - 1, -1))
# Fetch events concurrently
with ThreadPoolExecutor() as executor:
batch_results = list(executor.map(fetch_single_event, batch_ids))
# Sort results by ID in reverse order to maintain order
batch_results.sort(key=lambda x: x[0], reverse=True)
# Yield events in order
for event_id, event in batch_results:
if event is not None and not should_filter(event):
yield event
except FileNotFoundError:
logger.debug(f'No event found for ID {event_id}')
event_id -= 1
# Move to next batch
current_id = batch_start - 1
else:
event_id = start_id
current_id = start_id
while should_continue():
if end_id is not None and event_id > end_id:
if end_id is not None and current_id > end_id:
break
try:
event = self.get_event(event_id)
if not should_filter(event):
# Determine batch size for this iteration
batch_start = current_id
batch_end = (
end_id if end_id is not None else batch_start + EVENT_BATCH_SIZE - 1
)
batch_end = min(batch_end, batch_start + EVENT_BATCH_SIZE - 1)
# Create list of IDs to fetch in this batch
batch_ids = list(range(batch_start, batch_end + 1))
# Fetch events concurrently
with ThreadPoolExecutor() as executor:
batch_results = list(executor.map(fetch_single_event, batch_ids))
# Sort results by ID to maintain order
batch_results.sort(key=lambda x: x[0])
# Check if we've reached the end of available events
all_not_found = all(event is None for _, event in batch_results)
if all_not_found:
break
# Yield events in order
for event_id, event in batch_results:
if event is not None and not should_filter(event):
yield event
except FileNotFoundError:
break
event_id += 1
elif event is None and event_id == batch_start:
# If the first event in the batch is missing, we've reached the end
return
# Move to next batch
current_id = batch_end + 1
def get_event(self, id: int) -> Event:
filename = self._get_filename_for_id(id, self.user_id)