forked from ZiyaoWei/pyMatrixProfile
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mp.py
48 lines (39 loc) · 1.63 KB
/
mp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
import distanceProfile
import order
def _matrixProfile(tsA, m, orderClass, distanceProfileFunction, tsB = None):
order = orderClass(len(tsA) - m + 1)
mp = np.full(len(tsA) - m + 1, np.inf)
mpIndex = np.full(len(tsB) - m + 1, None, dtype = np.float)
idx = order.next()
for idx in range(len(tsA)-m+1):
(distanceProfile, querySegementsId) = distanceProfileFunction(tsA, idx, m, tsB)
idsToUpdate = distanceProfile < mp
mpIndex[idsToUpdate] = querySegementsId[idsToUpdate]
mp[idsToUpdate] = distanceProfile[idsToUpdate]
#idx = order.next()
return (mp, mpIndex)
def naiveMP(tsA, m, tsB = None):
"""
>>> np.round(naiveMP(tsA = np.array([0.0, 1.0, -1.0, 0.0, 0.0]), tsB = np.array([-1, 1, 0, 0, -1, 1]), m = 4), 3)
array([[ 2., 2., 2.],
[ 0., 1., 0.]])
"""
return _matrixProfile(tsA, m, order.LinearOrder, distanceProfile.naiveDistanceProfile, tsB)
def stmp(tsA, m, tsB = None):
"""
>>> np.round(stmp(tsA = np.array([0.0, 1.0, -1.0, 0.0, 0.0]), tsB = np.array([-1, 1, 0, 0, -1, 1]), m = 4), 3)
array([[ 2., 2., 2.],
[ 0., 1., 0.]])
"""
return _matrixProfile(tsA, m, order.LinearOrder, distanceProfile.stampDistanceProfile, tsB)
def stamp(tsA, m, tsB = None):
"""
>>> np.round(stmp(tsA = np.array([0.0, 1.0, -1.0, 0.0, 0.0]), tsB = np.array([-1, 1, 0, 0, -1, 1]), m = 4), 3)
array([[ 2., 2., 2.],
[ 0., 1., 0.]])
"""
return _matrixProfile(tsA, m, order.RandomOrder, distanceProfile.stampDistanceProfile, tsB)
if __name__ == "__main__":
import doctest
doctest.testmod()