"""
Script for an iterative scheme.
Assumptions:
- complete pariwise comparisons available, i.e. evaluations are cheap
-
"""
import logging
from typing import Callable, List, Optional

import numpy as np
import pandas as pd
from tqdm import tqdm

from .metrics import mapk, rank_biased_overlap
from .plots import plot_ranks

logger = logging.getLogger(__name__)

tol = 0.001  # convergence tolerance on the change in reputation scores


class SelfRank:
    def __init__(
        self,
        MODELS: List,
        evaluator: Callable,
        true_ranking: Optional[List] = None,
        show_progress: Optional[bool] = False,
    ):
        self.MODELS = MODELS
        self.N = len(MODELS)
        self.evaluate = evaluator
        self.true_ranking = true_ranking
        self.show_progress = show_progress

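    # Note on the evaluator contract (inferred from how it is called in fit(),
    # not stated explicitly in the original code): evaluator(a=..., b=..., c=..., df=...)
    # should return a value in [0, 1] expressing how strongly judge model `c`
    # prefers model `a` over model `b` on the benchmark outputs in `df`.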
    def fit(self, df: pd.DataFrame):
        """
        df: DataFrame where each row is a benchmark instance and there is
            one column per model, holding that model's output.
        """
        assert set(self.MODELS) == set(df.columns), "Benchmark data models inconsistent with models to be ranked."

        # Build a pairwise preference tensor: y[i, j, k] is the degree to which
        # judge k prefers model i over model j
        if self.show_progress:
            pbar = tqdm(total=self.N**3, position=0, leave=False, desc="Evaluations")

        y = np.empty((self.N, self.N, self.N))
        for i, a in enumerate(self.MODELS):
            for j, b in enumerate(self.MODELS):
                for k, c in enumerate(self.MODELS):  # judge
                    # Some checks to limit evaluations
                    if a == b:
                        y[i, j, k] = 0.5
                        y[j, i, k] = 0.5
                    elif a == c:
                        y[i, j, k] = 1
                        y[j, i, k] = 0
                    elif b == c:
                        y[i, j, k] = 0
                        y[j, i, k] = 1
                    elif j > i:
                        y[i, j, k] = self.evaluate(a=a, b=b, c=c, df=df)
                        y[j, i, k] = 1 - y[i, j, k]  # complement in the other direction
                    # (the case j < i is already filled by the complement above)

                    if self.show_progress:
                        pbar.update(1)

        if self.show_progress:
            pbar.close()

        # Estimate the ranks
        r = np.ones((self.N,))
        iteration = 0
        while True:
            # weighted mean over judges k, weighting each judge's vote by its
            # current reputation score
            m = np.einsum('ijk,k->ij', y, r) / self.N
            # Aggregate preferences using majority voting
            y_p = np.zeros_like(m)
            for i in np.arange(self.N):
                for j in np.arange(self.N):
                    if j > i:
                        if m[i, j] >= m[j, i]:
                            y_p[i, j] = 1.
                            y_p[j, i] = 0.
                        else:
                            y_p[i, j] = 0.
                            y_p[j, i] = 1.

            # Update the reputation score by (normalized) number of wins
            r_k = y_p.sum(axis=1) / max(y_p.sum(axis=1))

            # Terminate once the reputation scores converge
            delta = np.sum(np.abs(r - r_k))
            logger.info(f"Iteration {iteration}: delta={delta}")
            logger.info(f"Reputation score: {r}")
            if delta <= tol:
                break
            else:
                iteration += 1
                r = r_k
        # Get the ranked list from the reputation score
        idx = np.argsort(r_k)[::-1]
        self.ranking = np.array(self.MODELS)[idx].tolist()

        logger.info(f"Estimated ranks (best to worst): {self.ranking}")
        if self.true_ranking is not None:
            logger.info(f"True ranking: {self.true_ranking}")
            logger.info(f"RBO measure: {self.measure()}")

        return self.ranking  # best to worst

    def measure(self, metric='rbo', k=5, p=0.95) -> float:
        """
        Report a metric comparing the estimated ranking against the true ranking.
        """
        if metric not in ['rbo', 'mapk']:
            raise ValueError(f"Metric {metric} not supported (use 'rbo'/'mapk').")
        if not hasattr(self, 'ranking'):
            raise ValueError("Ranking not estimated. Run 'fit' first.")
        if self.true_ranking is None:
            raise ValueError("True ranking not available for metric calculation.")

        if metric == 'mapk':
            if k > len(self.true_ranking):
                logger.warning(f"MAP@k requested with k={k}, but the true ranking only has {len(self.true_ranking)} entries.")
            actual = [self.true_ranking[:k]]
            pred = [self.ranking[:k]]
            return mapk(actual, pred, k=k)
        else:  # metric == 'rbo'
            return rank_biased_overlap(self.true_ranking, self.ranking, p=p)

    def plot(self, caselabel="output"):
        if hasattr(self, 'ranking') and (self.true_ranking is not None):
            return plot_ranks(self.true_ranking, self.ranking, "actual", "estimated", caselabel)
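

# Example usage (a minimal sketch, not part of the original module). The model
# names, the DataFrame contents, and the `exact_match_judge` evaluator below are
# all hypothetical; the evaluator scores a preference for `a` over `b` by how
# often each one agrees with judge `c` on the benchmark outputs.
#
#   import pandas as pd
#
#   def exact_match_judge(a, b, c, df):
#       p_a = (df[a] == df[c]).mean()
#       p_b = (df[b] == df[c]).mean()
#       return 0.5 + 0.5 * (p_a - p_b)   # preference for a over b, in [0, 1]
#
#   models = ["model_a", "model_b", "model_c"]
#   df = pd.DataFrame({m: ["..."] * 10 for m in models})  # one column of outputs per model
#   ranker = SelfRank(models, evaluator=exact_match_judge)
#   ranking = ranker.fit(df)   # list of model names, best to worst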