|
import subprocess |
|
# Run the TDDFA build script at import time. Presumably this has to finish
# before the ``tddfa`` imports further down can succeed - TODO confirm.
# The exit status of the script is not checked here.
_tddfa_build_cmd = ["sh", "tddfa/build.sh"]
subprocess.run(_tddfa_build_cmd)
|
|
|
import gradio as gr |
|
from gradio.components import Dropdown |
|
|
|
import cv2 as cv |
|
import torch |
|
from torchvision import transforms |
|
from DeePixBiS.Model import DeePixBiS |
|
|
|
import yaml |
|
import numpy as np |
|
import pandas as pd |
|
from skimage.io import imread, imsave |
|
|
|
from tddfa.utils.depth import depth |
|
from tddfa.TDDFA_ONNX import TDDFA_ONNX |
|
|
|
import torch.optim as optim |
|
from DSDG.DUM.models.CDCNs_u import Conv2d_cd, CDCN_u |
|
|
|
import io |
|
import uuid |
|
import numpy as np |
|
from PIL import Image |
|
import boto3 |
|
|
|
from utils.blur_filter import filter_frames |
|
|
|
import os |
|
# Runtime knobs for the OpenMP-backed libraries: tolerate duplicate OpenMP
# runtimes and cap the thread count at 4.
# NOTE(review): these are set after torch/numpy have already been imported,
# so they may not affect those libraries - confirm whether they should move up.
os.environ.update(KMP_DUPLICATE_LIB_OK='True', OMP_NUM_THREADS='4')
|
|
|
# Version tag; used as a path component of the S3 object key on upload.
app_version = 'dsdg_vid_3'

# All inference runs on the CPU.
device = torch.device("cpu")

# NOTE(review): not referenced by the module-level code visible in this file.
labels = ['Live', 'Spoof']

# Minimum mean DeePixBiS mask value for a face to be labelled 'Real'.
PIX_THRESHOLD = 0.45

# Default value of the DSDG threshold slider in the UI.
DSDG_THRESHOLD = 80.0

# Multiplier applied to every raw DSDG score before thresholding.
DSDG_FACTOR = 1_000_000

# Percentile of the per-frame scores used as the video-level score.
DSDG_PERCENTILE = 40

# Faces narrower than this many pixels are skipped as too low-resolution.
MIN_FACE_WIDTH_THRESHOLD = 210
|
|
|
# Bundled sample images, one single-element row per example.
_example_paths = (
    'examples/1_1_21_2_33_scene_fake.jpg',
    'examples/frame150_real.jpg',
    'examples/1_2.avi_125_real.jpg',
    'examples/1_3.avi_25_fake.jpg',
)
examples = [[path] for path in _example_paths]
|
# Haar-cascade face detector loaded from the DeePixBiS classifier file.
faceClassifier = cv.CascadeClassifier('./DeePixBiS/Classifiers/haarface.xml')

# Preprocessing applied to a face crop before DeePixBiS: convert to PIL,
# resize to 224x224, convert to a tensor, then normalise each of the three
# channels with mean 0.5 and std 0.5.
_channel_stats = (0.5, 0.5, 0.5)
tfms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(_channel_stats, _channel_stats),
])
|
|
|
|
|
|
|
|
|
|
|
# Load the TDDFA configuration and build the ONNX-backed model in CPU mode.
depth_config_path = 'tddfa/configs/mb1_120x120.yml'

# Use a context manager so the config file handle is closed deterministically
# instead of being left to the garbage collector.
with open(depth_config_path) as _depth_config_file:
    cfg = yaml.load(_depth_config_file, Loader=yaml.SafeLoader)

tddfa = TDDFA_ONNX(gpu_mode=False, **cfg)
|
|
|
|
|
# Build the CDCN_u network on the configured device and restore its weights
# from the bundled checkpoint.
cdcn_model = CDCN_u(basic_conv=Conv2d_cd, theta=0.7).to(device)

weights = torch.load('./DSDG/DUM/checkpoint/CDCN_U_P1_updated.pkl', map_location=device)
cdcn_model.load_state_dict(weights)

# The model is only used for inference, so switch it to evaluation mode.
cdcn_model.eval()

