Skip to content

Commit

Permalink
Merge pull request #159 from rolfverberg/dev_rolf
Browse files Browse the repository at this point in the history
fix: make EDD backward compatible and fix the CHAP EDD example
  • Loading branch information
rolfverberg authored Nov 18, 2024
2 parents e025e5d + 4c44fc3 commit e067cee
Show file tree
Hide file tree
Showing 13 changed files with 976 additions and 481 deletions.
12 changes: 9 additions & 3 deletions CHAP/common/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1600,8 +1600,14 @@ def _read_raw_data_edd(
scan = map_config.spec_scans[0]
scan_numbers = scan.scan_numbers
scanparser = scan.get_scanparser(scan_numbers[0])
detector_indices = [int(d.id) for d in detector_config.detectors]
ddata = scanparser.get_detector_data(detector_indices)
if detector_config.detectors[0].id == 'mca1':
if len(detector_config.detectors) != 1:
raise ValueError(
'Multiple detectors not implemented for mca1 detector')
detector_ids = ['mca1']
else:
detector_ids = [int(d.id) for d in detector_config.detectors]
ddata = scanparser.get_detector_data(detector_ids)
spec_scan_shape = scanparser.spec_scan_shape
num_dim = np.prod(spec_scan_shape)
num_id = len(map_config.independent_dimensions)
Expand Down Expand Up @@ -1654,7 +1660,7 @@ def _read_raw_data_edd(
else:
scanparser = scan.get_scanparser(scan_number)
assert spec_scan_shape == scanparser.spec_scan_shape
ddata = scanparser.get_detector_data(detector_indices)
ddata = scanparser.get_detector_data(detector_ids)
data[offset] = ddata
spec_scan_motor_mnes = scanparser.spec_scan_motor_mnes
start_dim = offset * num_dim
Expand Down
25 changes: 20 additions & 5 deletions CHAP/common/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,27 @@ def read(
nxentry.attrs['station'] = config.station
if config.experiment_type == 'EDD':
if detectors is None:
detector_indices = None
detectors_ids = None
else:
detector_indices = [int(d.id) for d in detectors.detectors]
try:
detectors_ids = [int(d.id) for d in detectors.detectors]
except:
detectors_ids = [d.id for d in detectors.detectors]
nxentry.spec_scans = NXcollection()
# nxpaths = []
detector_data_format = None
for scans in config.spec_scans:
nxscans = NXcollection()
nxentry.spec_scans[f'{scans.scanparsers[0].scan_name}'] = nxscans
nxscans.attrs['spec_file'] = str(scans.spec_file)
nxscans.attrs['scan_numbers'] = scans.scan_numbers
for scan_number in scans.scan_numbers:
scanparser = scans.get_scanparser(scan_number)
if detector_data_format is None:
detector_data_format = scanparser.detector_data_format
elif scanparser.detector_data_format != detector_data_format:
raise ValueError(
'Mixing `spec` and `h5` data formats not implemented')
nxscans[scan_number] = NXcollection()
try:
nxscans[scan_number].spec_motors = dumps(
Expand All @@ -515,7 +524,7 @@ def read(
# nxpaths.append(
# f'spec_scans/{nxscans.nxname}/{scan_number}/data')
nxdata.data = NXfield(
value=scanparser.get_detector_data(detector_indices))
value=scanparser.get_detector_data(detectors_ids))
else:
nxdata = NXdata()
nxscans[scan_number].data = nxdata
Expand All @@ -526,8 +535,14 @@ def read(
value=scanparser.get_detector_data(detector.id))

if detectors is None and config.experiment_type == 'EDD':
detectors = DetectorConfig(
detectors=[Detector(id=i) for i in range(nxdata.data.shape[1])])
if detector_data_format == 'spec':
detectors = DetectorConfig(
detectors=[Detector(id='mca1')
for i in range(nxdata.data.shape[1])])
else:
detectors = DetectorConfig(
detectors=[
Detector(id=i) for i in range(nxdata.data.shape[1])])
nxentry.detectors = dumps(detectors.dict())

#return nxroot, nxpaths
Expand Down
175 changes: 145 additions & 30 deletions CHAP/edd/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,68 @@ class BaselineConfig(BaseModel):
attrs: Optional[dict] = {}


# Material configuration classes
# Fit configuration class

class FitConfig(BaseModel):
"""Model for parameters to characterize a sample material.
:ivar background: Background model for peak fitting.
:type background: str, list[str], optional
:ivar baseline: Automated baseline subtraction configuration,
defaults to `False`.
:type baseline: Union(bool, BaselineConfig), optional
:ivar fwhm_min: Minimum FWHM for peak fitting (in MCA channels
for calibration).
:type fwhm_min: float, optional
:ivar fwhm_max: Maximum FWHM for peak fitting (in MCA channels
for calibration).
:type fwhm_max: float, optional
:ivar centers_range: Peak centers range for peak fitting.
The allowed range the peak centers will be the initial
values ± `centers_range`. Defaults to `20` for the
calibration and `2.0` for the strain analysis (CURRENTLY
NOT USED AS INTENDED).
:type centers_range: float, optional
"""
background: Optional[Union[str, list]] = None
baseline: Optional[Union[bool, BaselineConfig]] = False
fwhm_min: Optional[confloat(gt=0, allow_inf_nan=False)] = None
fwhm_max: Optional[confloat(gt=0, allow_inf_nan=False)] = None
centers_range: Optional[confloat(gt=0, allow_inf_nan=False)] = None

@field_validator('background', mode='before')
@classmethod
def validate_background(cls, background):
"""Validate the HKL indices.
:ivar background: Background model for peak fitting.
:type background: str, list[str], optional
:return: List of background models.
:rtype: list[str]
"""
if isinstance(background, str):
return [background]
return sorted(background)

@field_validator('baseline', mode='before')
@classmethod
def validate_baseline(cls, baseline):
"""Validate the HKL indices.
:ivar baseline: Automated baseline subtraction configuration,
defaults to `False`.
:type baseline: Union(bool, BaselineConfig), optional
:return: BaselineConfig
:rtype: list[str]
"""
if isinstance(baseline, bool):
if baseline:
return BaselineConfig()
else:
return BaselineConfig(**baseline)


# Material configuration class

class MaterialConfig(BaseModel):
"""Model for parameters to characterize a sample material.
Expand Down Expand Up @@ -130,9 +191,6 @@ class MCAElementConfig(Detector):
"""Class representing metadata required to configure a single MCA
detector element.
:ivar id: The MCA detector id (name or channel index) in the scan,
defaults to `'0'`.
:type id: str
:ivar num_bins: Number of MCA channels.
:type num_bins: int, optional
"""
Expand Down Expand Up @@ -170,19 +228,21 @@ class MCAElementCalibrationConfig(MCAElementConfig):
E_i = a*i^2 + b*i + c), defaults to `[0, 0, 1]`.
:type energy_calibration_coeffs:
list[float, float, float], optional
:ivar background: Background model for peak fitting.
:ivar background: Background model for peak fitting superseding
the global one in FitConfig.
:type background: str, list[str], optional
:ivar baseline: Automated baseline subtraction configuration,
defaults to `False`.
:ivar baseline: Automated baseline subtraction configuration
superseding the global one in FitConfig, defaults to `False`.
:type baseline: Union(bool, BaselineConfig), optional
:ivar tth_initial_guess: Initial guess for 2&theta,
defaults to `5.0`.
:ivar tth_initial_guess: Initial guess for 2&theta superseding
the global one in MCATthCalibrationConfig, defaults to `5.0`.
:type tth_initial_guess: float, optional
:ivar tth_calibrated: Calibrated value for 2&theta.
:type tth_calibrated: float, optional
:ivar include_energy_ranges: List of MCA channel energy ranges
in keV whose data should be included after applying a mask
(bounds are inclusive), defaults to `[[50, 150]]`.
(bounds are inclusive) superseding the global one in
MCATthCalibrationConfig, defaults to `[[50, 150]]`.
:type include_energy_ranges: list[[float, float]], optional
"""
tth_max: confloat(gt=0, allow_inf_nan=False) = 90.0
Expand Down Expand Up @@ -794,7 +854,7 @@ def scanned_vals(self):
return self.scanparser.spec_scan_motor_vals[0]


class MCAEnergyCalibrationConfig(BaseModel):
class MCAEnergyCalibrationConfig(FitConfig):
"""Class representing metadata required to perform an energy
calibration for an MCA detector.
Expand All @@ -804,7 +864,7 @@ class MCAEnergyCalibrationConfig(BaseModel):
:ivar scan_step_indices: Optional scan step indices to use for the
calibration. If not specified, the calibration will be
performed on the average of all MCA spectra for the scan.
:type scan_step_indices: list[int], optional
:type scan_step_indices: int, str, list[int], optional
:ivar detectors: List of individual MCA detector element
calibration configurations.
:type detectors: list[MCAElementCalibrationConfig], optional
Expand Down Expand Up @@ -836,19 +896,19 @@ class MCAEnergyCalibrationConfig(BaseModel):
scan_step_indices: Optional[Annotated[conlist(
min_length=1, item_type=conint(ge=0)),
Field(validate_default=True)]] = None
detectors: Optional[conlist(item_type=MCAElementCalibrationConfig)] = None
flux_file: Optional[FilePath] = None
material: Optional[MaterialConfig] = MaterialConfig(
material_name='CeO2', lattice_parameters=5.41153, sgnum=225)
peak_energies: conlist(min_length=2, item_type=confloat(gt=0))
max_peak_index: conint(ge=0)
detectors: Optional[conlist(item_type=MCAElementCalibrationConfig)] = None
fit_index_ranges: Optional[
conlist(
min_length=1,
item_type=conlist(
min_length=2,
max_length=2,
item_type=conint(ge=0)))] = None
flux_file: Optional[FilePath] = None
material: Optional[MaterialConfig] = MaterialConfig(
material_name='CeO2', lattice_parameters=5.41153, sgnum=225)

@model_validator(mode='before')
@classmethod
Expand Down Expand Up @@ -878,17 +938,18 @@ def validate_scan_step_indices(cls, scan_step_indices):
:ivar scan_step_indices: Optional scan step indices to use for
the calibration. If not specified, the calibration will be
performed on the average of all MCA spectra for the scan.
:type scan_step_indices: list[int], optional
:type scan_step_indices: int, str, list[int], optional
:raises ValueError: Invalid experiment type.
:return: List of step indices.
:rtype: list[int]
"""
if isinstance(scan_step_indices, str):
if isinstance(scan_step_indices, int):
scan_step_indices = [scan_step_indices]
elif isinstance(scan_step_indices, str):
# Local modules
from CHAP.utils.general import string_to_list

scan_step_indices = string_to_list(
scan_step_indices, raise_error=True)
scan_step_indices = string_to_list(scan_step_indices)
return scan_step_indices

@field_validator('max_peak_index')
Expand Down Expand Up @@ -946,37 +1007,64 @@ def dict(self, *args, **kwargs):
:rtype: dict
"""
d = super().dict(*args, **kwargs)
if 'inputdir' in d:
del d['inputdir']
for k in ('inputdir', 'background', 'baseline'):
if k in d:
del d[k]
return d


class MCATthCalibrationConfig(MCAEnergyCalibrationConfig):
"""Class representing metadata required to perform a tth
calibration for an MCA detector.
:ivar tth_max: Detector rotation about lab frame x axis,
defaults to `90`.
:type tth_max: float, optional
:ivar calibration_method: Type of calibration method,
defaults to `'fix_tth_to_tth_init'`.
:type calibration_method:
Literal['fix_tth_to_tth_init', 'direct_fit_residual',
'direct_fit_peak_energies', 'direct_fit_combined',
'iterate_tth'], optional
:ivar include_energy_ranges: List of MCA channel energy ranges
in keV whose data should be included after applying a mask
(bounds are inclusive) superseding the global one in
MCATthCalibrationConfig, defaults to `[[50, 150]]`.
:type include_energy_ranges: list[[float, float]], optional
:ivar max_iter: Maximum number of iterations of the calibration
routine (only used for `'iterate_tth'`), defaults to `10`.
:type max_iter: int, optional
:ivar quadratic_energy_calibration: Adds a quadratic term to
the detector channel index to energy conversion, defaults
to `False` (linear only).
:type quadratic_energy_calibration: bool, optional
:ivar tth_initial_guess: Initial guess for 2&theta,
defaults to `5.0`.
:type tth_initial_guess: float, optional
:ivar tune_tth_tol: Cutoff error for tuning 2&theta (only used for
`'iterate_tth'`). Stop iterating the calibration routine after
an iteration produces a change in the tuned value of 2&theta
that is smaller than this cutoff, defaults to `1e-8`.
:ivar tune_tth_tol: float, optional
"""
tth_max: confloat(gt=0, allow_inf_nan=False) = 90.0
calibration_method: Optional[Literal[
'fix_tth_to_tth_init',
'direct_fit_residual',
'direct_fit_peak_energies',
'direct_fit_combined',
'iterate_tth']] = 'fix_tth_to_tth_init'
include_energy_ranges: Annotated[
conlist(
min_length=1,
item_type=conlist(
min_length=2,
max_length=2,
item_type=confloat(ge=25))),
Field(validate_default=True)] = [[50, 150]]
max_iter: conint(gt=0) = 10
quadratic_energy_calibration: bool = False
tth_initial_guess: confloat(gt=0, le=tth_max, allow_inf_nan=False) = 5.0
tune_tth_tol: confloat(ge=0) = 1e-8

def flux_file_energy_range(self):
Expand All @@ -991,6 +1079,23 @@ def flux_file_energy_range(self):
energies = flux[:,0]/1.e3
return energies.min(), energies.max()

def dict(self, *args, **kwargs):
"""Return a representation of this configuration in a
dictionary that is suitable for dumping to a YAML file.
:return: Dictionary representation of the configuration.
:rtype: dict
"""
d = super().dict(*args, **kwargs)
for k in ('tth_max', 'include_energy_ranges', 'tth_initial_guess'):
if k in d:
del d[k]
if self.calibration_method != 'iterate_tth':
for k in ('max_iter', 'tune_tth_tol'):
if k in d:
del d[k]
return d


class StrainAnalysisConfig(BaseModel):
"""Class representing input parameters required to perform a
Expand All @@ -1002,26 +1107,36 @@ class StrainAnalysisConfig(BaseModel):
:ivar detectors: List of individual detector element strain
analysis configurations, defaults to `None` (use all).
:type detectors: list[MCAElementStrainAnalysisConfig], optional
:ivar materials: Sample material configurations.
:type materials: list[MaterialConfig]
:ivar find_peaks: Exclude peaks where the average spectrum
is below the `rel_height_cutoff` (in the detector
configuration) cutoff relative to the maximum value of the
average spectrum, defaults to `True`.
:type find_peaks: bool, optional
:ivar flux_file: File name of the csv flux file containing station
beam energy in eV (column 0) versus flux (column 1).
:type flux_file: str, optional
:ivar materials: Sample material configurations.
:type materials: list[MaterialConfig]
:ivar oversampling: FIX
:type oversampling: FIX
:ivar skip_animation: Skip the animation and plotting of
the strain analysis fits, defaults to `False`.
:type skip_animation: bool, optional
:ivar sum_axes: Whether to sum over the fly axis or not
for EDD scan types not 0, defaults to `True`.
:type sum_axes: Union[bool, list[str]], optional
:ivar oversampling: FIX
:type oversampling: FIX
"""
inputdir: Optional[DirectoryPath] = None
detectors: Optional[conlist(
min_length=1, item_type=MCAElementStrainAnalysisConfig)] = None
materials: Optional[conlist(item_type=MaterialConfig)] = None
find_peaks: Optional[bool] = True
flux_file: Optional[FilePath] = None
sum_axes: Optional[
Union[bool, conlist(min_length=1, item_type=str)]] = True
materials: Optional[conlist(item_type=MaterialConfig)] = None
oversampling: Optional[
Annotated[Dict, Field(validate_default=True)]] = {'num': 10}
skip_animation: Optional[bool] = False
sum_axes: Optional[
Union[bool, conlist(min_length=1, item_type=str)]] = True

@model_validator(mode='before')
@classmethod
Expand All @@ -1030,7 +1145,7 @@ def validate_config(cls, data):
flux_file filepath.
:param data: Pydantic validator data object.
:type data: MCAEnergyCalibrationConfig,
:type data: MCATthCalibrationConfig,
pydantic_core._pydantic_core.ValidationInfo
:return: The currently validated list of class properties.
:rtype: dict
Expand Down
Loading

0 comments on commit e067cee

Please sign in to comment.