#########################################################################################
# Title: Gradio Writing Assistant
# Author: Andreas Fischer
# Date: May 23rd, 2024
# Last update: June 12th, 2024
##########################################################################################

# https://github.com/abetlen/llama-cpp-python/issues/306
# sudo apt install libclblast-dev
# CMAKE_ARGS="-DLLAMA_CLBLAST=on" FORCE_CMAKE=1 pip install llama-cpp-python --force-reinstall --upgrade --no-cache-dir -v


# Prepare resources
#-------------------
import torch
import gc
torch.cuda.empty_cache()
gc.collect()


# Chroma-DB
#-----------
import os
import chromadb
dbPath = "/home/af/Schreibtisch/Code/gradio/Chroma/db"
onPrem = True if(os.path.exists(dbPath)) else False
if(onPrem==False): dbPath="/home/user/app/db"
#onPrem=True # uncomment to override automatic detection
print(dbPath)

#client = chromadb.Client()
path=dbPath
client = chromadb.PersistentClient(path=path)
print(client.heartbeat())
print(client.get_version())
print(client.list_collections())

from chromadb.utils import embedding_functions
default_ef = embedding_functions.DefaultEmbeddingFunction()
#sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="T-Systems-onsite/cross-en-de-roberta-sentence-transformer")
#instructor_ef = embedding_functions.InstructorEmbeddingFunction(model_name="hkunlp/instructor-large", device="cuda")
embeddingModel = embedding_functions.SentenceTransformerEmbeddingFunction(
  model_name="T-Systems-onsite/cross-en-de-roberta-sentence-transformer",
  device="cuda" if(onPrem) else "cpu")
print(str(client.list_collections()))

global collection
dbName="writingStyleDB1"
if("name="+dbName in str(client.list_collections())):
  client.delete_collection(name=dbName) # deletes collection
if("name="+dbName in str(client.list_collections())):
  print(dbName+" found!")
  collection = client.get_collection(name=dbName, embedding_function=embeddingModel) #sentence_transformer_ef)
else:
  #client.delete_collection(name=dbName)
  print(dbName+" created!")
  collection = client.create_collection(
    dbName,
    embedding_function=embeddingModel,
    metadata={"hnsw:space": "cosine"})
print("Database ready!")
print(collection.count())

x=collection.get(include=[])["ids"]
if(len(x)==0):
  x=collection.get(include=[])["ids"]
  collection.add(
    documents=["Ich möchte einen Blogbeitrag","Ich möchte einen Gliederungsvorschlag","Ich möchte einen Social Media Beitrag"],
    metadatas=[
      {"prompt": "Bitte schreibe einen detaillierten Blogbeitrag zur Anfrage des Users, mit allen relevanten Informationen zum Thema!", "genre":"Beitrag"},
      {"prompt": "Bitte entwerfe einen Gliederungsvorschlag zur Anfrage des Users!", "genre":"Gliederungsvorschlag"},
      {"prompt": "Bitte verfasse einen Beitrag für die professionelle social media Plattform LinkedIn zur Anfrage des Users!", "genre":"Social Media Beitrag"}],
    ids=[str(len(x)+1),str(len(x)+2),str(len(x)+3)]
  )

RAGResults=collection.query(
  query_texts=["Dies ist ein Test"],
  n_results=1,
  #where={"source": "USER"}
)
RAGResults["metadatas"][0][0]["prompt"]
x=collection.get(where_document={"$contains":"Blogbeitrag"},include=["metadatas"])['metadatas'][0]['prompt']
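
# Illustrative sketch (disabled, not part of the original setup): how an additional genre could be
# registered in the same collection, mirroring the collection.add() pattern used above. The genre
# "Pressemitteilung", its example document and its prompt are hypothetical placeholders.
if(False):
  x=collection.get(include=[])["ids"]
  collection.add(
    documents=["Ich möchte eine Pressemitteilung"],                                      # hypothetical example document
    metadatas=[{"prompt": "Bitte verfasse eine Pressemitteilung zur Anfrage des Users!", # hypothetical genre-prompt
                "genre":"Pressemitteilung"}],
    ids=[str(len(x)+1)]
  )
  # the nearest-neighbour query should now return the new entry for a matching request
  print(collection.query(query_texts=["Ich brauche eine Pressemitteilung"], n_results=1)["documents"][0][0])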

# Model
#-------
onPrem=False
myModel="mistralai/Mixtral-8x7B-Instruct-v0.1"
if(onPrem==False):
  modelPath=myModel
  from huggingface_hub import InferenceClient
  import gradio as gr
  client = InferenceClient(
    model=modelPath,
    #token="hf_..."
  )
else:
  import os
  import requests
  import subprocess
  #modelPath="/home/af/gguf/models/c4ai-command-r-v01-Q4_0.gguf"
  #modelPath="/home/af/gguf/models/Discolm_german_7b_v1.Q4_0.gguf"
  modelPath="/home/af/gguf/models/Mixtral-8x7b-instruct-v0.1.Q4_0.gguf"
  if(os.path.exists(modelPath)==False):
    #url="https://huggingface.co/TheBloke/DiscoLM_German_7b_v1-GGUF/resolve/main/discolm_german_7b_v1.Q4_0.gguf?download=true"
    url="https://huggingface.co/TheBloke/Mixtral-8x7B-Instruct-v0.1-GGUF/resolve/main/mixtral-8x7b-instruct-v0.1.Q4_0.gguf?download=true"
    response = requests.get(url)
    with open("./Mixtral-8x7b-instruct.gguf", mode="wb") as file:
      file.write(response.content)
    print("Model downloaded")
    modelPath="./Mixtral-8x7b-instruct.gguf"
  print(modelPath)
  n="20"
  if("Mixtral-8x7b-instruct" in modelPath): n="0" # mixtral seems to cause problems here...
  command = ["python3", "-m", "llama_cpp.server", "--model", modelPath, "--host", "0.0.0.0", "--port", "2600", "--n_threads", "8", "--n_gpu_layers", n]
  subprocess.Popen(command)
  print("Server ready!")
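
# Optional sanity check (sketch, disabled by default): once the llama_cpp.server process above is
# running, a single non-streaming request against its OpenAI-compatible /v1/completions endpoint
# (the same endpoint used further below) should return generated text. The prompt is an arbitrary example.
if(False):
  import requests, json
  testBody={"prompt":"[INST] Sag Hallo! [/INST]", "max_tokens":16, "stream":False}
  testResponse=requests.post("http://0.0.0.0:2600/v1/completions", json=testBody)
  print(json.loads(testResponse.text)["choices"][0]["text"])  # generated continuation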

# Check template
#----------------
if(False):
  from transformers import AutoTokenizer
  #mod="mistralai/Mixtral-8x22B-Instruct-v0.1"
  #mod="mistralai/Mixtral-8x7b-instruct-v0.1"
  mod="VAGOsolutions/Llama-3-SauerkrautLM-8b-Instruct"
  tok=AutoTokenizer.from_pretrained(mod) #,token="hf_...")
  cha=[{"role":"system","content":"A"},{"role":"user","content":"B"},{"role":"assistant","content":"C"}]
  res=tok.apply_chat_template(cha)
  print(tok.decode(res))
  cha=[{"role":"user","content":"U1"},{"role":"assistant","content":"A1"},{"role":"user","content":"U2"},{"role":"assistant","content":"A2"}]
  res=tok.apply_chat_template(cha)
  print(tok.decode(res))


# Gradio-GUI
#------------
import re
def extend_prompt(message="", history=None, system=None, RAGAddon=None, system2=None, zeichenlimit=None, historylimit=4, removeHTML=True):
  startOfString=""
  if zeichenlimit is None: zeichenlimit=1000000000 # :-)
  template0=" [INST]{system}\n [/INST] "
  template1=" [INST] {message} [/INST]"
  template2=" {response}"
  if("command-r" in modelPath): #https://huggingface.co/CohereForAI/c4ai-command-r-v01
    ## <|START_OF_TURN_TOKEN|><|USER_TOKEN|>Hello, how are you?<|END_OF_TURN_TOKEN|><|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>
    template0="<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|> {system}<|END_OF_TURN_TOKEN|>"
    template1="<|START_OF_TURN_TOKEN|><|USER_TOKEN|>{message}<|END_OF_TURN_TOKEN|><|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>"
    template2="{response}<|END_OF_TURN_TOKEN|>"
  if("Gemma-" in modelPath): # https://huggingface.co/google/gemma-7b-it
    template0="<start_of_turn>user{system}<end_of_turn>"
    template1="<start_of_turn>user{message}<end_of_turn><start_of_turn>model"
    template2="{response}<end_of_turn>"
  if("Mixtral-8x22B-Instruct" in modelPath): # AutoTokenizer: [INST] U1[/INST] A1[INST] U2[/INST] A2
    startOfString=""
    template0="[INST]{system}\n [/INST] "
    template1="[INST] {message}[/INST]"
    template2=" {response}"
  if("Mixtral-8x7b-instruct" in modelPath): # https://huggingface.co/mistralai/Mixtral-8x7B-Instruct-v0.1
    startOfString="" # AutoTokenizer: [INST] U1 [/INST]A1 [INST] U2 [/INST]A2
    template0=" [INST]{system}\n [/INST] "
    template1=" [INST] {message} [/INST]"
    template2=" {response}"
  if("Mistral-7B-Instruct" in modelPath): #https://huggingface.co/mistralai/Mistral-7B-Instruct-v0.2
    startOfString=""
    template0="[INST]{system}\n [/INST]"
    template1="[INST] {message} [/INST]"
    template2=" {response}"
  if("Openchat-3.5" in modelPath): #https://huggingface.co/TheBloke/openchat-3.5-0106-GGUF
    template0="GPT4 Correct User: {system}<|end_of_turn|>GPT4 Correct Assistant: Okay.<|end_of_turn|>"
    template1="GPT4 Correct User: {message}<|end_of_turn|>GPT4 Correct Assistant: "
    template2="{response}<|end_of_turn|>"
  if(("Discolm_german_7b" in modelPath) or ("SauerkrautLM-7b-HerO" in modelPath)): #https://huggingface.co/VAGOsolutions/SauerkrautLM-7b-HerO
    template0="<|im_start|>system\n{system}<|im_end|>\n"
    template1="<|im_start|>user\n{message}<|im_end|>\n<|im_start|>assistant\n"
    template2="{response}<|im_end|>\n"
  if("Llama-3-SauerkrautLM-8b-Instruct" in modelPath): #https://huggingface.co/VAGOsolutions/Llama-3-SauerkrautLM-8b-Instruct
    template0="<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{system}<|eot_id|>"
    template1="<|start_header_id|>user<|end_header_id|>\n\n{message}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
    template2="{response}<|eot_id|>\n"
  if("WizardLM-13B-V1.2" in modelPath): #https://huggingface.co/WizardLM/WizardLM-13B-V1.2
    template0="{system} "
    template1="USER: {message} ASSISTANT: "
    template2="{response}"
  if("Phi-2" in modelPath): #https://huggingface.co/TheBloke/phi-2-GGUF
    template0="Instruct: {system}\nOutput: Okay.\n"
    template1="Instruct: {message}\nOutput:"
    template2="{response}\n"
  prompt = ""
  if RAGAddon is not None:
    system += RAGAddon
  if system is not None:
    prompt += template0.format(system=system)
  if history is not None:
    for user_message, bot_response in history[-historylimit:]:
      if user_message is None: user_message = ""
      if bot_response is None: bot_response = ""
      bot_response = re.sub("\n\n<details>.*?</details>","", bot_response, flags=re.DOTALL) # remove RAG-components
      if removeHTML==True: bot_response = re.sub("<(.*?)>","\n", bot_response) # remove HTML-components in general (may cause bugs with markdown-rendering)
      if user_message is not None: prompt += template1.format(message=user_message[:zeichenlimit])
      if bot_response is not None: prompt += template2.format(response=bot_response[:zeichenlimit])
  if message is not None: prompt += template1.format(message=message[:zeichenlimit])
  if system2 is not None:
    prompt += system2
  return startOfString+prompt

import gradio as gr
import requests
import json
from datetime import datetime
import os
import re
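
# Quick illustration (sketch, disabled by default): extend_prompt() assembles the complete prompt
# string from system prompt, RAG addendum, history and current message, using the template branch
# that matches modelPath. All argument values below are arbitrary examples.
if(False):
  examplePrompt=extend_prompt(
    message="Ich möchte einen Blogbeitrag über Wissenschaftskommunikation",  # example user message
    history=[("Hallo!","Hallo, wie kann ich helfen?")],                      # example dialog history
    system="Du bist wissenschaftlicher Mitarbeiter an einem Forschungsinstitut.",
    RAGAddon="\n\nBitte schreibe einen detaillierten Blogbeitrag zur Anfrage des Users!",
    historylimit=2)
  print(examplePrompt)  # e.g. " [INST]...\n [/INST] ..." when the Mixtral template applies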

def response(message, history, customSysPrompt, genre, augmentation, hfToken):
  if((onPrem==False) and (hfToken.startswith("hf_"))): # use HF-hub with custom token if token is provided
    from huggingface_hub import InferenceClient
    import gradio as gr
    global client
    client = InferenceClient(
      model=myModel,
      token=hfToken
    )
  removeHTML=True
  system=customSysPrompt # system-prompt can be changed in the UI (usually defaults to something like the following system-prompt)
  if(system==""):
    system="Du bist wissenschaftlicher Mitarbeiter an einem Forschungsinstitut und zuständig für die Wissenschaftskommunikation."
  if(augmentation==True):
    system=system+"\nFür eine besonders gelungene Lösung erhältst du eine Gehaltserhöhung! Schreibe deine Texte in natürlicher und einfacher Sprache. Zielgruppe sind deutschsprachige Personen mit unterschiedlichen Bildungshintergründen."
  message=message.replace("[INST]","")
  message=message.replace("[/INST]","")
  message=message.replace("</s>","") # strip end-of-sequence token from user input
  message=re.sub("<[|](im_start|im_end|end_of_turn)[|]>", '', message)
  x=collection.get(include=[])["ids"]
  rag=None # RAG is turned off until history gets too long
  historylimit=2
  if(genre==""): # use RAG to define genre if there is none
    RAGResults=collection.query(query_texts=[message], n_results=1)
    genre=str(RAGResults['documents'][0][0]) # determine genre based on best-matching db-entry
  rag="\n\n"+collection.get(where={"genre": genre},include=["metadatas"])['metadatas'][0]['prompt'] # genre-specific addendum to system prompt (rag)
  if(len(history)>0):
    rag=rag+"\nFalls der User Rückfragen oder Änderungsvorschläge zu deinem Entwurf hat, gehe darauf ein." # add dialog-specific addendum to rag
  system2=None # system2 can be used as fictive first words of the AI, which are not displayed or stored
  prompt=extend_prompt(
    message,                  # current message of the user
    history,                  # complete history
    system,                   # system prompt
    rag,                      # RAG-component added to the system prompt
    system2,                  # fictive first words of the AI (neither displayed nor stored)
    historylimit=historylimit,# number of past messages to consider for response to current message
    removeHTML=removeHTML     # remove HTML-components from History (to prevent bugs with Markdown)
  )
  if(True):
    print("\n\nMESSAGE:"+str(message))
    print("\n\nHISTORY:"+str(history))
    print("\n\nSYSTEM:"+str(system))
    print("\n\nRAG:"+str(rag))
    print("\n\nSYSTEM2:"+str(system2))
    print("\n\n*** Prompt:\n"+prompt+"\n***\n\n")

  ## Request response from model
  #------------------------------
  print("AI running on prem!" if(onPrem) else "AI running HFHub!")
  if(onPrem==False):
    temperature=float(0.9)
    max_new_tokens=3000
    top_p=0.95
    repetition_penalty=1.0
    if temperature < 1e-2: temperature = 1e-2
    top_p = float(top_p)
    generate_kwargs = dict(
      temperature=temperature,
      max_new_tokens=max_new_tokens,
      top_p=top_p,
      repetition_penalty=repetition_penalty,
      do_sample=True,
      seed=42,
    )
    stream = client.text_generation(prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
    response = ""
    #print("User: "+message+"\nAI: ")
    for text in stream:
      part=text.token.text
      #print(part, end="", flush=True)
      response += part
      if removeHTML==True: response = re.sub("<(.*?)>","\n", response) # remove HTML-components in general (may cause bugs with markdown-rendering)
      yield response
  if(onPrem==True):
    # url="https://afischer1985-wizardlm-13b-v1-2-q4-0-gguf.hf.space/v1/completions"
    url="http://0.0.0.0:2600/v1/completions"
    body={"prompt":prompt,"max_tokens":None, "echo":"False","stream":"True"} # e.g. Mixtral-Instruct
    if("Discolm_german_7b" in modelPath): body.update({"stop": ["<|im_end|>"]})      # fix stop-token of DiscoLM
    if("Gemma-" in modelPath): body.update({"stop": ["<|im_end|>","<end_of_turn>"]}) # fix stop-token of Gemma
    response="" #+"("+myType+")\n"
    buffer=""
    #print("URL: "+url)
    #print("User: "+message+"\nAI: ")
    for text in requests.post(url, json=body, stream=True): #-H 'accept: application/json' -H 'Content-Type: application/json'
      if buffer is None: buffer=""
      buffer=str("".join(buffer))
      # print("*** Raw String: "+str(text)+"\n***\n")
      text=text.decode('utf-8')
      if((text.startswith(": ping -")==False) and (len(text.strip("\n\r"))>0)):
        buffer=buffer+str(text)
      # print("\n*** Buffer: "+str(buffer)+"\n***\n")
      buffer=buffer.split('"finish_reason": null}]}')
      if(len(buffer)==1):
        buffer="".join(buffer)
        pass
      if(len(buffer)==2):
        part=buffer[0]+'"finish_reason": null}]}'
        if(part.lstrip('\n\r').startswith("data: ")): part=part.lstrip('\n\r').replace("data: ", "")
        try:
          part = str(json.loads(part)["choices"][0]["text"])
          #print(part, end="", flush=True)
          response=response+part
          buffer="" # reset buffer
        except Exception as e:
          print("Exception:"+str(e))
          pass
      if removeHTML==True: response = re.sub("<(.*?)>","\n", response) # remove HTML-components in general (may cause bugs with markdown-rendering)
      yield response
  history.append((message, response)) # add current dialog to history
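
# Illustration (sketch, disabled by default): response() is a generator that streams the growing
# answer; outside of Gradio it can be exercised directly. All argument values below are arbitrary
# examples; an empty genre triggers the RAG-based genre detection above.
if(False):
  for partialAnswer in response("Ich möchte einen Blogbeitrag über Wissenschaftskommunikation",
                                history=[], customSysPrompt="", genre="", augmentation=True, hfToken=""):
    pass
  print(partialAnswer)  # last yielded value = complete answer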

val=None
gr.ChatInterface(
  response,
  chatbot=gr.Chatbot(value=val, render_markdown=True),
  title="KI Schreibassistenz (lokal)" if onPrem else "KI Schreibassistenz",
  description="Benenne ein Thema (sowie ggf. weitere Vorgaben) und klicke auf 'Submit', um einen Text dazu generieren zu lassen. "
              "Solltest du eine bestimmte Art von Text benötigen, wähle unter 'Additional Inputs' ein geeignetes Genre aus.",
  additional_inputs=[
    gr.Textbox(info="Basiskomponente der Anweisungen, die vor dem Dialog an das System gehen.", value="Du bist wissenschaftlicher Mitarbeiter an einem Forschungsinstitut und zuständig für die Wissenschaftskommunikation.", label="System Prompt"),
    gr.Dropdown(info="Wähle das gewünschte Genre des zu schreibenden Textes", choices=["Beitrag","Gliederungsvorschlag","Social Media Beitrag",""], value="Beitrag", label="Genre"),
    gr.Checkbox(info="Optional: Automatische Ergänzung des System Prompt um Formulierungen, die hochwertigere Ergebnisse erwarten lassen.", label="Motivationsschub"),
    gr.Textbox(info="Optional: Gib einen gültigen Huggingface Access Token an, um mehr Texte produzieren zu können.", value="", label="HF_token"),
  ]
).queue().launch(share=True) #False, server_name="0.0.0.0", server_port=7864)
print("Interface up and running!")