import os
import csv
import uuid
import json
import torch
import requests
import numpy as np
import pandas as pd
import gradio as gr
import plotly.graph_objects as go
from phate import PHATEAE
from funcs.som import ClusterSOM
from import numpy_to_native
from funcs.processor import process_data
from funcs.plot_func import plot_sensor_data_from_json
from funcs.dataloader import BaseDataset2, read_json_files
# Torch device handle; not referenced by the code visible in this file section.
DEVICE = torch.device("cpu")

# Reducer that produces the 10-dimensional embedding consumed by the SOM cluster model.
# NOTE(review): no weights are loaded for either model in the visible code -- confirm
# they are fitted/loaded elsewhere before `transform` / `predict` are called.
reducer10d = PHATEAE(
    epochs=30,
    n_components=10,
    lr=0.0001,
    batch_size=128,
    t='auto',
    knn=8,
    relax=True,
    metric='euclidean',
)

# Cluster-of-SOMs model used for prediction, scoring and activation plots.
cluster_som = ClusterSOM()
def score(self, data, midpoints=None, threshold_radius=4):
    """
    Compute the score for each sample in the data based on the distance of the BMU node
    to the closest midpoint of the SOM grid.

    NOTE(review): this is defined at module level but takes ``self`` and is invoked
    elsewhere in this file as ``cluster_som.score(...)``; presumably it is meant to be
    a ``ClusterSOM`` method -- confirm how it gets attached.

    :param data: The input data; each sample is passed to ``self.predict`` on its own.
    :param midpoints: A dictionary with keys as the indices of the SOMs and values as
        lists of midpoints on the grid for the corresponding SOMs. SOMs without an
        entry fall back to the geometric centre of their grid.
    :param threshold_radius: The threshold radius for score calculation.
    :return: A list with one entry per sample: ``min_distance - threshold_radius`` for
        clustered samples, ``None`` for samples predicted as noise (cluster ``-1``).
    """
    scores = []
    for sample in data:
        # Predict the cluster and BMU SOM coordinate for this sample
        result = self.predict([sample])[0]

        # Noise samples do not activate any SOM, so they get no score
        if result[0] == -1:
            scores.append(None)
            continue

        # The activated SOM's index and its corresponding BMU
        activated_som_index, bmu = result[0], result[1]

        # If specific midpoints are provided for this SOM, use them;
        # else use the midpoint of the SOM grid
        if midpoints is not None and activated_som_index in midpoints:
            specified_midpoints = midpoints[activated_som_index]
        else:
            som = self.som_models[activated_som_index]
            specified_midpoints = [tuple((dim - 1) / 2 for dim in som.get_weights().shape[:2])]

        # Euclidean grid distance from the BMU to the nearest midpoint.
        # The differences must be squared (**2); doubling them (*2) can feed a
        # negative value to sqrt and yield NaN.
        min_distance = min(
            np.sqrt((midpoint[0] - bmu[0]) ** 2 + (midpoint[1] - bmu[1]) ** 2)
            for midpoint in specified_midpoints
        )

        # Score is the minimum grid distance minus the threshold radius
        scores.append(min_distance - threshold_radius)
    return scores
def map_som2animation(som_value):
    """
    Translate a SOM cluster index into the animation gait code.

    Gait codes: 0-walk, 1-trot, 2-gallop, 3-idle/other.

    :param som_value: Cluster index predicted by the SOM (``-1`` is treated as "other").
    :return: The gait code, or ``None`` when the cluster index is not in the table.
    """
    mapping = {
        2: 0,   # walk
        1: 1,   # trot
        3: 2,   # gallop
        5: 3,   # idle
        4: 3,   # other
        -1: 3,  # other
    }
    return mapping.get(som_value, None)
