import os

import cv2
import numpy as np
from moviepy.editor import VideoFileClip

from .pose import PoseEstimator, PoseTransfer
from .face_enhance import GFPGAN
from .super_resolution import BSRGAN
from .face_det import FaceAnalysis


class PoseSwapper:
    def __init__(self,
                 pose_estimator_name='openpose_body',
                 pose_estimator_model_dir='weights/models',
                 pose_transfer_name='pose_transfer',
                 pose_transfer_model_dir='weights/models',
                 image_sr_model='bsrgan',
                 image_sr_model_dir='weights/models',
                 face_enhance_name='gfpgan',
                 face_enhance_model_dir='weights/models/',
                 face_det_model='buffalo_l',
                 face_det_model_dir='weights/models',
                 log_iters=10,
                 use_enhancer=True,
                 use_sr=True,
                 scale=1):
        # Pose estimation and pose transfer are always required.
        pose_estimator = PoseEstimator(name=pose_estimator_name,
                                       root=pose_estimator_model_dir)
        self.pose_transfer = PoseTransfer(name=pose_transfer_name,
                                          root=pose_transfer_model_dir,
                                          pose_estimator=pose_estimator)

        # Optional face enhancement: detect the face, then restore it with GFPGAN.
        if use_enhancer:
            self.det_model = FaceAnalysis(name=face_det_model,
                                          root=face_det_model_dir)
            self.det_model.prepare(ctx_id=1, det_size=(640, 640))
            self.face_enhance = GFPGAN(name=face_enhance_name,
                                       root=face_enhance_model_dir)
        self.use_enhancer = use_enhancer

        # Optional super-resolution of the transferred frames.
        if use_sr:
            self.sr_model = BSRGAN(name=image_sr_model,
                                   root=image_sr_model_dir,
                                   scale=scale)
            self.scale = scale
        else:
            self.scale = 1
        self.use_sr = use_sr
        self.log_iters = log_iters
    def run(self, input_path, target_path, output_dir='output'):
        assert os.path.exists(
            input_path), 'The input path {} does not exist.'.format(input_path)
        assert os.path.exists(
            target_path), 'The target path {} does not exist.'.format(
                target_path)
        os.makedirs(output_dir, exist_ok=True)
        assert input_path.lower().endswith(
            ('jpg', 'jpeg', 'webp', 'png', 'bmp')
        ), "The pose swapper input must be an image ending with ('jpg', 'jpeg', 'webp', 'png', 'bmp'), but got {}.".format(
            input_path)
        # An image target yields a single image; any other target is treated as a video.
        if target_path.lower().endswith(('jpg', 'jpeg', 'webp', 'png', 'bmp')):
            return self.transfer_image(input_path, target_path, output_dir)
        else:
            return self.transfer_video(input_path, target_path, output_dir)
    def transfer_image(self, input_path, target_path, output_dir):
        # Transfer the pose of the target image onto the source person and
        # save the result under the source image's file name.
        source = cv2.imread(input_path)
        target = cv2.imread(target_path)
        transferred_image = self.transfer_pose(source,
                                               target,
                                               image_format='bgr')
        base_name = os.path.basename(input_path)
        output_path = os.path.join(output_dir, base_name)
        cv2.imwrite(output_path, transferred_image)
        return output_path
    def transfer_video(self, input_path, target_path, output_dir):
        source = cv2.imread(input_path)
        video = cv2.VideoCapture(target_path)
        fps = video.get(cv2.CAP_PROP_FPS)
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        # Output frames keep the source image resolution, optionally upscaled
        # by the super-resolution factor.
        height, width, _ = source.shape
        frame_size = (width, height)
        print('video fps: {}, total_frames: {}, width: {}, height: {}'.format(
            fps, total_frames, width, height))

        video_name = os.path.splitext(os.path.basename(input_path))[0]
        four_cc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
        temp_video_path = os.path.join(output_dir,
                                       'temp_{}.mp4'.format(video_name))
        save_video_path = os.path.join(output_dir, '{}.mp4'.format(video_name))
        output_video = cv2.VideoWriter(
            temp_video_path, four_cc, fps,
            (int(frame_size[0] * self.scale), int(frame_size[1] * self.scale)))

        # Transfer the pose of every frame in the target video onto the source image.
        i = 0
        while video.isOpened():
            ret, frame = video.read()
            if ret:
                transferred_image = self.transfer_pose(source,
                                                       frame,
                                                       image_format='bgr')
                i += 1
                if i % self.log_iters == 0:
                    print('processing {}/{}'.format(i, total_frames))
                output_video.write(transferred_image)
            else:
                break
        video.release()
        output_video.release()

        # Copy the audio track of the target video onto the generated frames,
        # then remove the silent temporary file.
        self.add_audio_to_video(target_path, temp_video_path, save_video_path)
        os.remove(temp_video_path)
        return save_video_path
    def transfer_pose(self, source, target, image_format='bgr'):
        # Core pipeline: pose transfer, then optional face enhancement, then
        # optional super-resolution.
        transferred_image = self.pose_transfer.get(source,
                                                   target,
                                                   image_format=image_format)
        if self.use_enhancer:
            # Restore only the most prominent detected face.
            faces = self.det_model.get(transferred_image, max_num=1)
            for face in faces:
                transferred_image = self.face_enhance.get(
                    transferred_image,
                    face,
                    paste_back=True,
                    image_format=image_format)
        if self.use_sr:
            transferred_image = self.sr_model.get(transferred_image,
                                                  image_format=image_format)
        return transferred_image
    def add_audio_to_video(self, src_video_path, target_video_path,
                           save_video_path):
        # Mux the audio track of the source video with the (silent) generated
        # video and write the result to save_video_path.
        audio = VideoFileClip(src_video_path).audio
        target_video = VideoFileClip(target_video_path)
        target_video = target_video.set_audio(audio)
        target_video.write_videofile(save_video_path)
        return save_video_path
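
if __name__ == '__main__':
    # Minimal usage sketch, assuming this module is run inside its package
    # (the relative imports above require `python -m <package>.<module>`) and
    # that the default checkpoints already exist under weights/models.
    # The input/target paths below are placeholders.
    swapper = PoseSwapper(use_enhancer=True, use_sr=False)

    # Image target: returns the path of the single transferred image.
    print(swapper.run('examples/person.jpg', 'examples/pose.jpg', output_dir='output'))

    # Video target: returns the path of an .mp4 whose audio is copied from the target.
    print(swapper.run('examples/person.jpg', 'examples/dance.mp4', output_dir='output'))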