Source code for heartpy.analysis

'''
Functions that handle computation of heart rate (HR) and
heart rate variability (HRV) measures.
'''

import warnings

import numpy as np
from scipy.interpolate import UnivariateSpline
from scipy.signal import welch, periodogram

from .datautils import MAD, rolling_mean, outliers_iqr_method, outliers_modified_z
from .filtering import quotient_filter
import heartpy as hp


# Names exported by a star-import of this module.
# NOTE(review): calc_poincare is defined in this module but is not listed
# here -- confirm whether that omission is intentional.
__all__ = ['calc_rr',
           'update_rr',
           'calc_rr_segment',
           'clean_rr_intervals',
           'calc_ts_measures',
           'calc_fd_measures',
           'calc_breathing']


def calc_rr(peaklist, sample_rate, working_data=None):
    '''calculates peak-peak intervals

    Function that calculates the peak-peak data required for
    further analysis. Stores results in the working_data{} dict.

    Parameters
    ----------
    peaklist : 1d list or array
        list or array containing detected peak positions

    sample_rate : int or float
        the sample rate with which the heart rate signal is collected

    working_data : dict
        dictionary object that contains all heartpy's working data (temp)
        objects. A new dict is created for every call if none is passed.

    Returns
    -------
    working_data : dict
        working_data dictionary object containing all of heartpy's temp
        objects. The keys 'RR_list', 'RR_indices', 'RR_diff' and 'RR_sqdiff'
        are written. When the first peak is discarded (see Notes),
        'peaklist' is written as well and 'ybeat' is shortened if present.

    Notes
    -----
    A first peak located within the first 150 ms of the signal is dropped,
    because the signal might start mid-beat after the peak.

    Examples
    --------
    Let's assume we detected peaks at these positions in the signal:

    >>> peaklist = [200, 280, 405, 501, 615]

    It is then easy to call calc_rr to compute what we need:

    >>> wd = calc_rr(peaklist, sample_rate = 100.0)
    >>> wd['RR_list']
    array([ 800., 1250.,  960., 1140.])
    >>> wd['RR_diff']
    array([450., 290., 180.])
    >>> wd['RR_sqdiff']
    array([202500.,  84100.,  32400.])

    Note that the list of peak-peak intervals is of length len(peaks) - 1
    the length of the differences is of length len(peaks) - 2
    '''
    # A default dict in the signature would be shared between calls, so
    # results of one call would leak into the next. Create it per call.
    if working_data is None:
        working_data = {}

    peaklist = np.array(peaklist)  # cast to numpy array to be sure of the array type

    # delete first peak if within first 150ms (signal might start mid-beat after peak)
    if len(peaklist) > 0 and peaklist[0] <= ((sample_rate / 1000.0) * 150):
        peaklist = np.delete(peaklist, 0)
        working_data['peaklist'] = peaklist
        # 'ybeat' only exists when peak detection ran on this dict; a
        # stand-alone call has nothing to keep in sync.
        if 'ybeat' in working_data:
            working_data['ybeat'] = np.delete(working_data['ybeat'], 0)

    rr_list = (np.diff(peaklist) / sample_rate) * 1000.0
    # (start, end) peak positions of every interval
    rr_indices = list(zip(peaklist[:-1], peaklist[1:]))
    rr_diff = np.abs(np.diff(rr_list))
    rr_sqdiff = np.power(rr_diff, 2)

    working_data['RR_list'] = rr_list
    working_data['RR_indices'] = rr_indices
    working_data['RR_diff'] = rr_diff
    working_data['RR_sqdiff'] = rr_sqdiff
    return working_data
