# Spaces:
# Build error
# Build error
import os | |
import csv | |
import uuid | |
import json | |
import torch | |
import requests | |
import numpy as np | |
import pandas as pd | |
import gradio as gr | |
import plotly.graph_objects as go | |
from phate import PHATEAE | |
from funcs.som import ClusterSOM | |
from funcs.tools import numpy_to_native | |
from funcs.processor import process_data | |
from funcs.plot_func import plot_sensor_data_from_json | |
from funcs.dataloader import BaseDataset2, read_json_files | |
# Inference runs on CPU only.
DEVICE = torch.device("cpu")
# Pretrained PHATE autoencoder that reduces each slice to a 10-d embedding.
reducer10d = PHATEAE(epochs=30, n_components=10, lr=.0001, batch_size=128, t='auto', knn=8, relax=True, metric='euclidean')
reducer10d.load('models/r10d_3.pth')
# Pretrained SOM-based clustering model used for gait prediction and scoring.
cluster_som = ClusterSOM()
cluster_som.load("models/cluster_som3.pkl")
def score(self, data, midpoints=None, threshold_radius=4):
    """
    Score each sample by the grid distance from its BMU to the closest
    midpoint of the activated SOM, minus a threshold radius.

    NOTE(review): this is defined at module level but takes ``self`` — it
    looks intended to be attached to ClusterSOM; confirm how it is bound.

    :param data: Iterable of samples; each is passed to ``self.predict``.
    :param midpoints: Optional dict mapping SOM index -> list of (x, y)
        midpoints for that SOM's grid. When absent for a SOM, the geometric
        centre of its weight grid is used.
    :param threshold_radius: Radius subtracted from the minimum distance, so
        BMUs inside the radius get negative scores.
    :return: List with one float score per sample, or ``None`` for samples
        predicted as noise (cluster index -1).
    """
    scores = []
    for sample in data:
        # predict() returns (som_index, bmu_coordinates); som_index == -1 is noise.
        result = self.predict([sample])[0]
        if result[0] == -1:
            scores.append(None)  # noise sample: no score
            continue
        activated_som_index, bmu = result[0], result[1]
        # The SOM that the sample activated.
        som = self.som_models[activated_som_index]
        if midpoints is not None and activated_som_index in midpoints:
            specified_midpoints = midpoints[activated_som_index]
        else:
            # Default midpoint: centre of the SOM grid (first two weight dims).
            specified_midpoints = [tuple((dim - 1) / 2 for dim in som.get_weights().shape[:2])]
        # Euclidean grid distance to the closest midpoint.
        # BUG FIX: the original used `*2` (multiplication) instead of `**2`
        # (squaring) — not a distance, and sqrt could receive a negative value.
        min_distance = min(
            np.sqrt((midpoint[0] - bmu[0]) ** 2 + (midpoint[1] - bmu[1]) ** 2)
            for midpoint in specified_midpoints
        )
        # Score: minimum grid distance minus the threshold radius.
        scores.append(min_distance - threshold_radius)
    return scores
def map_som2animation(som_value):
    """Translate a SOM cluster id into an animation state id.

    Animation states: 0 = walk, 1 = trot, 2 = gallop, 3 = idle/other.
    Unknown cluster ids map to ``None``.
    """
    if som_value == 2:
        return 0  # walk
    if som_value == 1:
        return 1  # trot
    if som_value == 3:
        return 2  # gallop
    if som_value in (5, 4, -1):
        return 3  # idle / other / noise
    return None
def deviation_scores(tensor_data, scale=50):
    """Compute a timestamp and four clipped deviation scores for one slice.

    The last element of ``tensor_data`` is the reference value; the four
    elements before it are the side values whose absolute deviation from the
    reference is scaled and clipped to [0, 1].

    :param tensor_data: torch tensor with at least 5 elements.
    :param scale: multiplier applied to the absolute deviations.
    :return: ``(timestamp, scores)`` where timestamp is
        ``int(reference / 20 * 32768)`` and scores is a list of 4 floats
        (plain zeros when all side values equal the reference).
    :raises ValueError: when the tensor has fewer than 5 elements.
    """
    if len(tensor_data) < 5:
        raise ValueError("The input tensor must have at least 5 elements.")
    reference_value = tensor_data[-1].item()
    side_values = tensor_data[-5:-1].numpy()
    deltas = np.abs(side_values - reference_value)
    timestamp = int(reference_value / 20 * 32768)
    # All side values equal to the reference -> zero deviation everywhere.
    if not np.any(deltas):
        return timestamp, [0, 0, 0, 0]
    # Scale, then clip each score into [0, 1].
    clipped = np.clip(deltas * scale, 0, 1)
    return timestamp, clipped.tolist()
