import json
import os
import pickle
import zipfile

import tiktoken
from tqdm.auto import tqdm
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import DirectoryLoader, TextLoader

from llms.embeddings import EMBEDDINGS_MAPPING
# Use the GPT-4 tokenizer so chunk sizes are measured in model tokens.
tokenizer = tiktoken.encoding_for_model('gpt-4')
EMBED_MODEL = "text-embedding-ada-002"
EMBED_DIM = 1536
METRIC = 'cosine'
#######################################################################################################################
# Files handler
#######################################################################################################################
def check_existence(path):
return os.path.isfile(path) or os.path.isdir(path)
def list_files(directory, ext=".pdf"):
    # List all files in the directory
    files_in_directory = os.listdir(directory)
    # Keep only the files with the requested extension
    files_list = [file for file in files_in_directory if file.endswith(ext)]
    return files_list
def list_pdf_files(directory):
    # Thin wrapper around list_files, kept for backward compatibility.
    return list_files(directory, ext=".pdf")
def tiktoken_len(text):
    # Count how many tokens the given text occupies under the chosen encoding.
    tokens = tokenizer.encode(text, disallowed_special=())
    return len(tokens)
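# Illustrative only: tiktoken_len("hello world") == 2 with the GPT-4 encoding,
# so the chunk sizes below are measured in tokens rather than characters.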
def get_chunks(docs, chunk_size=500, chunk_overlap=20, length_function=tiktoken_len):
    # `docs` should be the output of `loader.load()`
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size,
                                                   chunk_overlap=chunk_overlap,
                                                   length_function=length_function,
                                                   separators=["\n\n", "\n", " ", ""])
    chunks = []
    for page in tqdm(docs):
        source = page.metadata.get('source')
        content = page.page_content
        if len(content) > 50:  # skip near-empty pages
            texts = text_splitter.split_text(content)
            chunks.extend([str({'content': texts[i], 'chunk': i, 'source': os.path.basename(source)})
                           for i in range(len(texts))])
    return chunks
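# Note: each chunk is a *stringified* dict, e.g.
#   "{'content': '...', 'chunk': 0, 'source': 'paper.pdf'}"
# (source name illustrative), so the metadata travels inside the text that gets embedded.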
#######################################################################################################################
# Create FAISS object
#######################################################################################################################
# ["text-embedding-ada-002", "distilbert-dot-tas_b-b256-msmarco"]
def create_faiss_index_from_zip(path_to_zip_file, embeddings=None, pdf_loader=None,
chunk_size=500, chunk_overlap=20,
project_name="Very_Cool_Project_Name"):
# initialize the file structure
# structure: project_name
# - source data
# - embeddings
# - faiss_index
if isinstance(embeddings, str):
import copy
embeddings_str = copy.deepcopy(embeddings)
else:
embeddings_str = "other-embedding-model"
if embeddings is None or embeddings == "text-embedding-ada-002":
embeddings = EMBEDDINGS_MAPPING["text-embedding-ada-002"]
elif isinstance(embeddings, str):
embeddings = EMBEDDINGS_MAPPING[embeddings]
else:
embeddings = EMBEDDINGS_MAPPING["text-embedding-ada-002"]
    # STEP 1:
    # Create a folder f"{project_name}" in the current directory.
    current_directory = os.getcwd()
    if not os.path.exists(project_name):
        os.makedirs(project_name)
        project_path = os.path.join(current_directory, project_name)
        source_data = os.path.join(project_path, "source_data")
        embeddings_data = os.path.join(project_path, "embeddings")
        index_data = os.path.join(project_path, "faiss_index")
        os.makedirs(source_data)       # ./project/source_data
        os.makedirs(embeddings_data)   # ./project/embeddings
        os.makedirs(index_data)        # ./project/faiss_index
    else:
        raise ValueError(f"The project {project_name} already exists.")
with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
# extract everything to "source_data"
zip_ref.extractall(source_data)
db_meta = {"project_name": project_name,
"pdf_loader": pdf_loader.__name__, "chunk_size": chunk_size,
"chunk_overlap": chunk_overlap,
"embedding_model": embeddings_str,
"files": os.listdir(source_data),
"source_path": source_data}
with open(os.path.join(project_path, "db_meta.json"), "w", encoding="utf-8") as f:
# save db_meta.json to folder
json.dump(db_meta, f)
    all_docs = []
    for ext in [".txt", ".tex", ".md", ".pdf"]:
        if ext in [".txt", ".tex", ".md"]:
            loader = DirectoryLoader(source_data, glob=f"**/*{ext}", loader_cls=TextLoader,
                                     loader_kwargs={'autodetect_encoding': True})
        elif ext == ".pdf" and pdf_loader is not None:
            loader = DirectoryLoader(source_data, glob=f"**/*{ext}", loader_cls=pdf_loader)
        else:
            continue
        docs = loader.load()
        all_docs = all_docs + docs
    # Split the documents into chunks, embed every chunk, and save the results under embeddings/.
chunks = get_chunks(all_docs, chunk_size, chunk_overlap)
text_embeddings = embeddings.embed_documents(chunks)
text_embedding_pairs = list(zip(chunks, text_embeddings))
embeddings_save_to = os.path.join(embeddings_data, 'text_embedding_pairs.pickle')
with open(embeddings_save_to, 'wb') as handle:
pickle.dump(text_embedding_pairs, handle, protocol=pickle.HIGHEST_PROTOCOL)
db = FAISS.from_embeddings(text_embedding_pairs, embeddings)
db.save_local(index_data)
print(db_meta)
print("Success!")
return db, project_name, db_meta
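# Usage sketch (archive name hypothetical; requires credentials/models for the
# chosen embeddings, see __main__ below):
#   db, name, meta = create_faiss_index_from_zip("papers.zip", pdf_loader=PyPDFLoader)
#   hits = db.similarity_search("What do the papers conclude?", k=4)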
def find_file(file_name, directory):
for root, dirs, files in os.walk(directory):
if file_name in files:
return os.path.join(root, file_name)
return None # If the file was not found
def find_file_dir(file_name, directory):
for root, dirs, files in os.walk(directory):
if file_name in files:
return root # return the directory instead of the full path
return None # If the file was not found
def load_faiss_index_from_zip(path_to_zip_file):
    # Extract the zip file, then read db_meta.json to recover the index settings.
    path_to_extract = os.getcwd()
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(path_to_extract)
    db_meta_json = find_file("db_meta.json", path_to_extract)
    if db_meta_json is not None:
        with open(db_meta_json, "r", encoding="utf-8") as f:
            db_meta_dict = json.load(f)
    else:
        raise ValueError("Cannot find `db_meta.json` in the .zip file.")
    try:
        embeddings = EMBEDDINGS_MAPPING[db_meta_dict["embedding_model"]]
    except KeyError:
        # Fall back to ada-002 when the recorded name is not in the mapping
        # (e.g. it was saved as "other-embedding-model").
        from langchain.embeddings.openai import OpenAIEmbeddings
        embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
# locate index.faiss
index_path = find_file_dir("index.faiss", path_to_extract)
if index_path is not None:
db = FAISS.load_local(index_path, embeddings)
return db
else:
raise ValueError("Failed to find `index.faiss` in the .zip file.")
if __name__ == "__main__":
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.embeddings import HuggingFaceEmbeddings
model_name = "sebastian-hofstaetter/distilbert-dot-tas_b-b256-msmarco"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
embeddings = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs=model_kwargs,
encode_kwargs=encode_kwargs)
create_faiss_index_from_zip(path_to_zip_file="document.zip", pdf_loader=PyPDFLoader, embeddings=embeddings)
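    # The call above creates ./Very_Cool_Project_Name with source_data/, embeddings/,
    # and faiss_index/; zip that folder to make it loadable via load_faiss_index_from_zip.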