Spaces:
Running
Running
``` | |
################################################################################################################## | |
# RAG Pipeline 1 | |
# 0.62 0.61 0.75 63402.0 | |
# from langchain_openai import ChatOpenAI | |
# | |
# from langchain_community.document_loaders import WebBaseLoader | |
# from langchain_openai import OpenAIEmbeddings | |
# from langchain.text_splitter import RecursiveCharacterTextSplitter | |
# from langchain_chroma import Chroma | |
# | |
# from langchain_community.retrievers import BM25Retriever | |
# from langchain.retrievers import ParentDocumentRetriever | |
# from langchain.storage import InMemoryStore | |
# import os | |
# from operator import itemgetter | |
# from langchain import hub | |
# from langchain_core.output_parsers import StrOutputParser | |
# from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda | |
# from langchain.retrievers import MergerRetriever | |
# from langchain.retrievers.document_compressors import DocumentCompressorPipeline | |
# def rag_pipeline(): | |
# try: | |
# def format_docs(docs): | |
# return "\n".join(doc.page_content for doc in docs) | |
# | |
# llm = ChatOpenAI(model='gpt-4o-mini') | |
# | |
# loader = WebBaseLoader('https://en.wikipedia.org/wiki/European_debt_crisis') | |
# docs = loader.load() | |
# | |
# embedding = OpenAIEmbeddings(model='text-embedding-3-large') | |
# | |
# splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=200) | |
# splits = splitter.split_documents(docs) | |
# c = Chroma.from_documents(documents=splits, embedding=embedding, | |
# collection_name='testindex-ragbuilder-1724657573', ) | |
# retrievers = [] | |
# retriever = c.as_retriever(search_type='mmr', search_kwargs={'k': 10}) | |
# retrievers.append(retriever) | |
# retriever = BM25Retriever.from_documents(docs) | |
# retrievers.append(retriever) | |
# | |
# parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=600) | |
# splits = parent_splitter.split_documents(docs) | |
# store = InMemoryStore() | |
# retriever = ParentDocumentRetriever(vectorstore=c, docstore=store, child_splitter=splitter, | |
# parent_splitter=parent_splitter) | |
# retriever.add_documents(docs) | |
# retrievers.append(retriever) | |
# retriever = MergerRetriever(retrievers=retrievers) | |
# prompt = hub.pull("rlm/rag-prompt") | |
# rag_chain = ( | |
# RunnableParallel(context=retriever, question=RunnablePassthrough()) | |
# .assign(context=itemgetter("context") | RunnableLambda(format_docs)) | |
# .assign(answer=prompt | llm | StrOutputParser()) | |
# .pick(["answer", "context"])) | |
# return rag_chain | |
# except Exception as e: | |
# print(f"An error occurred: {e}") | |
# To get the answer and context, use the following code | |
# res=rag_pipeline().invoke("your prompt here") | |
# print(res["answer"]) | |
# print(res["context"]) | |
############################################################################################################ | |
############################################################################################################ | |
# RAG Pipeline 2 | |
# 0.6 0.73 0.68 3125.0 | |
# from langchain_openai import ChatOpenAI | |
# | |
# from langchain_community.document_loaders import WebBaseLoader | |
# from langchain_openai import OpenAIEmbeddings | |
# from langchain.text_splitter import RecursiveCharacterTextSplitter | |
# from langchain_chroma import Chroma | |
# from langchain.retrievers.multi_query import MultiQueryRetriever | |
# from langchain.retrievers import ParentDocumentRetriever | |
# from langchain.storage import InMemoryStore | |
# from langchain_community.document_transformers import EmbeddingsRedundantFilter | |
# from langchain.retrievers.document_compressors import LLMChainFilter | |
# from langchain.retrievers.document_compressors import EmbeddingsFilter | |
# from langchain.retrievers import ContextualCompressionRetriever | |
# import os | |
# from operator import itemgetter | |
# from langchain import hub | |
# from langchain_core.output_parsers import StrOutputParser | |
# from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda | |
# from langchain.retrievers import MergerRetriever | |
# from langchain.retrievers.document_compressors import DocumentCompressorPipeline | |
# def rag_pipeline(): | |
# try: | |
# def format_docs(docs): | |
# return "\n".join(doc.page_content for doc in docs) | |
# | |
# llm = ChatOpenAI(model='gpt-4o-mini') | |
# | |
# loader = WebBaseLoader('https://en.wikipedia.org/wiki/European_debt_crisis') | |
# docs = loader.load() | |
# | |
# embedding = OpenAIEmbeddings(model='text-embedding-3-large') | |
# | |
# splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=200) | |
# splits = splitter.split_documents(docs) | |
# c = Chroma.from_documents(documents=splits, embedding=embedding, | |
# collection_name='testindex-ragbuilder-1724650962', ) | |
# retrievers = [] | |
# retriever = MultiQueryRetriever.from_llm(c.as_retriever(search_type='similarity', search_kwargs={'k': 10}), | |
# llm=llm) | |
# retrievers.append(retriever) | |
# | |
# parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=600) | |
# splits = parent_splitter.split_documents(docs) | |
# store = InMemoryStore() | |
# retriever = ParentDocumentRetriever(vectorstore=c, docstore=store, child_splitter=splitter, | |
# parent_splitter=parent_splitter) | |
# retriever.add_documents(docs) | |
# retrievers.append(retriever) | |
# retriever = MergerRetriever(retrievers=retrievers) | |
# arr_comp = [] | |
# arr_comp.append(EmbeddingsRedundantFilter(embeddings=embedding)) | |
# arr_comp.append(LLMChainFilter.from_llm(llm)) | |
# pipeline_compressor = DocumentCompressorPipeline(transformers=arr_comp) | |
# retriever = ContextualCompressionRetriever(base_retriever=retriever, base_compressor=pipeline_compressor) | |
# prompt = hub.pull("rlm/rag-prompt") | |
# rag_chain = ( | |
# RunnableParallel(context=retriever, question=RunnablePassthrough()) | |
# .assign(context=itemgetter("context") | RunnableLambda(format_docs)) | |
# .assign(answer=prompt | llm | StrOutputParser()) | |
# .pick(["answer", "context"])) | |
# return rag_chain | |
# except Exception as e: | |
# print(f"An error occurred: {e}") | |
# To get the answer and context, use the following code | |
# res=rag_pipeline().invoke("your prompt here") | |
# print(res["answer"]) | |
# print(res["context"]) | |
# | |
# | |
# | |
############################################################################################################ | |
# Plain bm25 retriever | |
# class BM25Retriever(BaseRetriever): | |
# """`BM25` retriever without Elasticsearch.""" | |
# | |
# vectorizer: Any | |
# """ BM25 vectorizer.""" | |
# docs: List[Document] = Field(repr=False) | |
# """ List of documents.""" | |
# k: int = 4 | |
# """ Number of documents to return.""" | |
# preprocess_func: Callable[[str], List[str]] = default_preprocessing_func | |
# """ Preprocessing function to use on the text before BM25 vectorization.""" | |
# | |
# class Config: | |
# arbitrary_types_allowed = True | |
# | |
# @classmethod | |
# def from_texts( | |
# cls, | |
# texts: Iterable[str], | |
# metadatas: Optional[Iterable[dict]] = None, | |
# bm25_params: Optional[Dict[str, Any]] = None, | |
# preprocess_func: Callable[[str], List[str]] = default_preprocessing_func, | |
# **kwargs: Any, | |
# ) -> BM25Retriever: | |
# """ | |
# Create a BM25Retriever from a list of texts. | |
# Args: | |
# texts: A list of texts to vectorize. | |
# metadatas: A list of metadata dicts to associate with each text. | |
# bm25_params: Parameters to pass to the BM25 vectorizer. | |
# preprocess_func: A function to preprocess each text before vectorization. | |
# **kwargs: Any other arguments to pass to the retriever. | |
# | |
# Returns: | |
# A BM25Retriever instance. | |
# """ | |
# try: | |
# from rank_bm25 import BM25Okapi | |
# except ImportError: | |
# raise ImportError( | |
# "Could not import rank_bm25, please install with `pip install " | |
# "rank_bm25`." | |
# ) | |
# | |
# texts_processed = [preprocess_func(t) for t in texts] | |
# bm25_params = bm25_params or {} | |
# vectorizer = BM25Okapi(texts_processed, **bm25_params) | |
# metadatas = metadatas or ({} for _ in texts) | |
# docs = [Document(page_content=t, metadata=m) for t, m in zip(texts, metadatas)] | |
# return cls( | |
# vectorizer=vectorizer, docs=docs, preprocess_func=preprocess_func, **kwargs | |
# ) | |
# | |
# @classmethod | |
# def from_documents( | |
# cls, | |
# documents: Iterable[Document], | |
# *, | |
# bm25_params: Optional[Dict[str, Any]] = None, | |
# preprocess_func: Callable[[str], List[str]] = default_preprocessing_func, | |
# **kwargs: Any, | |
# ) -> BM25Retriever: | |
# """ | |
# Create a BM25Retriever from a list of Documents. | |
# Args: | |
# documents: A list of Documents to vectorize. | |
# bm25_params: Parameters to pass to the BM25 vectorizer. | |
# preprocess_func: A function to preprocess each text before vectorization. | |
# **kwargs: Any other arguments to pass to the retriever. | |
# | |
# Returns: | |
# A BM25Retriever instance. | |
# """ | |
# texts, metadatas = zip(*((d.page_content, d.metadata) for d in documents)) | |
# return cls.from_texts( | |
# texts=texts, | |
# bm25_params=bm25_params, | |
# metadatas=metadatas, | |
# preprocess_func=preprocess_func, | |
# **kwargs, | |
# ) | |
# | |
# def _get_relevant_documents( | |
# self, query: str, *, run_manager: CallbackManagerForRetrieverRun | |
# ) -> List[Document]: | |
# processed_query = self.preprocess_func(query) | |
# return_docs = self.vectorizer.get_top_n(processed_query, self.docs, n=self.k) | |
# return return_docs | |
############################################################################################################ | |
############################################################################################################ | |
# ElasticSearch BM25 Retriever | |
# class ElasticSearchBM25Retriever(BaseRetriever): | |
# """`Elasticsearch` retriever that uses `BM25`. | |
# | |
# To connect to an Elasticsearch instance that requires login credentials, | |
# including Elastic Cloud, use the Elasticsearch URL format | |
# https://username:password@es_host:9243. For example, to connect to Elastic | |
# Cloud, create the Elasticsearch URL with the required authentication details and | |
# pass it to the ElasticVectorSearch constructor as the named parameter | |
# elasticsearch_url. | |
# | |
# You can obtain your Elastic Cloud URL and login credentials by logging in to the | |
# Elastic Cloud console at https://cloud.elastic.co, selecting your deployment, and | |
# navigating to the "Deployments" page. | |
# | |
# To obtain your Elastic Cloud password for the default "elastic" user: | |
# | |
# 1. Log in to the Elastic Cloud console at https://cloud.elastic.co | |
# 2. Go to "Security" > "Users" | |
# 3. Locate the "elastic" user and click "Edit" | |
# 4. Click "Reset password" | |
# 5. Follow the prompts to reset the password | |
# | |
# The format for Elastic Cloud URLs is | |
# https://username:password@cluster_id.region_id.gcp.cloud.es.io:9243. | |
# """ | |
# | |
# client: Any | |
# """Elasticsearch client.""" | |
# index_name: str | |
# """Name of the index to use in Elasticsearch.""" | |
# | |
# @classmethod | |
# def create( | |
# cls, elasticsearch_url: str, index_name: str, k1: float = 2.0, b: float = 0.75 | |
# ) -> ElasticSearchBM25Retriever: | |
# """ | |
# Create a ElasticSearchBM25Retriever from a list of texts. | |
# | |
# Args: | |
# elasticsearch_url: URL of the Elasticsearch instance to connect to. | |
# index_name: Name of the index to use in Elasticsearch. | |
# k1: BM25 parameter k1. | |
# b: BM25 parameter b. | |
# | |
# Returns: | |
# | |
# """ | |
# from elasticsearch import Elasticsearch | |
# | |
# # Create an Elasticsearch client instance | |
# es = Elasticsearch(elasticsearch_url) | |
# | |
# # Define the index settings and mappings | |
# settings = { | |
# "analysis": {"analyzer": {"default": {"type": "standard"}}}, | |
# "similarity": { | |
# "custom_bm25": { | |
# "type": "BM25", | |
# "k1": k1, | |
# "b": b, | |
# } | |
# }, | |
# } | |
# mappings = { | |
# "properties": { | |
# "content": { | |
# "type": "text", | |
# "similarity": "custom_bm25", # Use the custom BM25 similarity | |
# } | |
# } | |
# } | |
# | |
# # Create the index with the specified settings and mappings | |
# es.indices.create(index=index_name, mappings=mappings, settings=settings) | |
# return cls(client=es, index_name=index_name) | |
# | |
# def add_texts( | |
# self, | |
# texts: Iterable[str], | |
# refresh_indices: bool = True, | |
# ) -> List[str]: | |
# """Run more texts through the embeddings and add to the retriever. | |
# | |
# Args: | |
# texts: Iterable of strings to add to the retriever. | |
# refresh_indices: bool to refresh ElasticSearch indices | |
# | |
# Returns: | |
# List of ids from adding the texts into the retriever. | |
# """ | |
# try: | |
# from elasticsearch.helpers import bulk | |
# except ImportError: | |
# raise ImportError( | |
# "Could not import elasticsearch python package. " | |
# "Please install it with `pip install elasticsearch`." | |
# ) | |
# requests = [] | |
# ids = [] | |
# for i, text in enumerate(texts): | |
# _id = str(uuid.uuid4()) | |
# request = { | |
# "_op_type": "index", | |
# "_index": self.index_name, | |
# "content": text, | |
# "_id": _id, | |
# } | |
# ids.append(_id) | |
# requests.append(request) | |
# bulk(self.client, requests) | |
# | |
# if refresh_indices: | |
# self.client.indices.refresh(index=self.index_name) | |
# return ids | |
# | |
# def _get_relevant_documents( | |
# self, query: str, *, run_manager: CallbackManagerForRetrieverRun | |
# ) -> List[Document]: | |
# query_dict = {"query": {"match": {"content": query}}} | |
# res = self.client.search(index=self.index_name, body=query_dict) | |
# | |
# docs = [] | |
# for r in res["hits"]["hits"]: | |
# docs.append(Document(page_content=r["_source"]["content"])) | |
# return docs | |
############################################################################################################ | |
############################################################################################################ | |
# Multi Query Retriever | |
# class MultiQueryRetriever(BaseRetriever): | |
# """Given a query, use an LLM to write a set of queries. | |
# | |
# Retrieve docs for each query. Return the unique union of all retrieved docs. | |
# """ | |
# | |
# retriever: BaseRetriever | |
# llm_chain: Runnable | |
# verbose: bool = True | |
# parser_key: str = "lines" | |
# """DEPRECATED. parser_key is no longer used and should not be specified.""" | |
# include_original: bool = False | |
# """Whether to include the original query in the list of generated queries.""" | |
# | |
# @classmethod | |
# def from_llm( | |
# cls, | |
# retriever: BaseRetriever, | |
# llm: BaseLanguageModel, | |
# prompt: BasePromptTemplate = DEFAULT_QUERY_PROMPT, | |
# parser_key: Optional[str] = None, | |
# include_original: bool = False, | |
# ) -> "MultiQueryRetriever": | |
# """Initialize from llm using default template. | |
# | |
# Args: | |
# retriever: retriever to query documents from | |
# llm: llm for query generation using DEFAULT_QUERY_PROMPT | |
# prompt: The prompt which aims to generate several different versions | |
# of the given user query | |
# include_original: Whether to include the original query in the list of | |
# generated queries. | |
# | |
# Returns: | |
# MultiQueryRetriever | |
# """ | |
# output_parser = LineListOutputParser() | |
# llm_chain = prompt | llm | output_parser | |
# return cls( | |
# retriever=retriever, | |
# llm_chain=llm_chain, | |
# include_original=include_original, | |
# ) | |
# | |
# async def _aget_relevant_documents( | |
# self, | |
# query: str, | |
# *, | |
# run_manager: AsyncCallbackManagerForRetrieverRun, | |
# ) -> List[Document]: | |
# """Get relevant documents given a user query. | |
# | |
# Args: | |
# query: user query | |
# | |
# Returns: | |
# Unique union of relevant documents from all generated queries | |
# """ | |
# queries = await self.agenerate_queries(query, run_manager) | |
# if self.include_original: | |
# queries.append(query) | |
# documents = await self.aretrieve_documents(queries, run_manager) | |
# return self.unique_union(documents) | |
# | |
# async def agenerate_queries( | |
# self, question: str, run_manager: AsyncCallbackManagerForRetrieverRun | |
# ) -> List[str]: | |
# """Generate queries based upon user input. | |
# | |
# Args: | |
# question: user query | |
# | |
# Returns: | |
# List of LLM generated queries that are similar to the user input | |
# """ | |
# response = await self.llm_chain.ainvoke( | |
# {"question": question}, config={"callbacks": run_manager.get_child()} | |
# ) | |
# if isinstance(self.llm_chain, LLMChain): | |
# lines = response["text"] | |
# else: | |
# lines = response | |
# if self.verbose: | |
# logger.info(f"Generated queries: {lines}") | |
# return lines | |
# | |
# async def aretrieve_documents( | |
# self, queries: List[str], run_manager: AsyncCallbackManagerForRetrieverRun | |
# ) -> List[Document]: | |
# """Run all LLM generated queries. | |
# | |
# Args: | |
# queries: query list | |
# | |
# Returns: | |
# List of retrieved Documents | |
# """ | |
# document_lists = await asyncio.gather( | |
# *( | |
# self.retriever.ainvoke( | |
# query, config={"callbacks": run_manager.get_child()} | |
# ) | |
# for query in queries | |
# ) | |
# ) | |
# return [doc for docs in document_lists for doc in docs] | |
# | |
# def _get_relevant_documents( | |
# self, | |
# query: str, | |
# *, | |
# run_manager: CallbackManagerForRetrieverRun, | |
# ) -> List[Document]: | |
# """Get relevant documents given a user query. | |
# | |
# Args: | |
# query: user query | |
# | |
# Returns: | |
# Unique union of relevant documents from all generated queries | |
# """ | |
# queries = self.generate_queries(query, run_manager) | |
# if self.include_original: | |
# queries.append(query) | |
# documents = self.retrieve_documents(queries, run_manager) | |
# return self.unique_union(documents) | |
# | |
# def generate_queries( | |
# self, question: str, run_manager: CallbackManagerForRetrieverRun | |
# ) -> List[str]: | |
# """Generate queries based upon user input. | |
# | |
# Args: | |
# question: user query | |
# | |
# Returns: | |
# List of LLM generated queries that are similar to the user input | |
# """ | |
# response = self.llm_chain.invoke( | |
# {"question": question}, config={"callbacks": run_manager.get_child()} | |
# ) | |
# if isinstance(self.llm_chain, LLMChain): | |
# lines = response["text"] | |
# else: | |
# lines = response | |
# if self.verbose: | |
# logger.info(f"Generated queries: {lines}") | |
# return lines | |
# | |
# def retrieve_documents( | |
# self, queries: List[str], run_manager: CallbackManagerForRetrieverRun | |
# ) -> List[Document]: | |
# """Run all LLM generated queries. | |
# | |
# Args: | |
# queries: query list | |
# | |
# Returns: | |
# List of retrieved Documents | |
# """ | |
# documents = [] | |
# for query in queries: | |
# docs = self.retriever.invoke( | |
# query, config={"callbacks": run_manager.get_child()} | |
# ) | |
# documents.extend(docs) | |
# return documents | |
# | |
# def unique_union(self, documents: List[Document]) -> List[Document]: | |
# """Get unique Documents. | |
# | |
# Args: | |
# documents: List of retrieved Documents | |
# | |
# Returns: | |
# List of unique retrieved Documents | |
# """ | |
# return _unique_documents(documents) | |
############################################################################################################ | |
``` |