def process_som_data(data, prediction):
    """Build animation-table rows from slice data and the SOM prediction.

    Each row has the layout
    ``[Gait, TS, State, Condition, Shape1-4, Color1-4, Danger1-4]`` where the
    shape and color columns both carry the deviation scores and a danger flag
    is set wherever a score saturated at 1.
    """
    rows = []
    total = len(data)
    for idx in range(total):
        ts_value, scores_list = deviation_scores(data[idx][0])
        # Interpolate a missing timestamp from its neighbours.
        # NOTE(review): deviation_scores as written never returns None for the
        # timestamp, so this branch looks unreachable — kept for safety.
        if ts_value is None:
            if 0 < idx < total - 1:
                previous_ts = rows[-1][1]
                following_ts = deviation_scores(data[idx + 1][0])[0]
                ts_value = (previous_ts + following_ts) // 2
            elif idx > 0:
                ts_value = rows[-1][1]  # carry the previous timestamp forward
            else:
                ts_value = 0  # no neighbours available
        # Gait animation id: 0-walk 1-trot 2-gallop 3-idle.
        # NOTE(review): every row reuses prediction[0][0] (the first slice's
        # prediction) — confirm this is intended rather than prediction[idx][0].
        gait = map_som2animation(prediction[0][0])
        state = 0
        condition = 0
        # Shape and color columns share the deviation scores; danger flags
        # mark saturated scores.
        danger_flags = [int(value == 1) for value in scores_list]
        rows.append([gait, ts_value, state, condition] + scores_list + scores_list + danger_flags)
    return rows
