from __future__ import absolute_import import streamlit as st import torch import os import sys import pickle import torch import json import random import logging import argparse import numpy as np from io import open from itertools import cycle import torch.nn as nn from model import Seq2Seq from tqdm import tqdm, trange import regex as re from torch.utils.data import ( DataLoader, Dataset, SequentialSampler, RandomSampler, TensorDataset, ) from torch.utils.data.distributed import DistributedSampler from transformers import ( WEIGHTS_NAME, AdamW, get_linear_schedule_with_warmup, RobertaConfig, RobertaModel, RobertaTokenizer, ) from huggingface_hub import hf_hub_download import io # def list_files(startpath, prev_level=0): # # list files recursively # for root, dirs, files in os.walk(startpath): # level = root.replace(startpath, "").count(os.sep) + prev_level # indent = " " * 4 * (level) # print("{}{}/".format(indent, os.path.basename(root))) # # st.write("{}{}/".format(indent, os.path.basename(root))) # subindent = " " * 4 * (level + 1) # for f in files: # print("{}{}".format(subindent, f)) # # st.write("{}{}".format(subindent, f)) # for d in dirs: # list_files(d, level + 1) class CONFIG: max_source_length = 256 max_target_length = 128 beam_size = 3 local_rank = -1 no_cuda = False do_train = True do_eval = True do_test = True train_batch_size = 12 eval_batch_size = 32 model_type = "roberta" model_name_or_path = "microsoft/codebert-base" output_dir = "/content/drive/MyDrive/CodeSummarization" load_model_path = None train_filename = "dataset/python/train.jsonl" dev_filename = "dataset/python/valid.jsonl" test_filename = "dataset/python/test.jsonl" config_name = "" tokenizer_name = "" cache_dir = "cache" save_every = 5000 gradient_accumulation_steps = 1 learning_rate = 5e-5 weight_decay = 1e-4 adam_epsilon = 1e-8 max_grad_norm = 1.0 num_train_epochs = 3.0 max_steps = -1 warmup_steps = 0 train_steps = 100000 eval_steps = 10000 n_gpu = torch.cuda.device_count() # download model with streamlit cache decorator @st.cache_resource def download_model(): if not os.path.exists(r"models/pytorch_model.bin"): os.makedirs("./models", exist_ok=True) path = hf_hub_download( repo_id="tmnam20/codebert-code-summarization", filename="pytorch_model.bin", cache_dir="cache", local_dir=os.path.join(os.getcwd(), "models"), local_dir_use_symlinks=False, force_download=True, ) # load with streamlit cache decorator # @st.cache(persist=False, show_spinner=True, allow_output_mutation=True) @st.cache_resource def load_tokenizer_and_model(pretrained_path): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Config model config_class, model_class, tokenizer_class = ( RobertaConfig, RobertaModel, RobertaTokenizer, ) model_config = config_class.from_pretrained( CONFIG.config_name if CONFIG.config_name else CONFIG.model_name_or_path, cache_dir=CONFIG.cache_dir, ) # model_config.save_pretrained("config") # load tokenizer tokenizer = tokenizer_class.from_pretrained( CONFIG.tokenizer_name if CONFIG.tokenizer_name else CONFIG.model_name_or_path, cache_dir=CONFIG.cache_dir, # do_lower_case=args.do_lower_case ) # load encoder from pretrained RoBERTa encoder = model_class.from_pretrained( CONFIG.model_name_or_path, config=model_config, cache_dir=CONFIG.cache_dir ) # build decoder decoder_layer = nn.TransformerDecoderLayer( d_model=model_config.hidden_size, nhead=model_config.num_attention_heads ) decoder = nn.TransformerDecoder(decoder_layer, num_layers=6) # build seq2seq model from pretrained encoder and from-scratch decoder model = Seq2Seq( encoder=encoder, decoder=decoder, config=model_config, beam_size=CONFIG.beam_size, max_length=CONFIG.max_target_length, sos_id=tokenizer.cls_token_id, eos_id=tokenizer.sep_token_id, ) try: state_dict = torch.load( os.path.join(os.getcwd(), "models", "pytorch_model.bin"), map_location=device, ) except RuntimeError as e: print(e) try: state_dict = torch.load( os.path.join(os.getcwd(), "models", "pytorch_model.bin"), map_location="cpu", ) except RuntimeError as e: print(e) state_dict = torch.load( os.path.join(os.getcwd(), "models", "pytorch_model_cpu.bin"), map_location="cpu", ) del state_dict["encoder.embeddings.position_ids"] model.load_state_dict(state_dict) # model = model.to("cpu") # torch.save(model.state_dict(), os.path.join(os.getcwd(), "models", "pytorch_model_cpu.bin")) model = model.to(device) return tokenizer, model, device @st.cache_data def preprocessing(code_segment): # remove newlines code_segment = re.sub(r"\n", " ", code_segment) # remove docstring code_segment = re.sub(r'""".*?"""', "", code_segment, flags=re.DOTALL) # remove multiple spaces code_segment = re.sub(r"\s+", " ", code_segment) # remove comments code_segment = re.sub(r"#.*", "", code_segment) # remove html tags code_segment = re.sub(r"<.*?>", "", code_segment) # remove urls code_segment = re.sub(r"http\S+", "", code_segment) # split special chars into different tokens code_segment = re.sub(r"([^\w\s])", r" \1 ", code_segment) return code_segment.split() def generate_docstring(model, tokenizer, device, code_segemnt, max_length=None): input_tokens = preprocessing(code_segemnt) encoded_input = tokenizer.encode_plus( input_tokens, max_length=CONFIG.max_source_length, pad_to_max_length=True, truncation=True, return_tensors="pt", ) input_ids = encoded_input["input_ids"].to(device) input_mask = encoded_input["attention_mask"].to(device) if max_length is not None: model.max_length = max_length summary = model(input_ids, input_mask) # decode summary with tokenizer summaries = [] for i in range(summary.shape[1]): summaries.append(tokenizer.decode(summary[0][i], skip_special_tokens=True)) return summaries # return tokenizer.decode(summary[0][0], skip_special_tokens=True)