import numpy as np
import torch
import torch.distributed as dist


def reduce_tensors(metrics):
    """Average every tensor in a (possibly nested) metrics dict across all ranks."""
    new_metrics = {}
    for k, v in metrics.items():
        if isinstance(v, torch.Tensor):
            dist.all_reduce(v)  # default op is SUM; divide to get the mean
            v = v / dist.get_world_size()
        if isinstance(v, dict):
            v = reduce_tensors(v)
        new_metrics[k] = v
    return new_metrics
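
# Minimal usage sketch, assuming torch.distributed has been initialized (e.g.
# via dist.init_process_group) and all ranks call this with the same keys; the
# metric names below are illustrative, not part of this module:
#
#     metrics = {'loss': loss.detach(), 'acc': {'top1': top1.detach()}}
#     metrics = reduce_tensors(metrics)  # each rank now holds the global mean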


def tensors_to_scalars(tensors):
    """Recursively convert tensors inside dicts/lists to Python scalars via .item()."""
    if isinstance(tensors, torch.Tensor):
        return tensors.item()
    elif isinstance(tensors, dict):
        return {k: tensors_to_scalars(v) for k, v in tensors.items()}
    elif isinstance(tensors, list):
        return [tensors_to_scalars(v) for v in tensors]
    else:
        return tensors
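
# Illustrative sketch: this expects one-element tensors, since Tensor.item()
# raises on anything larger. For example:
#
#     tensors_to_scalars({'loss': torch.tensor(0.5), 'steps': [torch.tensor(1)]})
#     # -> {'loss': 0.5, 'steps': [1]}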


def convert_to_np(tensors):
    """Recursively convert torch tensors (also inside dicts/lists) to numpy arrays."""
    if isinstance(tensors, np.ndarray):
        return tensors
    elif isinstance(tensors, torch.Tensor):
        return tensors.cpu().numpy()
    elif isinstance(tensors, dict):
        new_np = {}
        for k, v in tensors.items():
            if isinstance(v, torch.Tensor):
                v = v.cpu().numpy()
            if isinstance(v, dict):
                v = convert_to_np(v)
            new_np[k] = v
        return new_np
    elif isinstance(tensors, list):
        new_np = []
        for v in tensors:
            if isinstance(v, torch.Tensor):
                v = v.cpu().numpy()
            if isinstance(v, dict):
                v = convert_to_np(v)
            new_np.append(v)
        return new_np
    else:
        raise TypeError(f'convert_to_np does not support type {type(tensors)}.')
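
# Illustrative sketch of the recursive conversion (the batch layout is made up):
#
#     batch = {'mel': torch.randn(2, 80), 'meta': {'ids': torch.arange(2)}}
#     out = convert_to_np(batch)
#     # out['mel'] and out['meta']['ids'] are now numpy arrays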


def convert_to_tensor(arrays):
    """Recursively convert numpy arrays (also inside dicts/lists) to float tensors."""
    if isinstance(arrays, np.ndarray):
        ret = torch.from_numpy(arrays).float()
    elif isinstance(arrays, torch.Tensor):
        ret = arrays
    elif isinstance(arrays, list):
        # Fixed: the original assigned to a temporary but never set `ret`,
        # raising UnboundLocalError for list inputs.
        ret = torch.from_numpy(np.array(arrays)).float()
    elif isinstance(arrays, dict):
        ret = {}
        for k, v in arrays.items():
            if isinstance(v, np.ndarray):
                v = torch.from_numpy(v).float()
            if isinstance(v, dict):
                v = convert_to_tensor(v)
            ret[k] = v
    else:
        raise TypeError(f'convert_to_tensor does not support type {type(arrays)}.')
    return ret


def convert_like(inp, target):
    """Convert `inp` to the container type and device of `target`."""
    if isinstance(target, np.ndarray):
        return convert_to_np(inp)
    elif isinstance(target, torch.Tensor):
        inp = convert_to_tensor(inp)
        # Compare the device type, not the device object against a string.
        if target.device.type == 'cpu':
            return move_to_cpu(inp)
        else:
            return move_to_cuda(inp, target.device.index or 0)
    else:
        raise TypeError(f'convert_like does not support target type {type(target)}.')
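
# Illustrative sketch (the variable names are made up): convert_like adapts one
# value to match another, e.g. converting a numpy prediction to the framework
# and device of a reference tensor:
#
#     pred = np.zeros((4, 80), dtype=np.float32)
#     ref = torch.zeros(4, 80)          # CPU tensor
#     pred = convert_like(pred, ref)    # -> torch.FloatTensor on CPU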


def move_to_cpu(tensors):
    """Recursively move tensors (bare or nested in dicts) to the CPU."""
    # Accept a bare tensor as well, so convert_like can delegate here directly.
    if isinstance(tensors, torch.Tensor):
        return tensors.cpu()
    ret = {}
    for k, v in tensors.items():
        if isinstance(v, torch.Tensor):
            v = v.cpu()
        if isinstance(v, dict):
            v = move_to_cpu(v)
        ret[k] = v
    return ret


def move_to_cuda(batch, gpu_id=0):
    """Recursively move a batch (tensor, list, tuple, or dict) to the given GPU."""
    # Anything exposing .cuda() or .to() (tensors, modules) is moved directly.
    if callable(getattr(batch, 'cuda', None)):
        return batch.cuda(gpu_id, non_blocking=True)
    elif callable(getattr(batch, 'to', None)):
        return batch.to(torch.device('cuda', gpu_id), non_blocking=True)
    elif isinstance(batch, list):
        for i, x in enumerate(batch):
            batch[i] = move_to_cuda(x, gpu_id)
        return batch
    elif isinstance(batch, tuple):
        return tuple(move_to_cuda(x, gpu_id) for x in batch)
    elif isinstance(batch, dict):
        for k, v in batch.items():
            batch[k] = move_to_cuda(v, gpu_id)
        return batch
    elif isinstance(batch, (int, float, str)):
        return batch
    elif batch is None:
        return None
    else:
        print("| Error in move_to_cuda: ", type(batch), batch)
        raise NotImplementedError()
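
# Illustrative sketch (assumes a CUDA device is available; the batch layout is
# made up):
#
#     batch = {'tokens': torch.zeros(8, 128, dtype=torch.long),
#              'lengths': [torch.tensor(128)] * 8}
#     batch = move_to_cuda(batch, gpu_id=0)  # every tensor is now on cuda:0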


def convert_to_half(arrays):
    """Recursively convert numpy arrays/tensors (also inside dicts/lists) to fp16."""
    if isinstance(arrays, np.ndarray):
        ret = torch.from_numpy(arrays).half()
    elif isinstance(arrays, torch.Tensor):
        ret = arrays.half()
    elif isinstance(arrays, list):
        # Recurse instead of calling .half() directly, so numpy arrays and
        # nested containers inside the list are handled too.
        ret = [convert_to_half(v) for v in arrays]
    elif isinstance(arrays, dict):
        ret = {}
        for k, v in arrays.items():
            if isinstance(v, np.ndarray):
                v = torch.from_numpy(v).half()
            if isinstance(v, dict):
                v = convert_to_half(v)  # was convert_to_tensor, which kept float32
            ret[k] = v
    else:
        raise TypeError(f'convert_to_half does not support type {type(arrays)}.')
    return ret
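
# Illustrative sketch: mixed nesting is converted uniformly to fp16 tensors
# (the keys here are made up):
#
#     stats = {'spec': np.zeros((80, 100), dtype=np.float32),
#              'extra': {'f0': torch.zeros(100)}}
#     stats = convert_to_half(stats)
#     # stats['spec'].dtype == stats['extra']['f0'].dtype == torch.float16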