new example

This commit is contained in:
heroding77
2023-11-22 16:30:11 +08:00
parent 4db5c4acbf
commit a32fcda660
43 changed files with 1160 additions and 10 deletions

View File

@@ -0,0 +1,34 @@
from jarvis.agent.openai_agent import OpenAIAgent
from jarvis.enviroment.old_env import BaseEnviroment
'''
A minimal example for base env and openai agent
The goal of this example is to demonstrate how agent parse response to get actions, and env execute those actions.
'''
environment = BaseEnviroment()
agent = OpenAIAgent(config_path="config.json")
response = '''
Thought: To open a document, we can focus on one goal: open the specified document(word, pdf, pptx, txt).
Actions:
1. <action>open_document</action>
Paramters:
1. <parameter><arg>path:/home/heroding/桌面/test.txt</arg><arg>name:test.txt</arg><arg>type:txt</arg></parameter>
'''
action = agent.extract_action(response, begin_str='<action>', end_str='</action>')
paramter = agent.extract_parameter(response, begin_str='<parameter>', end_str='</parameter>')
print(paramter)
# import time
# for a in action:
# command = agent.action_lib[a]
# # print(a, command)
# print(environment.step(command))
# # time.sleep(2)
# from jarvis.action_lib.execute_sql import ExecuteSQL
# action = ExecuteSQL()
# action(query='SELECT * FROM railway\nWHERE number="D1000";')

181
jarvis/.gitignore vendored Normal file
View File

@@ -0,0 +1,181 @@
langchain/
guidance/
Auto-GPT/
Agents/
utils/
working_dir/*.log
.vs/
.vscode/
.idea/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
docs/docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
notebooks/
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
.venvs
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# macOS display setting files
.DS_Store
# Wandb directory
wandb/
# asdf tool versions
.tool-versions
/.ruff_cache/
*.pkl
*.bin
# integration test artifacts
data_map*
\[('_type', 'fake'), ('stop', None)]
# Replit files
*replit*
node_modules
docs/.yarn/
docs/node_modules/
docs/.docusaurus/
docs/.cache-loader/
docs/_dist
docs/api_reference/api_reference.rst
docs/api_reference/experimental_api_reference.rst
docs/api_reference/_build
docs/api_reference/*/
!docs/api_reference/_static/
!docs/api_reference/templates/
!docs/api_reference/themes/
docs/docs_skeleton/build
docs/docs_skeleton/node_modules
docs/docs_skeleton/yarn.lock

1
jarvis/README.md Normal file
View File

@@ -0,0 +1 @@
# jarvis

View File

