Spaces:
Running
Running
import argparse | |
import logging | |
import shutil | |
import sys | |
import time | |
from pathlib import Path | |
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union | |
import numpy as np | |
import soundfile as sf | |
import torch | |
from typeguard import check_argument_types | |
from espnet2.torch_utils.set_all_random_seed import set_all_random_seed | |
from espnet2.bin.s2st_inference import Speech2Speech | |
def s2st_inference(
    speech: torch.Tensor,
    ngpu: int = 0,
    seed: int = 2023,
    log_level: Union[int, str] = 'INFO',
    train_config: Optional[str] = None,
    model_file: Optional[str] = None,
    threshold: float = 0.5,
    minlenratio: float = 0,
    maxlenratio: float = 10.0,
    st_subtask_minlenratio: float = 0,
    st_subtask_maxlenratio: float = 1.5,
    use_teacher_forcing: bool = False,
    use_att_constraint: bool = False,
    backward_window: int = 1,
    forward_window: int = 3,
    always_fix_seed: bool = False,
    beam_size: int = 5,
    penalty: float = 0,
    st_subtask_beam_size: int = 5,
    st_subtask_penalty: float = 0,
    st_subtask_token_type: Optional[str] = None,
    st_subtask_bpemodel: Optional[str] = None,
    vocoder_config: Optional[str] = None,
    vocoder_file: Optional[str] = None,
    vocoder_tag: Optional[str] = None,
) -> np.ndarray:
    """Run speech-to-speech inference on one utterance and return the waveform.

    A ``Speech2Speech`` model is built via ``Speech2Speech.from_pretrained``
    on every call, applied to ``speech`` as a batch of size one, and the
    ``"wav"`` entry of its output is returned as a NumPy array.

    Args:
        speech: Input tensor. Its first dimension is used as the sequence
            length and a batch dimension is prepended before inference
            (presumably a 1-D waveform -- confirm against the caller).
        ngpu: Number of GPUs. ``0`` selects ``"cpu"``, ``1`` selects
            ``"cuda"``; larger values are rejected.
        seed: Seed passed to ``set_all_random_seed`` and to the model.
        log_level: Level handed to ``logging.basicConfig``.
        maxlenratio: Forwarded to the model; also used here to warn when the
            generated feature length equals ``(input length + 1) * maxlenratio``.
        vocoder_tag: Passed to ``Speech2Speech.from_pretrained`` as
            ``vocoder_tag``.
        train_config, model_file, threshold, minlenratio,
        st_subtask_minlenratio, st_subtask_maxlenratio, use_teacher_forcing,
        use_att_constraint, backward_window, forward_window, always_fix_seed,
        beam_size, penalty, st_subtask_beam_size, st_subtask_penalty,
        st_subtask_token_type, st_subtask_bpemodel, vocoder_config,
        vocoder_file: Forwarded unchanged as keyword arguments to
            ``Speech2Speech.from_pretrained``.

    Returns:
        The ``"wav"`` output of the model, moved to CPU and converted with
        ``.numpy()``.

    Raises:
        NotImplementedError: If ``ngpu > 1``.
        AssertionError: If argument type checking fails or the model output
            has no ``"wav"`` entry (only while assertions are enabled).
    """
    assert check_argument_types()
    if ngpu > 1:
        raise NotImplementedError("only single GPU decoding is supported")

    logging.basicConfig(
        level=log_level,
        format="%(asctime)s (%(module)s:%(lineno)d) %(levelname)s: %(message)s",
    )

    device = "cuda" if ngpu >= 1 else "cpu"

    # 1. Set random-seed
    set_all_random_seed(seed)

    # 2. Build model
    # NOTE(review): the model is re-created on every call; callers that run
    # many utterances may want to cache it outside this function.
    speech2speech_kwargs = dict(
        train_config=train_config,
        model_file=model_file,
        threshold=threshold,
        maxlenratio=maxlenratio,
        minlenratio=minlenratio,
        st_subtask_maxlenratio=st_subtask_maxlenratio,
        st_subtask_minlenratio=st_subtask_minlenratio,
        use_teacher_forcing=use_teacher_forcing,
        use_att_constraint=use_att_constraint,
        backward_window=backward_window,
        forward_window=forward_window,
        beam_size=beam_size,
        penalty=penalty,
        st_subtask_beam_size=st_subtask_beam_size,
        st_subtask_penalty=st_subtask_penalty,
        st_subtask_token_type=st_subtask_token_type,
        st_subtask_bpemodel=st_subtask_bpemodel,
        vocoder_config=vocoder_config,
        vocoder_file=vocoder_file,
        device=device,
        seed=seed,
        always_fix_seed=always_fix_seed,
    )
    speech2speech = Speech2Speech.from_pretrained(
        vocoder_tag=vocoder_tag,
        **speech2speech_kwargs,
    )

    # 3. Run inference on a batch containing only this utterance
    start_time = time.perf_counter()
    speech_lengths = torch.as_tensor([speech.shape[0]])
    output_dict = speech2speech(speech.unsqueeze(0), speech_lengths)

    # Input length plus one; used for the size log and the max-length check.
    insize = speech.size(0) + 1

    # standard speech2mel model case
    feat_gen = output_dict["feat_gen"]
    logging.info(
        f"inference speed = {int(feat_gen.size(0)) / (time.perf_counter() - start_time):.1f} frames / sec."
    )
    logging.info(f"(size:{insize}->{feat_gen.size(0)})")
    if feat_gen.size(0) == insize * maxlenratio:
        logging.warning("output length reaches maximum length.")

    # Only the waveform is returned, so the generated features are not copied
    # to the host.
    assert "wav" in output_dict, (
        'Speech2Speech output has no "wav" entry (is a vocoder configured?)'
    )
    wav = output_dict["wav"].cpu().numpy()
    logging.info(f"wav {len(wav)}")
    return wav
# if output_dict.get("st_subtask_token") is not None: | |
# writer["token"][key] = " ".join(output_dict["st_subtask_token"]) | |
# writer["token_int"][key] == " ".join( | |
# map(str, output_dict["st_subtask_token_int"]) | |
# ) | |
# if output_dict.get("st_subtask_text") is not None: | |
# writer["text"][key] = output_dict["st_subtask_text"] | |