add rag blocks

This commit is contained in:
Aarushi
2024-10-07 16:58:19 +01:00
parent d742019349
commit e02ec32ad3
6 changed files with 577 additions and 0 deletions

View File

@@ -0,0 +1,54 @@
import requests
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
import uuid
class JinaChunkingBlock(Block):
    """Split texts into chunks using Jina AI's segmentation service.

    Every input text is sent in its own POST request to
    ``https://segment.jina.ai/``. The ``chunks`` (and, when requested, the
    ``tokens``) of all responses are concatenated in input order and yielded
    once all requests have completed.
    """

    class Input(BlockSchema):
        texts: list = SchemaField(description="List of texts to chunk")
        api_key: BlockSecret = SecretField(key="jina_api_key", description="Jina API Key")
        max_chunk_length: int = SchemaField(description="Maximum length of each chunk", default=1000)
        return_tokens: bool = SchemaField(description="Whether to return token information", default=False)

    class Output(BlockSchema):
        chunks: list = SchemaField(description="List of chunked texts")
        tokens: list = SchemaField(description="List of token information for each chunk", optional=True)

    def __init__(self):
        super().__init__(
            id="806fb15e-830f-4796-8692-557d300ff43c",
            description="Chunks texts using Jina AI's segmentation service",
            categories={BlockCategory.AI, BlockCategory.TEXT},
            input_schema=JinaChunkingBlock.Input,
            output_schema=JinaChunkingBlock.Output,
        )

    def run(self, input_data: Input) -> BlockOutput:
        """Chunk every text in ``input_data.texts`` and yield the combined results.

        Yields:
            ``("chunks", list)`` always, and ``("tokens", list)`` only when
            ``input_data.return_tokens`` is true.

        Raises:
            requests.HTTPError: if the service answers with an error status.
            requests.Timeout: if the service does not answer in time.
        """
        url = "https://segment.jina.ai/"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {input_data.api_key.get_secret_value()}",
        }
        # These options are identical for every request, so build them once.
        # The values are sent as strings, exactly as the original code did.
        options = {
            "return_tokens": str(input_data.return_tokens).lower(),
            "return_chunks": "true",
            "max_chunk_length": str(input_data.max_chunk_length),
        }

        all_chunks = []
        all_tokens = []
        for text in input_data.texts:
            # Without a timeout, requests waits forever on a stalled connection.
            response = requests.post(
                url,
                headers=headers,
                json={"content": text, **options},
                timeout=60,
            )
            response.raise_for_status()
            result = response.json()

            all_chunks.extend(result.get("chunks", []))
            if input_data.return_tokens:
                all_tokens.extend(result.get("tokens", []))

        yield "chunks", all_chunks
        if input_data.return_tokens:
            yield "tokens", all_tokens

View File

@@ -0,0 +1,36 @@
import requests
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
import uuid
class JinaEmbeddingBlock(Block):
    """Generate embeddings for a list of texts with the Jina AI embeddings API.

    All texts are sent in a single POST request to
    ``https://api.jina.ai/v1/embeddings``; the ``embedding`` entry of every
    item in the response's ``data`` list is returned.
    """

    class Input(BlockSchema):
        texts: list = SchemaField(description="List of texts to embed")
        api_key: BlockSecret = SecretField(key="jina_api_key", description="Jina API Key")
        model: str = SchemaField(description="Jina embedding model to use", default="jina-embeddings-v2-base-en")

    class Output(BlockSchema):
        embeddings: list = SchemaField(description="List of embeddings")

    def __init__(self):
        super().__init__(
            id="7c56b3ab-62e7-43a2-a2dc-4ec4245660b6",
            description="Generates embeddings using Jina AI",
            categories={BlockCategory.AI},
            input_schema=JinaEmbeddingBlock.Input,
            output_schema=JinaEmbeddingBlock.Output,
        )

    def run(self, input_data: Input) -> BlockOutput:
        """Embed ``input_data.texts`` and yield ``("embeddings", list)``.

        Raises:
            requests.HTTPError: if the API answers with an error status.
            requests.Timeout: if the API does not answer in time.
        """
        url = "https://api.jina.ai/v1/embeddings"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {input_data.api_key.get_secret_value()}",
        }
        payload = {
            "input": input_data.texts,
            "model": input_data.model,
        }

        # Without a timeout, requests waits forever on a stalled connection.
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        # Fail with the HTTP error itself rather than a KeyError on "data"
        # when the API rejects the request (bad key, unknown model, ...).
        response.raise_for_status()

        embeddings = [item["embedding"] for item in response.json()["data"]]
        yield "embeddings", embeddings

