import datetime import json import os import os.path as osp import pickle import random import sys import time from collections import namedtuple import __main__ as main import dateutil.tz import numpy as np from rlkit.core import logger from rlkit.launchers import conf from rlkit.torch.pytorch_util import set_gpu_mode import rlkit.pythonplusplus as ppp GitInfo = namedtuple( 'GitInfo', [ 'directory', 'code_diff', 'code_diff_staged', 'commit_hash', 'branch_name', ], ) def get_git_infos(dirs): try: import git git_infos = [] for directory in dirs: # Idk how to query these things, so I'm just doing try-catch try: repo = git.Repo(directory) try: branch_name = repo.active_branch.name except TypeError: branch_name = '[DETACHED]' git_infos.append(GitInfo( directory=directory, code_diff=repo.git.diff(None), code_diff_staged=repo.git.diff('--staged'), commit_hash=repo.head.commit.hexsha, branch_name=branch_name, )) except git.exc.InvalidGitRepositoryError as e: print("Not a valid git repo: {}".format(directory)) except ImportError: git_infos = None return git_infos def recursive_items(dictionary): """ Get all (key, item) recursively in a potentially recursive dictionary. Usage: ``` x = { 'foo' : { 'bar' : 5 } } recursive_items(x) # output: # ('foo', {'bar' : 5}) # ('bar', 5) ``` :param dictionary: :return: """ for key, value in dictionary.items(): yield key, value if type(value) is dict: yield from recursive_items(value) def save_experiment_data(dictionary, log_dir): with open(log_dir + '/experiment.pkl', 'wb') as handle: pickle.dump(dictionary, handle, protocol=pickle.HIGHEST_PROTOCOL) def run_experiment_here( experiment_function, variant=None, exp_id=0, seed=None, use_gpu=True, # Logger params: exp_prefix="default", snapshot_mode='last', snapshot_gap=1, git_infos=None, script_name=None, base_log_dir=None, force_randomize_seed=False, log_dir=None, **setup_logger_kwargs ): """ Run an experiment locally without any serialization. :param experiment_function: Function. `variant` will be passed in as its only argument. :param exp_prefix: Experiment prefix for the save file. :param variant: Dictionary passed in to `experiment_function`. :param exp_id: Experiment ID. Should be unique across all experiments. Note that one experiment may correspond to multiple seeds,. :param seed: Seed used for this experiment. :param use_gpu: Run with GPU. By default False. :param script_name: Name of the running script :param log_dir: If set, set the log directory to this. Otherwise, the directory will be auto-generated based on the exp_prefix. :return: """ if variant is None: variant = {} variant['exp_id'] = str(exp_id) if force_randomize_seed or seed is None: seed = random.randint(0, 100000) variant['seed'] = str(seed) reset_execution_environment() actual_log_dir = setup_logger( exp_prefix=exp_prefix, variant=variant, exp_id=exp_id, seed=seed, snapshot_mode=snapshot_mode, snapshot_gap=snapshot_gap, base_log_dir=base_log_dir, log_dir=log_dir, git_infos=git_infos, script_name=script_name, **setup_logger_kwargs ) set_seed(seed) set_gpu_mode(use_gpu) run_experiment_here_kwargs = dict( variant=variant, exp_id=exp_id, seed=seed, use_gpu=use_gpu, exp_prefix=exp_prefix, snapshot_mode=snapshot_mode, snapshot_gap=snapshot_gap, git_infos=git_infos, script_name=script_name, base_log_dir=base_log_dir, **setup_logger_kwargs ) save_experiment_data( dict( run_experiment_here_kwargs=run_experiment_here_kwargs ), actual_log_dir ) return experiment_function(variant) def create_exp_name(exp_prefix, exp_id=0, seed=0): """ Create a semi-unique experiment name that has a timestamp :param exp_prefix: :param exp_id: :return: """ now = datetime.datetime.now(dateutil.tz.tzlocal()) timestamp = now.strftime('%Y_%m_%d_%H_%M_%S') return "%s_%s_%04d--s-%d" % (exp_prefix, timestamp, exp_id, seed) def create_log_dir( exp_prefix, exp_id=0, seed=0, base_log_dir=None, include_exp_prefix_sub_dir=True, ): """ Creates and returns a unique log directory. :param exp_prefix: All experiments with this prefix will have log directories be under this directory. :param exp_id: The number of the specific experiment run within this experiment. :param base_log_dir: The directory where all log should be saved. :return: """ exp_name = create_exp_name(exp_prefix, exp_id=exp_id, seed=seed) if base_log_dir is None: base_log_dir = conf.LOCAL_LOG_DIR if include_exp_prefix_sub_dir: log_dir = osp.join(base_log_dir, exp_prefix.replace("_", "-"), exp_name) else: log_dir = osp.join(base_log_dir, exp_name) if osp.exists(log_dir): print("WARNING: Log directory already exists {}".format(log_dir)) os.makedirs(log_dir, exist_ok=True) return log_dir def setup_logger( exp_prefix="default", variant=None, text_log_file="debug.log", variant_log_file="variant.json", tabular_log_file="progress.csv", snapshot_mode="last", snapshot_gap=1, log_tabular_only=False, log_dir=None, git_infos=None, script_name=None, **create_log_dir_kwargs ): """ Set up logger to have some reasonable default settings. Will save log output to based_log_dir/exp_prefix/exp_name. exp_name will be auto-generated to be unique. If log_dir is specified, then that directory is used as the output dir. :param exp_prefix: The sub-directory for this specific experiment. :param variant: :param text_log_file: :param variant_log_file: :param tabular_log_file: :param snapshot_mode: :param log_tabular_only: :param snapshot_gap: :param log_dir: :param git_infos: :param script_name: If set, save the script name to this. :return: """ if git_infos is None: git_infos = get_git_infos(conf.CODE_DIRS_TO_MOUNT) first_time = log_dir is None if first_time: log_dir = create_log_dir(exp_prefix, **create_log_dir_kwargs) if variant is not None: logger.log("Variant:") logger.log(json.dumps(dict_to_safe_json(variant), indent=2)) variant_log_path = osp.join(log_dir, variant_log_file) logger.log_variant(variant_log_path, variant) tabular_log_path = osp.join(log_dir, tabular_log_file) text_log_path = osp.join(log_dir, text_log_file) logger.add_text_output(text_log_path) if first_time: logger.add_tabular_output(tabular_log_path) else: logger._add_output(tabular_log_path, logger._tabular_outputs, logger._tabular_fds, mode='a') for tabular_fd in logger._tabular_fds: logger._tabular_header_written.add(tabular_fd) logger.set_snapshot_dir(log_dir) logger.set_snapshot_mode(snapshot_mode) logger.set_snapshot_gap(snapshot_gap) logger.set_log_tabular_only(log_tabular_only) exp_name = log_dir.split("/")[-1] logger.push_prefix("[%s] " % exp_name) if git_infos is not None: for ( directory, code_diff, code_diff_staged, commit_hash, branch_name ) in git_infos: if directory[-1] == '/': directory = directory[:-1] diff_file_name = directory[1:].replace("/", "-") + ".patch" diff_staged_file_name = ( directory[1:].replace("/", "-") + "_staged.patch" ) if code_diff is not None and len(code_diff) > 0: with open(osp.join(log_dir, diff_file_name), "w") as f: f.write(code_diff + '\n') if code_diff_staged is not None and len(code_diff_staged) > 0: with open(osp.join(log_dir, diff_staged_file_name), "w") as f: f.write(code_diff_staged + '\n') with open(osp.join(log_dir, "git_infos.txt"), "a") as f: f.write("directory: {}\n".format(directory)) f.write("git hash: {}\n".format(commit_hash)) f.write("git branch name: {}\n\n".format(branch_name)) if script_name is not None: with open(osp.join(log_dir, "script_name.txt"), "w") as f: f.write(script_name) return log_dir def create_exp_name_custom(exp_prefix, batch=256, num_ensemble=1, num_layer=2, seed=0, confthres=0, ber_mean=-1): """ Create a semi-unique experiment name that has a timestamp :param exp_prefix: :param exp_id: :return: """ if 'conf' in exp_prefix: if 'boot' in exp_prefix: return "%s_en_%d_ber_%.1f_conf_%.1f_batch_%d_layer_%d_seed_%d" % ( exp_prefix, num_ensemble, ber_mean, confthres, batch, num_layer, seed) else: return "%s_en_%d_conf_%.1f_batch_%d_layer_%d_seed_%d" % ( exp_prefix, num_ensemble, confthres, batch, num_layer, seed) else: if 'boot' in exp_prefix: return "%s_en_%d_ber_%.1f_batch_%d_layer_%d_seed_%d" % ( exp_prefix, num_ensemble, ber_mean, batch, num_layer, seed) else: return "%s_en_%d_batch_%d_layer_%d_seed_%d" % (exp_prefix, num_ensemble, batch, num_layer, seed) def create_log_dir_custom( exp_prefix, env_name, batch=256, num_layer=2, seed=0, num_ensemble=1, confthres=0, ber_mean=-1, base_log_dir=None, include_exp_prefix_sub_dir=True, ): """ Creates and returns a unique log directory. :param exp_prefix: All experiments with this prefix will have log directories be under this directory. :param exp_id: The number of the specific experiment run within this experiment. :param base_log_dir: The directory where all log should be saved. :return: """ exp_name = create_exp_name_custom( exp_prefix, batch=batch, num_ensemble=num_ensemble, num_layer=num_layer, seed=seed, confthres=confthres, ber_mean=ber_mean) if base_log_dir is None: base_log_dir = conf.LOCAL_LOG_DIR base_log_dir += '/' + env_name + '/' if include_exp_prefix_sub_dir: log_dir = osp.join(base_log_dir, exp_prefix.replace("_", "-"), exp_name) else: log_dir = osp.join(base_log_dir, exp_name) if osp.exists(log_dir): print("WARNING: Log directory already exists {}".format(log_dir)) os.makedirs(log_dir, exist_ok=True) return log_dir def setup_logger_custom( exp_prefix="default", variant=None, text_log_file="debug.log", variant_log_file="variant.json", tabular_log_file="progress.csv", snapshot_mode="last", snapshot_gap=1, log_tabular_only=False, log_dir=None, git_infos=None, script_name=None, **create_log_dir_kwargs ): """ Set up logger to have some reasonable default settings. Will save log output to based_log_dir/exp_prefix/exp_name. exp_name will be auto-generated to be unique. If log_dir is specified, then that directory is used as the output dir. :param exp_prefix: The sub-directory for this specific experiment. :param variant: :param text_log_file: :param variant_log_file: :param tabular_log_file: :param snapshot_mode: :param log_tabular_only: :param snapshot_gap: :param log_dir: :param git_infos: :param script_name: If set, save the script name to this. :return: """ if git_infos is None: git_infos = get_git_infos(conf.CODE_DIRS_TO_MOUNT) first_time = log_dir is None if first_time: log_dir = create_log_dir_custom( exp_prefix, env_name=variant['env'], batch=variant['algorithm_kwargs']['batch_size'], num_ensemble=variant['num_ensemble'], num_layer=variant['num_layer'], seed=variant['seed']) if variant is not None: logger.log("Variant:") logger.log(json.dumps(dict_to_safe_json(variant), indent=2)) variant_log_path = osp.join(log_dir, variant_log_file) logger.log_variant(variant_log_path, variant) tabular_log_path = osp.join(log_dir, tabular_log_file) text_log_path = osp.join(log_dir, text_log_file) logger.add_text_output(text_log_path) if first_time: logger.add_tabular_output(tabular_log_path) else: logger._add_output(tabular_log_path, logger._tabular_outputs, logger._tabular_fds, mode='a') for tabular_fd in logger._tabular_fds: logger._tabular_header_written.add(tabular_fd) logger.set_snapshot_dir(log_dir) logger.set_snapshot_mode(snapshot_mode) logger.set_snapshot_gap(snapshot_gap) logger.set_log_tabular_only(log_tabular_only) exp_name = log_dir.split("/")[-1] logger.push_prefix("[%s] " % exp_name) if git_infos is not None: for ( directory, code_diff, code_diff_staged, commit_hash, branch_name ) in git_infos: if directory[-1] == '/': directory = directory[:-1] diff_file_name = directory[1:].replace("/", "-") + ".patch" diff_staged_file_name = ( directory[1:].replace("/", "-") + "_staged.patch" ) if code_diff is not None and len(code_diff) > 0: with open(osp.join(log_dir, diff_file_name), "w") as f: f.write(code_diff + '\n') if code_diff_staged is not None and len(code_diff_staged) > 0: with open(osp.join(log_dir, diff_staged_file_name), "w") as f: f.write(code_diff_staged + '\n') with open(osp.join(log_dir, "git_infos.txt"), "a") as f: f.write("directory: {}\n".format(directory)) f.write("git hash: {}\n".format(commit_hash)) f.write("git branch name: {}\n\n".format(branch_name)) if script_name is not None: with open(osp.join(log_dir, "script_name.txt"), "w") as f: f.write(script_name) return log_dir def dict_to_safe_json(d): """ Convert each value in the dictionary into a JSON'able primitive. :param d: :return: """ new_d = {} for key, item in d.items(): if safe_json(item): new_d[key] = item else: if isinstance(item, dict): new_d[key] = dict_to_safe_json(item) else: new_d[key] = str(item) return new_d def safe_json(data): if data is None: return True elif isinstance(data, (bool, int, float)): return True elif isinstance(data, (tuple, list)): return all(safe_json(x) for x in data) elif isinstance(data, dict): return all(isinstance(k, str) and safe_json(v) for k, v in data.items()) return False def set_seed(seed): """ Set the seed for all the possible random number generators. :param seed: :return: None """ seed = int(seed) random.seed(seed) np.random.seed(seed) def reset_execution_environment(): """ Call this between calls to separate experiments. :return: """ logger.reset() def query_yes_no(question, default="yes"): """Ask a yes/no question via raw_input() and return their answer. "question" is a string that is presented to the user. "default" is the presumed answer if the user just hits . It must be "yes" (the default), "no" or None (meaning an answer is required of the user). The "answer" return value is True for "yes" or False for "no". """ valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} if default is None: prompt = " [y/n] " elif default == "yes": prompt = " [Y/n] " elif default == "no": prompt = " [y/N] " else: raise ValueError("invalid default answer: '%s'" % default) while True: sys.stdout.write(question + prompt) choice = input().lower() if default is not None and choice == '': return valid[default] elif choice in valid: return valid[choice] else: sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") """ Below is doodad-specific code """ ec2_okayed = False gpu_ec2_okayed = False first_sss_launch = True try: import doodad.mount as mount from doodad.utils import REPO_DIR CODE_MOUNTS = [ mount.MountLocal(local_dir=REPO_DIR, pythonpath=True), ] for code_dir in conf.CODE_DIRS_TO_MOUNT: CODE_MOUNTS.append(mount.MountLocal(local_dir=code_dir, pythonpath=True)) NON_CODE_MOUNTS = [] for non_code_mapping in conf.DIR_AND_MOUNT_POINT_MAPPINGS: NON_CODE_MOUNTS.append(mount.MountLocal(**non_code_mapping)) SSS_CODE_MOUNTS = [] SSS_NON_CODE_MOUNTS = [] if hasattr(conf, 'SSS_DIR_AND_MOUNT_POINT_MAPPINGS'): for non_code_mapping in conf.SSS_DIR_AND_MOUNT_POINT_MAPPINGS: SSS_NON_CODE_MOUNTS.append(mount.MountLocal(**non_code_mapping)) if hasattr(conf, 'SSS_CODE_DIRS_TO_MOUNT'): for code_dir in conf.SSS_CODE_DIRS_TO_MOUNT: SSS_CODE_MOUNTS.append( mount.MountLocal(local_dir=code_dir, pythonpath=True) ) except ImportError: print("doodad not detected") target_mount = None def run_experiment( method_call, mode='local', exp_prefix='default', seed=None, variant=None, exp_id=0, prepend_date_to_exp_prefix=True, use_gpu=False, snapshot_mode='last', snapshot_gap=1, base_log_dir=None, local_input_dir_to_mount_point_dict=None, # TODO(vitchyr): test this # local settings skip_wait=False, # ec2 settings sync_interval=180, region='us-east-1', instance_type=None, spot_price=None, verbose=False, num_exps_per_instance=1, # sss settings time_in_mins=None, # ssh settings ssh_host=None, # gcp gcp_kwargs=None, ): """ Usage: ``` def foo(variant): x = variant['x'] y = variant['y'] logger.log("sum", x+y) variant = { 'x': 4, 'y': 3, } run_experiment(foo, variant, exp_prefix="my-experiment") ``` Results are saved to `base_log_dir/-my-experiment/-my-experiment-` By default, the base_log_dir is determined by `config.LOCAL_LOG_DIR/` :param method_call: a function that takes in a dictionary as argument :param mode: A string: - 'local' - 'local_docker' - 'ec2' - 'here_no_doodad': Run without doodad call :param exp_prefix: name of experiment :param seed: Seed for this specific trial. :param variant: Dictionary :param exp_id: One experiment = one variant setting + multiple seeds :param prepend_date_to_exp_prefix: If False, do not prepend the date to the experiment directory. :param use_gpu: :param snapshot_mode: See rlkit.core.logging :param snapshot_gap: See rlkit.core.logging :param base_log_dir: Will over :param sync_interval: How often to sync s3 data (in seconds). :param local_input_dir_to_mount_point_dict: Dictionary for doodad. :param ssh_host: the name of the host you want to ssh onto, should correspond to an entry in config.py of the following form: SSH_HOSTS=dict( ssh_host=dict( username='username', hostname='hostname/ip address', ) ) - if ssh_host is set to None, you will use ssh_host specified by config.SSH_DEFAULT_HOST :return: """ try: import doodad import doodad.mode import doodad.ssh except ImportError: print("Doodad not set up! Running experiment here.") mode = 'here_no_doodad' global ec2_okayed global gpu_ec2_okayed global target_mount global first_sss_launch """ Sanitize inputs as needed """ if seed is None: seed = random.randint(0, 100000) if variant is None: variant = {} if mode == 'ssh' and base_log_dir is None: base_log_dir = conf.SSH_LOG_DIR if base_log_dir is None: if mode == 'sss': base_log_dir = conf.SSS_LOG_DIR else: base_log_dir = conf.LOCAL_LOG_DIR for key, value in ppp.recursive_items(variant): # This check isn't really necessary, but it's to prevent myself from # forgetting to pass a variant through dot_map_dict_to_nested_dict. if "." in key: raise Exception( "Variants should not have periods in keys. Did you mean to " "convert {} into a nested dictionary?".format(key) ) if prepend_date_to_exp_prefix: exp_prefix = time.strftime("%m-%d") + "-" + exp_prefix variant['seed'] = str(seed) variant['exp_id'] = str(exp_id) variant['exp_prefix'] = str(exp_prefix) variant['instance_type'] = str(instance_type) try: import git doodad_path = osp.abspath(osp.join( osp.dirname(doodad.__file__), os.pardir )) dirs = conf.CODE_DIRS_TO_MOUNT + [doodad_path] git_infos = [] for directory in dirs: # Idk how to query these things, so I'm just doing try-catch try: repo = git.Repo(directory) try: branch_name = repo.active_branch.name except TypeError: branch_name = '[DETACHED]' git_infos.append(GitInfo( directory=directory, code_diff=repo.git.diff(None), code_diff_staged=repo.git.diff('--staged'), commit_hash=repo.head.commit.hexsha, branch_name=branch_name, )) except git.exc.InvalidGitRepositoryError: pass except ImportError: git_infos = None run_experiment_kwargs = dict( exp_prefix=exp_prefix, variant=variant, exp_id=exp_id, seed=seed, use_gpu=use_gpu, snapshot_mode=snapshot_mode, snapshot_gap=snapshot_gap, git_infos=git_infos, script_name=main.__file__, ) if mode == 'here_no_doodad': run_experiment_kwargs['base_log_dir'] = base_log_dir return run_experiment_here( method_call, **run_experiment_kwargs ) """ Safety Checks """ if mode == 'ec2' or mode == 'gcp': if not ec2_okayed and not query_yes_no( "{} costs money. Are you sure you want to run?".format(mode) ): sys.exit(1) if not gpu_ec2_okayed and use_gpu: if not query_yes_no( "{} is more expensive with GPUs. Confirm?".format(mode) ): sys.exit(1) gpu_ec2_okayed = True ec2_okayed = True """ GPU vs normal configs """ if use_gpu: docker_image = conf.GPU_DOODAD_DOCKER_IMAGE if instance_type is None: instance_type = conf.GPU_INSTANCE_TYPE else: assert instance_type[0] == 'g' if spot_price is None: spot_price = conf.GPU_SPOT_PRICE else: docker_image = conf.DOODAD_DOCKER_IMAGE if instance_type is None: instance_type = conf.INSTANCE_TYPE if spot_price is None: spot_price = conf.SPOT_PRICE if mode == 'sss': singularity_image = conf.SSS_IMAGE elif mode in ['local_singularity', 'slurm_singularity']: singularity_image = conf.SINGULARITY_IMAGE else: singularity_image = None """ Get the mode """ mode_kwargs = {} if use_gpu and mode == 'ec2': image_id = conf.REGION_TO_GPU_AWS_IMAGE_ID[region] if region == 'us-east-1': avail_zone = conf.REGION_TO_GPU_AWS_AVAIL_ZONE.get(region, "us-east-1b") mode_kwargs['extra_ec2_instance_kwargs'] = dict( Placement=dict( AvailabilityZone=avail_zone, ), ) else: image_id = None if hasattr(conf, "AWS_S3_PATH"): aws_s3_path = conf.AWS_S3_PATH else: aws_s3_path = None """ Create mode """ if mode == 'local': dmode = doodad.mode.Local(skip_wait=skip_wait) elif mode == 'local_docker': dmode = doodad.mode.LocalDocker( image=docker_image, gpu=use_gpu, ) elif mode == 'ssh': if ssh_host == None: ssh_dict = conf.SSH_HOSTS[conf.SSH_DEFAULT_HOST] else: ssh_dict = conf.SSH_HOSTS[ssh_host] credentials = doodad.ssh.credentials.SSHCredentials( username=ssh_dict['username'], hostname=ssh_dict['hostname'], identity_file=conf.SSH_PRIVATE_KEY ) dmode = doodad.mode.SSHDocker( credentials=credentials, image=docker_image, gpu=use_gpu, ) elif mode == 'local_singularity': dmode = doodad.mode.LocalSingularity( image=singularity_image, gpu=use_gpu, ) elif mode == 'slurm_singularity' or mode == 'sss': assert time_in_mins is not None, "Must approximate/set time in minutes" if use_gpu: kwargs = conf.SLURM_GPU_CONFIG else: kwargs = conf.SLURM_CPU_CONFIG if mode == 'slurm_singularity': dmode = doodad.mode.SlurmSingularity( image=singularity_image, gpu=use_gpu, time_in_mins=time_in_mins, skip_wait=skip_wait, pre_cmd=conf.SINGULARITY_PRE_CMDS, **kwargs ) else: dmode = doodad.mode.ScriptSlurmSingularity( image=singularity_image, gpu=use_gpu, time_in_mins=time_in_mins, skip_wait=skip_wait, pre_cmd=conf.SSS_PRE_CMDS, **kwargs ) elif mode == 'ec2': # Do this separately in case someone does not have EC2 configured dmode = doodad.mode.EC2AutoconfigDocker( image=docker_image, image_id=image_id, region=region, instance_type=instance_type, spot_price=spot_price, s3_log_prefix=exp_prefix, # Ask Vitchyr or Steven from an explanation, but basically we # will start just making the sub-directories within rlkit rather # than relying on doodad to do that. s3_log_name="", gpu=use_gpu, aws_s3_path=aws_s3_path, num_exps=num_exps_per_instance, **mode_kwargs ) elif mode == 'gcp': image_name = conf.GCP_IMAGE_NAME if use_gpu: image_name = conf.GCP_GPU_IMAGE_NAME if gcp_kwargs is None: gcp_kwargs = {} config_kwargs = { **conf.GCP_DEFAULT_KWARGS, **dict(image_name=image_name), **gcp_kwargs } dmode = doodad.mode.GCPDocker( image=docker_image, gpu=use_gpu, gcp_bucket_name=conf.GCP_BUCKET_NAME, gcp_log_prefix=exp_prefix, gcp_log_name="", **config_kwargs ) else: raise NotImplementedError("Mode not supported: {}".format(mode)) """ Get the mounts """ mounts = create_mounts( base_log_dir=base_log_dir, mode=mode, sync_interval=sync_interval, local_input_dir_to_mount_point_dict=local_input_dir_to_mount_point_dict, ) """ Get the outputs """ launch_locally = None target = conf.RUN_DOODAD_EXPERIMENT_SCRIPT_PATH if mode == 'ec2': # Ignored since I'm setting the snapshot dir directly base_log_dir_for_script = None run_experiment_kwargs['force_randomize_seed'] = True # The snapshot dir needs to be specified for S3 because S3 will # automatically create the experiment director and sub-directory. snapshot_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET elif mode == 'local': base_log_dir_for_script = base_log_dir # The snapshot dir will be automatically created snapshot_dir_for_script = None elif mode == 'local_docker': base_log_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET # The snapshot dir will be automatically created snapshot_dir_for_script = None elif mode == 'ssh': base_log_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET # The snapshot dir will be automatically created snapshot_dir_for_script = None elif mode in ['local_singularity', 'slurm_singularity', 'sss']: base_log_dir_for_script = base_log_dir # The snapshot dir will be automatically created snapshot_dir_for_script = None launch_locally = True if mode == 'sss': dmode.set_first_time(first_sss_launch) first_sss_launch = False target = conf.SSS_RUN_DOODAD_EXPERIMENT_SCRIPT_PATH elif mode == 'here_no_doodad': base_log_dir_for_script = base_log_dir # The snapshot dir will be automatically created snapshot_dir_for_script = None elif mode == 'gcp': # Ignored since I'm setting the snapshot dir directly base_log_dir_for_script = None run_experiment_kwargs['force_randomize_seed'] = True snapshot_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET else: raise NotImplementedError("Mode not supported: {}".format(mode)) run_experiment_kwargs['base_log_dir'] = base_log_dir_for_script target_mount = doodad.launch_python( target=target, mode=dmode, mount_points=mounts, args={ 'method_call': method_call, 'output_dir': snapshot_dir_for_script, 'run_experiment_kwargs': run_experiment_kwargs, 'mode': mode, }, use_cloudpickle=True, target_mount=target_mount, verbose=verbose, launch_locally=launch_locally, ) def create_mounts( mode, base_log_dir, sync_interval=180, local_input_dir_to_mount_point_dict=None, ): if mode == 'sss': code_mounts = SSS_CODE_MOUNTS non_code_mounts = SSS_NON_CODE_MOUNTS else: code_mounts = CODE_MOUNTS non_code_mounts = NON_CODE_MOUNTS if local_input_dir_to_mount_point_dict is None: local_input_dir_to_mount_point_dict = {} else: raise NotImplementedError("TODO(vitchyr): Implement this") mounts = [m for m in code_mounts] for dir, mount_point in local_input_dir_to_mount_point_dict.items(): mounts.append(mount.MountLocal( local_dir=dir, mount_point=mount_point, pythonpath=False, )) if mode != 'local': for m in non_code_mounts: mounts.append(m) if mode == 'ec2': output_mount = mount.MountS3( s3_path='', mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET, output=True, sync_interval=sync_interval, include_types=('*.txt', '*.csv', '*.json', '*.gz', '*.tar', '*.log', '*.pkl', '*.mp4', '*.png', '*.jpg', '*.jpeg', '*.patch'), ) elif mode == 'gcp': output_mount = mount.MountGCP( gcp_path='', mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET, output=True, gcp_bucket_name=conf.GCP_BUCKET_NAME, sync_interval=sync_interval, include_types=('*.txt', '*.csv', '*.json', '*.gz', '*.tar', '*.log', '*.pkl', '*.mp4', '*.png', '*.jpg', '*.jpeg', '*.patch'), ) elif mode in ['local', 'local_singularity', 'slurm_singularity', 'sss']: # To save directly to local files (singularity does this), skip mounting output_mount = mount.MountLocal( local_dir=base_log_dir, mount_point=None, output=True, ) elif mode == 'local_docker': output_mount = mount.MountLocal( local_dir=base_log_dir, mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET, output=True, ) elif mode == 'ssh': output_mount = mount.MountLocal( local_dir=base_log_dir, mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET, output=True, ) else: raise NotImplementedError("Mode not supported: {}".format(mode)) mounts.append(output_mount) return mounts