# RAG_QA_Chat_tab.py # Description: Gradio UI for RAG QA Chat # # Imports import csv import logging import json import os import re from datetime import datetime # # External Imports import docx2txt import gradio as gr # # Local Imports from App_Function_Libraries.Books.Book_Ingestion_Lib import read_epub from App_Function_Libraries.DB.Character_Chat_DB import search_character_chat, search_character_cards from App_Function_Libraries.DB.DB_Manager import DatabaseError, get_paginated_files, add_media_with_keywords, \ get_all_conversations, get_note_by_id, get_notes_by_keywords, start_new_conversation, update_note, save_notes, \ clear_keywords_from_note, add_keywords_to_note, load_chat_history, save_message, add_keywords_to_conversation, \ get_keywords_for_note, delete_note, search_conversations_by_keywords, get_conversation_title, delete_conversation, \ update_conversation_title, fetch_all_conversations, fetch_all_notes, fetch_conversations_by_ids, fetch_notes_by_ids, \ search_media_db, search_notes_titles, list_prompts from App_Function_Libraries.DB.RAG_QA_Chat_DB import get_notes, delete_messages_in_conversation, search_rag_notes, \ search_rag_chat, get_conversation_rating, set_conversation_rating from App_Function_Libraries.Gradio_UI.Gradio_Shared import update_user_prompt from App_Function_Libraries.PDF.PDF_Ingestion_Lib import extract_text_and_format_from_pdf from App_Function_Libraries.RAG.RAG_Library_2 import generate_answer, enhanced_rag_pipeline from App_Function_Libraries.RAG.RAG_QA_Chat import search_database, rag_qa_chat from App_Function_Libraries.Utils.Utils import default_api_endpoint, global_api_endpoints, format_api_name, \ load_comprehensive_config # ######################################################################################################################## # # Functions: def create_rag_qa_chat_tab(): try: default_value = None if default_api_endpoint: if default_api_endpoint in global_api_endpoints: default_value = format_api_name(default_api_endpoint) else: logging.warning(f"Default API endpoint '{default_api_endpoint}' not found in global_api_endpoints") except Exception as e: logging.error(f"Error setting default API endpoint: {str(e)}") default_value = None with gr.TabItem("RAG QA Chat", visible=True): gr.Markdown("# RAG QA Chat") state = gr.State({ "page": 1, "context_source": "Entire Media Database", "conversation_messages": [], "conversation_id": None }) note_state = gr.State({"note_id": None}) def auto_save_conversation(message, response, state_value, auto_save_enabled): """Automatically save the conversation if auto-save is enabled""" try: if not auto_save_enabled: return state_value conversation_id = state_value.get("conversation_id") if not conversation_id: # Create new conversation with default title title = "Auto-saved Conversation " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") conversation_id = start_new_conversation(title=title) state_value = state_value.copy() state_value["conversation_id"] = conversation_id # Save the messages save_message(conversation_id, "user", message) save_message(conversation_id, "assistant", response) return state_value except Exception as e: logging.error(f"Error in auto-save: {str(e)}") return state_value # Update the conversation list function def update_conversation_list(): conversations, total_pages, total_count = get_all_conversations() choices = [ f"{conversation['title']} (ID: {conversation['conversation_id']}) - Rating: {conversation['rating'] or 'Not Rated'}" for conversation in conversations ] return choices with gr.Row(): with gr.Column(scale=1): # FIXME - Offer the user to search 2+ databases at once database_types = ["Media DB", "RAG Chat", "RAG Notes", "Character Chat", "Character Cards"] db_choice = gr.CheckboxGroup( label="Select Database(s)", choices=database_types, value=["Media DB"], interactive=True ) context_source = gr.Radio( ["All Files in the Database", "Search Database", "Upload File"], label="Context Source", value="All Files in the Database" ) existing_file = gr.Dropdown(label="Select Existing File", choices=[], interactive=True) file_page = gr.State(value=1) with gr.Row(): prev_page_btn = gr.Button("Previous Page") next_page_btn = gr.Button("Next Page") page_info = gr.HTML("Page 1") top_k_input = gr.Number(value=10, label="Maximum amount of results to use (Default: 10)", minimum=1, maximum=50, step=1, precision=0, interactive=True) keywords_input = gr.Textbox(label="Keywords (comma-separated) to filter results by)", value="rag_qa_default_keyword" ,visible=True) use_query_rewriting = gr.Checkbox(label="Use Query Rewriting", value=True) use_re_ranking = gr.Checkbox(label="Use Re-ranking", value=True) config = load_comprehensive_config() auto_save_value = config.getboolean('auto-save', 'save_character_chats', fallback=False) auto_save_checkbox = gr.Checkbox( label="Save chats automatically", value=auto_save_value, info="When enabled, conversations will be saved automatically after each message" ) initial_prompts, total_pages, current_page = list_prompts(page=1, per_page=10) preset_prompt_checkbox = gr.Checkbox( label="View Custom Prompts(have to copy/paste them)", value=False, visible=True ) with gr.Row(visible=False) as preset_prompt_controls: prev_prompt_page = gr.Button("Previous") current_prompt_page_text = gr.Text(f"Page {current_page} of {total_pages}") next_prompt_page = gr.Button("Next") current_prompt_page_state = gr.State(value=1) preset_prompt = gr.Dropdown( label="Select Preset Prompt", choices=initial_prompts, visible=False ) user_prompt = gr.Textbox( label="Custom Prompt", placeholder="Enter custom prompt here", lines=3, visible=False ) system_prompt_input = gr.Textbox( label="System Prompt", lines=3, visible=False ) search_query = gr.Textbox(label="Search Query", visible=False) search_button = gr.Button("Search", visible=False) search_results = gr.Dropdown(label="Search Results", choices=[], visible=False) file_upload = gr.File( label="Upload File", visible=False, file_types=["txt", "pdf", "epub", "md", "rtf", "json", "csv", "docx"] ) convert_to_text = gr.Checkbox(label="Convert to plain text", visible=False) with gr.Column(scale=1): load_conversation = gr.Dropdown( label="Load Conversation", choices=update_conversation_list() ) new_conversation = gr.Button("New Conversation") save_conversation_button = gr.Button("Save Conversation") conversation_title = gr.Textbox( label="Conversation Title", placeholder="Enter a title for the new conversation" ) keywords = gr.Textbox(label="Keywords (comma-separated)", visible=True) # Add the rating display and input rating_display = gr.Markdown(value="", visible=False) rating_input = gr.Radio( choices=["1", "2", "3"], label="Rate this Conversation (1-3 stars)", visible=False ) # Refactored API selection dropdown api_choice = gr.Dropdown( choices=["None"] + [format_api_name(api) for api in global_api_endpoints], value=default_value, label="API for Chat Response (Optional)" ) with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot(height=700) msg = gr.Textbox(label="Enter your message") submit = gr.Button("Submit") clear_chat = gr.Button("Clear Chat History") with gr.Column(scale=1): # Adding UI elements for notes note_title = gr.Textbox(label="Note Title", placeholder="Enter a title for the note") notes = gr.TextArea(label="Notes", placeholder="Enter your notes here...", lines=25) keywords_for_notes = gr.Textbox( label="Keywords for Notes (comma-separated)", placeholder="Enter keywords for the note", visible=True, ) save_notes_btn = gr.Button("Save Note") clear_notes_btn = gr.Button("Clear Current Note text") new_note_btn = gr.Button("New Note") # FIXME - Change from only keywords to generalized search search_notes_title = gr.Textbox(label="Search Notes by Title") search_notes_by_keyword = gr.Textbox(label="Search Notes by Keyword") search_notes_button = gr.Button("Search Notes") note_results = gr.Dropdown(label="Notes", choices=[]) load_note = gr.Dropdown(label="Load Note", choices=[]) loading_indicator = gr.HTML("Loading...", visible=False) status_message = gr.HTML() auto_save_status = gr.HTML() # Function Definitions def update_prompt_page(direction, current_page_val): new_page = max(1, min(total_pages, current_page_val + direction)) prompts, _, _ = list_prompts(page=new_page, per_page=10) return ( gr.update(choices=prompts), gr.update(value=f"Page {new_page} of {total_pages}"), new_page ) def update_prompts(preset_name): prompts = update_user_prompt(preset_name) return ( gr.update(value=prompts["user_prompt"], visible=True), gr.update(value=prompts["system_prompt"], visible=True) ) def toggle_preset_prompt(checkbox_value): return ( gr.update(visible=checkbox_value), gr.update(visible=checkbox_value), gr.update(visible=False), gr.update(visible=False) ) prev_prompt_page.click( lambda x: update_prompt_page(-1, x), inputs=[current_prompt_page_state], outputs=[preset_prompt, current_prompt_page_text, current_prompt_page_state] ) next_prompt_page.click( lambda x: update_prompt_page(1, x), inputs=[current_prompt_page_state], outputs=[preset_prompt, current_prompt_page_text, current_prompt_page_state] ) preset_prompt.change( update_prompts, inputs=preset_prompt, outputs=[user_prompt, system_prompt_input] ) preset_prompt_checkbox.change( toggle_preset_prompt, inputs=[preset_prompt_checkbox], outputs=[preset_prompt, preset_prompt_controls, user_prompt, system_prompt_input] ) def update_state(state, **kwargs): new_state = state.copy() new_state.update(kwargs) return new_state def create_new_note(): return gr.update(value='un-named note'), gr.update(value=''), {"note_id": None} new_note_btn.click( create_new_note, outputs=[note_title, notes, note_state] ) def search_notes(search_notes_title, keywords): if keywords: keywords_list = [kw.strip() for kw in keywords.split(',')] notes_data, total_pages, total_count = get_notes_by_keywords(keywords_list) choices = [f"Note {note_id} - {title} ({timestamp})" for note_id, title, content, timestamp, conversation_id in notes_data] return gr.update(choices=choices, label=f"Found {total_count} notes") elif search_notes_title: notes_data, total_pages, total_count = search_notes_titles(search_notes_title) choices = [f"Note {note_id} - {title} ({timestamp})" for note_id, title, content, timestamp, conversation_id in notes_data] return gr.update(choices=choices, label=f"Found {total_count} notes") else: # This will now return all notes, ordered by timestamp notes_data, total_pages, total_count = search_notes_titles("") choices = [f"Note {note_id} - {title} ({timestamp})" for note_id, title, content, timestamp, conversation_id in notes_data] return gr.update(choices=choices, label=f"All notes ({total_count} total)") search_notes_button.click( search_notes, inputs=[search_notes_title, search_notes_by_keyword], outputs=[note_results] ) def load_selected_note(note_selection): if note_selection: note_id = int(note_selection.split(' ')[1]) note_data = get_note_by_id(note_id) if note_data: note_id, title, content = note_data[0] updated_note_state = {"note_id": note_id} return gr.update(value=title), gr.update(value=content), updated_note_state return gr.update(value=''), gr.update(value=''), {"note_id": None} note_results.change( load_selected_note, inputs=[note_results], outputs=[note_title, notes, note_state] ) def save_notes_function(note_title_text, notes_content, keywords_content, note_state_value, state_value): """Save the notes and associated keywords to the database.""" logging.info(f"Starting save_notes_function with state: {state_value}") logging.info(f"Note title: {note_title_text}") logging.info(f"Notes content length: {len(notes_content) if notes_content else 0}") try: # Check current state conversation_id = state_value.get("conversation_id") logging.info(f"Current conversation_id: {conversation_id}") # Create new conversation if none exists if not conversation_id: logging.info("No conversation ID found, creating new conversation") conversation_title = note_title_text if note_title_text else "Untitled Conversation" conversation_id = start_new_conversation(title=conversation_title) state_value = state_value.copy() # Create a new copy of the state state_value["conversation_id"] = conversation_id logging.info(f"Created new conversation with ID: {conversation_id}") if not notes_content: logging.warning("No notes content provided") return notes_content, note_state_value, state_value, gr.update( value="
Cannot save empty notes.
") # Save or update note note_id = note_state_value.get("note_id") if note_id: logging.info(f"Updating existing note with ID: {note_id}") update_note(note_id, note_title_text, notes_content) else: logging.info(f"Creating new note for conversation: {conversation_id}") note_id = save_notes(conversation_id, note_title_text or "Untitled Note", notes_content) note_state_value = {"note_id": note_id} logging.info(f"Created new note with ID: {note_id}") # Handle keywords if keywords_content: logging.info("Processing keywords") clear_keywords_from_note(note_id) keywords = [kw.strip() for kw in keywords_content.split(',')] add_keywords_to_note(note_id, keywords) logging.info(f"Added keywords: {keywords}") logging.info("Notes saved successfully") return ( notes_content, note_state_value, state_value, gr.update(value="Notes saved successfully!
") ) except Exception as e: logging.error(f"Error in save_notes_function: {str(e)}", exc_info=True) return ( notes_content, note_state_value, state_value, gr.update(value=f"Error saving notes: {str(e)}
") ) save_notes_btn.click( save_notes_function, inputs=[note_title, notes, keywords_for_notes, note_state, state], outputs=[notes, note_state, state, status_message] ) def clear_notes_function(): """Clear notes for the current note.""" return gr.update(value=''), {"note_id": None} clear_notes_btn.click( clear_notes_function, outputs=[notes, note_state] ) # Initialize the conversation list load_conversation.choices = update_conversation_list() def load_conversation_history(selected_conversation, state_value): try: if not selected_conversation: return [], state_value, "", gr.update(value="", visible=False), gr.update(visible=False) # Extract conversation ID match = re.search(r'\(ID: ([0-9a-fA-F\-]+)\)', selected_conversation) if not match: logging.error(f"Invalid conversation format: {selected_conversation}") return [], state_value, "", gr.update(value="", visible=False), gr.update(visible=False) conversation_id = match.group(1) chat_data, total_pages_val, _ = load_chat_history(conversation_id, 1, 50) # Update state with valid conversation id updated_state = state_value.copy() updated_state["conversation_id"] = conversation_id updated_state["conversation_messages"] = chat_data # Format chat history history = [] for role, content in chat_data: if role == 'user': history.append((content, '')) elif history: history[-1] = (history[-1][0], content) # Fetch and display the conversation rating rating = get_conversation_rating(conversation_id) if rating is not None: rating_text = f"**Current Rating:** {rating} star(s)" rating_display_update = gr.update(value=rating_text, visible=True) rating_input_update = gr.update(value=str(rating), visible=True) else: rating_display_update = gr.update(value="**Current Rating:** Not Rated", visible=True) rating_input_update = gr.update(value=None, visible=True) notes_content = get_notes(conversation_id) return history, updated_state, "\n".join( notes_content) if notes_content else "", rating_display_update, rating_input_update except Exception as e: logging.error(f"Error loading conversation: {str(e)}") return [], state_value, "", gr.update(value="", visible=False), gr.update(visible=False) load_conversation.change( load_conversation_history, inputs=[load_conversation, state], outputs=[chatbot, state, notes, rating_display, rating_input] ) # Modify save_conversation_function to use gr.update() def save_conversation_function(conversation_title_text, keywords_text, rating_value, state_value): conversation_messages = state_value.get("conversation_messages", []) conversation_id = state_value.get("conversation_id") if not conversation_messages: return gr.update( value="No conversation to save.
" ), state_value, gr.update(), gr.update(value="", visible=False), gr.update(visible=False) # Start a new conversation in the database if not existing if not conversation_id: conversation_id = start_new_conversation( conversation_title_text if conversation_title_text else "Untitled Conversation" ) else: # Update the conversation title if it has changed update_conversation_title(conversation_id, conversation_title_text) # Save the messages for role, content in conversation_messages: save_message(conversation_id, role, content) # Save keywords if provided if keywords_text: add_keywords_to_conversation(conversation_id, [kw.strip() for kw in keywords_text.split(',')]) # Save the rating if provided try: if rating_value: set_conversation_rating(conversation_id, int(rating_value)) except ValueError as ve: logging.error(f"Invalid rating value: {ve}") return gr.update( value=f"Invalid rating: {ve}
" ), state_value, gr.update(), gr.update(value="", visible=False), gr.update(visible=False) # Update state updated_state = update_state(state_value, conversation_id=conversation_id) # Update the conversation list conversation_choices = update_conversation_list() # Reset rating display and input rating_display_update = gr.update(value=f"**Current Rating:** {rating_value} star(s)", visible=True) rating_input_update = gr.update(value=rating_value, visible=True) return gr.update( value="Conversation saved successfully.
" ), updated_state, gr.update(choices=conversation_choices), rating_display_update, rating_input_update save_conversation_button.click( save_conversation_function, inputs=[conversation_title, keywords, rating_input, state], outputs=[status_message, state, load_conversation, rating_display, rating_input] ) def start_new_conversation_wrapper(title, state_value): # Reset the state with no conversation_id and empty conversation messages updated_state = update_state(state_value, conversation_id=None, page=1, conversation_messages=[]) # Clear the chat history and reset rating components return [], updated_state, gr.update(value="", visible=False), gr.update(value=None, visible=False) new_conversation.click( start_new_conversation_wrapper, inputs=[conversation_title, state], outputs=[chatbot, state, rating_display, rating_input] ) def update_file_list(page): files, total_pages, current_page = get_paginated_files(page) choices = [f"{title} (ID: {id})" for id, title in files] return gr.update(choices=choices), gr.update(value=f"Page {current_page} of {total_pages}"), current_page def next_page_fn(current_page): return update_file_list(current_page + 1) def prev_page_fn(current_page): return update_file_list(max(1, current_page - 1)) def update_context_source(choice): # Update visibility based on context source choice return { existing_file: gr.update(visible=choice == "Existing File"), prev_page_btn: gr.update(visible=choice == "Search Database"), next_page_btn: gr.update(visible=choice == "Search Database"), page_info: gr.update(visible=choice == "Search Database"), search_query: gr.update(visible=choice == "Search Database"), search_button: gr.update(visible=choice == "Search Database"), search_results: gr.update(visible=choice == "Search Database"), file_upload: gr.update(visible=choice == "Upload File"), convert_to_text: gr.update(visible=choice == "Upload File"), keywords: gr.update(visible=choice == "Upload File") } context_source.change(update_context_source, context_source, [existing_file, prev_page_btn, next_page_btn, page_info, search_query, search_button, search_results, file_upload, convert_to_text, keywords]) next_page_btn.click(next_page_fn, inputs=[file_page], outputs=[existing_file, page_info, file_page]) prev_page_btn.click(prev_page_fn, inputs=[file_page], outputs=[existing_file, page_info, file_page]) # Initialize the file list when context source is changed to "Existing File" context_source.change(lambda choice: update_file_list(1) if choice == "Existing File" else (gr.update(), gr.update(), 1), inputs=[context_source], outputs=[existing_file, page_info, file_page]) def perform_search(query, selected_databases, keywords): try: results = [] # Iterate over selected database types and perform searches accordingly for database_type in selected_databases: if database_type == "Media DB": # FIXME - check for existence of keywords before setting as search field search_fields = ["title", "content", "keywords"] results += search_media_db(query, search_fields, keywords, page=1, results_per_page=25) elif database_type == "RAG Chat": results += search_rag_chat(query) elif database_type == "RAG Notes": results += search_rag_notes(query) elif database_type == "Character Chat": results += search_character_chat(query) elif database_type == "Character Cards": results += search_character_cards(query) # Remove duplicate results if necessary results = list(set(results)) return gr.update(choices=results) except Exception as e: gr.Error(f"Error performing search: {str(e)}") return gr.update(choices=[]) # Click Event for the DB Search Button search_button.click( perform_search, inputs=[search_query, db_choice, keywords_input], outputs=[search_results] ) def rephrase_question(history, latest_question, api_choice): logging.info("RAG QnA: Rephrasing question") conversation_history = "\n".join([f"User: {h[0]}\nAssistant: {h[1]}" for h in history[:-1]]) prompt = f"""You are a helpful assistant. Given the conversation history and the latest question, resolve any ambiguous references in the latest question. Conversation History: {conversation_history} Latest Question: {latest_question} Rewritten Question:""" # Use the selected API to generate the rephrased question rephrased_question = generate_answer(api_choice, prompt, "") logging.info(f"Rephrased question: {rephrased_question}") return rephrased_question.strip() # FIXME - RAG DB selection def rag_qa_chat_wrapper( message, history, context_source, existing_file, search_results, file_upload, convert_to_text, keywords, api_choice, use_query_rewriting, state_value, keywords_input, top_k_input, use_re_ranking, db_choices, auto_save_enabled ): try: logging.info(f"Starting rag_qa_chat_wrapper with message: {message}") logging.info(f"Context source: {context_source}") logging.info(f"API choice: {api_choice}") logging.info(f"Query rewriting: {'enabled' if use_query_rewriting else 'disabled'}") logging.info(f"Selected DB Choices: {db_choices}") # Show loading indicator yield history, "", gr.update(visible=True), state_value, gr.update(visible=False), gr.update( visible=False) conversation_id = state_value.get("conversation_id") conversation_messages = state_value.get("conversation_messages", []) # Save the user's message if conversation_id: save_message(conversation_id, "user", message) else: # Append to in-memory messages conversation_messages.append(("user", message)) state_value["conversation_messages"] = conversation_messages # Ensure api_choice is a string api_choice_str = api_choice.value if isinstance(api_choice, gr.components.Dropdown) else api_choice logging.info(f"Resolved API choice: {api_choice_str}") # Only rephrase the question if it's not the first query and query rewriting is enabled if len(history) > 0 and use_query_rewriting: rephrased_question = rephrase_question(history, message, api_choice_str) logging.info(f"Original question: {message}") logging.info(f"Rephrased question: {rephrased_question}") else: rephrased_question = message logging.info(f"Using original question: {message}") if context_source == "All Files in the Database": # Use the enhanced_rag_pipeline to search the selected databases context = enhanced_rag_pipeline( rephrased_question, api_choice_str, keywords_input, top_k_input, use_re_ranking, database_types=db_choices # Pass the list of selected databases ) logging.info(f"Using enhanced_rag_pipeline for database search") elif context_source == "Search Database": context = f"media_id:{search_results.split('(ID: ')[1][:-1]}" logging.info(f"Using search result with context: {context}") else: # Upload File logging.info("Processing uploaded file") if file_upload is None: raise ValueError("No file uploaded") # Process the uploaded file file_path = file_upload.name file_name = os.path.basename(file_path) logging.info(f"Uploaded file: {file_name}") if convert_to_text: logging.info("Converting file to plain text") content = convert_file_to_text(file_path) else: logging.info("Reading file content") with open(file_path, 'r', encoding='utf-8') as f: content = f.read() logging.info(f"File content length: {len(content)} characters") # Process keywords if not keywords: keywords = "default,rag-file-upload" logging.info(f"Keywords: {keywords}") # Add the content to the database and get the media_id logging.info("Adding content to database") result = add_media_with_keywords( url=file_name, title=file_name, media_type='document', content=content, keywords=keywords, prompt='No prompt for uploaded files', summary='No summary for uploaded files', transcription_model='None', author='Unknown', ingestion_date=datetime.now().strftime('%Y-%m-%d') ) logging.info(f"Result from add_media_with_keywords: {result}") if isinstance(result, tuple): media_id, _ = result else: media_id = result context = f"media_id:{media_id}" logging.info(f"Context for uploaded file: {context}") logging.info("Calling rag_qa_chat function") new_history, response = rag_qa_chat(rephrased_question, history, context, api_choice_str) # Log first 100 chars of response logging.info(f"Response received from rag_qa_chat: {response[:100]}...") # Save assistant's response if conversation_id: save_message(conversation_id, "assistant", response) else: conversation_messages.append(("assistant", response)) state_value["conversation_messages"] = conversation_messages # Update the state updated_state = auto_save_conversation(message, response, state_value, auto_save_enabled) updated_state["conversation_messages"] = conversation_messages # Safely update history if new_history: new_history[-1] = (message, response) else: new_history = [(message, response)] # Get the current rating and update display conversation_id = updated_state.get("conversation_id") if conversation_id: rating = get_conversation_rating(conversation_id) if rating is not None: rating_display_update = gr.update(value=f"**Current Rating:** {rating} star(s)", visible=True) rating_input_update = gr.update(value=str(rating), visible=True) else: rating_display_update = gr.update(value="**Current Rating:** Not Rated", visible=True) rating_input_update = gr.update(value=None, visible=True) else: rating_display_update = gr.update(value="", visible=False) rating_input_update = gr.update(value=None, visible=False) gr.Info("Response generated successfully") logging.info("rag_qa_chat_wrapper completed successfully") yield new_history, "", gr.update( visible=False), updated_state, rating_display_update, rating_input_update except ValueError as e: logging.error(f"Input error in rag_qa_chat_wrapper: {str(e)}") gr.Error(f"Input error: {str(e)}") yield history, "", gr.update(visible=False), state_value, gr.update(visible=False), gr.update( visible=False) except DatabaseError as e: logging.error(f"Database error in rag_qa_chat_wrapper: {str(e)}") gr.Error(f"Database error: {str(e)}") yield history, "", gr.update(visible=False), state_value, gr.update(visible=False), gr.update( visible=False) except Exception as e: logging.error(f"Unexpected error in rag_qa_chat_wrapper: {e}", exc_info=True) gr.Error("An unexpected error occurred. Please try again later.") yield history, "", gr.update(visible=False), state_value, gr.update(visible=False), gr.update( visible=False) def clear_chat_history(): return [], "", gr.update(value="", visible=False), gr.update(value=None, visible=False) submit.click( rag_qa_chat_wrapper, inputs=[ msg, chatbot, context_source, existing_file, search_results, file_upload, convert_to_text, keywords, api_choice, use_query_rewriting, state, keywords_input, top_k_input, use_re_ranking, db_choice, auto_save_checkbox ], outputs=[chatbot, msg, loading_indicator, state, rating_display, rating_input], ) clear_chat.click( clear_chat_history, outputs=[chatbot, msg, rating_display, rating_input] ) return ( context_source, existing_file, search_query, search_button, search_results, file_upload, convert_to_text, keywords, api_choice, use_query_rewriting, chatbot, msg, submit, clear_chat, ) def create_rag_qa_notes_management_tab(): # New Management Tab with gr.TabItem("Notes Management", visible=True): gr.Markdown("# RAG QA Notes Management") management_state = gr.State({ "selected_conversation_id": None, "selected_note_id": None, }) with gr.Row(): with gr.Column(scale=1): # Search Notes search_notes_title = gr.Textbox(label="Search Notes by Title") search_notes_by_keyword = gr.Textbox(label="Search Notes by Keywords") search_notes_button = gr.Button("Search Notes") notes_list = gr.Dropdown(label="Notes", choices=[]) # Manage Notes load_note_button = gr.Button("Load Note") delete_note_button = gr.Button("Delete Note") note_title_input = gr.Textbox(label="Note Title") note_content_input = gr.TextArea(label="Note Content", lines=20) note_keywords_input = gr.Textbox(label="Note Keywords (comma-separated)", value="default_note_keyword") save_note_button = gr.Button("Save Note") create_new_note_button = gr.Button("Create New Note") status_message = gr.HTML() # Function Definitions def search_notes(search_notes_title, keywords): if keywords: keywords_list = [kw.strip() for kw in keywords.split(',')] notes_data, total_pages, total_count = get_notes_by_keywords(keywords_list) choices = [f"Note {note_id} - {title} ({timestamp})" for note_id, title, content, timestamp, conversation_id in notes_data] return gr.update(choices=choices, label=f"Found {total_count} notes") elif search_notes_title: notes_data, total_pages, total_count = search_notes_titles(search_notes_title) choices = [f"Note {note_id} - {title} ({timestamp})" for note_id, title, content, timestamp, conversation_id in notes_data] return gr.update(choices=choices, label=f"Found {total_count} notes") else: # This will now return all notes, ordered by timestamp notes_data, total_pages, total_count = search_notes_titles("") choices = [f"Note {note_id} - {title} ({timestamp})" for note_id, title, content, timestamp, conversation_id in notes_data] return gr.update(choices=choices, label=f"All notes ({total_count} total)") search_notes_button.click( search_notes, inputs=[search_notes_title, search_notes_by_keyword], outputs=[notes_list] ) def load_selected_note(selected_note, state_value): if selected_note: note_id = int(selected_note.split('(ID: ')[1][:-1]) note_data = get_note_by_id(note_id) if note_data: note_id, title, content = note_data[0] state_value["selected_note_id"] = note_id # Get keywords for the note keywords = get_keywords_for_note(note_id) keywords_str = ', '.join(keywords) return ( gr.update(value=title), gr.update(value=content), gr.update(value=keywords_str), state_value ) return gr.update(value=''), gr.update(value=''), gr.update(value=''), state_value load_note_button.click( load_selected_note, inputs=[notes_list, management_state], outputs=[note_title_input, note_content_input, note_keywords_input, management_state] ) def save_note_function(title, content, keywords_str, state_value): note_id = state_value["selected_note_id"] if note_id: update_note(note_id, title, content) if keywords_str: # Clear existing keywords and add new ones clear_keywords_from_note(note_id) keywords_list = [kw.strip() for kw in keywords_str.split(',')] add_keywords_to_note(note_id, keywords_list) return gr.Info("Note updated successfully.") else: # Create new note conversation_id = state_value.get("selected_conversation_id") if conversation_id: note_id = save_notes(conversation_id, title, content) state_value["selected_note_id"] = note_id if keywords_str: keywords_list = [kw.strip() for kw in keywords_str.split(',')] add_keywords_to_note(note_id, keywords_list) return gr.Info("New note created successfully.") else: return gr.Error("No conversation selected. Cannot create a new note.") save_note_button.click( save_note_function, inputs=[note_title_input, note_content_input, note_keywords_input, management_state], outputs=[] ) def delete_selected_note(state_value): note_id = state_value["selected_note_id"] if note_id: delete_note(note_id) # Reset state state_value["selected_note_id"] = None # Update notes list updated_notes = search_notes("", "") return updated_notes, gr.update(value="Note deleted successfully."), state_value else: return gr.update(), gr.update(value="No note selected."), state_value delete_note_button.click( delete_selected_note, inputs=[management_state], outputs=[notes_list, status_message, management_state] ) def create_new_note_function(state_value): state_value["selected_note_id"] = None return gr.update(value=''), gr.update(value=''), gr.update(value=''), state_value create_new_note_button.click( create_new_note_function, inputs=[management_state], outputs=[note_title_input, note_content_input, note_keywords_input, management_state] ) def create_rag_qa_chat_management_tab(): # New Management Tab with gr.TabItem("Chat Management", visible=True): gr.Markdown("# RAG QA Chat Conversation Management") management_state = gr.State({ "selected_conversation_id": None, "selected_note_id": None, }) # State to store the mapping between titles and IDs conversation_mapping = gr.State({}) with gr.Row(): with gr.Column(scale=1): # Search Conversations with gr.Group(): gr.Markdown("## Search Conversations") title_search = gr.Textbox( label="Search by Title", placeholder="Enter title to search..." ) content_search = gr.Textbox( label="Search in Chat Content", placeholder="Enter text to search in messages..." ) keyword_search = gr.Textbox( label="Filter by Keywords (comma-separated)", placeholder="keyword1, keyword2, ..." ) search_conversations_button = gr.Button("Search Conversations") conversations_list = gr.Dropdown(label="Conversations", choices=[]) new_conversation_button = gr.Button("New Conversation") # Manage Conversations load_conversation_button = gr.Button("Load Conversation") delete_conversation_button = gr.Button("Delete Conversation") conversation_title_input = gr.Textbox(label="Conversation Title") conversation_content_input = gr.TextArea(label="Conversation Content", lines=20) save_conversation_button = gr.Button("Save Conversation") status_message = gr.HTML() # Function Definitions def search_conversations(title_query, content_query, keywords): try: # Parse keywords if provided keywords_list = None if keywords and keywords.strip(): keywords_list = [kw.strip() for kw in keywords.split(',')] # Search using existing search_conversations_by_keywords function with all criteria results, total_pages, total_count = search_conversations_by_keywords( keywords=keywords_list, title_query=title_query if title_query.strip() else None, content_query=content_query if content_query.strip() else None ) # Build choices as list of titles (ensure uniqueness) choices = [] mapping = {} for conv in results: conversation_id = conv['conversation_id'] title = conv['title'] display_title = f"{title} (ID: {conversation_id[:8]})" choices.append(display_title) mapping[display_title] = conversation_id return gr.update(choices=choices), mapping except Exception as e: logging.error(f"Error in search_conversations: {str(e)}") return gr.update(choices=[]), {} # Update the search button click event search_conversations_button.click( search_conversations, inputs=[title_search, content_search, keyword_search], outputs=[conversations_list, conversation_mapping] ) def load_selected_conversation(selected_title, state_value, mapping): conversation_id = mapping.get(selected_title) if conversation_id: # Load conversation title conversation_title = get_conversation_title(conversation_id) # Load conversation messages messages, total_pages, total_count = load_chat_history(conversation_id) # Concatenate messages into a single string conversation_content = "" for role, content in messages: conversation_content += f"{role}: {content}\n\n" # Update state new_state = state_value.copy() new_state["selected_conversation_id"] = conversation_id return ( gr.update(value=conversation_title), gr.update(value=conversation_content.strip()), new_state ) return gr.update(value=''), gr.update(value=''), state_value load_conversation_button.click( load_selected_conversation, inputs=[conversations_list, management_state, conversation_mapping], outputs=[conversation_title_input, conversation_content_input, management_state] ) def save_conversation(title, content, state_value): conversation_id = state_value["selected_conversation_id"] if conversation_id: # Update conversation title update_conversation_title(conversation_id, title) # Clear existing messages delete_messages_in_conversation(conversation_id) # Parse the content back into messages messages = [] for line in content.strip().split('\n\n'): if ': ' in line: role, message_content = line.split(': ', 1) messages.append((role.strip(), message_content.strip())) else: # If the format is incorrect, skip or handle accordingly continue # Save new messages for role, message_content in messages: save_message(conversation_id, role, message_content) return ( gr.HTML("Conversation updated successfully.
"), gr.update(value=title), gr.update(value=content), state_value ) else: return ( gr.HTML("No conversation selected to save.
"), gr.update(value=title), gr.update(value=content), state_value ) save_conversation_button.click( save_conversation, inputs=[conversation_title_input, conversation_content_input, management_state], outputs=[status_message, conversation_title_input, conversation_content_input, management_state] ) def delete_selected_conversation(state_value, mapping): conversation_id = state_value["selected_conversation_id"] if conversation_id: delete_conversation(conversation_id) # Reset state new_state = state_value.copy() new_state["selected_conversation_id"] = None # Update conversations list and mapping conversations, _, _ = get_all_conversations() choices = [] new_mapping = {} for conv_id, title in conversations: display_title = f"{title} (ID: {conv_id[:8]})" choices.append(display_title) new_mapping[display_title] = conv_id return ( gr.update(choices=choices, value=None), gr.HTML("Conversation deleted successfully.
"), new_state, gr.update(value=''), gr.update(value=''), new_mapping ) else: return ( gr.update(), gr.HTML("No conversation selected.
"), state_value, gr.update(), gr.update(), mapping ) delete_conversation_button.click( delete_selected_conversation, inputs=[management_state, conversation_mapping], outputs=[ conversations_list, status_message, management_state, conversation_title_input, conversation_content_input, conversation_mapping ] ) def create_new_conversation(state_value, mapping): conversation_id = start_new_conversation() # Update state new_state = state_value.copy() new_state["selected_conversation_id"] = conversation_id # Update conversations list and mapping conversations, _, _ = get_all_conversations() choices = [] new_mapping = {} for conv_id, title in conversations: display_title = f"{title} (ID: {conv_id[:8]})" choices.append(display_title) new_mapping[display_title] = conv_id # Set the new conversation as selected selected_title = f"Untitled Conversation (ID: {conversation_id[:8]})" return ( gr.update(choices=choices, value=selected_title), gr.update(value='Untitled Conversation'), gr.update(value=''), gr.HTML("New conversation created.
"), new_state, new_mapping ) new_conversation_button.click( create_new_conversation, inputs=[management_state, conversation_mapping], outputs=[ conversations_list, conversation_title_input, conversation_content_input, status_message, management_state, conversation_mapping ] ) def delete_messages_in_conversation_wrapper(conversation_id): """Wrapper function to delete all messages in a conversation.""" try: delete_messages_in_conversation(conversation_id) logging.info(f"Messages in conversation '{conversation_id}' deleted successfully.") except Exception as e: logging.error(f"Error deleting messages in conversation '{conversation_id}': {e}") raise def get_conversation_title_wrapper(conversation_id): """Helper function to get the conversation title.""" result = get_conversation_title(conversation_id) if result: return result[0][0] else: return "Untitled Conversation" def create_export_data_tab(): with gr.TabItem("Export Data"): gr.Markdown("# Export Data") export_option = gr.Radio( ["Export All", "Export Selected"], label="Export Option", value="Export All" ) conversations_checklist = gr.CheckboxGroup( choices=[], label="Select Conversations", visible=False ) notes_checklist = gr.CheckboxGroup( choices=[], label="Select Notes", visible=False ) export_button = gr.Button("Export") download_link = gr.File(label="Download Exported Data", visible=False) status_message = gr.HTML() # Function to update visibility and populate checklists def update_visibility(export_option_value): if export_option_value == "Export Selected": # Fetch conversations and notes to populate the checklists conversations = fetch_all_conversations() notes = fetch_all_notes() conversation_choices = [f"{title} (ID: {conversation_id})" for conversation_id, title, _ in conversations] note_choices = [f"{title} (ID: {note_id})" for note_id, title, _ in notes] return ( gr.update(visible=True, choices=conversation_choices), gr.update(visible=True, choices=note_choices) ) else: return ( gr.update(visible=False), gr.update(visible=False) ) export_option.change( update_visibility, inputs=[export_option], outputs=[conversations_checklist, notes_checklist] ) import zipfile import io def update_visibility(export_option_value): if export_option_value == "Export Selected": # Fetch conversations and notes to populate the checklists conversations = fetch_all_conversations() notes = fetch_all_notes() conversation_choices = [f"{title} (ID: {conversation_id})" for conversation_id, title, _ in conversations] note_choices = [f"{title} (ID: {note_id})" for note_id, title, _ in notes] return ( gr.update(visible=True, choices=conversation_choices), gr.update(visible=True, choices=note_choices) ) else: return ( gr.update(visible=False), gr.update(visible=False) ) export_option.change( update_visibility, inputs=[export_option], outputs=[conversations_checklist, notes_checklist] ) def export_data_function(export_option, selected_conversations, selected_notes): try: zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: if export_option == "Export All": # Fetch all conversations and notes conversations = fetch_all_conversations() notes = fetch_all_notes() else: # Fetch selected conversations and notes conversation_ids = [int(item.split(' (ID: ')[1][:-1]) for item in selected_conversations] note_ids = [int(item.split(' (ID: ')[1][:-1]) for item in selected_notes] conversations = fetch_conversations_by_ids(conversation_ids) notes = fetch_notes_by_ids(note_ids) # Export conversations for conversation in conversations: conversation_id, title, _ = conversation filename = f"conversation_{conversation_id}_{title.replace(' ', '_')}.md" zip_file.writestr(filename, conversation) # Export notes for note in notes: note_id, title, _ = note filename = f"note_{note_id}_{title.replace(' ', '_')}.md" zip_file.writestr(filename, note) zip_buffer.seek(0) return zip_buffer, gr.update(visible=True), gr.update( value="Export successful!
") except Exception as e: logging.error(f"Error exporting data: {str(e)}") return None, gr.update(visible=False), gr.update(value=f"Error: {str(e)}
") export_button.click( export_data_function, inputs=[export_option, conversations_checklist, notes_checklist], outputs=[download_link, download_link, status_message] ) def convert_file_to_text(file_path): """Convert various file types to plain text.""" file_extension = os.path.splitext(file_path)[1].lower() if file_extension == '.pdf': return extract_text_and_format_from_pdf(file_path) elif file_extension == '.epub': return read_epub(file_path) elif file_extension in ['.json', '.csv']: return read_structured_file(file_path) elif file_extension == '.docx': return docx2txt.process(file_path) elif file_extension in ['.txt', '.md', '.rtf']: with open(file_path, 'r', encoding='utf-8') as f: return f.read() else: raise ValueError(f"Unsupported file type: {file_extension}") def read_structured_file(file_path): """Read and convert JSON or CSV files to text.""" file_extension = os.path.splitext(file_path)[1].lower() if file_extension == '.json': with open(file_path, 'r') as file: data = json.load(file) return json.dumps(data, indent=2) elif file_extension == '.csv': with open(file_path, 'r', newline='') as file: csv_reader = csv.reader(file) return '\n'.join([','.join(row) for row in csv_reader]) else: raise ValueError(f"Unsupported file type: {file_extension}") # # End of RAG_QA_Chat_tab.py ########################################################################################################################