mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
43 Commits
feature/ru
...
feature/ru
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86403a9446 | ||
|
|
3ecd214d69 | ||
|
|
c9a6402103 | ||
|
|
33a1dd89e7 | ||
|
|
2bdad7d910 | ||
|
|
d3f726df51 | ||
|
|
fc9bcc3511 | ||
|
|
5eecddbf13 | ||
|
|
333f9a5bdf | ||
|
|
9b61f38c4a | ||
|
|
076cb2d055 | ||
|
|
0d454d46f2 | ||
|
|
e7685f185c | ||
|
|
749da6367e | ||
|
|
4b497c8e64 | ||
|
|
42730014d5 | ||
|
|
81110671b2 | ||
|
|
25f3349e1a | ||
|
|
05dd9d7a06 | ||
|
|
42b2ccb985 | ||
|
|
c1278c9835 | ||
|
|
4bbb3572a6 | ||
|
|
b4bd940a63 | ||
|
|
cbd4f94a2d | ||
|
|
ef95ddc680 | ||
|
|
a287e5388c | ||
|
|
30f6166bf6 | ||
|
|
1f706fe2f2 | ||
|
|
4123c65317 | ||
|
|
6dfd54be9f | ||
|
|
8eef9b2563 | ||
|
|
5d5978c6cb | ||
|
|
1a17972b4e | ||
|
|
4de7a4f85d | ||
|
|
8befeca41d | ||
|
|
918139e886 | ||
|
|
6374174095 | ||
|
|
138f6932eb | ||
|
|
7181efd26d | ||
|
|
3a52360ab0 | ||
|
|
cd9eb1d85c | ||
|
|
ada657b476 | ||
|
|
b630d65626 |
@@ -35,8 +35,8 @@ from openhands.events.observation import (
|
|||||||
NullObservation,
|
NullObservation,
|
||||||
)
|
)
|
||||||
from openhands.llm.llm import LLM
|
from openhands.llm.llm import LLM
|
||||||
from openhands.runtime import get_runtime_cls
|
|
||||||
from openhands.runtime.base import Runtime
|
from openhands.runtime.base import Runtime
|
||||||
|
from openhands.runtime.runtime_manager import RuntimeManager
|
||||||
from openhands.security import SecurityAnalyzer, options
|
from openhands.security import SecurityAnalyzer, options
|
||||||
from openhands.storage import get_file_store
|
from openhands.storage import get_file_store
|
||||||
|
|
||||||
@@ -125,9 +125,8 @@ async def main():
|
|||||||
file_store = get_file_store(config.file_store, config.file_store_path)
|
file_store = get_file_store(config.file_store, config.file_store_path)
|
||||||
event_stream = EventStream(sid, file_store)
|
event_stream = EventStream(sid, file_store)
|
||||||
|
|
||||||
runtime_cls = get_runtime_cls(config.runtime)
|
runtime_manager = RuntimeManager(config)
|
||||||
runtime: Runtime = runtime_cls( # noqa: F841
|
runtime: Runtime = await runtime_manager.create_runtime(
|
||||||
config=config,
|
|
||||||
event_stream=event_stream,
|
event_stream=event_stream,
|
||||||
sid=sid,
|
sid=sid,
|
||||||
plugins=agent_cls.sandbox_plugins,
|
plugins=agent_cls.sandbox_plugins,
|
||||||
@@ -195,8 +194,6 @@ async def main():
|
|||||||
|
|
||||||
event_stream.subscribe(EventStreamSubscriber.MAIN, on_event, str(uuid4()))
|
event_stream.subscribe(EventStreamSubscriber.MAIN, on_event, str(uuid4()))
|
||||||
|
|
||||||
await runtime.connect()
|
|
||||||
|
|
||||||
asyncio.create_task(prompt_for_next_task())
|
asyncio.create_task(prompt_for_next_task())
|
||||||
|
|
||||||
await run_agent_until_done(
|
await run_agent_until_done(
|
||||||
|
|||||||
@@ -26,8 +26,8 @@ from openhands.events.event import Event
|
|||||||
from openhands.events.observation import AgentStateChangedObservation
|
from openhands.events.observation import AgentStateChangedObservation
|
||||||
from openhands.events.serialization.event import event_to_trajectory
|
from openhands.events.serialization.event import event_to_trajectory
|
||||||
from openhands.llm.llm import LLM
|
from openhands.llm.llm import LLM
|
||||||
from openhands.runtime import get_runtime_cls
|
|
||||||
from openhands.runtime.base import Runtime
|
from openhands.runtime.base import Runtime
|
||||||
|
from openhands.runtime.runtime_manager import RuntimeManager
|
||||||
from openhands.storage import get_file_store
|
from openhands.storage import get_file_store
|
||||||
|
|
||||||
|
|
||||||
@@ -51,7 +51,7 @@ def read_task_from_stdin() -> str:
|
|||||||
return sys.stdin.read()
|
return sys.stdin.read()
|
||||||
|
|
||||||
|
|
||||||
def create_runtime(
|
async def create_runtime(
|
||||||
config: AppConfig,
|
config: AppConfig,
|
||||||
sid: str | None = None,
|
sid: str | None = None,
|
||||||
headless_mode: bool = True,
|
headless_mode: bool = True,
|
||||||
@@ -77,10 +77,8 @@ def create_runtime(
|
|||||||
agent_cls = openhands.agenthub.Agent.get_cls(config.default_agent)
|
agent_cls = openhands.agenthub.Agent.get_cls(config.default_agent)
|
||||||
|
|
||||||
# runtime and tools
|
# runtime and tools
|
||||||
runtime_cls = get_runtime_cls(config.runtime)
|
runtime_manager = RuntimeManager(config)
|
||||||
logger.debug(f'Initializing runtime: {runtime_cls.__name__}')
|
runtime: Runtime = await runtime_manager.create_runtime(
|
||||||
runtime: Runtime = runtime_cls(
|
|
||||||
config=config,
|
|
||||||
event_stream=event_stream,
|
event_stream=event_stream,
|
||||||
sid=session_id,
|
sid=session_id,
|
||||||
plugins=agent_cls.sandbox_plugins,
|
plugins=agent_cls.sandbox_plugins,
|
||||||
@@ -129,8 +127,7 @@ async def run_controller(
|
|||||||
sid = sid or generate_sid(config)
|
sid = sid or generate_sid(config)
|
||||||
|
|
||||||
if runtime is None:
|
if runtime is None:
|
||||||
runtime = create_runtime(config, sid=sid, headless_mode=headless_mode)
|
runtime = await create_runtime(config, sid=sid, headless_mode=headless_mode)
|
||||||
await runtime.connect()
|
|
||||||
|
|
||||||
event_stream = runtime.event_stream
|
event_stream = runtime.event_stream
|
||||||
|
|
||||||
|
|||||||
@@ -199,8 +199,7 @@ async def process_issue(
|
|||||||
)
|
)
|
||||||
config.set_llm_config(llm_config)
|
config.set_llm_config(llm_config)
|
||||||
|
|
||||||
runtime = create_runtime(config)
|
runtime = await create_runtime(config)
|
||||||
await runtime.connect()
|
|
||||||
|
|
||||||
async def on_event(evt):
|
async def on_event(evt):
|
||||||
logger.info(evt)
|
logger.info(evt)
|
||||||
|
|||||||
21
openhands/runtime/container.py
Normal file
21
openhands/runtime/container.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
"""Container-related types and utilities."""
|
||||||
|
|
||||||
|
import docker
|
||||||
|
|
||||||
|
|
||||||
|
class ContainerInfo:
|
||||||
|
"""Information about a running container that a Runtime needs."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
container_id: str,
|
||||||
|
api_url: str,
|
||||||
|
host_port: int,
|
||||||
|
container_port: int,
|
||||||
|
container: docker.models.containers.Container,
|
||||||
|
):
|
||||||
|
self.container_id = container_id
|
||||||
|
self.api_url = api_url
|
||||||
|
self.host_port = host_port
|
||||||
|
self.container_port = container_port
|
||||||
|
self.container = container
|
||||||
@@ -1,26 +1,17 @@
|
|||||||
import atexit
|
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import threading
|
import threading
|
||||||
from functools import lru_cache
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
|
|
||||||
import docker
|
|
||||||
import requests
|
import requests
|
||||||
import tenacity
|
|
||||||
|
|
||||||
from openhands.core.config import AppConfig
|
from openhands.core.config import AppConfig
|
||||||
from openhands.core.exceptions import (
|
from openhands.core.exceptions import (
|
||||||
AgentRuntimeDisconnectedError,
|
|
||||||
AgentRuntimeError,
|
AgentRuntimeError,
|
||||||
AgentRuntimeNotFoundError,
|
|
||||||
AgentRuntimeNotReadyError,
|
|
||||||
AgentRuntimeTimeoutError,
|
AgentRuntimeTimeoutError,
|
||||||
)
|
)
|
||||||
from openhands.core.logger import DEBUG
|
|
||||||
from openhands.core.logger import openhands_logger as logger
|
|
||||||
from openhands.events import EventStream
|
from openhands.events import EventStream
|
||||||
from openhands.events.action import (
|
from openhands.events.action import (
|
||||||
ActionConfirmationStatus,
|
ActionConfirmationStatus,
|
||||||
@@ -42,24 +33,11 @@ from openhands.events.observation import (
|
|||||||
from openhands.events.serialization import event_to_dict, observation_from_dict
|
from openhands.events.serialization import event_to_dict, observation_from_dict
|
||||||
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
|
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
|
||||||
from openhands.runtime.base import Runtime
|
from openhands.runtime.base import Runtime
|
||||||
from openhands.runtime.builder import DockerRuntimeBuilder
|
from openhands.runtime.container import ContainerInfo
|
||||||
from openhands.runtime.impl.eventstream.containers import remove_all_containers
|
|
||||||
from openhands.runtime.plugins import PluginRequirement
|
from openhands.runtime.plugins import PluginRequirement
|
||||||
from openhands.runtime.utils import find_available_tcp_port
|
|
||||||
from openhands.runtime.utils.log_streamer import LogStreamer
|
from openhands.runtime.utils.log_streamer import LogStreamer
|
||||||
from openhands.runtime.utils.request import send_request
|
from openhands.runtime.utils.request import send_request
|
||||||
from openhands.runtime.utils.runtime_build import build_runtime_image
|
|
||||||
from openhands.utils.async_utils import call_sync_from_async
|
from openhands.utils.async_utils import call_sync_from_async
|
||||||
from openhands.utils.tenacity_stop import stop_if_should_exit
|
|
||||||
|
|
||||||
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
|
|
||||||
|
|
||||||
|
|
||||||
def remove_all_runtime_containers():
|
|
||||||
remove_all_containers(CONTAINER_NAME_PREFIX)
|
|
||||||
|
|
||||||
|
|
||||||
_atexit_registered = False
|
|
||||||
|
|
||||||
|
|
||||||
class EventStreamRuntime(Runtime):
|
class EventStreamRuntime(Runtime):
|
||||||
@@ -74,8 +52,6 @@ class EventStreamRuntime(Runtime):
|
|||||||
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
|
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Need to provide this method to allow inheritors to init the Runtime
|
|
||||||
# without initting the EventStreamRuntime.
|
|
||||||
def init_base_runtime(
|
def init_base_runtime(
|
||||||
self,
|
self,
|
||||||
config: AppConfig,
|
config: AppConfig,
|
||||||
@@ -108,31 +84,15 @@ class EventStreamRuntime(Runtime):
|
|||||||
status_callback: Callable | None = None,
|
status_callback: Callable | None = None,
|
||||||
attach_to_existing: bool = False,
|
attach_to_existing: bool = False,
|
||||||
headless_mode: bool = True,
|
headless_mode: bool = True,
|
||||||
|
container_info: ContainerInfo | None = None,
|
||||||
):
|
):
|
||||||
global _atexit_registered
|
|
||||||
if not _atexit_registered:
|
|
||||||
_atexit_registered = True
|
|
||||||
atexit.register(remove_all_runtime_containers)
|
|
||||||
|
|
||||||
self.config = config
|
self.config = config
|
||||||
self._host_port = 30000 # initial dummy value
|
self._vscode_url: str | None = None
|
||||||
self._container_port = 30001 # initial dummy value
|
|
||||||
self._vscode_url: str | None = None # initial dummy value
|
|
||||||
self._runtime_initialized: bool = False
|
self._runtime_initialized: bool = False
|
||||||
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
|
|
||||||
self.session = requests.Session()
|
self.session = requests.Session()
|
||||||
self.status_callback = status_callback
|
self.status_callback = status_callback
|
||||||
|
self.container_info = container_info
|
||||||
self.docker_client: docker.DockerClient = self._init_docker_client()
|
|
||||||
self.base_container_image = self.config.sandbox.base_container_image
|
|
||||||
self.runtime_container_image = self.config.sandbox.runtime_container_image
|
|
||||||
self.container_name = CONTAINER_NAME_PREFIX + sid
|
|
||||||
self.container = None
|
|
||||||
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
|
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
|
||||||
|
|
||||||
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
|
|
||||||
|
|
||||||
# Buffer for container logs
|
|
||||||
self.log_streamer: LogStreamer | None = None
|
self.log_streamer: LogStreamer | None = None
|
||||||
|
|
||||||
self.init_base_runtime(
|
self.init_base_runtime(
|
||||||
@@ -154,52 +114,22 @@ class EventStreamRuntime(Runtime):
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
|
"""Initialize the runtime with the provided container."""
|
||||||
|
if not self.container_info:
|
||||||
|
raise RuntimeError('Container info not provided')
|
||||||
|
|
||||||
self.send_status_message('STATUS$STARTING_RUNTIME')
|
self.send_status_message('STATUS$STARTING_RUNTIME')
|
||||||
try:
|
self.log_streamer = LogStreamer(self.container_info.container, self.log)
|
||||||
await call_sync_from_async(self._attach_to_container)
|
|
||||||
except docker.errors.NotFound as e:
|
|
||||||
if self.attach_to_existing:
|
|
||||||
self.log(
|
|
||||||
'error',
|
|
||||||
f'Container {self.container_name} not found.',
|
|
||||||
)
|
|
||||||
raise e
|
|
||||||
if self.runtime_container_image is None:
|
|
||||||
if self.base_container_image is None:
|
|
||||||
raise ValueError(
|
|
||||||
'Neither runtime container image nor base container image is set'
|
|
||||||
)
|
|
||||||
self.send_status_message('STATUS$STARTING_CONTAINER')
|
|
||||||
self.runtime_container_image = build_runtime_image(
|
|
||||||
self.base_container_image,
|
|
||||||
self.runtime_builder,
|
|
||||||
platform=self.config.sandbox.platform,
|
|
||||||
extra_deps=self.config.sandbox.runtime_extra_deps,
|
|
||||||
force_rebuild=self.config.sandbox.force_rebuild_runtime,
|
|
||||||
extra_build_args=self.config.sandbox.runtime_extra_build_args,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.log(
|
|
||||||
'info', f'Starting runtime with image: {self.runtime_container_image}'
|
|
||||||
)
|
|
||||||
await call_sync_from_async(self._init_container)
|
|
||||||
self.log(
|
|
||||||
'info',
|
|
||||||
f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
|
|
||||||
)
|
|
||||||
|
|
||||||
self.log_streamer = LogStreamer(self.container, self.log)
|
|
||||||
|
|
||||||
if not self.attach_to_existing:
|
if not self.attach_to_existing:
|
||||||
self.log('info', f'Waiting for client to become ready at {self.api_url}...')
|
self.log(
|
||||||
|
'info',
|
||||||
|
f'Waiting for client to become ready at {self.container_info.api_url}...',
|
||||||
|
)
|
||||||
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
|
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
|
||||||
|
|
||||||
await call_sync_from_async(self._wait_until_alive)
|
|
||||||
|
|
||||||
if not self.attach_to_existing:
|
if not self.attach_to_existing:
|
||||||
self.log('info', 'Runtime is ready.')
|
self.log('info', 'Runtime is ready.')
|
||||||
|
|
||||||
if not self.attach_to_existing:
|
|
||||||
await call_sync_from_async(self.setup_initial_env)
|
await call_sync_from_async(self.setup_initial_env)
|
||||||
|
|
||||||
self.log(
|
self.log(
|
||||||
@@ -210,207 +140,24 @@ class EventStreamRuntime(Runtime):
|
|||||||
self.send_status_message(' ')
|
self.send_status_message(' ')
|
||||||
self._runtime_initialized = True
|
self._runtime_initialized = True
|
||||||
|
|
||||||
@staticmethod
|
def close(self):
|
||||||
@lru_cache(maxsize=1)
|
"""Closes the EventStreamRuntime and associated objects."""
|
||||||
def _init_docker_client() -> docker.DockerClient:
|
|
||||||
try:
|
|
||||||
return docker.from_env()
|
|
||||||
except Exception as ex:
|
|
||||||
logger.error(
|
|
||||||
'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
|
|
||||||
)
|
|
||||||
raise ex
|
|
||||||
|
|
||||||
def _init_container(self):
|
|
||||||
self.log('debug', 'Preparing to start container...')
|
|
||||||
self.send_status_message('STATUS$PREPARING_CONTAINER')
|
|
||||||
plugin_arg = ''
|
|
||||||
if self.plugins is not None and len(self.plugins) > 0:
|
|
||||||
plugin_arg = (
|
|
||||||
f'--plugins {" ".join([plugin.name for plugin in self.plugins])} '
|
|
||||||
)
|
|
||||||
self._host_port = self._find_available_port()
|
|
||||||
self._container_port = (
|
|
||||||
self._host_port
|
|
||||||
) # in future this might differ from host port
|
|
||||||
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
|
|
||||||
|
|
||||||
use_host_network = self.config.sandbox.use_host_network
|
|
||||||
network_mode: str | None = 'host' if use_host_network else None
|
|
||||||
|
|
||||||
port_mapping: dict[str, list[dict[str, str]]] | None = (
|
|
||||||
None
|
|
||||||
if use_host_network
|
|
||||||
else {f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]}
|
|
||||||
)
|
|
||||||
|
|
||||||
if use_host_network:
|
|
||||||
self.log(
|
|
||||||
'warn',
|
|
||||||
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
|
|
||||||
)
|
|
||||||
|
|
||||||
# Combine environment variables
|
|
||||||
environment = {
|
|
||||||
'port': str(self._container_port),
|
|
||||||
'PYTHONUNBUFFERED': 1,
|
|
||||||
}
|
|
||||||
if self.config.debug or DEBUG:
|
|
||||||
environment['DEBUG'] = 'true'
|
|
||||||
|
|
||||||
if self.vscode_enabled:
|
|
||||||
# vscode is on port +1 from container port
|
|
||||||
if isinstance(port_mapping, dict):
|
|
||||||
port_mapping[f'{self._container_port + 1}/tcp'] = [
|
|
||||||
{'HostPort': str(self._host_port + 1)}
|
|
||||||
]
|
|
||||||
|
|
||||||
self.log('debug', f'Workspace Base: {self.config.workspace_base}')
|
|
||||||
if (
|
|
||||||
self.config.workspace_mount_path is not None
|
|
||||||
and self.config.workspace_mount_path_in_sandbox is not None
|
|
||||||
):
|
|
||||||
# e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
|
|
||||||
volumes = {
|
|
||||||
self.config.workspace_mount_path: {
|
|
||||||
'bind': self.config.workspace_mount_path_in_sandbox,
|
|
||||||
'mode': 'rw',
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
|
|
||||||
else:
|
|
||||||
logger.debug(
|
|
||||||
'Mount dir is not set, will not mount the workspace directory to the container'
|
|
||||||
)
|
|
||||||
volumes = None
|
|
||||||
self.log(
|
|
||||||
'debug',
|
|
||||||
f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.config.sandbox.browsergym_eval_env is not None:
|
|
||||||
browsergym_arg = (
|
|
||||||
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
browsergym_arg = ''
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.container = self.docker_client.containers.run(
|
|
||||||
self.runtime_container_image,
|
|
||||||
command=(
|
|
||||||
f'/openhands/micromamba/bin/micromamba run -n openhands '
|
|
||||||
f'poetry run '
|
|
||||||
f'python -u -m openhands.runtime.action_execution_server {self._container_port} '
|
|
||||||
f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
|
|
||||||
f'{plugin_arg}'
|
|
||||||
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
|
|
||||||
f'--user-id {self.config.sandbox.user_id} '
|
|
||||||
f'{browsergym_arg}'
|
|
||||||
),
|
|
||||||
network_mode=network_mode,
|
|
||||||
ports=port_mapping,
|
|
||||||
working_dir='/openhands/code/', # do not change this!
|
|
||||||
name=self.container_name,
|
|
||||||
detach=True,
|
|
||||||
environment=environment,
|
|
||||||
volumes=volumes,
|
|
||||||
)
|
|
||||||
self.log('debug', f'Container started. Server url: {self.api_url}')
|
|
||||||
self.send_status_message('STATUS$CONTAINER_STARTED')
|
|
||||||
except docker.errors.APIError as e:
|
|
||||||
if '409' in str(e):
|
|
||||||
self.log(
|
|
||||||
'warning',
|
|
||||||
f'Container {self.container_name} already exists. Removing...',
|
|
||||||
)
|
|
||||||
remove_all_containers(self.container_name)
|
|
||||||
return self._init_container()
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.log(
|
|
||||||
'error',
|
|
||||||
f'Error: Instance {self.container_name} FAILED to start container!\n',
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
self.log(
|
|
||||||
'error',
|
|
||||||
f'Error: Instance {self.container_name} FAILED to start container!\n',
|
|
||||||
)
|
|
||||||
self.log('error', str(e))
|
|
||||||
self.close()
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def _attach_to_container(self):
|
|
||||||
self._container_port = 0
|
|
||||||
self.container = self.docker_client.containers.get(self.container_name)
|
|
||||||
for port in self.container.attrs['NetworkSettings']['Ports']: # type: ignore
|
|
||||||
self._container_port = int(port.split('/')[0])
|
|
||||||
break
|
|
||||||
self._host_port = self._container_port
|
|
||||||
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
|
|
||||||
self.log(
|
|
||||||
'debug',
|
|
||||||
f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
|
|
||||||
)
|
|
||||||
|
|
||||||
@tenacity.retry(
|
|
||||||
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
|
|
||||||
retry=tenacity.retry_if_exception_type(
|
|
||||||
(ConnectionError, requests.exceptions.ConnectionError)
|
|
||||||
),
|
|
||||||
reraise=True,
|
|
||||||
wait=tenacity.wait_fixed(2),
|
|
||||||
)
|
|
||||||
def _wait_until_alive(self):
|
|
||||||
try:
|
|
||||||
container = self.docker_client.containers.get(self.container_name)
|
|
||||||
if container.status == 'exited':
|
|
||||||
raise AgentRuntimeDisconnectedError(
|
|
||||||
f'Container {self.container_name} has exited.'
|
|
||||||
)
|
|
||||||
except docker.errors.NotFound:
|
|
||||||
raise AgentRuntimeNotFoundError(
|
|
||||||
f'Container {self.container_name} not found.'
|
|
||||||
)
|
|
||||||
|
|
||||||
if not self.log_streamer:
|
|
||||||
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
|
|
||||||
|
|
||||||
with send_request(
|
|
||||||
self.session,
|
|
||||||
'GET',
|
|
||||||
f'{self.api_url}/alive',
|
|
||||||
timeout=5,
|
|
||||||
):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def close(self, rm_all_containers: bool | None = None):
|
|
||||||
"""Closes the EventStreamRuntime and associated objects
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
|
|
||||||
"""
|
|
||||||
if self.log_streamer:
|
if self.log_streamer:
|
||||||
self.log_streamer.close()
|
self.log_streamer.close()
|
||||||
|
|
||||||
if self.session:
|
if self.session:
|
||||||
self.session.close()
|
self.session.close()
|
||||||
|
|
||||||
if rm_all_containers is None:
|
|
||||||
rm_all_containers = self.config.sandbox.rm_all_containers
|
|
||||||
|
|
||||||
if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
|
|
||||||
return
|
|
||||||
close_prefix = (
|
|
||||||
CONTAINER_NAME_PREFIX if rm_all_containers else self.container_name
|
|
||||||
)
|
|
||||||
remove_all_containers(close_prefix)
|
|
||||||
|
|
||||||
def run_action(self, action: Action) -> Observation:
|
def run_action(self, action: Action) -> Observation:
|
||||||
if isinstance(action, FileEditAction):
|
if isinstance(action, FileEditAction):
|
||||||
return self.edit(action)
|
return self.edit(action)
|
||||||
|
|
||||||
|
if not self.container_info:
|
||||||
|
return ErrorObservation(
|
||||||
|
'Runtime container is not initialized.',
|
||||||
|
error_id='AGENT_ERROR$RUNTIME_NOT_READY',
|
||||||
|
)
|
||||||
|
|
||||||
# set timeout to default if not set
|
# set timeout to default if not set
|
||||||
if action.timeout is None:
|
if action.timeout is None:
|
||||||
action.timeout = self.config.sandbox.timeout
|
action.timeout = self.config.sandbox.timeout
|
||||||
@@ -446,7 +193,7 @@ class EventStreamRuntime(Runtime):
|
|||||||
with send_request(
|
with send_request(
|
||||||
self.session,
|
self.session,
|
||||||
'POST',
|
'POST',
|
||||||
f'{self.api_url}/execute_action',
|
f'{self.container_info.api_url}/execute_action',
|
||||||
json={'action': event_to_dict(action)},
|
json={'action': event_to_dict(action)},
|
||||||
# wait a few more seconds to get the timeout error from client side
|
# wait a few more seconds to get the timeout error from client side
|
||||||
timeout=action.timeout + 5,
|
timeout=action.timeout + 5,
|
||||||
@@ -479,16 +226,15 @@ class EventStreamRuntime(Runtime):
|
|||||||
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
|
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
|
||||||
return self.run_action(action)
|
return self.run_action(action)
|
||||||
|
|
||||||
# ====================================================================
|
|
||||||
# Implement these methods (for file operations) in the subclass
|
|
||||||
# ====================================================================
|
|
||||||
|
|
||||||
def copy_to(
|
def copy_to(
|
||||||
self, host_src: str, sandbox_dest: str, recursive: bool = False
|
self, host_src: str, sandbox_dest: str, recursive: bool = False
|
||||||
) -> None:
|
) -> None:
|
||||||
if not os.path.exists(host_src):
|
if not os.path.exists(host_src):
|
||||||
raise FileNotFoundError(f'Source file {host_src} does not exist')
|
raise FileNotFoundError(f'Source file {host_src} does not exist')
|
||||||
|
|
||||||
|
if not self.container_info:
|
||||||
|
raise RuntimeError('Runtime container is not initialized.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if recursive:
|
if recursive:
|
||||||
# For recursive copy, create a zip file
|
# For recursive copy, create a zip file
|
||||||
@@ -516,7 +262,7 @@ class EventStreamRuntime(Runtime):
|
|||||||
with send_request(
|
with send_request(
|
||||||
self.session,
|
self.session,
|
||||||
'POST',
|
'POST',
|
||||||
f'{self.api_url}/upload_file',
|
f'{self.container_info.api_url}/upload_file',
|
||||||
files=upload_data,
|
files=upload_data,
|
||||||
params=params,
|
params=params,
|
||||||
timeout=300,
|
timeout=300,
|
||||||
@@ -539,6 +285,8 @@ class EventStreamRuntime(Runtime):
|
|||||||
|
|
||||||
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
|
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
|
||||||
"""
|
"""
|
||||||
|
if not self.container_info:
|
||||||
|
raise RuntimeError('Runtime container is not initialized.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = {}
|
data = {}
|
||||||
@@ -548,7 +296,7 @@ class EventStreamRuntime(Runtime):
|
|||||||
with send_request(
|
with send_request(
|
||||||
self.session,
|
self.session,
|
||||||
'POST',
|
'POST',
|
||||||
f'{self.api_url}/list_files',
|
f'{self.container_info.api_url}/list_files',
|
||||||
json=data,
|
json=data,
|
||||||
timeout=10,
|
timeout=10,
|
||||||
) as response:
|
) as response:
|
||||||
@@ -560,13 +308,15 @@ class EventStreamRuntime(Runtime):
|
|||||||
|
|
||||||
def copy_from(self, path: str) -> Path:
|
def copy_from(self, path: str) -> Path:
|
||||||
"""Zip all files in the sandbox and return as a stream of bytes."""
|
"""Zip all files in the sandbox and return as a stream of bytes."""
|
||||||
|
if not self.container_info:
|
||||||
|
raise RuntimeError('Runtime container is not initialized.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
params = {'path': path}
|
params = {'path': path}
|
||||||
with send_request(
|
with send_request(
|
||||||
self.session,
|
self.session,
|
||||||
'GET',
|
'GET',
|
||||||
f'{self.api_url}/download_files',
|
f'{self.container_info.api_url}/download_files',
|
||||||
params=params,
|
params=params,
|
||||||
stream=True,
|
stream=True,
|
||||||
timeout=30,
|
timeout=30,
|
||||||
@@ -579,26 +329,9 @@ class EventStreamRuntime(Runtime):
|
|||||||
except requests.Timeout:
|
except requests.Timeout:
|
||||||
raise TimeoutError('Copy operation timed out')
|
raise TimeoutError('Copy operation timed out')
|
||||||
|
|
||||||
def _is_port_in_use_docker(self, port):
|
|
||||||
containers = self.docker_client.containers.list()
|
|
||||||
for container in containers:
|
|
||||||
container_ports = container.ports
|
|
||||||
if str(port) in str(container_ports):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _find_available_port(self, max_attempts=5):
|
|
||||||
port = 39999
|
|
||||||
for _ in range(max_attempts):
|
|
||||||
port = find_available_tcp_port(30000, 39999)
|
|
||||||
if not self._is_port_in_use_docker(port):
|
|
||||||
return port
|
|
||||||
# If no port is found after max_attempts, return the last tried port
|
|
||||||
return port
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def vscode_url(self) -> str | None:
|
def vscode_url(self) -> str | None:
|
||||||
if self.vscode_enabled and self._runtime_initialized:
|
if self.vscode_enabled and self._runtime_initialized and self.container_info:
|
||||||
if (
|
if (
|
||||||
hasattr(self, '_vscode_url') and self._vscode_url is not None
|
hasattr(self, '_vscode_url') and self._vscode_url is not None
|
||||||
): # cached value
|
): # cached value
|
||||||
@@ -607,14 +340,14 @@ class EventStreamRuntime(Runtime):
|
|||||||
with send_request(
|
with send_request(
|
||||||
self.session,
|
self.session,
|
||||||
'GET',
|
'GET',
|
||||||
f'{self.api_url}/vscode/connection_token',
|
f'{self.container_info.api_url}/vscode/connection_token',
|
||||||
timeout=10,
|
timeout=10,
|
||||||
) as response:
|
) as response:
|
||||||
response_json = response.json()
|
response_json = response.json()
|
||||||
assert isinstance(response_json, dict)
|
assert isinstance(response_json, dict)
|
||||||
if response_json['token'] is None:
|
if response_json['token'] is None:
|
||||||
return None
|
return None
|
||||||
self._vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
|
self._vscode_url = f'http://localhost:{self.container_info.host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
|
||||||
self.log(
|
self.log(
|
||||||
'debug',
|
'debug',
|
||||||
f'VSCode URL: {self._vscode_url}',
|
f'VSCode URL: {self._vscode_url}',
|
||||||
|
|||||||
456
openhands/runtime/impl/eventstream/runtime_manager.py
Normal file
456
openhands/runtime/impl/eventstream/runtime_manager.py
Normal file
@@ -0,0 +1,456 @@
|
|||||||
|
import atexit
|
||||||
|
import functools
|
||||||
|
from typing import Dict, List, Optional
|
||||||
|
|
||||||
|
import docker
|
||||||
|
import requests
|
||||||
|
import tenacity
|
||||||
|
|
||||||
|
from openhands.core.config import AppConfig
|
||||||
|
from openhands.core.exceptions import (
|
||||||
|
AgentRuntimeDisconnectedError,
|
||||||
|
AgentRuntimeNotFoundError,
|
||||||
|
AgentRuntimeNotReadyError,
|
||||||
|
AgentRuntimeUnavailableError,
|
||||||
|
)
|
||||||
|
from openhands.core.logger import DEBUG
|
||||||
|
from openhands.core.logger import openhands_logger as logger
|
||||||
|
from openhands.events import EventStream
|
||||||
|
from openhands.runtime.base import Runtime
|
||||||
|
from openhands.runtime.builder import DockerRuntimeBuilder
|
||||||
|
from openhands.runtime.container import ContainerInfo
|
||||||
|
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
|
||||||
|
from openhands.runtime.plugins import PluginRequirement, VSCodeRequirement
|
||||||
|
from openhands.runtime.runtime_manager import RuntimeManager
|
||||||
|
from openhands.runtime.utils import find_available_tcp_port
|
||||||
|
from openhands.runtime.utils.log_streamer import LogStreamer
|
||||||
|
from openhands.runtime.utils.request import send_request
|
||||||
|
from openhands.runtime.utils.runtime_build import build_runtime_image
|
||||||
|
from openhands.utils.tenacity_stop import stop_if_should_exit
|
||||||
|
|
||||||
|
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
|
||||||
|
|
||||||
|
_atexit_registered = False
|
||||||
|
|
||||||
|
|
||||||
|
class EventStreamRuntimeManager(RuntimeManager):
|
||||||
|
"""Manages Docker container lifecycle for EventStreamRuntime instances."""
|
||||||
|
|
||||||
|
def __init__(self, config: AppConfig):
|
||||||
|
super().__init__(config)
|
||||||
|
global _atexit_registered
|
||||||
|
if not _atexit_registered:
|
||||||
|
_atexit_registered = True
|
||||||
|
atexit.register(self._cleanup_all_containers)
|
||||||
|
|
||||||
|
self._containers: Dict[str, ContainerInfo] = {}
|
||||||
|
self._docker_client = self._init_docker_client()
|
||||||
|
self._runtime_builder: DockerRuntimeBuilder = DockerRuntimeBuilder(
|
||||||
|
self._docker_client
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@functools.lru_cache(maxsize=1)
|
||||||
|
def _init_docker_client() -> docker.DockerClient:
|
||||||
|
try:
|
||||||
|
return docker.from_env()
|
||||||
|
except Exception as ex:
|
||||||
|
logger.error(
|
||||||
|
'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
|
||||||
|
)
|
||||||
|
raise ex
|
||||||
|
|
||||||
|
async def create_runtime(
|
||||||
|
self,
|
||||||
|
event_stream: EventStream,
|
||||||
|
sid: str,
|
||||||
|
plugins: Optional[List[PluginRequirement]] = None,
|
||||||
|
env_vars: Optional[Dict[str, str]] = None,
|
||||||
|
status_callback=None,
|
||||||
|
attach_to_existing: bool = False,
|
||||||
|
headless_mode: bool = False,
|
||||||
|
) -> Runtime:
|
||||||
|
"""Create a new EventStreamRuntime with an initialized container.
|
||||||
|
|
||||||
|
This overrides the base create_runtime to handle container initialization
|
||||||
|
before creating the runtime.
|
||||||
|
"""
|
||||||
|
if sid in self._runtimes:
|
||||||
|
raise RuntimeError(f'Runtime with ID {sid} already exists')
|
||||||
|
|
||||||
|
# First initialize or attach to the container
|
||||||
|
try:
|
||||||
|
if attach_to_existing:
|
||||||
|
container_info = self.attach_to_container(sid)
|
||||||
|
else:
|
||||||
|
runtime_container_image = self.config.sandbox.runtime_container_image
|
||||||
|
if runtime_container_image is None:
|
||||||
|
if self.config.sandbox.base_container_image is None:
|
||||||
|
raise ValueError(
|
||||||
|
'Neither runtime container image nor base container image is set'
|
||||||
|
)
|
||||||
|
if status_callback:
|
||||||
|
status_callback('info', 'STATUS$STARTING_CONTAINER')
|
||||||
|
runtime_container_image = build_runtime_image(
|
||||||
|
self.config.sandbox.base_container_image,
|
||||||
|
self._runtime_builder,
|
||||||
|
platform=self.config.sandbox.platform,
|
||||||
|
extra_deps=self.config.sandbox.runtime_extra_deps,
|
||||||
|
force_rebuild=self.config.sandbox.force_rebuild_runtime,
|
||||||
|
extra_build_args=self.config.sandbox.runtime_extra_build_args,
|
||||||
|
)
|
||||||
|
|
||||||
|
container_info = self.initialize_container(
|
||||||
|
runtime_container_image,
|
||||||
|
sid,
|
||||||
|
plugins,
|
||||||
|
env_vars,
|
||||||
|
status_callback,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create the runtime with the initialized container
|
||||||
|
runtime = EventStreamRuntime(
|
||||||
|
config=self.config,
|
||||||
|
event_stream=event_stream,
|
||||||
|
sid=sid,
|
||||||
|
plugins=plugins,
|
||||||
|
env_vars=env_vars,
|
||||||
|
status_callback=status_callback,
|
||||||
|
attach_to_existing=attach_to_existing,
|
||||||
|
headless_mode=headless_mode,
|
||||||
|
container_info=container_info,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Initialize the runtime
|
||||||
|
try:
|
||||||
|
await runtime.connect()
|
||||||
|
except AgentRuntimeUnavailableError as e:
|
||||||
|
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
|
||||||
|
if status_callback:
|
||||||
|
status_callback(
|
||||||
|
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
|
||||||
|
)
|
||||||
|
self._cleanup_container(sid)
|
||||||
|
raise
|
||||||
|
|
||||||
|
self._runtimes[sid] = runtime
|
||||||
|
logger.info(f'Created runtime with ID: {sid}')
|
||||||
|
return runtime
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'Failed to create runtime: {str(e)}')
|
||||||
|
self._cleanup_container(sid)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _is_port_in_use_docker(self, port):
|
||||||
|
containers = self._docker_client.containers.list()
|
||||||
|
for container in containers:
|
||||||
|
container_ports = container.ports
|
||||||
|
if str(port) in str(container_ports):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _find_available_port(self, max_attempts=5):
|
||||||
|
port = 39999
|
||||||
|
for _ in range(max_attempts):
|
||||||
|
port = find_available_tcp_port(30000, 39999)
|
||||||
|
if not self._is_port_in_use_docker(port):
|
||||||
|
return port
|
||||||
|
return port
|
||||||
|
|
||||||
|
def initialize_container(
|
||||||
|
self,
|
||||||
|
runtime_container_image: str,
|
||||||
|
sid: str,
|
||||||
|
plugins: Optional[list[PluginRequirement]] = None,
|
||||||
|
env_vars: Optional[Dict[str, str]] = None,
|
||||||
|
status_callback=None,
|
||||||
|
) -> ContainerInfo:
|
||||||
|
"""Initialize a new container for a runtime.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
runtime_container_image: The Docker image to use
|
||||||
|
sid: The session ID that will be used to generate the container name
|
||||||
|
plugins: Optional list of plugins to enable
|
||||||
|
env_vars: Optional environment variables to set
|
||||||
|
status_callback: Optional callback for status updates
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ContainerInfo object with connection details
|
||||||
|
"""
|
||||||
|
logger.debug('Preparing to start container...')
|
||||||
|
if status_callback:
|
||||||
|
status_callback('info', 'STATUS$PREPARING_CONTAINER')
|
||||||
|
|
||||||
|
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
|
||||||
|
if container_name in self._containers:
|
||||||
|
raise RuntimeError(f'Container {container_name} already exists')
|
||||||
|
|
||||||
|
# Find an available port
|
||||||
|
container_port = self._find_available_port()
|
||||||
|
host_port = container_port # In future this might differ
|
||||||
|
|
||||||
|
plugin_arg = ''
|
||||||
|
if plugins:
|
||||||
|
plugin_arg = f'--plugins {" ".join([plugin.name for plugin in plugins])} '
|
||||||
|
|
||||||
|
use_host_network = self.config.sandbox.use_host_network
|
||||||
|
network_mode: str | None = 'host' if use_host_network else None
|
||||||
|
|
||||||
|
port_mapping: dict[str, list[dict[str, str]]] | None = (
|
||||||
|
None
|
||||||
|
if use_host_network
|
||||||
|
else {f'{container_port}/tcp': [{'HostPort': str(host_port)}]}
|
||||||
|
)
|
||||||
|
|
||||||
|
if use_host_network:
|
||||||
|
logger.warn(
|
||||||
|
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
|
||||||
|
)
|
||||||
|
|
||||||
|
environment = {
|
||||||
|
'port': str(container_port),
|
||||||
|
'PYTHONUNBUFFERED': '1',
|
||||||
|
**(env_vars or {}),
|
||||||
|
}
|
||||||
|
if self.config.debug or DEBUG:
|
||||||
|
environment['DEBUG'] = 'true'
|
||||||
|
|
||||||
|
if any(isinstance(plugin, VSCodeRequirement) for plugin in (plugins or [])):
|
||||||
|
if isinstance(port_mapping, dict):
|
||||||
|
port_mapping[f'{container_port + 1}/tcp'] = [
|
||||||
|
{'HostPort': str(host_port + 1)}
|
||||||
|
]
|
||||||
|
|
||||||
|
logger.debug(f'Workspace Base: {self.config.workspace_base}')
|
||||||
|
if (
|
||||||
|
self.config.workspace_mount_path is not None
|
||||||
|
and self.config.workspace_mount_path_in_sandbox is not None
|
||||||
|
):
|
||||||
|
volumes = {
|
||||||
|
self.config.workspace_mount_path: {
|
||||||
|
'bind': self.config.workspace_mount_path_in_sandbox,
|
||||||
|
'mode': 'rw',
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
'Mount dir is not set, will not mount the workspace directory to the container'
|
||||||
|
)
|
||||||
|
volumes = {}
|
||||||
|
logger.debug(
|
||||||
|
f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
|
||||||
|
)
|
||||||
|
|
||||||
|
browsergym_arg = ''
|
||||||
|
if self.config.sandbox.browsergym_eval_env is not None:
|
||||||
|
browsergym_arg = (
|
||||||
|
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
container = self._docker_client.containers.run(
|
||||||
|
runtime_container_image,
|
||||||
|
command=(
|
||||||
|
f'/openhands/micromamba/bin/micromamba run -n openhands '
|
||||||
|
f'poetry run '
|
||||||
|
f'python -u -m openhands.runtime.action_execution_server {container_port} '
|
||||||
|
f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
|
||||||
|
f'{plugin_arg}'
|
||||||
|
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
|
||||||
|
f'--user-id {self.config.sandbox.user_id} '
|
||||||
|
f'{browsergym_arg}'
|
||||||
|
),
|
||||||
|
network_mode=network_mode,
|
||||||
|
ports=port_mapping,
|
||||||
|
working_dir='/openhands/code/',
|
||||||
|
name=container_name,
|
||||||
|
detach=True,
|
||||||
|
environment=environment,
|
||||||
|
volumes=volumes,
|
||||||
|
)
|
||||||
|
|
||||||
|
api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'
|
||||||
|
logger.debug(f'Container started. Server url: {api_url}')
|
||||||
|
|
||||||
|
if status_callback:
|
||||||
|
status_callback('info', 'STATUS$CONTAINER_STARTED')
|
||||||
|
|
||||||
|
container_info = ContainerInfo(
|
||||||
|
container_id=container.id,
|
||||||
|
api_url=api_url,
|
||||||
|
host_port=host_port,
|
||||||
|
container_port=container_port,
|
||||||
|
container=container,
|
||||||
|
)
|
||||||
|
self._containers[container_name] = container_info
|
||||||
|
return container_info
|
||||||
|
|
||||||
|
except docker.errors.APIError as e:
|
||||||
|
if '409' in str(e):
|
||||||
|
logger.warning(
|
||||||
|
f'Container {container_name} already exists. Removing...',
|
||||||
|
)
|
||||||
|
self._cleanup_container(container_name)
|
||||||
|
return self.initialize_container(
|
||||||
|
runtime_container_image,
|
||||||
|
sid,
|
||||||
|
plugins,
|
||||||
|
env_vars,
|
||||||
|
status_callback,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
f'Error: Instance {container_name} FAILED to start container!\n',
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
f'Error: Instance {container_name} FAILED to start container!\n',
|
||||||
|
)
|
||||||
|
logger.error(str(e))
|
||||||
|
raise
|
||||||
|
|
||||||
|
def attach_to_container(self, sid: str) -> ContainerInfo:
|
||||||
|
"""Attach to an existing container.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sid: The session ID used to generate the container name
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ContainerInfo object with connection details
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AgentRuntimeNotFoundError: If the container doesn't exist
|
||||||
|
"""
|
||||||
|
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
|
||||||
|
|
||||||
|
# Check if we already have the container info
|
||||||
|
if container_name in self._containers:
|
||||||
|
return self._containers[container_name]
|
||||||
|
|
||||||
|
try:
|
||||||
|
container = self._docker_client.containers.get(container_name)
|
||||||
|
container_port = 0
|
||||||
|
for port in container.attrs['NetworkSettings']['Ports']:
|
||||||
|
container_port = int(port.split('/')[0])
|
||||||
|
break
|
||||||
|
|
||||||
|
host_port = container_port # In future this might differ
|
||||||
|
api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'
|
||||||
|
|
||||||
|
container_info = ContainerInfo(
|
||||||
|
container_id=container.id,
|
||||||
|
api_url=api_url,
|
||||||
|
host_port=host_port,
|
||||||
|
container_port=container_port,
|
||||||
|
container=container,
|
||||||
|
)
|
||||||
|
self._containers[container_name] = container_info
|
||||||
|
return container_info
|
||||||
|
|
||||||
|
except docker.errors.NotFound:
|
||||||
|
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
|
||||||
|
retry=tenacity.retry_if_exception_type(
|
||||||
|
(ConnectionError, requests.exceptions.ConnectionError)
|
||||||
|
),
|
||||||
|
reraise=True,
|
||||||
|
wait=tenacity.wait_fixed(2),
|
||||||
|
)
|
||||||
|
def wait_until_alive(
|
||||||
|
self,
|
||||||
|
sid: str,
|
||||||
|
log_streamer: Optional[LogStreamer] = None,
|
||||||
|
):
|
||||||
|
"""Wait until a container is ready to accept connections.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sid: The session ID used to generate the container name
|
||||||
|
log_streamer: Optional log streamer that must be ready
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AgentRuntimeNotFoundError: If the container doesn't exist
|
||||||
|
AgentRuntimeDisconnectedError: If the container has exited
|
||||||
|
AgentRuntimeNotReadyError: If the log streamer isn't ready
|
||||||
|
"""
|
||||||
|
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
|
||||||
|
container_info = self._containers.get(container_name)
|
||||||
|
if not container_info:
|
||||||
|
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
|
||||||
|
|
||||||
|
try:
|
||||||
|
if container_info.container.status == 'exited':
|
||||||
|
raise AgentRuntimeDisconnectedError(
|
||||||
|
f'Container {container_name} has exited.'
|
||||||
|
)
|
||||||
|
except docker.errors.NotFound:
|
||||||
|
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
|
||||||
|
|
||||||
|
if not log_streamer:
|
||||||
|
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
|
||||||
|
|
||||||
|
with send_request(
|
||||||
|
requests.Session(),
|
||||||
|
'GET',
|
||||||
|
f'{container_info.api_url}/alive',
|
||||||
|
timeout=5,
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _cleanup_container(self, sid: str, remove_all: bool = False) -> None:
|
||||||
|
"""Clean up a container and its resources.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sid: The session ID used to generate the container name
|
||||||
|
remove_all: If True, remove all containers with the same prefix
|
||||||
|
"""
|
||||||
|
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
|
||||||
|
if remove_all:
|
||||||
|
self._cleanup_all_containers()
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
container = self._docker_client.containers.get(container_name)
|
||||||
|
container.remove(force=True)
|
||||||
|
if container_name in self._containers:
|
||||||
|
del self._containers[container_name]
|
||||||
|
except docker.errors.NotFound:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _cleanup_all_containers(self):
|
||||||
|
"""Clean up all containers managed by this RuntimeManager."""
|
||||||
|
containers = self._docker_client.containers.list(all=True)
|
||||||
|
for container in containers:
|
||||||
|
if container.name.startswith(CONTAINER_NAME_PREFIX):
|
||||||
|
try:
|
||||||
|
container.remove(force=True)
|
||||||
|
except docker.errors.NotFound:
|
||||||
|
pass
|
||||||
|
self._containers.clear()
|
||||||
|
|
||||||
|
def destroy_runtime(self, runtime_id: str) -> bool:
|
||||||
|
"""Destroy a runtime and its container.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
runtime_id: The runtime ID to destroy
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the runtime was found and destroyed, False otherwise
|
||||||
|
"""
|
||||||
|
runtime = self._runtimes.get(runtime_id)
|
||||||
|
if runtime:
|
||||||
|
runtime.close()
|
||||||
|
self._cleanup_container(runtime_id)
|
||||||
|
del self._runtimes[runtime_id]
|
||||||
|
logger.info(f'Destroyed runtime with ID: {runtime_id}')
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def destroy_all_runtimes(self):
|
||||||
|
"""Destroy all runtimes and their containers."""
|
||||||
|
for runtime_id in list(self._runtimes.keys()):
|
||||||
|
self.destroy_runtime(runtime_id)
|
||||||
|
self._cleanup_all_containers()
|
||||||
@@ -16,6 +16,7 @@ from openhands.runtime.impl.eventstream.eventstream_runtime import (
|
|||||||
)
|
)
|
||||||
from openhands.runtime.plugins import PluginRequirement
|
from openhands.runtime.plugins import PluginRequirement
|
||||||
from openhands.runtime.utils.command import get_remote_startup_command
|
from openhands.runtime.utils.command import get_remote_startup_command
|
||||||
|
from openhands.runtime.utils.request import send_request
|
||||||
from openhands.runtime.utils.runtime_build import (
|
from openhands.runtime.utils.runtime_build import (
|
||||||
BuildFromImageType,
|
BuildFromImageType,
|
||||||
prep_build_folder,
|
prep_build_folder,
|
||||||
@@ -181,7 +182,7 @@ class ModalRuntime(EventStreamRuntime):
|
|||||||
self.log('debug', 'Waiting for client to become ready...')
|
self.log('debug', 'Waiting for client to become ready...')
|
||||||
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
|
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
|
||||||
|
|
||||||
self._wait_until_alive()
|
await call_sync_from_async(self._wait_until_alive)
|
||||||
self.setup_initial_env()
|
self.setup_initial_env()
|
||||||
|
|
||||||
if not self.attach_to_existing:
|
if not self.attach_to_existing:
|
||||||
@@ -290,6 +291,26 @@ echo 'export INPUTRC=/etc/inputrc' >> /etc/bash.bashrc
|
|||||||
self.close()
|
self.close()
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
stop=tenacity.stop_after_delay(120),
|
||||||
|
retry=tenacity.retry_if_exception_type(
|
||||||
|
(ConnectionError, requests.exceptions.ConnectionError)
|
||||||
|
),
|
||||||
|
reraise=True,
|
||||||
|
wait=tenacity.wait_fixed(2),
|
||||||
|
)
|
||||||
|
def _wait_until_alive(self):
|
||||||
|
if not self.sandbox:
|
||||||
|
raise RuntimeError('Sandbox not initialized')
|
||||||
|
|
||||||
|
with send_request(
|
||||||
|
self.session,
|
||||||
|
'GET',
|
||||||
|
f'{self.api_url}/alive',
|
||||||
|
timeout=5,
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""Closes the ModalRuntime and associated objects."""
|
"""Closes the ModalRuntime and associated objects."""
|
||||||
if self.log_streamer:
|
if self.log_streamer:
|
||||||
|
|||||||
77
openhands/runtime/runtime_manager.py
Normal file
77
openhands/runtime/runtime_manager.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
from typing import Dict, List, Optional
|
||||||
|
|
||||||
|
from openhands.core.config import AppConfig
|
||||||
|
from openhands.core.exceptions import AgentRuntimeUnavailableError
|
||||||
|
from openhands.core.logger import openhands_logger as logger
|
||||||
|
from openhands.events import EventStream
|
||||||
|
from openhands.runtime import get_runtime_cls
|
||||||
|
from openhands.runtime.base import Runtime
|
||||||
|
from openhands.runtime.plugins import PluginRequirement
|
||||||
|
from openhands.runtime.utils.singleton import Singleton
|
||||||
|
|
||||||
|
|
||||||
|
class RuntimeManager(metaclass=Singleton):
|
||||||
|
def __init__(self, config: AppConfig):
|
||||||
|
self._runtimes: Dict[str, Runtime] = {}
|
||||||
|
self._config = config
|
||||||
|
|
||||||
|
@property
|
||||||
|
def config(self) -> AppConfig:
|
||||||
|
return self._config
|
||||||
|
|
||||||
|
async def create_runtime(
|
||||||
|
self,
|
||||||
|
event_stream: EventStream,
|
||||||
|
sid: str,
|
||||||
|
plugins: Optional[List[PluginRequirement]] = None,
|
||||||
|
env_vars: Optional[Dict[str, str]] = None,
|
||||||
|
status_callback=None,
|
||||||
|
attach_to_existing: bool = False,
|
||||||
|
headless_mode: bool = False,
|
||||||
|
) -> Runtime:
|
||||||
|
if sid in self._runtimes:
|
||||||
|
raise RuntimeError(f'Runtime with ID {sid} already exists')
|
||||||
|
|
||||||
|
runtime_class = get_runtime_cls(self.config.runtime)
|
||||||
|
logger.debug(f'Initializing runtime: {runtime_class.__name__}')
|
||||||
|
runtime = runtime_class(
|
||||||
|
config=self.config,
|
||||||
|
event_stream=event_stream,
|
||||||
|
sid=sid,
|
||||||
|
plugins=plugins,
|
||||||
|
env_vars=env_vars,
|
||||||
|
status_callback=status_callback,
|
||||||
|
attach_to_existing=attach_to_existing,
|
||||||
|
headless_mode=headless_mode,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await runtime.connect()
|
||||||
|
except AgentRuntimeUnavailableError as e:
|
||||||
|
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
|
||||||
|
if status_callback:
|
||||||
|
status_callback('error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e))
|
||||||
|
raise
|
||||||
|
|
||||||
|
self._runtimes[sid] = runtime
|
||||||
|
logger.info(f'Created runtime with ID: {sid}')
|
||||||
|
return runtime
|
||||||
|
|
||||||
|
def get_runtime(self, sid: str) -> Optional[Runtime]:
|
||||||
|
return self._runtimes.get(sid)
|
||||||
|
|
||||||
|
def list_runtimes(self) -> List[str]:
|
||||||
|
return list(self._runtimes.keys())
|
||||||
|
|
||||||
|
def destroy_runtime(self, sid: str) -> bool:
|
||||||
|
runtime = self._runtimes.get(sid)
|
||||||
|
if runtime:
|
||||||
|
runtime.close()
|
||||||
|
del self._runtimes[sid]
|
||||||
|
logger.info(f'Destroyed runtime with ID: {sid}')
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def destroy_all_runtimes(self):
|
||||||
|
for runtime_id in list(self._runtimes.keys()):
|
||||||
|
self.destroy_runtime(runtime_id)
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
class Singleton(type):
|
||||||
|
"""Metaclass for creating singleton classes.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
class MyClass(metaclass=Singleton):
|
||||||
|
pass
|
||||||
|
"""
|
||||||
|
|
||||||
|
_instances: dict = {}
|
||||||
|
|
||||||
|
def __call__(cls, *args, **kwargs):
|
||||||
|
if cls not in cls._instances:
|
||||||
|
cls._instances[cls] = super().__call__(*args, **kwargs)
|
||||||
|
return cls._instances[cls]
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ from openhands.server.middleware import (
|
|||||||
LocalhostCORSMiddleware,
|
LocalhostCORSMiddleware,
|
||||||
NoCacheMiddleware,
|
NoCacheMiddleware,
|
||||||
RateLimitMiddleware,
|
RateLimitMiddleware,
|
||||||
|
session_manager,
|
||||||
)
|
)
|
||||||
from openhands.server.routes.conversation import app as conversation_api_router
|
from openhands.server.routes.conversation import app as conversation_api_router
|
||||||
from openhands.server.routes.feedback import app as feedback_api_router
|
from openhands.server.routes.feedback import app as feedback_api_router
|
||||||
@@ -24,7 +25,7 @@ from openhands.server.routes.new_conversation import app as new_conversation_api
|
|||||||
from openhands.server.routes.public import app as public_api_router
|
from openhands.server.routes.public import app as public_api_router
|
||||||
from openhands.server.routes.security import app as security_api_router
|
from openhands.server.routes.security import app as security_api_router
|
||||||
from openhands.server.routes.settings import app as settings_router
|
from openhands.server.routes.settings import app as settings_router
|
||||||
from openhands.server.shared import openhands_config, session_manager
|
from openhands.server.shared import openhands_config
|
||||||
from openhands.utils.import_utils import get_impl
|
from openhands.utils.import_utils import get_impl
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -13,8 +13,9 @@ from openhands.events.observation import (
|
|||||||
from openhands.events.observation.agent import AgentStateChangedObservation
|
from openhands.events.observation.agent import AgentStateChangedObservation
|
||||||
from openhands.events.serialization import event_to_dict
|
from openhands.events.serialization import event_to_dict
|
||||||
from openhands.events.stream import AsyncEventStreamWrapper
|
from openhands.events.stream import AsyncEventStreamWrapper
|
||||||
|
from openhands.server.middleware import session_manager
|
||||||
from openhands.server.session.manager import ConversationDoesNotExistError
|
from openhands.server.session.manager import ConversationDoesNotExistError
|
||||||
from openhands.server.shared import config, openhands_config, session_manager, sio
|
from openhands.server.shared import config, openhands_config, sio
|
||||||
from openhands.server.types import AppMode
|
from openhands.server.types import AppMode
|
||||||
from openhands.storage.conversation.conversation_store import (
|
from openhands.storage.conversation.conversation_store import (
|
||||||
ConversationStore,
|
ConversationStore,
|
||||||
|
|||||||
@@ -10,9 +10,12 @@ from fastapi.responses import JSONResponse
|
|||||||
from starlette.middleware.base import BaseHTTPMiddleware
|
from starlette.middleware.base import BaseHTTPMiddleware
|
||||||
from starlette.types import ASGIApp
|
from starlette.types import ASGIApp
|
||||||
|
|
||||||
from openhands.server.shared import session_manager
|
from openhands.server.session import SessionManager
|
||||||
|
from openhands.server.shared import config, file_store, runtime_manager, sio
|
||||||
from openhands.server.types import SessionMiddlewareInterface
|
from openhands.server.types import SessionMiddlewareInterface
|
||||||
|
|
||||||
|
session_manager = SessionManager(sio, config, file_store)
|
||||||
|
|
||||||
|
|
||||||
class LocalhostCORSMiddleware(CORSMiddleware):
|
class LocalhostCORSMiddleware(CORSMiddleware):
|
||||||
"""
|
"""
|
||||||
@@ -134,10 +137,17 @@ class AttachConversationMiddleware(SessionMiddlewareInterface):
|
|||||||
"""
|
"""
|
||||||
Attach the user's session based on the provided authentication token.
|
Attach the user's session based on the provided authentication token.
|
||||||
"""
|
"""
|
||||||
request.state.conversation = await session_manager.attach_to_conversation(
|
request.state.runtime = runtime_manager.get_runtime(request.state.sid)
|
||||||
request.state.sid
|
if request.state.runtime is None:
|
||||||
)
|
event_stream = await session_manager.get_event_stream(request.state.sid)
|
||||||
if not request.state.conversation:
|
if event_stream:
|
||||||
|
request.state.runtime = await runtime_manager.create_runtime(
|
||||||
|
event_stream=event_stream,
|
||||||
|
sid=request.state.sid,
|
||||||
|
attach_to_existing=True,
|
||||||
|
headless_mode=False,
|
||||||
|
)
|
||||||
|
if not request.state.runtime:
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
content={'error': 'Session not found'},
|
content={'error': 'Session not found'},
|
||||||
@@ -148,7 +158,7 @@ class AttachConversationMiddleware(SessionMiddlewareInterface):
|
|||||||
"""
|
"""
|
||||||
Detach the user's session.
|
Detach the user's session.
|
||||||
"""
|
"""
|
||||||
await session_manager.detach_from_conversation(request.state.conversation)
|
pass
|
||||||
|
|
||||||
async def __call__(self, request: Request, call_next: Callable):
|
async def __call__(self, request: Request, call_next: Callable):
|
||||||
if not self._should_attach(request):
|
if not self._should_attach(request):
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ async def get_remote_runtime_config(request: Request):
|
|||||||
|
|
||||||
Currently, this is the session ID and runtime ID (if available).
|
Currently, this is the session ID and runtime ID (if available).
|
||||||
"""
|
"""
|
||||||
runtime = request.state.conversation.runtime
|
runtime = request.state.runtime
|
||||||
runtime_id = runtime.runtime_id if hasattr(runtime, 'runtime_id') else None
|
runtime_id = runtime.runtime_id if hasattr(runtime, 'runtime_id') else None
|
||||||
session_id = runtime.sid if hasattr(runtime, 'sid') else None
|
session_id = runtime.sid if hasattr(runtime, 'sid') else None
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
@@ -37,7 +37,7 @@ async def get_vscode_url(request: Request):
|
|||||||
JSONResponse: A JSON response indicating the success of the operation.
|
JSONResponse: A JSON response indicating the success of the operation.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
runtime: Runtime = request.state.conversation.runtime
|
runtime: Runtime = request.state.runtime
|
||||||
logger.debug(f'Runtime type: {type(runtime)}')
|
logger.debug(f'Runtime type: {type(runtime)}')
|
||||||
logger.debug(f'Runtime VSCode URL: {runtime.vscode_url}')
|
logger.debug(f'Runtime VSCode URL: {runtime.vscode_url}')
|
||||||
return JSONResponse(status_code=200, content={'vscode_url': runtime.vscode_url})
|
return JSONResponse(status_code=200, content={'vscode_url': runtime.vscode_url})
|
||||||
@@ -81,12 +81,12 @@ async def search_events(
|
|||||||
HTTPException: If conversation is not found
|
HTTPException: If conversation is not found
|
||||||
ValueError: If limit is less than 1 or greater than 100
|
ValueError: If limit is less than 1 or greater than 100
|
||||||
"""
|
"""
|
||||||
if not request.state.conversation:
|
if not request.state.runtime:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail='Conversation not found'
|
status_code=status.HTTP_404_NOT_FOUND, detail='Conversation not found'
|
||||||
)
|
)
|
||||||
# Get matching events from the stream
|
# Get matching events from the stream
|
||||||
event_stream = request.state.conversation.event_stream
|
event_stream = request.state.runtime.event_stream
|
||||||
matching_events = event_stream.get_matching_events(
|
matching_events = event_stream.get_matching_events(
|
||||||
query=query,
|
query=query,
|
||||||
event_type=event_type,
|
event_type=event_type,
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ async def submit_feedback(request: Request, conversation_id: str):
|
|||||||
# and there is a function to handle the storage.
|
# and there is a function to handle the storage.
|
||||||
body = await request.json()
|
body = await request.json()
|
||||||
async_stream = AsyncEventStreamWrapper(
|
async_stream = AsyncEventStreamWrapper(
|
||||||
request.state.conversation.event_stream, filter_hidden=True
|
request.state.runtime.event_stream, filter_hidden=True
|
||||||
)
|
)
|
||||||
trajectory = []
|
trajectory = []
|
||||||
async for event in async_stream:
|
async for event in async_stream:
|
||||||
|
|||||||
@@ -58,13 +58,13 @@ async def list_files(request: Request, conversation_id: str, path: str | None =
|
|||||||
Raises:
|
Raises:
|
||||||
HTTPException: If there's an error listing the files.
|
HTTPException: If there's an error listing the files.
|
||||||
"""
|
"""
|
||||||
if not request.state.conversation.runtime:
|
if not request.state.runtime:
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
content={'error': 'Runtime not yet initialized'},
|
content={'error': 'Runtime not yet initialized'},
|
||||||
)
|
)
|
||||||
|
|
||||||
runtime: Runtime = request.state.conversation.runtime
|
runtime: Runtime = request.state.runtime
|
||||||
try:
|
try:
|
||||||
file_list = await call_sync_from_async(runtime.list_files, path)
|
file_list = await call_sync_from_async(runtime.list_files, path)
|
||||||
except AgentRuntimeUnavailableError as e:
|
except AgentRuntimeUnavailableError as e:
|
||||||
@@ -124,7 +124,7 @@ async def select_file(file: str, request: Request):
|
|||||||
Raises:
|
Raises:
|
||||||
HTTPException: If there's an error opening the file.
|
HTTPException: If there's an error opening the file.
|
||||||
"""
|
"""
|
||||||
runtime: Runtime = request.state.conversation.runtime
|
runtime: Runtime = request.state.runtime
|
||||||
|
|
||||||
file = os.path.join(runtime.config.workspace_mount_path_in_sandbox, file)
|
file = os.path.join(runtime.config.workspace_mount_path_in_sandbox, file)
|
||||||
read_action = FileReadAction(file)
|
read_action = FileReadAction(file)
|
||||||
@@ -199,7 +199,7 @@ async def upload_file(request: Request, conversation_id: str, files: list[Upload
|
|||||||
tmp_file.write(file_contents)
|
tmp_file.write(file_contents)
|
||||||
tmp_file.flush()
|
tmp_file.flush()
|
||||||
|
|
||||||
runtime: Runtime = request.state.conversation.runtime
|
runtime: Runtime = request.state.runtime
|
||||||
try:
|
try:
|
||||||
await call_sync_from_async(
|
await call_sync_from_async(
|
||||||
runtime.copy_to,
|
runtime.copy_to,
|
||||||
@@ -276,7 +276,7 @@ async def save_file(request: Request):
|
|||||||
raise HTTPException(status_code=400, detail='Missing filePath or content')
|
raise HTTPException(status_code=400, detail='Missing filePath or content')
|
||||||
|
|
||||||
# Save the file to the agent's runtime file store
|
# Save the file to the agent's runtime file store
|
||||||
runtime: Runtime = request.state.conversation.runtime
|
runtime: Runtime = request.state.runtime
|
||||||
file_path = os.path.join(
|
file_path = os.path.join(
|
||||||
runtime.config.workspace_mount_path_in_sandbox, file_path
|
runtime.config.workspace_mount_path_in_sandbox, file_path
|
||||||
)
|
)
|
||||||
@@ -316,7 +316,7 @@ async def zip_current_workspace(
|
|||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
logger.debug('Zipping workspace')
|
logger.debug('Zipping workspace')
|
||||||
runtime: Runtime = request.state.conversation.runtime
|
runtime: Runtime = request.state.runtime
|
||||||
path = runtime.config.workspace_mount_path_in_sandbox
|
path = runtime.config.workspace_mount_path_in_sandbox
|
||||||
try:
|
try:
|
||||||
zip_file = await call_sync_from_async(runtime.copy_from, path)
|
zip_file = await call_sync_from_async(runtime.copy_from, path)
|
||||||
|
|||||||
@@ -6,9 +6,10 @@ from github import Github
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from openhands.core.logger import openhands_logger as logger
|
from openhands.core.logger import openhands_logger as logger
|
||||||
|
from openhands.server.middleware import session_manager
|
||||||
from openhands.server.routes.settings import SettingsStoreImpl
|
from openhands.server.routes.settings import SettingsStoreImpl
|
||||||
from openhands.server.session.conversation_init_data import ConversationInitData
|
from openhands.server.session.conversation_init_data import ConversationInitData
|
||||||
from openhands.server.shared import config, session_manager
|
from openhands.server.shared import config
|
||||||
from openhands.storage.conversation.conversation_store import (
|
from openhands.storage.conversation.conversation_store import (
|
||||||
ConversationMetadata,
|
ConversationMetadata,
|
||||||
ConversationStore,
|
ConversationStore,
|
||||||
|
|||||||
@@ -4,6 +4,9 @@ from fastapi import (
|
|||||||
Request,
|
Request,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from openhands.security import SecurityAnalyzer, options
|
||||||
|
from openhands.server.shared import config
|
||||||
|
|
||||||
app = APIRouter(prefix='/api/conversations/{conversation_id}')
|
app = APIRouter(prefix='/api/conversations/{conversation_id}')
|
||||||
|
|
||||||
|
|
||||||
@@ -22,9 +25,10 @@ async def security_api(request: Request):
|
|||||||
Raises:
|
Raises:
|
||||||
HTTPException: If the security analyzer is not initialized.
|
HTTPException: If the security analyzer is not initialized.
|
||||||
"""
|
"""
|
||||||
if not request.state.conversation.security_analyzer:
|
if not request.state.runtime:
|
||||||
raise HTTPException(status_code=404, detail='Security analyzer not initialized')
|
raise HTTPException(status_code=404, detail='Security analyzer not initialized')
|
||||||
|
security_analyzer = options.SecurityAnalyzers.get(
|
||||||
|
config.security.security_analyzer or '', SecurityAnalyzer
|
||||||
|
)(request.state.runtime.event_stream)
|
||||||
|
|
||||||
return await request.state.conversation.security_analyzer.handle_api_request(
|
return await security_analyzer.handle_api_request(request)
|
||||||
request
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -4,16 +4,16 @@ from typing import Callable, Optional
|
|||||||
from openhands.controller import AgentController
|
from openhands.controller import AgentController
|
||||||
from openhands.controller.agent import Agent
|
from openhands.controller.agent import Agent
|
||||||
from openhands.controller.state.state import State
|
from openhands.controller.state.state import State
|
||||||
from openhands.core.config import AgentConfig, AppConfig, LLMConfig
|
from openhands.core.config import AgentConfig, LLMConfig
|
||||||
from openhands.core.exceptions import AgentRuntimeUnavailableError
|
from openhands.core.exceptions import AgentRuntimeUnavailableError
|
||||||
from openhands.core.logger import openhands_logger as logger
|
from openhands.core.logger import openhands_logger as logger
|
||||||
from openhands.core.schema.agent import AgentState
|
from openhands.core.schema.agent import AgentState
|
||||||
from openhands.events.action import ChangeAgentStateAction
|
from openhands.events.action import ChangeAgentStateAction
|
||||||
from openhands.events.event import EventSource
|
from openhands.events.event import EventSource
|
||||||
from openhands.events.stream import EventStream
|
from openhands.events.stream import EventStream
|
||||||
from openhands.runtime import get_runtime_cls
|
|
||||||
from openhands.runtime.base import Runtime
|
from openhands.runtime.base import Runtime
|
||||||
from openhands.security import SecurityAnalyzer, options
|
from openhands.security import SecurityAnalyzer, options
|
||||||
|
from openhands.server.shared import runtime_manager
|
||||||
from openhands.storage.files import FileStore
|
from openhands.storage.files import FileStore
|
||||||
from openhands.utils.async_utils import call_async_from_sync, call_sync_from_async
|
from openhands.utils.async_utils import call_async_from_sync, call_sync_from_async
|
||||||
from openhands.utils.shutdown_listener import should_continue
|
from openhands.utils.shutdown_listener import should_continue
|
||||||
@@ -59,8 +59,6 @@ class AgentSession:
|
|||||||
|
|
||||||
async def start(
|
async def start(
|
||||||
self,
|
self,
|
||||||
runtime_name: str,
|
|
||||||
config: AppConfig,
|
|
||||||
agent: Agent,
|
agent: Agent,
|
||||||
max_iterations: int,
|
max_iterations: int,
|
||||||
max_budget_per_task: float | None = None,
|
max_budget_per_task: float | None = None,
|
||||||
@@ -71,8 +69,6 @@ class AgentSession:
|
|||||||
):
|
):
|
||||||
"""Starts the Agent session
|
"""Starts the Agent session
|
||||||
Parameters:
|
Parameters:
|
||||||
- runtime_name: The name of the runtime associated with the session
|
|
||||||
- config:
|
|
||||||
- agent:
|
- agent:
|
||||||
- max_iterations:
|
- max_iterations:
|
||||||
- max_budget_per_task:
|
- max_budget_per_task:
|
||||||
@@ -87,8 +83,6 @@ class AgentSession:
|
|||||||
asyncio.get_event_loop().run_in_executor(
|
asyncio.get_event_loop().run_in_executor(
|
||||||
None,
|
None,
|
||||||
self._start_thread,
|
self._start_thread,
|
||||||
runtime_name,
|
|
||||||
config,
|
|
||||||
agent,
|
agent,
|
||||||
max_iterations,
|
max_iterations,
|
||||||
max_budget_per_task,
|
max_budget_per_task,
|
||||||
@@ -107,8 +101,6 @@ class AgentSession:
|
|||||||
|
|
||||||
async def _start(
|
async def _start(
|
||||||
self,
|
self,
|
||||||
runtime_name: str,
|
|
||||||
config: AppConfig,
|
|
||||||
agent: Agent,
|
agent: Agent,
|
||||||
max_iterations: int,
|
max_iterations: int,
|
||||||
max_budget_per_task: float | None = None,
|
max_budget_per_task: float | None = None,
|
||||||
@@ -121,10 +113,10 @@ class AgentSession:
|
|||||||
logger.warning('Session closed before starting')
|
logger.warning('Session closed before starting')
|
||||||
return
|
return
|
||||||
self._initializing = True
|
self._initializing = True
|
||||||
self._create_security_analyzer(config.security.security_analyzer)
|
self._create_security_analyzer(
|
||||||
|
runtime_manager.config.security.security_analyzer
|
||||||
|
)
|
||||||
await self._create_runtime(
|
await self._create_runtime(
|
||||||
runtime_name=runtime_name,
|
|
||||||
config=config,
|
|
||||||
agent=agent,
|
agent=agent,
|
||||||
github_token=github_token,
|
github_token=github_token,
|
||||||
selected_repository=selected_repository,
|
selected_repository=selected_repository,
|
||||||
@@ -132,7 +124,7 @@ class AgentSession:
|
|||||||
|
|
||||||
self.controller = self._create_controller(
|
self.controller = self._create_controller(
|
||||||
agent,
|
agent,
|
||||||
config.security.confirmation_mode,
|
runtime_manager.config.security.confirmation_mode,
|
||||||
max_iterations,
|
max_iterations,
|
||||||
max_budget_per_task=max_budget_per_task,
|
max_budget_per_task=max_budget_per_task,
|
||||||
agent_to_llm_config=agent_to_llm_config,
|
agent_to_llm_config=agent_to_llm_config,
|
||||||
@@ -170,7 +162,7 @@ class AgentSession:
|
|||||||
end_state.save_to_session(self.sid, self.file_store)
|
end_state.save_to_session(self.sid, self.file_store)
|
||||||
await self.controller.close()
|
await self.controller.close()
|
||||||
if self.runtime is not None:
|
if self.runtime is not None:
|
||||||
self.runtime.close()
|
runtime_manager.destroy_runtime(self.sid)
|
||||||
if self.security_analyzer is not None:
|
if self.security_analyzer is not None:
|
||||||
await self.security_analyzer.close()
|
await self.security_analyzer.close()
|
||||||
|
|
||||||
@@ -193,8 +185,6 @@ class AgentSession:
|
|||||||
|
|
||||||
async def _create_runtime(
|
async def _create_runtime(
|
||||||
self,
|
self,
|
||||||
runtime_name: str,
|
|
||||||
config: AppConfig,
|
|
||||||
agent: Agent,
|
agent: Agent,
|
||||||
github_token: str | None = None,
|
github_token: str | None = None,
|
||||||
selected_repository: str | None = None,
|
selected_repository: str | None = None,
|
||||||
@@ -202,38 +192,28 @@ class AgentSession:
|
|||||||
"""Creates a runtime instance
|
"""Creates a runtime instance
|
||||||
|
|
||||||
Parameters:
|
Parameters:
|
||||||
- runtime_name: The name of the runtime associated with the session
|
|
||||||
- config:
|
|
||||||
- agent:
|
- agent:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.runtime is not None:
|
if self.runtime is not None:
|
||||||
raise RuntimeError('Runtime already created')
|
raise RuntimeError('Runtime already created')
|
||||||
|
|
||||||
logger.debug(f'Initializing runtime `{runtime_name}` now...')
|
|
||||||
runtime_cls = get_runtime_cls(runtime_name)
|
|
||||||
self.runtime = runtime_cls(
|
|
||||||
config=config,
|
|
||||||
event_stream=self.event_stream,
|
|
||||||
sid=self.sid,
|
|
||||||
plugins=agent.sandbox_plugins,
|
|
||||||
status_callback=self._status_callback,
|
|
||||||
headless_mode=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
# FIXME: this sleep is a terrible hack.
|
# FIXME: this sleep is a terrible hack.
|
||||||
# This is to give the websocket a second to connect, so that
|
# This is to give the websocket a second to connect, so that
|
||||||
# the status messages make it through to the frontend.
|
# the status messages make it through to the frontend.
|
||||||
# We should find a better way to plumb status messages through.
|
# We should find a better way to plumb status messages through.
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self.runtime.connect()
|
self.runtime = await runtime_manager.create_runtime(
|
||||||
|
event_stream=self.event_stream,
|
||||||
|
sid=self.sid,
|
||||||
|
plugins=agent.sandbox_plugins,
|
||||||
|
status_callback=self._status_callback,
|
||||||
|
headless_mode=False,
|
||||||
|
)
|
||||||
except AgentRuntimeUnavailableError as e:
|
except AgentRuntimeUnavailableError as e:
|
||||||
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
|
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
|
||||||
if self._status_callback:
|
|
||||||
self._status_callback(
|
|
||||||
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
self.runtime.clone_repo(github_token, selected_repository)
|
self.runtime.clone_repo(github_token, selected_repository)
|
||||||
|
|||||||
@@ -1,46 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
|
|
||||||
from openhands.core.config import AppConfig
|
|
||||||
from openhands.events.stream import EventStream
|
|
||||||
from openhands.runtime import get_runtime_cls
|
|
||||||
from openhands.runtime.base import Runtime
|
|
||||||
from openhands.security import SecurityAnalyzer, options
|
|
||||||
from openhands.storage.files import FileStore
|
|
||||||
from openhands.utils.async_utils import call_sync_from_async
|
|
||||||
|
|
||||||
|
|
||||||
class Conversation:
|
|
||||||
sid: str
|
|
||||||
file_store: FileStore
|
|
||||||
event_stream: EventStream
|
|
||||||
runtime: Runtime
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
sid: str,
|
|
||||||
file_store: FileStore,
|
|
||||||
config: AppConfig,
|
|
||||||
):
|
|
||||||
self.sid = sid
|
|
||||||
self.config = config
|
|
||||||
self.file_store = file_store
|
|
||||||
self.event_stream = EventStream(sid, file_store)
|
|
||||||
if config.security.security_analyzer:
|
|
||||||
self.security_analyzer = options.SecurityAnalyzers.get(
|
|
||||||
config.security.security_analyzer, SecurityAnalyzer
|
|
||||||
)(self.event_stream)
|
|
||||||
|
|
||||||
runtime_cls = get_runtime_cls(self.config.runtime)
|
|
||||||
self.runtime = runtime_cls(
|
|
||||||
config=config,
|
|
||||||
event_stream=self.event_stream,
|
|
||||||
sid=self.sid,
|
|
||||||
attach_to_existing=True,
|
|
||||||
headless_mode=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def connect(self):
|
|
||||||
await self.runtime.connect()
|
|
||||||
|
|
||||||
async def disconnect(self):
|
|
||||||
asyncio.create_task(call_sync_from_async(self.runtime.close))
|
|
||||||
@@ -6,12 +6,11 @@ from dataclasses import dataclass, field
|
|||||||
import socketio
|
import socketio
|
||||||
|
|
||||||
from openhands.core.config import AppConfig
|
from openhands.core.config import AppConfig
|
||||||
from openhands.core.exceptions import AgentRuntimeUnavailableError
|
|
||||||
from openhands.core.logger import openhands_logger as logger
|
from openhands.core.logger import openhands_logger as logger
|
||||||
from openhands.events.stream import EventStream, session_exists
|
from openhands.events.stream import EventStream
|
||||||
from openhands.server.session.conversation import Conversation
|
|
||||||
from openhands.server.session.conversation_init_data import ConversationInitData
|
from openhands.server.session.conversation_init_data import ConversationInitData
|
||||||
from openhands.server.session.session import ROOM_KEY, Session
|
from openhands.server.session.session import ROOM_KEY, Session
|
||||||
|
from openhands.server.shared import runtime_manager
|
||||||
from openhands.storage.files import FileStore
|
from openhands.storage.files import FileStore
|
||||||
from openhands.utils.async_utils import call_sync_from_async
|
from openhands.utils.async_utils import call_sync_from_async
|
||||||
from openhands.utils.shutdown_listener import should_continue
|
from openhands.utils.shutdown_listener import should_continue
|
||||||
@@ -19,9 +18,6 @@ from openhands.utils.shutdown_listener import should_continue
|
|||||||
_REDIS_POLL_TIMEOUT = 1.5
|
_REDIS_POLL_TIMEOUT = 1.5
|
||||||
_CHECK_ALIVE_INTERVAL = 15
|
_CHECK_ALIVE_INTERVAL = 15
|
||||||
|
|
||||||
_CLEANUP_INTERVAL = 15
|
|
||||||
_CLEANUP_EXCEPTION_WAIT_TIME = 15
|
|
||||||
|
|
||||||
|
|
||||||
class ConversationDoesNotExistError(Exception):
|
class ConversationDoesNotExistError(Exception):
|
||||||
pass
|
pass
|
||||||
@@ -37,14 +33,6 @@ class SessionManager:
|
|||||||
_last_alive_timestamps: dict[str, float] = field(default_factory=dict)
|
_last_alive_timestamps: dict[str, float] = field(default_factory=dict)
|
||||||
_redis_listen_task: asyncio.Task | None = None
|
_redis_listen_task: asyncio.Task | None = None
|
||||||
_session_is_running_flags: dict[str, asyncio.Event] = field(default_factory=dict)
|
_session_is_running_flags: dict[str, asyncio.Event] = field(default_factory=dict)
|
||||||
_active_conversations: dict[str, tuple[Conversation, int]] = field(
|
|
||||||
default_factory=dict
|
|
||||||
)
|
|
||||||
_detached_conversations: dict[str, tuple[Conversation, float]] = field(
|
|
||||||
default_factory=dict
|
|
||||||
)
|
|
||||||
_conversations_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
|
||||||
_cleanup_task: asyncio.Task | None = None
|
|
||||||
_has_remote_connections_flags: dict[str, asyncio.Event] = field(
|
_has_remote_connections_flags: dict[str, asyncio.Event] = field(
|
||||||
default_factory=dict
|
default_factory=dict
|
||||||
)
|
)
|
||||||
@@ -53,16 +41,12 @@ class SessionManager:
|
|||||||
redis_client = self._get_redis_client()
|
redis_client = self._get_redis_client()
|
||||||
if redis_client:
|
if redis_client:
|
||||||
self._redis_listen_task = asyncio.create_task(self._redis_subscribe())
|
self._redis_listen_task = asyncio.create_task(self._redis_subscribe())
|
||||||
self._cleanup_task = asyncio.create_task(self._cleanup_detached_conversations())
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||||
if self._redis_listen_task:
|
if self._redis_listen_task:
|
||||||
self._redis_listen_task.cancel()
|
self._redis_listen_task.cancel()
|
||||||
self._redis_listen_task = None
|
self._redis_listen_task = None
|
||||||
if self._cleanup_task:
|
|
||||||
self._cleanup_task.cancel()
|
|
||||||
self._cleanup_task = None
|
|
||||||
|
|
||||||
def _get_redis_client(self):
|
def _get_redis_client(self):
|
||||||
redis_client = getattr(self.sio.manager, 'redis', None)
|
redis_client = getattr(self.sio.manager, 'redis', None)
|
||||||
@@ -134,6 +118,7 @@ class SessionManager:
|
|||||||
# Session closing event - We only get this in the event of graceful shutdown,
|
# Session closing event - We only get this in the event of graceful shutdown,
|
||||||
# which can't be guaranteed - nodes can simply vanish unexpectedly!
|
# which can't be guaranteed - nodes can simply vanish unexpectedly!
|
||||||
logger.debug(f'session_closing:{sid}')
|
logger.debug(f'session_closing:{sid}')
|
||||||
|
await call_sync_from_async(runtime_manager.destroy_runtime, sid)
|
||||||
for (
|
for (
|
||||||
connection_id,
|
connection_id,
|
||||||
local_sid,
|
local_sid,
|
||||||
@@ -144,96 +129,15 @@ class SessionManager:
|
|||||||
)
|
)
|
||||||
await self.sio.disconnect(connection_id)
|
await self.sio.disconnect(connection_id)
|
||||||
|
|
||||||
async def attach_to_conversation(self, sid: str) -> Conversation | None:
|
|
||||||
start_time = time.time()
|
|
||||||
if not await session_exists(sid, self.file_store):
|
|
||||||
return None
|
|
||||||
|
|
||||||
async with self._conversations_lock:
|
|
||||||
# Check if we have an active conversation we can reuse
|
|
||||||
if sid in self._active_conversations:
|
|
||||||
conversation, count = self._active_conversations[sid]
|
|
||||||
self._active_conversations[sid] = (conversation, count + 1)
|
|
||||||
logger.info(f'Reusing active conversation {sid}')
|
|
||||||
return conversation
|
|
||||||
|
|
||||||
# Check if we have a detached conversation we can reuse
|
|
||||||
if sid in self._detached_conversations:
|
|
||||||
conversation, _ = self._detached_conversations.pop(sid)
|
|
||||||
self._active_conversations[sid] = (conversation, 1)
|
|
||||||
logger.info(f'Reusing detached conversation {sid}')
|
|
||||||
return conversation
|
|
||||||
|
|
||||||
# Create new conversation if none exists
|
|
||||||
c = Conversation(sid, file_store=self.file_store, config=self.config)
|
|
||||||
try:
|
|
||||||
await c.connect()
|
|
||||||
except AgentRuntimeUnavailableError as e:
|
|
||||||
logger.error(f'Error connecting to conversation {c.sid}: {e}')
|
|
||||||
return None
|
|
||||||
end_time = time.time()
|
|
||||||
logger.info(
|
|
||||||
f'Conversation {c.sid} connected in {end_time - start_time} seconds'
|
|
||||||
)
|
|
||||||
self._active_conversations[sid] = (c, 1)
|
|
||||||
return c
|
|
||||||
|
|
||||||
async def join_conversation(self, sid: str, connection_id: str) -> EventStream:
|
async def join_conversation(self, sid: str, connection_id: str) -> EventStream:
|
||||||
logger.info(f'join_conversation:{sid}:{connection_id}')
|
logger.info(f'join_conversation:{sid}:{connection_id}')
|
||||||
await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
|
await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
|
||||||
self.local_connection_id_to_session_id[connection_id] = sid
|
self.local_connection_id_to_session_id[connection_id] = sid
|
||||||
event_stream = await self._get_event_stream(sid)
|
event_stream = await self.get_event_stream(sid)
|
||||||
if not event_stream:
|
if not event_stream:
|
||||||
return await self.maybe_start_agent_loop(sid)
|
return await self.maybe_start_agent_loop(sid)
|
||||||
return event_stream
|
return event_stream
|
||||||
|
|
||||||
async def detach_from_conversation(self, conversation: Conversation):
|
|
||||||
sid = conversation.sid
|
|
||||||
async with self._conversations_lock:
|
|
||||||
if sid in self._active_conversations:
|
|
||||||
conv, count = self._active_conversations[sid]
|
|
||||||
if count > 1:
|
|
||||||
self._active_conversations[sid] = (conv, count - 1)
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
self._active_conversations.pop(sid)
|
|
||||||
self._detached_conversations[sid] = (conversation, time.time())
|
|
||||||
|
|
||||||
async def _cleanup_detached_conversations(self):
|
|
||||||
while should_continue():
|
|
||||||
if self._get_redis_client():
|
|
||||||
# Debug info for HA envs
|
|
||||||
logger.info(
|
|
||||||
f'Attached conversations: {len(self._active_conversations)}'
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
f'Detached conversations: {len(self._detached_conversations)}'
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
f'Running agent loops: {len(self._local_agent_loops_by_sid)}'
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
f'Local connections: {len(self.local_connection_id_to_session_id)}'
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
async with self._conversations_lock:
|
|
||||||
# Create a list of items to process to avoid modifying dict during iteration
|
|
||||||
items = list(self._detached_conversations.items())
|
|
||||||
for sid, (conversation, detach_time) in items:
|
|
||||||
await conversation.disconnect()
|
|
||||||
self._detached_conversations.pop(sid, None)
|
|
||||||
|
|
||||||
await asyncio.sleep(_CLEANUP_INTERVAL)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
async with self._conversations_lock:
|
|
||||||
for conversation, _ in self._detached_conversations.values():
|
|
||||||
await conversation.disconnect()
|
|
||||||
self._detached_conversations.clear()
|
|
||||||
return
|
|
||||||
except Exception:
|
|
||||||
logger.warning('error_cleaning_detached_conversations', exc_info=True)
|
|
||||||
await asyncio.sleep(_CLEANUP_EXCEPTION_WAIT_TIME)
|
|
||||||
|
|
||||||
async def _is_agent_loop_running(self, sid: str) -> bool:
|
async def _is_agent_loop_running(self, sid: str) -> bool:
|
||||||
if await self._is_agent_loop_running_locally(sid):
|
if await self._is_agent_loop_running_locally(sid):
|
||||||
return True
|
return True
|
||||||
@@ -310,19 +214,22 @@ class SessionManager:
|
|||||||
if not await self._is_agent_loop_running(sid):
|
if not await self._is_agent_loop_running(sid):
|
||||||
logger.info(f'start_agent_loop:{sid}')
|
logger.info(f'start_agent_loop:{sid}')
|
||||||
session = Session(
|
session = Session(
|
||||||
sid=sid, file_store=self.file_store, config=self.config, sio=self.sio
|
sid=sid,
|
||||||
|
file_store=self.file_store,
|
||||||
|
config=self.config,
|
||||||
|
sio=self.sio,
|
||||||
)
|
)
|
||||||
self._local_agent_loops_by_sid[sid] = session
|
self._local_agent_loops_by_sid[sid] = session
|
||||||
await session.initialize_agent(conversation_init_data)
|
await session.initialize_agent(conversation_init_data)
|
||||||
|
|
||||||
event_stream = await self._get_event_stream(sid)
|
event_stream = await self.get_event_stream(sid)
|
||||||
if not event_stream:
|
if not event_stream:
|
||||||
logger.error(f'No event stream after starting agent loop: {sid}')
|
logger.error(f'No event stream after starting agent loop: {sid}')
|
||||||
raise RuntimeError(f'no_event_stream:{sid}')
|
raise RuntimeError(f'no_event_stream:{sid}')
|
||||||
return event_stream
|
return event_stream
|
||||||
|
|
||||||
async def _get_event_stream(self, sid: str) -> EventStream | None:
|
async def get_event_stream(self, sid: str) -> EventStream | None:
|
||||||
logger.info(f'_get_event_stream:{sid}')
|
logger.info(f'get_event_stream:{sid}')
|
||||||
session = self._local_agent_loops_by_sid.get(sid)
|
session = self._local_agent_loops_by_sid.get(sid)
|
||||||
if session:
|
if session:
|
||||||
logger.info(f'found_local_agent_loop:{sid}')
|
logger.info(f'found_local_agent_loop:{sid}')
|
||||||
@@ -393,6 +300,7 @@ class SessionManager:
|
|||||||
async def _cleanup_session(self, sid: str) -> bool:
|
async def _cleanup_session(self, sid: str) -> bool:
|
||||||
# Get local connections
|
# Get local connections
|
||||||
logger.info(f'_cleanup_session:{sid}')
|
logger.info(f'_cleanup_session:{sid}')
|
||||||
|
await call_sync_from_async(runtime_manager.destroy_runtime, sid)
|
||||||
has_local_connections = next(
|
has_local_connections = next(
|
||||||
(True for v in self.local_connection_id_to_session_id.values() if v == sid),
|
(True for v in self.local_connection_id_to_session_id.values() if v == sid),
|
||||||
False,
|
False,
|
||||||
|
|||||||
@@ -52,7 +52,9 @@ class Session:
|
|||||||
self.last_active_ts = int(time.time())
|
self.last_active_ts = int(time.time())
|
||||||
self.file_store = file_store
|
self.file_store = file_store
|
||||||
self.agent_session = AgentSession(
|
self.agent_session = AgentSession(
|
||||||
sid, file_store, status_callback=self.queue_status_message
|
sid,
|
||||||
|
file_store,
|
||||||
|
status_callback=self.queue_status_message,
|
||||||
)
|
)
|
||||||
self.agent_session.event_stream.subscribe(
|
self.agent_session.event_stream.subscribe(
|
||||||
EventStreamSubscriber.SERVER, self.on_event, self.sid
|
EventStreamSubscriber.SERVER, self.on_event, self.sid
|
||||||
@@ -131,8 +133,6 @@ class Session:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
await self.agent_session.start(
|
await self.agent_session.start(
|
||||||
runtime_name=self.config.runtime,
|
|
||||||
config=self.config,
|
|
||||||
agent=agent,
|
agent=agent,
|
||||||
max_iterations=max_iterations,
|
max_iterations=max_iterations,
|
||||||
max_budget_per_task=self.config.max_budget_per_task,
|
max_budget_per_task=self.config.max_budget_per_task,
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import socketio
|
|||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
from openhands.core.config import load_app_config
|
from openhands.core.config import load_app_config
|
||||||
|
from openhands.runtime.runtime_manager import RuntimeManager
|
||||||
from openhands.server.config.openhands_config import load_openhands_config
|
from openhands.server.config.openhands_config import load_openhands_config
|
||||||
from openhands.server.session import SessionManager
|
|
||||||
from openhands.storage import get_file_store
|
from openhands.storage import get_file_store
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
@@ -27,4 +27,4 @@ sio = socketio.AsyncServer(
|
|||||||
async_mode='asgi', cors_allowed_origins='*', client_manager=client_manager
|
async_mode='asgi', cors_allowed_origins='*', client_manager=client_manager
|
||||||
)
|
)
|
||||||
|
|
||||||
session_manager = SessionManager(sio, config, file_store)
|
runtime_manager = RuntimeManager(config)
|
||||||
|
|||||||
@@ -326,7 +326,8 @@ async def test_complete_runtime():
|
|||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_process_issue(mock_output_dir, mock_prompt_template):
|
async def test_process_issue(mock_output_dir, mock_prompt_template):
|
||||||
# Mock dependencies
|
# Mock dependencies
|
||||||
mock_create_runtime = MagicMock()
|
mock_runtime = MagicMock(connect=AsyncMock())
|
||||||
|
mock_create_runtime = AsyncMock(return_value=mock_runtime)
|
||||||
mock_initialize_runtime = AsyncMock()
|
mock_initialize_runtime = AsyncMock()
|
||||||
mock_run_controller = AsyncMock()
|
mock_run_controller = AsyncMock()
|
||||||
mock_complete_runtime = AsyncMock()
|
mock_complete_runtime = AsyncMock()
|
||||||
@@ -408,7 +409,9 @@ async def test_process_issue(mock_output_dir, mock_prompt_template):
|
|||||||
handler_instance.reset_mock()
|
handler_instance.reset_mock()
|
||||||
|
|
||||||
# Mock return values
|
# Mock return values
|
||||||
mock_create_runtime.return_value = MagicMock(connect=AsyncMock())
|
mock_runtime = MagicMock(connect=AsyncMock())
|
||||||
|
mock_create_runtime.return_value = AsyncMock()
|
||||||
|
mock_create_runtime.return_value.__aenter__.return_value = mock_runtime
|
||||||
if test_case['run_controller_raises']:
|
if test_case['run_controller_raises']:
|
||||||
mock_run_controller.side_effect = test_case['run_controller_raises']
|
mock_run_controller.side_effect = test_case['run_controller_raises']
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user