sHORTgpt / shortGPT /engine /multi_language_translation_engine.py
wiydarrr's picture
Upload folder using huggingface_hub
5f685fd verified
raw
history blame
8.2 kB
import datetime
import os
import re
import shutil
from tqdm import tqdm
from shortGPT.audio.audio_duration import get_asset_duration
from shortGPT.audio.audio_utils import (audioToText, get_asset_duration,
run_background_audio_split,
speedUpAudio)
from shortGPT.audio.eleven_voice_module import VoiceModule
from shortGPT.config.languages import ACRONYM_LANGUAGE_MAPPING, Language
from shortGPT.editing_framework.editing_engine import (EditingEngine,
EditingStep)
from shortGPT.editing_utils.captions import (getCaptionsWithTime,
getSpeechBlocks)
from shortGPT.editing_utils.handle_videos import get_aspect_ratio
from shortGPT.engine.abstract_content_engine import CONTENT_DB, AbstractContentEngine
from shortGPT.gpt.gpt_translate import translateContent
class MultiLanguageTranslationEngine(AbstractContentEngine):
def __init__(self, voiceModule: VoiceModule, src_url: str = "", target_language: Language = Language.ENGLISH, use_captions=False, id=""):
super().__init__(id, "content_translation", target_language, voiceModule)
if not id:
self._db_should_translate = True
if src_url:
self._db_src_url = src_url
self._db_use_captions = use_captions
self._db_target_language = target_language.value
self.stepDict = {
1: self._transcribe_audio,
2: self._translate_content,
3: self._generate_translated_audio,
4: self._edit_and_render_video,
5: self._add_metadata
}
def _transcribe_audio(self):
cached_translation = CONTENT_DB.content_collection.find_one({
"content_type": 'content_translation',
'src_url': self._db_src_url,
'ready_to_upload': True
})
if not (cached_translation and 'speech_blocks' in cached_translation and 'original_language' in cached_translation):
video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
self.verifyParameters(content_path=video_audio)
self.logger(f"1/5 - Transcribing original audio to text...")
whispered = audioToText(video_audio, model_size='base')
self._db_speech_blocks = getSpeechBlocks(whispered, silence_time=0.8)
self._db_original_language = whispered['language']
if (ACRONYM_LANGUAGE_MAPPING.get(self._db_original_language) == Language(self._db_target_language)):
self._db_translated_timed_sentences = self._db_speech_blocks
self._db_should_translate = False
expected_chars = len("".join([text for _, text in self._db_speech_blocks]))
chars_remaining = self.voiceModule.get_remaining_characters()
if chars_remaining < expected_chars:
raise Exception(
f"Your VoiceModule's key doesn't have enough characters to totally translate this video | Remaining: {chars_remaining} | Number of characters to translate: {expected_chars}")
def _translate_content(self):
if (self._db_should_translate):
self.verifyParameters(_db_speech_blocks=self._db_speech_blocks)
translated_timed_sentences = []
for i, ((t1, t2), text) in tqdm(enumerate(self._db_speech_blocks), desc="Translating content"):
self.logger(f"2/5 - Translating text content - {i+1} / {len(self._db_speech_blocks)}")
translated_text = translateContent(text, self._db_target_language)
translated_timed_sentences.append([[t1, t2], translated_text])
self._db_translated_timed_sentences = translated_timed_sentences
def _generate_translated_audio(self):
self.verifyParameters(translated_timed_sentences=self._db_translated_timed_sentences)
translated_audio_blocks = []
for i, ((t1, t2), translated_text) in tqdm(enumerate(self._db_translated_timed_sentences), desc="Generating translated audio"):
self.logger(f"3/5 - Generating translated audio - {i+1} / {len(self._db_translated_timed_sentences)}")
translated_voice = self.voiceModule.generate_voice(translated_text, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}.wav")
if not translated_voice:
raise Exception('An error happending during audio voice creation')
final_audio_path = speedUpAudio(translated_voice, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}_spedup.wav", expected_duration=t2-t1 - 0.05)
_, translated_duration = get_asset_duration(final_audio_path, isVideo=False)
translated_audio_blocks.append([[t1, t1+translated_duration], final_audio_path])
self._db_audio_bits = translated_audio_blocks
def _edit_and_render_video(self):
self.verifyParameters(_db_audio_bits=self._db_audio_bits)
self.logger(f"4.1 / 5 - Preparing automated editing")
target_language = Language(self._db_target_language)
input_video, video_length = get_asset_duration(self._db_src_url)
video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
editing_engine = EditingEngine()
editing_engine.addEditingStep(EditingStep.ADD_BACKGROUND_VIDEO, {'url': input_video, "set_time_start": 0, "set_time_end": video_length})
last_t2 = 0
for (t1, t2), audio_path in self._db_audio_bits:
t2+=-0.05
editing_engine.addEditingStep(EditingStep.INSERT_AUDIO, {'url': audio_path, 'set_time_start': t1, 'set_time_end': t2})
if t1-last_t2 >4:
editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": t1}, "set_time_start": last_t2, "set_time_end": t1})
last_t2 = t2
if video_length - last_t2 >4:
editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": video_length}, "set_time_start": last_t2, "set_time_end": video_length})
if self._db_use_captions:
is_landscape = get_aspect_ratio(input_video) > 1
if not self._db_timed_translated_captions:
if not self._db_translated_voiceover_path:
self.logger(f"4.5 / 5 - Generating captions in {target_language.value}")
editing_engine.generateAudio(self.dynamicAssetDir+"translated_voiceover.wav")
self._db_translated_voiceover_path = self.dynamicAssetDir+"translated_voiceover.wav"
whispered_translated = audioToText(self._db_translated_voiceover_path, model_size='base')
timed_translated_captions = getCaptionsWithTime(whispered_translated, maxCaptionSize=50 if is_landscape else 15, considerPunctuation=True)
self._db_timed_translated_captions = [[[t1,t2], text] for (t1, t2), text in timed_translated_captions if t2 - t1 <= 4]
for (t1, t2), text in self._db_timed_translated_captions:
caption_key = "LANDSCAPE" if is_landscape else "SHORT"
caption_key += "_ARABIC" if target_language == Language.ARABIC else ""
caption_type = getattr(EditingStep, f"ADD_CAPTION_{caption_key}")
editing_engine.addEditingStep(caption_type, {'text': text, "set_time_start": t1, "set_time_end": t2})
self._db_video_path = self.dynamicAssetDir+"translated_content.mp4"
editing_engine.renderVideo(self._db_video_path, logger= self.logger if self.logger is not self.default_logger else None)
def _add_metadata(self):
self.logger(f"5 / 5 - Saving translated video")
now = datetime.datetime.now()
date_str = now.strftime("%Y-%m-%d_%H-%M-%S")
newFileName = f"videos/{date_str} - " + \
re.sub(r"[^a-zA-Z0-9 '\n\.]", '', f"translated_content_to_{self._db_target_language}")
shutil.move(self._db_video_path, newFileName+".mp4")
self._db_video_path = newFileName+".mp4"
self._db_ready_to_upload = True