import json import multiprocessing import subprocess import time from dataclasses import dataclass import os import tyro import concurrent.futures @dataclass class Args: workers_per_gpu: int """number of workers per gpu""" num_gpus: int = 8 """number of gpus to use. -1 means all available gpus""" input_dir: str save_dir: str engine: str = "BLENDER_EEVEE" def check_already_rendered(save_path): if not os.path.exists(os.path.join(save_path, '02419_semantic.png')): return False return True def process_file(file): if not check_already_rendered(file[1]): return file return None def worker(queue, count, gpu): while True: try: item = queue.get() if item is None: queue.task_done() break data_path, save_path, engine, log_name = item print(f"Processing: {data_path} on GPU {gpu}") start = time.time() if check_already_rendered(save_path): queue.task_done() print('========', item, 'rendered', '========') continue else: os.makedirs(save_path, exist_ok=True) command = (f"export DISPLAY=:0.{gpu} &&" f" CUDA_VISIBLE_DEVICES={gpu} " f" blender -b -P --" f" --object_path {data_path} --output_dir {save_path} --engine {engine}") try:, shell=True, timeout=3600, check=True) count.value += 1 end = time.time() with open(log_name, 'a') as f: f.write(f'{end - start}\n') except subprocess.CalledProcessError as e: print(f"Subprocess error processing {item}: {e}") except subprocess.TimeoutExpired as e: print(f"Timeout expired processing {item}: {e}") except Exception as e: print(f"Error processing {item}: {e}") finally: queue.task_done() except Exception as e: print(f"Error processing {item}: {e}") queue.task_done() if __name__ == "__main__": args = tyro.cli(Args) queue = multiprocessing.JoinableQueue() count = multiprocessing.Value("i", 0) log_name = f'time_log_{args.workers_per_gpu}_{args.num_gpus}_{args.engine}.txt' if args.num_gpus == -1: result =['nvidia-smi', '--list-gpus'], stdout=subprocess.PIPE) output = result.stdout.decode('utf-8') args.num_gpus = output.count('GPU') files = [] for group in [ str(i) for i in range(10) ]: for folder in os.listdir(f'{args.input_dir}/{group}'): filename = f'{args.input_dir}/{group}/{folder}/{folder}.vrm' outputdir = f'{args.save_dir}/{group}/{folder}' files.append([filename, outputdir]) # sorted the files files = sorted(files, key=lambda x: x[0]) # Use ThreadPoolExecutor for parallel processing with concurrent.futures.ThreadPoolExecutor() as executor: # Map the process_file function to the files results = list(, files)) # Filter out None values from the results unprocess_files = [file for file in results if file is not None] # Print the number of unprocessed files and the split ID print(f'Unprocessed files: {len(unprocess_files)}') # Start worker processes on each of the GPUs for gpu_i in range(args.num_gpus): for worker_i in range(args.workers_per_gpu): worker_i = gpu_i * args.workers_per_gpu + worker_i process = multiprocessing.Process( target=worker, args=(queue, count, gpu_i) ) process.daemon = True process.start() for file in unprocess_files: queue.put((file[0], file[1], args.engine, log_name)) # Add sentinels to the queue to stop the worker processes for i in range(args.num_gpus * args.workers_per_gpu * 10): queue.put(None) # Wait for all tasks to be completed queue.join() end = time.time()