# Spaces: Running on Zero
import torch | |
import os | |
from concurrent.futures import ThreadPoolExecutor | |
from pydub import AudioSegment | |
import cv2; cv2.setNumThreads(0); cv2.ocl.setUseOpenCL(False) | |
from pathlib import Path | |
import subprocess | |
from pathlib import Path | |
import av | |
import imageio | |
import numpy as np | |
from rich.progress import track | |
from tqdm import tqdm | |
import stf_alternative | |
import os.path as osp | |
import shutil | |
import zipfile | |
def exec_cmd(cmd):
    """Run ``cmd`` through the shell and raise if it exits non-zero.

    stdout and stderr are merged into a single captured stream instead of
    being echoed; the captured output is not returned to the caller.

    Raises:
        subprocess.CalledProcessError: if the command's exit status is not 0.
    """
    run_options = {
        "shell": True,
        "check": True,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.STDOUT,  # fold stderr into the captured stdout
    }
    subprocess.run(cmd, **run_options)
def images2video(images, wfp, **kwargs):
    """Encode a sequence of frames into a video file at ``wfp``.

    Args:
        images: frames to write, in order; each frame is indexed with
            ``[..., ::-1]`` when ``image_mode`` is ``"bgr"``, so frames are
            expected to be array-like with the channel axis last.
        wfp: destination path of the video file.
        **kwargs: optional encoder settings:
            fps (default 24), format (default "mp4"), codec (default
            "libx264"), quality (default None), pixelformat (default
            "yuv420p"), image_mode ("rgb" or "bgr", default "rgb"),
            macro_block_size (default 2), crf (default 18).
    """
    fps = kwargs.get("fps", 24)
    video_format = kwargs.get("format", "mp4")
    codec = kwargs.get("codec", "libx264")
    quality = kwargs.get("quality")
    pixelformat = kwargs.get("pixelformat", "yuv420p")
    image_mode = kwargs.get("image_mode", "rgb")
    macro_block_size = kwargs.get("macro_block_size", 2)
    ffmpeg_params = ["-crf", str(kwargs.get("crf", 18))]

    # Decide once whether the channel order has to be reversed per frame.
    flip_channels = image_mode.lower() == "bgr"

    writer = imageio.get_writer(
        wfp,
        fps=fps,
        format=video_format,
        codec=codec,
        quality=quality,
        ffmpeg_params=ffmpeg_params,
        pixelformat=pixelformat,
        macro_block_size=macro_block_size,
    )
    try:
        for frame in track(images, description="writing", transient=True):
            writer.append_data(frame[..., ::-1] if flip_channels else frame)
    finally:
        # Always release the encoder, even if a frame fails to be written.
        writer.close()

    print(f"Dump to {wfp}\n")
def merge_audio_video(video_fp, audio_fp, wfp):
    """Mux the audio track of ``audio_fp`` into ``video_fp`` using ffmpeg.

    The video stream is copied as-is and the audio is encoded as AAC; an
    existing ``wfp`` is overwritten (``-y``). If either input file is
    missing, a message is printed and nothing is written.

    Args:
        video_fp: path of the input video file.
        audio_fp: path of the input audio file.
        wfp: path of the merged output file.

    Raises:
        subprocess.CalledProcessError: if the ffmpeg command fails.
    """
    # Local import keeps this block self-contained.
    import shlex

    if not (osp.exists(video_fp) and osp.exists(audio_fp)):
        print(f"video_fp: {video_fp} or audio_fp: {audio_fp} not exists!")
        return

    # The command goes through a shell, so every path is quoted: file names
    # containing spaces or shell metacharacters would otherwise break (or
    # inject into) the command line.
    quoted_video = shlex.quote(str(video_fp))
    quoted_audio = shlex.quote(str(audio_fp))
    quoted_out = shlex.quote(str(wfp))
    cmd = (
        f"ffmpeg -i {quoted_video} -i {quoted_audio} "
        f"-c:v copy -c:a aac {quoted_out} -y"
    )
    exec_cmd(cmd)
    print(f"merge {video_fp} and {audio_fp} to {wfp}")
