|
import requests |
|
import os, sys, json |
|
import gradio as gr |
|
import time |
|
import re |
|
import io |
|
|
|
|
|
import tempfile |
|
import asyncio |
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
from PyPDF2 import PdfReader, PdfWriter |
|
|
|
|
|
|
|
from langchain_community.document_loaders import PyPDFLoader, UnstructuredWordDocumentLoader, DirectoryLoader |
|
from langchain_community.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader |
|
|
|
|
|
|
|
from langchain_huggingface import HuggingFaceEndpoint |
|
|
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
|
|
|
|
|
|
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain_community.vectorstores import Chroma |
|
from chromadb.errors import InvalidDimensionException |
|
from langchain_community.llms.huggingface_pipeline import HuggingFacePipeline |
|
from transformers import pipeline |
|
from huggingface_hub import InferenceApi |
|
from utils import * |
|
from beschreibungen import * |
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Module configuration
# ---------------------------------------------------------------------------

# Secret word a user has to enter; validate_input() compares hashes of the
# user input and of this value.
ANTI_BOT_PW = os.getenv("VALIDATE_PW")

# Hugging Face access token, taken from the HF_READ environment variable.
HUGGINGFACEHUB_API_TOKEN = os.getenv("HF_READ")
if HUGGINGFACEHUB_API_TOKEN is not None:
    # os.environ only accepts strings: assigning None would raise a TypeError
    # while the module is being imported, so export the token only if present.
    os.environ["HUGGINGFACEHUB_API_TOKEN"] = HUGGINGFACEHUB_API_TOKEN
else:
    print("Warnung: Umgebungsvariable HF_READ ist nicht gesetzt.")
HEADERS = {"Authorization": f"Bearer {HUGGINGFACEHUB_API_TOKEN}"}

# Same token under a second name (kept for code that refers to `hf_token`).
hf_token = HUGGINGFACEHUB_API_TOKEN

# Number of documents requested from the retriever (`k` in generate_auswahl).
ANZAHL_DOCS = 5

# Working directory and document/vector-store locations.
PATH_WORK = "."
CHROMA_DIR = "/chroma/kkg"
CHROMA_PDF = './chroma/kkg/pdf'
CHROMA_WORD = './chroma/kkg/word'
CHROMA_EXCEL = './chroma/kkg/excel'
DOCS_DIR = "chroma/kkg"

# Path of a file offered for download; starts out empty.
file_path_download = ""
|
|
|
|
|
|
|
|
|
def create_vectorstore():
    """Rebuild the vector store from scratch and persist the splits.

    Loads and splits all documents via ``document_loading_splitting()`` and
    stores the three results in the module globals ``PREPROCESSED_SPLITS``,
    ``SPLIT_TO_ORIGINAL_MAPPING`` and ``ORIGINAL_SPLITS``.  If preprocessed
    splits exist, a new vector store is created from them and the splits and
    the mapping are saved with ``save_splits`` /
    ``save_split_to_original_mapping``.

    When no splits were produced, an already existing ``vektordatenbank`` is
    left untouched; if none exists yet the global is bound to ``None`` so
    callers can safely test ``vektordatenbank is None``.
    """
    global vektordatenbank, SPLIT_TO_ORIGINAL_MAPPING, ORIGINAL_SPLITS, PREPROCESSED_SPLITS

    PREPROCESSED_SPLITS, SPLIT_TO_ORIGINAL_MAPPING, ORIGINAL_SPLITS = document_loading_splitting()

    if not PREPROCESSED_SPLITS:
        # Without this the global would stay unbound on a first run with no
        # documents and every later access would raise NameError.
        globals().setdefault("vektordatenbank", None)
        return

    print("Vektordatenbank neu .....................")

    # Build the vector store from the preprocessed splits.
    vektordatenbank = document_storage_chroma(PREPROCESSED_SPLITS)

    # Persist splits and mapping so the next start can reload them.
    save_splits(PREPROCESSED_SPLITS, ORIGINAL_SPLITS)
    save_split_to_original_mapping(SPLIT_TO_ORIGINAL_MAPPING)