def update_rr(working_data={}):
    '''recomputes interval differences after peak rejection

    Function that rebuilds the corrected peak-peak interval list, the
    interval mask, and the (squared) differences between adjacent
    intervals, using the accepted/rejected flags of the detected peaks.

    Parameters
    ----------
    working_data : dict
        dictionary object that contains all heartpy's working data (temp)
        objects. The keys 'RR_list' and 'binary_peaklist' are read.

    Returns
    -------
    out : dict
        the same working_data dictionary, with 'RR_masklist',
        'RR_list_cor', 'RR_diff' and 'RR_sqdiff' written.

    Examples
    --------
    Let's assume we detected peaks at these positions in the signal:

    >>> peaklist = [200, 280, 405, 410, 501, 615]

    And we subsequently ran further analysis:

    >>> wd = calc_rr(peaklist, sample_rate = 100.0)

    The peak at position 410 is likely an incorrect detection and
    will be marked as such by other heartpy functions. This is indicated
    by an array 'binary_peaklist' in working_data. Binary peaklist is of
    the same length as peaklist, and is formatted as a mask:

    For now let's set it manually, normally this is done by the check_peaks()
    function from HeartPy's peakdetection module.

    >>> wd['binary_peaklist'] = [1, 1, 1, 0, 1, 1]

    Rejected peaks are marked with a zero and accepted with a 1.

    By now running update_rr(), heartpy will update all associated measures
    and will only compute peak-peak intervals between two accepted peaks.

    >>> wd = update_rr(wd)

    This will have generated a corrected RR_list object in the dictionary:

    >>> wd['RR_list_cor']
    [800.0, 1250.0, 1140.0]

    As well as updated the lists RR_diff (differences between adjacent
    peak-peak intervals) and RR_sqdiff (squared differences between adjacent
    peak-peak intervals).
    '''
    intervals = working_data['RR_list']
    peak_flags = working_data['binary_peaklist']

    # An interval is kept only when the peaks on both of its ends were
    # accepted; mask value 0 means "keep", 1 means "rejected".
    kept_intervals = []
    interval_mask = []
    for idx in range(len(intervals)):
        if peak_flags[idx] + peak_flags[idx + 1] == 2:
            kept_intervals.append(intervals[idx])
            interval_mask.append(0)
        else:
            interval_mask.append(1)

    # Differences that touch a rejected interval come out masked and are
    # dropped, so only differences between two valid neighbours remain.
    masked_intervals = np.ma.array(intervals, mask=interval_mask)
    abs_diff = np.abs(np.diff(masked_intervals))
    abs_diff = abs_diff[~abs_diff.mask]
    squared_diff = np.power(abs_diff, 2)

    working_data['RR_masklist'] = interval_mask
    working_data['RR_list_cor'] = kept_intervals
    working_data['RR_diff'] = abs_diff
    working_data['RR_sqdiff'] = squared_diff
    return working_data
def calc_rr_segment(rr_source, b_peaklist):
    '''computes interval measures for a single segment

    Function that derives the accepted peak-peak intervals and their
    (squared) adjacent differences for one segment. Used when analysing
    segmentwise in the 'fast' mode.

    Parameters
    ----------
    rr_source : 1d list or array
        list or array containing peak-peak intervals.

    b_peaklist : 1d list or array
        list or array containing mask for peaklist
        (1 marks an accepted peak, 0 a rejected one).

    Returns
    -------
    rr_list : list
        peak-peak intervals whose two bounding peaks were both accepted.

    rr_diff : array
        absolute differences between adjacent accepted peak-peak intervals

    rr_sqdiff : array
        squared differences between adjacent accepted peak-peak intervals

    Examples
    --------
    The function works in the same way as update_rr, except it returns
    three separate objects. It's used by process_segmentwise.
    Revert to doc on update_rr for more information.

    >>> rr, rrd, rrsd = calc_rr_segment(rr_source = [ 800., 1250.,   50.,  910., 1140., 1002., 1142.],
    ... b_peaklist = [1, 1, 1, 0, 1, 1, 1, 1])
    >>> print(rr)
    [800.0, 1250.0, 1140.0, 1002.0, 1142.0]
    >>> print(rrd)
    [450.0 138.0 140.0]
    >>> print(rrsd)
    [202500.  19044.  19600.]
    '''
    accepted_intervals = []
    rejected_flags = []

    # One pass: classify every interval by the state of its two peaks.
    for position in range(len(rr_source)):
        both_peaks_ok = (b_peaklist[position] + b_peaklist[position + 1] == 2)
        if both_peaks_ok:
            accepted_intervals.append(rr_source[position])
        rejected_flags.append(0 if both_peaks_ok else 1)

    # Masking before differencing ensures a difference is only kept when
    # both neighbouring intervals are valid.
    masked_source = np.ma.array(rr_source, mask=rejected_flags)
    neighbour_diff = np.abs(np.diff(masked_source))
    neighbour_diff = neighbour_diff[~neighbour_diff.mask]

    return accepted_intervals, neighbour_diff, np.power(neighbour_diff, 2)
