File size: 4,999 Bytes
c7c98aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, StoppingCriteria, StoppingCriteriaList
import time
import numpy as np
from torch.nn import functional as F
import os
# auth_key = os.environ["HF_ACCESS_TOKEN"]
# Load the StableLM checkpoint in half precision and move it to the GPU, load
# the matching tokenizer, and wrap both in a text-generation pipeline bound to
# CUDA device 0. The two prints bracket the load in the process log.
print("Starting to load the model to memory")
m = AutoModelForCausalLM.from_pretrained(
    "stabilityai/stablelm-tuned-alpha-7b",
    torch_dtype=torch.float16,
).cuda()
tok = AutoTokenizer.from_pretrained("stabilityai/stablelm-tuned-alpha-7b")
generator = pipeline(
    "text-generation",
    model=m,
    tokenizer=tok,
    device=0,
)
print("Sucessfully loaded the model to the memory")

# System prompt for the assistant. It is the value of the hidden "System
# Message" textbox and is prepended verbatim to every prompt built by `bot`,
# so the exact text below is part of runtime behavior.
start_message = """<|SYSTEM|># StableAssistant
- StableAssistant is A helpful and harmless Open Source AI Language Model developed by Stability and CarperAI.
- StableAssistant is excited to be able to help the user, but will refuse to do anything that could be considered harmful to the user.
- StableAssistant is more than just an information source, StableAssistant is also able to write poetry, short stories, and make jokes.
- StableAssistant will refuse to participate in anything that could harm a human."""


