import math
import re
from typing import *

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from sklearn.linear_model import LogisticRegression

from modules.nav import Navbar


# page related utils
def default_page_setting(
    layout: Literal["wide", "centered"] = "centered",
):
    """Apply the page-wide Streamlit configuration.

    Sets the page title/layout, injects the CSS snippet, makes sure the
    ``"korean"`` flag exists in the session state (defaulting to ``False``)
    and returns an empty sidebar placeholder.

    Args:
        layout: Streamlit page layout, ``"wide"`` or ``"centered"``.

    Returns:
        The placeholder created with ``st.sidebar.empty()``.
    """
    st.set_page_config(page_title="VARCO Arena", layout=layout)
    sidebar_placeholder = st.sidebar.empty()

    # NOTE(review): the stylesheet body is blank in this copy of the file;
    # presumably the real CSS lives here -- confirm against the repository.
    css = f""" """
    st.markdown(css, unsafe_allow_html=True)

    if "korean" not in st.session_state:
        st.session_state["korean"] = False

    return sidebar_placeholder


# Function to update is_running and refresh only the sidebar
def set_nav_bar(
    is_running: bool,
    sidebar_placeholder=None,
    toggle_hashstr: Optional[str] = None,
):
    """Store ``is_running`` in the session state and re-render the navbar.

    Args:
        is_running: Value written to ``st.session_state["is_running"]``.
        sidebar_placeholder: Passed through to ``Navbar`` as its first argument.
        toggle_hashstr: Passed through to ``Navbar`` as ``toggle_hashstr``.
    """
    st.session_state["is_running"] = is_running
    # Refresh only the sidebar content
    Navbar(sidebar_placeholder, toggle_hashstr=toggle_hashstr)


def set_prompt_preview(did_select_prompt: bool, expander_placeholder=None):
    """Record in the session state whether a prompt has been selected.

    ``expander_placeholder`` is accepted for interface compatibility but is
    not used by this function.
    """
    st.session_state["did_select_prompt"] = did_select_prompt


def show_linebreak_in_md(text: str) -> str:
    """Turn plain newlines into Markdown hard line breaks.

    Non-string input (e.g. ``None`` or NaN) yields the literal ``"(Empty)"``.
    """
    if not isinstance(text, str):
        return "(Empty)"
    # Markdown only renders a hard break when the line ends with at least
    # two spaces; a single trailing space is ignored by the renderer.
    return text.replace("\n", "  \n")


def escape_markdown(
    text: str, version: int = 2, entity_type: Optional[str] = None
) -> str:
    """
    Helper function to escape telegram markup symbols.

    Args:
        text (:obj:`str`): The text.
        version (:obj:`int` | :obj:`str`): Use to specify the version of telegrams Markdown.
            Either ``1`` or ``2``. Defaults to ``2``.
        entity_type (:obj:`str`, optional): For the entity types ``PRE``, ``CODE`` and the link
            part of ``TEXT_LINKS``, only certain characters need to be escaped in ``MarkdownV2``.
            See the official API documentation for details. Only valid in combination with
            ``version=2``, will be ignored else.

    Returns:
        ``text`` with every character of the selected escape set prefixed by
        a backslash.

    Raises:
        ValueError: If ``version`` is neither 1 nor 2.
    """
    markdown_version = int(version)
    if markdown_version == 1:
        escape_chars = r"_*`["
    elif markdown_version == 2:
        if entity_type in ["pre", "code"]:
            escape_chars = r"\`"
        elif entity_type == "text_link":
            escape_chars = r"\)"
        else:
            escape_chars = r"_*[]()~`>#+-=|{}.!:"
    else:
        raise ValueError("Markdown version must be either 1 or 2!")

    return re.sub(f"([{re.escape(escape_chars)}])", r"\\\1", text)


