import os
from pathlib import Path
import tempfile
import threading
from functools import lru_cache
from typing import Callable
from zipfile import ZipFile

import docker
import requests
import tenacity

from openhands.core.config import AppConfig
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import (
    ActionConfirmationStatus,
    BrowseInteractiveAction,
    BrowseURLAction,
    CmdRunAction,
    FileEditAction,
    FileReadAction,
    FileWriteAction,
    IPythonRunCellAction,
)
from openhands.events.action.action import Action
from openhands.events.observation import (
    ErrorObservation,
    NullObservation,
    Observation,
    UserRejectObservation,
)
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit


class LogBuffer:
    """Synchronous buffer for Docker container logs.

    This class provides a thread-safe way to collect, store, and retrieve logs
    from a Docker container. A daemon thread reads the container's followed log
    stream and appends decoded lines to a lock-protected list; callers drain it
    with get_and_clear().
    """

    def __init__(self, container: docker.models.containers.Container, logFn: Callable):
        """Start streaming logs from `container` in a background daemon thread.

        Args:
            container: The Docker container whose logs are followed.
            logFn: Logging callback invoked as ``logFn(level, message)``.
        """
        self.init_msg = 'Runtime client initialized.'

        self.buffer: list[str] = []
        self.lock = threading.Lock()
        self._stop_event = threading.Event()
        # The logger must exist before the reader thread starts, because
        # stream_logs() may call self.log from that thread immediately.
        self.log = logFn
        self.log_generator = container.logs(stream=True, follow=True)
        self.log_stream_thread = threading.Thread(target=self.stream_logs)
        self.log_stream_thread.daemon = True
        self.log_stream_thread.start()

    def append(self, log_line: str):
        """Append one decoded log line to the buffer (thread-safe)."""
        with self.lock:
            self.buffer.append(log_line)

    def get_and_clear(self) -> list[str]:
        """Return a copy of all buffered lines and empty the buffer (thread-safe)."""
        with self.lock:
            logs = list(self.buffer)
            self.buffer.clear()
            return logs

    def stream_logs(self):
        """Stream logs from the Docker container in a separate thread.

        This method runs in its own thread to handle the blocking operation of
        reading log lines from the Docker SDK's synchronous generator.
        """
        try:
            for log_line in self.log_generator:
                if self._stop_event.is_set():
                    break
                if log_line:
                    # errors='replace': one undecodable byte sequence in the
                    # container output must not end log collection for good.
                    decoded_line = log_line.decode('utf-8', errors='replace').rstrip()
                    self.append(decoded_line)
        except Exception as e:
            # close() tears down the stream to unblock this thread; an error
            # raised as a consequence of that shutdown is expected.
            if not self._stop_event.is_set():
                self.log('error', f'Error streaming docker logs: {e}')

    def __del__(self):
        # getattr guard: __init__ may have failed before the thread was created.
        thread = getattr(self, 'log_stream_thread', None)
        if thread is not None and thread.is_alive():
            self.log(
                'warn',
                "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown.",
            )
            self.close(timeout=5)

    def close(self, timeout: float = 5.0):
        """Stop the reader thread and release the underlying log stream.

        Args:
            timeout: Maximum number of seconds to wait for the reader thread.
        """
        self._stop_event.set()
        # The reader is blocked inside the Docker generator until another log
        # line arrives, so the stop event alone cannot stop it for a quiet
        # container. Closing the stream (best effort) unblocks the read and
        # releases its connection.
        close_stream = getattr(self.log_generator, 'close', None)
        if close_stream is not None:
            try:
                close_stream()
            except Exception as e:
                self.log('debug', f'Error closing docker log stream: {e}')
        self.log_stream_thread.join(timeout)


