fix merge dzc

This commit is contained in:
tamiflu233
2023-11-28 15:18:37 +08:00
57 changed files with 1631 additions and 39 deletions

View File

@@ -1,35 +1,33 @@
from jarvis.agent.openai_agent import OpenAIAgent
from jarvis.enviroment.old_env import BaseEnviroment
from jarvis.enviroment.py_env import PythonEnv
from jarvis.enviroment.bash_env import BashEnv
'''
Made By WZM
用处打开各种文档例如ppt、excel、word、pdf等
Made By DZC
The function is to be able to open any type of document.
'''
# environment = BaseEnviroment()
environment = PythonEnv()
agent = OpenAIAgent(config_path="config.json")
response = '''
Thought: To set up the working environment, we can focus on two sub-goals: turning on dark mode and organizing the app layout.
Thought: To open a document named , we can focus on one goal: open the specified document(word, pdf, pptx, txt etc.).
Actions:
<<<<<<< HEAD
1. <action>open_document</action> <invoke>open_document()("./作业10答案.pdf")</invoke>
=======
1. <action>open_document</action>
Check local action_lib, the required action code is in the library, according to the function description in the code, combined with the information provided by the user, You can instantiate classes for different tasks.
>>>>>>> 4a892f6411471c671bbaf605ba10fc8b42db61f4
invoke:
1. <invoke>open_document()("/home/heroding/桌面/rnn.pptx" , "pptx")</invoke>
'''
action = agent.extract_action(response, begin_str='<action>', end_str='</action>')
invoke = agent.extract_action(response, begin_str='<invoke>', end_str='</invoke>')
print(invoke)
import time
for (i,a) in enumerate(action):
invoke = agent.extract_invoke(response, begin_str='<invoke>', end_str='</invoke>')
for (i, a) in enumerate(action):
command = agent.action_lib[a] + "\n" + invoke[i]
# print(a, command)
print(environment.step(command).result)
# time.sleep(2)
# from jarvis.action_lib.execute_sql import execute_sql as ExecuteSQL
# action = ExecuteSQL()
# action(query='SELECT * FROM railway\nWHERE number="D1000";')

View File

@@ -22,15 +22,12 @@ Actions:
action = agent.extract_action(response, begin_str='<action>', end_str='</action>')
invoke = agent.extract_action(response, begin_str='<invoke>', end_str='</invoke>')
<<<<<<< HEAD
print(invoke)
=======
>>>>>>> 4a892f6411471c671bbaf605ba10fc8b42db61f4
for (i,a) in enumerate(action):
command = agent.action_lib[a] + "\n" + invoke[i]
# print(a, command)
print(environment.step(command).result)
# time.sleep(2)
# from jarvis.action_lib.execute_sql import execute_sql as ExecuteSQL
# action = ExecuteSQL()
# action(query='SELECT * FROM railway\nWHERE number="D1000";')

View File

@@ -0,0 +1,29 @@
from jarvis.agent.openai_agent import OpenAIAgent
from jarvis.enviroment.py_env import PythonEnv
'''
Made By DZC
The function is to install environment missing package.
'''
environment = PythonEnv()
agent = OpenAIAgent(config_path="config.json")
response = '''
Thought: To install environment missing package , we can focus on one goal: run "pip install xxx" in Bash.
Actions:
1. <action>install_package</action>
Check local action_lib, the required action code is in the library, according to the function description in the code, combined with the information provided by the user, You can instantiate classes for different tasks.
invoke:
1. <invoke>install_package()("numpy")</invoke>
'''
action = agent.extract_action(response, begin_str='<action>', end_str='</action>')
invoke = agent.extract_invoke(response, begin_str='<invoke>', end_str='</invoke>')
for (i, a) in enumerate(action):
command = agent.action_lib[a] + "\n" + invoke[i]
print(environment.step(command).result)

181
jarvis/.gitignore vendored Normal file
View File

@@ -0,0 +1,181 @@
langchain/
guidance/
Auto-GPT/
Agents/
utils/
working_dir/*.log
.vs/
.vscode/
.idea/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
docs/docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
notebooks/
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
.venvs
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# macOS display setting files
.DS_Store
# Wandb directory
wandb/
# asdf tool versions
.tool-versions
/.ruff_cache/
*.pkl
*.bin
# integration test artifacts
data_map*
\[('_type', 'fake'), ('stop', None)]
# Replit files
*replit*
node_modules
docs/.yarn/
docs/node_modules/
docs/.docusaurus/
docs/.cache-loader/
docs/_dist
docs/api_reference/api_reference.rst
docs/api_reference/experimental_api_reference.rst
docs/api_reference/_build
docs/api_reference/*/
!docs/api_reference/_static/
!docs/api_reference/templates/
!docs/api_reference/themes/
docs/docs_skeleton/build
docs/docs_skeleton/node_modules
docs/docs_skeleton/yarn.lock

1
jarvis/README.md Normal file
View File

@@ -0,0 +1 @@
# jarvis

View File

@@ -0,0 +1,88 @@
# from jarvis.action.base_action import BaseAction
# import subprocess
# import requests
# import os
# class download_and_play_audio(BaseAction):
# def __init__(self):
# self._description = "This class downloads an audio file from a provided URL and plays it on a Linux desktop."
# def __call__(self, url, *args, **kwargs):
# """
# Downloads an audio file from the specified URL and plays it.
# Args:
# url (str): URL of the audio file to be downloaded.
# *args, **kwargs: Additional arguments and keyword arguments.
# Usage:
# download_and_play_audio = Download_And_Play_Audio()
# download_and_play_audio('http://example.com/audio.mp3')
# """
# # Download the audio file
# try:
# response = requests.get(url)
# response.raise_for_status()
# file_name = self._format_file_name(url)
# desktop_path = os.path.join(os.path.expanduser('~'), '桌面', file_name)
# with open(desktop_path, 'wb') as file:
# file.write(response.content)
# except Exception as e:
# print(f"Error downloading file: {e}")
# return
# # Play the audio file
# try:
# subprocess.run(['xdg-open', desktop_path], check=True)
# except subprocess.CalledProcessError as e:
# print(f"Error playing file: {e}")
# def _format_file_name(self, url):
# """
# Formats the file name from the URL to prevent garbled characters.
# Args:
# url (str): The URL of the file.
# Returns:
# str: Formatted file name.
# """
# return url.split('/')[-1].replace(' ', '_')
import subprocess
import os
from jarvis.action.base_action import BaseAction
class download_and_play_audio(BaseAction):
def __init__(self):
self._description = "Download audio from a given link and play it on the system."
def __call__(self, link):
"""
Download audio from a given link and play it on the system.
Args:
link (str): The URL of the audio file to download.
Returns:
str: A message indicating whether the operation was successful or not.
"""
try:
# Define the desktop folder name based on the system language
desktop_folder = "桌面" # Chinese system language
# Expand the '~' character to the user's home directory
desktop_path = os.path.expanduser(f"~/{desktop_folder}/audio_file.mp3")
# Use the subprocess library to download the audio file to the desktop
subprocess.run(["wget", link, "-O", desktop_path])
# Use a media player to play the downloaded audio file
subprocess.run(["xdg-open", desktop_path])
return "Audio downloaded and playing."
except Exception as e:
return f"Error: {str(e)}"
# Example usage of the class
if __name__ == "__main__":
downloader = download_and_play_audio()
result = downloader("https://dasex101-random-learning.oss-cn-shanghai.aliyuncs.com/DataEthics/Taylor%20Swift%20-%20Look%20What%20You%20Made%20Me%20Do.mp3")
print(result)

