# esg_countries_chatbot / web_scrape_and_pdf_loader.py
# bohmian's picture
# Update web_scrape_and_pdf_loader.py
# 5e078a0 verified
import os, os.path
# to search web for results
from urllib.parse import urlparse, quote
import requests
from duckduckgo_search import DDGS
# to present web search results in a table
import pandas as pd
# to get document chunks, embed them and build the retrieval indexes
# 2 indexes are built: one for keyword search (BM25 retriever), one for semantic search (Chroma vector database)
from langchain.document_loaders import WebBaseLoader, PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.retrievers import BM25Retriever
# for saving bm25 retriever
# pickle is not for production, just for prototype
import pickle
# Number of top search results to return from DuckDuckGo.
# NOTE(review): not referenced by any function in this file; presumably the
# caller passes it as `n_search_results` to duckduckgo_scrape - confirm.
top_n_results = 10
# Chunk-size / chunk-overlap grid (14 sizes x 4 overlaps = 56 combinations).
# A Chroma vector store is meant to be set up for every combination below,
# so building all of them will take very long.
# The setup functions in this file handle one (chunk_size, chunk_overlap)
# pair per call; the loop over this grid is expected to live in the caller.
chunk_sizes = [500, 600, 700, 800, 900, 1000, 1250, 1500, 1750, 2000, 2250, 2500, 2750, 3000]
chunk_overlaps = [50, 100, 150, 200]
################################ Search the Web ################################
## Use DuckDuckGo to Search for Top N Results for Each Country's ESG Policies
# Use DuckDuckGo search to loop through each country and save the top N results by searching for
# "{country} sustainability esg newest updated public policy document government"
# After some experimentation the search phrase above seems to give the best results
# for the most recent ESG policies as it contains all the necessary keywords, but it can be changed
# Store the Relevant Links in a Dictionary
# Links are Mostly HTML or PDF
def duckduckgo_scrape(country, search_term, n_search_results):
    """Run one DuckDuckGo text search and collect the hits for a country.

    Args:
        country: Label stored under the ``'country'`` key of every hit.
        search_term: Query string handed to ``DDGS.text``.
        n_search_results: Forwarded to ``DDGS.text`` as ``max_results``.

    Returns:
        A tuple ``(all_links, df_links)``: the list of result dicts, each
        tagged with ``country``, and a DataFrame built from the same rows
        with display-friendly column names.

    Side effects:
        Writes the DataFrame to ``duck_duck_go_scraped_links.csv`` in the
        current working directory, replacing any file already there.
    """
    # Raw result keys -> column headers used in the table / csv.
    column_labels = {
        'title': 'Title',
        'href': 'url',
        'body': 'Summarized Body',
        'country': 'Country'
    }

    all_links = []
    with DDGS() as search_client:
        hits = search_client.text(search_term, max_results=n_search_results)
        for hit in hits:
            # Tag the hit in place so later stages know which country it is for.
            hit['country'] = country
            all_links.append(hit)

    links_table = pd.DataFrame(all_links)
    df_links = links_table.rename(columns=column_labels)
    # Save scraped links into csv
    df_links.to_csv("duck_duck_go_scraped_links.csv")
    return all_links, df_links
################################ Load the Documents ################################
## For every search result returned by DuckDuckGo for each country above, scrape the web using the url and convert to documents using Langchain loaders:
# PDF Documents: If link from search result points to PDF document,
# save the PDF permanently in local storage in the folder called 'pdfs', then use PyPDFLoader to convert it to raw documents.
# HTML Documents: If link is just a HTML page, use Langchain WebBaseLoader to convert it to raw documents.
# Metadata: Add country in the metadata, this is an important step as it is needed by RetrieveQA for filtering in the future.
# For PDFs, langchain will use its local path as the source, need to change it back to the online path.
# Save all the documents into a list called "all_documents".
# for adding country metadata
def add_country_metadata(docs, country):
    """Tag every document in ``docs`` with its country.

    Sets ``metadata['country']`` on each element in place (overwriting any
    previous value) and returns the same ``docs`` object.
    """
    for document in docs:
        document.metadata.update({'country': country})
    return docs
# for adding source url metadata
def add_url_metadata(docs, url):
    """Record ``url`` as the source of every document in ``docs``.

    Sets ``metadata['source']`` on each element in place (overwriting any
    previous value) and returns the same ``docs`` object.
    """
    for document in docs:
        document.metadata.update({'source': url})
    return docs
