# Importing necessary libraries
import sys
import os
import time
# # Importing RecursiveUrlLoader for web scraping and BeautifulSoup for HTML parsing
# from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
# from bs4 import BeautifulSoup as Soup
# import mimetypes
# # List of URLs to scrape
# urls = ["https://langchain-doc.readthedocs.io/en/latest"]
# # Initialize an empty list to store the documents
# docs = []
# # Looping through each URL in the list - this could take some time!
# stf = time.time() # Start time for performance measurement
# for url in urls:
# try:
# st = time.time() # Start time for performance measurement
# # Create a RecursiveUrlLoader instance with a specified URL and depth
# # The extractor function uses BeautifulSoup to parse the HTML content and extract text
# loader = RecursiveUrlLoader(url=url, max_depth=5, extractor=lambda x: Soup(x, "html.parser").text)
# # Load the documents from the URL and extend the docs list
# docs.extend(loader.load())
# et = time.time() - st # Calculate time taken for downloading this URL
# print(f'Time taken for downloading documents from {url}: {et} seconds.')
# except Exception as e:
# # Print an error message if there is an issue with loading or parsing the URL
# print(f"Failed to load or parse the URL {url}. Error: {e}", file=sys.stderr)
# etf = time.time() - stf # Calculate total time taken for downloading
# print(f'Total time taken for downloading {len(docs)} documents: {etf} seconds.')
# # Import necessary modules for text splitting and vectorization
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# import time
# from langchain_community.vectorstores import FAISS
# from langchain.vectorstores.utils import filter_complex_metadata
# from langchain_community.embeddings import HuggingFaceEmbeddings
# # Configure the text splitter
# text_splitter = RecursiveCharacterTextSplitter(
# separators=["\n\n", "\n", "(?<=\. )", " ", ""], # Define the separators for splitting text
# chunk_size=500, # The size of each text chunk
# chunk_overlap=50, # Overlap between chunks to ensure continuity
# length_function=len, # Function to determine the length of each chunk
# )
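# # Note (assumption about newer versions): RecursiveCharacterTextSplitter escapes its
# # separators unless is_separator_regex=True is passed, so the "(?<=\. )" lookbehind above
# # may be treated as a literal string on those releases; adding is_separator_regex=True to
# # the constructor call restores the regex behaviour.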
# try:
# # Stage one: Splitting the documents into chunks for vectorization
# st = time.time() # Start time for performance measurement
# print('Loading documents and creating chunks ...')
# # Split each document into chunks using the configured text splitter
# chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs])
# et = time.time() - st # Calculate time taken for splitting
# print(f"created "+chunks+" chunks")
# print(f'Time taken for document chunking: {et} seconds.')
# except Exception as e:
# print(f"Error during document chunking: {e}", file=sys.stderr)
# # Path for saving the FAISS index
# FAISS_INDEX_PATH = "./vectorstore/lc-faiss-multi-mpnet-500"
# try:
# # Stage two: Vectorization of the document chunks
# model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1" # Model used for embedding
# # Initialize HuggingFace embeddings with the specified model
# embeddings = HuggingFaceEmbeddings(model_name=model_name)
# print(f'Loading chunks into vector store ...')
# st = time.time() # Start time for performance measurement
# # Create a FAISS vector store from the document chunks and save it locally
# db = FAISS.from_documents(filter_complex_metadata(chunks), embeddings)
# db.save_local(FAISS_INDEX_PATH)
# et = time.time() - st # Calculate time taken for vectorization
# print(f'Time taken for vectorization and saving: {et} seconds.')
# except Exception as e:
# print(f"Error during vectorization or FAISS index saving: {e}", file=sys.stderr)
# Alternatively, download a pre-prepared vectorized index from S3 and load it into the vector store
# Import necessary libraries for AWS S3 interaction, file handling, and FAISS vector stores
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import zipfile
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from dotenv import load_dotenv
# Load environment variables from a .env file
load_dotenv(".env")
# Retrieve the Hugging Face API token from environment variables
HUGGINGFACEHUB_API_TOKEN = os.getenv('HUGGINGFACEHUB_API_TOKEN')
S3_LOCATION = os.getenv("S3_LOCATION")
try:
# Initialize an S3 client with unsigned configuration for public access
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
# Define the FAISS index path and the destination for the downloaded file
FAISS_INDEX_PATH = './vectorstore/lc-faiss-multi-mpnet-500-markdown'
VS_DESTINATION = FAISS_INDEX_PATH + ".zip"
# Download the pre-prepared vectorized index from the S3 bucket
print("Downloading the pre-prepared vectorized index from S3...")
s3.download_file(S3_LOCATION, 'vectorstores/lc-faiss-multi-mpnet-500-markdown.zip', VS_DESTINATION)
# Extract the downloaded zip file
with zipfile.ZipFile(VS_DESTINATION, 'r') as zip_ref:
zip_ref.extractall('./vectorstore/')
print("Download and extraction completed.")
except Exception as e:
print(f"Error during downloading or extracting from S3: {e}", file=sys.stderr)
# Define the model name for embeddings
model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
try:
# Initialize HuggingFace embeddings with the specified model
embeddings = HuggingFaceEmbeddings(model_name=model_name)
# Load the local FAISS index with the specified embeddings
db = FAISS.load_local(FAISS_INDEX_PATH, embeddings)
print("FAISS index loaded successfully.")
except Exception as e:
print(f"Error during FAISS index loading: {e}", file=sys.stderr)
# Import the HuggingFace Hub LLM wrapper from the LangChain community package
from langchain_community.llms import HuggingFaceHub
# Initialize the vector store as a retriever for the RAG pipeline
retriever = db.as_retriever()
try:
# Load the model from the Hugging Face Hub
model_id = HuggingFaceHub(repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1", model_kwargs={
"temperature": 0.1, # Controls randomness in response generation (lower value means less random)
"max_new_tokens": 1024, # Maximum number of new tokens to generate in responses
"repetition_penalty": 1.2, # Penalty for repeating the same words (higher value increases penalty)
"return_full_text": False # If False, only the newly generated text is returned; if True, the input is included as well
})
print("Model loaded successfully from Hugging Face Hub.")
except Exception as e:
print(f"Error loading model from Hugging Face Hub: {e}", file=sys.stderr)
# Importing necessary modules for retrieval-based question answering and prompt handling
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
# Declare a global variable 'qa' for the retrieval-based question answering system
global qa
# Define a prompt template for guiding the model's responses
template = """
You are the friendly documentation buddy Arti. If you don't know the answer, say 'I don't know' and don't make things up.\
Use the following context (delimited by <ctx></ctx>) and the chat history (delimited by <hs></hs>) to answer the question:
------
<ctx>
{context}
</ctx>
------
<hs>
{history}
</hs>
------
{question}
Answer:
"""
# Create a PromptTemplate from the template string; from_template infers the
# input variables ("history", "context", "question") automatically
prompt = PromptTemplate.from_template(template)
# Create a memory buffer to manage conversation history
memory = ConversationBufferMemory(
memory_key="history", # Key for storing the conversation history
input_key="question" # Key for the input question
)
# Initialize the RetrievalQA object with the specified model, retriever, and additional configurations
qa = RetrievalQA.from_chain_type(
llm=model_id, # Language model loaded from Hugging Face Hub
retriever=retriever, # The vector store retriever initialized earlier
return_source_documents=True, # Option to return source documents along with responses
chain_type_kwargs={
"verbose": True, # Enables verbose output for debugging and analysis
"memory": memory, # Memory buffer for managing conversation history
"prompt": prompt # Prompt template for guiding the model's responses
}
)
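# Optional sanity check (hypothetical query): run a single question through the chain
# before wiring up the UI; uncomment to test.
# sample = qa({"query": "How do I split documents into chunks?"})
# print(sample["result"])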
# Import Gradio for UI, along with other necessary libraries
import gradio as gr
import random
import time
# Function to add a new input to the chat history
def add_text(history, text):
# Append the new text to the history with a placeholder for the response
history = history + [(text, None)]
return history, ""
# Function representing the bot's response mechanism
def bot(history):
# Obtain the response from the 'infer' function using the latest input
response = infer(history[-1][0], history)
# Update the history with the bot's response
history[-1][1] = response['result']
return history
# Function to infer the response using the RAG model
def infer(question, history):
# Use the question and history to query the RAG model
result = qa({"query": question, "history": history, "question": question})
return result
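# Example (hypothetical): the helper can also be called directly with an empty history,
# which is handy for debugging outside the Gradio UI.
# answer = infer("What is a RecursiveUrlLoader?", [])
# print(answer["result"])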
# CSS styling for the Gradio interface
css = """
#col-container {max-width: 700px; margin-left: auto; margin-right: auto;}
"""
# HTML content for the Gradio interface title
title = """
<div style="text-align: center;max-width: 700px;">
<h1>Chat with your Documentation</h1>
<p style="text-align: center;">Chat with LangChain Documentation, <br />
You can ask questions about the LangChain docs ;)</p>
</div>
"""
# Building the Gradio interface
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.HTML(title) # Add the HTML title to the interface
chatbot = gr.Chatbot([], elem_id="chatbot") # Initialize the chatbot component
clear = gr.Button("Clear") # Add a button to clear the chat
# Create a row for the question input
with gr.Row():
question = gr.Textbox(label="Question", placeholder="Type your question and hit Enter ")
# Define the action when the question is submitted
question.submit(add_text, [chatbot, question], [chatbot, question], queue=False).then(
bot, chatbot, chatbot
)
# Define the action for the clear button
clear.click(lambda: None, None, chatbot, queue=False)
# Launch the Gradio demo interface
demo.launch(share=False)