# define-hf-demo / vidar/datasets/OuroborosDataset.py
# Author: Jiading Fang — commit fc16538 ("add define")
# TRI-VIDAR - Copyright 2022 Toyota Research Institute. All rights reserved.
import os
import pickle
import numpy as np
from dgp.utils.camera import Camera
from dgp.utils.pose import Pose
from vidar.datasets.BaseDataset import BaseDataset
from vidar.datasets.utils.misc import \
stack_sample, make_relative_pose
from vidar.utils.data import make_list
from vidar.utils.read import read_image
from vidar.utils.types import is_str
def load_from_file(filename, key):
    """Load data cache from a file.

    Parameters
    ----------
    filename : String
        Path to the ``.npz`` cache file
    key : String
        Key under which the value was stored

    Returns
    -------
    data : np.Array or None
        Cached array, or None if a null value was cached (null values are
        stored as zero-dimensional arrays)
    """
    # np.load on an .npz returns an NpzFile that keeps the zip archive open;
    # use it as a context manager so the file handle is always released.
    with np.load(filename, allow_pickle=True) as cache:
        data = cache[key]
    # A cached None comes back as a 0-d object array; restore it as None
    if len(data.shape) == 0:
        data = None
    return data
def save_to_file(filename, key, value):
    """Save data to a cache file.

    Parameters
    ----------
    filename : String
        Path to the ``.npz`` cache file (parent folders are created if needed)
    key : String
        Key under which the value is stored
    value : np.Array
        Value to be cached
    """
    # os.path.dirname returns '' for a bare filename, and os.makedirs('')
    # raises FileNotFoundError — only create folders when one is given.
    folder = os.path.dirname(filename)
    if folder:
        os.makedirs(folder, exist_ok=True)
    np.savez_compressed(filename, **{key: value})
def generate_proj_maps(camera, Xw, shape):
    """Render pointcloud on image.

    Parameters
    ----------
    camera: Camera
        Camera object with appropriately set extrinsics wrt world.
    Xw: np.Array
        3D point cloud (x, y, z) in the world coordinate. [N,3]
    shape: np.Array
        Output depth image shape [H, W]

    Returns
    -------
    depth: np.Array
        Rendered depth image
    """
    assert len(shape) == 2, 'Shape needs to be 2-tuple.'
    height, width = shape
    # Bring the pointcloud from the world (W) into the camera (C) frame
    Xc = camera.p_cw * Xw
    # Per-point depth is the z coordinate in the camera frame
    depths = Xc[:, 2]
    # Project with intrinsics only, since the points are already in frame C
    # (truncation via astype(int) matches the original pixel assignment)
    pixels = Camera(K=camera.K).project(Xc).astype(int)
    # Keep points that land inside the image and lie in front of the camera
    valid = np.logical_and.reduce([
        (pixels >= 0).all(axis=1),
        pixels[:, 0] < width,
        pixels[:, 1] < height,
        depths > 0,
    ])
    pixels, depths = pixels[valid], depths[valid]
    # Scatter depths into an empty image; later points overwrite earlier ones
    proj_depth = np.zeros((height, width), dtype=np.float32)
    proj_depth[pixels[:, 1], pixels[:, 0]] = depths
    # Return projected maps
    return proj_depth
