"""Retrieval-augmented PDF summarization.

The module loads PDFs, splits them into chunks, optionally asks Claude for a
short situating context per chunk, indexes the chunks in Chroma and BM25,
fuses both retrieval signals and asks an OpenAI chat model to produce
paragraph summaries that are paired with the retrieved source chunks.
"""
import json
import logging
import os
import uuid
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional

import numpy as np
from anthropic import Anthropic
from cohere import Client
from langchain.chains import create_extraction_chain
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.chat_models import ChatOpenAI
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from rank_bm25 import BM25Okapi


def reciprocal_rank_fusion(result_lists, weights=None):
    """Combine several scored result lists into a single ranking.

    NOTE: despite its name, this function does not use the rank-based
    ``1 / (k + rank)`` formula. It sums, per document, the score from each
    list multiplied by that list's weight.

    Args:
        result_lists: Sequence of lists of ``(doc_id, score)`` pairs.
        weights: One weight per result list. Defaults to ``1.0`` for each.

    Returns:
        List of ``(doc_id, fused_score)`` tuples sorted by fused score in
        descending order.
    """
    if weights is None:
        weights = [1.0] * len(result_lists)

    fused_scores = {}
    for list_index, results in enumerate(result_lists):
        for doc_id, score in results:
            fused_scores[doc_id] = (
                fused_scores.get(doc_id, 0) + weights[list_index] * score
            )

    return sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)


def _split_paragraphs(response: str) -> List[str]:
    """Split an LLM response on blank lines, dropping empty paragraphs."""
    return [part.strip() for part in response.split('\n\n') if part.strip()]


os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
# LANGCHAIN_API_KEY is not set here; the original code only read it and
# discarded the value, so it has to be provided by the environment.
os.environ["LANGCHAIN_PROJECT"] = "VELLA"


@dataclass
class DocumentChunk:
    """A piece of text cut from a PDF page plus its location metadata."""

    content: str
    page_number: int  # 1-based page number
    chunk_id: str  # uuid4 string assigned when the chunk is created
    start_char: int  # offset counted over the concatenated page texts
    end_char: int


@dataclass
class RetrievalConfig:
    """Tunable retrieval parameters."""

    num_chunks: int = 5
    embedding_weight: float = 0.5
    bm25_weight: float = 0.5
    context_window: int = 3
    chunk_overlap: int = 200
    chunk_size: int = 1000


@dataclass
class ContextualizedChunk(DocumentChunk):
    """A DocumentChunk extended with a generated context description."""

    context: str = ""
    embedding: Optional[np.ndarray] = None
    bm25_score: Optional[float] = None


