# Embeddings_Create.py
# Description: Functions for creating and managing embeddings in ChromaDB with llama.cpp/OpenAI/Transformers
#
# Imports:
import logging
import time
from functools import wraps
from threading import Lock, Timer
from typing import List
#
# 3rd-Party Imports:
import requests
from transformers import AutoTokenizer, AutoModel
import torch
#
# Local Imports:
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
#######################################################################################################################
#
# Functions:

# FIXME - Add all globals to summarize.py
loaded_config = load_comprehensive_config()
embedding_provider = loaded_config['Embeddings']['embedding_provider']
embedding_model = loaded_config['Embeddings']['embedding_model']
embedding_api_url = loaded_config['Embeddings']['embedding_api_url']
embedding_api_key = loaded_config['Embeddings']['embedding_api_key']

# Embedding Chunking Settings
chunk_size = loaded_config['Embeddings']['chunk_size']
overlap = loaded_config['Embeddings']['overlap']


# FIXME - Add logging

class HuggingFaceEmbedder:
    """Lazily loads a Hugging Face model and unloads it after a period of inactivity."""

    def __init__(self, model_name, timeout_seconds=120):  # Default timeout of 2 minutes
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None

    def load_model(self):
        if self.model is None:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModel.from_pretrained(self.model_name)
            self.model.to(self.device)
        self.last_used_time = time.time()
        self.reset_timer()

    def unload_model(self):
        if self.model is not None:
            del self.model
            del self.tokenizer
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            self.model = None
            self.tokenizer = None
        if self.unload_timer:
            self.unload_timer.cancel()
            self.unload_timer = None

    def reset_timer(self):
        if self.unload_timer:
            self.unload_timer.cancel()
        self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
        # Daemonize the timer so a pending unload does not keep the process alive on exit
        self.unload_timer.daemon = True
        self.unload_timer.start()

    def create_embeddings(self, texts):
        self.load_model()
        inputs = self.tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Mean-pool the last hidden state to get one vector per input text
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings.cpu().numpy()


# Global variable to hold the embedder
huggingface_embedder = None


class RateLimiter:
    """Decorator allowing at most `max_calls` calls per sliding window of `period` seconds."""

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = Lock()

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self.lock:
                now = time.time()
                # Drop timestamps that have aged out of the sliding window
                self.calls = [call for call in self.calls if call > now - self.period]
                if len(self.calls) >= self.max_calls:
                    # Sleep until the oldest call in the window expires
                    sleep_time = self.calls[0] - (now - self.period)
                    time.sleep(sleep_time)
                self.calls.append(time.time())
            return func(*args, **kwargs)
        return wrapper


def exponential_backoff(max_retries=5, base_delay=1):
    """Decorator retrying the wrapped function with exponentially increasing delays."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logging.warning(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds. Error: {e}")
Error: {str(e)}") time.sleep(delay) return wrapper return decorator # FIXME - refactor/setup to use config file & perform chunking @exponential_backoff() @RateLimiter(max_calls=50, period=60) # Adjust these values based on API limits def create_embeddings_batch(texts: List[str], provider: str, model: str, api_url: str, timeout_seconds: int = 300) -> \ List[List[float]]: global huggingface_embedder if provider.lower() == 'huggingface': if huggingface_embedder is None or huggingface_embedder.model_name != model: if huggingface_embedder is not None: huggingface_embedder.unload_model() huggingface_embedder = HuggingFaceEmbedder(model, timeout_seconds) embeddings = huggingface_embedder.create_embeddings(texts).tolist() return embeddings elif provider.lower() == 'openai': logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API") return [create_openai_embedding(text, model) for text in texts] elif provider.lower() == 'local': response = requests.post( api_url, json={"texts": texts, "model": model}, headers={"Authorization": f"Bearer {embedding_api_key}"} ) if response.status_code == 200: return response.json()['embeddings'] else: raise Exception(f"Error from local API: {response.text}") else: raise ValueError(f"Unsupported embedding provider: {provider}") def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]: return create_embeddings_batch([text], provider, model, api_url)[0] # FIXME def create_stella_embeddings(text: str) -> List[float]: if embedding_provider == 'local': # Load the model and tokenizer tokenizer = AutoTokenizer.from_pretrained("dunzhang/stella_en_400M_v5") model = AutoModel.from_pretrained("dunzhang/stella_en_400M_v5") # Tokenize and encode the text inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512) # Generate embeddings with torch.no_grad(): outputs = model(**inputs) # Use the mean of the last hidden state as the sentence embedding embeddings = outputs.last_hidden_state.mean(dim=1) return embeddings[0].tolist() # Convert to list for consistency elif embedding_provider == 'openai': return get_openai_embeddings(text, embedding_model) else: raise ValueError(f"Unsupported embedding provider: {embedding_provider}") def create_openai_embedding(text: str, model: str) -> List[float]: embedding = get_openai_embeddings(text, model) return embedding # # End of File. #######################################################################################################################