# Spaces: Runtime error
# (status banner captured together with this file; not part of the program)
import random
from os import listdir, makedirs
from os.path import basename, dirname, exists, isfile, join

import numpy as np
import pandas as pd
import plotly.express as px
import printj
import streamlit as st
import xlsxwriter

from .lib import set_input, create_dowload_button
class LogAnalyser:
    """Streamlit view that scores and displays emotion reactions for dialogue logs.

    A log file is parsed into one row per ``H:``/``U:`` line, each sentence is
    classified through ``gen.get_emotion``, and for every ``user`` row the
    class decides whether a reaction is shown using::

        frequency_penalty = 1 - emotion_frequency
        probability_emote = w * emotion_confidence + (1 - w) * frequency_penalty
        Show_Emotion      = probability_emote > random_value
    """

    # Columns added to ``self.df`` by the scoring pass, in creation order.
    PE_COLUMNS = ['reaction_frequency', 'probability_emote',
                  'random_value', 'reaction_show']

    def __init__(self, gen, container_guide, container_param, container_button):
        """Let the user pick a log file and render the equation guide.

        Args:
            gen: object providing ``initialise_classifier_model()`` and
                ``get_emotion(sentence)`` (the latter returning a mapping with
                ``'label'`` and ``'score'`` keys).
            container_guide: Streamlit container (stored, not used here).
            container_param: Streamlit container that hosts the input widgets.
            container_button: Streamlit container (stored, not used here).

        Raises:
            FileNotFoundError: if ``data/`` holds no file starting with
                ``ist_log`` (the select box then yields ``None``).
        """
        self.gen = gen
        self.container_guide = container_guide
        self.container_param = container_param
        self.container_button = container_button

        dirpath = 'data'
        log_file_paths = sorted(
            join(dirpath, f) for f in listdir(dirpath)
            if f.startswith('ist_log') and isfile(join(dirpath, f)))
        self.path = container_param.selectbox(
            'Select the log path', log_file_paths)
        if self.path is None:
            raise FileNotFoundError(
                f"no 'ist_log*' file found in '{dirpath}'")
        # basename() instead of split("/") so the cache name is also right
        # when join() used a different path separator.
        self.df_path = f'data/df/{basename(self.path)}'
        st.markdown(self.get_text())

    @staticmethod
    def get_text():
        """Return the markdown snippet describing the reaction equation."""
        # Static: the method takes no instance state, and the original
        # signature without ``self`` failed when called as self.get_text().
        return (
            '\n'
            '### Equation\n'
            '```\n'
            'frequency_penalty = 1 - emotion_frequency\n'
            'probability_emote = w * emotion_confidence + (1 - w) * frequency_penalty\n'
            'Show_Emotion = probability_emote > (Random value between 0 and 1)\n'
            '```\n'
        )

    def display_logs(self):
        """Load the cached frame (or build it from the log) and render it."""
        if exists(self.df_path):
            self.df = pd.read_csv(self.df_path)
        else:
            self.df = self.get_log()
        self.update_df()

    def get_ngram_pattern(self, s, n=2):
        """Return one flag per length-``n`` window of ``s``.

        The flag is ``'1'`` when the window contains a ``'1'``, else ``'0'``.
        A string shorter than ``n`` yields ``''``.
        """
        return ''.join(
            '1' if '1' in s[i:i + n] else '0'
            for i in range(len(s) - (n - 1)))

    def update_df(self):
        """Read the widgets, score every user turn and render all tables."""
        reaction_weight = set_input(
            self.container_param,
            label='Reaction Weight w', min_value=0.0, max_value=1.0,
            value=0.5, step=0.01,
            key_slider='w_slider', key_input='w_input',)
        self.container_param_rv = self.container_param.columns([1, 1])
        random_value_mode = self.container_param_rv[0].radio(
            "Random Value:", ["Random", "Fixed"])
        fixed_random_value = None
        if random_value_mode == "Fixed":
            fixed_random_value = set_input(
                self.container_param,
                label='Random Value',
                key_slider='rand_slider', key_input='rand_input',
                min_value=0., max_value=1., value=.5, step=.01,)
        table_mode = self.container_param.radio(
            "Table Style:", ["Dataframe", "Table"])
        self.show_pe_data = self.container_param.checkbox(
            'Show Probability Emote', value=True, key='show_pe_data_log')
        self.score_threshold = set_input(
            self.container_param,
            label='Score Threshold', min_value=0.0, max_value=1.0,
            value=0.5, step=0.01,
            key_slider='score_threshold_slider',
            key_input='score_threshold_input',)

        df_reaction_pattern = self._score_reactions(
            reaction_weight, random_value_mode, fixed_random_value)
        self._render_stories(table_mode)
        self._show_table(table_mode, df_reaction_pattern)

    def _score_reactions(self, reaction_weight, random_value_mode,
                         fixed_random_value):
        """Fill the PE columns of ``self.df`` and return the per-story patterns."""
        pattern_rows = []
        pattern_index = []
        for story_id in self.df.Story.unique():
            user_rows = self.df[
                (self.df.Story == story_id) & (self.df.Turn == 'user')]
            reaction_num = 0
            reaction_frequency = 0
            probability_emote = 0
            # In "Fixed" mode the user-chosen value must survive the loop;
            # it used to be reset to 0 here, making the widget ineffective.
            random_value = (
                fixed_random_value if random_value_mode == "Fixed" else 0)
            shown_flags = []
            for i, (index, row) in enumerate(user_rows.iterrows()):
                if row.Emotion == 'neutral' or row.Score < self.score_threshold:
                    # Frequency/probability keep their previous values for
                    # these rows; the styling step hides them.
                    reaction_show = False
                else:
                    reaction_frequency = reaction_num / (i + 1)
                    probability_emote = (
                        row.Score * reaction_weight
                        + (1 - reaction_weight) * (1 - reaction_frequency))
                    if random_value_mode == "Random":
                        random_value = random.random()
                    reaction_show = bool(probability_emote > random_value)
                if reaction_show:
                    reaction_num += 1
                self.df.at[index, 'reaction_frequency'] = reaction_frequency
                self.df.at[index, 'probability_emote'] = probability_emote
                self.df.at[index, 'random_value'] = random_value
                self.df.at[index, 'reaction_show'] = reaction_show
                shown_flags.append(reaction_show)

            pattern = ''.join(str(int(flag)) for flag in shown_flags)
            summary = {
                'reaction_length': len(pattern),
                'reaction_1': pattern.count('1'),
                'reaction_pattern': pattern,
            }
            for n in range(2, 8):
                summary[f'{n}-gram_pattern'] = self.get_ngram_pattern(
                    pattern, n=n)
            pattern_rows.append(summary)
            pattern_index.append(f'Story_{story_id}')
        return pd.DataFrame(pattern_rows, index=pattern_index)

    def _render_stories(self, table_mode):
        """Render one styled table and one download button per story."""
        columns2hide = ['Unnamed: 0', 'Story']
        if not self.show_pe_data:
            columns2hide += self.PE_COLUMNS
        for story_id in self.df.Story.unique():
            # errors='ignore': 'Unnamed: 0' only exists when the frame was
            # read back from the cached CSV.
            story_df = self.df[self.df.Story == story_id].drop(
                columns=columns2hide, errors='ignore')
            st.markdown(f'#### Story {story_id}')
            styler = story_df.style
            # Styler.hide_index() was removed in pandas 2.0.
            if hasattr(styler, 'hide'):
                styler = styler.hide(axis='index')
            else:
                styler = styler.hide_index()
            if self.show_pe_data:
                styler = styler.apply(self.dfstyle_color_text_col, axis=1)
            styler = styler.apply(self.rower, axis=None)
            styler = styler.set_table_styles([{
                'selector': 'tr:hover',
                'props': 'color: #000000',
            }])
            self._show_table(table_mode, styler)
            create_dowload_button(
                styler, sheet_name=f'story_{story_id}',
                file_name=f'data_story_{story_id}.xlsx')

    @staticmethod
    def _show_table(table_mode, data):
        """Display ``data`` with the widget matching ``table_mode``."""
        if table_mode == 'Dataframe':
            st.dataframe(data)
        elif table_mode == 'Table':
            st.table(data)

    def dfstyle_color_text_col(self, s):
        """Return one CSS string per cell of row ``s`` (for ``Styler.apply``).

        Cells default to a white background. For ``user`` rows a neutral
        emotion or a score below ``self.score_threshold`` is highlighted in
        red and the numeric PE cells are painted near-white. Rows without a
        ``reaction_show`` value get all PE cells painted near-white.
        Columns are addressed by name, so a missing column is skipped instead
        of changing the length of the returned list.
        """
        hidden = 'color: #fcfcfc'
        numeric_pe = self.PE_COLUMNS[:-1]
        styles = {col: 'background-color: white' for col in s.index}

        def paint(columns, css):
            for col in columns:
                if col in styles:
                    styles[col] = css

        is_user = s.Turn == 'user'
        if is_user and s.Emotion == 'neutral':
            paint(['Emotion'], 'color: #992222')
            paint(['Score'], 'color: #333333')
            paint(numeric_pe, hidden)
        if is_user and s.Score < self.score_threshold:
            paint(['Score'], 'color: #992222')
            paint(numeric_pe, hidden)

        reaction_show = s.get('reaction_show')
        if reaction_show == 1 or reaction_show == 0:
            paint(['reaction_show'], 'color: #222222')
        else:
            # NaN/None: the row was never scored.
            paint(self.PE_COLUMNS, hidden)
        return list(styles.values())

    @staticmethod
    def rower(data):
        """Return a CSS frame shaped like ``data`` that shades odd-index rows.

        Static so that ``Styler.apply(self.rower, axis=None)`` passes the
        frame as ``data``; as a plain method the instance was bound to
        ``data`` and the call failed.
        """
        odd = np.asarray(data.index % 2 != 0)
        row_css = np.where(odd, 'background-color:#f9f9f9', '')
        css = np.repeat(row_css[:, None], data.shape[1], axis=1)
        return pd.DataFrame(css, index=data.index, columns=data.columns)

    def get_log(self):
        """Parse ``self.path``, classify each turn, cache and return the frame.

        Lines starting with ``H:`` / ``U:`` become ``haru`` / ``user`` rows;
        any other line increments the story counter. The result is written to
        ``self.df_path`` as CSV.

        Returns:
            DataFrame with columns Turn, Sentence, Story, Emotion, Score
            (empty, but with these columns, when the log has no turns).
        """
        with open(self.path) as f:
            lines = f.readlines()
        self.gen.initialise_classifier_model()

        rows = []
        story_num = 0
        for line in lines:
            if line.startswith('H:'):
                turn = 'haru'
            elif line.startswith('U:'):
                turn = 'user'
            else:
                story_num += 1
                continue
            sentence = line[3:]
            emotion = self.gen.get_emotion(sentence)
            rows.append({
                'Turn': turn,
                'Sentence': sentence,
                'Story': story_num,
                'Emotion': emotion['label'],
                'Score': emotion['score'],
            })
        # Build the frame once instead of concatenating per line.
        df = pd.DataFrame(
            rows, columns=['Turn', 'Sentence', 'Story', 'Emotion', 'Score'])
        makedirs(dirname(self.df_path), exist_ok=True)
        df.to_csv(self.df_path)
        return df
def display_logs(gen, container_guide, container_param, container_button):
    """Build a ``LogAnalyser`` from the given arguments and render its logs."""
    analyser = LogAnalyser(
        gen, container_guide, container_param, container_button)
    analyser.display_logs()
if __name__ == '__main__':
    # Stand-alone smoke check: build a small sample frame and print it
    # together with its type. It does not touch the log analyser.
    data = dict(
        Name=['Tom', 'nick', 'krish', 'jack'],
        Age=[20, 21, 19, 18],
    )
    df = pd.DataFrame(data)
    print(df, type(df))