|
from multiprocessing import Pool
|
|
from tqdm import tqdm
|
|
from pathlib import Path
|
|
import numpy as np
|
|
from collections import deque
|
|
import argparse
|
|
import cv2
|
|
|
|
def get_raycast_building_mask(building_grid):
|
|
laser_range = 200
|
|
num_laser = 100
|
|
robot_pos = (building_grid.shape[0] // 2-1, building_grid.shape[1] // 2 - 1)
|
|
unoccupied_pos = np.stack(np.where(building_grid != 1), axis=1)
|
|
|
|
if len(unoccupied_pos) == 0:
|
|
return None
|
|
|
|
l2_dist = unoccupied_pos - [robot_pos[0], robot_pos[1]]
|
|
closest = ((l2_dist ** 2).sum(1)**0.5).argmin()
|
|
|
|
robot_pos = (unoccupied_pos[closest][0], unoccupied_pos[closest][1])
|
|
|
|
free_points, hit_points, actual_hit_points = get_free_points_in_front(building_grid, robot_pos, laser_range=laser_range, num_laser=num_laser)
|
|
free_points[:, 0][free_points[:, 0] >= building_grid.shape[0]] = building_grid.shape[0] - 1
|
|
free_points[:, 1][free_points[:, 1] >= building_grid.shape[1]] = building_grid.shape[1] - 1
|
|
free_points[:, 0][free_points[:, 0] < 0] = 0
|
|
free_points[:, 1][free_points[:, 1] < 0] = 0
|
|
|
|
hit_points[:, 0][hit_points[:, 0] >= building_grid.shape[0]] = building_grid.shape[0] - 1
|
|
hit_points[:, 1][hit_points[:, 1] >= building_grid.shape[1]] = building_grid.shape[1] - 1
|
|
hit_points[:, 0][hit_points[:, 0] < 0] = 0
|
|
hit_points[:, 1][hit_points[:, 1] < 0] = 0
|
|
|
|
if len(free_points) > 0:
|
|
|
|
|
|
inited_flood_grid = init_flood_fill(robot_pos, hit_points, building_grid.shape)
|
|
inited_flood_grid = (inited_flood_grid * 255).astype(np.uint8).copy()
|
|
|
|
|
|
np.random.shuffle(free_points)
|
|
|
|
for i in range(len(free_points)):
|
|
seed_point = free_points[i]
|
|
if inited_flood_grid[seed_point[0], seed_point[1]] != 0:
|
|
break
|
|
else:
|
|
print('Unable to find a valid seed point')
|
|
return None
|
|
|
|
num_filled, flooded_image, mask, bounding_box = cv2.floodFill(inited_flood_grid.copy(), None, seedPoint=(seed_point[1], seed_point[0]), newVal=0)
|
|
|
|
return flooded_image
|
|
else:
|
|
print("No free points")
|
|
return None
|
|
|
|
def flood_fill_simple(center_point, occupancy_map):
|
|
"""
|
|
center_point: starting point (x,y) of fill
|
|
occupancy_map: occupancy map generated from Bresenham ray-tracing
|
|
"""
|
|
|
|
occupancy_map = np.copy(occupancy_map)
|
|
sx, sy = occupancy_map.shape
|
|
fringe = deque()
|
|
fringe.appendleft(center_point)
|
|
while fringe:
|
|
|
|
n = fringe.pop()
|
|
nx, ny = n
|
|
unknown_val = 0.5
|
|
|
|
if nx > 0:
|
|
if occupancy_map[nx - 1, ny] == unknown_val:
|
|
occupancy_map[nx - 1, ny] = 0
|
|
fringe.appendleft((nx - 1, ny))
|
|
|
|
if nx < sx - 1:
|
|
if occupancy_map[nx + 1, ny] == unknown_val:
|
|
occupancy_map[nx + 1, ny] = 0
|
|
fringe.appendleft((nx + 1, ny))
|
|
|
|
if ny > 0:
|
|
if occupancy_map[nx, ny - 1] == unknown_val:
|
|
occupancy_map[nx, ny - 1] = 0
|
|
fringe.appendleft((nx, ny - 1))
|
|
|
|
if ny < sy - 1:
|
|
if occupancy_map[nx, ny + 1] == unknown_val:
|
|
occupancy_map[nx, ny + 1] = 0
|
|
fringe.appendleft((nx, ny + 1))
|
|
return occupancy_map
|
|
|
|
def init_flood_fill(robot_pos, obstacle_points, occ_grid_shape):
|
|
"""
|
|
center_point: center point
|
|
obstacle_points: detected obstacles points (x,y)
|
|
xy_points: (x,y) point pairs
|
|
"""
|
|
center_x, center_y = robot_pos
|
|
prev_ix, prev_iy = center_x, center_y
|
|
occupancy_map = (np.ones(occ_grid_shape)) * 0.5
|
|
|
|
obstacle_points = np.vstack((obstacle_points, obstacle_points[0]))
|
|
for (x, y) in zip(obstacle_points[:,0], obstacle_points[:,1]):
|
|
|
|
ix = int(x)
|
|
|
|
iy = int(y)
|
|
free_area = bresenham((prev_ix, prev_iy), (ix, iy))
|
|
for fa in free_area:
|
|
occupancy_map[fa[0]][fa[1]] = 0
|
|
prev_ix = ix
|
|
prev_iy = iy
|
|
return occupancy_map
|
|
|
|
show_animation = False
|
|
|
|
def bresenham(start, end):
|
|
"""
|
|
Implementation of Bresenham's line drawing algorithm
|
|
See en.wikipedia.org/wiki/Bresenham's_line_algorithm
|
|
Bresenham's Line Algorithm
|
|
Produces a np.array from start and end (original from roguebasin.com)
|
|
>>> points1 = bresenham((4, 4), (6, 10))
|
|
>>> print(points1)
|
|
np.array([[4,4], [4,5], [5,6], [5,7], [5,8], [6,9], [6,10]])
|
|
"""
|
|
|
|
x1, y1 = start
|
|
x2, y2 = end
|
|
dx = x2 - x1
|
|
dy = y2 - y1
|
|
is_steep = abs(dy) > abs(dx)
|
|
if is_steep:
|
|
x1, y1 = y1, x1
|
|
x2, y2 = y2, x2
|
|
|
|
swapped = False
|
|
if x1 > x2:
|
|
x1, x2 = x2, x1
|
|
y1, y2 = y2, y1
|
|
swapped = True
|
|
dx = x2 - x1
|
|
dy = y2 - y1
|
|
error = int(dx / 2.0)
|
|
y_step = 1 if y1 < y2 else -1
|
|
|
|
y = y1
|
|
points = []
|
|
for x in range(x1, x2 + 1):
|
|
coord = [y, x] if is_steep else (x, y)
|
|
points.append(coord)
|
|
error -= abs(dy)
|
|
if error < 0:
|
|
y += y_step
|
|
error += dx
|
|
if swapped:
|
|
points.reverse()
|
|
points = np.array(points)
|
|
return points
|
|
|
|
def get_free_points_in_front(occupancy_grid, robot_pos, laser_range=10, num_laser=100):
|
|
"""
|
|
Assumes circular lidar
|
|
occupancy_grid: np.array (h x w)
|
|
robot_pos: (x, y)
|
|
|
|
Outputs:
|
|
free_points: np.array of hit points (x, y)
|
|
"""
|
|
|
|
free_points = []
|
|
hit_points = []
|
|
actual_hit_points = []
|
|
for orientation in np.linspace(np.pi/2, 3*np.pi/2, num_laser):
|
|
end_point = (round(robot_pos[0] + laser_range * np.cos(orientation)), round(robot_pos[1] + laser_range * np.sin(orientation)))
|
|
|
|
|
|
bresenham_points = (bresenham(robot_pos, end_point))
|
|
|
|
|
|
|
|
for i in range(len(bresenham_points)):
|
|
|
|
if bresenham_points[i,0] < 0 or bresenham_points[i,0] >= occupancy_grid.shape[0] or bresenham_points[i,1] < 0 or bresenham_points[i,1] >= occupancy_grid.shape[1]:
|
|
if i != 0:
|
|
hit_points.append(bresenham_points[i-1])
|
|
break
|
|
|
|
if occupancy_grid[bresenham_points[i,0], bresenham_points[i,1]] == 1:
|
|
|
|
for j in range(min(4, len(bresenham_points) - i - 1)):
|
|
free_points.append(bresenham_points[i+j])
|
|
|
|
actual_hit_points.append(bresenham_points[i + j + 1])
|
|
hit_points.append(bresenham_points[i + j + 1])
|
|
|
|
break
|
|
else:
|
|
free_point = bresenham_points[i]
|
|
free_points.append(free_point)
|
|
|
|
if i == len(bresenham_points) - 1:
|
|
hit_points.append(end_point)
|
|
break
|
|
|
|
|
|
|
|
free_points = np.array(free_points)
|
|
hit_points = np.array(hit_points)
|
|
actual_hit_points = np.array(actual_hit_points)
|
|
return free_points, hit_points, actual_hit_points
|
|
|
|
if __name__ == "__main__":
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--dataset_folder", type=str, default="/path/to/raycast")
|
|
parser.add_argument("--class_idx_building", type=int, default=4)
|
|
parser.add_argument("--num_workers", type=int, default=60)
|
|
parser.add_argument("--location", type=str, default="los_angeles")
|
|
|
|
args = parser.parse_args()
|
|
|
|
dataset_folder = Path(args.dataset_folder)
|
|
bev_folder = dataset_folder / args.location / "semantic_masks"
|
|
output_folder = dataset_folder / args.location / "flood_fill"
|
|
|
|
output_folder.mkdir(exist_ok=True, parents=True)
|
|
|
|
def generate_mask(filepath):
|
|
mask = np.load(filepath)
|
|
building_grid = mask[..., args.class_idx_building]
|
|
try:
|
|
flooded_image = get_raycast_building_mask(building_grid)
|
|
except:
|
|
raise Exception(f"Error in {filepath}")
|
|
|
|
if flooded_image is not None:
|
|
output_file = output_folder / filepath.name
|
|
np.save(output_file, flooded_image)
|
|
else:
|
|
print("No flood fill generated")
|
|
|
|
bev_files = list(bev_folder.iterdir())
|
|
|
|
with Pool(args.num_workers) as p:
|
|
for _ in tqdm(p.imap_unordered(generate_mask, bev_files), total=len(bev_files)):
|
|
pass
|
|
|
|
|