def clean_rr_intervals(working_data, method='quotient-filter'):
    '''detects and rejects outliers in peak-peak intervals

    Function that detects and rejects outliers in the peak-peak intervals. It updates
    the RR_list_cor in the working data dict

    Parameters
    ----------
    working_data : dict
        dictionary object that contains all heartpy's working data (temp) objects.
        Needs to contain RR_list_cor, meaning one analysis cycle has already completed.
        The keys 'RR_list' and 'RR_masklist' are read as well.

    method : str
        which method to use for outlier rejection (case-insensitive), included are:
        - 'quotient-filter', based on the work in "Piskorki, J., Guzik, P. (2005),
          Filtering Poincare plots",
        - 'iqr', which uses the inter-quartile range,
        - 'z-score', which uses the modified z-score method.
        default : quotient-filter

    Returns
    -------
    working_data : dict
        the same dictionary object, with 'RR_masked', 'RR_list_cor', 'RR_diff'
        and 'RR_sqdiff' updated. If the dict also holds 'removed_beats',
        'removed_beats_y', 'peaklist' and 'ybeat', the removed-beat arrays are
        extended with the newly rejected positions.

    Raises
    ------
    ValueError
        if `method` is not one of the supported methods.

    Notes
    -----
    For 'iqr' and 'z-score' the list stored in working_data['RR_masklist'] is
    updated in place. For 'quotient-filter' the mask returned by
    quotient_filter is used for this call but is not written back to
    working_data['RR_masklist'] here.

    Examples
    --------
    Let's load some data

    >>> import heartpy as hp
    >>> data, timer = hp.load_exampledata(1)
    >>> sample_rate = hp.get_samplerate_mstimer(timer)

    Run at least one analysis cycle first so that the dicts are populated

    >>> wd, m = hp.process(data, sample_rate)
    >>> wd = clean_rr_intervals(working_data = wd)
    >>> ['%.3f' %x for x in wd['RR_list_cor'][0:5]]
    ['897.470', '811.997', '829.091', '777.807', '803.449']

    You can also specify the outlier rejection method to be used, for example using
    the z-score method:

    >>> wd = clean_rr_intervals(working_data = wd, method = 'z-score')
    >>> ['%.3f' %x for x in wd['RR_list_cor'][0:5]]
    ['897.470', '811.997', '829.091', '777.807', '803.449']

    Or the inter-quartile range (iqr) based method:

    >>> wd = clean_rr_intervals(working_data = wd, method = 'iqr')
    >>> ['%.3f' %x for x in wd['RR_list_cor'][0:5]]
    ['897.470', '811.997', '829.091', '965.849', '803.449']
    '''
    masklist = working_data['RR_masklist']
    method = method.lower()

    # positions in RR_list of the intervals that make up RR_list_cor
    rr_cor_indices = [idx for idx, flag in enumerate(masklist) if flag == 0]

    # clean rr-list
    if method in ('iqr', 'z-score'):
        detect_outliers = outliers_iqr_method if method == 'iqr' else outliers_modified_z
        rr_cleaned, replaced_indices = detect_outliers(working_data['RR_list_cor'])
        # replaced_indices refer to RR_list_cor; translate them back to
        # RR_list positions and flag those intervals as rejected.
        rr_mask = masklist
        for idx in replaced_indices:
            rr_mask[rr_cor_indices[idx]] = 1
    elif method == 'quotient-filter':
        rr_mask = quotient_filter(working_data['RR_list'], masklist)
        rr_cleaned = [x for x, y in zip(working_data['RR_list'], rr_mask) if y == 0]
    else:
        raise ValueError('Incorrect method specified, use either "iqr", "z-score" '
                         'or "quotient-filter". Nothing to do!')

    rr_masked = np.ma.array(working_data['RR_list'], mask=rr_mask)
    rr_diff = np.abs(np.diff(rr_masked))
    rr_diff = rr_diff[~rr_diff.mask]
    rr_sqdiff = np.power(rr_diff, 2)
    working_data['RR_masked'] = rr_masked
    working_data['RR_list_cor'] = np.asarray(rr_cleaned)
    working_data['RR_diff'] = rr_diff
    working_data['RR_sqdiff'] = rr_sqdiff

    # Best-effort bookkeeping of rejected beats: silently skipped when the
    # dict does not carry (usable) peak data. Work on copies so a failure
    # half-way leaves working_data untouched.
    try:
        removed_beats = list(working_data['removed_beats'])
        removed_beats_y = list(working_data['removed_beats_y'])
        peaklist = working_data['peaklist']
        ybeat = working_data['ybeat']
        for idx, flag in enumerate(rr_mask):
            if flag == 1 and peaklist[idx] not in removed_beats:
                removed_beats.append(peaklist[idx])
                removed_beats_y.append(ybeat[idx])
        working_data['removed_beats'] = np.asarray(removed_beats)
        working_data['removed_beats_y'] = np.asarray(removed_beats_y)
    except (KeyError, IndexError, TypeError):
        pass

    return working_data
