Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
enh: write out all the results for potential off-line visualization
Browse files Browse the repository at this point in the history
Instead of making the script 1-off, let's keep the scores of all cross
validation cycles.
  • Loading branch information
oesteban committed Oct 23, 2024
1 parent 7e79f5e commit 92fbaba
Showing 1 changed file with 21 additions and 202 deletions.
223 changes: 21 additions & 202 deletions scripts/dwi_estimation_error_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
from __future__ import annotations

import argparse
from collections import defaultdict

# import nibabel as nib
import numpy as np
import pandas as pd
from dipy.core.gradients import gradient_table
from dipy.sims.voxel import single_tensor
from sklearn.metrics import root_mean_squared_error
from sklearn.model_selection import KFold, RepeatedKFold, cross_val_score
from sklearn.model_selection import RepeatedKFold, cross_val_score

from eddymotion.model._sklearn import (
EddyMotionGPR,
Expand All @@ -44,144 +45,6 @@
from eddymotion.testing import simulations as testsims


def create_random_train_mask(gtab: gradient_table, size: int, seed: int = 1234) -> np.ndarray:
"""
Create a random mask for the gradient table.
Create a mask for the gradient table where a ``size`` number of indices will be
excluded. b0 values are excluded.
Parameters
----------
gtab : :obj:`~dipy.core.gradients.gradient_table`
Gradient table.
size : :obj:`int`
Number of indices to exclude.
seed : :obj:`int`, optional
Seed for the random number generator, by default 1234.
Returns
-------
:obj:`~numpy.ndarray`
Training mask.
Raises
------
ValueError
If the size of requested masked values is greater than available gradient vectors.
"""
rng = np.random.default_rng(seed)

# Get the indices of the non-zero diffusion-encoding gradient vector indices
nnzero_degv_idx = np.where(~gtab.b0s_mask)[0]
if nnzero_degv_idx.size < size:
raise ValueError(
f"Requested {size} values for masking; gradient table has {nnzero_degv_idx.size} "
"non-zero diffusion-encoding gradient vectors. Reduce the number of masked values."
)
lo = rng.choice(nnzero_degv_idx, size=size, replace=False)

# Exclude the b0s
zero_degv_idx = np.asarray(list(set(range(len(gtab.bvals))).difference(nnzero_degv_idx)))
lo = np.hstack([zero_degv_idx, lo])
train_mask = np.ones(len(gtab.bvals), dtype=bool)
train_mask[lo] = False
return train_mask


def perform_experiment(
gtab: gradient_table,
S0: float,
evals1: np.ndarray,
evecs: np.ndarray,
snr: float,
repeats: int,
kfold: list[int],
) -> dict[int, list[tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]]:
"""
Perform the experiment by estimating the dMRI signal using a Gaussian process model.
Parameters
----------
gtab : :obj:`~dipy.core.gradients.gradient_table`
Gradient table.
S0 : :obj:`float`
S0 value.
evals1 : :obj:`~numpy.ndarray`
Eigenvalues of the tensor.
evecs : :obj:`~numpy.ndarray`
Eigenvectors of the tensor.
snr : :obj:`float`
Signal-to-noise ratio.
repeats : :obj:`int`
Number of repeats.
kfold : :obj:`list`
list of directions to leave out/predict.
Returns
-------
:obj:`dict`
Data for the predicted signal and its error.
"""

# Fix the random number generator for reproducibility when generating the
# signal
rng = np.random.default_rng(1234)

# Define the Gaussian process model parameter
lambda_s = 2.0
a = 1.0
sigma_sq = 0.5

data = {}
nzero_bvecs = gtab.bvecs[~gtab.b0s_mask]

# Simulate the fitting a number of times: every time the signal created will be a little
# different
# for _ in range(repeats):
# Create the DWI signal using a single tensor
signal = single_tensor(gtab, S0=S0, evals=evals1, evecs=evecs, snr=snr, rng=rng)

import pdb

pdb.set_trace()

# Loop over the number of indices that are left out from the training/need to be predicted
for n in kfold:
# Assumptions:
# - Consecutive indices in the folds
# - A single b0
kf = KFold(n_splits=n, shuffle=False)

# Define the Gaussian process model instance
gp_model = EddyMotionGPR(
kernel=SphericalKriging(a=a, lambda_s=lambda_s),
alpha=sigma_sq,
optimizer=None,
)

_data = []
for _, (train_index, test_index) in enumerate(kf.split(nzero_bvecs)):
# Create the training mask leaving out the requested number of samples
# train_mask = create_random_train_mask(gtab, n)

# Fit the Gaussian process
# Add 1 to account for the b0
gpfit = gp_model.fit(signal[train_index + 1], gtab[train_index + 1])

# Predict the signal
# X_qry, idx_qry = get_query_vectors(gtab, train_mask)
# Add 1 to account for the b0
idx_qry = test_index + 1
X_qry = gtab[idx_qry].bvecs
_y_pred, _y_std = gpfit.predict(X_qry)
_data.append((idx_qry, signal[idx_qry], _y_pred, _y_std))
data.update({n: _data})
return data


def cross_validate(
gtab: gradient_table,
S0: float,
Expand Down Expand Up @@ -230,52 +93,6 @@ def cross_validate(
return scores


def compute_error(
data: dict[int, list[tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]],
repeats: int,
kfolds: list[int],
) -> tuple[np.ndarray, np.ndarray]:
"""
Compute the error and standard deviation.
Parameters
----------
data : :obj:`dict`
Data for the predicted signal and its error.
repeats : :obj:`int`
Number of repeats.
kfolds : :obj:`dict`
Number of k-folds.
Returns
-------
:obj:`tuple`
Mean RMSE and standard deviation.
"""
mean_rmse = []
std_dev = []

# Loop over the range of indices that were predicted
for vals in data.values():
# repeats = 1
# _data = np.array(vals[n * repeats : n * repeats + repeats])
_signal = np.hstack([t[1] for t in vals])
_pred = np.hstack([t[2] for t in vals])
_rmse = root_mean_squared_error(_signal, _pred)
# ToDo
# Check here what is the value that we wil keep for the std
_std = np.hstack([t[3] for t in vals])
_std_dev = np.mean(_std)
_std_dev = np.std(
[root_mean_squared_error([v1], [v2]) for v1, v2 in zip(_signal, _pred, strict=False)]
) # np.std(_rmse)
mean_rmse.append(_rmse)
std_dev.append(_std_dev)

return np.asarray(mean_rmse), np.asarray(std_dev)


def _build_arg_parser() -> argparse.ArgumentParser:
"""
Build argument parser for command-line interface.
Expand Down Expand Up @@ -333,22 +150,24 @@ def main() -> None:
# Create a gradient table for a single-shell
gtab = testsims.create_single_shell_gradient_table(args.hsph_dirs, args.bval_shell)

# Estimate the dMRI signal using a Gaussian process estimator
# ToDo
# Returning the index here is not useful as we cannot plot the signal as it
# changes on every fold and we do not return it. Maybe we can set a random
# value so that we return that one and we can plot it much like in the
# notebook or maybe we leave that for a separate script/notebook ??
scores = {
n: cross_validate(gtab, args.S0, args.evals1, evecs, args.snr, n)
for n in args.kfold
for _ in range(args.repeats)
}

print({n: (np.mean(scores[n]), np.std(scores[n])) for n in args.kfold})

# Compute the error
# rmse, std_dev = compute_error(data, args.repeats, args.kfold)
# Use Scikit-learn cross validation
scores = defaultdict(list, {})

for n in args.kfold:
for i in range(args.repeats):
cv_scores = -1.0 * cross_validate(gtab, args.S0, args.evals1, evecs, args.snr, n)
scores["rmse"] += cv_scores.tolist()
scores["repeat"] += [i] * len(cv_scores)
scores["n_folds"] += [n] * len(cv_scores)

print(f"Finished {n}-fold cross-validation")

scores_df = pd.DataFrame(scores)
scores_df.to_csv("cv_scores.tsv", sep="\t", index=None, na_rep="n/a")

grouped = scores_df.groupby(["n_folds"])
print(grouped[["rmse"]].mean())
print(grouped[["rmse"]].std())

# Plot - import plot_* from eddymotion.viz.signals
# xlabel = "N"
Expand Down

0 comments on commit 92fbaba

Please sign in to comment.