|
|
|
|
|
def load_vectorstore_and_mapping():
    """Restore the vector store from saved splits, or rebuild it.

    Reads the stored splits (``load_splits``) and the split-to-original
    mapping (``load_split_to_original_mapping``).  If any of the three values
    is ``None`` the store is rebuilt with ``create_vectorstore()``; otherwise
    the vector store is created from the stored preprocessed splits and the
    module globals are set from the loaded data.
    """
    global vektordatenbank, SPLIT_TO_ORIGINAL_MAPPING, ORIGINAL_SPLITS, PREPROCESSED_SPLITS

    stored_preprocessed, stored_originals = load_splits()
    stored_mapping = load_split_to_original_mapping()

    loaded_parts = (stored_preprocessed, stored_originals, stored_mapping)
    if any(part is None for part in loaded_parts):
        # Something is missing on disk: build everything from the documents.
        create_vectorstore()
        return

    vektordatenbank = document_storage_chroma(stored_preprocessed)
    SPLIT_TO_ORIGINAL_MAPPING = stored_mapping
    ORIGINAL_SPLITS = stored_originals
    PREPROCESSED_SPLITS = stored_preprocessed
|
|
|
|
|
|
|
|
|
|
|
|
|
# Executed once at import time: restore the vector store from the saved
# splits, or rebuild it if nothing usable was stored.
print("Vektorstore laden.........................")

load_vectorstore_and_mapping()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clear_all(history, uploaded_file_paths, chats):
    """Archive the current chat in ``chats`` and reset the session widgets.

    Args:
        history: Sequence of ``(prompt, answer)`` pairs of the current chat.
        uploaded_file_paths: File paths offered in the download component.
        chats: Dict of archived chat summaries; it is mutated in place.

    Returns:
        A 6-tuple matching the outputs wired to ``emptyBtn.click``:
        ``(None, hidden image, uploaded_file_paths, [], download file
        component, chats)``.
    """
    # Iterate the history directly.  Building a dict first (as before) kept
    # only the last answer whenever the same prompt was asked more than once.
    summary = "\n\n".join(f'{frage}: \n {antwort}' for frage, antwort in history)

    # Key scheme kept unchanged: the first archived chat is stored under 0,
    # every later one under len(chats) + 1.
    if chats != {}:
        chat_id = len(chats) + 1
    else:
        chat_id = 0
    chats[chat_id] = summary

    return (
        None,
        gr.Image(visible=False),
        uploaded_file_paths,
        [],
        gr.File(uploaded_file_paths, label="Download-Chatverläufe", visible=True, file_count="multiple", interactive=False),
        chats,
    )
|
|
|
|
|
|
|
def clear_all3(history):
    """Return reset values: no attachment, a hidden image and an empty list.

    ``history`` is accepted but not used.
    """
    hidden_image = gr.Image(visible=False)
    return (None, hidden_image, [])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_text(chatbot, history, prompt, file, file_history):
    """Append the user's prompt (and attachment, if any) to the chat display.

    Args:
        chatbot: Current chatbot message list; a new list is returned.
        history: Chat history, passed through unchanged.
        prompt: Text the user entered.
        file: Attached file object or ``None``.
        file_history: Previously attached file; replaced by ``file`` when a
            file is attached.

    Returns:
        ``(chatbot, history, prompt, file, file_history, hidden image, "")``;
        the trailing empty string clears the input textbox.
    """
    if file is None:  # identity check instead of `== None`
        chatbot = chatbot + [(prompt, None)]
    else:
        file_history = file
        if prompt == "":
            # A file without a prompt cannot be processed: show a hint.
            chatbot = chatbot + [((file.name,), "Prompt fehlt!")]
        else:
            chatbot = chatbot + [("Hochgeladenes Dokument: " + get_filename(file) + "\n" + prompt, None)]

    return chatbot, history, prompt, file, file_history, gr.Image(visible=False), ""