def calc_ts_measures(rr_list, rr_diff, rr_sqdiff, measures=None, working_data=None):
    '''calculates standard time-series measurements.

    Function that calculates the time-series measurements for HeartPy.

    Parameters
    ----------
    rr_list : 1d list or array
        list or array containing peak-peak intervals

    rr_diff : 1d list or array
        list or array containing differences between adjacent peak-peak intervals

    rr_sqdiff : 1d list or array
        squared rr_diff

    measures : dict
        dictionary object used by heartpy to store computed measures. A new
        dict is created for every call if none is passed.

    working_data : dict
        dictionary object that contains all heartpy's working data (temp)
        objects. A new dict is created for every call if none is passed.

    Returns
    -------
    working_data : dict
        dictionary object that contains all heartpy's working data (temp) objects.
        The keys 'nn20' and 'nn50' are written.

    measures : dict
        dictionary object used by heartpy to store computed measures. The keys
        'bpm', 'ibi', 'sdnn', 'sdsd', 'rmssd', 'pnn20', 'pnn50' and 'hr_mad'
        are written. 'pnn20' and 'pnn50' are fractions of len(rr_diff) and
        are NaN when rr_diff is empty.

    Examples
    --------
    Normally this function is called during the process pipeline of HeartPy. It can
    of course also be used separately.

    Assuming we have the following peak-peak distances:

    >>> import numpy as np
    >>> rr_list = [1020.0, 990.0, 960.0, 1000.0, 1050.0, 1090.0, 990.0, 900.0, 900.0, 950.0, 1080.0]

    we can then compute the other two required lists by hand for now:

    >>> rr_diff = np.diff(rr_list)
    >>> rr_sqdiff = np.power(rr_diff, 2)
    >>> wd, m = calc_ts_measures(rr_list, rr_diff, rr_sqdiff)

    All output measures are then accessible from the measures object through
    their respective keys:

    >>> print('%.3f' %m['bpm'])
    60.384
    >>> print('%.3f' %m['rmssd'])
    67.082
    '''
    # Default dicts in the signature would be shared between calls, so the
    # measures of one call would be overwritten by the next. Create per call.
    if measures is None:
        measures = {}
    if working_data is None:
        working_data = {}

    # Plain lists do not support the element-wise comparisons below;
    # asanyarray leaves (masked) arrays untouched.
    rr_diff = np.asanyarray(rr_diff)

    mean_rr = np.mean(rr_list)
    measures['bpm'] = 60000 / mean_rr
    measures['ibi'] = mean_rr

    measures['sdnn'] = np.std(rr_list)
    measures['sdsd'] = np.std(rr_diff)
    measures['rmssd'] = np.sqrt(np.mean(rr_sqdiff))

    nn20 = rr_diff[np.where(rr_diff > 20.0)]
    nn50 = rr_diff[np.where(rr_diff > 50.0)]
    working_data['nn20'] = nn20
    working_data['nn50'] = nn50

    # Fractions are undefined without any differences to count.
    n_diff = len(rr_diff)
    measures['pnn20'] = float(len(nn20)) / float(n_diff) if n_diff > 0 else np.nan
    measures['pnn50'] = float(len(nn50)) / float(n_diff) if n_diff > 0 else np.nan

    measures['hr_mad'] = MAD(rr_list)

    return working_data, measures