# If link from search result points to PDF document,
# save the PDF permanently in local storage in the folder called 'pdfs',
# then use PyPDFLoader to convert it to raw documents.
def pdf_loader(url, country, timeout=30):
    """Download a PDF, keep a local copy, and load it as raw documents.

    The file is saved under ``pdfs/<country>/`` and then parsed with
    ``PyPDFLoader``. Every resulting document gets ``metadata['country']``
    set to ``country`` and ``metadata['source']`` set to the URL that was
    actually fetched (the loader would otherwise leave the local path there).

    Args:
        url: Web address of the PDF.
        country: Country label; also used as the sub-folder name.
        timeout: Seconds to wait for the server on each request, so that one
            unresponsive host cannot stall the whole run.

    Returns:
        The list of loaded documents, or ``None`` if anything failed
        (the failure is printed, not raised).
    """
    try:
        try:
            response = requests.get(url, timeout=timeout)
        except requests.exceptions.RequestException:
            # sometimes there is ssl error, and the page is actually http://
            url = url.replace("https://", "http://")
            response = requests.get(url, timeout=timeout)
        # An HTTP error page must not be stored on disk as if it were a PDF.
        response.raise_for_status()

        # create pdf directory to save pdfs locally
        pdf_dir = f"pdfs/{country}"
        os.makedirs(pdf_dir, exist_ok=True)

        # Build the file name from the URL path only, so a query string or
        # fragment does not end up in the name; fall back to a fixed name
        # when the path has no final component (e.g. it ends with "/").
        pdf_basename = os.path.basename(urlparse(url).path) or "document.pdf"
        pdf_filename = f"{pdf_dir}/{pdf_basename}"
        with open(pdf_filename, 'wb') as f:  # save the pdf locally first
            f.write(response.content)

        loader = PyPDFLoader(pdf_filename)  # then use langchain loader to load it
        raw_pdf_documents = loader.load()
        raw_pdf_documents = add_country_metadata(raw_pdf_documents, country)
        # pdf source data will be populated by Langchain as the local path
        # we do not want this, we change it back to the original path on the web instead
        raw_pdf_documents = add_url_metadata(raw_pdf_documents, url)
        return raw_pdf_documents
    except Exception as e:
        # Best-effort loader: report the reason and let the caller skip this link.
        print(f"Failed to load for {url} {e}")
        return None
# Same as above but for pdf in local directory
def pdf_loader_local(pdf_filename, country):
    """Load a PDF that is already on disk and tag it with its country.

    Args:
        pdf_filename: Path of the local PDF file.
        country: Value stored in ``metadata['country']`` of every document.

    Returns:
        The loaded documents, or ``False`` if loading raised (the error is
        printed, not propagated). The ``source`` metadata is left as set by
        the loader.
    """
    try:
        pages = PyPDFLoader(pdf_filename).load()  # use langchain loader to load it
        return add_country_metadata(pages, country)
    except Exception as e:
        print(f"Failed to load for {pdf_filename} {e}")
        return False
# If link is just a HTML page, use Langchain WebBaseLoader to convert it to raw documents.
def html_loader(url, country):
    """Load an HTML page with ``WebBaseLoader`` and tag it with its country.

    Args:
        url: Address of the page to load.
        country: Value stored in ``metadata['country']`` of every document.

    Returns:
        The loaded documents, or ``None`` if loading failed (the failure and
        its reason are printed, not raised).
    """
    try:
        loader = WebBaseLoader(url)
        raw_html_documents = loader.load()
        raw_html_documents = add_country_metadata(raw_html_documents, country)
        return raw_html_documents
    except Exception as e:
        # Catch Exception rather than everything, so Ctrl+C / SystemExit can
        # still stop a long scraping run.
        print(f"Failed to load for {url} {e}")
        return None
def process_links_load_documents(all_links):
    """Load every search result as documents and return them in one list.

    Each entry of ``all_links`` must provide ``'country'`` and ``'href'``.
    Links that look like PDFs go through ``pdf_loader``; everything else goes
    through ``html_loader``. Links whose loader returns ``None`` are skipped.

    Args:
        all_links: Iterable of result dicts (as produced by the search step).

    Returns:
        A flat list of all loaded documents. Line breaks in each document's
        ``page_content`` are collapsed to single spaces.
    """
    all_documents = []  # store all the documents
    for link in all_links:
        country = link['country']
        url = link['href'].replace(" ", "%20")  # replace spaces to encoded version e.g. %20

        # Decide PDF vs HTML without being fooled by upper-case extensions
        # or by a query string / fragment after the file name.
        lowered_url = url.lower()
        url_without_query = lowered_url.split('#', 1)[0].split('?', 1)[0]
        is_pdf = url_without_query.endswith('.pdf') or (
            '.pdf' in lowered_url and 'blob' in lowered_url
        )

        if is_pdf:
            print(f"{country}: Loading PDF from {url}")
            docs = pdf_loader(url, country)
        else:
            print(f"{country}: Loading HTML from {url}")
            docs = html_loader(url, country)

        if docs is None:  # if error, docs will be None
            continue
        if isinstance(docs, list):
            all_documents.extend(docs)
        else:
            all_documents.append(docs)

    # documents return a lot of \n, perform some cleaning.
    # Join on a space (dropping empty lines) rather than deleting the newline,
    # so words that were separated only by a line break are not glued together.
    for document in all_documents:
        document.page_content = " ".join(
            piece for piece in document.page_content.split('\n') if piece
        )
    return all_documents
