Spaces:
Runtime error
Runtime error
# import packages | |
import shutil | |
import os | |
__import__('pysqlite3') | |
import sys | |
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3') | |
from sentence_transformers import SentenceTransformer | |
import chromadb | |
from datasets import load_dataset | |
# from transformers import AutoModelForCausalLM, AutoTokenizer | |
import gradio as gr | |
from mistral_inference.model import Transformer | |
from gpt4all import GPT4All | |
from pathlib import Path | |
# Function to clear the cache | |
def clear_cache(model_name): | |
cache_dir = os.path.expanduser(f'~/.cache/torch/sentence_transformers/{model_name.replace("/", "_")}') | |
if os.path.exists(cache_dir): | |
shutil.rmtree(cache_dir) | |
print(f"Cleared cache directory: {cache_dir}") | |
else: | |
print(f"No cache directory found for: {cache_dir}") | |
# Embedding vector | |
class VectorStore: | |
def __init__(self, collection_name): | |
# Initialize the embedding model | |
# Initialize the embedding model with try-except block for better error handling | |
try: | |
self.embedding_model = SentenceTransformer('sentence-transformers/multi-qa-MiniLM-L6-cos-v1') | |
except Exception as e: | |
print(f"Error loading model: {e}") | |
raise | |
self.chroma_client = chromadb.Client() | |
self.collection = self.chroma_client.create_collection(name=collection_name) | |
# Method to populate the vector store with embeddings from a dataset | |
def populate_vectors(self, dataset, batch_size=100): | |
# Use dataset streaming | |
dataset = load_dataset('Thefoodprocessor/recipe_new_with_features_full', split='train[:1500]') | |
# Process in batches | |
texts = [] | |
for i, example in enumerate(dataset): | |
title = example['title_cleaned'] | |
recipe = example['recipe_new'] | |
meal_type = example['meal_type'] | |
allergy = example['allergy_type'] | |
ingredients_alternative = example['ingredients_alternatives'] | |
# Concatenate the text from the columns | |
text = f"{title} {recipe} {meal_type} {allergy} {ingredients_alternative}" | |
texts.append(text) | |
# Process the batch | |
if (i + 1) % batch_size == 0: | |
self._process_batch(texts, i) | |
texts = [] | |
# Process the remaining texts | |
if texts: | |
self._process_batch(texts, i) | |
def _process_batch(self, texts, batch_start_idx): | |
embeddings = self.embedding_model.encode(texts, batch_size=len(texts)).tolist() | |
for j, embedding in enumerate(embeddings): | |
self.collection.add(embeddings=[embedding], documents=[texts[j]], ids=[str(batch_start_idx + j)]) | |
def search_context(self, query, n_results=1): | |
query_embeddings = self.embedding_model.encode(query).tolist() | |
return self.collection.query(query_embeddings=query_embeddings, n_results=n_results) | |
# create a vector embedding | |
vector_store = VectorStore("embedding_vector") | |
vector_store.populate_vectors(dataset=None) | |
# Load the model and tokenizer | |
# text generation model | |
# model_name = "meta-llama/Meta-Llama-3-8B" | |
# tokenizer = AutoTokenizer.from_pretrained(model_name) | |
# model = AutoModelForCausalLM.from_pretrained(model_name) | |
# load model orca-mini general purpose model | |
# tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.3") | |
# model = AutoModelForCausalLM.from_pretrained("mistralai/Mistral-7B-Instruct-v0.3") | |
model_name = 'Meta-Llama-3-8B-Instruct.Q4_0.gguf' # .gguf represents quantized model | |
model_path = "gpt4all" | |
# add path to download load the model locally, download once and load for subsequent inference | |
model = GPT4All(model_name=model_name, model_path=model_path,device="cuda") | |
# Define the chatbot response function | |
def chatbot_response(user_input): | |
global conversation_history | |
results = vector_store.search_context(user_input, n_results=1) | |
context = results['documents'][0] if results['documents'] else "" | |
conversation_history.append(f"User: {user_input}\nContext: {context[:150]}\nBot:") | |
inputs = tokenizer("\n".join(conversation_history), return_tensors="pt") | |
outputs = model.generate(**inputs, max_length=150, do_sample=True, temperature=0.7) | |
response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
conversation_history.append(response) | |
return response | |
# Gradio interface | |
def chat(user_input): | |
response = chatbot_response(user_input) | |
return response | |
css = ".gradio-container {background: url(https://upload.wikimedia.org/wikipedia/commons/f/f5/Spring_Kitchen_Line-Up_%28Unsplash%29.jpg)}" | |
iface = gr.Interface(fn=chat, inputs="text", outputs="text",css=css) | |
iface.launch() | |