oceansweep committed on
Commit
2810d40
1 Parent(s): 9baa4f4

Update App_Function_Libraries/ChromaDB_Library.py

Browse files
App_Function_Libraries/ChromaDB_Library.py CHANGED
@@ -1,225 +1,225 @@
1
- import configparser
2
- import logging
3
- import sqlite3
4
- from typing import List, Dict, Any
5
-
6
- import chromadb
7
- import requests
8
-
9
- from App_Function_Libraries.Chunk_Lib import improved_chunking_process
10
-
11
- #######################################################################################################################
12
- #
13
- # Functions for ChromaDB
14
-
15
- # Get ChromaDB settings
16
- # Load configuration
17
# Parse config.txt once at import time. Every lookup below supplies a fallback,
# so a missing file, section, or key yields the default instead of an error.
config = configparser.ConfigParser()
config.read('config.txt')

# Location of the persistent ChromaDB store and the shared client opened on it.
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')
chroma_client = chromadb.PersistentClient(path=chroma_db_path)

# Embedding backend settings read by create_embedding().
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')

# Chunking options passed to improved_chunking_process().
chunk_options = dict(
    method=config.get('Chunking', 'method', fallback='words'),
    max_size=config.getint('Chunking', 'max_size', fallback=400),
    overlap=config.getint('Chunking', 'overlap', fallback=200),
    adaptive=config.getboolean('Chunking', 'adaptive', fallback=False),
    multi_level=config.getboolean('Chunking', 'multi_level', fallback=False),
    language=config.get('Chunking', 'language', fallback='english'),
)
37
-
38
-
39
def auto_update_chroma_embeddings(media_id: int, content: str):
    """
    Create ChromaDB embeddings for a newly ingested media item unless they already exist.

    Each media item uses its own collection named ``media_<media_id>``. The item is
    treated as already embedded when its first chunk ID (``<media_id>_chunk_0``) is
    present in that collection.

    :param media_id: The ID of the newly ingested media item
    :param content: The content of the newly ingested media item
    """
    collection_name = f"media_{media_id}"

    # Initialize or get the ChromaDB collection
    collection = chroma_client.get_or_create_collection(name=collection_name)

    # collection.get() returns a dict-like result that is non-empty even when no
    # ID matched, so existence has to be read from its 'ids' list rather than from
    # the truthiness of the result itself. Chunk IDs are numbered from 0, so
    # probing the first one is sufficient (and avoids building one candidate ID
    # per character of `content`).
    existing = collection.get(ids=[f"{media_id}_chunk_0"])
    if existing and existing['ids']:
        logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
        return

    # Process and store content if embeddings do not already exist
    process_and_store_content(content, collection_name, media_id)
    logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")
60
-
61
-
62
- # Function to process content, create chunks, embeddings, and store in ChromaDB and SQLite
63
def process_and_store_content(content: str, collection_name: str, media_id: int):
    """
    Chunk `content`, embed every chunk, and store the chunks in ChromaDB and SQLite FTS.

    :param content: Raw text of the media item
    :param collection_name: ChromaDB collection that receives the chunks
    :param media_id: ID of the media item; used to build chunk IDs ``<media_id>_chunk_<i>``
    """
    # Process the content into chunks
    chunks = improved_chunking_process(content, chunk_options)
    texts = [chunk['text'] for chunk in chunks]

    # Nothing to embed or store; also avoids handing empty lists to collection.add().
    if not texts:
        logging.warning(f"No chunks produced for media ID {media_id}, nothing stored.")
        return

    # Generate embeddings for each chunk
    embeddings = [create_embedding(text) for text in texts]

    # Create unique IDs for each chunk using the media_id and chunk index
    ids = [f"{media_id}_chunk_{index}" for index, _ in enumerate(texts)]

    # Store the texts, embeddings, and IDs in ChromaDB
    store_in_chroma(collection_name, texts, embeddings, ids)

    # Store the chunks in SQLite FTS as well (imported here, as in the original, not at module top)
    from App_Function_Libraries.DB_Manager import db
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.executemany(
            "INSERT INTO media_fts (content) VALUES (?)",
            [(text,) for text in texts],
        )
        conn.commit()
84
-
85
-
86
- # Function to store documents and their embeddings in ChromaDB
87
def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
    """
    Add documents with their embeddings and IDs to a ChromaDB collection.

    The collection is fetched by name and created if it does not exist yet.

    :param collection_name: Name of the target collection
    :param texts: Document texts
    :param embeddings: Embedding vectors passed alongside `texts`
    :param ids: IDs passed alongside `texts`
    """
    target = chroma_client.get_or_create_collection(name=collection_name)
    payload = {"documents": texts, "embeddings": embeddings, "ids": ids}
    target.add(**payload)
94
-
95
- # Function to perform vector search using ChromaDB
96
def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
    """
    Embed `query` and return the documents ChromaDB reports for it.

    :param collection_name: Name of an existing collection to search
    :param query: Query text, embedded with create_embedding()
    :param k: Number of results requested from ChromaDB
    :return: The 'documents' entry for the single query embedding that was sent
    """
    # Embed first, then look the collection up (same order as before).
    embedded_query = create_embedding(query)
    hits = chroma_client.get_collection(name=collection_name).query(
        query_embeddings=[embedded_query],
        n_results=k,
    )
    documents_per_query = hits['documents']
    # Only one query embedding was submitted, so its results sit at index 0.
    return documents_per_query[0]
104
-
105
-
106
# Loaded Hugging Face (tokenizer, model) pairs keyed by model name, so repeated
# create_embedding() calls do not reload the model each time.
_HF_MODEL_CACHE = {}


def _load_hf_model(model_name: str):
    """Return the cached (tokenizer, model) pair for `model_name`, loading it on first use."""
    if model_name not in _HF_MODEL_CACHE:
        from transformers import AutoTokenizer, AutoModel
        _HF_MODEL_CACHE[model_name] = (
            AutoTokenizer.from_pretrained(model_name),
            AutoModel.from_pretrained(model_name),
        )
    return _HF_MODEL_CACHE[model_name]


def create_embedding(text: str) -> List[float]:
    """
    Create an embedding vector for `text` with the configured provider.

    The provider is read from the module-level `embedding_provider`; the model,
    API key, and API URL come from the matching module-level settings.

    :param text: Text to embed
    :return: The embedding as a list of floats
    :raises ValueError: If `embedding_provider` is not 'openai', 'local', or 'huggingface'
    :raises requests.HTTPError: If the local embedding endpoint answers with an error status
    """
    if embedding_provider == 'openai':
        import openai
        openai.api_key = embedding_api_key
        response = openai.Embedding.create(input=text, model=embedding_model)
        return response['data'][0]['embedding']
    elif embedding_provider == 'local':
        # FIXME - This is a placeholder for API calls to a local embedding model
        response = requests.post(
            embedding_api_url,
            json={"text": text, "model": embedding_model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        # Surface HTTP failures explicitly instead of failing later on a
        # missing 'embedding' key or an undecodable body.
        response.raise_for_status()
        return response.json()['embedding']
    elif embedding_provider == 'huggingface':
        import torch

        tokenizer, model = _load_hf_model(embedding_model)

        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)

        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency
    else:
        raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
137
-
138
-
139
def create_all_embeddings(api_choice: str) -> str:
    """
    Embed every media item that has no embedding yet in 'all_content_embeddings'.

    Sets the module-level `embedding_provider` to `api_choice`, reads all items via
    get_all_content_from_database(), and stores one embedding per item under the
    ID ``doc_<media_id>``. Items already present in the collection are skipped.

    :param api_choice: Embedding provider to use for this and later embedding calls
    :return: A status message; on any failure the error text prefixed with "Error: "
    """
    global embedding_provider
    try:
        embedding_provider = api_choice

        all_content = get_all_content_from_database()

        if not all_content:
            return "No content found in the database."

        texts_to_embed = []
        embeddings_to_store = []
        ids_to_store = []
        collection_name = "all_content_embeddings"

        # Initialize or get the ChromaDB collection
        collection = chroma_client.get_or_create_collection(name=collection_name)

        for content_item in all_content:
            media_id = content_item['id']
            text = content_item['content']
            doc_id = f"doc_{media_id}"

            # collection.get() returns a dict-like result that is non-empty even
            # when nothing matched; only its 'ids' list says whether the
            # embedding exists.
            if collection.get(ids=[doc_id])['ids']:
                logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
                continue  # Skip if embedding already exists

            # An item without text cannot be embedded; skip it rather than
            # letting one bad row abort the whole run.
            if not text:
                logging.warning(f"Media ID {media_id} has no content, skipping...")
                continue

            # Create the embedding
            embedding = create_embedding(text)

            # Collect the text, embedding, and ID for batch storage
            texts_to_embed.append(text)
            embeddings_to_store.append(embedding)
            ids_to_store.append(doc_id)

        # Store all new embeddings in ChromaDB
        if texts_to_embed:
            store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)

        return "Embeddings created and stored successfully for all new content."
    except Exception as e:
        # Callers receive the failure as a status string, so log and report it here.
        logging.error(f"Error during embedding creation: {str(e)}")
        return f"Error: {str(e)}"
184
-
185
-
186
def get_all_content_from_database() -> List[Dict[str, Any]]:
    """
    Fetch every non-trashed row of the Media table as a list of dictionaries.

    Returns:
        List[Dict[str, Any]]: One dictionary per row with the keys 'id', 'content',
        'title', 'author' and 'type'.

    Raises:
        DatabaseError: If the query fails with a sqlite3.Error.
    """
    # Dictionary keys in the same order as the selected columns.
    column_names = ('id', 'content', 'title', 'author', 'type')
    try:
        from App_Function_Libraries.DB_Manager import db
        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, content, title, author, type
                FROM Media
                WHERE is_trash = 0 -- Exclude items marked as trash
            """)
            rows = cursor.fetchall()

            # Map each positional row onto the named keys.
            return [
                {name: row[position] for position, name in enumerate(column_names)}
                for row in rows
            ]

    except sqlite3.Error as e:
        logging.error(f"Error retrieving all content from database: {e}")
        from App_Function_Libraries.SQLite_DB import DatabaseError
        raise DatabaseError(f"Error retrieving all content from database: {e}")
222
-
223
- #
224
- # End of Functions for ChromaDB
225
  #######################################################################################################################
 
1
+ import configparser
2
+ import logging
3
+ import sqlite3
4
+ from typing import List, Dict, Any
5
+
6
+ #import chromadb
7
+ import requests
8
+
9
+ from App_Function_Libraries.Chunk_Lib import improved_chunking_process
10
+
11
+ #######################################################################################################################
12
+ #
13
+ # Functions for ChromaDB
14
+
15
+ # Get ChromaDB settings
16
+ # Load configuration
17
# Parse config.txt once at import time. Every lookup below supplies a fallback,
# so a missing file, section, or key yields the default instead of an error.
config = configparser.ConfigParser()
config.read('config.txt')
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')

# The file-level `import chromadb` is commented out, yet the client below needs
# the package. Import it here, guarded, so this module can still be imported
# (with chroma_client set to None) when chromadb is not installed.
try:
    import chromadb
except ImportError:
    chromadb = None
    logging.warning("chromadb is not installed; ChromaDB functions are unavailable.")

chroma_client = (
    chromadb.PersistentClient(path=chroma_db_path) if chromadb is not None else None
)

# Get embedding settings
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')

# Get chunking options (passed to improved_chunking_process())
chunk_options = {
    'method': config.get('Chunking', 'method', fallback='words'),
    'max_size': config.getint('Chunking', 'max_size', fallback=400),
    'overlap': config.getint('Chunking', 'overlap', fallback=200),
    'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
    'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
    'language': config.get('Chunking', 'language', fallback='english'),
}
37
+
38
+
39
def auto_update_chroma_embeddings(media_id: int, content: str):
    """
    Create ChromaDB embeddings for a newly ingested media item unless they already exist.

    Each media item uses its own collection named ``media_<media_id>``. The item is
    treated as already embedded when its first chunk ID (``<media_id>_chunk_0``) is
    present in that collection.

    :param media_id: The ID of the newly ingested media item
    :param content: The content of the newly ingested media item
    """
    collection_name = f"media_{media_id}"

    # Initialize or get the ChromaDB collection
    collection = chroma_client.get_or_create_collection(name=collection_name)

    # collection.get() returns a dict-like result that is non-empty even when no
    # ID matched, so existence has to be read from its 'ids' list rather than from
    # the truthiness of the result itself. Chunk IDs are numbered from 0, so
    # probing the first one is sufficient (and avoids building one candidate ID
    # per character of `content`).
    existing = collection.get(ids=[f"{media_id}_chunk_0"])
    if existing and existing['ids']:
        logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
        return

    # Process and store content if embeddings do not already exist
    process_and_store_content(content, collection_name, media_id)
    logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")
60
+
61
+
62
+ # Function to process content, create chunks, embeddings, and store in ChromaDB and SQLite
63
def process_and_store_content(content: str, collection_name: str, media_id: int):
    """
    Chunk `content`, embed every chunk, and store the chunks in ChromaDB and SQLite FTS.

    :param content: Raw text of the media item
    :param collection_name: ChromaDB collection that receives the chunks
    :param media_id: ID of the media item; used to build chunk IDs ``<media_id>_chunk_<i>``
    """
    # Process the content into chunks
    chunks = improved_chunking_process(content, chunk_options)
    texts = [chunk['text'] for chunk in chunks]

    # Nothing to embed or store; also avoids handing empty lists to collection.add().
    if not texts:
        logging.warning(f"No chunks produced for media ID {media_id}, nothing stored.")
        return

    # Generate embeddings for each chunk
    embeddings = [create_embedding(text) for text in texts]

    # Create unique IDs for each chunk using the media_id and chunk index
    ids = [f"{media_id}_chunk_{index}" for index, _ in enumerate(texts)]

    # Store the texts, embeddings, and IDs in ChromaDB
    store_in_chroma(collection_name, texts, embeddings, ids)

    # Store the chunks in SQLite FTS as well (imported here, as in the original, not at module top)
    from App_Function_Libraries.DB_Manager import db
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.executemany(
            "INSERT INTO media_fts (content) VALUES (?)",
            [(text,) for text in texts],
        )
        conn.commit()
84
+
85
+
86
+ # Function to store documents and their embeddings in ChromaDB
87
def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
    """
    Add documents with their embeddings and IDs to a ChromaDB collection.

    The collection is fetched by name and created if it does not exist yet.

    :param collection_name: Name of the target collection
    :param texts: Document texts
    :param embeddings: Embedding vectors passed alongside `texts`
    :param ids: IDs passed alongside `texts`
    """
    target = chroma_client.get_or_create_collection(name=collection_name)
    payload = {"documents": texts, "embeddings": embeddings, "ids": ids}
    target.add(**payload)
94
+
95
+ # Function to perform vector search using ChromaDB
96
def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
    """
    Embed `query` and return the documents ChromaDB reports for it.

    :param collection_name: Name of an existing collection to search
    :param query: Query text, embedded with create_embedding()
    :param k: Number of results requested from ChromaDB
    :return: The 'documents' entry for the single query embedding that was sent
    """
    # Embed first, then look the collection up (same order as before).
    embedded_query = create_embedding(query)
    hits = chroma_client.get_collection(name=collection_name).query(
        query_embeddings=[embedded_query],
        n_results=k,
    )
    documents_per_query = hits['documents']
    # Only one query embedding was submitted, so its results sit at index 0.
    return documents_per_query[0]
104
+
105
+
106
# Loaded Hugging Face (tokenizer, model) pairs keyed by model name, so repeated
# create_embedding() calls do not reload the model each time.
_HF_MODEL_CACHE = {}


def _load_hf_model(model_name: str):
    """Return the cached (tokenizer, model) pair for `model_name`, loading it on first use."""
    if model_name not in _HF_MODEL_CACHE:
        from transformers import AutoTokenizer, AutoModel
        _HF_MODEL_CACHE[model_name] = (
            AutoTokenizer.from_pretrained(model_name),
            AutoModel.from_pretrained(model_name),
        )
    return _HF_MODEL_CACHE[model_name]


def create_embedding(text: str) -> List[float]:
    """
    Create an embedding vector for `text` with the configured provider.

    The provider is read from the module-level `embedding_provider`; the model,
    API key, and API URL come from the matching module-level settings.

    :param text: Text to embed
    :return: The embedding as a list of floats
    :raises ValueError: If `embedding_provider` is not 'openai', 'local', or 'huggingface'
    :raises requests.HTTPError: If the local embedding endpoint answers with an error status
    """
    if embedding_provider == 'openai':
        import openai
        openai.api_key = embedding_api_key
        response = openai.Embedding.create(input=text, model=embedding_model)
        return response['data'][0]['embedding']
    elif embedding_provider == 'local':
        # FIXME - This is a placeholder for API calls to a local embedding model
        response = requests.post(
            embedding_api_url,
            json={"text": text, "model": embedding_model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        # Surface HTTP failures explicitly instead of failing later on a
        # missing 'embedding' key or an undecodable body.
        response.raise_for_status()
        return response.json()['embedding']
    elif embedding_provider == 'huggingface':
        import torch

        tokenizer, model = _load_hf_model(embedding_model)

        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)

        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency
    else:
        raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
137
+
138
+
139
def create_all_embeddings(api_choice: str) -> str:
    """
    Embed every media item that has no embedding yet in 'all_content_embeddings'.

    Sets the module-level `embedding_provider` to `api_choice`, reads all items via
    get_all_content_from_database(), and stores one embedding per item under the
    ID ``doc_<media_id>``. Items already present in the collection are skipped.

    :param api_choice: Embedding provider to use for this and later embedding calls
    :return: A status message; on any failure the error text prefixed with "Error: "
    """
    global embedding_provider
    try:
        embedding_provider = api_choice

        all_content = get_all_content_from_database()

        if not all_content:
            return "No content found in the database."

        texts_to_embed = []
        embeddings_to_store = []
        ids_to_store = []
        collection_name = "all_content_embeddings"

        # Initialize or get the ChromaDB collection
        collection = chroma_client.get_or_create_collection(name=collection_name)

        for content_item in all_content:
            media_id = content_item['id']
            text = content_item['content']
            doc_id = f"doc_{media_id}"

            # collection.get() returns a dict-like result that is non-empty even
            # when nothing matched; only its 'ids' list says whether the
            # embedding exists.
            if collection.get(ids=[doc_id])['ids']:
                logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
                continue  # Skip if embedding already exists

            # An item without text cannot be embedded; skip it rather than
            # letting one bad row abort the whole run.
            if not text:
                logging.warning(f"Media ID {media_id} has no content, skipping...")
                continue

            # Create the embedding
            embedding = create_embedding(text)

            # Collect the text, embedding, and ID for batch storage
            texts_to_embed.append(text)
            embeddings_to_store.append(embedding)
            ids_to_store.append(doc_id)

        # Store all new embeddings in ChromaDB
        if texts_to_embed:
            store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)

        return "Embeddings created and stored successfully for all new content."
    except Exception as e:
        # Callers receive the failure as a status string, so log and report it here.
        logging.error(f"Error during embedding creation: {str(e)}")
        return f"Error: {str(e)}"
184
+
185
+
186
def get_all_content_from_database() -> List[Dict[str, Any]]:
    """
    Fetch every non-trashed row of the Media table as a list of dictionaries.

    Returns:
        List[Dict[str, Any]]: One dictionary per row with the keys 'id', 'content',
        'title', 'author' and 'type'.

    Raises:
        DatabaseError: If the query fails with a sqlite3.Error.
    """
    # Dictionary keys in the same order as the selected columns.
    column_names = ('id', 'content', 'title', 'author', 'type')
    try:
        from App_Function_Libraries.DB_Manager import db
        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, content, title, author, type
                FROM Media
                WHERE is_trash = 0 -- Exclude items marked as trash
            """)
            rows = cursor.fetchall()

            # Map each positional row onto the named keys.
            return [
                {name: row[position] for position, name in enumerate(column_names)}
                for row in rows
            ]

    except sqlite3.Error as e:
        logging.error(f"Error retrieving all content from database: {e}")
        from App_Function_Libraries.SQLite_DB import DatabaseError
        raise DatabaseError(f"Error retrieving all content from database: {e}")
222
+
223
+ #
224
+ # End of Functions for ChromaDB
225
  #######################################################################################################################