import numpy as np import matplotlib.pyplot as plt from scipy.interpolate import interp1d from scipy.signal import savgol_filter, correlate, find_peaks def numpy_to_native(data): if isinstance(data, (np.int64, np.int32)): return int(data) elif isinstance(data, (np.float64, np.float32)): return float(data) elif isinstance(data, np.ndarray): return data.tolist() elif isinstance(data, dict): return {k: numpy_to_native(v) for k, v in data.items()} elif isinstance(data, list): return [numpy_to_native(v) for v in data] else: return data def process_signals(gz_signal, upsample_factor, window_size=40, poly_order=2, peak_distance=2, peak_prominence=1): smoothed_signal = savgol_filter(gz_signal, window_size, poly_order) upsampled_smoothed_signal = upsample_signal(smoothed_signal, upsample_factor) autocorr = correlate(upsampled_smoothed_signal, upsampled_smoothed_signal, mode='full') autocorr = autocorr[autocorr.size // 2:] peaks, _ = find_peaks(autocorr, distance=peak_distance, prominence=peak_prominence) return upsampled_smoothed_signal, peaks def fill_missing_values(data, window_size=10): for col in data.columns: if data[col].isna().any(): data[col] = data[col].rolling(window=window_size, min_periods=1, center=True).median() return data def upsample_signal(signal, upsample_factor): x = np.arange(signal.size) interpolator = interp1d(x, signal, kind='quadratic') x_upsampled = np.linspace(0, signal.size - 1, signal.size * upsample_factor) return interpolator(x_upsampled)