mirror of
https://github.com/OS-Copilot/OS-Copilot.git
synced 2026-05-05 03:00:15 -04:00
90 lines
3.5 KiB
Python
90 lines
3.5 KiB
Python
from jarvis.agent.openai_agent import OpenAIAgent
|
|
from jarvis.agent.jarvis_agent import JarvisAgent
|
|
|
|
'''
|
|
Made By DZC
|
|
'''
|
|
|
|
# Relative path handed to JarvisAgent as its action library directory.
action_lib_path = "../jarvis/action_lib"

# Build the top-level agent, then keep direct handles on the three components
# the rest of this script drives.
jarvis_agent = JarvisAgent(
    config_path="./config.json",
    action_lib_dir=action_lib_path,
)
planning_agent = jarvis_agent.planner
retrieve_agent = jarvis_agent.retriever
execute_agent = jarvis_agent.executor

# Hard-coded request; it is assumed to stand in for a response that comes from
# the task planning agent.
task = '''
zip all the files in the folder called myfolder to test, and unzip test.zip to the folder called agent.
'''

# Hand the request to the planner so it can decompose it.
planning_agent.decompose_task(task)
|
|
|
|
# retrieve existing tools (currently disabled)
# for action_name, action_node in planning_agent.action_node.items():
#     action_description = action_node.description
#     retrieve_code = retrieve_agent.search_action(action_description)
#     if retrieve_code:
#         code = retrieve_code[0]
#         planning_agent.update_action(action_name, code, None)
|
|
|
|
# Work through the planner's queue one action at a time. For each action:
#   1. use the code stored on its node, or have the executor generate some;
#   2. execute it and have the executor judge the outcome;
#   3. if it failed, amend and re-run it up to execute_agent.max_iter times;
#   4. on success record the result and drop the action from the queue,
#      on failure report it and stop.
while planning_agent.execute_list:
    action = planning_agent.execute_list[0]
    action_node = planning_agent.action_node[action]
    description = action_node.description
    code = action_node.code
    if not code:
        # Create python tool class code
        code = execute_agent.generate_action(action, description)

    # Execute python tool class code
    state = execute_agent.execute_action(code, description)

    # Check whether the code runs correctly, if not, amend the code
    need_mend = False
    trial_times = 0
    critique = ''
    score = 0
    if state.error is None:
        # No error was reported, so check whether the task is completed.
        critique, judge, score = execute_agent.judge_action(code, description, state)
        if not judge:
            print("critique: {}".format(critique))
            need_mend = True
    else:
        # Determine whether the error is caused by something outside the code.
        # (Named error_type rather than type to avoid shadowing the builtin.)
        reasoning, error_type = execute_agent.analysis_action(code, description, state)
        if error_type == 'replan':
            # Hand the problem back to the planner and start over from the
            # head of the (possibly changed) queue.
            planning_agent.replan_task(reasoning, action)
            continue
        need_mend = True

    # The code failed to complete its task: amend and re-run it until it is
    # judged successful or the retry budget is used up.
    current_code = code
    while trial_times < execute_agent.max_iter and need_mend:
        trial_times += 1
        print("current amend times: {}".format(trial_times))
        current_code = execute_agent.amend_action(current_code, description, state, critique)
        # The critique has been consumed; it is only refreshed by a new judgement.
        critique = ''

        # Run the current code and check for errors
        state = execute_agent.execute_action(current_code, description)
        if state.error is None:
            critique, judge, score = execute_agent.judge_action(current_code, description, state)
            if judge:
                # The task execution is completed and the repair loop exits.
                need_mend = False
                break
        # Otherwise need_mend is still True and the next attempt follows.

    if need_mend:
        # The task still cannot be completed. Stop here: the action is still
        # at the head of the queue, so carrying on would retry it forever.
        print("I can't Do this Task!!")
        break

    # The task is completed. Code whose judged score is at least 8 is stored
    # through the executor for later reuse.
    if score >= 8:
        execute_agent.store_action(action, current_code)
    print("Current task execution completed!!!")
    planning_agent.update_action(action, current_code, state.result, True)
    planning_agent.execute_list.remove(action)
|
|
|