import math
import re
from typing import *
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from sklearn.linear_model import LogisticRegression
from modules.nav import Navbar
# page related utils
def default_page_setting(
    layout: Literal["wide", "centered"] = "centered",
):
    """Apply the Streamlit page configuration shared by the app's pages.

    Sets the page title and layout, reserves an empty slot in the sidebar,
    renders the CSS snippet through ``st.markdown`` and makes sure the
    ``korean`` flag exists in the session state.

    Args:
        layout: Streamlit page layout, either ``"wide"`` or ``"centered"``.

    Returns:
        The placeholder created with ``st.sidebar.empty()``.
    """
    st.set_page_config(page_title="VARCO Arena", layout=layout)
    placeholder = st.sidebar.empty()

    # Hook for custom CSS; the snippet currently holds nothing but a newline.
    css = f"""
"""
    st.markdown(css, unsafe_allow_html=True)

    # Only seed the language flag when the session does not have one yet.
    if "korean" not in st.session_state:
        st.session_state["korean"] = False

    return placeholder
# Function to update is_running and refresh only the sidebar
def set_nav_bar(is_running: bool, sidebar_placeholder=None, toggle_hashstr: str = None):
    """Store the running flag in the session and redraw the navigation bar.

    Args:
        is_running: Value written to ``st.session_state["is_running"]``.
        sidebar_placeholder: Passed to ``Navbar`` as its first argument.
        toggle_hashstr: Forwarded to ``Navbar`` as ``toggle_hashstr``.
    """
    st.session_state["is_running"] = is_running

    # Rebuild the sidebar content through the Navbar component.
    Navbar(sidebar_placeholder, toggle_hashstr=toggle_hashstr)
def set_prompt_preview(did_select_prompt: bool, expander_placeholder=None):
    """Remember in the session state whether a prompt has been selected.

    Args:
        did_select_prompt: Value written to
            ``st.session_state["did_select_prompt"]``.
        expander_placeholder: Accepted for the callers' convenience; this
            function does not use it.
    """
    st.session_state["did_select_prompt"] = did_select_prompt
def show_linebreak_in_md(text: str) -> str:
    """Make the newlines of ``text`` visible when it is rendered as Markdown.

    Markdown treats a bare newline as a soft break (rendered as a space).
    A hard line break requires the line to end with two or more spaces, so
    every ``"\\n"`` is replaced by two spaces followed by ``"\\n"``.

    Args:
        text: The text to prepare for Markdown rendering.

    Returns:
        The converted text, or the literal ``"(Empty)"`` when ``text`` is not
        a string (for example ``None`` or NaN).
    """
    if not isinstance(text, str):
        return "(Empty)"
    # Two trailing spaces are the minimum for a Markdown hard line break.
    return text.replace("\n", "  \n")
def escape_markdown(text: str, version: int = 2, entity_type: str = None) -> str:
    """Backslash-escape Telegram Markdown markup characters in ``text``.

    Args:
        text: The text to escape.
        version: Telegram Markdown version, ``1`` or ``2`` (an int or anything
            ``int()`` accepts). Defaults to ``2``.
        entity_type: Only looked at for version 2. ``"pre"`` and ``"code"``
            escape just the backslash and the backtick, ``"text_link"``
            escapes just the backslash and ``)``; any other value escapes the
            full version-2 character set.

    Returns:
        ``text`` with every character of the selected set prefixed by a
        backslash.

    Raises:
        ValueError: If ``version`` is neither 1 nor 2.
    """
    markdown_version = int(version)
    if markdown_version not in (1, 2):
        raise ValueError("Markdown version must be either 1 or 2!")

    if markdown_version == 1:
        special = r"_*`["
    elif entity_type in ("pre", "code"):
        special = r"\`"
    elif entity_type == "text_link":
        special = r"\)"
    else:
        special = r"_*[]()~`>#+-=|{}.!:"

    # One character class holding every special character; the replacement
    # puts a backslash in front of whichever character matched.
    pattern = f"([{re.escape(special)}])"
    return re.sub(pattern, r"\\\1", text)
# Elo result related computes
def compute_relative_winrate_to_1st(elo_df, float_pts: int = 3):
    """Add each model's expected win rate against the top-rated model.

    Post-processing helper for the Elo table (for example before saving it
    to an Excel file).

    Args:
        elo_df: DataFrame with an ``"Elo rating"`` column. It is modified in
            place: a ``"winrate_vs_1st"`` column is added.
        float_pts: Accepted for compatibility; this function does not use it.

    Returns:
        The same ``elo_df`` object, now carrying ``"winrate_vs_1st"``.
    """
    top_rating = elo_df["Elo rating"].max()
    # The highest-rated model is always the opponent (rating_b).
    elo_df["winrate_vs_1st"] = elo_df["Elo rating"].apply(
        lambda rating: elo_to_winrate(rating, rating_b=top_rating)
    )
    return elo_df