|
|
|
|
|
|
|
|
|
|
|
def file_anzeigen(file):
    """Return a small preview for an uploaded file.

    For PNG/JPG/JPEG files the file itself is used as the preview source,
    for everything else the placeholder ``data/file.png``.

    Returns:
        ``(image component update, preview source, file)``.
    """
    extension = analyze_file(file)
    is_image = extension in ("png", "PNG", "jpg", "jpeg", "JPG", "JPEG")

    thumbnail = gr.Image(
        width=47,
        visible=True,
        interactive=False,
        height=47,
        min_width=47,
        show_label=False,
        show_share_button=False,
        show_download_button=False,
        scale=0.5,
    )
    preview_source = file if is_image else "data/file.png"
    return thumbnail, preview_source, file
|
|
|
def file_loeschen():
    """Drop the attached file and hide its preview image."""
    hidden_preview = gr.Image(visible=False)
    return None, hidden_preview
|
|
|
|
|
|
|
|
|
def cancel_outputing():
    """Return the status text shown after the user pressed "Stop".

    The running prediction events are cancelled by the ``cancels=`` argument
    of the click handler this function is wired to.  The previous body also
    called ``reset_textbox()`` but discarded its return value, which had no
    effect, so that call was removed.
    """
    return "Stop Done"
|
|
|
def reset_textbox():
    """Return an update that empties the input textbox plus an empty status."""
    cleared_input = gr.update(value="")
    return cleared_input, ""
|
|
|
|
|
|
|
|
|
|
|
def generate_text(prompt, chatbot, history, retriever, top_p=0.6, temperature=0.2, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=35):
    """Run the retrieval chain for ``prompt``.

    Only ``prompt`` and ``retriever`` are used; the remaining parameters are
    accepted for interface compatibility but currently have no effect.  (The
    disabled LLM-based variants that used them were stored in a string
    literal inside this function and have been removed as dead code.)

    Args:
        prompt: The (preprocessed) user question; must not be empty.
        retriever: Retriever passed on to ``rag_chain_simpel``.

    Returns:
        ``(result, False)`` where ``result`` is whatever
        ``rag_chain_simpel(prompt, retriever)`` returns.

    Raises:
        gr.Error: If the prompt is empty or the retrieval chain fails.
    """
    if prompt == "":
        raise gr.Error("Prompt ist erforderlich.")

    print("HF Anfrage.......................")
    try:
        result = rag_chain_simpel(prompt, retriever)
    except Exception as e:
        # Hand Gradio a plain message and keep the original traceback chained.
        raise gr.Error(str(e)) from e

    return result, False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_doc_html(doc):
    """Render one document info dict (link, title, page, excerpt) as HTML."""
    # NOTE(review): the values are inserted without HTML escaping - confirm
    # that extract_document_info() returns display-safe text.
    return (
        '<div><b>Dokument/Link: </b> <span style="color: #BB70FC;"><a href="' + str(doc['download_link']) + '" target="_blank">' + str(doc['titel']) + '</a></span>'
        # Tags are now closed in the order they were opened (previously
        # `<b>Seite:</span> <span ...>N</b></span>`).
        ' (<b>Seite:</b> <span style="color: red;">' + str(doc['seite']) + '</span>)<br>'
        '<span><b>Auschnitt:</b> ' + str(doc["content"]) + '</span></div><br>'
    )


