Spaces:
Runtime error
Runtime error
import os | |
import re | |
import json | |
import getpass | |
import logging | |
import openai | |
import asyncio | |
from typing import Any, List, Tuple, Dict | |
import gradio as gr | |
import llama_index | |
from llama_index import Document | |
from llama_index.llms import OpenAI | |
from llama_index.embeddings import OpenAIEmbedding, HuggingFaceEmbedding | |
from llama_index.llms import HuggingFaceLLM | |
import requests | |
from RAG_utils import PDFProcessor_Unstructured, PDFQueryEngine, HybridRetriever, MixtralLLM, KeywordSearch, base_utils, ConfigManager | |
# Configure basic logging for the whole application.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)

# Exported before any embedding/tokenizer object is created.
# NOTE(review): presumably silences the Hugging Face tokenizers parallelism warning - confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# All model/pipeline settings are read from the "model" section of model_config.json.
config_manager = ConfigManager()
config_manager.load_config("model", "model_config.json")

# API credentials come from the environment rather than from a config file.
# The OpenAI key is mandatory (process_pdf always builds an OpenAI LLM), so a
# missing key fails fast here with KeyError.
openai.api_key = os.environ['OPENAI_API_KEY']
# The Hugging Face token is only needed for "Model 2". process_pdf validates it
# against None before use, so a missing token must yield None here instead of
# crashing the whole app at import time.
hf_token = os.environ.get('HF_TOKEN')

# PDF rendering and chunking parameters
pdf_processing_config = config_manager.get_config_value("model", "pdf_processing")
ALLOWED_EXTENSIONS = config_manager.get_config_value("model", "allowed_extensions")

# Embedding backend ("openai" or "huggingface") and, for the latter, the model name.
embed = config_manager.get_config_value("model", "embeddings")
embed_model_name = config_manager.get_config_value("model", "embeddings_model")

# LLM generation settings.
model_temperature = config_manager.get_config_value("model", "model_temp")
output_token_size = config_manager.get_config_value("model", "max_tokens")
model_context_window = config_manager.get_config_value("model", "context_window")

# Paths of the prompt templates and journal lists read by process_pdf.
gpt_prompt_path = config_manager.get_config_value("model", "GPT_PROMPT_PATH")
mistral_prompt_path = config_manager.get_config_value("model", "MISTRAL_PROMPT_PATH")
info_prompt_path = config_manager.get_config_value("model", "INFO_PROMPT_PATH")
peer_review_journals_path = config_manager.get_config_value("model", "peer_review_journals_path")
eq_network_journals_path = config_manager.get_config_value("model", "eq_network_journals_path")

# Evaluation queries and the human-readable criteria shown next to each reasoning.
queries = config_manager.get_config_value("model", "queries")
criteria = config_manager.get_config_value("model", "criteria")
# The maximum score is one point per query.
num_criteria = len(queries)

