Add RAG blocks

This commit is contained in:
Aarushi
2024-10-07 17:30:07 +01:00
parent e02ec32ad3
commit 8a1145426a
6 changed files with 194 additions and 263 deletions

View File

@@ -1,18 +1,27 @@
import requests
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
import uuid
from backend.data.model import BlockSecret, SchemaField, SecretField
class JinaChunkingBlock(Block):
class Input(BlockSchema):
texts: list = SchemaField(description="List of texts to chunk")
api_key: BlockSecret = SecretField(key="jina_api_key", description="Jina API Key")
max_chunk_length: int = SchemaField(description="Maximum length of each chunk", default=1000)
return_tokens: bool = SchemaField(description="Whether to return token information", default=False)
api_key: BlockSecret = SecretField(
key="jina_api_key", description="Jina API Key"
)
max_chunk_length: int = SchemaField(
description="Maximum length of each chunk", default=1000
)
return_tokens: bool = SchemaField(
description="Whether to return token information", default=False
)
class Output(BlockSchema):
chunks: list = SchemaField(description="List of chunked texts")
tokens: list = SchemaField(description="List of token information for each chunk", optional=True)
tokens: list = SchemaField(
description="List of token information for each chunk", optional=True
)
def __init__(self):
super().__init__(
@@ -23,11 +32,11 @@ class JinaChunkingBlock(Block):
output_schema=JinaChunkingBlock.Output,
)
def run(self, input_data: Input) -> BlockOutput:
url = 'https://segment.jina.ai/'
def run(self, input_data: Input, **kwargs) -> BlockOutput:
url = "https://segment.jina.ai/"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {input_data.api_key.get_secret_value()}'
"Content-Type": "application/json",
"Authorization": f"Bearer {input_data.api_key.get_secret_value()}",
}
all_chunks = []
@@ -35,20 +44,20 @@ class JinaChunkingBlock(Block):
for text in input_data.texts:
data = {
'content': text,
'return_tokens': str(input_data.return_tokens).lower(),
'return_chunks': 'true',
'max_chunk_length': str(input_data.max_chunk_length)
"content": text,
"return_tokens": str(input_data.return_tokens).lower(),
"return_chunks": "true",
"max_chunk_length": str(input_data.max_chunk_length),
}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
result = response.json()
all_chunks.extend(result.get('chunks', []))
all_chunks.extend(result.get("chunks", []))
if input_data.return_tokens:
all_tokens.extend(result.get('tokens', []))
all_tokens.extend(result.get("tokens", []))
yield "chunks", all_chunks
if input_data.return_tokens:
yield "tokens", all_tokens
yield "tokens", all_tokens

View File

@@ -1,13 +1,19 @@
import requests
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
import uuid
from backend.data.model import BlockSecret, SchemaField, SecretField
class JinaEmbeddingBlock(Block):
class Input(BlockSchema):
texts: list = SchemaField(description="List of texts to embed")
api_key: BlockSecret = SecretField(key="jina_api_key", description="Jina API Key")
model: str = SchemaField(description="Jina embedding model to use", default="jina-embeddings-v2-base-en")
api_key: BlockSecret = SecretField(
key="jina_api_key", description="Jina API Key"
)
model: str = SchemaField(
description="Jina embedding model to use",
default="jina-embeddings-v2-base-en",
)
class Output(BlockSchema):
embeddings: list = SchemaField(description="List of embeddings")
@@ -21,16 +27,13 @@ class JinaEmbeddingBlock(Block):
output_schema=JinaEmbeddingBlock.Output,
)
def run(self, input_data: Input) -> BlockOutput:
url = 'https://api.jina.ai/v1/embeddings'
def run(self, input_data: Input, **kwargs) -> BlockOutput:
url = "https://api.jina.ai/v1/embeddings"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {input_data.api_key.get_secret_value()}'
}
data = {
'input': input_data.texts,
'model': input_data.model
"Content-Type": "application/json",
"Authorization": f"Bearer {input_data.api_key.get_secret_value()}",
}
data = {"input": input_data.texts, "model": input_data.model}
response = requests.post(url, headers=headers, json=data)
embeddings = [e["embedding"] for e in response.json()["data"]]
yield "embeddings", embeddings
yield "embeddings", embeddings