def generate_auswahl(prompt_in, file, file_history, chatbot, history, anzahl_docs=4, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3,top_k=5, validate=False):
    """Answer a user question from the vector store and update the chat.

    The question is preprocessed, the retriever is queried through
    ``generate_text`` and the answer plus one HTML snippet per relevant
    original document is written into the last chatbot entry.

    Args:
        prompt_in: Question as entered by the user.
        file: Unused.
        file_history: Passed through unchanged.
        chatbot: Chat display list; its last entry receives the answer.
        history: List of ``[prompt, answer]`` pairs; a new list is returned.
        anzahl_docs, top_p, temperature, max_new_tokens,
        max_context_length_tokens, repetition_penalty, top_k: Accepted for
            the UI wiring but not used - fixed values are passed to
            ``generate_text`` and ``ANZAHL_DOCS`` determines the retriever's
            ``k``.
        validate: Whether the user passed validation.

    Returns:
        ``(chatbot, history, None, file_history, status_text)``.
    """
    global vektordatenbank, SPLIT_TO_ORIGINAL_MAPPING

    # Guard clause: refuse to work without validation or without a prompt.
    if not validate or prompt_in is None or prompt_in == "":
        return chatbot, history, None, file_history, "Erst validieren oder einen Prompt eingeben!"

    prompt = preprocess_text(prompt_in)

    # globals().get() avoids a NameError if the store was never bound.
    if globals().get("vektordatenbank") is None:
        print("db neu aufbauen!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1")
        create_vectorstore()

    datenbank = globals().get("vektordatenbank")
    if not datenbank:
        chatbot[-1][1] = "keine Dokumente gefunden!"
        return chatbot, history, None, file_history, ""

    retriever = datenbank.as_retriever(search_kwargs={"k": ANZAHL_DOCS})
    results, _ = generate_text(prompt, chatbot, history, retriever, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=3)

    # Map every retrieved split back to its original (unprocessed) split.
    relevant_docs_org = []
    for result in (results['relevant_docs'] or []):
        split_id = result.get("metadata", {}).get("split_id")
        if not split_id:
            continue
        try:
            relevant_docs_org.append(SPLIT_TO_ORIGINAL_MAPPING[split_id])
        except Exception as e:
            # Best effort: a split without mapping is skipped, not fatal.
            print(f"Fehler beim Laden des Mappings...................: {str(e)}")

    relevant_docs = extract_document_info(relevant_docs_org)

    summary = str(results['answer']) + "\n\n"
    summary += " ".join(_format_doc_html(doc) for doc in relevant_docs)

    history = history + [[prompt_in, summary]]
    chatbot[-1][1] = summary
    return chatbot, history, None, file_history, ""
|
|
|
|
|
|
|
|
|
def upload_pdf(files):
    """Upload the given files to the document store on Hugging Face.

    ``.pdf`` files go to ``chroma/kkg/pdf``, ``.docx`` files to
    ``chroma/kkg/word`` and everything else to ``chroma/kkg``.  A local file
    that already exists at the target path is removed before the upload.

    Args:
        files: Iterable of uploaded file objects (each with ``.name``), or an
            empty/None value.

    Returns:
        ``(status textbox, display_files() result, status message)``.  The
        message now reflects the real outcome; before, the "completed" text
        always overwrote the failure message.
    """
    # Local import: `logging` is not among the visible module-level imports.
    import logging

    if not files:
        status_message = " Keine Dateien zum Hochladen! "
    else:
        upload_failed = False
        for file in files:
            try:
                filename = os.path.basename(file.name)
                file_extension = os.path.splitext(filename)[1]

                # Pick the target folder from the file extension.
                if file_extension == ".pdf":
                    upload_path = f"chroma/kkg/pdf/{filename}"
                elif file_extension == ".docx":
                    upload_path = f"chroma/kkg/word/{filename}"
                else:
                    upload_path = f"chroma/kkg/{filename}"

                # Replace an existing local copy.
                if os.path.exists(upload_path):
                    os.remove(upload_path)

                upload_file_to_huggingface(file.name, upload_path)
            except Exception as e:
                # Keep going with the remaining files, but remember the failure.
                logging.error(f"Error uploading file {file.name}: {e}")
                upload_failed = True

        if upload_failed:
            status_message = "Nicht alle Dateien konnten hochgeladen werden... "
        else:
            status_message = "Hochladen der Dateien abgeschlossen! "

    return gr.Textbox(label="Status", visible=True), display_files(), status_message
|
|
|
|
|
def update_vectorstore(status):
    """Rebuild the vector store and append the outcome to ``status``.

    Args:
        status: Status text collected so far (a string).

    Returns:
        The resulting message twice, once per wired output.
    """
    try:
        create_vectorstore()
    except Exception as e:
        # Report the cause instead of swallowing it silently.
        print(f"Fehler beim Erneuern des Vektorstores: {e}")
        message = status + "Fehler beim Erneuern des Vektorstores!"
    else:
        message = status + "Vektorstore wurde erneuert!"
    return message, message
