import os
import cv2
import time
import glob
import argparse
import scipy
import numpy as np
from PIL import Image
from tqdm import tqdm
from itertools import cycle
from torch.multiprocessing import Pool, Process, set_start_method
"""
brief: face alignment with FFHQ method (https://github.com/NVlabs/ffhq-dataset)
author: lzhbrian (https://lzhbrian.me)
date: 2020.1.5
note: code is heavily borrowed from the FFHQ dataset repository linked above
requirements:
    apt install cmake
    conda install Pillow numpy scipy
    pip install dlib
    # download face landmark model from:
    # http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
"""
import numpy as np
from PIL import Image
import dlib
class Croper:
    """Find a face with dlib and derive an FFHQ-style crop for a frame list.

    The landmark predictor is loaded once in ``__init__``. ``crop`` uses the
    first frame of a sequence to compute the crop geometry and then applies
    that same geometry to every frame.
    """

    def __init__(self, path_of_lm):
        """Load the dlib shape predictor.

        :param path_of_lm: path of the landmark model file handed to
            ``dlib.shape_predictor``.
        """
        self.predictor = dlib.shape_predictor(path_of_lm)

    def get_landmark(self, img_np):
        """Get the landmarks of the first detected face with dlib.

        :param img_np: image array passed to the dlib detector and predictor.
        :return: ``np.array`` with one ``[x, y]`` row per landmark part
            (shape ``(68, 2)`` for the 68-point model), or ``None`` when the
            detector finds no face.
        """
        detector = dlib.get_frontal_face_detector()
        # The second argument asks dlib to upsample the image once before
        # detecting, which helps with small faces.
        dets = detector(img_np, 1)
        if len(dets) == 0:
            return None
        # Only the first detection is used, even if several faces are found.
        shape = self.predictor(img_np, dets[0])
        return np.array([[part.x, part.y] for part in shape.parts()])

    def align_face(self, img, lm, output_size=1024):
        """Compute the FFHQ-style crop geometry for one face.

        No pixels are cropped here; only the numbers needed to crop are
        returned.

        :param img: PIL image; only ``img.size`` and ``img.resize`` are used.
        :param lm: landmark array indexed with the 68-point layout
            (rows 36-41 left eye, 42-47 right eye, 48-59 outer mouth).
        :param output_size: target side length; the image is only scaled
            down when the face quad is more than four times this size.
        :return: ``(rsize, crop, [lx, ly, rx, ry])`` where ``rsize`` is the
            ``(width, height)`` frames must be resized to, ``crop`` is the
            ``(left, top, right, bottom)`` box in that resized image, and the
            last item is the face box measured inside the ``crop`` box.
        """
        # Only the eyes and the outer mouth contour drive the alignment.
        lm_eye_left = lm[36:42]
        lm_eye_right = lm[42:48]
        lm_mouth_outer = lm[48:60]

        # Auxiliary vectors.
        eye_left = np.mean(lm_eye_left, axis=0)
        eye_right = np.mean(lm_eye_right, axis=0)
        eye_avg = (eye_left + eye_right) * 0.5
        eye_to_eye = eye_right - eye_left
        mouth_left = lm_mouth_outer[0]
        mouth_right = lm_mouth_outer[6]
        mouth_avg = (mouth_left + mouth_right) * 0.5
        eye_to_mouth = mouth_avg - eye_avg

        # Oriented crop rectangle: x is the half-width vector, y is x rotated
        # by 90 degrees, c is the centre of the quad.
        x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1]
        x /= np.hypot(*x)
        x *= max(np.hypot(*eye_to_eye) * 2.0, np.hypot(*eye_to_mouth) * 1.8)
        y = np.flipud(x) * [-1, 1]
        c = eye_avg + eye_to_mouth * 0.1
        quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y])
        qsize = np.hypot(*x) * 2

        # Shrink very large inputs so the face quad is close to output_size.
        shrink = int(np.floor(qsize / output_size * 0.5))
        if shrink > 1:
            rsize = (int(np.rint(float(img.size[0]) / shrink)),
                     int(np.rint(float(img.size[1]) / shrink)))
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
            # filter under its current name.
            img = img.resize(rsize, Image.LANCZOS)
            quad /= shrink
            qsize /= shrink
        else:
            rsize = (int(img.size[0]), int(img.size[1]))

        # Axis-aligned bounding box of the quad plus a border, clamped to
        # the image.
        border = max(int(np.rint(qsize * 0.1)), 3)
        crop = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
                int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        crop = (max(crop[0] - border, 0), max(crop[1] - border, 0),
                min(crop[2] + border, img.size[0]), min(crop[3] + border, img.size[1]))
        if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]:
            # Express the quad relative to the crop box origin.
            quad -= crop[0:2]

        # Reduce the quad to an axis-aligned box. x is clamped with the image
        # width and y with the image height.
        quad = (quad + 0.5).flatten()
        lx = max(min(quad[0], quad[2]), 0)
        ly = max(min(quad[1], quad[7]), 0)
        rx = min(max(quad[4], quad[6]), img.size[0])
        ry = min(max(quad[3], quad[5]), img.size[1])
        return rsize, crop, [lx, ly, rx, ry]

    def crop(self, img_np_list, still=False, xsize=512):
        """Crop every frame using the face found in the first frame.

        :param img_np_list: list of image arrays; it is updated in place and
            also returned.
        :param still: when true only the outer ``crop`` box is applied and
            the tighter face box is skipped.
        :param xsize: forwarded to ``align_face`` as ``output_size``.
        :return: ``(img_np_list, crop, quad)`` with ``crop`` and ``quad`` as
            produced by ``align_face``.
        :raises ValueError: when no face is detected in the first frame.
        """
        img_np = img_np_list[0]
        lm = self.get_landmark(img_np)
        if lm is None:
            raise ValueError('can not detect the landmark from source image')
        rsize, crop, quad = self.align_face(img=Image.fromarray(img_np), lm=lm, output_size=xsize)
        clx, cly, crx, cry = crop
        lx, ly, rx, ry = (int(v) for v in quad)
        for idx, frame in enumerate(img_np_list):
            frame = cv2.resize(frame, (rsize[0], rsize[1]))
            frame = frame[cly:cry, clx:crx]
            if not still:
                frame = frame[ly:ry, lx:rx]
            img_np_list[idx] = frame
        return img_np_list, crop, quad
