import os
import shutil
import sys
import time
import warnings
import multiprocessing as mp
from datetime import datetime, timedelta
from pathlib import Path
from threading import Thread, Lock
from time import sleep

import matplotlib.pyplot as plt
import numpy as np
import ipywidgets as widgets
from EDFlib.edfwriter import EDFwriter
from IPython.display import clear_output, display
from scipy.signal import firwin

from portilooplot.jupyter_plot import ProgressPlot
from portiloop.hardware.frontend import Frontend
from portiloop.hardware.leds import LEDs, Color


# Reference register values of the frontend (not used directly by the code below).
DEFAULT_FRONTEND_CONFIG = [
    # nomenclature: name [default setting] [bits 7-0] : description
    # Read only ID:
    0x3E,  # ID [xx] [REV_ID[2:0], 1, DEV_ID[1:0], NU_CH[1:0]] : (RO)
    # Global Settings Across Channels:
    0x96,  # CONFIG1 [96] [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 250 SPS
    0xC0,  # CONFIG2 [C0] [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]] : No tests
    0x60,  # CONFIG3 [60] [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : Power-down reference buffer, no bias
    0x00,  # LOFF [00] [COMP_TH[2:0], 0, ILEAD_OFF[1:0], FLEAD_OFF[1:0]] : No lead-off
    # Channel-Specific Settings:
    0x61,  # CH1SET [61] [PD1, GAIN1[2:0], SRB2, MUX1[2:0]] : Channel 1 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH2SET [61] [PD2, GAIN2[2:0], SRB2, MUX2[2:0]] : Channel 2 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH3SET [61] [PD3, GAIN3[2:0], SRB2, MUX3[2:0]] : Channel 3 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH4SET [61] [PD4, GAIN4[2:0], SRB2, MUX4[2:0]] : Channel 4 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH5SET [61] [PD5, GAIN5[2:0], SRB2, MUX5[2:0]] : Channel 5 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH6SET [61] [PD6, GAIN6[2:0], SRB2, MUX6[2:0]] : Channel 6 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH7SET [61] [PD7, GAIN7[2:0], SRB2, MUX7[2:0]] : Channel 7 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH8SET [61] [PD8, GAIN8[2:0], SRB2, MUX8[2:0]] : Channel 8 active, 24 gain, no SRB2 & input shorted
    0x00,  # BIAS_SENSP [00] [BIASP8, BIASP7, BIASP6, BIASP5, BIASP4, BIASP3, BIASP2, BIASP1] : No bias
    0x00,  # BIAS_SENSN [00] [BIASN8, BIASN7, BIASN6, BIASN5, BIASN4, BIASN3, BIASN2, BIASN1] : No bias
    0x00,  # LOFF_SENSP [00] [LOFFP8, LOFFP7, LOFFP6, LOFFP5, LOFFP4, LOFFP3, LOFFP2, LOFFP1] : No lead-off
    0x00,  # LOFF_SENSN [00] [LOFFM8, LOFFM7, LOFFM6, LOFFM5, LOFFM4, LOFFM3, LOFFM2, LOFFM1] : No lead-off
    0x00,  # LOFF_FLIP [00] [LOFF_FLIP8, LOFF_FLIP7, LOFF_FLIP6, LOFF_FLIP5, LOFF_FLIP4, LOFF_FLIP3, LOFF_FLIP2, LOFF_FLIP1] : No lead-off flip
    # Lead-Off Status Registers (Read-Only Registers):
    0x00,  # LOFF_STATP [00] [IN8P_OFF, IN7P_OFF, IN6P_OFF, IN5P_OFF, IN4P_OFF, IN3P_OFF, IN2P_OFF, IN1P_OFF] : Lead-off positive status (RO)
    0x00,  # LOFF_STATN [00] [IN8M_OFF, IN7M_OFF, IN6M_OFF, IN5M_OFF, IN4M_OFF, IN3M_OFF, IN2M_OFF, IN1M_OFF] : Lead-off negative status (RO)
    # GPIO and OTHER Registers:
    0x0F,  # GPIO [0F] [GPIOD[4:1], GPIOC[4:1]] : All GPIOs as inputs
    0x00,  # MISC1 [00] [0, 0, SRB1, 0, 0, 0, 0, 0] : Disable SRBM
    0x00,  # MISC2 [00] [00] : Unused
    0x00,  # CONFIG4 [00] [0, 0, 0, 0, SINGLE_SHOT, 0, PD_LOFF_COMP(bar), 0] : Single-shot, lead-off comparator disabled
]

