Spaces:
Sleeping
Sleeping
# Embeddings_Create.py
# Description: Functions for creating and managing embeddings in ChromaDB with llama.cpp/OpenAI/Transformers
# | |
# Imports: | |
import logging | |
import time | |
from functools import wraps | |
from threading import Lock, Timer | |
from typing import List | |
# | |
# 3rd-Party Imports: | |
import requests | |
from transformers import AutoTokenizer, AutoModel | |
import torch | |
# | |
# Local Imports: | |
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings | |
from App_Function_Libraries.Utils.Utils import load_comprehensive_config | |
# | |
####################################################################################################################### | |
# | |
# Functions: | |
# FIXME - Add all globals to summarize.py | |
# Embedding settings are resolved once, when this module is imported.
loaded_config = load_comprehensive_config()
_embeddings_settings = loaded_config['Embeddings']

# Provider, model and endpoint settings
embedding_provider = _embeddings_settings['embedding_provider']
embedding_model = _embeddings_settings['embedding_model']
embedding_api_url = _embeddings_settings['embedding_api_url']
embedding_api_key = _embeddings_settings['embedding_api_key']

# Embedding Chunking Settings
chunk_size = _embeddings_settings['chunk_size']
overlap = _embeddings_settings['overlap']
# FIXME - Add logging | |
class HuggingFaceEmbedder:
    """Lazily loaded Hugging Face embedding model that unloads itself when idle.

    The tokenizer and model are loaded on first use and released again by a
    background timer once ``timeout_seconds`` have passed without a call to
    ``load_model`` / ``create_embeddings``.
    """

    def __init__(self, model_name, timeout_seconds=120):  # Default timeout of 2 minutes
        """Remember the model name and idle timeout; nothing is loaded yet."""
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None
        # Serialises load/unload between callers and the idle-timer thread, so
        # the timer cannot drop the model halfway through a load.
        self._lock = Lock()

    def load_model(self):
        """Load the tokenizer and model if needed and restart the idle timer."""
        with self._lock:
            self._ensure_loaded()

    def _ensure_loaded(self):
        """Load on demand and restart the idle timer (caller must hold the lock)."""
        if self.model is None:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModel.from_pretrained(self.model_name)
            self.model.to(self.device)
        self.last_used_time = time.time()
        self.reset_timer()

    def unload_model(self):
        """Drop the model and tokenizer and cancel any pending idle timer."""
        with self._lock:
            if self.model is None:
                return
            self.model = None
            self.tokenizer = None
            if torch.cuda.is_available():
                # Hand cached GPU memory back now that the weights are unreferenced.
                torch.cuda.empty_cache()
            if self.unload_timer:
                self.unload_timer.cancel()
                self.unload_timer = None

    def reset_timer(self):
        """Cancel the pending idle timer, if any, and start a fresh one."""
        if self.unload_timer:
            self.unload_timer.cancel()
        timer = Timer(self.timeout_seconds, self.unload_model)
        # Daemon thread: a pending idle timer must not keep the interpreter
        # alive for up to ``timeout_seconds`` after the main thread has exited.
        timer.daemon = True
        self.unload_timer = timer
        timer.start()

    def create_embeddings(self, texts):
        """Embed ``texts`` and return the pooled vectors as a NumPy array.

        Token vectors from ``last_hidden_state`` are averaged per text using
        the tokenizer's attention mask, so padding added to align a batch does
        not leak into the result. If the tokenizer supplies no attention mask,
        a plain mean over the sequence dimension is used instead.
        """
        with self._lock:
            self._ensure_loaded()
            # Keep local references: the idle timer may unload the attributes
            # while inference is still running.
            tokenizer, model = self.tokenizer, self.model

        inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)

        hidden = outputs.last_hidden_state
        attention_mask = inputs.get("attention_mask")
        if attention_mask is None:
            embeddings = hidden.mean(dim=1)
        else:
            weights = attention_mask.unsqueeze(-1).to(hidden.dtype)
            # clamp guards against division by zero for an all-padding row.
            token_counts = weights.sum(dim=1).clamp(min=1.0)
            embeddings = (hidden * weights).sum(dim=1) / token_counts
        return embeddings.cpu().numpy()


