from __future__ import annotations

import json
import multiprocessing
import os
from copy import copy
from logging import getLogger
from pathlib import Path

import PySimpleGUI as sg
import sounddevice as sd
import soundfile as sf
import torch
from pebble import ProcessFuture, ProcessPool

from . import __version__
from .utils import get_optimal_device

GUI_DEFAULT_PRESETS_PATH = Path(__file__).parent / "default_gui_presets.json"
GUI_PRESETS_PATH = Path("./user_gui_presets.json").absolute()
LOG = getLogger(__name__)


def play_audio(path: Path | str):
    if isinstance(path, Path):
        path = path.as_posix()
    data, sr = sf.read(path)
    sd.play(data, sr)


def load_presets() -> dict:
    defaults = json.loads(GUI_DEFAULT_PRESETS_PATH.read_text("utf-8"))
    users = (
        json.loads(GUI_PRESETS_PATH.read_text("utf-8"))
        if GUI_PRESETS_PATH.exists()
        else {}
    )
    # priority: defaults > users
    # order: defaults -> users
    return {**defaults, **users, **defaults}


def add_preset(name: str, preset: dict) -> dict:
    presets = load_presets()
    presets[name] = preset
    with GUI_PRESETS_PATH.open("w") as f:
        json.dump(presets, f, indent=2)
    return load_presets()


def delete_preset(name: str) -> dict:
    presets = load_presets()
    if name in presets:
        del presets[name]
    else:
        LOG.warning(f"Cannot delete preset {name} because it does not exist.")
    with GUI_PRESETS_PATH.open("w") as f:
        json.dump(presets, f, indent=2)
    return load_presets()


def get_output_path(input_path: Path) -> Path:
    # Default output path
    output_path = input_path.parent / f"{input_path.stem}.out{input_path.suffix}"
    # Increment file number in path if output file already exists
    file_num = 1
    while output_path.exists():
        output_path = (
            input_path.parent / f"{input_path.stem}.out_{file_num}{input_path.suffix}"
        )
        file_num += 1
    return output_path


def get_supported_file_types() -> tuple[tuple[str, str], ...]:
    res = tuple(
        (extension, f".{extension.lower()}")
        for extension in sf.available_formats().keys()
    )
    # Sort by popularity
    common_file_types = ["WAV", "MP3", "FLAC", "OGG", "M4A", "WMA"]
    res = tuple(
        sorted(
            res,
            key=lambda x: common_file_types.index(x[0])
            if x[0] in common_file_types
            else len(common_file_types),
        )
    )
    return res


def get_supported_file_types_concat() -> tuple[tuple[str, str], ...]:
    return (("Audio", " ".join(sf.available_formats().keys())),)


def validate_output_file_type(output_path: Path) -> bool:
    supported_file_types = sorted(
        f".{extension.lower()}" for extension in sf.available_formats().keys()
    )
    if not output_path.suffix:
        sg.popup_ok(
            "Error: Output path missing file type extension, enter "
            "one of the following manually:\n\n" + "\n".join(supported_file_types)
        )
        return False
    if output_path.suffix.lower() not in supported_file_types:
        sg.popup_ok(
            f"Error: {output_path.suffix.lower()} is not a supported "
            "extension; use one of the following:\n\n"
            + "\n".join(supported_file_types)
        )
        return False
    return True


def get_devices(
    update: bool = True,
) -> tuple[list[str], list[str], list[int], list[int]]:
    if update:
        sd._terminate()
        sd._initialize()
    devices = sd.query_devices()
    hostapis = sd.query_hostapis()
    for hostapi in hostapis:
        for device_idx in hostapi["devices"]:
            devices[device_idx]["hostapi_name"] = hostapi["name"]
    input_devices = [
        f"{d['name']} ({d['hostapi_name']})"
        for d in devices
        if d["max_input_channels"] > 0
    ]
    output_devices = [
        f"{d['name']} ({d['hostapi_name']})"
        for d in devices
        if d["max_output_channels"] > 0
    ]
    input_devices_indices = [d["index"] for d in devices if d["max_input_channels"] > 0]
    output_devices_indices = [
        d["index"] for d in devices if d["max_output_channels"] > 0
    ]
    return input_devices, output_devices, input_devices_indices, output_devices_indices


