Spaces:
Sleeping
Sleeping
import os | |
import spaces | |
from threading import Thread | |
from typing import Iterator | |
from backend2 import load_documents, prepare_documents, get_context_sources | |
import gradio as gr | |
import torch | |
from transformers import AutoModelForCausalLM, GemmaTokenizerFast, TextIteratorStreamer | |
from huggingface_hub import login | |
from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader, ChatPromptTemplate, PromptTemplate, load_index_from_storage, StorageContext | |
from llama_index.core.node_parser import SentenceSplitter | |
from llama_index.embeddings.instructor import InstructorEmbedding | |
huggingface_token = os.getenv("HUGGINGFACE_TOKEN") | |
login(huggingface_token) | |
DESCRIPTION = """\ | |
# La Chatbot degli Osservatori | |
""" | |
MAX_MAX_NEW_TOKENS = 2048 | |
DEFAULT_MAX_NEW_TOKENS = 1024 | |
os.environ["MAX_INPUT_TOKEN_LENGTH"] = "4096" #"8192" | |
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH")) | |
# Force usage of CPU | |
#device = torch.device("cpu") | |
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
model_id = "google/gemma-2-2b-it" | |
model = AutoModelForCausalLM.from_pretrained( | |
model_id, | |
device_map="auto", | |
torch_dtype= torch.bfloat16 #torch.float16 if torch.cuda.is_available() else torch.float32, | |
) | |
tokenizer = GemmaTokenizerFast.from_pretrained(model_id) | |
#tokenizer = AutoTokenizer.from_pretrained(model_id) | |
tokenizer.use_default_system_prompt = False | |
model.config.sliding_window = 4096 | |
#model = model.to(device) | |
model.eval() | |
###------#### | |
# rag | |
documents_paths = { | |
'blockchain': 'documents/blockchain', | |
'metaverse': 'documents/metaverso', | |
'payment': 'documents/payment' | |
} | |
global session_state | |
session_state = {"index": False, | |
"documents_loaded": False} | |
INSTRUCTION_1 = 'In italiano, chiedi sempre se la domanda si riferisce agli "Osservatori Blockchain", "Osservatori Payment" oppure "Osservatori Metaverse".' | |
INSTRUCTION_2 = 'Sei un assistente che risponde sempre in italiano alle domande basandosi solo sulle informazioni fornite nel contesto che ti darò. Se non trovi informazioni, rispondi "Puoi chiedere maggiori informazioni all\'ufficio di riferimento.". Se invece la domanda è completamente fuori contesto, non rispondere e rammenta il topic del contesto' | |
"""# Reading documents from disk | |
docs = SimpleDirectoryReader(input_files=["data/blockchainprova.txt"]).load_data() | |
# Splitting the document into chunks with | |
# predefined size and overlap | |
parser = SentenceSplitter.from_defaults( | |
chunk_size=256, chunk_overlap=64, paragraph_separator="\n\n" | |
) | |
nodes = parser.get_nodes_from_documents(docs)""" | |
def generate( | |
message: str, | |
chat_history: list[tuple[str, str]], | |
max_new_tokens: int = 1024, | |
temperature: float = 0.6, | |
top_p: float = 0.9, | |
top_k: int = 50, | |
repetition_penalty: float = 1.2, | |
) -> Iterator[str]: | |
global matched_path | |
conversation = [] | |
for user, assistant in chat_history: | |
conversation.extend( | |
[ | |
{"role": "user", "content": user}, | |
{"role": "assistant", "content": assistant}, | |
] | |
) | |
if not session_state["index"]: | |
matched_path = None | |
words = message.lower() | |
for key, path in documents_paths.items(): | |
if key in words: | |
matched_path = path | |
break | |
if matched_path: | |
documents = load_documents(matched_path) | |
DB = prepare_documents(documents) | |
context, sources = get_context_sources(message, DB) | |
print("*** sources ***", sources) | |
gr.Info("doc preparati con ", sources) | |
conversation.append({"role": "user", "content": f'Contesto: {context}\n\n Domanda: {message}. Rispondi in italiano'}) | |
###### | |
"""index = VectorStoreIndex(nodes) | |
# get retriver | |
retriever = index.as_retriever(similarity_top_k=3) | |
relevant_chunks = retriever.retrieve(message) | |
print(f"Found: {len(relevant_chunks)} relevant chunks") | |
for idx, chunk in enumerate(relevant_chunks): | |
info_message += f"{idx + 1}) {chunk.text[:64]}...\n" | |
print(info_message) | |
gr.Info(info_message)""" | |
session_state["documents_loaded"] = True | |
session_state["index"] = True | |
else: ## CHIEDI CHIARIMENTO | |
conversation.append({"role": "user", "content": f"Domanda: {message} . Comando: {INSTRUCTION_1}" }) | |
gr.Info("richiesta di chiarimento") | |
print("******** CONV1 ", conversation) | |
else: | |
documents = load_documents(matched_path) | |
DB = prepare_documents(documents) | |
context, sources = get_context_sources(message, DB) | |
gr.Info("contesto già indicizzato") | |
conversation.append({"role": "user", "content": f"{INSTRUCTION_2}"}) | |
conversation.append({"role": "assistant", "content": "Ok."}) | |
conversation.append({"role": "user", "content": f'Contesto: {context}\n\n Domanda: {message}. Rispondi in italiano'}) | |
print("******** CONV2 ", conversation) | |
# Iterate model output | |
input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt") | |
if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: | |
input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] | |
gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") | |
input_ids = input_ids.to(model.device) | |
streamer = TextIteratorStreamer(tokenizer, timeout=None, skip_prompt=True, skip_special_tokens=True) | |
generate_kwargs = dict( | |
{"input_ids": input_ids}, | |
streamer=streamer, | |
max_new_tokens=max_new_tokens, | |
do_sample=True, | |
top_p=top_p, | |
top_k=top_k, | |
temperature=temperature, | |
num_beams=1, | |
repetition_penalty=repetition_penalty, | |
) | |
t = Thread(target=model.generate, kwargs=generate_kwargs) | |
t.start() | |
outputs = [] | |
for text in streamer: | |
outputs.append(text) | |
yield "".join(outputs) | |
if session_state["documents_loaded"]: | |
outputs.append(f"Fonti utilizzate: {sources}") | |
yield "".join(outputs) | |
#sources = [] | |
print("debug - CHATHISTORY", chat_history) | |
chat_interface = gr.ChatInterface( | |
fn=generate, | |
additional_inputs=[ | |
gr.Slider( | |
label="Max new tokens", | |
minimum=1, | |
maximum=MAX_MAX_NEW_TOKENS, | |
step=1, | |
value=DEFAULT_MAX_NEW_TOKENS, | |
), | |
gr.Slider( | |
label="Temperature", | |
minimum=0.1, | |
maximum=4.0, | |
step=0.1, | |
value=0.6, | |
), | |
gr.Slider( | |
label="Top-p (nucleus sampling)", | |
minimum=0.05, | |
maximum=1.0, | |
step=0.05, | |
value=0.9, | |
), | |
gr.Slider( | |
label="Top-k", | |
minimum=1, | |
maximum=1000, | |
step=1, | |
value=50, | |
), | |
gr.Slider( | |
label="Repetition penalty", | |
minimum=1.0, | |
maximum=2.0, | |
step=0.05, | |
value=1.2, | |
), | |
], | |
stop_btn=None, | |
examples=[ | |
["Ciao, in cosa puoi aiutarmi?"], | |
["Ciao, in cosa consiste un piatto di spaghetti?"], | |
["Ciao, quali sono le aziende che hanno iniziato ad integrare le stablecoins? Fammi un breve sommario."], | |
["Spiegami la differenza tra mondi virtuali pubblici o privati"], | |
["Trovami un esempio di progetto B2B"], | |
["Quali sono le regole europee sui bonifici istantanei?"], | |
], | |
cache_examples=False, | |
) | |
with gr.Blocks(css=".gradio-container {background-color: #B9D9EB}", fill_height=True) as demo: | |
gr.Markdown(DESCRIPTION, elem_classes="centered") | |
chat_interface.render() | |
if __name__ == "__main__": | |
#demo.queue(max_size=20).launch() | |
demo.launch() |