def scores_to_dataframe(scores, start_time='2022-07-01 09:15:00+05:30', start_score=100, none_replacement=-0):
    """Convert a per-second score series into OHLC candlestick rows.

    :param scores: list of score deltas; ``None`` entries mark noise samples.
    :param start_time: timestamp of the first score; one row per second follows.
    :param start_score: opening value of the first candle.
    :param none_replacement: delta applied to the close when a score is None
        (the open is always carried forward unchanged for None scores).
    :return: DataFrame with columns time (unix seconds), open, high, low,
        close, indexed from 1.
    """
    # One timestamp per score, spaced one second apart, as unix seconds.
    timestamps = [pd.Timestamp(start_time) + pd.Timedelta(seconds=i) for i in range(len(scores))]
    unix_timestamps = [int(ts.value // 10**9) for ts in timestamps]
    # Each candle opens where the previous close would land; None scores carry
    # the open forward unchanged.
    open_prices = [start_score]
    for i in range(1, len(scores)):
        previous = scores[i - 1]
        open_prices.append(open_prices[i - 1] + (previous if previous is not None else 0))
    # `open_px`/`close_px` avoid shadowing the builtin `open` (the original
    # comprehensions shadowed it).
    close_prices = [open_px + (score if score is not None else none_replacement)
                    for open_px, score in zip(open_prices, scores)]
    high_prices = [max(open_px, close_px) for open_px, close_px in zip(open_prices, close_prices)]
    low_prices = [min(open_px, close_px) for open_px, close_px in zip(open_prices, close_prices)]
    df = pd.DataFrame({
        'time': unix_timestamps,
        'open': open_prices,
        'high': high_prices,
        'low': low_prices,
        'close': close_prices
    })
    # Start index from 1 (candlestick widgets expect 1-based rows).
    df.index += 1
    return df
def get_som_mp4_v2(csv_file_box, slice_size_slider, sample_rate, window_size_slider, reducer=reducer10d, cluster=cluster_som):
    """Run the full pipeline on an uploaded CSV: slice, embed, cluster, render.

    :param csv_file_box: uploaded CSV file (gradio File or path).
    :param slice_size_slider: samples per slice.
    :param sample_rate: sensor sample rate.
    :param window_size_slider: smoothing window size.
    :param reducer: 10-d embedding model (defaults to the module-level PHATEAE).
    :param cluster: trained ClusterSOM (defaults to the module-level model).
    :return: the ten ``process_data`` outputs followed by the SOM sequence
        video path, the animation video path and the candlestick trend figure;
        the last three are ``None`` when no JSON slice file was produced.
    """
    (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
     plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
     slice_json_box, time_list) = process_data(csv_file_box, slice_size_slider,
                                               sample_rate, window_size_slider)
    print('finished processing')
    # json_file_box is either a plain path or a gradio File object; retry with
    # its .name attribute when the direct read fails.
    try:
        if json_file_box is None:
            return (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
                    plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
                    slice_json_box, None, None, None)
        train_x, train_y = read_json_files(json_file_box)
    except Exception:  # narrowed from a bare except; keep the .name fallback
        if json_file_box.name is None:
            return (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
                    plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
                    slice_json_box, None, None, None)
        train_x, train_y = read_json_files(json_file_box.name)
    # Convert tensors to numpy arrays if necessary.
    if isinstance(train_x, torch.Tensor):
        train_x = train_x.numpy()
    if isinstance(train_y, torch.Tensor):
        train_y = train_y.numpy()
    # Time-series slices: 4*3*2*64 (feeds+axis*sensor*samples) + 5 for time
    # diff, normalised by the int16 range.
    data = BaseDataset2(train_x.reshape(len(train_x), -1) / 32768, train_y)
    # Compute the 10-dimensional embedding vector per slice.
    embedding10d = reducer.transform(data)
    # Consistency fix: use the `cluster` parameter — the original ignored it
    # and always called the module-level cluster_som here.
    prediction = cluster.predict(embedding10d)
    processed_data = process_som_data(data, prediction)
    # NOTE(review): `score` must exist on ClusterSOM; the module-level
    # score(self, ...) above looks like it was meant to be attached to it.
    scores = cluster.score(embedding10d, threshold_radius=8.5)
    scores_df = scores_to_dataframe(scores)
    fig = go.Figure(data=[go.Candlestick(x=scores_df['time'],
                                         open=scores_df['open'],
                                         high=scores_df['high'],
                                         low=scores_df['low'],
                                         close=scores_df['close'])])
    # Write the processed rows to the CSV consumed by the animation service.
    header = ['Gait', 'TS', 'State', 'Condition',
              'Shape1', 'Shape2', 'Shape3', 'Shape4',
              'Color1', 'Color2', 'Color3', 'Color4',
              'Danger1', 'Danger2', 'Danger3', 'Danger4']
    with open('animation_table.csv', 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(header)
        csv_writer.writerows(processed_data)
    uuid_name = str(uuid.uuid4())
    name_animation_file = f'animation-{uuid_name}.mp4'
    name_som_sequence_file = f'sequence-{uuid_name}.mp4'
    # Remote animation rendering via the external service.
    os.system(f'curl -X POST -F "csv_file=@animation_table.csv" https://metric-space.ngrok.io/generate --output {name_animation_file}')
    # SOM activation sequence video, passing the time values for each slice.
    som_video = cluster.plot_activation(embedding10d, times=time_list)
    som_video.write_videofile(name_som_sequence_file)
    # (An unreachable trailing return that followed this one was removed.)
    return (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
            plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
            slice_json_box, name_som_sequence_file, name_animation_file, fig)
# ml inference | |
def get_som_mp4(file, slice_select, reducer=reducer10d, cluster=cluster_som):
    """Embed the slices of a JSON file and plot the SOM activation for one slice.

    :param file: slice JSON file (gradio File or path).
    :param slice_select: index of the slice to visualise.
    :param reducer: 10-d embedding model (defaults to the module-level PHATEAE).
    :param cluster: trained ClusterSOM (defaults to the module-level model).
    :return: the activation figure produced by ``cluster.plot_activation_v2``.
    """
    try:
        train_x, train_y = read_json_files(file)
    except Exception:  # narrowed from a bare except; gradio File: use its .name
        train_x, train_y = read_json_files(file.name)
    # Convert tensors to numpy arrays if necessary.
    if isinstance(train_x, torch.Tensor):
        train_x = train_x.numpy()
    if isinstance(train_y, torch.Tensor):
        train_y = train_y.numpy()
    # Time-series slices: 4*3*2*64 (feeds+axis*sensor*samples) + 5 for time
    # diff, normalised by the int16 range.
    data = BaseDataset2(train_x.reshape(len(train_x), -1) / 32768, train_y)
    # Compute the 10-dimensional embedding vector per slice.
    embedding10d = reducer.transform(data)
    fig = cluster.plot_activation_v2(embedding10d, slice_select)
    return fig
def attach_label_to_json(json_file, label_text):
    """Attach a text label to a slice JSON file and save a labelled copy.

    :param json_file: path string or gradio File object of the slice JSON.
    :param label_text: label stored under the 'label' key.
    :return: path of the written ``manual_labelled_<basename>`` file.
    """
    # Resolve the filesystem path once. BUG FIX: the original accessed
    # `json_file.name` unconditionally when writing, which raised
    # AttributeError whenever json_file was a plain path string (the very
    # case its try branch had just handled).
    try:
        source_path = json_file
        with open(source_path, "r") as f:
            slices = json.load(f)
    except Exception:  # gradio File object: retry with its .name path
        source_path = json_file.name
        with open(source_path, "r") as f:
            slices = json.load(f)
    slices['label'] = label_text
    out_path = f'manual_labelled_{os.path.basename(source_path)}'
    with open(out_path, "w") as f:
        json.dump(numpy_to_native(slices), f, indent=2)
    return out_path
# Gradio UI: wires the upload/convert workflow to the processing and
# inference functions defined above. (Indentation reconstructed — the scraped
# source was flattened; confirm nesting against the running app.)
with gr.Blocks(title='Cabasus') as cabasus_sensor:
    title = gr.Markdown("<h2><center>Data gathering and processing</center></h2>")
    with gr.Tab("Convert"):
        # Input CSV and the files produced from it.
        with gr.Row():
            csv_file_box = gr.File(label='Upload CSV File')
            with gr.Column():
                processed_file_box = gr.File(label='Processed CSV File')
                json_file_box = gr.File(label='Generated Json file')
        # Rendered videos and the trend figure.
        with gr.Row():
            animation = gr.Video(label='animation')
            activation_video = gr.Video(label='activation channels')
        with gr.Row():
            real_video = gr.Video(label='real video')
            trend_graph = gr.Plot(label='trend graph')
        plot_box_leg = gr.Plot(label="Filtered Signal Plot")
        slice_slider = gr.Slider(minimum=1, maximum=300, label='Slice select', step=1)
        som_create = gr.Button('generate activation maps')
        som_figures = gr.Plot(label="activations maps")
        # Hidden processing parameters (used by the hidden restart flow).
        with gr.Row():
            slice_size_slider = gr.Slider(minimum=16, maximum=512, step=1, value=64, label="Slice Size", visible=False)
            sample_rate = gr.Slider(minimum=1, maximum=199, step=1, value=20, label="Sample rate", visible=False)
        with gr.Row():
            window_size_slider = gr.Slider(minimum=0, maximum=100, step=2, value=10, label="Window Size", visible=False)
            repeat_process = gr.Button('Restart process', visible=False)
        with gr.Row():
            leg_dropdown = gr.Dropdown(choices=['GZ1', 'GZ2', 'GZ3', 'GZ4'], label='select leg', value='GZ1')
        with gr.Row():
            get_all_slice = gr.Plot(label="Real Signal Plot")
            plot_box_overlay = gr.Plot(label="Overlay Signal Plot")
        with gr.Row():
            plot_slice_leg = gr.Plot(label="Sliced Signal Plot", visible=False)
        # Manual labelling of a single slice JSON.
        with gr.Row():
            slice_json_box = gr.File(label='Slice json file')
            with gr.Column():
                label_name = gr.Textbox(label="enter the label name")
                button_label_Add = gr.Button('attach label')
            slice_json_label_box = gr.File(label='Slice json labelled file')
        slices_per_leg = gr.Textbox(label="Debug information")
    # Event wiring.
    # csv_file_box.change(process_data, inputs=[csv_file_box, slice_size_slider, sample_rate, window_size_slider],
    #                     outputs=[processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice, slice_json_box])
    # Re-plot when the leg selection or the slice slider changes.
    leg_dropdown.change(plot_sensor_data_from_json, inputs=[json_file_box, leg_dropdown, slice_slider],
                        outputs=[plot_box_leg, plot_slice_leg, get_all_slice, slice_json_box, plot_box_overlay])
    repeat_process.click(process_data, inputs=[csv_file_box, slice_size_slider, sample_rate, window_size_slider],
                         outputs=[processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice, slice_json_box])
    slice_slider.change(plot_sensor_data_from_json, inputs=[json_file_box, leg_dropdown, slice_slider],
                        outputs=[plot_box_leg, plot_slice_leg, get_all_slice, slice_json_box, plot_box_overlay])
    som_create.click(get_som_mp4, inputs=[json_file_box, slice_slider], outputs=[som_figures])
    # Redo the whole calculation when a new CSV file is loaded.
    csv_file_box.change(get_som_mp4_v2, inputs=[csv_file_box, slice_size_slider, sample_rate, window_size_slider],
                        outputs=[processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice, slice_json_box,
                                 activation_video, animation, trend_graph])
    button_label_Add.click(attach_label_to_json, inputs=[slice_json_box, label_name], outputs=[slice_json_label_box])
# Launch with a small request queue (concurrency_count is gradio 3.x API).
cabasus_sensor.queue(concurrency_count=2).launch(debug=True)