import json import matplotlib import numpy as np import pandas as pd import seaborn as sns import matplotlib.pyplot as plt from funcs.tools import numpy_to_native matplotlib.use('Agg') plt.style.use('ggplot') def plot_sensor_data_from_json(json_file, sensor, slice_select=1): # Read the JSON file try: with open(json_file, "r") as f: slices = json.load(f) except: with open(json_file.name, "r") as f: slices = json.load(f) # Concatenate the slices and create a new timestamp series with 20ms intervals timestamps = [] sensor_data = [] slice_item = [] temp_end = 0 for slice_count, slice_dict in enumerate(slices): start_timestamp = slice_dict["timestamp"] slice_length = len(slice_dict[sensor]) slice_timestamps = [start_timestamp + 20 * i for i in range(temp_end, slice_length + temp_end)] timestamps.extend(slice_timestamps) sensor_data.extend(slice_dict[sensor]) temp_end += slice_length slice_item.extend([slice_count+1]*len(slice_timestamps)) # Create a DataFrame with the sensor data data = pd.DataFrame({sensor: sensor_data, 'slice selection': slice_item, 'time': timestamps}) # Plot the sensor data fig, ax = plt.subplots(figsize=(12, 6)) ax = plt.plot(data['time'].to_list(), data[sensor].to_list(), '-b') df_temp = data[data['slice selection'] == int(slice_select)].reset_index() ax = plt.plot(df_temp['time'].to_list(), df_temp[sensor].to_list(), '-r') plt.xlabel("Timestamp") plt.ylabel(sensor) plt.legend() plt.tight_layout() fig1, ax1 = plt.subplots(figsize=(12, 6)) ax1 = plt.plot(df_temp['time'].to_list(), df_temp[sensor].to_list()) plt.xlabel("Timestamp") plt.ylabel(sensor) plt.legend() plt.tight_layout() fig2, file = plot_other_sensor_with_same_timestamp(json_file, sensor, slice_select) fig3 = plot_overlay_data_from_json(json_file, slice_select=slice_select) return fig, fig1, fig2, file, fig3 def plot_overlay_data_from_json(json_file, slice_select, sensors=['GZ1', 'GZ2', 'GZ3', 'GZ4']): try: with open(json_file, "r") as f: slices = json.load(f) except: with open(json_file.name, "r") as f: slices = json.load(f) # Create subplots for each sensor fig, axs = plt.subplots(len(sensors), 1, figsize=(12, 2 * len(sensors)), sharex=True) for idx, sensor in enumerate(sensors): # Plot the overlay of the slices for slice_idx, slice_dict in enumerate(slices): slice_length = len(slice_dict[sensor]) # Create timestamp array starting from 0 for each slice slice_timestamps = [20 * i for i in range(slice_length)] sensor_data = slice_dict[sensor] data = pd.DataFrame({sensor: sensor_data}, index=slice_timestamps) if slice_idx+1 == slice_select: axs[idx].plot(data[sensor], '-r', label=f'Slice {slice_idx + 1}') else: axs[idx].plot(data[sensor], '-c') axs[idx].set_ylabel(sensor) axs[-1].set_xlabel("Timestamp") axs[0].legend() return fig def plot_slices(original_signal, imputed_signal, precise_slice_points, normal_slice_points, sample_rate, first_timestamp): plt.figure(figsize=(12, 6)) plt.plot(imputed_signal.index, imputed_signal, label="Imputed Signal") # Find the missing values and the predicted values missing_value_indices = original_signal.isna() missing_values = original_signal.loc[missing_value_indices] predicted_values = imputed_signal.loc[missing_value_indices] # Plot the original missing values and the predicted values as separate scatter plots plt.scatter(missing_values.index, missing_values, color='r', marker='x', label='Original Missing Values') plt.scatter(predicted_values.index, predicted_values, color='r', marker='o', label='Predicted Values') for index in precise_slice_points: plt.axvline(x=first_timestamp + (index), color='r', linestyle='--', label='Precise Slice Points' if index == precise_slice_points[0] else "") for index in normal_slice_points: plt.axvline(x=first_timestamp + (index), color='g', linestyle='-', label='Normal Slice Points' if index == normal_slice_points[0] else "") plt.legend() plt.xlabel("Time (s)") plt.ylabel("Signal Amplitude") plt.title("Imputed Signal and Slice Points") return True def plot_other_sensor_with_same_timestamp(json_file, sensor, slice_select): constant_keys = [f"{sensor}_precise_time_diff", "precise_timestamp", 'timestamp', "time_diff", "precise_time_diff"] # Read the JSON file try: with open(json_file, "r") as f: slices = json.load(f) except: with open(json_file.name, "r") as f: slices = json.load(f) # Concatenate the slices and create a new timestamp series with 20ms intervals timestamps = [] sensor_data = [] slice_item = [] slice_recorded = [] for slice_count, slice_dict in enumerate(slices): if slice_count+1 == slice_select: start_timestamp = slice_dict["timestamp"] for slist in slice_dict.keys(): if slist[-1] != sensor[-1]: continue slice_recorded.append(slist) slice_length = len(slice_dict[slist]) slice_timestamps = [start_timestamp + 20 * i for i in range(slice_length)] timestamps.extend(slice_timestamps) sensor_data.extend(slice_dict[slist]) slice_item.extend([slist]*len(slice_timestamps)) # Create a DataFrame with the sensor data data = pd.DataFrame({'data': sensor_data, 'sensor': slice_item, 'time': timestamps}) sensor_unique = sorted(list(data.sensor.unique()), reverse=True) fig, ax = plt.subplots(figsize=(12, 6)) ax = sns.lineplot(data, x='time', y='data', hue='sensor', hue_order=sensor_unique) plt.xlabel("Timestamp") plt.ylabel(sensor) plt.legend() plt.tight_layout() #create a new dictionary total_keys = slice_recorded + constant_keys json_dict = {} for tkeys in total_keys: json_dict[tkeys] = slice_dict[tkeys] with open(f'slice_{slice_select}.json', "w") as f: json.dump(numpy_to_native(json_dict), f, indent=2) return fig, f'slice_{slice_select}.json'