tldw / App_Function_Libraries /ChromaDB_Library.py
oceansweep's picture
Syncing latest changes, lets see what breaks
fa9a583 verified
raw
history blame
8.94 kB
import configparser
import logging
import sqlite3
from typing import List, Dict, Any
import chromadb
import requests
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
#######################################################################################################################
#
# Functions for ChromaDB
# Get ChromaDB settings
# Load configuration
config = configparser.ConfigParser()
config.read('config.txt')
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')
chroma_client = chromadb.PersistentClient(path=chroma_db_path)
# Get embedding settings
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')
# Get chunking options
chunk_options = {
'method': config.get('Chunking', 'method', fallback='words'),
'max_size': config.getint('Chunking', 'max_size', fallback=400),
'overlap': config.getint('Chunking', 'overlap', fallback=200),
'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
'language': config.get('Chunking', 'language', fallback='english')
}
def auto_update_chroma_embeddings(media_id: int, content: str):
"""
Automatically update ChromaDB embeddings when a new item is ingested into the SQLite database.
:param media_id: The ID of the newly ingested media item
:param content: The content of the newly ingested media item
"""
collection_name = f"media_{media_id}"
# Initialize or get the ChromaDB collection
collection = chroma_client.get_or_create_collection(name=collection_name)
# Check if embeddings already exist for this media_id
existing_embeddings = collection.get(ids=[f"{media_id}_chunk_{i}" for i in range(len(content))])
if existing_embeddings and len(existing_embeddings) > 0:
logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
else:
# Process and store content if embeddings do not already exist
process_and_store_content(content, collection_name, media_id)
logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")
# Function to process content, create chunks, embeddings, and store in ChromaDB and SQLite
def process_and_store_content(content: str, collection_name: str, media_id: int):
# Process the content into chunks
chunks = improved_chunking_process(content, chunk_options)
texts = [chunk['text'] for chunk in chunks]
# Generate embeddings for each chunk
embeddings = [create_embedding(text) for text in texts]
# Create unique IDs for each chunk using the media_id and chunk index
ids = [f"{media_id}_chunk_{i}" for i in range(len(texts))]
# Store the texts, embeddings, and IDs in ChromaDB
store_in_chroma(collection_name, texts, embeddings, ids)
# Store the chunks in SQLite FTS as well
from App_Function_Libraries.DB_Manager import db
with db.get_connection() as conn:
cursor = conn.cursor()
for text in texts:
cursor.execute("INSERT INTO media_fts (content) VALUES (?)", (text,))
conn.commit()
# Function to store documents and their embeddings in ChromaDB
def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
collection = chroma_client.get_or_create_collection(name=collection_name)
collection.add(
documents=texts,
embeddings=embeddings,
ids=ids
)
# Function to perform vector search using ChromaDB
def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
query_embedding = create_embedding(query)
collection = chroma_client.get_collection(name=collection_name)
results = collection.query(
query_embeddings=[query_embedding],
n_results=k
)
return results['documents'][0]
def create_embedding(text: str) -> List[float]:
if embedding_provider == 'openai':
import openai
openai.api_key = embedding_api_key
response = openai.Embedding.create(input=text, model=embedding_model)
return response['data'][0]['embedding']
elif embedding_provider == 'local':
# FIXME - This is a placeholder for API calls to a local embedding model
response = requests.post(
embedding_api_url,
json={"text": text, "model": embedding_model},
headers={"Authorization": f"Bearer {embedding_api_key}"}
)
return response.json()['embedding']
# FIXME - this seems correct, but idk....
elif embedding_provider == 'huggingface':
from transformers import AutoTokenizer, AutoModel
import torch
tokenizer = AutoTokenizer.from_pretrained(embedding_model)
model = AutoModel.from_pretrained(embedding_model)
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
with torch.no_grad():
outputs = model(**inputs)
# Use the mean of the last hidden state as the sentence embedding
embeddings = outputs.last_hidden_state.mean(dim=1)
return embeddings[0].tolist() # Convert to list for consistency
else:
raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
def create_all_embeddings(api_choice: str) -> str:
try:
global embedding_provider
embedding_provider = api_choice
all_content = get_all_content_from_database()
if not all_content:
return "No content found in the database."
texts_to_embed = []
embeddings_to_store = []
ids_to_store = []
collection_name = "all_content_embeddings"
# Initialize or get the ChromaDB collection
collection = chroma_client.get_or_create_collection(name=collection_name)
for content_item in all_content:
media_id = content_item['id']
text = content_item['content']
# Check if the embedding already exists in ChromaDB
embedding_exists = collection.get(ids=[f"doc_{media_id}"])
if embedding_exists:
logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
continue # Skip if embedding already exists
# Create the embedding
embedding = create_embedding(text)
# Collect the text, embedding, and ID for batch storage
texts_to_embed.append(text)
embeddings_to_store.append(embedding)
ids_to_store.append(f"doc_{media_id}")
# Store all new embeddings in ChromaDB
if texts_to_embed and embeddings_to_store:
store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)
return "Embeddings created and stored successfully for all new content."
except Exception as e:
logging.error(f"Error during embedding creation: {str(e)}")
return f"Error: {str(e)}"
def get_all_content_from_database() -> List[Dict[str, Any]]:
"""
Retrieve all media content from the database that requires embedding.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each containing the media ID, content, title, and other relevant fields.
"""
try:
from App_Function_Libraries.DB_Manager import db
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, content, title, author, type
FROM Media
WHERE is_trash = 0 -- Exclude items marked as trash
""")
media_items = cursor.fetchall()
# Convert the results into a list of dictionaries
all_content = [
{
'id': item[0],
'content': item[1],
'title': item[2],
'author': item[3],
'type': item[4]
}
for item in media_items
]
return all_content
except sqlite3.Error as e:
logging.error(f"Error retrieving all content from database: {e}")
from App_Function_Libraries.SQLite_DB import DatabaseError
raise DatabaseError(f"Error retrieving all content from database: {e}")
#
# End of Functions for ChromaDB
#######################################################################################################################