|
import subprocess |
|
subprocess.run(["sh", "tddfa/build.sh"]) |
|
|
|
import gradio as gr |
|
from gradio.components import Dropdown |
|
|
|
import cv2 as cv |
|
import torch |
|
from torchvision import transforms |
|
from DeePixBiS.Model import DeePixBiS |
|
|
|
import yaml |
|
import numpy as np |
|
import pandas as pd |
|
from skimage.io import imread, imsave |
|
|
|
from tddfa.utils.depth import depth |
|
from tddfa.TDDFA import TDDFA |
|
|
|
import torch.optim as optim |
|
from DSDG.DUM.models.CDCNs_u import Conv2d_cd, CDCN_u |
|
|
|
import io |
|
import uuid |
|
import numpy as np |
|
from PIL import Image |
|
import boto3 |
|
|
|
import os |
|
# Runtime knobs for the OpenMP-backed native libraries.
# NOTE(review): torch and cv2 are imported before these assignments; confirm
# the variables are still read late enough to take effect.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
os.environ['OMP_NUM_THREADS'] = '4'

# Version tag shown to the upload handler through a hidden textbox.
app_version = 'ddn1'

# All models in this file are run on CPU.
device = torch.device("cpu")
# NOTE: `labels` is rebound further down to the hidden Gradio Number widgets.
labels = ['Live', 'Spoof']
# Decision thresholds: a score >= threshold is reported as 'Real'.
pix_threshhold = 0.45
dsdg_threshold = 0.003
# Example image paths.
examples = [
    ['examples/1_1_21_2_33_scene_fake.jpg'],
    ['examples/frame150_real.jpg'],
    ['examples/1_2.avi_125_real.jpg'],
    ['examples/1_3.avi_25_fake.jpg'],
]

# Haar-cascade face detector used to locate the face before classification.
faceClassifier = cv.CascadeClassifier('./DeePixBiS/Classifiers/haarface.xml')

# Pre-processing for DeePixBiS: resize to 224x224 and normalise every channel
# with mean 0.5 / std 0.5.
tfms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# --- DeePixBiS -------------------------------------------------------------
deepix_model = DeePixBiS(pretrained=False)
# map_location keeps the load working on CPU-only hosts even if the checkpoint
# was saved from a CUDA device (same policy as the CDCN checkpoint below).
deepix_model.load_state_dict(
    torch.load('./DeePixBiS/DeePixBiS.pth', map_location=device))
deepix_model.eval()

# --- 3DDFA (depth reconstruction) -------------------------------------------
depth_config_path = 'tddfa/configs/mb05_120x120.yml'
# Context manager so the config file handle is closed deterministically.
with open(depth_config_path) as cfg_file:
    cfg = yaml.safe_load(cfg_file)
tddfa = TDDFA(gpu_mode=False, **cfg)

# --- DSDG / CDCN -------------------------------------------------------------
cdcn_model = CDCN_u(basic_conv=Conv2d_cd, theta=0.7)
cdcn_model = cdcn_model.to(device)
weights = torch.load('./DSDG/DUM/checkpoint/CDCN_U_P1_updated.pkl', map_location=device)
cdcn_model.load_state_dict(weights)
# Kept as a module-level name; the model itself is only used in eval mode here.
optimizer = optim.Adam(cdcn_model.parameters(), lr=0.001, weight_decay=0.00005)
cdcn_model.eval()
|
|
|
|
|
class Normaliztion_valtest:
    """Callable transform that shifts and scales pixel values.

    Follows the MXNet convention ``image = (image - 127.5) / 128``, which maps
    8-bit intensities to roughly [-1, 1].
    """

    def __call__(self, image_x):
        # Centre around zero first, then scale.
        centred = image_x - 127.5
        return centred / 128
|
|
|
|
|
def prepare_data(images, boxes, depths):
    """Crop faces and their depth maps and pack them into model-ready tensors.

    Args:
        images: sequence of H x W x 3 image arrays.
        boxes: sequence of ``(x, y, w, h)`` face boxes, one per image.
        depths: sequence of 3-channel depth renderings, one per image; each is
            converted to a single channel before cropping.

    Returns:
        ``(image_x, depth_x)``: float32 tensors of shape ``(N, 3, 256, 256)``
        and ``(N, 32, 32)`` where ``N == len(images)``. Image values are
        normalised with ``Normaliztion_valtest``.
    """
    transform = transforms.Compose([Normaliztion_valtest()])
    # Size the batch from the input; the previous hard-coded 1 raised an
    # IndexError as soon as a second image was supplied.
    files_total = len(images)
    image_x = np.zeros((files_total, 256, 256, 3))
    depth_x = np.ones((files_total, 32, 32))

    for i, (image, bbox, depth_img) in enumerate(zip(images, boxes, depths)):
        x, y, w, h = bbox
        depth_img = cv.cvtColor(depth_img, cv.COLOR_RGB2GRAY)
        face_crop = image[y:y + h, x:x + w]
        depth_crop = depth_img[y:y + h, x:x + w]

        image_x[i] = cv.resize(face_crop, (256, 256))
        depth_x[i] = cv.resize(depth_crop, (32, 32))

    # NHWC -> NCHW, then normalise.
    image_x = image_x.transpose((0, 3, 1, 2))
    image_x = transform(image_x)
    # Both arrays are already float64, so no intermediate astype() is needed.
    image_x = torch.from_numpy(image_x).float()
    depth_x = torch.from_numpy(depth_x).float()
    return image_x, depth_x