# Base register values written to the frontend; mod_config() patches the
# datarate (index 1), PD_BIAS(bar) (index 3), CHnSET (5..12) and BIAS_SENSx (13, 14).
FRONTEND_CONFIG = [
    0x3E,  # ID (RO)
    0x95,  # CONFIG1 [95] [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 500 SPS
    0xD0,  # CONFIG2 [D0] [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]] : INT_CAL = 1
    0xE8,  # CONFIG3 [E8] [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : PD_REFBUF(bar) = 1, BIASREF_INT = 1
    0x00,  # No lead-off
    0x60,  # CH1SET [60] [PD1, GAIN1[2:0], SRB2, MUX1[2:0]]
    0x60,  # CH2SET 66
    0x60,  # CH3SET
    0x60,  # CH4SET
    0x60,  # CH5SET
    0x60,  # CH6SET
    0x60,  # CH7SET
    0x60,  # CH8SET
    0x04,  # BIAS_SENSP 04
    0x04,  # BIAS_SENSN 04
    0xFF,  # LOFF_SENSP Lead-off on all positive pins?
    0xFF,  # LOFF_SENSN Lead-off on all negative pins?
    0x00,  # Normal lead-off
    0x00,  # Lead-off positive status (RO)
    0x00,  # Lead-off negative status (RO)
    0x00,  # All GPIOs as output ?
    0x20,  # Enable SRB1
]

EDF_PATH = Path.home() / 'workspace' / 'edf_recording'


def to_ads_frequency(frequency):
    """Return the smallest ADS datarate that is >= ``frequency``.

    Falls back to 16000 when ``frequency`` exceeds every supported datarate.
    """
    possible_datarates = [250, 500, 1000, 2000, 4000, 8000, 16000]
    for datarate in possible_datarates:
        if datarate >= frequency:
            return datarate
    return 16000


def mod_config(config, datarate, channel_modes):
    """Patch a register list with the requested datarate and channel modes.

    Args:
        config: list of register values (same layout as FRONTEND_CONFIG);
            it is modified IN PLACE and also returned.
        datarate: requested datarate; the smallest supported datarate that is
            >= this value is selected (16000 if none is large enough).
        channel_modes: sequence of 8 strings, each one of
            'simple', 'disabled', 'with bias' or 'bias out'.

    Returns:
        The same ``config`` list, after modification.

    Raises:
        AssertionError: if ``channel_modes`` does not have 8 entries or
            contains an unknown mode.
    """
    # datarate: (datarate, DR[2:0] code) pairs in increasing datarate order
    possible_datarates = [(250, 0x06),
                          (500, 0x05),
                          (1000, 0x04),
                          (2000, 0x03),
                          (4000, 0x02),
                          (8000, 0x01),
                          (16000, 0x00)]
    mod_dr = 0x00
    for rate, code in possible_datarates:
        if rate >= datarate:
            mod_dr = code
            break

    config[1] = (config[1] & 0xF8) | mod_dr  # replace the 3 low bits of CONFIG1

    # bias:
    assert len(channel_modes) == 8
    config[13] = 0x00  # clear BIAS_SENSP
    config[14] = 0x00  # clear BIAS_SENSN
    bias_active = False
    for chan_i, chan_mode in enumerate(channel_modes):
        n = 5 + chan_i  # index of CHnSET for this channel
        mod = config[n] & 0x78  # clear PDn and MUX[2:0]
        if chan_mode == 'simple':
            pass  # PDn = 0 and normal electrode (000)
        elif chan_mode == 'disabled':
            mod = mod | 0x81  # PDn = 1 and input shorted (001)
        elif chan_mode == 'with bias':
            bit_i = 1 << chan_i
            config[13] = config[13] | bit_i
            config[14] = config[14] | bit_i
            bias_active = True
        elif chan_mode == 'bias out':
            mod = mod | 0x06  # MUX[2:0] = BIAS_DRP (110)
            bias_active = True
        else:
            # explicit raise so that the check survives `python -O`
            raise AssertionError(f"Wrong key: {chan_mode}.")
        config[n] = mod
        print(f"DEBUG: new config[{n}]:{hex(config[n])}")
    print(f"DEBUG: new config[13]:{hex(config[13])}")
    print(f"DEBUG: new config[14]:{hex(config[14])}")
    if bias_active:
        config[3] = config[3] | 0x04  # PD_BIAS bar = 1
    else:
        config[3] = config[3] & 0xFB  # PD_BIAS bar = 0
    print(f"DEBUG: new config[3]:{hex(config[3])}")
    return config


def filter_24(value):
    """Scale a signed 24-bit reading by 4.5 / (2**23 - 1)."""
    return (value * 4.5) / (2**23 - 1)  # 23 because 1 bit is lost for sign


def filter_2scomplement_np(value):
    """Interpret raw 24-bit values as two's complement signed integers."""
    return np.where((value & (1 << 23)) != 0, value - (1 << 24), value)


def filter_np(value):
    """Convert raw 24-bit readings to scaled signed values (see filter_24)."""
    return filter_24(filter_2scomplement_np(value))


