import concurrent.futures from concurrent.futures import ProcessPoolExecutor import math import os import fire import numpy as np import pickle import tensorflow as tf from tqdm import tqdm from waymo_open_dataset.protos import scenario_pb2 def scalar_to_one_hot(length, index, has_zero=False): if has_zero: offset = 1 else: offset = 0 assert 0 <= index < length + offset if index + 1 - offset > 0: one_hot_type = np.eye(length)[index - offset] else: one_hot_type = np.zeros(length) return one_hot_type def group_tracks(tracks): object_types = { "TYPE_UNSET": 0, "TYPE_VEHICLE": 1, "TYPE_PEDESTRIAN": 2, "TYPE_CYCLIST": 3, "TYPE_OTHER": 4, } state_size = 11 traj = np.zeros((len(tracks), len(tracks[0].states), state_size)) mask_traj = np.zeros((len(tracks), len(tracks[0].states)), dtype=bool) traj_type = np.zeros((len(tracks), len(object_types) - 1)) id_to_idx = {} for i_track, track in enumerate(tracks): traj_type[i_track, :] = scalar_to_one_hot( len(object_types) - 1, track.object_type, has_zero=True ) id_to_idx[track.id] = i_track for i_time, state in enumerate(track.states): if state.valid: traj[i_track, i_time, 0] = state.center_x traj[i_track, i_time, 1] = state.center_y traj[i_track, i_time, 2] = state.heading traj[i_track, i_time, 3] = state.velocity_x traj[i_track, i_time, 4] = state.velocity_y traj[i_track, i_time, 5] = state.width traj[i_track, i_time, 6] = state.length traj[i_track, i_time, 7:11] = traj_type[i_track, :] mask_traj[i_track, i_time] = state.center_x != 0 or state.center_y != 0 else: mask_traj[i_track, i_time] = False # Remove trajectories that are masked for the whole time mask_any_time = mask_traj.any(-1) to_delete = [] for key, value in id_to_idx.items(): if not mask_any_time[value]: to_delete.append(key) else: id_to_idx[key] = np.sum(mask_any_time[:value]) for key in to_delete: del id_to_idx[key] traj = traj[mask_any_time] traj_type = traj_type[mask_any_time] mask_traj = mask_traj[mask_any_time] # traj:(n_agents, seq_time, features), mask:(n_agents, seq_time), traj_type:(n_agents, features) assert (traj[..., :2][mask_traj] != 0).any(-1).all() return traj, mask_traj, traj_type, id_to_idx def filter_tracks( pos, trajs, mask_trajs, trajs_type, to_predict, id_to_idx, mask_keep, max_moving_distance, max_static_distance, ): distances2 = ((trajs[:, :, :2] - pos[None, None, :]) ** 2).sum(-1).min(1) first_non_0_pos = np.take_along_axis( trajs, np.argmax(mask_trajs, 1)[:, None, None], axis=1 ) is_moving = ( np.abs((trajs[:, :, :2] - first_non_0_pos[:, 0:1, :2]) * mask_trajs[:, :, None]) .sum(1) .sum(1) > 1 ) filtered = np.zeros_like(distances2, dtype=bool) filtered[is_moving] = distances2[is_moving] < max_moving_distance**2 filtered[np.logical_not(is_moving)] = ( distances2[np.logical_not(is_moving)] < max_static_distance**2 ) filtered = np.logical_or(filtered, mask_keep) # Filter out trajectories to_delete = [] idx_to_id = {} for key, value in id_to_idx.items(): if not filtered[value]: to_delete.append(key) else: new_value = np.sum(filtered[:value]) idx_to_id[new_value] = key id_to_idx[key] = new_value for key in to_delete: del id_to_idx[key] trajs = trajs[filtered] trajs_type = trajs_type[filtered] mask_trajs = mask_trajs[filtered] to_predict = to_predict[filtered] if mask_keep.all(): return trajs, mask_trajs, trajs_type, to_predict, id_to_idx # Sort entries from closest to furthest to input pos distances2 = distances2[filtered] distance_sort = np.argsort(distances2) copy_trajs = trajs.copy() copy_mask_trajs = mask_trajs.copy() copy_trajs_type = trajs_type.copy() copy_to_predict = to_predict.copy() skip = np.argmin(mask_keep) assert skip > 1 offset = skip for i, idx in enumerate(distance_sort[skip:]): if idx > skip: ii = i + offset trajs[ii] = copy_trajs[idx] trajs_type[ii] = copy_trajs_type[idx] mask_trajs[ii] = copy_mask_trajs[idx] to_predict[ii] = copy_to_predict[idx] id_to_idx[idx_to_id[idx]] = ii else: offset -= 1 assert (trajs[..., :2][mask_trajs] != 0).any(-1).all() return trajs, mask_trajs, trajs_type, to_predict, id_to_idx def cut_lane(lane, pos, max_len): center_idx = np.argmin(((lane - pos[None, :]) ** 2).sum(-1)) start = max(0, center_idx - max_len // 2) return lane[start : start + max_len, :] def group_lanes(map, center, max_lane_len, max_lane_distance): all_objects = [] all_types = [] max_len = 0 id_to_idx = {} stride = 2 max_lane_len = max_lane_len * stride for object in map: # Type one_hot encoding is as follows: 0: lane, 1: stop_sign, 2: cross_walk, 3: speed_bump lane = object.lane.polyline is_cut_lane = len(lane) > max_lane_len len_lane = min(len(lane), max_lane_len) len_cross_walk = len(object.crosswalk.polygon) len_speed_bump = len(object.speed_bump.polygon) num_obj_types = 4 max_len = max(max_len, len_lane) max_len = max(max_len, len_cross_walk) max_len = max(max_len, len_speed_bump) if len_lane > 0: current_lane = np.zeros((len(lane), 2)) for i_point, cw in enumerate(lane): current_lane[i_point, 0] = cw.x current_lane[i_point, 1] = cw.y if is_cut_lane: current_lane = cut_lane(current_lane, center, max_lane_len) min_distance2 = np.min(((current_lane - center[None, :]) ** 2).sum(-1)) if min_distance2 < max_lane_distance**2: id_to_idx[object.id] = len(all_objects) all_objects.append(current_lane) all_types.append(scalar_to_one_hot(num_obj_types, 0)) # elif len_cross_walk > 0: # current_cross_walk = np.zeros((len_cross_walk, 2)) # for i_point, cw in enumerate(object.crosswalk.polygon): # current_cross_walk[i_point, 0] = cw.x # current_cross_walk[i_point, 1] = cw.y # all_objects.append(current_cross_walk) # all_types.append(scalar_to_one_hot(num_obj_types, 2)) # elif len_speed_bump > 0: # current_speed_bump = np.zeros((len_speed_bump, 2)) # for i_point, cw in enumerate(object.speed_bump.polygon): # current_speed_bump[i_point, 0] = cw.x # current_speed_bump[i_point, 1] = cw.y # all_objects.append(current_speed_bump) # all_types.append(scalar_to_one_hot(num_obj_types, 3)) # elif not (object.stop_sign.position.x == 0 and object.stop_sign.position.y == 0): # all_objects.append([np.array([object.stop_sign.position.x, object.stop_sign.position.y])]) # all_types.append(scalar_to_one_hot(num_obj_types, 1)) object_array = np.zeros((len(all_objects), (max_len + 1) // stride, 2)) mask_object_array = np.zeros( (len(all_objects), (max_len + 1) // stride), dtype=bool ) object_types_array = np.zeros((len(all_types), num_obj_types)) for i_object, object in enumerate(all_objects): len_object = (len(object) + 1) // stride object_array[i_object, :len_object, :] = object[::2] mask_object_array[i_object, :len_object] = True object_types_array[i_object] = all_types[i_object] # for i, lane in enumerate(object_array): # plt.plot(lane[mask_object_array[i, :], 0], lane[mask_object_array[i, :], 1], alpha=0.3) idx_to_id = {value: key for key, value in id_to_idx.items()} # Sort entries from closest to furthest to input center distances2 = np.min(((object_array - center[None, None, :]) ** 2).sum(-1), 1) distance_sort = np.argsort(distances2) copy_object = object_array.copy() copy_mask_object = mask_object_array.copy() copy_type = object_types_array.copy() for i, idx in enumerate(distance_sort): object_array[i] = copy_object[idx] mask_object_array[i] = copy_mask_object[idx] object_types_array[i] = copy_type[idx] id_to_idx[idx_to_id[idx]] = i return object_array, mask_object_array, object_types_array, id_to_idx def group_light_signals(light_signals, id_to_idx, n_map_objects): state_to_idx = { "TRAFFIC_LIGHT_STATE_UNKNOWN": 0, "TRAFFIC_LIGHT_STATE_ARROW_STOP": 1, "TRAFFIC_LIGHT_STATE_ARROW_CAUTION": 2, "TRAFFIC_LIGHT_STATE_ARROW_GO": 3, "TRAFFIC_LIGHT_STATE_STOP": 4, "TRAFFIC_LIGHT_STATE_CAUTION": 5, "TRAFFIC_LIGHT_STATE_GO": 6, "TRAFFIC_LIGHT_STATE_FLASHING_STOP": 7, "TRAFFIC_LIGHT_STATE_FLASHING_CAUTION": 8, } len_time = len(light_signals) all_lanes_states = np.zeros((n_map_objects, len_time, len(state_to_idx) - 1)) for t, lanes_states in enumerate(light_signals): for lane in lanes_states.lane_states: if lane.lane in id_to_idx.keys(): all_lanes_states[id_to_idx[lane.lane], t, :] = scalar_to_one_hot( len(state_to_idx) - 1, lane.state, True ) # (n_objects, seq_time, features) return all_lanes_states def normalize_all(traj, map, pos, angle): c = math.cos(angle) s = math.sin(angle) rotation_mat = np.array([[c, s], [-s, c]]) traj_clone = traj.clone() traj_clone[..., :2] = ( traj_clone[..., :2] - pos.reshape(([1] * (traj.ndim - 1)) + [2]) ) @ rotation_mat traj_clone[..., 2] = (traj_clone[..., 2] + angle + np.pi) % (2 * np.pi) - np.pi if traj.shape[-1] >= 5: traj_clone[..., 3:5] = traj_clone[..., 3:5] @ rotation_mat map_clone = (map.clone() - pos.reshape(([1] * (map.ndim - 1)) + [2])) @ rotation_mat return traj_clone, map_clone def fill_gaps(trajs, mask_in): """ If trajectories are partially observed with gaps (observed then not then observed again), fill the gaps with interpolations. Args: trajs: size (n_agents, time, features) features are organized as [x, y, angle, vx, vy, other_features ] """ mask = mask_in.copy() first_non_zeros = np.argmax(mask, 1) last_non_zeros = mask.shape[1] - np.argmax(np.flip(mask, 1), 1) has_gaps = np.logical_and( last_non_zeros - first_non_zeros > np.maximum(mask.sum(1), 1), mask.sum(1) > 1 ) if not has_gaps.any(): # No gap to fill, returning the input return trajs # iterate over agents for i in range(trajs.shape[0]): if has_gaps[i]: left = first_non_zeros[i] right = first_non_zeros[i] for t in range(first_non_zeros[i], last_non_zeros[i]): if mask[i, t] and left == right: left += 1 elif mask[i, t]: break else: mask[i, t] = True right += 1 # Linear filling for positions: trajs[i, left:right, :2] = (np.arange(right - left) / (right - left))[ :, None ] * (trajs[i, right, :2] - trajs[i, left - 1, :2])[None, :] + trajs[ i, left - 1 : left, :2 ] # Linear filling for velocities and the rest: trajs[i, left:right, 3:] = (np.arange(right - left) / (right - left))[ :, None ] * (trajs[i, right, 3:] - trajs[i, left - 1, 3:])[None, :] + trajs[ i, left - 1 : left, 3: ] # Linear filling for angles (periodicity doesn't allow direct interpolation): cos_traj = np.cos(trajs[i, left - 1 : right + 1, 2]) sin_traj = np.sin(trajs[i, left - 1 : right + 1, 2]) cos_traj = (np.arange(right - left) / (right - left)) * ( cos_traj[-1] - cos_traj[0] ) + cos_traj[0] sin_traj = (np.arange(right - left) / (right - left)) * ( sin_traj[-1] - sin_traj[0] ) + sin_traj[0] trajs[i, left:right, 2] = np.arctan2(sin_traj, cos_traj) # Only the first gap was filled, recursive call to complete others return fill_gaps(trajs, mask) def group_scenario(scenario): ids_of_interest = list(set(scenario.objects_of_interest)) # Only gather scenario with a pair of interacting vehicles if len(ids_of_interest) != 2: return None traj, mask_traj, traj_type, id_to_idx = group_tracks(scenario.tracks) assert (traj[..., :2][mask_traj] != 0).any(-1).all() to_predict = np.zeros(traj.shape[0], dtype=bool) for idx in scenario.tracks_to_predict: to_predict[idx.track_index] = True # # Set ego as the first agent in the list of trajectories # index_ego = scenario.sdc_track_index # if index_ego != 0: # for key, value in id_to_idx.items(): # if value == 0: # id_0 = key # traj[[0, index_ego]] = traj[[index_ego, 0]] # mask_traj[[0, index_ego]] = mask_traj[[index_ego, 0]] # traj_type[[0, index_ego]] = traj_type[[index_ego, 0]] # to_predict[[0, index_ego]] = to_predict[[index_ego, 0]] # id_to_idx[id_0] = index_ego # id_to_idx[scenario.sdc_track_index] = 0 # Set the agents of interest as the first agents in the list of trajectories for key, value in id_to_idx.items(): if value == 0: id_0 = key elif value == 1: id_1 = key indices_of_interest = sorted( [id_to_idx[ids_of_interest[0]], id_to_idx[ids_of_interest[1]]] ) traj[[0, indices_of_interest[0]]] = traj[ [ indices_of_interest[0], 0, ] ] mask_traj[[0, indices_of_interest[0]]] = mask_traj[ [ indices_of_interest[0], 0, ] ] traj_type[[0, indices_of_interest[0]]] = traj_type[ [ indices_of_interest[0], 0, ] ] to_predict[[0, indices_of_interest[0]]] = to_predict[ [ indices_of_interest[0], 0, ] ] traj[[1, indices_of_interest[1]]] = traj[[indices_of_interest[1], 1]] mask_traj[[1, indices_of_interest[1]]] = mask_traj[[indices_of_interest[1], 1]] traj_type[[1, indices_of_interest[1]]] = traj_type[[indices_of_interest[1], 1]] to_predict[[1, indices_of_interest[1]]] = to_predict[[indices_of_interest[1], 1]] id_to_idx[id_0] = id_to_idx[ids_of_interest[0]] id_to_idx[ids_of_interest[0]] = 0 id_to_idx[id_1] = id_to_idx[ids_of_interest[1]] id_to_idx[ids_of_interest[1]] = 1 assert (traj[..., :2][mask_traj] != 0).any(-1).all() # ego_current_state = scenario.tracks[scenario.sdc_track_index].states[scenario.current_time_index] # angle = ego_current_state.heading traj = fill_gaps(traj, mask_traj) pos = traj[0, scenario.current_time_index, :2] angle = traj[0, scenario.current_time_index, 2] # mask_agent_of_interest = np.zeros((traj.shape[0]), dtype=bool) # idx_of_interest = [id_to_idx[id] for id in scenario.objects_of_interest] # mask_agent_of_interest[idx_of_interest] = True traj, mask_traj, traj_type, to_predict, id_to_idx = filter_tracks( pos, traj, mask_traj, traj_type, to_predict, id_to_idx, mask_keep=to_predict, max_moving_distance=50, max_static_distance=30, ) assert (traj[..., :2][mask_traj] != 0).any(-1).all() if traj.shape[0] > 100: print(traj.shape[0]) map, mask_map, map_type, map_id_to_idx = group_lanes( scenario.map_features, pos, max_lane_len=50, max_lane_distance=50 ) lane_states = group_light_signals( scenario.dynamic_map_states, map_id_to_idx, map.shape[0] ) traj, map = normalize_all(traj, map, pos, -angle) assert ( ( traj[0, scenario.current_time_index + 1 :, :2][ mask_traj[0, scenario.current_time_index + 1 :] ] != 0 ) .any(-1) .all() ) assert ( ( traj[0, : scenario.current_time_index, :2][ mask_traj[0, : scenario.current_time_index] ] != 0 ) .any(-1) .all() ) assert (traj[1:, :, :2][mask_traj[1:, :]] != 0).any(-1).all() len_pred = traj.shape[1] - scenario.current_time_index - 1 traj = traj.transpose((1, 0, 2)) mask_traj = mask_traj.transpose((1, 0)) map = map.transpose((1, 0, 2)) mask_map = mask_map.transpose((1, 0)) assert ( ( traj[scenario.current_time_index + 1 :, 0, :2][ mask_traj[scenario.current_time_index + 1 :, 0] ] != 0 ) .any(-1) .all() ) assert ( ( traj[: scenario.current_time_index, 0, :2][ mask_traj[: scenario.current_time_index, 0] ] != 0 ) .any(-1) .all() ) assert (traj[:, 1:, :2][mask_traj[:, 1:]] != 0).any(-1).all() # Mask futures for trajectories that are not to be predicted traj = traj * mask_traj[:, :, None] # to_predict[0] = True # to_predict[1] = True # mask_traj[scenario.current_time_index+1:, np.logical_not(to_predict)] = 0 mask_to_predict = mask_traj.copy() mask_to_predict[:, np.logical_not(to_predict)] = False assert ( ( traj[scenario.current_time_index + 1 :, 0, :2][ mask_to_predict[scenario.current_time_index + 1 :, 0] ] != 0 ) .any(-1) .all() ) assert ( ( traj[: scenario.current_time_index, 0, :2][ mask_to_predict[: scenario.current_time_index, 0] ] != 0 ) .any(-1) .all() ) assert (traj[:, 1:, :2][mask_to_predict[:, 1:]] != 0).any(-1).all() return { "traj": traj, "mask_traj": mask_traj, "mask_to_predict": mask_to_predict, "lanes": map, "lane_states": lane_states, "mask_lanes": mask_map, "len_pred": len_pred, "mean_pos": pos, } def preprocess_scenario(data, output_dir): scenario = scenario_pb2.Scenario() scenario.ParseFromString(data.numpy()) scenario_id = scenario.scenario_id scenario = group_scenario(scenario) if scenario is not None: with open(os.path.join(output_dir, scenario_id), "wb") as handle: pickle.dump(scenario, handle) def preprocess_scenarios(scenario_dir, output_dir, debug_size=None, num_parallel=8): """Preprocesses waymo motion data in scenario file format. Args: scenario_dir: Directory containing scenario files. output_dir: Directory in which to output preprocessed samples debug_size: If provided, limit to this number of output samples. This is the _max_ number of samples, but fewer may result. num_parallel: Number of processes to run in parallel. Recommend to set this to number of cores - 1. """ assert os.path.exists(scenario_dir) filenames = os.listdir(scenario_dir) print(f"Saving files in {output_dir}") filepaths = [os.path.join(scenario_dir, f) for f in filenames] dataset = tf.data.TFRecordDataset(filepaths) os.makedirs(output_dir, exist_ok=True) pool = ProcessPoolExecutor(num_parallel) futures = [] for i, data in enumerate(tqdm(dataset)): future = pool.submit(preprocess_scenario, data=data, output_dir=output_dir) # future = preprocess_scenario(data=data, output_dir=output_dir) futures.append(future) if debug_size is not None and i >= debug_size: break concurrent.futures.wait(futures) pool.shutdown() if __name__ == "__main__": """ The way this works is it provides a command line interface to the function where you just pass whatever arguments the function takes to the script. You can get a help message with: $ python scripts/interaction_utils/generate_dataset_waymo.py -h An example you might call with: $ python scripts/interaction_utils/generate_dataset_waymo.py \ /path/to/scenarios/training/ /path/to/output/training --debug_size=1000 --num_parallel=48 """ fire.Fire(preprocess_scenarios)