Compare commits

...

14 Commits

Author SHA1 Message Date
Robert Brennan 86403a9446 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:54:48 -05:00
Robert Brennan 2bdad7d910 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:33:18 -05:00
Robert Brennan fc9bcc3511 change connect logic 2024-12-24 15:31:51 -05:00
Robert Brennan 5eecddbf13 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:28:05 -05:00
Robert Brennan 9b61f38c4a fix imports 2024-12-24 15:25:59 -05:00
Robert Brennan 076cb2d055 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:24:20 -05:00
Robert Brennan 05dd9d7a06 revert base runtime 2024-12-24 14:38:36 -05:00
openhands 42b2ccb985 refactor: fix ownership model and remove container knowledge from base RuntimeManager
- Restore base RuntimeManager to original state
- Move container initialization to EventStreamRuntimeManager.create_runtime
- Update EventStreamRuntime to accept container_info in __init__
- Fix imports and type errors
2024-12-24 19:33:04 +00:00
openhands c1278c9835 fix: add type hints and fix runtime builder initialization 2024-12-24 19:22:17 +00:00
openhands 4bbb3572a6 revert: restore base RuntimeManager from feature/runtime-manager branch 2024-12-24 19:18:19 +00:00
openhands b4bd940a63 refactor: add EventStreamRuntimeManager
- Add ContainerInfo class in shared location
- Add EventStreamRuntimeManager for Docker container lifecycle
- Update EventStreamRuntime to use specialized manager
- Fix type errors and improve code quality
2024-12-24 19:14:51 +00:00
openhands cbd4f94a2d refactor: remove runtime_manager dependency from Runtime
- Remove runtime_manager parameter from Runtime class
- Remove runtime_manager property from Runtime class
- Update EventStreamRuntime to use RuntimeManager singleton
- Fix type errors and improve code quality
2024-12-24 19:06:51 +00:00
openhands ef95ddc680 refactor: improve container lifecycle management
- Move container lifecycle management to RuntimeManager
- Add ContainerInfo class to encapsulate container details
- Remove runtime_manager dependency from Runtime
- Add container state checks in EventStreamRuntime
- Fix type errors and add missing imports
2024-12-24 18:57:20 +00:00
openhands a287e5388c refactor: move container lifecycle management to RuntimeManager
- Move Docker-related methods from EventStreamRuntime to RuntimeManager
- Add container lifecycle management methods to RuntimeManager
- Update EventStreamRuntime to use RuntimeManager
- Update base Runtime class to support RuntimeManager
- Fix type errors and add missing imports
2024-12-24 18:45:23 +00:00
4 changed files with 534 additions and 303 deletions
+21
View File
@@ -0,0 +1,21 @@
"""Container-related types and utilities."""
import docker
class ContainerInfo:
    """Information about a running container that a Runtime needs.

    A plain value holder: every constructor argument is stored unchanged on
    the attribute of the same name.

    Attributes:
        container_id: Identifier of the container.
        api_url: Base URL used to reach the server running in the container.
        host_port: Port on the host side.
        container_port: Port on the container side.
        container: The Docker SDK container object.
    """

    def __init__(
        self,
        container_id: str,
        api_url: str,
        host_port: int,
        container_port: int,
        # Quoted so the annotation is not evaluated when the class is defined;
        # type checkers still resolve it through the module-level import.
        container: 'docker.models.containers.Container',
    ):
        self.container_id = container_id
        self.api_url = api_url
        self.host_port = host_port
        self.container_port = container_port
        self.container = container

    def __repr__(self) -> str:
        # The SDK container object is left out on purpose: the scalar fields
        # are enough to identify the container in a log line.
        return (
            f'{type(self).__name__}('
            f'container_id={self.container_id!r}, '
            f'api_url={self.api_url!r}, '
            f'host_port={self.host_port!r}, '
            f'container_port={self.container_port!r})'
        )