def read_video(filename, uplimit=100):
    """Read the leading frames of a video, each resized to 512x512.

    :param filename: path handed to ``cv2.VideoCapture``.
    :param uplimit: reading stops once this many frames were collected.
    :return: list of frames as returned by ``cv2.resize``.
    :raises AssertionError: when not a single frame could be read.
    """
    frames = []
    cap = cv2.VideoCapture(filename)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream or decode failure: stop instead of spinning.
                break
            frames.append(cv2.resize(frame, (512, 512)))
            if len(frames) >= uplimit:
                break
    finally:
        # Always give the capture handle back, even if resizing raised.
        cap.release()
    if not frames:
        # Raised explicitly so the check survives ``python -O``.
        raise AssertionError(f'{filename}: video with no frames!')
    return frames
def create_video(video_name, frames, fps=25, video_format='.mp4', resize_ratio=1):
    """Write ``frames`` to a 512x512 video file.

    :param video_name: output path given to ``cv2.VideoWriter``.
    :param frames: iterable of image arrays; each one is resized to 512x512.
    :param fps: frame rate of the written video.
    :param video_format: ``'.mp4'`` (``mp4v`` codec) or ``'.avi'`` (``XVID``).
    :param resize_ratio: accepted for compatibility; currently unused.
    :raises ValueError: when ``video_format`` is not one of the two above.
    """
    height, width = 512, 512
    codecs = {'.mp4': 'mp4v', '.avi': 'XVID'}
    if video_format not in codecs:
        raise ValueError(f'unsupported video_format: {video_format!r}')
    fourcc = cv2.VideoWriter_fourcc(*codecs[video_format])
    video = cv2.VideoWriter(video_name, fourcc, fps, (width, height))
    try:
        for _frame in frames:
            # cv2.resize takes the target size as (width, height).
            _frame = cv2.resize(_frame, (width, height), interpolation=cv2.INTER_LINEAR)
            video.write(_frame)
    finally:
        # Releasing finalizes the container so the file is playable.
        video.release()
def create_images(video_name, frames):
    """Save ``frames`` as numbered 512x512 JPEGs in a per-video directory.

    The directory is ``video_name`` without its file extension; frame ``i``
    is written to ``<directory>/<i>.jpg``.

    :param video_name: path whose extension-less form names the directory.
    :param frames: iterable of image arrays.
    """
    height, width = 512, 512
    # splitext only strips the final extension, so dots elsewhere in the
    # path (e.g. './out/clip.mp4') do not truncate the directory name.
    images_dir = os.path.splitext(video_name)[0]
    os.makedirs(images_dir, exist_ok=True)
    for i, _frame in enumerate(frames):
        # cv2.resize takes the target size as (width, height).
        _frame = cv2.resize(_frame, (width, height), interpolation=cv2.INTER_LINEAR)
        _frame_path = os.path.join(images_dir, str(i) + '.jpg')
        cv2.imwrite(_frame_path, _frame)
def run(data):
    """Crop the face out of one video and dump the frames as images.

    :param data: ``(filename, opt, device)`` tuple; ``opt`` must provide
        ``uplimit`` and ``output_dir`` and ``device`` is the string written
        to ``CUDA_VISIBLE_DEVICES``.
    """
    filename, opt, device = data
    os.environ['CUDA_VISIBLE_DEVICES'] = device
    # Croper requires the dlib landmark model path. Use ``opt.landmark_path``
    # when the options carry one; the fallback file name is an assumption
    # based on the model named in the file header - confirm for deployment.
    landmark_path = getattr(opt, 'landmark_path', 'shape_predictor_68_face_landmarks.dat')
    croper = Croper(landmark_path)

    frames = read_video(filename, uplimit=opt.uplimit)
    name = os.path.join(opt.output_dir, os.path.basename(filename))

    # crop() raises when the first frame has no face, which would abort the
    # whole pool; probe first so such videos are reported and skipped.
    if croper.get_landmark(frames[0]) is None:
        print(f'{name}: detect no face. should removed')
        return
    # crop() returns (frames, crop_box, quad); only the frames are saved.
    frames, _crop_box, _quad = croper.crop(frames)
    create_images(name, frames)
def get_data_path(video_dir):
    """Return a fixed one-element list with an example video path.

    :param video_dir: ignored; the returned path is hard-coded.
    :return: list containing the single example ``.mp4`` path.
    """
    example_video = '/apdcephfs/share_1290939/quincheng/datasets/HDTF/backup_fps25/WDA_KatieHill_000.mp4'
    return [example_video]
def get_wra_data_path(video_dir, option=None):
    """List the inputs found directly under ``video_dir``.

    :param video_dir: directory to scan.
    :param option: ``'video'`` lists ``*.mp4`` files, ``'image'`` lists
        sub-directories. When omitted, the module-level ``opt.option`` set by
        the command-line entry point is used.
    :return: sorted list of matching paths.
    :raises NotImplementedError: for any other ``option`` value.
    """
    if option is None:
        # Fall back to the parsed command-line options.
        option = opt.option
    if option == 'video':
        videos_path = sorted(glob.glob(f'{video_dir}/*.mp4'))
    elif option == 'image':
        videos_path = sorted(glob.glob(f'{video_dir}/*/'))
    else:
        raise NotImplementedError
    print('Example videos: ', videos_path[:2])
    return videos_path
if __name__ == '__main__':
    # Batch entry point: crop every input found in --input_dir using a pool
    # of worker processes and write the results under --output_dir.
    import sys

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input_dir', type=str, help='the folder of the input files')
    parser.add_argument('--output_dir', type=str, help='the folder of the output files')
    parser.add_argument('--device_ids', type=str, default='0,1')
    parser.add_argument('--workers', type=int, default=8)
    parser.add_argument('--uplimit', type=int, default=500)
    parser.add_argument('--option', type=str, default='video')

    # Built-in example invocation, used only when the script is started
    # without any command-line arguments.
    root = '/apdcephfs/share_1290939/quincheng/datasets/HDTF'
    cmd = f'--input_dir {root}/backup_fps25_first20s_sync/ ' \
          f'--output_dir {root}/crop512_stylegan_firstframe_sync/ ' \
          '--device_ids 0 ' \
          '--workers 8 ' \
          '--option video ' \
          '--uplimit 500 '
    opt = parser.parse_args(sys.argv[1:] or cmd.split())

    filenames = get_wra_data_path(opt.input_dir)
    os.makedirs(opt.output_dir, exist_ok=True)
    print(f'Video numbers: {len(filenames)}')

    # Every job gets the same options; device ids are handed out round-robin.
    args_list = cycle([opt])
    device_ids = cycle(opt.device_ids.split(","))
    jobs = zip(filenames, args_list, device_ids)
    with Pool(opt.workers) as pool:
        # imap_unordered is lazy: iterating it is what drives the work.
        for _ in tqdm(pool.imap_unordered(run, jobs), total=len(filenames)):
            pass