|
"""Detect peaks in data based on their amplitude and other features.""" |
|
|
|
from __future__ import division, print_function |
|
import warnings |
|
import numpy as np |
|
|
|
__author__ = "Marcos Duarte, https://github.com/demotu" |
|
__version__ = "1.0.6" |
|
__license__ = "MIT" |
|
|
|
|
|
|
|
def detect_peaks(x, mph=None, mpd=1, threshold=0, edge='rising', |
|
kpsh=False, valley=False, show=False, ax=None, title=True): |
|
|
|
"""Detect peaks in data based on their amplitude and other features. |
|
|
|
Parameters |
|
---------- |
|
x : 1D array_like |
|
data. |
|
mph : {None, number}, optional (default = None) |
|
detect peaks that are greater than minimum peak height (if parameter |
|
`valley` is False) or peaks that are smaller than maximum peak height |
|
(if parameter `valley` is True). |
|
mpd : positive integer, optional (default = 1) |
|
detect peaks that are at least separated by minimum peak distance (in |
|
number of data). |
|
threshold : positive number, optional (default = 0) |
|
detect peaks (valleys) that are greater (smaller) than `threshold` |
|
in relation to their immediate neighbors. |
|
edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising') |
|
for a flat peak, keep only the rising edge ('rising'), only the |
|
falling edge ('falling'), both edges ('both'), or don't detect a |
|
flat peak (None). |
|
kpsh : bool, optional (default = False) |
|
keep peaks with same height even if they are closer than `mpd`. |
|
valley : bool, optional (default = False) |
|
if True (1), detect valleys (local minima) instead of peaks. |
|
show : bool, optional (default = False) |
|
if True (1), plot data in matplotlib figure. |
|
ax : a matplotlib.axes.Axes instance, optional (default = None). |
|
title : bool or string, optional (default = True) |
|
if True, show standard title. If False or empty string, doesn't show |
|
any title. If string, shows string as title. |
|
|
|
Returns |
|
------- |
|
ind : 1D array_like |
|
indeces of the peaks in `x`. |
|
|
|
Notes |
|
----- |
|
The detection of valleys instead of peaks is performed internally by simply |
|
negating the data: `ind_valleys = detect_peaks(-x)` |
|
|
|
The function can handle NaN's |
|
|
|
See this IPython Notebook [1]_. |
|
|
|
References |
|
---------- |
|
.. [1] http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb |
|
|
|
Examples |
|
-------- |
|
>>> from detect_peaks import detect_peaks |
|
>>> x = np.random.randn(100) |
|
>>> x[60:81] = np.nan |
|
>>> # detect all peaks and plot data |
|
>>> ind = detect_peaks(x, show=True) |
|
>>> print(ind) |
|
|
|
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5 |
|
>>> # set minimum peak height = 0 and minimum peak distance = 20 |
|
>>> detect_peaks(x, mph=0, mpd=20, show=True) |
|
|
|
>>> x = [0, 1, 0, 2, 0, 3, 0, 2, 0, 1, 0] |
|
>>> # set minimum peak distance = 2 |
|
>>> detect_peaks(x, mpd=2, show=True) |
|
|
|
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5 |
|
>>> # detection of valleys instead of peaks |
|
>>> detect_peaks(x, mph=-1.2, mpd=20, valley=True, show=True) |
|
|
|
>>> x = [0, 1, 1, 0, 1, 1, 0] |
|
>>> # detect both edges |
|
>>> detect_peaks(x, edge='both', show=True) |
|
|
|
>>> x = [-2, 1, -2, 2, 1, 1, 3, 0] |
|
>>> # set threshold = 2 |
|
>>> detect_peaks(x, threshold = 2, show=True) |
|
|
|
>>> x = [-2, 1, -2, 2, 1, 1, 3, 0] |
|
>>> fig, axs = plt.subplots(ncols=2, nrows=1, figsize=(10, 4)) |
|
>>> detect_peaks(x, show=True, ax=axs[0], threshold=0.5, title=False) |
|
>>> detect_peaks(x, show=True, ax=axs[1], threshold=1.5, title=False) |
|
|
|
Version history |
|
--------------- |
|
'1.0.6': |
|
Fix issue of when specifying ax object only the first plot was shown |
|
Add parameter to choose if a title is shown and input a title |
|
'1.0.5': |
|
The sign of `mph` is inverted if parameter `valley` is True |
|
|
|
""" |
|
|
|
x = np.atleast_1d(x).astype('float64') |
|
if x.size < 3: |
|
return np.array([], dtype=int) |
|
if valley: |
|
x = -x |
|
if mph is not None: |
|
mph = -mph |
|
|
|
dx = x[1:] - x[:-1] |
|
|
|
indnan = np.where(np.isnan(x))[0] |
|
if indnan.size: |
|
x[indnan] = np.inf |
|
dx[np.where(np.isnan(dx))[0]] = np.inf |
|
ine, ire, ife = np.array([[], [], []], dtype=int) |
|
if not edge: |
|
ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] |
|
else: |
|
if edge.lower() in ['rising', 'both']: |
|
ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0] |
|
if edge.lower() in ['falling', 'both']: |
|
ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0] |
|
ind = np.unique(np.hstack((ine, ire, ife))) |
|
|
|
if ind.size and indnan.size: |
|
|
|
ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan-1, indnan+1))), invert=True)] |
|
|
|
if ind.size and ind[0] == 0: |
|
ind = ind[1:] |
|
if ind.size and ind[-1] == x.size-1: |
|
ind = ind[:-1] |
|
|
|
if ind.size and mph is not None: |
|
ind = ind[x[ind] >= mph] |
|
|
|
if ind.size and threshold > 0: |
|
dx = np.min(np.vstack([x[ind]-x[ind-1], x[ind]-x[ind+1]]), axis=0) |
|
ind = np.delete(ind, np.where(dx < threshold)[0]) |
|
|
|
if ind.size and mpd > 1: |
|
ind = ind[np.argsort(x[ind])][::-1] |
|
idel = np.zeros(ind.size, dtype=bool) |
|
for i in range(ind.size): |
|
if not idel[i]: |
|
|
|
idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \ |
|
& (x[ind[i]] > x[ind] if kpsh else True) |
|
idel[i] = 0 |
|
|
|
ind = np.sort(ind[~idel]) |
|
|
|
if show: |
|
if indnan.size: |
|
x[indnan] = np.nan |
|
if valley: |
|
x = -x |
|
if mph is not None: |
|
mph = -mph |
|
_plot(x, mph, mpd, threshold, edge, valley, ax, ind, title) |
|
|
|
return ind, x[ind] |
|
|
|
|
|
def _plot(x, mph, mpd, threshold, edge, valley, ax, ind, title): |
|
"""Plot results of the detect_peaks function, see its help.""" |
|
try: |
|
import matplotlib.pyplot as plt |
|
except ImportError: |
|
print('matplotlib is not available.') |
|
else: |
|
if ax is None: |
|
_, ax = plt.subplots(1, 1, figsize=(8, 4)) |
|
no_ax = True |
|
else: |
|
no_ax = False |
|
|
|
ax.plot(x, 'b', lw=1) |
|
if ind.size: |
|
label = 'valley' if valley else 'peak' |
|
label = label + 's' if ind.size > 1 else label |
|
ax.plot(ind, x[ind], '+', mfc=None, mec='r', mew=2, ms=8, |
|
label='%d %s' % (ind.size, label)) |
|
ax.legend(loc='best', framealpha=.5, numpoints=1) |
|
ax.set_xlim(-.02*x.size, x.size*1.02-1) |
|
ymin, ymax = x[np.isfinite(x)].min(), x[np.isfinite(x)].max() |
|
yrange = ymax - ymin if ymax > ymin else 1 |
|
ax.set_ylim(ymin - 0.1*yrange, ymax + 0.1*yrange) |
|
ax.set_xlabel('Data #', fontsize=14) |
|
ax.set_ylabel('Amplitude', fontsize=14) |
|
if title: |
|
if not isinstance(title, str): |
|
mode = 'Valley detection' if valley else 'Peak detection' |
|
title = "%s (mph=%s, mpd=%d, threshold=%s, edge='%s')"% \ |
|
(mode, str(mph), mpd, str(threshold), edge) |
|
ax.set_title(title) |
|
|
|
if no_ax: |
|
plt.show() |