# StdGEN/blender/distributed_uniform_lrm.py
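#
# Distributed Blender rendering driver: spawns one worker process per
# (GPU, worker-slot) pair and feeds each a queue of VRM avatars to render
# with blender_lrm_script.py.
#
# Example invocation (illustrative paths; the exact flag spelling depends on
# the installed tyro version, which may hyphenate field names):
#   python distributed_uniform_lrm.py --workers_per_gpu 2 --num_gpus 8 \
#       --input_dir /data/vrm --save_dir /data/renders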
import json
import multiprocessing
import subprocess
import time
from dataclasses import dataclass
import os
import tyro
import concurrent.futures
@dataclass
class Args:
    # fields without defaults must precede fields with defaults in a dataclass
    workers_per_gpu: int
    """number of workers per GPU"""
    input_dir: str
    """root directory of the VRM dataset"""
    save_dir: str
    """root directory for rendered outputs"""
    num_gpus: int = 8
    """number of GPUs to use; -1 means all available GPUs"""
    engine: str = "BLENDER_EEVEE"
    """Blender render engine passed to the render script"""
def check_already_rendered(save_path):
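    """Treat the presence of 02419_semantic.png in save_path as evidence of a completed render."""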
if not os.path.exists(os.path.join(save_path, '02419_semantic.png')):
return False
return True
def process_file(file):
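    """Return the [data_path, save_path] pair if it still needs rendering, otherwise None."""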
if not check_already_rendered(file[1]):
return file
return None
def worker(queue, count, gpu):
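    """Pull (data_path, save_path, engine, log_name) jobs from the queue and render
    each on the assigned GPU until a None sentinel arrives."""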
while True:
try:
item = queue.get()
if item is None:
queue.task_done()
break
data_path, save_path, engine, log_name = item
print(f"Processing: {data_path} on GPU {gpu}")
start = time.time()
if check_already_rendered(save_path):
queue.task_done()
print('========', item, 'rendered', '========')
continue
else:
os.makedirs(save_path, exist_ok=True)
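            # Build the Blender command pinned to one GPU: DISPLAY=:0.<gpu> targets that
            # GPU's X screen (for EEVEE's OpenGL context) and CUDA_VISIBLE_DEVICES restricts
            # CUDA work to the same device.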
command = (f"export DISPLAY=:0.{gpu} &&"
f" CUDA_VISIBLE_DEVICES={gpu} "
f" blender -b -P blender_lrm_script.py --"
f" --object_path {data_path} --output_dir {save_path} --engine {engine}")
try:
subprocess.run(command, shell=True, timeout=3600, check=True)
                with count.get_lock():  # += on a shared Value is not atomic; take its lock
                    count.value += 1
end = time.time()
with open(log_name, 'a') as f:
f.write(f'{end - start}\n')
except subprocess.CalledProcessError as e:
print(f"Subprocess error processing {item}: {e}")
except subprocess.TimeoutExpired as e:
print(f"Timeout expired processing {item}: {e}")
except Exception as e:
print(f"Error processing {item}: {e}")
finally:
queue.task_done()
except Exception as e:
print(f"Error processing {item}: {e}")
queue.task_done()
if __name__ == "__main__":
args = tyro.cli(Args)
queue = multiprocessing.JoinableQueue()
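    # shared counter of successfully rendered objects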
count = multiprocessing.Value("i", 0)
log_name = f'time_log_{args.workers_per_gpu}_{args.num_gpus}_{args.engine}.txt'
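    # auto-detect the number of GPUs via nvidia-smi when num_gpus == -1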
if args.num_gpus == -1:
result = subprocess.run(['nvidia-smi', '--list-gpus'], stdout=subprocess.PIPE)
output = result.stdout.decode('utf-8')
args.num_gpus = output.count('GPU')
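    # expected dataset layout: {input_dir}/<0-9>/<id>/<id>.vrm, rendered to {save_dir}/<0-9>/<id>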
files = []
for group in [ str(i) for i in range(10) ]:
for folder in os.listdir(f'{args.input_dir}/{group}'):
filename = f'{args.input_dir}/{group}/{folder}/{folder}.vrm'
outputdir = f'{args.save_dir}/{group}/{folder}'
files.append([filename, outputdir])
    # sort the file list for a deterministic processing order
files = sorted(files, key=lambda x: x[0])
    # Check in parallel (thread pool) which objects still need rendering
with concurrent.futures.ThreadPoolExecutor() as executor:
# Map the process_file function to the files
results = list(executor.map(process_file, files))
# Filter out None values from the results
unprocess_files = [file for file in results if file is not None]
    # Report how many files still need rendering
print(f'Unprocessed files: {len(unprocess_files)}')
# Start worker processes on each of the GPUs
for gpu_i in range(args.num_gpus):
        for _ in range(args.workers_per_gpu):
process = multiprocessing.Process(
target=worker, args=(queue, count, gpu_i)
)
process.daemon = True
process.start()
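    # enqueue one render job per remaining file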
for file in unprocess_files:
queue.put((file[0], file[1], args.engine, log_name))
    # Add exactly one sentinel per worker to stop the worker processes;
    # extra sentinels would never get task_done() and queue.join() would block forever
    for _ in range(args.num_gpus * args.workers_per_gpu):
        queue.put(None)
# Wait for all tasks to be completed
queue.join()
    print(f'Finished: {count.value} objects rendered')