oceansweep's picture
Update App_Function_Libraries/RAG/ChromaDB_Library.py
aa1db93 verified
raw
history blame
No virus
11.3 kB
import configparser
import logging
import sqlite3
from typing import List, Dict, Any
import chromadb
import requests
from chromadb import Settings
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
from App_Function_Libraries.DB.DB_Manager import add_media_chunk, update_fts_for_media
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
#######################################################################################################################
#
# Functions for ChromaDB
# Get ChromaDB settings
# Load configuration
# Module-level configuration. Everything below runs once, at import time.
# 'config.txt' is a relative path, so it is resolved against the current working
# directory; ConfigParser.read() silently ignores a missing file, in which case
# every value below comes from its fallback.
config = configparser.ConfigParser()
config.read('config.txt')
# Location of the on-disk ChromaDB store (defaults to ./chroma_db).
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')
# Shared persistent client used by every function in this module; telemetry is disabled.
chroma_client = chromadb.PersistentClient(path=chroma_db_path, settings=Settings(anonymized_telemetry=False))
import os
# NOTE(review): this is set after chromadb has already been imported and the client
# created; if the intent is to influence the protobuf implementation chosen by
# chromadb's dependencies it may need to happen before those imports - confirm.
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"
# Embedding settings: provider is one of 'openai', 'local' or 'huggingface'
# (the values dispatched on in create_embedding).
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
# api_key / api_url are only read by the 'local' provider branch of create_embedding.
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')
# Chunking options, passed unchanged to improved_chunking_process.
chunk_options = {
'method': config.get('Chunking', 'method', fallback='words'),
'max_size': config.getint('Chunking', 'max_size', fallback=400),
'overlap': config.getint('Chunking', 'overlap', fallback=200),
'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
'language': config.get('Chunking', 'language', fallback='english')
}
def auto_update_chroma_embeddings(media_id: int, content: str):
    """
    Automatically update ChromaDB embeddings when a new item is ingested into the SQLite database.

    Each media item has its own collection named ``media_<media_id>``. If that
    collection already holds the item's first chunk, the item is treated as
    embedded and nothing is done; otherwise the content is chunked, embedded
    and stored via ``process_and_store_content``.

    :param media_id: The ID of the newly ingested media item
    :param content: The content of the newly ingested media item
    """
    collection_name = f"media_{media_id}"
    # Initialize or get the ChromaDB collection
    collection = chroma_client.get_or_create_collection(name=collection_name)

    # Chunk IDs are assigned contiguously from 0 when content is stored, so looking
    # up chunk 0 is enough to tell whether this item has been embedded before.
    # (Generating one candidate ID per character of `content` is unnecessary.)
    existing_embeddings = collection.get(ids=[f"{media_id}_chunk_0"])

    # The get() result is a dict that always carries its keys, so its length says
    # nothing about matches; the list of returned ids is what must be non-empty.
    if existing_embeddings and existing_embeddings.get('ids'):
        logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
        return

    # Process and store content if embeddings do not already exist
    process_and_store_content(content, collection_name, media_id)
    logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")
# Function to process content, create chunks, embeddings, and store in ChromaDB and SQLite
def process_and_store_content(content: str, collection_name: str, media_id: int):
    """
    Chunk ``content``, embed every chunk and persist the result.

    The chunk texts and their embeddings go into the ChromaDB collection
    ``collection_name``; each chunk's text and start/end values are also
    recorded in SQLite under ``media_id``, after which the FTS table for that
    media item is refreshed.

    :param content: Raw text to split into chunks
    :param collection_name: Name of the target ChromaDB collection
    :param media_id: ID of the media item the content belongs to
    """
    # Split the content using the module-wide chunking options
    pieces = improved_chunking_process(content, chunk_options)
    chunk_texts = [piece['text'] for piece in pieces]

    # One embedding per chunk text, in the same order
    vectors = list(map(create_embedding, chunk_texts))

    # IDs follow the pattern "<media_id>_chunk_<index>", counting from 0
    chunk_ids = [f"{media_id}_chunk_{index}" for index, _ in enumerate(chunk_texts)]

    # Vector store first ...
    store_in_chroma(collection_name, chunk_texts, vectors, chunk_ids)

    # ... then the per-chunk metadata in SQLite, keyed by the same IDs
    for piece, chunk_id in zip(pieces, chunk_ids):
        add_media_chunk(media_id, piece['text'], piece['start'], piece['end'], chunk_id)

    # Refresh the full-text-search table once all chunks are recorded
    update_fts_for_media(media_id)
