import os, sys import traceback import logging now_dir = os.getcwd() sys.path.append(now_dir) logger = logging.getLogger(__name__) import lib.globals.globals as rvc_globals import numpy as np import soundfile as sf import torch from io import BytesIO from lib.infer.infer_libs.audio import load_audio from lib.infer.infer_libs.audio import wav2 from lib.infer.infer_libs.infer_pack.models import ( SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono, SynthesizerTrnMs768NSFsid, SynthesizerTrnMs768NSFsid_nono, ) from lib.infer.modules.vc.pipeline import Pipeline from lib.infer.modules.vc.utils import * import tabs.merge as merge import time import scipy.io.wavfile as wavfile import glob from shutil import move sup_audioext = { "wav", "mp3", "flac", "ogg", "opus", "m4a", "mp4", "aac", "alac", "wma", "aiff", "webm", "ac3", } def note_to_hz(note_name): SEMITONES = {'C': -9, 'C#': -8, 'D': -7, 'D#': -6, 'E': -5, 'F': -4, 'F#': -3, 'G': -2, 'G#': -1, 'A': 0, 'A#': 1, 'B': 2} pitch_class, octave = note_name[:-1], int(note_name[-1]) semitone = SEMITONES[pitch_class] note_number = 12 * (octave - 4) + semitone frequency = 440.0 * (2.0 ** (1.0/12)) ** note_number return frequency class VC: def __init__(self, config): self.n_spk = None self.tgt_sr = None self.net_g = None self.pipeline = None self.cpt = None self.version = None self.if_f0 = None self.version = None self.hubert_model = None self.config = config def get_vc(self, sid, *to_return_protect): logger.info("Get sid: " + sid) to_return_protect0 = { "visible": self.if_f0 != 0, "value": to_return_protect[0] if self.if_f0 != 0 and to_return_protect else 0.5, "__type__": "update", } to_return_protect1 = { "visible": self.if_f0 != 0, "value": to_return_protect[1] if self.if_f0 != 0 and to_return_protect else 0.33, "__type__": "update", } if sid == "" or sid == []: if self.hubert_model is not None: # 考虑到轮询, 需要加个判断看是否 sid 是由有模型切换到无模型的 logger.info("Clean model cache") del ( self.net_g, self.n_spk, self.vc, self.hubert_model, self.tgt_sr, ) # ,cpt self.hubert_model = ( self.net_g ) = self.n_spk = self.vc = self.hubert_model = self.tgt_sr = None if torch.cuda.is_available(): torch.cuda.empty_cache() ###楼下不这么折腾清理不干净 self.if_f0 = self.cpt.get("f0", 1) self.version = self.cpt.get("version", "v1") if self.version == "v1": if self.if_f0 == 1: self.net_g = SynthesizerTrnMs256NSFsid( *self.cpt["config"], is_half=self.config.is_half ) else: self.net_g = SynthesizerTrnMs256NSFsid_nono(*self.cpt["config"]) elif self.version == "v2": if self.if_f0 == 1: self.net_g = SynthesizerTrnMs768NSFsid( *self.cpt["config"], is_half=self.config.is_half ) else: self.net_g = SynthesizerTrnMs768NSFsid_nono(*self.cpt["config"]) del self.net_g, self.cpt if torch.cuda.is_available(): torch.cuda.empty_cache() return ( {"visible": False, "__type__": "update"}, { "visible": True, "value": to_return_protect0, "__type__": "update", }, { "visible": True, "value": to_return_protect1, "__type__": "update", }, "", "", ) #person = f'{os.getenv("weight_root")}/{sid}' person = f'{sid}' #logger.info(f"Loading: {person}") logger.info(f"Loading...") self.cpt = torch.load(person, map_location="cpu") self.tgt_sr = self.cpt["config"][-1] self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0] # n_spk self.if_f0 = self.cpt.get("f0", 1) self.version = self.cpt.get("version", "v1") synthesizer_class = { ("v1", 1): SynthesizerTrnMs256NSFsid, ("v1", 0): SynthesizerTrnMs256NSFsid_nono, ("v2", 1): SynthesizerTrnMs768NSFsid, ("v2", 0): SynthesizerTrnMs768NSFsid_nono, } self.net_g = synthesizer_class.get( (self.version, self.if_f0), SynthesizerTrnMs256NSFsid )(*self.cpt["config"], is_half=self.config.is_half) del self.net_g.enc_q self.net_g.load_state_dict(self.cpt["weight"], strict=False) self.net_g.eval().to(self.config.device) if self.config.is_half: self.net_g = self.net_g.half() else: self.net_g = self.net_g.float() self.pipeline = Pipeline(self.tgt_sr, self.config) n_spk = self.cpt["config"][-3] index = {"value": get_index_path_from_model(sid), "__type__": "update"} logger.info("Select index: " + index["value"]) return ( ( {"visible": False, "maximum": n_spk, "__type__": "update"}, to_return_protect0, to_return_protect1 ) if to_return_protect else {"visible": False, "maximum": n_spk, "__type__": "update"} ) def vc_single( self, sid, input_audio_path1, f0_up_key, f0_file, f0_method, file_index, file_index2, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, format1, split_audio, crepe_hop_length, f0_min, note_min, f0_max, note_max, f0_autotune, ): global total_time total_time = 0 start_time = time.time() if not input_audio_path1: return "You need to upload an audio", None if (not os.path.exists(input_audio_path1)) and (not os.path.exists(os.path.join(now_dir, input_audio_path1))): return "Audio was not properly selected or doesn't exist", None if split_audio: resultm, new_dir_path = merge.process_audio(input_audio_path1) print(resultm) print("------") print(new_dir_path) if resultm == "Finish": file_index = ( ( file_index.strip(" ") .strip('"') .strip("\n") .strip('"') .strip(" ") .replace("trained", "added") ) if file_index != "" else file_index2 ) # 防止小白写错,自动帮他替换掉 # Use the code from vc_multi to process the segmented audio if rvc_globals.NotesOrHertz and f0_method != 'rmvpe': f0_min = note_to_hz(note_min) if note_min else 50 f0_max = note_to_hz(note_max) if note_max else 1100 print(f"Converted Min pitch: freq - {f0_min}\n" f"Converted Max pitch: freq - {f0_max}") else: f0_min = f0_min or 50 f0_max = f0_max or 1100 try: dir_path = ( new_dir_path.strip(" ").strip('"').strip("\n").strip('"').strip(" ") ) # Prevent leading/trailing whitespace and quotes try: if dir_path != "": paths = [ os.path.join(root, name) for root, _, files in os.walk(dir_path, topdown=False) for name in files if name.endswith(tuple(sup_audioext)) and root == dir_path ] except: traceback.print_exc() print(paths) for path in paths: info, opt = self.vc_single_dont_save( sid, path, f0_up_key, None, f0_method, file_index, file_index2, # file_big_npy, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, crepe_hop_length, f0_min, note_min, f0_max, note_max, f0_autotune, ) if "Success" in info: try: tgt_sr, audio_opt = opt output_filename = os.path.splitext(os.path.basename(path))[0] if format1 in ["wav", "flac"]: sf.write( "%s/%s.%s" % (new_dir_path, output_filename, format1), audio_opt, tgt_sr, ) else: path = "%s/%s.%s" % (new_dir_path, output_filename, format1) with BytesIO() as wavf: sf.write( wavf, audio_opt, tgt_sr, format="wav" ) wavf.seek(0, 0) with open(path, "wb") as outf: wav2(wavf, outf, format1) except: print(traceback.format_exc()) except: print(traceback.format_exc()) time.sleep(0.5) print("Finished processing segmented audio, now merging audio...") # Une el audio segmentado merge_timestamps_file = os.path.join(os.path.dirname(new_dir_path), f"{os.path.basename(input_audio_path1).split('.')[0]}_timestamps.txt") merge.merge_audio(merge_timestamps_file) # Calculate the elapsed time end_time = time.time() total_time = end_time - start_time merged_audio_path = os.path.join(os.path.dirname(new_dir_path), "audio-outputs", f"{os.path.basename(input_audio_path1).split('.')[0]}_merged.wav") index_info = ( "Index:\n%s." % file_index if os.path.exists(file_index) else "Index not used." ) return ( "Success.\n%s\nTime:\infer: %s." % (index_info, total_time), merged_audio_path, ) print(f"\nStarting inference for '{os.path.basename(input_audio_path1)}'") f0_up_key = int(f0_up_key) if rvc_globals.NotesOrHertz and f0_method != 'rmvpe': f0_min = note_to_hz(note_min) if note_min else 50 f0_max = note_to_hz(note_max) if note_max else 1100 print(f"Converted Min pitch: freq - {f0_min}\n" f"Converted Max pitch: freq - {f0_max}") else: f0_min = f0_min or 50 f0_max = f0_max or 1100 try: print(f"Attempting to load {input_audio_path1}....") audio = load_audio(file=input_audio_path1, sr=16000, DoFormant=rvc_globals.DoFormant, Quefrency=rvc_globals.Quefrency, Timbre=rvc_globals.Timbre) audio_max = np.abs(audio).max() / 0.95 if audio_max > 1: audio /= audio_max times = [0, 0, 0] if self.hubert_model is None: self.hubert_model = load_hubert(self.config) try: self.if_f0 = self.cpt.get("f0", 1) except NameError: message = "Model was not properly selected" print(message) return message, None file_index = ( ( file_index.strip(" ") .strip('"') .strip("\n") .strip('"') .strip(" ") .replace("trained", "added") ) if file_index != "" else file_index2 ) # 防止小白写错,自动帮他替换掉 try: audio_opt = self.pipeline.pipeline( self.hubert_model, self.net_g, sid, audio, input_audio_path1, times, f0_up_key, f0_method, file_index, index_rate, self.if_f0, filter_radius, self.tgt_sr, resample_sr, rms_mix_rate, self.version, protect, crepe_hop_length, f0_autotune, f0_file=f0_file, f0_min=f0_min, f0_max=f0_max ) except AssertionError: message = "Mismatching index version detected (v1 with v2, or v2 with v1)." print(message) return message, None except NameError: message = "RVC libraries are still loading. Please try again in a few seconds." print(message) return message, None if self.tgt_sr != resample_sr >= 16000: tgt_sr = resample_sr else: tgt_sr = self.tgt_sr index_info = ( "Index:\n%s." % file_index if os.path.exists(file_index) else "Index not used." ) end_time = time.time() total_time = end_time - start_time opt_root = "assets/audios/audio-outputs" os.makedirs(opt_root, exist_ok=True) output_count = 1 while True: opt_filename = f"generated_audio_{output_count}.{format1}" current_output_path = os.path.join(opt_root, opt_filename) if not os.path.exists(current_output_path): break output_count += 1 try: if format1 in ["wav", "flac"]: sf.write( current_output_path, audio_opt, self.tgt_sr, ) print(f"💾 Generated audio saved to: {current_output_path}") else: with BytesIO() as wavf: sf.write( wavf, audio_opt, self.tgt_sr, format="wav" ) wavf.seek(0, 0) with open(current_output_path, "wb") as outf: wav2(wavf, outf, format1) print(f"💾 Generated audio saved to: {current_output_path}") except: info = traceback.format_exc() return ( "Success.\n%s\nTime:\nnpy: %.2fs, f0: %.2fs, infer: %.2fs." % (index_info, *times), (tgt_sr, audio_opt), ) except: info = traceback.format_exc() logger.warn(info) return info, (None, None) def vc_single_dont_save( self, sid, input_audio_path1, f0_up_key, f0_file, f0_method, file_index, file_index2, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, crepe_hop_length, f0_min, note_min, f0_max, note_max, f0_autotune, ): global total_time total_time = 0 start_time = time.time() if not input_audio_path1: return "You need to upload an audio", None if (not os.path.exists(input_audio_path1)) and (not os.path.exists(os.path.join(now_dir, input_audio_path1))): return "Audio was not properly selected or doesn't exist", None print(f"\nStarting inference for '{os.path.basename(input_audio_path1)}'") f0_up_key = int(f0_up_key) if rvc_globals.NotesOrHertz and f0_method != 'rmvpe': f0_min = note_to_hz(note_min) if note_min else 50 f0_max = note_to_hz(note_max) if note_max else 1100 print(f"Converted Min pitch: freq - {f0_min}\n" f"Converted Max pitch: freq - {f0_max}") else: f0_min = f0_min or 50 f0_max = f0_max or 1100 try: print(f"Attempting to load {input_audio_path1}....") audio = load_audio(file=input_audio_path1, sr=16000, DoFormant=rvc_globals.DoFormant, Quefrency=rvc_globals.Quefrency, Timbre=rvc_globals.Timbre) audio_max = np.abs(audio).max() / 0.95 if audio_max > 1: audio /= audio_max times = [0, 0, 0] if self.hubert_model is None: self.hubert_model = load_hubert(self.config) try: self.if_f0 = self.cpt.get("f0", 1) except NameError: message = "Model was not properly selected" print(message) return message, None file_index = ( ( file_index.strip(" ") .strip('"') .strip("\n") .strip('"') .strip(" ") .replace("trained", "added") ) if file_index != "" else file_index2 ) # 防止小白写错,自动帮他替换掉 try: audio_opt = self.pipeline.pipeline( self.hubert_model, self.net_g, sid, audio, input_audio_path1, times, f0_up_key, f0_method, file_index, index_rate, self.if_f0, filter_radius, self.tgt_sr, resample_sr, rms_mix_rate, self.version, protect, crepe_hop_length, f0_autotune, f0_file=f0_file, f0_min=f0_min, f0_max=f0_max ) except AssertionError: message = "Mismatching index version detected (v1 with v2, or v2 with v1)." print(message) return message, None except NameError: message = "RVC libraries are still loading. Please try again in a few seconds." print(message) return message, None if self.tgt_sr != resample_sr >= 16000: tgt_sr = resample_sr else: tgt_sr = self.tgt_sr index_info = ( "Index:\n%s." % file_index if os.path.exists(file_index) else "Index not used." ) end_time = time.time() total_time = end_time - start_time return ( "Success.\n%s\nTime:\nnpy: %.2fs, f0: %.2fs, infer: %.2fs." % (index_info, *times), (tgt_sr, audio_opt), ) except: info = traceback.format_exc() logger.warn(info) return info, (None, None) def vc_multi( self, sid, dir_path, opt_root, paths, f0_up_key, f0_method, file_index, file_index2, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, format1, crepe_hop_length, f0_min, note_min, f0_max, note_max, f0_autotune, ): if rvc_globals.NotesOrHertz and f0_method != 'rmvpe': f0_min = note_to_hz(note_min) if note_min else 50 f0_max = note_to_hz(note_max) if note_max else 1100 print(f"Converted Min pitch: freq - {f0_min}\n" f"Converted Max pitch: freq - {f0_max}") else: f0_min = f0_min or 50 f0_max = f0_max or 1100 try: dir_path = ( dir_path.strip(" ").strip('"').strip("\n").strip('"').strip(" ") ) # 防止小白拷路径头尾带了空格和"和回车 opt_root = opt_root.strip(" ").strip('"').strip("\n").strip('"').strip(" ") os.makedirs(opt_root, exist_ok=True) try: if dir_path != "": paths = [ os.path.join(root, name) for root, _, files in os.walk(dir_path, topdown=False) for name in files if name.endswith(tuple(sup_audioext)) and root == dir_path ] else: paths = [path.name for path in paths] except: traceback.print_exc() paths = [path.name for path in paths] infos = [] print(paths) for path in paths: info, opt = self.vc_single_dont_save( sid, path, f0_up_key, None, f0_method, file_index, file_index2, # file_big_npy, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, crepe_hop_length, f0_min, note_min, f0_max, note_max, f0_autotune, ) if "Success" in info: try: tgt_sr, audio_opt = opt if format1 in ["wav", "flac"]: sf.write( "%s/%s.%s" % (opt_root, os.path.basename(path), format1), audio_opt, tgt_sr, ) else: path = "%s/%s.%s" % (opt_root, os.path.basename(path), format1) with BytesIO() as wavf: sf.write( wavf, audio_opt, tgt_sr, format="wav" ) wavf.seek(0, 0) with open(path, "wb") as outf: wav2(wavf, outf, format1) except: info += traceback.format_exc() infos.append("%s->%s" % (os.path.basename(path), info)) yield "\n".join(infos) yield "\n".join(infos) except: yield traceback.format_exc()