'''
functions for peak detection and related tasks
'''
import numpy as np
from scipy.signal import resample
from .analysis import calc_rr, update_rr
from .exceptions import BadSignalWarning
from .filtering import quotient_filter
__all__ = ['make_windows',
'append_dict',
'detect_peaks',
'fit_peaks',
'check_peaks',
'check_binary_quality',
'interpolate_peaks']
[docs]def make_windows(data, sample_rate, windowsize=120, overlap=0, min_size=20):
'''slices data into windows
Funcion that slices data into windows for concurrent analysis.
Used by process_segmentwise wrapper function.
Parameters
----------
data : 1-d array
array containing heart rate sensor data
sample_rate : int or float
sample rate of the data stream in 'data'
windowsize : int
size of the window that is sliced in seconds
overlap : float
fraction of overlap between two adjacent windows: 0 <= float < 1.0
min_size : int
the minimum size for the last (partial) window to be included. Very short windows
might not stable for peak fitting, especially when significant noise is present.
Slightly longer windows are likely stable but don't make much sense from a
signal analysis perspective.
Returns
-------
out : array
tuples of window indices
Examples
--------
Assuming a given example data file:
>>> import heartpy as hp
>>> data, _ = hp.load_exampledata(1)
We can split the data into windows:
>>> indices = make_windows(data, 100.0, windowsize = 30, overlap = 0.5, min_size = 20)
>>> indices.shape
(9, 2)
Specifying min_size = -1 will include the last window no matter what:
>>> indices = make_windows(data, 100.0, windowsize = 30, overlap = 0.5, min_size = -1)
'''
ln = len(data)
window = windowsize * sample_rate
stepsize = (1 - overlap) * window
start = 0
end = window
slices = []
while end < len(data):
slices.append((start, end))
start += stepsize
end += stepsize
if min_size == -1:
slices[-1] = (slices[-1][0], len(data))
elif (ln - start) / sample_rate >= min_size:
slices.append((start, ln))
return np.array(slices, dtype=np.int32)
[docs]def append_dict(dict_obj, measure_key, measure_value):
'''appends data to keyed dict.
Function that appends key to continuous dict, creates if doesn't exist. EAFP
Parameters
----------
dict_obj : dict
dictionary object that contains continuous output measures
measure_key : str
key for the measure to be stored in continuous_dict
measure_value : any data container
value to be appended to dictionary
Returns
-------
dict_obj : dict
dictionary object passed to function, with specified data container appended
Examples
--------
Given a dict object 'example' with some data in it:
>>> example = {}
>>> example['call'] = ['hello']
We can use the function to append it:
>>> example = append_dict(example, 'call', 'world')
>>> example['call']
['hello', 'world']
A new key will be created if it doesn't exist:
>>> example = append_dict(example, 'different_key', 'hello there!')
>>> sorted(example.keys())
['call', 'different_key']
'''
try:
dict_obj[measure_key].append(measure_value)
except KeyError:
dict_obj[measure_key] = [measure_value]
return dict_obj
[docs]def detect_peaks(hrdata, rol_mean, ma_perc, sample_rate, update_dict=True, working_data={}):
'''detect peaks in signal
Function that detects heartrate peaks in the given dataset.
Parameters
----------
hr data : 1-d numpy array or list
array or list containing the heart rate data
rol_mean : 1-d numpy array
array containing the rolling mean of the heart rate signal
ma_perc : int or float
the percentage with which to raise the rolling mean,
used for fitting detection solutions to data
sample_rate : int or float
the sample rate of the provided data set
update_dict : bool
whether to update the peak information in the module's data structure
Settable to False to allow this function to be re-used for example by
the breath analysis module.
default : True
Examples
--------
Normally part of the peak detection pipeline. Given the first example data
it would work like this:
>>> import heartpy as hp
>>> from heartpy.datautils import rolling_mean, _sliding_window
>>> data, _ = hp.load_exampledata(0)
>>> rol_mean = rolling_mean(data, windowsize = 0.75, sample_rate = 100.0)
>>> wd = detect_peaks(data, rol_mean, ma_perc = 20, sample_rate = 100.0)
Now the peaklist has been appended to the working data dict. Let's look
at the first five peak positions:
>>> wd['peaklist'][0:5]
[63, 165, 264, 360, 460]
'''
rmean = np.array(rol_mean)
#rol_mean = rmean + ((rmean / 100) * ma_perc)
mn = np.mean(rmean / 100) * ma_perc
rol_mean = rmean + mn
peaksx = np.where((hrdata > rol_mean))[0]
peaksy = hrdata[np.where((hrdata > rol_mean))[0]]
peakedges = np.concatenate((np.array([0]),
(np.where(np.diff(peaksx) > 1)[0]),
np.array([len(peaksx)])))
peaklist = []
for i in range(0, len(peakedges)-1):
try:
y_values = peaksy[peakedges[i]:peakedges[i+1]].tolist()
peaklist.append(peaksx[peakedges[i] + y_values.index(max(y_values))])
except:
pass
if update_dict:
working_data['peaklist'] = peaklist
working_data['ybeat'] = [hrdata[x] for x in peaklist]
working_data['rolling_mean'] = rol_mean
working_data = calc_rr(working_data['peaklist'], sample_rate,
working_data=working_data)
if len(working_data['RR_list']) > 0:
working_data['rrsd'] = np.std(working_data['RR_list'])
else:
working_data['rrsd'] = np.inf
return working_data
else:
return peaklist, working_data
[docs]def fit_peaks(hrdata, rol_mean, sample_rate, bpmmin=40, bpmmax=180, working_data={}):
'''optimize for best peak detection
Function that runs fitting with varying peak detection thresholds given a
heart rate signal.
Parameters
----------
hrdata : 1d array or list
array or list containing the heart rate data
rol_mean : 1-d array
array containing the rolling mean of the heart rate signal
sample_rate : int or float
the sample rate of the data set
bpmmin : int
minimum value of bpm to see as likely
default : 40
bpmmax : int
maximum value of bpm to see as likely
default : 180
Returns
-------
working_data : dict
dictionary object that contains all heartpy's working data (temp) objects.
will be created if not passed to function
Examples
--------
Part of peak detection pipeline. Uses moving average as a peak detection
threshold and rises it stepwise. Determines best fit by minimising
standard deviation of peak-peak distances as well as getting a bpm that
lies within the expected range.
Given included example data let's show how this works
>>> import heartpy as hp
>>> from heartpy.datautils import rolling_mean, _sliding_window
>>> data, _ = hp.load_exampledata(0)
>>> rol_mean = rolling_mean(data, windowsize = 0.75, sample_rate = 100.0)
We can then call this function and let the optimizer do its work:
>>> wd = fit_peaks(data, rol_mean, sample_rate = 100.0)
Now the wd dict contains the best fit paramater(s):
>>> wd['best']
20
This indicates the best fit can be obtained by raising the moving average
with 20%.
The results of the peak detection using these parameters are included too.
To illustrate, these are the first five detected peaks:
>>> wd['peaklist'][0:5]
[63, 165, 264, 360, 460]
and the corresponding peak-peak intervals:
>>> wd['RR_list'][0:4]
array([1020., 990., 960., 1000.])
'''
ma_perc_list = [5, 10, 15, 20, 25, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 150, 200, 300]
rrsd = []
valid_ma = []
for ma_perc in ma_perc_list:
working_data = detect_peaks(hrdata, rol_mean, ma_perc, sample_rate,
update_dict=True, working_data=working_data)
bpm = ((len(working_data['peaklist'])/(len(hrdata)/sample_rate))*60)
rrsd.append([working_data['rrsd'], bpm, ma_perc])
for _rrsd, _bpm, _ma_perc in rrsd:
if (_rrsd > 0.1) and ((bpmmin <= _bpm <= bpmmax)):
valid_ma.append([_rrsd, _ma_perc])
if len(valid_ma) > 0:
working_data['best'] = min(valid_ma, key=lambda t: t[0])[1]
working_data = detect_peaks(hrdata, rol_mean, min(valid_ma, key=lambda t: t[0])[1],
sample_rate, update_dict=True, working_data=working_data)
return working_data
else:
raise BadSignalWarning('\n----------------\nCould not determine best fit for \
given signal. Please check the source signal.\n Probable causes:\n- detected heart rate falls \
outside of bpmmin<->bpmmax constraints\n- no detectable heart rate present in signal\n\
- very noisy signal (consider filtering and scaling)\nIf you\'re sure the signal contains heart\
rate data, consider filtering and/or scaling first.\n----------------\n')
[docs]def check_peaks(rr_arr, peaklist, ybeat, quotient_filter=False, reject_segmentwise=False,
working_data={}):
'''find anomalous peaks.
Funcion that checks peaks for outliers based on anomalous peak-peak distances and corrects
by excluding them from further analysis.
Parameters
----------
rr_arr : 1d array or list
list or array containing peak-peak intervals
peaklist : 1d array or list
list or array containing detected peak positions
ybeat : 1d array or list
list or array containing corresponding signal values at
detected peak positions. Used for plotting functionality
later on.
reject_segmentwise : bool
if set, checks segments per 10 detected peaks. Marks segment
as rejected if 30% of peaks are rejected.
default : False
working_data : dict
dictionary object that contains all heartpy's working data (temp) objects.
will be created if not passed to function
Returns
-------
working_data : dict
working_data dictionary object containing all of heartpy's temp objects
Examples
--------
Part of peak detection pipeline. No standalone examples exist. See docstring
for hp.process() function for more info
'''
rr_arr = np.array(rr_arr)
peaklist = np.array(peaklist)
ybeat = np.array(ybeat)
mean_rr = np.mean(rr_arr)
upper_threshold = mean_rr + 300 if (0.3 * mean_rr) <= 300 else mean_rr + (0.3 * mean_rr)
lower_threshold = mean_rr - 300 if (0.3 * mean_rr) <= 300 else mean_rr - (0.3 * mean_rr)
working_data['removed_beats'] = peaklist[np.where((rr_arr <= lower_threshold) |
(rr_arr >= upper_threshold))[0]+1]
working_data['removed_beats_y'] = ybeat[np.where((rr_arr <= lower_threshold) |
(rr_arr >= upper_threshold))[0]+1]
working_data['binary_peaklist'] = np.asarray([0 if x in working_data['removed_beats']
else 1 for x in peaklist])
if reject_segmentwise:
working_data = check_binary_quality(peaklist, working_data['binary_peaklist'],
working_data=working_data)
working_data = update_rr(working_data=working_data)
return working_data
[docs]def check_binary_quality(peaklist, binary_peaklist, maxrejects=3, working_data={}):
'''checks signal in chunks of 10 beats.
Function that checks signal in chunks of 10 beats. It zeros out chunk if
number of rejected peaks > maxrejects. Also marks rejected segment coordinates
in tuples (x[0], x[1] in working_data['rejected_segments']
Parameters
----------
peaklist : 1d array or list
list or array containing detected peak positions
binary_peaklist : 1d array or list
list or array containing mask for peaklist, coding which peaks are rejected
maxjerects : int
maximum number of rejected peaks per 10-beat window
default : 3
working_data : dict
dictionary object that contains all heartpy's working data (temp) objects.
will be created if not passed to function
Returns
-------
working_data : dict
working_data dictionary object containing all of heartpy's temp objects
Examples
--------
Part of peak detection pipeline. No standalone examples exist. See docstring
for hp.process() function for more info
Given some peaklist and binary mask:
>>> peaklist = [30, 60, 90, 110, 130, 140, 160, 170, 200, 220]
>>> binary_peaklist = [0, 1, 1, 0, 0, 1, 0, 1, 0, 0]
>>> wd = check_binary_quality(peaklist, binary_peaklist)
>>> wd['rejected_segments']
[(30, 220)]
The whole segment is rejected as it contains more than the specified 3 rejections
per 10 beats.
'''
idx = 0
working_data['rejected_segments'] = []
for i in range(int(len(binary_peaklist) / 10)):
if np.bincount(binary_peaklist[idx:idx + 10])[0] > maxrejects:
binary_peaklist[idx:idx + 10] = [0 for i in range(len(binary_peaklist[idx:idx+10]))]
if idx + 10 < len(peaklist):
working_data['rejected_segments'].append((peaklist[idx], peaklist[idx + 10]))
else:
working_data['rejected_segments'].append((peaklist[idx], peaklist[-1]))
idx += 10
return working_data
[docs]def interpolate_peaks(data, peaks, sample_rate, desired_sample_rate=1000.0, working_data={}):
'''interpolate detected peak positions and surrounding data points
Function that enables high-precision mode by taking the estimated peak position,
then upsampling the peak position +/- 100ms to the specified sampling rate, subsequently
estimating the peak position with higher accuracy.
Parameters
----------
data : 1d list or array
list or array containing heart rate data
peaks : 1d list or array
list or array containing x-positions of peaks in signal
sample_rate : int or float
the sample rate of the signal (in Hz)
desired_sampled-rate : int or float
the sample rate to which to upsample.
Must be sample_rate < desired_sample_rate
Returns
-------
working_data : dict
working_data dictionary object containing all of heartpy's temp objects
Examples
--------
Given the output of a normal analysis and the first five peak-peak intervals:
>>> import heartpy as hp
>>> data, _ = hp.load_exampledata(0)
>>> wd, m = hp.process(data, 100.0)
>>> wd['peaklist'][0:5]
[63, 165, 264, 360, 460]
Now, the resolution is at max 10ms as that's the distance between data points.
We can use the high precision mode for example to approximate a more precise
position, for example if we had recorded at 1000Hz:
>>> wd = interpolate_peaks(data = data, peaks = wd['peaklist'],
... sample_rate = 100.0, desired_sample_rate = 1000.0, working_data = wd)
>>> wd['peaklist'][0:5]
[63.5, 165.4, 263.6, 360.4, 460.2]
As you can see the accuracy of peak positions has increased.
Note that you cannot magically upsample nothing into something. Be reasonable.
'''
assert desired_sample_rate > sample_rate, "desired sample rate is lower than actual sample rate \
this would result in downsampling which will hurt accuracy."
num_samples = int(0.1 * sample_rate)
ratio = sample_rate / desired_sample_rate
interpolation_slices = [(x - num_samples, x + num_samples) for x in peaks]
peaks = []
for i in interpolation_slices:
slice = data[i[0]:i[1]]
resampled = resample(slice, int(len(slice) * (desired_sample_rate / sample_rate)))
peakpos = np.argmax(resampled)
peaks.append((i[0] + (peakpos * ratio)))
working_data['peaklist'] = peaks
return working_data