Compare commits

...

43 Commits

Author SHA1 Message Date
Robert Brennan
86403a9446 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:54:48 -05:00
Robert Brennan
3ecd214d69 Merge branch 'main' into feature/runtime-manager 2024-12-24 15:54:40 -05:00
Robert Brennan
c9a6402103 add cleanup logic 2024-12-24 15:52:58 -05:00
Robert Brennan
33a1dd89e7 remove conversation logic 2024-12-24 15:48:11 -05:00
Robert Brennan
2bdad7d910 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:33:18 -05:00
Robert Brennan
d3f726df51 change connect logic 2024-12-24 15:33:04 -05:00
Robert Brennan
fc9bcc3511 change connect logic 2024-12-24 15:31:51 -05:00
Robert Brennan
5eecddbf13 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:28:05 -05:00
Robert Brennan
333f9a5bdf fix tests 2024-12-24 15:27:57 -05:00
Robert Brennan
9b61f38c4a fix imports 2024-12-24 15:25:59 -05:00
Robert Brennan
076cb2d055 Merge branch 'feature/runtime-manager' into feature/runtime-manager-impl 2024-12-24 15:24:20 -05:00
Robert Brennan
0d454d46f2 Revert "refactor: move runtime creation logic to RuntimeManager.get_runtime()"
This reverts commit 42730014d5.
2024-12-24 15:16:32 -05:00
Robert Brennan
e7685f185c fix cli 2024-12-24 15:15:08 -05:00
Robert Brennan
749da6367e update cli 2024-12-24 15:14:18 -05:00
Robert Brennan
4b497c8e64 fix runtime_manager plumbing 2024-12-24 15:12:15 -05:00
openhands
42730014d5 refactor: move runtime creation logic to RuntimeManager.get_runtime() 2024-12-24 19:58:14 +00:00
openhands
81110671b2 refactor: use singleton RuntimeManager from shared.py 2024-12-24 19:57:06 +00:00
openhands
25f3349e1a refactor: use RuntimeManager to get existing runtime in Conversation 2024-12-24 19:52:35 +00:00
Robert Brennan
05dd9d7a06 revert base runtime 2024-12-24 14:38:36 -05:00
openhands
42b2ccb985 refactor: fix ownership model and remove container knowledge from base RuntimeManager
- Restore base RuntimeManager to original state
- Move container initialization to EventStreamRuntimeManager.create_runtime
- Update EventStreamRuntime to accept container_info in __init__
- Fix imports and type errors
2024-12-24 19:33:04 +00:00
openhands
c1278c9835 fix: add type hints and fix runtime builder initialization 2024-12-24 19:22:17 +00:00
openhands
4bbb3572a6 revert: restore base RuntimeManager from feature/runtime-manager branch 2024-12-24 19:18:19 +00:00
openhands
b4bd940a63 refactor: add EventStreamRuntimeManager
- Add ContainerInfo class in shared location
- Add EventStreamRuntimeManager for Docker container lifecycle
- Update EventStreamRuntime to use specialized manager
- Fix type errors and improve code quality
2024-12-24 19:14:51 +00:00
openhands
cbd4f94a2d refactor: remove runtime_manager dependency from Runtime
- Remove runtime_manager parameter from Runtime class
- Remove runtime_manager property from Runtime class
- Update EventStreamRuntime to use RuntimeManager singleton
- Fix type errors and improve code quality
2024-12-24 19:06:51 +00:00
openhands
ef95ddc680 refactor: improve container lifecycle management
- Move container lifecycle management to RuntimeManager
- Add ContainerInfo class to encapsulate container details
- Remove runtime_manager dependency from Runtime
- Add container state checks in EventStreamRuntime
- Fix type errors and add missing imports
2024-12-24 18:57:20 +00:00
openhands
a287e5388c refactor: move container lifecycle management to RuntimeManager
- Move Docker-related methods from EventStreamRuntime to RuntimeManager
- Add container lifecycle management methods to RuntimeManager
- Update EventStreamRuntime to use RuntimeManager
- Update base Runtime class to support RuntimeManager
- Fix type errors and add missing imports
2024-12-24 18:45:23 +00:00
openhands
30f6166bf6 fix: Fix async mock in test_process_issue 2024-12-24 17:55:26 +00:00
Robert Brennan
1f706fe2f2 fix test 2024-12-24 12:32:02 -05:00
Robert Brennan
4123c65317 Merge branch 'main' into feature/runtime-manager 2024-12-24 12:04:33 -05:00
Robert Brennan
6dfd54be9f fix plumbing 2024-12-24 11:53:17 -05:00
openhands
8eef9b2563 Use server's shared config for RuntimeManager 2024-12-24 16:39:27 +00:00
openhands
5d5978c6cb Move runtime class resolution to RuntimeManager and remove redundant error callback 2024-12-24 16:35:14 +00:00
openhands
1a17972b4e Move RuntimeManager to module level and simplify config handling 2024-12-24 16:30:14 +00:00
openhands
4de7a4f85d Simplify RuntimeManager config handling and fix type issues 2024-12-24 16:25:24 +00:00
openhands
8befeca41d Fix linting issues and add missing await for create_runtime 2024-12-24 16:16:16 +00:00
openhands
918139e886 Move AppConfig to RuntimeManager class level and update initialization flow 2024-12-24 16:13:09 +00:00
openhands
6374174095 Update main.py to use RuntimeManager for runtime creation 2024-12-24 16:02:47 +00:00
Robert Brennan
138f6932eb move import 2024-12-24 10:08:13 -05:00
Robert Brennan
7181efd26d move import 2024-12-24 10:07:24 -05:00
Robert Brennan
3a52360ab0 remove exit 2024-12-24 10:05:07 -05:00
openhands
cd9eb1d85c Fix singleton import and add tests for RuntimeManager 2024-12-24 15:03:10 +00:00
openhands
ada657b476 Fix linting issues in runtime_manager.py 2024-12-24 14:54:11 +00:00
openhands
b630d65626 Add RuntimeManager for centralized runtime management 2024-12-24 14:52:19 +00:00
23 changed files with 712 additions and 535 deletions

View File

