import requests import os, sys, json import gradio as gr import time import re import io #from PIL import Image, ImageDraw, ImageOps, ImageFont #import base64 import tempfile from PyPDF2 import PdfReader, PdfWriter #from langchain.chains import LLMChain, RetrievalQA from langchain_community.document_loaders import PyPDFLoader, UnstructuredWordDocumentLoader, DirectoryLoader from langchain_community.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader #from langchain.document_loaders import GenericLoader #from langchain.schema import AIMessage, HumanMessage #from langchain_community.llms import HuggingFaceHub from langchain_huggingface import HuggingFaceEndpoint #from langchain_community.llms import HuggingFaceEndPoints from langchain_huggingface import HuggingFaceEmbeddings #from langchain_community.llms import HuggingFaceTextGenInference #from langchain_community.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings, HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings #from langchain.prompts import PromptTemplate #from langchain.chains import Runnable................................. from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import Chroma from chromadb.errors import InvalidDimensionException from langchain_community.llms.huggingface_pipeline import HuggingFacePipeline from transformers import pipeline from utils import * from beschreibungen import * #Konstanten #Validieren des PW ANTI_BOT_PW = os.getenv("VALIDATE_PW") #max Anzahl der zurückgelieferten Dokumente ANZAHL_DOCS = 5 PATH_WORK = "." CHROMA_DIR = "/chroma/kkg" CHROMA_PDF = './chroma/kkg/pdf' CHROMA_WORD = './chroma/kkg/word' CHROMA_EXCEL = './chroma/kkg/excel' #HuggingFace Model name-------------------------------- MODEL_NAME_HF = "mistralai/Mixtral-8x7B-Instruct-v0.1" #HuggingFace Reop ID-------------------------------- #repo_id = "meta-llama/Llama-2-13b-chat-hf" repo_id = "HuggingFaceH4/zephyr-7b-alpha" #das Modell ist echt gut!!! Vom MIT #repo_id = "TheBloke/Yi-34B-Chat-GGUF" #repo_id = "meta-llama/Llama-2-70b-chat-hf" #repo_id = "tiiuae/falcon-40b" #repo_id = "Vicuna-33b" #repo_id = "alexkueck/ChatBotLI2Klein" #repo_id = "mistralai/Mistral-7B-v0.1" #repo_id = "internlm/internlm-chat-7b" #repo_id = "Qwen/Qwen-7B" #repo_id = "Salesforce/xgen-7b-8k-base" #repo_id = "Writer/camel-5b-hf" #repo_id = "databricks/dolly-v2-3b" #repo_id = "google/flan-t5-xxl" #repo_id = "mistralai/Mixtral-8x7B-Instruct-v0.1" #repo_id = "abacusai/Smaug-72B-v0.1" # Hugging Face Token direkt im Code setzen hf_token = os.getenv("HF_READ") os.environ["HUGGINGFACEHUB_API_TOKEN"] = os.getenv("HF_READ") ############################################### #globale Variablen ############################################## #nur bei ersten Anfrage splitten der Dokumente - um die Vektordatenbank entsprechend zu füllen #splittet = False #DB für Vektorstore vektordatenbank = None retriever = None ############################################# # Allgemeine Konstanten #Filepath zu temp Folder (temp) mit File von ausgewähltem chatverlauf file_path_download = "" ################################################# ################################################# #Funktionen zur Verarbeitung ################################################ ############################################## #wenn löschen Button geklickt def clear_all(history, uploaded_file_paths, chats): dic_history = {schluessel: wert for schluessel, wert in history} #später wird die summary auf 50 tokens verkürzt, um die Anfrage nicht so teuer werden zu lassen #summary wird gebraucht für die Anfrage beim NN, um eine Überschrift des Eintrages zu generieren summary = "\n\n".join(f'{schluessel}: \n {wert}' for schluessel, wert in dic_history.items()) #falls file mit summay für download existiert hat: das zunächst löschen #cleanup(file_path_download) #noch nicht im Einsatz, aber hier werden alle Chats einer Sitzung gespeichert #den aktuellen Chatverlauf zum Download bereitstellen: if chats != {} : id_neu = len(chats)+1 chats[id_neu]= summary else: chats[0]= summary #Eine Überschrift zu dem jeweiligen Chatverlauf finden - abhängig vom Inhalt #file_path_download = save_and_download(summary) #headers, payload = process_chatverlauf(summary, MODEL_NAME, OAI_API_KEY) #response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) #als json ausgeben #data = response.json() # Den "content" auswählen, da dort die Antwort der Ki enthalten ist #result = data['choices'][0]['message']['content'] #worte = result.split() #if len(worte) > 2: #file_path_download = "data/" + str(len(chats)) + "_Chatverlauf.pdf" #else: #file_path_download = "data/" + str(len(chats)) + "_" + result + ".pdf" #erstellePdf(file_path_download, result, dic_history) #die session variable in gradio erweitern und alle fliepath neu in das gr.File hochladen #uploaded_file_paths= uploaded_file_paths + [file_path_download] return None, gr.Image(visible=False), uploaded_file_paths, [], gr.File(uploaded_file_paths, label="Download-Chatverläufe", visible=True, file_count="multiple", interactive = False), chats #wenn löschen Button geklickt def clear_all3(history): #die session variable in gradio erweitern und alle fliepath neu in das gr.File hochladen uploaded_file_paths= "" return None, gr.Image(visible=False), [], ############################################## #History - die Frage oder das File eintragen... #in history_file ist ein file gespeichert, falls voher im Verlauf schon ein File hochgeladen wurde. #wird ein neuer File hochgeladen, so wird history_fiel dadurch ersetzt def add_text(chatbot, history, prompt, file, file_history): if (file == None): chatbot = chatbot +[(prompt, None)] else: file_history = file if (prompt == ""): chatbot=chatbot + [((file.name,), "Prompt fehlt!")] else: chatbot = chatbot +[("Hochgeladenes Dokument: "+ get_filename(file) +"\n" + prompt, None)] return chatbot, history, prompt, file, file_history, gr.Image(visible = False), "" def add_text2(chatbot, prompt): if (prompt == ""): chatbot = chatbot + [("", "Prompt fehlt!")] else: chatbot = chatbot + [(prompt, None)] print("chatbot nach add_text............") print(chatbot) return chatbot, prompt, "" ############################################ #nach dem Upload soll das zusätzliche Fenster mit dem image drinnen angezeigt werden def file_anzeigen(file): ext = analyze_file(file) if (ext == "png" or ext == "PNG" or ext == "jpg" or ext == "jpeg" or ext == "JPG" or ext == "JPEG"): return gr.Image(width=47, visible=True, interactive = False, height=47, min_width=47, show_label=False, show_share_button=False, show_download_button=False, scale = 0.5), file, file else: return gr.Image(width=47, visible=True, interactive = False, height=47, min_width=47, show_label=False, show_share_button=False, show_download_button=False, scale = 0.5), "data/file.png", file def file_loeschen(): return None, gr.Image(visible = False) ############################################ #wenn 'Stop' Button geklickt, dann Message dazu und das Eingabe-Fenster leeren def cancel_outputing(): reset_textbox() return "Stop Done" def reset_textbox(): return gr.update(value=""),"" ##################################################################### # Antwort des Vektorstores in ein Dictionary packen def parse_vectorstore_response(response_text): # Trenne die verschiedenen Einträge anhand des Musters entries = re.split(r"(\d+\. page_content=)", response_text) # Entferne das erste leere Element, falls vorhanden if entries[0] == '': entries.pop(0) # Kombiniere die Identifier und Inhalte wieder zu vollständigen Einträgen combined_entries = [] for i in range(0, len(entries), 2): if i + 1 < len(entries): combined_entries.append(entries[i] + entries[i + 1]) parsed_entries = [] for entry in combined_entries: # Extrahiere page_content page_content_match = re.search(r"page_content='(.*?)' metadata=", entry, re.DOTALL) page_content = page_content_match.group(1) if page_content_match else '' # Extrahiere metadata metadata_match = re.search(r"metadata=\{(.*?)\}", entry) metadata_str = metadata_match.group(1) if metadata_match else '' # Konvertiere metadata in ein Dictionary metadata = {} for item in metadata_str.split(', '): if ': ' in item: key, value = item.split(': ', 1) metadata[key.strip("'")] = value.strip("'") parsed_entries.append({ "page_content": page_content, "metadata": metadata }) return parsed_entries #History Eintrag vorbereiten def create_history_entry(page_content, metadata): source = metadata.get('source', 'No source available') page = metadata.get('page', 'No page information available') download_link = f"https://example.com/download/{source.replace('pdf/', '')}" return { "page_content": page_content, "page": page, "download_link": download_link } #################################################### #aus einem Text-Prompt die Antwort von KI bekommen def generate_text (prompt, chatbot, history, vektordatenbank, retriever, top_p=0.6, temperature=0.2, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=35): print("Text pur..............................") if (prompt == ""): raise gr.Error("Prompt ist erforderlich.") try: #oder an Hugging Face -------------------------- print("HF Anfrage.......................") #model_kwargs={"temperature": 0.5, "max_length": 512, "num_return_sequences": 1, "top_k": top_k, "top_p": top_p, "repetition_penalty": repetition_penalty} #llm = HuggingFaceHub(repo_id=repo_id, model_kwargs=model_kwargs) # Erstelle eine Pipeline mit den gewünschten Parametern #pipe = pipeline("text-generation", model=MODEL_NAME_HF, config={"temperature": 0.5, "max_length": 512, "num_return_sequences": 1, "top_k": top_k, "top_p": top_p, "repetition_penalty": repetition_penalty}) # Erstelle eine HuggingFaceEndPoints-Instanz mit den entsprechenden Endpunkt-Parametern llm = HuggingFaceEndpoint( endpoint_url=f"https://api-inference.huggingface.co/models/{MODEL_NAME_HF}", api_key=hf_token, temperature=0.5, max_length=512, top_k=top_k, top_p=top_p, repetition_penalty=repetition_penalty ) #Prompt an history anhängen und einen Text daraus machen history_text_und_prompt = generate_prompt_with_history(prompt, history) #zusätzliche Dokumenten Splits aus DB zum Prompt hinzufügen (aus VektorDB - Chroma oder Mongo DB) print("LLM aufrufen mit RAG: ...........") #result = rag_chain(history_text_und_prompt, vektordatenbank, ANZAHL_DOCS) result = rag_chain(llm, history_text_und_prompt, retriever) print("result regchain.....................") print(result) print("Ende result............................") except Exception as e: raise gr.Error(e) return result, False ############################################################## #Eingaben der GUI verarbeiten def generate_auswahl(prompt_in, file, file_history, chatbot, history, anzahl_docs=4, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3,top_k=5, validate=False): global vektordatenbank, retriever #nur wenn man sich validiert hat, kann die Anwendung los legen if (validate and not prompt_in == "" and not prompt_in == None): # Vektorstore initialisieren #falls schon ein File hochgeladen wurde, ist es in history_file gespeichert - falls ein neues File hochgeladen wurde, wird es anschließend neu gesetzt neu_file = file_history #prompt normalisieren bevor er an die KIs geht prompt = preprocess_text(prompt_in) #muss nur einmal ausgeführt werden... #?????????????????????????????????????????????? Nicht passend zum Promt??????????????????????????? if vektordatenbank == None: print("db neu aufbauen!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1") splits = document_loading_splitting() if splits: vektordatenbank, retriever = document_storage_chroma(splits) print("db done............................") #kein Bild hochgeladen -> auf Text antworten... status = "Antwort der KI ..." if (file == None and file_history == None): results, status = generate_text(prompt, chatbot, history,vektordatenbank, retriever, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=3) else: #Es wurde ein File neu angehängt -> das hochladen und dann Prompt bearbeiten #das history_fiel muss neu gesetzt werden if (file != None): # file_history wird neu gesetzt in der Rückgabe dieser Funktion... neu_file = file #File hochladen in Chroma und dann Antwort generieren results = generate_text_zu_doc(neu_file, prompt, k, rag_option, chatbot, history, vektordatenbank) results_dictionary = parse_vectorstore_response(results) for text in results_dictionary: print("text..............."+str(text)) entry = create_history_entry(text['page_content'], text['metadata']) print("hier...........................") history = history + [[prompt_in, entry]] return chatbot, history, None, file_history, "" else: #noch nicht validiert, oder kein Prompt return chatbot, history, None, file_history, "Erst validieren oder einen Prompt eingeben!" ######################################## # Bot- test gegen schädliche Bots die die Anwendung testen... # Funktion zur Überprüfung der Benutzereingabe # Funktion zur Überprüfung der Eingabe und Aktivierung der Hauptanwendung def validate_input(user_input_validate, validate=False): user_input_hashed = hash_input(user_input_validate) if user_input_hashed == hash_input(ANTI_BOT_PW): return "Richtig! Weiter gehts... ", True, gr.Textbox(visible=False), gr.Button(visible=False) else: return "Falsche Antwort!!!!!!!!!", False, gr.Textbox(label = "", placeholder="Bitte tippen Sie das oben im Moodle Kurs angegebene Wort ein, um zu beweisen, dass Sie kein Bot sind.", visible=True, scale= 5), gr.Button("Validieren", visible = True) def custom_css(): return """ body, html { background-color: #303030; /* Dunkler Hintergrund */ color:#353535; } """ ############################################################################################# # Start Gui Vorabfrage # Validierungs-Interface - Bots weghalten... print ("Start GUI Vorabfrage") ################################################################################################# print ("Start GUI Hauptanwendung") with open("custom.css", "r", encoding="utf-8") as f: customCSS = f.read() #Add Inputs für Tab 2 additional_inputs = [ gr.Slider(label="Temperature", value=0.65, minimum=0.0, maximum=1.0, step=0.05, interactive=True, info="Höhere Werte erzeugen diversere Antworten", visible=True), gr.Slider(label="Max new tokens", value=1024, minimum=0, maximum=4096, step=64, interactive=True, info="Maximale Anzahl neuer Tokens", visible=True), gr.Slider(label="Top-p (nucleus sampling)", value=0.6, minimum=0.0, maximum=1, step=0.05, interactive=True, info="Höhere Werte verwenden auch Tokens mit niedrigerer Wahrscheinlichkeit.", visible=True), gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Strafe für wiederholte Tokens", visible=True) ] with gr.Blocks(css=customCSS, theme=themeAlex) as demo: #validiert speichern validate = gr.State(False) #Session Variablen, um Weete zu speichern, auch wenn die Felder in der GUI bereits wieder leer sind # history parallel zu chatbot speichern - da in chatbot bei Bildern zum Anzeigen in der GUI die Bilder speziell formatiert werden, # für die Übergabe an die ki aber der Pfad zum Bild behalten werden muss - was in der history der Fall ist! history = gr.State([]) uploaded_file_paths= gr.State([]) history3 = gr.State([]) uploaded_file_paths3= gr.State([]) #alle chats einer Session sammeln chats = gr.State({}) #damit der Prompt auch nach dem upload in die History noch für predicts_args verfügbar ist user_question = gr.State("") #für die anderen Tabs auch... #damit der Prompt auch nach dem upload in die History noch für predicts_args verfügbar ist user_question2 = gr.State("") user_question3 = gr.State("") attached_file = gr.State(None) attached_file_history = gr.State(None) attached_file3 = gr.State(None) attached_file_history3 = gr.State(None) status_display = gr.State("") status_display2 = gr.State("") status_display3 = gr.State("") ################################################ # Tab zum Chatbot mit Text oder Bildeingabe ################################################ gr.Markdown(description_top) with gr.Row(): user_input_validate =gr.Textbox(label= "Bitte das oben im Moodle Kurs angegebene Wort eingeben, um die Anwendung zu starten", visible=True, interactive=True, scale= 7) validate_btn = gr.Button("Validieren", visible = True) #validation_result = gr.Text(label="Validierungsergebnis") with gr.Tab("KKG Chatbot"): with gr.Row(): #gr.HTML("LI Chatot") status_display = gr.Markdown("Antwort der KI ...", visible = True) #, elem_id="status_display") with gr.Row(): with gr.Column(scale=5): with gr.Row(): chatbot = gr.Chatbot(elem_id="li-chat",show_copy_button=True) with gr.Row(): with gr.Column(scale=12): user_input = gr.Textbox( show_label=False, placeholder="Gib hier deinen Prompt ein...", container=False ) with gr.Column(min_width=70, scale=1): submitBtn = gr.Button("Senden") with gr.Column(min_width=70, scale=1): cancelBtn = gr.Button("Stop") with gr.Row(): image_display = gr.Image( visible=False) upload = gr.UploadButton("📁", file_types=["image", "pdf", "docx", "pptx", "xlsx"], scale = 10) emptyBtn = gr.ClearButton([user_input, chatbot, history, attached_file, attached_file_history, image_display], value="🧹 Neue Session", scale=10) with gr.Column(): with gr.Column(min_width=50, scale=1): with gr.Tab(label="Chats ..."): #Geht nicht, da für alle gleichzeitig sichtbar #chat_selector = gr.CheckboxGroup(label="", choices=update_chat_options()) #download_button = gr.Button("Download ausgewählte Chats") file_download = gr.File(label="Noch keine Chatsverläufe", visible=True, interactive = False, file_count="multiple",) with gr.Tab(label="Parameter"): #gr.Markdown("# Parameters") #rag_option = gr.Radio(["Aus", "An"], label="KKG Erweiterungen (RAG)", value = "Aus") model_option = gr.Radio(["HuggingFace"], label="Modellauswahl", value = "HuggingFace") #websuche = gr.Radio(["Aus", "An"], label="Web-Suche", value = "Aus") top_p = gr.Slider( minimum=-0, maximum=1.0, value=0.95, step=0.05, interactive=True, label="Top-p", visible=False, ) top_k = gr.Slider( minimum=1, maximum=100, value=35, step=1, interactive=True, label="Top-k", visible=False, ) temperature = gr.Slider( minimum=0.1, maximum=2.0, value=0.2, step=0.1, interactive=True, label="Temperature", visible=False ) max_length_tokens = gr.Slider( minimum=0, maximum=512, value=512, step=8, interactive=True, label="Max Generation Tokens", visible=False, ) max_context_length_tokens = gr.Slider( minimum=0, maximum=4096, value=2048, step=128, interactive=True, label="Max History Tokens", visible=False, ) repetition_penalty=gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Strafe für wiederholte Tokens", visible=False) anzahl_docs = gr.Slider(label="Anzahl Dokumente", value=3, minimum=1, maximum=10, step=1, interactive=True, info="wie viele Dokumententeile aus dem Vektorstore an den prompt gehängt werden", visible=False) openai_key = gr.Textbox(label = "OpenAI API Key", value = "sk-", lines = 1, visible = False) gr.Markdown(description) ###################################### # Events und Übergabe Werte an Funktionen ####################################### ###################################### # Für Tab 1: Chatbot #Argumente für generate Funktion als Input predict_args = dict( fn=generate_auswahl, inputs=[ user_question, attached_file, attached_file_history, chatbot, history, anzahl_docs, top_p, temperature, max_length_tokens, max_context_length_tokens, repetition_penalty, top_k, validate ], outputs=[chatbot, history, attached_file, attached_file_history, status_display], show_progress=True, ) reset_args = dict( fn=reset_textbox, inputs=[], outputs=[user_input, status_display] ) # Chatbot transfer_input_args = dict( fn=add_text, inputs=[chatbot, history, user_input, attached_file, attached_file_history], outputs=[chatbot, history, user_question, attached_file, attached_file_history, image_display , user_input], show_progress=True ) ############################################## # Button Events.... #Validation Button # Event-Handler für die Validierung validate_btn.click(validate_input, inputs=[user_input_validate, validate], outputs=[status_display, validate, user_input_validate, validate_btn]) user_input_validate.submit(validate_input, inputs=[user_input_validate, validate], outputs=[status_display, validate, user_input_validate, validate_btn]) predict_event1 = user_input.submit(**transfer_input_args, queue=False,).then(**predict_args) predict_event2 = submitBtn.click(**transfer_input_args, queue=False,).then(**predict_args) predict_event3 = upload.upload(file_anzeigen, [upload], [image_display, image_display, attached_file] ) #.then(**predict_args) emptyBtn.click(clear_all, [history, uploaded_file_paths, chats], [attached_file, image_display, uploaded_file_paths, history, file_download, chats]) #Bild Anzeige neben dem Button wieder entfernen oder austauschen.. image_display.select(file_loeschen, [], [attached_file, image_display]) #download_button.click(fn=download_chats, inputs=chat_selector, outputs=[file_download]) #Berechnung oder Ausgabe anhalten (kann danach fortgesetzt werden) cancelBtn.click(cancel_outputing, [], [status_display], cancels=[predict_event1,predict_event2, predict_event3]) demo.title = "KKG-ChatBot" demo.queue(default_concurrency_limit=15).launch(debug=True)