import threading
import time
import openai
from pytube import YouTube
from os import getenv, getcwd
from pathlib import Path
from enum import Enum, auto
import logging
import subprocess
from src.srt_util.srt import SrtScript
from src.srt_util.srt2ass import srt2ass
# NOTE: this rebinds the name ``time`` to the function ``time.time``; it must
# stay after ``import time`` because the code below calls ``time()`` directly.
from time import time, strftime, gmtime, sleep
from src.translators.translation import get_translation, prompt_selector
import torch
import stable_whisper
import shutil

"""
Youtube link
- link
- model
- output type

Video file
- path
- model
- output type

Audio file
- path
- model
- output type
"""

"""
TaskID
Progress: Enum
Computing resource status
SRT_Script : SrtScript
- input module -> initialize (ASR module)
- Pre-process
- Translation (%)
- Post process (time stamp)
- Output module: SRT_Script --> output(.srt)
- (Optional) mp4
"""


def _run_ffmpeg(args):
    """Run an ffmpeg command line and log an error when it exits non-zero.

    The command is still treated as best-effort (no exception is raised on
    failure), but a failing exit code is no longer silently ignored.

    :param args: full argument list, starting with ``"ffmpeg"``.
    :return: the ``subprocess.CompletedProcess`` of the finished command.
    """
    completed = subprocess.run(args)
    if completed.returncode != 0:
        logging.error("ffmpeg exited with code %s: %s", completed.returncode, args)
    return completed


class TaskStatus(str, Enum):
    """Stage of the pipeline a task is currently in."""

    CREATED = 'CREATED'
    INITIALIZING_ASR = 'INITIALIZING_ASR'
    PRE_PROCESSING = 'PRE_PROCESSING'
    TRANSLATING = 'TRANSLATING'
    POST_PROCESSING = 'POST_PROCESSING'
    OUTPUT_MODULE = 'OUTPUT_MODULE'