# An Adam optimizer over the model parameters; the inference routine in this
# file only ever calls ``zero_grad()`` on it.
optimizer = optim.Adam(cdcn_model.parameters(), lr=0.001, weight_decay=0.00005)
|
|
|
|
|
class Normaliztion_valtest(object):
    """Callable transform computing ``(image - 127.5) / 128``.

    Follows the mxnet convention; 8-bit pixel values end up roughly
    in the range [-1, 1].
    """

    def __call__(self, image_x):
        """Return ``image_x`` shifted by 127.5 and then divided by 128."""
        centred = image_x - 127.5
        return centred / 128
|
|
|
|
|
def find_largest_face(faces):
    """Select the bounding box covering the largest area.

    Args:
        faces: Iterable of ``(x, y, w, h)`` boxes.

    Returns:
        The first box whose ``w * h`` is maximal, or ``None`` when ``faces``
        is empty or no box has a strictly positive area.
    """
    def box_area(box):
        _, _, width, height = box
        return width * height

    # ``max`` keeps the first of several equally large boxes, matching a
    # strict "greater than" scan from left to right.
    candidate = max(faces, key=box_area, default=None)
    if candidate is None or not box_area(candidate) > 0:
        return None
    return candidate
|
|
|
|
|
def extract_face(img):
    """Detect faces in ``img`` and return the largest one.

    Args:
        img: Image passed to ``cv.cvtColor`` with ``COLOR_BGR2GRAY``,
            or ``None``.

    Returns:
        The ``(x, y, w, h)`` box chosen by ``find_largest_face``, or ``None``
        when ``img`` is ``None`` or the cascade finds no face.
    """
    if img is None:
        return None

    grey = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    detections = faceClassifier.detectMultiScale(
        grey, scaleFactor=1.1, minNeighbors=4)
    if len(detections) == 0:
        return None
    return find_largest_face(detections)
