Files
OpenHands/openhands/runtime/runtime.py
Xingyao Wang 8b1f207d39 feat: support remote runtime (#3406)
* feat: refactor building logic into runtime builder

* return image name

* fix testcases

* use runtime builder for eventstream runtime

* have runtime builder return str

* add api_key to sandbox config

* draft remote runtime

* remove extra if clause

* initialize runtime based on box class

* add build logic

* use base64 for file upload

* get runtime image prefix from API

* replace ___ with _s_ to make it a valid image name

* use /build to start build and /build_status to check the build progress

* update logging

* fix exit code

* always use port

* add remote runtime

* rename runtime

* fix tests import

* make dir first if work_dir does not exists;

* update debug print to remote runtime

* fix exit close_sync

* update logging

* add retry for stop

* use all box class for test keep prompt

* fix test browsing

* add retry stop

* merge init commands to save startup time

* fix await

* remove sandbox url

* support execute through specific runtime url

* fix file ops

* simplify close

* factor out runtime retry code

* fix exception handling

* fix content type error (e.g., bad gateway when runtime is not ready)

* add retry for wait until alive;
add retry for check image exists

* Revert "add retry for wait until alive;"

This reverts commit dd013cd268.

* retry when wait until alive

* clean up msg

* directly save sdist to temp dir for _put_source_code_to_dir

* support running testcases in parallel

* tweak logging;
try to close session

* try to close session even on exception

* update poetry lock

* support remote to run integration tests

* add warning for workspace base on remote runtime

* set default runtime api

* remove server runtime

* update poetry lock

* support running swe-bench (n=1) eval on remoteruntime

* add a timeout of 30 min

* add todo for docker namespace

* update poetry loc
2024-08-29 15:53:37 +00:00

212 lines
7.7 KiB
Python

import asyncio
import atexit
import copy
import json
import os
from abc import abstractmethod
from openhands.core.config import AppConfig, SandboxConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventSource, EventStream, EventStreamSubscriber
from openhands.events.action import (
Action,
ActionConfirmationStatus,
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.event import Event
from openhands.events.observation import (
CmdOutputObservation,
ErrorObservation,
NullObservation,
Observation,
UserRejectObservation,
)
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.plugins import JupyterRequirement, PluginRequirement
def _default_env_vars(sandbox_config: SandboxConfig) -> dict[str, str]:
ret = {}
for key in os.environ:
if key.startswith('SANDBOX_ENV_'):
sandbox_key = key.removeprefix('SANDBOX_ENV_')
ret[sandbox_key] = os.environ[key]
if sandbox_config.enable_auto_lint:
ret['ENABLE_AUTO_LINT'] = 'true'
return ret
class Runtime:
"""The runtime is how the agent interacts with the external environment.
This includes a bash sandbox, a browser, and filesystem interactions.
sid is the session id, which is used to identify the current user session.
"""
sid: str
config: AppConfig
DEFAULT_ENV_VARS: dict[str, str]
def __init__(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
):
self.sid = sid
self.event_stream = event_stream
self.event_stream.subscribe(EventStreamSubscriber.RUNTIME, self.on_event)
self.plugins = plugins if plugins is not None and len(plugins) > 0 else []
self.config = copy.deepcopy(config)
self.DEFAULT_ENV_VARS = _default_env_vars(config.sandbox)
atexit.register(self.close_sync)
logger.debug(f'Runtime `{sid}` config:\n{self.config}')
async def ainit(self, env_vars: dict[str, str] | None = None) -> None:
"""
Initialize the runtime (asynchronously).
This method should be called after the runtime's constructor.
"""
if self.DEFAULT_ENV_VARS:
logger.debug(f'Adding default env vars: {self.DEFAULT_ENV_VARS}')
await self.add_env_vars(self.DEFAULT_ENV_VARS)
if env_vars is not None:
logger.debug(f'Adding provided env vars: {env_vars}')
await self.add_env_vars(env_vars)
async def close(self) -> None:
pass
def close_sync(self) -> None:
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
return
if loop.is_running():
loop.create_task(self.close())
else:
loop.run_until_complete(self.close())
except RuntimeError:
# Event loop is already closed, nothing to do
pass
# ====================================================================
async def add_env_vars(self, env_vars: dict[str, str]) -> None:
# Add env vars to the IPython shell (if Jupyter is used)
if any(isinstance(plugin, JupyterRequirement) for plugin in self.plugins):
code = 'import os\n'
for key, value in env_vars.items():
# Note: json.dumps gives us nice escaping for free
code += f'os.environ["{key}"] = {json.dumps(value)}\n'
code += '\n'
obs = await self.run_ipython(IPythonRunCellAction(code))
logger.info(f'Added env vars to IPython: code={code}, obs={obs}')
# Add env vars to the Bash shell
cmd = ''
for key, value in env_vars.items():
# Note: json.dumps gives us nice escaping for free
cmd += f'export {key}={json.dumps(value)}; '
if not cmd:
return
cmd = cmd.strip()
logger.debug(f'Adding env var: {cmd}')
obs = await self.run(CmdRunAction(cmd))
if not isinstance(obs, CmdOutputObservation) or obs.exit_code != 0:
raise RuntimeError(
f'Failed to add env vars [{env_vars}] to environment: {obs.content}'
)
async def on_event(self, event: Event) -> None:
if isinstance(event, Action):
# set timeout to default if not set
if event.timeout is None:
event.timeout = self.config.sandbox.timeout
assert event.timeout is not None
observation = await self.run_action(event)
observation._cause = event.id # type: ignore[attr-defined]
source = event.source if event.source else EventSource.AGENT
self.event_stream.add_event(observation, source) # type: ignore[arg-type]
async def run_action(self, action: Action) -> Observation:
"""Run an action and return the resulting observation.
If the action is not runnable in any runtime, a NullObservation is returned.
If the action is not supported by the current runtime, an ErrorObservation is returned.
"""
if not action.runnable:
return NullObservation('')
if (
hasattr(action, 'is_confirmed')
and action.is_confirmed == ActionConfirmationStatus.AWAITING_CONFIRMATION
):
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
return ErrorObservation(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'Action {action_type} is not supported in the current runtime.'
)
if (
hasattr(action, 'is_confirmed')
and action.is_confirmed == ActionConfirmationStatus.REJECTED
):
return UserRejectObservation(
'Action has been rejected by the user! Waiting for further user input.'
)
observation = await getattr(self, action_type)(action)
return observation
# ====================================================================
# Action execution
# ====================================================================
@abstractmethod
async def run(self, action: CmdRunAction) -> Observation:
pass
@abstractmethod
async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
pass
@abstractmethod
async def read(self, action: FileReadAction) -> Observation:
pass
@abstractmethod
async def write(self, action: FileWriteAction) -> Observation:
pass
@abstractmethod
async def browse(self, action: BrowseURLAction) -> Observation:
pass
@abstractmethod
async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
pass
# ====================================================================
# File operations
# ====================================================================
@abstractmethod
async def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
raise NotImplementedError('This method is not implemented in the base class.')
@abstractmethod
async def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox.
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
raise NotImplementedError('This method is not implemented in the base class.')