##### SAFE IMPUTATION #####
import pandas as pd
import numpy as np
from scipy import stats
import warnings
import streamlit as st
import base64
def outlier_per_col(df,col):
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
# Kolmogorov-Smirnov test to find the distribution of the data
dist_name, p = stats.normaltest(df[col])[0], stats.normaltest(df[col])[1]
# if p > 0.05 then the data is normally distributed
# if p <= 0.05 then the data is not normally is distributed
if p <= 0.05:
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outlier_df = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
outlier_per = (len(outlier_df) / len(df[col])) * 100
else:
z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
outlier_df = df[(z_score > 3)]
outlier_per = len(outlier_df) / len(df[col]) * 100
return outlier_per
def summary_stats(df,per_to_drop):
summary_df = df.isna().sum().reset_index().rename(columns={'index': 'variable', 0: 'null'})
summary_df['%null'] = (100 * summary_df['null'] / len(df)).round(2)
summary_df = summary_df.merge(df.dtypes.reset_index().rename(columns={'index': 'variable', 0: 'type'}), on='variable')
summary_df = summary_df.drop(columns=['null'])
summary_df = summary_df.drop(summary_df[summary_df['%null'] > per_to_drop].index)
df_numeric = df.select_dtypes(exclude='object')
df_categorical = df.select_dtypes(include='object')
if not df_numeric.empty:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
summary_df['outlier%'] = summary_df[summary_df['variable'].isin(df_numeric.columns)].apply(lambda x: outlier_per_col(df_numeric, x['variable']), axis=1)
else:
summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'outlier%': []})])
summary_df = summary_df.merge((df.select_dtypes(exclude=['object']).nunique() / df.select_dtypes(exclude=['object']).count() * 100).reset_index().rename(columns={'index': 'variable', 0: 'unique%'}).round(2), on='variable', how='left').round(2)
summary_df = summary_df.merge(df.mean(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'mean'}).round(2), on='variable', how='left')
summary_df = summary_df.merge(df.std(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'standard deviation'}).round(2), on='variable', how='left')
summary_df = (summary_df.merge(df.var(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'variance'}), on='variable', how='left').assign(variance=lambda x: x['variance'].apply(lambda y: "{:.2f}".format(y))))
summary_df = summary_df.merge(df.skew(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'skewness'}).round(2), on='variable', how='left')
summary_df = summary_df.merge(df.kurt(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'kurtosis'}).round(2), on='variable', how='left')
summary_df = summary_df.merge(df.min(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'min'}), on='variable', how='left')
summary_df = summary_df.merge(df.max(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'max'}), on='variable', how='left')
summary_df['range'] = summary_df['max'] - summary_df['min']
if not df_numeric.empty:
summary_df = summary_df.merge((df.describe().loc['75%'].T - df.describe().loc['25%'].T).reset_index().rename(columns={'index': 'variable', 0: 'iqr'}), on='variable', how='left')
else:
summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'iqr': []})])
summary_df = summary_df.merge(df.median(numeric_only=True).reset_index().rename(columns={'index': 'variable', 0: 'median'}), on='variable', how='left')
if not df_categorical.empty:
summary_df = summary_df.merge(df.select_dtypes(include=['object']).mode().iloc[0].reset_index().rename(columns={'index': 'variable', 0: 'mode'}), on='variable', how='left')
summary_df = summary_df.merge(df.select_dtypes(include=['object']).nunique().reset_index().rename(columns={'index': 'variable', 0: 'distinct count'}), on='variable', how='left')
else:
summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'mode': []})])
summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'distinct count': []})])
return summary_df
def mean_imputation(df, col):
df[col].fillna(round(df[col].mean(), 2), inplace=True)
def median_imputation(df, col):
median = df[col].median()
df[col].fillna(round(median, 2), inplace=True)
def drop_rows(df, col):
df.dropna(subset=[col], inplace=True)
def drop_column(df, col):
df.drop(col, axis=1, inplace=True)
def mode_imputation(df, col):
mode = df[col].mode()[0]
df[col].fillna(mode, inplace=True)
def arbitrary_val(df, col, val):
df[col].fillna(val, inplace=True)
def linear_interpolate(df, col):
df[col].interpolate(method='linear', inplace=True)
def polynomial_interpolate(df, col):
df[col].interpolate(method='polynomial', order=2, inplace=True)
def interpolate_padding_forward(df, col):
df[col].fillna(method='ffill', inplace=True)
def interpolate_padding_backward(df, col):
df[col].fillna(method='bfill', inplace=True)
def fill_0(df, col):
df[col].fillna(0, inplace=True)
def remove_outliers(df, col):
dist_name, p = stats.normaltest(df[col])[0], stats.normaltest(df[col])[1]
if p <= 0.05:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
else:
z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
df = df[(z_score < 3)]
return df
def mean_outlier(df, col):
dist_name, p = stats.normaltest(df[col])[0], stats.normaltest(df[col])[1]
if p <= 0.05:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
df[col][df[col] < lower_bound] = df[col].mean()
df[col][df[col] > upper_bound] = df[col].mean()
else:
z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
df.loc[z_score > 3, col] = df[col].mean()
return df
def median_outlier(df, col):
dist_name, p = stats.normaltest(df[col])[0], stats.normaltest(df[col])[1]
if p <= 0.05:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
df[col][df[col] < lower_bound] = df[col].median()
df[col][df[col] > upper_bound] = df[col].median()
else:
z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
df.loc[z_score > 3, col] = df[col].median()
return df
def outlier_capping(df, col):
dist_name, p = stats.normaltest(df[col])[0], stats.normaltest(df[col])[1]
if p <= 0.05:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3-q1
lower_bound = q1-1.5*iqr
upper_bound = q1+1.5*iqr
df[col] = np.where(df[col] >= upper_bound, upper_bound, np.where(df[col] <= lower_bound, lower_bound, df[col]))
else:
upper_limit = df[col].mean() + (3 * df[col].std())
lower_limit = df[col].mean() - (3 * df[col].std())
df[col] = np.where(df[col] >= upper_limit, upper_limit, np.where(df[col] <= lower_limit, lower_limit, df[col]))
return df
def perform_treatment_missing(df, col, treatments):
if treatments == 'mean':
mean_imputation(df, col)
elif treatments == 'median':
median_imputation(df, col)
elif treatments == 'drop row':
drop_rows(df, col)
elif treatments == 'drop column':
drop_column(df, col)
elif treatments == 'linear interpolation':
linear_interpolate(df, col)
elif treatments == 'polynomial interpolation':
polynomial_interpolate(df, col)
elif treatments == 'ffill':
interpolate_padding_forward(df, col)
elif treatments == 'bfill':
interpolate_padding_backward(df, col)
elif treatments == 'mode':
mode_imputation(df, col)
elif treatments == 'fill_0':
fill_0(df, col)
else:
return df[col]
def perform_treatment_outlier(df, col, treatments):
if treatments == 'remove':
remove_outliers(df,col)
elif treatments == 'mean':
mean_outlier(df,col)
elif treatments == 'median':
median_imputation(df,col)
elif treatments == 'capping':
outlier_capping(df,col)
else:
return df[col]
def imputed_df(df,edited_df,identifier,flag,per_to_drop=None):
if per_to_drop is not None:
null_percentage = df.isnull().sum() / df.shape[0] * 100
col_to_drop = null_percentage[null_percentage > per_to_drop].keys()
df = df.drop(col_to_drop, axis=1)
cols_with_one_unique = df.columns[df.nunique() == 1]
df.drop(cols_with_one_unique, axis=1, inplace=True)
for col in edited_df['variable'].to_list():
perform_treatment_missing(df,col, edited_df.loc[edited_df['variable'] == col, 'Imputation method'].iloc[0])
perform_treatment_outlier(df,col, edited_df.loc[edited_df['variable'] == col, 'Outlier Treatment'].iloc[0])
return df
# flag = st.sidebar.selectbox("Flag Column", [None] + list(st.session_state.df.columns))
# identifier = st.sidebar.selectbox("Identifier Column", [None] + list(st.session_state.df.columns))
# numerical_columns = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
# numerical_columns = [x for x in numerical_columns if x !=flag]
# categorical_columns = st.session_state.df.select_dtypes(include=['object', 'category']).columns.tolist()
# categorical_columns = [x for x in categorical_columns if x !=identifier]
# st.session_state.flag=flag
# st.session_state.identifier=identifier
st.title("Data Summary")
with st.expander("Data Inputs"):
st.subheader("Data Inputs")
ui_columns = st.columns((1, 1))
columns = set(st.session_state.df.columns)
with ui_columns[0]:
flag = st.selectbox(
label="Flag variable",
options=list(columns),
index=list(columns).index(st.session_state.flag) if 'flag' in st.session_state and st.session_state.flag is not None else 0
)
per_to_drop=st.slider(
label= "Select missing % threshold to drop columns",
key="per_to_drop",
min_value=0, max_value=100, value=st.session_state.per_to_drop if 'per_to_drop' in st.session_state else 80)
with ui_columns[-1]:
identifier = st.selectbox(
label="Identifier",
options=list(columns),
index=list(columns).index(st.session_state.identifier) if 'identifier' in st.session_state and st.session_state.identifier is not None else 0
)
# numerical_columns = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
# numerical_columns = [x for x in numerical_columns if x !=flag]
# categorical_columns = st.session_state.df.select_dtypes(include=['object', 'category']).columns.tolist()
# categorical_columns = [x for x in categorical_columns if x !=identifier]
# st.session_state.numerical_columns=numerical_columns
# st.session_state.categorical_columns=categorical_columns
st.session_state.flag=flag
st.session_state.identifier=identifier
# st.subheader("Select Ordinal Columns:")
# with st.expander("Select Ordinal Columns:", expanded=True):
# select_all_checkbox = st.checkbox("Select All", key="select_all_checkbox")
# options = categorical_columns
# # Checkboxes for each column
# ordinal_columns = []
# for option in options:
# if select_all_checkbox or st.checkbox(option, key=f"checkbox_{option}"):
# ordinal_columns.append(option)
# st.session_state.ordinal_columns=list(ordinal_columns)
# nominal_columns=[x for x in categorical_columns if x not in ordinal_columns]
# st.session_state.numerical_columns=numerical_columns
# st.session_state.categorical_columns=categorical_columns
# st.session_state.ordinal_columns=ordinal_columns
#Ordinal columns order
# ordinal_col_dict = st.session_state.get("ordinal_col_dict", {})
# ordinal_col_dict = {}
# for col in ordinal_columns:
# st.subheader(f"Ordering for Unique Values in {col}")
# # Get unique values excluding NaN
# unique_values = st.session_state.df[col].dropna().unique()
# order_dict = {}
# for val in unique_values:
# order = st.number_input(f"Order for {val} in {col}", min_value=1, value=1)
# order_dict[val] = order
# ordinal_col_dict[col] = order_dict
# st.session_state.ordinal_col_dict = ordinal_col_dict
# User input for percentage threshold to drop columns
# per_to_drop = st.slider("Select Percentage Threshold to Drop Columns", min_value=0, max_value=100, value=10)
# st.session_state.per_to_drop = per_to_drop
summary_df = summary_stats(st.session_state.df, per_to_drop)
summary_df["Imputation method"]=None
summary_df["Outlier Treatment"]=None
summary_df["Imputation method"]=np.where(summary_df["type"]=='object','mode','mean')
summary_df["Outlier Treatment"]=np.where(summary_df["type"]=='object',summary_df["Outlier Treatment"],'capping')
summary_df = summary_df[~summary_df['variable'].isin([flag,identifier])]
st.session_state.summary_df=summary_df
st.subheader("Variable Summary")
IMPUTATION_OPTIONS = ["mean", "median", "linear interpolation", "polynomial interpolation", "ffill", "bfill","mode","fill_0"]
OUTLIER_OPTIONS = ["capping","remove", "mean", "median"]
NON_EDITABLE_COLUMNS = summary_df.columns.to_list()
def highlight_cols(s):
color = "#ccc"
return "background-color: %s" % color
column_config = {
"variable": st.column_config.TextColumn(disabled=True, width="medium"),
"type": st.column_config.TextColumn(disabled=True, width="medium"),
"%null": st.column_config.NumberColumn(disabled=True),
"unique%": st.column_config.NumberColumn(disabled=True),
"outlier%": st.column_config.NumberColumn(disabled=True),
"mean": st.column_config.NumberColumn(disabled=True),
"standard deviation": st.column_config.NumberColumn(disabled=True),
"variance": st.column_config.NumberColumn(disabled=True),
"skewness": st.column_config.NumberColumn(disabled=True),
"kurtosis": st.column_config.NumberColumn(disabled=True),
"min": st.column_config.NumberColumn(disabled=True),
"max": st.column_config.NumberColumn(disabled=True),
"range": st.column_config.NumberColumn(disabled=True),
"iqr": st.column_config.NumberColumn(disabled=True),
"median": st.column_config.NumberColumn(disabled=True),
"IV": st.column_config.NumberColumn(disabled=True),
"mode": st.column_config.TextColumn(disabled=True),
"distinct count": st.column_config.NumberColumn(disabled=True),
"Imputation method": st.column_config.SelectboxColumn(
options=IMPUTATION_OPTIONS, default=0
),
"Outlier Treatment": st.column_config.SelectboxColumn(
options=OUTLIER_OPTIONS, default=0
)
}
with st.expander("Variables from the data"):
edited_df = st.data_editor(
st.session_state.summary_df
.style.hide(axis="index")
.applymap(highlight_cols, subset=NON_EDITABLE_COLUMNS),
column_config=column_config,
)
if st.button("Submit changes"):
with st.spinner("Applying imputations"):
st.divider()
edited_df = st.session_state.summary_df.copy() # Make a copy of the original DataFrame
edited_df["Imputation method"] = st.session_state.summary_df["Imputation method"] # Update the imputation method column
edited_df["Outlier Treatment"] = st.session_state.summary_df["Outlier Treatment"] # Update the outlier treatment method column
imputed_df = imputed_df(st.session_state.df, edited_df, st.session_state.identifier, st.session_state.flag, st.session_state.per_to_drop)
st.session_state.imputed_df = imputed_df
st.markdown("Imputed DataFrame")
st.dataframe(imputed_df.head(10))
# Add a download button for the imputed DataFrame
#if st.session_state.imputed_df is not None:
# csv_data = st.session_state.imputed_df.to_csv(index=False).encode()
# st.download_button(
# label="Download Imputed DataFrame as CSV",
# data=csv_data,
# file_name="imputed_data.csv",
# mime="text/csv"
# )
# Add the download button after displaying the DataFrame
#if st.dataframe:
# if st.button("Download Imputed Data"):
# imputed_csv = imputed_df.to_csv(index=False)
# b64 = base64.b64encode(imputed_csv.encode()).decode()
# href = f'Download Imputed Data CSV File'
# st.markdown(href, unsafe_allow_html=True)
if "imputed_df" in st.session_state:
if st.button("Download Imputed Data"):
imputed_df = st.session_state.imputed_df
imputed_csv = imputed_df.to_csv(index=False)
b64 = base64.b64encode(imputed_csv.encode()).decode()
href = f'Download Imputed Data CSV File'
st.markdown(href, unsafe_allow_html=True)
# Check if the "Submit changes" button has been clicked
# if st.button("Submit"):
# st.write("Selected Columns and Ordinal Orders:")
# st.write(ordinal_col_dict)
# # Display summary stats
# summary_df = summary_stats(st.session_state.df, per_to_drop)
# st.write("Summary Stats:")
# st.write(summary_df)
# # User input for specific column
# col_name = st.selectbox("Select a specific column name:", [None] + list(st.session_state.df.columns))
# # Display stats for the specified column
# if col_name in st.session_state.df.columns:
# st.write(f"Stats for column '{col_name}':")
# # Extract relevant information from 'summary_df' for the specific column
# col_summary = summary_df[summary_df['variable'] == col_name][['%null', 'type', 'outlier%', 'unique%', 'mean', 'standard deviation', 'variance', 'skewness', 'kurtosis', 'min', 'max', 'range', 'iqr', 'median', 'mode', 'distinct count']]
# col_summary = col_summary.T.reset_index()
# col_summary.columns = ['Stats', 'Value']
# # Display the summary statistics as a table
# st.table(col_summary)
# else:
# st.warning("Please enter a valid column name.")