-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
140 lines (113 loc) · 5.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#implementation to get heart rate from PCG
# Get heatrate:
# From Schmidt:
# "The duration of the heart cycle is estimated as the time from lag zero
# to the highest peaks between 500 and 2000 ms in the resulting
# autocorrelation"
# This is performed after filtering and spike removal:
#function [heartRate systolicTimeInterval] = getHeartRateSchmidt(audio_data, Fs, figures)
def butterworth_low_pass_filter(original_signal,order,cutoff,sampling_frequency, figure=False):
from scipy import signal
import numpy as np
b, a = signal.butter(order, 2*cutoff/sampling_frequency, 'low')
low_pass_filtered_signal = signal.filtfilt(b, a, np.squeeze(original_signal),padlen=3*max(len(a), len(b))-3)
return low_pass_filtered_signal
def butterworth_high_pass_filter(original_signal,order,cutoff,sampling_frequency, figures=False):
from scipy import signal
import numpy as np
b, a = signal.butter(order, 2*cutoff/sampling_frequency, 'high')
high_pass_filtered_signal = signal.filtfilt(b, a,np.squeeze(original_signal),padlen=3*max(len(a), len(b))-3)
return high_pass_filtered_signal
#not tested
def schmidt_spike_removal(original_signal,fs,figures=False):
from statistics import median
import numpy as np
#Find the window size
windowsize = round(fs/2)
#Find any samples outside of a integer number of windows
trailingsamples = len(original_signal)%windowsize
#Reshape the signal into a number of windows
sampleframes=[original_signal[i*windowsize:(windowsize)+i*windowsize] for i in range((len(original_signal)-(trailingsamples))//windowsize)]
sampleframes=np.array(sampleframes)
#Find the MAAs
MAAs = [ max([abs(number) for number in individual_frame]) for individual_frame in sampleframes]
while any(MAA >median(MAAs)*3 for MAA in MAAs):#Find the window with the max MAA
window_num=np.argmax(MAAs)
#Find the postion of the spike within that window
abssampleframe=[np.abs(number) for number in sampleframes[window_num]]
spike_position=np.argmax(abssampleframe)
#Finding zero crossings (where there may not be actual 0 values, just a change from positive to negative)
selected_window=sampleframes[window_num]
zero_crossings=np.abs(np.diff(np.sign(selected_window)))
zero_crossings=np.append(zero_crossings,0)
zero_crossings=(zero_crossings>1).astype(int)
#Find the start of the spike, finding the last zero crossing before
#spike position. If that is empty, take the start of the window:
zero_crossings_list=zero_crossings.tolist()
start_nonzeros=[i for i, e in enumerate(zero_crossings_list[:spike_position]) if e != 0]
if not start_nonzeros:
spike_start=0
else:
spike_start=start_nonzeros[-1]
#Find the end of the spike, finding the first zero crossing after
#spike position. If that is empty, take the end of the window:
zero_crossings_list[:spike_position] = [0] * (spike_position)
after_nonzeros=[i for i, e in enumerate(zero_crossings_list) if e != 0]
if not after_nonzeros:
spike_end=windowsize
else:
spike_end=after_nonzeros[-1]
#Set to Zero
sampleframes[window_num][spike_start:spike_end] = 0.0001
#Recaclulate MAAs
MAAs = [ max([abs(number) for number in individual_frame]) for individual_frame in sampleframes]
despiked_signal = sampleframes
# Add the trailing samples back to the signal:
despiked_signal=np.append(despiked_signal,np.array(original_signal[despiked_signal.size:-1]))
return despiked_signal
def Homomorphic_Envelope_with_Hilbert(input_signal, samplingFrequency,lpf_frequency=8,figures=False):
#8Hz, 1st order, Butterworth LPF
from scipy import signal
import numpy as np
B_low,A_low = signal.butter(1,2*lpf_frequency/samplingFrequency,'low')
hilbert_coeff=np.array(signal.hilbert(input_signal))
homomorphic_envelope = np.exp(signal.filtfilt(B_low,A_low,np.log(np.abs(hilbert_coeff))))
#Remove spurious spikes in first sample:
homomorphic_envelope[0] = homomorphic_envelope[1]
return homomorphic_envelope
#not tested
def getHeartRate(audio_data,Fs,figures=False):
# 25-400Hz 4th order Butterworth band pass
audio_data = butterworth_low_pass_filter(audio_data,2,400,Fs)
audio_data = butterworth_high_pass_filter(audio_data,2,25,Fs)
# Spike removal from the original paper:
audio_data = schmidt_spike_removal(audio_data,Fs)
# Find the homomorphic envelope
homomorphic_envelope = Homomorphic_Envelope_with_Hilbert(audio_data, Fs)
# Find the autocorrelation:
import numpy as np
h_e = np.array(homomorphic_envelope)
y=h_e-np.mean(h_e)
auto_coeff = np.correlate(y, y, mode='full')
c = auto_coeff/(np.square(np.max(auto_coeff)))
signal_autocorrelation = c[len(homomorphic_envelope):-1]
min_index = int(0.5*Fs)
max_index = int(2*Fs)
index = np.argmax(signal_autocorrelation[min_index:max_index])
true_index = index+min_index-1
heartRate = 60/(true_index/Fs)
# Find the systolic time interval:
max_sys_duration = int(round(((60/heartRate)*Fs)/2))
min_sys_duration = int(round(0.2*Fs))
pos = np.argmax(signal_autocorrelation[min_sys_duration:max_sys_duration])
systolicTimeInterval = (min_sys_duration+pos)/Fs;
return heartRate, systolicTimeInterval
if __name__ == '__main__':
from scipy.io import loadmat
mat = loadmat('test.mat')
audio_data=mat['audio_data']
audio_Fs=1000
heartRate, systolicTimeInterval = getHeartRate(audio_data,audio_Fs)
print(heartRate,systolicTimeInterval)
#test result here: 60.24096385542169 0.497
#matlab result: 70.921985815602840 0.285000000000000