Compare commits

...

2 Commits

Author SHA1 Message Date
openhands
e29a4147f2 Resolve merge conflicts in app_config.py and action_execution_server.py 2025-02-24 16:11:11 +00:00
Chase Shimmin
f7424970f0 Refactor runtime executor for better extensibility
Extracts runtime execution logic into dedicated modules for better organization and extensibility:
- Splits ActionExecutor into base and implementation classes
- Adds support for custom executor implementations via config
- Maintains backward compatibility with existing execution logic
- Allows specifying executor class from arbitrary python modules via new core.runtime_executor config.

This change enables easier customization of runtime behavior and cleaner separation of concerns.
2025-01-15 20:29:00 +00:00
6 changed files with 413 additions and 6 deletions

View File

@@ -83,6 +83,9 @@ workspace_base = "./workspace"
# Runtime environment
#runtime = "docker"
# Runtime executor
#runtime_executor = "openhands.runtime.executor:ActionExecutor"
# Name of the default agent
#default_agent = "CodeActAgent"

View File

@@ -15,7 +15,7 @@ import tempfile
import time
import traceback
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Type
from zipfile import ZipFile
from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile
@@ -60,7 +60,6 @@ from openhands.runtime.utils.files import insert_lines, read_lines
from openhands.runtime.utils.memory_monitor import MemoryMonitor
from openhands.runtime.utils.runtime_init import init_user_and_working_directory
from openhands.runtime.utils.system_stats import get_system_stats
from openhands.utils.async_utils import call_sync_from_async, wait_all
class ActionRequest(BaseModel):
@@ -68,7 +67,6 @@ class ActionRequest(BaseModel):
ROOT_GID = 0
SESSION_API_KEY = os.environ.get('SESSION_API_KEY')
api_key_header = APIKeyHeader(name='X-Session-API-Key', auto_error=False)
@@ -135,7 +133,6 @@ class ActionExecutor:
"""ActionExecutor is running inside docker sandbox.
It is responsible for executing actions received from OpenHands backend and producing observations.
"""
def __init__(
self,
plugins_to_load: list[Plugin],
@@ -463,6 +460,7 @@ class ActionExecutor:
if self.bash_session is not None:
self.bash_session.close()
self.browser.close()
>>>>>>> origin/main
if __name__ == '__main__':
@@ -480,6 +478,12 @@ if __name__ == '__main__':
help='BrowserGym environment used for browser evaluation',
default=None,
)
parser.add_argument(
'--executor-class',
type=str,
default='openhands.runtime.executor:ActionExecutor',
help='Action executor class to use (format: module.path:ClassName)',
)
# example: python client.py 8000 --working-dir /workspace --plugins JupyterRequirement
args = parser.parse_args()
@@ -490,12 +494,13 @@ if __name__ == '__main__':
raise ValueError(f'Plugin {plugin} not found')
plugins_to_load.append(ALL_PLUGINS[plugin]()) # type: ignore
client: ActionExecutor | None = None
executor_class = get_action_executor_class(args.executor_class)
client: RuntimeExecutor | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global client
client = ActionExecutor(
client = executor_class(
plugins_to_load,
work_dir=args.working_dir,
username=args.username,

View File

@@ -0,0 +1,4 @@
from .base import RuntimeExecutor
from .action_executor import ActionExecutor, BaseActionExecutor
__all__ = ['ActionExecutor', 'BaseActionExecutor', 'RuntimeExecutor']

View File

@@ -0,0 +1,267 @@
import base64
import json
import mimetypes
import os
from pathlib import Path
import re
from openhands_aci.utils.diff import get_diff
from openhands.core.logger import openhands_logger as logger
from openhands.events.action.browse import BrowseInteractiveAction, BrowseURLAction
from openhands.events.action.commands import IPythonRunCellAction
from openhands.events.action.files import FileReadAction, FileWriteAction
from openhands.events.event import FileEditSource, FileReadSource
from openhands.events.observation.commands import (
IPythonRunCellObservation,
)
from openhands.events.observation.error import ErrorObservation
from openhands.events.observation.files import (
FileEditObservation,
FileReadObservation,
FileWriteObservation,
)
from openhands.events.observation.observation import Observation
from openhands.runtime.browser import browse
from openhands.runtime.executor.base import RuntimeExecutor
from openhands.runtime.plugins.jupyter import JupyterPlugin
from openhands.runtime.utils.files import insert_lines, read_lines
class BaseActionExecutor(RuntimeExecutor):
"""Runtime executor that dynamically dispatches actions to the appropriate method based on their name."""
async def run_action(self, action) -> Observation:
async with self.lock:
action_type = action.action
logger.debug(f'Running action:\n{action}')
observation = await getattr(self, action_type)(action)
logger.debug(f'Action output:\n{observation}')
return observation
class ActionExecutor(BaseActionExecutor):
"""ActionExecutor runs inside docker sandbox.
It is responsible for executing actions received from OpenHands backend and producing observations.
It is a BaseActionExectuor that provides a default implementation for all of the built-in actions.
"""
async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
assert self.bash_session is not None
if 'jupyter' in self.plugins:
_jupyter_plugin: JupyterPlugin = self.plugins['jupyter'] # type: ignore
# This is used to make AgentSkills in Jupyter aware of the
# current working directory in Bash
jupyter_cwd = getattr(self, '_jupyter_cwd', None)
if self.bash_session.cwd != jupyter_cwd:
logger.debug(
f'{self.bash_session.cwd} != {jupyter_cwd} -> reset Jupyter PWD'
)
reset_jupyter_cwd_code = (
f'import os; os.chdir("{self.bash_session.cwd}")'
)
_aux_action = IPythonRunCellAction(code=reset_jupyter_cwd_code)
_reset_obs: IPythonRunCellObservation = await _jupyter_plugin.run(
_aux_action
)
logger.debug(
f'Changed working directory in IPython to: {self.bash_session.cwd}. Output: {_reset_obs}'
)
self._jupyter_cwd = self.bash_session.cwd
obs: IPythonRunCellObservation = await _jupyter_plugin.run(action)
obs.content = obs.content.rstrip()
matches = re.findall(
r'<oh_aci_output_[0-9a-f]{32}>(.*?)</oh_aci_output_[0-9a-f]{32}>',
obs.content,
re.DOTALL,
)
if matches:
results: list[str] = []
if len(matches) == 1:
# Use specific actions/observations types
match = matches[0]
try:
result_dict = json.loads(match)
if result_dict.get('path'): # Successful output
if (
result_dict['new_content'] is not None
): # File edit commands
diff = get_diff(
old_contents=result_dict['old_content']
or '', # old_content is None when file is created
new_contents=result_dict['new_content'],
filepath=result_dict['path'],
)
return FileEditObservation(
content=diff,
path=result_dict['path'],
old_content=result_dict['old_content'],
new_content=result_dict['new_content'],
prev_exist=result_dict['prev_exist'],
impl_source=FileEditSource.OH_ACI,
formatted_output_and_error=result_dict[
'formatted_output_and_error'
],
)
else: # File view commands
return FileReadObservation(
content=result_dict['formatted_output_and_error'],
path=result_dict['path'],
impl_source=FileReadSource.OH_ACI,
)
else: # Error output
results.append(result_dict['formatted_output_and_error'])
except json.JSONDecodeError:
# Handle JSON decoding errors if necessary
results.append(
f"Invalid JSON in 'openhands-aci' output: {match}"
)
else:
for match in matches:
try:
result_dict = json.loads(match)
results.append(result_dict['formatted_output_and_error'])
except json.JSONDecodeError:
# Handle JSON decoding errors if necessary
results.append(
f"Invalid JSON in 'openhands-aci' output: {match}"
)
# Combine the results (e.g., join them) or handle them as required
obs.content = '\n'.join(str(result) for result in results)
if action.include_extra:
obs.content += (
f'\n[Jupyter current working directory: {self.bash_session.cwd}]'
)
obs.content += f'\n[Jupyter Python interpreter: {_jupyter_plugin.python_interpreter_path}]'
return obs
else:
raise RuntimeError(
'JupyterRequirement not found. Unable to run IPython action.'
)
def _resolve_path(self, path: str, working_dir: str) -> str:
filepath = Path(path)
if not filepath.is_absolute():
return str(Path(working_dir) / filepath)
return str(filepath)
async def read(self, action: FileReadAction) -> Observation:
assert self.bash_session is not None
if action.impl_source == FileReadSource.OH_ACI:
return await self.run_ipython(
IPythonRunCellAction(
code=action.translated_ipython_code,
include_extra=False,
)
)
# NOTE: the client code is running inside the sandbox,
# so there's no need to check permission
working_dir = self.bash_session.cwd
filepath = self._resolve_path(action.path, working_dir)
try:
if filepath.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
with open(filepath, 'rb') as file:
image_data = file.read()
encoded_image = base64.b64encode(image_data).decode('utf-8')
mime_type, _ = mimetypes.guess_type(filepath)
if mime_type is None:
mime_type = 'image/png' # default to PNG if mime type cannot be determined
encoded_image = f'data:{mime_type};base64,{encoded_image}'
return FileReadObservation(path=filepath, content=encoded_image)
elif filepath.lower().endswith('.pdf'):
with open(filepath, 'rb') as file:
pdf_data = file.read()
encoded_pdf = base64.b64encode(pdf_data).decode('utf-8')
encoded_pdf = f'data:application/pdf;base64,{encoded_pdf}'
return FileReadObservation(path=filepath, content=encoded_pdf)
elif filepath.lower().endswith(('.mp4', '.webm', '.ogg')):
with open(filepath, 'rb') as file:
video_data = file.read()
encoded_video = base64.b64encode(video_data).decode('utf-8')
mime_type, _ = mimetypes.guess_type(filepath)
if mime_type is None:
mime_type = 'video/mp4' # default to MP4 if MIME type cannot be determined
encoded_video = f'data:{mime_type};base64,{encoded_video}'
return FileReadObservation(path=filepath, content=encoded_video)
with open(filepath, 'r', encoding='utf-8') as file:
lines = read_lines(file.readlines(), action.start, action.end)
except FileNotFoundError:
return ErrorObservation(
f'File not found: {filepath}. Your current working directory is {working_dir}.'
)
except UnicodeDecodeError:
return ErrorObservation(f'File could not be decoded as utf-8: {filepath}.')
except IsADirectoryError:
return ErrorObservation(
f'Path is a directory: {filepath}. You can only read files'
)
code_view = ''.join(lines)
return FileReadObservation(path=filepath, content=code_view)
async def write(self, action: FileWriteAction) -> Observation:
assert self.bash_session is not None
working_dir = self.bash_session.cwd
filepath = self._resolve_path(action.path, working_dir)
insert = action.content.split('\n')
try:
if not os.path.exists(os.path.dirname(filepath)):
os.makedirs(os.path.dirname(filepath))
file_exists = os.path.exists(filepath)
if file_exists:
file_stat = os.stat(filepath)
else:
file_stat = None
mode = 'w' if not file_exists else 'r+'
try:
with open(filepath, mode, encoding='utf-8') as file:
if mode != 'w':
all_lines = file.readlines()
new_file = insert_lines(
insert, all_lines, action.start, action.end
)
else:
new_file = [i + '\n' for i in insert]
file.seek(0)
file.writelines(new_file)
file.truncate()
# Handle file permissions
if file_exists:
assert file_stat is not None
# restore the original file permissions if the file already exists
os.chmod(filepath, file_stat.st_mode)
os.chown(filepath, file_stat.st_uid, file_stat.st_gid)
else:
# set the new file permissions if the file is new
os.chmod(filepath, 0o664)
os.chown(filepath, self.user_id, self.user_id)
except FileNotFoundError:
return ErrorObservation(f'File not found: {filepath}')
except IsADirectoryError:
return ErrorObservation(
f'Path is a directory: {filepath}. You can only write to files'
)
except UnicodeDecodeError:
return ErrorObservation(
f'File could not be decoded as utf-8: {filepath}'
)
except PermissionError:
return ErrorObservation(f'Malformed paths not permitted: {filepath}')
return FileWriteObservation(content='', path=filepath)
async def browse(self, action: BrowseURLAction) -> Observation:
return await browse(action, self.browser)
async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return await browse(action, self.browser)

View File

@@ -0,0 +1,126 @@
import asyncio
import time
from openhands.core.logger import openhands_logger as logger
from openhands.events.action.commands import CmdRunAction, IPythonRunCellAction
from openhands.events.observation.commands import CmdOutputObservation
from openhands.events.observation.error import ErrorObservation
from openhands.runtime.browser.browser_env import BrowserEnv
from openhands.runtime.plugins.jupyter import JupyterPlugin
from openhands.runtime.plugins.requirement import Plugin
from openhands.runtime.utils.bash import BashSession
from openhands.runtime.utils.runtime_init import init_user_and_working_directory
from openhands.utils.async_utils import call_sync_from_async, wait_all
ROOT_GID = 0
INIT_COMMANDS = [
'git config --global user.name "openhands" && git config --global user.email "openhands@all-hands.dev" && alias git="git --no-pager"',
]
class RuntimeExecutor:
"""RuntimeExecutor for running inside docker sandbox.
It provides a minimal base class that handles initialization of the executor, and provides a run method to execute bash commands.
"""
def __init__(
self,
plugins_to_load: list[Plugin],
work_dir: str,
username: str,
user_id: int,
browsergym_eval_env: str | None,
) -> None:
self.plugins_to_load = plugins_to_load
self._initial_cwd = work_dir
self.username = username
self.user_id = user_id
_updated_user_id = init_user_and_working_directory(
username=username, user_id=self.user_id, initial_cwd=work_dir
)
if _updated_user_id is not None:
self.user_id = _updated_user_id
self.bash_session: BashSession | None = None
self.lock = asyncio.Lock()
self.plugins: dict[str, Plugin] = {}
self.browser = BrowserEnv(browsergym_eval_env)
self.start_time = time.time()
self.last_execution_time = self.start_time
self._initialized = False
@property
def initial_cwd(self):
return self._initial_cwd
async def ainit(self):
# bash needs to be initialized first
self.bash_session = BashSession(
work_dir=self._initial_cwd,
username=self.username,
)
self.bash_session.initialize()
await wait_all(
(self._init_plugin(plugin) for plugin in self.plugins_to_load),
timeout=30,
)
# This is a temporary workaround
# TODO: refactor AgentSkills to be part of JupyterPlugin
# AFTER ServerRuntime is deprecated
if 'agent_skills' in self.plugins and 'jupyter' in self.plugins:
obs = await self.run_ipython(
IPythonRunCellAction(
code='from openhands.runtime.plugins.agent_skills.agentskills import *\n'
)
)
logger.debug(f'AgentSkills initialized: {obs}')
await self._init_bash_commands()
logger.debug('Runtime client initialized.')
self._initialized = True
@property
def initialized(self) -> bool:
return self._initialized
async def _init_plugin(self, plugin: Plugin):
assert self.bash_session is not None
await plugin.initialize(self.username)
self.plugins[plugin.name] = plugin
logger.debug(f'Initializing plugin: {plugin.name}')
if isinstance(plugin, JupyterPlugin):
await self.run_ipython(
IPythonRunCellAction(
code=f'import os; os.chdir("{self.bash_session.cwd}")'
)
)
async def _init_bash_commands(self):
logger.debug(f'Initializing by running {len(INIT_COMMANDS)} bash commands...')
for command in INIT_COMMANDS:
action = CmdRunAction(command=command)
action.timeout = 300
logger.debug(f'Executing init command: {command}')
obs = await self.run(action)
assert isinstance(obs, CmdOutputObservation)
logger.debug(
f'Init command outputs (exit code: {obs.exit_code}): {obs.content}'
)
assert obs.exit_code == 0
logger.debug('Bash init commands completed')
async def run(
self, action: CmdRunAction
) -> CmdOutputObservation | ErrorObservation:
assert self.bash_session is not None
obs = await call_sync_from_async(self.bash_session.execute, action)
return obs
def close(self):
if self.bash_session is not None:
self.bash_session.close()
self.browser.close()

View File

@@ -55,6 +55,8 @@ def get_action_execution_server_startup_command(
'--user-id',
str(user_id),
*browsergym_args,
'--executor-class',
app_config.runtime_executor,
]
return base_cmd