import glob
import os
import pickle
import numpy as np
import yaml
from PIL import Image
import xml.etree.ElementTree as ET

from lidm.data.base import DatasetBase
from .annotated_dataset import Annotated3DObjectsDataset
from .conditional_builder.utils import corners_3d_to_2d
from .helper_types import Annotation
from ..utils.lidar_utils import pcd2range, pcd2coord2d, range2pcd

# TODO add annotation categories and semantic categories
CATEGORIES = ['ignore', 'car', 'bicycle', 'motorcycle', 'truck', 'other-vehicle', 'person', 'bicyclist',
              'motorcyclist', 'road', 'parking', 'sidewalk', 'other-ground', 'building', 'fence', 'vegetation',
              'trunk', 'terrain', 'pole', 'traffic-sign']
CATE2LABEL = {k: v for v, k in enumerate(CATEGORIES)}  # 0: ignore/invalid, 1~19: valid categories
LABEL2RGB = np.array([(0, 0, 0), (0, 0, 142), (119, 11, 32), (0, 0, 230), (0, 0, 70), (0, 0, 90), (220, 20, 60),
                      (255, 0, 0), (0, 0, 110), (128, 64, 128), (250, 170, 160), (244, 35, 232), (230, 150, 140),
                      (70, 70, 70), (190, 153, 153), (107, 142, 35), (0, 80, 100), (230, 150, 140), (153, 153, 153),
                      (220, 220, 0)])
CAMERAS = ['CAM_FRONT']
BBOX_CATS = ['car', 'people', 'cycle']
BBOX_CAT2LABEL = {'car': 0, 'truck': 0, 'bus': 0, 'caravan': 0, 'person': 1, 'rider': 2, 'motorcycle': 2,
                  'bicycle': 2}

# train + test
SEM_KITTI_TRAIN_SET = ['00', '01', '02', '03', '04', '05', '06', '07', '09', '10']
KITTI_TRAIN_SET = SEM_KITTI_TRAIN_SET + ['11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21']
KITTI360_TRAIN_SET = ['00', '02', '04', '05', '06', '07', '09', '10'] + ['08']  # partial test data at '02' sequence
CAM_KITTI360_TRAIN_SET = ['00', '04', '05', '06', '07', '08', '09', '10']  # cam mismatch lidar in '02'

# validation
SEM_KITTI_VAL_SET = KITTI_VAL_SET = ['08']
CAM_KITTI360_VAL_SET = KITTI360_VAL_SET = ['03']


class KITTIBase(DatasetBase):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.dataset_name = 'kitti'
        self.num_sem_cats = kwargs['dataset_config'].num_sem_cats + 1

    @staticmethod
    def load_lidar_sweep(path):
        scan = np.fromfile(path, dtype=np.float32)
        scan = scan.reshape((-1, 4))
        points = scan[:, 0:3]  # get xyz
        return points

    def load_semantic_map(self, path, pcd):
        raise NotImplementedError

    def load_camera(self, path):
        raise NotImplementedError

    def __getitem__(self, idx):
        example = dict()
        data_path = self.data[idx]

        # lidar point cloud
        sweep = self.load_lidar_sweep(data_path)
        if self.lidar_transform:
            sweep, _ = self.lidar_transform(sweep, None)

        if self.condition_key == 'segmentation':
            # semantic maps
            proj_range, sem_map = self.load_semantic_map(data_path, sweep)
            example[self.condition_key] = sem_map
        else:
            proj_range, _ = pcd2range(sweep, self.img_size, self.fov, self.depth_range)
        proj_range, proj_mask = self.process_scan(proj_range)
        example['image'], example['mask'] = proj_range, proj_mask

        if self.return_pcd:
            reproj_sweep, _, _ = range2pcd(proj_range[0] * .5 + .5, self.fov, self.depth_range, self.depth_scale,
                                           self.log_scale)
            example['raw'] = sweep
            example['reproj'] = reproj_sweep.astype(np.float32)

        # image degradation
        if self.degradation_transform:
            degraded_proj_range = self.degradation_transform(proj_range)
            example['degraded_image'] = degraded_proj_range

        # cameras
        if self.condition_key == 'camera':
            cameras = self.load_camera(data_path)
            example[self.condition_key] = cameras

        return example
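

# --- Illustrative sketch (editor-added; not used by the datasets in this module) ---
# A minimal example of the raw velodyne format assumed by KITTIBase.load_lidar_sweep:
# each scan is a flat float32 .bin file holding (x, y, z, intensity) per point, so
# reshape((-1, 4)) recovers the point list and the first three columns are xyz.
# The temporary file below is synthetic stand-in data, not a real KITTI scan.
def _demo_load_lidar_sweep():
    import tempfile
    fake_scan = np.array([[1.0, 2.0, 3.0, 0.5],
                          [4.0, 5.0, 6.0, 0.1]], dtype=np.float32)
    with tempfile.NamedTemporaryFile(suffix='.bin', delete=False) as f:
        fake_scan.tofile(f)
        path = f.name
    points = KITTIBase.load_lidar_sweep(path)  # -> (2, 3) array of xyz, intensity dropped
    os.remove(path)
    return points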


class SemanticKITTIBase(KITTIBase):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        assert self.condition_key in ['segmentation']  # for segmentation input only
        self.label2rgb = LABEL2RGB

    def prepare_data(self):
        # read data paths from KITTI
        for seq_id in eval('SEM_KITTI_%s_SET' % self.split.upper()):
            self.data.extend(glob.glob(os.path.join(
                self.data_root, f'dataset/sequences/{seq_id}/velodyne/*.bin')))

        # read label mapping
        data_config = yaml.safe_load(open('./data/config/semantic-kitti.yaml', 'r'))
        remap_dict = data_config["learning_map"]
        max_key = max(remap_dict.keys())
        self.learning_map = np.zeros((max_key + 100), dtype=np.int32)
        self.learning_map[list(remap_dict.keys())] = list(remap_dict.values())

    def load_semantic_map(self, path, pcd):
        label_path = path.replace('velodyne', 'labels').replace('.bin', '.label')
        labels = np.fromfile(label_path, dtype=np.uint32)
        labels = labels.reshape((-1))
        labels = labels & 0xFFFF  # semantic label in lower half
        labels = self.learning_map[labels]

        proj_range, sem_map = pcd2range(pcd, self.img_size, self.fov, self.depth_range, labels=labels)
        # sem_map = np.expand_dims(sem_map, axis=0).astype(np.int64)
        sem_map = sem_map.astype(np.int64)
        if self.filtered_map_cats is not None:
            sem_map[np.isin(sem_map, self.filtered_map_cats)] = 0  # set filtered category as noise
        onehot = np.eye(self.num_sem_cats, dtype=np.float32)[sem_map].transpose(2, 0, 1)
        return proj_range, onehot


class SemanticKITTITrain(SemanticKITTIBase):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset/SemanticKITTI', split='train', **kwargs)


class SemanticKITTIValidation(SemanticKITTIBase):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset/SemanticKITTI', split='val', **kwargs)
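

# --- Illustrative sketch (editor-added; not used by the datasets in this module) ---
# A minimal example of the label handling in SemanticKITTIBase: the lower 16 bits of a
# raw SemanticKITTI label hold the semantic class id (the upper 16 bits are the instance
# id), a lookup table built from 'learning_map' remaps raw ids to training labels, and
# the per-pixel label map is expanded into a one-hot tensor. The raw ids and mapping
# below are made-up stand-ins, not the real semantic-kitti.yaml contents.
def _demo_semantic_label_remap():
    remap_dict = {0: 0, 10: 1, 11: 2, 252: 1}  # hypothetical raw-id -> training-label mapping
    learning_map = np.zeros(max(remap_dict.keys()) + 100, dtype=np.int32)
    learning_map[list(remap_dict.keys())] = list(remap_dict.values())

    raw_labels = np.array([10, 252, 11, 0], dtype=np.uint32)  # pretend these came from a .label file
    semantic_ids = raw_labels & 0xFFFF            # drop the instance id in the upper 16 bits
    training_labels = learning_map[semantic_ids]  # -> array([1, 1, 2, 0])

    sem_map = training_labels.reshape(2, 2)       # pretend this is an (H, W) range-view label map
    onehot = np.eye(3, dtype=np.float32)[sem_map].transpose(2, 0, 1)  # (num_cats, H, W)
    return training_labels, onehot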


class KITTI360Base(KITTIBase):
    def __init__(self, split_per_view=None, **kwargs):
        super().__init__(**kwargs)
        self.split_per_view = split_per_view
        if self.condition_key == 'camera':
            assert self.split_per_view is not None, 'For camera-to-lidar, need to specify split_per_view'

    def prepare_data(self):
        # read data paths
        self.data = []
        if self.condition_key == 'camera':
            seq_list = eval('CAM_KITTI360_%s_SET' % self.split.upper())
        else:
            seq_list = eval('KITTI360_%s_SET' % self.split.upper())
        for seq_id in seq_list:
            self.data.extend(glob.glob(os.path.join(
                self.data_root, f'data_3d_raw/2013_05_28_drive_00{seq_id}_sync/velodyne_points/data/*.bin')))

    def random_drop_camera(self, camera_list):
        if np.random.rand() < self.aug_config['camera_drop'] and self.split == 'train':
            camera_list = [np.zeros_like(c) if i != len(camera_list) // 2 else c
                           for i, c in enumerate(camera_list)]  # keep the middle view only
        return camera_list

    def load_camera(self, path):
        camera_path = path.replace('data_3d_raw', 'data_2d_camera').replace(
            'velodyne_points/data', 'image_00/data_rect').replace('.bin', '.png')
        camera = np.array(Image.open(camera_path)).astype(np.float32) / 255.
        camera = camera.transpose(2, 0, 1)
        if self.view_transform:
            camera = self.view_transform(camera)
        camera_list = np.split(camera, self.split_per_view, axis=2)  # split into n chunks as different views
        camera_list = self.random_drop_camera(camera_list)
        return camera_list


class KITTI360Train(KITTI360Base):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset/KITTI-360', split='train', **kwargs)


class KITTI360Validation(KITTI360Base):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset/KITTI-360', split='val', **kwargs)
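

# --- Illustrative sketch (editor-added; not used by the datasets in this module) ---
# A minimal example of the view handling in KITTI360Base.load_camera / random_drop_camera:
# the front camera image (C, H, W) is split along the width into `split_per_view` chunks,
# and during training all but the middle chunk may be zeroed out. The image size and split
# count here are arbitrary stand-ins.
def _demo_camera_split_and_drop(split_per_view=3):
    camera = np.random.rand(3, 4, 12).astype(np.float32)    # fake (C, H, W) image
    camera_list = np.split(camera, split_per_view, axis=2)  # 3 views, each of shape (3, 4, 4)
    keep = len(camera_list) // 2                             # index of the middle view
    return [np.zeros_like(c) if i != keep else c for i, c in enumerate(camera_list)]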


class AnnotatedKITTI360Base(Annotated3DObjectsDataset, KITTI360Base):
    def __init__(self, **kwargs):
        self.id_bbox_dict = dict()
        self.id_label_dict = dict()
        Annotated3DObjectsDataset.__init__(self, **kwargs)
        KITTI360Base.__init__(self, **kwargs)
        assert self.condition_key in ['center', 'bbox']  # for annotated images only

    @staticmethod
    def parseOpencvMatrix(node):
        rows = int(node.find('rows').text)
        cols = int(node.find('cols').text)
        data = node.find('data').text.split(' ')

        mat = []
        for d in data:
            d = d.replace('\n', '')
            if len(d) < 1:
                continue
            mat.append(float(d))
        mat = np.reshape(mat, [rows, cols])
        return mat

    def parseVertices(self, child):
        transform = self.parseOpencvMatrix(child.find('transform'))
        R = transform[:3, :3]
        T = transform[:3, 3]
        vertices = self.parseOpencvMatrix(child.find('vertices'))
        vertices = np.matmul(R, vertices.transpose()).transpose() + T
        return vertices

    def parse_bbox_xml(self, path):
        tree = ET.parse(path)
        root = tree.getroot()

        bbox_dict = dict()
        label_dict = dict()
        for child in root:
            if child.find('transform') is None:
                continue
            label_name = child.find('label').text
            if label_name not in BBOX_CAT2LABEL:
                continue

            label = BBOX_CAT2LABEL[label_name]
            timestamp = int(child.find('timestamp').text)
            # verts = self.parseVertices(child)
            verts = self.parseOpencvMatrix(child.find('vertices'))[:8]
            if timestamp in bbox_dict:
                bbox_dict[timestamp].append(verts)
                label_dict[timestamp].append(label)
            else:
                bbox_dict[timestamp] = [verts]
                label_dict[timestamp] = [label]
        return bbox_dict, label_dict

    def prepare_data(self):
        KITTI360Base.prepare_data(self)
        self.data = [p for p in self.data if '2013_05_28_drive_0008_sync' not in p]  # remove unlabeled sequence 08

        seq_list = eval('KITTI360_%s_SET' % self.split.upper())
        for seq_id in seq_list:
            if seq_id != '08':
                xml_path = os.path.join(self.data_root, f'data_3d_bboxes/train/2013_05_28_drive_00{seq_id}_sync.xml')
                bbox_dict, label_dict = self.parse_bbox_xml(xml_path)
                self.id_bbox_dict[seq_id] = bbox_dict
                self.id_label_dict[seq_id] = label_dict

    def load_annotation(self, path):
        seq_id = path.split('/')[-4].split('_')[-2][-2:]
        timestamp = int(path.split('/')[-1].replace('.bin', ''))
        verts_list = self.id_bbox_dict[seq_id][timestamp]
        label_list = self.id_label_dict[seq_id][timestamp]

        if self.condition_key == 'bbox':
            points = np.stack(verts_list)
        elif self.condition_key == 'center':
            # vertices 0 and 6 are diagonally opposite box corners, so their midpoint is the box center
            points = np.stack([(verts[0] + verts[6]) / 2. for verts in verts_list])
        else:
            raise NotImplementedError
        labels = np.array([label_list])

        if self.anno_transform:
            points, labels = self.anno_transform(points, labels)
        return points, labels

    def __getitem__(self, idx):
        example = dict()
        data_path = self.data[idx]

        # lidar point cloud
        sweep = self.load_lidar_sweep(data_path)
        # annotations
        bbox_points, bbox_labels = self.load_annotation(data_path)
        if self.lidar_transform:
            sweep, bbox_points = self.lidar_transform(sweep, bbox_points)

        # point cloud -> range
        proj_range, _ = pcd2range(sweep, self.img_size, self.fov, self.depth_range)
        proj_range, proj_mask = self.process_scan(proj_range)
        example['image'], example['mask'] = proj_range, proj_mask
        if self.return_pcd:
            example['reproj'] = sweep

        # annotation -> range
        # NOTE: no need to transform bbox points along with the lidar here, since their coordinates are
        # based on range-image space instead of 3D space
        proj_bbox_points, proj_bbox_labels = pcd2coord2d(bbox_points, self.fov, self.depth_range, labels=bbox_labels)
        builder = self.conditional_builders[self.condition_key]
        if self.condition_key == 'bbox':
            proj_bbox_points = corners_3d_to_2d(proj_bbox_points)
            annotations = [Annotation(bbox=bbox.flatten(), category_id=label)
                           for bbox, label in zip(proj_bbox_points, proj_bbox_labels)]
        else:
            annotations = [Annotation(center=center, category_id=label)
                           for center, label in zip(proj_bbox_points, proj_bbox_labels)]
        example[self.condition_key] = builder.build(annotations)

        return example


class AnnotatedKITTI360Train(AnnotatedKITTI360Base):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset/KITTI-360', split='train', cats=BBOX_CATS, **kwargs)


class AnnotatedKITTI360Validation(AnnotatedKITTI360Base):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset/KITTI-360', split='train', cats=BBOX_CATS, **kwargs)


class KITTIImageBase(KITTIBase):
    """
    Range-image-only dataset combining KITTI-360 and SemanticKITTI.

    #Samples (Training): 98014, #Samples (Validation): 3511
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        assert self.condition_key in [None, 'image']  # for image input only

    def prepare_data(self):
        # read data paths from KITTI-360
        self.data = []
        for seq_id in eval('KITTI360_%s_SET' % self.split.upper()):
            self.data.extend(glob.glob(os.path.join(
                self.data_root, f'KITTI-360/data_3d_raw/2013_05_28_drive_00{seq_id}_sync/velodyne_points/data/*.bin')))

        # read data paths from KITTI
        for seq_id in eval('KITTI_%s_SET' % self.split.upper()):
            self.data.extend(glob.glob(os.path.join(
                self.data_root, f'SemanticKITTI/dataset/sequences/{seq_id}/velodyne/*.bin')))


class KITTIImageTrain(KITTIImageBase):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset', split='train', **kwargs)


class KITTIImageValidation(KITTIImageBase):
    def __init__(self, **kwargs):
        super().__init__(data_root='./dataset', split='val', **kwargs)
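

# --- Illustrative sketch (editor-added) ---
# A quick check of the 'center' annotation convention used in AnnotatedKITTI360Base.load_annotation:
# assuming vertices 0 and 6 of a box are diagonally opposite corners, their midpoint equals the box
# center (the mean of all eight corners). The corner ordering below is a hypothetical example built
# to satisfy that assumption; it is not taken from the KITTI-360 annotation files.
if __name__ == '__main__':
    corners = np.array([[0., 0., 0.], [1., 0., 0.], [1., 1., 0.], [0., 1., 0.],
                        [0., 0., 1.], [1., 0., 1.], [1., 1., 1.], [0., 1., 1.]])
    center_from_diagonal = (corners[0] + corners[6]) / 2.
    print(center_from_diagonal, corners.mean(axis=0))  # both print [0.5 0.5 0.5]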