from tqdm import tqdm import numpy as np import dlib from collections import OrderedDict import cv2 detector = dlib.get_frontal_face_detector() predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat") FACIAL_LANDMARKS_68_IDXS = OrderedDict([ ("mouth", (48, 68)), ("inner_mouth", (60, 68)), ("right_eyebrow", (17, 22)), ("left_eyebrow", (22, 27)), ("right_eye", (36, 42)), ("left_eye", (42, 48)), ("nose", (27, 36)), ("jaw", (0, 17)) ]) def shape_to_face(shape, width, height, scale=1.2): """ Recalculate the face bounding box based on coarse landmark location(shape) :param shape: landmark locations scale: the scale parameter of face, to enlarge the bounding box :return: face_new: new bounding box of face (1*4 list [x1, y1, x2, y2]) # face_center: the center coordinate of face (1*2 list [x_c, y_c]) face_size: the face is rectangular( width = height = size)(int) """ x_min, y_min = np.min(shape, axis=0) x_max, y_max = np.max(shape, axis=0) x_center = (x_min + x_max) // 2 y_center = (y_min + y_max) // 2 face_size = int(max(x_max - x_min, y_max - y_min) * scale) # Enforce it to be even # Thus the real whole bounding box size will be an odd # But after cropping the face size will become even and # keep same to the face_size parameter. face_size = face_size // 2 * 2 x1 = max(x_center - face_size // 2, 0) y1 = max(y_center - face_size // 2, 0) face_size = min(width - x1, face_size) face_size = min(height - y1, face_size) x2 = x1 + face_size y2 = y1 + face_size face_new = [int(x1), int(y1), int(x2), int(y2)] return face_new, face_size def predict_single_frame(frame): """ :param frame: A full frame of video :return: face_num: the number of face (just to verify if successfully detect a face) shape: landmark locations """ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = detector(gray, 0) if len(faces) < 1: return 0, None face = faces[0] landmarks = predictor(frame, face) face_landmark_list = [(p.x, p.y) for p in landmarks.parts()] shape = np.array(face_landmark_list) return 1, shape def landmark_align(shape): desiredLeftEye = (0.35, 0.25) desiredFaceWidth = 2 desiredFaceHeight = 2 (lStart, lEnd) = FACIAL_LANDMARKS_68_IDXS["left_eye"] (rStart, rEnd) = FACIAL_LANDMARKS_68_IDXS["right_eye"] leftEyePts = shape[lStart:lEnd] rightEyePts = shape[rStart:rEnd] # compute the center of mass for each eye leftEyeCenter = leftEyePts.mean(axis=0) # .astype("int") rightEyeCenter = rightEyePts.mean(axis=0) # .astype("int") # compute the angle between the eye centroids dY = rightEyeCenter[1] - leftEyeCenter[1] dX = rightEyeCenter[0] - leftEyeCenter[0] angle = np.degrees(np.arctan2(dY, dX)) # - 180 # compute the desired right eye x-coordinate based on the # desired x-coordinate of the left eye desiredRightEyeX = 1.0 - desiredLeftEye[0] # determine the scale of the new resulting image by taking # the ratio of the distance between eyes in the *current* # image to the ratio of distance between eyes in the # *desired* image dist = np.sqrt((dX ** 2) + (dY ** 2)) desiredDist = (desiredRightEyeX - desiredLeftEye[0]) desiredDist *= desiredFaceWidth scale = desiredDist / dist # compute center (x, y)-coordinates (i.e., the median point) # between the two eyes in the input image eyesCenter = ((leftEyeCenter[0] + rightEyeCenter[0]) // 2, (leftEyeCenter[1] + rightEyeCenter[1]) // 2) # grab the rotation matrix for rotating and scaling the face M = cv2.getRotationMatrix2D(eyesCenter, angle, scale) # update the translation component of the matrix tX = 0 # desiredFaceWidth * 0.5 tY = desiredFaceHeight * desiredLeftEye[1] M[0, 2] += (tX - eyesCenter[0]) M[1, 2] += (tY - eyesCenter[1]) n, d = shape.shape temp = np.zeros((n, d + 1), dtype="int") temp[:, 0:2] = shape temp[:, 2] = 1 aligned_landmarks = np.matmul(M, temp.T) return aligned_landmarks.T # .astype("int")) def check_and_merge(location, forward, feedback, P_predict, status_fw=None, status_fb=None): num_pts = 68 check = [True] * num_pts target = location[1] forward_predict = forward[1] # To ensure the robustness through feedback-check forward_base = forward[0] # Also equal to location[0] feedback_predict = feedback[0] feedback_diff = feedback_predict - forward_base feedback_dist = np.linalg.norm(feedback_diff, axis=1, keepdims=True) # For Kalman Filtering detect_diff = location[1] - location[0] detect_dist = np.linalg.norm(detect_diff, axis=1, keepdims=True) predict_diff = forward[1] - forward[0] predict_dist = np.linalg.norm(predict_diff, axis=1, keepdims=True) predict_dist[np.where(predict_dist == 0)] = 1 # Avoid nan P_detect = (detect_dist / predict_dist).reshape(num_pts) for ipt in range(num_pts): if feedback_dist[ipt] > 2: # When use float check[ipt] = False if status_fw is not None and np.sum(status_fw) != num_pts: for ipt in range(num_pts): if status_fw[ipt][0] == 0: check[ipt] = False if status_fw is not None and np.sum(status_fb) != num_pts: for ipt in range(num_pts): if status_fb[ipt][0] == 0: check[ipt] = False location_merge = target.copy() # Merge the results: """ Use Kalman Filter to combine the calculate result and detect result. """ Q = 0.3 # Process variance for ipt in range(num_pts): if check[ipt]: # Kalman parameter P_predict[ipt] += Q K = P_predict[ipt] / (P_predict[ipt] + P_detect[ipt]) location_merge[ipt] = forward_predict[ipt] + K * (target[ipt] - forward_predict[ipt]) # Update the P_predict by the current K P_predict[ipt] = (1 - K) * P_predict[ipt] return location_merge, check, P_predict def detect_frames_track(frames): frames_num = len(frames) assert frames_num != 0 frame_height, frame_width = frames[0].shape[:2] """ Pre-process: To detect the original results, and normalize each face to a certain width, also its corresponding landmarks locations and scale parameter. """ face_size_normalized = 400 faces = [] locations = [] shapes_origin = [] shapes_para = [] # Use to recover the shape in whole frame. ([x1, y1, scale_shape]) face_size = 0 skipped = 0 """ Use single frame to detect face on Dlib (CPU) """ # ----------------------------------------------------------------------------# print("Detecting:") for i in tqdm(range(frames_num)): frame = frames[i] face_num, shape = predict_single_frame(frame) if face_num == 0: if len(shapes_origin) == 0: skipped += 1 # print("Skipped", skipped, "Frame_num", frames_num) continue shape = shapes_origin[i - 1 - skipped] face, face_size = shape_to_face(shape, frame_width, frame_height, 1.2) faceFrame = frame[face[1]: face[3], face[0]:face[2]] if face_size < face_size_normalized: inter_para = cv2.INTER_CUBIC else: inter_para = cv2.INTER_AREA face_norm = cv2.resize(faceFrame, (face_size_normalized, face_size_normalized), interpolation=inter_para) scale_shape = face_size_normalized / face_size shape_norm = np.rint((shape - np.array([face[0], face[1]])) * scale_shape).astype(int) faces.append(face_norm) shapes_para.append([face[0], face[1], scale_shape]) shapes_origin.append(shape) locations.append(shape_norm) """ Calibration module. """ segment_length = 2 locations_sum = len(locations) if locations_sum == 0: return [] locations_track = [locations[0]] num_pts = 68 P_predict = np.array([0] * num_pts).reshape(num_pts).astype(float) print("Tracking") for i in tqdm(range(locations_sum - 1)): faces_seg = faces[i:i + segment_length] locations_seg = locations[i:i + segment_length] # ----------------------------------------------------------------------# """ Numpy Version (DEPRECATED) """ # locations_track_start = [locations_track[i]] # forward_pts, feedback_pts = track_bidirectional(faces_seg, locations_track_start) # # forward_pts = np.rint(forward_pts).astype(int) # feedback_pts = np.rint(feedback_pts).astype(int) # merge_pt, check, P_predict = check_and_merge(locations_seg, forward_pts, feedback_pts, P_predict) # ----------------------------------------------------------------------# """ OpenCV Version """ lk_params = dict(winSize=(15, 15), maxLevel=3, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)) # Use the tracked current location as input. Also use the next frame's predicted location for # auxiliary initialization. start_pt = locations_track[i].astype(np.float32) target_pt = locations_seg[1].astype(np.float32) forward_pt, status_fw, err_fw = cv2.calcOpticalFlowPyrLK(faces_seg[0], faces_seg[1], start_pt, target_pt, **lk_params, flags=cv2.OPTFLOW_USE_INITIAL_FLOW) feedback_pt, status_fb, err_fb = cv2.calcOpticalFlowPyrLK(faces_seg[1], faces_seg[0], forward_pt, start_pt, **lk_params, flags=cv2.OPTFLOW_USE_INITIAL_FLOW) forward_pts = [locations_track[i].copy(), forward_pt] feedback_pts = [feedback_pt, forward_pt.copy()] forward_pts = np.rint(forward_pts).astype(int) feedback_pts = np.rint(feedback_pts).astype(int) merge_pt, check, P_predict = check_and_merge(locations_seg, forward_pts, feedback_pts, P_predict, status_fw, status_fb) # ----------------------------------------------------------------------# locations_track.append(merge_pt) """ If us visualization, write the results to the visualize output folder. """ if locations_sum != frames_num: print("INFO: Landmarks detection failed in some frames. Therefore we disable the " "visualization for this video. It will be optimized in future version.") aligned_landmarks = [] for i in locations_track: shape = landmark_align(i) shape = shape.ravel() shape = shape.tolist() aligned_landmarks.append(shape) return aligned_landmarks