enhanced_accessibility = False #@param {type:"boolean"}
#@markdown ---
#@markdown #### Please select your language:
#lang_select = "English" #@param ["English", "Spanish"]
#if lang_select == "English":
#    lang = "en"
#elif lang_select == "Spanish":
#    lang = "es"
#else:
#    raise Exception("Language not supported.")
#@markdown ---
use_gpu = False #@param {type:"boolean"}

import configparser
import glob
import json
import logging
import math
import os
import sys
import tempfile
import uuid
from enum import Enum
from pathlib import Path
from typing import Iterable, List, Optional, Union

import numpy as np
import onnxruntime
import soundfile as sf
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydub import AudioSegment

#import ipywidgets as widgets
#from IPython.display import display, Audio, Markdown, clear_output
from piper_phonemize import phonemize_codepoints, phonemize_espeak, tashkeel_run

# ...
# Mount a directory to serve static files (e.g., CSS and JavaScript)
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
files = {}  # Maps generated file IDs to audio file paths served by /download

# Configure logging
logging.basicConfig(level=logging.DEBUG)
_LOGGER = logging.getLogger("piper_train.infer_onnx")

# Mock data for your interface
data = {
    "speaker_options": [
        "en", "en-us", "en-029", "en-gb-x-gbclan", "en-gb-x-rp", "en-gb-scotland", "en-gb-x-gbcwmd",
        "es", "de", "pl", "ar", "be", "bn", "bpy", "bs", "bg", "ca", "yue", "hak", "haw", "cmn",
        "hr", "cs", "da", "nl", "eo", "et", "fa", "fa-latn", "fi", "fr-be", "fr", "ga", "gd", "ka",
        "grc", "el", "kl", "gn", "gu", "ht", "he", "hi", "hu", "id", "io", "it", "ja", "kn", "kok",
        "ko", "ku", "kk", "ky", "la", "lb", "ltg", "lv", "lfn", "lt", "jbo", "mi", "mk", "ms", "ml",
        "mt", "mr", "nci", "ne", "nb", "nog", "or", "om", "pap", "pt-br", "pt", "ro", "ru", "ru-lv",
        "uk", "sjn", "sr", "tn", "sd", "shn", "si", "sk", "sl", "es", "es-419", "sw", "sv", "ta",
        "th", "tk", "tt", "te", "tr", "ug", "ur", "uz", "vi-vn-x-central", "vi", "vi-vn-x-south",
    ],
    "default_speaker": "en",
}

# Define voices as an empty list
voices = []

# Read the speaker id map from the voice's JSON config so the template can list speakers
with open("multi265ende175.onnx.json", "r") as config_file:
    config_data = json.load(config_file)
speaker_id_map = config_data.get("speaker_id_map", {})
voices = [{"id": speaker_id, "name": speaker_name} for speaker_id, speaker_name in speaker_id_map.items()]


#@app.get("/", response_class=HTMLResponse)
#async def read_root(request: Request):
#    return templates.TemplateResponse("interface.html", {"request": request, "data": data})
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    #data = {"your_data_key": "your_data_value"}  # Replace with your data
    return templates.TemplateResponse("interface.html", {"request": request, "data": data, "voices": voices})


#if not os.path.exists("./content/piper/src/python/lng"):
#    import subprocess
#    command = "cp -r ./content/piper/notebooks/lng ./content/piper/src/python/lng"
#    subprocess.run(command, shell=True)
#sys.path.append('/content/piper/notebooks')
sys.path.append('./content/piper/src/python')


class Translator:
    """Looks up UI strings in lng/<language>.lang files: plain INI files whose
    [Strings] section maps the English text to its translation."""

    def __init__(self):
        self.configs = {}

    def load_language(self, language_name):
        if language_name not in self.configs:
            config = configparser.ConfigParser()
            config.read(os.path.join(os.getcwd(), "lng", f"{language_name}.lang"))
            self.configs[language_name] = config

    def translate(self, language_name, string):
        if language_name == "en":
            return string
        elif language_name not in self.configs:
            self.load_language(language_name)
        config = self.configs[language_name]
        try:
            return config.get("Strings", string)
        except (configparser.NoOptionError, configparser.NoSectionError):
            if string:
                return string
            else:
                raise Exception("language engine error: This translation is corrupt!")
        return 0


#from translator import *
lan = Translator()


def detect_onnx_models(path):
    """Return a list of .onnx files in `path`, a single path if there is exactly one, or None."""
    onnx_models = glob.glob(path + '/*.onnx')
    if len(onnx_models) > 1:
        return onnx_models
    elif len(onnx_models) == 1:
        return onnx_models[0]
    else:
        return None


renamed_audio_file = None


#@app.post("/synthesize")
#@app.post("/", response_class=FileResponse)
@app.post("/", response_class=HTMLResponse)
async def main(
    request: Request,
    text_input: str = Form(default="1, 2, 3. This is a test. Enter some text to generate."),
    speaker: str = Form(...),
    speed_slider: float = Form(...),
    noise_scale_slider: float = Form(...),
    noise_scale_w_slider: float = Form(...),
    play: bool = Form(True)
):
    """Main entry point"""
    sys.path.append('./content/piper/src/python')
    models_path = "./content/piper/src/python"
    logging.basicConfig(level=logging.DEBUG)
    providers = [
        "CPUExecutionProvider"
        if use_gpu is False
        else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"})
    ]
    sess_options = onnxruntime.SessionOptions()
    model = None
    lang = speaker
    response_html = ""
    file_url = None
    onnx_models = detect_onnx_models(models_path)
    # if len(text_input) == 0:
    #     text_input = "1, 2, 3. This is a test. Enter some text to generate."
    # speaker_selection = widgets.Dropdown(
    #     options=[],
    #     description=f'{lan.translate(lang, "Select speaker")}:',
    #     layout={'visibility': 'hidden'}
    # )
    if onnx_models is None:
        if enhanced_accessibility:
            playaudio("novoices")  # playaudio() is not defined in this file; it comes from the original notebook version
        raise Exception(lan.translate(lang, "No downloaded voice packages!"))
    elif isinstance(onnx_models, str):
        # Show loading message and disable the form
        response_html = """
Generating your audio, please wait...
""" onnx_model = onnx_models model, config = load_onnx(onnx_model, sess_options, providers) speaker_id_map = config.get("speaker_id_map", {}) voices = [{"id": speaker_id, "name": speaker_name} for speaker_id, speaker_name in speaker_id_map.items()] config["espeak"]["voice"] = speaker config["inference"]["noise_scale"] = noise_scale_slider config["inference"]["length_scale"] = speed_slider config["inference"]["noise_w"] = noise_scale_w_slider # lan.load_language(speaker) print("speed: ", speed_slider) print("nuber of speakers = ", config["num_speakers"]) print("speaker", speaker) print("language", config["espeak"]["voice"]) # rate = speed_slider.value # noise_scale = noise_scale_slider.value # noise_scale_w = noise_scale_w_slider.value auto_play = play audio = inferencing(model, config, 1, text_input, speed_slider, noise_scale_slider, noise_scale_w_slider, auto_play) temp_dir = tempfile.mkdtemp() # Create a temporary directory to store the audio files #temp_dir = tempfile.mkdtemp() # Export the audio to an MP3 file in the temporary directory # temp_audio_file = os.path.join(temp_dir, "generated_audio.mp3") # Check if text_input is more than 200 characters # if len(text_input) > 100: # Truncate text_input to 200 characters # text_input = text_input[:100] # Rename the audio file based on the text input # renamed_audio_file = os.path.join(temp_dir, f"{text_input}.mp3") renamed_audio_file = os.path.join(temp_dir, "download.mp3") audio.export(renamed_audio_file, format="mp3") # Save the generated audio as a temporary file filepath = renamed_audio_file # Generate a unique file ID file_id = str(uuid.uuid4()) # Store the file path with the generated file ID files[file_id] = filepath # Create a URL to download the file file_url = f'/download?fileId={file_id}' # Restore the form and return the response response_html += """ """ # os.rename(temp_audio_file, renamed_audio_file) # Specify the path to your MP3 audio file # audio_file_path = "path/to/your/audio.mp3" # Check if the file exists # if not os.path.exists(audio_file_path): # return {"detail": "Audio file not found"} # temp_audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") # audio.export(temp_audio_file.name, format="mp3") # Rename the temporary audio file based on the text input # global renamed_audio_file # renamed_audio_file = os.path.join(tempfile.gettempdir(), f"{text_input}.mp3") # os.rename(temp_audio_file.name, renamed_audio_file) else: voice_model_names = [] for current in onnx_models: voice_struct = current.split("/")[5] voice_model_names.append(voice_struct) # if enhanced_accessibility: # playaudio("selectmodel") # selection = widgets.Dropdown( # options=voice_model_names, # description=f'{lan.translate(lang, "Select voice package")}:', # ) # load_btn = widgets.Button( # description=lan.translate(lang, "Load it!") # ) # config = None # def load_model(button): # nonlocal config # global onnx_model # nonlocal model # nonlocal models_path # selected_voice = selection.value # onnx_model = f"{models_path}/{selected_voice}" # model, config = load_onnx(onnx_model, sess_options, providers) # if enhanced_accessibility: # playaudio("loaded") # if config["num_speakers"] > 1: # speaker_selection.options = config["speaker_id_map"].values() # speaker_selection.layout.visibility = 'visible' # if enhanced_accessibility: # playaudio("multispeaker") # else: # speaker_selection.layout.visibility = 'hidden' # load_btn.on_click(load_model) # display(selection, load_btn) # display(speaker_selection) # Save the audio as a temporary WAV 
file return templates.TemplateResponse("interface.html", {"request": request, "file_url": file_url, "text_input": text_input, "data": data, "voices": voices, "dynamic_content": response_html}) # Serve the audio file with the correct media type # return FileResponse(renamed_audio_file) # return {"message": f"Text to synthesize: {text_input}, Speed: {speed_slider}, Play: {play}"} @app.get("/download") async def download_file(fileId: str): # Retrieve the file path from the dictionary using the file ID filepath = files.get(fileId) if filepath: # Create a FileResponse to serve the file for download return FileResponse(filepath, headers={"Content-Disposition": "attachment"}) else: return {"error": "File not found"} def load_onnx(model, sess_options, providers = ["CPUExecutionProvider"]): _LOGGER.debug("Loading model from %s", model) config = load_config(model) model = onnxruntime.InferenceSession( str(model), sess_options=sess_options, providers= providers ) _LOGGER.info("Loaded model from %s", model) return model, config def load_config(model): with open(f"{model}.json", "r") as file: config = json.load(file) return config PAD = "_" # padding (0) BOS = "^" # beginning of sentence EOS = "$" # end of sentence class PhonemeType(str, Enum): ESPEAK = "espeak" TEXT = "text" def phonemize(config, text: str) -> List[List[str]]: """Text to phonemes grouped by sentence.""" if config["phoneme_type"] == PhonemeType.ESPEAK: if config["espeak"]["voice"] == "ar": # Arabic diacritization # https://github.com/mush42/libtashkeel/ text = tashkeel_run(text) return phonemize_espeak(text, config["espeak"]["voice"]) if config["phoneme_type"] == PhonemeType.TEXT: return phonemize_codepoints(text) raise ValueError(f'Unexpected phoneme type: {config["phoneme_type"]}') def phonemes_to_ids(config, phonemes: List[str]) -> List[int]: """Phonemes to ids.""" id_map = config["phoneme_id_map"] ids: List[int] = list(id_map[BOS]) for phoneme in phonemes: if phoneme not in id_map: print("Missing phoneme from id map: %s", phoneme) continue ids.extend(id_map[phoneme]) ids.extend(id_map[PAD]) ids.extend(id_map[EOS]) return ids def audio_float_to_int16( audio: np.ndarray, max_wav_value: float = 32767.0 ) -> np.ndarray: """Normalize audio and convert to int16 range""" audio_norm = audio * (max_wav_value / max(0.01, np.max(np.abs(audio)))) audio_norm = np.clip(audio_norm, -max_wav_value, max_wav_value) audio_norm = audio_norm.astype("int16") return audio_norm def inferencing(model, config, sid, line, length_scale, noise_scale, noise_scale_w, auto_play=True): audios = [] if config["phoneme_type"] == "PhonemeType.ESPEAK": config["phoneme_type"] = "espeak" text = phonemize(config, line) for phonemes in text: phoneme_ids = phonemes_to_ids(config, phonemes) num_speakers = config["num_speakers"] if num_speakers == 1: speaker_id = None # for now else: speaker_id = sid text = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0) text_lengths = np.array([text.shape[1]], dtype=np.int64) scales = np.array( [noise_scale, length_scale, noise_scale_w], dtype=np.float32, ) sid = None if speaker_id is not None: sid = np.array([speaker_id], dtype=np.int64) # Ensure sid is a 1D array audio = model.run( None, { "input": text, "input_lengths": text_lengths, "scales": scales, "sid": sid, }, )[0].squeeze((0, 1)) audio = audio_float_to_int16(audio.squeeze()) audios.append(audio) merged_audio = np.concatenate(audios) sample_rate = config["audio"]["sample_rate"] temp_audio_path = os.path.join(tempfile.gettempdir(), "generated_audio.wav") 
def inferencing(model, config, sid, line, length_scale, noise_scale, noise_scale_w, auto_play=True):
    """Run the ONNX voice on `line` sentence by sentence and return a pydub AudioSegment."""
    audios = []
    if config["phoneme_type"] == "PhonemeType.ESPEAK":
        config["phoneme_type"] = "espeak"
    sentence_phonemes = phonemize(config, line)
    for phonemes in sentence_phonemes:
        phoneme_ids = phonemes_to_ids(config, phonemes)
        num_speakers = config["num_speakers"]
        if num_speakers == 1:
            speaker_id = None  # for now
        else:
            speaker_id = sid
        text = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
        text_lengths = np.array([text.shape[1]], dtype=np.int64)
        scales = np.array(
            [noise_scale, length_scale, noise_scale_w],
            dtype=np.float32,
        )
        speaker_id_array = None
        if speaker_id is not None:
            speaker_id_array = np.array([speaker_id], dtype=np.int64)  # Ensure the speaker id is a 1D array
        audio = model.run(
            None,
            {
                "input": text,
                "input_lengths": text_lengths,
                "scales": scales,
                "sid": speaker_id_array,
            },
        )[0].squeeze((0, 1))
        audio = audio_float_to_int16(audio.squeeze())
        audios.append(audio)
    merged_audio = np.concatenate(audios)
    sample_rate = config["audio"]["sample_rate"]
    temp_audio_path = os.path.join(tempfile.gettempdir(), "generated_audio.wav")
    sf.write(temp_audio_path, merged_audio, config["audio"]["sample_rate"])
    audio = AudioSegment.from_wav(temp_audio_path)  # the temporary file written above is a WAV, not an MP3
    return audio
    # return FileResponse(temp_audio_path)  # Return the audio file as a FastAPI response
    # display(Markdown(f"{line}"))
    # display(Audio(merged_audio, rate=sample_rate, autoplay=auto_play))


def denoise(
    audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float
) -> np.ndarray:
    audio_spec, audio_angles = transform(audio)
    a = bias_spec.shape[-1]
    b = audio_spec.shape[-1]
    repeats = max(1, math.ceil(b / a))
    bias_spec_repeat = np.repeat(bias_spec, repeats, axis=-1)[..., :b]
    audio_spec_denoised = audio_spec - (bias_spec_repeat * denoiser_strength)
    audio_spec_denoised = np.clip(audio_spec_denoised, a_min=0.0, a_max=None)
    audio_denoised = inverse(audio_spec_denoised, audio_angles)
    return audio_denoised


def stft(x, fft_size, hopsamp):
    """Compute and return the STFT of the supplied time domain signal x.

    Args:
        x (1-dim Numpy array): A time domain signal.
        fft_size (int): FFT size. Should be a power of 2, otherwise DFT will be used.
        hopsamp (int): The hop size, in samples.

    Returns:
        The STFT. The rows are the time slices and columns are the frequency bins.
    """
    window = np.hanning(fft_size)
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)
    return np.array(
        [
            np.fft.rfft(window * x[i : i + fft_size])
            for i in range(0, len(x) - fft_size, hopsamp)
        ]
    )


def istft(X, fft_size, hopsamp):
    """Invert a STFT into a time domain signal.

    Args:
        X (2-dim Numpy array): Input spectrogram. The rows are the time slices and columns are the frequency bins.
        fft_size (int): FFT size.
        hopsamp (int): The hop size, in samples.

    Returns:
        The inverse STFT.
    """
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)
    window = np.hanning(fft_size)
    time_slices = X.shape[0]
    len_samples = int(time_slices * hopsamp + fft_size)
    x = np.zeros(len_samples)
    for n, i in enumerate(range(0, len(x) - fft_size, hopsamp)):
        x[i : i + fft_size] += window * np.real(np.fft.irfft(X[n]))
    return x


def inverse(magnitude, phase):
    recombine_magnitude_phase = np.concatenate(
        [magnitude * np.cos(phase), magnitude * np.sin(phase)], axis=1
    )
    x_org = recombine_magnitude_phase
    n_b, n_f, n_t = x_org.shape  # pylint: disable=unpacking-non-sequence
    x = np.empty([n_b, n_f // 2, n_t], dtype=np.complex64)
    x.real = x_org[:, : n_f // 2]
    x.imag = x_org[:, n_f // 2 :]
    inverse_transform = []
    for y in x:
        y_ = istft(y.T, fft_size=1024, hopsamp=256)
        inverse_transform.append(y_[None, :])
    inverse_transform = np.concatenate(inverse_transform, 0)
    return inverse_transform


def transform(input_data):
    x = input_data
    real_part = []
    imag_part = []
    for y in x:
        y_ = stft(y, fft_size=1024, hopsamp=256).T
        real_part.append(y_.real[None, :, :])  # pylint: disable=unsubscriptable-object
        imag_part.append(y_.imag[None, :, :])  # pylint: disable=unsubscriptable-object
    real_part = np.concatenate(real_part, 0)
    imag_part = np.concatenate(imag_part, 0)
    magnitude = np.sqrt(real_part**2 + imag_part**2)
    phase = np.arctan2(imag_part.data, real_part.data)
    return magnitude, phase


#@app.get("/")
#async def read_root(request: Request):
#    return templates.TemplateResponse("interface.html", {"request": request})

if __name__ == "__main__":
    # main()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
    # main()
    # pass
    # app()

# Create an instance of the FastAPI class
#app = main()
# Define a route for the root endpoint
#def read_root():
#    return {"message": "Hello, World!"}
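
# A minimal client-side sketch (not part of the app): assuming the server is running locally on
# port 7860 and the `requests` package is installed, the synthesis form can be exercised like this.
# The POST returns the rendered interface.html page; the MP3 itself is served from the
# "/download?fileId=..." URL that the handler embeds in that page. The slider values below are
# only example inputs, not prescribed defaults.
#
#   import requests
#
#   form = {
#       "text_input": "1, 2, 3. This is a test.",
#       "speaker": "en",
#       "speed_slider": 1.0,
#       "noise_scale_slider": 0.667,
#       "noise_scale_w_slider": 0.8,
#       "play": True,
#   }
#   page = requests.post("http://localhost:7860/", data=form)
#   print(page.status_code)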