Spaces:
Runtime error
Runtime error
import logging | |
import typing as t | |
import torch | |
import transformers | |
logger = logging.getLogger(__name__) | |
def build_model_and_tokenizer_for( | |
model_name: str | |
) -> t.Tuple[transformers.AutoModelForCausalLM, transformers.AutoTokenizer]: | |
'''Sets up the model and accompanying objects.''' | |
logger.info(f"Loading tokenizer for {model_name}") | |
tokenizer = transformers.AutoTokenizer.from_pretrained(model_name) | |
# NOTE(11b): non-OPT models support passing this in at inference time, might | |
# be worth refactoring for a debug version so we're able to experiment on | |
# the fly | |
bad_words_ids = [ | |
tokenizer(bad_word, add_special_tokens=False).input_ids | |
for bad_word in _build_bad_words_list_for(model_name) | |
] | |
logger.info(f"Loading the {model_name} model") | |
model = transformers.AutoModelForCausalLM.from_pretrained( | |
model_name, bad_words_ids=bad_words_ids) | |
model.eval().half().to("cuda") | |
logger.info("Model and tokenizer are ready") | |
return model, tokenizer | |
def run_raw_inference(model: transformers.AutoModelForCausalLM, | |
tokenizer: transformers.AutoTokenizer, prompt: str, | |
user_message: str, **kwargs: t.Any) -> str: | |
''' | |
Runs inference on the model, and attempts to returns only the newly | |
generated text. | |
:param model: Model to perform inference with. | |
:param tokenizer: Tokenizer to tokenize input with. | |
:param prompt: Input to feed to the model. | |
:param user_message: The user's raw message, exactly as appended to the end | |
of `prompt`. Used for trimming the original input from the model output. | |
:return: Decoded model generation. | |
''' | |
tokenized_items = tokenizer(prompt, return_tensors="pt").to("cuda") | |
# Atrocious code to stop generation when the model outputs "\nYou: " in | |
# freshly generated text. Feel free to send in a PR if you know of a | |
# cleaner way to do this. | |
stopping_criteria_list = transformers.StoppingCriteriaList([ | |
_SentinelTokenStoppingCriteria( | |
sentinel_token_ids=tokenizer( | |
"\nYou:", | |
add_special_tokens=False, | |
return_tensors="pt", | |
).input_ids.to("cuda"), | |
starting_idx=tokenized_items.input_ids.shape[-1]) | |
]) | |
logits = model.generate(stopping_criteria=stopping_criteria_list, | |
**tokenized_items, | |
**kwargs) | |
output = tokenizer.decode(logits[0], skip_special_tokens=True) | |
logger.debug("Before trimming, model output was: `%s`", output) | |
# Trim out the input prompt from the generated output. | |
if (idx := prompt.rfind(user_message)) != -1: | |
trimmed_output = output[idx + len(user_message) - 1:].strip() | |
logger.debug("After trimming, it became: `%s`", trimmed_output) | |
return trimmed_output | |
else: | |
raise Exception( | |
"Couldn't find user message in the model's output. What?") | |
def _build_bad_words_list_for(_model_name: str) -> t.List[str]: | |
'''Builds a list of bad words for the given model.''' | |
# NOTE(11b): This was implemented as a function because each model size | |
# seems to have it quirks at the moment, but this is a rushed implementation | |
# so I'm not handling that, hence the dumb return here. | |
return ["Persona:", "Scenario:", "<START>"] | |
class _SentinelTokenStoppingCriteria(transformers.StoppingCriteria): | |
def __init__(self, sentinel_token_ids: torch.LongTensor, | |
starting_idx: int): | |
transformers.StoppingCriteria.__init__(self) | |
self.sentinel_token_ids = sentinel_token_ids | |
self.starting_idx = starting_idx | |
def __call__(self, input_ids: torch.LongTensor, | |
_scores: torch.FloatTensor) -> bool: | |
for sample in input_ids: | |
trimmed_sample = sample[self.starting_idx:] | |
# Can't unfold, output is still too tiny. Skip. | |
if trimmed_sample.shape[-1] < self.sentinel_token_ids.shape[-1]: | |
continue | |
for window in trimmed_sample.unfold( | |
0, self.sentinel_token_ids.shape[-1], 1): | |
if torch.all(torch.eq(self.sentinel_token_ids, window)): | |
return True | |
return False | |