import argparse
import collections.abc as collections
from pathlib import Path
from typing import Optional

import h5py
import numpy as np
import torch

from . import logger
from .utils.io import list_h5_names
from .utils.parsers import parse_image_lists
from .utils.read_write_model import read_images_binary


def parse_names(prefix, names, names_all):
    """Select the image names to work with.

    The selection is resolved in this order of precedence:

    1. ``prefix``: keep the entries of ``names_all`` that start with the
       prefix (a string, or an iterable of alternative prefixes).
    2. ``names``: a path (``str`` or ``Path``) handed to
       ``parse_image_lists``, or any iterable, which is copied to a list.
    3. Otherwise ``names_all`` is returned unchanged.

    Raises:
        ValueError: if no name matches ``prefix``, or if ``names`` is
            neither a path nor an iterable.
    """
    if prefix is not None:
        # str.startswith accepts a single string or a tuple of strings.
        if not isinstance(prefix, str):
            prefix = tuple(prefix)
        names = [n for n in names_all if n.startswith(prefix)]
        if len(names) == 0:
            raise ValueError(
                f"Could not find any image with the prefix `{prefix}`."
            )
    elif names is not None:
        if isinstance(names, (str, Path)):
            names = parse_image_lists(names)
        elif isinstance(names, collections.Iterable):
            names = list(names)
        else:
            raise ValueError(
                f"Unknown type of image list: {names}. "
                "Provide either a list or a path to a list file."
            )
    else:
        names = names_all
    return names


def get_descriptors(names, path, name2idx=None, key="global_descriptor"):
    """Load one descriptor per name from HDF5 and stack them.

    Args:
        names: names of the HDF5 groups to read, in output order.
        path: a single HDF5 file path when ``name2idx`` is None, otherwise
            an indexable collection of HDF5 file paths.
        name2idx: optional mapping from a name to the index in ``path`` of
            the file that contains it.
        key: dataset read inside each group.

    Returns:
        A float32 tensor with the descriptors stacked along dimension 0,
        in the order of ``names``.
    """
    if name2idx is None:
        with h5py.File(str(path), "r", libver="latest") as fd:
            desc = [fd[n][key].__array__() for n in names]
    else:
        # Group the names by file so that each file is opened only once
        # instead of once per name.
        names_per_file = {}
        for n in names:
            names_per_file.setdefault(name2idx[n], []).append(n)
        loaded = {}
        for idx, group in names_per_file.items():
            with h5py.File(str(path[idx]), "r", libver="latest") as fd:
                for n in group:
                    loaded[n] = fd[n][key].__array__()
        # Restore the order requested by the caller.
        desc = [loaded[n] for n in names]
    return torch.from_numpy(np.stack(desc, 0)).float()


def pairs_from_score_matrix(
    scores: torch.Tensor,
    invalid: np.ndarray,
    num_select: int,
    min_score: Optional[float] = None,
):
    """Pick, for every row, the best-scoring valid columns.

    Args:
        scores: 2D score matrix (a torch tensor; a NumPy array is also
            accepted and converted). It is not modified.
        invalid: boolean NumPy array of the same shape; True entries are
            never selected. It is not modified.
        num_select: maximum number of columns selected per row. Values
            larger than the number of columns are clamped.
        min_score: if given, entries scoring below it are also excluded.

    Returns:
        A list of ``(row, column)`` index pairs, ordered by row and then
        by decreasing score.
    """
    assert scores.shape == invalid.shape
    if isinstance(scores, np.ndarray):
        scores = torch.from_numpy(scores)
    invalid = torch.from_numpy(invalid).to(scores.device)
    if min_score is not None:
        # Out-of-place: the tensor may share memory with the caller's array.
        invalid = invalid | (scores < min_score)
    # Out-of-place so that the caller's score matrix is left untouched.
    scores = scores.masked_fill(invalid, float("-inf"))

    # torch.topk raises if k exceeds the size of the selected dimension.
    num_select = min(num_select, scores.shape[1])
    topk = torch.topk(scores, num_select, dim=1)
    indices = topk.indices.cpu().numpy()
    # Masked entries carry -inf: drop them from the selection.
    valid = topk.values.isfinite().cpu().numpy()

    return [(i, indices[i, j]) for i, j in zip(*np.where(valid))]


def main(
    descriptors,
    output,
    num_matched,
    query_prefix=None,
    query_list=None,
    db_prefix=None,
    db_list=None,
    db_model=None,
    db_descriptors=None,
):
    """Write the retrieval pairs between query and database images.

    Each query is paired with up to ``num_matched`` database images that
    have the highest descriptor dot product. Pairs of identical names and
    pairs with a negative score are discarded.

    Args:
        descriptors: HDF5 file holding the query descriptors (and the
            database descriptors when ``db_descriptors`` is None).
        output: text file to write, one ``query db`` pair per line.
        num_matched: maximum number of database images per query.
        query_prefix, query_list: query selection, see ``parse_names``.
        db_prefix, db_list: database selection, see ``parse_names``.
        db_model: directory containing an ``images.bin`` whose image names
            define the database; takes precedence over ``db_prefix`` and
            ``db_list``.
        db_descriptors: a path or a list of paths to the HDF5 file(s)
            holding the database descriptors.

    Raises:
        ValueError: if no database or no query image is selected.
    """
    logger.info("Extracting image pairs from a retrieval database.")

    # We handle multiple reference feature files.
    # We only assume that names are unique among them and map names to files.
    if db_descriptors is None:
        db_descriptors = descriptors
    if isinstance(db_descriptors, (Path, str)):
        db_descriptors = [db_descriptors]
    name2db = {
        n: i for i, p in enumerate(db_descriptors) for n in list_h5_names(p)
    }
    db_names_h5 = list(name2db)
    query_names_h5 = list_h5_names(descriptors)

    if db_model:
        # Path() so that a plain string is accepted as well.
        images = read_images_binary(Path(db_model) / "images.bin")
        db_names = [i.name for i in images.values()]
    else:
        db_names = parse_names(db_prefix, db_list, db_names_h5)
    if len(db_names) == 0:
        raise ValueError("Could not find any database image.")
    query_names = parse_names(query_prefix, query_list, query_names_h5)
    if len(query_names) == 0:
        raise ValueError("Could not find any query image.")

    device = "cuda" if torch.cuda.is_available() else "cpu"
    db_desc = get_descriptors(db_names, db_descriptors, name2db)
    query_desc = get_descriptors(query_names, descriptors)
    # sim[i, j] is the dot product of query i and database descriptor j.
    sim = torch.einsum("id,jd->ij", query_desc.to(device), db_desc.to(device))

    # Avoid self-matching
    is_self = np.array(query_names)[:, None] == np.array(db_names)[None]
    pairs = pairs_from_score_matrix(sim, is_self, num_matched, min_score=0)
    pairs = [(query_names[i], db_names[j]) for i, j in pairs]

    logger.info(f"Found {len(pairs)} pairs.")
    with open(output, "w") as f:
        f.write("\n".join(" ".join([i, j]) for i, j in pairs))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--descriptors", type=Path, required=True)
    parser.add_argument("--output", type=Path, required=True)
    parser.add_argument("--num_matched", type=int, required=True)
    parser.add_argument("--query_prefix", type=str, nargs="+")
    parser.add_argument("--query_list", type=Path)
    parser.add_argument("--db_prefix", type=str, nargs="+")
    parser.add_argument("--db_list", type=Path)
    parser.add_argument("--db_model", type=Path)
    parser.add_argument("--db_descriptors", type=Path)
    args = parser.parse_args()
    main(**vars(args))