Spaces:
Sleeping
Sleeping
File size: 3,909 Bytes
a64c5cc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
from EDFlib.edfwriter import EDFwriter
from portilooplot.jupyter_plot import ProgressPlot
from pathlib import Path
import numpy as np
EDF_PATH = Path.home() / 'workspace' / 'edf_recording'
class DummyAlsaMixer:
def __init__(self):
self.volume = 50
def getvolume(self):
return [self.volume]
def setvolume(self, volume):
self.volume = volume
class EDFRecorder:
def __init__(self, signal_labels):
self.filename = EDF_PATH / 'recording.edf'
self.nb_signals = 8
self.samples_per_datarecord_array = self.frequency
self.physical_max = 5
self.physical_min = -5
self.signal_labels = signal_labels
def open_recording_file(self):
nb_signals = self.nb_signals
samples_per_datarecord_array = self.samples_per_datarecord_array
physical_max = self.physical_max
physical_min = self.physical_min
signal_labels = self.signal_labels
print(f"Will store edf recording in {self.filename}")
self.edf_writer = EDFwriter(p_path=str(self.filename),
f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
number_of_signals=nb_signals)
for signal in range(nb_signals):
assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0
assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0
assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0
assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0
assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0
assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0
assert self.edf_writer.setPhysicalDimension(signal, 'V') == 0
def close_recording_file(self):
assert self.edf_writer.close() == 0
def add_recording_data(self, data):
self.edf_buffer += data
if len(self.edf_buffer) >= self.samples_per_datarecord_array:
datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array]
self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:]
datarecord_array = np.array(datarecord_array).transpose()
assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}"
for d in datarecord_array:
assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}"
assert self.edf_writer.writeSamples(d) == 0
class LiveDisplay():
def __init__(self, channel_names, window_len=100):
self.datapoint_dim = len(channel_names)
self.history = []
self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
self.matplotlib = False
def add_datapoints(self, datapoints):
"""
Adds 8 lists of datapoints to the plot
Args:
datapoints: list of 8 lists of floats (or list of 8 floats)
"""
if self.matplotlib:
import matplotlib.pyplot as plt
disp_list = []
for datapoint in datapoints:
d = [[elt] for elt in datapoint]
disp_list.append(d)
if self.matplotlib:
self.history += d[1]
if not self.matplotlib:
self.pp.update_with_datapoints(disp_list)
elif len(self.history) == 1000:
plt.plot(self.history)
plt.show()
self.history = []
def add_datapoint(self, datapoint):
disp_list = [[elt] for elt in datapoint]
self.pp.update(disp_list)
class FileReader:
def __init__(self, filename):
raise NotImplementedError
def get_point(self):
raise NotImplementedError
|