import gradio as gr
import torch
import ffmpeg
import json
import os
import uuid
from googletrans import Translator
import whisperx
import spaces
from scipy.io import wavfile
import numpy as np
import gc
import tempfile
import soundfile as sf
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor

# Load Google language codes (display name -> ISO code) used by the dropdown and translator
with open('google_lang_codes.json', 'r') as f:
    google_lang_codes = json.load(f)

translator = Translator()


def ffmpeg_read(input_data_bytes, sampling_rate):
    """Decode raw audio bytes to a 16-bit PCM numpy array at the given sample rate via an ffmpeg pipe."""
    process = (
        ffmpeg.input('pipe:0')
        .output('pipe:1', format='wav', acodec='pcm_s16le', ar=sampling_rate)
        .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
    )
    out, _ = process.communicate(input=input_data_bytes)
    audio_array = np.frombuffer(out, np.int16)
    return audio_array


def load_whisper_model(device, compute_type):
    return whisperx.load_model("large-v3", device, compute_type=compute_type)


def load_align_model(language_code, device):
    return whisperx.load_align_model(language_code=language_code, device=device, model_name="WAV2VEC2_ASR_LARGE_LV60K_960H")


@spaces.GPU
def transcribe_and_align(inputs, language_code, whisper_model, align_model, align_metadata):
    print("Starting transcribe_and_align")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    batch_size = 16  # adjust based on your memory constraints

    # Write the in-memory audio to a temporary WAV file so WhisperX can load it
    audio_data = inputs["array"].astype(np.int16)
    audio_bytes = BytesIO()
    sf.write(audio_bytes, audio_data, inputs["sampling_rate"], format='wav')
    audio_bytes.seek(0)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
        tmp_file.write(audio_bytes.read())
        audio_path = tmp_file.name

    # Transcribe with Whisper
    audio = whisperx.load_audio(audio_path, inputs["sampling_rate"])
    result = whisper_model.transcribe(audio, batch_size=batch_size)

    print("Segments before alignment:")
    for segment in result["segments"]:
        print(f"{segment['start']} - {segment['end']}: {segment['text']}")

    # Align Whisper output to get word-level timestamps
    result = whisperx.align(result["segments"], align_model, align_metadata, audio, device, return_char_alignments=False)

    print("Segments after alignment:")
    for segment in result["segments"]:
        print(f"{segment['start']} - {segment['end']}: {segment['text']}")

    os.remove(audio_path)
    gc.collect()
    torch.cuda.empty_cache()

    return {"aligned": result["segments"], "word_segments": result["word_segments"]}


def translate_text(text, target_language_code):
    translated_text = translator.translate(text.strip(), dest=target_language_code).text
    return translated_text


@spaces.GPU
def process_video(Video, target_language, translate_video):
    print("Starting process_video")
    current_path = os.getcwd()
    common_uuid = uuid.uuid4()
    audio_file = f"{common_uuid}.wav"
    print(f"UUID: {common_uuid}")

    # Extract the audio track from the uploaded video
    try:
        print("Extracting audio from video")
        ffmpeg.input(Video).output(audio_file).run()
    except ffmpeg.Error as e:
        print(f"An error occurred while extracting audio: {e.stderr.decode()}")
        return

    transcript_file = f"{current_path}/{common_uuid}.srt"
    print(f"Transcript file: {transcript_file}")

    target_language_code = google_lang_codes.get(target_language, "en")
    print(f"Target language code: {target_language_code}")

    print("Starting transcription and alignment with WhisperX")
    with open(audio_file, "rb") as f:
        audio_bytes = f.read()
    inputs = {"array": ffmpeg_read(audio_bytes, 16000), "sampling_rate": 16000}

    device = "cuda" if torch.cuda.is_available() else "cpu"
    compute_type = "float16" if torch.cuda.is_available() else "int8"
    whisper_model = load_whisper_model(device, compute_type)
    align_model, align_metadata = load_align_model(target_language_code, device)

    transcription_result = transcribe_and_align(inputs, target_language_code, whisper_model, align_model, align_metadata)

    if "aligned" not in transcription_result:
        print("Error: Transcription result does not contain 'aligned'")
        return

    aligned_segments = transcription_result["aligned"]
    word_segments = transcription_result["word_segments"]

    print("Printing aligned segments for debugging:")
    for segment in aligned_segments:
        print(f"Segment start: {segment['start']}, end: {segment['end']}, text: {segment['text']}")

    def format_timestamp(seconds):
        """Convert seconds to an SRT timestamp (HH:MM:SS,mmm)."""
        millis = int((seconds - int(seconds)) * 1000)
        hours, remainder = divmod(int(seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"

    # Write the aligned segments as an SRT subtitle file
    with open(transcript_file, "w+", encoding="utf-8") as f:
        counter = 1
        for segment in aligned_segments:
            start_time = format_timestamp(segment['start'])
            end_time = format_timestamp(segment['end'])
            f.write(f"{counter}\n")
            f.write(f"{start_time} --> {end_time}\n")
            f.write(f"{segment['text'].strip()}\n\n")
            counter += 1

    # Optionally translate the subtitle text lines, leaving indices and timestamps untouched
    if translate_video:
        translated_lines = []
        with open(transcript_file, "r+", encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                if line.strip().isnumeric() or "-->" in line:
                    translated_lines.append(line)
                elif line.strip() != "":
                    translated_text = translate_text(line, target_language_code)
                    translated_lines.append(translated_text + "\n")
                else:
                    translated_lines.append("\n")
            f.seek(0)
            f.truncate()
            f.writelines(translated_lines)

    # Burn the subtitles into the video with FFmpeg
    output_video = f"{common_uuid}_output_video.mp4"
    print("Embedding subtitles with FFmpeg")
    try:
        if target_language_code == 'ja':
            subtitle_style = "FontName=Noto Sans CJK JP,PrimaryColour=&H00FFFF,OutlineColour=&H000000,BackColour=&H80000000,BorderStyle=3,Outline=2,Shadow=1"
        else:
            subtitle_style = "FontName=Arial Unicode MS,PrimaryColour=&H00FFFF,OutlineColour=&H000000,BackColour=&H80000000,BorderStyle=3,Outline=2,Shadow=1"
        ffmpeg.input(Video).output(output_video, vf=f"subtitles={transcript_file}:force_style='{subtitle_style}'").run()
        print("FFmpeg executed successfully.")
    except ffmpeg.Error as e:
        print(f"An error occurred while embedding subtitles: {e.stderr.decode()}")

    # Clean up intermediate files
    os.unlink(audio_file)
    os.unlink(transcript_file)

    return output_video


iface = gr.Interface(
    fn=process_video,
    inputs=[
        gr.Video(),
        gr.Dropdown(choices=list(google_lang_codes.keys()), label="Target Language for Translation", value="English"),
        gr.Checkbox(label="Translate Video", value=True, info="Check to translate the video to the selected language. Uncheck for transcription only."),
    ],
    outputs=[
        gr.Video(),
    ],
    live=False,
    title="VIDEO TRANSCRIPTION AND TRANSLATION",
    description="""This tool was developed by [@artificialguybr](https://twitter.com/artificialguybr) using entirely open-source tools. Special thanks to Hugging Face for the GPU support. Test the [Video Dubbing](https://huggingface.co/spaces/artificialguybr/video-dubbing) space!""",
    allow_flagging="never"
)

with gr.Blocks() as demo:
    iface.render()
    gr.Markdown("""
    **Note:**
    - Video limit is 15 minutes. It will perform transcription and translate subtitles.
    - The tool uses open-source models for all tasks. It's an alpha version.
    """)

demo.launch(max_threads=15)