Coverage for autogpt/app.py: 22%
98 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-22 05:45 +0000
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-22 05:45 +0000
1""" Command and Control """
2import json
3from typing import Dict, List, NoReturn, Union
5from autogpt.agent.agent_manager import AgentManager
6from autogpt.commands.command import CommandRegistry, command
7from autogpt.commands.web_requests import scrape_links, scrape_text
8from autogpt.config import Config
9from autogpt.memory import get_memory
10from autogpt.processing.text import summarize_text
11from autogpt.prompts.generator import PromptGenerator
12from autogpt.speech import say_text
# Module-wide configuration object; the commands below read its `speak_mode`
# and `fast_llm_model` attributes and pass it to `get_memory`.
CFG = Config()
# Module-wide agent manager backing the start/message/list/delete agent commands.
AGENT_MANAGER = AgentManager()
def is_valid_int(value: str) -> bool:
    """Check if the value can be converted to an integer

    Args:
        value (str): The value to check

    Returns:
        bool: True if ``int(value)`` succeeds, False otherwise
    """
    try:
        int(value)
    except (TypeError, ValueError, OverflowError):
        # ValueError: text that is not an integer literal ("abc", "").
        # TypeError: objects int() cannot accept at all (None, a list, ...).
        # OverflowError: infinite floats.
        # All of them mean "not a valid integer", so report False instead of
        # letting the exception escape to the caller.
        return False
    return True
def get_command(response_json: Dict):
    """Parse the response and return the command name and arguments

    Args:
        response_json (dict): The already-decoded response from the AI

    Returns:
        tuple: ``(command_name, arguments)`` on success, or
            ``("Error:", message)`` when the response is malformed. Problems
            are reported through the returned tuple rather than raised.
    """
    try:
        # Check the container type before probing it with `in`: on a list or
        # string `in` means something else, and on None it raises, so the
        # type check has to come first for the right message to be returned.
        if not isinstance(response_json, dict):
            return (
                "Error:",
                f"'response_json' object is not dictionary {response_json}",
            )

        if "command" not in response_json:
            return "Error:", "Missing 'command' object in JSON"

        command_spec = response_json["command"]
        if not isinstance(command_spec, dict):
            return "Error:", "'command' object is not a dictionary"

        if "name" not in command_spec:
            return "Error:", "Missing 'name' field in 'command' object"

        command_name = command_spec["name"]

        # Use an empty dictionary if 'args' field is not present in 'command' object
        arguments = command_spec.get("args", {})

        return command_name, arguments
    except json.decoder.JSONDecodeError:
        return "Error:", "Invalid JSON"
    # All other errors, return "Error: + error message"
    except Exception as e:
        return "Error:", str(e)
def map_command_synonyms(command_name: str):
    """Translate a commonly hallucinated command name into the real one

    Args:
        command_name (str): The command name given by the AI

    Returns:
        str: The actual command name when the input is a known synonym,
            otherwise the input unchanged
    """
    known_aliases = (
        ("write_file", "write_to_file"),
        ("create_file", "write_to_file"),
        ("search", "google"),
    )
    # First matching alias wins; fall back to the name as given.
    return next(
        (actual for alias, actual in known_aliases if command_name == alias),
        command_name,
    )
def execute_command(
    command_registry: CommandRegistry,
    command_name: str,
    arguments,
    prompt: PromptGenerator,
):
    """Execute the command and return the result

    Args:
        command_registry (CommandRegistry): Registry whose ``commands`` mapping
            is searched first, using the name exactly as given
        command_name (str): The name of the command to execute
        arguments (dict): The arguments for the command
        prompt (PromptGenerator): Its ``commands`` entries are the last place
            searched for a matching "label" or "name"

    Returns:
        str: The result of the command, an "Unknown command" notice, or
            "Error: <message>" if anything raised an Exception
    """
    try:
        registered = command_registry.commands.get(command_name)

        # A registered command takes precedence and receives keyword arguments.
        if registered:
            return registered(**arguments)

        # TODO: Remove commands below after they are moved to the command registry.
        command_name = map_command_synonyms(command_name.lower())

        if command_name == "memory_add":
            return get_memory(CFG).add(arguments["string"])

        # TODO: Change these to take in a file rather than pasted code, if
        # non-file is given, return instructions "Input should be a python
        # filepath, write your code to file and try again
        if command_name == "do_nothing":
            return "No action performed."

        if command_name == "task_complete":
            shutdown()
            # Only reached if shutdown() ever returns; nothing to report then.
            return None

        # Last resort: commands attached to the prompt, called positionally
        # with the argument values in dictionary order.
        for entry in prompt.commands:
            if command_name == entry["label"] or command_name == entry["name"]:
                return entry["function"](*arguments.values())

        return (
            f"Unknown command '{command_name}'. Please refer to the 'COMMANDS'"
            " list for available commands and only respond in the specified JSON"
            " format."
        )
    except Exception as e:
        return f"Error: {str(e)}"