# Function to store documents and their embeddings in ChromaDB
def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
    """
    Add documents and their embeddings to a ChromaDB collection.

    The collection is created if it does not exist yet.

    :param collection_name: Name of the target collection
    :param texts: Document texts to store
    :param embeddings: One embedding vector per text
    :param ids: One ID per text
    """
    target = chroma_client.get_or_create_collection(name=collection_name)
    target.add(documents=texts, embeddings=embeddings, ids=ids)
# Function to perform vector search using ChromaDB
def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
    """
    Query a ChromaDB collection with the embedding of ``query``.

    :param collection_name: Name of the collection to search; it is fetched with
        ``get_collection`` and is not created here
    :param query: Text to embed and search with
    :param k: Number of results requested from the collection
    :return: The documents returned for the query
    """
    # Embed the query before opening the collection
    embedded_query = create_embedding(query)
    target = chroma_client.get_collection(name=collection_name)
    matches = target.query(query_embeddings=[embedded_query], n_results=k)
    # Exactly one query embedding was submitted, so take the first result list
    documents_per_query = matches['documents']
    return documents_per_query[0]
# Loaded HuggingFace (tokenizer, model) pairs keyed by model name, so the weights
# are loaded once per process instead of once per embedded text.
_HF_MODEL_CACHE: Dict[str, Any] = {}


def _load_huggingface_model(model_name: str):
    """Return the cached (tokenizer, model) pair for ``model_name``, loading it on first use."""
    if model_name not in _HF_MODEL_CACHE:
        # Imported lazily so transformers is only required for the 'huggingface' provider
        from transformers import AutoTokenizer, AutoModel
        _HF_MODEL_CACHE[model_name] = (
            AutoTokenizer.from_pretrained(model_name),
            AutoModel.from_pretrained(model_name),
        )
    return _HF_MODEL_CACHE[model_name]


