import os
import time
import pdfplumber
import docx
import nltk
import gradio as gr
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import CohereEmbeddings
from langchain_community.vectorstores import FAISS, Chroma
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
TokenTextSplitter,
)
from typing import List, Dict, Any
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.manifold import TSNE
from scipy.stats import spearmanr
import matplotlib.pyplot as plt
import seaborn as sns
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import SnowballStemmer
import jellyfish  # phonetic string matching (see phonetic_match below)
from gensim.models import Word2Vec
from gensim.models.fasttext import FastText
from collections import Counter
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)  # needed by word_tokenize on newer NLTK releases
FILES_DIR = './files'
MODELS = {
'HuggingFace': {
'e5-base-de': "danielheinz/e5-base-sts-en-de",
        'paraphrase-miniLM': "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        'paraphrase-mpnet': "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
        'gte-large': "thenlper/gte-large",
        'gbert-base': "deepset/gbert-base"
},
'OpenAI': {
'text-embedding-ada-002': "text-embedding-ada-002"
},
'Cohere': {
'embed-multilingual-v2.0': "embed-multilingual-v2.0"
}
}
def preprocess_text(text, lang='german'):
# Convert to lowercase
text = text.lower()
    # Remove digits and punctuation, but keep German umlauts and ß
    text = re.sub(r'[^a-zäöüß\s]', '', text)
# Tokenize
tokens = word_tokenize(text, language=lang)
# Remove stopwords
stop_words = set(stopwords.words(lang))
tokens = [token for token in tokens if token not in stop_words]
# Stemming
stemmer = SnowballStemmer(lang)
tokens = [stemmer.stem(token) for token in tokens]
return ' '.join(tokens)
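# Example (illustrative): with lang='german',
#   preprocess_text("Die Bäume wachsen schnell im Frühling!")
# lowercases, strips digits/punctuation (keeping umlauts), drops stopwords
# ("die", "im"), and stems the rest, yielding roughly "baum wachs schnell fruhling"
# (exact output depends on the Snowball stemmer version).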
def phonetic_match(text, query, method='koelner_phonetik'):
    if method == 'koelner_phonetik':
        # NOTE: cologne_phonetic is not part of current jellyfish releases; this
        # assumes a build that provides it (the standalone `cologne_phonetics`
        # package offers an equivalent encoder).
        text_phonetic = jellyfish.cologne_phonetic(text)
        query_phonetic = jellyfish.cologne_phonetic(query)
        # jaro_winkler was renamed jaro_winkler_similarity in jellyfish >= 0.8
        return jellyfish.jaro_winkler_similarity(text_phonetic, query_phonetic)
    # Add other phonetic methods as needed
    return 0
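# Example (illustrative): Kölner Phonetik maps similar-sounding German words to
# the same code, e.g. "Meyer" and "Maier" both encode to "67", so
# phonetic_match("Meyer", "Maier") should score near 1.0.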
class FileHandler:
@staticmethod
def extract_text(file_path):
ext = os.path.splitext(file_path)[-1].lower()
if ext == '.pdf':
return FileHandler._extract_from_pdf(file_path)
elif ext == '.docx':
return FileHandler._extract_from_docx(file_path)
elif ext == '.txt':
return FileHandler._extract_from_txt(file_path)
else:
raise ValueError(f"Unsupported file type: {ext}")
@staticmethod
def _extract_from_pdf(file_path):
with pdfplumber.open(file_path) as pdf:
            # extract_text() returns None for image-only pages
            return ' '.join(page.extract_text() or '' for page in pdf.pages)
@staticmethod
def _extract_from_docx(file_path):
doc = docx.Document(file_path)
return ' '.join([para.text for para in doc.paragraphs])
@staticmethod
def _extract_from_txt(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def get_embedding_model(model_type, model_name):
if model_type == 'HuggingFace':
return HuggingFaceEmbeddings(model_name=MODELS[model_type][model_name])
elif model_type == 'OpenAI':
return OpenAIEmbeddings(model=MODELS[model_type][model_name])
elif model_type == 'Cohere':
return CohereEmbeddings(model=MODELS[model_type][model_name])
else:
raise ValueError(f"Unsupported model type: {model_type}")
def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None):
if split_strategy == 'token':
return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size)
elif split_strategy == 'recursive':
return RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=overlap_size,
separators=custom_separators or ["\n\n", "\n", " ", ""]
)
else:
raise ValueError(f"Unsupported split strategy: {split_strategy}")
def get_vector_store(vector_store_type, chunks, embedding_model):
if vector_store_type == 'FAISS':
return FAISS.from_texts(chunks, embedding_model)
elif vector_store_type == 'Chroma':
return Chroma.from_texts(chunks, embedding_model)
else:
raise ValueError(f"Unsupported vector store type: {vector_store_type}")
def get_retriever(vector_store, search_type, search_kwargs):
if search_type == 'similarity':
return vector_store.as_retriever(search_type="similarity", search_kwargs=search_kwargs)
elif search_type == 'mmr':
return vector_store.as_retriever(search_type="mmr", search_kwargs=search_kwargs)
    elif search_type == 'custom':
        # Fail loudly instead of silently returning None
        raise NotImplementedError("Custom retriever logic is not implemented yet")
else:
raise ValueError(f"Unsupported search type: {search_type}")
def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators, lang='german'):
if file_path:
text = FileHandler.extract_text(file_path)
    else:
        text = ""
        # Fall back to indexing everything under FILES_DIR, if present
        if os.path.isdir(FILES_DIR):
            for file in os.listdir(FILES_DIR):
                file_path = os.path.join(FILES_DIR, file)
                text += FileHandler.extract_text(file_path) + ' '
# Preprocess the text
text = preprocess_text(text, lang)
text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators)
chunks = text_splitter.split_text(text)
embedding_model = get_embedding_model(model_type, model_name)
return chunks, embedding_model, len(text.split())
def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k, lang='german', phonetic_weight=0.3):
# Preprocess the query
preprocessed_query = preprocess_text(query, lang)
vector_store = get_vector_store(vector_store_type, chunks, embedding_model)
retriever = get_retriever(vector_store, search_type, {"k": top_k})
start_time = time.time()
    results = retriever.invoke(preprocessed_query)  # get_relevant_documents is deprecated
    # Re-rank by blending the vector-store score with phonetic similarity.
    # similarity_search_with_score returns (Document, score) pairs; FAISS scores
    # are L2 distances (lower is better), so map them to a similarity first.
    def combined_score(doc):
        distance = vector_store.similarity_search_with_score(doc.page_content, k=1)[0][1]
        semantic = 1.0 / (1.0 + distance)
        return (1 - phonetic_weight) * semantic + phonetic_weight * phonetic_match(doc.page_content, query)
    results = sorted(results, key=combined_score, reverse=True)
end_time = time.time()
return results[:top_k], end_time - start_time, vector_store
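# The re-ranking above is a convex blend of the two signals:
#     score = (1 - w) * semantic_similarity + w * phonetic_similarity
# with w = phonetic_weight, so w=0.0 is pure vector search and w=1.0 is pure
# Kölner-Phonetik matching.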
def calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query, top_k):
stats = {
"num_results": len(results),
"avg_content_length": np.mean([len(doc.page_content) for doc in results]) if results else 0,
"search_time": search_time,
"vector_store_size": vector_store._index.ntotal if hasattr(vector_store, '_index') else "N/A",
"num_documents": len(vector_store.docstore._dict),
"num_tokens": num_tokens,
"embedding_vocab_size": embedding_model.client.get_vocab_size() if hasattr(embedding_model, 'client') and hasattr(embedding_model.client, 'get_vocab_size') else "N/A",
"embedding_dimension": len(embedding_model.embed_query(query)),
"top_k": top_k,
}
# Calculate diversity of results
if len(results) > 1:
embeddings = [embedding_model.embed_query(doc.page_content) for doc in results]
pairwise_similarities = cosine_similarity(embeddings)
stats["result_diversity"] = 1 - np.mean(pairwise_similarities[np.triu_indices(len(embeddings), k=1)])
else:
stats["result_diversity"] = "N/A"
    # Rank correlation between embedding similarity and result order
    # (meaningful only with two or more results)
    if len(results) > 1:
        query_embedding = embedding_model.embed_query(query)
        result_embeddings = [embedding_model.embed_query(doc.page_content) for doc in results]
        similarities = [cosine_similarity([query_embedding], [emb])[0][0] for emb in result_embeddings]
        rank_correlation, _ = spearmanr(similarities, range(len(similarities)))
        stats["rank_correlation"] = rank_correlation
    else:
        stats["rank_correlation"] = "N/A"
return stats
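# Interpretation notes: result_diversity is 1 minus the mean pairwise cosine
# similarity of the result embeddings (0 = near-duplicates, ~1 = semantically
# spread out). rank_correlation compares embedding similarity against result
# order; a value near -1 means results are ranked from most to least similar.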
def create_custom_embedding(texts, model_type='word2vec', vector_size=100, window=5, min_count=1):
# Tokenize the texts
tokenized_texts = [text.split() for text in texts]
if model_type == 'word2vec':
model = Word2Vec(sentences=tokenized_texts, vector_size=vector_size, window=window, min_count=min_count, workers=4)
elif model_type == 'fasttext':
model = FastText(sentences=tokenized_texts, vector_size=vector_size, window=window, min_count=min_count, workers=4)
else:
raise ValueError("Unsupported model type")
return model
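# Example (illustrative): train a small Word2Vec model on corpus chunks and look
# up a word vector; with vector_size=100 each vector has shape (100,).
#   model = create_custom_embedding(["der hund bellt", "die katze schläft"])
#   vec = model.wv["hund"]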
class CustomEmbeddings:
    """Wraps a trained gensim Word2Vec/FastText model; duck-types the LangChain
    embeddings interface by averaging each text's in-vocabulary word vectors."""
    def __init__(self, model):
        self.model = model
    def _embed(self, text):
        # Fall back to a zero vector if no token is in the vocabulary
        words = [w for w in text.split() if w in self.model.wv]
        return np.mean(self.model.wv[words], axis=0).tolist() if words else [0.0] * self.model.vector_size
    def embed_documents(self, texts):
        return [self._embed(t) for t in texts]
    def embed_query(self, text):
        return self._embed(text)
def optimize_vocabulary(texts, vocab_size=10000, min_frequency=2):
# Count word frequencies
word_freq = Counter(word for text in texts for word in text.split())
# Remove rare words
optimized_texts = [
' '.join(word for word in text.split() if word_freq[word] >= min_frequency)
for text in texts
]
    # Train a BPE tokenizer on the pruned corpus; the whitespace pre-tokenizer
    # makes BPE learn merges within words rather than across whole lines
    tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = Whitespace()
    trainer = BpeTrainer(vocab_size=vocab_size, special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"])
    tokenizer.train_from_iterator(optimized_texts, trainer)
return tokenizer, optimized_texts
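# Example (illustrative): encode a sentence with the trained BPE tokenizer;
# words rarer than min_frequency were pruned before training, so merges
# concentrate on frequent subwords.
#   ids = tokenizer.encode("die katze schläft").ids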
def compare_embeddings(file, query, model_types, model_names, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k, lang='german', use_custom_embedding=False, optimize_vocab=False, phonetic_weight=0.3):
all_results = []
all_stats = []
settings = {
"split_strategy": split_strategy,
"chunk_size": chunk_size,
"overlap_size": overlap_size,
"custom_separators": custom_separators,
"vector_store_type": vector_store_type,
"search_type": search_type,
"top_k": top_k,
"lang": lang,
"use_custom_embedding": use_custom_embedding,
"optimize_vocab": optimize_vocab,
"phonetic_weight": phonetic_weight
}
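    # NOTE: model_types and model_names are paired positionally by zip(), so the
    # UI selections are assumed to be made in matching order.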
for model_type, model_name in zip(model_types, model_names):
chunks, embedding_model, num_tokens = process_files(
file.name if file else None,
model_type,
model_name,
split_strategy,
chunk_size,
overlap_size,
custom_separators.split(',') if custom_separators else None,
lang
)
if use_custom_embedding:
custom_model = create_custom_embedding(chunks)
embedding_model = CustomEmbeddings(custom_model)
if optimize_vocab:
tokenizer, optimized_chunks = optimize_vocabulary(chunks)
chunks = optimized_chunks
results, search_time, vector_store = search_embeddings(
chunks,
embedding_model,
vector_store_type,
search_type,
query,
top_k,
lang,
phonetic_weight
)
stats = calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query, top_k)
stats["model"] = f"{model_type} - {model_name}"
stats.update(settings)
formatted_results = format_results(results, stats)
all_results.extend(formatted_results)
all_stats.append(stats)
    results_df = pd.DataFrame(all_results)
    stats_df = pd.DataFrame(all_stats)
    # The Gradio interface declares three outputs, so build the plot here too
    fig = visualize_results(results_df, stats_df)
    return results_df, stats_df, fig
def format_results(results, stats):
formatted_results = []
for doc in results:
result = {
"Model": stats["model"],
"Content": doc.page_content,
**doc.metadata,
**{k: v for k, v in stats.items() if k not in ["model"]}
}
formatted_results.append(result)
return formatted_results
def visualize_results(results_df, stats_df):
    # Create a figure with subplots
    fig, axs = plt.subplots(2, 2, figsize=(20, 20))
    # 1. Bar plot of search times
    sns.barplot(x='model', y='search_time', data=stats_df, ax=axs[0, 0])
    axs[0, 0].set_title('Search Time by Model')
    axs[0, 0].tick_params(axis='x', rotation=45)
    # 2. Scatter plot of result diversity vs. rank correlation
    sns.scatterplot(x='result_diversity', y='rank_correlation', hue='model', data=stats_df, ax=axs[0, 1])
    axs[0, 1].set_title('Result Diversity vs. Rank Correlation')
    # 3. Box plot of content lengths (derived from the Content column)
    results_df = results_df.assign(content_length=results_df['Content'].str.len())
    sns.boxplot(x='Model', y='content_length', data=results_df, ax=axs[1, 0])
    axs[1, 0].set_title('Distribution of Result Content Lengths')
    axs[1, 0].tick_params(axis='x', rotation=45)
    # 4. t-SNE projection of result embeddings, if they were stored on the frame.
    # Perplexity must stay below the sample count, which is small for top-k results.
    if 'embedding' in results_df.columns and len(results_df) > 2:
        embeddings = np.array(results_df['embedding'].tolist())
        tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(embeddings) - 1))
        embeddings_2d = tsne.fit_transform(embeddings)
        sns.scatterplot(x=embeddings_2d[:, 0], y=embeddings_2d[:, 1], hue=results_df['Model'], ax=axs[1, 1])
        axs[1, 1].set_title('t-SNE Visualization of Result Embeddings')
    else:
        axs[1, 1].set_axis_off()
        axs[1, 1].set_title('t-SNE skipped: no embedding column')
    plt.tight_layout()
    return fig
def launch_interface(share=True):
iface = gr.Interface(
fn=compare_embeddings,
inputs=[
gr.File(label="Upload File (Optional)"),
gr.Textbox(label="Search Query"),
gr.CheckboxGroup(choices=list(MODELS.keys()) + ["Custom"], label="Embedding Model Types"),
gr.CheckboxGroup(choices=[model for models in MODELS.values() for model in models] + ["custom_model"], label="Embedding Models"),
gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive"),
gr.Slider(100, 1000, step=100, value=500, label="Chunk Size"),
gr.Slider(0, 100, step=10, value=50, label="Overlap Size"),
gr.Textbox(label="Custom Split Separators (comma-separated, optional)"),
gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS"),
gr.Radio(choices=["similarity", "mmr", "custom"], label="Search Type", value="similarity"),
gr.Slider(1, 10, step=1, value=5, label="Top K"),
gr.Dropdown(choices=["german", "english", "french"], label="Language", value="german"),
gr.Checkbox(label="Use Custom Embedding", value=False),
gr.Checkbox(label="Optimize Vocabulary", value=False),
gr.Slider(0, 1, step=0.1, value=0.3, label="Phonetic Matching Weight")
],
outputs=[
gr.Dataframe(label="Results", interactive=False),
gr.Dataframe(label="Statistics", interactive=False),
gr.Plot(label="Visualizations")
],
title="Advanced Embedding Comparison Tool",
description="Compare different embedding models and retrieval strategies with advanced preprocessing and phonetic matching"
)
tutorial_md = """
# Advanced Embedding Comparison Tool Tutorial
... (update the tutorial to include information about the new features) ...
"""
    # TabbedInterface expects Interfaces or Blocks, so wrap the Markdown tutorial
    with gr.Blocks() as tutorial_tab:
        gr.Markdown(tutorial_md)
    iface = gr.TabbedInterface(
        [iface, tutorial_tab],
        ["Embedding Comparison", "Tutorial"]
    )
iface.launch(share=share)
if __name__ == "__main__":
launch_interface()