#!/usr/bin/env python3
# Copyright 2023 Dmitry Ustalov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'Dmitry Ustalov'
__license__ = 'Apache 2.0'
from collections.abc import Callable
from functools import partial
from typing import BinaryIO, cast
import gradio as gr
import networkx as nx
import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.express as px
from plotly.graph_objects import Figure
def visualize(df_pairwise: pd.DataFrame) -> Figure:
    """Render a square pairwise matrix as an annotated heatmap.

    Args:
        df_pairwise: Matrix whose rows are labelled as winners and whose
            columns are labelled as losers; each cell is shown as the
            fraction of wins of the row item over the column item.

    Returns:
        The Plotly figure with a diverging colour scale, two-decimal cell
        labels, and a custom hover text.
    """
    fig = px.imshow(df_pairwise, color_continuous_scale='RdBu', text_auto='.2f')
    fig.update_layout(xaxis_title='Loser', yaxis_title='Winner', xaxis_side='top')

    # The hover template must be a single string literal; Plotly renders
    # <br> as a line break inside hover labels.
    fig.update_traces(hovertemplate='Winner: %{y}<br>Loser: %{x}<br>Fraction of Wins: %{z}')

    return fig
# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-bradley_terry-py
def bradley_terry(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
                  seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Iteratively estimate Bradley-Terry scores from win and tie counts.

    Args:
        wins: Square matrix; entry [i, j] is added in full to item i's credit.
        ties: Square matrix; entry [i, j] is added to item i's credit at half weight.
        seed: Accepted for a uniform algorithm signature; not used here.
        tolerance: Stop once the Euclidean distance between two consecutive
            score vectors drops below this value.
        limit: Maximum number of iterations.

    Returns:
        One score per item; every update is normalised to sum to one.
    """
    credit = wins + .5 * ties
    encounters = credit.T + credit
    compared = encounters > 0  # only pairs with at least one recorded outcome

    row_credit = credit.sum(axis=1)
    weights = np.zeros_like(credit, dtype=float)
    scores = np.ones(credit.shape[0])

    iterations = 0

    while True:
        iterations += 1

        # grid[i, j] == scores[j] and grid.T[i, j] == scores[i]
        grid = np.broadcast_to(scores, credit.shape)
        weights[compared] = encounters[compared] / (grid[compared] + grid.T[compared])

        updated = row_credit / weights.sum(axis=0)
        updated /= updated.sum()

        finished = bool(np.linalg.norm(updated - scores) < tolerance) or (iterations >= limit)
        scores = updated

        if finished:
            break

    return scores
def centrality(algorithm: Callable[[nx.DiGraph], dict[int, float]],
               wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64]) -> npt.NDArray[np.float64]:
    """Score items with a graph centrality measure.

    The matrix ``wins + .5 * ties`` is turned into a directed graph whose
    nodes are the row/column positions, and ``algorithm`` is applied to it.

    Args:
        algorithm: Callable taking the directed graph and returning a
            mapping from node number to score.
        wins: Square matrix of win counts.
        ties: Square matrix of tie counts.

    Returns:
        The scores ordered by node number (0 .. number of nodes - 1).
    """
    # NOTE(review): an edge runs from the row item to the column item, i.e.
    # from winner to loser when wins[i, j] counts i's wins over j. Centralities
    # driven by incoming edges would then favour the loser; confirm that this
    # direction is intended.
    adjacency = wins + .5 * ties
    graph = nx.from_numpy_array(adjacency, create_using=nx.DiGraph)

    by_node: dict[int, float] = algorithm(graph)

    return np.array([by_node[node] for node in range(len(graph))])
def counting(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
             seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]:
    """Score every item by its row total of wins plus half of its ties.

    Args:
        wins: Square matrix of win counts.
        ties: Square matrix of tie counts.
        seed: Accepted for a uniform algorithm signature; not used here.
        tolerance: Accepted for a uniform algorithm signature; not used here.
        limit: Accepted for a uniform algorithm signature; not used here.

    Returns:
        One score per row of the input matrices.
    """
    credit = wins + .5 * ties
    row_totals = credit.sum(axis=1)

    return cast(npt.NDArray[np.float64], row_totals)
def eigen(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
          seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]:
    """Score items by eigenvector centrality of the comparison graph.

    Args:
        wins: Square matrix of win counts.
        ties: Square matrix of tie counts.
        seed: Accepted for a uniform algorithm signature; not used here.
        tolerance: Forwarded to NetworkX as ``tol``.
        limit: Forwarded to NetworkX as ``max_iter``.

    Returns:
        One centrality score per item, ordered like the input matrices.
    """
    # eigenvector_centrality_numpy treats all edges as equal unless a weight
    # attribute is named; name it explicitly so that the comparison counts
    # stored on the edges are actually used.
    algorithm = partial(nx.algorithms.eigenvector_centrality_numpy,
                        weight='weight', max_iter=limit, tol=tolerance)

    return centrality(algorithm, wins, ties)
def pagerank(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
             seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]:
    """Score items by PageRank of the comparison graph.

    Args:
        wins: Square matrix of win counts.
        ties: Square matrix of tie counts.
        seed: Accepted for a uniform algorithm signature; not used here.
        tolerance: Forwarded to NetworkX as ``tol``.
        limit: Forwarded to NetworkX as ``max_iter``.

    Returns:
        One PageRank score per item, ordered like the input matrices.
    """
    def run(graph: nx.DiGraph) -> dict[int, float]:
        return nx.algorithms.pagerank(graph, max_iter=limit, tol=tolerance)

    return centrality(run, wins, ties)
# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-newman-py
def newman(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
           seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Iteratively estimate item strengths and a tie parameter.

    Each iteration first updates the scalar tie parameter ``v`` and then the
    strength vector ``pi``. Both updates divide by the same pairwise
    normaliser ``pi_i + pi_j + 2 * v * sqrt(pi_i * pi_j)``.

    Args:
        wins: Square matrix; entry [i, j] counts wins of item i over item j.
        ties: Square matrix of tie counts.
        seed: Accepted for a uniform algorithm signature; not used here.
        tolerance: Relative and absolute tolerance of the convergence check,
            and the replacement value for NaN results.
        limit: Maximum number of iterations.

    Returns:
        One strength value per item.
    """
    pi, v = np.ones(wins.shape[0]), .5
    win_tie_half = wins + ties / 2

    converged, iterations = False, 0

    while not converged:
        iterations += 1

        # Pairwise terms shared by all four sums of this iteration.
        pi_sum = pi[:, np.newaxis] + pi
        pi_sqrt = np.sqrt(pi[:, np.newaxis] * pi)

        normaliser = pi_sum + 2 * v * pi_sqrt
        v_numerator = np.sum(ties * pi_sum / normaliser) / 2
        v_denominator = np.sum(wins * 2 * pi_sqrt / normaliser)

        v = v_numerator / v_denominator
        # v is a scalar, so there is nothing to convert in place.
        v = np.nan_to_num(v, nan=tolerance)

        pi_old = pi.copy()

        # Recompute with the refreshed v. The factor must be 2 * v, exactly as
        # in the v update above; "2 + v" would be a different normaliser.
        normaliser = pi_sum + 2 * v * pi_sqrt
        pi_numerator = np.sum(win_tie_half * (pi + v * pi_sqrt) / normaliser, axis=1)
        pi_denominator = np.sum(win_tie_half * (1 + v * pi_sqrt) / normaliser, axis=0)

        pi = pi_numerator / pi_denominator
        pi = np.nan_to_num(pi, copy=False, nan=tolerance)

        # Compare on the bounded scale pi / (pi + 1) so that large strengths
        # do not dominate the tolerance check.
        converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1),
                                rtol=tolerance, atol=tolerance) or (iterations >= limit)

    return pi