class OuroborosDataset(BaseDataset):
    """
    DGP dataset class

    Parameters
    ----------
    path : String
        Path to the dataset
    split : String {'train', 'val', 'test'}
        Which dataset split to use
    cameras : list[String]
        Which cameras to get information from
    depth_type : String
        Which lidar will be used to generate ground-truth information
    input_depth_type : String
        Which lidar will be used as input to the networks
    with_pose : Bool
        If enabled pose estimates are also returned
    with_extra_context : Bool
        If enabled extra context information (e.g. depth, semantic, instance) are also returned
    back_context : Int
        Size of the backward context
    forward_context : Int
        Size of the forward context
    data_transform : Function
        Transformations applied to the sample
    dataset : String ['synchronized', 'parallel_domain']
        Which dataset will be used
    only_cache : Bool
        Only use cached pointcloud information, without loading the sensor
    """
    def __init__(self, split, tag=None,
                 depth_type=None, input_depth_type=None,
                 masks=None, **kwargs):
        super().__init__(**kwargs)
        self.tag = 'ouroboros' if tag is None else tag

        # Integer camera indexes become canonical DGP datum names
        cameras = [c if is_str(c) else 'camera_%02d' % c for c in self.cameras]

        # Store variables
        self.split = split
        self.dataset_idx = 0
        self.sensors = list(cameras)

        # Store task information
        self.depth_type = depth_type
        self.input_depth_type = input_depth_type
        self.only_cache = False

        self.masks_path = masks

        # Add requested annotations
        requested_annotations = []

        # Add depth sensor (zbuffer depth comes from cached files, not a sensor)
        if self.with_depth and not self.only_cache and \
                self.depth_type != 'zbuffer':
            self.sensors.append(depth_type)
            self.depth_idx = len(self.sensors) - 1

        # Add input depth sensor (skipped when it reuses the GT depth sensor)
        if self.with_input_depth and not self.only_cache and \
                self.input_depth_type != 'zbuffer' and \
                self.input_depth_type != self.depth_type:
            self.sensors.append(input_depth_type)
            self.input_depth_idx = len(self.sensors) - 1

        # Add radar sensor
        if self.with_radar:
            self.sensors.append('radar')
            self.radar_idx = len(self.sensors) - 1

        # Choose which dataset to use (imports are local so the unused
        # backend does not need to be installed)
        if not self.virtual:
            from dgp.datasets.synchronized_dataset import SynchronizedSceneDataset
            dataset = SynchronizedSceneDataset
            extra_args = {}
        else:
            from dgp.datasets.pd_dataset import ParallelDomainSceneDataset
            dataset = ParallelDomainSceneDataset
            extra_args = {
                'use_virtual_camera_datums': False,
            }

        # Initialize chosen dataset
        self.dataset = dataset(
            scene_dataset_json=self.path,
            split=split,
            datum_names=self.sensors,
            backward_context=self.bwd_context,
            forward_context=self.fwd_context,
            requested_annotations=requested_annotations,
            only_annotated_datums=False,
            **extra_args,
        )

    def depth_to_world_points(self, depth, datum_idx):
        """
        Unproject depth from a camera's perspective into a world-frame pointcloud

        Parameters
        ----------
        depth : np.Array
            Depth map to be lifted [H,W]
        datum_idx : Int
            Index of the camera

        Returns
        -------
        pointcloud : np.Array
            Lifted 3D pointcloud [Nx3]
        """
        # Access data
        intrinsics = self.get_current('intrinsics', datum_idx)
        pose = self.get_current('pose', datum_idx)
        # Create pixel grid for 3D unprojection ((u, v) ordering, row-major)
        h, w = depth.shape[:2]
        uv = np.mgrid[:w, :h].transpose(2, 1, 0).reshape(-1, 2).astype(np.float32)
        # Unproject grid to 3D in the camera frame of reference
        pcl = Camera(K=intrinsics).unproject(uv) * depth.reshape(-1, 1)
        # Return pointcloud in world frame of reference
        return pose * pcl

    def create_camera(self, datum_idx, context=None):
        """
        Create current camera

        Parameters
        ----------
        datum_idx : Int
            Index of the camera
        context : Int
            Context value for choosing current of reference information

        Returns
        -------
        camera : Camera
            DGP camera
        """
        camera_pose = self.get_current_or_context('pose', datum_idx, context)
        camera_intrinsics = self.get_current_or_context('intrinsics', datum_idx, context)
        return Camera(K=camera_intrinsics, p_cw=camera_pose.inverse())

    def create_proj_maps(self, filename, camera_idx, depth_idx, depth_type,
                         world_points=None, context=None):
        """
        Creates the depth map for a camera by projecting LiDAR information.
        It also caches the depth map following DGP folder structure, so it's not recalculated

        Parameters
        ----------
        filename : String
            Filename used for loading / saving
        camera_idx : Int
            Camera sensor index
        depth_idx : Int
            Depth sensor index
        depth_type : String
            Which depth type will be loaded
        world_points : np.Array [Nx3]
            Points that will be projected (optional)
        context : Int
            Context value for choosing current of reference information

        Returns
        -------
        depth : np.Array
            Depth map for that datum in that sample [H,W]
        """
        # If we want the z-buffer (simulation)
        if depth_type == 'zbuffer':
            sensor_name = self.get_current('datum_name', camera_idx)
            filename = filename.replace(self.sensors[camera_idx], sensor_name)
            filename = '{}/{}.npz'.format(
                os.path.dirname(self.path), filename.format('depth'))
            # Return only the depth map, consistent with the projected branch
            return np.load(filename)['data']
        # Otherwise, we want projected information
        filename_depth = '{}/{}.npz'.format(
            os.path.dirname(self.path), filename.format('projected/depth/{}'.format(depth_type)))
        # Load and return if exists
        try:
            # Get cached depth map
            return load_from_file(filename_depth, 'depth')
        except Exception:
            # Missing or unreadable cache — recompute below
            pass
        # Calculate world points if needed
        if world_points is None:
            # Get lidar information
            lidar_pose = self.get_current_or_context('pose', depth_idx, context)
            lidar_points = self.get_current_or_context('point_cloud', depth_idx, context)
            world_points = lidar_pose * lidar_points
        # Create camera
        camera = self.create_camera(camera_idx, context)
        # PIL .size is (W, H); reversed to the (H, W) shape convention
        image_shape = self.get_current_or_context('rgb', camera_idx, context).size[::-1]
        # Generate depth maps
        depth = generate_proj_maps(camera, world_points, image_shape)
        # Save depth map
        save_to_file(filename_depth, 'depth', depth)
        # Return depth
        return depth

    def get_current(self, key, sensor_idx, as_dict=False):
        """Return current timestep of a key from a sensor"""
        # The current sample sits right after the backward context entries
        current = self.sample_dgp[self.bwd_context][sensor_idx][key]
        return current if not as_dict else {0: current}

    def get_backward(self, key, sensor_idx):
        """Return backward timesteps of a key from a sensor"""
        return [] if self.bwd_context == 0 else \
            [self.sample_dgp[i][sensor_idx][key] for i in range(0, self.bwd_context)]

    def get_forward(self, key, sensor_idx):
        """Return forward timesteps of a key from a sensor"""
        return [] if self.fwd_context == 0 else \
            [self.sample_dgp[i][sensor_idx][key]
             for i in range(self.bwd_context + 1,
                            self.bwd_context + self.fwd_context + 1)]

    def get_context(self, key, sensor_idx, as_dict=False):
        """Get both backward and forward contexts"""
        context = self.get_backward(key, sensor_idx) + self.get_forward(key, sensor_idx)
        if not as_dict:
            return context
        else:
            # Keyed by the relative context offsets stored in self.context
            return dict(zip(self.context, context))

    def get_current_or_context(self, key, sensor_idx, context=None, as_dict=False):
        """Return current or context information for a given key and sensor index"""
        if context is None:
            return self.get_current(key, sensor_idx, as_dict=as_dict)
        else:
            return self.get_context(key, sensor_idx, as_dict=as_dict)[context]

    def has_dgp_key(self, key, sensor_idx):
        """Returns True if the DGP sample contains a certain key"""
        return key in self.sample_dgp[self.bwd_context][sensor_idx].keys()

    def get_filename(self, sample_idx, datum_idx, context=0):
        """
        Returns the filename for an index, following DGP structure

        Parameters
        ----------
        sample_idx : Int
            Sample index
        datum_idx : Int
            Datum index
        context : Int
            Context offset for the sample

        Returns
        -------
        filename : String
            Filename for the datum in that sample
        """
        scene_idx, sample_idx_in_scene, _ = self.dataset.dataset_item_index[sample_idx]
        scene_dir = self.dataset.scenes[scene_idx].directory
        filename = self.dataset.get_datum(
            scene_idx, sample_idx_in_scene + context, self.sensors[datum_idx]).datum.image.filename
        # 'rgb' is turned into a '{}' placeholder so callers can substitute
        # other modalities (e.g. 'depth'); the extension is stripped
        return os.path.splitext(os.path.join(os.path.basename(scene_dir),
                                             filename.replace('rgb', '{}')))[0]

    def __len__(self):
        """Length of dataset"""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Get dataset sample"""
        # Get DGP sample (if single sensor, make it a list)
        self.sample_dgp = self.dataset[idx]
        self.sample_dgp = [make_list(sample) for sample in self.sample_dgp]

        # Reorganize sensors to the right order
        sensor_names = [self.get_current('datum_name', i).lower() for i in range(len(self.sensors))]
        indexes = [sensor_names.index(v) for v in self.sensors]
        self.sample_dgp = [[s[j] for j in indexes] for s in self.sample_dgp]

        # Loop over all cameras
        samples = []
        for i in range(self.num_cameras):

            # Filename
            filename = self.get_filename(idx, i)

            # Base sample
            sample = {
                'idx': idx,
                'tag': self.tag,
                'filename': self.relative_path({0: filename}),
                'splitname': '%s_%010d' % (self.split, idx),
                'sensor_name': self.get_current('datum_name', i),
            }

            # Image and intrinsics
            sample.update({
                'rgb': self.get_current('rgb', i, as_dict=True),
                'intrinsics': self.get_current('intrinsics', i, as_dict=True),
            })

            # If masks are returned
            if self.masks_path is not None:
                sample.update({
                    'mask': read_image(os.path.join(
                        self.masks_path, '%02d.png' % self.cameras[i]))
                })

            # If depth is returned
            if self.with_depth:
                # Get depth maps
                depth = self.create_proj_maps(
                    filename, i, self.depth_idx, self.depth_type)
                # Include depth map
                sample.update({
                    'depth': {0: depth}
                })

            # If input depth is returned (create_proj_maps returns the depth
            # map directly — no tuple indexing)
            if self.with_input_depth:
                sample.update({
                    'input_depth': {0: self.create_proj_maps(
                        filename, i, self.input_depth_idx, self.input_depth_type)}
                })

            # If pose is returned
            if self.with_pose:
                sample.update({
                    'extrinsics': {key: val.inverse().matrix for key, val in
                                   self.get_current('extrinsics', i, as_dict=True).items()},
                    'pose': {key: val.inverse().matrix for key, val in
                             self.get_current('pose', i, as_dict=True).items()},
                })

            # If context is returned
            if self.with_context:

                # Include context images
                sample['rgb'].update(self.get_context('rgb', i, as_dict=True))

                # Create contexts filenames if extra context is required
                filename_context = []
                for context in range(-self.bwd_context, 0):
                    filename_context.append(self.get_filename(idx, i, context))
                for context in range(1, self.fwd_context + 1):
                    filename_context.append(self.get_filename(idx, i, context))
                sample['filename_context'] = filename_context

                # If context pose is returned
                if self.with_pose:
                    # Get original values to calculate relative motion
                    inv_orig_extrinsics = Pose.from_matrix(sample['extrinsics'][0]).inverse()
                    sample['extrinsics'].update(
                        {key: (inv_orig_extrinsics * val.inverse()).matrix for key, val in zip(
                            self.context, self.get_context('extrinsics', i))})
                    sample['pose'].update(
                        {key: (val.inverse()).matrix for key, val in zip(
                            self.context, self.get_context('pose', i))})

                # If context depth is returned
                if self.with_depth_context:
                    depth_context = [
                        self.create_proj_maps(
                            filename, i, self.depth_idx, self.depth_type,
                            context=k)
                        for k, filename in enumerate(filename_context)]
                    sample['depth'].update(dict(zip(self.context, depth_context)))

            samples.append(sample)

        # Make relative poses
        samples = make_relative_pose(samples)

        # Add LiDAR information
        lidar_sample = {}
        if self.with_lidar:

            # Include pointcloud information
            lidar_sample.update({
                'lidar_pointcloud': self.get_current('point_cloud', self.depth_idx),
            })

            # If pose is included
            if self.with_pose:
                lidar_sample.update({
                    'lidar_extrinsics': self.get_current('extrinsics', self.depth_idx).matrix,
                    'lidar_pose': self.get_current('pose', self.depth_idx).matrix,
                })

            # If extra context is included
            if self.with_extra_context:
                lidar_sample['lidar_context'] = self.get_context('point_cloud', self.depth_idx)

                # If context pose is included
                if self.with_pose:
                    # Get original values to calculate relative motion
                    orig_extrinsics = Pose.from_matrix(lidar_sample['lidar_extrinsics'])
                    orig_pose = Pose.from_matrix(lidar_sample['lidar_pose'])
                    lidar_sample.update({
                        'lidar_extrinsics_context':
                            [(orig_extrinsics.inverse() * extrinsics).inverse().matrix
                             for extrinsics in self.get_context('extrinsics', self.depth_idx)],
                        'lidar_pose_context':
                            [(orig_pose.inverse() * pose).inverse().matrix
                             for pose in self.get_context('pose', self.depth_idx)],
                    })

        # Apply same data transformations for all sensors
        if self.data_transform:
            samples = self.data_transform(samples)
            # lidar_sample = self.data_transform(lidar_sample)

        # Return sample (stacked if necessary)
        return stack_sample(samples, lidar_sample)