def calc_fd_measures(method='welch', square_spectrum=True, measures=None, working_data=None):
    '''calculates the frequency-domain measurements.

    Function that calculates the frequency-domain measurements for HeartPy.

    Parameters
    ----------
    method : str
        method used to compute the spectrogram of the heart rate.
        available methods: fft, periodogram, and welch
        default : welch

    square_spectrum : bool
        accepted for interface compatibility. NOTE: the current
        implementation does not read this argument.
        default : true

    measures : dict
        dictionary object used by heartpy to store computed measures. A new
        dict is created for every call if none is passed.

    working_data : dict
        dictionary object that contains all heartpy's working data (temp)
        objects. Must contain 'RR_list_cor'.

    Returns
    -------
    working_data : dict
        dictionary object that contains all heartpy's working data (temp) objects.
        The keys 'frq', 'psd', 'interp_rr_function' and 'interp_rr_linspace'
        are written (only 'frq' and 'psd', as NaN, when there are too few
        intervals).

    measures : dict
        dictionary object used by heartpy to store computed measures. The
        keys 'lf', 'hf' and 'lf/hf' are written.

    Raises
    ------
    ValueError
        if `method` is not 'fft', 'periodogram' or 'welch'.

    Notes
    -----
    The band powers are integrated over the spectrum bins with unit spacing
    (no frequency step is passed to the integration).

    Examples
    --------
    Normally this function is called during the process pipeline of HeartPy. It can
    of course also be used separately.

    Let's load an example and get a list of peak-peak intervals

    >>> import heartpy as hp
    >>> data, timer = hp.load_exampledata(2)
    >>> sample_rate = hp.get_samplerate_datetime(timer, timeformat='%Y-%m-%d %H:%M:%S.%f')
    >>> wd, m = hp.process(data, sample_rate)

    wd now contains a list of peak-peak intervals that has been cleaned of
    outliers ('RR_list_cor'). Calling the function then is easy

    >>> wd, m = calc_fd_measures(method = 'periodogram', measures = m, working_data = wd)
    >>> print('%.3f' %m['lf/hf'])
    4.964

    Available methods are 'fft', 'welch' and 'periodogram'. To set another method, do:

    >>> wd, m = calc_fd_measures(method = 'fft', measures = m, working_data = wd)
    >>> print('%.3f' %m['lf/hf'])
    4.964

    If there are no valid peak-peak intervals specified, returned measures are NaN:

    >>> wd['RR_list_cor'] = []
    >>> wd, m = calc_fd_measures(working_data = wd)
    >>> np.isnan(m['lf/hf'])
    True

    The same holds when there are too few intervals to fit the cubic
    interpolation spline (three or fewer).

    If there are rr-intervals but their total duration is 300000 ms or less,
    a UserWarning ("Short signal.") is raised: frequency measures are still
    computed but should be treated with caution.
    '''
    if measures is None:
        measures = {}
    if working_data is None:
        working_data = {}

    rr_list = working_data['RR_list_cor']

    # The cubic spline used for resampling needs more data points than its
    # order; with fewer intervals no frequency measure can be computed.
    spline_order = 3
    if len(rr_list) <= spline_order:
        working_data['frq'] = np.nan
        working_data['psd'] = np.nan
        measures['lf'] = np.nan
        measures['hf'] = np.nan
        measures['lf/hf'] = np.nan
        return working_data, measures

    if np.sum(rr_list) <= 300000:  # pragma: no cover
        # warn if signal is short
        msg = ''.join(('Short signal.\n',
                       '\n---------Warning:---------\n',
                       'too few peak-peak intervals for (reliable) frequency domain measure computation, ',
                       'frequency output measures are still computed but treat them with caution!\n\n',
                       'HF is usually computed over a minimum of 1 minute of good signal. ',
                       'LF is usually computed over a minimum of 2 minutes of good signal.',
                       'The LF/HF ratio is usually computed over minimum 24 hours, although an ',
                       'absolute minimum of 5 min has also been suggested.\n\n',
                       'For more info see: \nShaffer, F., Ginsberg, J.P. (2017), ',
                       'An Overview of Heart Rate Variability Metrics and Norms.\n\n',
                       'Task Force of Pacing and Electrophysiology (1996), Heart Rate Variability, ',
                       'in: European Heart Journal, vol.17, issue 3, pp354-381'
                       '\n\nThis warning will not repeat'))
        warnings.warn(msg, UserWarning)

    # time stamp (ms) of the end of every interval
    rr_x = np.cumsum(rr_list)

    # resample the interval series onto an evenly spaced (1 ms) grid
    rr_x_new = np.linspace(int(rr_x[0]), int(rr_x[-1]), int(rr_x[-1]))
    interpolated_func = UnivariateSpline(rr_x, rr_list, k=spline_order)

    if method == 'fft':
        datalen = len(rr_x_new)
        half = datalen // 2  # keep the non-negative frequencies only
        frq = np.fft.fftfreq(datalen, d=(1 / 1000.0))[:half]
        Y = (np.fft.fft(interpolated_func(rr_x_new)) / datalen)[:half]
        # squared magnitude, so the stored spectrum is real-valued
        psd = np.power(np.abs(Y), 2)
    elif method == 'periodogram':
        frq, psd = periodogram(interpolated_func(rr_x_new), fs=1000.0)
    elif method == 'welch':
        frq, psd = welch(interpolated_func(rr_x_new), fs=1000.0, nperseg=len(rr_x_new) - 1)
    else:
        raise ValueError("specified method incorrect, use 'fft', 'periodogram' or 'welch'")

    working_data['frq'] = frq
    working_data['psd'] = psd

    # np.trapz was renamed to np.trapezoid in NumPy 2.0
    integrate = np.trapezoid if hasattr(np, 'trapezoid') else np.trapz
    measures['lf'] = integrate(abs(psd[(frq >= 0.04) & (frq <= 0.15)]))
    measures['hf'] = integrate(abs(psd[(frq >= 0.16) & (frq <= 0.5)]))
    measures['lf/hf'] = measures['lf'] / measures['hf']

    working_data['interp_rr_function'] = interpolated_func
    working_data['interp_rr_linspace'] = (rr_x[0], rr_x[-1], rr_x[-1])
    return working_data, measures
