-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathinterest_model.py
130 lines (106 loc) · 4.4 KB
/
interest_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import numpy as np
from explauto.utils import rand_bounds
from explauto.interest_model.interest_model import InterestModel
from explauto.interest_model.random import RandomInterest
from explauto.interest_model.competences import competence_dist
from explauto.models.dataset import BufferedDataset as Dataset
class MiscRandomInterest(RandomInterest):
"""
Add some features to the RandomInterest random babbling class.
Allows to query the recent interest in the whole space,
the recent competence on the babbled points in the whole space,
the competence around a given point based on a mean of the knns.
"""
def __init__(self,
conf,
expl_dims,
competence_measure,
win_size,
competence_mode,
k,
progress_mode):
RandomInterest.__init__(self, conf, expl_dims)
self.competence_measure = competence_measure
self.win_size = win_size
self.competence_mode = competence_mode
self.dist_max = np.linalg.norm(self.bounds[0,:] - self.bounds[1,:])
self.k = k
self.progress_mode = progress_mode
self.data_xc = Dataset(len(expl_dims), 1)
self.data_sr = Dataset(len(expl_dims), 0)
self.current_progress = 0.
self.current_interest = 0.
def add_xc(self, x, c):
self.data_xc.add_xy(x, [c])
def add_sr(self, x):
self.data_sr.add_xy(x)
def update_interest(self, i):
self.current_progress += (1. / self.win_size) * (i - self.current_progress)
self.current_interest = abs(self.current_progress)
def update(self, xy, ms, snnp=None, sp=None):
c = self.competence_measure(xy[self.expl_dims], ms[self.expl_dims], dist_max=self.dist_max)
if self.progress_mode == 'local':
interest = self.interest_xc(xy[self.expl_dims], c)
self.update_interest(interest)
elif self.progress_mode == 'global':
pass
self.add_xc(xy[self.expl_dims], c)
self.add_sr(ms[self.expl_dims])
return interest
def n_points(self):
return len(self.data_xc)
def competence_global(self, mode='sw'):
if self.n_points() > 0:
if mode == 'all':
return np.mean(self.data_c)
elif mode == 'sw':
idxs = range(self.n_points())[- self.win_size:]
return np.mean([self.data_xc.get_y(idx) for idx in idxs])
else:
raise NotImplementedError
else:
return 0.
def mean_competence_pt(self, x):
if self.n_points() > self.k:
_, idxs = self.data_xc.nn_x(x, k=self.k)
return np.mean([self.data_xc.get_y(idx) for idx in idxs])
else:
return self.competence()
def interest_xc(self, x, c):
if self.n_points() > 0:
idx_sg_NN = self.data_xc.nn_x(x, k=1)[1][0]
sr_NN = self.data_sr.get_x(idx_sg_NN)
c_old = competence_dist(x, sr_NN, dist_max=self.dist_max)
return c - c_old
#return np.abs(c - c_old)
else:
return 0.
def interest_pt(self, x):
if self.n_points() > self.k:
_, idxs = self.data_xc.nn_x(x, k=self.k)
idxs = sorted(idxs)
v = [self.data_xc.get_y(idx) for idx in idxs]
n = len(v)
comp_beg = np.mean(v[:int(float(n)/2.)])
comp_end = np.mean(v[int(float(n)/2.):])
return np.abs(comp_end - comp_beg)
else:
return self.interest_global()
def interest_global(self):
if self.n_points() < 2:
return 0.
else:
idxs = range(self.n_points())[- self.win_size:]
v = [self.data_xc.get_y(idx) for idx in idxs]
n = len(v)
comp_beg = np.mean(v[:int(float(n)/2.)])
comp_end = np.mean(v[int(float(n)/2.):])
return np.abs(comp_end - comp_beg)
def competence(self): return self.competence_global()
def interest(self):
if self.progress_mode == 'local':
return self.current_interest
elif self.progress_mode == 'global':
return self.interest_global()
else:
raise NotImplementedError