|
import os |
|
from threading import Thread |
|
from typing import Iterator |
|
import gradio as gr |
|
import spaces |
|
import torch |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer |
|
from transformers import BitsAndBytesConfig |
|
|
|
# 8-bit bitsandbytes quantization settings.
# Double quantization and the quant-type selector are 4-bit-only options
# (`bnb_4bit_use_double_quant`, `bnb_4bit_quant_type`); BitsAndBytesConfig has
# no `bnb_8bit_*` counterparts, so only `load_in_8bit` is set here.
# NOTE(review): the name says "nf4" although 8-bit loading is requested, and
# this config is not passed to any `from_pretrained` call in the visible code.
nf4_config = BitsAndBytesConfig(load_in_8bit=True)
|
# Upper bound and initial value used for the "Max new tokens" slider.
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024

# Number of chat requests handled so far by this process.
total_count = 0

# Longest prompt (in tokens) sent to the model; longer prompts are cut down to
# their most recent tokens. Overridable through the environment.
MAX_INPUT_TOKEN_LENGTH = int(os.environ.get("MAX_INPUT_TOKEN_LENGTH", "4096"))
|
import gradio as gr |
|
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
|
|
|
# Tone-mark normalisation table applied before translation.
# Each key is a vowel pair ("oa", "oe", "uy") with the tone mark on the first
# vowel; the value is the same pair with the tone mark moved to the second
# vowel. Every pair is listed in lower, capitalised and upper case.
dict_map = {
    # o + a
    "òa": "oà", "Òa": "Oà", "ÒA": "OÀ",
    "óa": "oá", "Óa": "Oá", "ÓA": "OÁ",
    "ỏa": "oả", "Ỏa": "Oả", "ỎA": "OẢ",
    "õa": "oã", "Õa": "Oã", "ÕA": "OÃ",
    "ọa": "oạ", "Ọa": "Oạ", "ỌA": "OẠ",
    # o + e
    "òe": "oè", "Òe": "Oè", "ÒE": "OÈ",
    "óe": "oé", "Óe": "Oé", "ÓE": "OÉ",
    "ỏe": "oẻ", "Ỏe": "Oẻ", "ỎE": "OẺ",
    "õe": "oẽ", "Õe": "Oẽ", "ÕE": "OẼ",
    "ọe": "oẹ", "Ọe": "Oẹ", "ỌE": "OẸ",
    # u + y
    "ùy": "uỳ", "Ùy": "Uỳ", "ÙY": "UỲ",
    "úy": "uý", "Úy": "Uý", "ÚY": "UÝ",
    "ủy": "uỷ", "Ủy": "Uỷ", "ỦY": "UỶ",
    "ũy": "uỹ", "Ũy": "Uỹ", "ŨY": "UỸ",
    "ụy": "uỵ", "Ụy": "Uỵ", "ỤY": "UỴ",
}
|
|
|
# Translation model used to turn Vietnamese user messages into English.
# The tokenizer is configured with Vietnamese ("vi_VN") as the source language.
_VI2EN_CHECKPOINT = "vinai/vinai-translate-vi2en-v2"
tokenizer_vi2en = AutoTokenizer.from_pretrained(_VI2EN_CHECKPOINT, src_lang="vi_VN")
model_vi2en = AutoModelForSeq2SeqLM.from_pretrained(_VI2EN_CHECKPOINT, device_map="auto")
|
|
|
def translate_vi2en(vi_text: str) -> str:
    """Translate Vietnamese text to English with the vi2en model.

    Before tokenisation, every occurrence of a key of ``dict_map`` in the
    text is replaced by its mapped value (tone-mark normalisation).

    Args:
        vi_text: Vietnamese input text.

    Returns:
        The decoded English text; if more than one sequence is decoded, the
        sequences are joined with single spaces.
    """
    for marked_form, normalised_form in dict_map.items():
        vi_text = vi_text.replace(marked_form, normalised_form)

    # The model is loaded with device_map="auto", so follow the device it was
    # actually placed on instead of assuming "cuda".
    encoded = tokenizer_vi2en(vi_text, return_tensors="pt").to(model_vi2en.device)

    output_ids = model_vi2en.generate(
        encoded.input_ids,
        # Force the decoder to start in English.
        decoder_start_token_id=tokenizer_vi2en.lang_code_to_id["en_XX"],
        num_return_sequences=1,
        num_beams=5,
        early_stopping=True,
    )

    decoded = tokenizer_vi2en.batch_decode(output_ids, skip_special_tokens=True)
    return " ".join(decoded)
|
|
|
# Markdown text rendered at the top of the page.
DESCRIPTION = """CODE"""

# Code-assistant chat model and its tokenizer.
model_id = "deepseek-ai/deepseek-coder-7b-instruct-v1.5"
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
)
tokenizer = AutoTokenizer.from_pretrained(model_id)

