Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintenance for the SANSTubeCalibration algorithm #38773

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from pathlib import Path
import sys
from enum import Enum
from typing import Tuple

import numpy as np

Expand All @@ -27,11 +26,11 @@ class TubeSide:
RIGHT = "right"

@classmethod
def get_tube_side(cls, tube_id):
def get_tube_side(cls, tube_id: int) -> str:
return cls.LEFT if tube_id % 2 == 0 else cls.RIGHT

@classmethod
def get_first_pixel_position(cls, tube_id):
def get_first_pixel_position(cls, tube_id: int) -> float:
# First pixel in mm, as per IDF for rear detector
return -519.2 if cls.get_tube_side(tube_id) == cls.LEFT else -522.2

Expand Down Expand Up @@ -321,15 +320,15 @@ def PyExec(self):
self._log_tube_calibration_issues()
self._notify_tube_cvalue_status(cvalues)

def _match_workspaces_to_strip_positions(self, ws_list):
def _match_workspaces_to_strip_positions(self, ws_list) -> dict[int, any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't want to import MatrixWorkspace, perhaps for readability it would be good to use a stringized type hint?

Regardless, I think this should be Any, from from typing import Any. I think any here might be the built-in function: https://docs.python.org/3/library/functions.html#any

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, thanks, I thought the any didn't seem quite right. I recall trying the stringized type hint at the time but can't recall now why I didn't go for it, will take another look.

"""Match the strip positions to the workspaces"""
strip_pos_to_ws = dict()
for i, position in enumerate(self.getProperty(Prop.STRIP_POSITIONS).value):
strip_pos_to_ws[position] = ws_list[i]

return strip_pos_to_ws

def _calculate_known_strip_edges(self, ws_list):
def _calculate_known_strip_edges(self, ws_list) -> dict[any, list[float]]:
if self._rear:
det_z_logname = self._REAR_DET_Z_LOG
side_offset = 0.0
Expand All @@ -348,7 +347,7 @@ def _calculate_known_strip_edges(self, ws_list):
strip_to_tube_centre = self.getProperty(Prop.STRIP_TO_TUBE_CENTRE).value
encoder_at_beam_centre_main = self.getProperty(Prop.ENCODER).value

def calculate_edge(encoder):
def calculate_edge(encoder: float) -> float:
dist_from_beam = encoder_at_beam_centre - encoder
parallax_shift = dist_from_beam * strip_to_tube_centre / (strip_to_tube_centre - sample_to_detector_dist)
return -(encoder + parallax_shift - DetectorInfo.HALF_DET_WIDTH) / 1000 + side_offset
Expand All @@ -366,7 +365,7 @@ def calculate_edge(encoder):

return ws_to_known_edges

def _load_calibration_data(self, data_files, progress):
def _load_calibration_data(self, data_files: list[str], progress: Progress) -> list:
ws_list = []

# Define the indices for the detector that we're calibrating
Expand All @@ -389,11 +388,11 @@ def _load_calibration_data(self, data_files, progress):

return ws_list

def _get_proton_charge(self, ws):
def _get_proton_charge(self, ws) -> float:
proton_charge = ws.getRun()[self._PROTON_CHRG_LOG].value
return proton_charge[0] if type(proton_charge) is np.ndarray else proton_charge

def _crop_and_scale_workspace(self, ws, start_pixel, end_pixel, uamphr_to_rescale):
def _crop_and_scale_workspace(self, ws, start_pixel: int, end_pixel: int, uamphr_to_rescale: float):
scaled_ws_name = ws.name() + self._SCALED_WS_SUFFIX

crop_alg = self._create_child_alg("CropWorkspace")
Expand All @@ -412,7 +411,7 @@ def _crop_and_scale_workspace(self, ws, start_pixel, end_pixel, uamphr_to_rescal

return mtd[scaled_ws_name]

def _merge_workspaces(self, ws_to_known_edges):
def _merge_workspaces(self, ws_to_known_edges: dict[any, list[float]]) -> None:
"""
Merge the workspaces containing the individual strips into a single workspace containing all the strips.

Expand All @@ -430,7 +429,7 @@ def _merge_workspaces(self, ws_to_known_edges):
# Perform the merge to get a single workspace containing all the strips
self._multiply_ws_list(list(ws_to_boundaries.keys()))

def _multiply_ws_list(self, ws_list):
def _multiply_ws_list(self, ws_list) -> None:
self.log().debug("Multiplying workspaces together...")
rhs_ws = ws_list[0]

Expand All @@ -448,20 +447,20 @@ def _multiply_ws_list(self, ws_list):

AnalysisDataService.addOrReplace(self._MERGED_WS_NAME, rhs_ws)

def _get_tube_name(self, tube_id):
def _get_tube_name(self, tube_id: int) -> str:
"""Construct the name of the tube based on the id given"""
side = TubeSide.get_tube_side(tube_id)
tube_side_num = tube_id // 2 # Round down to get integer name for tube
return self._detector_name + "-detector/" + side + str(tube_side_num)

@staticmethod
def _get_tube_module_and_number(tube_id: int) -> Tuple[int, int]:
def _get_tube_module_and_number(tube_id: int) -> tuple[int, int]:
"""Get the tube module and number based on the id given"""
module = int(tube_id / DetectorInfo.NUM_TUBES_PER_MODULE) + 1
tube_num = tube_id % DetectorInfo.NUM_TUBES_PER_MODULE
return module, tube_num

def _get_tube_data(self, tube_id, ws):
def _get_tube_data(self, tube_id: int, ws) -> list[float]:
"""Return an array of all counts for the given tube"""
tube_name = self._get_tube_name(tube_id)

Expand All @@ -472,11 +471,11 @@ def _get_tube_data(self, tube_id, ws):
raise RuntimeError(f"Found more than one tube for tube id {tube_id}")
tube_ws_index_list = tube_spec.getTube(0)[0]
if not len(tube_ws_index_list) == DetectorInfo.NUM_PIXELS_IN_TUBE:
raise RuntimeError(f"Found incorrect number of counts for tube id {tube_id}")
raise RuntimeError(f"Found incorrect number of workspace indices for tube id {tube_id}")

return [ws.dataY(ws_index)[0] for ws_index in tube_ws_index_list]

def _find_strip_edge_pixels_for_tube(self, tube_id, ws, threshold, first_pixel, last_pixel):
def _find_strip_edge_pixels_for_tube(self, tube_id: int, ws, threshold: int, first_pixel: int, last_pixel: int) -> list[int]:
"""Finds the pixel numbers that correspond to the edges of the strips in each tube.
The tube counts should be very low at the locations where the strips are and should be much higher outside them.
The counts should change from high to low at the left edge of each strip, and from low to high at the right edge
Expand Down Expand Up @@ -513,7 +512,7 @@ def _find_strip_edge_pixels_for_tube(self, tube_id, ws, threshold, first_pixel,
return edge_pixels

@staticmethod
def _set_counts_to_one_outside_strip_boundaries(ws, boundaries):
def _set_counts_to_one_outside_strip_boundaries(ws, boundaries: tuple[float]) -> None:
"""Set counts equal to 1 for x values outside the strip position boundaries."""
for ws_idx in range(ws.getNumberHistograms()):
try:
Expand All @@ -524,7 +523,7 @@ def _set_counts_to_one_outside_strip_boundaries(ws, boundaries):
# Ignore detectors that can't be found in the IDF
continue

def _get_integrated_workspace(self, data_file, progress):
def _get_integrated_workspace(self, data_file: str, progress: Progress):
"""Load a tube calibration run. Search multiple places to ensure fastest possible loading."""
ws_name = os.path.splitext(data_file)[0]
progress.report(f"Loading {ws_name}")
Expand Down Expand Up @@ -572,7 +571,7 @@ def _get_integrated_workspace(self, data_file, progress):

return ws

def _get_boundaries_for_each_strip(self, ws_to_known_edges):
def _get_boundaries_for_each_strip(self, ws_to_known_edges: dict[any, list[float]]) -> dict[any, tuple[float]]:
"""Identifies the boundaries that isolate each strip by finding the mid-point between each pair of strip edges.
Where strips overlap, we find the mid-point of the overlapped region"""
# Sort the strip edge positions into ascending order
Expand All @@ -582,7 +581,7 @@ def _get_boundaries_for_each_strip(self, ws_to_known_edges):
# The right edge of the first strip is the initial reference point
last_strip_right_edge = list(ws_to_known_edges_sorted.values())[0][1]

def find_midpoint(first_edge, second_edge):
def find_midpoint(first_edge: float, second_edge: float) -> float:
return first_edge + (second_edge - first_edge) / 2

# Loop through the rest of the strips to find the boundary points between strips
Expand All @@ -609,15 +608,15 @@ def find_midpoint(first_edge, second_edge):
return ws_to_boundaries

@staticmethod
def _pairwise(iterable):
def _pairwise(iterable) -> list[tuple]:
"""Helper function from: http://docs.python.org/2/library/itertools.html:
Passing a list [s0, s1, s2, s3] to this function returns [(s0,s1), (s1,s2), (s2, s3)]"""
a, b = itertools.tee(iterable)
next(b, None)
return list(zip(a, b))

@staticmethod
def _get_strip_edges_after_merge(known_edges):
def _get_strip_edges_after_merge(known_edges: list[list[float]]) -> list[float]:
"""Find the known edge pairs that will exist after the input workspaces have been merged."""
merged_edge_pairs = []

Expand All @@ -644,10 +643,10 @@ def _get_strip_edges_after_merge(known_edges):
return merged_edge_pairs

@staticmethod
def _get_known_positions_for_fitting(tube_id, known_edges, fit_edges, vertical_offset):
def _get_known_positions_for_fitting(tube_id: int, known_edges: list[float], fit_edges: bool, vertical_offset: float) -> list[float]:
final_tube_id = float(DetectorInfo.NUM_TUBES - 1)

def get_corrected_edge(edge_pos):
def get_corrected_edge(edge_pos: float) -> float:
return edge_pos + (tube_id - final_tube_id) * vertical_offset / final_tube_id

if fit_edges:
Expand All @@ -660,7 +659,7 @@ def get_corrected_edge(edge_pos):
return known_positions

@staticmethod
def _get_fit_params(guessed_pixels, fit_edges, margin):
def _get_fit_params(guessed_pixels: list[int], fit_edges: bool, margin: int) -> TubeCalibFitParams:
out_edge = 10.0
in_edge = 10.0

Expand All @@ -675,7 +674,9 @@ def _get_fit_params(guessed_pixels, fit_edges, margin):
fit_params.setAutomatic(False)
return fit_params

def _calibrate_tube(self, ws, tube_name, known_positions, func_form, fit_params, calib_table):
def _calibrate_tube(
self, ws, tube_name: str, known_positions: list[float], func_form: FuncForm, fit_params: TubeCalibFitParams, calib_table
) -> tuple[any, list[float], any]:
"""Define the calibrated positions of the detectors inside the given tube.

:param ws: integrated workspace with tube to be calibrated.
Expand Down Expand Up @@ -707,12 +708,12 @@ def _calibrate_tube(self, ws, tube_name, known_positions, func_form, fit_params,
def _perform_calibration_for_tube(
self,
ws,
tube_spec,
tube_spec: TubeSpec,
calib_table,
func_form,
fit_params,
ideal_tube,
):
func_form: FuncForm,
fit_params: TubeCalibFitParams,
ideal_tube: IdealTube,
) -> tuple[list[float], float]:
"""Run the calibration for the tube and put the results in the calibration table provided.

:param ws: integrated workspace with tubes to be calibrated
Expand Down Expand Up @@ -750,7 +751,7 @@ def _perform_calibration_for_tube(
self.log().debug("Histogram was excluded from the calibration as it did not have an assigned detector.")
return peak_positions, avg_resolution

def _create_fitting_function(self, function, input_ws, start_x, end_x):
def _create_fitting_function(self, function: str, input_ws, start_x, end_x):
alg = self._create_child_alg("Fit")
alg.setProperty("Function", function)
alg.setProperty("InputWorkspace", input_ws)
Expand All @@ -759,7 +760,7 @@ def _create_fitting_function(self, function, input_ws, start_x, end_x):
alg.setProperty("CreateOutput", True)
return alg

def _run_fitting_function(self, function, input_ws, start_x, end_x):
def _run_fitting_function(self, function: str, input_ws, start_x: float, end_x: float) -> tuple:
"""
Create and run the fitting function, returning a tuple with the two output workspaces.
The first workspace in the tuple is the OutputParameters workspace and the second is the OutputWorkspace.
Expand All @@ -771,7 +772,7 @@ def _run_fitting_function(self, function, input_ws, start_x, end_x):

return params_ws, fit_ws

def _run_fitting_function_params_only(self, function, input_ws, start_x, end_x):
def _run_fitting_function_params_only(self, function: str, input_ws, start_x: str, end_x: str):
"""
Create and run the fitting function, returning the OutputParameters workspace.
"""
Expand All @@ -780,7 +781,7 @@ def _run_fitting_function_params_only(self, function, input_ws, start_x, end_x):
alg.execute()
return alg.getProperty("OutputParameters").value

def _fit_flat_top_peak(self, peak_centre, fit_params, ws):
def _fit_flat_top_peak(self, peak_centre: float, fit_params: TubeCalibFitParams, ws) -> tuple[float, float, any]:
# Find the position
outedge, inedge, endGrad = fit_params.getEdgeParameters()
margin = fit_params.getMargin()
Expand All @@ -802,7 +803,7 @@ def _fit_flat_top_peak(self, peak_centre, fit_params, ws):

return peak_centre, resolution, fit_ws

def _fit_edges(self, peak_centre, fit_params, ws):
def _fit_edges(self, peak_centre: float, fit_params: TubeCalibFitParams, ws) -> tuple[float, float, any]:
# Find the edge position
outedge, inedge, endGrad = fit_params.getEdgeParameters()
margin = fit_params.getMargin()
Expand Down Expand Up @@ -834,7 +835,9 @@ def _fit_edges(self, peak_centre, fit_params, ws):

return peak_centre, resolution, fit_ws

def _fit_peak_positions_for_tube(self, ws, func_form, fit_params, ws_ids):
def _fit_peak_positions_for_tube(
self, ws, func_form: FuncForm, fit_params: TubeCalibFitParams, ws_ids: list[int]
) -> tuple[list[float], float]:
"""
Get the centres of N slits or edges for calibration. It looks for the peak position in pixels
by fitting the peaks and edges. It is the method responsible for estimating the peak position in each tube.
Expand Down Expand Up @@ -890,7 +893,7 @@ def _fit_peak_positions_for_tube(self, ws, func_form, fit_params, ws_ids):

return peak_positions, avg_resolution

def _get_corrected_pixel_positions(self, tube_positions, known_positions, num_detectors):
def _get_corrected_pixel_positions(self, tube_positions: list[float], known_positions: np.ndarray, num_detectors: int) -> np.ndarray:
"""
Corrects position errors in a tube given an array of points and their known positions.

Expand Down Expand Up @@ -939,7 +942,9 @@ def _get_corrected_pixel_positions(self, tube_positions, known_positions, num_de
# Evaluate the fitted quadratic against the number of detectors
return np.polynomial.polynomial.polyval(list(range(num_detectors)), coefficients)

def _get_calibrated_pixel_positions(self, ws, fit_positions, known_positions, ws_ids):
def _get_calibrated_pixel_positions(
self, ws, fit_positions: list[float], known_positions: np.ndarray, ws_ids: list[int]
) -> dict[int, float]:
"""
Get the calibrated detector positions for one tube.
The tube is specified by a list of workspace indices of its spectra.
Expand Down Expand Up @@ -992,7 +997,7 @@ def _get_calibrated_pixel_positions(self, ws, fit_positions, known_positions, ws

return calibrated_detectors

def _create_diagnostic_workspaces(self, tube_id, peak_positions, known_edges, caltable):
def _create_diagnostic_workspaces(self, tube_id: int, peak_positions: list[float], known_edges: list[float], caltable) -> list:
"""Produce diagnostic workspaces for the tube"""
diagnostic_workspaces = []

Expand Down Expand Up @@ -1028,13 +1033,13 @@ def _create_diagnostic_workspaces(self, tube_id, peak_positions, known_edges, ca

return diagnostic_workspaces

def _save_calibrated_ws_as_nexus(self, calibrated_ws):
def _save_calibrated_ws_as_nexus(self, calibrated_ws) -> None:
output_file = self.getProperty(Prop.OUTPUT_FILE).value
if output_file:
save_filepath = self._set_filepath_extension(output_file, self._NEXUS_SUFFIX)
self._save_as_nexus(calibrated_ws, save_filepath)

def _notify_tube_cvalue_status(self, cvalues):
def _notify_tube_cvalue_status(self, cvalues) -> None:
threshold = self.getProperty(Prop.CVALUE_THRESHOLD).value
# Find the tubes with cvalues above the threshold
cvalues_above_threshold = []
Expand All @@ -1061,13 +1066,13 @@ def _notify_tube_cvalue_status(self, cvalues):
if not cvalues_above_threshold:
self.log().notice(f"CValues for all tubes were below threshold {threshold}")

def _log_tube_calibration_issues(self):
def _log_tube_calibration_issues(self) -> None:
if self._tube_calibration_errors:
self.log().warning("There were the following tube calibration errors:")
for msg in self._tube_calibration_errors:
self.log().warning(msg)

def _create_workspace(self, data_x, data_y, output_ws_name, store_in_ADS=True):
def _create_workspace(self, data_x, data_y, output_ws_name: str, store_in_ADS: bool = True):
alg = self._create_child_alg("CreateWorkspace", store_in_ADS)
alg.setProperty("DataX", data_x)
alg.setProperty("DataY", data_y)
Expand All @@ -1079,7 +1084,7 @@ def _create_workspace(self, data_x, data_y, output_ws_name, store_in_ADS=True):
else:
return alg.getProperty("OutputWorkspace").value

def _rename_workspace(self, input_ws, new_name):
def _rename_workspace(self, input_ws, new_name: str):
alg = self._create_child_alg("RenameWorkspace", InputWorkspace=input_ws, OutputWorkspace=new_name)
alg.execute()
return alg.getProperty("OutputWorkspace").value
Expand All @@ -1094,7 +1099,7 @@ def _create_calibration_table_ws(self):
calib_table.addColumn(type="V3D", name=self._CAL_TABLE_POS_COL)
return calib_table

def _apply_calibration(self, ws_to_calibrate, caltable):
def _apply_calibration(self, ws_to_calibrate, caltable) -> None:
"""Apply the generated calibration table"""
if not caltable:
self._log_tube_calibration_issues()
Expand All @@ -1103,26 +1108,27 @@ def _apply_calibration(self, ws_to_calibrate, caltable):
cal_alg = self._create_child_alg("ApplyCalibration", Workspace=ws_to_calibrate, CalibrationTable=caltable)
cal_alg.execute()

def _group_diagnostic_workspaces(self, diagnostic_output):
def _group_diagnostic_workspaces(self, diagnostic_output: dict[int, any]) -> None:
"""Group the diagnostic output for each tube"""
alg = self._create_child_alg("GroupWorkspaces", True)
for tube_id, workspaces in diagnostic_output.items():
alg.setProperty("InputWorkspaces", workspaces)
alg.setProperty("OutputWorkspace", f"Tube_{tube_id:03}")
alg.execute()

def _save_as_nexus(self, ws, filename):
def _save_as_nexus(self, ws, filename: str) -> None:
save_alg = self._create_child_alg("SaveNexusProcessed", InputWorkspace=ws, Filename=filename)
save_alg.execute()

def _create_child_alg(self, name, store_in_ADS=False, **kwargs):
def _create_child_alg(self, name: str, store_in_ADS: bool = False, **kwargs):
alg = self.createChildAlgorithm(name, **kwargs)
alg.setRethrows(True)
if store_in_ADS:
alg.setAlwaysStoreInADS(True)
return alg

def _set_filepath_extension(self, filepath, ext):
@staticmethod
def _set_filepath_extension(filepath: str, ext: str) -> str:
return filepath if filepath.endswith(ext) else f"{filepath}{ext}"


Expand Down
Loading