File size: 3,909 Bytes
a64c5cc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from EDFlib.edfwriter import EDFwriter
from portilooplot.jupyter_plot import ProgressPlot
from pathlib import Path
import numpy as np

EDF_PATH = Path.home() / 'workspace' / 'edf_recording'



class DummyAlsaMixer:
    """In-memory volume holder exposing ``getvolume`` / ``setvolume``.

    NOTE(review): the name suggests a stand-in for a real ALSA mixer object
    when no audio hardware is available -- confirm against the callers.
    """

    def __init__(self):
        # The level starts at 50 until a caller overrides it.
        self.volume = 50

    def getvolume(self):
        """Return the stored volume wrapped in a one-element list."""
        current_level = self.volume
        return [current_level]

    def setvolume(self, volume):
        """Store ``volume`` as the new level; the value is not validated."""
        self.volume = volume


class EDFRecorder:
    """Buffer multi-channel samples and write them to an EDF+ file.

    The output file is ``EDF_PATH / 'recording.edf'``. Samples are
    accumulated in ``edf_buffer`` and written through ``EDFwriter`` one full
    data record (``samples_per_datarecord_array`` samples per signal) at a
    time.
    """

    def __init__(self, signal_labels, frequency=250):
        """Set up the recording parameters; no file is opened here.

        Args:
            signal_labels: Sequence of labels indexed by signal number; the
                first ``nb_signals`` entries are used when the file is opened.
            frequency: Positive integer passed to
                ``EDFwriter.setSampleFrequency`` and used as the number of
                samples per data record.
                NOTE(review): the default of 250 is an assumption about the
                acquisition rate -- confirm against the capture code.

        Raises:
            ValueError: If ``frequency`` is not strictly positive.
        """
        if frequency <= 0:
            raise ValueError(f"frequency must be positive, got {frequency}")
        self.filename = EDF_PATH / 'recording.edf'
        self.nb_signals = 8
        # Previously read before assignment, which made construction fail.
        self.frequency = frequency
        self.samples_per_datarecord_array = self.frequency
        self.physical_max = 5
        self.physical_min = -5
        self.signal_labels = signal_labels
        # Samples waiting for a full data record; each entry is one time step
        # holding one value per signal.
        self.edf_buffer = []

    @staticmethod
    def _check(status, action):
        """Raise ``AssertionError`` when an EDFwriter call returns non-zero.

        An explicit raise is used instead of ``assert`` so the EDFwriter call
        is still executed when Python runs with ``-O`` (which strips assert
        statements together with the expression they wrap). The exception
        type is kept so existing callers see the same error class.
        """
        if status != 0:
            raise AssertionError(f"EDFwriter.{action} failed with status {status}")

    def open_recording_file(self):
        """Create the EDF+ writer and configure every signal.

        Raises:
            AssertionError: If any EDFwriter setter reports a non-zero status.
        """
        print(f"Will store edf recording in {self.filename}")

        self.edf_writer = EDFwriter(p_path=str(self.filename),
                                    f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
                                    number_of_signals=self.nb_signals)
        writer = self.edf_writer

        for signal in range(self.nb_signals):
            self._check(writer.setSampleFrequency(signal, self.samples_per_datarecord_array),
                        "setSampleFrequency")
            self._check(writer.setPhysicalMaximum(signal, self.physical_max), "setPhysicalMaximum")
            self._check(writer.setPhysicalMinimum(signal, self.physical_min), "setPhysicalMinimum")
            # Digital range is the full signed 16-bit span.
            self._check(writer.setDigitalMaximum(signal, 32767), "setDigitalMaximum")
            self._check(writer.setDigitalMinimum(signal, -32768), "setDigitalMinimum")
            self._check(writer.setSignalLabel(signal, self.signal_labels[signal]), "setSignalLabel")
            self._check(writer.setPhysicalDimension(signal, 'V'), "setPhysicalDimension")

    def close_recording_file(self):
        """Close the EDF writer.

        Samples still held in ``edf_buffer`` (less than one full data record)
        are not written.

        Raises:
            AssertionError: If ``EDFwriter.close`` reports a non-zero status.
        """
        self._check(self.edf_writer.close(), "close")

    def add_recording_data(self, data):
        """Queue samples and write every complete data record to the file.

        Args:
            data: Iterable of time steps, each holding ``nb_signals`` values.

        Raises:
            AssertionError: If a record does not contain ``nb_signals``
                signals of ``samples_per_datarecord_array`` samples each, or
                if ``EDFwriter.writeSamples`` reports a non-zero status.
        """
        self.edf_buffer.extend(data)
        record_len = self.samples_per_datarecord_array
        # Drain all complete records, not just one, so a large chunk does not
        # leave the buffer permanently behind.
        while len(self.edf_buffer) >= record_len:
            record = self.edf_buffer[:record_len]
            self.edf_buffer = self.edf_buffer[record_len:]
            # (time, signal) -> (signal, time): writeSamples is called once
            # per signal with that signal's samples for the record.
            per_signal = np.array(record).transpose()
            if len(per_signal) != self.nb_signals:
                raise AssertionError(
                    f"number of signals in data={len(per_signal)}!={self.nb_signals}")
            for samples in per_signal:
                if len(samples) != record_len:
                    raise AssertionError(f"{len(samples)}!={record_len}")
                self._check(self.edf_writer.writeSamples(samples), "writeSamples")


class LiveDisplay():
    """Push incoming datapoints to a ``ProgressPlot`` live display.

    When the ``matplotlib`` attribute is set to True, the ProgressPlot is
    bypassed: the value at index 1 of each datapoint is accumulated in
    ``history`` and drawn with matplotlib once enough values are collected.
    """

    # Number of accumulated history values that triggers a matplotlib plot.
    HISTORY_PLOT_LEN = 1000

    def __init__(self, channel_names, window_len=100):
        """
        Args:
            channel_names: Names forwarded to ``ProgressPlot`` as
                ``plot_names``; their count is stored in ``datapoint_dim``.
            window_len: Forwarded to ``ProgressPlot`` as ``max_window_len``.
        """
        self.datapoint_dim = len(channel_names)
        self.history = []
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
        self.matplotlib = False

    def add_datapoints(self, datapoints):
        """
        Adds a batch of datapoints to the plot

        Args:
            datapoints: iterable of datapoints, each an iterable of values
                (one per channel). Every value is wrapped in its own
                one-element list before being handed to the plot.
        """
        disp_list = [[[elt] for elt in datapoint] for datapoint in datapoints]

        if not self.matplotlib:
            self.pp.update_with_datapoints(disp_list)
            return

        # Imported lazily so matplotlib is only needed in this mode.
        import matplotlib.pyplot as plt

        for wrapped in disp_list:
            # Only the value at index 1 of each datapoint is kept.
            self.history.extend(wrapped[1])

        # ">=" rather than "==": a batch can step over the threshold, and an
        # exact match would then never fire, letting history grow forever.
        if len(self.history) >= self.HISTORY_PLOT_LEN:
            plt.plot(self.history)
            plt.show()
            self.history = []

    def add_datapoint(self, datapoint):
        """Add a single datapoint (iterable of values) to the ProgressPlot.

        Each value is wrapped in its own one-element list before the update.
        """
        disp_list = [[elt] for elt in datapoint]
        self.pp.update(disp_list)


class FileReader:
    """Placeholder for a file-backed reader; nothing is implemented yet."""

    def __init__(self, filename):
        """Not implemented: always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def get_point(self):
        """Not implemented: always raises ``NotImplementedError``."""
        raise NotImplementedError()