"""XTTS voice-cloning TTS service.

Flask app that synthesises speech with Coqui XTTS-v2, using a speaker
sample wav (downloaded by URL) for voice cloning.  Conditioning latents
are cached on disk keyed by the sample's ETag so repeat requests skip the
expensive latent computation.
"""
import logging
import os
import re
import subprocess
import tempfile
import traceback

import requests
import torch
from flask import Flask, request
from TTS.api import TTS

# Accept the Coqui model licence non-interactively.
os.environ["COQUI_TOS_AGREED"] = "1"

# Presence of this env var switches uploads to the qili backend.
api = os.environ.get("api")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Lazily initialised by get_tts(); shared across requests.
tts = None
model = None

root = os.path.dirname(os.path.abspath(__file__))

LIBROSA_CACHE_DIR = f"{root}/caches"
os.makedirs(LIBROSA_CACHE_DIR, exist_ok=True)
os.environ["LIBROSA_CACHE_DIR"] = LIBROSA_CACHE_DIR

sample_root = f"{root}/samples"
os.makedirs(sample_root, exist_ok=True)

# (speaker wav path, latent-cache .pt path) used when no sample is supplied.
default_sample = f"{root}/sample.wav", f"{sample_root}/sample.pt"

ffmpeg = f"{root}/ffmpeg"

if api:
    from qili import upload
else:
    def upload(file):
        """No-op upload fallback: keep the file local.

        NOTE(review): returns a 1-tuple so that predict()'s ``upload(...)[0]``
        yields the path rather than its first character (presumably matching
        qili.upload's return shape — verify against qili).
        """
        return (file,)


def predict(text, sample=None, language="zh"):
    """Synthesise `text` with the voice from `sample` and upload the mp3.

    :param text: text to speak.
    :param sample: URL of a speaker wav; falls back to the bundled sample.
    :param language: language code, default "zh".
    :return: upload result (URL or local path) on success, else the error
        message as a string.
    """
    get_tts()
    global tts, model
    try:
        # Double sentence-final punctuation after a word/CJK character so the
        # model renders a clearer pause.
        text = re.sub(r"([^\x00-\x7F]|\w)(\.|。|\?)", r"\1 \2\2", text)
        # mkstemp instead of the deprecated, race-prone mktemp.
        fd, output = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        tts.tts_to_file(
            text,
            language=language if language is not None else "zh",
            speaker_wav=sample if sample is not None else default_sample[0],
            file_path=output,
        )
        output = to_mp3(output)
        return upload(output)[0]
    except Exception as e:
        traceback.print_exc()
        return str(e)


def playInHTML(url):
    """Return a minimal HTML snippet that plays `url`.

    NOTE(review): the original body was an empty f-string — almost certainly
    content lost in transit; reconstructed as a plain audio element.
    """
    return f'''
    <audio controls autoplay src="{url}"></audio>
    '''


def get_conditioning_latents(audio_path, **others):
    """Cached replacement for the model's get_conditioning_latents.

    Loads (gpt_cond_latent, speaker_embedding) from the per-sample .pt cache
    when present; otherwise computes them via the model's original method
    (stashed as ``__get_conditioning_latents`` in get_tts) and persists them.
    """
    global model
    speaker_wav, pt_file = download_sample(audio_path)
    try:
        if pt_file is None:
            # Force the computation path below instead of falling through
            # with unbound locals.
            raise FileNotFoundError("no latent cache file")
        gpt_cond_latent, speaker_embedding = torch.load(pt_file)
        logging.debug(f"sample wav info loaded from {pt_file}")
    except Exception:
        # Cache miss or unreadable cache: compute and persist the latents.
        gpt_cond_latent, speaker_embedding = model.__get_conditioning_latents(
            audio_path=speaker_wav, **others
        )
        torch.save((gpt_cond_latent, speaker_embedding), pt_file)
        logging.debug(f"sample wav info saved to {pt_file}")
    return gpt_cond_latent, speaker_embedding


