Spaces:
Sleeping
Sleeping
from scipy.signal import butter, filtfilt, find_peaks | |
from scipy.signal import savgol_filter, find_peaks | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.cluster import KMeans | |
import plotly.graph_objects as go | |
import pandas as pd | |
import numpy as np | |
import librosa | |
import pywt | |
# GENERAL HELPER FUNCTIONS | |
def denoise_audio(audiodata: np.ndarray, sr: int) -> tuple[np.ndarray, int]: | |
""" | |
Enhanced denoising of audio signals optimized for heart sounds. | |
Uses a combination of bandpass filtering, adaptive wavelet denoising, | |
and improved spectral subtraction. | |
Parameters: | |
----------- | |
audiodata : np.ndarray | |
Input audio signal (1D numpy array) | |
sr : int | |
Sampling rate in Hz | |
Returns: | |
-------- | |
tuple[np.ndarray, int] | |
Tuple containing (denoised_signal, sampling_rate) | |
""" | |
# Input validation and conversion | |
if not isinstance(audiodata, np.ndarray) or audiodata.ndim != 1: | |
raise ValueError("audiodata must be a 1D numpy array") | |
if not isinstance(sr, int) or sr <= 0: | |
raise ValueError("sr must be a positive integer") | |
# Convert to float32 and normalize | |
audio = audiodata.astype(np.float32) | |
audio = audio / np.max(np.abs(audio)) | |
# 1. Enhanced Bandpass Filter | |
# Optimize frequency range for heart sounds (20-200 Hz) | |
nyquist = sr / 2 | |
low, high = 20 / nyquist, 200 / nyquist | |
order = 4 # Filter order | |
b, a = butter(order, [low, high], btype='band') | |
filtered = filtfilt(b, a, audio) | |
# 2. Adaptive Wavelet Denoising | |
def apply_wavelet_denoising(sig): | |
# Use sym4 wavelet (good for biomedical signals) | |
wavelet = 'sym4' | |
level = min(6, pywt.dwt_max_level(len(sig), pywt.Wavelet(wavelet).dec_len)) | |
# Decompose signal | |
coeffs = pywt.wavedec(sig, wavelet, level=level) | |
# Adaptive thresholding based on level | |
for i in range(1, len(coeffs)): | |
# Calculate level-dependent threshold | |
sigma = np.median(np.abs(coeffs[i])) / 0.6745 | |
threshold = sigma * np.sqrt(2 * np.log(len(coeffs[i]))) | |
# Adjust threshold based on decomposition level | |
level_factor = 1 - (i / len(coeffs)) # Higher levels get lower thresholds | |
coeffs[i] = pywt.threshold(coeffs[i], threshold * level_factor, mode='soft') | |
return pywt.waverec(coeffs, wavelet) | |
# Apply wavelet denoising | |
denoised = apply_wavelet_denoising(filtered) | |
# Ensure consistent length | |
if len(denoised) != len(audio): | |
denoised = librosa.util.fix_length(denoised, size=len(audio)) | |
# 3. Improved Spectral Subtraction | |
def spectral_subtract(sig): | |
# Parameters | |
frame_length = int(sr * 0.04) # 40ms frames | |
hop_length = frame_length // 2 | |
# Compute STFT | |
D = librosa.stft(sig, n_fft=frame_length, hop_length=hop_length) | |
mag, phase = np.abs(D), np.angle(D) | |
# Estimate noise spectrum from low-energy frames | |
frame_energy = np.sum(mag**2, axis=0) | |
noise_threshold = np.percentile(frame_energy, 15) | |
noise_frames = mag[:, frame_energy < noise_threshold] | |
if noise_frames.size > 0: | |
noise_spectrum = np.median(noise_frames, axis=1) | |
# Oversubtraction factor (frequency-dependent) | |
freq_bins = np.fft.rfftfreq(frame_length, 1/sr) | |
alpha = 1.0 + 0.01 * (freq_bins / nyquist) | |
alpha = alpha[:len(noise_spectrum)].reshape(-1, 1) | |
# Spectral subtraction with flooring | |
mag_clean = np.maximum(mag - alpha * noise_spectrum.reshape(-1, 1), 0.01 * mag) | |
# Reconstruct signal | |
D_clean = mag_clean * np.exp(1j * phase) | |
return librosa.istft(D_clean, hop_length=hop_length) | |
return sig | |
# Apply spectral subtraction | |
final = spectral_subtract(denoised) | |
# Final normalization | |
final = final / np.max(np.abs(final)) | |
return final, sr | |
def getaudiodata(filepath: str, target_sr: int = 16000) -> tuple[int, np.ndarray]: | |
""" | |
Load and process audio data with consistent output properties. | |
Parameters: | |
----------- | |
filepath : str | |
Path to the audio file | |
target_sr : int | |
Target sampling rate (default: 16000 Hz) | |
Returns: | |
-------- | |
tuple[int, np.ndarray] | |
Sampling rate and processed audio data with consistent properties: | |
- dtype: float32 | |
- shape: (N,) mono audio | |
- amplitude range: [-0.95, 0.95] | |
- no NaN or Inf values | |
- C-contiguous memory layout | |
""" | |
# Load audio with specified sampling rate | |
audiodata, sr = librosa.load(filepath, sr=target_sr) | |
# Ensure numpy array | |
audiodata = np.asarray(audiodata) | |
# Convert to mono if stereo | |
if len(audiodata.shape) > 1: | |
audiodata = np.mean(audiodata, axis=1) | |
# Handle any NaN or Inf values | |
audiodata = np.nan_to_num(audiodata, nan=0.0, posinf=0.0, neginf=0.0) | |
# Normalize to prevent clipping while maintaining relative amplitudes | |
max_abs = np.max(np.abs(audiodata)) | |
if max_abs > 0: # Avoid division by zero | |
audiodata = audiodata * (0.95 / max_abs) | |
# Ensure float32 dtype and memory contiguous | |
audiodata = np.ascontiguousarray(audiodata, dtype=np.float32) | |
return sr, audiodata | |
def getBeats(audiodata: np.ndarray, sr: int, method='envelope') -> tuple[float, np.ndarray, np.ndarray]: | |
""" | |
Advanced heartbeat detection optimized for peak detection with improved sensitivity. | |
Parameters: | |
----------- | |
audiodata : np.ndarray | |
Audio time series | |
sr : int | |
Sampling rate | |
method : str | |
Detection method: 'onset', 'envelope', 'fusion' (default) | |
Returns: | |
-------- | |
tempo : float | |
Estimated heart rate in BPM | |
peak_times : np.ndarray | |
Times of detected heartbeat peaks | |
cleaned_audio : np.ndarray | |
Cleaned audio signal | |
""" | |
# Denoise and normalize | |
audiodata, sr = denoise_audio(audiodata, sr) | |
# Normalize to prevent clipping while maintaining relative amplitudes | |
cleaned_audio = audiodata / np.max(np.abs(audiodata)) | |
def get_envelope_peaks(): | |
"""Detect peaks using enhanced envelope method with better sensitivity""" | |
# Calculate envelope using appropriate frame sizes | |
hop_length = int(sr * 0.01) # 10ms hop | |
frame_length = int(sr * 0.04) # 40ms window | |
# Calculate RMS energy | |
rms = librosa.feature.rms( | |
y=cleaned_audio, | |
frame_length=frame_length, | |
hop_length=hop_length | |
)[0] | |
# Smooth the envelope (less aggressive smoothing) | |
rms_smooth = savgol_filter(rms, 7, 3) | |
# Find peaks with more lenient thresholds | |
peaks, properties = find_peaks( | |
rms_smooth, | |
distance=int(0.2 * (sr / hop_length)), # Minimum 0.2s between peaks (300 BPM max) | |
height=np.mean(rms_smooth) + 0.1 * np.std(rms_smooth), # Lower height threshold | |
prominence=np.mean(rms_smooth) * 0.1, # Lower prominence threshold | |
width=(int(0.01 * (sr / hop_length)), int(0.2 * (sr / hop_length))) # 10-200ms width | |
) | |
# Refine peak locations using original signal | |
refined_peaks = [] | |
window_size = int(0.05 * sr) # 50ms window for refinement | |
for peak in peaks: | |
# Convert envelope peak to sample domain | |
sample_idx = peak * hop_length | |
# Define window boundaries | |
start = max(0, sample_idx - window_size//2) | |
end = min(len(cleaned_audio), sample_idx + window_size//2) | |
# Find the maximum amplitude within the window | |
window = np.abs(cleaned_audio[int(start):int(end)]) | |
max_idx = np.argmax(window) | |
refined_peaks.append(start + max_idx) | |
return np.array(refined_peaks), rms_smooth | |
def get_onset_peaks(): | |
"""Enhanced onset detection with better sensitivity""" | |
# Multi-band onset detection with adjusted parameters | |
onset_env = librosa.onset.onset_strength( | |
y=cleaned_audio, | |
sr=sr, | |
hop_length=256, # Smaller hop length for better temporal resolution | |
aggregate=np.median, | |
n_mels=128 | |
) | |
# More lenient thresholding | |
threshold = np.mean(onset_env) + 0.3 * np.std(onset_env) | |
# Get onset positions | |
onset_frames = librosa.onset.onset_detect( | |
onset_envelope=onset_env, | |
sr=sr, | |
hop_length=256, | |
backtrack=True, | |
threshold=threshold, | |
pre_max=20, # 20 frames before peak | |
post_max=20, # 20 frames after peak | |
pre_avg=25, # 25 frames before for mean | |
post_avg=25, # 25 frames after for mean | |
wait=10 # Wait 10 frames before detecting next onset | |
) | |
# Refine onset positions to peaks | |
refined_peaks = [] | |
window_size = int(0.05 * sr) # 50ms window | |
for frame in onset_frames: | |
# Convert frame to sample index | |
sample_idx = frame * 256 # Using hop_length=256 | |
# Define window boundaries | |
start = max(0, sample_idx - window_size//2) | |
end = min(len(cleaned_audio), sample_idx + window_size//2) | |
# Find the maximum amplitude within the window | |
window = np.abs(cleaned_audio[int(start):int(end)]) | |
max_idx = np.argmax(window) | |
refined_peaks.append(start + max_idx) | |
return np.array(refined_peaks), onset_env | |
# Apply selected method | |
if method == 'envelope': | |
peaks, _ = get_envelope_peaks() | |
elif method == 'onset': | |
peaks, _ = get_onset_peaks() | |
else: # fusion method | |
# Get peaks from both methods | |
env_peaks, _ = get_envelope_peaks() | |
onset_peaks, _ = get_onset_peaks() | |
# Merge nearby peaks (within 50ms) | |
all_peaks = np.sort(np.concatenate([env_peaks, onset_peaks])) | |
merged_peaks = [] | |
last_peak = -np.inf | |
for peak in all_peaks: | |
if (peak - last_peak) / sr > 0.05: # 50ms minimum separation | |
merged_peaks.append(peak) | |
last_peak = peak | |
peaks = np.array(merged_peaks) | |
# Convert peaks to times | |
peak_times = peaks / sr | |
# Calculate tempo using peak times | |
if len(peak_times) > 1: | |
# Use weighted average of intervals | |
intervals = np.diff(peak_times) | |
tempos = 60 / intervals # Convert intervals to BPM | |
# Remove physiologically impossible tempos | |
valid_tempos = tempos[(tempos >= 30) & (tempos <= 300)] | |
if len(valid_tempos) > 0: | |
tempo = np.median(valid_tempos) # Use median for robustness | |
else: | |
tempo = 0 | |
else: | |
tempo = 0 | |
return tempo, peak_times, cleaned_audio | |
def plotBeattimes(beattimes: np.ndarray, | |
audiodata: np.ndarray, | |
sr: int, | |
beattimes2: np.ndarray = None) -> go.Figure: | |
""" | |
Plot audio waveform with beat markers for one or two sets of beat times. | |
Parameters: | |
----------- | |
beattimes : np.ndarray | |
Primary array of beat times in seconds (S1 beats if beattimes2 is provided) | |
audiodata : np.ndarray | |
Audio time series data | |
sr : int | |
Sampling rate | |
beattimes2 : np.ndarray, optional | |
Secondary array of beat times in seconds (S2 beats) | |
Returns: | |
-------- | |
go.Figure | |
Plotly figure with waveform and beat markers | |
""" | |
# Calculate time array for the full audio | |
time = np.arange(len(audiodata)) / sr | |
# Create the figure | |
fig = go.Figure() | |
# Add waveform | |
fig.add_trace( | |
go.Scatter( | |
x=time, | |
y=audiodata, | |
mode='lines', | |
name='Waveform', | |
line=dict(color='blue', width=1) | |
) | |
) | |
# Process and plot primary beat times | |
if isinstance(beattimes[0], str): | |
beat_indices = np.round(np.array([float(bt.replace(',', '.')) for bt in beattimes]) * sr).astype(int) | |
else: | |
beat_indices = np.round(beattimes * sr).astype(int) | |
beat_indices = beat_indices[beat_indices < len(audiodata)] | |
beat_amplitudes = audiodata[beat_indices] | |
# Define beat name based on whether secondary beats are provided | |
beat_name = "Beats S1" if beattimes2 is not None else "Beats" | |
# Add primary beat markers | |
fig.add_trace( | |
go.Scatter( | |
x=beattimes[beat_indices < len(audiodata)], | |
y=beat_amplitudes, | |
mode='markers', | |
name=beat_name, | |
marker=dict( | |
color='red', | |
size=8, | |
symbol='circle', | |
line=dict(color='darkred', width=1) | |
) | |
) | |
) | |
# Add primary beat vertical lines | |
for beat_time in beattimes[beat_indices < len(audiodata)]: | |
fig.add_vline( | |
x=beat_time, | |
line=dict(color="rgba(255, 0, 0, 0.2)", width=1), | |
layer="below" | |
) | |
# Process and plot secondary beat times if provided | |
if beattimes2 is not None: | |
if isinstance(beattimes2[0], str): | |
beat_indices2 = np.round(np.array([float(bt.replace(',', '.')) for bt in beattimes2]) * sr).astype(int) | |
else: | |
beat_indices2 = np.round(beattimes2 * sr).astype(int) | |
beat_indices2 = beat_indices2[beat_indices2 < len(audiodata)] | |
beat_amplitudes2 = audiodata[beat_indices2] | |
# Add secondary beat markers | |
fig.add_trace( | |
go.Scatter( | |
x=beattimes2[beat_indices2 < len(audiodata)], | |
y=beat_amplitudes2, | |
mode='markers', | |
name="Beats S2", | |
marker=dict( | |
color='green', | |
size=8, | |
symbol='circle', | |
line=dict(color='darkgreen', width=1) | |
) | |
) | |
) | |
# Add secondary beat vertical lines | |
for beat_time in beattimes2[beat_indices2 < len(audiodata)]: | |
fig.add_vline( | |
x=beat_time, | |
line=dict(color="rgba(0, 255, 0, 0.2)", width=1), | |
layer="below" | |
) | |
# Update layout | |
fig.update_layout( | |
title="Audio Waveform with Beat Detection", | |
xaxis_title="Time (seconds)", | |
yaxis_title="Amplitude", | |
showlegend=True, # Changed to True to show beat types | |
hovermode='closest', | |
plot_bgcolor='white', | |
legend=dict( | |
yanchor="top", | |
y=0.99, | |
xanchor="left", | |
x=0.01 | |
) | |
) | |
return fig | |
def iterate_beat_segments(beat_times, sr, audio): | |
""" | |
Iterate over audio segments between beats marked with label 1. | |
Parameters: | |
- beat_times: df of beattimes and labels as DataFrame | |
- sr: Sample rate of the audio | |
- audio: np.ndarray of audio data | |
Yields: | |
- List of segment metrics with associated beat information | |
""" | |
# Get indices where label is 1 | |
label_ones = beat_times[beat_times['Label (S1=1/S2=0)'] == 1].index.tolist() | |
segment_metrics = [] | |
# Iterate through pairs of label 1 indices | |
for i in range(len(label_ones) - 1): | |
start_idx = label_ones[i] | |
end_idx = label_ones[i + 1] | |
# Get all beats between two label 1 beats (inclusive) | |
segment_beats = beat_times.iloc[start_idx:end_idx + 1] | |
# Create list of tuples (label, beattime) | |
beat_info = list(zip(segment_beats['Label (S1=1/S2=0)'], | |
segment_beats['Beattimes'])) | |
# Get start and end samples | |
start_sample = librosa.time_to_samples(segment_beats.iloc[0]['Beattimes'], sr=sr) | |
end_sample = librosa.time_to_samples(segment_beats.iloc[-1]['Beattimes'], sr=sr) | |
# Extract audio segment | |
segment = audio[start_sample:end_sample] | |
# Analyze segment with beat information if not empty | |
if len(segment) > 0: | |
segment_metrics.append(segment_analysis(segment, sr, beat_info)) | |
return segment_metrics | |
def segment_analysis(segment, sr, s1s2:list): | |
""" | |
Analyze an audio segment and compute various metrics. | |
Parameters: | |
- segment: np.ndarray of audio segment data | |
- sr: Sample rate of the audio | |
Returns: | |
- List of computed metrics | |
""" | |
# Duration | |
duration = len(segment) / sr | |
# RMS Energy | |
rms_energy = np.sqrt(np.mean(segment**2)) | |
# Calculate frequency spectrum and find dominant frequencies | |
fft = np.abs(np.fft.rfft(segment)) | |
freqs = np.fft.rfftfreq(len(segment), d=1/sr) | |
# Focus on frequency range typical for heart sounds (20-200 Hz) | |
mask = (freqs >= 20) & (freqs <= 200) | |
dominant_freq_idx = np.argmax(fft[mask]) | |
mean_frequency = freqs[mask][dominant_freq_idx] | |
s1_to_s2_duration = [] | |
s2_to_s1_duration = [] | |
prev = s1s2[0] | |
for i in range(1, len(s1s2)): | |
if prev[0] == 0 and s1s2[i][0] == 1: | |
s2_to_s1_duration.append(s1s2[i][1] - prev[1]) | |
elif prev[0] == 1 and s1s2[i][0] == 0: | |
s1_to_s2_duration.append(s1s2[i][1] - prev[1]) | |
prev = s1s2[i] | |
return { | |
"rms_energy": rms_energy, | |
"mean_frequency": mean_frequency, | |
"duration": duration, | |
"s1_to_s2_duration": s1_to_s2_duration, | |
"s2_to_s1_duration": s2_to_s1_duration, | |
"segment": segment | |
} | |
def find_s1s2(df:pd.DataFrame): | |
times = df['Beattimes'].to_numpy() | |
n_peaks = len(times) | |
# Initialize the feature array | |
feature_array = np.zeros((n_peaks, 4)) | |
# Fill in the peak times (first column) | |
feature_array[:, 0] = times | |
# Calculate and fill distances to previous peaks (second column) | |
feature_array[1:, 1] = np.diff(times) # For all except first peak | |
feature_array[0, 1] = feature_array[1, 1] # First peak uses same as second | |
# Calculate and fill distances to next peaks (third column) | |
feature_array[:-1, 2] = np.diff(times) # For all except last peak | |
feature_array[-1, 2] = feature_array[-2, 2] # Last peak uses same as second-to-last | |
# Extract features (distances to prev and next peaks) | |
X = feature_array[:, 1:3] | |
# Scale features | |
scaler = StandardScaler() | |
X_scaled = scaler.fit_transform(X) | |
# Apply K-means clustering | |
kmeans = KMeans(n_clusters=2, random_state=42) | |
labels = kmeans.fit_predict(X_scaled) | |
# Update the labels in the feature array | |
feature_array[:, 3] = labels | |
return feature_array | |
# ANALYZE | |
def compute_segment_metrics(beattimes: pd.DataFrame, sr: int, audio: np.ndarray): | |
beattimes[beattimes['Label (S1=1/S2=0)'] == 1] | |
segment_metrics = iterate_beat_segments(beattimes, sr, audio) | |
print("segment_metrics", segment_metrics) | |
return segment_metrics | |
def compute_hrv(s1_to_s2, s2_to_s1, sampling_rate): | |
""" | |
Compute Heart Rate Variability with debug statements | |
""" | |
# Convert to numpy arrays if not already | |
s1_to_s2 = np.array(s1_to_s2) | |
s2_to_s1 = np.array(s2_to_s1) | |
# Debug: Print input values | |
print("First few s1_to_s2 values:", s1_to_s2[:5]) | |
print("First few s2_to_s1 values:", s2_to_s1[:5]) | |
# Calculate RR intervals (full cardiac cycle) | |
rr_intervals = s1_to_s2 + s2_to_s1 | |
# Debug: Print RR intervals | |
print("First few RR intervals (samples):", rr_intervals[:5]) | |
# Convert to seconds | |
rr_intervals = rr_intervals / sampling_rate | |
print("First few RR intervals (seconds):", rr_intervals[:5]) | |
# Calculate cumulative time for each heartbeat | |
time = np.cumsum(rr_intervals) | |
# Calculate instantaneous heart rate | |
heart_rate = 60 / rr_intervals # beats per minute | |
print("First few heart rate values:", heart_rate[:5]) | |
# Compute RMSSD using a rolling window | |
window_size = int(30 / np.mean(rr_intervals)) # Approximate 30-second window | |
print("Window size:", window_size) | |
hrv_values = [] | |
for i in range(len(rr_intervals)): | |
window_start = max(0, i - window_size) | |
window_data = rr_intervals[window_start:i+1] | |
if len(window_data) > 1: | |
# Debug: Print window data occasionally | |
if i % 100 == 0: | |
print(f"\nWindow {i}:") | |
print("Window data:", window_data) | |
print("Successive differences:", np.diff(window_data)) | |
successive_diffs = np.diff(window_data) | |
rmssd = np.sqrt(np.mean(successive_diffs ** 2)) * 1000 # Convert to ms | |
hrv_values.append(rmssd) | |
else: | |
hrv_values.append(np.nan) | |
hrv_values = np.array(hrv_values) | |
# Debug: Print HRV statistics | |
print("\nHRV Statistics:") | |
print("Min HRV:", np.nanmin(hrv_values)) | |
print("Max HRV:", np.nanmax(hrv_values)) | |
print("Mean HRV:", np.nanmean(hrv_values)) | |
print("Number of valid HRV values:", np.sum(~np.isnan(hrv_values))) | |
# Remove potential NaN values at the start | |
valid_idx = ~np.isnan(hrv_values) | |
time = time[valid_idx] | |
hrv_values = hrv_values[valid_idx] | |
heart_rate = heart_rate[valid_idx] | |
return time, hrv_values, heart_rate |