Yann Bouteiller
A lot of reorg and development of phase detection
a64c5cc
raw
history blame
3.91 kB
from EDFlib.edfwriter import EDFwriter
from portilooplot.jupyter_plot import ProgressPlot
from pathlib import Path
import numpy as np
EDF_PATH = Path.home() / 'workspace' / 'edf_recording'
class DummyAlsaMixer:
def __init__(self):
self.volume = 50
def getvolume(self):
return [self.volume]
def setvolume(self, volume):
self.volume = volume
class EDFRecorder:
def __init__(self, signal_labels):
self.filename = EDF_PATH / 'recording.edf'
self.nb_signals = 8
self.samples_per_datarecord_array = self.frequency
self.physical_max = 5
self.physical_min = -5
self.signal_labels = signal_labels
def open_recording_file(self):
nb_signals = self.nb_signals
samples_per_datarecord_array = self.samples_per_datarecord_array
physical_max = self.physical_max
physical_min = self.physical_min
signal_labels = self.signal_labels
print(f"Will store edf recording in {self.filename}")
self.edf_writer = EDFwriter(p_path=str(self.filename),
f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
number_of_signals=nb_signals)
for signal in range(nb_signals):
assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0
assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0
assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0
assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0
assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0
assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0
assert self.edf_writer.setPhysicalDimension(signal, 'V') == 0
def close_recording_file(self):
assert self.edf_writer.close() == 0
def add_recording_data(self, data):
self.edf_buffer += data
if len(self.edf_buffer) >= self.samples_per_datarecord_array:
datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array]
self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:]
datarecord_array = np.array(datarecord_array).transpose()
assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}"
for d in datarecord_array:
assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}"
assert self.edf_writer.writeSamples(d) == 0
class LiveDisplay():
def __init__(self, channel_names, window_len=100):
self.datapoint_dim = len(channel_names)
self.history = []
self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
self.matplotlib = False
def add_datapoints(self, datapoints):
"""
Adds 8 lists of datapoints to the plot
Args:
datapoints: list of 8 lists of floats (or list of 8 floats)
"""
if self.matplotlib:
import matplotlib.pyplot as plt
disp_list = []
for datapoint in datapoints:
d = [[elt] for elt in datapoint]
disp_list.append(d)
if self.matplotlib:
self.history += d[1]
if not self.matplotlib:
self.pp.update_with_datapoints(disp_list)
elif len(self.history) == 1000:
plt.plot(self.history)
plt.show()
self.history = []
def add_datapoint(self, datapoint):
disp_list = [[elt] for elt in datapoint]
self.pp.update(disp_list)
class FileReader:
def __init__(self, filename):
raise NotImplementedError
def get_point(self):
raise NotImplementedError