"""Record three USB microphones in parallel and collect labelled samples.

MEANT TO BE RUN ON THE RASPBERRY PI.

Running the script does three things in order:

1. records a short clip while the buzzer beeps and derives per-microphone
   delays (in frames) from the position of the beep in each file,
2. records the user tapping the microphones and prints the detected order,
3. walks through the data-collection protocol in ``collect_data``.
"""

import contextlib
import os
import queue
import sys
import time

import matplotlib.pyplot as plt  # noqa: F401  (only used for ad-hoc debug plots)
import numpy as np
import sounddevice as sd
import soundfile as sf
from gpiozero import Buzzer
from scipy import signal
from scipy.io import wavfile

buzzer = Buzzer(2)

# Default output files used by the sync and ordering recordings.
_DEFAULT_FILES = ("rec1.wav", "rec2.wav", "rec3.wav")

# Minimum sample amplitude for a sample to count as a peak.
_PEAK_HEIGHT = 7000

# option to delete old files (only the ones that are actually present)
_stale_files = [name for name in _DEFAULT_FILES if os.path.exists(name)]
if _stale_files and input("Delete old files? (y/n)") == "y":
    for _name in _stale_files:
        os.remove(_name)

q1 = queue.Queue()
q2 = queue.Queue()
q3 = queue.Queue()

# has to be 48000 for some reason otherwise crashes with pipewire
fs = 48000  # Hz
sd.default.samplerate = fs
print(sd.query_devices())


def _enqueue_block(target_queue, indata, status):
    """Report a stream status (if any) and queue a copy of the audio block."""
    if status:
        print(status, file=sys.stderr)
    # The buffer is copied before queueing, as the original callbacks did.
    target_queue.put(indata.copy())


