# Rag_Chat_tab.py
# Description: This file contains the code for the RAG Chat tab in the Gradio UI
#
# Imports
import logging
#
# External Imports
import gradio as gr
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import get_all_content_from_database
from App_Function_Libraries.RAG.ChromaDB_Library import chroma_client, \
    check_embedding_status, store_in_chroma
from App_Function_Libraries.RAG.Embeddings_Create import create_embedding
from App_Function_Libraries.RAG.RAG_Libary_2 import enhanced_rag_pipeline
#
########################################################################################################################
#
# Functions:

def create_rag_tab():
    with gr.TabItem("RAG Search"):
        gr.Markdown("# Retrieval-Augmented Generation (RAG) Search")

        with gr.Row():
            with gr.Column():
                search_query = gr.Textbox(label="Enter your question", placeholder="What would you like to know?")
                keyword_filtering_checkbox = gr.Checkbox(label="Enable Keyword Filtering", value=False)
                keywords_input = gr.Textbox(
                    label="Enter keywords (comma-separated)",
                    value="keyword1, keyword2, ...",
                    visible=False
                )
                keyword_instructions = gr.Markdown(
                    "Enter comma-separated keywords to filter your search results.",
                    visible=False
                )
                api_choice = gr.Dropdown(
                    choices=["Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "Mistral",
                             "OpenRouter", "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "ollama", "HuggingFace"],
                    label="Select API for RAG",
                    value="OpenAI"
                )
                search_button = gr.Button("Search")
            with gr.Column():
                result_output = gr.Textbox(label="Answer", lines=10)
                context_output = gr.Textbox(label="Context", lines=10, visible=True)

        def toggle_keyword_filtering(checkbox_value):
            return {
                keywords_input: gr.update(visible=checkbox_value),
                keyword_instructions: gr.update(visible=checkbox_value)
            }

        keyword_filtering_checkbox.change(
            toggle_keyword_filtering,
            inputs=[keyword_filtering_checkbox],
            outputs=[keywords_input, keyword_instructions]
        )

        def perform_rag_search(query, keywords, api_choice):
            if keywords == "keyword1, keyword2, ...":
                keywords = None
            result = enhanced_rag_pipeline(query, api_choice, keywords)
            return result['answer'], result['context']

        search_button.click(perform_rag_search,
                            inputs=[search_query, keywords_input, api_choice],
                            outputs=[result_output, context_output])
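
# Note (illustrative, not enforced anywhere in this file): perform_rag_search
# above only relies on enhanced_rag_pipeline returning a dict with 'answer' and
# 'context' keys, e.g. something shaped roughly like:
#
#   {
#       "answer": "Generated answer text...",
#       "context": "Retrieved passages used to ground the answer...",
#   }
#
# Any extra keys in the pipeline's result are simply ignored by this tab.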

# FIXME - under construction
def create_embeddings_tab():
    with gr.TabItem("Create Embeddings"):
        gr.Markdown("# Create Embeddings for All Content")

        with gr.Row():
            with gr.Column():
                embedding_provider = gr.Radio(
                    choices=["openai", "local", "huggingface"],
                    label="Select Embedding Provider",
                    value="openai"
                )
                embedding_model = gr.Textbox(
                    label="Embedding Model",
                    value="text-embedding-3-small"
                )
                embedding_api_url = gr.Textbox(
                    label="API URL (for local provider)",
                    value="http://localhost:8080/embedding",
                    visible=False
                )
                create_button = gr.Button("Create Embeddings")
            with gr.Column():
                status_output = gr.Textbox(label="Status", lines=10)

        def update_provider_options(provider):
            return gr.update(visible=provider == "local")

        embedding_provider.change(
            fn=update_provider_options,
            inputs=[embedding_provider],
            outputs=[embedding_api_url]
        )

        def create_all_embeddings(provider, model, api_url):
            try:
                all_content = get_all_content_from_database()
                if not all_content:
                    return "No content found in the database."

                collection_name = "all_content_embeddings"
                collection = chroma_client.get_or_create_collection(name=collection_name)

                for item in all_content:
                    media_id = item['id']
                    text = item['content']

                    existing = collection.get(ids=[f"doc_{media_id}"])
                    if existing['ids']:
                        continue

                    embedding = create_embedding(text, provider, model, api_url)
                    store_in_chroma(collection_name, [text], [embedding], [f"doc_{media_id}"], [{"media_id": media_id}])

                return "Embeddings created and stored successfully for all new content."
            except Exception as e:
                logging.error(f"Error during embedding creation: {str(e)}")
                return f"Error: {str(e)}"

        create_button.click(
            fn=create_all_embeddings,
            inputs=[embedding_provider, embedding_model, embedding_api_url],
            outputs=status_output
        )
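
# Note on storage layout (a sketch of the convention used in this file, not a
# guarantee about the wider app): both embedding tabs share a single Chroma
# collection named "all_content_embeddings" and key each document as
# f"doc_{media_id}". The duplicate check in create_all_embeddings relies on the
# shape of chromadb's Collection.get() result, roughly:
#
#   existing = collection.get(ids=["doc_42"])   # "42" is a hypothetical media_id
#   if not existing["ids"]:                     # empty list -> nothing stored yet
#       ...create and store the embedding...
#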

def create_view_embeddings_tab():
    with gr.TabItem("View/Update Embeddings"):
        gr.Markdown("# View and Update Embeddings")
        item_mapping = gr.State({})
        with gr.Row():
            with gr.Column():
                item_dropdown = gr.Dropdown(label="Select Item", choices=[], interactive=True)
                refresh_button = gr.Button("Refresh Item List")
                embedding_status = gr.Textbox(label="Embedding Status", interactive=False)
                embedding_preview = gr.Textbox(label="Embedding Preview", interactive=False, lines=5)
            with gr.Column():
                create_new_embedding_button = gr.Button("Create New Embedding")
                embedding_provider = gr.Radio(
                    choices=["openai", "local", "huggingface"],
                    label="Embedding Provider",
                    value="openai"
                )
                embedding_model = gr.Textbox(
                    label="Embedding Model",
                    value="text-embedding-3-small",
                    visible=True
                )
                embedding_api_url = gr.Textbox(
                    label="API URL (for local provider)",
                    value="http://localhost:8080/embedding",
                    visible=False
                )

        def get_items_with_embedding_status():
            try:
                items = get_all_content_from_database()
                collection = chroma_client.get_or_create_collection(name="all_content_embeddings")
                choices = []
                new_item_mapping = {}
                for item in items:
                    try:
                        result = collection.get(ids=[f"doc_{item['id']}"])
                        embedding_exists = result is not None and result.get('ids') and len(result['ids']) > 0
                        status = "Embedding exists" if embedding_exists else "No embedding"
                    except Exception as e:
                        print(f"Error checking embedding for item {item['id']}: {str(e)}")
                        status = "Error checking"
                    choice = f"{item['title']} ({status})"
                    choices.append(choice)
                    new_item_mapping[choice] = item['id']
                return gr.update(choices=choices), new_item_mapping
            except Exception as e:
                print(f"Error in get_items_with_embedding_status: {str(e)}")
                return gr.update(choices=["Error: Unable to fetch items"]), {}

        def update_provider_options(provider):
            return gr.update(visible=provider == "local")

        def create_new_embedding_for_item(selected_item, provider, model, api_url, item_mapping):
            if not selected_item:
                return "Please select an item", ""
            try:
                item_id = item_mapping.get(selected_item)
                if item_id is None:
                    return f"Invalid item selected: {selected_item}", ""

                items = get_all_content_from_database()
                item = next((item for item in items if item['id'] == item_id), None)
                if not item:
                    return f"Item not found: {item_id}", ""

                embedding = create_embedding(item['content'], provider, model, api_url)

                collection_name = "all_content_embeddings"
                metadata = {"media_id": item_id, "title": item['title']}
                store_in_chroma(collection_name, [item['content']], [embedding], [f"doc_{item_id}"], [metadata])

                embedding_preview = str(embedding[:50])
                status = f"New embedding created and stored for item: {item['title']} (ID: {item_id})"
                return status, f"First 50 elements of new embedding:\n{embedding_preview}\n\nMetadata: {metadata}"
            except Exception as e:
                logging.error(f"Error in create_new_embedding_for_item: {str(e)}")
                return f"Error creating embedding: {str(e)}", ""

        refresh_button.click(
            get_items_with_embedding_status,
            outputs=[item_dropdown, item_mapping]
        )
        item_dropdown.change(
            check_embedding_status,
            inputs=[item_dropdown, item_mapping],
            outputs=[embedding_status, embedding_preview]
        )
        create_new_embedding_button.click(
            create_new_embedding_for_item,
            inputs=[item_dropdown, embedding_provider, embedding_model, embedding_api_url, item_mapping],
            outputs=[embedding_status, embedding_preview]
        )
        embedding_provider.change(
            update_provider_options,
            inputs=[embedding_provider],
            outputs=[embedding_api_url]
        )

    return item_dropdown, refresh_button, embedding_status, embedding_preview, create_new_embedding_button, embedding_provider, embedding_model, embedding_api_url

#
# End of file
########################################################################################################################