# Elo result related computes
def compute_relative_winrate_to_1st(elo_df, float_pts: int = 3):
    """
    Post-processing utility for saving elo table to an excel file.
    Possibly work as a absolute measure for quality.

    Adds a ``winrate_vs_1st`` column holding each model's expected win rate
    against the highest-rated model. The frame is modified in place and the
    same object is returned.

    Args:
        elo_df: DataFrame with the columns ``Model`` and ``Elo rating``.
        float_pts: Currently unused; kept for backward compatibility.

    Returns:
        ``elo_df`` with the extra ``winrate_vs_1st`` column.
    """
    from functools import partial

    rating1st = elo_df["Elo rating"].max()
    win_rate_to_1st = partial(elo_to_winrate, rating_b=rating1st)
    elo_df["winrate_vs_1st"] = elo_df["Elo rating"].apply(win_rate_to_1st)

    return elo_df


def elo_to_winrate(rating_a: float = None, rating_b: float = None) -> float:
    """Return P(A beats B) for two Elo ratings (base 10, scale 400)."""
    # compute P(A wins B) from ratings
    rate_diff = rating_a - rating_b
    win_rate = 1 / (1 + 10 ** (-rate_diff / 400))
    return win_rate


def compute_mle_elo(df, SCALE=400, BASE=10, INIT_RATING=1000):
    """Fit Elo ratings to battle results with a logistic regression.

    Args:
        df: DataFrame (or list of records) with the columns ``model_a``,
            ``model_b`` and ``winner``. Rows with a missing value in any of
            those columns are dropped. Only ``winner == "A"`` counts as a
            win for model A; every other value is treated as class 0.
        SCALE: Elo scale factor applied to the regression coefficients.
        BASE: Logarithm base used for the design matrix entries.
        INIT_RATING: Offset added to every scaled coefficient.

    Returns:
        DataFrame with the columns ``Model`` and ``Elo rating`` (rounded to
        two decimals), sorted by rating in descending order and indexed
        from 1.
    """
    if isinstance(df, list):
        df = pd.DataFrame(df)
    df = df.dropna(subset=["winner", "model_a", "model_b"])  # dropping None vs sth

    # Map every model name to a column position in the design matrix.
    models = pd.concat([df["model_a"], df["model_b"]]).unique()
    models = pd.Series(np.arange(len(models)), index=models)

    # duplicate battles
    df = pd.concat([df, df], ignore_index=True)
    p = len(models.index)
    n = df.shape[0]

    X = np.zeros([n, p])
    rows = np.arange(n)
    X[rows, models.loc[df["model_a"]].to_numpy()] = +math.log(BASE)
    X[rows, models.loc[df["model_b"]].to_numpy()] = -math.log(BASE)

    # one A win => two A win
    Y = np.zeros(n)
    Y[df["winner"] == "A"] = 1.0

    WARNING = (
        "{L} compute_mle_elo() // Warning: Seeing this message indicates the "
        "regression result for elo is unreliable. You should be test-running "
        "the Varco Arena or something odd (perfect one-sided wins) is "
        "happening\n\nto avoid logistic regressor error, manually putting "
        "other class"
    )
    # The regressor needs both classes present; flip one label when every
    # battle ended the same way.
    if (Y == 0).all():
        print(WARNING.format(L=32))
        Y[-1] = 1.0
    elif (Y == 1.0).all():
        print(WARNING.format(L=35))
        Y[-1] = 0.0

    lr = LogisticRegression(fit_intercept=False)
    lr.fit(X, Y)

    elo_scores = SCALE * lr.coef_[0] + INIT_RATING
    elo_scores = pd.Series(elo_scores, index=models.index).sort_values(ascending=False)

    df = (
        pd.DataFrame(
            [[name, round(score, 2)] for name, score in elo_scores.items()],
            columns=["Model", "Elo rating"],
        )
        .sort_values("Elo rating", ascending=False)
        .reset_index(drop=True)
    )
    df.index = df.index + 1  # ranks start at 1

    return df


def fill_missing_values(df, default_value=0):
    """
    This is used for completing pivot table

    Makes ``df`` square over the union of its index and column labels:
    missing rows and columns are added and filled with ``default_value``,
    then both axes are sorted. ``df`` is modified in place and returned.
    """
    # Labels that are currently present on each axis.
    existing_index = set(df.index)
    existing_columns = set(df.columns)

    # Every label that should be present on both axes.
    all_labels = set(df.index.union(df.columns))

    # Labels that have to be added to each axis.
    missing_index = all_labels - existing_index
    missing_columns = all_labels - existing_columns

    # Add the missing rows, filled with the default value.
    for idx in missing_index:
        df.loc[idx] = default_value

    # Add the missing columns, filled with the default value.
    for col in missing_columns:
        df[col] = default_value

    # Sort the index and the columns again.
    df.sort_index(axis=0, inplace=True)
    df.sort_index(axis=1, inplace=True)

    return df


