Files
OpenHands/opendevin/controller.py
Robert Brennan 3b2ed14ae7 Use Docker SDK for sandbox, integrate into CommandManager (#93)
* refactor command manager to use docker and move to docker sdk

* fix read and write actions

* actually run background cmd

* use bash for running cmds and fix logs

* keep logs in buffer file

* fix up background logs

* consolidate requirements

* fix docker imports

* add fixme

* add remove fixme

* fix sandbox.py path in README

* fix typo annotation and prompt

---------

Co-authored-by: Xingyao Wang <xingyao6@illinois.edu>
2024-03-22 21:54:00 +08:00

50 lines
1.7 KiB
Python

from opendevin.lib.command_manager import CommandManager
from opendevin.lib.event import Event
def print_callback(event):
print(event, flush=True)
class AgentController:
def __init__(self, agent, workdir, max_iterations=100, callbacks=[]):
self.agent = agent
self.max_iterations = max_iterations
self.background_commands = []
self.command_manager = CommandManager(workdir)
self.callbacks = callbacks
self.callbacks.append(self.agent.add_event)
self.callbacks.append(print_callback)
def maybe_perform_action(self, event):
if not (event and event.is_runnable()):
return
action = 'output'
try:
output = event.run(self)
except Exception as e:
output = 'Error: ' + str(e)
action = 'error'
out_event = Event(action, {'output': output})
return out_event
def start_loop(self):
output = None
for i in range(self.max_iterations):
print("STEP", i, flush=True)
log_events = self.command_manager.get_background_events()
for event in log_events:
for callback in self.callbacks:
callback(event)
action_event = self.agent.step(self.command_manager)
for callback in self.callbacks:
callback(action_event)
if action_event.action == 'finish':
break
print("---", flush=True)
output_event = self.maybe_perform_action(action_event)
if output_event is not None:
for callback in self.callbacks:
callback(output_event)
print("==============", flush=True)