import os import time import pdfplumber import docx import nltk import gradio as gr from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.embeddings import ( OpenAIEmbeddings, CohereEmbeddings, ) from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import FAISS, Chroma from langchain_text_splitters import ( RecursiveCharacterTextSplitter, TokenTextSplitter, ) from typing import List, Dict, Any import pandas as pd import re from nltk.corpus import stopwords from nltk.tokenize import word_tokenize from nltk.stem import SnowballStemmer import jellyfish # For Kölner Phonetik from gensim.models import Word2Vec from gensim.models.fasttext import FastText from collections import Counter from tokenizers import Tokenizer from tokenizers.models import BPE from tokenizers.trainers import BpeTrainer nltk.download('stopwords', quiet=True) nltk.download('punkt', quiet=True) FILES_DIR = './files' MODELS = { 'HuggingFace': { 'e5-base-de': "danielheinz/e5-base-sts-en-de", 'paraphrase-miniLM': "paraphrase-multilingual-MiniLM-L12-v2", 'paraphrase-mpnet': "paraphrase-multilingual-mpnet-base-v2", 'gte-large': "gte-large", 'gbert-base': "gbert-base" }, 'OpenAI': { 'text-embedding-ada-002': "text-embedding-ada-002" }, 'Cohere': { 'embed-multilingual-v2.0': "embed-multilingual-v2.0" } } def preprocess_text(text, lang='german'): # Convert to lowercase text = text.lower() # Remove special characters and digits text = re.sub(r'[^a-zA-Z\s]', '', text) # Tokenize tokens = word_tokenize(text, language=lang) # Remove stopwords stop_words = set(stopwords.words(lang)) tokens = [token for token in tokens if token not in stop_words] # Stemming stemmer = SnowballStemmer(lang) tokens = [stemmer.stem(token) for token in tokens] return ' '.join(tokens) def phonetic_match(text, query, method='koelner_phonetik'): if method == 'koelner_phonetik': text_phonetic = jellyfish.cologne_phonetic(text) query_phonetic = jellyfish.cologne_phonetic(query) return jellyfish.jaro_winkler(text_phonetic, query_phonetic) # Add other phonetic methods as needed return 0 class FileHandler: @staticmethod def extract_text(file_path): ext = os.path.splitext(file_path)[-1].lower() if ext == '.pdf': return FileHandler._extract_from_pdf(file_path) elif ext == '.docx': return FileHandler._extract_from_docx(file_path) elif ext == '.txt': return FileHandler._extract_from_txt(file_path) else: raise ValueError(f"Unsupported file type: {ext}") @staticmethod def _extract_from_pdf(file_path): with pdfplumber.open(file_path) as pdf: return ' '.join([page.extract_text() for page in pdf.pages]) @staticmethod def _extract_from_docx(file_path): doc = docx.Document(file_path) return ' '.join([para.text for para in doc.paragraphs]) @staticmethod def _extract_from_txt(file_path): with open(file_path, 'r', encoding='utf-8') as f: return f.read() def get_embedding_model(model_type, model_name): if model_type == 'HuggingFace': return HuggingFaceEmbeddings(model_name=MODELS[model_type][model_name]) elif model_type == 'OpenAI': return OpenAIEmbeddings(model=MODELS[model_type][model_name]) elif model_type == 'Cohere': return CohereEmbeddings(model=MODELS[model_type][model_name]) else: raise ValueError(f"Unsupported model type: {model_type}") def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None): if split_strategy == 'token': return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size) elif split_strategy == 'recursive': return RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=overlap_size, separators=custom_separators or ["\n\n", "\n", " ", ""] ) else: raise ValueError(f"Unsupported split strategy: {split_strategy}") def get_vector_store(vector_store_type, chunks, embedding_model): if vector_store_type == 'FAISS': return FAISS.from_texts(chunks, embedding_model) elif vector_store_type == 'Chroma': return Chroma.from_texts(chunks, embedding_model) else: raise ValueError(f"Unsupported vector store type: {vector_store_type}") def get_retriever(vector_store, search_type, search_kwargs): if search_type == 'similarity': return vector_store.as_retriever(search_type="similarity", search_kwargs=search_kwargs) elif search_type == 'mmr': return vector_store.as_retriever(search_type="mmr", search_kwargs=search_kwargs) elif search_type == 'custom': # Implement custom retriever logic here pass else: raise ValueError(f"Unsupported search type: {search_type}") def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators, lang='german'): if file_path: text = FileHandler.extract_text(file_path) else: text = "" for file in os.listdir(FILES_DIR): file_path = os.path.join(FILES_DIR, file) text += FileHandler.extract_text(file_path) # Preprocess the text text = preprocess_text(text, lang) text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators) chunks = text_splitter.split_text(text) embedding_model = get_embedding_model(model_type, model_name) return chunks, embedding_model, len(text.split()) def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k, lang='german', phonetic_weight=0.3): # Preprocess the query preprocessed_query = preprocess_text(query, lang) vector_store = get_vector_store(vector_store_type, chunks, embedding_model) retriever = get_retriever(vector_store, search_type, {"k": top_k}) start_time = time.time() results = retriever.get_relevant_documents(preprocessed_query) # Apply phonetic matching results = sorted(results, key=lambda x: (1 - phonetic_weight) * vector_store.similarity_search(x.page_content, k=1)[0][1] + phonetic_weight * phonetic_match(x.page_content, query), reverse=True) end_time = time.time() return results[:top_k], end_time - start_time, vector_store def calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query, top_k): stats = { "num_results": len(results), "avg_content_length": np.mean([len(doc.page_content) for doc in results]) if results else 0, "search_time": search_time, "vector_store_size": vector_store._index.ntotal if hasattr(vector_store, '_index') else "N/A", "num_documents": len(vector_store.docstore._dict), "num_tokens": num_tokens, "embedding_vocab_size": embedding_model.client.get_vocab_size() if hasattr(embedding_model, 'client') and hasattr(embedding_model.client, 'get_vocab_size') else "N/A", "embedding_dimension": len(embedding_model.embed_query(query)), "top_k": top_k, } # Calculate diversity of results if len(results) > 1: embeddings = [embedding_model.embed_query(doc.page_content) for doc in results] pairwise_similarities = cosine_similarity(embeddings) stats["result_diversity"] = 1 - np.mean(pairwise_similarities[np.triu_indices(len(embeddings), k=1)]) else: stats["result_diversity"] = "N/A" # Calculate rank correlation between embedding similarity and result order query_embedding = embedding_model.embed_query(query) result_embeddings = [embedding_model.embed_query(doc.page_content) for doc in results] similarities = [cosine_similarity([query_embedding], [emb])[0][0] for emb in result_embeddings] rank_correlation, _ = spearmanr(similarities, range(len(similarities))) stats["rank_correlation"] = rank_correlation return stats def create_custom_embedding(texts, model_type='word2vec', vector_size=100, window=5, min_count=1): # Tokenize the texts tokenized_texts = [text.split() for text in texts] if model_type == 'word2vec': model = Word2Vec(sentences=tokenized_texts, vector_size=vector_size, window=window, min_count=min_count, workers=4) elif model_type == 'fasttext': model = FastText(sentences=tokenized_texts, vector_size=vector_size, window=window, min_count=min_count, workers=4) else: raise ValueError("Unsupported model type") return model class CustomEmbeddings(HuggingFaceEmbeddings): def __init__(self, model_path): self.model = Word2Vec.load(model_path) # or FastText.load() for FastText models def embed_documents(self, texts): return [self.model.wv[text.split()] for text in texts] def embed_query(self, text): return self.model.wv[text.split()] def optimize_vocabulary(texts, vocab_size=10000, min_frequency=2): # Count word frequencies word_freq = Counter(word for text in texts for word in text.split()) # Remove rare words optimized_texts = [ ' '.join(word for word in text.split() if word_freq[word] >= min_frequency) for text in texts ] # Train BPE tokenizer tokenizer = Tokenizer(BPE(unk_token="[UNK]")) trainer = BpeTrainer(vocab_size=vocab_size, special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]) tokenizer.train_from_iterator(optimized_texts, trainer) return tokenizer, optimized_texts def compare_embeddings(file, query, model_types, model_names, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k, lang='german', use_custom_embedding=False, optimize_vocab=False, phonetic_weight=0.3): all_results = [] all_stats = [] settings = { "split_strategy": split_strategy, "chunk_size": chunk_size, "overlap_size": overlap_size, "custom_separators": custom_separators, "vector_store_type": vector_store_type, "search_type": search_type, "top_k": top_k, "lang": lang, "use_custom_embedding": use_custom_embedding, "optimize_vocab": optimize_vocab, "phonetic_weight": phonetic_weight } for model_type, model_name in zip(model_types, model_names): chunks, embedding_model, num_tokens = process_files( file.name if file else None, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators.split(',') if custom_separators else None, lang ) if use_custom_embedding: custom_model = create_custom_embedding(chunks) embedding_model = CustomEmbeddings(custom_model) if optimize_vocab: tokenizer, optimized_chunks = optimize_vocabulary(chunks) chunks = optimized_chunks results, search_time, vector_store = search_embeddings( chunks, embedding_model, vector_store_type, search_type, query, top_k, lang, phonetic_weight ) stats = calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query, top_k) stats["model"] = f"{model_type} - {model_name}" stats.update(settings) formatted_results = format_results(results, stats) all_results.extend(formatted_results) all_stats.append(stats) results_df = pd.DataFrame(all_results) stats_df = pd.DataFrame(all_stats) return results_df, stats_df def format_results(results, stats): formatted_results = [] for doc in results: result = { "Model": stats["model"], "Content": doc.page_content, **doc.metadata, **{k: v for k, v in stats.items() if k not in ["model"]} } formatted_results.append(result) return formatted_results import matplotlib.pyplot as plt import seaborn as sns from sklearn.manifold import TSNE def visualize_results(results_df, stats_df): # Create a figure with subplots fig, axs = plt.subplots(2, 2, figsize=(20, 20)) # 1. Bar plot of search times sns.barplot(x='model', y='search_time', data=stats_df, ax=axs[0, 0]) axs[0, 0].set_title('Search Time by Model') axs[0, 0].set_xticklabels(axs[0, 0].get_xticklabels(), rotation=45, ha='right') # 2. Scatter plot of result diversity vs. rank correlation sns.scatterplot(x='result_diversity', y='rank_correlation', hue='model', data=stats_df, ax=axs[0, 1]) axs[0, 1].set_title('Result Diversity vs. Rank Correlation') # 3. Box plot of content lengths sns.boxplot(x='model', y='content_length', data=results_df, ax=axs[1, 0]) axs[1, 0].set_title('Distribution of Result Content Lengths') axs[1, 0].set_xticklabels(axs[1, 0].get_xticklabels(), rotation=45, ha='right') # 4. t-SNE visualization of embeddings embeddings = np.array(results_df['embedding'].tolist()) tsne = TSNE(n_components=2, random_state=42) embeddings_2d = tsne.fit_transform(embeddings) sns.scatterplot(x=embeddings_2d[:, 0], y=embeddings_2d[:, 1], hue=results_df['model'], ax=axs[1, 1]) axs[1, 1].set_title('t-SNE Visualization of Result Embeddings') plt.tight_layout() return fig def launch_interface(share=True): iface = gr.Interface( fn=compare_embeddings, inputs=[ gr.File(label="Upload File (Optional)"), gr.Textbox(label="Search Query"), gr.CheckboxGroup(choices=list(MODELS.keys()) + ["Custom"], label="Embedding Model Types"), gr.CheckboxGroup(choices=[model for models in MODELS.values() for model in models] + ["custom_model"], label="Embedding Models"), gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive"), gr.Slider(100, 1000, step=100, value=500, label="Chunk Size"), gr.Slider(0, 100, step=10, value=50, label="Overlap Size"), gr.Textbox(label="Custom Split Separators (comma-separated, optional)"), gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS"), gr.Radio(choices=["similarity", "mmr", "custom"], label="Search Type", value="similarity"), gr.Slider(1, 10, step=1, value=5, label="Top K"), gr.Dropdown(choices=["german", "english", "french"], label="Language", value="german"), gr.Checkbox(label="Use Custom Embedding", value=False), gr.Checkbox(label="Optimize Vocabulary", value=False), gr.Slider(0, 1, step=0.1, value=0.3, label="Phonetic Matching Weight") ], outputs=[ gr.Dataframe(label="Results", interactive=False), gr.Dataframe(label="Statistics", interactive=False), gr.Plot(label="Visualizations") ], title="Advanced Embedding Comparison Tool", description="Compare different embedding models and retrieval strategies with advanced preprocessing and phonetic matching" ) tutorial_md = """ # Advanced Embedding Comparison Tool Tutorial ... (update the tutorial to include information about the new features) ... """ iface = gr.TabbedInterface( [iface, gr.Markdown(tutorial_md)], ["Embedding Comparison", "Tutorial"] ) iface.launch(share=share) if __name__ == "__main__": launch_interface()