import random
from os import listdir, makedirs
from os.path import basename, dirname, exists, isfile, join

import numpy as np
import pandas as pd
import plotly.express as px
import printj
import streamlit as st
import xlsxwriter

from .lib import set_input, create_dowload_button


class LogAnalyser:
    """Streamlit page that replays conversation logs and simulates reactions.

    A log file is parsed into one row per ``H:``/``U:`` line, every sentence
    is labelled through ``gen.get_emotion``, and for each user turn the page
    decides whether an emotional reaction would be shown, using the equation
    returned by :meth:`get_text`.
    """

    def __init__(self, gen, container_guide, container_param, container_button):
        """Let the user pick a log file and render the equation description.

        Args:
            gen: Object providing ``initialise_classifier_model()`` and
                ``get_emotion(sentence)``.
            container_guide: Streamlit container (stored, not used here).
            container_param: Streamlit container that receives the widgets.
            container_button: Streamlit container (stored, not used here).

        Raises:
            FileNotFoundError: If ``data`` holds no file starting with
                ``ist_log``.
        """
        self.gen = gen
        self.container_guide = container_guide
        self.container_param = container_param
        self.container_button = container_button

        dirpath = 'data'
        log_file_paths = sorted(
            join(dirpath, f) for f in listdir(dirpath)
            if f.startswith('ist_log') and isfile(join(dirpath, f))
        )
        if not log_file_paths:
            # An empty selectbox yields None, which used to surface as an
            # opaque AttributeError on ``self.path.split``.
            raise FileNotFoundError(
                f"No log file starting with 'ist_log' found in '{dirpath}'")

        self.path = container_param.selectbox(
            'Select the log path', log_file_paths)
        # basename() copes with whatever separator join() produced on this OS.
        self.df_path = f'data/df/{basename(self.path)}'
        st.markdown(self.get_text())

    @staticmethod
    @st.cache
    def get_text():
        """Return the markdown snippet describing the reaction equation."""
        return '\n'.join([
            '',
            '### Equation',
            '```',
            'frequency_penalty = 1 - emotion_frequency',
            'probability_emote = w * emotion_confidence + (1 - w) * frequency_penalty',
            'Show_Emotion = probability_emote > (Random value between 0 and 1)',
            '```',
            '',
        ])

    def display_logs(self):
        """Load the cached frame (or build it from the log) and render it."""
        if exists(self.df_path):
            self.df = pd.read_csv(self.df_path)
        else:
            self.df = self.get_log()
        self.update_df()

    def get_ngram_pattern(self, s, n=2):
        """Return a '0'/'1' string marking which n-wide windows of *s* hold a '1'.

        Args:
            s: String of '0'/'1' characters.
            n: Window width.

        Returns:
            One character per window position; empty when ``len(s) < n``.
        """
        return ''.join(
            '1' if '1' in s[start:start + n] else '0'
            for start in range(len(s) - (n - 1))
        )

    def update_df(self):
        """Read the widgets, simulate reactions per story and render tables."""
        reaction_weight = set_input(
            self.container_param, label='Reaction Weight w',
            min_value=0.0, max_value=1.0, value=0.5, step=0.01,
            key_slider='w_slider', key_input='w_input',
        )
        self.container_param_rv = self.container_param.columns([1, 1])
        random_value_mode = self.container_param_rv[0].radio(
            "Random Value:", ["Random", "Fixed"])
        # None means "draw a fresh random number for every scored turn".
        fixed_random_value = None
        if random_value_mode == "Fixed":
            fixed_random_value = set_input(
                self.container_param, label='Random Value',
                key_slider='rand_slider', key_input='rand_input',
                min_value=0., max_value=1., value=.5, step=.01,
            )
        table_mode = self.container_param.radio(
            "Table Style:", ["Dataframe", "Table"])
        self.show_pe_data = self.container_param.checkbox(
            'Show Probability Emote', value=True, key='show_pe_data_log')
        self.score_threshold = set_input(
            self.container_param, label='Score Threshold',
            min_value=0.0, max_value=1.0, value=0.5, step=0.01,
            key_slider='score_threshold_slider',
            key_input='score_threshold_input',
        )

        story_ids = self.df.Story.unique()
        pattern_frames = [
            self._summarise_pattern(
                story_id,
                self._simulate_story(
                    story_id, reaction_weight, fixed_random_value),
            )
            for story_id in story_ids
        ]
        df_reaction_pattern = (
            pd.concat(pattern_frames) if pattern_frames else pd.DataFrame())

        for story_id in story_ids:
            self._render_story(story_id, table_mode)
        self._show(df_reaction_pattern, table_mode)

    def _simulate_story(self, story_id, reaction_weight, fixed_random_value):
        """Score the user turns of one story and return its '0'/'1' pattern.

        Writes ``reaction_frequency``, ``probability_emote``, ``random_value``
        and ``reaction_show`` into ``self.df`` for every user turn.
        """
        reaction_num = 0
        reaction_frequency = 0
        probability_emote = 0
        # In "Fixed" mode the user-chosen value applies to every turn; it was
        # previously reset to 0 here, which made the slider ineffective.
        random_value = 0 if fixed_random_value is None else fixed_random_value
        shown_flags = []

        user_mask = (self.df.Story == story_id) & (self.df.Turn == 'user')
        for turn_idx, (index, row) in enumerate(self.df[user_mask].iterrows()):
            if row.Emotion == 'neutral' or row.Score < self.score_threshold:
                # Skipped turns keep the previous turn's numbers in the table.
                reaction_show = False
            else:
                reaction_frequency = reaction_num / (turn_idx + 1)
                probability_emote = (
                    row.Score * reaction_weight
                    + (1 - reaction_weight) * (1 - reaction_frequency)
                )
                if fixed_random_value is None:
                    random_value = random.random()
                reaction_show = bool(probability_emote > random_value)
                if reaction_show:
                    reaction_num += 1
            self.df.at[index, 'reaction_frequency'] = reaction_frequency
            self.df.at[index, 'probability_emote'] = probability_emote
            self.df.at[index, 'random_value'] = random_value
            self.df.at[index, 'reaction_show'] = reaction_show
            shown_flags.append(reaction_show)

        return ''.join(str(int(flag)) for flag in shown_flags)

    def _summarise_pattern(self, story_id, pattern):
        """Build the one-row summary frame for a story's reaction pattern."""
        summary = {
            'reaction_length': len(pattern),
            'reaction_1': pattern.count('1'),
            'reaction_pattern': pattern,
        }
        for width in range(2, 8):
            summary[f'{width}-gram_pattern'] = self.get_ngram_pattern(
                pattern, n=width)
        return pd.DataFrame(summary, index=[f'Story_{story_id}'])

    def _render_story(self, story_id, table_mode):
        """Render the styled table and download button for one story."""
        columns2hide = ['Unnamed: 0', 'Story']
        if not self.show_pe_data:
            columns2hide += [
                "reaction_frequency", "probability_emote",
                "random_value", "reaction_show"]
        # 'Unnamed: 0' exists only when the frame was read back from the CSV
        # cache, so missing columns must not raise.
        story_df = self.df[self.df.Story == story_id].drop(
            columns=columns2hide, errors='ignore')

        st.markdown(f'#### Story {story_id}')
        # NOTE(review): Styler.hide_index() is deprecated since pandas 1.4 in
        # favour of Styler.hide(axis="index") - confirm the pinned version.
        styler = story_df.style.hide_index()
        if self.show_pe_data:
            styler = styler.apply(self.dfstyle_color_text_col, axis=1)
        styler = styler.apply(self.rower, axis=None)
        styler = styler.set_table_styles([{
            'selector': 'tr:hover',
            'props': 'color: #000000',
        }])
        self._show(styler, table_mode)
        create_dowload_button(
            styler, sheet_name=f'story_{story_id}',
            file_name=f'data_story_{story_id}.xlsx')

    @staticmethod
    def _show(data, table_mode):
        """Display *data* with the widget matching the chosen table style."""
        if table_mode == 'Dataframe':
            st.dataframe(data)
        elif table_mode == 'Table':
            st.table(data)

    def dfstyle_color_text_col(self, s):
        """Return one CSS string per cell of the table row *s*.

        The slice positions assume the eight-column layout produced when the
        probability columns are shown (Turn, Sentence, Emotion, Score and the
        four reaction columns).
        """
        result = ['background-color: white'] * len(s)
        is_user = s.Turn == 'user'
        if s.Emotion == 'neutral' and is_user:
            # Flag the emotion, keep the score readable, fade the numbers.
            result[2:-1] = (['color: #992222'] + ['color: #333333']
                            + ['color: #fcfcfc'] * 3)
        if s.Score < self.score_threshold and is_user:
            # Flag the score that fell below the threshold, fade the numbers.
            result[3:-1] = ['color: #992222'] + ['color: #fcfcfc'] * 3
        if s.reaction_show in (0, 1):
            result[-1] = 'color: #222222'
        else:
            # No reaction decision stored for this row: fade the last four.
            result[4:] = ['color: #fcfcfc'] * 4
        return result

    @staticmethod
    @st.cache
    def rower(data):
        """Return a CSS frame shading rows whose index label is odd."""
        odd_rows = np.asarray(data.index % 2 != 0)
        mask = np.tile(odd_rows[:, None], (1, data.shape[1]))
        return pd.DataFrame(
            np.where(mask, 'background-color:#f9f9f9', ''),
            index=data.index, columns=data.columns)

    def get_log(self):
        """Parse the selected log, classify each sentence and cache the frame.

        Lines starting with ``H:`` / ``U:`` become 'haru' / 'user' rows; any
        other line advances the story counter. The frame is written to
        ``self.df_path`` before being returned.

        Returns:
            DataFrame with columns Turn, Sentence, Story, Emotion, Score.
        """
        with open(self.path) as f:
            lines = f.readlines()
        self.gen.initialise_classifier_model()

        rows = []
        story_num = 0
        for line in lines:
            if line.startswith('H:'):
                turn = 'haru'
            elif line.startswith('U:'):
                turn = 'user'
            else:
                story_num += 1
                continue
            sentence = line[3:]
            emotion = self.gen.get_emotion(sentence)
            rows.append({
                'Turn': turn,
                'Sentence': sentence,
                'Story': story_num,
                'Emotion': emotion['label'],
                'Score': emotion['score'],
            })

        # Explicit columns keep the schema intact even for an empty log.
        df = pd.DataFrame(
            rows, columns=['Turn', 'Sentence', 'Story', 'Emotion', 'Score'])
        # The cache directory may not exist on a first run.
        makedirs(dirname(self.df_path), exist_ok=True)
        df.to_csv(self.df_path)
        return df


def display_logs(gen, container_guide, container_param, container_button):
    """Build a :class:`LogAnalyser` from the given containers and render it."""
    la = LogAnalyser(gen, container_guide, container_param, container_button)
    la.display_logs()


if __name__ == '__main__':
    # Small DataFrame smoke test.
    data = {'Name': ['Tom', 'nick', 'krish', 'jack'],
            'Age': [20, 21, 19, 18]}
    df = pd.DataFrame(data)
    print(df, type(df))