# force update
import argparse
import nltk
import torch
import numpy as np
import gradio as gr
from nltk import sent_tokenize
from transformers import (
    RobertaTokenizer,
    RobertaForMaskedLM,
    LogitsProcessorList,
    TopKLogitsWarper,
    TemperatureLogitsWarper,
    TypicalLogitsWarper,
)
nltk.download('punkt')
# Use the larger checkpoint only when a GPU is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
pretrained = "roberta-large" if device == "cuda" else "roberta-base"
tokenizer = RobertaTokenizer.from_pretrained(pretrained)
model = RobertaForMaskedLM.from_pretrained(pretrained)
model = model.to(device)
# Default generation hyperparameters (tunable from the UI below).
max_len = 20      # number of mask tokens inserted between sentences
top_k = 100       # restrict sampling to the k most probable tokens (0 = no limit)
temperature = 1   # softmax temperature for sampling
typical_p = 0     # typical-sampling mass (0 = disabled)
burnin = 250      # iterations of pure sampling before switching to argmax
max_iter = 500    # total sampling iterations per insertion
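
# The functions below treat a BERT-style masked LM as a Markov random field
# language model, following https://github.com/nyu-dl/bert-gen: a run of mask
# tokens is placed between two sentences and repeatedly resampled, one
# position at a time, until the infill is consistent with both sides.
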
# adapted from https://github.com/nyu-dl/bert-gen
def generate_step(out: object,
                  gen_idx: int,
                  top_k: int = top_k,
                  temperature: float = temperature,
                  typical_p: float = typical_p,
                  sample: bool = False) -> list:
    """Generate a token at position gen_idx.

    Args:
        - out: model output whose logits are of size batch_size x seq_len x vocab_size
        - gen_idx (int): location for which to generate
        - top_k (int): if > 0, only sample from the top k most probable tokens
        - temperature (float): sampling temperature
        - typical_p (float): if > 0, use typical sampling
        - sample (bool): if True, sample from the full distribution

    Returns:
        - list: batch_size tokens
    """
    logits = out.logits[:, gen_idx]
    warpers = LogitsProcessorList()
    if temperature:
        warpers.append(TemperatureLogitsWarper(temperature))
    if top_k > 0:
        warpers.append(TopKLogitsWarper(top_k))
    if typical_p > 0:
        if typical_p >= 1:
            typical_p = 0.999
        warpers.append(TypicalLogitsWarper(typical_p))
    # None is safe to pass as input_ids here: none of these warpers use it.
    logits = warpers(None, logits)
    if sample:
        probs = torch.nn.functional.softmax(logits, dim=-1)
        next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1)
    else:
        next_tokens = torch.argmax(logits, dim=-1)
    return next_tokens.tolist()
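

# A minimal usage sketch for generate_step (not called by the app): mask a
# single token, run one forward pass and decode the greedy prediction. The
# prompt string and helper name here are illustrative assumptions.
def _generate_step_example() -> str:
    example = tokenizer(f"The capital of France is{tokenizer.mask_token}.",
                        return_tensors='pt').to(device)
    # Locate the single mask token in the encoded input.
    gen_idx = int((example['input_ids'][0] == tokenizer.mask_token_id).nonzero())
    with torch.no_grad():
        out = model(**example)
    # With sample=False this takes the argmax, likely decoding to " Paris".
    return tokenizer.decode(generate_step(out, gen_idx=gen_idx))
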
# adapted from https://github.com/nyu-dl/bert-gen
def parallel_sequential_generation(seed_text: str,
                                   seed_end_text: str,
                                   max_len: int = max_len,
                                   top_k: int = top_k,
                                   temperature: float = temperature,
                                   typical_p: float = typical_p,
                                   max_iter: int = max_iter,
                                   burnin: int = burnin) -> str:
    """Generate text consistent with both the preceding and following text.

    Args:
        - seed_text (str): preceding text
        - seed_end_text (str): following text
        - max_len (int): number of tokens to insert between sentences
        - top_k (int): if > 0, only sample from the top k most probable tokens
        - temperature (float): sampling temperature
        - typical_p (float): if > 0, use typical sampling
        - max_iter (int): number of iterations in MCMC
        - burnin (int): during the burn-in period, sample from the full distribution; afterwards take the argmax

    Returns:
        - str: generated text to insert between seed_text and seed_end_text
    """
    # Sandwich a run of mask tokens between the two seed texts.
    inp = tokenizer(seed_text + tokenizer.mask_token * max_len + seed_end_text,
                    return_tensors='pt')
    masked_tokens = np.where(
        inp['input_ids'][0].numpy() == tokenizer.mask_token_id)[0]
    seed_len = masked_tokens[0]
    inp = inp.to(device)
    for ii in range(max_iter):
        # Pick one masked position at random and resample it (Gibbs sampling).
        kk = np.random.randint(0, max_len)
        with torch.no_grad():
            out = model(**inp)
        idxs = generate_step(out,
                             gen_idx=seed_len + kk,
                             top_k=top_k if (ii >= burnin) else 0,
                             temperature=temperature,
                             typical_p=typical_p,
                             sample=(ii < burnin))
        inp['input_ids'][0][seed_len + kk] = idxs[0]
    # Strip any BOS/EOS tokens that may have been generated in the infill.
    tokens = inp['input_ids'].cpu().numpy()[0][masked_tokens]
    tokens = tokens[np.where((tokens != tokenizer.eos_token_id)
                             & (tokens != tokenizer.bos_token_id))]
    return tokenizer.decode(tokens)
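

# A hypothetical usage sketch (not called by the app): generate a bridge
# between two fixed sentences. Output differs from run to run because the
# burn-in phase samples stochastically.
def _bridge_example() -> str:
    return parallel_sequential_generation(
        "The storm had finally passed.",
        "By morning the streets were dry again.",
        max_len=10)
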
def inbertolate(doc: str,
                max_len: int = max_len,
                top_k: int = top_k,
                temperature: float = temperature,
                typical_p: float = typical_p,
                max_iter: int = max_iter,
                burnin: int = burnin) -> str:
    """Pad out a document by generating a new sentence between every pair of sentences.

    Args:
        - doc (str): document text
        - max_len (int): number of tokens to insert between sentences
        - top_k (int): if > 0, only sample from the top k most probable tokens
        - temperature (float): sampling temperature
        - typical_p (float): if > 0, use typical sampling
        - max_iter (int): number of iterations in MCMC
        - burnin (int): during the burn-in period, sample from the full distribution; afterwards take the argmax

    Returns:
        - str: padded document
    """
    new_doc = ''
    paras = doc.split('\n')
    for para in paras:
        para = sent_tokenize(para)
        # sent_tokenize returns an empty list for a blank line.
        if not para:
            new_doc += '\n'
            continue
        # Append an empty sentinel so text is also generated after the last sentence.
        para += ['']
        for sentence in range(len(para) - 1):
            new_doc += para[sentence] + ' '
            new_doc += parallel_sequential_generation(
                para[sentence],
                para[sentence + 1],
                max_len=max_len,
                top_k=top_k,
                temperature=float(temperature),
                typical_p=typical_p,
                burnin=burnin,
                max_iter=max_iter) + ' '
        new_doc += '\n'
    return new_doc
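

# A hypothetical usage sketch (not called by the app): pad out a short
# document; halving max_iter and burnin here just speeds up the demo.
def _inbertolate_example() -> str:
    return inbertolate(
        "I have an essay due tomorrow. It needs to be 2,000 words.",
        max_len=15,
        max_iter=250,
        burnin=125)
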

demo = gr.Interface(
    fn=inbertolate,
    title="inBERTolate",
    description=f"Hit your word count by using BERT ({pretrained}) to pad out your essays!",
    inputs=[
        gr.Textbox(label="Text", lines=10),
        gr.Slider(label="Maximum length to insert between sentences",
                  minimum=1,
                  maximum=40,
                  step=1,
                  value=max_len),
        gr.Slider(label="Top k", minimum=0, maximum=200, value=top_k),
        gr.Slider(label="Temperature",
                  minimum=0,
                  maximum=2,
                  value=temperature),
        gr.Slider(label="Typical p",
                  minimum=0,
                  maximum=1,
                  value=typical_p),
        gr.Slider(label="Maximum iterations",
                  minimum=0,
                  maximum=1000,
                  value=max_iter),
        gr.Slider(label="Burn-in",
                  minimum=0,
                  maximum=500,
                  value=burnin),
    ],
    outputs=gr.Textbox(label="Expanded text", lines=30))

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int)
    # The server name is a hostname or IP address, so it must be a string.
    parser.add_argument('--server', type=str)
    args = parser.parse_args()
    demo.launch(server_name=args.server or '0.0.0.0', server_port=args.port)