Source code for heartpy.preprocessing

import numpy as np
from scipy.interpolate import UnivariateSpline, interp1d

np.seterr(divide='ignore') #disable division by zero warnings
np.seterr(invalid='ignore') #disable invalid-value (nan) warnings

from .filtering import filter_signal


__all__ = ['scale_data',
           'scale_sections',
           'enhance_peaks',
           'enhance_ecg_peaks',
           'interpolate_clipping',
           'flip_signal']


def scale_data(data, lower=0, upper=1024):
    '''scales passed sequence between thresholds

    Function that scales passed data so that it has specified lower
    and upper bounds.

    Parameters
    ----------
    data : 1-d array or list
        Sequence to be scaled

    lower : int or float
        lower threshold for scaling
        default : 0

    upper : int or float
        upper threshold for scaling
        default : 1024

    Returns
    -------
    out : 1-d array
        contains scaled data

    Examples
    --------
    Passing data without further arguments scales it between 0 and 1024

    >>> x = [2, 3, 4, 5]
    >>> scale_data(x)
    array([   0.        ,  341.33333333,  682.66666667, 1024.        ])

    Or you can specify a range:

    >>> scale_data(x, lower = 50, upper = 124)
    array([ 50.        ,  74.66666667,  99.33333333, 124.        ])
    '''
    rng = np.max(data) - np.min(data)
    minimum = np.min(data)
    data = (upper - lower) * ((data - minimum) / rng) + lower
    return data


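#Editor's sketch (not part of heartpy): a minimal demonstration of the
#min-max transform above, including the constant-input edge case. With a
#constant sequence rng is 0, so the division yields nan values; the
#np.seterr calls at the top of this module silence the warnings, but the
#caller still has to handle the nan output. The function name below is
#hypothetical.
def _demo_scale_data():
    scaled = scale_data([2, 3, 4, 5], lower=0, upper=10)
    assert scaled.min() == 0 and scaled.max() == 10

    flat = scale_data([5, 5, 5])  #rng == 0: all-nan result, no warning raised
    assert np.all(np.isnan(flat))

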
def scale_sections(data, sample_rate, windowsize=2.5, lower=0, upper=1024):
    '''scales data using sliding window approach

    Function that scales the data within the defined sliding window between
    the defined lower and upper bounds.

    Parameters
    ----------
    data : 1-d array or list
        Sequence to be scaled

    sample_rate : int or float
        Sample rate of the passed signal

    windowsize : int or float
        size of the window within which signal is scaled, in seconds
        default : 2.5

    lower : int or float
        lower threshold for scaling. Passed to scale_data.
        default : 0

    upper : int or float
        upper threshold for scaling. Passed to scale_data.
        default : 1024

    Returns
    -------
    out : 1-d array
        contains scaled data

    Examples
    --------
    >>> x = [20, 30, 20, 30, 70, 80, 20, 30, 20, 30]
    >>> scale_sections(x, sample_rate=1, windowsize=2, lower=20, upper=30)
    array([20., 30., 20., 30., 20., 30., 20., 30., 20., 30.])
    '''
    window_dimension = int(windowsize * sample_rate)

    data_start = 0
    data_end = window_dimension

    output = np.empty(len(data))

    while data_end <= len(data):
        #square the windowed signal to accentuate peaks, then scale it
        sliced = np.power(data[data_start:data_end], 2)
        output[data_start:data_end] = scale_data(sliced, lower, upper)

        data_start += window_dimension
        data_end += window_dimension

    #scale any trailing partial window as well, so that no uninitialised
    #values from np.empty remain in the output
    if data_start < len(data):
        sliced = np.power(data[data_start:], 2)
        output[data_start:] = scale_data(sliced, lower, upper)

    return output


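#Editor's sketch (not part of heartpy): scale_sections normalises each
#window independently, so a local amplitude change (the 70/80 bump below)
#is flattened to the same range as the other windows, which global
#scaling would not do. The function name is hypothetical.
def _demo_scale_sections():
    x = [20, 30, 20, 30, 70, 80, 20, 30, 20, 30]
    per_window = scale_sections(x, sample_rate=1, windowsize=2, lower=0, upper=1)
    global_scaled = scale_data(x, lower=0, upper=1)
    assert per_window.max() == global_scaled.max() == 1
    assert per_window[4] < global_scaled[4]  #70 is merely a window minimum here

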
def mark_clipping(data, threshold=1020):
    '''marks clipping sections

    Function that detects the start and end of clipping segments and
    returns them as (start, end) tuples.

    Parameters
    ----------
    data : 1-d numpy array
        Sequence to be evaluated for clipping

    threshold : int or float
        the threshold for clipping, recommended to be a few data points
        below ADC or sensor max value, to compensate for signal noise
        default : 1020

    Returns
    -------
    out : list of tuples
        the output is a list of tuples. Each tuple marks the start and
        endpoint of the detected clipping segment

    Examples
    --------
    Import heartpy and load example data

    >>> import heartpy as hp
    >>> data, _ = hp.load_exampledata(example=2)

    Let's slice a part of the data that I know contains clipping

    >>> x = data[2000:3000]
    >>> mark_clipping(x, threshold=970)
    [(369, 375), (426, 437), (486, 493), (544, 552), (604, 610), (663, 665), \
(721, 722), (776, 781), (831, 836), (883, 891), (995, 999)]
    '''
    clip_binary = np.where(data > threshold)
    clipping_edges = np.where(np.diff(clip_binary) > 1)[1]

    clipping_segments = []

    for i in range(0, len(clipping_edges)):
        if i == 0: #if first clipping segment
            clipping_segments.append((clip_binary[0][0],
                                      clip_binary[0][clipping_edges[0]]))
        elif i == len(clipping_edges) - 1: #append last entry
            clipping_segments.append((clip_binary[0][clipping_edges[i]+1],
                                      clip_binary[0][-1]))
        else:
            clipping_segments.append((clip_binary[0][clipping_edges[i-1] + 1],
                                      clip_binary[0][clipping_edges[i]]))

    return clipping_segments


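#Editor's sketch (not part of heartpy): mark_clipping groups samples above
#the threshold into contiguous runs and reports them as (start, end) index
#tuples. Note that the branch structure above can drop a run near the end
#of the signal: for the two runs below, only the first is reported. The
#function name is hypothetical.
def _demo_mark_clipping():
    x = np.array([10, 1021, 1023, 1022, 10, 10, 1021, 1021, 10])
    print(mark_clipping(x, threshold=1020))  #only the first run, (1, 3)

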
def interpolate_clipping(data, sample_rate, threshold=1020):
    '''interpolate peak waveform

    Function that interpolates peaks between the clipping segments using
    cubic spline interpolation. It takes the clipping start +/- 100ms to
    calculate the spline.

    Parameters
    ----------
    data : 1d list or numpy array
        data section to be evaluated

    sample_rate : int or float
        sample rate with which the data array is sampled

    threshold : int or float
        the threshold for clipping, recommended to be a few data points
        below ADC or sensor max value, to compensate for signal noise
        default : 1020

    Returns
    -------
    out : array
        the output is an array with clipping segments replaced by
        interpolated segments

    Examples
    --------
    First let's load some example data:

    >>> import heartpy as hp
    >>> data, _ = hp.load_exampledata(example=2)
    >>> x = data[2000:3000]
    >>> x[425:445]
    array([948, 977, 977, 977, 977, 978, 978, 977, 978, 977, 977, 977, 977,
           914, 820, 722, 627, 536, 460, 394])

    And interpolate any clipping segments as such:

    >>> intp = interpolate_clipping(x, sample_rate=117, threshold=970)
    >>> intp[425:445]
    array([ 972, 1043, 1098, 1138, 1163, 1174, 1173, 1159, 1134, 1098, 1053,
            998,  934,  848,  747,  646,  552,  470,  402,  348])
    '''
    clipping_segments = mark_clipping(data, threshold)
    num_datapoints = int(0.1 * sample_rate)

    for segment in clipping_segments:
        if segment[0] < num_datapoints:
            #if clipping is present at start of signal, skip.
            #We cannot interpolate accurately when there is insufficient
            #data prior to the clipping segment.
            pass
        else:
            antecedent = data[segment[0] - num_datapoints : segment[0]]
            consequent = data[segment[1] : segment[1] + num_datapoints]
            segment_data = np.concatenate((antecedent, consequent))

            interpdata_x = np.concatenate((np.arange(segment[0] - num_datapoints, segment[0]),
                                           np.arange(segment[1], segment[1] + num_datapoints)))
            x_new = np.linspace(segment[0] - num_datapoints,
                                segment[1] + num_datapoints,
                                (segment[1] - segment[0]) + (2 * num_datapoints))
            try:
                interp_func = UnivariateSpline(interpdata_x, segment_data, k=3)
                interp_data = interp_func(x_new)

                #insert interpolated segment into the original data structure
                data[segment[0] - num_datapoints : segment[1] + num_datapoints] = interp_data
            except Exception:
                #pass over failed interpolation: leave original data alone
                pass

    return data


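#Editor's sketch (not part of heartpy): repairing a synthetically clipped
#pulse train. Two sine pulses are flattened at 900; the spline fitted on
#the 100ms either side of a detected run restores a rounded top above the
#clipping level. interpolate_clipping writes into the array it is given,
#hence the copy. Function name, signal and the 100 Hz rate are
#illustrative only.
def _demo_interpolate_clipping():
    t = np.linspace(0, 2, 200)
    pulses = 1000 * np.abs(np.sin(np.pi * t))
    clipped = np.minimum(pulses, 900.0)
    repaired = interpolate_clipping(clipped.copy(), sample_rate=100, threshold=890)
    assert repaired[:100].max() > clipped[:100].max()  #first peak restored

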
def flip_signal(data, enhancepeaks=False, keep_range=True):
    '''invert signal waveforms.

    Function that flips raw signal with negative mV peaks to normal ECG.
    Required for proper peak finding in case peaks are expressed as
    negative dips.

    Parameters
    ----------
    data : 1d list or numpy array
        data section to be evaluated

    enhancepeaks : bool
        whether to apply peak accentuation
        default : False

    keep_range : bool
        whether to scale the inverted data so that the original
        range is maintained
        default : True

    Returns
    -------
    out : 1d array

    Examples
    --------
    Given an array of data

    >>> x = [200, 300, 500, 900, 500, 300, 200]

    We can call the function. If keep_range is False, the signal
    will be inverted relative to its mean.

    >>> flip_signal(x, keep_range=False)
    array([628.57142857, 528.57142857, 328.57142857, -71.42857143,
           328.57142857, 528.57142857, 628.57142857])

    However, by specifying keep_range, the inverted signal will be
    put 'back in place' in its original range.

    >>> flip_signal(x, keep_range=True)
    array([900., 800., 600., 200., 600., 800., 900.])

    It's also possible to use the enhance_peaks function:

    >>> flip_signal(x, enhancepeaks=True)
    array([1024.        ,  621.75746332,  176.85545623,    0.        ,
            176.85545623,  621.75746332, 1024.        ])
    '''
    data_mean = np.mean(data)
    data_min = np.min(data)
    data_max = np.max(data)

    #invert signal
    data = (data_mean - data) + data_mean

    if keep_range:
        #scale data so original range is maintained
        data = scale_data(data, lower = data_min, upper = data_max)
    if enhancepeaks:
        data = enhance_peaks(data)
    return data


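#Editor's sketch (not part of heartpy): with keep_range=True the flip
#reduces to (max + min) - data, so applying it twice returns the original
#signal (up to floating point error). The function name is hypothetical.
def _demo_flip_signal_roundtrip():
    x = np.array([200., 300., 500., 900., 500., 300., 200.])
    flipped = flip_signal(x, keep_range=True)
    assert np.allclose(flip_signal(flipped, keep_range=True), x)

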
def enhance_peaks(hrdata, iterations=2):
    '''enhances peak amplitude relative to rest of signal

    Function that attempts to enhance the signal-noise ratio by
    accentuating the highest peaks. Note: denoise first

    Parameters
    ----------
    hrdata : 1-d numpy array or list
        sequence containing heart rate data

    iterations : int
        the number of scaling steps to perform
        default : 2

    Returns
    -------
    out : 1-d numpy array
        array containing enhanced peaks

    Examples
    --------
    Given an array of data, the peaks can be enhanced using the function

    >>> x = [200, 300, 500, 900, 500, 300, 200]
    >>> enhance_peaks(x)
    array([   0.        ,    4.31776016,   76.16528926, 1024.        ,
             76.16528926,    4.31776016,    0.        ])
    '''
    for i in range(iterations):
        #square to stretch the distance between peaks and the rest of
        #the signal, then rescale to the 0-1024 range
        hrdata = np.power(hrdata, 2)
        hrdata = scale_data(hrdata)
    return hrdata


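#Editor's sketch (not part of heartpy): each iteration squares and then
#rescales, so the endpoints of the output range stay pinned at 0/1024
#while sub-maximal values are pushed further toward 0 as iterations
#grows. The function name is hypothetical.
def _demo_enhance_peaks():
    x = [200, 300, 500, 900, 500, 300, 200]
    two = enhance_peaks(x, iterations=2)
    four = enhance_peaks(x, iterations=4)
    assert two.max() == four.max() == 1024
    assert four[2] < two[2]  #the 500-sample is suppressed harder

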
def enhance_ecg_peaks(hrdata, sample_rate, iterations=4, aggregation='mean',
                      notch_filter=True):
    '''enhances ECG peaks

    Function that convolves synthetic QRS templates with the signal, leading
    to a strong increase in signal-to-noise ratio. Function ends with an
    optional notch filter step (default : True) to reduce noise from the
    iterating convolution steps.

    Parameters
    ----------
    hrdata : 1-d numpy array or list
        sequence containing heart rate data

    sample_rate : int or float
        sample rate with which the data is sampled

    iterations : int
        how many convolutional iterations should be run. More will result in
        stronger peak enhancement, but over a certain point (usually between
        12-16) overtones start appearing in the signal. Only increase this
        if the peaks aren't amplified enough.
        default : 4

    aggregation : str
        how the data from the different convolutions should be aggregated.
        Can be either 'mean' or 'median'.
        default : mean

    notch_filter : bool
        whether to apply a notch filter after the last convolution to get
        rid of remaining low frequency noise.
        default : True

    Returns
    -------
    output : 1d array
        The array containing the filtered data with enhanced peaks

    Examples
    --------
    First let's import the module and load the data

    >>> import heartpy as hp
    >>> data, timer = hp.load_exampledata(1)
    >>> sample_rate = hp.get_samplerate_mstimer(timer)

    After loading the data we call the function like so:

    >>> filtered_data = enhance_ecg_peaks(data, sample_rate, iterations = 3)

    By default the module uses the mean to aggregate convolutional outputs.
    It is also possible to use the median.

    >>> filtered_data = enhance_ecg_peaks(data, sample_rate, iterations = 3,
    ... aggregation = 'median', notch_filter = False)

    In the last example we also disabled the notch filter.
    '''
    #assign output
    output = np.copy(hrdata)
    #generate synthetic QRS complexes
    templates = generate_ecg_templates(sample_rate)

    for i in range(int(iterations)):
        convolved = denoise_convolutions(output, sample_rate, templates)
        if aggregation == 'mean':
            output = np.nanmean(convolved, axis=0)
        elif aggregation == 'median':
            output = np.nanmedian(convolved, axis=0)

    #offset signal shift (shifts 1 datapoint for every iteration after the first)
    output = output[int(iterations) - 1:-int(iterations)]

    if notch_filter:
        output = filter_signal(output, 0.05, sample_rate, filtertype='notch')

    return output


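#Editor's sketch (not part of heartpy): the final slice above trims
#iterations - 1 samples from the front and iterations samples from the
#back, so the output is 2 * iterations - 1 samples shorter than the
#input. The synthetic signal, the 250 Hz rate and the function name are
#illustrative only; notch_filter=False keeps the demo independent of
#heartpy.filtering.
def _demo_enhance_ecg_peaks():
    sample_rate = 250
    data = np.sin(np.linspace(0, 20 * np.pi, 10 * sample_rate))
    out = enhance_ecg_peaks(data, sample_rate, iterations=4, notch_filter=False)
    assert len(out) == len(data) - (2 * 4 - 1)

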
def generate_ecg_templates(sample_rate, widths=[50, 60, 70, 80, 100],
                           presets=[[0, 2, 2.5, 3, 3.5, 5],
                                    [0, 1, 1.5, 2, 2.5, 3],
                                    [0, 3, 3.5, 4, 4.5, 6]],
                           amplitude=[0, -0.1, 1, -0.5, 0, 0]):
    '''helper function for enhance_ecg_peaks

    Helper function that generates synthetic QRS complexes of varying
    sizes to convolve with the signal.
    '''
    templates = []

    for i in presets:
        for j in widths:
            #duration < 120ms
            duration = (j / 1000) * sample_rate
            step = duration / len(i)
            new_t = [int(step * x) for x in i]
            new_x = np.linspace(new_t[0], new_t[-1], new_t[-1])

            #interpolate template to fit the correct sampling rate
            interp_func = interp1d(new_t, amplitude, kind='linear')
            templates.append(interp_func(new_x))

    return templates


def denoise_convolutions(data, sample_rate, templates):
    '''helper function for enhance_ecg_peaks

    Helper function that convolves the generated synthetic QRS templates
    with the provided signal.
    '''
    convolutions = []

    for i in range(len(templates)):
        convolved = np.convolve(data, templates[i], mode='same')
        convolutions.append(convolved)

    return np.asarray(convolutions)


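#Editor's sketch (not part of heartpy): with the default five widths and
#three presets, generate_ecg_templates returns 15 templates, each a short
#array tracing the 0 / -0.1 / 1 / -0.5 / 0 / 0 QRS-like amplitude profile
#at a different duration. The function name and 250 Hz rate are
#illustrative only.
def _demo_ecg_templates():
    templates = generate_ecg_templates(sample_rate=250)
    assert len(templates) == 15  #len(presets) * len(widths)
    assert all(np.max(t) <= 1.0 for t in templates)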