|
import gradio as gr |
|
import torch |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, StoppingCriteria, StoppingCriteriaList |
|
import time |
|
import numpy as np |
|
from torch.nn import functional as F |
|
|
|
|
|
# Load the causal-LM checkpoint in half precision, then move it onto the GPU.
m = AutoModelForCausalLM.from_pretrained(
    "/mnt/nvme/home/dakota/ckpts/stablelm/7B-sft-combined/checkpoint-8000",
    torch_dtype=torch.float16,
)
m = m.cuda()

# The tokenizer is loaded from its own directory, separate from the checkpoint.
tok = AutoTokenizer.from_pretrained("/mnt/nvme/home/dakota/stablelm_tokenizer")

# Text-generation pipeline on device 0, built from the model and tokenizer above.
generator = pipeline(
    task="text-generation",
    model=m,
    tokenizer=tok,
    device=0,
)
|
|
|
|
|
# Default system prompt; it pre-fills the "System Message" textbox in the UI.
# NOTE(review): the trailing " |" markers and the lone "|" lines look like
# extraction artifacts rather than intended prompt text -- confirm against the
# original file before relying on this string.
start_message = """<|SYSTEM|># StableAssistant |
|
- StableAssistant is A helpful and harmless Open Source AI Language Model developed by Stability and CarperAI. |
|
- StableAssistant is excited to be able to help the user, but will refuse to do anything that could be considered harmful to the user. |
|
- StableAssistant is more than just an information source, StableAssistant is also able to write poetry, short stories, and make jokes. |
|
- StableAssistant will refuse to participate in anything that could harm a human.""" |
|
|
|
|
|
class StopOnTokens(StoppingCriteria):
    """Stopping criterion that ends generation on a fixed set of token ids.

    Only the first sequence of the batch is inspected.
    """

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True when the newest token of sequence 0 is one of the stop ids."""
        # NOTE(review): these ids are presumably the tokenizer's special/control
        # tokens -- confirm against the tokenizer vocabulary.
        newest_token = input_ids[0][-1]
        return any(newest_token == stop_id for stop_id in (50278, 50279, 50277, 1, 0))
|
|
|
|
|
def contrastive_generate(text, bad_text):
    """Sample a continuation of ``text`` while penalising what ``bad_text`` predicts.

    Two decoding streams run in lockstep through the module-level model ``m``,
    one conditioned on ``text`` and one on ``bad_text``, each with its own
    key/value cache. At every step, for each token whose probability under the
    ``text`` stream exceeds 0.1, the ``bad_text`` stream's log-probability is
    subtracted from the ``text`` stream's log-probability. The adjusted scores
    are renormalised and one token is sampled; that same token is fed to both
    streams as their next input.

    Args:
        text: Prompt whose continuation is wanted.
        bad_text: Prompt whose likely continuations should be down-weighted.

    Returns:
        The decoded string of the sampled tokens. The prompt and the stop
        token that ended generation are not included.
    """
    # NOTE(review): presumably the tokenizer's special/control tokens -- confirm.
    stop_ids = frozenset((50278, 50279, 50277, 1, 0))
    max_new_tokens = 1024
    # Keeps only the FIRST (4096 - 1024) prompt tokens, as the original did.
    # NOTE(review): 4096 is presumably the model's context window -- confirm.
    max_prompt_tokens = 4096 - max_new_tokens

    with torch.no_grad():
        tokens = tok(text, return_tensors="pt")["input_ids"].cuda()[:, :max_prompt_tokens]
        bad_tokens = tok(bad_text, return_tensors="pt")["input_ids"].cuda()[:, :max_prompt_tokens]
        history = None
        bad_history = None
        curr_output = []

        for _ in range(max_new_tokens):
            good_out = m(tokens, past_key_values=history, use_cache=True)
            history = good_out.past_key_values
            bad_out = m(bad_tokens, past_key_values=bad_history, use_cache=True)
            bad_history = bad_out.past_key_values

            # Only the last position is needed, so select it before normalising
            # instead of running softmax over every prompt position.
            # log_softmax avoids the -inf that log(softmax(x)) yields on underflow.
            log_probs = F.log_softmax(good_out.logits[0, -1].float(), dim=-1).cpu()
            bad_log_probs = F.log_softmax(bad_out.logits[0, -1].float(), dim=-1).cpu()

            # Apply the contrastive penalty only where the main stream is confident.
            confident = log_probs.exp() > 0.1
            adjusted = torch.where(confident, log_probs - bad_log_probs, log_probs)

            sampling_probs = F.softmax(adjusted, dim=-1)
            next_id = int(torch.multinomial(sampling_probs, 1))

            if next_id in stop_ids:
                break
            curr_output.append(next_id)

            # With the caches in place, both streams only need the new token,
            # shaped (1, 1).
            next_input = torch.tensor([[next_id]], device=tokens.device)
            tokens = next_input
            bad_tokens = next_input

    return tok.decode(curr_output)
|
|
|
def generate(text, bad_text=None):
    """Sample a completion for ``text`` with the module-level ``generator`` pipeline.

    Sampling uses temperature 1.0, top-p 0.95 and top-k 1000, produces at most
    1024 new tokens, and stops early when ``StopOnTokens`` fires.

    Args:
        text: The full prompt to complete.
        bad_text: Unused; kept so existing callers that pass it still work.

    Returns:
        The generated text with the leading copy of the prompt removed.
    """
    stop = StopOnTokens()
    result = generator(
        text,
        max_new_tokens=1024,
        num_return_sequences=1,
        num_beams=1,
        do_sample=True,
        temperature=1.0,
        top_p=0.95,
        top_k=1000,
        stopping_criteria=StoppingCriteriaList([stop]),
    )
    generated = result[0]["generated_text"]
    # Strip only the leading prompt echo. A blanket str.replace would also
    # delete any later occurrence of the prompt inside the completion itself.
    if generated.startswith(text):
        return generated[len(text):]
    return generated
|
|
|
|
|
def user(user_message, history):
    """Append the submitted message to the chat as a turn with an empty reply.

    Args:
        user_message: Text the user just submitted.
        history: List of ``[user_text, bot_text]`` pairs; it is not modified.

    Returns:
        A tuple ``("", new_history)``, where ``new_history`` is a new list
        ending with ``[user_message, ""]``.
    """
    pending_turn = [user_message, ""]
    extended_history = history + [pending_turn]
    return "", extended_history
|
|
|
|
|
def bot(history, curr_system_message):
    """Generate the assistant's reply for the newest turn in ``history``.

    The prompt is the system message followed by every turn rendered as
    ``<|USER|>`` + user text and ``<|ASSISTANT|>`` + bot text, in order.

    Args:
        history: List of ``[user_text, bot_text]`` pairs; the last pair's
            reply slot is overwritten in place with the generated text.
        curr_system_message: System prompt placed before the turns.

    Returns:
        The same ``history`` list, with the last reply filled in.
    """
    turn_parts = []
    for turn in history:
        turn_parts.append("<|USER|>" + turn[0])
        turn_parts.append("<|ASSISTANT|>" + turn[1])
    messages = curr_system_message + "".join(turn_parts)

    history[-1][1] = generate(messages)
    # Fixed one-second pause before handing the updated history back.
    time.sleep(1)
    return history
|
|
|
|
|
def system_update(msg):
    """Store ``msg`` in the module-level ``curr_system_message`` variable.

    Returns nothing; the only effect is the rebinding of that global.
    """
    global curr_system_message
    curr_system_message = msg
    return None
|
|
|
|
|
# Chat UI: one row with two columns -- the chatbot and a Clear button in the
# first, the editable system prompt and the chat input in the second.
# NOTE(review): the source's indentation was lost; this nesting (both textboxes
# in the second column, listeners inside Blocks, launch outside) is assumed.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot([])
            clear = gr.Button("Clear")
        with gr.Column():
            system_msg = gr.Textbox(start_message, label="System Message", interactive=True)
            msg = gr.Textbox(label="Chat Message")

    # On submit: `user` records the message and empties the input box, then
    # `bot` fills in the reply using the current system message.
    msg.submit(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=[chatbot, system_msg],
        outputs=chatbot,
    )

    # Pass each system-prompt edit to `system_update`.
    system_msg.change(fn=system_update, inputs=system_msg, outputs=None, queue=False)

    # Clear resets the chatbot by sending it None.
    clear.click(fn=lambda: None, inputs=None, outputs=chatbot, queue=False)

demo.launch(share=True)