# TRI-VIDAR - Copyright 2022 Toyota Research Institute. All rights reserved.

import os
import pickle

import numpy as np

from dgp.utils.camera import Camera
from dgp.utils.pose import Pose

from vidar.datasets.BaseDataset import BaseDataset
from vidar.datasets.utils.misc import \
    stack_sample, make_relative_pose
from vidar.utils.data import make_list
from vidar.utils.read import read_image
from vidar.utils.types import is_str


def load_from_file(filename, key):
    """Load data cache from a file"""
    data = np.load(filename, allow_pickle=True)[key]
    if len(data.shape) == 0:
        data = None
    return data


def save_to_file(filename, key, value):
    """Save data to a cache file"""
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    np.savez_compressed(filename, **{key: value})
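

# A minimal usage sketch for the cache helpers above (illustrative only; the
# path below is hypothetical):
#
#   depth = np.zeros((192, 640), dtype=np.float32)
#   save_to_file('/tmp/vidar_cache/depth_0000000001.npz', 'depth', depth)
#   cached = load_from_file('/tmp/vidar_cache/depth_0000000001.npz', 'depth')
#   assert cached is not None and cached.shape == (192, 640)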


def generate_proj_maps(camera, Xw, shape):
    """Render a pointcloud onto an image plane.

    Parameters
    ----------
    camera: Camera
        Camera object with appropriately set extrinsics wrt world.
    Xw: np.Array
        3D point cloud (x, y, z) in the world coordinate frame [N,3]
    shape: Tuple
        Output depth image shape [H, W]

    Returns
    -------
    depth: np.Array
        Rendered depth image
    """
    assert len(shape) == 2, 'Shape needs to be 2-tuple.'
    # Move the point cloud from the world frame (W) to the camera frame (C)
    Xc = camera.p_cw * Xw
    # Project the points as if they were in the camera's frame of reference
    uv = Camera(K=camera.K).project(Xc).astype(int)
    # Depth of each point in the camera frame
    z_c = Xc[:, 2]
    # Create an empty depth map to populate
    H, W = shape
    proj_depth = np.zeros((H, W), dtype=np.float32)
    # Keep only points that project inside the image and lie in front of the camera
    in_view = np.logical_and.reduce([
        (uv >= 0).all(axis=1), uv[:, 0] < W, uv[:, 1] < H, z_c > 0])
    uv, z_c = uv[in_view], z_c[in_view]
    proj_depth[uv[:, 1], uv[:, 0]] = z_c
    # Return projected depth map
    return proj_depth
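

# Illustrative sketch of generate_proj_maps in isolation, assuming the DGP
# Camera / Pose constructors behave as they are used elsewhere in this file
# (identity pose, synthetic intrinsics, a single 3D point):
#
#   K = np.array([[500., 0., 320.], [0., 500., 240.], [0., 0., 1.]])
#   cam = Camera(K=K, p_cw=Pose())            # camera at the world origin
#   pts = np.array([[0., 0., 10.]])           # one point 10m in front of it
#   depth = generate_proj_maps(cam, pts, (480, 640))
#   # depth[240, 320] is now 10.0; every other pixel stays 0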


class OuroborosDataset(BaseDataset):
    """
    DGP dataset class

    Parameters
    ----------
    path : String
        Path to the dataset
    split : String {'train', 'val', 'test'}
        Which dataset split to use
    tag : String
        Dataset tag, stored with each sample to identify its origin
    cameras : list[String]
        Which cameras to get information from
    depth_type : String
        Which lidar will be used to generate ground-truth information
    input_depth_type : String
        Which lidar will be used as input to the networks
    masks : String
        Path to a folder with per-camera mask images ('%02d.png'), returned with each sample if provided
    with_pose : Bool
        If enabled, pose estimates are also returned
    with_extra_context : Bool
        If enabled, extra context information (e.g. depth, semantic, instance) is also returned
    back_context : Int
        Size of the backward context
    forward_context : Int
        Size of the forward context
    data_transform : Function
        Transformations applied to the sample
    dataset : String ['synchronized', 'parallel_domain']
        Which dataset will be used
    only_cache : Bool
        Only use cached pointcloud information, without loading the sensor
    """
    def __init__(self, split, tag=None,
                 depth_type=None, input_depth_type=None,
                 masks=None, **kwargs):
        super().__init__(**kwargs)
        self.tag = 'ouroboros' if tag is None else tag

        cameras = [c if is_str(c) else 'camera_%02d' % c for c in self.cameras]

        # Store variables
        self.split = split
        self.dataset_idx = 0
        self.sensors = list(cameras)

        # Store task information
        self.depth_type = depth_type
        self.input_depth_type = input_depth_type
        self.only_cache = False

        self.masks_path = masks

        # Add requested annotations
        requested_annotations = []

        # Add depth sensor
        if self.with_depth and not self.only_cache and \
                self.depth_type != 'zbuffer':
            self.sensors.append(depth_type)
            self.depth_idx = len(self.sensors) - 1

        # Add input depth sensor
        if self.with_input_depth and not self.only_cache and \
                self.input_depth_type != 'zbuffer' and \
                self.input_depth_type != self.depth_type:
            self.sensors.append(input_depth_type)
            self.input_depth_idx = len(self.sensors) - 1

        # Add radar sensor
        if self.with_radar:
            self.sensors.append('radar')
            self.radar_idx = len(self.sensors) - 1

        # Choose which dataset to use
        if not self.virtual:
            from dgp.datasets.synchronized_dataset import SynchronizedSceneDataset
            dataset = SynchronizedSceneDataset
            extra_args = {}
        else:
            from dgp.datasets.pd_dataset import ParallelDomainSceneDataset
            dataset = ParallelDomainSceneDataset
            extra_args = {
                'use_virtual_camera_datums': False,
            }

        # Initialize chosen dataset
        self.dataset = dataset(
            scene_dataset_json=self.path,
            split=split,
            datum_names=self.sensors,
            backward_context=self.bwd_context,
            forward_context=self.fwd_context,
            requested_annotations=requested_annotations,
            only_annotated_datums=False,
            **extra_args,
        )
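
    # Hedged construction sketch: besides 'split', 'tag', 'depth_type',
    # 'input_depth_type' and 'masks', all keyword arguments are forwarded to
    # BaseDataset, so the exact names below ('path', 'cameras', 'data_transform')
    # are assumptions about that interface, and the values are illustrative:
    #
    #   dataset = OuroborosDataset(
    #       path='/data/datasets/DDAD/ddad.json', split='train',
    #       cameras=[1], depth_type='lidar', data_transform=None)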
    def depth_to_world_points(self, depth, datum_idx):
        """
        Unproject depth from a camera's perspective into a world-frame pointcloud

        Parameters
        ----------
        depth : np.Array
            Depth map to be lifted [H,W]
        datum_idx : Int
            Index of the camera

        Returns
        -------
        pointcloud : np.Array
            Lifted 3D pointcloud [Nx3]
        """
        # Access data
        intrinsics = self.get_current('intrinsics', datum_idx)
        pose = self.get_current('pose', datum_idx)
        # Create pixel grid for 3D unprojection
        h, w = depth.shape[:2]
        uv = np.mgrid[:w, :h].transpose(2, 1, 0).reshape(-1, 2).astype(np.float32)
        # Unproject grid to 3D in the camera frame of reference
        pcl = Camera(K=intrinsics).unproject(uv) * depth.reshape(-1, 1)
        # Return pointcloud in world frame of reference
        return pose * pcl
    def create_camera(self, datum_idx, context=None):
        """
        Create current camera

        Parameters
        ----------
        datum_idx : Int
            Index of the camera
        context : Int
            Index into the context list; if None, the current (target) frame is used

        Returns
        -------
        camera : Camera
            DGP camera
        """
        camera_pose = self.get_current_or_context('pose', datum_idx, context)
        camera_intrinsics = self.get_current_or_context('intrinsics', datum_idx, context)
        return Camera(K=camera_intrinsics, p_cw=camera_pose.inverse())
    def create_proj_maps(self, filename, camera_idx, depth_idx, depth_type,
                         world_points=None, context=None):
        """
        Creates the depth map for a camera by projecting LiDAR information.
        It also caches the depth map following the DGP folder structure, so it's not recalculated

        Parameters
        ----------
        filename : String
            Filename used for loading / saving
        camera_idx : Int
            Camera sensor index
        depth_idx : Int
            Depth sensor index
        depth_type : String
            Which depth type will be loaded
        world_points : np.Array [Nx3]
            Points that will be projected (optional)
        context : Int
            Index into the context list; if None, the current (target) frame is used

        Returns
        -------
        depth : np.Array
            Depth map for that datum in that sample [H,W]
        """
        # If we want the z-buffer (simulation)
        if depth_type == 'zbuffer':
            sensor_name = self.get_current('datum_name', camera_idx)
            filename = filename.replace(self.sensors[camera_idx], sensor_name)
            filename = '{}/{}.npz'.format(
                os.path.dirname(self.path), filename.format('depth'))
            return np.load(filename)['data']
        # Otherwise, we want projected information
        filename_depth = '{}/{}.npz'.format(
            os.path.dirname(self.path), filename.format('projected/depth/{}'.format(depth_type)))
        # Load and return if a cached depth map exists
        try:
            depth = load_from_file(filename_depth, 'depth')
            if depth is not None:
                return depth
        except (FileNotFoundError, OSError, KeyError):
            pass
        # Calculate world points if needed
        if world_points is None:
            # Get lidar information
            lidar_pose = self.get_current_or_context('pose', depth_idx, context)
            lidar_points = self.get_current_or_context('point_cloud', depth_idx, context)
            world_points = lidar_pose * lidar_points
        # Create camera
        camera = self.create_camera(camera_idx, context)
        image_shape = self.get_current_or_context('rgb', camera_idx, context).size[::-1]
        # Generate depth map
        depth = generate_proj_maps(camera, world_points, image_shape)
        # Cache depth map for future use
        save_to_file(filename_depth, 'depth', depth)
        # Return depth
        return depth
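
    # Note on the cache layout implied by create_proj_maps: for a datum whose RGB
    # file lives at <scene_dir>/rgb/<camera>/<timestamp>.png, the projected depth
    # map is written to
    # <dataset_root>/<scene_dir>/projected/depth/<depth_type>/<camera>/<timestamp>.npz,
    # where <dataset_root> is the directory containing the scene dataset JSON
    # (a reading of the path templates above, not a separate specification).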
    def get_current(self, key, sensor_idx, as_dict=False):
        """Return current timestep of a key from a sensor"""
        current = self.sample_dgp[self.bwd_context][sensor_idx][key]
        return current if not as_dict else {0: current}

    def get_backward(self, key, sensor_idx):
        """Return backward timesteps of a key from a sensor"""
        return [] if self.bwd_context == 0 else \
            [self.sample_dgp[i][sensor_idx][key] for i in range(0, self.bwd_context)]

    def get_forward(self, key, sensor_idx):
        """Return forward timesteps of a key from a sensor"""
        return [] if self.fwd_context == 0 else \
            [self.sample_dgp[i][sensor_idx][key]
             for i in range(self.bwd_context + 1,
                            self.bwd_context + self.fwd_context + 1)]

    def get_context(self, key, sensor_idx, as_dict=False):
        """Get both backward and forward contexts"""
        context = self.get_backward(key, sensor_idx) + self.get_forward(key, sensor_idx)
        if not as_dict:
            return context
        else:
            return {key: val for key, val in zip(self.context, context)}

    def get_current_or_context(self, key, sensor_idx, context=None, as_dict=False):
        """Return current or context information for a given key and sensor index"""
        if context is None:
            return self.get_current(key, sensor_idx, as_dict=as_dict)
        else:
            return self.get_context(key, sensor_idx, as_dict=as_dict)[context]

    def has_dgp_key(self, key, sensor_idx):
        """Returns True if the DGP sample contains a certain key"""
        return key in self.sample_dgp[self.bwd_context][sensor_idx].keys()
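
    # Context indexing convention used by the getters above: self.sample_dgp is
    # ordered [t - bwd_context, ..., t, ..., t + fwd_context], so get_current()
    # reads index self.bwd_context (the target frame), while get_context()
    # returns the backward frames followed by the forward ones, in the same
    # order as self.context.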
    def get_filename(self, sample_idx, datum_idx, context=0):
        """
        Returns the filename for an index, following DGP structure

        Parameters
        ----------
        sample_idx : Int
            Sample index
        datum_idx : Int
            Datum index
        context : Int
            Context offset for the sample

        Returns
        -------
        filename : String
            Filename for the datum in that sample
        """
        scene_idx, sample_idx_in_scene, _ = self.dataset.dataset_item_index[sample_idx]
        scene_dir = self.dataset.scenes[scene_idx].directory
        filename = self.dataset.get_datum(
            scene_idx, sample_idx_in_scene + context, self.sensors[datum_idx]).datum.image.filename
        return os.path.splitext(os.path.join(os.path.basename(scene_dir),
                                             filename.replace('rgb', '{}')))[0]

    def __len__(self):
        """Length of dataset"""
        return len(self.dataset)
    def __getitem__(self, idx):
        """Get dataset sample"""

        # Get DGP sample (if single sensor, make it a list)
        self.sample_dgp = self.dataset[idx]
        self.sample_dgp = [make_list(sample) for sample in self.sample_dgp]

        # Reorganize sensors to the right order
        sensor_names = [self.get_current('datum_name', i).lower() for i in range(len(self.sensors))]
        indexes = [sensor_names.index(v) for v in self.sensors]
        self.sample_dgp = [[s[idx] for idx in indexes] for s in self.sample_dgp]

        # Loop over all cameras
        samples = []
        for i in range(self.num_cameras):

            # Filename
            filename = self.get_filename(idx, i)

            # Base sample
            sample = {
                'idx': idx,
                'tag': self.tag,
                'filename': self.relative_path({0: filename}),
                'splitname': '%s_%010d' % (self.split, idx),
                'sensor_name': self.get_current('datum_name', i),
            }

            # Image and intrinsics
            sample.update({
                'rgb': self.get_current('rgb', i, as_dict=True),
                'intrinsics': self.get_current('intrinsics', i, as_dict=True),
            })

            # If masks are returned
            if self.masks_path is not None:
                sample.update({
                    'mask': read_image(os.path.join(
                        self.masks_path, '%02d.png' % self.cameras[i]))
                })
            # If depth is returned
            if self.with_depth:
                # Get depth map
                depth = self.create_proj_maps(
                    filename, i, self.depth_idx, self.depth_type)
                # Include depth map
                sample.update({
                    'depth': {0: depth}
                })

            # If input depth is returned
            if self.with_input_depth:
                sample.update({
                    'input_depth': {0: self.create_proj_maps(
                        filename, i, self.input_depth_idx, self.input_depth_type)}
                })
            # If pose is returned
            if self.with_pose:
                sample.update({
                    'extrinsics': {key: val.inverse().matrix for key, val in
                                   self.get_current('extrinsics', i, as_dict=True).items()},
                    'pose': {key: val.inverse().matrix for key, val in
                             self.get_current('pose', i, as_dict=True).items()},
                })

            # If context is returned
            if self.with_context:

                # Include context images
                sample['rgb'].update(self.get_context('rgb', i, as_dict=True))

                # Create context filenames if extra context is required
                filename_context = []
                for context in range(-self.bwd_context, 0):
                    filename_context.append(self.get_filename(idx, i, context))
                for context in range(1, self.fwd_context + 1):
                    filename_context.append(self.get_filename(idx, i, context))
                sample['filename_context'] = filename_context

                # If context pose is returned
                if self.with_pose:
                    # Get original values to calculate relative motion
                    inv_orig_extrinsics = Pose.from_matrix(sample['extrinsics'][0]).inverse()
                    sample['extrinsics'].update(
                        {key: (inv_orig_extrinsics * val.inverse()).matrix for key, val in zip(
                            self.context, self.get_context('extrinsics', i))})
                    sample['pose'].update(
                        {key: val.inverse().matrix for key, val in zip(
                            self.context, self.get_context('pose', i))})

                # If context depth is returned
                if self.with_depth_context:
                    depth_context = [
                        self.create_proj_maps(
                            filename, i, self.depth_idx, self.depth_type,
                            context=k)
                        for k, filename in enumerate(filename_context)]
                    sample['depth'].update(
                        {key: val for key, val in zip(
                            self.context, depth_context)})

            samples.append(sample)
        # Make relative poses
        samples = make_relative_pose(samples)

        # Add LiDAR information
        lidar_sample = {}
        if self.with_lidar:

            # Include pointcloud information
            lidar_sample.update({
                'lidar_pointcloud': self.get_current('point_cloud', self.depth_idx),
            })

            # If pose is included
            if self.with_pose:
                lidar_sample.update({
                    'lidar_extrinsics': self.get_current('extrinsics', self.depth_idx).matrix,
                    'lidar_pose': self.get_current('pose', self.depth_idx).matrix,
                })

            # If extra context is included
            if self.with_extra_context:
                lidar_sample['lidar_context'] = self.get_context('point_cloud', self.depth_idx)

                # If context pose is included
                if self.with_pose:
                    # Get original values to calculate relative motion
                    orig_extrinsics = Pose.from_matrix(lidar_sample['lidar_extrinsics'])
                    orig_pose = Pose.from_matrix(lidar_sample['lidar_pose'])
                    lidar_sample.update({
                        'lidar_extrinsics_context':
                            [(orig_extrinsics.inverse() * extrinsics).inverse().matrix
                             for extrinsics in self.get_context('extrinsics', self.depth_idx)],
                        'lidar_pose_context':
                            [(orig_pose.inverse() * pose).inverse().matrix
                             for pose in self.get_context('pose', self.depth_idx)],
                    })

        # Apply same data transformations for all sensors
        if self.data_transform:
            samples = self.data_transform(samples)
            # lidar_sample = self.data_transform(lidar_sample)

        # Return sample (stacked if necessary)
        return stack_sample(samples, lidar_sample)
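

# Hedged end-to-end sketch (assuming a dataset built as in the construction
# sketch above). Each per-camera sample produced by __getitem__ stores values in
# dictionaries keyed by temporal context, with 0 for the target frame and
# negative / positive keys for backward / forward context, before everything is
# stacked across cameras by stack_sample (the exact stacked layout depends on
# that helper):
#
#   sample = dataset[0]
#   rgb_target = sample['rgb'][0]        # target image
#   depth_target = sample['depth'][0]    # only present when with_depth is True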