|
from abc import ABC |
|
from abc import abstractmethod |
|
from pathlib import Path |
|
from typing import Collection |
|
from typing import Dict |
|
from typing import Iterable |
|
from typing import Union |
|
|
|
import numpy as np |
|
import scipy.signal |
|
import soundfile |
|
from typeguard import check_argument_types |
|
from typeguard import check_return_type |
|
|
|
from espnet2.text.build_tokenizer import build_tokenizer |
|
from espnet2.text.cleaner import TextCleaner |
|
from espnet2.text.token_id_converter import TokenIDConverter |
|
|
|
|
|
class AbsPreprocessor(ABC):
    """Abstract base class for per-utterance preprocessors.

    A concrete subclass is a callable that receives an utterance id together
    with a dictionary of raw items and returns a dictionary of numpy arrays.
    """

    def __init__(self, train: bool):
        """Remember the mode flag.

        Args:
            train: Stored verbatim on ``self.train`` for subclasses to consult.
        """
        self.train = train

    @abstractmethod
    def __call__(
        self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        """Transform the items of one utterance.

        Args:
            uid: Identifier of the utterance.
            data: Mapping from item name to either a string or an array.

        Returns:
            Mapping from item name to array.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError
|
|
|
|
|
def framing(
    x,
    frame_length: int = 512,
    frame_shift: int = 256,
    centered: bool = True,
    padded: bool = True,
):
    """Slice the last axis of ``x`` into (possibly overlapping) frames.

    Args:
        x: Array whose last axis is framed; leading axes are kept as they are.
        frame_length: Number of samples per frame.
        frame_shift: Hop, in samples, between the starts of adjacent frames.
        centered: If True, zero-pad ``frame_length // 2`` samples on both
            sides of the last axis before framing.
        padded: If True, zero-pad the end of the last axis so that the last
            frame is complete.

    Returns:
        Array of shape ``x.shape[:-1] + (n_frames, frame_length)``.
        Except in the ``frame_length == frame_shift == 1`` case, it is a
        strided view: frames that overlap share the same memory, and when no
        padding was applied the view aliases the caller's array.

    Raises:
        ValueError: If ``x`` is empty, ``frame_length`` is below 1 or larger
            than the (unpadded) last axis, or ``frame_shift`` is not positive.
    """
    if x.size == 0:
        raise ValueError("Input array size is zero")
    if frame_length < 1:
        raise ValueError("frame_length must be a positive integer")
    if frame_length > x.shape[-1]:
        raise ValueError("frame_length is greater than input length")
    if 0 >= frame_shift:
        raise ValueError("frame_shift must be greater than 0")

    # Only the last axis is ever padded; every leading axis gets (0, 0).
    untouched_axes = [(0, 0)] * (x.ndim - 1)

    if centered:
        half = frame_length // 2
        x = np.pad(
            x, untouched_axes + [(half, half)], mode="constant", constant_values=0
        )

    if padded:
        # Samples missing to reach the next frame boundary.
        shortfall = -(x.shape[-1] - frame_length) % frame_shift
        tail = shortfall % frame_length
        x = np.pad(
            x, untouched_axes + [(0, tail)], mode="constant", constant_values=0
        )

    if frame_length == 1 and frame_shift == 1:
        # One sample per frame and no overlap: a new trailing axis suffices.
        return x[..., None]

    n_frames = (x.shape[-1] - frame_length) // frame_shift + 1
    sample_stride = x.strides[-1]
    return np.lib.stride_tricks.as_strided(
        x,
        shape=x.shape[:-1] + (n_frames, frame_length),
        strides=x.strides[:-1] + (frame_shift * sample_stride, sample_stride),
    )
|
|
|
|
|
def detect_non_silence(
    x: np.ndarray,
    threshold: float = 0.01,
    frame_length: int = 1024,
    frame_shift: int = 512,
    window: str = "boxcar",
) -> np.ndarray:
    """Power based voice activity detection.

    The last axis of ``x`` is cut into frames. A frame is flagged as
    non-silent when its mean power divided by the mean frame power of the
    same row exceeds ``threshold``. Frame decisions are then expanded back to
    one boolean per sample.

    Args:
        x: (Channel, Time) or (Time,)
        threshold: Relative power above which a frame counts as non-silent.
        frame_length: Samples per analysis frame.
        frame_shift: Hop between adjacent frames, in samples.
        window: Window name handed to ``scipy.signal.get_window``.

    Returns:
        Boolean array with the same shape as ``x``. It is all True when the
        signal is shorter than one frame or has zero power everywhere.

    Examples:
        >>> x = np.random.randn(1000)
        >>> detect = detect_non_silence(x)
        >>> assert x.shape == detect.shape
        >>> assert detect.dtype == bool
    """
    # NOTE: the builtin ``bool`` is used as dtype because the ``np.bool``
    # alias was deprecated in NumPy 1.20 and removed in NumPy 1.24.
    if x.shape[-1] < frame_length:
        # Not even one full frame is available: keep every sample.
        return np.full(x.shape, fill_value=True, dtype=bool)

    if x.dtype.kind in "iu":
        # Squaring integer samples could overflow; work in floating point.
        x = x.astype(np.float64)

    framed = framing(
        x,
        frame_length=frame_length,
        frame_shift=frame_shift,
        centered=False,
        padded=True,
    )
    win = scipy.signal.get_window(window, frame_length).astype(framed.dtype)
    # Multiply out of place: ``framed`` is a strided view whose overlapping
    # frames share memory, so an in-place ``*=`` would apply the window more
    # than once to the shared samples.
    framed_w = framed * win

    # power: one value per frame
    power = (framed_w ** 2).mean(axis=-1)

    # keepdims so that each row is normalised by its own mean power and the
    # division broadcasts correctly for any number of channels.
    mean_power = power.mean(axis=-1, keepdims=True)
    if np.all(mean_power == 0):
        return np.full(x.shape, fill_value=True, dtype=bool)

    detect_frames = power / mean_power > threshold

    # Repeat every frame decision ``frame_shift`` times to get back to a
    # per-sample mask.
    detects = np.broadcast_to(
        detect_frames[..., None], detect_frames.shape + (frame_shift,)
    )
    detects = detects.reshape(*detect_frames.shape[:-1], -1)

    # For some frame_length/frame_shift combinations the expanded mask is
    # longer than the input; trim it so the pad width below is never negative.
    detects = detects[..., : x.shape[-1]]

    # Extend the last decision over the samples not covered by the expansion.
    return np.pad(
        detects,
        [(0, 0)] * (x.ndim - 1) + [(0, x.shape[-1] - detects.shape[-1])],
        mode="edge",
    )
|
|
|
|
|
class CommonPreprocessor(AbsPreprocessor):
    """Preprocessor that augments/normalises speech and tokenizes text.

    In training mode the speech item can be convolved with a randomly chosen
    room impulse response (RIR) and mixed with a randomly chosen noise file.
    Independently of the mode, speech can be peak-normalised and the text
    item can be converted to an array of token ids.
    """

    def __init__(
        self,
        train: bool,
        token_type: str = None,
        token_list: Union[Path, str, Iterable[str]] = None,
        bpemodel: Union[Path, str, Iterable[str]] = None,
        text_cleaner: Collection[str] = None,
        g2p_type: str = None,
        unk_symbol: str = "<unk>",
        space_symbol: str = "<space>",
        non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
        delimiter: str = None,
        rir_scp: str = None,
        rir_apply_prob: float = 1.0,
        noise_scp: str = None,
        noise_apply_prob: float = 1.0,
        noise_db_range: str = "3_10",
        speech_volume_normalize: float = None,
        speech_name: str = "speech",
        text_name: str = "text",
    ):
        """Set up tokenization and the augmentation file lists.

        Args:
            train: Augmentation is only configured and applied when True.
            token_type: Tokenizer type forwarded to ``build_tokenizer``;
                ``None`` disables text processing.
            token_list: Forwarded to ``TokenIDConverter``; required when
                ``token_type`` is given.
            bpemodel: Forwarded to ``build_tokenizer``.
            text_cleaner: Forwarded to ``TextCleaner``.
            g2p_type: Forwarded to ``build_tokenizer``.
            unk_symbol: Forwarded to ``TokenIDConverter``.
            space_symbol: Forwarded to ``build_tokenizer``.
            non_linguistic_symbols: Forwarded to ``build_tokenizer``.
            delimiter: Forwarded to ``build_tokenizer``.
            rir_scp: Text file listing RIR audio files, one per line, either
                as ``path`` or as ``key path``.
            rir_apply_prob: Probability of applying a RIR to an utterance.
            noise_scp: Text file listing noise audio files, same format as
                ``rir_scp``.
            noise_apply_prob: Probability of adding noise to an utterance.
            noise_db_range: ``"low_high"`` range (or a single value) from
                which the signal-to-noise ratio in dB is drawn uniformly.
            speech_volume_normalize: If given, speech is rescaled so that its
                peak absolute amplitude equals this value.
            speech_name: Key of the speech item in ``data``.
            text_name: Key of the text item in ``data``.

        Raises:
            ValueError: If ``token_type`` is given without ``token_list`` or
                ``noise_db_range`` has more than two ``_``-separated fields.
        """
        # The base class stores ``train`` on the instance.
        super().__init__(train)
        self.speech_name = speech_name
        self.text_name = text_name
        self.speech_volume_normalize = speech_volume_normalize
        self.rir_apply_prob = rir_apply_prob
        self.noise_apply_prob = noise_apply_prob

        if token_type is not None:
            if token_list is None:
                raise ValueError("token_list is required if token_type is not None")
            self.text_cleaner = TextCleaner(text_cleaner)
            self.tokenizer = build_tokenizer(
                token_type=token_type,
                bpemodel=bpemodel,
                delimiter=delimiter,
                space_symbol=space_symbol,
                non_linguistic_symbols=non_linguistic_symbols,
                g2p_type=g2p_type,
            )
            self.token_id_converter = TokenIDConverter(
                token_list=token_list,
                unk_symbol=unk_symbol,
            )
        else:
            self.text_cleaner = None
            self.tokenizer = None
            self.token_id_converter = None

        if train and rir_scp is not None:
            self.rirs = self._read_scp_paths(rir_scp)
        else:
            self.rirs = None

        if train and noise_scp is not None:
            self.noises = self._read_scp_paths(noise_scp)
            self.noise_db_low, self.noise_db_high = self._parse_noise_db_range(
                noise_db_range
            )
        else:
            self.noises = None

    @staticmethod
    def _read_scp_paths(scp_path: str) -> list:
        """Return the path column of an scp-style file (``path`` or ``key path``)."""
        paths = []
        with open(scp_path, "r", encoding="utf-8") as f:
            for line in f:
                fields = line.strip().split(None, 1)
                if not fields:
                    # Blank line: nothing to register.
                    continue
                # One field -> the path itself; two fields -> "key path".
                paths.append(fields[-1])
        return paths

    @staticmethod
    def _parse_noise_db_range(noise_db_range: str):
        """Parse ``"low_high"`` (or a single value) into a ``(low, high)`` pair."""
        fields = noise_db_range.split("_")
        if len(fields) == 1:
            low = high = float(fields[0])
        elif len(fields) == 2:
            low, high = float(fields[0]), float(fields[1])
        else:
            raise ValueError(
                f"Format error: '{noise_db_range}' e.g. -3_4 -> [-3db,4db]"
            )
        return low, high

    def _convolve_rir(self, speech: np.ndarray, power: float) -> np.ndarray:
        """Convolve ``speech`` with a random RIR and restore the original power."""
        rir_path = np.random.choice(self.rirs)
        if rir_path is None:
            return speech

        # soundfile returns (frames, channels) with always_2d; transpose so
        # that time is the last axis, like ``speech``.
        rir, _ = soundfile.read(rir_path, dtype=np.float64, always_2d=True)
        rir = rir.T

        # Cropping the "full" convolution keeps the signal length unchanged.
        speech = scipy.signal.convolve(speech, rir, mode="full")[
            :, : speech.shape[1]
        ]

        # Rescale so the non-silent power matches the pre-convolution power.
        power2 = (speech[detect_non_silence(speech)] ** 2).mean()
        return np.sqrt(power / max(power2, 1e-10)) * speech

    def _mix_noise(self, speech: np.ndarray, power: float, nsamples: int) -> np.ndarray:
        """Add a random noise excerpt of ``nsamples`` frames at a random SNR."""
        noise_path = np.random.choice(self.noises)
        if noise_path is None:
            return speech

        noise_db = np.random.uniform(self.noise_db_low, self.noise_db_high)
        with soundfile.SoundFile(noise_path) as f:
            if f.frames == nsamples:
                noise = f.read(dtype=np.float64, always_2d=True)
            elif f.frames < nsamples:
                # Noise is shorter than speech: place it at a random offset
                # and fill the rest by wrapping the noise around.
                offset = np.random.randint(0, nsamples - f.frames)
                noise = f.read(dtype=np.float64, always_2d=True)
                noise = np.pad(
                    noise,
                    [(offset, nsamples - f.frames - offset), (0, 0)],
                    mode="wrap",
                )
            else:
                # Noise is longer than speech: read a random excerpt.
                offset = np.random.randint(0, f.frames - nsamples)
                f.seek(offset)
                noise = f.read(nsamples, dtype=np.float64, always_2d=True)
                if len(noise) != nsamples:
                    raise RuntimeError(f"Something wrong: {noise_path}")

        # Time becomes the last axis, like ``speech``.
        noise = noise.T

        noise_power = (noise ** 2).mean()
        # Scale the noise so that speech power / noise power equals noise_db dB.
        scale = (
            10 ** (-noise_db / 20)
            * np.sqrt(power)
            / np.sqrt(max(noise_power, 1e-10))
        )
        return speech + scale * noise

    def _augment_speech(self, speech: np.ndarray) -> np.ndarray:
        """Apply the configured RIR and noise augmentation to one utterance."""
        # The first axis of the incoming array is time.
        nsamples = len(speech)

        # Work with time as the last axis; a 1-D signal gets one leading row.
        if speech.ndim == 1:
            speech = speech[None, :]
        else:
            speech = speech.T

        # Reference power, measured over non-silent samples only.
        power = (speech[detect_non_silence(speech)] ** 2).mean()

        if self.rirs is not None and self.rir_apply_prob >= np.random.random():
            speech = self._convolve_rir(speech, power)

        # The noise branch has its own probability (``noise_apply_prob``).
        if self.noises is not None and self.noise_apply_prob >= np.random.random():
            speech = self._mix_noise(speech, power, nsamples)

        # Back to time-first layout; the result is always 2-D.
        speech = speech.T
        ma = np.max(np.abs(speech))
        if ma > 1.0:
            # Out-of-place division: ``speech`` may still be a view of the
            # caller's array when no augmentation was drawn.
            speech = speech / ma
        return speech

    def __call__(
        self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        """Process the speech and text items of one utterance.

        Args:
            uid: Utterance id.
            data: Items of the utterance; updated in place and returned.

        Returns:
            ``data`` with the speech item augmented/normalised and the text
            item replaced by an int64 array of token ids, where configured.
        """
        assert check_argument_types()

        if self.speech_name in data:
            # Augment when either RIRs or noises are configured; each kind is
            # checked on its own inside the helper.
            if self.train and (self.rirs is not None or self.noises is not None):
                data[self.speech_name] = self._augment_speech(data[self.speech_name])

            if self.speech_volume_normalize is not None:
                speech = data[self.speech_name]
                ma = np.max(np.abs(speech))
                # All-zero audio has no peak to scale; leave it untouched
                # instead of dividing by zero.
                if ma > 0:
                    data[self.speech_name] = speech * self.speech_volume_normalize / ma

        if self.text_name in data and self.tokenizer is not None:
            text = data[self.text_name]
            text = self.text_cleaner(text)
            tokens = self.tokenizer.text2tokens(text)
            text_ints = self.token_id_converter.tokens2ids(tokens)
            data[self.text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
|
|
|
|
|
class CommonPreprocessor_multi(AbsPreprocessor):
    """Preprocessor that tokenizes several text items of one utterance.

    Every key listed in ``text_name`` that is present in the data is cleaned,
    tokenized and replaced by an int64 array of token ids. The speech item is
    passed through unchanged.
    """

    def __init__(
        self,
        train: bool,
        token_type: str = None,
        token_list: Union[Path, str, Iterable[str]] = None,
        bpemodel: Union[Path, str, Iterable[str]] = None,
        text_cleaner: Collection[str] = None,
        g2p_type: str = None,
        unk_symbol: str = "<unk>",
        space_symbol: str = "<space>",
        non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
        delimiter: str = None,
        speech_name: str = "speech",
        text_name: list = None,
    ):
        """Set up the tokenization pipeline.

        Args:
            train: Stored by the base class.
            token_type: Tokenizer type forwarded to ``build_tokenizer``;
                ``None`` disables text processing.
            token_list: Forwarded to ``TokenIDConverter``; required when
                ``token_type`` is given.
            bpemodel: Forwarded to ``build_tokenizer``.
            text_cleaner: Forwarded to ``TextCleaner``.
            g2p_type: Forwarded to ``build_tokenizer``.
            unk_symbol: Forwarded to ``TokenIDConverter``.
            space_symbol: Forwarded to ``build_tokenizer``.
            non_linguistic_symbols: Forwarded to ``build_tokenizer``.
            delimiter: Forwarded to ``build_tokenizer``.
            speech_name: Key of the speech item in ``data``.
            text_name: Keys of the text items to tokenize; defaults to
                ``["text"]``.

        Raises:
            ValueError: If ``token_type`` is given without ``token_list``.
        """
        # The base class stores ``train`` on the instance.
        super().__init__(train)
        self.speech_name = speech_name
        # A fresh list per instance avoids sharing one mutable default object
        # between all instances.
        self.text_name = ["text"] if text_name is None else text_name

        if token_type is not None:
            if token_list is None:
                raise ValueError("token_list is required if token_type is not None")
            self.text_cleaner = TextCleaner(text_cleaner)
            self.tokenizer = build_tokenizer(
                token_type=token_type,
                bpemodel=bpemodel,
                delimiter=delimiter,
                space_symbol=space_symbol,
                non_linguistic_symbols=non_linguistic_symbols,
                g2p_type=g2p_type,
            )
            self.token_id_converter = TokenIDConverter(
                token_list=token_list,
                unk_symbol=unk_symbol,
            )
        else:
            self.text_cleaner = None
            self.tokenizer = None
            self.token_id_converter = None

    def __call__(
        self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        """Tokenize every configured text item of one utterance.

        Args:
            uid: Utterance id.
            data: Items of the utterance; updated in place and returned.

        Returns:
            ``data`` with each present text item replaced by an int64 array
            of token ids (unchanged when no tokenizer is configured).
        """
        assert check_argument_types()

        # The speech item (``self.speech_name``) is intentionally left as is.
        if self.tokenizer is not None:
            for text_n in self.text_name:
                if text_n not in data:
                    continue
                text = self.text_cleaner(data[text_n])
                tokens = self.tokenizer.text2tokens(text)
                text_ints = self.token_id_converter.tokens2ids(tokens)
                data[text_n] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
|
|