storytelling / src /read_logs.py
jitesh's picture
adds download button
1115bb5
raw
history blame
10.6 kB
import random
import numpy as np
import pandas as pd
import plotly.express as px
import streamlit as st
import xlsxwriter
from os import listdir
from .lib import set_input, create_dowload_button
from os.path import isfile, join, exists
import printj
class LogAnalyser:
    """Streamlit view that replays chat logs and simulates emotion reactions.

    A log file is selected from the ``data`` directory, every ``H:``/``U:``
    line is tagged with an emotion through ``gen`` (the tagged frame is cached
    as CSV under ``data/df``), and the reaction model described by
    :meth:`get_text` is replayed over the user turns of each story.

    Args:
        gen: Object providing ``initialise_classifier_model()`` and
            ``get_emotion(sentence)``; the latter must return a mapping with
            ``'label'`` and ``'score'`` keys.
        container_guide: Streamlit container; stored on the instance only.
        container_param: Streamlit container hosting the parameter widgets.
        container_button: Streamlit container; stored on the instance only.

    Raises:
        FileNotFoundError: If ``data`` contains no file whose name starts
            with ``ist_log``.
    """

    def __init__(self, gen, container_guide, container_param, container_button):
        import os  # local import: ``basename`` is not among the file-level imports

        self.gen = gen
        self.container_guide = container_guide
        self.container_param = container_param
        self.container_button = container_button

        dirpath = 'data'
        log_file_paths = sorted(
            join(dirpath, f) for f in listdir(dirpath)
            if isfile(join(dirpath, f)) and f.startswith('ist_log'))
        if not log_file_paths:
            # Without this guard the select box yields None and the path
            # handling below fails with an opaque AttributeError.
            raise FileNotFoundError(
                f"No log file starting with 'ist_log' found in '{dirpath}'")

        self.path = container_param.selectbox(
            'Select the log path', log_file_paths)
        # basename() copes with both '/' and '\\' separators.
        self.df_path = f'data/df/{os.path.basename(self.path)}'
        st.markdown(self.get_text())

    # NOTE: the deprecated ``st.cache`` decorator was dropped here; caching a
    # constant string brings nothing.
    @staticmethod
    def get_text():
        """Return the markdown snippet that documents the reaction equation."""
        return '''
### Equation
```
frequency_penalty = 1 - emotion_frequency
probability_emote = w * emotion_confidence + (1 - w) * frequency_penalty
Show_Emotion = probability_emote > (Random value between 0 and 1)
```
'''

    def display_logs(self):
        """Load the tagged log (cached CSV if present) and render the page."""
        if exists(self.df_path):
            self.df = pd.read_csv(self.df_path)
        else:
            self.df = self.get_log()
        self.update_df()

    def get_ngram_pattern(self, s, n=2):
        """Collapse every length-``n`` window of ``s`` to ``'1'`` or ``'0'``.

        A window maps to ``'1'`` when it contains at least one ``'1'``.
        Returns an empty string when ``s`` is shorter than ``n``.
        """
        return ''.join(
            '1' if '1' in s[i:i + n] else '0'
            for i in range(len(s) - (n - 1)))

    def update_df(self):
        """Draw the parameter widgets, recompute reactions and render tables.

        Sets ``container_param_rv``, ``show_pe_data`` and ``score_threshold``
        on the instance and adds the reaction columns to ``self.df``.
        """
        reaction_weight = set_input(
            self.container_param,
            label='Reaction Weight w', min_value=0.0, max_value=1.0, value=0.5, step=0.01,
            key_slider='w_slider', key_input='w_input')
        self.container_param_rv = self.container_param.columns([1, 1])
        random_value_mode = self.container_param_rv[0].radio(
            "Random Value:", ["Random", "Fixed"])
        # None means "draw a fresh random number for every candidate turn".
        fixed_random_value = None
        if random_value_mode == "Fixed":
            fixed_random_value = set_input(
                self.container_param,
                label='Random Value', key_slider='rand_slider', key_input='rand_input',
                min_value=0., max_value=1., value=.5, step=.01)
        table_mode = self.container_param.radio(
            "Table Style:", ["Dataframe", "Table"])
        self.show_pe_data = self.container_param.checkbox(
            'Show Probability Emote', value=True, key='show_pe_data_log')
        self.score_threshold = set_input(
            self.container_param,
            label='Score Threshold', min_value=0.0, max_value=1.0, value=0.5, step=0.01,
            key_slider='score_threshold_slider', key_input='score_threshold_input')

        df_reaction_pattern = self._simulate_reactions(
            reaction_weight, fixed_random_value)
        self._render_tables(table_mode, df_reaction_pattern)

    def _simulate_reactions(self, reaction_weight, fixed_random_value=None):
        """Fill the reaction columns of ``self.df``; return per-story patterns.

        Args:
            reaction_weight: Weight ``w`` of the emotion score in the equation.
            fixed_random_value: Threshold used for every turn; when ``None`` a
                new ``random.random()`` value is drawn per candidate turn.

        Returns:
            DataFrame indexed by ``Story_<id>`` with the reaction bit string,
            its length, its number of ones and its 2- to 7-gram patterns.
        """
        pattern_rows = []
        pattern_labels = []
        for story_id in self.df.Story.unique():
            user_turns = self.df[
                (self.df.Story == story_id) & (self.df.Turn == 'user')]
            reaction_num = 0
            reaction_frequency = 0.0
            probability_emote = 0.0
            # The fixed value must survive the per-story reset; previously it
            # was overwritten with 0 here, so "Fixed" mode was never honoured.
            random_value = 0.0 if fixed_random_value is None else fixed_random_value
            shown_flags = []
            for i, (index, row) in enumerate(user_turns.iterrows()):
                if row.Emotion == 'neutral' or row.Score < self.score_threshold:
                    # Skipped turns keep the previous numeric values; the
                    # styler greys them out.
                    reaction_show = False
                else:
                    reaction_frequency = reaction_num / (i + 1)
                    probability_emote = (
                        row.Score * reaction_weight
                        + (1 - reaction_weight) * (1 - reaction_frequency))
                    if fixed_random_value is None:
                        random_value = random.random()
                    reaction_show = bool(probability_emote > random_value)
                if reaction_show:
                    reaction_num += 1
                self.df.at[index, 'reaction_frequency'] = reaction_frequency
                self.df.at[index, 'probability_emote'] = probability_emote
                self.df.at[index, 'random_value'] = random_value
                self.df.at[index, 'reaction_show'] = reaction_show
                shown_flags.append(reaction_show)

            pattern = ''.join(str(int(flag)) for flag in shown_flags)
            summary = {
                'reaction_length': len(pattern),
                'reaction_1': pattern.count('1'),
                'reaction_pattern': pattern,
            }
            for size in range(2, 8):
                summary[f'{size}-gram_pattern'] = self.get_ngram_pattern(
                    pattern, n=size)
            pattern_rows.append(summary)
            pattern_labels.append(f'Story_{story_id}')
        # Build the summary once instead of concatenating inside the loop.
        return pd.DataFrame(pattern_rows, index=pattern_labels)

    def _render_tables(self, table_mode, df_reaction_pattern):
        """Render one styled table and download button per story, then the summary."""
        render = {'Dataframe': st.dataframe, 'Table': st.table}.get(table_mode)
        columns2hide = ['Unnamed: 0', 'Story']
        if not self.show_pe_data:
            columns2hide += [
                "reaction_frequency", "probability_emote", "random_value", "reaction_show"]

        for story_id in self.df.Story.unique():
            # 'Unnamed: 0' only exists after a CSV round-trip, so missing
            # columns are ignored instead of raising KeyError.
            story_df = self.df[self.df.Story == story_id].drop(
                columns=columns2hide, errors='ignore')
            st.markdown(f'#### Story {story_id}')
            styled = story_df.style
            # Styler.hide_index() was removed in pandas 2.0.
            if hasattr(styled, 'hide'):
                styled = styled.hide(axis='index')
            else:
                styled = styled.hide_index()
            if self.show_pe_data:
                styled = styled.apply(self.dfstyle_color_text_col, axis=1)
            styled = styled.apply(self.rower, axis=None)
            styled = styled.set_table_styles([{
                'selector': 'tr:hover',
                'props': 'color: #000000'
            }])
            if render is not None:
                render(styled)
            create_dowload_button(
                styled, sheet_name=f'story_{story_id}', file_name=f'data_story_{story_id}.xlsx')

        if render is not None:
            render(df_reaction_pattern)

    def dfstyle_color_text_col(self, s):
        """Return per-cell CSS for one row of the story table.

        The slices assume the 8-column layout produced when the probability
        data is shown: Turn, Sentence, Emotion, Score, reaction_frequency,
        probability_emote, random_value, reaction_show.
        """
        result = ['background-color: white'] * len(s)
        is_user = s.Turn == 'user'
        if is_user and s.Emotion == 'neutral':
            # Emotion in red, score dimmed, computed values hidden.
            result[2:-1] = ['color: #992222', 'color: #333333'] + ['color: #fcfcfc'] * 3
        if is_user and s.Score < self.score_threshold:
            # Score in red, computed values hidden.
            result[3:-1] = ['color: #992222'] + ['color: #fcfcfc'] * 3
        if s.reaction_show == 1 or s.reaction_show == 0:
            result[-1] = 'color: #222222'
        else:
            # No reaction value (e.g. NaN on non-user turns): hide all computed cells.
            result[4:] = ['color: #fcfcfc'] * 4
        return result

    # NOTE: the deprecated ``st.cache`` decorator was dropped here; hashing the
    # frame on every call costs more than recomputing the stripes.
    @staticmethod
    def rower(data):
        """Return zebra-stripe CSS: rows whose index label is odd are shaded."""
        odd_rows = np.asarray(data.index % 2 != 0)
        mask = np.repeat(odd_rows[:, np.newaxis], data.shape[1], axis=1)
        return pd.DataFrame(
            np.where(mask, 'background-color:#f9f9f9', ''),
            index=data.index, columns=data.columns)

    def get_log(self):
        """Parse the selected log, tag each sentence and cache the result.

        Lines starting with ``H:`` / ``U:`` become ``haru`` / ``user`` turns;
        every other line (blank lines included) starts a new story.

        Returns:
            DataFrame with columns Turn, Sentence, Story, Emotion and Score,
            also written as CSV to ``self.df_path``.
        """
        import os  # local import: ``makedirs`` is not among the file-level imports

        with open(self.path) as f:
            lines = f.readlines()
        self.gen.initialise_classifier_model()

        rows = []
        story_num = 0
        for line in lines:
            if line.startswith('H:'):
                turn = 'haru'
            elif line.startswith('U:'):
                turn = 'user'
            else:
                story_num += 1
                continue
            sentence = line[3:]
            emotion = self.gen.get_emotion(sentence)
            rows.append({
                'Turn': turn,
                'Sentence': sentence,
                'Story': story_num,
                'Emotion': emotion['label'],
                'Score': emotion['score'],
            })

        # One construction instead of a quadratic concat loop; the explicit
        # columns keep the schema even for an empty log.
        df = pd.DataFrame(
            rows, columns=['Turn', 'Sentence', 'Story', 'Emotion', 'Score'])
        # The cache directory may not exist on a fresh checkout.
        os.makedirs(os.path.dirname(self.df_path) or '.', exist_ok=True)
        df.to_csv(self.df_path)
        return df
def display_logs(gen, container_guide, container_param, container_button):
    """Create a ``LogAnalyser`` on the given containers and render its page."""
    analyser = LogAnalyser(
        gen,
        container_guide,
        container_param,
        container_button,
    )
    analyser.display_logs()
# df = la.update_df(la.df)
if __name__ == '__main__':
    # Manual smoke check: build a small sample frame and print it with its type.
    df = pd.DataFrame({
        'Name': ['Tom', 'nick', 'krish', 'jack'],
        'Age': [20, 21, 19, 18],
    })
    print(df, type(df))