llama3.1 / utils.py
sarjunshankar's picture
added code for llama_3.1 chat
b098452
raw
history blame
5.12 kB
import gradio as gr
import os
import torch
from transformers import AutoConfig, AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread
from accelerate import init_empty_weights, infer_auto_device_map, disk_offload
# Set environment variables
HF_TOKEN = os.getenv("HF_TOKEN")
DESCRIPTION = '''
<div>
<h1 style="text-align: center;">ContenteaseAI custom trained model</h1>
</div>
'''
LICENSE = """
<p/>
---
For more information, visit our [website](https://contentease.ai).
"""
PLACEHOLDER = """
<div style="padding: 30px; text-align: center; display: flex; flex-direction: column; align-items: center;">
<h1 style="font-size: 28px; margin-bottom: 2px; opacity: 0.55;">ContenteaseAI Custom AI trained model</h1>
<p style="font-size: 18px; margin-bottom: 2px; opacity: 0.65;">Enter the text extracted from the PDF:</p>
</div>
"""
css = """
h1 {
text-align: center;
display: block;
}
"""
def initialize_model(model_name, max_memory=None):
device = torch.device('cpu')
# Load model configuration
config = AutoConfig.from_pretrained(model_name)
with init_empty_weights():
# Initialize model with empty weights
model = AutoModelForCausalLM.from_config(config)
# Create device map based on memory constraints
device_map = infer_auto_device_map(
model, max_memory=max_memory, no_split_module_classes=["GPTNeoXLayer"], dtype="float16"
)
# Determine if offloading is needed
needs_offloading = any(device == 'disk' for device in device_map.values())
if needs_offloading:
# Load model for offloading
model = AutoModelForCausalLM.from_pretrained(
model_name, device_map=device_map, offload_folder="offload",
offload_state_dict=True, torch_dtype=torch.float16
)
offload_directory = "offload/"
# Offload model to disk
disk_offload(model=model, offload_dir=offload_directory)
else:
# Load model normally to specified device
model = AutoModelForCausalLM.from_pretrained(
model_name, torch_dtype=torch.float16
)
model.to(device)
return model
try:
# Initialize the model and tokenizer
model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct"
model = initialize_model(model_name, max_memory={"cpu": "GiB"})
tokenizer = AutoTokenizer.from_pretrained(model_name, use_auth_token=HF_TOKEN)
except Exception as e:
print(f"Error initializing model: {e}")
exit(1)
terminators = [
tokenizer.eos_token_id,
tokenizer.convert_tokens_to_ids("")
]
def chat_llama3_8b(message: str, history: list, temperature: float, max_new_tokens: int) -> str:
"""
Generate a streaming response using the llama3-8b model.
Args:
message (str): The input message.
history (list): The conversation history used by ChatInterface.
temperature (float): The temperature for generating the response.
max_new_tokens (int): The maximum number of new tokens to generate.
Returns:
str: The generated response.
"""
conversation = []
message += " Extract all relevant keywords and add quantity from the following text and format the result in nested JSON:"
for user, assistant in history:
conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}])
conversation.append({"role": "user", "content": message})
input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt").to(model.device)
streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = dict(
input_ids=input_ids,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
temperature=temperature,
eos_token_id=terminators,
)
if temperature == 0:
generate_kwargs['do_sample'] = False
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start()
outputs = []
for text in streamer:
outputs.append(text)
yield "".join(outputs)
# Gradio block
chatbot = gr.Chatbot(height=450, placeholder=PLACEHOLDER, label='Gradio ChatInterface')
with gr.Blocks(fill_height=True, css=css) as demo:
gr.Markdown(DESCRIPTION)
gr.ChatInterface(
fn=chat_llama3_8b,
chatbot=chatbot,
fill_height=True,
additional_inputs_accordion=gr.Accordion(label="⚙️ Parameters", open=False, render=False),
additional_inputs=[
gr.Slider(
minimum=0,
maximum=1,
step=0.1,
value=0.95,
label="Temperature",
render=False
),
gr.Slider(
minimum=128,
maximum=9012,
step=1,
value=512,
label="Max new tokens",
render=False
),
]
)
gr.Markdown(LICENSE)
if __name__ == "__main__":
demo.launch(server_port=8000, share=True)