Spaces:
Sleeping
Sleeping
import gradio as gr | |
import time | |
import os | |
from langchain.vectorstores import Chroma | |
from typing import List, Tuple | |
import re | |
import ast | |
import html | |
from utils.load_config import LoadConfig | |
from langchain.embeddings import HuggingFaceEmbeddings | |
import requests | |
import torch | |
FLASK_APP_ENDPOINT = "http://127.0.0.1:8888/generate_text" | |
APPCFG = LoadConfig() | |
URL = "" | |
hyperlink = f"[RAG]({URL})" | |
class ChatBot: | |
""" | |
Class representing a chatbot with document retrieval and response generation capabilities. | |
This class provides static methods for responding to user queries, handling feedback, and | |
cleaning references from retrieved documents. | |
""" | |
def respond(chatbot: List, | |
message: str, | |
data_type: str = "Preprocessed doc", | |
temperature: float = 0.1, | |
top_k: int = 10, | |
top_p: float = 0.1) -> Tuple: | |
""" | |
Generate a response to a user query using document retrieval and language model completion. | |
Parameters: | |
chatbot (List): List representing the chatbot's conversation history. | |
message (str): The user's query. | |
data_type (str): Type of data used for document retrieval ("Preprocessed doc" or "Upload doc: Process for RAG"). | |
temperature (float): Temperature parameter for language model completion. | |
Returns: | |
Tuple: A tuple containing an empty string, the updated chat history, and references from retrieved documents. | |
""" | |
# Retrieve embedding function from code env resources | |
# emb_model = "sentence-transformers/all-MiniLM-L6-v2" | |
embedding_function = HuggingFaceEmbeddings( | |
model_name="NeuML/pubmedbert-base-embeddings", | |
# cache_folder=os.getenv('SENTENCE_TRANSFORMERS_HOME') | |
) | |
if data_type == "Preprocessed doc": | |
# directories | |
if os.path.exists(APPCFG.persist_directory): | |
vectordb = Chroma(persist_directory=APPCFG.persist_directory, | |
embedding_function=embedding_function) | |
else: | |
chatbot.append( | |
(message, f"VectorDB does not exist. Please first execute the 'upload_data_manually.py' module. For further information please visit {hyperlink}.")) | |
return "", chatbot, None | |
elif data_type == "Upload doc: Process for RAG": | |
if os.path.exists(APPCFG.custom_persist_directory): | |
vectordb = Chroma(persist_directory=APPCFG.custom_persist_directory, | |
embedding_function=embedding_function) | |
else: | |
chatbot.append( | |
(message, f"No file was uploaded. Please first upload your files using the 'upload' button.")) | |
return "", chatbot, None | |
docs = vectordb.similarity_search(message, k=APPCFG.k) | |
question = "# Prompt that you have to answer:\n" + message | |
retrieved_content, markdown_documents = ChatBot.clean_references(docs) | |
# Memory: previous two Q&A pairs | |
chat_history = f"Chat history:\n {str(chatbot[-APPCFG.number_of_q_a_pairs:])}\n\n" | |
if APPCFG.add_history: | |
prompt_wrapper = f"{APPCFG.llm_system_role_with_history}\n\n{chat_history}\n\n{retrieved_content}{question}" | |
else: | |
prompt_wrapper = f"{APPCFG.llm_system_role_without_history}\n\n{question}\n\n{retrieved_content}" | |
print("========================") | |
print(prompt_wrapper) | |
print("========================") | |
messages = [ | |
{"role": "user", "content": prompt_wrapper}, | |
] | |
data = { | |
"prompt": messages, | |
"max_new_tokens": APPCFG.max_new_tokens, | |
"do_sample": APPCFG.do_sample, | |
"temperature": temperature, | |
"top_k": top_k, | |
"top_p": top_p | |
} | |
response = requests.post(FLASK_APP_ENDPOINT, json=data) | |
# print(response.text) | |
response_json = response.json() | |
chatbot.append( | |
(message, response_json["response"])) | |
# Clean up GPU memory | |
del vectordb | |
del docs | |
torch.cuda.empty_cache() | |
return "", chatbot, markdown_documents | |
def clean_references(documents: List) -> str: | |
""" | |
Clean and format references from retrieved documents. | |
Parameters: | |
documents (List): List of retrieved documents. | |
Returns: | |
str: A string containing cleaned and formatted references. | |
""" | |
server_url = "http://localhost:8000" | |
documents = [str(x)+"\n\n" for x in documents] | |
markdown_documents = "" | |
retrieved_content = "" | |
counter = 1 | |
for doc in documents: | |
# Extract content and metadata | |
content, metadata = re.match( | |
r"page_content=(.*?)( metadata=\{.*\})", doc).groups() | |
metadata = metadata.split('=', 1)[1] | |
metadata_dict = ast.literal_eval(metadata) | |
# Decode newlines and other escape sequences | |
content = bytes(content, "utf-8").decode("unicode_escape") | |
# Replace escaped newlines with actual newlines | |
content = re.sub(r'\\n', '\n', content) | |
content = re.sub(r'\s*<EOS>\s*<pad>\s*', ' ', content) | |
content = re.sub(r'\s+', ' ', content).strip() | |
# Decode HTML entities | |
content = html.unescape(content) | |
# Replace incorrect unicode characters with correct ones | |
#content = content.encode('utf-8').decode('utf-8', 'ignore') | |
# Use UTF-8 encoding instead of latin-1 to avoid encoding issues | |
content = content.encode('utf-8', 'ignore').decode('utf-8', 'ignore') | |
# Remove or replace special characters and mathematical symbols | |
# This step may need to be customized based on the specific symbols in your documents | |
content = re.sub(r'â', '-', content) | |
content = re.sub(r'â', '∈', content) | |
content = re.sub(r'Ã', '×', content) | |
content = re.sub(r'ï¬', 'fi', content) | |
content = re.sub(r'â', '∈', content) | |
content = re.sub(r'·', '·', content) | |
content = re.sub(r'ï¬', 'fl', content) | |
pdf_url = f"{server_url}/{os.path.basename(metadata_dict['source'])}" | |
retrieved_content += f"# Content {counter}:\n" + \ | |
content + "\n\n" | |
# Append cleaned content to the markdown string with two newlines between documents | |
markdown_documents += f"# Retrieved content {counter}:\n" + content + "\n\n" + \ | |
f"Source: {os.path.basename(metadata_dict['source'])}" + " | " +\ | |
f"Page number: {str(metadata_dict['page'])}" + " | " +\ | |
f"[View PDF]({pdf_url})" "\n\n" | |
counter += 1 | |
return retrieved_content, markdown_documents | |