def shift_numpy(arr, num, fill_value=np.nan):
    """Return a copy of ``arr`` shifted by ``num`` along axis 0.

    Vacated positions are set to ``fill_value`` (which may broadcast, e.g. a
    row vector). Positive ``num`` shifts towards higher indices.
    """
    result = np.empty_like(arr)
    if num > 0:
        result[:num] = fill_value
        result[num:] = arr[:-num]
    elif num < 0:
        result[num:] = fill_value
        result[:num] = arr[-num:]
    else:
        result[:] = arr
    return result


class FIR:
    """Multi-channel FIR filter applied one sample (row) at a time."""

    def __init__(self, nb_channels, coefficients, buffer=None):
        """
        Args:
            nb_channels: number of channels filtered in parallel
            coefficients: sequence of FIR taps
            buffer: optional initial history of shape (taps, nb_channels);
                zeros are used when None
        """
        self.coefficients = np.expand_dims(np.array(coefficients), axis=1)
        self.taps = len(self.coefficients)
        self.nb_channels = nb_channels
        if buffer is not None:
            self.buffer = np.array(buffer)
        else:
            self.buffer = np.zeros((self.taps, self.nb_channels))

    def filter(self, x):
        """Push sample ``x`` (one value per channel) and return the filtered sample."""
        self.buffer = shift_numpy(self.buffer, 1, x)
        filtered = np.sum(self.buffer * self.coefficients, axis=0)
        return filtered


class FilterPipeline:
    """Online pipeline: FIR low-pass, power-line notch, running standardization."""

    def __init__(self,
                 nb_channels,
                 sampling_rate,
                 power_line_fq=60,
                 use_custom_fir=False,
                 custom_fir_order=10,
                 custom_fir_cutoff=30,
                 alpha_avg=0.1,
                 alpha_std=0.001,
                 epsilon=0.000001):
        """
        Args:
            nb_channels: number of channels
            sampling_rate: sampling rate, used to design the custom FIR
            power_line_fq: 50 or 60 (selects the notch coefficients)
            use_custom_fir: if True, design the FIR with scipy.signal.firwin
            custom_fir_order: order of the custom FIR (numtaps = order + 1)
            custom_fir_cutoff: cutoff of the custom FIR
            alpha_avg: update rate of the moving average
            alpha_std: update rate of the moving variance
            epsilon: added to the moving std before dividing
        """
        self.nb_channels = nb_channels
        assert power_line_fq in [50, 60], f"The only supported power line frequencies are 50 Hz and 60 Hz"
        # NOTE(review): the notch coefficients are constants that do not depend
        # on sampling_rate; presumably designed for one fixed rate -- confirm.
        if power_line_fq == 60:
            self.notch_coeff1 = -0.12478308884588535
            self.notch_coeff2 = 0.98729186796473023
            self.notch_coeff3 = 0.99364593398236511
            self.notch_coeff4 = -0.12478308884588535
            self.notch_coeff5 = 0.99364593398236511
        else:
            self.notch_coeff1 = -0.61410695998423581
            self.notch_coeff2 = 0.98729186796473023
            self.notch_coeff3 = 0.99364593398236511
            self.notch_coeff4 = -0.61410695998423581
            self.notch_coeff5 = 0.99364593398236511
        # two delay elements of the notch filter, one value per channel
        self.dfs = [np.zeros(self.nb_channels), np.zeros(self.nb_channels)]

        self.moving_average = None  # initialized with the first filtered sample
        self.moving_variance = np.zeros(self.nb_channels)
        self.ALPHA_AVG = alpha_avg
        self.ALPHA_STD = alpha_std
        self.EPSILON = epsilon

        if use_custom_fir:
            self.fir_coef = firwin(numtaps=custom_fir_order+1, cutoff=custom_fir_cutoff, fs=sampling_rate)
        else:
            self.fir_coef = [
                0.001623780150148094927192721215192250384,
                0.014988684599373741992978104065059596905,
                0.021287595318265635502275046064823982306,
                0.007349500393709578957568417933998716762,
                -0.025127515717112181709014251396183681209,
                -0.052210507359822452833064687638398027048,
                -0.039273839505489904766477593511808663607,
                0.033021568427940004020193498490698402748,
                0.147606943281569008563636202779889572412,
                0.254000252034505602516389899392379447818,
                0.297330876398883392486283128164359368384,
                0.254000252034505602516389899392379447818,
                0.147606943281569008563636202779889572412,
                0.033021568427940004020193498490698402748,
                -0.039273839505489904766477593511808663607,
                -0.052210507359822452833064687638398027048,
                -0.025127515717112181709014251396183681209,
                0.007349500393709578957568417933998716762,
                0.021287595318265635502275046064823982306,
                0.014988684599373741992978104065059596905,
                0.001623780150148094927192721215192250384]
        self.fir = FIR(self.nb_channels, self.fir_coef)

    def filter(self, value):
        """
        Filter samples in place and return them.

        value: a numpy array of shape (data series, channels)
        """
        for i, x in enumerate(value):  # loop over the data series
            # FIR:
            x = self.fir.filter(x)
            # notch:
            denAccum = (x - self.notch_coeff1 * self.dfs[0]) - self.notch_coeff2 * self.dfs[1]
            x = (self.notch_coeff3 * denAccum + self.notch_coeff4 * self.dfs[0]) + self.notch_coeff5 * self.dfs[1]
            self.dfs[1] = self.dfs[0]
            self.dfs[0] = denAccum
            # standardization:
            if self.moving_average is not None:
                delta = x - self.moving_average
                self.moving_average = self.moving_average + self.ALPHA_AVG * delta
                self.moving_variance = (1 - self.ALPHA_STD) * (self.moving_variance + self.ALPHA_STD * delta**2)
                moving_std = np.sqrt(self.moving_variance)
                x = (x - self.moving_average) / (moving_std + self.EPSILON)
            else:
                # first sample only seeds the average and is passed through
                self.moving_average = x
            value[i] = x
        return value