|
|
|
|
|
def find_largest_face(faces):
    """Pick the bounding box with the greatest area.

    ``faces`` is an iterable of ``(x, y, w, h)`` boxes. The first box with the
    strictly largest ``w * h`` is returned as a tuple; ``None`` is returned
    when there are no boxes or none has a positive area.
    """
    best_box = None
    best_area = 0

    for left, top, width, height in faces:
        candidate_area = width * height
        # Strict comparison: earlier boxes win ties.
        if candidate_area <= best_area:
            continue
        best_area = candidate_area
        best_box = (left, top, width, height)

    return best_box
|
|
|
|
|
def _annotate_verdict(img, face, label, is_real):
    """Return a copy of *img* with the face box and verdict text drawn on it."""
    x, y, w, h = face
    # Green for 'Real'; (255, 0, 0) is red when the frame is RGB.
    # NOTE(review): assumes Gradio hands over RGB frames - confirm.
    color = (0, 255, 0) if is_real else (255, 0, 0)
    annotated = cv.rectangle(img.copy(), (x, y), (x + w, y + h), color, 2)
    cv.putText(annotated, label, (x, y + h + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, color)
    return annotated


def _deepix_score(img, face):
    """Return the mean of the DeePixBiS pixel-wise map for the face crop."""
    x, y, w, h = face
    face_region = img[y:y + h, x:x + w]
    # NOTE(review): this swaps channels as if the frame were BGR; if Gradio
    # supplies RGB the model actually receives BGR. Left unchanged because it
    # affects model scores - confirm against how the model was trained.
    face_region = cv.cvtColor(face_region, cv.COLOR_BGR2RGB)
    batch = tfms(face_region).unsqueeze(0)
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        mask, _binary = deepix_model(batch)
    return torch.mean(mask).item()


def _dsdg_score(img, face):
    """Return the raw DSDG score for the face.

    The score is the sum of the CDCN model's first output (``mu``) divided by
    the sum of the 3DDFA depth map, averaged over frames (a single frame here).
    """
    # NOTE(review): the box is passed as [x, y, w, h, 1]; upstream 3DDFA_V2
    # expects [x1, y1, x2, y2, score]. Confirm what this project's TDDFA wants.
    tddfa_box = list(face)
    tddfa_box.append(1)
    param_lst, roi_box_lst = tddfa(img, [tddfa_box])
    ver_lst = tddfa.recon_vers(param_lst, roi_box_lst, dense_flag=True)
    depth_img = depth(img, ver_lst, tddfa.tri, with_bg_flag=False)

    with torch.no_grad():
        image_x, map_x = prepare_data([img], [list(face)], [depth_img])
        # Add a frame axis: (N, C, H, W) -> (N, T, C, H, W) with T == 1.
        inputs = image_x.unsqueeze(0).to(device)
        test_maps = map_x.unsqueeze(0).to(device)

        n_frames = inputs.shape[1]
        map_score = 0.0
        for frame_t in range(n_frames):
            mu = cdcn_model(inputs[:, frame_t, :, :, :])[0]
            map_score += torch.sum(mu) / torch.sum(test_maps[:, frame_t, :, :])
        map_score = map_score / n_frames
    return map_score.item()


def inference(img):
    """Run both liveness models on the largest face found in *img*.

    Args:
        img: H x W x 3 numpy image from the Gradio image widget, or ``None``
            when no frame has been captured.

    Returns:
        A 6-tuple ``(img_deepix, confidences_deepix, img_dsdg,
        confidences_dsdg, cls_deepix, cls_dsdg)``. The images are annotated
        copies of the input, the dicts map a ``"<class> <score>"`` label to a
        confidence, and the classes are ``1`` for 'Real' and ``0`` for 'Spoof'.
        When there is no image or no face is detected the result is
        ``(img, {}, img, {}, None, None)``.
    """
    # No frame captured yet: behave like "no face" instead of crashing in cv.
    if img is None:
        return img, {}, img, {}, None, None

    grey = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    faces = faceClassifier.detectMultiScale(
        grey, scaleFactor=1.1, minNeighbors=4)
    face = find_largest_face(faces)
    if face is None:
        return img, {}, img, {}, None, None

    # --- DeePixBiS ---
    res_deepix = _deepix_score(img, face)
    is_real_deepix = res_deepix >= pix_threshhold
    label_deepix = f"{'Real' if is_real_deepix else 'Spoof'} {res_deepix:.2f}"
    confidences_deepix = {label_deepix: res_deepix}
    img_deepix = _annotate_verdict(img, face, label_deepix, is_real_deepix)

    # --- DSDG ---
    res_dsdg = _dsdg_score(img, face)
    # Implausible scores (> 10, or inf/nan from an empty depth map) count as 0.
    if not np.isfinite(res_dsdg) or res_dsdg > 10:
        res_dsdg = 0.0
    is_real_dsdg = res_dsdg >= dsdg_threshold
    # The score is displayed on a x100 scale.
    res_dsdg = res_dsdg * 100
    label_dsdg = f"{'Real' if is_real_dsdg else 'Spoof'} {res_dsdg:.2f}"
    # Report the DSDG score here (this used to repeat the DeePixBiS score);
    # clamp to [0, 1] because the Label widget treats the value as a confidence.
    confidences_dsdg = {label_dsdg: min(max(res_dsdg, 0.0), 1.0)}
    img_dsdg = _annotate_verdict(img, face, label_dsdg, is_real_dsdg)

    return (img_deepix, confidences_deepix, img_dsdg, confidences_dsdg,
            int(is_real_deepix), int(is_real_dsdg))
|
|
|
|
|
def upload_to_s3(image_array, app_version, *labels):
    """Encode the image as JPEG and upload it to the flagged-samples bucket.

    Args:
        image_array: numpy image from the Gradio widget, or ``None`` when no
            frame has been captured.
        app_version: version tag used as a path component of the object key.
        *labels: values joined with ``_`` to form the start of the file name.

    Returns:
        A status string: ``'Successfully uploaded'`` after the upload call
        returns, or ``'No image to upload'`` when *image_array* is ``None``.

    Raises:
        Whatever ``boto3``'s ``upload_fileobj`` raises when the upload fails.
    """
    # Nothing captured: report it instead of failing on ``None * 255``.
    if image_array is None:
        return 'No image to upload'

    folder = 'demo'
    bucket_name = 'livenessng'

    # Object key: <folder>/<version>/<labels>_<random suffix>.jpg
    encoded_labels = '_'.join([str(label) for label in labels])
    random_string = str(uuid.uuid4()).split('-')[-1]
    image_name = f"{folder}/{app_version}/{encoded_labels}_{random_string}.jpg"

    pixels = np.asarray(image_array)
    # Integer arrays are assumed to already be 0-255; multiplying a uint8
    # array by 255 wraps around and corrupts the picture. Only float (or bool)
    # data, taken to be in [0, 1], is rescaled.
    if not np.issubdtype(pixels.dtype, np.integer):
        pixels = np.clip(pixels * 255, 0, 255)
    image = Image.fromarray(pixels.astype(np.uint8))

    image_bytes = io.BytesIO()
    image.save(image_bytes, format='JPEG')
    image_bytes.seek(0)

    s3 = boto3.client('s3')
    s3.upload_fileobj(image_bytes, bucket_name, image_name)

    return 'Successfully uploaded'
|
|
|
|
|
|
|
# Gradio UI: capture a webcam frame, run both liveness models on it, and let
# the user flag the frame (with the true label) for upload to S3.
# NOTE(review): the nesting below is reconstructed from statement order; the
# indentation of the original was lost.
with gr.Blocks() as demo:
    input_img = gr.Image(source='webcam', shape=None, type='numpy')
    btn_run = gr.Button(value="Run")

    with gr.Column():
        # One annotated image plus one confidence label per model.
        outputs = [
            gr.Image(label='DeePixBiS', type='numpy'),
            gr.Label(num_top_classes=2, label='DeePixBiS'),
            gr.Image(label='DSDG', type='numpy'),
            gr.Label(num_top_classes=2, label='DSDG'),
        ]
        # Hidden fields carrying each model's 1/0 verdict to the flag handler.
        labels = [gr.Number(visible=False), gr.Number(visible=False)]
        btn_run.click(inference, [input_img], outputs + labels)

    # Hidden textbox so the version tag can be passed as an event input.
    app_version_block = gr.Textbox(value=app_version, visible=False)

    with gr.Column():
        radio = gr.Radio(
            ["Real", "Spoof", "None"],
            label="True label",
            type='index',
        )
        flag = gr.Button(value="Flag")
        status = gr.Textbox()

    flag.click(
        upload_to_s3,
        [input_img, app_version_block, radio] + labels,
        [status],
        show_progress=True,
    )


if __name__ == '__main__':
    demo.launch(share=False)
|
|