Building a simple document search using FAISS and OpenAI
Unlike traditional search mechanisms that rely on keyword matching, semantic search captures the nuances of language, returning results that are contextually aligned with the user's intent. This blog post walks through building a semantic search system that uses open-source embedding models to vectorize text, the FAISS library to index and search those vectors, and an OpenAI model to answer questions over the retrieved content.
Installation
First install the necessary libraries:
pip install spacy openai sentence_transformers faiss-cpu python-dotenv pymupdf
python -m spacy download en_core_web_sm
System Architecture
The semantic search system is structured around several key components, each playing a vital role in processing, understanding, and retrieving information:
- Data Preprocessing: The raw text undergoes cleaning, chunking, and batch loading from files. This step standardizes the data and ensures it is in an optimal format for embedding generation.
- Embedding Generation: The core of the semantic search system lies in its ability to generate semantic embeddings from text. Utilizing Sentence Transformers, an open-source framework, the text is converted into dense vector representations. These embeddings capture the semantic nuances of the text, enabling the system to understand and compare the meaning of different documents and queries.
- Indexing and Searching with FAISS: With embeddings in hand, the system uses FAISS, an open-source library developed by Facebook AI, to index these vectors efficiently. FAISS excels in handling large volumes of data, making it an ideal choice for the indexing needs of a semantic search system.
- Query Processing and Matching: Finally, this phase sets up a language model to handle user queries. It translates search queries into embeddings using the same method applied to the document corpus, then queries the FAISS index to find the most relevant documents based on semantic similarity, ensuring that search results align closely with the user's intent. The LLM can handle complex, nuanced queries that require an understanding of context, synonyms, and related concepts.
Full implementation with sample data can be found here.
Chunking
Documents can be embedded whole if they are small enough and the embedding model allows it, but in general they have to be chunked, preferably with some overlap for continuity. Here we use the spaCy library to split text into sentences and overlap consecutive chunks by one sentence. The cleaning step removes unnecessary whitespace and standardizes the text format, which keeps the dataset consistent and can significantly affect the performance of the models used later in the pipeline.
After cleaning, the text is chunked into smaller pieces. This step is essential because many embedding models have limitations on the maximum input length they can handle. By chunking the text into smaller, manageable pieces, we ensure that each text fragment can be processed effectively without losing important information due to truncation.
import re
import spacy

def clean_text(text):
    """Cleans the text by removing unnecessary whitespace."""
    # Replace multiple spaces with a single space
    text = re.sub(r'\s+', ' ', text)
    # Strip leading and trailing spaces
    text = text.strip()
    return text

class NLPOverlapStrategy:
    def __init__(self, chunk_size=256, num_of_sentences_to_overlap=1):
        self.chunk_size = chunk_size
        self.num_of_sentences_to_overlap = num_of_sentences_to_overlap
        # Load spaCy English language model
        # https://spacy.io/models/en
        self.nlp = spacy.load("en_core_web_sm")

    def get_chunks(self, data):
        """Chunks the text into natural segments based on sentence end using spaCy, with overlap."""
        chunks = []
        current_chunk = []
        if isinstance(data, str):
            data = [data]
        for text in data:
            doc = self.nlp(text)
            sentences = [sentence.text.strip() for sentence in doc.sents]  # Extract sentences as clean text
            for sentence in sentences:
                current_chunk.append(clean_text(sentence))
                chunk_text = ' '.join(current_chunk).strip()
                if len(chunk_text) >= self.chunk_size:
                    # When the current chunk reaches the maximum size,
                    # save the chunk, and start a new one considering the overlap
                    chunks.append(chunk_text)
                    if 0 < self.num_of_sentences_to_overlap < len(current_chunk):
                        # Start the next chunk with the last few sentences of the current chunk
                        current_chunk = current_chunk[-self.num_of_sentences_to_overlap:]
                    else:
                        current_chunk = []
        # Add the last chunk if it's not empty
        if current_chunk:
            chunks.append(' '.join(current_chunk).strip())
        return chunks
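As a quick sanity check, the strategy can be run on a short piece of text. This is only an illustrative sketch; the sample string below is made up:

chunker = NLPOverlapStrategy(chunk_size=100, num_of_sentences_to_overlap=1)
sample = (
    "FAISS is a library for similarity search. "
    "It indexes dense vectors efficiently. "
    "Sentence Transformers produce those vectors from text."
)
for i, chunk in enumerate(chunker.get_chunks(sample)):
    print(i, chunk)
# With a one-sentence overlap, consecutive chunks share their boundary sentence.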
Data loading
The data loader reads PDF and text files from a directory and uses a chunking strategy, such as the one above, to produce chunks and batches.
import os
import fitz  # PyMuPDF

def parse_pdf(file_path):
    """Parses the PDF at the given file_path and returns the extracted text."""
    text = ''
    try:
        with fitz.open(file_path) as doc:
            for page in doc:
                text += page.get_text()  # Extract text from each page
    except Exception as e:
        print(f"Failed to parse PDF: {e}")  # Handle exceptions gracefully
    return text

def read_text_file(file_path):
    """Reads the text file at the given file_path and returns the text."""
    with open(file_path, "r") as f:
        text = f.read()  # Read the entire text file content
    return text

class DataLoader:
    def __init__(self, directory, chunker, batch_size=512):
        self.directory = directory
        self.chunker = chunker
        self.batch_size = batch_size  # Control the batch size for processing

    def load(self):
        """Iterates over files in the specified directory, yielding text from txt and pdf files."""
        for root, _, files in os.walk(self.directory):
            for file in files:
                print(f"Loading file: {os.path.join(root, file)}")
                file_path = os.path.join(root, file)
                # Yield text content based on file extension
                if file_path.endswith('.txt'):
                    yield read_text_file(file_path)
                elif file_path.endswith('.pdf'):
                    yield parse_pdf(file_path)

    def get_chunks(self):
        """Utilizes the chunker to break down the loaded text into manageable pieces."""
        return self.chunker.get_chunks(self.load())

    def get_chunk_batches(self):
        """Yields batches of text chunks, adhering to the specified batch size."""
        chunks = []
        for chunk in self.get_chunks():
            chunks.append(chunk)
            if len(chunks) == self.batch_size:
                yield chunks  # Yield a full batch
                chunks = []
        if len(chunks) > 0:
            yield chunks  # Yield any remaining chunks as the last batch

    def __iter__(self):
        """Makes DataLoader iterable, yielding batches of chunks."""
        return self.get_chunk_batches()
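As a rough usage sketch (assuming a local directory called data containing .txt or .pdf files), the loader can be iterated directly to inspect the batches it produces:

loader = DataLoader('data', chunker=NLPOverlapStrategy(chunk_size=256), batch_size=128)
for batch in loader:
    # Each batch is a list of up to batch_size text chunks
    print(f"Got a batch of {len(batch)} chunks")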
Embedding
Next, the text chunks are converted into semantic vectors using the Sentence Transformers library, which is built on top of the Hugging Face Transformers library. The embeddings are stored in a FAISS index to enable efficient similarity search.
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import faiss
import numpy as np

class OpenSourceEmbedding:
    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        # Load the SentenceTransformer model
        self.model = SentenceTransformer(model_name)
        self.dimensions = 384  # Model embedding dimensions
        self.name = model_name.split('/')[1]  # Extract model name

    def get_embedding(self, text):
        # Preprocess text and generate embedding
        text = text.replace('\n', ' ')  # Replace newlines with space
        return self.model.encode(text, convert_to_numpy=True)

class InMemoryIndex:
    def __init__(self, loader, embed=OpenSourceEmbedding()):
        self.index = None  # FAISS index for embeddings
        self.loader = loader  # DataLoader instance
        self.embed = embed  # Embedding model instance
        self.build_index()  # Build index upon initialization

    def build_index(self):
        # Builds the index using embeddings from the loader
        self.content_chunks = []  # Store original text chunks
        self.index = None  # Initialize FAISS index
        for chunk_batch in tqdm(self.loader):
            embeddings = self.get_embeddings(chunk_batch)  # Get embeddings for a batch of text chunks
            if self.index is None:
                # Initialize the index with the dimensionality of the embeddings
                self.index = faiss.IndexFlatL2(len(embeddings[0]))
            self.index.add(embeddings)  # Add embeddings to the index
            self.content_chunks.extend(chunk_batch)  # Keep track of the original text chunks

    def get_embeddings(self, data):
        # Generate embeddings for a list of text data
        embedding_list = [self.embed.get_embedding(text) for text in data]
        return np.array(embedding_list)

    def query(self, query, k=5):
        # Perform a query to find the k most similar text chunks
        embedding = self.get_embeddings([query])[0]  # Get embedding for the query
        embedding_array = np.array([embedding])
        _, indices = self.index.search(embedding_array, k)  # Search the index
        return [self.content_chunks[i] for i in indices[0]]  # Return the k most similar text chunks
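Before involving any language model, the index can be queried directly to check that nearest-neighbour retrieval looks sensible. This is a sketch that reuses the loader from the previous example; the query string is only an illustration:

index = InMemoryIndex(loader, embed=OpenSourceEmbedding())
for chunk in index.query("What does the report say about revenue?", k=3):
    # Print the beginning of each of the three closest chunks
    print(chunk[:120])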
Query processing and matching
This component takes a user query, generates its embedding using the same model applied to the indexed data, and queries the FAISS index to retrieve the most relevant chunks. The LLM then uses those matching chunks as context to generate a response to the user's question. Because retrieval is based on the semantic meaning of the query and the text, the results are more accurate and contextually relevant than keyword matching alone.
from openai import OpenAI
import os

# Retrieve the OpenAI API key from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Define a system prompt guiding the model's behavior
SYSTEM_PROMPT = '''
You are a helpful assistant that can only reference material from a knowledge base.
You do not like using any of your general knowledge.
You may only use information prefixed by "Explicit knowledge base:"
If the question cannot be answered only using information from the knowledge base, say instead "I'm sorry I cannot answer that."
Start every answer with a justification of whether the question can be answered from the explicit knowledge base provided.
'''.strip()

class OpenaiQueryEngine:
    def __init__(self, data_index, num_results=10):
        # Initialize the query engine with a data index and desired number of results
        self.data_index = data_index
        self.num_results = num_results
        # Initialize the OpenAI client with the API key
        self.client = OpenAI(api_key=OPENAI_API_KEY)

    def answer(self, query):
        # Get the most similar chunks from the data index, using the configured number of results
        similar = self.data_index.query(query, k=self.num_results)
        # Use OpenAI's chat API to generate a response, incorporating the system prompt
        # and the query with the explicit knowledge base information
        response = self.client.chat.completions.create(
            model='gpt-4-0125-preview',
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": f"{query}. Explicit knowledge base: {similar}"}
            ]
        )
        # Extract and return the content of the response
        return response.choices[0].message.content.strip()
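Since python-dotenv is part of the installation, one way to supply OPENAI_API_KEY is a local .env file loaded before the query engine module is imported (the module reads the variable at import time). This is an assumption about project setup rather than something the code above requires; the index here is the one built earlier:

from dotenv import load_dotenv

load_dotenv()  # make OPENAI_API_KEY available before the query engine module reads it

from query_engine import OpenaiQueryEngine

engine = OpenaiQueryEngine(index, num_results=3)
print(engine.answer("Summarise the indexed documents in one sentence."))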
Bringing It All Together
Finally, we combine all these components into a cohesive workflow. From data preprocessing and embedding generation to indexing and query processing, each step is designed to work seamlessly with the others, providing a powerful tool for semantic search applications.
from chunking import NLPOverlapStrategy
from data_index import InMemoryIndex
from data_loader import DataLoader
from embedding import OpenSourceEmbedding
from query_engine import OpenaiQueryEngine

class RetrievalAugmentedRunner:
    def __init__(self, query_engine, data_indexer, k=5):
        # Initialize with a query engine, data indexer, and the number of results to retrieve
        self.k = k
        self.index = data_indexer
        self.query = query_engine

    def __call__(self, query):
        # Use the query engine to answer a given query
        return self.query.answer(query)

def main():
    # Setup for data processing and querying
    chunker = NLPOverlapStrategy(chunk_size=256)
    data_loader = DataLoader('data', batch_size=128, chunker=chunker)
    # Create an in-memory index for fast retrieval of data
    index = InMemoryIndex(data_loader, embed=OpenSourceEmbedding())
    # Initialize the query engine with the data index
    query_engine = OpenaiQueryEngine(index, 3)
    # Configure the retrieval-augmented runner with the query engine and index
    runner = RetrievalAugmentedRunner(query_engine, index, k=3)
    while True:
        question = input("Enter your question: ")
        if question == ":q":
            break  # Exit the loop if ":q" is entered
        result = runner(question)  # Run the query through the retrieval-augmented runner
        print(result)  # Print the result

if __name__ == "__main__":
    main()  # Execute the main function
Remember to place the files you want to query (PDF or text) in the data directory before running the script.