@command(
    "get_text_summary", "Get text summary", '"url": "<url>", "question": "<question>"'
)
def get_text_summary(url: str, question: str) -> str:
    """Scrape the text at a url and summarize it for a question

    Args:
        url (str): The url to scrape
        question (str): The question to summarize the text for

    Returns:
        str: The summary of the text, prefixed with a "Result" label
    """
    answer = summarize_text(url, scrape_text(url), question)
    return f""" "Result" : {answer}"""
# The description previously read "Get text summary", duplicating the
# get_text_summary command; it now names what this command does.
@command("get_hyperlinks", "Get hyperlinks", '"url": "<url>"')
def get_hyperlinks(url: str) -> Union[str, List[str]]:
    """Return the hyperlinks found at a url

    Args:
        url (str): The url to scrape

    Returns:
        str or list: Whatever ``scrape_links`` returns for the url
            (presumably the links, or a message string - confirm against
            ``scrape_links``)
    """
    return scrape_links(url)
def shutdown() -> NoReturn:
    """Shut down the program

    Raises:
        SystemExit: Always, after printing the shutdown notice
    """
    print("Shutting down...")
    # Raise SystemExit directly instead of calling quit(): quit() is a helper
    # the site module installs for interactive sessions and is not guaranteed
    # to exist (e.g. when the interpreter is started with -S).
    raise SystemExit
@command(
    "start_agent",
    "Start GPT Agent",
    '"name": "<name>", "task": "<short_task_desc>", "prompt": "<prompt>"',
)
def start_agent(name: str, task: str, prompt: str, model=CFG.fast_llm_model) -> str:
    """Create a new agent, hand it its prompt and report its first answer

    Args:
        name (str): The name of the agent
        task (str): The task of the agent
        prompt (str): The prompt for the agent
        model (str): The model to use for the agent

    Returns:
        str: A message containing the agent's name, its key and its first
            response to the prompt
    """
    # The spoken form of the name uses spaces where the name has underscores.
    spoken_name = name.replace("_", " ")

    # Announce the agent before it is created when speech is enabled.
    if CFG.speak_mode:
        say_text(f"{spoken_name} here, Reporting for duty!", 1)

    # Create the agent; the acknowledgement returned alongside the key is unused.
    agent_key, _ack = AGENT_MANAGER.create_agent(
        task, f"""You are {name}. Respond with: "Acknowledged".""", model
    )

    if CFG.speak_mode:
        say_text(f"Hello {spoken_name}. Your task is as follows. {task}.")

    # Send the actual prompt and capture what the agent answers.
    first_reply = AGENT_MANAGER.message_agent(agent_key, prompt)

    return f"Agent {name} created with key {agent_key}. First response: {first_reply}"
@command("message_agent", "Message GPT Agent", '"key": "<key>", "message": "<message>"')
def message_agent(key: str, message: str) -> str:
    """Send a message to the agent with the given key

    Args:
        key (str): The key of the agent; must be convertible to an integer
        message (str): The message to send

    Returns:
        str: The agent's response, or an error notice if the key is invalid
    """
    # Reject keys that cannot be turned into an integer before doing anything.
    if not is_valid_int(key):
        return "Invalid key, must be an integer."

    reply = AGENT_MANAGER.message_agent(int(key), message)

    # Read the reply aloud when speech is enabled.
    if CFG.speak_mode:
        say_text(reply, 1)
    return reply
@command("list_agents", "List GPT Agents", "")
def list_agents() -> str:
    """List all agents

    Returns:
        str: A "List of agents:" header followed by one "<first>: <second>"
            line per entry reported by the agent manager
    """
    lines = []
    for agent in AGENT_MANAGER.list_agents():
        # First element is stringified; the second is concatenated as-is.
        lines.append(str(agent[0]) + ": " + agent[1])
    return "List of agents:\n" + "\n".join(lines)
@command("delete_agent", "Delete GPT Agent", '"key": "<key>"')
def delete_agent(key: str) -> str:
    """Delete the agent with the given key

    Args:
        key (str): The key of the agent to delete

    Returns:
        str: A message saying the agent was deleted, or that it does not
            exist when the agent manager reports a falsy result
    """
    if AGENT_MANAGER.delete_agent(key):
        return f"Agent {key} deleted."
    return f"Agent {key} does not exist."