|
|
|
|
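# Overview: helpers for estimating a 3D house wireframe (vertices and edges) from per-view gestalt
# segmentations, depth maps and camera poses, using kornia matchers (LoFTR, DISK + LightGlue, SOLD2).
# Entry fields follow the `hoho` package's sample format (see convert_entry_to_human_readable below).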
|
import io |
|
from PIL import Image as PImage |
|
import numpy as np |
|
from collections import defaultdict |
|
import cv2 |
|
import open3d as o3d |
|
from typing import Tuple, List |
|
from scipy.spatial.distance import cdist |
|
|
|
from hoho.read_write_colmap import read_cameras_binary, read_images_binary, read_points3D_binary |
|
from hoho.color_mappings import gestalt_color_mapping, ade20k_color_mapping |
|
import matplotlib.pyplot as plt |
|
|
|
from kornia.feature import LoFTR |
|
import kornia as K |
|
import kornia.feature as KF |
|
|
|
import torch |
|
|
|
import copy |
|
|
|
import matplotlib |
|
import matplotlib.colors as mcolors |
|
|
|
|
def plot_images(imgs, titles=None, cmaps="gray", dpi=100, size=6, pad=0.5): |
|
"""Plot a set of images horizontally. |
|
Args: |
|
imgs: a list of NumPy or PyTorch images, RGB (H, W, 3) or mono (H, W). |
|
titles: a list of strings, as titles for each image. |
|
cmaps: colormaps for monochrome images. |
|
""" |
|
n = len(imgs) |
|
if not isinstance(cmaps, (list, tuple)): |
|
cmaps = [cmaps] * n |
|
figsize = (size * n, size * 3 / 4) if size is not None else None |
|
fig, ax = plt.subplots(1, n, figsize=figsize, dpi=dpi) |
|
if n == 1: |
|
ax = [ax] |
|
for i in range(n): |
|
ax[i].imshow(imgs[i], cmap=plt.get_cmap(cmaps[i])) |
|
ax[i].get_yaxis().set_ticks([]) |
|
ax[i].get_xaxis().set_ticks([]) |
|
ax[i].set_axis_off() |
|
for spine in ax[i].spines.values(): |
|
spine.set_visible(False) |
|
if titles: |
|
ax[i].set_title(titles[i]) |
|
fig.tight_layout(pad=pad) |
|
|
|
def plot_lines(lines, line_colors="orange", point_colors="cyan", ps=4, lw=2, indices=(0, 1)): |
|
"""Plot lines and endpoints for existing images. |
|
Args: |
|
lines: list of ndarrays of size (N, 2, 2). |
|
        line_colors: string, or list of colors (one per image).
        point_colors: string, or list of colors (one per image).
|
ps: size of the keypoints as float pixels. |
|
lw: line width as float pixels. |
|
indices: indices of the images to draw the matches on. |
|
""" |
|
if not isinstance(line_colors, list): |
|
line_colors = [line_colors] * len(lines) |
|
if not isinstance(point_colors, list): |
|
point_colors = [point_colors] * len(lines) |
|
|
|
fig = plt.gcf() |
|
ax = fig.axes |
|
assert len(ax) > max(indices) |
|
axes = [ax[i] for i in indices] |
|
fig.canvas.draw() |
|
|
|
|
|
for a, l, lc, pc in zip(axes, lines, line_colors, point_colors): |
|
for i in range(len(l)): |
|
line = matplotlib.lines.Line2D( |
|
(l[i, 1, 1], l[i, 0, 1]), |
|
(l[i, 1, 0], l[i, 0, 0]), |
|
zorder=1, |
|
c=lc, |
|
linewidth=lw, |
|
) |
|
a.add_line(line) |
|
pts = l.reshape(-1, 2) |
|
a.scatter(pts[:, 1], pts[:, 0], c=pc, s=ps, linewidths=0, zorder=2) |
|
|
|
def plot_color_line_matches(lines, lw=2, indices=(0, 1)): |
|
"""Plot line matches for existing images with multiple colors. |
|
Args: |
|
lines: list of ndarrays of size (N, 2, 2). |
|
lw: line width as float pixels. |
|
indices: indices of the images to draw the matches on. |
|
""" |
|
n_lines = len(lines[0]) |
|
|
|
cmap = plt.get_cmap("nipy_spectral", lut=n_lines) |
|
colors = np.array([mcolors.rgb2hex(cmap(i)) for i in range(cmap.N)]) |
|
|
|
np.random.shuffle(colors) |
|
|
|
fig = plt.gcf() |
|
ax = fig.axes |
|
assert len(ax) > max(indices) |
|
axes = [ax[i] for i in indices] |
|
fig.canvas.draw() |
|
|
|
|
|
for a, l in zip(axes, lines): |
|
for i in range(len(l)): |
|
line = matplotlib.lines.Line2D( |
|
(l[i, 1, 1], l[i, 0, 1]), |
|
(l[i, 1, 0], l[i, 0, 0]), |
|
zorder=1, |
|
c=colors[i], |
|
linewidth=lw, |
|
) |
|
a.add_line(line) |
|
|
|
def empty_solution(): |
|
'''Return a minimal valid solution, i.e. 2 vertices and 1 edge.''' |
|
return np.zeros((2,3)), [(0, 1)] |
|
|
|
def convert_entry_to_human_readable(entry): |
|
out = {} |
|
already_good = ['__key__', 'wf_vertices', 'wf_edges', 'edge_semantics', 'mesh_vertices', 'mesh_faces', 'face_semantics', 'K', 'R', 't'] |
|
for k, v in entry.items(): |
|
if k in already_good: |
|
out[k] = v |
|
continue |
|
if k == 'points3d': |
|
out[k] = read_points3D_binary(fid=io.BytesIO(v)) |
|
if k == 'cameras': |
|
out[k] = read_cameras_binary(fid=io.BytesIO(v)) |
|
if k == 'images': |
|
out[k] = read_images_binary(fid=io.BytesIO(v)) |
|
if k in ['ade20k', 'gestalt']: |
|
out[k] = [PImage.open(io.BytesIO(x)).convert('RGB') for x in v] |
|
if k == 'depthcm': |
|
            out[k] = [PImage.open(io.BytesIO(x)) for x in v]
|
return out |
|
|
|
def get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th = 50.0): |
|
'''Get the vertices and edges from the gestalt segmentation mask of the house''' |
|
vertices = [] |
|
connections = [] |
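    # Apex and eave-end vertices are detected as connected components of their colour masks;
    # each component's centroid becomes a 2D vertex.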
|
|
|
apex_color = np.array(gestalt_color_mapping['apex']) |
|
apex_mask = cv2.inRange(gest_seg_np, apex_color-0.5, apex_color+0.5) |
|
if apex_mask.sum() > 0: |
|
output = cv2.connectedComponentsWithStats(apex_mask, 8, cv2.CV_32S) |
|
(numLabels, labels, stats, centroids) = output |
|
stats, centroids = stats[1:], centroids[1:] |
|
|
|
for i in range(numLabels-1): |
|
vert = {"xy": centroids[i], "type": "apex"} |
|
vertices.append(vert) |
|
|
|
eave_end_color = np.array(gestalt_color_mapping['eave_end_point']) |
|
eave_end_mask = cv2.inRange(gest_seg_np, eave_end_color-0.5, eave_end_color+0.5) |
|
if eave_end_mask.sum() > 0: |
|
output = cv2.connectedComponentsWithStats(eave_end_mask, 8, cv2.CV_32S) |
|
(numLabels, labels, stats, centroids) = output |
|
stats, centroids = stats[1:], centroids[1:] |
|
|
|
for i in range(numLabels-1): |
|
vert = {"xy": centroids[i], "type": "eave_end_point"} |
|
vertices.append(vert) |
|
|
|
apex_pts = [] |
|
apex_pts_idxs = [] |
|
for j, v in enumerate(vertices): |
|
apex_pts.append(v['xy']) |
|
apex_pts_idxs.append(j) |
|
apex_pts = np.array(apex_pts) |
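    # For each edge class, dilate its colour mask, split it into connected components, and use each
    # component's leftmost/rightmost pixels as a 2D segment; vertices within edge_th of a segment
    # endpoint are then connected pairwise.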
|
|
|
|
|
for edge_class in ['eave', 'ridge', 'rake', 'valley']: |
|
edge_color = np.array(gestalt_color_mapping[edge_class]) |
|
mask = cv2.morphologyEx(cv2.inRange(gest_seg_np, |
|
edge_color-0.5, |
|
edge_color+0.5), |
|
cv2.MORPH_DILATE, np.ones((11, 11))) |
|
line_img = np.copy(gest_seg_np) * 0 |
|
if mask.sum() > 0: |
|
output = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S) |
|
(numLabels, labels, stats, centroids) = output |
|
stats, centroids = stats[1:], centroids[1:] |
|
edges = [] |
|
for i in range(1, numLabels): |
|
y,x = np.where(labels == i) |
|
xleft_idx = np.argmin(x) |
|
x_left = x[xleft_idx] |
|
y_left = y[xleft_idx] |
|
xright_idx = np.argmax(x) |
|
x_right = x[xright_idx] |
|
y_right = y[xright_idx] |
|
edges.append((x_left, y_left, x_right, y_right)) |
|
cv2.line(line_img, (x_left, y_left), (x_right, y_right), (255, 255, 255), 2) |
|
edges = np.array(edges) |
|
            if (len(apex_pts) < 2) or (len(edges) < 1):
|
continue |
|
pts_to_edges_dist = np.minimum(cdist(apex_pts, edges[:,:2]), cdist(apex_pts, edges[:,2:])) |
|
connectivity_mask = pts_to_edges_dist <= edge_th |
|
edge_connects = connectivity_mask.sum(axis=0) |
|
for edge_idx, edgesum in enumerate(edge_connects): |
|
if edgesum>=2: |
|
connected_verts = np.where(connectivity_mask[:,edge_idx])[0] |
|
for a_i, a in enumerate(connected_verts): |
|
for b in connected_verts[a_i+1:]: |
|
connections.append((a, b)) |
|
return vertices, connections |
|
|
|
def get_uv_depth(vertices, depth): |
|
'''Get the depth of the vertices from the depth image''' |
|
uv = [] |
|
for v in vertices: |
|
uv.append(v['xy']) |
|
uv = np.array(uv) |
|
uv_int = uv.astype(np.int32) |
|
H, W = depth.shape[:2] |
|
uv_int[:, 0] = np.clip( uv_int[:, 0], 0, W-1) |
|
uv_int[:, 1] = np.clip( uv_int[:, 1], 0, H-1) |
|
vertex_depth = depth[(uv_int[:, 1] , uv_int[:, 0])] |
|
return uv, vertex_depth |
|
|
|
from scipy.spatial import distance_matrix |
|
def non_maximum_suppression(points, threshold):
    '''Greedily keep points that are more than `threshold` apart; returns (kept_points, kept_indices).'''
    if len(points) == 0:
        # Return the same (points, indices) pair shape as the non-empty case.
        return points, []

    dist_matrix = distance_matrix(points, points)

    filtered_indices = []
    keep = np.ones(len(points), dtype=bool)
    for i in range(len(points)):
        if keep[i]:
            # Suppress every point within `threshold` of the kept point i, then re-keep i itself.
            keep = np.logical_and(keep, dist_matrix[i] > threshold)
            keep[i] = True
            filtered_indices.append(i)
    return points[keep], filtered_indices
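# Example (hypothetical values): keep points that are pairwise more than 2 px apart.
#   pts = np.array([[0.0, 0.0], [1.0, 1.0], [10.0, 10.0]])
#   kept, kept_idx = non_maximum_suppression(pts, threshold=2.0)   # kept_idx == [0, 2]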
|
|
|
def merge_vertices_3d_ours(vert_edge_per_image, th=0.1):
    '''Stack per-image 3D vertices, re-index their connections into the stacked array,
    and thin the vertices with non-maximum suppression.'''
    all_3d_vertices = []
    connections_3d = []
    cur_start = 0
    for cimg_idx, (connections, vertices_3d) in vert_edge_per_image.items():
        all_3d_vertices.append(vertices_3d)
        # Offset this image's vertex indices into the concatenated array before advancing cur_start.
        connections_3d.append([(x + cur_start, y + cur_start) for (x, y) in connections])
        cur_start += len(vertices_3d)
    all_3d_vertices = np.concatenate(all_3d_vertices, axis=0)
    # NOTE: the suppression radius is hardcoded to 75 and does not use the `th` argument.
    new_vertices, _ = non_maximum_suppression(all_3d_vertices, 75)
    return new_vertices, connections_3d
|
|
|
def merge_vertices_3d(vert_edge_per_image, th=0.1): |
|
'''Merge vertices that are close to each other in 3D space and are of same types''' |
|
all_3d_vertices = [] |
|
connections_3d = [] |
|
all_indexes = [] |
|
cur_start = 0 |
|
types = [] |
|
for cimg_idx, (vertices, connections, vertices_3d) in vert_edge_per_image.items(): |
|
types += [int(v['type']=='apex') for v in vertices] |
|
all_3d_vertices.append(vertices_3d) |
|
connections_3d+=[(x+cur_start,y+cur_start) for (x,y) in connections] |
|
cur_start+=len(vertices_3d) |
|
all_3d_vertices = np.concatenate(all_3d_vertices, axis=0) |
|
|
|
distmat = cdist(all_3d_vertices, all_3d_vertices) |
|
types = np.array(types).reshape(-1,1) |
|
same_types = cdist(types, types) |
|
mask_to_merge = (distmat <= th) & (same_types==0) |
|
new_vertices = [] |
|
new_connections = [] |
|
to_merge = sorted(list(set([tuple(a.nonzero()[0].tolist()) for a in mask_to_merge]))) |
|
to_merge_final = defaultdict(list) |
|
for i in range(len(all_3d_vertices)): |
|
for j in to_merge: |
|
if i in j: |
|
to_merge_final[i]+=j |
|
for k, v in to_merge_final.items(): |
|
to_merge_final[k] = list(set(v)) |
|
already_there = set() |
|
merged = [] |
|
for k, v in to_merge_final.items(): |
|
if k in already_there: |
|
continue |
|
merged.append(v) |
|
for vv in v: |
|
already_there.add(vv) |
|
old_idx_to_new = {} |
|
count=0 |
|
for idxs in merged: |
|
new_vertices.append(all_3d_vertices[idxs].mean(axis=0)) |
|
for idx in idxs: |
|
old_idx_to_new[idx] = count |
|
count +=1 |
|
|
|
new_vertices=np.array(new_vertices) |
|
|
|
for conn in connections_3d: |
|
new_con = sorted((old_idx_to_new[conn[0]], old_idx_to_new[conn[1]])) |
|
if new_con[0] == new_con[1]: |
|
continue |
|
if new_con not in new_connections: |
|
new_connections.append(new_con) |
|
|
|
return new_vertices, new_connections |
|
|
|
def prune_not_connected(all_3d_vertices, connections_3d): |
|
'''Prune vertices that are not connected to any other vertex''' |
|
connected = defaultdict(list) |
|
for c in connections_3d: |
|
connected[c[0]].append(c) |
|
connected[c[1]].append(c) |
|
new_indexes = {} |
|
new_verts = [] |
|
connected_out = [] |
|
for k,v in connected.items(): |
|
vert = all_3d_vertices[k] |
|
if tuple(vert) not in new_verts: |
|
new_verts.append(tuple(vert)) |
|
new_indexes[k]=len(new_verts) -1 |
|
for k,v in connected.items(): |
|
for vv in v: |
|
connected_out.append((new_indexes[vv[0]],new_indexes[vv[1]])) |
|
connected_out=list(set(connected_out)) |
|
|
|
return np.array(new_verts), connected_out |
|
|
|
def loftr_matcher(gestalt_img_0, gestalt_img1, depth_images): |
|
import torchvision.transforms as transforms |
|
rgb_to_gray = transforms.Compose([ |
|
transforms.ToPILImage(), |
|
transforms.Grayscale(num_output_channels=1), |
|
transforms.ToTensor() |
|
]) |
|
|
|
device = 'cpu' |
|
|
|
w, h = depth_images.size |
|
gest_seg_0 = gestalt_img_0.resize(depth_images.size) |
|
gest_seg_0 = gest_seg_0.convert('L') |
|
gest_seg_0_np = np.array(gest_seg_0) |
|
gest_seg_0_tensor = K.image_to_tensor(gest_seg_0_np, False).float().to(device) |
|
img1 = K.geometry.resize(gest_seg_0_tensor, (int(h/4), int(w/4))) / 255 |
|
|
|
gest_seg_1 = gestalt_img1.resize(depth_images.size) |
|
gest_seg_1 = gest_seg_1.convert('L') |
|
gest_seg_1_np = np.array(gest_seg_1) |
|
gest_seg_1_tensor = K.image_to_tensor(gest_seg_1_np, False).float().to(device) |
|
img2 = K.geometry.resize(gest_seg_1_tensor, (int(h/4), int(w/4))) / 255 |
|
|
|
matcher = KF.LoFTR(pretrained="outdoor").to(device) |
|
|
|
input_dict = { |
|
"image0": img1, |
|
"image1": img2, |
|
} |
|
|
|
|
|
with torch.no_grad(): |
|
correspondences = matcher(input_dict) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    # Scale keypoints back to the depth-map resolution (the inputs were downscaled by 4 for matching).
    mkpts0 = correspondences["keypoints0"].cpu().numpy() * 4
    mkpts1 = correspondences["keypoints1"].cpu().numpy() * 4
|
|
|
|
|
    height_th = int(0.6 * h)  # keep only matches whose keypoint in image 0 lies in the top 60% of the image
    filter_indices = mkpts0[:, 1] < height_th
|
mkpts0 = mkpts0[filter_indices] |
|
mkpts1 = mkpts1[filter_indices] |
|
|
|
return correspondences, mkpts0, mkpts1 |
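# Example (sketch, hypothetical variables): match two gestalt views at the depth-map resolution and
# get keypoint correspondences scaled back to that resolution:
#   corr, pts0, pts1 = loftr_matcher(gestalt_imgs[0], gestalt_imgs[1], depth_imgs[0])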
|
|
|
def disk_matcher(gestalt_img_0, gestalt_img1, depth_images): |
|
import torchvision.transforms as transforms |
|
rgb_to_gray = transforms.Compose([ |
|
transforms.ToPILImage(), |
|
transforms.Grayscale(num_output_channels=1), |
|
transforms.ToTensor() |
|
]) |
|
|
|
device = 'cpu' |
|
|
|
w, h = depth_images.size |
|
gest_seg_0 = gestalt_img_0.resize(depth_images.size) |
|
gest_seg_0 = gest_seg_0.convert('L') |
|
gest_seg_0_np = np.array(gest_seg_0) |
|
gest_seg_0_tensor = K.image_to_tensor(gest_seg_0_np, False).float().to(device) |
|
img1 = K.geometry.resize(gest_seg_0_tensor, (int(h/4), int(w/4))) / 255 |
|
|
|
gest_seg_1 = gestalt_img1.resize(depth_images.size) |
|
gest_seg_1 = gest_seg_1.convert('L') |
|
gest_seg_1_np = np.array(gest_seg_1) |
|
gest_seg_1_tensor = K.image_to_tensor(gest_seg_1_np, False).float().to(device) |
|
img2 = K.geometry.resize(gest_seg_1_tensor, (int(h/4), int(w/4))) / 255 |
|
|
|
num_features = 8192 |
|
disk = KF.DISK.from_pretrained("depth").to(device) |
|
|
|
hw1 = torch.tensor(img1.shape[2:], device=device) |
|
hw2 = torch.tensor(img2.shape[2:], device=device) |
|
|
|
lg_matcher = KF.LightGlueMatcher("disk").eval().to(device) |
|
|
|
with torch.no_grad(): |
|
inp = torch.cat([img1, img2], dim=0) |
|
features1, features2 = disk(inp, num_features, pad_if_not_divisible=True) |
|
kps1, descs1 = features1.keypoints, features1.descriptors |
|
kps2, descs2 = features2.keypoints, features2.descriptors |
|
lafs1 = KF.laf_from_center_scale_ori(kps1[None], torch.ones(1, len(kps1), 1, 1, device=device)) |
|
lafs2 = KF.laf_from_center_scale_ori(kps2[None], torch.ones(1, len(kps2), 1, 1, device=device)) |
|
dists, idxs = lg_matcher(descs1, descs2, lafs1, lafs2, hw1=hw1, hw2=hw2) |
|
print(f"{idxs.shape[0]} tentative matches with DISK LightGlue") |
|
|
|
lg = KF.LightGlue("disk").to(device).eval() |
|
|
|
image0 = { |
|
"keypoints": features1.keypoints[None], |
|
"descriptors": features1.descriptors[None], |
|
"image_size": torch.tensor(img1.shape[-2:][::-1]).view(1, 2).to(device), |
|
} |
|
image1 = { |
|
"keypoints": features2.keypoints[None], |
|
"descriptors": features2.descriptors[None], |
|
"image_size": torch.tensor(img2.shape[-2:][::-1]).view(1, 2).to(device), |
|
} |
|
|
|
with torch.inference_mode(): |
|
out = lg({"image0": image0, "image1": image1}) |
|
idxs = out["matches"][0] |
|
print(f"{idxs.shape[0]} tentative matches with DISK LightGlue") |
|
|
|
def get_matching_keypoints(kp1, kp2, idxs): |
|
mkpts1 = kp1[idxs[:, 0]] |
|
mkpts2 = kp2[idxs[:, 1]] |
|
return mkpts1, mkpts2 |
|
|
|
mkpts0, mkpts1 = get_matching_keypoints(kps1, kps2, idxs) |
|
|
|
mkpts0*=4 |
|
mkpts1*=4 |
|
return mkpts0, mkpts1 |
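# Note: disk_matcher is an alternative DISK + LightGlue matcher; predict() below only uses
# loftr_matcher and line_matcher.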
|
|
|
def save_image_with_keypoints(filename: str, image: np.ndarray, keypoints: np.ndarray, color: Tuple[int, int, int]) -> None:
|
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) |
|
for keypoint in keypoints: |
|
pt = (int(keypoint[0]), int(keypoint[1])) |
|
cv2.circle(image, pt, 4, color, -1) |
|
|
|
cv2.imwrite(filename, image) |
|
|
|
|
|
def save_image_with_lines(filename: str, image: np.ndarray, lines: np.ndarray, color: Tuple[int, int, int]) -> None: |
|
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) |
|
for line in lines: |
|
pt1 = (int(line[0][1]), int(line[0][0])) |
|
pt2 = (int(line[1][1]), int(line[1][0])) |
|
cv2.line(image, pt1, pt2, color, 2) |
|
cv2.imwrite(filename, image) |
|
|
|
def line_matcher(gestalt_img_0, gestalt_img1, depth_images, line_th=0.1): |
|
import torchvision.transforms as transforms |
|
rgb_to_gray = transforms.Compose([ |
|
transforms.ToPILImage(), |
|
transforms.Grayscale(num_output_channels=1), |
|
transforms.ToTensor() |
|
]) |
|
|
|
device = 'cpu' |
|
|
|
w, h = depth_images.size |
|
|
|
gest_seg_0 = gestalt_img_0.resize(depth_images.size) |
|
gest_seg_0 = gest_seg_0.convert('L') |
|
gest_seg_0_np = np.array(gest_seg_0) |
|
gest_seg_0_tensor = K.image_to_tensor(gest_seg_0_np, False).float().to(device) |
|
img1 = K.geometry.resize(gest_seg_0_tensor, (int(h/4), int(w/4))) / 255 |
|
|
|
gest_seg_1 = gestalt_img1.resize(depth_images.size) |
|
gest_seg_1 = gest_seg_1.convert('L') |
|
gest_seg_1_np = np.array(gest_seg_1) |
|
gest_seg_1_tensor = K.image_to_tensor(gest_seg_1_np, False).float().to(device) |
|
img2 = K.geometry.resize(gest_seg_1_tensor, (int(h/4), int(w/4))) / 255 |
|
|
|
sold2 = KF.SOLD2(pretrained=True, config=None) |
|
|
|
imgs = torch.cat([img1, img2], dim=0) |
|
with torch.inference_mode(): |
|
outputs = sold2(imgs) |
|
print(outputs.keys()) |
|
|
|
line_seg1 = outputs["line_segments"][0] |
|
line_seg2 = outputs["line_segments"][1] |
|
desc1 = outputs["dense_desc"][0] |
|
desc2 = outputs["dense_desc"][1] |
|
|
|
|
|
with torch.no_grad(): |
|
matches = sold2.match(line_seg1, line_seg2, desc1[None], desc2[None]) |
|
|
|
valid_matches = matches != -1 |
|
match_indices = matches[valid_matches] |
|
|
|
matched_lines1 = line_seg1[valid_matches] * 4 |
|
matched_lines2 = line_seg2[match_indices] * 4 |
|
|
|
|
|
    height_th = int(0.6 * h)  # keep only line matches whose endpoints lie in the top 60% of the image

    filter_indices = (matched_lines1[:, :, 0] < height_th).all(axis=1) & \
                     (matched_lines2[:, :, 0] < height_th).all(axis=1)
|
matched_lines1 = matched_lines1[filter_indices] |
|
matched_lines2 = matched_lines2[filter_indices] |
|
|
|
return matched_lines1, matched_lines2 |
|
|
|
from scipy.ndimage import center_of_mass |
|
|
|
proximity_threshold = 225  # max pixel distance (at depth-map resolution) when snapping line endpoints to matched keypoints
|
|
|
def find_nearest_point(target_point, points, threshold): |
|
if isinstance(target_point, torch.Tensor): |
|
target_point = target_point.numpy() |
|
if target_point.ndim == 2 and target_point.shape[0] == 1: |
|
target_point = target_point[0] |
|
if points.shape[1] != target_point.shape[0]: |
|
raise ValueError("Shape mismatch: points and target_point must have the same number of dimensions") |
|
distances = np.linalg.norm(points - target_point, axis=1) |
|
min_distance_index = np.argmin(distances) |
|
if distances[min_distance_index] < threshold: |
|
return points[min_distance_index], min_distance_index |
|
return None, None |
|
|
|
def replace_with_center_of_mass(point, mask): |
|
y, x = int(point[1]), int(point[0]) |
|
region_mask = (mask == mask[y, x]) |
|
com = center_of_mass(region_mask) |
|
return np.array([com[1], com[0]]) |
|
|
|
|
|
# Note: this local mapping shadows the gestalt_color_mapping imported from hoho.color_mappings;
# only the classes listed here are used when building the segmentation masks below.
gestalt_color_mapping = {
|
'unclassified': [215, 62, 138], |
|
'apex': [235, 88, 48], |
|
'eave_end_point': [248, 130, 228], |
|
'eave': [54, 243, 63], |
|
'ridge': [214, 251, 248], |
|
'rake': [13, 94, 47], |
|
'valley': [85, 27, 65], |
|
'unknown': [127, 127, 127] |
|
} |
|
|
|
def extract_segmented_area(image: np.ndarray, color: List[int]) -> np.ndarray: |
|
lower = np.array(color) - 3 |
|
upper = np.array(color) + 3 |
|
mask = cv2.inRange(image, lower, upper) |
|
return mask |
|
|
|
def combine_masks(image: np.ndarray, color_mapping: dict) -> np.ndarray: |
|
combined_mask = np.zeros(image.shape[:2], dtype=np.uint8) |
|
for color in color_mapping.values(): |
|
mask = extract_segmented_area(image, color) |
|
combined_mask = cv2.bitwise_or(combined_mask, mask) |
|
return combined_mask |
|
|
|
def filter_points_by_mask(points: np.ndarray, mask: np.ndarray) -> np.ndarray: |
|
filtered_points = [] |
|
filtered_indices = [] |
|
for idx, point in enumerate(points): |
|
y, x = int(point[1]), int(point[0]) |
|
if mask[y, x] > 0: |
|
filtered_points.append(point) |
|
filtered_indices.append(idx) |
|
return np.array(filtered_points), filtered_indices |
|
|
|
|
|
|
|
def triangulate_points(mkpts0, mkpts1, R_0, t_0, R_1, t_1, intrinsics): |
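    # Assumes R_* and t_* map world to camera coordinates (x_cam = R @ X_world + t), matching the
    # world_to_cam matrices built in predict(); P = K [R | t] then projects world points to pixels.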
|
P0 = intrinsics @ np.hstack((R_0, t_0.reshape(-1, 1))) |
|
P1 = intrinsics @ np.hstack((R_1, t_1.reshape(-1, 1))) |
|
|
|
mkpts0_h = np.vstack((mkpts0.T, np.ones((1, mkpts0.shape[0])))) |
|
mkpts1_h = np.vstack((mkpts1.T, np.ones((1, mkpts1.shape[0])))) |
|
|
|
points_4D_hom = cv2.triangulatePoints(P0, P1, mkpts0_h[:2], mkpts1_h[:2]) |
|
points_3D = points_4D_hom / points_4D_hom[3] |
|
|
|
return points_3D[:3].T |
|
|
|
def predict(entry, visualize=False) -> Tuple[str, np.ndarray, List[Tuple[int, int]]]:
|
good_entry = convert_entry_to_human_readable(entry) |
|
vert_edge_per_image = {} |
|
|
|
for i, (gest, depth, K, R, t) in enumerate(zip(good_entry['gestalt'], |
|
good_entry['depthcm'], |
|
good_entry['K'], |
|
good_entry['R'], |
|
good_entry['t'] |
|
)): |
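        # Note: inside this loop `K` is the per-view intrinsics matrix; it shadows the module-level
        # kornia alias `K` (kornia is not used directly in this function, so this is safe).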
|
|
|
        # Pair view i with the next view, wrapping around to the first.
        j = (i + 1) % len(good_entry['gestalt'])
|
correspondences, mkpts0, mkpts1 = loftr_matcher(good_entry['gestalt'][i], good_entry['gestalt'][j], good_entry['depthcm'][i]) |
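        # Keep only LoFTR matches whose keypoint in view i lands on a gestalt-labelled pixel
        # (combined_mask_1 is built but not used for filtering), then thin the matches with
        # non-maximum suppression so nearby duplicates collapse to a single keypoint.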
|
|
|
|
|
|
|
gest_seg_np = np.array(gest.resize(depth.size)).astype(np.uint8) |
|
|
|
gest_seg_0 = np.array(good_entry['gestalt'][i].resize(depth.size)).astype(np.uint8) |
|
gest_seg_1 = np.array(good_entry['gestalt'][j].resize(depth.size)).astype(np.uint8) |
|
|
|
combined_mask_0 = combine_masks(gest_seg_0, gestalt_color_mapping) |
|
combined_mask_1 = combine_masks(gest_seg_1, gestalt_color_mapping) |
|
|
|
mkpts_filtered_0, indice_0 = filter_points_by_mask(mkpts0, combined_mask_0) |
|
mkpts_filtered_1 = mkpts1[indice_0] |
|
|
|
|
|
mkpts_filtered_0, filtered_index = non_maximum_suppression(mkpts_filtered_0, 50) |
|
mkpts_filtered_1 = mkpts_filtered_1[filtered_index] |
|
|
|
|
|
|
|
|
|
|
|
R_0 = good_entry['R'][i] |
|
t_0 = good_entry['t'][i] |
|
R_1 = good_entry['R'][j] |
|
t_1 = good_entry['t'][j] |
|
intrinsics = K |
|
|
|
points_3d = triangulate_points(mkpts_filtered_0, mkpts_filtered_1, R_0, t_0, R_1, t_1, intrinsics) |
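        # Next, match SOLD2 line segments between the two views and snap their endpoints to the
        # filtered keypoints; endpoints that snap to two distinct keypoints define candidate edges.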
|
|
|
|
|
line_0, line_1 = line_matcher(good_entry['gestalt'][i], good_entry['gestalt'][j], good_entry['depthcm'][i]) |
|
|
|
vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th = 5.) |
|
|
|
apex_points = np.array([v['xy'] for v in vertices if v['type'] == 'apex']) |
|
eave_end_points = np.array([v['xy'] for v in vertices if v['type'] == 'eave_end_point']) |
|
|
|
|
|
adjusted_lines = [] |
|
connections_idx = set() |
|
        matched_lines = (line_0, line_1)  # reuse the SOLD2 matches computed above instead of matching again
|
|
|
for line in matched_lines[0]: |
|
line = line.numpy() |
|
index_0 = -1 |
|
index_1 = -1 |
|
for k in range(2): |
|
nearest_point_2d, index = find_nearest_point(line[k], mkpts_filtered_0, proximity_threshold) |
|
|
|
connection = None |
|
if nearest_point_2d is not None: |
|
line[k] = torch.tensor(nearest_point_2d, dtype=torch.float32) |
|
if k == 0: |
|
index_0 = index |
|
if k == 1: |
|
index_1 = index |
|
|
|
if index_0 != index_1 and index_0 != -1 and index_1 != -1: |
|
connection = (index_0, index_1) |
|
|
|
            if connection is not None:
                connections_idx.add(connection)
|
adjusted_lines.append(line) |
|
connections_idx = list(connections_idx) |
|
adjusted_lines = np.array(adjusted_lines) |
|
|
|
|
|
|
|
|
|
|
|
gest_seg = gest.resize(depth.size) |
|
gest_seg_np = np.array(gest_seg).astype(np.uint8) |
|
|
|
depth_np = np.array(depth) / 2.5 |
|
vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th = 5.) |
|
        if (len(vertices) < 2) or (len(connections) < 1):
            print(f'Not enough vertices or connections in image {i}')
            # Keep only the triangulated matches for this image; skip the unused depth back-projection below.
            vert_edge_per_image[i] = connections_idx, points_3d
            continue
|
|
|
uv, depth_vert = get_uv_depth(vertices, depth_np) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
xy_local = np.ones((len(uv), 3)) |
|
xy_local[:, 0] = (uv[:, 0] - K[0,2]) / K[0,0] |
|
xy_local[:, 1] = (uv[:, 1] - K[1,2]) / K[1,1] |
|
|
|
vertices_3d_local = depth_vert[...,None] * (xy_local/np.linalg.norm(xy_local, axis=1)[...,None]) |
|
world_to_cam = np.eye(4) |
|
world_to_cam[:3, :3] = R |
|
world_to_cam[:3, 3] = t.reshape(-1) |
|
cam_to_world = np.linalg.inv(world_to_cam) |
|
vertices_3d = cv2.transform(cv2.convertPointsToHomogeneous(vertices_3d_local), cam_to_world) |
|
vertices_3d = cv2.convertPointsFromHomogeneous(vertices_3d).reshape(-1, 3) |
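        # Note: `vertices_3d` back-projected from the monocular depth is computed but not stored;
        # only the triangulated `points_3d` and `connections_idx` go into vert_edge_per_image below.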
|
|
|
|
|
|
|
vert_edge_per_image[i] = connections_idx, points_3d |
|
|
|
all_3d_vertices, connections_3d = merge_vertices_3d_ours(vert_edge_per_image, 3.0) |
|
|
|
pcd = o3d.geometry.PointCloud() |
|
pcd.points = o3d.utility.Vector3dVector(all_3d_vertices) |
|
cl, ind = pcd.remove_statistical_outlier(nb_neighbors=10, std_ratio=0.05) |
|
inlier_cloud = pcd.select_by_index(ind) |
|
filtered_vertices = np.asarray(inlier_cloud.points) |
|
|
|
all_3d_vertices_clean = filtered_vertices |
|
|
|
concatenated_list = [] |
|
|
|
|
|
for sublist in connections_3d: |
|
concatenated_list.extend(sublist) |
|
|
|
connections_3d_clean = concatenated_list |
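    # Caveat: these connection indices were assigned over the concatenated, pre-filtering vertex list,
    # so they are not guaranteed to line up with `all_3d_vertices_clean` after NMS and outlier removal.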
|
|
|
print (f'{len(all_3d_vertices_clean)} vertices and {len(connections_3d_clean)} connections in the 3D vertices') |
|
|
|
if (len(all_3d_vertices_clean) < 2) or len(connections_3d_clean) < 1: |
|
print (f'Not enough vertices or connections in the 3D vertices') |
|
return (good_entry['__key__'], *empty_solution()) |
|
if visualize: |
|
from hoho.viz3d import plot_estimate_and_gt |
|
plot_estimate_and_gt( all_3d_vertices_clean, |
|
connections_3d_clean, |
|
good_entry['wf_vertices'], |
|
good_entry['wf_edges']) |
|
return good_entry['__key__'], all_3d_vertices_clean, connections_3d_clean |
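# Typical usage (sketch; assumes `entry` is a single sample dict in the hoho format handled by
# convert_entry_to_human_readable):
#   key, vertices, edges = predict(entry, visualize=False)
#   # `vertices` is an (N, 3) array and `edges` a list of (i, j) vertex-index pairs.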
|
|
|
|