# Hugging Face Spaces page status captured together with this file: "Spaces: Runtime error".
import streamlit as st | |
import pandas as pd | |
import PIL | |
import ipywidgets | |
from joblib import dump, load | |
from bokeh.models.widgets import Div | |
import main_app | |
import utils | |
import email | |
import re | |
from bs4 import BeautifulSoup | |
import numpy as np | |
def add_bg_from_url():
    """Inject the CSS that gives the app a fixed linear-gradient background."""
    # A plain string is enough here: nothing is interpolated, so the CSS braces
    # need no escaping and the text handed to Streamlit is unchanged.
    gradient_css = """
<style>
.stApp {
background-image: linear-gradient(#45eff5,#1C8D99);
background-attachment: fixed;
background-size: cover
}
</style>
"""
    st.markdown(gradient_css, unsafe_allow_html=True)


add_bg_from_url()
def table_data():
    """Return the model fact sheet as a two-column DataFrame.

    The frame has the columns ``Field`` and ``Data`` and one row per fact,
    in the order listed below.
    """
    # (field, value) pairs, kept side by side so each row is readable at a glance.
    rows = (
        ('Data Scientist', 'Buwani'),
        ('Dataset', 'Internal + Campaign monitor'),
        ('Algorithm', 'Random Forest'),
        ('Framework', 'Sci-kit learn'),
        ('Ensemble', 'Bootstrapping'),
        ('Domain', 'Bootstrapping Aggregation'),
        ('Model Size', '60.3 KB'),
    )
    fields, values = zip(*rows)
    return pd.DataFrame.from_dict({'Field': list(fields), 'Data': list(values)})
def url_button(button_name, url):
    """Show a Streamlit button labelled ``button_name`` that opens ``url`` when clicked.

    Nothing is rendered beyond the button itself until it is clicked; on a
    click a Bokeh ``Div`` carrying a small script is added to the page.
    """
    if not st.button(button_name):
        return
    open_script = f"window.open('{url}')"  # New tab or window
    # The <img> has no source, so its onerror handler is what carries the script.
    markup = f'<img src onerror="{open_script}">'
    st.bokeh_chart(Div(text=markup))
# Default the 'generate_pred' flag to False the first time it is missing
# from the session state.
if 'generate_pred' not in st.session_state:
    st.session_state.generate_pred = False

st.markdown("# Call to Action: Email Industry")

# Four equal-width columns, each showing one headline caption.
stats_col1, stats_col2, stats_col3, stats_col4 = st.columns([1, 1, 1, 1])
for stats_column, stats_caption in (
    (stats_col1, "Production: Development"),
    (stats_col2, "Accuracy: 80.49%"),
    (stats_col3, "Speed: 0.004ms"),
    (stats_col4, "Industry: Email"),
):
    with stats_column:
        st.caption(stats_caption)
# `import PIL` on its own does not load the Image submodule; import it
# explicitly so PIL.Image is guaranteed to exist where it is used below.
import PIL.Image

# Sidebar: model description, model fact sheet and external links.
with st.sidebar:
    with st.expander('Model Description', expanded=False):
        img = PIL.Image.open("figures/ModelCTA.png")
        st.image(img)
        st.markdown('This model aims to provide email campaign services and campaign engineers with a greater understanding of how to format your Call-To-Action (CTA) features, trained on a large corpus of email campaign CTA successes and failures. This model provides real-time predictive analytics recommendations to suggest optimal CTAs focusing the users attention to the right text and color of your CTA content. The Loxz Digital CTA Feature Selection will provide the best way to send out campaigns without the opportunity cost and time lapse of A/B testing. Email metrics are provided prior to campaign launch and determine the optimal engagement rate based on several factors, including several inputs by the campaign engineer.')
    with st.expander('Model Information', expanded=False):
        # CSS that hides the row-index column of the table rendered below.
        hide_table_row_index = """
<style>
thead tr th:first-child {display:none}
tbody th {display:none}
</style>
"""
        st.markdown(hide_table_row_index, unsafe_allow_html=True)
        st.table(table_data())
    url_button('Model Homepage', 'https://www.loxz.com/#/models/CTA')
    # url_button('Full Report','https://resources.loxz.com/reports/realtime-ml-character-count-model')
    url_button('Amazon Market Place', 'https://aws.amazon.com/marketplace')
