import numpy as np from scipy.signal import firwin def filter_24(value): return (value * 4.5) / (2**23 - 1) / 24.0 * 1e6 # 23 because 1 bit is lost for sign def filter_2scomplement_np(value): return np.where((value & (1 << 23)) != 0, value - (1 << 24), value) def int_to_float(value): """ Convert the int value out of the ADS into a value in microvolts """ return filter_24(filter_2scomplement_np(value)) def shift_numpy(arr, num, fill_value=np.nan): result = np.empty_like(arr) if num > 0: result[:num] = fill_value result[num:] = arr[:-num] elif num < 0: result[num:] = fill_value result[:num] = arr[-num:] else: result[:] = arr return result class FIR: def __init__(self, nb_channels, coefficients, buffer=None): self.coefficients = np.expand_dims(np.array(coefficients), axis=1) self.taps = len(self.coefficients) self.nb_channels = nb_channels self.buffer = np.array(buffer) if buffer is not None else np.zeros((self.taps, self.nb_channels)) def filter(self, x): self.buffer = shift_numpy(self.buffer, 1, x) filtered = np.sum(self.buffer * self.coefficients, axis=0) return filtered class FilterPipeline: def __init__(self, nb_channels, sampling_rate, power_line_fq=60, use_custom_fir=False, custom_fir_order=20, custom_fir_cutoff=30, alpha_avg=0.1, alpha_std=0.001, epsilon=0.000001, filter_args=[]): if len(filter_args) > 0: use_fir, use_notch, use_std = filter_args else: use_fir=True, use_notch=True, use_std=True self.use_fir = use_fir self.use_notch = use_notch self.use_std = use_std self.nb_channels = nb_channels assert power_line_fq in [50, 60], f"The only supported power line frequencies are 50 Hz and 60 Hz" if power_line_fq == 60: self.notch_coeff1 = -0.12478308884588535 self.notch_coeff2 = 0.98729186796473023 self.notch_coeff3 = 0.99364593398236511 self.notch_coeff4 = -0.12478308884588535 self.notch_coeff5 = 0.99364593398236511 else: self.notch_coeff1 = -0.61410695998423581 self.notch_coeff2 = 0.98729186796473023 self.notch_coeff3 = 0.99364593398236511 self.notch_coeff4 = -0.61410695998423581 self.notch_coeff5 = 0.99364593398236511 self.dfs = [np.zeros(self.nb_channels), np.zeros(self.nb_channels)] self.moving_average = None self.moving_variance = np.zeros(self.nb_channels) self.ALPHA_AVG = alpha_avg self.ALPHA_STD = alpha_std self.EPSILON = epsilon if use_custom_fir: self.fir_coef = firwin(numtaps=custom_fir_order+1, cutoff=custom_fir_cutoff, fs=sampling_rate) else: self.fir_coef = [ 0.001623780150148094927192721215192250384, 0.014988684599373741992978104065059596905, 0.021287595318265635502275046064823982306, 0.007349500393709578957568417933998716762, -0.025127515717112181709014251396183681209, -0.052210507359822452833064687638398027048, -0.039273839505489904766477593511808663607, 0.033021568427940004020193498490698402748, 0.147606943281569008563636202779889572412, 0.254000252034505602516389899392379447818, 0.297330876398883392486283128164359368384, 0.254000252034505602516389899392379447818, 0.147606943281569008563636202779889572412, 0.033021568427940004020193498490698402748, -0.039273839505489904766477593511808663607, -0.052210507359822452833064687638398027048, -0.025127515717112181709014251396183681209, 0.007349500393709578957568417933998716762, 0.021287595318265635502275046064823982306, 0.014988684599373741992978104065059596905, 0.001623780150148094927192721215192250384] self.fir = FIR(self.nb_channels, self.fir_coef) def filter(self, value): """ value: a numpy array of shape (data series, channels) """ for i, x in enumerate(value): # loop over the data series # FIR: if self.use_fir: x = self.fir.filter(x) # notch: if self.use_notch: denAccum = (x - self.notch_coeff1 * self.dfs[0]) - self.notch_coeff2 * self.dfs[1] x = (self.notch_coeff3 * denAccum + self.notch_coeff4 * self.dfs[0]) + self.notch_coeff5 * self.dfs[1] self.dfs[1] = self.dfs[0] self.dfs[0] = denAccum # standardization: if self.use_std: if self.moving_average is not None: delta = x - self.moving_average self.moving_average = self.moving_average + self.ALPHA_AVG * delta self.moving_variance = (1 - self.ALPHA_STD) * (self.moving_variance + self.ALPHA_STD * delta**2) moving_std = np.sqrt(self.moving_variance) x = (x - self.moving_average) / (moving_std + self.EPSILON) else: self.moving_average = x value[i] = x return value