mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
2 Commits
github-tok
...
pr-6297
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e29a4147f2 | ||
|
|
f7424970f0 |
@@ -83,6 +83,9 @@ workspace_base = "./workspace"
|
||||
# Runtime environment
|
||||
#runtime = "docker"
|
||||
|
||||
# Runtime executor
|
||||
#runtime_executor = "openhands.runtime.executor:ActionExecutor"
|
||||
|
||||
# Name of the default agent
|
||||
#default_agent = "CodeActAgent"
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ import tempfile
|
||||
import time
|
||||
import traceback
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Type
|
||||
from zipfile import ZipFile
|
||||
|
||||
from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile
|
||||
@@ -60,7 +60,6 @@ from openhands.runtime.utils.files import insert_lines, read_lines
|
||||
from openhands.runtime.utils.memory_monitor import MemoryMonitor
|
||||
from openhands.runtime.utils.runtime_init import init_user_and_working_directory
|
||||
from openhands.runtime.utils.system_stats import get_system_stats
|
||||
from openhands.utils.async_utils import call_sync_from_async, wait_all
|
||||
|
||||
|
||||
class ActionRequest(BaseModel):
|
||||
@@ -68,7 +67,6 @@ class ActionRequest(BaseModel):
|
||||
|
||||
|
||||
ROOT_GID = 0
|
||||
|
||||
SESSION_API_KEY = os.environ.get('SESSION_API_KEY')
|
||||
api_key_header = APIKeyHeader(name='X-Session-API-Key', auto_error=False)
|
||||
|
||||
@@ -135,7 +133,6 @@ class ActionExecutor:
|
||||
"""ActionExecutor is running inside docker sandbox.
|
||||
It is responsible for executing actions received from OpenHands backend and producing observations.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
plugins_to_load: list[Plugin],
|
||||
@@ -463,6 +460,7 @@ class ActionExecutor:
|
||||
if self.bash_session is not None:
|
||||
self.bash_session.close()
|
||||
self.browser.close()
|
||||
>>>>>>> origin/main
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
@@ -480,6 +478,12 @@ if __name__ == '__main__':
|
||||
help='BrowserGym environment used for browser evaluation',
|
||||
default=None,
|
||||
)
|
||||
parser.add_argument(
|
||||
'--executor-class',
|
||||
type=str,
|
||||
default='openhands.runtime.executor:ActionExecutor',
|
||||
help='Action executor class to use (format: module.path:ClassName)',
|
||||
)
|
||||
# example: python client.py 8000 --working-dir /workspace --plugins JupyterRequirement
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -490,12 +494,13 @@ if __name__ == '__main__':
|
||||
raise ValueError(f'Plugin {plugin} not found')
|
||||
plugins_to_load.append(ALL_PLUGINS[plugin]()) # type: ignore
|
||||
|
||||
client: ActionExecutor | None = None
|
||||
executor_class = get_action_executor_class(args.executor_class)
|
||||
client: RuntimeExecutor | None = None
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
global client
|
||||
client = ActionExecutor(
|
||||
client = executor_class(
|
||||
plugins_to_load,
|
||||
work_dir=args.working_dir,
|
||||
username=args.username,
|
||||
|
||||
4
openhands/runtime/executor/__init__.py
Normal file
4
openhands/runtime/executor/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .base import RuntimeExecutor
|
||||
from .action_executor import ActionExecutor, BaseActionExecutor
|
||||
|
||||
__all__ = ['ActionExecutor', 'BaseActionExecutor', 'RuntimeExecutor']
|
||||
267
openhands/runtime/executor/action_executor.py
Normal file
267
openhands/runtime/executor/action_executor.py
Normal file
@@ -0,0 +1,267 @@
|
||||
import base64
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
from openhands_aci.utils.diff import get_diff
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.events.action.browse import BrowseInteractiveAction, BrowseURLAction
|
||||
from openhands.events.action.commands import IPythonRunCellAction
|
||||
from openhands.events.action.files import FileReadAction, FileWriteAction
|
||||
from openhands.events.event import FileEditSource, FileReadSource
|
||||
from openhands.events.observation.commands import (
|
||||
IPythonRunCellObservation,
|
||||
)
|
||||
from openhands.events.observation.error import ErrorObservation
|
||||
from openhands.events.observation.files import (
|
||||
FileEditObservation,
|
||||
FileReadObservation,
|
||||
FileWriteObservation,
|
||||
)
|
||||
from openhands.events.observation.observation import Observation
|
||||
from openhands.runtime.browser import browse
|
||||
from openhands.runtime.executor.base import RuntimeExecutor
|
||||
from openhands.runtime.plugins.jupyter import JupyterPlugin
|
||||
from openhands.runtime.utils.files import insert_lines, read_lines
|
||||
|
||||
|
||||
class BaseActionExecutor(RuntimeExecutor):
    """Runtime executor that dynamically dispatches actions to the appropriate method based on their name.

    Subclasses expose one coroutine method per supported action type; the
    method name must equal the value of the action's ``action`` attribute.
    """

    async def run_action(self, action) -> Observation:
        """Dispatch ``action`` to the handler method named by ``action.action``.

        The executor lock is held for the whole call, so handlers run one at
        a time.

        Args:
            action: The action to execute. Its ``action`` attribute is used
                as the name of the method to call on this executor.

        Returns:
            The observation produced by the handler.

        Raises:
            AttributeError: If this executor has no attribute named after the
                action type.
        """
        async with self.lock:
            action_type = action.action
            logger.debug(f'Running action:\n{action}')
            # Handlers are looked up by name, e.g. ``self.run`` or ``self.read``.
            handler = getattr(self, action_type)
            observation = await handler(action)
            logger.debug(f'Action output:\n{observation}')
            return observation
|
||||
|
||||
|
||||
class ActionExecutor(BaseActionExecutor):
    """ActionExecutor runs inside docker sandbox.

    It is responsible for executing actions received from OpenHands backend and producing observations.
    It is a BaseActionExecutor that provides a default implementation for all of the built-in actions.
    """

    # Captures the payload between matching-looking
    # ``<oh_aci_output_<32 hex chars>>`` opening and closing tags.
    _ACI_OUTPUT_PATTERN = re.compile(
        r'<oh_aci_output_[0-9a-f]{32}>(.*?)</oh_aci_output_[0-9a-f]{32}>',
        re.DOTALL,
    )

    async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
        """Run an IPython cell through the Jupyter plugin.

        Before running, the Jupyter working directory is aligned with the
        bash session's. If the cell output contains ``oh_aci_output`` tagged
        payloads, they are decoded as JSON: a single successful payload is
        returned as a ``FileEditObservation`` / ``FileReadObservation``;
        otherwise the observation content is replaced by the payloads'
        ``formatted_output_and_error`` texts joined by newlines.

        Args:
            action: The cell to run. When ``action.include_extra`` is true the
                working directory and interpreter path are appended to the
                output.

        Returns:
            The resulting observation.

        Raises:
            RuntimeError: If the ``jupyter`` plugin is not loaded.
        """
        assert self.bash_session is not None
        if 'jupyter' not in self.plugins:
            raise RuntimeError(
                'JupyterRequirement not found. Unable to run IPython action.'
            )
        jupyter_plugin: JupyterPlugin = self.plugins['jupyter']  # type: ignore

        # This is used to make AgentSkills in Jupyter aware of the
        # current working directory in Bash
        await self._sync_jupyter_cwd(jupyter_plugin)

        obs: IPythonRunCellObservation = await jupyter_plugin.run(action)
        obs.content = obs.content.rstrip()

        payloads = self._ACI_OUTPUT_PATTERN.findall(obs.content)
        if payloads:
            messages: list[str] = []
            for payload in payloads:
                try:
                    result = json.loads(payload)
                except json.JSONDecodeError:
                    messages.append(
                        f"Invalid JSON in 'openhands-aci' output: {payload}"
                    )
                    continue
                # Only a lone, successful payload (one carrying a path) is
                # promoted to a specific observation type.
                if len(payloads) == 1 and result.get('path'):
                    return self._aci_result_to_observation(result)
                messages.append(result['formatted_output_and_error'])
            obs.content = '\n'.join(str(message) for message in messages)

        if action.include_extra:
            obs.content += (
                f'\n[Jupyter current working directory: {self.bash_session.cwd}]'
            )
            obs.content += f'\n[Jupyter Python interpreter: {jupyter_plugin.python_interpreter_path}]'
        return obs

    async def _sync_jupyter_cwd(self, jupyter_plugin: JupyterPlugin) -> None:
        """Change the Jupyter working directory to the bash session's if they differ."""
        assert self.bash_session is not None
        bash_cwd = self.bash_session.cwd
        jupyter_cwd = getattr(self, '_jupyter_cwd', None)
        if bash_cwd == jupyter_cwd:
            return

        logger.debug(f'{bash_cwd} != {jupyter_cwd} -> reset Jupyter PWD')
        # repr() yields a correctly escaped Python string literal, so paths
        # containing quotes or backslashes do not break the generated code.
        chdir_action = IPythonRunCellAction(
            code=f'import os; os.chdir({str(bash_cwd)!r})'
        )
        reset_obs: IPythonRunCellObservation = await jupyter_plugin.run(chdir_action)
        logger.debug(
            f'Changed working directory in IPython to: {bash_cwd}. Output: {reset_obs}'
        )
        self._jupyter_cwd = bash_cwd

    @staticmethod
    def _aci_result_to_observation(result: dict) -> Observation:
        """Convert one successful ACI result dict into an edit or read observation."""
        if result['new_content'] is not None:  # File edit commands
            diff = get_diff(
                # old_content is None when file is created
                old_contents=result['old_content'] or '',
                new_contents=result['new_content'],
                filepath=result['path'],
            )
            return FileEditObservation(
                content=diff,
                path=result['path'],
                old_content=result['old_content'],
                new_content=result['new_content'],
                prev_exist=result['prev_exist'],
                impl_source=FileEditSource.OH_ACI,
                formatted_output_and_error=result['formatted_output_and_error'],
            )
        # File view commands
        return FileReadObservation(
            content=result['formatted_output_and_error'],
            path=result['path'],
            impl_source=FileReadSource.OH_ACI,
        )

    def _resolve_path(self, path: str, working_dir: str) -> str:
        """Return ``path`` unchanged if absolute, otherwise joined onto ``working_dir``."""
        filepath = Path(path)
        if not filepath.is_absolute():
            return str(Path(working_dir) / filepath)
        return str(filepath)

    @staticmethod
    def _read_as_data_url(filepath: str, mime_type: str) -> str:
        """Read ``filepath`` as bytes and return it as a base64 ``data:`` URL."""
        with open(filepath, 'rb') as file:
            encoded = base64.b64encode(file.read()).decode('utf-8')
        return f'data:{mime_type};base64,{encoded}'

    async def read(self, action: FileReadAction) -> Observation:
        """Read a file and return its content as an observation.

        Reads requested with ``FileReadSource.OH_ACI`` are delegated to
        ``run_ipython``. Otherwise image, PDF and video files (selected by
        file extension) are returned as base64 data URLs, and any other file
        is read as UTF-8 text limited to ``action.start`` / ``action.end``.

        Returns:
            A ``FileReadObservation``, or an ``ErrorObservation`` when the
            file is missing, is a directory, or is not valid UTF-8.
        """
        assert self.bash_session is not None
        if action.impl_source == FileReadSource.OH_ACI:
            return await self.run_ipython(
                IPythonRunCellAction(
                    code=action.translated_ipython_code,
                    include_extra=False,
                )
            )

        # NOTE: the client code is running inside the sandbox,
        # so there's no need to check permission
        working_dir = self.bash_session.cwd
        filepath = self._resolve_path(action.path, working_dir)
        lowered = filepath.lower()
        try:
            if lowered.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
                mime_type, _ = mimetypes.guess_type(filepath)
                if mime_type is None:
                    mime_type = 'image/png'  # default to PNG if mime type cannot be determined
                return FileReadObservation(
                    path=filepath,
                    content=self._read_as_data_url(filepath, mime_type),
                )
            if lowered.endswith('.pdf'):
                return FileReadObservation(
                    path=filepath,
                    content=self._read_as_data_url(filepath, 'application/pdf'),
                )
            if lowered.endswith(('.mp4', '.webm', '.ogg')):
                mime_type, _ = mimetypes.guess_type(filepath)
                if mime_type is None:
                    mime_type = 'video/mp4'  # default to MP4 if MIME type cannot be determined
                return FileReadObservation(
                    path=filepath,
                    content=self._read_as_data_url(filepath, mime_type),
                )

            with open(filepath, 'r', encoding='utf-8') as file:
                lines = read_lines(file.readlines(), action.start, action.end)
        except FileNotFoundError:
            return ErrorObservation(
                f'File not found: {filepath}. Your current working directory is {working_dir}.'
            )
        except UnicodeDecodeError:
            return ErrorObservation(f'File could not be decoded as utf-8: {filepath}.')
        except IsADirectoryError:
            return ErrorObservation(
                f'Path is a directory: {filepath}. You can only read files'
            )

        code_view = ''.join(lines)
        return FileReadObservation(path=filepath, content=code_view)

    async def write(self, action: FileWriteAction) -> Observation:
        """Write ``action.content`` to a file, creating parent directories as needed.

        For an existing file the content is merged into the current lines via
        ``insert_lines`` using ``action.start`` / ``action.end`` and the
        original mode and ownership are restored afterwards. A new file is
        written from scratch, given mode ``0o664`` and chowned to the
        executor's user id.

        Returns:
            A ``FileWriteObservation`` on success, or an ``ErrorObservation``
            describing why the file could not be written.
        """
        assert self.bash_session is not None
        working_dir = self.bash_session.cwd
        filepath = self._resolve_path(action.path, working_dir)

        insert = action.content.split('\n')
        try:
            parent_dir = os.path.dirname(filepath)
            if not os.path.exists(parent_dir):
                # exist_ok tolerates the directory being created concurrently
                # between the existence check and this call.
                os.makedirs(parent_dir, exist_ok=True)

            file_exists = os.path.exists(filepath)
            file_stat = os.stat(filepath) if file_exists else None

            mode = 'r+' if file_exists else 'w'
            try:
                with open(filepath, mode, encoding='utf-8') as file:
                    if mode != 'w':
                        all_lines = file.readlines()
                        new_file = insert_lines(
                            insert, all_lines, action.start, action.end
                        )
                    else:
                        new_file = [i + '\n' for i in insert]

                    # Rewind and truncate so the new content fully replaces
                    # whatever was read in 'r+' mode.
                    file.seek(0)
                    file.writelines(new_file)
                    file.truncate()

                # Handle file permissions
                if file_exists:
                    assert file_stat is not None
                    # restore the original file permissions if the file already exists
                    os.chmod(filepath, file_stat.st_mode)
                    os.chown(filepath, file_stat.st_uid, file_stat.st_gid)
                else:
                    # set the new file permissions if the file is new
                    os.chmod(filepath, 0o664)
                    os.chown(filepath, self.user_id, self.user_id)

            except FileNotFoundError:
                return ErrorObservation(f'File not found: {filepath}')
            except IsADirectoryError:
                return ErrorObservation(
                    f'Path is a directory: {filepath}. You can only write to files'
                )
            except UnicodeDecodeError:
                return ErrorObservation(
                    f'File could not be decoded as utf-8: {filepath}'
                )
        except PermissionError:
            return ErrorObservation(f'Malformed paths not permitted: {filepath}')
        return FileWriteObservation(content='', path=filepath)

    async def browse(self, action: BrowseURLAction) -> Observation:
        """Handle a URL browse action using the executor's browser."""
        # Inside a method body ``browse`` resolves to the module-level function
        # imported from openhands.runtime.browser, not to this method.
        return await browse(action, self.browser)

    async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
        """Handle an interactive browse action using the executor's browser."""
        return await browse(action, self.browser)
