# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
# --------------------------------------------------------
# A script to run multinode training with submitit.
# --------------------------------------------------------
import argparse
import os
import submitit
from omegaconf import OmegaConf
from paintmind.engine.util import instantiate_from_config
from paintmind.utils.device_utils import configure_compute_backend


def parse_args():
    parser = argparse.ArgumentParser("Submitit for accelerator training")
    parser.add_argument("--ngpus", default=8, type=int, help="Number of GPUs to request on each node")
    parser.add_argument("--nodes", default=2, type=int, help="Number of nodes to request")
    parser.add_argument("--timeout", default=7000, type=int, help="Duration of the job in minutes (default is roughly 5 days)")
    parser.add_argument("--qos", default="normal", type=str, help="QOS to request")
    parser.add_argument("--job_dir", default="", type=str, help="Job dir. Leave empty for automatic.")
    parser.add_argument("--partition", default="h100-camera-train", type=str, help="Partition where to submit")
    parser.add_argument("--exclude", default="", type=str, help="Nodes to exclude from the partition")
    parser.add_argument("--nodelist", default="", type=str, help="Nodelist to request")
    parser.add_argument('--comment', default="", type=str, help="Comment to pass to the scheduler")
    parser.add_argument('--cfg', type=str, default='configs/dit_imagenet_400ep.yaml', help='Path to the trainer/accelerator config file')
    return parser.parse_args()


class Trainer(object):
    """Callable wrapper submitted to submitit; each SLURM task runs __call__."""

    def __init__(self, args, config):
        self.args = args
        self.config = config

    def __call__(self):
        self._setup_gpu_args()
        configure_compute_backend()
        trainer = instantiate_from_config(self.config.trainer)
        trainer.train(self.config)

    def checkpoint(self):
        # Called by submitit on preemption or timeout: point the config at the
        # latest saved checkpoint and requeue the job.
        import os
        import submitit

        model_dir = os.path.join(self.args.output_dir, "models")
        if os.path.exists(model_dir):
            # Get all step folders
            step_folders = [d for d in os.listdir(model_dir) if d.startswith("step")]
            if step_folders:
                # Extract step numbers and find max
                steps = [int(f.replace("step", "")) for f in step_folders]
                max_step = max(steps)
                # Set ckpt path to the latest step folder
                self.config.trainer.params.model.params.ckpt_path = os.path.join(model_dir, f"step{max_step}")
        print("Requeuing ", self.args, self.config)
        empty_trainer = type(self)(self.args, self.config)
        return submitit.helpers.DelayedSubmission(empty_trainer)

    def _setup_gpu_args(self):
        import submitit

        # print_env()
        print("exporting PyTorch distributed environment variables")
        dist_env = submitit.helpers.TorchDistributedEnvironment().export(set_cuda_visible_devices=False)
        print(f"master: {dist_env.master_addr}:{dist_env.master_port}")
        print(f"rank: {dist_env.rank}")
        print(f"world size: {dist_env.world_size}")
        print(f"local rank: {dist_env.local_rank}")
        print(f"local world size: {dist_env.local_world_size}")
        # print_env()

        # os.environ["NCCL_DEBUG"] = "INFO"
        os.environ["NCCL_P2P_DISABLE"] = "0"
        os.environ["NCCL_IB_DISABLE"] = "0"

        job_env = submitit.JobEnvironment()
        # Resolve the %j placeholder to the actual SLURM job id
        self.args.output_dir = str(self.args.output_dir).replace("%j", str(job_env.job_id))
        self.args.log_dir = self.args.output_dir
        self.config.trainer.params.result_folder = self.args.output_dir
        self.config.trainer.params.log_dir = os.path.join(self.args.output_dir, "logs")
        # self.args.gpu = job_env.local_rank
        # self.args.rank = job_env.global_rank
        # self.args.world_size = job_env.num_tasks
        print(f"Process group: {job_env.num_tasks} tasks, rank: {job_env.global_rank}")


def main():
    args = parse_args()
    cfg_file = args.cfg
    assert os.path.exists(cfg_file)
    config = OmegaConf.load(cfg_file)

    if config.trainer.params.result_folder is None:
        if args.job_dir == "":
            args.job_dir = "./output/%j"
        config.trainer.params.result_folder = args.job_dir
        config.trainer.params.log_dir = os.path.join(args.job_dir, "logs")
    else:
        args.job_dir = config.trainer.params.result_folder

    # Note that the folder will depend on the job_id, to easily track experiments
    executor = submitit.AutoExecutor(folder=args.job_dir, slurm_max_num_timeout=30)

    num_gpus_per_node = args.ngpus
    nodes = args.nodes
    timeout_min = args.timeout
    qos = args.qos
    partition = args.partition

    kwargs = {}
    if args.comment:
        kwargs['slurm_comment'] = args.comment
    if args.exclude:
        kwargs["slurm_exclude"] = args.exclude
    if args.nodelist:
        kwargs["slurm_nodelist"] = args.nodelist

    executor.update_parameters(
        mem_gb=40 * num_gpus_per_node,
        gpus_per_node=num_gpus_per_node,
        tasks_per_node=num_gpus_per_node,  # one task per GPU
        # cpus_per_task=16,
        nodes=nodes,
        timeout_min=timeout_min,  # max is 60 * 72
        # Below are cluster dependent parameters
        slurm_partition=partition,
        slurm_signal_delay_s=120,
        slurm_qos=qos,
        **kwargs
    )
    executor.update_parameters(name="sar")

    args.output_dir = args.job_dir
    trainer = Trainer(args, config)
    job = executor.submit(trainer)
    print("Submitted job_id:", job.job_id)


if __name__ == "__main__":
    main()
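
# Example launch (a sketch only; the partition, QOS, and config path below are
# simply this script's argparse defaults and will likely need to be adapted to
# your own SLURM cluster and experiment config):
#
#   python submitit_train.py --nodes 2 --ngpus 8 \
#       --partition h100-camera-train --qos normal \
#       --cfg configs/dit_imagenet_400ep.yaml \
#       --job_dir ./output/%j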