def download_sample(url):
    """Download a speaker sample; return (wav path, latent cache path).

    The cache path is keyed by the response ETag so identical samples share
    one latents file; on a cache hit the wav is skipped and "" is returned in
    its place.  Falls back to ``default_sample`` on any failure (including
    non-200 responses, which previously returned None and crashed the caller).
    """
    try:
        response = requests.get(url)
        if response.status_code == 200:
            etag = response.headers["etag"].replace('"', "")
            pt_file = f"{sample_root}/{etag}.pt"
            if os.path.exists(pt_file):
                return "", pt_file
            with tempfile.NamedTemporaryFile(
                mode="wb", suffix=".wav", delete=True
            ) as temp_file:
                temp_file.write(response.content)
                temp_file.flush()  # ensure ffmpeg sees the full file
                # trim runs inside the with-block, while the temp wav still
                # exists; its trimmed copy survives the cleanup.
                return trim_sample_audio(os.path.abspath(temp_file.name)), pt_file
    except Exception:
        traceback.print_exc()
    return default_sample


def download(url):
    """Download `url` to a temporary wav file and return its path.

    Returns None on a non-200 response.
    """
    response = requests.get(url)
    if response.status_code == 200:
        # delete=False: the file must outlive this function (delete=True
        # removed it on close, returning a dangling path).
        with tempfile.NamedTemporaryFile(
            mode="wb", suffix=".wav", delete=False
        ) as temp_file:
            temp_file.write(response.content)
            return os.path.abspath(temp_file.name)
    return None


def trim_sample_audio(speaker_wav):
    """Band-pass and silence-trim `speaker_wav` with ffmpeg.

    Returns the path of the trimmed copy, or the original path unchanged if
    ffmpeg fails.
    """
    global ffmpeg
    try:
        # No trailing comma: ffmpeg rejects an empty trailing filter.
        filters = (
            "lowpass=8000,highpass=75,"
            "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,"
            "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02"
        )
        out_filename = speaker_wav.replace(".wav", "_trimed.wav")
        # Argument list, not a shell string split on spaces, so paths
        # containing spaces survive.
        subprocess.run(
            [ffmpeg, "-y", "-i", speaker_wav, "-af", filters, out_filename],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return out_filename
    except Exception:
        traceback.print_exc()
        return speaker_wav


def to_mp3(wav):
    """Transcode `wav` to a temporary mp3; return the mp3 path, or `wav` on failure."""
    global ffmpeg
    try:
        fd, mp3 = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)
        # -y because mkstemp pre-creates the output file.
        subprocess.run(
            [ffmpeg, "-y", "-i", wav, mp3],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return mp3
    except Exception:
        traceback.print_exc()
        return wav


app = Flask(__name__)


@app.route("/tts")
def convert():
    """HTTP endpoint: /tts?text=...&sample=...&language=... -> mp3 URL."""
    text = request.args.get("text")
    sample = request.args.get("sample")
    language = request.args.get("language")
    if text is None:
        return "text is missing", 400
    return predict(text, sample, language)


@app.route("/tts/play")
def tts_play():
    """Like /tts but wraps the result in an HTML audio player."""
    url = convert()
    return playInHTML(url)


@app.route("/setup")
def get_tts(model_path=os.environ.get("MODEL_DIR")):
    """Load the XTTS model once (idempotent); also exposed as /setup.

    Uses a local checkpoint when MODEL_DIR points at one, otherwise downloads
    the model by name from the Coqui hub.
    """
    global tts, model
    if tts is None:
        config_path = f"{model_path}/config.json"
        vocoder_config_path = f"{model_path}/vocab.json"
        model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
        # NOTE(review): the original referenced an undefined `bLOCAL`
        # (NameError); derive the switch from whether a local config exists.
        use_hub = not (model_path and os.path.exists(config_path))
        logging.info(f"loading model {model_name} ...")
        tts = TTS(
            model_name if use_hub else None,
            model_path=None if use_hub else model_path,
            config_path=None if use_hub else config_path,
            vocoder_config_path=None if use_hub else vocoder_config_path,
            progress_bar=True,
        )
        model = tts.synthesizer.tts_model
        # hack: route latent computation through the disk-cached wrapper above
        model.__get_conditioning_latents = model.get_conditioning_latents
        model.get_conditioning_latents = get_conditioning_latents
        logging.info("model is ready")
    return "ready"


@app.route("/")
def hello():
    return "welcome!"


logging.info("xtts is ready")