View File

@@ -0,0 +1,50 @@
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
from pinecone import Pinecone, ServerlessSpec
import uuid
class PineconeInitBlock(Block):
    """Ensure a serverless Pinecone index exists and output its name.

    The index is created with the requested dimension, metric, cloud and
    region when no index with that name exists yet; otherwise the existing
    index is reused unchanged.
    """

    class Input(BlockSchema):
        api_key: BlockSecret = SecretField(key="pinecone_api_key", description="Pinecone API Key")
        index_name: str = SchemaField(description="Name of the Pinecone index")
        dimension: int = SchemaField(description="Dimension of the vectors", default=768)
        metric: str = SchemaField(description="Distance metric for the index", default="cosine")
        cloud: str = SchemaField(description="Cloud provider for serverless", default="aws")
        region: str = SchemaField(description="Region for serverless", default="us-east-1")

    class Output(BlockSchema):
        index: str = SchemaField(description="Name of the initialized Pinecone index")
        message: str = SchemaField(description="Status message")

    def __init__(self):
        super().__init__(
            id="48d8fdab-8f03-41f3-8407-8107ba11ec9b",
            description="Initializes a Pinecone index",
            categories={BlockCategory.LOGIC},
            input_schema=PineconeInitBlock.Input,
            output_schema=PineconeInitBlock.Output,
        )

    def run(self, input_data: Input) -> BlockOutput:
        """Create the index if needed, then yield its name and a status message.

        Yields:
            ``("index", name)`` followed by ``("message", status)`` on success.
            If listing or creating the index raises, only
            ``("message", error text)`` is yielded.
        """
        pc = Pinecone(api_key=input_data.api_key.get_secret_value())
        try:
            # list_indexes() returns index descriptions, not plain names, so
            # the membership test has to run against the extracted names.
            existing_names = pc.list_indexes().names()
            if input_data.index_name not in existing_names:
                pc.create_index(
                    name=input_data.index_name,
                    dimension=input_data.dimension,
                    metric=input_data.metric,
                    spec=ServerlessSpec(
                        cloud=input_data.cloud,
                        region=input_data.region,
                    ),
                )
                message = f"Created new index: {input_data.index_name}"
            else:
                message = f"Using existing index: {input_data.index_name}"

            # The index name is yielded rather than the index object.
            yield "index", input_data.index_name
            yield "message", message
        except Exception as e:
            yield "message", f"Error initializing Pinecone index: {str(e)}"

View File

@@ -0,0 +1,34 @@
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
import uuid
class PineconeQueryBlock(Block):
    """Run a vector query against the object supplied as ``index``.

    The block calls ``index.query(...)`` with the query vector, namespace and
    result options from the input and yields whatever that call returns.

    NOTE(review): ``index`` must expose a ``query`` method. The
    ``PineconeInitBlock`` visible alongside this block yields the index
    *name* (a string), which would not work here -- confirm how callers are
    expected to obtain the index object.
    """

    class Input(BlockSchema):
        index: object = SchemaField(description="Initialized Pinecone index")
        query_vector: list = SchemaField(description="Query vector")
        namespace: str = SchemaField(description="Namespace to query in Pinecone", default="")
        top_k: int = SchemaField(description="Number of top results to return", default=3)
        include_values: bool = SchemaField(description="Whether to include vector values in the response", default=False)
        include_metadata: bool = SchemaField(description="Whether to include metadata in the response", default=True)

    class Output(BlockSchema):
        results: dict = SchemaField(description="Query results from Pinecone")

    def __init__(self):
        super().__init__(
            id="9ad93d0f-91b4-4c9c-8eb1-82e26b4a01c5",
            description="Queries a Pinecone index",
            categories={BlockCategory.LOGIC},
            input_schema=PineconeQueryBlock.Input,
            output_schema=PineconeQueryBlock.Output,
        )

    def run(self, input_data: Input) -> BlockOutput:
        """Query the index and yield ``("results", <query return value>)``."""
        # Collect the keyword arguments first so the call itself stays short.
        query_options = {
            "namespace": input_data.namespace,
            "vector": input_data.query_vector,
            "top_k": input_data.top_k,
            "include_values": input_data.include_values,
            "include_metadata": input_data.include_metadata,
        }
        yield "results", input_data.index.query(**query_options)

