# Multi-GPU Blender rendering launcher.
# (Header salvaged from a scraped Hugging Face Spaces page — the Space ran on
# L40S GPUs; the original page chrome was not valid Python.)
import concurrent.futures
import json
import multiprocessing
import os
import subprocess
import time
from dataclasses import dataclass

import tyro
@dataclass
class Args:
    """CLI arguments for the multi-GPU Blender rendering launcher.

    The ``@dataclass`` decorator is required for ``tyro.cli(Args)`` to build a
    constructor from these fields; fields with defaults must come last.  The
    bare-string field docstrings are consumed by tyro as CLI ``--help`` text.
    """

    workers_per_gpu: int
    """number of workers per gpu"""
    input_dir: str
    """root directory holding the .vrm assets, grouped into folders 0-9"""
    save_dir: str
    """root directory where render outputs are written"""
    num_gpus: int = 8
    """number of gpus to use. -1 means all available gpus"""
    engine: str = "BLENDER_EEVEE"
    """Blender render engine passed through to the render script"""
def check_already_rendered(save_path):
    """Return True when the render output in *save_path* is already complete.

    NOTE(review): presence of '02419_semantic.png' is treated as the
    completion marker — presumably the last file the render script writes;
    confirm against blender_lrm_script.py.
    """
    marker = os.path.join(save_path, '02419_semantic.png')
    return os.path.exists(marker)
def process_file(file):
    """Return *file* ([data_path, save_path]) if it still needs rendering.

    Returns None when the save directory already holds a finished render,
    so callers can filter completed jobs out of the work list.
    """
    _, save_path = file
    return None if check_already_rendered(save_path) else file
def worker(queue, count, gpu):
    """Render queue items with Blender on GPU *gpu* until a None sentinel.

    Each item is a tuple ``(data_path, save_path, engine, log_name)``.
    *count* is a shared ``multiprocessing.Value('i')`` of successful renders.

    Fixes vs. the original:
      * ``count.value += 1`` is not atomic on a shared Value — now guarded
        with ``count.get_lock()``.
      * every dequeued item is acknowledged with exactly one ``task_done()``
        (the original's outer ``except`` could call it a second time after
        the inner ``finally`` already had, raising ValueError).
    """
    while True:
        item = queue.get()
        if item is None:
            # Sentinel: acknowledge it and stop this worker.
            queue.task_done()
            break
        try:
            data_path, save_path, engine, log_name = item
            print(f"Processing: {data_path} on GPU {gpu}")
            start = time.time()
            if check_already_rendered(save_path):
                # Another worker (or a previous run) already finished this one.
                print('========', item, 'rendered', '========')
                continue
            os.makedirs(save_path, exist_ok=True)
            # DISPLAY picks the X screen bound to this GPU; CUDA_VISIBLE_DEVICES
            # pins Blender's CUDA work to the same card.
            command = (f"export DISPLAY=:0.{gpu} &&"
                       f" CUDA_VISIBLE_DEVICES={gpu} "
                       f" blender -b -P blender_lrm_script.py --"
                       f" --object_path {data_path} --output_dir {save_path} --engine {engine}")
            try:
                subprocess.run(command, shell=True, timeout=3600, check=True)
                # Shared counter: += is read-modify-write, so take the lock.
                with count.get_lock():
                    count.value += 1
                with open(log_name, 'a') as f:
                    f.write(f'{time.time() - start}\n')
            except subprocess.CalledProcessError as e:
                print(f"Subprocess error processing {item}: {e}")
            except subprocess.TimeoutExpired as e:
                print(f"Timeout expired processing {item}: {e}")
        except Exception as e:
            # Best-effort worker: log and move on to the next item.
            print(f"Error processing {item}: {e}")
        finally:
            # Exactly one acknowledgement per dequeued item.
            queue.task_done()
if __name__ == "__main__":
    args = tyro.cli(Args)

    queue = multiprocessing.JoinableQueue()
    count = multiprocessing.Value("i", 0)
    log_name = f'time_log_{args.workers_per_gpu}_{args.num_gpus}_{args.engine}.txt'

    # -1 means "use every GPU that nvidia-smi reports".
    if args.num_gpus == -1:
        result = subprocess.run(['nvidia-smi', '--list-gpus'], stdout=subprocess.PIPE)
        args.num_gpus = result.stdout.decode('utf-8').count('GPU')

    # Assets live at <input_dir>/<group>/<folder>/<folder>.vrm, groups '0'..'9'.
    files = []
    for group in (str(i) for i in range(10)):
        for folder in os.listdir(f'{args.input_dir}/{group}'):
            filename = f'{args.input_dir}/{group}/{folder}/{folder}.vrm'
            outputdir = f'{args.save_dir}/{group}/{folder}'
            files.append([filename, outputdir])
    # Deterministic processing order.
    files = sorted(files, key=lambda x: x[0])

    # Filter out already-finished jobs in parallel (cheap os.path checks).
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(process_file, files)
        unprocess_files = [file for file in results if file is not None]
    print(f'Unprocessed files: {len(unprocess_files)}')

    start = time.time()
    # One daemon worker process per (gpu, slot); daemons die with the parent.
    num_workers = args.num_gpus * args.workers_per_gpu
    for gpu_i in range(args.num_gpus):
        for _ in range(args.workers_per_gpu):
            process = multiprocessing.Process(
                target=worker, args=(queue, count, gpu_i)
            )
            process.daemon = True
            process.start()

    for file in unprocess_files:
        queue.put((file[0], file[1], args.engine, log_name))

    # Exactly one sentinel per worker: each worker consumes a single sentinel
    # before exiting, so any surplus (the original enqueued 10x) is never
    # task_done()-ed and queue.join() would block forever.
    for _ in range(num_workers):
        queue.put(None)

    # Wait for every item and every sentinel to be acknowledged.
    queue.join()
    elapsed = time.time() - start
    print(f'Rendered {count.value} files in {elapsed:.1f}s')