import json
import os
import pickle
import zipfile

import tiktoken
from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from tqdm.auto import tqdm

from llms.embeddings import EMBEDDINGS_MAPPING

# encoding_for_model already returns the Encoding object for the model
tokenizer = tiktoken.encoding_for_model('gpt-4')

EMBED_MODEL = "text-embedding-ada-002"
EMBED_DIM = 1536  # embedding dimension of text-embedding-ada-002
METRIC = 'cosine'
#######################################################################################################################
# File handlers
#######################################################################################################################
def check_existence(path):
    return os.path.isfile(path) or os.path.isdir(path)


def list_files(directory, ext=".pdf"):
    # List all files in the directory, keeping only those with the given extension
    files_in_directory = os.listdir(directory)
    files_list = [file for file in files_in_directory if file.endswith(ext)]
    return files_list


def list_pdf_files(directory):
    # Convenience wrapper for the most common case
    return list_files(directory, ext=".pdf")
def tiktoken_len(text):
    # Count how many tokens the given text produces under the gpt-4 encoding
    tokens = tokenizer.encode(text, disallowed_special=())
    return len(tokens)
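# Quick sanity check (illustrative; exact counts depend on the encoding):
#   tiktoken_len("hello world")  # -> 2 tokens under the gpt-4 encoding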
def get_chunks(docs, chunk_size=500, chunk_overlap=20, length_function=tiktoken_len):
    # `docs` should be the output of `loader.load()`
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size,
                                                   chunk_overlap=chunk_overlap,
                                                   length_function=length_function,
                                                   separators=["\n\n", "\n", " ", ""])
    chunks = []
    for page in tqdm(docs):
        source = page.metadata.get('source')
        content = page.page_content
        if len(content) > 50:  # skip near-empty pages
            texts = text_splitter.split_text(content)
            chunks.extend([str({'content': texts[i], 'chunk': i, 'source': os.path.basename(source)})
                           for i in range(len(texts))])
    return chunks
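# Usage sketch (assumes a hypothetical folder of .txt files; names are illustrative):
#   loader = DirectoryLoader("my_docs", glob="**/*.txt", loader_cls=TextLoader)
#   chunks = get_chunks(loader.load(), chunk_size=500, chunk_overlap=20)
#   # Each chunk is a stringified dict, e.g.
#   # "{'content': '...', 'chunk': 0, 'source': 'notes.txt'}"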
#######################################################################################################################
# Create FAISS object
#######################################################################################################################
# Known embedding model keys: ["text-embedding-ada-002", "distilbert-dot-tas_b-b256-msmarco"]
def create_faiss_index_from_zip(path_to_zip_file, embeddings=None, pdf_loader=None,
                                chunk_size=500, chunk_overlap=20,
                                project_name="Very_Cool_Project_Name"):
    # Resulting file structure:
    # project_name/
    # ├── source_data/   (extracted documents)
    # ├── embeddings/    (pickled (chunk, embedding) pairs)
    # └── faiss_index/   (serialized FAISS index)
    if isinstance(embeddings, str):
        embeddings_str = embeddings
    else:
        embeddings_str = "other-embedding-model"

    # Resolve the embeddings argument to an embedding object
    if embeddings is None or embeddings == "text-embedding-ada-002":
        embeddings = EMBEDDINGS_MAPPING["text-embedding-ada-002"]
    elif isinstance(embeddings, str):
        embeddings = EMBEDDINGS_MAPPING[embeddings]
    # otherwise keep the embedding object that was passed in

    # STEP 1:
    # Create a folder f"{project_name}" in the current directory.
    current_directory = os.getcwd()
    if not os.path.exists(project_name):
        project_path = os.path.join(current_directory, project_name)
        source_data = os.path.join(project_path, "source_data")
        embeddings_data = os.path.join(project_path, "embeddings")
        index_data = os.path.join(project_path, "faiss_index")
        os.makedirs(source_data)      # ./project_name/source_data
        os.makedirs(embeddings_data)  # ./project_name/embeddings
        os.makedirs(index_data)       # ./project_name/faiss_index
    else:
        raise ValueError(f"The project {project_name} already exists.")

    # STEP 2: extract everything to "source_data" and record the metadata
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(source_data)

    db_meta = {"project_name": project_name,
               "pdf_loader": pdf_loader.__name__ if pdf_loader is not None else None,
               "chunk_size": chunk_size,
               "chunk_overlap": chunk_overlap,
               "embedding_model": embeddings_str,
               "files": os.listdir(source_data),
               "source_path": source_data}
    with open(os.path.join(project_path, "db_meta.json"), "w", encoding="utf-8") as f:
        # save db_meta.json to the project folder
        json.dump(db_meta, f)

    # STEP 3: load every supported file type from source_data
    all_docs = []
    for ext in [".txt", ".tex", ".md", ".pdf"]:
        if ext in [".txt", ".tex", ".md"]:
            loader = DirectoryLoader(source_data, glob=f"**/*{ext}", loader_cls=TextLoader,
                                     loader_kwargs={'autodetect_encoding': True})
        elif ext == ".pdf" and pdf_loader is not None:
            loader = DirectoryLoader(source_data, glob=f"**/*{ext}", loader_cls=pdf_loader)
        else:
            continue
        all_docs.extend(loader.load())

    # STEP 4: split the documents into chunks, embed them, and persist both
    # the (chunk, embedding) pairs and the FAISS index
    chunks = get_chunks(all_docs, chunk_size, chunk_overlap)
    text_embeddings = embeddings.embed_documents(chunks)
    text_embedding_pairs = list(zip(chunks, text_embeddings))
    embeddings_save_to = os.path.join(embeddings_data, 'text_embedding_pairs.pickle')
    with open(embeddings_save_to, 'wb') as handle:
        pickle.dump(text_embedding_pairs, handle, protocol=pickle.HIGHEST_PROTOCOL)
    db = FAISS.from_embeddings(text_embedding_pairs, embeddings)
    db.save_local(index_data)
    print(db_meta)
    print("Success!")
    return db, project_name, db_meta
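# Sketch: the pickled (chunk, embedding) pairs saved above are enough to
# rebuild the index later without re-calling the embedding model
# (the path below is illustrative):
#   with open("Very_Cool_Project_Name/embeddings/text_embedding_pairs.pickle", "rb") as f:
#       pairs = pickle.load(f)
#   db = FAISS.from_embeddings(pairs, embeddings)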
def find_file(file_name, directory):
    for root, dirs, files in os.walk(directory):
        if file_name in files:
            return os.path.join(root, file_name)
    return None  # the file was not found


def find_file_dir(file_name, directory):
    for root, dirs, files in os.walk(directory):
        if file_name in files:
            return root  # return the containing directory instead of the full path
    return None  # the file was not found
def load_faiss_index_from_zip(path_to_zip_file):
    # Extract the zip file into the current directory and read db_meta.json
    path_to_extract = os.getcwd()
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(path_to_extract)
    db_meta_json = find_file("db_meta.json", path_to_extract)
    if db_meta_json is not None:
        with open(db_meta_json, "r", encoding="utf-8") as f:
            db_meta_dict = json.load(f)
    else:
        raise ValueError("Cannot find `db_meta.json` in the .zip file.")

    # Recover the embedding model recorded in the metadata; fall back to
    # OpenAI's ada-002 if the recorded model is not in the mapping
    try:
        embeddings = EMBEDDINGS_MAPPING[db_meta_dict["embedding_model"]]
    except KeyError:
        from langchain.embeddings.openai import OpenAIEmbeddings
        embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

    # locate index.faiss and load the index
    index_path = find_file_dir("index.faiss", path_to_extract)
    if index_path is not None:
        db = FAISS.load_local(index_path, embeddings)
        return db
    else:
        raise ValueError("Failed to find `index.faiss` in the .zip file.")
if __name__ == "__main__":
    from langchain.document_loaders import PyPDFLoader
    from langchain.embeddings import HuggingFaceEmbeddings

    model_name = "sebastian-hofstaetter/distilbert-dot-tas_b-b256-msmarco"
    model_kwargs = {'device': 'cpu'}
    encode_kwargs = {'normalize_embeddings': False}
    embeddings = HuggingFaceEmbeddings(model_name=model_name,
                                       model_kwargs=model_kwargs,
                                       encode_kwargs=encode_kwargs)
    create_faiss_index_from_zip(path_to_zip_file="document.zip",
                                pdf_loader=PyPDFLoader,
                                embeddings=embeddings)
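    # To reuse the index elsewhere, zip the project folder and load it back
    # (a sketch; shutil.make_archive writes Very_Cool_Project_Name.zip):
    #   import shutil
    #   shutil.make_archive("Very_Cool_Project_Name", "zip",
    #                       root_dir=".", base_dir="Very_Cool_Project_Name")
    #   db = load_faiss_index_from_zip("Very_Cool_Project_Name.zip")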