Spaces:
Runtime error
Runtime error
import gradio as gr | |
import requests | |
from torch import nn | |
from torch.nn import CrossEntropyLoss | |
from transformers import AutoTokenizer, T5ForConditionalGeneration, AutoModelForSeq2SeqLM, T5Config | |
import torch | |
MAX_SOURCE_LENGTH = 512 | |
class ReviewerModel(T5ForConditionalGeneration): | |
def __init__(self, config): | |
super().__init__(config) | |
self.cls_head = nn.Linear(self.config.d_model, 2, bias=True) | |
self.init() | |
def init(self): | |
nn.init.xavier_uniform_(self.lm_head.weight) | |
factor = self.config.initializer_factor | |
self.cls_head.weight.data.normal_(mean=0.0, \ | |
std=factor * ((self.config.d_model) ** -0.5)) | |
self.cls_head.bias.data.zero_() | |
def forward( | |
self, *argv, **kwargs | |
): | |
r""" | |
Doc from Huggingface transformers: | |
labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size,)`, `optional`): | |
Labels for computing the sequence classification/regression loss. Indices should be in :obj:`[-100, 0, ..., | |
config.vocab_size - 1]`. All labels set to ``-100`` are ignored (masked), the loss is only computed for | |
labels in ``[0, ..., config.vocab_size]`` | |
Returns: | |
Examples:: | |
>>> from transformers import T5Tokenizer, T5ForConditionalGeneration | |
>>> tokenizer = T5Tokenizer.from_pretrained('t5-small') | |
>>> model = T5ForConditionalGeneration.from_pretrained('t5-small') | |
>>> # training | |
>>> input_ids = tokenizer('The <extra_id_0> walks in <extra_id_1> park', return_tensors='pt').input_ids | |
>>> labels = tokenizer('<extra_id_0> cute dog <extra_id_1> the <extra_id_2>', return_tensors='pt').input_ids | |
>>> outputs = model(input_ids=input_ids, labels=labels) | |
>>> loss = outputs.loss | |
>>> logits = outputs.logits | |
>>> # inference | |
>>> input_ids = tokenizer("summarize: studies have shown that owning a dog is good for you", return_tensors="pt").input_ids # Batch size 1 | |
>>> outputs = model.generate(input_ids) | |
>>> print(tokenizer.decode(outputs[0], skip_special_tokens=True)) | |
>>> # studies have shown that owning a dog is good for you. | |
""" | |
if "cls" in kwargs: | |
assert ( | |
"input_ids" in kwargs and \ | |
"labels" in kwargs and \ | |
"attention_mask" in kwargs | |
) | |
return self.cls( | |
input_ids=kwargs["input_ids"], | |
labels=kwargs["labels"], | |
attention_mask=kwargs["attention_mask"], | |
) | |
if "input_labels" in kwargs: | |
assert ( | |
"input_ids" in kwargs and \ | |
"input_labels" in kwargs and \ | |
"decoder_input_ids" in kwargs and \ | |
"attention_mask" in kwargs and \ | |
"decoder_attention_mask" in kwargs | |
), "Please give these arg keys." | |
input_ids = kwargs["input_ids"] | |
input_labels = kwargs["input_labels"] | |
decoder_input_ids = kwargs["decoder_input_ids"] | |
attention_mask = kwargs["attention_mask"] | |
decoder_attention_mask = kwargs["decoder_attention_mask"] | |
if "encoder_loss" not in kwargs: | |
encoder_loss = True | |
else: | |
encoder_loss = kwargs["encoder_loss"] | |
return self.review_forward(input_ids, input_labels, decoder_input_ids, attention_mask, | |
decoder_attention_mask, encoder_loss) | |
return super().forward(*argv, **kwargs) | |
def cls( | |
self, | |
input_ids, | |
labels, | |
attention_mask, | |
): | |
encoder_outputs = self.encoder( \ | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
output_attentions=False, | |
return_dict=False | |
) | |
hidden_states = encoder_outputs[0] | |
first_hidden = hidden_states[:, 0, :] | |
first_hidden = nn.Dropout(0.3)(first_hidden) | |
logits = self.cls_head(first_hidden) | |
loss_fct = CrossEntropyLoss() | |
if labels != None: | |
loss = loss_fct(logits, labels) | |
return loss | |
return logits | |
def review_forward( | |
self, | |
input_ids, | |
input_labels, | |
decoder_input_ids, | |
attention_mask, | |
decoder_attention_mask, | |
encoder_loss=True | |
): | |
encoder_outputs = self.encoder( \ | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
output_attentions=False, | |
return_dict=False | |
) | |
hidden_states = encoder_outputs[0] | |
decoder_inputs = self._shift_right(decoder_input_ids) | |
# Decode | |
decoder_outputs = self.decoder( | |
input_ids=decoder_inputs, | |
attention_mask=decoder_attention_mask, | |
encoder_hidden_states=hidden_states, | |
encoder_attention_mask=attention_mask, | |
output_attentions=False, | |
return_dict=False | |
) | |
sequence_output = decoder_outputs[0] | |
if self.config.tie_word_embeddings: # this is True default | |
sequence_output = sequence_output * (self.model_dim ** -0.5) | |
if encoder_loss: | |
# print(self.encoder.get_input_embeddings().weight.shape) | |
cls_logits = nn.functional.linear(hidden_states, self.encoder.get_input_embeddings().weight) | |
# cls_logits = self.cls_head(hidden_states) | |
lm_logits = self.lm_head(sequence_output) | |
if decoder_input_ids is not None: | |
lm_loss_fct = CrossEntropyLoss(ignore_index=0) # Warning: PAD_ID should be 0 | |
loss = lm_loss_fct(lm_logits.view(-1, lm_logits.size(-1)), decoder_input_ids.view(-1)) | |
if encoder_loss and input_labels is not None: | |
cls_loss_fct = CrossEntropyLoss(ignore_index=-100) | |
loss += cls_loss_fct(cls_logits.view(-1, cls_logits.size(-1)), input_labels.view(-1)) | |
return loss | |
return cls_logits, lm_logits | |
def prepare_models(): | |
tokenizer = AutoTokenizer.from_pretrained("microsoft/codereviewer") | |
tokenizer.special_dict = { | |
f"<e{i}>": tokenizer.get_vocab()[f"<e{i}>"] for i in range(99, -1, -1) | |
} | |
tokenizer.mask_id = tokenizer.get_vocab()["<mask>"] | |
tokenizer.bos_id = tokenizer.get_vocab()["<s>"] | |
tokenizer.pad_id = tokenizer.get_vocab()["<pad>"] | |
tokenizer.eos_id = tokenizer.get_vocab()["</s>"] | |
tokenizer.msg_id = tokenizer.get_vocab()["<msg>"] | |
tokenizer.keep_id = tokenizer.get_vocab()["<keep>"] | |
tokenizer.add_id = tokenizer.get_vocab()["<add>"] | |
tokenizer.del_id = tokenizer.get_vocab()["<del>"] | |
tokenizer.start_id = tokenizer.get_vocab()["<start>"] | |
tokenizer.end_id = tokenizer.get_vocab()["<end>"] | |
config = T5Config.from_pretrained("microsoft/codereviewer") | |
model = ReviewerModel.from_pretrained("microsoft/codereviewer", config=config) | |
model.eval() | |
return tokenizer, model | |
def pad_assert(tokenizer, source_ids): | |
source_ids = source_ids[:MAX_SOURCE_LENGTH - 2] | |
source_ids = [tokenizer.bos_id] + source_ids + [tokenizer.eos_id] | |
pad_len = MAX_SOURCE_LENGTH - len(source_ids) | |
source_ids += [tokenizer.pad_id] * pad_len | |
assert len(source_ids) == MAX_SOURCE_LENGTH, "Not equal length." | |
return source_ids | |
def encode_diff(tokenizer, diff, msg, source): | |
difflines = diff.split("\n")[1:] # remove start @@ | |
difflines = [line for line in difflines if len(line.strip()) > 0] | |
map_dic = {"-": 0, "+": 1, " ": 2} | |
def f(s): | |
if s in map_dic: | |
return map_dic[s] | |
else: | |
return 2 | |
labels = [f(line[0]) for line in difflines] | |
difflines = [line[1:].strip() for line in difflines] | |
inputstr = "<s>" + source + "</s>" | |
inputstr += "<msg>" + msg | |
for label, line in zip(labels, difflines): | |
if label == 1: | |
inputstr += "<add>" + line | |
elif label == 0: | |
inputstr += "<del>" + line | |
else: | |
inputstr += "<keep>" + line | |
source_ids = tokenizer.encode(inputstr, max_length=MAX_SOURCE_LENGTH, truncation=True)[1:-1] | |
source_ids = pad_assert(tokenizer, source_ids) | |
return source_ids | |
class FileDiffs(object): | |
def __init__(self, diff_string): | |
diff_array = diff_string.split("\n") | |
self.file_name = diff_array[0] | |
self.file_path = self.file_name.split("a/", 1)[1].rsplit("b/", 1)[0] | |
self.diffs = list() | |
for line in diff_array[4:]: | |
if line.startswith("@@"): | |
self.diffs.append(str()) | |
self.diffs[-1] += "\n" + line | |
def review_commit(user="p4vv37", repository="ueflow", commit="610a8c7b02b946bc9e5e26e6dacbba0e2abba259"): | |
tokenizer, model = prepare_models() | |
# Get diff and commit metadata from GitHub API | |
commit_metadata = requests.get(F"https://api.github.com/repos/{user}/{repository}/commits/{commit}").json() | |
msg = commit_metadata["commit"]["message"] | |
diff_data = requests.get(F"https://api.github.com/repos/{user}/{repository}/commits/{commit}", | |
headers={"Accept": "application/vnd.github.diff"}) | |
code_diff = diff_data.text | |
# Parse diff into FileDiffs objects | |
files_diffs = list() | |
for file in code_diff.split("diff --git"): | |
if len(file) > 0: | |
fd = FileDiffs(file) | |
files_diffs.append(fd) | |
# Generate comments for each diff | |
output = "" | |
for fd in files_diffs: | |
output += F"File:{fd.file_path}\n" | |
source = requests.get(F"https://raw.githubusercontent.com/{user}/{repository}/^{commit}/{fd.file_path}").text | |
for diff in fd.diffs: | |
inputs = torch.tensor([encode_diff(tokenizer, diff, msg, source)], dtype=torch.long).to("cpu") | |
inputs_mask = inputs.ne(tokenizer.pad_id) | |
logits = model( | |
input_ids=inputs, | |
cls=True, | |
attention_mask=inputs_mask, | |
labels=None, | |
use_cache=True, | |
num_beams=5, | |
early_stopping=True, | |
max_length=100 | |
) | |
needs_review = torch.argmax(logits, dim=-1).cpu().numpy()[0] | |
if not needs_review: | |
continue | |
preds = model.generate(inputs, | |
attention_mask=inputs_mask, | |
use_cache=True, | |
num_beams=5, | |
early_stopping=True, | |
max_length=100, | |
num_return_sequences=2 | |
) | |
preds = list(preds.cpu().numpy()) | |
pred_nls = [tokenizer.decode(_id[2:], skip_special_tokens=True, clean_up_tokenization_spaces=False) | |
for _id in preds] | |
output += diff + "\n#######\nComment:\n#######\n" + pred_nls[0] + "\n#######\n" | |
return output | |
description = "An interface for running " \ | |
"\"Microsoft CodeBERT CodeReviewer: Pre-Training for Automating Code Review Activities.\" " \ | |
"(microsoft/codereviewer) on GitHub commits." | |
examples = [ | |
["p4vv37", "ueflow", "610a8c7b02b946bc9e5e26e6dacbba0e2abba259"], | |
["microsoft", "vscode", "378b0d711f6b82ac59b47fb246906043a6fb995a"], | |
] | |
iface = gr.Interface(fn=review_commit, | |
description=description, | |
inputs=["text", "text", "text"], | |
outputs="text", | |
examples=examples) | |
iface.launch() | |