import tempfile import gradio as gr import subprocess import os, stat import uuid from googletrans import Translator from TTS.api import TTS import ffmpeg from faster_whisper import WhisperModel from scipy.signal import wiener import soundfile as sf from pydub import AudioSegment import numpy as np import librosa from zipfile import ZipFile import shlex import cv2 import torch import torchvision from tqdm import tqdm from numba import jit from huggingface_hub import HfApi HF_TOKEN = os.environ.get("HF_TOKEN") os.environ["COQUI_TOS_AGREED"] = "1" api = HfApi(token=HF_TOKEN) repo_id = "artificialguybr/video-dubbing" ZipFile("ffmpeg.zip").extractall() st = os.stat('ffmpeg') os.chmod('ffmpeg', st.st_mode | stat.S_IEXEC) #Whisper model_size = "small" model = WhisperModel(model_size, device="cuda", compute_type="float16") def check_for_faces(video_path): face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') cap = cv2.VideoCapture(video_path) while True: ret, frame = cap.read() if not ret: break gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = face_cascade.detectMultiScale(gray, 1.1, 4) if len(faces) > 0: return True return False def process_video(radio, video, target_language, has_closeup_face): if target_language is None: return gr.Error("Please select a Target Language for Dubbing.") run_uuid = uuid.uuid4().hex[:6] output_filename = f"{run_uuid}_resized_video.mp4" ffmpeg.input(video).output(output_filename, vf='scale=-2:720').run() video_path = output_filename if not os.path.exists(video_path): return f"Error: {video_path} does not exist." # Move the duration check here video_info = ffmpeg.probe(video_path) video_duration = float(video_info['streams'][0]['duration']) if video_duration > 60: os.remove(video_path) # Delete the resized video return gr.Error("Video duration exceeds 1 minute. Please upload a shorter video.") ffmpeg.input(video_path).output(f"{run_uuid}_output_audio.wav", acodec='pcm_s24le', ar=48000, map='a').run() #y, sr = sf.read(f"{run_uuid}_output_audio.wav") #y = y.astype(np.float32) #y_denoised = wiener(y) #sf.write(f"{run_uuid}_output_audio_denoised.wav", y_denoised, sr) #sound = AudioSegment.from_file(f"{run_uuid}_output_audio_denoised.wav", format="wav") #sound = sound.apply_gain(0) #sound = sound.low_pass_filter(3000).high_pass_filter(100) #sound.export(f"{run_uuid}_output_audio_processed.wav", format="wav") shell_command = f"ffmpeg -y -i {run_uuid}_output_audio.wav -af lowpass=3000,highpass=100 {run_uuid}_output_audio_final.wav".split(" ") subprocess.run([item for item in shell_command], capture_output=False, text=True, check=True) print("Attempting to transcribe with Whisper...") try: segments, info = model.transcribe(f"{run_uuid}_output_audio_final.wav", beam_size=5) whisper_text = " ".join(segment.text for segment in segments) whisper_language = info.language print(f"Transcription successful: {whisper_text}") except RuntimeError as e: print(f"RuntimeError encountered: {str(e)}") if "CUDA failed with error device-side assert triggered" in str(e): gr.Warning("Error. Space need to restart. Please retry in a minute") # Restart the script api.restart_space(repo_id=repo_id) language_mapping = {'English': 'en', 'Spanish': 'es', 'French': 'fr', 'German': 'de', 'Italian': 'it', 'Portuguese': 'pt', 'Polish': 'pl', 'Turkish': 'tr', 'Russian': 'ru', 'Dutch': 'nl', 'Czech': 'cs', 'Arabic': 'ar', 'Chinese (Simplified)': 'zh-cn'} target_language_code = language_mapping[target_language] translator = Translator() translated_text = translator.translate(whisper_text, src=whisper_language, dest=target_language_code).text print(translated_text) tts = TTS("tts_models/multilingual/multi-dataset/xtts_v1.1") tts.to('cuda') tts.tts_to_file(translated_text, speaker_wav=f"{run_uuid}_output_audio_final.wav", file_path=f"{run_uuid}_output_synth.wav", language=target_language_code) pad_top = 0 pad_bottom = 15 pad_left = 0 pad_right = 0 rescaleFactor = 1 video_path_fix = video_path if has_closeup_face: has_face = True else: has_face = check_for_faces(video_path) if has_closeup_face: try: cmd = f"python Wav2Lip/inference.py --checkpoint_path 'Wav2Lip/checkpoints/wav2lip_gan.pth' --face {shlex.quote(video_path)} --audio '{run_uuid}_output_synth.wav' --pads {pad_top} {pad_bottom} {pad_left} {pad_right} --resize_factor {rescaleFactor} --nosmooth --outfile '{run_uuid}_output_video.mp4'" subprocess.run(cmd, shell=True, check=True) except subprocess.CalledProcessError as e: if "Face not detected! Ensure the video contains a face in all the frames." in str(e.stderr): # Fallback to FFmpeg merge gr.Warning("Wav2lip didn't detect a face. Please try again with the option disabled.") cmd = f"ffmpeg -i {video_path} -i {run_uuid}_output_synth.wav -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {run_uuid}_output_video.mp4" subprocess.run(cmd, shell=True) else: # Merge audio with the original video without running Wav2Lip cmd = f"ffmpeg -i {video_path} -i {run_uuid}_output_synth.wav -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {run_uuid}_output_video.mp4" subprocess.run(cmd, shell=True) if not os.path.exists(f"{run_uuid}_output_video.mp4"): raise FileNotFoundError(f"Error: {run_uuid}_output_video.mp4 was not generated.") output_video_path = f"{run_uuid}_output_video.mp4" # Cleanup: Delete all generated files except the final output video files_to_delete = [ f"{run_uuid}_resized_video.mp4", f"{run_uuid}_output_audio.wav", f"{run_uuid}_output_audio_final.wav", f"{run_uuid}_output_synth.wav" ] for file in files_to_delete: try: os.remove(file) except FileNotFoundError: print(f"File {file} not found for deletion.") return output_video_path def swap(radio): if(radio == "Upload"): return gr.update(source="upload") else: return gr.update(source="webcam") video = gr.Video() radio = gr.Radio(["Upload", "Record"], value="Upload", show_label=False) iface = gr.Interface( fn=process_video, inputs=[ radio, video, gr.Dropdown(choices=["English", "Spanish", "French", "German", "Italian", "Portuguese", "Polish", "Turkish", "Russian", "Dutch", "Czech", "Arabic", "Chinese (Simplified)"], label="Target Language for Dubbing", value="Spanish"), gr.Checkbox( label="Включить если на видео имеет крупный план лица", value=False, info="Включить если на видео имеет крупный план лица") ], outputs=gr.Video(), live=False, title="AI Video Dubbing", description="""Данный инструмент разработан для @work_control""", allow_flagging=False ) with gr.Blocks() as demo: iface.render() radio.change(swap, inputs=[radio], outputs=video) gr.Markdown(""" **Примечание:** - Ограничение на видео составляет 1 минуту. Оно будет озвучивать всех людей, используя только один голос. - Создание может занять до 5 минут. - Если вы неправильно отметите флажок "Видео с крупным планом лица", озвучивание может не работать ожидаемым образом. """) demo.queue(concurrency_count=1, max_size=15) demo.launch()