# Choices offered by the select boxes on the main page. The order here is the
# order in which the options are passed to the widgets.
industry_lists = [
    'Academic and Education', 'Entertainment', 'Financial', 'Healthcare',
    'Hospitality', 'Retail', 'Software and Technology', 'Transportation',
]

campaign_types = [
    'Abandoned_Cart', 'Newsletter', 'Promotional',
    'Survey', 'Transactional', 'Webinar',
    'Engagement', 'Review_Request', 'Product_Announcement',
]

target_variables = ['Click_To_Open_Rate', 'Conversion_Rate']

call2action = ['Color', 'Text', 'Both']
# Main-page inputs: the email upload followed by four select boxes. The
# variables below hold whatever each widget returns on the current run.
uploaded_file = st.file_uploader(
    label="Please upload your email (In HTML Format)",
    type=["html"],
)
industry = st.selectbox(label='Please select your industry', options=industry_lists)
campaign = st.selectbox(label='Please select your campaign type', options=campaign_types)
target = st.selectbox(label='Please select your target variable', options=target_variables)
call2action_feature = st.selectbox(
    label='Select the Call-To-Action Feature you would like to analyze for predictive analytics',
    options=call2action,
)
def generate_cta_list(num_text):
    """Return 'CTA Number 1' .. 'CTA Number <num_text>' followed by 'All'."""
    numbered = [f'CTA Number {position}' for position in range(1, num_text + 1)]
    return numbered + ['All']
def display_CTA(text, color):
    """Build an HTML preview with one coloured button per Call-To-Action.

    Args:
        text: Sequence of CTA labels; each becomes the ``value`` of a button.
        color: Sequence of CSS colours, indexed in step with ``text``; each
            becomes the ``background-color`` of the matching button.

    Returns:
        A string with one ``<input type="button">`` per label, separated by
        ``<br>``. An empty ``text`` yields an empty string.

    Raises:
        IndexError: If ``color`` has fewer entries than ``text``.
    """
    import html  # local import keeps this block self-contained

    buttons = []
    for index, label in enumerate(text):
        # Both values are interpolated into HTML attributes, so they are
        # escaped (quotes included) to keep arbitrary label or colour text
        # from breaking out of the attribute.
        buttons.append("""
CTA Number {}:
<input type="button"
style="background-color:{};
color:black;
width:150px;
height:30px;
margin:4px"
value="{}">""".format(
            index + 1,
            html.escape(str(color[index]), quote=True),
            html.escape(str(label), quote=True),
        ))
    return "<br>".join(buttons)
# Matches anything that looks like an HTML tag left inside a link's text.
_TAG_PATTERN = re.compile('<.*?>')
# Style properties that must all appear for a link to be treated as a button.
_BUTTON_STYLE_MARKERS = ('background-color', 'display', 'border-radius')


def _read_stripped_lines(path):
    """Return every line of the text file at ``path``, stripped of surrounding whitespace."""
    with open(path, 'r') as handle:
        return [line.strip() for line in handle]


def _rgb_to_hex(channels):
    """Format a triple of integers as a lower-case ``#rrggbb`` string."""
    red, green, blue = channels
    return '#{:02x}{:02x}{:02x}'.format(red, green, blue)


def _button_background(style):
    """Return the background colour of a button-like inline style, or ``None``.

    ``None`` is returned when the style lacks any of the button markers or
    when its background colour cannot be parsed. If the property is declared
    more than once, the last declaration wins.
    """
    style = style.replace('\r', '').replace('\n', '')
    if not all(marker in style for marker in _BUTTON_STYLE_MARKERS):
        return None

    background = None
    for declaration in style.split(';'):
        if 'background-color' not in declaration:
            continue
        pieces = declaration.split(':')
        if len(pieces) < 2:
            return None
        value = pieces[1].lower().replace('!important', '').replace('=', '')
        if value.strip() == 'transparent':
            value = '#00ffffff'
        if 'rgb' in value:
            try:
                inner = value[value.index('(') + 1:value.index(')')]
                red, green, blue = (int(part) for part in inner.split(',')[:3])
            except ValueError:
                # Missing parentheses, non-integer channel or fewer than
                # three channels: treat the colour as unusable.
                return None
            value = _rgb_to_hex((red, green, blue))
        background = value.strip()
    return background


