mirror of
https://github.com/danielmiessler/Fabric.git
synced 2026-04-24 03:00:15 -04:00
Updated the standalone app
This commit is contained in:
144
client/utils.py
144
client/utils.py
@@ -1,50 +1,30 @@
|
||||
import requests
|
||||
import yaml
|
||||
import os
|
||||
from openai import OpenAI
|
||||
from dotenv import load_dotenv
|
||||
import pyperclip
|
||||
import socketio
|
||||
import sys
|
||||
import subprocess
|
||||
import shlex
|
||||
|
||||
current_directory = os.path.dirname(os.path.realpath(__file__))
|
||||
config_file = os.path.join(current_directory, "config.yaml")
|
||||
config_directory = os.path.expanduser("~/.config/fabric")
|
||||
env_file = os.path.join(config_directory, '.env')
|
||||
try:
|
||||
config = yaml.safe_load(open(config_file))['server']
|
||||
except FileNotFoundError:
|
||||
config = {}
|
||||
gunicorn_directory = os.path.join(current_directory, ".venv/bin")
|
||||
|
||||
|
||||
class Utilities:
|
||||
def __init__(self):
|
||||
domain = config['domain']
|
||||
port = config['port']
|
||||
baseurl = ''
|
||||
if config['port'] != 443:
|
||||
baseurl = f'http://{domain}:{port}'
|
||||
else:
|
||||
baseurl = f'https://{domain}'
|
||||
self.summarizestream = f"ws://{domain}:{port}"
|
||||
|
||||
|
||||
class Standalone:
|
||||
def __init__(self, args, pattern=''):
|
||||
with open(env_file, "r") as f:
|
||||
apikey = f.read().split("=")[1]
|
||||
self.client = OpenAI(api_key=apikey)
|
||||
try:
|
||||
with open(env_file, "r") as f:
|
||||
apikey = f.read().split("=")[1]
|
||||
self.client = OpenAI(api_key=apikey)
|
||||
except FileNotFoundError:
|
||||
print("No API key found. Use the --apikey option to set the key")
|
||||
sys.exit()
|
||||
self.config_pattern_directory = config_directory
|
||||
self.pattern = pattern
|
||||
self.args = args
|
||||
|
||||
def streamMessage(self, input_data: str):
|
||||
wisdomfileDirectory = os.path.join(
|
||||
current_directory, "server/app/chatgpt/patterns")
|
||||
wisdomFilePath = os.path.join(
|
||||
wisdomfileDirectory, f"{self.pattern}/system.md")
|
||||
config_directory, f"patterns/{self.pattern}/system.md")
|
||||
user_message = {"role": "user", "content": f"{input_data}"}
|
||||
wisdom_File = os.path.join(
|
||||
current_directory, wisdomFilePath)
|
||||
@@ -90,10 +70,8 @@ class Standalone:
|
||||
f.write(buffer)
|
||||
|
||||
def sendMessage(self, input_data: str):
|
||||
wisdomfileDirectory = os.path.join(
|
||||
current_directory, "server/app/chatgpt/patterns")
|
||||
wisdomFilePath = os.path.join(
|
||||
wisdomfileDirectory, f"{self.pattern}/system.md")
|
||||
config_directory, f"paterns/{self.pattern}/system.md")
|
||||
user_message = {"role": "user", "content": f"{input_data}"}
|
||||
wisdom_File = os.path.join(
|
||||
current_directory, wisdomFilePath)
|
||||
@@ -127,66 +105,46 @@ class Standalone:
|
||||
f.write(response.choices[0].message.content)
|
||||
|
||||
|
||||
class Remote:
|
||||
def __init__(self, module, args):
|
||||
self.module = module
|
||||
self.buffer = ''
|
||||
self.utils = Utilities()
|
||||
self.sio = socketio.Client()
|
||||
self.setup_handlers()
|
||||
self.args = args
|
||||
|
||||
def setup_handlers(self):
|
||||
@self.sio.event
|
||||
def message(data):
|
||||
global buffer
|
||||
self.buffer += data
|
||||
if self.args.stream:
|
||||
for char in data:
|
||||
if char not in ['\n', ' ']: # If the character is not a newline or space
|
||||
print(char, end='')
|
||||
elif char == ' ':
|
||||
print(' ', end='') # Explicitly handle spaces
|
||||
elif char == '\n':
|
||||
print() # Handle newlines
|
||||
sys.stdout.flush()
|
||||
|
||||
@self.sio.event
|
||||
def error(data):
|
||||
print(data)
|
||||
|
||||
@self.sio.event
|
||||
def disconnect():
|
||||
if self.args.copy:
|
||||
pyperclip.copy(self.buffer)
|
||||
if self.args.output:
|
||||
with open(self.args.output, 'w') as f:
|
||||
f.write(self.buffer)
|
||||
if not self.args.stream:
|
||||
print(self.buffer, end='')
|
||||
|
||||
def analyze(self, text, copy_to_clipboard=False, save_to_file=False):
|
||||
url = self.utils.summarizestream
|
||||
self.sio.connect(url)
|
||||
self.sio.emit('fabric', {'input_data': text, 'module': self.module})
|
||||
self.sio.wait()
|
||||
if copy_to_clipboard:
|
||||
pyperclip.copy(self.buffer)
|
||||
if save_to_file:
|
||||
with open(save_to_file, 'w') as f:
|
||||
f.write(self.buffer)
|
||||
|
||||
def disconnect_handler(self):
|
||||
self.sio.disconnect()
|
||||
|
||||
|
||||
class Server:
|
||||
class Update:
|
||||
def __init__(self):
|
||||
server_directory = os.path.join(
|
||||
current_directory, "server")
|
||||
os.chdir(server_directory)
|
||||
# Initialize with the root API URL
|
||||
self.root_api_url = "https://api.github.com/repos/danielmiessler/fabric/contents/patterns?ref=main"
|
||||
self.config_directory = os.path.expanduser("~/.config/fabric")
|
||||
self.pattern_directory = os.path.join(
|
||||
self.config_directory, 'patterns')
|
||||
# Ensure local directory exists
|
||||
os.makedirs(self.pattern_directory, exist_ok=True)
|
||||
self.get_github_directory_contents(
|
||||
self.root_api_url, self.pattern_directory)
|
||||
|
||||
def run_server(self, domain: str, port: str):
|
||||
print(f"please visit http://{domain}:{port} to view the web frontend")
|
||||
command = f"{gunicorn_directory}/gunicorn -k gevent -w 1 --timeout 240 -b {domain}:{port} 'app:create_app()'"
|
||||
subprocess.run(shlex.split(command))
|
||||
def download_file(self, url, local_path):
|
||||
"""
|
||||
Download a file from a URL to a local path.
|
||||
"""
|
||||
response = requests.get(url)
|
||||
response.raise_for_status() # This will raise an exception for HTTP error codes
|
||||
with open(local_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
def process_item(self, item, local_dir):
|
||||
"""
|
||||
Process an individual item, downloading if it's a file, or processing further if it's a directory.
|
||||
"""
|
||||
if item['type'] == 'file':
|
||||
print(f"Downloading file: {item['name']} to {local_dir}")
|
||||
self.download_file(item['download_url'],
|
||||
os.path.join(local_dir, item['name']))
|
||||
elif item['type'] == 'dir':
|
||||
new_dir = os.path.join(local_dir, item['name'])
|
||||
os.makedirs(new_dir, exist_ok=True)
|
||||
self.get_github_directory_contents(item['url'], new_dir)
|
||||
|
||||
def get_github_directory_contents(self, api_url, local_dir):
|
||||
"""
|
||||
Fetches the contents of a directory in a GitHub repository and downloads files, recursively handling directories.
|
||||
"""
|
||||
response = requests.get(api_url)
|
||||
response.raise_for_status() # This will raise an exception for HTTP error codes
|
||||
jsonList = response.json()
|
||||
for item in jsonList:
|
||||
self.process_item(item, local_dir)
|
||||
|
||||
Reference in New Issue
Block a user