qa-generator / app.py
philipp-zettl's picture
Update app.py
a40a80d verified
raw
history blame
13 kB
import gradio as gr
import torch
import itertools
import pandas as pd
import spaces
import random
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel
from sklearn.metrics import pairwise_distances
from collections import Counter
from itertools import chain
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import math
model_name = 'philipp-zettl/t5-small-long-qa'
qa_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model_name = 'philipp-zettl/t5-small-qg'
qg_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained('google/flan-t5-small')
embedding_model = AutoModel.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2')
embedding_tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2')
# Move only the student model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
qa_model = qa_model.to(device)
qg_model = qg_model.to(device)
embedding_model = embedding_model.to(device)
max_questions = 1
max_answers = 1
max_elem_value = 100
def ngrams(sequence, n):
return [tuple(sequence[i:i+n]) for i in range(len(sequence)-n+1)]
def count_ngrams(sequence, max_n):
counts = Counter()
for n in range(1, max_n + 1):
counts.update(ngrams(sequence, n))
return counts
def self_bleu(outputs):
smoothing_function = SmoothingFunction().method1
scores = []
for i in range(len(outputs)):
references = outputs[:i] + outputs[i+1:]
# Avoid calculating BLEU score for empty references
if references:
scores.append(sentence_bleu(references, outputs[i], smoothing_function=smoothing_function))
# If all references are empty, return a default value
if not scores:
return 0
return sum(scores) / len(scores)
def dist_n(outputs, n):
all_ngrams = list(chain(*[ngrams(output, n) for output in outputs]))
unique_ngrams = set(all_ngrams)
return len(unique_ngrams) / len(all_ngrams) if all_ngrams else 0
def perplexity(model, tokenizer, texts):
encodings = tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
max_length = model.config.n_positions
stride = 512
lls = []
for i in range(0, encodings.input_ids.size(1), stride):
begin_loc = max(i + stride - max_length, 0)
end_loc = i + stride
trg_len = end_loc - i
input_ids = encodings.input_ids[:, begin_loc:end_loc].to(model.device)
target_ids = input_ids.clone()
target_ids[:, :-trg_len] = -100
with torch.no_grad():
outputs = model(input_ids, labels=target_ids)
log_likelihood = outputs.loss * trg_len
lls.append(log_likelihood)
ppl = torch.exp(torch.stack(lls).sum() / end_loc)
return ppl.item()
def embedding_similarity(inputs, outputs):
global embedding_model, embedding_tokenizer, device
def embed(texts):
inputs = embedding_tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(device)
with torch.no_grad():
outputs = embedding_model(**inputs)
return outputs.last_hidden_state.mean(dim=1).cpu().numpy()
input_embeddings = embed(inputs)
output_embeddings = embed(outputs)
similarities = pairwise_distances(input_embeddings, output_embeddings, metric='cosine')
return sum(similarities) / len(similarities)
def js_divergence(p, q):
def kl_divergence(p, q):
return sum(p[i] * math.log(p[i] / q[i]) for i in range(len(p)) if p[i] != 0 and q[i] != 0)
p_norm = [float(i)/sum(p) for i in p]
q_norm = [float(i)/sum(q) for i in q]
m = [(p_norm[i] + q_norm[i]) / 2 for i in range(len(p_norm))]
return (kl_divergence(p_norm, m) + kl_divergence(q_norm, m)) / 2
def evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=85):
generated_outputs = []
for input_text in eval_data:
input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
outputs = model.generate(
input_ids,
num_beams=num_beams,
num_beam_groups=num_beam_groups,
diversity_penalty=1.0,
max_new_tokens=max_length,
)
decoded_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
generated_outputs.append(decoded_text.split())
# Self-BLEU for diversity
diversity_score = self_bleu(generated_outputs)
# Dist-1 and Dist-2 for diversity
dist1 = dist_n(generated_outputs, 1)
dist2 = dist_n(generated_outputs, 2)
# Perplexity for fluency and relevance
fluency_score = perplexity(model, tokenizer, [" ".join(output) for output in generated_outputs])
# Embedding similarity for contextual relevance
contextual_score = embedding_similarity(eval_data, [" ".join(output) for output in generated_outputs])
# Jensen-Shannon Divergence for distribution similarity
generated_ngrams = count_ngrams(list(chain(*generated_outputs)), 4)
reference_ngrams = count_ngrams(list(chain(*[tokenizer.tokenize(text) for text in eval_data])), 4)
all_ngrams = set(generated_ngrams.keys()).union(set(reference_ngrams.keys()))
p = [generated_ngrams[ngram] for ngram in all_ngrams]
q = [reference_ngrams[ngram] for ngram in all_ngrams]
jsd_score = js_divergence(p, q)
return {
"diversity_score": diversity_score,
"dist1": dist1,
"dist2": dist2,
"fluency_score": fluency_score,
"contextual_score": contextual_score,
"jsd_score": jsd_score
}
def find_best_parameters(eval_data, model, tokenizer, max_length=85):
# Parameter ranges
parameter_map = {
2: [2],
4: [2],
6: [2], # 6x3 == 4x2
8: [2], # 8x4 == 6x3 == 4x2
10: [2], # 10x5 == 8x4 == 6x3 == 4x2
}
# Find the best parameters
best_score = -float('inf')
best_params = None
for num_beams in parameter_map.keys():
for num_beam_groups in parameter_map[num_beams]:
if num_beam_groups > num_beams:
continue # num_beam_groups should not be greater than num_beams
scores = evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=max_length)
# Combine scores to determine the best parameters
combined_score = (scores['dist1'] + scores['dist2'] - scores['fluency_score'] + scores['contextual_score'] - scores['jsd_score']).mean()
print(f"num_beams={num_beams}, num_beam_groups={num_beam_groups}, avg combined score={combined_score}")
if combined_score > best_score:
best_score = combined_score
best_params = (num_beams, num_beam_groups)
print(f"Best parameters: num_beams={best_params[0]}, num_beam_groups={best_params[1]} with combined score={best_score}")
return best_params
def run_model(inputs, tokenizer, model, num_beams=2, num_beam_groups=2, temperature=0.5, num_return_sequences=1, max_length=85):
all_outputs = []
torch.manual_seed(42069)
for input_text in inputs:
model_inputs = tokenizer([input_text], max_length=512, padding=True, truncation=True)
input_ids = torch.tensor(model_inputs['input_ids']).to(device)
for sample in input_ids:
sample_outputs = []
with torch.no_grad():
sample_output = model.generate(
input_ids[:1],
max_length=max_length,
#temperature=temperature,
#do_sample=True,
num_return_sequences=num_return_sequences,
low_memory=True,
#top_p=temperature,
#num_beams=max(2, num_return_sequences),
use_cache=True,
# Contrastive search
#penalty_alpha=0.6,
#top_k=4,
# Multi-nomial sampling
#do_sample=True,
#num_beams=1,
# Beam search
#num_beams=5,
# Beam search multinomial sampling
#num_beams=5,
#do_sample=True,
# Diverse Beam search decoding
num_beams=max(2, num_return_sequences),
num_beam_groups=max(2, num_return_sequences),
diversity_penalty=temperature,
#do_sample=True,
)
for i, sample_output in enumerate(sample_output):
sample_output = sample_output.unsqueeze(0)
sample_output = tokenizer.decode(sample_output[0], skip_special_tokens=True)
sample_outputs.append(sample_output)
all_outputs.append(sample_outputs)
return all_outputs
@spaces.GPU
def gen(content, temperature_qg=0.5, temperature_qa=0.75, num_return_sequences_qg=1, num_return_sequences_qa=1, max_length=85):
inputs = [
f'context: {content}'
]
question = run_model(
inputs,
tokenizer,
qg_model,
num_beams=num_return_sequences_qg,
num_beam_groups=num_return_sequences_qg,
temperature=temperature_qg,
num_return_sequences=num_return_sequences_qg,
max_length=max_length
)
q_params = find_best_parameters(list(chain.from_iterable(question)), qg_model, tokenizer, max_length=max_length)
question = run_model(
inputs,
tokenizer,
qg_model,
num_beams=q_params[0],
num_beam_groups=q_params[1],
temperature=temperature_qg,
num_return_sequences=num_return_sequences_qg,
max_length=max_length
)
inputs = list(chain.from_iterable([
[f'question: {q} context: {content}' for q in q_set] for q_set in question
]))
answer = run_model(
inputs,
tokenizer,
qa_model,
num_beams=num_return_sequences_qa,
num_beam_groups=num_return_sequences_qa,
temperature=temperature_qa,
num_return_sequences=num_return_sequences_qa,
max_length=max_length
)
questions = list(chain.from_iterable(question))
answers = list(chain.from_iterable(answer))
results = []
for idx, ans in enumerate(answers):
results.append({'question': questions[idx % num_return_sequences_qg], 'answer': ans})
return results
def variable_outputs(k, max_elems=10):
global max_elem_value
k = int(k)
return [gr.Text(visible=True)] * k + [gr.Text(visible=False)] * (max(max_elems, max_elem_value)- k)
def set_outputs(content, max_elems=10):
c = eval(content)
print('received content: ', c)
return [gr.Text(value=t, visible=True) for t in c] + [gr.Text(visible=False)] * (max(max_elems, 10) - len(c))
def create_file_download(qnas):
with open('qnas.tsv', 'w') as f:
for idx, qna in qnas.iterrows():
f.write(qna['Question'] + '\t' + qna['Answer'])
if idx < len(qnas) - 1:
f.write('\n')
return 'qnas.tsv'
with gr.Blocks(css='.hidden_input {display: none;}') as demo:
with gr.Row(equal_height=True):
with gr.Group("Content"):
content = gr.Textbox(label='Content', lines=15, placeholder='Enter text here', max_lines=10_000)
with gr.Group("Settings"):
temperature_qg = gr.Slider(label='Temperature QG', value=0.2, minimum=0, maximum=1, step=0.01)
temperature_qa = gr.Slider(label='Temperature QA', value=0.5, minimum=0, maximum=1, step=0.01)
max_length = gr.Number(label='Max Length', value=85, minimum=1, step=1, maximum=512)
num_return_sequences_qg = gr.Number(label='Number Questions', value=max_questions, minimum=1, step=1, maximum=max(max_questions, max_elem_value))
num_return_sequences_qa = gr.Number(label="Number Answers", value=max_answers, minimum=1, step=1, maximum=max(max_questions, max_elem_value))
with gr.Row():
gen_btn = gr.Button("Generate")
@gr.render(
inputs=[
content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa,
max_length
],
triggers=[gen_btn.click]
)
def render_results(content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length):
qnas = gen(
content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa,
max_length
)
df = gr.Dataframe(
value=[u.values() for u in qnas],
headers=['Question', 'Answer'],
col_count=2,
wrap=True
)
pd_df = pd.DataFrame([u.values() for u in qnas], columns=['Question', 'Answer'])
download = gr.DownloadButton(label='Download (without headers)', value=create_file_download(pd_df))
demo.launch()