class DocumentSummarizer:
    """Summarize a document using vector retrieval and Cohere reranking."""

    def __init__(self, openai_api_key: str, cohere_api_key: str, embedding_model,
                 chunk_size, chunk_overlap, num_k_rerank, model_cohere_rerank):
        """Set up the Cohere client, the embedding model and the splitter.

        Args:
            openai_api_key: Key passed to ``ChatOpenAI`` when summarizing.
            cohere_api_key: Key used to build the Cohere client.
            embedding_model: Model name given to ``HuggingFaceEmbeddings``.
            chunk_size: ``chunk_size`` for the recursive text splitter.
            chunk_overlap: ``chunk_overlap`` for the recursive text splitter.
            num_k_rerank: Number of chunks kept after reranking.
            model_cohere_rerank: Cohere rerank model name.
        """
        self.openai_api_key = openai_api_key
        self.cohere_client = Client(cohere_api_key)
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        self.chunk_metadata = {}  # Store chunk metadata for tracing
        self.num_k_rerank = num_k_rerank
        self.model_cohere_rerank = model_cohere_rerank

    def load_and_split_document(self, pdf_path: str) -> List[DocumentChunk]:
        """Load a PDF and split every page into chunks with metadata.

        Side effect: records page and offsets of every chunk in
        ``self.chunk_metadata`` keyed by chunk id.

        Args:
            pdf_path: Path of the PDF to load.

        Returns:
            The chunks of all pages, in page order.
        """
        loader = PyPDFLoader(pdf_path)
        pages = loader.load()
        chunks = []
        char_count = 0

        for page in pages:
            text = page.page_content
            # Default to 0 so a missing 'page' key does not raise TypeError.
            page_number = page.metadata.get('page', 0) + 1  # 1-based
            search_from = 0

            for chunk in self.text_splitter.split_text(text):
                # Search forward from the previous chunk so that repeated
                # text is not always mapped to its first occurrence.
                start_char = text.find(chunk, search_from)
                if start_char == -1:
                    start_char = text.find(chunk)
                if start_char == -1:
                    # The chunk text is not a literal substring of the page;
                    # fall back to an approximate position.
                    start_char = search_from
                end_char = start_char + len(chunk)
                # Overlapping chunks start before the previous one ends, so
                # only advance one character past the previous start.
                search_from = start_char + 1

                chunk_id = str(uuid.uuid4())
                doc_chunk = DocumentChunk(
                    content=chunk,
                    page_number=page_number,
                    chunk_id=chunk_id,
                    start_char=char_count + start_char,
                    end_char=char_count + end_char
                )
                chunks.append(doc_chunk)

                # Store metadata for later retrieval
                self.chunk_metadata[chunk_id] = {
                    'page': doc_chunk.page_number,
                    'start_char': doc_chunk.start_char,
                    'end_char': doc_chunk.end_char
                }

            char_count += len(text)

        return chunks

    def create_vector_store(self, chunks: List[DocumentChunk]) -> Chroma:
        """Build a Chroma store from the chunk texts and their metadata."""
        texts = [chunk.content for chunk in chunks]
        metadatas = [{
            'chunk_id': chunk.chunk_id,
            'page': chunk.page_number,
            'start_char': chunk.start_char,
            'end_char': chunk.end_char
        } for chunk in chunks]

        return Chroma.from_texts(
            texts=texts,
            metadatas=metadatas,
            embedding=self.embeddings
        )

    def rerank_chunks(
        self,
        chunks: List[Dict],
        query: str,
        k: int = 5
    ) -> List[Dict]:
        """Rerank chunks using Cohere's reranking model.

        Args:
            chunks: List of dictionaries containing chunks and their metadata
            query: Original search query
            k: Number of top chunks to return

        Returns:
            List of reranked chunks with updated relevance scores. If the
            rerank call fails, the first ``k`` chunks in their original
            order are returned instead.
        """
        if not chunks:
            # Nothing to rerank; avoid a pointless API call.
            return []

        try:
            documents = [chunk['content'] for chunk in chunks]
            response = self.cohere_client.rerank(
                query=query,
                documents=documents,
                top_n=k,
                model=self.model_cohere_rerank
            )
            # Some SDK versions return an object whose hits live in
            # `.results`; others return something directly iterable.
            hits = getattr(response, "results", response)
            return [
                {**chunks[hit.index], 'relevance_score': hit.relevance_score}
                for hit in hits
            ]
        except Exception as e:
            logging.error(f"Reranking failed: {str(e)}")
            return chunks[:k]  # Fallback to original ordering

    def generate_summary_with_sources(
        self,
        vector_store: Chroma,
        query: str = "Summarize the main points of this document",
        initial_k: int = 20
    ) -> List[Dict]:
        """Generate a summary with source citations using reranking.

        Args:
            vector_store: Store to search.
            query: Search query.
            initial_k: Number of chunks fetched before reranking.

        Returns:
            One dict per summary paragraph with keys ``content`` and
            ``source``. Paragraph *i* is paired with source *i*; surplus
            paragraphs reuse the last source. Empty when nothing was
            retrieved.
        """
        # Retrieve more initial chunks for reranking
        relevant_docs = vector_store.similarity_search_with_score(query, k=initial_k)

        chunks = [
            {
                'content': doc.page_content,
                'page': doc.metadata['page'],
                'chunk_id': doc.metadata['chunk_id'],
                'relevance_score': score
            }
            for doc, score in relevant_docs
        ]

        reranked_chunks = self.rerank_chunks(chunks, query, k=self.num_k_rerank)

        sources = [
            {
                'content': chunk['content'],
                'page': chunk['page'],
                'chunk_id': chunk['chunk_id'],
                'relevance_score': chunk['relevance_score']
            }
            for chunk in reranked_chunks
        ]
        if not sources:
            # Without sources there is nothing to cite, and indexing the
            # empty list below would raise IndexError.
            logging.warning("No chunks retrieved; returning an empty summary")
            return []
        contexts = [source['content'] for source in sources]

        prompt_template = """
        Based on the following context, provide multiple key points from the document.
        For each point, create a new paragraph.
        Each paragraph should be a complete, self-contained insight.

        Context: {context}

        Key points:
        """

        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["context"]
        )

        llm = ChatOpenAI(
            temperature=0,
            model_name="gpt-4o-mini",
            api_key=self.openai_api_key
        )

        response = llm.predict(prompt.format(context="\n\n".join(contexts)))

        structured_output = []
        for idx, summary in enumerate(_split_paragraphs(response)):
            # Pair by position; clamp to the last source when the model
            # returns more paragraphs than there are sources.
            source = sources[min(idx, len(sources) - 1)]
            structured_output.append({
                "content": summary,
                "source": {
                    "page": source['page'],
                    "text": source['content'][:200] + "...",
                    "relevance_score": source['relevance_score']
                }
            })

        return structured_output

    def get_source_context(self, chunk_id: str, window: int = 100) -> Optional[Dict]:
        """Return the stored page and offsets for a chunk.

        Args:
            chunk_id: Id of a chunk produced by ``load_and_split_document``.
            window: Currently unused; kept for interface compatibility.

        Returns:
            Dict with ``page``, ``start_char`` and ``end_char``, or ``None``
            when the chunk id is unknown.
        """
        metadata = self.chunk_metadata.get(chunk_id)
        if not metadata:
            return None

        return {
            'page': metadata['page'],
            'start_char': metadata['start_char'],
            'end_char': metadata['end_char']
        }


