Spaces:
Sleeping
Sleeping
File size: 5,672 Bytes
a64c5cc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
import numpy as np
from scipy.signal import firwin
def filter_24(value):
return (value * 4.5) / (2**23 - 1) / 24.0 * 1e6 # 23 because 1 bit is lost for sign
def filter_2scomplement_np(value):
return np.where((value & (1 << 23)) != 0, value - (1 << 24), value)
def int_to_float(value):
"""
Convert the int value out of the ADS into a value in microvolts
"""
return filter_24(filter_2scomplement_np(value))
def shift_numpy(arr, num, fill_value=np.nan):
result = np.empty_like(arr)
if num > 0:
result[:num] = fill_value
result[num:] = arr[:-num]
elif num < 0:
result[num:] = fill_value
result[:num] = arr[-num:]
else:
result[:] = arr
return result
class FIR:
def __init__(self, nb_channels, coefficients, buffer=None):
self.coefficients = np.expand_dims(np.array(coefficients), axis=1)
self.taps = len(self.coefficients)
self.nb_channels = nb_channels
self.buffer = np.array(buffer) if buffer is not None else np.zeros((self.taps, self.nb_channels))
def filter(self, x):
self.buffer = shift_numpy(self.buffer, 1, x)
filtered = np.sum(self.buffer * self.coefficients, axis=0)
return filtered
class FilterPipeline:
def __init__(self,
nb_channels,
sampling_rate,
power_line_fq=60,
use_custom_fir=False,
custom_fir_order=20,
custom_fir_cutoff=30,
alpha_avg=0.1,
alpha_std=0.001,
epsilon=0.000001,
filter_args=[]):
if len(filter_args) > 0:
use_fir, use_notch, use_std = filter_args
else:
use_fir=True,
use_notch=True,
use_std=True
self.use_fir = use_fir
self.use_notch = use_notch
self.use_std = use_std
self.nb_channels = nb_channels
assert power_line_fq in [50, 60], f"The only supported power line frequencies are 50 Hz and 60 Hz"
if power_line_fq == 60:
self.notch_coeff1 = -0.12478308884588535
self.notch_coeff2 = 0.98729186796473023
self.notch_coeff3 = 0.99364593398236511
self.notch_coeff4 = -0.12478308884588535
self.notch_coeff5 = 0.99364593398236511
else:
self.notch_coeff1 = -0.61410695998423581
self.notch_coeff2 = 0.98729186796473023
self.notch_coeff3 = 0.99364593398236511
self.notch_coeff4 = -0.61410695998423581
self.notch_coeff5 = 0.99364593398236511
self.dfs = [np.zeros(self.nb_channels), np.zeros(self.nb_channels)]
self.moving_average = None
self.moving_variance = np.zeros(self.nb_channels)
self.ALPHA_AVG = alpha_avg
self.ALPHA_STD = alpha_std
self.EPSILON = epsilon
if use_custom_fir:
self.fir_coef = firwin(numtaps=custom_fir_order+1, cutoff=custom_fir_cutoff, fs=sampling_rate)
else:
self.fir_coef = [
0.001623780150148094927192721215192250384,
0.014988684599373741992978104065059596905,
0.021287595318265635502275046064823982306,
0.007349500393709578957568417933998716762,
-0.025127515717112181709014251396183681209,
-0.052210507359822452833064687638398027048,
-0.039273839505489904766477593511808663607,
0.033021568427940004020193498490698402748,
0.147606943281569008563636202779889572412,
0.254000252034505602516389899392379447818,
0.297330876398883392486283128164359368384,
0.254000252034505602516389899392379447818,
0.147606943281569008563636202779889572412,
0.033021568427940004020193498490698402748,
-0.039273839505489904766477593511808663607,
-0.052210507359822452833064687638398027048,
-0.025127515717112181709014251396183681209,
0.007349500393709578957568417933998716762,
0.021287595318265635502275046064823982306,
0.014988684599373741992978104065059596905,
0.001623780150148094927192721215192250384]
self.fir = FIR(self.nb_channels, self.fir_coef)
def filter(self, value):
"""
value: a numpy array of shape (data series, channels)
"""
for i, x in enumerate(value): # loop over the data series
# FIR:
if self.use_fir:
x = self.fir.filter(x)
# notch:
if self.use_notch:
denAccum = (x - self.notch_coeff1 * self.dfs[0]) - self.notch_coeff2 * self.dfs[1]
x = (self.notch_coeff3 * denAccum + self.notch_coeff4 * self.dfs[0]) + self.notch_coeff5 * self.dfs[1]
self.dfs[1] = self.dfs[0]
self.dfs[0] = denAccum
# standardization:
if self.use_std:
if self.moving_average is not None:
delta = x - self.moving_average
self.moving_average = self.moving_average + self.ALPHA_AVG * delta
self.moving_variance = (1 - self.ALPHA_STD) * (self.moving_variance + self.ALPHA_STD * delta**2)
moving_std = np.sqrt(self.moving_variance)
x = (x - self.moving_average) / (moving_std + self.EPSILON)
else:
self.moving_average = x
value[i] = x
return value |