hilamanor's picture
initial commit
e73da9c
from multiprocessing.sharedctypes import Value
import torch
import torch.distributed.nn
from torch import distributed as dist, nn as nn
from torch.nn import functional as F
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score, accuracy_score
try:
import horovod.torch as hvd
except ImportError:
hvd = None
def gather_features(
audio_features,
text_features,
audio_features_mlp=None,
text_features_mlp=None,
local_loss=False,
gather_with_grad=False,
rank=0,
world_size=1,
use_horovod=False,
mlp_loss=False,
):
if use_horovod:
assert hvd is not None, "Please install horovod"
if gather_with_grad:
all_audio_features = hvd.allgather(audio_features)
all_text_features = hvd.allgather(text_features)
if mlp_loss:
all_audio_features_mlp = hvd.allgather(audio_features_mlp)
all_text_features_mlp = hvd.allgather(text_features_mlp)
else:
with torch.no_grad():
all_audio_features = hvd.allgather(audio_features)
all_text_features = hvd.allgather(text_features)
if mlp_loss:
all_audio_features_mlp = hvd.allgather(audio_features_mlp)
all_text_features_mlp = hvd.allgather(text_features_mlp)
if not local_loss:
# ensure grads for local rank when all_* features don't have a gradient
gathered_audio_features = list(
all_audio_features.chunk(world_size, dim=0)
)
gathered_text_features = list(
all_text_features.chunk(world_size, dim=0)
)
gathered_audio_features[rank] = audio_features
gathered_text_features[rank] = text_features
all_audio_features = torch.cat(gathered_audio_features, dim=0)
all_text_features = torch.cat(gathered_text_features, dim=0)
if mlp_loss:
gathered_audio_features_mlp = list(
all_audio_features_mlp.chunk(world_size, dim=0)
)
gathered_text_features_mlp = list(
all_text_features_mlp.chunk(world_size, dim=0)
)
gathered_audio_features_mlp[rank] = audio_features_mlp
gathered_text_features_mlp[rank] = text_features_mlp
all_audio_features_mlp = torch.cat(
gathered_audio_features_mlp, dim=0
)
all_text_features_mlp = torch.cat(gathered_text_features_mlp, dim=0)
else:
# We gather tensors from all gpus
if gather_with_grad:
all_audio_features = torch.cat(
torch.distributed.nn.all_gather(audio_features), dim=0
)
all_text_features = torch.cat(
torch.distributed.nn.all_gather(text_features), dim=0
)
if mlp_loss:
all_audio_features_mlp = torch.cat(
torch.distributed.nn.all_gather(audio_features_mlp), dim=0
)
all_text_features_mlp = torch.cat(
torch.distributed.nn.all_gather(text_features_mlp), dim=0
)
else:
gathered_audio_features = [
torch.zeros_like(audio_features) for _ in range(world_size)
]
gathered_text_features = [
torch.zeros_like(text_features) for _ in range(world_size)
]
dist.all_gather(gathered_audio_features, audio_features)
dist.all_gather(gathered_text_features, text_features)
if mlp_loss:
gathered_audio_features_mlp = [
torch.zeros_like(audio_features_mlp) for _ in range(world_size)
]
gathered_text_features_mlp = [
torch.zeros_like(text_features_mlp) for _ in range(world_size)
]
dist.all_gather(gathered_audio_features_mlp, audio_features_mlp)
dist.all_gather(gathered_text_features_mlp, text_features_mlp)
if not local_loss:
# ensure grads for local rank when all_* features don't have a gradient
gathered_audio_features[rank] = audio_features
gathered_text_features[rank] = text_features
if mlp_loss:
gathered_audio_features_mlp[rank] = audio_features_mlp
gathered_text_features_mlp[rank] = text_features_mlp
all_audio_features = torch.cat(gathered_audio_features, dim=0)
all_text_features = torch.cat(gathered_text_features, dim=0)
if mlp_loss:
all_audio_features_mlp = torch.cat(gathered_audio_features_mlp, dim=0)
all_text_features_mlp = torch.cat(gathered_text_features_mlp, dim=0)
if mlp_loss:
return (
all_audio_features,
all_text_features,
all_audio_features_mlp,
all_text_features_mlp,
)
else:
return all_audio_features, all_text_features
class ClipLoss(nn.Module):
def __init__(
self,
local_loss=False,
gather_with_grad=False,
cache_labels=False,
rank=0,
world_size=1,
use_horovod=False,
mlp_loss=False,
weight_loss_kappa=0,
):
super().__init__()
self.local_loss = local_loss
self.gather_with_grad = gather_with_grad
self.cache_labels = cache_labels
self.rank = rank
self.world_size = world_size
self.use_horovod = use_horovod
self.mlp_loss = mlp_loss
self.weighted_loss = bool(weight_loss_kappa != 0)
self.weight_loss_kappa = weight_loss_kappa
# cache state
self.prev_num_logits = 0
self.labels = {}
def forward(
self,
audio_features,
text_features,
logit_scale_a,
logit_scale_t=None,
audio_features_mlp=None,
text_features_mlp=None,
):
device = audio_features.device
if self.mlp_loss:
if self.world_size > 1:
(
all_audio_features,
all_text_features,
all_audio_features_mlp,
all_text_features_mlp,
) = gather_features(
audio_features=audio_features,
text_features=text_features,
audio_features_mlp=audio_features_mlp,
text_features_mlp=text_features_mlp,
local_loss=self.local_loss,
gather_with_grad=self.gather_with_grad,
rank=self.rank,
world_size=self.world_size,
use_horovod=self.use_horovod,
mlp_loss=self.mlp_loss,
)
if self.local_loss:
a_logits_per_audio = (
logit_scale_a * audio_features @ all_text_features_mlp.T
)
a_logits_per_text = (
logit_scale_a * text_features_mlp @ all_audio_features.T
)
t_logits_per_audio = (
logit_scale_t * audio_features_mlp @ all_text_features.T
)
t_logits_per_text = (
logit_scale_t * text_features @ all_audio_features_mlp.T
)
else:
a_logits_per_audio = (
logit_scale_a * all_audio_features @ all_text_features_mlp.T
)
a_logits_per_text = a_logits_per_audio.T
t_logits_per_audio = (
logit_scale_t * all_audio_features_mlp @ all_text_features.T
)
t_logits_per_text = t_logits_per_audio.T
else:
a_logits_per_audio = (
logit_scale_a * audio_features @ text_features_mlp.T
)
a_logits_per_text = logit_scale_a * text_features_mlp @ audio_features.T
t_logits_per_audio = (
logit_scale_t * audio_features_mlp @ text_features.T
)
t_logits_per_text = logit_scale_t * text_features @ audio_features_mlp.T
# calculated ground-truth and cache if enabled
num_logits = a_logits_per_audio.shape[0]
if self.prev_num_logits != num_logits or device not in self.labels:
labels = torch.arange(num_logits, device=device, dtype=torch.long)
if self.world_size > 1 and self.local_loss:
labels = labels + num_logits * self.rank
if self.cache_labels:
self.labels[device] = labels
self.prev_num_logits = num_logits
else:
labels = self.labels[device]
if not self.weighted_loss:
total_loss = (
F.cross_entropy(a_logits_per_audio, labels)
+ F.cross_entropy(a_logits_per_text, labels)
+ F.cross_entropy(t_logits_per_audio, labels)
+ F.cross_entropy(t_logits_per_text, labels)
) / 4
else:
audio_weight = (audio_features @ audio_features.T).detach()
audio_weight = (
torch.exp(
torch.sum(audio_weight, axis=1)
/ (self.weight_loss_kappa * len(audio_weight))
)
).detach()
text_weight = (text_features @ text_features.T).detach()
text_weight = (
torch.exp(
torch.sum(text_weight, axis=1)
/ (self.weight_loss_kappa * len(text_features))
)
).detach()
total_loss = (
F.cross_entropy(a_logits_per_audio, labels, weight=audio_weight)
+ F.cross_entropy(a_logits_per_text, labels, weight=audio_weight)
+ F.cross_entropy(t_logits_per_audio, labels, weight=text_weight)
+ F.cross_entropy(t_logits_per_text, labels, weight=text_weight)
) / 4
else:
if self.world_size > 1:
all_audio_features, all_text_features = gather_features(
audio_features=audio_features,
text_features=text_features,
local_loss=self.local_loss,
gather_with_grad=self.gather_with_grad,
rank=self.rank,
world_size=self.world_size,
use_horovod=self.use_horovod,
mlp_loss=self.mlp_loss,
)
if self.local_loss:
logits_per_audio = (
logit_scale_a * audio_features @ all_text_features.T
)
logits_per_text = (
logit_scale_a * text_features @ all_audio_features.T
)
else:
logits_per_audio = (
logit_scale_a * all_audio_features @ all_text_features.T
)
logits_per_text = logits_per_audio.T
else:
logits_per_audio = logit_scale_a * audio_features @ text_features.T
logits_per_text = logit_scale_a * text_features @ audio_features.T
# calculated ground-truth and cache if enabled
num_logits = logits_per_audio.shape[0]
if self.prev_num_logits != num_logits or device not in self.labels:
labels = torch.arange(num_logits, device=device, dtype=torch.long)
if self.world_size > 1 and self.local_loss:
labels = labels + num_logits * self.rank
if self.cache_labels:
self.labels[device] = labels
self.prev_num_logits = num_logits
else:
labels = self.labels[device]
if not self.weighted_loss:
total_loss = (
F.cross_entropy(logits_per_audio, labels)
+ F.cross_entropy(logits_per_text, labels)
) / 2
else:
audio_weight = (all_audio_features @ all_audio_features.T).detach()
audio_weight = (
torch.exp(
torch.sum(audio_weight, axis=1)
/ (self.weight_loss_kappa * len(all_audio_features))
)
).detach()
text_weight = (all_text_features @ all_text_features.T).detach()
text_weight = (
torch.exp(
torch.sum(text_weight, axis=1)
/ (self.weight_loss_kappa * len(all_text_features))
)
).detach()
total_loss = (
F.cross_entropy(logits_per_audio, labels, weight=text_weight)
+ F.cross_entropy(logits_per_text, labels, weight=audio_weight)
) / 2
return total_loss
def lp_gather_features(pred, target, world_size=1, use_horovod=False):
if use_horovod:
assert hvd is not None, "Please install horovod"
with torch.no_grad():
all_preds = hvd.allgather(pred)
all_targets = hvd.allgath(target)
else:
gathered_preds = [torch.zeros_like(pred) for _ in range(world_size)]
gathered_targets = [torch.zeros_like(target) for _ in range(world_size)]
dist.all_gather(gathered_preds, pred)
dist.all_gather(gathered_targets, target)
all_preds = torch.cat(gathered_preds, dim=0)
all_targets = torch.cat(gathered_targets, dim=0)
return all_preds, all_targets
def get_map(pred, target):
pred = torch.sigmoid(pred).numpy()
target = target.numpy()
return np.mean(average_precision_score(target, pred, average=None))
def get_acc(pred, target):
pred = torch.argmax(pred, 1).numpy()
target = torch.argmax(target, 1).numpy()
return accuracy_score(target, pred)
def get_mauc(pred, target):
pred = torch.sigmoid(pred).numpy()
target = target.numpy()
return np.mean(roc_auc_score(target, pred, average=None))
class LPMetrics(object):
def __init__(self, metric_names=["map", "acc", "mauc"]):
self.metrics = []
for name in metric_names:
self.metrics.append(self.get_metric(name))
self.metric_names = metric_names
def get_metric(self, name):
if name == "map":
return get_map
elif name == "acc":
return get_acc
elif name == "mauc":
return get_mauc
else:
raise ValueError(f"the metric should be at least one of [map, acc, mauc]")
def evaluate_mertics(self, pred, target):
metric_dict = {}
for i in range(len(self.metric_names)):
metric_dict[self.metric_names[i]] = self.metrics[i](pred, target)
return metric_dict
def calc_celoss(pred, target):
target = torch.argmax(target, 1).long()
return nn.CrossEntropyLoss()(pred, target)
class LPLoss(nn.Module):
def __init__(self, loss_name):
super().__init__()
if loss_name == "bce":
self.loss_func = nn.BCEWithLogitsLoss()
elif loss_name == "ce":
self.loss_func = calc_celoss
elif loss_name == "mse":
self.loss_func = nn.MSELoss()
else:
raise ValueError(f"the loss func should be at least one of [bce, ce, mse]")
def forward(self, pred, target):
loss = self.loss_func(pred, target)
return loss