# APP FRAMEWORK & CACHING
import pickle
import streamlit as st
# DATA MANIPULATION & ANALYSIS
# Arrays
import numpy as np
# DataFrames and Series
import pandas as pd
# Returns the indices of the maximum values along an axis
from numpy import argmax
# MODELLING
# Logistic regression
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
# XGBoosted Decision Trees
import xgboost as xgb
# REPORTING, EVALUATION, AND INTERPRETATION
# Classification report
from sklearn.metrics import classification_report
# Receiver Operating Characteristic (ROC) curve
from sklearn.metrics import roc_curve
# Evaluate a score by cross-validation
from sklearn.model_selection import cross_val_score
# Functions
def drop_columns(df, columns):
return df.drop(columns, axis=1)
def remove_less_than_0_columns(df, column):
    # Drop rows with missing values in the column, then keep only rows
    # where the column value is non-zero
    df = df.dropna(subset=[column])
    return df.loc[df[column] != 0]
def boolean_int_condition_label(df, label_column_name, condition):
df[label_column_name] = condition
y = df[label_column_name].astype(int)
df = drop_columns(df, label_column_name)
return y, df
@st.cache(suppress_st_warning=True)
def undersample_training_data(
df: pd.DataFrame, column_name: str, split_dataset
):
count_nondefault, count_default = split_dataset.X_y_train[
column_name
].value_counts()
    nondefaults = df[df[column_name] == 0]  # rows labelled non-default (0)
    defaults = df[df[column_name] == 1]  # rows labelled default (1)
under_sample = min(count_nondefault, count_default)
nondefaults_under = nondefaults.sample(under_sample)
defaults_under = defaults.sample(under_sample)
X_y_train_under = pd.concat(
[
nondefaults_under.reset_index(drop=True),
defaults_under.reset_index(drop=True),
],
axis=0,
)
X_train_under = X_y_train_under.drop([column_name], axis=1) # remove label
y_train_under = X_y_train_under[column_name] # label only
class_balance_default = X_y_train_under[column_name].value_counts()
return [
X_train_under,
y_train_under,
X_y_train_under,
class_balance_default,
]
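# Illustrative sketch, not called by the app: one way undersample_training_data
# might be exercised on a toy frame. The "loan_status" column name and the
# SimpleNamespace stand-in for the app's split-dataset object are assumptions
# for this example, and it presumes st.cache can hash these toy arguments.
def _example_undersample_training_data():
    from types import SimpleNamespace
    rng = np.random.default_rng(0)
    toy = pd.DataFrame(
        {
            "loan_int_rate": rng.normal(10, 2, size=100),
            "loan_status": np.r_[
                np.zeros(80, dtype=int), np.ones(20, dtype=int)
            ],
        }
    )
    toy_split = SimpleNamespace(X_y_train=toy)
    X_under, y_under, X_y_under, class_balance = undersample_training_data(
        toy, "loan_status", toy_split
    )
    # Both classes now have the minority-class count (20 rows each)
    return class_balance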
def create_coeffient_feature_dictionary_logistic_model(
logistic_model, training_data
):
return {
feat: coef
for coef, feat in zip(
logistic_model.coef_[0, :], training_data.columns
)
}
@st.cache(suppress_st_warning=True)
def test_variables_logistic(X_train, y_train):
# Create and fit the logistic regression model
return LogisticRegression(solver="lbfgs").fit(X_train, np.ravel(y_train))
@st.cache(suppress_st_warning=True)
def print_coeff_logistic(clf_logistic_model, split_dataset):
# Dictionary of features and their coefficients
return create_coeffient_feature_dictionary_logistic_model(
clf_logistic_model, split_dataset.X_train
)
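# Illustrative sketch, not called by the app: fit a small logistic model and
# read its coefficients back as a feature -> coefficient dictionary. The toy
# feature names and data are assumptions for this example.
def _example_logistic_coefficients():
    rng = np.random.default_rng(1)
    X_toy = pd.DataFrame(
        {
            "loan_int_rate": rng.normal(10, 2, size=50),
            "person_emp_length": rng.integers(0, 10, size=50),
        }
    )
    y_toy = pd.Series(rng.integers(0, 2, size=50), name="loan_status")
    clf = LogisticRegression(solver="lbfgs").fit(X_toy, np.ravel(y_toy))
    # Maps each feature name to its fitted coefficient
    return create_coeffient_feature_dictionary_logistic_model(clf, X_toy)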
@st.cache(suppress_st_warning=True, hash_funcs={
xgb.XGBClassifier: pickle.dumps
})
def test_variables_gbt(X_train, y_train):
    # Gradient boosted trees using the learning_rate and max_depth
    # hyperparameters; eval_metric is set once on the classifier
    return xgb.XGBClassifier(
        learning_rate=0.1,
        max_depth=7,
        use_label_encoder=False,
        eval_metric="logloss",
    ).fit(X_train, np.ravel(y_train))
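# Illustrative sketch, not called by the app: fit the gradient boosted tree
# model on toy data and score one row. Feature names and values are made up
# for this example.
def _example_gbt_fit():
    rng = np.random.default_rng(2)
    X_toy = pd.DataFrame(rng.normal(size=(100, 3)), columns=["a", "b", "c"])
    y_toy = pd.Series(rng.integers(0, 2, size=100))
    clf_gbt = test_variables_gbt(X_toy, y_toy)
    # Predicted probability of default for the first toy row
    return clf_gbt.predict_proba(X_toy.iloc[[0]])[:, 1]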
def get_df_trueStatus_probabilityDefault_threshStatus_loanAmount(
model, X, y, threshold, loan_amount_col_name
):
true_status = y.to_frame()
loan_amount = X[loan_amount_col_name]
clf_prediction_prob = model.predict_proba(np.ascontiguousarray(X))
clf_prediction_prob_df = pd.DataFrame(
clf_prediction_prob[:, 1], columns=["PROB_DEFAULT"]
)
clf_thresh_predicted_default_status = (
clf_prediction_prob_df["PROB_DEFAULT"]
.apply(lambda x: 1 if x > threshold else 0)
.rename("PREDICT_DEFAULT_STATUS")
)
return pd.concat(
[
true_status.reset_index(drop=True),
clf_prediction_prob_df.reset_index(drop=True),
clf_thresh_predicted_default_status.reset_index(drop=True),
loan_amount.reset_index(drop=True),
],
axis=1,
)
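# Illustrative sketch, not called by the app: build the dataframe of true
# status, predicted default probability, thresholded status, and loan amount
# for a toy logistic model. The column names and the 0.5 threshold are
# assumptions for this example.
def _example_probability_threshold_frame():
    rng = np.random.default_rng(3)
    X_toy = pd.DataFrame(
        {
            "loan_int_rate": rng.normal(10, 2, size=60),
            "loan_amnt": rng.integers(1_000, 20_000, size=60),
        }
    )
    y_toy = pd.Series(rng.integers(0, 2, size=60), name="loan_status")
    clf = LogisticRegression(solver="lbfgs").fit(X_toy, np.ravel(y_toy))
    return get_df_trueStatus_probabilityDefault_threshStatus_loanAmount(
        clf, X_toy, y_toy, 0.5, "loan_amnt"
    )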
# Helpers that build the probability-of-default dataframe and apply a
# decision threshold to get a predicted default status
def model_probability_values_df(model, X):
return pd.DataFrame(model.predict_proba(X)[:, 1], columns=["PROB_DEFAULT"])
def apply_threshold_to_probability_values(probability_values, threshold):
return (
probability_values["PROB_DEFAULT"]
.apply(lambda x: 1 if x > threshold else 0)
.rename("PREDICT_DEFAULT_STATUS")
)
@st.cache(suppress_st_warning=True)
def find_best_threshold_J_statistic(y, clf_prediction_prob_df):
fpr, tpr, thresholds = roc_curve(y, clf_prediction_prob_df)
# get the best threshold
J = tpr - fpr # Youden’s J statistic
ix = argmax(J)
return thresholds[ix]
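# Illustrative sketch, not called by the app: choose a threshold with Youden's
# J statistic on made-up labels and predicted probabilities.
def _example_best_threshold():
    y_toy = pd.Series([0, 0, 0, 1, 1, 0, 1, 0, 1, 1])
    prob_toy = pd.Series([0.1, 0.2, 0.3, 0.8, 0.7, 0.4, 0.9, 0.2, 0.6, 0.5])
    # Threshold that maximises tpr - fpr on the toy data
    return find_best_threshold_J_statistic(y_toy, prob_toy)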
def create_cross_validation_df(
X, y, eval_metric, seed, trees, n_folds, early_stopping_rounds
):
    # Convert the training data into XGBoost's DMatrix format
    DTrain = xgb.DMatrix(X, label=y)
    # eval_metric is "auc" or "logloss"
    params = {
        "eval_metric": eval_metric,
        "objective": "binary:logistic",  # binary loan status (0 or 1)
        "seed": seed,
    }
# Create the data frame of cross validations
cv_df = xgb.cv(
params,
DTrain,
num_boost_round=trees,
nfold=n_folds,
early_stopping_rounds=early_stopping_rounds,
shuffle=True,
)
return [DTrain, cv_df]
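# Illustrative sketch, not called by the app: run the cross-validation helper
# on toy data. The metric, seed, tree count, fold count, and early-stopping
# setting are arbitrary choices for this example.
def _example_create_cross_validation_df():
    rng = np.random.default_rng(4)
    X_toy = pd.DataFrame(rng.normal(size=(120, 3)), columns=["a", "b", "c"])
    y_toy = pd.Series(rng.integers(0, 2, size=120))
    DTrain, cv_df = create_cross_validation_df(
        X_toy, y_toy, "auc", 123, trees=25, n_folds=3, early_stopping_rounds=5
    )
    # cv_df has one row per boosting round with train/test metric mean and std
    return cv_df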
def cross_validation_scores(model, X, y, nfold, score, seed):
# return cv scores of metric
return cross_val_score(
model,
np.ascontiguousarray(X),
np.ravel(np.ascontiguousarray(y)),
cv=StratifiedKFold(n_splits=nfold, shuffle=True, random_state=seed),
scoring=score,
)
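# Illustrative sketch, not called by the app: stratified cross-validated AUC
# scores for a toy XGBoost classifier. The fold count, scoring name, and seed
# are arbitrary choices for this example.
def _example_cross_validation_scores():
    rng = np.random.default_rng(5)
    X_toy = pd.DataFrame(rng.normal(size=(120, 3)), columns=["a", "b", "c"])
    y_toy = pd.Series(rng.integers(0, 2, size=120))
    clf = xgb.XGBClassifier(
        use_label_encoder=False, eval_metric="logloss", n_estimators=25
    )
    return cross_validation_scores(
        clf, X_toy, y_toy, nfold=3, score="roc_auc", seed=123
    )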
def default_status_per_threshold(threshold_list, prob_default):
threshold_default_status_list = []
for threshold in threshold_list:
threshold_default_status = prob_default.apply(
lambda x: 1 if x > threshold else 0
)
threshold_default_status_list.append(threshold_default_status)
return threshold_default_status_list
def classification_report_per_threshold(
threshold_list, threshold_default_status_list, y_test
):
target_names = ["Non-Default", "Default"]
classification_report_list = []
for threshold_default_status in threshold_default_status_list:
thresh_classification_report = classification_report(
y_test,
threshold_default_status,
target_names=target_names,
output_dict=True,
zero_division=0,
)
classification_report_list.append(thresh_classification_report)
# Return threshold classification report dict
return dict(zip(threshold_list, classification_report_list))
def thresh_classification_report_recall_accuracy(
thresh_classification_report_dict,
):
thresh_def_recalls_list = []
thresh_nondef_recalls_list = []
thresh_accs_list = []
    for x in thresh_classification_report_dict:
thresh_def_recall = thresh_classification_report_dict[x]["Default"][
"recall"
]
thresh_def_recalls_list.append(thresh_def_recall)
thresh_nondef_recall = thresh_classification_report_dict[x][
"Non-Default"
]["recall"]
thresh_nondef_recalls_list.append(thresh_nondef_recall)
thresh_accs = thresh_classification_report_dict[x]["accuracy"]
thresh_accs_list.append(thresh_accs)
return [
thresh_def_recalls_list,
thresh_nondef_recalls_list,
thresh_accs_list,
]
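# Illustrative sketch, not called by the app: sweep a few thresholds, build a
# classification report per threshold, and pull out the recall and accuracy
# lists. The thresholds and toy data are assumptions for this example.
def _example_threshold_reports():
    rng = np.random.default_rng(6)
    y_toy = pd.Series(rng.integers(0, 2, size=80))
    prob_toy = pd.Series(rng.uniform(0, 1, size=80))
    thresholds = [0.3, 0.5, 0.7]
    statuses = default_status_per_threshold(thresholds, prob_toy)
    reports = classification_report_per_threshold(thresholds, statuses, y_toy)
    def_recalls, nondef_recalls, accuracies = (
        thresh_classification_report_recall_accuracy(reports)
    )
    return def_recalls, nondef_recalls, accuracies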
def create_accept_rate_list(start, end, samples):
return np.linspace(start, end, samples, endpoint=True)
def create_strategyTable_df(
start, end, samples, actual_probability_predicted_acc_rate, true, currency
):
accept_rates = create_accept_rate_list(start, end, samples)
thresholds_strat = []
bad_rates_start = []
Avg_Loan_Amnt = actual_probability_predicted_acc_rate[true].mean()
num_accepted_loans_start = []
    for rate in accept_rates:
        # Calculate the threshold for this acceptance rate
        thresh = np.quantile(
            actual_probability_predicted_acc_rate["PROB_DEFAULT"], rate
        ).round(3)
        # Add the threshold value to the list of thresholds
        thresholds_strat.append(thresh)
# Reassign the loan_status value using the threshold
actual_probability_predicted_acc_rate[
"PREDICT_DEFAULT_STATUS"
] = actual_probability_predicted_acc_rate["PROB_DEFAULT"].apply(
lambda x: 1 if x > thresh else 0
)
# Create a set of accepted loans using this acceptance rate
accepted_loans = actual_probability_predicted_acc_rate[
actual_probability_predicted_acc_rate["PREDICT_DEFAULT_STATUS"]
== 0
]
        # Calculate and append the bad rate (share of true defaults among
        # accepted loans) for this acceptance rate
        bad_rates_start.append(accepted_loans[true].mean().round(3))
# Accepted loans
num_accepted_loans_start.append(len(accepted_loans))
# Calculate estimated value
money_accepted_loans = [
accepted_loans * Avg_Loan_Amnt
for accepted_loans in num_accepted_loans_start
]
money_bad_accepted_loans = [
2 * money_accepted_loan * bad_rate
for money_accepted_loan, bad_rate in zip(
money_accepted_loans, bad_rates_start
)
]
zip_object = zip(money_accepted_loans, money_bad_accepted_loans)
estimated_value = [
money_accepted_loan - money_bad_accepted_loan
for money_accepted_loan, money_bad_accepted_loan in zip_object
]
accept_rates = ["{:.2f}".format(elem) for elem in accept_rates]
thresholds_strat = ["{:.2f}".format(elem) for elem in thresholds_strat]
bad_rates_start = ["{:.2f}".format(elem) for elem in bad_rates_start]
estimated_value = ["{:.2f}".format(elem) for elem in estimated_value]
return (
pd.DataFrame(
zip(
accept_rates,
thresholds_strat,
bad_rates_start,
num_accepted_loans_start,
estimated_value,
),
columns=[
"Acceptance Rate",
"Threshold",
"Bad Rate",
"Num Accepted Loans",
f"Estimated Value ({currency})",
],
)
.sort_values(by="Acceptance Rate", axis=0, ascending=False)
.reset_index(drop=True)
)
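# Illustrative sketch, not called by the app: build a small strategy table
# from a toy scored dataframe. The column names mirror those produced by
# get_df_trueStatus_probabilityDefault_threshStatus_loanAmount, but the data,
# acceptance-rate grid, true-status column, and currency label are assumptions
# for this example.
def _example_strategy_table():
    rng = np.random.default_rng(7)
    scored = pd.DataFrame(
        {
            "loan_status": rng.integers(0, 2, size=100),
            "PROB_DEFAULT": rng.uniform(0, 1, size=100),
            "PREDICT_DEFAULT_STATUS": 0,
            "loan_amnt": rng.integers(1_000, 20_000, size=100),
        }
    )
    return create_strategyTable_df(
        start=0.5,
        end=1.0,
        samples=6,
        actual_probability_predicted_acc_rate=scored,
        true="loan_status",
        currency="USD",
    )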