oceansweep's picture
Upload 169 files
c5b0bb7 verified
raw
history blame
33 kB
# Embeddings_tabc.py
# Description: This file contains the code for the RAG Chat tab in the Gradio UI
#
# Imports
import json
import logging
import os
#
# External Imports
import gradio as gr
import numpy as np
from tqdm import tqdm
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import get_all_content_from_database, get_all_conversations, \
get_conversation_text, get_note_by_id
from App_Function_Libraries.DB.RAG_QA_Chat_DB import get_all_notes
from App_Function_Libraries.RAG.ChromaDB_Library import chroma_client, \
store_in_chroma, situate_context
from App_Function_Libraries.RAG.Embeddings_Create import create_embedding, create_embeddings_batch
from App_Function_Libraries.Chunk_Lib import improved_chunking_process, chunk_for_embedding
from App_Function_Libraries.Utils.Utils import load_and_log_configs
#
########################################################################################################################
#
# Functions:
def create_embeddings_tab():
# Load configuration first
config = load_and_log_configs()
if not config:
raise ValueError("Could not load configuration")
# Get database paths from config
db_config = config['db_config']
media_db_path = db_config['sqlite_path']
rag_qa_db_path = os.path.join(os.path.dirname(media_db_path), "rag_qa.db")
character_chat_db_path = os.path.join(os.path.dirname(media_db_path), "chatDB.db")
chroma_db_path = db_config['chroma_db_path']
with gr.TabItem("Create Embeddings", visible=True):
gr.Markdown("# Create Embeddings for All Content")
with gr.Row():
with gr.Column():
# Database selection at the top
database_selection = gr.Radio(
choices=["Media DB", "RAG Chat", "Character Chat"],
label="Select Content Source",
value="Media DB",
info="Choose which database to create embeddings from"
)
# Add database path display
current_db_path = gr.Textbox(
label="Current Database Path",
value=media_db_path,
interactive=False
)
embedding_provider = gr.Radio(
choices=["huggingface", "local", "openai"],
label="Select Embedding Provider",
value=config['embedding_config']['embedding_provider'] or "huggingface"
)
gr.Markdown("Note: Local provider requires a running Llama.cpp/llamafile server.")
gr.Markdown("OpenAI provider requires a valid API key.")
huggingface_model = gr.Dropdown(
choices=[
"jinaai/jina-embeddings-v3",
"Alibaba-NLP/gte-large-en-v1.5",
"dunzhang/setll_en_400M_v5",
"custom"
],
label="Hugging Face Model",
value="jinaai/jina-embeddings-v3",
visible=True
)
openai_model = gr.Dropdown(
choices=[
"text-embedding-3-small",
"text-embedding-3-large"
],
label="OpenAI Embedding Model",
value="text-embedding-3-small",
visible=False
)
custom_embedding_model = gr.Textbox(
label="Custom Embedding Model",
placeholder="Enter your custom embedding model name here",
visible=False
)
embedding_api_url = gr.Textbox(
label="API URL (for local provider)",
value=config['embedding_config']['embedding_api_url'],
visible=False
)
# Add chunking options with config defaults
chunking_method = gr.Dropdown(
choices=["words", "sentences", "paragraphs", "tokens", "semantic"],
label="Chunking Method",
value="words"
)
max_chunk_size = gr.Slider(
minimum=1, maximum=8000, step=1,
value=config['embedding_config']['chunk_size'],
label="Max Chunk Size"
)
chunk_overlap = gr.Slider(
minimum=0, maximum=4000, step=1,
value=config['embedding_config']['overlap'],
label="Chunk Overlap"
)
adaptive_chunking = gr.Checkbox(
label="Use Adaptive Chunking",
value=False
)
create_button = gr.Button("Create Embeddings")
with gr.Column():
status_output = gr.Textbox(label="Status", lines=10)
progress = gr.Progress()
def update_provider_options(provider):
if provider == "huggingface":
return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
elif provider == "local":
return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)
else: # OpenAI
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
def update_huggingface_options(model):
if model == "custom":
return gr.update(visible=True)
else:
return gr.update(visible=False)
def update_database_path(database_type):
if database_type == "Media DB":
return media_db_path
elif database_type == "RAG Chat":
return rag_qa_db_path
else: # Character Chat
return character_chat_db_path
def create_all_embeddings(provider, hf_model, openai_model, custom_model, api_url, method,
max_size, overlap, adaptive, database_type, progress=gr.Progress()):
try:
# Initialize content based on database selection
if database_type == "Media DB":
all_content = get_all_content_from_database()
content_type = "media"
elif database_type == "RAG Chat":
all_content = []
page = 1
while True:
conversations, total_pages, _ = get_all_conversations(page=page)
if not conversations:
break
all_content.extend([{
'id': conv['conversation_id'],
'content': get_conversation_text(conv['conversation_id']),
'title': conv['title'],
'type': 'conversation'
} for conv in conversations])
progress(page / total_pages, desc=f"Loading conversations... Page {page}/{total_pages}")
page += 1
else: # Character Chat
all_content = []
page = 1
while True:
notes, total_pages, _ = get_all_notes(page=page)
if not notes:
break
all_content.extend([{
'id': note['id'],
'content': f"{note['title']}\n\n{note['content']}",
'conversation_id': note['conversation_id'],
'type': 'note'
} for note in notes])
progress(page / total_pages, desc=f"Loading notes... Page {page}/{total_pages}")
page += 1
if not all_content:
return "No content found in the selected database."
chunk_options = {
'method': method,
'max_size': max_size,
'overlap': overlap,
'adaptive': adaptive
}
collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings"
collection = chroma_client.get_or_create_collection(name=collection_name)
# Determine the model to use
if provider == "huggingface":
model = custom_model if hf_model == "custom" else hf_model
elif provider == "openai":
model = openai_model
else:
model = api_url
total_items = len(all_content)
for idx, item in enumerate(all_content):
progress((idx + 1) / total_items, desc=f"Processing item {idx + 1} of {total_items}")
content_id = item['id']
text = item['content']
chunks = improved_chunking_process(text, chunk_options)
for chunk_idx, chunk in enumerate(chunks):
chunk_text = chunk['text']
chunk_id = f"{database_type.lower()}_{content_id}_chunk_{chunk_idx}"
try:
embedding = create_embedding(chunk_text, provider, model, api_url)
metadata = {
'content_id': str(content_id),
'chunk_index': int(chunk_idx),
'total_chunks': int(len(chunks)),
'chunking_method': method,
'max_chunk_size': int(max_size),
'chunk_overlap': int(overlap),
'adaptive_chunking': bool(adaptive),
'embedding_model': model,
'embedding_provider': provider,
'content_type': item.get('type', 'media'),
'conversation_id': item.get('conversation_id'),
**{k: (int(v) if isinstance(v, str) and v.isdigit() else v)
for k, v in chunk['metadata'].items()}
}
store_in_chroma(collection_name, [chunk_text], [embedding], [chunk_id], [metadata])
except Exception as e:
logging.error(f"Error processing chunk {chunk_id}: {str(e)}")
continue
return f"Embeddings created and stored successfully for all {database_type} content."
except Exception as e:
logging.error(f"Error during embedding creation: {str(e)}")
return f"Error: {str(e)}"
# Event handlers
embedding_provider.change(
fn=update_provider_options,
inputs=[embedding_provider],
outputs=[huggingface_model, openai_model, custom_embedding_model, embedding_api_url]
)
huggingface_model.change(
fn=update_huggingface_options,
inputs=[huggingface_model],
outputs=[custom_embedding_model]
)
database_selection.change(
fn=update_database_path,
inputs=[database_selection],
outputs=[current_db_path]
)
create_button.click(
fn=create_all_embeddings,
inputs=[
embedding_provider, huggingface_model, openai_model, custom_embedding_model,
embedding_api_url, chunking_method, max_chunk_size, chunk_overlap,
adaptive_chunking, database_selection
],
outputs=status_output
)
def create_view_embeddings_tab():
# Load configuration first
config = load_and_log_configs()
if not config:
raise ValueError("Could not load configuration")
# Get database paths from config
db_config = config['db_config']
media_db_path = db_config['sqlite_path']
rag_qa_db_path = os.path.join(os.path.dirname(media_db_path), "rag_chat.db")
character_chat_db_path = os.path.join(os.path.dirname(media_db_path), "character_chat.db")
chroma_db_path = db_config['chroma_db_path']
with gr.TabItem("View/Update Embeddings", visible=True):
gr.Markdown("# View and Update Embeddings")
# Initialize item_mapping as a Gradio State
with gr.Row():
with gr.Column():
# Add database selection
database_selection = gr.Radio(
choices=["Media DB", "RAG Chat", "Character Chat"],
label="Select Content Source",
value="Media DB",
info="Choose which database to view embeddings from"
)
# Add database path display
current_db_path = gr.Textbox(
label="Current Database Path",
value=media_db_path,
interactive=False
)
item_dropdown = gr.Dropdown(label="Select Item", choices=[], interactive=True)
refresh_button = gr.Button("Refresh Item List")
embedding_status = gr.Textbox(label="Embedding Status", interactive=False)
embedding_preview = gr.Textbox(label="Embedding Preview", interactive=False, lines=5)
embedding_metadata = gr.Textbox(label="Embedding Metadata", interactive=False, lines=10)
with gr.Column():
create_new_embedding_button = gr.Button("Create New Embedding")
embedding_provider = gr.Radio(
choices=["huggingface", "local", "openai"],
label="Select Embedding Provider",
value="huggingface"
)
gr.Markdown("Note: Local provider requires a running Llama.cpp/llamafile server.")
gr.Markdown("OpenAI provider requires a valid API key.")
huggingface_model = gr.Dropdown(
choices=[
"jinaai/jina-embeddings-v3",
"Alibaba-NLP/gte-large-en-v1.5",
"dunzhang/stella_en_400M_v5",
"custom"
],
label="Hugging Face Model",
value="jinaai/jina-embeddings-v3",
visible=True
)
openai_model = gr.Dropdown(
choices=[
"text-embedding-3-small",
"text-embedding-3-large"
],
label="OpenAI Embedding Model",
value="text-embedding-3-small",
visible=False
)
custom_embedding_model = gr.Textbox(
label="Custom Embedding Model",
placeholder="Enter your custom embedding model name here",
visible=False
)
embedding_api_url = gr.Textbox(
label="API URL (for local provider)",
value=config['embedding_config']['embedding_api_url'],
visible=False
)
chunking_method = gr.Dropdown(
choices=["words", "sentences", "paragraphs", "tokens", "semantic"],
label="Chunking Method",
value="words"
)
max_chunk_size = gr.Slider(
minimum=1, maximum=8000, step=5, value=500,
label="Max Chunk Size"
)
chunk_overlap = gr.Slider(
minimum=0, maximum=5000, step=5, value=200,
label="Chunk Overlap"
)
adaptive_chunking = gr.Checkbox(
label="Use Adaptive Chunking",
value=False
)
contextual_api_choice = gr.Dropdown(
choices=["Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "Mistral", "OpenRouter", "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "ollama", "HuggingFace"],
label="Select API for Contextualized Embeddings",
value="OpenAI"
)
use_contextual_embeddings = gr.Checkbox(
label="Use Contextual Embeddings",
value=True
)
contextual_api_key = gr.Textbox(label="API Key", lines=1)
item_mapping = gr.State(value={})
def update_database_path(database_type):
if database_type == "Media DB":
return media_db_path
elif database_type == "RAG Chat":
return rag_qa_db_path
else: # Character Chat
return character_chat_db_path
def get_items_with_embedding_status(database_type):
try:
# Get items based on database selection
if database_type == "Media DB":
items = get_all_content_from_database()
elif database_type == "RAG Chat":
conversations, _, _ = get_all_conversations(page=1)
items = [{
'id': conv['conversation_id'],
'title': conv['title'],
'type': 'conversation'
} for conv in conversations]
else: # Character Chat
notes, _, _ = get_all_notes(page=1)
items = [{
'id': note['id'],
'title': note['title'],
'type': 'note'
} for note in notes]
collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings"
collection = chroma_client.get_or_create_collection(name=collection_name)
choices = []
new_item_mapping = {}
for item in items:
try:
chunk_id = f"{database_type.lower()}_{item['id']}_chunk_0"
result = collection.get(ids=[chunk_id])
embedding_exists = result is not None and result.get('ids') and len(result['ids']) > 0
status = "Embedding exists" if embedding_exists else "No embedding"
except Exception as e:
print(f"Error checking embedding for item {item['id']}: {str(e)}")
status = "Error checking"
choice = f"{item['title']} ({status})"
choices.append(choice)
new_item_mapping[choice] = item['id']
return gr.update(choices=choices), new_item_mapping
except Exception as e:
print(f"Error in get_items_with_embedding_status: {str(e)}")
return gr.update(choices=["Error: Unable to fetch items"]), {}
def update_provider_options(provider):
if provider == "huggingface":
return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
elif provider == "local":
return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)
else: # OpenAI
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
def update_huggingface_options(model):
if model == "custom":
return gr.update(visible=True)
else:
return gr.update(visible=False)
def check_embedding_status(selected_item, database_type, item_mapping):
if not selected_item:
return "Please select an item", "", ""
if item_mapping is None:
# If mapping is None, try to refresh it
try:
_, item_mapping = get_items_with_embedding_status(database_type)
except Exception as e:
return f"Error initializing item mapping: {str(e)}", "", ""
try:
item_id = item_mapping.get(selected_item)
if item_id is None:
return f"Invalid item selected: {selected_item}", "", ""
item_title = selected_item.rsplit(' (', 1)[0]
collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings"
collection = chroma_client.get_or_create_collection(name=collection_name)
chunk_id = f"{database_type.lower()}_{item_id}_chunk_0"
try:
result = collection.get(ids=[chunk_id], include=["embeddings", "metadatas"])
except Exception as e:
logging.error(f"ChromaDB get error: {str(e)}")
return f"Error retrieving embedding for '{item_title}': {str(e)}", "", ""
# Check if result exists and has the expected structure
if not result or not isinstance(result, dict):
return f"No embedding found for item '{item_title}' (ID: {item_id})", "", ""
# Check if we have any results
if not result.get('ids') or len(result['ids']) == 0:
return f"No embedding found for item '{item_title}' (ID: {item_id})", "", ""
# Check if embeddings exist
if not result.get('embeddings') or not result['embeddings'][0]:
return f"Embedding data missing for item '{item_title}' (ID: {item_id})", "", ""
embedding = result['embeddings'][0]
metadata = result.get('metadatas', [{}])[0] if result.get('metadatas') else {}
embedding_preview = str(embedding[:50])
status = f"Embedding exists for item '{item_title}' (ID: {item_id})"
return status, f"First 50 elements of embedding:\n{embedding_preview}", json.dumps(metadata, indent=2)
except Exception as e:
logging.error(f"Error in check_embedding_status: {str(e)}", exc_info=True)
return f"Error processing item: {selected_item}. Details: {str(e)}", "", ""
def refresh_and_update(database_type):
choices_update, new_mapping = get_items_with_embedding_status(database_type)
return choices_update, new_mapping
def create_new_embedding_for_item(selected_item, database_type, provider, hf_model, openai_model,
custom_model, api_url, method, max_size, overlap, adaptive,
item_mapping, use_contextual, contextual_api_choice=None):
if not selected_item:
return "Please select an item", "", ""
try:
item_id = item_mapping.get(selected_item)
if item_id is None:
return f"Invalid item selected: {selected_item}", "", ""
# Get item content based on database type
if database_type == "Media DB":
items = get_all_content_from_database()
item = next((item for item in items if item['id'] == item_id), None)
elif database_type == "RAG Chat":
item = {
'id': item_id,
'content': get_conversation_text(item_id),
'title': selected_item.rsplit(' (', 1)[0],
'type': 'conversation'
}
else: # Character Chat
note = get_note_by_id(item_id)
item = {
'id': item_id,
'content': f"{note['title']}\n\n{note['content']}",
'title': note['title'],
'type': 'note'
}
if not item:
return f"Item not found: {item_id}", "", ""
chunk_options = {
'method': method,
'max_size': max_size,
'overlap': overlap,
'adaptive': adaptive
}
logging.info(f"Chunking content for item: {item['title']} (ID: {item_id})")
chunks = chunk_for_embedding(item['content'], item['title'], chunk_options)
collection_name = f"{database_type.lower().replace(' ', '_')}_embeddings"
collection = chroma_client.get_or_create_collection(name=collection_name)
# Delete existing embeddings for this item
existing_ids = [f"{database_type.lower()}_{item_id}_chunk_{i}" for i in range(len(chunks))]
collection.delete(ids=existing_ids)
logging.info(f"Deleted {len(existing_ids)} existing embeddings for item {item_id}")
texts, ids, metadatas = [], [], []
chunk_count = 0
logging.info("Generating contextual summaries and preparing chunks for embedding")
for i, chunk in enumerate(chunks):
chunk_text = chunk['text']
chunk_metadata = chunk['metadata']
if use_contextual:
logging.debug(f"Generating contextual summary for chunk {chunk_count}")
context = situate_context(contextual_api_choice, item['content'], chunk_text)
contextualized_text = f"{chunk_text}\n\nContextual Summary: {context}"
else:
contextualized_text = chunk_text
context = None
chunk_id = f"{database_type.lower()}_{item_id}_chunk_{i}"
# Determine the model to use
if provider == "huggingface":
model = custom_model if hf_model == "custom" else hf_model
elif provider == "openai":
model = openai_model
else:
model = custom_model
metadata = {
"content_id": str(item_id),
"chunk_index": i,
"total_chunks": len(chunks),
"chunking_method": method,
"max_chunk_size": max_size,
"chunk_overlap": overlap,
"adaptive_chunking": adaptive,
"embedding_model": model,
"embedding_provider": provider,
"original_text": chunk_text,
"use_contextual_embeddings": use_contextual,
"contextual_summary": context,
**chunk_metadata
}
texts.append(contextualized_text)
ids.append(chunk_id)
metadatas.append(metadata)
chunk_count += 1
# Create embeddings in batch
logging.info(f"Creating embeddings for {len(texts)} chunks")
embeddings = create_embeddings_batch(texts, provider, model, api_url)
# Store in Chroma
store_in_chroma(collection_name, texts, embeddings, ids, metadatas)
# Create a preview of the first embedding
if isinstance(embeddings, np.ndarray) and embeddings.size > 0:
embedding_preview = str(embeddings[0][:50])
elif isinstance(embeddings, list) and len(embeddings) > 0:
embedding_preview = str(embeddings[0][:50])
else:
embedding_preview = "No embeddings created"
# Return status message
status = f"New embeddings created and stored for item: {item['title']} (ID: {item_id})"
# Add contextual summaries to status message if enabled
if use_contextual:
status += " (with contextual summaries)"
# Return status message, embedding preview, and metadata
return status, f"First 50 elements of new embedding:\n{embedding_preview}", json.dumps(metadatas[0],
indent=2)
except Exception as e:
logging.error(f"Error in create_new_embedding_for_item: {str(e)}", exc_info=True)
return f"Error creating embedding: {str(e)}", "", ""
# Wire up all the event handlers
database_selection.change(
update_database_path,
inputs=[database_selection],
outputs=[current_db_path]
)
refresh_button.click(
get_items_with_embedding_status,
inputs=[database_selection],
outputs=[item_dropdown, item_mapping]
)
item_dropdown.change(
check_embedding_status,
inputs=[item_dropdown, database_selection, item_mapping],
outputs=[embedding_status, embedding_preview, embedding_metadata]
)
create_new_embedding_button.click(
create_new_embedding_for_item,
inputs=[item_dropdown, embedding_provider, huggingface_model, openai_model, custom_embedding_model, embedding_api_url,
chunking_method, max_chunk_size, chunk_overlap, adaptive_chunking, item_mapping,
use_contextual_embeddings, contextual_api_choice],
outputs=[embedding_status, embedding_preview, embedding_metadata]
)
embedding_provider.change(
update_provider_options,
inputs=[embedding_provider],
outputs=[huggingface_model, openai_model, custom_embedding_model, embedding_api_url]
)
huggingface_model.change(
update_huggingface_options,
inputs=[huggingface_model],
outputs=[custom_embedding_model]
)
return (item_dropdown, refresh_button, embedding_status, embedding_preview, embedding_metadata,
create_new_embedding_button, embedding_provider, huggingface_model, openai_model,
custom_embedding_model, embedding_api_url, chunking_method, max_chunk_size,
chunk_overlap, adaptive_chunking, use_contextual_embeddings,
contextual_api_choice, contextual_api_key)
def create_purge_embeddings_tab():
with gr.TabItem("Purge Embeddings", visible=True):
gr.Markdown("# Purge Embeddings")
with gr.Row():
with gr.Column():
purge_button = gr.Button("Purge All Embeddings")
with gr.Column():
status_output = gr.Textbox(label="Status", lines=10)
def purge_all_embeddings():
try:
# It came to me in a dream....I literally don't remember how the fuck this works, cant find documentation...
collection_name = "all_content_embeddings"
chroma_client.delete_collection(collection_name)
chroma_client.create_collection(collection_name)
logging.info(f"All embeddings have been purged successfully.")
return "All embeddings have been purged successfully."
except Exception as e:
logging.error(f"Error during embedding purge: {str(e)}")
return f"Error: {str(e)}"
purge_button.click(
fn=purge_all_embeddings,
outputs=status_output
)
#
# End of file
########################################################################################################################