# Global variable to hold the embedder
huggingface_embedder = None
class RateLimiter:
    """Decorator allowing at most ``max_calls`` invocations per ``period`` seconds.

    Timestamps of recent calls are kept in ``self.calls``. When the window is
    full, the calling thread sleeps until the oldest recorded call falls out
    of the window before proceeding.
    """

    def __init__(self, max_calls, period):
        """Store the limits.

        Raises:
            ValueError: if ``max_calls`` is below 1 (the limiter could never
                admit a call and would fail on its first use).
        """
        if max_calls < 1:
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = Lock()

    def __call__(self, func):
        """Wrap ``func`` so that every call is counted against the limit."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self.lock:
                # Monotonic clock: wall-clock adjustments must not stretch or
                # shrink the rate-limit window.
                now = time.monotonic()
                window_start = now - self.period
                self.calls = [stamp for stamp in self.calls if stamp > window_start]
                if len(self.calls) >= self.max_calls:
                    # Wait until the oldest call leaves the window.
                    time.sleep(self.calls[0] - window_start)
                self.calls.append(time.monotonic())
            # Run the wrapped call outside the lock so slow calls do not
            # serialise unrelated callers.
            return func(*args, **kwargs)
        return wrapper
def exponential_backoff(max_retries=5, base_delay=1):
    """Build a decorator that retries a failing call with exponential backoff.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as 1
            so the wrapped function is always called at least once.
        base_delay: Delay in seconds before the first retry; it doubles after
            every further failure.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result, or re-raises the exception from the final attempt.
    """
    # Without this floor, max_retries <= 0 would skip the call entirely and
    # silently return None.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == attempts - 1:
                        # Out of attempts: surface the original error.
                        raise
                    delay = base_delay * (2 ** attempt)
                    logging.warning(
                        "Attempt %d failed. Retrying in %s seconds. Error: %s",
                        attempt + 1, delay, e,
                    )
                    time.sleep(delay)
        return wrapper
    return decorator
# FIXME - refactor/setup to use config file & perform chunking | |
# Adjust these values based on API limits | |
def create_embeddings_batch(
    texts: List[str],
    provider: str,
    model: str,
    api_url: str,
    timeout_seconds: int = 300,
) -> List[List[float]]:
    """Create one embedding per entry of ``texts`` with the chosen provider.

    Args:
        texts: Texts to embed. An empty list yields an empty result without
            loading a model or contacting any API.
        provider: ``'huggingface'``, ``'openai'`` or ``'local'`` (case-insensitive).
        model: Model name handed to the provider.
        api_url: Endpoint used by the ``'local'`` provider.
        timeout_seconds: Idle timeout handed to the Hugging Face embedder, and
            the HTTP timeout for the ``'local'`` provider request.

    Returns:
        A list of embedding vectors, in the order of ``texts``.

    Raises:
        ValueError: if ``provider`` is not supported.
        Exception: if the local API answers with a non-200 status.
    """
    global huggingface_embedder

    provider_key = provider.lower()
    if provider_key not in ('huggingface', 'openai', 'local'):
        raise ValueError(f"Unsupported embedding provider: {provider}")
    if not texts:
        return []

    if provider_key == 'huggingface':
        # Reuse the cached embedder unless a different model is requested.
        if huggingface_embedder is None or huggingface_embedder.model_name != model:
            if huggingface_embedder is not None:
                huggingface_embedder.unload_model()
            huggingface_embedder = HuggingFaceEmbedder(model, timeout_seconds)
        return huggingface_embedder.create_embeddings(texts).tolist()

    if provider_key == 'openai':
        logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API")
        return [create_openai_embedding(text, model) for text in texts]

    # provider_key == 'local'
    response = requests.post(
        api_url,
        json={"texts": texts, "model": model},
        headers={"Authorization": f"Bearer {embedding_api_key}"},
        # Without a timeout an unresponsive server blocks the caller forever.
        timeout=timeout_seconds,
    )
    if response.status_code == 200:
        return response.json()['embeddings']
    raise Exception(f"Error from local API: {response.text}")
def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]:
    """Embed a single text by delegating to ``create_embeddings_batch``.

    The text is wrapped in a one-element batch and the first (only) vector of
    the result is returned.
    """
    single_item_batch = [text]
    vectors = create_embeddings_batch(single_item_batch, provider, model, api_url)
    return vectors[0]
# FIXME | |
def create_stella_embeddings(text: str) -> List[float]:
    """Embed ``text`` according to the module-level ``embedding_provider``.

    With the ``'local'`` provider the ``dunzhang/stella_en_400M_v5`` model is
    loaded through Transformers and the mean of its last hidden state is
    returned. With ``'openai'`` the call is delegated to
    ``get_openai_embeddings`` using ``embedding_model``. The provider name is
    matched case-insensitively, in line with ``create_embeddings_batch``.

    Raises:
        ValueError: if the configured provider is neither of the above.
    """
    provider_key = str(embedding_provider).lower()

    if provider_key == 'local':
        stella_model_id = "dunzhang/stella_en_400M_v5"
        # NOTE(review): tokenizer and model are loaded on every call; consider
        # caching them if this function is used on a hot path.
        tokenizer = AutoTokenizer.from_pretrained(stella_model_id)
        model = AutoModel.from_pretrained(stella_model_id)

        # Tokenize and encode the text
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)

        # Generate embeddings
        with torch.no_grad():
            outputs = model(**inputs)

        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency

    if provider_key == 'openai':
        return get_openai_embeddings(text, embedding_model)

    raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
def create_openai_embedding(text: str, model: str) -> List[float]:
    """Return the embedding that ``get_openai_embeddings`` produces for ``text`` with ``model``."""
    return get_openai_embeddings(text, model)
# | |
# End of File. | |
####################################################################################################################### | |