from threading import Thread
from typing import Iterator
class LLAMA2_WRAPPER:
    """Wrapper around a Llama 2 chat model that streams generated text.

    The backend is chosen from ``config``:

    * ``llama_cpp`` truthy: ``llama_cpp.Llama`` is used and it also handles
      tokenization, so no separate tokenizer is created.
    * ``load_in_4bit`` truthy: ``auto_gptq.AutoGPTQForCausalLM`` is loaded
      on ``cuda:0``.
    * otherwise: ``transformers.AutoModelForCausalLM`` is loaded in float16
      with ``device_map="auto"``.

    Config keys read by this class: ``model_name``, ``llama_cpp``,
    ``load_in_4bit``, ``load_in_8bit`` and ``MAX_INPUT_TOKEN_LENGTH``.
    """

    def __init__(self, config: "dict | None" = None):
        """Store the configuration; the model and tokenizer are loaded lazily.

        Args:
            config: Backend configuration. When omitted, a fresh empty dict
                is created for this instance. A dict passed by the caller is
                stored by reference, not copied.
        """
        # A literal ``{}`` default would be shared by every instance built
        # without arguments, so mutating one config would affect all of them.
        self.config = {} if config is None else config
        self.model = None
        self.tokenizer = None

    def init_model(self):
        """Load the model if needed and put non-llama.cpp models in eval mode."""
        if self.model is None:
            self.model = LLAMA2_WRAPPER.create_llama2_model(self.config)
        if not self.config.get("llama_cpp"):
            self.model.eval()

    def init_tokenizer(self):
        """Load the tokenizer if needed; a no-op for the llama.cpp backend."""
        if self.tokenizer is None and not self.config.get("llama_cpp"):
            self.tokenizer = LLAMA2_WRAPPER.create_llama2_tokenizer(self.config)

    @classmethod
    def create_llama2_model(cls, config):
        """Build the model object for the backend selected by ``config``.

        ``llama_cpp`` takes precedence over ``load_in_4bit``, which takes
        precedence over the default transformers path. Backend libraries are
        imported lazily so only the selected one has to be installed.

        Args:
            config: Mapping with ``model_name`` and the optional keys
                ``llama_cpp`` (default False), ``load_in_4bit`` (default
                False), ``load_in_8bit`` (default True) and
                ``MAX_INPUT_TOKEN_LENGTH``.

        Returns:
            The loaded model object of the selected backend.
        """
        model_name = config.get("model_name")
        load_in_8bit = config.get("load_in_8bit", True)
        load_in_4bit = config.get("load_in_4bit", False)
        llama_cpp = config.get("llama_cpp", False)

        if llama_cpp:
            from llama_cpp import Llama

            # For llama.cpp, model_name is passed as the model file path.
            model = Llama(
                model_path=model_name,
                n_ctx=config.get("MAX_INPUT_TOKEN_LENGTH"),
                n_batch=config.get("MAX_INPUT_TOKEN_LENGTH"),
            )
        elif load_in_4bit:
            from auto_gptq import AutoGPTQForCausalLM

            model = AutoGPTQForCausalLM.from_quantized(
                model_name,
                use_safetensors=True,
                trust_remote_code=True,
                device="cuda:0",
                use_triton=False,
                quantize_config=None,
            )
        else:
            import torch
            from transformers import AutoModelForCausalLM

            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype=torch.float16,
                load_in_8bit=load_in_8bit,
            )
        return model

    @classmethod
    def create_llama2_tokenizer(cls, config):
        """Load the transformers tokenizer named by ``config["model_name"]``."""
        model_name = config.get("model_name")
        from transformers import AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(model_name)
        return tokenizer

    def get_input_token_length(
        self, message: str, chat_history: list[tuple[str, str]], system_prompt: str
    ) -> int:
        """Return the number of tokens in the prompt built from the arguments.

        The model (llama.cpp backend) or the tokenizer (other backends) must
        already have been initialised.

        Args:
            message: The new user message.
            chat_history: Earlier ``(user_input, response)`` pairs.
            system_prompt: System prompt placed at the start of the prompt.

        Returns:
            Token count of the full prompt.
        """
        prompt = get_prompt(message, chat_history, system_prompt)

        if self.config.get("llama_cpp"):
            input_ids = self.model.tokenize(bytes(prompt, "utf-8"))
            return len(input_ids)
        else:
            input_ids = self.tokenizer([prompt], return_tensors="np")["input_ids"]
            return input_ids.shape[-1]

    def run(
        self,
        message: str,
        chat_history: list[tuple[str, str]],
        system_prompt: str,
        max_new_tokens: int = 1024,
        temperature: float = 0.8,
        top_p: float = 0.95,
        top_k: int = 50,
    ) -> Iterator[str]:
        """Stream the model's reply to ``message``.

        Each yielded value is the whole reply generated so far (cumulative),
        not just the newest piece.

        Args:
            message: The new user message.
            chat_history: Earlier ``(user_input, response)`` pairs.
            system_prompt: System prompt placed at the start of the prompt.
            max_new_tokens: Upper bound on the number of generated tokens;
                honoured by both backends.
            temperature: Sampling temperature.
            top_p: Nucleus sampling threshold.
            top_k: Top-k sampling cutoff.

        Yields:
            The accumulated reply text after each generation step.

        Raises:
            ValueError: In the llama.cpp branch, if ``max_new_tokens`` is
                negative (raised by ``itertools.islice``).
        """
        prompt = get_prompt(message, chat_history, system_prompt)

        if self.config.get("llama_cpp"):
            import codecs
            from itertools import islice

            inputs = self.model.tokenize(bytes(prompt, "utf-8"))
            generate_kwargs = dict(
                top_p=top_p,
                top_k=top_k,
                temp=temperature,
            )
            generator = self.model.generate(inputs, **generate_kwargs)

            eos_token = self.model.token_eos()
            # A multi-byte UTF-8 character can be split across tokens, so
            # decode incrementally: incomplete bytes are buffered until the
            # next token instead of raising UnicodeDecodeError. Bytes still
            # incomplete when generation stops are dropped.
            decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

            outputs = []
            # islice enforces max_new_tokens without evaluating a token that
            # would then be thrown away.
            for token in islice(generator, max_new_tokens):
                if token == eos_token:
                    break
                outputs.append(decoder.decode(self.model.detokenize([token])))
                yield "".join(outputs)
        else:
            from transformers import TextIteratorStreamer

            inputs = self.tokenizer([prompt], return_tensors="pt").to("cuda")

            # NOTE(review): timeout=10.0 presumably bounds the wait for the
            # next text chunk from the generation thread; confirm against
            # the transformers documentation.
            streamer = TextIteratorStreamer(
                self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
            )
            generate_kwargs = dict(
                inputs,
                streamer=streamer,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                top_p=top_p,
                top_k=top_k,
                temperature=temperature,
                num_beams=1,
            )
            # Generation runs in a background thread so this generator can
            # consume the streamer while tokens are still being produced.
            t = Thread(target=self.model.generate, kwargs=generate_kwargs)
            t.start()

            outputs = []
            for text in streamer:
                outputs.append(text)
                yield "".join(outputs)
def get_prompt(
    message: str, chat_history: list[tuple[str, str]], system_prompt: str
) -> str:
    """Build a Llama 2 chat prompt from the system prompt, history and message.

    The system prompt is wrapped in the ``<<SYS>>`` / ``<</SYS>>`` delimiters
    of the Llama 2 chat format and placed inside the first ``[INST]`` block.
    Each earlier turn contributes ``user [/INST] response [INST] `` and the
    new message closes the prompt with a final ``[/INST]``. User inputs,
    responses and the message are stripped of surrounding whitespace; the
    system prompt is inserted as given.

    Args:
        message: The new user message.
        chat_history: Earlier ``(user_input, response)`` pairs, oldest first.
        system_prompt: Instructions placed in the system section.

    Returns:
        The complete prompt string.
    """
    texts = [f"[INST] <<SYS>>\n{system_prompt}\n<</SYS>>\n\n"]
    for user_input, response in chat_history:
        texts.append(f"{user_input.strip()} [/INST] {response.strip()} [INST] ")
    texts.append(f"{message.strip()} [/INST]")
    return "".join(texts)