def calc_breathing(rrlist, method='welch', filter_breathing=True,
                   bw_cutoff=[0.1, 0.4], measures=None, working_data=None):
    '''estimates breathing rate

    Function that estimates breathing rate from heart rate signal.
    Upsamples the list of detected rr_intervals by interpolation then
    tries to extract breathing peaks in the signal.

    Parameters
    ----------
    rrlist : 1d list or array
        list or array containing peak-peak intervals

    method : str
        method to use to get the spectrogram, must be 'fft', 'welch'
        or 'periodogram' (case-insensitive)
        default : welch

    filter_breathing : bool
        whether to filter the breathing signal derived from the peak-peak intervals
        default : True

    bw_cutoff : list or tuple
        breathing frequency range expected
        default : [0.1, 0.4], meaning between 6 and 24 breaths per minute

    measures : dict
        dictionary object used by heartpy to store computed measures. A new
        dict is created for every call if none is passed.

    working_data : dict
        dictionary object that contains all heartpy's working data (temp)
        objects. A new dict is created for every call if none is passed.

    Returns
    -------
    measures : dict
        dictionary object used by heartpy to store computed measures.
        The key 'breathingrate' (frequency of the spectral maximum) is written.

    working_data : dict
        dictionary object that contains all heartpy's working data (temp)
        objects. The keys 'breathing_signal', 'breathing_psd' and
        'breathing_frq' are written.

    Raises
    ------
    ValueError
        if `method` is not one of the supported methods.

    Examples
    --------
    Normally this function is called during the process pipeline of HeartPy. It can
    of course also be used separately.

    Let's load an example and get a list of peak-peak intervals

    >>> import heartpy as hp
    >>> data, _ = hp.load_exampledata(0)
    >>> wd, m = hp.process(data, 100.0)

    Breathing is then computed with the function

    >>> m, wd = calc_breathing(wd['RR_list_cor'], measures = m, working_data = wd)
    >>> round(m['breathingrate'], 3)
    0.171

    There we have it, .17Hz, or about one breathing cycle in 6.25 seconds.
    '''
    # Default dicts in the signature would be shared between calls.
    # (bw_cutoff is only read, never modified, so its default is kept.)
    if measures is None:
        measures = {}
    if working_data is None:
        working_data = {}

    method = method.lower()
    if method not in ('fft', 'welch', 'periodogram'):
        raise ValueError('Breathing rate extraction method not understood! '
                         'Must be \'welch\', \'fft\' or \'periodogram\'!')

    # resample RR-list to 1000Hz: one sample per millisecond of signal
    x = np.linspace(0, len(rrlist), len(rrlist))
    x_new = np.linspace(0, len(rrlist), np.sum(rrlist, dtype=np.int32))
    interp = UnivariateSpline(x, rrlist, k=3)
    breathing = interp(x_new)

    if filter_breathing:
        breathing = hp.filtering.filter_signal(breathing, cutoff=bw_cutoff,
                                               sample_rate=1000.0, filtertype='bandpass')

    if method == 'fft':
        datalen = len(breathing)
        half = datalen // 2  # keep the non-negative frequencies only
        frq = np.fft.fftfreq(datalen, d=(1 / 1000.0))[:half]
        Y = (np.fft.fft(breathing) / datalen)[:half]
        psd = np.power(np.abs(Y), 2)
    elif method == 'welch':
        if len(breathing) < 30000:
            frq, psd = welch(breathing, fs=1000, nperseg=len(breathing))
        else:
            # segments of a tenth of the signal, but never below 30000 samples
            frq, psd = welch(breathing, fs=1000,
                             nperseg=np.clip(len(breathing) // 10, a_min=30000, a_max=None))
    else:  # 'periodogram'
        frq, psd = periodogram(breathing, fs=1000.0, nfft=30000)

    # find max
    measures['breathingrate'] = frq[np.argmax(psd)]
    working_data['breathing_signal'] = breathing
    working_data['breathing_psd'] = psd
    working_data['breathing_frq'] = frq

    return measures, working_data
def calc_poincare(rr_list, rr_mask=[], measures=None, working_data=None):
    '''computes poincare parameters

    Function that takes peak-peak intervals and computes poincare parameters:
    - standard deviation perpendicular to identity line (SD1)
    - standard deviation along identity line (SD2)
    - area of ellipse described by SD1 and SD2
    - SD1/SD2 ratio

    Based on:
    "Shaffer, F., Ginsberg, J.P. (2017), An Overview of Heart Rate Variability
    Metrics and Norms"

    Parameters
    ----------
    rr_list : 1d array or list
        list or array containing peak-peak intervals. Used when
        working_data does not contain 'RR_list'.

    rr_mask : 1d array or list
        list or array containing mask for rejected peak-peak intervals
        (0 = accepted, 1 = rejected). Used when working_data does not
        contain 'RR_masklist'; if empty, all intervals count as accepted.

    measures : dict
        dictionary object used by heartpy to store computed measures. A new
        dict is created for every call if none is passed.

    working_data : dict
        dictionary object that contains all heartpy's working data (temp)
        objects. If it contains 'RR_list' and/or 'RR_masklist' these take
        precedence over the rr_list / rr_mask arguments. The intermediate
        vectors are stored under working_data['poincare'].

    Returns
    -------
    measures : dict
        dictionary object used by heartpy to store computed measures.
        The keys 'sd1', 'sd2', 's' and 'sd1/sd2' are written.
    '''
    # Default dicts in the signature would be shared between calls.
    if measures is None:
        measures = {}
    if working_data is None:
        working_data = {}

    # Values in working_data win (the pipeline's behaviour); the explicit
    # arguments make the function usable on its own.
    intervals = working_data['RR_list'] if 'RR_list' in working_data else rr_list
    if 'RR_masklist' in working_data:
        mask = working_data['RR_masklist']
    elif len(rr_mask) > 0:
        mask = rr_mask
    else:
        mask = [0] * len(intervals)

    intervals = np.asarray(intervals)
    mask = np.asarray(mask)

    # generate vectors of adjacent peak-peak intervals;
    # only add adjacent RR-intervals that are both not rejected
    n_pairs = max(len(mask) - 1, 0)
    pair_accepted = (mask[:-1] + mask[1:]) == 0
    x_plus = intervals[:n_pairs][pair_accepted]
    x_minus = intervals[1:n_pairs + 1][pair_accepted]

    # rotate the point cloud by 45 degrees: x_one is the coordinate
    # perpendicular to the identity line, x_two the one along it
    x_one = (x_plus - x_minus) / np.sqrt(2)
    x_two = (x_plus + x_minus) / np.sqrt(2)
    sd1 = np.sqrt(np.var(x_one))  # stdev perpendicular to identity line
    sd2 = np.sqrt(np.var(x_two))  # stdev parallel to identity line
    s = np.pi * sd1 * sd2  # area of ellipse

    # write computed measures to dicts
    measures['sd1'] = sd1
    measures['sd2'] = sd2
    measures['s'] = s
    measures['sd1/sd2'] = sd1 / sd2

    working_data['poincare'] = {
        'x_plus': x_plus,
        'x_minus': x_minus,
        'x_one': x_one,
        'x_two': x_two,
    }

    return measures