File size: 11,292 Bytes
aa1db93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cfbac61
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import configparser
import logging
import sqlite3
from typing import List, Dict, Any

import chromadb
import requests
from chromadb import Settings

from App_Function_Libraries.Chunk_Lib import improved_chunking_process
from App_Function_Libraries.DB.DB_Manager import add_media_chunk, update_fts_for_media
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings

#######################################################################################################################
#
# Functions for ChromaDB

# Module-level configuration. Everything below runs once, at import time.
#
# Load configuration from 'config.txt'. The path is relative, so it is resolved
# against the process's current working directory. ConfigParser.read() silently
# ignores a missing file, in which case every lookup below uses its fallback.
config = configparser.ConfigParser()
config.read('config.txt')
# Directory of the persistent ChromaDB store (defaults to 'chroma_db').
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')
# Shared client used by every function in this module; telemetry is disabled.
chroma_client = chromadb.PersistentClient(path=chroma_db_path, settings=Settings(anonymized_telemetry=False))

# NOTE(review): this environment variable is set after `chromadb` has already
# been imported and the client constructed above. If it is meant to influence
# how protobuf is loaded by that import, it may take effect too late - confirm.
import os
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

# Get embedding settings (read by create_embedding):
#   provider - 'openai', 'local' or 'huggingface'
#   model    - model name passed to the selected provider
#   api_key  - sent as a Bearer token by the 'local' provider
#   api_url  - endpoint POSTed to by the 'local' provider
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')

# Get chunking options. This dict is passed unchanged to
# improved_chunking_process(); the meaning and units of each value are defined
# by that function (presumably 'max_size'/'overlap' are counted in the unit
# named by 'method' - TODO confirm against Chunk_Lib).
chunk_options = {
    'method': config.get('Chunking', 'method', fallback='words'),
    'max_size': config.getint('Chunking', 'max_size', fallback=400),
    'overlap': config.getint('Chunking', 'overlap', fallback=200),
    'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
    'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
    'language': config.get('Chunking', 'language', fallback='english')
}


def auto_update_chroma_embeddings(media_id: int, content: str):
    """
    Create ChromaDB embeddings for a media item unless they already exist.

    Intended to be called when a new item is ingested into the SQLite database.
    The item's chunks live in a dedicated collection named ``media_<media_id>``
    and use the ``<media_id>_chunk_<index>`` ID scheme produced by
    ``process_and_store_content``.

    :param media_id: The ID of the newly ingested media item
    :param content: The content of the newly ingested media item
    """
    collection_name = f"media_{media_id}"

    # Initialize or get the ChromaDB collection
    collection = chroma_client.get_or_create_collection(name=collection_name)

    # Probe for the first chunk only: chunk IDs are numbered from 0, so its
    # presence means the item has already been processed. The lookup result is
    # a dict that always carries its keys, so the 'ids' list (not the dict
    # itself) is what tells us whether anything matched.
    existing = collection.get(ids=[f"{media_id}_chunk_0"])

    if existing and existing['ids']:
        logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
    else:
        # Process and store content if embeddings do not already exist
        process_and_store_content(content, collection_name, media_id)
        logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")


def process_and_store_content(content: str, collection_name: str, media_id: int):
    """
    Chunk a media item's content, embed each chunk, and persist the results.

    The chunk texts and their embeddings are written to the named ChromaDB
    collection, each chunk is recorded in SQLite via ``add_media_chunk``, and
    finally the full-text-search table is refreshed for the media item.

    :param content: Raw text of the media item
    :param collection_name: ChromaDB collection that receives the chunks
    :param media_id: ID of the media item; used as the prefix of every chunk ID
    """
    pieces = improved_chunking_process(content, chunk_options)

    # Collect the chunk texts first, then embed them one by one.
    chunk_texts = []
    for piece in pieces:
        chunk_texts.append(piece['text'])
    vectors = list(map(create_embedding, chunk_texts))

    # Chunk IDs are "<media_id>_chunk_<position>", one per chunk text.
    chunk_ids = [f"{media_id}_chunk_{position}" for position, _ in enumerate(chunk_texts)]

    # Vector store first ...
    store_in_chroma(collection_name, chunk_texts, vectors, chunk_ids)

    # ... then the relational chunk metadata, keyed by the same IDs.
    for piece, chunk_id in zip(pieces, chunk_ids):
        add_media_chunk(media_id, piece['text'], piece['start'], piece['end'], chunk_id)

    # Keep the FTS table in step with the newly stored chunks.
    update_fts_for_media(media_id)