class STFPipeline:
    """Generate a lip-synced video for an audio file with ``stf_alternative``.

    Construction stages the preprocessed template data into a writable work
    directory; ``execute`` builds the model, runs inference over the audio
    and writes the resulting video with the audio track merged in.
    """

    # Preprocessed template archives inside the work tree, keyed by the
    # ``female_video`` flag: (sub-directory, archive file names).
    _PREPROCESS_ARCHIVES = {
        True: (
            "preprocess/nasilhong_f_v1_front/",
            (
                "crop_video_front_one_piece_dress_nodded_cut.zip",
                "front_one_piece_dress_nodded_cut.zip",
            ),
        ),
        False: (
            "preprocess/Ian_v3_front/",
            (
                "crop_video_Cam2_2309071202_0012_Natural_Looped.zip",
                "Cam2_2309071202_0012_Natural_Looped.zip",
            ),
        ),
    }

    def __init__(
        self,
        stf_path: str = "/home/user/app/stf/",
        device: str = "cuda:0",
        template_video_path: str = "templates/front_one_piece_dress_nodded_cut.webm",
        config_path: str = "front_config.json",
        checkpoint_path: str = "089.pth",
        root_path: str = "/tmp/works",
        female_video: bool = True,
    ):
        """Stage the work directory and record the paths used by ``execute``.

        Args:
            stf_path: directory holding the config file, the template video
                and the ``works`` tree that is copied into ``root_path``.
            device: device string handed to ``stf_alternative.create_model``.
            template_video_path: template video path, relative to ``stf_path``.
            config_path: config file path, relative to ``stf_path``.
            checkpoint_path: checkpoint file name, looked up under
                ``/tmp/stf/``.
            root_path: writable work directory that receives the copied
                ``works`` tree and the extracted archives.
            female_video: selects which set of preprocessed archives is
                extracted.
        """
        # Copy the shipped work tree to the directory the archives are read
        # from below. With the default arguments this is
        # /home/user/app/stf/works -> /tmp/works.
        shutil.copytree(
            os.path.join(stf_path, "works"), root_path, dirs_exist_ok=True
        )

        subdir, archive_names = self._PREPROCESS_ARCHIVES[bool(female_video)]
        dir_target = os.path.join(root_path, subdir)
        for archive_name in archive_names:
            # Context manager closes the archive handle after extraction.
            with zipfile.ZipFile(os.path.join(dir_target, archive_name), "r") as zf:
                zf.extractall(dir_target)

        self.config_path = os.path.join(stf_path, config_path)
        # The checkpoint is read from /tmp/stf/, not from stf_path.
        self.checkpoint_path = os.path.join("/tmp/stf/", checkpoint_path)
        self.work_root_path = os.fspath(root_path)
        self.device = device
        self.template_video_path = os.path.join(stf_path, template_video_path)

    def execute(self, audio: str):
        """Run lip-sync inference for ``audio`` and return the output path.

        The model and template are created on every call. Predicted frames
        are written to ``/tmp/dubbing/<audio stem>--lip.mp4`` and then merged
        with the audio into ``/tmp/dubbing/<audio stem>--lip_aud.mp4``.

        Args:
            audio: path of the input audio file.

        Returns:
            Path of the video file with the audio track merged in.
        """
        model = stf_alternative.create_model(
            config_path=self.config_path,
            checkpoint_path=self.checkpoint_path,
            work_root_path=self.work_root_path,
            device=self.device,
            wavlm_path="microsoft/wavlm-large",
        )
        self.template = stf_alternative.Template(
            model=model,
            config_path=self.config_path,
            template_video_path=self.template_video_path,
        )

        out_dir = "/tmp/dubbing"
        Path(out_dir).mkdir(exist_ok=True)
        stem = Path(audio).stem
        save_path = os.path.join(out_dir, stem + "--lip.mp4")
        # Built from the stem rather than str.replace('.mp4', ...), which
        # would also rewrite any '.mp4' occurring inside the audio file name.
        save_path_aud = os.path.join(out_dir, stem + "--lip_aud.mp4")

        reader = iter(self.template._get_reader(num_skip_frames=0))
        audio_segment = AudioSegment.from_file(audio)
        pivot = 0
        results = []

        with ThreadPoolExecutor(1) as p:
            try:
                gen_infer = self.template.gen_infer_concurrent(
                    p,
                    audio_segment,
                    pivot,
                )
                for idx, (it, _chunk) in enumerate(gen_infer, pivot):
                    frame = next(reader)
                    # NOTE(review): the composed result is not used, but the
                    # call is kept in case compose() has side effects - confirm.
                    self.template.compose(idx, frame, it)
                    results.append(it["pred"])
            except StopIteration:
                # Template frames ran out before the audio did; keep what
                # has been generated so far.
                pass

        images2video(results, save_path)
        merge_audio_video(save_path, audio, save_path_aud)
        return save_path_aud