-
Notifications
You must be signed in to change notification settings - Fork 11
/
emd.py
61 lines (46 loc) · 1.97 KB
/
emd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import numpy as np
import scipy as sp
def emd(x, nIMF = 3, stoplim = .001):
"""Perform empirical mode decomposition to extract 'niMF' components out of the signal 'x'."""
r = x
t = np.arange(len(r))
imfs = np.zeros(nIMF,dtype=object)
for i in range(nIMF):
r_t = r
is_imf = False
while is_imf == False:
# Identify peaks and troughs
pks = sp.signal.argrelmax(r_t)[0]
trs = sp.signal.argrelmin(r_t)[0]
# Interpolate extrema
pks_r = r_t[pks]
fip = sp.interpolate.InterpolatedUnivariateSpline(pks,pks_r,k=3)
pks_t = fip(t)
trs_r = r_t[trs]
fitr = sp.interpolate.InterpolatedUnivariateSpline(trs,trs_r,k=3)
trs_t = fitr(t)
# Calculate mean
mean_t = (pks_t + trs_t) / 2
mean_t = _emd_complim(mean_t, pks, trs)
# Assess if this is an IMF (only look in time between peaks and troughs)
sdk = _emd_comperror(r_t, mean_t, pks, trs)
# if not imf, update r_t and is_imf
if sdk < stoplim:
is_imf = True
else:
r_t = r_t - mean_t
imfs[i] = r_t
r = r - imfs[i]
return imfs
def _emd_comperror(h, mean, pks, trs):
"""Calculate the normalized error of the current component"""
samp_start = np.max((np.min(pks),np.min(trs)))
samp_end = np.min((np.max(pks),np.max(trs))) + 1
return np.sum(np.abs(mean[samp_start:samp_end]**2)) / np.sum(np.abs(h[samp_start:samp_end]**2))
def _emd_complim(mean_t, pks, trs):
"""Discard the mean extrema envelope past the first and last extrema"""
samp_start = np.max((np.min(pks),np.min(trs)))
samp_end = np.min((np.max(pks),np.max(trs))) + 1
mean_t[:samp_start] = mean_t[samp_start]
mean_t[samp_end:] = mean_t[samp_end]
return mean_t