def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
    """
    Add documents and their pre-computed embeddings to a ChromaDB collection.

    The collection is created if it does not exist yet.

    :param collection_name: Name of the target collection
    :param texts: Document texts
    :param embeddings: One embedding vector per text
    :param ids: One ID per text
    """
    target = chroma_client.get_or_create_collection(name=collection_name)
    target.add(documents=texts, embeddings=embeddings, ids=ids)

def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
    """
    Run a vector search against an existing ChromaDB collection.

    :param collection_name: Name of the collection to search
    :param query: Query text; it is embedded with ``create_embedding``
    :param k: Number of results requested
    :return: The documents returned for the (single) query
    """
    # Embed first, then look the collection up.
    query_vector = create_embedding(query)
    hits = chroma_client.get_collection(name=collection_name).query(
        query_embeddings=[query_vector],
        n_results=k,
    )
    # One query was submitted, so the first entry holds its documents.
    documents_per_query = hits['documents']
    return documents_per_query[0]


# (tokenizer, model) pairs keyed by model name, so HuggingFace weights are
# loaded once per process instead of once per embedded text.
_HF_MODEL_CACHE: Dict[str, Any] = {}


def _load_huggingface_model(model_name: str):
    """Return the (tokenizer, model) pair for ``model_name``, loading it on first use."""
    if model_name not in _HF_MODEL_CACHE:
        # Imported lazily so the dependency is only needed for this provider.
        from transformers import AutoTokenizer, AutoModel

        _HF_MODEL_CACHE[model_name] = (
            AutoTokenizer.from_pretrained(model_name),
            AutoModel.from_pretrained(model_name),
        )
    return _HF_MODEL_CACHE[model_name]


def create_embedding(text: str) -> List[float]:
    """
    Embed ``text`` with the provider selected by the module-level settings.

    Reads ``embedding_provider``, ``embedding_model``, ``embedding_api_url``
    and ``embedding_api_key``:

    * ``'openai'``      - delegates to ``get_openai_embeddings``.
    * ``'local'``       - POSTs ``{"text", "model"}`` to ``embedding_api_url``
      with a Bearer token and returns the ``'embedding'`` field of the JSON reply.
    * ``'huggingface'`` - runs the model locally and mean-pools the last
      hidden state over the token dimension.

    :param text: Text to embed
    :return: The embedding as a list of floats
    :raises ValueError: If the configured provider is not supported
    :raises requests.HTTPError: If the ``'local'`` endpoint answers with an
        HTTP error status
    """
    if embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)

    if embedding_provider == 'local':
        response = requests.post(
            embedding_api_url,
            json={"text": text, "model": embedding_model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        # Surface HTTP failures directly instead of failing later on a
        # missing 'embedding' key or an unparsable error body.
        response.raise_for_status()
        return response.json()['embedding']

    if embedding_provider == 'huggingface':
        import torch

        # Reuse the loaded tokenizer/model: this function is called once per
        # chunk, and reloading the weights every time dominates the runtime.
        tokenizer, model = _load_huggingface_model(embedding_model)

        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)

        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency

    raise ValueError(f"Unsupported embedding provider: {embedding_provider}")


def create_all_embeddings(api_choice: str, model_or_url: str) -> str:
    """
    Embed every media item that is not yet in the ``all_content_embeddings`` collection.

    Items are read with ``get_all_content_from_database``. Each item is stored
    under the ID ``doc_<media id>``; items whose ID is already present in the
    collection are skipped. New embeddings are written in one batch at the end.

    :param api_choice: ``"openai"`` (any letter case) selects the OpenAI
        helper; any other value selects the Llama.cpp helper
    :param model_or_url: OpenAI model name, or the Llama.cpp endpoint URL
    :return: A human-readable status message. Errors are logged and reported
        in the returned string rather than raised.
    """
    try:
        all_content = get_all_content_from_database()

        if not all_content:
            return "No content found in the database."

        texts_to_embed = []
        embeddings_to_store = []
        ids_to_store = []
        collection_name = "all_content_embeddings"

        # Initialize or get the ChromaDB collection
        collection = chroma_client.get_or_create_collection(name=collection_name)

        # Accept "OpenAI" as well as "openai"; other callers in this module
        # use the capitalised spelling.
        use_openai = str(api_choice).strip().lower() == "openai"

        for content_item in all_content:
            media_id = content_item['id']
            text = content_item['content']
            doc_id = f"doc_{media_id}"

            # The lookup result is a dict that always carries its keys, so its
            # truthiness says nothing; the 'ids' list is empty when the
            # document is not stored yet.
            existing = collection.get(ids=[doc_id])

            if existing and existing['ids']:
                logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
                continue  # Skip if embedding already exists

            # Create the embedding
            if use_openai:
                embedding = create_openai_embedding(text, model_or_url)
            else:  # Llama.cpp
                embedding = create_llamacpp_embedding(text, model_or_url)

            # Collect the text, embedding, and ID for batch storage
            texts_to_embed.append(text)
            embeddings_to_store.append(embedding)
            ids_to_store.append(doc_id)

        # Store all new embeddings in ChromaDB
        if texts_to_embed and embeddings_to_store:
            store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)

        return "Embeddings created and stored successfully for all new content."
    except Exception as e:
        logging.error(f"Error during embedding creation: {str(e)}")
        return f"Error: {str(e)}"


