Skip to content

Commit

Permalink
Merge pull request #9 from openmrslab/amares
Browse files Browse the repository at this point in the history
Amares
  • Loading branch information
bennyrowland authored Sep 20, 2016
2 parents 1cf5ace + ca720a3 commit 5ce73fd
Show file tree
Hide file tree
Showing 4 changed files with 589 additions and 132 deletions.
2 changes: 1 addition & 1 deletion suspect/basis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


def gaussian(time_axis, frequency, phase, fwhm):
oscillatory_term = numpy.exp(2j * numpy.pi * (frequency * time_axis + phase))
oscillatory_term = numpy.exp(2j * numpy.pi * (frequency * time_axis) + 1j * phase)
damping = numpy.exp(-time_axis ** 2 / 4 * numpy.pi ** 2 / numpy.log(2) * fwhm ** 2)
fid = oscillatory_term * damping
fid[0] /= 2.0
Expand Down
356 changes: 278 additions & 78 deletions suspect/fitting/singlet.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import lmfit
import numpy
import scipy.optimize
import numbers
import copy

import suspect
import suspect.basis


def complex_to_real(complex_fid):
Expand Down Expand Up @@ -32,7 +34,7 @@ def real_to_complex(real_fid):
:param real_fid: the real FID to be converted to complex.
:return: the complex version of the FID
"""
np = real_fid.shape[0] / 2
np = int(real_fid.shape[0] / 2)
complex_fid = numpy.zeros(np, 'complex')
complex_fid[:] = real_fid[:np]

Expand All @@ -42,83 +44,281 @@ def real_to_complex(real_fid):
return complex_fid


def gaussian_fid(x, amplitude=1, frequency=0.0, phase=0.0, fwhm=1.0):
def fit(fid, model, baseline_points=16):
"""
Generates a Gaussian FID for use with the lmfit GaussianFidModel class. The
helper function complex_to_real is used to convert the FID to a real form.
:param x: the time axis for the fid data
:param amplitude: the amplitude of the Gaussian
:param frequency: the frequency of the Gaussian in Hz
:param phase: the phase in radians
:param fwhm: the full width at half maximum of the Gaussian
:return: a real FID describing a Gaussian relaxation
Fit fid with model parameters.
:param fid: MRSData object of FID to be fit
:param model: dictionary model of fit parameters
:param baseline_points: the number of points at the start of the FID to ignore
:return: Dictionary containing ["model": optimized model, "fit": fitting data, "err": dictionary of standard errors]
"""

complex_fid = amplitude * suspect.basis.gaussian(x, frequency, phase, fwhm)
return complex_to_real(complex_fid)


class GaussianFidModel(lmfit.models.Model):
def __init__(self, *args, **kwargs):
super(GaussianFidModel, self).__init__(gaussian_fid, *args, **kwargs)
self.set_param_hint('fwhm', min=0)
self.set_param_hint('amplitude', min=0)
self.set_param_hint('phase', min=-numpy.pi, max=numpy.pi)

def guess(self, data=None, **kwargs):
return self.make_params()

def copy(self, **kwargs):
raise NotImplementedError


class Model:
def __init__(self, peaks):
self.model = None
self.params = lmfit.Parameters()
for peak_name, peak_params in peaks.items():

current_peak_model = GaussianFidModel(prefix="{}".format(peak_name))
self.params.update(current_peak_model.make_params())

for param_name, param_data in peak_params.items():
# the
full_name = "{0}{1}".format(peak_name, param_name)

if full_name in self.params:
if type(param_data) is str:
self.params[full_name].set(expr=param_data)
elif type(param_data) is dict:
if "value" in param_data:
self.params[full_name].set(value=param_data["value"])
if "min" in param_data:
self.params[full_name].set(min=param_data["min"])
if "max" in param_data:
self.params[full_name].set(max=param_data["max"])
if "expr" in param_data:
self.params[full_name].set(expr=param_data)
elif type(param_data) is numbers.Number:
self.params[full_name].set(param_data)

if self.model is None:
self.model = current_peak_model
# Get list of metabolite names.
def get_metabolites(model_input):
metabolites = []
for fid_property_name, fid_property_value in model_input.items():
if type(fid_property_value) is dict:
metabolites.append(fid_property_name)
return metabolites

# Get standard errors from lmfit MinimizerResult object.
def get_errors(result):
errors = {}
for name, param in result.params.items():
errors[name] = param.stderr
return errors

def phase_fid(fid_in, phase0, phase1):
"""
This function performs a Fourier Transform on the FID to shift it into phase.
:param fid_in: FID to be fitted.
:param phase1: phase1 value.
:param phase0: phase0 value.
:return: FID that has been shifted into phase by FFT
"""
spectrum = numpy.fft.fftshift(numpy.fft.fft(fid_in))
np = fid_in.np
phase_shift = phase0 + phase1 * numpy.linspace(-np / 2, np / 2, np, endpoint=False)
phased_spectrum = spectrum * numpy.exp(1j * phase_shift)
return fid_in.inherit(numpy.fft.ifft(numpy.fft.ifftshift(phased_spectrum)))

def make_basis(params, time_axis):
"""
This function generates a basis set.
:param params: lmfit Parameters object containing fitting parameters.
:param time_axis: the time axis.
:return: a matrix containing the generated basis set.
"""

basis_matrix = numpy.matrix(numpy.zeros((len(metabolite_name_list), len(time_axis) * 2)))
for i, metabolite_name in enumerate(metabolite_name_list):
gaussian = suspect.basis.gaussian(time_axis,
params["{}_frequency".format(metabolite_name)],
params["{}_phase".format(metabolite_name)].value,
params["{}_fwhm".format(metabolite_name)])
real_gaussian = complex_to_real(gaussian)
basis_matrix[i, :] = real_gaussian
return basis_matrix

def unphase(data, params):

unphased_data = phase_fid(data, -params['phase0'], -params['phase1'])
real_unphased_data = complex_to_real(unphased_data)

return real_unphased_data

def do_fit(params, time_axis, real_unphased_data):
"""
This function performs the fitting.
:param params: lmfit Parameters object containing fitting parameters.
:param time_axis: the time axis.
:param real_unphased_data:
:return: List of fitted data points and amplitudes of each singlet.
"""
basis = make_basis(params, time_axis)

weights = scipy.optimize.nnls(basis[:, baseline_points:-baseline_points].T,
real_unphased_data[baseline_points:-baseline_points])[0]

fitted_data = numpy.array(numpy.dot(weights, basis)).squeeze()
return fitted_data, weights

def residual(params, time_axis, data):
"""
This function calculates the residual to be minimized by the least squares means method.
:param params: lmfit Parameters object containing fitting parameters.
:param time_axis: the time axis.
:param data: FID to be fitted.
:return: residual values of baseline points.
"""

real_unphased_data = unphase(data, params)
fitted_data, weights = do_fit(params, time_axis, real_unphased_data)
res = fitted_data - real_unphased_data

return res[baseline_points:-baseline_points]

def fit_data(data, initial_params):
"""
This function takes an FID and a set of parameters contained in an lmfit Parameters object,
and fits the data using the least squares means method.
:param data: FID to be fitted.
:param initial_params: lmfit Parameters object containing fitting parameters.
:return: tuple of weights as a list, data as a list, and result as an lmift MinimizerResult object.
"""
fitting_result = lmfit.minimize(residual,
initial_params,
args=(data.time_axis(), data),
xtol=5e-3)

real_fitted_data, fitting_weights = do_fit(fitting_result.params, data.time_axis(), unphase(data, fitting_result.params))
fitted_data = real_to_complex(real_fitted_data)

return fitting_weights, fitted_data, fitting_result

# Convert lmfit parameters to model format
def parameters_to_model(parameters_obj, param_weights):
new_model = {}
for param_name, param in parameters_obj.items():
name = param_name.split("_")
name1 = name[0]
if len(name) == 1: # i.e. phase
new_model[name1] = param.value
else:
self.model += current_peak_model

def fit(self, data):
fit_result = self.model.fit(complex_to_real(data), x=data.time_axis(), params=self.params)
result_params = {}
for component in self.model.components:
component_name = component.prefix
result_params[component.prefix] = {}
for param in component.make_params():
param_name = str(param).replace(component_name, "")
result_params[component.prefix][param_name] = fit_result.params[param].value
fit_curve = real_to_complex(fit_result.best_fit)
fit_components = {k: real_to_complex(v) for k, v in fit_result.eval_components().items()}
return {
"params": result_params,
"fit": fit_curve,
"fit_components": fit_components
}
name2 = name[1]
if name1 not in new_model:
new_model[name1] = {name2: param.value}
else:
new_model[name1][name2] = param.value

for i, metabolite_name in enumerate(metabolite_name_list):
new_model[metabolite_name]["amplitude"] = param_weights[i]

return new_model

# Convert initial model to lmfit parameters.
def model_to_parameters(model_dict):
lmfit_parameters = lmfit.Parameters()
params = []
ordered_params = []
# Calculate dependencies/references for each parameter.
depend_dict = calculate_dependencies(model_dict)

model_dict_copy = copy.deepcopy(model_dict)
params.append(("phase0", model_dict_copy.pop("phase0")))
params.append(("phase1", model_dict_copy.pop("phase1")))

# Construct lmfit Parameter input for each parameter.
for peak_name, peak_properties in model_dict_copy.items():
# Fix phase value to 0 by default.
if "phase" not in peak_properties:
params.append(("{0}_{1}".format(peak_name, "phase"), None, None, None, None, "0"))
for property_name, property_value in peak_properties.items():
# Initialize lmfit parameter arguments.
name = "{0}_{1}".format(peak_name, property_name)
value = None
vary = True
lmfit_min = None
lmfit_max = None
expr = None
if isinstance(property_value, numbers.Number):
value = property_value
elif isinstance(property_value, str):
expr = property_value
elif isinstance(property_value, dict):
if "value" in property_value:
value = property_value["value"]
if "min" in property_value:
lmfit_min = property_value["min"]
if "max" in property_value:
lmfit_max = property_value["max"]
# Add parameter object with defined parameters.
params.append((name, value, vary, lmfit_min, lmfit_max, expr)) # (lmfit Parameter input format)

# Order parameters based on dependencies.
in_oparams = []
while len(params) > 0:
front = params.pop(0)
name = front[0]
# If no dependencies, add parameter to list and mark parameter as added.
if name not in depend_dict or depend_dict[name] is None:
ordered_params.append(front)
in_oparams.append(name)
else:
dependencies_present = True
for dependency in depend_dict[name]:
# If dependency not yet added, mark parameter to move to back of queue.
if dependency not in in_oparams:
dependencies_present = False
# If all dependencies present, add parameter to list and mark parameter as added.
if dependencies_present:
ordered_params.append(front)
in_oparams.append(name)
# If dependencies missing, move parameter to back of queue.
else:
params.append(front)

# Convert all parameters to lmfit Parameter objects.
lmfit_parameters.add_many(*ordered_params)

return lmfit_parameters

# Check if all model input types are correct.
def check_errors(check_model):
# Allowed keys in the model.
allowed_keys = ["min", "max", "value", "phase", "amplitude"]

# Scan model.
for model_property, model_values in check_model.items():
if not isinstance(model_values, (numbers.Number, dict)):
raise TypeError("Value of {0} must be a number (for phases), or a dictionary.".format(model_property))
elif type(model_values) is dict: # i.e. type(value) is not int
for peak_property, peak_value in model_values.items():
if not isinstance(peak_value,(numbers.Number,dict,str)):
raise TypeError("Value of {0}_{1} must be a value, an expression, or a dictionary."
.format(model_property, peak_property))
if type(peak_value) is dict:
for width_param in peak_value:
# Dictionary must have 'value' key.
if "value" not in peak_value:
raise KeyError("Dictionary {0}_{1} is missing 'value' key."
.format(model_property, peak_property))
# Dictionary can only have 'min,' 'max,' and 'value'.
if width_param not in allowed_keys:
raise KeyError("In {0}_{1}, '{2}' is not an allowed key."
.format(model_property, peak_property, width_param))

# Calculate references to determine order for Parameters.
def calculate_dependencies(unordered_model):
dependencies = {} # (name, [dependencies])

# Compile dictionary of effective names.
for model_property, model_values in unordered_model.items():
if type(model_values) is dict: # i.e. pcr, not phase
for peak_property in model_values:
dependencies["{0}_{1}".format(model_property, peak_property)] = None

# Find dependencies for each effective name.
for model_property, model_values in unordered_model.items():
if type(model_values) is dict: # i.e. not phase
for peak_property, peak_value in model_values.items():
if type(peak_value) is str:
lmfit_name = "{0}_{1}".format(model_property, peak_property)
dependencies[lmfit_name] = []
for depend in dependencies:
if depend in peak_value:
dependencies[lmfit_name].append(depend)

# Check for circular dependencies.
for name, dependents in dependencies.items():
if type(dependents) is list:
for dependent in dependents:
if dependencies[dependent] is not None and name in dependencies[dependent]:
raise ReferenceError("{0} and {1} reference each other, creating a circular reference."
.format(name, dependent))
return dependencies

# Do singlet fitting
# Minimize and fit 31P data.

check_errors(model) # Check for errors in model formatting.

metabolite_name_list = get_metabolites(model) # Set list of metabolite names.

parameters = model_to_parameters(model) # Convert model to lmfit Parameters object.

fitted_weights, fitted_data, fitted_results = fit_data(fid, parameters) # Fit data.

final_model = parameters_to_model(fitted_results.params, fitted_weights) # Convert fit parameters to model format.

stderr = get_errors(fitted_results) # Get stderr values for each parameter.

return_dict = {"model": final_model, "fit": fitted_data, "errors": stderr} # Compile output into a dictionary.
return return_dict

Loading

0 comments on commit 5ce73fd

Please sign in to comment.