Spaces:
Build error
Build error
import datetime | |
import os | |
import re | |
import shutil | |
from tqdm import tqdm | |
from shortGPT.audio.audio_duration import get_asset_duration | |
from shortGPT.audio.audio_utils import (audioToText, get_asset_duration, | |
run_background_audio_split, | |
speedUpAudio) | |
from shortGPT.audio.eleven_voice_module import VoiceModule | |
from shortGPT.config.languages import ACRONYM_LANGUAGE_MAPPING, Language | |
from shortGPT.editing_framework.editing_engine import (EditingEngine, | |
EditingStep) | |
from shortGPT.editing_utils.captions import (getCaptionsWithTime, | |
getSpeechBlocks) | |
from shortGPT.editing_utils.handle_videos import get_aspect_ratio | |
from shortGPT.engine.abstract_content_engine import CONTENT_DB, AbstractContentEngine | |
from shortGPT.gpt.gpt_translate import translateContent | |
class MultiLanguageTranslationEngine(AbstractContentEngine):
    """Content engine that re-voices a video in a target language.

    The work is split into the five steps registered in ``stepDict``:
    transcribe the source audio, translate each speech block, synthesize
    the translated speech, assemble/render the video, and finally move the
    rendered file into the ``videos/`` output folder.

    NOTE(review): attributes prefixed with ``_db_`` appear to be handled by
    ``AbstractContentEngine`` (unset ones are read as falsy values in this
    class) — presumably they are persisted per content id; confirm there.
    """

    def __init__(self, voiceModule: VoiceModule, src_url: str = "", target_language: Language = Language.ENGLISH, use_captions=False, id=""):
        """Create a translation engine, seeding its state when it is new.

        Args:
            voiceModule: Voice backend used to synthesize the translated speech.
            src_url: Source video to translate. Only stored when non-empty.
            target_language: Language the content is translated into.
            use_captions: Whether translated captions are burnt into the video.
            id: Identifier forwarded to the base class. When empty, the
                initial ``_db_*`` values below are written; when provided
                they are left untouched.
        """
        super().__init__(id, "content_translation", target_language, voiceModule)
        if not id:
            self._db_should_translate = True
            if src_url:
                self._db_src_url = src_url
            self._db_use_captions = use_captions
            self._db_target_language = target_language.value

        # Ordered pipeline: step number -> bound method implementing it.
        self.stepDict = {
            1: self._transcribe_audio,
            2: self._translate_content,
            3: self._generate_translated_audio,
            4: self._edit_and_render_video,
            5: self._add_metadata,
        }

    def _transcribe_audio(self):
        """Step 1: obtain timed speech blocks and the source language.

        A finished translation of the same ``src_url`` is looked up first so
        that its transcription can be reused; otherwise the source audio is
        transcribed. The step also disables translation when the source is
        already in the target language and checks the voice quota.

        Raises:
            Exception: If the voice module has fewer remaining characters
                than the total length of the speech block texts.
        """
        cached_translation = CONTENT_DB.content_collection.find_one({
            "content_type": 'content_translation',
            'src_url': self._db_src_url,
            'ready_to_upload': True
        })
        cache_hit = bool(
            cached_translation
            and 'speech_blocks' in cached_translation
            and 'original_language' in cached_translation
        )
        if cache_hit:
            # Reuse the cached transcription. Previously nothing copied it
            # over, so a fresh engine hit the checks below with unset
            # speech blocks. Values this engine already holds are kept.
            if not self._db_speech_blocks:
                self._db_speech_blocks = cached_translation['speech_blocks']
            if not self._db_original_language:
                self._db_original_language = cached_translation['original_language']
        else:
            video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
            self.verifyParameters(content_path=video_audio)
            self.logger("1/5 - Transcribing original audio to text...")
            whispered = audioToText(video_audio, model_size='base')
            self._db_speech_blocks = getSpeechBlocks(whispered, silence_time=0.8)
            self._db_original_language = whispered['language']

        speech_blocks = self._db_speech_blocks

        # Source already in the target language: the original sentences are
        # used as the "translated" ones and step 2 becomes a no-op.
        if ACRONYM_LANGUAGE_MAPPING.get(self._db_original_language) == Language(self._db_target_language):
            self._db_translated_timed_sentences = speech_blocks
            self._db_should_translate = False

        # Fail early instead of running out of voice quota mid-generation.
        expected_chars = sum(len(text) for _, text in speech_blocks)
        chars_remaining = self.voiceModule.get_remaining_characters()
        if chars_remaining < expected_chars:
            raise Exception(
                f"Your VoiceModule's key doesn't have enough characters to totally translate this video | Remaining: {chars_remaining} | Number of characters to translate: {expected_chars}")

    def _translate_content(self):
        """Step 2: translate every speech block, keeping its timestamps.

        Does nothing when ``_db_should_translate`` is falsy (step 1 clears it
        when the source language equals the target language).
        """
        if not self._db_should_translate:
            return

        self.verifyParameters(_db_speech_blocks=self._db_speech_blocks)
        speech_blocks = self._db_speech_blocks
        target_language = self._db_target_language
        total = len(speech_blocks)

        translated_timed_sentences = []
        # tqdm wraps the list (not the enumerate object) so it knows the total.
        for i, ((t1, t2), text) in enumerate(tqdm(speech_blocks, desc="Translating content")):
            self.logger(f"2/5 - Translating text content - {i+1} / {total}")
            translated_text = translateContent(text, target_language)
            translated_timed_sentences.append([[t1, t2], translated_text])
        self._db_translated_timed_sentences = translated_timed_sentences

    def _generate_translated_audio(self):
        """Step 3: synthesize one audio clip per translated sentence.

        Each clip is passed through ``speedUpAudio`` with an expected
        duration of the original block length minus 0.05, and stored as
        ``[[start, start + clip_duration], clip_path]`` in ``_db_audio_bits``.

        Raises:
            Exception: If the voice module returns a falsy result for a sentence.
        """
        self.verifyParameters(translated_timed_sentences=self._db_translated_timed_sentences)
        sentences = self._db_translated_timed_sentences
        target_language = self._db_target_language
        total = len(sentences)

        translated_audio_blocks = []
        # tqdm wraps the list (not the enumerate object) so it knows the total.
        for i, ((t1, t2), translated_text) in enumerate(tqdm(sentences, desc="Generating translated audio")):
            self.logger(f"3/5 - Generating translated audio - {i+1} / {total}")
            translated_voice = self.voiceModule.generate_voice(translated_text, self.dynamicAssetDir+f"translated_{i}_{target_language}.wav")
            if not translated_voice:
                raise Exception('An error happending during audio voice creation')
            final_audio_path = speedUpAudio(translated_voice, self.dynamicAssetDir+f"translated_{i}_{target_language}_spedup.wav", expected_duration=t2-t1 - 0.05)
            _, translated_duration = get_asset_duration(final_audio_path, isVideo=False)
            translated_audio_blocks.append([[t1, t1+translated_duration], final_audio_path])
        self._db_audio_bits = translated_audio_blocks

    def _edit_and_render_video(self):
        """Step 4: lay the translated audio over the source video and render.

        The original audio is re-inserted for gaps longer than 4 (time units
        as returned by ``get_asset_duration``) between translated clips and
        after the last one. When captions are enabled, the mixed voiceover
        is transcribed to produce them. The result is rendered to
        ``translated_content.mp4`` inside ``dynamicAssetDir``.
        """
        self.verifyParameters(_db_audio_bits=self._db_audio_bits)
        self.logger("4.1 / 5 - Preparing automated editing")
        target_language = Language(self._db_target_language)
        input_video, video_length = get_asset_duration(self._db_src_url)
        video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)

        editing_engine = EditingEngine()
        editing_engine.addEditingStep(EditingStep.ADD_BACKGROUND_VIDEO, {'url': input_video, "set_time_start": 0, "set_time_end": video_length})

        last_t2 = 0
        for (t1, t2), audio_path in self._db_audio_bits:
            # Each clip is ended 0.05 before its recorded end time.
            clip_end = t2 - 0.05
            editing_engine.addEditingStep(EditingStep.INSERT_AUDIO, {'url': audio_path, 'set_time_start': t1, 'set_time_end': clip_end})
            if t1 - last_t2 > 4:
                # Long gap before this clip: keep the original audio there.
                editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": t1}, "set_time_start": last_t2, "set_time_end": t1})
            last_t2 = clip_end

        if video_length - last_t2 > 4:
            # Long tail after the last clip: keep the original audio there.
            editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": video_length}, "set_time_start": last_t2, "set_time_end": video_length})

        if self._db_use_captions:
            is_landscape = get_aspect_ratio(input_video) > 1
            if not self._db_timed_translated_captions:
                if not self._db_translated_voiceover_path:
                    self.logger(f"4.5 / 5 - Generating captions in {target_language.value}")
                    voiceover_path = self.dynamicAssetDir+"translated_voiceover.wav"
                    editing_engine.generateAudio(voiceover_path)
                    self._db_translated_voiceover_path = voiceover_path
                whispered_translated = audioToText(self._db_translated_voiceover_path, model_size='base')
                timed_translated_captions = getCaptionsWithTime(whispered_translated, maxCaptionSize=50 if is_landscape else 15, considerPunctuation=True)
                # Captions spanning more than 4 are discarded.
                self._db_timed_translated_captions = [[[t1, t2], text] for (t1, t2), text in timed_translated_captions if t2 - t1 <= 4]

            # The caption step name depends only on orientation and language.
            caption_key = "LANDSCAPE" if is_landscape else "SHORT"
            if target_language == Language.ARABIC:
                caption_key += "_ARABIC"
            for (t1, t2), text in self._db_timed_translated_captions:
                caption_type = getattr(EditingStep, f"ADD_CAPTION_{caption_key}")
                editing_engine.addEditingStep(caption_type, {'text': text, "set_time_start": t1, "set_time_end": t2})

        self._db_video_path = self.dynamicAssetDir+"translated_content.mp4"
        # The engine's logger is forwarded only when it is not the default one.
        editing_engine.renderVideo(self._db_video_path, logger=self.logger if self.logger is not self.default_logger else None)

    def _add_metadata(self):
        """Step 5: move the rendered video into ``videos/`` and mark it ready.

        The destination name is ``videos/<timestamp> - <sanitized label>.mp4``.
        """
        self.logger("5 / 5 - Saving translated video")
        date_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_dir = "videos"
        # NOTE: the character class has no '_', so underscores in the label
        # are stripped as well; kept as-is to preserve existing file names.
        safe_label = re.sub(r"[^a-zA-Z0-9 '\n\.]", '', f"translated_content_to_{self._db_target_language}")
        newFileName = f"{output_dir}/{date_str} - " + safe_label
        # shutil.move fails if the destination folder is missing.
        os.makedirs(output_dir, exist_ok=True)
        shutil.move(self._db_video_path, newFileName+".mp4")
        self._db_video_path = newFileName+".mp4"
        self._db_ready_to_upload = True