@@ -35,8 +35,8 @@ from openhands.events.observation import (
NullObservation,
)
from openhands.llm.llm import LLM
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.security import SecurityAnalyzer, options
from openhands.storage import get_file_store
@@ -125,9 +125,8 @@ async def main():
file_store = get_file_store(config.file_store, config.file_store_path)
event_stream = EventStream(sid, file_store)
runtime_cls = get_runtime_cls(config.runtime)
runtime: Runtime = runtime_cls( # noqa: F841
config=config,
runtime_manager = RuntimeManager(config)
runtime: Runtime = await runtime_manager.create_runtime(
event_stream=event_stream,
sid=sid,
plugins=agent_cls.sandbox_plugins,
@@ -195,8 +194,6 @@ async def main():
event_stream.subscribe(EventStreamSubscriber.MAIN, on_event, str(uuid4()))
await runtime.connect()
asyncio.create_task(prompt_for_next_task())
await run_agent_until_done(

View File

@@ -26,8 +26,8 @@ from openhands.events.event import Event
from openhands.events.observation import AgentStateChangedObservation
from openhands.events.serialization.event import event_to_trajectory
from openhands.llm.llm import LLM
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.storage import get_file_store
@@ -51,7 +51,7 @@ def read_task_from_stdin() -> str:
return sys.stdin.read()
def create_runtime(
async def create_runtime(
config: AppConfig,
sid: str | None = None,
headless_mode: bool = True,
@@ -77,10 +77,8 @@ def create_runtime(
agent_cls = openhands.agenthub.Agent.get_cls(config.default_agent)
# runtime and tools
runtime_cls = get_runtime_cls(config.runtime)
logger.debug(f'Initializing runtime: {runtime_cls.__name__}')
runtime: Runtime = runtime_cls(
config=config,
runtime_manager = RuntimeManager(config)
runtime: Runtime = await runtime_manager.create_runtime(
event_stream=event_stream,
sid=session_id,
plugins=agent_cls.sandbox_plugins,
@@ -129,8 +127,7 @@ async def run_controller(
sid = sid or generate_sid(config)
if runtime is None:
runtime = create_runtime(config, sid=sid, headless_mode=headless_mode)
await runtime.connect()
runtime = await create_runtime(config, sid=sid, headless_mode=headless_mode)
event_stream = runtime.event_stream

View File

@@ -199,8 +199,7 @@ async def process_issue(
)
config.set_llm_config(llm_config)
runtime = create_runtime(config)
await runtime.connect()
runtime = await create_runtime(config)
async def on_event(evt):
logger.info(evt)

View File

@@ -0,0 +1,21 @@
"""Container-related types and utilities."""
import docker
class ContainerInfo:
"""Information about a running container that a Runtime needs."""
def __init__(
self,
container_id: str,
api_url: str,
host_port: int,
container_port: int,
container: docker.models.containers.Container,
):
self.container_id = container_id
self.api_url = api_url
self.host_port = host_port
self.container_port = container_port
self.container = container

View File

@@ -1,26 +1,17 @@
import atexit
import os
import tempfile
import threading
from functools import lru_cache
from pathlib import Path
from typing import Callable
from zipfile import ZipFile
import docker
import requests
import tenacity
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeDisconnectedError,
AgentRuntimeError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeTimeoutError,
)
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import (
ActionConfirmationStatus,
@@ -42,24 +33,11 @@ from openhands.events.observation import (
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.container import ContainerInfo
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
def remove_all_runtime_containers():
remove_all_containers(CONTAINER_NAME_PREFIX)
_atexit_registered = False
class EventStreamRuntime(Runtime):
@@ -74,8 +52,6 @@ class EventStreamRuntime(Runtime):
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
"""
# Need to provide this method to allow inheritors to init the Runtime
# without initting the EventStreamRuntime.
def init_base_runtime(
self,
config: AppConfig,
@@ -108,31 +84,15 @@ class EventStreamRuntime(Runtime):
status_callback: Callable | None = None,
attach_to_existing: bool = False,
headless_mode: bool = True,
container_info: ContainerInfo | None = None,
):
global _atexit_registered
if not _atexit_registered:
_atexit_registered = True
atexit.register(remove_all_runtime_containers)
self.config = config
self._host_port = 30000 # initial dummy value
self._container_port = 30001 # initial dummy value
self._vscode_url: str | None = None # initial dummy value
self._vscode_url: str | None = None
self._runtime_initialized: bool = False
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.session = requests.Session()
self.status_callback = status_callback
self.docker_client: docker.DockerClient = self._init_docker_client()
self.base_container_image = self.config.sandbox.base_container_image
self.runtime_container_image = self.config.sandbox.runtime_container_image
self.container_name = CONTAINER_NAME_PREFIX + sid
self.container = None
self.container_info = container_info
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
# Buffer for container logs
self.log_streamer: LogStreamer | None = None
self.init_base_runtime(
@@ -154,52 +114,22 @@ class EventStreamRuntime(Runtime):
)
async def connect(self):
"""Initialize the runtime with the provided container."""
if not self.container_info:
raise RuntimeError('Container info not provided')
self.send_status_message('STATUS$STARTING_RUNTIME')
try:
await call_sync_from_async(self._attach_to_container)
except docker.errors.NotFound as e:
if self.attach_to_existing:
self.log(
'error',
f'Container {self.container_name} not found.',
)
raise e
if self.runtime_container_image is None:
if self.base_container_image is None:
raise ValueError(
'Neither runtime container image nor base container image is set'
)
self.send_status_message('STATUS$STARTING_CONTAINER')
self.runtime_container_image = build_runtime_image(
self.base_container_image,
self.runtime_builder,
platform=self.config.sandbox.platform,
extra_deps=self.config.sandbox.runtime_extra_deps,
force_rebuild=self.config.sandbox.force_rebuild_runtime,
extra_build_args=self.config.sandbox.runtime_extra_build_args,
)
self.log(
'info', f'Starting runtime with image: {self.runtime_container_image}'
)
await call_sync_from_async(self._init_container)
self.log(
'info',
f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
)
self.log_streamer = LogStreamer(self.container, self.log)
self.log_streamer = LogStreamer(self.container_info.container, self.log)
if not self.attach_to_existing:
self.log('info', f'Waiting for client to become ready at {self.api_url}...')
self.log(
'info',
f'Waiting for client to become ready at {self.container_info.api_url}...',
)
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
await call_sync_from_async(self._wait_until_alive)
if not self.attach_to_existing:
self.log('info', 'Runtime is ready.')
if not self.attach_to_existing:
await call_sync_from_async(self.setup_initial_env)
self.log(
@@ -210,207 +140,24 @@ class EventStreamRuntime(Runtime):
self.send_status_message(' ')
self._runtime_initialized = True
@staticmethod
@lru_cache(maxsize=1)
def _init_docker_client() -> docker.DockerClient:
try:
return docker.from_env()
except Exception as ex:
logger.error(
'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
)
raise ex
def _init_container(self):
self.log('debug', 'Preparing to start container...')
self.send_status_message('STATUS$PREPARING_CONTAINER')
plugin_arg = ''
if self.plugins is not None and len(self.plugins) > 0:
plugin_arg = (
f'--plugins {" ".join([plugin.name for plugin in self.plugins])} '
)
self._host_port = self._find_available_port()
self._container_port = (
self._host_port
) # in future this might differ from host port
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
use_host_network = self.config.sandbox.use_host_network
network_mode: str | None = 'host' if use_host_network else None
port_mapping: dict[str, list[dict[str, str]]] | None = (
None
if use_host_network
else {f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]}
)
if use_host_network:
self.log(
'warn',
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
)
# Combine environment variables
environment = {
'port': str(self._container_port),
'PYTHONUNBUFFERED': 1,
}
if self.config.debug or DEBUG:
environment['DEBUG'] = 'true'
if self.vscode_enabled:
# vscode is on port +1 from container port
if isinstance(port_mapping, dict):
port_mapping[f'{self._container_port + 1}/tcp'] = [
{'HostPort': str(self._host_port + 1)}
]
self.log('debug', f'Workspace Base: {self.config.workspace_base}')
if (
self.config.workspace_mount_path is not None
and self.config.workspace_mount_path_in_sandbox is not None
):
# e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
volumes = {
self.config.workspace_mount_path: {
'bind': self.config.workspace_mount_path_in_sandbox,
'mode': 'rw',
}
}
logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
else:
logger.debug(
'Mount dir is not set, will not mount the workspace directory to the container'
)
volumes = None
self.log(
'debug',
f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
)
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
)
else:
browsergym_arg = ''
try:
self.container = self.docker_client.containers.run(
self.runtime_container_image,
command=(
f'/openhands/micromamba/bin/micromamba run -n openhands '
f'poetry run '
f'python -u -m openhands.runtime.action_execution_server {self._container_port} '
f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
f'{plugin_arg}'
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
f'--user-id {self.config.sandbox.user_id} '
f'{browsergym_arg}'
),
network_mode=network_mode,
ports=port_mapping,
working_dir='/openhands/code/', # do not change this!
name=self.container_name,
detach=True,
environment=environment,
volumes=volumes,
)
self.log('debug', f'Container started. Server url: {self.api_url}')
self.send_status_message('STATUS$CONTAINER_STARTED')
except docker.errors.APIError as e:
if '409' in str(e):
self.log(
'warning',
f'Container {self.container_name} already exists. Removing...',
)
remove_all_containers(self.container_name)
return self._init_container()
else:
self.log(
'error',
f'Error: Instance {self.container_name} FAILED to start container!\n',
)
except Exception as e:
self.log(
'error',
f'Error: Instance {self.container_name} FAILED to start container!\n',
)
self.log('error', str(e))
self.close()
raise e
def _attach_to_container(self):
self._container_port = 0
self.container = self.docker_client.containers.get(self.container_name)
for port in self.container.attrs['NetworkSettings']['Ports']: # type: ignore
self._container_port = int(port.split('/')[0])
break
self._host_port = self._container_port
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.log(
'debug',
f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
)
@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def _wait_until_alive(self):
try:
container = self.docker_client.containers.get(self.container_name)
if container.status == 'exited':
raise AgentRuntimeDisconnectedError(
f'Container {self.container_name} has exited.'
)
except docker.errors.NotFound:
raise AgentRuntimeNotFoundError(
f'Container {self.container_name} not found.'
)
if not self.log_streamer:
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
with send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
):
pass
def close(self, rm_all_containers: bool | None = None):
"""Closes the EventStreamRuntime and associated objects
Parameters:
- rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
"""
def close(self):
"""Closes the EventStreamRuntime and associated objects."""
if self.log_streamer:
self.log_streamer.close()
if self.session:
self.session.close()
if rm_all_containers is None:
rm_all_containers = self.config.sandbox.rm_all_containers
if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
return
close_prefix = (
CONTAINER_NAME_PREFIX if rm_all_containers else self.container_name
)
remove_all_containers(close_prefix)
def run_action(self, action: Action) -> Observation:
if isinstance(action, FileEditAction):
return self.edit(action)
if not self.container_info:
return ErrorObservation(
'Runtime container is not initialized.',
error_id='AGENT_ERROR$RUNTIME_NOT_READY',
)
# set timeout to default if not set
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
@@ -446,7 +193,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.api_url}/execute_action',
f'{self.container_info.api_url}/execute_action',
json={'action': event_to_dict(action)},
# wait a few more seconds to get the timeout error from client side
timeout=action.timeout + 5,
@@ -479,16 +226,15 @@ class EventStreamRuntime(Runtime):
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return self.run_action(action)
# ====================================================================
# Implement these methods (for file operations) in the subclass
# ====================================================================
def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
if recursive:
# For recursive copy, create a zip file
@@ -516,7 +262,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.api_url}/upload_file',
f'{self.container_info.api_url}/upload_file',
files=upload_data,
params=params,
timeout=300,
@@ -539,6 +285,8 @@ class EventStreamRuntime(Runtime):
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
data = {}
@@ -548,7 +296,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.api_url}/list_files',
f'{self.container_info.api_url}/list_files',
json=data,
timeout=10,
) as response:
@@ -560,13 +308,15 @@ class EventStreamRuntime(Runtime):
def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
params = {'path': path}
with send_request(
self.session,
'GET',
f'{self.api_url}/download_files',
f'{self.container_info.api_url}/download_files',
params=params,
stream=True,
timeout=30,
@@ -579,26 +329,9 @@ class EventStreamRuntime(Runtime):
except requests.Timeout:
raise TimeoutError('Copy operation timed out')
def _is_port_in_use_docker(self, port):
containers = self.docker_client.containers.list()
for container in containers:
container_ports = container.ports
if str(port) in str(container_ports):
return True
return False
def _find_available_port(self, max_attempts=5):
port = 39999
for _ in range(max_attempts):
port = find_available_tcp_port(30000, 39999)
if not self._is_port_in_use_docker(port):
return port
# If no port is found after max_attempts, return the last tried port
return port
@property
def vscode_url(self) -> str | None:
if self.vscode_enabled and self._runtime_initialized:
if self.vscode_enabled and self._runtime_initialized and self.container_info:
if (
hasattr(self, '_vscode_url') and self._vscode_url is not None
): # cached value
@@ -607,14 +340,14 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'GET',
f'{self.api_url}/vscode/connection_token',
f'{self.container_info.api_url}/vscode/connection_token',
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, dict)
if response_json['token'] is None:
return None
self._vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self._vscode_url = f'http://localhost:{self.container_info.host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self.log(
'debug',
f'VSCode URL: {self._vscode_url}',

View File

@@ -0,0 +1,456 @@
import atexit
import functools
from typing import Dict, List, Optional
import docker
import requests
import tenacity
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeDisconnectedError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeUnavailableError,
)
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.base import Runtime
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.container import ContainerInfo
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
from openhands.runtime.plugins import PluginRequirement, VSCodeRequirement
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.tenacity_stop import stop_if_should_exit
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
_atexit_registered = False
class EventStreamRuntimeManager(RuntimeManager):
"""Manages Docker container lifecycle for EventStreamRuntime instances."""
def __init__(self, config: AppConfig):
super().__init__(config)
global _atexit_registered
if not _atexit_registered:
_atexit_registered = True
atexit.register(self._cleanup_all_containers)
self._containers: Dict[str, ContainerInfo] = {}
self._docker_client = self._init_docker_client()
self._runtime_builder: DockerRuntimeBuilder = DockerRuntimeBuilder(
self._docker_client
)
@staticmethod
@functools.lru_cache(maxsize=1)
def _init_docker_client() -> docker.DockerClient:
try:
return docker.from_env()
except Exception as ex:
logger.error(
'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
)
raise ex
async def create_runtime(
self,
event_stream: EventStream,
sid: str,
plugins: Optional[List[PluginRequirement]] = None,
env_vars: Optional[Dict[str, str]] = None,
status_callback=None,
attach_to_existing: bool = False,
headless_mode: bool = False,
) -> Runtime:
"""Create a new EventStreamRuntime with an initialized container.
This overrides the base create_runtime to handle container initialization
before creating the runtime.
"""
if sid in self._runtimes:
raise RuntimeError(f'Runtime with ID {sid} already exists')
# First initialize or attach to the container
try:
if attach_to_existing:
container_info = self.attach_to_container(sid)
else:
runtime_container_image = self.config.sandbox.runtime_container_image
if runtime_container_image is None:
if self.config.sandbox.base_container_image is None:
raise ValueError(
'Neither runtime container image nor base container image is set'
)
if status_callback:
status_callback('info', 'STATUS$STARTING_CONTAINER')
runtime_container_image = build_runtime_image(
self.config.sandbox.base_container_image,
self._runtime_builder,
platform=self.config.sandbox.platform,
extra_deps=self.config.sandbox.runtime_extra_deps,
force_rebuild=self.config.sandbox.force_rebuild_runtime,
extra_build_args=self.config.sandbox.runtime_extra_build_args,
)
container_info = self.initialize_container(
runtime_container_image,
sid,
plugins,
env_vars,
status_callback,
)
# Create the runtime with the initialized container
runtime = EventStreamRuntime(
config=self.config,
event_stream=event_stream,
sid=sid,
plugins=plugins,
env_vars=env_vars,
status_callback=status_callback,
attach_to_existing=attach_to_existing,
headless_mode=headless_mode,
container_info=container_info,
)
# Initialize the runtime
try:
await runtime.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
if status_callback:
status_callback(
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
)
self._cleanup_container(sid)
raise
self._runtimes[sid] = runtime
logger.info(f'Created runtime with ID: {sid}')
return runtime
except Exception as e:
logger.error(f'Failed to create runtime: {str(e)}')
self._cleanup_container(sid)
raise
def _is_port_in_use_docker(self, port):
containers = self._docker_client.containers.list()
for container in containers:
container_ports = container.ports
if str(port) in str(container_ports):
return True
return False
def _find_available_port(self, max_attempts=5):
port = 39999
for _ in range(max_attempts):
port = find_available_tcp_port(30000, 39999)
if not self._is_port_in_use_docker(port):
return port
return port
def initialize_container(
self,
runtime_container_image: str,
sid: str,
plugins: Optional[list[PluginRequirement]] = None,
env_vars: Optional[Dict[str, str]] = None,
status_callback=None,
) -> ContainerInfo:
"""Initialize a new container for a runtime.
Args:
runtime_container_image: The Docker image to use
sid: The session ID that will be used to generate the container name
plugins: Optional list of plugins to enable
env_vars: Optional environment variables to set
status_callback: Optional callback for status updates
Returns:
ContainerInfo object with connection details
"""
logger.debug('Preparing to start container...')
if status_callback:
status_callback('info', 'STATUS$PREPARING_CONTAINER')
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
if container_name in self._containers:
raise RuntimeError(f'Container {container_name} already exists')
# Find an available port
container_port = self._find_available_port()
host_port = container_port # In future this might differ
plugin_arg = ''
if plugins:
plugin_arg = f'--plugins {" ".join([plugin.name for plugin in plugins])} '
use_host_network = self.config.sandbox.use_host_network
network_mode: str | None = 'host' if use_host_network else None
port_mapping: dict[str, list[dict[str, str]]] | None = (
None
if use_host_network
else {f'{container_port}/tcp': [{'HostPort': str(host_port)}]}
)
if use_host_network:
logger.warn(
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
)
environment = {
'port': str(container_port),
'PYTHONUNBUFFERED': '1',
**(env_vars or {}),
}
if self.config.debug or DEBUG:
environment['DEBUG'] = 'true'
if any(isinstance(plugin, VSCodeRequirement) for plugin in (plugins or [])):
if isinstance(port_mapping, dict):
port_mapping[f'{container_port + 1}/tcp'] = [
{'HostPort': str(host_port + 1)}
]
logger.debug(f'Workspace Base: {self.config.workspace_base}')
if (
self.config.workspace_mount_path is not None
and self.config.workspace_mount_path_in_sandbox is not None
):
volumes = {
self.config.workspace_mount_path: {
'bind': self.config.workspace_mount_path_in_sandbox,
'mode': 'rw',
}
}
logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
else:
logger.debug(
'Mount dir is not set, will not mount the workspace directory to the container'
)
volumes = {}
logger.debug(
f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
)
browsergym_arg = ''
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
)
try:
container = self._docker_client.containers.run(
runtime_container_image,
command=(
f'/openhands/micromamba/bin/micromamba run -n openhands '
f'poetry run '
f'python -u -m openhands.runtime.action_execution_server {container_port} '
f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
f'{plugin_arg}'
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
f'--user-id {self.config.sandbox.user_id} '
f'{browsergym_arg}'
),
network_mode=network_mode,
ports=port_mapping,
working_dir='/openhands/code/',
name=container_name,
detach=True,
environment=environment,
volumes=volumes,
)
api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'
logger.debug(f'Container started. Server url: {api_url}')
if status_callback:
status_callback('info', 'STATUS$CONTAINER_STARTED')
container_info = ContainerInfo(
container_id=container.id,
api_url=api_url,
host_port=host_port,
container_port=container_port,
container=container,
)
self._containers[container_name] = container_info
return container_info
except docker.errors.APIError as e:
if '409' in str(e):
logger.warning(
f'Container {container_name} already exists. Removing...',
)
self._cleanup_container(container_name)
return self.initialize_container(
runtime_container_image,
sid,
plugins,
env_vars,
status_callback,
)
else:
logger.error(
f'Error: Instance {container_name} FAILED to start container!\n',
)
raise
except Exception as e:
logger.error(
f'Error: Instance {container_name} FAILED to start container!\n',
)
logger.error(str(e))
raise
def attach_to_container(self, sid: str) -> ContainerInfo:
"""Attach to an existing container.
Args:
sid: The session ID used to generate the container name
Returns:
ContainerInfo object with connection details
Raises:
AgentRuntimeNotFoundError: If the container doesn't exist
"""
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
# Check if we already have the container info
if container_name in self._containers:
return self._containers[container_name]
try:
container = self._docker_client.containers.get(container_name)
container_port = 0
for port in container.attrs['NetworkSettings']['Ports']:
container_port = int(port.split('/')[0])
break
host_port = container_port # In future this might differ
api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'
container_info = ContainerInfo(
container_id=container.id,
api_url=api_url,
host_port=host_port,
container_port=container_port,
container=container,
)
self._containers[container_name] = container_info
return container_info
except docker.errors.NotFound:
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def wait_until_alive(
self,
sid: str,
log_streamer: Optional[LogStreamer] = None,
):
"""Wait until a container is ready to accept connections.
Args:
sid: The session ID used to generate the container name
log_streamer: Optional log streamer that must be ready
Raises:
AgentRuntimeNotFoundError: If the container doesn't exist
AgentRuntimeDisconnectedError: If the container has exited
AgentRuntimeNotReadyError: If the log streamer isn't ready
"""
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
container_info = self._containers.get(container_name)
if not container_info:
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
try:
if container_info.container.status == 'exited':
raise AgentRuntimeDisconnectedError(
f'Container {container_name} has exited.'
)
except docker.errors.NotFound:
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
if not log_streamer:
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
with send_request(
requests.Session(),
'GET',
f'{container_info.api_url}/alive',
timeout=5,
):
pass
def _cleanup_container(self, sid: str, remove_all: bool = False) -> None:
"""Clean up a container and its resources.
Args:
sid: The session ID used to generate the container name
remove_all: If True, remove all containers with the same prefix
"""
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
if remove_all:
self._cleanup_all_containers()
else:
try:
container = self._docker_client.containers.get(container_name)
container.remove(force=True)
if container_name in self._containers:
del self._containers[container_name]
except docker.errors.NotFound:
pass
def _cleanup_all_containers(self):
"""Clean up all containers managed by this RuntimeManager."""
containers = self._docker_client.containers.list(all=True)
for container in containers:
if container.name.startswith(CONTAINER_NAME_PREFIX):
try:
container.remove(force=True)
except docker.errors.NotFound:
pass
self._containers.clear()
def destroy_runtime(self, runtime_id: str) -> bool:
"""Destroy a runtime and its container.
Args:
runtime_id: The runtime ID to destroy
Returns:
True if the runtime was found and destroyed, False otherwise
"""
runtime = self._runtimes.get(runtime_id)
if runtime:
runtime.close()
self._cleanup_container(runtime_id)
del self._runtimes[runtime_id]
logger.info(f'Destroyed runtime with ID: {runtime_id}')
return True
return False
async def destroy_all_runtimes(self):
"""Destroy all runtimes and their containers."""
for runtime_id in list(self._runtimes.keys()):
self.destroy_runtime(runtime_id)
self._cleanup_all_containers()

View File

@@ -16,6 +16,7 @@ from openhands.runtime.impl.eventstream.eventstream_runtime import (
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import (
BuildFromImageType,
prep_build_folder,
@@ -181,7 +182,7 @@ class ModalRuntime(EventStreamRuntime):
self.log('debug', 'Waiting for client to become ready...')
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
self._wait_until_alive()
await call_sync_from_async(self._wait_until_alive)
self.setup_initial_env()
if not self.attach_to_existing:
@@ -290,6 +291,26 @@ echo 'export INPUTRC=/etc/inputrc' >> /etc/bash.bashrc
self.close()
raise e
@tenacity.retry(
stop=tenacity.stop_after_delay(120),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def _wait_until_alive(self):
if not self.sandbox:
raise RuntimeError('Sandbox not initialized')
with send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
):
pass
def close(self):
"""Closes the ModalRuntime and associated objects."""
if self.log_streamer:

View File

@@ -0,0 +1,77 @@
from typing import Dict, List, Optional
from openhands.core.config import AppConfig
from openhands.core.exceptions import AgentRuntimeUnavailableError
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.singleton import Singleton
class RuntimeManager(metaclass=Singleton):
def __init__(self, config: AppConfig):
self._runtimes: Dict[str, Runtime] = {}
self._config = config
@property
def config(self) -> AppConfig:
return self._config
async def create_runtime(
self,
event_stream: EventStream,
sid: str,
plugins: Optional[List[PluginRequirement]] = None,
env_vars: Optional[Dict[str, str]] = None,
status_callback=None,
attach_to_existing: bool = False,
headless_mode: bool = False,
) -> Runtime:
if sid in self._runtimes:
raise RuntimeError(f'Runtime with ID {sid} already exists')
runtime_class = get_runtime_cls(self.config.runtime)
logger.debug(f'Initializing runtime: {runtime_class.__name__}')
runtime = runtime_class(
config=self.config,
event_stream=event_stream,
sid=sid,
plugins=plugins,
env_vars=env_vars,
status_callback=status_callback,
attach_to_existing=attach_to_existing,
headless_mode=headless_mode,
)
try:
await runtime.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
if status_callback:
status_callback('error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e))
raise
self._runtimes[sid] = runtime
logger.info(f'Created runtime with ID: {sid}')
return runtime
def get_runtime(self, sid: str) -> Optional[Runtime]:
return self._runtimes.get(sid)
def list_runtimes(self) -> List[str]:
return list(self._runtimes.keys())
def destroy_runtime(self, sid: str) -> bool:
runtime = self._runtimes.get(sid)
if runtime:
runtime.close()
del self._runtimes[sid]
logger.info(f'Destroyed runtime with ID: {sid}')
return True
return False
async def destroy_all_runtimes(self):
for runtime_id in list(self._runtimes.keys()):
self.destroy_runtime(runtime_id)

View File

@@ -0,0 +1,14 @@
class Singleton(type):
"""Metaclass for creating singleton classes.
Usage:
class MyClass(metaclass=Singleton):
pass
"""
_instances: dict = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]

View File

@@ -15,6 +15,7 @@ from openhands.server.middleware import (
LocalhostCORSMiddleware,
NoCacheMiddleware,
RateLimitMiddleware,
session_manager,
)
from openhands.server.routes.conversation import app as conversation_api_router
from openhands.server.routes.feedback import app as feedback_api_router
@@ -24,7 +25,7 @@ from openhands.server.routes.new_conversation import app as new_conversation_api
from openhands.server.routes.public import app as public_api_router
from openhands.server.routes.security import app as security_api_router
from openhands.server.routes.settings import app as settings_router
from openhands.server.shared import openhands_config, session_manager
from openhands.server.shared import openhands_config
from openhands.utils.import_utils import get_impl

View File

@@ -13,8 +13,9 @@ from openhands.events.observation import (
from openhands.events.observation.agent import AgentStateChangedObservation
from openhands.events.serialization import event_to_dict
from openhands.events.stream import AsyncEventStreamWrapper
from openhands.server.middleware import session_manager
from openhands.server.session.manager import ConversationDoesNotExistError
from openhands.server.shared import config, openhands_config, session_manager, sio
from openhands.server.shared import config, openhands_config, sio
from openhands.server.types import AppMode
from openhands.storage.conversation.conversation_store import (
ConversationStore,

View File

@@ -10,9 +10,12 @@ from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
from openhands.server.shared import session_manager
from openhands.server.session import SessionManager
from openhands.server.shared import config, file_store, runtime_manager, sio
from openhands.server.types import SessionMiddlewareInterface
session_manager = SessionManager(sio, config, file_store)
class LocalhostCORSMiddleware(CORSMiddleware):
"""
@@ -134,10 +137,17 @@ class AttachConversationMiddleware(SessionMiddlewareInterface):
"""
Attach the user's session based on the provided authentication token.
"""
request.state.conversation = await session_manager.attach_to_conversation(
request.state.sid
)
if not request.state.conversation:
request.state.runtime = runtime_manager.get_runtime(request.state.sid)
if request.state.runtime is None:
event_stream = await session_manager.get_event_stream(request.state.sid)
if event_stream:
request.state.runtime = await runtime_manager.create_runtime(
event_stream=event_stream,
sid=request.state.sid,
attach_to_existing=True,
headless_mode=False,
)
if not request.state.runtime:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Session not found'},
@@ -148,7 +158,7 @@ class AttachConversationMiddleware(SessionMiddlewareInterface):
"""
Detach the user's session.
"""
await session_manager.detach_from_conversation(request.state.conversation)
pass
async def __call__(self, request: Request, call_next: Callable):
if not self._should_attach(request):

View File

@@ -13,7 +13,7 @@ async def get_remote_runtime_config(request: Request):
Currently, this is the session ID and runtime ID (if available).
"""
runtime = request.state.conversation.runtime
runtime = request.state.runtime
runtime_id = runtime.runtime_id if hasattr(runtime, 'runtime_id') else None
session_id = runtime.sid if hasattr(runtime, 'sid') else None
return JSONResponse(
@@ -37,7 +37,7 @@ async def get_vscode_url(request: Request):
JSONResponse: A JSON response indicating the success of the operation.
"""
try:
runtime: Runtime = request.state.conversation.runtime
runtime: Runtime = request.state.runtime
logger.debug(f'Runtime type: {type(runtime)}')
logger.debug(f'Runtime VSCode URL: {runtime.vscode_url}')
return JSONResponse(status_code=200, content={'vscode_url': runtime.vscode_url})
@@ -81,12 +81,12 @@ async def search_events(
HTTPException: If conversation is not found
ValueError: If limit is less than 1 or greater than 100
"""
if not request.state.conversation:
if not request.state.runtime:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail='Conversation not found'
)
# Get matching events from the stream
event_stream = request.state.conversation.event_stream
event_stream = request.state.runtime.event_stream
matching_events = event_stream.get_matching_events(
query=query,
event_type=event_type,

View File

@@ -35,7 +35,7 @@ async def submit_feedback(request: Request, conversation_id: str):
# and there is a function to handle the storage.
body = await request.json()
async_stream = AsyncEventStreamWrapper(
request.state.conversation.event_stream, filter_hidden=True
request.state.runtime.event_stream, filter_hidden=True
)
trajectory = []
async for event in async_stream:

View File

@@ -58,13 +58,13 @@ async def list_files(request: Request, conversation_id: str, path: str | None =
Raises:
HTTPException: If there's an error listing the files.
"""
if not request.state.conversation.runtime:
if not request.state.runtime:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Runtime not yet initialized'},
)
runtime: Runtime = request.state.conversation.runtime
runtime: Runtime = request.state.runtime
try:
file_list = await call_sync_from_async(runtime.list_files, path)
except AgentRuntimeUnavailableError as e:
@@ -124,7 +124,7 @@ async def select_file(file: str, request: Request):
Raises:
HTTPException: If there's an error opening the file.
"""
runtime: Runtime = request.state.conversation.runtime
runtime: Runtime = request.state.runtime
file = os.path.join(runtime.config.workspace_mount_path_in_sandbox, file)
read_action = FileReadAction(file)
@@ -199,7 +199,7 @@ async def upload_file(request: Request, conversation_id: str, files: list[Upload
tmp_file.write(file_contents)
tmp_file.flush()
runtime: Runtime = request.state.conversation.runtime
runtime: Runtime = request.state.runtime
try:
await call_sync_from_async(
runtime.copy_to,
@@ -276,7 +276,7 @@ async def save_file(request: Request):
raise HTTPException(status_code=400, detail='Missing filePath or content')
# Save the file to the agent's runtime file store
runtime: Runtime = request.state.conversation.runtime
runtime: Runtime = request.state.runtime
file_path = os.path.join(
runtime.config.workspace_mount_path_in_sandbox, file_path
)
@@ -316,7 +316,7 @@ async def zip_current_workspace(
):
try:
logger.debug('Zipping workspace')
runtime: Runtime = request.state.conversation.runtime
runtime: Runtime = request.state.runtime
path = runtime.config.workspace_mount_path_in_sandbox
try:
zip_file = await call_sync_from_async(runtime.copy_from, path)

View File

@@ -6,9 +6,10 @@ from github import Github
from pydantic import BaseModel
from openhands.core.logger import openhands_logger as logger
from openhands.server.middleware import session_manager
from openhands.server.routes.settings import SettingsStoreImpl
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.server.shared import config, session_manager
from openhands.server.shared import config
from openhands.storage.conversation.conversation_store import (
ConversationMetadata,
ConversationStore,

View File

@@ -4,6 +4,9 @@ from fastapi import (
Request,
)
from openhands.security import SecurityAnalyzer, options
from openhands.server.shared import config
app = APIRouter(prefix='/api/conversations/{conversation_id}')
@@ -22,9 +25,10 @@ async def security_api(request: Request):
Raises:
HTTPException: If the security analyzer is not initialized.
"""
if not request.state.conversation.security_analyzer:
if not request.state.runtime:
raise HTTPException(status_code=404, detail='Security analyzer not initialized')
security_analyzer = options.SecurityAnalyzers.get(
config.security.security_analyzer or '', SecurityAnalyzer
)(request.state.runtime.event_stream)
return await request.state.conversation.security_analyzer.handle_api_request(
request
)
return await security_analyzer.handle_api_request(request)

View File

@@ -4,16 +4,16 @@ from typing import Callable, Optional
from openhands.controller import AgentController
from openhands.controller.agent import Agent
from openhands.controller.state.state import State
from openhands.core.config import AgentConfig, AppConfig, LLMConfig
from openhands.core.config import AgentConfig, LLMConfig
from openhands.core.exceptions import AgentRuntimeUnavailableError
from openhands.core.logger import openhands_logger as logger
from openhands.core.schema.agent import AgentState
from openhands.events.action import ChangeAgentStateAction
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.security import SecurityAnalyzer, options
from openhands.server.shared import runtime_manager
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_async_from_sync, call_sync_from_async
from openhands.utils.shutdown_listener import should_continue
@@ -59,8 +59,6 @@ class AgentSession:
async def start(
self,
runtime_name: str,
config: AppConfig,
agent: Agent,
max_iterations: int,
max_budget_per_task: float | None = None,
@@ -71,8 +69,6 @@ class AgentSession:
):
"""Starts the Agent session
Parameters:
- runtime_name: The name of the runtime associated with the session
- config:
- agent:
- max_iterations:
- max_budget_per_task:
@@ -87,8 +83,6 @@ class AgentSession:
asyncio.get_event_loop().run_in_executor(
None,
self._start_thread,
runtime_name,
config,
agent,
max_iterations,
max_budget_per_task,
@@ -107,8 +101,6 @@ class AgentSession:
async def _start(
self,
runtime_name: str,
config: AppConfig,
agent: Agent,
max_iterations: int,
max_budget_per_task: float | None = None,
@@ -121,10 +113,10 @@ class AgentSession:
logger.warning('Session closed before starting')
return
self._initializing = True
self._create_security_analyzer(config.security.security_analyzer)
self._create_security_analyzer(
runtime_manager.config.security.security_analyzer
)
await self._create_runtime(
runtime_name=runtime_name,
config=config,
agent=agent,
github_token=github_token,
selected_repository=selected_repository,
@@ -132,7 +124,7 @@ class AgentSession:
self.controller = self._create_controller(
agent,
config.security.confirmation_mode,
runtime_manager.config.security.confirmation_mode,
max_iterations,
max_budget_per_task=max_budget_per_task,
agent_to_llm_config=agent_to_llm_config,
@@ -170,7 +162,7 @@ class AgentSession:
end_state.save_to_session(self.sid, self.file_store)
await self.controller.close()
if self.runtime is not None:
self.runtime.close()
runtime_manager.destroy_runtime(self.sid)
if self.security_analyzer is not None:
await self.security_analyzer.close()
@@ -193,8 +185,6 @@ class AgentSession:
async def _create_runtime(
self,
runtime_name: str,
config: AppConfig,
agent: Agent,
github_token: str | None = None,
selected_repository: str | None = None,
@@ -202,38 +192,28 @@ class AgentSession:
"""Creates a runtime instance
Parameters:
- runtime_name: The name of the runtime associated with the session
- config:
- agent:
"""
if self.runtime is not None:
raise RuntimeError('Runtime already created')
logger.debug(f'Initializing runtime `{runtime_name}` now...')
runtime_cls = get_runtime_cls(runtime_name)
self.runtime = runtime_cls(
config=config,
event_stream=self.event_stream,
sid=self.sid,
plugins=agent.sandbox_plugins,
status_callback=self._status_callback,
headless_mode=False,
)
# FIXME: this sleep is a terrible hack.
# This is to give the websocket a second to connect, so that
# the status messages make it through to the frontend.
# We should find a better way to plumb status messages through.
await asyncio.sleep(1)
try:
await self.runtime.connect()
self.runtime = await runtime_manager.create_runtime(
event_stream=self.event_stream,
sid=self.sid,
plugins=agent.sandbox_plugins,
status_callback=self._status_callback,
headless_mode=False,
)
except AgentRuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
if self._status_callback:
self._status_callback(
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
)
return
self.runtime.clone_repo(github_token, selected_repository)

View File

@@ -1,46 +0,0 @@
import asyncio
from openhands.core.config import AppConfig
from openhands.events.stream import EventStream
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.security import SecurityAnalyzer, options
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_sync_from_async
class Conversation:
sid: str
file_store: FileStore
event_stream: EventStream
runtime: Runtime
def __init__(
self,
sid: str,
file_store: FileStore,
config: AppConfig,
):
self.sid = sid
self.config = config
self.file_store = file_store
self.event_stream = EventStream(sid, file_store)
if config.security.security_analyzer:
self.security_analyzer = options.SecurityAnalyzers.get(
config.security.security_analyzer, SecurityAnalyzer
)(self.event_stream)
runtime_cls = get_runtime_cls(self.config.runtime)
self.runtime = runtime_cls(
config=config,
event_stream=self.event_stream,
sid=self.sid,
attach_to_existing=True,
headless_mode=False,
)
async def connect(self):
await self.runtime.connect()
async def disconnect(self):
asyncio.create_task(call_sync_from_async(self.runtime.close))

View File

@@ -6,12 +6,11 @@ from dataclasses import dataclass, field
import socketio
from openhands.core.config import AppConfig
from openhands.core.exceptions import AgentRuntimeUnavailableError
from openhands.core.logger import openhands_logger as logger
from openhands.events.stream import EventStream, session_exists
from openhands.server.session.conversation import Conversation
from openhands.events.stream import EventStream
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.server.session.session import ROOM_KEY, Session
from openhands.server.shared import runtime_manager
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.shutdown_listener import should_continue
@@ -19,9 +18,6 @@ from openhands.utils.shutdown_listener import should_continue
_REDIS_POLL_TIMEOUT = 1.5
_CHECK_ALIVE_INTERVAL = 15
_CLEANUP_INTERVAL = 15
_CLEANUP_EXCEPTION_WAIT_TIME = 15
class ConversationDoesNotExistError(Exception):
pass
@@ -37,14 +33,6 @@ class SessionManager:
_last_alive_timestamps: dict[str, float] = field(default_factory=dict)
_redis_listen_task: asyncio.Task | None = None
_session_is_running_flags: dict[str, asyncio.Event] = field(default_factory=dict)
_active_conversations: dict[str, tuple[Conversation, int]] = field(
default_factory=dict
)
_detached_conversations: dict[str, tuple[Conversation, float]] = field(
default_factory=dict
)
_conversations_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
_cleanup_task: asyncio.Task | None = None
_has_remote_connections_flags: dict[str, asyncio.Event] = field(
default_factory=dict
)
@@ -53,16 +41,12 @@ class SessionManager:
redis_client = self._get_redis_client()
if redis_client:
self._redis_listen_task = asyncio.create_task(self._redis_subscribe())
self._cleanup_task = asyncio.create_task(self._cleanup_detached_conversations())
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if self._redis_listen_task:
self._redis_listen_task.cancel()
self._redis_listen_task = None
if self._cleanup_task:
self._cleanup_task.cancel()
self._cleanup_task = None
def _get_redis_client(self):
redis_client = getattr(self.sio.manager, 'redis', None)
@@ -134,6 +118,7 @@ class SessionManager:
# Session closing event - We only get this in the event of graceful shutdown,
# which can't be guaranteed - nodes can simply vanish unexpectedly!
logger.debug(f'session_closing:{sid}')
await call_sync_from_async(runtime_manager.destroy_runtime, sid)
for (
connection_id,
local_sid,
@@ -144,96 +129,15 @@ class SessionManager:
)
await self.sio.disconnect(connection_id)
async def attach_to_conversation(self, sid: str) -> Conversation | None:
start_time = time.time()
if not await session_exists(sid, self.file_store):
return None
async with self._conversations_lock:
# Check if we have an active conversation we can reuse
if sid in self._active_conversations:
conversation, count = self._active_conversations[sid]
self._active_conversations[sid] = (conversation, count + 1)
logger.info(f'Reusing active conversation {sid}')
return conversation
# Check if we have a detached conversation we can reuse
if sid in self._detached_conversations:
conversation, _ = self._detached_conversations.pop(sid)
self._active_conversations[sid] = (conversation, 1)
logger.info(f'Reusing detached conversation {sid}')
return conversation
# Create new conversation if none exists
c = Conversation(sid, file_store=self.file_store, config=self.config)
try:
await c.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Error connecting to conversation {c.sid}: {e}')
return None
end_time = time.time()
logger.info(
f'Conversation {c.sid} connected in {end_time - start_time} seconds'
)
self._active_conversations[sid] = (c, 1)
return c
async def join_conversation(self, sid: str, connection_id: str) -> EventStream:
logger.info(f'join_conversation:{sid}:{connection_id}')
await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
self.local_connection_id_to_session_id[connection_id] = sid
event_stream = await self._get_event_stream(sid)
event_stream = await self.get_event_stream(sid)
if not event_stream:
return await self.maybe_start_agent_loop(sid)
return event_stream
async def detach_from_conversation(self, conversation: Conversation):
sid = conversation.sid
async with self._conversations_lock:
if sid in self._active_conversations:
conv, count = self._active_conversations[sid]
if count > 1:
self._active_conversations[sid] = (conv, count - 1)
return
else:
self._active_conversations.pop(sid)
self._detached_conversations[sid] = (conversation, time.time())
async def _cleanup_detached_conversations(self):
while should_continue():
if self._get_redis_client():
# Debug info for HA envs
logger.info(
f'Attached conversations: {len(self._active_conversations)}'
)
logger.info(
f'Detached conversations: {len(self._detached_conversations)}'
)
logger.info(
f'Running agent loops: {len(self._local_agent_loops_by_sid)}'
)
logger.info(
f'Local connections: {len(self.local_connection_id_to_session_id)}'
)
try:
async with self._conversations_lock:
# Create a list of items to process to avoid modifying dict during iteration
items = list(self._detached_conversations.items())
for sid, (conversation, detach_time) in items:
await conversation.disconnect()
self._detached_conversations.pop(sid, None)
await asyncio.sleep(_CLEANUP_INTERVAL)
except asyncio.CancelledError:
async with self._conversations_lock:
for conversation, _ in self._detached_conversations.values():
await conversation.disconnect()
self._detached_conversations.clear()
return
except Exception:
logger.warning('error_cleaning_detached_conversations', exc_info=True)
await asyncio.sleep(_CLEANUP_EXCEPTION_WAIT_TIME)
async def _is_agent_loop_running(self, sid: str) -> bool:
if await self._is_agent_loop_running_locally(sid):
return True
@@ -310,19 +214,22 @@ class SessionManager:
if not await self._is_agent_loop_running(sid):
logger.info(f'start_agent_loop:{sid}')
session = Session(
sid=sid, file_store=self.file_store, config=self.config, sio=self.sio
sid=sid,
file_store=self.file_store,
config=self.config,
sio=self.sio,
)
self._local_agent_loops_by_sid[sid] = session
await session.initialize_agent(conversation_init_data)
event_stream = await self._get_event_stream(sid)
event_stream = await self.get_event_stream(sid)
if not event_stream:
logger.error(f'No event stream after starting agent loop: {sid}')
raise RuntimeError(f'no_event_stream:{sid}')
return event_stream
async def _get_event_stream(self, sid: str) -> EventStream | None:
logger.info(f'_get_event_stream:{sid}')
async def get_event_stream(self, sid: str) -> EventStream | None:
logger.info(f'get_event_stream:{sid}')
session = self._local_agent_loops_by_sid.get(sid)
if session:
logger.info(f'found_local_agent_loop:{sid}')
@@ -393,6 +300,7 @@ class SessionManager:
async def _cleanup_session(self, sid: str) -> bool:
# Get local connections
logger.info(f'_cleanup_session:{sid}')
await call_sync_from_async(runtime_manager.destroy_runtime, sid)
has_local_connections = next(
(True for v in self.local_connection_id_to_session_id.values() if v == sid),
False,

View File

@@ -52,7 +52,9 @@ class Session:
self.last_active_ts = int(time.time())
self.file_store = file_store
self.agent_session = AgentSession(
sid, file_store, status_callback=self.queue_status_message
sid,
file_store,
status_callback=self.queue_status_message,
)
self.agent_session.event_stream.subscribe(
EventStreamSubscriber.SERVER, self.on_event, self.sid
@@ -131,8 +133,6 @@ class Session:
try:
await self.agent_session.start(
runtime_name=self.config.runtime,
config=self.config,
agent=agent,
max_iterations=max_iterations,
max_budget_per_task=self.config.max_budget_per_task,

View File

@@ -4,8 +4,8 @@ import socketio
from dotenv import load_dotenv
from openhands.core.config import load_app_config
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.server.config.openhands_config import load_openhands_config
from openhands.server.session import SessionManager
from openhands.storage import get_file_store
load_dotenv()
@@ -27,4 +27,4 @@ sio = socketio.AsyncServer(
async_mode='asgi', cors_allowed_origins='*', client_manager=client_manager
)
session_manager = SessionManager(sio, config, file_store)
runtime_manager = RuntimeManager(config)

View File

@@ -326,7 +326,8 @@ async def test_complete_runtime():
@pytest.mark.asyncio
async def test_process_issue(mock_output_dir, mock_prompt_template):
# Mock dependencies
mock_create_runtime = MagicMock()
mock_runtime = MagicMock(connect=AsyncMock())
mock_create_runtime = AsyncMock(return_value=mock_runtime)
mock_initialize_runtime = AsyncMock()
mock_run_controller = AsyncMock()
mock_complete_runtime = AsyncMock()
@@ -408,7 +409,9 @@ async def test_process_issue(mock_output_dir, mock_prompt_template):
handler_instance.reset_mock()
# Mock return values
mock_create_runtime.return_value = MagicMock(connect=AsyncMock())
mock_runtime = MagicMock(connect=AsyncMock())
mock_create_runtime.return_value = AsyncMock()
mock_create_runtime.return_value.__aenter__.return_value = mock_runtime
if test_case['run_controller_raises']:
mock_run_controller.side_effect = test_case['run_controller_raises']
else: