import os
from typing import List, Tuple, Union

import cv2
import numpy as np
import torch
from PIL import Image

from blazeface import BlazeFace


class FaceExtractor:
    """Wrapper for the face extraction workflow."""

    def __init__(self, video_read_fn=None, facedet: BlazeFace = None):
        """Creates a new FaceExtractor.

        Arguments:
            video_read_fn: a function that takes in a path to a video file
                and returns a tuple consisting of a NumPy array with shape
                (num_frames, H, W, 3) and a list of frame indices, or None
                in case of an error
            facedet: the face detector object
        """
        self.video_read_fn = video_read_fn
        self.facedet = facedet

    def process_image(self, path: str = None, img: Union[Image.Image, np.ndarray] = None) -> dict:
        """Processes a single image.

        :param path: path to the image
        :param img: image as a PIL Image or a NumPy array
        :return: dictionary with the frame size, face crops, keypoint crops,
            detections and confidence scores
        """
        if img is not None and path is not None:
            raise ValueError('Only one argument between path and img can be specified')
        if img is None and path is None:
            raise ValueError('At least one argument between path and img must be specified')

        target_size = self.facedet.input_size

        if img is None:
            img = np.asarray(Image.open(str(path)))
        else:
            img = np.asarray(img)

        # Split the frame into several tiles. Resize the tiles to 128x128.
        tiles, resize_info = self._tile_frames(np.expand_dims(img, 0), target_size)
        # tiles has shape (num_tiles, target_size, target_size, 3)
        # resize_info is a list of four elements [scale_w, scale_h, offset_x, offset_y]

        # Run the face detector. The result is a list of PyTorch tensors,
        # one for each tile in the batch.
        detections = self.facedet.predict_on_batch(tiles, apply_nms=False)

        # Convert the detections from 128x128 back to the original frame size.
        detections = self._resize_detections(detections, target_size, resize_info)

        # Because we have several tiles for each frame, combine the predictions
        # from these tiles. The result is a list of PyTorch tensors, but now one
        # for each frame (rather than each tile).
        num_frames = 1
        frame_size = (img.shape[1], img.shape[0])
        detections = self._untile_detections(num_frames, frame_size, detections)

        # The same face may have been detected in multiple tiles, so filter out
        # overlapping detections. This is done separately for each frame.
        detections = self.facedet.nms(detections)

        # Crop the faces out of the original frame.
        frameref_detections = self._add_margin_to_detections(detections[0], frame_size, 0.2)
        faces = self._crop_faces(img, frameref_detections)
        kpts = self._crop_kpts(img, detections[0], 0.3)

        # Add additional information about the frame and detections.
        scores = list(detections[0][:, 16].cpu().numpy())
        frame_dict = {"frame_w": frame_size[0],
                      "frame_h": frame_size[1],
                      "faces": faces,
                      "kpts": kpts,
                      "detections": frameref_detections.cpu().numpy(),
                      "scores": scores,
                      }

        # Sort faces by descending confidence
        frame_dict = self._soft_faces_by_descending_score(frame_dict)

        return frame_dict

    def _soft_faces_by_descending_score(self, frame_dict: dict) -> dict:
        if len(frame_dict['scores']) > 1:
            sort_idxs = np.argsort(frame_dict['scores'])[::-1]
            new_faces = [frame_dict['faces'][i] for i in sort_idxs]
            new_kpts = [frame_dict['kpts'][i] for i in sort_idxs]
            new_detections = frame_dict['detections'][sort_idxs]
            new_scores = [frame_dict['scores'][i] for i in sort_idxs]
            frame_dict['faces'] = new_faces
            frame_dict['kpts'] = new_kpts
            frame_dict['detections'] = new_detections
            frame_dict['scores'] = new_scores
        return frame_dict

    def process_videos(self, input_dir, filenames, video_idxs) -> List[dict]:
        """For the specified selection of videos, grabs one or more frames
        from each video, runs the face detector, and tries to find the faces
        in each frame.

        The frames are split into tiles, and the tiles from the different videos
        are concatenated into a single batch. This means the face detector gets
        a batch of size len(video_idxs) * num_frames * num_tiles (usually 3).

        Arguments:
            input_dir: base folder where the video files are stored
            filenames: list of all video files in the input_dir
            video_idxs: one or more indices from the filenames list; these
                are the videos we'll actually process

        Returns a list of dictionaries, one for each frame read from each video.

        Each dictionary contains:
            - video_idx: the video this frame was taken from
            - frame_idx: the index of the frame in the video
            - frame_w, frame_h: original dimensions of the frame
            - faces: a list containing zero or more NumPy arrays with a face crop
            - scores: a list with the confidence score for each face crop

        If reading a video failed for some reason, it will not appear in the
        output array. Note that there's no guarantee a given video will actually
        have num_frames results (as soon as a reading problem is encountered for
        a video, we continue with the next video).
        """
        target_size = self.facedet.input_size

        videos_read = []
        frames_read = []
        frames = []
        tiles = []
        resize_info = []

        for video_idx in video_idxs:
            # Read the full-size frames from this video.
            filename = filenames[video_idx]
            video_path = os.path.join(input_dir, filename)
            result = self.video_read_fn(video_path)

            # Error? Then skip this video.
            if result is None:
                continue

            videos_read.append(video_idx)

            # Keep track of the original frames (need them later).
            my_frames, my_idxs = result
            frames.append(my_frames)
            frames_read.append(my_idxs)

            # Split the frames into several tiles. Resize the tiles to 128x128.
            my_tiles, my_resize_info = self._tile_frames(my_frames, target_size)
            tiles.append(my_tiles)
            resize_info.append(my_resize_info)

        if len(tiles) == 0:
            return []

        # Put all the tiles for all the frames from all the videos into
        # a single batch.
        batch = np.concatenate(tiles)

        # Run the face detector. The result is a list of PyTorch tensors,
        # one for each image in the batch.
        all_detections = self.facedet.predict_on_batch(batch, apply_nms=False)

        result = []
        offs = 0
        for v in range(len(tiles)):
            # Not all videos may have the same number of tiles, so find which
            # detections go with which video.
            num_tiles = tiles[v].shape[0]
            detections = all_detections[offs:offs + num_tiles]
            offs += num_tiles

            # Convert the detections from 128x128 back to the original frame size.
            detections = self._resize_detections(detections, target_size, resize_info[v])

            # Because we have several tiles for each frame, combine the predictions
            # from these tiles. The result is a list of PyTorch tensors, but now one
            # for each frame (rather than each tile).
            num_frames = frames[v].shape[0]
            frame_size = (frames[v].shape[2], frames[v].shape[1])
            detections = self._untile_detections(num_frames, frame_size, detections)

            # The same face may have been detected in multiple tiles, so filter out
            # overlapping detections. This is done separately for each frame.
            detections = self.facedet.nms(detections)

            for i in range(len(detections)):
                # Crop the faces out of the original frame.
                frameref_detections = self._add_margin_to_detections(detections[i], frame_size, 0.2)
                faces = self._crop_faces(frames[v][i], frameref_detections)
                kpts = self._crop_kpts(frames[v][i], detections[i], 0.3)

                # Add additional information about the frame and detections.
                scores = list(detections[i][:, 16].cpu().numpy())
                frame_dict = {"video_idx": videos_read[v],
                              "frame_idx": frames_read[v][i],
                              "frame_w": frame_size[0],
                              "frame_h": frame_size[1],
                              "frame": frames[v][i],
                              "faces": faces,
                              "kpts": kpts,
                              "detections": frameref_detections.cpu().numpy(),
                              "scores": scores,
                              }

                # Sort faces by descending confidence
                frame_dict = self._soft_faces_by_descending_score(frame_dict)

                result.append(frame_dict)

        return result

    def process_video(self, video_path):
        """Convenience method for doing face extraction on a single video."""
        input_dir = os.path.dirname(video_path)
        filenames = [os.path.basename(video_path)]
        return self.process_videos(input_dir, filenames, [0])

    def _tile_frames(self, frames: np.ndarray, target_size: Tuple[int, int]) -> Tuple[np.ndarray, List[float]]:
        """Splits each frame into several smaller, partially overlapping tiles
        and resizes each tile to target_size.

        After a bunch of experimentation, I found that for a 1920x1080 video,
        BlazeFace works better on three 1080x1080 windows. These overlap by 420
        pixels. (Two windows also work but it's best to have a clean center crop
        in there as well.)

        I also tried 6 windows of size 720x720 (horizontally: 720|360, 360|720;
        vertically: 720|1200, 480|720|480, 1200|720) but that gives many false
        positives when a window has no face in it.

        For a video in portrait orientation (1080x1920), we only take a single
        crop of the top-most 1080 pixels. If we split up the video vertically,
        then we might get false positives again.

        (NOTE: Not all videos are necessarily 1080p but the code can handle this.)

        Arguments:
            frames: NumPy array of shape (num_frames, height, width, 3)
            target_size: (width, height)

        Returns:
            - a new (num_frames * N, target_size[1], target_size[0], 3) array
              where N is the number of tiles used.
            - a list [scale_w, scale_h, offset_x, offset_y] that describes how
              to map the resized and cropped tiles back to the original image
              coordinates. This is needed for scaling up the face detections
              from the smaller image to the original image, so we can take the
              face crops in the original coordinate space.
""" num_frames, H, W, _ = frames.shape num_h, num_v, split_size, x_step, y_step = self.get_tiles_params(H, W) splits = np.zeros((num_frames * num_v * num_h, target_size[1], target_size[0], 3), dtype=np.uint8) i = 0 for f in range(num_frames): y = 0 for v in range(num_v): x = 0 for h in range(num_h): crop = frames[f, y:y + split_size, x:x + split_size, :] splits[i] = cv2.resize(crop, target_size, interpolation=cv2.INTER_AREA) x += x_step i += 1 y += y_step resize_info = [split_size / target_size[0], split_size / target_size[1], 0, 0] return splits, resize_info def get_tiles_params(self, H, W): split_size = min(H, W, 720) x_step = (W - split_size) // 2 y_step = (H - split_size) // 2 num_v = (H - split_size) // y_step + 1 if y_step > 0 else 1 num_h = (W - split_size) // x_step + 1 if x_step > 0 else 1 return num_h, num_v, split_size, x_step, y_step def _resize_detections(self, detections, target_size, resize_info): """Converts a list of face detections back to the original coordinate system. Arguments: detections: a list containing PyTorch tensors of shape (num_faces, 17) target_size: (width, height) resize_info: [scale_w, scale_h, offset_x, offset_y] """ projected = [] target_w, target_h = target_size scale_w, scale_h, offset_x, offset_y = resize_info for i in range(len(detections)): detection = detections[i].clone() # ymin, xmin, ymax, xmax for k in range(2): detection[:, k * 2] = (detection[:, k * 2] * target_h - offset_y) * scale_h detection[:, k * 2 + 1] = (detection[:, k * 2 + 1] * target_w - offset_x) * scale_w # keypoints are x,y for k in range(2, 8): detection[:, k * 2] = (detection[:, k * 2] * target_w - offset_x) * scale_w detection[:, k * 2 + 1] = (detection[:, k * 2 + 1] * target_h - offset_y) * scale_h projected.append(detection) return projected def _untile_detections(self, num_frames: int, frame_size: Tuple[int, int], detections: List[torch.Tensor]) -> List[ torch.Tensor]: """With N tiles per frame, there also are N times as many detections. This function groups together the detections for a given frame; it is the complement to tile_frames(). """ combined_detections = [] W, H = frame_size num_h, num_v, split_size, x_step, y_step = self.get_tiles_params(H, W) i = 0 for f in range(num_frames): detections_for_frame = [] y = 0 for v in range(num_v): x = 0 for h in range(num_h): # Adjust the coordinates based on the split positions. detection = detections[i].clone() if detection.shape[0] > 0: for k in range(2): detection[:, k * 2] += y detection[:, k * 2 + 1] += x for k in range(2, 8): detection[:, k * 2] += x detection[:, k * 2 + 1] += y detections_for_frame.append(detection) x += x_step i += 1 y += y_step combined_detections.append(torch.cat(detections_for_frame)) return combined_detections def _add_margin_to_detections(self, detections: torch.Tensor, frame_size: Tuple[int, int], margin: float = 0.2) -> torch.Tensor: """Expands the face bounding box. NOTE: The face detections often do not include the forehead, which is why we use twice the margin for ymin. Arguments: detections: a PyTorch tensor of shape (num_detections, 17) frame_size: maximum (width, height) margin: a percentage of the bounding box's height Returns a PyTorch tensor of shape (num_detections, 17). 
""" offset = torch.round(margin * (detections[:, 2] - detections[:, 0])) detections = detections.clone() detections[:, 0] = torch.clamp(detections[:, 0] - offset * 2, min=0) # ymin detections[:, 1] = torch.clamp(detections[:, 1] - offset, min=0) # xmin detections[:, 2] = torch.clamp(detections[:, 2] + offset, max=frame_size[1]) # ymax detections[:, 3] = torch.clamp(detections[:, 3] + offset, max=frame_size[0]) # xmax return detections def _crop_faces(self, frame: np.ndarray, detections: torch.Tensor) -> List[np.ndarray]: """Copies the face region(s) from the given frame into a set of new NumPy arrays. Arguments: frame: a NumPy array of shape (H, W, 3) detections: a PyTorch tensor of shape (num_detections, 17) Returns a list of NumPy arrays, one for each face crop. If there are no faces detected for this frame, returns an empty list. """ faces = [] for i in range(len(detections)): ymin, xmin, ymax, xmax = detections[i, :4].cpu().numpy().astype(np.int64) face = frame[ymin:ymax, xmin:xmax, :] faces.append(face) return faces def _crop_kpts(self, frame: np.ndarray, detections: torch.Tensor, face_fraction: float): """Copies the parts region(s) from the given frame into a set of new NumPy arrays. Arguments: frame: a NumPy array of shape (H, W, 3) detections: a PyTorch tensor of shape (num_detections, 17) face_fraction: float between 0 and 1 indicating how big are the parts to be extracted w.r.t the whole face Returns a list of NumPy arrays, one for each face crop. If there are no faces detected for this frame, returns an empty list. """ faces = [] for i in range(len(detections)): kpts = [] size = int(face_fraction * min(detections[i, 2] - detections[i, 0], detections[i, 3] - detections[i, 1])) kpts_coords = detections[i, 4:16].cpu().numpy().astype(np.int64) for kpidx in range(6): kpx, kpy = kpts_coords[kpidx * 2:kpidx * 2 + 2] kpt = frame[kpy - size // 2:kpy - size // 2 + size, kpx - size // 2:kpx - size // 2 + size, ] kpts.append(kpt) faces.append(kpts) return faces def remove_large_crops(self, crops, pct=0.1): """Removes faces from the results if they take up more than X% of the video. Such a face is likely a false positive. This is an optional postprocessing step. Modifies the original data structure. Arguments: crops: a list of dictionaries with face crop data pct: maximum portion of the frame a crop may take up """ for i in range(len(crops)): frame_data = crops[i] video_area = frame_data["frame_w"] * frame_data["frame_h"] faces = frame_data["faces"] scores = frame_data["scores"] new_faces = [] new_scores = [] for j in range(len(faces)): face = faces[j] face_H, face_W, _ = face.shape face_area = face_H * face_W if face_area / video_area < 0.1: new_faces.append(face) new_scores.append(scores[j]) frame_data["faces"] = new_faces frame_data["scores"] = new_scores def keep_only_best_face(self, crops): """For each frame, only keeps the face with the highest confidence. This gets rid of false positives, but obviously is problematic for videos with two people! This is an optional postprocessing step. Modifies the original data structure. 
""" for i in range(len(crops)): frame_data = crops[i] if len(frame_data["faces"]) > 0: frame_data["faces"] = frame_data["faces"][:1] frame_data["scores"] = frame_data["scores"][:1] # TODO: def filter_likely_false_positives(self, crops): # if only some frames have more than 1 face, it's likely a false positive # if most frames have more than 1 face, it's probably two people # so find the % of frames with > 1 face; if > 0.X, keep the two best faces # TODO: def filter_by_score(self, crops, min_score) to remove any # crops with a confidence score lower than min_score # TODO: def sort_by_histogram(self, crops) for videos with 2 people.