def create_openai_embedding(text: str, model: str) -> List[float]:
    """
    Embed ``text`` with an OpenAI model.

    Thin wrapper around ``get_openai_embeddings``. The API key is not handled
    here: the previous config lookup was never passed on, and only made the
    call fail with ``KeyError`` when the ``[API]`` section was missing.

    :param text: Text to embed
    :param model: OpenAI embedding model name
    :return: The embedding returned by ``get_openai_embeddings``
    """
    return get_openai_embeddings(text, model)


def create_llamacpp_embedding(text: str, api_url: str) -> List[float]:
    """
    Request an embedding for ``text`` from a Llama.cpp HTTP endpoint.

    :param text: Text to embed; sent as the ``"input"`` field of a JSON POST
    :param api_url: URL of the embedding endpoint
    :return: The ``'embedding'`` field of the JSON reply
    :raises Exception: If the endpoint answers with any status other than 200
    """
    reply = requests.post(api_url, json={"input": text})

    # Guard clause: anything but 200 is reported with the raw response body.
    if reply.status_code != 200:
        raise Exception(f"Error from Llama.cpp API: {reply.text}")

    return reply.json()['embedding']


def get_all_content_from_database() -> List[Dict[str, Any]]:
    """
    Fetch every non-trashed row of the Media table.

    Returns:
        List[Dict[str, Any]]: One dictionary per row with the keys
        'id', 'content', 'title', 'author' and 'type'.

    Raises:
        DatabaseError: If the query fails with a sqlite3 error.
    """
    # Dictionary keys, in the same order as the selected columns.
    field_names = ('id', 'content', 'title', 'author', 'type')

    try:
        from App_Function_Libraries.DB.DB_Manager import db
        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, content, title, author, type
                FROM Media
                WHERE is_trash = 0  -- Exclude items marked as trash
            """)
            rows = cursor.fetchall()

            # Map each positional row onto the named fields.
            records = [
                {name: row[position] for position, name in enumerate(field_names)}
                for row in rows
            ]

        return records

    except sqlite3.Error as e:
        logging.error(f"Error retrieving all content from database: {e}")
        from App_Function_Libraries.DB.SQLite_DB import DatabaseError
        raise DatabaseError(f"Error retrieving all content from database: {e}")


def store_in_chroma_with_citation(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str], sources: List[str]):
    """
    Add documents, embeddings and per-document source metadata to a ChromaDB collection.

    The collection is created if it does not exist yet. Each entry of
    ``sources`` is stored as ``{'source': <value>}`` metadata.

    :param collection_name: Name of the target collection
    :param texts: Document texts
    :param embeddings: One embedding vector per text
    :param ids: One ID per text
    :param sources: One source string per text
    """
    target = chroma_client.get_or_create_collection(name=collection_name)

    citation_metadata = []
    for origin in sources:
        citation_metadata.append({'source': origin})

    target.add(documents=texts, embeddings=embeddings, ids=ids, metadatas=citation_metadata)


def check_embedding_status(selected_item):
    """
    Report whether an embedding is stored for the selected item.

    The text before the first ``'('`` in ``selected_item`` is used as the item
    identifier and looked up as ``doc_<identifier>`` in the
    ``all_content_embeddings`` collection.

    :param selected_item: Selection string; the part before the first ``'('``
        identifies the item
    :return: A ``(status message, preview)`` tuple. The preview shows up to the
        first 50 values of the embedding and is ``""`` when nothing was found
        or nothing was selected.
    """
    if not selected_item:
        return "Please select an item", ""

    # NOTE(review): create_new_embedding compares this same prefix against the
    # item *title* but stores under "doc_<numeric id>". If the prefix really is
    # a title, this lookup can never match - confirm the selection format.
    item_id = selected_item.split('(')[0].strip()
    collection = chroma_client.get_or_create_collection(name="all_content_embeddings")

    # Embeddings are not returned unless explicitly requested via `include`;
    # without it the 'embeddings' entry is None and indexing it fails.
    result = collection.get(ids=[f"doc_{item_id}"], include=["embeddings"])
    embeddings = result.get('embeddings')

    if result['ids'] and embeddings is not None and len(embeddings) > 0:
        # Normalise to plain floats so the preview reads the same whether the
        # client hands back lists or array objects.
        embedding_preview = str([float(value) for value in embeddings[0][:50]])
        return f"Embedding exists for item: {item_id}", f"Embedding preview: {embedding_preview}..."
    else:
        return f"No embedding found for item: {item_id}", ""


def create_new_embedding(selected_item, api_choice, openai_model, llamacpp_url):
    """
    Create (or replace) the embedding of one selected item.

    The text before the first ``'('`` in ``selected_item`` is matched against
    the ``title`` of the items returned by ``get_all_content_from_database``.
    The embedding is stored under ``doc_<media id>`` in the
    ``all_content_embeddings`` collection.

    :param selected_item: Selection string; the part before the first ``'('``
        is matched against item titles
    :param api_choice: ``"OpenAI"`` (any letter case) selects the OpenAI
        helper; any other value selects the Llama.cpp helper
    :param openai_model: OpenAI model name. If empty, the provider configured
        for ``create_embedding`` is used instead.
    :param llamacpp_url: Llama.cpp endpoint URL. If empty, the provider
        configured for ``create_embedding`` is used instead.
    :return: A human-readable status message. Embedding and storage errors are
        reported in the returned string rather than raised.
    """
    if not selected_item:
        return "Please select an item"

    # NOTE(review): check_embedding_status treats this same prefix as the
    # numeric media id ("doc_<prefix>"), while here it is compared with the
    # title - confirm the selection format and align the two.
    item_id = selected_item.split('(')[0].strip()
    items = get_all_content_from_database()
    item = next((candidate for candidate in items if candidate['title'] == item_id), None)
    if not item:
        return f"Item not found: {item_id}"

    try:
        content = item['content']

        # Honour the caller's API choice and its model/URL argument. Both
        # branches used to ignore them and call create_embedding().
        if str(api_choice).strip().lower() == "openai":
            if openai_model:
                embedding = create_openai_embedding(content, openai_model)
            else:
                embedding = create_embedding(content)
        else:  # Llama.cpp
            if llamacpp_url:
                embedding = create_llamacpp_embedding(content, llamacpp_url)
            else:
                embedding = create_embedding(content)

        # Upsert rather than add, so an embedding that already exists under
        # this ID is actually replaced by the new one.
        collection = chroma_client.get_or_create_collection(name="all_content_embeddings")
        collection.upsert(
            documents=[content],
            embeddings=[embedding],
            ids=[f"doc_{item['id']}"],
        )
        return f"New embedding created and stored for item: {item_id}"
    except Exception as e:
        return f"Error creating embedding: {str(e)}"


#
# End of Functions for ChromaDB
#######################################################################################################################