View File

@@ -1,16 +1,27 @@
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
from pinecone import Pinecone, ServerlessSpec
import uuid
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
class PineconeInitBlock(Block):
class Input(BlockSchema):
api_key: BlockSecret = SecretField(key="pinecone_api_key", description="Pinecone API Key")
api_key: BlockSecret = SecretField(
key="pinecone_api_key", description="Pinecone API Key"
)
index_name: str = SchemaField(description="Name of the Pinecone index")
dimension: int = SchemaField(description="Dimension of the vectors", default=768)
metric: str = SchemaField(description="Distance metric for the index", default="cosine")
cloud: str = SchemaField(description="Cloud provider for serverless", default="aws")
region: str = SchemaField(description="Region for serverless", default="us-east-1")
dimension: int = SchemaField(
description="Dimension of the vectors", default=768
)
metric: str = SchemaField(
description="Distance metric for the index", default="cosine"
)
cloud: str = SchemaField(
description="Cloud provider for serverless", default="aws"
)
region: str = SchemaField(
description="Region for serverless", default="us-east-1"
)
class Output(BlockSchema):
index: str = SchemaField(description="Name of the initialized Pinecone index")
@@ -25,7 +36,7 @@ class PineconeInitBlock(Block):
output_schema=PineconeInitBlock.Output,
)
def run(self, input_data: Input) -> BlockOutput:
def run(self, input_data: Input, **kwargs) -> BlockOutput:
pc = Pinecone(api_key=input_data.api_key.get_secret_value())
try:
@@ -35,9 +46,8 @@ class PineconeInitBlock(Block):
dimension=input_data.dimension,
metric=input_data.metric,
spec=ServerlessSpec(
cloud=input_data.cloud,
region=input_data.region
)
cloud=input_data.cloud, region=input_data.region
),
)
message = f"Created new index: {input_data.index_name}"
else:
@@ -47,4 +57,4 @@ class PineconeInitBlock(Block):
yield "index", input_data.index_name
yield "message", message
except Exception as e:
yield "message", f"Error initializing Pinecone index: {str(e)}"
yield "message", f"Error initializing Pinecone index: {str(e)}"

View File

@@ -1,15 +1,24 @@
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
import uuid
class PineconeQueryBlock(Block):
class Input(BlockSchema):
index: object = SchemaField(description="Initialized Pinecone index")
query_vector: list = SchemaField(description="Query vector")
namespace: str = SchemaField(description="Namespace to query in Pinecone", default="")
top_k: int = SchemaField(description="Number of top results to return", default=3)
include_values: bool = SchemaField(description="Whether to include vector values in the response", default=False)
include_metadata: bool = SchemaField(description="Whether to include metadata in the response", default=True)
namespace: str = SchemaField(
description="Namespace to query in Pinecone", default=""
)
top_k: int = SchemaField(
description="Number of top results to return", default=3
)
include_values: bool = SchemaField(
description="Whether to include vector values in the response",
default=False,
)
include_metadata: bool = SchemaField(
description="Whether to include metadata in the response", default=True
)
class Output(BlockSchema):
results: dict = SchemaField(description="Query results from Pinecone")
@@ -23,12 +32,12 @@ class PineconeQueryBlock(Block):
output_schema=PineconeQueryBlock.Output,
)
def run(self, input_data: Input) -> BlockOutput:
def run(self, input_data: Input, **kwargs) -> BlockOutput:
results = input_data.index.query(
namespace=input_data.namespace,
vector=input_data.query_vector,
top_k=input_data.top_k,
include_values=input_data.include_values,
include_metadata=input_data.include_metadata
include_metadata=input_data.include_metadata,
)
yield "results", results
yield "results", results

View File

@@ -1,131 +0,0 @@
import logging
from enum import Enum
from json import JSONDecodeError
from typing import Any, List, NamedTuple
import anthropic
import ollama
import openai
import requests
from groq import Groq
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
from backend.util import json
from pinecone import Pinecone
logger = logging.getLogger(__name__)
class RAGTechnique(str, Enum):
    """Supported retrieval-augmented generation (RAG) strategies."""

    # Fixed: a trailing comma made BASIC a tuple ("basic",) instead of a str,
    # breaking equality checks and Enum value lookup.
    BASIC = "basic"
    COT = "chain_of_thought"
    HYDE = "hypothetical_document"
    MULTI_QUERY = "multi_query"
