import random import numpy as np import pandas as pd import plotly.express as px import streamlit as st import xlsxwriter from os import listdir from .lib import set_input, create_dowload_button from os.path import isfile, join, exists import printj # import cv2 import matplotlib.image as mpimg class LogAnalyser: def __init__(self, gen, container_guide, container_param, container_button): self.gen, self.container_guide, self.container_param, self.container_button = gen, container_guide, container_param, container_button # self.gen.initialise_classifier_model() dirpath = 'data' log_file_paths = sorted( [join(dirpath, f) for f in listdir(dirpath) if isfile(join(dirpath, f)) and f.startswith('ist_log')]) self.path = container_param.selectbox( 'Select the log path', log_file_paths) self.df_path = f'data/df/{self.path.split("/")[-1].split(".")[0]}.csv' # if 'button1_counter' not in st.session_state: # st.session_state.button1_counter = 0 # if 'df' not in st.session_state: # self.df=0 st.markdown(self.get_text()) self.placeholder = dict() @staticmethod @st.cache def get_text(): return ''' ### Equation ``` frequency_penalty = 1 - emotion_frequency probability_emote = w * emotion_confidence + (1 - w) * frequency_penalty Show_Emotion = probability_emote > (Random value between 0 and 1) ``` ''' def display_logs(self): # self.container_param.markdown( # f'st.session_state.button1_counter: {st.session_state.button1_counter}') self.emotion_type = self.container_param.select_slider( 'How many Emotion data to show?', ['Max-only', '2', '3', '4', '5', '6', 'All 7']) self.debug = 'debug' in self.df_path if (not exists(self.df_path) or self.container_button.button('Detect Emotion')) and (not self.debug): self.df = self.get_log() # else: self.df = pd.read_csv(self.df_path) # if 'path' not in st.session_state: # st.session_state.path=self.path # if 'df' not in st.session_state or st.session_state.path!=self.path: # st.session_state.df=self.get_log(self.path, self.gen) # st.session_state.path=self.path self.update_df() if self.debug: for name in ['c1plot', 'c2plot']: self.placeholder[name] = st.empty() # image = cv2.imread(f'data/img/{name}.png') # image=cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = mpimg.imread(f'data/img/{name}.png') self.placeholder[name].image(image) self.get_c1_plot() self.get_c2_plot() def get_c1_plot(self): # c2_threshold=0 c1_threshold_list = np.arange(0, 1, 0.01) c1_reaction_weight_list = np.arange(0, 1, 0.1) # reaction_weight=0.5 list_stories = self.df.Story.unique() total_num_stories = len(list_stories) num_stories2show = 9 # int(set_input(self.container_param, # label='Number of stories to show', min_value=1, max_value=total_num_stories, value=9, step=1, # key_slider='num_stories2show_slider', key_input='num_stories2show_input',)) list_stories2show = list_stories[:num_stories2show] c1r_sum_list = [] df_c1_analysis = pd.DataFrame() c1_analysis_dict = dict() for reaction_weight in c1_reaction_weight_list: reaction_weight=np.round(reaction_weight, 2) for c1_threshold in c1_threshold_list: df_c1 = self.df.copy() for story_id in list_stories2show: reaction_num = 0 reaction_frequency = 0 probability_emote = 0 reaction_show = False subset_condition = self.get_subset_condition(df_c1, story_id) dfs = df_c1[subset_condition] for i, (index, row) in enumerate(dfs.iterrows()): if row.Emotion == 'neutral' or row.Score < self.score_threshold: reaction_show = False else: reaction_frequency = reaction_num/(i+1) probability_emote = row.Score*reaction_weight + \ (1-reaction_weight)*(1-reaction_frequency) reaction_show = True if probability_emote > c1_threshold else False if reaction_show: reaction_num += 1 df_c1.at[index, 'reaction_frequency'] = reaction_frequency df_c1.at[index, 'probability_emote'] = probability_emote df_c1.at[index, 'c1_threshold'] = c1_threshold df_c1.at[index, 'reaction_show'] = reaction_show df_c1.at[index, 'c1'] = reaction_show review = df_c1.e_review[index] df_c1.at[index, 'c1r'] = self.get_criteria_review( reaction_show, review=review) c1r_sum = df_c1['c1r'].sum() c1r_sum_list.append(c1r_sum) c1_analysis_dict['c1_threshold']=c1_threshold c1_analysis_dict['reaction_weight']=reaction_weight c1_analysis_dict['c1r_sum']=c1r_sum df_c1_analysis=pd.concat([df_c1_analysis, pd.DataFrame(c1_analysis_dict, index=[0])]) # fig = px.line(x=c1_threshold_list, y=c1r_sum_list) fig = px.line(data_frame=df_c1_analysis, x='c1_threshold', y='c1r_sum', color='reaction_weight') fig.update_layout( title="Criteria 1 analysis `PE > Threshold`", xaxis_title="PE Threshold", yaxis_title="Count of good reviews", # legend_title="Legend Title", font=dict( # family="Courier New, monospace", size=14, color="#006064" ), ) # st.plotly_chart(fig, use_container_width=True) self.placeholder['c1plot'].plotly_chart(fig, use_container_width=True) def get_c2_plot(self): # c2_threshold=0 c2_threshold_list = np.arange(0, 1, 0.01) list_stories = self.df.Story.unique() total_num_stories = len(list_stories) num_stories2show = 9 # int(set_input(self.container_param, # label='Number of stories to show', min_value=1, max_value=total_num_stories, value=9, step=1, # key_slider='num_stories2show_slider', key_input='num_stories2show_input',)) list_stories2show = list_stories[:num_stories2show] c2r_sum_list = [] for c2_threshold in c2_threshold_list: df_c2 = self.df.copy() for story_id in list_stories2show: subset_condition = self.get_subset_condition(df_c2, story_id) dfs = df_c2[subset_condition] for i, (index, row) in enumerate(dfs.iterrows()): c2 = row.Emotion != 'neutral' and row.Score > c2_threshold df_c2.at[index, 'c2'] = c2 review = df_c2.e_review[index] df_c2.at[index, 'c2r'] = self.get_criteria_review( c2, review=review) c2r_sum_list.append(df_c2['c2r'].sum()) fig = px.line(x=c2_threshold_list, y=c2r_sum_list) fig.update_layout( title="Criteria 2 analysis `CS > Threshold`", xaxis_title="CS Threshold", yaxis_title="Count of good reviews", # legend_title="Legend Title", font=dict( # family="Courier New, monospace", size=14, color="#006064" ), ) self.placeholder['c2plot'].plotly_chart(fig, use_container_width=True) @staticmethod def get_subset_condition(data, story_id): return (data.Story == story_id) & (data.Turn == 'user') @staticmethod def get_criteria_review(c, review): # printj.green(f'{c} {type(c)}') # printj.green(f'{review} {type(review)}') result = int(c == True and (review == 'o' or review == None)) + int( c == False and review == 'x') return np.round(result, 0) # return str(np.round(result, 0)) def get_ngram_pattern(self, s, n=2): gnp = '' for i in range(len(s)-(n-1)): gnp += '1' if '1' in s[i:i+n] else '0' return gnp def update_df(self): list_stories = self.df.Story.unique() total_num_stories = len(list_stories) num_stories2show = int(set_input(self.container_param, label='No. of stories to show', min_value=1, max_value=total_num_stories, value=9, step=1, key_slider='num_stories2show_slider', key_input='num_stories2show_input',)) list_stories2show = list_stories[:num_stories2show] reaction_weight = set_input(self.container_param, label='Reaction Weight w', min_value=0.0, max_value=1.0, value=0.5, step=0.01, key_slider='w_slider', key_input='w_input',) self.container_param_rv = self.container_param.columns([1, 1]) random_value_mode = self.container_param_rv[0].radio( "C1 Threshold type", ["Random", "Fixed"], index=1) # random_value = random.random() if random_value_mode == "Fixed": random_value = set_input(self.container_param, label='C1 Threshold', key_slider='rand_slider', key_input='rand_input', min_value=0., max_value=1., value=.5, step=.01,) c2_threshold = set_input(self.container_param, label='C2 Threshold', min_value=0.0, max_value=1.0, value=0.7, step=0.01, key_slider='c2_threshold_slider', key_input='c2_threshold_input',) table_mode = self.container_param.radio( "Table Style:", ["Dataframe", "Table"]) self.show_pe_data = self.container_param.checkbox( 'Show Probability Emote', value=True, key='show_pe_data_log') self.score_threshold = set_input(self.container_param, label='Score Threshold', min_value=0.0, max_value=1.0, value=0.5, step=0.01, key_slider='score_threshold_slider', key_input='score_threshold_input',) df_reaction_pattern = pd.DataFrame() reaction_pattern_dict = dict() for story_id in list_stories2show: reaction_num = 0 reaction_frequency = 0 probability_emote = 0 # random_value = 0 reaction_show = False # c2 = True subset_condition = self.get_subset_condition(self.df, story_id) dfs = self.df[subset_condition] for i, (index, row) in enumerate(dfs.iterrows()): if row.Emotion == 'neutral' or row.Score < self.score_threshold: reaction_show = False else: reaction_frequency = reaction_num/(i+1) probability_emote = row.Score*reaction_weight + \ (1-reaction_weight)*(1-reaction_frequency) if random_value_mode == "Random": random_value = random.random() reaction_show = True if probability_emote > random_value else False if reaction_show: reaction_num += 1 self.df.at[index, 'reaction_frequency'] = reaction_frequency self.df.at[index, 'probability_emote'] = probability_emote self.df.at[index, 'random_value'] = random_value self.df.at[index, 'reaction_show'] = reaction_show self.df.at[index, 'c1'] = reaction_show c2 = row.Emotion != 'neutral' and row.Score > c2_threshold self.df.at[index, 'c2'] = c2 review = self.df.e_review[index] self.df.at[index, 'c1r'] = self.get_criteria_review( reaction_show, review=review) self.df.at[index, 'c2r'] = self.get_criteria_review( c2, review=review) s = '' df_edit = self.df[self.get_subset_condition( self.df, story_id)].reaction_show.copy() df_edit = df_edit.dropna() for v in df_edit: s += str(int(v)) # df_reaction_pattern.at[story_id] # reaction_pattern_dict['story_id']=story_id reaction_pattern_dict['reaction_length'] = len(s) reaction_pattern_dict['reaction_1'] = s.count('1') reaction_pattern_dict['reaction_pattern'] = s for i in range(2, 8): reaction_pattern_dict[f'{i}-gram_pattern'] = self.get_ngram_pattern( s, n=i) df_reaction_pattern = pd.concat( [df_reaction_pattern, pd.DataFrame(reaction_pattern_dict, index=[f'Story_{story_id}'])]) # st.markdown(df_edit) # st.markdown(s) # for c in ['c1r', 'c2r']: # st.markdown(f'Sum of {c} : {self.df[c].sum()}') df_show = self.df.copy() for c in ['c1r', 'c2r']: df_show[c] = df_show[c].fillna(0).astype(int) st.markdown(f'Sum of {c} : {df_show[c].sum()}') for story_id in list_stories2show: dfs = df_show[(df_show.Story == story_id)].copy() columns2hide = ['Unnamed: 0', 'Story', ] if not self.debug: columns2hide += ['e_review'] if self.emotion_type == 'Max-only': columns2hide += [ f'Emotion_{sorted_i+1}' for sorted_i in range(7)] columns2hide += [ f'Score_{sorted_i+1}' for sorted_i in range(7)] if not self.show_pe_data: columns2hide += [ "reaction_frequency", "probability_emote", "random_value", "reaction_show"] for c in columns2hide: dfs.drop(c, axis=1, inplace=True) st.markdown(f'#### Story {story_id}') dfs = dfs.style if self.show_pe_data: dfs = dfs.apply(self.dfstyle_color_text_col, axis=1) # dfs = dfs.applymap(self.dfstyle_color_text) dfs = dfs.apply(self.rower, axis=None) dfs = dfs.set_table_styles([{ 'selector': 'tr:hover', 'props': 'color: #000000' # background-color: #eeee66;font-size: 1.01em; }]) # .hide_index() if table_mode == 'Dataframe': st.dataframe(dfs) # set_na_rep(" ").s # st.dataframe(df_reaction_pattern.iloc[story_id-1]) elif table_mode == 'Table': st.table(dfs) # st.table(df_reaction_pattern.iloc[story_id-1]) create_dowload_button( dfs, sheet_name=f'story_{story_id}', file_name=f'data_story_{story_id}.xlsx') # print(dfs.render()) if table_mode == 'Dataframe': st.dataframe(df_reaction_pattern) elif table_mode == 'Table': st.table(df_reaction_pattern) # @st.cache def dfstyle_color_text_col(self, s): num_col = len(s) result = ['background-color: white']*len(s) # if s.Emotion == 'neutral' and s.Turn == 'user': # result[-6:-1] = ['color: #992222'] + \ # ['color: #333333']+['color: #fcfcfc']*3 for si, sc in enumerate(s): if sc != sc: result[si] = 'color: #fcfcfc' # printj.red.bold_on_white(s) # printj.red.bold_on_cyan(si) # printj.red.bold_on_cyan(sc) # if s.Score < self.score_threshold and s.Turn == 'user': # result[-5:-1] = ['color: #992222'] + ['color: #fcfcfc']*3 # printj.red(result) # printj.red.bold_on_cyan(s) # printj.red.bold_on_cyan(type(s)) # printj.red.bold_on_white(s.keys().tolist()) # printj.red.bold_on_white(type(s.keys().tolist())) # idx_reaction_show = s.keys().tolist().index("reaction_show") # printj.red.bold_on_white(idx_reaction_show) # if s.reaction_show == 1: # # result[idx_reaction_show] = 'color: #222222' # pass # elif s.reaction_show == 0: # # result[idx_reaction_show] = 'color: #222222' # pass # else: # # print(s.reaction_show) # # print(type(s.reaction_show)) # hide_length = 3 # result[idx_reaction_show-hide_length:] = ['color: #fcfcfc']*(num_col-idx_reaction_show+hide_length) # if s.probability_emote!=s.probability_emote: # result[5] = 'color: #eeeeee' return result # @staticmethod # @st.cache # def dfstyle_color_text(val): # if type(val)==str: # color = 'red' if val =='neutral' else 'black' # # elif type(val)==float: # # color = 'red' if val > .50000 else 'black' # elif val==None: # color = '#ffffff' # else: # color = None # return 'color: %s' % color if color is not None else '' @staticmethod @st.cache def rower(data): s = data.index % 2 != 0 s = pd.concat([pd.Series(s)] * data.shape[1], axis=1) z = pd.DataFrame(np.where(s, 'background-color:#f9f9f9', ''), index=data.index, columns=data.columns) return z def get_log(self): df = pd.DataFrame(data=[], columns=[]) log_dict = dict() with open(self.path) as f: lines = f.readlines() self.gen.initialise_classifier_model() story_num = 0 for i, line in enumerate(lines): if line.startswith('H:'): log_dict['Turn'] = 'haru' elif line.startswith('U:'): log_dict['Turn'] = 'user' else: story_num += 1 continue log_dict['Sentence'] = line[3:] log_dict['Story'] = story_num emotion_type = 'sorted' # 'max' if self.emotion_type == 'max': emotion_type = 'max' else: emotion_type = 'sorted' # emotion = self.gen.get_emotion( log_dict['Sentence'], filter_by=emotion_type) if emotion_type == 'max': log_dict['Emotion'] = emotion['label'] log_dict['Score'] = emotion['score'] elif emotion_type == 'sorted': for sorted_i in range(len(emotion)): log_dict[f'Emotion_{sorted_i+1}'] = emotion[sorted_i]['label'] log_dict[f'Score_{sorted_i+1}'] = emotion[sorted_i]['score'] log_dict['Emotion'] = emotion[0]['label'] log_dict['Score'] = emotion[0]['score'] log_dict['e_review'] = ' ' df = pd.concat( [df, pd.DataFrame(log_dict, index=[f'idx_{i}'])]) df = df.reset_index(drop=True) df.to_csv(self.df_path) return df def display_logs(gen, container_guide, container_param, container_button): la = LogAnalyser(gen, container_guide, container_param, container_button) la.display_logs() # df = la.update_df(la.df) if __name__ == '__main__': # df = LogAnalyser.get_log(path='data/ist_logs.txt') # initialize data of lists. # data = {'Name': ['Tom', 'nick', 'krish', 'jack'], # 'Age': [20, 21, 19, 18]} # # Create DataFrame # df = pd.DataFrame(data) # print(df, type(df)) os.system('./run.sh')