"""Flask front-end for So-VITS-SVC voice conversion.

Endpoints
---------
GET /wav2wav?audio_path=<url>&tran=<int>&spk=<id|name>&wav_format=<fmt>
    Downloads the audio at ``audio_path``, queues a conversion task and
    returns ``{"task_id": ...}`` with HTTP 202.
GET /api/tasks/<task_id>
    Returns the task's current state (``queue``, ``processing``,
    ``completed`` with a download ``url``, or ``error`` with a ``message``).
GET /download/<filename>
    Serves a converted file from the temporary directory.

Only ``MAX_RUNNING_THREADS`` conversions run at a time; the others wait on a
condition variable and report the ``queue`` status meanwhile.
"""
import io
import os
import threading
import uuid
from urllib.parse import urlparse

import numpy as np
import requests
import soundfile
from flask import Flask, request, send_file, jsonify
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client

from inference import infer_tool, slicer

# Directory used for downloaded inputs, converted outputs and model files.
TMP_DIR = "/tmp"
# Maximum number of conversions allowed to run concurrently.
MAX_RUNNING_THREADS = 1
# Seconds to wait for the remote audio server (connect / between bytes).
DOWNLOAD_TIMEOUT = 30

# task_id -> status dict, shared between request handlers and worker threads.
tasks = {}
running_threads = 0
condition = threading.Condition()

# Assigned in the ``__main__`` section once the model files are downloaded.
svc_model = None


def infer(audio_path, tran, spk, wav_format, task_id):
    """Run voice conversion for one task and record the outcome in ``tasks``.

    Intended to be the target of a worker thread. Blocks until a conversion
    slot is free, converts the audio slice by slice, writes the result into
    ``TMP_DIR`` under the input's file name and stores the final state
    (``completed`` or ``error``) in ``tasks[task_id]``.

    Args:
        audio_path: Local path of the audio file to convert.
        tran: Pitch shift passed to ``svc_model.infer``.
        spk: Speaker id or name passed to ``svc_model.infer``.
        wav_format: Output format handed to ``soundfile.write``.
        task_id: Key in ``tasks`` under which progress is reported.
    """
    global running_threads

    # Wait for a free slot; the task reports "queue" while it is waiting.
    with condition:
        while running_threads >= MAX_RUNNING_THREADS:
            tasks[task_id] = {"status": "queue"}
            condition.wait()
        running_threads += 1
        tasks[task_id] = {"status": "processing"}

    try:
        audio_name = os.path.basename(audio_path)
        infer_tool.format_wav(audio_path)
        chunks = slicer.cut(audio_path, db_thresh=-40)
        audio_data, audio_sr = slicer.chunks2audio(audio_path, chunks)

        audio = []
        for (slice_tag, data) in audio_data:
            print(f'#=====segment start, {round(len(data) / audio_sr, 3)}s======')
            length = int(np.ceil(len(data) / audio_sr * svc_model.target_sample))
            if slice_tag:
                # Slice flagged by the slicer: emit zeros of matching length.
                print('jump empty segment')
                _audio = np.zeros(length)
            else:
                # Pad half a second of zeros on both sides before inference...
                pad_len = int(audio_sr * 0.5)
                data = np.concatenate([np.zeros([pad_len]), data, np.zeros([pad_len])])
                raw_path = io.BytesIO()
                soundfile.write(raw_path, data, audio_sr, format="wav")
                raw_path.seek(0)
                out_audio, out_sr = svc_model.infer(spk, tran, raw_path)
                svc_model.clear_empty()
                _audio = out_audio.cpu().numpy()
                # ...and trim the same duration at the model's sample rate.
                pad_len = int(svc_model.target_sample * 0.5)
                _audio = _audio[pad_len:-pad_len]

            audio.extend(list(infer_tool.pad_array(_audio, length)))

        # NOTE: the output is written over the downloaded input file.
        out_wav_path = os.path.join(TMP_DIR, audio_name)
        soundfile.write(out_wav_path, audio, svc_model.target_sample, format=wav_format)

        # Provide a permanent direct link to the file.
        result_audio_url = f"/download/{os.path.basename(out_wav_path)}"

        # Update the task status.
        tasks[task_id] = {
            "status": "completed",
            "url": result_audio_url,
        }
    except Exception as e:
        # Thread boundary: surface any failure through the task status.
        tasks[task_id] = {
            "status": "error",
            "message": str(e)
        }
    finally:
        # Always release the slot so queued tasks cannot be starved.
        with condition:
            running_threads -= 1
            condition.notify_all()


