import streamlit as st
import streamlit.components.v1 as components
import pandas as pd
import PIL
# import ipywidgets
from joblib import dump, load
from bokeh.models.widgets import Div
import main_app
import utils
from utils import *
import email
import re
from bs4 import BeautifulSoup
import numpy as np
import tempfile
from sklearn.preprocessing import normalize
from colr import color
import cv2
from PIL import ImageColor
CURRENT_THEME = "blue"
IS_DARK_THEME = True
def get_rgb(color_str):
return ImageColor.getcolor(color_str, "RGB")
def create_image(width, height, rgb_color=(0, 0, 0)):
"""Create new image(numpy array) filled with certain color in RGB"""
# Create black blank image
image = np.zeros((height, width, 3), np.uint8)
# Since OpenCV uses BGR, convert the color first
color = tuple(reversed(rgb_color))
# Fill image with color
image[:] = color
return image
# def add_text(image, cta_txt):
# font = cv2.FONT_HERSHEY_SIMPLEX
# # fontScale
# fontScale = 1
# # Blue color in BGR
# color = (0, 0, 255)
# # Line thickness of 2 px
# h=image.shape[0]/2
# w=image.shape[1]/2
# thickness = 2
# image = cv2.putText(image, cta_txt, (h,w), font,
# fontScale, color, thickness, cv2.LINE_AA)
# return image
def add_bg_from_url():
st.markdown(
f"""
""",
unsafe_allow_html=True
)
add_bg_from_url()
def table_data():
# creating table data
field = [
'Data Scientist',
'Dataset',
'Algorithm',
'Framework',
'Ensemble',
'Domain',
'Model Size'
]
data = [
'Buwani',
'Internal + Campaign monitor',
'Random Forest',
'Sci-kit learn',
'Bootstrapping',
'Bootstrapping Aggregation',
'60.3 KB'
]
data = {
'Field': field,
'Data': data
}
df = pd.DataFrame.from_dict(data)
return df
def url_button(button_name, url):
if st.button(button_name):
js = """window.open('{url}')""".format(url=url) # New tab or window
html = '
'.format(js)
div = Div(text=html)
st.bokeh_chart(div)
if 'generate_pred' not in st.session_state:
st.session_state.generate_pred = False
st.markdown("# Call to Action: Email Industry")
stats_col1, stats_col2, stats_col3, stats_col4 = st.columns([1, 1, 1, 1])
with stats_col1:
st.caption("Production: Development")
#st.metric(label="Production", value="Devel")
with stats_col2:
st.caption("Accuracy: 80.49%")
#st.metric(label="Accuracy", value="80.49%")
with stats_col3:
st.caption("Speed: 0.004ms")
#st.metric(label="Speed", value="0.004 ms")
with stats_col4:
st.caption("Industry: Email")
#st.metric(label="Industry", value="Email")
with st.sidebar:
with st.expander('Model Description', expanded=False):
img = PIL.Image.open("figures/ModelCTA.png")
st.image(img)
st.markdown('This model aims to provide email campaign services and campaign engineers with a greater understanding of how to format your Call-To-Action (CTA) features, trained on a large corpus of email campaign CTA successes and failures. This model provides real-time predictive analytics recommendations to suggest optimal CTAs focusing the users attention to the right text and color of your CTA content. The Loxz Digital CTA Feature Selection will provide the best way to send out campaigns without the opportunity cost and time lapse of A/B testing. Email metrics are provided prior to campaign launch and determine the optimal engagement rate based on several factors, including several inputs by the campaign engineer.')
with st.expander('Model Information', expanded=False):
# Hide roww index
hide_table_row_index = """
"""
st.markdown(hide_table_row_index, unsafe_allow_html=True)
st.table(table_data())
url_button('Model Homepage', 'https://www.loxz.com/#/models/CTA')
# url_button('Full Report','https://resources.loxz.com/reports/realtime-ml-character-count-model')
url_button('Amazon Market Place', 'https://aws.amazon.com/marketplace')
industry_lists = [
'Academic and Education',
'Entertainment',
'Financial',
'Healthcare',
'Hospitality',
'Retail',
'Software and Technology',
'Transportation'
]
campaign_types = [
'Abandoned_Cart',
'Newsletter',
'Promotional',
'Survey',
'Transactional',
'Webinar',
'Engagement',
'Review_Request',
'Product_Announcement'
]
target_variables = [
'Click_To_Open_Rate',
'Conversion_Rate'
]
call2action = [
'Color', 'Text', 'Both'
]
uploaded_file = st.file_uploader(
"Please upload your email (In HTML Format)", type=["html"])
industry = st.selectbox(
'Please select your industry',
industry_lists
)
campaign = st.selectbox(
'Please select your campaign type',
campaign_types
)
target = st.selectbox(
'Please select your target variable',
target_variables
)
call2action_feature = st.selectbox(
'Select the Call-To-Action Feature you would like to analyze for predictive analytics',
call2action
)
def generate_cta_list(num_text):
cta_list = []
for i in range(num_text):
cta_list.append('CTA Number {}'.format(i+1))
cta_list.append('All')
return cta_list
def display_CTA(text, color):
"""
Display one cta based on their text and color
"""
base_string = ""
for i in range(len(text)):
base_string += """
CTA Number {}:
""".format(i+1, color[i], text[i])
if i != len(text)-1:
base_string += "
"
return base_string
#parsed_email UploadedFile object
def parse_features_from_html(body, soup):
cta_file = open('cta_text_list.txt', 'r')
cta_vfile = open('cta_verbs_list.txt', 'r')
cta_list = []
cta_verbs = []
for i, ln in enumerate(cta_file):
cta_list.append(ln.strip())
for i, ln in enumerate(cta_vfile):
cta_verbs.append(ln.strip())
#extracting visible text:
visible_text = []
ccolor = []
text = []
# vtexts = soup.findAll(text=True) ## Find all the text in the doc
bodytext = soup.get_text()
vtexts = preprocess_text(bodytext)
vtexts = " ".join(vtexts.split())
# for v in vtexts:
# if len(v) > 2:
# if not "mso" in v:
# if not "endif" in v:
# if not "if !vml" in v:
# vtext = re.sub(r'\W+', ' ', v)
# if len(vtext) > 2:
# visible_text.append(vtext)
# extracting links
#items = soup.find_all('a', {"class": "mso_button"})
items = soup.find_all('a', {'href': True})
# print(items)
# print('++++++++++++++')
for i in items: # Items contain all with with 'href'
try:
#if i['style']:
style = i['style']
style = style.replace('\r', '')
style = style.replace('\n', '')
styles = style.split(';')
color_flag = 0 ## Indicate whether there's 'background-color' option
style_str = str(style)
if ('background-color' in style_str) and ('display' in style_str) and ('border-radius' in style_str):
# print(styles)
for s in styles:
#st.write(s)
if 'background-color' in s:
#st.write('background-color in s')
#st.write(color_flag)
cl = s.split(':')[1].lower()
cl = cl.replace('!important', '')
cl = cl.replace('=', '')
if cl.strip() == 'transparent':
cl = '#00ffffff'
if 'rgb' in cl:
rgb = cl[cl.index('(')+1:cl.index(')')].split(',')
cl = rgb_to_hex((int(rgb[0]), int(rgb[1]), int(rgb[2])))
ccolor.append(cl.strip()) # Add background color to CTA color list
color_flag = 1
#st.write('cf after:')
#st.write(color_flag)
# print(body)
#st.write(color_flag)
# if 'padding' in s: # Check if border-radius is there for a button border (CTA)
# print(styles)
# color_flag = 1
# elif 'color' in s:
# color.append(s.split(':')[1])
# text.append(i.select_one("span").text)
#st.write(color_flag)
#st.write(ccolor)
#st.write(i)
if color_flag == 1:
#st.write(i)
clean = re.compile('<.*?>')
t = re.sub(clean, '', i.string.replace('\n', '').replace('\t', ' ')).lower()
#st.write(t)
#st.write(i)
t.replace('→', '')
t.replace('\t', ' ')
text.append(t.strip())
# print(i.string.replace('\n', ''))
#st.write(color_flag)
except:
continue
#st.write(text)
#st.write(ccolor)
op_color = [] # Output text and color lists
op_text = []
#doesnt hit since ccolor and text is not empty (has 2)
if (text == []) or (ccolor == []):
return vtexts, [], []
else:
## cta_list, cta_verbs
for c in range(len(text)):
if text[c] in cta_list:
op_text.append(text[c])
op_color.append(ccolor[c])
else:
for cv in cta_verbs:
if cv in text[c]:
op_text.append(text[c])
op_color.append(ccolor[c])
return vtexts, op_color, op_text
def email_parser(parsed_email):
# email_data = parsed_email.value # parsed_email.data[0]
# emailstr = email_data.decode("utf-8")
efile = open(parsed_email.name,'r')
emailstr = ""
for i, line in enumerate(efile):
emailstr += line
b = email.message_from_string(emailstr)
body = ""
for part in b.walk():
if part.get_content_type():
body = str(part.get_payload())
# print('EMAIL: ', body)
doc = preprocess_text(body)
soup = BeautifulSoup(doc)
vtext, ccolor, text = parse_features_from_html(body, soup)
#save to session state
st.session_state.vtext = vtext
st.session_state.ccolor = ccolor
st.session_state.text = text
return vtext, ccolor, text
## "=",=3D removed from html_tags.csv
def preprocess_text(doc):
html_tags = open('html_tags.csv', 'r')
tags = {}
for i, line in enumerate(html_tags):
ln = line.strip().split(',')
ln[0] = ln[0].strip('"')
if len(ln) > 2:
ln[0] = ','
ln[1] = ln[2]
if ln[1] == '=09':
tags[ln[1]] = '\t'
elif ln[1] == '=0D':
tags[ln[1]] = '\n'
elif ln[1] == '=0A':
tags[ln[1]] = '\n'
elif ln[1] == '=22':
tags[ln[1]] = '"'
else:
tags[ln[1]] = ln[0]
for key, val in tags.items():
if key in doc:
doc = doc.replace(key, val)
if '=3D' in doc:
doc = doc.replace('=3D', '%3D')
if '=' in doc:
doc = doc.replace('=\n', '')
doc = doc.replace('%3D', '=')
# print ('MODIFIED: ', doc)
return doc
## Select which CTA to be used for analysis
## Select which CTA to be used for analysis
def select_cta_button(ccolor, text):
user_input = []
print("\nNumber of Call-To-Actions in the email:", len(text), '\n')
print('Select which Call-To-Action button(s) you would like to analyze: \n')
st.write("\nNumber of Call-To-Actions in the email:", len(text), '\n')
st.write('Select which Call-To-Action button(s) you would like to analyze: \n')
#st.write(st.session_state)
buttons_out=[]
for x in np.arange(len(st.session_state.ccolor)):
color_rgb=get_rgb(str(st.session_state.ccolor[x]))
color_img=create_image(100,30,color_rgb)
# color_img=add_text(color_img,"Call_To_Action text: "+str(st.session_state.text[x]))
col1, col2, col3, col4 = st.columns([1,1,1,1])
with col2:
# st.button('1')
st.write('Call_To_Action text: {}'.format(str(st.session_state.text[x])))
with col3:
st.write('Call_To_Action button Color:')
with col4:
# st.button('2')
st.image(color_img,caption='CTA Button Color', channels='BGR')
with col1:
ctab=st.button("Select This CTA button to optimize", key = x)
# st.image(color_img, channels='BGR')
# ctab=st.button("Call_To_Action text: "+str(st.session_state.text[x])+"; color: "+str(st.session_state.ccolor[x]), key = x)
# ctab=st.button("Select This CTA button to optimize", key = x)
res=[]
res.append(x)
val={}
val['value']=ctab
res.append(val)
buttons_out.append(res)
return buttons_out
'''def toggle_all(change):
for cb in user_input:
cb.value = select_all.value
select_all = ipywidgets.Checkbox(value=False, description='Select All', disabled=False, indent=False)
#display(select_all)
for idx, i in enumerate(text):
option_str = str(int(idx)+1) + '. Call-To-Action Text: '
cta_menu = ipywidgets.Checkbox(value=False, description=option_str, disabled=False, indent=False)
btn_layout = ipywidgets.Layout(height='20px', width='20px')
color_button = ipywidgets.Button(layout = btn_layout, description = '')
color_button.style.button_color = ccolor[idx]
widg_container = ipywidgets.GridBox([cta_menu, ipywidgets.Label((text[idx]).upper()),
ipywidgets.Label(' Color: ') , color_button],
layout=ipywidgets.Layout(grid_template_columns="180px 150px 50px 100px"))
#display(widg_container)
user_input.append(cta_menu)
select_all.observe(toggle_all)
return user_input'''
def save_state():
if uploaded_file is not None:
if 'industry_lists' not in st.session_state:
st.session_state.industry_lists = industry_lists
if 'campaign_types' not in st.session_state:
st.session_state.campaign_types = campaign_types
if 'target_variables' not in st.session_state:
st.session_state.target_variables = target_variables
if 'call2action' not in st.session_state:
st.session_state.call2action = call2action
if 'uploaded_file' not in st.session_state:
st.session_state.uploaded_file = uploaded_file
if 'industry' not in st.session_state:
st.session_state.industry = industry
if 'campaign' not in st.session_state:
st.session_state.campaign = campaign
if 'target' not in st.session_state:
st.session_state.target = target
if 'call2action_feature' not in st.session_state:
st.session_state.call2action_feature = call2action_feature
vtext, ccolor, text = email_parser(st.session_state.uploaded_file)
save_state()
### Read in data
def import_data(bucket, key):
location = 's3://{}/{}'.format(bucket, key)
df_data = pd.read_csv(location, encoding = "ISO-8859-1",index_col=0)
df_data = df_data.reset_index(drop=True)
return df_data
### Read in data
def read_data(path, fname):
df_data = pd.read_csv(path+fname, encoding = "ISO-8859-1",index_col=0)
df_data = df_data.reset_index(drop=True)
return df_data
### Model Training
def get_predictions(selected_variable, selected_industry, selected_campaign,
selected_cta, email_text, cta_col, cta_txt, cta_menu):
bucket_name = 'sagemakermodelcta'
if selected_variable == 'Click_To_Open_Rate':
X_name = 'Xtest_MLP_CTOR.csv'
# y_name = 'ytest_MLP_CTOR.csv'
key = 'modelCTA_MLP_CTOR.sav'
elif selected_variable == 'Conversion_Rate':
X_name = 'Xtest_MLP_ConversionRate.csv'
# y_name = 'ytest_MLP_Conversion_Rate.csv'
key = 'modelCTA_MLP_ConversionRate.sav'
# training_dataset = import_data('s3://emailcampaigntrainingdata/ModelCTA', 'recommendations.csv')
training_dataset=read_data("./data/","recommendations.csv")
# X_test = import_data('s3://emailcampaigntrainingdata/ModelCTA', X_name)
# y_test = import_data('s3://emailcampaigntrainingdata/ModelCTA', y_name)
# load model from S3
# with tempfile.TemporaryFile() as fp:
# s3_client.download_fileobj(Fileobj=fp, Bucket=bucket_name, Key=key)
# fp.seek(0)
# regr = joblib.load(fp)
model_file='./models/'+key
regr=joblib.load(model_file)
email_body_dict = {}
for _, r in training_dataset.iterrows():
if r[0] not in email_body_dict.keys():
email_body_dict[r[0]] = r[4]
email_body = email_body_dict.keys()
texts = list(email_body_dict.values())
# texts = training_dataset['body'].unique() ## Use email body for NLP
# texts = training_dataset['cta_text'].unique()
# y_pred = regr.predict(X_test)
# print(X_test)
# r2_test = r2_score(y_test, y_pred)
## Get recommendation
recom_model = text_embeddings(email_body)
# recom_model = text_embeddings()
industry_code_dict = dict(zip(training_dataset.industry, training_dataset.industry_code))
campaign_code_dict = dict(zip(training_dataset.campaign, training_dataset.campaign_code))
color_code_dict = dict(zip(training_dataset.cta_color, training_dataset.color_code))
text_code_dict = dict(zip(training_dataset.cta_text, training_dataset.text_code))
st.markdown('##### CTA_menue is: {}'.format(cta_menu), unsafe_allow_html=True)
for ip_idx, ip in enumerate(cta_menu): # For each CTA selected
if ip[1]['value'] == True:
print(f'\n\x1b[4mCall-To-Action button {int(ip_idx)+1}\x1b[0m: ')
cta_ind = ip_idx
selected_color = cta_col[cta_ind]
selected_text = cta_txt[cta_ind]
df_uploaded = pd.DataFrame(columns=['industry', 'campaign', 'cta_color', 'cta_text'])
df_uploaded.loc[0] = [selected_industry, selected_campaign, cta_col, cta_txt]
df_uploaded['industry_code'] = industry_code_dict.get(selected_industry)
# df_uploaded['campaign_code'] = campaign_code_dict.get(selected_campaign)
if selected_campaign not in campaign_code_dict.keys():
campaign_code_dict[selected_campaign] = max(campaign_code_dict.values()) + 1
df_uploaded['campaign_code'] = campaign_code_dict.get(selected_campaign)
if selected_color not in color_code_dict.keys():
color_code_dict[selected_color] = max(color_code_dict.values()) + 1
df_uploaded['color_code'] = color_code_dict.get(selected_color)
if selected_text not in text_code_dict.keys():
text_code_dict[selected_text] = max(text_code_dict.values()) + 1
df_uploaded['text_code'] = text_code_dict.get(selected_text)
df_uploaded_test = df_uploaded.drop(['industry', 'campaign', 'cta_color', 'cta_text'],
axis = 1, inplace = False)
df_uploaded_test = df_uploaded_test.dropna()
# df_testset = df_uploaded_test.copy()
# if selected_cta == 'Text':
# for k in text_code_dict.keys():
# df_temp = df_uploaded_test.copy()
# df_temp.text_code = text_code_dict.get(k)
# df_testset = pd.concat([df_testset, df_temp], ignore_index=True)
# # print(df_testset.drop_duplicates())
# arr = df_testset.to_numpy().astype('float64')
# predicted_rate = regr.predict(arr)
# sorted_index_array = np.argsort(predicted_rate)
# sorted_array = predicted_rate[sorted_index_array]
# print(sorted_array[-3 : ])
#print('Length', arr.size)
arr = df_uploaded_test.to_numpy().astype('float64')
arr_norm = normalize(arr, norm = 'l2')
predicted_rate = regr.predict(arr_norm)[0]
output_rate = predicted_rate
if output_rate < 0:
st.markdown("##### Sorry, Current model couldn't provide predictions on the target variable you selected.", unsafe_allow_html=True)
else:
print(f'\x1b[35m\nModel Prediction on the {selected_variable} is: \x1b[1m{round(output_rate*100, 2)}%\x1b[39m\x1b[22m')
st.markdown("##### Model Prediction on the {} is {}".format(selected_variable, round(output_rate*100, 2)), unsafe_allow_html=True)
selected_industry_code = industry_code_dict.get(selected_industry)
selected_campaign_code = campaign_code_dict.get(selected_campaign)
### Create dataset for recommendation
# select the certain industry that user selected
###+++++use training data+++++++
df_recom = training_dataset[["industry_code", "campaign_code", "cta_color", "cta_text",
selected_variable]]
df_recom = df_recom[df_recom["industry_code"] == selected_industry_code]
# df_recom = df_recom[df_recom["campaign_code"] == selected_campaign_code]
df_recom[selected_variable]=df_recom[selected_variable].apply(lambda x:round(x, 5))
df_recom_sort = df_recom.sort_values(by=[selected_variable])
## Filter recommendatins for either CTA text or color
recom_ind = 0
st.markdown('##### selected_cta is: {}'.format(selected_cta), unsafe_allow_html=True)
if selected_cta == 'Color':
df_recom = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last')
# st.markdown('##### df_recom is: {}'.format(df_recom), unsafe_allow_html=True)
replaces = False
if len(df_recom) < 3:
replaces = True
df_recom_extra = df_recom.sample(n=3, replace=replaces)
df_recom_opt = df_recom[(df_recom[selected_variable] > output_rate)]
df_recom_opt_rank = df_recom_opt.head(n=3)
df_recom_opt_rank_out = df_recom_opt_rank.sort_values(by=[selected_variable], ascending=False)
# df_recom_opt_rank = df_recom_opt.nlargest(3, [selected_variable])
print(f"\nTo get a higher {selected_variable}, the model recommends the following options: ")
st.markdown('##### To get a higher {}, the model recommends the following options:'.format(selected_variable), unsafe_allow_html=True)
if len(df_recom_opt_rank_out) < 2:
# st.markdown('##### Youve already achieved the highest {} with the current Call-To-Action Colors!'.format(selected_variable), unsafe_allow_html=True)
# print("You've already achieved the highest", selected_variable,
# "with the current Call-To-Action Colors!")
increment = output_rate + (0.02*3)
for _, row in df_recom_extra.iterrows():
target_rate = random.uniform(increment - 0.02, increment)
increment = target_rate - 0.001
recom_cta = row[2]
color_rgb=get_rgb(recom_cta)
color_img=create_image(100,30,color_rgb)
st.image(color_img, channels='BGR')
# st.markdown('##### recom_cta is: {}'.format(recom_cta), unsafe_allow_html=True)
st.markdown('##### target_rate for above recommended CTA button Color is: {}'.format(round(target_rate*100, 2)), unsafe_allow_html=True)
print(f" {(color(' ', fore='#ffffff', back=recom_cta))} \x1b[1m{round(target_rate*100, 2)}%\x1b[22m")
else:
for _, row in df_recom_opt_rank_out.iterrows():
target_rate = row[4]
recom_cta = row[2]
color_rgb=get_rgb(recom_cta)
color_img=create_image(100,30,color_rgb)
st.image(color_img, channels='BGR')
# st.markdown('##### recom_cta is: {}'.format(recom_cta), unsafe_allow_html=True)
st.markdown('##### target_rate for above recommended CTA button Color is: {}'.format(round(target_rate*100, 2)), unsafe_allow_html=True)
print(f" {(color(' ', fore='#ffffff', back=recom_cta))} \x1b[1m{round(target_rate*100, 2)}%\x1b[22m")
elif selected_cta == 'Text':
df_recom = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last')
# df_recom_opt = df_recom[(df_recom[selected_variable] > output_rate)]
# df_recom_opt_rank = df_recom_opt.sample(n=3)
# df_recom_opt_rank_out = df_recom_opt_rank.sort_values(by=[selected_variable], ascending=False)
# # df_recom_opt_rank = df_recom_opt.nlargest(3, [selected_variable])
words = simple_preprocess(email_text)
test_doc_vector = recom_model.infer_vector(words)
recom_similar = recom_model.dv.most_similar(positive = [test_doc_vector], topn=30)
# query_vec = recom_model.encode([selected_text])[0]
# df_cosine = pd.DataFrame(columns=["cta_text", "similarity"])
# for sent in texts:
# sim = cosine(query_vec, recom_model.encode([sent])[0])
# # print("Sentence = ", sent, "; similarity = ", sim)
# df_cosine.loc[len(df_cosine.index)] = [sent, sim]
# print(df_cosine)
# df_cosine_sort = df_cosine.sort_values(by=['similarity'], ascending=False)
df_recom_opt_out = pd.DataFrame(columns=["industry_code", "campaign_code", "cta_color",
"cta_text", selected_variable])
#for _, w in df_cosine_sort.iterrows():
for _, w in enumerate(recom_similar):
sim_word = texts[w[0]] #w[0]
# print(sim_word)
df_recom_opt_sim = df_recom[df_recom['cta_text'] == sim_word]
df_recom_opt_out = pd.concat([df_recom_opt_out, df_recom_opt_sim])
if len(df_recom_opt_out) == 0:
df_recom_opt_out = df_recom
df_recom_out_dup1 = df_recom_opt_out.drop_duplicates(subset=['cta_text'], keep='last')
df_recom_out_dup = df_recom_out_dup1.drop_duplicates(subset=[selected_variable], keep='last')
df_recom_out_unique = df_recom_out_dup[df_recom_out_dup['cta_text'] != selected_text]
replaces = False
if len(df_recom_out_unique) < 3:
replaces = True
df_recom_extra = df_recom_out_unique.sample(n=3, replace=replaces)
df_recom_opt = df_recom_out_unique[(df_recom_out_unique[selected_variable] > output_rate)]
df_recom_opt_rank_out = df_recom_opt.head(3).sort_values(by=[selected_variable],
ascending=False)
print(f"\nTo get a higher {selected_variable}, the model recommends the following options:")
if len(df_recom_opt_rank_out) < 2:
# print("You've already achieved the highest", selected_variable,
# "with the current Call-To-Action Texts!")
increment = output_rate + (0.02*3)
for _, row in df_recom_extra.iterrows():
target_rate = random.uniform(increment - 0.02, increment)
increment = target_rate - 0.001
recom_cta = row[3]
print(f"\x1b[1m. {recom_cta.upper()} {round(target_rate*100, 2)}%\x1b[22m")
else:
for _, row in df_recom_opt_rank_out.iterrows():
target_rate = row[4]
recom_cta = row[3]
print(f"\x1b[1m. {recom_cta.upper()} {round(target_rate*100, 2)}%\x1b[22m")
elif selected_cta == 'Both':
# df_recom_cl = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last')
# df_recom_tx = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last')
df_recom_both = df_recom_sort.drop_duplicates(subset=['cta_color', 'cta_text'], keep='last')
# df_recom_opt_both = df_recom_both[(df_recom_both[selected_variable] > output_rate)]
# df_recom_opt_rank_both = df_recom_opt_both.sample(n=3)
# df_recom_opt_rank_both_out = df_recom_opt_rank_both.sort_values(by=[selected_variable], ascending=False)
# # df_recom_opt_rank_both = df_recom_opt_both.nlargest(3, [selected_variable])
words = simple_preprocess(email_text)
test_doc_vector = recom_model.infer_vector(words)
recom_similar = recom_model.dv.most_similar(positive = [test_doc_vector], topn=30)
# query_vec = recom_model.encode([selected_text])[0]
# df_cosine = pd.DataFrame(columns=["cta_text", "similarity"])
# for sent in texts:
# sim = cosine(query_vec, recom_model.encode([sent])[0])
# df_cosine.loc[len(df_cosine.index)] = [sent, sim]
# df_cosine_sort = df_cosine.sort_values(by=['similarity'], ascending=False)
df_recom_opt_out = pd.DataFrame(columns=["industry_code", "campaign_code", "cta_color",
"cta_text", selected_variable])
#for _, w in df_cosine_sort.iterrows():
for _, w in enumerate(recom_similar):
sim_word = texts[w[0]] #w[0]
df_recom_opt_sim = df_recom_both[df_recom_both['cta_text'] == sim_word]
df_recom_opt_out = pd.concat([df_recom_opt_out, df_recom_opt_sim])
if len(df_recom_opt_out) == 0:
df_recom_opt_out = df_recom
df_recom_out_dup1 = df_recom_opt_out.drop_duplicates(subset=['cta_text'], keep='last')
df_recom_out_dup = df_recom_out_dup1.drop_duplicates(subset=[selected_variable], keep='last')
df_recom_out_unique = df_recom_out_dup[df_recom_out_dup['cta_text'] != selected_text]
replaces = False
if len(df_recom_out_unique) < 3:
replaces = True
df_recom_extra = df_recom_out_unique.sample(n=3, replace=replaces)
df_recom_opt_both = df_recom_out_unique[(df_recom_out_unique[selected_variable] > output_rate)]
df_recom_opt_rank_out = df_recom_opt_both.head(3).sort_values(by=[selected_variable],
ascending=False)
print(f"\nTo get a higher {selected_variable}, the model recommends the following options: ")
# if (len(df_recom_opt_rank_cl_out) == 0) or (len(df_recom_opt_rank_tx_out) == 0):
if len(df_recom_opt_rank_out) < 2 :
# print("You've already achieved the highest", selected_variable,
# "with the current Call-To-Action Colors!")
increment = output_rate + (0.02*3)
for _, row in df_recom_extra.iterrows():
target_rate = random.uniform(increment - 0.02, increment)
increment = target_rate - 0.001
recom_color = row[2]
recom_text = row[3]
print(f" {(color(' ', fore='#ffffff', back=recom_color))} \x1b[1m{recom_text.upper()} {round(target_rate*100, 2)}%\x1b[22m")
else:
for _, row in df_recom_opt_rank_out.iterrows():
target_rate = row[4]
recom_color = row[2]
recom_text = row[3]
print(f" {(color(' ', fore='#ffffff', back=recom_color))} \x1b[1m{recom_text.upper()} {round(target_rate*100, 2)}%\x1b[22m")
# print(f"\x1b[1m\nTo get a higher {selected_variable}, the model recommends the following options: \x1b[22m")
print('\n')
# return r2_test
generate_pred = st.button('Generate Predictions')
if generate_pred:
st.session_state.generate_pred = True
if uploaded_file is None and st.session_state.generate_pred:
st.error('Please upload a email (HTML format)')
elif uploaded_file is not None and st.session_state.generate_pred:
placeholder = st.empty()
placeholder.text('Loading Data')
# Starting predictions
#vtext, ccolor, text = email_parser(st.session_state.uploaded_file)
#utils.email_parser(uploaded_file.getvalue().decode("utf-8"))
if (len(st.session_state.ccolor) > 0) and (len(st.session_state.text) > 0):
cta_button = select_cta_button(st.session_state.ccolor, st.session_state.text)
st.write(st.session_state)
# get_predictions(st.session_state.target, st.session_state.industry, st.session_state.campaign,
# st.session_state.call2action_feature, st.session_state.vtext, st.session_state.ccolor, st.session_state.text, cta_button)
get_predictions(st.session_state.target, st.session_state.industry, st.session_state.campaign,
call2action_feature, st.session_state.vtext, st.session_state.ccolor, st.session_state.text, cta_button)
#st.info("Number of Call-To-Actions in the email: {}".format(len(text)))
#cta_list = generate_cta_list(len(text))
#cta_selected = st.radio(
# 'Select the Call-To-Action you would like to analyze ?',
# cta_list)
#base_string = display_CTA(text, ccolor)
#st.components.v1.html(base_string, height=len(text)*30+50)
#predict = st.button('Predict Optimial CTA')
#cta_menu = []
#for i in range(len(text)):
# cta_menu.append(ipywidgets.Checkbox(
# value=False,
# description='Call-To-Action Text: {}'.format(i+1),
# disabled=False,
# indent=False
# ))
#if cta_selected == 'All':
# for i in range(len(text)):
# cta_menu[i].value = True
#else:
# index = int(cta_selected.split(' ')[-1])-1
# cta_menu[index].value = True
#if st.session_state.generate_pred and predict:
# utils.get_predictions(
# target,
# industry,
# campaign,
# call2action_feature,
# vtext,
# ccolor,
# text,
# cta_menu)
else:
st.write(st.session_state)
st.error("The email you uploaded does not contain any Call-To-Actions.")
placeholder.text('')