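"""Utility functions to build the trajectory predictor and to load pre-trained
checkpoints, either from a local WandB log directory or by downloading the run
files from the WandB servers."""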
import os
import warnings
from typing import Callable, Optional, Tuple, Union

from mmcv import Config
import torch
import wandb

from risk_biased.predictors.biased_predictor import (
LitTrajectoryPredictor,
LitTrajectoryPredictorParams,
)
from risk_biased.utils.config_argparse import config_argparse
from risk_biased.utils.cost import TTCCostParams
from risk_biased.utils.torch_utils import load_weights
from risk_biased.scene_dataset.loaders import SceneDataLoaders
from risk_biased.scene_dataset.scene import load_create_dataset
from risk_biased.utils.waymo_dataloader import WaymoDataloaders


def get_predictor(
config: Config, unnormalizer: Callable[[torch.Tensor, torch.Tensor], torch.Tensor]
):
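    """Build a LitTrajectoryPredictor from the given configuration.

    Args:
        config: global configuration object
        unnormalizer: callable that unnormalizes predicted trajectories
            (e.g. dataloaders.unnormalize_trajectory)

    Returns:
        The predictor, set up with the predictor parameters and TTC cost parameters read from the config.
    """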
params = LitTrajectoryPredictorParams.from_config(config)
model_class = LitTrajectoryPredictor
ttc_params = TTCCostParams.from_config(config)
return model_class(params=params, unnormalizer=unnormalizer, cost_params=ttc_params)


def load_from_wandb_id(
log_id: str,
log_path: str,
entity: str,
project: str,
config: Optional[Config] = None,
    load_last: bool = False,
) -> Tuple[
    LitTrajectoryPredictor, Union[SceneDataLoaders, WaymoDataloaders], Config
]:
"""
Load a model using a wandb id code.
    Args:
        log_id: the wandb run id
        log_path: the wandb log directory path
        entity: the wandb entity that owns the run
        project: the wandb project name
        config: an optional configuration; if given, use these settings, otherwise use the settings stored in the log directory
        load_last: an optional argument, set to True to load the last checkpoint instead of the best one

    Returns:
        The predictor model, the dataloaders, and the config, either loaded from the checkpoint or passed as argument.
"""
list_matching = list(filter(lambda path: log_id in path, os.listdir(log_path)))
if len(list_matching) == 1:
list_ckpt = list(
filter(
lambda path: "epoch" in path and ".ckpt" in path,
os.listdir(os.path.join(log_path, list_matching[0], "files")),
)
)
if not load_last and len(list_ckpt) == 1:
print(f"Loading best model: {list_ckpt[0]}.")
checkpoint_path = os.path.join(
log_path, list_matching[0], "files", list_ckpt[0]
)
else:
print(f"Loading last checkpoint.")
checkpoint_path = os.path.join(
log_path, list_matching[0], "files", "last.ckpt"
)
config_path = os.path.join(
log_path, list_matching[0], "files", "learning_config.py"
)
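        # Use the configuration stored with the run unless one was passed explicitly;
        # in the latter case only read the stored model type for error reporting.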
if config is None:
config = config_argparse(config_path)
distant_model_type = None
else:
distant_config = config_argparse(config_path)
distant_model_type = distant_config.model_type
config["load_from"] = log_id
if config.model_type == "interaction_biased":
dataloaders = WaymoDataloaders(config)
else:
[data_train, data_val, data_test] = load_create_dataset(config)
dataloaders = SceneDataLoaders(
state_dim=config.state_dim,
num_steps=config.num_steps,
num_steps_future=config.num_steps_future,
batch_size=config.batch_size,
data_train=data_train,
data_val=data_val,
data_test=data_test,
num_workers=config.num_workers,
)
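        # Instantiate a fresh predictor and load the checkpoint weights into it.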
try:
            # Map the checkpoint to the GPU only if GPUs are configured,
            # otherwise keep the weights on the CPU.
            if len(config.gpus):
                map_location = "cuda"
            else:
                map_location = "cpu"
model = load_weights(
get_predictor(config, dataloaders.unnormalize_trajectory),
torch.load(checkpoint_path, map_location=map_location),
strict=True,
)
        except RuntimeError as err:
            raise RuntimeError(
                f"The source model is of type {distant_model_type}."
                + f" Its weights cannot be loaded into a model of type {config.model_type}."
            ) from err
return model, dataloaders, config
else:
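        # No single local match: try to fetch the run files from the WandB servers.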
print("Trying to download logs from WandB...")
api = wandb.Api()
run = api.run(entity + "/" + project + "/" + log_id)
if run is not None:
checkpoint_path = os.path.join(
log_path, "downloaded_run-" + log_id, "files"
)
            os.makedirs(checkpoint_path, exist_ok=True)
for file in run.files():
if file.name.endswith("ckpt") or file.name.endswith("config.py"):
file.download(checkpoint_path)
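            # Retry loading now that the checkpoint and config files are available locally.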
return load_from_wandb_id(
log_id, log_path, entity, project, config, load_last
)
else:
raise RuntimeError(
f"Error while loading checkpoint: Found {len(list_matching)} occurences of the given id {log_id} in the logs at {log_path}."
)


def load_from_config(cfg: Config):
"""
    Load the predictor model and the dataloaders that are selected in the configuration.
    If the "load_from" field is set and not empty, this tries to load the pre-trained model
    from the corresponding checkpoint together with its matching configuration file.

    Args:
        cfg: configuration that defines the model to be loaded

    Returns:
        The loaded predictor, the dataloaders, and a new version of the config that is
        compatible with the checkpoint the model could be loaded from.
"""
log_path = os.path.join(cfg.log_path, "wandb")
ignored_keys = [
"project",
"dataset_parameters",
"load_from",
"force_config",
"load_last",
]
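    # A non-empty "load_from" field means a pre-trained checkpoint should be loaded.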
if "load_from" in cfg.keys() and cfg.load_from != "" and cfg.load_from:
if "load_last" in cfg.keys():
load_last = cfg["load_last"]
else:
load_last = False
if cfg.force_config:
warnings.warn(
f"Using local configuration but loading from run {cfg.load_from}. Will fail if local configuration is not compatible."
)
predictor, dataloaders, config = load_from_wandb_id(
log_id=cfg.load_from,
log_path=log_path,
entity=cfg.entity,
project=cfg.project,
config=cfg,
load_last=load_last,
)
else:
predictor, dataloaders, config = load_from_wandb_id(
log_id=cfg.load_from,
log_path=log_path,
entity=cfg.entity,
project=cfg.project,
load_last=load_last,
)
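            # Compare the requested configuration to the one restored from the checkpoint
            # and collect a warning for every value that had to be changed or added.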
difference = False
warning_message = ""
for key, item in cfg.items():
try:
if config[key] != item:
if not difference:
warning_message += "When loading the model, the configuration was changed to match the configuration of the pre-trained model to be loaded.\n"
difference = True
if key not in ignored_keys:
warning_message += f" The value of '{key}' is now '{config[key]}' instead of '{item}'."
except KeyError:
if not difference:
warning_message += "When loading the model, the configuration was changed to match the configuration of the pre-trained model to be loaded."
difference = True
warning_message += f" The parameter '{key}' with value '{item}' does not exist for the model you are loading from, it is added."
config[key] = item
if warning_message != "":
warnings.warn(warning_message)
return predictor, dataloaders, config
else:
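        # No checkpoint to load: build the dataloaders and a new predictor from the local config.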
if cfg.model_type == "interaction_biased":
dataloaders = WaymoDataloaders(cfg)
else:
[data_train, data_val, data_test] = load_create_dataset(cfg)
dataloaders = SceneDataLoaders(
state_dim=cfg.state_dim,
num_steps=cfg.num_steps,
num_steps_future=cfg.num_steps_future,
batch_size=cfg.batch_size,
data_train=data_train,
data_val=data_val,
data_test=data_test,
num_workers=cfg.num_workers,
)
predictor = get_predictor(cfg, dataloaders.unnormalize_trajectory)
return predictor, dataloaders, cfg
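

# Usage sketch (illustrative, not part of the original module): assuming a
# learning configuration file compatible with config_argparse, a predictor and
# its dataloaders could be loaded as follows. The config path below is a
# placeholder, not a file shipped with this module.
#
#     from risk_biased.utils.config_argparse import config_argparse
#     cfg = config_argparse("path/to/learning_config.py")
#     predictor, dataloaders, cfg = load_from_config(cfg)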