# Copyright (C) 2024-present Naver Corporation. All rights reserved.
# Licensed under CC BY-NC-SA 4.0 (non-commercial use only).
#
# --------------------------------------------------------
# MASt3R to colmap export functions
# --------------------------------------------------------
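#
# Typical usage (a sketch; the demo code drives these functions):
#   1. export_images()  -- register cameras and images in the COLMAP db,
#      with priors from the global alignment when available
#   2. get_im_matches() / get_im_matches_from_cache()  -- per-pair matches
#   3. export_matches()  -- build tracks, write keypoints and matches
# --------------------------------------------------------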
import os
import torch
import copy
import numpy as np
import torchvision
from tqdm import tqdm
from scipy.cluster.hierarchy import DisjointSet
from scipy.spatial.transform import Rotation as R

from mast3r.utils.misc import hash_md5
from mast3r.fast_nn import extract_correspondences_nonsym, bruteforce_reciprocal_nns

import mast3r.utils.path_to_dust3r  # noqa
from dust3r.utils.geometry import find_reciprocal_matches, xy_grid, geotrf  # noqa


def convert_im_matches_pairs(img0, img1, image_to_colmap, im_keypoints, matches_im0, matches_im1, viz):
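    """Convert pixel matches of one image pair into COLMAP-ready matches.

    Matches are rounded to integer pixels and encoded as raveled indices
    (x + W * y); their occurrence counts are accumulated in im_keypoints.
    Returns (imidx0, imidx1, matches), with the images ordered so that the
    first one has the smaller COLMAP image id.
    """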
if viz:
from matplotlib import pyplot as pl
        image_mean = torch.as_tensor([0.5, 0.5, 0.5], device='cpu').reshape(1, 3, 1, 1)
        image_std = torch.as_tensor([0.5, 0.5, 0.5], device='cpu').reshape(1, 3, 1, 1)
rgb0 = img0['img'] * image_std + image_mean
rgb0 = torchvision.transforms.functional.to_pil_image(rgb0[0])
rgb0 = np.array(rgb0)
rgb1 = img1['img'] * image_std + image_mean
rgb1 = torchvision.transforms.functional.to_pil_image(rgb1[0])
rgb1 = np.array(rgb1)
imgs = [rgb0, rgb1]
# visualize a few matches
n_viz = 100
num_matches = matches_im0.shape[0]
match_idx_to_viz = np.round(np.linspace(
0, num_matches - 1, n_viz)).astype(int)
viz_matches_im0, viz_matches_im1 = matches_im0[match_idx_to_viz], matches_im1[match_idx_to_viz]
H0, W0, H1, W1 = *imgs[0].shape[:2], *imgs[1].shape[:2]
        rgb0 = np.pad(imgs[0], ((0, max(H1 - H0, 0)), (0, 0), (0, 0)),
                      'constant', constant_values=0)
        rgb1 = np.pad(imgs[1], ((0, max(H0 - H1, 0)), (0, 0), (0, 0)),
                      'constant', constant_values=0)
img = np.concatenate((rgb0, rgb1), axis=1)
pl.figure()
pl.imshow(img)
cmap = pl.get_cmap('jet')
        for ii in range(n_viz):
            (x0, y0), (x1, y1) = viz_matches_im0[ii].T, viz_matches_im1[ii].T
            pl.plot([x0, x1 + W0], [y0, y1], '-+',
                    color=cmap(ii / (n_viz - 1)), scalex=False, scaley=False)
pl.show(block=True)
matches = [matches_im0.astype(np.float64), matches_im1.astype(np.float64)]
imgs = [img0, img1]
imidx0 = img0['idx']
imidx1 = img1['idx']
ravel_matches = []
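    # identify every keypoint by its raveled pixel index (x + W * y) at model resolution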
for j in range(2):
H, W = imgs[j]['true_shape'][0]
with np.errstate(invalid='ignore'):
qx, qy = matches[j].round().astype(np.int32).T
ravel_matches_j = qx.clip(min=0, max=W - 1, out=qx) + W * qy.clip(min=0, max=H - 1, out=qy)
ravel_matches.append(ravel_matches_j)
imidxj = imgs[j]['idx']
for m in ravel_matches_j:
if m not in im_keypoints[imidxj]:
im_keypoints[imidxj][m] = 0
im_keypoints[imidxj][m] += 1
    imid0 = image_to_colmap[imidx0]['colmap_imid']
    imid1 = image_to_colmap[imidx1]['colmap_imid']
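    # COLMAP stores each pair with the smaller image id first: swap if needed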
if imid0 > imid1:
colmap_matches = np.stack([ravel_matches[1], ravel_matches[0]], axis=-1)
imid0, imid1 = imid1, imid0
imidx0, imidx1 = imidx1, imidx0
else:
colmap_matches = np.stack([ravel_matches[0], ravel_matches[1]], axis=-1)
colmap_matches = np.unique(colmap_matches, axis=0)
return imidx0, imidx1, colmap_matches


def get_im_matches(pred1, pred2, pairs, image_to_colmap, im_keypoints, conf_thr,
                   is_sparse=True, subsample=8, pixel_tol=0, viz=False, device='cuda'):
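    """Extract per-pair matches from the network predictions.

    With descriptors available ('desc' in pred1, i.e. MASt3R), matches come
    from descriptor matching; otherwise from reciprocal nearest neighbours
    between the predicted 3D points (DUSt3R-style). is_sparse switches
    between subsampled correspondence extraction and dense reciprocal
    matching over the confidence threshold.
    Returns a dict mapping (imidx0, imidx1) to COLMAP match arrays.
    """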
im_matches = {}
for i in range(len(pred1['pts3d'])):
imidx0 = pairs[i][0]['idx']
imidx1 = pairs[i][1]['idx']
if 'desc' in pred1: # mast3r
descs = [pred1['desc'][i], pred2['desc'][i]]
confidences = [pred1['desc_conf'][i], pred2['desc_conf'][i]]
desc_dim = descs[0].shape[-1]
if is_sparse:
corres = extract_correspondences_nonsym(descs[0], descs[1], confidences[0], confidences[1],
device=device, subsample=subsample, pixel_tol=pixel_tol)
conf = corres[2]
mask = conf >= conf_thr
matches_im0 = corres[0][mask].cpu().numpy()
matches_im1 = corres[1][mask].cpu().numpy()
else:
                confidence_masks = [confidences[0] >= conf_thr,
                                    confidences[1] >= conf_thr]
pts2d_list, desc_list = [], []
for j in range(2):
conf_j = confidence_masks[j].cpu().numpy().flatten()
true_shape_j = pairs[i][j]['true_shape'][0]
                    pts2d_j = xy_grid(true_shape_j[1], true_shape_j[0]).reshape(-1, 2)[conf_j]
                    desc_j = descs[j].detach().cpu().numpy().reshape(-1, desc_dim)[conf_j]
pts2d_list.append(pts2d_j)
desc_list.append(desc_j)
if len(desc_list[0]) == 0 or len(desc_list[1]) == 0:
continue
nn0, nn1 = bruteforce_reciprocal_nns(desc_list[0], desc_list[1],
device=device, dist='dot', block_size=2**13)
reciprocal_in_P0 = (nn1[nn0] == np.arange(len(nn0)))
matches_im1 = pts2d_list[1][nn0][reciprocal_in_P0]
matches_im0 = pts2d_list[0][reciprocal_in_P0]
else:
pts3d = [pred1['pts3d'][i], pred2['pts3d_in_other_view'][i]]
confidences = [pred1['conf'][i], pred2['conf'][i]]
if is_sparse:
corres = extract_correspondences_nonsym(pts3d[0], pts3d[1], confidences[0], confidences[1],
device=device, subsample=subsample, pixel_tol=pixel_tol,
ptmap_key='3d')
conf = corres[2]
mask = conf >= conf_thr
matches_im0 = corres[0][mask].cpu().numpy()
matches_im1 = corres[1][mask].cpu().numpy()
else:
                confidence_masks = [confidences[0] >= conf_thr,
                                    confidences[1] >= conf_thr]
# find 2D-2D matches between the two images
pts2d_list, pts3d_list = [], []
for j in range(2):
conf_j = confidence_masks[j].cpu().numpy().flatten()
true_shape_j = pairs[i][j]['true_shape'][0]
pts2d_j = xy_grid(true_shape_j[1], true_shape_j[0]).reshape(-1, 2)[conf_j]
pts3d_j = pts3d[j].detach().cpu().numpy().reshape(-1, 3)[conf_j]
pts2d_list.append(pts2d_j)
pts3d_list.append(pts3d_j)
PQ, PM = pts3d_list[0], pts3d_list[1]
if len(PQ) == 0 or len(PM) == 0:
continue
reciprocal_in_PM, nnM_in_PQ, num_matches = find_reciprocal_matches(
PQ, PM)
matches_im1 = pts2d_list[1][reciprocal_in_PM]
matches_im0 = pts2d_list[0][nnM_in_PQ][reciprocal_in_PM]
if len(matches_im0) == 0:
continue
imidx0, imidx1, colmap_matches = convert_im_matches_pairs(pairs[i][0], pairs[i][1],
image_to_colmap, im_keypoints,
matches_im0, matches_im1, viz)
im_matches[(imidx0, imidx1)] = colmap_matches
return im_matches


def get_im_matches_from_cache(pairs, cache_path, desc_conf, subsample,
                              image_to_colmap, im_keypoints, conf_thr,
                              viz=False, device='cuda'):
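    """Same as get_im_matches, but loads precomputed correspondences from
    cache files under cache_path instead of recomputing them. A cached pair
    may be stored in either image order; both filenames are tried.
    """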
im_matches = {}
for i in range(len(pairs)):
imidx0 = pairs[i][0]['idx']
imidx1 = pairs[i][1]['idx']
corres_idx1 = hash_md5(pairs[i][0]['instance'])
corres_idx2 = hash_md5(pairs[i][1]['instance'])
path_corres = cache_path + f'/corres_conf={desc_conf}_{subsample=}/{corres_idx1}-{corres_idx2}.pth'
if os.path.isfile(path_corres):
score, (xy1, xy2, confs) = torch.load(path_corres, map_location=device)
else:
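            # the pair was cached in the opposite order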
path_corres = cache_path + f'/corres_conf={desc_conf}_{subsample=}/{corres_idx2}-{corres_idx1}.pth'
score, (xy2, xy1, confs) = torch.load(path_corres, map_location=device)
mask = confs >= conf_thr
matches_im0 = xy1[mask].cpu().numpy()
matches_im1 = xy2[mask].cpu().numpy()
if len(matches_im0) == 0:
continue
imidx0, imidx1, colmap_matches = convert_im_matches_pairs(pairs[i][0], pairs[i][1],
image_to_colmap, im_keypoints,
matches_im0, matches_im1, viz)
im_matches[(imidx0, imidx1)] = colmap_matches
return im_matches


def export_images(db, images, image_paths, focals, ga_world_to_cam, camera_model):
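    """Register one camera and one image per input image in the COLMAP db.

    focals may be None (default of 1.2 * max(W, H), with no focal prior),
    per-image scalars, or full 3x3 intrinsics. If ga_world_to_cam is given,
    the global-alignment poses are stored as pose priors.
    Returns (image_to_colmap, im_keypoints): the idx -> COLMAP id mapping
    and an empty per-image keypoint-count dict.
    """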
    # add cameras/images to the db, using the global alignment output as prior
image_to_colmap = {}
im_keypoints = {}
for idx in range(len(image_paths)):
im_keypoints[idx] = {}
H, W = images[idx]["orig_shape"]
if focals is None:
focal_x = focal_y = 1.2 * max(W, H)
prior_focal_length = False
cx = W / 2.0
cy = H / 2.0
elif isinstance(focals[idx], np.ndarray) and len(focals[idx].shape) == 2:
# intrinsics
focal_x = focals[idx][0, 0]
focal_y = focals[idx][1, 1]
cx = focals[idx][0, 2] * images[idx]["to_orig"][0, 0]
cy = focals[idx][1, 2] * images[idx]["to_orig"][1, 1]
prior_focal_length = True
else:
focal_x = focal_y = float(focals[idx])
prior_focal_length = True
cx = W / 2.0
cy = H / 2.0
focal_x = focal_x * images[idx]["to_orig"][0, 0]
focal_y = focal_y * images[idx]["to_orig"][1, 1]
if camera_model == "SIMPLE_PINHOLE":
model_id = 0
focal = (focal_x + focal_y) / 2.0
params = np.asarray([focal, cx, cy], np.float64)
elif camera_model == "PINHOLE":
model_id = 1
params = np.asarray([focal_x, focal_y, cx, cy], np.float64)
elif camera_model == "SIMPLE_RADIAL":
model_id = 2
focal = (focal_x + focal_y) / 2.0
params = np.asarray([focal, cx, cy, 0.0], np.float64)
elif camera_model == "OPENCV":
model_id = 4
params = np.asarray([focal_x, focal_y, cx, cy, 0.0, 0.0, 0.0, 0.0], np.float64)
else:
raise ValueError(f"invalid camera model {camera_model}")
        H, W = int(H), int(W)
        camid = db.add_camera(model_id, W, H, params,
                              prior_focal_length=prior_focal_length)
if ga_world_to_cam is None:
prior_t = np.zeros(3)
prior_q = np.zeros(4)
else:
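            # scipy quaternions are (x, y, z, w); COLMAP priors expect (w, x, y, z)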
q = R.from_matrix(ga_world_to_cam[idx][:3, :3]).as_quat()
prior_t = ga_world_to_cam[idx][:3, 3]
prior_q = np.array([q[-1], q[0], q[1], q[2]])
imid = db.add_image(
image_paths[idx], camid, prior_q=prior_q, prior_t=prior_t)
image_to_colmap[idx] = {
'colmap_imid': imid,
'colmap_camid': camid
}
return image_to_colmap, im_keypoints


def export_matches(db, images, image_to_colmap, im_keypoints, im_matches,
                   min_len_track, skip_geometric_verification):
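    """Build feature tracks from the pairwise matches and write keypoints
    and matches into the COLMAP database.

    Tracks shorter than min_len_track are dropped. Returns the list of
    (instance, instance) image pairs for which matches were exported.
    """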
colmap_image_pairs = []
    # 2D-2D matches are quite dense; we drop very short tracks
    # and only export keypoints that belong to a kept track

    # build tracks
print("building tracks")
keypoints_to_track_id = {}
track_id_to_kpt_list = []
to_merge = []
for (imidx0, imidx1), colmap_matches in tqdm(im_matches.items()):
if imidx0 not in keypoints_to_track_id:
keypoints_to_track_id[imidx0] = {}
if imidx1 not in keypoints_to_track_id:
keypoints_to_track_id[imidx1] = {}
for m in colmap_matches:
if m[0] not in keypoints_to_track_id[imidx0] and m[1] not in keypoints_to_track_id[imidx1]:
# new pair of kpts never seen before
track_idx = len(track_id_to_kpt_list)
keypoints_to_track_id[imidx0][m[0]] = track_idx
keypoints_to_track_id[imidx1][m[1]] = track_idx
track_id_to_kpt_list.append(
[(imidx0, m[0]), (imidx1, m[1])])
elif m[1] not in keypoints_to_track_id[imidx1]:
# 0 has a track, not 1
track_idx = keypoints_to_track_id[imidx0][m[0]]
keypoints_to_track_id[imidx1][m[1]] = track_idx
track_id_to_kpt_list[track_idx].append((imidx1, m[1]))
elif m[0] not in keypoints_to_track_id[imidx0]:
# 1 has a track, not 0
track_idx = keypoints_to_track_id[imidx1][m[1]]
keypoints_to_track_id[imidx0][m[0]] = track_idx
track_id_to_kpt_list[track_idx].append((imidx0, m[0]))
else:
# both have tracks, merge them
track_idx0 = keypoints_to_track_id[imidx0][m[0]]
track_idx1 = keypoints_to_track_id[imidx1][m[1]]
if track_idx0 != track_idx1:
# let's deal with them later
to_merge.append((track_idx0, track_idx1))
# regroup merge targets
print("merging tracks")
unique = np.unique(to_merge)
tree = DisjointSet(unique)
for track_idx0, track_idx1 in tqdm(to_merge):
tree.merge(track_idx0, track_idx1)
subsets = tree.subsets()
print("applying merge")
for setvals in tqdm(subsets):
new_trackid = len(track_id_to_kpt_list)
kpt_list = []
for track_idx in setvals:
kpt_list.extend(track_id_to_kpt_list[track_idx])
for imidx, kpid in track_id_to_kpt_list[track_idx]:
keypoints_to_track_id[imidx][kpid] = new_trackid
track_id_to_kpt_list.append(kpt_list)
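    # note: merged-away entries remain in track_id_to_kpt_list, but no keypoint references them anymore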
    num_valid_tracks = sum(1 for v in track_id_to_kpt_list if len(v) >= min_len_track)
keypoints_to_idx = {}
print(f"squashing keypoints - {num_valid_tracks} valid tracks")
for imidx, keypoints_imid in tqdm(im_keypoints.items()):
imid = image_to_colmap[imidx]['colmap_imid']
keypoints_kept = []
keypoints_to_idx[imidx] = {}
for kp in keypoints_imid.keys():
if kp not in keypoints_to_track_id[imidx]:
continue
track_idx = keypoints_to_track_id[imidx][kp]
track_length = len(track_id_to_kpt_list[track_idx])
if track_length < min_len_track:
continue
keypoints_to_idx[imidx][kp] = len(keypoints_kept)
keypoints_kept.append(kp)
if len(keypoints_kept) == 0:
continue
        keypoints_kept = np.array(keypoints_kept)
        # recover (x, y) pixel coordinates from the raveled indices
        ys, xs = np.unravel_index(keypoints_kept, images[imidx]['true_shape'][0])
        keypoints_kept = np.stack([xs, ys], axis=-1).astype(np.float32)
        # shift to pixel centers and map back to the original image resolution
        keypoints_kept[:, 0] += 0.5
        keypoints_kept[:, 1] += 0.5
        keypoints_kept = geotrf(images[imidx]['to_orig'], keypoints_kept, norm=True)
H, W = images[imidx]['orig_shape']
keypoints_kept[:, 0] = keypoints_kept[:, 0].clip(min=0, max=W - 0.01)
keypoints_kept[:, 1] = keypoints_kept[:, 1].clip(min=0, max=H - 0.01)
db.add_keypoints(imid, keypoints_kept)
print("exporting im_matches")
for (imidx0, imidx1), colmap_matches in im_matches.items():
imid0, imid1 = image_to_colmap[imidx0]['colmap_imid'], image_to_colmap[imidx1]['colmap_imid']
assert imid0 < imid1
final_matches = np.array([[keypoints_to_idx[imidx0][m[0]], keypoints_to_idx[imidx1][m[1]]]
for m in colmap_matches
if m[0] in keypoints_to_idx[imidx0] and m[1] in keypoints_to_idx[imidx1]])
if len(final_matches) > 0:
colmap_image_pairs.append(
(images[imidx0]['instance'], images[imidx1]['instance']))
db.add_matches(imid0, imid1, final_matches)
if skip_geometric_verification:
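                # without geometric verification, matches are written directly as two-view geometries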
db.add_two_view_geometry(imid0, imid1, final_matches)
return colmap_image_pairs