import datetime
import json
import os
import os.path as osp
import pickle
import random
import sys
import time
from collections import namedtuple
import __main__ as main
import dateutil.tz
import numpy as np
from rlkit.core import logger
from rlkit.launchers import conf
from rlkit.torch.pytorch_util import set_gpu_mode
import rlkit.pythonplusplus as ppp
GitInfo = namedtuple(
'GitInfo',
[
'directory',
'code_diff',
'code_diff_staged',
'commit_hash',
'branch_name',
],
)
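# Each GitInfo snapshot captures enough repository state (working-tree diff,
# staged diff, commit hash, branch) to reconstruct the code that ran.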
def get_git_infos(dirs):
    """
    Collect a GitInfo for every directory in ``dirs`` that is a valid git
    repository. Returns None if the ``git`` package (gitpython) is not
    installed.
    """
    try:
import git
git_infos = []
for directory in dirs:
            # There is no clean way to query all of this, so fall back to
            # try/except.
try:
repo = git.Repo(directory)
try:
branch_name = repo.active_branch.name
except TypeError:
branch_name = '[DETACHED]'
git_infos.append(GitInfo(
directory=directory,
code_diff=repo.git.diff(None),
code_diff_staged=repo.git.diff('--staged'),
commit_hash=repo.head.commit.hexsha,
branch_name=branch_name,
))
            except git.exc.InvalidGitRepositoryError:
print("Not a valid git repo: {}".format(directory))
except ImportError:
git_infos = None
return git_infos
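# Illustrative sketch (assumes '~/code/rlkit' is a git checkout):
#
#     infos = get_git_infos([osp.expanduser('~/code/rlkit')])
#     if infos:
#         print(infos[0].branch_name, infos[0].commit_hash[:8])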
def recursive_items(dictionary):
"""
Get all (key, item) recursively in a potentially recursive dictionary.
Usage:
```
x = {
'foo' : {
'bar' : 5
}
}
recursive_items(x)
# output:
# ('foo', {'bar' : 5})
# ('bar', 5)
```
:param dictionary:
:return:
"""
for key, value in dictionary.items():
yield key, value
if type(value) is dict:
yield from recursive_items(value)
def save_experiment_data(dictionary, log_dir):
    with open(osp.join(log_dir, 'experiment.pkl'), 'wb') as handle:
        pickle.dump(dictionary, handle, protocol=pickle.HIGHEST_PROTOCOL)
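# The saved data can be reloaded later (illustrative; uses the default file
# name written above):
#
#     with open(osp.join(log_dir, 'experiment.pkl'), 'rb') as handle:
#         data = pickle.load(handle)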
def run_experiment_here(
experiment_function,
variant=None,
exp_id=0,
seed=None,
use_gpu=True,
# Logger params:
exp_prefix="default",
snapshot_mode='last',
snapshot_gap=1,
git_infos=None,
script_name=None,
base_log_dir=None,
force_randomize_seed=False,
log_dir=None,
**setup_logger_kwargs
):
"""
Run an experiment locally without any serialization.
:param experiment_function: Function. `variant` will be passed in as its
only argument.
:param exp_prefix: Experiment prefix for the save file.
:param variant: Dictionary passed in to `experiment_function`.
    :param exp_id: Experiment ID. Should be unique across all
        experiments. Note that one experiment may correspond to multiple
        seeds.
    :param seed: Seed used for this experiment.
    :param use_gpu: Run with GPU. Defaults to True.
    :param script_name: Name of the running script.
:param log_dir: If set, set the log directory to this. Otherwise,
the directory will be auto-generated based on the exp_prefix.
:return:
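    Usage (illustrative sketch; ``example_fn`` is a placeholder):
    ```
    def example_fn(variant):
        print(variant['seed'])
    run_experiment_here(example_fn, variant={}, exp_prefix='demo')
    ```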
"""
if variant is None:
variant = {}
variant['exp_id'] = str(exp_id)
if force_randomize_seed or seed is None:
seed = random.randint(0, 100000)
variant['seed'] = str(seed)
reset_execution_environment()
actual_log_dir = setup_logger(
exp_prefix=exp_prefix,
variant=variant,
exp_id=exp_id,
seed=seed,
snapshot_mode=snapshot_mode,
snapshot_gap=snapshot_gap,
base_log_dir=base_log_dir,
log_dir=log_dir,
git_infos=git_infos,
script_name=script_name,
**setup_logger_kwargs
)
set_seed(seed)
set_gpu_mode(use_gpu)
run_experiment_here_kwargs = dict(
variant=variant,
exp_id=exp_id,
seed=seed,
use_gpu=use_gpu,
exp_prefix=exp_prefix,
snapshot_mode=snapshot_mode,
snapshot_gap=snapshot_gap,
git_infos=git_infos,
script_name=script_name,
base_log_dir=base_log_dir,
**setup_logger_kwargs
)
save_experiment_data(
dict(
run_experiment_here_kwargs=run_experiment_here_kwargs
),
actual_log_dir
)
return experiment_function(variant)
def create_exp_name(exp_prefix, exp_id=0, seed=0):
"""
Create a semi-unique experiment name that has a timestamp
:param exp_prefix:
    :param exp_id:
    :param seed:
    :return:
"""
now = datetime.datetime.now(dateutil.tz.tzlocal())
timestamp = now.strftime('%Y_%m_%d_%H_%M_%S')
return "%s_%s_%04d--s-%d" % (exp_prefix, timestamp, exp_id, seed)
def create_log_dir(
exp_prefix,
exp_id=0,
seed=0,
base_log_dir=None,
include_exp_prefix_sub_dir=True,
):
"""
Creates and returns a unique log directory.
:param exp_prefix: All experiments with this prefix will have log
directories be under this directory.
:param exp_id: The number of the specific experiment run within this
experiment.
    :param base_log_dir: The directory where all logs should be saved.
:return:
"""
exp_name = create_exp_name(exp_prefix, exp_id=exp_id,
seed=seed)
if base_log_dir is None:
base_log_dir = conf.LOCAL_LOG_DIR
if include_exp_prefix_sub_dir:
log_dir = osp.join(base_log_dir, exp_prefix.replace("_", "-"), exp_name)
else:
log_dir = osp.join(base_log_dir, exp_name)
if osp.exists(log_dir):
print("WARNING: Log directory already exists {}".format(log_dir))
os.makedirs(log_dir, exist_ok=True)
return log_dir
def setup_logger(
exp_prefix="default",
variant=None,
text_log_file="debug.log",
variant_log_file="variant.json",
tabular_log_file="progress.csv",
snapshot_mode="last",
snapshot_gap=1,
log_tabular_only=False,
log_dir=None,
git_infos=None,
script_name=None,
**create_log_dir_kwargs
):
"""
Set up logger to have some reasonable default settings.
    Will save log output to
    base_log_dir/exp_prefix/exp_name.
exp_name will be auto-generated to be unique.
If log_dir is specified, then that directory is used as the output dir.
:param exp_prefix: The sub-directory for this specific experiment.
:param variant:
:param text_log_file:
:param variant_log_file:
:param tabular_log_file:
:param snapshot_mode:
:param log_tabular_only:
:param snapshot_gap:
:param log_dir:
:param git_infos:
:param script_name: If set, save the script name to this.
:return:
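    Usage (illustrative; assumes conf.LOCAL_LOG_DIR is configured):
    ```
    log_dir = setup_logger(exp_prefix='demo', variant={'lr': 1e-3})
    logger.record_tabular('epoch', 0)
    logger.dump_tabular()
    ```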
"""
if git_infos is None:
git_infos = get_git_infos(conf.CODE_DIRS_TO_MOUNT)
first_time = log_dir is None
if first_time:
log_dir = create_log_dir(exp_prefix, **create_log_dir_kwargs)
if variant is not None:
logger.log("Variant:")
logger.log(json.dumps(dict_to_safe_json(variant), indent=2))
variant_log_path = osp.join(log_dir, variant_log_file)
logger.log_variant(variant_log_path, variant)
tabular_log_path = osp.join(log_dir, tabular_log_file)
text_log_path = osp.join(log_dir, text_log_file)
logger.add_text_output(text_log_path)
if first_time:
logger.add_tabular_output(tabular_log_path)
else:
logger._add_output(tabular_log_path, logger._tabular_outputs,
logger._tabular_fds, mode='a')
for tabular_fd in logger._tabular_fds:
logger._tabular_header_written.add(tabular_fd)
logger.set_snapshot_dir(log_dir)
logger.set_snapshot_mode(snapshot_mode)
logger.set_snapshot_gap(snapshot_gap)
logger.set_log_tabular_only(log_tabular_only)
exp_name = log_dir.split("/")[-1]
logger.push_prefix("[%s] " % exp_name)
if git_infos is not None:
for (
directory, code_diff, code_diff_staged, commit_hash, branch_name
) in git_infos:
if directory[-1] == '/':
directory = directory[:-1]
diff_file_name = directory[1:].replace("/", "-") + ".patch"
diff_staged_file_name = (
directory[1:].replace("/", "-") + "_staged.patch"
)
if code_diff is not None and len(code_diff) > 0:
with open(osp.join(log_dir, diff_file_name), "w") as f:
f.write(code_diff + '\n')
if code_diff_staged is not None and len(code_diff_staged) > 0:
with open(osp.join(log_dir, diff_staged_file_name), "w") as f:
f.write(code_diff_staged + '\n')
with open(osp.join(log_dir, "git_infos.txt"), "a") as f:
f.write("directory: {}\n".format(directory))
f.write("git hash: {}\n".format(commit_hash))
f.write("git branch name: {}\n\n".format(branch_name))
if script_name is not None:
with open(osp.join(log_dir, "script_name.txt"), "w") as f:
f.write(script_name)
return log_dir
def create_exp_name_custom(exp_prefix, batch=256, num_ensemble=1,
num_layer=2, seed=0, confthres=0, ber_mean=-1):
"""
    Create a semi-unique experiment name that encodes the key
    hyperparameters (ensemble size, batch size, layers, seed) rather than
    a timestamp.
    :param exp_prefix:
    :return:
"""
if 'conf' in exp_prefix:
if 'boot' in exp_prefix:
return "%s_en_%d_ber_%.1f_conf_%.1f_batch_%d_layer_%d_seed_%d" % (
exp_prefix,
num_ensemble,
ber_mean,
confthres, batch,
num_layer, seed)
else:
return "%s_en_%d_conf_%.1f_batch_%d_layer_%d_seed_%d" % (
exp_prefix, num_ensemble,
confthres, batch,
num_layer, seed)
else:
if 'boot' in exp_prefix:
return "%s_en_%d_ber_%.1f_batch_%d_layer_%d_seed_%d" % (
exp_prefix,
num_ensemble,
ber_mean,
batch,
num_layer, seed)
else:
return "%s_en_%d_batch_%d_layer_%d_seed_%d" % (exp_prefix, num_ensemble,
batch, num_layer, seed)
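# e.g. create_exp_name_custom('sac', batch=256, num_ensemble=5, num_layer=2,
# seed=0) returns 'sac_en_5_batch_256_layer_2_seed_0'.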
def create_log_dir_custom(
exp_prefix,
env_name,
batch=256,
num_layer=2,
seed=0,
num_ensemble=1,
confthres=0,
ber_mean=-1,
base_log_dir=None,
include_exp_prefix_sub_dir=True,
):
"""
    Creates and returns a log directory under base_log_dir/env_name/
    (a warning is printed if the directory already exists).
    :param exp_prefix: All experiments with this prefix will have log
        directories be under this directory.
    :param env_name: Environment name, used as a sub-directory of
        base_log_dir.
    :param base_log_dir: The directory where all logs should be saved.
:return:
"""
exp_name = create_exp_name_custom(
exp_prefix, batch=batch,
num_ensemble=num_ensemble,
num_layer=num_layer, seed=seed,
confthres=confthres, ber_mean=ber_mean)
if base_log_dir is None:
base_log_dir = conf.LOCAL_LOG_DIR
    base_log_dir = osp.join(base_log_dir, env_name)
if include_exp_prefix_sub_dir:
log_dir = osp.join(base_log_dir, exp_prefix.replace("_", "-"), exp_name)
else:
log_dir = osp.join(base_log_dir, exp_name)
if osp.exists(log_dir):
print("WARNING: Log directory already exists {}".format(log_dir))
os.makedirs(log_dir, exist_ok=True)
return log_dir
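# Illustrative result (assumes conf.LOCAL_LOG_DIR == '/tmp/logs'):
#
#     create_log_dir_custom('sac', 'HalfCheetah-v2', seed=0)
#     # -> '/tmp/logs/HalfCheetah-v2/sac/sac_en_1_batch_256_layer_2_seed_0'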
def setup_logger_custom(
exp_prefix="default",
variant=None,
text_log_file="debug.log",
variant_log_file="variant.json",
tabular_log_file="progress.csv",
snapshot_mode="last",
snapshot_gap=1,
log_tabular_only=False,
log_dir=None,
git_infos=None,
script_name=None,
**create_log_dir_kwargs
):
"""
Set up logger to have some reasonable default settings.
    Will save log output to
    base_log_dir/exp_prefix/exp_name.
exp_name will be auto-generated to be unique.
If log_dir is specified, then that directory is used as the output dir.
:param exp_prefix: The sub-directory for this specific experiment.
:param variant:
:param text_log_file:
:param variant_log_file:
:param tabular_log_file:
:param snapshot_mode:
:param log_tabular_only:
:param snapshot_gap:
:param log_dir:
:param git_infos:
:param script_name: If set, save the script name to this.
:return:
"""
if git_infos is None:
git_infos = get_git_infos(conf.CODE_DIRS_TO_MOUNT)
first_time = log_dir is None
if first_time:
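        # NOTE: unlike setup_logger, variant is required here; its entries
        # are used to build the custom log-directory name.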
log_dir = create_log_dir_custom(
exp_prefix,
env_name=variant['env'],
batch=variant['algorithm_kwargs']['batch_size'],
num_ensemble=variant['num_ensemble'],
num_layer=variant['num_layer'],
seed=variant['seed'])
if variant is not None:
logger.log("Variant:")
logger.log(json.dumps(dict_to_safe_json(variant), indent=2))
variant_log_path = osp.join(log_dir, variant_log_file)
logger.log_variant(variant_log_path, variant)
tabular_log_path = osp.join(log_dir, tabular_log_file)
text_log_path = osp.join(log_dir, text_log_file)
logger.add_text_output(text_log_path)
if first_time:
logger.add_tabular_output(tabular_log_path)
else:
logger._add_output(tabular_log_path, logger._tabular_outputs,
logger._tabular_fds, mode='a')
for tabular_fd in logger._tabular_fds:
logger._tabular_header_written.add(tabular_fd)
logger.set_snapshot_dir(log_dir)
logger.set_snapshot_mode(snapshot_mode)
logger.set_snapshot_gap(snapshot_gap)
logger.set_log_tabular_only(log_tabular_only)
exp_name = log_dir.split("/")[-1]
logger.push_prefix("[%s] " % exp_name)
if git_infos is not None:
for (
directory, code_diff, code_diff_staged, commit_hash, branch_name
) in git_infos:
if directory[-1] == '/':
directory = directory[:-1]
diff_file_name = directory[1:].replace("/", "-") + ".patch"
diff_staged_file_name = (
directory[1:].replace("/", "-") + "_staged.patch"
)
if code_diff is not None and len(code_diff) > 0:
with open(osp.join(log_dir, diff_file_name), "w") as f:
f.write(code_diff + '\n')
if code_diff_staged is not None and len(code_diff_staged) > 0:
with open(osp.join(log_dir, diff_staged_file_name), "w") as f:
f.write(code_diff_staged + '\n')
with open(osp.join(log_dir, "git_infos.txt"), "a") as f:
f.write("directory: {}\n".format(directory))
f.write("git hash: {}\n".format(commit_hash))
f.write("git branch name: {}\n\n".format(branch_name))
if script_name is not None:
with open(osp.join(log_dir, "script_name.txt"), "w") as f:
f.write(script_name)
return log_dir
def dict_to_safe_json(d):
"""
Convert each value in the dictionary into a JSON'able primitive.
:param d:
:return:
"""
new_d = {}
for key, item in d.items():
if safe_json(item):
new_d[key] = item
else:
if isinstance(item, dict):
new_d[key] = dict_to_safe_json(item)
else:
new_d[key] = str(item)
return new_d
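# e.g. dict_to_safe_json({'a': 1, 'b': {'c': object()}}) returns
# {'a': 1, 'b': {'c': '<object object at 0x...>'}} (the repr varies).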
def safe_json(data):
if data is None:
return True
elif isinstance(data, (bool, int, float)):
return True
elif isinstance(data, (tuple, list)):
return all(safe_json(x) for x in data)
elif isinstance(data, dict):
return all(isinstance(k, str) and safe_json(v) for k, v in data.items())
return False
def set_seed(seed):
"""
Set the seed for all the possible random number generators.
:param seed:
:return: None
"""
seed = int(seed)
random.seed(seed)
np.random.seed(seed)
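# Note: this seeds Python's ``random`` and numpy only. If torch determinism
# is needed, seed it separately (e.g. torch.manual_seed(seed)).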
def reset_execution_environment():
"""
Call this between calls to separate experiments.
:return:
"""
logger.reset()
def query_yes_no(question, default="yes"):
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
sys.stdout.write(question + prompt)
choice = input().lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
sys.stdout.write("Please respond with 'yes' or 'no' "
"(or 'y' or 'n').\n")
"""
Below is doodad-specific code
"""
ec2_okayed = False
gpu_ec2_okayed = False
first_sss_launch = True
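# When doodad is available, pre-build the code and data mounts that will be
# attached to the launched container or remote machine.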
try:
import doodad.mount as mount
from doodad.utils import REPO_DIR
CODE_MOUNTS = [
mount.MountLocal(local_dir=REPO_DIR, pythonpath=True),
]
for code_dir in conf.CODE_DIRS_TO_MOUNT:
CODE_MOUNTS.append(mount.MountLocal(local_dir=code_dir, pythonpath=True))
NON_CODE_MOUNTS = []
for non_code_mapping in conf.DIR_AND_MOUNT_POINT_MAPPINGS:
NON_CODE_MOUNTS.append(mount.MountLocal(**non_code_mapping))
SSS_CODE_MOUNTS = []
SSS_NON_CODE_MOUNTS = []
if hasattr(conf, 'SSS_DIR_AND_MOUNT_POINT_MAPPINGS'):
for non_code_mapping in conf.SSS_DIR_AND_MOUNT_POINT_MAPPINGS:
SSS_NON_CODE_MOUNTS.append(mount.MountLocal(**non_code_mapping))
if hasattr(conf, 'SSS_CODE_DIRS_TO_MOUNT'):
for code_dir in conf.SSS_CODE_DIRS_TO_MOUNT:
SSS_CODE_MOUNTS.append(
mount.MountLocal(local_dir=code_dir, pythonpath=True)
)
except ImportError:
print("doodad not detected")
target_mount = None
def run_experiment(
method_call,
mode='local',
exp_prefix='default',
seed=None,
variant=None,
exp_id=0,
prepend_date_to_exp_prefix=True,
use_gpu=False,
snapshot_mode='last',
snapshot_gap=1,
base_log_dir=None,
local_input_dir_to_mount_point_dict=None, # TODO(vitchyr): test this
# local settings
skip_wait=False,
# ec2 settings
sync_interval=180,
region='us-east-1',
instance_type=None,
spot_price=None,
verbose=False,
num_exps_per_instance=1,
# sss settings
time_in_mins=None,
# ssh settings
ssh_host=None,
# gcp
gcp_kwargs=None,
):
"""
Usage:
```
def foo(variant):
x = variant['x']
y = variant['y']
logger.log("sum", x+y)
variant = {
'x': 4,
'y': 3,
}
    run_experiment(foo, variant=variant, exp_prefix="my-experiment")
```
Results are saved to
`base_log_dir/<date>-my-experiment/<date>-my-experiment-<unique-id>`
By default, the base_log_dir is determined by
`config.LOCAL_LOG_DIR/`
:param method_call: a function that takes in a dictionary as argument
    :param mode: A string:
        - 'local'
        - 'local_docker'
        - 'ec2'
        - 'gcp'
        - 'ssh'
        - 'local_singularity'
        - 'slurm_singularity'
        - 'sss'
        - 'here_no_doodad': Run without doodad call
:param exp_prefix: name of experiment
:param seed: Seed for this specific trial.
:param variant: Dictionary
:param exp_id: One experiment = one variant setting + multiple seeds
:param prepend_date_to_exp_prefix: If False, do not prepend the date to
the experiment directory.
:param use_gpu:
:param snapshot_mode: See rlkit.core.logging
:param snapshot_gap: See rlkit.core.logging
    :param base_log_dir: Will override the default base log directory.
:param sync_interval: How often to sync s3 data (in seconds).
:param local_input_dir_to_mount_point_dict: Dictionary for doodad.
    :param ssh_host: The name of the host to ssh onto. Should correspond to
        an entry in config.py of the following form:
        SSH_HOSTS=dict(
            ssh_host=dict(
                username='username',
                hostname='hostname/ip address',
            )
        )
        If ssh_host is None, the host specified by
        config.SSH_DEFAULT_HOST is used.
:return:
"""
try:
import doodad
import doodad.mode
import doodad.ssh
except ImportError:
print("Doodad not set up! Running experiment here.")
mode = 'here_no_doodad'
global ec2_okayed
global gpu_ec2_okayed
global target_mount
global first_sss_launch
"""
Sanitize inputs as needed
"""
if seed is None:
seed = random.randint(0, 100000)
if variant is None:
variant = {}
if mode == 'ssh' and base_log_dir is None:
base_log_dir = conf.SSH_LOG_DIR
if base_log_dir is None:
if mode == 'sss':
base_log_dir = conf.SSS_LOG_DIR
else:
base_log_dir = conf.LOCAL_LOG_DIR
for key, value in ppp.recursive_items(variant):
# This check isn't really necessary, but it's to prevent myself from
# forgetting to pass a variant through dot_map_dict_to_nested_dict.
if "." in key:
raise Exception(
"Variants should not have periods in keys. Did you mean to "
"convert {} into a nested dictionary?".format(key)
)
if prepend_date_to_exp_prefix:
exp_prefix = time.strftime("%m-%d") + "-" + exp_prefix
variant['seed'] = str(seed)
variant['exp_id'] = str(exp_id)
variant['exp_prefix'] = str(exp_prefix)
variant['instance_type'] = str(instance_type)
try:
import git
doodad_path = osp.abspath(osp.join(
osp.dirname(doodad.__file__),
os.pardir
))
dirs = conf.CODE_DIRS_TO_MOUNT + [doodad_path]
git_infos = []
for directory in dirs:
            # There is no clean way to query all of this, so fall back to
            # try/except.
try:
repo = git.Repo(directory)
try:
branch_name = repo.active_branch.name
except TypeError:
branch_name = '[DETACHED]'
git_infos.append(GitInfo(
directory=directory,
code_diff=repo.git.diff(None),
code_diff_staged=repo.git.diff('--staged'),
commit_hash=repo.head.commit.hexsha,
branch_name=branch_name,
))
except git.exc.InvalidGitRepositoryError:
pass
except ImportError:
git_infos = None
run_experiment_kwargs = dict(
exp_prefix=exp_prefix,
variant=variant,
exp_id=exp_id,
seed=seed,
use_gpu=use_gpu,
snapshot_mode=snapshot_mode,
snapshot_gap=snapshot_gap,
git_infos=git_infos,
script_name=main.__file__,
)
if mode == 'here_no_doodad':
run_experiment_kwargs['base_log_dir'] = base_log_dir
return run_experiment_here(
method_call,
**run_experiment_kwargs
)
"""
Safety Checks
"""
if mode == 'ec2' or mode == 'gcp':
if not ec2_okayed and not query_yes_no(
"{} costs money. Are you sure you want to run?".format(mode)
):
sys.exit(1)
if not gpu_ec2_okayed and use_gpu:
if not query_yes_no(
"{} is more expensive with GPUs. Confirm?".format(mode)
):
sys.exit(1)
gpu_ec2_okayed = True
ec2_okayed = True
"""
GPU vs normal configs
"""
if use_gpu:
docker_image = conf.GPU_DOODAD_DOCKER_IMAGE
if instance_type is None:
instance_type = conf.GPU_INSTANCE_TYPE
else:
assert instance_type[0] == 'g'
if spot_price is None:
spot_price = conf.GPU_SPOT_PRICE
else:
docker_image = conf.DOODAD_DOCKER_IMAGE
if instance_type is None:
instance_type = conf.INSTANCE_TYPE
if spot_price is None:
spot_price = conf.SPOT_PRICE
if mode == 'sss':
singularity_image = conf.SSS_IMAGE
elif mode in ['local_singularity', 'slurm_singularity']:
singularity_image = conf.SINGULARITY_IMAGE
else:
singularity_image = None
"""
Get the mode
"""
mode_kwargs = {}
if use_gpu and mode == 'ec2':
image_id = conf.REGION_TO_GPU_AWS_IMAGE_ID[region]
if region == 'us-east-1':
avail_zone = conf.REGION_TO_GPU_AWS_AVAIL_ZONE.get(region, "us-east-1b")
mode_kwargs['extra_ec2_instance_kwargs'] = dict(
Placement=dict(
AvailabilityZone=avail_zone,
),
)
else:
image_id = None
if hasattr(conf, "AWS_S3_PATH"):
aws_s3_path = conf.AWS_S3_PATH
else:
aws_s3_path = None
"""
Create mode
"""
if mode == 'local':
dmode = doodad.mode.Local(skip_wait=skip_wait)
elif mode == 'local_docker':
dmode = doodad.mode.LocalDocker(
image=docker_image,
gpu=use_gpu,
)
elif mode == 'ssh':
        if ssh_host is None:
ssh_dict = conf.SSH_HOSTS[conf.SSH_DEFAULT_HOST]
else:
ssh_dict = conf.SSH_HOSTS[ssh_host]
credentials = doodad.ssh.credentials.SSHCredentials(
username=ssh_dict['username'],
hostname=ssh_dict['hostname'],
identity_file=conf.SSH_PRIVATE_KEY
)
dmode = doodad.mode.SSHDocker(
credentials=credentials,
image=docker_image,
gpu=use_gpu,
)
elif mode == 'local_singularity':
dmode = doodad.mode.LocalSingularity(
image=singularity_image,
gpu=use_gpu,
)
elif mode == 'slurm_singularity' or mode == 'sss':
assert time_in_mins is not None, "Must approximate/set time in minutes"
if use_gpu:
kwargs = conf.SLURM_GPU_CONFIG
else:
kwargs = conf.SLURM_CPU_CONFIG
if mode == 'slurm_singularity':
dmode = doodad.mode.SlurmSingularity(
image=singularity_image,
gpu=use_gpu,
time_in_mins=time_in_mins,
skip_wait=skip_wait,
pre_cmd=conf.SINGULARITY_PRE_CMDS,
**kwargs
)
else:
dmode = doodad.mode.ScriptSlurmSingularity(
image=singularity_image,
gpu=use_gpu,
time_in_mins=time_in_mins,
skip_wait=skip_wait,
pre_cmd=conf.SSS_PRE_CMDS,
**kwargs
)
elif mode == 'ec2':
# Do this separately in case someone does not have EC2 configured
dmode = doodad.mode.EC2AutoconfigDocker(
image=docker_image,
image_id=image_id,
region=region,
instance_type=instance_type,
spot_price=spot_price,
s3_log_prefix=exp_prefix,
            # Ask Vitchyr or Steven for an explanation, but basically we
            # will start just making the sub-directories within rlkit rather
            # than relying on doodad to do that.
s3_log_name="",
gpu=use_gpu,
aws_s3_path=aws_s3_path,
num_exps=num_exps_per_instance,
**mode_kwargs
)
elif mode == 'gcp':
image_name = conf.GCP_IMAGE_NAME
if use_gpu:
image_name = conf.GCP_GPU_IMAGE_NAME
if gcp_kwargs is None:
gcp_kwargs = {}
config_kwargs = {
**conf.GCP_DEFAULT_KWARGS,
**dict(image_name=image_name),
**gcp_kwargs
}
dmode = doodad.mode.GCPDocker(
image=docker_image,
gpu=use_gpu,
gcp_bucket_name=conf.GCP_BUCKET_NAME,
gcp_log_prefix=exp_prefix,
gcp_log_name="",
**config_kwargs
)
else:
raise NotImplementedError("Mode not supported: {}".format(mode))
"""
Get the mounts
"""
mounts = create_mounts(
base_log_dir=base_log_dir,
mode=mode,
sync_interval=sync_interval,
local_input_dir_to_mount_point_dict=local_input_dir_to_mount_point_dict,
)
"""
Get the outputs
"""
launch_locally = None
target = conf.RUN_DOODAD_EXPERIMENT_SCRIPT_PATH
if mode == 'ec2':
# Ignored since I'm setting the snapshot dir directly
base_log_dir_for_script = None
run_experiment_kwargs['force_randomize_seed'] = True
        # The snapshot dir needs to be specified for S3 because S3 will
        # automatically create the experiment directory and sub-directory.
snapshot_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET
elif mode == 'local':
base_log_dir_for_script = base_log_dir
# The snapshot dir will be automatically created
snapshot_dir_for_script = None
elif mode == 'local_docker':
base_log_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET
# The snapshot dir will be automatically created
snapshot_dir_for_script = None
elif mode == 'ssh':
base_log_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET
# The snapshot dir will be automatically created
snapshot_dir_for_script = None
elif mode in ['local_singularity', 'slurm_singularity', 'sss']:
base_log_dir_for_script = base_log_dir
# The snapshot dir will be automatically created
snapshot_dir_for_script = None
launch_locally = True
if mode == 'sss':
dmode.set_first_time(first_sss_launch)
first_sss_launch = False
target = conf.SSS_RUN_DOODAD_EXPERIMENT_SCRIPT_PATH
elif mode == 'here_no_doodad':
base_log_dir_for_script = base_log_dir
# The snapshot dir will be automatically created
snapshot_dir_for_script = None
elif mode == 'gcp':
# Ignored since I'm setting the snapshot dir directly
base_log_dir_for_script = None
run_experiment_kwargs['force_randomize_seed'] = True
snapshot_dir_for_script = conf.OUTPUT_DIR_FOR_DOODAD_TARGET
else:
raise NotImplementedError("Mode not supported: {}".format(mode))
run_experiment_kwargs['base_log_dir'] = base_log_dir_for_script
target_mount = doodad.launch_python(
target=target,
mode=dmode,
mount_points=mounts,
args={
'method_call': method_call,
'output_dir': snapshot_dir_for_script,
'run_experiment_kwargs': run_experiment_kwargs,
'mode': mode,
},
use_cloudpickle=True,
target_mount=target_mount,
verbose=verbose,
launch_locally=launch_locally,
)
def create_mounts(
        mode,
        base_log_dir,
        sync_interval=180,
        local_input_dir_to_mount_point_dict=None,
):
    """
    Build the list of doodad mounts (code, data, and output) for the given
    launch mode.
    """
if mode == 'sss':
code_mounts = SSS_CODE_MOUNTS
non_code_mounts = SSS_NON_CODE_MOUNTS
else:
code_mounts = CODE_MOUNTS
non_code_mounts = NON_CODE_MOUNTS
if local_input_dir_to_mount_point_dict is None:
local_input_dir_to_mount_point_dict = {}
else:
raise NotImplementedError("TODO(vitchyr): Implement this")
mounts = [m for m in code_mounts]
    for local_dir, mount_point in local_input_dir_to_mount_point_dict.items():
        mounts.append(mount.MountLocal(
            local_dir=local_dir,
            mount_point=mount_point,
            pythonpath=False,
        ))
if mode != 'local':
for m in non_code_mounts:
mounts.append(m)
if mode == 'ec2':
output_mount = mount.MountS3(
s3_path='',
mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET,
output=True,
sync_interval=sync_interval,
include_types=('*.txt', '*.csv', '*.json', '*.gz', '*.tar',
'*.log', '*.pkl', '*.mp4', '*.png', '*.jpg',
'*.jpeg', '*.patch'),
)
elif mode == 'gcp':
output_mount = mount.MountGCP(
gcp_path='',
mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET,
output=True,
gcp_bucket_name=conf.GCP_BUCKET_NAME,
sync_interval=sync_interval,
include_types=('*.txt', '*.csv', '*.json', '*.gz', '*.tar',
'*.log', '*.pkl', '*.mp4', '*.png', '*.jpg',
'*.jpeg', '*.patch'),
)
elif mode in ['local', 'local_singularity', 'slurm_singularity', 'sss']:
# To save directly to local files (singularity does this), skip mounting
output_mount = mount.MountLocal(
local_dir=base_log_dir,
mount_point=None,
output=True,
)
elif mode == 'local_docker':
output_mount = mount.MountLocal(
local_dir=base_log_dir,
mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET,
output=True,
)
elif mode == 'ssh':
output_mount = mount.MountLocal(
local_dir=base_log_dir,
mount_point=conf.OUTPUT_DIR_FOR_DOODAD_TARGET,
output=True,
)
else:
raise NotImplementedError("Mode not supported: {}".format(mode))
mounts.append(output_mount)
return mounts