def elo_to_winrate(rating_a: float = None, rating_b: float = None) -> float:
    """Return P(A beats B) implied by two Elo ratings.

    Computes ``1 / (1 + 10 ** (-(rating_a - rating_b) / 400))``.

    Args:
        rating_a: Rating of player A.
        rating_b: Rating of player B.

    Returns:
        The win probability of A over B.
    """
    return 1 / (1 + 10 ** (-(rating_a - rating_b) / 400))
def compute_mle_elo(df, SCALE=400, BASE=10, INIT_RATING=1000):
    """Fit Elo-style ratings to pairwise match results with logistic regression.

    Args:
        df: Match records as a DataFrame (or a list convertible to one) with
            ``winner``, ``model_a`` and ``model_b`` columns. Rows missing any
            of the three are dropped. A match counts as a win for A when
            ``winner == "A"``; every other value is labelled 0.
        SCALE: Multiplier applied to the fitted coefficients.
        BASE: Base whose natural log is used as the feature magnitude.
        INIT_RATING: Offset added to every scaled coefficient.

    Returns:
        DataFrame with columns ``"Model"`` and ``"Elo rating"`` (rounded to
        two decimals), sorted by rating in descending order and indexed from
        1.
    """
    battles = pd.DataFrame(df) if isinstance(df, list) else df
    # Drop matches where a participant or the verdict is missing.
    battles = battles.dropna(subset=["winner", "model_a", "model_b"])

    # Map each model name to a column position of the design matrix.
    names = pd.concat([battles["model_a"], battles["model_b"]]).unique()
    model_index = pd.Series(np.arange(len(names)), index=names)

    # Every battle is entered twice, so one A win is counted as two A wins.
    battles = pd.concat([battles, battles], ignore_index=True)
    num_models = len(model_index.index)
    num_battles = battles.shape[0]

    # One row per battle: +log(BASE) for model_a, -log(BASE) for model_b.
    rows = np.arange(num_battles)
    X = np.zeros([num_battles, num_models])
    X[rows, model_index[battles["model_a"]]] = +math.log(BASE)
    X[rows, model_index[battles["model_b"]]] = -math.log(BASE)

    Y = np.zeros(num_battles)
    Y[battles["winner"] == "A"] = 1.0

    WARNING = "elo.py:L{L} compute_mle_elo() // Warning: Seeing this message indicates the regression result for elo is unreliable. You should be test-running the Varco Arena or something odd (perfect one-sided wins) is happening\n\nto avoid logistic regressor error, manually putting other class"
    # The regressor needs both classes, so flip the last label when every
    # outcome is identical.
    if (Y == 0).all():
        print(WARNING.format(L=32))
        Y[-1] = 1.0
    elif (Y == 1.0).all():
        print(WARNING.format(L=35))
        Y[-1] = 0.0

    regressor = LogisticRegression(fit_intercept=False)
    regressor.fit(X, Y)

    ratings = pd.Series(
        SCALE * regressor.coef_[0] + INIT_RATING, index=model_index.index
    ).sort_values(ascending=False)

    table = pd.DataFrame(
        [[name, round(ratings[name], 2)] for name in ratings.keys()],
        columns=["Model", "Elo rating"],
    )
    table = table.sort_values("Elo rating", ascending=False).reset_index(drop=True)
    # Ranks start at 1 instead of 0.
    table.index = table.index + 1
    return table
def fill_missing_values(df, default_value=0):
    """Make a pivot table square over the union of its row and column labels.

    Every label that appears on only one axis is added to the other axis
    with ``default_value``, then both axes are sorted. ``df`` is modified in
    place.

    Args:
        df: The pivot table to complete.
        default_value: Value used for the added rows and columns.

    Returns:
        The same ``df`` object, with identical sorted labels on both axes.
    """
    # Every label that should exist on both axes.
    every_label = set(df.index.union(df.columns))
    absent_rows = every_label - set(df.index)
    absent_cols = every_label - set(df.columns)

    # Add the missing rows first, then the missing columns.
    for label in absent_rows:
        df.loc[label] = default_value
    for label in absent_cols:
        df[label] = default_value

    # Sort rows (axis 0) and then columns (axis 1).
    for axis in (0, 1):
        df.sort_index(axis=axis, inplace=True)
    return df
