###########################################################################################
# Title: Gradio Interface to LLM-chatbot with dynamic RAG-funcionality and ChromaDB
# Author: Andreas Fischer
# Date: October 10th, 2024
# Last update: October 10th, 2024
##########################################################################################
import os
import chromadb
from datetime import datetime
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.utils import embedding_functions
from transformers import AutoTokenizer, AutoModel
import torch
jina = AutoModel.from_pretrained('jinaai/jina-embeddings-v2-base-de', trust_remote_code=True, torch_dtype=torch.bfloat16)
#jira.save_pretrained("jinaai_jina-embeddings-v2-base-de")
device='cuda' if torch.cuda.is_available() else 'cpu'
#device='cpu' #'cuda' if torch.cuda.is_available() else 'cpu'
jina.to(device) #cuda:0
print(device)
class JinaEmbeddingFunction(EmbeddingFunction):
def __call__(self, input: Documents) -> Embeddings:
embeddings = jina.encode(input) #max_length=2048
return(embeddings.tolist())
dbPath = "/home/af/Schreibtisch/Code/gradio/Chroma/db"
onPrem = True if(os.path.exists(dbPath)) else False
if(onPrem==False): dbPath="/home/user/app/db"
#onPrem=True # uncomment to override automatic detection
print(dbPath)
path=dbPath
client = chromadb.PersistentClient(path=path)
print(client.heartbeat())
print(client.get_version())
print(client.list_collections())
jina_ef=JinaEmbeddingFunction()
embeddingModel=jina_ef
from huggingface_hub import InferenceClient
import gradio as gr
import json
inferenceClient = InferenceClient(
"mistralai/Mixtral-8x7B-Instruct-v0.1"
#"mistralai/Mistral-7B-Instruct-v0.1"
)
def format_prompt(message, history):
prompt = ""
#for user_prompt, bot_response in history:
# prompt += f"[INST] {user_prompt} [/INST]"
# prompt += f" {bot_response} "
prompt += f"[INST] {message} [/INST]"
return prompt
from pypdf import PdfReader
import ocrmypdf
def convertPDF(pdf_file, allow_ocr=False):
reader = PdfReader(pdf_file)
full_text = ""
page_list = []
def extract_text_from_pdf(reader):
full_text = ""
page_list = []
page_count = 1
for idx, page in enumerate(reader.pages):
text = page.extract_text()
if len(text) > 0:
page_list.append(text)
#full_text += f"---- Page {idx} ----\n" + text + "\n\n"
page_count += 1
return full_text.strip(), page_count, page_list
# Check if there are any images
image_count = sum(len(page.images) for page in reader.pages)
# If there are images and not much content, perform OCR on the document
if allow_ocr:
print(f"{image_count} Images")
if image_count > 0 and len(full_text) < 1000:
out_pdf_file = pdf_file.replace(".pdf", "_ocr.pdf")
ocrmypdf.ocr(pdf_file, out_pdf_file, force_ocr=True)
reader = PdfReader(out_pdf_file)
# Extract text:
full_text, page_count, page_list = extract_text_from_pdf(reader)
l = len(page_list)
print(f"{l} Pages")
# Extract metadata
metadata = {
"author": reader.metadata.author,
"creator": reader.metadata.creator,
"producer": reader.metadata.producer,
"subject": reader.metadata.subject,
"title": reader.metadata.title,
"image_count": image_count,
"page_count": page_count,
"char_count": len(full_text),
}
return page_list, full_text, metadata
def split_with_overlap(text,chunk_size=3500, overlap=700):
chunks=[]
step=max(1,chunk_size-overlap)
for i in range(0,len(text),step):
end=min(i+chunk_size,len(text))
#chunk = text[i:i+chunk_size]
chunks.append(text[i:end])
return chunks
def add_doc(path):
print("def add_doc!")
print(path)
if(str.lower(path).endswith(".pdf")):
doc=convertPDF(path)
doc="\n\n".join(doc[0])
gr.Info("PDF uploaded, start Indexing!")
else:
gr.Info("Error: Only pdfs are accepted!")
client = chromadb.PersistentClient(path="output/general_knowledge")
print(str(client.list_collections()))
#global collection
dbName="test"
if("name="+dbName in str(client.list_collections())):
client.delete_collection(name=dbName)
collection = client.create_collection(
dbName,
embedding_function=embeddingModel,
metadata={"hnsw:space": "cosine"})
corpus=split_with_overlap(doc,3500,700)
print(len(corpus))
then = datetime.now()
x=collection.get(include=[])["ids"]
print(len(x))
if(len(x)==0):
chunkSize=40000
for i in range(round(len(corpus)/chunkSize+0.5)): #0 is first batch, 3 is last (incomplete) batch given 133497 texts
print("embed batch "+str(i)+" of "+str(round(len(corpus)/chunkSize+0.5)))
ids=list(range(i*chunkSize,(i*chunkSize+chunkSize)))
batch=corpus[i*chunkSize:(i*chunkSize+chunkSize)]
textIDs=[str(id) for id in ids[0:len(batch)]]
ids=[str(id+len(x)+1) for id in ids[0:len(batch)]] # id refers to chromadb-unique ID
collection.add(documents=batch, ids=ids,
metadatas=[{"date": str("2024-10-10")} for b in batch]) #"textID":textIDs, "id":ids,
print("finished batch "+str(i)+" of "+str(round(len(corpus)/40000+0.5)))
now = datetime.now()
gr.Info(f"Indexing complete!")
print(now-then) #zu viel GB für sentences (GPU), bzw. 0:00:10.375087 für chunks
return(collection)
#split_with_overlap("test me if you can",2,1)
import gradio as gr
import re
def multimodalResponse(message,history,dropdown):
print("def multimodal response!")
length=str(len(history))
query=message["text"]
if(len(message["files"])>0): # is there at least one file attached?
collection=add_doc(message["files"][0])
else:
collection=add_doc(message["text"])
client = chromadb.PersistentClient(path="output/general_knowledge")
print(str(client.list_collections()))
x=collection.get(include=[])["ids"]
context=collection.query(query_texts=[query], n_results=1)
print(str(context))
#context=["\n"+c+"\n" for i, c in enumerate(retrievedTexts)]
#context="\n\n".join(context)
#return context
generate_kwargs = dict(
temperature=float(0.9),
max_new_tokens=5000,
top_p=0.95,
repetition_penalty=1.0,
do_sample=True,
seed=42,
)
system="Mit Blick auf das folgende Gespräch und den relevanten Kontext, antworte auf die aktuelle Frage des Nutzers."+\
"Antworte ausschließlich auf Basis der Informationen im Kontext.\n\nKontext:"+\
str(context)
#"Given the following conversation, relevant context, and a follow up question, "+\
#"reply with an answer to the current question the user is asking. "+\
#"Return only your response to the question given the above information "+\
#"following the users instructions as needed.\n\nContext:"+\
print(system)
formatted_prompt = format_prompt(system+"\n"+query, history)
stream = inferenceClient.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
yield output
#output=output+"\n\n
Sources
"+ "".join(["- " + s + "
" for s in combination])+"
"
yield output
i=gr.ChatInterface(multimodalResponse,
title="Frag dein PDF",
multimodal=True,
additional_inputs=[
gr.Dropdown(
info="select retrieval version",
choices=["1","2","3"],
value=["1"],
label="Retrieval Version")])
i.launch() #allowed_paths=["."])