Skip to content

Commit

Permalink
Initial sampling commit
Browse files Browse the repository at this point in the history
  • Loading branch information
etpeterson committed Nov 27, 2024
1 parent 5c217fc commit 68a9d33
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 5 deletions.
130 changes: 130 additions & 0 deletions src/original/ETP_SRI/Sampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import numpy as np
import emcee
import tqdm
import corner
from scipy.stats import rice, norm, uniform
from matplotlib import pyplot as plt

from utilities.data_simulation.GenerateData import GenerateData

class MCMC:
    """
    Markov-chain Monte Carlo sampling of bi-exponential (IVIM) signal decay.

    Draws posterior samples of the IVIM parameters (f, D, D*) from measured
    diffusion signals using emcee's affine-invariant ensemble sampler, with
    either a Gaussian or a Rician noise likelihood and optional Gaussian
    priors inside hard uniform bounds.
    """
    def __init__(self,
                 data,
                 b,
                 data_scale=1e-5,
                 parameter_scale=(1e-7, 1e-11, 1e-9),
                 bounds=((0, 1), (0, 1), (0, 1)),
                 priors=None,
                 gaussian_noise=False,
                 nwalkers=16,
                 nsteps=10000,
                 burn_in=2000,
                 progress=True):
        """
        Parameters
        ----------
        data : array_like
            Measured signal values; promoted to 2D (rows broadcast against
            the likelihood evaluation).
        b : array_like
            b-values of the acquisition; stored as a column vector so the
            signal model broadcasts over parameter vectors.
        data_scale : float
            Noise scale (sigma) of the measurements.
        parameter_scale : tuple of float
            Per-parameter scale of the random perturbation used to scatter
            the walkers around the initial position.
        bounds : tuple of (low, high)
            Hard uniform bounds for each of (f, D, D*); the log-posterior is
            -inf outside them.
        priors : tuple of (mean, std), optional
            Gaussian priors for (f, D, D*). When None, a flat (zero) prior
            is used inside the bounds.
        gaussian_noise : bool
            If True use a Gaussian likelihood, otherwise a Rician one.
        nwalkers : int
            Number of ensemble walkers.
        nsteps : int
            Number of MCMC steps to run.
        burn_in : int
            Number of initial steps discarded from the returned chain.
        progress : bool
            Whether to show a progress bar during sampling.
        """
        self.data = np.atleast_2d(np.asarray(data))
        self.b = np.atleast_2d(np.asarray(b)).T
        self.data_scale = np.asarray(data_scale)
        self.parameter_scale = np.asarray(parameter_scale)
        self.bounds = np.asarray(bounds)
        # Only convert when given: np.asarray(None) would build an object array.
        self.priors = None if priors is None else np.asarray(priors)
        self.nwalkers = nwalkers
        self.nsteps = nsteps
        self.burn_in = burn_in
        self.progress = progress
        if priors is None:
            self.prior = self.zero_prior
        else:
            self.prior = self.loglike_gauss_prior
        if gaussian_noise:
            self.likelihood = self.biexp_loglike_gauss
        else:
            self.likelihood = self.biexp_loglike_rice
        # Three sampled parameters: f, D, D*.
        self.ndim = 3
        # Populated by sample().
        self.chain = None
        self.means = None
        self.stds = None

    def accepted_dimensions(self):
        # NOTE(review): presumably the (data, b) dimensionalities this
        # sampler accepts — confirm against the calling convention.
        return (1, 1)

    def bounds_prior(self, params):
        """Log of independent uniform priors over the bounds; -inf outside."""
        return np.sum(uniform.logpdf(params, loc=self.bounds[:, 0], scale=self.bounds[:, 1] - self.bounds[:, 0]), 1)

    def loglike_gauss_prior(self, params):
        """Gaussian log-prior; priors[:, 0] are means, priors[:, 1] stds."""
        return np.sum(norm.logpdf(params, loc=self.priors[:, 0], scale=self.priors[:, 1]), 1)

    def zero_prior(self, params):
        """Flat prior (contributes nothing to the log-posterior)."""
        return 0

    def signal(self, f, D, D_star):
        """Bi-exponential IVIM model evaluated at self.b, one row per parameter set."""
        return (f * np.exp(-self.b * D_star) + (1 - f) * np.exp(-self.b * D)).T

    def biexp_loglike_gauss(self, f, D, D_star):
        """Gaussian log-likelihood of the data, summed over b-values."""
        expected = self.signal(f, D, D_star)
        return np.sum(norm.logpdf(self.data, loc=expected, scale=self.data_scale), 1)

    def biexp_loglike_rice(self, f, D, D_star):
        """Rician log-likelihood; b=expected/scale is the Rice SNR parameter."""
        expected = self.signal(f, D, D_star)
        return np.sum(rice.logpdf(self.data, b=expected/self.data_scale, scale=self.data_scale), 1)

    def posterior(self, params):
        """
        Vectorized log-posterior for emcee.

        Evaluates prior and likelihood only for in-bounds walkers; walkers
        outside the bounds keep their -inf from bounds_prior.
        """
        params = np.atleast_2d(params)
        total = self.bounds_prior(params)
        neginf = np.isneginf(total)
        f = params[~neginf, 0]
        D = params[~neginf, 1]
        D_star = params[~neginf, 2]
        prior = self.prior(params[~neginf, :])
        likelihood = self.likelihood(f, D, D_star)
        total[~neginf] += prior + likelihood
        return total

    def sample(self, initial_pos):
        """
        Run the ensemble sampler from initial_pos.

        Returns
        -------
        (means, stds) : per-parameter mean and standard deviation of the
        flattened post-burn-in chain.
        """
        print(f'initial pos likelihood {self.posterior(initial_pos)}')
        # Use self.ndim (was hard-coded 3) and honor the stored progress flag.
        sampler = emcee.EnsembleSampler(self.nwalkers, self.ndim, self.posterior, vectorize=True)
        # Scatter the walkers around the initial position.
        pos = initial_pos + self.parameter_scale * np.random.randn(self.nwalkers, self.ndim)
        sampler.run_mcmc(pos, self.nsteps, progress=self.progress)
        self.chain = sampler.get_chain(discard=self.burn_in, flat=True)
        self.means = np.mean(self.chain, 0)
        self.stds = np.std(self.chain, 0)
        print(f'final pos likelihood {self.posterior(self.means)}')
        return self.means, self.stds

    def plot(self, truths=None, labels=('f', 'D', 'D*'), overplot=None):
        """Corner plot of the sampled chain, optionally overplotting reference values."""
        if truths is None:
            truths = self.means
        fig = corner.corner(self.chain, labels=labels, truths=truths)
        fig.suptitle("Sampling of the IVIM data", fontsize=16)
        if overplot is not None:
            corner.overplot_lines(fig, overplot, color='r')
        plt.show()