class Task:
    """Base class of a subtitle translation task.

    A task runs five stages in order (see ``run_pipeline``): ASR, SRT
    pre-processing, translation, SRT post-processing and output rendering.
    Subclasses prepare ``audio_path`` / ``video_path`` in their ``run`` method
    and then call ``run_pipeline``.
    """

    @property
    def status(self):
        """Current ``TaskStatus``; reads are guarded by a lock."""
        with self.__status_lock:
            return self.__status

    @status.setter
    def status(self, new_status):
        with self.__status_lock:
            self.__status = new_status

    def __init__(self, task_id, task_local_dir, task_cfg):
        """Store the task configuration and log a summary of it.

        :param task_id: identifier used in log messages and output file names.
        :param task_local_dir: working directory of the task; the pipeline
            calls ``joinpath`` on it, so a ``pathlib.Path`` is expected.
        :param task_cfg: configuration mapping; the keys ``ASR``,
            ``translation``, ``output_type``, ``target_lang``, ``source_lang``,
            ``field``, ``pre_process`` and ``post_process`` are read here.
        """
        self.__status_lock = threading.Lock()
        self.__status = TaskStatus.CREATED
        self.gpu_status = 0
        openai.api_key = getenv("OPENAI_API_KEY")
        self.task_id = task_id
        self.task_local_dir = task_local_dir
        self.ASR_setting = task_cfg["ASR"]
        self.translation_setting = task_cfg["translation"]
        self.translation_model = self.translation_setting["model"]
        self.output_type = task_cfg["output_type"]
        self.target_lang = task_cfg["target_lang"]
        self.source_lang = task_cfg["source_lang"]
        self.field = task_cfg["field"]
        self.pre_setting = task_cfg["pre_process"]
        self.post_setting = task_cfg["post_process"]

        self.audio_path = None
        # Initialised here so output_render can always read it, even before a
        # subclass has assigned the real path.
        self.video_path = None
        self.SRT_Script = None
        self.result = None
        # (stage name, progress) pair maintained by update_translation_progress.
        self.progress = None
        # Legacy attribute kept for backward compatibility; the pipeline itself
        # records its start time in ``t_s``.
        self.s_t = None
        self.t_s = None
        self.t_e = None

        print(f"Task ID: {self.task_id}")
        logging.info(f"Task ID: {self.task_id}")
        logging.info(f"{self.source_lang} -> {self.target_lang} task in {self.field}")
        logging.info(f"Translation Model: {self.translation_model}")
        logging.info(f"subtitle_type: {self.output_type['subtitle']}")
        logging.info(f"video_ouput: {self.output_type['video']}")
        logging.info(f"bilingual_ouput: {self.output_type['bilingual']}")
        logging.info("Pre-process setting:")
        for key, value in self.pre_setting.items():
            logging.info(f"{key}: {value}")
        logging.info("Post-process setting:")
        for key, value in self.post_setting.items():
            logging.info(f"{key}: {value}")

    @staticmethod
    def fromYoutubeLink(youtube_url, task_id, task_dir, task_cfg):
        """Create a task whose media is downloaded from a YouTube link."""
        logging.info("Task Creation method: Youtube Link")
        return YoutubeTask(task_id, task_dir, task_cfg, youtube_url)

    @staticmethod
    def fromAudioFile(audio_path, task_id, task_dir, task_cfg):
        """Create a task from an existing audio file."""
        logging.info("Task Creation method: Audio File")
        return AudioTask(task_id, task_dir, task_cfg, audio_path)

    @staticmethod
    def fromVideoFile(video_path, task_id, task_dir, task_cfg):
        """Create a task from an existing video file."""
        logging.info("Task Creation method: Video File")
        return VideoTask(task_id, task_dir, task_cfg, video_path)

    # Module 1 ASR: audio --> SRT_script
    def get_srt_class(self):
        """Transcribe ``audio_path`` and build ``self.SRT_Script``.

        The ASR backend is chosen by ``ASR_setting["whisper_config"]["method"]``
        (``"api"`` or ``"stable"``). The resulting source subtitle is written to
        ``task_<id>_<source_lang>.srt`` in the task directory.

        :raises ValueError: if the configured ASR method is not supported.
        """
        # TODO: setup ASR module like translator
        self.status = TaskStatus.INITIALIZING_ASR
        self.t_s = time()

        whisper_cfg = self.ASR_setting["whisper_config"]
        method = whisper_cfg["method"]
        whisper_model = whisper_cfg["whisper_model"]
        src_srt_path = self.task_local_dir.joinpath(f"task_{self.task_id}_{self.source_lang}.srt")

        # NOTE(review): when the source .srt already exists nothing is loaded
        # and self.SRT_Script stays None, so the later stages will fail.
        # Loading the existing file needs an SrtScript API that is not visible
        # from this module - confirm and add.
        if not src_srt_path.exists():
            # extract script from audio
            logging.info("extract script from audio")

            if method == "api":
                # NOTE(review): response_format="srt" presumably returns plain
                # text, while transcript['segments'] is indexed below - verify
                # against the OpenAI audio API.
                with open(self.audio_path, 'rb') as audio_file:
                    transcript = openai.Audio.transcribe(model="whisper-1", file=audio_file, response_format="srt")
            elif method == "stable":
                device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
                model = stable_whisper.load_model(whisper_model, device)
                transcript = model.transcribe(str(self.audio_path), regroup=False,
                                              initial_prompt="Hello, welcome to my lecture. Are you good my friend?")
                (
                    transcript
                    .split_by_punctuation(['.', '。', '?'])
                    .merge_by_gap(.15, max_words=3)
                    .merge_by_punctuation([' '])
                    .split_by_punctuation(['.', '。', '?'])
                )
                transcript = transcript.to_dict()

                # after get the transcript, release the gpu resource
                torch.cuda.empty_cache()
            else:
                # Previously an unknown method surfaced as a NameError on
                # ``transcript``; fail with an explicit message instead.
                raise ValueError(f"Unsupported ASR method: {method!r}")

            self.SRT_Script = SrtScript(self.source_lang, self.target_lang, transcript['segments'], self.field)
            # save the srt script to local
            self.SRT_Script.write_srt_file_src(src_srt_path)

    # Module 2: SRT preprocess: perform preprocess steps
    def preprocess(self):
        """Run the pre-processing steps enabled in ``pre_setting``.

        The processed source subtitle is written to ``<task_id>_processed.srt``
        (and converted to ``.ass`` when that subtitle type is requested), and
        the source-only script is stored in ``self.script_input``.
        """
        self.status = TaskStatus.PRE_PROCESSING
        logging.info("--------------------Start Preprocessing SRT class--------------------")
        if self.pre_setting["sentence_form"]:
            self.SRT_Script.form_whole_sentence()
        if self.pre_setting["spell_check"]:
            self.SRT_Script.spell_check_term()
        if self.pre_setting["term_correct"]:
            self.SRT_Script.correct_with_force_term()
        processed_srt_path_src = str(Path(self.task_local_dir) / f'{self.task_id}_processed.srt')
        self.SRT_Script.write_srt_file_src(processed_srt_path_src)

        if self.output_type["subtitle"] == "ass":
            logging.info("write English .srt file to .ass")
            assSub_src = srt2ass(processed_srt_path_src, "default", "No", "Modest")
            logging.info('ASS subtitle saved as: ' + assSub_src)
        self.script_input = self.SRT_Script.get_source_only()

    def update_translation_progress(self, new_progress):
        """Record translation progress while the task is translating.

        ``self.progress`` is set to ``(TaskStatus.TRANSLATING.value,
        new_progress)``. Calls made while the task is in any other stage are
        ignored.

        :param new_progress: progress value reported by the translator.
        """
        if self.status == TaskStatus.TRANSLATING:
            self.progress = TaskStatus.TRANSLATING.value, new_progress

    # Module 3: perform srt translation
    def translation(self):
        """Translate ``self.SRT_Script`` with the configured translation model."""
        # Every other stage publishes its status; do the same here so that
        # observers and update_translation_progress can see the stage.
        self.status = TaskStatus.TRANSLATING
        logging.info("---------------------Start Translation--------------------")
        prompt = prompt_selector(self.source_lang, self.target_lang, self.field)
        get_translation(self.SRT_Script, self.translation_model, self.task_id, prompt,
                        self.translation_setting['chunk_size'])

    # Module 4: perform srt post process steps
    def postprocess(self):
        """Run the post-processing steps enabled in ``post_setting``."""
        self.status = TaskStatus.POST_PROCESSING

        logging.info("---------------------Start Post-processing SRT class---------------------")
        if self.post_setting["check_len_and_split"]:
            self.SRT_Script.check_len_and_split()
        if self.post_setting["remove_trans_punctuation"]:
            self.SRT_Script.remove_trans_punctuation()
        logging.info("---------------------Post-processing SRT class finished---------------------")

    # Module 5: output module
    def output_render(self):
        """Write the translated subtitle and, optionally, a subtitled video.

        :return: path of the final artefact - the ``.mp4`` when video output is
            requested and a video is available, otherwise the subtitle file.
        """
        self.status = TaskStatus.OUTPUT_MODULE
        video_out = self.output_type["video"]
        subtitle_type = self.output_type["subtitle"]
        is_bilingual = self.output_type["bilingual"]

        results_dir = f"{self.task_local_dir}/results"
        # Make sure the output directory exists before anything is written.
        Path(results_dir).mkdir(parents=True, exist_ok=True)

        subtitle_path = f"{results_dir}/{self.task_id}_{self.target_lang}.srt"
        self.SRT_Script.write_srt_file_translate(subtitle_path)
        if is_bilingual:
            subtitle_path = f"{results_dir}/{self.task_id}_{self.source_lang}_{self.target_lang}.srt"
            self.SRT_Script.write_srt_file_bilingual(subtitle_path)

        if subtitle_type == "ass":
            logging.info("write .srt file to .ass")
            subtitle_path = srt2ass(subtitle_path, "default", "No", "Modest")
            logging.info('ASS subtitle saved as: ' + subtitle_path)

        final_res = subtitle_path

        # encode to .mp4 video file
        if video_out and self.video_path is not None:
            logging.info("encoding video file")
            logging.info(f'ffmpeg comand: \nffmpeg -i {self.video_path} -vf "subtitles={subtitle_path}" {results_dir}/{self.task_id}.mp4')
            _run_ffmpeg(
                ["ffmpeg", "-i", self.video_path, "-vf", f"subtitles={subtitle_path}",
                 f"{results_dir}/{self.task_id}.mp4"])
            final_res = f"{results_dir}/{self.task_id}.mp4"

        self.t_e = time()
        # t_s is only set by get_srt_class; skip the duration when it is missing.
        if self.t_s is not None:
            logging.info(
                "Pipeline finished, time duration:{}".format(strftime("%H:%M:%S", gmtime(self.t_e - self.t_s))))
        return final_res

    def run_pipeline(self):
        """Run all five stages in order and store the output path in ``result``."""
        self.get_srt_class()
        self.preprocess()
        self.translation()
        self.postprocess()
        self.result = self.output_render()
        # print(self.result)


