import os import spaces from threading import Thread from typing import Iterator from backend2 import load_documents, prepare_documents, get_context_sources import gradio as gr import torch from transformers import AutoModelForCausalLM, GemmaTokenizerFast, TextIteratorStreamer from huggingface_hub import login from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader, ChatPromptTemplate, PromptTemplate, load_index_from_storage, StorageContext from llama_index.core.node_parser import SentenceSplitter from llama_index.embeddings.instructor import InstructorEmbedding huggingface_token = os.getenv("HUGGINGFACE_TOKEN") login(huggingface_token) DESCRIPTION = """\ # La Chatbot degli Osservatori """ MAX_MAX_NEW_TOKENS = 2048 DEFAULT_MAX_NEW_TOKENS = 1024 os.environ["MAX_INPUT_TOKEN_LENGTH"] = "4096" #"8192" MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH")) # Force usage of CPU #device = torch.device("cpu") device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") model_id = "google/gemma-2-2b-it" model = AutoModelForCausalLM.from_pretrained( model_id, device_map="auto", torch_dtype= torch.bfloat16 #torch.float16 if torch.cuda.is_available() else torch.float32, ) tokenizer = GemmaTokenizerFast.from_pretrained(model_id) #tokenizer = AutoTokenizer.from_pretrained(model_id) tokenizer.use_default_system_prompt = False model.config.sliding_window = 4096 #model = model.to(device) model.eval() ###------#### # rag documents_paths = { 'blockchain': 'documents/blockchain', 'metaverse': 'documents/metaverso', 'payment': 'documents/payment' } global session_state session_state = {"index": False, "documents_loaded": False} INSTRUCTION_1 = 'In italiano, chiedi sempre se la domanda si riferisce agli "Osservatori Blockchain", "Osservatori Payment" oppure "Osservatori Metaverse".' INSTRUCTION_2 = 'Sei un assistente che risponde sempre in italiano alle domande basandosi solo sulle informazioni fornite nel contesto che ti darò. Se non trovi informazioni, rispondi "Puoi chiedere maggiori informazioni all\'ufficio di riferimento.". Se invece la domanda è completamente fuori contesto, non rispondere e rammenta il topic del contesto' """# Reading documents from disk docs = SimpleDirectoryReader(input_files=["data/blockchainprova.txt"]).load_data() # Splitting the document into chunks with # predefined size and overlap parser = SentenceSplitter.from_defaults( chunk_size=256, chunk_overlap=64, paragraph_separator="\n\n" ) nodes = parser.get_nodes_from_documents(docs)""" @spaces.GPU() def generate( message: str, chat_history: list[tuple[str, str]], max_new_tokens: int = 1024, temperature: float = 0.6, top_p: float = 0.9, top_k: int = 50, repetition_penalty: float = 1.2, ) -> Iterator[str]: global matched_path conversation = [] for user, assistant in chat_history: conversation.extend( [ {"role": "user", "content": user}, {"role": "assistant", "content": assistant}, ] ) if not session_state["index"]: matched_path = None words = message.lower() for key, path in documents_paths.items(): if key in words: matched_path = path break if matched_path: documents = load_documents(matched_path) DB = prepare_documents(documents) context, sources = get_context_sources(message, DB) print("*** sources ***", sources) gr.Info("doc preparati con ", sources) conversation.append({"role": "user", "content": f'Contesto: {context}\n\n Domanda: {message}. Rispondi in italiano'}) ###### """index = VectorStoreIndex(nodes) # get retriver retriever = index.as_retriever(similarity_top_k=3) relevant_chunks = retriever.retrieve(message) print(f"Found: {len(relevant_chunks)} relevant chunks") for idx, chunk in enumerate(relevant_chunks): info_message += f"{idx + 1}) {chunk.text[:64]}...\n" print(info_message) gr.Info(info_message)""" session_state["documents_loaded"] = True session_state["index"] = True else: ## CHIEDI CHIARIMENTO conversation.append({"role": "user", "content": f"Domanda: {message} . Comando: {INSTRUCTION_1}" }) gr.Info("richiesta di chiarimento") print("******** CONV1 ", conversation) else: documents = load_documents(matched_path) DB = prepare_documents(documents) context, sources = get_context_sources(message, DB) gr.Info("contesto già indicizzato") conversation.append({"role": "user", "content": f"{INSTRUCTION_2}"}) conversation.append({"role": "assistant", "content": "Ok."}) conversation.append({"role": "user", "content": f'Contesto: {context}\n\n Domanda: {message}. Rispondi in italiano'}) print("******** CONV2 ", conversation) # Iterate model output input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt") if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") input_ids = input_ids.to(model.device) streamer = TextIteratorStreamer(tokenizer, timeout=None, skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( {"input_ids": input_ids}, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True, top_p=top_p, top_k=top_k, temperature=temperature, num_beams=1, repetition_penalty=repetition_penalty, ) t = Thread(target=model.generate, kwargs=generate_kwargs) t.start() outputs = [] for text in streamer: outputs.append(text) yield "".join(outputs) if session_state["documents_loaded"]: outputs.append(f"Fonti utilizzate: {sources}") yield "".join(outputs) #sources = [] print("debug - CHATHISTORY", chat_history) chat_interface = gr.ChatInterface( fn=generate, additional_inputs=[ gr.Slider( label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS, ), gr.Slider( label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6, ), gr.Slider( label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9, ), gr.Slider( label="Top-k", minimum=1, maximum=1000, step=1, value=50, ), gr.Slider( label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2, ), ], stop_btn=None, examples=[ ["Ciao, in cosa puoi aiutarmi?"], ["Ciao, in cosa consiste un piatto di spaghetti?"], ["Ciao, quali sono le aziende che hanno iniziato ad integrare le stablecoins? Fammi un breve sommario."], ["Spiegami la differenza tra mondi virtuali pubblici o privati"], ["Trovami un esempio di progetto B2B"], ["Quali sono le regole europee sui bonifici istantanei?"], ], cache_examples=False, ) with gr.Blocks(css=".gradio-container {background-color: #B9D9EB}", fill_height=True) as demo: gr.Markdown(DESCRIPTION, elem_classes="centered") chat_interface.render() if __name__ == "__main__": #demo.queue(max_size=20).launch() demo.launch()