utils.py
# Taken from https://github.com/davebiagioni/pyomp
import numpy as np
from scipy.optimize import nnls


class Result(object):
"""Result object for storing input and output data for omp. When called from
`omp`, runtime parameters are passed as keyword arguments and stored in the
`params` dictionary.

    Attributes:
        X: Predictor array after (optional) standardization.
        y: Response array after (optional) standardization.
ypred: Predicted response.
residual: Residual vector.
coef: Solution coefficients.
active: Indices of the active (non-zero) coefficient set.
err: Relative error per iteration.
params: Dictionary of runtime parameters passed as keyword args.
"""
def __init__(self, **kwargs):
# to be computed
self.X = None
self.y = None
self.ypred = None
self.residual = None
self.coef = None
self.active = None
self.err = None
# runtime parameters
self.params = {}
for key, val in kwargs.items():
self.params[key] = val

    def update(self, coef, active, err, residual, ypred):
"""Update the solution attributes."""
self.coef = coef
self.active = active
self.err = err
self.residual = residual
self.ypred = ypred
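
# A minimal usage sketch for Result (illustrative only, not part of the original
# pyomp code): keyword arguments passed to the constructor land in `params`, and
# `update` fills in the solution fields after a solver run.
#
#   r = Result(tol=1e-5, ncoef=10)
#   r.params              # {'tol': 1e-05, 'ncoef': 10}
#   r.update(coef=np.zeros(3), active=[0], err=0.1,
#            residual=np.zeros(5), ypred=np.zeros(5))
#   r.coef, r.active      # now populated by update()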


def omp(
X, y, const, nonneg=True, ncoef=None, maxit=5000, tol=1e-5, ztol=1e-14, verbose=True
):
"""Compute sparse orthogonal matching pursuit solution with unconstrained
or non-negative coefficients.
Args:
X: Dictionary array of size n_samples x n_features.
y: Reponse array of size n_samples x 1.
nonneg: Enforce non-negative coefficients.
ncoef: Max number of coefficients. Set to n_features/2 by default.
tol: Convergence tolerance. If relative error is less than
tol * ||y||_2, exit.
ztol: Residual covariance threshold. If all coefficients are less
than ztol * ||y||_2, exit.
verbose: Boolean, print some info at each iteration.
Returns:
result: Result object. See Result.__doc__
"""
def norm2(x):
return np.linalg.norm(x) / np.sqrt(len(x))
# initialize result object
    result = Result(nonneg=nonneg, ncoef=ncoef, maxit=maxit, tol=tol, ztol=ztol)
# if verbose:
# print(result.params)
# check types, try to make somewhat user friendly
if type(X) is not np.ndarray:
X = np.array(X)
if type(y) is not np.ndarray:
y = np.array(y)
# check that n_samples match
if X.shape[0] != len(y):
# print('X and y must have same number of rows (samples)')
return result
# store arrays in result object
result.y = y
result.X = X
# for rest of call, want y to have ndim=1
if np.ndim(y) > 1:
y = np.reshape(y, (len(y),))
# by default set max number of coef to half of total possible
if ncoef is None:
ncoef = int(X.shape[1] / 2)
# initialize things
X_transpose = X.T # store for repeated use
# active = np.array([], dtype=int) # initialize list of active set
active = []
coef = np.zeros(X.shape[1], dtype=float) # solution vector
residual = y # residual vector
ypred = np.zeros(y.shape, dtype=float)
ynorm = norm2(y) # store for computing relative err
err = np.zeros(maxit, dtype=float) # relative err vector
# Check if response has zero norm, because then we're done. This can happen
# in the corner case where the response is constant and you normalize it.
    if ynorm < tol:  # response norm is (numerically) zero, so there is nothing to fit
# print('Norm of the response is less than convergence tolerance.')
result.update(coef, active, err[0], residual, ypred)
return result
# convert tolerances to relative
tol = tol * ynorm # convergence tolerance
ztol = ztol * ynorm # threshold for residual covariance
# if verbose:
# print('\nIteration, relative error, number of non-zeros')
# main iteration
for it in range(maxit):
# compute residual covariance vector and check threshold
rcov = np.dot(X_transpose, residual)
if nonneg:
i = np.argmax(rcov)
rc = rcov[i]
else:
            # Randomized atom selection: instead of the greedy argmax over
            # |rcov|, sample an index with probability proportional to
            # exp(0.5 * const * (rcov_j / ||x_j||)**2), so larger `const`
            # concentrates the draw on the most correlated atoms.
            # i = np.argmax(np.abs(rcov))
            norm_x = np.diag(np.matmul(X_transpose, X))  # squared column norms of X
            est = rcov / np.sqrt(norm_x)  # column-normalized correlations
            prob_exp = np.exp((0.5 * const) * est ** 2)
            prob_exp = prob_exp / np.sum(prob_exp)  # normalize to a probability vector
            ind = np.arange(0, prob_exp.shape[0])
            i = np.random.choice(ind, size=1, replace=True, p=prob_exp)[0]
            rc = np.abs(rcov[i])
if rc < ztol:
# print('All residual covariances are below threshold.')
break
# update active set
if i not in active:
# active = np.concatenate([active, [i]], axis=1)
active.append(i)
# solve for new coefficients on active set
if nonneg:
coefi, _ = nnls(X[:, active], y)
else:
            coefi, _, _, _ = np.linalg.lstsq(X[:, active], y, rcond=None)
coef[active] = coefi # update solution
# update residual vector and error
residual = y - np.dot(X[:, active], coefi)
ypred = y - residual
err[it] = norm2(residual) / ynorm
# print status
# if verbose:
# print('{}, {}, {}'.format(it, err[it], len(active)))
# check stopping criteria
if err[it] < tol: # converged
print("\nConverged.")
break
if len(active) >= ncoef: # hit max coefficients
break
# print('\nFound solution with max number of coefficients.')
# break
if it == maxit - 1: # max iterations
print("\nHit max iterations.")
result.update(coef, active, err[: (it + 1)], residual, ypred)
return result
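
# A minimal usage sketch for omp (illustrative only, not from the original
# module).  The dictionary size, sparsity pattern, and the value of `const` are
# arbitrary choices; with nonneg=False the atom picked at each step is sampled,
# so repeated runs can return different active sets.
#
#   X_demo = np.random.randn(100, 40)
#   w_true = np.zeros(40)
#   w_true[[3, 17, 25]] = [1.5, 0.7, 2.0]
#   y_demo = X_demo @ w_true
#   res = omp(X_demo, y_demo, const=0.05, nonneg=False, ncoef=5, tol=1e-6)
#   print(sorted(res.active), res.coef[res.active])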


def omp_orig(
X, y, const, nonneg=True, ncoef=None, maxit=5000, tol=1e-5, ztol=1e-14, verbose=True
):
"""Compute sparse orthogonal matching pursuit solution with unconstrained
or non-negative coefficients.
Args:
X: Dictionary array of size n_samples x n_features.
y: Reponse array of size n_samples x 1.
nonneg: Enforce non-negative coefficients.
ncoef: Max number of coefficients. Set to n_features/2 by default.
tol: Convergence tolerance. If relative error is less than
tol * ||y||_2, exit.
ztol: Residual covariance threshold. If all coefficients are less
than ztol * ||y||_2, exit.
verbose: Boolean, print some info at each iteration.
Returns:
result: Result object. See Result.__doc__
"""
def norm2(x):
return np.linalg.norm(x) / np.sqrt(len(x))
# initialize result object
    result = Result(nonneg=nonneg, ncoef=ncoef, maxit=maxit, tol=tol, ztol=ztol)
# if verbose:
# print(result.params)
# check types, try to make somewhat user friendly
if type(X) is not np.ndarray:
X = np.array(X)
if type(y) is not np.ndarray:
y = np.array(y)
# check that n_samples match
if X.shape[0] != len(y):
# print('X and y must have same number of rows (samples)')
return result
# store arrays in result object
result.y = y
result.X = X
# for rest of call, want y to have ndim=1
if np.ndim(y) > 1:
y = np.reshape(y, (len(y),))
# by default set max number of coef to half of total possible
if ncoef is None:
ncoef = int(X.shape[1] / 2)
# initialize things
X_transpose = X.T # store for repeated use
# active = np.array([], dtype=int) # initialize list of active set
active = []
coef = np.zeros(X.shape[1], dtype=float) # solution vector
residual = y # residual vector
ypred = np.zeros(y.shape, dtype=float)
ynorm = norm2(y) # store for computing relative err
err = np.zeros(maxit, dtype=float) # relative err vector
# Check if response has zero norm, because then we're done. This can happen
# in the corner case where the response is constant and you normalize it.
    if ynorm < tol:  # response norm is (numerically) zero, so there is nothing to fit
# print('Norm of the response is less than convergence tolerance.')
result.update(coef, active, err[0], residual, ypred)
return result
# convert tolerances to relative
tol = tol * ynorm # convergence tolerance
ztol = ztol * ynorm # threshold for residual covariance
# if verbose:
# print('\nIteration, relative error, number of non-zeros')
# main iteration
for it in range(maxit):
# compute residual covariance vector and check threshold
rcov = np.dot(X_transpose, residual)
if nonneg:
i = np.argmax(rcov)
rc = rcov[i]
else:
i = np.argmax(np.abs(rcov))
# print(rcov)
rc = np.abs(rcov[i])
if rc < ztol:
# print('All residual covariances are below threshold.')
break
# update active set
if i not in active:
# active = np.concatenate([active, [i]], axis=1)
active.append(i)
# solve for new coefficients on active set
if nonneg:
coefi, _ = nnls(X[:, active], y)
else:
            coefi, _, _, _ = np.linalg.lstsq(X[:, active], y, rcond=None)
coef[active] = coefi # update solution
# update residual vector and error
residual = y - np.dot(X[:, active], coefi)
ypred = y - residual
err[it] = norm2(residual) / ynorm
# print status
# if verbose:
# print('{}, {}, {}'.format(it, err[it], len(active)))
# check stopping criteria
if err[it] < tol: # converged
print("\nConverged.")
break
if len(active) >= ncoef: # hit max coefficients
break
# print('\nFound solution with max number of coefficients.')
# break
if it == maxit - 1: # max iterations
print("\nHit max iterations.")
result.update(coef, active, err[: (it + 1)], residual, ypred)
return result
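

# Illustrative self-check (not part of the original pyomp utilities): recover a
# sparse signal with both the randomized solver (omp) and the deterministic one
# (omp_orig).  The problem size, random seed, and `const` value below are
# arbitrary assumptions for the demo.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X_demo = rng.standard_normal((200, 50))
    w_true = np.zeros(50)
    w_true[[4, 11, 30]] = [2.0, 1.0, 0.5]
    y_demo = X_demo @ w_true
    res_rand = omp(X_demo, y_demo, const=0.05, nonneg=False, ncoef=5, tol=1e-8)
    res_det = omp_orig(X_demo, y_demo, const=0.05, nonneg=False, ncoef=5, tol=1e-8)
    print("randomized active set:   ", sorted(res_rand.active))
    print("deterministic active set:", sorted(res_det.active))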