app = Flask(__name__)


@app.route("/wav2wav", methods=["GET"])
def wav2wav():
    """Download the source audio and start an asynchronous conversion task.

    Query parameters:
        audio_path: URL of the audio file to convert (required).
        tran: Pitch shift; parsed as a number and truncated to int (default 0).
        spk: Speaker id or name, depending on the model config (default 0).
        wav_format: Format of the returned file (default ``wav``).

    Returns:
        ``{"task_id": ...}`` with HTTP 202 on success, or an ``error`` JSON
        body with HTTP 400 when a parameter or the download is invalid.
    """
    source_url = request.args.get("audio_path", "")
    # Use only the URL path so query strings do not end up in the file name.
    audio_name = os.path.basename(urlparse(source_url).path)
    if not audio_name:
        return jsonify({"error": "无效的 URL"}), 400

    try:
        tran = int(float(request.args.get("tran", 0)))  # pitch
    except ValueError:
        return jsonify({"error": "Invalid tran"}), 400
    # Speaker (either id or name works; depends on your config).
    spk = request.args.get("spk", 0)
    wav_format = request.args.get("wav_format", 'wav')  # returned file format

    try:
        audio_result = requests.get(source_url, timeout=DOWNLOAD_TIMEOUT)
    except requests.RequestException:
        return jsonify({"error": "无效的 URL"}), 400
    if audio_result.status_code != 200:
        return jsonify({"error": "无效的 URL"}), 400

    audio_path = os.path.join(TMP_DIR, audio_name)  # local wav file path
    with open(audio_path, 'wb') as f:
        f.write(audio_result.content)

    # Register the task only once the input is safely on disk, so a failed
    # request never leaves a task stuck in "processing".
    task_id = str(uuid.uuid4())
    tasks[task_id] = {"status": "processing"}
    threading.Thread(
        target=infer,
        args=(audio_path, tran, spk, wav_format, task_id),
    ).start()

    return jsonify({"task_id": task_id}), 202


@app.route('/api/tasks/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the stored status of ``task_id``, or HTTP 404 if it is unknown."""
    task = tasks.get(task_id)
    if task:
        return jsonify(task)
    return jsonify({"error": "Task not found"}), 404


@app.route('/download/<filename>', methods=['GET'])
def download(filename):
    """Send ``filename`` from ``TMP_DIR`` as an attachment, or HTTP 404."""
    # Keep only the final path component so requests cannot leave TMP_DIR.
    safe_name = os.path.basename(filename)
    file_path = os.path.join(TMP_DIR, safe_name)
    if not safe_name or not os.path.isfile(file_path):
        return jsonify({"error": "File not found"}), 404
    return send_file(file_path, as_attachment=True)


def _download_cos_object(client, bucket, key, dest_path):
    """Stream the COS object ``key`` from ``bucket`` into ``dest_path``."""
    response = client.get_object(Bucket=bucket, Key=key)
    with open(dest_path, 'wb') as local_file:
        for chunk in response['Body'].iter_chunks():
            local_file.write(chunk)


if __name__ == '__main__':
    secret_id = os.getenv('SECRET_ID')
    secret_key = os.getenv('SECRET_KEY')
    region = 'na-siliconvalley'
    bucket_name = 'xiaohei-cat-ai-1304646510'

    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
    client = CosS3Client(config)

    model_name = "/tmp/G_full.pth"  # model path
    config_name = "/tmp/config.json"  # config path

    # Fetch the model checkpoint and its config before serving requests.
    _download_cos_object(
        client, bucket_name, "models/So-VITS-SVC/Koxia-Full/G_full.pth", model_name
    )
    _download_cos_object(
        client, bucket_name, "models/So-VITS-SVC/Koxia-Full/config.json", config_name
    )

    svc_model = infer_tool.Svc(model_name, config_name)
    app.run(port=1145, host="0.0.0.0", debug=False, threaded=False)