class ContextualRetriever:
    """Generate a short situating context for chunks using Claude."""

    def __init__(self, config: RetrievalConfig, claude_api_key: str, claude_context_model):
        """Store the config and create the Anthropic client."""
        self.config = config
        self.claude_client = Anthropic(api_key=claude_api_key)
        self.logger = logging.getLogger(__name__)
        self.bm25 = None
        self.claude_context_model = claude_context_model

    def generate_context(self, full_text: str, chunk: DocumentChunk) -> str:
        """Ask Claude for a short context situating ``chunk`` in ``full_text``.

        Returns:
            The text of the first content block of the reply, or ``""`` when
            the request fails (the failure is logged).
        """
        try:
            # NOTE(review): the prompt has no explicit delimiters around the
            # document and the chunk - confirm whether markup was lost.
            prompt = f"""
            {full_text}
            Here is the chunk we want to situate within the whole document
            {chunk.content}
            Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk.
            Answer only with the succinct context and nothing else."""

            response = self.claude_client.messages.create(
                model=self.claude_context_model,
                max_tokens=100,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
        except Exception as e:
            self.logger.error(f"Context generation failed for chunk {chunk.chunk_id}: {str(e)}")
            return ""

    def contextualize_chunks(self, full_text: str, chunks: List[DocumentChunk]) -> List[ContextualizedChunk]:
        """Return a ContextualizedChunk for every chunk, in the same order.

        One ``generate_context`` call is made per chunk.
        """
        return [
            ContextualizedChunk(
                content=chunk.content,
                page_number=chunk.page_number,
                chunk_id=chunk.chunk_id,
                start_char=chunk.start_char,
                end_char=chunk.end_char,
                context=self.generate_context(full_text, chunk)
            )
            for chunk in chunks
        ]


class EnhancedDocumentSummarizer(DocumentSummarizer):
    """Summarizer combining contextualized chunks, embeddings and BM25."""

    def __init__(self, openai_api_key: str, claude_api_key: str, config: RetrievalConfig,
                 embedding_model, chunk_size, chunk_overlap, num_k_rerank,
                 model_cohere_rerank, claude_context_model, system_prompt,
                 gpt_model, gpt_temperature):
        """Initialize the base summarizer and the contextual retriever.

        The Cohere key is read from the ``COHERE_API_KEY`` environment
        variable. ``system_prompt`` is later used as a prompt template with
        a ``context`` input variable.
        """
        super().__init__(
            openai_api_key,
            os.environ.get("COHERE_API_KEY"),
            embedding_model,
            chunk_size,
            chunk_overlap,
            num_k_rerank,
            model_cohere_rerank
        )
        self.config = config
        self.contextual_retriever = ContextualRetriever(config, claude_api_key, claude_context_model)
        self.logger = logging.getLogger(__name__)
        self.system_prompt = system_prompt
        self.gpt_model = gpt_model
        self.gpt_temperature = gpt_temperature

    def create_enhanced_vector_store(self, chunks: List[ContextualizedChunk]) -> Tuple[Chroma, BM25Okapi, List[str]]:
        """Create the Chroma store and the BM25 index over contextualized text.

        Each indexed text is the chunk's context followed by its content.

        Returns:
            ``(vector_store, bm25, chunk_ids)`` where ``chunk_ids`` follows
            the same order as the BM25 corpus.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            texts = [f"{chunk.context} {chunk.content}" for chunk in chunks]

            metadatas = [{
                'chunk_id': chunk.chunk_id,
                'page': chunk.page_number,
                'start_char': chunk.start_char,
                'end_char': chunk.end_char,
                'context': chunk.context
            } for chunk in chunks]

            vector_store = Chroma.from_texts(
                texts=texts,
                metadatas=metadatas,
                embedding=self.embeddings
            )

            # BM25 over whitespace tokens
            bm25 = BM25Okapi([text.split() for text in texts])

            chunk_ids = [chunk.chunk_id for chunk in chunks]

            return vector_store, bm25, chunk_ids
        except Exception as e:
            self.logger.error(f"Error creating enhanced vector store: {str(e)}")
            raise

    def retrieve_with_rank_fusion(
        self,
        vector_store: Chroma,
        bm25: BM25Okapi,
        chunk_ids: List[str],
        query: str
    ) -> List[Tuple[str, float]]:
        """Combine embedding and BM25 retrieval results.

        Args:
            vector_store: Store queried with ``similarity_search_with_score``.
            bm25: BM25 index whose corpus order matches ``chunk_ids``.
            chunk_ids: Chunk id for each BM25 corpus position.
            query: Search query.

        Returns:
            ``(chunk_id, fused_score)`` tuples, best first.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            embedding_results = vector_store.similarity_search_with_score(
                query,
                k=self.config.num_chunks
            )

            # Map the store's score through 1 / (1 + score) so that smaller
            # raw scores produce larger fused contributions.
            embedding_list = [
                (doc.metadata['chunk_id'], 1 / (1 + score))
                for doc, score in embedding_results
            ]

            bm25_scores = bm25.get_scores(query.split())
            bm25_list = [
                (chunk_ids[i], float(score))
                for i, score in enumerate(bm25_scores)
            ]

            # Keep only the top N BM25 hits
            bm25_list = sorted(
                bm25_list, key=lambda item: item[1], reverse=True
            )[:self.config.num_chunks]

            # Normalize BM25 scores by the maximum. When no query term
            # matches, the maximum is 0 and dividing would raise
            # ZeroDivisionError, so the scores are left untouched.
            max_bm25 = max((score for _, score in bm25_list), default=0.0)
            if max_bm25 > 0:
                bm25_list = [(doc_id, score / max_bm25) for doc_id, score in bm25_list]

            return reciprocal_rank_fusion(
                [embedding_list, bm25_list],
                weights=[self.config.embedding_weight, self.config.bm25_weight]
            )
        except Exception as e:
            self.logger.error(f"Error in rank fusion retrieval: {str(e)}")
            raise

    def generate_enhanced_summary(
        self,
        vector_store: Chroma,
        bm25: BM25Okapi,
        chunk_ids: List[str],
        query: str = "Summarize the main points of this document"
    ) -> List[Dict]:
        """Generate a summary using both vector and BM25 retrieval.

        Returns:
            One dict per summary paragraph with keys ``content`` and
            ``source``. Paragraph *i* is paired with source *i*; surplus
            paragraphs reuse the last source. Empty when nothing was
            retrieved.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            ranked_results = self.retrieve_with_rank_fusion(
                vector_store, bm25, chunk_ids, query
            )

            contexts = []
            sources = []

            # Fetch the stored document text for the top fused results
            for chunk_id, score in ranked_results[:self.config.num_chunks]:
                results = vector_store.get(
                    where={"chunk_id": chunk_id},
                    include=["documents", "metadatas"]
                )

                if results["documents"]:
                    context = results["documents"][0]
                    metadata = results["metadatas"][0]

                    contexts.append(context)
                    sources.append({
                        'content': context,
                        'page': metadata['page'],
                        'chunk_id': chunk_id,
                        'relevance_score': score,
                        'context': metadata.get('context', '')
                    })

            if not sources:
                # Without sources there is nothing to cite, and indexing the
                # empty list below would raise IndexError.
                self.logger.warning("No chunks retrieved; returning an empty summary")
                return []

            prompt = PromptTemplate(
                template=self.system_prompt,
                input_variables=["context"]
            )

            llm = ChatOpenAI(
                temperature=self.gpt_temperature,
                model_name=self.gpt_model,
                api_key=self.openai_api_key,
            )

            response = llm.predict(prompt.format(context="\n\n".join(contexts)))

            structured_output = []
            for idx, summary in enumerate(_split_paragraphs(response)):
                source = sources[min(idx, len(sources) - 1)]
                structured_output.append({
                    "content": summary,
                    "source": {
                        "page": source['page'],
                        "text": source['content'][:200] + "...",
                        "context": source['context'],
                        "relevance_score": source['relevance_score'],
                        "chunk_id": source['chunk_id']
                    }
                })

            return structured_output
        except Exception as e:
            self.logger.error(f"Error generating enhanced summary: {str(e)}")
            raise


def get_llm_summary_answer_by_cursor_complete(serializer, listaPDFs):
    """Run the enhanced summarization pipeline over a list of PDFs.

    Args:
        serializer: Mapping holding the run parameters (the keys read below).
        listaPDFs: Paths of the PDFs to summarize.

    Returns:
        Dict with ``resultado`` (the structured summaries) and
        ``parametros-utilizados`` (the parameters echoed from ``serializer``).

    Raises:
        ValueError: If ``listaPDFs`` is empty.
    """
    if not listaPDFs:
        raise ValueError("listaPDFs must contain at least one PDF path")

    # Configuration
    config = RetrievalConfig(
        num_chunks=serializer["num_chunks_retrieval"],
        embedding_weight=serializer["embedding_weight"],
        bm25_weight=serializer["bm25_weight"],
        context_window=serializer["context_window"],
        chunk_overlap=serializer["chunk_overlap"],
        chunk_size=serializer["chunk_size"]
    )

    # Initialize enhanced summarizer
    summarizer = EnhancedDocumentSummarizer(
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        claude_api_key=os.environ.get("CLAUDE_API_KEY"),
        config=config,
        embedding_model=serializer["hf_embedding"],
        chunk_overlap=serializer["chunk_overlap"],
        chunk_size=serializer["chunk_size"],
        num_k_rerank=serializer["num_k_rerank"],
        model_cohere_rerank=serializer["model_cohere_rerank"],
        claude_context_model=serializer["claude_context_model"],
        system_prompt=serializer["system_prompt"],
        gpt_model=serializer["model"],
        gpt_temperature=serializer["gpt_temperature"]
    )

    # Contextualize each PDF's chunks against that PDF's own text, so that
    # chunks are never situated within a different document.
    contextualized_chunks = []
    for pdf_path in listaPDFs:
        chunks = summarizer.load_and_split_document(pdf_path)

        pages = PyPDFLoader(pdf_path).load()
        full_text = " ".join(page.page_content for page in pages)

        contextualized_chunks.extend(
            summarizer.contextual_retriever.contextualize_chunks(full_text, chunks)
        )

    # Create enhanced vector store and BM25 index
    vector_store, bm25, chunk_ids = summarizer.create_enhanced_vector_store(contextualized_chunks)

    # Generate enhanced summary
    structured_summaries = summarizer.generate_enhanced_summary(
        vector_store,
        bm25,
        chunk_ids,
        serializer["user_message"]
    )

    # Output results as JSON
    json_output = json.dumps(structured_summaries, indent=2)
    print("\nStructured Summaries:")
    print(json_output)

    return {
        "resultado": structured_summaries,
        "parametros-utilizados": {
            "num_chunks_retrieval": serializer["num_chunks_retrieval"],
            "embedding_weight": serializer["embedding_weight"],
            "bm25_weight": serializer["bm25_weight"],
            "context_window": serializer["context_window"],
            "chunk_overlap": serializer["chunk_overlap"],
            "num_k_rerank": serializer["num_k_rerank"],
            "model_cohere_rerank": serializer["model_cohere_rerank"],
            # Echoed only; this pipeline does not read it anywhere else.
            "more_initial_chunks_for_reranking": serializer["more_initial_chunks_for_reranking"],
            "claude_context_model": serializer["claude_context_model"],
            "gpt_temperature": serializer["gpt_temperature"],
            "user_message": serializer["user_message"],
            "model": serializer["model"],
            "hf_embedding": serializer["hf_embedding"],
            "chunk_size": serializer["chunk_size"],
            "system_prompt": serializer["system_prompt"],
        }
    }