# lidm/eval/metric_utils.py
"""
@Author: Haoxi Ran
@Date: 01/03/2024
@Citation: Towards Realistic Scene Generation with LiDAR Diffusion Models
"""
import math
from itertools import repeat
from typing import List, Tuple, Union
import numpy as np
import torch
from . import build_model, VOXEL_SIZE, MODALITY2MODEL, MODAL2BATCHSIZE, DATASET_CONFIG, AGG_TYPE, NUM_SECTORS, \
    TYPE2DATASET, DATA_CONFIG
try:
    from torchsparse import SparseTensor, PointTensor
    from torchsparse.utils.collate import sparse_collate_fn
    from .modules.chamfer3D.dist_chamfer_3D import chamfer_3DDist
    from .modules.chamfer2D.dist_chamfer_2D import chamfer_2DDist
    from .modules.emd.emd_module import emdModule
except ImportError:
    print('To install torchsparse 1.4.0, please refer to '
          'https://github.com/mit-han-lab/torchsparse/tree/74099d10a51c71c14318bce63d6421f698b24f24')


def ravel_hash(x: np.ndarray) -> np.ndarray:
    assert x.ndim == 2, x.shape

    x = x - np.min(x, axis=0)
    x = x.astype(np.uint64, copy=False)
    xmax = np.max(x, axis=0).astype(np.uint64) + 1

    h = np.zeros(x.shape[0], dtype=np.uint64)
    for k in range(x.shape[1] - 1):
        h += x[:, k]
        h *= xmax[k + 1]
    h += x[:, -1]
    return h
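
# Usage sketch (illustrative only, not called in this module): ravel_hash flattens
# integer coordinate rows into scalar keys, row-major style, e.g. for 3 columns it
# computes (x * ymax + y) * zmax + z after shifting all coordinates to be
# non-negative, so identical rows share a key:
#
#   coords = np.array([[0, 0, 1], [2, 1, 0], [0, 0, 1]])
#   keys = ravel_hash(coords)
#   assert keys[0] == keys[2] and keys[0] != keys[1]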


def sparse_quantize(coords, voxel_size: Union[float, Tuple[float, ...]] = 1, *, return_index: bool = False,
                    return_inverse: bool = False) -> List[np.ndarray]:
    """
    Modified based on https://github.com/mit-han-lab/torchsparse/blob/462dea4a701f87a7545afb3616bf2cf53dd404f3/torchsparse/utils/quantize.py
    """
    if isinstance(voxel_size, (float, int)):
        voxel_size = tuple(repeat(voxel_size, coords.shape[1]))
    assert isinstance(voxel_size, tuple) and len(voxel_size) in [2, 3]  # support 2D and 3D coordinates only

    voxel_size = np.array(voxel_size)
    coords = np.floor(coords / voxel_size).astype(np.int32)

    _, indices, inverse_indices = np.unique(
        ravel_hash(coords), return_index=True, return_inverse=True
    )
    coords = coords[indices]

    outputs = [coords]
    if return_index:
        outputs += [indices]
    if return_inverse:
        outputs += [inverse_indices]
    return outputs[0] if len(outputs) == 1 else outputs
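
# Usage sketch (illustrative only): deduplicate a hypothetical random cloud at a
# 0.05 m resolution; 'inv' maps every input point back to the row of its voxel.
#
#   pts = np.random.rand(10000, 3) * 50.0
#   coords, inds, inv = sparse_quantize(pts, 0.05, return_index=True, return_inverse=True)
#   assert coords.shape[0] == inds.shape[0] and inv.shape[0] == pts.shape[0]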


def pcd2range(pcd, size, fov, depth_range, remission=None, labels=None, **kwargs):
    # laser parameters
    fov_up = fov[0] / 180.0 * np.pi  # field of view up in rad
    fov_down = fov[1] / 180.0 * np.pi  # field of view down in rad
    fov_range = abs(fov_down) + abs(fov_up)  # get field of view total in rad

    # get depth (distance) of all points
    depth = np.linalg.norm(pcd, 2, axis=1)

    # mask points out of range
    mask = np.logical_and(depth > depth_range[0], depth < depth_range[1])
    depth, pcd = depth[mask], pcd[mask]

    # get scan components
    scan_x, scan_y, scan_z = pcd[:, 0], pcd[:, 1], pcd[:, 2]

    # get angles of all points
    yaw = -np.arctan2(scan_y, scan_x)
    pitch = np.arcsin(scan_z / depth)

    # get projections in image coords
    proj_x = 0.5 * (yaw / np.pi + 1.0)  # in [0.0, 1.0]
    proj_y = 1.0 - (pitch + abs(fov_down)) / fov_range  # in [0.0, 1.0]

    # scale to image size using angular resolution
    proj_x *= size[1]  # in [0.0, W]
    proj_y *= size[0]  # in [0.0, H]

    # round and clamp for use as index
    proj_x = np.maximum(0, np.minimum(size[1] - 1, np.floor(proj_x))).astype(np.int32)  # in [0, W-1]
    proj_y = np.maximum(0, np.minimum(size[0] - 1, np.floor(proj_y))).astype(np.int32)  # in [0, H-1]

    # order in decreasing depth
    order = np.argsort(depth)[::-1]
    proj_x, proj_y = proj_x[order], proj_y[order]

    # project depth
    depth = depth[order]
    proj_range = np.full(size, -1, dtype=np.float32)
    proj_range[proj_y, proj_x] = depth

    # project point feature
    if remission is not None:
        remission = remission[mask][order]
        proj_feature = np.full(size, -1, dtype=np.float32)
        proj_feature[proj_y, proj_x] = remission
    elif labels is not None:
        labels = labels[mask][order]
        proj_feature = np.full(size, 0, dtype=np.float32)
        proj_feature[proj_y, proj_x] = labels
    else:
        proj_feature = None

    return proj_range, proj_feature
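
# Usage sketch (illustrative only): project a cloud into a 64 x 1024 range image,
# assuming a hypothetical KITTI-like vertical field of view of +3 to -25 degrees.
# Pixels that receive no point keep the fill value -1.
#
#   proj_range, _ = pcd2range(pcd, (64, 1024), fov=(3.0, -25.0), depth_range=(1.0, 56.0))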


def range2xyz(range_img, fov, depth_range, depth_scale, log_scale=True, **kwargs):
    # laser parameters
    size = range_img.shape
    fov_up = fov[0] / 180.0 * np.pi  # field of view up in rad
    fov_down = fov[1] / 180.0 * np.pi  # field of view down in rad
    fov_range = abs(fov_down) + abs(fov_up)  # get field of view total in rad

    # inverse transform from depth
    if log_scale:
        depth = (np.exp2(range_img * depth_scale) - 1)
    else:
        depth = range_img

    scan_x, scan_y = np.meshgrid(np.arange(size[1]), np.arange(size[0]))
    scan_x = scan_x.astype(np.float64) / size[1]
    scan_y = scan_y.astype(np.float64) / size[0]

    yaw = np.pi * (scan_x * 2 - 1)
    pitch = (1.0 - scan_y) * fov_range - abs(fov_down)

    xyz = -np.ones((3, *size))
    xyz[0] = np.cos(yaw) * np.cos(pitch) * depth
    xyz[1] = -np.sin(yaw) * np.cos(pitch) * depth
    xyz[2] = np.sin(pitch) * depth

    # mask out invalid points
    mask = np.logical_and(depth > depth_range[0], depth < depth_range[1])
    xyz[:, ~mask] = -1
    return xyz
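
# Round-trip sketch (illustrative only): a raw-depth image from pcd2range can be
# back-projected to a (3, H, W) xyz image; depth_scale is only used when
# log_scale=True, so it can be left as None here.
#
#   xyz = range2xyz(proj_range, fov=(3.0, -25.0), depth_range=(1.0, 56.0),
#                   depth_scale=None, log_scale=False)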


def pcd2voxel(pcd):
    pcd_voxel = np.round(pcd / VOXEL_SIZE)
    pcd_voxel = pcd_voxel - pcd_voxel.min(0, keepdims=True)
    feat = np.concatenate((pcd, -np.ones((pcd.shape[0], 1))), axis=1)  # -1 for remission placeholder
    _, inds, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
    feat = torch.FloatTensor(feat[inds])
    pcd_voxel = torch.LongTensor(pcd_voxel[inds])
    lidar = SparseTensor(feat, pcd_voxel)
    output = {'lidar': lidar}
    return output
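
# Note (illustrative): the returned {'lidar': SparseTensor} dict is shaped so that a
# list of them can be batched directly with torchsparse's sparse_collate_fn, as done
# in compute_logits below:
#
#   batch = sparse_collate_fn([pcd2voxel(p) for p in clouds])  # 'clouds' is hypothetical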


def pcd2voxel_full(data_type, *args):
    config = DATA_CONFIG[data_type]
    x_range, y_range, z_range = config['x'], config['y'], config['z']
    vol_shape = (math.ceil((x_range[1] - x_range[0]) / VOXEL_SIZE),
                 math.ceil((y_range[1] - y_range[0]) / VOXEL_SIZE),
                 math.ceil((z_range[1] - z_range[0]) / VOXEL_SIZE))
    min_bound = (math.ceil(x_range[0] / VOXEL_SIZE),
                 math.ceil(y_range[0] / VOXEL_SIZE),
                 math.ceil(z_range[0] / VOXEL_SIZE))

    output = tuple()
    for data in args:
        volume_list = []
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask_z = np.logical_and(pcd[:, 2] > z_range[0], pcd[:, 2] < z_range[1])
            mask = mask_x & mask_y & mask_z
            pcd = pcd[mask]

            # voxelize
            pcd_voxel = np.floor(pcd / VOXEL_SIZE)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd_voxel = pcd_voxel[indices]
            pcd_voxel = (pcd_voxel - min_bound).astype(np.int32)

            # 3D occupancy grid
            vol = np.zeros(vol_shape, dtype=np.float32)
            vol[pcd_voxel[:, 0], pcd_voxel[:, 1], pcd_voxel[:, 2]] = 1
            volume_list.append(vol)
        output += (volume_list,)
    return output


# def pcd2bev_full(data_type, *args, voxel_size=VOXEL_SIZE):
#     config = DATA_CONFIG[data_type]
#     x_range, y_range = config['x'], config['y']
#     vol_shape = (math.ceil((x_range[1] - x_range[0]) / voxel_size), math.ceil((y_range[1] - y_range[0]) / voxel_size))
#     min_bound = (math.ceil(x_range[0] / voxel_size), math.ceil(y_range[0] / voxel_size))
#
#     output = tuple()
#     for data in args:
#         volume_list = []
#         for pcd in data:
#             # mask out invalid points
#             mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
#             mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
#             mask = mask_x & mask_y
#             pcd = pcd[mask][:, :2]  # keep x,y coord
#
#             # voxelize
#             pcd_voxel = np.floor(pcd / voxel_size)
#             _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
#             pcd_voxel = pcd_voxel[indices]
#             pcd_voxel = (pcd_voxel - min_bound).astype(np.int32)
#
#             # 2D bev grid
#             vol = np.zeros(vol_shape, dtype=np.float32)
#             vol[pcd_voxel[:, 0], pcd_voxel[:, 1]] = 1
#             volume_list.append(vol)
#         output += (volume_list,)
#     return output


def pcd2bev_sum(data_type, *args, voxel_size=VOXEL_SIZE):
    config = DATA_CONFIG[data_type]
    x_range, y_range = config['x'], config['y']
    vol_shape = (math.ceil((x_range[1] - x_range[0]) / voxel_size), math.ceil((y_range[1] - y_range[0]) / voxel_size))
    min_bound = (math.ceil(x_range[0] / voxel_size), math.ceil(y_range[0] / voxel_size))

    output = tuple()
    for data in args:
        volume_sum = np.zeros(vol_shape, np.float32)
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask = mask_x & mask_y
            pcd = pcd[mask][:, :2]  # keep x,y coord

            # voxelize
            pcd_voxel = np.floor(pcd / voxel_size)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd_voxel = pcd_voxel[indices]
            pcd_voxel = (pcd_voxel - min_bound).astype(np.int32)

            # summation
            volume_sum[pcd_voxel[:, 0], pcd_voxel[:, 1]] += 1.
        output += (volume_sum,)
    return output
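
# Usage sketch (illustrative only): build one accumulated BEV histogram per sample
# set; each returned grid counts, per cell, how many clouds in that set occupy it.
#
#   hist_a, hist_b = pcd2bev_sum(data_type, clouds_a, clouds_b)  # argument names hypothetical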


def pcd2bev_bin(data_type, *args, voxel_size=0.5):
    config = DATA_CONFIG[data_type]
    x_range, y_range = config['x'], config['y']
    vol_shape = (math.ceil((x_range[1] - x_range[0]) / voxel_size), math.ceil((y_range[1] - y_range[0]) / voxel_size))
    min_bound = (math.ceil(x_range[0] / voxel_size), math.ceil(y_range[0] / voxel_size))

    output = tuple()
    for data in args:
        pcd_list = []
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask = mask_x & mask_y
            pcd = pcd[mask][:, :2]  # keep x,y coord

            # voxelize
            pcd_voxel = np.floor(pcd / voxel_size)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd_voxel = pcd_voxel[indices]
            pcd_voxel = ((pcd_voxel - min_bound) / vol_shape).astype(np.float32)
            pcd_list.append(pcd_voxel)
        output += (pcd_list,)
    return output


def bev_sample(data_type, *args, voxel_size=0.5):
    config = DATA_CONFIG[data_type]
    x_range, y_range = config['x'], config['y']

    output = tuple()
    for data in args:
        pcd_list = []
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask = mask_x & mask_y
            pcd = pcd[mask][:, :2]  # keep x,y coord

            # voxelize
            pcd_voxel = np.floor(pcd / voxel_size)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd = pcd[indices]
            pcd_list.append(pcd)
        output += (pcd_list,)
    return output


def preprocess_pcd(pcd, **kwargs):
    depth = np.linalg.norm(pcd, 2, axis=1)
    mask = np.logical_and(depth > kwargs['depth_range'][0], depth < kwargs['depth_range'][1])
    pcd = pcd[mask]
    return pcd


def preprocess_range(pcd, **kwargs):
    depth_img = pcd2range(pcd, **kwargs)[0]
    xyz_img = range2xyz(depth_img, log_scale=False, **kwargs)
    depth_img = depth_img[None]
    img = np.vstack([depth_img, xyz_img])
    return img


def batch2list(batch_dict, agg_type='depth', **kwargs):
    """
    Split batched per-point logits into one aggregated feature vector per sample.
    Aggregation type: one of ['all', 'sector', 'depth'] (default: 'depth').
    """
    output_list = []
    batch_indices = batch_dict['batch_indices']
    for b_idx in range(batch_indices.max() + 1):
        # avg all
        if agg_type == 'all':
            logits = batch_dict['logits'][batch_indices == b_idx].mean(0)
        # avg on sectors
        elif agg_type == 'sector':
            logits = batch_dict['logits'][batch_indices == b_idx]
            coords = batch_dict['coords'][batch_indices == b_idx].float()
            coords = coords - coords.mean(0)
            angle = torch.atan2(coords[:, 1], coords[:, 0])  # in [-pi, pi]
            sector_range = torch.linspace(-np.pi - 1e-4, np.pi + 1e-4, NUM_SECTORS + 1)
            logits_list = []
            for i in range(NUM_SECTORS):
                sector_indices = torch.where((angle >= sector_range[i]) & (angle < sector_range[i + 1]))[0]
                sector_logits = logits[sector_indices].mean(0)
                sector_logits = torch.nan_to_num(sector_logits, 0.)
                logits_list.append(sector_logits)
            logits = torch.cat(logits_list)  # dim: 768
        # avg by depth
        elif agg_type == 'depth':
            logits = batch_dict['logits'][batch_indices == b_idx]
            coords = batch_dict['coords'][batch_indices == b_idx].float()
            coords = coords - coords.mean(0)
            bev_depth = torch.norm(coords, dim=-1) * VOXEL_SIZE
            sector_range = torch.linspace(kwargs['depth_range'][0] + 3, kwargs['depth_range'][1], NUM_SECTORS + 1)
            sector_range[0] = 0.
            logits_list = []
            for i in range(NUM_SECTORS):
                sector_indices = torch.where((bev_depth >= sector_range[i]) & (bev_depth < sector_range[i + 1]))[0]
                sector_logits = logits[sector_indices].mean(0)
                sector_logits = torch.nan_to_num(sector_logits, 0.)
                logits_list.append(sector_logits)
            logits = torch.cat(logits_list)  # dim: 768
        else:
            raise NotImplementedError
        output_list.append(logits.detach().cpu().numpy())
    return output_list
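
# Note (illustrative): in the 'sector' and 'depth' modes each sample yields a
# concatenation of NUM_SECTORS per-region means, so with per-point logits of width D
# the output vector has NUM_SECTORS * D entries (the "dim: 768" comments above
# presumably reflect the shipped NUM_SECTORS and D; this is an assumption).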


def compute_logits(data_type, modality, *args):
    assert data_type in ['32', '64']
    assert modality in ['range', 'voxel', 'point_voxel']
    is_voxel = 'voxel' in modality

    dataset_name = TYPE2DATASET[data_type]
    dataset_config = DATASET_CONFIG[dataset_name]
    bs = MODAL2BATCHSIZE[modality]
    model = build_model(dataset_name, MODALITY2MODEL[modality], device='cuda')

    output = tuple()
    for data in args:
        all_logits_list = []
        for i in range(math.ceil(len(data) / bs)):
            batch = data[i * bs:(i + 1) * bs]
            if is_voxel:
                batch = [pcd2voxel(preprocess_pcd(pcd, **dataset_config)) for pcd in batch]
                batch = sparse_collate_fn(batch)
                batch = {k: v.cuda() if isinstance(v, (torch.Tensor, SparseTensor, PointTensor)) else v
                         for k, v in batch.items()}
                with torch.no_grad():
                    batch_out = model(batch, return_final_logits=True)
                batch_out = batch2list(batch_out, AGG_TYPE, **dataset_config)
                all_logits_list.extend(batch_out)
            else:
                batch = [preprocess_range(pcd, **dataset_config) for pcd in batch]
                batch = torch.from_numpy(np.stack(batch)).float().cuda()
                with torch.no_grad():
                    batch_out = model(batch, return_final_logits=True, agg_type=AGG_TYPE)
                all_logits_list.append(batch_out)
        if is_voxel:
            all_logits = np.stack(all_logits_list)
        else:
            all_logits = np.vstack(all_logits_list)
        output += (all_logits,)

    del model, batch, batch_out
    torch.cuda.empty_cache()
    return output
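
# Usage sketch (illustrative only): extract perceptual features for two sample sets,
# e.g. before computing a distribution metric over them. Variable names are hypothetical.
#
#   feats_ref, feats_gen = compute_logits('64', 'range', ref_clouds, gen_clouds)
#   # feats_* are (N, D) numpy arrays, one row per input point cloud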


def compute_pairwise_cd(x, y, module=None):
    if module is None:
        module = chamfer_3DDist()
    if x.ndim == 2 and y.ndim == 2:
        x, y = x[None], y[None]
    x, y = torch.from_numpy(x).cuda(), torch.from_numpy(y).cuda()
    dist1, dist2, _, _ = module(x, y)
    dist = (dist1.mean() + dist2.mean()) / 2
    return dist.item()
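
# Usage sketch (illustrative only): symmetric Chamfer distance between two float32
# clouds; passing a pre-built module avoids re-instantiating it across many pairs.
#
#   module = chamfer_3DDist()
#   d = compute_pairwise_cd(pcd_a.astype(np.float32), pcd_b.astype(np.float32), module)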


def compute_pairwise_cd_batch(reference, samples):
    # point dimensionality (2 for BEV, 3 for xyz); note reference is an (N, ndim) array,
    # so reference.shape[-1] (not reference.ndim, which is always 2 here) selects the module
    ndim = reference.shape[-1]
    assert ndim in [2, 3]
    module = chamfer_3DDist() if ndim == 3 else chamfer_2DDist()

    # pad all clouds to a common length so they can be stacked into one batch
    len_r, len_s = reference.shape[0], [s.shape[0] for s in samples]
    max_len = max([len_r] + len_s)
    reference = torch.from_numpy(
        np.vstack([reference, np.ones((max_len - reference.shape[0], ndim), dtype=np.float32) * 1e6])).cuda()
    samples = [np.vstack([s, np.ones((max_len - s.shape[0], ndim), dtype=np.float32) * 1e6]) for s in samples]
    samples = torch.from_numpy(np.stack(samples)).cuda()
    reference = reference.expand_as(samples)

    dist_r, dist_s, _, _ = module(reference, samples)
    results = []
    for i in range(samples.shape[0]):
        dist1, dist2, len1, len2 = dist_r[i], dist_s[i], len_r, len_s[i]
        dist = (dist1[:len1].mean() + dist2[:len2].mean()) / 2.
        results.append(dist.item())
    return results
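
# Note (illustrative): clouds of unequal size are padded with far-away points at 1e6
# so they stack into one batch; slicing dist1[:len1] / dist2[:len2] afterwards drops
# the padded rows from each mean, so the padding does not bias the result.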


def compute_pairwise_emd(x, y, module=None):
    if module is None:
        module = emdModule()
    n_points = min(x.shape[0], y.shape[0])
    n_points = n_points - n_points % 1024
    x, y = x[:n_points], y[:n_points]
    if x.ndim == 2 and y.ndim == 2:
        x, y = x[None], y[None]
    x, y = torch.from_numpy(x).cuda(), torch.from_numpy(y).cuda()
    dist, _ = module(x, y, 0.005, 50)
    dist = torch.sqrt(dist).mean()
    return dist.item()
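
# Note (illustrative): the truncation above suggests this EMD implementation expects
# point counts in multiples of 1024; 0.005 and 50 are passed as the solver's eps and
# iteration budget (an assumption based on common auction-based EMD modules).
#
#   d = compute_pairwise_emd(pcd_a.astype(np.float32), pcd_b.astype(np.float32))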