'''
Functions for loading and slicing data
'''
from datetime import datetime
from pkg_resources import resource_filename
import numpy as np
from scipy.io import loadmat
__all__ = ['get_data',
'get_samplerate_mstimer',
'get_samplerate_datetime',
'_sliding_window',
'rolling_mean',
'outliers_iqr_method',
'outliers_modified_z',
'MAD',
'load_exampledata']
[docs]def get_data(filename, delim=',', column_name='None', encoding=None,
ignore_extension=False):
'''load data from file
Function to load data from a .CSV or .MAT file into numpy array.
File can be accessed from local disk or url.
Parameters
----------
filename : string
absolute or relative path to the file object to read
delim : string
the delimiter used if CSV file passed
default : ','
column_name : string
for CSV files with header: specify column that contains the data
for matlab files it specifies the table name that contains the data
default : 'None'
ignore_extension : bool
if True, extension is not tested, use for example for files where
the extention is not .csv or .txt but the data is formatted as if
it is.
default : False
Returns
-------
out : 1-d numpy array
array containing the data from the requested column of the specified file
Examples
--------
As an example, let's load two example data files included in the package
For this we use pkg_resources for automated testing purposes, you don't need
this when using the function.
>>> from pkg_resources import resource_filename
>>> filepath = resource_filename(__name__, 'data/data.csv')
So, assuming your file lives at 'filepath', you open it as such:
>>> get_data(filepath)
array([530., 518., 506., ..., 492., 493., 494.])
Files with multiple columns can be opened by specifying the 'column_name' where
the data resides:
>>> filepath = resource_filename(__name__, 'data/data2.csv')
Again you don't need the above. It is there for automated testing.
>>> get_data(filepath, column_name='timer')
array([0.00000000e+00, 8.54790319e+00, 1.70958064e+01, ...,
1.28192904e+05, 1.28201452e+05, 1.28210000e+05])
You can open matlab files in much the same way by specifying the column
where the data lives:
>>> filepath = resource_filename(__name__, 'data/data2.mat')
Again you don't need the above. It is there for automated testing.
Open matlab file by specifying the column name as well:
>>> get_data(filepath, column_name='hr')
array([515., 514., 514., ..., 492., 494., 496.])
You can any csv formatted text file no matter the extension if you
set ignore_extension to True:
>>> filepath = resource_filename(__name__, 'data/data.log')
>>> get_data(filepath, ignore_extension = True)
array([530., 518., 506., ..., 492., 493., 494.])
You can specify column names in the same way when using ignore_extension
>>> filepath = resource_filename(__name__, 'data/data2.log')
>>> data = get_data(filepath, column_name = 'hr', ignore_extension = True)
'''
file_ext = filename.split('.')[-1]
if file_ext == 'csv' or file_ext == 'txt':
if column_name != 'None':
hrdata = np.genfromtxt(filename, delimiter=delim, names=True, dtype=None, encoding=None)
try:
hrdata = hrdata[column_name]
except Exception as error:
raise LookupError('\nError loading column "%s" from file "%s". \
Is column name specified correctly?\n The following error was provided: %s'
%(column_name, filename, error))
elif column_name == 'None':
hrdata = np.genfromtxt(filename, delimiter=delim, dtype=np.float64)
else: # pragma: no cover
raise LookupError('\nError: column name "%s" not found in header of "%s".\n'
%(column_name, filename))
elif file_ext == 'mat':
data = loadmat(filename)
if column_name != "None":
hrdata = np.array(data[column_name][:, 0], dtype=np.float64)
else: # pragma: no cover
raise LookupError('\nError: column name required for Matlab .mat files\n\n')
else:
if ignore_extension:
if column_name != 'None':
hrdata = np.genfromtxt(filename, delimiter=delim, names=True, dtype=None, encoding=None)
try:
hrdata = hrdata[column_name]
except Exception as error:
raise LookupError('\nError loading column "%s" from file "%s". \
Is column name specified correctly?\n'
%(column_name, filename))
elif column_name == 'None': # pragma: no cover
hrdata = np.genfromtxt(filename, delimiter=delim, dtype=np.float64)
else: # pragma: no cover
raise LookupError('\nError: column name "%s" not found in header of "%s".\n'
%(column_name, filename))
else:
raise IncorrectFileType('unknown file format')
return None
return hrdata
[docs]def get_samplerate_mstimer(timerdata):
'''detemine sample rate based on ms timer
Function to determine sample rate of data from ms-based timer list or array.
Parameters
----------
timerdata : 1d numpy array or list
sequence containing values of a timer, in ms
Returns
-------
out : float
the sample rate as determined from the timer sequence provided
Examples
--------
first we load a provided example dataset
>>> data, timer = load_exampledata(example = 1)
since it's a timer that counts miliseconds, we use this function.
Let's also round to three decimals
>>> round(get_samplerate_mstimer(timer), 3)
116.996
of course if another time unit is used, converting it to ms-based
should be trivial.
'''
sample_rate = ((len(timerdata) / (timerdata[-1]-timerdata[0]))*1000)
return sample_rate
[docs]def get_samplerate_datetime(datetimedata, timeformat='%H:%M:%S.%f'):
'''determine sample rate based on datetime
Function to determine sample rate of data from datetime-based timer
list or array.
Parameters
----------
timerdata : 1-d numpy array or list
sequence containing datetime strings
timeformat : string
the format of the datetime-strings in datetimedata
default : '%H:%M:%S.f' (24-hour based time including ms: e.g. 21:43:12.569)
Returns
-------
out : float
the sample rate as determined from the timer sequence provided
Examples
--------
We load the data like before
>>> data, timer = load_exampledata(example = 2)
>>> timer[0]
'2016-11-24 13:58:58.081000'
Note that we need to specify the timeformat used so that datetime understands
what it's working with:
>>> round(get_samplerate_datetime(timer, timeformat = '%Y-%m-%d %H:%M:%S.%f'), 3)
100.42
'''
datetimedata = np.asarray(datetimedata, dtype='str') #cast as str in case of np.bytes type
elapsed = ((datetime.strptime(datetimedata[-1], timeformat) -
datetime.strptime(datetimedata[0], timeformat)).total_seconds())
sample_rate = (len(datetimedata) / elapsed)
return sample_rate
def _sliding_window(data, windowsize):
'''segments data into windows
Function to segment data into windows for rolling mean function.
Function returns the data segemented into sections.
Parameters
----------
data : 1d array or list
array or list containing data over which sliding windows are computed
windowsize : int
size of the windows to be created by the function
Returns
-------
out : array of arrays
data segmented into separate windows.
Examples
--------
>>> import numpy as np
>>> data = np.array([1, 2, 3, 4, 5])
>>> windows = _sliding_window(data, windowsize = 3)
>>> windows.shape
(3, 3)
'''
shape = data.shape[:-1] + (data.shape[-1] - windowsize + 1, windowsize)
strides = data.strides + (data.strides[-1],)
return np.lib.stride_tricks.as_strided(data, shape=shape, strides=strides)
[docs]def rolling_mean(data, windowsize, sample_rate):
'''calculates rolling mean
Function to calculate the rolling mean (also: moving average) over the passed data.
Parameters
----------
data : 1-dimensional numpy array or list
sequence containing data over which rolling mean is to be computed
windowsize : int or float
the window size to use, in seconds
calculated as windowsize * sample_rate
sample_rate : int or float
the sample rate of the data set
Returns
-------
out : 1-d numpy array
sequence containing computed rolling mean
Examples
--------
>>> data, _ = load_exampledata(example = 1)
>>> rmean = rolling_mean(data, windowsize=0.75, sample_rate=100)
>>> rmean[100:110]
array([514.49333333, 514.49333333, 514.49333333, 514.46666667,
514.45333333, 514.45333333, 514.45333333, 514.45333333,
514.48 , 514.52 ])
'''
avg_hr = (np.mean(data))
data_arr = np.array(data)
rol_mean = np.mean(_sliding_window(data_arr, int(windowsize*sample_rate)), axis=1)
missing_vals = np.array([avg_hr for i in range(0, int(abs(len(data_arr) - len(rol_mean))/2))])
rol_mean = np.insert(rol_mean, 0, missing_vals)
rol_mean = np.append(rol_mean, missing_vals)
#only to catch length errors that sometimes unexplicably occur.
##Generally not executed, excluded from testing and coverage
if len(rol_mean) != len(data): # pragma: no cover
lendiff = len(rol_mean) - len(data)
if lendiff < 0:
rol_mean = np.append(rol_mean, 0)
else:
rol_mean = rol_mean[:-1]
return rol_mean
[docs]def outliers_iqr_method(hrvalues):
'''removes outliers
Function that removes outliers based on the interquartile range method and
substitutes them for the median
see: https://en.wikipedia.org/wiki/Interquartile_range
Parameters
----------
hrvalues : 1-d numpy array or list
sequence of values, from which outliers need to be identified
Returns
-------
out : tuple
[0] cleaned sequence with identified outliers substituted for the median
[1] list of indices that have been replaced in the original array or list
Examples
--------
>>> x = [2, 4, 3, 4, 6, 7, 35, 2, 3, 4]
>>> outliers_iqr_method(x)
([2, 4, 3, 4, 6, 7, 4.0, 2, 3, 4], [6])
'''
med = np.median(hrvalues)
q1, q3 = np.percentile(hrvalues, [25, 75])
iqr = q3 - q1
lower = q1 - (1.5 * iqr)
upper = q3 + (1.5 * iqr)
output = []
replaced_indices = []
for i in range(0,len(hrvalues)):
if hrvalues[i] < lower or hrvalues[i] > upper:
output.append(med)
replaced_indices.append(i)
else:
output.append(hrvalues[i])
return output, replaced_indices
[docs]def outliers_modified_z(hrvalues):
'''removes outliers
Function that removes outliers based on the modified Z-score metric and
substitutes them for the median
Parameters
----------
hrvalues : 1-d numpy array or list
sequence of values, from which outliers need to be identified
Returns
-------
out : tuple
[0] cleaned sequence with identified outliers substituted for the median
[1] list of indices that have been replaced in the original array or list
Examples
--------
>>> x = [2, 4, 3, 4, 6, 7, 35, 2, 3, 4]
>>> outliers_modified_z(x)
([2, 4, 3, 4, 6, 7, 4.0, 2, 3, 4], [6])
'''
hrvalues = np.array(hrvalues)
threshold = 3.5
med = np.median(hrvalues)
mean_abs_dev = MAD(hrvalues)
modified_z_result = 0.6745 * (hrvalues - med) / mean_abs_dev
output = []
replaced_indices = []
for i in range(0, len(hrvalues)):
if np.abs(modified_z_result[i]) <= threshold:
output.append(hrvalues[i])
else:
output.append(med)
replaced_indices.append(i)
return output, replaced_indices
[docs]def MAD(data):
'''computes median absolute deviation
Function that compute median absolute deviation of data slice
See: https://en.wikipedia.org/wiki/Median_absolute_deviation
Parameters
----------
data : 1-dimensional numpy array or list
sequence containing data over which to compute the MAD
Returns
-------
out : float
the Median Absolute Deviation as computed
Examples
--------
>>> x = [2, 4, 3, 4, 6, 7, 35, 2, 3, 4]
>>> MAD(x)
1.5
'''
med = np.median(data)
return np.median(np.abs(data - med))
[docs]def load_exampledata(example=0):
'''loads example data
Function to load one of the example datasets included in HeartPy
and used in the documentation.
Parameters
----------
example : int (0, 1, 2)
selects example data used in docs of three datafiles.
Available (see github repo for source of files):
0 : data.csv
1 : data2.csv
2 : data3.csv
default : 0
Returns
-------
out : tuple of two arrays
Contains the data and timer column. If no timer data is
available, such as in example 0, an empty second
array is returned.
Examples
--------
This function can load one of the three example data files provided
with HeartPy. It returns both the data and a timer if that is present
For example:
>>> data, _ = load_exampledata(0)
>>> data[0:5]
array([530., 518., 506., 494., 483.])
And another example:
>>> data, timer = load_exampledata(1)
>>> [round(x, 2) for x in timer[0:5]]
[0.0, 8.55, 17.1, 25.64, 34.19]
'''
timer = []
if example == 0:
path = path = 'data/data.csv'
filepath = resource_filename(__name__, path)
data = get_data(filepath)
elif example == 1:
path = path = 'data/data2.csv'
filepath = resource_filename(__name__, path)
data = get_data(filepath, column_name = 'hr')
timer = get_data(filepath, column_name = 'timer')
elif example == 2:
path = path = 'data/data3.csv'
filepath = resource_filename(__name__, path)
data = get_data(filepath, column_name = 'hr')
timer = get_data(filepath, column_name = 'datetime')
else:
raise ValueError('Incorrect data file specified.\
available datafiles are data.csv (0), data2.csv(1), data3.csv(2).')
return data, timer