def deviation_scores(tensor_data, scale=50):
    """
    Derive a timestamp value and four clipped deviation scores from the tail of a sample.

    The last element is the reference value; the four elements before it are the
    "side" values compared against that reference.

    :param tensor_data: 1-D torch tensor, numpy array or sequence with at least 5 elements.
    :param scale: Factor applied to the absolute deviations before clipping to [0, 1].
    :return: Tuple ``(ts, scores)`` where ``ts`` is ``int(reference / 20 * 32768)`` and
        ``scores`` is a list of four values in [0, 1].
    :raises ValueError: If fewer than 5 elements are supplied.
    """
    if len(tensor_data) < 5:
        raise ValueError("The input tensor must have at least 5 elements.")

    # Accept torch tensors (also ones that track gradients or live off-CPU) as well
    # as plain array-likes; dtype is left untouched so results match the input precision.
    if hasattr(tensor_data, "detach"):
        values = tensor_data.detach().cpu().numpy()
    else:
        values = np.asarray(tensor_data)

    # Extract the side values and reference value from the input
    side_values = values[-5:-1]
    reference_value = values[-1].item()
    time_value = int(reference_value / 20 * 32768)

    # Calculate the absolute differences between the side values and the reference
    absolute_differences = np.abs(side_values - reference_value)

    # All side values equal the reference: return integer zeros (kept as a separate
    # branch so the emitted values stay `0`, not `0.0`).
    if np.sum(absolute_differences) == 0:
        return time_value, [0, 0, 0, 0]

    # Scale the deviations and clip the scores between 0 and 1
    clipped_scores = np.clip(absolute_differences * scale, 0, 1)
    return time_value, clipped_scores.tolist()
def process_som_data(data, prediction):
    """
    Build the animation table rows for every sample in ``data``.

    Each row has the layout
    ``[Gait, TS, State, Condition, Shape1-4, Color1-4, Danger1-4]``.

    :param data: Indexable dataset; ``data[i][0]`` is the sample handed to
        ``deviation_scores``.
    :param prediction: Per-sample predictions; ``prediction[i][0]`` is the cluster
        index of sample ``i``.
    :return: List of rows, one per sample.
    """
    processed_data = []
    sample_count = len(data)

    for i in range(sample_count):
        TS, scores_list = deviation_scores(data[i][0])

        # If TS is missing (None), interpolate it using surrounding values
        if TS is None:
            if 0 < i < sample_count - 1:
                prev_TS = processed_data[-1][1]
                next_TS = deviation_scores(data[i + 1][0])[0]
                # Fall back to the previous value when the next one is missing too
                TS = (prev_TS + next_TS) // 2 if next_TS is not None else prev_TS
            elif i > 0:
                TS = processed_data[-1][1]  # Use the previous TS value
            else:
                TS = 0  # Default to 0 if no surrounding values are available

        # Set Gait, State, and Condition
        # 0-walk 1-trot 2-gallop 3-idle
        # Use this sample's own prediction; indexing prediction[0] would stamp the
        # gait of the first slice onto every row.
        gait = map_som2animation(prediction[i][0])
        state = 0
        condition = 0

        # Calculate Shape, Color, and Danger values
        shape_values = scores_list
        color_values = scores_list
        danger_values = [1 if score == 1 else 0 for score in scores_list]

        # Create a row with the required format and keep it
        row = [gait, TS, state, condition] + shape_values + color_values + danger_values
        processed_data.append(row)

    return processed_data
