# Description: This file contains the handcrafted solution for the task of wireframe reconstruction

import io
import copy
from collections import defaultdict
from typing import Tuple, List

import cv2
import numpy as np
import open3d as o3d
import torch
import matplotlib
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import kornia as K
import kornia.feature as KF
from kornia.feature import LoFTR
from PIL import Image as PImage
from scipy.spatial.distance import cdist

from hoho.read_write_colmap import read_cameras_binary, read_images_binary, read_points3D_binary
from hoho.color_mappings import gestalt_color_mapping, ade20k_color_mapping


def plot_images(imgs, titles=None, cmaps="gray", dpi=100, size=6, pad=0.5):
    """Plot a set of images horizontally.
    Args:
        imgs: list of NumPy or PyTorch images, RGB (H, W, 3) or mono (H, W).
        titles: list of strings, one title per image.
        cmaps: colormaps for monochrome images.
    """
    n = len(imgs)
    if not isinstance(cmaps, (list, tuple)):
        cmaps = [cmaps] * n
    figsize = (size * n, size * 3 / 4) if size is not None else None
    fig, ax = plt.subplots(1, n, figsize=figsize, dpi=dpi)
    if n == 1:
        ax = [ax]
    for i in range(n):
        ax[i].imshow(imgs[i], cmap=plt.get_cmap(cmaps[i]))
        ax[i].get_yaxis().set_ticks([])
        ax[i].get_xaxis().set_ticks([])
        ax[i].set_axis_off()
        for spine in ax[i].spines.values():  # remove frame
            spine.set_visible(False)
        if titles:
            ax[i].set_title(titles[i])
    fig.tight_layout(pad=pad)


def plot_lines(lines, line_colors="orange", point_colors="cyan", ps=4, lw=2, indices=(0, 1)):
    """Plot lines and endpoints on existing images.
    Args:
        lines: list of ndarrays of size (N, 2, 2).
        line_colors: string, or list of colors (one per image).
        point_colors: string, or list of colors (one per image).
        ps: size of the keypoints as float pixels.
        lw: line width as float pixels.
        indices: indices of the images to draw the matches on.
    """
    if not isinstance(line_colors, list):
        line_colors = [line_colors] * len(lines)
    if not isinstance(point_colors, list):
        point_colors = [point_colors] * len(lines)

    fig = plt.gcf()
    ax = fig.axes
    assert len(ax) > max(indices)
    axes = [ax[i] for i in indices]
    fig.canvas.draw()

    # Plot the lines and junctions
    for a, l, lc, pc in zip(axes, lines, line_colors, point_colors):
        for i in range(len(l)):
            line = matplotlib.lines.Line2D(
                (l[i, 1, 1], l[i, 0, 1]),
                (l[i, 1, 0], l[i, 0, 0]),
                zorder=1,
                c=lc,
                linewidth=lw,
            )
            a.add_line(line)
        pts = l.reshape(-1, 2)
        a.scatter(pts[:, 1], pts[:, 0], c=pc, s=ps, linewidths=0, zorder=2)


def plot_color_line_matches(lines, lw=2, indices=(0, 1)):
    """Plot line matches on existing images with a different color per match.
    Args:
        lines: list of ndarrays of size (N, 2, 2).
        lw: line width as float pixels.
        indices: indices of the images to draw the matches on.
    """
    n_lines = len(lines[0])
    cmap = plt.get_cmap("nipy_spectral", lut=n_lines)
    colors = np.array([mcolors.rgb2hex(cmap(i)) for i in range(cmap.N)])
    np.random.shuffle(colors)

    fig = plt.gcf()
    ax = fig.axes
    assert len(ax) > max(indices)
    axes = [ax[i] for i in indices]
    fig.canvas.draw()

    # Plot the lines
    for a, l in zip(axes, lines):
        for i in range(len(l)):
            line = matplotlib.lines.Line2D(
                (l[i, 1, 1], l[i, 0, 1]),
                (l[i, 1, 0], l[i, 0, 0]),
                zorder=1,
                c=colors[i],
                linewidth=lw,
            )
            a.add_line(line)

def empty_solution():
    '''Return a minimal valid solution, i.e. 2 vertices and 1 edge.'''
    return np.zeros((2, 3)), [(0, 1)]


def convert_entry_to_human_readable(entry):
    out = {}
    already_good = ['__key__', 'wf_vertices', 'wf_edges', 'edge_semantics',
                    'mesh_vertices', 'mesh_faces', 'face_semantics', 'K', 'R', 't']
    for k, v in entry.items():
        if k in already_good:
            out[k] = v
            continue
        if k == 'points3d':
            out[k] = read_points3D_binary(fid=io.BytesIO(v))
        if k == 'cameras':
            out[k] = read_cameras_binary(fid=io.BytesIO(v))
        if k == 'images':
            out[k] = read_images_binary(fid=io.BytesIO(v))
        if k in ['ade20k', 'gestalt']:
            out[k] = [PImage.open(io.BytesIO(x)).convert('RGB') for x in v]
        if k == 'depthcm':
            out[k] = [PImage.open(io.BytesIO(x)) for x in entry['depthcm']]
    return out


def get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th=50.0):
    '''Get the vertices and edges from the gestalt segmentation mask of the house.'''
    vertices = []
    connections = []

    # Apex vertices
    apex_color = np.array(gestalt_color_mapping['apex'])
    apex_mask = cv2.inRange(gest_seg_np, apex_color - 0.5, apex_color + 0.5)
    if apex_mask.sum() > 0:
        output = cv2.connectedComponentsWithStats(apex_mask, 8, cv2.CV_32S)
        (numLabels, labels, stats, centroids) = output
        stats, centroids = stats[1:], centroids[1:]  # drop the background component
        for i in range(numLabels - 1):
            vert = {"xy": centroids[i], "type": "apex"}
            vertices.append(vert)

    # Eave end points
    eave_end_color = np.array(gestalt_color_mapping['eave_end_point'])
    eave_end_mask = cv2.inRange(gest_seg_np, eave_end_color - 0.5, eave_end_color + 0.5)
    if eave_end_mask.sum() > 0:
        output = cv2.connectedComponentsWithStats(eave_end_mask, 8, cv2.CV_32S)
        (numLabels, labels, stats, centroids) = output
        stats, centroids = stats[1:], centroids[1:]
        for i in range(numLabels - 1):
            vert = {"xy": centroids[i], "type": "eave_end_point"}
            vertices.append(vert)

    # Connectivity
    apex_pts = []
    apex_pts_idxs = []
    for j, v in enumerate(vertices):
        apex_pts.append(v['xy'])
        apex_pts_idxs.append(j)
    apex_pts = np.array(apex_pts)

    # An edge of any class (eave, ridge, rake, valley) connects two vertices
    for edge_class in ['eave', 'ridge', 'rake', 'valley']:
        edge_color = np.array(gestalt_color_mapping[edge_class])
        mask = cv2.morphologyEx(
            cv2.inRange(gest_seg_np, edge_color - 0.5, edge_color + 0.5),
            cv2.MORPH_DILATE, np.ones((11, 11)))
        line_img = np.copy(gest_seg_np) * 0
        if mask.sum() > 0:
            output = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)
            (numLabels, labels, stats, centroids) = output
            stats, centroids = stats[1:], centroids[1:]
            edges = []
            for i in range(1, numLabels):
                y, x = np.where(labels == i)
                xleft_idx = np.argmin(x)
                x_left = x[xleft_idx]
                y_left = y[xleft_idx]
                xright_idx = np.argmax(x)
                x_right = x[xright_idx]
                y_right = y[xright_idx]
                edges.append((x_left, y_left, x_right, y_right))
                cv2.line(line_img, (x_left, y_left), (x_right, y_right), (255, 255, 255), 2)
            edges = np.array(edges)
            if (len(apex_pts) < 2) or len(edges) < 1:
                continue
            # Distance from every vertex to the closest endpoint of every edge component
            pts_to_edges_dist = np.minimum(cdist(apex_pts, edges[:, :2]),
                                           cdist(apex_pts, edges[:, 2:]))
            connectivity_mask = pts_to_edges_dist <= edge_th
            edge_connects = connectivity_mask.sum(axis=0)
            for edge_idx, edgesum in enumerate(edge_connects):
                if edgesum >= 2:
                    connected_verts = np.where(connectivity_mask[:, edge_idx])[0]
                    for a_i, a in enumerate(connected_verts):
                        for b in connected_verts[a_i + 1:]:
                            connections.append((a, b))
    return vertices, connections


def get_uv_depth(vertices, depth):
    '''Get the depth of the vertices from the depth image.'''
    uv = []
    for v in vertices:
        uv.append(v['xy'])
    uv = np.array(uv)
    uv_int = uv.astype(np.int32)
    H, W = depth.shape[:2]
    uv_int[:, 0] = np.clip(uv_int[:, 0], 0, W - 1)
    uv_int[:, 1] = np.clip(uv_int[:, 1], 0, H - 1)
    vertex_depth = depth[(uv_int[:, 1], uv_int[:, 0])]
    return uv, vertex_depth

from scipy.spatial import distance_matrix


def non_maximum_suppression(points, threshold):
    '''Greedily keep points and suppress any remaining point closer than `threshold`.'''
    if len(points) == 0:
        return points, []
    # Pairwise distance matrix between all points
    dist_matrix = distance_matrix(points, points)
    filtered_indices = []
    keep = np.ones(len(points), dtype=bool)
    for i in range(len(points)):
        if keep[i]:
            # Suppress points that are close to the current point
            keep = np.logical_and(keep, dist_matrix[i] > threshold)
            keep[i] = True  # keep the current point itself
            filtered_indices.append(i)
    return points[keep], filtered_indices

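# Illustration of non_maximum_suppression (a comment-only sketch, not used by the pipeline):
# of two points that lie within the threshold of each other only the first one survives,
# and the indices of the survivors are returned alongside the kept points.
#
#   >>> pts = np.array([[0.0, 0.0], [0.5, 0.0], [10.0, 0.0]])
#   >>> kept, kept_idx = non_maximum_suppression(pts, 1.0)
#   >>> kept
#   array([[ 0.,  0.],
#          [10.,  0.]])
#   >>> kept_idx
#   [0, 2]
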
def merge_vertices_3d_ours(vert_edge_per_image, th=0.1):
    '''Merge vertices that are close to each other in 3D space and are of same types.'''
    all_3d_vertices = []
    connections_3d = []
    all_indexes = []
    cur_start = 0
    types = []
    for cimg_idx, (connections, vertices_3d) in vert_edge_per_image.items():
        all_3d_vertices.append(vertices_3d)
        # Offset the per-image vertex indices into the concatenated vertex list
        connections_3d.append([(x + cur_start, y + cur_start) for (x, y) in connections])
        cur_start += len(vertices_3d)
    all_3d_vertices = np.concatenate(all_3d_vertices, axis=0)
    # Note: the NMS-filtered vertices are returned together with the per-image connection
    # lists, whose indices still refer to the unfiltered concatenated vertex array.
    new_vertices, _ = non_maximum_suppression(all_3d_vertices, 75)
    new_connections = []
    return new_vertices, connections_3d


def merge_vertices_3d(vert_edge_per_image, th=0.1):
    '''Merge vertices that are close to each other in 3D space and are of same types.'''
    all_3d_vertices = []
    connections_3d = []
    all_indexes = []
    cur_start = 0
    types = []
    for cimg_idx, (vertices, connections, vertices_3d) in vert_edge_per_image.items():
        types += [int(v['type'] == 'apex') for v in vertices]
        all_3d_vertices.append(vertices_3d)
        connections_3d += [(x + cur_start, y + cur_start) for (x, y) in connections]
        cur_start += len(vertices_3d)
    all_3d_vertices = np.concatenate(all_3d_vertices, axis=0)
    distmat = cdist(all_3d_vertices, all_3d_vertices)
    types = np.array(types).reshape(-1, 1)
    same_types = cdist(types, types)
    mask_to_merge = (distmat <= th) & (same_types == 0)
    new_vertices = []
    new_connections = []
    to_merge = sorted(list(set([tuple(a.nonzero()[0].tolist()) for a in mask_to_merge])))
    to_merge_final = defaultdict(list)
    for i in range(len(all_3d_vertices)):
        for j in to_merge:
            if i in j:
                to_merge_final[i] += j
    for k, v in to_merge_final.items():
        to_merge_final[k] = list(set(v))
    already_there = set()
    merged = []
    for k, v in to_merge_final.items():
        if k in already_there:
            continue
        merged.append(v)
        for vv in v:
            already_there.add(vv)
    old_idx_to_new = {}
    count = 0
    for idxs in merged:
        new_vertices.append(all_3d_vertices[idxs].mean(axis=0))
        for idx in idxs:
            old_idx_to_new[idx] = count
        count += 1
    new_vertices = np.array(new_vertices)
    for conn in connections_3d:
        new_con = sorted((old_idx_to_new[conn[0]], old_idx_to_new[conn[1]]))
        if new_con[0] == new_con[1]:
            continue
        if new_con not in new_connections:
            new_connections.append(new_con)
    # print(f'{len(new_vertices)} left after merging {len(all_3d_vertices)} with {th=}')
    return new_vertices, new_connections


def prune_not_connected(all_3d_vertices, connections_3d):
    '''Prune vertices that are not connected to any other vertex.'''
    connected = defaultdict(list)
    for c in connections_3d:
        connected[c[0]].append(c)
        connected[c[1]].append(c)
    new_indexes = {}
    new_verts = []
    connected_out = []
    for k, v in connected.items():
        vert = all_3d_vertices[k]
        if tuple(vert) not in new_verts:
            new_verts.append(tuple(vert))
            new_indexes[k] = len(new_verts) - 1
    for k, v in connected.items():
        for vv in v:
            connected_out.append((new_indexes[vv[0]], new_indexes[vv[1]]))
    connected_out = list(set(connected_out))
    return np.array(new_verts), connected_out


def loftr_matcher(gestalt_img_0, gestalt_img1, depth_images):
    import torchvision.transforms as transforms
    rgb_to_gray = transforms.Compose([
        transforms.ToPILImage(),                      # convert tensor to PIL image
        transforms.Grayscale(num_output_channels=1),  # convert to grayscale
        transforms.ToTensor()                         # convert back to tensor
    ])

    device = 'cpu'  # torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    w, h = depth_images.size

    gest_seg_0 = gestalt_img_0.resize(depth_images.size)
    gest_seg_0 = gest_seg_0.convert('L')
    gest_seg_0_np = np.array(gest_seg_0)
    gest_seg_0_tensor = K.image_to_tensor(gest_seg_0_np, False).float().to(device)
    img1 = K.geometry.resize(gest_seg_0_tensor, (int(h / 4), int(w / 4))) / 255

    gest_seg_1 = gestalt_img1.resize(depth_images.size)
    gest_seg_1 = gest_seg_1.convert('L')
    gest_seg_1_np = np.array(gest_seg_1)
    gest_seg_1_tensor = K.image_to_tensor(gest_seg_1_np, False).float().to(device)
    img2 = K.geometry.resize(gest_seg_1_tensor, (int(h / 4), int(w / 4))) / 255

    matcher = KF.LoFTR(pretrained="outdoor").to(device)
    input_dict = {
        "image0": img1,
        "image1": img2,
    }
    # print("Input dict shape", input_dict["image0"].shape, input_dict["image1"].shape)
    with torch.no_grad():
        correspondences = matcher(input_dict)

    # Optional geometric verification, kept for reference:
    # mkpts0 = correspondences["keypoints0"].cpu().numpy()
    # mkpts1 = correspondences["keypoints1"].cpu().numpy()
    # Fm, inliers = cv2.findFundamentalMat(mkpts0, mkpts1, cv2.USAC_MAGSAC, 0.99, 0.3, 100000)
    # inliers = inliers > 0
    # inliers_flat = inliers.flatten()

    # Keypoints were detected on quarter-resolution images, so scale them back up
    mkpts0 = correspondences["keypoints0"].cpu().numpy() * 4
    mkpts1 = correspondences["keypoints1"].cpu().numpy() * 4

    # Discard keypoints in the bottom 40% of the image (y >= 0.6 * H), which rarely contains the roof
    height_th = int(0.6 * h)
    filter_indices = mkpts0[:, 1] < height_th
    mkpts0 = mkpts0[filter_indices]
    mkpts1 = mkpts1[filter_indices]

    return correspondences, mkpts0, mkpts1

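# Optional geometric verification of the LoFTR matches. This is a sketch that expands the
# commented-out cv2.findFundamentalMat snippet inside loftr_matcher; it is not called by the
# pipeline, and the helper name and default threshold are illustrative choices.
def filter_matches_with_fundamental(mkpts0: np.ndarray, mkpts1: np.ndarray,
                                    ransac_th: float = 0.5) -> Tuple[np.ndarray, np.ndarray]:
    if len(mkpts0) < 8:
        # Not enough correspondences to estimate a fundamental matrix robustly
        return mkpts0, mkpts1
    _, inliers = cv2.findFundamentalMat(mkpts0, mkpts1, cv2.USAC_MAGSAC,
                                        ransac_th, 0.999, 100000)
    if inliers is None:
        return mkpts0, mkpts1
    inliers = inliers.ravel().astype(bool)
    return mkpts0[inliers], mkpts1[inliers]
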
def disk_matcher(gestalt_img_0, gestalt_img1, depth_images):
    import torchvision.transforms as transforms
    rgb_to_gray = transforms.Compose([
        transforms.ToPILImage(),                      # convert tensor to PIL image
        transforms.Grayscale(num_output_channels=1),  # convert to grayscale
        transforms.ToTensor()                         # convert back to tensor
    ])

    device = 'cpu'  # torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    w, h = depth_images.size

    gest_seg_0 = gestalt_img_0.resize(depth_images.size)
    gest_seg_0 = gest_seg_0.convert('L')
    gest_seg_0_np = np.array(gest_seg_0)
    gest_seg_0_tensor = K.image_to_tensor(gest_seg_0_np, False).float().to(device)
    img1 = K.geometry.resize(gest_seg_0_tensor, (int(h / 4), int(w / 4))) / 255

    gest_seg_1 = gestalt_img1.resize(depth_images.size)
    gest_seg_1 = gest_seg_1.convert('L')
    gest_seg_1_np = np.array(gest_seg_1)
    gest_seg_1_tensor = K.image_to_tensor(gest_seg_1_np, False).float().to(device)
    img2 = K.geometry.resize(gest_seg_1_tensor, (int(h / 4), int(w / 4))) / 255

    num_features = 8192
    disk = KF.DISK.from_pretrained("depth").to(device)

    hw1 = torch.tensor(img1.shape[2:], device=device)
    hw2 = torch.tensor(img2.shape[2:], device=device)
    lg_matcher = KF.LightGlueMatcher("disk").eval().to(device)

    with torch.no_grad():
        inp = torch.cat([img1, img2], dim=0)
        features1, features2 = disk(inp, num_features, pad_if_not_divisible=True)
        kps1, descs1 = features1.keypoints, features1.descriptors
        kps2, descs2 = features2.keypoints, features2.descriptors
        lafs1 = KF.laf_from_center_scale_ori(kps1[None], torch.ones(1, len(kps1), 1, 1, device=device))
        lafs2 = KF.laf_from_center_scale_ori(kps2[None], torch.ones(1, len(kps2), 1, 1, device=device))
        dists, idxs = lg_matcher(descs1, descs2, lafs1, lafs2, hw1=hw1, hw2=hw2)
    print(f"{idxs.shape[0]} tentative matches with DISK LightGlue")

    lg = KF.LightGlue("disk").to(device).eval()
    image0 = {
        "keypoints": features1.keypoints[None],
        "descriptors": features1.descriptors[None],
        "image_size": torch.tensor(img1.shape[-2:][::-1]).view(1, 2).to(device),
    }
    image1 = {
        "keypoints": features2.keypoints[None],
        "descriptors": features2.descriptors[None],
        "image_size": torch.tensor(img2.shape[-2:][::-1]).view(1, 2).to(device),
    }
    with torch.inference_mode():
        out = lg({"image0": image0, "image1": image1})
        idxs = out["matches"][0]
    print(f"{idxs.shape[0]} tentative matches with DISK LightGlue")

    def get_matching_keypoints(kp1, kp2, idxs):
        mkpts1 = kp1[idxs[:, 0]]
        mkpts2 = kp2[idxs[:, 1]]
        return mkpts1, mkpts2

    mkpts0, mkpts1 = get_matching_keypoints(kps1, kps2, idxs)
    # Keypoints were detected on quarter-resolution images, so scale them back up
    mkpts0 *= 4
    mkpts1 *= 4
    return mkpts0, mkpts1


def save_image_with_keypoints(filename: str, image: np.ndarray, keypoints: np.ndarray,
                              color: Tuple[int, int, int]) -> None:
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    for keypoint in keypoints:
        pt = (int(keypoint[0]), int(keypoint[1]))
        cv2.circle(image, pt, 4, color, -1)
    # Save as PNG
    cv2.imwrite(filename, image)


###### added for lines detection ######
def save_image_with_lines(filename: str, image: np.ndarray, lines: np.ndarray,
                          color: Tuple[int, int, int]) -> None:
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    for line in lines:
        pt1 = (int(line[0][1]), int(line[0][0]))
        pt2 = (int(line[1][1]), int(line[1][0]))
        cv2.line(image, pt1, pt2, color, 2)
    cv2.imwrite(filename, image)


def line_matcher(gestalt_img_0, gestalt_img1, depth_images, line_th=0.1):
    import torchvision.transforms as transforms
    rgb_to_gray = transforms.Compose([
        transforms.ToPILImage(),                      # convert tensor to PIL image
        transforms.Grayscale(num_output_channels=1),  # convert to grayscale
        transforms.ToTensor()                         # convert back to tensor
    ])

    device = 'cpu'
    w, h = depth_images.size

    gest_seg_0 = gestalt_img_0.resize(depth_images.size)
    gest_seg_0 = gest_seg_0.convert('L')
    gest_seg_0_np = np.array(gest_seg_0)
    gest_seg_0_tensor = K.image_to_tensor(gest_seg_0_np, False).float().to(device)
    img1 = K.geometry.resize(gest_seg_0_tensor, (int(h / 4), int(w / 4))) / 255

    gest_seg_1 = gestalt_img1.resize(depth_images.size)
    gest_seg_1 = gest_seg_1.convert('L')
    gest_seg_1_np = np.array(gest_seg_1)
    gest_seg_1_tensor = K.image_to_tensor(gest_seg_1_np, False).float().to(device)
    img2 = K.geometry.resize(gest_seg_1_tensor, (int(h / 4), int(w / 4))) / 255

    sold2 = KF.SOLD2(pretrained=True, config=None)
    imgs = torch.cat([img1, img2], dim=0)
    with torch.inference_mode():
        outputs = sold2(imgs)

    print(outputs.keys())
    line_seg1 = outputs["line_segments"][0]
    line_seg2 = outputs["line_segments"][1]
    desc1 = outputs["dense_desc"][0]
    desc2 = outputs["dense_desc"][1]

    with torch.no_grad():
        matches = sold2.match(line_seg1, line_seg2, desc1[None], desc2[None])
    valid_matches = matches != -1
    match_indices = matches[valid_matches]
    # Line segments were detected on quarter-resolution images, so scale them back up
    matched_lines1 = line_seg1[valid_matches] * 4
    matched_lines2 = line_seg2[match_indices] * 4

    # Keep only lines whose endpoints lie above 0.6 * H (SOLD2 returns (row, col) coordinates)
    height_th = int(0.6 * h)
    filter_indices = (matched_lines1[:, :, 0] < height_th).all(axis=1) & \
                     (matched_lines2[:, :, 0] < height_th).all(axis=1)
    matched_lines1 = matched_lines1[filter_indices]
    matched_lines2 = matched_lines2[filter_indices]

    return matched_lines1, matched_lines2

from scipy.ndimage import center_of_mass

# Pixel distance threshold for snapping line endpoints to matched keypoints
proximity_threshold = 225


def find_nearest_point(target_point, points, threshold):
    if isinstance(target_point, torch.Tensor):
        target_point = target_point.numpy()
    if target_point.ndim == 2 and target_point.shape[0] == 1:
        target_point = target_point[0]
    if points.shape[1] != target_point.shape[0]:
        raise ValueError("Shape mismatch: points and target_point must have the same number of dimensions")
    distances = np.linalg.norm(points - target_point, axis=1)
    min_distance_index = np.argmin(distances)
    if distances[min_distance_index] < threshold:
        return points[min_distance_index], min_distance_index
    return None, None


def replace_with_center_of_mass(point, mask):
    y, x = int(point[1]), int(point[0])
    region_mask = (mask == mask[y, x])
    com = center_of_mass(region_mask)
    return np.array([com[1], com[0]])  # return as (x, y)


# Gestalt color mapping (overrides the mapping imported from hoho.color_mappings for this module)
gestalt_color_mapping = {
    'unclassified': [215, 62, 138],
    'apex': [235, 88, 48],
    'eave_end_point': [248, 130, 228],
    'eave': [54, 243, 63],
    'ridge': [214, 251, 248],
    'rake': [13, 94, 47],
    'valley': [85, 27, 65],
    'unknown': [127, 127, 127]
}


def extract_segmented_area(image: np.ndarray, color: List[int]) -> np.ndarray:
    lower = np.array(color) - 3  # 0.5
    upper = np.array(color) + 3  # 0.5
    mask = cv2.inRange(image, lower, upper)
    return mask


def combine_masks(image: np.ndarray, color_mapping: dict) -> np.ndarray:
    combined_mask = np.zeros(image.shape[:2], dtype=np.uint8)
    for color in color_mapping.values():
        mask = extract_segmented_area(image, color)
        combined_mask = cv2.bitwise_or(combined_mask, mask)
    return combined_mask


def filter_points_by_mask(points: np.ndarray, mask: np.ndarray) -> Tuple[np.ndarray, List[int]]:
    filtered_points = []
    filtered_indices = []
    for idx, point in enumerate(points):
        y, x = int(point[1]), int(point[0])
        if mask[y, x] > 0:
            filtered_points.append(point)
            filtered_indices.append(idx)
    return np.array(filtered_points), filtered_indices
###### added for lines detection ########


def triangulate_points(mkpts0, mkpts1, R_0, t_0, R_1, t_1, intrinsics):
    P0 = intrinsics @ np.hstack((R_0, t_0.reshape(-1, 1)))
    P1 = intrinsics @ np.hstack((R_1, t_1.reshape(-1, 1)))
    mkpts0_h = np.vstack((mkpts0.T, np.ones((1, mkpts0.shape[0]))))
    mkpts1_h = np.vstack((mkpts1.T, np.ones((1, mkpts1.shape[0]))))
    points_4D_hom = cv2.triangulatePoints(P0, P1, mkpts0_h[:2], mkpts1_h[:2])
    points_3D = points_4D_hom / points_4D_hom[3]
    return points_3D[:3].T

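# Small self-check for triangulate_points (illustrative only, not part of the pipeline;
# all camera parameters and the 3D point below are made up for the example): two synthetic
# cameras observing a known 3D point should recover it up to numerical noise.
def _triangulation_sanity_check():
    K_demo = np.array([[1000.0, 0.0, 960.0],
                       [0.0, 1000.0, 540.0],
                       [0.0, 0.0, 1.0]])
    R_a, t_a = np.eye(3), np.zeros(3)
    R_b, t_b = np.eye(3), np.array([-1.0, 0.0, 0.0])  # second camera shifted along x
    X = np.array([[0.5, 0.2, 5.0]])                   # ground-truth 3D point

    def project(R, t, pts):
        proj = (K_demo @ (R @ pts.T + t.reshape(3, 1))).T
        return proj[:, :2] / proj[:, 2:]

    X_hat = triangulate_points(project(R_a, t_a, X), project(R_b, t_b, X),
                               R_a, t_a, R_b, t_b, K_demo)
    assert np.allclose(X_hat, X, atol=1e-4)
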
def predict(entry, visualize=False) -> Tuple[np.ndarray, List[int]]:
    good_entry = convert_entry_to_human_readable(entry)
    vert_edge_per_image = {}
    # Note: inside this loop, K is the per-image intrinsics matrix (it shadows the kornia alias)
    for i, (gest, depth, K, R, t) in enumerate(zip(good_entry['gestalt'],
                                                   good_entry['depthcm'],
                                                   good_entry['K'],
                                                   good_entry['R'],
                                                   good_entry['t'])):
        # LoFTR keypoint matching between image i and the next image j (any image beyond
        # the second is paired with image 0)
        if i < 2:
            j = i + 1
        else:
            j = 0
        correspondences, mkpts0, mkpts1 = loftr_matcher(good_entry['gestalt'][i],
                                                        good_entry['gestalt'][j],
                                                        good_entry['depthcm'][i])
        # mkpts0, mkpts1 = disk_matcher(good_entry['gestalt'][i], good_entry['gestalt'][j], good_entry['depthcm'][i])

        # Added by Tang: apply the segmentation mask to keep only keypoints on labelled roof classes
        gest_seg_np = np.array(gest.resize(depth.size)).astype(np.uint8)
        gest_seg_0 = np.array(good_entry['gestalt'][i].resize(depth.size)).astype(np.uint8)
        gest_seg_1 = np.array(good_entry['gestalt'][j].resize(depth.size)).astype(np.uint8)
        combined_mask_0 = combine_masks(gest_seg_0, gestalt_color_mapping)
        combined_mask_1 = combine_masks(gest_seg_1, gestalt_color_mapping)
        mkpts_filtered_0, indice_0 = filter_points_by_mask(mkpts0, combined_mask_0)
        mkpts_filtered_1 = mkpts1[indice_0]

        # NMS for the 2D keypoints
        mkpts_filtered_0, filtered_index = non_maximum_suppression(mkpts_filtered_0, 50)
        mkpts_filtered_1 = mkpts_filtered_1[filtered_index]
        # save_image_with_keypoints(f'keypoints_{i}.png', np.array(good_entry['gestalt'][i]), mkpts_filtered_0, (255, 0, 0))
        # save_image_with_keypoints(f'keypoints_{j}.png', np.array(good_entry['gestalt'][j]), mkpts_filtered_1, (255, 0, 0))

        # Triangulation with the matched keypoints
        R_0 = good_entry['R'][i]
        t_0 = good_entry['t'][i]
        R_1 = good_entry['R'][j]
        t_1 = good_entry['t'][j]
        intrinsics = K
        points_3d = triangulate_points(mkpts_filtered_0, mkpts_filtered_1, R_0, t_0, R_1, t_1, intrinsics)

        # Line matching
        line_0, line_1 = line_matcher(good_entry['gestalt'][i], good_entry['gestalt'][j], good_entry['depthcm'][i])

        vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th=5.)
        apex_points = np.array([v['xy'] for v in vertices if v['type'] == 'apex'])
        eave_end_points = np.array([v['xy'] for v in vertices if v['type'] == 'eave_end_point'])

        # Adjust line endpoints by snapping them to nearby filtered keypoints; a line whose
        # two endpoints snap to two different keypoints defines a connection between them
        adjusted_lines = []
        connections_idx = set()
        matched_lines = line_matcher(good_entry['gestalt'][i], good_entry['gestalt'][j], good_entry['depthcm'][i])
        for line in matched_lines[0]:
            line = line.numpy()
            index_0 = -1
            index_1 = -1
            for k in range(2):
                nearest_point_2d, index = find_nearest_point(line[k], mkpts_filtered_0, proximity_threshold)
                connection = None
                if nearest_point_2d is not None:
                    line[k] = nearest_point_2d
                    if k == 0:
                        index_0 = index
                    if k == 1:
                        index_1 = index
                if index_0 != index_1 and index_0 != -1 and index_1 != -1:
                    connection = (index_0, index_1)
                # Collect the keypoint-index pairs of the matched lines
                if connection is not None:
                    connections_idx.add(connection)
            adjusted_lines.append(line)
        connections_idx = list(connections_idx)
        adjusted_lines = np.array(adjusted_lines)
        # save_image_with_lines(f'line_{i}.png', np.array(good_entry['gestalt'][i]), line_0, (255, 0, 0))
        # save_image_with_lines(f'line_{j}.png', np.array(good_entry['gestalt'][j]), line_1, (255, 0, 0))

        gest_seg = gest.resize(depth.size)
        gest_seg_np = np.array(gest_seg).astype(np.uint8)
        # Metric3D depth
        depth_np = np.array(depth) / 2.5  # 2.5 is the scale estimation coefficient
        vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th=5.)
        if (len(vertices) < 2) or (len(connections) < 1):
            print(f'Not enough vertices or connections in image {i}')
            vert_edge_per_image[i] = np.empty((0, 2)), [], np.empty((0, 3))
            # continue
        uv, depth_vert = get_uv_depth(vertices, depth_np)

        # monodepth
        # r<32: scale = colmap depth / monodepth
        # monodepth /= scale
        # # Assuming monodepth is provided similarly as depth
        # monodepth = ?
        # scale = np.mean(depth_np / monodepth)
        # monodepth /= scale

        # Normalize the uv coordinates with the camera intrinsics
        xy_local = np.ones((len(uv), 3))
        xy_local[:, 0] = (uv[:, 0] - K[0, 2]) / K[0, 0]
        xy_local[:, 1] = (uv[:, 1] - K[1, 2]) / K[1, 1]
        # Get the 3D vertices in the camera frame: scale the unit viewing ray by the vertex depth
        vertices_3d_local = depth_vert[..., None] * (xy_local / np.linalg.norm(xy_local, axis=1)[..., None])
        world_to_cam = np.eye(4)
        world_to_cam[:3, :3] = R
        world_to_cam[:3, 3] = t.reshape(-1)
        cam_to_world = np.linalg.inv(world_to_cam)
        vertices_3d = cv2.transform(cv2.convertPointsToHomogeneous(vertices_3d_local), cam_to_world)
        vertices_3d = cv2.convertPointsFromHomogeneous(vertices_3d).reshape(-1, 3)

        # vert_edge_per_image[i] = vertices, connections, vertices_3d
        # Our method: keep the keypoint-based connections and the triangulated points
        vert_edge_per_image[i] = connections_idx, points_3d

    all_3d_vertices, connections_3d = merge_vertices_3d_ours(vert_edge_per_image, 3.0)

    # Remove statistical outliers from the merged 3D points
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(all_3d_vertices)
    cl, ind = pcd.remove_statistical_outlier(nb_neighbors=10, std_ratio=0.05)
    inlier_cloud = pcd.select_by_index(ind)
    filtered_vertices = np.asarray(inlier_cloud.points)
    all_3d_vertices_clean = filtered_vertices

    # Flatten the per-image connection lists in connections_3d into a single list
    concatenated_list = []
    for sublist in connections_3d:
        concatenated_list.extend(sublist)
    connections_3d_clean = concatenated_list

    print(f'{len(all_3d_vertices_clean)} vertices and {len(connections_3d_clean)} connections in the 3D vertices')
    if (len(all_3d_vertices_clean) < 2) or len(connections_3d_clean) < 1:
        print('Not enough vertices or connections in the 3D vertices')
        return (good_entry['__key__'], *empty_solution())
    if visualize:
        from hoho.viz3d import plot_estimate_and_gt
        plot_estimate_and_gt(all_3d_vertices_clean,
                             connections_3d_clean,
                             good_entry['wf_vertices'],
                             good_entry['wf_edges'])
    return good_entry['__key__'], all_3d_vertices_clean, connections_3d_clean
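
# Minimal usage sketch (not part of the original script): `entries` is a placeholder for an
# iterable of raw webdataset entries in the format expected by convert_entry_to_human_readable;
# plug in the competition's data loader here.
if __name__ == "__main__":
    entries = []  # e.g. populate from the hoho webdataset loader
    for raw_entry in entries:
        key, wf_vertices, wf_edges = predict(raw_entry, visualize=False)
        print(key, wf_vertices.shape, len(wf_edges))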