-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathERANataf.py
435 lines (386 loc) · 18.5 KB
/
ERANataf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# import of modules
import numpy as np
from scipy import optimize, stats
realmin = np.finfo(np.double).tiny
'''
---------------------------------------------------------------------------
Nataf Transformation of random variables
---------------------------------------------------------------------------
Developed by:
Sebastian Geyer
Felipe Uribe
Iason Papaioannou
Daniel Straub
Assistant Developers:
Luca Sardi
Fong-Lin Wu
Alexander von Ramm
Matthias Willer
Peter Kaplan
Engineering Risk Analysis Group
Technische Universitat Munchen
www.bgu.tum.de/era
Contact: Antonios Kamariotis ([email protected])
---------------------------------------------------------------------------
New Version 2021-03:
* General update to match the current MATLAB version
* Consistent shape of the input and output arrays (also consistent with
scipy.stats standards)
* Adding additional error messages for discrete marginals in X2U and
incorrect correlation matrix inputs
* Renaming the methods 'jointpdf' and 'jointcdf' to 'pdf' and 'cdf'
Version 2019-11:
* Minor check for improvement
Version 2019-01:
* Fixing of bug regarding dimensionality in random method
* Optimization and fixing of minor bugs
* Fixing of bugs in the transformation X to U and vice versa
* Alternative calculation of the Nataf joint CDF in high dimensions
---------------------------------------------------------------------------
* This software performs the Nataf transformation of random variables.
* It is possible to generate random numbers according to their Nataf
joint pdf and to evaluate this pdf.
* The inverse Nataf transformation is also defined as a function.
* It requires the use of Objects of the class ERADist which is also
published on the homepage of the ERA Group of TUM.
---------------------------------------------------------------------------
References:
1. Liu, Pei-Ling and Armen Der Kiureghian (1986) - Multivariate distribution
models with prescribed marginals and covariances.
Probabilistic Engineering Mechanics 1(2), 105-112
---------------------------------------------------------------------------
'''
#%%
class ERANataf(object):
"""
Generation of joint distribution objects.
Construction of the joint distribution object with
Obj = ERANataf(M,Correlation)
'M' must be an list or array of ERADist objects defining the marginal
distributions that together define the joint distribution.
'Correlation' must be a correlation matrix with shape [d,d], where d is
the number of marginal distributions (dimensions) of the joint
distribution. The matrix describes the dependency between the different
marginal distributions. According to the general definition of a
correlation matrix, the input matrix must be symmetric, the matrix entries
on the diagonal must be equal to one, all other entries (correlation
coefficients) can have values between -1 and 1.
"""
def __init__(self, M, Correlation):
"""
Constructor method, for more details have a look at the
class description.
"""
self.Marginals = np.array(M, ndmin=1)
self.Marginals = self.Marginals.ravel()
self.Rho_X = np.array(Correlation, ndmin=2)
n_dist = len(M)
# check if all distributions have finite moments
for i in range(n_dist):
if (not((np.isfinite(self.Marginals[i].mean()) and
np.isfinite(self.Marginals[i].std())))):
raise RuntimeError("The marginal distributions need to have "
"finite mean and variance")
# Check if input for correlation matrix fulfills requirements
try:
np.linalg.cholesky(self.Rho_X)
except np.linalg.LinAlgError:
raise RuntimeError("The given correlation matrix is not positive definite"
"--> Nataf transformation is not applicable.")
if not np.all(self.Rho_X-self.Rho_X.T == 0):
raise RuntimeError("The given correlation matrix is not symmetric "
"--> Nataf transformation is not applicable.")
if not np.all(np.diag(self.Rho_X) == 1):
raise RuntimeError("Not all diagonal entries of the given correlation matrix are equal to one "
"--> Nataf transformation is not applicable.")
'''
Calculation of the transformed correlation matrix. This is achieved
by a quadratic two-dimensional Gauss-Legendre integration
'''
n = 1024
zmax = 8
zmin = -zmax
points, weights = np.polynomial.legendre.leggauss(n)
points = - (0.5 * (points + 1) * (zmax - zmin) + zmin)
weights = weights * (0.5 * (zmax - zmin))
xi = np.tile(points, [n, 1])
xi = xi.flatten(order='F')
eta = np.tile(points, n)
first = np.tile(weights, n)
first = np.reshape(first, [n, n])
second = np.transpose(first)
weights2d = first * second
w2d = weights2d.flatten()
# check is X the identiy
self.Rho_Z = np.identity(n=n_dist)
if (np.linalg.norm(self.Rho_X - np.identity(n=n_dist)) > 10**(-5)):
for i in range(n_dist):
for j in range(i+1, n_dist):
if (self.Rho_X[i, j] == 0):
continue
elif ((self.Marginals[i].Name == 'standardnormal') and
(self.Marginals[j].Name == 'standardnormal')):
self.Rho_Z[i, j] = self.Rho_X[i, j]
self.Rho_Z[j, i] = self.Rho_X[j, i]
continue
elif ((self.Marginals[i].Name == 'normal') and
(self.Marginals[j].Name == 'normal')):
self.Rho_Z[i, j] = self.Rho_X[i, j]
self.Rho_Z[j, i] = self.Rho_X[j, i]
continue
elif ((self.Marginals[i].Name == 'normal') and
(self.Marginals[j].Name == 'lognormal')):
Vj = self.Marginals[j].std()/self.Marginals[j].mean()
self.Rho_Z[i, j] = (self.Rho_X[i, j] *
Vj/np.sqrt(np.log(1 + Vj**2)))
self.Rho_Z[j, i] = self.Rho_Z[i, j]
continue
elif ((self.Marginals[i].Name == 'lognormal') and
(self.Marginals[j].Name == 'normal')):
Vi = self.Marginals[i].std()/self.Marginals[i].mean()
self.Rho_Z[i, j] = (self.Rho_X[i, j] *
Vi/np.sqrt(np.log(1 + Vi**2)))
self.Rho_Z[j, i] = self.Rho_Z[i, j]
continue
elif ((self.Marginals[i].Name == 'lognormal') and
(self.Marginals[j].Name == 'lognormal')):
Vi = self.Marginals[i].std()/self.Marginals[i].mean()
Vj = self.Marginals[j].std()/self.Marginals[j].mean()
self.Rho_Z[i, j] = (np.log(1 + self.Rho_X[i, j]*Vi*Vj)
/ np.sqrt(np.log(1 + Vi**2) *
np.log(1+Vj**2)))
self.Rho_Z[j, i] = self.Rho_Z[i, j]
continue
# solving Nataf
tmp_f_xi = ((self.Marginals[j].icdf(stats.norm.cdf(eta)) -
self.Marginals[j].mean()) /
self.Marginals[j].std())
tmp_f_eta = ((self.Marginals[i].icdf(stats.norm.cdf(xi)) -
self.Marginals[i].mean()) /
self.Marginals[i].std())
coef = tmp_f_xi * tmp_f_eta * w2d
def fun(rho0):
return ((coef *
self.bivariateNormalPdf(xi, eta, rho0)).sum()
- self.Rho_X[i, j])
x0, r = optimize.brentq(f=fun,
a=-1 + np.finfo(float).eps,
b=1 - np.finfo(float).eps,
full_output=True)
if (r.converged == 1):
self.Rho_Z[i, j] = x0
self.Rho_Z[j, i] = self.Rho_Z[i, j]
else:
sol = optimize.fsolve(func=fun,
x0=self.Rho_X[i, j],
full_output=True)
if (sol[2] == 1):
self.Rho_Z[i, j] = sol[0]
self.Rho_Z[j, i] = self.Rho_Z[i, j]
else:
sol = optimize.fsolve(func=fun,
x0=-self.Rho_X[i, j],
full_output=True)
if (sol[2] == 1):
self.Rho_Z[i, j] = sol[0]
self.Rho_Z[j, i] = self.Rho_Z[i, j]
else:
for i in range(10):
init = 2 * np.random.rand() - 1
sol = optimize.fsolve(func=fun,
x0=init,
full_output=True)
if (sol[2] == 1):
break
if (sol[2] == 1):
self.Rho_Z[i, j] = sol[0]
self.Rho_Z[j, i] = self.Rho_Z[i, j]
else:
raise RuntimeError("brentq and fsolve coul"
"d not converge to a "
"solution of the Nataf "
"integral equation")
try:
self.A = np.linalg.cholesky(self.Rho_Z)
except np.linalg.LinAlgError:
raise RuntimeError("Transformed correlation matrix is not positive"
" definite --> Nataf transformation is not "
"applicable.")
#%%
'''
This function performs the transformation from X to U by taking
the inverse standard normal cdf of the cdf of every value. Then it
performs the transformation from Z to U. A is the lower triangular
matrix of the cholesky decomposition of Rho_Z and U is the resulting
independent standard normal vector. Afterwards it calculates the
Jacobian of this Transformation if it is needed.
'''
def X2U(self, X, Jacobian=False):
"""
Carries out the transformation from physical space X to
standard normal space U.
X must be a [n,d]-shaped array (n = number of data points,
d = dimensions).
The Jacobian of the transformation of the first given data
point is only given as an output in case that the input
argument Jacobian=True .
"""
n_dim = len(self.Marginals)
X = np.array(X, ndmin=2)
# check if all marginal distributions are continuous
for i in range(n_dim):
if self.Marginals[i].Name in ['binomial','geometric','negativebinomial','poisson']:
raise RuntimeError("At least one of the marginal distributions is a discrete distribution,"
"the transformation X2U is therefore not possible.")
# check of the dimensions of input X
if X.ndim > 2:
raise RuntimeError("X must have not more than two dimensions. ")
if np.shape(X)[1] == 1 and n_dim != 1:
# in case that only one point X is given, he can be defined either as row or column vector
X = X.T
if np.shape(X)[1] != n_dim:
raise RuntimeError("X must be an array of size [n,d], where d is the"
" number of dimensions of the joint distribution.")
Z = np.zeros(np.flip(X.shape))
for i in range(n_dim):
Z[i,:] = stats.norm.ppf(self.Marginals[i].cdf(X[:, i]))
U = np.linalg.solve(self.A, Z.squeeze()).T
if Jacobian:
diag = np.zeros([n_dim, n_dim])
for i in range(n_dim):
diag[i, i] = stats.norm.pdf(Z[i,0])/self.Marginals[i].pdf(X[0,i])
Jac = np.dot(diag, self.A)
return np.squeeze(U), Jac
else:
return np.squeeze(U)
#%%
def U2X(self, U, Jacobian=False):
"""
Carries out the transformation from standard normal space U
to physical space X.
U must be a [n,d]-shaped array (n = number of data points,
d = dimensions).
The Jacobian of the transformation of the first given data
point is only given as an output in case that the input
argument Jacobian=True .
"""
n_dim = len(self.Marginals)
U = np.array(U, ndmin=2)
# check of the dimensions of input X
if U.ndim > 2:
raise RuntimeError("U must have not more than two dimensions. ")
if np.shape(U)[1] == 1 and n_dim != 1:
# in case that only one point U is given, he can be defined either as row or column vector
U = U.T
if np.shape(U)[1] != n_dim:
raise RuntimeError("U must be an array of size [n,d], where d is the"
" number of dimensions of the joint distribution.")
else:
U = U.T
Z = self.A @ U
X = np.zeros(np.flip(U.shape))
for i in range(n_dim):
X[:, i] = self.Marginals[i].icdf(stats.norm.cdf(Z[i, :]))
if Jacobian:
diag = np.zeros([n_dim, n_dim])
for i in range(n_dim):
diag[i, i] = self.Marginals[i].pdf(X[0,i])/stats.norm.pdf(Z[i,0])
Jac = np.linalg.solve(self.A, diag)
return np.squeeze(X), Jac
else:
return np.squeeze(X)
# %%
def random(self, n=1):
"""
Creates n samples of the joint distribution.
Every row in the output array corresponds to one sample.
"""
n = int(n)
n_dim = np.size(self.Marginals)
U = np.random.randn(n_dim, n)
Z = np.dot(self.A, U)
jr = np.zeros([n,n_dim])
for i in range(n_dim):
jr[:, i] = self.Marginals[i].icdf(stats.norm.cdf(Z[i, :]))
return np.squeeze(jr)
# %%
def pdf(self, X):
"""
Computes the joint PDF.
X must be a [n,d]-shaped array (n = number of data points,
d = dimensions).
"""
n_dim = len(self.Marginals)
X = np.array(X, ndmin=2)
# check if all marginal distributions are continuous
for i in range(n_dim):
if self.Marginals[i].Name in ['binomial','geometric','negativebinomial','poisson']:
raise RuntimeError("At least one of the marginal distributions is a discrete distribution,"
"the transformation X2U is therefore not possible.")
# check of the dimensions of input X
if X.ndim > 2:
raise RuntimeError("X must have not more than two dimensions.")
if np.shape(X)[1] == 1 and n_dim != 1:
# in case that only one point X is given, he can be defined either as row or column vector
X = X.T
if np.shape(X)[1] != n_dim:
raise RuntimeError("X must be an array of size [n,d], where d is the"
" number of dimensions of the joint distribution.")
n_X = np.shape(X)[0]
U = np.zeros([n_X, n_dim])
phi = np.zeros([n_dim, n_X])
f = np.zeros([n_dim, n_X])
mu = np.zeros(n_dim)
for i in range(n_dim):
U[:, i] = stats.norm.ppf(self.Marginals[i].cdf(X[:, i]))
phi[i, :] = stats.norm.pdf(U[:, i])
f[i, :] = self.Marginals[i].pdf(X[:, i])
phi_n = stats.multivariate_normal.pdf(U, mu, self.Rho_Z)
jointpdf = np.zeros(n_X)
for i in range(n_X):
try:
jointpdf[i] = ((np.prod(f[:, i])/(np.prod(phi[:, i])+realmin)) * phi_n[i])
except IndexError:
# In case of n=1, phi_n is a scalar.
jointpdf[i] = ((np.prod(f[:, i])/(np.prod(phi[:, i])+realmin)) * phi_n)
except ZeroDivisionError:
jointpdf[i] = 0
if np.size(jointpdf) == 1:
return jointpdf[0]
else:
return jointpdf
#%%
def cdf(self, X):
"""
Computes the joint CDF.
X must be a [n,d]-shaped array (n = number of data points,
d = dimensions).
The CDF computation is based on the multivariate normal cdf.
In scipy the multivariate normal cdf is computed by Monte Carlo
sampling, the output of this method is therefore also a
stochastic quantity.
"""
n_dim = len(self.Marginals)
X = np.array(X, ndmin=2)
# check of the dimensions of input X
if X.ndim > 2:
raise RuntimeError("X must have not more than two dimensions. ")
if np.shape(X)[1] == 1 and n_dim != 1:
# in case that only one point X is given, he can be defined either as row or column vector
X = X.T
if np.shape(X)[1] != n_dim:
raise RuntimeError("X must be an array of size [n,d], where d is the"
" number of dimensions of the joint distribution.")
n_X = np.shape(X)[0]
U = np.zeros([n_X, n_dim])
for i in range(n_dim):
U[:, i] = stats.norm.ppf(self.Marginals[i].cdf(X[:, i]))
mu = np.zeros(n_dim)
jointcdf = stats.multivariate_normal.cdf(U,mean=mu,cov=np.matrix(self.Rho_Z))
return jointcdf
#%%
@staticmethod
def bivariateNormalPdf(x1, x2, rho):
return (1 / (2 * np.pi * np.sqrt(1-rho**2)) *
np.exp(-1/(2*(1-rho**2)) *
(x1**2 - 2 * rho * x1 * x2 + x2**2)))