import math
import re
from typing import *

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from sklearn.linear_model import LogisticRegression

from modules.nav import Navbar


# Page related utils
def default_page_setting(
    layout: Literal["wide", "centered"] = "centered",
):
    st.set_page_config(page_title="VARCO Arena", layout=layout)
    sidebar_placeholder = st.sidebar.empty()

    css = """
    <style>
        .appview-container .main .block-container {
            padding-top: 32px;
        }
        [data-testid="stSidebarNav"] > ul {
            padding-top: 32px;
        }
    </style>
    """
    st.markdown(css, unsafe_allow_html=True)

    if "korean" not in st.session_state:
        st.session_state["korean"] = False
    return sidebar_placeholder


# Update is_running and refresh only the sidebar
def set_nav_bar(is_running: bool, sidebar_placeholder=None, toggle_hashstr: str = None):
    st.session_state["is_running"] = is_running
    # Refresh only the sidebar content
    Navbar(sidebar_placeholder, toggle_hashstr=toggle_hashstr)


def set_prompt_preview(did_select_prompt: bool, expander_placeholder=None):
    # `expander_placeholder` is accepted but currently unused here.
    st.session_state["did_select_prompt"] = did_select_prompt


def show_linebreak_in_md(text: str) -> str:
    # Markdown needs two trailing spaces before "\n" to render a hard line break.
    return text.replace("\n", "  \n") if isinstance(text, str) else "(Empty)"
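
# Example (illustrative, not part of the original module):
# show_linebreak_in_md("a\nb") -> "a  \nb", which Markdown renders as a hard
# line break; non-string inputs yield "(Empty)".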


def escape_markdown(text: str, version: int = 2, entity_type: str = None) -> str:
    """
    Helper function to escape telegram markup symbols.

    Args:
        text (:obj:`str`): The text.
        version (:obj:`int` | :obj:`str`): Use to specify the version of telegram's Markdown.
            Either ``1`` or ``2``. Defaults to ``2``.
        entity_type (:obj:`str`, optional): For the entity types ``PRE``, ``CODE`` and the link
            part of ``TEXT_LINKS``, only certain characters need to be escaped in ``MarkdownV2``.
            See the official API documentation for details. Only valid in combination with
            ``version=2``, will be ignored else.
    """
    if int(version) == 1:
        escape_chars = r"_*`["
    elif int(version) == 2:
        if entity_type in ["pre", "code"]:
            escape_chars = r"\`"
        elif entity_type == "text_link":
            escape_chars = r"\)"
        else:
            escape_chars = r"_*[]()~`>#+-=|{}.!:"
    else:
        raise ValueError("Markdown version must be either 1 or 2!")

    return re.sub(f"([{re.escape(escape_chars)}])", r"\\\1", text)
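
# Example (illustrative, not part of the original module): with the MarkdownV2
# defaults above, escape_markdown("a_b*c (d)") -> r"a\_b\*c \(d\)".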


# Elo result related computations
def compute_relative_winrate_to_1st(elo_df, float_pts: int = 3):
    """
    Post-processing utility for saving the elo table to an excel file.
    May serve as an absolute measure of quality.

    elo_df:
        columns: Model, Elo rating
    adds:
        column: winrate_vs_1st
    """
    from functools import partial

    rating1st = elo_df["Elo rating"].max()
    win_rate_to_1st = partial(elo_to_winrate, rating_b=rating1st)
    elo_df["winrate_vs_1st"] = elo_df["Elo rating"].apply(win_rate_to_1st)
    return elo_df
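
# Note (illustrative, not part of the original module): the rank-1 model is
# compared against its own rating, so its "winrate_vs_1st" is exactly 0.5;
# every other model lands below 0.5 (or at 0.5 if tied with the leader).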


def elo_to_winrate(rating_a: float = None, rating_b: float = None) -> float:
    # compute P(A wins B) from ratings
    rate_diff = rating_a - rating_b
    win_rate = 1 / (1 + 10 ** (-rate_diff / 400))
    return win_rate
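
# Worked example (illustrative, not part of the original module):
# elo_to_winrate(1100, 1000) = 1 / (1 + 10 ** (-100 / 400)) ≈ 0.640,
# i.e. a 100-point Elo gap corresponds to roughly a 64% expected win rate.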


def compute_mle_elo(df, SCALE=400, BASE=10, INIT_RATING=1000):
    if isinstance(df, list):
        df = pd.DataFrame(df)
    df = df.dropna(subset=["winner", "model_a", "model_b"])  # drop None-vs-something battles

    models = pd.concat([df["model_a"], df["model_b"]]).unique()
    models = pd.Series(np.arange(len(models)), index=models)

    # duplicate battles
    df = pd.concat([df, df], ignore_index=True)
    p = len(models.index)
    n = df.shape[0]

    # Design matrix: +log(BASE) for model_a, -log(BASE) for model_b in each battle
    X = np.zeros([n, p])
    X[np.arange(n), models[df["model_a"]]] = +math.log(BASE)
    X[np.arange(n), models[df["model_b"]]] = -math.log(BASE)

    # one A win => two A wins (battles are duplicated above)
    Y = np.zeros(n)
    Y[df["winner"] == "A"] = 1.0

    WARNING = (
        "elo.py:L{L} compute_mle_elo() // Warning: Seeing this message indicates that the "
        "regression result for elo is unreliable. You should be test-running Varco Arena, "
        "or something odd (perfect one-sided wins) is happening.\n\n"
        "To avoid a logistic regression error, manually adding the other class."
    )
    if (Y == 0).all():
        print(WARNING.format(L=32))
        Y[-1] = 1.0
    elif (Y == 1.0).all():
        print(WARNING.format(L=35))
        Y[-1] = 0.0

    lr = LogisticRegression(fit_intercept=False)
    lr.fit(X, Y)

    elo_scores = SCALE * lr.coef_[0] + INIT_RATING
    elo_scores = pd.Series(elo_scores, index=models.index).sort_values(ascending=False)

    df = (
        pd.DataFrame(
            [[model, round(elo_scores[model], 2)] for model in elo_scores.keys()],
            columns=["Model", "Elo rating"],
        )
        .sort_values("Elo rating", ascending=False)
        .reset_index(drop=True)
    )
    df.index = df.index + 1  # rank starts at 1
    return df
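
# Usage sketch (illustrative; the records below are made up): compute_mle_elo
# expects battle records with "model_a", "model_b", and "winner" ("A" or "B"), e.g.
#   compute_mle_elo([
#       {"model_a": "m1", "model_b": "m2", "winner": "A"},
#       {"model_a": "m2", "model_b": "m1", "winner": "B"},
#   ])
# returns a rank-indexed DataFrame with columns ["Model", "Elo rating"],
# centered around INIT_RATING=1000.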


def fill_missing_values(df, default_value=0):
    """
    Used for completing a pivot table (making it square over the union of its
    index and columns).
    """
    # Existing index and column labels.
    existing_index = set(df.index)
    existing_columns = set(df.columns)

    # All possible labels: the union of both axes.
    all_index = set(df.index.union(df.columns))
    all_columns = set(df.index.union(df.columns))

    # Labels missing from each axis.
    missing_index = all_index - existing_index
    missing_columns = all_columns - existing_columns

    # Add missing rows with the default value.
    for idx in missing_index:
        df.loc[idx] = default_value

    # Add missing columns with the default value.
    for col in missing_columns:
        df[col] = default_value

    # Re-sort the index and columns.
    df.sort_index(axis=0, inplace=True)
    df.sort_index(axis=1, inplace=True)
    return df
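
# Example (illustrative, not part of the original module): a pivot table with
# index ["m1"] and columns ["m2"] comes back as a 2x2 table over ["m1", "m2"]
# on both axes, with the added row and column filled with `default_value`.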


def _plot_length_bias(results, judgename: str = None, ratio: bool = True):
    if not isinstance(results, pd.DataFrame):
        results = pd.DataFrame.from_dict(results)

    if ratio:

        def _win_to_loss_wc_ratio(row):
            # Word-count ratio of the winning output to the losing output.
            try:
                if row.winner == "A":
                    ratio = len(row.generated_a.split()) / len(row.generated_b.split())
                else:
                    ratio = len(row.generated_b.split()) / len(row.generated_a.split())
            except Exception:
                ratio = None
            return ratio

        df = results
        df["ratio"] = df.apply(_win_to_loss_wc_ratio, axis=1)
        df["category"] = "win/loss wc ratio"

        # Create the violin plot
        plot_df = df.drop(
            columns=[col for col in df if col not in ["category", "ratio"]]
        )
        fig = px.violin(
            plot_df,
            x="category",
            y="ratio",
            # log_y=True,
            title=f"Length bias ({judgename})",
            # labels={"category": "win/loss wc ratio", "ratio": "ratio"},
        )
    else:
        data = []
        for _, row in results.iterrows():
            data.append(
                {
                    "category": "won",
                    "wordcounts": len(row.generated_a.split())
                    if row["winner"] == "A"
                    else len(row.generated_b.split()),
                }
            )
            data.append(
                {
                    "category": "lost",
                    "wordcounts": len(row.generated_b.split())
                    if row["winner"] == "A"
                    else len(row.generated_a.split()),
                }
            )
            data.append(
                {
                    "category": "won/lost ratio",
                    "wordcounts": len(row.generated_a.split())
                    / len(row.generated_b.split())  # a won
                    if row["winner"] == "A"
                    else len(row.generated_b.split())
                    / len(row.generated_a.split()),  # b won
                }
            )
        plot_df = pd.DataFrame(data)

        # Create the violin plot
        fig = px.violin(
            plot_df,
            x="category",
            y="wordcounts",
            # log_y=True,
            title=f"Length bias ({judgename})",
            labels={"category": "outcome", "wordcounts": "word count"},
        )
    return fig, plot_df
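
# Note (illustrative, not part of the original module): `results` is expected to
# carry "winner" ("A"/"B"), "generated_a", and "generated_b" columns; with
# ratio=True the violin shows the word-count ratio of each winning output to
# its losing counterpart.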


def visualization(results, is_overall=False):
    """
    Originally taken from varco_arena/visualization.py, but it has been updated
    a lot since then, so be careful!
    """
    if not isinstance(results, pd.DataFrame):
        results = pd.DataFrame.from_dict(results)

    figure_dict = {}
    judgename = results.iloc[0]["evaluation_model"]

    # Judge bias: length
    fig, plot_df = _plot_length_bias(results, judgename=judgename)
    figure_dict["length_bias"] = fig
    figure_dict["length_bias_df"] = plot_df

    # Judge bias: position A/B
    fig = px.bar(
        results["winner"].value_counts(),
        title=f"Position A/B bias\n({judgename})",
        text_auto=True,
        height=400,
    )
    fig.update_layout(xaxis_title="Match Winner", yaxis_title="Count", showlegend=False)
    figure_dict["counts_of_match_winners"] = fig

    # Num. matches of each model
    fig = px.bar(
        pd.concat([results["model_a"], results["model_b"]]).value_counts(),
        title="Match Count per Model",
        text_auto=True,
    )
    fig.update_layout(
        xaxis_title="Model", yaxis_title="Match Count", height=400, showlegend=False
    )
    figure_dict["match_count_for_each_model"] = fig

    # Num. matches matrix (model vs. model)
    ptbl = pd.pivot_table(
        results,
        index="model_a",
        columns="model_b",
        aggfunc="size",
        fill_value=0,
    )
    match_counts = ptbl + ptbl.T
    ordering = match_counts.sum().sort_values(ascending=False).index
    fig = px.imshow(
        match_counts.loc[ordering, ordering],
        title="Number of Matches (model vs. model)",
        text_auto=True,
    )
    fig.update_layout(
        xaxis_title="Model B",
        yaxis_title="Model A",
        xaxis_side="top",
        height=800,
        width=800,
        title_xanchor="left",
        title_yanchor="top",
        font=dict(size=10),
    )
    fig.update_traces(
        hovertemplate="Model A: %{y}<br>Model B: %{x}<br>Count: %{z}<extra></extra>"
    )
    figure_dict["match_count_of_each_combination_of_models"] = fig

    # Win rate matrix (model vs. model): wins as position A plus wins as position B,
    # divided by the total number of matches for each pair
    a_win_ptbl = pd.pivot_table(
        results[results["winner"] == "A"],
        index="model_a",
        columns="model_b",
        aggfunc="size",
        fill_value=0,
    )
    a_win_ptbl = fill_missing_values(a_win_ptbl)
    b_win_ptbl = pd.pivot_table(
        results[results["winner"] == "B"],
        index="model_a",
        columns="model_b",
        aggfunc="size",
        fill_value=0,
    )
    b_win_ptbl = fill_missing_values(b_win_ptbl)
    num_results_ptbl = pd.pivot_table(
        results, index="model_a", columns="model_b", aggfunc="size", fill_value=0
    )
    row_beats_col_freq = (a_win_ptbl + b_win_ptbl.T) / (
        num_results_ptbl + num_results_ptbl.T
    )
    prop_wins = row_beats_col_freq.mean(axis=1).sort_values(ascending=False)
    model_names = list(prop_wins.keys())
    row_beats_col = row_beats_col_freq.loc[model_names, model_names]
    fig = px.imshow(
        row_beats_col,
        color_continuous_scale="RdBu",
        text_auto=".2f",
        title="P(A wins B)",
    )
    fig.update_layout(
        xaxis_title="Model B",
        yaxis_title="Model A",  # y axis = row = index
        title_xanchor="left",
        title_yanchor="top",
        xaxis_side="top",
        height=800,
        width=800,
    )
    fig.update_traces(
        hovertemplate="Model A: %{y}<br>Model B: %{x}<br>P(A wins B): %{z}<extra></extra>"
    )
    figure_dict["fraction_of_model_a_wins_for_all_a_vs_b_matches"] = fig

    # Elo rating
    elo = compute_mle_elo(results)
    elo_wr = compute_relative_winrate_to_1st(elo)
    # beautify
    elo_wr["Elo rating"] = elo_wr["Elo rating"].astype(int)
    elo_wr["winrate_vs_1st"] = elo_wr["winrate_vs_1st"].round(3)
    elo_wr.index.name = "Rank"
    figure_dict["elo_rating"] = elo_wr

    # Elo rating by task: radar chart
    if is_overall:
        tasks = results["task"].unique().tolist()
        elo_by_task = pd.concat(
            [
                compute_mle_elo(results[results["task"] == task]).assign(task=task)
                for task in tasks
            ]
        )
        fig = px.line_polar(
            elo_by_task,
            r="Elo rating",
            theta="task",
            line_close=True,
            category_orders={"task": tasks},
            color="Model",
            markers=True,
            color_discrete_sequence=px.colors.qualitative.Pastel,
            title="Elo Rating by Task",
        )
        figure_dict["elo_rating_by_task"] = fig

    figure_dict["judgename"] = judgename
    return figure_dict
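

if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the original module): the battle
    # records below are synthetic, and running this file directly assumes the
    # repo layout where `modules.nav` is importable.
    _demo_battles = pd.DataFrame(
        [
            {
                "model_a": a,
                "model_b": b,
                "winner": w,
                "generated_a": "alpha beta gamma",
                "generated_b": "delta epsilon",
                "evaluation_model": "demo-judge",
                "task": "demo",
            }
            for a, b, w in [
                ("m1", "m2", "A"),
                ("m2", "m1", "B"),
                ("m1", "m3", "A"),
                ("m3", "m1", "B"),
                ("m2", "m3", "A"),
                ("m3", "m2", "B"),
            ]
        ]
    )
    _figs = visualization(_demo_battles, is_overall=False)
    print(_figs["elo_rating"])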