Spaces:
Sleeping
Sleeping
import os | |
import sys | |
from time import sleep | |
import time | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import os | |
from pathlib import Path | |
from datetime import datetime, timedelta | |
import multiprocessing as mp | |
import warnings | |
import shutil | |
from threading import Thread, Lock | |
import matplotlib.pyplot as plt | |
from EDFlib.edfwriter import EDFwriter | |
from scipy.signal import firwin | |
from portilooplot.jupyter_plot import ProgressPlot | |
from portiloop.hardware.frontend import Frontend | |
from portiloop.hardware.leds import LEDs, Color | |
from IPython.display import clear_output, display | |
import ipywidgets as widgets | |
DEFAULT_FRONTEND_CONFIG = [ | |
# nomenclature: name [default setting] [bits 7-0] : description | |
# Read only ID: | |
0x3E, # ID [xx] [REV_ID[2:0], 1, DEV_ID[1:0], NU_CH[1:0]] : (RO) | |
# Global Settings Across Channels: | |
0x96, # CONFIG1 [96] [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 250 SPS | |
0xC0, # CONFIG2 [C0] [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]] : No tests | |
0x60, # CONFIG3 [60] [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : Power-down reference buffer, no bias | |
0x00, # LOFF [00] [COMP_TH[2:0], 0, ILEAD_OFF[1:0], FLEAD_OFF[1:0]] : No lead-off | |
# Channel-Specific Settings: | |
0x61, # CH1SET [61] [PD1, GAIN1[2:0], SRB2, MUX1[2:0]] : Channel 1 active, 24 gain, no SRB2 & input shorted | |
0x61, # CH2SET [61] [PD2, GAIN2[2:0], SRB2, MUX2[2:0]] : Channel 2 active, 24 gain, no SRB2 & input shorted | |
0x61, # CH3SET [61] [PD3, GAIN3[2:0], SRB2, MUX3[2:0]] : Channel 3 active, 24 gain, no SRB2 & input shorted | |
0x61, # CH4SET [61] [PD4, GAIN4[2:0], SRB2, MUX4[2:0]] : Channel 4 active, 24 gain, no SRB2 & input shorted | |
0x61, # CH5SET [61] [PD5, GAIN5[2:0], SRB2, MUX5[2:0]] : Channel 5 active, 24 gain, no SRB2 & input shorted | |
0x61, # CH6SET [61] [PD6, GAIN6[2:0], SRB2, MUX6[2:0]] : Channel 6 active, 24 gain, no SRB2 & input shorted | |
0x61, # CH7SET [61] [PD7, GAIN7[2:0], SRB2, MUX7[2:0]] : Channel 7 active, 24 gain, no SRB2 & input shorted | |
0x61, # CH8SET [61] [PD8, GAIN8[2:0], SRB2, MUX8[2:0]] : Channel 8 active, 24 gain, no SRB2 & input shorted | |
0x00, # BIAS_SENSP [00] [BIASP8, BIASP7, BIASP6, BIASP5, BIASP4, BIASP3, BIASP2, BIASP1] : No bias | |
0x00, # BIAS_SENSN [00] [BIASN8, BIASN7, BIASN6, BIASN5, BIASN4, BIASN3, BIASN2, BIASN1] No bias | |
0x00, # LOFF_SENSP [00] [LOFFP8, LOFFP7, LOFFP6, LOFFP5, LOFFP4, LOFFP3, LOFFP2, LOFFP1] : No lead-off | |
0x00, # LOFF_SENSN [00] [LOFFM8, LOFFM7, LOFFM6, LOFFM5, LOFFM4, LOFFM3, LOFFM2, LOFFM1] : No lead-off | |
0x00, # LOFF_FLIP [00] [LOFF_FLIP8, LOFF_FLIP7, LOFF_FLIP6, LOFF_FLIP5, LOFF_FLIP4, LOFF_FLIP3, LOFF_FLIP2, LOFF_FLIP1] : No lead-off flip | |
# Lead-Off Status Registers (Read-Only Registers): | |
0x00, # LOFF_STATP [00] [IN8P_OFF, IN7P_OFF, IN6P_OFF, IN5P_OFF, IN4P_OFF, IN3P_OFF, IN2P_OFF, IN1P_OFF] : Lead-off positive status (RO) | |
0x00, # LOFF_STATN [00] [IN8M_OFF, IN7M_OFF, IN6M_OFF, IN5M_OFF, IN4M_OFF, IN3M_OFF, IN2M_OFF, IN1M_OFF] : Laed-off negative status (RO) | |
# GPIO and OTHER Registers: | |
0x0F, # GPIO [0F] [GPIOD[4:1], GPIOC[4:1]] : All GPIOs as inputs | |
0x00, # MISC1 [00] [0, 0, SRB1, 0, 0, 0, 0, 0] : Disable SRBM | |
0x00, # MISC2 [00] [00] : Unused | |
0x00, # CONFIG4 [00] [0, 0, 0, 0, SINGLE_SHOT, 0, PD_LOFF_COMP(bar), 0] : Single-shot, lead-off comparator disabled | |
] | |
FRONTEND_CONFIG = [ | |
0x3E, # ID (RO) | |
0x95, # CONFIG1 [95] [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 500 SPS | |
0xD0, # CONFIG2 [C0] [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]] | |
0xE8, # CONFIG3 [E0] [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : Power-down reference buffer, no bias | |
0x00, # No lead-off | |
0x60, # CH1SET [60] [PD1, GAIN1[2:0], SRB2, MUX1[2:0]] | |
0x60, # CH2SET 66 | |
0x60, # CH3SET | |
0x60, # CH4SET | |
0x60, # CH5SET | |
0x60, # CH6SET | |
0x60, # CH7SET | |
0x60, # CH8SET | |
0x04, # BIAS_SENSP 04 | |
0x04, # BIAS_SENSN 04 | |
0xFF, # LOFF_SENSP Lead-off on all positive pins? | |
0xFF, # LOFF_SENSN Lead-off on all negative pins? | |
0x00, # Normal lead-off | |
0x00, # Lead-off positive status (RO) | |
0x00, # Lead-off negative status (RO) | |
0x00, # All GPIOs as output ? | |
0x20, # Enable SRB1 | |
] | |
EDF_PATH = Path.home() / 'workspace' / 'edf_recording' | |
def to_ads_frequency(frequency): | |
possible_datarates = [250, 500, 1000, 2000, 4000, 8000, 16000] | |
dr = 16000 | |
for i in possible_datarates: | |
if i >= frequency: | |
dr = i | |
break | |
return dr | |
def mod_config(config, datarate, channel_modes): | |
# datarate: | |
possible_datarates = [(250, 0x06), | |
(500, 0x05), | |
(1000, 0x04), | |
(2000, 0x03), | |
(4000, 0x02), | |
(8000, 0x01), | |
(16000, 0x00)] | |
mod_dr = 0x00 | |
for i, j in possible_datarates: | |
if i >= datarate: | |
mod_dr = j | |
break | |
new_cf1 = config[1] & 0xF8 | |
new_cf1 = new_cf1 | mod_dr | |
config[1] = new_cf1 | |
# bias: | |
assert len(channel_modes) == 8 | |
config[13] = 0x00 # clear BIAS_SENSP | |
config[14] = 0x00 # clear BIAS_SENSN | |
bias_active = False | |
for chan_i, chan_mode in enumerate(channel_modes): | |
n = 5 + chan_i | |
mod = config[n] & 0x78 # clear PDn and MUX[2:0] | |
if chan_mode == 'simple': | |
pass # PDn = 0 and normal electrode (000) | |
elif chan_mode == 'disabled': | |
mod = mod | 0x81 # PDn = 1 and input shorted (001) | |
elif chan_mode == 'with bias': | |
bit_i = 1 << chan_i | |
config[13] = config[13] | bit_i | |
config[14] = config[14] | bit_i | |
bias_active = True | |
elif chan_mode == 'bias out': | |
mod = mod | 0x06 # MUX[2:0] = BIAS_DRP (110) | |
bias_active = True | |
else: | |
assert False, f"Wrong key: {chan_mode}." | |
config[n] = mod | |
print(f"DEBUG: new config[{n}]:{hex(config[n])}") | |
print(f"DEBUG: new config[13]:{hex(config[13])}") | |
print(f"DEBUG: new config[14]:{hex(config[14])}") | |
if bias_active: | |
config[3] = config[3] | 0x04 # PD_BIAS bar = 1 | |
else: | |
config[3] = config[3] & 0xFB # PD_BIAS bar = 0 | |
print(f"DEBUG: new config[3]:{hex(config[3])}") | |
return config | |
def filter_24(value): | |
return (value * 4.5) / (2**23 - 1) # 23 because 1 bit is lost for sign | |
def filter_2scomplement_np(value): | |
return np.where((value & (1 << 23)) != 0, value - (1 << 24), value) | |
def filter_np(value): | |
return filter_24(filter_2scomplement_np(value)) | |
def shift_numpy(arr, num, fill_value=np.nan): | |
result = np.empty_like(arr) | |
if num > 0: | |
result[:num] = fill_value | |
result[num:] = arr[:-num] | |
elif num < 0: | |
result[num:] = fill_value | |
result[:num] = arr[-num:] | |
else: | |
result[:] = arr | |
return result | |
class FIR: | |
def __init__(self, nb_channels, coefficients, buffer=None): | |
self.coefficients = np.expand_dims(np.array(coefficients), axis=1) | |
self.taps = len(self.coefficients) | |
self.nb_channels = nb_channels | |
self.buffer = np.array(z) if buffer is not None else np.zeros((self.taps, self.nb_channels)) | |
def filter(self, x): | |
self.buffer = shift_numpy(self.buffer, 1, x) | |
filtered = np.sum(self.buffer * self.coefficients, axis=0) | |
return filtered | |
class FilterPipeline: | |
def __init__(self, | |
nb_channels, | |
sampling_rate, | |
power_line_fq=60, | |
use_custom_fir=False, | |
custom_fir_order=10, | |
custom_fir_cutoff=30, | |
alpha_avg=0.1, | |
alpha_std=0.001, | |
epsilon=0.000001): | |
self.nb_channels = nb_channels | |
assert power_line_fq in [50, 60], f"The only supported power line frequencies are 50 Hz and 60 Hz" | |
if power_line_fq == 60: | |
self.notch_coeff1 = -0.12478308884588535 | |
self.notch_coeff2 = 0.98729186796473023 | |
self.notch_coeff3 = 0.99364593398236511 | |
self.notch_coeff4 = -0.12478308884588535 | |
self.notch_coeff5 = 0.99364593398236511 | |
else: | |
self.notch_coeff1 = -0.61410695998423581 | |
self.notch_coeff2 = 0.98729186796473023 | |
self.notch_coeff3 = 0.99364593398236511 | |
self.notch_coeff4 = -0.61410695998423581 | |
self.notch_coeff5 = 0.99364593398236511 | |
self.dfs = [np.zeros(self.nb_channels), np.zeros(self.nb_channels)] | |
self.moving_average = None | |
self.moving_variance = np.zeros(self.nb_channels) | |
self.ALPHA_AVG = alpha_avg | |
self.ALPHA_STD = alpha_std | |
self.EPSILON = epsilon | |
if use_custom_fir: | |
self.fir_coef = firwin(numtaps=custom_fir_order+1, cutoff=custom_fir_cutoff, fs=sampling_rate) | |
else: | |
self.fir_coef = [ | |
0.001623780150148094927192721215192250384, | |
0.014988684599373741992978104065059596905, | |
0.021287595318265635502275046064823982306, | |
0.007349500393709578957568417933998716762, | |
-0.025127515717112181709014251396183681209, | |
-0.052210507359822452833064687638398027048, | |
-0.039273839505489904766477593511808663607, | |
0.033021568427940004020193498490698402748, | |
0.147606943281569008563636202779889572412, | |
0.254000252034505602516389899392379447818, | |
0.297330876398883392486283128164359368384, | |
0.254000252034505602516389899392379447818, | |
0.147606943281569008563636202779889572412, | |
0.033021568427940004020193498490698402748, | |
-0.039273839505489904766477593511808663607, | |
-0.052210507359822452833064687638398027048, | |
-0.025127515717112181709014251396183681209, | |
0.007349500393709578957568417933998716762, | |
0.021287595318265635502275046064823982306, | |
0.014988684599373741992978104065059596905, | |
0.001623780150148094927192721215192250384] | |
self.fir = FIR(self.nb_channels, self.fir_coef) | |
def filter(self, value): | |
""" | |
value: a numpy array of shape (data series, channels) | |
""" | |
for i, x in enumerate(value): # loop over the data series | |
# FIR: | |
x = self.fir.filter(x) | |
# notch: | |
denAccum = (x - self.notch_coeff1 * self.dfs[0]) - self.notch_coeff2 * self.dfs[1] | |
x = (self.notch_coeff3 * denAccum + self.notch_coeff4 * self.dfs[0]) + self.notch_coeff5 * self.dfs[1] | |
self.dfs[1] = self.dfs[0] | |
self.dfs[0] = denAccum | |
# standardization: | |
if self.moving_average is not None: | |
delta = x - self.moving_average | |
self.moving_average = self.moving_average + self.ALPHA_AVG * delta | |
self.moving_variance = (1 - self.ALPHA_STD) * (self.moving_variance + self.ALPHA_STD * delta**2) | |
moving_std = np.sqrt(self.moving_variance) | |
x = (x - self.moving_average) / (moving_std + self.EPSILON) | |
else: | |
self.moving_average = x | |
value[i] = x | |
return value | |
class LiveDisplay(): | |
def __init__(self, channel_names, window_len=100): | |
self.datapoint_dim = len(channel_names) | |
self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len) | |
def add_datapoints(self, datapoints): | |
""" | |
Adds 8 lists of datapoints to the plot | |
Args: | |
datapoints: list of 8 lists of floats (or list of 8 floats) | |
""" | |
disp_list = [] | |
for datapoint in datapoints: | |
d = [[elt] for elt in datapoint] | |
disp_list.append(d) | |
self.pp.update_with_datapoints(disp_list) | |
def add_datapoint(self, datapoint): | |
disp_list = [[elt] for elt in datapoint] | |
self.pp.update(disp_list) | |
def _capture_process(p_data_o, p_msg_io, duration, frequency, python_clock, time_msg_in, channel_states): | |
""" | |
Args: | |
p_data_o: multiprocessing.Pipe: captured datapoints are put here | |
p_msg_io: mutliprocessing.Pipe: to communicate with the parent process | |
duration: float: max duration of the experiment in seconds | |
frequency: float: sampling frequency | |
ptyhon_clock: bool: if True, the Coral clock is used, otherwise, the ADS interrupts are used | |
time_msg_in: float: min time between attempts to recv incomming messages | |
""" | |
if duration <= 0: | |
duration = np.inf | |
sample_time = 1 / frequency | |
frontend = Frontend() | |
leds = LEDs() | |
leds.led2(Color.PURPLE) | |
leds.aquisition(True) | |
try: | |
data = frontend.read_regs(0x00, 1) | |
assert data == [0x3E], "The communication with the ADS cannot be established." | |
leds.led2(Color.BLUE) | |
config = FRONTEND_CONFIG | |
if python_clock: # set ADS to 2 * frequency | |
datarate = 2 * frequency | |
else: # set ADS to frequency | |
datarate = frequency | |
config = mod_config(config, datarate, channel_states) | |
frontend.write_regs(0x00, config) | |
data = frontend.read_regs(0x00, len(config)) | |
assert data == config, f"Wrong config: {data} vs {config}" | |
frontend.start() | |
leds.led2(Color.PURPLE) | |
while not frontend.is_ready(): | |
pass | |
# Set up of leds | |
leds.aquisition(True) | |
sleep(0.5) | |
leds.aquisition(False) | |
sleep(0.5) | |
leds.aquisition(True) | |
c = True | |
it = 0 | |
t_start = time.time() | |
t_max = t_start + duration | |
t = t_start | |
# first sample: | |
reading = frontend.read() | |
datapoint = reading.channels() | |
p_data_o.send(datapoint) | |
t_next = t + sample_time | |
t_chk_msg = t + time_msg_in | |
# sampling loop: | |
while c and t < t_max: | |
t = time.time() | |
if python_clock: | |
if t <= t_next: | |
time.sleep(t_next - t) | |
t_next += sample_time | |
reading = frontend.read() | |
else: | |
reading = frontend.wait_new_data() | |
datapoint = reading.channels() | |
p_data_o.send(datapoint) | |
# Check for messages | |
if t >= t_chk_msg: | |
t_chk_msg = t + time_msg_in | |
if p_msg_io.poll(): | |
message = p_msg_io.recv() | |
if message == 'STOP': | |
c = False | |
it += 1 | |
t = time.time() | |
tot = (t - t_start) / it | |
p_msg_io.send(("PRT", f"Average frequency: {1 / tot} Hz for {it} samples")) | |
leds.aquisition(False) | |
finally: | |
leds.close() | |
frontend.close() | |
p_msg_io.send('STOP') | |
p_msg_io.close() | |
p_data_o.close() | |
class Capture: | |
def __init__(self): | |
# {now.strftime('%m_%d_%Y_%H_%M_%S')} | |
self.filename = EDF_PATH / 'recording.edf' | |
self._p_capture = None | |
self.__capture_on = False | |
self.frequency = 250 | |
self.duration = 10 | |
self.power_line = 60 | |
self.polyak_mean = 0.1 | |
self.polyak_std = 0.001 | |
self.epsilon = 0.000001 | |
self.custom_fir = False | |
self.custom_fir_order = 10 | |
self.custom_fir_cutoff = 30 | |
self.filter = True | |
self.record = False | |
self.lsl = False | |
self.display = False | |
self.python_clock = True | |
self.edf_writer = None | |
self.edf_buffer = [] | |
self.nb_signals = 8 | |
self.samples_per_datarecord_array = self.frequency | |
self.physical_max = 5 | |
self.physical_min = -5 | |
self.signal_labels = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8'] | |
self._lock_msg_out = Lock() | |
self._msg_out = None | |
self._t_capture = None | |
self.channel_states = ['disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled'] | |
# widgets =============================== | |
# CHANNELS ------------------------------ | |
self.b_radio_ch1 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=True | |
) | |
self.b_radio_ch2 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=False | |
) | |
self.b_radio_ch3 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=False | |
) | |
self.b_radio_ch4 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=False | |
) | |
self.b_radio_ch5 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=False | |
) | |
self.b_radio_ch6 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=False | |
) | |
self.b_radio_ch7 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=False | |
) | |
self.b_radio_ch8 = widgets.RadioButtons( | |
options=['disabled', 'simple', 'with bias', 'bias out'], | |
value='disabled', | |
disabled=True | |
) | |
self.b_accordion_channels = widgets.Accordion( | |
children=[ | |
widgets.GridBox([ | |
widgets.Label('CH1'), | |
widgets.Label('CH2'), | |
widgets.Label('CH3'), | |
widgets.Label('CH4'), | |
widgets.Label('CH5'), | |
widgets.Label('CH6'), | |
widgets.Label('CH7'), | |
widgets.Label('CH8'), | |
self.b_radio_ch1, | |
self.b_radio_ch2, | |
self.b_radio_ch3, | |
self.b_radio_ch4, | |
self.b_radio_ch5, | |
self.b_radio_ch6, | |
self.b_radio_ch7, | |
self.b_radio_ch8 | |
], layout=widgets.Layout(grid_template_columns="repeat(8, 90px)")) | |
]) | |
self.b_accordion_channels.set_title(index = 0, title = 'Channels') | |
# OTHERS ------------------------------ | |
self.b_capture = widgets.ToggleButtons( | |
options=['Stop', 'Start'], | |
description='Capture:', | |
disabled=False, | |
button_style='', # 'success', 'info', 'warning', 'danger' or '' | |
tooltips=['Stop capture', 'Start capture'], | |
) | |
self.b_clock = widgets.ToggleButtons( | |
options=['Coral', 'ADS'], | |
description='Clock:', | |
disabled=False, | |
button_style='', # 'success', 'info', 'warning', 'danger' or '' | |
tooltips=['Use Coral clock (very precise, not very timely)', | |
'Use ADS clock (not very precise, very timely)'], | |
) | |
self.b_power_line = widgets.ToggleButtons( | |
options=['60 Hz', '50 Hz'], | |
description='Power line:', | |
disabled=False, | |
button_style='', # 'success', 'info', 'warning', 'danger' or '' | |
tooltips=['North America 60 Hz', | |
'Europe 50 Hz'], | |
) | |
self.b_custom_fir = widgets.ToggleButtons( | |
options=['Default', 'Custom'], | |
description='FIR filter:', | |
disabled=False, | |
button_style='', # 'success', 'info', 'warning', 'danger' or '' | |
tooltips=['Use the default 30Hz low-pass FIR from the Portiloop paper', | |
'Use a custom FIR'], | |
) | |
self.b_filename = widgets.Text( | |
value='recording.edf', | |
description='Recording:', | |
disabled=False | |
) | |
self.b_frequency = widgets.IntText( | |
value=self.frequency, | |
description='Freq (Hz):', | |
disabled=False | |
) | |
self.b_polyak_mean = widgets.FloatText( | |
value=self.polyak_mean, | |
description='Polyak mean:', | |
disabled=False | |
) | |
self.b_polyak_std = widgets.FloatText( | |
value=self.polyak_std, | |
description='Polyak std:', | |
disabled=False | |
) | |
self.b_epsilon = widgets.FloatText( | |
value=self.epsilon, | |
description='Epsilon:', | |
disabled=False | |
) | |
self.b_custom_fir_order = widgets.IntText( | |
value=self.custom_fir_order, | |
description='FIR order:', | |
disabled=True | |
) | |
self.b_custom_fir_cutoff = widgets.IntText( | |
value=self.custom_fir_cutoff, | |
description='FIR cutoff:', | |
disabled=True | |
) | |
self.b_accordion_filter = widgets.Accordion( | |
children=[ | |
widgets.VBox([ | |
self.b_custom_fir, | |
self.b_custom_fir_order, | |
self.b_custom_fir_cutoff, | |
self.b_polyak_mean, | |
self.b_polyak_std, | |
self.b_epsilon | |
]) | |
]) | |
self.b_accordion_filter.set_title(index = 0, title = 'Filtering') | |
self.b_duration = widgets.IntText( | |
value=self.duration, | |
description='Time (s):', | |
disabled=False | |
) | |
self.b_filter = widgets.Checkbox( | |
value=self.filter, | |
description='Filter', | |
disabled=False, | |
indent=False | |
) | |
self.b_record = widgets.Checkbox( | |
value=self.record, | |
description='Record EDF', | |
disabled=False, | |
indent=False | |
) | |
self.b_lsl = widgets.Checkbox( | |
value=self.lsl, | |
description='Stream LSL', | |
disabled=False, | |
indent=False | |
) | |
self.b_display = widgets.Checkbox( | |
value=self.display, | |
description='Display', | |
disabled=False, | |
indent=False | |
) | |
# CALLBACKS ---------------------- | |
self.b_capture.observe(self.on_b_capture, 'value') | |
self.b_clock.observe(self.on_b_clock, 'value') | |
self.b_frequency.observe(self.on_b_frequency, 'value') | |
self.b_duration.observe(self.on_b_duration, 'value') | |
self.b_filter.observe(self.on_b_filter, 'value') | |
self.b_record.observe(self.on_b_record, 'value') | |
self.b_lsl.observe(self.on_b_lsl, 'value') | |
self.b_display.observe(self.on_b_display, 'value') | |
self.b_filename.observe(self.on_b_filename, 'value') | |
self.b_radio_ch2.observe(self.on_b_radio_ch2, 'value') | |
self.b_radio_ch3.observe(self.on_b_radio_ch3, 'value') | |
self.b_radio_ch4.observe(self.on_b_radio_ch4, 'value') | |
self.b_radio_ch5.observe(self.on_b_radio_ch5, 'value') | |
self.b_radio_ch6.observe(self.on_b_radio_ch6, 'value') | |
self.b_radio_ch7.observe(self.on_b_radio_ch7, 'value') | |
self.b_power_line.observe(self.on_b_power_line, 'value') | |
self.b_custom_fir.observe(self.on_b_custom_fir, 'value') | |
self.b_custom_fir_order.observe(self.on_b_custom_fir_order, 'value') | |
self.b_custom_fir_cutoff.observe(self.on_b_custom_fir_cutoff, 'value') | |
self.b_polyak_mean.observe(self.on_b_polyak_mean, 'value') | |
self.b_polyak_std.observe(self.on_b_polyak_std, 'value') | |
self.b_epsilon.observe(self.on_b_epsilon, 'value') | |
self.display_buttons() | |
def __del__(self): | |
self.b_capture.close() | |
def display_buttons(self): | |
display(widgets.VBox([self.b_accordion_channels, | |
self.b_frequency, | |
self.b_duration, | |
self.b_filename, | |
self.b_power_line, | |
self.b_clock, | |
widgets.HBox([self.b_filter, self.b_record, self.b_lsl, self.b_display]), | |
self.b_accordion_filter, | |
self.b_capture])) | |
def enable_buttons(self): | |
self.b_frequency.disabled = False | |
self.b_duration.disabled = False | |
self.b_filename.disabled = False | |
self.b_filter.disabled = False | |
self.b_record.disabled = False | |
self.b_record.lsl = False | |
self.b_display.disabled = False | |
self.b_clock.disabled = False | |
self.b_radio_ch2.disabled = False | |
self.b_radio_ch3.disabled = False | |
self.b_radio_ch4.disabled = False | |
self.b_radio_ch5.disabled = False | |
self.b_radio_ch6.disabled = False | |
self.b_radio_ch7.disabled = False | |
self.b_power_line.disabled = False | |
self.b_polyak_mean.disabled = False | |
self.b_polyak_std.disabled = False | |
self.b_epsilon.disabled = False | |
self.b_custom_fir.disabled = False | |
self.b_custom_fir_order.disabled = not self.custom_fir | |
self.b_custom_fir_cutoff.disabled = not self.custom_fir | |
def disable_buttons(self): | |
self.b_frequency.disabled = True | |
self.b_duration.disabled = True | |
self.b_filename.disabled = True | |
self.b_filter.disabled = True | |
self.b_record.disabled = True | |
self.b_record.lsl = True | |
self.b_display.disabled = True | |
self.b_clock.disabled = True | |
self.b_radio_ch2.disabled = True | |
self.b_radio_ch3.disabled = True | |
self.b_radio_ch4.disabled = True | |
self.b_radio_ch5.disabled = True | |
self.b_radio_ch6.disabled = True | |
self.b_radio_ch7.disabled = True | |
self.b_power_line.disabled = True | |
self.b_polyak_mean.disabled = True | |
self.b_polyak_std.disabled = True | |
self.b_epsilon.disabled = True | |
self.b_custom_fir.disabled = True | |
self.b_custom_fir_order.disabled = True | |
self.b_custom_fir_cutoff.disabled = True | |
def on_b_radio_ch2(self, value): | |
self.channel_states[1] = value['new'] | |
def on_b_radio_ch3(self, value): | |
self.channel_states[2] = value['new'] | |
def on_b_radio_ch4(self, value): | |
self.channel_states[3] = value['new'] | |
def on_b_radio_ch5(self, value): | |
self.channel_states[4] = value['new'] | |
def on_b_radio_ch6(self, value): | |
self.channel_states[5] = value['new'] | |
def on_b_radio_ch7(self, value): | |
self.channel_states[6] = value['new'] | |
def on_b_capture(self, value): | |
val = value['new'] | |
if val == 'Start': | |
clear_output() | |
self.disable_buttons() | |
if not self.python_clock: # ADS clock: force the frequency to an ADS-compatible frequency | |
self.frequency = to_ads_frequency(self.frequency) | |
self.b_frequency.value = self.frequency | |
self.display_buttons() | |
with self._lock_msg_out: | |
self._msg_out = None | |
if self._t_capture is not None: | |
warnings.warn("Capture already running, operation aborted.") | |
return | |
self._t_capture = Thread(target=self.start_capture, | |
args=(self.filter, self.record, self.lsl, self.display, 500, self.python_clock)) | |
self._t_capture.start() | |
elif val == 'Stop': | |
with self._lock_msg_out: | |
self._msg_out = 'STOP' | |
assert self._t_capture is not None | |
self._t_capture.join() | |
self._t_capture = None | |
self.enable_buttons() | |
def on_b_custom_fir(self, value): | |
val = value['new'] | |
if val == 'Default': | |
self.custom_fir = False | |
elif val == 'Custom': | |
self.custom_fir = True | |
self.enable_buttons() | |
def on_b_clock(self, value): | |
val = value['new'] | |
if val == 'Coral': | |
self.python_clock = True | |
elif val == 'ADS': | |
self.python_clock = False | |
def on_b_power_line(self, value): | |
val = value['new'] | |
if val == '60 Hz': | |
self.power_line = 60 | |
elif val == '50 Hz': | |
self.python_clock = 50 | |
def on_b_frequency(self, value): | |
val = value['new'] | |
if val > 0: | |
self.frequency = val | |
else: | |
self.b_frequency.value = self.frequency | |
def on_b_filename(self, value): | |
val = value['new'] | |
if val != '': | |
if not val.endswith('.edf'): | |
val += '.edf' | |
self.filename = EDF_PATH / val | |
else: | |
now = datetime.now() | |
self.filename = EDF_PATH / 'recording.edf' | |
def on_b_duration(self, value): | |
val = value['new'] | |
if val > 0: | |
self.duration = val | |
def on_b_custom_fir_order(self, value): | |
val = value['new'] | |
if val > 0: | |
self.custom_fir_order = val | |
else: | |
self.b_custom_fir_order.value = self.custom_fir_order | |
def on_b_custom_fir_cutoff(self, value): | |
val = value['new'] | |
if val > 0 and val < self.frequency / 2: | |
self.custom_fir_cutoff = val | |
else: | |
self.b_custom_fir_cutoff.value = self.custom_fir_cutoff | |
def on_b_polyak_mean(self, value): | |
val = value['new'] | |
if val >= 0 and val <= 1: | |
self.polyak_mean = val | |
else: | |
self.b_polyak_mean.value = self.polyak_mean | |
def on_b_polyak_std(self, value): | |
val = value['new'] | |
if val >= 0 and val <= 1: | |
self.polyak_std = val | |
else: | |
self.b_polyak_std.value = self.polyak_std | |
def on_b_epsilon(self, value): | |
val = value['new'] | |
if val > 0 and val < 0.1: | |
self.epsilon = val | |
else: | |
self.b_epsilon.value = self.epsilon | |
def on_b_filter(self, value): | |
val = value['new'] | |
self.filter = val | |
def on_b_record(self, value): | |
val = value['new'] | |
self.record = val | |
def on_b_lsl(self, value): | |
val = value['new'] | |
self.lsl = val | |
def on_b_display(self, value): | |
val = value['new'] | |
self.display = val | |
def open_recording_file(self): | |
nb_signals = self.nb_signals | |
samples_per_datarecord_array = self.samples_per_datarecord_array | |
physical_max = self.physical_max | |
physical_min = self.physical_min | |
signal_labels = self.signal_labels | |
print(f"Will store edf recording in {self.filename}") | |
self.edf_writer = EDFwriter(p_path=str(self.filename), | |
f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS, | |
number_of_signals=nb_signals) | |
for signal in range(nb_signals): | |
assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0 | |
assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0 | |
assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0 | |
assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0 | |
assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0 | |
assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0 | |
assert self.edf_writer.setPhysicalDimension(signal, 'V') == 0 | |
def close_recording_file(self): | |
assert self.edf_writer.close() == 0 | |
def add_recording_data(self, data): | |
self.edf_buffer += data | |
if len(self.edf_buffer) >= self.samples_per_datarecord_array: | |
datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array] | |
self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:] | |
datarecord_array = np.array(datarecord_array).transpose() | |
assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}" | |
for d in datarecord_array: | |
assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}" | |
assert self.edf_writer.writeSamples(d) == 0 | |
def start_capture(self, | |
filter, | |
record, | |
lsl, | |
viz, | |
width, | |
python_clock): | |
if self.__capture_on: | |
warnings.warn("Capture is already ongoing, ignoring command.") | |
return | |
else: | |
self.__capture_on = True | |
p_msg_io, p_msg_io_2 = mp.Pipe() | |
p_data_i, p_data_o = mp.Pipe(duplex=False) | |
SAMPLE_TIME = 1 / self.frequency | |
if filter: | |
fp = FilterPipeline(nb_channels=8, | |
sampling_rate=self.frequency, | |
power_line_fq=self.power_line, | |
use_custom_fir=self.custom_fir, | |
custom_fir_order=self.custom_fir_order, | |
custom_fir_cutoff=self.custom_fir_cutoff, | |
alpha_avg=self.polyak_mean, | |
alpha_std=self.polyak_std, | |
epsilon=self.epsilon) | |
self._p_capture = mp.Process(target=_capture_process, | |
args=(p_data_o, | |
p_msg_io_2, | |
self.duration, | |
self.frequency, | |
python_clock, | |
1.0, | |
self.channel_states) | |
) | |
self._p_capture.start() | |
# print(f"PID capture: {self._p_capture.pid}") | |
if viz: | |
live_disp = LiveDisplay(channel_names = self.signal_labels, window_len=width) | |
if record: | |
self.open_recording_file() | |
if lsl: | |
from pylsl import StreamInfo, StreamOutlet | |
lsl_info = StreamInfo(name='Portiloop', | |
type='EEG', | |
channel_count=8, | |
channel_format='float32', | |
source_id='') # TODO: replace this by unique device identifier | |
lsl_outlet = StreamOutlet(lsl_info) | |
buffer = [] | |
while True: | |
with self._lock_msg_out: | |
if self._msg_out is not None: | |
p_msg_io.send(self._msg_out) | |
self._msg_out = None | |
if p_msg_io.poll(): | |
mess = p_msg_io.recv() | |
if mess == 'STOP': | |
break | |
elif mess[0] == 'PRT': | |
print(mess[1]) | |
# retrieve all data points from p_data and put them in a list of np.array: | |
point = None | |
if p_data_i.poll(timeout=SAMPLE_TIME): | |
point = p_data_i.recv() | |
else: | |
continue | |
n_array = np.array([point]) | |
n_array = filter_np(n_array) | |
if filter: | |
n_array = fp.filter(n_array) | |
filtered_point = n_array.tolist() | |
if lsl: | |
lsl_outlet.push_sample(filtered_point[-1]) | |
buffer += filtered_point | |
if len(buffer) >= 50: | |
if viz: | |
live_disp.add_datapoints(buffer) | |
if record: | |
self.add_recording_data(buffer) | |
buffer = [] | |
# empty pipes | |
while True: | |
if p_data_i.poll(): | |
_ = p_data_i.recv() | |
elif p_msg_io.poll(): | |
_ = p_msg_io.recv() | |
else: | |
break | |
p_data_i.close() | |
p_msg_io.close() | |
if record: | |
self.close_recording_file() | |
self._p_capture.join() | |
self.__capture_on = False | |
if __name__ == "__main__": | |
pass | |