# Ranking algorithms keyed by the label offered in the user interface.
# Every entry is called as algorithm(wins, ties, seed=...) and returns one
# score per item. The insertion order is the order in which the labels are
# iterated.
ALGORITHMS = {
    'Counting': counting,
    'Bradley-Terry (1952)': bradley_terry,
    'Eigenvector (1986)': eigen,
    'PageRank (1998)': pagerank,
    'Newman (2023)': newman,
}
def largest_strongly_connected_component(df: pd.DataFrame) -> set[str]:
    """Return the items of the largest strongly connected component.

    Every row contributes an edge from ``left`` to ``right``; rows whose
    ``winner`` is ``'tie'`` additionally contribute the reverse edge.

    Args:
        df: Comparisons with ``left``, ``right`` and ``winner`` columns.

    Returns:
        The node set of the component with the most nodes.
    """
    tied_rows = df[df['winner'] == 'tie']

    forward = nx.from_pandas_edgelist(df, source='left', target='right', create_using=nx.DiGraph)
    tie_backward = nx.from_pandas_edgelist(tied_rows, source='right', target='left', create_using=nx.DiGraph)

    combined = nx.compose(forward, tie_backward)
    components = nx.strongly_connected_components(combined)

    return cast(set[str], max(components, key=len))
def handler(file: BinaryIO, algorithm: str, filtered: bool, truncated: bool, seed: int) -> tuple[pd.DataFrame, Figure]:
    """Rank the items of an uploaded comparison file.

    Args:
        file: Uploaded file object; only its ``name`` (a path) is read.
        algorithm: Key of ``ALGORITHMS`` selecting the ranking method.
        filtered: Keep only the comparisons inside the largest strongly
            connected component of the comparison graph.
        truncated: Report only the first five and the last five ranked items.
        seed: Forwarded to the selected algorithm.

    Returns:
        A table with ``item``, ``score``, ``pairs`` and ``rank`` columns, and
        a heatmap of ``score_i / (score_i + score_j)`` for the reported items.

    Raises:
        gr.Error: If no file was uploaded, the algorithm is unknown, the file
            cannot be parsed, required columns are missing, or ``winner``
            holds a value other than ``left``, ``right`` or ``tie``.
    """
    if file is None:
        raise gr.Error('File must be uploaded')

    if algorithm not in ALGORITHMS:
        raise gr.Error(f'Unknown algorithm: {algorithm}')

    # NOTE(review): read_csv is called with its default (comma) separator,
    # while the upload widget in main() also admits .tsv files; confirm whether
    # tab-separated input is meant to be supported.
    try:
        df = pd.read_csv(file.name, dtype=str)
    except ValueError as e:
        # Chain the cause so the original parser traceback is preserved.
        raise gr.Error(f'Parsing error: {e}') from e

    if not pd.Series(['left', 'right', 'winner']).isin(df.columns).all():
        raise gr.Error('Columns must exist: left, right, winner')

    if not df['winner'].isin(pd.Series(['left', 'right', 'tie'])).all():
        raise gr.Error('Allowed winner values: left, right, tie')

    # Take an explicit copy so the assignments below never write into a
    # slice of the parsed frame.
    df = df[['left', 'right', 'winner']].dropna(axis=0).copy()

    # Normalise every decided comparison to "left won" by swapping the sides.
    right_won = df['winner'] == 'right'
    df.loc[right_won, ['left', 'right']] = df.loc[right_won, ['right', 'left']].values
    df.loc[right_won, 'winner'] = 'left'

    if filtered:
        largest = largest_strongly_connected_component(df)

        df = df[df['left'].isin(largest) & df['right'].isin(largest)]

        # A set has no defined order; sort it so that the matrix layout and
        # the order of equally ranked items are reproducible between runs.
        index = pd.Index(sorted(largest), name='item')
    else:
        index = pd.Index(np.unique(df[['left', 'right']].values), name='item')

    df_wins = pd.pivot_table(df[df['winner'] != 'tie'],
                             index='left', columns='right', values='winner',
                             aggfunc='count', fill_value=0)

    df_wins = df_wins.reindex(index=index, columns=index, fill_value=0, copy=False)

    df_ties = pd.pivot_table(df[df['winner'] == 'tie'],
                             index='left', columns='right', values='winner',
                             aggfunc='count', fill_value=0)

    df_ties = df_ties.reindex(index=index, columns=index, fill_value=0, copy=False)

    wins = df_wins.to_numpy(dtype=int)

    # A tie is recorded once per row of the file; mirror it so that both
    # participants see it.
    ties = df_ties.to_numpy(dtype=int)
    ties += ties.T

    assert wins.shape == ties.shape, 'wins and ties shapes are different'

    scores = ALGORITHMS[algorithm](wins, ties, seed=seed)

    df_result = pd.DataFrame(data={'score': scores}, index=index)

    # Number of comparisons every item took part in, on either side.
    df_result['pairs'] = pd.Series(0, dtype=int, index=index).add(
        df.groupby('left')['left'].count(), fill_value=0
    ).add(
        df.groupby('right')['right'].count(), fill_value=0
    ).astype(int)

    df_result['rank'] = df_result['score'].rank(na_option='bottom', ascending=False).astype(int)

    # np.NINF is not available in NumPy 2; -np.inf is the portable spelling.
    df_result = df_result.fillna(-np.inf)
    df_result = df_result.sort_values(by=['rank', 'score'], ascending=[True, False])
    df_result = df_result.reset_index()

    if truncated:
        # Head and tail overlap when there are fewer than ten rows; keep each
        # row once.
        df_result = pd.concat((df_result.head(5), df_result.tail(5)), copy=False)
        df_result = df_result[~df_result.index.duplicated(keep='last')]

    df_pairwise = pd.DataFrame(data=scores[:, np.newaxis] / (scores + scores[:, np.newaxis]),
                               index=index, columns=index)

    # Show the matrix in ranking order and only for the reported items.
    df_pairwise = df_pairwise.reindex(index=df_result['item'], columns=df_result['item'], copy=False)

    fig = visualize(df_pairwise)

    return df_result, fig
