import copy
import logging
import os
import time
import traceback
import uuid
from io import BytesIO
import librosa
import numpy as np
from flask import request, jsonify, make_response, send_file, Blueprint
from werkzeug.utils import secure_filename
from contants import config
# from gpt_sovits.utils import load_audio
from logger import logger
from contants import ModelType
from tts_app.voice_api.auth import require_api_key
from tts_app.model_manager import model_manager, tts_manager
from tts_app.voice_api.utils import *
from utils.data_utils import check_is_none
voice_api = Blueprint("voice_api", __name__)
def get_param(request_data, key, default, data_type=None):
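    """Read a parameter from query string, form data, or JSON body.

    Returns ``default`` when the key is missing or empty; when ``data_type``
    is given, the raw value is coerced to that type. ``max`` is accepted as a
    deprecated alias for ``segment_size``.
    """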
if key == "segment_size" and "max" in request_data:
logger.warning(
"The 'max' parameter is deprecated and will be phased out in the future. Please use 'segment_size' instead.")
return get_param(request_data, "max", default, data_type)
value = request_data.get(key, "")
if data_type:
try:
value = data_type(value)
except:
value = default
if value == "":
value = default
return value
def extract_filename_and_directory(path):
filename = os.path.basename(path)
directory = os.path.dirname(path)
directory_name = os.path.basename(directory)
    if not directory:  # The file has no parent directory (it sits at the root).
return filename
else:
return directory_name + "/" + filename
def update_default_params(state):
model_type = state["model_type"]
if model_type == ModelType.VITS:
config_dict = config.vits_config.asdict()
elif model_type == ModelType.W2V2_VITS:
config_dict = config.w2v2_vits_config.asdict()
elif model_type == ModelType.HUBERT_VITS:
config_dict = config.hubert_vits_config.asdict()
elif model_type == ModelType.BERT_VITS2:
config_dict = config.bert_vits2_config.asdict()
elif model_type == ModelType.GPT_SOVITS:
config_dict = config.gpt_sovits_config.asdict()
    else:
        config_dict = {}
    for key, value in config_dict.items():
        # Fill parameters the caller did not supply (or left as None) with the configured defaults.
        if key not in state or state.get(key) is None:
            state[key] = value
return state
@voice_api.route('/default_parameter', methods=["GET", "POST"])
def default_parameter():
gpt_sovits_config = copy.deepcopy(config.gpt_sovits_config.asdict())
for preset_name, preset in gpt_sovits_config["presets"].items():
if not check_is_none(preset["refer_wav_path"]):
preset["refer_wav_path"] = extract_filename_and_directory(preset["refer_wav_path"])
data = {"vits_config": config.vits_config.asdict(),
"w2v2_vits_config": config.w2v2_vits_config.asdict(),
"hubert_vits_config": config.hubert_vits_config.asdict(),
"bert_vits2_config": config.bert_vits2_config.asdict(),
"gpt_sovits_config": gpt_sovits_config
}
return jsonify(data)
@voice_api.route('/speakers', methods=["GET", "POST"])
def voice_speakers_api():
return jsonify(model_manager.voice_speakers)
@voice_api.route('/', methods=["GET", "POST"])
@voice_api.route('/vits', methods=["GET", "POST"])
@require_api_key
def voice_vits_api():
try:
if request.method == "GET":
request_data = request.args
elif request.method == "POST":
content_type = request.headers.get('Content-Type')
if content_type == 'application/json':
request_data = request.get_json()
else:
request_data = request.form
text = get_param(request_data, "text", "", str)
id = get_param(request_data, "id", config.vits_config.id, int)
format = get_param(request_data, "format", config.vits_config.format, str)
lang = get_param(request_data, "lang", config.vits_config.lang, str).lower()
length = get_param(request_data, "length", config.vits_config.length, float)
noise = get_param(request_data, "noise", config.vits_config.noise, float)
noisew = get_param(request_data, "noisew", config.vits_config.noisew, float)
segment_size = get_param(request_data, "segment_size", config.vits_config.segment_size, int)
use_streaming = get_param(request_data, 'streaming', config.vits_config.use_streaming, bool)
except Exception as e:
logger.error(f"[{ModelType.VITS.value}] {e}")
return make_response("parameter error", 400)
logger.info(
f"[{ModelType.VITS.value}] id:{id} format:{format} lang:{lang} length:{length} noise:{noise} noisew:{noisew} segment_size:{segment_size}")
logger.info(f"[{ModelType.VITS.value}] len:{len(text)} text:{text}")
if check_is_none(text):
logger.info(f"[{ModelType.VITS.value}] text is empty")
return make_response(jsonify({"status": "error", "message": "text is empty"}), 400)
if check_is_none(id):
logger.info(f"[{ModelType.VITS.value}] speaker id is empty")
return make_response(jsonify({"status": "error", "message": "speaker id is empty"}), 400)
if id < 0 or id >= model_manager.vits_speakers_count:
logger.info(f"[{ModelType.VITS.value}] speaker id {id} does not exist")
return make_response(jsonify({"status": "error", "message": f"id {id} does not exist"}), 400)
    # Check whether the selected speaker's model supports the requested language
speaker_lang = model_manager.voice_speakers[ModelType.VITS.value][id].get('lang')
if lang not in ["auto", "mix"] and len(speaker_lang) > 1 and lang not in speaker_lang:
logger.info(f"[{ModelType.VITS.value}] lang \"{lang}\" is not in {speaker_lang}")
return make_response(jsonify({"status": "error", "message": f"lang '{lang}' is not in {speaker_lang}"}),
400)
    # If language_automatic_detect is set in the config, restrict speaker_lang to that list
if (lang_detect := config.language_identification.language_automatic_detect) and isinstance(lang_detect, list):
speaker_lang = lang_detect
if use_streaming and format.upper() != "MP3":
format = "mp3"
logger.warning("Streaming response only supports MP3 format.")
fname = f"{str(uuid.uuid1())}.{format}"
file_type = f"audio/{format}"
state = {"text": text,
"id": id,
"format": format,
"length": length,
"noise": noise,
"noisew": noisew,
"segment_size": segment_size,
"lang": lang,
"speaker_lang": speaker_lang}
if use_streaming:
audio = tts_manager.stream_vits_infer(state)
response = make_response(audio)
response.headers['Content-Disposition'] = f'attachment; filename={fname}'
response.headers['Content-Type'] = file_type
return response
else:
t1 = time.time()
audio = tts_manager.vits_infer(state)
t2 = time.time()
logger.info(f"[{ModelType.VITS.value}] finish in {(t2 - t1):.2f}s")
if config.system.cache_audio:
logger.debug(f"[{ModelType.VITS.value}] {fname}")
path = os.path.join(config.system.cache_path, fname)
save_audio(audio.getvalue(), path)
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/hubert-vits', methods=["POST"])
@require_api_key
def voice_hubert_api():
if request.method == "POST":
try:
voice = request.files['upload']
id = get_param(request.form, "id", config.hubert_vits_config.id, int)
format = get_param(request.form, "format", config.hubert_vits_config.format)
length = get_param(request.form, "length", config.hubert_vits_config.length, float)
noise = get_param(request.form, "noise", config.hubert_vits_config.noise, float)
noisew = get_param(request.form, "noisew", config.hubert_vits_config.noisew, float)
use_streaming = get_param(request.form, 'streaming', False, bool)
except Exception as e:
logger.error(f"[{ModelType.HUBERT_VITS.value}] {e}")
return make_response("parameter error", 400)
logger.info(
f"[{ModelType.HUBERT_VITS.value}] id:{id} format:{format} length:{length} noise:{noise} noisew:{noisew}")
        fname = secure_filename(str(uuid.uuid1()) + os.path.splitext(voice.filename)[1])
voice.save(os.path.join(config.system.upload_folder, fname))
if check_is_none(id):
logger.info(f"[{ModelType.HUBERT_VITS.value}] speaker id is empty")
return make_response(jsonify({"status": "error", "message": "speaker id is empty"}), 400)
if id < 0 or id >= model_manager.hubert_speakers_count:
logger.info(f"[{ModelType.HUBERT_VITS.value}] speaker id {id} does not exist")
return make_response(jsonify({"status": "error", "message": f"id {id} does not exist"}), 400)
file_type = f"audio/{format}"
task = {"id": id,
"format": format,
"length": length,
"noise": noise,
"noisew": noisew,
"audio_path": os.path.join(config.system.upload_folder, fname)}
t1 = time.time()
audio = tts_manager.hubert_vits_infer(task)
t2 = time.time()
logger.info(f"[{ModelType.HUBERT_VITS.value}] finish in {(t2 - t1):.2f}s")
if config.system.cache_audio:
logger.debug(f"[{ModelType.HUBERT_VITS.value}] {fname}")
path = os.path.join(config.system.cache_path, fname)
save_audio(audio.getvalue(), path)
if use_streaming:
audio = tts_manager.generate_audio_chunks(audio)
response = make_response(audio)
response.headers['Content-Disposition'] = f'attachment; filename={fname}'
response.headers['Content-Type'] = file_type
return response
else:
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/w2v2-vits', methods=["GET", "POST"])
@require_api_key
def voice_w2v2_api():
try:
if request.method == "GET":
request_data = request.args
elif request.method == "POST":
content_type = request.headers.get('Content-Type')
if content_type == 'application/json':
request_data = request.get_json()
else:
request_data = request.form
text = get_param(request_data, "text", "", str)
id = get_param(request_data, "id", config.w2v2_vits_config.id, int)
format = get_param(request_data, "format", config.w2v2_vits_config.format, str)
lang = get_param(request_data, "lang", config.w2v2_vits_config.lang, str).lower()
length = get_param(request_data, "length", config.w2v2_vits_config.length, float)
noise = get_param(request_data, "noise", config.w2v2_vits_config.noise, float)
noisew = get_param(request_data, "noisew", config.w2v2_vits_config.noisew, float)
segment_size = get_param(request_data, "segment_size", config.w2v2_vits_config.segment_size, int)
emotion = get_param(request_data, "emotion", config.w2v2_vits_config.emotion, int)
emotion_reference = get_param(request_data, "emotion_reference", None, str)
use_streaming = get_param(request_data, 'streaming', False, bool)
except Exception as e:
logger.error(f"[{ModelType.W2V2_VITS.value}] {e}")
return make_response(f"parameter error", 400)
logger.info(f"[{ModelType.W2V2_VITS.value}] id:{id} format:{format} lang:{lang} "
f"length:{length} noise:{noise} noisew:{noisew} emotion:{emotion} segment_size:{segment_size}")
logger.info(f"[{ModelType.W2V2_VITS.value}] len:{len(text)} text:{text}")
if check_is_none(text):
logger.info(f"[{ModelType.W2V2_VITS.value}] text is empty")
return make_response(jsonify({"status": "error", "message": "text is empty"}), 400)
if check_is_none(id):
logger.info(f"[{ModelType.W2V2_VITS.value}] speaker id is empty")
return make_response(jsonify({"status": "error", "message": "speaker id is empty"}), 400)
if id < 0 or id >= model_manager.w2v2_speakers_count:
logger.info(f"[{ModelType.W2V2_VITS.value}] speaker id {id} does not exist")
return make_response(jsonify({"status": "error", "message": f"id {id} does not exist"}), 400)
    # Check whether the selected speaker's model supports the requested language
speaker_lang = model_manager.voice_speakers[ModelType.W2V2_VITS.value][id].get('lang')
if lang not in ["auto", "mix"] and len(speaker_lang) > 1 and lang not in speaker_lang:
logger.info(f"[{ModelType.W2V2_VITS.value}] lang \"{lang}\" is not in {speaker_lang}")
return make_response(jsonify({"status": "error", "message": f"lang '{lang}' is not in {speaker_lang}"}),
400)
    # If language_automatic_detect is set in the config, restrict speaker_lang to that list
if (lang_detect := config.language_identification.language_automatic_detect) and isinstance(lang_detect, list):
speaker_lang = lang_detect
if use_streaming and format.upper() != "MP3":
format = "mp3"
logger.warning("Streaming response only supports MP3 format.")
fname = f"{str(uuid.uuid1())}.{format}"
file_type = f"audio/{format}"
task = {"text": text,
"id": id,
"format": format,
"length": length,
"noise": noise,
"noisew": noisew,
"segment_size": segment_size,
"lang": lang,
"emotion": emotion,
"emotion_reference": emotion_reference,
"speaker_lang": speaker_lang}
t1 = time.time()
audio = tts_manager.w2v2_vits_infer(task)
t2 = time.time()
logger.info(f"[{ModelType.W2V2_VITS.value}] finish in {(t2 - t1):.2f}s")
if config.system.cache_audio:
logger.debug(f"[{ModelType.W2V2_VITS.value}] {fname}")
path = os.path.join(config.system.cache_path, fname)
save_audio(audio.getvalue(), path)
if use_streaming:
audio = tts_manager.generate_audio_chunks(audio)
response = make_response(audio)
response.headers['Content-Disposition'] = f'attachment; filename={fname}'
response.headers['Content-Type'] = file_type
return response
else:
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/conversion', methods=["POST"])
@voice_api.route('/vits/conversion', methods=["POST"])
@require_api_key
def vits_voice_conversion_api():
if request.method == "POST":
try:
voice = request.files['upload']
original_id = get_param(request.form, "original_id", 0, int)
target_id = get_param(request.form, "target_id", 0, int)
format = get_param(request.form, "format", voice.filename.split(".")[1], str)
use_streaming = get_param(request.form, 'streaming', False, bool)
except Exception as e:
logger.error(f"[vits_voice_convertsion] {e}")
return make_response("parameter error", 400)
logger.info(f"[vits_voice_convertsion] orginal_id:{original_id} target_id:{target_id}")
fname = secure_filename(str(uuid.uuid1()) + "." + voice.filename.split(".")[1])
audio_path = os.path.join(config.system.upload_folder, fname)
voice.save(audio_path)
file_type = f"audio/{format}"
state = {"audio_path": audio_path,
"original_id": original_id,
"target_id": target_id,
"format": format}
t1 = time.time()
audio = tts_manager.vits_voice_conversion(state)
t2 = time.time()
logger.info(f"[Voice conversion] finish in {(t2 - t1):.2f}s")
if config.system.cache_audio:
logger.debug(f"[Voice conversion] {fname}")
path = os.path.join(config.system.cache_path, fname)
save_audio(audio.getvalue(), path)
if use_streaming:
audio = tts_manager.generate_audio_chunks(audio)
response = make_response(audio)
response.headers['Content-Disposition'] = f'attachment; filename={fname}'
response.headers['Content-Type'] = file_type
return response
else:
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/ssml', methods=["POST"])
@require_api_key
def ssml_api():
try:
content_type = request.headers.get('Content-Type')
if content_type == 'application/json':
request_data = request.get_json()
else:
request_data = request.form
ssml = request_data.get("ssml")
except Exception as e:
logger.info(f"[ssml] {e}")
return make_response(jsonify({"status": "error", "message": f"parameter error"}), 400)
logger.debug(ssml)
voice_tasks, format = tts_manager.parse_ssml(ssml)
fname = f"{str(uuid.uuid1())}.{format}"
file_type = f"audio/{format}"
t1 = time.time()
audio = tts_manager.process_ssml_infer_task(voice_tasks, format)
t2 = time.time()
logger.info(f"[ssml] finish in {(t2 - t1):.2f}s")
if config.system.cache_audio:
logger.debug(f"[ssml] {fname}")
path = os.path.join(config.system.cache_path, fname)
save_audio(audio.getvalue(), path)
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/dimension-emotion', methods=["POST"])
@require_api_key
def dimensional_emotion_api():
if request.method == "POST":
try:
audio = request.files['upload']
except Exception as e:
logger.error(f"[dimensional_emotion] {e}")
return make_response("parameter error", 400)
content = BytesIO(audio.read())
file_type = "application/octet-stream; charset=ascii"
fname = os.path.splitext(audio.filename)[0] + ".npy"
emotion_npy = tts_manager.get_dimensional_emotion_npy(content)
return send_file(path_or_file=emotion_npy, mimetype=file_type, download_name=fname)
@voice_api.route('/bert-vits2', methods=["GET", "POST"])
@require_api_key
def voice_bert_vits2_api():
try:
if request.method == "GET":
request_data = request.args
elif request.method == "POST":
content_type = request.headers.get('Content-Type')
if content_type == 'application/json':
request_data = request.get_json()
else:
request_data = request.form
text = get_param(request_data, "text", "", str)
id = get_param(request_data, "id", config.bert_vits2_config.id, int)
format = get_param(request_data, "format", config.bert_vits2_config.format, str)
lang = get_param(request_data, "lang", config.bert_vits2_config.lang, str).lower()
length = get_param(request_data, "length", config.bert_vits2_config.length, float)
# length_zh = get_param(request_data, "length_zh", config.bert_vits2_config.length_zh, float)
# length_ja = get_param(request_data, "length_ja", config.bert_vits2_config.length_ja, float)
# length_en = get_param(request_data, "length_en", config.bert_vits2_config.length_en, float)
noise = get_param(request_data, "noise", config.bert_vits2_config.noise, float)
noisew = get_param(request_data, "noisew", config.bert_vits2_config.noisew, float)
sdp_ratio = get_param(request_data, "sdp_ratio", config.bert_vits2_config.sdp_ratio, float)
segment_size = get_param(request_data, "segment_size", config.bert_vits2_config.segment_size, int)
use_streaming = get_param(request_data, 'streaming', config.bert_vits2_config.use_streaming, bool)
emotion = get_param(request_data, 'emotion', config.bert_vits2_config.emotion, int)
reference_audio = request.files.get("reference_audio", None)
text_prompt = get_param(request_data, 'text_prompt', config.bert_vits2_config.text_prompt, str)
style_text = get_param(request_data, 'style_text', config.bert_vits2_config.style_text, str)
style_weight = get_param(request_data, 'style_weight', config.bert_vits2_config.style_weight, float)
except Exception as e:
logger.error(f"[{ModelType.BERT_VITS2.value}] {e}")
return make_response("parameter error", 400)
# logger.info(
# f"[{ModelType.BERT_VITS2.value}] id:{id} format:{format} lang:{lang} length:{length} noise:{noise} noisew:{noisew} sdp_ratio:{sdp_ratio} segment_size:{segment_size}"
# f" length_zh:{length_zh} length_ja:{length_ja} length_en:{length_en}")
logger.info(
f"[{ModelType.BERT_VITS2.value}] id:{id} format:{format} lang:{lang} length:{length} noise:{noise} noisew:{noisew} sdp_ratio:{sdp_ratio} segment_size:{segment_size} streaming:{use_streaming}")
logger.info(f"[{ModelType.BERT_VITS2.value}] len:{len(text)} text:{text}")
if reference_audio:
logger.info(f"[{ModelType.BERT_VITS2.value}] reference_audio:{reference_audio.filename}")
elif emotion:
logger.info(f"[{ModelType.BERT_VITS2.value}] emotion:{emotion}")
elif text_prompt:
logger.info(f"[{ModelType.BERT_VITS2.value}] text_prompt:{text_prompt}")
elif style_text:
logger.info(f"[{ModelType.BERT_VITS2.value}] style_text:{style_text} style_weight:{style_weight}")
if check_is_none(text):
logger.info(f"[{ModelType.BERT_VITS2.value}] text is empty")
return make_response(jsonify({"status": "error", "message": "text is empty"}), 400)
if check_is_none(id):
logger.info(f"[{ModelType.BERT_VITS2.value}] speaker id is empty")
return make_response(jsonify({"status": "error", "message": "speaker id is empty"}), 400)
if id < 0 or id >= model_manager.bert_vits2_speakers_count:
logger.info(f"[{ModelType.BERT_VITS2.value}] speaker id {id} does not exist")
return make_response(jsonify({"status": "error", "message": f"id {id} does not exist"}), 400)
if emotion and (emotion < 0 or emotion > 9):
logger.info(f"[{ModelType.BERT_VITS2.value}] emotion {emotion} out of the range 0-9")
return make_response(jsonify({"status": "error", "message": f"emotion {emotion} out of the range 0-9"}), 400)
    # Check whether the selected speaker's model supports the requested language
speaker_lang = model_manager.voice_speakers[ModelType.BERT_VITS2.value][id].get('lang')
if lang not in ["auto", "mix"] and len(speaker_lang) > 1 and lang not in speaker_lang:
logger.info(f"[{ModelType.BERT_VITS2.value}] lang \"{lang}\" is not in {speaker_lang}")
return make_response(jsonify({"status": "error", "message": f"lang '{lang}' is not in {speaker_lang}"}),
400)
    # If language_automatic_detect is set in the config, restrict speaker_lang to that list
if (lang_detect := config.language_identification.language_automatic_detect) and isinstance(lang_detect, list):
speaker_lang = lang_detect
if use_streaming and format.upper() != "MP3":
format = "mp3"
logger.warning("Streaming response only supports MP3 format.")
fname = f"{str(uuid.uuid1())}.{format}"
file_type = f"audio/{format}"
state = {"text": text,
"id": id,
"format": format,
"length": length,
# "length_zh": length_zh,
# "length_ja": length_ja,
# "length_en": length_en,
"noise": noise,
"noisew": noisew,
"sdp_ratio": sdp_ratio,
"segment_size": segment_size,
"lang": lang,
"speaker_lang": speaker_lang,
"emotion": emotion,
"reference_audio": reference_audio,
"text_prompt": text_prompt,
"style_text": style_text,
"style_weight": style_weight,
}
if use_streaming:
        audio = tts_manager.stream_bert_vits2_infer(state)
response = make_response(audio)
response.headers['Content-Disposition'] = f'attachment; filename={fname}'
response.headers['Content-Type'] = file_type
return response
else:
t1 = time.time()
        audio = tts_manager.bert_vits2_infer(state)
t2 = time.time()
logger.info(f"[{ModelType.BERT_VITS2.value}] finish in {(t2 - t1):.2f}s")
if config.system.cache_audio:
logger.debug(f"[{ModelType.BERT_VITS2.value}] {fname}")
path = os.path.join(config.system.cache_path, fname)
save_audio(audio.getvalue(), path)
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/gpt-sovits', methods=["GET", "POST"])
@require_api_key
def voice_gpt_sovits_api():
try:
if request.method == "GET":
request_data = request.args
elif request.method == "POST":
content_type = request.headers.get('Content-Type')
if content_type == 'application/json':
request_data = request.get_json()
else:
request_data = request.form
text = get_param(request_data, "text", "", str)
id = get_param(request_data, "id", config.gpt_sovits_config.id, int)
lang = get_param(request_data, "lang", config.gpt_sovits_config.lang, str)
format = get_param(request_data, "format", config.gpt_sovits_config.format, str)
segment_size = get_param(request_data, "segment_size", config.gpt_sovits_config.segment_size, int)
reference_audio = request.files.get("reference_audio", None)
preset = get_param(request_data, "preset", None, str)
# refer_wav_path = get_param(request_data, "refer_wav_path",
# config.gpt_sovits_config.presets.get("default").refer_wav_path, str)
prompt_text = get_param(request_data, "prompt_text", None, str)
prompt_lang = get_param(request_data, "prompt_lang", None, str)
top_k = get_param(request_data, "top_k", config.gpt_sovits_config.top_k, int)
top_p = get_param(request_data, "top_p", config.gpt_sovits_config.top_p, float)
temperature = get_param(request_data, "temperature", config.gpt_sovits_config.temperature, float)
use_streaming = get_param(request_data, 'streaming', config.gpt_sovits_config.use_streaming, bool)
batch_size = get_param(request_data, 'batch_size', config.gpt_sovits_config.batch_size, int)
speed_factor = get_param(request_data, 'speed', config.gpt_sovits_config.speed, float)
except Exception as e:
logger.error(f"[{ModelType.GPT_SOVITS.value}] {e}")
return make_response("parameter error", 400)
logger.info(
f"[{ModelType.GPT_SOVITS.value}] id:{id} format:{format} lang:{lang} segment_size:{segment_size} top_k:{top_k} top_p:{top_p} temperature:{temperature} streaming:{use_streaming}")
logger.info(
f"[{ModelType.GPT_SOVITS.value}] batch_size:{batch_size} speed_factor:{speed_factor}")
logger.info(f"[{ModelType.GPT_SOVITS.value}] len:{len(text)} text:{text}")
if check_is_none(text):
logger.info(f"[{ModelType.GPT_SOVITS.value}] text is empty")
return make_response(jsonify({"status": "error", "message": "text is empty"}), 400)
if check_is_none(id):
logger.info(f"[{ModelType.GPT_SOVITS.value}] speaker id is empty")
return make_response(jsonify({"status": "error", "message": "speaker id is empty"}), 400)
if id < 0 or id >= model_manager.gpt_sovits_speakers_count:
logger.info(f"[{ModelType.GPT_SOVITS.value}] speaker id {id} does not exist")
return make_response(jsonify({"status": "error", "message": f"id {id} does not exist"}), 400)
    # Check whether the selected speaker's model supports the requested language
speaker_lang = model_manager.voice_speakers[ModelType.GPT_SOVITS.value][id].get('lang')
if lang not in ["auto", "mix"] and len(speaker_lang) > 1 and lang not in speaker_lang:
logger.info(f"[{ModelType.GPT_SOVITS.value}] lang \"{lang}\" is not in {speaker_lang}")
return make_response(jsonify({"status": "error", "message": f"lang '{lang}' is not in {speaker_lang}"}),
400)
    # If language_automatic_detect is set in the config, restrict speaker_lang to that list
if (lang_detect := config.language_identification.language_automatic_detect) and isinstance(lang_detect, list):
speaker_lang = lang_detect
logger.info(
f"[{ModelType.GPT_SOVITS.value}] prompt_text:{prompt_text} prompt_lang:{prompt_lang} ")
if use_streaming and format.upper() != "MP3":
format = "mp3"
logger.warning("Streaming response only supports MP3 format.")
fname = f"{str(uuid.uuid1())}.{format}"
file_type = f"audio/{format}"
state = {"text": text,
"id": id,
"format": format,
"segment_size": segment_size,
"lang": lang,
"speaker_lang": speaker_lang,
"reference_audio": reference_audio,
# "reference_audio_sr": reference_audio_sr,
"prompt_text": prompt_text,
"prompt_lang": prompt_lang,
"top_k": top_k,
"top_p": top_p,
"temperature": temperature,
"preset": preset,
"batch_size": batch_size,
"speed_factor": speed_factor
}
if use_streaming:
audio = tts_manager.stream_gpt_sovits_infer(state)
response = make_response(audio)
response.headers['Content-Disposition'] = f'attachment; filename={fname}'
response.headers['Content-Type'] = file_type
return response
else:
t1 = time.time()
audio = tts_manager.gpt_sovits_infer(state)
t2 = time.time()
logger.info(f"[{ModelType.GPT_SOVITS.value}] finish in {(t2 - t1):.2f}s")
if config.system.cache_audio:
logger.debug(f"[{ModelType.GPT_SOVITS.value}] {fname}")
path = os.path.join(config.system.cache_path, fname)
save_audio(audio.getvalue(), path)
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/reading', methods=["GET", "POST"])
@require_api_key
def voice_reading_api():
in_state = {} # interlocutor
nr_state = {} # narrator
state = {}
try:
if request.method == "GET":
request_data = request.args
elif request.method == "POST":
content_type = request.headers.get('Content-Type')
if content_type == 'application/json':
request_data = request.get_json()
else:
request_data = request.form
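        # interlocutor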
in_state["model_type"] = ModelType(
get_param(request_data, "in_model_type", config.reading_config.interlocutor.model_type, str))
in_state["id"] = get_param(request_data, "in_id", config.reading_config.interlocutor.id, int)
in_state["preset"] = get_param(request_data, "in_preset", config.reading_config.interlocutor.preset, str)
# narrator
nr_state["model_type"] = ModelType(
get_param(request_data, "nr_model_type", config.reading_config.narrator.model_type, str))
nr_state["id"] = get_param(request_data, "nr_id", config.reading_config.narrator.model_type, int)
nr_state["preset"] = get_param(request_data, "nr_preset", config.reading_config.narrator.preset, str)
state["text"] = get_param(request_data, "text", "", str)
state["lang"] = get_param(request_data, "lang", "auto", str)
state["format"] = get_param(request_data, "format", "wav", str)
except Exception as e:
logger.error(f"[Reading] {e}")
return make_response("parameter error", 400)
in_state.update(state)
nr_state.update(state)
in_state = update_default_params(in_state)
nr_state = update_default_params(nr_state)
file_type = f'audio/{state["format"]}'
fname = f"{str(uuid.uuid1())}.{state['format']}"
t1 = time.time()
audio = tts_manager.reading(in_state=in_state, nr_state=nr_state)
t2 = time.time()
logger.info(f"[Reading] finish in {(t2 - t1):.2f}s")
return send_file(path_or_file=audio, mimetype=file_type, download_name=fname)
@voice_api.route('/check', methods=["GET", "POST"])
def check():
try:
if request.method == "GET":
request_data = request.args
elif request.method == "POST":
content_type = request.headers.get('Content-Type')
if content_type == 'application/json':
request_data = request.get_json()
else:
request_data = request.form
model_type_str = request_data.get("model_type", request_data.get("model")).upper()
id = int(request_data.get("id"))
except Exception as e:
logger.info(f"[check] {e}")
return make_response(jsonify({"status": "error", "message": "parameter error"}), 400)
if check_is_none(model_type_str):
logger.info(f"[check] model {model_type_str} is empty")
return make_response(jsonify({"status": "error", "message": "model is empty"}), 400)
if model_type_str not in ModelType._value2member_map_:
res = make_response(jsonify({"status": "error", "message": f"model {model_type_str} does not exist"}))
res.status = 404
logger.info(f"[check] speaker id {id} error")
return res
if check_is_none(id):
logger.info(f"[check] speaker id is empty")
return make_response(jsonify({"status": "error", "message": "speaker id is empty"}), 400)
model_type = ModelType(model_type_str)
speaker_list = model_manager.voice_speakers[model_type.value]
if len(speaker_list) == 0:
logger.info(f"[check] {model_type_str} not loaded")
return make_response(jsonify({"status": "error", "message": f"{model_type_str} not loaded"}), 400)
if id < 0 or id >= len(speaker_list):
logger.info(f"[check] speaker id {id} does not exist")
return make_response(jsonify({"status": "error", "message": f"id {id} does not exist"}), 400)
name = str(speaker_list[id]["name"])
lang = speaker_list[id]["lang"]
logger.info(f"[check] check id:{id} name:{name} lang:{lang}")
return make_response(jsonify({"status": "success", "id": id, "name": name, "lang": lang}), 200)