# Correct spelling of the tokenizer flag ("use_defaul_system_prompt" only
# created an unrelated, never-read attribute).
# NOTE(review): this flag is a Llama-tokenizer option consulted by the built-in
# default chat template; presumably it has no effect when the checkpoint ships
# its own chat template -- confirm, and delete the line if so.
tokenizer.use_default_system_prompt = True

# Dump GPU status to the logs once at start-up.
os.system("nvidia-smi")
|
|
|
@spaces.GPU
def gen(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1,
) -> Iterator[str]:
    """Stream the model's reply to a chat message.

    The new message is translated from Vietnamese to English, combined with
    the optional system prompt and the previous turns into a chat prompt, and
    passed to the model. Generation runs in a background thread while this
    generator yields the text produced so far.

    Args:
        message: Latest user message (translated with ``translate_vi2en``).
        chat_history: Earlier ``(user, assistant)`` turns; used as stored,
            without translation.
        system_prompt: Optional system message; skipped when empty.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Accepted for signature compatibility; it is not
            forwarded to ``model.generate``.
        top_p: Forwarded to ``model.generate``.
        top_k: Forwarded to ``model.generate``.
        repetition_penalty: Forwarded to ``model.generate``.

    Yields:
        The reply accumulated so far, with any literal "<|EOT|>" removed.

    Note:
        Generation is called with ``do_sample=False`` and ``num_beams=1``.
        NOTE(review): per the transformers generation docs, ``top_p`` and
        ``top_k`` are sampling-only settings, so they presumably have no
        effect here -- confirm.
    """
    global total_count
    total_count += 1
    print(total_count)
    os.system("nvidia-smi")

    message = translate_vi2en(message)

    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.append({"role": "user", "content": user})
        conversation.append({"role": "assistant", "content": assistant})
    conversation.append({"role": "user", "content": message})

    # add_generation_prompt=True makes the template end with the marker that
    # opens the assistant turn, so the model answers instead of continuing
    # the user's text.
    input_ids = tokenizer.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        return_tensors="pt",
    )
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep only the most recent tokens of an over-long conversation.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        "input_ids": input_ids,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": False,
        "top_p": top_p,
        "top_k": top_k,
        "num_beams": 1,
        "repetition_penalty": repetition_penalty,
        # Take the stop token from the loaded tokenizer rather than a
        # hard-coded id that belongs to a different vocabulary.
        "eos_token_id": tokenizer.eos_token_id,
    }

    # generate() blocks until it finishes, so it runs in a worker thread while
    # this generator drains the streamer.
    worker = Thread(target=model.generate, kwargs=generate_kwargs)
    worker.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs).replace("<|EOT|>", "")
|
|
|
|
|
def _gen_from_ui(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int,
    top_p: float,
    top_k: int,
    repetition_penalty: float,
) -> Iterator[str]:
    """Bind the UI control values to ``gen`` by keyword.

    The additional inputs are handed to the chat function positionally, in
    the order they are listed below. ``gen`` has a ``temperature`` parameter
    between ``max_new_tokens`` and ``top_p`` for which there is no control,
    so passing ``gen`` directly would shift every later value by one slot
    (Top-p into ``temperature``, Top-k into ``top_p``, and so on).

    Kept as a generator function so replies are still streamed.
    """
    yield from gen(
        message,
        chat_history,
        system_prompt,
        max_new_tokens=max_new_tokens,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
    )


# Chat UI: the order of `additional_inputs` must match the parameters of
# `_gen_from_ui` after (message, chat_history).
chat_interface = gr.ChatInterface(
    fn=_gen_from_ui,
    additional_inputs=[
        gr.Textbox(label="System prompt", lines=6),
        gr.Slider(
            label="Max new tokens",
            minimum=1,
            maximum=MAX_MAX_NEW_TOKENS,
            step=1,
            value=DEFAULT_MAX_NEW_TOKENS,
        ),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.05,
            maximum=1.0,
            step=0.05,
            value=0.9,
        ),
        gr.Slider(
            label="Top-k",
            minimum=1,
            maximum=1000,
            step=1,
            value=50,
        ),
        gr.Slider(
            label="Repetition penalty",
            minimum=1.0,
            maximum=2.0,
            step=0.05,
            value=1,
        ),
    ],
    stop_btn=gr.Button("Stop"),
    examples=[
        ["implement snake game using pygame"],
        ["Can you explain briefly to me what is the Python programming language?"],
        ["write a program to find the factorial of a number"],
    ],
)
|
|
|
# Page layout: the description header followed by the chat interface.
demo = gr.Blocks(css="style.css")
with demo:
    gr.Markdown(DESCRIPTION)
    chat_interface.render()


if __name__ == "__main__":
    # Enable the request queue (capped at 100 entries), then start the app.
    demo.queue(max_size=100)
    demo.launch()