oceansweep's picture
Upload 13 files
4e1f4a3 verified
raw
history blame
16.8 kB
# Embeddings_tabc.py
# Description: This file contains the code for the RAG Chat tab in the Gradio UI
#
# Imports
import json
import logging
#
# External Imports
import gradio as gr
from App_Function_Libraries.Chunk_Lib import improved_chunking_process, determine_chunk_position
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import get_all_content_from_database
from App_Function_Libraries.RAG.ChromaDB_Library import chroma_client, \
store_in_chroma
from App_Function_Libraries.RAG.Embeddings_Create import create_embedding
#
########################################################################################################################
#
# Functions:
# FIXME - under construction
def create_embeddings_tab():
with gr.TabItem("Create Embeddings"):
gr.Markdown("# Create Embeddings for All Content")
with gr.Row():
with gr.Column():
embedding_provider = gr.Radio(
choices=["huggingface", "local", "openai"],
label="Select Embedding Provider",
value="huggingface"
)
gr.Markdown("Note: Local provider requires a running Llama.cpp/llamafile server.")
gr.Markdown("OpenAI provider requires a valid API key. ")
gr.Markdown("OpenAI Embeddings models: `text-embedding-3-small`, `text-embedding-3-large`")
gr.Markdown("HuggingFace provider requires a valid model name, i.e. `dunzhang/stella_en_400M_v5`")
embedding_model = gr.Textbox(
label="Embedding Model",
value="Enter your embedding model name here", lines=3
)
embedding_api_url = gr.Textbox(
label="API URL (for local provider)",
value="http://localhost:8080/embedding",
visible=False
)
# Add chunking options
chunking_method = gr.Dropdown(
choices=["words", "sentences", "paragraphs", "tokens", "semantic"],
label="Chunking Method",
value="words"
)
max_chunk_size = gr.Slider(
minimum=1, maximum=8000, step=1, value=500,
label="Max Chunk Size"
)
chunk_overlap = gr.Slider(
minimum=0, maximum=4000, step=1, value=200,
label="Chunk Overlap"
)
adaptive_chunking = gr.Checkbox(
label="Use Adaptive Chunking",
value=False
)
create_button = gr.Button("Create Embeddings")
with gr.Column():
status_output = gr.Textbox(label="Status", lines=10)
def update_provider_options(provider):
return gr.update(visible=provider == "local")
embedding_provider.change(
fn=update_provider_options,
inputs=[embedding_provider],
outputs=[embedding_api_url]
)
def create_all_embeddings(provider, model, api_url, method, max_size, overlap, adaptive):
try:
all_content = get_all_content_from_database()
if not all_content:
return "No content found in the database."
chunk_options = {
'method': method,
'max_size': max_size,
'overlap': overlap,
'adaptive': adaptive
}
collection_name = "all_content_embeddings"
collection = chroma_client.get_or_create_collection(name=collection_name)
for item in all_content:
media_id = item['id']
text = item['content']
chunks = improved_chunking_process(text, chunk_options)
for i, chunk in enumerate(chunks):
chunk_text = chunk['text']
chunk_id = f"doc_{media_id}_chunk_{i}"
existing = collection.get(ids=[chunk_id])
if existing['ids']:
continue
embedding = create_embedding(chunk_text, provider, model, api_url)
metadata = {
"media_id": str(media_id),
"chunk_index": i,
"total_chunks": len(chunks),
"chunking_method": method,
"max_chunk_size": max_size,
"chunk_overlap": overlap,
"adaptive_chunking": adaptive,
"embedding_model": model,
"embedding_provider": provider,
**chunk['metadata']
}
store_in_chroma(collection_name, [chunk_text], [embedding], [chunk_id], [metadata])
return "Embeddings created and stored successfully for all content."
except Exception as e:
logging.error(f"Error during embedding creation: {str(e)}")
return f"Error: {str(e)}"
create_button.click(
fn=create_all_embeddings,
inputs=[embedding_provider, embedding_model, embedding_api_url,
chunking_method, max_chunk_size, chunk_overlap, adaptive_chunking],
outputs=status_output
)
def create_view_embeddings_tab():
with gr.TabItem("View/Update Embeddings"):
gr.Markdown("# View and Update Embeddings")
item_mapping = gr.State({})
with gr.Row():
with gr.Column():
item_dropdown = gr.Dropdown(label="Select Item", choices=[], interactive=True)
refresh_button = gr.Button("Refresh Item List")
embedding_status = gr.Textbox(label="Embedding Status", interactive=False)
embedding_preview = gr.Textbox(label="Embedding Preview", interactive=False, lines=5)
embedding_metadata = gr.Textbox(label="Embedding Metadata", interactive=False, lines=10)
with gr.Column():
create_new_embedding_button = gr.Button("Create New Embedding")
embedding_provider = gr.Radio(
choices=["huggingface", "local", "openai"],
label="Select Embedding Provider",
value="huggingface"
)
gr.Markdown("Note: Local provider requires a running Llama.cpp/llamafile server.")
gr.Markdown("OpenAI provider requires a valid API key. ")
gr.Markdown("OpenAI Embeddings models: `text-embedding-3-small`, `text-embedding-3-large`")
gr.Markdown("HuggingFace provider requires a valid model name, i.e. `dunzhang/stella_en_400M_v5`")
embedding_model = gr.Textbox(
label="Embedding Model",
value="Enter your embedding model name here", lines=3
)
embedding_api_url = gr.Textbox(
label="API URL (for local provider)",
value="http://localhost:8080/embedding",
visible=False
)
chunking_method = gr.Dropdown(
choices=["words", "sentences", "paragraphs", "tokens", "semantic"],
label="Chunking Method",
value="words"
)
max_chunk_size = gr.Slider(
minimum=1, maximum=8000, step=1, value=500,
label="Max Chunk Size"
)
chunk_overlap = gr.Slider(
minimum=0, maximum=5000, step=1, value=200,
label="Chunk Overlap"
)
adaptive_chunking = gr.Checkbox(
label="Use Adaptive Chunking",
value=False
)
def get_items_with_embedding_status():
try:
items = get_all_content_from_database()
collection = chroma_client.get_or_create_collection(name="all_content_embeddings")
choices = []
new_item_mapping = {}
for item in items:
try:
result = collection.get(ids=[f"doc_{item['id']}_chunk_0"])
embedding_exists = result is not None and result.get('ids') and len(result['ids']) > 0
status = "Embedding exists" if embedding_exists else "No embedding"
except Exception as e:
print(f"Error checking embedding for item {item['id']}: {str(e)}")
status = "Error checking"
choice = f"{item['title']} ({status})"
choices.append(choice)
new_item_mapping[choice] = item['id']
return gr.update(choices=choices), new_item_mapping
except Exception as e:
print(f"Error in get_items_with_embedding_status: {str(e)}")
return gr.update(choices=["Error: Unable to fetch items"]), {}
def update_provider_options(provider):
return gr.update(visible=provider == "local")
def check_embedding_status(selected_item, item_mapping):
if not selected_item:
return "Please select an item", "", ""
try:
item_id = item_mapping.get(selected_item)
if item_id is None:
return f"Invalid item selected: {selected_item}", "", ""
item_title = selected_item.rsplit(' (', 1)[0]
collection = chroma_client.get_or_create_collection(name="all_content_embeddings")
result = collection.get(ids=[f"doc_{item_id}_chunk_0"], include=["embeddings", "metadatas"])
logging.info(f"ChromaDB result for item '{item_title}' (ID: {item_id}): {result}")
if not result['ids']:
return f"No embedding found for item '{item_title}' (ID: {item_id})", "", ""
if not result['embeddings'] or not result['embeddings'][0]:
return f"Embedding data missing for item '{item_title}' (ID: {item_id})", "", ""
embedding = result['embeddings'][0]
metadata = result['metadatas'][0] if result['metadatas'] else {}
embedding_preview = str(embedding[:50])
status = f"Embedding exists for item '{item_title}' (ID: {item_id})"
return status, f"First 50 elements of embedding:\n{embedding_preview}", json.dumps(metadata, indent=2)
except Exception as e:
logging.error(f"Error in check_embedding_status: {str(e)}")
return f"Error processing item: {selected_item}. Details: {str(e)}", "", ""
def create_new_embedding_for_item(selected_item, provider, model, api_url, method, max_size, overlap, adaptive, item_mapping):
if not selected_item:
return "Please select an item", "", ""
try:
item_id = item_mapping.get(selected_item)
if item_id is None:
return f"Invalid item selected: {selected_item}", "", ""
items = get_all_content_from_database()
item = next((item for item in items if item['id'] == item_id), None)
if not item:
return f"Item not found: {item_id}", "", ""
chunk_options = {
'method': method,
'max_size': max_size,
'overlap': overlap,
'adaptive': adaptive
}
chunks = improved_chunking_process(item['content'], chunk_options)
collection_name = "all_content_embeddings"
collection = chroma_client.get_or_create_collection(name=collection_name)
# Delete existing embeddings for this item
existing_ids = [f"doc_{item_id}_chunk_{i}" for i in range(len(chunks))]
collection.delete(ids=existing_ids)
for i, chunk in enumerate(chunks):
chunk_text = chunk['text']
chunk_metadata = chunk['metadata']
chunk_position = determine_chunk_position(chunk_metadata['relative_position'])
chunk_header = f"""
Original Document: {item['title']}
Chunk: {i + 1} of {len(chunks)}
Position: {chunk_position}
Header: {chunk_metadata.get('header_text', 'N/A')}
--- Chunk Content ---
"""
full_chunk_text = chunk_header + chunk_text
chunk_id = f"doc_{item_id}_chunk_{i}"
embedding = create_embedding(full_chunk_text, provider, model, api_url)
metadata = {
"media_id": str(item_id),
"chunk_index": i,
"total_chunks": len(chunks),
"chunking_method": method,
"max_chunk_size": max_size,
"chunk_overlap": overlap,
"adaptive_chunking": adaptive,
"embedding_model": model,
"embedding_provider": provider,
**chunk_metadata
}
store_in_chroma(collection_name, [full_chunk_text], [embedding], [chunk_id], [metadata])
embedding_preview = str(embedding[:50])
status = f"New embeddings created and stored for item: {item['title']} (ID: {item_id})"
return status, f"First 50 elements of new embedding:\n{embedding_preview}", json.dumps(metadata, indent=2)
except Exception as e:
logging.error(f"Error in create_new_embedding_for_item: {str(e)}")
return f"Error creating embedding: {str(e)}", "", ""
refresh_button.click(
get_items_with_embedding_status,
outputs=[item_dropdown, item_mapping]
)
item_dropdown.change(
check_embedding_status,
inputs=[item_dropdown, item_mapping],
outputs=[embedding_status, embedding_preview, embedding_metadata]
)
create_new_embedding_button.click(
create_new_embedding_for_item,
inputs=[item_dropdown, embedding_provider, embedding_model, embedding_api_url,
chunking_method, max_chunk_size, chunk_overlap, adaptive_chunking, item_mapping],
outputs=[embedding_status, embedding_preview, embedding_metadata]
)
embedding_provider.change(
update_provider_options,
inputs=[embedding_provider],
outputs=[embedding_api_url]
)
return item_dropdown, refresh_button, embedding_status, embedding_preview, embedding_metadata, create_new_embedding_button, embedding_provider, embedding_model, embedding_api_url, chunking_method, max_chunk_size, chunk_overlap, adaptive_chunking
def create_purge_embeddings_tab():
with gr.TabItem("Purge Embeddings"):
gr.Markdown("# Purge Embeddings")
with gr.Row():
with gr.Column():
purge_button = gr.Button("Purge All Embeddings")
with gr.Column():
status_output = gr.Textbox(label="Status", lines=10)
def purge_all_embeddings():
try:
collection_name = "all_content_embeddings"
chroma_client.delete_collection(collection_name)
chroma_client.create_collection(collection_name)
return "All embeddings have been purged successfully."
except Exception as e:
logging.error(f"Error during embedding purge: {str(e)}")
return f"Error: {str(e)}"
purge_button.click(
fn=purge_all_embeddings,
outputs=status_output
)
#
# End of file
########################################################################################################################