class LiveDisplay():
    """Thin wrapper around ProgressPlot with one plot per channel."""

    def __init__(self, channel_names, window_len=100):
        self.datapoint_dim = len(channel_names)
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)

    def add_datapoints(self, datapoints):
        """
        Adds several datapoints to the plot

        Args:
            datapoints: list of datapoints, each a list with one float per channel
        """
        disp_list = [[[elt] for elt in datapoint] for datapoint in datapoints]
        self.pp.update_with_datapoints(disp_list)

    def add_datapoint(self, datapoint):
        """Adds a single datapoint (one float per channel) to the plot."""
        disp_list = [[elt] for elt in datapoint]
        self.pp.update(disp_list)


def _capture_process(p_data_o, p_msg_io, duration, frequency, python_clock, time_msg_in, channel_states):
    """
    Acquisition loop, meant to run in a child process.

    Args:
        p_data_o: multiprocessing.Pipe: captured datapoints are put here
        p_msg_io: multiprocessing.Pipe: to communicate with the parent process
        duration: float: max duration of the experiment in seconds (<= 0 means no limit)
        frequency: float: sampling frequency
        python_clock: bool: if True, the Coral clock is used, otherwise, the ADS interrupts are used
        time_msg_in: float: min time between attempts to recv incoming messages
        channel_states: list of 8 channel modes, forwarded to mod_config
    """
    if duration <= 0:
        duration = np.inf

    sample_time = 1 / frequency

    frontend = Frontend()
    leds = LEDs()
    leds.led2(Color.PURPLE)
    leds.aquisition(True)

    try:
        data = frontend.read_regs(0x00, 1)
        assert data == [0x3E], "The communication with the ADS cannot be established."
        leds.led2(Color.BLUE)

        # with the Coral clock, the ADS is set to 2 * frequency
        datarate = 2 * frequency if python_clock else frequency
        # work on a copy: mod_config modifies its argument in place
        config = mod_config(list(FRONTEND_CONFIG), datarate, channel_states)

        frontend.write_regs(0x00, config)
        data = frontend.read_regs(0x00, len(config))
        assert data == config, f"Wrong config: {data} vs {config}"
        frontend.start()
        leds.led2(Color.PURPLE)
        while not frontend.is_ready():
            pass

        # Set up of leds
        leds.aquisition(True)
        sleep(0.5)
        leds.aquisition(False)
        sleep(0.5)
        leds.aquisition(True)

        c = True

        it = 0
        t_start = time.time()
        t_max = t_start + duration
        t = t_start

        # first sample:
        reading = frontend.read()
        datapoint = reading.channels()
        p_data_o.send(datapoint)

        t_next = t + sample_time
        t_chk_msg = t + time_msg_in

        # sampling loop:
        while c and t < t_max:
            t = time.time()
            if python_clock:
                if t <= t_next:
                    time.sleep(t_next - t)
                t_next += sample_time
                reading = frontend.read()
            else:
                reading = frontend.wait_new_data()
            datapoint = reading.channels()
            p_data_o.send(datapoint)

            # Check for messages
            if t >= t_chk_msg:
                t_chk_msg = t + time_msg_in
                if p_msg_io.poll():
                    message = p_msg_io.recv()
                    if message == 'STOP':
                        c = False
            it += 1
        t = time.time()
        # the loop may not have run at all; avoid dividing by zero
        if it > 0 and t > t_start:
            tot = (t - t_start) / it
            p_msg_io.send(("PRT", f"Average frequency: {1 / tot} Hz for {it} samples"))

        leds.aquisition(False)

    finally:
        leds.close()
        frontend.close()
        p_msg_io.send('STOP')
        p_msg_io.close()
        p_data_o.close()


