"""Gradio app: convert sensor CSV files, run SOM inference and label slices."""

# NOTE: the original import order is preserved on purpose; the star import
# below may rebind names, so reordering could change name resolution.
import os
import csv
import json
import subprocess
import torch
import numpy as np
import pandas as pd
import gradio as gr
from phate import PHATEAE
from pytvlwcharts import *
from pandas import Timestamp
from funcs.som import ClusterSOM
from funcs.tools import numpy_to_native
from funcs.processor import process_data
from funcs.plot_func import plot_sensor_data_from_json
from funcs.dataloader import BaseDataset2, read_json_files

DEVICE = torch.device("cpu")

# Pre-trained 10-dimensional reducer and the SOM cluster model used for inference.
reducer10d = PHATEAE(epochs=30, n_components=10, lr=.0001, batch_size=128,
                     t='auto', knn=8, relax=True, metric='euclidean')
reducer10d.load('models/r10d_6.pth')

cluster_som = ClusterSOM()
cluster_som.load("models/cluster_som6.pkl")

# SOM cluster index -> animation gait index (0-walk 1-trot 2-gallop 3-idle/other).
_SOM_TO_ANIMATION = {
    2: 0,   # walk
    1: 1,   # trot
    3: 2,   # gallop
    5: 3,   # idle
    4: 3,   # other
    -1: 3,  # other
}

# Files and endpoint used when rendering the animation and SOM videos.
_ANIMATION_TABLE_CSV = 'animation_table.csv'
_ANIMATION_VIDEO = 'animation.mp4'
_SOM_VIDEO = 'som_sequence.mp4'
_ANIMATION_ENDPOINT = 'https://metric-space.ngrok.io/generate'


def score(self, data, midpoints=None, threshold_radius=4):
    """
    Compute the score for each sample in the data based on the distance of the
    BMU node to the closest midpoint of the SOM grid.

    NOTE(review): this is a module-level function that takes ``self``; it reads
    ``self.predict`` and ``self.som_models``, so it presumably mirrors a
    ``ClusterSOM`` method - confirm whether it is still needed here.

    :param data: The input data (iterable of samples).
    :param midpoints: A dictionary with keys as the indices of the SOMs and
        values as lists of midpoints on the grid for the corresponding SOMs.
    :param threshold_radius: The threshold radius for score calculation.
    :return: A list with one entry per sample: the minimum grid distance minus
        ``threshold_radius``, or ``None`` when the sample is noise (index -1).
    """
    scores = []
    for sample in data:
        # Predict the cluster and BMU SOM coordinate for this sample.
        result = self.predict([sample])[0]
        if result[0] == -1:
            scores.append(None)  # Noise
            continue

        som_index, bmu = result[0], result[1]
        som = self.som_models[som_index]

        # Use the caller-provided midpoints for this SOM when available,
        # otherwise fall back to the centre of the SOM grid.
        if midpoints is not None and som_index in midpoints:
            candidates = midpoints[som_index]
        else:
            candidates = [tuple((dim - 1) / 2 for dim in som.get_weights().shape[:2])]

        # Euclidean grid distance from the BMU to the closest midpoint.
        # (The differences must be squared; multiplying by 2 could yield a
        # negative radicand and therefore NaN.)
        min_distance = min(
            np.sqrt((mid[0] - bmu[0]) ** 2 + (mid[1] - bmu[1]) ** 2)
            for mid in candidates
        )
        scores.append(min_distance - threshold_radius)
    return scores


def map_som2animation(som_value):
    """Map a SOM cluster index to an animation gait index, or None if unknown."""
    return _SOM_TO_ANIMATION.get(som_value, None)


def deviation_scores(tensor_data, scale=50):
    """
    Derive a timestamp value and four deviation scores from the tail of a tensor.

    The last element is the reference value and the four elements before it
    are the side values.

    :param tensor_data: Tensor with at least 5 elements (must support
        ``.numpy()`` on slices and ``.item()`` on elements).
    :param scale: Factor applied to the absolute differences before clipping.
    :return: ``(int(reference / 20 * 32768), scores)`` where ``scores`` is a
        list of four values clipped to the range [0, 1].
    :raises ValueError: If the tensor has fewer than 5 elements.
    """
    if len(tensor_data) < 5:
        raise ValueError("The input tensor must have at least 5 elements.")

    side_values = tensor_data[-5:-1].numpy()
    reference_value = tensor_data[-1].item()
    timestamp = int(reference_value / 20 * 32768)

    absolute_differences = np.abs(side_values - reference_value)

    # All side values equal the reference: every deviation score is 0.
    if np.sum(absolute_differences) == 0:
        return timestamp, [0, 0, 0, 0]

    clipped_scores = np.clip(absolute_differences * scale, 0, 1)
    return timestamp, clipped_scores.tolist()


