import os
import time
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed

import tiktoken
from langchain import PromptTemplate
from langchain.chains.summarize import load_summarize_chain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import YoutubeLoader, TextLoader, PyPDFLoader, UnstructuredEPubLoader
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document

import numpy as np
from langchain.text_splitter import TokenTextSplitter
from sklearn.cluster import KMeans

import matplotlib.pyplot as plt

import streamlit as st


def doc_loader(file_path: str):
    """
    Load the contents of a text document from a file path into a loaded langchain Document object.

    :param file_path: The path to the text document to load.

    :return: A langchain Document object.
    """
    if file_path.endswith('.txt'):
        loader = TextLoader(file_path)
    elif file_path.endswith('.pdf'):
        loader = PyPDFLoader(file_path)
    elif file_path.endswith('.epub'):
        try:
            loader = UnstructuredEPubLoader(file_path)
        except Exception:
            st.warning('Error loading file - ensure you have pandoc installed and added to PATH.')
            return None
    return loader.load()


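# Usage sketch (illustrative; the file path below is a made-up example):
#
#   document = doc_loader('papers/example_paper.pdf')
#   if document is not None:
#       text = doc_to_text(document)

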
def directory_loader(directory):
    """
    Load every .txt, .pdf, and .epub file in a directory and flatten the results into a single
    list of langchain Document objects.

    :param directory: The path of the directory to load files from.

    :return: A list of langchain Document objects.
    """
    files = os.listdir(directory)
    documents = []
    mixed_documents = []
    for file in files:
        if file.endswith('.txt'):
            loader = TextLoader(os.path.join(directory, file))
            documents.append(loader.load())
        elif file.endswith('.pdf'):
            loader = PyPDFLoader(os.path.join(directory, file))
            documents.append(loader.load())
        elif file.endswith('.epub'):
            loader = UnstructuredEPubLoader(os.path.join(directory, file))
            documents.append(loader.load())
    for doc in documents:
        for section in doc:
            mixed_documents.append(section)
    return mixed_documents


def token_counter(text: str):
    """
    Count the number of tokens in a string of text.

    :param text: The text to count the tokens of.

    :return: The number of tokens in the text.
    """
    encoding = tiktoken.get_encoding('cl100k_base')
    token_list = encoding.encode(text, disallowed_special=())
    tokens = len(token_list)
    return tokens


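# Quick usage sketch (illustrative): the exact count depends on the cl100k_base vocabulary.
#
#   token_counter('Brevity is the soul of wit.')  # returns an int (roughly one token per word here)

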
def doc_to_text(document):
    """
    Convert a langchain Document object into a string of text.

    :param document: The loaded langchain Document object to convert.

    :return: A string of text.
    """
    text = ''
    for i in document:
        text += i.page_content
    special_tokens = ['<|endoftext|>', '<|fim_prefix|>', '<|fim_middle|>', '<|fim_suffix|>', '<|endofprompt|>']
    words = text.split()
    filtered_words = [word for word in words if word not in special_tokens]
    text = ' '.join(filtered_words)
    return text


def remove_special_tokens(docs):
    """
    Strip OpenAI special tokens from the page content of each document.

    :param docs: A list of langchain Document objects to clean.

    :return: The same list of documents with special tokens removed.
    """
    special_tokens = ['<|endoftext|>', '<|fim_prefix|>', '<|fim_middle|>', '<|fim_suffix|>', '<|endofprompt|>']
    for doc in docs:
        content = doc.page_content
        for special in special_tokens:
            content = content.replace(special, '')
        doc.page_content = content
    return docs


def embed_docs_openai(docs):
    """
    Embed a list of documents into a list of vectors.

    :param docs: A list of documents to embed.

    :return: A list of vectors.
    """
    docs = remove_special_tokens(docs)
    embeddings = OpenAIEmbeddings()
    vectors = embeddings.embed_documents([x.page_content for x in docs])
    return vectors


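# Usage sketch (illustrative): OpenAIEmbeddings reads the OPENAI_API_KEY environment variable,
# so the key must be set before calling this. The chunks below are made-up examples.
#
#   chunks = [Document(page_content='first chunk'), Document(page_content='second chunk')]
#   vectors = embed_docs_openai(chunks)  # one embedding vector per chunk

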
def kmeans_clustering(vectors, num_clusters=None):
    """
    Cluster a list of vectors using K-Means clustering.

    :param vectors: A list of vectors to cluster.

    :param num_clusters: The number of clusters to use. If None, the optimal number of clusters will be determined.

    :return: A K-Means clustering object.
    """
    if num_clusters is None:
        inertia_values = calculate_inertia(vectors)
        num_clusters = determine_optimal_clusters(inertia_values)
        print(f'Optimal number of clusters: {num_clusters}')

    kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(vectors)
    return kmeans


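# Usage sketch (illustrative): works on any list/array of equal-length vectors, so it can be
# tried on synthetic data without calling the OpenAI API.
#
#   fake_vectors = np.random.rand(40, 1536)               # 40 pretend embeddings
#   kmeans = kmeans_clustering(fake_vectors, num_clusters=5)
#   indices = get_closest_vectors(fake_vectors, kmeans)   # 5 representative chunk indices

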
def get_closest_vectors(vectors, kmeans):
    """
    Get the closest vectors to the cluster centers of a K-Means clustering object.

    :param vectors: A list of vectors to cluster.

    :param kmeans: A K-Means clustering object.

    :return: A list of indices of the closest vectors to the cluster centers.
    """
    closest_indices = []
    for i in range(len(kmeans.cluster_centers_)):
        distances = np.linalg.norm(vectors - kmeans.cluster_centers_[i], axis=1)
        closest_index = np.argmin(distances)
        closest_indices.append(closest_index)

    selected_indices = sorted(closest_indices)
    return selected_indices


def map_vectors_to_docs(indices, docs):
    """
    Map a list of indices to a list of loaded langchain Document objects.

    :param indices: A list of indices to map.

    :param docs: A list of langchain Document objects to map to.

    :return: A list of loaded langchain Document objects.
    """
    selected_docs = [docs[i] for i in indices]
    return selected_docs


def create_summarize_chain(prompt_list):
    """
    Create a langchain summarize chain from a list of prompts.

    :param prompt_list: A list containing the template, input variables, and llm to use for the chain.

    :return: A langchain summarize chain.
    """
    template = PromptTemplate(template=prompt_list[0], input_variables=[prompt_list[1]])
    chain = load_summarize_chain(llm=prompt_list[2], chain_type='stuff', prompt=template)
    return chain


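# Usage sketch (illustrative): the template string and model choice below are assumptions chosen
# for the example, not values required by this module; the 'stuff' chain injects the documents
# through a single input variable (here named 'text').
#
#   llm = ChatOpenAI(temperature=0, model_name='gpt-3.5-turbo')
#   prompt_list = summary_prompt_creator('Summarize the following:\n\n{text}', 'text', llm)
#   chain = create_summarize_chain(prompt_list)

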
def parallelize_summaries(summary_docs, initial_chain, progress_bar, max_workers=4):
    """
    Summarize a list of loaded langchain Document objects using multiple langchain summarize chains in parallel.

    :param summary_docs: A list of loaded langchain Document objects to summarize.

    :param initial_chain: A langchain summarize chain to use for summarization.

    :param progress_bar: A streamlit progress bar to display the progress of the summarization.

    :param max_workers: The maximum number of workers to use for parallelization.

    :return: A list of summaries.
    """
    doc_summaries = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_doc = {executor.submit(initial_chain.run, [doc]): doc.page_content for doc in summary_docs}

        for future in as_completed(future_to_doc):
            doc = future_to_doc[future]

            try:
                summary = future.result()

            except Exception as exc:
                print(f'{doc} generated an exception: {exc}')

            else:
                doc_summaries.append(summary)
                num = len(doc_summaries) / (len(summary_docs) + 1)
                progress_bar.progress(num)  # Remove this line and all references to it if you are not using Streamlit.
    return doc_summaries


def create_summary_from_docs(summary_docs, initial_chain, final_sum_list, use_gpt_4):
    """
    Summarize a list of loaded langchain Document objects using multiple langchain summarize chains.

    :param summary_docs: A list of loaded langchain Document objects to summarize.

    :param initial_chain: The initial langchain summarize chain to use.

    :param final_sum_list: A list containing the template, input variables, and llm to use for the final chain.

    :param use_gpt_4: Whether to use GPT-4 or GPT-3.5-turbo for summarization.

    :return: A string containing the summary.
    """

    progress = st.progress(0)  # Create a progress bar to show the progress of summarization.
    # Remove this line and all references to it if you are not using Streamlit.

    doc_summaries = parallelize_summaries(summary_docs, initial_chain, progress_bar=progress)

    summaries = '\n'.join(doc_summaries)
    count = token_counter(summaries)

    if use_gpt_4:
        max_tokens = 7500 - int(count)
        model = 'gpt-4'

    else:
        max_tokens = 3800 - int(count)
        model = 'gpt-3.5-turbo'

    final_sum_list[2] = ChatOpenAI(temperature=0.7, max_tokens=max_tokens, model_name=model)
    final_sum_chain = create_summarize_chain(final_sum_list)
    summaries = Document(page_content=summaries)
    final_summary = final_sum_chain.run([summaries])

    progress.progress(1.0)  # Remove this line and all references to it if you are not using Streamlit.
    time.sleep(0.4)  # Remove this line and all references to it if you are not using Streamlit.
    progress.empty()  # Remove this line and all references to it if you are not using Streamlit.

    return final_summary


def split_by_tokens(doc, num_clusters, ratio=5, minimum_tokens=200, maximum_tokens=2000):
    """
    Split a langchain Document object into a list of smaller langchain Document objects.

    :param doc: The langchain Document object to split.

    :param num_clusters: The number of clusters to use.

    :param ratio: The ratio of documents to clusters to use for splitting.

    :param minimum_tokens: The minimum number of tokens to use for splitting.

    :param maximum_tokens: The maximum number of tokens to use for splitting.

    :return: A list of langchain Document objects.
    """
    text_doc = doc_to_text(doc)
    tokens = token_counter(text_doc)
    chunks = num_clusters * ratio
    max_tokens = int(tokens / chunks)
    max_tokens = max(minimum_tokens, min(max_tokens, maximum_tokens))
    overlap = int(max_tokens / 10)

    splitter = TokenTextSplitter(chunk_size=max_tokens, chunk_overlap=overlap)
    split_doc = splitter.create_documents([text_doc])
    return split_doc


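# Worked example of the chunk-size arithmetic above (illustrative numbers): a 60,000-token document
# with num_clusters=10 and ratio=5 targets 60000 / 50 = 1200 tokens per chunk, which already sits
# inside the [200, 2000] clamp, with a 120-token overlap between consecutive chunks.

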
def extract_summary_docs(langchain_document, num_clusters, find_clusters):
    """
    Automatically convert a single langchain Document object into a list of smaller langchain Document objects that represent each cluster.

    :param langchain_document: The langchain Document object to summarize.

    :param num_clusters: The number of clusters to use.

    :param find_clusters: Whether to find the optimal number of clusters to use.

    :return: A list of langchain Document objects.
    """
    split_document = split_by_tokens(langchain_document, num_clusters)
    vectors = embed_docs_openai(split_document)

    if find_clusters:
        kmeans = kmeans_clustering(vectors, None)

    else:
        kmeans = kmeans_clustering(vectors, num_clusters)

    indices = get_closest_vectors(vectors, kmeans)
    summary_docs = map_vectors_to_docs(indices, split_document)
    return summary_docs


def doc_to_final_summary(langchain_document, num_clusters, initial_prompt_list, final_prompt_list, use_gpt_4, find_clusters=False):
    """
    Automatically summarize a single langchain Document object using multiple langchain summarize chains.

    :param langchain_document: The langchain Document object to summarize.

    :param num_clusters: The number of clusters to use.

    :param initial_prompt_list: A list containing the template, input variables, and llm to use for the initial chain.

    :param final_prompt_list: A list containing the template, input variables, and llm to use for the final chain.

    :param use_gpt_4: Whether to use GPT-4 or GPT-3.5-turbo for summarization.

    :param find_clusters: Whether to automatically find the optimal number of clusters to use.

    :return: A string containing the summary.
    """
    initial_chain = create_summarize_chain(initial_prompt_list)
    summary_docs = extract_summary_docs(langchain_document, num_clusters, find_clusters)
    output = create_summary_from_docs(summary_docs, initial_chain, final_prompt_list, use_gpt_4)
    return output


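# End-to-end usage sketch (illustrative): the prompt strings, variable name, file path, and cluster
# count are assumptions chosen for the example, not values required by this module. Requires
# OPENAI_API_KEY and, in this file's current form, a Streamlit context for the progress bar.
#
#   llm = ChatOpenAI(temperature=0, model_name='gpt-3.5-turbo')
#   initial = summary_prompt_creator('Summarize this passage:\n\n{text}', 'text', llm)
#   final = summary_prompt_creator('Combine these notes into one summary:\n\n{text}', 'text', llm)
#   doc = doc_loader('book.epub')
#   summary = doc_to_final_summary(doc, num_clusters=10, initial_prompt_list=initial,
#                                  final_prompt_list=final, use_gpt_4=False)

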
def summary_prompt_creator(prompt, input_var, llm):
    """
    Create a list containing the template, input variables, and llm to use for a langchain summarize chain.

    :param prompt: The template to use for the chain.

    :param input_var: The input variables to use for the chain.

    :param llm: The llm to use for the chain.

    :return: A list containing the template, input variables, and llm to use for the chain.
    """
    prompt_list = [prompt, input_var, llm]
    return prompt_list


def extract_video_id(video_url):
    """
    Extract the YouTube video ID from a YouTube video URL.

    :param video_url: The URL of the YouTube video.

    :return: The ID of the YouTube video, or None if it cannot be determined.
    """
    parsed_url = urllib.parse.urlparse(video_url)
    if parsed_url.hostname == 'youtu.be':
        return parsed_url.path[1:]

    elif parsed_url.hostname in ('www.youtube.com', 'youtube.com'):

        if parsed_url.path == '/watch':
            p = urllib.parse.parse_qs(parsed_url.query)
            return p.get('v', [None])[0]

        elif parsed_url.path.startswith('/embed/'):
            return parsed_url.path.split('/embed/')[1]

        elif parsed_url.path.startswith('/v/'):
            return parsed_url.path.split('/v/')[1]

    return None


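# Usage sketch (illustrative; the video IDs below are made-up placeholders):
#
#   extract_video_id('https://www.youtube.com/watch?v=abc123XYZ90')  # -> 'abc123XYZ90'
#   extract_video_id('https://youtu.be/abc123XYZ90')                 # -> 'abc123XYZ90'
#   extract_video_id('https://example.com/not-youtube')              # -> None

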
def transcript_loader(video_url):
    """
    Load the transcript of a YouTube video into a loaded langchain Document object.

    :param video_url: The URL of the YouTube video to load the transcript of.

    :return: A loaded langchain Document object.
    """
    transcript = YoutubeLoader(video_id=extract_video_id(video_url))
    loaded = transcript.load()
    return loaded


def calculate_inertia(vectors, max_clusters=12):
    """
    Calculate the inertia values for a range of clusters.

    :param vectors: A list of vectors to cluster.

    :param max_clusters: The maximum number of clusters to use.

    :return: A list of inertia values.
    """
    inertia_values = []
    for num_clusters in range(1, max_clusters + 1):
        kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(vectors)
        inertia_values.append(kmeans.inertia_)
    return inertia_values


def plot_elbow(inertia_values):
    """
    Plot the inertia values for a range of clusters. Just for fun!

    :param inertia_values: A list of inertia values.

    :return: None.
    """
    plt.plot(inertia_values)
    plt.xlabel('Number of Clusters')
    plt.ylabel('Inertia')
    plt.show()


def determine_optimal_clusters(inertia_values):
    """
    Determine the optimal number of clusters to use based on the inertia values.

    :param inertia_values: A list of inertia values.

    :return: The optimal number of clusters to use.
    """
    # Elbow heuristic: for each consecutive pair of points (k, inertia_k), measure the perpendicular
    # distance from the reference point (1, 0) to the line through the pair, and return the larger
    # cluster count of the pair with the greatest distance.
    distances = []
    for i in range(len(inertia_values) - 1):
        p1 = np.array([i + 1, inertia_values[i]])
        p2 = np.array([i + 2, inertia_values[i + 1]])
        d = np.linalg.norm(np.cross(p2 - p1, p1 - np.array([1, 0]))) / np.linalg.norm(p2 - p1)
        distances.append(d)
    optimal_clusters = distances.index(max(distances)) + 2
    return optimal_clusters
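

# Minimal local sanity check (illustrative): exercises the clustering helpers on synthetic vectors
# and the URL parser on a made-up video ID, without touching the OpenAI API or Streamlit widgets.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    # Three well-separated synthetic "embedding" clusters of 10 vectors each.
    fake_vectors = np.vstack([rng.normal(loc=offset, scale=0.05, size=(10, 8)) for offset in (0.0, 1.0, 2.0)])

    inertia = calculate_inertia(fake_vectors, max_clusters=8)
    print('Suggested clusters:', determine_optimal_clusters(inertia))

    kmeans = kmeans_clustering(fake_vectors, num_clusters=3)
    print('Representative vector indices:', get_closest_vectors(fake_vectors, kmeans))

    print('Video ID:', extract_video_id('https://www.youtube.com/watch?v=abc123XYZ90'))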