class StopOnTokens(StoppingCriteria):
    """Stopping criterion that ends generation on a fixed set of token ids."""

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True when the newest token of the first sequence is a stop id.

        Only row 0 of ``input_ids`` is inspected; ``scores`` and any extra
        keyword arguments are ignored.
        """
        # NOTE(review): these ids are presumably the tokenizer's special /
        # end-of-turn tokens -- confirm against the tokenizer vocabulary.
        newest_token = input_ids[0][-1]
        return any(
            newest_token == candidate
            for candidate in (50278, 50279, 50277, 1, 0)
        )


def contrastive_generate(text, bad_text):
    """Sample a continuation of ``text`` that is pushed away from ``bad_text``.

    The model is run in lock-step on two contexts: the real prompt and a
    "bad" prompt. At each step, for every token the real context assigns a
    probability above 0.1, the bad context's log-probability is subtracted
    from the real one; the adjusted scores are re-normalised and one token
    is sampled. The sampled token is appended to both contexts.

    Args:
        text: Prompt whose continuation is wanted.
        bad_text: Prompt whose likely continuations should be down-weighted.

    Returns:
        The decoded string of sampled tokens (the prompt is not included).
        Sampling stops after 1024 tokens or at the first stop id, which is
        not added to the output.
    """
    # Same id set that StopOnTokens checks for.
    stop_ids = (50278, 50279, 50277, 1, 0)
    with torch.no_grad():
        # Keep at most the first 4096 - 1024 prompt tokens.
        # NOTE(review): presumably this reserves room for the 1024 generated
        # tokens inside a 4096-token context window -- confirm.
        tokens = tok(text, return_tensors="pt")[
            'input_ids'].cuda()[:, :4096 - 1024]
        bad_tokens = tok(bad_text, return_tensors="pt")[
            'input_ids'].cuda()[:, :4096 - 1024]
        history = None
        bad_history = None
        curr_output = []
        for _ in range(1024):
            # After the first step only the newly sampled token is fed in;
            # earlier context is carried by the cached key/values.
            out = m(tokens, past_key_values=history, use_cache=True)
            history = out.past_key_values
            bad_out = m(bad_tokens, past_key_values=bad_history,
                        use_cache=True)
            bad_history = bad_out.past_key_values

            # log_softmax keeps every entry finite. Taking log(softmax(x))
            # can underflow to -inf, and subtracting -inf below would yield
            # +inf and then NaN probabilities.
            log_probs = F.log_softmax(
                out.logits[0, -1].float(), dim=-1).cpu()
            bad_log_probs = F.log_softmax(
                bad_out.logits[0, -1].float(), dim=-1).cpu()

            # Only tokens the real context already favours are penalised.
            confident = log_probs.exp() > 0.1
            log_probs[confident] = (
                log_probs[confident] - bad_log_probs[confident])

            probs = F.softmax(log_probs, dim=-1)
            next_id = int(torch.multinomial(probs, 1))
            if next_id in stop_ids:
                break
            curr_output.append(next_id)

            # Shape (1, 1) int64 tensor: batch of one, one new token.
            next_token = torch.tensor([[next_id]], device=tokens.device)
            tokens = next_token
            bad_tokens = next_token
        return tok.decode(curr_output)


def generate(text, bad_text=None):
    """Generate a sampled continuation of ``text`` with the shared pipeline.

    Args:
        text: Full prompt passed to the text-generation pipeline.
        bad_text: Accepted for interface compatibility; not used here.

    Returns:
        The generated text with the prompt removed.
    """
    stop = StopOnTokens()
    result = generator(
        text,
        max_new_tokens=1024,
        num_return_sequences=1,
        num_beams=1,
        do_sample=True,
        temperature=1.0,
        top_p=0.95,
        top_k=1000,
        stopping_criteria=StoppingCriteriaList([stop]),
    )
    full_text = result[0]["generated_text"]
    # Strip only the leading prompt. A blanket str.replace would also delete
    # any later repetition of the prompt inside the model's own reply.
    if full_text.startswith(text):
        return full_text[len(text):]
    # The output did not begin with the prompt: keep the previous behaviour.
    return full_text.replace(text, "")


def user(user_message, history):
    """Record a new user turn with an empty assistant reply.

    Args:
        user_message: Text the user just submitted.
        history: Existing list of ``[user_text, assistant_text]`` pairs; it
            is not modified.

    Returns:
        A 3-tuple ``("", extended, extended)``: an empty string followed by
        the extended conversation list, returned twice as the same object.
    """
    pending_turn = [user_message, ""]
    extended = history + [pending_turn]
    return ("", extended, extended)


def bot(history, curr_system_message):
    """Generate the assistant reply for the latest turn in ``history``.

    The prompt is the system message followed by every turn rendered as
    ``<|USER|>`` + user text + ``<|ASSISTANT|>`` + assistant text. The reply
    is written into the last turn of ``history`` in place.

    Args:
        history: List of ``[user_text, assistant_text]`` pairs; the last
            pair's assistant slot is overwritten with the generated reply.
        curr_system_message: Text placed before the rendered turns.

    Returns:
        ``(history, history)`` -- the same mutated list twice.
    """
    rendered_turns = []
    for exchange in history:
        rendered_turns.append("<|USER|>" + exchange[0])
        rendered_turns.append("<|ASSISTANT|>" + exchange[1])
    prompt = curr_system_message + "".join(rendered_turns)

    reply = generate(prompt)
    history[-1][1] = reply
    time.sleep(1)
    return history, history


# Build the chat UI: a chatbot pane, a message box with Submit / Clear
# buttons, and a hidden textbox carrying the system prompt.
with gr.Blocks() as demo:
    # Conversation state handed between the callbacks as a list of
    # [user_text, assistant_text] pairs.
    history = gr.State([])
    gr.Markdown("## StableLM-Tuned-Alpha-7b Chat")
    gr.HTML('''<center><a href="https://huggingface.co/spaces/stabilityai/stablelm-tuned-alpha-chat?duplicate=true"><img src="https://bit.ly/3gLdBN6" alt="Duplicate Space"></a>Duplicate the Space to skip the queue and run in a private space</center>''')
    chatbot = gr.Chatbot().style(height=500)
    with gr.Row():
        with gr.Column(scale=0.70):
            msg = gr.Textbox(label="", placeholder="Chat Message Box")
        with gr.Column(scale=0.30, min_width=0):
            with gr.Row():
                submit = gr.Button("Submit")
                clear = gr.Button("Clear")
    system_msg = gr.Textbox(
        start_message, label="System Message", interactive=False, visible=False)

    # Pressing Enter in the textbox and clicking Submit run the same chain:
    # first `user` (unqueued), then `bot` (queued).
    for trigger in (msg.submit, submit.click):
        trigger(
            fn=user,
            inputs=[msg, history],
            outputs=[msg, chatbot, history],
            queue=False,
        ).then(
            fn=bot,
            inputs=[chatbot, system_msg],
            outputs=[chatbot, history],
            queue=True,
        )

    # Clear resets the chatbot pane to None and the stored history to [].
    clear.click(lambda: [None, []], None, [chatbot, history], queue=False)

demo.queue(concurrency_count=5)
demo.launch()