OcTra / app.py
arcan3's picture
adde revision
fc5ed00
raw
history blame
12.9 kB
# load the libraries for the application
# -------------------------------------------
import os
import re
import nltk
import torch
import librosa
import tempfile
import subprocess
import gradio as gr
from scipy.io import wavfile
from nnet import utils, commons
from transformers import pipeline
from scipy.io.wavfile import write
from faster_whisper import WhisperModel
from nnet.models import SynthesizerTrn as vitsTRN
from nnet.models_vc import SynthesizerTrn as freeTRN
from nnet.mel_processing import mel_spectrogram_torch
from configurations.get_constants import constantConfig
from speaker_encoder.voice_encoder import SpeakerEncoder
from df.enhance import enhance, init_df, load_audio, save_audio
from configurations.get_hyperparameters import hyperparameterConfig
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
nltk.download('punkt')
from nltk.tokenize import sent_tokenize
# making the FreeVC function
# ---------------------------------
class FreeVCModel:
def __init__(self, config, ptfile, speaker_model, wavLM_model, device='cpu'):
self.hps = utils.get_hparams_from_file(config)
self.net_g = freeTRN(
self.hps.data.filter_length // 2 + 1,
self.hps.train.segment_size // self.hps.data.hop_length,
**self.hps.model
).to(hyperparameters.device)
_ = self.net_g.eval()
_ = utils.load_checkpoint(ptfile, self.net_g, None, True)
self.cmodel = utils.get_cmodel(device, wavLM_model)
if self.hps.model.use_spk:
self.smodel = SpeakerEncoder(speaker_model)
def convert(self, src, tgt):
fs_src, src_audio = src
fs_tgt, tgt_audio = tgt
src = f"{constants.temp_audio_folder}/src.wav"
tgt = f"{constants.temp_audio_folder}/tgt.wav"
out = f"{constants.temp_audio_folder}/cnvr.wav"
with torch.no_grad():
wavfile.write(tgt, fs_tgt, tgt_audio)
wav_tgt, _ = librosa.load(tgt, sr=self.hps.data.sampling_rate)
wav_tgt, _ = librosa.effects.trim(wav_tgt, top_db=20)
if self.hps.model.use_spk:
g_tgt = self.smodel.embed_utterance(wav_tgt)
g_tgt = torch.from_numpy(g_tgt).unsqueeze(0).to(hyperparameters.device.type)
else:
wav_tgt = torch.from_numpy(wav_tgt).unsqueeze(0).to(hyperparameters.device.type)
mel_tgt = mel_spectrogram_torch(
wav_tgt,
self.hps.data.filter_length,
self.hps.data.n_mel_channels,
self.hps.data.sampling_rate,
self.hps.data.hop_length,
self.hps.data.win_length,
self.hps.data.mel_fmin,
self.hps.data.mel_fmax,
)
wavfile.write(src, fs_src, src_audio)
wav_src, _ = librosa.load(src, sr=self.hps.data.sampling_rate)
wav_src = torch.from_numpy(wav_src).unsqueeze(0).to(hyperparameters.device.type)
c = utils.get_content(self.cmodel, wav_src)
if self.hps.model.use_spk:
audio = self.net_g.infer(c, g=g_tgt)
else:
audio = self.net_g.infer(c, mel=mel_tgt)
audio = audio[0][0].data.cpu().float().numpy()
write(out, 24000, audio)
return out
# load the system configurations
constants = constantConfig()
hyperparameters = hyperparameterConfig()
# load the models
model, df_state, _ = init_df(hyperparameters.voice_enhacing_model, config_allow_defaults=True) # voice enhancing model
stt_model = WhisperModel(hyperparameters.stt_model, device=hyperparameters.device.type, compute_type="float32") #speech to text model
trans_model = AutoModelForSeq2SeqLM.from_pretrained(constants.model_name_dict[hyperparameters.nllb_model], torch_dtype=torch.bfloat16).to(hyperparameters.device)
trans_tokenizer = AutoTokenizer.from_pretrained(constants.model_name_dict[hyperparameters.nllb_model])
modelConvertSpeech = FreeVCModel(config=hyperparameters.text2speech_config, ptfile=hyperparameters.text2speech_model,
speaker_model=hyperparameters.text2speech_encoder, wavLM_model=hyperparameters.wavlm_model,
device=hyperparameters.device.type)
# download the language model if doesn't existing
# ----------------------------------------------------
def download(lang, lang_directory):
if not os.path.exists(f"{lang_directory}/{lang}"):
cmd = ";".join([
f"wget {constants.language_download_web}/{lang}.tar.gz -O {lang_directory}/{lang}.tar.gz",
f"tar zxvf {lang_directory}/{lang}.tar.gz -C {lang_directory}"
])
subprocess.check_output(cmd, shell=True)
try:
os.remove(f"{lang_directory}/{lang}.tar.gz")
except:
pass
return f"{lang_directory}/{lang}"
def preprocess_char(text, lang=None):
"""
Special treatement of characters in certain languages
"""
if lang == 'ron':
text = text.replace("ț", "ţ")
return text
def preprocess_text(txt, text_mapper, hps, uroman_dir=None, lang=None):
txt = preprocess_char(txt, lang=lang)
is_uroman = hps.data.training_files.split('.')[-1] == 'uroman'
if is_uroman:
txt = text_mapper.uromanize(txt, f'{uroman_dir}/bin/uroman.pl')
txt = txt.lower()
txt = text_mapper.filter_oov(txt)
return txt
def detect_language(text,LID):
predictions = LID.predict(text)
detected_lang_code = predictions[0][0].replace("__label__", "")
return detected_lang_code
# text to speech
class TextMapper(object):
def __init__(self, vocab_file):
self.symbols = [x.replace("\n", "") for x in open(vocab_file, encoding="utf-8").readlines()]
self.SPACE_ID = self.symbols.index(" ")
self._symbol_to_id = {s: i for i, s in enumerate(self.symbols)}
self._id_to_symbol = {i: s for i, s in enumerate(self.symbols)}
def text_to_sequence(self, text, cleaner_names):
'''Converts a string of text to a sequence of IDs corresponding to the symbols in the text.
Args:
text: string to convert to a sequence
cleaner_names: names of the cleaner functions to run the text through
Returns:
List of integers corresponding to the symbols in the text
'''
sequence = []
clean_text = text.strip()
for symbol in clean_text:
symbol_id = self._symbol_to_id[symbol]
sequence += [symbol_id]
return sequence
def uromanize(self, text, uroman_pl):
with tempfile.NamedTemporaryFile() as tf, \
tempfile.NamedTemporaryFile() as tf2:
with open(tf.name, "w") as f:
f.write("\n".join([text]))
cmd = f"perl " + uroman_pl
cmd += f" -l xxx "
cmd += f" < {tf.name} > {tf2.name}"
os.system(cmd)
outtexts = []
with open(tf2.name) as f:
for line in f:
line = re.sub(r"\s+", " ", line).strip()
outtexts.append(line)
outtext = outtexts[0]
return outtext
def get_text(self, text, hps):
text_norm = self.text_to_sequence(text, hps.data.text_cleaners)
if hps.data.add_blank:
text_norm = commons.intersperse(text_norm, 0)
text_norm = torch.LongTensor(text_norm)
return text_norm
def filter_oov(self, text):
val_chars = self._symbol_to_id
txt_filt = "".join(list(filter(lambda x: x in val_chars, text)))
return txt_filt
def speech_to_text(audio_file):
try:
fs, audio = audio_file
wavfile.write(constants.input_speech_file, fs, audio)
audio0, _ = load_audio(constants.input_speech_file, sr=df_state.sr())
# Enhance the SNR of the audio
enhanced = enhance(model, df_state, audio0)
save_audio(constants.enhanced_speech_file, enhanced, df_state.sr())
segments, info = stt_model.transcribe(constants.enhanced_speech_file)
speech_text = ''
for segment in segments:
speech_text = f'{speech_text}{segment.text}'
try:
source_lang_nllb = [k for k, v in constants.flores_codes_to_tts_codes.items() if v[:2] == info.language][0]
except:
source_lang_nllb = 'language cant be determined, select manually'
# text translation
return speech_text, gr.Dropdown.update(value=source_lang_nllb)
except:
return '', gr.Dropdown.update(value='English')
# Text tp speech
def text_to_speech(text, target_lang):
txt = text
# LANG = get_target_tts_lang(target_lang)
LANG = constants.flores_codes_to_tts_codes[target_lang]
ckpt_dir = download(LANG, lang_directory=constants.language_directory)
vocab_file = f"{ckpt_dir}/{constants.language_vocab_text}"
config_file = f"{ckpt_dir}/{constants.language_vocab_configuration}"
hps = utils.get_hparams_from_file(config_file)
text_mapper = TextMapper(vocab_file)
net_g = vitsTRN(
len(text_mapper.symbols),
hps.data.filter_length // 2 + 1,
hps.train.segment_size // hps.data.hop_length,
**hps.model)
net_g.to(hyperparameters.device)
_ = net_g.eval()
g_pth = f"{ckpt_dir}/{constants.language_vocab_model}"
_ = utils.load_checkpoint(g_pth, net_g, None)
txt = preprocess_text(txt, text_mapper, hps, lang=LANG, uroman_dir=constants.uroman_directory)
stn_tst = text_mapper.get_text(txt, hps)
with torch.no_grad():
x_tst = stn_tst.unsqueeze(0).to(hyperparameters.device)
x_tst_lengths = torch.LongTensor([stn_tst.size(0)]).to(hyperparameters.device)
hyp = net_g.infer(
x_tst, x_tst_lengths, noise_scale=.667,
noise_scale_w=0.8, length_scale=1.0
)[0][0,0].cpu().float().numpy()
return hps.data.sampling_rate, hyp
def translation(audio, text, source_lang_nllb, target_code_nllb, output_type, sentence_mode):
target_code = constants.flores_codes[target_code_nllb]
translator = pipeline('translation', model=trans_model, tokenizer=trans_tokenizer, src_lang=source_lang_nllb, tgt_lang=target_code, device=hyperparameters.device)
# output = translator(text, max_length=400)[0]['translation_text']
if sentence_mode == "Sentence-wise":
sentences = sent_tokenize(text)
translated_sentences = []
for sentence in sentences:
translated_sentence = translator(sentence, max_length=400)[0]['translation_text']
translated_sentences.append(translated_sentence)
output = ' '.join(translated_sentences)
else:
output = translator(text, max_length=1024)[0]['translation_text']
# get the text to speech
fs_out, audio_out = text_to_speech(output, target_code_nllb)
if output_type == 'own voice':
out_file = modelConvertSpeech.convert((fs_out, audio_out), audio)
return output, out_file
wavfile.write(constants.text2speech_wavfile, fs_out, audio_out)
return output, constants.text2speech_wavfile
with gr.Blocks(title = "Octopus Translation App") as octopus_translator:
with gr.Row():
audio_file = gr.Audio(source="microphone")
with gr.Row():
input_text = gr.Textbox(label="Input text")
source_language = gr.Dropdown(list(constants.flores_codes.keys()), value='English', label='Source (Autoselected)', interactive=True)
with gr.Row():
output_text = gr.Textbox(label='Translated text')
target_language = gr.Dropdown(list(constants.flores_codes.keys()), value='German', label='Target', interactive=True)
with gr.Row():
output_speech = gr.Audio(label='Translated speech')
translate_button = gr.Button('Translate')
with gr.Row():
enhance_audio = gr.Radio(['yes', 'no'], value='yes', label='Enhance input voice', interactive=True)
input_type = gr.Radio(['Whole text', 'Sentence-wise'],value='Sentence-wise', label="Translation Mode", interactive=True)
output_audio_type = gr.Radio(['standard speaker', 'voice transfer'], value='voice transfer', label='Enhance output voice', interactive=True)
audio_file.change(speech_to_text,
inputs=[audio_file],
outputs=[input_text, source_language])
translate_button.click(translation,
inputs=[audio_file, input_text,
source_language, target_language,
output_audio_type, input_type],
outputs=[output_text, output_speech])
octopus_translator.launch(share=False)