Spaces:
Sleeping
Sleeping
import numpy as np | |
from scipy.signal import firwin | |
def filter_24(value):
    """
    Scale a signed 24-bit ADC count into microvolts.

    The count is multiplied by 4.5, divided by the largest positive
    signed 24-bit value and by 24.0, then multiplied by 1e6.
    NOTE(review): 4.5 and 24.0 are presumably the ADC reference voltage
    and the amplifier gain -- confirm against the hardware configuration.

    value: a number or numpy array of already sign-extended counts.
    Returns the scaled value(s) as float.
    """
    full_scale = 2**23 - 1  # 23 because 1 bit is lost for sign
    volts_times_gain = (value * 4.5) / full_scale
    return volts_times_gain / 24.0 * 1e6
def filter_2scomplement_np(value):
    """
    Interpret raw 24-bit words as two's-complement signed integers.

    Any element whose bit 23 (the 24-bit sign bit) is set has 2**24
    subtracted from it; all other elements are returned unchanged.

    value: an integer, a sequence of integers or an integer numpy array.
    Returns a numpy array (0-d for scalar input) of signed values.
    """
    raw = np.asarray(value)
    if raw.dtype.kind == "u":
        # Unsigned dtypes cannot hold the negative result: the subtraction
        # below would wrap around to a huge positive number instead.
        raw = raw.astype(np.int64)
    sign_bit = 1 << 23
    return np.where((raw & sign_bit) != 0, raw - (1 << 24), raw)
def int_to_float(value):
    """
    Turn the raw integer value(s) read from the ADS into microvolts.

    The input is first sign-extended with filter_2scomplement_np and the
    result is then scaled with filter_24.
    """
    signed_counts = filter_2scomplement_np(value)
    return filter_24(signed_counts)
def shift_numpy(arr, num, fill_value=np.nan):
    """
    Return a shifted copy of ``arr`` along its first axis.

    arr: numpy array to shift; it is not modified.
    num: number of positions. Positive values move the content towards
        higher indices, negative values towards lower indices, and 0
        yields a plain copy.
    fill_value: value written into the positions vacated by the shift.

    The result has the same shape and dtype as ``arr``.
    """
    shifted = np.empty_like(arr)
    if num == 0:
        shifted[:] = arr
        return shifted
    if num > 0:
        # Leading slots are vacated; the tail of arr falls off the end.
        shifted[:num] = fill_value
        shifted[num:] = arr[:-num]
    else:
        # Trailing slots are vacated; the head of arr falls off the front.
        shifted[num:] = fill_value
        shifted[:num] = arr[-num:]
    return shifted
class FIR:
    """
    Multi-channel FIR filter that is fed one sample (one value per
    channel) at a time and keeps the most recent samples in a buffer.
    """

    def __init__(self, nb_channels, coefficients, buffer=None):
        """
        nb_channels: number of channels filtered in parallel.
        coefficients: sequence of filter taps.
        buffer: optional initial sample history; when omitted a zero
            array of shape (taps, nb_channels) is used.
        """
        # Stored as a (taps, 1) column so it broadcasts over the channels.
        self.coefficients = np.expand_dims(np.array(coefficients), axis=1)
        self.taps = len(self.coefficients)
        self.nb_channels = nb_channels
        if buffer is None:
            self.buffer = np.zeros((self.taps, self.nb_channels))
        else:
            self.buffer = np.array(buffer)

    def filter(self, x):
        """
        Push the new sample ``x`` into the history and return the filter
        output, one value per channel.
        """
        # Shift the history down by one row: the newest sample goes into
        # row 0 and the oldest row is dropped.
        history = np.empty_like(self.buffer)
        history[:1] = x
        history[1:] = self.buffer[:-1]
        self.buffer = history
        weighted = self.buffer * self.coefficients
        return np.sum(weighted, axis=0)
