added averaging and parallelism
app.py CHANGED
@@ -32,12 +32,13 @@ os.environ['OMP_NUM_THREADS'] = '4'
 os.environ['AWS_ACCESS_KEY_ID'] = 'AKIA3JAMX4K53MFDKMGJ'
 os.environ['AWS_SECRET_ACCESS_KEY'] = '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'
 
-app_version = '
+app_version = 'dsdg_vid_2'
 
 device = torch.device("cpu")
 labels = ['Live', 'Spoof']
 PIX_THRESHOLD = 0.45
-DSDG_THRESHOLD = 0
+DSDG_THRESHOLD = 50.0
+DSDG_FACTOR = 1000000
 MIN_FACE_WIDTH_THRESHOLD = 210
 examples = [
     ['examples/1_1_21_2_33_scene_fake.jpg'],
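Note on the new constants: the raw map score computed in dsdg_model_inference (later hunk) is a tiny fraction, so the code rescales it by DSDG_FACTOR before comparing it with DSDG_THRESHOLD. A minimal sketch of that scaling, with a hypothetical raw score (real values come from the CDCN model):

    # Sketch only: `raw_score` is a made-up stand-in for
    # torch.sum(mu) / torch.sum(test_maps[:, t]) in the app.
    DSDG_FACTOR = 1000000
    DSDG_THRESHOLD = 50.0
    raw_score = 7.3e-05
    scaled = raw_score * DSDG_FACTOR          # 73.0, the scale the slider uses
    label = 'Real' if scaled >= DSDG_THRESHOLD else 'Spoof'
    print(label)                              # Real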
@@ -79,29 +80,6 @@ class Normaliztion_valtest(object):
         return image_x
 
 
-def prepare_data_dsdg(images, boxes, depths):
-    transform = transforms.Compose([Normaliztion_valtest()])
-    files_total = 1
-    image_x = np.zeros((files_total, 256, 256, 3))
-    depth_x = np.ones((files_total, 32, 32))
-
-    for i, (image, bbox, depth_img) in enumerate(
-            zip(images, boxes, depths)):
-        x, y, x2, y2 = bbox
-        depth_img = cv.cvtColor(depth_img, cv.COLOR_RGB2GRAY)
-        image = image[y:y2, x:x2]
-        depth_img = depth_img[y:y2, x:x2]
-
-        image_x[i, :, :, :] = cv.resize(image, (256, 256))
-        # transform to binary mask --> threshold = 0
-        depth_x[i, :, :] = cv.resize(depth_img, (32, 32))
-    image_x = image_x.transpose((0, 3, 1, 2))
-    image_x = transform(image_x)
-    image_x = torch.from_numpy(image_x.astype(float)).float()
-    depth_x = torch.from_numpy(depth_x.astype(float)).float()
-    return image_x, depth_x
-
-
 def find_largest_face(faces):
     # find the largest face in the list
     largest_face = None
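prepare_data_dsdg is not deleted: it moves below analyze_face in the next hunk, with files_total = 1 replaced by files_total = len(images), so one call now batches every sampled frame. A short sketch of the resulting shape flow, assuming a hypothetical N sampled frames:

    import numpy as np

    N = 4                                      # hypothetical number of sampled frames
    image_x = np.zeros((N, 256, 256, 3))       # stacked RGB face crops
    image_x = image_x.transpose((0, 3, 1, 2))  # channels-first: (N, 3, 256, 256)
    # dsdg_model_inference later adds a batch axis via unsqueeze(0), giving
    # (1, N, 3, 256, 256), and iterates over dim 1 one frame at a time.
    print(image_x.shape)                       # (4, 3, 256, 256)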
@@ -144,54 +122,84 @@ def deepix_model_inference(img, bbox):
     return img_deepix, confidences_deepix, cls_deepix
 
 
-def
-
-
-
-
-
+def get_depth_img(img, bbox):
+    bbox_conf = list(bbox)
+    bbox_conf.append(1)
+    param_lst, roi_box_lst = tddfa(img, [bbox_conf])
+    ver_lst = tddfa.recon_vers(param_lst, roi_box_lst, dense_flag=True)
+    depth_img = depth(img, ver_lst, tddfa.tri, with_bg_flag=False)
+    return depth_img
+
+
+def analyze_face(img):
+    face = extract_face(img)
+    if face is None:
+        return img, (), None
+    x, y, w, h = face
+    x2 = x + w
+    y2 = y + h
+    bbox = (x, y, x2, y2)
+    img_dsdg = img.copy()
     if w < MIN_FACE_WIDTH_THRESHOLD:
         color_dsdg = (0, 0, 0)
         text = f'Small res ({w}*{h})'
-        img_dsdg = cv.rectangle(
+        img_dsdg = cv.rectangle(img_dsdg, (x, y), (x2, y2), color_dsdg, 2)
         cv.putText(img_dsdg, text, (x, y2 + 30),
                    cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)
-        cls_dsdg = -1
-        return img_dsdg,
-
-
-
-
-
+        # cls_dsdg = -1
+        return img_dsdg, bbox, None
+    depth_img = get_depth_img(img, bbox)
+    return img_dsdg, bbox, depth_img
+
+
+def prepare_data_dsdg(images, boxes, depths):
+    transform = transforms.Compose([Normaliztion_valtest()])
+    files_total = len(images)
+    image_x = np.zeros((files_total, 256, 256, 3))
+    depth_x = np.ones((files_total, 32, 32))
+
+    for i, (image, bbox, depth_img) in enumerate(
+            zip(images, boxes, depths)):
+        x, y, x2, y2 = bbox
+        depth_img = cv.cvtColor(depth_img, cv.COLOR_RGB2GRAY)
+        image = image[y:y2, x:x2]
+        depth_img = depth_img[y:y2, x:x2]
+
+        image_x[i, :, :, :] = cv.resize(image, (256, 256))
+        # transform to binary mask --> threshold = 0
+        depth_x[i, :, :] = cv.resize(depth_img, (32, 32))
+    image_x = image_x.transpose((0, 3, 1, 2))
+    image_x = transform(image_x)
+    image_x = torch.from_numpy(image_x.astype(float)).float()
+    depth_x = torch.from_numpy(depth_x.astype(float)).float()
+    return image_x, depth_x
+
+
+def dsdg_model_inference(imgs, bboxes, depth_imgs):
     with torch.no_grad():
         map_score_list = []
-        image_x, map_x = prepare_data_dsdg(
+        image_x, map_x = prepare_data_dsdg(imgs, bboxes, depth_imgs)
         # get the inputs
         image_x = image_x.unsqueeze(0)
         map_x = map_x.unsqueeze(0)
        inputs = image_x.to(device)
         test_maps = map_x.to(device)
         optimizer.zero_grad()
+
+        scores = []
         map_score = 0.0
         for frame_t in range(inputs.shape[1]):
             mu, logvar, map_x, x_concat, x_Block1, x_Block2, x_Block3, x_input = cdcn_model(inputs[:, frame_t, :, :, :])
             score_norm = torch.sum(mu) / torch.sum(test_maps[:, frame_t, :, :])
+            scores.append(score_norm.item() * DSDG_FACTOR)
             map_score += score_norm
         map_score = map_score / inputs.shape[1]
         map_score_list.append(map_score)
         res_dsdg = map_score_list[0].item()
         if res_dsdg > 10:
             res_dsdg = 0.0
-
-
-        confidences_dsdg = {'Real confidence': res_dsdg}
-        color_dsdg = (0, 255, 0) if cls_dsdg == 'Real' else (255, 0, 0)
-        img_dsdg = cv.rectangle(img.copy(), (x, y), (x2, y2), color_dsdg, 2)
-        cv.putText(img_dsdg, text, (x, y2 + 30),
-                   cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)
-        res_dsdg = res_dsdg * 1000000
-        # cls_dsdg = 1 if cls_dsdg == 'Real' else 0
-        return img_dsdg, confidences_dsdg, res_dsdg
+        res_dsdg = res_dsdg * DSDG_FACTOR
+        return res_dsdg, scores
 
 
 def inference(img, dsdg_thresh):
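This hunk carries the averaging in the commit title: dsdg_model_inference now scores a whole batch of frames, keeps the rescaled per-frame values in scores, and returns their mean as res_dsdg. The reduction in isolation, with hypothetical per-frame values:

    import torch

    # Stand-ins for torch.sum(mu) / torch.sum(test_maps[:, t]) per frame.
    per_frame = [torch.tensor(6.1e-05), torch.tensor(8.4e-05), torch.tensor(7.0e-05)]
    DSDG_FACTOR = 1000000

    scores = [s.item() * DSDG_FACTOR for s in per_frame]  # per-frame, rescaled
    map_score = sum(per_frame) / len(per_frame)           # clip-level average
    res_dsdg = map_score.item()
    if res_dsdg > 10:                                     # same outlier guard as the app
        res_dsdg = 0.0
    res_dsdg = res_dsdg * DSDG_FACTOR
    print(scores, res_dsdg)                               # ~[61.0, 84.0, 70.0] ~71.67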
@@ -210,17 +218,16 @@ def inference(img, dsdg_thresh):
 
 def process_video(vid_path, dsdg_thresh):
     cap = cv.VideoCapture(vid_path)
-
     input_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
     input_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
-
-    # Set video codec and create VideoWriter object to save the output video
     fourcc = cv.VideoWriter_fourcc(*'mp4v')
     output_vid_path = 'output_dsdg.mp4'
-
-
+
     frame_counter = 0
-
+    all_frames = []
+    inference_images = []
+    inference_bboxes = []
+    inference_depths = []
     while cap.isOpened():
        ret, frame = cap.read()
         if not ret:
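The new lists are the parallelism half: frames are collected during the read loop and scored in a single batched dsdg_model_inference call afterwards, instead of one model call per frame. The every-5th-frame sampling itself is unchanged; in isolation it is just:

    def sample_every(frames, step=5):
        # Keep every `step`-th frame, as the frame_counter check below does.
        return [f for i, f in enumerate(frames) if i % step == 0]

    print(sample_every(list(range(23))))  # [0, 5, 10, 15, 20]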
@@ -228,21 +235,38 @@ def process_video(vid_path, dsdg_thresh):
         # Process only every 5th frame
         if frame_counter % 5 == 0:
             # Run inference on the current frame
-
-
-
-
-
-
+            frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
+            img, bbox, depth_img = analyze_face(frame)
+            if bbox and (depth_img is not None):
+                inference_images.append(img)
+                inference_bboxes.append(bbox)
+                inference_depths.append(depth_img)
+            all_frames.append(img)
         frame_counter += 1
-    # Release resources
     cap.release()
-
-    if not confidences_arr:
+    if not inference_images:
         return vid_path, {'Not supported right now': 0}, -1, vid_path, 'Faces too small or not found', -1
-
-
-
+
+    res_dsdg, scores = dsdg_model_inference(inference_images, inference_bboxes, inference_depths)
+    cls_dsdg = 'Real' if res_dsdg >= dsdg_thresh else 'Spoof'
+    for img, bbox, score in zip(inference_images, inference_bboxes, scores):
+        x, y, x2, y2 = bbox
+        w = x2 - x
+        h = y2 - y
+        frame_cls = 'Real' if score >= dsdg_thresh else 'Spoof'
+        color_dsdg = (0, 255, 0) if frame_cls == 'Real' else (255, 0, 0)
+        text = f'{cls_dsdg} {w}*{h}'
+        cv.rectangle(img, (x, y), (x2, y2), color_dsdg, 2)
+        cv.putText(img, text, (x, y2 + 30), cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)
+
+    out_dsdg = cv.VideoWriter(output_vid_path, fourcc, 6.0, (input_width, input_height))
+    for img in all_frames:
+        # Write the DSDG frame to the output video
+        img_dsdg = cv.cvtColor(img, cv.COLOR_RGB2BGR)
+        out_dsdg.write(img_dsdg)
+    out_dsdg.release()
+    text_dsdg = f'Label: {cls_dsdg}, average real confidence: {res_dsdg}\nFrames used: {len(scores)}\nConfidences: {scores}'
+    return vid_path, {'Not supported right now': 0}, -1, output_vid_path, text_dsdg, res_dsdg
 
 
 def upload_to_s3(vid_path, app_version, *labels):
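Each kept frame is colored by its own per-frame score (frame_cls), while the drawn text uses the clip-level cls_dsdg; the output is written at 6 fps, consistent with keeping every 5th frame of a roughly 30 fps webcam clip. The two-level decision in isolation, with hypothetical scores:

    DSDG_THRESHOLD = 50.0
    scores = [61.0, 84.0, 12.0]                # hypothetical per-frame scores
    res_dsdg = sum(scores) / len(scores)       # ~52.33
    cls_dsdg = 'Real' if res_dsdg >= DSDG_THRESHOLD else 'Spoof'
    frame_cls = ['Real' if s >= DSDG_THRESHOLD else 'Spoof' for s in scores]
    print(cls_dsdg, frame_cls)                 # Real ['Real', 'Real', 'Spoof']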
@@ -281,7 +305,7 @@ with demo:
     with gr.Row():
         with gr.Column():
             input_vid = gr.Video(format='mp4', source='webcam')
-            dsdg_thresh = gr.Slider(value=DSDG_THRESHOLD, label='DSDG threshold', maximum=
+            dsdg_thresh = gr.Slider(value=DSDG_THRESHOLD, label='DSDG threshold', maximum=300, step=5)
             btn_run = gr.Button(value="Run")
         with gr.Column():
             outputs=[
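The slider puts the clip threshold in the UI; its value reaches process_video as dsdg_thresh. A minimal sketch of the wiring, assuming the Gradio 3.x Blocks API this file already uses (the click handler and outputs list are elided here):

    import gradio as gr

    with gr.Blocks() as demo:
        input_vid = gr.Video(format='mp4', source='webcam')
        dsdg_thresh = gr.Slider(value=50.0, label='DSDG threshold', maximum=300, step=5)
        btn_run = gr.Button(value='Run')
        # In app.py: btn_run.click(process_video, inputs=[input_vid, dsdg_thresh], outputs=[...])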
|