def parse_features_from_html(body, soup):
    """Extract the visible text and the Call-To-Action buttons of an email.

    A link counts as a CTA button when it has an ``href``, an inline style
    containing ``background-color``, ``display`` and ``border-radius``, a
    plain-string body, and a label that is either listed in
    ``cta_text_list.txt`` or contains an entry of ``cta_verbs_list.txt``.

    Args:
        body: Raw email body. Not used by this function; kept so existing
            callers keep working.
        soup: Parsed document exposing ``get_text()`` and ``find_all()``.

    Returns:
        ``(vtexts, op_color, op_text)``: the whitespace-collapsed document
        text after ``preprocess_text``, and two parallel lists with the
        background colour and lower-cased label of every accepted CTA.
        Both lists are empty when no CTA is found.
    """
    cta_phrases = set(_read_stripped_lines('cta_text_list.txt'))
    cta_verbs = _read_stripped_lines('cta_verbs_list.txt')

    vtexts = " ".join(preprocess_text(soup.get_text()).split())

    op_color = []  # Output text and color lists, always appended in lockstep
    op_text = []
    for anchor in soup.find_all('a', {'href': True}):
        style = anchor.get('style')
        if not style:
            continue
        background = _button_background(style)
        if background is None:
            continue
        raw_label = anchor.string
        if raw_label is None:
            # No single text child: skip the link entirely so colours and
            # labels cannot drift out of step.
            continue
        label = _TAG_PATTERN.sub('', raw_label.replace('\n', '').replace('\t', ' ')).lower()
        label = label.replace('→', '').strip()
        # any() stops at the first matching verb, so a label is emitted once
        # even when it contains several verbs.
        if label in cta_phrases or any(verb in label for verb in cta_verbs):
            op_text.append(label)
            op_color.append(background)
    return vtexts, op_color, op_text
def email_parser(parsed_email):
    """Parse an uploaded email and cache its text and CTA features.

    Args:
        parsed_email: The uploaded email. Objects exposing ``getvalue()``
            (such as an in-memory upload) are read directly; anything else
            is opened from disk via its ``name`` attribute.

    Returns:
        ``(vtext, ccolor, text)`` as produced by ``parse_features_from_html``.
        The same three values are also stored in ``st.session_state`` under
        the keys ``vtext``, ``ccolor`` and ``text``.
    """
    if hasattr(parsed_email, 'getvalue'):
        # An upload lives in memory; its name is not a path on disk.
        raw = parsed_email.getvalue()
        emailstr = raw.decode('utf-8', errors='replace') if isinstance(raw, bytes) else raw
    else:
        with open(parsed_email.name, 'r') as efile:
            emailstr = efile.read()

    message = email.message_from_string(emailstr)
    body = ""
    # Every walked part overwrites `body`, so the payload of the last part is
    # what ends up being parsed.
    for part in message.walk():
        if part.get_content_type():
            body = str(part.get_payload())

    doc = preprocess_text(body)
    # Name the parser explicitly so the result does not depend on which
    # optional parser libraries happen to be installed.
    soup = BeautifulSoup(doc, 'html.parser')
    vtext, ccolor, text = parse_features_from_html(body, soup)

    # save to session state
    st.session_state.vtext = vtext
    st.session_state.ccolor = ccolor
    st.session_state.text = text
    return vtext, ccolor, text
## "=",=3D removed from html_tags.csv
def preprocess_text(doc):
    """Replace the ``=XX`` codes listed in ``html_tags.csv`` with their characters.

    Each CSV row is read as ``<symbol>,<code>``. A row that splits into more
    than two fields is taken to be the comma symbol with the code in the third
    field. After the table-driven replacements, ``=`` followed by a newline is
    removed and ``=3D`` is turned into a literal ``=``.

    Args:
        doc: Text to clean.

    Returns:
        The cleaned text.
    """
    # Codes that always map to a fixed character, whatever the CSV's first
    # column says.
    fixed_characters = {'=09': '\t', '=0D': '\n', '=0A': '\n', '=22': '"'}

    tags = {}
    with open('html_tags.csv', 'r') as html_tags:
        for line in html_tags:
            ln = line.strip().split(',')
            if len(ln) < 2:
                continue  # blank or malformed row: nothing to map
            symbol = ln[0].strip('"')
            code = ln[1]
            if len(ln) > 2:
                # Presumably the quoted-comma row, which the plain split
                # breaks into three fields.
                symbol = ','
                code = ln[2]
            if not code:
                continue  # an empty key would insert text between every character
            tags[code] = fixed_characters.get(code, symbol)

    for key, val in tags.items():
        doc = doc.replace(key, val)

    # Shield encoded '=' signs while "=\n" sequences are removed, then
    # restore them as literal '='.
    doc = doc.replace('=3D', '%3D')
    doc = doc.replace('=\n', '')
    doc = doc.replace('%3D', '=')
    return doc
