# talk-to-your-docs / ttyd_functions.py
import datetime
import mimetypes
import os
import re
import uuid
from collections import deque
from pathlib import Path
from urllib.parse import urlparse

import openai
import requests
import tiktoken
from bs4 import BeautifulSoup
from langchain.document_loaders import WebBaseLoader, TextLoader, Docx2txtLoader, PyMuPDFLoader
from langchain.embeddings import OpenAIEmbeddings, SentenceTransformerEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma

from whatsapp_chat_custom import WhatsAppChatLoader  # use this instead of `from langchain.document_loaders import WhatsAppChatLoader`

# Regex pattern to match a URL
HTTP_URL_PATTERN = r'^http[s]*://.+'

mimetypes.init()
media_files = tuple([x for x in mimetypes.types_map if mimetypes.types_map[x].split('/')[0] in ['image', 'video', 'audio']])

filter_strings = ['/email-protection#']

def getOaiCreds(key):
    if key:
        return {'service': 'openai',
                'oai_key': key
                }
    else:
        return {}

def getWxCreds(key, p_id):
    if key and p_id:
        return {'service': 'watsonx',
                'credentials': {"url": "https://us-south.ml.cloud.ibm.com", "apikey": key},
                'project_id': p_id
                }
    else:
        return {}

def getPersonalBotApiKey():
    if os.getenv("OPENAI_API_KEY"):
        return getOaiCreds(os.getenv("OPENAI_API_KEY"))
    elif os.getenv("WX_API_KEY") and os.getenv("WX_PROJECT_ID"):
        return getWxCreds(os.getenv("WX_API_KEY"), os.getenv("WX_PROJECT_ID"))
    else:
        return {}

def get_hyperlinks(url):
    # Fetch a page and return all hrefs from its anchor tags; return [] on any failure
    try:
        reqs = requests.get(url)
        if not reqs.headers.get('Content-Type', '').startswith("text/html") or 400 <= reqs.status_code < 600:
            return []
        soup = BeautifulSoup(reqs.text, 'html.parser')
    except Exception as e:
        print(e)
        return []

    hyperlinks = []
    for link in soup.find_all('a', href=True):
        hyperlinks.append(link.get('href'))

    return hyperlinks

# Function to get the hyperlinks from a URL that are within the same domain
def get_domain_hyperlinks(local_domain, url):
    clean_links = []
    for link in set(get_hyperlinks(url)):
        clean_link = None

        # If the link is a URL, check if it is within the same domain
        if re.search(HTTP_URL_PATTERN, link):
            # Parse the URL and check if the domain is the same
            url_obj = urlparse(link)
            if url_obj.netloc.replace('www.', '') == local_domain.replace('www.', ''):
                clean_link = link

        # If the link is not a URL, check if it is a relative link
        else:
            if link.startswith("/"):
                link = link[1:]
            elif link.startswith(("#", '?', 'mailto:')):
                continue

            if 'wp-content/uploads' in url:
                clean_link = url + "/" + link
            else:
                clean_link = "https://" + local_domain + "/" + link

        if clean_link is not None:
            clean_link = clean_link.strip().rstrip('/').replace('/../', '/')
            if not any(x in clean_link for x in filter_strings):
                clean_links.append(clean_link)

    # Return the list of hyperlinks that are within the same domain
    return list(set(clean_links))

# this function will get you a list of all the URLs from the base URL
def crawl(url, local_domain, prog=None):
    # Create a queue to store the URLs to crawl
    queue = deque([url])

    # Create a set to store the URLs that have already been seen (no duplicates)
    seen = set([url])

    # While the queue is not empty, continue crawling
    while queue:
        # Get the next URL from the queue
        url_pop = queue.pop()

        # Get the hyperlinks from the URL and add them to the queue
        for link in get_domain_hyperlinks(local_domain, url_pop):
            if link not in seen:
                queue.append(link)
                seen.add(link)
                if len(seen) >= 100:  # cap the crawl at 100 unique URLs
                    return seen
        if prog is not None: prog(1, desc=f'Crawling: {url_pop}')

    return seen

def ingestURL(documents, url, crawling=True, prog=None):
    url = url.rstrip('/')
    # Parse the URL and get the domain
    local_domain = urlparse(url).netloc
    if not (local_domain and url.startswith('http')):
        return documents
    print('Loading URL', url)

    if crawling:
        # crawl to get other webpages from this URL
        if prog is not None: prog(0, desc=f'Crawling: {url}')
        links = crawl(url, local_domain, prog)
        if prog is not None: prog(1, desc=f'Crawling: {url}')
    else:
        links = set([url])

    # separate pdf and other links
    c_links, pdf_links = [], []
    for x in links:
        if x.endswith('.pdf'):
            pdf_links.append(x)
        elif not x.endswith(media_files):
            c_links.append(x)

    # Clean links loader using WebBaseLoader
    if prog is not None: prog(0.5, desc=f'Ingesting: {url}')
    if c_links:
        loader = WebBaseLoader(list(c_links))
        documents.extend(loader.load())

    # remote PDFs loader
    for pdf_link in list(pdf_links):
        loader = PyMuPDFLoader(pdf_link)
        doc = loader.load()
        for x in doc:
            x.metadata['source'] = loader.source
        documents.extend(doc)

    return documents

def ingestFiles(documents, files_list, prog=None):
    for fPath in files_list:
        doc = None
        if fPath.endswith('.pdf'):
            doc = PyMuPDFLoader(fPath).load()
        elif fPath.endswith('.txt') and 'WhatsApp Chat with' not in fPath:
            doc = TextLoader(fPath).load()
        elif fPath.endswith(('.doc', '.docx')):
            doc = Docx2txtLoader(fPath).load()
        elif 'WhatsApp Chat with' in fPath and fPath.endswith('.csv'):  # Convert WhatsApp TXT files to CSV using https://whatstk.streamlit.app/
            doc = WhatsAppChatLoader(fPath).load()
        else:
            pass

        if doc and doc[0].page_content:
            if prog is not None: prog(1, desc='Loaded file: ' + fPath.rsplit('/', 1)[-1])
            print('Loaded file:', fPath)
            documents.extend(doc)

    return documents

def data_ingestion(inputDir=None, file_list=[], url_list=[], prog=None):
    documents = []

    # Ingestion from Input Directory
    if inputDir is not None:
        files = [str(x) for x in Path(inputDir).glob('**/*')]
        documents = ingestFiles(documents, files)
    if file_list:
        documents = ingestFiles(documents, file_list, prog)

    # Ingestion from URLs - also try https://python.langchain.com/docs/integrations/document_loaders/recursive_url_loader
    if url_list:
        for url in url_list:
            documents = ingestURL(documents, url, prog=prog)

    # Cleanup documents: strip whitespace, flatten newlines, and collapse double spaces
    for x in documents:
        if 'WhatsApp Chat with' not in x.metadata['source']:
            x.page_content = x.page_content.strip().replace('\n', ' ').replace('\\n', ' ').replace('  ', ' ')

    # print(f"Total number of documents: {len(documents)}")
    return documents

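# Rough sketch of the RecursiveUrlLoader alternative pointed to in the comment above. It is
# illustrative only and not called anywhere in this module; it assumes a langchain release
# where RecursiveUrlLoader accepts `url`, `max_depth`, and `extractor` keyword arguments.
# The helper name `_ingestURL_recursive` is made up for this example.
def _ingestURL_recursive(documents, url, max_depth=2):
    from langchain.document_loaders import RecursiveUrlLoader
    loader = RecursiveUrlLoader(url=url, max_depth=max_depth,
                                extractor=lambda html: BeautifulSoup(html, 'html.parser').text)
    # extend the shared documents list, mirroring what ingestURL() does with WebBaseLoader
    documents.extend(loader.load())
    return documents
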
def split_docs(documents):
    # Splitting and Chunks
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=250)  # default chunk size of 4000 makes around 1k tokens per doc; with k=4, this means ~4k tokens input to the LLM
    docs = text_splitter.split_documents(documents)
    return docs

def getSourcesFromMetadata(metadata, sourceOnly=True, sepFileUrl=True):
    # metadata: list of metadata dicts from all documents
    setSrc = set()
    for x in metadata:
        metadataText = ''  # we need to convert each metadata dict into a string format. This string will be added to a set
        if x is not None:
            # extract source first, and then extract all other items
            source = x['source']
            source = source.rsplit('/', 1)[-1] if 'http' not in source else source

            notSource = []
            for k, v in x.items():
                if v is not None and k != 'source' and k in ['page', 'title']:
                    notSource.extend([f"{k}: {v}"])
            metadataText = ', '.join([f'source: {source}'] + notSource) if not sourceOnly else source
        setSrc.add(metadataText)

    if sepFileUrl:
        src_files = '\n'.join([f"{i+1}) {x}" for i, x in enumerate(sorted([x for x in setSrc if 'http' not in x], key=str.casefold))])
        src_urls = '\n'.join([f"{i+1}) {x}" for i, x in enumerate(sorted([x for x in setSrc if 'http' in x], key=str.casefold))])

        src_files = 'Files:\n' + src_files if src_files else ''
        src_urls = 'URLs:\n' + src_urls if src_urls else ''
        newLineSep = '\n\n' if src_files and src_urls else ''

        return src_files + newLineSep + src_urls, len(setSrc)
    else:
        src_docs = '\n'.join([f"{i+1}) {x}" for i, x in enumerate(sorted(list(setSrc), key=str.casefold))])
        return src_docs, len(setSrc)

def getEmbeddingFunc(creds):
    # OpenAI key used
    if creds.get('service') == 'openai':
        embeddings = OpenAIEmbeddings(openai_api_key=creds.get('oai_key', 'Null'))
    # WX key used
    elif creds.get('service') == 'watsonx':
        # testModel = Model(model_id=ModelTypes.FLAN_UL2, credentials=creds['credentials'], project_id=creds['project_id']) # test the API key
        # del testModel
        embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")  # use an open-source model for embeddings for now, as watsonx doesn't offer an embedding model
    else:
        raise Exception('Error: Invalid or None Credentials')
    return embeddings

def getVsDict(embeddingFunc, docs, vsDict=None):
    if vsDict is None:  # avoid a shared mutable default argument
        vsDict = {}
    # create chroma client if it doesn't exist
    if vsDict.get('chromaClient') is None:
        vsDict['chromaDir'] = './vecstore/' + str(uuid.uuid1())
        vsDict['chromaClient'] = Chroma(embedding_function=embeddingFunc, persist_directory=vsDict['chromaDir'])
    # clear chroma client before adding new docs
    if vsDict['chromaClient']._collection.count() > 0:
        vsDict['chromaClient'].delete(vsDict['chromaClient'].get()['ids'])
    # add new docs to chroma client
    vsDict['chromaClient'].add_documents(docs)
    print('vectorstore count:', vsDict['chromaClient']._collection.count(), 'at', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    return vsDict

# used for Hardcoded documents only - not uploaded by user (userData_vecStore is a separate function)
def localData_vecStore(embKey={}, inputDir=None, file_list=[], url_list=[], vsDict=None):
    documents = data_ingestion(inputDir, file_list, url_list)
    if not documents:
        raise Exception('Error: No Documents Found')
    docs = split_docs(documents)
    # Embeddings
    embeddings = getEmbeddingFunc(embKey)
    # create chroma client if it doesn't exist
    vsDict_hd = getVsDict(embeddings, docs, vsDict)
    # get sources from metadata
    src_str = getSourcesFromMetadata(vsDict_hd['chromaClient'].get()['metadatas'])
    src_str = str(src_str[1]) + ' source document(s) successfully loaded in vector store.' + '\n\n' + src_str[0]
    print(src_str)
    return vsDict_hd

def num_tokens_from_string(string, encoding_name="cl100k_base"):
    """Returns the number of tokens in a text string."""
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens
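
# Minimal usage sketch showing how the helpers above chain together: ingest, split, embed,
# store, then query the resulting Chroma collection. The URL and query are placeholders, and
# credentials are assumed to be available in the environment (OPENAI_API_KEY, or WX_API_KEY
# plus WX_PROJECT_ID) so that getPersonalBotApiKey() returns something usable.
if __name__ == '__main__':
    creds = getPersonalBotApiKey()  # returns {} if no API keys are set in the environment
    vsDict = localData_vecStore(creds, url_list=['https://example.com'])
    query = 'What is this site about?'
    print('query tokens:', num_tokens_from_string(query))
    # similarity search against the freshly built vector store
    for doc in vsDict['chromaClient'].similarity_search(query, k=4):
        print(doc.metadata.get('source'), '->', doc.page_content[:100])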