""" | |
@Author: Haoxi Ran | |
@Date: 01/03/2024 | |
@Citation: Towards Realistic Scene Generation with LiDAR Diffusion Models | |
""" | |
import math | |
from itertools import repeat | |
from typing import List, Tuple, Union | |
import numpy as np | |
import torch | |
from . import build_model, VOXEL_SIZE, MODALITY2MODEL, MODAL2BATCHSIZE, DATASET_CONFIG, AGG_TYPE, NUM_SECTORS, \ | |
TYPE2DATASET, DATA_CONFIG | |

try:
    from torchsparse import SparseTensor, PointTensor
    from torchsparse.utils.collate import sparse_collate_fn

    from .modules.chamfer3D.dist_chamfer_3D import chamfer_3DDist
    from .modules.chamfer2D.dist_chamfer_2D import chamfer_2DDist
    from .modules.emd.emd_module import emdModule
except ImportError:
    print('To install torchsparse 1.4.0, please refer to '
          'https://github.com/mit-han-lab/torchsparse/tree/74099d10a51c71c14318bce63d6421f698b24f24')


def ravel_hash(x: np.ndarray) -> np.ndarray:
    """Hash rows of integer coordinates into unique scalars via row-major raveling."""
    assert x.ndim == 2, x.shape

    # shift to a non-negative range so the uint64 cast is safe
    x = x - np.min(x, axis=0)
    x = x.astype(np.uint64, copy=False)
    xmax = np.max(x, axis=0).astype(np.uint64) + 1

    h = np.zeros(x.shape[0], dtype=np.uint64)
    for k in range(x.shape[1] - 1):
        h += x[:, k]
        h *= xmax[k + 1]
    h += x[:, -1]
    return h
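

# Usage sketch for ravel_hash (hypothetical inputs): rows with identical
# integer coordinates map to the same scalar, which is what sparse_quantize
# relies on below to deduplicate voxels.
#   >>> coords = np.array([[0, 1, 2], [3, 4, 5], [0, 1, 2]])
#   >>> h = ravel_hash(coords)
#   >>> bool(h[0] == h[2])  # True: duplicate rows collide by construction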


def sparse_quantize(coords, voxel_size: Union[float, Tuple[float, ...]] = 1, *, return_index: bool = False,
                    return_inverse: bool = False) -> List[np.ndarray]:
    """
    Modified based on https://github.com/mit-han-lab/torchsparse/blob/462dea4a701f87a7545afb3616bf2cf53dd404f3/torchsparse/utils/quantize.py
    """
    if isinstance(voxel_size, (float, int)):
        voxel_size = tuple(repeat(voxel_size, coords.shape[1]))
    assert isinstance(voxel_size, tuple) and len(voxel_size) in [2, 3]  # support 2D and 3D coordinates only

    voxel_size = np.array(voxel_size)
    coords = np.floor(coords / voxel_size).astype(np.int32)

    _, indices, inverse_indices = np.unique(
        ravel_hash(coords), return_index=True, return_inverse=True
    )
    coords = coords[indices]

    outputs = [coords]
    if return_index:
        outputs += [indices]
    if return_inverse:
        outputs += [inverse_indices]
    return outputs[0] if len(outputs) == 1 else outputs
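

# Usage sketch for sparse_quantize (hypothetical point cloud): quantize a float
# (N, 3) cloud into unique voxels at a 0.05 resolution and keep one
# representative point per voxel via the returned indices.
#   >>> pts = np.random.rand(1000, 3).astype(np.float32)
#   >>> voxels, idx = sparse_quantize(pts, 0.05, return_index=True)
#   >>> unique_pts = pts[idx]  # one surviving point per occupied voxel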


def pcd2range(pcd, size, fov, depth_range, remission=None, labels=None, **kwargs):
    # laser parameters
    fov_up = fov[0] / 180.0 * np.pi  # field of view up in rad
    fov_down = fov[1] / 180.0 * np.pi  # field of view down in rad
    fov_range = abs(fov_down) + abs(fov_up)  # total field of view in rad

    # get depth (distance) of all points
    depth = np.linalg.norm(pcd, 2, axis=1)

    # mask points out of range
    mask = np.logical_and(depth > depth_range[0], depth < depth_range[1])
    depth, pcd = depth[mask], pcd[mask]

    # get scan components
    scan_x, scan_y, scan_z = pcd[:, 0], pcd[:, 1], pcd[:, 2]

    # get angles of all points
    yaw = -np.arctan2(scan_y, scan_x)
    pitch = np.arcsin(scan_z / depth)

    # get projections in image coords
    proj_x = 0.5 * (yaw / np.pi + 1.0)  # in [0.0, 1.0]
    proj_y = 1.0 - (pitch + abs(fov_down)) / fov_range  # in [0.0, 1.0]

    # scale to image size using angular resolution
    proj_x *= size[1]  # in [0.0, W]
    proj_y *= size[0]  # in [0.0, H]

    # round and clamp for use as index
    proj_x = np.maximum(0, np.minimum(size[1] - 1, np.floor(proj_x))).astype(np.int32)  # in [0, W-1]
    proj_y = np.maximum(0, np.minimum(size[0] - 1, np.floor(proj_y))).astype(np.int32)  # in [0, H-1]

    # order in decreasing depth so that closer points overwrite farther ones
    order = np.argsort(depth)[::-1]
    proj_x, proj_y = proj_x[order], proj_y[order]

    # project depth
    depth = depth[order]
    proj_range = np.full(size, -1, dtype=np.float32)
    proj_range[proj_y, proj_x] = depth

    # project point feature
    if remission is not None:
        remission = remission[mask][order]
        proj_feature = np.full(size, -1, dtype=np.float32)
        proj_feature[proj_y, proj_x] = remission
    elif labels is not None:
        labels = labels[mask][order]
        proj_feature = np.full(size, 0, dtype=np.float32)
        proj_feature[proj_y, proj_x] = labels
    else:
        proj_feature = None

    return proj_range, proj_feature
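

# Usage sketch for pcd2range (illustrative 64-beam parameters, not necessarily
# the packaged DATASET_CONFIG values): project a cloud onto a 64x1024 range
# image with a [+3, -25] degree vertical FOV, keeping depths in (1.0, 56.0) m.
#   >>> pcd = (np.random.rand(10000, 3).astype(np.float32) - 0.5) * 50
#   >>> proj_range, _ = pcd2range(pcd, (64, 1024), (3.0, -25.0), (1.0, 56.0))
#   >>> proj_range.shape  # (64, 1024); pixels with no return hold -1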


def range2xyz(range_img, fov, depth_range, depth_scale, log_scale=True, **kwargs):
    # laser parameters
    size = range_img.shape
    fov_up = fov[0] / 180.0 * np.pi  # field of view up in rad
    fov_down = fov[1] / 180.0 * np.pi  # field of view down in rad
    fov_range = abs(fov_down) + abs(fov_up)  # total field of view in rad

    # inverse transform from depth
    if log_scale:
        depth = np.exp2(range_img * depth_scale) - 1
    else:
        depth = range_img

    scan_x, scan_y = np.meshgrid(np.arange(size[1]), np.arange(size[0]))
    scan_x = scan_x.astype(np.float64) / size[1]
    scan_y = scan_y.astype(np.float64) / size[0]

    yaw = np.pi * (scan_x * 2 - 1)
    pitch = (1.0 - scan_y) * fov_range - abs(fov_down)

    xyz = -np.ones((3, *size))
    xyz[0] = np.cos(yaw) * np.cos(pitch) * depth
    xyz[1] = -np.sin(yaw) * np.cos(pitch) * depth
    xyz[2] = np.sin(pitch) * depth

    # mask out invalid points
    mask = np.logical_and(depth > depth_range[0], depth < depth_range[1])
    xyz[:, ~mask] = -1
    return xyz
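

# Usage sketch for range2xyz (same illustrative parameters as above): recover a
# per-pixel XYZ map from a metric range image. log_scale=False because the
# image stores raw depth here, so depth_scale is effectively unused.
#   >>> xyz = range2xyz(proj_range, (3.0, -25.0), (1.0, 56.0), depth_scale=6, log_scale=False)
#   >>> xyz.shape  # (3, 64, 1024); invalid pixels are set to -1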


def pcd2voxel(pcd):
    pcd_voxel = np.round(pcd / VOXEL_SIZE)
    pcd_voxel = pcd_voxel - pcd_voxel.min(0, keepdims=True)
    feat = np.concatenate((pcd, -np.ones((pcd.shape[0], 1))), axis=1)  # -1 for remission placeholder
    _, inds, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
    feat = torch.FloatTensor(feat[inds])
    pcd_voxel = torch.LongTensor(pcd_voxel[inds])
    lidar = SparseTensor(feat, pcd_voxel)
    output = {'lidar': lidar}
    return output
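

# Usage sketch for pcd2voxel (assumes torchsparse 1.4.0 is importable): wrap a
# raw cloud as a SparseTensor sample, then collate samples into a minibatch.
#   >>> sample = pcd2voxel(np.random.rand(4096, 3).astype(np.float32) * 50)
#   >>> batch = sparse_collate_fn([sample])  # {'lidar': batched SparseTensor}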


def pcd2voxel_full(data_type, *args):
    config = DATA_CONFIG[data_type]
    x_range, y_range, z_range = config['x'], config['y'], config['z']
    vol_shape = (math.ceil((x_range[1] - x_range[0]) / VOXEL_SIZE), math.ceil((y_range[1] - y_range[0]) / VOXEL_SIZE),
                 math.ceil((z_range[1] - z_range[0]) / VOXEL_SIZE))
    min_bound = (math.ceil(x_range[0] / VOXEL_SIZE), math.ceil(y_range[0] / VOXEL_SIZE),
                 math.ceil(z_range[0] / VOXEL_SIZE))

    output = tuple()
    for data in args:
        volume_list = []
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask_z = np.logical_and(pcd[:, 2] > z_range[0], pcd[:, 2] < z_range[1])
            mask = mask_x & mask_y & mask_z
            pcd = pcd[mask]

            # voxelize
            pcd_voxel = np.floor(pcd / VOXEL_SIZE)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd_voxel = pcd_voxel[indices]
            pcd_voxel = (pcd_voxel - min_bound).astype(np.int32)

            # 3D occupancy grid
            vol = np.zeros(vol_shape, dtype=np.float32)
            vol[pcd_voxel[:, 0], pcd_voxel[:, 1], pcd_voxel[:, 2]] = 1
            volume_list.append(vol)
        output += (volume_list,)
    return output


# def pcd2bev_full(data_type, *args, voxel_size=VOXEL_SIZE):
#     config = DATA_CONFIG[data_type]
#     x_range, y_range = config['x'], config['y']
#     vol_shape = (math.ceil((x_range[1] - x_range[0]) / voxel_size), math.ceil((y_range[1] - y_range[0]) / voxel_size))
#     min_bound = (math.ceil(x_range[0] / voxel_size), math.ceil(y_range[0] / voxel_size))
#
#     output = tuple()
#     for data in args:
#         volume_list = []
#         for pcd in data:
#             # mask out invalid points
#             mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
#             mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
#             mask = mask_x & mask_y
#             pcd = pcd[mask][:, :2]  # keep x,y coord
#
#             # voxelize
#             pcd_voxel = np.floor(pcd / voxel_size)
#             _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
#             pcd_voxel = pcd_voxel[indices]
#             pcd_voxel = (pcd_voxel - min_bound).astype(np.int32)
#
#             # 2D BEV occupancy grid
#             vol = np.zeros(vol_shape, dtype=np.float32)
#             vol[pcd_voxel[:, 0], pcd_voxel[:, 1]] = 1
#             volume_list.append(vol)
#         output += (volume_list,)
#     return output


def pcd2bev_sum(data_type, *args, voxel_size=VOXEL_SIZE):
    config = DATA_CONFIG[data_type]
    x_range, y_range = config['x'], config['y']
    vol_shape = (math.ceil((x_range[1] - x_range[0]) / voxel_size), math.ceil((y_range[1] - y_range[0]) / voxel_size))
    min_bound = (math.ceil(x_range[0] / voxel_size), math.ceil(y_range[0] / voxel_size))

    output = tuple()
    for data in args:
        volume_sum = np.zeros(vol_shape, np.float32)
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask = mask_x & mask_y
            pcd = pcd[mask][:, :2]  # keep x,y coord

            # voxelize
            pcd_voxel = np.floor(pcd / voxel_size)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd_voxel = pcd_voxel[indices]
            pcd_voxel = (pcd_voxel - min_bound).astype(np.int32)

            # summation
            volume_sum[pcd_voxel[:, 0], pcd_voxel[:, 1]] += 1.
        output += (volume_sum,)
    return output
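

# Usage sketch for pcd2bev_sum (hypothetical clouds; assumes '64' is a valid
# DATA_CONFIG key): accumulate per-cell BEV occupancy counts over a set of
# clouds, e.g. as a histogram for distribution-level comparisons.
#   >>> clouds = [np.random.randn(2048, 3).astype(np.float32) * 20 for _ in range(4)]
#   >>> (hist,) = pcd2bev_sum('64', clouds)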


def pcd2bev_bin(data_type, *args, voxel_size=0.5):
    config = DATA_CONFIG[data_type]
    x_range, y_range = config['x'], config['y']
    vol_shape = (math.ceil((x_range[1] - x_range[0]) / voxel_size), math.ceil((y_range[1] - y_range[0]) / voxel_size))
    min_bound = (math.ceil(x_range[0] / voxel_size), math.ceil(y_range[0] / voxel_size))

    output = tuple()
    for data in args:
        pcd_list = []
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask = mask_x & mask_y
            pcd = pcd[mask][:, :2]  # keep x,y coord

            # voxelize, then normalize voxel coords to [0, 1)
            pcd_voxel = np.floor(pcd / voxel_size)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd_voxel = pcd_voxel[indices]
            pcd_voxel = ((pcd_voxel - min_bound) / vol_shape).astype(np.float32)
            pcd_list.append(pcd_voxel)
        output += (pcd_list,)
    return output


def bev_sample(data_type, *args, voxel_size=0.5):
    config = DATA_CONFIG[data_type]
    x_range, y_range = config['x'], config['y']

    output = tuple()
    for data in args:
        pcd_list = []
        for pcd in data:
            # mask out invalid points
            mask_x = np.logical_and(pcd[:, 0] > x_range[0], pcd[:, 0] < x_range[1])
            mask_y = np.logical_and(pcd[:, 1] > y_range[0], pcd[:, 1] < y_range[1])
            mask = mask_x & mask_y
            pcd = pcd[mask][:, :2]  # keep x,y coord

            # voxelize, keeping one original point per occupied voxel
            pcd_voxel = np.floor(pcd / voxel_size)
            _, indices, inverse_map = sparse_quantize(pcd_voxel, 1, return_index=True, return_inverse=True)
            pcd = pcd[indices]
            pcd_list.append(pcd)
        output += (pcd_list,)
    return output


def preprocess_pcd(pcd, **kwargs):
    depth = np.linalg.norm(pcd, 2, axis=1)
    mask = np.logical_and(depth > kwargs['depth_range'][0], depth < kwargs['depth_range'][1])
    pcd = pcd[mask]
    return pcd


def preprocess_range(pcd, **kwargs):
    depth_img = pcd2range(pcd, **kwargs)[0]
    xyz_img = range2xyz(depth_img, log_scale=False, **kwargs)
    depth_img = depth_img[None]
    img = np.vstack([depth_img, xyz_img])  # (4, H, W): depth + xyz channels
    return img


def batch2list(batch_dict, agg_type='depth', **kwargs):
    """
    Split batched per-point logits into one aggregated feature vector per scene.

    Aggregation type: one of ['all', 'sector', 'depth'] (default: 'depth')
    """
    output_list = []
    batch_indices = batch_dict['batch_indices']
    for b_idx in range(batch_indices.max() + 1):
        # avg over all points
        if agg_type == 'all':
            logits = batch_dict['logits'][batch_indices == b_idx].mean(0)

        # avg over angular sectors
        elif agg_type == 'sector':
            logits = batch_dict['logits'][batch_indices == b_idx]
            coords = batch_dict['coords'][batch_indices == b_idx].float()
            coords = coords - coords.mean(0)
            angle = torch.atan2(coords[:, 1], coords[:, 0])  # [-pi, pi]
            sector_range = torch.linspace(-np.pi - 1e-4, np.pi + 1e-4, NUM_SECTORS + 1)
            logits_list = []
            for i in range(NUM_SECTORS):
                sector_indices = torch.where((angle >= sector_range[i]) & (angle < sector_range[i + 1]))[0]
                sector_logits = logits[sector_indices].mean(0)
                sector_logits = torch.nan_to_num(sector_logits, 0.)  # empty sectors yield NaN means
                logits_list.append(sector_logits)
            logits = torch.cat(logits_list)  # dim: 768

        # avg over depth bins
        elif agg_type == 'depth':
            logits = batch_dict['logits'][batch_indices == b_idx]
            coords = batch_dict['coords'][batch_indices == b_idx].float()
            coords = coords - coords.mean(0)
            bev_depth = torch.norm(coords, dim=-1) * VOXEL_SIZE
            sector_range = torch.linspace(kwargs['depth_range'][0] + 3, kwargs['depth_range'][1], NUM_SECTORS + 1)
            sector_range[0] = 0.
            logits_list = []
            for i in range(NUM_SECTORS):
                sector_indices = torch.where((bev_depth >= sector_range[i]) & (bev_depth < sector_range[i + 1]))[0]
                sector_logits = logits[sector_indices].mean(0)
                sector_logits = torch.nan_to_num(sector_logits, 0.)  # empty bins yield NaN means
                logits_list.append(sector_logits)
            logits = torch.cat(logits_list)  # dim: 768

        else:
            raise NotImplementedError

        output_list.append(logits.detach().cpu().numpy())
    return output_list
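

# Usage sketch for batch2list (synthetic tensors; a feature width of 96 is
# purely illustrative): aggregate per-point logits of one scene with the
# 'all' strategy. 'sector'/'depth' additionally need kwargs['depth_range'].
#   >>> fake = {'logits': torch.randn(100, 96),
#   ...         'coords': torch.randint(0, 50, (100, 3)),
#   ...         'batch_indices': torch.zeros(100, dtype=torch.long)}
#   >>> feats = batch2list(fake, agg_type='all')  # list with one (96,) array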


def compute_logits(data_type, modality, *args):
    assert data_type in ['32', '64']
    assert modality in ['range', 'voxel', 'point_voxel']

    is_voxel = 'voxel' in modality
    dataset_name = TYPE2DATASET[data_type]
    dataset_config = DATASET_CONFIG[dataset_name]
    bs = MODAL2BATCHSIZE[modality]
    model = build_model(dataset_name, MODALITY2MODEL[modality], device='cuda')

    output = tuple()
    for data in args:
        all_logits_list = []
        for i in range(math.ceil(len(data) / bs)):
            batch = data[i * bs:(i + 1) * bs]
            if is_voxel:
                batch = [pcd2voxel(preprocess_pcd(pcd, **dataset_config)) for pcd in batch]
                batch = sparse_collate_fn(batch)
                batch = {k: v.cuda() if isinstance(v, (torch.Tensor, SparseTensor, PointTensor)) else v
                         for k, v in batch.items()}
                with torch.no_grad():
                    batch_out = model(batch, return_final_logits=True)
                batch_out = batch2list(batch_out, AGG_TYPE, **dataset_config)
                all_logits_list.extend(batch_out)
            else:
                batch = [preprocess_range(pcd, **dataset_config) for pcd in batch]
                batch = torch.from_numpy(np.stack(batch)).float().cuda()
                with torch.no_grad():
                    batch_out = model(batch, return_final_logits=True, agg_type=AGG_TYPE)
                all_logits_list.append(batch_out)
        all_logits = np.stack(all_logits_list) if is_voxel else np.vstack(all_logits_list)
        output += (all_logits,)

    del model, batch, batch_out
    torch.cuda.empty_cache()
    return output


def compute_pairwise_cd(x, y, module=None):
    if module is None:
        module = chamfer_3DDist()
    if x.ndim == 2 and y.ndim == 2:
        x, y = x[None], y[None]
    x, y = torch.from_numpy(x).cuda(), torch.from_numpy(y).cuda()
    dist1, dist2, _, _ = module(x, y)
    dist = (dist1.mean() + dist2.mean()) / 2
    return dist.item()
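

# Usage sketch for compute_pairwise_cd (requires the compiled chamfer3D CUDA
# extension and a GPU): symmetric Chamfer distance between two float32 clouds.
#   >>> a = np.random.rand(2048, 3).astype(np.float32)
#   >>> b = np.random.rand(2048, 3).astype(np.float32)
#   >>> cd = compute_pairwise_cd(a, b)  # scalar float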


def compute_pairwise_cd_batch(reference, samples):
    # coordinate width of the clouds: 2 (BEV) or 3 (XYZ); also sizes the padding
    ndim = reference.shape[1]
    assert ndim in [2, 3]
    module = chamfer_3DDist() if ndim == 3 else chamfer_2DDist()

    # pad every cloud to a common length with far-away dummy points (1e6) so the
    # padding never becomes a nearest neighbor of any real point; per-cloud
    # distances are sliced back to the valid prefix below
    len_r, len_s = reference.shape[0], [s.shape[0] for s in samples]
    max_len = max([len_r] + len_s)
    reference = torch.from_numpy(
        np.vstack([reference, np.ones((max_len - reference.shape[0], ndim), dtype=np.float32) * 1e6])).cuda()
    samples = [np.vstack([s, np.ones((max_len - s.shape[0], ndim), dtype=np.float32) * 1e6]) for s in samples]
    samples = torch.from_numpy(np.stack(samples)).cuda()
    reference = reference.expand_as(samples)
    dist_r, dist_s, _, _ = module(reference, samples)

    results = []
    for i in range(samples.shape[0]):
        dist1, dist2, len1, len2 = dist_r[i], dist_s[i], len_r, len_s[i]
        dist = (dist1[:len1].mean() + dist2[:len2].mean()) / 2.
        results.append(dist.item())
    return results
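

# Usage sketch for compute_pairwise_cd_batch (requires the CUDA extensions):
# distances between one reference and several differently sized samples.
#   >>> ref = np.random.rand(2048, 3).astype(np.float32)
#   >>> samples = [np.random.rand(n, 3).astype(np.float32) for n in (1500, 2500)]
#   >>> dists = compute_pairwise_cd_batch(ref, samples)  # list of two floats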


def compute_pairwise_emd(x, y, module=None):
    if module is None:
        module = emdModule()
    # the EMD CUDA kernel expects point counts to be a multiple of 1024
    n_points = min(x.shape[0], y.shape[0])
    n_points = n_points - n_points % 1024
    x, y = x[:n_points], y[:n_points]
    if x.ndim == 2 and y.ndim == 2:
        x, y = x[None], y[None]
    x, y = torch.from_numpy(x).cuda(), torch.from_numpy(y).cuda()
    dist, _ = module(x, y, 0.005, 50)
    dist = torch.sqrt(dist).mean()
    return dist.item()
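

# Usage sketch for compute_pairwise_emd (requires the compiled emd extension):
# both clouds are truncated to a shared multiple of 1024 points before matching.
#   >>> a = np.random.rand(2048, 3).astype(np.float32)
#   >>> b = np.random.rand(3000, 3).astype(np.float32)
#   >>> d = compute_pairwise_emd(a, b)  # scalar float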