|
import os.path as osp |
|
import pickle |
|
import shutil |
|
import tempfile |
|
import time |
|
|
|
import mmcv |
|
import torch |
|
import torch.distributed as dist |
|
from mmcv.runner import get_dist_info |
|
|
|
|
|
def single_gpu_test(model, data_loader): |
|
"""Test with single gpu.""" |
|
model.eval() |
|
results = [] |
|
dataset = data_loader.dataset |
|
prog_bar = mmcv.ProgressBar(len(dataset)) |
|
for i, data in enumerate(data_loader): |
|
with torch.no_grad(): |
|
result = model(return_loss=False, **data) |
|
|
|
batch_size = len(result) |
|
if isinstance(result, list): |
|
results.extend(result) |
|
else: |
|
results.append(result) |
|
|
|
batch_size = data['motion'].size(0) |
|
for _ in range(batch_size): |
|
prog_bar.update() |
|
return results |
|
|
|
|
|
def multi_gpu_test(model, data_loader, tmpdir=None, gpu_collect=False): |
|
"""Test model with multiple gpus. |
|
This method tests model with multiple gpus and collects the results |
|
under two different modes: gpu and cpu modes. By setting 'gpu_collect=True' |
|
it encodes results to gpu tensors and use gpu communication for results |
|
collection. On cpu mode it saves the results on different gpus to 'tmpdir' |
|
and collects them by the rank 0 worker. |
|
Args: |
|
model (nn.Module): Model to be tested. |
|
data_loader (nn.Dataloader): Pytorch data loader. |
|
tmpdir (str): Path of directory to save the temporary results from |
|
different gpus under cpu mode. |
|
gpu_collect (bool): Option to use either gpu or cpu to collect results. |
|
Returns: |
|
list: The prediction results. |
|
""" |
|
model.eval() |
|
results = [] |
|
dataset = data_loader.dataset |
|
rank, world_size = get_dist_info() |
|
if rank == 0: |
|
|
|
if (not gpu_collect) and (tmpdir is not None and osp.exists(tmpdir)): |
|
raise OSError((f'The tmpdir {tmpdir} already exists.', |
|
' Since tmpdir will be deleted after testing,', |
|
' please make sure you specify an empty one.')) |
|
prog_bar = mmcv.ProgressBar(len(dataset)) |
|
time.sleep(2) |
|
for i, data in enumerate(data_loader): |
|
with torch.no_grad(): |
|
result = model(return_loss=False, **data) |
|
if isinstance(result, list): |
|
results.extend(result) |
|
else: |
|
results.append(result) |
|
|
|
if rank == 0: |
|
batch_size = data['motion'].size(0) |
|
for _ in range(batch_size * world_size): |
|
prog_bar.update() |
|
|
|
|
|
if gpu_collect: |
|
results = collect_results_gpu(results, len(dataset)) |
|
else: |
|
results = collect_results_cpu(results, len(dataset), tmpdir) |
|
return results |
|
|
|
|
|
def collect_results_cpu(result_part, size, tmpdir=None): |
|
"""Collect results in cpu.""" |
|
rank, world_size = get_dist_info() |
|
|
|
if tmpdir is None: |
|
MAX_LEN = 512 |
|
|
|
dir_tensor = torch.full((MAX_LEN, ), |
|
32, |
|
dtype=torch.uint8, |
|
device='cuda') |
|
if rank == 0: |
|
mmcv.mkdir_or_exist('.dist_test') |
|
tmpdir = tempfile.mkdtemp(dir='.dist_test') |
|
tmpdir = torch.tensor(bytearray(tmpdir.encode()), |
|
dtype=torch.uint8, |
|
device='cuda') |
|
dir_tensor[:len(tmpdir)] = tmpdir |
|
dist.broadcast(dir_tensor, 0) |
|
tmpdir = dir_tensor.cpu().numpy().tobytes().decode().rstrip() |
|
else: |
|
mmcv.mkdir_or_exist(tmpdir) |
|
|
|
mmcv.dump(result_part, osp.join(tmpdir, f'part_{rank}.pkl')) |
|
dist.barrier() |
|
|
|
if rank != 0: |
|
return None |
|
else: |
|
|
|
part_list = [] |
|
for i in range(world_size): |
|
part_file = osp.join(tmpdir, f'part_{i}.pkl') |
|
part_result = mmcv.load(part_file) |
|
part_list.append(part_result) |
|
|
|
ordered_results = [] |
|
for res in zip(*part_list): |
|
ordered_results.extend(list(res)) |
|
|
|
ordered_results = ordered_results[:size] |
|
|
|
shutil.rmtree(tmpdir) |
|
return ordered_results |
|
|
|
|
|
def collect_results_gpu(result_part, size): |
|
"""Collect results in gpu.""" |
|
rank, world_size = get_dist_info() |
|
|
|
part_tensor = torch.tensor(bytearray(pickle.dumps(result_part)), |
|
dtype=torch.uint8, |
|
device='cuda') |
|
|
|
shape_tensor = torch.tensor(part_tensor.shape, device='cuda') |
|
shape_list = [shape_tensor.clone() for _ in range(world_size)] |
|
dist.all_gather(shape_list, shape_tensor) |
|
|
|
shape_max = torch.tensor(shape_list).max() |
|
part_send = torch.zeros(shape_max, dtype=torch.uint8, device='cuda') |
|
part_send[:shape_tensor[0]] = part_tensor |
|
part_recv_list = [ |
|
part_tensor.new_zeros(shape_max) for _ in range(world_size) |
|
] |
|
|
|
dist.all_gather(part_recv_list, part_send) |
|
|
|
if rank == 0: |
|
ordered_results = [] |
|
for recv, shape in zip(part_recv_list, shape_list): |
|
part_result = pickle.loads(recv[:shape[0]].cpu().numpy().tobytes()) |
|
ordered_results.extend(part_result) |
|
|
|
ordered_results = ordered_results[:size] |
|
return ordered_results |
|
|