Compare commits

..

1 Commits

Author SHA1 Message Date
openhands
3113757859 Sort GitHub repos by pushed_at and add search functionality 2024-12-24 12:46:12 +00:00
30 changed files with 603 additions and 775 deletions

View File

@@ -104,6 +104,22 @@ export const retrieveGitHubUser = async () => {
return user;
};
export const searchPublicGitHubRepo = async (query: string): Promise<GitHubRepository | null> => {
const response = await github.get<{ items: GitHubRepository[] }>(
`/search/repositories`,
{
params: {
q: query,
sort: 'stars',
order: 'desc',
per_page: 1,
},
},
);
return response.data.items[0] || null;
};
export const retrieveLatestGitHubCommit = async (
repository: string,
): Promise<GitHubCommit> => {

View File

@@ -4,6 +4,7 @@ import { useDispatch } from "react-redux";
import posthog from "posthog-js";
import { setSelectedRepository } from "#/state/initial-query-slice";
import { useConfig } from "#/hooks/query/use-config";
import { searchPublicGitHubRepo } from "#/api/github";
interface GitHubRepositorySelectorProps {
onSelect: () => void;
@@ -16,12 +17,45 @@ export function GitHubRepositorySelector({
}: GitHubRepositorySelectorProps) {
const { data: config } = useConfig();
const [selectedKey, setSelectedKey] = React.useState<string | null>(null);
const [searchQuery, setSearchQuery] = React.useState<string>("");
const [searchedRepo, setSearchedRepo] = React.useState<GitHubRepository | null>(null);
const [isSearching, setIsSearching] = React.useState<boolean>(false);
// Add option to install app onto more repos
const finalRepositories =
config?.APP_MODE === "saas"
? [{ id: -1000, full_name: "Add more repositories..." }, ...repositories]
: repositories;
React.useEffect(() => {
const searchTimeout = setTimeout(async () => {
if (searchQuery.trim()) {
setIsSearching(true);
try {
const repo = await searchPublicGitHubRepo(searchQuery);
setSearchedRepo(repo);
} catch (error) {
console.error("Error searching for repo:", error);
}
setIsSearching(false);
} else {
setSearchedRepo(null);
}
}, 500);
return () => clearTimeout(searchTimeout);
}, [searchQuery]);
// Sort repositories by pushed_at
const sortedRepositories = [...repositories].sort((a, b) => {
const dateA = a.pushed_at ? new Date(a.pushed_at).getTime() : 0;
const dateB = b.pushed_at ? new Date(b.pushed_at).getTime() : 0;
return dateB - dateA;
});
// Add option to install app onto more repos and searched repo if found
const finalRepositories = [
...(searchedRepo ? [{
...searchedRepo,
full_name: `${searchedRepo.full_name} (${searchedRepo.stargazers_count} ⭐)`,
}] : []),
...(config?.APP_MODE === "saas" ? [{ id: -1000, full_name: "Add more repositories..." }] : []),
...sortedRepositories,
];
const dispatch = useDispatch();
@@ -67,6 +101,8 @@ export function GitHubRepositorySelector({
aria-label="GitHub Repository"
placeholder="Select a GitHub project"
selectedKey={selectedKey}
inputValue={searchQuery}
onInputChange={(value) => setSearchQuery(value)}
inputProps={{
classNames: {
inputWrapper:
@@ -76,7 +112,7 @@ export function GitHubRepositorySelector({
onSelectionChange={(id) => handleRepoSelection(id?.toString() ?? null)}
clearButtonProps={{ onClick: handleClearSelection }}
listboxProps={{
emptyContent,
emptyContent: isSearching ? "Searching..." : emptyContent,
}}
>
{finalRepositories.map((repo) => (

View File

@@ -79,12 +79,6 @@ export function WsClientProvider({
function handleDisconnect() {
setStatus(WsClientProviderStatus.DISCONNECTED);
const sio = sioRef.current;
if (!sio) {
return;
}
sio.io.opts.query = sio.io.opts.query || {};
sio.io.opts.query.latest_event_id = lastEventRef.current?.id;
}
function handleError() {

View File

@@ -15,7 +15,7 @@ ALWAYS use the GitHub API for operations instead of a web browser.
Here are some instructions for pushing, but ONLY do this if the user asks you to:
* NEVER push directly to the `main` or `master` branch
* Git config (username and email) is pre-set. Do not modify.
* You may already be on a branch starting with `openhands-workspace`. Create a new branch with a better name before pushing.
* You may already be on a branch called `openhands-workspace`. Create a new branch with a better name before pushing.
* Use the GitHub API to create a pull request, if you haven't already
* Use the main branch as the base branch, unless the user requests otherwise
* After opening or updating a pull request, send the user a short message with a link to the pull request.

View File

@@ -35,8 +35,8 @@ from openhands.events.observation import (
NullObservation,
)
from openhands.llm.llm import LLM
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.security import SecurityAnalyzer, options
from openhands.storage import get_file_store
@@ -125,8 +125,9 @@ async def main():
file_store = get_file_store(config.file_store, config.file_store_path)
event_stream = EventStream(sid, file_store)
runtime_manager = RuntimeManager(config)
runtime: Runtime = await runtime_manager.create_runtime(
runtime_cls = get_runtime_cls(config.runtime)
runtime: Runtime = runtime_cls( # noqa: F841
config=config,
event_stream=event_stream,
sid=sid,
plugins=agent_cls.sandbox_plugins,
@@ -194,6 +195,8 @@ async def main():
event_stream.subscribe(EventStreamSubscriber.MAIN, on_event, str(uuid4()))
await runtime.connect()
asyncio.create_task(prompt_for_next_task())
await run_agent_until_done(

View File

@@ -26,8 +26,8 @@ from openhands.events.event import Event
from openhands.events.observation import AgentStateChangedObservation
from openhands.events.serialization.event import event_to_trajectory
from openhands.llm.llm import LLM
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.storage import get_file_store
@@ -51,7 +51,7 @@ def read_task_from_stdin() -> str:
return sys.stdin.read()
async def create_runtime(
def create_runtime(
config: AppConfig,
sid: str | None = None,
headless_mode: bool = True,
@@ -77,8 +77,10 @@ async def create_runtime(
agent_cls = openhands.agenthub.Agent.get_cls(config.default_agent)
# runtime and tools
runtime_manager = RuntimeManager(config)
runtime: Runtime = await runtime_manager.create_runtime(
runtime_cls = get_runtime_cls(config.runtime)
logger.debug(f'Initializing runtime: {runtime_cls.__name__}')
runtime: Runtime = runtime_cls(
config=config,
event_stream=event_stream,
sid=session_id,
plugins=agent_cls.sandbox_plugins,
@@ -127,7 +129,8 @@ async def run_controller(
sid = sid or generate_sid(config)
if runtime is None:
runtime = await create_runtime(config, sid=sid, headless_mode=headless_mode)
runtime = create_runtime(config, sid=sid, headless_mode=headless_mode)
await runtime.connect()
event_stream = runtime.event_stream

View File

@@ -199,7 +199,8 @@ async def process_issue(
)
config.set_llm_config(llm_config)
runtime = await create_runtime(config)
runtime = create_runtime(config)
await runtime.connect()
async def on_event(evt):
logger.info(evt)

View File

@@ -2,8 +2,6 @@ import atexit
import copy
import json
import os
import random
import string
from abc import abstractmethod
from pathlib import Path
from typing import Callable
@@ -205,13 +203,8 @@ class Runtime(FileEditRuntimeMixin):
return
url = f'https://{github_token}@github.com/{selected_repository}.git'
dir_name = selected_repository.split('/')[1]
# add random branch name to avoid conflicts
random_str = ''.join(
random.choices(string.ascii_lowercase + string.digits, k=8)
)
branch_name = f'openhands-workspace-{random_str}'
action = CmdRunAction(
command=f'git clone {url} {dir_name} ; cd {dir_name} ; git checkout -b {branch_name}',
command=f'git clone {url} {dir_name} ; cd {dir_name} ; git checkout -b openhands-workspace'
)
self.log('info', f'Cloning repo: {selected_repository}')
self.run_action(action)

View File

@@ -1,21 +0,0 @@
"""Container-related types and utilities."""
import docker
class ContainerInfo:
"""Information about a running container that a Runtime needs."""
def __init__(
self,
container_id: str,
api_url: str,
host_port: int,
container_port: int,
container: docker.models.containers.Container,
):
self.container_id = container_id
self.api_url = api_url
self.host_port = host_port
self.container_port = container_port
self.container = container

View File

@@ -1,17 +1,26 @@
import atexit
import os
import tempfile
import threading
from functools import lru_cache
from pathlib import Path
from typing import Callable
from zipfile import ZipFile
import docker
import requests
import tenacity
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeDisconnectedError,
AgentRuntimeError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeTimeoutError,
)
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import (
ActionConfirmationStatus,
@@ -33,11 +42,24 @@ from openhands.events.observation import (
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.container import ContainerInfo
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
def remove_all_runtime_containers():
remove_all_containers(CONTAINER_NAME_PREFIX)
_atexit_registered = False
class EventStreamRuntime(Runtime):
@@ -52,6 +74,8 @@ class EventStreamRuntime(Runtime):
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
"""
# Need to provide this method to allow inheritors to init the Runtime
# without initting the EventStreamRuntime.
def init_base_runtime(
self,
config: AppConfig,
@@ -84,15 +108,31 @@ class EventStreamRuntime(Runtime):
status_callback: Callable | None = None,
attach_to_existing: bool = False,
headless_mode: bool = True,
container_info: ContainerInfo | None = None,
):
global _atexit_registered
if not _atexit_registered:
_atexit_registered = True
atexit.register(remove_all_runtime_containers)
self.config = config
self._vscode_url: str | None = None
self._host_port = 30000 # initial dummy value
self._container_port = 30001 # initial dummy value
self._vscode_url: str | None = None # initial dummy value
self._runtime_initialized: bool = False
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.session = requests.Session()
self.status_callback = status_callback
self.container_info = container_info
self.docker_client: docker.DockerClient = self._init_docker_client()
self.base_container_image = self.config.sandbox.base_container_image
self.runtime_container_image = self.config.sandbox.runtime_container_image
self.container_name = CONTAINER_NAME_PREFIX + sid
self.container = None
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
# Buffer for container logs
self.log_streamer: LogStreamer | None = None
self.init_base_runtime(
@@ -114,22 +154,52 @@ class EventStreamRuntime(Runtime):
)
async def connect(self):
"""Initialize the runtime with the provided container."""
if not self.container_info:
raise RuntimeError('Container info not provided')
self.send_status_message('STATUS$STARTING_RUNTIME')
self.log_streamer = LogStreamer(self.container_info.container, self.log)
try:
await call_sync_from_async(self._attach_to_container)
except docker.errors.NotFound as e:
if self.attach_to_existing:
self.log(
'error',
f'Container {self.container_name} not found.',
)
raise e
if self.runtime_container_image is None:
if self.base_container_image is None:
raise ValueError(
'Neither runtime container image nor base container image is set'
)
self.send_status_message('STATUS$STARTING_CONTAINER')
self.runtime_container_image = build_runtime_image(
self.base_container_image,
self.runtime_builder,
platform=self.config.sandbox.platform,
extra_deps=self.config.sandbox.runtime_extra_deps,
force_rebuild=self.config.sandbox.force_rebuild_runtime,
extra_build_args=self.config.sandbox.runtime_extra_build_args,
)
if not self.attach_to_existing:
self.log(
'info', f'Starting runtime with image: {self.runtime_container_image}'
)
await call_sync_from_async(self._init_container)
self.log(
'info',
f'Waiting for client to become ready at {self.container_info.api_url}...',
f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
)
self.log_streamer = LogStreamer(self.container, self.log)
if not self.attach_to_existing:
self.log('info', f'Waiting for client to become ready at {self.api_url}...')
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
await call_sync_from_async(self._wait_until_alive)
if not self.attach_to_existing:
self.log('info', 'Runtime is ready.')
if not self.attach_to_existing:
await call_sync_from_async(self.setup_initial_env)
self.log(
@@ -140,24 +210,207 @@ class EventStreamRuntime(Runtime):
self.send_status_message(' ')
self._runtime_initialized = True
def close(self):
"""Closes the EventStreamRuntime and associated objects."""
@staticmethod
@lru_cache(maxsize=1)
def _init_docker_client() -> docker.DockerClient:
try:
return docker.from_env()
except Exception as ex:
logger.error(
'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
)
raise ex
def _init_container(self):
self.log('debug', 'Preparing to start container...')
self.send_status_message('STATUS$PREPARING_CONTAINER')
plugin_arg = ''
if self.plugins is not None and len(self.plugins) > 0:
plugin_arg = (
f'--plugins {" ".join([plugin.name for plugin in self.plugins])} '
)
self._host_port = self._find_available_port()
self._container_port = (
self._host_port
) # in future this might differ from host port
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
use_host_network = self.config.sandbox.use_host_network
network_mode: str | None = 'host' if use_host_network else None
port_mapping: dict[str, list[dict[str, str]]] | None = (
None
if use_host_network
else {f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]}
)
if use_host_network:
self.log(
'warn',
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
)
# Combine environment variables
environment = {
'port': str(self._container_port),
'PYTHONUNBUFFERED': 1,
}
if self.config.debug or DEBUG:
environment['DEBUG'] = 'true'
if self.vscode_enabled:
# vscode is on port +1 from container port
if isinstance(port_mapping, dict):
port_mapping[f'{self._container_port + 1}/tcp'] = [
{'HostPort': str(self._host_port + 1)}
]
self.log('debug', f'Workspace Base: {self.config.workspace_base}')
if (
self.config.workspace_mount_path is not None
and self.config.workspace_mount_path_in_sandbox is not None
):
# e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
volumes = {
self.config.workspace_mount_path: {
'bind': self.config.workspace_mount_path_in_sandbox,
'mode': 'rw',
}
}
logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
else:
logger.debug(
'Mount dir is not set, will not mount the workspace directory to the container'
)
volumes = None
self.log(
'debug',
f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
)
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
)
else:
browsergym_arg = ''
try:
self.container = self.docker_client.containers.run(
self.runtime_container_image,
command=(
f'/openhands/micromamba/bin/micromamba run -n openhands '
f'poetry run '
f'python -u -m openhands.runtime.action_execution_server {self._container_port} '
f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
f'{plugin_arg}'
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
f'--user-id {self.config.sandbox.user_id} '
f'{browsergym_arg}'
),
network_mode=network_mode,
ports=port_mapping,
working_dir='/openhands/code/', # do not change this!
name=self.container_name,
detach=True,
environment=environment,
volumes=volumes,
)
self.log('debug', f'Container started. Server url: {self.api_url}')
self.send_status_message('STATUS$CONTAINER_STARTED')
except docker.errors.APIError as e:
if '409' in str(e):
self.log(
'warning',
f'Container {self.container_name} already exists. Removing...',
)
remove_all_containers(self.container_name)
return self._init_container()
else:
self.log(
'error',
f'Error: Instance {self.container_name} FAILED to start container!\n',
)
except Exception as e:
self.log(
'error',
f'Error: Instance {self.container_name} FAILED to start container!\n',
)
self.log('error', str(e))
self.close()
raise e
def _attach_to_container(self):
self._container_port = 0
self.container = self.docker_client.containers.get(self.container_name)
for port in self.container.attrs['NetworkSettings']['Ports']: # type: ignore
self._container_port = int(port.split('/')[0])
break
self._host_port = self._container_port
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.log(
'debug',
f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
)
@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def _wait_until_alive(self):
try:
container = self.docker_client.containers.get(self.container_name)
if container.status == 'exited':
raise AgentRuntimeDisconnectedError(
f'Container {self.container_name} has exited.'
)
except docker.errors.NotFound:
raise AgentRuntimeNotFoundError(
f'Container {self.container_name} not found.'
)
if not self.log_streamer:
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
with send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
):
pass
def close(self, rm_all_containers: bool | None = None):
"""Closes the EventStreamRuntime and associated objects
Parameters:
- rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
"""
if self.log_streamer:
self.log_streamer.close()
if self.session:
self.session.close()
if rm_all_containers is None:
rm_all_containers = self.config.sandbox.rm_all_containers
if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
return
close_prefix = (
CONTAINER_NAME_PREFIX if rm_all_containers else self.container_name
)
remove_all_containers(close_prefix)
def run_action(self, action: Action) -> Observation:
if isinstance(action, FileEditAction):
return self.edit(action)
if not self.container_info:
return ErrorObservation(
'Runtime container is not initialized.',
error_id='AGENT_ERROR$RUNTIME_NOT_READY',
)
# set timeout to default if not set
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
@@ -193,7 +446,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.container_info.api_url}/execute_action',
f'{self.api_url}/execute_action',
json={'action': event_to_dict(action)},
# wait a few more seconds to get the timeout error from client side
timeout=action.timeout + 5,
@@ -226,15 +479,16 @@ class EventStreamRuntime(Runtime):
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return self.run_action(action)
# ====================================================================
# Implement these methods (for file operations) in the subclass
# ====================================================================
def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
if recursive:
# For recursive copy, create a zip file
@@ -262,7 +516,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.container_info.api_url}/upload_file',
f'{self.api_url}/upload_file',
files=upload_data,
params=params,
timeout=300,
@@ -285,8 +539,6 @@ class EventStreamRuntime(Runtime):
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
data = {}
@@ -296,7 +548,7 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'POST',
f'{self.container_info.api_url}/list_files',
f'{self.api_url}/list_files',
json=data,
timeout=10,
) as response:
@@ -308,15 +560,13 @@ class EventStreamRuntime(Runtime):
def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
if not self.container_info:
raise RuntimeError('Runtime container is not initialized.')
try:
params = {'path': path}
with send_request(
self.session,
'GET',
f'{self.container_info.api_url}/download_files',
f'{self.api_url}/download_files',
params=params,
stream=True,
timeout=30,
@@ -329,9 +579,26 @@ class EventStreamRuntime(Runtime):
except requests.Timeout:
raise TimeoutError('Copy operation timed out')
def _is_port_in_use_docker(self, port):
containers = self.docker_client.containers.list()
for container in containers:
container_ports = container.ports
if str(port) in str(container_ports):
return True
return False
def _find_available_port(self, max_attempts=5):
port = 39999
for _ in range(max_attempts):
port = find_available_tcp_port(30000, 39999)
if not self._is_port_in_use_docker(port):
return port
# If no port is found after max_attempts, return the last tried port
return port
@property
def vscode_url(self) -> str | None:
if self.vscode_enabled and self._runtime_initialized and self.container_info:
if self.vscode_enabled and self._runtime_initialized:
if (
hasattr(self, '_vscode_url') and self._vscode_url is not None
): # cached value
@@ -340,14 +607,14 @@ class EventStreamRuntime(Runtime):
with send_request(
self.session,
'GET',
f'{self.container_info.api_url}/vscode/connection_token',
f'{self.api_url}/vscode/connection_token',
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, dict)
if response_json['token'] is None:
return None
self._vscode_url = f'http://localhost:{self.container_info.host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self._vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self.log(
'debug',
f'VSCode URL: {self._vscode_url}',

View File

@@ -1,456 +0,0 @@
import atexit
import functools
from typing import Dict, List, Optional
import docker
import requests
import tenacity
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeDisconnectedError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeUnavailableError,
)
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.base import Runtime
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.container import ContainerInfo
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
from openhands.runtime.plugins import PluginRequirement, VSCodeRequirement
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.tenacity_stop import stop_if_should_exit
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
_atexit_registered = False
class EventStreamRuntimeManager(RuntimeManager):
"""Manages Docker container lifecycle for EventStreamRuntime instances."""
def __init__(self, config: AppConfig):
super().__init__(config)
global _atexit_registered
if not _atexit_registered:
_atexit_registered = True
atexit.register(self._cleanup_all_containers)
self._containers: Dict[str, ContainerInfo] = {}
self._docker_client = self._init_docker_client()
self._runtime_builder: DockerRuntimeBuilder = DockerRuntimeBuilder(
self._docker_client
)
@staticmethod
@functools.lru_cache(maxsize=1)
def _init_docker_client() -> docker.DockerClient:
try:
return docker.from_env()
except Exception as ex:
logger.error(
'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
)
raise ex
async def create_runtime(
self,
event_stream: EventStream,
sid: str,
plugins: Optional[List[PluginRequirement]] = None,
env_vars: Optional[Dict[str, str]] = None,
status_callback=None,
attach_to_existing: bool = False,
headless_mode: bool = False,
) -> Runtime:
"""Create a new EventStreamRuntime with an initialized container.
This overrides the base create_runtime to handle container initialization
before creating the runtime.
"""
if sid in self._runtimes:
raise RuntimeError(f'Runtime with ID {sid} already exists')
# First initialize or attach to the container
try:
if attach_to_existing:
container_info = self.attach_to_container(sid)
else:
runtime_container_image = self.config.sandbox.runtime_container_image
if runtime_container_image is None:
if self.config.sandbox.base_container_image is None:
raise ValueError(
'Neither runtime container image nor base container image is set'
)
if status_callback:
status_callback('info', 'STATUS$STARTING_CONTAINER')
runtime_container_image = build_runtime_image(
self.config.sandbox.base_container_image,
self._runtime_builder,
platform=self.config.sandbox.platform,
extra_deps=self.config.sandbox.runtime_extra_deps,
force_rebuild=self.config.sandbox.force_rebuild_runtime,
extra_build_args=self.config.sandbox.runtime_extra_build_args,
)
container_info = self.initialize_container(
runtime_container_image,
sid,
plugins,
env_vars,
status_callback,
)
# Create the runtime with the initialized container
runtime = EventStreamRuntime(
config=self.config,
event_stream=event_stream,
sid=sid,
plugins=plugins,
env_vars=env_vars,
status_callback=status_callback,
attach_to_existing=attach_to_existing,
headless_mode=headless_mode,
container_info=container_info,
)
# Initialize the runtime
try:
await runtime.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
if status_callback:
status_callback(
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
)
self._cleanup_container(sid)
raise
self._runtimes[sid] = runtime
logger.info(f'Created runtime with ID: {sid}')
return runtime
except Exception as e:
logger.error(f'Failed to create runtime: {str(e)}')
self._cleanup_container(sid)
raise
def _is_port_in_use_docker(self, port):
containers = self._docker_client.containers.list()
for container in containers:
container_ports = container.ports
if str(port) in str(container_ports):
return True
return False
def _find_available_port(self, max_attempts=5):
port = 39999
for _ in range(max_attempts):
port = find_available_tcp_port(30000, 39999)
if not self._is_port_in_use_docker(port):
return port
return port
def initialize_container(
self,
runtime_container_image: str,
sid: str,
plugins: Optional[list[PluginRequirement]] = None,
env_vars: Optional[Dict[str, str]] = None,
status_callback=None,
) -> ContainerInfo:
"""Initialize a new container for a runtime.
Args:
runtime_container_image: The Docker image to use
sid: The session ID that will be used to generate the container name
plugins: Optional list of plugins to enable
env_vars: Optional environment variables to set
status_callback: Optional callback for status updates
Returns:
ContainerInfo object with connection details
"""
logger.debug('Preparing to start container...')
if status_callback:
status_callback('info', 'STATUS$PREPARING_CONTAINER')
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
if container_name in self._containers:
raise RuntimeError(f'Container {container_name} already exists')
# Find an available port
container_port = self._find_available_port()
host_port = container_port # In future this might differ
plugin_arg = ''
if plugins:
plugin_arg = f'--plugins {" ".join([plugin.name for plugin in plugins])} '
use_host_network = self.config.sandbox.use_host_network
network_mode: str | None = 'host' if use_host_network else None
port_mapping: dict[str, list[dict[str, str]]] | None = (
None
if use_host_network
else {f'{container_port}/tcp': [{'HostPort': str(host_port)}]}
)
if use_host_network:
logger.warn(
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
)
environment = {
'port': str(container_port),
'PYTHONUNBUFFERED': '1',
**(env_vars or {}),
}
if self.config.debug or DEBUG:
environment['DEBUG'] = 'true'
if any(isinstance(plugin, VSCodeRequirement) for plugin in (plugins or [])):
if isinstance(port_mapping, dict):
port_mapping[f'{container_port + 1}/tcp'] = [
{'HostPort': str(host_port + 1)}
]
logger.debug(f'Workspace Base: {self.config.workspace_base}')
if (
self.config.workspace_mount_path is not None
and self.config.workspace_mount_path_in_sandbox is not None
):
volumes = {
self.config.workspace_mount_path: {
'bind': self.config.workspace_mount_path_in_sandbox,
'mode': 'rw',
}
}
logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
else:
logger.debug(
'Mount dir is not set, will not mount the workspace directory to the container'
)
volumes = {}
logger.debug(
f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
)
browsergym_arg = ''
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
)
try:
container = self._docker_client.containers.run(
runtime_container_image,
command=(
f'/openhands/micromamba/bin/micromamba run -n openhands '
f'poetry run '
f'python -u -m openhands.runtime.action_execution_server {container_port} '
f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
f'{plugin_arg}'
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
f'--user-id {self.config.sandbox.user_id} '
f'{browsergym_arg}'
),
network_mode=network_mode,
ports=port_mapping,
working_dir='/openhands/code/',
name=container_name,
detach=True,
environment=environment,
volumes=volumes,
)
api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'
logger.debug(f'Container started. Server url: {api_url}')
if status_callback:
status_callback('info', 'STATUS$CONTAINER_STARTED')
container_info = ContainerInfo(
container_id=container.id,
api_url=api_url,
host_port=host_port,
container_port=container_port,
container=container,
)
self._containers[container_name] = container_info
return container_info
except docker.errors.APIError as e:
if '409' in str(e):
logger.warning(
f'Container {container_name} already exists. Removing...',
)
self._cleanup_container(container_name)
return self.initialize_container(
runtime_container_image,
sid,
plugins,
env_vars,
status_callback,
)
else:
logger.error(
f'Error: Instance {container_name} FAILED to start container!\n',
)
raise
except Exception as e:
logger.error(
f'Error: Instance {container_name} FAILED to start container!\n',
)
logger.error(str(e))
raise
def attach_to_container(self, sid: str) -> ContainerInfo:
"""Attach to an existing container.
Args:
sid: The session ID used to generate the container name
Returns:
ContainerInfo object with connection details
Raises:
AgentRuntimeNotFoundError: If the container doesn't exist
"""
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
# Check if we already have the container info
if container_name in self._containers:
return self._containers[container_name]
try:
container = self._docker_client.containers.get(container_name)
container_port = 0
for port in container.attrs['NetworkSettings']['Ports']:
container_port = int(port.split('/')[0])
break
host_port = container_port # In future this might differ
api_url = f'{self.config.sandbox.local_runtime_url}:{container_port}'
container_info = ContainerInfo(
container_id=container.id,
api_url=api_url,
host_port=host_port,
container_port=container_port,
container=container,
)
self._containers[container_name] = container_info
return container_info
except docker.errors.NotFound:
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def wait_until_alive(
self,
sid: str,
log_streamer: Optional[LogStreamer] = None,
):
"""Wait until a container is ready to accept connections.
Args:
sid: The session ID used to generate the container name
log_streamer: Optional log streamer that must be ready
Raises:
AgentRuntimeNotFoundError: If the container doesn't exist
AgentRuntimeDisconnectedError: If the container has exited
AgentRuntimeNotReadyError: If the log streamer isn't ready
"""
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
container_info = self._containers.get(container_name)
if not container_info:
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
try:
if container_info.container.status == 'exited':
raise AgentRuntimeDisconnectedError(
f'Container {container_name} has exited.'
)
except docker.errors.NotFound:
raise AgentRuntimeNotFoundError(f'Container {container_name} not found.')
if not log_streamer:
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
with send_request(
requests.Session(),
'GET',
f'{container_info.api_url}/alive',
timeout=5,
):
pass
def _cleanup_container(self, sid: str, remove_all: bool = False) -> None:
"""Clean up a container and its resources.
Args:
sid: The session ID used to generate the container name
remove_all: If True, remove all containers with the same prefix
"""
container_name = f'{CONTAINER_NAME_PREFIX}{sid}'
if remove_all:
self._cleanup_all_containers()
else:
try:
container = self._docker_client.containers.get(container_name)
container.remove(force=True)
if container_name in self._containers:
del self._containers[container_name]
except docker.errors.NotFound:
pass
def _cleanup_all_containers(self):
"""Clean up all containers managed by this RuntimeManager."""
containers = self._docker_client.containers.list(all=True)
for container in containers:
if container.name.startswith(CONTAINER_NAME_PREFIX):
try:
container.remove(force=True)
except docker.errors.NotFound:
pass
self._containers.clear()
def destroy_runtime(self, runtime_id: str) -> bool:
"""Destroy a runtime and its container.
Args:
runtime_id: The runtime ID to destroy
Returns:
True if the runtime was found and destroyed, False otherwise
"""
runtime = self._runtimes.get(runtime_id)
if runtime:
runtime.close()
self._cleanup_container(runtime_id)
del self._runtimes[runtime_id]
logger.info(f'Destroyed runtime with ID: {runtime_id}')
return True
return False
async def destroy_all_runtimes(self):
"""Destroy all runtimes and their containers."""
for runtime_id in list(self._runtimes.keys()):
self.destroy_runtime(runtime_id)
self._cleanup_all_containers()

View File

@@ -16,7 +16,6 @@ from openhands.runtime.impl.eventstream.eventstream_runtime import (
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import (
BuildFromImageType,
prep_build_folder,
@@ -182,7 +181,7 @@ class ModalRuntime(EventStreamRuntime):
self.log('debug', 'Waiting for client to become ready...')
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
await call_sync_from_async(self._wait_until_alive)
self._wait_until_alive()
self.setup_initial_env()
if not self.attach_to_existing:
@@ -291,26 +290,6 @@ echo 'export INPUTRC=/etc/inputrc' >> /etc/bash.bashrc
self.close()
raise e
@tenacity.retry(
stop=tenacity.stop_after_delay(120),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def _wait_until_alive(self):
if not self.sandbox:
raise RuntimeError('Sandbox not initialized')
with send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
):
pass
def close(self):
"""Closes the ModalRuntime and associated objects."""
if self.log_streamer:

View File

@@ -1,77 +0,0 @@
from typing import Dict, List, Optional
from openhands.core.config import AppConfig
from openhands.core.exceptions import AgentRuntimeUnavailableError
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.singleton import Singleton
class RuntimeManager(metaclass=Singleton):
def __init__(self, config: AppConfig):
self._runtimes: Dict[str, Runtime] = {}
self._config = config
@property
def config(self) -> AppConfig:
return self._config
async def create_runtime(
self,
event_stream: EventStream,
sid: str,
plugins: Optional[List[PluginRequirement]] = None,
env_vars: Optional[Dict[str, str]] = None,
status_callback=None,
attach_to_existing: bool = False,
headless_mode: bool = False,
) -> Runtime:
if sid in self._runtimes:
raise RuntimeError(f'Runtime with ID {sid} already exists')
runtime_class = get_runtime_cls(self.config.runtime)
logger.debug(f'Initializing runtime: {runtime_class.__name__}')
runtime = runtime_class(
config=self.config,
event_stream=event_stream,
sid=sid,
plugins=plugins,
env_vars=env_vars,
status_callback=status_callback,
attach_to_existing=attach_to_existing,
headless_mode=headless_mode,
)
try:
await runtime.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
if status_callback:
status_callback('error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e))
raise
self._runtimes[sid] = runtime
logger.info(f'Created runtime with ID: {sid}')
return runtime
def get_runtime(self, sid: str) -> Optional[Runtime]:
return self._runtimes.get(sid)
def list_runtimes(self) -> List[str]:
return list(self._runtimes.keys())
def destroy_runtime(self, sid: str) -> bool:
runtime = self._runtimes.get(sid)
if runtime:
runtime.close()
del self._runtimes[sid]
logger.info(f'Destroyed runtime with ID: {sid}')
return True
return False
async def destroy_all_runtimes(self):
for runtime_id in list(self._runtimes.keys()):
self.destroy_runtime(runtime_id)

View File

@@ -1,14 +0,0 @@
class Singleton(type):
"""Metaclass for creating singleton classes.
Usage:
class MyClass(metaclass=Singleton):
pass
"""
_instances: dict = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]

View File

@@ -15,7 +15,6 @@ from openhands.server.middleware import (
LocalhostCORSMiddleware,
NoCacheMiddleware,
RateLimitMiddleware,
session_manager,
)
from openhands.server.routes.conversation import app as conversation_api_router
from openhands.server.routes.feedback import app as feedback_api_router
@@ -25,7 +24,7 @@ from openhands.server.routes.new_conversation import app as new_conversation_api
from openhands.server.routes.public import app as public_api_router
from openhands.server.routes.security import app as security_api_router
from openhands.server.routes.settings import app as settings_router
from openhands.server.shared import openhands_config
from openhands.server.shared import openhands_config, session_manager
from openhands.utils.import_utils import get_impl

View File

@@ -13,9 +13,8 @@ from openhands.events.observation import (
from openhands.events.observation.agent import AgentStateChangedObservation
from openhands.events.serialization import event_to_dict
from openhands.events.stream import AsyncEventStreamWrapper
from openhands.server.middleware import session_manager
from openhands.server.session.manager import ConversationDoesNotExistError
from openhands.server.shared import config, openhands_config, sio
from openhands.server.shared import config, openhands_config, session_manager, sio
from openhands.server.types import AppMode
from openhands.storage.conversation.conversation_store import (
ConversationStore,

View File

@@ -10,12 +10,9 @@ from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
from openhands.server.session import SessionManager
from openhands.server.shared import config, file_store, runtime_manager, sio
from openhands.server.shared import session_manager
from openhands.server.types import SessionMiddlewareInterface
session_manager = SessionManager(sio, config, file_store)
class LocalhostCORSMiddleware(CORSMiddleware):
"""
@@ -137,17 +134,10 @@ class AttachConversationMiddleware(SessionMiddlewareInterface):
"""
Attach the user's session based on the provided authentication token.
"""
request.state.runtime = runtime_manager.get_runtime(request.state.sid)
if request.state.runtime is None:
event_stream = await session_manager.get_event_stream(request.state.sid)
if event_stream:
request.state.runtime = await runtime_manager.create_runtime(
event_stream=event_stream,
sid=request.state.sid,
attach_to_existing=True,
headless_mode=False,
)
if not request.state.runtime:
request.state.conversation = await session_manager.attach_to_conversation(
request.state.sid
)
if not request.state.conversation:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Session not found'},
@@ -158,7 +148,7 @@ class AttachConversationMiddleware(SessionMiddlewareInterface):
"""
Detach the user's session.
"""
pass
await session_manager.detach_from_conversation(request.state.conversation)
async def __call__(self, request: Request, call_next: Callable):
if not self._should_attach(request):

View File

@@ -13,7 +13,7 @@ async def get_remote_runtime_config(request: Request):
Currently, this is the session ID and runtime ID (if available).
"""
runtime = request.state.runtime
runtime = request.state.conversation.runtime
runtime_id = runtime.runtime_id if hasattr(runtime, 'runtime_id') else None
session_id = runtime.sid if hasattr(runtime, 'sid') else None
return JSONResponse(
@@ -37,7 +37,7 @@ async def get_vscode_url(request: Request):
JSONResponse: A JSON response indicating the success of the operation.
"""
try:
runtime: Runtime = request.state.runtime
runtime: Runtime = request.state.conversation.runtime
logger.debug(f'Runtime type: {type(runtime)}')
logger.debug(f'Runtime VSCode URL: {runtime.vscode_url}')
return JSONResponse(status_code=200, content={'vscode_url': runtime.vscode_url})
@@ -81,12 +81,12 @@ async def search_events(
HTTPException: If conversation is not found
ValueError: If limit is less than 1 or greater than 100
"""
if not request.state.runtime:
if not request.state.conversation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail='Conversation not found'
)
# Get matching events from the stream
event_stream = request.state.runtime.event_stream
event_stream = request.state.conversation.event_stream
matching_events = event_stream.get_matching_events(
query=query,
event_type=event_type,

View File

@@ -35,7 +35,7 @@ async def submit_feedback(request: Request, conversation_id: str):
# and there is a function to handle the storage.
body = await request.json()
async_stream = AsyncEventStreamWrapper(
request.state.runtime.event_stream, filter_hidden=True
request.state.conversation.event_stream, filter_hidden=True
)
trajectory = []
async for event in async_stream:

View File

@@ -58,13 +58,13 @@ async def list_files(request: Request, conversation_id: str, path: str | None =
Raises:
HTTPException: If there's an error listing the files.
"""
if not request.state.runtime:
if not request.state.conversation.runtime:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Runtime not yet initialized'},
)
runtime: Runtime = request.state.runtime
runtime: Runtime = request.state.conversation.runtime
try:
file_list = await call_sync_from_async(runtime.list_files, path)
except AgentRuntimeUnavailableError as e:
@@ -124,7 +124,7 @@ async def select_file(file: str, request: Request):
Raises:
HTTPException: If there's an error opening the file.
"""
runtime: Runtime = request.state.runtime
runtime: Runtime = request.state.conversation.runtime
file = os.path.join(runtime.config.workspace_mount_path_in_sandbox, file)
read_action = FileReadAction(file)
@@ -199,7 +199,7 @@ async def upload_file(request: Request, conversation_id: str, files: list[Upload
tmp_file.write(file_contents)
tmp_file.flush()
runtime: Runtime = request.state.runtime
runtime: Runtime = request.state.conversation.runtime
try:
await call_sync_from_async(
runtime.copy_to,
@@ -276,7 +276,7 @@ async def save_file(request: Request):
raise HTTPException(status_code=400, detail='Missing filePath or content')
# Save the file to the agent's runtime file store
runtime: Runtime = request.state.runtime
runtime: Runtime = request.state.conversation.runtime
file_path = os.path.join(
runtime.config.workspace_mount_path_in_sandbox, file_path
)
@@ -316,7 +316,7 @@ async def zip_current_workspace(
):
try:
logger.debug('Zipping workspace')
runtime: Runtime = request.state.runtime
runtime: Runtime = request.state.conversation.runtime
path = runtime.config.workspace_mount_path_in_sandbox
try:
zip_file = await call_sync_from_async(runtime.copy_from, path)

View File

@@ -6,10 +6,9 @@ from github import Github
from pydantic import BaseModel
from openhands.core.logger import openhands_logger as logger
from openhands.server.middleware import session_manager
from openhands.server.routes.settings import SettingsStoreImpl
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.server.shared import config
from openhands.server.shared import config, session_manager
from openhands.storage.conversation.conversation_store import (
ConversationMetadata,
ConversationStore,

View File

@@ -4,9 +4,6 @@ from fastapi import (
Request,
)
from openhands.security import SecurityAnalyzer, options
from openhands.server.shared import config
app = APIRouter(prefix='/api/conversations/{conversation_id}')
@@ -25,10 +22,9 @@ async def security_api(request: Request):
Raises:
HTTPException: If the security analyzer is not initialized.
"""
if not request.state.runtime:
if not request.state.conversation.security_analyzer:
raise HTTPException(status_code=404, detail='Security analyzer not initialized')
security_analyzer = options.SecurityAnalyzers.get(
config.security.security_analyzer or '', SecurityAnalyzer
)(request.state.runtime.event_stream)
return await security_analyzer.handle_api_request(request)
return await request.state.conversation.security_analyzer.handle_api_request(
request
)

View File

@@ -4,16 +4,16 @@ from typing import Callable, Optional
from openhands.controller import AgentController
from openhands.controller.agent import Agent
from openhands.controller.state.state import State
from openhands.core.config import AgentConfig, LLMConfig
from openhands.core.config import AgentConfig, AppConfig, LLMConfig
from openhands.core.exceptions import AgentRuntimeUnavailableError
from openhands.core.logger import openhands_logger as logger
from openhands.core.schema.agent import AgentState
from openhands.events.action import ChangeAgentStateAction
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.security import SecurityAnalyzer, options
from openhands.server.shared import runtime_manager
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_async_from_sync, call_sync_from_async
from openhands.utils.shutdown_listener import should_continue
@@ -59,6 +59,8 @@ class AgentSession:
async def start(
self,
runtime_name: str,
config: AppConfig,
agent: Agent,
max_iterations: int,
max_budget_per_task: float | None = None,
@@ -69,6 +71,8 @@ class AgentSession:
):
"""Starts the Agent session
Parameters:
- runtime_name: The name of the runtime associated with the session
- config:
- agent:
- max_iterations:
- max_budget_per_task:
@@ -83,6 +87,8 @@ class AgentSession:
asyncio.get_event_loop().run_in_executor(
None,
self._start_thread,
runtime_name,
config,
agent,
max_iterations,
max_budget_per_task,
@@ -101,6 +107,8 @@ class AgentSession:
async def _start(
self,
runtime_name: str,
config: AppConfig,
agent: Agent,
max_iterations: int,
max_budget_per_task: float | None = None,
@@ -113,10 +121,10 @@ class AgentSession:
logger.warning('Session closed before starting')
return
self._initializing = True
self._create_security_analyzer(
runtime_manager.config.security.security_analyzer
)
self._create_security_analyzer(config.security.security_analyzer)
await self._create_runtime(
runtime_name=runtime_name,
config=config,
agent=agent,
github_token=github_token,
selected_repository=selected_repository,
@@ -124,7 +132,7 @@ class AgentSession:
self.controller = self._create_controller(
agent,
runtime_manager.config.security.confirmation_mode,
config.security.confirmation_mode,
max_iterations,
max_budget_per_task=max_budget_per_task,
agent_to_llm_config=agent_to_llm_config,
@@ -162,7 +170,7 @@ class AgentSession:
end_state.save_to_session(self.sid, self.file_store)
await self.controller.close()
if self.runtime is not None:
runtime_manager.destroy_runtime(self.sid)
self.runtime.close()
if self.security_analyzer is not None:
await self.security_analyzer.close()
@@ -185,6 +193,8 @@ class AgentSession:
async def _create_runtime(
self,
runtime_name: str,
config: AppConfig,
agent: Agent,
github_token: str | None = None,
selected_repository: str | None = None,
@@ -192,28 +202,38 @@ class AgentSession:
"""Creates a runtime instance
Parameters:
- runtime_name: The name of the runtime associated with the session
- config:
- agent:
"""
if self.runtime is not None:
raise RuntimeError('Runtime already created')
logger.debug(f'Initializing runtime `{runtime_name}` now...')
runtime_cls = get_runtime_cls(runtime_name)
self.runtime = runtime_cls(
config=config,
event_stream=self.event_stream,
sid=self.sid,
plugins=agent.sandbox_plugins,
status_callback=self._status_callback,
headless_mode=False,
)
# FIXME: this sleep is a terrible hack.
# This is to give the websocket a second to connect, so that
# the status messages make it through to the frontend.
# We should find a better way to plumb status messages through.
await asyncio.sleep(1)
try:
self.runtime = await runtime_manager.create_runtime(
event_stream=self.event_stream,
sid=self.sid,
plugins=agent.sandbox_plugins,
status_callback=self._status_callback,
headless_mode=False,
)
await self.runtime.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
if self._status_callback:
self._status_callback(
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
)
return
self.runtime.clone_repo(github_token, selected_repository)

View File

@@ -0,0 +1,46 @@
import asyncio
from openhands.core.config import AppConfig
from openhands.events.stream import EventStream
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.security import SecurityAnalyzer, options
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_sync_from_async
class Conversation:
sid: str
file_store: FileStore
event_stream: EventStream
runtime: Runtime
def __init__(
self,
sid: str,
file_store: FileStore,
config: AppConfig,
):
self.sid = sid
self.config = config
self.file_store = file_store
self.event_stream = EventStream(sid, file_store)
if config.security.security_analyzer:
self.security_analyzer = options.SecurityAnalyzers.get(
config.security.security_analyzer, SecurityAnalyzer
)(self.event_stream)
runtime_cls = get_runtime_cls(self.config.runtime)
self.runtime = runtime_cls(
config=config,
event_stream=self.event_stream,
sid=self.sid,
attach_to_existing=True,
headless_mode=False,
)
async def connect(self):
await self.runtime.connect()
async def disconnect(self):
asyncio.create_task(call_sync_from_async(self.runtime.close))

View File

@@ -6,11 +6,12 @@ from dataclasses import dataclass, field
import socketio
from openhands.core.config import AppConfig
from openhands.core.exceptions import AgentRuntimeUnavailableError
from openhands.core.logger import openhands_logger as logger
from openhands.events.stream import EventStream
from openhands.events.stream import EventStream, session_exists
from openhands.server.session.conversation import Conversation
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.server.session.session import ROOM_KEY, Session
from openhands.server.shared import runtime_manager
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.shutdown_listener import should_continue
@@ -18,6 +19,9 @@ from openhands.utils.shutdown_listener import should_continue
_REDIS_POLL_TIMEOUT = 1.5
_CHECK_ALIVE_INTERVAL = 15
_CLEANUP_INTERVAL = 15
_CLEANUP_EXCEPTION_WAIT_TIME = 15
class ConversationDoesNotExistError(Exception):
pass
@@ -33,6 +37,14 @@ class SessionManager:
_last_alive_timestamps: dict[str, float] = field(default_factory=dict)
_redis_listen_task: asyncio.Task | None = None
_session_is_running_flags: dict[str, asyncio.Event] = field(default_factory=dict)
_active_conversations: dict[str, tuple[Conversation, int]] = field(
default_factory=dict
)
_detached_conversations: dict[str, tuple[Conversation, float]] = field(
default_factory=dict
)
_conversations_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
_cleanup_task: asyncio.Task | None = None
_has_remote_connections_flags: dict[str, asyncio.Event] = field(
default_factory=dict
)
@@ -41,12 +53,16 @@ class SessionManager:
redis_client = self._get_redis_client()
if redis_client:
self._redis_listen_task = asyncio.create_task(self._redis_subscribe())
self._cleanup_task = asyncio.create_task(self._cleanup_detached_conversations())
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if self._redis_listen_task:
self._redis_listen_task.cancel()
self._redis_listen_task = None
if self._cleanup_task:
self._cleanup_task.cancel()
self._cleanup_task = None
def _get_redis_client(self):
redis_client = getattr(self.sio.manager, 'redis', None)
@@ -118,7 +134,6 @@ class SessionManager:
# Session closing event - We only get this in the event of graceful shutdown,
# which can't be guaranteed - nodes can simply vanish unexpectedly!
logger.debug(f'session_closing:{sid}')
await call_sync_from_async(runtime_manager.destroy_runtime, sid)
for (
connection_id,
local_sid,
@@ -129,15 +144,96 @@ class SessionManager:
)
await self.sio.disconnect(connection_id)
async def attach_to_conversation(self, sid: str) -> Conversation | None:
start_time = time.time()
if not await session_exists(sid, self.file_store):
return None
async with self._conversations_lock:
# Check if we have an active conversation we can reuse
if sid in self._active_conversations:
conversation, count = self._active_conversations[sid]
self._active_conversations[sid] = (conversation, count + 1)
logger.info(f'Reusing active conversation {sid}')
return conversation
# Check if we have a detached conversation we can reuse
if sid in self._detached_conversations:
conversation, _ = self._detached_conversations.pop(sid)
self._active_conversations[sid] = (conversation, 1)
logger.info(f'Reusing detached conversation {sid}')
return conversation
# Create new conversation if none exists
c = Conversation(sid, file_store=self.file_store, config=self.config)
try:
await c.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Error connecting to conversation {c.sid}: {e}')
return None
end_time = time.time()
logger.info(
f'Conversation {c.sid} connected in {end_time - start_time} seconds'
)
self._active_conversations[sid] = (c, 1)
return c
async def join_conversation(self, sid: str, connection_id: str) -> EventStream:
logger.info(f'join_conversation:{sid}:{connection_id}')
await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
self.local_connection_id_to_session_id[connection_id] = sid
event_stream = await self.get_event_stream(sid)
event_stream = await self._get_event_stream(sid)
if not event_stream:
return await self.maybe_start_agent_loop(sid)
return event_stream
async def detach_from_conversation(self, conversation: Conversation):
sid = conversation.sid
async with self._conversations_lock:
if sid in self._active_conversations:
conv, count = self._active_conversations[sid]
if count > 1:
self._active_conversations[sid] = (conv, count - 1)
return
else:
self._active_conversations.pop(sid)
self._detached_conversations[sid] = (conversation, time.time())
async def _cleanup_detached_conversations(self):
while should_continue():
if self._get_redis_client():
# Debug info for HA envs
logger.info(
f'Attached conversations: {len(self._active_conversations)}'
)
logger.info(
f'Detached conversations: {len(self._detached_conversations)}'
)
logger.info(
f'Running agent loops: {len(self._local_agent_loops_by_sid)}'
)
logger.info(
f'Local connections: {len(self.local_connection_id_to_session_id)}'
)
try:
async with self._conversations_lock:
# Create a list of items to process to avoid modifying dict during iteration
items = list(self._detached_conversations.items())
for sid, (conversation, detach_time) in items:
await conversation.disconnect()
self._detached_conversations.pop(sid, None)
await asyncio.sleep(_CLEANUP_INTERVAL)
except asyncio.CancelledError:
async with self._conversations_lock:
for conversation, _ in self._detached_conversations.values():
await conversation.disconnect()
self._detached_conversations.clear()
return
except Exception:
logger.warning('error_cleaning_detached_conversations', exc_info=True)
await asyncio.sleep(_CLEANUP_EXCEPTION_WAIT_TIME)
async def _is_agent_loop_running(self, sid: str) -> bool:
if await self._is_agent_loop_running_locally(sid):
return True
@@ -214,22 +310,19 @@ class SessionManager:
if not await self._is_agent_loop_running(sid):
logger.info(f'start_agent_loop:{sid}')
session = Session(
sid=sid,
file_store=self.file_store,
config=self.config,
sio=self.sio,
sid=sid, file_store=self.file_store, config=self.config, sio=self.sio
)
self._local_agent_loops_by_sid[sid] = session
await session.initialize_agent(conversation_init_data)
event_stream = await self.get_event_stream(sid)
event_stream = await self._get_event_stream(sid)
if not event_stream:
logger.error(f'No event stream after starting agent loop: {sid}')
raise RuntimeError(f'no_event_stream:{sid}')
return event_stream
async def get_event_stream(self, sid: str) -> EventStream | None:
logger.info(f'get_event_stream:{sid}')
async def _get_event_stream(self, sid: str) -> EventStream | None:
logger.info(f'_get_event_stream:{sid}')
session = self._local_agent_loops_by_sid.get(sid)
if session:
logger.info(f'found_local_agent_loop:{sid}')
@@ -300,7 +393,6 @@ class SessionManager:
async def _cleanup_session(self, sid: str) -> bool:
# Get local connections
logger.info(f'_cleanup_session:{sid}')
await call_sync_from_async(runtime_manager.destroy_runtime, sid)
has_local_connections = next(
(True for v in self.local_connection_id_to_session_id.values() if v == sid),
False,
@@ -329,8 +421,8 @@ class SessionManager:
# Clear up local variables
connection_ids_to_remove = list(
connection_id
for connection_id, conn_sid in self.local_connection_id_to_session_id.items()
if sid == conn_sid
for connection_id, sid in self.local_connection_id_to_session_id.items()
if sid == sid
)
logger.info(f'removing connections: {connection_ids_to_remove}')
for connnnection_id in connection_ids_to_remove:

View File

@@ -52,9 +52,7 @@ class Session:
self.last_active_ts = int(time.time())
self.file_store = file_store
self.agent_session = AgentSession(
sid,
file_store,
status_callback=self.queue_status_message,
sid, file_store, status_callback=self.queue_status_message
)
self.agent_session.event_stream.subscribe(
EventStreamSubscriber.SERVER, self.on_event, self.sid
@@ -133,6 +131,8 @@ class Session:
try:
await self.agent_session.start(
runtime_name=self.config.runtime,
config=self.config,
agent=agent,
max_iterations=max_iterations,
max_budget_per_task=self.config.max_budget_per_task,

View File

@@ -4,8 +4,8 @@ import socketio
from dotenv import load_dotenv
from openhands.core.config import load_app_config
from openhands.runtime.runtime_manager import RuntimeManager
from openhands.server.config.openhands_config import load_openhands_config
from openhands.server.session import SessionManager
from openhands.storage import get_file_store
load_dotenv()
@@ -27,4 +27,4 @@ sio = socketio.AsyncServer(
async_mode='asgi', cors_allowed_origins='*', client_manager=client_manager
)
runtime_manager = RuntimeManager(config)
session_manager = SessionManager(sio, config, file_store)

17
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -3783,19 +3783,19 @@ pydantic = ">=1.10"
[[package]]
name = "llama-index"
version = "0.12.8"
version = "0.12.7"
description = "Interface between LLMs and your data"
optional = false
python-versions = "<4.0,>=3.9"
files = [
{file = "llama_index-0.12.8-py3-none-any.whl", hash = "sha256:6b98ea44c225c7d230fd7f552dfcc2911ef327e3be352dc239011118242e4a28"},
{file = "llama_index-0.12.8.tar.gz", hash = "sha256:f1578bb6873fa4f90a8645a80f4f997d184770e63bd7a2b45a98ab6e5c70fb59"},
{file = "llama_index-0.12.7-py3-none-any.whl", hash = "sha256:9fee54e1dfdee7d1154ae6a702178052c72d81a946fce000eb80dffe98a7e9f6"},
{file = "llama_index-0.12.7.tar.gz", hash = "sha256:2c197246a85de8e472e559b88212e4e92c167fdef9c0b131ae1f760ddcdfaca6"},
]
[package.dependencies]
llama-index-agent-openai = ">=0.4.0,<0.5.0"
llama-index-cli = ">=0.4.0,<0.5.0"
llama-index-core = ">=0.12.8,<0.13.0"
llama-index-core = ">=0.12.7,<0.13.0"
llama-index-embeddings-openai = ">=0.3.0,<0.4.0"
llama-index-indices-managed-llama-cloud = ">=0.4.0"
llama-index-llms-openai = ">=0.3.0,<0.4.0"
@@ -3840,13 +3840,13 @@ llama-index-llms-openai = ">=0.3.0,<0.4.0"
[[package]]
name = "llama-index-core"
version = "0.12.8"
version = "0.12.7"
description = "Interface between LLMs and your data"
optional = false
python-versions = "<4.0,>=3.9"
files = [
{file = "llama_index_core-0.12.8-py3-none-any.whl", hash = "sha256:7ebecbdaa1d5b6a320c050bf90525605ac03b242d26ad55f0e00a0e1df69e070"},
{file = "llama_index_core-0.12.8.tar.gz", hash = "sha256:3b360437b4ae47b7bd1733f6492a95126e6739c7a2fd2b649ebe8bb3afea7143"},
{file = "llama_index_core-0.12.7-py3-none-any.whl", hash = "sha256:691493915598c09b636f964e85b8baca630faa362a4a8ea130ddea8584ab8d0a"},
{file = "llama_index_core-0.12.7.tar.gz", hash = "sha256:9935b249c08f87c124962a8ea1e301e1b5bfa7e3ffd6771b6cb59a0de9bb8cb5"},
]
[package.dependencies]
@@ -5414,7 +5414,6 @@ optional = false
python-versions = ">=3.6"
files = [
{file = "opencv-python-4.10.0.84.tar.gz", hash = "sha256:72d234e4582e9658ffea8e9cae5b63d488ad06994ef12d81dc303b17472f3526"},
{file = "opencv_python-4.10.0.84-cp37-abi3-macosx_11_0_arm64.whl", hash = "sha256:fc182f8f4cda51b45f01c64e4cbedfc2f00aff799debebc305d8d0210c43f251"},
{file = "opencv_python-4.10.0.84-cp37-abi3-macosx_12_0_x86_64.whl", hash = "sha256:71e575744f1d23f79741450254660442785f45a0797212852ee5199ef12eed98"},
{file = "opencv_python-4.10.0.84-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:09a332b50488e2dda866a6c5573ee192fe3583239fb26ff2f7f9ceb0bc119ea6"},
{file = "opencv_python-4.10.0.84-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9ace140fc6d647fbe1c692bcb2abce768973491222c067c131d80957c595b71f"},

View File

@@ -326,8 +326,7 @@ async def test_complete_runtime():
@pytest.mark.asyncio
async def test_process_issue(mock_output_dir, mock_prompt_template):
# Mock dependencies
mock_runtime = MagicMock(connect=AsyncMock())
mock_create_runtime = AsyncMock(return_value=mock_runtime)
mock_create_runtime = MagicMock()
mock_initialize_runtime = AsyncMock()
mock_run_controller = AsyncMock()
mock_complete_runtime = AsyncMock()
@@ -409,9 +408,7 @@ async def test_process_issue(mock_output_dir, mock_prompt_template):
handler_instance.reset_mock()
# Mock return values
mock_runtime = MagicMock(connect=AsyncMock())
mock_create_runtime.return_value = AsyncMock()
mock_create_runtime.return_value.__aenter__.return_value = mock_runtime
mock_create_runtime.return_value = MagicMock(connect=AsyncMock())
if test_case['run_controller_raises']:
mock_run_controller.side_effect = test_case['run_controller_raises']
else:

View File

@@ -237,36 +237,3 @@ async def test_add_to_cluster_event_stream():
'oh_event',
'{"sid": "new-session-id", "message_type": "event", "data": {"event_type": "some_event"}}',
)
@pytest.mark.asyncio
async def test_cleanup_session_connections():
sio = get_mock_sio()
with (
patch('openhands.server.session.manager._REDIS_POLL_TIMEOUT', 0.01),
patch(
'openhands.server.session.manager.SessionManager._redis_subscribe',
AsyncMock(),
),
):
async with SessionManager(
sio, AppConfig(), InMemoryFileStore()
) as session_manager:
session_manager.local_connection_id_to_session_id.update(
{
'conn1': 'session1',
'conn2': 'session1',
'conn3': 'session2',
'conn4': 'session2',
}
)
await session_manager._close_session('session1')
remaining_connections = session_manager.local_connection_id_to_session_id
assert 'conn1' not in remaining_connections
assert 'conn2' not in remaining_connections
assert 'conn3' in remaining_connections
assert 'conn4' in remaining_connections
assert remaining_connections['conn3'] == 'session2'
assert remaining_connections['conn4'] == 'session2'