#!/usr/bin/env python3 # Copyright 2023 Dmitry Ustalov # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. __author__ = 'Dmitry Ustalov' __license__ = 'Apache 2.0' from functools import partial from typing import IO, Tuple, List, cast, Dict, Set, Callable import gradio as gr import networkx as nx import numpy as np import numpy.typing as npt import pandas as pd import plotly.express as px from plotly.graph_objects import Figure def visualize(df_pairwise: pd.DataFrame) -> Figure: fig = px.imshow(df_pairwise, color_continuous_scale='RdBu', text_auto='.2f') fig.update_layout(xaxis_title='Loser', yaxis_title='Winner', xaxis_side='top') fig.update_traces(hovertemplate='Winner: %{y}
Loser: %{x}
Fraction of Wins: %{z}') return fig # https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-bradley_terry-py def bradley_terry(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64], seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]: M = wins + .5 * ties T = M.T + M active = T > 0 w = M.sum(axis=1) Z = np.zeros_like(M, dtype=float) p = np.ones(M.shape[0]) p_new = p.copy() converged, iterations = False, 0 while not converged: iterations += 1 P = np.broadcast_to(p, M.shape) Z[active] = T[active] / (P[active] + P.T[active]) p_new[:] = w p_new /= Z.sum(axis=0) p_new /= p_new.sum() converged = bool(np.linalg.norm(p_new - p) < tolerance) or (iterations >= limit) p[:] = p_new return p def centrality(algorithm: Callable[[nx.DiGraph], Dict[int, float]], wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64]) -> npt.NDArray[np.float64]: A = wins + .5 * ties G = nx.from_numpy_array(A, create_using=nx.DiGraph) scores: Dict[int, float] = algorithm(G) p = np.array([scores[i] for i in range(len(G))]) return p def counting(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64], seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]: M = wins + .5 * ties return cast(npt.NDArray[np.float64], M.sum(axis=1)) def eigen(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64], seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]: algorithm = partial(nx.algorithms.eigenvector_centrality_numpy, max_iter=limit, tol=tolerance) return centrality(algorithm, wins, ties) def pagerank(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64], seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]: algorithm = partial(nx.algorithms.pagerank, max_iter=limit, tol=tolerance) return centrality(algorithm, wins, ties) # https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-newman-py def newman(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64], seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]: rng = np.random.default_rng(seed) pi, v = rng.random(wins.shape[0]), rng.random() converged, iterations = False, 0 while not converged: iterations += 1 v_numerator = np.sum( ties * (pi[:, np.newaxis] + pi) / (pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)) ) / 2 v_denominator = np.sum( wins * 2 * np.sqrt(pi[:, np.newaxis] * pi) / (pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)) ) v = v_numerator / v_denominator v = np.nan_to_num(v, copy=False, nan=tolerance) pi_old = pi.copy() pi_numerator = np.sum( (wins + ties / 2) * (pi + v * np.sqrt(pi[:, np.newaxis] * pi)) / (pi[:, np.newaxis] + pi + 2 + v * np.sqrt(pi[:, np.newaxis] * pi)), axis=1 ) pi_denominator = np.sum( (wins + ties / 2) * (1 + v * np.sqrt(pi[:, np.newaxis] * pi)) / (pi[:, np.newaxis] + pi + 2 + v * np.sqrt(pi[:, np.newaxis] * pi)), axis=0 ) pi = pi_numerator / pi_denominator pi = np.nan_to_num(pi, copy=False, nan=tolerance) converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1), rtol=tolerance, atol=tolerance) or (iterations >= limit) return pi ALGORITHMS = { 'Counting': counting, 'Bradley-Terry (1952)': bradley_terry, 'Eigenvector (1986)': eigen, 'PageRank (1998)': pagerank, 'Newman (2023)': newman, } def largest_strongly_connected_component(df: pd.DataFrame) -> Set[str]: G = nx.from_pandas_edgelist(df, source='left', target='right', create_using=nx.DiGraph) H = nx.from_pandas_edgelist(df[df['winner'] == 'tie'], source='right', target='left', create_using=nx.DiGraph) F = nx.compose(G, H) largest = max(nx.strongly_connected_components(F), key=len) return cast(Set[str], largest) def handler(file: IO[bytes], algorithm: str, filtered: bool, truncated: bool, seed: int) -> Tuple[pd.DataFrame, Figure]: if file is None: raise gr.Error('File must be uploaded') if algorithm not in ALGORITHMS: raise gr.Error(f'Unknown algorithm: {algorithm}') try: df = pd.read_csv(file.name, dtype=str) except ValueError as e: raise gr.Error(f'Parsing error: {e}') if not pd.Series(['left', 'right', 'winner']).isin(df.columns).all(): raise gr.Error('Columns must exist: left, right, winner') if not df['winner'].isin(pd.Series(['left', 'right', 'tie'])).all(): raise gr.Error('Allowed winner values: left, right, tie') df = df[['left', 'right', 'winner']] df.dropna(axis='rows', inplace=True) df.loc[df['winner'] == 'right', ['left', 'right']] = df.loc[df['winner'] == 'right', ['right', 'left']].values df.loc[df['winner'] == 'right', 'winner'] = 'left' if filtered: largest = largest_strongly_connected_component(df) df.drop(df[~(df['left'].isin(largest) & df['right'].isin(largest))].index, inplace=True) index = pd.Index(largest, name='item') else: index = pd.Index(np.unique(df[['left', 'right']].values), name='item') df_wins = pd.pivot_table(df[df['winner'] != 'tie'], index='left', columns='right', values='winner', aggfunc='count', fill_value=0) df_wins = df_wins.reindex(labels=index, columns=index, fill_value=0, copy=False) df_ties = pd.pivot_table(df[df['winner'] == 'tie'], index='left', columns='right', values='winner', aggfunc='count', fill_value=0) df_ties = df_ties.reindex(labels=index, columns=index, fill_value=0, copy=False) wins = df_wins.to_numpy(dtype=np.int64) ties = df_ties.to_numpy(dtype=np.int64) ties += ties.T assert wins.shape == ties.shape, 'wins and ties shapes are different' scores = ALGORITHMS[algorithm](wins, ties, seed=seed) df_result = pd.DataFrame(data={'score': scores}, index=index) df_result['pairs'] = pd.Series(0, dtype=int, index=index).add( df.groupby('left')['left'].count(), fill_value=0 ).add( df.groupby('right')['right'].count(), fill_value=0 ).astype(int) df_result['rank'] = df_result['score'].rank(na_option='bottom', ascending=False).astype(int) df_result.fillna(np.NINF, inplace=True) df_result.sort_values(by=['rank', 'score'], ascending=[True, False], inplace=True) df_result.reset_index(inplace=True) if truncated: df_result = pd.concat((df_result.head(5), df_result.tail(5)), copy=False) df_result = df_result[~df_result.index.duplicated(keep='last')] df_pairwise = pd.DataFrame(data=scores[:, np.newaxis] / (scores + scores[:, np.newaxis]), index=index, columns=index) df_pairwise = df_pairwise.reindex(labels=df_result['item'], columns=df_result['item'], copy=False) fig = visualize(df_pairwise) return df_result, fig def main() -> None: iface = gr.Interface( fn=handler, inputs=[ gr.File( file_types=['.tsv', '.csv'], label='Comparisons' ), gr.Dropdown( choices=cast(List[str], ALGORITHMS), value='Bradley-Terry (1952)', label='Algorithm' ), gr.Checkbox( value=False, label='Largest SCC', info='Bradley-Terry, Eigenvector, and Newman algorithms require the comparison graph ' 'to be strongly-connected. ' 'This option keeps only the largest strongly-connected component (SCC) of the input graph. ' 'Some items might be missing as a result of this filtering.' ), gr.Checkbox( value=False, label='Truncate Output', info='Perform the entire computation but output only five head and five tail items, ' 'avoiding overlap.' ), gr.Number( label='Seed', precision=0 ) ], outputs=[ gr.Dataframe( headers=['item', 'score', 'pairs', 'rank'], label='Ranking' ), gr.Plot( label='Pairwise Chances of Winning the Comparison' ) ], examples=[ ['food.csv', 'Counting', False, False], ['food.csv', 'Bradley-Terry (1952)', False, False], ['food.csv', 'Eigenvector (1986)', False, False], ['food.csv', 'PageRank (1998)', False, False], ['food.csv', 'Newman (2023)', False, False] ], title='Pair2Rank: Turn Your Side-by-Side Comparisons into Ranking!', description=''' This easy-to-use tool transforms pairwise comparisons (aka side-by-side) to a meaningful ranking of items. As an input, it expects a comma-separated (CSV) file with a header containing the following columns: - `left`: the first compared item - `right`: the second compared item - `winner`: the label indicating the winning item Possible values for `winner` are `left`, `right`, or `tie`. The provided examples might be a good starting point. As the output, this tool provides a table with items, their estimated scores, and ranks. ''', article=''' This tool attempts to implement the tie-aware ranking aggregation algorithm as described in [Efficient Computation of Rankings from Pairwise Comparisons](https://www.jmlr.org/papers/v24/22-1086.html). ''', allow_flagging='never' ) iface.launch() if __name__ == '__main__': main()