# reverse-RAG / my_2_embedder.py
import csv
import os

import numpy as np
import openai
from pdf2image import convert_from_path

import my_1_openai
import my_1_reader
import my_1_writer

# Placeholder embedders: stand-ins for my_1_openai's vectorize functions.
def vectorize_data(data):
    # Replace this with the actual logic to vectorize text data.
    return np.random.rand(100).tolist()  # example vector


def vectorize_image(data):
    # Replace this with the actual logic to vectorize image data.
    return np.random.rand(100).tolist()  # example vector
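

# A minimal sketch of what a real text embedder could look like here, assuming the
# legacy openai<1.0 client (this module already catches openai.error exceptions).
# The function name and the model default are illustrative, not part of the original repo.
def vectorize_data_openai_sketch(text, model="text-embedding-ada-002"):
    """Return the Ada-002 embedding for `text` as a list of floats."""
    response = openai.Embedding.create(input=text, model=model)
    return response["data"][0]["embedding"]
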
def vectorize_this_pdf_with_metadata(pdf_path, output_path, metadata_filename="DS_U3/U3_Metadaten.csv"):
    # NOTE: output_path is currently unused; the tensor description is printed and returned.
    tensor_description = {
        "my_id": 89,  # example ID; ideally this should be generated dynamically
        "og_name": pdf_path,
        "metadata": {},
        "vec_content_text": [],
        "vec_content_img": []
    }

    # Read metadata from the CSV and match the row whose 'Name' equals the PDF's file name.
    # If utf-8 fails, try 'latin1', 'ISO-8859-1', or 'cp1252' (see the reader sketch below).
    with open(metadata_filename, mode='r', encoding='utf-8') as csvfile:
        csv_reader = csv.DictReader(csvfile)
        for row in csv_reader:
            if row["Name"] == os.path.basename(pdf_path):  # assumes 'Name' is a column in the CSV
                tensor_description['metadata'] = row
                break

    # Extract and vectorize the text content.
    text = my_1_reader.extract_text_from_pdf(pdf_path)
    if text:
        tensor_description['vec_content_text'].append(vectorize_data(data=text))

    # Convert the PDF pages to images and vectorize each page.
    # pdf2image.convert_from_path already returns PIL images, which vectorize_image expects.
    images = convert_from_path(pdf_path)
    for img in images:
        img_vector = vectorize_image(data=img)
        tensor_description['vec_content_img'].append(img_vector)

    # Instead of saving the tensor, simply print it as an example.
    print(tensor_description)
    return tensor_description
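

# Hedged helper sketch (not in the original repo): the metadata CSV may not be utf-8,
# hence the 'latin1' / 'ISO-8859-1' / 'cp1252' note above ('latin1' and 'ISO-8859-1'
# are the same codec). This tries a few encodings and returns the rows as dicts; the
# function name is illustrative.
def read_metadata_rows_sketch(metadata_filename, encodings=("utf-8", "latin1", "cp1252")):
    for enc in encodings:
        try:
            with open(metadata_filename, mode='r', encoding=enc, newline='') as csvfile:
                return list(csv.DictReader(csvfile))
        except UnicodeDecodeError:
            continue  # try the next candidate encoding
    raise ValueError(f"none of {encodings} could decode {metadata_filename}")
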
def vectorize_this_pdf_with_metadata_old(pdf_path, output_path, metadata_filename="DS_U3/U3_Metadaten.csv"):
    # Older draft: get the PDF content, split it into chunks, then build the description below.
    tensor_description = {  # sample values
        "my_id": 89,  # count how often the function has been called
        "og_name": pdf_path,
        # metadata: read from metadata_filename; take the full row whose column A matches the file name
        "metadata": {"a": 1, "b": 2, "c": 3},
        # vectorize all text chunks of the PDF via my_1_openai.vectorize_data(data="string"),
        # which returns the Ada-002 vector as a list
        "vec_content_text": [[0.03874, 0.03947, -0.0875], [-0.03234, 0.03437, -0.011234]],
        # for the images, render every PDF page to an image with pdf2image, then call
        # my_1_openai.vectorize_image(data=PIL_OBJ); this function should return an image vector
        # comparable to the text vectors and still needs to be written (see the sketch below)
        "vec_content_img": [[0.01234, 0.09875, -0.0542], [-0.02456, 0.03537, -0.016634]]
    }
    tensor = []  # build the tensor from tensor_description
    return tensor
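

# Hedged sketch of the missing image embedder (not part of this repo): one common option
# is a CLIP model from sentence-transformers, which maps images and short texts into one
# shared space. Note that CLIP vectors are NOT comparable to Ada-002 text vectors, so text
# and images would both have to be embedded with CLIP for cross-modal comparison.
# Assumes `pip install sentence-transformers`; the model choice is illustrative.
def vectorize_image_clip_sketch(pil_image, model_name="clip-ViT-B-32"):
    from sentence_transformers import SentenceTransformer  # local import keeps the dependency optional
    model = SentenceTransformer(model_name)
    return model.encode(pil_image).tolist()  # numpy array -> plain list, like vectorize_data
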
def vectorize_pdfs(pdf_dict):
    """
    Vectorize PDF texts using the OpenAI API.

    Parameters:
    - pdf_dict: dictionary mapping file names to extracted PDF text.

    Returns:
    - dictionary mapping the same keys to their vectors (stringified lists).
    """
    vec_dataset = {}
    for key in pdf_dict.keys():
        try:
            vector = my_1_openai.vectorize_data(pdf_dict[key])
        except openai.error.InvalidRequestError as err:
            # typically raised when the text exceeds the model's context length
            print(err)
            vector = [0, 0, 0]
        vec_dataset[key] = str(vector)
    return vec_dataset
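

# Usage sketch (hypothetical folder; assumes my_1_reader.read_pdfs_from_folder returns
# a {filename: extracted_text} dict, as the calls further down suggest):
#   pdfs = my_1_reader.read_pdfs_from_folder("DS_U3/Dokumente")
#   vectors = vectorize_pdfs(pdfs)  # {filename: "[0.0123, -0.0456, ...]"}
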
def vectorize_csv(csv_table, safe=False):
    # 'safe' == save: when True, the vectors are also written to a text file on disk.
    folder_name = ""
    if safe:
        folder_name = f"{csv_table}_vectorised/"
        if not os.path.exists(folder_name):
            os.makedirs(folder_name)
    nb = 1
    vec_dataset = []
    for data_item in csv_table:
        vector = my_1_openai.vectorize_data(data_item)
        if safe:
            # append, so earlier lines are not overwritten on every iteration
            with open(f"{folder_name}{csv_table}_vec.txt", "a") as f:
                f.write(str(vector) + "\n")
        print("csv_line" + str(nb))
        nb += 1
        vec_dataset.append(str(vector))
    return vec_dataset
def create_df(ds):
    # Build a small dataframe-like dict: PDF names, metadata rows, and extracted texts,
    # plus a vectorized column for each of them.
    my_df, my_new_df = {}, {}
    my_df["name"] = [filename for filename in os.listdir(ds) if filename.endswith('.pdf')]
    my_df["metadata"] = my_1_reader.read_csv_lines_as_strings(ds + "_metadata.csv")[1:11]
    my_df["text"] = list(my_1_reader.read_pdfs_from_folder(ds).values())
    for e in my_df:
        my_new_df[f"{e}_vec"] = [my_1_openai.vectorize_data(item) for item in my_df[e]]
    for e in my_new_df:
        my_df[str(e)] = my_new_df[e]
    for e in my_df:
        # print the third entry of each column as a spot check
        print(f"{e} {my_df[e][2]}")
def create_vec_dataset(folder):
    my_pdfs = my_1_reader.read_pdfs_from_folder(f"{folder}/PDF")
    vectorize_then_safe_data(f"{folder}/vectors/names.json", my_pdfs.keys())
    vectorize_then_safe_data(f"{folder}/vectors/texts.json", my_pdfs.values())


# Vectorize an iterable of strings, then save the {entry: vector} mapping as JSON
# (see the example shape below).
def vectorize_then_safe_data(file_name, data):
    my_vec_words = []
    for entry in data:
        my_vec_words.append(my_1_openai.vectorize_data(entry))
    my_dict = dict(zip(data, my_vec_words))
    my_1_writer.safe_my_dict_as_json(file_name, my_dict)
    print("vectorised data saved")
def main():
    # Example call to the full PDF-with-metadata pipeline.
    pdf_path = 'DS_U3/Dokumente/E - Elektroanlagen/ISB-020-U3-W-E-01-B07005-001-040.pdf'
    output_path = 'DS_U3/Dokumente_vec'
    metadata_filename = 'DS_U3/U3_Metadaten.csv'
    vectorize_this_pdf_with_metadata(pdf_path, output_path, metadata_filename)


if __name__ == "__main__":
    print("this file contains embedding functions")
    # main() demonstrates the full pipeline but is not invoked automatically here.
    # Determinism check: a real embedder returns the same vector for the same input;
    # the random placeholder above will almost always print "different".
    vec1 = vectorize_data("this is the test string")
    vec2 = vectorize_data("this is the test string")
    if vec1 == vec2:
        print("same")
    else:
        print("different")