import torch
import os
from concurrent.futures import ThreadPoolExecutor
from pydub import AudioSegment
import cv2

# Keep OpenCV single-threaded and disable its OpenCL backend for this process.
cv2.setNumThreads(0)
cv2.ocl.setUseOpenCL(False)
from pathlib import Path
import subprocess
import av
import imageio
import numpy as np
from rich.progress import track
from tqdm import tqdm
import stf_alternative
import os.path as osp
import shutil
import zipfile


def exec_cmd(cmd):
    """Run an external command and raise if it exits with a non-zero status.

    Args:
        cmd: Either a command string, which is executed through the shell
            (the original behaviour), or a sequence of arguments, which is
            executed directly without a shell so no quoting is required.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero. The
            merged stdout/stderr is captured and attached to the exception.
    """
    subprocess.run(
        cmd,
        # Only plain strings need shell parsing; argument lists bypass the shell.
        shell=isinstance(cmd, str),
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )


def images2video(images, wfp, **kwargs):
    """Encode a sequence of frames into a video file with imageio.

    Args:
        images: Sized, indexable collection of frames; each frame is sliced
            with ``[..., ::-1]`` when ``image_mode`` is ``"bgr"``.
        wfp: Output video path.
        **kwargs: Optional encoder settings:
            fps (default 24), format (default "mp4"), codec (default
            "libx264"), quality (default None), pixelformat (default
            "yuv420p"), image_mode ("rgb" or "bgr", default "rgb"),
            macro_block_size (default 2), crf (default 18).
    """
    fps = kwargs.get("fps", 24)
    video_format = kwargs.get("format", "mp4")
    codec = kwargs.get("codec", "libx264")
    quality = kwargs.get("quality")
    pixelformat = kwargs.get("pixelformat", "yuv420p")
    image_mode = kwargs.get("image_mode", "rgb")
    macro_block_size = kwargs.get("macro_block_size", 2)
    ffmpeg_params = ["-crf", str(kwargs.get("crf", 18))]

    writer = imageio.get_writer(
        wfp,
        fps=fps,
        format=video_format,
        codec=codec,
        quality=quality,
        ffmpeg_params=ffmpeg_params,
        pixelformat=pixelformat,
        macro_block_size=macro_block_size,
    )

    # The channel order is constant for the whole call, so decide it once.
    reverse_channels = image_mode.lower() == "bgr"
    try:
        for frame in track(images, description="writing", transient=True):
            # Reverse the last axis for "bgr" input before handing it to the writer.
            writer.append_data(frame[..., ::-1] if reverse_channels else frame)
    finally:
        # Always release the writer, even if a frame fails to encode.
        writer.close()

    print(f"Dump to {wfp}\n")


