Spaces:
Running
Running
File size: 2,593 Bytes
574ab7e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
import json
import torch
import numpy as np
from sklearn.preprocessing import minmax_scale, StandardScaler
from anonymization import DemoPoolAnonymizer, DemoRandomAnonymizer
TAGS_TO_MODELS = {
'pool': 'pool_minmax_ecapa+xvector',
'random': 'random_in-scale_ecapa+xvector',
'pool raw': 'pool_raw_ecapa+xvector'
}
ANON_MODELS = {
'pool': DemoPoolAnonymizer,
'random': DemoRandomAnonymizer
}
class DemoAnonymizer:
def __init__(self, model_path, model_tag, device):
self.device = device
self.scaling = None
self.std_scaler = None
self.model_tag = model_tag
self.dim_ranges = self._load_dim_ranges(model_path / TAGS_TO_MODELS[model_tag])
self.anonymizer = self._load_anonymizer(model_path / TAGS_TO_MODELS[model_tag])
def anonymize_embedding(self, audio, sr):
anon_embedding = self.anonymizer.anonymize_embedding(audio, sr)
if self.dim_ranges:
anon_embedding = self._scale_embedding(anon_embedding)
return anon_embedding
def _load_dim_ranges(self, model_dir):
if (model_dir / 'stats_per_dim.json').exists():
with open(model_dir / 'stats_per_dim.json') as f:
dim_ranges = json.load(f)
return [(v['min'], v['max']) for k, v in sorted(dim_ranges.items(), key=lambda x: int(x[0]))]
def _load_anonymizer(self, model_dir):
model_name = model_dir.name.lower()
if 'pool' in model_name:
model_type = 'pool'
else:
model_type = 'random'
print(f'Model type of anonymizer: {model_type}')
model = ANON_MODELS[model_type](device=self.device, vec_type='ecapa+xvector')
model.load_parameters(model_dir)
if 'minmax' in model_name:
# self.scaling = 'minmax'
#elif 'std_scale' in model_name and model_type == 'pool':
self.scaling = 'std'
self.std_scaler = StandardScaler()
self.std_scaler.fit(model.pool_embeddings.cpu().numpy())
return model
def _scale_embedding(self, vector):
if self.scaling == 'minmax':
vector = vector.cpu().numpy()
scaled_dims = []
for i in range(len(self.dim_ranges)):
scaled_dims.append(minmax_scale(np.array([vector[i]]), self.dim_ranges[i])[0])
vector = torch.tensor(scaled_dims).to(self.device)
elif self.scaling == 'std':
vector = vector.unsqueeze(0).cpu().numpy()
vector = torch.tensor(self.std_scaler.transform(vector)[0])
return vector
|