|
|
|
|
|
def reset_file_input():
    """Return an update that clears the file upload component."""
    cleared_files = gr.update(value=None)
    return cleared_files
|
|
|
|
|
def show_success():
    """Create and return the info notification for a successful update."""
    notice = gr.Info("System erfolgreich aktualisiert!")
    return notice
|
|
|
def hide_status():
    """Return an empty, invisible status HTML component."""
    hidden_status = gr.HTML(value="", label="Status", visible=False)
    return hidden_status
|
|
|
def show_status():
    """Return an empty, visible status HTML component."""
    visible_status = gr.HTML(value="", label="Status", visible=True)
    return visible_status
|
|
|
def show_text_status(status):
    """Wrap ``status`` in a styled HTML block and clear the status state.

    Returns:
        ``(visible HTML component showing the status, "")``.
    """
    ausgabe = f"<div style='color: rgb(82, 255, 51) !important; text-align: center; font-size: 20px;'><b>{status}</b></div>"
    status_html = gr.HTML(value=ausgabe, label="Status", visible=True)
    return status_html, ""
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_input(user_input_validate, validate=False):
    """Check the entered word against the configured anti-bot password.

    Both values are hashed with ``hash_input`` and the hashes are compared.
    The incoming ``validate`` value is not used.

    Returns:
        ``(status text, validated flag, textbox update, button update)``:
        on success the validation widgets are hidden, otherwise they stay
        visible for another attempt.
    """
    eingabe_hash = hash_input(user_input_validate)
    erwartet_hash = hash_input(ANTI_BOT_PW)

    if eingabe_hash != erwartet_hash:
        retry_box = gr.Textbox(label= "Bitte das oben im Moodle Kurs angegebene Wort eingeben, um die Anwendung zu starten", visible=True, interactive=True, scale= 7)
        retry_button = gr.Button("Validieren", visible = True)
        return "Falsche Antwort!!!!!!!!!", False, retry_box, retry_button

    return "Richtig! Weiter gehts... ", True, gr.Textbox(visible=False), gr.Button(visible=False)
|
|
|
|
|
|
|
|
|
""" |
|
def custom_css(): |
|
return |
|
#status_system_update { |
|
color: red !important; |
|
text-align: center; |
|
font-size: 20px; |
|
} |
|
""" |
|
|
|
|
|
|
|
|
|
|
|
print("Start GUI Hauptanwendung")

# Load the custom stylesheet that is handed to gr.Blocks below.
with open("custom.css", mode="r", encoding="utf-8") as css_file:
    customCSS = css_file.read()