################################ Set Up Chroma Vector Store ################################
# This is for semantic search.
# In the configuration cell at the top, we define all the chunk sizes and overlaps that we are interested in.
# The Chroma vector stores will be set up for each of the configuration, persisted in a different directory.
# These vector stores can be accessed in the main app later.
# Time taken to get the embeddings for every document chunk can be very long.
# Note: If we are using a lot more data than can be stored in the RAM or when in production,
# better to initialize a separate vector store in a server (Postgres or online solutions like Pinecone) before pushing the document chunks to it bit by bit.
def setup_chromadb_vectorstore(hf_embeddings, all_documents, chunk_size, chunk_overlap, country):
    """Build and persist one Chroma store for a chunk-size/overlap setting.

    ``all_documents`` is split as given (this function does no filtering by
    country), embedded with ``hf_embeddings`` and persisted under
    ``chromadb/new_<country>_chunk_<chunk_size>_overlap_<chunk_overlap>_``.

    Args:
        hf_embeddings: Embedding object passed to ``Chroma.from_documents``.
        all_documents: Documents to split and index.
        chunk_size: ``chunk_size`` for ``RecursiveCharacterTextSplitter``.
        chunk_overlap: ``chunk_overlap`` for the same splitter.
        country: Used only to name the persist directory.

    Returns:
        ``True`` once the store has been persisted.
    """
    chromadb_dir = "chromadb"
    if not os.path.exists(chromadb_dir):
        os.makedirs(chromadb_dir)

    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}")

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    chunks = splitter.split_documents(all_documents)

    # One directory per (country, chunk size, overlap) configuration.
    store_path = f"{chromadb_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_"
    vector_store = Chroma.from_documents(
        chunks,
        hf_embeddings,
        persist_directory=store_path,
    )
    vector_store.persist()
    return True  # to let user know this process is done
################################ Set Up BM25 Retriever ################################
# This is for keyword search.
# BM25 is a keyword-based algorithm that performs well on queries containing keywords without capturing the semantic meaning of the query terms,
# hence there is no need to embed the text with HuggingFaceEmbeddings and it is relatively faster to set up.
# We will use it in combination with the chroma_db vector store retriever in our application later, via an ensemble retriever that re-ranks the results.
# The retriever is just a small file so we just store it using pickle, but for production this is still not recommended.
def setup_bm25_retriever(all_documents, chunk_size, chunk_overlap, country):
    """Build and pickle a BM25 keyword retriever for one country.

    Only documents whose ``metadata['country']`` equals ``country`` are used.
    They are split with ``RecursiveCharacterTextSplitter`` and the resulting
    retriever is pickled to
    ``bm25/new_<country>_chunk_<chunk_size>_overlap_<chunk_overlap>_.pickle``.

    Args:
        all_documents: Documents of any country; filtered here.
        chunk_size: ``chunk_size`` for the text splitter.
        chunk_overlap: ``chunk_overlap`` for the text splitter.
        country: Country to keep; also part of the output file name.

    Returns:
        ``True`` once the pickle file has been written.
    """
    bm25_dir = "bm25"
    os.makedirs(bm25_dir, exist_ok=True)

    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}, Country: {country}")

    # Filter before splitting: this function is run once per country and per
    # chunk configuration, so splitting other countries' documents only to
    # discard the chunks afterwards is wasted work. Documents without a
    # 'country' entry are skipped.
    country_documents = [
        doc for doc in all_documents if doc.metadata.get('country') == country
    ]

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    split_documents = text_splitter.split_documents(country_documents)

    bm25_retriever = BM25Retriever.from_documents(split_documents)

    filename = f"{bm25_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_.pickle"
    # NOTE: pickle is for the prototype only; only unpickle files this
    # pipeline wrote itself, never ones from an untrusted source.
    with open(filename, 'wb') as handle:
        pickle.dump(bm25_retriever, handle)
    return True  # to let user know this process is done