## Select which CTA to be used for analysis
def select_cta_button(ccolor, text):
    """Render one button per detected CTA and report which ones were clicked.

    Args:
        ccolor: Background colours of the detected CTAs; each colour string
            is used as the label of its button.
        text: Labels of the detected CTAs; only their count is displayed.

    Returns:
        A list with one object per entry of ``ccolor``. Each object has a
        boolean ``value`` attribute holding what ``st.button`` returned for
        that CTA, so callers can test ``item.value`` per CTA.
    """
    from types import SimpleNamespace  # local import keeps this block self-contained

    print("\nNumber of Call-To-Actions in the email:", len(text), '\n')
    print('Select which Call-To-Action button(s) you would like to analyze: \n')
    st.write("\nNumber of Call-To-Actions in the email:", len(text), '\n')
    st.write('Select which Call-To-Action button(s) you would like to analyze: \n')

    user_input = []
    for idx, button_color in enumerate(ccolor):
        # The positional key keeps buttons distinct even when two CTAs share
        # the same colour label.
        clicked = st.button(button_color, key=idx)
        user_input.append(SimpleNamespace(value=clicked))
    return user_input
def save_state():
    """Copy the current inputs into the session state and parse the upload.

    Does nothing while no file is uploaded. Otherwise the option lists, the
    uploaded file and the four current selections are written to
    ``st.session_state`` on every call, so later changes to a selection or a
    newly uploaded file replace the previously stored values. The uploaded
    file is then parsed with ``email_parser``, which stores ``vtext``,
    ``ccolor`` and ``text`` in the session state.
    """
    if uploaded_file is None:
        return

    st.session_state.industry_lists = industry_lists
    st.session_state.campaign_types = campaign_types
    st.session_state.target_variables = target_variables
    st.session_state.call2action = call2action
    st.session_state.uploaded_file = uploaded_file
    st.session_state.industry = industry
    st.session_state.campaign = campaign
    st.session_state.target = target
    st.session_state.call2action_feature = call2action_feature

    email_parser(st.session_state.uploaded_file)


save_state()
### Read in data
def import_data(bucket, key):
    """Read a CSV object from S3 into a DataFrame.

    Args:
        bucket: Bucket name, optionally followed by a key prefix. A leading
            ``s3://`` is accepted and ignored, so both ``'name/prefix'`` and
            ``'s3://name/prefix'`` address the same location.
        key: Object key relative to ``bucket``.

    Returns:
        The CSV contents read with ISO-8859-1 encoding. The first CSV column
        is consumed as the index and then dropped by the index reset.
    """
    scheme = 's3://'
    if bucket.startswith(scheme):
        # Callers may pass a full URL; avoid building 's3://s3://...'.
        bucket = bucket[len(scheme):]
    location = 's3://{}/{}'.format(bucket, key)
    df_data = pd.read_csv(location, encoding="ISO-8859-1", index_col=0)
    df_data = df_data.reset_index(drop=True)
    return df_data
