# cabasus/funcs/processor.py
import numpy as np
import pandas as pd
import gradio as gr
from funcs.convertors import slice_csv_to_json, slice_csv_to_json_v2
from funcs.plot_func import plot_sensor_data_from_json
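

# Processing pipeline (a descriptive summary of the function below):
# read the raw sensor CSV -> mask CRC-failed samples -> pivot to one row per
# timestamp -> regularise the 20 ms time grid and truncate at large gaps ->
# slice the cleaned series to JSON and build the plots for the Gradio UI.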
def process_data(input_file, slice_size=64, sample_rate=20, window_size=40,
                 min_slice_size=10, threshold=1000, span_limit=10000000):
    # Gradio may pass a temp-file wrapper (whose .name holds the path) or a plain path string
    if input_file is None:
        return None, None, None, None, None, None, None, None, None, None
    file_path = getattr(input_file, "name", input_file)
    if file_path is None:
        return None, None, None, None, None, None, None, None, None, None
    # Read the data from the file, including the CRC column
    data = pd.read_csv(file_path, delimiter=";", index_col="NR",
                       usecols=["NR", "TS", "LEG", "GX", "GY", "GZ", "AX", "AY", "AZ", "CRC"])
# Replace the values with NaN when the CRC value is not zero
data.loc[data["CRC"] != 0, ["GX", "GY", "GZ", "AX", "AY", "AZ"]] = np.nan
# Drop the CRC column as it is not needed anymore
data = data.drop(columns="CRC")
    # Pivot to one row per timestamp, with one column per (channel, leg) pair
data = data.pivot_table(values=["GX", "GY", "GZ", "AX", "AY", "AZ"], index="TS", columns="LEG")
# Flatten the multi-level columns
data.columns = [f"{col[0]}{col[1]}" for col in data.columns]
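    # e.g. the multi-index column ("GX", 1) becomes "GX1"; with four legs (as the
    # GZ1..GZ4 plot channels below assume) this yields GX1..GX4, ..., AZ1..AZ4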
# Sort the index (timestamps)
data = data.sort_index()
    # Limit the overall time span to span_limit ms (default 10,000,000 ms, about 2.8 h)
min_ts = data.index.min()
max_ts = data.index.max()
if (max_ts - min_ts) > span_limit:
max_ts = min_ts + span_limit
data = data[data.index <= max_ts]
    # Reindex onto a regular 20 ms grid so any missing samples appear as NaN rows
new_index = pd.RangeIndex(start=min_ts, stop=max_ts + 20, step=20)
data = data.reindex(new_index)
    # Treat exact-zero readings as missing values
    data = data.replace(0, np.nan)
    # Look for gaps: more than 3 consecutive all-NaN rows on the 20 ms grid
    # means at least 80 ms of data is missing
    is_missing = data.isna().all(axis=1)
    gaps = is_missing.astype(int).groupby(data.notna().all(axis=1).astype(int).cumsum()).sum()
    big_gaps = gaps[gaps > 3]
    if not big_gaps.empty:
        # The group label counts the complete rows seen before the gap, which is
        # the row position where the first big gap starts
        gap_start_pos = int(big_gaps.index[0])
        gap_size = int(big_gaps.iloc[0]) * 20
        # print(f"Warning: gap of {gap_size} ms found at row {gap_start_pos}")
        # Keep only the data recorded before the first gap of 80 ms or more
        data = data.iloc[:gap_start_pos]
# Calculate the absolute differences between consecutive rows for all channels
differences = data.diff().abs()
    # Find the rows where every channel changes by less than the threshold
    no_significant_change_index = differences[differences.lt(threshold).all(axis=1)].index
    # The truncation below is currently disabled, so no_significant_change_index is unused
    # if not no_significant_change_index.empty:
    #     # Save the data up to the point where no significant change appears in all channels
    #     data = data.loc[:no_significant_change_index[0]]
    #     return None, None, 'Warning: Significantly shortened > check the recordings', None, None, None, None, None, None, None
# Save the resulting DataFrame to a new file
data.to_csv('output.csv', sep=";", na_rep="NaN", float_format="%.0f")
file, len_, time_list = slice_csv_to_json('output.csv', slice_size, min_slice_size, sample_rate, window_size=window_size)
# file, len_ = slice_csv_to_json_v2('output.csv', slice_size, min_slice_size, sample_rate)
    # Build the plots from the sliced output, using channel GZ1 as the default view
    sensor_fig, slice_fig, get_all_slice, slice_json, overlay_fig = plot_sensor_data_from_json(file, "GZ1")
# overlay_fig = plot_overlay_data_from_json(file, ["GZ1", "GZ2", "GZ3", "GZ4"])
    return ('output.csv', file, f'num of slices found: {len_}', sensor_fig, overlay_fig,
            gr.Slider.update(interactive=True, maximum=len_, minimum=1, value=1),
            slice_fig, get_all_slice, slice_json, time_list)
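

# Minimal usage sketch (assumptions: "recording.csv" is a hypothetical local
# recording with the semicolon-separated NR;TS;LEG;GX..AZ;CRC columns read
# above; in the app, Gradio supplies the uploaded file instead):
if __name__ == "__main__":
    results = process_data("recording.csv")
    # results[2] is the status string, e.g. "num of slices found: N"
    print(results[2])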