# File-viewer page header captured with the source (not part of the program):
#   author: paascorb | commit: "Update app.py" (da3ee69) | size: 18.2 kB
import gradio as gr
from pathlib import Path
import os
from transformers import AutoTokenizer, AutoModel, AutoModelForQuestionAnswering, pipeline
from transformers import MarianMTModel, MarianTokenizer
from nltk.tokenize import sent_tokenize
from nltk.tokenize import LineTokenizer
import math
import torch
import nltk
import numpy as np
import time
import hashlib
from tqdm import tqdm
device = "cuda:0" if torch.cuda.is_available() else "cpu"
import textract
from scipy.special import softmax
import pandas as pd
from datetime import datetime
nltk.download('punkt')
# Module-level slot that validate_dataset() resets to None on every call.
docs = None

# --- Model definitions -------------------------------------------------------
# Translation: Spanish -> English
mname = "Helsinki-NLP/opus-mt-es-en"
tokenizer_es_en = MarianTokenizer.from_pretrained(mname)
model_es_en = MarianMTModel.from_pretrained(mname)
model_es_en.to(device)

# Translation: English -> Spanish
mname = "Helsinki-NLP/opus-mt-en-es"
tokenizer_en_es = MarianTokenizer.from_pretrained(mname)
model_en_es = MarianMTModel.from_pretrained(mname)
model_en_es.to(device)

# Splits raw text into lines before translation.
lt = LineTokenizer()

# Question answering: passage encoder used for retrieval
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/multi-qa-mpnet-base-dot-v1")
model = AutoModel.from_pretrained("sentence-transformers/multi-qa-mpnet-base-dot-v1")
model.to(device)
model.eval()

# Question answering: extractive reader
tokenizer_ans = AutoTokenizer.from_pretrained("deepset/roberta-large-squad2")
model_ans = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-large-squad2")
model_ans.to(device)
model_ans.eval()

# Only pass an explicit device index to the pipeline when running on the GPU.
pipe = (
    pipeline("question-answering",model_ans,tokenizer =tokenizer_ans,device = 0)
    if device == 'cuda:0'
    else pipeline("question-answering",model_ans,tokenizer =tokenizer_ans)
)
def validate_dataset(dataset):
    """Return a status label telling whether the documents table is filled in.

    The module-level ``docs`` is reset to ``None`` on every call. The table is
    considered ready when the first column of its last row is not the empty
    string.

    Args:
        dataset: Object exposing pandas-style ``.iloc[row, col]`` indexing.

    Returns:
        ``"✨Listo✨"`` when ready, ``"⚠️Esperando documentos..."`` otherwise.
    """
    global docs
    docs = None  # drop any previously built state whenever the table changes
    last_row_first_cell = dataset.iloc[-1, 0]
    if last_row_first_cell != "":
        return "✨Listo✨"
    return "⚠️Esperando documentos..."
def traducir_parrafos(parrafos, tokenizer, model, tam_bloque=8):
    """Run every paragraph through ``model`` in sentence batches.

    Each paragraph is split into sentences with ``sent_tokenize``; the
    sentences are fed to ``model.generate`` in groups of ``tam_bloque`` and the
    decoded outputs are joined with single spaces.

    Args:
        parrafos: Iterable of paragraph strings.
        tokenizer: Tokenizer used both to encode the input and decode the output.
        model: Model exposing ``generate``.
        tam_bloque: Number of sentences per batch.

    Returns:
        A list with one output string per input paragraph, in the same order.
    """
    resultado = []
    for parrafo in parrafos:
        oraciones = sent_tokenize(parrafo)
        n_bloques = math.ceil(len(oraciones) / tam_bloque)
        secuencias = []
        for indice in range(n_bloques):
            inicio = indice * tam_bloque
            lote = oraciones[inicio:inicio + tam_bloque]
            entradas = tokenizer(
                lote,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=500,
            ).to(device)
            with torch.no_grad():
                generado = model.generate(**entradas)
            # Extending with the batch output adds one generated sequence per sentence.
            secuencias.extend(generado)
        textos = [tokenizer.decode(ids, skip_special_tokens=True) for ids in secuencias]
        resultado.append(" ".join(textos))
    return resultado
