# Hugging Face Space (status shown on the captured page: Sleeping)
import os | |
import spaces | |
from threading import Thread | |
from typing import Iterator | |
from backend2 import load_documents, prepare_documents, get_context_sources | |
import gradio as gr | |
import torch | |
from transformers import AutoModelForCausalLM, GemmaTokenizerFast, TextIteratorStreamer | |
from huggingface_hub import login | |
import threading | |
# Authenticate against the Hugging Face Hub using the token supplied through
# the HUGGINGFACE_TOKEN environment variable, so that checkpoints requiring
# authentication can be downloaded below.
huggingface_token = os.getenv("HUGGINGFACE_TOKEN")
if huggingface_token:
    login(huggingface_token)
else:
    # login() without a token falls back to an interactive prompt, which
    # cannot be answered in a headless deployment; skip it and let any
    # later download report the missing credentials instead.
    print("HUGGINGFACE_TOKEN is not set; skipping Hugging Face Hub login.")
# Markdown heading rendered at the top of the page.
DESCRIPTION = """\
# La Chatbot degli Osservatori
"""

# Upper bound and initial value of the "Max new tokens" slider.
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024

# Maximum prompt length in tokens. The environment variable is still
# published with a default of 4096 (8192 is the other value that was tried),
# but a value already set by the deployment is now respected instead of being
# overwritten.
os.environ.setdefault("MAX_INPUT_TOKEN_LENGTH", "4096")
MAX_INPUT_TOKEN_LENGTH = int(os.environ["MAX_INPUT_TOKEN_LENGTH"])
# Device selection: the first CUDA device when one is available, otherwise
# the CPU. (Replace with torch.device("cpu") to force CPU execution.)
_cuda_available = torch.cuda.is_available()
device = torch.device("cuda:0" if _cuda_available else "cpu")

# Checkpoint shared by the model and the tokenizer.
model_id = "google/gemma-2-2b-it"

# Half precision when running on CUDA, full precision otherwise; weight
# placement is delegated to device_map="auto" rather than an explicit
# model.to(device).
_load_options = {
    "device_map": "auto",
    "torch_dtype": torch.float16 if _cuda_available else torch.float32,
}
model = AutoModelForCausalLM.from_pretrained(model_id, **_load_options)

tokenizer = GemmaTokenizerFast.from_pretrained(model_id)
tokenizer.use_default_system_prompt = False

# Override the sliding-window size stored in the model config.
model.config.sliding_window = 4096

# Switch the model to evaluation mode.
model.eval()
# ---------------------------------------------------------------------------
# RAG configuration
# ---------------------------------------------------------------------------

# Topic keyword (searched for in the user's clarification reply) mapped to the
# folder that holds that topic's documents.
documents_paths = dict(
    blockchain="documents/blockchain",
    metaverse="documents/metaverso",
    payment="documents/payment",
)

# The conversation state used to be a single module-level dict with the keys
# "documents_loaded", "document_db", "original_message" and "clarification";
# it is now created per thread by get_session_state().

# Prompt used on the first turn: ask the user which topic the question is about.
INSTRUCTION_1 = 'In italiano, chiedi molto brevemente se la domanda si riferisce agli "Osservatori Blockchain", "Osservatori Payment" oppure "Osservatori Metaverse".'

# Prompt used once documents are loaded: answer only from the given context.
INSTRUCTION_2 = (
    'Sei un assistente che risponde in italiano alle domande basandosi solo sulle informazioni fornite nel contesto. '
    'Se non trovi informazioni, rispondi "Puoi chiedere maggiori informazioni all\'ufficio di riferimento.". '
    'Se invece la domanda è completamente fuori contesto, non rispondere e rammenta il topic del contesto'
)

# Message for a clarification reply that names none of the known topics.
default_error_response = " ".join(
    [
        'Non sono sicuro che tu voglia indirizzare la tua ricerca su una di queste opzioni:',
        '"Blockchain", "Metaverse", "Payment".',
        'Per favore utilizza il nome corretto.',
    ]
)

# Per-thread storage backing get_session_state().
thread_local = threading.local()
def get_session_state():
    """Return the calling thread's conversation-state dict, creating it on first use.

    The dict is stored as the ``session_state`` attribute of the module-level
    ``thread_local`` object and starts out as::

        documents_loaded  -> False
        document_db       -> None
        original_message  -> None
        clarification     -> False

    The same dict object is returned on every later call from the same
    thread, so callers mutate it in place.

    NOTE(review): the state is keyed by thread, not by user or browser
    session -- confirm this matches how the chat handler is dispatched.
    """
    try:
        return thread_local.session_state
    except AttributeError:
        # First call on this thread: install the initial state.
        thread_local.session_state = {
            "documents_loaded": False,
            "document_db": None,
            "original_message": None,
            "clarification": False,
        }
        return thread_local.session_state
def _history_to_messages(chat_history):
    """Flatten (user, assistant) history pairs into chat-template message dicts."""
    messages = []
    for user, assistant in chat_history:
        messages.append({"role": "user", "content": user})
        messages.append({"role": "assistant", "content": assistant})
    return messages


