From c9ed9b166be89d448c01a2ada5ef4ee525eb74b3 Mon Sep 17 00:00:00 2001 From: Robert Brennan Date: Tue, 19 Nov 2024 13:46:03 -0500 Subject: [PATCH] handle exceptions more explicitly (#4971) --- openhands/controller/agent_controller.py | 4 ++-- openhands/core/logger.py | 9 ++++++-- openhands/runtime/base.py | 12 +++++++++-- .../impl/eventstream/eventstream_runtime.py | 20 ++++++++++++++++-- .../runtime/impl/remote/remote_runtime.py | 5 ++++- openhands/server/listen.py | 21 ++++++++++++++++--- openhands/server/session/agent_session.py | 6 +++--- openhands/server/session/manager.py | 7 ++++++- 8 files changed, 68 insertions(+), 16 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index f9e4a8edb5..e0fa0dab03 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -18,6 +18,7 @@ from openhands.core.exceptions import ( LLMNoActionError, LLMResponseError, ) +from openhands.core.logger import LOG_ALL_EVENTS from openhands.core.logger import openhands_logger as logger from openhands.core.schema import AgentState from openhands.events import EventSource, EventStream, EventStreamSubscriber @@ -528,8 +529,7 @@ class AgentController: await self.update_state_after_step() - # Use info level if LOG_ALL_EVENTS is set - log_level = 'info' if os.getenv('LOG_ALL_EVENTS') in ('true', '1') else 'debug' + log_level = 'info' if LOG_ALL_EVENTS else 'debug' self.log(log_level, str(action), extra={'msg_type': 'ACTION'}) async def _delegate_step(self): diff --git a/openhands/core/logger.py b/openhands/core/logger.py index 1450e50351..238b4c3943 100644 --- a/openhands/core/logger.py +++ b/openhands/core/logger.py @@ -17,6 +17,8 @@ if DEBUG: LOG_TO_FILE = os.getenv('LOG_TO_FILE', 'False').lower() in ['true', '1', 'yes'] DISABLE_COLOR_PRINTING = False +LOG_ALL_EVENTS = os.getenv('LOG_ALL_EVENTS', 'False').lower() in ['true', '1', 'yes'] + ColorType = Literal[ 'red', 'green', @@ -89,8 +91,11 @@ class ColoredFormatter(logging.Formatter): return f'{time_str} - {name_str}:{level_str}: {record.filename}:{record.lineno}\n{msg_type_color}\n{msg}' return f'{time_str} - {msg_type_color}\n{msg}' elif msg_type == 'STEP': - msg = '\n\n==============\n' + record.msg + '\n' - return f'{msg}' + if LOG_ALL_EVENTS: + msg = '\n\n==============\n' + record.msg + '\n' + return f'{msg}' + else: + return record.msg return super().format(record) diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index b12c501c19..74891a7d52 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -47,11 +47,19 @@ STATUS_MESSAGES = { } -class RuntimeNotReadyError(Exception): +class RuntimeUnavailableError(Exception): pass -class RuntimeDisconnectedError(Exception): +class RuntimeNotReadyError(RuntimeUnavailableError): + pass + + +class RuntimeDisconnectedError(RuntimeUnavailableError): + pass + + +class RuntimeNotFoundError(RuntimeUnavailableError): pass diff --git a/openhands/runtime/impl/eventstream/eventstream_runtime.py b/openhands/runtime/impl/eventstream/eventstream_runtime.py index e65c36cc67..fe8f67f295 100644 --- a/openhands/runtime/impl/eventstream/eventstream_runtime.py +++ b/openhands/runtime/impl/eventstream/eventstream_runtime.py @@ -34,7 +34,11 @@ from openhands.events.observation import ( ) from openhands.events.serialization import event_to_dict, observation_from_dict from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS -from openhands.runtime.base import Runtime +from openhands.runtime.base import ( + Runtime, + RuntimeDisconnectedError, + RuntimeNotFoundError, +) from openhands.runtime.builder import DockerRuntimeBuilder from openhands.runtime.impl.eventstream.containers import remove_all_containers from openhands.runtime.plugins import PluginRequirement @@ -424,10 +428,22 @@ class EventStreamRuntime(Runtime): @tenacity.retry( stop=tenacity.stop_after_delay(120) | stop_if_should_exit(), - reraise=(ConnectionRefusedError,), + retry=tenacity.retry_if_exception_type( + (ConnectionError, requests.exceptions.ConnectionError) + ), + reraise=True, wait=tenacity.wait_fixed(2), ) def _wait_until_alive(self): + try: + container = self.docker_client.containers.get(self.container_name) + if container.status == 'exited': + raise RuntimeDisconnectedError( + f'Container {self.container_name} has exited.' + ) + except docker.errors.NotFound: + raise RuntimeNotFoundError(f'Container {self.container_name} not found.') + self._refresh_logs() if not self.log_buffer: raise RuntimeError('Runtime client is not ready.') diff --git a/openhands/runtime/impl/remote/remote_runtime.py b/openhands/runtime/impl/remote/remote_runtime.py index 4191a047b1..d996441b66 100644 --- a/openhands/runtime/impl/remote/remote_runtime.py +++ b/openhands/runtime/impl/remote/remote_runtime.py @@ -31,6 +31,7 @@ from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS from openhands.runtime.base import ( Runtime, RuntimeDisconnectedError, + RuntimeNotFoundError, RuntimeNotReadyError, ) from openhands.runtime.builder.remote import RemoteRuntimeBuilder @@ -109,7 +110,9 @@ class RemoteRuntime(Runtime): if existing_runtime: self.log('debug', f'Using existing runtime with ID: {self.runtime_id}') elif self.attach_to_existing: - raise RuntimeError('Could not find existing runtime to attach to.') + raise RuntimeNotFoundError( + f'Could not find existing runtime for SID: {self.sid}' + ) else: self.send_status_message('STATUS$STARTING_CONTAINER') if self.config.sandbox.runtime_container_image is None: diff --git a/openhands/server/listen.py b/openhands/server/listen.py index 8724daf190..929a26ec98 100644 --- a/openhands/server/listen.py +++ b/openhands/server/listen.py @@ -34,6 +34,7 @@ from fastapi import ( Request, UploadFile, WebSocket, + WebSocketDisconnect, status, ) from fastapi.responses import FileResponse, JSONResponse @@ -238,7 +239,8 @@ async def attach_session(request: Request, call_next): request.state.conversation = await session_manager.attach_to_conversation( request.state.sid ) - if request.state.conversation is None: + if not request.state.conversation: + logger.error(f'Runtime not found for session: {request.state.sid}') return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'error': 'Session not found'}, @@ -344,7 +346,13 @@ async def websocket_endpoint(websocket: WebSocket): latest_event_id = -1 if websocket.query_params.get('latest_event_id'): - latest_event_id = int(websocket.query_params.get('latest_event_id')) + try: + latest_event_id = int(websocket.query_params.get('latest_event_id')) + except ValueError: + logger.warning( + f'Invalid latest_event_id: {websocket.query_params.get("latest_event_id")}' + ) + pass async_stream = AsyncEventStreamWrapper( session.agent_session.event_stream, latest_event_id + 1 @@ -361,7 +369,14 @@ async def websocket_endpoint(websocket: WebSocket): ), ): continue - await websocket.send_json(event_to_dict(event)) + try: + await websocket.send_json(event_to_dict(event)) + except WebSocketDisconnect: + logger.warning( + 'Websocket disconnected while sending event history, before loop started' + ) + session.close() + return await session.loop_recv() diff --git a/openhands/server/session/agent_session.py b/openhands/server/session/agent_session.py index 8f9d20a5dc..76f6e2aa8b 100644 --- a/openhands/server/session/agent_session.py +++ b/openhands/server/session/agent_session.py @@ -11,7 +11,7 @@ from openhands.events.action.agent import ChangeAgentStateAction from openhands.events.event import EventSource from openhands.events.stream import EventStream from openhands.runtime import get_runtime_cls -from openhands.runtime.base import Runtime +from openhands.runtime.base import Runtime, RuntimeUnavailableError from openhands.security import SecurityAnalyzer, options from openhands.storage.files import FileStore @@ -194,13 +194,13 @@ class AgentSession: try: await self.runtime.connect() - except Exception as e: + except RuntimeUnavailableError as e: logger.error(f'Runtime initialization failed: {e}', exc_info=True) if self._status_callback: self._status_callback( 'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e) ) - raise + return if self.runtime is not None: logger.debug( diff --git a/openhands/server/session/manager.py b/openhands/server/session/manager.py index f746b3676e..790b7c4bd1 100644 --- a/openhands/server/session/manager.py +++ b/openhands/server/session/manager.py @@ -6,6 +6,7 @@ from fastapi import WebSocket from openhands.core.config import AppConfig from openhands.core.logger import openhands_logger as logger from openhands.events.stream import session_exists +from openhands.runtime.base import RuntimeUnavailableError from openhands.server.session.conversation import Conversation from openhands.server.session.session import Session from openhands.storage.files import FileStore @@ -26,7 +27,11 @@ class SessionManager: if not await session_exists(sid, self.file_store): return None c = Conversation(sid, file_store=self.file_store, config=self.config) - await c.connect() + try: + await c.connect() + except RuntimeUnavailableError as e: + logger.error(f'Error connecting to conversation {c.sid}: {e}') + return None end_time = time.time() logger.info( f'Conversation {c.sid} connected in {end_time - start_time} seconds'