Spaces:
Sleeping
Sleeping
from EDFlib.edfwriter import EDFwriter | |
from portilooplot.jupyter_plot import ProgressPlot | |
from pathlib import Path | |
import numpy as np | |
EDF_PATH = Path.home() / 'workspace' / 'edf_recording' | |
class DummyAlsaMixer: | |
def __init__(self): | |
self.volume = 50 | |
def getvolume(self): | |
return [self.volume] | |
def setvolume(self, volume): | |
self.volume = volume | |
class EDFRecorder: | |
def __init__(self, signal_labels): | |
self.filename = EDF_PATH / 'recording.edf' | |
self.nb_signals = 8 | |
self.samples_per_datarecord_array = self.frequency | |
self.physical_max = 5 | |
self.physical_min = -5 | |
self.signal_labels = signal_labels | |
def open_recording_file(self): | |
nb_signals = self.nb_signals | |
samples_per_datarecord_array = self.samples_per_datarecord_array | |
physical_max = self.physical_max | |
physical_min = self.physical_min | |
signal_labels = self.signal_labels | |
print(f"Will store edf recording in {self.filename}") | |
self.edf_writer = EDFwriter(p_path=str(self.filename), | |
f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS, | |
number_of_signals=nb_signals) | |
for signal in range(nb_signals): | |
assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0 | |
assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0 | |
assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0 | |
assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0 | |
assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0 | |
assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0 | |
assert self.edf_writer.setPhysicalDimension(signal, 'V') == 0 | |
def close_recording_file(self): | |
assert self.edf_writer.close() == 0 | |
def add_recording_data(self, data): | |
self.edf_buffer += data | |
if len(self.edf_buffer) >= self.samples_per_datarecord_array: | |
datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array] | |
self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:] | |
datarecord_array = np.array(datarecord_array).transpose() | |
assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}" | |
for d in datarecord_array: | |
assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}" | |
assert self.edf_writer.writeSamples(d) == 0 | |
class LiveDisplay(): | |
def __init__(self, channel_names, window_len=100): | |
self.datapoint_dim = len(channel_names) | |
self.history = [] | |
self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len) | |
self.matplotlib = False | |
def add_datapoints(self, datapoints): | |
""" | |
Adds 8 lists of datapoints to the plot | |
Args: | |
datapoints: list of 8 lists of floats (or list of 8 floats) | |
""" | |
if self.matplotlib: | |
import matplotlib.pyplot as plt | |
disp_list = [] | |
for datapoint in datapoints: | |
d = [[elt] for elt in datapoint] | |
disp_list.append(d) | |
if self.matplotlib: | |
self.history += d[1] | |
if not self.matplotlib: | |
self.pp.update_with_datapoints(disp_list) | |
elif len(self.history) == 1000: | |
plt.plot(self.history) | |
plt.show() | |
self.history = [] | |
def add_datapoint(self, datapoint): | |
disp_list = [[elt] for elt in datapoint] | |
self.pp.update(disp_list) | |
class FileReader: | |
def __init__(self, filename): | |
raise NotImplementedError | |
def get_point(self): | |
raise NotImplementedError | |