# Generation parameter sliders.
additional_inputs = [
    gr.Slider(
        label="Temperature",
        value=0.65,
        minimum=0.0,
        maximum=1.0,
        step=0.05,
        interactive=True,
        info="Höhere Werte erzeugen diversere Antworten",
        visible=True,
    ),
    gr.Slider(
        label="Max new tokens",
        value=1024,
        minimum=0,
        maximum=4096,
        step=64,
        interactive=True,
        info="Maximale Anzahl neuer Tokens",
        visible=True,
    ),
    gr.Slider(
        label="Top-p (nucleus sampling)",
        value=0.6,
        minimum=0.0,
        maximum=1,
        step=0.05,
        interactive=True,
        info="Höhere Werte verwenden auch Tokens mit niedrigerer Wahrscheinlichkeit.",
        visible=True,
    ),
    gr.Slider(
        label="Repetition penalty",
        value=1.2,
        minimum=1.0,
        maximum=2.0,
        step=0.05,
        interactive=True,
        info="Strafe für wiederholte Tokens",
        visible=True,
    ),
]
|
# Main Gradio UI: validation row, search tab, upload tab and event wiring.
# NOTE(review): the source had lost its indentation; the nesting of rows,
# columns and tabs below is reconstructed - confirm against the live layout.
with gr.Blocks(css=customCSS, theme=themeAlex) as demo:
    # ------------------------------------------------------------------
    # Session state
    # ------------------------------------------------------------------
    # NOTE(review): starts as True, so generate_auswahl accepts prompts
    # before the validation word was entered - confirm this is intended.
    validate = gr.State(True)

    # History is kept in parallel to the chatbot display.
    history = gr.State([])
    uploaded_file_paths = gr.State([])
    history3 = gr.State([])
    uploaded_file_paths3 = gr.State([])

    # Archived chat summaries (filled by clear_all).
    chats = gr.State({})

    # Holds the question after the input textbox has been cleared.
    user_question = gr.State("")
    user_question2 = gr.State("")
    user_question3 = gr.State("")
    attached_file = gr.State(None)
    attached_file_history = gr.State(None)
    attached_file3 = gr.State(None)
    attached_file_history3 = gr.State(None)

    # (A `status_display = gr.State("")` used to be created here; it was
    # immediately shadowed by the Markdown component below and was removed.)
    status_system_update = gr.State("")

    # ------------------------------------------------------------------
    # Header and validation row
    # ------------------------------------------------------------------
    gr.Markdown(description_top)
    with gr.Row():
        user_input_validate = gr.Textbox(label="Bitte das oben im Moodle Kurs angegebene Wort eingeben, um die Anwendung zu starten", visible=True, interactive=True, scale=7)
        validate_btn = gr.Button("Validieren", visible=True)

    # ------------------------------------------------------------------
    # Tab 1: search chat
    # ------------------------------------------------------------------
    with gr.Tab("KKG KI-Suche") as tab1:
        with gr.Row():
            status_display = gr.Markdown("Antwort der KI ...", visible=True)
        with gr.Row():
            with gr.Column(scale=5):
                with gr.Row():
                    chatbot = gr.Chatbot(elem_id="li-chat", show_copy_button=True)
                with gr.Row():
                    with gr.Column(scale=12):
                        user_input = gr.Textbox(
                            show_label=False,
                            placeholder="Gib hier deine Such-Frage ein...",
                            container=False,
                        )
                    with gr.Column(min_width=70, scale=1):
                        submitBtn = gr.Button("Senden")
                    with gr.Column(min_width=70, scale=1):
                        cancelBtn = gr.Button("Stop")
                with gr.Row():
                    image_display = gr.Image(visible=False)
                    # File extensions need a leading dot to be recognised.
                    upload = gr.UploadButton("📁", file_types=[".pdf", ".docx", ".pptx", ".xlsx"], scale=10, visible=False)
                    emptyBtn = gr.ClearButton([user_input, chatbot, history, attached_file, attached_file_history, image_display], value="🧹 Neue Session", scale=10)

            # Hidden side column: chat downloads and generation parameters.
            with gr.Column(visible=False):
                with gr.Column(min_width=50, scale=1):
                    with gr.Tab(label="KKG-Suche ..."):
                        file_download = gr.File(label="Noch keine Chatsverläufe", visible=True, interactive=False, file_count="multiple")

                    with gr.Tab(label="Parameter"):
                        model_option = gr.Radio(["HuggingFace"], label="Modellauswahl", value="HuggingFace")

                        top_p = gr.Slider(
                            minimum=0,
                            maximum=1.0,
                            value=0.95,
                            step=0.05,
                            interactive=True,
                            label="Top-p",
                            visible=False,
                        )
                        top_k = gr.Slider(
                            minimum=1,
                            maximum=100,
                            value=35,
                            step=1,
                            interactive=True,
                            label="Top-k",
                            visible=False,
                        )
                        temperature = gr.Slider(
                            minimum=0.1,
                            maximum=2.0,
                            value=0.2,
                            step=0.1,
                            interactive=True,
                            label="Temperature",
                            visible=False,
                        )
                        max_length_tokens = gr.Slider(
                            minimum=0,
                            maximum=512,
                            value=512,
                            step=8,
                            interactive=True,
                            label="Max Generation Tokens",
                            visible=False,
                        )
                        max_context_length_tokens = gr.Slider(
                            minimum=0,
                            maximum=4096,
                            value=2048,
                            step=128,
                            interactive=True,
                            label="Max History Tokens",
                            visible=False,
                        )
                        repetition_penalty = gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Strafe für wiederholte Tokens", visible=False)
                        anzahl_docs = gr.Slider(label="Anzahl Dokumente", value=3, minimum=1, maximum=10, step=1, interactive=True, info="wie viele Dokumententeile aus dem Vektorstore an den prompt gehängt werden", visible=False)
                        openai_key = gr.Textbox(label="OpenAI API Key", value="sk-", lines=1, visible=False)

    # ------------------------------------------------------------------
    # Tab 2: document upload
    # ------------------------------------------------------------------
    with gr.Tab("Datei hochladen") as tab2:
        upload_pdf_files = gr.Files(label="PDF- oder Word-Dateien in Zwischenablage", file_count="multiple")
        output_text = gr.HTML(value="", label="Status", elem_id="status")
        message = gr.Markdown(visible=False, elem_id="popup_message")
        renew_button = gr.Button("Dateien hochladen und System aktualisieren", elem_id="renew_button")
        file_list = gr.HTML(elem_id="file_list", show_label=False)

    gr.Markdown(description)

    # ------------------------------------------------------------------
    # Event wiring
    # ------------------------------------------------------------------
    # Arguments for the actual search call.
    predict_args = dict(
        fn=generate_auswahl,
        inputs=[
            user_question,
            attached_file,
            attached_file_history,
            chatbot,
            history,
            anzahl_docs,
            top_p,
            temperature,
            max_length_tokens,
            max_context_length_tokens,
            repetition_penalty,
            top_k,
            validate,
        ],
        outputs=[chatbot, history, attached_file, attached_file_history, status_display],
        show_progress=True,
    )

    reset_args = dict(
        fn=reset_textbox, inputs=[], outputs=[user_input, status_display]
    )

    # First step of a submit: move the prompt into the chat and clear the input.
    transfer_input_args = dict(
        fn=add_text,
        inputs=[chatbot, history, user_input, attached_file, attached_file_history],
        outputs=[chatbot, history, user_question, attached_file, attached_file_history, image_display, user_input],
        show_progress=True,
    )

    # Validation by button click or by pressing Enter in the textbox.
    validate_btn.click(validate_input, inputs=[user_input_validate, validate], outputs=[status_display, validate, user_input_validate, validate_btn])
    user_input_validate.submit(validate_input, inputs=[user_input_validate, validate], outputs=[status_display, validate, user_input_validate, validate_btn])

    # Search: transfer the input first, then run the prediction.
    predict_event1 = user_input.submit(**transfer_input_args, queue=False).then(**predict_args)
    predict_event2 = submitBtn.click(**transfer_input_args, queue=False).then(**predict_args)
    predict_event3 = upload.upload(file_anzeigen, [upload], [image_display, image_display, attached_file])
    emptyBtn.click(clear_all, [history, uploaded_file_paths, chats], [attached_file, image_display, uploaded_file_paths, history, file_download, chats])

    # Clicking the preview removes the attachment.
    image_display.select(file_loeschen, [], [attached_file, image_display])

    # "Stop" cancels all running prediction events.
    cancelBtn.click(cancel_outputing, [], [status_display], cancels=[predict_event1, predict_event2, predict_event3])

    # Upload chain: upload files -> rebuild store -> clear file input -> show status.
    renew_button.click(fn=upload_pdf, inputs=[upload_pdf_files], outputs=[output_text, file_list, status_system_update]).then(
        fn=update_vectorstore, inputs=status_system_update, outputs=[output_text, status_system_update]).then(
        fn=reset_file_input, inputs=None, outputs=[upload_pdf_files]).then(
        fn=show_text_status, inputs=status_system_update, outputs=[output_text, status_system_update])

    # Show the list of stored files when the page loads.
    demo.load(display_files, outputs=file_list)
|
|
|
# Browser tab title of the application.
demo.title = "KKG-Suche"

# Enable the request queue (default concurrency limit 15) and start the
# server in debug mode.
(
    demo
    .queue(default_concurrency_limit=15)
    .launch(debug=True)
)