def _square_pair_counts(battles, models):
    """Count battles per (model_a, model_b) pair on a fixed square model grid."""
    counts = pd.pivot_table(
        battles,
        index="model_a",
        columns="model_b",
        aggfunc="size",
        fill_value=0,
    )
    # Use identical labels on both axes so that adding the table to its
    # transpose (or to another table) never produces NaN through alignment.
    return counts.reindex(index=models, columns=models, fill_value=0)


def _plot_length_bias(results, judgename: Optional[str] = None, ratio: bool = True):
    """Build a violin plot of how word counts relate to winning.

    Args:
        results: Match records (DataFrame or dict convertible to one) with
            the columns ``winner``, ``generated_a`` and ``generated_b``.
        judgename: Name shown in the plot title.
        ratio: If True, plot the winner/loser word-count ratio per match.
            Otherwise plot the winner count, the loser count and their
            ratio as three categories.

    Returns:
        Tuple ``(fig, plot_df)`` of the plotly figure and the frame it was
        drawn from.

    Note:
        With ``ratio=True`` the columns ``ratio`` and ``category`` are added
        to ``results`` itself when a DataFrame is passed in.
    """
    if not isinstance(results, pd.DataFrame):
        results = pd.DataFrame.from_dict(results)

    if ratio:

        def _win_to_loss_wc_ratio(row):
            # Best effort: rows with missing text or an empty loser output
            # yield None instead of aborting the whole plot.
            try:
                if row.winner == "A":
                    return len(row.generated_a.split()) / len(row.generated_b.split())
                return len(row.generated_b.split()) / len(row.generated_a.split())
            except Exception:
                return None

        df = results
        df["ratio"] = df.apply(_win_to_loss_wc_ratio, axis=1)
        df["category"] = "win/loss wc ratio"

        # Create the violin plot
        plot_df = df.drop(
            columns=[col for col in df if col not in ["category", "ratio"]]
        )
        fig = px.violin(
            plot_df,
            x="category",
            y="ratio",
            title=f"Length bias ({judgename})",
        )
    else:
        data = []
        for _, row in results.iterrows():
            len_a = len(row.generated_a.split())
            len_b = len(row.generated_b.split())
            won, lost = (len_a, len_b) if row["winner"] == "A" else (len_b, len_a)
            data.append({"category": "won", "wordcounts": won})
            data.append({"category": "lost", "wordcounts": lost})
            data.append(
                {
                    "category": "won/lost ratio",
                    # An empty losing output has no defined ratio.
                    "wordcounts": won / lost if lost else None,
                }
            )

        plot_df = pd.DataFrame(data)

        # Create the violin plot
        fig = px.violin(
            plot_df,
            x="category",
            y="wordcounts",
            title=f"Length bias ({judgename})",
            # The key must match the plotted column name to take effect.
            labels={"category": "outcome", "wordcounts": "wordcount"},
        )

    return fig, plot_df


