from transformers import AutoConfig
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from utils import SPECIAL_TOKENS, build_input_from_segments, add_special_tokens_
from utils import get_dataset, download_pretrained_model
import timeit
import logging
import random
from itertools import chain
from pprint import pformat
import torch
import torch.nn.functional as F
import boto3
import os
import tarfile
import io
import base64
import json
import re
from types import SimpleNamespace
import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)

logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO)
logger = logging.getLogger(__file__)

print("Loading Model.py module...")

s3 = boto3.client('s3')


def is_list_of_strings(lst):
    if lst and isinstance(lst, list):
        return all(isinstance(elem, str) for elem in lst)
    return False


class ServerlessModel:
    def __init__(self, model_path=None, s3_bucket=None, file_prefix=None, efs_path=None):
        print("Trying to init model")
        self.model = None
        self.tokenizer = None
        self.dataset = None
        if s3_bucket is None:
            if model_path is not None and efs_path is None:
                print("Loading model from local..")
                self.model, self.tokenizer, self.dataset = self.from_pretrained_local_path(model_path, file_prefix)
                logging.debug("Done loading")
            elif model_path is not None and efs_path is not None:
                # Load model from EFS, with config and tokenizer from the local Lambda package
                print("Loading model from EFS")
                self.model, self.tokenizer, self.dataset = self.from_pretrained(
                    model_path, s3_bucket, file_prefix, efs_path=efs_path)
                logging.debug("Done loading")
            else:
                # No bucket and no path: fail
                print("ERROR: Model path not found")
                raise Exception("No model path found")
        else:
            print("Loading model from s3 path..")
            print(s3_bucket)
            self.model, self.tokenizer, self.dataset = self.from_pretrained(model_path, s3_bucket, file_prefix)
            logging.debug("Done loading")

        self.parameters = {
            'max_length': 25,    # 60
            'min_length': 1,
            'device': 'cpu',
            'temperature': 1.0,  # 1.5
            'dynamic_temperature': True,
            'dynamic_temperature_range': 0.15,
            'top_k': 50,
            'top_p': 0.9,
            'no_sample': False,
            'max_history': 2,
        }
        print("Done initializing model")

    def from_pretrained(self, model_path: str, s3_bucket: str, file_prefix: str, efs_path=None):
        if efs_path is None:
            model = self.load_model_from_s3(model_path, s3_bucket, file_prefix)
        else:
            model = self.load_model_from_efs(model_path, efs_path)
        print("Model loaded.")
        print("loading tokenizer from path: ", model_path)
        tokenizer = self.load_tokenizer(model_path)
        # Cap the sequence length at GPT-2's maximum of 1024 tokens
        tokenizer.model_max_length = 1024
        print("tokenizer loaded")
        self.model = model
        self.tokenizer = tokenizer
        add_special_tokens_(self.model, self.tokenizer)
        # Only used if the cached dataset cannot be found
        DATASET_PATH = model_path + '/personafile.json'  # may not be needed if the cache exists
        # We normally rely on the cache, so the raw dataset path may not be needed
        DATASET_CACHE = model_path + '/persona_good'  # persona_good_gpt2_cache (no zip extension)
        dataset = self.load_dataset(DATASET_PATH, DATASET_CACHE)
        self.dataset = dataset
        print("dataset loaded")
        model.eval()
        print("Model in eval mode, dataset and tokenizer also loaded")
        return model, tokenizer, dataset

    def load_model_from_path(self, model_path: str):
        print("Loading model from path:", model_path)
        model = GPT2LMHeadModel.from_pretrained(model_path)
        model.eval()
        self.model = model
        return model

    def from_pretrained_local_path(self, model_path: str, file_prefix: str):
        print("Local model loading...")
        model = GPT2LMHeadModel.from_pretrained(model_path)
        tokenizer = self.load_tokenizer(model_path)
        self.model = model
        self.tokenizer = tokenizer
        # Cap the sequence length at GPT-2's maximum of 1024 tokens
        tokenizer.model_max_length = 1024
        add_special_tokens_(model, tokenizer)
        # Only used if the cached dataset cannot be found
        DATASET_PATH = model_path + '/personafile.json'  # may not be needed if the cache exists
        # We normally rely on the cache, so the raw dataset path may not be needed
        DATASET_CACHE = model_path + '/persona_good'  # persona_good_gpt2_cache (no zip extension)
        dataset = self.load_dataset(DATASET_PATH, DATASET_CACHE)
        self.dataset = dataset
        model.eval()
        print("Model in eval mode, dataset and tokenizer also loaded")
        return model, tokenizer, dataset

    def load_model_from_efs(self, model_path: str, efs_path: str):
        if model_path and efs_path:
            config = AutoConfig.from_pretrained(f'{model_path}/config.json')
            with open(efs_path, 'rb') as f:
                # Load the raw state dict and hand it to from_pretrained below
                state = torch.load(io.BytesIO(f.read()),
                                   map_location=lambda storage, loc: storage)
                # alternative: state = pickle.load(f, encoding='latin1')
                model = GPT2LMHeadModel.from_pretrained(
                    pretrained_model_name_or_path=None, state_dict=state, config=config)
            return model
        else:
            raise KeyError('No model config path or EFS bin path')

    def load_model_from_s3(self, model_path: str, s3_bucket: str, file_prefix: str):
        if model_path and s3_bucket and file_prefix:
            obj = s3.get_object(Bucket=s3_bucket, Key=file_prefix)
            bytestream = io.BytesIO(obj['Body'].read())
            tar = tarfile.open(fileobj=bytestream, mode="r:gz")
            config = AutoConfig.from_pretrained(f'{model_path}/config.json')
            for member in tar.getmembers():
                if member.name.startswith("./._"):
                    # macOS tar adds ./._XXX metadata copies; skip them
                    continue
                if member.name.endswith(".bin"):
                    f = tar.extractfile(member)
                    print("Model file extracted: " + member.name)
                    # Load the raw state dict and hand it to from_pretrained below
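                    # map_location keeps all tensors on CPU during deserialization,
                    # so no GPU is required inside the Lambda runtime.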
                    state = torch.load(io.BytesIO(f.read()),
                                       map_location=lambda storage, loc: storage)
                    model = GPT2LMHeadModel.from_pretrained(
                        pretrained_model_name_or_path=None, state_dict=state, config=config)
                    #model = AutoModelWithLMHead.from_pretrained("./", config=config)
                    return model
        else:
            raise KeyError('No S3 Bucket and Key Prefix provided')

    def load_tokenizer(self, model_path: str):
        print("loading tokenizer")
        tokenizer = GPT2Tokenizer.from_pretrained(model_path)
        return tokenizer

    def load_dataset(self, DATASET_PATH: str, DATASET_CACHE: str, use_efs=False):
        print("loading dataset")
        dataset = get_dataset(self.tokenizer, DATASET_PATH, DATASET_CACHE)
        return dataset

    def encode(self, question, context):
        encoded = self.tokenizer.encode_plus(question, context)
        return encoded["input_ids"], encoded["attention_mask"]

    def decode(self, token):
        answer_tokens = self.tokenizer.convert_ids_to_tokens(token, skip_special_tokens=True)
        return self.tokenizer.convert_tokens_to_string(answer_tokens)

    def generate_word(self, text, model=None, tokenizer=None, noprint=False):
        if model is None or tokenizer is None:
            print("ERROR: No model or tokenizer")
            return None
        inputs = tokenizer(text, return_tensors="pt")
        # Forward pass; labels are passed only so the model also returns a loss
        outputs = model(**inputs, labels=inputs["input_ids"])
        loss, logits = outputs[:2]
        # Greedy pick of the next token
        predicted_index = torch.argmax(logits[0, -1, :]).item()
        predicted_text = tokenizer.decode([predicted_index])
        if not noprint:
            print('input text:', text)
            print('predicted text:', predicted_text)
        return predicted_text

    def top_filtering(self, logits, top_k=0., top_p=0.9, threshold=-float('Inf'), filter_value=-float('Inf')):
        """ Filter a distribution of logits using top-k, top-p (nucleus) and/or threshold filtering.
            Args:
                logits: logits distribution, shape (vocabulary size)
                top_k: <=0: no filtering, >0: keep only the top k tokens with the highest probability.
                top_p: <=0.0: no filtering, >0.0: keep only a subset S of candidates, where S is the smallest
                    subset whose total probability mass is greater than or equal to the threshold top_p.
                    In practice, we select the highest-probability tokens whose cumulative probability mass
                    exceeds the threshold top_p.
                threshold: a minimal threshold to keep logits
        """
        assert logits.dim() == 1  # Only works for batch size 1 for now - could update, but it would obfuscate the code a bit
        top_k = min(top_k, logits.size(-1))
        if top_k > 0:
            # Remove all tokens with a probability less than the last token of the top-k
            indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
            logits[indices_to_remove] = filter_value

        if top_p > 0.0:
            # Compute cumulative probabilities of sorted tokens
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            cumulative_probabilities = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

            # Remove tokens with cumulative probability above the threshold
            sorted_indices_to_remove = cumulative_probabilities > top_p
            # Shift the indices to the right to keep also the first token above the threshold
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0

            # Back to unsorted indices and set them to -infinity
            indices_to_remove = sorted_indices[sorted_indices_to_remove]
            logits[indices_to_remove] = filter_value

        indices_to_remove = logits < threshold
        logits[indices_to_remove] = filter_value

        return logits
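
    # Rough illustration of top_filtering (example values, not part of the original API):
    # with logits = torch.tensor([0.1, 0.3, 2.0, 1.5]) and top_k=2, everything except
    # the two largest logits (indices 2 and 3) is set to filter_value (-inf), so softmax
    # assigns them zero probability. With top_p, sorted tokens are kept only until their
    # cumulative softmax mass first exceeds top_p; the remainder is filtered the same way.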

    def sample_sequence(self, personality, history, tokenizer, model, params=None, current_output=None):
        start = timeit.default_timer()
        if params is not None:
            for k, v in params.items():
                self.parameters[k] = v
        # To access parameters with dot notation:
        # param = SimpleNamespace(**parameters)

        special_tokens_ids = tokenizer.convert_tokens_to_ids(SPECIAL_TOKENS)
        if current_output is None:
            current_output = []

        for i in range(self.parameters['max_length']):
            #print(">: {}/{} ".format(i, self.parameters['max_length']), end='\r', flush=True)
            instance = build_input_from_segments(personality, history, current_output, tokenizer, with_eos=False)

            input_ids = torch.tensor(instance["input_ids"], device=self.parameters['device']).unsqueeze(0)
            token_type_ids = torch.tensor(instance["token_type_ids"], device=self.parameters['device']).unsqueeze(0)

            logits = model(input_ids, token_type_ids=token_type_ids)
            if isinstance(logits, tuple):  # for gpt2 and maybe others
                logits = logits[0]

            # Dynamic temperature mode: jitter the temperature on every step
            if self.parameters['dynamic_temperature']:
                # random temperature offset within +/- 'dynamic_temperature_range'
                rand_range = random.uniform(-1 * self.parameters['dynamic_temperature_range'],
                                            self.parameters['dynamic_temperature_range'])
                temperature = self.parameters['temperature'] + rand_range
            else:
                temperature = self.parameters['temperature']

            logits = logits[0, -1, :] / temperature
            logits = self.top_filtering(logits, top_k=self.parameters['top_k'], top_p=self.parameters['top_p'])
            probs = F.softmax(logits, dim=-1)

            prev = torch.topk(probs, 1)[1] if self.parameters['no_sample'] else torch.multinomial(probs, 1)
            if i < self.parameters['min_length'] and prev.item() in special_tokens_ids:
                # Resample until we get a non-special token, unless the model insists
                while prev.item() in special_tokens_ids:
                    if probs.max().item() == 1:
                        warnings.warn("Warning: model generating special token with probability 1.")
                        break  # avoid infinitely looping over special token
                    prev = torch.multinomial(probs, num_samples=1)

            if prev.item() in special_tokens_ids:
                # An end-of-answer special token was generated; stop here
                break
            current_output.append(prev.item())

        stop = timeit.default_timer()
        #print(f"\nPredict in {stop - start} seconds\n")
        return current_output

    def dump_personalities_with_movies(self):
        personalities = [[dialog["name"], dialog["moviename"]] for dialog in self.dataset["train"]]
        name_list = []
        for person in personalities:
            try:
                name_tokenized = person[0]
                name = self.tokenizer.decode(name_tokenized)
                movies_tokenized = person[1]
                movienames = ""
                # Check the type of the first element: if it is an int, there is only one movie
                if isinstance(movies_tokenized[0], int):
                    movienames = self.tokenizer.decode(movies_tokenized)
                    movienames = movienames.replace(".txt", "")
                else:
                    for movie in movies_tokenized:
                        moviename = self.tokenizer.decode(movie)
                        moviename = moviename.replace(".txt", "")
                        movienames = movienames + " / " + moviename
                name_list.append([name, movienames])
            except Exception:
                print("Could not decode name:", self.tokenizer.decode(person[0]))
        return name_list

    def dump_personalities(self, as_list=False):
        personalities = [dialog["personality"] for dialog in self.dataset["train"]]
        name_list = []
        for person in personalities:
            name_tokenized = person[-1]
            name = self.tokenizer.decode(name_tokenized)
            name = name.replace("My name is ", "")[:-1]
            name_list.append(name)
            #print(name)
        if as_list:
            return name_list
        return " | ".join(name_list)

    def get_personalities(self):
        ## THIS FUNCTION IS NOW LEGACY, USE dump_personalities
        personalities = [dialog["personality"] for dialog in self.dataset["train"]]
        people = [item[-1][-10:-1] for item in personalities]  # grabs the trailing "My name is ..." tokens
        people_list = self.tokenizer.decode(list(chain(*people)))
        #print(" | ".join(people_list.split(" ")))
        text_to_remove = "My name is "
        people_list = people_list.replace(text_to_remove, " | ")
        #characters = " | ".join(people_list.split(" "))
        return people_list

    def select_personality(self, characters, select_random=False):
        ## FIND people list
        ## this is for debug, usually has " is Name"
        #people = [item[-1][-3:-1] for item in personalities]
        personalities = [dialog["personality"] for dialog in self.dataset["train"]]
        if select_random:
            return random.choice(personalities)

        #people = [item[-1][-2:-1] for item in personalities]
        #people_list = self.tokenizer.decode(chain(*people))
        #print(" | ".join(people_list.split(" ")))

        personality = None
        name = "My name is " + str(characters)
        name_token = self.tokenizer.encode(name)
        #print(name_token)
        index_start = len(name_token) + 1
        try:
            # Match the personality whose last sentence ends with the encoded name
            index_of_name = [item[-1][-1 * index_start:-1] == name_token for item in personalities].index(True)
            #print("Selected {} is at: {}".format(characters, str(index_of_name)))
            personality = personalities[index_of_name]
        except Exception:
            print("Not found ... Select again")
            return None

        ## TALK TO HAL
        #personality_hal = ["that's true. My name is Hal"]
        #personality = tokenize(personality_hal)
        #print(personality)

        print("Selected personality:", self.tokenizer.decode(list(chain(*personality))))
        return personality

    def get_answer(self, input_text, personality, history, params=None):
        # Check the length of the history first (saves one computation when it is empty)
        if len(history) > 0:
            # Mostly the history will be an empty list, so the length check saves work.
            # It is assumed to be a list of strings (not public input), so no string check is done.
            new_hist = []
            for ele in history:
                new_hist.append(self.tokenizer.encode(ele))
            history = new_hist.copy()

        history.append(self.tokenizer.encode(input_text))
        with torch.no_grad():
            out_ids = self.sample_sequence(personality, history, self.tokenizer, self.model, params=params)
        history.append(out_ids)
        # Keep only the last max_history exchanges (two utterances each) plus the new reply
        history = history[-(2 * self.parameters['max_history'] + 1):]
        out_text = self.tokenizer.decode(out_ids, skip_special_tokens=True)
        #print(out_text)

        history_decoded = []
        for ele in history:
            history_decoded.append(self.tokenizer.decode(ele))

        return out_text, history_decoded, self.parameters

    def predict(self, question, parameter_dict):
        # NOTE: generate_text is not defined on this class, so predict() will raise
        # until such a method is provided; get_answer() is the main entry point.
        try:
            answer = self.generate_text(question,
                                        model=self.model,
                                        tokenizer=self.tokenizer,
                                        parameter_dict=parameter_dict)
            return answer
        except Exception as e:
            raise Exception("Runtime error see cloudwatch logs : {}".format(repr(e)))
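

# Illustrative usage (a minimal sketch, assuming a local directory that contains the
# GPT-2 weights, config.json, tokenizer files and the persona dataset cache; the path
# below is hypothetical and not part of the original module):
if __name__ == "__main__":
    chatbot = ServerlessModel(model_path="./model")  # hypothetical local path
    print(chatbot.dump_personalities())              # list the available characters
    persona = chatbot.select_personality("", select_random=True)
    reply, history, params = chatbot.get_answer("Hello, who are you?", persona, history=[])
    print(reply)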