View File

@@ -0,0 +1,35 @@
from jarvis.action.base_action import BaseAction
import subprocess
class download_and_play_music(BaseAction):
def __init__(self):
self._description = "Download audio from the given link and play it in the system"
def __call__(self, link):
"""
Download audio from the given link and play it in the system.
Args:
link (str): The URL of the audio file to be downloaded.
Returns:
None
"""
try:
# Download the audio file to the desktop
subprocess.run(["wget", link, "-P", "~/Desktop"], check=True)
# Get the file name from the link
file_name = link.split("/")[-1]
# Play the audio file
subprocess.run(["xdg-open", f"~/Desktop/{file_name}"], check=True)
except Exception as e:
# Re-throw the caught exception
raise Exception("An error occurred while processing the audio file.") from e
# Example usage
# task = DownloadAndPlayAudio()
# task("https://dasex101-random-learning.oss-cn-shanghai.aliyuncs.com/DataEthics/Taylor%20Swift%20-%20Look%20What%20You%20Made%20Me%20Do.mp3")

View File

@@ -0,0 +1,42 @@
from jarvis.action.base_action import BaseAction
import subprocess
import sys
class install_package(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "install environment missing package."
self.action_type = 'BASH'
def __call__(self, package: str, *args, **kwargs):
"""
Install a Python package using pip.
Args:
package_name (str): Name of the package to install.
"""
try:
subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
print(f"Package '{package}' installed successfully.")
except subprocess.CalledProcessError as e:
print(f"Failed to install package '{package}'. Error: {e}")
# try:
# # 检查pip是否已安装
# print("check pip ...")
# subprocess.check_call([sys.executable, '-m', 'pip', '--version'])
# except subprocess.CalledProcessError:
# # 安装pip
# print("Installing pip...")
# subprocess.check_call([sys.executable, '-m', 'ensurepip'])
# # 安装指定的包
# try:
# print(f"Installing {package}...")
# subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
# print(f"{package} installed successfully.")
# except subprocess.CalledProcessError:
# print(f"Failed to install {package}.")

View File

@@ -1,5 +1,6 @@
from jarvis.action.base_action import BaseAction
from jarvis.atom_action.operations.media import view_office_document
from jarvis.atom_action.operations.media import view_office_document, play_video, play_audio, view_txt
class open_document(BaseAction):
def __init__(self) -> None:
@@ -7,7 +8,18 @@ class open_document(BaseAction):
self._description = "open the target document in your offered file path."
self.action_type = 'BASH'
def __call__(self, path: str, *args, **kwargs):
view_office_document(path)
def __call__(self, path: str, type: str, *args, **kwargs):
# 如果是offcie文件
if type == 'doc' or type == 'docx' or type == 'ppt' or type == 'pptx' or type == 'xls' or type == 'xlsx' or type == 'pdf':
view_office_document(path)
# 如果是视频文件
elif type == 'avi' or type == 'mp4' or type == 'mkv':
play_video(path)
# 如果是音频文件
elif type == 'mp3' or type == 'wav':
play_audio(path)
# 如果是文本文件或者其它文件
else:
view_txt(path)

View File

@@ -0,0 +1,11 @@
from jarvis.action.base_action import BaseAction
class open_picture(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using open_picture() will open the picture you want."
self.action_type = 'BASH'
def __call__(self, *args, **kwargs):
return 'shortcuts run "open_picture"'

View File

@@ -5,6 +5,8 @@ from jarvis.action.base_action import BaseAction
import importlib
from typing import Optional
import inspect
class BaseAgent:
"""
BaseAgent is the base class of all agents.
@@ -29,17 +31,46 @@ class BaseAgent:
if file.endswith('.py') and "__init__" not in file:
class_name = file[:-3].split('/')[-1] # 去除.py后缀获取类名
module = importlib.import_module(class_name)
<<<<<<< HEAD
source_code = inspect.getsource(module) # wzm修改通过自省获得文件源码
tmp_obj = getattr(module, class_name)() #存储对象方式
# 存储源码字符串
self.action_lib.update({class_name: source_code})# wzm修改技能库存储文件源码而不是对象
# self.action_lib.update({class_name: tmp_obj})
=======
# get origin code
source_code = inspect.getsource(module)
# get class object
tmp_obj = getattr(module, class_name)()
# save origin code
self.action_lib.update({class_name: source_code})
# save code description
>>>>>>> 4a892f6411471c671bbaf605ba10fc8b42db61f4
self.action_lib_description.update({class_name: tmp_obj.description})
# get class source code
# def get_class_source_code(self, module, class_name):
# # 获取类对象
# tmp_obj = getattr(module, class_name)
# print(type(tmp_obj.description))
# self.action_lib_description.update({class_name: tmp_obj.description})
# # 获取类定义的源文件和行号
# source_file = inspect.getsourcefile(tmp_obj)
# source_lines, start_line = inspect.getsourcelines(tmp_obj)
# # 读取源文件
# with open(source_file, 'r') as file:
# lines = file.readlines()
# # 提取类的源代码
# class_code = ''.join(lines[start_line - 1: start_line - 1 + len(source_lines)])
# return class_code
if __name__ == '__main__':
a = BaseAgent()
a.init_action_lib()
agent = BaseAgent()
agent.init_action_lib()
from jarvis.enviroment.py_env import PythonEnv
myEnv = PythonEnv()
# toolName = "execute_sql"
@@ -50,7 +81,7 @@ if __name__ == '__main__':
# print(res)
# print(a.action_lib)
# print(a.action_lib_description)
for k,v in a.action_lib.items():
for k,v in agent.action_lib.items():
toolName = k
toolCode = v
args = None

View File

@@ -0,0 +1,34 @@
from jarvis.agent.linux_skill_creator import LinuxSkillCreator
from jarvis.agent.linux_skill_amend import LinuxSkillAmend
from jarvis.agent.openai_agent import OpenAIAgent
from jarvis.enviroment.py_env import PythonEnv
environment = PythonEnv()
agent = OpenAIAgent(config_path="../../examples/config.json")
skill_amend = LinuxSkillAmend(config_path="../../examples/config.json")
response = '''
Thought: To download a file from internet , we can focus on xxx.
Actions:
1. <action>download_and_play_music</action>
Check local action_lib, the required action code is in the library, according to the function description in the code, combined with the information provided by the user, You can instantiate classes for different tasks.
invoke:
1. <invoke>download_and_play_music()("https://dasex101-random-learning.oss-cn-shanghai.aliyuncs.com/DataEthics/Taylor%20Swift%20-%20Look%20What%20You%20Made%20Me%20Do.mp3")</invoke>
'''
action = agent.extract_action(response, begin_str='<action>', end_str='</action>')
invoke = agent.extract_invoke(response, begin_str='<invoke>', end_str='</invoke>')
task = "Download audio from a given link and play it on the system."
for (i, a) in enumerate(action):
command = agent.action_lib[a] + '\n' + invoke[i]
res = environment.step(command)
if res.error != None:
new_code = skill_amend.amend_code(command, task, res.error)
path = "new_code.txt"
with open(path, 'w', encoding='utf-8') as file:
file.write(new_code)

View File

@@ -0,0 +1,51 @@
from jarvis.action.get_os_version import get_os_version, check_os_version
from jarvis.core.llms import OpenAI
_LINUX_SYSTEM_AMEND_PROMPT = '''
You are an AI expert in Python programming, with a focus on diagnosing and resolving code issues.
Your goal is to precisely identify the reasons for failure in the existing Python code and implement effective modifications to ensure it accomplishes the intended task without errors.
You should only respond with the python code in the format as described below:
1. Initial Code Review: Begin by examining the provided Python code. Gain an understanding of its purpose, structure, and the logic it employs to accomplish the task.
2. Error Analysis: Conduct a step-by-step analysis to identify why the code is generating errors or failing to complete the task. This involves checking for syntax errors, logical flaws, and any other issues that might hinder execution.
3. Detailed Explanation: Offer a clear and comprehensive explanation for each identified issue, detailing why these problems are occurring and how they are impacting the code's functionality.
4. Solution Development: Based on the error analysis, suggest specific changes to rectify the issues. This could include syntax corrections, logic adjustments, or structural optimizations to improve performance.
5. Code Testing and Validation: Describe how to test the modified code to ensure that it now meets the task requirements and is free from the previously identified issues.
6. Programming Best Practices: Provide additional tips or best practices in Python programming that are relevant to the issues encountered, helping to prevent similar problems in future coding endeavors.
And the code you write should also follow the following criteria:
1. The analysis and explanations must be clear, detailed, and easy to understand, even for those with less programming experience.
2. All modifications must address the specific issues identified in the error analysis.
3. Ensure the final code is syntactically correct, optimized for performance, and follows Python best practices.
4. The solution must enable the code to successfully complete the intended task without errors.
Now you will be provided with the following two information:
Original Code: {original_code}
Task: {task}
Error Messages: {error}
Please modify the original python code to fix all the problems.
'''
class LinuxSkillAmend():
def __init__(self, config_path=None) -> None:
super().__init__()
self.llm = OpenAI(config_path)
self.system_version = get_os_version()
try:
check_os_version(self.system_version)
except ValueError as e:
print(e)
# amend the code to fullfill the task.
def amend_code(self, original_code, task, error):
self.prompt = _LINUX_SYSTEM_AMEND_PROMPT.format(
original_code = original_code,
task = task,
error = error
)
self.message = [
{"role": "system", "content": self.prompt},
{"role": "user", "content": task},
]
return self.llm.chat(self.message)

View File

@@ -0,0 +1,66 @@
from jarvis.action.get_os_version import get_os_version, check_os_version
from jarvis.core.llms import OpenAI
_LINUX_SYSTEM_PROMPT = '''
You are helpful assistant to assist in writing Python tool code for tasks completed on Linux operating systems. Your expertise lies in creating Python classes that perform specific tasks, adhering to a predefined format and structure.
Your goal is to generate Python tool code in the form of a class. The code should be structured to perform a user-specified task on a Linux operating system. The class must be easy to use and understand, with clear instructions and comments.
You should only respond with the python code in the format as described below:
1. Code Structure: Begin with the necessary import statement: from jarvis.action.base_action import BaseAction. Then, define the class using the task name provided by the user, converting it into a valid Python class name by replacing spaces with underscores.
2. Parameter Handling: In the __init__ method, only initialize self._description with a brief description of the class's purpose, detailing what task it accomplishes.
3. Subprocess Integration: If the task involves Linux bash operations, instruct the use of the subprocess library, particularly the run method, to execute these operations. This approach should be encapsulated within the __call__ method of the class.
4. Detailed Comments: Provide comprehensive comments throughout the code. This includes describing the purpose of the class, the usage of each method, and the function of parameters, especially in the __call__ method. End with an example of how to use the class, like open_document()(parameters).
And the code you write should also follow the following criteria:
1.The class must start with from jarvis.action.base_action import BaseAction.
2.The class name should be formatted based on the user's task name, with underscores separating different words. Please make the first letter of each word lowercase.
3.In the __init__ method, only self._description should be initialized.
4.The __call__ method must allow flexible arguments (*args, **kwargs) for different user requirements.
5.For tasks involving Linux bash commands, use the subprocess library to execute these commands within the Python class.
6.The code should include detailed comments explaining the purpose of the class, the role of each parameter, and a clear example of how to use the class.
7. If downloading a file is involved, the file name must follow the underscore (_) format to prevent garbled characters in the name.
Now you will be provided with the following two information:
System Version: {system_version}
Task: {task}
System language: simplified chinese
Please write python code to accomplish the task and be compatible with system environments, versions and language. The path names of the system may be inconsistent in different system languages.
'''
class LinuxSkillCreator():
"""
LinuxSkillCreator is used to generate new skills in Linux environment and store them in the action_lib.
"""
def __init__(self, config_path=None) -> None:
super().__init__()
self.llm = OpenAI(config_path)
self.system_version = get_os_version()
try:
check_os_version(self.system_version)
except ValueError as e:
print(e)
# self.mac_systom_prompts =
def format_message(self, task):
self.prompt = _LINUX_SYSTEM_PROMPT.format(
system_version=self.system_version,
task=task
)
self.message = [
{"role": "system", "content": self.prompt},
{"role": "user", "content": task},
]
return self.llm.chat(self.message)
# skillCreator = LinuxSkillCreator(config_path="../../examples/config.json")
# task = "Please download the audio I have given link from the Internet to the desktop of the system and play it in the system."
# python_code = skillCreator.format_message(task)
# if '```python' in python_code:
# python_code = python_code.split('```python')[1].split('```')[0]
# elif '```' in python_code:
# python_code = python_code.split('```')[1].split('```')[0]
# file_name = "my_python_script.py"
# # 打开文件并写入代码字符串
# with open(file_name, "w") as file:
# file.write(python_code)
# print(f"The Python code has been saved to {file_name}")

View File

@@ -0,0 +1,30 @@
from jarvis.action.base_action import BaseAction
import subprocess
class DownloadAndPlayAudio(BaseAction):
def __init__(self):
self._description = "Download audio from the given link and play it in the system"
def __call__(self, link):
"""
Download audio from the given link and play it in the system.
Args:
link (str): The URL of the audio file to be downloaded.
Returns:
None
"""
# Download the audio file to the desktop
subprocess.run(["wget", link, "-P", "~/Desktop"])
# Get the file name from the link
file_name = link.split("/")[-1]
# Play the audio file
subprocess.run(["xdg-open", f"~/Desktop/{file_name}"])
# Example usage
task = DownloadAndPlayAudio()
task("https://example.com/audio.mp3")

62
jarvis/agent/new_code.txt Normal file
View File

@@ -0,0 +1,62 @@
The provided code is attempting to download an audio file from a given link and play it on the system. However, there are a few issues with the code that need to be addressed.
1. The import statement is incorrect. The code is trying to import the `BaseAction` class from the `jarvis.action.base_action` module. However, the correct import statement should be `from jarvis.actions.base_action import BaseAction`.
2. The class name `download_and_play_music` should follow the Python naming convention of using CamelCase. It should be changed to `DownloadAndPlayMusic`.
3. The `__call__` method is missing the `self` parameter. It should be defined as `def __call__(self, link):`.
4. The `subprocess.run` function is being used to execute the `wget` and `xdg-open` commands. However, the `~` character in the file paths is not being expanded correctly. To fix this, we can use the `os.path.expanduser` function to expand the `~` character to the user's home directory.
5. The `subprocess.run` function is raising a `CalledProcessError` when the `xdg-open` command fails. However, the error message is not being propagated correctly. To fix this, we can modify the exception handling to raise a new exception with a more informative error message.
Here's the modified code with the necessary changes:
```python
import subprocess
import os.path
from jarvis.actions.base_action import BaseAction
class DownloadAndPlayMusic(BaseAction):
def __init__(self):
self._description = "Download audio from the given link and play it in the system"
def __call__(self, link):
"""
Download audio from the given link and play it in the system.
Args:
link (str): The URL of the audio file to be downloaded.
Returns:
None
"""
try:
# Download the audio file to the desktop
subprocess.run(["wget", link, "-P", os.path.expanduser("~/Desktop")], check=True)
# Get the file name from the link
file_name = link.split("/")[-1]
# Play the audio file
subprocess.run(["xdg-open", os.path.expanduser(f"~/Desktop/{file_name}")], check=True)
except subprocess.CalledProcessError as e:
# Re-throw the caught exception with a more informative error message
raise Exception("An error occurred while processing the audio file. Make sure the file exists and can be played.") from e
# Example usage
task = DownloadAndPlayMusic()
task("https://dasex101-random-learning.oss-cn-shanghai.aliyuncs.com/DataEthics/Taylor%20Swift%20-%20Look%20What%20You%20Made%20Me%20Do.mp3")
```
To test the modified code:
1. Make sure you have the necessary permissions to download and play files on your system.
2. Replace the example URL with the desired audio file URL.
3. Run the code and check if the audio file is downloaded and played successfully.
Programming Best Practices:
1. Use meaningful and descriptive variable and function names to improve code readability.
2. Handle exceptions gracefully and provide informative error messages to aid in debugging.
3. Use the `os.path` module to handle file paths in a platform-independent manner.
4. Follow Python naming conventions, such as using CamelCase for class names.

View File

@@ -75,6 +75,42 @@ class OpenAIAgent(BaseAgent):
_begin = message.find(begin_str)
_end = message.find(end_str)
return result
def extract_invoke(self, message, begin_str='[BEGIN]', end_str='[END]'):
result = []
_begin = message.find(begin_str)
_end = message.find(end_str)
while not (_begin == -1 or _end == -1):
result.append(message[_begin + len(begin_str):_end].strip())
message = message[_end + len(end_str):]
_begin = message.find(begin_str)
_end = message.find(end_str)
return result
# # @dzc
# def extract_parameter(self, message, begin_str='[BEGIN]', end_str='[END]'):
# result = []
# _begin_parameter = message.find(begin_str)
# _end_parameter = message.find(end_str)
# # go through parameters
# while not (_begin_parameter == -1 or _end_parameter == -1):
# # get current task parameters
# parameter = message[_begin_parameter + len(begin_str):_end_parameter].strip()
# _begin_arg = parameter.find("<arg>")
# _end_arg = parameter.find("</arg>")
# args = []
# # go through args
# while not (_begin_arg == -1 or _end_arg == -1):
# arg = parameter[_begin_arg + len("<arg>"): _end_arg].strip()
# args.append(arg)
# parameter = parameter[_end_arg + len("</arg>"):].strip()
# _begin_arg = parameter.find("<arg>")
# _end_arg = parameter.find("</arg>")
# result.append(args)
# message = message[_end_parameter + len(end_str):]
# _begin_parameter = message.find(begin_str)
# _end_parameter = message.find(end_str)
# return result
def chat(self, goal: str):
self._history = []

View File

@@ -11,3 +11,7 @@ def open_editor(file_path: str) -> None:
def open_vscode(path: str) -> None:
return code(path)

View File

@@ -1,17 +1,19 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
from jarvis.atom_action.src import *
def view_document(file_path) -> None:
return evince(file_path)
# add by wzm
def view_office_document(file_path) -> None:
return libreoffice(file_path)
def view_txt(file_path) -> None:
return gedit(file_path)
def play_audio(file_path) -> None:
return rhythmbox_client(f"--play-uri=\"{file_path}\"")
def play_video(file_path) -> None:
return totem(file_path)
def view_office_document(file_path) -> None:
return libreoffice(file_path)

View File

@@ -7,6 +7,9 @@ from ..query import screen
def sudo_install(package: str) -> None:
return Pkexec_apt("install", package)
def pip_install(package: str) -> None:
return pip("install", package)
def adjust_theme(theme: str) -> None:
return gsettings("set", "org.gnome.desktop.interface", "gtk-theme", theme)

View File

@@ -20,6 +20,7 @@ Pkexec_gedit = Pkexec_GUI("gedit")
# system & setting
apt = Bash("apt")
Pkexec_apt = Pkexec("apt")
pip = Bash("pip")
gsettings = Bash("gsettings")
xrandr = Bash("xrandr")
terminal = Bash("gnome-terminal") # add by wzm
@@ -30,7 +31,7 @@ code = Bash("code")
# application
evince = Bash("evince")
libreoffice = Bash("libreoffice") # add by wzm
gedit = Bash("gedit")
rhythmbox_client = Bash("rhythmbox-client")
totem = Bash("totem")
libreoffice = Bash("libreoffice")

View File

@@ -3,6 +3,11 @@ import openai
import time
import json
proxy = {
'http': '127.0.0.1:2081',
'https': '127.0.0.1:2081',
}
class OpenAI:
"""
@@ -14,6 +19,7 @@ class OpenAI:
self.model_name = config['model_name']
openai.api_key = config['OPENAI_API_KEY']
openai.organization = config['OPENAI_ORGANIZATION']
openai.proxy = proxy
def chat(self, messages, temperature=0, sleep_time=2):
response = openai.chat.completions.create(

View File

@@ -18,7 +18,7 @@ class Env:
def __init__(self) -> None:
self._name: str = self.__class__.__name__
self.timeout: int = 2
self.timeout: int = 30
self.working_dir = os.path.abspath(os.path.join(__file__, "..", "..", "..", "working_dir"))
if not os.path.exists(self.working_dir):
os.makedirs(self.working_dir)
@@ -44,4 +44,4 @@ class Env:
if __name__ == '__main__':
env = Env()
env.env_state = EnvState()
result = env.observe()
# result = env.observe()

View File

@@ -21,8 +21,12 @@ class PythonEnv(Env):
super().__init__()
self._name: str = self.__class__.__name__
<<<<<<< HEAD
def step(self, _command: str, args: Union[list[str], str] = []) -> EnvState:
=======
def step(self, _command: str, args: list[str] | str = []) -> EnvState:
>>>>>>> 4a892f6411471c671bbaf605ba10fc8b42db61f4
tmp_code_file = NamedTemporaryFile("w", dir=self.working_dir, suffix=".py", encoding="utf-8")
# wzm修改解决拿不到最后一行输出的当前工作目录问题
_command = _command.strip() + "\n" + "import os" + "\n" + "print(os.getcwd())"
@@ -37,7 +41,8 @@ class PythonEnv(Env):
["python", '-B', str(filename)],
encoding="utf8",
check=True, cwd=self.working_dir, timeout=self.timeout,
stdout=subprocess.PIPE
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# wzm修改如果有标准化输出
if results.stdout:

View File

@@ -0,0 +1,31 @@
{
"model_name": "gpt-3.5-turbo-16k-0613",
"OPENAI_API_KEY": "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",
"OPENAI_ORGANIZATION": "",
"config": {
"API_KEY": "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",
"MAX_CHAT_HISTORY": "10",
"User_Names": "[\"User\"]",
"TOP_K": "3"
},
"LLM_type": "OpenAI",
"LLM": {
"temperature": 0.0,
"model": "gpt-3.5-turbo-16k-0613",
"log_path": "logs/god"
},
"agents": {
"Yang bufan": {
"style": "indifferent and idle",
"roles": {
"Response_state": "Yang bufan"
}
},
"User": {
"style": "soso",
"roles": {
"Response_state": "User"
}
}
}
}

21
jarvis/index.py Normal file
View File

@@ -0,0 +1,21 @@
import time
from jarvis.enviroment.base_env import BaseEnviroment
from jarvis.agent.openai_agent import OpenAIAgent
if __name__ == '__main__':
environment = BaseEnviroment()
agent = OpenAIAgent(
config_path="examples/config.json",
environment=environment
)
action = [
'turn_on_dark_mode',
'turn_on_light_mode'
]
# environment.init_env()
for a in action:
print(a)
command = agent.action_lib[a]
print(agent.env.step(command))
time.sleep(2)

View File

View File

View File

@@ -0,0 +1,97 @@
import subprocess
import time
import os
from typing import Optional
from jarvis.core.schema import ActionReturn, ActionStatusCode
class BaseAction:
"""Base class for all actions.
Args:
description (str, optional): The description of the action. Defaults to
None.
name (str, optional): The name of the action. If None, the name will
be class name. Defaults to None.
"""
def __init__(self,
description: Optional[str] = None,
name: Optional[str] = None,
timeout: Optional[int] = 2) -> None:
if name is None:
name = self.__class__.__name__
self._name = name
self._description = description
self._timeout = timeout
# self.execute_log_path = execute_log_path
# self.state_log_path = state_log_path
# self.log_execute = logging_command + self.execute_log_path
# self.log_state = logging_command + self.state_log_path
# self.working_dir = os.path.abspath(os.path.join(os.getcwd(), "..", "..", "working_dir"))
# self.server_name = "test"
# print(self.working_dir)
def _python(self, *lines):
return f'python -Bc "{"; ".join(lines)}"'
def _import(self, *packages):
return f'from jarvis.{".".join(packages)} import *'
@property
def _command(self):
raise NotImplementedError
@property
def timeout(self):
return self._timeout
# def _success(self):
# raise NotImplementedError
# def execute_command(self, command):
# subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
# async def parse_result(self):
# def __call__(self, *args, **kwargs) -> ActionReturn:
# def run(self, *args, **kwargs) -> ActionReturn:
# # 将run搬到env中
# command = self._command() + self.log_execute
# subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
# time.sleep(self.timeout)
# with open()
# action_return = ActionReturn(type=self.name, args=command)
# try:
# result = subprocess.run([command], capture_output=True, check=True,
# text=True, shell=True, timeout=self.timeout, stdin=subprocess.DEVNULL)
# result = subprocess.run(["tmux", "send-keys", "-t", 'test', command, "Enter"])
# if result.returncode == 0:
# action_return.state = ActionStatusCode.SUCCESS
# action_return.thought = self._success()
# if result.stdout:
# action_return.result = result.stdout
# if result.stderr:
# action_return.result = result.stderr
# except subprocess.CalledProcessError as e:
# action_return.state = ActionStatusCode.FAILED
# action_return.errmsg = e.stderr
# action_return.result = e.stdout
# return action_return
@property
def name(self):
return self._name
@property
def description(self):
return self._description
def __repr__(self):
return f'{self.name}:{self.description}'
def __str__(self):
return self.__repr__()
if __name__ == '__main__':
action = BaseAction()

View File

@@ -0,0 +1,39 @@
from jarvis.action.turn_on_dark_mode import turn_on_dark_mode
from jarvis.action.organize_app_layout import organize_app_layout
import subprocess
# import os
#
# functions = {"turn_on_dark_mode()": turn_on_dark_mode(), "turn_on_dark_mode()":organize_app_layout()}
# functions["turn_on_dark_mode()"].run()
organize_app_layout().run()
# command = 'shortcuts run "Dark Mode"'
# # command = 'du -d 1 -h'
# # subprocess.run([command], text=True, shell=True)
# # os.system(command)
# os.popen(command)
# try:
# # 执行命令
# cmd_output = os.popen(command).read()
# except Exception as e:
# # 捕获异常并处理
# print("An error occurred:\n", str(e))
# else:
# # 没有发生异常时执行的代码
# print("Command output:\n", cmd_output)
# import subprocess
#
# try:
# # 执行命令
# result = subprocess.run(['shortcuts', 'run', 'Dark Mode'], capture_output=True, text=True, stdin=subprocess.DEVNULL)
# result.check_returncode() # 检查命令的返回码
# except subprocess.CalledProcessError as e:
# # 命令执行失败时的异常处理
# print("Command failed with return code:", e.returncode)
# print("Command output:", e.stdout)
# except Exception as e:
# # 其他异常处理
# print("An error occurred:", str(e))
# else:
# # 没有发生异常时执行的代码
# print("Command output:", result.stdout)

View File

View File

@@ -0,0 +1,15 @@
from jarvis.action.base_action import BaseAction
class organize_app_layout(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using organize_app_layout() will help user reorganize their Desktop layout for better working condition and focus more easily."
self._timeout = 15
@property
def _command(self) -> str:
return 'shortcuts run "Organize APP Layout"'
#
# def _success(self) -> str:
# return "Successfully organized the app's layout"

View File

@@ -0,0 +1,31 @@
from jarvis.action.base_action import BaseAction
class turn_on_dark_mode(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using turn_on_dark_mode() will change your system into the dark mode."
@property
def _command(self):
return self._python(
self._import("atom", "operations"),
"adjust_theme('Adwaita-dark')"
)
# def _success(self):
# return "Successfully turned the system into the Dark Mode"
# def __call__(self, *args, **kwargs):
#
# command = 'shortcuts run "Dark Mode"'
# try:
# # result = subprocess.run([command, "Dark Mode"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# result = subprocess.run([command], capture_output=True, check=True,
# text=True, shell=True, timeout=self.timeout, stdin=subprocess.DEVNULL)
# if result.returncode == 0:
# return result
# except subprocess.CalledProcessError as e:
# return e
# except subprocess.TimeoutExpired:
# raise TimeoutError(f"Command '{command}' timed out after {self.timeout} seconds.")

View File

@@ -0,0 +1,32 @@
from jarvis.action.base_action import BaseAction
class turn_on_light_mode(BaseAction):
def __init__(self) -> None:
super().__init__()
self._description = "Using turn_on_light_mode() will change your system into the light mode."
@property
def _command(self):
return self._python(
self._import("atom", "operations"),
"adjust_theme('Adwaita')"
)
# def _success(self):
# return "Successfully turned the system into the Light Mode"
# def __call__(self, *args, **kwargs):
#
# command = 'shortcuts run "Dark Mode"'
# try:
# # result = subprocess.run([command, "Dark Mode"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# result = subprocess.run([command], capture_output=True, check=True,
# text=True, shell=True, timeout=self.timeout, stdin=subprocess.DEVNULL)
# if result.returncode == 0:
# return result
# except subprocess.CalledProcessError as e:
# return e
# except subprocess.TimeoutExpired:
# raise TimeoutError(f"Command '{command}' timed out after {self.timeout} seconds.")

View File

View File

@@ -0,0 +1,42 @@
import os
import sys
import glob
from jarvis.action.base_action import BaseAction
import importlib
from typing import Optional
class BaseAgent:
"""
BaseAgent is the base class of all agents.
"""
def __init__(self, config_path=None):
self.llm = None
self.environment = None
self.action_lib = {}
self.action_lib_description = {}
self.action = None
self.init_action_lib()
#
# def from_config(self, config_path=None):
# raise NotImplementedError
def init_action_lib(self, path=None, attribute_name='_description'):
if not path:
path = os.path.abspath(os.path.join(__file__, "..", "..", "action_lib"))
sys.path.append(path)
files = glob.glob(path + "/*.py")
for file in files:
if file.endswith('.py') and "__init__" not in file:
class_name = file[:-3].split('/')[-1] # 去除.py后缀获取类名
module = importlib.import_module(class_name)
tmp_obj = getattr(module, class_name)()
self.action_lib.update({class_name: tmp_obj._command})
self.action_lib_description.update({class_name: tmp_obj.description})
if __name__ == '__main__':
a = BaseAgent()
a.init_action_lib()
for k,v in a.action_lib.items():
print(k)
print(v)

View File

@@ -0,0 +1,119 @@
import json
from jarvis.core.llms import OpenAI
from jarvis.agent.base_agent import BaseAgent
from jarvis.enviroment.base_env import BaseEnviroment
from jarvis.core.schema import EnvState
a = "{action_input} the input to the action, could be any valid input for python programs or shell commands, such numbers, strings, or path to a file, etc."
BASE_PROMPT = """
{system_prompt}
{tool_description}
To use a tool, please use the following format:
```
{thought} to address the user request, thinking about what are the sub-goals you need to achieve and which tool is needed for each sub-goal?
{action} the tool names, each action name should be one of [{action_names}].
```
The response after utilizing tools should using the following format:
```
{response} To generate a response, you need to summarize your thoughts above and combined them with the tool execution results.
``
If you already know the answer, or you do not need to use tools,
please using the following format to reply:
```
{thought} the thought process to answer user questions
{response} respond to user request based on thought
```
Remember you must surround you action between <action> and </action>.
Now you are ready to take questions and requests from users.
"""
class OpenAIAgent(BaseAgent):
"""
BaseAgent is the base class of all agents.
"""
def __init__(self, config_path=None, environment: BaseEnviroment = None):
super().__init__()
self.llm = OpenAI(config_path)
self.env = environment
self.actions = None
self.max_iter = 10
self.system_prompt = """You are a personal assistant that aims to automate the workflow for human.\nYou are capable of understanding human intent and decompose it into several subgoals that can be addressed via language generation or acomplished using external tools.\nSome of the external tools you can use and their functionalities are as follows:
"""
self.action_names = self.action_lib.keys()
# todo: 添加工具检索模块
self.available_action_description = ""
for i, name in enumerate(self.action_names):
self.available_action_description += "Tool {}: <action>{}</action>\n{}\n".format(i+1, name, self.action_lib_description[name])
def from_config(self, config_path=None):
self.llm = OpenAI(config_path)
def format_message(self, query):
self.prompt = BASE_PROMPT.format(
system_prompt=self.system_prompt,
tool_description=self.available_action_description,
action_names=self.action_names,
thought="Thought:",
action="Actions:",
action_input="Action Input:",
response="Response:"
)
self.message = [
{"role": "system", "content": self.prompt},
{"role": "user", "content": query},
]
return self.llm.chat(self.message)
def extract_action(self, message, begin_str='[BEGIN]', end_str='[END]'):
result = []
_begin = message.find(begin_str)
_end = message.find(end_str)
while not (_begin == -1 or _end == -1):
result.append(message[_begin + len(begin_str):_end].strip())
message = message[_end + len(end_str):]
_begin = message.find(begin_str)
_end = message.find(end_str)
return result
def chat(self, goal: str):
self._history = []
def step(self, single_action) -> EnvState:
_command = self.action_lib[single_action]
self.environment.step(_command)
return self.environment.observe()
if __name__ == '__main__':
actions = {
"turn_on_dark_mode()": "Using turn_on_dark_mode() will change your system into the dark mode.",
"play_study_music()": "Using play_study_music() will open Music in your Mac and play songs that are sutiable for study and work.",
"create_meeting()": "Using create_meeting() will help user create a meeting event. When users request to create a meeting, don't ask questions such as meeting title and time, just invoke this tool by generating the action name.",
"show_upcoming_meetings()": "Using show_upcoming_meetings() will open Calendar and show the their upcoming meetings for the user.",
"organize_app_layout()": "Using organize_app_layout() will help user reorganize their Desktop layout for better working condition and focus more easily."
}
environment = BaseEnviroment()
agent = OpenAIAgent(config_path="../../examples/config.json", environment=environment)
# print(agent.action_lib)
# print(agent.action_lib_description)
# executation_action = agent.action_lib["turn_on_dark_mode"]()
# executation_action.run()
# response = agent.format_message(query="I want to start working now. Please help set up the working environment for me.")
# print(agent.prompt)
# print(response['content'])
response = '''
Thought: To set up the working environment, we can focus on two sub-goals: turning on dark mode and organizing the app layout.
Actions:
1. <action>turn_on_dark_mode</action>
2. <action>turn_on_light_mode</action>'''
action = agent.extract_action(response, begin_str='<action>', end_str='</action>')
import time
for a in action:
print(a)
command = agent.action_lib[a]
print(agent.env.step(command))
time.sleep(2)

View File

@@ -0,0 +1,6 @@
import sys
sys.dont_write_bytecode = True
from .operations import *
from .query import *
from .src import *

View File

@@ -0,0 +1,8 @@
import sys
sys.dont_write_bytecode = True
from .coding import *
from .files import *
from .media import *
from .routine import *
from .system import *

View File

@@ -0,0 +1,13 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def exec_python(file_path: str) -> None:
return python(file_path)
def open_editor(file_path: str) -> None:
return gedit(file_path)
def open_vscode(path: str) -> None:
return code(path)

View File

@@ -0,0 +1,35 @@
import sys
import os
sys.dont_write_bytecode = True
from ..src import *
def create_dir(dir_path: str, new_dir_name: str) -> None:
assert os.path.exists(dir_path)
return mkdir(os.path.join(dir_path, new_dir_name))
def create_file(file_path: str, new_file_name: str) -> None:
assert os.path.exists(file_path)
return touch(os.path.join(file_path, new_file_name))
def download_file(url: str, file_name: str) -> None:
return wget(url, "-O", file_name, "--no-check-certificate")
def copy(src_path: str, dst_path: str) -> None:
assert os.path.exists(src_path)
return cp(src_path, dst_path)
def move(src_path: str, dst_path: str) -> None:
assert os.path.exists(src_path)
return mv(src_path, dst_path)
def rename(file_path: str, new_file_name: str) -> None:
assert os.path.exists(file_path)
components = list(os.path.split(file_path))
components[-1] = new_file_name
dst = "/".join(components)
return mv(file_path, dst)
def delete(path: str) -> None:
assert os.path.exists(path)
return rm("-rf", path)

View File

@@ -0,0 +1,20 @@
import sys
sys.dont_write_bytecode = True
from jarvis.jarvis.atom.src import *
def view_document(file_path) -> None:
return evince(file_path)
def play_audio(file_path) -> None:
return rhythmbox_client(f"--play-uri=\"{file_path}\"")
def play_video(file_path) -> None:
return totem(file_path)
def root_view_document(file_path) -> None:
return Pkexec_evince(file_path)
path = "/home/heroding/桌面/test.txt"
view_document(path)
print(view_document(path))

View File

@@ -0,0 +1,6 @@
import sys
sys.dont_write_bytecode = True
def add_plan() -> None:
raise NotImplementedError

View File

@@ -0,0 +1,16 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
from ..query import screen
def sudo_install(package: str) -> None:
return Pkexec_apt("install", package)
def adjust_theme(theme: str) -> None:
return gsettings("set", "org.gnome.desktop.interface", "gtk-theme", theme)
def adjust_brightness(brightness: int) -> None:
assert brightness >= 0.5 and brightness <= 1
brightness = str(brightness)
return xrandr("--output", screen(), "--brightness", brightness)

View File

@@ -0,0 +1,6 @@
import sys
sys.dont_write_bytecode = True
from .device import *
from .files import *
from .package import *

View File

@@ -0,0 +1,10 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def screen() -> str:
return Promise() \
.then(xrandr)() \
.then(grep)(" connected") \
.then(cut)("-f1", "-d", " ")()

View File

@@ -0,0 +1,10 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def dir_list(dir_path: str) -> str:
return ls(dir_path)
def dir_tree(dir_path: str) -> str:
return tree(dir_path)

View File

@@ -0,0 +1,9 @@
import sys
sys.dont_write_bytecode = True
from ..src import *
def is_installed(package: str) -> str:
return Promise() \
.then(apt)("list") \
.then(grep)(f"{package}/")()

View File

@@ -0,0 +1,5 @@
import sys
sys.dont_write_bytecode = True
from .bash import *
from .commands import *

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
import sys
sys.dont_write_bytecode = True
from typing import Union, Optional
from subprocess import run, PIPE
class Output:
def __init__(self, stdout: str, stderr: str):
self.stdout = stdout
self.stderr = stderr
def observe(self):
if self.stdout == "" and self.stderr != "":
raise RuntimeError(self.stderr)
else:
return self.stdout
def __str__(self):
print([self.stdout, self.stderr])
return self.stdout + self.stderr
class Bash:
def __init__(self, cmd: str, stdin: Optional[str]=None):
self.cmd = cmd
self.stdin = stdin
def __call__(self, *arg) -> str:
result = run(
[self.cmd, *arg],
input=self.stdin,
capture_output=True,
text=True
)
return Output(result.stdout, result.stderr)
class Promise():
def __init__(self, stdin: Optional[str]=None):
self.stdin = stdin
self.stash = None
self.stdout = None
def then(self, bash: Union[str, Bash], *arg) -> Promise:
if type(bash) == str:
bash = Bash(bash)
self.stash = bash
self.stash.stdin = self.stdout if self.stdout != None else self.stdin
return self.__call__(*arg) if len(arg) > 0 else self
def observe(self) -> str:
return self.stdout.strip("\n")
def __call__(self, *arg) -> Promise:
if self.stash != None:
self.stdout = self.stash(*arg).observe()
self.stash = None
return self
else:
return self.observe()
Pkexec = \
lambda cmd: \
lambda *arg: \
Bash("pkexec")(cmd, *arg)
Pkexec_GUI = \
lambda cmd: \
lambda *arg: \
Bash("pkexec")(
"env",
"DISPLAY=$DISPLAY",
"XAUTHORITY=$XAUTHORITY",
cmd,
*arg
)

View File

@@ -0,0 +1,33 @@
from .bash import *
# file operation
mkdir = Bash("mkdir")
touch = Bash("touch")
cp = Bash("cp")
mv = Bash("mv")
rm = Bash("rm")
ls = Bash("ls")
tree = Bash("tree")
# tools
grep = Bash("grep")
cut = Bash("cut")
wget = Bash("wget")
gedit = Bash("gedit")
Pkexec_gedit = Pkexec_GUI("gedit")
# system & setting
apt = Bash("apt")
Pkexec_apt = Pkexec("apt")
Pkexec_evince = Pkexec("evince")
gsettings = Bash("gsettings")
xrandr = Bash("xrandr")
# development
python = Bash("python")
code = Bash("code")
# application
evince = Bash("evince")
rhythmbox_client = Bash("rhythmbox-client")
totem = Bash("totem")

View File

View File

@@ -0,0 +1,27 @@
import os
import openai
import time
import json
class OpenAI:
"""
A wrapper for OpenAI API
"""
def __init__(self, config_path=None):
with open(config_path) as f:
config = json.load(f)
self.model_name = config['model_name']
openai.api_key = config['OPENAI_API_KEY']
openai.organization = config['OPENAI_ORGANIZATION']
def chat(self, messages, temperature=0, sleep_time=2):
response = openai.ChatCompletion.create(
model=self.model_name,
messages=messages,
temperature=temperature
)
# time.sleep(sleep_time)
return response['choices'][0]['message']

View File

@@ -0,0 +1,21 @@
base_prompt = """{system_prompt}
{tool_description}
To use a tool, please use the following format:
```
{thought}to address the user request, thinking about what are the sub-goals you need to achieve and which tool is needed for each sub-goal?
{action}the tool name, should be one of [{action_names}].
{action_input}the input to the action, could be any valid input for python programs or shell commands, such numbers, strings, or path to a file, etc.
```
The response after utilizing tools should using the following format:
```
{response}To generate a response, you need to summarize your thoughts above and combined them with the tool execution results.
``
If you already know the answer, or you do not need to use tools,
please using the following format to reply:
```
{thought}the thought process to answer user questions
{response}respond to user request based on thought
```
Now you are ready to take questions, requests from users.
"""

View File

@@ -0,0 +1,36 @@
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Union
class ActionStatusCode(int, Enum):
ING = 0
SUCCESS = 1
FAILED = -1
class ActionValidCode(int, Enum):
FINISH = 1
OPEN = 0
CLOSED = -1
INVALID = -2
ABSENT = -3 # NO ACTION
@dataclass
class ActionReturn:
args: Dict
url: Optional[str] = None
type: Optional[str] = None
result: Optional[str] = None
errmsg: Optional[str] = None
state: Union[ActionStatusCode, int] = ActionStatusCode.SUCCESS
thought: Optional[str] = None
valid: Optional[ActionValidCode] = ActionValidCode.OPEN
@dataclass
class EnvState:
command: List[str] = field(default_factory=list)
result: Optional[str] = None
pwd: Optional[str] = None
ls: Optional[str] = None

View File

View File

@@ -0,0 +1,100 @@
import subprocess
import time
import os
from typing import Optional
from jarvis.core.schema import ActionReturn, ActionStatusCode, EnvState
import asyncio
class BaseEnviroment:
"""Base class for all actions.
Args:
description (str, optional): The description of the action. Defaults to
None.
name (str, optional): The name of the action. If None, the name will
be class name. Defaults to None.
"""
def __init__(self,
name: Optional[str] = None,
timeout: Optional[int] = 2,
execute_log_path: Optional[str] = "execute.log",
state_log_path: Optional[str] = "state.log",
logging_command: Optional[str] = " 2>&1 | tee ") -> None:
if name is None:
name = self.__class__.__name__
self._name = name
self.timeout = timeout
self.execute_log_path = execute_log_path
self.state_log_path = state_log_path
self.working_dir = os.path.abspath(os.path.join(__file__, "..", "..", "..", "working_dir"))
self.log_execute = logging_command + os.path.join(self.working_dir, self.execute_log_path)
self.log_state = logging_command + os.path.join(self.working_dir, self.state_log_path)
self.server_name = "test"
self.env_state = None
# print(self.working_dir)
def init_env(self):
# cur_dir = os.getcwd()
# os.chdir(self.working_dir)
# command = "tmux new-session -d -s " + self.server_name
# os.system(command)
# os.chdir(cur_dir)
# todo catch new duplicate session error
subprocess.run(["tmux", "new-session", "-d", "-s", self.server_name], cwd=self.working_dir)
output = subprocess.check_output(["tmux", "ls"]).decode("utf-8")
# output = subprocess.run(["tmux ls"], capture_output=True, shell=True, text=True)
# print(output)
# todo add raise error
if self.server_name not in output:
return False, "Error: init server failed"
else:
return True, "Init tmux server, done."
def execute_command(self, command):
subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
def step(self, _command) -> EnvState:
self.env_state = EnvState()
self.env_state.command.append(_command)
command = _command + self.log_execute
subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
time.sleep(self.timeout)
self.observe()
return self.env_state
def read_log(self, log_path):
os.chdir(self.working_dir)
with open(log_path) as f:
return f.readlines()
def observe(self) -> None:
for _command in ["pwd", "ls"]:
command = _command + self.log_state
subprocess.run(["tmux", "send-keys", "-t", self.server_name, command, "Enter"])
# todo 处理异步的问题,可能提交过去还没执行完。
time.sleep(self.timeout)
if _command == "pwd":
self.env_state.pwd = self.read_log(self.state_log_path)
else:
self.env_state.ls = self.read_log(self.state_log_path)
self.env_state.result = self.read_log(self.execute_log_path)
@property
def name(self):
return self._name
@property
def description(self):
return self._description
def __repr__(self):
return f'{self.name}:{self.description}'
def __str__(self):
return self.__repr__()
if __name__ == '__main__':
env = BaseEnviroment()
result = env.observe()