# unit_test / uniperceiver/utils/engine_util.py
# (origin: "Upload 259 files" by herrius, revision 32b542e)
import uniperceiver.utils.comm as comm
import torch
import numpy as np
from uniperceiver.utils.events import get_event_storage
from typing import Dict
from uniperceiver.datasets import (
build_standard_valtest_loader,
build_unified_train_loader,
)
import weakref
def write_metrics(loss_dict: Dict[str, torch.Tensor],
                  data_time: float,
                  prefix: str = "",
                  ):
    """
    Log scalar training metrics to the event storage on the main process.

    Args:
        loss_dict (dict): dict of scalar losses
        data_time (float): time taken by the dataloader iteration
        prefix (str): string prepended to every logged scalar name
    """
    scalars = {
        key: (val.detach().cpu().item() if isinstance(val, torch.Tensor) else val)
        for key, val in loss_dict.items()
    }
    scalars["data_time"] = data_time
    # NOTE(review): only this worker's metrics are collected here — no
    # cross-worker gather is performed, despite the DDP-style reduction below.
    gathered = [scalars]
    if not comm.is_main_process():
        return
    storage = get_event_storage()
    # data_time among workers can have high variance. The actual latency
    # caused by data_time is the maximum among workers.
    storage.put_scalar("data_time", np.max([d.pop("data_time") for d in gathered]))
    # Average the remaining metrics across the gathered dicts.
    averaged = {
        name: np.mean([d[name] for d in gathered])
        for name in gathered[0].keys()
    }
    storage.put_scalar("{}total_loss".format(prefix), sum(averaged.values()))
    if len(averaged) > 1:
        for name, value in averaged.items():
            # 'null_loss' contributes to the total but is not logged per-key.
            if name != 'null_loss':
                storage.put_scalar(f'{prefix}{name}', value)
def build_writers(cfg, max_iter):
    """Create the default set of event writers for a run of `max_iter` steps."""
    # Imported lazily to avoid a circular import with the engine package.
    from uniperceiver.engine.defaults import default_writers
    writers = default_writers(cfg.OUTPUT_DIR, max_iter)
    return writers
def build_train_loader(cfg, task_cfg, model):
    """
    Build the unified training dataloader.

    Args:
        cfg: global config. ``cfg.DATALOADER.UNIFIED_DATASET`` must be True;
            ``cfg.DATALOADER.LOAD_INLABEL`` decides whether the (unwrapped)
            model is passed to the loader factory.
        task_cfg: per-task configuration, forwarded to the loader factory.
        model: the training model (possibly DDP-wrapped).

    Returns:
        The dataloader produced by ``build_unified_train_loader``.

    Raises:
        NotImplementedError: if the unified dataset is not enabled.
    """
    # Guard clause (the original initialized a dead `loader = dict()`).
    if not cfg.DATALOADER.UNIFIED_DATASET:
        raise NotImplementedError('please use unified dataset.')
    # A weak proxy avoids creating a reference cycle between loader and model.
    model_ref = (weakref.proxy(comm.unwrap_model(model))
                 if cfg.DATALOADER.LOAD_INLABEL else None)
    return build_unified_train_loader(cfg, task_cfg, model=model_ref)
def build_test_loader(cfg, task_cfg):
    """
    Build one test-stage dataloader per task in `task_cfg`.

    Args:
        cfg: global config (currently unused here; kept for API symmetry).
        task_cfg: mapping of task name -> task config.

    Returns:
        dict: task name -> dataloader from ``build_standard_valtest_loader``.
    """
    #TODO: move multi-gpu eval in config file
    # Hoisted out of the loop: the original rebuilt this list per iteration.
    multi_gpu_tasks = frozenset({
        'K400_retrieve', 'imagenet', 'vqa', 'mscoco_caption',
        'flickr30k_caption', 'K700_retrieve', 'imagenet_caption',
    })
    loaders = dict()
    for name, new_cfg in task_cfg.items():
        loaders[name] = build_standard_valtest_loader(
            new_cfg, task_cfg, stage='test',
            multi_gpu_eval=name in multi_gpu_tasks)
    return loaders
def build_val_loader(cfg, task_cfg):
    """
    Build one validation-stage dataloader per task in `task_cfg`.

    Args:
        cfg: global config (currently unused here; kept for API symmetry).
        task_cfg: mapping of task name -> task config.

    Returns:
        dict: task name -> dataloader from ``build_standard_valtest_loader``.
    """
    #TODO: move multi-gpu eval in config file
    # Hoisted out of the loop: the original rebuilt this list per iteration.
    multi_gpu_tasks = frozenset({
        'K400_retrieve', 'imagenet', 'vqa', 'mscoco_caption',
        'flickr30k_caption', 'K700_retrieve', 'imagenet_caption',
    })
    loaders = dict()
    for name, new_cfg in task_cfg.items():
        loaders[name] = build_standard_valtest_loader(
            new_cfg, task_cfg, stage='val',
            multi_gpu_eval=name in multi_gpu_tasks)
    return loaders
def get_batch_data(cfg, train_data_loader_iter, train_data_loader):
    """
    Fetch the next training batch, restarting the dataloader when exhausted.

    Args:
        cfg: global config; ``cfg.DATALOADER.FAKE_DATA`` selects the (not
            implemented) fake-data path.
        train_data_loader_iter: the current iterator over the dataloader.
        train_data_loader: the dataloader itself, used to restart iteration.

    Returns:
        The next batch from the iterator.

    Raises:
        NotImplementedError: if ``cfg.DATALOADER.FAKE_DATA`` is set.
    """
    if cfg.DATALOADER.FAKE_DATA:
        # Bug fix: the original set `bs = 32` and then returned the
        # undefined name `data`, raising NameError. Fail explicitly.
        raise NotImplementedError('FAKE_DATA path is not implemented.')
    try:
        return next(train_data_loader_iter)
    except StopIteration:
        # Epoch exhausted: restart and take the first batch of the new pass.
        # NOTE(review): the fresh iterator is local and does not propagate to
        # the caller, so the caller's exhausted iterator triggers a restart on
        # every later call — confirm callers rebind their iterator themselves.
        return next(iter(train_data_loader))