BioTrove-Demo / make_txt_embedding.py
Ben Feuer
BioTrove Demo
ec3c973
raw
history blame
6.3 kB
"""
Makes the entire set of text emebeddings for all possible names in the tree of life.
Uses the catalog.csv file from TreeOfLife-10M.
"""
import argparse
import csv
import json
import os
import logging
import numpy as np
import torch
import torch.nn.functional as F
from open_clip import create_model, get_tokenizer
from tqdm import tqdm
import lib
from templates import openai_imagenet_template
log_format = "[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger()
model_str = "hf-hub:imageomics/bioclip"
tokenizer_str = "ViT-B-16"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
ranks = ("Kingdom", "Phylum", "Class", "Order", "Family", "Genus", "Species")
@torch.no_grad()
def write_txt_features(name_lookup):
if os.path.isfile(args.out_path):
all_features = np.load(args.out_path)
else:
all_features = np.zeros((512, len(name_lookup)), dtype=np.float32)
batch_size = args.batch_size // len(openai_imagenet_template)
for batch, (names, indices) in enumerate(
tqdm(
lib.batched(name_lookup.values(), batch_size),
desc="txt feats",
total=len(name_lookup) // batch_size,
)
):
# Skip if any non-zero elements
if all_features[:, indices].any():
logger.info(f"Skipping batch {batch}")
continue
txts = [
template(name) for name in names for template in openai_imagenet_template
]
txts = tokenizer(txts).to(device)
txt_features = model.encode_text(txts)
txt_features = torch.reshape(
txt_features, (len(names), len(openai_imagenet_template), 512)
)
txt_features = F.normalize(txt_features, dim=2).mean(dim=1)
txt_features /= txt_features.norm(dim=1, keepdim=True)
all_features[:, indices] = txt_features.T.cpu().numpy()
if batch % 100 == 0:
np.save(args.out_path, all_features)
np.save(args.out_path, all_features)
def convert_txt_features_to_avgs(name_lookup):
assert os.path.isfile(args.out_path)
# Put that big boy on the GPU. We're going fast.
all_features = torch.from_numpy(np.load(args.out_path)).to(device)
logger.info("Loaded text features from disk to %s.", device)
names_by_rank = [set() for rank in ranks]
for name, index in tqdm(name_lookup.values()):
i = len(name) - 1
names_by_rank[i].add((name, index))
zeroed = 0
for i, rank in reversed(list(enumerate(ranks))):
if rank == "Species":
continue
for name, index in tqdm(names_by_rank[i], desc=rank):
species = tuple(
zip(
*(
(d, i)
for d, i in name_lookup.descendants(prefix=name)
if len(d) >= 6
)
)
)
if not species:
logger.warning("No species for %s.", " ".join(name))
all_features[:, index] = 0.0
zeroed += 1
continue
values, indices = species
mean = all_features[:, indices].mean(dim=1)
all_features[:, index] = F.normalize(mean, dim=0)
out_path, ext = os.path.splitext(args.out_path)
np.save(f"{out_path}_avgs{ext}", all_features.cpu().numpy())
if zeroed:
logger.warning(
"Zeroed out %d nodes because they didn't have any genus or species-level labels.",
zeroed,
)
def convert_txt_features_to_species_only(name_lookup):
assert os.path.isfile(args.out_path)
all_features = np.load(args.out_path)
logger.info("Loaded text features from disk.")
species = [(d, i) for d, i in name_lookup.descendants() if len(d) == 7]
species_features = np.zeros((512, len(species)), dtype=np.float32)
species_names = [""] * len(species)
for new_i, (name, old_i) in enumerate(tqdm(species)):
species_features[:, new_i] = all_features[:, old_i]
species_names[new_i] = name
out_path, ext = os.path.splitext(args.out_path)
np.save(f"{out_path}_species{ext}", species_features)
with open(f"{out_path}_species.json", "w") as fd:
json.dump(species_names, fd, indent=2)
def get_name_lookup(catalog_path, cache_path):
if os.path.isfile(cache_path):
with open(cache_path) as fd:
lookup = lib.TaxonomicTree.from_dict(json.load(fd))
return lookup
lookup = lib.TaxonomicTree()
with open(catalog_path) as fd:
reader = csv.DictReader(fd)
for row in tqdm(reader, desc="catalog"):
name = [
row["kingdom"],
row["phylum"],
row["class"],
row["order"],
row["family"],
row["genus"],
row["species"],
]
if any(not value for value in name):
name = name[: name.index("")]
lookup.add(name)
with open(args.name_cache_path, "w") as fd:
json.dump(lookup, fd, cls=lib.TaxonomicJsonEncoder)
return lookup
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--catalog-path",
help="Path to the catalog.csv file from TreeOfLife-10M.",
required=True,
)
parser.add_argument("--out-path", help="Path to the output file.", required=True)
parser.add_argument(
"--name-cache-path",
help="Path to the name cache file.",
default="name_lookup.json",
)
parser.add_argument("--batch-size", help="Batch size.", default=2**15, type=int)
args = parser.parse_args()
name_lookup = get_name_lookup(args.catalog_path, cache_path=args.name_cache_path)
logger.info("Got name lookup.")
model = create_model(model_str, output_dict=True, require_pretrained=True)
model = model.to(device)
logger.info("Created model.")
model = torch.compile(model)
logger.info("Compiled model.")
tokenizer = get_tokenizer(tokenizer_str)
write_txt_features(name_lookup)
convert_txt_features_to_avgs(name_lookup)
convert_txt_features_to_species_only(name_lookup)