def _word_count(text):
    """Count whitespace-separated words; return None when ``text`` is not a string."""
    return len(text.split()) if isinstance(text, str) else None


def _winner_loser_word_counts(row):
    """Return ``(winner_words, loser_words)`` for one match row.

    Any ``winner`` value other than ``"A"`` is treated as a win for B. A
    count is None when the corresponding generation is missing or not a
    string.
    """
    words_a = _word_count(row.get("generated_a"))
    words_b = _word_count(row.get("generated_b"))
    if row.get("winner") == "A":
        return words_a, words_b
    return words_b, words_a


def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or None when it cannot be computed."""
    # A missing count or an empty (zero-word) loser has no meaningful ratio.
    if numerator is None or not denominator:
        return None
    return numerator / denominator


def _plot_length_bias(results, judgename: str = None, ratio: bool = True):
    """Build a violin plot showing how response length relates to winning.

    Args:
        results: Match records as a DataFrame (or something
            ``pd.DataFrame.from_dict`` accepts) with ``winner``,
            ``generated_a`` and ``generated_b`` columns. It is not modified.
        judgename: Name shown in the plot title.
        ratio: When True, plot one winner/loser word-count ratio per match.
            When False, plot the winner word count, the loser word count and
            their ratio as three separate categories.

    Returns:
        Tuple ``(fig, plot_df)``: the Plotly figure and the DataFrame it was
        drawn from. Matches whose ratio cannot be computed (missing or
        non-string text, zero-word loser) hold a missing value instead of
        raising.
    """
    if not isinstance(results, pd.DataFrame):
        results = pd.DataFrame.from_dict(results)

    word_counts = [_winner_loser_word_counts(row) for _, row in results.iterrows()]

    if ratio:
        # Built as a fresh frame so the caller's DataFrame gains no columns.
        plot_df = pd.DataFrame(
            {
                "ratio": [_safe_ratio(won, lost) for won, lost in word_counts],
                "category": "win/loss wc ratio",
            },
            index=results.index,
        )
        fig = px.violin(
            plot_df,
            x="category",
            y="ratio",
            title=f"Length bias ({judgename})",
        )
    else:
        data = []
        for won, lost in word_counts:
            data.append({"category": "won", "wordcounts": won})
            data.append({"category": "lost", "wordcounts": lost})
            data.append(
                {"category": "won/lost ratio", "wordcounts": _safe_ratio(won, lost)}
            )
        plot_df = pd.DataFrame(data, columns=["category", "wordcounts"])
        fig = px.violin(
            plot_df,
            x="category",
            y="wordcounts",
            title=f"Length bias ({judgename})",
            # The label key must match the plotted column name to take effect.
            labels={"category": "outcome", "wordcounts": "wordcount"},
        )
    return fig, plot_df
def _pairwise_match_counts(battles, models):
    """Count matches per (model_a, model_b) pair as a square table over ``models``.

    Pairs that never occur are filled with 0, so tables built from different
    subsets of the matches share the same labels and can be added or
    transposed without producing NaN from label mismatch.
    """
    if battles.empty:
        return pd.DataFrame(0, index=models, columns=models)
    counts = pd.pivot_table(
        battles,
        index="model_a",
        columns="model_b",
        aggfunc="size",
        fill_value=0,
    )
    return counts.reindex(index=models, columns=models, fill_value=0)


def visualization(results, is_overall=False):
    """Build the figures and tables summarizing a set of match results.

    Taken from varco_arena/visualization.py, but it has been updated heavily
    since then, so be careful when syncing the two.

    Args:
        results: Match records as a DataFrame (or something
            ``pd.DataFrame.from_dict`` accepts) with ``evaluation_model``,
            ``winner``, ``model_a``, ``model_b``, ``generated_a`` and
            ``generated_b`` columns, plus ``task`` when ``is_overall`` is
            True.
        is_overall: When True, also build the per-task Elo radar chart.

    Returns:
        Dict with the keys ``length_bias``, ``length_bias_df``,
        ``counts_of_match_winners``, ``match_count_for_each_model``,
        ``match_count_of_each_combination_of_models``,
        ``fraction_of_model_a_wins_for_all_a_vs_b_matches``, ``elo_rating``,
        ``judgename`` and, if ``is_overall``, ``elo_rating_by_task``.
    """
    if not isinstance(results, pd.DataFrame):
        results = pd.DataFrame.from_dict(results)

    figure_dict = {}
    # The judge name is read from the first record.
    judgename = results.iloc[0]["evaluation_model"]

    # judge bias of length
    fig, plot_df = _plot_length_bias(results, judgename=judgename)
    figure_dict["length_bias"] = fig
    figure_dict["length_bias_df"] = plot_df

    # Judge bias of Position A/B
    fig = px.bar(
        results["winner"].value_counts(),
        title=f"Position A/B bias\n({judgename})",
        text_auto=True,
        height=400,
    )
    fig.update_layout(xaxis_title="Match Winner", yaxis_title="Count", showlegend=False)
    figure_dict["counts_of_match_winners"] = fig

    # Num. matches of each model
    fig = px.bar(
        pd.concat([results["model_a"], results["model_b"]]).value_counts(),
        title="Match Count per Model",
        text_auto=True,
    )
    fig.update_layout(
        xaxis_title="Model", yaxis_title="Match Count", height=400, showlegend=False
    )
    figure_dict["match_count_for_each_model"] = fig

    # Every model seen on either side, sorted. All pairwise tables share these
    # labels so that adding a table to its transpose never misaligns.
    models = sorted(
        pd.concat([results["model_a"], results["model_b"]]).dropna().unique()
    )

    # Num. matches matrix (model v. model), independent of the A/B position
    ab_counts = _pairwise_match_counts(results, models)
    match_counts = ab_counts + ab_counts.T
    ordering = match_counts.sum().sort_values(ascending=False).index
    fig = px.imshow(
        match_counts.loc[ordering, ordering],
        title="Number of Matches (model vs. model)",
        text_auto=True,
    )
    fig.update_layout(
        xaxis_title="Model B",
        yaxis_title="Model A",
        xaxis_side="top",
        height=800,
        width=800,
        title_xanchor="left",
        title_yanchor="top",
        font=dict(size=10),
    )
    # Plotly hover text uses <br> for line breaks.
    fig.update_traces(
        hovertemplate="Model A: %{y}<br>Model B: %{x}<br>Count: %{z}"
    )
    figure_dict["match_count_of_each_combination_of_models"] = fig

    # Win rate matrix (model v. model): wins of the row model over the column
    # model, whichever position it played in, divided by their match count.
    a_win_counts = _pairwise_match_counts(results[results["winner"] == "A"], models)
    b_win_counts = _pairwise_match_counts(results[results["winner"] == "B"], models)
    row_beats_col_freq = (a_win_counts + b_win_counts.T) / match_counts

    prop_wins = row_beats_col_freq.mean(axis=1).sort_values(ascending=False)
    model_names = list(prop_wins.keys())
    row_beats_col = row_beats_col_freq.loc[model_names, model_names]
    fig = px.imshow(
        row_beats_col,
        color_continuous_scale="RdBu",
        text_auto=".2f",
        title="P(A wins B)",
    )
    fig.update_layout(
        xaxis_title="Model B",
        yaxis_title="Model A",  # y axis = row = index
        title_xanchor="left",
        title_yanchor="top",
        xaxis_side="top",
        height=800,
        width=800,
    )
    fig.update_traces(
        hovertemplate="Model A: %{y}<br>Model B: %{x}<br>P(A wins B): %{z}"
    )
    figure_dict["fraction_of_model_a_wins_for_all_a_vs_b_matches"] = fig

    # Elo Rating
    elo = compute_mle_elo(results)
    elo_wr = compute_relative_winrate_to_1st(elo)
    # beautify
    elo_wr["Elo rating"] = elo_wr["Elo rating"].astype(int)
    elo_wr["winrate_vs_1st"] = elo_wr["winrate_vs_1st"].round(3)
    elo_wr.index.name = "Rank"
    figure_dict["elo_rating"] = elo_wr

    # Elo Rating by Task: Radar chart
    if is_overall:
        tasks = results["task"].unique().tolist()
        elo_by_task = pd.concat(
            [
                compute_mle_elo(results[results["task"] == task]).assign(task=task)
                for task in tasks
            ]
        )
        fig = px.line_polar(
            elo_by_task,
            r="Elo rating",
            theta="task",
            line_close=True,
            category_orders={"task": tasks},
            color="Model",
            markers=True,
            color_discrete_sequence=px.colors.qualitative.Pastel,
            title="Elo Rating by Task",
        )
        figure_dict["elo_rating_by_task"] = fig

    figure_dict["judgename"] = judgename
    return figure_dict