class YoutubeTask(Task):
    """Task whose video and audio are downloaded from a YouTube link."""

    def __init__(self, task_id, task_local_dir, task_cfg, youtube_url):
        super().__init__(task_id, task_local_dir, task_cfg)
        self.youtube_url = youtube_url

    def run(self):
        """Download video and audio into the task directory, then run the pipeline.

        :raises FileNotFoundError: if no progressive mp4 stream is found.
        """
        video_file = self.task_local_dir.joinpath(f"task_{self.task_id}.mp4")
        audio_file = self.task_local_dir.joinpath(f"task_{self.task_id}.mp3")

        yt = YouTube(self.youtube_url)
        video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()

        if video:
            video.download(str(self.task_local_dir), filename=f"task_{self.task_id}.mp4")
            logging.info(f'Video Name: {video.default_filename}')
        else:
            raise FileNotFoundError(f" Video stream not found for link {self.youtube_url}")

        audio = yt.streams.filter(only_audio=True).first()
        if audio:
            audio.download(str(self.task_local_dir), filename=f"task_{self.task_id}.mp3")
        else:
            # No audio-only stream: extract the audio track from the video.
            logging.info(" download audio failed, using ffmpeg to extract audio")
            _run_ffmpeg(
                ['ffmpeg', '-i', video_file, '-f', 'mp3', '-ab', '192000',
                 '-vn', audio_file])
            logging.info("audio extraction finished")

        self.video_path = video_file
        self.audio_path = audio_file

        logging.info(f" Video File Dir: {self.video_path}")
        logging.info(f" Audio File Dir: {self.audio_path}")
        logging.info(" Data Prep Complete. Start pipeline")

        super().run_pipeline()