View File

@@ -0,0 +1,131 @@
import logging
from enum import Enum
from json import JSONDecodeError
from typing import Any, List, NamedTuple
import anthropic
import ollama
import openai
import requests
from groq import Groq
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
from backend.util import json
from pinecone import Pinecone
logger = logging.getLogger(__name__)
class RAGTechnique(str, Enum):
    """Names of the supported retrieval-augmented generation techniques.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    # A trailing comma here would turn the value into a one-element tuple
    # literal; the value is meant to be the plain string.
    BASIC = "basic"
    COT = "chain_of_thought"
    HYDE = "hypothetical_document"
    MULTI_QUERY = "multi_query"
class RagPipelineBlock(Block):
    """Answer a natural-language query from documents stored in Pinecone.

    Only the HyDE (hypothetical document) technique is implemented here: an
    LLM writes a hypothetical passage for the query, the passage is embedded
    with Jina, the embedding is used to query Pinecone, and the ``text``
    metadata of the matches is given to the LLM as context for the final
    answer. Any other ``rag_technique`` value is reported through the
    ``error`` output.
    """

    class Input(BlockSchema):
        index_name: str = SchemaField(description="Name of the Pinecone Index")
        pinecone_api_key: BlockSecret = SecretField(key="pinecone_api_key", description="Pinecone API Key")
        jina_api_key: BlockSecret = SecretField(key="jina_api_key", description="Jina API Key")
        openai_api_key: BlockSecret = SecretField(key="openai_api_key", description="OpenAI API Key")
        query: str = SchemaField(description="Natural language query about a topic")
        namespace: str = SchemaField(description="Namespace of the topic")
        # Kept as ``str`` so the input schema is unchanged; converted with
        # int() right before it is passed to Pinecone.
        top_k: str = SchemaField(description="Number of top results to return")
        rag_technique: str = SchemaField(description="RAG technique to use", default=RAGTechnique.BASIC)

    class Output(BlockSchema):
        # run() yields a plain string answer, so the field is declared as str.
        response: str
        # Yielded by run() next to every response.
        technique_used: str
        error: str

    def __init__(self):
        super().__init__(
            id="0cfcc32b-4526-4729-adb1-2a4628d66feb",
            description="Block to query data from pinecone",
            categories={BlockCategory.AI, BlockCategory.LOGIC},
            input_schema=RagPipelineBlock.Input,
            output_schema=RagPipelineBlock.Output,
        )

    def get_embeddings(self, text: str, api_key: str, timeout: float = 60) -> list:
        """Return the Jina ``jina-embeddings-v3`` embedding vector of *text*.

        Args:
            text: The text to embed.
            api_key: Jina API key, sent as a bearer token.
            timeout: Seconds to wait for the HTTP request.

        Raises:
            requests.HTTPError: if the API answers with an error status.
            requests.Timeout: if the API does not answer in time.
        """
        url = "https://api.jina.ai/v1/embeddings"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }
        payload = {
            "input": [text],
            "model": "jina-embeddings-v3",
        }
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        # Exactly one text was sent, so the first entry is its embedding.
        return response.json()["data"][0]["embedding"]

    def query_pinecone(self, index_name: str, api_key: str, vector: list, namespace: str, top_k: int) -> list:
        """Query *index_name* with *vector* and return the response's matches."""
        pc = Pinecone(api_key=api_key)
        index = pc.Index(index_name)
        results = index.query(vector=vector, top_k=top_k, include_metadata=True, namespace=namespace)
        return results.matches

    def generate_hypothetical_document(self, query: str, api_key: str) -> str:
        """Ask the chat model for a passage about *query* and return its text."""
        openai.api_key = api_key
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are an AI that generates hypothetical documents based on queries from the user."},
                {"role": "user", "content": f"write a passage containing information about the following query: {query}"},
            ],
            max_tokens=4096,
            n=1,
            stop=None,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()

    def hyde_technique(self, query: str, api_keys: dict) -> str:
        """Answer *query* using the HyDE technique.

        Args:
            query: The user's question.
            api_keys: Mapping with the keys ``'openai'``, ``'pinecone'`` and
                ``'jina'``, as built by :meth:`run`.

        Reads ``self.input_data`` (set by :meth:`run`) for the index name,
        namespace and ``top_k``.
        """
        hyde_document = self.generate_hypothetical_document(query, api_keys["openai"])
        hyde_embedding = self.get_embeddings(hyde_document, api_keys["jina"])
        results = self.query_pinecone(
            self.input_data.index_name,
            api_keys["pinecone"],
            hyde_embedding,
            self.input_data.namespace,
            int(self.input_data.top_k),
        )
        context = "\n".join(result["metadata"]["text"] for result in results)
        prompt = f"Based on the following information and only the following information, please answer the question: '{query}'\n\nContext:\n{context}\n\nAnswer:"

        openai.api_key = api_keys["openai"]
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system",
                 "content": "You are a helpful assistant that answers questions based on the provided context."},
                {"role": "user",
                 "content": prompt},
            ],
            max_tokens=4096,
            n=1,
            stop=None,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()

    def run(self, input_data: Input) -> BlockOutput:
        """Run the selected technique and yield its outputs.

        Yields:
            On success ``("response", answer)`` and
            ``("technique_used", technique)``. On any exception
            ``("error", message)``, a fallback ``("response", apology)`` and
            ``("technique_used", "none")``.
        """
        # The technique helpers read the index settings from this attribute.
        self.input_data = input_data
        api_keys = {
            "openai": input_data.openai_api_key.get_secret_value(),
            "pinecone": input_data.pinecone_api_key.get_secret_value(),
            "jina": input_data.jina_api_key.get_secret_value(),
        }
        try:
            if input_data.rag_technique == RAGTechnique.HYDE:
                response = self.hyde_technique(input_data.query, api_keys)
            else:
                raise ValueError(f"Unknown RAG technique {input_data.rag_technique}")
            yield "response", response
            yield "technique_used", input_data.rag_technique
        except Exception as e:
            error_message = f"error during query: {str(e)}"
            yield "error", error_message
            # Every output must be a (name, value) pair.
            yield "response", "I'm sorry something went wrong when trying to answer your query"
            yield "technique_used", "none"