class Capture:
    """Notebook UI (ipywidgets) controlling a capture: acquisition, filtering,
    EDF recording, LSL streaming and live display."""

    _CHANNEL_OPTIONS = ['disabled', 'simple', 'with bias', 'bias out']

    def __init__(self):
        """Create the state and widgets with default values and display them."""
        self.filename = EDF_PATH / 'recording.edf'
        self._p_capture = None
        self.__capture_on = False
        self.frequency = 250
        self.duration = 10
        self.power_line = 60
        self.polyak_mean = 0.1
        self.polyak_std = 0.001
        self.epsilon = 0.000001
        self.custom_fir = False
        self.custom_fir_order = 10
        self.custom_fir_cutoff = 30
        self.filter = True
        self.record = False
        self.lsl = False
        self.display = False
        self.python_clock = True
        self.edf_writer = None
        self.edf_buffer = []
        self.nb_signals = 8
        self.samples_per_datarecord_array = self.frequency
        self.physical_max = 5
        self.physical_min = -5
        self.signal_labels = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8']
        self._lock_msg_out = Lock()
        self._msg_out = None
        self._t_capture = None
        self.channel_states = ['disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled']

        # widgets ===============================

        # CHANNELS ------------------------------
        # channels 1 and 8 are created disabled and are never enabled below

        self.b_radio_ch1 = self._make_channel_radio(disabled=True)
        self.b_radio_ch2 = self._make_channel_radio(disabled=False)
        self.b_radio_ch3 = self._make_channel_radio(disabled=False)
        self.b_radio_ch4 = self._make_channel_radio(disabled=False)
        self.b_radio_ch5 = self._make_channel_radio(disabled=False)
        self.b_radio_ch6 = self._make_channel_radio(disabled=False)
        self.b_radio_ch7 = self._make_channel_radio(disabled=False)
        self.b_radio_ch8 = self._make_channel_radio(disabled=True)

        self.b_accordion_channels = widgets.Accordion(
            children=[
                widgets.GridBox([
                    widgets.Label('CH1'),
                    widgets.Label('CH2'),
                    widgets.Label('CH3'),
                    widgets.Label('CH4'),
                    widgets.Label('CH5'),
                    widgets.Label('CH6'),
                    widgets.Label('CH7'),
                    widgets.Label('CH8'),
                    self.b_radio_ch1,
                    self.b_radio_ch2,
                    self.b_radio_ch3,
                    self.b_radio_ch4,
                    self.b_radio_ch5,
                    self.b_radio_ch6,
                    self.b_radio_ch7,
                    self.b_radio_ch8
                ], layout=widgets.Layout(grid_template_columns="repeat(8, 90px)"))
            ])
        self.b_accordion_channels.set_title(index = 0, title = 'Channels')

        # OTHERS ------------------------------

        self.b_capture = widgets.ToggleButtons(
            options=['Stop', 'Start'],
            description='Capture:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Stop capture', 'Start capture'],
        )

        self.b_clock = widgets.ToggleButtons(
            options=['Coral', 'ADS'],
            description='Clock:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Use Coral clock (very precise, not very timely)',
                      'Use ADS clock (not very precise, very timely)'],
        )

        self.b_power_line = widgets.ToggleButtons(
            options=['60 Hz', '50 Hz'],
            description='Power line:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['North America 60 Hz',
                      'Europe 50 Hz'],
        )

        self.b_custom_fir = widgets.ToggleButtons(
            options=['Default', 'Custom'],
            description='FIR filter:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Use the default 30Hz low-pass FIR from the Portiloop paper',
                      'Use a custom FIR'],
        )

        self.b_filename = widgets.Text(
            value='recording.edf',
            description='Recording:',
            disabled=False
        )

        self.b_frequency = widgets.IntText(
            value=self.frequency,
            description='Freq (Hz):',
            disabled=False
        )

        self.b_polyak_mean = widgets.FloatText(
            value=self.polyak_mean,
            description='Polyak mean:',
            disabled=False
        )

        self.b_polyak_std = widgets.FloatText(
            value=self.polyak_std,
            description='Polyak std:',
            disabled=False
        )

        self.b_epsilon = widgets.FloatText(
            value=self.epsilon,
            description='Epsilon:',
            disabled=False
        )

        self.b_custom_fir_order = widgets.IntText(
            value=self.custom_fir_order,
            description='FIR order:',
            disabled=True
        )

        self.b_custom_fir_cutoff = widgets.IntText(
            value=self.custom_fir_cutoff,
            description='FIR cutoff:',
            disabled=True
        )

        self.b_accordion_filter = widgets.Accordion(
            children=[
                widgets.VBox([
                    self.b_custom_fir,
                    self.b_custom_fir_order,
                    self.b_custom_fir_cutoff,
                    self.b_polyak_mean,
                    self.b_polyak_std,
                    self.b_epsilon
                ])
            ])
        self.b_accordion_filter.set_title(index = 0, title = 'Filtering')

        self.b_duration = widgets.IntText(
            value=self.duration,
            description='Time (s):',
            disabled=False
        )

        self.b_filter = widgets.Checkbox(
            value=self.filter,
            description='Filter',
            disabled=False,
            indent=False
        )

        self.b_record = widgets.Checkbox(
            value=self.record,
            description='Record EDF',
            disabled=False,
            indent=False
        )

        self.b_lsl = widgets.Checkbox(
            value=self.lsl,
            description='Stream LSL',
            disabled=False,
            indent=False
        )

        self.b_display = widgets.Checkbox(
            value=self.display,
            description='Display',
            disabled=False,
            indent=False
        )

        # CALLBACKS ----------------------

        self.b_capture.observe(self.on_b_capture, 'value')
        self.b_clock.observe(self.on_b_clock, 'value')
        self.b_frequency.observe(self.on_b_frequency, 'value')
        self.b_duration.observe(self.on_b_duration, 'value')
        self.b_filter.observe(self.on_b_filter, 'value')
        self.b_record.observe(self.on_b_record, 'value')
        self.b_lsl.observe(self.on_b_lsl, 'value')
        self.b_display.observe(self.on_b_display, 'value')
        self.b_filename.observe(self.on_b_filename, 'value')
        self.b_radio_ch2.observe(self.on_b_radio_ch2, 'value')
        self.b_radio_ch3.observe(self.on_b_radio_ch3, 'value')
        self.b_radio_ch4.observe(self.on_b_radio_ch4, 'value')
        self.b_radio_ch5.observe(self.on_b_radio_ch5, 'value')
        self.b_radio_ch6.observe(self.on_b_radio_ch6, 'value')
        self.b_radio_ch7.observe(self.on_b_radio_ch7, 'value')
        self.b_power_line.observe(self.on_b_power_line, 'value')
        self.b_custom_fir.observe(self.on_b_custom_fir, 'value')
        self.b_custom_fir_order.observe(self.on_b_custom_fir_order, 'value')
        self.b_custom_fir_cutoff.observe(self.on_b_custom_fir_cutoff, 'value')
        self.b_polyak_mean.observe(self.on_b_polyak_mean, 'value')
        self.b_polyak_std.observe(self.on_b_polyak_std, 'value')
        self.b_epsilon.observe(self.on_b_epsilon, 'value')

        self.display_buttons()

    @classmethod
    def _make_channel_radio(cls, disabled):
        """Build the radio-button group selecting the mode of one channel."""
        return widgets.RadioButtons(
            options=list(cls._CHANNEL_OPTIONS),
            value='disabled',
            disabled=disabled
        )

    def __del__(self):
        self.b_capture.close()

    def display_buttons(self):
        """Display all the widgets in the notebook output."""
        display(widgets.VBox([self.b_accordion_channels,
                              self.b_frequency,
                              self.b_duration,
                              self.b_filename,
                              self.b_power_line,
                              self.b_clock,
                              widgets.HBox([self.b_filter, self.b_record, self.b_lsl, self.b_display]),
                              self.b_accordion_filter,
                              self.b_capture]))

    def _set_controls_disabled(self, disabled):
        """Set ``disabled`` on every control that is locked during a capture
        (the custom-FIR parameter fields are handled by the callers)."""
        controls = (
            self.b_frequency,
            self.b_duration,
            self.b_filename,
            self.b_filter,
            self.b_record,
            self.b_lsl,
            self.b_display,
            self.b_clock,
            self.b_radio_ch2,
            self.b_radio_ch3,
            self.b_radio_ch4,
            self.b_radio_ch5,
            self.b_radio_ch6,
            self.b_radio_ch7,
            self.b_power_line,
            self.b_polyak_mean,
            self.b_polyak_std,
            self.b_epsilon,
            self.b_custom_fir,
        )
        for control in controls:
            control.disabled = disabled

    def enable_buttons(self):
        """Unlock the controls; FIR parameter fields only when a custom FIR is selected."""
        self._set_controls_disabled(False)
        self.b_custom_fir_order.disabled = not self.custom_fir
        self.b_custom_fir_cutoff.disabled = not self.custom_fir

    def disable_buttons(self):
        """Lock the controls (used while a capture is running)."""
        self._set_controls_disabled(True)
        self.b_custom_fir_order.disabled = True
        self.b_custom_fir_cutoff.disabled = True

    def on_b_radio_ch2(self, value):
        self.channel_states[1] = value['new']

    def on_b_radio_ch3(self, value):
        self.channel_states[2] = value['new']

    def on_b_radio_ch4(self, value):
        self.channel_states[3] = value['new']

    def on_b_radio_ch5(self, value):
        self.channel_states[4] = value['new']

    def on_b_radio_ch6(self, value):
        self.channel_states[5] = value['new']

    def on_b_radio_ch7(self, value):
        self.channel_states[6] = value['new']

    def on_b_capture(self, value):
        """Start the capture thread on 'Start'; request stop and join it on 'Stop'."""
        val = value['new']
        if val == 'Start':
            clear_output()
            self.disable_buttons()
            if not self.python_clock:  # ADS clock: force the frequency to an ADS-compatible frequency
                self.frequency = to_ads_frequency(self.frequency)
                self.b_frequency.value = self.frequency
            self.display_buttons()
            with self._lock_msg_out:
                self._msg_out = None
            if self._t_capture is not None:
                warnings.warn("Capture already running, operation aborted.")
                return
            self._t_capture = Thread(target=self.start_capture,
                                     args=(self.filter,
                                           self.record,
                                           self.lsl,
                                           self.display,
                                           500,
                                           self.python_clock))
            self._t_capture.start()
        elif val == 'Stop':
            with self._lock_msg_out:
                self._msg_out = 'STOP'
            assert self._t_capture is not None
            self._t_capture.join()
            self._t_capture = None
            self.enable_buttons()

    def on_b_custom_fir(self, value):
        val = value['new']
        if val == 'Default':
            self.custom_fir = False
        elif val == 'Custom':
            self.custom_fir = True
        # refreshes the enabled state of the FIR order / cutoff fields
        self.enable_buttons()

    def on_b_clock(self, value):
        val = value['new']
        if val == 'Coral':
            self.python_clock = True
        elif val == 'ADS':
            self.python_clock = False

    def on_b_power_line(self, value):
        val = value['new']
        if val == '60 Hz':
            self.power_line = 60
        elif val == '50 Hz':
            self.power_line = 50

    def on_b_frequency(self, value):
        val = value['new']
        if val > 0:
            self.frequency = val
        else:
            # reject the value and restore the previous one in the widget
            self.b_frequency.value = self.frequency

    def on_b_filename(self, value):
        val = value['new']
        if val != '':
            if not val.endswith('.edf'):
                val += '.edf'
            self.filename = EDF_PATH / val
        else:
            self.filename = EDF_PATH / 'recording.edf'

    def on_b_duration(self, value):
        val = value['new']
        if val > 0:
            self.duration = val

    def on_b_custom_fir_order(self, value):
        val = value['new']
        if val > 0:
            self.custom_fir_order = val
        else:
            self.b_custom_fir_order.value = self.custom_fir_order

    def on_b_custom_fir_cutoff(self, value):
        val = value['new']
        # the cutoff must stay strictly below half the sampling frequency
        if val > 0 and val < self.frequency / 2:
            self.custom_fir_cutoff = val
        else:
            self.b_custom_fir_cutoff.value = self.custom_fir_cutoff

    def on_b_polyak_mean(self, value):
        val = value['new']
        if val >= 0 and val <= 1:
            self.polyak_mean = val
        else:
            self.b_polyak_mean.value = self.polyak_mean

    def on_b_polyak_std(self, value):
        val = value['new']
        if val >= 0 and val <= 1:
            self.polyak_std = val
        else:
            self.b_polyak_std.value = self.polyak_std

    def on_b_epsilon(self, value):
        val = value['new']
        if val > 0 and val < 0.1:
            self.epsilon = val
        else:
            self.b_epsilon.value = self.epsilon

    def on_b_filter(self, value):
        val = value['new']
        self.filter = val

    def on_b_record(self, value):
        val = value['new']
        self.record = val

    def on_b_lsl(self, value):
        val = value['new']
        self.lsl = val

    def on_b_display(self, value):
        val = value['new']
        self.display = val

    def open_recording_file(self):
        """Open the EDF writer on ``self.filename`` and configure every signal."""
        # the frequency may have changed since construction: keep the EDF
        # sample frequency in sync and drop samples of a previous recording
        self.samples_per_datarecord_array = self.frequency
        self.edf_buffer = []

        nb_signals = self.nb_signals
        samples_per_datarecord_array = self.samples_per_datarecord_array
        physical_max = self.physical_max
        physical_min = self.physical_min
        signal_labels = self.signal_labels

        self.filename.parent.mkdir(parents=True, exist_ok=True)
        print(f"Will store edf recording in {self.filename}")

        self.edf_writer = EDFwriter(p_path=str(self.filename),
                                    f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
                                    number_of_signals=nb_signals)

        for signal in range(nb_signals):
            assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0
            assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0
            assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0
            assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0
            assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0
            assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0
            assert self.edf_writer.setPhysicalDimension(signal, 'V') == 0

    def close_recording_file(self):
        """Close the EDF writer."""
        assert self.edf_writer.close() == 0

    def add_recording_data(self, data):
        """Buffer ``data`` (list of datapoints) and write every complete data record.

        A data record holds ``samples_per_datarecord_array`` samples per signal;
        leftover samples stay in ``self.edf_buffer`` for the next call.
        """
        self.edf_buffer += data
        record_len = self.samples_per_datarecord_array
        while len(self.edf_buffer) >= record_len:
            datarecord_array = self.edf_buffer[:record_len]
            self.edf_buffer = self.edf_buffer[record_len:]
            # (samples, signals) -> (signals, samples)
            datarecord_array = np.array(datarecord_array).transpose()
            assert len(datarecord_array) == self.nb_signals, f"len(data)={len(datarecord_array)}!={self.nb_signals}"
            for d in datarecord_array:
                assert len(d) == record_len, f"{len(d)}!={record_len}"
                assert self.edf_writer.writeSamples(d) == 0

    def start_capture(self,
                      filter,
                      record,
                      lsl,
                      viz,
                      width,
                      python_clock):
        """Run a capture until the child process reports 'STOP'.

        Spawns _capture_process, then loops: forwards pending outgoing
        messages, receives datapoints, converts/filters them, and dispatches
        them to LSL, the live display and the EDF recording.

        Args:
            filter: bool: apply the FilterPipeline
            record: bool: write the data to the EDF file
            lsl: bool: stream the data with LSL
            viz: bool: show the live display
            width: int: window length of the live display
            python_clock: bool: forwarded to _capture_process
        """
        if self.__capture_on:
            warnings.warn("Capture is already ongoing, ignoring command.")
            return
        else:
            self.__capture_on = True
        p_msg_io, p_msg_io_2 = mp.Pipe()
        p_data_i, p_data_o = mp.Pipe(duplex=False)
        SAMPLE_TIME = 1 / self.frequency

        if filter:
            # use the settings selected in the UI
            fp = FilterPipeline(nb_channels=8,
                                sampling_rate=self.frequency,
                                power_line_fq=self.power_line,
                                use_custom_fir=self.custom_fir,
                                custom_fir_order=self.custom_fir_order,
                                custom_fir_cutoff=self.custom_fir_cutoff,
                                alpha_avg=self.polyak_mean,
                                alpha_std=self.polyak_std,
                                epsilon=self.epsilon)

        self._p_capture = mp.Process(target=_capture_process,
                                     args=(p_data_o,
                                           p_msg_io_2,
                                           self.duration,
                                           self.frequency,
                                           python_clock,
                                           1.0,
                                           self.channel_states)
                                     )
        self._p_capture.start()

        if viz:
            live_disp = LiveDisplay(channel_names = self.signal_labels, window_len=width)

        if record:
            self.open_recording_file()

        if lsl:
            from pylsl import StreamInfo, StreamOutlet
            lsl_info = StreamInfo(name='Portiloop',
                                  type='EEG',
                                  channel_count=8,
                                  channel_format='float32',
                                  source_id='')  # TODO: replace this by unique device identifier
            lsl_outlet = StreamOutlet(lsl_info)

        buffer = []

        while True:
            with self._lock_msg_out:
                if self._msg_out is not None:
                    p_msg_io.send(self._msg_out)
                    self._msg_out = None
            if p_msg_io.poll():
                mess = p_msg_io.recv()
                if mess == 'STOP':
                    break
                elif mess[0] == 'PRT':
                    print(mess[1])

            # retrieve one data point from p_data (wait at most one sample period):
            point = None
            if p_data_i.poll(timeout=SAMPLE_TIME):
                point = p_data_i.recv()
            else:
                continue

            n_array = np.array([point])
            n_array = filter_np(n_array)

            if filter:
                n_array = fp.filter(n_array)

            filtered_point = n_array.tolist()

            if lsl:
                lsl_outlet.push_sample(filtered_point[-1])

            buffer += filtered_point
            # display / record in batches of 50 datapoints
            if len(buffer) >= 50:
                if viz:
                    live_disp.add_datapoints(buffer)
                if record:
                    self.add_recording_data(buffer)
                buffer = []

        # empty pipes
        while True:
            if p_data_i.poll():
                _ = p_data_i.recv()
            elif p_msg_io.poll():
                _ = p_msg_io.recv()
            else:
                break

        p_data_i.close()
        p_msg_io.close()

        if record:
            self.close_recording_file()

        self._p_capture.join()
        self.__capture_on = False


if __name__ == "__main__":
    pass