import os
import re
import json
import getpass
import logging
import openai
import asyncio
from typing import Any, List, Tuple, Dict

import gradio as gr
import llama_index
from fpdf import FPDF
from llama_index import Document
from llama_index.llms import OpenAI, HuggingFaceLLM
from llama_index.embeddings import OpenAIEmbedding, HuggingFaceEmbedding
import requests

from RAG_utils import (
    PDFProcessor_Unstructured,
    PDFQueryEngine,
    HybridRetriever,
    MixtralLLM,
    KeywordSearch,
    base_utils,
    ConfigManager,
)

# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Create a logger object
logger = logging.getLogger(__name__)

os.environ["TOKENIZERS_PARALLELISM"] = "false"

config_manager = ConfigManager()
# config_manager.load_config("api", "Config/api_config.json")
config_manager.load_config("model", "model_config.json")

openai.api_key = os.environ['OPENAI_API_KEY']  # config_manager.get_config_value("api", "OPENAI_API_KEY")
hf_token = os.environ['HF_TOKEN']  # config_manager.get_config_value("api", "HF_TOKEN")

# PDF rendering and chunking parameters
pdf_processing_config = config_manager.get_config_value("model", "pdf_processing")

ALLOWED_EXTENSIONS = config_manager.get_config_value("model", "allowed_extensions")

embed = config_manager.get_config_value("model", "embeddings")
embed_model_name = config_manager.get_config_value("model", "embeddings_model")

# llm_model = config_manager.get_config_value("model", "llm_model")
model_temperature = config_manager.get_config_value("model", "model_temp")
output_token_size = config_manager.get_config_value("model", "max_tokens")
model_context_window = config_manager.get_config_value("model", "context_window")

gpt_prompt_path = config_manager.get_config_value("model", "GPT_PROMPT_PATH")
mistral_prompt_path = config_manager.get_config_value("model", "MISTRAL_PROMPT_PATH")
info_prompt_path = config_manager.get_config_value("model", "INFO_PROMPT_PATH")

peer_review_journals_path = config_manager.get_config_value("model", "peer_review_journals_path")
eq_network_journals_path = config_manager.get_config_value("model", "eq_network_journals_path")

queries = config_manager.get_config_value("model", "queries")
criteria = config_manager.get_config_value("model", "criteria")
num_criteria = len(queries)

author_query = config_manager.get_config_value("model", "author_query")
journal_query = config_manager.get_config_value("model", "journal_query")
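# NOTE: the "model" config is expected to supply every key read above. A purely illustrative
# model_config.json (key names taken from the code; all values are assumptions) might look like:
#
# {
#     "pdf_processing": { ... },
#     "allowed_extensions": ["pdf"],
#     "embeddings": "openai",
#     "embeddings_model": "BAAI/bge-small-en-v1.5",
#     "model_temp": 0.0,
#     "max_tokens": 512,
#     "context_window": 32768,
#     "GPT_PROMPT_PATH": "...",
#     "MISTRAL_PROMPT_PATH": "...",
#     "INFO_PROMPT_PATH": "...",
#     "peer_review_journals_path": "...",
#     "eq_network_journals_path": "...",
#     "queries": ["..."],
#     "criteria": ["..."],
#     "author_query": "...",
#     "journal_query": "..."
# }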

# Helper function to check if the file extension is allowed
def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def generate_score_bar(score, num_criteria):
    # Convert and round the score from the criteria scale to a 100-point scale
    score_out_of_100 = round((score / num_criteria) * 100)

    # Determine the color and text based on the original score (thresholds assume 9 criteria)
    if score == 9:
        color = "#4CAF50"  # green
        text = "Very good"
    elif score in [7, 8]:
        color = "#FFEB3B"  # yellow
        text = "Good"
    elif score in [5, 6]:
        color = "#FF9800"  # orange
        text = "Ok"
    elif score in [3, 4]:
        color = "#F44336"  # red
        text = "Bad"
    else:  # score < 3
        color = "#800000"  # maroon
        text = "Very bad"

    # Create the HTML for the score bar: a coloured fill proportional to the score,
    # followed by the verbal rating
    score_bar_html = f"""
    <div style="background-color: #eee; border-radius: 10px; width: 100%; height: 20px;">
        <div style="background-color: {color}; width: {score_out_of_100}%; height: 100%; border-radius: 10px;"></div>
    </div>
    <div style="text-align: center; margin-top: 4px;">{text}</div>
    """
    return score_bar_html


class PDF(FPDF):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Load the DejaVu font files
        self.add_font('DejaVu', '', 'DejaVu_Sans/DejaVuSansCondensed.ttf', uni=True)
        self.add_font('DejaVu', 'B', 'DejaVu_Sans/DejaVuSansCondensed-Bold.ttf', uni=True)
        self.add_font('DejaVu', 'I', 'DejaVu_Sans/DejaVuSansCondensed-Oblique.ttf', uni=True)

    def header(self):
        self.set_font('DejaVu', 'B', 12)
        self.cell(0, 10, 'Paper Analysis Report', 0, 1, 'C')

    def footer(self):
        self.set_y(-15)
        self.set_font('DejaVu', 'I', 8)
        self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C')


def create_pdf_report(title, author_info, score, reasoning_text, output_path):
    pdf = PDF()
    pdf.add_page()

    # Set the font to DejaVu for Unicode support
    pdf.set_font("DejaVu", size=12)

    pdf.cell(0, 10, f"Title: {title}", 0, 1)
    pdf.cell(0, 10, f"Author Information: {author_info}", 0, 1)
    pdf.cell(0, 10, f"Score: {score}", 0, 1)
    pdf.multi_cell(0, 10, f"Reasoning:\n{reasoning_text}")

    pdf.output(output_path)


def process_pdf(uploaded_files, llm_model, n_criteria=num_criteria):
    # Initialize aggregation variables
    final_score = 0
    final_reasoning = []
    final_score_bar_html = ""
    final_author_info_html = ""
    final_title_info_html = ""
    output_files = []

    for i, uploaded_file in enumerate(uploaded_files):
        # Process the PDF file
        pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
        merged_chunks, tables, title = pdf_processor.process_pdf_file(uploaded_file)
        documents = [Document(text=t) for t in merged_chunks]

        # Prompts and Queries
        utils = base_utils()
        info_prompt = utils.read_from_file(info_prompt_path)

        # LLM Model choice
        try:
            if llm_model == "Model 1":
                llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=output_token_size)
                general_prompt = utils.read_from_file(gpt_prompt_path)
            elif llm_model == "Model 2":
                if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]):
                    raise ValueError("All parameters are required for Mistral LLM.")
                llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
                                 temperature=model_temperature, model_name="mistralai/Mixtral-8x7B-Instruct-v0.1",
                                 api_key=hf_token)
                general_prompt = utils.read_from_file(mistral_prompt_path)
            else:
                raise ValueError(f"Unsupported language model: {llm_model}")
        except Exception as e:
            logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True)
            raise  # Or handle the exception as needed

        # Embedding model choice for RAG
        try:
            if embed == "openai":
                embed_model = OpenAIEmbedding(model="text-embedding-3-large")
            elif embed == "huggingface":
                # Use the specified model name
                embed_model = HuggingFaceEmbedding(embed_model_name)
            else:
                raise ValueError(f"Unsupported embedding model: {embed}")
        except Exception as e:
            logger.error(f"Error initializing embedding model: {e}", exc_info=True)
            raise

        peer_review_journals = utils.read_from_file(peer_review_journals_path)
        eq_network_journals = utils.read_from_file(eq_network_journals_path)

        peer_review_journals_list = peer_review_journals.split('\n')
        eq_network_journals_list = eq_network_journals.split('\n')

        modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?"
        # Two query engines are built from PDFQueryEngine (RAG_utils): a lightweight GPT-4 engine
        # (max_tokens=100) answers the metadata questions (journal, authors), while the main `llm`
        # selected above scores the evaluation criteria via evaluate_with_llm().
        info_llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=100)
        pdf_info_query = PDFQueryEngine(documents, info_llm, embed_model, info_prompt)
        info_query_engine = pdf_info_query.setup_query_engine()
        journal_result = info_query_engine.query(modified_journal_query).response
        author_result = info_query_engine.query(author_query).response

        pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, general_prompt)

        # Check for prior registration
        nlp_methods = KeywordSearch(merged_chunks)
        eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
        peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
        registration_result = nlp_methods.check_registration()

        # Evaluate with the chosen LLM
        total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(
            registration_result, peer_journal_result, eq_journal_result, queries)

        # Convert reasoning list to plain text
        reasoning_text = "\n".join([f"{idx + 1}. {reason}" for idx, reason in enumerate(reasoning)])

        # Generate the score bar HTML
        score_bar_html = generate_score_bar(total_score, n_criteria)

        output_path = f"/tmp/paper_report_{i+1}.pdf"
        create_pdf_report(title, author_result, total_score, reasoning_text, output_path)
        output_files.append(output_path)

    # Construct the processing message
    processing_message = f"Processing complete. {len(uploaded_files)} reports generated. Please download your reports below."

    return processing_message, output_files


with gr.Blocks(theme=gr.themes.Glass(
        text_size="sm",
        font=[gr.themes.GoogleFont("Inconsolata"), "Arial", "sans-serif"],
        primary_hue="neutral",
        secondary_hue="gray")) as demo:
    gr.Markdown("## Med Library")

    with gr.Row():
        file_upload = gr.File(label="Choose papers", file_types=['.pdf'], file_count="multiple")

    with gr.Row():
        model_choice = gr.Dropdown(["Model 1", "Model 2"], label="Choose a model", value="Model 1")

    submit_button = gr.Button("Evaluate")
    processing_message_output = gr.Textbox(label="Processing Status", interactive=False)
    report_download_links = gr.File(label="Download Reports", type="filepath", file_count="multiple")

    submit_button.click(
        fn=process_pdf,
        inputs=[file_upload, model_choice],
        outputs=[processing_message_output, report_download_links]
    )

# Launch the app
demo.launch(share=True, server_name="0.0.0.0", server_port=7860)
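# Standalone smoke test (assumed usage; in normal operation the Gradio UI above invokes
# process_pdf with the uploaded file paths and the selected model):
#
#     message, report_paths = process_pdf(["/path/to/paper.pdf"], "Model 1")
#     print(message, report_paths)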