|
|
|
|
|
def deepix_model_inference(img, bbox):
    """Classify one face crop with DeePixBiS and draw the verdict.

    NOTE(review): this relies on a module-level ``deepix_model`` that is not
    defined anywhere in the visible part of this file - confirm it exists
    before calling this function.

    Args:
        img: Source image; it is not modified (annotation goes on a copy).
        bbox: ``(x, y, x2, y2)`` corner coordinates of the face.

    Returns:
        Tuple ``(annotated_image, {'Real confidence': score}, cls)`` where
        ``score`` is the mean of the predicted mask and ``cls`` is 1 for
        'Real' (score >= ``PIX_THRESHOLD``) or 0 for 'Spoof'.
    """
    left, top, right, bottom = bbox

    # Crop the face, preprocess it and add a leading batch dimension.
    face_batch = tfms(img[top:bottom, left:right]).unsqueeze(0)
    mask, _binary = deepix_model.forward(face_batch)
    real_score = torch.mean(mask).item()

    if real_score >= PIX_THRESHOLD:
        verdict, colour, cls_id = 'Real', (0, 255, 0), 1
    else:
        verdict, colour, cls_id = 'Spoof', (255, 0, 0), 0

    annotated = cv.rectangle(img.copy(), (left, top), (right, bottom), colour, 2)
    cv.putText(annotated, verdict, (left, bottom + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, colour)
    return annotated, {'Real confidence': real_score}, cls_id
|
|
|
|
|
def get_depth_img(img, bbox):
    """Render a depth image for the face inside ``bbox`` using TDDFA.

    Args:
        img: Image handed to the TDDFA model and the depth renderer.
        bbox: Face box; a trailing ``1`` is appended before it is passed to
            TDDFA (presumably a detection confidence - TODO confirm).

    Returns:
        The result of ``depth(...)`` rendered with ``with_bg_flag=False``.
    """
    scored_box = [*bbox, 1]
    params, roi_boxes = tddfa(img, [scored_box])
    # Dense reconstruction of the face vertices for the given region.
    vertices = tddfa.recon_vers(params, roi_boxes, dense_flag=True)
    return depth(img, vertices, tddfa.tri, with_bg_flag=False)
|
|
|
|
|
def analyze_face(img):
    """Locate the largest face in ``img`` and compute its depth image.

    Args:
        img: Frame to analyse. When the detected face is too narrow the
            frame is annotated in place with a black box and a size label.

    Returns:
        ``(img, bbox, depth_img)`` where ``bbox`` is ``(x, y, x2, y2)``.
        ``bbox`` is ``()`` and ``depth_img`` is ``None`` when no face is
        found; ``depth_img`` is ``None`` when the face is narrower than
        ``MIN_FACE_WIDTH_THRESHOLD``.
    """
    detection = extract_face(img)
    if detection is None:
        return img, (), None

    x, y, w, h = detection
    x2, y2 = x + w, y + h
    bbox = (x, y, x2, y2)

    if w >= MIN_FACE_WIDTH_THRESHOLD:
        return img, bbox, get_depth_img(img, bbox)

    # Face too small for a reliable prediction: mark it on the frame.
    black = (0, 0, 0)
    cv.rectangle(img, (x, y), (x2, y2), black, 2)
    cv.putText(img, f'Small res ({w}*{h})', (x, y2 + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, black)
    return img, bbox, None
|
|
|
|
|
def prepare_data_dsdg(images, boxes, depths):
    """Build the image and depth-map tensors fed to the DSDG model.

    Each image and its depth image are cropped to the face box; the image
    crop is resized to 256x256 and the grey-scale depth crop to 32x32.

    Args:
        images: Sequence of frames.
        boxes: ``(x, y, x2, y2)`` face box for each frame.
        depths: Depth image for each frame (converted with
            ``COLOR_BGR2GRAY``).

    Returns:
        ``(image_x, depth_x)`` float tensors of shape ``(N, 3, 256, 256)``
        and ``(N, 32, 32)``, with ``N = len(images)``. Image values are
        normalised by ``Normaliztion_valtest``.
    """
    frame_count = len(images)
    face_batch = np.zeros((frame_count, 256, 256, 3))
    depth_batch = np.ones((frame_count, 32, 32))

    for idx, (frame, (left, top, right, bottom), depth_map) in enumerate(
            zip(images, boxes, depths)):
        rows, cols = slice(top, bottom), slice(left, right)
        grey_depth = cv.cvtColor(depth_map, cv.COLOR_BGR2GRAY)
        face_batch[idx] = cv.resize(frame[rows, cols], (256, 256))
        depth_batch[idx] = cv.resize(grey_depth[rows, cols], (32, 32))

    # Channels-last -> channels-first, then normalise the pixel values.
    normalise = transforms.Compose([Normaliztion_valtest()])
    face_batch = normalise(face_batch.transpose((0, 3, 1, 2)))

    image_x = torch.from_numpy(face_batch.astype(float)).float()
    depth_x = torch.from_numpy(depth_batch.astype(float)).float()
    return image_x, depth_x
|
|
|
|
|
def dsdg_model_inference(imgs, bboxes, depth_imgs):
    """Score each frame with the CDCN_u (DSDG) model.

    Args:
        imgs: Sequence of frames.
        bboxes: ``(x, y, x2, y2)`` face box for each frame.
        depth_imgs: Depth image for each frame.

    Returns:
        List with one float per frame: the sum of the model's first output
        (``mu``) divided by the sum of the frame's depth map, multiplied by
        ``DSDG_FACTOR``. Ratios that are non-finite or greater than 10 are
        replaced with 0.0 before scaling.
    """
    import math

    scores = []
    # Pure inference: no gradients are needed, so no optimizer state has to
    # be touched either.
    with torch.no_grad():
        image_x, map_x = prepare_data_dsdg(imgs, bboxes, depth_imgs)

        # Add a leading batch axis; frames are then indexed along axis 1.
        inputs = image_x.unsqueeze(0).to(device)
        test_maps = map_x.unsqueeze(0).to(device)

        for frame_t in range(inputs.shape[1]):
            model_out = cdcn_model(inputs[:, frame_t, :, :, :])
            mu = model_out[0]
            score_norm = torch.sum(mu) / torch.sum(test_maps[:, frame_t, :, :])
            score = score_norm.item()
            # An all-zero depth map yields inf/NaN; treat those, like any
            # implausibly large ratio, as "no evidence of a live face".
            if not math.isfinite(score) or score > 10:
                score = 0.0
            scores.append(score * DSDG_FACTOR)
    return scores
|
|
|
|
|
def inference(img, dsdg_thresh):
    """Run DSDG liveness detection on a single image.

    The previous body passed ``(img, bbox, dsdg_thresh)`` to
    ``dsdg_model_inference``, which expects three parallel sequences
    (frames, boxes, depth images) and returns a list of scores, so every
    call with a detected face failed. This version feeds it one-element
    lists and derives the label from the single returned score.

    Args:
        img: Input image, or ``None``.
        dsdg_thresh: Minimum score for the face to be labelled 'Real'.

    Returns:
        6-tuple ``(img, {}, 2, img_dsdg, confidences_dsdg, cls_dsdg)``:
        the first three slots are DeePixBiS placeholders, ``img_dsdg`` is an
        annotated copy of ``img``, ``confidences_dsdg`` is
        ``{'Real confidence': score}`` and ``cls_dsdg`` is 1 for 'Real' or
        0 for 'Spoof'. When no face is found the result is
        ``(img, {}, None, img, {}, None)``.
    """
    face = extract_face(img)
    if face is None:
        return img, {}, None, img, {}, None

    x, y, w, h = face
    x2 = x + w
    y2 = y + h
    bbox = (x, y, x2, y2)

    depth_img = get_depth_img(img, bbox)
    score = dsdg_model_inference([img], [bbox], [depth_img])[0]

    is_real = score >= dsdg_thresh
    label = 'Real' if is_real else 'Spoof'
    # Same colour scheme as the DeePixBiS single-image annotation.
    color_dsdg = (0, 255, 0) if is_real else (255, 0, 0)

    img_dsdg = cv.rectangle(img.copy(), (x, y), (x2, y2), color_dsdg, 2)
    cv.putText(img_dsdg, label, (x, y2 + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)

    confidences_dsdg = {'Real confidence': score}
    cls_dsdg = 1 if is_real else 0
    return img, {}, 2, img_dsdg, confidences_dsdg, cls_dsdg
|
|
|
|
|
def process_video(vid_path, dsdg_thresh):
    """Run DSDG liveness detection on the frames selected from a video.

    Frames are chosen by ``filter_frames``; each frame with a large enough
    face is scored, and the ``DSDG_PERCENTILE``-th percentile of the scores
    is compared with ``dsdg_thresh`` to label the whole video. The selected
    frames are written to a new mp4 at 6 fps.

    Args:
        vid_path: Path of the input video.
        dsdg_thresh: Minimum score for a 'Real' verdict.

    Returns:
        6-tuple matching the UI outputs:
        ``(vid_path, {'Not supported right now': 0}, -1, output_video_path,
        summary_text, video_score)``. When no usable face is found the last
        three items are ``vid_path``, ``'Faces too small or not found'``
        and ``-1``.
    """
    import tempfile  # only needed for the per-request output file

    cap = cv.VideoCapture(vid_path)
    try:
        input_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
        input_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
        # Materialise the selection while the capture is still open: the
        # frames are iterated twice below and the capture is released next.
        most_focused = list(filter_frames(cap))
    finally:
        # Always free the underlying video handle.
        cap.release()

    inference_images = []
    inference_bboxes = []
    inference_depths = []
    for frame in most_focused:
        img, bbox, depth_img = analyze_face(frame)
        if bbox and (depth_img is not None):
            inference_images.append(img)
            inference_bboxes.append(bbox)
            inference_depths.append(depth_img)

    if not inference_images:
        return vid_path, {'Not supported right now': 0}, -1, vid_path, 'Faces too small or not found', -1

    scores = dsdg_model_inference(inference_images, inference_bboxes, inference_depths)
    res_dsdg = np.percentile(scores, DSDG_PERCENTILE)
    cls_dsdg = 'Real' if res_dsdg >= dsdg_thresh else 'Spoof'

    # Annotate the scored frames in place. They are the same objects that
    # analyze_face received from ``most_focused``, so the boxes show up in
    # the video written below.
    for img, bbox, score in zip(inference_images, inference_bboxes, scores):
        x, y, x2, y2 = bbox
        w = x2 - x
        h = y2 - y
        frame_cls = 'Real' if score >= dsdg_thresh else 'Spoof'
        color_dsdg = (0, 255, 0) if frame_cls == 'Real' else (0, 0, 255)
        # NOTE(review): the text shows the video-level label while the colour
        # reflects the per-frame verdict - confirm this is intended.
        text = f'{cls_dsdg} {w}*{h}'
        cv.rectangle(img, (x, y), (x2, y2), color_dsdg, 2)
        cv.putText(img, text, (x, y2 + 30), cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)

    # Use a unique output file per call: the app serves requests
    # concurrently, and a shared fixed file name would let one request
    # overwrite another's result. The file is left on disk for the caller.
    with tempfile.NamedTemporaryFile(
            prefix='output_dsdg_', suffix='.mp4', delete=False) as tmp_file:
        output_vid_path = tmp_file.name

    fourcc = cv.VideoWriter_fourcc(*'mp4v')
    out_dsdg = cv.VideoWriter(output_vid_path, fourcc, 6.0, (input_width, input_height))
    try:
        for img in most_focused:
            out_dsdg.write(img)
    finally:
        # Finalise the container even if writing a frame fails.
        out_dsdg.release()

    text_dsdg = f'Label: {cls_dsdg}, average real confidence: {res_dsdg}\nFrames used: {len(scores)}\nConfidences: {scores}'
    return vid_path, {'Not supported right now': 0}, -1, output_vid_path, text_dsdg, res_dsdg
|
|
|
|
|
def upload_to_s3(vid_path, app_version, *labels):
    """Upload a flagged video to S3 with its labels encoded in the key.

    Args:
        vid_path: Path of the video to upload, or ``None``.
        app_version: Version tag used as a folder in the object key.
        *labels: The user-selected true label first, followed by the model
            outputs. A true label of 2 is stored as -1. The second-to-last
            value still being -2 means detection has not been run yet.

    Returns:
        A status string: one of the three 'Error. ...' messages when a
        precondition fails, otherwise 'Successfully uploaded'.
    """
    folder = 'demo'
    bucket_name = 'livenessng'

    if vid_path is None:
        return 'Error. Take a photo first.'
    if labels[-2] == -2:
        return 'Error. Run the detection first.'
    if labels[0] is None:
        return 'Error. Select the true label first.'

    # ``*labels`` arrives as an immutable tuple; copy it to a list so the
    # true label can be remapped without raising TypeError.
    labels = list(labels)
    if labels[0] == 2:
        labels[0] = -1

    s3 = boto3.client('s3')

    encoded_labels = '_'.join(str(int(label)) for label in labels)
    # The last group of a UUID4 keeps object names unique.
    random_string = str(uuid.uuid4()).split('-')[-1]
    video_name = f"{folder}/{app_version}/{encoded_labels}_{random_string}.mp4"

    with open(vid_path, 'rb') as video_file:
        s3.upload_fileobj(video_file, bucket_name, video_name)

    return 'Successfully uploaded'
|
|
|
|
|
# Gradio UI: input column, model-output column and flagging column.
# NOTE(review): the nesting below was reconstructed from a copy of the file
# whose indentation was lost - confirm the layout against the running app.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            # Webcam recording plus the adjustable DSDG decision threshold.
            input_vid = gr.Video(format='mp4', source='webcam')
            dsdg_thresh = gr.Slider(
                value=DSDG_THRESHOLD,
                label='DSDG threshold',
                maximum=300,
                step=5,
            )
            btn_run = gr.Button(value="Run")
        with gr.Column():
            # Order matches the 6-tuple returned by process_video. The two
            # hidden numbers start at -2, which upload_to_s3 treats as
            # "detection not run yet".
            outputs = [
                gr.Video(label='DeePixBiS', format='mp4'),
                gr.Label(num_top_classes=2, label='DeePixBiS'),
                gr.Number(visible=False, value=-2),
                gr.Video(label='DSDG', format='mp4'),
                gr.Textbox(label='DSDG'),
                gr.Number(visible=False, value=-2),
            ]
        with gr.Column():
            # type='index' makes the radio return the position of the choice.
            radio = gr.Radio(
                ["Spoof", "Real", "None"], label="True label", type='index')
            flag = gr.Button(value="Flag")
            status = gr.Textbox()

    btn_run.click(process_video, [input_vid, dsdg_thresh], outputs)

    # Hidden textbox that carries the app version into the upload handler.
    app_version_block = gr.Textbox(value=app_version, visible=False)
    flag_inputs = [input_vid, app_version_block, radio, outputs[2], outputs[5]]
    flag.click(upload_to_s3, flag_inputs, [status], show_progress=True)
|
|
|
|
|
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    # Queue incoming requests with a concurrency count of 2.
    demo.queue(concurrency_count=2)
    # Launch without creating a public share link.
    demo.launch(share=False)
|
|