ITO-Master / modules /utils_data_normalization.py
jhtonyKoo's picture
modify app
6bbce1b
import os
import sys
import time
import numpy as np
import scipy
import librosa
import pyloudnorm as pyln
sys.setrecursionlimit(int(1e6))
import sklearn
currentdir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(currentdir)
from common_miscellaneous import compute_stft, compute_istft
from common_audioeffects import Panner, Compressor, AugmentationChain, ConvolutionalReverb, Equaliser, AlgorithmicReverb
import fx_utils
import soundfile as sf
import aubio
import time
import warnings
import torch
import torchaudio.functional as F
# Functions
def print_dict(dict_):
for i in dict_:
print(i)
for j in dict_[i]:
print('\t', j)
def amp_to_db(x):
return 20*np.log10(x + 1e-30)
def db_to_amp(x):
return 10**(x/20)
def get_running_stats(x, features, N=20):
mean = []
std = []
for i in range(len(features)):
mean_, std_ = running_mean_std(x[:,i], N)
mean.append(mean_)
std.append(std_)
mean = np.asarray(mean)
std = np.asarray(std)
return mean, std
def running_mean_std(x, N):
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
cumsum = np.cumsum(np.insert(x, 0, 0))
cumsum2 = np.cumsum(np.insert(x**2, 0, 0))
mean = (cumsum[N:] - cumsum[:-N]) / float(N)
std = np.sqrt(((cumsum2[N:] - cumsum2[:-N]) / N) - (mean * mean))
return mean, std
def get_eq_matching(audio_t, ref_spec, sr=44100, n_fft=65536, hop_length=16384,
min_db=-50, ntaps=101, lufs=-30):
audio_t = np.copy(audio_t)
max_db = amp_to_db(np.max(np.abs(audio_t)))
if max_db > min_db:
audio_t = fx_utils.lufs_normalize(audio_t, sr, lufs, log=False)
audio_D = compute_stft(np.expand_dims(audio_t, 1),
hop_length,
n_fft,
np.sqrt(np.hanning(n_fft+1)[:-1]))
audio_D = np.abs(audio_D)
audio_D_avg = np.mean(audio_D, axis=0)[0]
m = ref_spec.shape[0]
Ts = 1.0/sr # sampling interval
n = m # length of the signal
kk = np.arange(n)
T = n/sr
frq = kk/T # two sides frequency range
frq /=2
diff_eq = amp_to_db(ref_spec)-amp_to_db(audio_D_avg)
diff_eq = db_to_amp(diff_eq)
diff_eq = np.sqrt(diff_eq)
diff_filter = scipy.signal.firwin2(ntaps,
frq/np.max(frq),
diff_eq,
nfreqs=None, window='hamming',
nyq=None, antisymmetric=False)
output = scipy.signal.filtfilt(diff_filter, 1, audio_t,
axis=-1, padtype='odd', padlen=None,
method='pad', irlen=None)
else:
output = audio_t
return output
def get_eq_matching_gpu(audio_t, ref_spec, sr=44100, n_fft=65536, hop_length=16384,
min_db=-50, ntaps=101, lufs=-30):
audio_t = np.copy(audio_t)
max_db = amp_to_db(np.max(np.abs(audio_t)))
if max_db > min_db:
start_time = time.time()
audio_t = fx_utils.lufs_normalize(audio_t, sr, lufs, log=False)
# audio_D = compute_stft(np.expand_dims(audio_t, 1),
# hop_length,
# n_fft,
# np.sqrt(np.hanning(n_fft+1)[:-1]))
audio_D = compute_stft(audio_t,
hop_length,
n_fft,
np.sqrt(np.hanning(n_fft+1)[:-1]))
audio_D = np.abs(audio_D)
# audio_D_avg = np.mean(audio_D, axis=0)
audio_D_avg = np.mean(audio_D, axis=0)[0]
m = ref_spec.shape[0]
Ts = 1.0/sr # sampling interval
n = m # length of the signal
kk = np.arange(n)
T = n/sr
frq = kk/T # two sides frequency range
frq /=2
diff_eq_l = amp_to_db(ref_spec)-amp_to_db(audio_D_avg)
diff_eq_l = db_to_amp(diff_eq_l)
diff_eq_l = np.sqrt(diff_eq_l)
diff_eq_r = amp_to_db(ref_spec)-amp_to_db(audio_D_avg)
diff_eq_r = db_to_amp(diff_eq_r)
diff_eq_r = np.sqrt(diff_eq_r)
diff_filter_l = scipy.signal.firwin2(ntaps,
frq/np.max(frq),
diff_eq_l,
nfreqs=None, window='hamming',
nyq=None, antisymmetric=False)
diff_filter_r = scipy.signal.firwin2(ntaps,
frq/np.max(frq),
diff_eq_r,
nfreqs=None, window='hamming',
nyq=None, antisymmetric=False)
diff_filter = np.stack((diff_filter_l, diff_filter_r), axis=0)
# output = scipy.signal.filtfilt(diff_filter, 1, audio_t,
# axis=-1, padtype='odd', padlen=None,
# method='pad', irlen=None)
print(f"\t\tall previous: {time.time()-start_time}")
start_time = time.time()
# device = torch.cuda()
audio_t = torch.from_numpy(audio_t.transpose()).float().cuda()
diff_filter = torch.from_numpy(diff_filter).float().cuda()
denom_coef = torch.ones(diff_filter.size()).cuda()
print(f'input to gpu - audio shape: {audio_t.shape}')
# audio_t = F.filtfilt(waveform=audio_t, a_coeffs=denom_coef, b_coeffs=diff_filter, clamp=False).transpose()
audio_t = F.filtfilt(waveform=audio_t, a_coeffs=denom_coef, b_coeffs=diff_filter, clamp=False)
audio_t = audio_t.transpose(1, 0)
print(audio_t.shape)
print('filtered')
print(f"\t\tgpu filtfilt: {time.time()-start_time}")
print(torch.mean(audio_t))
output = audio_t.detach()
print(f"\t\t1gpu filtfilt: {time.time()-start_time}")
output = audio_t.cpu()
print(f"\t\t2gpu filtfilt: {time.time()-start_time}")
output = audio_t.detach().cpu().numpy()
print(f"\t\t3gpu filtfilt: {time.time()-start_time}")
else:
output = audio_t
return output
def get_SPS(x, n_fft=2048, hop_length=1024, smooth=False, frames=False):
x = np.copy(x)
eps = 1e-20
audio_D = compute_stft(x,
hop_length,
n_fft,
np.sqrt(np.hanning(n_fft+1)[:-1]))
audio_D_l = np.abs(audio_D[:, 0, :] + eps)
audio_D_r = np.abs(audio_D[:, 1, :] + eps)
phi = 2 * (np.abs(audio_D_l*np.conj(audio_D_r)))/(np.abs(audio_D_l)**2+np.abs(audio_D_r)**2)
phi_l = np.abs(audio_D_l*np.conj(audio_D_r))/(np.abs(audio_D_l)**2)
phi_r = np.abs(audio_D_r*np.conj(audio_D_l))/(np.abs(audio_D_r)**2)
delta = phi_l - phi_r
delta_ = np.sign(delta)
SPS = (1-phi)*delta_
phi_mean = np.mean(phi, axis=0)
if smooth:
phi_mean = scipy.signal.savgol_filter(phi_mean, 501, 1, mode='mirror')
SPS_mean = np.mean(SPS, axis=0)
if smooth:
SPS_mean = scipy.signal.savgol_filter(SPS_mean, 501, 1, mode='mirror')
return SPS_mean, phi_mean, SPS, phi
def get_mean_side(sps, freqs=[50,2500], sr=44100, n_fft=2048):
sign = np.sign(sps+ 1e-10)
idx1 = freqs[0]
idx2 = freqs[1]
f1 = int(np.floor(idx1*n_fft/sr))
f2 = int(np.floor(idx2*n_fft/sr))
sign_mean = np.mean(sign[f1:f2])/np.abs(np.mean(sign[f1:f2]))
sign_mean
return sign_mean
def get_panning_param_values(phi, side):
p = np.zeros_like(phi)
g = (np.clip(phi+1e-30, 0, 1))/2
for i, g_ in enumerate(g):
if side > 0:
p[i] = 1 - g_
elif side < 0:
p[i] = g_
else:
p[i] = 0.5
g_l = 1-p
g_r = p
return p, [g_l, g_r]
def get_panning_matching(audio, ref_phi,
sr=44100, n_fft=2048, hop_length=1024,
min_db_f=-10, max_freq_pan=16000, frames=True):
eps = 1e-20
window = np.sqrt(np.hanning(n_fft+1)[:-1])
audio = np.copy(audio)
audio_t = np.pad(audio, ((n_fft, n_fft), (0, 0)), mode='constant')
sps_mean_, phi_mean_, _, _ = get_SPS(audio_t, n_fft=n_fft, hop_length=hop_length, smooth=True)
side = get_mean_side(sps_mean_, sr=sr, n_fft=n_fft)
if side > 0:
alpha = 0.7
else:
alpha = 0.3
processor = Panner()
processor.parameters.pan.value = alpha
processor.parameters.pan_law.value = 'linear'
processor.update()
audio_t_ = processor.process(audio_t)
sps_mean_, phi_mean, sps_frames, phi_frames = get_SPS(audio_t_, n_fft=n_fft,
hop_length=hop_length,
smooth=True, frames=frames)
if frames:
p_i_ = []
g_i_ = []
p_ref = []
g_ref = []
for i in range(len(sps_frames)):
sps_ = sps_frames[i]
phi_ = phi_frames[i]
p_, g_ = get_panning_param_values(phi_, side)
p_i_.append(p_)
g_i_.append(g_)
p_, g_ = get_panning_param_values(ref_phi, side)
p_ref.append(p_)
g_ref.append(g_)
ratio = (np.asarray(g_ref)/(np.asarray(g_i_)+eps))
g_l = ratio[:,0,:]
g_r = ratio[:,1,:]
else:
p, g = get_panning_param_values(ref_phi, side)
p_i, g_i = get_panning_param_values(phi_mean, side)
ratio = (np.asarray(g)/np.asarray(g_i))
g_l = ratio[0]
g_r = ratio[1]
audio_new_D = compute_stft(audio_t_,
hop_length,
n_fft,
window)
audio_new_D_mono = audio_new_D.copy()
audio_new_D_mono = audio_new_D_mono[:, 0, :] + audio_new_D_mono[:, 1, :]
audio_new_D_mono = np.abs(audio_new_D_mono)
audio_new_D_phase = np.angle(audio_new_D)
audio_new_D = np.abs(audio_new_D)
audio_new_D_l = audio_new_D[:, 0, :]
audio_new_D_r = audio_new_D[:, 1, :]
if frames:
for i, frame in enumerate(audio_new_D_mono):
max_db = amp_to_db(np.max(np.abs(frame)))
if max_db < min_db_f:
g_r[i] = np.ones_like(frame)
g_l[i] = np.ones_like(frame)
idx1 = max_freq_pan
f1 = int(np.floor(idx1*n_fft/sr))
ones = np.ones_like(g_l)
g_l[f1:] = ones[f1:]
g_r[f1:] = ones[f1:]
audio_new_D_l = audio_new_D_l*g_l
audio_new_D_r = audio_new_D_r*g_r
audio_new_D_l = np.expand_dims(audio_new_D_l, 0)
audio_new_D_r = np.expand_dims(audio_new_D_r, 0)
audio_new_D_ = np.concatenate((audio_new_D_l,audio_new_D_r))
audio_new_D_ = np.moveaxis(audio_new_D_, 0, 1)
audio_new_D_ = audio_new_D_ * (np.cos(audio_new_D_phase) + np.sin(audio_new_D_phase)*1j)
audio_new_t = compute_istft(audio_new_D_,
hop_length,
window)
audio_new_t = audio_new_t[n_fft:n_fft+audio.shape[0]]
return audio_new_t
def get_mean_peak(audio, sr=44100, true_peak=False, n_mels=128, percentile=75):
# Returns mean peak value in dB after the 1Q is removed.
# Input should be in the shape samples x channel
audio_ = audio
window_size = 2**10 # FFT size
hop_size = window_size
peak = []
std = []
for ch in range(audio_.shape[-1]):
x = np.ascontiguousarray(audio_[:, ch])
if true_peak:
x = librosa.resample(x, sr, 4*sr)
sr = 4*sr
window_size = 4*window_size
hop_size = 4*hop_size
onset_func = aubio.onset('hfc', buf_size=window_size, hop_size=hop_size, samplerate=sr)
frames = np.float32(librosa.util.frame(x, frame_length=window_size, hop_length=hop_size))
onset_times = []
for frame in frames.T:
if onset_func(frame):
onset_time = onset_func.get_last()
onset_times.append(onset_time)
samples=[]
if onset_times:
for i, p in enumerate(onset_times[:-1]):
samples.append(onset_times[i]+np.argmax(np.abs(x[onset_times[i]:onset_times[i+1]])))
samples.append(onset_times[-1]+np.argmax(np.abs(x[onset_times[-1]:])))
p_value = []
for p in samples:
p_ = amp_to_db(np.abs(x[p]))
p_value.append(p_)
p_value_=[]
for p in p_value:
if p > np.percentile(p_value, percentile):
p_value_.append(p)
if p_value_:
peak.append(np.mean(p_value_))
std.append(np.std(p_value_))
elif p_value:
peak.append(np.mean(p_value))
std.append(np.std(p_value))
else:
return None
return [np.mean(peak), np.mean(std)]
def compress(processor, audio, sr, th, ratio, attack, release):
eps = 1e-20
x = audio
processor.parameters.threshold.value = th
processor.parameters.ratio.value = ratio
processor.parameters.attack_time.value = attack
processor.parameters.release_time.value = release
processor.update()
output = processor.process(x)
if np.max(np.abs(output)) >= 1.0:
output = np.clip(output, -1.0, 1.0)
return output
def get_comp_matching(audio,
ref_peak, ref_std,
ratio, attack, release, sr=44100,
min_db=-50, comp_peak_norm=-10.0,
min_th=-40, max_ratio=20, n_mels=128,
true_peak=False, percentile=75, expander=True):
x = audio.copy()
if x.ndim < 2:
x = np.expand_dims(x, 1)
max_db = amp_to_db(np.max(np.abs(x)))
if max_db > min_db:
x = pyln.normalize.peak(x, comp_peak_norm)
peak, std = get_mean_peak(x, sr,
n_mels=n_mels,
true_peak=true_peak,
percentile=percentile)
if peak > (ref_peak - ref_std) and peak < (ref_peak + ref_std):
return x
# DownwardCompress
elif peak > (ref_peak - ref_std):
processor = Compressor(sample_rate=sr)
# print('compress')
ratios = np.linspace(ratio, max_ratio, max_ratio-ratio+1)
ths = np.linspace(-1-9, min_th, 2*np.abs(min_th)-1-18)
for rt in ratios:
for th in ths:
y = compress(processor, x, sr, th, rt, attack, release)
peak, std = get_mean_peak(y, sr,
n_mels=n_mels,
true_peak=true_peak,
percentile=percentile)
if peak < (ref_peak + ref_std):
break
else:
continue
break
return y
# Upward Expand
elif peak < (ref_peak + ref_std):
if expander:
processor = Compressor(sample_rate=sr)
ratios = np.linspace(ratio, max_ratio, max_ratio-ratio+1)
ths = np.linspace(-1, min_th, 2*np.abs(min_th)-1)[::-1]
for rt in ratios:
for th in ths:
y = compress(processor, x, sr, th, 1/rt, attack, release)
peak, std = get_mean_peak(y, sr,
n_mels=n_mels,
true_peak=true_peak,
percentile=percentile)
if peak > (ref_peak - ref_std):
break
else:
continue
break
return y
else:
return x
else:
return x
# REVERB
def get_reverb_send(audio, eq_parameters, rv_parameters, impulse_responses=None,
eq_prob=1.0, rv_prob=1.0, parallel=True, shuffle=False, sr=44100, bands=['low_shelf', 'high_shelf']):
x = audio.copy()
if x.ndim < 2:
x = np.expand_dims(x, 1)
channels = x.shape[-1]
eq_gain = eq_parameters.low_shelf_gain.value
eq = Equaliser(n_channels=channels,
sample_rate=sr,
gain_range=(eq_gain, eq_gain),
bands=bands,
hard_clip=False,
name='Equaliser', parameters=eq_parameters)
eq.randomize()
if impulse_responses:
reverb = ConvolutionalReverb(impulse_responses=impulse_responses,
sample_rate=sr,
parameters=rv_parameters)
else:
reverb = AlgorithmicReverb(sample_rate=sr,
parameters=rv_parameters)
reverb.randomize()
fxchain = AugmentationChain([
(eq, rv_prob, False),
(reverb, eq_prob, False)
],
shuffle=shuffle, parallel=parallel)
output = fxchain(x)
return output
# FUNCTIONS TO COMPUTE FEATURES
def compute_loudness_features(args_):
audio_out_ = args_[0]
audio_tar_ = args_[1]
idx = args_[2]
sr = args_[3]
loudness_ = {key:[] for key in ['d_lufs', 'd_peak',]}
peak_tar = np.max(np.abs(audio_tar_))
peak_tar_db = 20.0 * np.log10(peak_tar)
peak_out = np.max(np.abs(audio_out_))
peak_out_db = 20.0 * np.log10(peak_out)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
meter = pyln.Meter(sr) # create BS.1770 meter
loudness_tar = meter.integrated_loudness(audio_tar_)
loudness_out = meter.integrated_loudness(audio_out_)
loudness_['d_lufs'].append(sklearn.metrics.mean_absolute_percentage_error([loudness_tar], [loudness_out]))
loudness_['d_peak'].append(sklearn.metrics.mean_absolute_percentage_error([peak_tar_db], [peak_out_db]))
return loudness_
def compute_spectral_features(args_):
audio_out_ = args_[0]
audio_tar_ = args_[1]
idx = args_[2]
sr = args_[3]
fft_size = args_[4]
hop_length = args_[5]
channels = args_[6]
audio_out_ = pyln.normalize.peak(audio_out_, -1.0)
audio_tar_ = pyln.normalize.peak(audio_tar_, -1.0)
spec_out_ = compute_stft(audio_out_,
hop_length,
fft_size,
np.sqrt(np.hanning(fft_size+1)[:-1]))
spec_out_ = np.transpose(spec_out_, axes=[1, -1, 0])
spec_out_ = np.abs(spec_out_)
spec_tar_ = compute_stft(audio_tar_,
hop_length,
fft_size,
np.sqrt(np.hanning(fft_size+1)[:-1]))
spec_tar_ = np.transpose(spec_tar_, axes=[1, -1, 0])
spec_tar_ = np.abs(spec_tar_)
spectral_ = {key:[] for key in ['centroid_mean',
'bandwidth_mean',
'contrast_l_mean',
'contrast_m_mean',
'contrast_h_mean',
'rolloff_mean',
'flatness_mean',
'mape_mean',
]}
centroid_mean_ = []
centroid_std_ = []
bandwidth_mean_ = []
bandwidth_std_ = []
contrast_l_mean_ = []
contrast_l_std_ = []
contrast_m_mean_ = []
contrast_m_std_ = []
contrast_h_mean_ = []
contrast_h_std_ = []
rolloff_mean_ = []
rolloff_std_ = []
flatness_mean_ = []
for ch in range(channels):
tar = spec_tar_[ch]
out = spec_out_[ch]
tar_sc = librosa.feature.spectral_centroid(y=None, sr=sr, S=tar,
n_fft=fft_size, hop_length=hop_length)
out_sc = librosa.feature.spectral_centroid(y=None, sr=sr, S=out,
n_fft=fft_size, hop_length=hop_length)
tar_bw = librosa.feature.spectral_bandwidth(y=None, sr=sr, S=tar,
n_fft=fft_size, hop_length=hop_length,
centroid=tar_sc, norm=True, p=2)
out_bw = librosa.feature.spectral_bandwidth(y=None, sr=sr, S=out,
n_fft=fft_size, hop_length=hop_length,
centroid=out_sc, norm=True, p=2)
# l = 0-250, m = 1-2-3 = 250 - 2000, h = 2000 - SR/2
tar_ct = librosa.feature.spectral_contrast(y=None, sr=sr, S=tar,
n_fft=fft_size, hop_length=hop_length,
fmin=250.0, n_bands=4, quantile=0.02, linear=False)
out_ct = librosa.feature.spectral_contrast(y=None, sr=sr, S=out,
n_fft=fft_size, hop_length=hop_length,
fmin=250.0, n_bands=4, quantile=0.02, linear=False)
tar_ro = librosa.feature.spectral_rolloff(y=None, sr=sr, S=tar,
n_fft=fft_size, hop_length=hop_length,
roll_percent=0.85)
out_ro = librosa.feature.spectral_rolloff(y=None, sr=sr, S=out,
n_fft=fft_size, hop_length=hop_length,
roll_percent=0.85)
tar_ft = librosa.feature.spectral_flatness(y=None, S=tar,
n_fft=fft_size, hop_length=hop_length,
amin=1e-10, power=2.0)
out_ft = librosa.feature.spectral_flatness(y=None, S=out,
n_fft=fft_size, hop_length=hop_length,
amin=1e-10, power=2.0)
eps = 1e-0
N = 40
mean_sc_tar, std_sc_tar = get_running_stats(tar_sc.T+eps, [0], N=N)
mean_sc_out, std_sc_out = get_running_stats(out_sc.T+eps, [0], N=N)
assert np.isnan(mean_sc_tar).any() == False, f'NAN values mean_sc_tar {idx}'
assert np.isnan(mean_sc_out).any() == False, f'NAN values mean_sc_out {idx}'
mean_bw_tar, std_bw_tar = get_running_stats(tar_bw.T+eps, [0], N=N)
mean_bw_out, std_bw_out = get_running_stats(out_bw.T+eps, [0], N=N)
assert np.isnan(mean_bw_tar).any() == False, f'NAN values tar mean bw {idx}'
assert np.isnan(mean_bw_out).any() == False, f'NAN values out mean bw {idx}'
mean_ct_tar, std_ct_tar = get_running_stats(tar_ct.T, list(range(tar_ct.shape[0])), N=N)
mean_ct_out, std_ct_out = get_running_stats(out_ct.T, list(range(out_ct.shape[0])), N=N)
assert np.isnan(mean_ct_tar).any() == False, f'NAN values tar mean ct {idx}'
assert np.isnan(mean_ct_out).any() == False, f'NAN values out mean ct {idx}'
mean_ro_tar, std_ro_tar = get_running_stats(tar_ro.T+eps, [0], N=N)
mean_ro_out, std_ro_out = get_running_stats(out_ro.T+eps, [0], N=N)
assert np.isnan(mean_ro_tar).any() == False, f'NAN values tar mean ro {idx}'
assert np.isnan(mean_ro_out).any() == False, f'NAN values out mean ro {idx}'
mean_ft_tar, std_ft_tar = get_running_stats(tar_ft.T, [0], N=800) # gives very high numbers due to N (80) value
mean_ft_out, std_ft_out = get_running_stats(out_ft.T, [0], N=800)
mape_mean_sc = sklearn.metrics.mean_absolute_percentage_error(mean_sc_tar[0], mean_sc_out[0])
mape_mean_bw = sklearn.metrics.mean_absolute_percentage_error(mean_bw_tar[0], mean_bw_out[0])
mape_mean_ct_l = sklearn.metrics.mean_absolute_percentage_error(mean_ct_tar[0], mean_ct_out[0])
mape_mean_ct_m = sklearn.metrics.mean_absolute_percentage_error(np.mean(mean_ct_tar[1:4], axis=0),
np.mean(mean_ct_out[1:4], axis=0))
mape_mean_ct_h = sklearn.metrics.mean_absolute_percentage_error(mean_ct_tar[-1], mean_ct_out[-1])
mape_mean_ro = sklearn.metrics.mean_absolute_percentage_error(mean_ro_tar[0], mean_ro_out[0])
mape_mean_ft = sklearn.metrics.mean_absolute_percentage_error(mean_ft_tar[0], mean_ft_out[0])
centroid_mean_.append(mape_mean_sc)
bandwidth_mean_.append(mape_mean_bw)
contrast_l_mean_.append(mape_mean_ct_l)
contrast_m_mean_.append(mape_mean_ct_m)
contrast_h_mean_.append(mape_mean_ct_h)
rolloff_mean_.append(mape_mean_ro)
flatness_mean_.append(mape_mean_ft)
spectral_['centroid_mean'].append(np.mean(centroid_mean_))
spectral_['bandwidth_mean'].append(np.mean(bandwidth_mean_))
spectral_['contrast_l_mean'].append(np.mean(contrast_l_mean_))
spectral_['contrast_m_mean'].append(np.mean(contrast_m_mean_))
spectral_['contrast_h_mean'].append(np.mean(contrast_h_mean_))
spectral_['rolloff_mean'].append(np.mean(rolloff_mean_))
spectral_['flatness_mean'].append(np.mean(flatness_mean_))
spectral_['mape_mean'].append(np.mean([np.mean(centroid_mean_),
np.mean(bandwidth_mean_),
np.mean(contrast_l_mean_),
np.mean(contrast_m_mean_),
np.mean(contrast_h_mean_),
np.mean(rolloff_mean_),
np.mean(flatness_mean_),
]))
return spectral_
# PANNING
def get_panning_rms_frame(sps_frame, freqs=[0,22050], sr=44100, n_fft=2048):
idx1 = freqs[0]
idx2 = freqs[1]
f1 = int(np.floor(idx1*n_fft/sr))
f2 = int(np.floor(idx2*n_fft/sr))
p_rms = np.sqrt((1/(f2-f1)) * np.sum(sps_frame[f1:f2]**2))
return p_rms
def get_panning_rms(sps, freqs=[[0, 22050]], sr=44100, n_fft=2048):
p_rms = []
for frame in sps:
p_rms_ = []
for f in freqs:
rms = get_panning_rms_frame(frame, freqs=f, sr=sr, n_fft=n_fft)
p_rms_.append(rms)
p_rms.append(p_rms_)
return np.asarray(p_rms)
def compute_panning_features(args_):
audio_out_ = args_[0]
audio_tar_ = args_[1]
idx = args_[2]
sr = args_[3]
fft_size = args_[4]
hop_length = args_[5]
audio_out_ = pyln.normalize.peak(audio_out_, -1.0)
audio_tar_ = pyln.normalize.peak(audio_tar_, -1.0)
panning_ = {}
freqs=[[0, sr//2], [0, 250], [250, 2500], [2500, sr//2]]
_, _, sps_frames_tar, _ = get_SPS(audio_tar_, n_fft=fft_size,
hop_length=hop_length,
smooth=True, frames=True)
_, _, sps_frames_out, _ = get_SPS(audio_out_, n_fft=fft_size,
hop_length=hop_length,
smooth=True, frames=True)
p_rms_tar = get_panning_rms(sps_frames_tar,
freqs=freqs,
sr=sr,
n_fft=fft_size)
p_rms_out = get_panning_rms(sps_frames_out,
freqs=freqs,
sr=sr,
n_fft=fft_size)
# to avoid num instability, deletes frames with zero rms from target
if np.min(p_rms_tar) == 0.0:
id_zeros = np.where(p_rms_tar.T[0] == 0)
p_rms_tar_ = []
p_rms_out_ = []
for i in range(len(freqs)):
temp_tar = np.delete(p_rms_tar.T[i], id_zeros)
temp_out = np.delete(p_rms_out.T[i], id_zeros)
p_rms_tar_.append(temp_tar)
p_rms_out_.append(temp_out)
p_rms_tar_ = np.asarray(p_rms_tar_)
p_rms_tar = p_rms_tar_.T
p_rms_out_ = np.asarray(p_rms_out_)
p_rms_out = p_rms_out_.T
N = 40
mean_tar, std_tar = get_running_stats(p_rms_tar, freqs, N=N)
mean_out, std_out = get_running_stats(p_rms_out, freqs, N=N)
panning_['P_t_mean'] = [sklearn.metrics.mean_absolute_percentage_error(mean_tar[0], mean_out[0])]
panning_['P_l_mean'] = [sklearn.metrics.mean_absolute_percentage_error(mean_tar[1], mean_out[1])]
panning_['P_m_mean'] = [sklearn.metrics.mean_absolute_percentage_error(mean_tar[2], mean_out[2])]
panning_['P_h_mean'] = [sklearn.metrics.mean_absolute_percentage_error(mean_tar[3], mean_out[3])]
panning_['mape_mean'] = [np.mean([panning_['P_t_mean'],
panning_['P_l_mean'],
panning_['P_m_mean'],
panning_['P_h_mean'],
])]
return panning_
# DYNAMIC
def get_rms_dynamic_crest(x, frame_length, hop_length):
rms = []
dynamic_spread = []
crest = []
for ch in range(x.shape[-1]):
frames = librosa.util.frame(x[:, ch], frame_length=frame_length, hop_length=hop_length)
rms_ = []
dynamic_spread_ = []
crest_ = []
for i in frames.T:
x_rms = amp_to_db(np.sqrt(np.sum(i**2)/frame_length))
x_d = np.sum(amp_to_db(np.abs(i)) - x_rms)/frame_length
x_c = amp_to_db(np.max(np.abs(i)))/x_rms
rms_.append(x_rms)
dynamic_spread_.append(x_d)
crest_.append(x_c)
rms.append(rms_)
dynamic_spread.append(dynamic_spread_)
crest.append(crest_)
rms = np.asarray(rms)
dynamic_spread = np.asarray(dynamic_spread)
crest = np.asarray(crest)
rms = np.mean(rms, axis=0)
dynamic_spread = np.mean(dynamic_spread, axis=0)
crest = np.mean(crest, axis=0)
rms = np.expand_dims(rms, axis=0)
dynamic_spread = np.expand_dims(dynamic_spread, axis=0)
crest = np.expand_dims(crest, axis=0)
return rms, dynamic_spread, crest
def lowpassFiltering(x, f0, sr):
b1, a1 = scipy.signal.butter(4, f0/(sr/2),'lowpass')
x_f = []
for ch in range(x.shape[-1]):
x_f_ = scipy.signal.filtfilt(b1, a1, x[:, ch]).copy(order='F')
x_f.append(x_f_)
return np.asarray(x_f).T
def get_low_freq_weighting(x, sr, n_fft, hop_length, f0 = 1000):
x_low = lowpassFiltering(x, f0, sr)
X_low = compute_stft(x_low,
hop_length,
n_fft,
np.sqrt(np.hanning(n_fft+1)[:-1]))
X_low = np.transpose(X_low, axes=[1, -1, 0])
X_low = np.abs(X_low)
X = compute_stft(x,
hop_length,
n_fft,
np.sqrt(np.hanning(n_fft+1)[:-1]))
X = np.transpose(X, axes=[1, -1, 0])
X = np.abs(X)
eps = 1e-5
ratio = (X_low)/(X+eps)
ratio = np.sum(ratio, axis = 1)
ratio = np.mean(ratio, axis = 0)
return np.expand_dims(ratio, axis=0)
def compute_dynamic_features(args_):
audio_out_ = args_[0]
audio_tar_ = args_[1]
idx = args_[2]
sr = args_[3]
fft_size = args_[4]
hop_length = args_[5]
audio_out_ = pyln.normalize.peak(audio_out_, -1.0)
audio_tar_ = pyln.normalize.peak(audio_tar_, -1.0)
dynamic_ = {}
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
rms_tar, dyn_tar, crest_tar = get_rms_dynamic_crest(audio_tar_, fft_size, hop_length)
rms_out, dyn_out, crest_out = get_rms_dynamic_crest(audio_out_, fft_size, hop_length)
low_ratio_tar = get_low_freq_weighting(audio_tar_, sr, fft_size, hop_length, f0=1000)
low_ratio_out = get_low_freq_weighting(audio_out_, sr, fft_size, hop_length, f0=1000)
N = 40
eps = 1e-10
rms_tar = (-1*rms_tar) + 1.0
rms_out = (-1*rms_out) + 1.0
dyn_tar = (-1*dyn_tar) + 1.0
dyn_out = (-1*dyn_out) + 1.0
mean_rms_tar, std_rms_tar = get_running_stats(rms_tar.T, [0], N=N)
mean_rms_out, std_rms_out = get_running_stats(rms_out.T, [0], N=N)
mean_dyn_tar, std_dyn_tar = get_running_stats(dyn_tar.T, [0], N=N)
mean_dyn_out, std_dyn_out = get_running_stats(dyn_out.T, [0], N=N)
mean_crest_tar, std_crest_tar = get_running_stats(crest_tar.T, [0], N=N)
mean_crest_out, std_crest_out = get_running_stats(crest_out.T, [0], N=N)
mean_low_ratio_tar, std_low_ratio_tar = get_running_stats(low_ratio_tar.T, [0], N=N)
mean_low_ratio_out, std_low_ratio_out = get_running_stats(low_ratio_out.T, [0], N=N)
dynamic_['rms_mean'] = [sklearn.metrics.mean_absolute_percentage_error(mean_rms_tar, mean_rms_out)]
dynamic_['dyn_mean'] = [sklearn.metrics.mean_absolute_percentage_error(mean_dyn_tar, mean_dyn_out)]
dynamic_['crest_mean'] = [sklearn.metrics.mean_absolute_percentage_error(mean_crest_tar, mean_crest_out)]
dynamic_['l_ratio_mean_mape'] = [sklearn.metrics.mean_absolute_percentage_error(mean_low_ratio_tar, mean_low_ratio_out)]
dynamic_['l_ratio_mean_l2'] = [sklearn.metrics.mean_squared_error(mean_low_ratio_tar, mean_low_ratio_out)]
dynamic_['mape_mean'] = [np.mean([dynamic_['rms_mean'],
dynamic_['dyn_mean'],
dynamic_['crest_mean'],
])]
return dynamic_