Spaces:
Running
Running
import cv2 | |
import onnxruntime as rt | |
import sys | |
from insightface.app import FaceAnalysis | |
sys.path.insert(1, './recognition') | |
from scrfd import SCRFD | |
from arcface_onnx import ArcFaceONNX | |
import os.path as osp | |
import os | |
from pathlib import Path | |
from tqdm import tqdm | |
import ffmpeg | |
import random | |
import multiprocessing as mp | |
from concurrent.futures import ThreadPoolExecutor | |
from insightface.model_zoo.inswapper import INSwapper | |
import psutil | |
from enum import Enum | |
from insightface.app.common import Face | |
from insightface.utils.storage import ensure_available | |
import re | |
import subprocess | |
class RefacerMode(Enum): | |
CPU, CUDA, COREML, TENSORRT = range(1, 5) | |
class Refacer: | |
def __init__(self,force_cpu=False,colab_performance=False): | |
self.first_face = False | |
self.force_cpu = force_cpu | |
self.colab_performance = colab_performance | |
self.__check_encoders() | |
self.__check_providers() | |
self.total_mem = psutil.virtual_memory().total | |
self.__init_apps() | |
def __check_providers(self): | |
if self.force_cpu : | |
self.providers = ['CPUExecutionProvider'] | |
else: | |
self.providers = rt.get_available_providers() | |
rt.set_default_logger_severity(4) | |
self.sess_options = rt.SessionOptions() | |
self.sess_options.execution_mode = rt.ExecutionMode.ORT_SEQUENTIAL | |
self.sess_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL | |
if len(self.providers) == 1 and 'CPUExecutionProvider' in self.providers: | |
self.mode = RefacerMode.CPU | |
self.use_num_cpus = mp.cpu_count()-1 | |
self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3) | |
print(f"CPU mode with providers {self.providers}") | |
elif self.colab_performance: | |
self.mode = RefacerMode.TENSORRT | |
self.use_num_cpus = mp.cpu_count()-1 | |
self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3) | |
print(f"TENSORRT mode with providers {self.providers}") | |
elif 'CoreMLExecutionProvider' in self.providers: | |
self.mode = RefacerMode.COREML | |
self.use_num_cpus = mp.cpu_count()-1 | |
self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3) | |
print(f"CoreML mode with providers {self.providers}") | |
elif 'CUDAExecutionProvider' in self.providers: | |
self.mode = RefacerMode.CUDA | |
self.use_num_cpus = 2 | |
self.sess_options.intra_op_num_threads = 1 | |
if 'TensorrtExecutionProvider' in self.providers: | |
self.providers.remove('TensorrtExecutionProvider') | |
print(f"CUDA mode with providers {self.providers}") | |
""" | |
elif 'TensorrtExecutionProvider' in self.providers: | |
self.mode = RefacerMode.TENSORRT | |
#self.use_num_cpus = 1 | |
#self.sess_options.intra_op_num_threads = 1 | |
self.use_num_cpus = mp.cpu_count()-1 | |
self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3) | |
print(f"TENSORRT mode with providers {self.providers}") | |
""" | |
def __init_apps(self): | |
assets_dir = ensure_available('models', 'buffalo_l', root='~/.insightface') | |
model_path = os.path.join(assets_dir, 'det_10g.onnx') | |
sess_face = rt.InferenceSession(model_path, self.sess_options, providers=self.providers) | |
self.face_detector = SCRFD(model_path,sess_face) | |
self.face_detector.prepare(0,input_size=(640, 640)) | |
model_path = os.path.join(assets_dir , 'w600k_r50.onnx') | |
sess_rec = rt.InferenceSession(model_path, self.sess_options, providers=self.providers) | |
self.rec_app = ArcFaceONNX(model_path,sess_rec) | |
self.rec_app.prepare(0) | |
model_path = 'inswapper_128.onnx' | |
sess_swap = rt.InferenceSession(model_path, self.sess_options, providers=self.providers) | |
self.face_swapper = INSwapper(model_path,sess_swap) | |
def prepare_faces(self, faces): | |
self.replacement_faces=[] | |
for face in faces: | |
#image1 = cv2.imread(face.origin) | |
if "origin" in face: | |
face_threshold = face['threshold'] | |
bboxes1, kpss1 = self.face_detector.autodetect(face['origin'], max_num=1) | |
if len(kpss1)<1: | |
raise Exception('No face detected on "Face to replace" image') | |
feat_original = self.rec_app.get(face['origin'], kpss1[0]) | |
else: | |
face_threshold = 0 | |
self.first_face = True | |
feat_original = None | |
print('No origin image: First face change') | |
#image2 = cv2.imread(face.destination) | |
_faces = self.__get_faces(face['destination'],max_num=1) | |
if len(_faces)<1: | |
raise Exception('No face detected on "Destination face" image') | |
self.replacement_faces.append((feat_original,_faces[0],face_threshold)) | |
def __convert_video(self,video_path,output_video_path): | |
if self.video_has_audio: | |
print("Merging audio with the refaced video...") | |
new_path = output_video_path + str(random.randint(0,999)) + "_c.mp4" | |
#stream = ffmpeg.input(output_video_path) | |
in1 = ffmpeg.input(output_video_path) | |
in2 = ffmpeg.input(video_path) | |
out = ffmpeg.output(in1.video, in2.audio, new_path,video_bitrate=self.ffmpeg_video_bitrate,vcodec=self.ffmpeg_video_encoder) | |
out.run(overwrite_output=True,quiet=True) | |
else: | |
new_path = output_video_path | |
print("The video doesn't have audio, so post-processing is not necessary") | |
print(f"The process has finished.\nThe refaced video can be found at {os.path.abspath(new_path)}") | |
return new_path | |
def __get_faces(self,frame,max_num=0): | |
bboxes, kpss = self.face_detector.detect(frame,max_num=max_num,metric='default') | |
if bboxes.shape[0] == 0: | |
return [] | |
ret = [] | |
for i in range(bboxes.shape[0]): | |
bbox = bboxes[i, 0:4] | |
det_score = bboxes[i, 4] | |
kps = None | |
if kpss is not None: | |
kps = kpss[i] | |
face = Face(bbox=bbox, kps=kps, det_score=det_score) | |
face.embedding = self.rec_app.get(frame, kps) | |
ret.append(face) | |
return ret | |
def process_first_face(self,frame): | |
faces = self.__get_faces(frame,max_num=1) | |
if len(faces) != 0: | |
frame = self.face_swapper.get(frame, faces[0], self.replacement_faces[0][1], paste_back=True) | |
return frame | |
def process_faces(self,frame): | |
faces = self.__get_faces(frame,max_num=0) | |
for rep_face in self.replacement_faces: | |
for i in range(len(faces) - 1, -1, -1): | |
sim = self.rec_app.compute_sim(rep_face[0], faces[i].embedding) | |
if sim>=rep_face[2]: | |
frame = self.face_swapper.get(frame, faces[i], rep_face[1], paste_back=True) | |
del faces[i] | |
break | |
return frame | |
def __check_video_has_audio(self,video_path): | |
self.video_has_audio = False | |
probe = ffmpeg.probe(video_path) | |
audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None) | |
if audio_stream is not None: | |
self.video_has_audio = True | |
def reface_group(self, faces, frames, output): | |
with ThreadPoolExecutor(max_workers = self.use_num_cpus) as executor: | |
if self.first_face: | |
results = list(tqdm(executor.map(self.process_first_face, frames), total=len(frames),desc="Processing frames")) | |
else: | |
results = list(tqdm(executor.map(self.process_faces, frames), total=len(frames),desc="Processing frames")) | |
for result in results: | |
output.write(result) | |
def reface(self, video_path, faces): | |
self.__check_video_has_audio(video_path) | |
output_video_path = os.path.join('out',Path(video_path).name) | |
self.prepare_faces(faces) | |
cap = cv2.VideoCapture(video_path) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
print(f"Total frames: {total_frames}") | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
output = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height)) | |
frames=[] | |
self.k = 1 | |
with tqdm(total=total_frames,desc="Extracting frames") as pbar: | |
while cap.isOpened(): | |
flag, frame = cap.read() | |
if flag and len(frame)>0: | |
frames.append(frame.copy()) | |
pbar.update() | |
else: | |
break | |
if (len(frames) > 1000): | |
self.reface_group(faces,frames,output) | |
frames=[] | |
cap.release() | |
pbar.close() | |
self.reface_group(faces,frames,output) | |
frames=[] | |
output.release() | |
return self.__convert_video(video_path,output_video_path) | |
def __try_ffmpeg_encoder(self, vcodec): | |
print(f"Trying FFMPEG {vcodec} encoder") | |
command = ['ffmpeg', '-y', '-f','lavfi','-i','testsrc=duration=1:size=1280x720:rate=30','-vcodec',vcodec,'testsrc.mp4'] | |
try: | |
subprocess.run(command, check=True, capture_output=True).stderr | |
except subprocess.CalledProcessError as e: | |
print(f"FFMPEG {vcodec} encoder doesn't work -> Disabled.") | |
return False | |
print(f"FFMPEG {vcodec} encoder works") | |
return True | |
def __check_encoders(self): | |
self.ffmpeg_video_encoder='libx264' | |
self.ffmpeg_video_bitrate='0' | |
pattern = r"encoders: ([a-zA-Z0-9_]+(?: [a-zA-Z0-9_]+)*)" | |
command = ['ffmpeg', '-codecs', '--list-encoders'] | |
commandout = subprocess.run(command, check=True, capture_output=True).stdout | |
result = commandout.decode('utf-8').split('\n') | |
for r in result: | |
if "264" in r: | |
encoders = re.search(pattern, r).group(1).split(' ') | |
for v_c in Refacer.VIDEO_CODECS: | |
for v_k in encoders: | |
if v_c == v_k: | |
if self.__try_ffmpeg_encoder(v_k): | |
self.ffmpeg_video_encoder=v_k | |
self.ffmpeg_video_bitrate=Refacer.VIDEO_CODECS[v_k] | |
print(f"Video codec for FFMPEG: {self.ffmpeg_video_encoder}") | |
return | |
VIDEO_CODECS = { | |
'h264_videotoolbox':'0', #osx HW acceleration | |
'h264_nvenc':'0', #NVIDIA HW acceleration | |
#'h264_qsv', #Intel HW acceleration | |
#'h264_vaapi', #Intel HW acceleration | |
#'h264_omx', #HW acceleration | |
'libx264':'0' #No HW acceleration | |
} | |