##### SAFE IMPUTATION #####
import pandas as pd
import numpy as np
from scipy import stats
import warnings
import streamlit as st
import base64


def outlier_per_col(df, col):
    """Return the percentage of outlier values in df[col]."""
    q1 = df[col].quantile(0.25)
    q3 = df[col].quantile(0.75)
    iqr = q3 - q1
    # D'Agostino-Pearson normality test:
    # p > 0.05  -> no evidence against normality (treat as normal)
    # p <= 0.05 -> the data is not normally distributed
    stat, p = stats.normaltest(df[col])
    if p <= 0.05:
        # Non-normal data: count values outside the 1.5 * IQR fences
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        outlier_df = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
    else:
        # Normal data: count values more than 3 standard deviations from the mean
        z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
        outlier_df = df[z_score > 3]
    return len(outlier_df) / len(df[col]) * 100


def summary_stats(df, per_to_drop):
    """Build a per-variable summary; variables with more than per_to_drop % nulls are excluded."""
    summary_df = df.isna().sum().reset_index().rename(columns={'index': 'variable', 0: 'null'})
    summary_df['%null'] = (100 * summary_df['null'] / len(df)).round(2)
    summary_df = summary_df.merge(df.dtypes.reset_index().rename(columns={'index': 'variable', 0: 'type'}),
                                  on='variable')
    summary_df = summary_df.drop(columns=['null'])
    summary_df = summary_df.drop(summary_df[summary_df['%null'] > per_to_drop].index)
    df_numeric = df.select_dtypes(exclude='object')
    df_categorical = df.select_dtypes(include='object')
    if not df_numeric.empty:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            summary_df['outlier%'] = summary_df[summary_df['variable'].isin(df_numeric.columns)].apply(
                lambda x: outlier_per_col(df_numeric, x['variable']), axis=1)
    else:
        summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'outlier%': []})])
    summary_df = summary_df.merge(
        (df.select_dtypes(exclude=['object']).nunique() / df.select_dtypes(exclude=['object']).count() * 100)
        .reset_index().rename(columns={'index': 'variable', 0: 'unique%'}).round(2),
        on='variable', how='left').round(2)
    summary_df = summary_df.merge(df.mean(numeric_only=True).reset_index()
                                  .rename(columns={'index': 'variable', 0: 'mean'}).round(2),
                                  on='variable', how='left')
    summary_df = summary_df.merge(df.std(numeric_only=True).reset_index()
                                  .rename(columns={'index': 'variable', 0: 'standard deviation'}).round(2),
                                  on='variable', how='left')
    summary_df = (summary_df.merge(df.var(numeric_only=True).reset_index()
                                   .rename(columns={'index': 'variable', 0: 'variance'}),
                                   on='variable', how='left')
                  .assign(variance=lambda x: x['variance'].apply(lambda y: "{:.2f}".format(y))))
    summary_df = summary_df.merge(df.skew(numeric_only=True).reset_index()
                                  .rename(columns={'index': 'variable', 0: 'skewness'}).round(2),
                                  on='variable', how='left')
    summary_df = summary_df.merge(df.kurt(numeric_only=True).reset_index()
                                  .rename(columns={'index': 'variable', 0: 'kurtosis'}).round(2),
                                  on='variable', how='left')
    summary_df = summary_df.merge(df.min(numeric_only=True).reset_index()
                                  .rename(columns={'index': 'variable', 0: 'min'}),
                                  on='variable', how='left')
    summary_df = summary_df.merge(df.max(numeric_only=True).reset_index()
                                  .rename(columns={'index': 'variable', 0: 'max'}),
                                  on='variable', how='left')
    summary_df['range'] = summary_df['max'] - summary_df['min']
    if not df_numeric.empty:
        summary_df = summary_df.merge(
            (df.describe().loc['75%'].T - df.describe().loc['25%'].T).reset_index()
            .rename(columns={'index': 'variable', 0: 'iqr'}),
            on='variable', how='left')
    else:
        summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'iqr': []})])
    summary_df = summary_df.merge(df.median(numeric_only=True).reset_index()
                                  .rename(columns={'index': 'variable', 0: 'median'}),
                                  on='variable', how='left')
    if not df_categorical.empty:
        summary_df = summary_df.merge(df.select_dtypes(include=['object']).mode().iloc[0].reset_index()
                                      .rename(columns={'index': 'variable', 0: 'mode'}),
                                      on='variable', how='left')
        summary_df = summary_df.merge(df.select_dtypes(include=['object']).nunique().reset_index()
                                      .rename(columns={'index': 'variable', 0: 'distinct count'}),
                                      on='variable', how='left')
    else:
        summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'mode': []})])
        summary_df = pd.concat([summary_df, pd.DataFrame({'variable': [], 'distinct count': []})])
    return summary_df
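
# Illustrative sketch (not part of the app): how outlier_per_col and
# summary_stats behave on a small, made-up frame. Uncomment to try in a
# plain Python session; the column names and values here are hypothetical.
#
# _demo = pd.DataFrame({'age': [22, 25, 27, 24, 26, 95, np.nan, 23, 28],
#                       'city': ['a', 'b', 'a', 'a', None, 'b', 'a', 'b', 'a']})
# print(outlier_per_col(_demo.dropna(subset=['age']), 'age'))  # % of outliers in 'age'
# print(summary_stats(_demo, per_to_drop=50))  # one row per retained variable
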
def mean_imputation(df, col):
    # Replace missing values with the column mean
    df[col] = df[col].fillna(round(df[col].mean(), 2))


def median_imputation(df, col):
    # Replace missing values with the column median
    df[col] = df[col].fillna(round(df[col].median(), 2))


def drop_rows(df, col):
    # Drop the rows where the column is missing
    df.dropna(subset=[col], inplace=True)


def drop_column(df, col):
    df.drop(col, axis=1, inplace=True)


def mode_imputation(df, col):
    # Replace missing values with the most frequent value
    df[col] = df[col].fillna(df[col].mode()[0])


def arbitrary_val(df, col, val):
    df[col] = df[col].fillna(val)


def linear_interpolate(df, col):
    df[col] = df[col].interpolate(method='linear')


def polynomial_interpolate(df, col):
    df[col] = df[col].interpolate(method='polynomial', order=2)


def interpolate_padding_forward(df, col):
    # Propagate the last valid observation forward
    df[col] = df[col].ffill()


def interpolate_padding_backward(df, col):
    # Fill backwards from the next valid observation
    df[col] = df[col].bfill()


def fill_0(df, col):
    df[col] = df[col].fillna(0)


def remove_outliers(df, col):
    stat, p = stats.normaltest(df[col])
    if p <= 0.05:
        # Non-normal data: drop rows outside the 1.5 * IQR fences
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
    else:
        # Normal data: drop rows more than 3 standard deviations from the mean
        z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
        df = df[z_score < 3]
    return df


def mean_outlier(df, col):
    # Replace outliers with the column mean
    stat, p = stats.normaltest(df[col])
    if p <= 0.05:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        df.loc[(df[col] < lower_bound) | (df[col] > upper_bound), col] = df[col].mean()
    else:
        z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
        df.loc[z_score > 3, col] = df[col].mean()
    return df


def median_outlier(df, col):
    # Replace outliers with the column median
    stat, p = stats.normaltest(df[col])
    if p <= 0.05:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        df.loc[(df[col] < lower_bound) | (df[col] > upper_bound), col] = df[col].median()
    else:
        z_score = np.abs(df[col] - df[col].mean()) / df[col].std()
        df.loc[z_score > 3, col] = df[col].median()
    return df


def outlier_capping(df, col):
    # Cap (winsorize) outliers at the fence values instead of removing them
    stat, p = stats.normaltest(df[col])
    if p <= 0.05:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        df[col] = np.where(df[col] >= upper_bound, upper_bound,
                           np.where(df[col] <= lower_bound, lower_bound, df[col]))
    else:
        upper_limit = df[col].mean() + 3 * df[col].std()
        lower_limit = df[col].mean() - 3 * df[col].std()
        df[col] = np.where(df[col] >= upper_limit, upper_limit,
                           np.where(df[col] <= lower_limit, lower_limit, df[col]))
    return df
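
# Illustrative sketch (hypothetical data, not part of the app): the missing-
# value helpers above mutate the frame they are given in place, while the
# outlier treatments return the treated frame and must be reassigned.
#
# _demo = pd.DataFrame({'x': [1.0, 2.0, np.nan, 3.0, 2.5, 1.5, 2.2, 1.8, 250.0]})
# mean_imputation(_demo, 'x')          # fills the NaN with the column mean, in place
# _demo = outlier_capping(_demo, 'x')  # caps values outside the computed fences, if any
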
def perform_treatment_missing(df, col, treatments):
    # Apply the selected missing-value treatment to one column (in place)
    if treatments == 'mean':
        mean_imputation(df, col)
    elif treatments == 'median':
        median_imputation(df, col)
    elif treatments == 'drop row':
        drop_rows(df, col)
    elif treatments == 'drop column':
        drop_column(df, col)
    elif treatments == 'linear interpolation':
        linear_interpolate(df, col)
    elif treatments == 'polynomial interpolation':
        polynomial_interpolate(df, col)
    elif treatments == 'ffill':
        interpolate_padding_forward(df, col)
    elif treatments == 'bfill':
        interpolate_padding_backward(df, col)
    elif treatments == 'mode':
        mode_imputation(df, col)
    elif treatments == 'fill_0':
        fill_0(df, col)


def perform_treatment_outlier(df, col, treatments):
    # Apply the selected outlier treatment and return the treated frame
    if treatments == 'remove':
        df = remove_outliers(df, col)
    elif treatments == 'mean':
        df = mean_outlier(df, col)
    elif treatments == 'median':
        df = median_outlier(df, col)
    elif treatments == 'capping':
        df = outlier_capping(df, col)
    return df


def imputed_df(df, edited_df, identifier, flag, per_to_drop=None):
    # Drop columns whose null percentage exceeds the chosen threshold
    if per_to_drop is not None:
        null_percentage = df.isnull().sum() / df.shape[0] * 100
        col_to_drop = null_percentage[null_percentage > per_to_drop].keys()
        df = df.drop(col_to_drop, axis=1)
    # Drop constant columns: a single unique value carries no information
    cols_with_one_unique = df.columns[df.nunique() == 1]
    df.drop(cols_with_one_unique, axis=1, inplace=True)
    # Apply the per-column treatments chosen in the editor
    for col in edited_df['variable'].to_list():
        perform_treatment_missing(df, col,
                                  edited_df.loc[edited_df['variable'] == col, 'Imputation method'].iloc[0])
        df = perform_treatment_outlier(df, col,
                                       edited_df.loc[edited_df['variable'] == col, 'Outlier Treatment'].iloc[0])
    return df


# flag = st.sidebar.selectbox("Flag Column", [None] + list(st.session_state.df.columns))
# identifier = st.sidebar.selectbox("Identifier Column", [None] + list(st.session_state.df.columns))
# numerical_columns = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
# numerical_columns = [x for x in numerical_columns if x != flag]
# categorical_columns = st.session_state.df.select_dtypes(include=['object', 'category']).columns.tolist()
# categorical_columns = [x for x in categorical_columns if x != identifier]
# st.session_state.flag = flag
# st.session_state.identifier = identifier

st.title("Data Summary")

with st.expander("Data Inputs"):
    st.subheader("Data Inputs")
    ui_columns = st.columns((1, 1))
    columns = set(st.session_state.df.columns)
    with ui_columns[0]:
        flag = st.selectbox(
            label="Flag variable",
            options=list(columns),
            index=list(columns).index(st.session_state.flag)
            if 'flag' in st.session_state and st.session_state.flag is not None
            else 0,
        )
        per_to_drop = st.slider(
            label="Select missing % threshold to drop columns",
            key="per_to_drop",
            min_value=0,
            max_value=100,
            value=st.session_state.per_to_drop if 'per_to_drop' in st.session_state else 80,
        )
    with ui_columns[-1]:
        identifier = st.selectbox(
            label="Identifier",
            options=list(columns),
            index=list(columns).index(st.session_state.identifier)
            if 'identifier' in st.session_state and st.session_state.identifier is not None
            else 0,
        )
    # numerical_columns = st.session_state.df.select_dtypes(include=['number']).columns.tolist()
    # numerical_columns = [x for x in numerical_columns if x != flag]
    # categorical_columns = st.session_state.df.select_dtypes(include=['object', 'category']).columns.tolist()
    # categorical_columns = [x for x in categorical_columns if x != identifier]
    # st.session_state.numerical_columns = numerical_columns
    # st.session_state.categorical_columns = categorical_columns

st.session_state.flag = flag
st.session_state.identifier = identifier
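
# Persisting the selections in st.session_state lets other pages of the app
# reuse the same flag/identifier without re-asking. Illustrative read from
# another (hypothetical) page:
#
# flag_col = st.session_state.get("flag")
# id_col = st.session_state.get("identifier")
# if flag_col is None:
#     st.warning("Pick a flag variable on the Data Summary page first.")
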
# st.subheader("Select Ordinal Columns:")
# with st.expander("Select Ordinal Columns:", expanded=True):
#     select_all_checkbox = st.checkbox("Select All", key="select_all_checkbox")
#     options = categorical_columns
#     # Checkboxes for each column
#     ordinal_columns = []
#     for option in options:
#         if select_all_checkbox or st.checkbox(option, key=f"checkbox_{option}"):
#             ordinal_columns.append(option)
#     st.session_state.ordinal_columns = list(ordinal_columns)
#     nominal_columns = [x for x in categorical_columns if x not in ordinal_columns]
# st.session_state.numerical_columns = numerical_columns
# st.session_state.categorical_columns = categorical_columns
# st.session_state.ordinal_columns = ordinal_columns

# Ordinal columns order
# ordinal_col_dict = st.session_state.get("ordinal_col_dict", {})
# ordinal_col_dict = {}
# for col in ordinal_columns:
#     st.subheader(f"Ordering for Unique Values in {col}")
#     # Get unique values excluding NaN
#     unique_values = st.session_state.df[col].dropna().unique()
#     order_dict = {}
#     for val in unique_values:
#         order = st.number_input(f"Order for {val} in {col}", min_value=1, value=1)
#         order_dict[val] = order
#     ordinal_col_dict[col] = order_dict
# st.session_state.ordinal_col_dict = ordinal_col_dict

# User input for percentage threshold to drop columns
# per_to_drop = st.slider("Select Percentage Threshold to Drop Columns", min_value=0, max_value=100, value=10)
# st.session_state.per_to_drop = per_to_drop

summary_df = summary_stats(st.session_state.df, per_to_drop)
# Default treatments: mode for categoricals, mean for numerics; numeric
# outliers are capped, categoricals get no outlier treatment
summary_df["Imputation method"] = np.where(summary_df["type"] == 'object', 'mode', 'mean')
summary_df["Outlier Treatment"] = np.where(summary_df["type"] == 'object', None, 'capping')
summary_df = summary_df[~summary_df['variable'].isin([flag, identifier])]
st.session_state.summary_df = summary_df

st.subheader("Variable Summary")

IMPUTATION_OPTIONS = ["mean", "median", "linear interpolation", "polynomial interpolation",
                      "ffill", "bfill", "mode", "fill_0"]
OUTLIER_OPTIONS = ["capping", "remove", "mean", "median"]
NON_EDITABLE_COLUMNS = summary_df.columns.to_list()


def highlight_cols(s):
    # Grey background marks the read-only summary columns in the editor
    return "background-color: #ccc"


column_config = {
    "variable": st.column_config.TextColumn(disabled=True, width="medium"),
    "type": st.column_config.TextColumn(disabled=True, width="medium"),
    "%null": st.column_config.NumberColumn(disabled=True),
    "unique%": st.column_config.NumberColumn(disabled=True),
    "outlier%": st.column_config.NumberColumn(disabled=True),
    "mean": st.column_config.NumberColumn(disabled=True),
    "standard deviation": st.column_config.NumberColumn(disabled=True),
    "variance": st.column_config.NumberColumn(disabled=True),
    "skewness": st.column_config.NumberColumn(disabled=True),
    "kurtosis": st.column_config.NumberColumn(disabled=True),
    "min": st.column_config.NumberColumn(disabled=True),
    "max": st.column_config.NumberColumn(disabled=True),
    "range": st.column_config.NumberColumn(disabled=True),
    "iqr": st.column_config.NumberColumn(disabled=True),
    "median": st.column_config.NumberColumn(disabled=True),
    "IV": st.column_config.NumberColumn(disabled=True),
    "mode": st.column_config.TextColumn(disabled=True),
    "distinct count": st.column_config.NumberColumn(disabled=True),
    "Imputation method": st.column_config.SelectboxColumn(
        options=IMPUTATION_OPTIONS, default=IMPUTATION_OPTIONS[0]
    ),
    "Outlier Treatment": st.column_config.SelectboxColumn(
        options=OUTLIER_OPTIONS, default=OUTLIER_OPTIONS[0]
    ),
}
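
# Note: st.data_editor returns the edited copy of its input; the frame held
# in session_state is left untouched, so the user's dropdown choices must be
# read from the returned object. Toy sketch (hypothetical frame):
#
# _demo = pd.DataFrame({'variable': ['age'], 'Imputation method': ['mean']})
# _edited = st.data_editor(_demo, column_config={
#     'Imputation method': st.column_config.SelectboxColumn(options=IMPUTATION_OPTIONS)})
# _edited['Imputation method']  # reflects what the user picked
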
with st.expander("Variables from the data"):
    edited_df = st.data_editor(
        st.session_state.summary_df
        .style.hide(axis="index")
        .applymap(highlight_cols, subset=NON_EDITABLE_COLUMNS),
        column_config=column_config,
    )

if st.button("Submit changes"):
    with st.spinner("Applying imputations"):
        st.divider()
        # Overlay the user's editor choices on the stored summary
        treatment_df = st.session_state.summary_df.copy()
        treatment_df["Imputation method"] = edited_df["Imputation method"]
        treatment_df["Outlier Treatment"] = edited_df["Outlier Treatment"]
        result_df = imputed_df(st.session_state.df, treatment_df,
                               st.session_state.identifier, st.session_state.flag,
                               st.session_state.per_to_drop)
        st.session_state.imputed_df = result_df
        st.markdown("Imputed DataFrame")
        st.dataframe(result_df.head(10))

# Add a download button for the imputed DataFrame
# if st.session_state.imputed_df is not None:
#     csv_data = st.session_state.imputed_df.to_csv(index=False).encode()
#     st.download_button(
#         label="Download Imputed DataFrame as CSV",
#         data=csv_data,
#         file_name="imputed_data.csv",
#         mime="text/csv"
#     )

if "imputed_df" in st.session_state:
    if st.button("Download Imputed Data"):
        imputed_csv = st.session_state.imputed_df.to_csv(index=False)
        b64 = base64.b64encode(imputed_csv.encode()).decode()
        href = f'<a href="data:file/csv;base64,{b64}" download="imputed_data.csv">Download Imputed Data CSV File</a>'
        st.markdown(href, unsafe_allow_html=True)

# Check if the "Submit changes" button has been clicked
# if st.button("Submit"):
#     st.write("Selected Columns and Ordinal Orders:")
#     st.write(ordinal_col_dict)
#     # Display summary stats
#     summary_df = summary_stats(st.session_state.df, per_to_drop)
#     st.write("Summary Stats:")
#     st.write(summary_df)
#     # User input for specific column
#     col_name = st.selectbox("Select a specific column name:", [None] + list(st.session_state.df.columns))
#     # Display stats for the specified column
#     if col_name in st.session_state.df.columns:
#         st.write(f"Stats for column '{col_name}':")
#         # Extract relevant information from 'summary_df' for the specific column
#         col_summary = summary_df[summary_df['variable'] == col_name][['%null', 'type', 'outlier%', 'unique%',
#             'mean', 'standard deviation', 'variance', 'skewness', 'kurtosis', 'min', 'max', 'range', 'iqr',
#             'median', 'mode', 'distinct count']]
#         col_summary = col_summary.T.reset_index()
#         col_summary.columns = ['Stats', 'Value']
#         # Display the summary statistics as a table
#         st.table(col_summary)
#     else:
#         st.warning("Please enter a valid column name.")
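
# Illustrative sketch (hypothetical file paths, not part of the app): the same
# pipeline can be exercised outside Streamlit, e.g. in a notebook, by building
# the treatment table from summary_stats directly.
#
# raw = pd.read_csv("input.csv")
# plan = summary_stats(raw, per_to_drop=80)
# plan["Imputation method"] = np.where(plan["type"] == 'object', 'mode', 'mean')
# plan["Outlier Treatment"] = np.where(plan["type"] == 'object', None, 'capping')
# clean = imputed_df(raw, plan, identifier=None, flag=None, per_to_drop=80)
# clean.to_csv("imputed_data.csv", index=False)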