import datetime import os import re import shutil from tqdm import tqdm from shortGPT.audio.audio_duration import get_asset_duration from shortGPT.audio.audio_utils import (audioToText, get_asset_duration, run_background_audio_split, speedUpAudio) from shortGPT.audio.eleven_voice_module import VoiceModule from shortGPT.config.languages import ACRONYM_LANGUAGE_MAPPING, Language from shortGPT.editing_framework.editing_engine import (EditingEngine, EditingStep) from shortGPT.editing_utils.captions import (getCaptionsWithTime, getSpeechBlocks) from shortGPT.editing_utils.handle_videos import get_aspect_ratio from shortGPT.engine.abstract_content_engine import CONTENT_DB, AbstractContentEngine from shortGPT.gpt.gpt_translate import translateContent class MultiLanguageTranslationEngine(AbstractContentEngine): def __init__(self, voiceModule: VoiceModule, src_url: str = "", target_language: Language = Language.ENGLISH, use_captions=False, id=""): super().__init__(id, "content_translation", target_language, voiceModule) if not id: self._db_should_translate = True if src_url: self._db_src_url = src_url self._db_use_captions = use_captions self._db_target_language = target_language.value self.stepDict = { 1: self._transcribe_audio, 2: self._translate_content, 3: self._generate_translated_audio, 4: self._edit_and_render_video, 5: self._add_metadata } def _transcribe_audio(self): cached_translation = CONTENT_DB.content_collection.find_one({ "content_type": 'content_translation', 'src_url': self._db_src_url, 'ready_to_upload': True }) if not (cached_translation and 'speech_blocks' in cached_translation and 'original_language' in cached_translation): video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False) self.verifyParameters(content_path=video_audio) self.logger(f"1/5 - Transcribing original audio to text...") whispered = audioToText(video_audio, model_size='base') self._db_speech_blocks = getSpeechBlocks(whispered, silence_time=0.8) self._db_original_language = whispered['language'] if (ACRONYM_LANGUAGE_MAPPING.get(self._db_original_language) == Language(self._db_target_language)): self._db_translated_timed_sentences = self._db_speech_blocks self._db_should_translate = False expected_chars = len("".join([text for _, text in self._db_speech_blocks])) chars_remaining = self.voiceModule.get_remaining_characters() if chars_remaining < expected_chars: raise Exception( f"Your VoiceModule's key doesn't have enough characters to totally translate this video | Remaining: {chars_remaining} | Number of characters to translate: {expected_chars}") def _translate_content(self): if (self._db_should_translate): self.verifyParameters(_db_speech_blocks=self._db_speech_blocks) translated_timed_sentences = [] for i, ((t1, t2), text) in tqdm(enumerate(self._db_speech_blocks), desc="Translating content"): self.logger(f"2/5 - Translating text content - {i+1} / {len(self._db_speech_blocks)}") translated_text = translateContent(text, self._db_target_language) translated_timed_sentences.append([[t1, t2], translated_text]) self._db_translated_timed_sentences = translated_timed_sentences def _generate_translated_audio(self): self.verifyParameters(translated_timed_sentences=self._db_translated_timed_sentences) translated_audio_blocks = [] for i, ((t1, t2), translated_text) in tqdm(enumerate(self._db_translated_timed_sentences), desc="Generating translated audio"): self.logger(f"3/5 - Generating translated audio - {i+1} / {len(self._db_translated_timed_sentences)}") translated_voice = self.voiceModule.generate_voice(translated_text, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}.wav") if not translated_voice: raise Exception('An error happending during audio voice creation') final_audio_path = speedUpAudio(translated_voice, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}_spedup.wav", expected_duration=t2-t1 - 0.05) _, translated_duration = get_asset_duration(final_audio_path, isVideo=False) translated_audio_blocks.append([[t1, t1+translated_duration], final_audio_path]) self._db_audio_bits = translated_audio_blocks def _edit_and_render_video(self): self.verifyParameters(_db_audio_bits=self._db_audio_bits) self.logger(f"4.1 / 5 - Preparing automated editing") target_language = Language(self._db_target_language) input_video, video_length = get_asset_duration(self._db_src_url) video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False) editing_engine = EditingEngine() editing_engine.addEditingStep(EditingStep.ADD_BACKGROUND_VIDEO, {'url': input_video, "set_time_start": 0, "set_time_end": video_length}) last_t2 = 0 for (t1, t2), audio_path in self._db_audio_bits: t2+=-0.05 editing_engine.addEditingStep(EditingStep.INSERT_AUDIO, {'url': audio_path, 'set_time_start': t1, 'set_time_end': t2}) if t1-last_t2 >4: editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": t1}, "set_time_start": last_t2, "set_time_end": t1}) last_t2 = t2 if video_length - last_t2 >4: editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": video_length}, "set_time_start": last_t2, "set_time_end": video_length}) if self._db_use_captions: is_landscape = get_aspect_ratio(input_video) > 1 if not self._db_timed_translated_captions: if not self._db_translated_voiceover_path: self.logger(f"4.5 / 5 - Generating captions in {target_language.value}") editing_engine.generateAudio(self.dynamicAssetDir+"translated_voiceover.wav") self._db_translated_voiceover_path = self.dynamicAssetDir+"translated_voiceover.wav" whispered_translated = audioToText(self._db_translated_voiceover_path, model_size='base') timed_translated_captions = getCaptionsWithTime(whispered_translated, maxCaptionSize=50 if is_landscape else 15, considerPunctuation=True) self._db_timed_translated_captions = [[[t1,t2], text] for (t1, t2), text in timed_translated_captions if t2 - t1 <= 4] for (t1, t2), text in self._db_timed_translated_captions: caption_key = "LANDSCAPE" if is_landscape else "SHORT" caption_key += "_ARABIC" if target_language == Language.ARABIC else "" caption_type = getattr(EditingStep, f"ADD_CAPTION_{caption_key}") editing_engine.addEditingStep(caption_type, {'text': text, "set_time_start": t1, "set_time_end": t2}) self._db_video_path = self.dynamicAssetDir+"translated_content.mp4" editing_engine.renderVideo(self._db_video_path, logger= self.logger if self.logger is not self.default_logger else None) def _add_metadata(self): self.logger(f"5 / 5 - Saving translated video") now = datetime.datetime.now() date_str = now.strftime("%Y-%m-%d_%H-%M-%S") newFileName = f"videos/{date_str} - " + \ re.sub(r"[^a-zA-Z0-9 '\n\.]", '', f"translated_content_to_{self._db_target_language}") shutil.move(self._db_video_path, newFileName+".mp4") self._db_video_path = newFileName+".mp4" self._db_ready_to_upload = True