class RagPipelineBlock(Block):
    """Answer a natural-language query with a HyDE-style RAG pipeline.

    Embeddings come from the Jina API, retrieval from a Pinecone index, and
    answer generation from OpenAI chat completions.
    """

    class Input(BlockSchema):
        index_name: str = SchemaField(description="Name of the Pinecone Index")
        pinecone_api_key: BlockSecret = SecretField(
            key="pinecone_api_key", description="Pinecone API Key"
        )
        jina_api_key: BlockSecret = SecretField(
            key="jina_api_key", description="Jina API Key"
        )
        openai_api_key: BlockSecret = SecretField(
            key="openai_api_key", description="OpenAI API Key"
        )
        query: str = SchemaField(description="Natural language query about a topic")
        namespace: str = SchemaField(description="Namespace of the topic")
        # NOTE(review): kept as str for backward compatibility with existing
        # callers; converted with int() before the Pinecone query.
        top_k: str = SchemaField(description="Number of top results to return")
        rag_technique: str = SchemaField(
            description="RAG technique to use", default=RAGTechnique.BASIC
        )

    class Output(BlockSchema):
        # Fixed: fields previously had bare annotations (no SchemaField), and
        # "technique_used" was yielded by run() but missing from the schema.
        response: str = SchemaField(description="Generated answer to the query")
        technique_used: str = SchemaField(
            description="RAG technique used for this query"
        )
        error: str = SchemaField(description="Error message if the query fails")

    def __init__(self):
        super().__init__(
            id="0cfcc32b-4526-4729-adb1-2a4628d66feb",
            description="Block to query data from pinecone",
            categories={BlockCategory.AI, BlockCategory.LOGIC},
            input_schema=RagPipelineBlock.Input,
            output_schema=RagPipelineBlock.Output,
        )

    def get_embeddings(self, text: str, api_key: str) -> list:
        """Return the Jina embedding vector for *text*."""
        url = "https://api.jina.ai/v1/embeddings"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }
        data = {"input": [text], "model": "jina-embeddings-v3"}
        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()
        # Fixed: the original posted the request but never returned the result.
        return response.json()["data"][0]["embedding"]

    def query_pinecone(
        self, index_name: str, api_key: str, vector: list, namespace: str, top_k: int
    ) -> list:
        """Query the Pinecone index and return the list of matches."""
        pc = Pinecone(api_key=api_key)
        index = pc.Index(index_name)
        results = index.query(
            vector=vector, top_k=top_k, include_metadata=True, namespace=namespace
        )
        return results.matches

    def generate_hypothetical_document(self, query: str, api_key: str) -> str:
        """Generate a hypothetical answer document for HyDE retrieval."""
        openai.api_key = api_key
        response = openai.chat.completions.create(
            # Fixed: "gpt4o" is not a valid OpenAI model name.
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "You are an AI that generates hypothetical documents based on queries from the user.",
                },
                {
                    "role": "user",
                    "content": f"write a passage containing information about the following query: {query}",
                },
            ],
            max_tokens=4096,
            n=1,
            stop=None,
            temperature=0.7,
        )
        # Fixed: the original forgot to return the generated document.
        return response.choices[0].message.content.strip()

    def hyde_technique(self, query: str, api_keys: dict) -> str:
        """Answer *query* via HyDE: embed a hypothetical answer, retrieve, respond."""
        # Fixed: lookups used keys ('openai_api_key', 'pinecone_api_key') that
        # run() never creates; run() builds 'openai' / 'pinecone' / 'jina'.
        hyde_document = self.generate_hypothetical_document(
            query, api_keys["openai"]
        )
        hyde_embedding = self.get_embeddings(hyde_document, api_keys["jina"])
        results = self.query_pinecone(
            self.input_data.index_name,
            api_keys["pinecone"],
            hyde_embedding,
            self.input_data.namespace,
            int(self.input_data.top_k),
        )
        # Fixed: the original indexed the whole `results` list instead of each
        # individual match.
        context = "\n".join(result["metadata"]["text"] for result in results)
        prompt = f"Based on the following information and only the following information, please answer the question: '{query}'\n\nContext:\n{context}\n\nAnswer:"
        openai.api_key = api_keys["openai"]
        response = openai.chat.completions.create(
            # Fixed: "gpt4o" is not a valid OpenAI model name.
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "You are a helpful assistant that answers questions based on the provided context.",
                },
                {"role": "user", "content": prompt},
            ],
            max_tokens=4096,
            n=1,
            stop=None,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        # NOTE(review): only the HYDE technique is implemented; any other
        # value (including the default BASIC) raises and is reported via
        # the error output, matching the original control flow.
        self.input_data = input_data
        api_keys = {
            "openai": input_data.openai_api_key.get_secret_value(),
            "pinecone": input_data.pinecone_api_key.get_secret_value(),
            "jina": input_data.jina_api_key.get_secret_value(),
        }
        try:
            if input_data.rag_technique == RAGTechnique.HYDE:
                response = self.hyde_technique(input_data.query, api_keys)
            else:
                raise ValueError(f"Unknown RAG technique {input_data.rag_technique}")
            yield "response", response
            yield "technique_used", input_data.rag_technique
        except Exception as e:
            error_message = f"error during query: {str(e)}"
            yield "error", error_message
            # Fixed: the original yielded a bare string instead of a
            # ("response", value) pair, breaking the BlockOutput contract.
            yield "response", "I'm sorry something went wrong when trying to answer your query"
            yield "technique_used", "none"