class EventStreamRuntime(Runtime):
    """This runtime will subscribe the event stream.

    When receive an event, it will send the event to runtime-client which run inside the docker environment.

    Args:
        config (AppConfig): The application configuration.
        event_stream (EventStream): The event stream to subscribe to.
        sid (str, optional): The session ID. Defaults to 'default'.
        plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
        env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
        status_callback (Callable | None, optional): Callback stored on the instance and
            forwarded to the base Runtime. Defaults to None.
        attach_to_existing (bool, optional): If True, attach to the already-running
            container named ``container_name_prefix + sid`` instead of building/starting
            one. Defaults to False.
    """

    container_name_prefix = 'openhands-runtime-'

    # Need to provide this method to allow inheritors to init the Runtime
    # without initting the EventStreamRuntime.
    def init_base_runtime(
        self,
        config: AppConfig,
        event_stream: EventStream,
        sid: str = 'default',
        plugins: list[PluginRequirement] | None = None,
        env_vars: dict[str, str] | None = None,
        status_callback: Callable | None = None,
        attach_to_existing: bool = False,
    ):
        super().__init__(
            config,
            event_stream,
            sid,
            plugins,
            env_vars,
            status_callback,
            attach_to_existing,
        )

    def __init__(
        self,
        config: AppConfig,
        event_stream: EventStream,
        sid: str = 'default',
        plugins: list[PluginRequirement] | None = None,
        env_vars: dict[str, str] | None = None,
        status_callback: Callable | None = None,
        attach_to_existing: bool = False,
    ):
        self.config = config
        self._host_port = 30000  # initial dummy value
        self._container_port = 30001  # initial dummy value
        self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
        self.session = requests.Session()
        self.status_callback = status_callback

        self.docker_client: docker.DockerClient = self._init_docker_client()
        self.base_container_image = self.config.sandbox.base_container_image
        self.runtime_container_image = self.config.sandbox.runtime_container_image
        self.container_name = self.container_name_prefix + sid
        self.container = None
        self.action_semaphore = threading.Semaphore(1)  # Ensure one action at a time

        self.runtime_builder = DockerRuntimeBuilder(self.docker_client)

        # Buffer for container logs
        self.log_buffer: LogBuffer | None = None

        if self.config.sandbox.runtime_extra_deps:
            self.log(
                'debug',
                f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
            )

        self.skip_container_logs = (
            os.environ.get('SKIP_CONTAINER_LOGS', 'false').lower() == 'true'
        )

        self.init_base_runtime(
            config,
            event_stream,
            sid,
            plugins,
            env_vars,
            status_callback,
            attach_to_existing,
        )

    async def connect(self):
        """Start (or attach to) the runtime container and wait until it is ready.

        When not attaching, the runtime image is built from the base image if
        needed, a new container is started, and the initial environment is set
        up once the action execution server answers /alive.

        Raises:
            ValueError: If neither a runtime image nor a base image is configured.
        """
        self.send_status_message('STATUS$STARTING_RUNTIME')
        if not self.attach_to_existing:
            if self.runtime_container_image is None:
                if self.base_container_image is None:
                    raise ValueError(
                        'Neither runtime container image nor base container image is set'
                    )
                self.send_status_message('STATUS$STARTING_CONTAINER')
                self.runtime_container_image = build_runtime_image(
                    self.base_container_image,
                    self.runtime_builder,
                    platform=self.config.sandbox.platform,
                    extra_deps=self.config.sandbox.runtime_extra_deps,
                    force_rebuild=self.config.sandbox.force_rebuild_runtime,
                )

            self.log(
                'info', f'Starting runtime with image: {self.runtime_container_image}'
            )
            await call_sync_from_async(self._init_container)
            self.log('info', f'Container started: {self.container_name}')
        else:
            await call_sync_from_async(self._attach_to_container)

        if not self.attach_to_existing:
            self.log('info', f'Waiting for client to become ready at {self.api_url}...')
        self.send_status_message('STATUS$WAITING_FOR_CLIENT')

        await call_sync_from_async(self._wait_until_alive)

        if not self.attach_to_existing:
            self.log('info', 'Runtime is ready.')
            await call_sync_from_async(self.setup_initial_env)

        self.log(
            'debug',
            f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}',
        )
        self.send_status_message(' ')

    @staticmethod
    @lru_cache(maxsize=1)
    def _init_docker_client() -> docker.DockerClient:
        """Create the Docker client from the environment (cached; created once)."""
        try:
            return docker.from_env()
        except Exception as ex:
            logger.error(
                'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
            )
            raise ex

    def _init_container(self):
        """Start the runtime container running the action execution server.

        On a 409 (name conflict) the existing runtime containers are removed and
        the start is retried. Any other failure is logged, resources are cleaned
        up via close(), and the exception is re-raised.
        """
        self.log('debug', 'Preparing to start container...')
        self.send_status_message('STATUS$PREPARING_CONTAINER')
        plugin_arg = ''
        if self.plugins is not None and len(self.plugins) > 0:
            plugin_arg = (
                f'--plugins {" ".join([plugin.name for plugin in self.plugins])} '
            )

        self._host_port = self._find_available_port()
        self._container_port = (
            self._host_port
        )  # in future this might differ from host port
        self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'

        use_host_network = self.config.sandbox.use_host_network
        network_mode: str | None = 'host' if use_host_network else None

        # With host networking there is no port publishing; otherwise publish
        # the container port on the chosen host port.
        port_mapping: dict[str, list[dict[str, str]]] | None = (
            None
            if use_host_network
            else {f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]}
        )

        if use_host_network:
            self.log(
                'warn',
                'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
            )

        # Combine environment variables
        environment = {
            'port': str(self._container_port),
            'PYTHONUNBUFFERED': 1,
        }
        if self.config.debug or DEBUG:
            environment['DEBUG'] = 'true'

        self.log('debug', f'Workspace Base: {self.config.workspace_base}')
        if (
            self.config.workspace_mount_path is not None
            and self.config.workspace_mount_path_in_sandbox is not None
        ):
            # e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
            volumes = {
                self.config.workspace_mount_path: {
                    'bind': self.config.workspace_mount_path_in_sandbox,
                    'mode': 'rw',
                }
            }
            logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
        else:
            logger.debug(
                'Mount dir is not set, will not mount the workspace directory to the container'
            )
            volumes = None
        self.log(
            'debug',
            f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
        )

        if self.config.sandbox.browsergym_eval_env is not None:
            browsergym_arg = (
                f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
            )
        else:
            browsergym_arg = ''

        try:
            self.container = self.docker_client.containers.run(
                self.runtime_container_image,
                command=(
                    f'/openhands/micromamba/bin/micromamba run -n openhands '
                    f'poetry run '
                    f'python -u -m openhands.runtime.action_execution_server {self._container_port} '
                    f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
                    f'{plugin_arg}'
                    f'--username {"openhands" if self.config.run_as_openhands else "root"} '
                    f'--user-id {self.config.sandbox.user_id} '
                    f'{browsergym_arg}'
                ),
                network_mode=network_mode,
                ports=port_mapping,
                working_dir='/openhands/code/',  # do not change this!
                name=self.container_name,
                detach=True,
                environment=environment,
                volumes=volumes,
            )
            self.log_buffer = LogBuffer(self.container, self.log)
            self.log('debug', f'Container started. Server url: {self.api_url}')
            self.send_status_message('STATUS$CONTAINER_STARTED')
        except docker.errors.APIError as e:
            # check 409 error
            if '409' in str(e):
                self.log(
                    'warning',
                    f'Container {self.container_name} already exists. Removing...',
                )
                self._close_containers(rm_all_containers=True)
                return self._init_container()

            # Any other API error is fatal: without a container, continuing
            # would only fail later (and more obscurely) in _wait_until_alive.
            self.log(
                'error',
                f'Error: Instance {self.container_name} FAILED to start container!\n',
            )
            self.log('error', str(e))
            self.close()
            raise
        except Exception as e:
            self.log(
                'error',
                f'Error: Instance {self.container_name} FAILED to start container!\n',
            )
            self.log('error', str(e))
            self.close()
            raise e

    def _attach_to_container(self):
        """Attach to the existing container named self.container_name.

        The first published container port (if any) becomes both the host and
        container port used to build api_url; 0 is used when none is published.
        """
        container = self.docker_client.containers.get(self.container_name)
        self.log_buffer = LogBuffer(container, self.log)
        self.container = container
        # 'Ports' can be None (e.g. host networking); treat that as no ports.
        ports = container.attrs['NetworkSettings'].get('Ports') or {}
        first_port = next(iter(ports), None)
        # Keys look like '<port>/<proto>'.
        self._container_port = int(first_port.split('/')[0]) if first_port else 0
        self._host_port = self._container_port
        self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
        self.log(
            'debug',
            f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
        )

    def _refresh_logs(self):
        """Drain buffered container logs and emit them as one debug message."""
        self.log('debug', 'Getting container logs...')

        assert (
            self.log_buffer is not None
        ), 'Log buffer is expected to be initialized when container is started'

        logs = self.log_buffer.get_and_clear()
        if logs:
            formatted_logs = '\n'.join([f'    |{log}' for log in logs])
            self.log(
                'debug',
                '\n'
                + '-' * 35
                + 'Container logs:'
                + '-' * 35
                + f'\n{formatted_logs}'
                + '\n'
                + '-' * 80,
            )

    @tenacity.retry(
        stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
        # tenacity's `reraise` is a boolean flag; the last exception is
        # re-raised once retries are exhausted.
        reraise=True,
        wait=tenacity.wait_fixed(2),
    )
    def _wait_until_alive(self):
        """Poll the action execution server's /alive endpoint until it answers.

        Retried every 2 seconds for up to 120 seconds (or until
        stop_if_should_exit() stops it).

        Raises:
            RuntimeError: If the log buffer (and hence the container) is not set up.
        """
        # Check readiness before _refresh_logs(), which asserts on the buffer
        # and would otherwise make this check unreachable.
        if not self.log_buffer:
            raise RuntimeError('Runtime client is not ready.')
        self._refresh_logs()

        send_request(
            self.session,
            'GET',
            f'{self.api_url}/alive',
            timeout=5,
        )

    def close(self, rm_all_containers: bool = True):
        """Closes the EventStreamRuntime and associated objects

        Parameters:
        - rm_all_containers (bool): Whether to remove all containers with the
          container_name_prefix ('openhands-runtime-') prefix
        """
        if self.log_buffer:
            self.log_buffer.close()

        if self.session:
            self.session.close()

        # When attached to an existing container, leave it running.
        if self.attach_to_existing:
            return
        self._close_containers(rm_all_containers)

    def _close_containers(self, rm_all_containers: bool = True):
        """Remove this runtime's container (and optionally all runtime containers).

        Best effort: Docker API errors for individual containers are ignored.
        """
        try:
            containers = self.docker_client.containers.list(all=True)
            for container in containers:
                try:
                    # If the app doesn't shut down properly, it can leave runtime containers on the system. This ensures
                    # that all containers with this runtime's name prefix are removed as well.
                    if rm_all_containers and container.name.startswith(
                        self.container_name_prefix
                    ):
                        container.remove(force=True)
                    elif container.name == self.container_name:
                        if not self.skip_container_logs:
                            logs = container.logs(tail=1000).decode('utf-8')
                            self.log(
                                'debug',
                                f'==== Container logs on close ====\n{logs}\n==== End of container logs ====',
                            )
                        container.remove(force=True)
                except docker.errors.APIError:
                    # Also covers docker.errors.NotFound (a subclass of APIError),
                    # e.g. a container removed concurrently.
                    pass
        except docker.errors.NotFound:  # yes, this can happen!
            pass

    def run_action(self, action: Action) -> Observation:
        """Execute `action` inside the sandbox and return the resulting observation.

        Raises:
            ValueError: If the action type is unknown.
            RuntimeError: If the server does not respond within the action's timeout.
        """
        if isinstance(action, FileEditAction):
            return self.edit(action)

        # set timeout to default if not set
        if action.timeout is None:
            action.timeout = self.config.sandbox.timeout

        with self.action_semaphore:
            if not action.runnable:
                return NullObservation('')
            if (
                hasattr(action, 'confirmation_state')
                and action.confirmation_state
                == ActionConfirmationStatus.AWAITING_CONFIRMATION
            ):
                return NullObservation('')
            action_type = action.action  # type: ignore[attr-defined]
            if action_type not in ACTION_TYPE_TO_CLASS:
                raise ValueError(f'Action {action_type} does not exist.')
            if not hasattr(self, action_type):
                return ErrorObservation(
                    f'Action {action_type} is not supported in the current runtime.',
                    error_id='AGENT_ERROR$BAD_ACTION',
                )
            if (
                getattr(action, 'confirmation_state', None)
                == ActionConfirmationStatus.REJECTED
            ):
                return UserRejectObservation(
                    'Action has been rejected by the user! Waiting for further user input.'
                )

            self._refresh_logs()

            assert action.timeout is not None

            try:
                response = send_request(
                    self.session,
                    'POST',
                    f'{self.api_url}/execute_action',
                    json={'action': event_to_dict(action)},
                    # wait a few more seconds to get the timeout error from client side
                    timeout=action.timeout + 5,
                )
                output = response.json()
                obs = observation_from_dict(output)
                obs._cause = action.id  # type: ignore[attr-defined]
            except requests.Timeout as e:
                raise RuntimeError(
                    f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
                ) from e
            self._refresh_logs()
            return obs

    def run(self, action: CmdRunAction) -> Observation:
        return self.run_action(action)

    def run_ipython(self, action: IPythonRunCellAction) -> Observation:
        return self.run_action(action)

    def read(self, action: FileReadAction) -> Observation:
        return self.run_action(action)

    def write(self, action: FileWriteAction) -> Observation:
        return self.run_action(action)

    def browse(self, action: BrowseURLAction) -> Observation:
        return self.run_action(action)

    def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
        return self.run_action(action)

    # ====================================================================
    # File operations, executed through the action execution server's
    # upload_file / list_files / download_files endpoints
    # ====================================================================

    def copy_to(
        self, host_src: str, sandbox_dest: str, recursive: bool = False
    ) -> None:
        """Upload a host file (or, with recursive=True, a zipped directory) to the sandbox.

        Args:
            host_src: Path on the host to upload.
            sandbox_dest: Destination path inside the sandbox.
            recursive: If True, zip everything under `host_src` and upload the archive.

        Raises:
            FileNotFoundError: If `host_src` does not exist.
            TimeoutError: If the upload request times out.
            RuntimeError: For any other failure during the copy.
        """
        if not os.path.exists(host_src):
            raise FileNotFoundError(f'Source file {host_src} does not exist')

        self._refresh_logs()
        # Initialised up front so the cleanup in `finally` never hits an
        # unbound name (which would mask the original error).
        temp_zip_path: str | None = None
        try:
            if recursive:
                # For recursive copy, create a zip file
                with tempfile.NamedTemporaryFile(
                    suffix='.zip', delete=False
                ) as temp_zip:
                    temp_zip_path = temp_zip.name

                with ZipFile(temp_zip_path, 'w') as zipf:
                    for root, _, files in os.walk(host_src):
                        for file in files:
                            file_path = os.path.join(root, file)
                            # Keep the top-level directory name in the archive.
                            arcname = os.path.relpath(
                                file_path, os.path.dirname(host_src)
                            )
                            zipf.write(file_path, arcname)
                upload_path = temp_zip_path
            else:
                # For single file copy
                upload_path = host_src

            params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}

            # Close the upload handle deterministically (it was leaked before).
            with open(upload_path, 'rb') as upload_file:
                send_request(
                    self.session,
                    'POST',
                    f'{self.api_url}/upload_file',
                    files={'file': upload_file},
                    params=params,
                    timeout=300,
                )
        except requests.Timeout as e:
            raise TimeoutError('Copy operation timed out') from e
        except Exception as e:
            raise RuntimeError(f'Copy operation failed: {str(e)}') from e
        else:
            self.log(
                'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
            )
        finally:
            if temp_zip_path is not None:
                os.unlink(temp_zip_path)
            self._refresh_logs()

    def list_files(self, path: str | None = None) -> list[str]:
        """List files in the sandbox.

        If path is None, list files in the sandbox's initial working directory (e.g., /workspace).

        Raises:
            TimeoutError: If the request times out.
        """
        self._refresh_logs()
        try:
            data = {}
            if path is not None:
                data['path'] = path

            response = send_request(
                self.session,
                'POST',
                f'{self.api_url}/list_files',
                json=data,
                timeout=10,
            )
            response_json = response.json()
            assert isinstance(response_json, list)
            return response_json
        except requests.Timeout as e:
            raise TimeoutError('List files operation timed out') from e

    def copy_from(self, path: str) -> Path:
        """Download `path` from the sandbox into a local temporary file.

        The server's /download_files endpoint presumably returns a zip archive
        of the requested path (confirm against the action execution server).
        The temporary file is created with delete=False, so the caller owns it
        and is responsible for removing it.

        Returns:
            Path of the local temporary file holding the downloaded bytes.

        Raises:
            TimeoutError: If the request times out.
        """
        self._refresh_logs()
        try:
            params = {'path': path}
            response = send_request(
                self.session,
                'GET',
                f'{self.api_url}/download_files',
                params=params,
                stream=True,
                timeout=30,
            )
            try:
                # The with-block flushes and closes the file, so the full
                # payload is on disk before its path is handed to the caller.
                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            temp_file.write(chunk)
            finally:
                # A streamed requests response holds its connection until it
                # is fully consumed or closed (assuming send_request returns a
                # requests.Response).
                response.close()
            return Path(temp_file.name)
        except requests.Timeout as e:
            raise TimeoutError('Copy operation timed out') from e

    def _is_port_in_use_docker(self, port):
        """Return True if `port` appears in any running container's port mapping.

        This is a substring test on the string form of `container.ports`; for
        the 5-digit ports chosen by _find_available_port it is effectively exact.
        """
        containers = self.docker_client.containers.list()
        for container in containers:
            container_ports = container.ports
            if str(port) in str(container_ports):
                return True
        return False

    def _find_available_port(self, max_attempts=5):
        """Pick a TCP port in [30000, 39999] that no running container publishes.

        Up to `max_attempts` candidates are drawn from find_available_tcp_port().
        If every candidate is already used by a container, the last one tried is
        returned anyway.
        """
        port = 39999
        for _ in range(max_attempts):
            port = find_available_tcp_port(30000, 39999)
            if not self._is_port_in_use_docker(port):
                return port
        # If no port is found after max_attempts, return the last tried port
        return port