# Source: AniDoc / cotracker/datasets/tap_vid_datasets.py
# Uploaded by fffiloni; migrated from GitHub (revision c705408, verified).
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import os
import io
import glob
import torch
import pickle
import numpy as np
import mediapy as media
from PIL import Image
from typing import Mapping, Tuple, Union
from cotracker.datasets.utils import CoTrackerData
# Type alias for one dataset element: a mapping from string keys to mappings
# whose values are either numpy arrays or strings.
DatasetElement = Mapping[str, Mapping[str, Union[np.ndarray, str]]]
def resize_video(video: np.ndarray, output_size: Tuple[int, int]) -> np.ndarray:
    """Return ``video`` resized to ``output_size`` using ``mediapy``.

    Args:
      video: Frames to resize; forwarded unchanged to ``media.resize_video``.
      output_size: Target size; forwarded unchanged to ``media.resize_video``.

    Returns:
      The result of ``media.resize_video(video, output_size)``.
    """
    # NOTE: this is a CPU resize. If a GPU is available, a GPU-enabled resize
    # op (such as a jitted jax.image.resize) would make this faster.
    resized = media.resize_video(video, output_size)
    return resized
def sample_queries_first(
    target_occluded: np.ndarray,
    target_points: np.ndarray,
    frames: np.ndarray,
) -> Mapping[str, np.ndarray]:
    """Package frames and tracks, querying each track at its first visible frame.

    Given a set of frames and tracks with no query points, use the first
    visible point in each track as the query. Tracks that are occluded in
    every frame are dropped. Coordinates are passed through unchanged: no
    rescaling is performed here, so outputs use whatever coordinate scale the
    caller supplied.

    Args:
      target_occluded: Boolean occlusion flag, of shape [n_tracks, n_frames],
        where True indicates occluded.
      target_points: Position, of shape [n_tracks, n_frames, 2], where each
        point is [x, y].
      frames: Video array, of shape [n_frames, height, width, 3].

    Returns:
      A dict with the keys:
        video: `frames` with a leading batch axis, shape
          [1, n_frames, height, width, 3].
        query_points: Query points of shape [1, n_queries, 3] where each
          point is [t, y, x] and t is the index of the first visible frame.
        target_points: Target points of shape [1, n_queries, n_frames, 2]
          where each point is [x, y].
        occluded: Occlusion flags of shape [1, n_queries, n_frames].
      n_queries is the number of tracks visible in at least one frame; it may
      be zero, in which case query_points has shape [1, 0, 3].
    """
    # Keep only tracks that are visible in at least one frame.
    valid = np.sum(~target_occluded, axis=1) > 0
    target_points = target_points[valid, :]
    target_occluded = target_occluded[valid, :]

    # argmax over a boolean row yields the index of its first True entry, and
    # every remaining row is guaranteed to contain one. Unlike a Python loop
    # followed by np.stack, this also works when no track survives filtering.
    first_visible = np.argmax(target_occluded == 0, axis=1)
    track_ids = np.arange(target_points.shape[0])
    x = target_points[track_ids, first_visible, 0]
    y = target_points[track_ids, first_visible, 1]
    query_points = np.stack([first_visible, y, x], axis=-1)  # [t, y, x]

    return {
        "video": frames[np.newaxis, ...],
        "query_points": query_points[np.newaxis, ...],
        "target_points": target_points[np.newaxis, ...],
        "occluded": target_occluded[np.newaxis, ...],
    }
def sample_queries_strided(
    target_occluded: np.ndarray,
    target_points: np.ndarray,
    frames: np.ndarray,
    query_stride: int = 5,
) -> Mapping[str, np.ndarray]:
    """Package frames and tracks, sampling queries every `query_stride` frames.

    Given a set of frames and tracks with no query points, sample queries
    strided every query_stride frames, ignoring points that are not visible
    at the selected frames. A track contributes one query (and one copy of
    its full trajectory) for every sampled frame in which it is visible.
    Coordinates are passed through unchanged: no rescaling is performed here.

    Args:
      target_occluded: Boolean occlusion flag, of shape [n_tracks, n_frames],
        where True indicates occluded.
      target_points: Position, of shape [n_tracks, n_frames, 2], where each
        point is [x, y].
      frames: Video array, of shape [n_frames, height, width, 3].
      query_stride: When sampling query points, search for un-occluded points
        every query_stride frames and convert each one into a query. Must be
        at least 1.

    Returns:
      A dict with the keys:
        video: `frames` with a leading batch axis, shape
          [1, n_frames, height, width, 3].
        query_points: Query points of shape [1, n_queries, 3] where each
          point is [t, y, x].
        target_points: Target points of shape [1, n_queries, n_frames, 2]
          where each point is [x, y].
        occluded: Occlusion flags of shape [1, n_queries, n_frames].
        trackgroup: Index of the original track that each query point was
          sampled from, shape [1, n_queries]. This is useful for
          visualization.

    Raises:
      ValueError: If query_stride is smaller than 1.
    """
    if query_stride < 1:
        raise ValueError("query_stride must be >= 1, got %r" % (query_stride,))

    n_tracks = target_occluded.shape[0]
    n_frames = target_occluded.shape[1]
    track_ids = np.arange(n_tracks)

    queries = []
    tracks = []
    occs = []
    trackgroups = []
    for t in range(0, n_frames, query_stride):
        visible = target_occluded[:, t] == 0
        # One candidate query [t, y, x] per track at frame t; the time column
        # is float64 so the stacked query array is floating point.
        query = np.stack(
            [
                np.full((n_tracks,), float(t)),
                target_points[:, t, 1],
                target_points[:, t, 0],
            ],
            axis=-1,
        )
        # Keep only the tracks that are visible at this sampled frame.
        queries.append(query[visible])
        tracks.append(target_points[visible])
        occs.append(target_occluded[visible])
        trackgroups.append(track_ids[visible])

    return {
        "video": frames[np.newaxis, ...],
        "query_points": np.concatenate(queries, axis=0)[np.newaxis, ...],
        "target_points": np.concatenate(tracks, axis=0)[np.newaxis, ...],
        "occluded": np.concatenate(occs, axis=0)[np.newaxis, ...],
        "trackgroup": np.concatenate(trackgroups, axis=0)[np.newaxis, ...],
    }
