|
import os |
|
from skimage import io, img_as_float32 |
|
from skimage.color import gray2rgb |
|
from sklearn.model_selection import train_test_split |
|
from imageio import mimread |
|
from skimage.transform import resize |
|
import numpy as np |
|
from torch.utils.data import Dataset |
|
from augmentation import AllAugmentationTransform |
|
import glob |
|
from functools import partial |
|
|
|
|
|
def read_video(name, frame_shape): |
|
""" |
|
Read video which can be: |
|
- an image of concatenated frames |
|
- '.mp4' and'.gif' |
|
- folder with videos |
|
""" |
|
|
|
if os.path.isdir(name): |
|
frames = sorted(os.listdir(name)) |
|
num_frames = len(frames) |
|
video_array = np.array( |
|
[img_as_float32(io.imread(os.path.join(name, frames[idx]))) for idx in range(num_frames)]) |
|
elif name.lower().endswith('.png') or name.lower().endswith('.jpg'): |
|
image = io.imread(name) |
|
|
|
if len(image.shape) == 2 or image.shape[2] == 1: |
|
image = gray2rgb(image) |
|
|
|
if image.shape[2] == 4: |
|
image = image[..., :3] |
|
|
|
image = img_as_float32(image) |
|
|
|
video_array = np.moveaxis(image, 1, 0) |
|
|
|
video_array = video_array.reshape((-1,) + frame_shape) |
|
video_array = np.moveaxis(video_array, 1, 2) |
|
elif name.lower().endswith('.gif') or name.lower().endswith('.mp4') or name.lower().endswith('.mov'): |
|
video = mimread(name) |
|
if len(video[0].shape) == 2: |
|
video = [gray2rgb(frame) for frame in video] |
|
if frame_shape is not None: |
|
video = np.array([resize(frame, frame_shape) for frame in video]) |
|
video = np.array(video) |
|
if video.shape[-1] == 4: |
|
video = video[..., :3] |
|
video_array = img_as_float32(video) |
|
else: |
|
raise Exception("Unknown file extensions %s" % name) |
|
|
|
return video_array |
|
|
|
|
|
class FramesDataset(Dataset): |
|
""" |
|
Dataset of videos, each video can be represented as: |
|
- an image of concatenated frames |
|
- '.mp4' or '.gif' |
|
- folder with all frames |
|
""" |
|
|
|
def __init__(self, root_dir, frame_shape=(256, 256, 3), id_sampling=False, is_train=True, |
|
random_seed=0, pairs_list=None, augmentation_params=None): |
|
self.root_dir = root_dir |
|
self.videos = os.listdir(root_dir) |
|
self.frame_shape = frame_shape |
|
print(self.frame_shape) |
|
self.pairs_list = pairs_list |
|
self.id_sampling = id_sampling |
|
|
|
if os.path.exists(os.path.join(root_dir, 'train')): |
|
assert os.path.exists(os.path.join(root_dir, 'test')) |
|
print("Use predefined train-test split.") |
|
if id_sampling: |
|
train_videos = {os.path.basename(video).split('#')[0] for video in |
|
os.listdir(os.path.join(root_dir, 'train'))} |
|
train_videos = list(train_videos) |
|
else: |
|
train_videos = os.listdir(os.path.join(root_dir, 'train')) |
|
test_videos = os.listdir(os.path.join(root_dir, 'test')) |
|
self.root_dir = os.path.join(self.root_dir, 'train' if is_train else 'test') |
|
else: |
|
print("Use random train-test split.") |
|
train_videos, test_videos = train_test_split(self.videos, random_state=random_seed, test_size=0.2) |
|
|
|
if is_train: |
|
self.videos = train_videos |
|
else: |
|
self.videos = test_videos |
|
|
|
self.is_train = is_train |
|
|
|
if self.is_train: |
|
self.transform = AllAugmentationTransform(**augmentation_params) |
|
else: |
|
self.transform = None |
|
|
|
def __len__(self): |
|
return len(self.videos) |
|
|
|
def __getitem__(self, idx): |
|
|
|
if self.is_train and self.id_sampling: |
|
name = self.videos[idx] |
|
path = np.random.choice(glob.glob(os.path.join(self.root_dir, name + '*.mp4'))) |
|
else: |
|
name = self.videos[idx] |
|
path = os.path.join(self.root_dir, name) |
|
|
|
video_name = os.path.basename(path) |
|
if self.is_train and os.path.isdir(path): |
|
|
|
frames = os.listdir(path) |
|
num_frames = len(frames) |
|
frame_idx = np.sort(np.random.choice(num_frames, replace=True, size=2)) |
|
|
|
if self.frame_shape is not None: |
|
resize_fn = partial(resize, output_shape=self.frame_shape) |
|
else: |
|
resize_fn = img_as_float32 |
|
|
|
if type(frames[0]) is bytes: |
|
video_array = [resize_fn(io.imread(os.path.join(path, frames[idx].decode('utf-8')))) for idx in |
|
frame_idx] |
|
else: |
|
video_array = [resize_fn(io.imread(os.path.join(path, frames[idx]))) for idx in frame_idx] |
|
else: |
|
|
|
video_array = read_video(path, frame_shape=self.frame_shape) |
|
|
|
num_frames = len(video_array) |
|
frame_idx = np.sort(np.random.choice(num_frames, replace=True, size=2)) if self.is_train else range( |
|
num_frames) |
|
video_array = video_array[frame_idx] |
|
|
|
|
|
if self.transform is not None: |
|
video_array = self.transform(video_array) |
|
|
|
out = {} |
|
if self.is_train: |
|
source = np.array(video_array[0], dtype='float32') |
|
driving = np.array(video_array[1], dtype='float32') |
|
|
|
out['driving'] = driving.transpose((2, 0, 1)) |
|
out['source'] = source.transpose((2, 0, 1)) |
|
else: |
|
video = np.array(video_array, dtype='float32') |
|
out['video'] = video.transpose((3, 0, 1, 2)) |
|
|
|
out['name'] = video_name |
|
return out |
|
|
|
|
|
class DatasetRepeater(Dataset): |
|
""" |
|
Pass several times over the same dataset for better i/o performance |
|
""" |
|
|
|
def __init__(self, dataset, num_repeats=100): |
|
self.dataset = dataset |
|
self.num_repeats = num_repeats |
|
|
|
def __len__(self): |
|
return self.num_repeats * self.dataset.__len__() |
|
|
|
def __getitem__(self, idx): |
|
return self.dataset[idx % self.dataset.__len__()] |
|
|
|
|