|
||||
126
openhands/runtime/executor/base.py
Normal file
126
openhands/runtime/executor/base.py
Normal file
@@ -0,0 +1,126 @@
|
||||
import asyncio
|
||||
import time
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.events.action.commands import CmdRunAction, IPythonRunCellAction
|
||||
from openhands.events.observation.commands import CmdOutputObservation
|
||||
from openhands.events.observation.error import ErrorObservation
|
||||
from openhands.runtime.browser.browser_env import BrowserEnv
|
||||
from openhands.runtime.plugins.jupyter import JupyterPlugin
|
||||
from openhands.runtime.plugins.requirement import Plugin
|
||||
from openhands.runtime.utils.bash import BashSession
|
||||
from openhands.runtime.utils.runtime_init import init_user_and_working_directory
|
||||
from openhands.utils.async_utils import call_sync_from_async, wait_all
|
||||
|
||||
|
||||
# Numeric group id 0 (the root group on POSIX systems).
ROOT_GID = 0

# Bash commands executed once, in order, by RuntimeExecutor._init_bash_commands
# when the executor is initialized.
INIT_COMMANDS = [
    'git config --global user.name "openhands" && git config --global user.email "openhands@all-hands.dev" && alias git="git --no-pager"',
]
|
||||
|
||||
|
||||
class RuntimeExecutor:
    """RuntimeExecutor for running inside docker sandbox.

    It provides a minimal base class that handles initialization of the executor, and provides a run method to execute bash commands.
    Subclasses must implement ``run_ipython`` if the Jupyter plugin is loaded,
    because plugin initialization calls it.
    """

    def __init__(
        self,
        plugins_to_load: list[Plugin],
        work_dir: str,
        username: str,
        user_id: int,
        browsergym_eval_env: str | None,
    ) -> None:
        """Set up the user and working directory and create the browser environment.

        The bash session and plugins are not started here; call ``ainit``.

        Args:
            plugins_to_load: Plugins to initialize during ``ainit``.
            work_dir: Initial working directory of the bash session.
            username: User the bash session and plugins are initialized with.
            user_id: Requested user id; replaced by the value returned from
                ``init_user_and_working_directory`` when that is not ``None``.
            browsergym_eval_env: Passed through to ``BrowserEnv``.
        """
        self.plugins_to_load = plugins_to_load
        self._initial_cwd = work_dir
        self.username = username
        self.user_id = user_id
        _updated_user_id = init_user_and_working_directory(
            username=username, user_id=self.user_id, initial_cwd=work_dir
        )
        if _updated_user_id is not None:
            self.user_id = _updated_user_id

        self.bash_session: BashSession | None = None
        self.lock = asyncio.Lock()
        self.plugins: dict[str, Plugin] = {}
        self.browser = BrowserEnv(browsergym_eval_env)
        self.start_time = time.time()
        self.last_execution_time = self.start_time
        self._initialized = False

    @property
    def initial_cwd(self):
        """The working directory the executor was created with."""
        return self._initial_cwd

    async def ainit(self):
        """Start the bash session, initialize plugins and run the init commands.

        Sets ``initialized`` to ``True`` once everything has completed.
        """
        # bash needs to be initialized first
        self.bash_session = BashSession(
            work_dir=self._initial_cwd,
            username=self.username,
        )
        self.bash_session.initialize()
        await wait_all(
            (self._init_plugin(plugin) for plugin in self.plugins_to_load),
            timeout=30,
        )

        # This is a temporary workaround
        # TODO: refactor AgentSkills to be part of JupyterPlugin
        # AFTER ServerRuntime is deprecated
        if 'agent_skills' in self.plugins and 'jupyter' in self.plugins:
            obs = await self.run_ipython(
                IPythonRunCellAction(
                    code='from openhands.runtime.plugins.agent_skills.agentskills import *\n'
                )
            )
            logger.debug(f'AgentSkills initialized: {obs}')

        await self._init_bash_commands()
        logger.debug('Runtime client initialized.')

        self._initialized = True

    @property
    def initialized(self) -> bool:
        """Whether ``ainit`` has run to completion."""
        return self._initialized

    async def run_ipython(self, action: IPythonRunCellAction):
        """Execute an IPython cell; subclasses are expected to override this.

        ``ainit`` and ``_init_plugin`` call this method when the Jupyter
        plugin is loaded, so the base class declares it explicitly instead of
        failing with an opaque ``AttributeError``.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(
            f'{type(self).__name__} does not implement run_ipython, which is '
            'required when the Jupyter plugin is loaded.'
        )

    async def _init_plugin(self, plugin: Plugin):
        """Initialize one plugin and register it under its name.

        For the Jupyter plugin, its working directory is additionally set to
        the bash session's current directory.
        """
        assert self.bash_session is not None
        logger.debug(f'Initializing plugin: {plugin.name}')
        await plugin.initialize(self.username)
        self.plugins[plugin.name] = plugin

        if isinstance(plugin, JupyterPlugin):
            # repr() produces a correctly escaped string literal even when the
            # path contains quotes or backslashes.
            await self.run_ipython(
                IPythonRunCellAction(
                    code=f'import os; os.chdir({str(self.bash_session.cwd)!r})'
                )
            )

    async def _init_bash_commands(self):
        """Run every command in ``INIT_COMMANDS`` and require each to succeed.

        Raises:
            RuntimeError: If a command does not yield a
                ``CmdOutputObservation`` or exits with a non-zero code.
        """
        logger.debug(f'Initializing by running {len(INIT_COMMANDS)} bash commands...')
        for command in INIT_COMMANDS:
            action = CmdRunAction(command=command)
            action.timeout = 300
            logger.debug(f'Executing init command: {command}')
            obs = await self.run(action)
            # Explicit checks instead of ``assert`` so failures are still
            # detected when Python runs with -O.
            if not isinstance(obs, CmdOutputObservation):
                raise RuntimeError(
                    f'Init command did not produce a command output: {command} -> {obs}'
                )
            logger.debug(
                f'Init command outputs (exit code: {obs.exit_code}): {obs.content}'
            )
            if obs.exit_code != 0:
                raise RuntimeError(
                    f'Init command failed (exit code: {obs.exit_code}): {command}'
                )

        logger.debug('Bash init commands completed')

    async def run(
        self, action: CmdRunAction
    ) -> CmdOutputObservation | ErrorObservation:
        """Execute a bash command in the session via ``call_sync_from_async``."""
        assert self.bash_session is not None
        obs = await call_sync_from_async(self.bash_session.execute, action)
        return obs

    def close(self):
        """Close the bash session (if started) and the browser."""
        try:
            if self.bash_session is not None:
                self.bash_session.close()
        finally:
            # Close the browser even if closing the bash session raised.
            self.browser.close()
|
||||
@@ -55,6 +55,8 @@ def get_action_execution_server_startup_command(
|
||||
'--user-id',
|
||||
str(user_id),
|
||||
*browsergym_args,
|
||||
'--executor-class',
|
||||
app_config.runtime_executor,
|
||||
]
|
||||
|
||||
return base_cmd
|
||||
|
||||
Reference in New Issue
Block a user