Spaces:
Runtime error
Runtime error
import os | |
import pprint as pp | |
from collections import OrderedDict, defaultdict | |
import json | |
import diff_viewer | |
import pandas as pd | |
import streamlit as st | |
from datasets import load_dataset, get_dataset_config_names | |
CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT = st.secrets["CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT"] | |
LOGS_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT = st.secrets["LOGS_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT"] | |
HF_API_TOKEN = st.secrets["HF_API_TOKEN"] | |
OPERATION_TYPES = [ | |
"Applied filter", | |
"Applied deduplication function", | |
"Applied map function", | |
] | |
MAX_LEN_DS_CHECKS = st.secrets["MAX_LEN_DS_CHECKS"] | |
def get_ds(config): | |
ds = load_dataset(CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT, config, use_auth_token=HF_API_TOKEN, trust_remote_code=True) | |
return ds["train"] | |
def next_idx(idx: int): | |
idx += 1 | |
return idx % len(st.session_state["ds"]) | |
def previous_idx(idx: int): | |
idx -= 1 | |
return idx % len(st.session_state["ds"]) | |
def on_click_next(): | |
st.session_state["idx_1"] = next_idx(st.session_state["idx_1"]) | |
st.session_state["idx_2"] = next_idx(st.session_state["idx_2"]) | |
def on_click_previous(): | |
st.session_state["idx_1"] = previous_idx(st.session_state["idx_1"]) | |
st.session_state["idx_2"] = previous_idx(st.session_state["idx_2"]) | |
def on_ds_change(config): | |
st.session_state["ds"] = get_ds(config) | |
st.session_state["idx_1"] = 0 | |
st.session_state["idx_2"] = 1 if len(st.session_state["ds"]) > 1 else 0 | |
st.session_state["ds_check_config"] = config | |
st.session_state["ds_max_docs"] = len(st.session_state["ds"]) | |
def get_log_stats_df(raw_log): | |
data = OrderedDict( | |
{ | |
"Order": [], | |
"Name": [], | |
"Initial number of samples": [], | |
"Final number of samples": [], | |
"Initial size in bytes": [], | |
"Final size in bytes": [], | |
} | |
) | |
metric_dict = defaultdict(lambda: {}) | |
order = 0 | |
for line in raw_log.split("\n"): | |
for metric_name in list(data.keys()) + OPERATION_TYPES: | |
if metric_name == "Name" or metric_name == "Order": | |
continue | |
if metric_name not in line: | |
continue | |
if ( | |
metric_name == "Removed percentage" | |
and "Removed percentage in bytes" in line | |
): | |
continue | |
if ( | |
metric_name == "Deduplicated percentage" | |
and "Deduplicated percentage in bytes" in line | |
): | |
continue | |
value = line.split(metric_name)[1].split(" ")[1] | |
if metric_name in OPERATION_TYPES: | |
operation_name = value | |
metric_dict[operation_name]["Order"] = order | |
order += 1 | |
continue | |
assert ( | |
metric_name not in metric_dict[operation_name] | |
), f"operation_name: {operation_name}\n\nvalue: {value}\n\nmetric_dict: {pp.pformat(metric_dict)} \n\nmetric_name: {metric_name} \n\nline: {line}" | |
metric_dict[operation_name][metric_name] = value | |
for name, data_dict in metric_dict.items(): | |
for metric_name in data.keys(): | |
if metric_name == "Name": | |
data[metric_name].append(name) | |
continue | |
data[metric_name].append(data_dict[metric_name]) | |
df = pd.DataFrame(data) | |
df.rename( | |
{ | |
"Initial size in bytes": "Initial size (GB)", | |
"Final size in bytes": "Final size (GB)", | |
}, | |
axis=1, | |
inplace=True, | |
) | |
df["% samples removed"] = ( | |
( | |
df["Initial number of samples"].astype(float) | |
- df["Final number of samples"].astype(float) | |
) | |
/ df["Initial number of samples"].astype(float) | |
* 100 | |
) | |
df["Size (GB) % removed"] = ( | |
(df["Initial size (GB)"].astype(float) - df["Final size (GB)"].astype(float)) | |
/ df["Initial size (GB)"].astype(float) | |
* 100 | |
) | |
return df | |
def get_logs_stats(raw_log): | |
try: | |
df = get_log_stats_df(raw_log) | |
st.dataframe(df) | |
except Exception as e: | |
st.write(e) | |
st.write("Subset of the logs:") | |
subcontent = [ | |
line | |
for line in raw_log.split("\n") | |
if "INFO - __main__" in line | |
and "Examples of" not in line | |
and "Examples n°" not in line | |
] | |
st.write(subcontent) | |
def meta_component(idx_key: str = "idx_1"): | |
if "meta" not in st.session_state["ds"][st.session_state[idx_key]]: | |
return | |
with st.expander("See meta field of the example"): | |
meta = st.session_state["ds"][st.session_state["idx_1"]]["meta"] | |
st.write(meta) | |
def filter_page(): | |
index_example = st.number_input("Index of the chosen example", min_value=0, max_value=st.session_state["ds_max_docs"] -1, value=0, step=1) | |
st.session_state["idx_1"] = index_example | |
st.session_state["idx_2"] = next_idx(index_example) | |
idx_1 = st.session_state["idx_1"] | |
idx_2 = st.session_state["idx_2"] | |
text_1 = st.session_state["ds"][idx_1]["text"] | |
text_2 = st.session_state["ds"][idx_2]["text"] | |
st.markdown( | |
f"<h1 style='text-align: center'>Some examples of filtered out texts</h1>", | |
unsafe_allow_html=True, | |
) | |
# col_button_previous, _, col_button_next = st.columns(3) | |
# col_button_next.button( | |
# "Go to next example", | |
# key=None, | |
# help=None, | |
# on_click=on_click_next, | |
# args=None, | |
# kwargs=None, | |
# ) | |
# col_button_previous.button( | |
# "Go to previous example", | |
# key=None, | |
# help=None, | |
# on_click=on_click_previous, | |
# args=None, | |
# kwargs=None, | |
# ) | |
col_1, col_2 = st.columns(2) | |
with col_1: | |
st.subheader(f"Example n°{idx_1}") | |
meta_component(idx_key="idx_1") | |
text_1_show = text_1.replace("\n", "<br>") | |
st.markdown(f"<div>{text_1_show}</div>", unsafe_allow_html=True) | |
with col_2: | |
st.subheader(f"Example n°{idx_2}") | |
meta_component(idx_key="idx_2") | |
text_2_show = text_2.replace("\n", "<br>") | |
st.markdown(f"<div>{text_2_show}</div>", unsafe_allow_html=True) | |
def dedup_or_cleaning_page(): | |
index_example = st.number_input("Index of the chosen example", min_value=0, max_value=st.session_state["ds_max_docs"] -1, value=0, step=1) | |
st.session_state["idx_1"] = index_example | |
st.session_state["idx_2"] = next_idx(index_example) | |
# col_button_previous, col_title, col_button_next = st.columns(3) | |
# col_title.markdown( | |
# f"<h1 style='text-align: center'>Example n°{st.session_state['idx_1']}</h1>", | |
# unsafe_allow_html=True, | |
# ) | |
# col_button_next.button( | |
# "Go to next example", | |
# key=None, | |
# help=None, | |
# on_click=on_click_next, | |
# args=None, | |
# kwargs=None, | |
# ) | |
# col_button_previous.button( | |
# "Go to previous example", | |
# key=None, | |
# help=None, | |
# on_click=on_click_previous, | |
# args=None, | |
# kwargs=None, | |
# ) | |
text = st.session_state["ds"][st.session_state["idx_1"]]["text"] | |
old_text = st.session_state["ds"][st.session_state["idx_1"]]["old_text"] | |
st.markdown( | |
f"<h2 style='text-align: center'>Changes applied</h1>", unsafe_allow_html=True | |
) | |
col_text_1, col_text_2 = st.columns(2) | |
with col_text_1: | |
st.subheader("Old text") | |
with col_text_2: | |
st.subheader("New text") | |
diff_viewer.diff_viewer(old_text=old_text, new_text=text, lang="none") | |
meta_component(idx_key="idx_1") | |
with st.expander("See full old and new texts of the example"): | |
text_show = text.replace("\n", "<br>") | |
old_text_show = old_text.replace("\n", "<br>") | |
col_1, col_2 = st.columns(2) | |
with col_1: | |
st.subheader("Old text") | |
st.markdown(f"<div>{old_text_show}</div>", unsafe_allow_html=True) | |
with col_2: | |
st.subheader("New text") | |
st.markdown(f"<div>{text_show}</div>", unsafe_allow_html=True) | |
# Streamlit page | |
st.set_page_config(page_title="Dataset explorer", page_icon=":hugging_face:", layout="wide") | |
st.write( | |
"The purpose of this application is to sequentially view the changes made to a dataset." | |
) | |
# st.write(CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT) | |
# ds_log = load_dataset(CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT, 'clean_v1_dsname_lm_en_multi_un_2', use_auth_token=HF_API_TOKEN) | |
# st.write(ds_log) | |
col_option_clean, col_option_ds = st.columns(2) | |
with open("dataset_configs.json", "r") as f: | |
CHECK_CONFIGS = json.load(f) | |
# CHECK_CONFIGS = get_dataset_config_names(CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT, use_auth_token=HF_API_TOKEN) | |
CLEANING_VERSIONS = set() | |
dataset_names = defaultdict(set) | |
checks_names = defaultdict(lambda: defaultdict(set)) | |
for check_config in CHECK_CONFIGS: | |
cleaning_version, check_config = check_config.split("_dsname_") | |
dataset_name, checks_name = check_config.split("_operation_") | |
CLEANING_VERSIONS.add(cleaning_version) | |
dataset_names[cleaning_version].add(dataset_name) | |
checks_names[cleaning_version][dataset_name].add(checks_name) | |
# CLEANING_VERSIONS = sorted(list(os.listdir(DATASET_DIR_PATH_BEFORE_CLEAN_SELECT)), reverse=True) | |
option_clean = col_option_clean.selectbox( | |
"Select the cleaning version", sorted(CLEANING_VERSIONS, reverse=True) | |
) | |
# DATASET_DIR_PATH = os.path.join(DATASET_DIR_PATH_BEFORE_CLEAN_SELECT, option_clean) | |
# dataset_names = sorted(list(os.listdir(DATASET_DIR_PATH))) | |
option_ds = col_option_ds.selectbox("Select the dataset", sorted(dataset_names[option_clean])) | |
# checks_path = os.path.join(DATASET_DIR_PATH, option_ds, "checks") | |
# checks_names = sorted(list(os.listdir(checks_path))) | |
# log_path = os.path.join(DATASET_DIR_PATH, option_ds, "logs.txt") | |
ds_log = load_dataset(LOGS_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT, f"{option_clean}_dsname_{option_ds}", use_auth_token=HF_API_TOKEN, trust_remote_code=True) | |
log = ds_log["train"][0]["log"] | |
get_logs_stats(raw_log=log) | |
option_check = st.selectbox("Select the operation applied to inspect", sorted(checks_names[option_clean][option_ds])) | |
ds_check_config = f"{option_clean}_dsname_{option_ds}_operation_{option_check}" | |
if "ds" not in st.session_state or ds_check_config != st.session_state["ds_check_config"]: | |
on_ds_change(ds_check_config) | |
if len(st.session_state["ds"]) == MAX_LEN_DS_CHECKS: | |
st.warning( | |
f"Note: only a subset of size {MAX_LEN_DS_CHECKS} of the modified / filtered examples can be shown in this application" | |
) | |
with st.expander("See details of the available checks"): | |
st.write(st.session_state["ds"]) | |
_ = filter_page() if "_filter_" in option_check else dedup_or_cleaning_page() | |