Coverage for autogpt/app.py: 22%

98 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-22 05:45 +0000

1""" Command and Control """ 

2import json 

3from typing import Dict, List, NoReturn, Union 

4 

5from autogpt.agent.agent_manager import AgentManager 

6from autogpt.commands.command import CommandRegistry, command 

7from autogpt.commands.web_requests import scrape_links, scrape_text 

8from autogpt.config import Config 

9from autogpt.memory import get_memory 

10from autogpt.processing.text import summarize_text 

11from autogpt.prompts.generator import PromptGenerator 

12from autogpt.speech import say_text 

13 

# Module-level configuration object; read by the command functions in this
# module (e.g. for speak_mode and the default model of start_agent).
14CFG = Config() 

# Module-level agent manager; the agent commands in this module (start,
# message, list, delete) all go through this single instance.
15AGENT_MANAGER = AgentManager() 

16 

17 

def is_valid_int(value: str) -> bool:
    """Check if the value is a valid integer

    Args:
        value (str): The value to check

    Returns:
        bool: True if ``int(value)`` succeeds, False otherwise. Values that
            cannot be converted at all (``None``, lists, infinities) also
            yield False instead of raising.
    """
    try:
        int(value)
    except (ValueError, TypeError, OverflowError):
        # ValueError: malformed numeric string.
        # TypeError: unsupported type such as None or a list.
        # OverflowError: float infinity.
        return False
    return True

32 

33 

def get_command(response_json: Dict):
    """Parse the response and return the command name and arguments

    Args:
        response_json (json): The response from the AI

    Returns:
        tuple: The command name and arguments. On any failure the tuple is
            ``("Error:", <message>)`` instead; no exception is propagated
            to the caller.
    """
    try:
        # Validate the container type first: a membership test on a list or
        # string would otherwise report a misleading "Missing 'command'"
        # error, and on None it would raise a TypeError.
        if not isinstance(response_json, dict):
            return "Error:", f"'response_json' object is not dictionary {response_json}"

        if "command" not in response_json:
            return "Error:", "Missing 'command' object in JSON"

        # Named command_obj so the local does not shadow the imported
        # `command` decorator.
        command_obj = response_json["command"]
        if not isinstance(command_obj, dict):
            return "Error:", "'command' object is not a dictionary"

        if "name" not in command_obj:
            return "Error:", "Missing 'name' field in 'command' object"

        command_name = command_obj["name"]

        # Use an empty dictionary if 'args' field is not present in 'command' object
        arguments = command_obj.get("args", {})

        return command_name, arguments
    except json.decoder.JSONDecodeError:
        return "Error:", "Invalid JSON"
    # All other errors, return "Error: + error message"
    except Exception as e:
        return "Error:", str(e)

73 

74 

def map_command_synonyms(command_name: str):
    """Translate a known hallucinated command name into the real one.

    The name supplied by the AI is compared against a short table of
    common mistakes; a name that is not in the table is returned unchanged.
    """
    known_aliases = (
        ("write_file", "write_to_file"),
        ("create_file", "write_to_file"),
        ("search", "google"),
    )
    # First matching alias wins; fall back to the name as given.
    return next(
        (real_name for alias, real_name in known_aliases if command_name == alias),
        command_name,
    )

88 

89 

def execute_command(
    command_registry: CommandRegistry,
    command_name: str,
    arguments,
    prompt: PromptGenerator,
):
    """Run the named command and hand back whatever it produced

    Args:
        command_registry (CommandRegistry): Registry whose ``commands``
            mapping is consulted first
        command_name (str): The name of the command to execute
        arguments (dict): The arguments for the command
        prompt (PromptGenerator): Its ``commands`` entries are searched as a
            last resort, matching on "label" or "name"

    Returns:
        The result of the command, or an "Unknown command ..." /
        "Error: ..." message when the command is not found or raises
    """
    try:
        registered = command_registry.commands.get(command_name)

        # A registered command wins and receives the arguments as keywords
        if registered:
            return registered(**arguments)

        # TODO: Remove commands below after they are moved to the command registry.
        command_name = map_command_synonyms(command_name.lower())

        if command_name == "memory_add":
            return get_memory(CFG).add(arguments["string"])

        # TODO: Change these to take in a file rather than pasted code, if
        # non-file is given, return instructions "Input should be a python
        # filepath, write your code to file and try again
        if command_name == "do_nothing":
            return "No action performed."

        if command_name == "task_complete":
            # shutdown() is annotated NoReturn; the return only keeps this
            # branch from falling through to the lookup below.
            shutdown()
            return None

        # Last resort: commands attached to the prompt, called positionally
        for entry in prompt.commands:
            matches_label = command_name == entry["label"]
            if matches_label or command_name == entry["name"]:
                return entry["function"](*arguments.values())

        return (
            f"Unknown command '{command_name}'. Please refer to the 'COMMANDS'"
            " list for available commands and only respond in the specified JSON"
            " format."
        )
    except Exception as e:
        return f"Error: {str(e)}"