def after_inference(window: sg.Window, path: Path, auto_play: bool, output_path: Path):
    try:
        LOG.info(f"Finished inference for {path.name}")
        window["infer"].update(disabled=False)
        if auto_play:
            play_audio(output_path)
    except Exception as e:
        LOG.exception(e)


def main():
    LOG.info(f"version: {__version__}")

    # sg.theme("Dark")
    sg.theme_add_new(
        "Very Dark",
        {
            "BACKGROUND": "#111111",
            "TEXT": "#FFFFFF",
            "INPUT": "#444444",
            "TEXT_INPUT": "#FFFFFF",
            "SCROLL": "#333333",
            "BUTTON": ("white", "#112233"),
            "PROGRESS": ("#111111", "#333333"),
            "BORDER": 2,
            "SLIDER_DEPTH": 2,
            "PROGRESS_DEPTH": 2,
        },
    )
    sg.theme("Very Dark")

    model_candidates = list(sorted(Path("./logs/44k/").glob("G_*.pth")))

    frame_contents = {
        "Paths": [
            [
                sg.Text("Model path"),
                sg.Push(),
                sg.InputText(
                    key="model_path",
                    default_text=model_candidates[-1].absolute().as_posix()
                    if model_candidates
                    else "",
                    enable_events=True,
                ),
                sg.FileBrowse(
                    initial_folder=Path("./logs/44k/").absolute().as_posix()
                    if Path("./logs/44k/").exists()
                    else Path(".").absolute().as_posix(),
                    key="model_path_browse",
                    file_types=(
                        ("PyTorch", "G_*.pth G_*.pt"),
                        ("PyTorch", "*.pth *.pt"),
                    ),
                ),
            ],
            [
                sg.Text("Config path"),
                sg.Push(),
                sg.InputText(
                    key="config_path",
                    default_text=Path("./configs/44k/config.json").absolute().as_posix()
                    if Path("./configs/44k/config.json").exists()
                    else "",
                    enable_events=True,
                ),
                sg.FileBrowse(
                    initial_folder=Path("./configs/44k/").as_posix()
                    if Path("./configs/44k/").exists()
                    else Path(".").absolute().as_posix(),
                    key="config_path_browse",
                    file_types=(("JSON", "*.json"),),
                ),
            ],
            [
                sg.Text("Cluster model path (Optional)"),
                sg.Push(),
                sg.InputText(
                    key="cluster_model_path",
                    default_text=Path("./logs/44k/kmeans.pt").absolute().as_posix()
                    if Path("./logs/44k/kmeans.pt").exists()
                    else "",
                    enable_events=True,
                ),
                sg.FileBrowse(
                    initial_folder="./logs/44k/"
                    if Path("./logs/44k/").exists()
                    else ".",
                    key="cluster_model_path_browse",
                    file_types=(("PyTorch", "*.pt"), ("Pickle", "*.pt *.pth *.pkl")),
                ),
            ],
        ],
        "Common": [
            [
                sg.Text("Speaker"),
                sg.Push(),
                sg.Combo(values=[], key="speaker", size=(20, 1)),
            ],
            [
                sg.Text("Silence threshold"),
                sg.Push(),
                sg.Slider(
                    range=(-60.0, 0),
                    orientation="h",
                    key="silence_threshold",
                    resolution=0.1,
                ),
            ],
            [
                sg.Text(
                    "Pitch (12 = 1 octave)\n"
                    "ADJUST THIS based on your voice\n"
                    "when Auto predict F0 is turned off.",
                    size=(None, 4),
                ),
                sg.Push(),
                sg.Slider(
                    range=(-36, 36),
                    orientation="h",
                    key="transpose",
                    tick_interval=12,
                ),
            ],
            [
                sg.Checkbox(
                    key="auto_predict_f0",
                    text="Auto predict F0 (Pitch may become unstable when turned on in real-time inference.)",
                )
            ],
            [
                sg.Text("F0 prediction method"),
                sg.Push(),
                sg.Combo(
                    ["crepe", "crepe-tiny", "parselmouth", "dio", "harvest"],
                    key="f0_method",
                ),
            ],
            [
                sg.Text("Cluster infer ratio"),
                sg.Push(),
                sg.Slider(
                    range=(0, 1.0),
                    orientation="h",
                    key="cluster_infer_ratio",
                    resolution=0.01,
                ),
            ],
            [
                sg.Text("Noise scale"),
                sg.Push(),
                sg.Slider(
                    range=(0.0, 1.0),
                    orientation="h",
                    key="noise_scale",
                    resolution=0.01,
                ),
            ],
            [
                sg.Text("Pad seconds"),
                sg.Push(),
                sg.Slider(
                    range=(0.0, 1.0),
                    orientation="h",
                    key="pad_seconds",
                    resolution=0.01,
                ),
            ],
            [
                sg.Text("Chunk seconds"),
                sg.Push(),
                sg.Slider(
                    range=(0.0, 3.0),
                    orientation="h",
                    key="chunk_seconds",
                    resolution=0.01,
                ),
            ],
            [
                sg.Text("Max chunk seconds (set lower if Out Of Memory, 0 to disable)"),
                sg.Push(),
                sg.Slider(
                    range=(0.0, 240.0),
                    orientation="h",
                    key="max_chunk_seconds",
                    resolution=1.0,
                ),
            ],
            [
                sg.Checkbox(
                    key="absolute_thresh",
                    text="Absolute threshold (ignored (True) in realtime inference)",
                )
            ],
        ],
        "File": [
            [
                sg.Text("Input audio path"),
                sg.Push(),
                sg.InputText(key="input_path", enable_events=True),
                sg.FileBrowse(
                    initial_folder=".",
                    key="input_path_browse",
                    file_types=get_supported_file_types_concat(),
                ),
                sg.FolderBrowse(
                    button_text="Browse(Folder)",
                    initial_folder=".",
                    key="input_path_folder_browse",
                    target="input_path",
                ),
                sg.Button("Play", key="play_input"),
            ],
            [
                sg.Text("Output audio path"),
                sg.Push(),
                sg.InputText(key="output_path"),
                sg.FileSaveAs(
                    initial_folder=".",
                    key="output_path_browse",
                    file_types=get_supported_file_types(),
                ),
            ],
            [sg.Checkbox(key="auto_play", text="Auto play", default=True)],
        ],
        "Realtime": [
            [
                sg.Text("Crossfade seconds"),
                sg.Push(),
                sg.Slider(
                    range=(0, 0.6),
                    orientation="h",
                    key="crossfade_seconds",
                    resolution=0.001,
                ),
            ],
            [
                sg.Text(
                    "Block seconds",  # \n(big -> more robust, slower, (the same) latency)
                    tooltip="Big -> more robust, slower, (the same) latency",
                ),
                sg.Push(),
                sg.Slider(
                    range=(0, 3.0),
                    orientation="h",
                    key="block_seconds",
                    resolution=0.001,
                ),
            ],
            [
                sg.Text(
                    "Additional Infer seconds (before)",  # \n(big -> more robust, slower)
                    tooltip="Big -> more robust, slower, additional latency",
                ),
                sg.Push(),
                sg.Slider(
                    range=(0, 2.0),
                    orientation="h",
                    key="additional_infer_before_seconds",
                    resolution=0.001,
                ),
            ],
            [
                sg.Text(
                    "Additional Infer seconds (after)",  # \n(big -> more robust, slower, additional latency)
                    tooltip="Big -> more robust, slower, additional latency",
                ),
                sg.Push(),
                sg.Slider(
                    range=(0, 2.0),
                    orientation="h",
                    key="additional_infer_after_seconds",
                    resolution=0.001,
                ),
            ],
            [
                sg.Text("Realtime algorithm"),
                sg.Push(),
                sg.Combo(
                    ["2 (Divide by speech)", "1 (Divide constantly)"],
                    default_value="1 (Divide constantly)",
                    key="realtime_algorithm",
                ),
            ],
            [
                sg.Text("Input device"),
                sg.Push(),
                sg.Combo(
                    key="input_device",
                    values=[],
                    size=(60, 1),
                ),
            ],
            [
                sg.Text("Output device"),
                sg.Push(),
                sg.Combo(
                    key="output_device",
                    values=[],
                    size=(60, 1),
                ),
            ],
            [
                sg.Checkbox(
                    "Passthrough original audio (for latency check)",
                    key="passthrough_original",
                    default=False,
                ),
                sg.Push(),
                sg.Button("Refresh devices", key="refresh_devices"),
            ],
            [
                sg.Frame(
                    "Notes",
                    [
                        [
                            sg.Text(
                                "In Realtime Inference:\n"
                                " - Setting F0 prediction method to 'crepe' may cause performance degradation.\n"
                                " - Auto Predict F0 must be turned off.\n"
                                "If the audio sounds mumbly and choppy:\n"
                                " Case: The inference has not been made in time (Increase Block seconds)\n"
                                " Case: Mic input is low (Decrease Silence threshold)\n"
                            )
                        ]
                    ],
                ),
            ],
        ],
        "Presets": [
            [
                sg.Text("Presets"),
                sg.Push(),
                sg.Combo(
                    key="presets",
                    values=list(load_presets().keys()),
                    size=(40, 1),
                    enable_events=True,
                ),
                sg.Button("Delete preset", key="delete_preset"),
            ],
            [
                sg.Text("Preset name"),
                sg.Stretch(),
                sg.InputText(key="preset_name", size=(26, 1)),
                sg.Button("Add current settings as a preset", key="add_preset"),
            ],
        ],
    }

    # frames
    frames = {}
    for name, items in frame_contents.items():
        frame = sg.Frame(name, items)
        frame.expand_x = True
        frames[name] = [frame]

    bottoms = [
        [
            sg.Checkbox(
                key="use_gpu",
                default=get_optimal_device() != torch.device("cpu"),
                text="Use GPU"
                + (
                    " (not available; if your device has GPU, make sure you installed PyTorch with CUDA support)"
                    if get_optimal_device() == torch.device("cpu")
                    else ""
                ),
                disabled=get_optimal_device() == torch.device("cpu"),
            )
        ],
        [
            sg.Button("Infer", key="infer"),
            sg.Button("(Re)Start Voice Changer", key="start_vc"),
            sg.Button("Stop Voice Changer", key="stop_vc"),
            sg.Push(),
            # sg.Button("ONNX Export", key="onnx_export"),
        ],
    ]
    column1 = sg.Column(
        [
            frames["Paths"],
            frames["Common"],
        ],
        vertical_alignment="top",
    )
    column2 = sg.Column(
        [
            frames["File"],
            frames["Realtime"],
            frames["Presets"],
        ]
        + bottoms
    )
    # columns
    layout = [[column1, column2]]
    # get screen size
    screen_width, screen_height = sg.Window.get_screen_size()
    if screen_height < 720:
        layout = [
            [
                sg.Column(
                    layout,
                    vertical_alignment="top",
                    scrollable=False,
                    expand_x=True,
                    expand_y=True,
                    vertical_scroll_only=True,
                    key="main_column",
                )
            ]
        ]
    window = sg.Window(
        f"{__name__.split('.')[0].replace('_', '-')} v{__version__}",
        layout,
        grab_anywhere=True,
        finalize=True,
        scaling=1,
        font=("Yu Gothic UI", 11) if os.name == "nt" else None,
        # resizable=True,
        # size=(1280, 720),
        # Below disables taskbar, which may be not useful for some users
        # use_custom_titlebar=True, no_titlebar=False
        # Keep on top
        # keep_on_top=True
    )

    # event, values = window.read(timeout=0.01)
    # window["main_column"].Scrollable = True

    # make slider height smaller
    try:
        for v in window.element_list():
            if isinstance(v, sg.Slider):
                v.Widget.configure(sliderrelief="flat", width=10, sliderlength=20)
    except Exception as e:
        LOG.exception(e)

    # for n in ["input_device", "output_device"]:
    #     window[n].Widget.configure(justify="right")

    event, values = window.read(timeout=0.01)

    def update_speaker() -> None:
        from . import utils

        config_path = Path(values["config_path"])
        if config_path.exists() and config_path.is_file():
            hp = utils.get_hparams(values["config_path"])
            LOG.debug(f"Loaded config from {values['config_path']}")
            window["speaker"].update(
                values=list(hp.__dict__["spk"].keys()), set_to_index=0
            )

    def update_devices() -> None:
        (
            input_devices,
            output_devices,
            input_device_indices,
            output_device_indices,
        ) = get_devices()
        input_device_indices_reversed = {
            v: k for k, v in enumerate(input_device_indices)
        }
        output_device_indices_reversed = {
            v: k for k, v in enumerate(output_device_indices)
        }
        window["input_device"].update(
            values=input_devices, value=values["input_device"]
        )
        window["output_device"].update(
            values=output_devices, value=values["output_device"]
        )
        input_default, output_default = sd.default.device
        if values["input_device"] not in input_devices:
            window["input_device"].update(
                values=input_devices,
                set_to_index=input_device_indices_reversed.get(input_default, 0),
            )
        if values["output_device"] not in output_devices:
            window["output_device"].update(
                values=output_devices,
                set_to_index=output_device_indices_reversed.get(output_default, 0),
            )

    PRESET_KEYS = [
        key
        for key in values.keys()
        if not any(exclude in key for exclude in ["preset", "browse"])
    ]

    def apply_preset(name: str) -> None:
        for key, value in load_presets()[name].items():
            if key in PRESET_KEYS:
                window[key].update(value)
                values[key] = value

    default_name = list(load_presets().keys())[0]
    apply_preset(default_name)
    window["presets"].update(default_name)
    del default_name
    update_speaker()
    update_devices()
    # with ProcessPool(max_workers=1) as pool:
    # to support Linux
    with ProcessPool(
        max_workers=min(2, multiprocessing.cpu_count()),
        context=multiprocessing.get_context("spawn"),
    ) as pool:
        future: None | ProcessFuture = None
        infer_futures: set[ProcessFuture] = set()
        while True:
            event, values = window.read(200)
            if event == sg.WIN_CLOSED:
                break
            if event != sg.EVENT_TIMEOUT:
                LOG.info(f"Event {event}, values {values}")
            if event.endswith("_path"):
                for name in window.AllKeysDict:
                    if str(name).endswith("_browse"):
                        browser = window[name]
                        if isinstance(browser, sg.Button):
                            LOG.info(
                                f"Updating browser {browser} to {Path(values[event]).parent}"
                            )
                            browser.InitialFolder = Path(values[event]).parent
                            browser.update()
                        else:
                            LOG.warning(f"Browser {browser} is not a FileBrowse")
            window["transpose"].update(
                disabled=values["auto_predict_f0"],
                visible=not values["auto_predict_f0"],
            )

            input_path = Path(values["input_path"])
            output_path = Path(values["output_path"])

            if event == "add_preset":
                presets = add_preset(
                    values["preset_name"], {key: values[key] for key in PRESET_KEYS}
                )
                window["presets"].update(values=list(presets.keys()))
            elif event == "delete_preset":
                presets = delete_preset(values["presets"])
                window["presets"].update(values=list(presets.keys()))
            elif event == "presets":
                apply_preset(values["presets"])
                update_speaker()
            elif event == "refresh_devices":
                update_devices()
            elif event == "config_path":
                update_speaker()
            elif event == "input_path":
                # Don't change the output path if it's already set
                # if values["output_path"]:
                #     continue
                # Set a sensible default output path
                window.Element("output_path").Update(str(get_output_path(input_path)))
            elif event == "infer":
                if "Default VC" in values["presets"]:
                    window["presets"].update(
                        set_to_index=list(load_presets().keys()).index("Default File")
                    )
                    apply_preset("Default File")
                if values["input_path"] == "":
                    LOG.warning("Input path is empty.")
                    continue
                if not input_path.exists():
                    LOG.warning(f"Input path {input_path} does not exist.")
                    continue
                # if not validate_output_file_type(output_path):
                #     continue
                try:
                    from so_vits_svc_fork.inference.main import infer

                    LOG.info("Starting inference...")
                    window["infer"].update(disabled=True)
                    infer_future = pool.schedule(
                        infer,
                        kwargs=dict(
                            # paths
                            model_path=Path(values["model_path"]),
                            output_path=output_path,
                            input_path=input_path,
                            config_path=Path(values["config_path"]),
                            recursive=True,
                            # svc config
                            speaker=values["speaker"],
                            cluster_model_path=Path(values["cluster_model_path"])
                            if values["cluster_model_path"]
                            else None,
                            transpose=values["transpose"],
                            auto_predict_f0=values["auto_predict_f0"],
                            cluster_infer_ratio=values["cluster_infer_ratio"],
                            noise_scale=values["noise_scale"],
                            f0_method=values["f0_method"],
                            # slice config
                            db_thresh=values["silence_threshold"],
                            pad_seconds=values["pad_seconds"],
                            chunk_seconds=values["chunk_seconds"],
                            absolute_thresh=values["absolute_thresh"],
                            max_chunk_seconds=values["max_chunk_seconds"],
                            device="cpu"
                            if not values["use_gpu"]
                            else get_optimal_device(),
                        ),
                    )
                    # Bind the current paths/settings as default arguments so the
                    # callback does not pick up values from a later event loop
                    # iteration by the time inference finishes.
                    auto_play = values["auto_play"]
                    infer_future.add_done_callback(
                        lambda _future, p=input_path, a=auto_play, o=output_path: after_inference(
                            window, p, a, o
                        )
                    )
                    infer_futures.add(infer_future)
                except Exception as e:
                    LOG.exception(e)
            elif event == "play_input":
                if Path(values["input_path"]).exists():
                    pool.schedule(play_audio, args=[Path(values["input_path"])])
            elif event == "start_vc":
                _, _, input_device_indices, output_device_indices = get_devices(
                    update=False
                )
                from so_vits_svc_fork.inference.main import realtime

                if future:
                    LOG.info("Canceling previous task")
                    future.cancel()
                future = pool.schedule(
                    realtime,
                    kwargs=dict(
                        # paths
                        model_path=Path(values["model_path"]),
                        config_path=Path(values["config_path"]),
                        speaker=values["speaker"],
                        # svc config
                        cluster_model_path=Path(values["cluster_model_path"])
                        if values["cluster_model_path"]
                        else None,
                        transpose=values["transpose"],
                        auto_predict_f0=values["auto_predict_f0"],
                        cluster_infer_ratio=values["cluster_infer_ratio"],
                        noise_scale=values["noise_scale"],
                        f0_method=values["f0_method"],
                        # slice config
                        db_thresh=values["silence_threshold"],
                        pad_seconds=values["pad_seconds"],
                        chunk_seconds=values["chunk_seconds"],
                        # realtime config
                        crossfade_seconds=values["crossfade_seconds"],
                        additional_infer_before_seconds=values[
                            "additional_infer_before_seconds"
                        ],
                        additional_infer_after_seconds=values[
                            "additional_infer_after_seconds"
                        ],
                        block_seconds=values["block_seconds"],
                        version=int(values["realtime_algorithm"][0]),
                        input_device=input_device_indices[
                            window["input_device"].widget.current()
                        ],
                        output_device=output_device_indices[
                            window["output_device"].widget.current()
                        ],
                        device=get_optimal_device() if values["use_gpu"] else "cpu",
                        passthrough_original=values["passthrough_original"],
                    ),
                )
            elif event == "stop_vc":
                if future:
                    future.cancel()
                    future = None
            elif event == "onnx_export":
                try:
                    raise NotImplementedError("ONNX export is not implemented yet.")
                    from so_vits_svc_fork.modules.onnx._export import onnx_export

                    onnx_export(
                        input_path=Path(values["model_path"]),
                        output_path=Path(values["model_path"]).with_suffix(".onnx"),
                        config_path=Path(values["config_path"]),
                        device="cpu",
                    )
                except Exception as e:
                    LOG.exception(e)
            if future is not None and future.done():
                try:
                    future.result()
                except Exception as e:
                    LOG.error("Error in realtime: ")
                    LOG.exception(e)
                future = None
            # Use a distinct loop variable so the realtime `future` above is not
            # clobbered while draining finished inference futures.
            for infer_future in copy(infer_futures):
                if infer_future.done():
                    try:
                        infer_future.result()
                    except Exception as e:
                        LOG.error("Error in inference: ")
                        LOG.exception(e)
                    infer_futures.remove(infer_future)
        if future:
            future.cancel()
    window.close()