# Copyright 2023 Dmitry Ustalov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'Dmitry Ustalov'
__license__ = 'Apache 2.0'
from typing import IO, Tuple, List, cast, Dict, Set
import gradio as gr
import networkx as nx
import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.express as px
from plotly.graph_objects import Figure
def visualize(df_pairwise: pd.DataFrame) -> Figure:
fig = px.imshow(df_pairwise, color_continuous_scale='RdBu', text_auto='.2f')
fig.update_layout(xaxis_title='Loser', yaxis_title='Winner', xaxis_side='top')
fig.update_traces(hovertemplate='Winner: %{y}
Loser: %{x}
Fraction of Wins: %{z}')
return fig
# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-bradley_terry-py
def bradley_terry(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
M = wins + .5 * ties
T = M.T + M
active = T > 0
w = M.sum(axis=1)
Z = np.zeros_like(M, dtype=float)
p = np.ones(M.shape[0])
p_new = p.copy()
converged, iterations = False, 0
while not converged:
iterations += 1
P = np.broadcast_to(p, M.shape)
Z[active] = T[active] / (P[active] + P.T[active])
p_new[:] = w
p_new /= Z.sum(axis=0)
p_new /= p_new.sum()
converged = bool(np.linalg.norm(p_new - p) < tolerance) or (iterations >= limit)
p[:] = p_new
return p
def pagerank(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]:
A = wins + .5 * ties
G = nx.from_numpy_array(A, create_using=nx.DiGraph)
scores: Dict[int, float] = nx.algorithms.pagerank(G, max_iter=limit, tol=tolerance)
p = np.array([scores[i] for i in range(len(G))])
return p
# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-newman-py
def newman(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
rng = np.random.default_rng(seed)
pi, v = rng.random(wins.shape[0]), rng.random()
converged, iterations = False, 0
while not converged:
iterations += 1
v_numerator = np.sum(
ties * (pi[:, np.newaxis] + pi) /
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi))
) / 2
v_denominator = np.sum(
wins * 2 * np.sqrt(pi[:, np.newaxis] * pi) /
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi))
)
v = v_numerator / v_denominator
v = np.nan_to_num(v, copy=False, nan=tolerance)
pi_old = pi.copy()
pi_numerator = np.sum(
(wins + ties / 2) * (pi + v * np.sqrt(pi[:, np.newaxis] * pi)) /
(pi[:, np.newaxis] + pi + 2 + v * np.sqrt(pi[:, np.newaxis] * pi)),
axis=1
)
pi_denominator = np.sum(
(wins + ties / 2) * (1 + v * np.sqrt(pi[:, np.newaxis] * pi)) /
(pi[:, np.newaxis] + pi + 2 + v * np.sqrt(pi[:, np.newaxis] * pi)),
axis=0
)
pi = pi_numerator / pi_denominator
pi = np.nan_to_num(pi, copy=False, nan=tolerance)
converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1),
rtol=tolerance, atol=tolerance) or (iterations >= limit)
return pi
ALGORITHMS = {
'Bradley-Terry (1952)': bradley_terry,
'PageRank (1998)': pagerank,
'Newman (2023)': newman,
}
def largest_strongly_connected_component(df: pd.DataFrame) -> Set[str]:
G = nx.from_pandas_edgelist(df, source='left', target='right', create_using=nx.DiGraph)
H = nx.from_pandas_edgelist(df[df['winner'] == 'tie'], source='right', target='left', create_using=nx.DiGraph)
F = nx.compose(G, H)
largest = max(nx.strongly_connected_components(F), key=len)
return cast(Set[str], largest)
def handler(file: IO[bytes], algorithm: str, filtered: bool, seed: int) -> Tuple[pd.DataFrame, Figure]:
if file is None:
raise gr.Error('File must be uploaded')
if algorithm not in ALGORITHMS:
raise gr.Error(f'Unknown algorithm: {algorithm}')
try:
df = pd.read_csv(file.name, dtype=str)
except ValueError as e:
raise gr.Error(f'Parsing error: {e}')
if not pd.Series(['left', 'right', 'winner']).isin(df.columns).all():
raise gr.Error('Columns must exist: left, right, winner')
if not df['winner'].isin(pd.Series(['left', 'right', 'tie'])).all():
raise gr.Error('Allowed winner values: left, right, tie')
df = df[['left', 'right', 'winner']]
df.dropna(axis='rows', inplace=True)
if filtered:
largest = largest_strongly_connected_component(df)
df.drop(df[~(df['left'].isin(largest) & df['right'].isin(largest))].index, inplace=True)
index = pd.Index(largest, name='item')
else:
index = pd.Index(np.unique(df[['left', 'right']].values), name='item')
df_wins = pd.pivot_table(df[df['winner'].isin(['left', 'right'])],
index='left', columns='right', values='winner',
aggfunc='count', fill_value=0)
df_wins = df_wins.reindex(labels=index, columns=index, fill_value=0, copy=False)
df_ties = pd.pivot_table(df[df['winner'] == 'tie'],
index='left', columns='right', values='winner', aggfunc='count',
fill_value=0)
df_ties = df_ties.reindex(labels=index, columns=index, fill_value=0, copy=False)
wins = df_wins.to_numpy(dtype=np.int64)
ties = df_ties.to_numpy(dtype=np.int64)
ties += ties.T
assert wins.shape == ties.shape, 'wins and ties shapes are different'
scores = ALGORITHMS[algorithm](wins, ties, seed=seed)
df_result = pd.DataFrame(data={'score': scores}, index=index)
df_result['pairs'] = pd.Series(0, dtype=int, index=index).add(
df.groupby('left')['left'].count(), fill_value=0
).add(
df.groupby('right')['right'].count(), fill_value=0
).astype(int)
df_result['rank'] = df_result['score'].rank(na_option='bottom', ascending=False).astype(int)
df_result.fillna(np.NINF, inplace=True)
df_result.sort_values(by=['rank', 'score'], ascending=[True, False], inplace=True)
df_result.reset_index(inplace=True)
df_pairwise = pd.DataFrame(data=scores[:, np.newaxis] / (scores + scores[:, np.newaxis]),
index=index, columns=index)
df_pairwise = df_pairwise.reindex(labels=df_result['item'], columns=df_result['item'], copy=False)
fig = visualize(df_pairwise)
return df_result, fig
def main() -> None:
iface = gr.Interface(
fn=handler,
inputs=[
gr.File(
file_types=['.tsv', '.csv'],
label='Comparisons'
),
gr.Dropdown(
choices=cast(List[str], ALGORITHMS),
value='Bradley-Terry (1952)',
label='Algorithm'
),
gr.Checkbox(
value=False,
label='Largest SCC',
info='Bradley-Terry and Newman algorithms require the comparison graph to be strongly-connected. '
'This option keeps only the largest strongly-connected component (SCC) of the input graph. '
'Some items might be missing as a result of this filtering.'
),
gr.Number(
label='Seed',
precision=0
)
],
outputs=[
gr.Dataframe(
headers=['item', 'score', 'pairs', 'rank'],
label='Ranking'
),
gr.Plot(
label='Pairwise Chances of Winning the Comparison'
)
],
examples=[
['food.csv', 'Bradley-Terry (1952)', False],
['food.csv', 'PageRank (1998)', False],
['food.csv', 'Newman (2023)', False]
],
title='Pair2Rank: Turn Your Side-by-Side Comparisons into Ranking!',
description='''
This easy-to-use tool transforms pairwise comparisons (aka side-by-side) to a meaningful ranking of items.
As an input, it expects a comma-separated (CSV) file with a header containing the following columns:
- `left`: the first compared item
- `right`: the second compared item
- `winner`: the label indicating the winning item
Possible values for `winner` are `left`, `right`, or `tie`. The provided examples might be a good starting point.
As the output, this tool provides a table with items, their estimated scores, and ranks.
''',
article='''
This tool attempts to implement the tie-aware ranking aggregation algorithm as described in
[Efficient Computation of Rankings from Pairwise Comparisons](https://www.jmlr.org/papers/v24/22-1086.html).
''',
allow_flagging='never'
)
iface.launch()
if __name__ == '__main__':
main()