Spaces:
Sleeping
Sleeping
import gradio as gr | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from einops import einsum | |
from tqdm import tqdm | |
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Checkpoint identifier shared by the model and its tokenizer.
model_name = 'microsoft/Phi-3-mini-4k-instruct'

# The whole model is placed on `device`; the dtype choice is delegated to the
# library via "auto".  trust_remote_code=True lets the checkpoint repository
# supply its own model code, so only use checkpoints you trust.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    trust_remote_code=True,
    torch_dtype="auto",
    device_map=device,
)

tokenizer = AutoTokenizer.from_pretrained(model_name)
def tokenize_instructions(tokenizer, instructions):
    """
    Render chat conversations with the tokenizer's chat template and return
    only the resulting token ids.

    :param tokenizer: object exposing ``apply_chat_template``
    :param instructions: the conversations to encode, passed through unchanged
    :return: the ``input_ids`` attribute of the encoded batch (PyTorch tensors
        are requested via ``return_tensors="pt"``); any other fields of the
        encoded batch are discarded
    """
    template_options = {
        "padding": True,                # pad the batch to a common length
        "truncation": False,            # never cut prompts short
        "return_tensors": "pt",
        "return_dict": True,
        "add_generation_prompt": True,  # end each prompt ready for the reply
    }
    encoded = tokenizer.apply_chat_template(instructions, **template_options)
    return encoded.input_ids
def _mean_last_token_states(model, toks, batch_size):
    """Return per-layer float32 means of the last-position hidden state, plus each layer's original dtype."""
    device = model.device
    num_samples = toks.shape[0]
    totals = []
    dtypes = []
    # Only activations are needed, so skip building autograd graphs.
    with torch.no_grad():
        for start in tqdm(range(0, num_samples, batch_size)):
            batch = toks[start:start + batch_size].to(device)
            # One hidden-state tensor [batch, seq_len, hidden_size] per entry.
            hidden_states = model(batch, output_hidden_states=True).hidden_states
            # Sum (not mean) per batch so a short final batch is weighted by
            # its true size; float32 keeps the accumulation precise.
            batch_sums = [
                h[:, -1, :].detach().float().sum(dim=0).cpu()
                for h in hidden_states
            ]
            if not totals:
                totals = batch_sums
                dtypes = [h.dtype for h in hidden_states]
            else:
                for total, batch_sum in zip(totals, batch_sums):
                    total += batch_sum
    return [total / num_samples for total in totals], dtypes


def find_steering_vecs(model, base_toks, target_toks, batch_size=16):
    """
    Find the steering vectors pointing from base_toks to target_toks
    (mean target activation minus mean base activation), one per entry of the
    model's ``hidden_states`` output.

    Only the activation at the last sequence position of every sample is used.
    Because the two means are computed separately, the two inputs may contain
    different numbers of samples; with equal counts the result is the mean of
    the per-sample differences.

    Inputs:
    :param model: the model to use; called with ``output_hidden_states=True``
    :param base_toks: the base tokens [len, seq_len]
    :param target_toks: the target tokens [len, seq_len]
    :param batch_size: number of samples per forward pass
    Output:
    :return steering_vecs: dict mapping each index of the ``hidden_states``
        tuple to a CPU tensor [hidden_size] in the dtype of that hidden state;
        empty if either input has no samples
    """
    # NOTE(review): no attention mask is passed to the model, so padded
    # positions are presumably attended to like real tokens -- confirm this is
    # acceptable for the tokenizer's padding scheme.
    base_means, _ = _mean_last_token_states(model, base_toks, batch_size)
    target_means, dtypes = _mean_last_token_states(model, target_toks, batch_size)
    return {
        layer: (target_mean - base_mean).to(dtype)
        for layer, (base_mean, target_mean, dtype)
        in enumerate(zip(base_means, target_means, dtypes))
    }
