Compare commits

...

2 Commits

Author SHA1 Message Date
Aarushi
8a1145426a add rag blocks 2024-10-07 17:30:07 +01:00
Aarushi
e02ec32ad3 add rag blocks 2024-10-07 16:58:19 +01:00
5 changed files with 508 additions and 0 deletions

View File

@@ -0,0 +1,63 @@
import requests
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
class JinaChunkingBlock(Block):
class Input(BlockSchema):
texts: list = SchemaField(description="List of texts to chunk")
api_key: BlockSecret = SecretField(
key="jina_api_key", description="Jina API Key"
)
max_chunk_length: int = SchemaField(
description="Maximum length of each chunk", default=1000
)
return_tokens: bool = SchemaField(
description="Whether to return token information", default=False
)
class Output(BlockSchema):
chunks: list = SchemaField(description="List of chunked texts")
tokens: list = SchemaField(
description="List of token information for each chunk", optional=True
)
def __init__(self):
super().__init__(
id="806fb15e-830f-4796-8692-557d300ff43c",
description="Chunks texts using Jina AI's segmentation service",
categories={BlockCategory.AI, BlockCategory.TEXT},
input_schema=JinaChunkingBlock.Input,
output_schema=JinaChunkingBlock.Output,
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
url = "https://segment.jina.ai/"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {input_data.api_key.get_secret_value()}",
}
all_chunks = []
all_tokens = []
for text in input_data.texts:
data = {
"content": text,
"return_tokens": str(input_data.return_tokens).lower(),
"return_chunks": "true",
"max_chunk_length": str(input_data.max_chunk_length),
}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
result = response.json()
all_chunks.extend(result.get("chunks", []))
if input_data.return_tokens:
all_tokens.extend(result.get("tokens", []))
yield "chunks", all_chunks
if input_data.return_tokens:
yield "tokens", all_tokens

View File

@@ -0,0 +1,39 @@
import requests
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
class JinaEmbeddingBlock(Block):
class Input(BlockSchema):
texts: list = SchemaField(description="List of texts to embed")
api_key: BlockSecret = SecretField(
key="jina_api_key", description="Jina API Key"
)
model: str = SchemaField(
description="Jina embedding model to use",
default="jina-embeddings-v2-base-en",
)
class Output(BlockSchema):
embeddings: list = SchemaField(description="List of embeddings")
def __init__(self):
super().__init__(
id="7c56b3ab-62e7-43a2-a2dc-4ec4245660b6",
description="Generates embeddings using Jina AI",
categories={BlockCategory.AI},
input_schema=JinaEmbeddingBlock.Input,
output_schema=JinaEmbeddingBlock.Output,
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
url = "https://api.jina.ai/v1/embeddings"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {input_data.api_key.get_secret_value()}",
}
data = {"input": input_data.texts, "model": input_data.model}
response = requests.post(url, headers=headers, json=data)
embeddings = [e["embedding"] for e in response.json()["data"]]
yield "embeddings", embeddings

View File

