|
from typing import Dict |
|
from typing import List |
|
from typing import Tuple |
|
from typing import Union |
|
from pathlib import Path |
|
import gradio as gr |
|
import torch |
|
import argparse |
|
from threading import Thread |
|
from transformers import ( |
|
AutoModelForCausalLM, |
|
AutoTokenizer, |
|
TextIteratorStreamer, |
|
GenerationConfig, |
|
PreTrainedModel, |
|
PreTrainedTokenizer, |
|
PreTrainedTokenizerFast, |
|
) |
|
import warnings |
|
import spaces |
|
import os |
|
|
|
warnings.filterwarnings('ignore', category=UserWarning, message='TypedStorage is deprecated') |
|
|
|
MODEL_PATH = os.environ.get('MODEL_PATH', 'IndexTeam/Index-1.9B-Chat') |
|
TOKENIZER_PATH = os.environ.get("TOKENIZER_PATH", MODEL_PATH) |
|
|
|
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_PATH, trust_remote_code=True) |
|
model = AutoModelForCausalLM.from_pretrained(MODEL_PATH, |
|
torch_dtype=torch.bfloat16, |
|
device_map="auto", |
|
trust_remote_code=True) |
|
|
|
def _resolve_path(path: Union[str, Path]) -> Path: |
|
return Path(path).expanduser().resolve() |
|
|
|
@spaces.GPU |
|
def hf_gen(dialog: List, top_k, top_p, temperature, repetition_penalty, max_dec_len): |
|
"""generate model output with huggingface api |
|
Args: |
|
query (str): actual model input. |
|
top_p (float): only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for generation. |
|
temperature (float): Strictly positive float value used to modulate the logits distribution. |
|
max_dec_len (int): The maximum numbers of tokens to generate. |
|
Yields: |
|
str: real-time generation results of hf model |
|
""" |
|
inputs = tokenizer.apply_chat_template(dialog, tokenize=False, add_generation_prompt=False) |
|
enc = tokenizer(inputs, return_tensors="pt").to("cuda") |
|
streamer = TextIteratorStreamer(tokenizer, **tokenizer.init_kwargs) |
|
generation_kwargs = dict( |
|
enc, |
|
do_sample=True, |
|
top_k=int(top_k), |
|
top_p=float(top_p), |
|
temperature=float(temperature), |
|
repetition_penalty=float(repetition_penalty), |
|
max_new_tokens=int(max_dec_len), |
|
pad_token_id=tokenizer.eos_token_id, |
|
streamer=streamer, |
|
) |
|
thread = Thread(target=model.generate, kwargs=generation_kwargs) |
|
thread.start() |
|
answer = "" |
|
for new_text in streamer: |
|
answer += new_text |
|
yield answer[len(inputs):] |
|
|
|
@spaces.GPU |
|
def generate(chat_history: List, query, top_k, top_p, temperature, repetition_penalty, max_dec_len, system_message): |
|
"""generate after hitting "submit" button |
|
Args: |
|
chat_history (List): [[q_1, a_1], [q_2, a_2], ..., [q_n, a_n]]. list that stores all QA records |
|
query (str): query of current round |
|
top_p (float): only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for generation. |
|
temperature (float): strictly positive float value used to modulate the logits distribution. |
|
max_dec_len (int): The maximum numbers of tokens to generate. |
|
Yields: |
|
List: [[q_1, a_1], [q_2, a_2], ..., [q_n, a_n], [q_n+1, a_n+1]]. chat_history + QA of current round. |
|
""" |
|
assert query != "", "Input must not be empty!!!" |
|
|
|
model_input = [] |
|
if system_message: |
|
model_input.append({ |
|
"role": "system", |
|
"content": system_message |
|
}) |
|
for q, a in chat_history: |
|
model_input.append({"role": "user", "content": q}) |
|
model_input.append({"role": "assistant", "content": a}) |
|
model_input.append({"role": "user", "content": query}) |
|
|
|
chat_history.append([query, ""]) |
|
for answer in hf_gen(model_input, top_k, top_p, temperature, repetition_penalty, max_dec_len): |
|
|
|
chat_history[-1][1] = answer.strip(tokenizer.eos_token) |
|
yield gr.update(value=""), chat_history |
|
|
|
@spaces.GPU |
|
def regenerate(chat_history: List, top_k, top_p, temperature, repetition_penalty, max_dec_len, system_message): |
|
"""re-generate the answer of last round's query |
|
Args: |
|
chat_history (List): [[q_1, a_1], [q_2, a_2], ..., [q_n, a_n]]. list that stores all QA records |
|
top_p (float): only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for generation. |
|
temperature (float): strictly positive float value used to modulate the logits distribution. |
|
max_dec_len (int): The maximum numbers of tokens to generate. |
|
Yields: |
|
List: [[q_1, a_1], [q_2, a_2], ..., [q_n, a_n]]. chat_history |
|
""" |
|
assert len(chat_history) >= 1, "History is empty. Nothing to regenerate!!" |
|
|
|
model_input = [] |
|
if system_message: |
|
model_input.append({ |
|
"role": "system", |
|
"content": system_message |
|
}) |
|
for q, a in chat_history[:-1]: |
|
model_input.append({"role": "user", "content": q}) |
|
model_input.append({"role": "assistant", "content": a}) |
|
model_input.append({"role": "user", "content": chat_history[-1][0]}) |
|
|
|
for answer in hf_gen(model_input, top_k, top_p, temperature, repetition_penalty, max_dec_len): |
|
|
|
chat_history[-1][1] = answer.strip(tokenizer.eos_token) |
|
yield gr.update(value=""), chat_history |
|
|
|
|
|
def clear_history(): |
|
"""clear all chat history |
|
Returns: |
|
List: empty chat history |
|
""" |
|
torch.cuda.empty_cache() |
|
return [] |
|
|
|
|
|
def reverse_last_round(chat_history): |
|
"""reverse last round QA and keep the chat history before |
|
Args: |
|
chat_history (List): [[q_1, a_1], [q_2, a_2], ..., [q_n, a_n]]. list that stores all QA records |
|
Returns: |
|
List: [[q_1, a_1], [q_2, a_2], ..., [q_n-1, a_n-1]]. chat_history without last round. |
|
""" |
|
assert len(chat_history) >= 1, "History is empty. Nothing to reverse!!" |
|
return chat_history[:-1] |
|
|
|
|
|
with gr.Blocks(theme="soft") as demo: |
|
gr.Markdown("""# Index-1.9B Gradio Demo""") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
top_k = gr.Slider(1, 10, value=5, step=1, label="top_k") |
|
top_p = gr.Slider(0, 1, value=0.8, step=0.1, label="top_p") |
|
temperature = gr.Slider(0.1, 2.0, value=0.3, step=0.1, label="temperature") |
|
repetition_penalty = gr.Slider(0.1, 2.0, value=1.1, step=0.1, label="repetition_penalty") |
|
max_dec_len = gr.Slider(1, 4096, value=1024, step=1, label="max_dec_len") |
|
with gr.Row(): |
|
system_message = gr.Textbox(label="System Message", placeholder="Input your system message", value="你是由哔哩哔哩自主研发的大语言模型,名为“Index”。你能够根据用户传入的信息,帮助用户完成指定的任务,并生成恰当的、符合要求的回复。") |
|
with gr.Column(scale=10): |
|
chatbot = gr.Chatbot(bubble_full_width=False, height=500, label='Index-1.9B') |
|
user_input = gr.Textbox(label="User", placeholder="Input your query here!", lines=8) |
|
with gr.Row(): |
|
submit = gr.Button("🚀 Submit") |
|
clear = gr.Button("🧹 Clear") |
|
regen = gr.Button("🔄 Regenerate") |
|
reverse = gr.Button("⬅️ Reverse") |
|
|
|
submit.click(generate, inputs=[chatbot, user_input, top_k, top_p, temperature, repetition_penalty, max_dec_len, system_message], |
|
outputs=[user_input, chatbot]) |
|
regen.click(regenerate, inputs=[chatbot, top_k, top_p, temperature, repetition_penalty, max_dec_len, system_message], |
|
outputs=[user_input, chatbot]) |
|
clear.click(clear_history, inputs=[], outputs=[chatbot]) |
|
reverse.click(reverse_last_round, inputs=[chatbot], outputs=[chatbot]) |
|
|
|
demo.queue().launch() |