# Spaces: Runtime error (hosting-page status banner captured with the file; not part of the program)
""" | |
Extract faces | |
Video Face Manipulation Detection Through Ensemble of CNNs | |
Image and Sound Processing Lab - Politecnico di Milano | |
Nicolò Bonettini | |
Edoardo Daniele Cannas | |
Sara Mandelli | |
Luca Bondi | |
Paolo Bestagini | |
""" | |
import argparse
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import torch
import torch.cuda
from PIL import Image
from tqdm import tqdm

import blazeface
from blazeface import BlazeFace, VideoReader, FaceExtractor
from isplutils.utils import adapt_bb
def _positive_int(value: str) -> int:
    """argparse ``type`` callback that accepts only integers >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError('invalid int value: {!r}'.format(value)) from None
    if number < 1:
        raise argparse.ArgumentTypeError('must be a positive integer, got {}'.format(number))
    return number


def parse_args(argv):
    """Parse the command-line options of the face extraction script.

    :param argv: list of argument strings (without the program name).
    :return: the ``argparse.Namespace`` with attributes ``source``, ``videodf``,
        ``facesfolder``, ``facesdf``, ``checkpoint``, ``fpv``, ``device``,
        ``collateonly``, ``noindex``, ``batch``, ``threads``, ``offset``,
        ``num``, ``lazycheck`` and ``deepcheck``.
    :raises SystemExit: raised by argparse when a required option is missing or
        a value is rejected (e.g. a non-positive ``--batch`` or ``--threads``).
    """
    parser = argparse.ArgumentParser()

    # Required locations
    parser.add_argument('--source', type=Path, help='Videos root directory', required=True)
    parser.add_argument('--videodf', type=Path, help='Path to read the videos DataFrame', required=True)
    parser.add_argument('--facesfolder', type=Path, help='Faces output root directory', required=True)
    parser.add_argument('--facesdf', type=Path, help='Path to save the output DataFrame of faces', required=True)
    parser.add_argument('--checkpoint', type=Path, help='Path to save the temporary per-video outputs', required=True)

    # Extraction settings
    parser.add_argument('--fpv', type=int, default=32, help='Frames per video')
    parser.add_argument('--device', type=torch.device,
                        default=torch.device('cuda:0' if torch.cuda.is_available() else 'cpu'),
                        help='Device to use for face extraction')
    parser.add_argument('--collateonly', help='Only perform collation of pre-existing results', action='store_true')
    # store_false: ``args.noindex`` is True by default and becomes False when
    # the flag is given, i.e. it reads as "index enabled".
    parser.add_argument('--noindex', help='Do not rebuild the index', action='store_false')
    # A zero/negative batch size or thread count only fails (or silently does
    # nothing) once extraction starts, so reject it here.
    parser.add_argument('--batch', type=_positive_int, help='Batch size', default=16)
    parser.add_argument('--threads', type=_positive_int, help='Number of threads', default=8)
    parser.add_argument('--offset', type=int, help='Offset to start extraction', default=0)
    parser.add_argument('--num', type=int, help='Number of videos to process', default=0)
    parser.add_argument('--lazycheck', action='store_true', help='Lazy check of existing video indexes')
    parser.add_argument('--deepcheck', action='store_true', help='Try to open every image')

    return parser.parse_args(argv)
def _resolve_faces_df_path(facesdataset_path: Path, offset: int, num: int) -> Path:
    """Return the output path of the faces DataFrame for the processed video range."""
    if offset > 0:
        suffix = '_from_video_{}'.format(offset)
        if num > 0:
            suffix += '_to_video_{}'.format(num + offset)
    elif num > 0:
        suffix = '_from_video_{}_to_video_{}'.format(0, num)
    else:
        suffix = ''

    if facesdataset_path.is_dir():
        # A directory was given: store a default-named file inside it
        return facesdataset_path.joinpath('faces_df{}.pkl'.format(suffix))
    if not suffix:
        # Whole dataset and an explicit file path: use it untouched
        return facesdataset_path
    # Everything after the first dot of the file name is replaced
    stem = str(facesdataset_path.name).split('.')[0]
    return facesdataset_path.parent.joinpath(stem + suffix + '.pkl')


def main(argv):
    """Extract faces from the videos of a DataFrame and collate the results.

    Steps, as driven by the command-line options parsed from ``argv``:

    1. load the videos DataFrame and select the ``offset``/``num`` slice;
    2. unless ``--collateonly`` is given, run ``process_video`` on the slice
       in batches through a thread pool, save the cropped faces as JPEG and
       write one checkpoint pickle per video;
    3. unless ``--noindex`` is given, read every per-video checkpoint, store
       the number of distinct subjects in the ``nfaces`` column of the videos
       DataFrame (which is written back to ``--videodf``) and save the
       concatenated faces DataFrame.

    :param argv: list of command-line argument strings.
    :raises ValueError: if indexing is enabled and no checkpoint could be read.
    """
    args = parse_args(argv)

    ## Parameters parsing
    device: torch.device = args.device
    source_dir: Path = args.source
    facedestination_dir: Path = args.facesfolder
    frames_per_video: int = args.fpv
    videodataset_path: Path = args.videodf
    facesdataset_path: Path = args.facesdf
    collateonly: bool = args.collateonly
    batch_size: int = args.batch
    threads: int = args.threads
    offset: int = args.offset
    num: int = args.num
    lazycheck: bool = args.lazycheck
    deepcheck: bool = args.deepcheck
    checkpoint_folder: Path = args.checkpoint
    # ``--noindex`` is a store_false flag: the attribute is True unless the flag is passed
    index_enable: bool = args.noindex

    ## Parameters
    face_size = 512

    print('Loading video DataFrame')
    df_videos = pd.read_pickle(videodataset_path)

    if num > 0:
        df_videos_process = df_videos.iloc[offset:offset + num]
    else:
        df_videos_process = df_videos.iloc[offset:]

    if not collateonly:
        ## Blazeface loading
        print('Loading face extractor')
        facedet = BlazeFace().to(device)
        facedet.load_weights("blazeface/blazeface.pth")
        facedet.load_anchors("blazeface/anchors.npy")
        videoreader = VideoReader(verbose=False)

        def video_read_fn(x):
            return videoreader.read_frames(x, num_frames=frames_per_video)

        face_extractor = FaceExtractor(video_read_fn, facedet)

        worker = partial(process_video,
                         source_dir=source_dir,
                         facedestination_dir=facedestination_dir,
                         checkpoint_folder=checkpoint_folder,
                         face_size=face_size,
                         face_extractor=face_extractor,
                         lazycheck=lazycheck,
                         deepcheck=deepcheck,
                         )

        ## Face extraction
        with ThreadPoolExecutor(threads) as p:
            for batch_start in tqdm(range(0, len(df_videos_process), batch_size),
                                    desc='Extracting faces'):
                batch = df_videos_process.iloc[batch_start:batch_start + batch_size]
                tosave_list = list(p.map(worker, batch.iterrows()))

                for tosave in tosave_list:
                    if tosave is None:
                        continue
                    df_video_faces, checkpoint_path, images_to_save = tosave
                    if len(images_to_save):
                        list(p.map(save_jpg, images_to_save))
                    # The checkpoint is written even when it is empty:
                    # process_video skips videos whose checkpoint exists.
                    checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
                    df_video_faces.to_pickle(str(checkpoint_path))

    if index_enable:
        # Collect checkpoints
        df_videos['nfaces'] = np.zeros(len(df_videos), np.uint8)
        faces_dataset = []
        for idx, record in tqdm(df_videos.iterrows(), total=len(df_videos), desc='Collecting faces results'):
            # Checkpoint
            video_face_checkpoint_path = checkpoint_folder.joinpath(record['path']).with_suffix('.faces.pkl')
            if not video_face_checkpoint_path.exists():
                continue
            try:
                df_video_faces = pd.read_pickle(str(video_face_checkpoint_path))
                # Fix same attribute issue
                df_video_faces = df_video_faces.rename(columns={'subject': 'videosubject'}, errors='ignore')
                # Subject ids are parsed back from the '..._subj<N>.jpg' index entries
                nfaces = len(
                    np.unique(df_video_faces.index.map(lambda x: int(x.split('_subj')[1].split('.jpg')[0]))))
                df_videos.loc[idx, 'nfaces'] = nfaces
                faces_dataset.append(df_video_faces)
            except Exception as e:
                # Unreadable checkpoint: report it and delete it
                print('Error while reading: {}'.format(video_face_checkpoint_path))
                print(e)
                video_face_checkpoint_path.unlink()

        if len(faces_dataset) == 0:
            raise ValueError(f'No checkpoint found from face extraction. '
                             f'Is the the source path {source_dir} correct for the videos in your dataframe?')

        # Save videos with updated faces
        print('Saving videos DataFrame to {}'.format(videodataset_path))
        df_videos.to_pickle(str(videodataset_path))

        facesdataset_path = _resolve_faces_df_path(facesdataset_path, offset, num)

        # Creates directory (if doesn't exist)
        facesdataset_path.parent.mkdir(parents=True, exist_ok=True)
        print('Saving faces DataFrame to {}'.format(facesdataset_path))
        df_faces = pd.concat(faces_dataset, axis=0, )
        df_faces['video'] = df_faces['video'].astype('category')
        for key in ['kp1x', 'kp1y', 'kp2x', 'kp2y', 'kp3x',
                    'kp3y', 'kp4x', 'kp4y', 'kp5x', 'kp5y', 'kp6x', 'kp6y', 'left',
                    'top', 'right', 'bottom', ]:
            df_faces[key] = df_faces[key].astype(np.int16)
        df_faces['videosubject'] = df_faces['videosubject'].astype(np.int8)

        # Eventually remove duplicates
        df_faces = df_faces.loc[~df_faces.index.duplicated(keep='first')]
        fields_to_preserve_from_video = [i for i in
                                         ['folder', 'subject', 'scene', 'cluster', 'nfaces', 'test'] if
                                         i in df_videos]
        df_faces = pd.merge(df_faces, df_videos[fields_to_preserve_from_video], left_on='video',
                            right_index=True)
        df_faces.to_pickle(str(facesdataset_path))

    print('Completed!')
def save_jpg(args: Tuple[Image.Image, Union[Path, str]]):
    """Save an image as JPEG with quality 95 and 4:4:4 subsampling.

    Takes a single tuple so it can be used directly with ``Executor.map``.

    :param args: ``(image, path)`` pair; ``path`` is the destination file.
    """
    image, path = args
    image.save(path, quality=95, subsampling='4:4:4')
def _verify_checkpoint(checkpoint_path: Path, facedestination_dir: Path, deepcheck: bool) -> None:
    """Raise if a face listed in the per-video checkpoint is missing or (deep check) unreadable."""
    df_video_faces = pd.read_pickle(str(checkpoint_path))
    # The checkpoint index holds the face paths relative to the faces root
    for face_relpath in df_video_faces.index:
        face_path = facedestination_dir.joinpath(face_relpath)
        if not face_path.exists():
            raise FileNotFoundError('Missing face image: {}'.format(face_path))
        if deepcheck:
            # Decode inside the context manager so the file handle is released
            with Image.open(face_path) as img:
                img_arr = np.asarray(img)
            if img_arr.ndim != 3:
                raise ValueError('Unexpected number of dimensions in {}: {}'.format(face_path, img_arr.ndim))
            if np.prod(img_arr.shape) <= 0:
                raise ValueError('Empty image: {}'.format(face_path))


def process_video(item: Tuple[pd.Index, pd.Series],
                  source_dir: Path,
                  facedestination_dir: Path,
                  checkpoint_folder: Path,
                  face_size: int,
                  face_extractor: FaceExtractor,
                  lazycheck: bool = False,
                  deepcheck: bool = False,
                  ) -> Optional[Tuple[pd.DataFrame, Path, List[Tuple[Image.Image, Path]]]]:
    """Extract the faces of one video, unless a valid checkpoint already exists.

    Nothing is written to disk here apart from creating output directories:
    the cropped images and the checkpoint DataFrame are returned to the caller.

    :param item: ``(index, record)`` pair as produced by ``DataFrame.iterrows()``.
        The record must provide ``path``, ``label`` and ``original``; ``class``,
        ``source`` and ``quality`` are copied to the faces when present.
    :param source_dir: root directory of the videos.
    :param facedestination_dir: root directory of the extracted faces.
    :param checkpoint_folder: root directory of the per-video checkpoints.
    :param face_size: height and width of the crop requested around each face.
    :param face_extractor: object providing ``process_video`` and ``keep_only_best_face``.
    :param lazycheck: if True an existing checkpoint is trusted without verification.
    :param deepcheck: if True the verification also decodes every face image.
    :return: ``(faces DataFrame, checkpoint path, [(image, path), ...])``, or
        ``None`` when the checkpoint is already in place, no frame was read,
        or an error occurred (the traceback is printed to stdout).
    """
    # Instantiate Index and Series
    idx, record = item

    # Checkpoint
    video_faces_checkpoint_path = checkpoint_folder.joinpath(record['path']).with_suffix('.faces.pkl')

    if not lazycheck and video_faces_checkpoint_path.exists():
        try:
            # Explicit exceptions rather than ``assert``: the check must
            # survive ``python -O``.
            _verify_checkpoint(video_faces_checkpoint_path, facedestination_dir, deepcheck)
        except Exception as e:
            print('Error while checking: {}'.format(video_faces_checkpoint_path))
            print(e)
            # Drop the stale checkpoint so the video is processed again below
            video_faces_checkpoint_path.unlink()

    if video_faces_checkpoint_path.exists():
        # Checkpoint already in place: nothing to do
        return None

    try:
        video_face_dict_list = []

        # Load faces
        current_video_path = source_dir.joinpath(record['path'])
        if not current_video_path.exists():
            raise FileNotFoundError(f'Unable to find {current_video_path}.'
                                    f'Are you sure that {source_dir} is the correct source directory for the video '
                                    f'you indexed in the dataframe?')

        frames = face_extractor.process_video(current_video_path)

        if len(frames) == 0:
            return None

        face_extractor.keep_only_best_face(frames)
        for frame in frames:
            # Every detection is assigned to subject 0
            frame['subjects'] = [0] * len(frame['detections'])

        # Extract and save faces, bounding boxes, keypoints
        images_to_save: List[Tuple[Image.Image, Path]] = []
        for frame in frames:
            if not len(frame['detections']):
                continue
            fullframe = Image.fromarray(frame['frame'])

            # Preserve the only found face even if not a good one, otherwise preserve only clusters > -1
            subjects = np.unique(frame['subjects'])
            if len(subjects) > 1:
                subjects = np.asarray([s for s in subjects if s > -1])

            for face_idx in range(len(frame['faces'])):
                subj_id = frame['subjects'][face_idx]
                if subj_id not in subjects:  # Exclude outliers if other faces detected
                    continue

                face_path = facedestination_dir.joinpath(record['path'], 'fr{:03d}_subj{:1d}.jpg'.format(
                    frame['frame_idx'], subj_id))

                face_dict = {'facepath': str(face_path.relative_to(facedestination_dir)), 'video': idx,
                             'label': record['label'], 'videosubject': subj_id,
                             'original': record['original']}

                # add attributes for ff++
                for optional_key in ('class', 'source', 'quality'):
                    if optional_key in record.keys():
                        face_dict[optional_key] = record[optional_key]

                for field_idx, key in enumerate(blazeface.BlazeFace.detection_keys):
                    face_dict[key] = frame['detections'][face_idx][field_idx]

                # NOTE(review): adapt_bb presumably returns a (left, top, right, bottom)
                # box; only its first two items are used as offsets below - confirm.
                cropping_bb = adapt_bb(frame_height=fullframe.height,
                                       frame_width=fullframe.width,
                                       bb_height=face_size,
                                       bb_width=face_size,
                                       left=face_dict['xmin'],
                                       top=face_dict['ymin'],
                                       right=face_dict['xmax'],
                                       bottom=face_dict['ymax'])
                face = fullframe.crop(cropping_bb)

                # Make x/y coordinates relative to the crop origin
                for key in blazeface.BlazeFace.detection_keys:
                    if (key[0] == 'k' and key[-1] == 'x') or (key[0] == 'x'):
                        face_dict[key] -= cropping_bb[0]
                    elif (key[0] == 'k' and key[-1] == 'y') or (key[0] == 'y'):
                        face_dict[key] -= cropping_bb[1]

                face_dict['left'] = face_dict.pop('xmin')
                face_dict['top'] = face_dict.pop('ymin')
                face_dict['right'] = face_dict.pop('xmax')
                face_dict['bottom'] = face_dict.pop('ymax')

                face_path.parent.mkdir(parents=True, exist_ok=True)
                images_to_save.append((face, face_path))

                video_face_dict_list.append(face_dict)

        if len(video_face_dict_list) > 0:
            df_video_faces = pd.DataFrame(video_face_dict_list)
            df_video_faces.index = df_video_faces['facepath']
            del df_video_faces['facepath']

            # type conversions
            for key in ['kp1x', 'kp1y', 'kp2x', 'kp2y', 'kp3x', 'kp3y',
                        'kp4x', 'kp4y', 'kp5x', 'kp5y', 'kp6x', 'kp6y', 'left', 'top',
                        'right', 'bottom']:
                df_video_faces[key] = df_video_faces[key].astype(np.int16)
            df_video_faces['conf'] = df_video_faces['conf'].astype(np.float32)
            df_video_faces['video'] = df_video_faces['video'].astype('category')

            video_faces_checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
        else:
            print('No faces extracted for video {}'.format(record['path']))
            df_video_faces = pd.DataFrame()

        return df_video_faces, video_faces_checkpoint_path, images_to_save

    except Exception:
        # Best effort: report the failure and let the caller skip this video
        print('Error while processing: {}'.format(record['path']))
        print("-" * 60)
        traceback.print_exc(file=sys.stdout, limit=5)
        print("-" * 60)
        return None
# Script entry point: forward the command-line arguments (without the program name)
if __name__ == '__main__':
    main(sys.argv[1:])