import configparser
import logging
import os
import sqlite3
from typing import List, Dict, Any

import requests

# Force the pure-Python protobuf implementation. This must be set before
# chromadb (which imports protobuf) is loaded for it to take effect.
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

import chromadb
from chromadb import Settings

from App_Function_Libraries.Chunk_Lib import improved_chunking_process
from App_Function_Libraries.DB.DB_Manager import add_media_chunk, update_fts_for_media
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings

#######################################################################################################################
#
# Functions for ChromaDB

# Load configuration
config = configparser.ConfigParser()
config.read('config.txt')

# Get ChromaDB settings
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')
chroma_client = chromadb.PersistentClient(path=chroma_db_path, settings=Settings(anonymized_telemetry=False))

# Get embedding settings
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')

# Get chunking options
chunk_options = {
    'method': config.get('Chunking', 'method', fallback='words'),
    'max_size': config.getint('Chunking', 'max_size', fallback=400),
    'overlap': config.getint('Chunking', 'overlap', fallback=200),
    'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
    'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
    'language': config.get('Chunking', 'language', fallback='english')
}
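
# For reference, a config.txt matching the lookups above could look like the
# following (a sketch; the values shown are the fallbacks from the config.get
# calls, except for api_key and api_url, which are illustrative placeholders):
#
#     [Database]
#     chroma_db_path = chroma_db
#
#     [Embeddings]
#     provider = openai
#     model = text-embedding-3-small
#     api_key = <your-embedding-api-key>
#     api_url = <local-embedding-endpoint-url>
#
#     [Chunking]
#     method = words
#     max_size = 400
#     overlap = 200
#     adaptive = False
#     multi_level = False
#     language = english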

def auto_update_chroma_embeddings(media_id: int, content: str):
    """
    Automatically update ChromaDB embeddings when a new item is ingested into the SQLite database.

    :param media_id: The ID of the newly ingested media item
    :param content: The content of the newly ingested media item
    """
    collection_name = f"media_{media_id}"

    # Initialize or get the ChromaDB collection
    collection = chroma_client.get_or_create_collection(name=collection_name)

    # Check if embeddings already exist for this media_id. Chunk IDs follow the
    # pattern "{media_id}_chunk_{i}" starting at 0, so probing for the first
    # chunk is sufficient.
    existing_embeddings = collection.get(ids=[f"{media_id}_chunk_0"])

    if existing_embeddings['ids']:
        logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
    else:
        # Process and store content if embeddings do not already exist
        process_and_store_content(content, collection_name, media_id)
        logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")


# Function to process content, create chunks, embeddings, and store in ChromaDB and SQLite
def process_and_store_content(content: str, collection_name: str, media_id: int):
    # Process the content into chunks
    chunks = improved_chunking_process(content, chunk_options)
    texts = [chunk['text'] for chunk in chunks]

    # Generate embeddings for each chunk
    embeddings = [create_embedding(text) for text in texts]

    # Create unique IDs for each chunk using the media_id and chunk index
    ids = [f"{media_id}_chunk_{i}" for i in range(len(texts))]

    # Store the texts, embeddings, and IDs in ChromaDB
    store_in_chroma(collection_name, texts, embeddings, ids)

    # Store the chunk metadata in SQLite
    for i, chunk in enumerate(chunks):
        add_media_chunk(media_id, chunk['text'], chunk['start'], chunk['end'], ids[i])

    # Update the FTS table
    update_fts_for_media(media_id)


# Function to store documents and their embeddings in ChromaDB
def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
    collection = chroma_client.get_or_create_collection(name=collection_name)
    collection.add(
        documents=texts,
        embeddings=embeddings,
        ids=ids
    )


# Function to perform vector search using ChromaDB
def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
    query_embedding = create_embedding(query)
    collection = chroma_client.get_collection(name=collection_name)
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=k
    )
    return results['documents'][0]


def create_embedding(text: str) -> List[float]:
    if embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)
    elif embedding_provider == 'local':
        response = requests.post(
            embedding_api_url,
            json={"text": text, "model": embedding_model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        return response.json()['embedding']
    elif embedding_provider == 'huggingface':
        from transformers import AutoTokenizer, AutoModel
        import torch

        # Note: the tokenizer and model are reloaded on every call; caching them
        # at module level would avoid repeated loads when embedding in bulk.
        tokenizer = AutoTokenizer.from_pretrained(embedding_model)
        model = AutoModel.from_pretrained(embedding_model)

        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)

        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency
    else:
        raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
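
# Example end-to-end usage of the functions above (a sketch; media_id 42, the
# transcript text, and the query string are illustrative, and the media row is
# assumed to already exist in SQLite):
#
#     auto_update_chroma_embeddings(42, "Full transcript of the media item ...")
#     top_chunks = vector_search("media_42", "What pricing changes were announced?", k=5)
#     for chunk in top_chunks:
#         print(chunk)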

def create_all_embeddings(api_choice: str, model_or_url: str) -> str:
    try:
        all_content = get_all_content_from_database()
        if not all_content:
            return "No content found in the database."

        texts_to_embed = []
        embeddings_to_store = []
        ids_to_store = []
        collection_name = "all_content_embeddings"

        # Initialize or get the ChromaDB collection
        collection = chroma_client.get_or_create_collection(name=collection_name)

        for content_item in all_content:
            media_id = content_item['id']
            text = content_item['content']

            # Check if the embedding already exists in ChromaDB. get() always
            # returns a dict, so test the 'ids' field rather than the dict itself.
            embedding_exists = collection.get(ids=[f"doc_{media_id}"])
            if embedding_exists['ids']:
                logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
                continue  # Skip if embedding already exists

            # Create the embedding
            if api_choice == "openai":
                embedding = create_openai_embedding(text, model_or_url)
            else:  # Llama.cpp
                embedding = create_llamacpp_embedding(text, model_or_url)

            # Collect the text, embedding, and ID for batch storage
            texts_to_embed.append(text)
            embeddings_to_store.append(embedding)
            ids_to_store.append(f"doc_{media_id}")

        # Store all new embeddings in ChromaDB
        if texts_to_embed and embeddings_to_store:
            store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)

        return "Embeddings created and stored successfully for all new content."
    except Exception as e:
        logging.error(f"Error during embedding creation: {str(e)}")
        return f"Error: {str(e)}"


def create_openai_embedding(text: str, model: str) -> List[float]:
    embedding = get_openai_embeddings(text, model)
    return embedding


def create_llamacpp_embedding(text: str, api_url: str) -> List[float]:
    response = requests.post(
        api_url,
        json={"input": text}
    )
    if response.status_code == 200:
        return response.json()['embedding']
    else:
        raise Exception(f"Error from Llama.cpp API: {response.text}")


def get_all_content_from_database() -> List[Dict[str, Any]]:
    """
    Retrieve all media content from the database that requires embedding.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, each containing the media ID,
        content, title, and other relevant fields.
    """
    try:
        from App_Function_Libraries.DB.DB_Manager import db
        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, content, title, author, type
                FROM Media
                WHERE is_trash = 0  -- Exclude items marked as trash
            """)
            media_items = cursor.fetchall()

            # Convert the results into a list of dictionaries
            all_content = [
                {
                    'id': item[0],
                    'content': item[1],
                    'title': item[2],
                    'author': item[3],
                    'type': item[4]
                }
                for item in media_items
            ]

        return all_content
    except sqlite3.Error as e:
        logging.error(f"Error retrieving all content from database: {e}")
        from App_Function_Libraries.DB.SQLite_DB import DatabaseError
        raise DatabaseError(f"Error retrieving all content from database: {e}")


def store_in_chroma_with_citation(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str], sources: List[str]):
    collection = chroma_client.get_or_create_collection(name=collection_name)
    collection.add(
        documents=texts,
        embeddings=embeddings,
        ids=ids,
        metadatas=[{'source': source} for source in sources]
    )
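
# Example usage of the citation-aware variant (a sketch; the texts, IDs, and
# source labels shown are illustrative):
#
#     texts = ["First passage ...", "Second passage ..."]
#     embeddings = [create_embedding(t) for t in texts]
#     store_in_chroma_with_citation(
#         "cited_content",
#         texts,
#         embeddings,
#         ids=["cite_1", "cite_2"],
#         sources=["episode_12.mp3", "episode_13.mp3"],
#     )
#
# Note that create_llamacpp_embedding assumes the endpoint accepts a JSON body
# of the form {"input": text} and replies with an "embedding" field; verify the
# payload and response shape against the server you are actually running.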

def check_embedding_status(selected_item):
    if not selected_item:
        return "Please select an item", ""

    # The dropdown label is expected to start with the item identifier, e.g. "123 (Title)"
    item_id = selected_item.split('(')[0].strip()
    collection = chroma_client.get_or_create_collection(name="all_content_embeddings")

    # Embeddings are not returned by get() by default, so request them explicitly
    result = collection.get(ids=[f"doc_{item_id}"], include=["embeddings"])

    if result['ids']:
        embedding = result['embeddings'][0]
        embedding_preview = str(embedding[:50])  # Convert first 50 elements to string
        return f"Embedding exists for item: {item_id}", f"Embedding preview: {embedding_preview}..."
    else:
        return f"No embedding found for item: {item_id}", ""


def create_new_embedding(selected_item, api_choice, openai_model, llamacpp_url):
    if not selected_item:
        return "Please select an item"

    item_title = selected_item.split('(')[0].strip()
    items = get_all_content_from_database()
    item = next((item for item in items if item['title'] == item_title), None)
    if not item:
        return f"Item not found: {item_title}"

    try:
        # Route the request to the selected provider
        if api_choice == "OpenAI":
            embedding = create_openai_embedding(item['content'], openai_model)
        else:  # Llama.cpp
            embedding = create_llamacpp_embedding(item['content'], llamacpp_url)

        collection_name = "all_content_embeddings"
        store_in_chroma(collection_name, [item['content']], [embedding], [f"doc_{item['id']}"])
        return f"New embedding created and stored for item: {item_title}"
    except Exception as e:
        return f"Error creating embedding: {str(e)}"

#
# End of Functions for ChromaDB
#######################################################################################################################
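
# Minimal smoke test (a sketch; assumes config.txt sits alongside this module
# and the Media table contains at least one non-trashed row):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Embed everything not yet in the "all_content_embeddings" collection
    print(create_all_embeddings("openai", embedding_model))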