Spaces:
Sleeping
Sleeping
import gradio as gr | |
from transformers.generation.stopping_criteria import StoppingCriteria, StoppingCriteriaList | |
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig | |
from peft import PeftModel | |
import torch | |
import pinecone | |
from sentence_transformers import SentenceTransformer | |
from tqdm import tqdm | |
from sentence_transformers.cross_encoder import CrossEncoder | |
import numpy as np | |
from torch import nn | |
import os | |
# Set up semantic search | |
PINECONE_API_KEY = os.environ.get('PINECONE_API_KEY') | |
def get_embedding(text): | |
embed_text = sentencetransformer_model.encode(text) | |
vector_text = embed_text.tolist() | |
return vector_text | |
def query_from_pinecone(query, top_k=3): | |
# get embedding from THE SAME embedder as the documents | |
query_embedding = get_embedding(query) | |
return index.query( | |
vector=query_embedding, | |
top_k=top_k, | |
include_metadata=True # gets the metadata (dates, text, etc) | |
).get('matches') | |
def get_results_from_pinecone(query, top_k=3, re_rank=True, verbose=True): | |
results_from_pinecone = query_from_pinecone(query, top_k=top_k) | |
if not results_from_pinecone: | |
return [] | |
if verbose: | |
print("Query:", query) | |
final_results = [] | |
if re_rank: | |
if verbose: | |
print('Document ID (Hash)\t\tRetrieval Score\tCE Score\tText') | |
sentence_combinations = [[query, result_from_pinecone['metadata']['text']] for result_from_pinecone in results_from_pinecone] | |
# Compute the similarity scores for these combinations | |
similarity_scores = cross_encoder.predict(sentence_combinations, activation_fct=nn.Sigmoid()) | |
# Sort the scores in decreasing order | |
sim_scores_argsort = reversed(np.argsort(similarity_scores)) | |
# Print the scores | |
for idx in sim_scores_argsort: | |
result_from_pinecone = results_from_pinecone[idx] | |
final_results.append(result_from_pinecone) | |
if verbose: | |
print(f"{result_from_pinecone['id']}\t{result_from_pinecone['score']:.2f}\t{similarity_scores[idx]:.2f}\t{result_from_pinecone['metadata']['text'][:50]}") | |
return final_results | |
if verbose: | |
print('Document ID (Hash)\t\tRetrieval Score\tText') | |
for result_from_pinecone in results_from_pinecone: | |
final_results.append(result_from_pinecone) | |
if verbose: | |
print(f"{result_from_pinecone['id']}\t{result_from_pinecone['score']:.2f}\t{result_from_pinecone['metadata']['text'][:50]}") | |
return final_results | |
def semantic_search(prompt): | |
final_results = get_results_from_pinecone(prompt, top_k=9, re_rank=True, verbose=True) | |
return '\n\n'.join(res['metadata']['text'].strip() for res in final_results[:3]) | |
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2') | |
sentencetransformer_model = SentenceTransformer('sentence-transformers/multi-qa-mpnet-base-cos-v1') | |
pinecone_key = PINECONE_API_KEY | |
INDEX_NAME = 'k8s-semantic-search' | |
NAMESPACE = 'default' | |
pinecone.init(api_key=pinecone_key, environment="gcp-starter") | |
if not INDEX_NAME in pinecone.list_indexes(): | |
pinecone.create_index( | |
INDEX_NAME, # The name of the index | |
dimension=768, # The dimensionality of the vectors | |
metric='cosine', # The similarity metric to use when searching the index | |
pod_type='starter' # The type of Pinecone pod | |
) | |
index = pinecone.Index(INDEX_NAME) | |
# Set up mistral model | |
base_model_id = 'mistralai/Mistral-7B-Instruct-v0.1' | |
lora_model_id = 'ComponentSoft/mistral-kubectl-instruct' | |
tokenizer = AutoTokenizer.from_pretrained( | |
lora_model_id, | |
padding_side="left", | |
add_eos_token=False, | |
add_bos_token=True, | |
) | |
tokenizer.pad_token = tokenizer.eos_token | |
bnb_config = BitsAndBytesConfig( | |
load_in_4bit=True, | |
bnb_4bit_use_double_quant=True, | |
bnb_4bit_quant_type="nf4", | |
bnb_4bit_compute_dtype=torch.bfloat16 | |
) | |
base_model = AutoModelForCausalLM.from_pretrained( | |
base_model_id, | |
quantization_config=bnb_config, | |
use_cache=True, | |
trust_remote_code=True, | |
) | |
model = PeftModel.from_pretrained(base_model, lora_model_id) | |
model.eval() | |
stop_terms=["</s>", "#End"] | |
eos_token_ids_custom = [torch.tensor(tokenizer.encode(term, add_special_tokens=False)).to("cuda") for term in stop_terms] | |
category_terms=["</s>", "\n"] | |
category_eos_token_ids_custom = [torch.tensor(tokenizer.encode(term, add_special_tokens=False)).to("cuda") for term in category_terms] | |
class EvalStopCriterion(StoppingCriteria): | |
def __call__(self, input_ids: torch.LongTensor, score: torch.FloatTensor, **kwargs): | |
return any(torch.equal(e, input_ids[0][-len(e):]) for e in eos_token_ids_custom) | |
class CategoryStopCriterion(StoppingCriteria): | |
def __call__(self, input_ids: torch.LongTensor, score: torch.FloatTensor, **kwargs): | |
return any(torch.equal(e, input_ids[0][-len(e):]) for e in category_eos_token_ids_custom) | |
start_template = '### Answer:' | |
command_template = '# Command:' | |
end_template = '#End' | |
def text_to_text_generation(prompt): | |
prompt = prompt.strip() | |
'' | |
is_kubectl_prompt = ( | |
f"[INST] You are a helpful assistant who classifies prompts into three categories. Respond with 0 if it pertains to a 'kubectl' operation. This is an instruction that can be answered with a 'kubectl' action. Look for keywords like 'get', 'list', 'create', 'show', 'view', and other command-like words. This category is an instruction instead of a question. Respond with 1 only if the prompt is a question, and is about a definition related to Kubernetes, or non-action inquiries. Respond with 2 every other scenario, for example if the question is a general question, not related to Kubernetes or 'kubectl'.\n" | |
f"So for instance the following:\n" | |
f"List all pods in Kubernetes\n" | |
f"Would get a response:\n" | |
f"0 [/INST]" | |
f'text: "{prompt}"' | |
f'response (0/1/2): ' | |
) | |
model_input = tokenizer(is_kubectl_prompt, return_tensors="pt").to("cuda") | |
with torch.no_grad(): | |
response = tokenizer.decode(model.generate(**model_input, max_new_tokens=8, pad_token_id=tokenizer.eos_token_id, repetition_penalty=1.15, stopping_criteria=StoppingCriteriaList([CategoryStopCriterion()]))[0], skip_special_tokens=True) | |
response = response[len(is_kubectl_prompt):] | |
print('-----------------------------QUERY START-----------------------------') | |
print('Prompt: ' + prompt) | |
print('Classified as: ' + response) | |
response_num = 2 # Default to generic question | |
if '0' in response: | |
response_num = 0 | |
elif '1' in response: | |
response_num = 1 | |
# Check if general question | |
if response_num == 0: | |
prompt = f'[INST] {prompt}\n Lets think step by step. [/INST] {start_template}' | |
elif response_num == 1: | |
retrieved_results = semantic_search(prompt) | |
print('Query:') | |
print(f'[INST] You are an assistant who summarizes results retrieved from a book about Kubernetes. This summary should answer the question. If the answer is not in the retrieved results, use your general knowledge. [/INST] Question: {prompt}\nRetrieved results:\n{retrieved_results}\nResponse:') | |
prompt = f'[INST] You are an assistant who summarizes results retrieved from a book about Kubernetes. This summary should answer the question. If the answer is not in the retrieved results, use your general knowledge. [/INST] Question: {prompt}\nRetrieved results:\n{retrieved_results}\nResponse:' | |
else: | |
prompt = f'[INST] {prompt} [/INST]' | |
# Generate output | |
model_input = tokenizer(prompt, return_tensors="pt").to("cuda") | |
with torch.no_grad(): | |
response = tokenizer.decode(model.generate(**model_input, max_new_tokens=256, pad_token_id=tokenizer.eos_token_id, repetition_penalty=1.15, stopping_criteria=StoppingCriteriaList([EvalStopCriterion()]))[0], skip_special_tokens=True) | |
# Get the relevalt parts | |
start = response.index(start_template) + len(start_template) if start_template in response else len(prompt) | |
start = response.index(command_template) + len(command_template) if command_template in response else start | |
end = response.index(end_template) if end_template in response else len(response) | |
true_response = response[start:end].strip() | |
print('Returned: ' + true_response) | |
print('------------------------------QUERY END------------------------------') | |
return true_response | |
iface = gr.Interface(fn=text_to_text_generation, inputs="text", outputs="text") | |
iface.launch() |