import streamlit as st import streamlit.components.v1 as components import pandas as pd import PIL # import ipywidgets from joblib import dump, load from bokeh.models.widgets import Div import main_app import utils from utils import * import email import re from bs4 import BeautifulSoup import numpy as np import tempfile from sklearn.preprocessing import normalize from colr import color import cv2 from PIL import ImageColor CURRENT_THEME = "blue" IS_DARK_THEME = True def get_rgb(color_str): return ImageColor.getcolor(color_str, "RGB") def create_image(width, height, rgb_color=(0, 0, 0)): """Create new image(numpy array) filled with certain color in RGB""" # Create black blank image image = np.zeros((height, width, 3), np.uint8) # Since OpenCV uses BGR, convert the color first color = tuple(reversed(rgb_color)) # Fill image with color image[:] = color return image # def add_text(image, cta_txt): # font = cv2.FONT_HERSHEY_SIMPLEX # # fontScale # fontScale = 1 # # Blue color in BGR # color = (0, 0, 255) # # Line thickness of 2 px # h=image.shape[0]/2 # w=image.shape[1]/2 # thickness = 2 # image = cv2.putText(image, cta_txt, (h,w), font, # fontScale, color, thickness, cv2.LINE_AA) # return image def add_bg_from_url(): st.markdown( f""" """, unsafe_allow_html=True ) add_bg_from_url() def table_data(): # creating table data field = [ 'Data Scientist', 'Dataset', 'Algorithm', 'Framework', 'Ensemble', 'Domain', 'Model Size' ] data = [ 'Buwani', 'Internal + Campaign monitor', 'Random Forest', 'Sci-kit learn', 'Bootstrapping', 'Bootstrapping Aggregation', '60.3 KB' ] data = { 'Field': field, 'Data': data } df = pd.DataFrame.from_dict(data) return df def url_button(button_name, url): if st.button(button_name): js = """window.open('{url}')""".format(url=url) # New tab or window html = ''.format(js) div = Div(text=html) st.bokeh_chart(div) if 'generate_pred' not in st.session_state: st.session_state.generate_pred = False st.markdown("# Call to Action: Email Industry") stats_col1, stats_col2, stats_col3, stats_col4 = st.columns([1, 1, 1, 1]) with stats_col1: st.caption("Production: Development") #st.metric(label="Production", value="Devel") with stats_col2: st.caption("Accuracy: 80.49%") #st.metric(label="Accuracy", value="80.49%") with stats_col3: st.caption("Speed: 0.004ms") #st.metric(label="Speed", value="0.004 ms") with stats_col4: st.caption("Industry: Email") #st.metric(label="Industry", value="Email") with st.sidebar: with st.expander('Model Description', expanded=False): img = PIL.Image.open("figures/ModelCTA.png") st.image(img) st.markdown('This model aims to provide email campaign services and campaign engineers with a greater understanding of how to format your Call-To-Action (CTA) features, trained on a large corpus of email campaign CTA successes and failures. This model provides real-time predictive analytics recommendations to suggest optimal CTAs focusing the users attention to the right text and color of your CTA content. The Loxz Digital CTA Feature Selection will provide the best way to send out campaigns without the opportunity cost and time lapse of A/B testing. Email metrics are provided prior to campaign launch and determine the optimal engagement rate based on several factors, including several inputs by the campaign engineer.') with st.expander('Model Information', expanded=False): # Hide roww index hide_table_row_index = """ """ st.markdown(hide_table_row_index, unsafe_allow_html=True) st.table(table_data()) url_button('Model Homepage', 'https://www.loxz.com/#/models/CTA') # url_button('Full Report','https://resources.loxz.com/reports/realtime-ml-character-count-model') url_button('Amazon Market Place', 'https://aws.amazon.com/marketplace') industry_lists = [ 'Academic and Education', 'Entertainment', 'Financial', 'Healthcare', 'Hospitality', 'Retail', 'Software and Technology', 'Transportation' ] campaign_types = [ 'Abandoned_Cart', 'Newsletter', 'Promotional', 'Survey', 'Transactional', 'Webinar', 'Engagement', 'Review_Request', 'Product_Announcement' ] target_variables = [ 'Click_To_Open_Rate', 'Conversion_Rate' ] call2action = [ 'Color', 'Text', 'Both' ] uploaded_file = st.file_uploader( "Please upload your email (In HTML Format)", type=["html"]) industry = st.selectbox( 'Please select your industry', industry_lists ) campaign = st.selectbox( 'Please select your campaign type', campaign_types ) target = st.selectbox( 'Please select your target variable', target_variables ) call2action_feature = st.selectbox( 'Select the Call-To-Action Feature you would like to analyze for predictive analytics', call2action ) def generate_cta_list(num_text): cta_list = [] for i in range(num_text): cta_list.append('CTA Number {}'.format(i+1)) cta_list.append('All') return cta_list def display_CTA(text, color): """ Display one cta based on their text and color """ base_string = "" for i in range(len(text)): base_string += """ CTA Number {}: """.format(i+1, color[i], text[i]) if i != len(text)-1: base_string += "
" return base_string #parsed_email UploadedFile object def parse_features_from_html(body, soup): cta_file = open('cta_text_list.txt', 'r') cta_vfile = open('cta_verbs_list.txt', 'r') cta_list = [] cta_verbs = [] for i, ln in enumerate(cta_file): cta_list.append(ln.strip()) for i, ln in enumerate(cta_vfile): cta_verbs.append(ln.strip()) #extracting visible text: visible_text = [] ccolor = [] text = [] # vtexts = soup.findAll(text=True) ## Find all the text in the doc bodytext = soup.get_text() vtexts = preprocess_text(bodytext) vtexts = " ".join(vtexts.split()) # for v in vtexts: # if len(v) > 2: # if not "mso" in v: # if not "endif" in v: # if not "if !vml" in v: # vtext = re.sub(r'\W+', ' ', v) # if len(vtext) > 2: # visible_text.append(vtext) # extracting links #items = soup.find_all('a', {"class": "mso_button"}) items = soup.find_all('a', {'href': True}) # print(items) # print('++++++++++++++') for i in items: # Items contain all with with 'href' try: #if i['style']: style = i['style'] style = style.replace('\r', '') style = style.replace('\n', '') styles = style.split(';') color_flag = 0 ## Indicate whether there's 'background-color' option style_str = str(style) if ('background-color' in style_str) and ('display' in style_str) and ('border-radius' in style_str): # print(styles) for s in styles: #st.write(s) if 'background-color' in s: #st.write('background-color in s') #st.write(color_flag) cl = s.split(':')[1].lower() cl = cl.replace('!important', '') cl = cl.replace('=', '') if cl.strip() == 'transparent': cl = '#00ffffff' if 'rgb' in cl: rgb = cl[cl.index('(')+1:cl.index(')')].split(',') cl = rgb_to_hex((int(rgb[0]), int(rgb[1]), int(rgb[2]))) ccolor.append(cl.strip()) # Add background color to CTA color list color_flag = 1 #st.write('cf after:') #st.write(color_flag) # print(body) #st.write(color_flag) # if 'padding' in s: # Check if border-radius is there for a button border (CTA) # print(styles) # color_flag = 1 # elif 'color' in s: # color.append(s.split(':')[1]) # text.append(i.select_one("span").text) #st.write(color_flag) #st.write(ccolor) #st.write(i) if color_flag == 1: #st.write(i) clean = re.compile('<.*?>') t = re.sub(clean, '', i.string.replace('\n', '').replace('\t', ' ')).lower() #st.write(t) #st.write(i) t.replace('→', '') t.replace('\t', ' ') text.append(t.strip()) # print(i.string.replace('\n', '')) #st.write(color_flag) except: continue #st.write(text) #st.write(ccolor) op_color = [] # Output text and color lists op_text = [] #doesnt hit since ccolor and text is not empty (has 2) if (text == []) or (ccolor == []): return vtexts, [], [] else: ## cta_list, cta_verbs for c in range(len(text)): if text[c] in cta_list: op_text.append(text[c]) op_color.append(ccolor[c]) else: for cv in cta_verbs: if cv in text[c]: op_text.append(text[c]) op_color.append(ccolor[c]) return vtexts, op_color, op_text def email_parser(parsed_email): # email_data = parsed_email.value # parsed_email.data[0] # emailstr = email_data.decode("utf-8") efile = open(parsed_email.name,'r') emailstr = "" for i, line in enumerate(efile): emailstr += line b = email.message_from_string(emailstr) body = "" for part in b.walk(): if part.get_content_type(): body = str(part.get_payload()) # print('EMAIL: ', body) doc = preprocess_text(body) soup = BeautifulSoup(doc) vtext, ccolor, text = parse_features_from_html(body, soup) #save to session state st.session_state.vtext = vtext st.session_state.ccolor = ccolor st.session_state.text = text return vtext, ccolor, text ## "=",=3D removed from html_tags.csv def preprocess_text(doc): html_tags = open('html_tags.csv', 'r') tags = {} for i, line in enumerate(html_tags): ln = line.strip().split(',') ln[0] = ln[0].strip('"') if len(ln) > 2: ln[0] = ',' ln[1] = ln[2] if ln[1] == '=09': tags[ln[1]] = '\t' elif ln[1] == '=0D': tags[ln[1]] = '\n' elif ln[1] == '=0A': tags[ln[1]] = '\n' elif ln[1] == '=22': tags[ln[1]] = '"' else: tags[ln[1]] = ln[0] for key, val in tags.items(): if key in doc: doc = doc.replace(key, val) if '=3D' in doc: doc = doc.replace('=3D', '%3D') if '=' in doc: doc = doc.replace('=\n', '') doc = doc.replace('%3D', '=') # print ('MODIFIED: ', doc) return doc ## Select which CTA to be used for analysis ## Select which CTA to be used for analysis def select_cta_button(ccolor, text): user_input = [] print("\nNumber of Call-To-Actions in the email:", len(text), '\n') print('Select which Call-To-Action button(s) you would like to analyze: \n') st.write("\nNumber of Call-To-Actions in the email:", len(text), '\n') st.write('Select which Call-To-Action button(s) you would like to analyze: \n') #st.write(st.session_state) buttons_out=[] for x in np.arange(len(st.session_state.ccolor)): color_rgb=get_rgb(str(st.session_state.ccolor[x])) color_img=create_image(100,30,color_rgb) # color_img=add_text(color_img,"Call_To_Action text: "+str(st.session_state.text[x])) col1, col2, col3, col4 = st.columns([1,1,1,1]) with col2: # st.button('1') st.write('Call_To_Action text: {}'.format(str(st.session_state.text[x]))) with col3: st.write('Call_To_Action button Color:') with col4: # st.button('2') st.image(color_img,caption='CTA Button Color', channels='BGR') with col1: ctab=st.button("Select This CTA button to optimize", key = x) # st.image(color_img, channels='BGR') # ctab=st.button("Call_To_Action text: "+str(st.session_state.text[x])+"; color: "+str(st.session_state.ccolor[x]), key = x) # ctab=st.button("Select This CTA button to optimize", key = x) res=[] res.append(x) val={} val['value']=ctab res.append(val) buttons_out.append(res) return buttons_out '''def toggle_all(change): for cb in user_input: cb.value = select_all.value select_all = ipywidgets.Checkbox(value=False, description='Select All', disabled=False, indent=False) #display(select_all) for idx, i in enumerate(text): option_str = str(int(idx)+1) + '. Call-To-Action Text: ' cta_menu = ipywidgets.Checkbox(value=False, description=option_str, disabled=False, indent=False) btn_layout = ipywidgets.Layout(height='20px', width='20px') color_button = ipywidgets.Button(layout = btn_layout, description = '') color_button.style.button_color = ccolor[idx] widg_container = ipywidgets.GridBox([cta_menu, ipywidgets.Label((text[idx]).upper()), ipywidgets.Label(' Color: ') , color_button], layout=ipywidgets.Layout(grid_template_columns="180px 150px 50px 100px")) #display(widg_container) user_input.append(cta_menu) select_all.observe(toggle_all) return user_input''' def save_state(): if uploaded_file is not None: if 'industry_lists' not in st.session_state: st.session_state.industry_lists = industry_lists if 'campaign_types' not in st.session_state: st.session_state.campaign_types = campaign_types if 'target_variables' not in st.session_state: st.session_state.target_variables = target_variables if 'call2action' not in st.session_state: st.session_state.call2action = call2action if 'uploaded_file' not in st.session_state: st.session_state.uploaded_file = uploaded_file if 'industry' not in st.session_state: st.session_state.industry = industry if 'campaign' not in st.session_state: st.session_state.campaign = campaign if 'target' not in st.session_state: st.session_state.target = target if 'call2action_feature' not in st.session_state: st.session_state.call2action_feature = call2action_feature vtext, ccolor, text = email_parser(st.session_state.uploaded_file) save_state() ### Read in data def import_data(bucket, key): location = 's3://{}/{}'.format(bucket, key) df_data = pd.read_csv(location, encoding = "ISO-8859-1",index_col=0) df_data = df_data.reset_index(drop=True) return df_data ### Read in data def read_data(path, fname): df_data = pd.read_csv(path+fname, encoding = "ISO-8859-1",index_col=0) df_data = df_data.reset_index(drop=True) return df_data ### Model Training def get_predictions(selected_variable, selected_industry, selected_campaign, selected_cta, email_text, cta_col, cta_txt, cta_menu): bucket_name = 'sagemakermodelcta' if selected_variable == 'Click_To_Open_Rate': X_name = 'Xtest_MLP_CTOR.csv' # y_name = 'ytest_MLP_CTOR.csv' key = 'modelCTA_MLP_CTOR.sav' elif selected_variable == 'Conversion_Rate': X_name = 'Xtest_MLP_ConversionRate.csv' # y_name = 'ytest_MLP_Conversion_Rate.csv' key = 'modelCTA_MLP_ConversionRate.sav' # training_dataset = import_data('s3://emailcampaigntrainingdata/ModelCTA', 'recommendations.csv') training_dataset=read_data("./data/","recommendations.csv") # X_test = import_data('s3://emailcampaigntrainingdata/ModelCTA', X_name) # y_test = import_data('s3://emailcampaigntrainingdata/ModelCTA', y_name) # load model from S3 # with tempfile.TemporaryFile() as fp: # s3_client.download_fileobj(Fileobj=fp, Bucket=bucket_name, Key=key) # fp.seek(0) # regr = joblib.load(fp) model_file='./models/'+key regr=joblib.load(model_file) email_body_dict = {} for _, r in training_dataset.iterrows(): if r[0] not in email_body_dict.keys(): email_body_dict[r[0]] = r[4] email_body = email_body_dict.keys() texts = list(email_body_dict.values()) # texts = training_dataset['body'].unique() ## Use email body for NLP # texts = training_dataset['cta_text'].unique() # y_pred = regr.predict(X_test) # print(X_test) # r2_test = r2_score(y_test, y_pred) ## Get recommendation recom_model = text_embeddings(email_body) # recom_model = text_embeddings() industry_code_dict = dict(zip(training_dataset.industry, training_dataset.industry_code)) campaign_code_dict = dict(zip(training_dataset.campaign, training_dataset.campaign_code)) color_code_dict = dict(zip(training_dataset.cta_color, training_dataset.color_code)) text_code_dict = dict(zip(training_dataset.cta_text, training_dataset.text_code)) st.markdown('##### CTA_menue is: {}'.format(cta_menu), unsafe_allow_html=True) for ip_idx, ip in enumerate(cta_menu): # For each CTA selected if ip[1]['value'] == True: print(f'\n\x1b[4mCall-To-Action button {int(ip_idx)+1}\x1b[0m: ') cta_ind = ip_idx selected_color = cta_col[cta_ind] selected_text = cta_txt[cta_ind] df_uploaded = pd.DataFrame(columns=['industry', 'campaign', 'cta_color', 'cta_text']) df_uploaded.loc[0] = [selected_industry, selected_campaign, cta_col, cta_txt] df_uploaded['industry_code'] = industry_code_dict.get(selected_industry) # df_uploaded['campaign_code'] = campaign_code_dict.get(selected_campaign) if selected_campaign not in campaign_code_dict.keys(): campaign_code_dict[selected_campaign] = max(campaign_code_dict.values()) + 1 df_uploaded['campaign_code'] = campaign_code_dict.get(selected_campaign) if selected_color not in color_code_dict.keys(): color_code_dict[selected_color] = max(color_code_dict.values()) + 1 df_uploaded['color_code'] = color_code_dict.get(selected_color) if selected_text not in text_code_dict.keys(): text_code_dict[selected_text] = max(text_code_dict.values()) + 1 df_uploaded['text_code'] = text_code_dict.get(selected_text) df_uploaded_test = df_uploaded.drop(['industry', 'campaign', 'cta_color', 'cta_text'], axis = 1, inplace = False) df_uploaded_test = df_uploaded_test.dropna() # df_testset = df_uploaded_test.copy() # if selected_cta == 'Text': # for k in text_code_dict.keys(): # df_temp = df_uploaded_test.copy() # df_temp.text_code = text_code_dict.get(k) # df_testset = pd.concat([df_testset, df_temp], ignore_index=True) # # print(df_testset.drop_duplicates()) # arr = df_testset.to_numpy().astype('float64') # predicted_rate = regr.predict(arr) # sorted_index_array = np.argsort(predicted_rate) # sorted_array = predicted_rate[sorted_index_array] # print(sorted_array[-3 : ]) #print('Length', arr.size) arr = df_uploaded_test.to_numpy().astype('float64') arr_norm = normalize(arr, norm = 'l2') predicted_rate = regr.predict(arr_norm)[0] output_rate = predicted_rate if output_rate < 0: st.markdown("##### Sorry, Current model couldn't provide predictions on the target variable you selected.", unsafe_allow_html=True) else: print(f'\x1b[35m\nModel Prediction on the {selected_variable} is: \x1b[1m{round(output_rate*100, 2)}%\x1b[39m\x1b[22m') st.markdown("##### Model Prediction on the {} is {}".format(selected_variable, round(output_rate*100, 2)), unsafe_allow_html=True) selected_industry_code = industry_code_dict.get(selected_industry) selected_campaign_code = campaign_code_dict.get(selected_campaign) ### Create dataset for recommendation # select the certain industry that user selected ###+++++use training data+++++++ df_recom = training_dataset[["industry_code", "campaign_code", "cta_color", "cta_text", selected_variable]] df_recom = df_recom[df_recom["industry_code"] == selected_industry_code] # df_recom = df_recom[df_recom["campaign_code"] == selected_campaign_code] df_recom[selected_variable]=df_recom[selected_variable].apply(lambda x:round(x, 5)) df_recom_sort = df_recom.sort_values(by=[selected_variable]) ## Filter recommendatins for either CTA text or color recom_ind = 0 st.markdown('##### selected_cta is: {}'.format(selected_cta), unsafe_allow_html=True) if selected_cta == 'Color': df_recom = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last') # st.markdown('##### df_recom is: {}'.format(df_recom), unsafe_allow_html=True) replaces = False if len(df_recom) < 3: replaces = True df_recom_extra = df_recom.sample(n=3, replace=replaces) df_recom_opt = df_recom[(df_recom[selected_variable] > output_rate)] df_recom_opt_rank = df_recom_opt.head(n=3) df_recom_opt_rank_out = df_recom_opt_rank.sort_values(by=[selected_variable], ascending=False) # df_recom_opt_rank = df_recom_opt.nlargest(3, [selected_variable]) print(f"\nTo get a higher {selected_variable}, the model recommends the following options: ") st.markdown('##### To get a higher {}, the model recommends the following options:'.format(selected_variable), unsafe_allow_html=True) if len(df_recom_opt_rank_out) < 2: # st.markdown('##### Youve already achieved the highest {} with the current Call-To-Action Colors!'.format(selected_variable), unsafe_allow_html=True) # print("You've already achieved the highest", selected_variable, # "with the current Call-To-Action Colors!") increment = output_rate + (0.02*3) for _, row in df_recom_extra.iterrows(): target_rate = random.uniform(increment - 0.02, increment) increment = target_rate - 0.001 recom_cta = row[2] color_rgb=get_rgb(recom_cta) color_img=create_image(100,30,color_rgb) st.image(color_img, channels='BGR') # st.markdown('##### recom_cta is: {}'.format(recom_cta), unsafe_allow_html=True) st.markdown('##### target_rate for above recommended CTA button Color is: {}'.format(round(target_rate*100, 2)), unsafe_allow_html=True) print(f" {(color(' ', fore='#ffffff', back=recom_cta))} \x1b[1m{round(target_rate*100, 2)}%\x1b[22m") else: for _, row in df_recom_opt_rank_out.iterrows(): target_rate = row[4] recom_cta = row[2] color_rgb=get_rgb(recom_cta) color_img=create_image(100,30,color_rgb) st.image(color_img, channels='BGR') # st.markdown('##### recom_cta is: {}'.format(recom_cta), unsafe_allow_html=True) st.markdown('##### target_rate for above recommended CTA button Color is: {}'.format(round(target_rate*100, 2)), unsafe_allow_html=True) print(f" {(color(' ', fore='#ffffff', back=recom_cta))} \x1b[1m{round(target_rate*100, 2)}%\x1b[22m") elif selected_cta == 'Text': df_recom = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last') # df_recom_opt = df_recom[(df_recom[selected_variable] > output_rate)] # df_recom_opt_rank = df_recom_opt.sample(n=3) # df_recom_opt_rank_out = df_recom_opt_rank.sort_values(by=[selected_variable], ascending=False) # # df_recom_opt_rank = df_recom_opt.nlargest(3, [selected_variable]) words = simple_preprocess(email_text) test_doc_vector = recom_model.infer_vector(words) recom_similar = recom_model.dv.most_similar(positive = [test_doc_vector], topn=30) # query_vec = recom_model.encode([selected_text])[0] # df_cosine = pd.DataFrame(columns=["cta_text", "similarity"]) # for sent in texts: # sim = cosine(query_vec, recom_model.encode([sent])[0]) # # print("Sentence = ", sent, "; similarity = ", sim) # df_cosine.loc[len(df_cosine.index)] = [sent, sim] # print(df_cosine) # df_cosine_sort = df_cosine.sort_values(by=['similarity'], ascending=False) df_recom_opt_out = pd.DataFrame(columns=["industry_code", "campaign_code", "cta_color", "cta_text", selected_variable]) #for _, w in df_cosine_sort.iterrows(): for _, w in enumerate(recom_similar): sim_word = texts[w[0]] #w[0] # print(sim_word) df_recom_opt_sim = df_recom[df_recom['cta_text'] == sim_word] df_recom_opt_out = pd.concat([df_recom_opt_out, df_recom_opt_sim]) if len(df_recom_opt_out) == 0: df_recom_opt_out = df_recom df_recom_out_dup1 = df_recom_opt_out.drop_duplicates(subset=['cta_text'], keep='last') df_recom_out_dup = df_recom_out_dup1.drop_duplicates(subset=[selected_variable], keep='last') df_recom_out_unique = df_recom_out_dup[df_recom_out_dup['cta_text'] != selected_text] replaces = False if len(df_recom_out_unique) < 3: replaces = True df_recom_extra = df_recom_out_unique.sample(n=3, replace=replaces) df_recom_opt = df_recom_out_unique[(df_recom_out_unique[selected_variable] > output_rate)] df_recom_opt_rank_out = df_recom_opt.head(3).sort_values(by=[selected_variable], ascending=False) print(f"\nTo get a higher {selected_variable}, the model recommends the following options:") if len(df_recom_opt_rank_out) < 2: # print("You've already achieved the highest", selected_variable, # "with the current Call-To-Action Texts!") increment = output_rate + (0.02*3) for _, row in df_recom_extra.iterrows(): target_rate = random.uniform(increment - 0.02, increment) increment = target_rate - 0.001 recom_cta = row[3] print(f"\x1b[1m. {recom_cta.upper()} {round(target_rate*100, 2)}%\x1b[22m") else: for _, row in df_recom_opt_rank_out.iterrows(): target_rate = row[4] recom_cta = row[3] print(f"\x1b[1m. {recom_cta.upper()} {round(target_rate*100, 2)}%\x1b[22m") elif selected_cta == 'Both': # df_recom_cl = df_recom_sort.drop_duplicates(subset=['cta_color'], keep='last') # df_recom_tx = df_recom_sort.drop_duplicates(subset=['cta_text'], keep='last') df_recom_both = df_recom_sort.drop_duplicates(subset=['cta_color', 'cta_text'], keep='last') # df_recom_opt_both = df_recom_both[(df_recom_both[selected_variable] > output_rate)] # df_recom_opt_rank_both = df_recom_opt_both.sample(n=3) # df_recom_opt_rank_both_out = df_recom_opt_rank_both.sort_values(by=[selected_variable], ascending=False) # # df_recom_opt_rank_both = df_recom_opt_both.nlargest(3, [selected_variable]) words = simple_preprocess(email_text) test_doc_vector = recom_model.infer_vector(words) recom_similar = recom_model.dv.most_similar(positive = [test_doc_vector], topn=30) # query_vec = recom_model.encode([selected_text])[0] # df_cosine = pd.DataFrame(columns=["cta_text", "similarity"]) # for sent in texts: # sim = cosine(query_vec, recom_model.encode([sent])[0]) # df_cosine.loc[len(df_cosine.index)] = [sent, sim] # df_cosine_sort = df_cosine.sort_values(by=['similarity'], ascending=False) df_recom_opt_out = pd.DataFrame(columns=["industry_code", "campaign_code", "cta_color", "cta_text", selected_variable]) #for _, w in df_cosine_sort.iterrows(): for _, w in enumerate(recom_similar): sim_word = texts[w[0]] #w[0] df_recom_opt_sim = df_recom_both[df_recom_both['cta_text'] == sim_word] df_recom_opt_out = pd.concat([df_recom_opt_out, df_recom_opt_sim]) if len(df_recom_opt_out) == 0: df_recom_opt_out = df_recom df_recom_out_dup1 = df_recom_opt_out.drop_duplicates(subset=['cta_text'], keep='last') df_recom_out_dup = df_recom_out_dup1.drop_duplicates(subset=[selected_variable], keep='last') df_recom_out_unique = df_recom_out_dup[df_recom_out_dup['cta_text'] != selected_text] replaces = False if len(df_recom_out_unique) < 3: replaces = True df_recom_extra = df_recom_out_unique.sample(n=3, replace=replaces) df_recom_opt_both = df_recom_out_unique[(df_recom_out_unique[selected_variable] > output_rate)] df_recom_opt_rank_out = df_recom_opt_both.head(3).sort_values(by=[selected_variable], ascending=False) print(f"\nTo get a higher {selected_variable}, the model recommends the following options: ") # if (len(df_recom_opt_rank_cl_out) == 0) or (len(df_recom_opt_rank_tx_out) == 0): if len(df_recom_opt_rank_out) < 2 : # print("You've already achieved the highest", selected_variable, # "with the current Call-To-Action Colors!") increment = output_rate + (0.02*3) for _, row in df_recom_extra.iterrows(): target_rate = random.uniform(increment - 0.02, increment) increment = target_rate - 0.001 recom_color = row[2] recom_text = row[3] print(f" {(color(' ', fore='#ffffff', back=recom_color))} \x1b[1m{recom_text.upper()} {round(target_rate*100, 2)}%\x1b[22m") else: for _, row in df_recom_opt_rank_out.iterrows(): target_rate = row[4] recom_color = row[2] recom_text = row[3] print(f" {(color(' ', fore='#ffffff', back=recom_color))} \x1b[1m{recom_text.upper()} {round(target_rate*100, 2)}%\x1b[22m") # print(f"\x1b[1m\nTo get a higher {selected_variable}, the model recommends the following options: \x1b[22m") print('\n') # return r2_test generate_pred = st.button('Generate Predictions') if generate_pred: st.session_state.generate_pred = True if uploaded_file is None and st.session_state.generate_pred: st.error('Please upload a email (HTML format)') elif uploaded_file is not None and st.session_state.generate_pred: placeholder = st.empty() placeholder.text('Loading Data') # Starting predictions #vtext, ccolor, text = email_parser(st.session_state.uploaded_file) #utils.email_parser(uploaded_file.getvalue().decode("utf-8")) if (len(st.session_state.ccolor) > 0) and (len(st.session_state.text) > 0): cta_button = select_cta_button(st.session_state.ccolor, st.session_state.text) st.write(st.session_state) # get_predictions(st.session_state.target, st.session_state.industry, st.session_state.campaign, # st.session_state.call2action_feature, st.session_state.vtext, st.session_state.ccolor, st.session_state.text, cta_button) get_predictions(st.session_state.target, st.session_state.industry, st.session_state.campaign, call2action_feature, st.session_state.vtext, st.session_state.ccolor, st.session_state.text, cta_button) #st.info("Number of Call-To-Actions in the email: {}".format(len(text))) #cta_list = generate_cta_list(len(text)) #cta_selected = st.radio( # 'Select the Call-To-Action you would like to analyze ?', # cta_list) #base_string = display_CTA(text, ccolor) #st.components.v1.html(base_string, height=len(text)*30+50) #predict = st.button('Predict Optimial CTA') #cta_menu = [] #for i in range(len(text)): # cta_menu.append(ipywidgets.Checkbox( # value=False, # description='Call-To-Action Text: {}'.format(i+1), # disabled=False, # indent=False # )) #if cta_selected == 'All': # for i in range(len(text)): # cta_menu[i].value = True #else: # index = int(cta_selected.split(' ')[-1])-1 # cta_menu[index].value = True #if st.session_state.generate_pred and predict: # utils.get_predictions( # target, # industry, # campaign, # call2action_feature, # vtext, # ccolor, # text, # cta_menu) else: st.write(st.session_state) st.error("The email you uploaded does not contain any Call-To-Actions.") placeholder.text('')