@@ -0,0 +1,60 @@
from pinecone import Pinecone, ServerlessSpec
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
class PineconeInitBlock(Block):
class Input(BlockSchema):
api_key: BlockSecret = SecretField(
key="pinecone_api_key", description="Pinecone API Key"
)
index_name: str = SchemaField(description="Name of the Pinecone index")
dimension: int = SchemaField(
description="Dimension of the vectors", default=768
)
metric: str = SchemaField(
description="Distance metric for the index", default="cosine"
)
cloud: str = SchemaField(
description="Cloud provider for serverless", default="aws"
)
region: str = SchemaField(
description="Region for serverless", default="us-east-1"
)
class Output(BlockSchema):
index: str = SchemaField(description="Name of the initialized Pinecone index")
message: str = SchemaField(description="Status message")
def __init__(self):
super().__init__(
id="48d8fdab-8f03-41f3-8407-8107ba11ec9b",
description="Initializes a Pinecone index",
categories={BlockCategory.LOGIC},
input_schema=PineconeInitBlock.Input,
output_schema=PineconeInitBlock.Output,
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
pc = Pinecone(api_key=input_data.api_key.get_secret_value())
try:
if input_data.index_name not in pc.list_indexes():
pc.create_index(
name=input_data.index_name,
dimension=input_data.dimension,
metric=input_data.metric,
spec=ServerlessSpec(
cloud=input_data.cloud, region=input_data.region
),
)
message = f"Created new index: {input_data.index_name}"
else:
message = f"Using existing index: {input_data.index_name}"
# Instead of yielding the index object, we yield the index name
yield "index", input_data.index_name
yield "message", message
except Exception as e:
yield "message", f"Error initializing Pinecone index: {str(e)}"

View File

@@ -0,0 +1,43 @@
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
class PineconeQueryBlock(Block):
class Input(BlockSchema):
index: object = SchemaField(description="Initialized Pinecone index")
query_vector: list = SchemaField(description="Query vector")
namespace: str = SchemaField(
description="Namespace to query in Pinecone", default=""
)
top_k: int = SchemaField(
description="Number of top results to return", default=3
)
include_values: bool = SchemaField(
description="Whether to include vector values in the response",
default=False,
)
include_metadata: bool = SchemaField(
description="Whether to include metadata in the response", default=True
)
class Output(BlockSchema):
results: dict = SchemaField(description="Query results from Pinecone")
def __init__(self):
super().__init__(
id="9ad93d0f-91b4-4c9c-8eb1-82e26b4a01c5",
description="Queries a Pinecone index",
categories={BlockCategory.LOGIC},
input_schema=PineconeQueryBlock.Input,
output_schema=PineconeQueryBlock.Output,
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
results = input_data.index.query(
namespace=input_data.namespace,
vector=input_data.query_vector,
top_k=input_data.top_k,
include_values=input_data.include_values,
include_metadata=input_data.include_metadata,
)
yield "results", results

View File

@@ -0,0 +1,303 @@
from enum import Enum
import openai
import requests
from pinecone import Pinecone
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
class RAGTechnique(str, Enum):
BASIC = "basic"
COT = "chain_of_thought"
HYDE = "hypothetical_document"
MULTI_QUERY = "multi_query"
class RAGPromptingBlock(Block):
class Input(BlockSchema):
index_name: str = SchemaField(description="Name of the Pinecone index")
pinecone_api_key: BlockSecret = SecretField(
key="pinecone_api_key", description="Pinecone API Key"
)
jina_api_key: BlockSecret = SecretField(
key="jina_api_key", description="Jina API Key"
)
openai_api_key: BlockSecret = SecretField(
key="openai_api_key", description="OpenAI API Key"
)
query: str = SchemaField(description="Natural language query")
namespace: str = SchemaField(
description="Namespace to query in Pinecone", default=""
)
top_k: int = SchemaField(
description="Number of top results to retrieve", default=3
)
rag_technique: RAGTechnique = SchemaField(
description="RAG technique to use", default=RAGTechnique.BASIC
)
class Output(BlockSchema):
response: str = SchemaField(
description="Natural language response based on retrieved information"
)
technique_used: str = SchemaField(
description="RAG technique used for this query"
)
error: str = SchemaField(description="Error message if query fails", default="")
def __init__(self):
super().__init__(
id="e9aeec7e-6333-44e7-a80e-4846f2a0b60b",
description="Advanced Pinecone query block with multiple RAG techniques",
categories={BlockCategory.AI, BlockCategory.LOGIC},
input_schema=RAGPromptingBlock.Input,
output_schema=RAGPromptingBlock.Output,
)
def get_embedding(self, text: str, api_key: str) -> list:
url = "https://api.jina.ai/v1/embeddings"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
data = {"input": [text], "model": "jina-embeddings-v2-base-en"}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()["data"][0]["embedding"]
def query_pinecone(
self, index_name: str, api_key: str, vector: list, namespace: str, top_k: int
) -> list:
pc = Pinecone(api_key=api_key)
index = pc.Index(index_name)
results = index.query(
vector=vector, top_k=top_k, include_metadata=True, namespace=namespace
)
return results.matches
def generate_hypothetical_document(self, query: str, api_key: str) -> str:
openai.api_key = api_key
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are an AI that generates hypothetical documents based on queries.",
},
{
"role": "user",
"content": f"Write a passage containing information about the following query: {query}",
},
],
max_tokens=300,
n=1,
stop=None,
temperature=0.7,
)
return response.choices[0].message.content.strip()
def generate_sub_queries(
self, query: str, api_key: str, num_queries: int = 3
) -> list:
openai.api_key = api_key
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are an AI that generates similar sub-queries based on an original query.",
},
{
"role": "user",
"content": f"Generate {num_queries} similar sub-queries for the following query: {query}",
},
],
max_tokens=200,
n=1,
stop=None,
temperature=0.7,
)
sub_queries = response.choices[0].message.content.strip().split("\n")
return [
sq.split(". ", 1)[-1] for sq in sub_queries
] # Remove numbering if present
def basic_technique(self, query: str, api_keys: dict) -> str:
query_embedding = self.get_embedding(query, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys["pinecone"],
query_embedding,
self.input_data.namespace,
self.input_data.top_k,
)
context = "\n".join([result["metadata"]["text"] for result in results])
prompt = f"Based on the following information, please answer the question: '{query}'\n\nContext:\n{context}\n\nAnswer:"
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are a helpful assistant that answers questions based on the provided context.",
},
{"role": "user", "content": prompt},
],
max_tokens=4096,
n=1,
stop=None,
temperature=0.7,
)
return response.choices[0].message.content.strip()
def chain_of_thought_technique(self, query: str, api_keys: dict) -> str:
# Retrieve relevant information
query_embedding = self.get_embedding(query, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys["pinecone"],
query_embedding,
self.input_data.namespace,
self.input_data.top_k,
)
context = "\n".join([result["metadata"]["text"] for result in results])
# Construct the CoT prompt
cot_prompt = f"""To answer the question: '{query}', let's approach this step-by-step using the following information:
Context:
{context}
Please follow these steps:
1. Identify the key elements of the question.
2. Analyze the relevant information from the context.
3. Form a logical chain of reasoning.
4. Arrive at a conclusion.
Provide your thought process for each step, then give the final answer.
Step-by-step reasoning:"""
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are a helpful assistant that uses chain-of-thought reasoning to answer questions based on provided context.",
},
{"role": "user", "content": cot_prompt},
],
max_tokens=4096,
n=1,
stop=None,
temperature=0.7,
)
return response.choices[0].message.content.strip()
def hyde_technique(self, query: str, api_keys: dict) -> str:
hyde_doc = self.generate_hypothetical_document(query, api_keys["openai"])
hyde_embedding = self.get_embedding(hyde_doc, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys["pinecone"],
hyde_embedding,
self.input_data.namespace,
self.input_data.top_k,
)
context = "\n".join([result["metadata"]["text"] for result in results])
prompt = f"Based on the following information, please answer the question: '{query}'\n\nContext:\n{context}\n\nAnswer:"
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are a helpful assistant that answers questions based on the provided context.",
},
{"role": "user", "content": prompt},
],
max_tokens=4096,
n=1,
stop=None,
temperature=0.7,
)
return response.choices[0].message.content.strip()
def multi_query_technique(self, query: str, api_keys: dict) -> str:
# Generate sub-queries
sub_queries = self.generate_sub_queries(query, api_keys["openai"])
# Retrieve information for each sub-query and the original query
all_contexts = []
for q in [query] + sub_queries:
embedding = self.get_embedding(q, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys["pinecone"],
embedding,
self.input_data.namespace,
self.input_data.top_k,
)
context = "\n".join([result["metadata"]["text"] for result in results])
all_contexts.append(f"Query: {q}\nContext: {context}\n")
# Combine all contexts
combined_context = "\n".join(all_contexts)
# Generate final answer using all retrieved information
prompt = f"""Based on the following information from multiple related queries, please provide a comprehensive answer to the original question: '{query}'
Context from multiple queries:
{combined_context}
Comprehensive Answer:"""
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are a helpful assistant that provides comprehensive answers based on information from multiple related queries.",
},
{"role": "user", "content": prompt},
],
max_tokens=4096,
n=1,
stop=None,
temperature=0.7,
)
return response.choices[0].message.content.strip()
def run(self, input_data: Input, **kwargs) -> BlockOutput:
self.input_data = input_data
api_keys = {
"openai": input_data.openai_api_key.get_secret_value(),
"pinecone": input_data.pinecone_api_key.get_secret_value(),
"jina": input_data.jina_api_key.get_secret_value(),
}
try:
if input_data.rag_technique == RAGTechnique.BASIC:
response = self.basic_technique(input_data.query, api_keys)
elif input_data.rag_technique == RAGTechnique.HYDE:
response = self.hyde_technique(input_data.query, api_keys)
elif input_data.rag_technique == RAGTechnique.MULTI_QUERY:
response = self.multi_query_technique(input_data.query, api_keys)
elif input_data.rag_technique == RAGTechnique.COT:
response = self.chain_of_thought_technique(input_data.query, api_keys)
else:
raise ValueError(f"Unknown RAG technique: {input_data.rag_technique}")
yield "response", response
yield "technique_used", input_data.rag_technique.value
except Exception as e:
error_message = f"Error during query process: {str(e)}"
yield "error", error_message
yield "response", "I'm sorry, but I encountered an error while processing your query."
yield "technique_used", "none"