ts-explorations / app.py
laverdes's picture
chore: columns refactor
ef746ab
raw
history blame
5.3 kB
import streamlit as st
import pathlib
import json
import pandas as pd
st. set_page_config(layout="wide")
st.header("Time Series Preprocessing Pipeline")
st.markdown("Users can load their time-series data and select a set of transformations to prepare a training set for univariate or multivariate time-series classification.\
Go ahead and use the sidebar on the left to upload your data files in *.json* format and start exploring and transforming it!")
col1, col2 = st.columns(2)
@st.experimental_memo
def convert_df(df):
return df.to_csv(index=False).encode('utf-8')
# Load a prepare data
file_names, file_bytes = [], []
with st.sidebar:
files = st.file_uploader("Load files", accept_multiple_files = True)
if files:
file_names = [file.name for file in files]
file_bytes = [file.getvalue() for file in files]
st.success("Your data has been successfully loaded! 🤗")
data_dict = dict({'trial_id':[], 'pupil_dilation':[], 'baseline':[], 'rating':[]})
with st.spinner("Building base dictionary..."):
for file_data in file_bytes:
data = json.loads(file_data)
for k in data:
for i in data[k]:
for k, v in i.items():
data_dict[k].append(v)
df_base = pd.DataFrame() # {'<fields>' : []})
with col1:
if file_bytes:
with st.spinner("Building base dataframe..."):
df_base = pd.DataFrame.from_dict(data_dict)
df_base["trial_id"] = df_base.trial_id.map(lambda s: "".join([c for c in s if c.isdigit()]))
df_base["len_pupil_dilation"] = df_base.pupil_dilation.map(lambda l: len(l))
df_base["len_baseline"] = df_base.baseline.map(lambda l: len(l))
st.info(f"number of files: {len(file_names)}")
if 'df_base' not in st.session_state:
st.session_state['df_base'] = df_base
else:
st.caption("Upload your data using the sidebar to start :sunglasses:")
if 'df_base' in st.session_state:
st.markdown("Your original data with some extra information about the length of the time-series fields")
st.dataframe(st.session_state.df_base)
# Cleaning starts
with col1:
if not df_base.empty:
st.markdown("**Cleaning actions**")
detect_blinking = st.button("I want to clean my data 🤗")
number_of_blinks = 0
if detect_blinking:
# Initialization of session_state
if 'df' not in st.session_state:
st.session_state['df'] = df_base
for ser in df_base['pupil_dilation']:
for f in ser:
if f == 0.0:
number_of_blinks += 1
for ser in df_base['baseline']:
for f in ser:
if f == 0.0:
number_of_blinks += 1
# Initialization of session_state
if 'blinks' not in st.session_state:
st.session_state['blinks'] = number_of_blinks
if "blinks" in st.session_state.keys():
st.info(f"blinking values (0.0) were found in {number_of_blinks} time-steps in all your data")
remove_blinking = st.button("Remove blinking 🧹")
# df in column 2
if remove_blinking:
df_right = st.session_state.df.copy(deep=True)
df_right.pupil_dilation = df_right.pupil_dilation.map(lambda ser: [f for f in ser if f != 0.0])
df_right.baseline = df_right.baseline.map(lambda ser: [f for f in ser if f != 0.0])
st.session_state['df'] = df_right.copy(deep=True)
st.success("Blinking values have been removed!")
elif detect_blinking and not number_of_blinks:
st.caption("No blinking values were found in your data! ")
# Add calculated fields
if 'df' in list(st.session_state.keys()):
df_right = st.session_state.df.copy(deep=True)
if "baseline" in list(df_right.keys()):
st.markdown(f"A **baseline** feature has been found on your data, do you want to merge it with any of the other features in a new calculated field?")
option = st.multiselect('Select a feature to create relative calculated feature ➕', [k for k in list(df_right.keys()) if k != 'baseline'], [[k for k in list(df_right.keys()) if k != 'baseline'][-4]])
relative_key = f"relative_{option[0]}"
add_relative = st.button(f"Add {relative_key}")
if add_relative:
baseline_mean = [sum(s)/len(s) for s in df_right['baseline']]
df_right[relative_key] = [[field_value - baseline_mean[i] for field_value in df_right[option[0]][i]] for i in range(len(df_right))]
st.markdown("After adding calculated fields")
st.dataframe(df_right)
csv = convert_df(df_right)
# Save transformations to disk
downl = st.download_button("Download CSV 💾", csv, "file.csv", "text/csv", key='download-csv')
if downl:
st.info("Your data has been downloaded, you can visualize and detect outliers in the 'Plotting' and 'Detect Outliers' pages on the sidebar.")
if not df_base.empty:
with col1:
st.warning("Consider running outlier detection to clean your data!", icon="⚠️")