58 changes: 53 additions & 5 deletions tests/IVIMmodels/unit_tests/simple_test_run_of_algorithm.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,81 @@
import numpy as np
import matplotlib.pyplot as plt
import os
from pathlib import Path
#from src.standardized.ETP_SRI_LinearFitting import ETP_SRI_LinearFitting
from src.standardized.IAR_LU_biexp import IAR_LU_biexp
# from src.standardized.IAR_LU_biexp import IAR_LU_biexp
#from src.standardized.IAR_LU_segmented_2step import IAR_LU_segmented_2step
from src.standardized.PvH_KB_NKI_IVIMfit import PvH_KB_NKI_IVIMfit
#from src.standardized.PV_MUMC_biexp import PV_MUMC_biexp
from src.original.ETP_SRI.Sampling import MCMC

## Simple test code...
# Used to just do a test run of an algorithm during development
def dev_test_run(model, **kwargs):
    """
    Development test: fit noisy synthetic IVIM data with `model` and compare
    the fit against MCMC sampling of the same signal.

    The diff residue in the pasted source left duplicated pre-change lines
    (two `bvalues` assignments, two `ivim_model` headers, a stale `signals`
    line) that made the function a syntax error; this keeps the new version.

    Parameters
    ----------
    model : object
        Fitting algorithm exposing osipi_fit(signals, bvalues, **kwargs),
        returning parameters ordered (f, D*, D).
    **kwargs
        Extra keyword arguments forwarded to osipi_fit.
    """
    bvalues = np.array([0, 20, 50, 75, 100, 150, 200, 300, 400, 500, 800, 1000, 1500])

    def ivim_model(b, f=0.1, Dstar=0.01, D=0.001, S0=1):
        # Bi-exponential IVIM signal model.
        return S0*(f*np.exp(-b*Dstar) + (1-f)*np.exp(-b*D))

    # TODO: add S0 fitting!
    true_f = 0.4
    true_Dstar = 0.01
    true_D = 0.001
    truth = [true_f, true_D, true_Dstar]
    signals_noiseless = ivim_model(bvalues, true_f, true_Dstar, true_D)
    print(f'noiseless {signals_noiseless}')  # typo 'noiselss' fixed
    # Rician-like noise: magnitude of complex Gaussian noise added to the signal.
    signals = signals_noiseless + np.abs(1e-1 * (np.random.randn(len(bvalues)) + 1j * np.random.randn(len(bvalues))) / np.sqrt(2))

    if kwargs:
        results = model.osipi_fit(signals, bvalues, **kwargs)
    else:
        results = model.osipi_fit(signals, bvalues)
    # results is ordered (f, D*, D); reorder to (f, D, D*) to match `truth`.
    results_reordered = np.asarray([results[0], results[2], results[1]])
    print(truth)
    print(results_reordered)
    signal_results = ivim_model(bvalues, results[0], results[1], results[2])

    mcmc = MCMC(signals, bvalues, gaussian_noise=False, data_scale=1e-2)  # , priors=((0.07, 1e-1), (0.0135, 1e-1), (0.001, 1e-1)))
    means, stds = mcmc.sample(truth)
    print(f'means {means} stds {stds}')
    print(f'expected {results_reordered}')
    print(f'truth {truth}')

    # means is ordered (f, D, D*); ivim_model takes (f, Dstar, D).
    signal_means = ivim_model(bvalues, means[0], means[2], means[1])
    plt.plot(bvalues, signals_noiseless, 'g', label='Noiseless Signal')
    plt.plot(bvalues, signals, '.g', label='Noisy Signal')
    plt.plot(bvalues, signal_results, 'r', label='Results Signal')
    plt.plot(bvalues, signal_means, 'b', label='Means Signal')
    plt.legend()
    # plt.show()

    mcmc.plot(overplot=truth)


#model1 = ETP_SRI_LinearFitting(thresholds=[200])
#model2 = IAR_LU_biexp()
# Instantiate the NKI IVIM fitting algorithm and run the development test with it.
model2 = PvH_KB_NKI_IVIMfit()

#dev_test_run(model1, linear_fit_option=True)
dev_test_run(model2)


def run_sampling():
    """
    Build a default noiseless IVIM signal for a sampling experiment.

    NOTE(review): this function appears truncated in this view — it computes
    `signals` but neither uses nor returns it; confirm against the full file.
    """
    bvalues = np.array([0, 50, 100, 150, 200, 500, 800])

    def ivim_model(b, S0=1, f=0.1, Dstar=0.01, D=0.001):
        # Bi-exponential IVIM signal model.
        return S0*(f*np.exp(-b*Dstar) + (1-f)*np.exp(-b*D))

    signals = ivim_model(bvalues)


0 comments on commit 68a9d33

Please sign in to comment.