# Embeddings_tab.py
# Description: This file contains the code for the Create/View/Purge Embeddings tabs in the Gradio UI
#
# Imports
import json
import logging
import os
#
# External Imports
import gradio as gr
import numpy as np
from tqdm import tqdm
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import get_all_content_from_database, get_all_conversations, \
    get_conversation_text, get_note_by_id
from App_Function_Libraries.DB.RAG_QA_Chat_DB import get_all_notes
from App_Function_Libraries.RAG.ChromaDB_Library import chroma_client, \
    store_in_chroma, situate_context
from App_Function_Libraries.RAG.Embeddings_Create import create_embedding, create_embeddings_batch
from App_Function_Libraries.Chunk_Lib import improved_chunking_process, chunk_for_embedding
from App_Function_Libraries.Utils.Utils import load_and_log_configs
#
########################################################################################################################
#
# Functions:

def create_embeddings_tab():
    # Load configuration first
    config = load_and_log_configs()
    if not config:
        raise ValueError("Could not load configuration")

    # Get database paths from config
    db_config = config['db_config']
    media_db_path = db_config['sqlite_path']
    rag_qa_db_path = os.path.join(os.path.dirname(media_db_path), "rag_qa.db")
    character_chat_db_path = os.path.join(os.path.dirname(media_db_path), "chatDB.db")
    chroma_db_path = db_config['chroma_db_path']

    with gr.TabItem("Create Embeddings", visible=True):
        gr.Markdown("# Create Embeddings for All Content")

        with gr.Row():
            with gr.Column():
                # Database selection at the top
                database_selection = gr.Radio(
                    choices=["Media DB", "RAG Chat", "Character Chat"],
                    label="Select Content Source",
                    value="Media DB",
                    info="Choose which database to create embeddings from"
                )

                # Add database path display
                current_db_path = gr.Textbox(
                    label="Current Database Path",
                    value=media_db_path,
                    interactive=False
                )

                embedding_provider = gr.Radio(
                    choices=["huggingface", "local", "openai"],
                    label="Select Embedding Provider",
                    value=config['embedding_config']['embedding_provider'] or "huggingface"
                )
                gr.Markdown("Note: Local provider requires a running Llama.cpp/llamafile server.")
                gr.Markdown("OpenAI provider requires a valid API key.")

                huggingface_model = gr.Dropdown(
                    choices=[
                        "jinaai/jina-embeddings-v3",
                        "Alibaba-NLP/gte-large-en-v1.5",
                        "dunzhang/stella_en_400M_v5",
                        "custom"
                    ],
                    label="Hugging Face Model",
                    value="jinaai/jina-embeddings-v3",
                    visible=True
                )

                openai_model = gr.Dropdown(
                    choices=[
                        "text-embedding-3-small",
                        "text-embedding-3-large"
                    ],
                    label="OpenAI Embedding Model",
                    value="text-embedding-3-small",
                    visible=False
                )

                custom_embedding_model = gr.Textbox(
                    label="Custom Embedding Model",
                    placeholder="Enter your custom embedding model name here",
                    visible=False
                )

                embedding_api_url = gr.Textbox(
                    label="API URL (for local provider)",
                    value=config['embedding_config']['embedding_api_url'],
                    visible=False
                )

                # Add chunking options with config defaults
                chunking_method = gr.Dropdown(
                    choices=["words", "sentences", "paragraphs", "tokens", "semantic"],
                    label="Chunking Method",
                    value="words"
                )
                max_chunk_size = gr.Slider(
                    minimum=1, maximum=8000, step=1,
                    value=config['embedding_config']['chunk_size'],
                    label="Max Chunk Size"
                )
                chunk_overlap = gr.Slider(
                    minimum=0, maximum=4000, step=1,
                    value=config['embedding_config']['overlap'],
                    label="Chunk Overlap"
                )
                adaptive_chunking = gr.Checkbox(
                    label="Use Adaptive Chunking",
                    value=False
                )

                create_button = gr.Button("Create Embeddings")

            with gr.Column():
                status_output = gr.Textbox(label="Status", lines=10)
                progress = gr.Progress()

        def update_provider_options(provider):
            if provider == "huggingface":
                return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
            elif provider == "local":
                return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)
            else:  # OpenAI
                return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        def update_huggingface_options(model):
            if model == "custom":
                return gr.update(visible=True)
            else:
                return gr.update(visible=False)

        def update_database_path(database_type):
            if database_type == "Media DB":
                return media_db_path
            elif database_type == "RAG Chat":
                return rag_qa_db_path
            else:  # Character Chat
                return character_chat_db_path

        def create_all_embeddings(provider, hf_model, openai_model, custom_model, api_url, method, max_size,
                                  overlap, adaptive, database_type, progress=gr.Progress()):
            try:
                # Initialize content based on database selection
                if database_type == "Media DB":
                    all_content = get_all_content_from_database()
                    content_type = "media"
                elif database_type == "RAG Chat":
                    all_content = []
                    page = 1
                    while True:
                        conversations, total_pages, _ = get_all_conversations(page=page)
                        if not conversations:
                            break
                        all_content.extend([{
                            'id': conv['conversation_id'],
                            'content': get_conversation_text(conv['conversation_id']),
                            'title': conv['title'],
                            'type': 'conversation'
                        } for conv in conversations])
                        progress(page / total_pages, desc=f"Loading conversations... Page {page}/{total_pages}")
                        page += 1
                else:  # Character Chat
                    all_content = []
                    page = 1
                    while True:
                        notes, total_pages, _ = get_all_notes(page=page)
                        if not notes:
                            break
                        all_content.extend([{
                            'id': note['id'],
                            'content': f"{note['title']}\n\n{note['content']}",
                            'conversation_id': note['conversation_id'],
                            'type': 'note'
                        } for note in notes])
                        progress(page / total_pages, desc=f"Loading notes... Page {page}/{total_pages}")
                        page += 1

                if not all_content:
                    return "No content found in the selected database."
                chunk_options = {
                    'method': method,
                    'max_size': max_size,
                    'overlap': overlap,
                    'adaptive': adaptive
                }

                collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings"
                collection = chroma_client.get_or_create_collection(name=collection_name)

                # Determine the model to use
                if provider == "huggingface":
                    model = custom_model if hf_model == "custom" else hf_model
                elif provider == "openai":
                    model = openai_model
                else:
                    model = api_url

                total_items = len(all_content)
                for idx, item in enumerate(all_content):
                    progress((idx + 1) / total_items, desc=f"Processing item {idx + 1} of {total_items}")
                    content_id = item['id']
                    text = item['content']

                    chunks = improved_chunking_process(text, chunk_options)
                    for chunk_idx, chunk in enumerate(chunks):
                        chunk_text = chunk['text']
                        chunk_id = f"{database_type.lower()}_{content_id}_chunk_{chunk_idx}"

                        try:
                            embedding = create_embedding(chunk_text, provider, model, api_url)
                            metadata = {
                                'content_id': str(content_id),
                                'chunk_index': int(chunk_idx),
                                'total_chunks': int(len(chunks)),
                                'chunking_method': method,
                                'max_chunk_size': int(max_size),
                                'chunk_overlap': int(overlap),
                                'adaptive_chunking': bool(adaptive),
                                'embedding_model': model,
                                'embedding_provider': provider,
                                'content_type': item.get('type', 'media'),
                                'conversation_id': item.get('conversation_id'),
                                **{k: (int(v) if isinstance(v, str) and v.isdigit() else v)
                                   for k, v in chunk['metadata'].items()}
                            }
                            store_in_chroma(collection_name, [chunk_text], [embedding], [chunk_id], [metadata])
                        except Exception as e:
                            logging.error(f"Error processing chunk {chunk_id}: {str(e)}")
                            continue

                return f"Embeddings created and stored successfully for all {database_type} content."
            except Exception as e:
                logging.error(f"Error during embedding creation: {str(e)}")
                return f"Error: {str(e)}"

        # Event handlers
        embedding_provider.change(
            fn=update_provider_options,
            inputs=[embedding_provider],
            outputs=[huggingface_model, openai_model, custom_embedding_model, embedding_api_url]
        )

        huggingface_model.change(
            fn=update_huggingface_options,
            inputs=[huggingface_model],
            outputs=[custom_embedding_model]
        )

        database_selection.change(
            fn=update_database_path,
            inputs=[database_selection],
            outputs=[current_db_path]
        )

        create_button.click(
            fn=create_all_embeddings,
            inputs=[
                embedding_provider, huggingface_model, openai_model, custom_embedding_model,
                embedding_api_url, chunking_method, max_chunk_size, chunk_overlap,
                adaptive_chunking, database_selection
            ],
            outputs=status_output
        )


def create_view_embeddings_tab():
    # Load configuration first
    config = load_and_log_configs()
    if not config:
        raise ValueError("Could not load configuration")

    # Get database paths from config
    db_config = config['db_config']
    media_db_path = db_config['sqlite_path']
    rag_qa_db_path = os.path.join(os.path.dirname(media_db_path), "rag_chat.db")
    character_chat_db_path = os.path.join(os.path.dirname(media_db_path), "character_chat.db")
    chroma_db_path = db_config['chroma_db_path']

    with gr.TabItem("View/Update Embeddings", visible=True):
        gr.Markdown("# View and Update Embeddings")
        # Initialize item_mapping as a Gradio State
        with gr.Row():
            with gr.Column():
                # Add database selection
                database_selection = gr.Radio(
                    choices=["Media DB", "RAG Chat", "Character Chat"],
                    label="Select Content Source",
                    value="Media DB",
                    info="Choose which database to view embeddings from"
                )

                # Add database path display
                current_db_path = gr.Textbox(
                    label="Current Database Path",
                    value=media_db_path,
                    interactive=False
                )

                item_dropdown = gr.Dropdown(label="Select Item", choices=[], interactive=True)
                refresh_button = gr.Button("Refresh Item List")
gr.Button("Refresh Item List") embedding_status = gr.Textbox(label="Embedding Status", interactive=False) embedding_preview = gr.Textbox(label="Embedding Preview", interactive=False, lines=5) embedding_metadata = gr.Textbox(label="Embedding Metadata", interactive=False, lines=10) with gr.Column(): create_new_embedding_button = gr.Button("Create New Embedding") embedding_provider = gr.Radio( choices=["huggingface", "local", "openai"], label="Select Embedding Provider", value="huggingface" ) gr.Markdown("Note: Local provider requires a running Llama.cpp/llamafile server.") gr.Markdown("OpenAI provider requires a valid API key.") huggingface_model = gr.Dropdown( choices=[ "jinaai/jina-embeddings-v3", "Alibaba-NLP/gte-large-en-v1.5", "dunzhang/stella_en_400M_v5", "custom" ], label="Hugging Face Model", value="jinaai/jina-embeddings-v3", visible=True ) openai_model = gr.Dropdown( choices=[ "text-embedding-3-small", "text-embedding-3-large" ], label="OpenAI Embedding Model", value="text-embedding-3-small", visible=False ) custom_embedding_model = gr.Textbox( label="Custom Embedding Model", placeholder="Enter your custom embedding model name here", visible=False ) embedding_api_url = gr.Textbox( label="API URL (for local provider)", value=config['embedding_config']['embedding_api_url'], visible=False ) chunking_method = gr.Dropdown( choices=["words", "sentences", "paragraphs", "tokens", "semantic"], label="Chunking Method", value="words" ) max_chunk_size = gr.Slider( minimum=1, maximum=8000, step=5, value=500, label="Max Chunk Size" ) chunk_overlap = gr.Slider( minimum=0, maximum=5000, step=5, value=200, label="Chunk Overlap" ) adaptive_chunking = gr.Checkbox( label="Use Adaptive Chunking", value=False ) contextual_api_choice = gr.Dropdown( choices=["Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "Mistral", "OpenRouter", "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "ollama", "HuggingFace"], label="Select API for Contextualized Embeddings", value="OpenAI" ) use_contextual_embeddings = gr.Checkbox( label="Use Contextual Embeddings", value=True ) contextual_api_key = gr.Textbox(label="API Key", lines=1) item_mapping = gr.State(value={}) def update_database_path(database_type): if database_type == "Media DB": return media_db_path elif database_type == "RAG Chat": return rag_qa_db_path else: # Character Chat return character_chat_db_path def get_items_with_embedding_status(database_type): try: # Get items based on database selection if database_type == "Media DB": items = get_all_content_from_database() elif database_type == "RAG Chat": conversations, _, _ = get_all_conversations(page=1) items = [{ 'id': conv['conversation_id'], 'title': conv['title'], 'type': 'conversation' } for conv in conversations] else: # Character Chat notes, _, _ = get_all_notes(page=1) items = [{ 'id': note['id'], 'title': note['title'], 'type': 'note' } for note in notes] collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings" collection = chroma_client.get_or_create_collection(name=collection_name) choices = [] new_item_mapping = {} for item in items: try: chunk_id = f"{database_type.lower()}_{item['id']}_chunk_0" result = collection.get(ids=[chunk_id]) embedding_exists = result is not None and result.get('ids') and len(result['ids']) > 0 status = "Embedding exists" if embedding_exists else "No embedding" except Exception as e: print(f"Error checking embedding for item {item['id']}: {str(e)}") status = "Error checking" choice = f"{item['title']} ({status})" choices.append(choice) 
                    new_item_mapping[choice] = item['id']
                return gr.update(choices=choices), new_item_mapping
            except Exception as e:
                print(f"Error in get_items_with_embedding_status: {str(e)}")
                return gr.update(choices=["Error: Unable to fetch items"]), {}

        def update_provider_options(provider):
            if provider == "huggingface":
                return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
            elif provider == "local":
                return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)
            else:  # OpenAI
                return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        def update_huggingface_options(model):
            if model == "custom":
                return gr.update(visible=True)
            else:
                return gr.update(visible=False)

        def check_embedding_status(selected_item, database_type, item_mapping):
            if not selected_item:
                return "Please select an item", "", ""

            if item_mapping is None:
                # If mapping is None, try to refresh it
                try:
                    _, item_mapping = get_items_with_embedding_status(database_type)
                except Exception as e:
                    return f"Error initializing item mapping: {str(e)}", "", ""

            try:
                item_id = item_mapping.get(selected_item)
                if item_id is None:
                    return f"Invalid item selected: {selected_item}", "", ""

                item_title = selected_item.rsplit(' (', 1)[0]
                collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings"
                collection = chroma_client.get_or_create_collection(name=collection_name)
                chunk_id = f"{database_type.lower()}_{item_id}_chunk_0"

                try:
                    result = collection.get(ids=[chunk_id], include=["embeddings", "metadatas"])
                except Exception as e:
                    logging.error(f"ChromaDB get error: {str(e)}")
                    return f"Error retrieving embedding for '{item_title}': {str(e)}", "", ""

                # Check if result exists and has the expected structure
                if not result or not isinstance(result, dict):
                    return f"No embedding found for item '{item_title}' (ID: {item_id})", "", ""

                # Check if we have any results
                if not result.get('ids') or len(result['ids']) == 0:
                    return f"No embedding found for item '{item_title}' (ID: {item_id})", "", ""

                # Check if embeddings exist
                if not result.get('embeddings') or not result['embeddings'][0]:
                    return f"Embedding data missing for item '{item_title}' (ID: {item_id})", "", ""

                embedding = result['embeddings'][0]
                metadata = result.get('metadatas', [{}])[0] if result.get('metadatas') else {}
                embedding_preview = str(embedding[:50])
                status = f"Embedding exists for item '{item_title}' (ID: {item_id})"
                return status, f"First 50 elements of embedding:\n{embedding_preview}", json.dumps(metadata, indent=2)

            except Exception as e:
                logging.error(f"Error in check_embedding_status: {str(e)}", exc_info=True)
                return f"Error processing item: {selected_item}. Details: {str(e)}", "", ""
        def refresh_and_update(database_type):
            choices_update, new_mapping = get_items_with_embedding_status(database_type)
            return choices_update, new_mapping

        def create_new_embedding_for_item(selected_item, database_type, provider, hf_model, openai_model,
                                          custom_model, api_url, method, max_size, overlap, adaptive,
                                          item_mapping, use_contextual, contextual_api_choice=None):
            if not selected_item:
                return "Please select an item", "", ""

            try:
                item_id = item_mapping.get(selected_item)
                if item_id is None:
                    return f"Invalid item selected: {selected_item}", "", ""

                # Get item content based on database type
                if database_type == "Media DB":
                    items = get_all_content_from_database()
                    item = next((item for item in items if item['id'] == item_id), None)
                elif database_type == "RAG Chat":
                    item = {
                        'id': item_id,
                        'content': get_conversation_text(item_id),
                        'title': selected_item.rsplit(' (', 1)[0],
                        'type': 'conversation'
                    }
                else:  # Character Chat
                    note = get_note_by_id(item_id)
                    item = {
                        'id': item_id,
                        'content': f"{note['title']}\n\n{note['content']}",
                        'title': note['title'],
                        'type': 'note'
                    }

                if not item:
                    return f"Item not found: {item_id}", "", ""

                chunk_options = {
                    'method': method,
                    'max_size': max_size,
                    'overlap': overlap,
                    'adaptive': adaptive
                }

                logging.info(f"Chunking content for item: {item['title']} (ID: {item_id})")
                chunks = chunk_for_embedding(item['content'], item['title'], chunk_options)
                collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings"
                collection = chroma_client.get_or_create_collection(name=collection_name)

                # Delete existing embeddings for this item
                existing_ids = [f"{database_type.lower()}_{item_id}_chunk_{i}" for i in range(len(chunks))]
                collection.delete(ids=existing_ids)
                logging.info(f"Deleted {len(existing_ids)} existing embeddings for item {item_id}")

                texts, ids, metadatas = [], [], []
                chunk_count = 0
                logging.info("Generating contextual summaries and preparing chunks for embedding")
                for i, chunk in enumerate(chunks):
                    chunk_text = chunk['text']
                    chunk_metadata = chunk['metadata']
                    if use_contextual:
                        logging.debug(f"Generating contextual summary for chunk {chunk_count}")
                        context = situate_context(contextual_api_choice, item['content'], chunk_text)
                        contextualized_text = f"{chunk_text}\n\nContextual Summary: {context}"
                    else:
                        contextualized_text = chunk_text
                        context = None

                    chunk_id = f"{database_type.lower()}_{item_id}_chunk_{i}"

                    # Determine the model to use
                    if provider == "huggingface":
                        model = custom_model if hf_model == "custom" else hf_model
                    elif provider == "openai":
                        model = openai_model
                    else:
                        model = custom_model

                    metadata = {
                        "content_id": str(item_id),
                        "chunk_index": i,
                        "total_chunks": len(chunks),
                        "chunking_method": method,
                        "max_chunk_size": max_size,
                        "chunk_overlap": overlap,
                        "adaptive_chunking": adaptive,
                        "embedding_model": model,
                        "embedding_provider": provider,
                        "original_text": chunk_text,
                        "use_contextual_embeddings": use_contextual,
                        "contextual_summary": context,
                        **chunk_metadata
                    }

                    texts.append(contextualized_text)
                    ids.append(chunk_id)
                    metadatas.append(metadata)
                    chunk_count += 1

                # Create embeddings in batch
                logging.info(f"Creating embeddings for {len(texts)} chunks")
                embeddings = create_embeddings_batch(texts, provider, model, api_url)

                # Store in Chroma
                store_in_chroma(collection_name, texts, embeddings, ids, metadatas)

                # Create a preview of the first embedding
                if isinstance(embeddings, np.ndarray) and embeddings.size > 0:
                    embedding_preview = str(embeddings[0][:50])
                elif isinstance(embeddings, list) and len(embeddings) > 0:
                    embedding_preview = str(embeddings[0][:50])
                else:
                    embedding_preview = "No embeddings created"

                # Return status message
                status = f"New embeddings created and stored for item: {item['title']} (ID: {item_id})"
                # Add contextual summaries to status message if enabled
                if use_contextual:
                    status += " (with contextual summaries)"

                # Return status message, embedding preview, and metadata
                return status, f"First 50 elements of new embedding:\n{embedding_preview}", json.dumps(metadatas[0], indent=2)
            except Exception as e:
                logging.error(f"Error in create_new_embedding_for_item: {str(e)}", exc_info=True)
                return f"Error creating embedding: {str(e)}", "", ""

        # Wire up all the event handlers
        database_selection.change(
            update_database_path,
            inputs=[database_selection],
            outputs=[current_db_path]
        )

        refresh_button.click(
            get_items_with_embedding_status,
            inputs=[database_selection],
            outputs=[item_dropdown, item_mapping]
        )

        item_dropdown.change(
            check_embedding_status,
            inputs=[item_dropdown, database_selection, item_mapping],
            outputs=[embedding_status, embedding_preview, embedding_metadata]
        )

        create_new_embedding_button.click(
            create_new_embedding_for_item,
            inputs=[item_dropdown, database_selection, embedding_provider, huggingface_model, openai_model,
                    custom_embedding_model, embedding_api_url, chunking_method, max_chunk_size, chunk_overlap,
                    adaptive_chunking, item_mapping, use_contextual_embeddings, contextual_api_choice],
            outputs=[embedding_status, embedding_preview, embedding_metadata]
        )

        embedding_provider.change(
            update_provider_options,
            inputs=[embedding_provider],
            outputs=[huggingface_model, openai_model, custom_embedding_model, embedding_api_url]
        )

        huggingface_model.change(
            update_huggingface_options,
            inputs=[huggingface_model],
            outputs=[custom_embedding_model]
        )

    return (item_dropdown, refresh_button, embedding_status, embedding_preview, embedding_metadata,
            create_new_embedding_button, embedding_provider, huggingface_model, openai_model,
            custom_embedding_model, embedding_api_url, chunking_method, max_chunk_size, chunk_overlap,
            adaptive_chunking, use_contextual_embeddings, contextual_api_choice, contextual_api_key)


def create_purge_embeddings_tab():
    with gr.TabItem("Purge Embeddings", visible=True):
        gr.Markdown("# Purge Embeddings")

        with gr.Row():
            with gr.Column():
                purge_button = gr.Button("Purge All Embeddings")
            with gr.Column():
                status_output = gr.Textbox(label="Status", lines=10)

        def purge_all_embeddings():
            try:
                # Drop the collection and recreate it so an empty collection of the same name remains afterwards.
                collection_name = "all_content_embeddings"
                chroma_client.delete_collection(collection_name)
                chroma_client.create_collection(collection_name)
                logging.info("All embeddings have been purged successfully.")
                return "All embeddings have been purged successfully."
            except Exception as e:
                logging.error(f"Error during embedding purge: {str(e)}")
                return f"Error: {str(e)}"

        purge_button.click(
            fn=purge_all_embeddings,
            outputs=status_output
        )

#
# End of file
########################################################################################################################
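#
# Illustrative usage sketch (an assumption, not part of the original module): the tab builders above only
# construct gr.TabItem blocks and are expected to be mounted inside a gr.Blocks()/gr.Tabs() layout by the
# main application. Running this file directly with the block below would stand up a minimal standalone demo,
# provided the configuration, databases, and ChromaDB client referenced above are available.
if __name__ == "__main__":
    with gr.Blocks() as demo:
        with gr.Tabs():
            create_embeddings_tab()
            create_view_embeddings_tab()
            create_purge_embeddings_tab()
    demo.launch()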