def create_embedding(text: str) -> List[float]:
    """
    Create an embedding for ``text`` with the provider configured at module level.

    Supported values of ``embedding_provider``:

    * ``'openai'``      - delegates to ``get_openai_embeddings`` with ``embedding_model``
    * ``'local'``       - POSTs ``{"text", "model"}`` to ``embedding_api_url`` with a bearer
      token and reads the ``embedding`` field of the JSON reply
    * ``'huggingface'`` - mean of the model's last hidden state over the token dimension

    :param text: Text to embed
    :return: The embedding as a list of floats
    :raises ValueError: If the configured provider is not one of the above
    :raises requests.HTTPError: If the 'local' endpoint answers with an error status
    """
    # The module-level settings are only read here, so no `global` declaration is needed.
    if embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)

    if embedding_provider == 'local':
        response = requests.post(
            embedding_api_url,
            json={"text": text, "model": embedding_model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        # Surface HTTP failures directly rather than as a confusing KeyError/JSON
        # error when the error body has no 'embedding' field.
        response.raise_for_status()
        return response.json()['embedding']

    if embedding_provider == 'huggingface':
        import torch
        tokenizer, model = _load_huggingface_model(embedding_model)
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency

    raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
def create_all_embeddings(api_choice: str, model_or_url: str) -> str:
    """
    Create and store embeddings for every database item that does not have one yet.

    Embeddings are kept in the ``all_content_embeddings`` collection under the
    ID ``doc_<media_id>``. Items whose ID is already present are skipped.

    :param api_choice: ``"openai"`` to use the OpenAI helper; any other value uses Llama.cpp
    :param model_or_url: Model name for OpenAI, or the API URL for Llama.cpp
    :return: A human-readable status message; errors are logged and returned as
        ``"Error: ..."`` rather than raised
    """
    try:
        all_content = get_all_content_from_database()
        if not all_content:
            return "No content found in the database."

        texts_to_embed = []
        embeddings_to_store = []
        ids_to_store = []
        collection_name = "all_content_embeddings"

        # Initialize or get the ChromaDB collection
        collection = chroma_client.get_or_create_collection(name=collection_name)

        for content_item in all_content:
            media_id = content_item['id']
            text = content_item['content']
            doc_id = f"doc_{media_id}"

            # Check if the embedding already exists in ChromaDB. The get() result is
            # a dict that always carries its keys (and is therefore always truthy),
            # so the list of returned ids is what has to be inspected.
            existing = collection.get(ids=[doc_id])
            if existing and existing.get('ids'):
                logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
                continue  # Skip if embedding already exists

            # Create the embedding
            if api_choice == "openai":
                embedding = create_openai_embedding(text, model_or_url)
            else:  # Llama.cpp
                embedding = create_llamacpp_embedding(text, model_or_url)

            # Collect the text, embedding, and ID for batch storage
            texts_to_embed.append(text)
            embeddings_to_store.append(embedding)
            ids_to_store.append(doc_id)

        # Store all new embeddings in ChromaDB
        if texts_to_embed and embeddings_to_store:
            store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)

        return "Embeddings created and stored successfully for all new content."
    except Exception as e:
        logging.error(f"Error during embedding creation: {str(e)}")
        return f"Error: {str(e)}"
def create_openai_embedding(text: str, model: str) -> List[float]:
    """
    Embed ``text`` with the given OpenAI model via ``get_openai_embeddings``.

    :param text: Text to embed
    :param model: OpenAI embedding model name
    :return: Whatever ``get_openai_embeddings`` returns for the text
    :raises KeyError: If the configuration has no ``[API]`` section or no
        ``openai_api_key`` entry
    """
    # The key itself is not passed on; the lookup only fails fast (KeyError)
    # when the configuration entry is missing.
    _ = config['API']['openai_api_key']
    return get_openai_embeddings(text, model)
def create_llamacpp_embedding(text: str, api_url: str) -> List[float]:
    """
    Request an embedding for ``text`` from a Llama.cpp HTTP endpoint.

    :param text: Text to embed; sent as the ``input`` field of the JSON body
    :param api_url: URL the request is POSTed to
    :return: The ``embedding`` field of the JSON reply
    :raises Exception: If the endpoint answers with any status other than 200
    """
    reply = requests.post(api_url, json={"input": text})
    # Guard clause: anything but a plain 200 is reported with the response body
    if reply.status_code != 200:
        raise Exception(f"Error from Llama.cpp API: {reply.text}")
    return reply.json()['embedding']
def get_all_content_from_database() -> List[Dict[str, Any]]:
    """
    Fetch every row of the Media table that is not marked as trash.

    Returns:
        List[Dict[str, Any]]: One dictionary per row with the keys
        ``id``, ``content``, ``title``, ``author`` and ``type``.

    Raises:
        DatabaseError: If the query fails with a ``sqlite3.Error``.
    """
    try:
        # Imported here rather than at module level, as in the original code
        from App_Function_Libraries.DB.DB_Manager import db
        with db.get_connection() as connection:
            cur = connection.cursor()
            cur.execute("""
SELECT id, content, title, author, type
FROM Media
WHERE is_trash = 0 -- Exclude items marked as trash
""")
            rows = cur.fetchall()

        # Turn each 5-column row into a dictionary keyed by column name
        records = []
        for media_id, body, title, author, media_type in rows:
            records.append({
                'id': media_id,
                'content': body,
                'title': title,
                'author': author,
                'type': media_type,
            })
        return records
    except sqlite3.Error as e:
        logging.error(f"Error retrieving all content from database: {e}")
        from App_Function_Libraries.DB.SQLite_DB import DatabaseError
        raise DatabaseError(f"Error retrieving all content from database: {e}")
def store_in_chroma_with_citation(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str], sources: List[str]):
    """
    Add documents, embeddings and a per-document source citation to a ChromaDB collection.

    The collection is created if it does not exist yet. Each document gets the
    metadata ``{'source': <source>}`` from the entry of ``sources`` at the same position.

    :param collection_name: Name of the target collection
    :param texts: Document texts to store
    :param embeddings: One embedding vector per text
    :param ids: One ID per text
    :param sources: One source string per text
    """
    target = chroma_client.get_or_create_collection(name=collection_name)
    citation_metadata = []
    for origin in sources:
        citation_metadata.append({'source': origin})
    target.add(
        documents=texts,
        embeddings=embeddings,
        ids=ids,
        metadatas=citation_metadata,
    )
