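# NOTE: the original first three lines ("Spaces: / Running / Running") appear to be
# web-page banner text captured with this file; they are not part of the module.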
# RAG_QA_Chat_tab.py | |
# Description: Gradio UI for RAG QA Chat | |
# | |
# Imports | |
import csv | |
import logging | |
import json | |
import os | |
from datetime import datetime | |
# | |
# External Imports | |
import docx2txt | |
import gradio as gr | |
# Local Imports | |
from App_Function_Libraries.Books.Book_Ingestion_Lib import read_epub | |
from App_Function_Libraries.DB.DB_Manager import DatabaseError, get_paginated_files, add_media_with_keywords | |
from App_Function_Libraries.PDF.PDF_Ingestion_Lib import extract_text_and_format_from_pdf | |
from App_Function_Libraries.RAG.RAG_Libary_2 import generate_answer, enhanced_rag_pipeline | |
from App_Function_Libraries.RAG.RAG_QA_Chat import search_database, rag_qa_chat | |
# FIXME: load_chat_history / save_chat_history are imported for future use; nothing in this file calls them yet.
from App_Function_Libraries.RAG.RAG_QA_Chat import load_chat_history, save_chat_history | |
# | |
######################################################################################################################## | |
# | |
# Functions: | |
def create_rag_qa_chat_tab():
    """Build the "RAG QA Chat" Gradio tab and wire up its event handlers.

    The tab has a left column with the context-source controls (whole database,
    a database search result, or an uploaded file), the API selector and the
    query-rewriting toggle, and a right column with the chatbot itself.

    Presumably called from inside the application's ``gr.Blocks``/tabs layout
    (``gr.TabItem`` is created here) -- confirm against the caller.

    Returns:
        tuple: ``(context_source, existing_file, search_query, search_button,
        search_results, file_upload, convert_to_text, keywords, api_choice,
        use_query_rewriting, chatbot, msg, submit, clear_chat)`` -- the Gradio
        components created for this tab.
    """
    with gr.TabItem("RAG QA Chat"):
        gr.Markdown("# RAG QA Chat")

        with gr.Row():
            with gr.Column(scale=1):
                context_source = gr.Radio(
                    ["All Files in the Database", "Search Database", "Upload File"],
                    label="Context Source",
                    value="All Files in the Database"
                )
                existing_file = gr.Dropdown(label="Select Existing File", choices=[], interactive=True)
                file_page = gr.State(value=1)
                with gr.Row():
                    prev_page_btn = gr.Button("Previous Page")
                    next_page_btn = gr.Button("Next Page")
                    page_info = gr.HTML("Page 1")

                search_query = gr.Textbox(label="Search Query", visible=False)
                search_button = gr.Button("Search", visible=False)
                search_results = gr.Dropdown(label="Search Results", choices=[], visible=False)
                file_upload = gr.File(
                    label="Upload File",
                    visible=False,
                    file_types=["txt", "pdf", "epub", "md", "rtf", "json", "csv"]
                )
                convert_to_text = gr.Checkbox(label="Convert to plain text", visible=False)
                keywords = gr.Textbox(label="Keywords (comma-separated)", visible=False)

                api_choice = gr.Dropdown(
                    choices=["Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "Mistral",
                             "OpenRouter", "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "ollama",
                             "HuggingFace"],
                    label="Select API for RAG",
                    value="OpenAI"
                )
                use_query_rewriting = gr.Checkbox(label="Use Query Rewriting", value=True)

            with gr.Column(scale=2):
                chatbot = gr.Chatbot(height=500)
                msg = gr.Textbox(label="Enter your message")
                submit = gr.Button("Submit (Might take a few seconds/turns blue while processing...)")
                clear_chat = gr.Button("Clear Chat History")

        # Single loading indicator (the original created this component twice and
        # only the second instance was ever wired to an event).
        loading_indicator = gr.HTML(visible=False)

        def update_file_list(page):
            """Fetch one page of files and return updates for the dropdown, page label and page state."""
            files, total_pages, current_page = get_paginated_files(page)
            # `media_id` instead of `id` so the builtin is not shadowed.
            choices = [f"{title} (ID: {media_id})" for media_id, title in files]
            return (
                gr.update(choices=choices),
                gr.update(value=f"Page {current_page} of {total_pages}"),
                current_page,
            )

        def next_page_fn(current_page):
            """Advance the file list by one page."""
            return update_file_list(current_page + 1)

        def prev_page_fn(current_page):
            """Go back one page, never below page 1."""
            return update_file_list(max(1, current_page - 1))

        def update_context_source(choice):
            """Show only the controls that belong to the selected context source."""
            # NOTE(review): "Existing File" is not one of the radio choices above, so the
            # existing-file controls start visible and are hidden for good after the first
            # change of the radio. Behaviour kept as-is; decide whether to add the choice
            # or to drop these controls.
            show_existing = choice == "Existing File"
            show_search = choice == "Search Database"
            show_upload = choice == "Upload File"
            return {
                existing_file: gr.update(visible=show_existing),
                prev_page_btn: gr.update(visible=show_existing),
                next_page_btn: gr.update(visible=show_existing),
                page_info: gr.update(visible=show_existing),
                search_query: gr.update(visible=show_search),
                search_button: gr.update(visible=show_search),
                search_results: gr.update(visible=show_search),
                file_upload: gr.update(visible=show_upload),
                convert_to_text: gr.update(visible=show_upload),
                keywords: gr.update(visible=show_upload)
            }

        context_source.change(update_context_source, context_source,
                              [existing_file, prev_page_btn, next_page_btn, page_info, search_query, search_button,
                               search_results, file_upload, convert_to_text, keywords])

        next_page_btn.click(next_page_fn, inputs=[file_page], outputs=[existing_file, page_info, file_page])
        prev_page_btn.click(prev_page_fn, inputs=[file_page], outputs=[existing_file, page_info, file_page])

        # Reload the first page of the file list whenever the context source changes.
        context_source.change(lambda: update_file_list(1), outputs=[existing_file, page_info, file_page])

        def rag_qa_chat_wrapper(message, history, context_source, existing_file, search_results, file_upload,
                                convert_to_text, keywords, api_choice, use_query_rewriting):
            """Answer ``message`` with RAG and stream UI updates.

            Generator used as the submit handler. Every yield is a triple of
            ``(chat history, message box value, loading indicator update)``.
            The first yield shows the loading indicator; the last one hides it,
            whether the call succeeded or failed.

            ``existing_file`` is accepted because it is wired as an input, but it
            is not used by any branch below.
            """
            # Treat a missing history as an empty conversation so len() below is safe.
            history = history or []
            try:
                logging.info(f"Starting rag_qa_chat_wrapper with message: {message}")
                logging.info(f"Context source: {context_source}")
                logging.info(f"API choice: {api_choice}")
                logging.info(f"Query rewriting: {'enabled' if use_query_rewriting else 'disabled'}")

                # Show loading indicator
                yield history, "", gr.update(visible=True)

                # Ensure api_choice is a string
                api_choice = api_choice.value if isinstance(api_choice, gr.components.Dropdown) else api_choice
                logging.info(f"Resolved API choice: {api_choice}")

                # Only rephrase the question if it's not the first query and query rewriting is enabled
                if len(history) > 0 and use_query_rewriting:
                    rephrased_question = rephrase_question(history, message, api_choice)
                    logging.info(f"Original question: {message}")
                    logging.info(f"Rephrased question: {rephrased_question}")
                else:
                    rephrased_question = message
                    logging.info(f"Using original question: {message}")

                if context_source == "All Files in the Database":
                    # Use the enhanced_rag_pipeline to search the entire database
                    context = enhanced_rag_pipeline(rephrased_question, api_choice)
                    logging.info("Using enhanced_rag_pipeline for database search")
                elif context_source == "Search Database":
                    # Without a selection the dropdown value is None; report that as an
                    # input error instead of failing on `.split` with an opaque message.
                    if not search_results or "(ID: " not in search_results:
                        raise ValueError("No search result selected")
                    # Entries look like "<title> (ID: <id>)"; take the last marker so a
                    # title that itself contains "(ID: " cannot confuse the parse.
                    selected_id = search_results.rsplit("(ID: ", 1)[1][:-1]
                    context = f"media_id:{selected_id}"
                    logging.info(f"Using search result with context: {context}")
                else:  # Upload File
                    logging.info("Processing uploaded file")
                    if file_upload is None:
                        raise ValueError("No file uploaded")

                    # Process the uploaded file
                    file_path = file_upload.name
                    file_name = os.path.basename(file_path)
                    logging.info(f"Uploaded file: {file_name}")

                    if convert_to_text:
                        logging.info("Converting file to plain text")
                        content = convert_file_to_text(file_path)
                    else:
                        logging.info("Reading file content")
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()

                    logging.info(f"File content length: {len(content)} characters")

                    # Process keywords
                    if not keywords:
                        keywords = "default,rag-file-upload"
                    logging.info(f"Keywords: {keywords}")

                    # Add the content to the database and get the media_id
                    logging.info("Adding content to database")
                    result = add_media_with_keywords(
                        url=file_name,
                        title=file_name,
                        media_type='document',
                        content=content,
                        keywords=keywords,
                        prompt='No prompt for uploaded files',
                        summary='No summary for uploaded files',
                        transcription_model='None',
                        author='Unknown',
                        ingestion_date=datetime.now().strftime('%Y-%m-%d')
                    )

                    logging.info(f"Result from add_media_with_keywords: {result}")
                    # The helper may return either the id alone or an (id, extra) pair.
                    if isinstance(result, tuple):
                        media_id, _ = result
                    else:
                        media_id = result

                    context = f"media_id:{media_id}"
                    logging.info(f"Context for uploaded file: {context}")

                logging.info("Calling rag_qa_chat function")
                new_history, response = rag_qa_chat(rephrased_question, history, context, api_choice)
                # Log first 100 chars of response
                logging.info(
                    f"Response received from rag_qa_chat: {response[:100]}...")

                # Safely update history: display the user's original wording, not the rewrite
                if new_history:
                    new_history[-1] = (message, new_history[-1][1])
                else:
                    new_history = [(message, response)]

                gr.Info("Response generated successfully")
                logging.info("rag_qa_chat_wrapper completed successfully")
                yield new_history, "", gr.update(visible=False)
            # gr.Error only reaches the user when it is *raised*; merely constructing it
            # (as the original did) is a silent no-op. gr.Warning displays a modal without
            # raising, so the handler can still yield and hide the loading indicator.
            except ValueError as e:
                logging.error(f"Input error in rag_qa_chat_wrapper: {str(e)}")
                gr.Warning(f"Input error: {str(e)}")
                yield history, "", gr.update(visible=False)
            except DatabaseError as e:
                logging.error(f"Database error in rag_qa_chat_wrapper: {str(e)}")
                gr.Warning(f"Database error: {str(e)}")
                yield history, "", gr.update(visible=False)
            except Exception as e:
                logging.error(f"Unexpected error in rag_qa_chat_wrapper: {e}", exc_info=True)
                gr.Warning("An unexpected error occurred. Please try again later.")
                yield history, "", gr.update(visible=False)

        def rephrase_question(history, latest_question, api_choice):
            """Rewrite ``latest_question`` so references to earlier turns are explicit.

            Falls back to the original question when the API returns nothing usable.
            """
            # Thank you https://www.reddit.com/r/LocalLLaMA/comments/1fi1kex/multi_turn_conversation_and_rag/
            logging.info("RAG QnA: Rephrasing question")
            # `history` holds only completed turns (the new question is not in it yet),
            # so every entry is used; slicing off the last one would drop the exchange
            # the follow-up question most likely refers to.
            conversation_history = "\n".join(
                f"User: {turn[0]}\nAssistant: {turn[1]}" for turn in history
            )
            prompt = (
                "You are a helpful assistant. Given the conversation history and the latest question, "
                "resolve any ambiguous references in the latest question.\n"
                "Conversation History:\n"
                f"{conversation_history}\n"
                "Latest Question:\n"
                f"{latest_question}\n"
                "Rewritten Question:"
            )

            # Use the selected API to generate the rephrased question
            rephrased_question = generate_answer(api_choice, prompt, "")
            logging.info(f"Rephrased question: {rephrased_question}")
            if not isinstance(rephrased_question, str) or not rephrased_question.strip():
                # An empty rewrite would search for nothing; keep the user's wording.
                return latest_question
            return rephrased_question.strip()

        def perform_search(query):
            """Run a database search and return the results as dropdown choices."""
            try:
                results = search_database(query)
                return gr.update(choices=results)
            except Exception as e:
                # gr.Warning shows the message; an un-raised gr.Error would show nothing.
                gr.Warning(f"Error performing search: {str(e)}")
                return gr.update(choices=[])

        def clear_chat_history():
            """Reset the chatbot and the message box."""
            return [], ""

        search_button.click(perform_search, inputs=[search_query], outputs=[search_results])

        submit.click(
            rag_qa_chat_wrapper,
            inputs=[msg, chatbot, context_source, existing_file, search_results, file_upload,
                    convert_to_text, keywords, api_choice, use_query_rewriting],
            outputs=[chatbot, msg, loading_indicator]
        )

        clear_chat.click(clear_chat_history, outputs=[chatbot, msg])

    return (context_source, existing_file, search_query, search_button, search_results, file_upload,
            convert_to_text, keywords, api_choice, use_query_rewriting, chatbot, msg, submit, clear_chat)
def convert_file_to_text(file_path):
    """Return the text content of ``file_path``, chosen by its (case-insensitive) extension.

    * ``.txt`` / ``.md`` / ``.rtf`` -- read as UTF-8 text and returned unchanged
    * ``.pdf`` -- delegated to ``extract_text_and_format_from_pdf``
    * ``.epub`` -- delegated to ``read_epub``
    * ``.json`` / ``.csv`` -- delegated to ``read_structured_file``
    * ``.docx`` -- delegated to ``docx2txt.process``

    Raises:
        ValueError: if the extension is none of the above.
    """
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    # Plain-text formats need no converter: hand back the file content as-is.
    if suffix in ('.txt', '.md', '.rtf'):
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()

    if suffix == '.pdf':
        return extract_text_and_format_from_pdf(file_path)

    if suffix == '.epub':
        return read_epub(file_path)

    if suffix in ('.json', '.csv'):
        return read_structured_file(file_path)

    if suffix == '.docx':
        return docx2txt.process(file_path)

    raise ValueError(f"Unsupported file type: {suffix}")
def read_structured_file(file_path):
    """Read a JSON or CSV file and return its content as text.

    JSON is parsed and re-serialised with a two-space indent; non-ASCII
    characters are kept as-is rather than escaped to ``\\uXXXX`` so the text
    stays readable. CSV rows are joined with commas and rows with newlines
    (no re-quoting is applied).

    Files are decoded as UTF-8, and a leading byte-order mark is ignored.

    Args:
        file_path: path to a ``.json`` or ``.csv`` file (extension is matched
            case-insensitively).

    Returns:
        str: the text rendering described above.

    Raises:
        ValueError: if the extension is neither ``.json`` nor ``.csv``
            (``json.load`` may also raise its own decode error on bad JSON).
    """
    file_extension = os.path.splitext(file_path)[1].lower()

    if file_extension == '.json':
        # 'utf-8-sig' reads plain UTF-8 and also strips a BOM, which json.load
        # would otherwise reject; an explicit encoding avoids locale-dependent decoding.
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            data = json.load(file)
        return json.dumps(data, indent=2, ensure_ascii=False)

    if file_extension == '.csv':
        # newline='' lets the csv module handle line endings itself.
        with open(file_path, 'r', newline='', encoding='utf-8-sig') as file:
            return '\n'.join(','.join(row) for row in csv.reader(file))

    raise ValueError(f"Unsupported file type: {file_extension}")
# | |
# End of RAG_QA_Chat_tab.py | |
######################################################################################################################## | |
# |