def visualization(results, is_overall=False):
    """
    Build every figure and table shown on the result page.

    Originally taken from varco_arena/, but it has been updated heavily
    since then -- be careful when syncing the two.

    Args:
        results: Match records (DataFrame or dict convertible to one) with
            at least the columns ``model_a``, ``model_b``, ``winner``,
            ``generated_a``, ``generated_b`` and ``evaluation_model``
            (plus ``task`` when ``is_overall`` is True).
        is_overall: If True, also build the per-task Elo radar chart.

    Returns:
        Dict mapping figure/table names to plotly figures or DataFrames,
        plus the judge name under ``"judgename"``.
    """
    if not isinstance(results, pd.DataFrame):
        results = pd.DataFrame.from_dict(results)

    figure_dict = {}
    judgename = results.iloc[0]["evaluation_model"]

    # judge bias of length
    fig, plot_df = _plot_length_bias(results, judgename=judgename)
    figure_dict["length_bias"] = fig
    figure_dict["length_bias_df"] = plot_df

    # Judge bias of Position A/B
    fig = px.bar(
        results["winner"].value_counts(),
        title=f"Position A/B bias\n({judgename})",
        text_auto=True,
        height=400,
    )
    fig.update_layout(xaxis_title="Match Winner", yaxis_title="Count", showlegend=False)
    figure_dict["counts_of_match_winners"] = fig

    # Num. matches of each model
    fig = px.bar(
        pd.concat([results["model_a"], results["model_b"]]).value_counts(),
        title="Match Count per Model",
        text_auto=True,
    )
    fig.update_layout(
        xaxis_title="Model", yaxis_title="Match Count", height=400, showlegend=False
    )
    figure_dict["match_count_for_each_model"] = fig

    # All pair tables share this label set so they align cell by cell.
    all_models = pd.Index(
        pd.concat([results["model_a"], results["model_b"]]).dropna().unique()
    )

    # Num. matches matrix (model v. model)
    pair_counts = _square_pair_counts(results, all_models)
    match_counts = pair_counts + pair_counts.T
    ordering = match_counts.sum().sort_values(ascending=False).index
    fig = px.imshow(
        match_counts.loc[ordering, ordering],
        title="Number of Matches (model vs. model)",
        text_auto=True,
    )
    fig.update_layout(
        xaxis_title="Model B",
        yaxis_title="Model A",
        xaxis_side="top",
        height=800,
        width=800,
        title_xanchor="left",
        title_yanchor="top",
        font=dict(size=10),
    )
    fig.update_traces(
        hovertemplate="Model A: %{y}<br>Model B: %{x}<br>Count: %{z}"
    )
    figure_dict["match_count_of_each_combination_of_models"] = fig

    # Win rate matrix (model v. model)
    a_win_ptbl = _square_pair_counts(results[results["winner"] == "A"], all_models)
    b_win_ptbl = _square_pair_counts(results[results["winner"] == "B"], all_models)

    # Row model's wins (as A, or as B in the transposed table) over all of
    # its matches against the column model; pairs without matches are NaN.
    row_beats_col_freq = (a_win_ptbl + b_win_ptbl.T) / match_counts

    prop_wins = row_beats_col_freq.mean(axis=1).sort_values(ascending=False)
    model_names = list(prop_wins.keys())
    row_beats_col = row_beats_col_freq.loc[model_names, model_names]
    fig = px.imshow(
        row_beats_col,
        color_continuous_scale="RdBu",
        text_auto=".2f",
        title="P(A wins B)",
    )
    fig.update_layout(
        xaxis_title="Model B",
        yaxis_title="Model A",  # y axis = row = index
        title_xanchor="left",
        title_yanchor="top",
        xaxis_side="top",
        height=800,
        width=800,
    )
    fig.update_traces(
        hovertemplate="Model A: %{y}<br>Model B: %{x}<br>P(A wins B): %{z}"
    )
    figure_dict["fraction_of_model_a_wins_for_all_a_vs_b_matches"] = fig

    # Elo Rating
    elo = compute_mle_elo(results)
    elo_wr = compute_relative_winrate_to_1st(elo)
    # beautify
    elo_wr["Elo rating"] = elo_wr["Elo rating"].astype(int)
    elo_wr["winrate_vs_1st"] = elo_wr["winrate_vs_1st"].round(3)
    elo_wr.index.name = "Rank"
    figure_dict["elo_rating"] = elo_wr

    # Elo Rating by Task: Radar chart
    if is_overall:
        tasks = results["task"].unique().tolist()
        elo_by_task = pd.concat(
            [
                compute_mle_elo(results[results["task"] == task]).assign(task=task)
                for task in tasks
            ]
        )
        fig = px.line_polar(
            elo_by_task,
            r="Elo rating",
            theta="task",
            line_close=True,
            category_orders={"task": tasks},
            color="Model",
            markers=True,
            color_discrete_sequence=px.colors.qualitative.Pastel,
            title="Elo Rating by Task",
        )
        figure_dict["elo_rating_by_task"] = fig

    figure_dict["judgename"] = judgename
    return figure_dict