import json
import torch
import numpy as np
from sklearn.preprocessing import minmax_scale, StandardScaler

from anonymization import DemoPoolAnonymizer, DemoRandomAnonymizer

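# Maps the demo model tags to the corresponding model directory names.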
TAGS_TO_MODELS = {
    'pool': 'pool_minmax_ecapa+xvector',
    'random': 'random_in-scale_ecapa+xvector',
    'pool raw': 'pool_raw_ecapa+xvector'
}

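# Anonymizer classes available in the demo, keyed by model type.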
ANON_MODELS = {
    'pool': DemoPoolAnonymizer,
    'random': DemoRandomAnonymizer
}


class DemoAnonymizer:
    """Demo wrapper that selects a pool- or random-based anonymizer by model tag
    and optionally rescales the anonymized speaker embedding using per-dimension
    statistics stored alongside the model."""

    def __init__(self, model_path, model_tag, device):
        self.device = device
        self.scaling = None
        self.std_scaler = None
        self.model_tag = model_tag

        model_dir = model_path / TAGS_TO_MODELS[model_tag]
        self.dim_ranges = self._load_dim_ranges(model_dir)
        self.anonymizer = self._load_anonymizer(model_dir)
    def anonymize_embedding(self, audio, sr):
        # Anonymize the speaker embedding; rescale it afterwards if per-dimension
        # statistics were found for this model.
        anon_embedding = self.anonymizer.anonymize_embedding(audio, sr)
        if self.dim_ranges:
            anon_embedding = self._scale_embedding(anon_embedding)
        return anon_embedding

    def _load_dim_ranges(self, model_dir):
        # Load the per-dimension (min, max) ranges, sorted by dimension index.
        if (model_dir / 'stats_per_dim.json').exists():
            with open(model_dir / 'stats_per_dim.json') as f:
                dim_ranges = json.load(f)
                return [(v['min'], v['max']) for _, v in sorted(dim_ranges.items(), key=lambda x: int(x[0]))]
        # No stats file: the embedding will not be rescaled.
        return None

    def _load_anonymizer(self, model_dir):
        # Infer the anonymizer type from the model directory name.
        model_name = model_dir.name.lower()
        model_type = 'pool' if 'pool' in model_name else 'random'
        print(f'Model type of anonymizer: {model_type}')

        model = ANON_MODELS[model_type](device=self.device, vec_type='ecapa+xvector')
        model.load_parameters(model_dir)

        if 'minmax' in model_name:
            # Note: despite the 'minmax' tag, the active behaviour standard-scales the
            # embedding with a StandardScaler fitted on the pool embeddings; the
            # min-max branch in _scale_embedding is currently never selected.
            self.scaling = 'std'
            self.std_scaler = StandardScaler()
            self.std_scaler.fit(model.pool_embeddings.cpu().numpy())

        return model

    def _scale_embedding(self, vector):
        if self.scaling == 'minmax':
            # Scale each dimension individually into its (min, max) range from
            # stats_per_dim.json.
            vector = vector.cpu().numpy()
            scaled_dims = []
            for i in range(len(self.dim_ranges)):
                scaled_dims.append(minmax_scale(np.array([vector[i]]), self.dim_ranges[i])[0])
            vector = torch.tensor(scaled_dims).to(self.device)
        elif self.scaling == 'std':
            # Standard-scale the whole embedding with the scaler fitted on the pool
            # embeddings in _load_anonymizer.
            vector = vector.unsqueeze(0).cpu().numpy()
            vector = torch.tensor(self.std_scaler.transform(vector)[0])
        return vector
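

# --- Usage sketch (illustrative only) -----------------------------------------
# Minimal example of how DemoAnonymizer might be driven. The 'models' directory,
# 'example.wav', and the use of torchaudio for loading are placeholders; the exact
# format expected for `audio` (numpy array vs. torch tensor, mono vs. multi-channel)
# depends on the DemoPoolAnonymizer / DemoRandomAnonymizer implementations.
if __name__ == '__main__':
    from pathlib import Path
    import torchaudio

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    anonymizer = DemoAnonymizer(model_path=Path('models'), model_tag='pool', device=device)

    audio, sr = torchaudio.load('example.wav')  # placeholder input file
    anon_embedding = anonymizer.anonymize_embedding(audio, sr)
    print(anon_embedding.shape)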