import os
from dataclasses import dataclass
from pathlib import Path
from typing import Generator

import cv2
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
from ffmpeg import FFmpeg


# DEEPFACE_HOME must point at the models directory *before* deepface is
# imported, so that model weights are downloaded to and loaded from it.
MODELS_DIR = Path('models')
os.environ['DEEPFACE_HOME'] = str(MODELS_DIR)

import tensorflow as tf

# Use the asynchronous CUDA allocator and enable memory growth so that
# TensorFlow does not reserve all GPU memory up front.
os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as ex:
        print(ex)

from deepface import DeepFace

# Shape of one detection dict returned by DeepFace.analyze().
DEEPFACE_DICT = dict[str, str | int | float | dict[str, str | int | float | None]]


@dataclass
class DetectResult:
    """One detected face: confidence, demographic attributes, bounding box."""

    face_conf: float | None = None
    gender: str | None = None
    gender_conf: float | None = None
    age: int | None = None
    race: str | None = None
    race_conf: float | None = None
    emotion: str | None = None
    emotion_conf: float | None = None
    x: int | None = None
    y: int | None = None
    w: int | None = None
    h: int | None = None

    def __post_init__(self):
        # With enforce_detection=False, DeepFace reports face_confidence == 0
        # when no face was found; reset every field to None in that case.
        if self.face_conf == 0:
            self.__dict__.update(DetectResult().__dict__)

    @classmethod
    def from_dict(cls, detect_dict: DEEPFACE_DICT):
        return cls(
            face_conf=detect_dict['face_confidence'],
            gender=detect_dict['dominant_gender'],
            gender_conf=detect_dict['gender'][detect_dict['dominant_gender']],
            age=detect_dict['age'],
            race=detect_dict['dominant_race'],
            race_conf=detect_dict['race'][detect_dict['dominant_race']],
            emotion=detect_dict['dominant_emotion'],
            emotion_conf=detect_dict['emotion'][detect_dict['dominant_emotion']],
            x=detect_dict['region']['x'],
            y=detect_dict['region']['y'],
            w=detect_dict['region']['w'],
            h=detect_dict['region']['h'],
        )
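
    # For reference, each dict consumed above has this shape (values are
    # illustrative, not from a real run):
    #   {'face_confidence': 0.93, 'age': 31,
    #    'dominant_gender': 'Woman', 'gender': {'Woman': 99.1, 'Man': 0.9},
    #    'dominant_race': 'asian', 'race': {...},
    #    'dominant_emotion': 'happy', 'emotion': {...},
    #    'region': {'x': 10, 'y': 20, 'w': 64, 'h': 64}}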

    def as_dict(self):
        return self.__dict__

    def keys(self):
        return self.__dict__.keys()
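
    # keys() by itself does not make the object unpackable with **; the
    # mapping protocol also needs __getitem__. Added here on that
    # assumption, as a sketch:
    def __getitem__(self, key):
        return self.__dict__[key]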

    def __repr__(self):
        if self.face_conf is None:
            repr_text = 'DetectResult(face_conf=None'
        else:
            repr_text = 'DetectResult(\n    ' + \
                '\n    '.join(f'{k}={v}' for k, v in self.as_dict().items())
        return repr_text + '\n)\n'


class Detector:
    def __init__(self):
        # Drawing and output settings.
        self.box_color = 'red'
        self.text_color = 'yellow'
        self.fill_color = 'black'
        self.font_path = 'fonts/LiberationMono-Regular.ttf'
        self.font_scale = 40
        self.detections_all_frames = []
        self.save_video_path = 'result_video.mp4'
        self.result_csv_path = 'video_annotations.csv'

        self.detector_backend = 'ssd'
        weights_dir = MODELS_DIR / '.deepface' / 'weights'
        self.is_first_run = True

        # Remove stale annotations from a previous run.
        Path(self.result_csv_path).unlink(missing_ok=True)

        # Download the model weights once if the weights directory is empty.
        if not any(weights_dir.iterdir()):
            self.first_detect_and_load_weights()

    def first_detect_and_load_weights(self):
        # Analyze a random dummy image once; this forces DeepFace to download
        # all model weights into DEEPFACE_HOME on the first run.
        DeepFace.analyze(
            img_path=np.random.randint(0, 256, size=(28, 28, 3), dtype=np.uint8),
            actions=['age', 'gender', 'race', 'emotion'],
            detector_backend=self.detector_backend,
            align=False,
            enforce_detection=False,
            silent=True,
        )

    def detect_image(self, image: str | np.ndarray, actions: list[str], align: bool) -> list[DetectResult]:
        detection_dicts = DeepFace.analyze(
            img_path=image,
            actions=actions,
            detector_backend=self.detector_backend,
            align=align,
            enforce_detection=False,
            silent=True,
        )
        detections = [DetectResult.from_dict(detection) for detection in detection_dicts]
        return detections
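
    # Example (a sketch; 'photo.jpg' is a placeholder path):
    #   results = detector_model.detect_image(
    #       'photo.jpg', actions=['age', 'gender', 'race', 'emotion'], align=True)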

    def draw_detections(
        self,
        np_image_rgb: np.ndarray,
        detections: list[DetectResult],
        face_conf_threshold: float,
    ) -> np.ndarray:
        pil_image = Image.fromarray(np_image_rgb)
        draw = ImageDraw.Draw(pil_image)
        # Scale the font with the frame height.
        font_size = pil_image.size[1] // self.font_scale
        font = ImageFont.truetype(self.font_path, size=font_size)
        for detection in detections:
            if detection.face_conf is not None and detection.face_conf > face_conf_threshold:
                x, y, w, h = detection.x, detection.y, detection.w, detection.h
                text = f'{detection.gender},{detection.age},{detection.race},{detection.emotion}'
                draw.rectangle((x, y, x + w, y + h), outline=self.box_color, width=6)
                _, _, text_width, text_height = font.getbbox(text)
                # Shift the label left if it would run past the right edge.
                if (x + text_width) > pil_image.size[0]:
                    x -= text_width / 2
                draw.rectangle(xy=(x, y - text_height, x + text_width, y), fill=self.fill_color)
                draw.text(xy=(x, y), text=text, font=font, fill=self.text_color, anchor='lb')
        return np.array(pil_image)
    def detect_video(
        self,
        video_file: str | Path,
        actions: list[str],
        align: bool,
        face_conf_threshold: float,
    ) -> Generator[tuple[int, int], None, None]:
        cap_read = cv2.VideoCapture(str(video_file))
        frames_width = int(cap_read.get(cv2.CAP_PROP_FRAME_WIDTH))
        frames_height = int(cap_read.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frames_fps = int(cap_read.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap_read.get(cv2.CAP_PROP_FRAME_COUNT))

        cap_write = cv2.VideoWriter(
            filename=self.save_video_path,
            fourcc=cv2.VideoWriter_fourcc(*'mp4v'),
            fps=frames_fps,
            frameSize=(frames_width, frames_height),
        )

        self.detections_all_frames = []
        frames_count = 0

        while cap_read.isOpened():
            ret, np_image_bgr = cap_read.read()
            if not ret:
                break
            # DeepFace works on the raw BGR frame from OpenCV directly.
            detections = self.detect_image(
                image=np_image_bgr,
                actions=actions,
                align=align,
            )
            self.detections_all_frames.append(detections)

            # Draw boxes only when at least one face was found in the frame.
            if detections[0].face_conf is not None:
                np_image_rgb = cv2.cvtColor(np_image_bgr, cv2.COLOR_BGR2RGB)
                result_np_image = self.draw_detections(
                    np_image_rgb=np_image_rgb,
                    detections=detections,
                    face_conf_threshold=face_conf_threshold,
                )
                np_image_bgr = cv2.cvtColor(result_np_image, cv2.COLOR_RGB2BGR)

            cap_write.write(np_image_bgr)
            frames_count += 1
            # Report progress so callers (e.g. a UI) can show a progress bar.
            yield (frames_count, total_frames)

        cap_write.release()
        cap_read.release()

    def detections_to_df(self) -> str | None:
        if not Path(self.save_video_path).exists():
            print('No video with detection results found')
            return None

        if len(self.detections_all_frames) == 0:
            print('No objects were detected')
            return None

        # Re-read the FPS from the rendered video to convert frame numbers
        # into timestamps.
        cap_read = cv2.VideoCapture(str(self.save_video_path))
        frames_fps = int(cap_read.get(cv2.CAP_PROP_FPS))
        cap_read.release()

        df_list = []
        for frame_num, detections in enumerate(self.detections_all_frames, start=1):
            for detection in detections:
                df_list.append({'frame_num': frame_num, **detection.as_dict()})

        df = pd.DataFrame(df_list)
        df.insert(loc=1, column='frame_sec', value=df.frame_num / frames_fps)
        df['face_detected'] = df['gender'].notna().astype(int)
        df.to_csv(self.result_csv_path, index=False)
        return self.result_csv_path

    @staticmethod
    def convert_mp4(input_video_path: str | Path, output_video_path: str | Path) -> None:
        # Re-encode OpenCV's mp4v output with ffmpeg's defaults (typically
        # H.264 for .mp4), which plays more widely; -y overwrites the output.
        ffmpeg = FFmpeg().option('y').input(input_video_path).output(output_video_path)
        ffmpeg.execute()


detector_model = Detector()
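

# Example driver (a minimal sketch; 'input.mp4' and 'result_video_h264.mp4'
# are placeholder paths, not files shipped with this module):
if __name__ == '__main__':
    for done, total in detector_model.detect_video(
        video_file='input.mp4',
        actions=['age', 'gender', 'race', 'emotion'],
        align=False,
        face_conf_threshold=0.5,
    ):
        print(f'frame {done}/{total}', end='\r')
    csv_path = detector_model.detections_to_df()
    print(f'\nannotations saved to {csv_path}')
    detector_model.convert_mp4(detector_model.save_video_path, 'result_video_h264.mp4')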