136 

137 

@command(
    "get_text_summary", "Get text summary", '"url": "<url>", "question": "<question>"'
)
def get_text_summary(url: str, question: str) -> str:
    """Scrape the text at a URL and summarize it with respect to a question

    Args:
        url (str): The url to scrape
        question (str): The question to summarize the text for

    Returns:
        str: The summary, prefixed with a "Result" label
    """
    page_text = scrape_text(url)
    return f""" "Result" : {summarize_text(url, page_text, question)}"""

154 

155 

# The description previously read "Get text summary", copied from the
# get_text_summary command; it now describes what this command does.
@command("get_hyperlinks", "Get hyperlinks", '"url": "<url>"')
def get_hyperlinks(url: str) -> Union[str, List[str]]:
    """Return the hyperlinks found at the given URL

    Args:
        url (str): The url to scrape

    Returns:
        str or list: The hyperlinks on the page, as produced by
            ``scrape_links``
    """
    return scrape_links(url)

167 

168 

def shutdown() -> NoReturn:
    """Shut down the program

    Raises:
        SystemExit: Always, to terminate the interpreter with a success
            status.
    """
    print("Shutting down...")
    # quit() is injected by the `site` module for interactive sessions; it is
    # missing under `python -S` or in frozen builds and closes stdin as a
    # side effect. Raising SystemExit directly exits with the same status.
    raise SystemExit

173 

174 

@command(
    "start_agent",
    "Start GPT Agent",
    '"name": "<name>", "task": "<short_task_desc>", "prompt": "<prompt>"',
)
def start_agent(name: str, task: str, prompt: str, model=CFG.fast_llm_model) -> str:
    """Create a new agent, give it its prompt, and report its first reply

    Args:
        name (str): The name of the agent
        task (str): The task of the agent
        prompt (str): The prompt for the agent
        model (str): The model to use for the agent

    Returns:
        str: A message naming the agent, its key and its first response
    """
    # The spoken form of the name has its underscores replaced by spaces
    spoken_name = name.replace("_", " ")

    # Announce the agent (when speaking is enabled), then create it with an
    # opening message that only asks for an acknowledgement
    if CFG.speak_mode:
        say_text(f"{spoken_name} here, Reporting for duty!", 1)
    agent_key, _ = AGENT_MANAGER.create_agent(
        task, f"""You are {name}. Respond with: "Acknowledged".""", model
    )

    if CFG.speak_mode:
        say_text(f"Hello {spoken_name}. Your task is as follows. {task}.")

    # Hand over the actual prompt and capture what the agent answers
    reply = AGENT_MANAGER.message_agent(agent_key, prompt)

    return f"Agent {name} created with key {agent_key}. First response: {reply}"

210 

211 

@command("message_agent", "Message GPT Agent", '"key": "<key>", "message": "<message>"')
def message_agent(key: str, message: str) -> str:
    """Send a message to the agent identified by key and return its reply

    Args:
        key (str): The key of the agent; must be convertible to an integer
        message (str): The message to send

    Returns:
        str: The agent's response, or an error message for an invalid key
    """
    # Bail out early on keys that cannot be converted to an integer
    if not is_valid_int(key):
        return "Invalid key, must be an integer."

    reply = AGENT_MANAGER.message_agent(int(key), message)

    # Speak response
    if CFG.speak_mode:
        say_text(reply, 1)
    return reply

225 

226 

@command("list_agents", "List GPT Agents", "")
def list_agents() -> str:
    """Build a printable listing of every known agent

    Returns:
        str: A header line followed by one "<key>: <task>" line per agent
    """
    # Each entry from the manager is indexed as (key, description)
    lines = [
        str(agent[0]) + ": " + agent[1] for agent in AGENT_MANAGER.list_agents()
    ]
    return "List of agents:\n" + "\n".join(lines)

237 

238 

@command("delete_agent", "Delete GPT Agent", '"key": "<key>"')
def delete_agent(key: str) -> str:
    """Remove the agent stored under the given key

    Args:
        key (str): The key of the agent to delete

    Returns:
        str: A message indicating whether the agent was deleted or not
    """
    # The manager's result is treated as truthy when the agent was deleted
    if AGENT_MANAGER.delete_agent(key):
        return f"Agent {key} deleted."
    return f"Agent {key} does not exist."