import os
import re
import sys

import torch
from llamafactory.chat import ChatModel
from tqdm import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TextStreamer,
)


def load_model(
    model_name,
    max_seq_length=2048,
    dtype=torch.bfloat16,
    load_in_4bit=False,
    adapter_name_or_path=None,
):
    """Load a base model and tokenizer, optionally with LoRA adapters via LLaMA-Factory."""
    print(f"loading model: {model_name}")

    if adapter_name_or_path:
        template = "llama3" if "llama-3" in model_name.lower() else "chatml"

        args = dict(
            model_name_or_path=model_name,
            adapter_name_or_path=adapter_name_or_path,  # load the saved LoRA adapters
            template=template,  # must match the template used in training
            finetuning_type="lora",  # must match the finetuning type used in training
            quantization_bit=4 if load_in_4bit else None,  # load a 4-bit quantized model
        )
        chat_model = ChatModel(args)
        return chat_model.engine.model, chat_model.engine.tokenizer

    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=load_in_4bit,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=False,
        bnb_4bit_compute_dtype=dtype,
    )

    model = (
        AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=bnb_config,
            torch_dtype=dtype,
            trust_remote_code=True,
            device_map="auto",
        )
        if load_in_4bit
        else AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=dtype,
            trust_remote_code=True,
            device_map="auto",
        )
    )

    return model, tokenizer


def test_model(model, tokenizer, prompt):
    """Generate a streamed completion for a single prompt as a quick smoke test."""
    inputs = tokenizer(
        [prompt],
        return_tensors="pt",
    ).to("cuda")

    text_streamer = TextStreamer(tokenizer)

    _ = model.generate(
        **inputs, max_new_tokens=2048, streamer=text_streamer, use_cache=True
    )


def extract_answer(text, debug=False):
    """Strip chat-template markers and special tokens from a decoded output."""
    if text:
        # Remove everything up to and including the assistant / [/INST] marker
        text = re.sub(
            r".*?(assistant|\[/INST\]).+?\b",
            "",
            text,
            flags=re.DOTALL | re.MULTILINE,
        )
        if debug:
            print("--------\nstep 1:", text)

        # Remove the first special token (e.g. <|eot_id|>) and everything after it
        text = re.sub(r"<.+?>.*", "", text, flags=re.DOTALL | re.MULTILINE)
        if debug:
            print("--------\nstep 2:", text)

        # Remove any remaining chat-template header section
        text = re.sub(
            r".*?end_header_id\|>\n\n", "", text, flags=re.DOTALL | re.MULTILINE
        )
        if debug:
            print("--------\nstep 3:", text)

        # Keep only the text before the first period
        text = text.split(".")[0].strip()
        if debug:
            print("--------\nstep 4:", text)

        # Drop a leading "Response:" prefix if present
        text = re.sub(
            r"^Response:.+?\b",
            "",
            text,
            flags=re.DOTALL | re.MULTILINE,
        )
        if debug:
            print("--------\nstep 5:", text)

    return text


def eval_model(model, tokenizer, eval_dataset):
    """Generate a prediction for every prompt in eval_dataset and return the extracted answers."""
    total = len(eval_dataset)
    predictions = []

    for i in tqdm(range(total)):
        inputs = tokenizer(
            eval_dataset["prompt"][i : i + 1],
            return_tensors="pt",
        ).to("cuda")

        outputs = model.generate(**inputs, max_new_tokens=4096, use_cache=False)
        decoded_output = tokenizer.batch_decode(outputs)
        debug = i == 0
        decoded_output = [
            extract_answer(output, debug=debug) for output in decoded_output
        ]
        predictions.extend(decoded_output)

    return predictions


def save_model(
    model,
    tokenizer,
    include_gguf=True,
    include_merged=True,
    publish=True,
):
    """Save the LoRA adapters locally and optionally push merged and GGUF variants to the Hugging Face Hub."""
    try:
        token = os.getenv("HF_TOKEN") or None
        model_name = os.getenv("MODEL_NAME")

        save_method = "lora"
        quantization_method = "q5_k_m"

        # get_model_names is assumed to be defined or imported elsewhere in this module
        model_names = get_model_names(
            model_name, save_method=save_method, quantization_method=quantization_method
        )

        model.save_pretrained(model_names["local"])
        tokenizer.save_pretrained(model_names["local"])

        if publish:
            model.push_to_hub(
                model_names["hub"],
                token=token,
            )
            tokenizer.push_to_hub(
                model_names["hub"],
                token=token,
            )

        if include_merged:
            model.save_pretrained_merged(
                model_names["local"] + "-merged", tokenizer, save_method=save_method
            )
            if publish:
                model.push_to_hub_merged(
                    model_names["hub"] + "-merged",
                    tokenizer,
                    save_method="lora",
                    token=token,
                )

        if include_gguf:
            model.save_pretrained_gguf(
                model_names["local-gguf"],
                tokenizer,
                quantization_method=quantization_method,
            )

            if publish:
                model.push_to_hub_gguf(
                    model_names["hub-gguf"],
                    tokenizer,
                    quantization_method=quantization_method,
                    token=token,
                )
    except Exception as e:
        print(e)


def print_row_details(df, indices=[0]):
    """Print every column of the selected DataFrame rows, separated by a divider line."""
    for index in indices:
        for col in df.columns:
            print("-" * 50)
            print(f"{col}: {df[col].iloc[index]}")
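

# Illustrative usage sketch only: loads a base model with previously trained LoRA
# adapters and runs a quick streamed generation. The base model name, adapter path,
# and prompt below are placeholders, not values used elsewhere in this module.
if __name__ == "__main__":
    model, tokenizer = load_model(
        "meta-llama/Meta-Llama-3-8B-Instruct",  # placeholder base model
        load_in_4bit=True,
        adapter_name_or_path="saves/lora/sft",  # placeholder adapter directory
    )
    test_model(model, tokenizer, "What is the capital of France?")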