def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream the chatbot's reply for one user turn.

    The handler moves through three phases, tracked in the dict returned by
    ``get_session_state()``:

    1. First question (no documents loaded, no clarification pending): the
       question is remembered and the model is prompted with ``INSTRUCTION_1``
       to ask which topic it refers to.
    2. Clarification pending: the reply is searched (case-insensitively) for
       one of the ``documents_paths`` keys. On a match the topic's documents
       are loaded, context is retrieved for the remembered question and the
       model answers it under ``INSTRUCTION_2``. Without a match,
       ``default_error_response`` is yielded and the turn ends, leaving the
       clarification pending so the user can try again.
    3. Follow-up questions: answered under ``INSTRUCTION_2`` with the context
       retrieved in phase 2.

    Args:
        message: The user's latest message.
        chat_history: Previous turns as (user, assistant) pairs.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Number of highest-probability tokens kept when sampling.
        repetition_penalty: Penalty applied to repeated tokens.

    Yields:
        Progressively longer versions of the reply (each yield is the full
        text so far). When documents are loaded, a final yield appends the
        list of sources.
    """
    session_state = get_session_state()

    # Retrieved context and sources live with the rest of the conversation
    # state instead of in module globals, so they cannot leak between
    # unrelated calls.
    context = session_state.get("context")
    sources = session_state.get("sources")

    if not session_state["documents_loaded"] and not session_state["clarification"]:
        # Phase 1: ask the user which topic the question is about.
        conversation = _history_to_messages(chat_history)
        conversation.append({"role": "user", "content": f"Domanda: {message} . Comando: {INSTRUCTION_1}"})
        conversation.append({"role": "assistant", "content": "Ok."})
        print("debug - CONV1", conversation)
        session_state["original_message"] = message
        session_state["clarification"] = True
    elif session_state["clarification"]:
        # Phase 2: interpret the reply as a topic choice.
        lowered = message.lower()
        matched_key = next((key for key in documents_paths if key in lowered), None)
        if matched_key is None:
            # Nothing sensible to generate from: tell the user and stop,
            # rather than falling through with no conversation built.
            print(default_error_response)
            yield default_error_response
            return

        yield "Fammi cercare tra i miei documenti..."
        documents = load_documents(documents_paths[matched_key])
        session_state["document_db"] = prepare_documents(documents)
        yield f"Ecco, ho raccolto informazioni dagli Osservatori {matched_key.capitalize()}. Ora sto elaborando una risposta per te..."
        context, sources = get_context_sources(session_state["original_message"], session_state["document_db"])
        print("sources ", sources)
        print("context ", context)
        # Only mark the documents as loaded once retrieval has succeeded.
        session_state["context"] = context
        session_state["sources"] = sources
        session_state["documents_loaded"] = True

        conversation = [
            {"role": "user", "content": f"{INSTRUCTION_2}"},
            {"role": "assistant", "content": "Ok."},
        ]
        conversation.extend(_history_to_messages(chat_history))
        conversation.append({"role": "user", "content": f'Contesto: {context}\n\n Domanda iniziale: {session_state["original_message"]} . Rispondi solo in italiano.'})
        session_state["clarification"] = False
        print("debug - CONV2", conversation)
    else:
        # Phase 3: follow-up question.
        # NOTE(review): this reuses the context retrieved for the original
        # question; no new retrieval is done for `message` -- confirm that
        # this is intended.
        conversation = [
            {"role": "user", "content": f"Comandi: {INSTRUCTION_2}"},
            {"role": "assistant", "content": "Va bene."},
        ]
        conversation.extend(_history_to_messages(chat_history))
        conversation.append({"role": "user", "content": f"Contesto: {context}\n\n Nuova domanda: {message} . Rispondi in italiano e seguendo i comandi che ti ho dato prima"})
        print("debug - CONV3", conversation)

    # Tokenise the prompt, keeping only the most recent tokens if it is too long.
    input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    # Run generation in a worker thread and stream decoded text as it arrives.
    streamer = TextIteratorStreamer(tokenizer, timeout=None, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        input_ids=input_ids,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )
    worker = Thread(target=model.generate, kwargs=generate_kwargs)
    worker.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)
    worker.join()

    if session_state["documents_loaded"]:
        # Blank line so the sources are not glued to the end of the answer.
        outputs.append(f"\n\nFonti utilizzate: {sources}")
        yield "".join(outputs)
    print("debug - CHATHISTORY", chat_history)
# (label, minimum, maximum, step, initial value) for each generation slider,
# listed in the same order as the extra parameters of `generate`.
_SLIDER_SPECS = [
    ("Max new tokens", 1, MAX_MAX_NEW_TOKENS, 1, DEFAULT_MAX_NEW_TOKENS),
    ("Temperature", 0.1, 4.0, 0.1, 0.6),
    ("Top-p (nucleus sampling)", 0.05, 1.0, 0.05, 0.9),
    ("Top-k", 1, 1000, 1, 50),
    ("Repetition penalty", 1.0, 2.0, 0.05, 1.2),
]

# Example prompts offered in the UI.
_EXAMPLE_PROMPTS = [
    "Ciao, in cosa puoi aiutarmi?",
    "Ciao, in cosa consiste un piatto di spaghetti?",
    "Ciao, quali sono le aziende che hanno iniziato ad integrare le stablecoins? Fammi un breve sommario.",
    "Spiegami la differenza tra mondi virtuali pubblici o privati",
    "Trovami un esempio di progetto B2B",
    "Quali sono le regole europee sui bonifici istantanei?",
]

# Chat UI wired to `generate`, with the sampling parameters exposed as sliders.
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Slider(label=label, minimum=lowest, maximum=highest, step=step, value=initial)
        for label, lowest, highest, step, initial in _SLIDER_SPECS
    ],
    stop_btn=None,
    examples=[[prompt] for prompt in _EXAMPLE_PROMPTS],
    cache_examples=False,
)

# Page layout: heading on top, chat interface below, on a light-blue background.
with gr.Blocks(css=".gradio-container {background-color: #B9D9EB}", fill_height=True) as demo:
    gr.Markdown(DESCRIPTION, elem_classes="centered")
    chat_interface.render()

if __name__ == "__main__":
    # Alternative with a bounded request queue: demo.queue(max_size=20).launch()
    demo.launch()