from threading import Thread
from typing import Iterator
import torch
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
class LlamaModel:
def __init__(self, model_id: str):
self.model_id = model_id
if torch.cuda.is_available():
config = AutoConfig.from_pretrained(model_id)
config.pretraining_tp = 1
self.model = AutoModelForCausalLM.from_pretrained(
model_id,
config=config,
torch_dtype=torch.float16,
load_in_4bit=False,
device_map='auto'
)
else:
self.model = None
self.tokenizer = AutoTokenizer.from_pretrained(model_id)
def get_prompt(self, message: str, chat_history: list[tuple[str, str]],
system_prompt: str) -> str:
texts = [f'[INST] <>\n{system_prompt}\n<>\n\n']
# The first user input is _not_ stripped
do_strip = False
for user_input, response in chat_history:
user_input = user_input.strip() if do_strip else user_input
do_strip = True
texts.append(f'{user_input} [/INST] {response.strip()} [INST] ')
message = message.strip() if do_strip else message
texts.append(f'{message} [/INST]')
return ''.join(texts)
def get_input_token_length(self, message: str, chat_history: list[tuple[str, str]], system_prompt: str) -> int:
prompt = self.get_prompt(message, chat_history, system_prompt)
input_ids = self.tokenizer([prompt], return_tensors='np', add_special_tokens=False)['input_ids']
return input_ids.shape[-1]
def run(self, message: str,
chat_history: list[tuple[str, str]],
system_prompt: str,
max_new_tokens: int = 1024,
temperature: float = 0.8,
top_p: float = 0.95,
top_k: int = 50) -> Iterator[str]:
prompt = self.get_prompt(message, chat_history, system_prompt)
inputs = self.tokenizer([prompt], return_tensors='pt', add_special_tokens=False).to('cuda')
streamer = TextIteratorStreamer(self.tokenizer,
timeout=10.,
skip_prompt=True,
skip_special_tokens=True)
generate_kwargs = dict(
inputs,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
top_p=top_p,
top_k=top_k,
temperature=temperature,
num_beams=1,
)
t = Thread(target=self.model.generate, kwargs=generate_kwargs)
t.start()
outputs = []
for text in streamer:
outputs.append(text)
yield ''.join(outputs)