diff --git "a/soni_translate/text_to_speech.py" "b/soni_translate/text_to_speech.py" --- "a/soni_translate/text_to_speech.py" +++ "b/soni_translate/text_to_speech.py" @@ -1,1574 +1,1574 @@ -from gtts import gTTS -import edge_tts, asyncio, json, glob # noqa -from tqdm import tqdm -import librosa, os, re, torch, gc, subprocess # noqa -from .language_configuration import ( - fix_code_language, - BARK_VOICES_LIST, - VITS_VOICES_LIST, -) -from .utils import ( - download_manager, - create_directories, - copy_files, - rename_file, - remove_directory_contents, - remove_files, - run_command, -) -import numpy as np -from typing import Any, Dict -from pathlib import Path -import soundfile as sf -import platform -import logging -import traceback -from .logging_setup import logger - - -class TTS_OperationError(Exception): - def __init__(self, message="The operation did not complete successfully."): - self.message = message - super().__init__(self.message) - - -def verify_saved_file_and_size(filename): - if not os.path.exists(filename): - raise TTS_OperationError(f"File '{filename}' was not saved.") - if os.path.getsize(filename) == 0: - raise TTS_OperationError( - f"File '{filename}' has a zero size. " - "Related to incorrect TTS for the target language" - ) - - -def error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename): - traceback.print_exc() - logger.error(f"Error: {str(error)}") - try: - from tempfile import TemporaryFile - - tts = gTTS(segment["text"], lang=fix_code_language(TRANSLATE_AUDIO_TO)) - # tts.save(filename) - f = TemporaryFile() - tts.write_to_fp(f) - - # Reset the file pointer to the beginning of the file - f.seek(0) - - # Read audio data from the TemporaryFile using soundfile - audio_data, samplerate = sf.read(f) - f.close() # Close the TemporaryFile - sf.write( - filename, audio_data, samplerate, format="ogg", subtype="vorbis" - ) - - logger.warning( - 'TTS auxiliary will be utilized ' - f'rather than TTS: {segment["tts_name"]}' - ) - verify_saved_file_and_size(filename) - except Exception as error: - logger.critical(f"Error: {str(error)}") - sample_rate_aux = 22050 - duration = float(segment["end"]) - float(segment["start"]) - data = np.zeros(int(sample_rate_aux * duration)).astype(np.float32) - sf.write( - filename, data, sample_rate_aux, format="ogg", subtype="vorbis" - ) - logger.error("Audio will be replaced -> [silent audio].") - verify_saved_file_and_size(filename) - - -def pad_array(array, sr): - - if isinstance(array, list): - array = np.array(array) - - if not array.shape[0]: - raise ValueError("The generated audio does not contain any data") - - valid_indices = np.where(np.abs(array) > 0.001)[0] - - if len(valid_indices) == 0: - logger.debug(f"No valid indices: {array}") - return array - - try: - pad_indice = int(0.1 * sr) - start_pad = max(0, valid_indices[0] - pad_indice) - end_pad = min(len(array), valid_indices[-1] + 1 + pad_indice) - padded_array = array[start_pad:end_pad] - return padded_array - except Exception as error: - logger.error(str(error)) - return array - - -# ===================================== -# EDGE TTS -# ===================================== - - -def edge_tts_voices_list(): - try: - completed_process = subprocess.run( - ["edge-tts", "--list-voices"], capture_output=True, text=True - ) - lines = completed_process.stdout.strip().split("\n") - except Exception as error: - logger.debug(str(error)) - lines = [] - - voices = [] - for line in lines: - if line.startswith("Name: "): - voice_entry = {} - voice_entry["Name"] = line.split(": ")[1] 
- elif line.startswith("Gender: "): - voice_entry["Gender"] = line.split(": ")[1] - voices.append(voice_entry) - - formatted_voices = [ - f"{entry['Name']}-{entry['Gender']}" for entry in voices - ] - - if not formatted_voices: - logger.warning( - "The list of Edge TTS voices could not be obtained, " - "switching to an alternative method" - ) - tts_voice_list = asyncio.new_event_loop().run_until_complete( - edge_tts.list_voices() - ) - formatted_voices = sorted( - [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list] - ) - - if not formatted_voices: - logger.error("Can't get EDGE TTS - list voices") - - return formatted_voices - - -def segments_egde_tts(filtered_edge_segments, TRANSLATE_AUDIO_TO, is_gui): - for segment in tqdm(filtered_edge_segments["segments"]): - speaker = segment["speaker"] # noqa - text = segment["text"] - start = segment["start"] - tts_name = segment["tts_name"] - - # make the tts audio - filename = f"audio/{start}.ogg" - temp_file = filename[:-3] + "mp3" - - logger.info(f"{text} >> {filename}") - try: - if is_gui: - asyncio.run( - edge_tts.Communicate( - text, "-".join(tts_name.split("-")[:-1]) - ).save(temp_file) - ) - else: - # nest_asyncio.apply() if not is_gui else None - command = f'edge-tts -t "{text}" -v "{tts_name.replace("-Male", "").replace("-Female", "")}" --write-media "{temp_file}"' - run_command(command) - verify_saved_file_and_size(temp_file) - - data, sample_rate = sf.read(temp_file) - data = pad_array(data, sample_rate) - # os.remove(temp_file) - - # Save file - sf.write( - file=filename, - samplerate=sample_rate, - data=data, - format="ogg", - subtype="vorbis", - ) - verify_saved_file_and_size(filename) - - except Exception as error: - error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) - - -# ===================================== -# BARK TTS -# ===================================== - - -def segments_bark_tts( - filtered_bark_segments, TRANSLATE_AUDIO_TO, model_id_bark="suno/bark-small" -): - from transformers import AutoProcessor, BarkModel - from optimum.bettertransformer import BetterTransformer - - device = os.environ.get("SONITR_DEVICE") - torch_dtype_env = torch.float16 if device == "cuda" else torch.float32 - - # load model bark - model = BarkModel.from_pretrained( - model_id_bark, torch_dtype=torch_dtype_env - ).to(device) - model = model.to(device) - processor = AutoProcessor.from_pretrained( - model_id_bark, return_tensors="pt" - ) # , padding=True - if device == "cuda": - # convert to bettertransformer - model = BetterTransformer.transform(model, keep_original_model=False) - # enable CPU offload - # model.enable_cpu_offload() - sampling_rate = model.generation_config.sample_rate - - # filtered_segments = filtered_bark_segments['segments'] - # Sorting the segments by 'tts_name' - # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name']) - # logger.debug(sorted_segments) - - for segment in tqdm(filtered_bark_segments["segments"]): - speaker = segment["speaker"] # noqa - text = segment["text"] - start = segment["start"] - tts_name = segment["tts_name"] - - inputs = processor(text, voice_preset=BARK_VOICES_LIST[tts_name]).to( - device - ) - - # make the tts audio - filename = f"audio/{start}.ogg" - logger.info(f"{text} >> {filename}") - try: - # Infer - with torch.inference_mode(): - speech_output = model.generate( - **inputs, - do_sample=True, - fine_temperature=0.4, - coarse_temperature=0.8, - pad_token_id=processor.tokenizer.pad_token_id, - ) - # Save file - data_tts = pad_array( - 
speech_output.cpu().numpy().squeeze().astype(np.float32), - sampling_rate, - ) - sf.write( - file=filename, - samplerate=sampling_rate, - data=data_tts, - format="ogg", - subtype="vorbis", - ) - verify_saved_file_and_size(filename) - except Exception as error: - error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) - gc.collect() - torch.cuda.empty_cache() - try: - del processor - del model - gc.collect() - torch.cuda.empty_cache() - except Exception as error: - logger.error(str(error)) - gc.collect() - torch.cuda.empty_cache() - - -# ===================================== -# VITS TTS -# ===================================== - - -def uromanize(input_string): - """Convert non-Roman strings to Roman using the `uroman` perl package.""" - # script_path = os.path.join(uroman_path, "bin", "uroman.pl") - - if not os.path.exists("./uroman"): - logger.info( - "Clonning repository uroman https://github.com/isi-nlp/uroman.git" - " for romanize the text" - ) - process = subprocess.Popen( - ["git", "clone", "https://github.com/isi-nlp/uroman.git"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - stdout, stderr = process.communicate() - script_path = os.path.join("./uroman", "bin", "uroman.pl") - - command = ["perl", script_path] - - process = subprocess.Popen( - command, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - # Execute the perl command - stdout, stderr = process.communicate(input=input_string.encode()) - - if process.returncode != 0: - raise ValueError(f"Error {process.returncode}: {stderr.decode()}") - - # Return the output as a string and skip the new-line character at the end - return stdout.decode()[:-1] - - -def segments_vits_tts(filtered_vits_segments, TRANSLATE_AUDIO_TO): - from transformers import VitsModel, AutoTokenizer - - filtered_segments = filtered_vits_segments["segments"] - # Sorting the segments by 'tts_name' - sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"]) - logger.debug(sorted_segments) - - model_name_key = None - for segment in tqdm(sorted_segments): - speaker = segment["speaker"] # noqa - text = segment["text"] - start = segment["start"] - tts_name = segment["tts_name"] - - if tts_name != model_name_key: - model_name_key = tts_name - model = VitsModel.from_pretrained(VITS_VOICES_LIST[tts_name]) - tokenizer = AutoTokenizer.from_pretrained( - VITS_VOICES_LIST[tts_name] - ) - sampling_rate = model.config.sampling_rate - - if tokenizer.is_uroman: - romanize_text = uromanize(text) - logger.debug(f"Romanize text: {romanize_text}") - inputs = tokenizer(romanize_text, return_tensors="pt") - else: - inputs = tokenizer(text, return_tensors="pt") - - # make the tts audio - filename = f"audio/{start}.ogg" - logger.info(f"{text} >> {filename}") - try: - # Infer - with torch.no_grad(): - speech_output = model(**inputs).waveform - - data_tts = pad_array( - speech_output.cpu().numpy().squeeze().astype(np.float32), - sampling_rate, - ) - # Save file - sf.write( - file=filename, - samplerate=sampling_rate, - data=data_tts, - format="ogg", - subtype="vorbis", - ) - verify_saved_file_and_size(filename) - except Exception as error: - error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) - gc.collect() - torch.cuda.empty_cache() - try: - del tokenizer - del model - gc.collect() - torch.cuda.empty_cache() - except Exception as error: - logger.error(str(error)) - gc.collect() - torch.cuda.empty_cache() - - -# ===================================== -# Coqui XTTS -# ===================================== 
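# Summary of the XTTS flow implemented below: the "voice" is just a reference
# audio file used for cloning, so tts_name points to a wav/mp3/ogg/m4a file
# under _XTTS_/. The special entry "_XTTS_/AUTOMATIC.wav" is resolved per
# speaker to "_XTTS_/AUTOMATIC_SPEAKER_XX.wav", a sample that
# create_new_files_for_vc() cuts from the source audio (preferring a 7-12 s
# segment) and that convert_to_xtts_good_sample() converts to 22050 Hz mono
# s16 wav.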
- - -def coqui_xtts_voices_list(): - main_folder = "_XTTS_" - pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$") - pattern_automatic_speaker = re.compile(r"AUTOMATIC_SPEAKER_\d+\.wav$") - - # List only files in the directory matching the pattern but not matching - # AUTOMATIC_SPEAKER_00.wav, AUTOMATIC_SPEAKER_01.wav, etc. - wav_voices = [ - "_XTTS_/" + f - for f in os.listdir(main_folder) - if os.path.isfile(os.path.join(main_folder, f)) - and pattern_coqui.match(f) - and not pattern_automatic_speaker.match(f) - ] - - return ["_XTTS_/AUTOMATIC.wav"] + wav_voices - - -def seconds_to_hhmmss_ms(seconds): - hours = seconds // 3600 - minutes = (seconds % 3600) // 60 - seconds = seconds % 60 - milliseconds = int((seconds - int(seconds)) * 1000) - return "%02d:%02d:%02d.%03d" % (hours, minutes, int(seconds), milliseconds) - - -def audio_trimming(audio_path, destination, start, end): - if isinstance(start, (int, float)): - start = seconds_to_hhmmss_ms(start) - if isinstance(end, (int, float)): - end = seconds_to_hhmmss_ms(end) - - if destination: - file_directory = destination - else: - file_directory = os.path.dirname(audio_path) - - file_name = os.path.splitext(os.path.basename(audio_path))[0] - file_ = f"{file_name}_trim.wav" - # file_ = f'{os.path.splitext(audio_path)[0]}_trim.wav' - output_path = os.path.join(file_directory, file_) - - # -t (duration from -ss) | -to (time stop) | -af silenceremove=1:0:-50dB (remove silence) - command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ss {start} -to {end} -acodec pcm_s16le -f wav "{output_path}"' - run_command(command) - - return output_path - - -def convert_to_xtts_good_sample(audio_path: str = "", destination: str = ""): - if destination: - file_directory = destination - else: - file_directory = os.path.dirname(audio_path) - - file_name = os.path.splitext(os.path.basename(audio_path))[0] - file_ = f"{file_name}_good_sample.wav" - # file_ = f'{os.path.splitext(audio_path)[0]}_good_sample.wav' - mono_path = os.path.join(file_directory, file_) # get root - - command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 1 -ar 22050 -sample_fmt s16 -f wav "{mono_path}"' - run_command(command) - - return mono_path - - -def sanitize_file_name(file_name): - import unicodedata - - # Normalize the string to NFKD form to separate combined characters into - # base characters and diacritics - normalized_name = unicodedata.normalize("NFKD", file_name) - # Replace any non-ASCII characters or special symbols with an underscore - sanitized_name = re.sub(r"[^\w\s.-]", "_", normalized_name) - return sanitized_name - - -def create_wav_file_vc( - sample_name="", # name final file - audio_wav="", # path - start=None, # trim start - end=None, # trim end - output_final_path="_XTTS_", - get_vocals_dereverb=True, -): - sample_name = sample_name if sample_name else "default_name" - sample_name = sanitize_file_name(sample_name) - audio_wav = audio_wav if isinstance(audio_wav, str) else audio_wav.name - - BASE_DIR = ( - "." 
# os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - - output_dir = os.path.join(BASE_DIR, "clean_song_output") # remove content - # remove_directory_contents(output_dir) - - if start or end: - # Cut file - audio_segment = audio_trimming(audio_wav, output_dir, start, end) - else: - # Complete file - audio_segment = audio_wav - - from .mdx_net import process_uvr_task - - try: - _, _, _, _, audio_segment = process_uvr_task( - orig_song_path=audio_segment, - main_vocals=True, - dereverb=get_vocals_dereverb, - ) - except Exception as error: - logger.error(str(error)) - - sample = convert_to_xtts_good_sample(audio_segment) - - sample_name = f"{sample_name}.wav" - sample_rename = rename_file(sample, sample_name) - - copy_files(sample_rename, output_final_path) - - final_sample = os.path.join(output_final_path, sample_name) - if os.path.exists(final_sample): - logger.info(final_sample) - return final_sample - else: - raise Exception(f"Error wav: {final_sample}") - - -def create_new_files_for_vc( - speakers_coqui, - segments_base, - dereverb_automatic=True -): - # before function delete automatic delete_previous_automatic - output_dir = os.path.join(".", "clean_song_output") # remove content - remove_directory_contents(output_dir) - - for speaker in speakers_coqui: - filtered_speaker = [ - segment - for segment in segments_base - if segment["speaker"] == speaker - ] - if len(filtered_speaker) > 4: - filtered_speaker = filtered_speaker[1:] - if filtered_speaker[0]["tts_name"] == "_XTTS_/AUTOMATIC.wav": - name_automatic_wav = f"AUTOMATIC_{speaker}" - if os.path.exists(f"_XTTS_/{name_automatic_wav}.wav"): - logger.info(f"WAV automatic {speaker} exists") - # path_wav = path_automatic_wav - pass - else: - # create wav - wav_ok = False - for seg in filtered_speaker: - duration = float(seg["end"]) - float(seg["start"]) - if duration > 7.0 and duration < 12.0: - logger.info( - f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}' - ) - create_wav_file_vc( - sample_name=name_automatic_wav, - audio_wav="audio.wav", - start=(float(seg["start"]) + 1.0), - end=(float(seg["end"]) - 1.0), - get_vocals_dereverb=dereverb_automatic, - ) - wav_ok = True - break - - if not wav_ok: - logger.info("Taking the first segment") - seg = filtered_speaker[0] - logger.info( - f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}' - ) - max_duration = float(seg["end"]) - float(seg["start"]) - max_duration = max(2.0, min(max_duration, 9.0)) - - create_wav_file_vc( - sample_name=name_automatic_wav, - audio_wav="audio.wav", - start=(float(seg["start"])), - end=(float(seg["start"]) + max_duration), - get_vocals_dereverb=dereverb_automatic, - ) - - -def segments_coqui_tts( - filtered_coqui_segments, - TRANSLATE_AUDIO_TO, - model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2", - speakers_coqui=None, - delete_previous_automatic=True, - dereverb_automatic=True, - emotion=None, -): - """XTTS - Install: - pip install -q TTS==0.21.1 - pip install -q numpy==1.23.5 - - Notes: - - tts_name is the wav|mp3|ogg|m4a file for VC - """ - from TTS.api import TTS - - TRANSLATE_AUDIO_TO = fix_code_language(TRANSLATE_AUDIO_TO, syntax="coqui") - supported_lang_coqui = [ - "zh-cn", - "en", - "fr", - "de", - "it", - "pt", - "pl", - "tr", - "ru", - "nl", - "cs", - "ar", - "es", - "hu", - "ko", - "ja", - ] - if TRANSLATE_AUDIO_TO not in supported_lang_coqui: - raise TTS_OperationError( - f"'{TRANSLATE_AUDIO_TO}' is not a supported language for Coqui XTTS" - ) 
- # Emotion and speed can only be used with Coqui Studio models. discontinued - # emotions = ["Neutral", "Happy", "Sad", "Angry", "Dull"] - - if delete_previous_automatic: - for spk in speakers_coqui: - remove_files(f"_XTTS_/AUTOMATIC_{spk}.wav") - - directory_audios_vc = "_XTTS_" - create_directories(directory_audios_vc) - create_new_files_for_vc( - speakers_coqui, - filtered_coqui_segments["segments"], - dereverb_automatic, - ) - - # Init TTS - device = os.environ.get("SONITR_DEVICE") - model = TTS(model_id_coqui).to(device) - sampling_rate = 24000 - - # filtered_segments = filtered_coqui_segments['segments'] - # Sorting the segments by 'tts_name' - # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name']) - # logger.debug(sorted_segments) - - for segment in tqdm(filtered_coqui_segments["segments"]): - speaker = segment["speaker"] - text = segment["text"] - start = segment["start"] - tts_name = segment["tts_name"] - if tts_name == "_XTTS_/AUTOMATIC.wav": - tts_name = f"_XTTS_/AUTOMATIC_{speaker}.wav" - - # make the tts audio - filename = f"audio/{start}.ogg" - logger.info(f"{text} >> {filename}") - try: - # Infer - wav = model.tts( - text=text, speaker_wav=tts_name, language=TRANSLATE_AUDIO_TO - ) - data_tts = pad_array( - wav, - sampling_rate, - ) - # Save file - sf.write( - file=filename, - samplerate=sampling_rate, - data=data_tts, - format="ogg", - subtype="vorbis", - ) - verify_saved_file_and_size(filename) - except Exception as error: - error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) - gc.collect() - torch.cuda.empty_cache() - try: - del model - gc.collect() - torch.cuda.empty_cache() - except Exception as error: - logger.error(str(error)) - gc.collect() - torch.cuda.empty_cache() - - -# ===================================== -# PIPER TTS -# ===================================== - - -def piper_tts_voices_list(): - file_path = download_manager( - url="https://huggingface.co/rhasspy/piper-voices/resolve/main/voices.json", - path="./PIPER_MODELS", - ) - - with open(file_path, "r", encoding="utf8") as file: - data = json.load(file) - piper_id_models = [key + " VITS-onnx" for key in data.keys()] - - return piper_id_models - - -def replace_text_in_json(file_path, key_to_replace, new_text, condition=None): - # Read the JSON file - with open(file_path, "r", encoding="utf-8") as file: - data = json.load(file) - - # Modify the specified key's value with the new text - if key_to_replace in data: - if condition: - value_condition = condition - else: - value_condition = data[key_to_replace] - - if data[key_to_replace] == value_condition: - data[key_to_replace] = new_text - - # Write the modified content back to the JSON file - with open(file_path, "w") as file: - json.dump( - data, file, indent=2 - ) # Write the modified data back to the file with indentation for readability - - -def load_piper_model( - model: str, - data_dir: list, - download_dir: str = "", - update_voices: bool = False, -): - from piper import PiperVoice - from piper.download import ensure_voice_exists, find_voice, get_voices - - try: - import onnxruntime as rt - - if rt.get_device() == "GPU" and os.environ.get("SONITR_DEVICE") == "cuda": - logger.debug("onnxruntime device > GPU") - cuda = True - else: - logger.info( - "onnxruntime device > CPU" - ) # try pip install onnxruntime-gpu - cuda = False - except Exception as error: - raise TTS_OperationError(f"onnxruntime error: {str(error)}") - - # Disable CUDA in Windows - if platform.system() == "Windows": - logger.info("Employing CPU 
exclusivity with Piper TTS") - cuda = False - - if not download_dir: - # Download to first data directory by default - download_dir = data_dir[0] - else: - data_dir = [os.path.join(data_dir[0], download_dir)] - - # Download voice if file doesn't exist - model_path = Path(model) - if not model_path.exists(): - # Load voice info - voices_info = get_voices(download_dir, update_voices=update_voices) - - # Resolve aliases for backwards compatibility with old voice names - aliases_info: Dict[str, Any] = {} - for voice_info in voices_info.values(): - for voice_alias in voice_info.get("aliases", []): - aliases_info[voice_alias] = {"_is_alias": True, **voice_info} - - voices_info.update(aliases_info) - ensure_voice_exists(model, data_dir, download_dir, voices_info) - model, config = find_voice(model, data_dir) - - replace_text_in_json( - config, "phoneme_type", "espeak", "PhonemeType.ESPEAK" - ) - - # Load voice - voice = PiperVoice.load(model, config_path=config, use_cuda=cuda) - - return voice - - -def synthesize_text_to_audio_np_array(voice, text, synthesize_args): - audio_stream = voice.synthesize_stream_raw(text, **synthesize_args) - - # Collect the audio bytes into a single NumPy array - audio_data = b"" - for audio_bytes in audio_stream: - audio_data += audio_bytes - - # Ensure correct data type and convert audio bytes to NumPy array - audio_np = np.frombuffer(audio_data, dtype=np.int16) - return audio_np - - -def segments_vits_onnx_tts(filtered_onnx_vits_segments, TRANSLATE_AUDIO_TO): - """ - Install: - pip install -q piper-tts==1.2.0 onnxruntime-gpu # for cuda118 - """ - - data_dir = [ - str(Path.cwd()) - ] # "Data directory to check for downloaded models (default: current directory)" - download_dir = "PIPER_MODELS" - # model_name = "en_US-lessac-medium" tts_name in a dict like VITS - update_voices = True # "Download latest voices.json during startup", - - synthesize_args = { - "speaker_id": None, - "length_scale": 1.0, - "noise_scale": 0.667, - "noise_w": 0.8, - "sentence_silence": 0.0, - } - - filtered_segments = filtered_onnx_vits_segments["segments"] - # Sorting the segments by 'tts_name' - sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"]) - logger.debug(sorted_segments) - - model_name_key = None - for segment in tqdm(sorted_segments): - speaker = segment["speaker"] # noqa - text = segment["text"] - start = segment["start"] - tts_name = segment["tts_name"].replace(" VITS-onnx", "") - - if tts_name != model_name_key: - model_name_key = tts_name - model = load_piper_model( - tts_name, data_dir, download_dir, update_voices - ) - sampling_rate = model.config.sample_rate - - # make the tts audio - filename = f"audio/{start}.ogg" - logger.info(f"{text} >> {filename}") - try: - # Infer - speech_output = synthesize_text_to_audio_np_array( - model, text, synthesize_args - ) - data_tts = pad_array( - speech_output, # .cpu().numpy().squeeze().astype(np.float32), - sampling_rate, - ) - # Save file - sf.write( - file=filename, - samplerate=sampling_rate, - data=data_tts, - format="ogg", - subtype="vorbis", - ) - verify_saved_file_and_size(filename) - except Exception as error: - error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) - gc.collect() - torch.cuda.empty_cache() - try: - del model - gc.collect() - torch.cuda.empty_cache() - except Exception as error: - logger.error(str(error)) - gc.collect() - torch.cuda.empty_cache() - - -# ===================================== -# CLOSEAI TTS -# ===================================== - - -def segments_openai_tts( - 
filtered_openai_tts_segments, TRANSLATE_AUDIO_TO -): - from openai import OpenAI - - client = OpenAI() - sampling_rate = 24000 - - # filtered_segments = filtered_openai_tts_segments['segments'] - # Sorting the segments by 'tts_name' - # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name']) - - for segment in tqdm(filtered_openai_tts_segments["segments"]): - speaker = segment["speaker"] # noqa - text = segment["text"].strip() - start = segment["start"] - tts_name = segment["tts_name"] - - # make the tts audio - filename = f"audio/{start}.ogg" - logger.info(f"{text} >> {filename}") - - try: - # Request - response = client.audio.speech.create( - model="tts-1-hd" if "HD" in tts_name else "tts-1", - voice=tts_name.split()[0][1:], - response_format="wav", - input=text - ) - - audio_bytes = b'' - for data in response.iter_bytes(chunk_size=4096): - audio_bytes += data - - speech_output = np.frombuffer(audio_bytes, dtype=np.int16) - - # Save file - data_tts = pad_array( - speech_output[240:], - sampling_rate, - ) - - sf.write( - file=filename, - samplerate=sampling_rate, - data=data_tts, - format="ogg", - subtype="vorbis", - ) - verify_saved_file_and_size(filename) - - except Exception as error: - error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) - - -# ===================================== -# Select task TTS -# ===================================== - - -def find_spkr(pattern, speaker_to_voice, segments): - return [ - speaker - for speaker, voice in speaker_to_voice.items() - if pattern.match(voice) and any( - segment["speaker"] == speaker for segment in segments - ) - ] - - -def filter_by_speaker(speakers, segments): - return { - "segments": [ - segment - for segment in segments - if segment["speaker"] in speakers - ] - } - - -def audio_segmentation_to_voice( - result_diarize, - TRANSLATE_AUDIO_TO, - is_gui, - tts_voice00, - tts_voice01="", - tts_voice02="", - tts_voice03="", - tts_voice04="", - tts_voice05="", - tts_voice06="", - tts_voice07="", - tts_voice08="", - tts_voice09="", - tts_voice10="", - tts_voice11="", - dereverb_automatic=True, - model_id_bark="suno/bark-small", - model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2", - delete_previous_automatic=True, -): - - remove_directory_contents("audio") - - # Mapping speakers to voice variables - speaker_to_voice = { - "SPEAKER_00": tts_voice00, - "SPEAKER_01": tts_voice01, - "SPEAKER_02": tts_voice02, - "SPEAKER_03": tts_voice03, - "SPEAKER_04": tts_voice04, - "SPEAKER_05": tts_voice05, - "SPEAKER_06": tts_voice06, - "SPEAKER_07": tts_voice07, - "SPEAKER_08": tts_voice08, - "SPEAKER_09": tts_voice09, - "SPEAKER_10": tts_voice10, - "SPEAKER_11": tts_voice11, - } - - # Assign 'SPEAKER_00' to segments without a 'speaker' key - for segment in result_diarize["segments"]: - if "speaker" not in segment: - segment["speaker"] = "SPEAKER_00" - logger.warning( - "NO SPEAKER DETECT IN SEGMENT: First TTS will be used in the" - f" segment time {segment['start'], segment['text']}" - ) - # Assign the TTS name - segment["tts_name"] = speaker_to_voice[segment["speaker"]] - - # Find TTS method - pattern_edge = re.compile(r".*-(Male|Female)$") - pattern_bark = re.compile(r".* BARK$") - pattern_vits = re.compile(r".* VITS$") - pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$") - pattern_vits_onnx = re.compile(r".* VITS-onnx$") - pattern_openai_tts = re.compile(r".* OpenAI-TTS$") - - all_segments = result_diarize["segments"] - - speakers_edge = find_spkr(pattern_edge, speaker_to_voice, all_segments) - 
speakers_bark = find_spkr(pattern_bark, speaker_to_voice, all_segments) - speakers_vits = find_spkr(pattern_vits, speaker_to_voice, all_segments) - speakers_coqui = find_spkr(pattern_coqui, speaker_to_voice, all_segments) - speakers_vits_onnx = find_spkr( - pattern_vits_onnx, speaker_to_voice, all_segments - ) - speakers_openai_tts = find_spkr( - pattern_openai_tts, speaker_to_voice, all_segments - ) - - # Filter method in segments - filtered_edge = filter_by_speaker(speakers_edge, all_segments) - filtered_bark = filter_by_speaker(speakers_bark, all_segments) - filtered_vits = filter_by_speaker(speakers_vits, all_segments) - filtered_coqui = filter_by_speaker(speakers_coqui, all_segments) - filtered_vits_onnx = filter_by_speaker(speakers_vits_onnx, all_segments) - filtered_openai_tts = filter_by_speaker(speakers_openai_tts, all_segments) - - # Infer - if filtered_edge["segments"]: - logger.info(f"EDGE TTS: {speakers_edge}") - segments_egde_tts(filtered_edge, TRANSLATE_AUDIO_TO, is_gui) # mp3 - if filtered_bark["segments"]: - logger.info(f"BARK TTS: {speakers_bark}") - segments_bark_tts( - filtered_bark, TRANSLATE_AUDIO_TO, model_id_bark - ) # wav - if filtered_vits["segments"]: - logger.info(f"VITS TTS: {speakers_vits}") - segments_vits_tts(filtered_vits, TRANSLATE_AUDIO_TO) # wav - if filtered_coqui["segments"]: - logger.info(f"Coqui TTS: {speakers_coqui}") - segments_coqui_tts( - filtered_coqui, - TRANSLATE_AUDIO_TO, - model_id_coqui, - speakers_coqui, - delete_previous_automatic, - dereverb_automatic, - ) # wav - if filtered_vits_onnx["segments"]: - logger.info(f"PIPER TTS: {speakers_vits_onnx}") - segments_vits_onnx_tts(filtered_vits_onnx, TRANSLATE_AUDIO_TO) # wav - if filtered_openai_tts["segments"]: - logger.info(f"OpenAI TTS: {speakers_openai_tts}") - segments_openai_tts(filtered_openai_tts, TRANSLATE_AUDIO_TO) # wav - - [result.pop("tts_name", None) for result in result_diarize["segments"]] - return [ - speakers_edge, - speakers_bark, - speakers_vits, - speakers_coqui, - speakers_vits_onnx, - speakers_openai_tts - ] - - -def accelerate_segments( - result_diarize, - max_accelerate_audio, - valid_speakers, - acceleration_rate_regulation=False, - folder_output="audio2", -): - logger.info("Apply acceleration") - - ( - speakers_edge, - speakers_bark, - speakers_vits, - speakers_coqui, - speakers_vits_onnx, - speakers_openai_tts - ) = valid_speakers - - create_directories(f"{folder_output}/audio/") - remove_directory_contents(f"{folder_output}/audio/") - - audio_files = [] - speakers_list = [] - - max_count_segments_idx = len(result_diarize["segments"]) - 1 - - for i, segment in tqdm(enumerate(result_diarize["segments"])): - text = segment["text"] # noqa - start = segment["start"] - end = segment["end"] - speaker = segment["speaker"] - - # find name audio - # if speaker in speakers_edge: - filename = f"audio/{start}.ogg" - # elif speaker in speakers_bark + speakers_vits + speakers_coqui + speakers_vits_onnx: - # filename = f"audio/{start}.wav" # wav - - # duration - duration_true = end - start - duration_tts = librosa.get_duration(filename=filename) - - # Accelerate percentage - acc_percentage = duration_tts / duration_true - - # Smoth - if acceleration_rate_regulation and acc_percentage >= 1.3: - try: - next_segment = result_diarize["segments"][ - min(max_count_segments_idx, i + 1) - ] - next_start = next_segment["start"] - next_speaker = next_segment["speaker"] - duration_with_next_start = next_start - start - - if duration_with_next_start > duration_true: - extra_time = 
duration_with_next_start - duration_true - - if speaker == next_speaker: - # half - smoth_duration = duration_true + (extra_time * 0.5) - else: - # 7/10 - smoth_duration = duration_true + (extra_time * 0.7) - logger.debug( - f"Base acc: {acc_percentage}, " - f"smoth acc: {duration_tts / smoth_duration}" - ) - acc_percentage = max(1.2, (duration_tts / smoth_duration)) - - except Exception as error: - logger.error(str(error)) - - if acc_percentage > max_accelerate_audio: - acc_percentage = max_accelerate_audio - elif acc_percentage <= 1.15 and acc_percentage >= 0.8: - acc_percentage = 1.0 - elif acc_percentage <= 0.79: - acc_percentage = 0.8 - - # Round - acc_percentage = round(acc_percentage + 0.0, 1) - - # Format read if need - if speaker in speakers_edge: - info_enc = sf.info(filename).format - else: - info_enc = "OGG" - - # Apply aceleration or opposite to the audio file in folder_output folder - if acc_percentage == 1.0 and info_enc == "OGG": - copy_files(filename, f"{folder_output}{os.sep}audio") - else: - os.system( - f"ffmpeg -y -loglevel panic -i {filename} -filter:a atempo={acc_percentage} {folder_output}/{filename}" - ) - - if logger.isEnabledFor(logging.DEBUG): - duration_create = librosa.get_duration( - filename=f"{folder_output}/{filename}" - ) - logger.debug( - f"acc_percen is {acc_percentage}, tts duration " - f"is {duration_tts}, new duration is {duration_create}" - f", for {filename}" - ) - - audio_files.append(f"{folder_output}/{filename}") - speaker = "TTS Speaker {:02d}".format(int(speaker[-2:]) + 1) - speakers_list.append(speaker) - - return audio_files, speakers_list - - -# ===================================== -# Tone color converter -# ===================================== - - -def se_process_audio_segments( - source_seg, tone_color_converter, device, remove_previous_processed=True -): - # list wav seg - source_audio_segs = glob.glob(f"{source_seg}/*.wav") - if not source_audio_segs: - raise ValueError( - f"No audio segments found in {str(source_audio_segs)}" - ) - - source_se_path = os.path.join(source_seg, "se.pth") - - # if exist not create wav - if os.path.isfile(source_se_path): - se = torch.load(source_se_path).to(device) - logger.debug(f"Previous created {source_se_path}") - else: - se = tone_color_converter.extract_se(source_audio_segs, source_se_path) - - return se - - -def create_wav_vc( - valid_speakers, - segments_base, - audio_name, - max_segments=10, - target_dir="processed", - get_vocals_dereverb=False, -): - # valid_speakers = list({item['speaker'] for item in segments_base}) - - # Before function delete automatic delete_previous_automatic - output_dir = os.path.join(".", target_dir) # remove content - # remove_directory_contents(output_dir) - - path_source_segments = [] - path_target_segments = [] - for speaker in valid_speakers: - filtered_speaker = [ - segment - for segment in segments_base - if segment["speaker"] == speaker - ] - if len(filtered_speaker) > 4: - filtered_speaker = filtered_speaker[1:] - - dir_name_speaker = speaker + audio_name - dir_name_speaker_tts = "tts" + speaker + audio_name - dir_path_speaker = os.path.join(output_dir, dir_name_speaker) - dir_path_speaker_tts = os.path.join(output_dir, dir_name_speaker_tts) - create_directories([dir_path_speaker, dir_path_speaker_tts]) - - path_target_segments.append(dir_path_speaker) - path_source_segments.append(dir_path_speaker_tts) - - # create wav - max_segments_count = 0 - for seg in filtered_speaker: - duration = float(seg["end"]) - float(seg["start"]) - if duration > 3.0 and duration 
< 18.0: - logger.info( - f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}' - ) - name_new_wav = str(seg["start"]) - - check_segment_audio_target_file = os.path.join( - dir_path_speaker, f"{name_new_wav}.wav" - ) - - if os.path.exists(check_segment_audio_target_file): - logger.debug( - "Segment vc source exists: " - f"{check_segment_audio_target_file}" - ) - pass - else: - create_wav_file_vc( - sample_name=name_new_wav, - audio_wav="audio.wav", - start=(float(seg["start"]) + 1.0), - end=(float(seg["end"]) - 1.0), - output_final_path=dir_path_speaker, - get_vocals_dereverb=get_vocals_dereverb, - ) - - file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg" - # copy_files(file_name_tts, os.path.join(output_dir, dir_name_speaker_tts) - convert_to_xtts_good_sample( - file_name_tts, dir_path_speaker_tts - ) - - max_segments_count += 1 - if max_segments_count == max_segments: - break - - if max_segments_count == 0: - logger.info("Taking the first segment") - seg = filtered_speaker[0] - logger.info( - f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}' - ) - max_duration = float(seg["end"]) - float(seg["start"]) - max_duration = max(1.0, min(max_duration, 18.0)) - - name_new_wav = str(seg["start"]) - create_wav_file_vc( - sample_name=name_new_wav, - audio_wav="audio.wav", - start=(float(seg["start"])), - end=(float(seg["start"]) + max_duration), - output_final_path=dir_path_speaker, - get_vocals_dereverb=get_vocals_dereverb, - ) - - file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg" - # copy_files(file_name_tts, os.path.join(output_dir, dir_name_speaker_tts) - convert_to_xtts_good_sample(file_name_tts, dir_path_speaker_tts) - - logger.debug(f"Base: {str(path_source_segments)}") - logger.debug(f"Target: {str(path_target_segments)}") - - return path_source_segments, path_target_segments - - -def toneconverter_openvoice( - result_diarize, - preprocessor_max_segments, - remove_previous_process=True, - get_vocals_dereverb=False, - model="openvoice", -): - audio_path = "audio.wav" - # se_path = "se.pth" - target_dir = "processed" - create_directories(target_dir) - - from openvoice import se_extractor - from openvoice.api import ToneColorConverter - - audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}" - # se_path = os.path.join(target_dir, audio_name, 'se.pth') - - # create wav seg original and target - - valid_speakers = list( - {item["speaker"] for item in result_diarize["segments"]} - ) - - logger.info("Openvoice preprocessor...") - - if remove_previous_process: - remove_directory_contents(target_dir) - - path_source_segments, path_target_segments = create_wav_vc( - valid_speakers, - result_diarize["segments"], - audio_name, - max_segments=preprocessor_max_segments, - get_vocals_dereverb=get_vocals_dereverb, - ) - - logger.info("Openvoice loading model...") - model_path_openvoice = "./OPENVOICE_MODELS" - url_model_openvoice = "https://huggingface.co/myshell-ai/OpenVoice/resolve/main/checkpoints/converter" - - if "v2" in model: - model_path = os.path.join(model_path_openvoice, "v2") - url_model_openvoice = url_model_openvoice.replace( - "OpenVoice", "OpenVoiceV2" - ).replace("checkpoints/", "") - else: - model_path = os.path.join(model_path_openvoice, "v1") - create_directories(model_path) - - config_url = f"{url_model_openvoice}/config.json" - checkpoint_url = f"{url_model_openvoice}/checkpoint.pth" - - config_path = download_manager(url=config_url, 
path=model_path) - checkpoint_path = download_manager( - url=checkpoint_url, path=model_path - ) - - device = os.environ.get("SONITR_DEVICE") - tone_color_converter = ToneColorConverter(config_path, device=device) - tone_color_converter.load_ckpt(checkpoint_path) - - logger.info("Openvoice tone color converter:") - global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress") - - for source_seg, target_seg, speaker in zip( - path_source_segments, path_target_segments, valid_speakers - ): - # source_se_path = os.path.join(source_seg, 'se.pth') - source_se = se_process_audio_segments(source_seg, tone_color_converter, device) - # target_se_path = os.path.join(target_seg, 'se.pth') - target_se = se_process_audio_segments(target_seg, tone_color_converter, device) - - # Iterate throw segments - encode_message = "@MyShell" - filtered_speaker = [ - segment - for segment in result_diarize["segments"] - if segment["speaker"] == speaker - ] - for seg in filtered_speaker: - src_path = ( - save_path - ) = f"audio2/audio/{str(seg['start'])}.ogg" # overwrite - logger.debug(f"{src_path}") - - tone_color_converter.convert( - audio_src_path=src_path, - src_se=source_se, - tgt_se=target_se, - output_path=save_path, - message=encode_message, - ) - - global_progress_bar.update(1) - - global_progress_bar.close() - - try: - del tone_color_converter - gc.collect() - torch.cuda.empty_cache() - except Exception as error: - logger.error(str(error)) - gc.collect() - torch.cuda.empty_cache() - - -def toneconverter_freevc( - result_diarize, - remove_previous_process=True, - get_vocals_dereverb=False, -): - audio_path = "audio.wav" - target_dir = "processed" - create_directories(target_dir) - - from openvoice import se_extractor - - audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}" - - # create wav seg; original is target and dubbing is source - valid_speakers = list( - {item["speaker"] for item in result_diarize["segments"]} - ) - - logger.info("FreeVC preprocessor...") - - if remove_previous_process: - remove_directory_contents(target_dir) - - path_source_segments, path_target_segments = create_wav_vc( - valid_speakers, - result_diarize["segments"], - audio_name, - max_segments=1, - get_vocals_dereverb=get_vocals_dereverb, - ) - - logger.info("FreeVC loading model...") - device_id = os.environ.get("SONITR_DEVICE") - device = None if device_id == "cpu" else device_id - try: - from TTS.api import TTS - tts = TTS( - model_name="voice_conversion_models/multilingual/vctk/freevc24", - progress_bar=False - ).to(device) - except Exception as error: - logger.error(str(error)) - logger.error("Error loading the FreeVC model.") - return - - logger.info("FreeVC process:") - global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress") - - for source_seg, target_seg, speaker in zip( - path_source_segments, path_target_segments, valid_speakers - ): - - filtered_speaker = [ - segment - for segment in result_diarize["segments"] - if segment["speaker"] == speaker - ] - - files_and_directories = os.listdir(target_seg) - wav_files = [file for file in files_and_directories if file.endswith(".wav")] - original_wav_audio_segment = os.path.join(target_seg, wav_files[0]) - - for seg in filtered_speaker: - - src_path = ( - save_path - ) = f"audio2/audio/{str(seg['start'])}.ogg" # overwrite - logger.debug(f"{src_path} - {original_wav_audio_segment}") - - wav = tts.voice_conversion( - source_wav=src_path, - target_wav=original_wav_audio_segment, 
- ) - - sf.write( - file=save_path, - samplerate=tts.voice_converter.vc_config.audio.output_sample_rate, - data=wav, - format="ogg", - subtype="vorbis", - ) - - global_progress_bar.update(1) - - global_progress_bar.close() - - try: - del tts - gc.collect() - torch.cuda.empty_cache() - except Exception as error: - logger.error(str(error)) - gc.collect() - torch.cuda.empty_cache() - - -def toneconverter( - result_diarize, - preprocessor_max_segments, - remove_previous_process=True, - get_vocals_dereverb=False, - method_vc="freevc" -): - - if method_vc == "freevc": - if preprocessor_max_segments > 1: - logger.info("FreeVC only uses one segment.") - return toneconverter_freevc( - result_diarize, - remove_previous_process=remove_previous_process, - get_vocals_dereverb=get_vocals_dereverb, - ) - elif "openvoice" in method_vc: - return toneconverter_openvoice( - result_diarize, - preprocessor_max_segments, - remove_previous_process=remove_previous_process, - get_vocals_dereverb=get_vocals_dereverb, - model=method_vc, - ) - - -if __name__ == "__main__": - from segments import result_diarize - - audio_segmentation_to_voice( - result_diarize, - TRANSLATE_AUDIO_TO="en", - max_accelerate_audio=2.1, - is_gui=True, - tts_voice00="en-facebook-mms VITS", - tts_voice01="en-CA-ClaraNeural-Female", - tts_voice02="en-GB-ThomasNeural-Male", - tts_voice03="en-GB-SoniaNeural-Female", - tts_voice04="en-NZ-MitchellNeural-Male", - tts_voice05="en-GB-MaisieNeural-Female", - ) +from gtts import gTTS +import edge_tts, asyncio, json, glob # noqa +from tqdm import tqdm +import librosa, os, re, torch, gc, subprocess # noqa +from .language_configuration import ( + fix_code_language, + BARK_VOICES_LIST, + VITS_VOICES_LIST, +) +from .utils import ( + download_manager, + create_directories, + copy_files, + rename_file, + remove_directory_contents, + remove_files, + run_command, +) +import numpy as np +from typing import Any, Dict +from pathlib import Path +import soundfile as sf +import platform +import logging +import traceback +from .logging_setup import logger + + +class TTS_OperationError(Exception): + def __init__(self, message="The operation did not complete successfully."): + self.message = message + super().__init__(self.message) + + +def verify_saved_file_and_size(filename): + if not os.path.exists(filename): + raise TTS_OperationError(f"File '{filename}' was not saved.") + if os.path.getsize(filename) == 0: + raise TTS_OperationError( + f"File '{filename}' has a zero size. 
" + "Related to incorrect TTS for the target language" + ) + + +def error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename): + traceback.print_exc() + logger.error(f"Error: {str(error)}") + try: + from tempfile import TemporaryFile + + tts = gTTS(segment["text"], lang=fix_code_language(TRANSLATE_AUDIO_TO)) + # tts.save(filename) + f = TemporaryFile() + tts.write_to_fp(f) + + # Reset the file pointer to the beginning of the file + f.seek(0) + + # Read audio data from the TemporaryFile using soundfile + audio_data, samplerate = sf.read(f) + f.close() # Close the TemporaryFile + sf.write( + filename, audio_data, samplerate, format="ogg", subtype="vorbis" + ) + + logger.warning( + 'TTS auxiliary will be utilized ' + f'rather than TTS: {segment["tts_name"]}' + ) + verify_saved_file_and_size(filename) + except Exception as error: + logger.critical(f"Error: {str(error)}") + sample_rate_aux = 22050 + duration = float(segment["end"]) - float(segment["start"]) + data = np.zeros(int(sample_rate_aux * duration)).astype(np.float32) + sf.write( + filename, data, sample_rate_aux, format="ogg", subtype="vorbis" + ) + logger.error("Audio will be replaced -> [silent audio].") + verify_saved_file_and_size(filename) + + +def pad_array(array, sr): + + if isinstance(array, list): + array = np.array(array) + + if not array.shape[0]: + raise ValueError("The generated audio does not contain any data") + + valid_indices = np.where(np.abs(array) > 0.001)[0] + + if len(valid_indices) == 0: + logger.debug(f"No valid indices: {array}") + return array + + try: + pad_indice = int(0.1 * sr) + start_pad = max(0, valid_indices[0] - pad_indice) + end_pad = min(len(array), valid_indices[-1] + 1 + pad_indice) + padded_array = array[start_pad:end_pad] + return padded_array + except Exception as error: + logger.error(str(error)) + return array + + +# ===================================== +# EDGE TTS +# ===================================== + + +def edge_tts_voices_list(): + try: + completed_process = subprocess.run( + ["edge-tts", "--list-voices"], capture_output=True, text=True + ) + lines = completed_process.stdout.strip().split("\n") + except Exception as error: + logger.debug(str(error)) + lines = [] + + voices = [] + for line in lines: + if line.startswith("Name: "): + voice_entry = {} + voice_entry["Name"] = line.split(": ")[1] + elif line.startswith("Gender: "): + voice_entry["Gender"] = line.split(": ")[1] + voices.append(voice_entry) + + formatted_voices = [ + f"{entry['Name']}-{entry['Gender']}" for entry in voices + ] + + if not formatted_voices: + logger.warning( + "The list of Edge TTS voices could not be obtained, " + "switching to an alternative method" + ) + tts_voice_list = asyncio.new_event_loop().run_until_complete( + edge_tts.list_voices() + ) + formatted_voices = sorted( + [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list] + ) + + if not formatted_voices: + logger.error("Can't get EDGE TTS - list voices") + + return formatted_voices + + +def segments_egde_tts(filtered_edge_segments, TRANSLATE_AUDIO_TO, is_gui): + for segment in tqdm(filtered_edge_segments["segments"]): + speaker = segment["speaker"] # noqa + text = segment["text"] + start = segment["start"] + tts_name = segment["tts_name"] + + # make the tts audio + filename = f"audio/{start}.ogg" + temp_file = filename[:-3] + "mp3" + + logger.info(f"{text} >> {filename}") + try: + if is_gui: + asyncio.run( + edge_tts.Communicate( + text, "-".join(tts_name.split("-")[:-1]) + ).save(temp_file) + ) + else: + # nest_asyncio.apply() if not 
is_gui else None + command = f'edge-tts -t "{text}" -v "{tts_name.replace("-Male", "").replace("-Female", "")}" --write-media "{temp_file}"' + run_command(command) + verify_saved_file_and_size(temp_file) + + data, sample_rate = sf.read(temp_file) + data = pad_array(data, sample_rate) + # os.remove(temp_file) + + # Save file + sf.write( + file=filename, + samplerate=sample_rate, + data=data, + format="ogg", + subtype="vorbis", + ) + verify_saved_file_and_size(filename) + + except Exception as error: + error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) + + +# ===================================== +# BARK TTS +# ===================================== + + +def segments_bark_tts( + filtered_bark_segments, TRANSLATE_AUDIO_TO, model_id_bark="suno/bark-small" +): + from transformers import AutoProcessor, BarkModel + from optimum.bettertransformer import BetterTransformer + + device = os.environ.get("SONITR_DEVICE") + torch_dtype_env = torch.float16 if device == "cuda" else torch.float32 + + # load model bark + model = BarkModel.from_pretrained( + model_id_bark, torch_dtype=torch_dtype_env + ).to(device) + model = model.to(device) + processor = AutoProcessor.from_pretrained( + model_id_bark, return_tensors="pt" + ) # , padding=True + if device == "cuda": + # convert to bettertransformer + model = BetterTransformer.transform(model, keep_original_model=False) + # enable CPU offload + # model.enable_cpu_offload() + sampling_rate = model.generation_config.sample_rate + + # filtered_segments = filtered_bark_segments['segments'] + # Sorting the segments by 'tts_name' + # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name']) + # logger.debug(sorted_segments) + + for segment in tqdm(filtered_bark_segments["segments"]): + speaker = segment["speaker"] # noqa + text = segment["text"] + start = segment["start"] + tts_name = segment["tts_name"] + + inputs = processor(text, voice_preset=BARK_VOICES_LIST[tts_name]).to( + device + ) + + # make the tts audio + filename = f"audio/{start}.ogg" + logger.info(f"{text} >> {filename}") + try: + # Infer + with torch.inference_mode(): + speech_output = model.generate( + **inputs, + do_sample=True, + fine_temperature=0.4, + coarse_temperature=0.8, + pad_token_id=processor.tokenizer.pad_token_id, + ) + # Save file + data_tts = pad_array( + speech_output.cpu().numpy().squeeze().astype(np.float32), + sampling_rate, + ) + sf.write( + file=filename, + samplerate=sampling_rate, + data=data_tts, + format="ogg", + subtype="vorbis", + ) + verify_saved_file_and_size(filename) + except Exception as error: + error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) + gc.collect() + torch.cuda.empty_cache() + try: + del processor + del model + gc.collect() + torch.cuda.empty_cache() + except Exception as error: + logger.error(str(error)) + gc.collect() + torch.cuda.empty_cache() + + +# ===================================== +# VITS TTS +# ===================================== + + +def uromanize(input_string): + """Convert non-Roman strings to Roman using the `uroman` perl package.""" + # script_path = os.path.join(uroman_path, "bin", "uroman.pl") + + if not os.path.exists("./uroman"): + logger.info( + "Clonning repository uroman https://github.com/isi-nlp/uroman.git" + " for romanize the text" + ) + process = subprocess.Popen( + ["git", "clone", "https://github.com/isi-nlp/uroman.git"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = process.communicate() + script_path = os.path.join("./uroman", "bin", "uroman.pl") + 
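    # Requires a system `perl` binary on PATH; the uroman repository is cloned
    # into ./uroman on first use.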
+ command = ["perl", script_path] + + process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + # Execute the perl command + stdout, stderr = process.communicate(input=input_string.encode()) + + if process.returncode != 0: + raise ValueError(f"Error {process.returncode}: {stderr.decode()}") + + # Return the output as a string and skip the new-line character at the end + return stdout.decode()[:-1] + + +def segments_vits_tts(filtered_vits_segments, TRANSLATE_AUDIO_TO): + from transformers import VitsModel, AutoTokenizer + + filtered_segments = filtered_vits_segments["segments"] + # Sorting the segments by 'tts_name' + sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"]) + logger.debug(sorted_segments) + + model_name_key = None + for segment in tqdm(sorted_segments): + speaker = segment["speaker"] # noqa + text = segment["text"] + start = segment["start"] + tts_name = segment["tts_name"] + + if tts_name != model_name_key: + model_name_key = tts_name + model = VitsModel.from_pretrained(VITS_VOICES_LIST[tts_name]) + tokenizer = AutoTokenizer.from_pretrained( + VITS_VOICES_LIST[tts_name] + ) + sampling_rate = model.config.sampling_rate + + if tokenizer.is_uroman: + romanize_text = uromanize(text) + logger.debug(f"Romanize text: {romanize_text}") + inputs = tokenizer(romanize_text, return_tensors="pt") + else: + inputs = tokenizer(text, return_tensors="pt") + + # make the tts audio + filename = f"audio/{start}.ogg" + logger.info(f"{text} >> {filename}") + try: + # Infer + with torch.no_grad(): + speech_output = model(**inputs).waveform + + data_tts = pad_array( + speech_output.cpu().numpy().squeeze().astype(np.float32), + sampling_rate, + ) + # Save file + sf.write( + file=filename, + samplerate=sampling_rate, + data=data_tts, + format="ogg", + subtype="vorbis", + ) + verify_saved_file_and_size(filename) + except Exception as error: + error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) + gc.collect() + torch.cuda.empty_cache() + try: + del tokenizer + del model + gc.collect() + torch.cuda.empty_cache() + except Exception as error: + logger.error(str(error)) + gc.collect() + torch.cuda.empty_cache() + + +# ===================================== +# Coqui XTTS +# ===================================== + + +def coqui_xtts_voices_list(): + main_folder = "_XTTS_" + pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$") + pattern_automatic_speaker = re.compile(r"AUTOMATIC_SPEAKER_\d+\.wav$") + + # List only files in the directory matching the pattern but not matching + # AUTOMATIC_SPEAKER_00.wav, AUTOMATIC_SPEAKER_01.wav, etc. 
+ wav_voices = [ + "_XTTS_/" + f + for f in os.listdir(main_folder) + if os.path.isfile(os.path.join(main_folder, f)) + and pattern_coqui.match(f) + and not pattern_automatic_speaker.match(f) + ] + + return ["_XTTS_/AUTOMATIC.wav"] + wav_voices + + +def seconds_to_hhmmss_ms(seconds): + hours = seconds // 3600 + minutes = (seconds % 3600) // 60 + seconds = seconds % 60 + milliseconds = int((seconds - int(seconds)) * 1000) + return "%02d:%02d:%02d.%03d" % (hours, minutes, int(seconds), milliseconds) + + +def audio_trimming(audio_path, destination, start, end): + if isinstance(start, (int, float)): + start = seconds_to_hhmmss_ms(start) + if isinstance(end, (int, float)): + end = seconds_to_hhmmss_ms(end) + + if destination: + file_directory = destination + else: + file_directory = os.path.dirname(audio_path) + + file_name = os.path.splitext(os.path.basename(audio_path))[0] + file_ = f"{file_name}_trim.wav" + # file_ = f'{os.path.splitext(audio_path)[0]}_trim.wav' + output_path = os.path.join(file_directory, file_) + + # -t (duration from -ss) | -to (time stop) | -af silenceremove=1:0:-50dB (remove silence) + command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ss {start} -to {end} -acodec pcm_s16le -f wav "{output_path}"' + run_command(command) + + return output_path + + +def convert_to_xtts_good_sample(audio_path: str = "", destination: str = ""): + if destination: + file_directory = destination + else: + file_directory = os.path.dirname(audio_path) + + file_name = os.path.splitext(os.path.basename(audio_path))[0] + file_ = f"{file_name}_good_sample.wav" + # file_ = f'{os.path.splitext(audio_path)[0]}_good_sample.wav' + mono_path = os.path.join(file_directory, file_) # get root + + command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 1 -ar 22050 -sample_fmt s16 -f wav "{mono_path}"' + run_command(command) + + return mono_path + + +def sanitize_file_name(file_name): + import unicodedata + + # Normalize the string to NFKD form to separate combined characters into + # base characters and diacritics + normalized_name = unicodedata.normalize("NFKD", file_name) + # Replace any non-ASCII characters or special symbols with an underscore + sanitized_name = re.sub(r"[^\w\s.-]", "_", normalized_name) + return sanitized_name + + +def create_wav_file_vc( + sample_name="", # name final file + audio_wav="", # path + start=None, # trim start + end=None, # trim end + output_final_path="_XTTS_", + get_vocals_dereverb=True, +): + sample_name = sample_name if sample_name else "default_name" + sample_name = sanitize_file_name(sample_name) + audio_wav = audio_wav if isinstance(audio_wav, str) else audio_wav.name + + BASE_DIR = ( + "." 
# os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ) + + output_dir = os.path.join(BASE_DIR, "clean_song_output") # remove content + # remove_directory_contents(output_dir) + + if start or end: + # Cut file + audio_segment = audio_trimming(audio_wav, output_dir, start, end) + else: + # Complete file + audio_segment = audio_wav + + from .mdx_net import process_uvr_task + + try: + _, _, _, _, audio_segment = process_uvr_task( + orig_song_path=audio_segment, + main_vocals=True, + dereverb=get_vocals_dereverb, + ) + except Exception as error: + logger.error(str(error)) + + sample = convert_to_xtts_good_sample(audio_segment) + + sample_name = f"{sample_name}.wav" + sample_rename = rename_file(sample, sample_name) + + copy_files(sample_rename, output_final_path) + + final_sample = os.path.join(output_final_path, sample_name) + if os.path.exists(final_sample): + logger.info(final_sample) + return final_sample + else: + raise Exception(f"Error wav: {final_sample}") + + +def create_new_files_for_vc( + speakers_coqui, + segments_base, + dereverb_automatic=True +): + # before function delete automatic delete_previous_automatic + output_dir = os.path.join(".", "clean_song_output") # remove content + remove_directory_contents(output_dir) + + for speaker in speakers_coqui: + filtered_speaker = [ + segment + for segment in segments_base + if segment["speaker"] == speaker + ] + if len(filtered_speaker) > 4: + filtered_speaker = filtered_speaker[1:] + if filtered_speaker[0]["tts_name"] == "_XTTS_/AUTOMATIC.wav": + name_automatic_wav = f"AUTOMATIC_{speaker}" + if os.path.exists(f"_XTTS_/{name_automatic_wav}.wav"): + logger.info(f"WAV automatic {speaker} exists") + # path_wav = path_automatic_wav + pass + else: + # create wav + wav_ok = False + for seg in filtered_speaker: + duration = float(seg["end"]) - float(seg["start"]) + if duration > 7.0 and duration < 12.0: + logger.info( + f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}' + ) + create_wav_file_vc( + sample_name=name_automatic_wav, + audio_wav="audio.wav", + start=(float(seg["start"]) + 1.0), + end=(float(seg["end"]) - 1.0), + get_vocals_dereverb=dereverb_automatic, + ) + wav_ok = True + break + + if not wav_ok: + logger.info("Taking the first segment") + seg = filtered_speaker[0] + logger.info( + f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}' + ) + max_duration = float(seg["end"]) - float(seg["start"]) + max_duration = max(2.0, min(max_duration, 9.0)) + + create_wav_file_vc( + sample_name=name_automatic_wav, + audio_wav="audio.wav", + start=(float(seg["start"])), + end=(float(seg["start"]) + max_duration), + get_vocals_dereverb=dereverb_automatic, + ) + + +def segments_coqui_tts( + filtered_coqui_segments, + TRANSLATE_AUDIO_TO, + model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2", + speakers_coqui=None, + delete_previous_automatic=True, + dereverb_automatic=True, + emotion=None, +): + """XTTS + Install: + pip install -q TTS==0.21.1 + pip install -q numpy==1.23.5 + + Notes: + - tts_name is the wav|mp3|ogg|m4a file for VC + """ + from TTS.api import TTS + + TRANSLATE_AUDIO_TO = fix_code_language(TRANSLATE_AUDIO_TO, syntax="coqui") + supported_lang_coqui = [ + "zh-cn", + "en", + "fr", + "de", + "it", + "pt", + "pl", + "tr", + "ru", + "nl", + "cs", + "ar", + "es", + "hu", + "ko", + "ja", + ] + if TRANSLATE_AUDIO_TO not in supported_lang_coqui: + raise TTS_OperationError( + f"'{TRANSLATE_AUDIO_TO}' is not a supported language for Coqui XTTS" + ) 
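+    # Rough shape of each incoming segment, inferred from the usage below
+    # (values are illustrative, not taken from a real run):
+    #   {"start": 12.3, "end": 15.6, "speaker": "SPEAKER_00",
+    #    "text": "translated text", "tts_name": "_XTTS_/AUTOMATIC.wav"}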
+ # Emotion and speed can only be used with Coqui Studio models. discontinued + # emotions = ["Neutral", "Happy", "Sad", "Angry", "Dull"] + + if delete_previous_automatic: + for spk in speakers_coqui: + remove_files(f"_XTTS_/AUTOMATIC_{spk}.wav") + + directory_audios_vc = "_XTTS_" + create_directories(directory_audios_vc) + create_new_files_for_vc( + speakers_coqui, + filtered_coqui_segments["segments"], + dereverb_automatic, + ) + + # Init TTS + device = os.environ.get("SONITR_DEVICE") + model = TTS(model_id_coqui).to(device) + sampling_rate = 24000 + + # filtered_segments = filtered_coqui_segments['segments'] + # Sorting the segments by 'tts_name' + # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name']) + # logger.debug(sorted_segments) + + for segment in tqdm(filtered_coqui_segments["segments"]): + speaker = segment["speaker"] + text = segment["text"] + start = segment["start"] + tts_name = segment["tts_name"] + if tts_name == "_XTTS_/AUTOMATIC.wav": + tts_name = f"_XTTS_/AUTOMATIC_{speaker}.wav" + + # make the tts audio + filename = f"audio/{start}.ogg" + logger.info(f"{text} >> {filename}") + try: + # Infer + wav = model.tts( + text=text, speaker_wav=tts_name, language=TRANSLATE_AUDIO_TO + ) + data_tts = pad_array( + wav, + sampling_rate, + ) + # Save file + sf.write( + file=filename, + samplerate=sampling_rate, + data=data_tts, + format="ogg", + subtype="vorbis", + ) + verify_saved_file_and_size(filename) + except Exception as error: + error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) + gc.collect() + torch.cuda.empty_cache() + try: + del model + gc.collect() + torch.cuda.empty_cache() + except Exception as error: + logger.error(str(error)) + gc.collect() + torch.cuda.empty_cache() + + +# ===================================== +# PIPER TTS +# ===================================== + + +def piper_tts_voices_list(): + file_path = download_manager( + url="https://huggingface.co/rhasspy/piper-voices/resolve/main/voices.json", + path="./PIPER_MODELS", + ) + + with open(file_path, "r", encoding="utf8") as file: + data = json.load(file) + piper_id_models = [key + " VITS-onnx" for key in data.keys()] + + return piper_id_models + + +def replace_text_in_json(file_path, key_to_replace, new_text, condition=None): + # Read the JSON file + with open(file_path, "r", encoding="utf-8") as file: + data = json.load(file) + + # Modify the specified key's value with the new text + if key_to_replace in data: + if condition: + value_condition = condition + else: + value_condition = data[key_to_replace] + + if data[key_to_replace] == value_condition: + data[key_to_replace] = new_text + + # Write the modified content back to the JSON file + with open(file_path, "w") as file: + json.dump( + data, file, indent=2 + ) # Write the modified data back to the file with indentation for readability + + +def load_piper_model( + model: str, + data_dir: list, + download_dir: str = "", + update_voices: bool = False, +): + from piper import PiperVoice + from piper.download import ensure_voice_exists, find_voice, get_voices + + try: + import onnxruntime as rt + + if rt.get_device() == "GPU" and os.environ.get("SONITR_DEVICE") == "cuda": + logger.debug("onnxruntime device > GPU") + cuda = True + else: + logger.info( + "onnxruntime device > CPU" + ) # try pip install onnxruntime-gpu + cuda = False + except Exception as error: + raise TTS_OperationError(f"onnxruntime error: {str(error)}") + + # Disable CUDA in Windows + if platform.system() == "Windows": + logger.info("Employing CPU 
exclusivity with Piper TTS") + cuda = False + + if not download_dir: + # Download to first data directory by default + download_dir = data_dir[0] + else: + data_dir = [os.path.join(data_dir[0], download_dir)] + + # Download voice if file doesn't exist + model_path = Path(model) + if not model_path.exists(): + # Load voice info + voices_info = get_voices(download_dir, update_voices=update_voices) + + # Resolve aliases for backwards compatibility with old voice names + aliases_info: Dict[str, Any] = {} + for voice_info in voices_info.values(): + for voice_alias in voice_info.get("aliases", []): + aliases_info[voice_alias] = {"_is_alias": True, **voice_info} + + voices_info.update(aliases_info) + ensure_voice_exists(model, data_dir, download_dir, voices_info) + model, config = find_voice(model, data_dir) + + replace_text_in_json( + config, "phoneme_type", "espeak", "PhonemeType.ESPEAK" + ) + + # Load voice + voice = PiperVoice.load(model, config_path=config, use_cuda=cuda) + + return voice + + +def synthesize_text_to_audio_np_array(voice, text, synthesize_args): + audio_stream = voice.synthesize_stream_raw(text, **synthesize_args) + + # Collect the audio bytes into a single NumPy array + audio_data = b"" + for audio_bytes in audio_stream: + audio_data += audio_bytes + + # Ensure correct data type and convert audio bytes to NumPy array + audio_np = np.frombuffer(audio_data, dtype=np.int16) + return audio_np + + +def segments_vits_onnx_tts(filtered_onnx_vits_segments, TRANSLATE_AUDIO_TO): + """ + Install: + pip install -q piper-tts==1.2.0 onnxruntime-gpu # for cuda118 + """ + + data_dir = [ + str(Path.cwd()) + ] # "Data directory to check for downloaded models (default: current directory)" + download_dir = "PIPER_MODELS" + # model_name = "en_US-lessac-medium" tts_name in a dict like VITS + update_voices = True # "Download latest voices.json during startup", + + synthesize_args = { + "speaker_id": None, + "length_scale": 1.0, + "noise_scale": 0.667, + "noise_w": 0.8, + "sentence_silence": 0.0, + } + + filtered_segments = filtered_onnx_vits_segments["segments"] + # Sorting the segments by 'tts_name' + sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"]) + logger.debug(sorted_segments) + + model_name_key = None + for segment in tqdm(sorted_segments): + speaker = segment["speaker"] # noqa + text = segment["text"] + start = segment["start"] + tts_name = segment["tts_name"].replace(" VITS-onnx", "") + + if tts_name != model_name_key: + model_name_key = tts_name + model = load_piper_model( + tts_name, data_dir, download_dir, update_voices + ) + sampling_rate = model.config.sample_rate + + # make the tts audio + filename = f"audio/{start}.ogg" + logger.info(f"{text} >> {filename}") + try: + # Infer + speech_output = synthesize_text_to_audio_np_array( + model, text, synthesize_args + ) + data_tts = pad_array( + speech_output, # .cpu().numpy().squeeze().astype(np.float32), + sampling_rate, + ) + # Save file + sf.write( + file=filename, + samplerate=sampling_rate, + data=data_tts, + format="ogg", + subtype="vorbis", + ) + verify_saved_file_and_size(filename) + except Exception as error: + error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) + gc.collect() + torch.cuda.empty_cache() + try: + del model + gc.collect() + torch.cuda.empty_cache() + except Exception as error: + logger.error(str(error)) + gc.collect() + torch.cuda.empty_cache() + + +# ===================================== +# CLOSEAI TTS +# ===================================== + + +def segments_openai_tts( + 
filtered_openai_tts_segments, TRANSLATE_AUDIO_TO +): + from openai import OpenAI + + client = OpenAI() + sampling_rate = 24000 + + # filtered_segments = filtered_openai_tts_segments['segments'] + # Sorting the segments by 'tts_name' + # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name']) + + for segment in tqdm(filtered_openai_tts_segments["segments"]): + speaker = segment["speaker"] # noqa + text = segment["text"].strip() + start = segment["start"] + tts_name = segment["tts_name"] + + # make the tts audio + filename = f"audio/{start}.ogg" + logger.info(f"{text} >> {filename}") + + try: + # Request + response = client.audio.speech.create( + model="tts-1-hd" if "HD" in tts_name else "tts-1", + voice=tts_name.split()[0][1:], + response_format="wav", + input=text + ) + + audio_bytes = b'' + for data in response.iter_bytes(chunk_size=4096): + audio_bytes += data + + speech_output = np.frombuffer(audio_bytes, dtype=np.int16) + + # Save file + data_tts = pad_array( + speech_output[240:], + sampling_rate, + ) + + sf.write( + file=filename, + samplerate=sampling_rate, + data=data_tts, + format="ogg", + subtype="vorbis", + ) + verify_saved_file_and_size(filename) + + except Exception as error: + error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) + + +# ===================================== +# Select task TTS +# ===================================== + + +def find_spkr(pattern, speaker_to_voice, segments): + return [ + speaker + for speaker, voice in speaker_to_voice.items() + if pattern.match(voice) and any( + segment["speaker"] == speaker for segment in segments + ) + ] + + +def filter_by_speaker(speakers, segments): + return { + "segments": [ + segment + for segment in segments + if segment["speaker"] in speakers + ] + } + + +def audio_segmentation_to_voice( + result_diarize, + TRANSLATE_AUDIO_TO, + is_gui, + tts_voice00, + tts_voice01="", + tts_voice02="", + tts_voice03="", + tts_voice04="", + tts_voice05="", + tts_voice06="", + tts_voice07="", + tts_voice08="", + tts_voice09="", + tts_voice10="", + tts_voice11="", + dereverb_automatic=True, + model_id_bark="suno/bark-small", + model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2", + delete_previous_automatic=True, +): + + remove_directory_contents("audio") + + # Mapping speakers to voice variables + speaker_to_voice = { + "SPEAKER_00": tts_voice00, + "SPEAKER_01": tts_voice01, + "SPEAKER_02": tts_voice02, + "SPEAKER_03": tts_voice03, + "SPEAKER_04": tts_voice04, + "SPEAKER_05": tts_voice05, + "SPEAKER_06": tts_voice06, + "SPEAKER_07": tts_voice07, + "SPEAKER_08": tts_voice08, + "SPEAKER_09": tts_voice09, + "SPEAKER_10": tts_voice10, + "SPEAKER_11": tts_voice11, + } + + # Assign 'SPEAKER_00' to segments without a 'speaker' key + for segment in result_diarize["segments"]: + if "speaker" not in segment: + segment["speaker"] = "SPEAKER_00" + logger.warning( + "NO SPEAKER DETECT IN SEGMENT: First TTS will be used in the" + f" segment time {segment['start'], segment['text']}" + ) + # Assign the TTS name + segment["tts_name"] = speaker_to_voice[segment["speaker"]] + + # Find TTS method + pattern_edge = re.compile(r".*-(Male|Female)$") + pattern_bark = re.compile(r".* BARK$") + pattern_vits = re.compile(r".* VITS$") + pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$") + pattern_vits_onnx = re.compile(r".* VITS-onnx$") + pattern_openai_tts = re.compile(r".* OpenAI-TTS$") + + all_segments = result_diarize["segments"] + + speakers_edge = find_spkr(pattern_edge, speaker_to_voice, all_segments) + 
speakers_bark = find_spkr(pattern_bark, speaker_to_voice, all_segments) + speakers_vits = find_spkr(pattern_vits, speaker_to_voice, all_segments) + speakers_coqui = find_spkr(pattern_coqui, speaker_to_voice, all_segments) + speakers_vits_onnx = find_spkr( + pattern_vits_onnx, speaker_to_voice, all_segments + ) + speakers_openai_tts = find_spkr( + pattern_openai_tts, speaker_to_voice, all_segments + ) + + # Filter method in segments + filtered_edge = filter_by_speaker(speakers_edge, all_segments) + filtered_bark = filter_by_speaker(speakers_bark, all_segments) + filtered_vits = filter_by_speaker(speakers_vits, all_segments) + filtered_coqui = filter_by_speaker(speakers_coqui, all_segments) + filtered_vits_onnx = filter_by_speaker(speakers_vits_onnx, all_segments) + filtered_openai_tts = filter_by_speaker(speakers_openai_tts, all_segments) + + # Infer + if filtered_edge["segments"]: + logger.info(f"EDGE TTS: {speakers_edge}") + segments_egde_tts(filtered_edge, TRANSLATE_AUDIO_TO, is_gui) # mp3 + if filtered_bark["segments"]: + logger.info(f"BARK TTS: {speakers_bark}") + segments_bark_tts( + filtered_bark, TRANSLATE_AUDIO_TO, model_id_bark + ) # wav + if filtered_vits["segments"]: + logger.info(f"VITS TTS: {speakers_vits}") + segments_vits_tts(filtered_vits, TRANSLATE_AUDIO_TO) # wav + if filtered_coqui["segments"]: + logger.info(f"Coqui TTS: {speakers_coqui}") + segments_coqui_tts( + filtered_coqui, + TRANSLATE_AUDIO_TO, + model_id_coqui, + speakers_coqui, + delete_previous_automatic, + dereverb_automatic, + ) # wav + if filtered_vits_onnx["segments"]: + logger.info(f"PIPER TTS: {speakers_vits_onnx}") + segments_vits_onnx_tts(filtered_vits_onnx, TRANSLATE_AUDIO_TO) # wav + if filtered_openai_tts["segments"]: + logger.info(f"OpenAI TTS: {speakers_openai_tts}") + segments_openai_tts(filtered_openai_tts, TRANSLATE_AUDIO_TO) # wav + + [result.pop("tts_name", None) for result in result_diarize["segments"]] + return [ + speakers_edge, + speakers_bark, + speakers_vits, + speakers_coqui, + speakers_vits_onnx, + speakers_openai_tts + ] + + +def accelerate_segments( + result_diarize, + max_accelerate_audio, + valid_speakers, + acceleration_rate_regulation=False, + folder_output="audio2", +): + logger.info("Apply acceleration") + + ( + speakers_edge, + speakers_bark, + speakers_vits, + speakers_coqui, + speakers_vits_onnx, + speakers_openai_tts + ) = valid_speakers + + create_directories(f"{folder_output}/audio/") + remove_directory_contents(f"{folder_output}/audio/") + + audio_files = [] + speakers_list = [] + + max_count_segments_idx = len(result_diarize["segments"]) - 1 + + for i, segment in tqdm(enumerate(result_diarize["segments"])): + text = segment["text"] # noqa + start = segment["start"] + end = segment["end"] + speaker = segment["speaker"] + + # find name audio + # if speaker in speakers_edge: + filename = f"audio/{start}.ogg" + # elif speaker in speakers_bark + speakers_vits + speakers_coqui + speakers_vits_onnx: + # filename = f"audio/{start}.wav" # wav + + # duration + duration_true = end - start + duration_tts = librosa.get_duration(filename=filename) + + # Accelerate percentage + acc_percentage = duration_tts / duration_true + + # Smoth + if acceleration_rate_regulation and acc_percentage >= 1.3: + try: + next_segment = result_diarize["segments"][ + min(max_count_segments_idx, i + 1) + ] + next_start = next_segment["start"] + next_speaker = next_segment["speaker"] + duration_with_next_start = next_start - start + + if duration_with_next_start > duration_true: + extra_time = 
duration_with_next_start - duration_true + + if speaker == next_speaker: + # half + smoth_duration = duration_true + (extra_time * 0.5) + else: + # 7/10 + smoth_duration = duration_true + (extra_time * 0.7) + logger.debug( + f"Base acc: {acc_percentage}, " + f"smoth acc: {duration_tts / smoth_duration}" + ) + acc_percentage = max(1.2, (duration_tts / smoth_duration)) + + except Exception as error: + logger.error(str(error)) + + if acc_percentage > max_accelerate_audio: + acc_percentage = max_accelerate_audio + elif acc_percentage <= 1.15 and acc_percentage >= 0.8: + acc_percentage = 1.0 + elif acc_percentage <= 0.79: + acc_percentage = 0.8 + + # Round + acc_percentage = round(acc_percentage + 0.0, 1) + + # Format read if need + if speaker in speakers_edge: + info_enc = sf.info(filename).format + else: + info_enc = "OGG" + + # Apply aceleration or opposite to the audio file in folder_output folder + if acc_percentage == 1.0 and info_enc == "OGG": + copy_files(filename, f"{folder_output}{os.sep}audio") + else: + os.system( + f"ffmpeg -y -loglevel panic -i {filename} -filter:a atempo={acc_percentage} {folder_output}/{filename}" + ) + + if logger.isEnabledFor(logging.DEBUG): + duration_create = librosa.get_duration( + filename=f"{folder_output}/{filename}" + ) + logger.debug( + f"acc_percen is {acc_percentage}, tts duration " + f"is {duration_tts}, new duration is {duration_create}" + f", for {filename}" + ) + + audio_files.append(f"{folder_output}/{filename}") + speaker = "TTS Speaker {:02d}".format(int(speaker[-2:]) + 1) + speakers_list.append(speaker) + + return audio_files, speakers_list + + +# ===================================== +# Tone color converter +# ===================================== + + +def se_process_audio_segments( + source_seg, tone_color_converter, device, remove_previous_processed=True +): + # list wav seg + source_audio_segs = glob.glob(f"{source_seg}/*.wav") + if not source_audio_segs: + raise ValueError( + f"No audio segments found in {str(source_audio_segs)}" + ) + + source_se_path = os.path.join(source_seg, "se.pth") + + # if exist not create wav + if os.path.isfile(source_se_path): + se = torch.load(source_se_path).to(device) + logger.debug(f"Previous created {source_se_path}") + else: + se = tone_color_converter.extract_se(source_audio_segs, source_se_path) + + return se + + +def create_wav_vc( + valid_speakers, + segments_base, + audio_name, + max_segments=10, + target_dir="processed", + get_vocals_dereverb=False, +): + # valid_speakers = list({item['speaker'] for item in segments_base}) + + # Before function delete automatic delete_previous_automatic + output_dir = os.path.join(".", target_dir) # remove content + # remove_directory_contents(output_dir) + + path_source_segments = [] + path_target_segments = [] + for speaker in valid_speakers: + filtered_speaker = [ + segment + for segment in segments_base + if segment["speaker"] == speaker + ] + if len(filtered_speaker) > 4: + filtered_speaker = filtered_speaker[1:] + + dir_name_speaker = speaker + audio_name + dir_name_speaker_tts = "tts" + speaker + audio_name + dir_path_speaker = os.path.join(output_dir, dir_name_speaker) + dir_path_speaker_tts = os.path.join(output_dir, dir_name_speaker_tts) + create_directories([dir_path_speaker, dir_path_speaker_tts]) + + path_target_segments.append(dir_path_speaker) + path_source_segments.append(dir_path_speaker_tts) + + # create wav + max_segments_count = 0 + for seg in filtered_speaker: + duration = float(seg["end"]) - float(seg["start"]) + if duration > 3.0 and duration 
< 18.0: + logger.info( + f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}' + ) + name_new_wav = str(seg["start"]) + + check_segment_audio_target_file = os.path.join( + dir_path_speaker, f"{name_new_wav}.wav" + ) + + if os.path.exists(check_segment_audio_target_file): + logger.debug( + "Segment vc source exists: " + f"{check_segment_audio_target_file}" + ) + pass + else: + create_wav_file_vc( + sample_name=name_new_wav, + audio_wav="audio.wav", + start=(float(seg["start"]) + 1.0), + end=(float(seg["end"]) - 1.0), + output_final_path=dir_path_speaker, + get_vocals_dereverb=get_vocals_dereverb, + ) + + file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg" + # copy_files(file_name_tts, os.path.join(output_dir, dir_name_speaker_tts) + convert_to_xtts_good_sample( + file_name_tts, dir_path_speaker_tts + ) + + max_segments_count += 1 + if max_segments_count == max_segments: + break + + if max_segments_count == 0: + logger.info("Taking the first segment") + seg = filtered_speaker[0] + logger.info( + f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}' + ) + max_duration = float(seg["end"]) - float(seg["start"]) + max_duration = max(1.0, min(max_duration, 18.0)) + + name_new_wav = str(seg["start"]) + create_wav_file_vc( + sample_name=name_new_wav, + audio_wav="audio.wav", + start=(float(seg["start"])), + end=(float(seg["start"]) + max_duration), + output_final_path=dir_path_speaker, + get_vocals_dereverb=get_vocals_dereverb, + ) + + file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg" + # copy_files(file_name_tts, os.path.join(output_dir, dir_name_speaker_tts) + convert_to_xtts_good_sample(file_name_tts, dir_path_speaker_tts) + + logger.debug(f"Base: {str(path_source_segments)}") + logger.debug(f"Target: {str(path_target_segments)}") + + return path_source_segments, path_target_segments + + +def toneconverter_openvoice( + result_diarize, + preprocessor_max_segments, + remove_previous_process=True, + get_vocals_dereverb=False, + model="openvoice", +): + audio_path = "audio.wav" + # se_path = "se.pth" + target_dir = "processed" + create_directories(target_dir) + + from openvoice import se_extractor + from openvoice.api import ToneColorConverter + + audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}" + # se_path = os.path.join(target_dir, audio_name, 'se.pth') + + # create wav seg original and target + + valid_speakers = list( + {item["speaker"] for item in result_diarize["segments"]} + ) + + logger.info("Openvoice preprocessor...") + + if remove_previous_process: + remove_directory_contents(target_dir) + + path_source_segments, path_target_segments = create_wav_vc( + valid_speakers, + result_diarize["segments"], + audio_name, + max_segments=preprocessor_max_segments, + get_vocals_dereverb=get_vocals_dereverb, + ) + + logger.info("Openvoice loading model...") + model_path_openvoice = "./OPENVOICE_MODELS" + url_model_openvoice = "https://huggingface.co/myshell-ai/OpenVoice/resolve/main/checkpoints/converter" + + if "v2" in model: + model_path = os.path.join(model_path_openvoice, "v2") + url_model_openvoice = url_model_openvoice.replace( + "OpenVoice", "OpenVoiceV2" + ).replace("checkpoints/", "") + else: + model_path = os.path.join(model_path_openvoice, "v1") + create_directories(model_path) + + config_url = f"{url_model_openvoice}/config.json" + checkpoint_url = f"{url_model_openvoice}/checkpoint.pth" + + config_path = download_manager(url=config_url, 
path=model_path) + checkpoint_path = download_manager( + url=checkpoint_url, path=model_path + ) + + device = os.environ.get("SONITR_DEVICE") + tone_color_converter = ToneColorConverter(config_path, device=device) + tone_color_converter.load_ckpt(checkpoint_path) + + logger.info("Openvoice tone color converter:") + global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress") + + for source_seg, target_seg, speaker in zip( + path_source_segments, path_target_segments, valid_speakers + ): + # source_se_path = os.path.join(source_seg, 'se.pth') + source_se = se_process_audio_segments(source_seg, tone_color_converter, device) + # target_se_path = os.path.join(target_seg, 'se.pth') + target_se = se_process_audio_segments(target_seg, tone_color_converter, device) + + # Iterate throw segments + encode_message = "@MyShell" + filtered_speaker = [ + segment + for segment in result_diarize["segments"] + if segment["speaker"] == speaker + ] + for seg in filtered_speaker: + src_path = ( + save_path + ) = f"audio2/audio/{str(seg['start'])}.ogg" # overwrite + logger.debug(f"{src_path}") + + tone_color_converter.convert( + audio_src_path=src_path, + src_se=source_se, + tgt_se=target_se, + output_path=save_path, + message=encode_message, + ) + + global_progress_bar.update(1) + + global_progress_bar.close() + + try: + del tone_color_converter + gc.collect() + torch.cuda.empty_cache() + except Exception as error: + logger.error(str(error)) + gc.collect() + torch.cuda.empty_cache() + + +def toneconverter_freevc( + result_diarize, + remove_previous_process=True, + get_vocals_dereverb=False, +): + audio_path = "audio.wav" + target_dir = "processed" + create_directories(target_dir) + + from openvoice import se_extractor + + audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}" + + # create wav seg; original is target and dubbing is source + valid_speakers = list( + {item["speaker"] for item in result_diarize["segments"]} + ) + + logger.info("FreeVC preprocessor...") + + if remove_previous_process: + remove_directory_contents(target_dir) + + path_source_segments, path_target_segments = create_wav_vc( + valid_speakers, + result_diarize["segments"], + audio_name, + max_segments=1, + get_vocals_dereverb=get_vocals_dereverb, + ) + + logger.info("FreeVC loading model...") + device_id = os.environ.get("SONITR_DEVICE") + device = None if device_id == "cpu" else device_id + try: + from TTS.api import TTS + tts = TTS( + model_name="voice_conversion_models/multilingual/vctk/freevc24", + progress_bar=False + ).to(device) + except Exception as error: + logger.error(str(error)) + logger.error("Error loading the FreeVC model.") + return + + logger.info("FreeVC process:") + global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress") + + for source_seg, target_seg, speaker in zip( + path_source_segments, path_target_segments, valid_speakers + ): + + filtered_speaker = [ + segment + for segment in result_diarize["segments"] + if segment["speaker"] == speaker + ] + + files_and_directories = os.listdir(target_seg) + wav_files = [file for file in files_and_directories if file.endswith(".wav")] + original_wav_audio_segment = os.path.join(target_seg, wav_files[0]) + + for seg in filtered_speaker: + + src_path = ( + save_path + ) = f"audio2/audio/{str(seg['start'])}.ogg" # overwrite + logger.debug(f"{src_path} - {original_wav_audio_segment}") + + wav = tts.voice_conversion( + source_wav=src_path, + target_wav=original_wav_audio_segment, 
+            )
+
+            sf.write(
+                file=save_path,
+                samplerate=tts.voice_converter.vc_config.audio.output_sample_rate,
+                data=wav,
+                format="ogg",
+                subtype="vorbis",
+            )
+
+            global_progress_bar.update(1)
+
+    global_progress_bar.close()
+
+    try:
+        del tts
+        gc.collect()
+        torch.cuda.empty_cache()
+    except Exception as error:
+        logger.error(str(error))
+        gc.collect()
+        torch.cuda.empty_cache()
+
+
+def toneconverter(
+    result_diarize,
+    preprocessor_max_segments,
+    remove_previous_process=True,
+    get_vocals_dereverb=False,
+    method_vc="freevc"
+):
+
+    if method_vc == "freevc":
+        if preprocessor_max_segments > 1:
+            logger.info("FreeVC only uses one segment.")
+        return toneconverter_freevc(
+            result_diarize,
+            remove_previous_process=remove_previous_process,
+            get_vocals_dereverb=get_vocals_dereverb,
+        )
+    elif "openvoice" in method_vc:
+        return toneconverter_openvoice(
+            result_diarize,
+            preprocessor_max_segments,
+            remove_previous_process=remove_previous_process,
+            get_vocals_dereverb=get_vocals_dereverb,
+            model=method_vc,
+        )
+
+
+if __name__ == "__main__":
+    from segments import result_diarize
+
+    audio_segmentation_to_voice(
+        result_diarize,
+        TRANSLATE_AUDIO_TO="en",
+        is_gui=True,
+        tts_voice00="en-facebook-mms VITS",
+        tts_voice01="en-CA-ClaraNeural-Female",
+        tts_voice02="en-GB-ThomasNeural-Male",
+        tts_voice03="en-GB-SoniaNeural-Female",
+        tts_voice04="en-NZ-MitchellNeural-Male",
+        tts_voice05="en-GB-MaisieNeural-Female",
+    )
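+
+    # Sketch of the remaining dubbing steps (commented out; the argument
+    # values below are illustrative assumptions, not verified defaults).
+    # `valid_speakers` would be the list returned by the call above:
+    #
+    # audio_files, speakers_list = accelerate_segments(
+    #     result_diarize,
+    #     max_accelerate_audio=2.1,
+    #     valid_speakers=valid_speakers,
+    # )
+    # toneconverter(
+    #     result_diarize,
+    #     preprocessor_max_segments=1,
+    #     method_vc="freevc",
+    # )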