def merge_audio_video(video_fp, audio_fp, wfp):
    """Mux an audio file into a video file with ffmpeg.

    The video stream is copied and the audio is encoded as AAC. If either
    input file does not exist, a message is printed and nothing is written.

    Args:
        video_fp: Path of the input video.
        audio_fp: Path of the input audio.
        wfp: Path of the output file (overwritten if present, via ``-y``).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if not (osp.exists(video_fp) and osp.exists(audio_fp)):
        print(f"video_fp: {video_fp} or audio_fp: {audio_fp} not exists!")
        return

    # Pass an argument list instead of an interpolated shell string so that
    # paths containing spaces or shell metacharacters are handled literally.
    cmd = [
        "ffmpeg",
        "-i", str(video_fp),
        "-i", str(audio_fp),
        "-c:v", "copy",
        "-c:a", "aac",
        str(wfp),
        "-y",
    ]
    exec_cmd(cmd)
    print(f"merge {video_fp} and {audio_fp} to {wfp}")


class STFPipeline:
    """Render a video for an audio file using ``stf_alternative``.

    Construction copies the working directory, unpacks the preprocessed
    template archives, and records the paths needed later. The model itself
    is created in :meth:`execute`, not in ``__init__``.
    """

    def __init__(
        self,
        stf_path: str = "/home/user/app/stf/",
        device: str = "cuda:0",
        template_video_path: str = "templates/front_one_piece_dress_nodded_cut.webm",
        config_path: str = "front_config.json",
        checkpoint_path: str = "089.pth",
        root_path: str = "/tmp/works",
        female_video: bool = True,
    ):
        """Prepare the working directory and store configuration paths.

        Args:
            stf_path: Base directory joined with ``config_path`` and
                ``template_video_path``.
            device: Device string forwarded to ``stf_alternative.create_model``.
            template_video_path: Template video path, relative to ``stf_path``.
            config_path: Config file path, relative to ``stf_path``.
            checkpoint_path: Checkpoint file name, resolved under ``/tmp/stf/``.
            root_path: Working directory that holds the ``preprocess`` archives.
            female_video: Selects which pair of preprocess archives to unpack.
        """
        # NOTE(review): source and destination are hard-coded and ignore
        # `stf_path` / `root_path`; with the defaults they coincide. Confirm
        # before calling with a non-default `root_path`.
        shutil.copytree('/home/user/app/stf/works', '/tmp/works', dirs_exist_ok=True)

        if female_video:
            target_dir = os.path.join(root_path, 'preprocess/nasilhong_f_v1_front/')
            archive_names = (
                'crop_video_front_one_piece_dress_nodded_cut.zip',
                'front_one_piece_dress_nodded_cut.zip',
            )
        else:
            target_dir = os.path.join(root_path, 'preprocess/Ian_v3_front/')
            archive_names = (
                'crop_video_Cam2_2309071202_0012_Natural_Looped.zip',
                'Cam2_2309071202_0012_Natural_Looped.zip',
            )

        # Each archive is unpacked next to itself; `with` closes the handle.
        for archive_name in archive_names:
            with zipfile.ZipFile(os.path.join(target_dir, archive_name), 'r') as archive:
                archive.extractall(target_dir)

        self.config_path = os.path.join(stf_path, config_path)
        # NOTE(review): the checkpoint is looked up under /tmp/stf/, not stf_path.
        self.checkpoint_path = os.path.join('/tmp/stf/', checkpoint_path)
        self.work_root_path = os.path.join(root_path)
        self.device = device
        self.template_video_path = os.path.join(stf_path, template_video_path)

    def execute(self, audio: str):
        """Generate the video for ``audio`` and return the muxed file path.

        A fresh model and template are created on every call. Predicted
        frames are written to ``/tmp/dubbing/<audio stem>--lip.mp4`` and then
        merged with the audio into ``<audio stem>--lip_aud.mp4`` in the same
        directory.

        Args:
            audio: Path to the input audio file.

        Returns:
            The path of the audio-merged video. ``merge_audio_video`` only
            prints a message when an input is missing, so the returned path
            is not guaranteed to exist.
        """
        model = stf_alternative.create_model(
            config_path=self.config_path,
            checkpoint_path=self.checkpoint_path,
            work_root_path=self.work_root_path,
            device=self.device,
            wavlm_path="microsoft/wavlm-large",
        )
        self.template = stf_alternative.Template(
            model=model,
            config_path=self.config_path,
            template_video_path=self.template_video_path,
        )

        Path("/tmp/dubbing").mkdir(exist_ok=True)
        save_path = os.path.join("/tmp/dubbing", Path(audio).stem + "--lip.mp4")

        reader = iter(self.template._get_reader(num_skip_frames=0))
        audio_segment = AudioSegment.from_file(audio)
        pivot = 0
        results = []

        with ThreadPoolExecutor(1) as p:
            try:
                gen_infer = self.template.gen_infer_concurrent(
                    p,
                    audio_segment,
                    pivot,
                )
                for idx, (it, _chunk) in enumerate(gen_infer, pivot):
                    frame = next(reader)
                    # Return value is unused; the call is kept in case
                    # compose() has side effects -- TODO confirm.
                    self.template.compose(idx, frame, it)
                    results.append(it['pred'])
            except StopIteration:
                # next(reader) raises this once the reader is exhausted;
                # keep the frames produced so far.
                pass

        images2video(results, save_path)

        # Swap only the trailing extension: str.replace would also rewrite
        # any ".mp4" that happens to appear inside the audio file's stem.
        save_path_aud = save_path[: -len(".mp4")] + "_aud.mp4"
        merge_audio_video(save_path, audio, save_path_aud)
        return save_path_aud