from multiprocessing import cpu_count
from multiprocessing import Pool
import src.process_image as pi
import pyarrow.parquet as pq
from tqdm import tqdm
import pandas as pd
import argparse
import math
import gc
import os
"""
########################## Script to Pre-process image files by converting them to numpy array and save numpy arrays to hdf5 format #####################
Working on limtted RAM resource, so convert 18k images into numpy array and them to hdf5 always caused "OutofMemory" error. Also convert 18k too long time.
Faced memory issue to load entire the .parquet file as well.
Solution took:
-> Load .parquet file:
Read data from parquet file in batch of N. N batch of records are sent to process.
-> Process image to numpy:
Create Shared Memory based on the batch size and the space need for an image. So shared memory size will be = N * image_size
Used Shared Memory that can now store N image at a time. Any more then that Shared Memory was not created and will give OutOfMemory error.
Image size was too large, so convert image to numpy will also be large.
While experiment found 300 batch size was ideal
We created shared memory only once and after each batch of N we clean up the shared memory to store next batch process data
-> Create pool of process, each process write to shared memory.
-> End of each batch N data we created a .hdf5. So by end of the script we will have (Num_Records/N) .hdf5 files
-> Now we combine each .hdf5 into single .hdf5
"""
###################################################################################################
#Since we do not know how many batches will be generated for any given dataset file, we create a generator to loop through them and show progress.
def generator():
    while True:
        yield
if __name__ == "__main__":
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
#ap.add_argument("-i", "--images", required = True, type = str, help = "base path to directory containing of images")
ap.add_argument("-d", "--proc_data", required = False, type = str, default = '\data\\processed\\test_data.parquet' ,help = "filename with path, of dataset who's images need to be converted to numpy array")
ap.add_argument("-p", "--procs", type = int, default = -1, help = "# of processes to spin up")
#ap.add_argument("-r", "--datarng", type = str, default = '0:100', help = "range of records to process")
ap.add_argument("-b", "--batch_size", type = int, default = 250, help = "chunk/batch size to read data from file")
ap.add_argument("-a", "--action_type", type = int, default = 1, help = "action script will perform. 1-> convert image to numpy and combine files. 2-> combine the files. 3-> convert image to numpy")
args = vars(ap.parse_args())
# determine the number of concurrent processes to launch when
# distributing the load across the system, then create the list
# of process IDs
NUM_WORKERS = args["procs"] if args["procs"] > 0 else cpu_count()
#IMAGE_BASE_FOLDER = args["images"] #data\\images. Since we run in parallel process, We can not init this variable for each process. So defined it global
DATA_TO_PROCESS = args["proc_data"] #data\\processed\\test_data.parquet
BATCH_SIZE = args["batch_size"] #Batch size
ACTION_TYPE = args["action_type"]
shared_memory_created = False
    try:
        if (ACTION_TYPE == 1) or (ACTION_TYPE == 3):
            print(f'[INFO]: Process to read {DATA_TO_PROCESS}, convert images to numpy arrays and store them. Started...')
            # When working with the large valid (1902 records) / train (18k records) sets, allocating the shared memory failed with
            # "[WinError 1450] Insufficient system resources exist to complete the requested service".
            # To overcome the issue we split the record range and then merge the results, i.e. pass/read the data in batches.
            """
            # Code that reads a range from the input argument, then reads the entire dataset and slices that range.
            # In this approach the entire file is loaded first, which takes up memory, and the script then fails to allocate the shared memory space.
            # Method 1:
            RANGE = args["datarng"]
            start = int(RANGE.split(':')[0])
            end = int(RANGE.split(':')[1])
            data = pd.read_parquet(DATA_TO_PROCESS)
            if (start > 0) and (end > 0) and (start < end):
                data = data[start:end]
            """
            #Method 2: Read the data in batches using parquet
            parquet_file = pq.ParquetFile(DATA_TO_PROCESS)
            number_of_records = parquet_file.metadata.num_rows
            start = 0
            end = 0
            number_of_batch = math.ceil(number_of_records / BATCH_SIZE)
            with tqdm(total = number_of_batch) as pbar:
                for i in parquet_file.iter_batches(batch_size = BATCH_SIZE):
                    end += BATCH_SIZE
                    RANGE = str(start) + ':' + str(end)
                    data = i.to_pandas()
                    print(f'[INFO]: Process data range {RANGE} started.')
                    img_shm, img_id_shm = pi.create_shared_memory_nparray(data.shape[0], shared_memory_created)
                    shared_memory_created = True
                    print('[INFO]: Successfully created shared memory resource.')
                    process_args = list(zip(range(0, data.shape[0]), data['id'], data['image_name'], [data.shape[0]] * data.shape[0]))
                    print('[INFO]: Starting Pool process...')
                    with Pool(NUM_WORKERS) as pror_pool:
                        #tqdm with Pool was not helpful here
                        #for _ in tqdm(pror_pool.map(pi.process_images, process_args), total = data.shape[0]):
                        #    pass
                        pror_pool.map(pi.process_images, process_args)
                    print('[INFO]: Started saving data to hdf5 format...')
                    hdf5_filename, filename = os.path.split(DATA_TO_PROCESS)
                    hdf5_filename = os.path.join(hdf5_filename, filename.split('.')[0] + '_' + RANGE.replace(':', '_') + '.h5')
                    pi.save_to_hdf(hdf5_filename, data.shape[0])
                    print(f'[INFO]: Process data range {RANGE} completed.')
                    start = end
                    del [data]
                    pbar.update(1)
            print('[INFO]: Process to convert images to numpy arrays and store them in separate files. Completed.')
        if (ACTION_TYPE == 1) or (ACTION_TYPE == 2):
            print('[INFO]: Combine multiple hdf5 files into one started...')
            path, name = os.path.split(DATA_TO_PROCESS)
            name = name.split('.')[0]
            pi.combine_multiple_hdf(name, path)
    except Exception as e:
        print(f'Error occurred: {e}')
    finally:
        if shared_memory_created:
            pi.release_shared_memory()
        gc.collect()
        print('[INFO]: Script execution completed.')
###################################################################################################
#Sample commands
#python preprocess_images.py -d data\processed\test_data.parquet
#python preprocess_images.py -d data\processed\validate_data.parquet -r 0:100   (only works if the commented-out --datarng argument is re-enabled)
#python preprocess_images.py -d data\processed\validate_data.parquet
#python preprocess_images.py -d data\processed\validate_data.parquet -b 300
#python preprocess_images.py -d data\processed\train_data.parquet -b 300 -a 2
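# --------------------------------------------------------------------------------------------------
# Illustrative sketch only (the real implementation lives in src.process_image.combine_multiple_hdf):
# one way the per-batch .h5 files produced above could be merged into a single file with h5py. The
# dataset keys 'images' and 'ids' are assumptions made purely for this example, and the file ordering
# is kept simple for the sketch. This helper is never called by the script.
# --------------------------------------------------------------------------------------------------
def _example_combine_hdf(name, path):
    import glob
    import h5py
    import numpy as np
    # Collect the per-batch files produced above, e.g. test_data_0_250.h5, test_data_250_500.h5, ...
    part_files = sorted(glob.glob(os.path.join(path, name + '_*.h5')))
    with h5py.File(os.path.join(path, name + '.h5'), 'w') as out:
        images, ids = [], []
        for part in part_files:
            with h5py.File(part, 'r') as f:
                images.append(f['images'][:])
                ids.append(f['ids'][:])
        # Write the concatenated arrays into one combined .h5 file.
        out.create_dataset('images', data = np.concatenate(images))
        out.create_dataset('ids', data = np.concatenate(ids))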