chatbot-llamaindex / appcompleta.py
gufett0's picture
added class session
ceac55e
raw
history blame
9.24 kB
import os
import spaces
from threading import Thread
from typing import Iterator
from backend2 import load_documents, prepare_documents, get_context_sources
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, GemmaTokenizerFast, TextIteratorStreamer
from huggingface_hub import login
import threading
huggingface_token = os.getenv("HUGGINGFACE_TOKEN")
login(huggingface_token)
DESCRIPTION = """\
# La Chatbot degli Osservatori
"""
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
os.environ["MAX_INPUT_TOKEN_LENGTH"] = "4096" #"8192"
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH"))
# Force usage of CPU
#device = torch.device("cpu")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model_id = "google/gemma-2-2b-it"
model = AutoModelForCausalLM.from_pretrained(
model_id,
device_map="auto",
torch_dtype= torch.float16 if torch.cuda.is_available() else torch.float32,
)
tokenizer = GemmaTokenizerFast.from_pretrained(model_id)
#tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.use_default_system_prompt = False
model.config.sliding_window = 4096
#model = model.to(device)
model.eval()
###------####
# rag
documents_paths = {
'blockchain': 'documents/blockchain',
'metaverse': 'documents/metaverso',
'payment': 'documents/payment'
}
"""session_state = {"documents_loaded": False,
"document_db": None,
"original_message": None,
"clarification": False}"""
INSTRUCTION_1 = 'In italiano, chiedi molto brevemente se la domanda si riferisce agli "Osservatori Blockchain", "Osservatori Payment" oppure "Osservatori Metaverse".'
INSTRUCTION_2 = 'Sei un assistente che risponde in italiano alle domande basandosi solo sulle informazioni fornite nel contesto. Se non trovi informazioni, rispondi "Puoi chiedere maggiori informazioni all\'ufficio di riferimento.". Se invece la domanda è completamente fuori contesto, non rispondere e rammenta il topic del contesto'
default_error_response = (
'Non sono sicuro che tu voglia indirizzare la tua ricerca su una di queste opzioni: '
'"Blockchain", "Metaverse", "Payment". '
'Per favore utilizza il nome corretto.'
)
thread_local = threading.local()
def get_session_state():
if not hasattr(thread_local, "session_state"):
thread_local.session_state = {
"documents_loaded": False,
"document_db": None,
"original_message": None,
"clarification": False
}
return thread_local.session_state
@spaces.GPU(duration=30)
def generate(
message: str,
chat_history: list[tuple[str, str]],
max_new_tokens: int = 1024,
temperature: float = 0.6,
top_p: float = 0.9,
top_k: int = 50,
repetition_penalty: float = 1.2,
) -> Iterator[str]:
session_state = get_session_state()
global context, sources, conversation
if (not (session_state["documents_loaded"]) and not (session_state["clarification"])):
conversation = []
for user, assistant in chat_history:
conversation.extend(
[
{"role": "user", "content": user},
{"role": "assistant", "content": assistant},
]
)
conversation.append({"role": "user", "content": f"Domanda: {message} . Comando: {INSTRUCTION_1}" })
conversation.append({"role": "assistant", "content": "Ok."})
print("debug - CONV1", conversation)
session_state["original_message"] = message
session_state["clarification"] = True
elif session_state["clarification"]:
message = message.lower()
matched_path = None
for key, path in documents_paths.items():
if key in message:
matched_path = path
break
if matched_path:
yield "Fammi cercare tra i miei documenti..."
documents = load_documents(matched_path)
session_state["document_db"] = prepare_documents(documents)
session_state["documents_loaded"] = True
yield f"Ecco, ho raccolto informazioni dagli Osservatori {key.capitalize()}. Ora sto elaborando una risposta per te..."
sources = []
context, sources = get_context_sources(session_state["original_message"], session_state["document_db"])
print("sources ", sources)
print("context ", context)
conversation = []
conversation.append({"role": "user", "content": f"{INSTRUCTION_2}"})
conversation.append({"role": "assistant", "content": "Ok."})
for user, assistant in chat_history:
conversation.extend(
[
{"role": "user", "content": user },
{"role": "assistant", "content": assistant},
]
)
conversation.append({"role": "user", "content": f'Contesto: {context}\n\n Domanda iniziale: {session_state["original_message"]} . Rispondi solo in italiano.'})
session_state["clarification"] = False
print("debug - CONV2", conversation)
else:
print(default_error_response)
gr.Info("NO MATCH")
else:
conversation = []
conversation.append({"role": "user", "content": f"Comandi: {INSTRUCTION_2}"})
conversation.append({"role": "assistant", "content": "Va bene."})
for user, assistant in chat_history:
conversation.extend(
[
{"role": "user", "content": user},
{"role": "assistant", "content": assistant},
]
)
conversation.append({"role": "user", "content": f"Contesto: {context}\n\n Nuova domanda: {message} . Rispondi in italiano e seguendo i comandi che ti ho dato prima"})
print("debug - CONV3", conversation)
""" retriever = db.as_retriever()
qa = RetrievalQA.from_chain_type(llm=model, chain_type="refine", retriever=retriever, return_source_documents=False)
question = "Cosa sono i RWA?"
result = qa.run({"query": question})
print(result["result"]) """
# Iterate model output
input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt")
if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
input_ids = input_ids.to(model.device)
streamer = TextIteratorStreamer(tokenizer, timeout=None, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = dict(
{"input_ids": input_ids},
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
top_p=top_p,
top_k=top_k,
temperature=temperature,
num_beams=1,
repetition_penalty=repetition_penalty,
)
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start()
outputs = []
for text in streamer:
outputs.append(text)
yield "".join(outputs)
if session_state["documents_loaded"]:
outputs.append(f"Fonti utilizzate: {sources}")
yield "".join(outputs)
#sources = []
print("debug - CHATHISTORY", chat_history)
chat_interface = gr.ChatInterface(
fn=generate,
additional_inputs=[
gr.Slider(
label="Max new tokens",
minimum=1,
maximum=MAX_MAX_NEW_TOKENS,
step=1,
value=DEFAULT_MAX_NEW_TOKENS,
),
gr.Slider(
label="Temperature",
minimum=0.1,
maximum=4.0,
step=0.1,
value=0.6,
),
gr.Slider(
label="Top-p (nucleus sampling)",
minimum=0.05,
maximum=1.0,
step=0.05,
value=0.9,
),
gr.Slider(
label="Top-k",
minimum=1,
maximum=1000,
step=1,
value=50,
),
gr.Slider(
label="Repetition penalty",
minimum=1.0,
maximum=2.0,
step=0.05,
value=1.2,
),
],
stop_btn=None,
examples=[
["Ciao, in cosa puoi aiutarmi?"],
["Ciao, in cosa consiste un piatto di spaghetti?"],
["Ciao, quali sono le aziende che hanno iniziato ad integrare le stablecoins? Fammi un breve sommario."],
["Spiegami la differenza tra mondi virtuali pubblici o privati"],
["Trovami un esempio di progetto B2B"],
["Quali sono le regole europee sui bonifici istantanei?"],
],
cache_examples=False,
)
with gr.Blocks(css=".gradio-container {background-color: #B9D9EB}", fill_height=True) as demo:
gr.Markdown(DESCRIPTION, elem_classes="centered")
chat_interface.render()
if __name__ == "__main__":
#demo.queue(max_size=20).launch()
demo.launch()