import gradio as gr
import torch
import pandas as pd
import spaces
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel
from sklearn.metrics import pairwise_distances
from collections import Counter
from itertools import chain
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import math

# Question-answering and question-generation models
model_name = 'philipp-zettl/t5-small-long-qa'
qa_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model_name = 'philipp-zettl/t5-small-qg'
qg_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained('google/flan-t5-small')

# Sentence-embedding model used for the contextual-relevance metric
embedding_model = AutoModel.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2')
embedding_tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2')

# Move the models to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
qa_model = qa_model.to(device)
qg_model = qg_model.to(device)
embedding_model = embedding_model.to(device)

max_questions = 1
max_answers = 1
max_elem_value = 100


def ngrams(sequence, n):
    """Return all n-grams of a token sequence as tuples."""
    return [tuple(sequence[i:i + n]) for i in range(len(sequence) - n + 1)]


def count_ngrams(sequence, max_n):
    """Count all 1..max_n-grams of a token sequence."""
    counts = Counter()
    for n in range(1, max_n + 1):
        counts.update(ngrams(sequence, n))
    return counts


def self_bleu(outputs):
    """Average BLEU of each output against the remaining outputs (lower = more diverse)."""
    smoothing_function = SmoothingFunction().method1
    scores = []
    for i in range(len(outputs)):
        references = outputs[:i] + outputs[i + 1:]
        # Avoid calculating BLEU score for empty references
        if references:
            scores.append(sentence_bleu(references, outputs[i], smoothing_function=smoothing_function))
    # If all references are empty, return a default value
    if not scores:
        return 0
    return sum(scores) / len(scores)


def dist_n(outputs, n):
    """Distinct-n: ratio of unique n-grams to total n-grams across all outputs."""
    all_ngrams = list(chain(*[ngrams(output, n) for output in outputs]))
    unique_ngrams = set(all_ngrams)
    return len(unique_ngrams) / len(all_ngrams) if all_ngrams else 0


def perplexity(model, tokenizer, texts):
    """Sliding-window perplexity of `texts` under `model`, used as a rough fluency proxy."""
    encodings = tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
    # T5 configs do not always define `n_positions`; fall back to 512.
    max_length = getattr(model.config, 'n_positions', 512)
    stride = 512
    seq_len = encodings.input_ids.size(1)

    lls = []
    for i in range(0, seq_len, stride):
        begin_loc = max(i + stride - max_length, 0)
        end_loc = min(i + stride, seq_len)
        trg_len = end_loc - i
        input_ids = encodings.input_ids[:, begin_loc:end_loc].to(model.device)
        target_ids = input_ids.clone()
        target_ids[:, :-trg_len] = -100

        with torch.no_grad():
            outputs = model(input_ids, labels=target_ids)
            log_likelihood = outputs.loss * trg_len
        lls.append(log_likelihood)

    ppl = torch.exp(torch.stack(lls).sum() / end_loc)
    return ppl.item()


def embedding_similarity(inputs, outputs):
    """Mean cosine similarity between input and output sentence embeddings."""
    global embedding_model, embedding_tokenizer, device

    def embed(texts):
        model_inputs = embedding_tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(device)
        with torch.no_grad():
            model_outputs = embedding_model(**model_inputs)
        return model_outputs.last_hidden_state.mean(dim=1).cpu().numpy()

    input_embeddings = embed(inputs)
    output_embeddings = embed(outputs)
    # pairwise_distances returns cosine *distances*; convert to similarities.
    similarities = 1 - pairwise_distances(input_embeddings, output_embeddings, metric='cosine')
    return float(similarities.mean())


def js_divergence(p, q):
    """Jensen-Shannon divergence between two (unnormalised) n-gram count vectors."""
    def kl_divergence(p, q):
        return sum(p[i] * math.log(p[i] / q[i]) for i in range(len(p)) if p[i] != 0 and q[i] != 0)

    p_norm = [float(i) / sum(p) for i in p]
    q_norm = [float(i) / sum(q) for i in q]
    m = [(p_norm[i] + q_norm[i]) / 2 for i in range(len(p_norm))]
    return (kl_divergence(p_norm, m) + kl_divergence(q_norm, m)) / 2
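# Minimal sanity check for the n-gram helpers above. The token lists are
# made-up examples, not model output; the check is pure Python and cheap, so
# it is safe to evaluate at import time.
_toy_outputs = [['what', 'is', 'x'], ['what', 'is', 'y']]
assert ngrams(['a', 'b', 'c'], 2) == [('a', 'b'), ('b', 'c')]
assert abs(dist_n(_toy_outputs, 1) - 4 / 6) < 1e-9  # 4 unique unigrams out of 6 total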
def evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=85):
    """Generate one output per prompt in `eval_data` and score the batch on several metrics."""
    generated_outputs = []

    for input_text in eval_data:
        input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
        outputs = model.generate(
            input_ids,
            num_beams=num_beams,
            num_beam_groups=num_beam_groups,
            diversity_penalty=1.0,
            max_new_tokens=max_length,
        )
        decoded_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        generated_outputs.append(decoded_text.split())

    # Self-BLEU for diversity
    diversity_score = self_bleu(generated_outputs)

    # Dist-1 and Dist-2 for diversity
    dist1 = dist_n(generated_outputs, 1)
    dist2 = dist_n(generated_outputs, 2)

    # Perplexity for fluency and relevance
    fluency_score = perplexity(model, tokenizer, [" ".join(output) for output in generated_outputs])

    # Embedding similarity for contextual relevance
    contextual_score = embedding_similarity(eval_data, [" ".join(output) for output in generated_outputs])

    # Jensen-Shannon divergence for distribution similarity
    generated_ngrams = count_ngrams(list(chain(*generated_outputs)), 4)
    reference_ngrams = count_ngrams(list(chain(*[tokenizer.tokenize(text) for text in eval_data])), 4)
    all_ngrams = set(generated_ngrams.keys()).union(set(reference_ngrams.keys()))
    p = [generated_ngrams[ngram] for ngram in all_ngrams]
    q = [reference_ngrams[ngram] for ngram in all_ngrams]
    jsd_score = js_divergence(p, q)

    return {
        "diversity_score": diversity_score,
        "dist1": dist1,
        "dist2": dist2,
        "fluency_score": fluency_score,
        "contextual_score": contextual_score,
        "jsd_score": jsd_score
    }


def find_best_parameters(eval_data, model, tokenizer, max_length=85):
    # Candidate values for num_beams; each is evaluated with num_beam_groups=2
    # (diverse beam search requires num_beams to be divisible by num_beam_groups).
    parameter_map = {
        2: [2],
        4: [2],
        6: [2],
        8: [2],
        10: [2],
    }

    # Find the best parameters
    best_score = -float('inf')
    best_params = None
    for num_beams in parameter_map.keys():
        for num_beam_groups in parameter_map[num_beams]:
            if num_beam_groups > num_beams:
                continue  # num_beam_groups should not be greater than num_beams

            scores = evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=max_length)

            # Combine scores to determine the best parameters (higher is better).
            combined_score = (
                scores['dist1'] + scores['dist2']
                - scores['fluency_score']
                + scores['contextual_score']
                - scores['jsd_score']
            )
            print(f"num_beams={num_beams}, num_beam_groups={num_beam_groups}, combined score={combined_score}")

            if combined_score > best_score:
                best_score = combined_score
                best_params = (num_beams, num_beam_groups)

    print(f"Best parameters: num_beams={best_params[0]}, num_beam_groups={best_params[1]} with combined score={best_score}")
    return best_params
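# Illustrative use of the tuner above (not executed at import time, since each
# candidate setting runs full generations); the prompt is a made-up example in
# the same 'context: ...' format used by gen() below:
#
#   sample_prompts = ['context: The Eiffel Tower is located in Paris.']
#   best_beams, best_groups = find_best_parameters(sample_prompts, qg_model, tokenizer, max_length=85)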
def run_model(inputs, tokenizer, model, num_beams=2, num_beam_groups=2, temperature=0.5, num_return_sequences=1, max_length=85):
    all_outputs = []
    torch.manual_seed(42069)
    for input_text in inputs:
        model_inputs = tokenizer([input_text], max_length=512, padding=True, truncation=True)
        input_ids = torch.tensor(model_inputs['input_ids']).to(device)
        for sample in input_ids:
            sample_outputs = []
            # Diverse beam search decoding: num_beams must be a multiple of
            # num_beam_groups, and num_return_sequences must not exceed num_beams.
            beam_groups = max(2, num_beam_groups)
            beams = max(2, num_beams, num_return_sequences)
            if beams % beam_groups:
                beams += beam_groups - beams % beam_groups
            with torch.no_grad():
                outputs = model.generate(
                    input_ids[:1],
                    max_length=max_length,
                    num_return_sequences=num_return_sequences,
                    low_memory=True,
                    use_cache=True,
                    num_beams=beams,
                    num_beam_groups=beam_groups,
                    diversity_penalty=temperature,
                    # Alternative decoding strategies kept for reference:
                    #   sampling: do_sample=True, temperature=temperature, top_p=temperature
                    #   contrastive search: penalty_alpha=0.6, top_k=4
                    #   beam search (multinomial): num_beams=5, do_sample=True
                )
            for sample_output in outputs:
                sample_outputs.append(tokenizer.decode(sample_output, skip_special_tokens=True))
            all_outputs.append(sample_outputs)
    return all_outputs


@spaces.GPU
def gen(content, temperature_qg=0.5, temperature_qa=0.75, num_return_sequences_qg=1, num_return_sequences_qa=1, max_length=85):
    # Gradio Number inputs may arrive as floats; generation and indexing expect ints.
    num_return_sequences_qg = int(num_return_sequences_qg)
    num_return_sequences_qa = int(num_return_sequences_qa)
    max_length = int(max_length)

    inputs = [
        f'context: {content}'
    ]
    question = run_model(
        inputs,
        tokenizer,
        qg_model,
        num_beams=num_return_sequences_qg,
        num_beam_groups=num_return_sequences_qg,
        temperature=temperature_qg,
        num_return_sequences=num_return_sequences_qg,
        max_length=max_length
    )

    # Tune the beam settings on the first batch of generated questions, then regenerate.
    q_params = find_best_parameters(list(chain.from_iterable(question)), qg_model, tokenizer, max_length=max_length)

    question = run_model(
        inputs,
        tokenizer,
        qg_model,
        num_beams=q_params[0],
        num_beam_groups=q_params[1],
        temperature=temperature_qg,
        num_return_sequences=num_return_sequences_qg,
        max_length=max_length
    )

    inputs = list(chain.from_iterable([
        [f'question: {q} context: {content}' for q in q_set] for q_set in question
    ]))
    answer = run_model(
        inputs,
        tokenizer,
        qa_model,
        num_beams=num_return_sequences_qa,
        num_beam_groups=num_return_sequences_qa,
        temperature=temperature_qa,
        num_return_sequences=num_return_sequences_qa,
        max_length=max_length
    )

    questions = list(chain.from_iterable(question))
    answers = list(chain.from_iterable(answer))

    results = []
    for idx, ans in enumerate(answers):
        # Answers are produced question-major: each question yields
        # num_return_sequences_qa answers in a row.
        results.append({'question': questions[idx // num_return_sequences_qa], 'answer': ans})
    return results


def variable_outputs(k, max_elems=10):
    # Helper for a variable number of output textboxes (currently unused by the UI below).
    global max_elem_value
    k = int(k)
    return [gr.Text(visible=True)] * k + [gr.Text(visible=False)] * (max(max_elems, max_elem_value) - k)


def set_outputs(content, max_elems=10):
    # NOTE: eval() on user-provided content is unsafe; only use with trusted input.
    c = eval(content)
    print('received content: ', c)
    return [gr.Text(value=t, visible=True) for t in c] + [gr.Text(visible=False)] * (max(max_elems, 10) - len(c))


def create_file_download(qnas):
    # Write the question/answer pairs as a tab-separated file without a header row.
    with open('qnas.tsv', 'w') as f:
        for idx, qna in qnas.iterrows():
            f.write(qna['Question'] + '\t' + qna['Answer'])
            if idx < len(qnas) - 1:
                f.write('\n')
    return 'qnas.tsv'
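# Shape of the data flowing into the UI below (illustrative values only):
#   gen('The Eiffel Tower is located in Paris.', ...) returns a list of dicts,
#   e.g. [{'question': 'Where is the Eiffel Tower located?', 'answer': 'Paris'}],
#   and create_file_download() writes one "question<TAB>answer" line per pair
#   to qnas.tsv for the download button.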
with gr.Blocks(css='.hidden_input {display: none;}') as demo:
    with gr.Row(equal_height=True):
        with gr.Group():  # Content
            content = gr.Textbox(label='Content', lines=15, placeholder='Enter text here', max_lines=10_000)
        with gr.Group():  # Settings
            temperature_qg = gr.Slider(label='Temperature QG', value=0.2, minimum=0, maximum=1, step=0.01)
            temperature_qa = gr.Slider(label='Temperature QA', value=0.5, minimum=0, maximum=1, step=0.01)
            max_length = gr.Number(label='Max Length', value=85, minimum=1, step=1, maximum=512)
            num_return_sequences_qg = gr.Number(label='Number Questions', value=max_questions, minimum=1, step=1, maximum=max(max_questions, max_elem_value))
            num_return_sequences_qa = gr.Number(label="Number Answers", value=max_answers, minimum=1, step=1, maximum=max(max_questions, max_elem_value))

    with gr.Row():
        gen_btn = gr.Button("Generate")

    @gr.render(
        inputs=[
            content, temperature_qg, temperature_qa,
            num_return_sequences_qg, num_return_sequences_qa, max_length
        ],
        triggers=[gen_btn.click]
    )
    def render_results(content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length):
        qnas = gen(
            content, temperature_qg, temperature_qa,
            num_return_sequences_qg, num_return_sequences_qa, max_length
        )
        df = gr.Dataframe(
            value=[list(u.values()) for u in qnas],
            headers=['Question', 'Answer'],
            col_count=2,
            wrap=True
        )
        pd_df = pd.DataFrame([list(u.values()) for u in qnas], columns=['Question', 'Answer'])

        download = gr.DownloadButton(label='Download (without headers)', value=create_file_download(pd_df))


demo.launch()
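# Deployment note (assumption: this script is the Space's entry point): on
# Hugging Face Spaces the file is imported and demo.launch() serves the app;
# @spaces.GPU is a no-op outside ZeroGPU hardware, so the script can also be
# run directly with `python <script>.py` when the `spaces` package is installed.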