def do_steering(model, test_toks, steering_vec, scale=1, normalise=True, layer=None, proj=True, batch_size=16):
    """
    Generate from the model, optionally editing the input of decoder layers
    with a steering vector while generation runs.

    When ``steering_vec`` is given, a forward pre-hook is registered on the
    selected entries of ``model.model.layers``.  The hook rewrites the layer's
    first positional input in place as ``x - scale * sv``, i.e. a positive
    ``scale`` moves the activations *away* from the vector's direction.

    Input:
    :param model: the model to use
    :param test_toks: the test tokens [len, seq_len]
    :param steering_vec: the steering vector [hidden_size], or None to
        generate without any intervention
    :param scale: the scale to use
    :param normalise: whether to divide the steering vector by its norm first
    :param layer: the layer to modify; if None: we modify all layers.  An index
        that matches no layer registers no hook.
    :param proj: whether to project the steering vector: if True, ``sv`` is the
        component of the activations along the vector (a true projection when
        ``normalise`` is True); if False, the vector itself is subtracted
    :param batch_size: number of prompts per ``generate`` call
    Output:
    :return output: the steered model output [len, generated_seq_len]
    """
    handles = []
    if steering_vec is not None:
        # The direction is constant for the whole call, so compute it once
        # instead of on every hook invocation.
        direction = steering_vec / steering_vec.norm() if normalise else steering_vec

        def hook(module, args):
            hidden = args[0]
            if proj:
                # [b, l, 1] coefficient along the direction, broadcast back to [b, l, h].
                delta = einsum(hidden, direction.view(-1, 1), 'b l h, h s -> b l s') * direction
            else:
                delta = direction
            # In-place write so the layer sees the edited activations.
            hidden[:, :, :] = hidden - scale * delta

        handles = [
            block.register_forward_pre_hook(hook)
            for index, block in enumerate(model.model.layers)
            if layer is None or index == layer
        ]

    try:
        # pass through the model
        outs_all = [
            model.generate(test_toks[start:start + batch_size], max_new_tokens=60)  # [num_samples, seq_len]
            for start in tqdm(range(0, test_toks.shape[0], batch_size))
        ]
    finally:
        # Always detach the hooks: the model is shared, and a hook left behind
        # after an exception would silently steer every later call.
        for handle in handles:
            handle.remove()

    # NOTE(review): assumes every batch produced sequences of the same length;
    # torch.cat raises otherwise.
    return torch.cat(outs_all, dim=0)
def create_steering_vector(towards, away):
    """
    Build steering vectors from two comma-separated phrase lists.

    Each phrase is stripped of surrounding whitespace and wrapped as a
    single-turn user conversation, tokenized with the module-level tokenizer,
    and handed to ``find_steering_vecs`` with the "away" prompts as the base
    and the "towards" prompts as the target.

    :param towards: comma-separated phrases to steer towards
    :param away: comma-separated phrases to steer away from
    :return: whatever ``find_steering_vecs`` returns for the two prompt sets
    """
    def as_conversations(csv_text):
        # One single-message conversation per comma-separated entry.
        return [
            [{"role": "user", "content": phrase.strip()}]
            for phrase in csv_text.split(',')
        ]

    towards_conversations = as_conversations(towards)
    away_conversations = as_conversations(away)

    target_tokens = tokenize_instructions(tokenizer, towards_conversations)
    base_tokens = tokenize_instructions(tokenizer, away_conversations)
    return find_steering_vecs(model, base_tokens, target_tokens)
def chat(message, history, towards, away, layer_value):
    """
    Answer one chat message twice: once unmodified and once with steering.

    The steering vectors are rebuilt from ``towards`` / ``away`` on every
    call, and the vector for ``layer_value`` is applied at that same layer
    with ``scale=3``.

    :param message: the user's message; used as the only turn of the prompt
    :param history: the chatbot's current value; not used
    :param towards: comma-separated phrases to steer towards
    :param away: comma-separated phrases to steer away from
    :param layer_value: layer index (converted with ``int``)
    :return: a one-element list holding ``(message, response)``, where
        ``response`` is the baseline text followed by the intervention text
    """
    def labelled(label, generations):
        # Decode every generated row; the last one decoded is the one kept.
        text = None
        for row in generations:
            text = f"{label}: {tokenizer.decode(row, skip_special_tokens=True)}"
        return text

    steering_vec = create_steering_vector(towards, away)
    layer = int(layer_value)
    prompt = [{"role": "user", "content": message}]

    print(f"layer {layer}")
    print(f"steering vec {steering_vec}")
    print(f"steering vec chosen {steering_vec[layer]}")

    input_ids = tokenize_instructions(tokenizer, [prompt])

    # Unsteered generation for comparison.
    baseline = do_steering(model, input_ids.to(device), None)
    response_baseline = labelled("BASELINE", baseline)

    # steering_vec was already indexed above, so it cannot be None here.
    steered = do_steering(
        model,
        input_ids.to(device),
        steering_vec[layer].to(device),
        scale=3,
        layer=layer,
    )
    response_intervention = labelled("INTERVENTION", steered)

    response = response_baseline + "\n\n" + response_intervention
    return [(message, response)]