@@ -1,26 +1,17 @@
import atexit
import os
import tempfile
import threading
from functools import lru_cache
from pathlib import Path
from typing import Callable
from zipfile import ZipFile
import docker
import requests
import tenacity
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeDisconnectedError,
AgentRuntimeError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeTimeoutError,
)
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import (
ActionConfirmationStatus,
@@ -42,24 +33,11 @@ from openhands.events.observation import (
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.container import ContainerInfo
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
def remove_all_runtime_containers():
remove_all_containers(CONTAINER_NAME_PREFIX)
_atexit_registered = False
class EventStreamRuntime(Runtime):
@@ -74,8 +52,6 @@ class EventStreamRuntime(Runtime):
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
"""
# Need to provide this method to allow inheritors to init the Runtime
# without initting the EventStreamRuntime.
def init_base_runtime(
self,
config: AppConfig,
@@ -108,31 +84,15 @@ class EventStreamRuntime(Runtime):
status_callback: Callable | None = None,
attach_to_existing: bool = False,
headless_mode: bool = True,
container_info: ContainerInfo | None = None,
):
global _atexit_registered
if not _atexit_registered:
_atexit_registered = True
atexit.register(remove_all_runtime_containers)
self.config = config
self._host_port = 30000 # initial dummy value
self._container_port = 30001 # initial dummy value
self._vscode_url: str | None = None # initial dummy value
self._vscode_url: str | None = None
self._runtime_initialized: bool = False
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.session = requests.Session()
self.status_callback = status_callback
self.docker_client: docker.DockerClient = self._init_docker_client()
self.base_container_image = self.config.sandbox.base_container_image
self.runtime_container_image = self.config.sandbox.runtime_container_image
self.container_name = CONTAINER_NAME_PREFIX + sid
self.container = None
self.container_info = container_info
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
# Buffer for container logs
self.log_streamer: LogStreamer | None = None
self.init_base_runtime(
@@ -154,52 +114,22 @@ class EventStreamRuntime(Runtime):
)
async def connect(self):
"""Initialize the runtime with the provided container."""
if not self.container_info:
raise RuntimeError('Container info not provided')
self.send_status_message('STATUS$STARTING_RUNTIME')
try:
await call_sync_from_async(self._attach_to_container)
except docker.errors.NotFound as e:
if self.attach_to_existing:
self.log(
'error',
f'Container {self.container_name} not found.',
)
raise e
if self.runtime_container_image is None:
if self.base_container_image is None:
raise ValueError(
'Neither runtime container image nor base container image is set'
)
self.send_status_message('STATUS$STARTING_CONTAINER')
self.runtime_container_image = build_runtime_image(
self.base_container_image,
self.runtime_builder,
platform=self.config.sandbox.platform,
extra_deps=self.config.sandbox.runtime_extra_deps,
force_rebuild=self.config.sandbox.force_rebuild_runtime,
extra_build_args=self.config.sandbox.runtime_extra_build_args,
)
self.log(
'info', f'Starting runtime with image: {self.runtime_container_image}'
)
await call_sync_from_async(self._init_container)
self.log(
'info',
f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
)
self.log_streamer = LogStreamer(self.container, self.log)
self.log_streamer = LogStreamer(self.container_info.container, self.log)
if not self.attach_to_existing:
self.log('info', f'Waiting for client to become ready at {self.api_url}...')
self.log(
'info',
f'Waiting for client to become ready at {self.container_info.api_url}...',
)
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
await call_sync_from_async(self._wait_until_alive)
if not self.attach_to_existing:
self.log('info', 'Runtime is ready.')
if not self.attach_to_existing:
await call_sync_from_async(self.setup_initial_env)
self.log(
@@ -210,207 +140,24 @@ class EventStreamRuntime(Runtime):
self.send_status_message(' ')
self._runtime_initialized = True
@staticmethod
@lru_cache(maxsize=1)
def _init_docker_client() -> docker.DockerClient:
try:
return docker.from_env()
except Exception as ex:
logger.error(
'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
)
raise ex
def _init_container(self):
self.log('debug', 'Preparing to start container...')
self.send_status_message('STATUS$PREPARING_CONTAINER')
plugin_arg = ''
if self.plugins is not None and len(self.plugins) > 0:
plugin_arg = (
f'--plugins {" ".join([plugin.name for plugin in self.plugins])} '
)
self._host_port = self._find_available_port()
self._container_port = (
self._host_port
) # in future this might differ from host port
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
use_host_network = self.config.sandbox.use_host_network
network_mode: str | None = 'host' if use_host_network else None
port_mapping: dict[str, list[dict[str, str]]] | None = (
None
if use_host_network
else {f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]}
)
if use_host_network:
self.log(
'warn',
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
)
# Combine environment variables
environment = {
'port': str(self._container_port),
'PYTHONUNBUFFERED': 1,
}
if self.config.debug or DEBUG:
environment['DEBUG'] = 'true'
if self.vscode_enabled:
# vscode is on port +1 from container port
if isinstance(port_mapping, dict):
port_mapping[f'{self._container_port + 1}/tcp'] = [
{'HostPort': str(self._host_port + 1)}
]
self.log('debug', f'Workspace Base: {self.config.workspace_base}')
if (
self.config.workspace_mount_path is not None
and self.config.workspace_mount_path_in_sandbox is not None
):
# e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
volumes = {
self.config.workspace_mount_path: {
'bind': self.config.workspace_mount_path_in_sandbox,
'mode': 'rw',
}
}
logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
else:
logger.debug(
'Mount dir is not set, will not mount the workspace directory to the container'
)
volumes = None
self.log(
'debug',
f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
)
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
)
else:
browsergym_arg = ''
try:
self.container = self.docker_client.containers.run(
self.runtime_container_image,
command=(
f'/openhands/micromamba/bin/micromamba run -n openhands '
f'poetry run '
f'python -u -m openhands.runtime.action_execution_server {self._container_port} '
f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
f'{plugin_arg}'
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
f'--user-id {self.config.sandbox.user_id} '
f'{browsergym_arg}'
),
network_mode=network_mode,
ports=port_mapping,
working_dir='/openhands/code/', # do not change this!
name=self.container_name,
detach=True,
environment=environment,
volumes=volumes,
)
self.log('debug', f'Container started. Server url: {self.api_url}')
self.send_status_message('STATUS$CONTAINER_STARTED')
except docker.errors.APIError as e:
if '409' in str(e):
self.log(
'warning',
f'Container {self.container_name} already exists. Removing...',
)
remove_all_containers(self.container_name)
return self._init_container()
else:
self.log(
'error',
f'Error: Instance {self.container_name} FAILED to start container!\n',
)
except Exception as e:
self.log(
'error',
f'Error: Instance {self.container_name} FAILED to start container!\n',
)
self.log('error', str(e))
self.close()
raise e
def _attach_to_container(self):
self._container_port = 0
self.container = self.docker_client.containers.get(self.container_name)
for port in self.container.attrs['NetworkSettings']['Ports']: # type: ignore
self._container_port = int(port.split('/')[0])
break
self._host_port = self._container_port
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.log(
'debug',
f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
)
@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def _wait_until_alive(self):
try:
container = self.docker_client.containers.get(self.container_name)
if container.status == 'exited':
raise AgentRuntimeDisconnectedError(
f'Container {self.container_name} has exited.'
)
except docker.errors.NotFound:
raise AgentRuntimeNotFoundError(
f'Container {self.container_name} not found.'
)
if not self.log_streamer:
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
with send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
):
pass
def close(self, rm_all_containers: bool | None = None):
"""Closes the EventStreamRuntime and associated objects
Parameters:
- rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
"""
def close(self):
"""Closes the EventStreamRuntime and associated objects."""
if self.log_streamer:
self.log_streamer.close()
if self.session:
self.session.close()
if rm_all_containers is None:
rm_all_containers = self.config.sandbox.rm_all_containers
if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
return
close_prefix = (
CONTAINER_NAME_PREFIX if rm_all_containers else self.container_name
)
remove_all_containers(close_prefix)
def run_action(self, action: Action) -> Observation:
if isinstance(action, FileEditAction):
return self.edit(action)
if not self.container_info:
return ErrorObservation(
'Runtime container is not initialized.',
error_id='AGENT_ERROR$RUNTIME_NOT_READY',
)
# set timeout to default if not set
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
@@ -446,7 +193,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.api_url}/execute_action',
f'{self.container_info.api_url}/execute_action',
json={'action': event_to_dict(action)},
# wait a few more seconds to get the timeout error from client side
timeout=action.timeout + 5,
@@ -479,16 +226,15 @@ class EventStreamRuntime(Runtime):
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return self.run_action(action)
# ====================================================================
# Implement these methods (for file operations) in the subclass
# ====================================================================
def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
if recursive:
# For recursive copy, create a zip file
@@ -516,7 +262,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.api_url}/upload_file',
f'{self.container_info.api_url}/upload_file',
files=upload_data,
params=params,
timeout=300,
@@ -539,6 +285,8 @@ class EventStreamRuntime(Runtime):
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
data = {}
@@ -548,7 +296,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.api_url}/list_files',
f'{self.container_info.api_url}/list_files',
json=data,
timeout=10,
) as response:
@@ -560,13 +308,15 @@ class EventStreamRuntime(Runtime):
def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
params = {'path': path}
with send_request(
self.session,
'GET',
f'{self.api_url}/download_files',
f'{self.container_info.api_url}/download_files',
params=params,
stream=True,
timeout=30,
@@ -579,26 +329,9 @@ class EventStreamRuntime(Runtime):
except requests.Timeout:
raise TimeoutError('Copy operation timed out')
def _is_port_in_use_docker(self, port):
containers = self.docker_client.containers.list()
for container in containers:
container_ports = container.ports
if str(port) in str(container_ports):
return True
return False
def _find_available_port(self, max_attempts=5):
port = 39999
for _ in range(max_attempts):
port = find_available_tcp_port(30000, 39999)
if not self._is_port_in_use_docker(port):
return port
# If no port is found after max_attempts, return the last tried port
return port
@property
def vscode_url(self) -> str | None:
if self.vscode_enabled and self._runtime_initialized:
if self.vscode_enabled and self._runtime_initialized and self.container_info:
if (
hasattr(self, '_vscode_url') and self._vscode_url is not None
): # cached value
@@ -607,14 +340,14 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'GET',
f'{self.api_url}/vscode/connection_token',
f'{self.container_info.api_url}/vscode/connection_token',
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, dict)
if response_json['token'] is None:
return None
self._vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self._vscode_url = f'http://localhost:{self.container_info.host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self.log(
'debug',
f'VSCode URL: {self._vscode_url}',
@@ -0,0 +1,456 @@
import atexit
import functools
from typing import Dict, List, Optional
import docker
import requests
import tenacity
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeDisconnectedError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeUnavailableError,
)
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.base import Runtime
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.container import ContainerInfo
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
from openhands.runtime.plugins import PluginRequirement, VSCodeRequirement
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.tenacity_stop import stop_if_should_exit
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
_atexit_registered = False
class EventStreamRuntimeManager(RuntimeManager):
    """Manages Docker container lifecycle for EventStreamRuntime instances.

    The manager starts (or attaches to) one Docker container per session id,
    records its connection details as a ``ContainerInfo`` in
    ``self._containers`` (keyed by container name, i.e.
    ``CONTAINER_NAME_PREFIX + sid``) and hands that info to the
    ``EventStreamRuntime`` it creates.

    NOTE(review): ``self.config`` and ``self._runtimes`` are used throughout
    but not assigned here; presumably they are set up by
    ``RuntimeManager.__init__`` -- confirm against the base class.
    """

    def __init__(self, config: AppConfig):
        global _atexit_registered
        super().__init__(config)
        self._containers: Dict[str, ContainerInfo] = {}
        self._docker_client = self._init_docker_client()
        self._runtime_builder: DockerRuntimeBuilder = DockerRuntimeBuilder(
            self._docker_client
        )
        # Register the exit hook only after the attributes it relies on exist,
        # so a failed Docker connection cannot leave behind a hook that blows
        # up with AttributeError at interpreter shutdown.
        if not _atexit_registered:
            _atexit_registered = True
            atexit.register(self._cleanup_all_containers)

    @staticmethod
    @functools.lru_cache(maxsize=1)
    def _init_docker_client() -> docker.DockerClient:
        """Return a Docker client built from the environment.

        The result is cached, so every manager shares one client.

        Raises:
            Exception: Whatever ``docker.from_env()`` raised, after logging
                a hint about installing/starting Docker.
        """
        try:
            return docker.from_env()
        except Exception:
            logger.error(
                'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
            )
            raise

    def _resolve_runtime_image(self, status_callback=None) -> str:
        """Return the runtime image to run, building it when necessary.

        Uses ``config.sandbox.runtime_container_image`` when set; otherwise
        builds one from ``config.sandbox.base_container_image``.

        Raises:
            ValueError: If neither image is configured.
        """
        sandbox = self.config.sandbox
        if sandbox.runtime_container_image is not None:
            return sandbox.runtime_container_image
        if sandbox.base_container_image is None:
            raise ValueError(
                'Neither runtime container image nor base container image is set'
            )
        if status_callback:
            status_callback('info', 'STATUS$STARTING_CONTAINER')
        return build_runtime_image(
            sandbox.base_container_image,
            self._runtime_builder,
            platform=sandbox.platform,
            extra_deps=sandbox.runtime_extra_deps,
            force_rebuild=sandbox.force_rebuild_runtime,
            extra_build_args=sandbox.runtime_extra_build_args,
        )

    async def create_runtime(
        self,
        event_stream: EventStream,
        sid: str,
        plugins: Optional[List[PluginRequirement]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        status_callback=None,
        attach_to_existing: bool = False,
        headless_mode: bool = False,
    ) -> Runtime:
        """Create a new EventStreamRuntime with an initialized container.

        The container is started (or, with ``attach_to_existing``, looked up)
        first; the runtime is then constructed around it and connected.

        Args:
            event_stream: Event stream handed to the runtime.
            sid: Session id; also determines the container name.
            plugins: Optional plugins to enable in the container.
            env_vars: Optional environment variables for the container.
            status_callback: Optional callable receiving status updates.
            attach_to_existing: Attach to an already running container
                instead of starting a new one.
            headless_mode: Forwarded to the runtime.

        Returns:
            The connected runtime, also registered under ``sid``.

        Raises:
            RuntimeError: If a runtime with this ``sid`` already exists.
            Exception: Any failure while starting/attaching/connecting is
                logged and re-raised.
        """
        if sid in self._runtimes:
            raise RuntimeError(f'Runtime with ID {sid} already exists')

        try:
            # First initialize or attach to the container
            if attach_to_existing:
                container_info = self.attach_to_container(sid)
            else:
                container_info = self.initialize_container(
                    self._resolve_runtime_image(status_callback),
                    sid,
                    plugins,
                    env_vars,
                    status_callback,
                )

            # Create the runtime with the initialized container
            runtime = EventStreamRuntime(
                config=self.config,
                event_stream=event_stream,
                sid=sid,
                plugins=plugins,
                env_vars=env_vars,
                status_callback=status_callback,
                attach_to_existing=attach_to_existing,
                headless_mode=headless_mode,
                container_info=container_info,
            )

            # Initialize the runtime
            try:
                await runtime.connect()
            except AgentRuntimeUnavailableError as e:
                logger.error(f'Runtime initialization failed: {e}', exc_info=True)
                if status_callback:
                    status_callback(
                        'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
                    )
                # Cleanup is done once, by the outer handler.
                raise

            self._runtimes[sid] = runtime
            logger.info(f'Created runtime with ID: {sid}')
            return runtime
        except Exception as e:
            logger.error(f'Failed to create runtime: {str(e)}')
            if attach_to_existing:
                # The container was not started by this call, so it must not
                # be removed; only drop the cached info so that a later
                # attempt re-reads it from Docker.
                self._containers.pop(f'{CONTAINER_NAME_PREFIX}{sid}', None)
            else:
                self._cleanup_container(sid)
            raise

    def _is_port_in_use_docker(self, port):
        """Return True if ``port`` appears in any running container's ports.

        The check is a substring match on the textual form of each
        container's port mapping.
        """
        wanted = str(port)
        return any(
            wanted in str(container.ports)
            for container in self._docker_client.containers.list()
        )

    def _find_available_port(self, max_attempts=5):
        """Pick a TCP port in 30000-39999 that no running container uses.

        Makes up to ``max_attempts`` tries. If every candidate is taken by a
        container, the last candidate is returned anyway (39999 when
        ``max_attempts`` is 0).
        """
        port = 39999
        for _ in range(max_attempts):
            port = find_available_tcp_port(30000, 39999)
            if not self._is_port_in_use_docker(port):
                break
        return port

    def initialize_container(
        self,
        runtime_container_image: str,
        sid: str,
        plugins: Optional[list[PluginRequirement]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        status_callback=None,
    ) -> ContainerInfo:
        """Initialize a new container for a runtime.

        Args:
            runtime_container_image: The Docker image to use
            sid: The session ID that will be used to generate the container name
            plugins: Optional list of plugins to enable
            env_vars: Optional environment variables to set
            status_callback: Optional callback for status updates

        Returns:
            ContainerInfo object with connection details

        Raises:
            RuntimeError: If this manager already tracks a container for ``sid``.
            docker.errors.APIError: If Docker refuses to start the container
                for a reason other than a name conflict.
        """
        logger.debug('Preparing to start container...')
        if status_callback:
            status_callback('info', 'STATUS$PREPARING_CONTAINER')

        container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
        if container_name in self._containers:
            raise RuntimeError(f'Container {container_name} already exists')

        # Find an available port
        container_port = self._find_available_port()
        host_port = container_port  # In future this might differ

        plugin_arg = ''
        if plugins:
            plugin_arg = f'--plugins {" ".join([plugin.name for plugin in plugins])} '

        use_host_network = self.config.sandbox.use_host_network
        network_mode: str | None = 'host' if use_host_network else None
        # With host networking there is nothing to map.
        port_mapping: dict[str, list[dict[str, str]]] | None = (
            None
            if use_host_network
            else {f'{container_port}/tcp': [{'HostPort': str(host_port)}]}
        )

        if use_host_network:
            logger.warning(
                'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
            )

        # Caller-supplied variables are merged last.
        environment = {
            'port': str(container_port),
            'PYTHONUNBUFFERED': '1',
            **(env_vars or {}),
        }
        if self.config.debug or DEBUG:
            environment['DEBUG'] = 'true'

        if any(isinstance(plugin, VSCodeRequirement) for plugin in (plugins or [])):
            # vscode is exposed on the port right after the API port
            if isinstance(port_mapping, dict):
                port_mapping[f'{container_port + 1}/tcp'] = [
                    {'HostPort': str(host_port + 1)}
                ]

        logger.debug(f'Workspace Base: {self.config.workspace_base}')
        if (
            self.config.workspace_mount_path is not None
            and self.config.workspace_mount_path_in_sandbox is not None
        ):
            volumes = {
                self.config.workspace_mount_path: {
                    'bind': self.config.workspace_mount_path_in_sandbox,
                    'mode': 'rw',
                }
            }
            logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
        else:
            logger.debug(
                'Mount dir is not set, will not mount the workspace directory to the container'
            )
            volumes = {}
        logger.debug(
            f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
        )

        browsergym_arg = ''
        if self.config.sandbox.browsergym_eval_env is not None:
            browsergym_arg = (
                f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
            )

        try:
            container = self._docker_client.containers.run(
                runtime_container_image,
                command=(
                    f'/openhands/micromamba/bin/micromamba run -n openhands '
                    f'poetry run '
                    f'python -u -m openhands.runtime.action_execution_server {container_port} '
                    f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
                    f'{plugin_arg}'
                    f'--username {"openhands" if self.config.run_as_openhands else "root"} '
                    f'--user-id {self.config.sandbox.user_id} '
                    f'{browsergym_arg}'
                ),
                network_mode=network_mode,
                ports=port_mapping,
                working_dir='/openhands/code/',
                name=container_name,
                detach=True,
                environment=environment,
                volumes=volumes,
            )
            api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'
            logger.debug(f'Container started. Server url: {api_url}')
            if status_callback:
                status_callback('info', 'STATUS$CONTAINER_STARTED')

            container_info = ContainerInfo(
                container_id=container.id,
                api_url=api_url,
                host_port=host_port,
                container_port=container_port,
                container=container,
            )
            self._containers[container_name] = container_info
            return container_info

        except docker.errors.APIError as e:
            # Use the HTTP status rather than searching the message text,
            # which can contain "409" for unrelated reasons (e.g. a port).
            if e.status_code == 409:
                logger.warning(
                    f'Container {container_name} already exists. Removing...',
                )
                # _cleanup_container expects the session id and adds the
                # prefix itself; passing the full name would look up a
                # doubly-prefixed container, remove nothing, and make this
                # retry conflict again forever.
                self._cleanup_container(sid)
                return self.initialize_container(
                    runtime_container_image,
                    sid,
                    plugins,
                    env_vars,
                    status_callback,
                )
            logger.error(
                f'Error: Instance {container_name} FAILED to start container!\n',
            )
            raise
        except Exception as e:
            logger.error(
                f'Error: Instance {container_name} FAILED to start container!\n',
            )
            logger.error(str(e))
            raise

    def attach_to_container(self, sid: str) -> ContainerInfo:
        """Attach to an existing container.

        Args:
            sid: The session ID used to generate the container name

        Returns:
            ContainerInfo object with connection details. A previously
            recorded ContainerInfo for the same name is returned as-is.

        Raises:
            AgentRuntimeNotFoundError: If the container doesn't exist
        """
        container_name = f'{CONTAINER_NAME_PREFIX}{sid}'

        # Check if we already have the container info
        cached = self._containers.get(container_name)
        if cached is not None:
            return cached

        try:
            container = self._docker_client.containers.get(container_name)
        except docker.errors.NotFound as e:
            raise AgentRuntimeNotFoundError(
                f'Container {container_name} not found.'
            ) from e

        # The API port is taken from the first entry of the container's port
        # table ("<port>/<proto>"); with no entries it stays 0.
        port_table = container.attrs['NetworkSettings']['Ports'] or {}
        first_entry = next(iter(port_table), None)
        container_port = int(first_entry.split('/')[0]) if first_entry else 0
        host_port = container_port  # In future this might differ
        api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'

        container_info = ContainerInfo(
            container_id=container.id,
            api_url=api_url,
            host_port=host_port,
            container_port=container_port,
            container=container,
        )
        self._containers[container_name] = container_info
        return container_info

    @tenacity.retry(
        stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
        retry=tenacity.retry_if_exception_type(
            (ConnectionError, requests.exceptions.ConnectionError)
        ),
        reraise=True,
        wait=tenacity.wait_fixed(2),
    )
    def wait_until_alive(
        self,
        sid: str,
        log_streamer: Optional[LogStreamer] = None,
    ):
        """Wait until a container is ready to accept connections.

        Connection errors are retried every 2 seconds for up to 120 seconds
        (see the decorator); every other exception propagates immediately.

        Args:
            sid: The session ID used to generate the container name
            log_streamer: Optional log streamer that must be ready

        Raises:
            AgentRuntimeNotFoundError: If the container doesn't exist
            AgentRuntimeDisconnectedError: If the container has exited
            AgentRuntimeNotReadyError: If the log streamer isn't ready
        """
        container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
        container_info = self._containers.get(container_name)
        if not container_info:
            raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')

        try:
            # `status` is a snapshot held by the SDK object; without a reload
            # an exit or removal that happened after start-up is never seen.
            container_info.container.reload()
        except docker.errors.NotFound as e:
            raise AgentRuntimeNotFoundError(
                f'Container {container_name} not found.'
            ) from e
        if container_info.container.status == 'exited':
            raise AgentRuntimeDisconnectedError(
                f'Container {container_name} has exited.'
            )

        if not log_streamer:
            raise AgentRuntimeNotReadyError('Runtime client is not ready.')

        # Close the throwaway session on every attempt instead of leaking one
        # per retry.
        with requests.Session() as session:
            with send_request(
                session,
                'GET',
                f'{container_info.api_url}/alive',
                timeout=5,
            ):
                pass

    def _cleanup_container(self, sid: str, remove_all: bool = False) -> None:
        """Clean up a container and its resources.

        Args:
            sid: The session ID used to generate the container name
            remove_all: If True, remove all containers with the same prefix
        """
        if remove_all:
            self._cleanup_all_containers()
            return

        container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
        try:
            self._docker_client.containers.get(container_name).remove(force=True)
        except docker.errors.NotFound:
            # Already gone from Docker; only the bookkeeping is left to do.
            pass
        # Reached after a successful removal or when the container no longer
        # exists, so a stale entry cannot block a later start for this sid.
        self._containers.pop(container_name, None)

    def _cleanup_all_containers(self):
        """Force-remove every container whose name starts with the prefix.

        All containers known to the Docker daemon (running or not) are
        considered, not only those recorded in ``self._containers``.
        """
        for container in self._docker_client.containers.list(all=True):
            if not container.name.startswith(CONTAINER_NAME_PREFIX):
                continue
            try:
                container.remove(force=True)
            except docker.errors.NotFound:
                pass
        self._containers.clear()

    def destroy_runtime(self, runtime_id: str) -> bool:
        """Destroy a runtime and its container.

        Args:
            runtime_id: The runtime ID to destroy

        Returns:
            True if the runtime was found and destroyed, False otherwise
        """
        runtime = self._runtimes.get(runtime_id)
        if not runtime:
            return False

        try:
            runtime.close()
        finally:
            # Release the container and the registry slot even when close()
            # fails; the failure itself still propagates to the caller.
            self._cleanup_container(runtime_id)
            self._runtimes.pop(runtime_id, None)
        logger.info(f'Destroyed runtime with ID: {runtime_id}')
        return True

    async def destroy_all_runtimes(self):
        """Destroy all runtimes and their containers."""
        # Iterate over a copy: destroy_runtime mutates the registry.
        for runtime_id in list(self._runtimes.keys()):
            self.destroy_runtime(runtime_id)
        self._cleanup_all_containers()
+22 -1
View File
@@ -16,6 +16,7 @@ from openhands.runtime.impl.eventstream.eventstream_runtime import (
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import (
BuildFromImageType,
prep_build_folder,
@@ -181,7 +182,7 @@ class ModalRuntime(EventStreamRuntime):
self.log('debug', 'Waiting for client to become ready...')
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
self._wait_until_alive()
await call_sync_from_async(self._wait_until_alive)
self.setup_initial_env()
if not self.attach_to_existing:
@@ -290,6 +291,26 @@ echo 'export INPUTRC=/etc/inputrc' >> /etc/bash.bashrc
self.close()
raise e
@tenacity.retry(
    wait=tenacity.wait_fixed(2),
    stop=tenacity.stop_after_delay(120),
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        (ConnectionError, requests.exceptions.ConnectionError)
    ),
)
def _wait_until_alive(self):
    """Probe the runtime's ``/alive`` endpoint once.

    The decorator repeats the probe every 2 seconds, for at most 120
    seconds, as long as it fails with a connection error; the last error is
    re-raised when time runs out.

    Raises:
        RuntimeError: If ``self.sandbox`` is not set.
    """
    if not self.sandbox:
        raise RuntimeError('Sandbox not initialized')

    # Only the outcome of the request matters; the response is discarded.
    with send_request(self.session, 'GET', f'{self.api_url}/alive', timeout=5):
        pass
def close(self):
"""Closes the ModalRuntime and associated objects."""
if self.log_streamer: