import numpy as np
from typing import List, Tuple, Dict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import math
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor
import openai
from transformers import T5ForConditionalGeneration, T5Tokenizer
import torch
import re
import psycopg2
from psycopg2.extras import execute_values
import sqlite3
import logging



########################################################################################################################################################################################################################################
#
# RAG Chunking
# To fully integrate this chunking system, you'd need to:
#
# Create the UnvectorizedMediaChunks table in your SQLite database (a hedged schema sketch follows the imports below).
# Modify your document ingestion process to use chunk_and_store_unvectorized.
# Implement a background process that periodically calls vectorize_all_documents to process unvectorized chunks (see the sketch after the RAGSystem class).

# This chunking is pretty weak and needs improvement
# See notes for improvements #FIXME
import json
from typing import List, Dict, Any
from datetime import datetime


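# The helper below is an illustrative sketch, not part of the original module: the
# UnvectorizedMediaChunks schema is assumed from the column names used by the
# functions in this section (media_id, chunk_text, chunk_index, start_char,
# end_char, chunk_type, metadata, is_processed, last_modified); adjust it to
# match your real database.
def create_unvectorized_chunks_table(db_connection):
    db_connection.execute("""
        CREATE TABLE IF NOT EXISTS UnvectorizedMediaChunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            media_id INTEGER NOT NULL,
            chunk_text TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            start_char INTEGER,
            end_char INTEGER,
            chunk_type TEXT,
            metadata TEXT,
            is_processed BOOLEAN DEFAULT FALSE,
            last_modified TIMESTAMP
        )
    """)
    db_connection.commit()
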
def chunk_and_store_unvectorized(
        db_connection,
        media_id: int,
        text: str,
        chunk_size: int = 1000,
        overlap: int = 100,
        chunk_type: str = 'fixed-length'
) -> List[int]:
    chunks = create_chunks(text, chunk_size, overlap)
    return store_unvectorized_chunks(db_connection, media_id, chunks, chunk_type)


def create_chunks(text: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
    words = text.split()
    chunks = []
    search_pos = 0  # running cursor so repeated words don't map back to their first occurrence
    for i in range(0, len(words), chunk_size - overlap):
        chunk_text = ' '.join(words[i:i + chunk_size])
        start_char = text.index(words[i], search_pos)
        # end_char is approximate: joining on single spaces collapses any repeated whitespace
        end_char = start_char + len(chunk_text)
        chunks.append({
            'text': chunk_text,
            'start_char': start_char,
            'end_char': end_char,
            'index': len(chunks)
        })
        search_pos = start_char + 1
    return chunks


def store_unvectorized_chunks(
        db_connection,
        media_id: int,
        chunks: List[Dict[str, Any]],
        chunk_type: str
) -> List[int]:
    cursor = db_connection.cursor()
    chunk_ids = []
    for chunk in chunks:
        cursor.execute("""
            INSERT INTO UnvectorizedMediaChunks
            (media_id, chunk_text, chunk_index, start_char, end_char, chunk_type, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            media_id,
            chunk['text'],
            chunk['index'],
            chunk['start_char'],
            chunk['end_char'],
            chunk_type,
            json.dumps({'length': len(chunk['text'])})  # Example metadata
        ))
        chunk_ids.append(cursor.lastrowid)
    db_connection.commit()
    return chunk_ids


def get_unvectorized_chunks(
        db_connection,
        media_id: int,
        limit: int = 100,
        offset: int = 0
) -> List[Dict[str, Any]]:
    cursor = db_connection.cursor()
    cursor.execute("""
        SELECT id, chunk_text, chunk_index, start_char, end_char, chunk_type, metadata
        FROM UnvectorizedMediaChunks
        WHERE media_id = ? AND is_processed = FALSE
        ORDER BY chunk_index
        LIMIT ? OFFSET ?
    """, (media_id, limit, offset))
    return [
        {
            'id': row[0],
            'text': row[1],
            'index': row[2],
            'start_char': row[3],
            'end_char': row[4],
            'type': row[5],
            'metadata': json.loads(row[6])
        }
        for row in cursor.fetchall()
    ]


def mark_chunks_as_processed(db_connection, chunk_ids: List[int]):
    cursor = db_connection.cursor()
    cursor.executemany("""
        UPDATE UnvectorizedMediaChunks
        SET is_processed = TRUE, last_modified = ?
        WHERE id = ?
    """, [(datetime.now(), chunk_id) for chunk_id in chunk_ids])
    db_connection.commit()


# Usage example
def process_media_chunks(db_connection, media_id: int, text: str):
    chunk_ids = chunk_and_store_unvectorized(db_connection, media_id, text)
    print(f"Stored {len(chunk_ids)} unvectorized chunks for media_id {media_id}")

    # Later, when you want to process these chunks:
    unprocessed_chunks = get_unvectorized_chunks(db_connection, media_id)
    # Process chunks (e.g., vectorize them)
    # ...
    # After processing, mark them as processed
    mark_chunks_as_processed(db_connection, [chunk['id'] for chunk in unprocessed_chunks])
###########################################################################################################################################################################################################
#
# RAG System

# To use this updated RAG system in your existing application:
#
# Install required packages:
# pip install sentence-transformers psycopg2-binary scikit-learn transformers torch
# Set up PostgreSQL with pgvector:
#
# Install PostgreSQL and the pgvector extension.
# Create a new database for vector storage (a hedged setup sketch appears below).
#
# Update your main application to use the RAG system:
#
# Import the RAGSystem class from this new file.
# Initialize the RAG system with your SQLite and PostgreSQL configurations.
# Use the vectorize_all_documents method to initially vectorize your existing documents.
#
#
# Modify your existing PDF_Ingestion_Lib.py and Book_Ingestion_Lib.py:
#
# After successfully ingesting a document into SQLite, call the vectorization method from the RAG system.

# Example modification for ingest_text_file in Book_Ingestion_Lib.py:
# from RAG_Library import RAGSystem
#
# # Initialize RAG system (do this once in your main application)
# rag_system = RAGSystem(sqlite_path, pg_config)
#
# def ingest_text_file(file_path, title=None, author=None, keywords=None):
#     try:
#         # ... (existing code)
#
#         # Add the text file to the database
#         doc_id = add_media_with_keywords(
#             url=file_path,
#             title=title,
#             media_type='document',
#             content=content,
#             keywords=keywords,
#             prompt='No prompt for text files',
#             summary='No summary for text files',
#             transcription_model='None',
#             author=author,
#             ingestion_date=datetime.now().strftime('%Y-%m-%d')
#         )
#
#         # Vectorize the newly added document
#         rag_system.vectorize_document(doc_id, content)
#
#         return f"Text file '{title}' by {author} ingested and vectorized successfully."
#     except Exception as e:
#         logging.error(f"Error ingesting text file: {str(e)}")
#         return f"Error ingesting text file: {str(e)}"

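# Hedged sketch of the one-time "set up PostgreSQL with pgvector" step described
# above. The connection settings, database name, and function name are
# placeholders, not part of the original code; the same can be done in psql.
def setup_vector_database(admin_config: Dict[str, str], db_name: str = "your_db_name"):
    # Connect to an existing database (e.g. 'postgres') with a role allowed to CREATE DATABASE.
    conn = psycopg2.connect(**admin_config)
    conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction block
    with conn.cursor() as cur:
        cur.execute(f"CREATE DATABASE {db_name}")
    conn.close()

    # The vector extension must be enabled inside the new database.
    with psycopg2.connect(**dict(admin_config, dbname=db_name)) as new_conn:
        with new_conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
        new_conn.commit()
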


# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Constants
EMBEDDING_MODEL = 'all-MiniLM-L6-v2'
VECTOR_DIM = 384  # Dimension of the chosen embedding model


class RAGSystem:
    def __init__(self, sqlite_path: str, pg_config: Dict[str, str], cache_size: int = 100):
        self.sqlite_path = sqlite_path
        self.pg_config = pg_config
        self.model = SentenceTransformer(EMBEDDING_MODEL)
        self.cache_size = cache_size

        self._init_postgres()

    def _init_postgres(self):
        with psycopg2.connect(**self.pg_config) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
                # The columns match what vectorize_document inserts; the UNIQUE
                # constraint backs its ON CONFLICT (document_id, chunk_index) clause.
                cur.execute(f"""
                CREATE TABLE IF NOT EXISTS document_vectors (
                    id SERIAL PRIMARY KEY,
                    document_id INTEGER,
                    chunk_index INTEGER,
                    vector vector({VECTOR_DIM}),
                    metadata JSONB,
                    UNIQUE (document_id, chunk_index)
                )
                """)
            conn.commit()

    @lru_cache(maxsize=100)  # Note: cache size is fixed here; self.cache_size is not applied
    def _get_embedding(self, text: str) -> np.ndarray:
        return self.model.encode([text])[0]

    def vectorize_document(self, doc_id: int, content: str):
        chunks = create_chunks(content, chunk_size=1000, overlap=100)
        # One connection for the whole document rather than one per chunk.
        with psycopg2.connect(**self.pg_config) as conn:
            with conn.cursor() as cur:
                for chunk in chunks:
                    vector = self._get_embedding(chunk['text'])
                    # Note: passing a Python list for a pgvector column may require
                    # registering pgvector's psycopg2 adapter or a ::vector cast,
                    # depending on your setup.
                    cur.execute("""
                    INSERT INTO document_vectors (document_id, chunk_index, vector, metadata)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (document_id, chunk_index) DO UPDATE SET vector = EXCLUDED.vector
                    """, (doc_id, chunk['index'], vector.tolist(), json.dumps(chunk)))
            conn.commit()

    def vectorize_all_documents(self):
        with sqlite3.connect(self.sqlite_path) as sqlite_conn:
            cur = sqlite_conn.cursor()
            cur.execute("SELECT DISTINCT media_id FROM UnvectorizedMediaChunks WHERE is_processed = FALSE")
            for (media_id,) in cur.fetchall():
                unprocessed_chunks = get_unvectorized_chunks(sqlite_conn, media_id, limit=1000)
                for chunk in unprocessed_chunks:
                    self.vectorize_document(chunk['id'], chunk['text'])
                mark_chunks_as_processed(sqlite_conn, [chunk['id'] for chunk in unprocessed_chunks])

    def semantic_search(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
        query_vector = self._get_embedding(query)

        with psycopg2.connect(**self.pg_config) as conn:
            with conn.cursor() as cur:
                # `<->` is pgvector's L2 distance; 1 - distance is used here as a
                # rough similarity score. Results are collapsed to one row per
                # document so callers can treat them like the BM25 results.
                cur.execute("""
                SELECT document_id, MAX(1 - (vector <-> %s)) AS similarity
                FROM document_vectors
                GROUP BY document_id
                ORDER BY similarity DESC
                LIMIT %s
                """, (query_vector.tolist(), top_k))
                results = cur.fetchall()

        return results

    def get_document_content(self, doc_id: int) -> str:
        with sqlite3.connect(self.sqlite_path) as conn:
            cur = conn.cursor()
            cur.execute("SELECT content FROM media WHERE id = ?", (doc_id,))
            result = cur.fetchone()
            return result[0] if result else ""

    def bm25_search(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]:
        # Rough BM25-style scoring built on top of scikit-learn's TF-IDF matrix;
        # a dedicated implementation (e.g. the rank_bm25 package) would be more faithful.
        with sqlite3.connect(self.sqlite_path) as conn:
            cur = conn.cursor()
            cur.execute("SELECT id, content FROM media")
            documents = cur.fetchall()

        vectorizer = TfidfVectorizer(use_idf=True)
        tfidf_matrix = vectorizer.fit_transform([doc[1] for doc in documents])

        query_vector = vectorizer.transform([query])
        doc_lengths = tfidf_matrix.sum(axis=1).A1
        avg_doc_length = np.mean(doc_lengths)

        k1, b = 1.5, 0.75
        scores = []
        for i, doc_vector in enumerate(tfidf_matrix):
            score = np.sum(
                ((k1 + 1) * query_vector.multiply(doc_vector)).A1 /
                (k1 * (1 - b + b * doc_lengths[i] / avg_doc_length) + query_vector.multiply(doc_vector).A1)
            )
            scores.append((documents[i][0], score))

        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_k]

    def combine_search_results(self, bm25_results: List[Tuple[int, float]], vector_results: List[Tuple[int, float]],
                               alpha: float = 0.5) -> List[Tuple[int, float]]:
        # Weight BM25 scores by alpha and vector scores by (1 - alpha); the two
        # lists are iterated separately so a document appearing in both gets
        # each contribution weighted correctly.
        combined_scores = {}
        for idx, score in bm25_results:
            combined_scores[idx] = combined_scores.get(idx, 0.0) + alpha * score
        for idx, score in vector_results:
            combined_scores[idx] = combined_scores.get(idx, 0.0) + (1 - alpha) * score
        return sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)

    def expand_query(self, query: str) -> str:
        # Note: t5-small is reloaded on every call and was never trained on an
        # "expand query:" task, so treat this as a placeholder for a real
        # query-expansion model.
        model = T5ForConditionalGeneration.from_pretrained("t5-small")
        tokenizer = T5Tokenizer.from_pretrained("t5-small")

        input_text = f"expand query: {query}"
        input_ids = tokenizer.encode(input_text, return_tensors="pt")

        outputs = model.generate(input_ids, max_length=50, num_return_sequences=1)
        expanded_query = tokenizer.decode(outputs[0], skip_special_tokens=True)

        return f"{query} {expanded_query}"

    def cross_encoder_rerank(self, query: str, initial_results: List[Tuple[int, float]],
                             top_k: int = 5) -> List[Tuple[int, float]]:
        from sentence_transformers import CrossEncoder
        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

        candidate_docs = [self.get_document_content(doc_id) for doc_id, _ in initial_results[:top_k * 2]]
        pairs = [[query, doc] for doc in candidate_docs]
        scores = model.predict(pairs)

        reranked = sorted(zip(initial_results[:top_k * 2], scores), key=lambda x: x[1], reverse=True)
        return [(idx, score) for (idx, _), score in reranked[:top_k]]

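    # Hedged sketch of the HyDE step that rag_query below currently leaves as a stub.
    # `generate_fn` is an assumed caller-supplied callable (e.g. wrapping your preferred
    # LLM API) and is not part of the original code.
    def _hyde_search(self, query: str, generate_fn, top_k: int = 5) -> List[Tuple[int, float]]:
        """Hypothetical Document Embeddings: embed a generated answer instead of the raw query."""
        hypothetical_doc = generate_fn(
            f"Write a short passage that answers the question: {query}"
        )
        return self.semantic_search(hypothetical_doc, top_k)
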
    def rag_query(self, query: str, search_type: str = 'combined', top_k: int = 5, use_hyde: bool = False,
                  rerank: bool = False, expand: bool = False) -> List[Dict[str, Any]]:
        try:
            if expand:
                query = self.expand_query(query)

            if use_hyde:
                # HyDE is not wired in yet (see the _hyde_search sketch above);
                # fall through to the requested search_type so `results` is defined.
                logger.warning("use_hyde requested but not implemented; using '%s' search instead.", search_type)

            if search_type == 'vector':
                results = self.semantic_search(query, top_k)
            elif search_type == 'bm25':
                results = self.bm25_search(query, top_k)
            elif search_type == 'combined':
                bm25_results = self.bm25_search(query, top_k)
                vector_results = self.semantic_search(query, top_k)
                results = self.combine_search_results(bm25_results, vector_results)
            else:
                raise ValueError("Invalid search type. Choose 'vector', 'bm25', or 'combined'.")

            if rerank:
                results = self.cross_encoder_rerank(query, results, top_k)

            enriched_results = []
            for doc_id, score in results:
                content = self.get_document_content(doc_id)
                enriched_results.append({
                    "document_id": doc_id,
                    "score": score,
                    "content": content[:500]  # Truncate content for brevity
                })

            return enriched_results
        except Exception as e:
            logger.error(f"An error occurred during RAG query: {str(e)}")
            return []


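# A minimal sketch of the "background process that periodically calls
# vectorize_all_documents" mentioned in the chunking notes above. The daemon
# thread, the one-hour default interval, and the function name are assumptions;
# a scheduler such as cron, Celery, or APScheduler would work just as well.
def start_periodic_vectorization(rag_system: "RAGSystem", interval_seconds: int = 3600):
    import threading
    import time

    def _loop():
        while True:
            try:
                rag_system.vectorize_all_documents()
            except Exception as exc:
                logger.error(f"Periodic vectorization failed: {exc}")
            time.sleep(interval_seconds)

    worker = threading.Thread(target=_loop, daemon=True)
    worker.start()
    return worker
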
# Example usage
if __name__ == "__main__":
    sqlite_path = "path/to/your/sqlite/database.db"
    pg_config = {
        "dbname": "your_db_name",
        "user": "your_username",
        "password": "your_password",
        "host": "localhost"
    }

    rag_system = RAGSystem(sqlite_path, pg_config)

    # Vectorize all documents (run this once or periodically)
    rag_system.vectorize_all_documents()

    # Example query
    query = "programming concepts for beginners"
    results = rag_system.rag_query(query, search_type='combined', expand=True, rerank=True)

    print(f"Search results for query: '{query}'\n")
    for i, result in enumerate(results, 1):
        print(f"Result {i}:")
        print(f"Document ID: {result['document_id']}")
        print(f"Score: {result['score']:.4f}")
        print(f"Content snippet: {result['content']}")
        print("---")