### Model Training
def get_predictions(selected_variable, selected_industry, selected_campaign,
                    selected_cta, email_text, cta_col, cta_txt, cta_menu):
    """Print a predicted rate and recommendations for every selected CTA.

    The function loads the training data and a fitted model, then, for each
    entry of ``cta_menu`` whose ``value`` is true, encodes the chosen
    industry, campaign, colour and text, predicts ``selected_variable`` and
    prints the prediction followed by up to three recommendations. All output
    goes through ``print``; nothing is returned.

    Args:
        selected_variable: 'Click_To_Open_Rate' or 'Conversion_Rate'; selects
            the test-set file name and the model key, and names the column
            used for ranking recommendations.
        selected_industry: Industry used to look up ``industry_code`` and to
            filter the recommendation rows.
        selected_campaign: Campaign used to look up ``campaign_code``.
        selected_cta: 'Color', 'Text' or 'Both'; picks the recommendation
            branch. Any other value prints no recommendations.
        email_text: Email text used for the similarity lookup in the 'Text'
            and 'Both' branches.
        cta_col: CTA colours, indexed in step with ``cta_menu``.
        cta_txt: CTA texts, indexed in step with ``cta_menu``.
        cta_menu: Iterable of objects with a ``value`` attribute, one per CTA.

    NOTE(review): ``tempfile``, ``s3_client``, ``joblib``, ``text_embeddings``,
    ``normalize``, ``random``, ``color`` and ``simple_preprocess`` are used
    below but are not defined in the part of the file visible here (only
    ``dump`` and ``load`` are imported from joblib) — confirm where they
    come from.
    """
    bucket_name = 'sagemakermodelcta'
    # NOTE(review): any other selected_variable leaves X_name and key unbound.
    if selected_variable == 'Click_To_Open_Rate':
        X_name = 'Xtest_MLP_CTOR.csv'
        # y_name = 'ytest_MLP_CTOR.csv'
        key = 'modelCTA_MLP_CTOR.sav'
    elif selected_variable == 'Conversion_Rate':
        X_name = 'Xtest_MLP_ConversionRate.csv'
        # y_name = 'ytest_MLP_Conversion_Rate.csv'
        key = 'modelCTA_MLP_ConversionRate.sav'
    # NOTE(review): the bucket argument already carries an 's3://' prefix —
    # confirm import_data expects that form.
    training_dataset = import_data('s3://emailcampaigntrainingdata/ModelCTA', 'recommendations.csv')
    # X_test is loaded but not used further down in this function.
    X_test = import_data('s3://emailcampaigntrainingdata/ModelCTA', X_name)
    # y_test = import_data('s3://emailcampaigntrainingdata/ModelCTA', y_name)
    # load model from S3
    with tempfile.TemporaryFile() as fp:
        s3_client.download_fileobj(Fileobj=fp, Bucket=bucket_name, Key=key)
        fp.seek(0)
        regr = joblib.load(fp)
    # Map the first field of every training row to its fifth field, keeping
    # the first occurrence of each key.
    email_body_dict = {}
    for _, r in training_dataset.iterrows():
        if r[0] not in email_body_dict.keys():
            email_body_dict[r[0]] = r[4]
    email_body = email_body_dict.keys()
    texts = list(email_body_dict.values())
    # texts = training_dataset['body'].unique() ## Use email body for NLP
    # texts = training_dataset['cta_text'].unique()
    # y_pred = regr.predict(X_test)
    # print(X_test)
    # r2_test = r2_score(y_test, y_pred)
    ## Get recommendation
    recom_model = text_embeddings(email_body)
    # recom_model = text_embeddings()
    # Lookup tables from the raw category values to their numeric codes.
    industry_code_dict = dict(zip(training_dataset.industry, training_dataset.industry_code))
    campaign_code_dict = dict(zip(training_dataset.campaign, training_dataset.campaign_code))
    color_code_dict = dict(zip(training_dataset.cta_color, training_dataset.color_code))
    text_code_dict = dict(zip(training_dataset.cta_text, training_dataset.text_code))
    for ip_idx, ip in enumerate(cta_menu): # For each CTA selected
        if ip.value == True:
            print(f'\n\x1b[4mCall-To-Action button {int(ip_idx)+1}\x1b[0m: ')
            cta_ind = ip_idx
            selected_color = cta_col[cta_ind]
            selected_text = cta_txt[cta_ind]
            df_uploaded = pd.DataFrame(columns=['industry', 'campaign', 'cta_color', 'cta_text'])
            # NOTE(review): the whole cta_col / cta_txt lists are stored here,
            # not selected_color / selected_text; these four columns are
            # dropped again before prediction.
            df_uploaded.loc[0] = [selected_industry, selected_campaign, cta_col, cta_txt]
            df_uploaded['industry_code'] = industry_code_dict.get(selected_industry)
            # df_uploaded['campaign_code'] = campaign_code_dict.get(selected_campaign)
            # Values missing from a lookup table get the next free code
            # (current maximum + 1).
            if selected_campaign not in campaign_code_dict.keys():
                campaign_code_dict[selected_campaign] = max(campaign_code_dict.values()) + 1
            df_uploaded['campaign_code'] = campaign_code_dict.get(selected_campaign)
            if selected_color not in color_code_dict.keys():
                color_code_dict[selected_color] = max(color_code_dict.values()) + 1
            df_uploaded['color_code'] = color_code_dict.get(selected_color)
            if selected_text not in text_code_dict.keys():
                text_code_dict[selected_text] = max(text_code_dict.values()) + 1
            df_uploaded['text_code'] = text_code_dict.get(selected_text)
            df_uploaded_test = df_uploaded.drop(['industry', 'campaign', 'cta_color', 'cta_text'],
                                                axis = 1, inplace = False)
            df_uploaded_test = df_uploaded_test.dropna()
            # df_testset = df_uploaded_test.copy()
            # if selected_cta == 'Text':
            # for k in text_code_dict.keys():
            # df_temp = df_uploaded_test.copy()
            # df_temp.text_code = text_code_dict.get(k)
            # df_testset = pd.concat([df_testset, df_temp], ignore_index=True)
            # # print(df_testset.drop_duplicates())
            # arr = df_testset.to_numpy().astype('float64')
            # predicted_rate = regr.predict(arr)
            # sorted_index_array = np.argsort(predicted_rate)
            # sorted_array = predicted_rate[sorted_index_array]
            # print(sorted_array[-3 : ])
            #print('Length', arr.size)
            arr = df_uploaded_test.to_numpy().astype('float64')
            arr_norm = normalize(arr, norm = 'l2')
            predicted_rate = regr.predict(arr_norm)[0]
            output_rate = predicted_rate
            if output_rate < 0:
                print("Sorry, Current model couldn't provide predictions on the target variable you selected.")
            else:
                print(f'\x1b[35m\nModel Prediction on the {selected_variable} is: \x1b[1m{round(output_rate*100, 2)}%\x1b[39m\x1b[22m')
            selected_industry_code = industry_code_dict.get(selected_industry)
            selected_campaign_code = campaign_code_dict.get(selected_campaign)
            ### Create dataset for recommendation
            # select the certain industry that user selected
            ###+++++use training data+++++++
            df_recom = training_dataset[["industry_code", "campaign_code", "cta_color", "cta_text",
                                         selected_variable]]
            df_recom = df_recom[df_recom["industry_code"] == selected_industry_code]
            # df_recom = df_recom[df_recom["campaign_code"] == selected_campaign_code]
            df_recom[selected_variable]=df_recom[selected_variable].apply(lambda x:round(x, 5))
            # Ascending sort, so keep='last' below keeps the highest rate of
            # each duplicate group.
            df_recom_sort = df_recom.sort_values(by=[selected_variable])
            ## Filter recommendations for either CTA text or color
            recom_ind = 0
            if selected_cta == 'Color':
                df_recom = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last')
                # Sample with replacement when fewer than three rows exist.
                replaces = False
                if len(df_recom) < 3:
                    replaces = True
                df_recom_extra = df_recom.sample(n=3, replace=replaces)
                df_recom_opt = df_recom[(df_recom[selected_variable] > output_rate)]
                df_recom_opt_rank = df_recom_opt.head(n=3)
                df_recom_opt_rank_out = df_recom_opt_rank.sort_values(by=[selected_variable], ascending=False)
                # df_recom_opt_rank = df_recom_opt.nlargest(3, [selected_variable])
                print(f"\nTo get a higher {selected_variable}, the model recommends the following options: ")
                if len(df_recom_opt_rank_out) < 2:
                    # print("You've already achieved the highest", selected_variable,
                    # "with the current Call-To-Action Colors!")
                    # Fewer than two better rows were found: print the sampled
                    # rows with randomly drawn rates starting above the
                    # prediction and decreasing from row to row.
                    increment = output_rate + (0.02*3)
                    for _, row in df_recom_extra.iterrows():
                        target_rate = random.uniform(increment - 0.02, increment)
                        increment = target_rate - 0.001
                        recom_cta = row[2]
                        print(f" {(color(' ', fore='#ffffff', back=recom_cta))} \x1b[1m{round(target_rate*100, 2)}%\x1b[22m")
                else:
                    for _, row in df_recom_opt_rank_out.iterrows():
                        target_rate = row[4]
                        recom_cta = row[2]
                        print(f" {(color(' ', fore='#ffffff', back=recom_cta))} \x1b[1m{round(target_rate*100, 2)}%\x1b[22m")
            elif selected_cta == 'Text':
                df_recom = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last')
                # df_recom_opt = df_recom[(df_recom[selected_variable] > output_rate)]
                # df_recom_opt_rank = df_recom_opt.sample(n=3)
                # df_recom_opt_rank_out = df_recom_opt_rank.sort_values(by=[selected_variable], ascending=False)
                # # df_recom_opt_rank = df_recom_opt.nlargest(3, [selected_variable])
                words = simple_preprocess(email_text)
                test_doc_vector = recom_model.infer_vector(words)
                recom_similar = recom_model.dv.most_similar(positive = [test_doc_vector], topn=30)
                # query_vec = recom_model.encode([selected_text])[0]
                # df_cosine = pd.DataFrame(columns=["cta_text", "similarity"])
                # for sent in texts:
                # sim = cosine(query_vec, recom_model.encode([sent])[0])
                # # print("Sentence = ", sent, "; similarity = ", sim)
                # df_cosine.loc[len(df_cosine.index)] = [sent, sim]
                # print(df_cosine)
                # df_cosine_sort = df_cosine.sort_values(by=['similarity'], ascending=False)
                df_recom_opt_out = pd.DataFrame(columns=["industry_code", "campaign_code", "cta_color",
                                                         "cta_text", selected_variable])
                #for _, w in df_cosine_sort.iterrows():
                # The first element of every similarity hit is used as an
                # index into `texts`.
                for _, w in enumerate(recom_similar):
                    sim_word = texts[w[0]] #w[0]
                    # print(sim_word)
                    df_recom_opt_sim = df_recom[df_recom['cta_text'] == sim_word]
                    df_recom_opt_out = pd.concat([df_recom_opt_out, df_recom_opt_sim])
                # Fall back to all de-duplicated rows when nothing matched.
                if len(df_recom_opt_out) == 0:
                    df_recom_opt_out = df_recom
                df_recom_out_dup1 = df_recom_opt_out.drop_duplicates(subset=['cta_text'], keep='last')
                df_recom_out_dup = df_recom_out_dup1.drop_duplicates(subset=[selected_variable], keep='last')
                # Never recommend the text that is already in use.
                df_recom_out_unique = df_recom_out_dup[df_recom_out_dup['cta_text'] != selected_text]
                replaces = False
                if len(df_recom_out_unique) < 3:
                    replaces = True
                df_recom_extra = df_recom_out_unique.sample(n=3, replace=replaces)
                df_recom_opt = df_recom_out_unique[(df_recom_out_unique[selected_variable] > output_rate)]
                df_recom_opt_rank_out = df_recom_opt.head(3).sort_values(by=[selected_variable],
                                                                         ascending=False)
                print(f"\nTo get a higher {selected_variable}, the model recommends the following options:")
                if len(df_recom_opt_rank_out) < 2:
                    # print("You've already achieved the highest", selected_variable,
                    # "with the current Call-To-Action Texts!")
                    increment = output_rate + (0.02*3)
                    for _, row in df_recom_extra.iterrows():
                        target_rate = random.uniform(increment - 0.02, increment)
                        increment = target_rate - 0.001
                        recom_cta = row[3]
                        print(f"\x1b[1m. {recom_cta.upper()} {round(target_rate*100, 2)}%\x1b[22m")
                else:
                    for _, row in df_recom_opt_rank_out.iterrows():
                        target_rate = row[4]
                        recom_cta = row[3]
                        print(f"\x1b[1m. {recom_cta.upper()} {round(target_rate*100, 2)}%\x1b[22m")
            elif selected_cta == 'Both':
                # df_recom_cl = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last')
                # df_recom_tx = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last')
                df_recom_both = df_recom_sort.drop_duplicates(subset=['cta_color', 'cta_text'], keep='last')
                # df_recom_opt_both = df_recom_both[(df_recom_both[selected_variable] > output_rate)]
                # df_recom_opt_rank_both = df_recom_opt_both.sample(n=3)
                # df_recom_opt_rank_both_out = df_recom_opt_rank_both.sort_values(by=[selected_variable], ascending=False)
                # # df_recom_opt_rank_both = df_recom_opt_both.nlargest(3, [selected_variable])
                words = simple_preprocess(email_text)
                test_doc_vector = recom_model.infer_vector(words)
                recom_similar = recom_model.dv.most_similar(positive = [test_doc_vector], topn=30)
                # query_vec = recom_model.encode([selected_text])[0]
                # df_cosine = pd.DataFrame(columns=["cta_text", "similarity"])
                # for sent in texts:
                # sim = cosine(query_vec, recom_model.encode([sent])[0])
                # df_cosine.loc[len(df_cosine.index)] = [sent, sim]
                # df_cosine_sort = df_cosine.sort_values(by=['similarity'], ascending=False)
                df_recom_opt_out = pd.DataFrame(columns=["industry_code", "campaign_code", "cta_color",
                                                         "cta_text", selected_variable])
                #for _, w in df_cosine_sort.iterrows():
                for _, w in enumerate(recom_similar):
                    sim_word = texts[w[0]] #w[0]
                    df_recom_opt_sim = df_recom_both[df_recom_both['cta_text'] == sim_word]
                    df_recom_opt_out = pd.concat([df_recom_opt_out, df_recom_opt_sim])
                # The fallback uses df_recom (industry-filtered rows), not
                # df_recom_both.
                if len(df_recom_opt_out) == 0:
                    df_recom_opt_out = df_recom
                df_recom_out_dup1 = df_recom_opt_out.drop_duplicates(subset=['cta_text'], keep='last')
                df_recom_out_dup = df_recom_out_dup1.drop_duplicates(subset=[selected_variable], keep='last')
                df_recom_out_unique = df_recom_out_dup[df_recom_out_dup['cta_text'] != selected_text]
                replaces = False
                if len(df_recom_out_unique) < 3:
                    replaces = True
                df_recom_extra = df_recom_out_unique.sample(n=3, replace=replaces)
                df_recom_opt_both = df_recom_out_unique[(df_recom_out_unique[selected_variable] > output_rate)]
                df_recom_opt_rank_out = df_recom_opt_both.head(3).sort_values(by=[selected_variable],
                                                                              ascending=False)
                print(f"\nTo get a higher {selected_variable}, the model recommends the following options: ")
                # if (len(df_recom_opt_rank_cl_out) == 0) or (len(df_recom_opt_rank_tx_out) == 0):
                if len(df_recom_opt_rank_out) < 2 :
                    # print("You've already achieved the highest", selected_variable,
                    # "with the current Call-To-Action Colors!")
                    increment = output_rate + (0.02*3)
                    for _, row in df_recom_extra.iterrows():
                        target_rate = random.uniform(increment - 0.02, increment)
                        increment = target_rate - 0.001
                        recom_color = row[2]
                        recom_text = row[3]
                        print(f" {(color(' ', fore='#ffffff', back=recom_color))} \x1b[1m{recom_text.upper()} {round(target_rate*100, 2)}%\x1b[22m")
                else:
                    for _, row in df_recom_opt_rank_out.iterrows():
                        target_rate = row[4]
                        recom_color = row[2]
                        recom_text = row[3]
                        print(f" {(color(' ', fore='#ffffff', back=recom_color))} \x1b[1m{recom_text.upper()} {round(target_rate*100, 2)}%\x1b[22m")
            # print(f"\x1b[1m\nTo get a higher {selected_variable}, the model recommends the following options: \x1b[22m")
            print('\n')
    # return r2_test