class FilterPipeline:
    """
    Sample-by-sample processing chain made of three optional stages:
    a FIR filter, a second-order power-line notch filter and a running
    standardization (exponential moving average / variance).

    The pipeline is stateful: the FIR buffer, the notch delay line and
    the moving statistics are kept between calls to ``filter``.
    """

    def __init__(self,
                 nb_channels,
                 sampling_rate,
                 power_line_fq=60,
                 use_custom_fir=False,
                 custom_fir_order=20,
                 custom_fir_cutoff=30,
                 alpha_avg=0.1,
                 alpha_std=0.001,
                 epsilon=0.000001,
                 filter_args=None):
        """
        nb_channels: number of channels processed in parallel.
        sampling_rate: sampling frequency; only used to design the custom
            FIR filter (passed to ``firwin`` as ``fs``).
        power_line_fq: 50 or 60, selects the notch coefficients.
        use_custom_fir: design the FIR with ``firwin`` instead of using
            the built-in coefficient table.
        custom_fir_order: order of the custom FIR (numtaps = order + 1).
        custom_fir_cutoff: cutoff frequency of the custom FIR.
        alpha_avg: update rate of the moving average.
        alpha_std: update rate of the moving variance.
        epsilon: added to the moving standard deviation before dividing.
        filter_args: optional sequence ``(use_fir, use_notch, use_std)``;
            when omitted or empty all three stages are enabled.
        """
        # None replaces the former mutable [] default; an empty sequence
        # still means "enable everything".
        if filter_args is not None and len(filter_args) > 0:
            use_fir, use_notch, use_std = filter_args
        else:
            # Plain booleans: a trailing comma here would create the
            # tuple (True,) instead of True.
            use_fir = True
            use_notch = True
            use_std = True
        self.use_fir = use_fir
        self.use_notch = use_notch
        self.use_std = use_std
        self.nb_channels = nb_channels

        assert power_line_fq in [50, 60], "The only supported power line frequencies are 50 Hz and 60 Hz"
        # Notch coefficients are fixed constants and do not depend on
        # sampling_rate.
        # NOTE(review): presumably designed for one specific sampling
        # rate -- confirm before using another rate.
        if power_line_fq == 60:
            self.notch_coeff1 = -0.12478308884588535
            self.notch_coeff2 = 0.98729186796473023
            self.notch_coeff3 = 0.99364593398236511
            self.notch_coeff4 = -0.12478308884588535
            self.notch_coeff5 = 0.99364593398236511
        else:
            self.notch_coeff1 = -0.61410695998423581
            self.notch_coeff2 = 0.98729186796473023
            self.notch_coeff3 = 0.99364593398236511
            self.notch_coeff4 = -0.61410695998423581
            self.notch_coeff5 = 0.99364593398236511
        # Two-element delay line of the notch filter, one array per delay.
        self.dfs = [np.zeros(self.nb_channels), np.zeros(self.nb_channels)]

        # Running statistics; the average is seeded by the first sample.
        self.moving_average = None
        self.moving_variance = np.zeros(self.nb_channels)
        self.ALPHA_AVG = alpha_avg
        self.ALPHA_STD = alpha_std
        self.EPSILON = epsilon

        if use_custom_fir:
            self.fir_coef = firwin(numtaps=custom_fir_order+1, cutoff=custom_fir_cutoff, fs=sampling_rate)
        else:
            # Built-in 21-tap symmetric coefficient table; like the notch
            # coefficients it is independent of sampling_rate.
            self.fir_coef = [
                0.001623780150148094927192721215192250384,
                0.014988684599373741992978104065059596905,
                0.021287595318265635502275046064823982306,
                0.007349500393709578957568417933998716762,
                -0.025127515717112181709014251396183681209,
                -0.052210507359822452833064687638398027048,
                -0.039273839505489904766477593511808663607,
                0.033021568427940004020193498490698402748,
                0.147606943281569008563636202779889572412,
                0.254000252034505602516389899392379447818,
                0.297330876398883392486283128164359368384,
                0.254000252034505602516389899392379447818,
                0.147606943281569008563636202779889572412,
                0.033021568427940004020193498490698402748,
                -0.039273839505489904766477593511808663607,
                -0.052210507359822452833064687638398027048,
                -0.025127515717112181709014251396183681209,
                0.007349500393709578957568417933998716762,
                0.021287595318265635502275046064823982306,
                0.014988684599373741992978104065059596905,
                0.001623780150148094927192721215192250384]
        self.fir = FIR(self.nb_channels, self.fir_coef)

    def filter(self, value):
        """
        Run every sample of ``value`` through the enabled stages.

        value: a numpy array of shape (data series, channels)

        The results are written back into ``value`` row by row and the
        same object is returned, so the input is modified in place and
        the results are stored with ``value``'s own dtype.
        """
        for i, x in enumerate(value):  # loop over the data series
            # FIR:
            if self.use_fir:
                x = self.fir.filter(x)
            # notch:
            if self.use_notch:
                # Recursive part first (coeff1/coeff2 on the delayed
                # states), then the output from coeff3..coeff5.
                denAccum = (x - self.notch_coeff1 * self.dfs[0]) - self.notch_coeff2 * self.dfs[1]
                x = (self.notch_coeff3 * denAccum + self.notch_coeff4 * self.dfs[0]) + self.notch_coeff5 * self.dfs[1]
                self.dfs[1] = self.dfs[0]
                self.dfs[0] = denAccum
            # standardization:
            if self.use_std:
                if self.moving_average is not None:
                    delta = x - self.moving_average
                    self.moving_average = self.moving_average + self.ALPHA_AVG * delta
                    self.moving_variance = (1 - self.ALPHA_STD) * (self.moving_variance + self.ALPHA_STD * delta**2)
                    moving_std = np.sqrt(self.moving_variance)
                    x = (x - self.moving_average) / (moving_std + self.EPSILON)
                else:
                    # First sample only seeds the average and is passed
                    # through without being standardized.
                    self.moving_average = x
            value[i] = x
        return value