# Queries used to extract author and journal information from the paper.
author_query = config_manager.get_config_value("model", "author_query")
journal_query = config_manager.get_config_value("model", "journal_query")
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot and is compared
    case-insensitively (it is lower-cased before the lookup).
    """
    # A name without any dot has no extension to check.
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
def generate_score_bar(score, num_criteria):
    """Build the HTML for the coloured score bar and its rating label.

    Args:
        score: Total score achieved. May be fractional.
        num_criteria: Maximum achievable score; used to scale the bar width.

    Returns:
        str: HTML with a grey track, a coloured fill whose width is the score
        expressed as a percentage of ``num_criteria`` (clamped to 0-100), and
        a paragraph holding the rating text in the same colour.
    """
    # Convert the score to a 100-point scale. Guard against a zero maximum and
    # clamp so the CSS width always stays a valid percentage.
    if num_criteria > 0:
        score_out_of_100 = round((score / num_criteria) * 100)
    else:
        score_out_of_100 = 0
    score_out_of_100 = max(0, min(100, score_out_of_100))

    # Rating bands on the raw score (thresholds follow the 9-point scale),
    # ordered from best to worst. Lower bounds are used instead of exact
    # matches so fractional scores such as 7.5 land in the right band.
    rating_bands = (
        (9, "#4CAF50", "Very good"),  # green
        (7, "#FFEB3B", "Good"),       # yellow
        (5, "#FF9800", "Ok"),         # orange
        (3, "#F44336", "Bad"),        # red
    )
    color, text = "#800000", "Very bad"  # maroon, for score < 3
    for lower_bound, band_color, band_text in rating_bands:
        if score >= lower_bound:
            color, text = band_color, band_text
            break

    # Create the HTML for the score bar
    score_bar_html = f"""
    <div style="background-color: #ddd; border-radius: 10px; position: relative; height: 20px; width: 100%;">
        <div style="background-color: {color}; height: 100%; border-radius: 10px; width: {score_out_of_100}%;"></div>
    </div>
    <p style="color: {color};">{text}</p> <!-- Display the text -->
    """
    return score_bar_html
def format_example(example):
    """
    Render one few-shot example as prompt text.

    Args:
        example (dict): Mapping with a 'query' entry (the question) and an
            'Answer' entry (the expected direct answer).

    Returns:
        str: The example laid out as an "Example:" header followed by a
        "Query: ..." line and a " Direct Answer: ..." line.
    """
    template = "Example:\nQuery: {}\n Direct Answer: {}\n"
    question = example['query']
    direct_answer = example['Answer']
    return template.format(question, direct_answer)
def _read_nonblank_lines(text):
    """Split *text* into whitespace-stripped lines, dropping blank ones."""
    return [line.strip() for line in text.splitlines() if line.strip()]


def _init_llm(llm_model, utils):
    """Return the ``(llm, general_prompt)`` pair for the model picked in the UI."""
    try:
        if llm_model == "Model 1":
            llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=output_token_size)
            general_prompt = utils.read_from_file(gpt_prompt_path)
        elif llm_model == "Model 2":
            if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]):
                raise ValueError("All parameters are required for Mistral LLM.")
            llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
                             temperature=model_temperature, model_name="mistralai/Mixtral-8x7B-Instruct-v0.1", api_key=hf_token)
            general_prompt = utils.read_from_file(mistral_prompt_path)
        else:
            raise ValueError(f"Unsupported language model: {llm_model}")
    except Exception as e:
        logger.error("Error initializing language model '%s': %s", llm_model, e, exc_info=True)
        raise
    return llm, general_prompt


def _init_embed_model():
    """Return the embedding model selected by the ``embed`` config value."""
    try:
        if embed == "openai":
            return OpenAIEmbedding(model="text-embedding-3-large")
        if embed == "huggingface":
            # Use the specified model name
            return HuggingFaceEmbedding(embed_model_name)
        # Report the configured value; the embedding object does not exist on
        # this path, so referencing it would raise UnboundLocalError instead.
        raise ValueError(f"Unsupported embedding model: {embed}")
    except Exception as e:
        logger.error("Error initializing embedding model: %s", e, exc_info=True)
        raise


def process_pdf(uploaded_file, llm_model, n_criteria = num_criteria):
    """Score an uploaded paper against the configured criteria.

    Args:
        uploaded_file: File object delivered by the Gradio upload component
            (``None`` when nothing was uploaded).
        llm_model: UI label of the model to evaluate with ("Model 1" or "Model 2").
        n_criteria: Maximum achievable score used to scale the result;
            defaults to the number of configured queries.

    Returns:
        tuple: ``(score_text, score_bar_html, reasoning_html, author_result)``
        where ``score_text`` is formatted as ``"<percent>/100"``.

    Raises:
        gr.Error: If no file was uploaded.
        ValueError: If the model label or the configured embedding backend is
            unsupported, or a required Mixtral parameter is missing.
    """
    from html import escape

    # Gradio passes None when "Evaluate" is clicked without an upload.
    if uploaded_file is None:
        raise gr.Error("Please upload a PDF file before evaluating.")

    # Process the PDF file (the extracted tables are not used here).
    pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
    merged_chunks, _tables = pdf_processor.process_pdf_file(uploaded_file)
    documents = [Document(text=t) for t in merged_chunks]

    # Prompts and Queries
    utils = base_utils()
    info_prompt = utils.read_from_file(info_prompt_path)

    # LLM Model choice
    llm, general_prompt = _init_llm(llm_model, utils)

    # Embedding model choice for RAG
    embed_model = _init_embed_model()

    # Journal lists: one name per line. Blank lines are dropped so that empty
    # names never reach the journal query or the keyword matcher.
    peer_review_journals_list = _read_nonblank_lines(utils.read_from_file(peer_review_journals_path))
    eq_network_journals_list = _read_nonblank_lines(utils.read_from_file(eq_network_journals_path))

    modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?"

    example_journal = {"query": modified_journal_query,
                       "Answer": "The article is published in the Lancet."}
    example_author = {"query": author_query,
                      "Answer": "Corresponding author. Stephanie J. Sohl, Ph.D., Department of Social Sciences & Health Policy, Wake Forest School of Medicine, Medical Center Boulevard, Winston-Salem, NC 27157, USA, ssohl@wakehealth.edu"}

    formatted_journal_example = format_example(example_journal)
    formatted_author_example = format_example(example_author)

    # NOTE(review): these few-shot prompts are built but not passed to any query
    # engine; the info engine below is created with the plain info_prompt.
    # Confirm whether they were meant to be used before wiring them in.
    qa_author_prompt_with_example = info_prompt.replace("{few_shot_examples}", formatted_author_example)
    qa_journal_prompt_with_example = info_prompt.replace("{few_shot_examples}", formatted_journal_example)

    # Journal/author extraction always runs on the OpenAI model.
    info_llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=output_token_size)
    pdf_info_query = PDFQueryEngine(documents, info_llm, embed_model, (info_prompt))
    info_query_engine = pdf_info_query.setup_query_engine()
    journal_result = info_query_engine.query(modified_journal_query).response
    author_result = info_query_engine.query(author_query).response

    # Criteria evaluation uses the LLM selected in the UI; previously the
    # selected model was constructed but the OpenAI info model was used instead.
    pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt))

    # Check for prior registration
    nlp_methods = KeywordSearch(merged_chunks)
    eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
    peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
    registration_result = nlp_methods.check_registration()

    # Run the asynchronous evaluation from this synchronous handler.
    total_score, _criteria_met, _score_percentage, reasoning = asyncio.run(
        pdf_criteria_query.evaluate_with_llm_async(registration_result, peer_journal_result, eq_journal_result, queries)
    )

    # The reasoning text is model output derived from the uploaded document, so
    # it is escaped before being embedded in HTML.
    reasoning_items = "".join(
        f"<li style='font-size: 18px;'><strong style='color: forestgreen;'>{query}</strong> <br> Reasoning: {escape(str(reason))}</li>"
        for query, reason in zip(criteria, reasoning)
    )
    reasoning_html = "<ul>" + reasoning_items + "</ul>"

    # Generate the score bar HTML
    score_bar_html = generate_score_bar(total_score, n_criteria)

    # Return the score as a string and the reasoning as HTML
    return str(round((total_score / n_criteria) * 100)) + "/100", score_bar_html, reasoning_html, author_result
# Gradio user interface: upload a paper, pick a model, show score, reasoning and author info.
# NOTE(review): the Row nesting below is reconstructed from a flattened copy of this
# file - confirm which components belong inside each gr.Row().
with gr.Blocks(
    theme=gr.themes.Glass(
        text_size="sm",
        font=[gr.themes.GoogleFont("Inconsolata"), "Arial", "sans-serif"],
        primary_hue="neutral",
        secondary_hue="gray",
    )
) as demo:
    gr.Markdown("## Med Library")

    # Input: the paper to evaluate (PDF only).
    with gr.Row():
        file_upload = gr.File(label="Choose a paper", file_types=['.pdf'])

    # Input: which language model process_pdf should use.
    with gr.Row():
        models = ["Model 1", "Model 2"]
        model_choice = gr.Dropdown(choices=models, label="Choose a model", value="Model 1")

    submit_button = gr.Button(value="Evaluate")

    # Outputs, in the order process_pdf returns them.
    score_output = gr.Textbox(label="Final Score:", interactive=False)
    score_bar_output = gr.HTML()
    reasoning_output = gr.HTML()

    # Heading for Author Information
    gr.Markdown("## Author Information")
    # Output for dynamically generated author information
    author_info_output = gr.Markdown()

    # Clicking the button runs process_pdf on the upload and the chosen model.
    submit_button.click(
        fn=process_pdf,
        inputs=[file_upload, model_choice],
        outputs=[score_output, score_bar_output, reasoning_output, author_info_output],
    )

# Launch the app on all interfaces, port 7860, with a public share link requested.
demo.launch(share=True, server_name="0.0.0.0", server_port=7860)
# Main route for file upload and display results | |
# @app.route('/', methods=['GET', 'POST']) | |
# def upload_and_display_results(): | |
# total_score = 0 | |
# score_percentage = 0 | |
# reasoning = [] | |
# criteria_met = 0 | |
# if request.method == 'POST': | |
# # Check if the post request has the file part | |
# if 'file' not in request.files: | |
# flash('No file part') | |
# return redirect(request.url) | |
# file = request.files['file'] | |
# # If user does not select file, browser also submits an empty part without filename | |
# if file.filename == '': | |
# flash('No selected file') | |
# return redirect(request.url) | |
# if file and allowed_file(file.filename): | |
# try: | |
# # Process the PDF file | |
# pdf_processor = PDFProcessor_Unstructured(pdf_processing_config) | |
# merged_chunks, tables = pdf_processor.process_pdf_file(file) | |
# documents = [Document(text=t) for t in merged_chunks] | |
# # LLM Model choice | |
# try: | |
# if llm_model == "gpt-4" or llm_model == "gpt-3.5-turbo": | |
# llm = OpenAI(model=llm_model, temperature=model_temperature, max_tokens=output_token_size) | |
# elif llm_model == "mistralai/Mixtral-8x7B-Instruct-v0.1": | |
# if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]): | |
# raise ValueError("All parameters are required for Mistral LLM.") | |
# llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size, | |
# temperature=model_temperature, model_name=llm_model, api_key=hf_token) | |
# else: | |
# raise ValueError(f"Unsupported language model: {llm_model}") | |
# except Exception as e: | |
# logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True) | |
# raise # Or handle the exception as needed | |
# # Embedding model choice for RAG | |
# try: | |
# if embed == "openai": | |
# embed_model = OpenAIEmbedding() | |
# elif embed == "huggingface": | |
# if embed_model_name is None: | |
# # Set to default model if name not provided | |
# embed_model_name = "BAAI/bge-small-en-v1.5" | |
# embed_model = HuggingFaceEmbedding(embed_model_name) | |
# else: | |
# # Use the specified model name | |
# embed_model = HuggingFaceEmbedding(embed_model_name) | |
# else: | |
# raise ValueError(f"Unsupported embedding model: {embed_model}") | |
# except Exception as e: | |
# logger.error(f"Error initializing embedding model: {e}", exc_info=True) | |
# raise | |
# # Prompts and Queries | |
# utils = base_utils() | |
# general_prompt = utils.read_from_file(general_prompt_path) | |
# info_prompt = utils.read_from_file(info_prompt_path) | |
# peer_review_journals = utils.read_from_file(peer_review_journals_path) | |
# eq_network_journals = utils.read_from_file(eq_network_journals_path) | |
# peer_review_journals_list = peer_review_journals.split('\n') | |
# eq_network_journals_list = eq_network_journals.split('\n') | |
# modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?" | |
# pdf_info_query = PDFQueryEngine(documents, llm, embed_model, (info_prompt)) | |
# info_query_engine = pdf_info_query.setup_query_engine() | |
# journal_result = info_query_engine.query(modified_journal_query).response | |
# pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt)) | |
# # Check for prior registration | |
# nlp_methods = KeywordSearch(merged_chunks) | |
# eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list) | |
# peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list) | |
# registration_result = nlp_methods.check_registration() | |
# # Evaluate with OpenAI model | |
# total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries) | |
# except Exception as e: | |
# logging.exception("An error occurred while processing the file.") | |
# # Consider adding a user-friendly message or redirect | |
# flash('An error occurred while processing the file.') | |
# return redirect(request.url) | |
# return render_template('index.html', | |
# total_score = total_score, | |
# score_percentage = score_percentage, | |
# criteria_met = criteria_met, | |
# reasoning = reasoning) | |
# if __name__ == '__main__': | |
# app.run(debug=True) | |