def launch_app():
    """
    Build the Gradio interface for the steering chatbot and launch it.

    The page shows the usage instructions, the "Towards" / "Away from" text
    boxes, a button that precomputes steering vectors, a layer slider, and a
    chatbot whose message box is wired to ``chat``.

    The button handler stores its results by returning them into the
    ``gr.State`` components.  Assigning to ``State.value`` inside a handler
    would only alter the component's shared default rather than the current
    session's state.  Note that ``chat`` receives the text boxes and the
    slider directly, so it does not read these stored values.
    """
    with gr.Blocks() as demo:
        steering_vec = gr.State(None)
        layer = gr.State(6)
        away_default = ['hate','i hate this', 'hating the', 'hater', 'hating', 'hated in']
        towards_default = ['love','i love this', 'loving the', 'lover', 'loving', 'loved in']
        # The Markdown text is kept at column 0 so its content matches the
        # rendered document exactly, independent of code indentation.
        instructions = """
### Instructions for Using the Steering Chatbot
Welcome to the Steering Chatbot! This app allows you to explore how language models can be guided (or "steered")
to generate different types of responses. You will be able to create **steering vectors** that influence the chatbot to either generate responses
that favor one set of ideas (like "love") or avoid another set (like "hate").
#### How to Use the App:
1. **Define Your "Towards" and "Away" Directions:**
- In the **"Towards"** text box, enter a list of concepts, words, or phrases (comma-separated) that you want the model to generate responses toward.
For example, you might use: `love, happiness, kindness`.
- In the **"Away"** text box, enter a list of concepts, words, or phrases that you want the model to steer away from.
For example: `hate, anger, sadness`.
2. **Create a Steering Vector:**
- Click the **"Create Steering Vector"** button to generate a vector that will nudge the model’s responses.
This vector will attempt to shift the model’s behavior towards the concepts in the "Towards" box and away from the concepts in the "Away" box.
- You can also adjust the **layer slider** to choose which layer of the model the steering vector will affect.
- make sure you have equal examples of towards & away or the app will throw an error
3. **Chat with the Model:**
- Type a message in the chatbox and press Enter. The model will generate two responses:
- **Baseline Response:** This is the model’s response without any steering vector applied.
- **Intervention Response:** This is the response after applying the steering vector.
4. **Compare Results:**
- The chatbot will show both the baseline (non-steered) and the intervention (steered) responses.
You can compare them to see how much influence the steering vector had on the generated text.
**Tips:**
- Try experimenting with different word sets for "Towards" and "Away" to see how it affects the chatbot's behavior.
- Adjusting the **layer slider** allows you to control at which stage of the model's processing the steering vector is applied,
which can lead to different types of modifications in the output.
Happy chatting!
"""
        instruction_dropdown = gr.Markdown(instructions)

        with gr.Row():
            towards = gr.Textbox(label="Towards (comma-separated)", value=", ".join(sentence.replace(",", "") for sentence in towards_default))
            away = gr.Textbox(label="Away from (comma-separated)", value=", ".join(sentence.replace(",", "") for sentence in away_default))

        with gr.Row():
            create_vector = gr.Button("Create Steering Vector")
            layer_slider = gr.Slider(minimum=1, maximum=len(model.model.layers)-1, step=1, label="Layer")

        def create_vector_and_set_layer(towards, away, layer_value):
            # Return the new state values instead of mutating the State
            # components, so Gradio records them for the current session.
            vectors = create_steering_vector(towards, away)
            chosen_layer = int(layer_value)
            print(f"layer {chosen_layer}")
            return vectors, chosen_layer, f"Steering vector created for layer {layer_value}"

        # Status box that displays the handler's confirmation message.
        status = gr.Textbox()
        create_vector.click(
            create_vector_and_set_layer,
            [towards, away, layer_slider],
            [steering_vec, layer, status],
        )

        chatbot = gr.Chatbot()
        msg = gr.Textbox()
        # Pressing Enter in the message box runs `chat` and replaces the
        # chatbot's contents with its return value.
        msg.submit(chat, [msg, chatbot, towards, away, layer_slider], chatbot)

    demo.launch()
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    launch_app()
# TODO: the steering vector is being generated correctly, but it does not appear to affect generation -- investigate why it is not passing through.