def scores_to_dataframe(scores, start_time='2022-07-01 09:15:00+05:30', start_score=100, none_replacement=-0):
    """
    Turn a list of per-step scores into an OHLC (candlestick) dataframe.

    Every score is one candle, one second apart. A candle opens where the previous
    one closed and closes at ``open + score``.

    :param scores: Sequence of numeric scores; ``None`` entries are allowed.
    :param start_time: Timestamp of the first candle (anything ``pd.Timestamp`` accepts).
    :param start_score: Open price of the first candle.
    :param none_replacement: Value used in place of a ``None`` score.
    :return: DataFrame with columns ``time`` (unix seconds), ``open``, ``high``,
        ``low``, ``close`` and an index starting at 1. Empty input gives an empty frame.
    """
    # One unix timestamp (seconds) per score, spaced one second apart
    start_unix = int(pd.Timestamp(start_time).value // 10**9)
    unix_timestamps = [start_unix + offset for offset in range(len(scores))]

    # Walk the scores once, carrying the running price forward. A None score moves
    # the price by `none_replacement`, so all columns keep the same length.
    open_prices = []
    close_prices = []
    price = start_score
    for step in scores:
        open_prices.append(price)
        price = price + (step if step is not None else none_replacement)
        close_prices.append(price)

    # Create high and low prices
    high_prices = [max(o, c) for o, c in zip(open_prices, close_prices)]
    low_prices = [min(o, c) for o, c in zip(open_prices, close_prices)]

    # Create a dataframe
    df = pd.DataFrame({
        'time': unix_timestamps,
        'open': open_prices,
        'high': high_prices,
        'low': low_prices,
        'close': close_prices,
    })

    # Start index from 1
    df.index += 1
    return df
def _request_animation(csv_path, output_path, url):
    """POST the animation table to the rendering service; return the saved MP4 path or None."""
    if not url:
        print('no animation service URL configured, skipping animation rendering')
        return None
    try:
        with open(csv_path, 'rb') as payload:
            # (connect, read) timeouts so a dead service cannot hang the UI forever
            response = requests.post(url, files={'csv_file': payload}, timeout=(10, 600))
        response.raise_for_status()
    except requests.RequestException as err:
        # Best effort: the rest of the outputs are still useful without the animation
        print(f'animation request failed: {err}')
        return None
    # The response body is the binary MP4 data
    with open(output_path, 'wb') as video_file:
        video_file.write(response.content)
    return output_path


def get_som_mp4_v2(csv_file_box, slice_size_slider, sample_rate, window_size_slider, reducer=reducer10d, cluster=cluster_som, animation_url=None):
    """
    Process an uploaded CSV, run the SOM inference and build all UI outputs.

    Steps: run ``process_data``, embed the generated slices with ``reducer``,
    predict/score them with ``cluster``, build the candlestick trend figure, write
    ``animation_table.csv`` and request the rendered animation.

    :param csv_file_box: Uploaded CSV file (forwarded to ``process_data``).
    :param slice_size_slider: Slice size (forwarded to ``process_data``).
    :param sample_rate: Sample rate (forwarded to ``process_data``).
    :param window_size_slider: Window size (forwarded to ``process_data``).
    :param reducer: Model whose ``transform`` yields the 10-dimensional embedding.
    :param cluster: SOM cluster model used for ``predict``, ``score`` and
        ``plot_activation``.
    :param animation_url: Endpoint that turns the CSV table into an MP4. Falls back to
        the ``ANIMATION_SERVICE_URL`` environment variable.
        NOTE(review): the original endpoint is missing from the source (the ``curl``
        command had no URL), so it must be supplied through one of these two ways.
    :return: Tuple of 12 values: the nine ``process_data`` UI outputs followed by the
        SOM sequence video path, the animation video path (``None`` if rendering was
        skipped or failed) and the trend figure. The last three are ``None`` when no
        JSON file was generated.
    """
    (processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay,
     slice_slider, plot_slice_leg, get_all_slice, slice_json_box, time_list) = process_data(
        csv_file_box, slice_size_slider, sample_rate, window_size_slider)
    print('finished processing')

    ui_outputs = (processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay,
                  slice_slider, plot_slice_leg, get_all_slice, slice_json_box)

    # The JSON output may be a file-like object exposing `.name` or a plain path
    json_path = getattr(json_file_box, 'name', json_file_box)
    if json_path is None:
        return (*ui_outputs, None, None, None)

    train_x, train_y = read_json_files(json_path)

    # Convert tensors to numpy arrays if necessary
    if isinstance(train_x, torch.Tensor):
        train_x = train_x.numpy()
    if isinstance(train_y, torch.Tensor):
        train_y = train_y.numpy()

    # load the time series slices of the data 4*3*2*64 (feeds+axis*sensor*samples) + 5 for time diff
    data = BaseDataset2(train_x.reshape(len(train_x), -1) / 32768, train_y)

    # compute the 10 dimensional embedding vector
    embedding10d = reducer.transform(data)

    # retrieve the prediction and get the animation table.
    # Use the `cluster` argument consistently instead of the module-level default.
    prediction = cluster.predict(embedding10d)
    processed_data = process_som_data(data, prediction)

    scores = cluster.score(embedding10d, threshold_radius=8.5)
    scores_df = scores_to_dataframe(scores)
    fig = go.Figure(data=[go.Candlestick(x=scores_df['time'],
                                         open=scores_df['open'],
                                         high=scores_df['high'],
                                         low=scores_df['low'],
                                         close=scores_df['close'])])

    # Write the processed data to a CSV file
    header = ['Gait', 'TS', 'State', 'Condition',
              'Shape1', 'Shape2', 'Shape3', 'Shape4',
              'Color1', 'Color2', 'Color3', 'Color4',
              'Danger1', 'Danger2', 'Danger3', 'Danger4']
    with open('animation_table.csv', 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(header)
        csv_writer.writerows(processed_data)

    # Unique output names so results of different uploads do not overwrite each other
    uuid_name = f'{str(uuid.uuid4())}'
    name_animation_file = f'animation-{uuid_name}.mp4'
    name_som_sequence_file = f'sequence-{uuid_name}.mp4'

    # Plain HTTP request instead of a shell-invoked curl
    service_url = animation_url or os.environ.get('ANIMATION_SERVICE_URL')
    animation_path = _request_animation('animation_table.csv', name_animation_file, service_url)

    # passing the time values for each slice
    # NOTE(review): nothing visible here writes `name_som_sequence_file`; presumably
    # the object returned by `plot_activation` has to be saved under that name --
    # confirm against ClusterSOM.plot_activation.
    som_video = cluster.plot_activation(embedding10d, times=time_list)

    return (*ui_outputs, name_som_sequence_file, animation_path, fig)
# ml inference
def get_som_mp4(file, slice_select, reducer=reducer10d, cluster=cluster_som):
    """
    Build the SOM activation figure for one slice of a generated JSON file.

    :param file: JSON file produced by the processing step; either a path or a
        file-like object exposing ``.name``.
    :param slice_select: Slice selector forwarded to ``cluster.plot_activation_v2``.
    :param reducer: Model whose ``transform`` yields the 10-dimensional embedding.
    :param cluster: SOM cluster model providing ``plot_activation_v2``.
    :return: The figure returned by ``plot_activation_v2``, or ``None`` when no file
        is available yet.
    """
    # The UI can trigger this before any file has been generated
    json_path = getattr(file, 'name', file)
    if json_path is None:
        return None

    train_x, train_y = read_json_files(json_path)

    # Convert tensors to numpy arrays if necessary
    if isinstance(train_x, torch.Tensor):
        train_x = train_x.numpy()
    if isinstance(train_y, torch.Tensor):
        train_y = train_y.numpy()

    # load the time series slices of the data 4*3*2*64 (feeds+axis*sensor*samples) + 5 for time diff
    data = BaseDataset2(train_x.reshape(len(train_x), -1) / 32768, train_y)

    # compute the 10 dimensional embedding vector
    embedding10d = reducer.transform(data)

    fig = cluster.plot_activation_v2(embedding10d, slice_select)
    return fig
def attach_label_to_json(json_file, label_text):
    """
    Add a ``label`` entry to a slice JSON file and save it as a new file.

    The result is written to ``manual_labelled_<original file name>`` in the current
    working directory; the input file is left untouched.

    :param json_file: Slice JSON file; either a path or a file-like object exposing
        ``.name``. Its top-level value must support item assignment (a JSON object).
    :param label_text: Label stored under the ``label`` key.
    :return: Name of the labelled file, or ``None`` when no input file was given.
    """
    json_path = getattr(json_file, 'name', json_file)
    if json_path is None:
        return None

    # Read the JSON file
    with open(json_path, "r") as f:
        slices = json.load(f)

    slices['label'] = label_text

    output_name = f'manual_labelled_{os.path.basename(json_path)}'
    with open(output_name, "w") as f:
        # numpy_to_native makes the content JSON-serialisable before dumping
        json.dump(numpy_to_native(slices), f, indent=2)

    return output_name
# Gradio UI: upload a CSV, inspect the processed signals, view the SOM outputs and
# attach manual labels to single slices.
# NOTE(review): the nesting of rows/columns below is reconstructed (the original
# indentation was lost) -- confirm the layout renders as intended.
with gr.Blocks(title='Cabasus') as cabasus_sensor:
    title = gr.Markdown("<h2><center>Data gathering and processing</center></h2>")

    with gr.Tab("Convert"):
        with gr.Row():
            csv_file_box = gr.File(label='Upload CSV File')
            with gr.Column():
                processed_file_box = gr.File(label='Processed CSV File')
                json_file_box = gr.File(label='Generated Json file')

        with gr.Row():
            animation = gr.Video(label='animation')
            activation_video = gr.Video(label='activation channels')

        with gr.Row():
            real_video = gr.Video(label='real video')
            trend_graph = gr.Plot(label='trend graph')

        plot_box_leg = gr.Plot(label="Filtered Signal Plot")
        slice_slider = gr.Slider(minimum=1, maximum=300, label='Slice select', step=1)

        som_create = gr.Button('generate activation maps')
        som_figures = gr.Plot(label="activations maps")

        # Processing parameters; hidden, so the defaults are always used
        with gr.Row():
            slice_size_slider = gr.Slider(minimum=16, maximum=512, step=1, value=64, label="Slice Size", visible=False)
            sample_rate = gr.Slider(minimum=1, maximum=199, step=1, value=20, label="Sample rate", visible=False)

        with gr.Row():
            window_size_slider = gr.Slider(minimum=0, maximum=100, step=2, value=10, label="Window Size", visible=False)
            repeat_process = gr.Button('Restart process', visible=False)

        with gr.Row():
            leg_dropdown = gr.Dropdown(choices=['GZ1', 'GZ2', 'GZ3', 'GZ4'], label='select leg', value='GZ1')

        with gr.Row():
            get_all_slice = gr.Plot(label="Real Signal Plot")
            plot_box_overlay = gr.Plot(label="Overlay Signal Plot")

        with gr.Row():
            plot_slice_leg = gr.Plot(label="Sliced Signal Plot", visible=False)

        with gr.Row():
            slice_json_box = gr.File(label='Slice json file')
            with gr.Column():
                label_name = gr.Textbox(label="enter the label name")
                button_label_Add = gr.Button('attach label')
                slice_json_label_box = gr.File(label='Slice json labelled file')

        slices_per_leg = gr.Textbox(label="Debug information")

    # Re-plot when another leg is selected
    leg_dropdown.change(plot_sensor_data_from_json, inputs=[json_file_box, leg_dropdown, slice_slider],
                        outputs=[plot_box_leg, plot_slice_leg, get_all_slice, slice_json_box, plot_box_overlay])

    # Re-run the processing only.
    # NOTE(review): the handler name was missing in the source; `process_data` is
    # inferred from the matching inputs/outputs. `get_som_mp4_v2` unpacks ten values
    # from `process_data` while only nine outputs are wired here -- confirm.
    repeat_process.click(process_data, inputs=[csv_file_box, slice_size_slider, sample_rate, window_size_slider],
                         outputs=[processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice, slice_json_box])

    # Re-plot when another slice is selected
    slice_slider.change(plot_sensor_data_from_json, inputs=[json_file_box, leg_dropdown, slice_slider],
                        outputs=[plot_box_leg, plot_slice_leg, get_all_slice, slice_json_box, plot_box_overlay])

    # Activation maps for the currently selected slice
    som_create.click(get_som_mp4, inputs=[json_file_box, slice_slider], outputs=[som_figures])

    # redoing the whole calculation with the file loading
    csv_file_box.change(get_som_mp4_v2, inputs=[csv_file_box, slice_size_slider, sample_rate, window_size_slider],
                        outputs=[processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice, slice_json_box,
                                 activation_video, animation, trend_graph])

    # Store the entered label in a copy of the slice JSON file
    button_label_Add.click(attach_label_to_json, inputs=[slice_json_box, label_name], outputs=[slice_json_label_box])