import numpy as np from typing import List, Tuple, Dict from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from sentence_transformers import SentenceTransformer import math from functools import lru_cache from concurrent.futures import ThreadPoolExecutor import openai from transformers import T5ForConditionalGeneration, T5Tokenizer import torch import re import psycopg2 from psycopg2.extras import execute_values import sqlite3 import logging ######################################################################################################################################################################################################################################## # # RAG Chunking # To fully integrate this chunking system, you'd need to: # # Create the UnvectorizedMediaChunks table in your SQLite database. # Modify your document ingestion process to use chunk_and_store_unvectorized. # Implement a background process that periodically calls vectorize_all_documents to process unvectorized chunks. # This chunking is pretty weak and needs improvement # See notes for improvements #FIXME import json from typing import List, Dict, Any from datetime import datetime def chunk_and_store_unvectorized( db_connection, media_id: int, text: str, chunk_size: int = 1000, overlap: int = 100, chunk_type: str = 'fixed-length' ) -> List[int]: chunks = create_chunks(text, chunk_size, overlap) return store_unvectorized_chunks(db_connection, media_id, chunks, chunk_type) def create_chunks(text: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]: words = text.split() chunks = [] for i in range(0, len(words), chunk_size - overlap): chunk_text = ' '.join(words[i:i + chunk_size]) start_char = text.index(words[i]) end_char = start_char + len(chunk_text) chunks.append({ 'text': chunk_text, 'start_char': start_char, 'end_char': end_char, 'index': len(chunks) }) return chunks def store_unvectorized_chunks( db_connection, media_id: int, chunks: List[Dict[str, Any]], chunk_type: str ) -> List[int]: cursor = db_connection.cursor() chunk_ids = [] for chunk in chunks: cursor.execute(""" INSERT INTO UnvectorizedMediaChunks (media_id, chunk_text, chunk_index, start_char, end_char, chunk_type, metadata) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( media_id, chunk['text'], chunk['index'], chunk['start_char'], chunk['end_char'], chunk_type, json.dumps({'length': len(chunk['text'])}) # Example metadata )) chunk_ids.append(cursor.lastrowid) db_connection.commit() return chunk_ids def get_unvectorized_chunks( db_connection, media_id: int, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: cursor = db_connection.cursor() cursor.execute(""" SELECT id, chunk_text, chunk_index, start_char, end_char, chunk_type, metadata FROM UnvectorizedMediaChunks WHERE media_id = ? AND is_processed = FALSE ORDER BY chunk_index LIMIT ? OFFSET ? """, (media_id, limit, offset)) return [ { 'id': row[0], 'text': row[1], 'index': row[2], 'start_char': row[3], 'end_char': row[4], 'type': row[5], 'metadata': json.loads(row[6]) } for row in cursor.fetchall() ] def mark_chunks_as_processed(db_connection, chunk_ids: List[int]): cursor = db_connection.cursor() cursor.executemany(""" UPDATE UnvectorizedMediaChunks SET is_processed = TRUE, last_modified = ? WHERE id = ? """, [(datetime.now(), chunk_id) for chunk_id in chunk_ids]) db_connection.commit() # Usage example def process_media_chunks(db_connection, media_id: int, text: str): chunk_ids = chunk_and_store_unvectorized(db_connection, media_id, text) print(f"Stored {len(chunk_ids)} unvectorized chunks for media_id {media_id}") # Later, when you want to process these chunks: unprocessed_chunks = get_unvectorized_chunks(db_connection, media_id) # Process chunks (e.g., vectorize them) # ... # After processing, mark them as processed mark_chunks_as_processed(db_connection, [chunk['id'] for chunk in unprocessed_chunks]) ########################################################################################################################################################################################################### # # RAG System # To use this updated RAG system in your existing application: # # Install required packages: # pip install sentence-transformers psycopg2-binary scikit-learn transformers torch # Set up PostgreSQL with pgvector: # # Install PostgreSQL and the pgvector extension. # Create a new database for vector storage. # # Update your main application to use the RAG system: # # Import the RAGSystem class from this new file. # Initialize the RAG system with your SQLite and PostgreSQL configurations. # Use the vectorize_all_documents method to initially vectorize your existing documents. # # # Modify your existing PDF_Ingestion_Lib.py and Book_Ingestion_Lib.py: # # After successfully ingesting a document into SQLite, call the vectorization method from the RAG system. # Example modification for ingest_text_file in Book_Ingestion_Lib.py: # from RAG_Library import RAGSystem # # # Initialize RAG system (do this once in your main application) # rag_system = RAGSystem(sqlite_path, pg_config) # # def ingest_text_file(file_path, title=None, author=None, keywords=None): # try: # # ... (existing code) # # # Add the text file to the database # doc_id = add_media_with_keywords( # url=file_path, # title=title, # media_type='document', # content=content, # keywords=keywords, # prompt='No prompt for text files', # summary='No summary for text files', # transcription_model='None', # author=author, # ingestion_date=datetime.now().strftime('%Y-%m-%d') # ) # # # Vectorize the newly added document # rag_system.vectorize_document(doc_id, content) # # return f"Text file '{title}' by {author} ingested and vectorized successfully." # except Exception as e: # logging.error(f"Error ingesting text file: {str(e)}") # return f"Error ingesting text file: {str(e)}" # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Constants EMBEDDING_MODEL = 'all-MiniLM-L6-v2' VECTOR_DIM = 384 # Dimension of the chosen embedding model class RAGSystem: def __init__(self, sqlite_path: str, pg_config: Dict[str, str], cache_size: int = 100): self.sqlite_path = sqlite_path self.pg_config = pg_config self.model = SentenceTransformer(EMBEDDING_MODEL) self.cache_size = cache_size self._init_postgres() def _init_postgres(self): with psycopg2.connect(**self.pg_config) as conn: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS document_vectors ( id SERIAL PRIMARY KEY, document_id INTEGER UNIQUE, vector vector(384) ) """) conn.commit() @lru_cache(maxsize=100) def _get_embedding(self, text: str) -> np.ndarray: return self.model.encode([text])[0] def vectorize_document(self, doc_id: int, content: str): chunks = create_chunks(content, chunk_size=1000, overlap=100) for chunk in chunks: vector = self._get_embedding(chunk['text']) with psycopg2.connect(**self.pg_config) as conn: with conn.cursor() as cur: cur.execute(""" INSERT INTO document_vectors (document_id, chunk_index, vector, metadata) VALUES (%s, %s, %s, %s) ON CONFLICT (document_id, chunk_index) DO UPDATE SET vector = EXCLUDED.vector """, (doc_id, chunk['index'], vector.tolist(), json.dumps(chunk))) conn.commit() def vectorize_all_documents(self): with sqlite3.connect(self.sqlite_path) as sqlite_conn: unprocessed_chunks = get_unvectorized_chunks(sqlite_conn, limit=1000) for chunk in unprocessed_chunks: self.vectorize_document(chunk['id'], chunk['text']) mark_chunks_as_processed(sqlite_conn, [chunk['id'] for chunk in unprocessed_chunks]) def semantic_search(self, query: str, top_k: int = 5) -> List[Tuple[int, int, float]]: query_vector = self._get_embedding(query) with psycopg2.connect(**self.pg_config) as conn: with conn.cursor() as cur: cur.execute(""" SELECT document_id, chunk_index, 1 - (vector <-> %s) AS similarity FROM document_vectors ORDER BY vector <-> %s ASC LIMIT %s """, (query_vector.tolist(), query_vector.tolist(), top_k)) results = cur.fetchall() return results def get_document_content(self, doc_id: int) -> str: with sqlite3.connect(self.sqlite_path) as conn: cur = conn.cursor() cur.execute("SELECT content FROM media WHERE id = ?", (doc_id,)) result = cur.fetchone() return result[0] if result else "" def bm25_search(self, query: str, top_k: int = 5) -> List[Tuple[int, float]]: with sqlite3.connect(self.sqlite_path) as conn: cur = conn.cursor() cur.execute("SELECT id, content FROM media") documents = cur.fetchall() vectorizer = TfidfVectorizer(use_idf=True) tfidf_matrix = vectorizer.fit_transform([doc[1] for doc in documents]) query_vector = vectorizer.transform([query]) doc_lengths = tfidf_matrix.sum(axis=1).A1 avg_doc_length = np.mean(doc_lengths) k1, b = 1.5, 0.75 scores = [] for i, doc_vector in enumerate(tfidf_matrix): score = np.sum( ((k1 + 1) * query_vector.multiply(doc_vector)).A1 / (k1 * (1 - b + b * doc_lengths[i] / avg_doc_length) + query_vector.multiply(doc_vector).A1) ) scores.append((documents[i][0], score)) return sorted(scores, key=lambda x: x[1], reverse=True)[:top_k] def combine_search_results(self, bm25_results: List[Tuple[int, float]], vector_results: List[Tuple[int, float]], alpha: float = 0.5) -> List[Tuple[int, float]]: combined_scores = {} for idx, score in bm25_results + vector_results: if idx in combined_scores: combined_scores[idx] += score * (alpha if idx in dict(bm25_results) else (1 - alpha)) else: combined_scores[idx] = score * (alpha if idx in dict(bm25_results) else (1 - alpha)) return sorted(combined_scores.items(), key=lambda x: x[1], reverse=True) def expand_query(self, query: str) -> str: model = T5ForConditionalGeneration.from_pretrained("t5-small") tokenizer = T5Tokenizer.from_pretrained("t5-small") input_text = f"expand query: {query}" input_ids = tokenizer.encode(input_text, return_tensors="pt") outputs = model.generate(input_ids, max_length=50, num_return_sequences=1) expanded_query = tokenizer.decode(outputs[0], skip_special_tokens=True) return f"{query} {expanded_query}" def cross_encoder_rerank(self, query: str, initial_results: List[Tuple[int, float]], top_k: int = 5) -> List[ Tuple[int, float]]: from sentence_transformers import CrossEncoder model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2') candidate_docs = [self.get_document_content(doc_id) for doc_id, _ in initial_results[:top_k * 2]] pairs = [[query, doc] for doc in candidate_docs] scores = model.predict(pairs) reranked = sorted(zip(initial_results[:top_k * 2], scores), key=lambda x: x[1], reverse=True) return [(idx, score) for (idx, _), score in reranked[:top_k]] def rag_query(self, query: str, search_type: str = 'combined', top_k: int = 5, use_hyde: bool = False, rerank: bool = False, expand: bool = False) -> List[Dict[str, any]]: try: if expand: query = self.expand_query(query) if use_hyde: # Implement HyDE if needed pass elif search_type == 'vector': results = self.semantic_search(query, top_k) elif search_type == 'bm25': results = self.bm25_search(query, top_k) elif search_type == 'combined': bm25_results = self.bm25_search(query, top_k) vector_results = self.semantic_search(query, top_k) results = self.combine_search_results(bm25_results, vector_results) else: raise ValueError("Invalid search type. Choose 'vector', 'bm25', or 'combined'.") if rerank: results = self.cross_encoder_rerank(query, results, top_k) enriched_results = [] for doc_id, score in results: content = self.get_document_content(doc_id) enriched_results.append({ "document_id": doc_id, "score": score, "content": content[:500] # Truncate content for brevity }) return enriched_results except Exception as e: logger.error(f"An error occurred during RAG query: {str(e)}") return [] # Example usage if __name__ == "__main__": sqlite_path = "path/to/your/sqlite/database.db" pg_config = { "dbname": "your_db_name", "user": "your_username", "password": "your_password", "host": "localhost" } rag_system = RAGSystem(sqlite_path, pg_config) # Vectorize all documents (run this once or periodically) rag_system.vectorize_all_documents() # Example query query = "programming concepts for beginners" results = rag_system.rag_query(query, search_type='combined', expand=True, rerank=True) print(f"Search results for query: '{query}'\n") for i, result in enumerate(results, 1): print(f"Result {i}:") print(f"Document ID: {result['document_id']}") print(f"Score: {result['score']:.4f}") print(f"Content snippet: {result['content']}") print("---")