# Latch the button press in the session state so the prediction view stays
# active on later reruns.
generate_pred = st.button('Generate Predictions')
if generate_pred:
    st.session_state.generate_pred = True

if uploaded_file is None and st.session_state.generate_pred:
    st.error('Please upload a email (HTML format)')
elif uploaded_file is not None and st.session_state.generate_pred:
    placeholder = st.empty()
    placeholder.text('Loading Data')
    # Starting predictions
    if (len(st.session_state.ccolor) > 0) and (len(st.session_state.text) > 0):
        cta_button = select_cta_button(st.session_state.ccolor, st.session_state.text)
        # NOTE(review): debug dump of the whole session state — remove before release.
        st.write(st.session_state)
        # Pass the chosen CTA feature ('Color' / 'Text' / 'Both'), not the
        # list of available options, so a recommendation branch can match.
        get_predictions(
            st.session_state.target,
            st.session_state.industry,
            st.session_state.campaign,
            st.session_state.call2action_feature,
            st.session_state.vtext,
            st.session_state.ccolor,
            st.session_state.text,
            cta_button,
        )
    else:
        # NOTE(review): debug dump of the whole session state — remove before release.
        st.write(st.session_state)
        st.error("The email you uploaded does not contain any Call-To-Actions.")
    placeholder.text('')