def traducir_es_en(texto):
    """Translate ``texto`` line by line with the es->en tokenizer/model pair.

    The text is split into lines with the module-level ``LineTokenizer`` and
    the translated lines are joined back with newline characters.
    """
    lineas_traducidas = traducir_parrafos(
        lt.tokenize(texto), tokenizer_es_en, model_es_en
    )
    return "\n".join(lineas_traducidas)
def traducir_en_es(texto):
    """Translate ``texto`` line by line with the en->es tokenizer/model pair.

    The text is split into lines with the module-level ``LineTokenizer`` and
    the translated lines are joined back with newline characters.
    """
    lineas_traducidas = traducir_parrafos(
        lt.tokenize(texto), tokenizer_en_es, model_en_es
    )
    return "\n".join(lineas_traducidas)
def request_pathname(files):
    """Build ``[full_path, base_name]`` rows for a list of uploaded files.

    Args:
        files: ``None`` or an iterable of objects with a ``name`` attribute
            holding a ``/``-separated path.

    Returns:
        ``[[]]`` when ``files`` is ``None``; otherwise one two-item list per
        file: the full ``name`` and the part after its last ``/``.
    """
    if files is None:
        return [[]]
    rows = []
    for uploaded in files:
        full_path = uploaded.name
        rows.append([full_path, full_path.rpartition('/')[2]])
    return rows
def cls_pooling(model_output):
    """Return index 0 along the second axis of ``model_output.last_hidden_state``.

    NOTE(review): presumably this is the first-token (CLS) vector of each
    sequence, assuming a (batch, tokens, hidden) layout - confirm.
    """
    hidden_states = model_output.last_hidden_state
    return hidden_states[:, 0]
def encode_query(query):
    """Embed ``query`` with the module-level retrieval tokenizer and model.

    The query is tokenized with truncation, run through ``model`` without
    gradient tracking, pooled with ``cls_pooling`` and moved to the CPU.

    Returns:
        The pooled embedding as a CPU tensor.
    """
    tokens = tokenizer(query, truncation=True, return_tensors='pt').to(device)
    with torch.no_grad():
        output = model(**tokens, return_dict=True)
    return cls_pooling(output).cpu()