class AudioTask(Task):
    """Task created from an existing audio file; it has no video."""

    def __init__(self, task_id, task_local_dir, task_cfg, audio_path):
        super().__init__(task_id, task_local_dir, task_cfg)
        # TODO: check audio format
        self.audio_path = audio_path
        self.video_path = None

    def run(self):
        """Log the input paths and run the pipeline."""
        logging.info(f"Video File Dir: {self.video_path}")
        logging.info(f"Audio File Dir: {self.audio_path}")
        logging.info("Data Prep Complete. Start pipeline")
        super().run_pipeline()


class VideoTask(Task):
    """Task created from an existing video file."""

    def __init__(self, task_id, task_local_dir, task_cfg, video_path):
        """Copy the input video into the task directory as ``task_<id>.mp4``."""
        super().__init__(task_id, task_local_dir, task_cfg)
        # TODO: check video format {.mp4}
        new_video_path = f"{task_local_dir}/task_{self.task_id}.mp4"
        print(new_video_path)
        logging.info(f"Copy video file to: {new_video_path}")
        shutil.copyfile(video_path, new_video_path)
        self.video_path = new_video_path

    def run(self):
        """Extract the audio track with ffmpeg, then run the pipeline."""
        audio_file = self.task_local_dir.joinpath(f"task_{self.task_id}.mp3")

        logging.info("using ffmpeg to extract audio")
        _run_ffmpeg(
            ['ffmpeg', '-i', self.video_path, '-f', 'mp3', '-ab', '192000',
             '-vn', audio_file])
        logging.info("audio extraction finished")

        self.audio_path = audio_file
        logging.info(f" Video File Dir: {self.video_path}")
        logging.info(f" Audio File Dir: {self.audio_path}")
        logging.info("Data Prep Complete. Start pipeline")
        super().run_pipeline()