View File

@@ -0,0 +1,272 @@
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
from pinecone import Pinecone
import requests
import openai
import uuid
from enum import Enum
import ollama
class RAGTechnique(str, Enum):
    """Identifiers of the retrieval-augmented generation strategies.

    Each member is also a ``str``, so it compares equal to its plain-string
    value.
    """

    # Plain retrieval followed by a single answer prompt.
    BASIC = "basic"
    # Answer prompt that asks for step-by-step reasoning.
    COT = "chain_of_thought"
    # Retrieval driven by an LLM-written hypothetical passage.
    HYDE = "hypothetical_document"
    # Retrieval for the query plus LLM-generated sub-queries.
    MULTI_QUERY = "multi_query"
class RAGPromptingBlock(Block):
    """Answer a query from Pinecone-stored documents using a chosen RAG technique.

    Every technique embeds text with Jina, retrieves the ``text`` metadata of
    the best Pinecone matches, and asks an OpenAI chat model to answer from
    that context. The techniques differ in what is embedded and in how the
    final prompt is phrased; see the individual ``*_technique`` methods.
    """

    class Input(BlockSchema):
        index_name: str = SchemaField(description="Name of the Pinecone index")
        pinecone_api_key: BlockSecret = SecretField(key="pinecone_api_key", description="Pinecone API Key")
        jina_api_key: BlockSecret = SecretField(key="jina_api_key", description="Jina API Key")
        openai_api_key: BlockSecret = SecretField(key="openai_api_key", description="OpenAI API Key")
        query: str = SchemaField(description="Natural language query")
        namespace: str = SchemaField(description="Namespace to query in Pinecone", default="")
        top_k: int = SchemaField(description="Number of top results to retrieve", default=3)
        rag_technique: RAGTechnique = SchemaField(description="RAG technique to use", default=RAGTechnique.BASIC)

    class Output(BlockSchema):
        response: str = SchemaField(description="Natural language response based on retrieved information")
        technique_used: str = SchemaField(description="RAG technique used for this query")
        error: str = SchemaField(description="Error message if query fails", default="")

    def __init__(self):
        super().__init__(
            id="e9aeec7e-6333-44e7-a80e-4846f2a0b60b",
            description="Advanced Pinecone query block with multiple RAG techniques",
            categories={BlockCategory.AI, BlockCategory.LOGIC},
            input_schema=RAGPromptingBlock.Input,
            output_schema=RAGPromptingBlock.Output,
        )

    def get_embedding(self, text: str, api_key: str, timeout: float = 60) -> list:
        """Return the Jina ``jina-embeddings-v2-base-en`` embedding of *text*.

        Args:
            text: The text to embed.
            api_key: Jina API key, sent as a bearer token.
            timeout: Seconds to wait for the HTTP request.

        Raises:
            requests.HTTPError: if the API answers with an error status.
            requests.Timeout: if the API does not answer in time.
        """
        url = "https://api.jina.ai/v1/embeddings"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }
        payload = {
            "input": [text],
            "model": "jina-embeddings-v2-base-en",
        }
        # Without a timeout, requests waits forever on a stalled connection.
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()["data"][0]["embedding"]

    def query_pinecone(self, index_name: str, api_key: str, vector: list, namespace: str, top_k: int) -> list:
        """Query *index_name* with *vector* and return the response's matches."""
        pc = Pinecone(api_key=api_key)
        index = pc.Index(index_name)
        results = index.query(vector=vector, top_k=top_k, include_metadata=True, namespace=namespace)
        return results.matches

    def _chat(self, api_key: str, system_prompt: str, user_prompt: str,
              *, model: str = "gpt-4o", max_tokens: int = 4096) -> str:
        """Send one system + user message pair to OpenAI and return the stripped reply."""
        openai.api_key = api_key
        completion = openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=max_tokens,
            n=1,
            stop=None,
            temperature=0.7,
        )
        # The SDK types ``content`` as optional; treat a missing reply as empty.
        return (completion.choices[0].message.content or "").strip()

    def _retrieve_context(self, text: str, api_keys: dict) -> str:
        """Embed *text*, query Pinecone, and join the matches' ``text`` metadata with newlines."""
        embedding = self.get_embedding(text, api_keys["jina"])
        matches = self.query_pinecone(
            self.input_data.index_name,
            api_keys["pinecone"],
            embedding,
            self.input_data.namespace,
            self.input_data.top_k,
        )
        return "\n".join(match["metadata"]["text"] for match in matches)

    def _answer_from_context(self, query: str, context: str, api_key: str) -> str:
        """Ask the chat model to answer *query* from *context* with the plain answer prompt."""
        prompt = f"Based on the following information, please answer the question: '{query}'\n\nContext:\n{context}\n\nAnswer:"
        return self._chat(
            api_key,
            "You are a helpful assistant that answers questions based on the provided context.",
            prompt,
        )

    def generate_hypothetical_document(self, query: str, api_key: str) -> str:
        """Return an LLM-written passage about *query* (used by HyDE retrieval)."""
        return self._chat(
            api_key,
            "You are an AI that generates hypothetical documents based on queries.",
            f"Write a passage containing information about the following query: {query}",
            model="gpt-3.5-turbo",
            max_tokens=300,
        )

    def generate_sub_queries(self, query: str, api_key: str, num_queries: int = 3) -> list:
        """Ask the LLM for sub-queries similar to *query*.

        The reply is split into lines; list markers such as ``1.``, ``2)`` or
        ``-`` at the start of a line are removed and blank lines are dropped.

        Returns:
            The cleaned sub-queries, in the order the model produced them.
        """
        import re  # local import: only this method needs it

        reply = self._chat(
            api_key,
            "You are an AI that generates similar sub-queries based on an original query.",
            f"Generate {num_queries} similar sub-queries for the following query: {query}",
            model="gpt-3.5-turbo",
            max_tokens=200,
        )
        # Strip only a leading list marker. Splitting on the first ". " would
        # also cut the first sentence off an un-numbered sub-query.
        marker = re.compile(r"^\s*(?:\d+[.)]|[-*\u2022])\s+")
        cleaned = (marker.sub("", line).strip() for line in reply.splitlines())
        # Blank lines in the reply must not become empty sub-queries.
        return [sub_query for sub_query in cleaned if sub_query]

    def basic_technique(self, query: str, api_keys: dict) -> str:
        """Retrieve context for *query* itself and answer from it."""
        context = self._retrieve_context(query, api_keys)
        return self._answer_from_context(query, context, api_keys["openai"])

    def chain_of_thought_technique(self, query: str, api_keys: dict) -> str:
        """Retrieve context for *query* and ask for a step-by-step reasoned answer."""
        context = self._retrieve_context(query, api_keys)
        cot_prompt = (
            f"To answer the question: '{query}', let's approach this step-by-step using the following information:\n"
            "Context:\n"
            f"{context}\n"
            "Please follow these steps:\n"
            "1. Identify the key elements of the question.\n"
            "2. Analyze the relevant information from the context.\n"
            "3. Form a logical chain of reasoning.\n"
            "4. Arrive at a conclusion.\n"
            "Provide your thought process for each step, then give the final answer.\n"
            "Step-by-step reasoning:"
        )
        return self._chat(
            api_keys["openai"],
            "You are a helpful assistant that uses chain-of-thought reasoning to answer questions based on provided context.",
            cot_prompt,
        )

    def hyde_technique(self, query: str, api_keys: dict) -> str:
        """Retrieve context using an LLM-written hypothetical passage, then answer *query*."""
        hyde_doc = self.generate_hypothetical_document(query, api_keys["openai"])
        context = self._retrieve_context(hyde_doc, api_keys)
        return self._answer_from_context(query, context, api_keys["openai"])

    def multi_query_technique(self, query: str, api_keys: dict) -> str:
        """Retrieve context for *query* and generated sub-queries, then answer from all of it."""
        sub_queries = self.generate_sub_queries(query, api_keys["openai"])

        # One "Query/Context" section per query, original query first.
        sections = [
            f"Query: {q}\nContext: {self._retrieve_context(q, api_keys)}\n"
            for q in [query] + sub_queries
        ]
        combined_context = "\n".join(sections)

        prompt = (
            f"Based on the following information from multiple related queries, please provide a comprehensive answer to the original question: '{query}'\n"
            "Context from multiple queries:\n"
            f"{combined_context}\n"
            "Comprehensive Answer:"
        )
        return self._chat(
            api_keys["openai"],
            "You are a helpful assistant that provides comprehensive answers based on information from multiple related queries.",
            prompt,
        )

    def run(self, input_data: Input) -> BlockOutput:
        """Run the selected technique and yield its outputs.

        Yields:
            On success ``("response", answer)`` and
            ``("technique_used", technique value)``. On any exception
            ``("error", message)``, a fallback ``("response", apology)`` and
            ``("technique_used", "none")``.
        """
        # The technique helpers read the index settings from this attribute.
        self.input_data = input_data
        api_keys = {
            "openai": input_data.openai_api_key.get_secret_value(),
            "pinecone": input_data.pinecone_api_key.get_secret_value(),
            "jina": input_data.jina_api_key.get_secret_value(),
        }
        try:
            # Compared with == rather than looked up in a dict: enum members
            # equal their string value but do not hash like it.
            if input_data.rag_technique == RAGTechnique.BASIC:
                response = self.basic_technique(input_data.query, api_keys)
            elif input_data.rag_technique == RAGTechnique.HYDE:
                response = self.hyde_technique(input_data.query, api_keys)
            elif input_data.rag_technique == RAGTechnique.MULTI_QUERY:
                response = self.multi_query_technique(input_data.query, api_keys)
            elif input_data.rag_technique == RAGTechnique.COT:
                response = self.chain_of_thought_technique(input_data.query, api_keys)
            else:
                raise ValueError(f"Unknown RAG technique: {input_data.rag_technique}")
            yield "response", response
            yield "technique_used", input_data.rag_technique.value
        except Exception as e:
            error_message = f"Error during query process: {str(e)}"
            yield "error", error_message
            yield "response", "I'm sorry, but I encountered an error while processing your query."
            yield "technique_used", "none"