|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import argparse |
|
import os |
|
import submitit |
|
|
|
from omegaconf import OmegaConf |
|
from paintmind.engine.util import instantiate_from_config |
|
from paintmind.utils.device_utils import configure_compute_backend |
|
|
|
def parse_args():
    """Parse command-line options for the submitit SLURM launcher.

    Returns:
        argparse.Namespace with cluster-resource options (``ngpus``, ``nodes``,
        ``timeout``, ``qos``, ``partition``, ``exclude``, ``nodelist``,
        ``comment``), the output ``job_dir``, and the trainer config path
        (``cfg``).
    """
    parser = argparse.ArgumentParser("Submitit for accelerator training")

    # Cluster resource requests.
    parser.add_argument("--ngpus", default=8, type=int, help="Number of gpus to request on each node")
    parser.add_argument("--nodes", default=2, type=int, help="Number of nodes to request")
    # NOTE: the default of 7000 minutes is just under 5 days (5 days = 7200);
    # the previous help text ("default 5 days") overstated it.
    parser.add_argument("--timeout", default=7000, type=int, help="Duration of the job in minutes, default just under 5 days")
    parser.add_argument("--qos", default="normal", type=str, help="QOS to request")
    parser.add_argument("--job_dir", default="", type=str, help="Job dir. Leave empty for automatic.")

    # SLURM placement options.
    parser.add_argument("--partition", default="h100-camera-train", type=str, help="Partition where to submit")
    parser.add_argument("--exclude", default="", type=str, help="Exclude nodes from the partition")
    parser.add_argument("--nodelist", default="", type=str, help="Nodelist to request")
    parser.add_argument('--comment', default="", type=str, help="Comment to pass to scheduler")

    # Trainer configuration file (OmegaConf YAML).
    parser.add_argument('--cfg', type=str, default='configs/dit_imagenet_400ep.yaml', help='accelerator configs')
    return parser.parse_args()
|
|
|
|
|
class Trainer(object):
    """Picklable callable submitted to SLURM via submitit.

    Wraps the config-driven trainer so that submitit can (a) run it on every
    task of the job (``__call__``) and (b) requeue it on preemption/timeout
    (``checkpoint``), resuming from the newest ``step*`` checkpoint folder.
    """

    def __init__(self, args, config):
        # args: argparse.Namespace from parse_args(); config: OmegaConf tree.
        self.args = args
        self.config = config

    def __call__(self):
        """Run on each SLURM task: set up distributed env, then train."""
        self._setup_gpu_args()
        configure_compute_backend()
        trainer = instantiate_from_config(self.config.trainer)
        trainer.train(self.config)

    def checkpoint(self):
        """Requeue hook invoked by submitit on preemption/timeout.

        Points the config's ``ckpt_path`` at the highest-numbered
        ``step<N>`` folder under ``<output_dir>/models`` (if any) and
        returns a DelayedSubmission of a fresh Trainer so the job resumes
        from that checkpoint.
        """
        # Local imports keep the pickled callable self-contained on workers.
        import os
        import submitit

        model_dir = os.path.join(self.args.output_dir, "models")
        if os.path.exists(model_dir):
            step_folders = [d for d in os.listdir(model_dir) if d.startswith("step")]
            # Robustness fix: only parse folders whose suffix is a plain
            # integer (e.g. "step1000"); previously int() crashed the requeue
            # hook on names like "step_final".
            steps = [int(d[4:]) for d in step_folders if d[4:].isdigit()]
            if steps:
                max_step = max(steps)
                # Resume training from the latest checkpoint folder.
                self.config.trainer.params.model.params.ckpt_path = os.path.join(model_dir, f"step{max_step}")
        print("Requeuing ", self.args, self.config)
        empty_trainer = type(self)(self.args, self.config)
        return submitit.helpers.DelayedSubmission(empty_trainer)

    def _setup_gpu_args(self):
        """Export torch.distributed env vars and resolve per-job paths."""
        import submitit

        print("exporting PyTorch distributed environment variables")
        # Populates MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE/... from SLURM.
        dist_env = submitit.helpers.TorchDistributedEnvironment().export(set_cuda_visible_devices=False)
        print(f"master: {dist_env.master_addr}:{dist_env.master_port}")
        print(f"rank: {dist_env.rank}")
        print(f"world size: {dist_env.world_size}")
        print(f"local rank: {dist_env.local_rank}")
        print(f"local world size: {dist_env.local_world_size}")

        # Explicitly enable NCCL peer-to-peer and InfiniBand transports.
        os.environ["NCCL_P2P_DISABLE"] = "0"
        os.environ["NCCL_IB_DISABLE"] = "0"

        job_env = submitit.JobEnvironment()
        # Substitute the actual job id into "%j" placeholders in the paths.
        self.args.output_dir = str(self.args.output_dir).replace("%j", str(job_env.job_id))
        self.args.log_dir = self.args.output_dir
        self.config.trainer.params.result_folder = self.args.output_dir
        self.config.trainer.params.log_dir = os.path.join(self.args.output_dir, "logs")

        print(f"Process group: {job_env.num_tasks} tasks, rank: {job_env.global_rank}")
|
|
|
|
|
def main():
    """Entry point: load the config, build a submitit executor, submit the job.

    Resolves the job/result folder (CLI ``--job_dir`` vs. the config's
    ``result_folder``), configures SLURM resources from the parsed args,
    and submits a Trainer callable. Prints the submitted job id.

    Raises:
        FileNotFoundError: if the ``--cfg`` file does not exist.
    """
    args = parse_args()
    cfg_file = args.cfg
    # Explicit raise instead of `assert`: asserts are stripped under `python -O`,
    # which would let a bad path fail later with a confusing OmegaConf error.
    if not os.path.exists(cfg_file):
        raise FileNotFoundError(f"Config file not found: {cfg_file}")
    config = OmegaConf.load(cfg_file)

    # The config's result_folder wins when set; otherwise fall back to
    # --job_dir (or an automatic "%j"-templated default).
    if config.trainer.params.result_folder is None:
        if args.job_dir == "":
            args.job_dir = "./output/%j"
        config.trainer.params.result_folder = args.job_dir
        config.trainer.params.log_dir = os.path.join(args.job_dir, "logs")
    else:
        args.job_dir = config.trainer.params.result_folder

    # slurm_max_num_timeout allows repeated requeues via Trainer.checkpoint().
    executor = submitit.AutoExecutor(folder=args.job_dir, slurm_max_num_timeout=30)

    # Optional SLURM placement parameters, only passed when non-empty.
    kwargs = {}
    if args.comment:
        kwargs['slurm_comment'] = args.comment
    if args.exclude:
        kwargs["slurm_exclude"] = args.exclude
    if args.nodelist:
        kwargs["slurm_nodelist"] = args.nodelist

    # Single update_parameters call (previously split across two calls).
    executor.update_parameters(
        mem_gb=40 * args.ngpus,        # 40 GB of host RAM per requested GPU
        gpus_per_node=args.ngpus,
        tasks_per_node=args.ngpus,     # one process per GPU
        nodes=args.nodes,
        timeout_min=args.timeout,
        slurm_partition=args.partition,
        slurm_signal_delay_s=120,      # grace period before timeout for checkpoint()
        slurm_qos=args.qos,
        name="sar",
        **kwargs,
    )

    args.output_dir = args.job_dir

    trainer = Trainer(args, config)
    job = executor.submit(trainer)

    print("Submitted job_id:", job.job_id)
|
|
|
|
|
# Script entry point: parse CLI args and submit the training job to SLURM.
if __name__ == "__main__":

    main()
|
|