|
import datetime |
|
import os |
|
import re |
|
import shutil |
|
|
|
from tqdm import tqdm |
|
|
|
from shortGPT.audio.audio_duration import get_asset_duration |
|
from shortGPT.audio.audio_utils import (audioToText, get_asset_duration, |
|
run_background_audio_split, |
|
speedUpAudio) |
|
from shortGPT.audio.eleven_voice_module import VoiceModule |
|
from shortGPT.config.languages import ACRONYM_LANGUAGE_MAPPING, Language |
|
from shortGPT.editing_framework.editing_engine import (EditingEngine, |
|
EditingStep) |
|
from shortGPT.editing_utils.captions import (getCaptionsWithTime, |
|
getSpeechBlocks) |
|
from shortGPT.editing_utils.handle_videos import get_aspect_ratio |
|
from shortGPT.engine.abstract_content_engine import CONTENT_DB, AbstractContentEngine |
|
from shortGPT.gpt.gpt_translate import translateContent |
|
|
|
class MultiLanguageTranslationEngine(AbstractContentEngine):
    """Content engine that re-voices a source video in a target language.

    The work is split into five steps registered in ``stepDict``:
    transcription, translation, voice generation, editing/rendering and
    final file placement.

    NOTE(review): attributes prefixed with ``_db_`` are presumably persisted
    by ``AbstractContentEngine`` -- confirm against the base class.
    """

    def __init__(self, voiceModule: VoiceModule, src_url: str = "", target_language: Language = Language.ENGLISH, use_captions=False, id=""):
        """Set up the engine and register its processing steps.

        Args:
            voiceModule: Voice backend used to synthesize the translated audio.
            src_url: Location of the source video; stored only when non-empty.
            target_language: Language the content is translated into.
            use_captions: Whether captions are added to the rendered video.
            id: Identifier forwarded to the base class. When it is empty the
                initial ``_db_`` fields are written; otherwise they are left
                untouched.
        """
        super().__init__(id, "content_translation", target_language, voiceModule)
        if not id:
            self._db_should_translate = True
            if src_url:
                self._db_src_url = src_url
            self._db_use_captions = use_captions
            self._db_target_language = target_language.value

        self.stepDict = {
            1: self._transcribe_audio,
            2: self._translate_content,
            3: self._generate_translated_audio,
            4: self._edit_and_render_video,
            5: self._add_metadata,
        }

    def _transcribe_audio(self):
        """Step 1: obtain timed speech blocks and the original language.

        A previously finished translation of the same ``src_url`` is reused
        when it carries both ``speech_blocks`` and ``original_language``;
        otherwise the audio is transcribed with ``audioToText``.

        Raises:
            Exception: If the voice module reports fewer remaining characters
                than the speech blocks contain.
        """
        cached_translation = CONTENT_DB.content_collection.find_one({
            "content_type": 'content_translation',
            'src_url': self._db_src_url,
            'ready_to_upload': True
        })
        cache_usable = bool(
            cached_translation
            and 'speech_blocks' in cached_translation
            and 'original_language' in cached_translation
        )
        if cache_usable:
            # Without copying these over, the rest of this method (and every
            # later step) would read fields that were never populated here.
            self._db_speech_blocks = cached_translation['speech_blocks']
            self._db_original_language = cached_translation['original_language']
        else:
            video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
            self.verifyParameters(content_path=video_audio)
            self.logger("1/5 - Transcribing original audio to text...")
            whispered = audioToText(video_audio, model_size='base')
            self._db_speech_blocks = getSpeechBlocks(whispered, silence_time=0.8)
            self._db_original_language = whispered['language']

        # Source already in the target language: reuse the transcript as the
        # "translation" and skip step 2.
        if ACRONYM_LANGUAGE_MAPPING.get(self._db_original_language) == Language(self._db_target_language):
            self._db_translated_timed_sentences = self._db_speech_blocks
            self._db_should_translate = False

        expected_chars = sum(len(text) for _, text in self._db_speech_blocks)
        chars_remaining = self.voiceModule.get_remaining_characters()
        if chars_remaining < expected_chars:
            raise Exception(
                f"Your VoiceModule's key doesn't have enough characters to totally translate this video | Remaining: {chars_remaining} | Number of characters to translate: {expected_chars}")

    def _translate_content(self):
        """Step 2: translate every speech block, keeping its time range.

        Does nothing when ``_db_should_translate`` is falsy.
        """
        if not self._db_should_translate:
            return

        self.verifyParameters(_db_speech_blocks=self._db_speech_blocks)

        speech_blocks = self._db_speech_blocks
        block_count = len(speech_blocks)
        translated_timed_sentences = []
        # enumerate(tqdm(...)) rather than tqdm(enumerate(...)) so the progress
        # bar can see the length of the underlying sequence.
        for i, ((t1, t2), text) in enumerate(tqdm(speech_blocks, desc="Translating content")):
            self.logger(f"2/5 - Translating text content - {i+1} / {block_count}")
            translated_text = translateContent(text, self._db_target_language)
            translated_timed_sentences.append([[t1, t2], translated_text])
        self._db_translated_timed_sentences = translated_timed_sentences

    def _generate_translated_audio(self):
        """Step 3: synthesize one audio clip per translated sentence.

        Each clip is passed through ``speedUpAudio`` with an expected duration
        of the original block length minus 0.05, then stored in
        ``_db_audio_bits`` as ``[[start, start + clip_duration], path]``.

        Raises:
            Exception: If the voice module returns a falsy result for a clip.
        """
        self.verifyParameters(translated_timed_sentences=self._db_translated_timed_sentences)

        sentences = self._db_translated_timed_sentences
        sentence_count = len(sentences)
        translated_audio_blocks = []
        for i, ((t1, t2), translated_text) in enumerate(tqdm(sentences, desc="Generating translated audio")):
            self.logger(f"3/5 - Generating translated audio - {i+1} / {sentence_count}")
            clip_stem = self.dynamicAssetDir + f"translated_{i}_{self._db_target_language}"
            translated_voice = self.voiceModule.generate_voice(translated_text, clip_stem + ".wav")
            if not translated_voice:
                raise Exception('An error happending during audio voice creation')
            final_audio_path = speedUpAudio(translated_voice, clip_stem + "_spedup.wav", expected_duration=t2-t1 - 0.05)
            _, translated_duration = get_asset_duration(final_audio_path, isVideo=False)
            translated_audio_blocks.append([[t1, t1+translated_duration], final_audio_path])
        self._db_audio_bits = translated_audio_blocks

    def _edit_and_render_video(self):
        """Step 4: assemble the editing steps and render the translated video.

        The source video is the background, every generated clip is inserted
        at its time range, and stretches longer than 4 between clips (and
        after the last one) get an ``EXTRACT_AUDIO`` step taken from the
        source audio. Captions are added when ``_db_use_captions`` is set.
        """
        self.verifyParameters(_db_audio_bits=self._db_audio_bits)
        self.logger("4.1 / 5 - Preparing automated editing")
        target_language = Language(self._db_target_language)
        input_video, video_length = get_asset_duration(self._db_src_url)
        video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
        editing_engine = EditingEngine()
        editing_engine.addEditingStep(EditingStep.ADD_BACKGROUND_VIDEO, {'url': input_video, "set_time_start": 0, "set_time_end": video_length})

        last_t2 = 0
        for (t1, t2), audio_path in self._db_audio_bits:
            t2 -= 0.05
            editing_engine.addEditingStep(EditingStep.INSERT_AUDIO, {'url': audio_path, 'set_time_start': t1, 'set_time_end': t2})
            if t1 - last_t2 > 4:
                # Long stretch without a generated clip: take that span from
                # the source audio instead.
                editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": t1}, "set_time_start": last_t2, "set_time_end": t1})
            last_t2 = t2

        if video_length - last_t2 > 4:
            editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": video_length}, "set_time_start": last_t2, "set_time_end": video_length})

        if self._db_use_captions:
            is_landscape = get_aspect_ratio(input_video) > 1
            if not self._db_timed_translated_captions:
                if not self._db_translated_voiceover_path:
                    self.logger(f"4.5 / 5 - Generating captions in {target_language.value}")
                    voiceover_path = self.dynamicAssetDir+"translated_voiceover.wav"
                    editing_engine.generateAudio(voiceover_path)
                    self._db_translated_voiceover_path = voiceover_path
                whispered_translated = audioToText(self._db_translated_voiceover_path, model_size='base')
                timed_translated_captions = getCaptionsWithTime(whispered_translated, maxCaptionSize=50 if is_landscape else 15, considerPunctuation=True)
                # Captions spanning more than 4 are dropped.
                self._db_timed_translated_captions = [[[t1, t2], text] for (t1, t2), text in timed_translated_captions if t2 - t1 <= 4]

            # The caption step depends only on orientation and language, so it
            # is resolved once instead of once per caption.
            caption_key = "LANDSCAPE" if is_landscape else "SHORT"
            caption_key += "_ARABIC" if target_language == Language.ARABIC else ""
            for (t1, t2), text in self._db_timed_translated_captions:
                caption_type = getattr(EditingStep, f"ADD_CAPTION_{caption_key}")
                editing_engine.addEditingStep(caption_type, {'text': text, "set_time_start": t1, "set_time_end": t2})

        self._db_video_path = self.dynamicAssetDir+"translated_content.mp4"

        editing_engine.renderVideo(self._db_video_path, logger= self.logger if self.logger is not self.default_logger else None)

    def _add_metadata(self):
        """Step 5: move the rendered video under ``videos/`` and mark it ready.

        The destination name is a timestamp followed by a sanitized label
        built from the target language. ``_db_video_path`` is updated to the
        new location and ``_db_ready_to_upload`` is set to ``True``.
        """
        self.logger("5 / 5 - Saving translated video")
        now = datetime.datetime.now()
        date_str = now.strftime("%Y-%m-%d_%H-%M-%S")
        newFileName = f"videos/{date_str} - " + \
            re.sub(r"[^a-zA-Z0-9 '\n\.]", '', f"translated_content_to_{self._db_target_language}")

        # shutil.move does not create missing parent directories.
        os.makedirs(os.path.dirname(newFileName), exist_ok=True)
        shutil.move(self._db_video_path, newFileName+".mp4")
        self._db_video_path = newFileName+".mp4"
        self._db_ready_to_upload = True
|
|