# --- Standard library ---
import asyncio
import getpass
import json
import logging
import os
import re
from typing import Any, Dict, List, Tuple

# --- Third-party ---
import gradio as gr
import llama_index
import openai
import requests
from llama_index import Document
from llama_index.embeddings import HuggingFaceEmbedding, OpenAIEmbedding
from llama_index.llms import HuggingFaceLLM, OpenAI

# --- Project-local ---
from RAG_utils import (
    ConfigManager,
    HybridRetriever,
    KeywordSearch,
    MixtralLLM,
    PDFProcessor_Unstructured,
    PDFQueryEngine,
    base_utils,
)

# Root logging setup plus the logger used throughout this module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Only the "model" section is loaded from disk; the "api" section
# ("Config/api_config.json") that used to hold the keys is disabled.
config_manager = ConfigManager()
config_manager.load_config("model", "model_config.json")


def _model_setting(key):
    """Look up ``key`` in the "model" configuration section."""
    return config_manager.get_config_value("model", key)


# Credentials are read from the environment; a missing variable raises
# KeyError at import time.
openai.api_key = os.environ['OPENAI_API_KEY']
hf_token = os.environ['HF_TOKEN']

# PDF rendering and chunking parameters
pdf_processing_config = _model_setting("pdf_processing")
ALLOWED_EXTENSIONS = _model_setting("allowed_extensions")

# Embedding backend selector and model name
embed = _model_setting("embeddings")
embed_model_name = _model_setting("embeddings_model")

# LLM generation settings (the "llm_model" config entry is not read here)
model_temperature = _model_setting("model_temp")
output_token_size = _model_setting("max_tokens")
model_context_window = _model_setting("context_window")

# Prompt template locations
gpt_prompt_path = _model_setting("GPT_PROMPT_PATH")
mistral_prompt_path = _model_setting("MISTRAL_PROMPT_PATH")
info_prompt_path = _model_setting("INFO_PROMPT_PATH")
peer_review_journals_path = config_manager.get_config_value("model", "peer_review_journals_path")
eq_network_journals_path = config_manager.get_config_value("model", "eq_network_journals_path")
queries = config_manager.get_config_value("model", "queries")
criteria = config_manager.get_config_value("model", "criteria")
num_criteria = len(queries)
author_query = config_manager.get_config_value("model", "author_query")
journal_query = config_manager.get_config_value("model", "journal_query")


