Spaces:
Running
Running
File size: 7,219 Bytes
45e1f81 a324812 45e1f81 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# Embeddings_Create.py
# Description: Functions for creating and managing embeddings in ChromaDB with Llama.cpp/OpenAI/Transformers
#
# Imports:
import logging
import time
from functools import wraps
from threading import Lock, Timer
from typing import List
#
# 3rd-Party Imports:
import requests
from transformers import AutoTokenizer, AutoModel
import torch
#
# Local Imports:
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
#######################################################################################################################
#
# Functions:
# FIXME - Add all globals to summarize.py
# Read the project configuration once at import time and expose the embedding
# settings as module-level globals.
loaded_config = load_comprehensive_config()
_embeddings_section = loaded_config['Embeddings']
# Provider selection, model name and endpoint credentials
embedding_provider = _embeddings_section['embedding_provider']
embedding_model = _embeddings_section['embedding_model']
embedding_api_url = _embeddings_section['embedding_api_url']
embedding_api_key = _embeddings_section['embedding_api_key']
# Embedding Chunking Settings
chunk_size = _embeddings_section['chunk_size']
overlap = _embeddings_section['overlap']
# FIXME - Add logging
class HuggingFaceEmbedder:
    """Lazily loaded Hugging Face embedding model that unloads itself when idle.

    The tokenizer and model are loaded on first use and released again by a
    background ``threading.Timer`` once ``timeout_seconds`` have elapsed since
    the last ``load_model``/``create_embeddings`` call.
    """

    def __init__(self, model_name, timeout_seconds=120):  # Default timeout of 2 minutes
        """Record the model name and idle timeout; nothing is loaded yet."""
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None

    def load_model(self):
        """Load the tokenizer/model if needed and restart the idle-unload timer."""
        if self.model is None:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            loaded_model = AutoModel.from_pretrained(self.model_name)
            loaded_model.to(self.device)
            # Publish the model only once it is fully set up, because
            # ``self.model is None`` is what signals "not loaded".
            self.model = loaded_model
        self.last_used_time = time.time()
        self.reset_timer()

    def unload_model(self):
        """Drop the model and tokenizer references and cancel the idle timer."""
        if self.model is not None:
            # Rebind to None rather than ``del`` + re-assign so the attributes
            # never disappear while another thread is reading them.
            self.model = None
            self.tokenizer = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        if self.unload_timer:
            self.unload_timer.cancel()

    def reset_timer(self):
        """Cancel any pending unload timer and start a fresh one."""
        if self.unload_timer:
            self.unload_timer.cancel()
        timer = Timer(self.timeout_seconds, self.unload_model)
        # Daemon thread: a pending idle timer must not keep the interpreter
        # alive for up to ``timeout_seconds`` at shutdown.
        timer.daemon = True
        timer.start()
        self.unload_timer = timer

    def create_embeddings(self, texts):
        """Embed ``texts`` and return one mean-pooled vector per input.

        Args:
            texts: A string or list of strings accepted by the tokenizer.

        Returns:
            A NumPy array with one row per input text.
        """
        self.load_model()
        # Keep local references: the idle timer runs on another thread and may
        # reset ``self.model``/``self.tokenizer`` to None during a long call.
        tokenizer, model = self.tokenizer, self.model
        inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
        hidden = outputs.last_hidden_state
        attention_mask = inputs.get("attention_mask")
        if attention_mask is None:
            embeddings = hidden.mean(dim=1)
        else:
            # Average over real tokens only. A plain mean would include padding
            # positions, making a text's vector depend on the other texts in
            # its batch.
            weights = attention_mask.unsqueeze(-1).to(hidden.dtype)
            embeddings = (hidden * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1e-9)
        return embeddings.cpu().numpy()
# Module-level cache for the single HuggingFaceEmbedder instance; create_embeddings_batch
# creates it on first use and replaces it when a different model name is requested.
huggingface_embedder = None
class RateLimiter:
    """Decorator that allows at most ``max_calls`` calls per ``period`` seconds.

    When the window is full, the calling thread sleeps (while holding the
    limiter's lock, so other callers queue behind it) until the oldest recorded
    call leaves the window. The wrapped function itself runs outside the lock.
    """

    def __init__(self, max_calls, period):
        """Configure the limiter.

        Args:
            max_calls: Maximum number of calls per window; must be at least 1.
            period: Window length in seconds.

        Raises:
            ValueError: If ``max_calls`` is less than 1.
        """
        if max_calls < 1:
            # With no allowed calls the wrapper would index an empty list.
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = Lock()

    def __call__(self, func):
        """Return ``func`` wrapped so that every call is rate limited."""
        @wraps(func)  # keep the decorated function's name and docstring
        def wrapper(*args, **kwargs):
            with self.lock:
                # Monotonic clock: interval maths must not be affected by
                # wall-clock adjustments.
                now = time.monotonic()
                window_start = now - self.period
                self.calls = [stamp for stamp in self.calls if stamp > window_start]
                if len(self.calls) >= self.max_calls:
                    # Wait until the oldest call in the window expires.
                    sleep_time = self.calls[0] - window_start
                    time.sleep(sleep_time)
                self.calls.append(time.monotonic())
            return func(*args, **kwargs)
        return wrapper
def exponential_backoff(max_retries=5, base_delay=1, retry_on=(Exception,)):
    """Build a decorator that retries a function with exponentially growing delays.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as 1
            so the wrapped function is always called at least once.
        base_delay: Delay in seconds after the first failure; it doubles after
            each further failure.
        retry_on: Exception class or tuple of classes that trigger a retry.
            Any other exception propagates immediately.

    Returns:
        A decorator. The decorated function returns the first successful
        result, or re-raises the exception from the final attempt.
    """
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except retry_on as e:
                    if attempt == attempts - 1:
                        # Out of attempts: surface the last failure unchanged.
                        raise
                    delay = base_delay * (2 ** attempt)
                    logging.warning(
                        "Attempt %d failed. Retrying in %s seconds. Error: %s",
                        attempt + 1, delay, e,
                    )
                    time.sleep(delay)
        return wrapper
    return decorator
# FIXME - refactor/setup to use config file & perform chunking
def create_embeddings_batch(texts: List[str], provider: str, model: str, api_url: str, timeout_seconds: int = 300) -> \
        List[List[float]]:
    """Create one embedding per text using the selected provider.

    Args:
        texts: Texts to embed. An empty list returns ``[]`` without calling
            any provider.
        provider: ``'huggingface'``, ``'openai'`` or ``'local'`` (case-insensitive).
        model: Model name passed to the provider.
        api_url: Endpoint used by the ``'local'`` provider.
        timeout_seconds: Idle-unload timeout for the Hugging Face embedder, and
            the HTTP timeout for the ``'local'`` provider request.

    Returns:
        A list of embedding vectors, one per input text.

    Raises:
        ValueError: If ``provider`` is not supported. Raised before the
            retry/rate-limit layer so it fails immediately.
        Exception: If the local API answers with a non-200 status, after retries.
    """
    provider_key = provider.lower()
    if provider_key not in ('huggingface', 'openai', 'local'):
        # A bad provider name is a caller error; retrying cannot fix it.
        raise ValueError(f"Unsupported embedding provider: {provider}")
    if not texts:
        return []
    return _create_embeddings_batch_with_retry(texts, provider_key, model, api_url, timeout_seconds)


@exponential_backoff()
@RateLimiter(max_calls=50, period=60)  # Adjust these values based on API limits
def _create_embeddings_batch_with_retry(texts, provider, model, api_url, timeout_seconds):
    """Rate-limited, retried provider dispatch; ``provider`` is already lowercased and validated."""
    global huggingface_embedder
    if provider == 'huggingface':
        if huggingface_embedder is None or huggingface_embedder.model_name != model:
            # Release the previously cached model before switching to another one.
            if huggingface_embedder is not None:
                huggingface_embedder.unload_model()
            huggingface_embedder = HuggingFaceEmbedder(model, timeout_seconds)
        return huggingface_embedder.create_embeddings(texts).tolist()

    if provider == 'openai':
        logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API")
        return [create_openai_embedding(text, model) for text in texts]

    # provider == 'local'
    response = requests.post(
        api_url,
        json={"texts": texts, "model": model},
        headers={"Authorization": f"Bearer {embedding_api_key}"},
        # Without a timeout an unresponsive server would block this call forever.
        timeout=timeout_seconds,
    )
    if response.status_code != 200:
        raise Exception(f"Error from local API: {response.text}")
    return response.json()['embeddings']
def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]:
    """Embed a single text by sending it through create_embeddings_batch as a one-item batch."""
    single_item_batch = [text]
    vectors = create_embeddings_batch(single_item_batch, provider, model, api_url)
    return vectors[0]
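# FIXME - create_stella_embeddings reloads the Stella tokenizer and model from scratch on every 'local' call; cache them or reuse HuggingFaceEmbedder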
def create_stella_embeddings(text: str) -> List[float]:
    """Embed ``text`` according to the module-level ``embedding_provider`` setting.

    ``'openai'`` delegates to ``get_openai_embeddings`` with the configured
    ``embedding_model``. ``'local'`` loads the ``dunzhang/stella_en_400M_v5``
    checkpoint and mean-pools its last hidden state. Any other provider raises
    ``ValueError``.
    """
    if embedding_provider == 'local':
        stella_checkpoint = "dunzhang/stella_en_400M_v5"
        # Tokenizer and model are loaded fresh on each call.
        stella_tokenizer = AutoTokenizer.from_pretrained(stella_checkpoint)
        stella_model = AutoModel.from_pretrained(stella_checkpoint)

        encoded = stella_tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)

        # Inference only, so no gradients are tracked.
        with torch.no_grad():
            model_output = stella_model(**encoded)

        # Sentence embedding = mean over the token axis of the last hidden state.
        pooled = model_output.last_hidden_state.mean(dim=1)
        first_row = pooled[0]
        return first_row.tolist()  # plain list, for consistency with the OpenAI branch

    if embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)

    raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
def create_openai_embedding(text: str, model: str) -> List[float]:
    """Return the embedding that get_openai_embeddings produces for ``text`` with ``model``."""
    return get_openai_embeddings(text, model)
#
# End of File.
#######################################################################################################################
|