def main() -> None:
    """Build the Gradio interface around ``handler`` and launch it."""
    iface = gr.Interface(
        fn=handler,
        inputs=[
            gr.File(
                file_types=['.tsv', '.csv'],
                label='Comparisons'
            ),
            gr.Dropdown(
                # Pass the labels as a real list instead of casting the dict.
                choices=list(ALGORITHMS),
                value='Bradley-Terry (1952)',
                label='Algorithm'
            ),
            gr.Checkbox(
                value=False,
                label='Largest SCC',
                info='Bradley-Terry, Eigenvector, and Newman algorithms require the comparison graph '
                     'to be strongly-connected. '
                     'This option keeps only the largest strongly-connected component (SCC) of the input graph. '
                     'Some items might be missing as a result of this filtering.'
            ),
            gr.Checkbox(
                value=False,
                label='Truncate Output',
                info='Perform the entire computation but output only five head and five tail items, '
                     'avoiding overlap.'
            ),
            gr.Number(
                label='Seed',
                precision=0
            )
        ],
        outputs=[
            gr.Dataframe(
                headers=['item', 'score', 'pairs', 'rank'],
                label='Ranking'
            ),
            gr.Plot(
                label='Pairwise Chances of Winning the Comparison'
            )
        ],
        # Each example row follows the order of ``inputs``:
        # file, algorithm, largest SCC, truncate output, seed.
        examples=[
            ['food.csv', 'Counting', False, False, 0],
            ['food.csv', 'Bradley-Terry (1952)', False, False, 0],
            ['food.csv', 'Eigenvector (1986)', False, False, 0],
            ['food.csv', 'PageRank (1998)', False, False, 0],
            ['food.csv', 'Newman (2023)', False, False, 0],
            ['llmfao.csv', 'Bradley-Terry (1952)', False, True, 0]
        ],
        title='Pair2Rank: Turn Your Side-by-Side Comparisons into Ranking!',
        description='''
This easy-to-use tool transforms pairwise comparisons (aka side-by-side) to a meaningful ranking of items.
As an input, it expects a comma-separated (CSV) file with a header containing the following columns:
- `left`: the first compared item
- `right`: the second compared item
- `winner`: the label indicating the winning item
Possible values for `winner` are `left`, `right`, or `tie`. The provided examples might be a good starting point.
As the output, this tool provides a table with items, their estimated scores, and ranks.
'''.strip(),
        # NOTE(review): the sentence below ends in "at ." - the link target
        # appears to be missing; restore it from the project documentation.
        article='''
Read more about Pair2Rank at .
'''.strip(),
        allow_flagging='never'
    )

    iface.launch()
# Start the web application only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()