APP-307 Add Google Cloud Storage-based EventService implementation (#12264)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Tim O'Farrell
2026-01-06 15:52:07 -07:00
committed by GitHub
parent af5c22700c
commit fa974f8106
13 changed files with 447 additions and 435 deletions

View File

@@ -133,6 +133,9 @@ def config_from_env() -> AppServerConfig:
from openhands.app_server.event.filesystem_event_service import (
FilesystemEventServiceInjector,
)
from openhands.app_server.event.google_cloud_event_service import (
GoogleCloudEventServiceInjector,
)
from openhands.app_server.event_callback.sql_event_callback_service import (
SQLEventCallbackServiceInjector,
)
@@ -161,7 +164,13 @@ def config_from_env() -> AppServerConfig:
config: AppServerConfig = from_env(AppServerConfig, 'OH') # type: ignore
if config.event is None:
config.event = FilesystemEventServiceInjector()
if os.environ.get('FILE_STORE') == 'google_cloud':
# Legacy V0 google cloud storage configuration
config.event = GoogleCloudEventServiceInjector(
bucket_name=os.environ.get('FILE_STORE_PATH')
)
else:
config.event = FilesystemEventServiceInjector()
if config.event_callback is None:
config.event_callback = SQLEventCallbackServiceInjector()

View File

@@ -12,7 +12,7 @@ from openhands.app_server.event.event_service import EventService
from openhands.app_server.event_callback.event_callback_models import EventKind
from openhands.sdk import Event
router = APIRouter(prefix='/events', tags=['Events'])
router = APIRouter(prefix='/conversation/{conversation_id}/events', tags=['Events'])
event_service_dependency = depends_event_service()
@@ -21,10 +21,7 @@ event_service_dependency = depends_event_service()
@router.get('/search')
async def search_events(
conversation_id__eq: Annotated[
str | None,
Query(title='Optional filter by conversation ID'),
] = None,
conversation_id: str,
kind__eq: Annotated[
EventKind | None,
Query(title='Optional filter by event kind'),
@@ -55,7 +52,7 @@ async def search_events(
assert limit > 0
assert limit <= 100
return await event_service.search_events(
conversation_id__eq=UUID(conversation_id__eq) if conversation_id__eq else None,
conversation_id=UUID(conversation_id),
kind__eq=kind__eq,
timestamp__gte=timestamp__gte,
timestamp__lt=timestamp__lt,
@@ -67,10 +64,7 @@ async def search_events(
@router.get('/count')
async def count_events(
conversation_id__eq: Annotated[
str | None,
Query(title='Optional filter by conversation ID'),
] = None,
conversation_id: str,
kind__eq: Annotated[
EventKind | None,
Query(title='Optional filter by event kind'),
@@ -83,28 +77,25 @@ async def count_events(
datetime | None,
Query(title='Optional filter by timestamp less than'),
] = None,
sort_order: Annotated[
EventSortOrder,
Query(title='Sort order for results'),
] = EventSortOrder.TIMESTAMP,
event_service: EventService = event_service_dependency,
) -> int:
"""Count events matching the given filters."""
return await event_service.count_events(
conversation_id__eq=UUID(conversation_id__eq) if conversation_id__eq else None,
conversation_id=UUID(conversation_id),
kind__eq=kind__eq,
timestamp__gte=timestamp__gte,
timestamp__lt=timestamp__lt,
sort_order=sort_order,
)
@router.get('')
async def batch_get_events(
conversation_id: str,
id: Annotated[list[str], Query()],
event_service: EventService = event_service_dependency,
) -> list[Event | None]:
"""Get a batch of events given their ids, returning null for any missing event."""
event_ids = [UUID(id_) for id_ in id]
assert len(id) <= 100
events = await event_service.batch_get_events(id)
events = await event_service.batch_get_events(UUID(conversation_id), event_ids)
return events

View File

@@ -17,13 +17,13 @@ class EventService(ABC):
"""Event Service for getting events."""
@abstractmethod
async def get_event(self, event_id: str) -> Event | None:
async def get_event(self, conversation_id: UUID, event_id: UUID) -> Event | None:
"""Given an id, retrieve an event."""
@abstractmethod
async def search_events(
self,
conversation_id__eq: UUID | None = None,
conversation_id: UUID,
kind__eq: EventKind | None = None,
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
@@ -36,11 +36,10 @@ class EventService(ABC):
@abstractmethod
async def count_events(
self,
conversation_id__eq: UUID | None = None,
conversation_id: UUID,
kind__eq: EventKind | None = None,
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
sort_order: EventSortOrder = EventSortOrder.TIMESTAMP,
) -> int:
"""Count events matching the given filters."""
@@ -48,10 +47,12 @@ class EventService(ABC):
async def save_event(self, conversation_id: UUID, event: Event):
"""Save an event. Internal method intended not be part of the REST api."""
async def batch_get_events(self, event_ids: list[str]) -> list[Event | None]:
async def batch_get_events(
self, conversation_id: UUID, event_ids: list[UUID]
) -> list[Event | None]:
"""Given a list of ids, get events (Or none for any which were not found)."""
return await asyncio.gather(
*[self.get_event(event_id) for event_id in event_ids]
*[self.get_event(conversation_id, event_id) for event_id in event_ids]
)

View File

@@ -0,0 +1,165 @@
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from uuid import UUID
from openhands.agent_server.models import EventPage, EventSortOrder
from openhands.agent_server.sockets import page_iterator
from openhands.app_server.app_conversation.app_conversation_info_service import (
AppConversationInfoService,
)
from openhands.app_server.app_conversation.app_conversation_models import (
AppConversationInfo,
)
from openhands.app_server.event.event_service import EventService
from openhands.app_server.event_callback.event_callback_models import EventKind
from openhands.sdk import Event
@dataclass
class EventServiceBase(EventService, ABC):
"""Event Service for getting events - the only check on permissions for events is
in the strict prefix for storage.
"""
prefix: Path
user_id: str | None
app_conversation_info_service: AppConversationInfoService | None
app_conversation_info_load_tasks: dict[
UUID, asyncio.Task[AppConversationInfo | None]
]
@abstractmethod
def _load_event(self, path: Path) -> Event | None:
"""Get the event at the path given."""
@abstractmethod
def _store_event(self, path: Path, event: Event):
"""Store the event given at the path given."""
@abstractmethod
def _search_paths(self, prefix: Path) -> list[Path]:
"""Search paths."""
async def get_conversation_path(self, conversation_id: UUID) -> Path:
"""Get a path for a conversation. Ensure user_id is included if possible."""
path = self.prefix
if self.user_id:
path /= self.user_id
elif self.app_conversation_info_service:
task = self.app_conversation_info_load_tasks.get(conversation_id)
if task is None:
task = asyncio.create_task(
self.app_conversation_info_service.get_app_conversation_info(
conversation_id
)
)
self.app_conversation_info_load_tasks[conversation_id] = task
conversation_info = await task
if conversation_info and conversation_info.created_by_user_id:
path /= conversation_info.created_by_user_id
path = path / 'v1_conversations' / conversation_id.hex
return path
async def get_event(self, conversation_id: UUID, event_id: UUID) -> Event | None:
"""Get the event with the given id, or None if not found."""
conversation_path = await self.get_conversation_path(conversation_id)
path = conversation_path / f'{event_id.hex}.json'
loop = asyncio.get_running_loop()
event: Event = await loop.run_in_executor(None, self._load_event, path)
return event
async def search_events(
self,
conversation_id: UUID,
kind__eq: EventKind | None = None,
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
sort_order: EventSortOrder = EventSortOrder.TIMESTAMP,
page_id: str | None = None,
limit: int = 100,
) -> EventPage:
"""Search events matching the given filters."""
loop = asyncio.get_running_loop()
prefix = await self.get_conversation_path(conversation_id)
paths = await loop.run_in_executor(None, self._search_paths, prefix)
events = await asyncio.gather(
*[loop.run_in_executor(None, self._load_event, path) for path in paths]
)
items = []
for event in events:
if not event:
continue
if kind__eq and event.kind != kind__eq:
continue
if timestamp__gte and event.timestamp < timestamp__gte:
continue
if timestamp__lt and event.timestamp >= timestamp__lt:
continue
items.append(event)
if sort_order:
items.sort(
key=lambda e: e.timestamp,
reverse=(sort_order == EventSortOrder.TIMESTAMP_DESC),
)
start_offset = 0
if page_id:
start_offset = int(page_id)
paths = paths[start_offset:]
if len(paths) > limit:
paths = paths[:limit]
next_page_id = str(start_offset + limit)
return EventPage(items, next_page_id=next_page_id)
async def count_events(
self,
conversation_id: UUID,
kind__eq: EventKind | None = None,
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
) -> int:
"""Count events matching the given filters."""
# If we are not filtering, we can simply count the paths
if not (kind__eq or timestamp__gte or timestamp__lt):
conversation_path = await self.get_conversation_path(conversation_id)
result = await self._count_events_no_filter(conversation_path)
return result
events = page_iterator(
self.search_events,
conversation_id=conversation_id,
kind__eq=kind__eq,
timestamp__gte=timestamp__gte,
timestamp__lt=timestamp__lt,
)
result = sum(1 for event in events)
return result
async def _count_events_no_filter(self, conversation_path: Path) -> int:
paths = page_iterator(self._search_paths, conversation_path)
result = 0
async for _ in paths:
result += 1
return result
async def save_event(self, conversation_id: UUID, event: Event):
if isinstance(event.id, str):
id_hex = event.id.replace('-', '')
else:
id_hex = event.id.hex
path = (await self.get_conversation_path(conversation_id)) / f'{id_hex}.json'
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._store_event, path, event)
async def batch_get_events(
self, conversation_id: UUID, event_ids: list[UUID]
) -> list[Event | None]:
"""Given a list of ids, get events (Or none for any which were not found)."""
return await asyncio.gather(
*[self.get_event(conversation_id, event_id) for event_id in event_ids]
)

View File

@@ -1,87 +1,44 @@
"""Filesystem-based EventService implementation."""
import json
import glob
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import AsyncGenerator
from uuid import UUID
from fastapi import Request
from openhands.app_server.app_conversation.app_conversation_info_service import (
AppConversationInfoService,
)
from openhands.app_server.errors import OpenHandsError
from openhands.app_server.event.event_service import EventService, EventServiceInjector
from openhands.app_server.event.filesystem_event_service_base import (
FilesystemEventServiceBase,
)
from openhands.app_server.event.event_service_base import EventServiceBase
from openhands.app_server.services.injector import InjectorState
from openhands.sdk import Event
_logger = logging.getLogger(__name__)
@dataclass
class FilesystemEventService(FilesystemEventServiceBase, EventService):
"""Filesystem-based implementation of EventService.
class FilesystemEventService(EventServiceBase):
"""Event service based on file system"""
Events are stored in files with the naming format:
{conversation_id}/{YYYYMMDDHHMMSS}_{kind}_{id.hex}
limit: int = 500
Uses an AppConversationInfoService to lookup conversations
"""
def _load_event(self, path: Path) -> Event | None:
try:
content = path.read_text(str(path))
content = Event.model_validate_json(content)
return content
except Exception:
_logger.exception('Error reading event', stack_info=True)
return None
app_conversation_info_service: AppConversationInfoService
events_dir: Path
def _store_event(self, path: Path, event: Event):
path.parent.mkdir(parents=True, exist_ok=True)
content = event.model_dump_json(indent=2)
path.write_text(content)
def _ensure_events_dir(self, conversation_id: UUID | None = None) -> Path:
"""Ensure the events directory exists."""
if conversation_id:
events_path = self.events_dir / str(conversation_id)
else:
events_path = self.events_dir
events_path.mkdir(parents=True, exist_ok=True)
return events_path
def _save_event_to_file(self, conversation_id: UUID, event: Event) -> None:
"""Save an event to a file."""
events_path = self._ensure_events_dir(conversation_id)
filename = self._get_event_filename(conversation_id, event)
filepath = events_path / filename
with open(filepath, 'w') as f:
# Use model_dump with mode='json' to handle UUID serialization
data = event.model_dump(mode='json')
f.write(json.dumps(data, indent=2))
async def save_event(self, conversation_id: UUID, event: Event):
"""Save an event. Internal method intended not be part of the REST api."""
conversation = (
await self.app_conversation_info_service.get_app_conversation_info(
conversation_id
)
)
if not conversation:
# This is either an illegal state or somebody is trying to hack
raise OpenHandsError('No such conversation: {conversaiont_id}')
self._save_event_to_file(conversation_id, event)
async def _filter_files_by_conversation(self, files: list[Path]) -> list[Path]:
conversation_ids = list(self._get_conversation_ids(files))
conversations = (
await self.app_conversation_info_service.batch_get_app_conversation_info(
conversation_ids
)
)
permitted_conversation_ids = set()
for conversation in conversations:
if conversation:
permitted_conversation_ids.add(conversation.id)
result = [
file
for file in files
if self._get_conversation_id(file) in permitted_conversation_ids
]
return result
def _search_paths(self, prefix: Path, page_id: str | None = None) -> list[Path]:
search_path = f'{prefix}*'
files = glob.glob(str(search_path))
paths = [Path(file) for file in files]
return paths
class FilesystemEventServiceInjector(EventServiceInjector):
@@ -91,14 +48,22 @@ class FilesystemEventServiceInjector(EventServiceInjector):
from openhands.app_server.config import (
get_app_conversation_info_service,
get_global_config,
get_user_context,
)
async with get_app_conversation_info_service(
state, request
) as app_conversation_info_service:
persistence_dir = get_global_config().persistence_dir
async with (
get_user_context(state, request) as user_context,
get_app_conversation_info_service(
state, request
) as app_conversation_info_service,
):
# Set up a service with a path {persistence_dir}/{user_id}/v1_conversations
prefix = get_global_config().persistence_dir
user_id = await user_context.get_user_id()
yield FilesystemEventService(
prefix=prefix,
user_id=user_id,
app_conversation_info_service=app_conversation_info_service,
events_dir=persistence_dir / 'v1' / 'events',
app_conversation_info_load_tasks={},
)

View File

@@ -1,224 +0,0 @@
import asyncio
import glob
from abc import abstractmethod
from datetime import datetime
from pathlib import Path
from uuid import UUID
from openhands.agent_server.models import EventPage, EventSortOrder
from openhands.app_server.event_callback.event_callback_models import EventKind
from openhands.sdk import Event
class FilesystemEventServiceBase:
events_dir: Path
async def get_event(self, event_id: str) -> Event | None:
"""Get the event with the given id, or None if not found."""
# Convert event_id to hex format (remove dashes) for filename matching
if isinstance(event_id, str) and '-' in event_id:
id_hex = event_id.replace('-', '')
else:
id_hex = event_id
# Use glob pattern to find files ending with the event_id
pattern = f'*_{id_hex}'
files = self._get_event_files_by_pattern(pattern)
files = await self._filter_files_by_conversation(files)
if not files:
return None
# Load and return the first matching event
return self._load_event_from_file(files[0])
async def search_events(
self,
conversation_id__eq: UUID | None = None,
kind__eq: EventKind | None = None,
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
sort_order: EventSortOrder = EventSortOrder.TIMESTAMP,
page_id: str | None = None,
limit: int = 100,
) -> EventPage:
"""Search for events matching the given filters."""
# Build the search pattern
pattern = '*'
files = self._get_event_files_by_pattern(pattern, conversation_id__eq)
files = await self._filter_files_by_conversation(files)
files = self._filter_files_by_criteria(
files, conversation_id__eq, kind__eq, timestamp__gte, timestamp__lt
)
files.sort(
key=lambda f: f.name,
reverse=(sort_order == EventSortOrder.TIMESTAMP_DESC),
)
# Handle pagination
start_index = 0
if page_id:
for i, file_path in enumerate(files):
if file_path.name == page_id:
start_index = i + 1
break
# Collect items for this page
page_files = files[start_index : start_index + limit]
next_page_id = None
if start_index + limit < len(files):
next_page_id = files[start_index + limit].name
# Load all events from files in a background thread.
loop = asyncio.get_running_loop()
page_events = await loop.run_in_executor(
None, self._load_events_from_files, page_files
)
return EventPage(items=page_events, next_page_id=next_page_id)
async def count_events(
self,
conversation_id__eq: UUID | None = None,
kind__eq: EventKind | None = None,
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
sort_order: EventSortOrder = EventSortOrder.TIMESTAMP,
) -> int:
"""Count events matching the given filters."""
# Build the search pattern
pattern = '*'
files = self._get_event_files_by_pattern(pattern, conversation_id__eq)
files = await self._filter_files_by_conversation(files)
files = self._filter_files_by_criteria(
files, conversation_id__eq, kind__eq, timestamp__gte, timestamp__lt
)
return len(files)
def _get_event_filename(self, conversation_id: UUID, event: Event) -> str:
"""Generate filename using YYYYMMDDHHMMSS_kind_id.hex format."""
timestamp_str = self._timestamp_to_str(event.timestamp)
kind = event.__class__.__name__
# Handle both UUID objects and string UUIDs
if isinstance(event.id, str):
id_hex = event.id.replace('-', '')
else:
id_hex = event.id.hex
return f'{timestamp_str}_{kind}_{id_hex}'
def _timestamp_to_str(self, timestamp: datetime | str) -> str:
"""Convert timestamp to YYYYMMDDHHMMSS format."""
if isinstance(timestamp, str):
# Parse ISO format timestamp string
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
return dt.strftime('%Y%m%d%H%M%S')
return timestamp.strftime('%Y%m%d%H%M%S')
def _load_events_from_files(self, file_paths: list[Path]) -> list[Event]:
events = []
for file_path in file_paths:
event = self._load_event_from_file(file_path)
if event is not None:
events.append(event)
return events
def _load_event_from_file(self, filepath: Path) -> Event | None:
"""Load an event from a file."""
try:
json_data = filepath.read_text()
return Event.model_validate_json(json_data)
except Exception:
return None
def _get_event_files_by_pattern(
self, pattern: str, conversation_id: UUID | None = None
) -> list[Path]:
"""Get event files matching a glob pattern, sorted by timestamp."""
if conversation_id:
search_path = self.events_dir / str(conversation_id) / pattern
else:
search_path = self.events_dir / '*' / pattern
files = glob.glob(str(search_path))
return sorted([Path(f) for f in files])
def _parse_filename(self, filename: str) -> dict[str, str] | None:
"""Parse filename to extract timestamp, kind, and event_id."""
try:
parts = filename.split('_')
if len(parts) >= 3:
timestamp_str = parts[0]
kind = '_'.join(parts[1:-1]) # Handle kinds with underscores
event_id = parts[-1]
return {'timestamp': timestamp_str, 'kind': kind, 'event_id': event_id}
except Exception:
pass
return None
def _get_conversation_id(self, file: Path) -> UUID | None:
try:
return UUID(file.parent.name)
except Exception:
return None
def _get_conversation_ids(self, files: list[Path]) -> set[UUID]:
result = set()
for file in files:
conversation_id = self._get_conversation_id(file)
if conversation_id:
result.add(conversation_id)
return result
@abstractmethod
async def _filter_files_by_conversation(self, files: list[Path]) -> list[Path]:
"""Filter files by conversation."""
def _filter_files_by_criteria(
self,
files: list[Path],
conversation_id__eq: UUID | None = None,
kind__eq: EventKind | None = None,
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
) -> list[Path]:
"""Filter files based on search criteria."""
filtered_files = []
for file_path in files:
# Check conversation_id filter
if conversation_id__eq:
if str(conversation_id__eq) not in str(file_path):
continue
# Parse filename for additional filtering
filename_info = self._parse_filename(file_path.name)
if not filename_info:
continue
# Check kind filter
if kind__eq and filename_info['kind'] != kind__eq:
continue
# Check timestamp filters
if timestamp__gte or timestamp__lt:
try:
file_timestamp = datetime.strptime(
filename_info['timestamp'], '%Y%m%d%H%M%S'
)
if timestamp__gte and file_timestamp < timestamp__gte:
continue
if timestamp__lt and file_timestamp >= timestamp__lt:
continue
except ValueError:
continue
filtered_files.append(file_path)
return filtered_files

View File

@@ -0,0 +1,90 @@
"""Google Cloud Storage-based EventService implementation."""
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import AsyncGenerator, Iterator
from fastapi import Request
from google.api_core.exceptions import NotFound
from google.cloud import storage
from google.cloud.storage.blob import Blob
from google.cloud.storage.bucket import Bucket
from google.cloud.storage.client import Client
from openhands.app_server.config import get_app_conversation_info_service
from openhands.app_server.event.event_service import EventService, EventServiceInjector
from openhands.app_server.event.event_service_base import EventServiceBase
from openhands.app_server.services.injector import InjectorState
from openhands.sdk import Event
_logger = logging.getLogger(__name__)
@dataclass
class GoogleCloudEventService(EventServiceBase):
"""Google Cloud Storage-based implementation of EventService."""
bucket: Bucket
def _load_event(self, path: Path) -> Event | None:
"""Get the event at the path given."""
blob: Blob = self.bucket.blob(str(path))
try:
with blob.open('r') as f:
json_data = f.read()
event = Event.model_validate_json(json_data)
return event
except NotFound:
return None
except Exception:
_logger.exception(f'Error reading event from {path}')
return None
def _store_event(self, path: Path, event: Event):
"""Store the event given at the path given."""
blob: Blob = self.bucket.blob(str(path))
data = event.model_dump(mode='json')
with blob.open('w') as f:
f.write(json.dumps(data, indent=2))
def _search_paths(self, prefix: Path, page_id: str | None = None) -> list[Path]:
"""Search paths."""
blobs: Iterator[Blob] = self.bucket.list_blobs(
page_token=page_id, prefix=str(prefix)
)
paths = list(Path(blob.name) for blob in blobs)
return paths
class GoogleCloudEventServiceInjector(EventServiceInjector):
bucket_name: str
prefix: Path = Path('users')
async def inject(
self, state: InjectorState, request: Request | None = None
) -> AsyncGenerator[EventService, None]:
from openhands.app_server.config import (
get_user_context,
)
async with (
get_user_context(state, request) as user_context,
get_app_conversation_info_service(
state, request
) as app_conversation_info_service,
):
user_id = await user_context.get_user_id()
bucket_name = self.bucket_name
storage_client: Client = storage.Client()
bucket: Bucket = storage_client.bucket(bucket_name)
yield GoogleCloudEventService(
prefix=self.prefix,
user_id=user_id,
app_conversation_info_service=app_conversation_info_service,
bucket=bucket,
app_conversation_info_load_tasks={},
)

View File

@@ -3,6 +3,7 @@ import logging
import os
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Union
from uuid import UUID
import base62
import httpx
@@ -729,7 +730,9 @@ async def refresh_conversation(
return EventPage.model_validate(response.json())
async for event in page_iterator(fetch_events_page):
existing = await event_service.get_event(event.id)
existing = await event_service.get_event(
app_conversation_info.id, UUID(event.id)
)
if existing is None:
await event_service.save_event(app_conversation_info.id, event)
await event_callback_service.execute_callbacks(