# functions for the instantiated object to run every loop
def callback1(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block."""
    _enqueue_block(q1, indata, status)


def callback2(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block."""
    _enqueue_block(q2, indata, status)


def callback3(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block."""
    _enqueue_block(q3, indata, status)


def _flush_queues():
    """Discard any audio blocks still waiting in the three capture queues."""
    for pending in (q1, q2, q3):
        with pending.mutex:
            pending.queue.clear()


def _remove_default_recordings():
    """Delete the three default recording files."""
    for name in _DEFAULT_FILES:
        os.remove(name)


def _load_peaks(path):
    """Return the indices of all peaks of at least _PEAK_HEIGHT in a wav file."""
    samples = wavfile.read(path)[1]
    peaks, _ = signal.find_peaks(samples, height=_PEAK_HEIGHT)
    return peaks


def _first_peak_after(peaks, seconds):
    """Return the first peak later than ``seconds``, else the last peak."""
    for frame in peaks:
        if frame / fs > seconds:
            return frame
    return peaks[-1]


# calculate delay for the buzzer syncing
def calc_delay(file1, file2, file3):
    """Return, per file, how many frames it leads the latest buzzer peak.

    For each file the first peak later than one second is used (the buzzer
    is switched on one second into the sync recording); if no peak is that
    late, the last peak is used instead.

    Returns:
        A list of three frame counts; the file whose peak is latest gets 0.

    Raises:
        ValueError: if any file contains no peak above the threshold.
    """
    peaks_per_file = [_load_peaks(path) for path in (file1, file2, file3)]
    if any(peaks.size == 0 for peaks in peaks_per_file):
        raise ValueError("No peak found, change the height value")

    frames = [_first_peak_after(peaks, 1) for peaks in peaks_per_file]
    latest = max(frames)
    return [latest - frame for frame in frames]


# find order of microphones
def find_order(file1, file2, file3):
    """Return the microphone numbers (1-3) sorted by mean peak position.

    Raises:
        ValueError: if any file contains no peak above the threshold.
    """
    peaks_per_file = [_load_peaks(path) for path in (file1, file2, file3)]
    if any(peaks.size == 0 for peaks in peaks_per_file):
        raise ValueError(
            "No peak found, change the height value or try again but louder"
        )

    # Debugging aid: plot each signal with plt.plot(data) and its peaks with
    # plt.plot(peaks, data[peaks], "x"), then plt.savefig("flick.png").

    # TODO peak functions return an array so you need to find the flicks only somehow
    mean_position = {
        mic: np.average(peaks) for mic, peaks in enumerate(peaks_per_file, start=1)
    }
    return sorted(mean_position, key=mean_position.get)


# record function
def record(
    buzzer_sync=False,
    order_mics=False,
    duration=0.0,
    delay=(0, 0, 0),
    file_name_1="rec1.wav",
    file_name_2="rec2.wav",
    file_name_3="rec3.wav",
):
    """Record input devices 1-3 into three mono wav files.

    Args:
        buzzer_sync: sound the buzzer from 1.0 s to 1.2 s into the recording
            and return the delays computed by ``calc_delay``.
        order_mics: after recording, print the order from ``find_order``.
        duration: recording length in seconds; 0.0 records until Ctrl+C.
        delay: number of silent frames to prepend to each of the three files.
        file_name_1, file_name_2, file_name_3: output paths. The files are
            opened with mode "x", so they must not exist yet.

    Returns:
        The list from ``calc_delay`` when ``buzzer_sync`` is set, else None.
    """
    file_names = (file_name_1, file_name_2, file_name_3)
    sources = ((1, callback1, q1), (2, callback2, q2), (3, callback3, q3))

    try:
        with contextlib.ExitStack() as stack:
            files = [
                stack.enter_context(
                    sf.SoundFile(name, mode="x", samplerate=fs, channels=1)
                )
                for name in file_names
            ]
            # TODO the recorded length is inconsistent during data collection;
            # the clock starts before the padding is written and the streams
            # are opened, which may be the cause.
            start_time = time.time()

            # Prepend the sync padding as one block per file instead of one
            # write call per sample.
            for out_file, pad_frames in zip(files, delay):
                if pad_frames > 0:
                    out_file.write(np.zeros(pad_frames))

            for device, callback, _ in sources:
                stack.enter_context(
                    sd.InputStream(
                        samplerate=fs, device=device, channels=1, callback=callback
                    )
                )

            if duration != 0.0:
                print("Recording for " + str(duration) + " seconds")
            else:
                print("press Ctrl+C to stop the recording")

            while True:
                dur_time = time.time() - start_time
                for out_file, (_, _, source_queue) in zip(files, sources):
                    out_file.write(source_queue.get())

                if buzzer_sync:
                    if dur_time >= 1.2:
                        buzzer.off()
                    elif dur_time >= 1:
                        buzzer.on()

                if duration != 0.0 and dur_time >= duration:
                    break
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to end an open-ended recording.
        pass
    finally:
        if buzzer_sync:
            # Never leave the buzzer sounding if the recording ended mid-beep.
            buzzer.off()
        # The streams are closed here, so leftover blocks can be dropped
        # and will not leak into the next recording.
        _flush_queues()

    print("\nRecording finished: ")

    # find peak
    if buzzer_sync:
        return calc_delay(file_name_1, file_name_2, file_name_3)

    if order_mics:
        print("Order: ", find_order(file_name_1, file_name_2, file_name_3))
    return None


# Data-collection protocol: every location is combined with every background
# and each combination is recorded with the ten takes below.
_LOCATIONS = (
    "my room",
    "living room",
    "living room coffee table",
    "other room",
)

# NOTE: the original code also named "random coughing or clapping", but its
# loop never reached it; the six backgrounds below match the announced total.
_BACKGROUNDS = (
    "nothing",
    "one music",
    "one talking",
    "one talking and one music",
    "two talking",
    "two music",
)

# (prompt shown to the user, label used in the file name: "t" or "f")
_TAKES = (
    ("hey at position 1", "t"),
    ("hey at position 1.5", "t"),
    ("hey at position 2", "t"),
    ("hey at position 2.5", "t"),
    ("hey at position 3", "t"),
    ("hey at position 3.5 or 0.5", "t"),
    ("nothing", "f"),
    ("nothing", "f"),
    ("random noise/talking at any position", "f"),
    ("random noise/talking at any position", "f"),
)


# data collection
def collect_data(delay_times):
    """Interactively record the full data set, three seconds per take.

    Files are named ``rec_<no>_<a|b|c>_<t|f>.wav``. A take whose "a" file
    already exists is skipped, so an interrupted session can be resumed.

    Args:
        delay_times: per-microphone padding (frames) passed on to ``record``.
    """
    total = len(_LOCATIONS) * len(_BACKGROUNDS) * len(_TAKES)
    no = 0
    for location in _LOCATIONS:
        for background in _BACKGROUNDS:
            for prompt, label in _TAKES:
                # skip if file already exists
                if os.path.exists(f"rec_{no}_a_{label}.wav"):
                    no += 1
                    continue

                # buzzer beep
                buzzer.on()
                time.sleep(0.2)
                buzzer.off()

                print("#" * 80)
                print("recording no. : " + str(no) + "/" + str(total))
                print("location: " + location)
                print("background: " + background)
                input(f"enter to start, {prompt}:")
                buzzer.on()
                time.sleep(0.1)
                buzzer.off()
                print("#" * 10 + "recording started" + "#" * 10)
                # record() drains the capture queues itself when it finishes.
                record(
                    duration=3,
                    file_name_1=f"rec_{no}_a_{label}.wav",
                    file_name_2=f"rec_{no}_b_{label}.wav",
                    file_name_3=f"rec_{no}_c_{label}.wav",
                    delay=delay_times,
                )
                no += 1


# sync record
print("#" * 80)
print("calculating delay times")
delay_times = record(buzzer_sync=True, duration=1.5)
print("Delay times: ", delay_times)
_remove_default_recordings()
print("#" * 80)

print("#" * 80)
print("calculating microphone locations")
print("tap the microphones in order then ^c")
# seems somewhat optional between uses, but can't guarantee anything with USB
record(
    order_mics=True,
    delay=delay_times,
)
_remove_default_recordings()
print("#" * 80)

collect_data(delay_times)