def _window_words(words, maxlen, stride):
    """Split ``words`` into overlapping text spans of roughly ``maxlen`` words.

    A text shorter than ``maxlen`` words becomes a single span. Otherwise the
    first span takes ``maxlen + stride`` words, and each later span takes the
    last ``stride`` words of the previous chunk followed by the next
    ``maxlen`` words.
    """
    if len(words) < maxlen:
        return [" ".join(words)]
    spans = []
    for i in range(len(words) // maxlen + 1):
        if i == 0:
            chunk = words[:maxlen + stride]
        else:
            overlap = words[(i - 1) * maxlen:i * maxlen][-stride:]
            chunk = overlap + words[i * maxlen:(i + 1) * maxlen]
        spans.append(" ".join(chunk))
    return spans


def encode_docs(docs, maxlen = 64, stride = 32):
    """Embed one document span by span and persist the result to disk.

    The text is split on single spaces into overlapping spans, and each span is
    encoded with the module-level retrieval tokenizer/model and pooled with
    ``cls_pooling``. Three ``.npy`` files are written to the working directory
    (``emb_<name>.npy``, ``spans_<name>.npy``, ``file_<name>.npy``), each
    holding a dict keyed by position.

    Args:
        docs: ``(name, text)`` pair.
        maxlen: Number of new words per span.
        stride: Number of overlapping words shared with the neighbouring span.

    Returns:
        ``(embeddings, spans, file_names)``. ``embeddings`` is a float32 NumPy
        array built by stacking the per-span vectors and swapping axes 0 and 1
        (presumably shape ``(1, n_spans, hidden)`` - confirm). ``spans`` is the
        list of span strings and ``file_names`` repeats ``name`` once per span.
    """
    name, text = docs
    # Fix: the short-document branch used to reference ``temp_text`` before it
    # was ever assigned, raising NameError for texts under ``maxlen`` words.
    spans = _window_words(text.split(" "), maxlen, stride)
    file_names = [name] * len(spans)

    embeddings = []
    with torch.no_grad():
        for span in tqdm(spans):
            encoded = tokenizer(span, return_tensors='pt', truncation = True).to(device)
            model_output = model(**encoded, return_dict=True)
            embeddings.append(cls_pooling(model_output))
    embeddings = np.float32(torch.stack(embeddings).transpose(0, 1).cpu())

    # The dicts are stored as pickled objects; predict() reloads them with
    # allow_pickle and .item(), so the on-disk layout must stay as it is.
    np.save("emb_{}.npy".format(name), dict(enumerate(embeddings)))
    np.save("spans_{}.npy".format(name), dict(enumerate(spans)))
    np.save("file_{}.npy".format(name), dict(enumerate(file_names)))
    return embeddings, spans, file_names
def _translated_top_answer(df):
    """Translate the first row of an answers table to Spanish.

    Returns a ``(Respuesta, Contexto, Probabilidades)`` tuple of strings.
    """
    best = df.iloc[0]
    return (traducir_en_es(best.Respuesta),
            traducir_en_es(best.Contexto),
            traducir_en_es(best.Probabilidades))


def _load_or_build_index(data, name_to_save):
    """Return ``(doc_emb, doc_text, file_names)`` for the uploaded document.

    Reuses the arrays saved by ``encode_docs`` when ``<name_to_save>.txt``
    already exists in the working directory. Otherwise it extracts the text,
    translates it to English, encodes it and writes the ``.txt`` marker.
    """
    if os.path.isfile("{}.txt".format(name_to_save)):
        # NOTE(security): allow_pickle unpickles arbitrary objects. These files
        # are written by encode_docs(); never point this at untrusted files.
        doc_emb = np.load('emb_{}.npy'.format(name_to_save), allow_pickle=True).item()
        doc_text = np.load('spans_{}.npy'.format(name_to_save), allow_pickle=True).item()
        file_names_dicto = np.load('file_{}.npy'.format(name_to_save), allow_pickle=True).item()
        doc_emb = np.array(list(doc_emb.values())).reshape(-1,768)
        return doc_emb, list(doc_text.values()), list(file_names_dicto.values())

    text = textract.process("{}".format(data.name)).decode('utf8')
    text = text.replace("\r", " ")
    text = text.replace("\n", " ")
    text = text.replace(" . "," ")
    text = traducir_es_en(text)
    doc_emb, doc_text, file_names = encode_docs((name_to_save,text),maxlen = 64, stride = 32)
    doc_emb = doc_emb.reshape(-1, 768)
    with open("{}.txt".format(name_to_save),"w",encoding="utf-8") as f:
        f.write(text)
    return doc_emb, doc_text, file_names


def predict(query,data):
    """Answer a Spanish question about an uploaded document.

    The question and document are translated to English. The document spans
    are ranked by dot product against the question embedding, the reader
    pipeline is run on the top spans, and the best result is translated back
    to Spanish. Results are cached on disk per (question, document) pair and
    every computed query is appended to ``HISTORY.txt``.

    Args:
        query: Question text in Spanish.
        data: Uploaded file object exposing a ``name`` path attribute.

    Returns:
        ``(answer, context, probabilities)`` strings for the top-ranked span.

    Raises:
        ValueError: If no document was uploaded (``data`` is ``None``).
    """
    if data is None:
        # Fail with a clear message instead of AttributeError on ``data.name``.
        raise ValueError("No se ha subido ningún documento.")

    query = traducir_es_en(query)
    # NOTE(review): the [:-8] presumably strips a suffix the upload widget
    # appends to temporary file names - confirm against the Gradio version.
    name_to_save = data.name.split("/")[-1].split(".")[0][:-8]
    k=2
    st = str([query,name_to_save])
    st_hashed = hashlib.sha256(st.encode()).hexdigest()
    hist = st + " " + st_hashed
    current_time = datetime.now().strftime("%H:%M:%S")

    # Fix: the cache used to be keyed on built-in hash(), which is salted per
    # process for strings, so cached answers were never found after a restart.
    # The SHA-256 digest is stable across runs.
    cache_csv = "{}.csv".format(st_hashed)
    if os.path.isfile(cache_csv):
        try:
            # Read every column as text and keep empty strings, so answers such
            # as "1990" or "" reach the translator as strings, not int/NaN.
            cached = pd.read_csv(cache_csv, dtype=str, keep_default_na=False)
            return _translated_top_answer(cached)
        except Exception as e:
            # Best effort: an unreadable cache entry falls through to recompute.
            print(e)
            print(st)

    doc_emb, doc_text, file_names = _load_or_build_index(data, name_to_save)

    # Once embeddings are available, run maximum inner product search.
    start = time.time()
    query_emb = encode_query(query)
    scores = np.matmul(query_emb, doc_emb.transpose(1,0))[0].tolist()
    doc_score_pairs = sorted(zip(doc_text, scores, file_names),
                             key=lambda x: x[1], reverse=True)
    probs = softmax(sorted(scores,reverse = True)[:k])

    table = {"Contexto":[],"Respuesta":[],"Probabilidades":[]}
    # Get an answer for each of the top passages.
    for i, (passage, _, _) in enumerate(doc_score_pairs[:k]):
        passage = passage.replace("\n","")
        # Run the reader only on sufficiently likely passages.
        if probs[i] > 0.1 or (i < 3 and probs[i] > 0.05):
            QA = {'question':query,'context':passage}
            ans = pipe(QA)
            probabilities = "P(a|p): {}, P(a|p,q): {}, P(p|q): {}".format(round(ans["score"],5),
                                                                         round(ans["score"]*probs[i],5),
                                                                         round(probs[i],5))
            table["Contexto"].append(passage)
            table["Respuesta"].append(str(ans["answer"]).upper())
            table["Probabilidades"].append(probabilities)
        else:
            table["Contexto"].append(passage)
            table["Respuesta"].append("no_answer_calculated")
            table["Probabilidades"].append("P(p|q): {}".format(round(probs[i],5)))

    # Save the answers so the same question on the same document is served
    # from the cache next time.
    df = pd.DataFrame(table)
    print(df)
    print("time: "+ str(time.time()-start))
    with open("HISTORY.txt","a", encoding = "utf-8") as f:
        f.write(hist)
        f.write(" " + str(current_time))
        f.write("\n")
    df.to_csv(cache_csv, index=False)

    # Fix: only the top row is returned, so only that row is translated. The
    # old loop indexed k rows and raised IndexError when the document produced
    # fewer than k spans.
    return _translated_top_answer(df)
# Gradio UI: one uploaded document, one question box, and three Markdown
# outputs (answer, probabilities, supporting context).
with gr.Blocks() as demo:
    gr.Markdown("""
# Document Question and Answer adaptado al castellano por Pablo Ascorbe.
Este espacio ha sido clonado y adaptado de: https://huggingface.co/spaces/whitead/paper-qa
La idea es utilizar un modelo preentrenado de HuggingFace como "distilbert-base-cased-distilled-squad"
y responder las preguntas en inglés, para ello, será necesario hacer primero una traducción de los textos en castellano
a inglés y luego volver a traducir en sentido contrario.
## Instrucciones:
Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee.
""")
    file = gr.File(
        label="Sus documentos subidos (PDF o txt)")
    # Legacy multi-document widgets, kept disabled:
    # dataset = gr.Dataframe(
    #     headers=["filepath", "citation string"],
    #     datatype=["str", "str"],
    #     col_count=(2, "fixed"),
    #     interactive=True,
    #     label="Documentos y citas"
    # )
    # buildb = gr.Textbox("⚠️Esperando documentos...",
    #                     label="Estado", interactive=False, show_label=True)
    # dataset.change(validate_dataset, inputs=[
    #                dataset], outputs=[buildb])
    # uploaded_files.change(request_pathname, inputs=[
    #                       uploaded_files], outputs=[dataset])
    query = gr.Textbox(
        placeholder="Introduzca su pregunta aquí...", label="Pregunta")
    ask = gr.Button("Preguntar")
    gr.Markdown("## Respuesta")
    answer = gr.Markdown(label="Respuesta")
    prob = gr.Markdown(label="Probabilidades")
    with gr.Accordion("Contexto", open=False):
        gr.Markdown(
            "### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:")
        context = gr.Markdown(label="Contexto")
    # ask.click(fn=do_ask, inputs=[query, buildb,
    #           dataset], outputs=[answer, context])
    # predict() returns (answer, context, probabilities), in that order.
    ask.click(fn=predict, inputs=[query, file],
              outputs=[answer, context, prob])
    # Fix: with more than one input component, Examples expects a nested list
    # with one inner list per example. The flat list was not a valid
    # two-input example set.
    examples = gr.Examples(examples=[["¿Cuándo suelen comenzar las adicciones?","Entrevista Miguel Ruiz.txt"]],
                           inputs=[query, file])

demo.queue(concurrency_count=20)
demo.launch(show_error=True)
# iface = gr.Interface(fn =predict,
# inputs = [gr.inputs.Textbox(default="What is Open-domain question answering?"),
# gr.inputs.File(),
# ],
# outputs = [
# gr.outputs.Carousel(['text']),
# ],
# description=description,
# title = title,
# allow_flagging ="manual",flagging_options = ["correct","wrong"],
# allow_screenshot=False)
# iface.launch(enable_queue=True, show_error =True)
# Definimos los modelos:
# Traducción
# mname = "Helsinki-NLP/opus-mt-es-en"
# tokenizer_es_en = MarianTokenizer.from_pretrained(mname)
# model_es_en = MarianMTModel.from_pretrained(mname)
# model_es_en.to(device)
# mname = "Helsinki-NLP/opus-mt-en-es"
# tokenizer_en_es = MarianTokenizer.from_pretrained(mname)
# model_en_es = MarianMTModel.from_pretrained(mname)
# model_en_es.to(device)
# lt = LineTokenizer()
# Responder preguntas
# question_answerer = pipeline("question-answering", model='distilbert-base-cased-distilled-squad')
# def request_pathname(files):
# if files is None:
# return [[]]
# return [[file.name, file.name.split('/')[-1]] for file in files]
# def traducir_parrafos(parrafos, tokenizer, model, tam_bloque=8, ):
# parrafos_traducidos = []
# for parrafo in parrafos:
# frases = sent_tokenize(parrafo)
# batches = math.ceil(len(frases) / tam_bloque)
# traducido = []
# for i in range(batches):
# bloque_enviado = frases[i*tam_bloque:(i+1)*tam_bloque]
# model_inputs = tokenizer(bloque_enviado, return_tensors="pt",
# padding=True, truncation=True,
# max_length=500).to(device)
# with torch.no_grad():
# bloque_traducido = model.generate(**model_inputs)
# traducido += bloque_traducido
# traducido = [tokenizer.decode(t, skip_special_tokens=True) for t in traducido]
# parrafos_traducidos += [" ".join(traducido)]
# return parrafos_traducidos
# def traducir_es_en(texto):
# parrafos = lt.tokenize(texto)
# par_tra = traducir_parrafos(parrafos, tokenizer_es_en, model_es_en)
# return "\n".join(par_tra)
# def traducir_en_es(texto):
# parrafos = lt.tokenize(texto)
# par_tra = traducir_parrafos(parrafos, tokenizer_en_es, model_en_es)
# return "\n".join(par_tra)
# def validate_dataset(dataset):
# global docs
# docs = None # clear it out if dataset is modified
# docs_ready = dataset.iloc[-1, 0] != ""
# if docs_ready:
# return "✨Listo✨"
# else:
# return "⚠️Esperando documentos..."
# def do_ask(question, button, dataset):
# global docs
# docs_ready = dataset.iloc[-1, 0] != ""
# if button == "✨Listo✨" and docs_ready:
# for _, row in dataset.iterrows():
# path = row['filepath']
# text = Path(f'{path}').read_text()
# text_en = traducir_es_en(text)
# QA_input = {
# 'question': traducir_es_en(question),
# 'context': text_en
# }
# return traducir_en_es(question_answerer(QA_input)['answer'])
# else:
# return ""
# # def do_ask(question, button, dataset, progress=gr.Progress()):
# # global docs
# # docs_ready = dataset.iloc[-1, 0] != ""
# # if button == "✨Listo✨" and docs_ready:
# # if docs is None: # don't want to rebuild index if it's already built
# # import paperqa
# # docs = paperqa.Docs()
# # # dataset is pandas dataframe
# # for _, row in dataset.iterrows():
# # key = None
# # if ',' not in row['citation string']:
# # key = row['citation string']
# # docs.add(row['filepath'], row['citation string'], key=key)
# # else:
# # return ""
# # progress(0, "Construyendo índices...")
# # docs._build_faiss_index()
# # progress(0.25, "Encolando...")
# # result = docs.query(question)
# # progress(1.0, "¡Hecho!")
# # return result.formatted_answer, result.context
# with gr.Blocks() as demo:
# gr.Markdown("""
# # Document Question and Answer adaptado al castellano por Pablo Ascorbe.
# Este espacio ha sido clonado y adaptado de: https://huggingface.co/spaces/whitead/paper-qa
# La idea es utilizar un modelo preentrenado de HuggingFace como "distilbert-base-cased-distilled-squad"
# y responder las preguntas en inglés, para ello, será necesario hacer primero una traducción de los textos en castellano
# a inglés y luego volver a traducir en sentido contrario.
# ## Instrucciones:
# Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee.
# """)
# uploaded_files = gr.File(
# label="Sus documentos subidos (PDF o txt)", file_count="multiple", )
# dataset = gr.Dataframe(
# headers=["filepath", "citation string"],
# datatype=["str", "str"],
# col_count=(2, "fixed"),
# interactive=True,
# label="Documentos y citas"
# )
# buildb = gr.Textbox("⚠️Esperando documentos...",
# label="Estado", interactive=False, show_label=True)
# dataset.change(validate_dataset, inputs=[
# dataset], outputs=[buildb])
# uploaded_files.change(request_pathname, inputs=[
# uploaded_files], outputs=[dataset])
# query = gr.Textbox(
# placeholder="Introduzca su pregunta aquí...", label="Pregunta")
# ask = gr.Button("Preguntar")
# gr.Markdown("## Respuesta")
# answer = gr.Markdown(label="Respuesta")
# with gr.Accordion("Contexto", open=False):
# gr.Markdown(
# "### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:")
# context = gr.Markdown(label="Contexto")
# # ask.click(fn=do_ask, inputs=[query, buildb,
# # dataset], outputs=[answer, context])
# ask.click(fn=do_ask, inputs=[query, buildb,
# dataset], outputs=[answer])
# demo.queue(concurrency_count=20)
# demo.launch(show_error=True)