rahulnair23's picture
test commit
0de1d17
raw
history blame
4.81 kB
"""
Script for an iterative scheme.
Assumptions:
- complete pariwise comparisons available, i.e. evaluations are cheap
-
"""
import pandas as pd
import numpy as np
from tqdm import tqdm
from .metrics import mapk, rank_biased_overlap
from .plots import plot_ranks
import logging
from typing import List, Callable, Optional
logger = logging.getLogger(__name__)
tol = 0.001
class SelfRank:
def __init__(self, MODELS: List, evaluator: Callable, true_ranking: Optional[List]=None, show_progress: Optional[bool]=False):
self.MODELS = MODELS
self.N = len(MODELS)
self.evaluate = evaluator
self.true_ranking = true_ranking
self.show_progress = show_progress
def fit(self, df: pd.DataFrame):
"""
df: Dataframe where each row is a benchmark instance,
and there is a column with the output for each Model
"""
assert set(self.MODELS) == set(df.columns), "Benchmark data models inconsistent with models to be ranked."
# Build a pairwise preference matrix
if self.show_progress:
pbar = tqdm(total=self.N**3, position=0, leave=False, desc="Evaluations")
y = np.empty((self.N, self.N, self.N))
for i, a in enumerate(self.MODELS):
for j, b in enumerate(self.MODELS):
for k, c in enumerate(self.MODELS): # Judge
# Some checks to limit evaluations
if a == b:
y[i, j, k] = 0.5
y[j, i, k] = 0.5
elif a == c:
y[i, j, k] = 1
y[j, i, k] = 0
elif b == c:
y[i, j, k] = 0
y[j, i, k] = 1
elif j > i:
y[i, j, k] = self.evaluate(a=a, b=b, c=c, df=df)
y[j, i, k] = 1 - y[i, j, k] # complement in the other direction
if self.show_progress: pbar.update(1)
# Estimate the ranks
r = np.ones((self.N, ))
iter = 0
while True:
# weighted mean over k
m = np.einsum('ijk,i->ij', y, r) / self.N
# Aggregate preferences using majority voting
y_p = np.zeros_like(m)
for i in np.arange(self.N):
for j in np.arange(self.N):
if j > i:
if m[i, j] >= m[j, i]:
y_p[i,j] = 1.
y_p[j,i] = 0.
else:
y_p[i,j] = 0.
y_p[j,i] = 1.
# update reputation score by wins
r_k = y_p.sum(axis=1)/max(y_p.sum(axis=1))
# termination if reputation score converges
delta = np.sum(np.abs(r - r_k))
logging.info(f"Iteration {iter}:{delta}")
logging.info(f"Reputation score: {r}")
if delta<= tol:
break
else:
iter += 1
r = r_k
# Get ranked list from the reputation score
idx = np.argsort(r_k)[::-1]
self.ranking = np.array(self.MODELS)[idx].tolist()
logger.info(f"Estimated ranks (best to worst): {self.ranking}")
if self.true_ranking is not None:
logger.info(f"True ranking: {self.true_ranking}")
logger.info(f"RBO measure: {self.measure()}")
return self.ranking # Best to worst
def measure(self, metric='rbo', k=5, p=0.95) -> float:
"""
Report metric related to self-rank
"""
if metric not in ['rbo', 'mapk']:
raise ValueError(f"Metric {metric} not supported (use 'rbo'/'mapk').")
if hasattr(self, 'ranking'):
if self.true_ranking is not None:
if metric == 'mapk':
if k > len(self.true_ranking):
logger.warning(f"MAPk metric is for k={len(self.true_ranking)}, and not k={k}.")
actual = [self.true_ranking[:k]]
pred = [self.ranking[:k]]
return mapk(actual, pred, k=k)
elif metric == 'rbo':
return rank_biased_overlap(self.true_ranking, self.ranking, p=p)
else:
raise ValueError(f"Metric {metric} not understood.")
else:
raise ValueError("True ranking not available for metric calculation.")
else:
raise ValueError("Ranking not estimated. Run 'fit' first.")
def plot(self, caselabel="output"):
if hasattr(self, 'ranking') & (self.true_ranking is not None):
return plot_ranks(self.true_ranking, self.ranking, "actual", "estimated", caselabel)