def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    """
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def _score_percentage(score, total):
    """Return ``score`` out of ``total`` as a rounded 0-100 value (0 if ``total`` is 0)."""
    if not total:
        return 0
    return round((score / total) * 100)


def generate_score_bar(score, num_criteria):
    """Build the HTML snippet that visualises a score as a coloured bar.

    Args:
        score: Raw score (number of points achieved).
        num_criteria: Maximum achievable score; used to size the bar.

    Returns:
        str: HTML containing a track, a fill whose width is the score
        percentage and whose colour reflects the score band, and a
        caption with the band label.
    """
    score_out_of_100 = _score_percentage(score, num_criteria)

    # Bands are defined on the raw score. Range comparisons are used so that
    # fractional scores and scores above 9 land in a sensible band instead of
    # falling through to "Very bad".
    if score >= 9:
        color = "#4CAF50"  # green
        text = "Very good"
    elif score >= 7:
        color = "#FFEB3B"  # yellow
        text = "Good"
    elif score >= 5:
        color = "#FF9800"  # orange
        text = "Ok"
    elif score >= 3:
        color = "#F44336"  # red
        text = "Bad"
    else:  # score < 3
        color = "#800000"  # maroon
        text = "Very bad"

    # Keep the CSS width within the track even for out-of-range scores.
    bar_width = max(0, min(100, score_out_of_100))

    score_bar_html = (
        '<div style="width: 100%; background-color: #e0e0e0; '
        'border-radius: 5px; overflow: hidden;">'
        f'<div style="width: {bar_width}%; height: 24px; '
        f'background-color: {color};"></div>'
        '</div>'
        f'<p style="margin: 4px 0 0; text-align: center;">{text}</p>'
    )
    return score_bar_html


def format_example(example):
    """Format a few-shot example as prompt text.

    Args:
        example (dict): Mapping with the keys 'query' and 'Answer'.

    Returns:
        str: The example rendered as "Example / Query / Direct Answer" text.
    """
    return f"Example:\nQuery: {example['query']}\n Direct Answer: {example['Answer']}\n"


def _select_llm(llm_model, utils):
    """Return ``(llm, prompt_text)`` for the model label chosen in the UI."""
    try:
        if llm_model == "Model 1":
            llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature,
                         max_tokens=output_token_size)
            return llm, utils.read_from_file(gpt_prompt_path)
        if llm_model == "Model 2":
            required = (model_context_window, output_token_size, model_temperature, hf_token)
            if any(param is None for param in required):
                raise ValueError("All parameters are required for Mistral LLM.")
            llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
                             temperature=model_temperature,
                             model_name="mistralai/Mixtral-8x7B-Instruct-v0.1", api_key=hf_token)
            return llm, utils.read_from_file(mistral_prompt_path)
        raise ValueError(f"Unsupported language model: {llm_model}")
    except Exception as e:
        logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True)
        raise


def _select_embed_model():
    """Instantiate the embedding model named by the ``embed`` setting."""
    try:
        if embed == "openai":
            return OpenAIEmbedding(model="text-embedding-3-large")
        if embed == "huggingface":
            # Use the specified model name
            return HuggingFaceEmbedding(embed_model_name)
        # Report the configured selector; no embedding object exists yet here.
        raise ValueError(f"Unsupported embedding model: {embed}")
    except Exception as e:
        logger.error(f"Error initializing embedding model: {e}", exc_info=True)
        raise


def _read_nonblank_lines(utils, path):
    """Read the text at ``path`` and return its non-blank lines, stripped."""
    stripped = (line.strip() for line in utils.read_from_file(path).splitlines())
    return [line for line in stripped if line]


def process_pdf(uploaded_file, llm_model, n_criteria=num_criteria):
    """Evaluate an uploaded paper and return the values shown in the UI.

    Args:
        uploaded_file: Value of the file-upload input; passed unchanged to
            ``PDFProcessor_Unstructured.process_pdf_file``.
        llm_model: UI label of the model to evaluate with
            ("Model 1" or "Model 2").
        n_criteria: Maximum achievable score, used to convert the total
            score to a percentage. Defaults to the number of configured
            queries.

    Returns:
        tuple: ``(score_text, score_bar_html, reasoning_html, author_result)``
        where ``score_text`` looks like ``"67/100"`` and ``reasoning_html``
        is currently always an empty string.

    Raises:
        gr.Error: If no file was uploaded.
        ValueError: If the model label or the embedding setting is not
            supported, or a Mixtral parameter is missing.
    """
    if uploaded_file is None:
        # Clicking "Evaluate" without choosing a file delivers None.
        raise gr.Error("Please upload a PDF file before evaluating.")

    # Process the PDF file
    pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
    merged_chunks, _tables = pdf_processor.process_pdf_file(uploaded_file)
    documents = [Document(text=chunk) for chunk in merged_chunks]

    # Prompts and Queries
    utils = base_utils()
    info_prompt = utils.read_from_file(info_prompt_path)

    llm, general_prompt = _select_llm(llm_model, utils)
    embed_model = _select_embed_model()

    # Blank lines are dropped so that empty journal names are neither listed
    # in the query nor handed to the keyword matcher.
    peer_review_journals_list = _read_nonblank_lines(utils, peer_review_journals_path)
    eq_network_journals_list = _read_nonblank_lines(utils, eq_network_journals_path)

    modified_journal_query = (
        "Is the given research paper published in any of the following journals: "
        + ", ".join(peer_review_journals_list)
        + "?"
    )

    example_journal = {"query": modified_journal_query,
                       "Answer": "The article is published in the Lancet."}
    example_author = {"query": author_query,
                      "Answer": "Corresponding author. Stephanie J. Sohl, Ph.D., Department of Social Sciences & Health Policy, Wake Forest School of Medicine, Medical Center Boulevard, Winston-Salem, NC 27157, USA, ssohl@wakehealth.edu"}

    # NOTE(review): these few-shot prompts are built but the info engine below
    # is still created with the raw ``info_prompt``; confirm whether they
    # should be passed to PDFQueryEngine instead.
    qa_author_prompt_with_example = info_prompt.replace(
        "{few_shot_examples}", format_example(example_author))
    qa_journal_prompt_with_example = info_prompt.replace(
        "{few_shot_examples}", format_example(example_journal))

    # Journal and author extraction always run on the GPT-4 info model.
    info_llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature,
                      max_tokens=output_token_size)
    pdf_info_query = PDFQueryEngine(documents, info_llm, embed_model, info_prompt)
    info_query_engine = pdf_info_query.setup_query_engine()
    journal_result = info_query_engine.query(modified_journal_query).response
    author_result = info_query_engine.query(author_query).response

    # The criteria evaluation uses the model selected in the UI, paired with
    # the prompt that was loaded for that model.
    pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, general_prompt)

    # Check for prior registration
    nlp_methods = KeywordSearch(merged_chunks)
    eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
    peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
    registration_result = nlp_methods.check_registration()

    # Run the asynchronous evaluation to completion from this synchronous handler.
    total_score, _criteria_met, _score_percentage_unused, _reasoning = asyncio.run(
        pdf_criteria_query.evaluate_with_llm_async(
            registration_result, peer_journal_result, eq_journal_result, queries
        )
    )

    reasoning_html = ""

    # Generate the score bar HTML
    score_bar_html = generate_score_bar(total_score, n_criteria)

    # Return the score as a string and the reasoning as HTML
    score_text = f"{_score_percentage(total_score, n_criteria)}/100"
    return score_text, score_bar_html, reasoning_html, author_result


with gr.Blocks(theme=gr.themes.Glass(
        text_size="sm",
        font=[gr.themes.GoogleFont("Inconsolata"), "Arial", "sans-serif"],
        primary_hue="neutral",
        secondary_hue="gray")) as demo:

    gr.Markdown("## Med Library")
    with gr.Row():
        file_upload = gr.File(label="Choose a paper", file_types=['.pdf'])

    with gr.Row():
        models = ["Model 1", "Model 2"]
        model_choice = gr.Dropdown(models, label="Choose a model", value="Model 1")
        submit_button = gr.Button("Evaluate")

    score_output = gr.Textbox(label="Final Score:", interactive=False)
    score_bar_output = gr.HTML()
    reasoning_output = gr.HTML()

    # Heading for Author Information
    gr.Markdown("## Author Information")

    # Output for dynamically generated author information
    author_info_output = gr.Markdown()

    # Set the click event for the button
    submit_button.click(
        fn=process_pdf,
        inputs=[file_upload, model_choice],
        outputs=[score_output, score_bar_output, reasoning_output, author_info_output]
    )

# Launch the app
demo.launch(share=True, server_name="0.0.0.0", server_port=7860)


# Main route for file upload and display results
# @app.route('/', methods=['GET', 'POST'])
# def upload_and_display_results():
#     total_score = 0
#     score_percentage = 0
#     reasoning = []
#     criteria_met = 0

#     if request.method == 'POST':
#         # Check if the post request has the file part
#         if 'file' not in request.files:
#             flash('No file part')
#             return redirect(request.url)
#         file = request.files['file']
#         # If user does not select file, browser also submits an empty part without filename
#         if file.filename == '':
#             flash('No selected file')
#             return redirect(request.url)
#         if file and allowed_file(file.filename):
#             try:
#                 # Process the PDF file
#                 pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
#                 merged_chunks, tables = pdf_processor.process_pdf_file(file)
#                 documents = [Document(text=t) for t in merged_chunks]

#                 # LLM Model choice
#                 try:
#                     if llm_model == "gpt-4" or llm_model == "gpt-3.5-turbo":
#                         llm = OpenAI(model=llm_model, temperature=model_temperature, max_tokens=output_token_size)
#                     elif llm_model == "mistralai/Mixtral-8x7B-Instruct-v0.1":
#                         if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]):
#                             raise ValueError("All parameters are required for Mistral LLM.")
#                         llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
#                                          temperature=model_temperature, model_name=llm_model, api_key=hf_token)
#                     else:
#                         raise ValueError(f"Unsupported language model: {llm_model}")
#                 except Exception as e:
#                     logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True)
#                     raise  # Or handle the exception as needed

#                 # Embedding model choice for RAG
#                 try:
#                     if embed == "openai":
#                         embed_model = OpenAIEmbedding()
#                     elif embed == "huggingface":
#                         if embed_model_name is None:
#                             # Set to default model if name not provided
#                             embed_model_name = "BAAI/bge-small-en-v1.5"
#                             embed_model = HuggingFaceEmbedding(embed_model_name)
#                         else:
#                             # Use the specified model name
#                             embed_model = HuggingFaceEmbedding(embed_model_name)
#                     else:
#                         raise ValueError(f"Unsupported embedding model: {embed_model}")
#                 except Exception as e:
#                     logger.error(f"Error initializing embedding model: {e}", exc_info=True)
#                     raise

#                 # Prompts and Queries
#                 utils = base_utils()
#                 general_prompt = utils.read_from_file(general_prompt_path)
#                 info_prompt = utils.read_from_file(info_prompt_path)
#                 peer_review_journals = utils.read_from_file(peer_review_journals_path)
#                 eq_network_journals = utils.read_from_file(eq_network_journals_path)
#                 peer_review_journals_list = peer_review_journals.split('\n')
#                 eq_network_journals_list = eq_network_journals.split('\n')
#                 modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?"

#                 pdf_info_query = PDFQueryEngine(documents, llm, embed_model, (info_prompt))
#                 info_query_engine = pdf_info_query.setup_query_engine()
#                 journal_result = info_query_engine.query(modified_journal_query).response

#                 pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt))

#                 # Check for prior registration
#                 nlp_methods = KeywordSearch(merged_chunks)
#                 eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
#                 peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
#                 registration_result = nlp_methods.check_registration()

#                 # Evaluate with OpenAI model
#                 total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries)

#             except Exception as e:
#                 logging.exception("An error occurred while processing the file.")
#                 # Consider adding a user-friendly message or redirect
#                 flash('An error occurred while processing the file.')
#                 return redirect(request.url)

#     return render_template('index.html',
#                            total_score = total_score,
#                            score_percentage = score_percentage,
#                            criteria_met = criteria_met,
#                            reasoning = reasoning)

# if __name__ == '__main__':
#     app.run(debug=True)