Files
OpenHands/opendevin/runtime/runtime.py
மனோஜ்குமார் பழனிச்சாமி 34c765688b Streamline Logging Events (#2532)
* Skip duplicate log

* log user actions

* fix tests

* log all action _step

* refactor log

* revert test

* refactor log

* visual diff

* disable overriding event source

* Revert "disable overriding event source"

This reverts commit b0047cc0cd.

* Refactor logic

* refactored runtime on_event

* fix merge conflict

in Web UI, it shows as red color (seems deletion but added)

* linted

---------

Co-authored-by: Xingyao Wang <xingyao6@illinois.edu>
2024-07-08 05:34:47 +09:00

156 lines
5.2 KiB
Python

from abc import abstractmethod
from typing import Any, Optional
from opendevin.core.config import config
from opendevin.core.exceptions import BrowserInitException
from opendevin.core.logger import opendevin_logger as logger
from opendevin.events import EventStream, EventStreamSubscriber
from opendevin.events.action import (
Action,
AgentRecallAction,
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from opendevin.events.event import Event
from opendevin.events.observation import (
ErrorObservation,
NullObservation,
Observation,
)
from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
from opendevin.runtime import (
DockerSSHBox,
E2BBox,
LocalBox,
Sandbox,
)
from opendevin.runtime.browser.browser_env import BrowserEnv
from opendevin.runtime.plugins import PluginRequirement
from opendevin.runtime.tools import RuntimeTool
from opendevin.storage import FileStore, InMemoryFileStore
def create_sandbox(sid: str = 'default', box_type: str = 'ssh') -> Sandbox:
if box_type == 'local':
return LocalBox()
elif box_type == 'ssh':
return DockerSSHBox(sid=sid)
elif box_type == 'e2b':
return E2BBox()
else:
raise ValueError(f'Invalid sandbox type: {box_type}')
class Runtime:
"""
The runtime is how the agent interacts with the external environment.
This includes a bash sandbox, a browser, and filesystem interactions.
sid is the session id, which is used to identify the current user session.
"""
sid: str
file_store: FileStore
def __init__(
self,
event_stream: EventStream,
sid: str = 'default',
sandbox: Sandbox | None = None,
):
self.sid = sid
if sandbox is None:
self.sandbox = create_sandbox(sid, config.sandbox.box_type)
self._is_external_sandbox = False
else:
self.sandbox = sandbox
self._is_external_sandbox = True
self.browser: BrowserEnv | None = None
self.file_store = InMemoryFileStore()
self.event_stream = event_stream
self.event_stream.subscribe(EventStreamSubscriber.RUNTIME, self.on_event)
def close(self):
if not self._is_external_sandbox:
self.sandbox.close()
if self.browser is not None:
self.browser.close()
def init_sandbox_plugins(self, plugins: list[PluginRequirement]) -> None:
self.sandbox.init_plugins(plugins)
def init_runtime_tools(
self,
runtime_tools: list[RuntimeTool],
runtime_tools_config: Optional[dict[RuntimeTool, Any]] = None,
is_async: bool = True,
) -> None:
# if browser in runtime_tools, init it
if RuntimeTool.BROWSER in runtime_tools:
if runtime_tools_config is None:
runtime_tools_config = {}
browser_env_config = runtime_tools_config.get(RuntimeTool.BROWSER, {})
try:
self.browser = BrowserEnv(is_async=is_async, **browser_env_config)
except BrowserInitException:
logger.warn(
'Failed to start browser environment, web browsing functionality will not work'
)
async def on_event(self, event: Event) -> None:
if isinstance(event, Action):
observation = await self.run_action(event)
observation._cause = event.id # type: ignore[attr-defined]
self.event_stream.add_event(observation, event.source) # type: ignore[arg-type]
async def run_action(self, action: Action) -> Observation:
"""
Run an action and return the resulting observation.
If the action is not runnable in any runtime, a NullObservation is returned.
If the action is not supported by the current runtime, an ErrorObservation is returned.
"""
if not action.runnable:
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
return ErrorObservation(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'Action {action_type} is not supported in the current runtime.'
)
observation = await getattr(self, action_type)(action)
observation._parent = action.id # type: ignore[attr-defined]
return observation
@abstractmethod
async def run(self, action: CmdRunAction) -> Observation:
pass
@abstractmethod
async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
pass
@abstractmethod
async def read(self, action: FileReadAction) -> Observation:
pass
@abstractmethod
async def write(self, action: FileWriteAction) -> Observation:
pass
@abstractmethod
async def browse(self, action: BrowseURLAction) -> Observation:
pass
@abstractmethod
async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
pass
@abstractmethod
async def recall(self, action: AgentRecallAction) -> Observation:
pass