Minor fixes for new Agent framework (#158)

* add message if exit happens before finish

* fix kill command

* fix browse action

* fix sandbox

* update requirements

* add status code

* refactor controller

* add try to callbacks

* fix background log collection

* format logs a bit more nicely

* run background procs in same container

* fix close command

* rename sockt

* fix up kills

* fix lint issues

* fix ruff

---------

Co-authored-by: Xingyao Wang <xingyao6@illinois.edu>
This commit is contained in:
Robert Brennan
2024-03-26 00:55:33 -04:00
committed by GitHub
parent 430527680f
commit 486ec2d94a
7 changed files with 181 additions and 114 deletions

View File

@@ -9,6 +9,7 @@ from agenthub.langchains_agent.utils.monologue import Monologue
from agenthub.langchains_agent.utils.memory import LongTermMemory
from opendevin.action import (
NullAction,
CmdRunAction,
CmdKillAction,
BrowseURLAction,
@@ -128,16 +129,18 @@ class LangchainsAgent(Agent):
# Translate state to action_dict
for prev_action, obs in state.updated_info:
d = None
if isinstance(obs, CmdOutputObservation):
if obs.error:
d = {"action": "error", "args": {"output": obs.content}}
else:
d = {"action": "output", "args": {"output": obs.content}}
else:
d = {"action": "output", "args": {"output": obs.content}}
self._add_event(d)
raise ValueError(f"Unknown observation type: {obs}")
if d is not None:
self._add_event(d)
d = None
if isinstance(prev_action, CmdRunAction):
d = {"action": "run", "args": {"command": prev_action.command}}
elif isinstance(prev_action, CmdKillAction):
@@ -154,9 +157,12 @@ class LangchainsAgent(Agent):
d = {"action": "think", "args": {"thought": prev_action.thought}}
elif isinstance(prev_action, AgentFinishAction):
d = {"action": "finish"}
elif isinstance(prev_action, NullAction):
d = None
else:
raise NotImplementedError(f"Unknown action type: {prev_action}")
self._add_event(d)
raise ValueError(f"Unknown action type: {prev_action}")
if d is not None:
self._add_event(d)
state.updated_info = []

View File

@@ -10,11 +10,19 @@ class BrowseURLAction(ExecutableAction):
url: str
def run(self, *args, **kwargs) -> BrowserOutputObservation:
response = requests.get(self.url)
return BrowserOutputObservation(
content=response.text,
url=self.url
)
try:
response = requests.get(self.url)
return BrowserOutputObservation(
content=response.text,
status_code=response.status_code,
url=self.url
)
except requests.exceptions.RequestException as e:
return BrowserOutputObservation(
content=str(e),
error=True,
url=self.url
)
@property
def message(self) -> str:

View File

@@ -18,6 +18,8 @@ from opendevin.observation import (
from .command_manager import CommandManager
def print_with_indent(text: str):
print("\t"+text.replace("\n","\n\t"), flush=True)
class AgentController:
def __init__(
@@ -47,45 +49,60 @@ class AgentController:
self.state_updated_info.append((NullAction(), observation))
async def start_loop(self, task_instruction: str):
try:
self.agent.instruction = task_instruction
for i in range(self.max_iterations):
print("STEP", i, flush=True)
finished = False
self.agent.instruction = task_instruction
for i in range(self.max_iterations):
try:
finished = await self.step(i)
except Exception as e:
print("Error in loop", e, flush=True)
break
if finished:
break
if not finished:
print("Exited before finishing", flush=True)
state: State = self.get_current_state()
action: Action = self.agent.step(state)
print("ACTION", action, flush=True)
for _callback_fn in self.callbacks:
_callback_fn(action)
if isinstance(action, AgentFinishAction):
print("FINISHED", flush=True)
break
if isinstance(action, (FileReadAction, FileWriteAction)):
action_cls = action.__class__
_kwargs = action.__dict__
_kwargs["base_path"] = self.workdir
action = action_cls(**_kwargs)
print(action, flush=True)
print("---", flush=True)
async def step(self, i: int):
print("\n\n==============", flush=True)
print("STEP", i, flush=True)
log_obs = self.command_manager.get_background_obs()
for obs in log_obs:
self.add_observation(obs)
await self._run_callbacks(obs)
print_with_indent("\nBACKGROUND LOG:\n%s" % obs)
state: State = self.get_current_state()
action: Action = self.agent.step(state)
print_with_indent("\nACTION:\n%s" % action)
await self._run_callbacks(action)
if isinstance(action, AgentFinishAction):
print_with_indent("\nFINISHED")
return True
if isinstance(action, (FileReadAction, FileWriteAction)):
action_cls = action.__class__
_kwargs = action.__dict__
_kwargs["base_path"] = self.workdir
action = action_cls(**_kwargs)
print(action, flush=True)
if action.executable:
observation: Observation = action.run(self)
else:
observation = NullObservation("")
print_with_indent("\nOBSERVATION:\n%s" % observation)
self.state_updated_info.append((action, observation))
await self._run_callbacks(observation)
if action.executable:
observation: Observation = action.run(self)
else:
print("ACTION NOT EXECUTABLE", flush=True)
observation = NullObservation("")
print("OBSERVATION", observation, flush=True)
self.state_updated_info.append((action, observation))
print(observation, flush=True)
for _callback_fn in self.callbacks:
_callback_fn(observation)
print("==============", flush=True)
await asyncio.sleep(0.001)
except Exception as e:
print("Error in loop", e, flush=True)
pass
async def _run_callbacks(self, event):
if event is None:
return
for callback in self.callbacks:
idx = self.callbacks.index(callback)
try:
callback(event)
except Exception as e:
print("Callback error:" + str(idx), e, flush=True)
pass
await asyncio.sleep(0.001) # Give back control for a tick, so we can await in callbacks

View File

@@ -3,24 +3,9 @@ from typing import List
from opendevin.observation import CmdOutputObservation
from opendevin.sandbox.sandbox import DockerInteractive
class BackgroundCommand:
def __init__(self, id: int, command: str, dir: str):
self.command = command
self.id = id
self.shell = DockerInteractive(id=str(id), workspace_dir=dir)
self.shell.execute_in_background(command)
def get_logs(self) -> str:
# TODO: get an exit code if process is exited
return self.shell.read_logs()
class CommandManager:
def __init__(self, dir):
self.cur_id = 0
self.directory = dir
self.background_commands = {}
self.shell = DockerInteractive(id="default", workspace_dir=dir)
def run_command(self, command: str, background=False) -> CmdOutputObservation:
@@ -32,35 +17,38 @@ class CommandManager:
def _run_immediately(self, command: str) -> CmdOutputObservation:
exit_code, output = self.shell.execute(command)
return CmdOutputObservation(
command_id=-1,
content=output,
command_id=self.cur_id,
command=command,
exit_code=exit_code
)
def _run_background(self, command: str) -> CmdOutputObservation:
bg_cmd = BackgroundCommand(self.cur_id, command, self.directory)
self.cur_id += 1
self.background_commands[bg_cmd.id] = bg_cmd
bg_cmd = self.shell.execute_in_background(command)
return CmdOutputObservation(
content=f"Background command started. To stop it, send a `kill` action with id {bg_cmd.id}",
content=f"Background command started. To stop it, send a `kill` action with id {bg_cmd.id}",
command_id=bg_cmd.id,
command=command,
exit_code=0
)
def kill_command(self, id: int):
# TODO: get log events before killing
self.background_commands[id].shell.close()
del self.background_commands[id]
def kill_command(self, id: int) -> CmdOutputObservation:
cmd = self.shell.kill_background(id)
return CmdOutputObservation(
content=f"Background command with id {id} has been killed.",
command_id=id,
command=cmd.command,
exit_code=0
)
def get_background_obs(self) -> List[CmdOutputObservation]:
obs = []
for _id, cmd in self.background_commands.items():
output = cmd.get_logs()
obs.append(
CmdOutputObservation(
content=output, command_id=_id, command=cmd.command
for _id, cmd in self.shell.background_commands.items():
output = cmd.read_logs()
if output is not None and output != "":
obs.append(
CmdOutputObservation(
content=output, command_id=_id, command=cmd.command
)
)
)
return obs

View File

@@ -57,6 +57,8 @@ class BrowserOutputObservation(Observation):
"""
url: str
status_code: int = 200
error: bool = False
@property
def message(self) -> str:

View File

@@ -4,7 +4,7 @@ import uuid
import time
import select
import docker
from typing import Tuple
from typing import Tuple, Dict, List
from collections import namedtuple
import atexit
@@ -12,6 +12,35 @@ InputType = namedtuple("InputType", ["content"])
OutputType = namedtuple("OutputType", ["content"])
CONTAINER_IMAGE = os.getenv("SANDBOX_CONTAINER_IMAGE", "opendevin/sandbox:v0.1")
# FIXME: On some containers, the devin user doesn't have enough permission, e.g. to install packages
# How do we make this more flexible?
RUN_AS_DEVIN = os.getenv("RUN_AS_DEVIN", "true").lower() != "false"
class BackgroundCommand:
def __init__(self, id: int, command: str, result):
self.id = id
self.command = command
self.result = result
def read_logs(self) -> str:
# TODO: get an exit code if process is exited
logs = ""
while True:
ready_to_read, _, _ = select.select([self.result.output], [], [], .1) # type: ignore[has-type]
if ready_to_read:
data = self.result.output.read(4096) # type: ignore[has-type]
if not data:
break
# FIXME: we're occasionally seeing some escape characters like `\x02` and `\x00` in the logs...
chunk = data.decode('utf-8')
logs += chunk
else:
break
return logs
def kill(self):
# FIXME: this doesn't actually kill the process!
self.result.output.close()
USER_ID = 1000
@@ -21,6 +50,9 @@ elif hasattr(os, "getuid"):
USER_ID = os.getuid()
class DockerInteractive:
closed = False
cur_background_id = 0
background_commands : Dict[int, BackgroundCommand] = {}
def __init__(
self,
@@ -55,45 +87,55 @@ class DockerInteractive:
self.container_name = f"sandbox-{self.instance_id}"
self.restart_docker_container()
if RUN_AS_DEVIN:
self.setup_devin_user()
atexit.register(self.cleanup)
def setup_devin_user(self):
uid = os.getuid()
exit_code, logs = self.container.exec_run([
'/bin/bash', '-c',
f'useradd --shell /bin/bash -u {USER_ID} -o -c \"\" -m devin'
f'useradd --shell /bin/bash -u {uid} -o -c \"\" -m devin'
],
workdir="/workspace"
)
# regester container cleanup function
atexit.register(self.cleanup)
def read_logs(self) -> str:
if not hasattr(self, "log_generator"):
return ""
logs = ""
while True:
ready_to_read, _, _ = select.select([self.log_generator], [], [], .1) # type: ignore[has-type]
if ready_to_read:
data = self.log_generator.read(4096) # type: ignore[has-type]
if not data:
break
# FIXME: we're occasionally seeing some escape characters like `\x02` and `\x00` in the logs...
chunk = data.decode('utf-8')
logs += chunk
else:
break
return logs
def get_exec_cmd(self, cmd: str) -> List[str]:
if RUN_AS_DEVIN:
return ['su', 'devin', '-c', cmd]
else:
return ['/bin/bash', '-c', cmd]
def read_logs(self, id) -> str:
if id not in self.background_commands:
raise ValueError("Invalid background command id")
bg_cmd = self.background_commands[id]
return bg_cmd.read_logs()
def execute(self, cmd: str) -> Tuple[int, str]:
# TODO: each execute is not stateful! We need to keep track of the current working directory
exit_code, logs = self.container.exec_run(['su', 'devin', '-c', cmd], workdir="/workspace")
exit_code, logs = self.container.exec_run(self.get_exec_cmd(cmd), workdir="/workspace")
return exit_code, logs.decode('utf-8')
def execute_in_background(self, cmd: str) -> None:
self.log_time = time.time()
result = self.container.exec_run(['su', 'devin', '-c', cmd], socket=True, workdir="/workspace")
self.log_generator = result.output # socket.SocketIO
self.log_generator._sock.setblocking(0)
def execute_in_background(self, cmd: str) -> BackgroundCommand:
result = self.container.exec_run(self.get_exec_cmd(cmd), socket=True, workdir="/workspace")
result.output._sock.setblocking(0)
bg_cmd = BackgroundCommand(self.cur_background_id, cmd, result)
self.background_commands[bg_cmd.id] = bg_cmd
self.cur_background_id += 1
return bg_cmd
def kill_background(self, id: int) -> BackgroundCommand:
if id not in self.background_commands:
raise ValueError("Invalid background command id")
bg_cmd = self.background_commands[id]
bg_cmd.kill()
self.background_commands.pop(id)
return bg_cmd
def close(self):
self.stop_docker_container()
self.closed = True
def stop_docker_container(self):
docker_client = docker.from_env()
@@ -145,8 +187,9 @@ class DockerInteractive:
# clean up the container, cannot do it in __del__ because the python interpreter is already shutting down
def cleanup(self):
if self.closed:
return
self.container.remove(force=True)
print("Finish cleaning up Docker container")
if __name__ == "__main__":
import argparse
@@ -165,10 +208,7 @@ if __name__ == "__main__":
)
print("Interactive Docker container started. Type 'exit' or use Ctrl+C to exit.")
bg = DockerInteractive(
workspace_dir=args.directory,
)
bg.execute_in_background("while true; do echo 'dot ' && sleep 1; done")
bg_cmd = docker_interactive.execute_in_background("while true; do echo 'dot ' && sleep 1; done")
sys.stdout.flush()
try:
@@ -181,11 +221,16 @@ if __name__ == "__main__":
if user_input.lower() == "exit":
print("Exiting...")
break
if user_input.lower() == "kill":
docker_interactive.kill_background(bg_cmd.id)
print("Background process killed")
continue
exit_code, output = docker_interactive.execute(user_input)
print("exit code:", exit_code)
print(output + "\n", end="")
logs = bg.read_logs()
print("background logs:", logs, "\n")
if bg_cmd.id in docker_interactive.background_commands:
logs = docker_interactive.read_logs(bg_cmd.id)
print("background logs:", logs, "\n")
sys.stdout.flush()
except KeyboardInterrupt:
print("\nExiting...")

View File

@@ -6,12 +6,13 @@ seaborn
docker
fastapi
uvicorn[standard]
ruff
mypy
# for agenthub/lanchangs_agent
langchain
langchain-core
langchain-openai
langchain-community
llama-index
llama-index-vector-stores-chroma
chromadb
chromadb