def check_embedding_status(selected_item):
    """
    Report whether an embedding is stored for the selected item.

    The text before the first ``(`` of ``selected_item`` (stripped) is used as
    the item identifier and looked up as ``doc_<identifier>`` in the
    ``all_content_embeddings`` collection.

    NOTE(review): ``create_new_embedding`` matches the same prefix against item
    titles but stores under ``doc_<media id>``; confirm which of the two the
    prefix of ``selected_item`` actually is.

    :param selected_item: Selection string, or a falsy value if nothing is selected
    :return: A ``(status message, embedding preview)`` tuple; the preview is an
        empty string when there is nothing to show
    """
    if not selected_item:
        return "Please select an item", ""

    item_id = selected_item.split('(')[0].strip()
    collection = chroma_client.get_or_create_collection(name="all_content_embeddings")

    # Embeddings are not part of a get() result unless explicitly requested;
    # without `include` the 'embeddings' entry is empty and indexing it fails.
    result = collection.get(ids=[f"doc_{item_id}"], include=["embeddings"])

    if not result['ids']:
        return f"No embedding found for item: {item_id}", ""

    embedding = result['embeddings'][0]
    embedding_preview = str(embedding[:50])  # Convert first 50 elements to string
    return f"Embedding exists for item: {item_id}", f"Embedding preview: {embedding_preview}..."
def create_new_embedding(selected_item, api_choice, openai_model, llamacpp_url):
    """
    Create an embedding for one selected item and store it in ``all_content_embeddings``.

    The text before the first ``(`` of ``selected_item`` (stripped) is matched
    against the titles of the items returned by ``get_all_content_from_database``.
    The embedding is stored under the ID ``doc_<media id>``.

    NOTE(review): storage goes through ``store_in_chroma``, which adds the
    record; whether an already existing ``doc_<id>`` entry is replaced depends
    on ChromaDB's ``add`` semantics - confirm.

    :param selected_item: Selection string, or a falsy value if nothing is selected
    :param api_choice: ``"OpenAI"`` (case-insensitive) to embed with OpenAI;
        any other value uses Llama.cpp
    :param openai_model: Model name used when OpenAI is chosen
    :param llamacpp_url: API URL used when Llama.cpp is chosen
    :return: A human-readable status message; embedding/storage errors are
        returned as ``"Error creating embedding: ..."`` rather than raised
    """
    if not selected_item:
        return "Please select an item"

    item_id = selected_item.split('(')[0].strip()
    items = get_all_content_from_database()
    item = next((candidate for candidate in items if candidate['title'] == item_id), None)
    if not item:
        return f"Item not found: {item_id}"

    try:
        # Honour the caller's backend choice and its model/URL argument, the same
        # way create_all_embeddings does, instead of ignoring both.
        if (api_choice or "").lower() == "openai":
            embedding = create_openai_embedding(item['content'], openai_model)
        else:  # Llama.cpp
            embedding = create_llamacpp_embedding(item['content'], llamacpp_url)

        collection_name = "all_content_embeddings"
        store_in_chroma(collection_name, [item['content']], [embedding], [f"doc_{item['id']}"])
        return f"New embedding created and stored for item: {item_id}"
    except Exception as e:
        return f"Error creating embedding: {str(e)}"
#
# End of Functions for ChromaDB
#######################################################################################################################