gradiopyg / src /model.py
dorkai's picture
Upload 13 files
bddc905
raw
history blame
4.29 kB
import logging
import typing as t
import torch
import transformers
logger = logging.getLogger(__name__)
def build_model_and_tokenizer_for(
model_name: str
) -> t.Tuple[transformers.AutoModelForCausalLM, transformers.AutoTokenizer]:
'''Sets up the model and accompanying objects.'''
logger.info(f"Loading tokenizer for {model_name}")
tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)
# NOTE(11b): non-OPT models support passing this in at inference time, might
# be worth refactoring for a debug version so we're able to experiment on
# the fly
bad_words_ids = [
tokenizer(bad_word, add_special_tokens=False).input_ids
for bad_word in _build_bad_words_list_for(model_name)
]
logger.info(f"Loading the {model_name} model")
model = transformers.AutoModelForCausalLM.from_pretrained(
model_name, bad_words_ids=bad_words_ids)
model.eval().half().to("cuda")
logger.info("Model and tokenizer are ready")
return model, tokenizer
def run_raw_inference(model: transformers.AutoModelForCausalLM,
tokenizer: transformers.AutoTokenizer, prompt: str,
user_message: str, **kwargs: t.Any) -> str:
'''
Runs inference on the model, and attempts to returns only the newly
generated text.
:param model: Model to perform inference with.
:param tokenizer: Tokenizer to tokenize input with.
:param prompt: Input to feed to the model.
:param user_message: The user's raw message, exactly as appended to the end
of `prompt`. Used for trimming the original input from the model output.
:return: Decoded model generation.
'''
tokenized_items = tokenizer(prompt, return_tensors="pt").to("cuda")
# Atrocious code to stop generation when the model outputs "\nYou: " in
# freshly generated text. Feel free to send in a PR if you know of a
# cleaner way to do this.
stopping_criteria_list = transformers.StoppingCriteriaList([
_SentinelTokenStoppingCriteria(
sentinel_token_ids=tokenizer(
"\nYou:",
add_special_tokens=False,
return_tensors="pt",
).input_ids.to("cuda"),
starting_idx=tokenized_items.input_ids.shape[-1])
])
logits = model.generate(stopping_criteria=stopping_criteria_list,
**tokenized_items,
**kwargs)
output = tokenizer.decode(logits[0], skip_special_tokens=True)
logger.debug("Before trimming, model output was: `%s`", output)
# Trim out the input prompt from the generated output.
if (idx := prompt.rfind(user_message)) != -1:
trimmed_output = output[idx + len(user_message) - 1:].strip()
logger.debug("After trimming, it became: `%s`", trimmed_output)
return trimmed_output
else:
raise Exception(
"Couldn't find user message in the model's output. What?")
def _build_bad_words_list_for(_model_name: str) -> t.List[str]:
'''Builds a list of bad words for the given model.'''
# NOTE(11b): This was implemented as a function because each model size
# seems to have it quirks at the moment, but this is a rushed implementation
# so I'm not handling that, hence the dumb return here.
return ["Persona:", "Scenario:", "<START>"]
class _SentinelTokenStoppingCriteria(transformers.StoppingCriteria):
def __init__(self, sentinel_token_ids: torch.LongTensor,
starting_idx: int):
transformers.StoppingCriteria.__init__(self)
self.sentinel_token_ids = sentinel_token_ids
self.starting_idx = starting_idx
def __call__(self, input_ids: torch.LongTensor,
_scores: torch.FloatTensor) -> bool:
for sample in input_ids:
trimmed_sample = sample[self.starting_idx:]
# Can't unfold, output is still too tiny. Skip.
if trimmed_sample.shape[-1] < self.sentinel_token_ids.shape[-1]:
continue
for window in trimmed_sample.unfold(
0, self.sentinel_token_ids.shape[-1], 1):
if torch.all(torch.eq(self.sentinel_token_ids, window)):
return True
return False