# Implementation of the TCN proposed in # Luo. et al. "Conv-tasnet: Surpassing ideal time–frequency # magnitude masking for speech separation." # # The code is based on: # https://github.com/kaituoxu/Conv-TasNet/blob/master/src/conv_tasnet.py # import torch import torch.nn as nn import torch.nn.functional as F EPS = torch.finfo(torch.get_default_dtype()).eps class TemporalConvNet(nn.Module): def __init__( self, N, B, H, P, X, R, C, norm_type="gLN", causal=False, mask_nonlinear="relu" ): """Basic Module of tasnet. Args: N: Number of filters in autoencoder B: Number of channels in bottleneck 1 * 1-conv block H: Number of channels in convolutional blocks P: Kernel size in convolutional blocks X: Number of convolutional blocks in each repeat R: Number of repeats C: Number of speakers norm_type: BN, gLN, cLN causal: causal or non-causal mask_nonlinear: use which non-linear function to generate mask """ super().__init__() # Hyper-parameter self.C = C self.mask_nonlinear = mask_nonlinear # Components # [M, N, K] -> [M, N, K] layer_norm = ChannelwiseLayerNorm(N) # [M, N, K] -> [M, B, K] bottleneck_conv1x1 = nn.Conv1d(N, B, 1, bias=False) # [M, B, K] -> [M, B, K] repeats = [] for r in range(R): blocks = [] for x in range(X): dilation = 2 ** x padding = (P - 1) * dilation if causal else (P - 1) * dilation // 2 blocks += [ TemporalBlock( B, H, P, stride=1, padding=padding, dilation=dilation, norm_type=norm_type, causal=causal, ) ] repeats += [nn.Sequential(*blocks)] temporal_conv_net = nn.Sequential(*repeats) # [M, B, K] -> [M, C*N, K] mask_conv1x1 = nn.Conv1d(B, C * N, 1, bias=False) # Put together self.network = nn.Sequential( layer_norm, bottleneck_conv1x1, temporal_conv_net, mask_conv1x1 ) def forward(self, mixture_w): """Keep this API same with TasNet. Args: mixture_w: [M, N, K], M is batch size Returns: est_mask: [M, C, N, K] """ M, N, K = mixture_w.size() score = self.network(mixture_w) # [M, N, K] -> [M, C*N, K] score = score.view(M, self.C, N, K) # [M, C*N, K] -> [M, C, N, K] if self.mask_nonlinear == "softmax": est_mask = F.softmax(score, dim=1) elif self.mask_nonlinear == "relu": est_mask = F.relu(score) elif self.mask_nonlinear == "sigmoid": est_mask = F.sigmoid(score) elif self.mask_nonlinear == "tanh": est_mask = F.tanh(score) else: raise ValueError("Unsupported mask non-linear function") return est_mask class TemporalBlock(nn.Module): def __init__( self, in_channels, out_channels, kernel_size, stride, padding, dilation, norm_type="gLN", causal=False, ): super().__init__() # [M, B, K] -> [M, H, K] conv1x1 = nn.Conv1d(in_channels, out_channels, 1, bias=False) prelu = nn.PReLU() norm = chose_norm(norm_type, out_channels) # [M, H, K] -> [M, B, K] dsconv = DepthwiseSeparableConv( out_channels, in_channels, kernel_size, stride, padding, dilation, norm_type, causal, ) # Put together self.net = nn.Sequential(conv1x1, prelu, norm, dsconv) def forward(self, x): """Forward. Args: x: [M, B, K] Returns: [M, B, K] """ residual = x out = self.net(x) # TODO(Jing): when P = 3 here works fine, but when P = 2 maybe need to pad? return out + residual # look like w/o F.relu is better than w/ F.relu # return F.relu(out + residual) class DepthwiseSeparableConv(nn.Module): def __init__( self, in_channels, out_channels, kernel_size, stride, padding, dilation, norm_type="gLN", causal=False, ): super().__init__() # Use `groups` option to implement depthwise convolution # [M, H, K] -> [M, H, K] depthwise_conv = nn.Conv1d( in_channels, in_channels, kernel_size, stride=stride, padding=padding, dilation=dilation, groups=in_channels, bias=False, ) if causal: chomp = Chomp1d(padding) prelu = nn.PReLU() norm = chose_norm(norm_type, in_channels) # [M, H, K] -> [M, B, K] pointwise_conv = nn.Conv1d(in_channels, out_channels, 1, bias=False) # Put together if causal: self.net = nn.Sequential(depthwise_conv, chomp, prelu, norm, pointwise_conv) else: self.net = nn.Sequential(depthwise_conv, prelu, norm, pointwise_conv) def forward(self, x): """Forward. Args: x: [M, H, K] Returns: result: [M, B, K] """ return self.net(x) class Chomp1d(nn.Module): """To ensure the output length is the same as the input.""" def __init__(self, chomp_size): super().__init__() self.chomp_size = chomp_size def forward(self, x): """Forward. Args: x: [M, H, Kpad] Returns: [M, H, K] """ return x[:, :, : -self.chomp_size].contiguous() def check_nonlinear(nolinear_type): if nolinear_type not in ["softmax", "relu"]: raise ValueError("Unsupported nonlinear type") def chose_norm(norm_type, channel_size): """The input of normalization will be (M, C, K), where M is batch size. C is channel size and K is sequence length. """ if norm_type == "gLN": return GlobalLayerNorm(channel_size) elif norm_type == "cLN": return ChannelwiseLayerNorm(channel_size) elif norm_type == "BN": # Given input (M, C, K), nn.BatchNorm1d(C) will accumulate statics # along M and K, so this BN usage is right. return nn.BatchNorm1d(channel_size) else: raise ValueError("Unsupported normalization type") class ChannelwiseLayerNorm(nn.Module): """Channel-wise Layer Normalization (cLN).""" def __init__(self, channel_size): super().__init__() self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.reset_parameters() def reset_parameters(self): self.gamma.data.fill_(1) self.beta.data.zero_() def forward(self, y): """Forward. Args: y: [M, N, K], M is batch size, N is channel size, K is length Returns: cLN_y: [M, N, K] """ mean = torch.mean(y, dim=1, keepdim=True) # [M, 1, K] var = torch.var(y, dim=1, keepdim=True, unbiased=False) # [M, 1, K] cLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta return cLN_y class GlobalLayerNorm(nn.Module): """Global Layer Normalization (gLN).""" def __init__(self, channel_size): super().__init__() self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.reset_parameters() def reset_parameters(self): self.gamma.data.fill_(1) self.beta.data.zero_() def forward(self, y): """Forward. Args: y: [M, N, K], M is batch size, N is channel size, K is length Returns: gLN_y: [M, N, K] """ mean = y.mean(dim=1, keepdim=True).mean(dim=2, keepdim=True) # [M, 1, 1] var = ( (torch.pow(y - mean, 2)).mean(dim=1, keepdim=True).mean(dim=2, keepdim=True) ) gLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta return gLN_y