View File

@@ -1,11 +1,11 @@
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField, BlockSecret, SecretField
from pinecone import Pinecone
import requests
import openai
import uuid
from enum import Enum
import ollama
import openai
import requests
from pinecone import Pinecone
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import BlockSecret, SchemaField, SecretField
class RAGTechnique(str, Enum):
@@ -18,17 +18,33 @@ class RAGTechnique(str, Enum):
class RAGPromptingBlock(Block):
class Input(BlockSchema):
index_name: str = SchemaField(description="Name of the Pinecone index")
pinecone_api_key: BlockSecret = SecretField(key="pinecone_api_key", description="Pinecone API Key")
jina_api_key: BlockSecret = SecretField(key="jina_api_key", description="Jina API Key")
openai_api_key: BlockSecret = SecretField(key="openai_api_key", description="OpenAI API Key")
pinecone_api_key: BlockSecret = SecretField(
key="pinecone_api_key", description="Pinecone API Key"
)
jina_api_key: BlockSecret = SecretField(
key="jina_api_key", description="Jina API Key"
)
openai_api_key: BlockSecret = SecretField(
key="openai_api_key", description="OpenAI API Key"
)
query: str = SchemaField(description="Natural language query")
namespace: str = SchemaField(description="Namespace to query in Pinecone", default="")
top_k: int = SchemaField(description="Number of top results to retrieve", default=3)
rag_technique: RAGTechnique = SchemaField(description="RAG technique to use", default=RAGTechnique.BASIC)
namespace: str = SchemaField(
description="Namespace to query in Pinecone", default=""
)
top_k: int = SchemaField(
description="Number of top results to retrieve", default=3
)
rag_technique: RAGTechnique = SchemaField(
description="RAG technique to use", default=RAGTechnique.BASIC
)
class Output(BlockSchema):
response: str = SchemaField(description="Natural language response based on retrieved information")
technique_used: str = SchemaField(description="RAG technique used for this query")
response: str = SchemaField(
description="Natural language response based on retrieved information"
)
technique_used: str = SchemaField(
description="RAG technique used for this query"
)
error: str = SchemaField(description="Error message if query fails", default="")
def __init__(self):
@@ -41,33 +57,39 @@ class RAGPromptingBlock(Block):
)
def get_embedding(self, text: str, api_key: str) -> list:
url = 'https://api.jina.ai/v1/embeddings'
url = "https://api.jina.ai/v1/embeddings"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
data = {
'input': [text],
'model': 'jina-embeddings-v2-base-en'
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
data = {"input": [text], "model": "jina-embeddings-v2-base-en"}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()["data"][0]["embedding"]
def query_pinecone(self, index_name: str, api_key: str, vector: list, namespace: str, top_k: int) -> list:
def query_pinecone(
self, index_name: str, api_key: str, vector: list, namespace: str, top_k: int
) -> list:
pc = Pinecone(api_key=api_key)
index = pc.Index(index_name)
results = index.query(vector=vector, top_k=top_k, include_metadata=True, namespace=namespace)
results = index.query(
vector=vector, top_k=top_k, include_metadata=True, namespace=namespace
)
return results.matches
def generate_hypothetical_document(self, query: str, api_key: str) -> str:
openai.api_key = api_key
response = openai.chat.completions.create(
model="gpt-3.5-turbo",
model="gpt-4o",
messages=[
{"role": "system", "content": "You are an AI that generates hypothetical documents based on queries."},
{"role": "user",
"content": f"Write a passage containing information about the following query: {query}"}
{
"role": "system",
"content": "You are an AI that generates hypothetical documents based on queries.",
},
{
"role": "user",
"content": f"Write a passage containing information about the following query: {query}",
},
],
max_tokens=300,
n=1,
@@ -76,15 +98,21 @@ class RAGPromptingBlock(Block):
)
return response.choices[0].message.content.strip()
def generate_sub_queries(self, query: str, api_key: str, num_queries: int = 3) -> list:
def generate_sub_queries(
self, query: str, api_key: str, num_queries: int = 3
) -> list:
openai.api_key = api_key
response = openai.chat.completions.create(
model="gpt-3.5-turbo",
model="gpt-4o",
messages=[
{"role": "system",
"content": "You are an AI that generates similar sub-queries based on an original query."},
{"role": "user",
"content": f"Generate {num_queries} similar sub-queries for the following query: {query}"}
{
"role": "system",
"content": "You are an AI that generates similar sub-queries based on an original query.",
},
{
"role": "user",
"content": f"Generate {num_queries} similar sub-queries for the following query: {query}",
},
],
max_tokens=200,
n=1,
@@ -92,54 +120,50 @@ class RAGPromptingBlock(Block):
temperature=0.7,
)
sub_queries = response.choices[0].message.content.strip().split("\n")
return [sq.split(". ", 1)[-1] for sq in sub_queries] # Remove numbering if present
return [
sq.split(". ", 1)[-1] for sq in sub_queries
] # Remove numbering if present
def basic_technique(self, query: str, api_keys: dict) -> str:
query_embedding = self.get_embedding(query, api_keys['jina'])
query_embedding = self.get_embedding(query, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys['pinecone'],
api_keys["pinecone"],
query_embedding,
self.input_data.namespace,
self.input_data.top_k
self.input_data.top_k,
)
context = "\n".join([result['metadata']['text'] for result in results])
context = "\n".join([result["metadata"]["text"] for result in results])
prompt = f"Based on the following information, please answer the question: '{query}'\n\nContext:\n{context}\n\nAnswer:"
openai.api_key = api_keys['openai']
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system",
"content": "You are a helpful assistant that answers questions based on the provided context."},
{"role": "user", "content": prompt}
{
"role": "system",
"content": "You are a helpful assistant that answers questions based on the provided context.",
},
{"role": "user", "content": prompt},
],
max_tokens=4096,
n=1,
stop=None,
temperature=0.7,
)
# response = ollama.chat(
# model="qwen2.5",
# messages=[
# {"role": "system",
# "content": "You are a helpful assistant that answers questions based on the provided context."},
# {"role": "user", "content": prompt}
# ],
# )
return response.choices[0].message.content.strip()
def chain_of_thought_technique(self, query: str, api_keys: dict) -> str:
# Retrieve relevant information
query_embedding = self.get_embedding(query, api_keys['jina'])
query_embedding = self.get_embedding(query, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys['pinecone'],
api_keys["pinecone"],
query_embedding,
self.input_data.namespace,
self.input_data.top_k
self.input_data.top_k,
)
context = "\n".join([result['metadata']['text'] for result in results])
context = "\n".join([result["metadata"]["text"] for result in results])
# Construct the CoT prompt
cot_prompt = f"""To answer the question: '{query}', let's approach this step-by-step using the following information:
@@ -157,12 +181,15 @@ Provide your thought process for each step, then give the final answer.
Step-by-step reasoning:"""
openai.api_key = api_keys['openai']
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant that uses chain-of-thought reasoning to answer questions based on provided context."},
{"role": "user", "content": cot_prompt}
{
"role": "system",
"content": "You are a helpful assistant that uses chain-of-thought reasoning to answer questions based on provided context.",
},
{"role": "user", "content": cot_prompt},
],
max_tokens=4096,
n=1,
@@ -172,25 +199,27 @@ Step-by-step reasoning:"""
return response.choices[0].message.content.strip()
def hyde_technique(self, query: str, api_keys: dict) -> str:
hyde_doc = self.generate_hypothetical_document(query, api_keys['openai'])
hyde_embedding = self.get_embedding(hyde_doc, api_keys['jina'])
hyde_doc = self.generate_hypothetical_document(query, api_keys["openai"])
hyde_embedding = self.get_embedding(hyde_doc, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys['pinecone'],
api_keys["pinecone"],
hyde_embedding,
self.input_data.namespace,
self.input_data.top_k
self.input_data.top_k,
)
context = "\n".join([result['metadata']['text'] for result in results])
context = "\n".join([result["metadata"]["text"] for result in results])
prompt = f"Based on the following information, please answer the question: '{query}'\n\nContext:\n{context}\n\nAnswer:"
openai.api_key = api_keys['openai']
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system",
"content": "You are a helpful assistant that answers questions based on the provided context."},
{"role": "user", "content": prompt}
{
"role": "system",
"content": "You are a helpful assistant that answers questions based on the provided context.",
},
{"role": "user", "content": prompt},
],
max_tokens=4096,
n=1,
@@ -201,20 +230,20 @@ Step-by-step reasoning:"""
def multi_query_technique(self, query: str, api_keys: dict) -> str:
# Generate sub-queries
sub_queries = self.generate_sub_queries(query, api_keys['openai'])
sub_queries = self.generate_sub_queries(query, api_keys["openai"])
# Retrieve information for each sub-query and the original query
all_contexts = []
for q in [query] + sub_queries:
embedding = self.get_embedding(q, api_keys['jina'])
embedding = self.get_embedding(q, api_keys["jina"])
results = self.query_pinecone(
self.input_data.index_name,
api_keys['pinecone'],
api_keys["pinecone"],
embedding,
self.input_data.namespace,
self.input_data.top_k
self.input_data.top_k,
)
context = "\n".join([result['metadata']['text'] for result in results])
context = "\n".join([result["metadata"]["text"] for result in results])
all_contexts.append(f"Query: {q}\nContext: {context}\n")
# Combine all contexts
@@ -228,13 +257,15 @@ Context from multiple queries:
Comprehensive Answer:"""
openai.api_key = api_keys['openai']
openai.api_key = api_keys["openai"]
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system",
"content": "You are a helpful assistant that provides comprehensive answers based on information from multiple related queries."},
{"role": "user", "content": prompt}
{
"role": "system",
"content": "You are a helpful assistant that provides comprehensive answers based on information from multiple related queries.",
},
{"role": "user", "content": prompt},
],
max_tokens=4096,
n=1,
@@ -243,12 +274,12 @@ Comprehensive Answer:"""
)
return response.choices[0].message.content.strip()
def run(self, input_data: Input) -> BlockOutput:
def run(self, input_data: Input, **kwargs) -> BlockOutput:
self.input_data = input_data
api_keys = {
'openai': input_data.openai_api_key.get_secret_value(),
'pinecone': input_data.pinecone_api_key.get_secret_value(),
'jina': input_data.jina_api_key.get_secret_value()
"openai": input_data.openai_api_key.get_secret_value(),
"pinecone": input_data.pinecone_api_key.get_secret_value(),
"jina": input_data.jina_api_key.get_secret_value(),
}
try:
@@ -269,4 +300,4 @@ Comprehensive Answer:"""
error_message = f"Error during query process: {str(e)}"
yield "error", error_message
yield "response", "I'm sorry, but I encountered an error while processing your query."
yield "technique_used", "none"
yield "technique_used", "none"