Merge branch 'master' into dev

This commit is contained in:
Andres Caicedo
2023-04-04 10:37:42 +02:00
7 changed files with 135 additions and 56 deletions

View File

@@ -10,6 +10,9 @@ from file_operations import read_file, write_to_file, append_to_file, delete_fil
from execute_code import execute_python_file
from json_parser import fix_and_parse_json
from googlesearch import search
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
cfg = Config()
@@ -46,7 +49,13 @@ def execute_command(command_name, arguments):
"""Execute the command and return the response"""
try:
if command_name == "google":
return google_search(arguments["input"])
# Check if the Google API key is set and use the official search method
# If the API key is not set or has only whitespaces, use the unofficial search method
if cfg.google_api_key and (cfg.google_api_key.strip() if cfg.google_api_key else None):
return google_official_search(arguments["input"])
else:
return google_search(arguments["input"])
elif command_name == "memory_add":
return commit_memory(arguments["string"])
elif command_name == "memory_del":
@@ -112,6 +121,40 @@ def google_search(query, num_results=8):
return json.dumps(search_results, ensure_ascii=False, indent=4)
def google_official_search(query, num_results=8):
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import json
try:
# Get the Google API key and Custom Search Engine ID from the config file
api_key = cfg.google_api_key
custom_search_engine_id = cfg.custom_search_engine_id
# Initialize the Custom Search API service
service = build("customsearch", "v1", developerKey=api_key)
# Send the search query and retrieve the results
result = service.cse().list(q=query, cx=custom_search_engine_id, num=num_results).execute()
# Extract the search result items from the response
search_results = result.get("items", [])
# Create a list of only the URLs from the search results
search_results_links = [item["link"] for item in search_results]
except HttpError as e:
# Handle errors in the API call
error_details = json.loads(e.content.decode())
# Check if the error is related to an invalid or missing API key
if error_details.get("error", {}).get("code") == 403 and "invalid API key" in error_details.get("error", {}).get("message", ""):
return "Error: The provided Google API key is invalid or missing."
else:
return f"Error: {e}"
# Return the list of search result URLs
return search_results_links
def browse_website(url):
"""Return the results of a google search"""

View File

@@ -37,11 +37,13 @@ class Config(metaclass=Singleton):
self.openai_api_key = os.getenv("OPENAI_API_KEY")
self.elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
self.google_api_key = os.getenv("GOOGLE_API_KEY")
self.custom_search_engine_id = os.getenv("CUSTOM_SEARCH_ENGINE_ID")
# Initialize the OpenAI API client
openai.api_key = self.openai_api_key
def set_continuous_mode(self, value: bool):
"""Set the continuous mode value."""
self.continuous_mode = value
@@ -68,11 +70,16 @@ class Config(metaclass=Singleton):
def set_openai_api_key(self, value: str):
"""Set the OpenAI API key value."""
self.apiopenai_api_key_key = value
self.openai_api_key = value
def set_elevenlabs_api_key(self, value: str):
"""Set the ElevenLabs API key value."""
self.elevenlabs_api_key = value
def set_google_api_key(self, value: str):
"""Set the Google API key value."""
self.google_api_key = value
def set_custom_search_engine_id(self, value: str):
"""Set the custom search engine ID value."""
self.custom_search_engine_id = value

View File

@@ -34,6 +34,9 @@ def write_to_file(filename, text):
"""Write text to a file"""
try:
filepath = safe_join(working_directory, filename)
directory = os.path.dirname(filepath)
if not os.path.exists(directory):
os.makedirs(directory)
with open(filepath, "w") as f:
f.write(text)
return "File written to successfully."

View File

@@ -54,65 +54,50 @@ def print_assistant_thoughts(assistant_reply):
# Parse and print Assistant response
assistant_reply_json = fix_and_parse_json(assistant_reply)
try:
assistant_thoughts = assistant_reply_json.get("thoughts")
if assistant_thoughts:
assistant_thoughts_text = assistant_thoughts.get("text")
assistant_thoughts_reasoning = assistant_thoughts.get("reasoning")
assistant_thoughts_plan = assistant_thoughts.get("plan")
assistant_thoughts_criticism = assistant_thoughts.get("criticism")
assistant_thoughts_speak = assistant_thoughts.get("speak")
else:
assistant_thoughts_text = None
assistant_thoughts_reasoning = None
assistant_thoughts_plan = None
assistant_thoughts_criticism = None
assistant_thoughts_speak = None
except Exception as e:
assistant_thoughts_text = "The AI's response was unreadable."
assistant_thoughts_reasoning = None
assistant_thoughts_plan = None
assistant_thoughts_speak = None
assistant_thoughts_criticism = None
assistant_thoughts = assistant_reply_json.get("thoughts", {})
assistant_thoughts_text = assistant_thoughts.get("text")
if assistant_thoughts:
assistant_thoughts_reasoning = assistant_thoughts.get("reasoning")
assistant_thoughts_plan = assistant_thoughts.get("plan")
assistant_thoughts_criticism = assistant_thoughts.get("criticism")
assistant_thoughts_speak = assistant_thoughts.get("speak")
print_to_console(f"{ai_name.upper()} THOUGHTS:", Fore.YELLOW, assistant_thoughts_text)
print_to_console("REASONING:", Fore.YELLOW, assistant_thoughts_reasoning)
print_to_console(
f"{ai_name.upper()} THOUGHTS:",
Fore.YELLOW,
assistant_thoughts_text)
print_to_console(
"REASONING:",
Fore.YELLOW,
assistant_thoughts_reasoning)
if assistant_thoughts_plan:
print_to_console("PLAN:", Fore.YELLOW, "")
if assistant_thoughts_plan:
# If it's a list, join it into a string
if isinstance(assistant_thoughts_plan, list):
assistant_thoughts_plan = "\n".join(assistant_thoughts_plan)
elif isinstance(assistant_thoughts_plan, dict):
assistant_thoughts_plan = str(assistant_thoughts_plan)
# Split the input_string using the newline character and dash
lines = assistant_thoughts_plan.split('\n')
# If it's a list, join it into a string
if isinstance(assistant_thoughts_plan, list):
assistant_thoughts_plan = "\n".join(assistant_thoughts_plan)
elif isinstance(assistant_thoughts_plan, dict):
assistant_thoughts_plan = str(assistant_thoughts_plan)
# Iterate through the lines and print each one with a bullet
# point
for line in lines:
# Remove any "-" characters from the start of the line
line = line.lstrip("- ")
print_to_console("- ", Fore.GREEN, line.strip())
print_to_console(
"CRITICISM:",
Fore.YELLOW,
assistant_thoughts_criticism)
# Split the input_string using the newline character and dashes
lines = assistant_thoughts_plan.split('\n')
for line in lines:
line = line.lstrip("- ")
print_to_console("- ", Fore.GREEN, line.strip())
print_to_console("CRITICISM:", Fore.YELLOW, assistant_thoughts_criticism)
# Speak the assistant's thoughts
if cfg.speak_mode and assistant_thoughts_speak:
speak.say_text(assistant_thoughts_speak)
except json.decoder.JSONDecodeError:
print_to_console("Error: Invalid JSON\n", Fore.RED, assistant_reply)
# All other errors, return "Error: + error message"
except Exception as e:
call_stack = traceback.format_exc()
print_to_console("Error: \n", Fore.RED, call_stack)
def load_variables(config_file="config.yaml"):
"""Load variables from yaml file if it exists, otherwise prompt the user for input"""
try: