# Spaces: Sleeping
# (page status banner captured together with the source; not part of the program)
import os
import warnings
from threading import Thread
from typing import Iterator

import spaces  # kept ahead of torch/transformers, as in the original import order
import gradio as gr
import torch
from huggingface_hub import login
from transformers import AutoModelForCausalLM, GemmaTokenizerFast, TextIteratorStreamer

from backend2 import load_documents, prepare_documents, get_context_sources
# Authenticate against the Hugging Face Hub only when a token is configured.
# Calling login() with no token falls back to an interactive prompt, which
# cannot be answered in a headless deployment; skipping the call instead lets
# any credentials that are already cached be picked up by later downloads.
huggingface_token = os.getenv("HUGGINGFACE_TOKEN")
if huggingface_token:
    login(huggingface_token)
else:
    warnings.warn(
        "HUGGINGFACE_TOKEN is not set; skipping Hugging Face Hub login. "
        "Downloading gated models may fail.",
        stacklevel=1,
    )
# Markdown heading shown at the top of the page.
DESCRIPTION = "# La Chatbot degli Osservatori\n"

# Default value and upper bound for the number of newly generated tokens.
DEFAULT_MAX_NEW_TOKENS = 1024
MAX_MAX_NEW_TOKENS = 2048

# The prompt budget is pinned to 4096 tokens: the environment variable is
# overwritten first and then read back, so an externally supplied value is
# ignored. (8192 was a previously tried setting.)
os.environ["MAX_INPUT_TOKEN_LENGTH"] = "4096"
MAX_INPUT_TOKEN_LENGTH = int(os.environ["MAX_INPUT_TOKEN_LENGTH"])
# Preferred device: the first CUDA GPU when one is visible, otherwise the CPU.
# NOTE(review): actual weight placement is delegated to device_map="auto"
# below; `device` does not appear to be used again in the visible code.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

model_id = "google/gemma-2-2b-it"

# Load the causal LM in bfloat16 and switch it to inference mode.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
model.config.sliding_window = 4096
model.eval()

# Matching tokenizer, with the default system prompt switched off.
tokenizer = GemmaTokenizerFast.from_pretrained(model_id)
tokenizer.use_default_system_prompt = False
# --- RAG configuration -------------------------------------------------------

# Topic keyword -> folder holding that topic's documents. The keyword is
# searched for in the lower-cased user message.
documents_paths = dict(
    blockchain='documents/blockchain',
    metaverse='documents/metaverso',
    payment='documents/payment',
)

# Module-level conversation state.
#   index             -- becomes True once a topic folder has been matched
#   documents_loaded  -- gates the "Fonti utilizzate" line appended to a reply
#   document_db       -- slot for the prepared document store
#   original_message  -- presumably reserved for the clarification flow (TODO confirm)
#   clarification     -- presumably reserved for the clarification flow (TODO confirm)
session_state = dict(
    index=False,
    documents_loaded=False,
    document_db=None,
    original_message=None,
    clarification=False,
)

# Prompt used when no topic keyword is found: ask which "Osservatorio" is meant.
INSTRUCTION_1 = 'In italiano, chiedi molto brevemente se la domanda si riferisce agli "Osservatori Blockchain", "Osservatori Payment" oppure "Osservatori Metaverse".'

# Prompt used for retrieval-grounded answers.
INSTRUCTION_2 = 'Sei un assistente che risponde in italiano alle domande basandosi solo sulle informazioni fornite nel contesto che ti darò. Se non trovi informazioni, rispondi "Puoi chiedere maggiori informazioni all\'ufficio di riferimento.". Se invece la domanda è completamente fuori contesto, non rispondere e rammenta il topic del contesto'
def _history_to_messages(chat_history):
    """Convert ``(user, assistant)`` history pairs into chat-template messages."""
    messages = []
    for user, assistant in chat_history:
        messages.append({"role": "user", "content": user})
        messages.append({"role": "assistant", "content": assistant})
    return messages


def _match_documents_path(message):
    """Return the folder whose topic keyword occurs in *message*, else ``None``."""
    lowered = message.lower()
    for key, path in documents_paths.items():
        if key in lowered:
            return path
    return None


def _build_document_db(path):
    """Load and prepare the documents under *path*, caching the result in session_state."""
    document_db = prepare_documents(load_documents(path))
    session_state["document_db"] = document_db
    return document_db


def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream the model's reply to *message*.

    Until a topic has been selected, the message is scanned for one of the
    keywords in ``documents_paths``. If none is found, the model is asked to
    request a clarification; otherwise the matching documents are prepared,
    context is retrieved for the message and the model answers from that
    context. Once a topic is selected it is kept for all later calls.

    Args:
        message: The new user message.
        chat_history: Previous ``(user, assistant)`` turns.
        max_new_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Number of highest-probability tokens kept for sampling.
        repetition_penalty: Penalty applied to repeated tokens.

    Yields:
        The reply accumulated so far, once per streamed chunk.
    """
    # NOTE(review): session_state and matched_path are module-level, so they are
    # shared by every caller of this function within the process.
    global matched_path

    conversation = _history_to_messages(chat_history)

    # Bound up front so that no later branch can hit an unbound name.
    context = None
    sources = None
    has_context = False

    if not session_state["index"]:
        matched_path = _match_documents_path(message)
        if matched_path:
            document_db = _build_document_db(matched_path)
            context, sources = get_context_sources(message, document_db)
            has_context = True
            print("*** sources ***", sources)
            gr.Info("doc preparati")
            session_state["index"] = True
    else:
        # Reuse the store prepared on an earlier turn instead of reloading and
        # re-indexing the same folder for every message.
        # Assumes the object returned by prepare_documents can be queried
        # repeatedly -- TODO confirm against backend2.
        document_db = session_state.get("document_db")
        if document_db is None:
            document_db = _build_document_db(matched_path)
        context, sources = get_context_sources(message, document_db)
        has_context = True
        gr.Info("contesto già indicizzato")

    # Every path appends exactly one trailing user turn, so the chat template
    # always ends on a user message before the generation prompt is added.
    if has_context:
        conversation.append({"role": "user", "content": f"{INSTRUCTION_2}"})
        conversation.append({"role": "assistant", "content": "Ok."})
        conversation.append({"role": "user", "content": f'Contesto: {context}\n\n Domanda: {message}.'})
        print("******** CONV2 ", conversation)
    else:
        # No topic keyword found: ask the user which topic is meant.
        conversation.append({"role": "user", "content": f"Domanda: {message} . Comando: {INSTRUCTION_1}"})
        gr.Info("richiesta di chiarimento")
        print("******** CONV1 ", conversation)

    input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep only the most recent tokens.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=None, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        input_ids=input_ids,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )
    # Generation runs in a background thread so the streamer can be consumed here.
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)

    # NOTE(review): nothing in the visible code sets "documents_loaded", so this
    # line is currently never emitted; the extra check only prevents a crash on
    # the clarification path should the flag be enabled.
    if session_state["documents_loaded"] and sources is not None:
        outputs.append(f"Fonti utilizzate: {sources}")
        yield "".join(outputs)

    print("debug - CHATHISTORY", chat_history)
# Chat UI: `generate` receives the message, the history and, in this order, the
# values of the five sliders declared below.
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Slider(label=label, minimum=lowest, maximum=highest, step=step, value=default)
        for label, lowest, highest, step, default in (
            ("Max new tokens", 1, MAX_MAX_NEW_TOKENS, 1, DEFAULT_MAX_NEW_TOKENS),
            ("Temperature", 0.1, 4.0, 0.1, 0.6),
            ("Top-p (nucleus sampling)", 0.05, 1.0, 0.05, 0.9),
            ("Top-k", 1, 1000, 1, 50),
            ("Repetition penalty", 1.0, 2.0, 0.05, 1.2),
        )
    ],
    stop_btn=None,
    # Each example is a one-element row holding only the message text.
    examples=[
        [prompt]
        for prompt in (
            "Ciao, in cosa puoi aiutarmi?",
            "Ciao, in cosa consiste un piatto di spaghetti?",
            "Ciao, quali sono le aziende che hanno iniziato ad integrare le stablecoins? Fammi un breve sommario.",
            "Spiegami la differenza tra mondi virtuali pubblici o privati",
            "Trovami un esempio di progetto B2B",
            "Quali sono le regole europee sui bonifici istantanei?",
        )
    ],
    cache_examples=False,
)
# Page layout: a full-height container with a custom background colour that
# holds the title followed by the chat interface.
with gr.Blocks(fill_height=True, css=".gradio-container {background-color: #B9D9EB}") as demo:
    gr.Markdown(DESCRIPTION, elem_classes="centered")
    chat_interface.render()

# Start the app only when the file is run as a script. Request queueing
# (demo.queue(max_size=20)) is left disabled.
if __name__ == "__main__":
    demo.launch()