class TapVidDataset(torch.utils.data.Dataset):
    """TAP-Vid point-tracking dataset loaded from pickled annotations.

    Each item is a `CoTrackerData` holding the video frames, the ground-truth
    trajectories in pixel coordinates, per-frame visibility and the query
    points derived from the annotations.

    Args:
      data_root: For ``dataset_type == "kinetics"``, a directory containing
        ``*_of_0010.pkl`` shards; otherwise the path of a single pickle file.
      dataset_type: ``"davis"`` indexes the loaded mapping by video name;
        ``"kinetics"`` concatenates all shards into one list; any other value
        loads a single pickle that is indexed directly by the integer index.
      resize_to_256: If True, frames are resized to 256x256 and points are
        scaled to that resolution; otherwise points are scaled to the native
        frame resolution.
      queried_first: If True, each track is queried at its first visible
        frame; otherwise queries are sampled with a fixed frame stride.
    """

    def __init__(
        self,
        data_root,
        dataset_type="davis",
        resize_to_256=True,
        queried_first=True,
    ):
        self.dataset_type = dataset_type
        self.resize_to_256 = resize_to_256
        self.queried_first = queried_first
        # NOTE(security): pickle.load can execute arbitrary code. Only point
        # data_root at files obtained from a trusted source.
        if self.dataset_type == "kinetics":
            # glob returns paths in arbitrary order; sort them so that a given
            # index always refers to the same video across runs and machines.
            shard_paths = sorted(glob.glob(os.path.join(data_root, "*_of_0010.pkl")))
            points_dataset = []
            for pickle_path in shard_paths:
                with open(pickle_path, "rb") as f:
                    # extend() avoids re-copying the accumulated list per shard.
                    points_dataset.extend(pickle.load(f))
            self.points_dataset = points_dataset
        else:
            with open(data_root, "rb") as f:
                self.points_dataset = pickle.load(f)
            if self.dataset_type == "davis":
                self.video_names = list(self.points_dataset.keys())
        print("found %d unique videos in %s" % (len(self.points_dataset), data_root))

    def __getitem__(self, index):
        """Return the `CoTrackerData` sample for the video at `index`."""
        if self.dataset_type == "davis":
            video_name = self.video_names[index]
        else:
            video_name = index
        video = self.points_dataset[video_name]
        frames = video["video"]

        if isinstance(frames[0], bytes):
            # TAP-Vid is stored as JPEG bytes rather than `np.ndarray`s.
            def decode(frame):
                return np.array(Image.open(io.BytesIO(frame)))

            frames = np.array([decode(frame) for frame in frames])

        target_points = video["points"]
        if self.resize_to_256:
            frames = resize_video(frames, [256, 256])
            height, width = 256, 256
        else:
            height, width = frames.shape[1], frames.shape[2]
        # Points are [x, y]; a coordinate of 1 maps to the last pixel
        # (size - 1). Multiply out of place: an in-place `*=` would modify
        # the array cached in self.points_dataset, so fetching the same item
        # again would rescale the annotations a second time.
        scale = np.array([width - 1, height - 1], dtype=target_points.dtype)
        target_points = target_points * scale

        target_occ = video["occluded"]
        if self.queried_first:
            converted = sample_queries_first(target_occ, target_points, frames)
        else:
            converted = sample_queries_strided(target_occ, target_points, frames)
        assert converted["target_points"].shape[1] == converted["query_points"].shape[1]

        trajs = torch.from_numpy(converted["target_points"])[0].permute(1, 0, 2).float()  # T, N, D
        rgbs = torch.from_numpy(frames).permute(0, 3, 1, 2).float()  # T, C, H, W
        visibles = torch.logical_not(torch.from_numpy(converted["occluded"]))[0].permute(
            1, 0
        )  # T, N
        query_points = torch.from_numpy(converted["query_points"])[0]  # N, 3 as [t, y, x]
        return CoTrackerData(
            rgbs,
            trajs,
            visibles,
            seq_name=str(video_name),
            query_points=query_points,
        )

    def __len__(self):
        """Return the number of videos in the loaded dataset."""
        return len(self.points_dataset)