def process_som_data(data, prediction):
    """
    Build the animation table rows for every sample in ``data``.

    Each row is ``[gait, TS, state, condition] + shape + color + danger`` where
    shape and color both hold the four deviation scores and danger flags the
    scores that are exactly 1.

    :param data: Indexable dataset; ``data[i][0]`` is passed to
        :func:`deviation_scores`.
    :param prediction: Per-sample SOM predictions; ``prediction[i][0]`` is the
        cluster index of sample ``i``.
    :return: List of rows (lists), one per sample.
    """
    processed_data = []
    for i in range(len(data)):
        # deviation_scores always returns an int timestamp, so no
        # interpolation of missing values is required here.
        ts, scores_list = deviation_scores(data[i][0])

        # Gait comes from this sample's own prediction (previously the first
        # prediction was reused for every row).
        # 0-walk 1-trot 2-gallop 3-idle
        gait = map_som2animation(prediction[i][0])
        state = 0
        condition = 0

        danger_values = [1 if value == 1 else 0 for value in scores_list]
        processed_data.append(
            [gait, ts, state, condition] + scores_list + scores_list + danger_values
        )
    return processed_data


def scores_to_dataframe(scores, start_time='2022-07-01 09:15:00+05:30', start_score=100, none_replacement=0):
    """
    Convert a list of scores into a candlestick-style (OHLC) dataframe.

    One row is produced per score, spaced one second apart starting at
    ``start_time``. The open value starts at ``start_score`` and accumulates
    every non-None score; the close value is the open value plus the score
    (or ``none_replacement`` when the score is None).

    :param scores: List of numeric scores or None entries.
    :param start_time: Timestamp string of the first row.
    :param start_score: Open value of the first row.
    :param none_replacement: Value added to the open value for None scores.
    :return: DataFrame with columns time, open, high, low, close and an index
        starting at 1. An empty ``scores`` list yields an empty dataframe.
    """
    start = pd.Timestamp(start_time)
    unix_timestamps = [
        int((start + pd.Timedelta(seconds=i)).value // 10**9)
        for i in range(len(scores))
    ]

    # Running open value: one entry per score, so all columns always have the
    # same length (also for an empty input).
    open_prices = []
    running = start_score
    for value in scores:
        open_prices.append(running)
        if value is not None:
            running = running + value

    close_prices = [
        open_price + (value if value is not None else none_replacement)
        for open_price, value in zip(open_prices, scores)
    ]
    high_prices = [max(o, c) for o, c in zip(open_prices, close_prices)]
    low_prices = [min(o, c) for o, c in zip(open_prices, close_prices)]

    df = pd.DataFrame({
        'time': unix_timestamps,
        'open': open_prices,
        'high': high_prices,
        'low': low_prices,
        'close': close_prices,
    })

    # Start index from 1
    df.index += 1
    return df


def _read_slices(json_file):
    """Read slices with read_json_files from a path or an object exposing ``.name``.

    Returns None when the object has a ``name`` attribute that is None.
    """
    try:
        return read_json_files(json_file)
    except Exception:
        # The direct read failed; retry with the wrapped file's path. If there
        # is no ``name`` attribute, surface the original error.
        if not hasattr(json_file, 'name'):
            raise
        if json_file.name is None:
            return None
        return read_json_files(json_file.name)


def _embed_slices(train_x, train_y, reducer):
    """Wrap the slices in a BaseDataset2 and return (dataset, reducer embedding)."""
    # Convert tensors to numpy arrays if necessary
    if isinstance(train_x, torch.Tensor):
        train_x = train_x.numpy()
    if isinstance(train_y, torch.Tensor):
        train_y = train_y.numpy()

    # load the time series slices of the data 4*3*2*64 (feeds+axis*sensor*samples) + 5 for time diff
    data = BaseDataset2(train_x.reshape(len(train_x), -1) / 32768, train_y)
    return data, reducer.transform(data)


def _request_animation():
    """POST the animation table to the rendering service; return the video path or None."""
    command = [
        'curl', '-X', 'POST',
        '-F', f'csv_file=@{_ANIMATION_TABLE_CSV}',
        _ANIMATION_ENDPOINT,
        '--output', _ANIMATION_VIDEO,
    ]
    try:
        completed = subprocess.run(command, check=False)
    except OSError:
        # curl is not available on this machine.
        return None
    return _ANIMATION_VIDEO if completed.returncode == 0 else None


def get_som_mp4_v2(csv_file_box, slice_size_slider, sample_rate, window_size_slider, reducer=reducer10d, cluster=cluster_som):
    """
    Process an uploaded CSV file and run the full SOM inference pipeline.

    Calls ``process_data`` and, when a JSON slice file was produced, embeds the
    slices, writes the animation table CSV, requests the animation video,
    renders the SOM activation video and builds the score candlestick chart.

    :return: A 12-tuple: the nine ``process_data`` outputs followed by the SOM
        video path, the animation video path and the chart. The last three are
        None when no JSON file is available, which keeps the number of values
        equal to the number of outputs bound to ``csv_file_box.change``.
    """
    (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
     plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
     slice_json_box) = process_data(csv_file_box, slice_size_slider, sample_rate, window_size_slider)

    base_outputs = (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
                    plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
                    slice_json_box)
    no_inference = (*base_outputs, None, None, None)

    if json_file_box is None:
        return no_inference
    slices = _read_slices(json_file_box)
    if slices is None:
        return no_inference
    train_x, train_y = slices

    # compute the 10 dimensional embedding vector
    data, embedding10d = _embed_slices(train_x, train_y, reducer)

    # retrieve the prediction and get the animation
    prediction = cluster.predict(embedding10d)
    processed_data = process_som_data(data, prediction)

    scores = cluster.score(embedding10d, threshold_radius=8.5)
    scores_df = scores_to_dataframe(scores)
    daily_chart = Chart(
        data=scores_df, width=1360, height=500,
        time_scale=TimeScaleOptions(seconds_visible=True, time_visible=True),
    ).mark_candlestick()

    # Write the processed data to a CSV file
    header = ['Gait', 'TS', 'State', 'Condition',
              'Shape1', 'Shape2', 'Shape3', 'Shape4',
              'Color1', 'Color2', 'Color3', 'Color4',
              'Danger1', 'Danger2', 'Danger3', 'Danger4']
    with open(_ANIMATION_TABLE_CSV, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(header)
        csv_writer.writerows(processed_data)

    animation_path = _request_animation()

    som_video = cluster.plot_activation(embedding10d)
    som_video.write_videofile(_SOM_VIDEO)

    return (*base_outputs, _SOM_VIDEO, animation_path, daily_chart)


# ml inference
def get_som_mp4(file, slice_select, reducer=reducer10d, cluster=cluster_som):
    """
    Plot the SOM activation for one slice of a JSON slice file.

    :param file: Path to the JSON file or an object exposing it as ``.name``.
    :param slice_select: Slice selector forwarded to ``plot_activation_v2``.
    :return: The figure returned by ``cluster.plot_activation_v2``, or None
        when no file is available.
    """
    if file is None:
        return None
    slices = _read_slices(file)
    if slices is None:
        return None
    train_x, train_y = slices

    # compute the 10 dimensional embedding vector
    _, embedding10d = _embed_slices(train_x, train_y, reducer)
    return cluster.plot_activation_v2(embedding10d, slice_select)


def attach_label_to_json(json_file, label_text):
    """
    Add a ``label`` entry to a JSON slice file and save it as a new file.

    :param json_file: Path to the JSON file or an object exposing it as ``.name``.
    :param label_text: Label stored under the ``'label'`` key.
    :return: Name of the written file, ``manual_labelled_<basename>``.
    """
    # Resolve the path once so that both plain paths and file wrappers work
    # for reading and for naming the output file.
    if isinstance(json_file, (str, os.PathLike)):
        json_path = json_file
    else:
        json_path = json_file.name

    with open(json_path, "r") as f:
        slices = json.load(f)

    slices['label'] = label_text

    labelled_path = f'manual_labelled_{os.path.basename(json_path)}'
    with open(labelled_path, "w") as f:
        json.dump(numpy_to_native(slices), f, indent=2)

    return labelled_path


# NOTE(review): the nesting of the layout containers below was reconstructed
# from a whitespace-collapsed source - confirm it against the intended layout.
with gr.Blocks(title='Cabasus') as cabasus_sensor:
    title = gr.Markdown("\n\nData gathering and processing\n\n")
    with gr.Tab("Convert"):
        with gr.Row():
            csv_file_box = gr.File(label='Upload CSV File')
            with gr.Column():
                processed_file_box = gr.File(label='Processed CSV File')
                json_file_box = gr.File(label='Generated Json file')

        with gr.Row():
            animation = gr.Video(label='animation')
            activation_video = gr.Video(label='activation channels')

        with gr.Row():
            real_video = gr.Video(label='real video')
            trend_graph = gr.Plot(label='trend graph')

        plot_box_leg = gr.Plot(label="Filtered Signal Plot")
        slice_slider = gr.Slider(minimum=1, maximum=300, label='Slice select', step=1)

        som_create = gr.Button('generate som')
        som_figures = gr.Plot(label="som activations")

        with gr.Row():
            slice_size_slider = gr.Slider(minimum=16, maximum=512, step=1, value=64, label="Slice Size", visible=False)
            sample_rate = gr.Slider(minimum=1, maximum=199, step=1, value=20, label="Sample rate", visible=False)

        with gr.Row():
            window_size_slider = gr.Slider(minimum=0, maximum=100, step=2, value=10, label="Window Size", visible=False)
            repeat_process = gr.Button('Restart process', visible=False)

        with gr.Row():
            leg_dropdown = gr.Dropdown(choices=['GZ1', 'GZ2', 'GZ3', 'GZ4'], label='select leg', value='GZ1')

        with gr.Row():
            get_all_slice = gr.Plot(label="Real Signal Plot")
            plot_box_overlay = gr.Plot(label="Overlay Signal Plot")

        with gr.Row():
            plot_slice_leg = gr.Plot(label="Sliced Signal Plot", visible=False)

        with gr.Row():
            slice_json_box = gr.File(label='Slice json file')
            with gr.Column():
                label_name = gr.Textbox(label="enter the label name")
                button_label_Add = gr.Button('attach label')
            slice_json_label_box = gr.File(label='Slice json labelled file')

        slices_per_leg = gr.Textbox(label="Debug information")

    # process_data is not bound to csv_file_box.change directly;
    # get_som_mp4_v2 (bound below) calls it and then runs the inference.
    leg_dropdown.change(
        plot_sensor_data_from_json,
        inputs=[json_file_box, leg_dropdown, slice_slider],
        outputs=[plot_box_leg, plot_slice_leg, get_all_slice, slice_json_box, plot_box_overlay],
    )
    repeat_process.click(
        process_data,
        inputs=[csv_file_box, slice_size_slider, sample_rate, window_size_slider],
        outputs=[processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay,
                 slice_slider, plot_slice_leg, get_all_slice, slice_json_box],
    )
    slice_slider.change(
        plot_sensor_data_from_json,
        inputs=[json_file_box, leg_dropdown, slice_slider],
        outputs=[plot_box_leg, plot_slice_leg, get_all_slice, slice_json_box, plot_box_overlay],
    )

    som_create.click(get_som_mp4, inputs=[json_file_box, slice_slider], outputs=[som_figures])

    # redoing the whole calculation with the file loading
    csv_file_box.change(
        get_som_mp4_v2,
        inputs=[csv_file_box, slice_size_slider, sample_rate, window_size_slider],
        outputs=[processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay,
                 slice_slider, plot_slice_leg, get_all_slice, slice_json_box,
                 activation_video, animation, trend_graph],
    )
    button_label_Add.click(
        attach_label_to_json,
        inputs=[slice_json_box, label_name],
        outputs=[slice_json_label_box],
    )

cabasus_sensor.queue(concurrency_count=2).launch(debug=True)