@@ -0,0 +1,11 @@
from jarvis.action.base_action import BaseAction
class open_picture(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using open_picture() will open the picture you want."
self.action_type = 'BASH'
def __call__(self, *args, **kwargs):
return 'shortcuts run "open_picture"'

View File

@@ -4,6 +4,8 @@ import glob
from jarvis.action.base_action import BaseAction
import importlib
from typing import Optional
import inspect
class BaseAgent:
"""
@@ -29,14 +31,32 @@ class BaseAgent:
if file.endswith('.py') and "__init__" not in file:
class_name = file[:-3].split('/')[-1] # 去除.py后缀获取类名
module = importlib.import_module(class_name)
tmp_obj = getattr(module, class_name)()
self.action_lib.update({class_name: tmp_obj})
self.action_lib_description.update({class_name: tmp_obj.description})
obj_code = self.get_class_source_code(module, class_name)
self.action_lib.update({class_name: obj_code})
def get_class_source_code(self, module, class_name):
# 获取类对象
tmp_obj = getattr(module, class_name)
print(type(tmp_obj.description))
self.action_lib_description.update({class_name: tmp_obj.description})
# 获取类定义的源文件和行号
source_file = inspect.getsourcefile(tmp_obj)
source_lines, start_line = inspect.getsourcelines(tmp_obj)
# 读取源文件
with open(source_file, 'r') as file:
lines = file.readlines()
# 提取类的源代码
class_code = ''.join(lines[start_line - 1: start_line - 1 + len(source_lines)])
return class_code
if __name__ == '__main__':
a = BaseAgent()
a.init_action_lib()
for k,v in a.action_lib.items():
print(k)
print(v())
# a.init_action_lib()
# for k,v in a.action_lib.items():
# print(k)
# print(v)

View File

@@ -76,6 +76,32 @@ class OpenAIAgent(BaseAgent):
_end = message.find(end_str)
return result
def extract_parameter(self, message, begin_str='[BEGIN]', end_str='[END]'):
result = []
_begin_parameter = message.find(begin_str)
_end_parameter = message.find(end_str)
# go through parameters
while not (_begin_parameter == -1 or _end_parameter == -1):
# get current task parameters
parameter = message[_begin_parameter + len(begin_str):_end_parameter].strip()
_begin_arg = parameter.find("<arg>")
_end_arg = parameter.find("</arg>")
args = {}
# go through args
while not (_begin_arg == -1 or _end_arg == -1):
arg = parameter[_begin_arg + len("<arg>"): _end_arg].strip()
key, value = arg.split(":")
args[key] = value
parameter = parameter[_end_arg + len("</arg>"):].strip()
_begin_arg = parameter.find("<arg>")
_end_arg = parameter.find("</arg>")
result.append(args)
message = message[_end_parameter + len(end_str):]
_begin_parameter = message.find(begin_str)
_end_parameter = message.find(end_str)
return result
def chat(self, goal: str):
self._history = []

View File

@@ -11,3 +11,7 @@ def open_editor(file_path: str) -> None:
def open_vscode(path: str) -> None:
return code(path)

View File

@@ -1,7 +1,7 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
from jarvis.atom_action.src import *
def view_document(file_path) -> None:
return evince(file_path)
@@ -11,3 +11,7 @@ def play_audio(file_path) -> None:
def play_video(file_path) -> None:
return totem(file_path)
path = "/home/heroding/桌面/CCF.pdf"
view_document(path)

View File

@@ -22,7 +22,9 @@ class BashEnv(Env):
def step(self, _command) -> EnvState:
self.env_state = EnvState(command=_command)
_command = _command() + ' && pwd'
print("where is the error???")
_command = _command + ' && pwd'
print("give me the answer!!!")
try:
results = subprocess.run([_command], capture_output=True, check=True, cwd=self.working_dir,
text=True, shell=True, timeout=self.timeout)

View File

@@ -40,4 +40,4 @@ class Env:
if __name__ == '__main__':
env = Env()
env.env_state = EnvState()
result = env.observe()
# result = env.observe()

View File

@@ -0,0 +1,31 @@
{
"model_name": "gpt-3.5-turbo-16k-0613",
"OPENAI_API_KEY": "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",
"OPENAI_ORGANIZATION": "",
"config": {
"API_KEY": "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",
"MAX_CHAT_HISTORY": "10",
"User_Names": "[\"User\"]",
"TOP_K": "3"
},
"LLM_type": "OpenAI",
"LLM": {
"temperature": 0.0,
"model": "gpt-3.5-turbo-16k-0613",
"log_path": "logs/god"
},
"agents": {
"Yang bufan": {
"style": "indifferent and idle",
"roles": {
"Response_state": "Yang bufan"
}
},
"User": {
"style": "soso",
"roles": {
"Response_state": "User"
}
}
}
}

21
jarvis/index.py Normal file
View File

@@ -0,0 +1,21 @@
import time
from jarvis.enviroment.base_env import BaseEnviroment
from jarvis.agent.openai_agent import OpenAIAgent
if __name__ == '__main__':
environment = BaseEnviroment()
agent = OpenAIAgent(
config_path="examples/config.json",
environment=environment
)
action = [
'turn_on_dark_mode',
'turn_on_light_mode'
]
# environment.init_env()
for a in action:
print(a)
command = agent.action_lib[a]
print(agent.env.step(command))
time.sleep(2)

View File

View File

View File

@@ -0,0 +1,97 @@
import subprocess
import time
import os
from typing import Optional
from jarvis.core.schema import ActionReturn, ActionStatusCode
class BaseAction:
"""Base class for all actions.
Args:
description (str, optional): The description of the action. Defaults to
None.
name (str, optional): The name of the action. If None, the name will
be class name. Defaults to None.
"""
def __init__(self,
description: Optional[str] = None,
name: Optional[str] = None,
timeout: Optional[int] = 2) -> None:
if name is None:
name = self.__class__.__name__
self._name = name
self._description = description
self._timeout = timeout
# self.execute_log_path = execute_log_path
# self.state_log_path = state_log_path
# self.log_execute = logging_command + self.execute_log_path
# self.log_state = logging_command + self.state_log_path
# self.working_dir = os.path.abspath(os.path.join(os.getcwd(), "..", "..", "working_dir"))
# self.server_name = "test"
# print(self.working_dir)
def _python(self, *lines):
return f'python -Bc "{"; ".join(lines)}"'
def _import(self, *packages):
return f'from jarvis.{".".join(packages)} import *'
@property
def _command(self):
raise NotImplementedError
@property
def timeout(self):
return self._timeout
# def _success(self):
# raise NotImplementedError
# def execute_command(self, command):
# subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
# async def parse_result(self):
# def __call__(self, *args, **kwargs) -> ActionReturn:
# def run(self, *args, **kwargs) -> ActionReturn:
# # 将run搬到env中
# command = self._command() + self.log_execute
# subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
# time.sleep(self.timeout)
# with open()
# action_return = ActionReturn(type=self.name, args=command)
# try:
# result = subprocess.run([command], capture_output=True, check=True,
# text=True, shell=True, timeout=self.timeout, stdin=subprocess.DEVNULL)
# result = subprocess.run(["tmux", "send-keys", "-t", 'test', command, "Enter"])
# if result.returncode == 0:
# action_return.state = ActionStatusCode.SUCCESS
# action_return.thought = self._success()
# if result.stdout:
# action_return.result = result.stdout
# if result.stderr:
# action_return.result = result.stderr
# except subprocess.CalledProcessError as e:
# action_return.state = ActionStatusCode.FAILED
# action_return.errmsg = e.stderr
# action_return.result = e.stdout
# return action_return
@property
def name(self):
return self._name
@property
def description(self):
return self._description
def __repr__(self):
return f'{self.name}:{self.description}'
def __str__(self):
return self.__repr__()
if __name__ == '__main__':
action = BaseAction()

View File

@@ -0,0 +1,39 @@
from jarvis.action.turn_on_dark_mode import turn_on_dark_mode
from jarvis.action.organize_app_layout import organize_app_layout
import subprocess
# import os
#
# functions = {"turn_on_dark_mode()": turn_on_dark_mode(), "turn_on_dark_mode()":organize_app_layout()}
# functions["turn_on_dark_mode()"].run()
organize_app_layout().run()
# command = 'shortcuts run "Dark Mode"'
# # command = 'du -d 1 -h'
# # subprocess.run([command], text=True, shell=True)
# # os.system(command)
# os.popen(command)
# try:
# # 执行命令
# cmd_output = os.popen(command).read()
# except Exception as e:
# # 捕获异常并处理
# print("An error occurred:\n", str(e))
# else:
# # 没有发生异常时执行的代码
# print("Command output:\n", cmd_output)
# import subprocess
#
# try:
# # 执行命令
# result = subprocess.run(['shortcuts', 'run', 'Dark Mode'], capture_output=True, text=True, stdin=subprocess.DEVNULL)
# result.check_returncode() # 检查命令的返回码
# except subprocess.CalledProcessError as e:
# # 命令执行失败时的异常处理
# print("Command failed with return code:", e.returncode)
# print("Command output:", e.stdout)
# except Exception as e:
# # 其他异常处理
# print("An error occurred:", str(e))
# else:
# # 没有发生异常时执行的代码
# print("Command output:", result.stdout)

View File

View File

@@ -0,0 +1,15 @@
from jarvis.action.base_action import BaseAction
class organize_app_layout(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using organize_app_layout() will help user reorganize their Desktop layout for better working condition and focus more easily."
self._timeout = 15
@property
def _command(self) -> str:
return 'shortcuts run "Organize APP Layout"'
#
# def _success(self) -> str:
# return "Successfully organized the app's layout"

View File

@@ -0,0 +1,31 @@
from jarvis.action.base_action import BaseAction
class turn_on_dark_mode(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using turn_on_dark_mode() will change your system into the dark mode."
@property
def _command(self):
return self._python(
self._import("atom", "operations"),
"adjust_theme('Adwaita-dark')"
)
# def _success(self):
# return "Successfully turned the system into the Dark Mode"
# def __call__(self, *args, **kwargs):
#
# command = 'shortcuts run "Dark Mode"'
# try:
# # result = subprocess.run([command, "Dark Mode"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# result = subprocess.run([command], capture_output=True, check=True,
# text=True, shell=True, timeout=self.timeout, stdin=subprocess.DEVNULL)
# if result.returncode == 0:
# return result
# except subprocess.CalledProcessError as e:
# return e
# except subprocess.TimeoutExpired:
# raise TimeoutError(f"Command '{command}' timed out after {self.timeout} seconds.")

View File

@@ -0,0 +1,32 @@
from jarvis.action.base_action import BaseAction
class turn_on_light_mode(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using turn_on_light_mode() will change your system into the light mode."
@property
def _command(self):
return self._python(
self._import("atom", "operations"),
"adjust_theme('Adwaita')"
)
# def _success(self):
# return "Successfully turned the system into the Light Mode"
# def __call__(self, *args, **kwargs):
#
# command = 'shortcuts run "Dark Mode"'
# try:
# # result = subprocess.run([command, "Dark Mode"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# result = subprocess.run([command], capture_output=True, check=True,
# text=True, shell=True, timeout=self.timeout, stdin=subprocess.DEVNULL)
# if result.returncode == 0:
# return result
# except subprocess.CalledProcessError as e:
# return e
# except subprocess.TimeoutExpired:
# raise TimeoutError(f"Command '{command}' timed out after {self.timeout} seconds.")

View File

View File

@@ -0,0 +1,42 @@
import os
import sys
import glob
from jarvis.action.base_action import BaseAction
import importlib
from typing import Optional
class BaseAgent:
"""
BaseAgent is the base class of all agents.
"""
def __init__(self, config_path=None):
self.llm = None
self.environment = None
self.action_lib = {}
self.action_lib_description = {}
self.action = None
self.init_action_lib()
#
# def from_config(self, config_path=None):
# raise NotImplementedError
def init_action_lib(self, path=None, attribute_name='_description'):
if not path:
path = os.path.abspath(os.path.join(__file__, "..", "..", "action_lib"))
sys.path.append(path)
files = glob.glob(path + "/*.py")
for file in files:
if file.endswith('.py') and "__init__" not in file:
class_name = file[:-3].split('/')[-1] # 去除.py后缀获取类名
module = importlib.import_module(class_name)
tmp_obj = getattr(module, class_name)()
self.action_lib.update({class_name: tmp_obj._command})
self.action_lib_description.update({class_name: tmp_obj.description})
if __name__ == '__main__':
a = BaseAgent()
a.init_action_lib()
for k,v in a.action_lib.items():
print(k)
print(v)

View File

@@ -0,0 +1,119 @@
import json
from jarvis.core.llms import OpenAI
from jarvis.agent.base_agent import BaseAgent
from jarvis.enviroment.base_env import BaseEnviroment
from jarvis.core.schema import EnvState
a = "{action_input} the input to the action, could be any valid input for python programs or shell commands, such numbers, strings, or path to a file, etc."
BASE_PROMPT = """
{system_prompt}
{tool_description}
To use a tool, please use the following format:
```
{thought} to address the user request, thinking about what are the sub-goals you need to achieve and which tool is needed for each sub-goal?
{action} the tool names, each action name should be one of [{action_names}].
```
The response after utilizing tools should using the following format:
```
{response} To generate a response, you need to summarize your thoughts above and combined them with the tool execution results.
``
If you already know the answer, or you do not need to use tools,
please using the following format to reply:
```
{thought} the thought process to answer user questions
{response} respond to user request based on thought
```
Remember you must surround you action between <action> and </action>.
Now you are ready to take questions and requests from users.
"""
class OpenAIAgent(BaseAgent):
"""
BaseAgent is the base class of all agents.
"""
def __init__(self, config_path=None, environment: BaseEnviroment = None):
super().__init__()
self.llm = OpenAI(config_path)
self.env = environment
self.actions = None
self.max_iter = 10
self.system_prompt = """You are a personal assistant that aims to automate the workflow for human.\nYou are capable of understanding human intent and decompose it into several subgoals that can be addressed via language generation or acomplished using external tools.\nSome of the external tools you can use and their functionalities are as follows:
"""
self.action_names = self.action_lib.keys()
# todo: 添加工具检索模块
self.available_action_description = ""
for i, name in enumerate(self.action_names):
self.available_action_description += "Tool {}: <action>{}</action>\n{}\n".format(i+1, name, self.action_lib_description[name])
def from_config(self, config_path=None):
self.llm = OpenAI(config_path)
def format_message(self, query):
self.prompt = BASE_PROMPT.format(
system_prompt=self.system_prompt,
tool_description=self.available_action_description,
action_names=self.action_names,
thought="Thought:",
action="Actions:",
action_input="Action Input:",
response="Response:"
)
self.message = [
{"role": "system", "content": self.prompt},
{"role": "user", "content": query},
]
return self.llm.chat(self.message)
def extract_action(self, message, begin_str='[BEGIN]', end_str='[END]'):
result = []
_begin = message.find(begin_str)
_end = message.find(end_str)
while not (_begin == -1 or _end == -1):
result.append(message[_begin + len(begin_str):_end].strip())
message = message[_end + len(end_str):]
_begin = message.find(begin_str)
_end = message.find(end_str)
return result
def chat(self, goal: str):
self._history = []
def step(self, single_action) -> EnvState:
_command = self.action_lib[single_action]
self.environment.step(_command)
return self.environment.observe()
if __name__ == '__main__':
actions = {
"turn_on_dark_mode()": "Using turn_on_dark_mode() will change your system into the dark mode.",
"play_study_music()": "Using play_study_music() will open Music in your Mac and play songs that are sutiable for study and work.",
"create_meeting()": "Using create_meeting() will help user create a meeting event. When users request to create a meeting, don't ask questions such as meeting title and time, just invoke this tool by generating the action name.",
"show_upcoming_meetings()": "Using show_upcoming_meetings() will open Calendar and show the their upcoming meetings for the user.",
"organize_app_layout()": "Using organize_app_layout() will help user reorganize their Desktop layout for better working condition and focus more easily."
}
environment = BaseEnviroment()
agent = OpenAIAgent(config_path="../../examples/config.json", environment=environment)
# print(agent.action_lib)
# print(agent.action_lib_description)
# executation_action = agent.action_lib["turn_on_dark_mode"]()
# executation_action.run()
# response = agent.format_message(query="I want to start working now. Please help set up the working environment for me.")
# print(agent.prompt)
# print(response['content'])
response = '''
Thought: To set up the working environment, we can focus on two sub-goals: turning on dark mode and organizing the app layout.
Actions:
1. <action>turn_on_dark_mode</action>
2. <action>turn_on_light_mode</action>'''
action = agent.extract_action(response, begin_str='<action>', end_str='</action>')
import time
for a in action:
print(a)
command = agent.action_lib[a]
print(agent.env.step(command))
time.sleep(2)

View File

@@ -0,0 +1,6 @@
import sys
sys.dont_write_bytecode = True
from .operations import *
from .query import *
from .src import *

View File

@@ -0,0 +1,8 @@
import sys
sys.dont_write_bytecode = True
from .coding import *
from .files import *
from .media import *
from .routine import *
from .system import *

View File

@@ -0,0 +1,13 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def exec_python(file_path: str) -> None:
return python(file_path)
def open_editor(file_path: str) -> None:
return gedit(file_path)
def open_vscode(path: str) -> None:
return code(path)

View File

@@ -0,0 +1,35 @@
import sys
import os
sys.dont_write_bytecode = True
from ..src import *
def create_dir(dir_path: str, new_dir_name: str) -> None:
assert os.path.exists(dir_path)
return mkdir(os.path.join(dir_path, new_dir_name))
def create_file(file_path: str, new_file_name: str) -> None:
assert os.path.exists(file_path)
return touch(os.path.join(file_path, new_file_name))
def download_file(url: str, file_name: str) -> None:
return wget(url, "-O", file_name, "--no-check-certificate")
def copy(src_path: str, dst_path: str) -> None:
assert os.path.exists(src_path)
return cp(src_path, dst_path)
def move(src_path: str, dst_path: str) -> None:
assert os.path.exists(src_path)
return mv(src_path, dst_path)
def rename(file_path: str, new_file_name: str) -> None:
assert os.path.exists(file_path)
components = list(os.path.split(file_path))
components[-1] = new_file_name
dst = "/".join(components)
return mv(file_path, dst)
def delete(path: str) -> None:
assert os.path.exists(path)
return rm("-rf", path)

View File

@@ -0,0 +1,20 @@
import sys
sys.dont_write_bytecode = True
from jarvis.jarvis.atom.src import *
def view_document(file_path) -> None:
return evince(file_path)
def play_audio(file_path) -> None:
return rhythmbox_client(f"--play-uri=\"{file_path}\"")
def play_video(file_path) -> None:
return totem(file_path)
def root_view_document(file_path) -> None:
return Pkexec_evince(file_path)
path = "/home/heroding/桌面/test.txt"
view_document(path)
print(view_document(path))

View File

@@ -0,0 +1,6 @@
import sys
sys.dont_write_bytecode = True
def add_plan() -> None:
raise NotImplementedError

View File

@@ -0,0 +1,16 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
from ..query import screen
def sudo_install(package: str) -> None:
return Pkexec_apt("install", package)
def adjust_theme(theme: str) -> None:
return gsettings("set", "org.gnome.desktop.interface", "gtk-theme", theme)
def adjust_brightness(brightness: int) -> None:
assert brightness >= 0.5 and brightness <= 1
brightness = str(brightness)
return xrandr("--output", screen(), "--brightness", brightness)

View File

@@ -0,0 +1,6 @@
import sys
sys.dont_write_bytecode = True
from .device import *
from .files import *
from .package import *

View File

@@ -0,0 +1,10 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def screen() -> str:
return Promise() \
.then(xrandr)() \
.then(grep)(" connected") \
.then(cut)("-f1", "-d", " ")()

View File

@@ -0,0 +1,10 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def dir_list(dir_path: str) -> str:
return ls(dir_path)
def dir_tree(dir_path: str) -> str:
return tree(dir_path)

View File

@@ -0,0 +1,9 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def is_installed(package: str) -> str:
return Promise() \
.then(apt)("list") \
.then(grep)(f"{package}/")()

View File

@@ -0,0 +1,5 @@
import sys
sys.dont_write_bytecode = True
from .bash import *
from .commands import *

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
import sys
sys.dont_write_bytecode = True
from typing import Union, Optional
from subprocess import run, PIPE
class Output:
def __init__(self, stdout: str, stderr: str):
self.stdout = stdout
self.stderr = stderr
def observe(self):
if self.stdout == "" and self.stderr != "":
raise RuntimeError(self.stderr)
else:
return self.stdout
def __str__(self):
print([self.stdout, self.stderr])
return self.stdout + self.stderr
class Bash:
def __init__(self, cmd: str, stdin: Optional[str]=None):
self.cmd = cmd
self.stdin = stdin
def __call__(self, *arg) -> str:
result = run(
[self.cmd, *arg],
input=self.stdin,
capture_output=True,
text=True
)
return Output(result.stdout, result.stderr)
class Promise():
def __init__(self, stdin: Optional[str]=None):
self.stdin = stdin
self.stash = None
self.stdout = None
def then(self, bash: Union[str, Bash], *arg) -> Promise:
if type(bash) == str:
bash = Bash(bash)
self.stash = bash
self.stash.stdin = self.stdout if self.stdout != None else self.stdin
return self.__call__(*arg) if len(arg) > 0 else self
def observe(self) -> str:
return self.stdout.strip("\n")
def __call__(self, *arg) -> Promise:
if self.stash != None:
self.stdout = self.stash(*arg).observe()
self.stash = None
return self
else:
return self.observe()
Pkexec = \
lambda cmd: \
lambda *arg: \
Bash("pkexec")(cmd, *arg)
Pkexec_GUI = \
lambda cmd: \
lambda *arg: \
Bash("pkexec")(
"env",
"DISPLAY=$DISPLAY",
"XAUTHORITY=$XAUTHORITY",
cmd,
*arg
)

View File

@@ -0,0 +1,33 @@
from .bash import *
# file operation
mkdir = Bash("mkdir")
touch = Bash("touch")
cp = Bash("cp")
mv = Bash("mv")
rm = Bash("rm")
ls = Bash("ls")
tree = Bash("tree")
# tools
grep = Bash("grep")
cut = Bash("cut")
wget = Bash("wget")
gedit = Bash("gedit")
Pkexec_gedit = Pkexec_GUI("gedit")
# system & setting
apt = Bash("apt")
Pkexec_apt = Pkexec("apt")
Pkexec_evince = Pkexec("evince")
gsettings = Bash("gsettings")
xrandr = Bash("xrandr")
# development
python = Bash("python")
code = Bash("code")
# application
evince = Bash("evince")
rhythmbox_client = Bash("rhythmbox-client")
totem = Bash("totem")

View File

View File

@@ -0,0 +1,27 @@
import os
import openai
import time
import json
class OpenAI:
"""
A wrapper for OpenAI API
"""
def __init__(self, config_path=None):
with open(config_path) as f:
config = json.load(f)
self.model_name = config['model_name']
openai.api_key = config['OPENAI_API_KEY']
openai.organization = config['OPENAI_ORGANIZATION']
def chat(self, messages, temperature=0, sleep_time=2):
response = openai.ChatCompletion.create(
model=self.model_name,
messages=messages,
temperature=temperature
)
# time.sleep(sleep_time)
return response['choices'][0]['message']

View File

@@ -0,0 +1,21 @@
base_prompt = """{system_prompt}
{tool_description}
To use a tool, please use the following format:
```
{thought}to address the user request, thinking about what are the sub-goals you need to achieve and which tool is needed for each sub-goal?
{action}the tool name, should be one of [{action_names}].
{action_input}the input to the action, could be any valid input for python programs or shell commands, such numbers, strings, or path to a file, etc.
```
The response after utilizing tools should using the following format:
```
{response}To generate a response, you need to summarize your thoughts above and combined them with the tool execution results.
``
If you already know the answer, or you do not need to use tools,
please using the following format to reply:
```
{thought}the thought process to answer user questions
{response}respond to user request based on thought
```
Now you are ready to take questions, requests from users.
"""

View File

@@ -0,0 +1,36 @@
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Union
class ActionStatusCode(int, Enum):
ING = 0
SUCCESS = 1
FAILED = -1
class ActionValidCode(int, Enum):
FINISH = 1
OPEN = 0
CLOSED = -1
INVALID = -2
ABSENT = -3 # NO ACTION
@dataclass
class ActionReturn:
args: Dict
url: Optional[str] = None
type: Optional[str] = None
result: Optional[str] = None
errmsg: Optional[str] = None
state: Union[ActionStatusCode, int] = ActionStatusCode.SUCCESS
thought: Optional[str] = None
valid: Optional[ActionValidCode] = ActionValidCode.OPEN
@dataclass
class EnvState:
command: List[str] = field(default_factory=list)
result: Optional[str] = None
pwd: Optional[str] = None
ls: Optional[str] = None

View File

View File

@@ -0,0 +1,100 @@
import subprocess
import time
import os
from typing import Optional
from jarvis.core.schema import ActionReturn, ActionStatusCode, EnvState
import asyncio
class BaseEnviroment:
"""Base class for all actions.
Args:
description (str, optional): The description of the action. Defaults to
None.
name (str, optional): The name of the action. If None, the name will
be class name. Defaults to None.
"""
def __init__(self,
name: Optional[str] = None,
timeout: Optional[int] = 2,
execute_log_path: Optional[str] = "execute.log",
state_log_path: Optional[str] = "state.log",
logging_command: Optional[str] = " 2>&1 | tee ") -> None:
if name is None:
name = self.__class__.__name__
self._name = name
self.timeout = timeout
self.execute_log_path = execute_log_path
self.state_log_path = state_log_path
self.working_dir = os.path.abspath(os.path.join(__file__, "..", "..", "..", "working_dir"))
self.log_execute = logging_command + os.path.join(self.working_dir, self.execute_log_path)
self.log_state = logging_command + os.path.join(self.working_dir, self.state_log_path)
self.server_name = "test"
self.env_state = None
# print(self.working_dir)
def init_env(self):
# cur_dir = os.getcwd()
# os.chdir(self.working_dir)
# command = "tmux new-session -d -s " + self.server_name
# os.system(command)
# os.chdir(cur_dir)
# todo catch new duplicate session error
subprocess.run(["tmux", "new-session", "-d", "-s", self.server_name], cwd=self.working_dir)
output = subprocess.check_output(["tmux", "ls"]).decode("utf-8")
# output = subprocess.run(["tmux ls"], capture_output=True, shell=True, text=True)
# print(output)
# todo add raise error
if self.server_name not in output:
return False, "Error: init server failed"
else:
return True, "Init tmux server, done."
def execute_command(self, command):
subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
def step(self, _command) -> EnvState:
self.env_state = EnvState()
self.env_state.command.append(_command)
command = _command + self.log_execute
subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
time.sleep(self.timeout)
self.observe()
return self.env_state
def read_log(self, log_path):
os.chdir(self.working_dir)
with open(log_path) as f:
return f.readlines()
def observe(self) -> None:
for _command in ["pwd", "ls"]:
command = _command + self.log_state
subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
# todo 处理异步的问题,可能提交过去还没执行完。
time.sleep(self.timeout)
if _command == "pwd":
self.env_state.pwd = self.read_log(self.state_log_path)
else:
self.env_state.ls = self.read_log(self.state_log_path)
self.env_state.result = self.read_log(self.execute_log_path)
@property
def name(self):
return self._name
@property
def description(self):
return self._description
def __repr__(self):
return f'{self.name}:{self.description}'
def __str__(self):
return self.__repr__()
if __name__ == '__main__':
env = BaseEnviroment()
result = env.observe()